
Simplify and speed up Apache Spark applications on Amazon Redshift data with Amazon Redshift integration for Apache Spark

Post Syndicated from Gagan Brahmi original https://aws.amazon.com/blogs/big-data/simplify-and-speed-up-apache-spark-applications-on-amazon-redshift-data-with-amazon-redshift-integration-for-apache-spark/

Customers use Amazon Redshift to run their business-critical analytics on petabytes of structured and semi-structured data. Apache Spark is a popular framework that you can use to build applications for use cases such as ETL (extract, transform, and load), interactive analytics, and machine learning (ML). You can build Apache Spark applications in a variety of languages, such as Java, Scala, and Python, that access the data in your Amazon Redshift data warehouse.

Amazon Redshift integration for Apache Spark helps developers seamlessly build and run Apache Spark applications on Amazon Redshift data. Developers can use AWS analytics and ML services such as Amazon EMR, AWS Glue, and Amazon SageMaker to effortlessly build Apache Spark applications that read from and write to their Amazon Redshift data warehouse. You can do so without compromising on the performance of your applications or transactional consistency of your data.

In this post, we discuss why Amazon Redshift integration for Apache Spark is critical and efficient for analytics and ML. In addition, we discuss use cases that use Amazon Redshift integration with Apache Spark to drive business impact. Finally, we walk you through step-by-step examples of how to use this official AWS connector in an Apache Spark application.

Amazon Redshift integration for Apache Spark

The Amazon Redshift integration for Apache Spark minimizes the cumbersome and often manual process of setting up a spark-redshift connector (community version) and shortens the time needed to prepare for analytics and ML tasks. You only need to specify the connection to your data warehouse, and you can start working with Amazon Redshift data from your Apache Spark-based applications within minutes.

You can use several pushdown capabilities for operations such as sort, aggregate, limit, join, and scalar functions so that only the relevant data is moved from your Amazon Redshift data warehouse to the consuming Apache Spark application. This allows you to improve the performance of your applications. Amazon Redshift admins can easily identify the SQL generated from Spark-based applications. In this post, we show how you can find out the SQL generated by the Apache Spark job.
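To make the effect of pushdown concrete, the following toy sketch uses Python's built-in sqlite3 module as a stand-in for the data warehouse. This is an assumption for illustration only: it does not use the connector or Amazon Redshift, and the table and values are made up. It compares how many rows leave the database when the filter and aggregate run in the client versus in the database.

```python
import sqlite3

# In-memory SQLite database standing in for the data warehouse (illustration only).
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE sales (market TEXT, qtysold INTEGER)")
conn.executemany(
    "INSERT INTO sales VALUES (?, ?)",
    [("US", 3), ("US", 5), ("DE", 2), ("FR", 7), ("DE", 4)],
)

# Without pushdown: every row is moved to the client, which then filters and sums.
all_rows = conn.execute("SELECT market, qtysold FROM sales").fetchall()
client_total = sum(qty for market, qty in all_rows if market == "DE")

# With pushdown: the filter and aggregate run in the database, so one row is moved.
pushed_rows = conn.execute(
    "SELECT SUM(qtysold) FROM sales WHERE market = ?", ("DE",)
).fetchall()
pushed_total = pushed_rows[0][0]

print(len(all_rows), len(pushed_rows), client_total, pushed_total)
```

Both approaches produce the same total, but the second moves a single row instead of the whole table, which is the saving pushdown aims for at warehouse scale.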

Moreover, Amazon Redshift integration for Apache Spark uses Parquet file format when staging the data in a temporary directory. Amazon Redshift uses the UNLOAD SQL statement to store this temporary data on Amazon Simple Storage Service (Amazon S3). The Apache Spark application retrieves the results from the temporary directory (stored in Parquet file format), which improves performance.

You can also help make your applications more secure by utilizing AWS Identity and Access Management (IAM) credentials to connect to Amazon Redshift.

Amazon Redshift integration for Apache Spark is built on top of the spark-redshift connector (community version) and enhances it for performance and security, helping you gain up to 10 times faster application performance.

Use cases for Amazon Redshift integration with Apache Spark

For our use case, the leadership of a product-based company wants to know the sales for each product across multiple markets. Because sales fluctuate dynamically, it has become a challenge for the leadership to track them across those markets. Overall sales are declining, and the leadership wants to find out which markets are underperforming so that it can target them with promotion campaigns.

For sales across multiple markets, the product sales data such as orders, transactions, and shipment data is available on Amazon S3 in the data lake. The data engineering team can use Apache Spark with Amazon EMR or AWS Glue to analyze this data in Amazon S3.

The inventory data is available in Amazon Redshift. Similarly, the data engineering team can analyze this data with Apache Spark using Amazon EMR or an AWS Glue job by using the Amazon Redshift integration for Apache Spark to perform aggregations and transformations. The aggregated and transformed dataset can be stored back into Amazon Redshift using the Amazon Redshift integration for Apache Spark.

Using a distributed framework like Apache Spark with the Amazon Redshift integration for Apache Spark can provide the visibility across the data lake and data warehouse to generate sales insights. These insights can be made available to the business stakeholders and line of business users in Amazon Redshift to make informed decisions to run targeted promotions for the low revenue market segments.

Additionally, we can use the Amazon Redshift integration with Apache Spark in the following use cases:

  • An Amazon EMR or AWS Glue customer running Apache Spark jobs wants to transform data and write that into Amazon Redshift as a part of their ETL pipeline
  • An ML customer uses Apache Spark with SageMaker for feature engineering for accessing and transforming data in Amazon Redshift
  • An Amazon EMR, AWS Glue, or SageMaker customer uses Apache Spark for interactive data analysis with data on Amazon Redshift from notebooks

Examples for Amazon Redshift integration for Apache Spark in an Apache Spark application

In this post, we show the steps to connect to Amazon Redshift from Amazon EMR on Amazon Elastic Compute Cloud (Amazon EC2), Amazon EMR Serverless, and AWS Glue using a common script. In the following sample code, we generate a report showing the quarterly sales for the year 2008. To do that, we join two Amazon Redshift tables using an Apache Spark DataFrame, run a predicate pushdown, aggregate and sort the data, and write the transformed data back to Amazon Redshift. The script uses PySpark.
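As a point of reference, the report described here corresponds roughly to the SQL in the sketch below. It runs against Python's built-in sqlite3 module with a few made-up rows, so the table names tickit_sales and tickit_date and the sample values are stand-ins rather than the real TICKIT data, and the SQL is hand-written rather than what the connector generates.

```python
import sqlite3

# Tiny in-memory stand-ins for the sales and date tables (made-up rows).
conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE tickit_sales (salesid INTEGER, dateid INTEGER, qtysold INTEGER);
CREATE TABLE tickit_date (dateid INTEGER, qtr TEXT, year INTEGER);
INSERT INTO tickit_date VALUES (1, '1', 2008), (2, '2', 2008), (3, '1', 2007);
INSERT INTO tickit_sales VALUES (10, 1, 4), (11, 1, 1), (12, 2, 3), (13, 3, 9);
""")

# Join on dateid, keep the year 2008, total the quantity per quarter, sort by quarter.
REPORT_SQL = """
SELECT d.qtr, SUM(s.qtysold) AS total_quantity_sold
FROM tickit_sales AS s
JOIN tickit_date AS d ON s.dateid = d.dateid
WHERE d.year = 2008
GROUP BY d.qtr
ORDER BY d.qtr
"""
report = conn.execute(REPORT_SQL).fetchall()
print(report)
```

The 2007 row is filtered out before aggregation, which is the same filter, aggregate, and sort sequence the PySpark script expresses with DataFrame operations.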

The script uses IAM-based authentication for Amazon Redshift. IAM roles used by Amazon EMR and AWS Glue should have the appropriate permissions to authenticate with Amazon Redshift, and access to an S3 bucket for temporary data storage.

The following example policy allows the IAM role to call the GetClusterCredentials operation:

{
  "Version": "2012-10-17",
  "Statement": {
    "Effect": "Allow",
    "Action": "redshift:GetClusterCredentials",
    "Resource": "arn:aws:redshift:<aws_region_name>:xxxxxxxxxxxx:dbuser:*/temp_*"
  }
}
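The Resource value in this policy follows the pattern arn:aws:redshift:<region>:<account>:dbuser:<cluster>/<user>. As a minimal sketch, the policy could be generated for a given Region and account as follows. The helper names dbuser_arn and get_cluster_credentials_policy, and the example Region and account ID, are hypothetical and chosen only for illustration.

```python
import json


def dbuser_arn(region, account_id, cluster="*", db_user="temp_*"):
    """Build a dbuser ARN: arn:aws:redshift:<region>:<account>:dbuser:<cluster>/<user>."""
    return f"arn:aws:redshift:{region}:{account_id}:dbuser:{cluster}/{db_user}"


def get_cluster_credentials_policy(region, account_id, cluster="*", db_user="temp_*"):
    """Return the example policy as a dict, scoped to the given cluster and user pattern."""
    return {
        "Version": "2012-10-17",
        "Statement": {
            "Effect": "Allow",
            "Action": "redshift:GetClusterCredentials",
            "Resource": dbuser_arn(region, account_id, cluster, db_user),
        },
    }


# Placeholder Region and account ID; replace with your own values.
policy = get_cluster_credentials_policy("us-east-1", "123456789012")
print(json.dumps(policy, indent=2))
```

Passing a specific cluster name or database user instead of the wildcard defaults narrows the policy to just that cluster or user.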

The following example policy allows access to an S3 bucket for temporary data storage:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "VisualEditor0",
            "Effect": "Allow",
            "Action": [
                "s3:PutObject",
                "s3:GetObject",
                "s3:ListBucket"
            ],
            "Resource": [
                "arn:aws:s3:::<s3_bucket_name>",
                "arn:aws:s3:::<s3_bucket_name>/*"
            ]
        }
    ]
}
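In IAM, s3:ListBucket applies to the bucket ARN, whereas s3:PutObject and s3:GetObject apply to object ARNs (the bucket ARN followed by a key pattern). The following sketch generates a tighter variant of the policy that limits access to the temporary directory prefix. The function name temp_dir_policy and the default prefix redshift-temp-dir/ (taken from the temp_dir value used in the script) are assumptions for illustration, and the sketch covers only the three actions listed in the example policy.

```python
import json


def temp_dir_policy(bucket_name, prefix="redshift-temp-dir/"):
    """Return an S3 policy dict scoped to one prefix of the temporary bucket."""
    bucket_arn = f"arn:aws:s3:::{bucket_name}"
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                # ListBucket is a bucket-level action; the condition limits it to the prefix.
                "Effect": "Allow",
                "Action": ["s3:ListBucket"],
                "Resource": bucket_arn,
                "Condition": {"StringLike": {"s3:prefix": [f"{prefix}*"]}},
            },
            {
                # PutObject and GetObject are object-level actions on keys under the prefix.
                "Effect": "Allow",
                "Action": ["s3:PutObject", "s3:GetObject"],
                "Resource": f"{bucket_arn}/{prefix}*",
            },
        ],
    }


policy = temp_dir_policy("example-temp-bucket")  # placeholder bucket name
print(json.dumps(policy, indent=2))
```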

The complete script is as follows:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Initiate Apache Spark session
spark = SparkSession \
        .builder \
        .appName("SparkRedshiftConnector") \
        .enableHiveSupport() \
        .getOrCreate()

# Set connection options for Amazon Redshift
jdbc_iam_url = "jdbc:redshift:iam://redshift-spark-connector-1.xxxxxxxxxxx.<aws_region_name>.redshift.amazonaws.com:5439/sample_data_dev"
temp_dir = 's3://<s3_bucket_name>/redshift-temp-dir/'
aws_role = 'arn:aws:iam::xxxxxxxxxxxx:role/redshift-s3'

# Set query group for the query. More details on Amazon Redshift WLM https://docs.aws.amazon.com/redshift/latest/dg/cm-c-executing-queries.html
queryGroup = "emr-redshift"
jdbc_iam_url_withQueryGroup = jdbc_iam_url+'?queryGroup='+queryGroup

# Set User name for the query
userName = 'awsuser'
jdbc_iam_url_withUserName = jdbc_iam_url_withQueryGroup+';user='+userName

# Define the Amazon Redshift context
redshiftOptions = {
    "url": jdbc_iam_url_withUserName,
    "tempdir": temp_dir,
    "aws_iam_role" : aws_role
}

# Create the sales DataFrame from Amazon Redshift table using io.github.spark_redshift_community.spark.redshift class
sales_df = (
    spark.read
        .format("io.github.spark_redshift_community.spark.redshift")
        .options(**redshiftOptions)
        .option("dbtable", "tickit.sales")
        .load()
)

# Create the date DataFrame from Amazon Redshift table
date_df = (
    spark.read
        .format("io.github.spark_redshift_community.spark.redshift")
        .options(**redshiftOptions)
        .option("dbtable", "tickit.date")
        .load()
)

# Build the DataFrame that will be written back to Amazon Redshift
output_df = (
    sales_df.join(date_df, sales_df.dateid == date_df.dateid, 'inner')
        .where(col("year") == 2008)
        .groupBy("qtr").sum("qtysold")
        .select(col("qtr"), col("sum(qtysold)").alias("total_quantity_sold"))
        .sort("qtr")
)

# Display the output
output_df.show()

## Let's drop the queryGroup for easy validation of pushdown queries
# Set User name for the query
userName = 'awsuser'
jdbc_iam_url_withUserName = jdbc_iam_url+'?user='+userName

# Define the Amazon Redshift context
redshiftWriteOptions = {
    "url": jdbc_iam_url_withUserName,
    "tempdir": temp_dir,
    "aws_iam_role" : aws_role
}

# Write the DataFrame back to Amazon Redshift
output_df.write \
    .format("io.github.spark_redshift_community.spark.redshift") \
    .mode("overwrite") \
    .options(**redshiftWriteOptions) \
    .option("dbtable", "tickit.test") \
    .save()

If you plan to use the preceding script in your environment, make sure you replace the values for the following variables with the appropriate values for your environment: jdbc_iam_url, temp_dir, and aws_role.
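Because those three values appear in both the read and write options, one way to keep them in a single place is a small helper that assembles the option dictionary. The following is a sketch under assumptions: the function name build_redshift_options and the example endpoint, bucket, and role values are hypothetical, and the URL layout simply mirrors the string concatenation in the script ('?' before the first parameter and ';' between parameters).

```python
def build_redshift_options(jdbc_iam_url, temp_dir, aws_role, user=None, query_group=None):
    """Return the option dict that the script passes to .options(**...)."""
    params = []
    if query_group:
        params.append(f"queryGroup={query_group}")
    if user:
        params.append(f"user={user}")
    url = jdbc_iam_url
    if params:
        # Mirror the script: '?' before the first parameter, ';' between parameters.
        url = f"{jdbc_iam_url}?{';'.join(params)}"
    return {"url": url, "tempdir": temp_dir, "aws_iam_role": aws_role}


# Placeholder values for illustration; replace them with your own.
BASE_URL = "jdbc:redshift:iam://example-cluster.abc123.us-east-1.redshift.amazonaws.com:5439/dev"
TEMP_DIR = "s3://example-temp-bucket/redshift-temp-dir/"
ROLE_ARN = "arn:aws:iam::123456789012:role/redshift-s3"

read_options = build_redshift_options(
    BASE_URL, TEMP_DIR, ROLE_ARN, user="awsuser", query_group="emr-redshift"
)
write_options = build_redshift_options(BASE_URL, TEMP_DIR, ROLE_ARN, user="awsuser")
print(read_options["url"])
print(write_options["url"])
```

The two dictionaries play the same role as redshiftOptions and redshiftWriteOptions in the script.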

In the next section, we walk through the steps to run this script to aggregate a sample dataset that is made available in Amazon Redshift.

Prerequisites

Before we begin, make sure the following prerequisites are met:

Deploy resources using AWS CloudFormation

Complete the following steps to deploy the CloudFormation stack:

  1. Sign in to the AWS Management Console, then launch the CloudFormation stack.

You can also download the CloudFormation template to create the resources mentioned in this post through infrastructure as code (IaC). Use this template when launching a new CloudFormation stack.

  2. Scroll down to the bottom of the page and, under Capabilities, select I acknowledge that AWS CloudFormation might create IAM resources, then choose Create stack.

The stack creation process takes 15–20 minutes to complete. The CloudFormation template creates the following resources:

    • An Amazon VPC with the needed subnets, route tables, and NAT gateway
    • An S3 bucket with the name redshift-spark-databucket-xxxxxxx (note that xxxxxxx is a random string to make the bucket name unique)
    • An Amazon Redshift cluster with sample data loaded inside the database dev and the primary user redshiftmasteruser. For the purpose of this blog post, redshiftmasteruser with administrative permissions is used. However, it is recommended to use a user with fine-grained access control in a production environment.
    • An IAM role to be used for Amazon Redshift with the ability to request temporary credentials from the Amazon Redshift cluster’s dev database
    • Amazon EMR Studio with the needed IAM roles
    • An Amazon EMR on EC2 cluster (release version 6.9.0) with the needed IAM roles
    • An Amazon EMR Serverless application release version 6.9.0
    • An AWS Glue connection and AWS Glue job version 4.0
    • A Jupyter notebook to run in Amazon EMR Studio on the Amazon EMR on EC2 cluster
    • A PySpark script to run using Amazon EMR Studio and Amazon EMR Serverless
  3. After the stack creation is complete, choose the stack name redshift-spark and navigate to the Outputs tab.

We utilize these output values later in this post.

In the next sections, we show the steps for Amazon Redshift integration for Apache Spark from Amazon EMR on Amazon EC2, Amazon EMR Serverless, and AWS Glue.

Use Amazon Redshift integration with Apache Spark on Amazon EMR on EC2

Starting with Amazon EMR release version 6.9.0, the connector for the Amazon Redshift integration for Apache Spark and the Amazon Redshift JDBC driver are available locally on Amazon EMR. These files are located under the /usr/share/aws/redshift/ directory. In earlier versions of Amazon EMR, the community version of the spark-redshift connector is available.

The following example shows how to connect to Amazon Redshift using a PySpark kernel via an Amazon EMR Studio notebook. The CloudFormation stack created Amazon EMR Studio, an Amazon EMR on EC2 cluster, and a Jupyter notebook available to run. To go through this example, complete the following steps:

  1. Download the Jupyter notebook made available in the S3 bucket for you:
    • In the CloudFormation stack outputs, look for the value for EMRStudioNotebook, which should point to the redshift-spark-emr.ipynb notebook available in the S3 bucket.
    • Choose the link or open the link in a new tab by copying the URL for the notebook.
    • After you open the link, download the notebook by choosing Download, which will save the file locally on your computer.
  2. Access Amazon EMR Studio by choosing or copying the link provided in the CloudFormation stack outputs for the key EMRStudioURL.
  3. In the navigation pane, choose Workspaces.
  4. Choose Create Workspace.
  5. Provide a name for the Workspace, for instance redshift-spark.
  6. Expand the Advanced configuration section and select Attach Workspace to an EMR cluster.
  7. Under Attach to an EMR cluster, choose the EMR cluster with the name emrCluster-Redshift-Spark.
  8. Choose Create Workspace.
  9. After the Amazon EMR Studio Workspace is created and in Attached status, you can access the Workspace by choosing the name of the Workspace.

This should open the Workspace in a new tab. Note that if you have a pop-up blocker, you may have to allow the Workspace to open or disable the pop-up blocker.

In the Amazon EMR Studio Workspace, we now upload the Jupyter notebook we downloaded earlier.

  1. Choose Upload to browse your local file system and upload the Jupyter notebook (redshift-spark-emr.ipynb).
  2. Choose (double-click) the redshift-spark-emr.ipynb notebook within the Workspace to open the notebook.

The notebook provides the details of different tasks that it performs. Note that in the section Define the variables to connect to Amazon Redshift cluster, you don’t need to update the values for jdbc_iam_url, temp_dir, and aws_role because these are updated for you by AWS CloudFormation. AWS CloudFormation has also performed the steps mentioned in the Prerequisites section of the notebook.

You can now start running the notebook.

  1. Run the individual cells by selecting them and then choosing Play.

You can also use the key combination of Shift+Enter or Shift+Return. Alternatively, you can run all the cells by choosing Run All Cells on the Run menu.

  1. Find the predicate pushdown operation performed on the Amazon Redshift cluster by the Amazon Redshift integration for Apache Spark.

We can also see the temporary data stored on Amazon S3 in the optimized Parquet format. The output can be seen from running the cell in the section Get the last query executed on Amazon Redshift.

  1. To validate the table created by the job from Amazon EMR on Amazon EC2, navigate to the Amazon Redshift console and choose the cluster redshift-spark-redshift-cluster on the Provisioned clusters dashboard page.
  2. In the cluster details, on the Query data menu, choose Query in query editor v2.
  3. Choose the cluster in the navigation pane and connect to the Amazon Redshift cluster when prompted for authentication.
  4. Select Temporary credentials.
  5. For Database, enter dev.
  6. For User name, enter redshiftmasteruser.
  7. Choose Save.
  8. In the navigation pane, expand the cluster redshift-spark-redshift-cluster, expand the dev database, expand tickit, and expand Tables to list all the tables inside the schema tickit.

You should find the table test_emr.

  1. Choose (right-click) the table test_emr, then choose Select table to query the table.
  2. Choose Run to run the SQL statement.

Use Amazon Redshift integration with Apache Spark on Amazon EMR Serverless

Amazon EMR release version 6.9.0 and later provide the Amazon Redshift integration for Apache Spark JARs (managed by Amazon Redshift) and Amazon Redshift JDBC JARs locally on Amazon EMR Serverless as well. These files are located under the /usr/share/aws/redshift/ directory. In the following example, we use the Python script made available in the S3 bucket by the CloudFormation stack we created earlier.

  1. In the CloudFormation stack outputs, make a note of the value for EMRServerlessExecutionScript, which is the location of the Python script in the S3 bucket.
  2. Also note the value for EMRServerlessJobExecutionRole, which is the IAM role to be used with running the Amazon EMR Serverless job.
  3. Access Amazon EMR Studio by choosing or copying the link provided in the CloudFormation stack outputs for the key EMRStudioURL.
  4. Choose Applications under Serverless in the navigation pane.

You will find an EMR application created by the CloudFormation stack with the name emr-spark-redshift.

  1. Choose the application name to submit a job.
  2. Choose Submit job.
  3. Under Job details, for Name, enter an identifiable name for the job.
  4. For Runtime role, choose the IAM role that you noted from the CloudFormation stack output earlier.
  5. For Script location, provide the path to the Python script you noted earlier from the CloudFormation stack output.
  6. Expand the section Spark properties and choose Edit in text.
  7. Enter the following value in the text box, which provides the path to the redshift-connector, Amazon Redshift JDBC driver, spark-avro JAR, and minimal-json JAR files:
    --jars /usr/share/aws/redshift/jdbc/RedshiftJDBC.jar,/usr/share/aws/redshift/spark-redshift/lib/spark-redshift.jar,/usr/share/aws/redshift/spark-redshift/lib/spark-avro.jar,/usr/share/aws/redshift/spark-redshift/lib/minimal-json.jar

  8. Choose Submit job.
  9. Wait for the job to complete and the run status to show as Success.
  10. Navigate to the Amazon Redshift query editor to view if the table was created successfully.
  11. Check the pushdown queries run for Amazon Redshift query group emr-serverless-redshift. You can run the following SQL statement against the database dev:
    SELECT query_text FROM SYS_QUERY_HISTORY WHERE query_label = 'emr-serverless-redshift' ORDER BY start_time DESC LIMIT 1

You can see the pushdown query and that the returned results are stored in Parquet file format on Amazon S3.
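The Spark properties value entered in the steps above is a comma-separated list of four JAR paths under /usr/share/aws/redshift/. If you script job submission, a small helper can assemble the same string and avoid typos. This is a minimal sketch; the names REDSHIFT_HOME, JARS, and spark_jars_property are hypothetical, and the paths are copied from the value shown in the steps.

```python
import posixpath

# Directory where Amazon EMR 6.9.0 and later place the connector and driver JARs.
REDSHIFT_HOME = "/usr/share/aws/redshift"

# JDBC driver, connector, spark-avro, and minimal-json JARs, relative to REDSHIFT_HOME.
JARS = (
    "jdbc/RedshiftJDBC.jar",
    "spark-redshift/lib/spark-redshift.jar",
    "spark-redshift/lib/spark-avro.jar",
    "spark-redshift/lib/minimal-json.jar",
)


def spark_jars_property(home=REDSHIFT_HOME, jars=JARS):
    """Return the '--jars' Spark property as a single comma-separated string."""
    return "--jars " + ",".join(posixpath.join(home, jar) for jar in jars)


print(spark_jars_property())
```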

Use Amazon Redshift integration with Apache Spark on AWS Glue

Starting with AWS Glue version 4.0, Apache Spark jobs connecting to Amazon Redshift can use the Amazon Redshift integration for Apache Spark and the Amazon Redshift JDBC driver. Existing AWS Glue jobs that already use Amazon Redshift as a source or target can be upgraded to AWS Glue 4.0 to take advantage of this new connector. The CloudFormation template provided with this post creates the following AWS Glue resources:

  • AWS Glue connection for Amazon Redshift – The connection from AWS Glue to Amazon Redshift that uses the Amazon Redshift integration for Apache Spark
  • IAM role attached to the AWS Glue job – The IAM role to manage permissions to run the AWS Glue job
  • AWS Glue job – The script for the AWS Glue job performing transformations and aggregations using the Amazon Redshift integration for Apache Spark

The following example uses the AWS Glue connection attached to the AWS Glue job with PySpark and includes the following steps:

  1. On the AWS Glue console, choose Connections in the navigation pane.
  2. Under Connections, choose the AWS Glue connection for Amazon Redshift created by the CloudFormation template.
  3. Verify the connection details.

You can now reuse this connection within a job or across multiple jobs.

  1. On the Connectors page, choose the AWS Glue job created by the CloudFormation stack under Your jobs, or access the AWS Glue job by using the URL provided for the key GlueJob in the CloudFormation stack output.
  2. Access and verify the script for the AWS Glue job.
  3. On the Job details tab, make sure that Glue version is set to Glue 4.0.

This ensures that the job uses the Amazon Redshift integration for Apache Spark.

  1. Expand Advanced properties and in the Connections section, verify that the connection created by the CloudFormation stack is attached.
  2. Verify the job parameters added for the AWS Glue job. These values are also available in the output for the CloudFormation stack.
  3. Choose Save and then Run.

You can view the status for the job run on the Run tab.

  1. After the job run completes successfully, you can verify the output of the table test-glue created by the AWS Glue job.
  2. Check the pushdown queries run for Amazon Redshift query group glue-redshift. You can run the following SQL statement against the database dev:
    SELECT query_text FROM SYS_QUERY_HISTORY WHERE query_label = 'glue-redshift' ORDER BY start_time DESC LIMIT 1
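The same SYS_QUERY_HISTORY statement is used for both the emr-serverless-redshift and glue-redshift query groups, with only the label changing. If you run this check often, a small helper can build the statement for any label. This is a sketch under assumptions: the function name last_query_sql is hypothetical, and the label check is a conservative allow-list added here so that arbitrary text is never interpolated into SQL.

```python
import re


def last_query_sql(query_label, limit=1):
    """Build the SYS_QUERY_HISTORY lookup for one query label."""
    # Allow only letters, digits, underscores, and hyphens in the label.
    if not re.fullmatch(r"[A-Za-z0-9_-]+", query_label):
        raise ValueError(f"unsupported query label: {query_label!r}")
    if not isinstance(limit, int) or limit < 1:
        raise ValueError("limit must be a positive integer")
    return (
        "SELECT query_text FROM SYS_QUERY_HISTORY "
        f"WHERE query_label = '{query_label}' "
        f"ORDER BY start_time DESC LIMIT {limit}"
    )


print(last_query_sql("glue-redshift"))
print(last_query_sql("emr-serverless-redshift"))
```

You can paste the printed statement into the query editor or pass it to whichever SQL client you use.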

Best practices

Keep in mind the following best practices:

  • Consider using the Amazon Redshift integration for Apache Spark from Amazon EMR instead of the spark-redshift connector (community version) for your new Apache Spark jobs.
  • If you have existing Apache Spark jobs using the spark-redshift connector (community version), consider upgrading them to use the Amazon Redshift integration for Apache Spark.
  • The Amazon Redshift integration for Apache Spark automatically applies predicate and query pushdown to optimize for performance. We recommend using supported functions (autopushdown) in your query. The Amazon Redshift integration for Apache Spark turns the function into a SQL query and runs the query in Amazon Redshift. As a result, only the required data is retrieved, so Apache Spark processes less data and performs better.
    • Consider using aggregate pushdown functions like avg, count, max, min, and sum to retrieve filtered data for data processing.
    • Consider using Boolean pushdown operators like in, isnull, isnotnull, contains, endswith, and startswith to retrieve filtered data for data processing.
    • Consider using logical pushdown operators like and, or, and not (or !) to retrieve filtered data for data processing.
  • It’s recommended to pass an IAM role using the parameter aws_iam_role for Amazon Redshift authentication from your Apache Spark application on Amazon EMR or AWS Glue. The IAM role should have the necessary permissions to retrieve temporary IAM credentials to authenticate to Amazon Redshift, as shown in the “Examples for Amazon Redshift integration for Apache Spark in an Apache Spark application” section of this post.
  • With IAM-based authentication, you don’t have to maintain your Amazon Redshift user name and password in a secrets manager and the Amazon Redshift database.
  • Amazon Redshift uses the UNLOAD SQL statement to store temporary data on Amazon S3. The Apache Spark application retrieves the results from the temporary directory (stored in Parquet file format). This temporary directory on Amazon S3 is not cleaned up automatically, and can therefore incur additional cost. We recommend using Amazon S3 lifecycle policies to define the retention rules for the S3 bucket.
  • It’s recommended to turn on Amazon Redshift audit logging to log the information about connections and user activities in your database.
  • It’s recommended to turn on Amazon Redshift at-rest encryption to encrypt your data as Amazon Redshift writes it in its data centers and decrypt it for you when you access it.
  • It’s recommended to upgrade to AWS Glue v4.0 and above to use the Amazon Redshift integration for Apache Spark, which is available out of the box. Upgrading to this version of AWS Glue will automatically make use of this feature.
  • It’s recommended to upgrade to Amazon EMR v6.9.0 and above to use the Amazon Redshift integration for Apache Spark. You don’t have to manage any drivers or JAR files explicitly.
  • Consider using Amazon EMR Studio notebooks to interact with your Amazon Redshift data in your Apache Spark application.
  • Consider using AWS Glue Studio to create Apache Spark jobs using a visual interface. You can also switch to writing Apache Spark code in either Scala or PySpark within AWS Glue Studio.
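For the recommendation above about cleaning up the temporary directory, the following is a minimal sketch of an S3 lifecycle rule that expires objects under the temporary prefix. The prefix redshift-temp-dir/ comes from the temp_dir value in the script; the function names, the rule ID, and the seven-day retention are assumptions for illustration. Note that put_bucket_lifecycle_configuration replaces any lifecycle configuration already on the bucket.

```python
def temp_dir_lifecycle(prefix="redshift-temp-dir/", days=7):
    """Return a lifecycle configuration that expires objects under the prefix."""
    if days < 1:
        raise ValueError("days must be at least 1")
    return {
        "Rules": [
            {
                "ID": "expire-redshift-spark-temp-data",  # hypothetical rule name
                "Filter": {"Prefix": prefix},
                "Status": "Enabled",
                "Expiration": {"Days": days},
            }
        ]
    }


def apply_lifecycle(bucket_name, configuration):
    """Apply the configuration with boto3; this overwrites existing lifecycle rules."""
    import boto3  # imported lazily so the dict builder works without boto3 installed

    s3 = boto3.client("s3")
    s3.put_bucket_lifecycle_configuration(
        Bucket=bucket_name, LifecycleConfiguration=configuration
    )


configuration = temp_dir_lifecycle()
print(configuration)
# apply_lifecycle("<s3_bucket_name>", configuration)  # uncomment to apply to your bucket
```

Choose a retention period longer than your longest-running job so that staged data is not removed while a job still needs it.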

Clean up

Complete the following steps to clean up the resources created by the CloudFormation template so that you’re not billed for resources you no longer use:

  1. Stop the Amazon EMR Serverless application:
    • Access Amazon EMR Studio by choosing or copying the link provided in the CloudFormation stack outputs for the key EMRStudioURL.
    • Choose Applications under Serverless in the navigation pane.

You will find an EMR application created by the CloudFormation stack with the name emr-spark-redshift.

    • If the application status shows as Stopped, you can move to the next steps. However, if the application status is Started, choose the application name, then choose Stop application and Stop application again to confirm.
  2. Delete the Amazon EMR Studio Workspace:
    • Access Amazon EMR Studio by choosing or copying the link provided in the CloudFormation stack outputs for the key EMRStudioURL.
    • Choose Workspaces in the navigation pane.
    • Select the Workspace that you created and choose Delete, then choose Delete again to confirm.
  3. Delete the CloudFormation stack:
    • On the AWS CloudFormation console, navigate to the stack you created earlier.
    • Choose the stack name and then choose Delete to remove the stack and delete the resources created as a part of this post.
    • On the confirmation screen, choose Delete stack.

Conclusion

In this post, we explained how you can use the Amazon Redshift integration for Apache Spark to build and deploy applications with Amazon EMR on Amazon EC2, Amazon EMR Serverless, and AWS Glue to automatically apply predicate and query pushdown to optimize the query performance for data in Amazon Redshift. It’s highly recommended to use Amazon Redshift integration for Apache Spark for a seamless and secure connection to Amazon Redshift from Amazon EMR or AWS Glue.

Here is what some of our customers have to say about the Amazon Redshift integration for Apache Spark:

“We empower our engineers to build their data pipelines and applications with Apache Spark using Python and Scala. We wanted a tailored solution that simplified operations and delivered faster and more efficiently for our clients, and that’s what we get with the new Amazon Redshift integration for Apache Spark.”

—Huron Consulting

“GE Aerospace uses AWS analytics and Amazon Redshift to enable critical business insights that drive important business decisions. With the support for auto-copy from Amazon S3, we can build simpler data pipelines to move data from Amazon S3 to Amazon Redshift. This accelerates our data product teams’ ability to access data and deliver insights to end-users. We spend more time adding value through data and less time on integrations.”

—GE Aerospace

“Our focus is on providing self-service access to data for all of our users at Goldman Sachs. Through Legend, our open-source data management and governance platform, we enable users to develop data-centric applications and derive data-driven insights as we collaborate across the financial services industry. With the Amazon Redshift integration for Apache Spark, our data platform team will be able to access Amazon Redshift data with minimal manual steps, allowing for zero-code ETL that will increase our ability to make it easier for engineers to focus on perfecting their workflow as they collect complete and timely information. We expect to see a performance improvement of applications and improved security as our users can now easily access the latest data in Amazon Redshift.”

—Goldman Sachs


About the Authors

Gagan Brahmi is a Senior Specialist Solutions Architect focused on big data analytics and AI/ML platform at Amazon Web Services. Gagan has over 18 years of experience in information technology. He helps customers architect and build highly scalable, performant, and secure cloud-based solutions on AWS. In his spare time, he spends time with his family and explores new places.

Vivek Gautam is a Data Architect with specialization in data lakes at AWS Professional Services. He works with enterprise customers building data products, analytics platforms, and solutions on AWS. When not building and designing data lakes, Vivek is a food enthusiast who also likes to explore new travel destinations and go on hikes.

Naresh Gautam is a Data Analytics and AI/ML leader at AWS with 20 years of experience, who enjoys helping customers architect highly available, high-performance, and cost-effective data analytics and AI/ML solutions to empower customers with data-driven decision-making. In his free time, he enjoys meditation and cooking.

Beaux Sharifi is a Software Development Engineer within the Amazon Redshift drivers team, where he leads the development of the Amazon Redshift Integration with Apache Spark connector. He has over 20 years of experience building data-driven platforms across multiple industries. In his spare time, he enjoys spending time with his family and surfing.

Exploring new ETL and ELT capabilities for Amazon Redshift from the AWS Glue Studio visual editor

Post Syndicated from Aniket Jiddigoudar original https://aws.amazon.com/blogs/big-data/exploring-new-etl-and-elt-capabilities-for-amazon-redshift-from-the-aws-glue-studio-visual-editor/

In a modern data architecture, unified analytics enable you to access the data you need, whether it’s stored in a data lake or a data warehouse. In particular, we have observed an increasing number of customers who combine and integrate their data into an Amazon Redshift data warehouse to analyze large volumes of data at scale and run complex queries to achieve their business goals.

One of the most common use cases for data preparation on Amazon Redshift is to ingest and transform data from different data stores into an Amazon Redshift data warehouse. This is commonly achieved via AWS Glue, which is a serverless, scalable data integration service that makes it easier to discover, prepare, move, and integrate data from multiple sources. AWS Glue provides an extensible architecture that supports users with different data processing use cases, and works well with Amazon Redshift. At AWS re:Invent 2022, we announced support for the new Amazon Redshift integration with Apache Spark available in AWS Glue 4.0, which provides enhanced ETL (extract, transform, and load) and ELT capabilities with improved performance.

Today, we are pleased to announce new and enhanced visual job authoring capabilities for Amazon Redshift ETL and ELT workflows on the AWS Glue Studio visual editor. The new authoring experience gives you the ability to:

  • Get started faster with Amazon Redshift by directly browsing Amazon Redshift schemas and tables from the AWS Glue Studio visual interface
  • Author flexibly through native Amazon Redshift SQL support as a source or in custom preactions and postactions
  • Simplify common data loading operations into Amazon Redshift through new support for INSERT, TRUNCATE, DROP, and MERGE commands

With these enhancements, you can use existing transforms and connectors in AWS Glue Studio to quickly create data pipelines for Amazon Redshift. No-code users can complete end-to-end tasks using only the visual interface, SQL users can reuse their existing Amazon Redshift SQL within AWS Glue, and all users can tune their logic with custom actions on the visual editor.

In this post, we explore the new streamlined user interface and dive deeper into how to use these capabilities. To demonstrate these new capabilities, we showcase the following:

  • Passing a custom SQL JOIN statement to Amazon Redshift
  • Using the results to apply an AWS Glue Studio visual transform
  • Performing a MERGE on the results to load them into a destination table

Set up resources with AWS CloudFormation

To demonstrate the AWS Glue Studio visual editor experience with Amazon Redshift, we provide an AWS CloudFormation template for you to set up baseline resources quickly. The template creates the following resources for you:

  • An Amazon VPC, subnets, route tables, an internet gateway, and NAT gateways
  • An Amazon Redshift cluster
  • An AWS Identity and Access Management (IAM) role associated with the Amazon Redshift cluster
  • An IAM role for running the AWS Glue job
  • An Amazon Simple Storage Service (Amazon S3) bucket to be used as a temporary location for Amazon Redshift ETL
  • An AWS Secrets Manager secret that stores the user name and password for the Amazon Redshift cluster

Note that at the time of writing this post, Amazon Redshift MERGE is in preview, and the cluster created is a preview cluster.

To launch the CloudFormation stack, complete the following steps:

  1. On the AWS CloudFormation console, choose Create stack and then choose With new resources (standard).
  2. For Template source, select Upload a template file, and upload the provided template.
  3. Choose Next.
  4. Enter a name for the CloudFormation stack, then choose Next.
  5. Acknowledge that this stack might create IAM resources for you, then choose Submit.
  6. After the CloudFormation stack is successfully created, follow the steps at https://docs.aws.amazon.com/redshift/latest/gsg/rs-gsg-create-sample-db.html to load the sample tickit data into the created Amazon Redshift cluster.

Exploring Amazon Redshift reads

In this section, we go over the new read functionality in the AWS Glue Studio visual editor and demonstrate how we can run a custom SQL statement via the new UI.

  1. On the AWS Glue console, choose ETL jobs in the navigation pane.
  2. Select Visual with a blank canvas, because we’re authoring a job from scratch, then choose Create.
  3. In the blank canvas, choose the plus sign to add an Amazon Redshift node of type Source.

When you close the node selector, you should see an Amazon Redshift source node on the canvas along with the data source properties.

You can choose from two methods of accessing your Amazon Redshift data:

  • Direct data connection – This new method allows you to establish a connection to your Amazon Redshift sources without the need to catalog them
  • Glue Data Catalog tables – This method requires you to have already crawled or generated your Amazon Redshift tables in the AWS Glue Data Catalog

For this post, we use the Direct data connection option.

  1. For Redshift access type, select Direct data connection.
  2. For Redshift connection, choose your AWS Glue Connection redshift-demo-blog-connection created in the CloudFormation stack.

Specifying the connection automatically configures all the network-related details along with the name of the database you wish to connect to.

The UI then presents a choice on how you’d like to access the data from within your selected Amazon Redshift cluster’s database:

  • Choose a single table – This option lets you select a single schema, and a single table from your database. You can browse through all of your available schemas and tables right from the AWS Glue Studio visual editor itself, which makes choosing your source table much easier.
  • Enter a custom query – If you’re looking to perform your ETL on a subset of data from your Amazon Redshift tables, you can author an Amazon Redshift query from the AWS Glue Studio UI. This query will be passed to the connected Amazon Redshift cluster, and the returned query result will be available in downstream transformations on AWS Glue Studio.

For the purposes of this post, we write our own custom query that joins data from the preloaded event table and venue table.

  1. Select Enter a custom query and enter the following query into the query editor:
select venue.venueid from event, venue where event.venueid = venue.venueid and event.starttime between '2008-01-01 14:00:00' and '2008-01-01 15:00:00' and venue.venueseats = 0

The intent of this query is to gather the venueid of locations that have had an event between 2008-01-01 14:00:00 and 2008-01-01 15:00:00 and have had venueseats = 0. If we run a similar query from the Amazon Redshift Query Editor, we can see that there are actually five such venues within that time frame. We wish to merge this data back into Amazon Redshift without including these rows.
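
To make the behavior of this query concrete, the following is a minimal sketch that runs the same join against a tiny in-memory SQLite database. The rows are made up for illustration and are not the tickit sample data, and SQLite stands in for Amazon Redshift only because it runs locally. Note that a venue with two events in the time window appears twice in the result.

```python
import sqlite3

# Hypothetical rows for illustration only; not the tickit sample data.
conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE venue (venueid INTEGER, venuename TEXT, venueseats INTEGER);
    CREATE TABLE event (eventid INTEGER, venueid INTEGER, starttime TEXT);
    INSERT INTO venue VALUES (1, 'Hall A', 0), (2, 'Arena B', 50000), (3, 'Club C', 0);
    INSERT INTO event VALUES
        (10, 1, '2008-01-01 14:00:00'),
        (11, 1, '2008-01-01 14:30:00'),
        (12, 2, '2008-01-01 14:15:00'),
        (13, 3, '2008-01-02 19:00:00');
""")

# Same shape as the custom query entered in the AWS Glue Studio query editor.
query = """
    select venue.venueid
    from event, venue
    where event.venueid = venue.venueid
      and event.starttime between '2008-01-01 14:00:00' and '2008-01-01 15:00:00'
      and venue.venueseats = 0
"""
venue_ids = sorted(row[0] for row in conn.execute(query))
print(venue_ids)  # one entry per matching event, so a venue can repeat
```

Because the join returns one row per matching event, the result can contain the same venueid more than once.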

  1. Choose Infer schema, which allows the AWS Glue Studio visual editor to understand the schema from the returned columns from your query.

You can see the schema on the Output schema tab.

  1. Under Performance and security, for S3 staging directory, choose the S3 temporary directory location created by the CloudFormation stack ( RedshiftS3TempPath ).
  2. For IAM role, choose the IAM role specified by RedshiftIamRoleARN in the CloudFormation stack.

Now we’re going to add a transform to drop duplicate rows from our join result. This will ensure that the MERGE operation in the following steps won’t have conflicting keys when performing the operation.

  1. Choose the Drop Duplicates node to view the node properties.
  2. On the Transform tab, for Drop duplicates, select Match specific keys.
  3. For Keys to match rows, choose venueid.

In this section, we defined the steps to read the output of a custom JOIN query. We then dropped the duplicate records from the returned value. In the next section, we explore the write path on the same job.
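
Conceptually, dropping duplicates on specific keys keeps one row per distinct key value. The following is a minimal sketch of that idea in plain Python. The helper name `drop_duplicates` is hypothetical, and keeping the first row seen is our assumption; the post doesn’t state which duplicate AWS Glue Studio retains.

```python
def drop_duplicates(rows, keys):
    """Keep the first row seen for each combination of key values."""
    seen = set()
    unique = []
    for row in rows:
        marker = tuple(row[k] for k in keys)
        if marker not in seen:
            seen.add(marker)
            unique.append(row)
    return unique


# Hypothetical join output in which venueid 1 appears twice.
joined = [{"venueid": 1}, {"venueid": 1}, {"venueid": 7}]
deduped = drop_duplicates(joined, keys=["venueid"])
print(deduped)
```

After this step, each venueid appears at most once, so a later key-based operation sees no conflicting source rows.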

Exploring Amazon Redshift writes

Now we go over the enhancements for writing to Amazon Redshift as a destination. This section goes over all the simplified options for writing to Amazon Redshift, but highlights the new Amazon Redshift MERGE capabilities for the purposes of this post.

The MERGE operator offers great flexibility for conditionally merging rows from a source into a destination table. MERGE is powerful because it simplifies operations that traditionally were only achievable by using multiple insert, update, or delete statements separately. Within AWS Glue Studio, particularly with the custom MERGE option, you can define a more complex matching condition to handle finding the records to update.

  1. From the canvas page of the job used in the previous section, select Amazon Redshift to add an Amazon Redshift node of type Target.

When you close the selector, you should see your Amazon Redshift target node added on the AWS Glue Studio canvas, along with possible options.

  1. For Redshift access type, select Direct data connection.

Similar to the Amazon Redshift source node, the Direct data connection method allows you to write directly to your Amazon Redshift tables without needing to have them cataloged within the AWS Glue Data Catalog.

  1. For Redshift connection, choose your AWS Glue connection redshift-demo-blog-connection created in the CloudFormation stack.
  2. For Schema, choose public.
  3. For Table, choose the venue table as the destination Amazon Redshift table where we will store the merged data.
  4. Choose MERGE data into target table.

This selection provides the user with two options:

  • Choose keys and simple actions – This is a user-friendly version of the MERGE operation. You simply specify the matching keys, and choose what happens to the rows that match the key (update them or delete them) or don’t have any matches (insert them).
  • Enter custom MERGE statement – This option provides the most flexibility. You can enter your own custom logic for MERGE.

For this post, we use the simple actions method for performing a MERGE operation.

  1. For Handling of data and target table, select MERGE data into target table, and then select Choose keys and simple actions.
  2. For Matching Keys, select venueid.

This field becomes our MERGE condition for checking keys.

  1. For When matched, select Delete record in the table.
  2. For When not matched, select Insert source data as a new row into the table.

With these selections, we’ve configured the AWS Glue job to run a MERGE statement on Amazon Redshift while inserting our data. Moreover, for performing this MERGE operation, we use venueid as the key (you can select multiple keys). If there is a key match with the destination table’s record, we delete that record. Otherwise, we insert the record into the destination table.
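
The effect of these two choices can be sketched in plain Python. This is only an in-memory illustration of the "delete when matched, insert when not matched" semantics, not the statement that AWS Glue Studio generates. The function name and sample rows are hypothetical.

```python
def merge_delete_or_insert(target, source, key):
    """Apply 'when matched: delete, when not matched: insert' to in-memory rows."""
    source_keys = {row[key] for row in source}
    target_keys = {row[key] for row in target}
    # Target rows whose key also appears in the source are deleted.
    result = [row for row in target if row[key] not in source_keys]
    # Source rows whose key is absent from the target are inserted.
    result += [row for row in source if row[key] not in target_keys]
    return result


# Hypothetical destination rows and incoming (deduplicated) source rows.
venue = [{"venueid": 1, "venueseats": 0}, {"venueid": 2, "venueseats": 50000}]
incoming = [{"venueid": 1}, {"venueid": 9}]
merged = merge_delete_or_insert(venue, incoming, key="venueid")
print(merged)
```

In the walkthrough, every venueid returned by the join already exists in the venue table, so only the delete branch takes effect.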

  1. Navigate to the Job details tab.
  2. For Name, enter a name for the job.
  3. For the IAM Role drop down, select the RedshiftIamRole role that was created via the CloudFormation template.
  4. Choose Save.

  5. Choose Run and wait for the job to finish.

You can track its progress on the Runs tab.

  1. After the run reaches a successful state, navigate back to the Amazon Redshift Query Editor.
  2. Run the same query again to confirm that those rows have been deleted in accordance with our MERGE specifications.

In this section, we configured an Amazon Redshift target node to write a MERGE statement to conditionally update records in our destination Amazon Redshift table. We then saved and ran the AWS Glue job, and saw the effect of the MERGE statement on our destination Amazon Redshift table.

Other available write options

In addition to MERGE, the AWS Glue Studio visual editor’s Amazon Redshift destination node also supports a number of other common operations:

  • APPEND – Appending to your target table performs an insert into the selected table without updating any of the existing records (if there are duplicates, both records will be retained). In cases where you want to update existing rows in addition to adding new rows (often referred to as an UPSERT operation), you can select the Also update existing records in target table option. Note that both APPEND only and UPSERT (APPEND with UPDATE) are a simpler subset of the MERGE functionality discussed earlier.
  • TRUNCATE – The TRUNCATE option clears all the data in the existing table but retains all the existing table schema, followed by an APPEND of all new data to the empty table. This option is often used when the full dataset needs to be refreshed and downstream services or tools depend on the table schema being consistent. For example, every night an Amazon Redshift table needs to be fully updated with the latest customer information that will be consumed by an Amazon QuickSight dashboard. In this case, the ETL developer would choose TRUNCATE to ensure the data is fully refreshed but the table schema is guaranteed not to change.
  • DROP – This option is used when the full dataset needs to be refreshed and the downstream services or tools that depend on the table can handle possible schema changes without breaking.
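
The differences between these options can be sketched with in-memory rows. The following is an illustration of the row-level semantics only; the function names and sample data are hypothetical, and DROP is omitted because this sketch doesn’t model the table schema.

```python
def append(target, new_rows):
    """APPEND: insert every new row; duplicates are retained."""
    return target + new_rows


def upsert(target, new_rows, key):
    """APPEND with update: replace rows whose key matches, insert the rest.

    Assumes the key is unique within the target.
    """
    by_key = {row[key]: row for row in target}
    for row in new_rows:
        by_key[row[key]] = row
    return list(by_key.values())


def truncate_and_load(target, new_rows):
    """TRUNCATE: discard the existing rows, then load only the new rows."""
    return list(new_rows)


target = [{"venueid": 1, "venuename": "old"}, {"venueid": 2, "venuename": "b"}]
new_rows = [{"venueid": 1, "venuename": "new"}, {"venueid": 3, "venuename": "c"}]

appended = append(target, new_rows)             # 4 rows, venueid 1 appears twice
upserted = upsert(target, new_rows, "venueid")  # 3 rows, venueid 1 updated
refreshed = truncate_and_load(target, new_rows) # 2 rows, only the new data
```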

How write operations are being handled on the backend

The Amazon Redshift connector supports two parameters called preactions and postactions. These parameters allow you to run SQL statements that will be passed on to the Amazon Redshift data warehouse before and after the actual write operation is carried out by Spark.

On the Script tab on the AWS Glue Studio page, we can see what SQL statements are being run.
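
The ordering of preactions, the write, and postactions can be sketched locally. The following minimal example uses SQLite in place of Amazon Redshift and a hypothetical staging table named venue_stage. The SQL statements are our own illustration and are not the statements that AWS Glue Studio generates; check the Script tab for those.

```python
import sqlite3


def write_with_actions(conn, preactions, rows, postactions):
    """Run preactions, then the write, then postactions, in that order."""
    conn.executescript(preactions)   # SQL run before the write
    conn.executemany("INSERT INTO venue_stage VALUES (?)", rows)  # the write itself
    conn.executescript(postactions)  # SQL run after the write


conn = sqlite3.connect(":memory:")
conn.executescript(
    "CREATE TABLE venue (venueid INTEGER);"
    "INSERT INTO venue VALUES (1), (2), (3);"
)

write_with_actions(
    conn,
    preactions="CREATE TABLE venue_stage (venueid INTEGER);",
    rows=[(1,), (3,)],
    postactions="""
        DELETE FROM venue WHERE venueid IN (SELECT venueid FROM venue_stage);
        DROP TABLE venue_stage;
    """,
)
remaining = [row[0] for row in conn.execute("SELECT venueid FROM venue ORDER BY venueid")]
print(remaining)
```

The point of the sketch is the sequence: setup SQL runs first, the data is written, and follow-up SQL then acts on what was written.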

Use a custom implementation for writing data into Amazon Redshift

In the event that the provided presets require more customization, or your use case requires more advanced implementations for writing to Amazon Redshift, AWS Glue Studio also allows you to freely select which preactions and postactions can be run when writing to Amazon Redshift.

To show an example, we create an Amazon Redshift datashare as a preaction, then perform the cleaning up of the same datashare as a postaction via AWS Glue Studio.

NOTE: This section is not run as part of the preceding walkthrough and is provided only as an example.

  1. Choose the Amazon Redshift data target node.
  2. On the Data target properties tab, expand the Custom Redshift parameters section.
  3. For the parameters, add the following:
    1. Parameter: preactions with Value BEGIN; CREATE DATASHARE ds1; END
    2. Parameter: postactions with Value BEGIN; DROP DATASHARE ds1; END

As you can see, we can specify multiple Amazon Redshift statements as a part of both the preactions and postactions parameters. Remember that these statements will override any existing preactions or postactions with your specified actions (as you can see in the following generated code).
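
If you assemble these values programmatically, a small helper can join individual statements into the semicolon-separated form shown above. This is a sketch only: `as_action` and `custom_params` are hypothetical names, and the dictionary merely mirrors the two parameters entered in the visual editor.

```python
def as_action(statements):
    """Join SQL statements into one semicolon-separated parameter value."""
    return "; ".join(s.strip().rstrip(";") for s in statements)


# Mirrors the two custom parameters entered on the Data target properties tab.
custom_params = {
    "preactions": as_action(["BEGIN", "CREATE DATASHARE ds1", "END"]),
    "postactions": as_action(["BEGIN", "DROP DATASHARE ds1", "END"]),
}
print(custom_params["preactions"])
print(custom_params["postactions"])
```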

Cleanup

To avoid additional costs, make sure to delete any unnecessary resources and files:

  • Empty and delete the contents from the S3 temporary bucket
  • If you deployed the sample CloudFormation stack, delete the CloudFormation stack via the AWS CloudFormation console. Make sure to empty the S3 bucket before you delete the bucket.

Conclusion

In this post, we went over the new AWS Glue Studio visual options for reading from and writing to Amazon Redshift. We also saw the simplicity with which you can browse your Amazon Redshift tables right from the AWS Glue Studio visual editor UI, and how to run your own custom SQL statements against your Amazon Redshift sources. We then explored how to perform simple ETL loading tasks against Amazon Redshift with just a few clicks, and showcased the new Amazon Redshift MERGE statement.

To dive deeper into the new Amazon Redshift integrations for the AWS Glue Studio visual editor, check out Connecting to Redshift in AWS Glue Studio.


About the Authors

Aniket Jiddigoudar is a Big Data Architect on the AWS Glue team. He works with customers to help improve their big data workloads. In his spare time, he enjoys trying out new food, playing video games, and kickboxing.

Sean Ma is a Principal Product Manager on the AWS Glue team. He has an 18+ year track record of innovating and delivering enterprise products that unlock the power of data for users. Outside of work, Sean enjoys scuba diving and college football.

Get maximum value out of your cloud data warehouse with Amazon Redshift

Post Syndicated from Sana Ahmed original https://aws.amazon.com/blogs/big-data/get-maximum-value-out-of-your-cloud-data-warehouse-with-amazon-redshift/

Every day, customers are challenged with how to manage their growing data volumes and operational costs to unlock the value of data for timely insights and innovation, while maintaining consistent performance. Data creation, consumption, and storage are predicted to grow to 175 zettabytes by 2025, according to the 2022 IDC Global DataSphere report.

As data workloads grow, costs to scale and manage data usage with the right governance typically increase as well. So how do organizational leaders drive their business forward with high performance, controlled costs, and high security? With the right analytics approach, this is possible.

In this post, we look at three key challenges that customers face with growing data and how a modern data warehouse and analytics system like Amazon Redshift can meet these challenges across industries and segments.

Building an optimal data system

As data grows at an extraordinary rate, data proliferation across your data stores, data warehouse, and data lakes can become a challenge. Different departments within an organization can place data in a data lake or within their data warehouse depending on the type of data and usage patterns of that department. Teams may place their unstructured data like social media feeds within their Amazon Simple Storage Service (Amazon S3) data lake and historical structured data within their Amazon Redshift data warehouse. Teams need access to both the data lake and the data warehouse to work seamlessly for best insights, requiring an optimal data infrastructure that can scale almost infinitely to accommodate a growing number of concurrent data users without impacting performance—all while keeping costs under control.

A quintessential example of a company managing analytics on billions of data points across the data lake and the warehouse in a mission-critical business environment is Nasdaq, an American stock exchange. Within 2 years of migration to Amazon Redshift, Nasdaq was managing 30–70 billion records, growing daily, worth over 4 terabytes.

With Amazon Redshift, Nasdaq was able to query their warehouse and use Amazon Redshift Spectrum, a capability to query the data quickly in place without data loading, from their S3 data lakes. Nasdaq minimized time to insights with the ability to query 15 terabytes of data on Amazon S3 immediately without any extra data loading after writing data to Amazon S3. This performance innovation allows Nasdaq to have a multi-use data lake between teams.

Robert Hunt, Vice President of Software Engineering for Nasdaq, shared, “We have to both load and consume the 30 billion records in a time period between market close and the following morning. Data loading delayed the delivery of our reports. We needed to be able to write or load data into our data storage solution very quickly without interfering with the reading and querying of the data at the same time.”

Nasdaq’s massive data growth meant they needed to evolve their data architecture to keep up. They built their foundation of a new data lake on Amazon S3 so they could deliver analytics using Amazon Redshift as a compute layer. Nasdaq’s peak volume of daily data ingestion reached 113 billion records, and they completed data loading for reporting 5 hours faster while running 32% faster queries.

Enabling newer personas with data warehousing and analytics

Another challenge is enabling newer data users and personas with powerful analytics to meet business goals and perform critical decision-making. Where traditionally it was the data engineer and the database administrator who set up and managed the warehouse, today line-of-business data analysts, data scientists, and developers are all using the data warehouse to get to near-real-time business decision-making.

These personas, who don’t have specialized data management or data engineering skills, don’t want to be concerned with managing the capacity of their analytics systems to handle unpredictable or spiky data workloads, or wait for IT to optimize for cost and capacity. Customers want to get started with analytics on large amounts of data instantly and scale analytics quickly and cost-effectively without infrastructure management.

Take the case of mobile gaming company Playrix. They were able to use Amazon Redshift Serverless to serve their key stakeholders with dashboards with financial data for quick decision-making.

Igor Ivanov, Technical Director of Playrix, stated, “Amazon Redshift Serverless is great for achieving the on-demand high performance that we need for massive queries.”

Playrix had a two-fold business goal, including marketing to its end-users (game players) with near-real-time data while also analyzing their historical data for the past 4–5 years. In seeking a solution, Playrix wanted to avoid disrupting other technical processes while also increasing cost savings. The company migrated to Redshift Serverless and scaled up to handle more complicated analytics on 600 TB from the past 5 years, all without storing two copies of the data or disrupting other analytics jobs. With Redshift Serverless, Playrix achieved a more flexible architecture and saved an overall 20% in costs of its marketing stack, decreasing its cost of customer acquisition.

“With no overhead and infrastructure management,” Ivanov shared, “we now have more time for experimenting, developing solutions, and planning new research.”

Breaking down data silos

Organizations need to easily access and analyze diverse types of structured and unstructured data, including log files, clickstreams, voice, and video. However, these wide-ranging data types are typically stored in silos across multiple data stores. To unlock the true potential of the data, organizations must break down these silos to unify and normalize all types of data and ensure that the right people have access to the right data.

Data unification can get expensive fast, with time and cost spent on building complex, custom extract, transform, load (ETL) pipelines that move or copy data from system to system. If not done right, you can end up with data latency issues, inaccuracies, and potential security and data governance risks. Instead, teams are looking for ways to share transactionally consistent, live, first-party and third-party data with each other or their end customers, without data movement or data copying.

Stripe, a payment processing platform for businesses, is an Amazon Redshift customer and a partner with thousands of end customers who require access to Stripe data for their applications. Stripe built the Stripe Data Pipeline, a solution for Stripe customers to access Stripe datasets within their Amazon Redshift data warehouses, without having to build, maintain, or scale custom ETL jobs. The Stripe Data Pipeline is powered by the data sharing capability of Amazon Redshift. Customers get a single source of truth, with low-latency data access, to speed up financial close and get better insights, analyzing best-performing payment methods, fraud by location, and more. Cutting down data engineering time and effort to access unified data creates new business opportunities from comprehensive insights and saves costs.

A modern data architecture with Amazon Redshift

These stories about harnessing maximum value from siloed data across the organization and applying powerful analytics for business insights in a cost-efficient way are possible because of AWS’s approach to a modern data architecture for its customers. Within this architecture, AWS’s data warehousing solution Amazon Redshift is a fully managed petabyte-scale system, deeply integrated with AWS database, analytics, and machine learning (ML) services. Tens of thousands of customers use Amazon Redshift every day to run data warehousing and analytics in the cloud and process exabytes of data for business insights. Customers looking for a highly performing, cost-optimized cloud data warehouse solution choose Amazon Redshift for the following reasons:

  • Its leadership in price-performance
  • The ability to break through data silos for meaningful insights
  • Easy analytics capabilities that cut down data engineering and administrative requirements
  • Security and reliability features that are offered out of the box, at no additional cost

Price-performance, as a cloud data warehouse benchmark metric, is simply defined as the cost to perform a particular workload. Knowing how much your data warehouse is going to cost and how performance changes as your user base and data processing increase is crucial for planning, budgeting, and decision-making around choosing the best data warehouse.
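
As a simple arithmetic illustration of this definition, the cost of a workload can be computed as the hourly rate multiplied by the time the workload takes. The rates and runtimes below are hypothetical and don’t describe any real warehouse or pricing.

```python
def workload_cost(hourly_rate_usd, runtime_hours):
    """Cost to perform one workload: price per hour times hours to finish."""
    return hourly_rate_usd * runtime_hours


# Hypothetical numbers only: warehouse A is cheaper per hour but slower.
cost_a = workload_cost(hourly_rate_usd=10.0, runtime_hours=2.0)
cost_b = workload_cost(hourly_rate_usd=16.0, runtime_hours=1.0)
print(cost_a, cost_b)
```

In this made-up case, the warehouse with the higher hourly rate finishes the workload at a lower total cost, which is why the metric combines price and performance rather than comparing either one alone.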

Amazon Redshift is able to attain the best price-performance for customers (up to five times better than other cloud data warehouses) through code optimized for AWS hardware, high-performance and power-efficient compute hardware, new compression and caching algorithms, and autonomics (ML-based optimizations) within the warehouse that abstract the administrative activities away from the user, saving time and improving performance. Flexible pricing options such as pay-as-you-go with Redshift Serverless, separation of storage and compute scaling, and 1–3-year compute reservations with heavy discounts keep prices low.

The native integrations in Amazon Redshift with databases, data lakes, streaming data services, and ML services employ zero-ETL approaches that help you access data in place without data movement and easily ingest data into the warehouse without building complex pipelines. This keeps data engineering costs low and expands analytics for more users.

For example, the integration in Amazon Redshift with Amazon SageMaker allows data analysts to stay within the data warehouse and create, train, and build ML models in SQL with no need for ETL jobs or learning new languages for ML (see Jobcase Scales ML Workflows to Support Billions of Daily Predictions Using Amazon Redshift ML for an example). Every week, over 80 billion predictions happen in the warehouse with Amazon Redshift ML.

Finally, customers don’t have to pay more to secure their critical data assets. Security features offer comprehensive identity management with data encryption, granular access controls at row and column level, and data masking abilities to protect sensitive data and authorizations for the right users or groups. These features are available out of the box, within the standard pricing model.

Conclusion

Overall, customers who choose Amazon Redshift innovate in a new reality where the data warehouse scales up and down automatically as workloads change, and maximizes the value of data for all cornerstones of their business.

Market leaders like Nasdaq are able to ingest billions of data points daily for trading and selling at high volume and velocity, all in time for proper billing and trading the following business day. For customers like Playrix, choosing Redshift Serverless means marketing to customers with comprehensive analytics in near-real time without getting bogged down by maintenance and overhead. For Stripe, it also means taking the complexity and TCO out of ETL, removing silos and unifying data.

Although data will continue to grow at unprecedented rates, your bottom line doesn’t need to suffer. While organizational leaders face the pressures of solving for cost optimization in all types of economic environments, Amazon Redshift gives market leaders a space to innovate without compromising the data value, performance, or budget of their cloud data warehouse.

Learn more about maximizing the value of your data with a modern data warehouse like Amazon Redshift. For more information about the price-performance leadership of Amazon Redshift and to review benchmarks against other vendors, see Amazon Redshift continues its price-performance leadership. Additionally, you can optimize costs using a variety of performance and cost levers, including Amazon Redshift’s flexible pricing models, which cover pay-as-you-go pricing for variable workloads, free trials, and reservations for steady state workloads.


About the authors

Sana Ahmed is a Sr. Product Marketing Manager for Amazon Redshift. She is passionate about people, products, and problem-solving with product marketing. As a Product Marketer, she has taken 50+ products to market and worked at various companies including Sprinklr, PayPal, and Facebook. Her hobbies include tennis, museum-hopping, and fun conversations with friends and family.

Sunaina AbdulSalah leads product marketing for Amazon Redshift. She focuses on educating customers about the impact of data warehousing and analytics and sharing AWS customer stories. She has a deep background in marketing and GTM functions in the B2B technology and cloud computing domains. Outside of work, she spends time with her family and friends and enjoys traveling.

Automate discovery of data relationships using ML and Amazon Neptune graph technology

Post Syndicated from Moira Lennox original https://aws.amazon.com/blogs/big-data/automate-discovery-of-data-relationships-using-ml-and-amazon-neptune-graph-technology/

Data mesh is a new approach to data management. Companies across industries are using a data mesh to decentralize data management to improve data agility and get value from data. However, when a data producer shares data products on a data mesh self-serve web portal, it’s neither intuitive nor easy for a data consumer to know which data products they can join to create new insights. This is especially true in a large enterprise with thousands of data products.

This post shows how to use machine learning (ML) and Amazon Neptune to create automated recommendations to join data products and display those recommendations alongside the existing data products. This allows data consumers to easily identify new datasets and provides agility and innovation without spending hours doing analysis and research.

Background

A successful data-driven organization recognizes data as a key enabler to increase and sustain innovation. It follows what is called a distributed system architecture. The goal of a data product is to solve the long-standing issue of data silos and data quality. Independent data products often only have value if you can connect them, join them, and correlate them to create a higher order data product that creates additional insights. A modern data architecture is critical in order to become a data-driven organization. It allows stakeholders to manage and work with data products across the organization, enhancing the pace and scale of innovation.

Solution overview

A data mesh architecture decouples the data infrastructure from the application infrastructure, which addresses a common challenge in traditional data architectures. It focuses on decentralized ownership, domain design, data products, and self-serve data infrastructure. This allows for a new way of thinking and new organizational elements, namely a modern data community.

However, today’s data mesh platform contains largely independent data products. Even with well-documented data products, knowing how to connect or join data products is a time-consuming job. Data consumers spend hours, days, or months to understand and analyze the data. Identifying links or relationships between data products is critical to create value from the data mesh and enable a data-driven organization.

The solution in this post illustrates an approach to solving these challenges. It uses a fictional insurance company with several data products shared on their data mesh marketplace. The following figure shows the sample data products used in our solution.

Suppose a consumer is browsing the customer data product in the data mesh marketplace. The consumer wonders if the customer data could be linked to claim, policy, or encounter data. Because these data products come from different lines of business (LOBs) or silos, it’s hard to know. A consumer would have to review each data product and do the necessary analysis and research to know this with any certainty.

To solve this problem, our solution uses ML and Neptune to create recommendations for the data consumer. The solution generates a list of data products, product attributes, and the associated probability scores to show joinability. This reduces the time to discover, analyze, and create new insights.

We use Valentine, a data science algorithm for comparing datasets, to improve data product recommendations. Neptune, the managed AWS graph database service, stores information about explicit connections between datasets, improving the recommendations.

Example use case

Let’s walk through a concrete example. Suppose a consumer is browsing the Customer data product in the data mesh marketplace. Customer is similar to the Policy and Encounter data products, but these products come from different silos. Their similarity to Customer is hard to gauge. To expedite the consumer’s work, the mesh recommends how the Policy and Encounter products can be connected to the Customer product.

Let’s consider two cases. First, is Customer similar to Claim? The following is a sample of the data in each product.

Intuitively, these two products have lots of overlap. Every Cust_Nbr in Claim has a corresponding Customer_ID in Customer. There is no foreign key constraint in Claim that assures us it points to Customer. We think there is enough similarity to infer a join relationship.

The data science algorithm Valentine is an effective tool for this. Valentine is presented in the paper Valentine: Evaluating Matching Techniques for Dataset Discovery (2021, Koutras et al.). Valentine determines if two datasets are joinable or unionable. We focus on the former. Two datasets are joinable if a record from one dataset has a link to a record in the other dataset using one or more columns. Valentine addresses the use case where data is messy: there is no foreign key constraint in place, and data doesn’t match perfectly between datasets. Valentine looks for similarities, and its findings are probabilistic. It scores its proposed matches.

This solution uses an implementation of Valentine available in the following GitHub repo. The first step is to load each data product from its source into a Pandas data frame. If the data is large, load a representative subset of it, at most a few million records. Pass the frames to the valentine_match() function and select the matching method. We use COMA, one of several methods that Valentine supports. The function’s result indicates the similarity of columns and the score. In this case, it tells us that the Customer_ID for Customer matches the Cust_Nbr for Claim, with a very high score. We then instruct the data mesh to recommend Claim to the consumer browsing Customer.
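To make the idea of a scored column match concrete, the following is a minimal sketch that uses only the Python standard library. It is not the Valentine library or the COMA method; it substitutes a much simpler value-containment score, and the sample rows, the Zip and Claim_Amt columns, and the function names are hypothetical.

```python
from itertools import product


def containment(left_values, right_values):
    """Fraction of distinct left values that also appear among the right values."""
    left, right = set(left_values), set(right_values)
    if not left:
        return 0.0
    return len(left & right) / len(left)


def rank_column_pairs(left_table, right_table):
    """Score every (left column, right column) pair, highest score first.

    Each table is a dict mapping a column name to its list of values.
    The score is the share of distinct right-column values found in the left column.
    """
    scores = [
        ((lcol, rcol), containment(right_table[rcol], left_table[lcol]))
        for lcol, rcol in product(left_table, right_table)
    ]
    return sorted(scores, key=lambda item: item[1], reverse=True)


# Hypothetical sample rows, not the data from the post.
customer = {
    "Customer_ID": [1, 2, 3, 4, 8],
    "Zip": [10001, 10002, 10003, 10004, 10005],
}
claim = {
    "Cust_Nbr": [1, 4, 4, 8],
    "Claim_Amt": [250, 900, 120, 40],
}

best_pair, best_score = rank_column_pairs(customer, claim)[0]
print(best_pair, best_score)
```

In this toy data, every Cust_Nbr value appears as a Customer_ID, so that pair ranks first. A real matcher such as COMA also weighs column names, data types, and value distributions, which is why its scores are more robust on messy data.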

A graph database isn’t required to recommend Claim; the two products could be directly compared. But let’s consider Encounter. Is Customer similar to Encounter? This case is more complicated. Many encounters in the Encounter product don’t link to a customer. An encounter occurs when someone contacts the contact center, which could be by phone or email. The party may or may not be a customer, and if they are a customer, we may not know their customer ID during this encounter. Additionally, sometimes the phone or email they use isn’t the same as the one from a customer record in the Customer product.

In the following sample encounter set, encounters 1 and 2 match to Customer_ID 4. Note that encounter 2’s inbound_email doesn’t exactly match the inbound_email in that customer’s record in the Customer product. Encounter 3 has no Customer_ID, but its inbound_email matches the customer with ID 8. Encounter 4 appears to refer to the customer with ID 8, but the email doesn’t match, and no Customer_ID is given. Encounter 5 only has Inbound_Phone, but that matches the customer with ID 1. Encounter 6 only has an Inbound_Phone, and it doesn’t appear to match any of the customers we’ve listed so far.

We don’t have a strong enough comparison to infer similarity.

But we know more about the customer than the Customer product tells us. In the Neptune database, we maintain a knowledge graph that combines multiple products and links them through relationships. A knowledge graph allows us to combine data from different sources to gain a better understanding of a specific problem domain. In Neptune, we combine the Customer product data with an additional data product: Sales Opportunity. We ingest each product from its source into the knowledge graph and model a hasSalesOpportunity relationship between Customer and SalesOpportunity resources. The following figure shows these resources, their attributes, and their relationship.

With the AWS SDK for Pandas, we combine this data by running a query against the Neptune graph. We use a graph query language (such as SPARQL) to wrangle a representative subset of customer and sales opportunity data into a Pandas data frame (shown as Enhanced Customer View in the following figure). In the following example, we enhance customers 7 and 8 with alternate phone or email contact data from sales opportunities.

We pass that frame to Valentine and compare it to Encounter. This time, two additional encounters match a customer.
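The effect of the enhanced view can be sketched with plain Python. This is a simplified illustration under stated assumptions: it uses exact matching on normalized contact values rather than Valentine's probabilistic matching, and the records, field names, and helper functions are all hypothetical.

```python
def normalize(value):
    """Lowercase and strip a contact value so trivial differences do not block a match."""
    return value.strip().lower() if value else None


def build_contact_index(customers, opportunities=()):
    """Map each known email or phone to a customer ID.

    Passing `opportunities` adds alternate contacts, mimicking the enhanced customer view.
    """
    index = {}
    for record in list(customers) + list(opportunities):
        for field in ("email", "phone"):
            key = normalize(record.get(field))
            if key:
                index.setdefault(key, record["customer_id"])
    return index


def match_encounters(encounters, index):
    """Return {encounter_id: customer_id} for encounters whose contact is known."""
    matches = {}
    for enc in encounters:
        for field in ("inbound_email", "inbound_phone"):
            key = normalize(enc.get(field))
            if key in index:
                matches[enc["encounter_id"]] = index[key]
                break
    return matches


# Hypothetical records for illustration only.
customers = [
    {"customer_id": 1, "email": "ann@example.com", "phone": "555-0101"},
    {"customer_id": 8, "email": "bo@example.com", "phone": None},
]
opportunities = [
    {"customer_id": 8, "email": "bo.alt@example.com", "phone": "555-0188"},
]
encounters = [
    {"encounter_id": 3, "inbound_email": "Bo@Example.com", "inbound_phone": None},
    {"encounter_id": 4, "inbound_email": "bo.alt@example.com", "inbound_phone": None},
    {"encounter_id": 6, "inbound_email": None, "inbound_phone": "555-0188"},
]

base = match_encounters(encounters, build_contact_index(customers))
enhanced = match_encounters(encounters, build_contact_index(customers, opportunities))
print(len(base), len(enhanced))
```

With the Customer data alone, only one of these hypothetical encounters links to a customer. Adding the alternate contacts from sales opportunities links two more.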

The score meets our threshold and is high enough to share with the consumer as a possible match. To the consumer browsing Customer in the mesh marketplace, we present the recommendation of Encounter, along with scoring details to support the recommendation. With this recommendation, the consumer can explore the Encounter product with greater confidence.

Conclusion

Data-driven organizations are transitioning to a data product way of thinking. Utilizing strategies like data mesh generates value on a large scale. We took this a step further by creating a blueprint to create smart recommendations by linking similar data products using graph technology and ML. In this post, we showed how an organization can augment a data catalog with additional metadata by using ML and Neptune with an automated process.

This solution solves the interoperability and linkage problem for data products. Additionally, it gives organizations real-time insights, agility, and innovation without spending time on data analysis and research. This approach creates a truly connected ecosystem with simplified access to delight your data consumers. The current solution is platform agnostic; however, in a future post we will show how to implement this using data.all (open-source software) and Amazon DataZone.

To learn more about ML in Neptune, refer to Amazon Neptune ML for machine learning on graphs. You can also explore Neptune notebooks demonstrating ML and data science for graphs. For more information about the data mesh architecture, refer to Design a data mesh architecture using AWS Lake Formation and AWS Glue. You can also learn more about Amazon DataZone and how you can share, search, and discover data at scale across organizational boundaries.


About the Authors


Moira Lennox is a Senior Data Strategy Technical Specialist for AWS with 27 years’ experience helping companies innovate and modernize their data strategies to achieve new heights and allow for strategic decision-making. She has experience working in large enterprises and technology providers, in both business and technical roles across multiple industries, including healthcare and life sciences, financial services, communications, digital entertainment, energy, and manufacturing.

Joel Farvault is Principal Specialist SA Analytics for AWS with 25 years’ experience working on enterprise architecture, data strategy, and analytics, mainly in the financial services industry. Joel has led data transformation projects on fraud analytics, claims automation, and data governance.

Mike Havey is a Solutions Architect for AWS with over 25 years of experience building enterprise applications. Mike is the author of two books and numerous articles. His Amazon author page

Configure SAML federation for Amazon OpenSearch Serverless with AWS IAM Identity Center

Post Syndicated from Utkarsh Agarwal original https://aws.amazon.com/blogs/big-data/configure-saml-federation-for-amazon-opensearch-serverless-with-aws-iam-identity-center/

Amazon OpenSearch Serverless is a serverless option of Amazon OpenSearch Service that makes it easy for you to run large-scale search and analytics workloads without having to configure, manage, or scale OpenSearch clusters. It automatically provisions and scales the underlying resources to deliver fast data ingestion and query responses for even the most demanding and unpredictable workloads. With OpenSearch Serverless, you can configure SAML to enable users to access data through OpenSearch Dashboards using an external SAML identity provider (IdP).

AWS IAM Identity Center (Successor to AWS Single Sign-On) helps you securely create or connect your workforce identities and manage their access centrally across AWS accounts and applications, OpenSearch Dashboards being one of them.

In this post, we show you how to configure SAML authentication for OpenSearch Dashboards using IAM Identity Center as its IdP.

Solution overview

The following diagram illustrates how the solution allows users or groups to authenticate into OpenSearch Dashboards using single sign-on (SSO) with IAM Identity Center using its built-in directory as the identity source.

The workflow steps are as follows:

  1. A user accesses the OpenSearch Dashboard URL in their browser and chooses the SAML provider.
  2. OpenSearch Serverless redirects the login to the specified IdP.
  3. The IdP provides a login form for the user to specify the credentials for authentication.
  4. After the user is authenticated successfully, a SAML assertion is sent back to OpenSearch Serverless.

  5. OpenSearch Serverless validates the SAML assertion, and the user logs in to OpenSearch Dashboards.

Prerequisites

To get started, you must have an active OpenSearch Serverless collection. Refer to Creating and managing Amazon OpenSearch Serverless collections to learn more about creating a collection. Furthermore, you must have the correct AWS Identity and Access Management (IAM) permissions for configuring SAML authentication along with relevant IAM permissions for configuring the data access policy.

IAM Identity Center should be enabled, and you should have the relevant IAM permissions to create an application in IAM Identity Center and create and manage users and groups.

Create and configure the application in IAM Identity Center

To set up your application in IAM Identity Center, complete the following steps:

  1. On the IAM Identity Center dashboard, choose Applications in the navigation pane.
  2. Choose Add application.
  3. For Custom application, select Add custom SAML 2.0 application.
  4. Choose Next.
  5. Under Configure application, enter a name and description for the application.
  6. Under IAM Identity Center metadata, choose Download under IAM Identity Center SAML metadata file.

We use this metadata file to create a SAML provider under OpenSearch Serverless. It contains the public certificate used to verify the signature of the IAM Identity Center SAML assertions.

  1. Under Application properties, leave Application start URL and Relay state blank.
  2. For Session duration, choose 1 hour (the default value).

Note that the session duration you configure in this step takes precedence over the OpenSearch Dashboards timeout setting specified in the configuration of the SAML provider details on the OpenSearch Serverless end.

  1. Under Application metadata, select Manually type your metadata values.
  2. For Application ACS URL, enter your URL using the format https://collection.<REGION>.aoss.amazonaws.com/_saml/acs. For example, we enter https://collection.us-east-1.aoss.amazonaws.com/_saml/acs for this post.
  3. For Application SAML audience, enter your service provider in the format aws:opensearch:<aws account id>.
  4. Choose Submit.
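If you set up several Regions or accounts, it can help to generate these two values rather than type them. The following sketch simply applies the two formats given in the preceding steps; the function names are hypothetical and the account ID is a placeholder.

```python
def acs_url(region):
    """Application ACS URL in the format used in the steps above."""
    return f"https://collection.{region}.aoss.amazonaws.com/_saml/acs"


def saml_audience(account_id):
    """Application SAML audience in the format aws:opensearch:<aws account id>."""
    return f"aws:opensearch:{account_id}"


# Placeholder values for illustration.
print(acs_url("us-east-1"))
print(saml_audience("123456789012"))
```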

Now you modify the attribute settings. The attribute mappings you configure here become part of the SAML assertion that is sent to the application.

  1. On the Actions menu, choose Edit attribute mappings.
  2. Configure Subject to map to ${user:email}, with the format unspecified.

Using ${user:email} here ensures that the email address for the user in IAM Identity Center is passed in the <NameId> tag of the SAML response.

  1. Choose Save changes.

Now we assign a user to the application.

  1. Create a user in IAM Identity Center to use to log in to OpenSearch Dashboards.

Alternatively, you can use an existing user.

  1. On the IAM Identity Center console, navigate to your application and choose Assign Users and select the user(s) you would like to assign.

You have now created a custom SAML application. Next, you will configure the SAML provider in OpenSearch Serverless.

Create a SAML provider

The SAML provider you create in this step can be assigned to any collection in the same Region. Complete the following steps:

  1. On the OpenSearch Service console, under Serverless in the navigation pane, choose SAML authentication under Security.
  2. Choose Create SAML provider.
  3. Enter a name and description for your SAML provider.
  4. Enter the metadata from your IdP that you downloaded earlier.
  5. Under Additional settings, you can optionally add custom user ID and group attributes. We leave these settings blank for now.
  6. Choose Create a SAML provider.

You have now configured a SAML provider for OpenSearch Serverless. Next, we walk you through configuring the data access policy for accessing collections.

Create the data access policy

In this section, you set up data access policies for OpenSearch Serverless and allow access to the users. Complete the following steps:

  1. On the OpenSearch Service console, under Serverless in the navigation pane, choose Data access policies under Security.
  2. Choose Create access policy.
  3. Enter a name and description for your access policy.
  4. For Policy definition method, select Visual Editor.
  5. In the Rules section, enter a rule name.
  6. Under Select principals, for Add principals, choose SAML users and groups.
  7. For SAML provider name, choose the SAML provider you created earlier.
  8. Specify the user in the format user/<email> (for example, user/[email protected]).

The value of the email address should match the email address in IAM Identity Center.

  1. Choose Save.
  2. Choose Grant and specify the permissions.

You can configure what access you want to provide for the specific user at the collection level and specific indexes at the index pattern level.

You should select the access the user needs based on the least privilege model. Refer to Supported policy permissions and Supported OpenSearch API operations and permissions to set up more granular access for your users.

  1. Choose Save and configure any additional rules, if required.

You can now review and edit your configuration if needed.

  1. Choose Create to create the data access policy.

Now you have the data access policy that will allow the users to perform the allowed actions on OpenSearch Dashboards.
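If you prefer to define the policy as JSON instead of using the visual editor, the document could look roughly like the output of the following sketch. It assumes the documented data access policy structure (Rules, ResourceType, Resource, Permission, Principal) and the saml/<account id>/<provider name>/user/<identifier> principal format; the account ID, provider name, collection name, and email address are placeholders, and the permission set is only an example of read-only access. Check the permissions against Supported policy permissions before you use it.

```python
import json


def saml_principal(account_id, provider_name, kind, identifier):
    """Build a SAML principal string; `kind` is "user" or "group"."""
    if kind not in ("user", "group"):
        raise ValueError("kind must be 'user' or 'group'")
    return f"saml/{account_id}/{provider_name}/{kind}/{identifier}"


def read_only_policy(collection, principals):
    """Return a data access policy document granting read access to one collection."""
    return [
        {
            "Description": "Read-only access for SAML principals",
            "Rules": [
                {
                    "ResourceType": "collection",
                    "Resource": [f"collection/{collection}"],
                    "Permission": ["aoss:DescribeCollectionItems"],
                },
                {
                    "ResourceType": "index",
                    "Resource": [f"index/{collection}/*"],
                    "Permission": ["aoss:DescribeIndex", "aoss:ReadDocument"],
                },
            ],
            "Principal": list(principals),
        }
    ]


# Placeholder identifiers for illustration.
principal = saml_principal("123456789012", "my-idc-provider", "user", "jane@example.com")
policy_json = json.dumps(read_only_policy("my-collection", [principal]), indent=2)
print(policy_json)
```

The same helper produces a group principal when you pass "group" and the group ID from IAM Identity Center.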

Access OpenSearch Dashboards

To sign in to OpenSearch Dashboards, complete the following steps:

  1. On the OpenSearch Service dashboard, under Serverless in the navigation pane, choose Dashboard.
  2. Locate your dashboard and copy the OpenSearch Dashboards URL (in the format <collection-endpoint>/_dashboards).
  3. Enter this URL into a new browser tab.
  4. On the OpenSearch login page, choose your IdP and specify your SSO credentials.
  5. Choose Login.

Configure SAML authentication using groups in IAM Identity Center

Groups can help you organize your users and permissions in a coherent way. With groups, you can add multiple users from the IdP, and then use groupid as the identifier in the data access policy. For more information, refer to Add groups and Add users to groups.

To configure group access to OpenSearch Dashboards, complete the following steps:

  1. On the IAM Identity Center console, navigate to your application.
  2. In the Attribute mappings section, add an additional attribute named group and map it to ${user:groups}, with the format unspecified.
  3. Choose Save changes.
  4. For the SAML provider in OpenSearch Serverless, under Additional settings, for Group attribute, enter group.
  5. For the data access policy, create a new rule or add an additional principal in the previous rule.
  6. Choose the SAML provider name and enter group/<GroupId>.

You can fetch the value for the group ID by navigating to the Group section on the IAM Identity Center console.

Clean up

If you don’t want to continue using the solution, be sure to delete the resources you created:

  1. On the IAM Identity Center console, remove the application.
  2. On the OpenSearch Service console, delete the following resources:
    1. Delete your collection.
    2. Delete the data access policy.
    3. Delete the SAML provider.

Conclusion

In this post, you learned how to set up IAM Identity Center as an IdP to access OpenSearch Dashboards using SAML for SSO. You also learned how to set up users and groups within IAM Identity Center and control the access of users and groups for OpenSearch Dashboards. For more details, refer to SAML authentication for Amazon OpenSearch Serverless.

Stay tuned for a series of posts focusing on the various options available for you to build effective log analytics and search solutions using OpenSearch Serverless. You can also refer to the Getting started with Amazon OpenSearch Serverless workshop to know more about OpenSearch Serverless.

If you have feedback about this post, submit it in the comments section. If you have questions about this post, start a new thread on the OpenSearch Service forum or contact AWS Support.


About the Authors

Utkarsh Agarwal is a Cloud Support Engineer in the Support Engineering team at Amazon Web Services. He specializes in Amazon OpenSearch Service. He provides guidance and technical assistance to customers, enabling them to build scalable, highly available, and secure solutions in the AWS Cloud. In his free time, he enjoys watching movies, TV series, and of course cricket! Lately, he is also attempting to master the art of cooking in his free time. The taste buds are excited, but the kitchen might disagree.

Ravi Bhatane is a software engineer with Amazon OpenSearch Serverless Service. He is passionate about security, distributed systems, and building scalable services. When he’s not coding, Ravi enjoys photography and exploring new hiking trails with his friends.

Prashant Agrawal is a Sr. Search Specialist Solutions Architect with Amazon OpenSearch Service. He works closely with customers to help them migrate their workloads to the cloud and helps existing customers fine-tune their clusters to achieve better performance and save on cost. Before joining AWS, he helped various customers use OpenSearch and Elasticsearch for their search and log analytics use cases. When not working, you can find him traveling and exploring new places. In short, he likes doing Eat → Travel → Repeat.

How CyberSolutions built a scalable data pipeline using Amazon EMR Serverless and the AWS Data Lab

Post Syndicated from Constantin Scoarță original https://aws.amazon.com/blogs/big-data/how-cybersolutions-built-a-scalable-data-pipeline-using-amazon-emr-serverless-and-the-aws-data-lab/

This post is co-written by Constantin Scoarță and Horațiu Măiereanu from CyberSolutions Tech.

CyberSolutions is one of the leading ecommerce enablers in Germany. We design, implement, maintain, and optimize award-winning ecommerce platforms end to end. Our solutions are based on best-in-class software like SAP Hybris and Adobe Experience Manager, and complemented by unique services that help automate the pricing and sourcing processes.

We have built data pipelines to process, aggregate, and clean our data for our forecasting service. With the growing interest in our services, we wanted to scale our batch-based data pipeline to process more historical data on a daily basis and yet remain performant, cost-efficient, and predictable. To meet our requirements, we have been exploring the use of Amazon EMR Serverless as a potential solution.

To accelerate our initiative, we worked with the AWS Data Lab team. They offer joint engineering engagements between customers and AWS technical resources to create tangible deliverables that accelerate data and analytics initiatives. We chose to work through a Build Lab, which is a 2–5-day intensive build with a technical customer team.

In this post, we share how we engaged with the AWS Data Lab program to build a scalable and performant data pipeline using EMR Serverless.

Use case

Our forecasting and recommendation algorithm is fed with historical data, which needs to be curated, cleaned, and aggregated. Our solution was based on AWS Glue workflows orchestrating a set of AWS Glue jobs, which worked fine for our requirements. However, as our use case developed, it required more computations and bigger datasets, resulting in unpredictable performance and cost.

This pipeline performs daily extracts from our data warehouse and a few other systems, curates the data, and does some aggregations (such as daily average). The results are consumed by our internal tools, which generate recommendations accordingly. Prior to the engagement, the pipeline was processing 28 days’ worth of historical data in approximately 70 minutes. We wanted to extend that to 100 days and 365 days of data without having to extend the extraction window or factor in the resources configured.

Solution overview

While working with the Data Lab team, we decided to structure our efforts into two approaches. As a short-term improvement, we were looking into optimizing the existing pipeline based on AWS Glue extract, transform, and load (ETL) jobs, orchestrated via AWS Glue workflows. However, for the mid-term to long-term, we looked at EMR Serverless to run our forecasting data pipeline.

EMR Serverless is an option in Amazon EMR that makes it easy and cost-effective for data engineers and analysts to run petabyte-scale data analytics in the cloud. With EMR Serverless, we could run applications built using open-source frameworks such as Apache Spark (as in our case) without having to configure, manage, optimize, or secure clusters. The following factors influenced our decision to use EMR Serverless:

  • Our pipeline had minimal dependency on the AWS Glue context and its features, instead running native Apache Spark
  • EMR Serverless offers configurable drivers and workers
  • With EMR Serverless, we were able to take advantage of its cost tracking feature for applications
  • The need for managing our own Spark History Server was eliminated because EMR Serverless automatically creates a monitoring Spark UI for each job

Therefore, we planned the lab activities to be categorized as follows:

  • Improve the existing code to be more performant and scalable
  • Create an EMR Serverless application and adapt the pipeline
  • Run the entire pipeline with different date intervals

The following solution architecture depicts the high-level components we worked with during the Build Lab.

In the following sections, we dive into the lab implementation in more detail.

Improve the existing code

After examining our code decisions, we identified a step in our pipeline that consumed the most time and resources, and we decided to focus on improving it. Our target job for this optimization was the “Create Moving Average” job, which involves computing various aggregations such as averages, medians, and sums on a moving window. Initially, this step took around 4.7 minutes to process an interval of 28 days. However, running the job for larger datasets proved to be challenging – it didn’t scale well and even resulted in errors in some cases.

While reviewing our code, we focused on several areas, including checking data frames at certain steps to ensure that they contained content before proceeding. Initially, we used the count() API to achieve this, but we discovered that head() was a better alternative because it returns the first n rows only and is faster than count() for large input data. With this change, we were able to save around 15 seconds when processing 28 days’ worth of data. Additionally, we optimized our output writing by using coalesce() instead of repartition().
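The emptiness check can be sketched as a small helper. This is an illustration rather than our production code: the function name is hypothetical, and it relies only on the PySpark behavior that DataFrame.head(n) returns a list of at most n rows.

```python
def has_rows(df):
    """Return True when the data frame contains at least one row.

    head(1) asks for at most one row, so Spark does not need to count
    every row the way count() does.
    """
    return len(df.head(1)) > 0
```

A pipeline step can then guard its work with `if has_rows(daily_df):` before running the more expensive transformations (daily_df is a placeholder name).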

These changes managed to shave off some time, down to 4 minutes per run. We achieved better performance by using cache() on data frames before performing the aggregations. Because cache() is lazy, the data frame is materialized when the next action runs and is then reused by the aggregations that follow. Additionally, we used unpersist() to free up executors’ memory after we were done with the mentioned aggregations. This led to a runtime of approximately 3.5 minutes for this job.
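The cache-then-release pattern can be sketched as follows. The helper name and the shape of the aggregations argument are hypothetical; the only PySpark calls it relies on are DataFrame.cache() and DataFrame.unpersist().

```python
def run_aggregations(df, aggregations):
    """Apply several aggregation functions to one cached data frame.

    `aggregations` maps a result name to a function that takes the data frame
    and triggers an action (for example, one that ends in collect() or a write).
    """
    df.cache()  # Lazy: the data is stored when the first action runs.
    try:
        return {name: aggregate(df) for name, aggregate in aggregations.items()}
    finally:
        df.unpersist()  # Release executor memory once all aggregations are done.
```

Each aggregation function should trigger an action itself. If it only returns another lazy data frame, the cached data is released before that frame is ever computed, and the cache brings no benefit.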

Following the successful code improvements, we managed to extend the data input to 100 days, 1 year, and 3 years. For this specific job, the coalesce() function wasn’t avoiding the shuffle operation and caused uneven data distribution per executor, so we switched back to repartition() for this job. By the end, we managed to get successful runs in 4.7, 12, and 57 minutes, using the same number of workers in AWS Glue (10 standard workers).

Adapt code to EMR Serverless

To observe if running the same job in EMR Serverless would yield better results, we configured an application that uses a comparable number of executors as in AWS Glue jobs. In the job configurations, we used 2 cores and 6 GB of memory for the driver and 20 executors with 4 cores and 16 GB of memory. However, we didn’t use additional ephemeral storage (by default, workers come with 20 GB of storage at no extra cost).

By the time we had the Build Lab, AWS Glue supported Apache Spark 3.1.1; however, we opted to use Spark 3.2.0 (Amazon EMR version 6.6.0) instead. Additionally, during the Build Lab, only x86_64 EMR Serverless applications were available, although EMR Serverless now also supports the arm64 architecture.

We adapted the code utilizing AWS Glue context to work with native Apache Spark. For instance, we needed to overwrite existing partitions and sync updates with the AWS Glue Data Catalog, especially when old partitions were replaced and new ones were added. We achieved this by setting spark.conf.set("spark.sql.sources.partitionOverwriteMode", "DYNAMIC") and using an MSCK REPAIR query to sync the relevant table. Similarly, we replaced the read and write operations to rely on Apache Spark APIs.
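The partition overwrite and catalog sync can be sketched as one helper. This is a minimal illustration, assuming Parquet output and a table that is already registered in the catalog; the function name and its arguments are hypothetical.

```python
def overwrite_partitions(spark, df, path, table, partition_cols):
    """Rewrite only the partitions present in `df`, then sync the catalog table."""
    # Replace just the partitions that appear in df instead of the whole dataset.
    spark.conf.set("spark.sql.sources.partitionOverwriteMode", "DYNAMIC")
    df.write.mode("overwrite").partitionBy(*partition_cols).parquet(path)
    # Register newly added partitions in the metastore (the AWS Glue Data Catalog here).
    spark.sql(f"MSCK REPAIR TABLE {table}")
```

A call might look like `overwrite_partitions(spark, moving_avg_df, "s3://my-bucket/moving-average/", "forecast.moving_average", ["dt"])`, where the data frame, bucket, table, and partition column are all placeholders.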

During the tests, we intentionally disabled the fine-grained auto scaling feature of EMR Serverless while running jobs, in order to observe how the code would perform with the same number of workers but different date intervals. We achieved that by setting spark.dynamicAllocation.enabled to false (the default is true).
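The driver and executor sizing from this section, together with dynamic allocation turned off, can be expressed as standard Spark properties. The following sketch renders them as a string of --conf flags, which is the form a Spark submit parameter string usually takes; the helper name is hypothetical, and how you pass the string to your job is an assumption about your setup.

```python
def spark_submit_parameters(conf):
    """Render Spark properties as a string of --conf flags."""
    return " ".join(f"--conf {key}={value}" for key, value in conf.items())


# Values taken from the job configuration described in this section.
JOB_CONF = {
    "spark.driver.cores": 2,
    "spark.driver.memory": "6g",
    "spark.executor.instances": 20,
    "spark.executor.cores": 4,
    "spark.executor.memory": "16g",
    "spark.dynamicAllocation.enabled": "false",
}

params = spark_submit_parameters(JOB_CONF)
print(params)
```

Keeping the properties in one dictionary makes it easy to rerun the same job with a different executor count when you compare date intervals.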

For the same code, number of workers, and data inputs, we managed to get better performance results with EMR Serverless, which were 2.5, 2.9, 6, and 16 minutes for 28 days, 100 days, 1 year, and 3 years, respectively.
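To put these numbers side by side, the following snippet computes the runtime reduction per interval from the figures reported above. It assumes that the 3.5-minute run is the AWS Glue baseline for 28 days, because that is the last 28-day figure given for the optimized job.

```python
# Runtimes in minutes for the "Create Moving Average" job, as reported above.
GLUE = {"28 days": 3.5, "100 days": 4.7, "1 year": 12, "3 years": 57}
EMR_SERVERLESS = {"28 days": 2.5, "100 days": 2.9, "1 year": 6, "3 years": 16}


def reduction_percent(before, after):
    """Percentage by which the runtime dropped going from `before` to `after`."""
    return round((before - after) / before * 100, 1)


reductions = {
    interval: reduction_percent(GLUE[interval], EMR_SERVERLESS[interval])
    for interval in GLUE
}
print(reductions)
```

The reduction grows with the input size: roughly 29% for 28 days, 38% for 100 days, 50% for 1 year, and 72% for 3 years.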

Run the entire pipeline with different date intervals

Because the code for our jobs was implemented in a modular fashion, we were able to quickly test all of them with EMR Serverless and then link them together to orchestrate the pipeline via Amazon Managed Workflows for Apache Airflow (Amazon MWAA).

Regarding performance, our previous pipeline using AWS Glue took around 70 minutes to run with our regular workload. However, our new pipeline, powered by Amazon MWAA-backed EMR Serverless, achieved similar results in approximately 60 minutes. Although this is a notable improvement, the most significant benefit was our ability to scale up to process larger amounts of data using the same number of workers. For instance, processing 1 year’s worth of data only took around 107 minutes to complete.

Conclusion and key takeaways

In this post, we outlined the approach taken by the CyberSolutions team in conjunction with the AWS Data Lab to create a high-performing and scalable demand forecasting pipeline. By using optimized Apache Spark jobs on customizable EMR Serverless workers, we were able to surpass the performance of our previous workflow. Specifically, the new setup resulted in 50–72% better performance for most jobs when processing 100 days of data, resulting in an overall cost savings of around 38%.

EMR Serverless applications’ features helped us have better control over cost. For example, we configured the pre-initialized capacity, which resulted in job start times of 1–4 seconds. And we set up the application behavior to start with the first submitted job and automatically stop after a configurable idle time.

As a next step, we are actively testing AWS Graviton2-based EMR applications, which come with more performance gains and lower cost.


About the Authors

Constantin Scoarță is a Software Engineer at CyberSolutions Tech. He is mainly focused on building data cleaning and forecasting pipelines. In his spare time, he enjoys hiking, cycling, and skiing.

Horațiu Măiereanu is the Head of Python Development at CyberSolutions Tech. His team builds smart microservices for ecommerce retailers to help them improve and automate their workloads. In his free time, he likes hiking and traveling with his family and friends.

Ahmed Ewis is a Solutions Architect at the AWS Data Lab. He helps AWS customers design and build scalable data platforms using AWS database and analytics services. Outside of work, Ahmed enjoys playing with his child and cooking.

Reference guide to build inventory management and forecasting solutions on AWS

Post Syndicated from Jason Dalba original https://aws.amazon.com/blogs/big-data/reference-guide-to-build-inventory-management-and-forecasting-solutions-on-aws/

Inventory management is a critical function for any business that deals with physical products. The primary challenge businesses face with inventory management is balancing the cost of holding inventory with the need to ensure that products are available when customers demand them.

The consequences of poor inventory management can be severe. Overstocking can lead to increased holding costs and waste, while understocking can result in lost sales, reduced customer satisfaction, and damage to the business’s reputation. Inefficient inventory management can also tie up valuable resources, including capital and warehouse space, and can impact profitability.

Forecasting is another critical component of effective inventory management. Accurately predicting demand for products allows businesses to optimize inventory levels, minimize stockouts, and reduce holding costs. However, forecasting can be a complex process, and inaccurate predictions can lead to missed opportunities and lost revenue.

To address these challenges, businesses need an inventory management and forecasting solution that can provide real-time insights into inventory levels, demand trends, and customer behavior. Such a solution should use the latest technologies, including Internet of Things (IoT) sensors, cloud computing, and machine learning (ML), to provide accurate, timely, and actionable data. By implementing such a solution, businesses can improve their inventory management processes, reduce holding costs, increase revenue, and enhance customer satisfaction.

In this post, we discuss how to streamline inventory management forecasting systems with AWS managed analytics, AI/ML, and database services.

Solution overview

In today’s highly competitive business landscape, it’s essential for retailers to optimize their inventory management processes to maximize profitability and improve customer satisfaction. With the proliferation of IoT devices and the abundance of data generated by them, it has become possible to collect real-time data on inventory levels, customer behavior, and other key metrics.

To take advantage of this data and build an effective inventory management and forecasting solution, retailers can use a range of AWS services. By collecting data from store sensors using AWS IoT Core, ingesting it using AWS Lambda to Amazon Aurora Serverless, and transforming it using AWS Glue from a database to an Amazon Simple Storage Service (Amazon S3) data lake, retailers can gain deep insights into their inventory and customer behavior.

With Amazon Athena, retailers can analyze this data to identify trends, patterns, and anomalies, and use Amazon ElastiCache for customer-facing applications with reduced latency. Additionally, by building a point of sales application on Amazon QuickSight, retailers can embed customer 360 views into the application to provide personalized shopping experiences and drive customer loyalty.

Finally, we can use Amazon SageMaker to build forecasting models that can predict inventory demand and optimize stock levels.

With these AWS services, retailers can build an end-to-end inventory management and forecasting solution that provides real-time insights into inventory levels and customer behavior, enabling them to make informed decisions that drive business growth and customer satisfaction.

The following diagram illustrates a sample architecture.

With the appropriate AWS services, your inventory management and forecasting system can have optimized collection, storage, processing, and analysis of data from multiple sources. The solution includes the following components.

Data ingestion and storage

Retail businesses have event-driven data that requires action from downstream processes. It’s critical for an inventory management application to handle the data ingestion and storage for changing demands.

The data ingestion process is typically triggered by an event, such as an order being placed, that kicks off the inventory management workflow and requires action from backend services. Developers are responsible for the operational overhead of maintaining the data ingestion load from an event-driven application.

The volume and velocity of data in the retail industry can change from day to day. Events like Black Friday or a new campaign can cause volatile swings in the capacity required to process and store inventory data. Serverless services designed to scale to businesses’ needs help reduce the architectural and operational challenges that come with high-demand retail applications.

To address the scaling challenges that occur when inventory demand spikes, we can deploy Lambda, a serverless, event-driven compute service, to trigger the data ingestion process. As inventory events such as purchases or returns occur, Lambda automatically scales compute resources to meet the volume of incoming data.

After Lambda responds to the inventory action request, the updated data is stored in Aurora Serverless. Aurora Serverless is a serverless relational database that is designed to scale to the application’s needs. When peak loads hit during events like Black Friday, Aurora Serverless deploys only the database capacity necessary to meet the workload.
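
The ingestion path described above could be sketched as follows. This is a minimal illustration, not the workshop's implementation: it assumes the Aurora Serverless cluster has the RDS Data API enabled, and the table name, column names, and event shape are hypothetical. The Data API client is passed in as an argument so the mapping logic can be exercised without AWS access.

```python
# Hypothetical table and column names; adjust them to your schema.
UPDATE_SQL = (
    "UPDATE inventory SET quantity = quantity + :delta "
    "WHERE store_id = :store_id AND sku = :sku"
)


def to_statement_params(event):
    """Map an inventory event to RDS Data API parameters.

    Purchases reduce stock and returns increase it.
    """
    sign = {"purchase": -1, "return": 1}[event["type"]]
    return [
        {"name": "delta", "value": {"longValue": sign * int(event["quantity"])}},
        {"name": "store_id", "value": {"stringValue": event["store_id"]}},
        {"name": "sku", "value": {"stringValue": event["sku"]}},
    ]


def handle_inventory_event(event, data_api, cluster_arn, secret_arn, database):
    """Apply one inventory event through the Data API client passed in."""
    return data_api.execute_statement(
        resourceArn=cluster_arn,
        secretArn=secret_arn,
        database=database,
        sql=UPDATE_SQL,
        parameters=to_statement_params(event),
    )
```

Inside a Lambda function, `data_api` would typically be `boto3.client("rds-data")`, with the cluster and secret ARNs read from environment variables.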

Inventory management applications have ever-changing demands. Deploying serverless services to handle the ingestion and storage of data will not only optimize cost but also reduce the operational overhead for developers, freeing up bandwidth for other critical business needs.

Data performance

Customer-facing applications require low latency to maintain positive user experiences with microsecond response times. ElastiCache, a fully managed, in-memory database, delivers high-performance data retrieval to users.

The in-memory caching that ElastiCache provides improves latency and throughput for the read-heavy workloads that online retailers run. Storing frequently accessed data, such as product information, in memory improves application performance. Product information is an ideal candidate for caching because it changes relatively infrequently.

Functionality is often added to retail applications to retrieve trending products. Trending products can be cycled through the cache dependent on customer access patterns. ElastiCache manages the real-time application data caching, allowing your customers to experience microsecond response times while supporting high-throughput handling of hundreds of millions of operations per second.
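
A common way to apply this kind of caching is the cache-aside pattern. The sketch below is illustrative only: `TTLCache` is a hypothetical in-process stand-in for a cache client (a real deployment would call an ElastiCache endpoint), and `load_from_database` is a placeholder for whatever function reads the product from the database.

```python
import time


class TTLCache:
    """In-process stand-in for a cache client, with per-entry expiry."""

    def __init__(self, ttl_seconds, clock=time.monotonic):
        self._ttl = ttl_seconds
        self._clock = clock
        self._items = {}

    def get(self, key):
        entry = self._items.get(key)
        if entry is None:
            return None
        value, expires_at = entry
        if self._clock() >= expires_at:
            del self._items[key]  # expired: treat as a miss
            return None
        return value

    def set(self, key, value):
        self._items[key] = (value, self._clock() + self._ttl)


def get_product(product_id, cache, load_from_database):
    """Cache-aside read: try the cache first, fall back to the database on a miss."""
    cached = cache.get(product_id)
    if cached is not None:
        return cached
    product = load_from_database(product_id)
    cache.set(product_id, product)
    return product
```

The expiry time bounds how stale a cached product can become, which suits data that changes rarely; trending products would likely need a shorter expiry.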

Data transformation

Data transformation is essential in inventory management and forecasting solutions, both for analyzing sales and inventory data and for ML-based forecasting. Raw data from various sources can contain inconsistencies, errors, and missing values that distort the analysis and forecast results.

In the inventory management and forecasting solution, AWS Glue is recommended for data transformation. The tool addresses issues such as cleaning, restructuring, and consolidating data into a standard format that can be easily analyzed. As a result of the transformation, businesses can obtain a more precise understanding of inventory, sales trends, and customer behavior, influencing data-driven decisions to optimize inventory management and sales strategies. Furthermore, high-quality data is crucial for ML algorithms to make accurate forecasts.

By transforming data, organizations can enhance the accuracy and dependability of their forecasting models, ultimately leading to improved inventory management and cost savings.

Data analysis

Data analysis has become increasingly important for businesses because it allows leaders to make informed operational decisions. However, analyzing large volumes of data can be a time-consuming and resource-intensive task. This is where Athena comes in. With Athena, businesses can easily query historical sales and inventory data stored in S3 data lakes and combine it with real-time transactional data from Aurora Serverless databases.

The federated capabilities of Athena allow businesses to generate insights by combining datasets without the need to build ETL (extract, transform, and load) pipelines, saving time and resources. This enables businesses to quickly gain a comprehensive understanding of their inventory and sales trends, which can be used to optimize inventory management and forecasting, ultimately improving operations and increasing profitability.
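
As a rough sketch of what such a federated query might look like, the example below joins a data lake table with a table exposed through a federated data source. All catalog, schema, table, and column names are hypothetical, and the Athena client is passed in so that the function can be exercised without AWS access.

```python
# Hypothetical catalogs: "awsdatacatalog" for the S3 data lake and
# "aurora_inventory" for a federated data source in front of Aurora.
FEDERATED_QUERY = """
SELECT h.sku,
       SUM(h.units_sold) AS units_sold,
       MAX(i.quantity)   AS on_hand
FROM "awsdatacatalog"."retail"."sales_history" AS h
JOIN "aurora_inventory"."public"."inventory" AS i
  ON h.sku = i.sku
GROUP BY h.sku
"""


def start_inventory_query(athena_client, output_location):
    """Submit the federated query and return its execution ID."""
    response = athena_client.start_query_execution(
        QueryString=FEDERATED_QUERY,
        ResultConfiguration={"OutputLocation": output_location},
    )
    return response["QueryExecutionId"]
```

In practice, `athena_client` would be `boto3.client("athena")` and `output_location` an S3 URI that Athena can write results to.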

With Athena’s ease of use and powerful capabilities, businesses can quickly analyze their data and gain valuable insights, driving growth and success without the need for complex ETL pipelines.

Forecasting

Inventory forecasting is an important aspect of inventory management for businesses that deal with physical products. Accurately predicting demand for products can help optimize inventory levels, reduce costs, and improve customer satisfaction. ML can help simplify and improve inventory forecasting by making more accurate predictions based on historical data.

SageMaker is a powerful ML platform that you can use to build, train, and deploy ML models for a wide range of applications, including inventory forecasting. In this solution, we use SageMaker to build and train an ML model for inventory forecasting, covering the basic concepts of ML, the data preparation process, model training and evaluation, and deploying the model for use in a production environment.

The solution also introduces the concept of hierarchical forecasting, which involves generating coherent forecasts that maintain the relationships within the hierarchy or reconciling incoherent forecasts. The workshop provides a step-by-step process for using the training capabilities of SageMaker to carry out hierarchical forecasting using synthetic retail data and the scikit-hts package. The FBProphet model was used along with bottom-up and top-down hierarchical aggregation and disaggregation methods. We used Amazon SageMaker Experiments to train multiple models, and the best model was picked out of the four trained models.

Although the approach was demonstrated on a synthetic retail dataset, you can use the provided code with any time series dataset that exhibits a similar hierarchical structure.
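
To make the bottom-up and top-down methods concrete, here is a minimal sketch for a single-level hierarchy. It is plain Python rather than the scikit-hts API, and the store names and figures are made up for illustration.

```python
def bottom_up(leaf_forecasts, hierarchy):
    """Sum leaf-level forecasts up to each parent so that totals stay coherent."""
    return {
        parent: sum(leaf_forecasts[child] for child in children)
        for parent, children in hierarchy.items()
    }


def top_down(parent_forecast, historical_sales):
    """Split a parent forecast across children by their share of historical sales."""
    total = sum(historical_sales.values())
    return {
        child: parent_forecast * sales / total
        for child, sales in historical_sales.items()
    }


hierarchy = {"total": ["store_a", "store_b"]}
aggregated = bottom_up({"store_a": 120.0, "store_b": 80.0}, hierarchy)
disaggregated = top_down(200.0, {"store_a": 300.0, "store_b": 100.0})
```

Both methods yield coherent forecasts, meaning the child forecasts add up to the parent forecast; they differ in which level of the hierarchy is modeled directly.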

Security and authentication

The solution takes advantage of the scalability, reliability, and security of AWS services to provide a comprehensive inventory management and forecasting solution that can help businesses optimize their inventory levels, reduce holding costs, increase revenue, and enhance customer satisfaction. By incorporating user authentication with Amazon Cognito and Amazon API Gateway, the solution ensures that the system is secure and accessible only by authorized users.

Next steps

The next step to build an inventory management and forecasting solution on AWS would be to go through the Inventory Management workshop. In the workshop, you will get hands-on with AWS managed analytics, AI/ML, and database services to dive deep into an end-to-end inventory management solution. By the end of the workshop, you will have gone through the configuration and deployment of the critical pieces that make up an inventory management system.

Conclusion

In conclusion, building an inventory management and forecasting solution on AWS can help businesses optimize their inventory levels, reduce holding costs, increase revenue, and enhance customer satisfaction. With AWS services like IoT Core, Lambda, Aurora Serverless, AWS Glue, Athena, ElastiCache, QuickSight, SageMaker, and Amazon Cognito, businesses can use scalable, reliable, and secure technologies to collect, store, process, and analyze data from various sources.

The end-to-end solution is designed for individuals in various roles, such as business users, data engineers, data scientists, and data analysts, who are responsible for comprehending, creating, and overseeing processes related to retail inventory forecasting. Overall, an inventory management and forecasting solution on AWS can provide businesses with the insights and tools they need to make data-driven decisions and stay competitive in a constantly evolving retail landscape.


About the Authors

Jason D’Alba is an AWS Solutions Architect leader focused on databases and enterprise applications, helping customers architect highly available and scalable solutions.

Navnit Shukla is an AWS Specialist Solution Architect, Analytics, and is passionate about helping customers uncover insights from their data. He has been building solutions to help organizations make data-driven decisions.

Vetri Natarajan is a Specialist Solutions Architect for Amazon QuickSight. Vetri has 15 years of experience implementing enterprise business intelligence (BI) solutions and greenfield data products. Vetri specializes in integrating BI solutions with business applications to enable data-driven decisions.

Sindhura Palakodety is a Solutions Architect at AWS. She is passionate about helping customers build enterprise-scale Well-Architected solutions on the AWS platform and specializes in the data analytics domain.

Push Amazon EMR step logs from Amazon EC2 instances to Amazon CloudWatch logs

Post Syndicated from Nausheen Sayed original https://aws.amazon.com/blogs/big-data/push-amazon-emr-step-logs-from-amazon-ec2-instances-to-amazon-cloudwatch-logs/

Amazon EMR is a big data service offered by AWS to run Apache Spark and other open-source applications on AWS to build scalable data pipelines in a cost-effective manner. Monitoring the logs generated from the jobs deployed on EMR clusters is essential to help detect critical issues in real time and identify root causes quickly.

Pushing those logs into Amazon CloudWatch enables you to centralize and drive actionable intelligence from your logs to address operational issues without needing to provision servers or manage software. You can instantly begin writing queries with aggregations, filters, and regular expressions. In addition, you can visualize time series data, drill down into individual log events, and export query results to CloudWatch dashboards.

To ingest logs that are persisted on the Amazon Elastic Compute Cloud (Amazon EC2) instances of an EMR cluster into CloudWatch, you can use the CloudWatch agent. This provides a simple way to push logs from an EC2 instance to CloudWatch.

The CloudWatch agent is a software package that autonomously and continuously runs on your servers. You can install and configure the CloudWatch agent to collect system and application logs from EC2 instances, on-premises hosts, and containerized applications. CloudWatch processes and stores the logs collected by the CloudWatch agent, which further helps with the performance and health monitoring of your infrastructure and applications.

In this post, we create an EMR cluster and centralize the EMR step logs of the jobs in CloudWatch. This will make it easier for you to manage your EMR cluster, troubleshoot issues, and monitor performance. This solution is particularly helpful if you want to use CloudWatch to collect and visualize real-time logs, metrics, and event data, streamlining your infrastructure and application maintenance.

Overview of solution

The solution presented in this post is based on a specific configuration where the EMR step concurrency level is set to 1. This means that only one step is run at a time on the cluster. It’s important to note that if the EMR step concurrency level is set to a value greater than 1, the solution may not work as expected. We highly recommend verifying your EMR step concurrency configuration before implementing the solution presented in this post.

The following diagram illustrates the solution architecture.

Solution Architecture Diagram

The workflow includes the following steps:

  1. Users start an Apache Spark EMR job, creating a step on the EMR cluster. Using Apache Spark, the workload is distributed across the different nodes of the EMR cluster.
  2. In each node (EC2 instance) of the cluster, a CloudWatch agent watches the log directories, capturing new entries in the log files and pushing them to CloudWatch.
  3. Users can view the step logs by accessing the different log groups from the CloudWatch console. The step logs written by Amazon EMR are as follows:
    • controller — Information about the processing of the step. If your step fails while loading, you can find the stack trace in this log.
    • stderr — The standard error channel of Spark while it processes the step.
    • stdout — The standard output channel of Spark while it processes the step.

We provide an AWS CloudFormation template in this post as a general guide. The template demonstrates how to configure a CloudWatch agent on Amazon EMR to push Spark logs to CloudWatch. You can review and customize it as needed to include your Amazon EMR security configurations. As a best practice, we recommend including your Amazon EMR security configurations in the template to encrypt data in transit.

You should also be aware that some of the resources deployed by this stack incur costs when they remain in use.

In the next sections, we go through the following steps:

  1. Create and upload the bootstrap script to an Amazon Simple Storage Service (Amazon S3) bucket.
  2. Use the CloudFormation template to create the required resources: an IAM role, IAM instance profile, Systems Manager parameter, and EMR cluster.
  3. Monitor the Spark logs on the CloudWatch console.

Prerequisites

This post assumes that you have the following:

Create and upload the bootstrap script to an S3 bucket

For more information, see Uploading objects and Installing and running the CloudWatch agent on your servers.

To create and upload the bootstrap script, complete the following steps:

  1. Create a local file named bootstrap_cloudwatch_agent.sh with the following content:
    #!/bin/bash
    
    echo -e 'Installing CloudWatch Agent... \n'
    sudo rpm -Uvh --force https://s3.amazonaws.com/amazoncloudwatch-agent/amazon_linux/amd64/latest/amazon-cloudwatch-agent.rpm
    
    echo -e 'Starting CloudWatch Agent... \n'
    sudo amazon-cloudwatch-agent-ctl -a fetch-config -m ec2 -c ssm:AmazonCloudWatch-Config.json -s

  2. On the Amazon S3 console, choose your S3 bucket.
  3. On the Objects tab, choose Upload.
  4. Choose Add files, then choose the bootstrap script.
  5. Choose Upload, then choose the file name: bootstrap_cloudwatch_agent.sh.
  6. Choose Copy S3 URI. We use this value in a later step.

Provision resources with the CloudFormation template

Choose Launch Stack to launch a CloudFormation stack in your account and deploy the template:

This template creates an IAM role, IAM instance profile, Systems Manager parameter, and EMR cluster. The cluster starts the Spark PI estimation example application. You will be billed for the AWS resources used if you create a stack from this template.
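
The Systems Manager parameter holds the CloudWatch agent configuration that the bootstrap script fetches. The sketch below shows what such a configuration might contain. The step log directory is an assumption to verify on your cluster, and the log group and stream names follow the ones described in this post rather than the template's actual contents.

```python
import json

# Assumed location of EMR step logs on the primary node; verify on your cluster.
STEP_LOG_DIR = "/mnt/var/log/hadoop/steps"


def build_agent_config(log_group_prefix="/aws/emr/master"):
    """Build a CloudWatch agent config that ships the three EMR step log files."""
    collect_list = [
        {
            "file_path": f"{STEP_LOG_DIR}/*/{name}",
            # The agent expands {instance_id} at run time.
            "log_group_name": f"{log_group_prefix}/{{instance_id}}",
            "log_stream_name": f"step-{name}",
        }
        for name in ("stdout", "stderr", "controller")
    ]
    return {"logs": {"logs_collected": {"files": {"collect_list": collect_list}}}}


config_json = json.dumps(build_agent_config(), indent=2)
```

The wildcard in `file_path` stands for the step ID directory, so one entry covers every step that runs on the cluster.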

The CloudFormation wizard will ask you to modify or provide these parameters:

  • InstanceType – The type of instance for all instance groups. The default is m4.xlarge.
  • InstanceCountCore – The number of instances in the core instance group. The default is 2.
  • EMRReleaseLabel – The Amazon EMR release label you want to use. The default is emr-6.9.0.
  • BootstrapScriptPath – The S3 path of your CloudWatch agent installation bootstrap script that you copied earlier.
  • Subnet – The EC2 subnet where the cluster launches. You must provide this parameter.
  • EC2KeyPairName – An optional EC2 keypair for connecting to cluster nodes, as an alternative to Session Manager.

Monitor the log streams

After the CloudFormation stack deploys successfully, on the CloudWatch console, choose Log groups in the navigation pane. Then filter the log groups by the prefix /aws/emr/master.

The ID in the log group corresponds to the EC2 instance ID of the EMR primary node. If you have multiple EMR clusters, you can use this ID to identify a particular EMR cluster, based on the primary node ID.

In the log group, you will find the three different log streams.

The log streams contain the following information:

  • step-stdout – The standard output channel of Spark while it processes the step.
  • step-stderr – The standard error channel of Spark while it processes the step.
  • step-controller – Information about the processing of the step. If your step fails while loading, you can find the stack trace in this log.
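
The same streams can also be read programmatically. The following sketch is illustrative: it assumes the log group naming described above (the /aws/emr/master prefix followed by the primary node's instance ID), takes a CloudWatch Logs client as an argument, and reads only the first page of results.

```python
def fetch_step_errors(logs_client, instance_id, pattern="ERROR", limit=50):
    """Return messages from the step-stderr stream that match a filter pattern."""
    response = logs_client.filter_log_events(
        logGroupName=f"/aws/emr/master/{instance_id}",
        logStreamNames=["step-stderr"],
        filterPattern=pattern,
        limit=limit,
    )
    return [event["message"] for event in response.get("events", [])]
```

With Boto3, `logs_client` would be `boto3.client("logs")`; following the `nextToken` value in the response would retrieve further pages.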

Clean up

To avoid future charges in your account, delete the resources you created in this walkthrough. The EMR cluster will incur charges as long as the cluster is active, so stop it when you’re done.

  1. On the CloudFormation console, in the navigation pane, choose Stacks.
  2. Choose the stack you launched (EMR-CloudWatch-Demo), then choose Delete.
  3. Empty the S3 bucket you created.
  4. Delete the S3 bucket you created.

Conclusion

Now that you have completed the steps in this walkthrough, you have the CloudWatch agent running on your cluster hosts and configured to push EMR step logs to CloudWatch. With this feature, you can effectively monitor the health and performance of your Spark jobs running on Amazon EMR, detecting critical issues in real time and identifying root causes quickly.

You can package and deploy this solution through a CloudFormation template like this example template, which creates the IAM instance profile role, Systems Manager parameter, and EMR cluster.

To take this further, consider using these logs in CloudWatch alarms that alert on a log group metric filter. You could collect them with other alarms into a composite alarm or configure alarm actions such as sending Amazon Simple Notification Service (Amazon SNS) notifications to trigger event-driven processes such as AWS Lambda functions.
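
As a starting point for that idea, the sketch below defines a metric filter that counts log lines containing ERROR. The filter name, metric name, and namespace are hypothetical, and the log group name assumes the naming used in this post.

```python
def error_metric_filter_request(instance_id):
    """Build put_metric_filter arguments that count ERROR lines in the log group."""
    return {
        "logGroupName": f"/aws/emr/master/{instance_id}",
        "filterName": "emr-step-errors",  # hypothetical name
        "filterPattern": "ERROR",
        "metricTransformations": [
            {
                "metricName": "EmrStepErrorCount",  # hypothetical name
                "metricNamespace": "Custom/EMR",  # hypothetical namespace
                "metricValue": "1",  # add 1 to the metric per matching line
            }
        ],
    }


def create_error_metric_filter(logs_client, instance_id):
    """Create the metric filter with the CloudWatch Logs client passed in."""
    logs_client.put_metric_filter(**error_metric_filter_request(instance_id))
```

Because a metric filter applies to the whole log group, this counts matches across all three step log streams. A CloudWatch alarm on the resulting metric could then send the Amazon SNS notification.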


About the Author

Ennio Pastore is a Senior Data Architect on the AWS Data Lab team. He is an enthusiast of everything related to new technologies that have a positive impact on businesses and general livelihood. Ennio has over 10 years of experience in data analytics. He helps companies define and implement data platforms across industries, such as telecommunications, banking, gaming, retail, and insurance.

Implement column-level encryption to protect sensitive data in Amazon Redshift with AWS Glue and AWS Lambda user-defined functions

Post Syndicated from Aaron Chong original https://aws.amazon.com/blogs/big-data/implement-column-level-encryption-to-protect-sensitive-data-in-amazon-redshift-with-aws-glue-and-aws-lambda-user-defined-functions/

Amazon Redshift is a massively parallel processing (MPP), fully managed petabyte-scale data warehouse that makes it simple and cost-effective to analyze all your data using existing business intelligence tools.

When businesses are modernizing their data warehousing solutions to Amazon Redshift, implementing additional data protection mechanisms for sensitive data, such as personally identifiable information (PII) or protected health information (PHI), is a common requirement, especially for those in highly regulated industries with strict data security and privacy mandates. Amazon Redshift provides role-based access control, row-level security, column-level security, and dynamic data masking, along with other database security features to enable organizations to enforce fine-grained data security.

Security-sensitive applications often require column-level (or field-level) encryption to enforce fine-grained protection of sensitive data on top of the default server-side encryption (namely data encryption at rest). In other words, sensitive data should always be encrypted on disk and remain encrypted in memory until users with proper permissions request to decrypt the data. Column-level encryption provides an additional layer of security to protect your sensitive data throughout system processing so that only certain users or applications can access it. This encryption ensures that only authorized principals that need the data, and have the required credentials to decrypt it, are able to do so.

In this post, we demonstrate how you can implement your own column-level encryption mechanism in Amazon Redshift using AWS Glue to encrypt sensitive data before loading data into Amazon Redshift, and using AWS Lambda as a user-defined function (UDF) in Amazon Redshift to decrypt the data using standard SQL statements. Lambda UDFs can be written in any of the programming languages supported by Lambda, such as Java, Go, PowerShell, Node.js, C#, Python, Ruby, or a custom runtime. You can use Lambda UDFs in any SQL statement such as SELECT, UPDATE, INSERT, or DELETE, and in any clause of the SQL statements where scalar functions are allowed.

Solution overview

The following diagram describes the solution architecture.

Architecture Diagram

To illustrate how to set up this architecture, we walk you through the following steps:

  1. We upload a sample data file containing synthetic PII data to an Amazon Simple Storage Service (Amazon S3) bucket.
  2. A sample 256-bit data encryption key is generated and securely stored using AWS Secrets Manager.
  3. An AWS Glue job reads the data file from the S3 bucket, retrieves the data encryption key from Secrets Manager, performs data encryption for the PII columns, and loads the processed dataset into an Amazon Redshift table.
  4. We create a Lambda function to reference the same data encryption key from Secrets Manager, and implement data decryption logic for the received payload data.
  5. The Lambda function is registered as a Lambda UDF with a proper AWS Identity and Access Management (IAM) role that the Amazon Redshift cluster is authorized to assume.
  6. We can validate the data decryption functionality by issuing sample queries using Amazon Redshift Query Editor v2.0. You may optionally choose to test it with your own SQL client or business intelligence tools.
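
Steps 4 and 5 rely on the batch interface that Amazon Redshift uses when it invokes a Lambda UDF: the event carries one list of arguments per row, and the function returns a JSON string with the results in the same order. The sketch below illustrates that shape only. `make_udf_handler` is a hypothetical helper, and the `decrypt` callable stands in for the actual decryption logic, which is not shown here.

```python
import json


def make_udf_handler(decrypt):
    """Build a Lambda handler for a Redshift scalar UDF around a decrypt callable."""

    def handler(event, context):
        try:
            # Each entry in "arguments" holds the arguments for one row.
            results = [
                None if row[0] is None else decrypt(row[0])
                for row in event["arguments"]
            ]
            return json.dumps(
                {"success": True, "num_records": len(results), "results": results}
            )
        except Exception as exc:
            # Report the failure to Redshift instead of raising.
            return json.dumps({"success": False, "error_msg": str(exc)})

    return handler
```

Passing the decryption function in keeps the batching logic separate from key handling, so it can be tested without access to Secrets Manager.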

Prerequisites

To deploy the solution, make sure to complete the following prerequisites:

  • Have an AWS account. For this post, you configure the required AWS resources using AWS CloudFormation in the us-east-2 Region.
  • Have an IAM user with permissions to manage AWS resources including Amazon S3, AWS Glue, Amazon Redshift, Secrets Manager, Lambda, and AWS Cloud9.

Deploy the solution using AWS CloudFormation

Provision the required AWS resources using a CloudFormation template by completing the following steps:

  1. Sign in to your AWS account.
  2. Choose Launch Stack:
    Launch Button
  3. Navigate to an AWS Region (for example, us-east-2).
  4. For Stack name, enter a name for the stack or leave as default (aws-blog-redshift-column-level-encryption).
  5. For RedshiftMasterUsername, enter a user name for the admin user account of the Amazon Redshift cluster or leave as default (master).
  6. For RedshiftMasterUserPassword, enter a strong password for the admin user account of the Amazon Redshift cluster.
  7. Select I acknowledge that AWS CloudFormation might create IAM resources.
  8. Choose Create stack.
    Create CloudFormation stack

The CloudFormation stack creation process takes around 5–10 minutes to complete.

  1. When the stack creation is complete, on the stack Outputs tab, record the values of the following:
    1. AWSCloud9IDE
    2. AmazonS3BucketForDataUpload
    3. IAMRoleForRedshiftLambdaUDF
    4. LambdaFunctionName

CloudFormation stack output

Upload the sample data file to Amazon S3

To test the column-level encryption capability, you can download the sample synthetic data generated by Mockaroo. The sample dataset contains synthetic PII and sensitive fields such as phone number, email address, and credit card number. In this post, we demonstrate how to encrypt the credit card number field, but you can apply the same method to other PII fields according to your own requirements.

Sample synthetic data

An AWS Cloud9 instance is provisioned for you during the CloudFormation stack setup. You may access the instance from the AWS Cloud9 console, or by visiting the URL obtained from the CloudFormation stack output with the key AWSCloud9IDE.

CloudFormation stack output for AWSCloud9IDE

On the AWS Cloud9 terminal, copy the sample dataset to your S3 bucket by running the following command:

S3_BUCKET=$(aws s3 ls| awk '{print $3}'| grep awsblog-pii-data-input-)
aws s3 cp s3://aws-blogs-artifacts-public/artifacts/BDB-2274/pii-sample-dataset.csv s3://$S3_BUCKET/

Upload sample dataset to S3

Generate a secret and secure it using Secrets Manager

We generate a 256-bit secret to be used as the data encryption key. Complete the following steps:

  1. Create a new file in the AWS Cloud9 environment.
    Create new file in Cloud9
  2. Enter the following code snippet. We use the cryptography package to create a secret, and use the AWS SDK for Python (Boto3) to securely store the secret value with Secrets Manager:
    from cryptography.fernet import Fernet
    import boto3
    import base64
    
    key = Fernet.generate_key()
    client = boto3.client('secretsmanager')
    
    response = client.create_secret(
        Name='data-encryption-key',
        SecretBinary=base64.urlsafe_b64decode(key)
    )
    
    print(response['ARN'])

  3. Save the file with the file name generate_secret.py (or any desired name ending with .py).
    Save file in Cloud9
  4. Install the required packages by running the following pip install command in the terminal:
    pip install --user boto3
    pip install --user cryptography

  5. Run the Python script via the following command to generate the secret:
    python generate_secret.py

    Run Python script
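
The script decodes the generated key before storing it because Fernet keys are URL-safe base64 text, while the secret is stored as raw bytes. The following standard-library sketch builds a key of the same shape (32 random bytes, base64 encoded) to show that the decoded value is 256 bits long. It is an illustration of the encoding, not a replacement for the script above.

```python
import base64
import secrets

# Same shape as a Fernet key: 32 random bytes, URL-safe base64 encoded.
encoded_key = base64.urlsafe_b64encode(secrets.token_bytes(32))

# Decoding recovers the raw bytes that would be stored as SecretBinary.
raw_key = base64.urlsafe_b64decode(encoded_key)
key_bits = len(raw_key) * 8
```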

Create a target table in Amazon Redshift

A single-node Amazon Redshift cluster is provisioned for you during the CloudFormation stack setup. To create the target table for storing the dataset with encrypted PII columns, complete the following steps:

  1. On the Amazon Redshift console, navigate to the list of provisioned clusters, and choose your cluster.
    Amazon Redshift console
  2. To connect to the cluster, on the Query data drop-down menu, choose Query in query editor v2.
    Connect with Query Editor v2
  3. If this is the first time you’re using the Amazon Redshift Query Editor v2, accept the default setting by choosing Configure account.
    Configure account
  4. To connect to the cluster, choose the cluster name.
    Connect to Amazon Redshift cluster
  5. For Database, enter demodb.
  6. For User name, enter master.
  7. For Password, enter your password.

You may need to change the user name and password according to your CloudFormation settings.

  1. Choose Create connection.
    Create Amazon Redshift connection
  2. In the query editor, run the following DDL command to create a table named pii_table:
    CREATE TABLE pii_table(
      id BIGINT,
      full_name VARCHAR(50),
      gender VARCHAR(10),
      job_title VARCHAR(50),
      spoken_language VARCHAR(50),
      contact_phone_number VARCHAR(20),
      email_address VARCHAR(50),
      registered_credit_card VARCHAR(50)
    );

We recommend using the smallest possible column size as a best practice, and you may need to modify these table definitions per your specific use case. Creating columns much larger than necessary will have an impact on the size of data tables and affect query performance.

Create Amazon Redshift table

Create the source and destination Data Catalog tables in AWS Glue

The CloudFormation stack provisioned two AWS Glue data crawlers: one for the Amazon S3 data source and one for the Amazon Redshift data source. To run the crawlers, complete the following steps:

  1. On the AWS Glue console, choose Crawlers in the navigation pane.
    AWS Glue Crawlers
  2. Select the crawler named glue-s3-crawler, then choose Run crawler to trigger the crawler job.
    Run Amazon S3 crawler job
  3. Select the crawler named glue-redshift-crawler, then choose Run crawler.
    Run Amazon Redshift crawler job

When the crawlers are complete, navigate to the Tables page to verify your results. You should see two tables registered under the demodb database.

AWS Glue database tables

Author an AWS Glue ETL job to perform data encryption

An AWS Glue job is provisioned for you as part of the CloudFormation stack setup, but the extract, transform, and load (ETL) script has not been created. We create and upload the ETL script to the /glue-script folder under the provisioned S3 bucket in order to run the AWS Glue job.

  1. Return to your AWS Cloud9 environment either via the AWS Cloud9 console, or by visiting the URL obtained from the CloudFormation stack output with the key AWSCloud9IDE.
    CloudFormation stack output for AWSCloud9IDE

We use the Miscreant package to implement deterministic encryption with the AES-SIV encryption algorithm, which means that for any given plain text value, the generated encrypted value will always be the same. The benefit of this approach is that it allows point lookups, equality joins, grouping, and indexing on encrypted columns. However, you should also be aware of the potential security implications of applying deterministic encryption to low-cardinality data, such as gender, boolean values, and status flags: because identical plain text values produce identical encrypted values, the frequency of each value remains visible.
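
The trade-off can be seen in a small standard-library sketch. It uses HMAC-SHA256 as a keyed, deterministic stand-in for AES-SIV, so unlike the real encryption it is not reversible; the key and sample values are made up. The point is only that equal inputs map to equal outputs.

```python
import hashlib
import hmac
from collections import Counter


def deterministic_token(key, plaintext):
    """Keyed, deterministic stand-in for AES-SIV without a nonce (not reversible)."""
    return hmac.new(key, plaintext.encode(), hashlib.sha256).hexdigest()


key = b"example-key-for-illustration-only"
genders = ["F", "M", "F", "F", "M"]
tokens = [deterministic_token(key, g) for g in genders]

# Equal plain text gives equal tokens, which is what enables joins and grouping.
same_value_same_token = tokens[0] == tokens[2]

# The token frequencies mirror the plain text frequencies, which is the leak.
token_counts = sorted(Counter(tokens).values())
```

With only two distinct values, anyone who can read the column can tell which rows share a value and how common each value is, even without the key.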

  1. Create a new file in the AWS Cloud9 environment and enter the following code snippet:
    import sys
    from awsglue.transforms import *
    from awsglue.utils import getResolvedOptions
    from pyspark.context import SparkContext
    from awsglue.context import GlueContext
    from awsglue.job import Job
    from awsglue.dynamicframe import DynamicFrameCollection
    from awsglue.dynamicframe import DynamicFrame
    
    import boto3
    import base64
    from miscreant.aes.siv import SIV
    from pyspark.sql.functions import udf, col
    from pyspark.sql.types import StringType
    
    args = getResolvedOptions(sys.argv, ["JOB_NAME", "SecretName", "InputTable"])
    sc = SparkContext()
    glueContext = GlueContext(sc)
    spark = glueContext.spark_session
    job = Job(glueContext)
    job.init(args["JOB_NAME"], args)
    
    # retrieve the data encryption key from Secrets Manager
    secret_name = args["SecretName"]
    
    sm_client = boto3.client('secretsmanager')
    get_secret_value_response = sm_client.get_secret_value(SecretId = secret_name)
    data_encryption_key = get_secret_value_response['SecretBinary']
    siv = SIV(data_encryption_key)  # Without nonce, the encryption becomes deterministic
    
    # define the data encryption function
    def pii_encrypt(value):
        if value is None:
            value = ""
        ciphertext = siv.seal(value.encode())
        return base64.b64encode(ciphertext).decode('utf-8')
    
    # register the data encryption function as Spark SQL UDF   
    udf_pii_encrypt = udf(lambda z: pii_encrypt(z), StringType())
    
    # define the Glue Custom Transform function
    def Encrypt_PII (glueContext, dfc) -> DynamicFrameCollection:
        newdf = dfc.select(list(dfc.keys())[0]).toDF()
        
        # PII fields to be encrypted
        pii_col_list = ["registered_credit_card"]
    
        for pii_col_name in pii_col_list:
            newdf = newdf.withColumn(pii_col_name, udf_pii_encrypt(col(pii_col_name)))
    
        encrypteddyc = DynamicFrame.fromDF(newdf, glueContext, "encrypted_data")
        return (DynamicFrameCollection({"CustomTransform0": encrypteddyc}, glueContext))
    
    # Script generated for node S3 bucket
    S3bucket_node1 = glueContext.create_dynamic_frame.from_catalog(
        database="demodb",
        table_name=args["InputTable"],
        transformation_ctx="S3bucket_node1",
    )
    
    # Script generated for node ApplyMapping
    ApplyMapping_node2 = ApplyMapping.apply(
        frame=S3bucket_node1,
        mappings=[
            ("id", "long", "id", "long"),
            ("full_name", "string", "full_name", "string"),
            ("gender", "string", "gender", "string"),
            ("job_title", "string", "job_title", "string"),
            ("spoken_language", "string", "spoken_language", "string"),
            ("contact_phone_number", "string", "contact_phone_number", "string"),
            ("email_address", "string", "email_address", "string"),
            ("registered_credit_card", "long", "registered_credit_card", "string"),
        ],
        transformation_ctx="ApplyMapping_node2",
    )
    
    # Custom Transform
    Customtransform_node = Encrypt_PII(glueContext, DynamicFrameCollection({"ApplyMapping_node2": ApplyMapping_node2}, glueContext))
    
    # Script generated for node Redshift Cluster
    RedshiftCluster_node3 = glueContext.write_dynamic_frame.from_catalog(
        frame=Customtransform_node,
        database="demodb",
        table_name="demodb_public_pii_table",
        redshift_tmp_dir=args["TempDir"],
        transformation_ctx="RedshiftCluster_node3",
    )
    
    job.commit()

  2. Save the script with the file name pii-data-encryption.py.
    Save file in Cloud9
  3. Copy the script to the desired S3 bucket location by running the following command:
    S3_BUCKET=$(aws s3 ls| awk '{print $3}'| grep awsblog-pii-data-input-)
    aws s3 cp pii-data-encryption.py s3://$S3_BUCKET/glue-script/pii-data-encryption.py

    Upload AWS Glue script to S3

  4. To verify the script is uploaded successfully, navigate to the Jobs page on the AWS Glue console. You should be able to find a job named pii-data-encryption-job.
    AWS Glue console
  5. Choose Run to trigger the AWS Glue job. It first reads the source data from the S3 bucket registered in the AWS Glue Data Catalog, then applies column mappings to transform the data into the expected data types, encrypts the PII fields, and finally loads the encrypted data into the target Redshift table. The whole process should complete within 5 minutes for this sample dataset.
    AWS Glue job script
    You can switch to the Runs tab to monitor the job status.
    Monitor AWS Glue job

Configure a Lambda function to perform data decryption

A Lambda function with the data decryption logic is deployed for you during the CloudFormation stack setup. You can find the function on the Lambda console.

AWS Lambda console

The following is the Python code used in the Lambda function:

import boto3
import os
import json
import base64
import logging
from miscreant.aes.siv import SIV

logger = logging.getLogger()
logger.setLevel(logging.INFO)

secret_name = os.environ['DATA_ENCRYPT_KEY']

sm_client = boto3.client('secretsmanager')
get_secret_value_response = sm_client.get_secret_value(SecretId = secret_name)
data_encryption_key = get_secret_value_response['SecretBinary']

siv = SIV(data_encryption_key)  # Without nonce, the encryption becomes deterministic

# define lambda function logic
def lambda_handler(event, context):
    ret = dict()
    res = []
    for argument in event['arguments']:
        encrypted_value = argument[0]
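        try:
            # perform decryption and decode the plaintext bytes to str
            de_val = siv.open(base64.b64decode(encrypted_value)).decode('utf-8')
        except Exception:
            # fall back to the original value, which is already a str (or None)
            de_val = encrypted_value
            logger.warning('Decryption for value failed: ' + str(encrypted_value))
        res.append(json.dumps(de_val))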

    ret['success'] = True
    ret['results'] = res

    return json.dumps(ret) # return decrypted results

If you want to deploy the Lambda function on your own, make sure to include the Miscreant package in your deployment package.
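
The handler above follows the batch contract that Amazon Redshift uses for Lambda UDFs: each row arrives as an inner list in event['arguments'], and the response carries one result per row, in the same order. The following is a minimal sketch of that contract only. It assumes a stand-in fake_decrypt function (plain base64 decoding, not encryption) in place of the SIV cipher so that it runs without Miscreant or Secrets Manager; build_response and fake_decrypt are hypothetical names, not part of the deployed function.

```python
import base64
import json


def fake_decrypt(value):
    """Stand-in for siv.open(); it only base64-decodes. This is NOT encryption."""
    return base64.b64decode(value, validate=True).decode("utf-8")


def build_response(event, decrypt=fake_decrypt):
    """Return the JSON string that a Lambda UDF hands back to Amazon Redshift."""
    results = []
    for row in event["arguments"]:
        value = row[0]  # this UDF takes a single argument per row
        try:
            results.append(decrypt(value))
        except Exception:
            # Keep row order and row count intact by falling back to the input.
            results.append(value)
    return json.dumps({"success": True, "results": results})


# One decryptable value, one NULL, and one value that is not valid base64.
sample_event = {"arguments": [[base64.b64encode(b"4111").decode()], [None], ["%%%"]]}
response = json.loads(build_response(sample_event))
```

Returning one entry per input row, even when decryption fails, keeps the result list aligned with the rows that Amazon Redshift sent.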

Register a Lambda UDF in Amazon Redshift

You can create Lambda UDFs that use custom functions defined in Lambda as part of your SQL queries. Lambda UDFs are managed in Lambda, and you can control the access privileges to invoke these UDFs in Amazon Redshift.

  1. Navigate back to the Amazon Redshift Query Editor V2 to register the Lambda UDF.
  2. Use the CREATE EXTERNAL FUNCTION command and provide an IAM role that the Amazon Redshift cluster is authorized to assume and make calls to Lambda:
    CREATE OR REPLACE EXTERNAL FUNCTION pii_decrypt (value varchar(max))
    RETURNS varchar STABLE
    LAMBDA '<--Replace-with-your-lambda-function-name-->'
    IAM_ROLE '<--Replace-with-your-redshift-lambda-iam-role-arn-->';

You can find the Lambda name and Amazon Redshift IAM role on the CloudFormation stack Outputs tab:

  • LambdaFunctionName
  • IAMRoleForRedshiftLambdaUDF

CloudFormation stack output
Create External Function in Amazon Redshift

Validate the column-level encryption functionality in Amazon Redshift

By default, permission to run new Lambda UDFs is granted to PUBLIC. To restrict usage of the newly created UDF, revoke the permission from PUBLIC and then grant the privilege to specific users or groups. To learn more about Lambda UDF security and privileges, see Managing Lambda UDF security and privileges.

You must be a superuser or have the sys:secadmin role to run the following SQL statements:

GRANT SELECT ON "demodb"."public"."pii_table" TO PUBLIC;
CREATE USER regular_user WITH PASSWORD '1234Test!';
CREATE USER privileged_user WITH PASSWORD '1234Test!';
REVOKE EXECUTE ON FUNCTION pii_decrypt(varchar) FROM PUBLIC;
GRANT EXECUTE ON FUNCTION pii_decrypt(varchar) TO privileged_user;

First, we run a SELECT statement to verify that our highly sensitive data field, in this case the registered_credit_card column, is now encrypted in the Amazon Redshift table:

SELECT * FROM "demodb"."public"."pii_table";

Select statement

Regular database users who have not been granted permission to use the Lambda UDF see a permission denied error when they try to use the pii_decrypt() function:

SET SESSION AUTHORIZATION regular_user;
SELECT *, pii_decrypt(registered_credit_card) AS decrypted_credit_card FROM "demodb"."public"."pii_table";

Permission denied

Privileged database users who have been granted permission to use the Lambda UDF can issue a SQL statement that uses the pii_decrypt() function to decrypt the data:

SET SESSION AUTHORIZATION privileged_user;
SELECT *, pii_decrypt(registered_credit_card) AS decrypted_credit_card FROM "demodb"."public"."pii_table";

The original registered_credit_card values can be successfully retrieved, as shown in the decrypted_credit_card column.

Decrypted results

Cleaning up

To avoid incurring future charges, make sure to clean up all the AWS resources that you created as part of this post.

You can delete the CloudFormation stack on the AWS CloudFormation console or via the AWS Command Line Interface (AWS CLI). The default stack name is aws-blog-redshift-column-level-encryption.

Conclusion

In this post, we demonstrated how to implement a custom column-level encryption solution for Amazon Redshift, which provides an additional layer of protection for sensitive data stored on the cloud data warehouse. The CloudFormation template gives you an easy way to set up the data pipeline, which you can further customize for your specific business scenarios. You can also modify the AWS Glue ETL code to encrypt multiple data fields at the same time, and to use different data encryption keys for different columns for enhanced data security. With this solution, you can limit the occasions where human actors can access sensitive data stored in plain text on the data warehouse.

You can learn more about this solution and the source code by visiting the GitHub repository. To learn more about how to use Amazon Redshift UDFs to solve different business problems, refer to Example uses of user-defined functions (UDFs) and Amazon Redshift UDFs.


About the Author

Aaron Chong is an Enterprise Solutions Architect at Amazon Web Services Hong Kong. He specializes in the data analytics domain, and works with a wide range of customers to build big data analytics platforms, modernize data engineering practices, and advocate AI/ML democratization.

Create threshold alerts on tables and pivot tables in Amazon QuickSight

Post Syndicated from Lillie Atkins original https://aws.amazon.com/blogs/big-data/create-threshold-alerts-on-tables-and-pivot-tables-in-amazon-quicksight/

Amazon QuickSight previously launched threshold alerts on KPIs and gauge charts. Now, QuickSight supports creating threshold alerts on tables and pivot tables—our most popular visual types. This allows readers and authors to track goals or key performance indicators (KPIs) and be notified via email when they are met. These alerts allow readers and authors to relax and rely on notifications for when their attention is needed. In this post, we share how to create threshold alerts on tables or pivot tables to track important metrics.

Background information

Threshold alerts are a QuickSight Enterprise Edition feature and available for dashboards consumed on the QuickSight website. Threshold alerts aren’t yet available in embedded QuickSight dashboards or on the mobile app.

Alerts are created based on the visual at that point in time and are not affected by potential future changes to the visual’s design. This means the visual can be changed or deleted and the alert continues to work as long as the data in the dataset remains valid. In addition, you can create multiple alerts off of one visual, and rename them as appropriate.

Finally, alerts respect RLS and CLS rules.

Set up an alert on a table or pivot table

Threshold alerts are configured for dashboards. On a dashboard, there are three different ways to create an alert on a table or pivot table.

First, you can create an alert directly from a table or pivot table. Choose the cell you want to create an alert on (if another action is enabled on the visual, you may have to right-click to see this option). The cell must contain a numeric value; alerts can’t be created on dates or strings. Then choose Create Alert to start creating the alert.

Let’s assume you want to track the profit coming from online purchases for auto-related merchandise being shipped first class. Choose the appropriate cell and then choose Create Alert.

Create Alert

You’re presented with the creation pane for alerts. The only difference from KPIs or gauge visual alerts is that here you’ll find the other dimensions in the row that you’re creating the alert on. This will help you identify what value from the table you have selected, because there can be duplicates of the numeric values.

In the following screenshot, the value to track is profit, which currently is $437.39. This is the value that will be compared to the threshold you set. You will also see the dimensions being used to define this alert, which are taken from the row of the table. In this case, the Category is Auto, the Segment is Online, and the Ship Mode is First Class.

Now that you have checked that the value is correct, you can update the name of the alert (which is automatically filled with the name of the visual it was created from), set the condition (Is above, Is below, or Is equal to), and pick the threshold value, the notification frequency, and whether you want to be emailed when there is no data.

In the following example, the alert has been configured so that you will receive an email when the profit is above the threshold of $1,000. You’ve also left the notification frequency at Daily at most and haven’t requested to be emailed when there is no data.
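
To make the three conditions concrete, the following sketch shows how a tracked value could be compared against a threshold. It is an illustration only, not how QuickSight evaluates alerts internally; the function name is_breached is hypothetical, and the condition labels are the ones shown in the creation pane.

```python
import operator

# Condition labels as they appear in the alert creation pane.
CONDITIONS = {
    "Is above": operator.gt,
    "Is below": operator.lt,
    "Is equal to": operator.eq,
}


def is_breached(value, condition, threshold):
    """Return True when the tracked value satisfies the alert condition."""
    return CONDITIONS[condition](value, threshold)


# The profit cell from the example ($437.39) against the $1,000 threshold.
current_profit_breach = is_breached(437.39, "Is above", 1000)
```

With the values from the example, the profit is still below the threshold, so this alert would not notify you yet.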

If you have a date field, you will also see an option to control the date. This automatically sets the date field to the most recent value of whatever aggregation you’re looking at, such as hour, day, week, or month. However, you can override this to use the specific date applied to the value you have selected if you prefer.

Below is an example where the data was aggregated based on the week and so Latest Week has been selected rather than the historical Week of Jan 4, 2015.

You can then choose Save if you’re happy with the alert and it will load the Manage alert pane.

The Create Alert button is also at the bottom of the pane. This is the second way you can start creating an alert off of a table or pivot table.

You can also get to this pane from the upper right alert button on the dashboard.

Create Alert through the icon on dashboard

If you have no alerts, this will automatically drop you into the creation pane. There you will be asked to select a visual that supports alerts to begin creating an alert. If you already have alerts (as previously demonstrated), then all you need to do is choose Create Alert.

Then select a visual and choose Next.

You’re prompted to select a cell if you have picked a table or pivot table visual.

Then you repeat the same steps as creating off a cell within a table or pivot table.

Finally, you can start creating an alert from the bell icon on the pivot table or table. This is the third way to create an alert.

bell icon

You’ll be prompted to select a cell from the table, and the creation pane appears.

After you choose the cell that you want to track, you start the creation process just like the first two examples.

Update and delete alerts

To update or delete an alert, you need to navigate back to the Manage alerts pane. You get there from the bell icon on the top right corner of the dashboard.

Create Alert through the icon on dashboard

You can then choose the options menu (three dots) on the alert you want to manage. Three options appear: Edit alert, View history (to view recent times the alert has breached and notified you), and Delete.

Notifications

You’ll receive an email when your alert breaches the rule you set. The following is an example of what that looks like (the alert has been adjusted to trigger when profit is over $100 and to notify you as frequently as possible).

notification if alert is breached

The current profit breach is highlighted and the historical numbers are shown along with the date and time of the recorded breaches. You can also navigate to the dashboard by choosing View Dashboard.

Evaluate alerts

The evaluation schedule for threshold alerts is based on the dataset. For SPICE datasets, alert rules are checked against the data after a successful data refresh. With datasets querying your data sources directly, alerts are evaluated daily at a random time between 6:00 PM and 8:00 AM. This is based on the time zone of the AWS Region your dataset was created in. Dataset owners can set up their own schedules for checking alerts and increase the frequency up to hourly (to learn more, refer to Working with threshold alerts in Amazon QuickSight).

Restrict alerts

The admin for the QuickSight account can restrict who has access to set threshold alerts through custom permissions. For more information, see the section Customizing user permissions in Embed multi-tenant analytics in applications with Amazon QuickSight.

Pricing

Threshold alerts are billed for each evaluation, and follow the familiar pricing used for anomaly detection, starting at $0.50 per 1,000 evaluations. For example, if you set up an alert on a SPICE dataset that refreshes daily, you have 30 evaluations of the alert rule in a month, which costs 30 * $0.5/1000 = $0.015 in a month. For more information, refer to Amazon QuickSight Pricing.
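
The arithmetic above can be reproduced with a few lines of Python. This sketch assumes the starting rate of $0.50 per 1,000 evaluations applies to every evaluation and a 30-day month; the function name monthly_alert_cost is hypothetical, and actual charges depend on the current pricing page.

```python
def monthly_alert_cost(evaluations_per_month, price_per_thousand=0.50):
    """Cost in USD at the starting rate quoted in this post."""
    return evaluations_per_month * price_per_thousand / 1000


daily_refresh = monthly_alert_cost(30)       # one evaluation per day
hourly_checks = monthly_alert_cost(24 * 30)  # hourly schedule, 30-day month
```

Under these assumptions, one alert checked hourly produces 720 evaluations in a month, which is 24 times the cost of the daily example.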

Conclusion

In this post, you learned how to create threshold alerts on tables and pivot tables within QuickSight dashboards so that you can track important metrics. For more information about how to create threshold alerts on KPIs or gauge charts, refer to Create threshold-based alerts in Amazon QuickSight. Additional information is available in the Amazon QuickSight User Guide.


About the Author

Lillie Atkins is a Product Manager for Amazon QuickSight, Amazon Web Services’ cloud-native, fully managed BI service.

Generic orchestration framework for data warehousing workloads using Amazon Redshift RSQL

Post Syndicated from Akhil B original https://aws.amazon.com/blogs/big-data/generic-orchestration-framework-for-data-warehousing-workloads-using-amazon-redshift-rsql/

Tens of thousands of customers run business-critical workloads on Amazon Redshift, AWS’s fast, petabyte-scale cloud data warehouse delivering the best price-performance. With Amazon Redshift, you can query data across your data warehouse, operational data stores, and data lake using standard SQL. You can also integrate AWS services like Amazon EMR, Amazon Athena, Amazon SageMaker, AWS Glue, AWS Lake Formation, and Amazon Kinesis to take advantage of all of the analytic capabilities in the AWS Cloud.

Amazon Redshift RSQL is a native command-line client for interacting with Amazon Redshift clusters and databases. You can connect to an Amazon Redshift cluster, describe database objects, query data, and view query results in various output formats. You can use Amazon Redshift RSQL to replace existing extract, transform, load (ETL) and automation scripts, such as Teradata BTEQ scripts. You can wrap Amazon Redshift RSQL statements within a shell script to replicate existing functionality in the on-premise systems. Amazon Redshift RSQL is available for Linux, Windows, and macOS operating systems.

This post explains how you can create a generic configuration-driven orchestration framework using AWS Step Functions, Amazon Elastic Compute Cloud (Amazon EC2), AWS Lambda, Amazon DynamoDB, and AWS Systems Manager to orchestrate RSQL-based ETL workloads. If you’re migrating from legacy data warehouse workloads to Amazon Redshift, you can use this methodology to orchestrate your data warehousing workloads.

Solution overview

Customers migrating from legacy data warehouses to Amazon Redshift may have a significant investment in proprietary scripts like Basic Teradata Query (BTEQ) scripting for database automation, ETL, or other tasks. You can now use the AWS Schema Conversion Tool (AWS SCT) to automatically convert proprietary scripts like BTEQ scripts to Amazon Redshift RSQL scripts. The converted scripts run on Amazon Redshift with little to no changes. To learn about new options for database scripting, refer to Accelerate your data warehouse migration to Amazon Redshift – Part 4.

During such migrations, you may also want to modernize your current on-premises, third-party orchestration tools with a cloud-native framework to replicate and enhance your current orchestration capability. Orchestrating data warehouse workloads includes scheduling the jobs, checking if the pre-conditions have been met, running the business logic embedded within RSQL, monitoring the status of the jobs, and alerting if there are any failures.

This solution allows on-premises customers to migrate to a cloud-native orchestration framework that uses AWS serverless services such as Step Functions, Lambda, DynamoDB, and Systems Manager to run the Amazon Redshift RSQL jobs deployed on a persistent EC2 instance. You can also deploy the solution for greenfield implementations. In addition to meeting functional requirements, this solution also provides full auditing, logging, and monitoring of all ETL and ELT processes that are run.

To ensure high availability and resilience, you can use multiple EC2 instances that are a part of an auto scaling group along with Amazon Elastic File System (Amazon EFS) to deploy and run the RSQL jobs. When using auto scaling groups, you can install RSQL onto the EC2 instance as a part of the bootstrap script. You can also deploy the Amazon Redshift RSQL scripts onto the EC2 instance using AWS CodePipeline and AWS CodeDeploy. For more details, refer to Auto Scaling groups, the Amazon EFS User Guide, and Integrating CodeDeploy with Amazon EC2 Auto Scaling.

The following diagram illustrates the architecture of the orchestration framework.

Architecture Diagram

The key components of the framework are as follows:

  1. Amazon EventBridge is used as the ETL workflow scheduler, and it triggers a Lambda function at a preset schedule.
  2. The function queries a DynamoDB table for the configuration associated to the RSQL job and queries the status of the job, run mode, and restart information for that job.
  3. After receiving the configuration, the function triggers a Step Functions state machine by passing the configuration details.
  4. Step Functions starts running different stages (like configuration iteration, run type check, and more) of the workflow.
  5. Step Functions uses the Systems Manager SendCommand API to trigger the RSQL job and goes into a paused state with TaskToken. The RSQL scripts are persisted on an EC2 instance and are wrapped in a shell script. Systems Manager runs an AWS-RunShellScript SSM document to run the RSQL job on the EC2 instance.
  6. The RSQL job performs ETL and ELT operations on the Amazon Redshift cluster. When it’s complete, it returns a success/failure code and status message back to the calling shell script.
  7. The shell script calls a custom Python module with the success/failure code, status message, and the callwait TaskToken that was received from Step Functions. The Python module logs the RSQL job status in the job audit DynamoDB table, and exports logs to the Amazon CloudWatch log group.
  8. The Python module then performs a SendTaskSuccess or SendTaskFailure API call based on the RSQL job run status. Based on the status of the RSQL job, Step Functions either resumes the flow or stops with failure.
  9. Step Functions logs the workflow status (success or failure) in the DynamoDB workflow audit table.

Prerequisites

You should have the following prerequisites:

Deploy AWS CDK stacks

Complete the following steps to deploy your resources using the AWS CDK:

  1. Clone the GitHub repo:
    git clone https://github.com/aws-samples/amazon-redshift-rsql-orchestration-framework.git

  2. Update the following environment parameters in cdk.json (this file can be found in the infra directory):
    1. ec2_instance_id – The EC2 instance ID on which RSQL jobs are deployed
    2. redshift_secret_id – The name of the Secrets Manager key that stores the Amazon Redshift database credentials
    3. rsql_script_path – The absolute directory path in the EC2 instance where the RSQL jobs are stored
    4. rsql_log_path – The absolute directory path in the EC2 instance used for storing the RSQL job logs
    5. rsql_script_wrapper – The absolute directory path of the RSQL wrapper script (rsql_trigger.sh) on the EC2 instance.

    The following is a sample cdk.json file after being populated with the parameters:

        "environment": {
          "ec2_instance_id" : "i-xxxx",
          "redshift_secret_id" : "blog-secret",
          "rsql_script_path" : "/home/ec2-user/blog_test/rsql_scripts/",
          "rsql_log_path" : "/home/ec2-user/blog_test/logs/",
          "rsql_script_wrapper" : "/home/ec2-user/blog_test/instance_code/rsql_trigger.sh"
        }
    

  3. Deploy the AWS CDK stack with the following code:
    cd amazon-redshift-rsql-orchestration-framework/lambdas/lambda-layer/
    sh zip_lambda_layer.sh
    cd ../../infra/
    python3 -m venv .venv
    source .venv/bin/activate
    pip install -r requirements.txt
    cdk bootstrap <AWS Account ID>/<AWS Region>
    cdk deploy --all

Let’s look at the resources the AWS CDK stack deploys in more detail.

CloudWatch log group

A CloudWatch log group (/ops/rsql-logs/) is created, which is used to store, monitor, and access log files from EC2 instances and other sources.

The log group is used to store the RSQL job run logs. For each RSQL script, all the stdout and stderr logs are stored as a log stream within this log group.

DynamoDB configuration table

The DynamoDB configuration table (rsql-blog-rsql-config-table) is the basic building block of this solution. All the RSQL jobs, restart information and run mode (sequential or parallel), and sequence in which the jobs are to be run are stored in this configuration table.

The table has the following structure:

  • workflow_id – The identifier for the RSQL-based ETL workflow.
  • workflow_description – The description for the RSQL-based ETL workflow.
  • workflow_stages – The sequence of stages within a workflow.
  • execution_type – The type of run for RSQL jobs (sequential or parallel).
  • stage_description – The description for the stage.
  • scripts – The list of RSQL scripts to be run. The RSQL scripts must be placed in the location defined in a later step.

The following is an example of an entry in the configuration table. You can see the workflow_id is blog_test_workflow and the description is Test Workflow for Blog.

It has three stages that are triggered in the following order: Schema & Table Creation Stage, Data Insertion Stage 1, and Data Insertion Stage 2. The stage Schema & Table Creation Stage has two RSQL jobs running sequentially, and Data Insertion Stage 1 and Data Insertion Stage 2 each have two jobs running in parallel.

{
	"workflow_id": "blog_test_workflow",
	"workflow_description": "Test Workflow for Blog",
	"workflow_stages": [{
			"execution_flag": "y",
			"execution_type": "sequential",
			"scripts": [
				"rsql_blog_script_1.sh",
				"rsql_blog_script_2.sh"
			],
			"stage_description": "Schema & Table Creation Stage"
		},
		{
			"execution_flag": "y",
			"execution_type": "parallel",
			"scripts": [
				"rsql_blog_script_3.sh",
				"rsql_blog_script_4.sh"
			],
			"stage_description": "Data Insertion Stage 1"
		},
		{
			"execution_flag": "y",
			"execution_type": "parallel",
			"scripts": [
				"rsql_blog_script_5.sh",
				"rsql_blog_script_6.sh"
			],
			"stage_description": "Data Insertion Stage 2"
		}
	]
}
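
Before inserting an entry like this into the configuration table, it can help to check its shape. The following is a minimal sketch of such a check, not part of the framework itself. It assumes every stage needs the four keys shown in the example (including execution_flag, which appears in the example but not in the structure list), and the name validate_workflow_config and the deliberately broken second stage are hypothetical.

```python
import json

REQUIRED_STAGE_KEYS = {"execution_flag", "execution_type", "scripts", "stage_description"}
VALID_EXECUTION_TYPES = {"sequential", "parallel"}


def validate_workflow_config(config):
    """Return a list of problems found in a configuration entry (empty if none)."""
    problems = []
    if not config.get("workflow_id"):
        problems.append("missing workflow_id")
    for index, stage in enumerate(config.get("workflow_stages", [])):
        missing = REQUIRED_STAGE_KEYS - stage.keys()
        if missing:
            problems.append(f"stage {index}: missing {sorted(missing)}")
        if stage.get("execution_type") not in VALID_EXECUTION_TYPES:
            problems.append(f"stage {index}: bad execution_type")
        if not stage.get("scripts"):
            problems.append(f"stage {index}: no scripts listed")
    return problems


# The first stage mirrors the example entry; the second is intentionally broken.
example = json.loads("""
{
  "workflow_id": "blog_test_workflow",
  "workflow_stages": [
    {"execution_flag": "y", "execution_type": "sequential",
     "scripts": ["rsql_blog_script_1.sh", "rsql_blog_script_2.sh"],
     "stage_description": "Schema & Table Creation Stage"},
    {"execution_flag": "y", "execution_type": "paralel",
     "scripts": [], "stage_description": "Broken stage"}
  ]
}
""")
problems = validate_workflow_config(example)
```

A check like this catches a misspelled execution_type or an empty scripts list before the workflow is triggered.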

DynamoDB audit tables

The audit tables store the run details for each RSQL job within the ETL workflow with a unique identifier for monitoring and reporting purposes. There are two audit tables: one stores the audit information at the RSQL job level, and the other stores it at the workflow level.

The job audit table (rsql-blog-rsql-job-audit-table) has the following structure:

  • job_name – The name of the RSQL script
  • workflow_execution_id – The run ID for the workflow
  • execution_start_ts – The start timestamp for the RSQL job
  • execution_end_ts – The end timestamp for the RSQL job
  • execution_status – The run status of the RSQL job (Running, Completed, Failed)
  • instance_id – The EC2 instance ID on which the RSQL job is run
  • ssm_command_id – The Systems Manager command ID used to trigger the RSQL job
  • workflow_id – The workflow_id under which the RSQL job is run

The workflow audit table (rsql-blog-rsql-workflow-audit-table) has the following structure:

  • workflow_execution_id – The run ID for the workflow
  • workflow_id – The identifier for a particular workflow
  • execution_start_ts – The start timestamp for the workflow
  • execution_status – The run status of the workflow or state machine (Running, Completed, Failed)
  • rsql_jobs – The list of RSQL scripts that are a part of the workflow
  • execution_end_ts – The end timestamp for the workflow
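
To show how the job audit attributes fit together over the life of a job, the following sketch builds an item in the Running state and then closes it. It is an illustration under assumptions: the ISO 8601 timestamp format and the function names new_job_audit_item and close_job_audit_item are hypothetical, and the framework's own Python module may store these values differently.

```python
from datetime import datetime, timezone


def new_job_audit_item(job_name, workflow_id, workflow_execution_id,
                       instance_id, ssm_command_id, now=None):
    """Build a job audit item in the 'Running' state."""
    started = (now or datetime.now(timezone.utc)).isoformat()
    return {
        "job_name": job_name,
        "workflow_execution_id": workflow_execution_id,
        "workflow_id": workflow_id,
        "execution_start_ts": started,
        "execution_end_ts": None,
        "execution_status": "Running",
        "instance_id": instance_id,
        "ssm_command_id": ssm_command_id,
    }


def close_job_audit_item(item, succeeded, now=None):
    """Return a copy of the item marked Completed or Failed."""
    ended = (now or datetime.now(timezone.utc)).isoformat()
    return {
        **item,
        "execution_end_ts": ended,
        "execution_status": "Completed" if succeeded else "Failed",
    }
```

Both audit tables use workflow_execution_id, so job-level records can be related back to the workflow run they belong to.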

Lambda functions

The AWS CDK creates the Lambda functions that retrieve the config data from the DynamoDB config table, update the audit details in DynamoDB, trigger the RSQL scripts on the EC2 instance, and iterate through each stage. The following is a list of the functions:

  • rsql-blog-master-iterator-lambda
  • rsql-blog-parallel-load-check-lambda
  • rsql-blog-sequential-iterator-lambda
  • rsql-blog-rsql-invoke-lambda
  • rsql-blog-update-audit-ddb-lambda

Step Functions state machines

This solution implements a Step Functions callback task integration pattern that enables Step Functions workflows to send a token to an external system via multiple AWS services.

The AWS CDK deploys the following state machines:

  • RSQLParallelStateMachine – The parallel state machine is triggered if the execution_type for a stage in the configuration table is set to parallel. The Lambda function with a callback token is triggered in parallel for each of the RSQL scripts using a Map state.
  • RSQLSequentialStateMachine – The sequential state machine is triggered if the execution_type for a stage in the configuration table is set to sequential. This state machine uses an iterator design pattern to run each RSQL job within the stage as per the sequence mentioned in the configuration.
  • RSQLMasterStatemachine – The primary state machine iterates through each stage and triggers different state machines based on the run mode (sequential or parallel) using a Choice state.
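
The division of work between the three state machines can be simulated locally in plain Python. This is a sketch of the control flow only, not the Step Functions definitions that the AWS CDK deploys. It assumes that a stage is skipped when its execution_flag is not y and that a sequential stage stops at the first failed script; run_stage, run_workflow, and the run_script callback are hypothetical names.

```python
from concurrent.futures import ThreadPoolExecutor


def run_stage(stage, run_script):
    """Run one stage; return True only if every script succeeded."""
    scripts = stage["scripts"]
    if stage["execution_type"] == "parallel":
        # Map-state analogue: start every script, then wait for all of them.
        with ThreadPoolExecutor(max_workers=max(1, len(scripts))) as pool:
            return all(list(pool.map(run_script, scripts)))
    # Iterator analogue: run in order and stop at the first failure.
    for script in scripts:
        if not run_script(script):
            return False
    return True


def run_workflow(config, run_script):
    """Walk the stages in order, skipping any whose execution_flag is not 'y'."""
    for stage in config["workflow_stages"]:
        if stage.get("execution_flag", "y").lower() != "y":
            continue
        if not run_stage(stage, run_script):
            return "Failed"
    return "Completed"


demo_config = {
    "workflow_stages": [
        {"execution_flag": "y", "execution_type": "sequential",
         "scripts": ["rsql_blog_script_1.sh", "rsql_blog_script_2.sh"]},
        {"execution_flag": "y", "execution_type": "parallel",
         "scripts": ["rsql_blog_script_3.sh", "rsql_blog_script_4.sh"]},
    ]
}
status = run_workflow(demo_config, lambda script: True)
```

Here run_script stands in for the Lambda function that triggers an RSQL job and waits for its callback; in the demo it simply reports success for every script.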

Move the RSQL script and instance code

Copy the instance_code and rsql_scripts directories (present in the GitHub repo) to the EC2 instance. Make sure the framework directory within instance_code is copied as well.

The following screenshots show that the instance_code and rsql_scripts directories are copied to the same parent folder on the EC2 instance.

Instance Code Scripts Image
Instance Code EC2 Copy Image
RSQL Script Image
RSQL Script EC2 Copy Image

RSQL script run workflow

To further illustrate the mechanism to run the RSQL scripts, see the following diagram.

RSQL Script Workflow Diagram

The Lambda function, which gets the configuration details from the configuration DynamoDB table, triggers the Step Functions workflow, which performs the following steps:

  1. A Lambda function defined as a workflow step receives the Step Functions TaskToken and configuration details.
  2. The TaskToken and configuration details are passed onto the EC2 instance using the Systems Manager SendCommand API call. After the Lambda function is run, the workflow branch goes into paused state and waits for a callback token.
  3. The RSQL scripts run on the EC2 instance and perform ETL and ELT on Amazon Redshift. After the scripts are run, the RSQL script passes the completion status and TaskToken to a Python script. This Python script is embedded within the RSQL script.
  4. The Python script updates the RSQL job status (success/failure) in the job audit DynamoDB table. It also exports the RSQL job logs to the CloudWatch log group.
  5. The Python script passes the RSQL job status (success/failure) and the status message back to the Step Functions workflow along with TaskToken using the SendTaskSuccess or SendTaskFailure API call.
  6. Depending on the job status received, Step Functions either resumes the workflow or stops the workflow.
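
The callback in steps 5 and 6 can be sketched as follows. This is not the framework's Python module; it assumes that an exit code of 0 means success, and the function name report_job_status, the error name RSQLJobFailed, and the shape of the output payload are hypothetical. The send_task_success and send_task_failure methods and their keyword arguments are those of the boto3 Step Functions client; a small recording stand-in is used here so the sketch runs without AWS credentials.

```python
import json


def report_job_status(sfn_client, task_token, return_code, message):
    """Resume or fail the paused Step Functions task based on the RSQL exit code."""
    if return_code == 0:  # assumption: 0 means the RSQL job succeeded
        sfn_client.send_task_success(
            taskToken=task_token,
            output=json.dumps({"status": "Completed", "message": message}),
        )
        return "Completed"
    sfn_client.send_task_failure(
        taskToken=task_token,
        error="RSQLJobFailed",  # hypothetical error name
        cause=message,
    )
    return "Failed"


class RecordingClient:
    """Stand-in for boto3.client('stepfunctions') so the sketch runs offline."""

    def __init__(self):
        self.calls = []

    def send_task_success(self, **kwargs):
        self.calls.append(("send_task_success", kwargs))

    def send_task_failure(self, **kwargs):
        self.calls.append(("send_task_failure", kwargs))


client = RecordingClient()
report_job_status(client, "token-1", 0, "ok")
report_job_status(client, "token-2", 1, "table not found")
```

In a real run, you would pass boto3.client('stepfunctions') instead of the recording client, together with the TaskToken that the workflow step handed to the EC2 instance.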

If EC2 auto scaling groups are used, then you can use the Systems Manager SendCommand to ensure resilience and high availability by specifying one or more EC2 instances (that are a part of the auto scaling group). For more information, refer to Run commands at scale.

When multiple EC2 instances are used, set the max-concurrency parameter of the RunCommand API call to 1, which makes sure that the RSQL job is triggered on only one EC2 instance at a time. For further details, refer to Using concurrency controls.
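
As a rough illustration, the request for such a call could be assembled like this. The keys InstanceIds, DocumentName, Parameters, and MaxConcurrency are parameters of the Systems Manager SendCommand API; the function name build_send_command_kwargs and the way the wrapper script receives the script name and TaskToken as positional arguments are assumptions made for this sketch, not the framework's actual command line.

```python
import shlex


def build_send_command_kwargs(instance_ids, wrapper_path, script_name, task_token):
    """Assemble keyword arguments for the Systems Manager SendCommand call."""
    # How the wrapper receives its arguments is an assumption for illustration.
    parts = ["sh", wrapper_path, script_name, task_token]
    command = " ".join(shlex.quote(part) for part in parts)
    return {
        "InstanceIds": list(instance_ids),
        "DocumentName": "AWS-RunShellScript",
        "Parameters": {"commands": [command]},
        "MaxConcurrency": "1",  # the API expects this value as a string
    }


kwargs = build_send_command_kwargs(
    ["i-xxxx"],
    "/home/ec2-user/blog_test/instance_code/rsql_trigger.sh",
    "rsql_blog_script_1.sh",
    "abc123",
)
```

You could then pass the result to boto3.client('ssm').send_command(**kwargs). Quoting each part with shlex.quote keeps a TaskToken that contains shell-special characters from being interpreted by the shell.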

Run the orchestration framework

To run the orchestration framework, complete the following steps:

  1. On the DynamoDB console, navigate to the configuration table and insert the configuration details provided earlier. For instructions on how to insert the example JSON configuration details, refer to Write data to a table using the console or AWS CLI.
    DynamoDB Config Insertion
  2. On the Lambda console, open the rsql-blog-rsql-workflow-trigger-lambda function and choose Test.
    Workflow Trigger Lambda Function
  3. Add a test event similar to the following code and choose Test:
    {
    	"workflow_id": "blog_test_workflow",
    	"workflow_execution_id": "demo_test_26"
    }


  4. On the Step Functions console, navigate to the rsql-master-state-machine state machine to open the details page.
  5. Choose Edit, then choose Workflow Studio New. The following screenshot shows the primary state machine.
  6. Choose Cancel to leave Workflow Studio, then choose Cancel again to leave edit mode. You’re directed back to the details page.
  7. On the Executions tab, choose the latest run.
  8. From the Graph view, you can check the status of each state by choosing it. Every state that uses an external resource has a link to it on the Details tab.
  9. The orchestration framework runs the ETL load, which consists of the following sample RSQL scripts:
    • rsql_blog_script_1.sh – This script creates a schema rsql_blog within the database
    • rsql_blog_script_2.sh – This script creates a table blog_table within the schema created in the earlier script
    • rsql_blog_script_3.sh – This script inserts one row into blog_table
    • rsql_blog_script_4.sh – This script inserts one row into blog_table
    • rsql_blog_script_5.sh – This script inserts one row into blog_table
    • rsql_blog_script_6.sh – This script inserts one row into blog_table

You need to replace these RSQL scripts with the RSQL scripts developed for your workloads by inserting the relevant configuration details into the configuration DynamoDB table (rsql-blog-rsql-config-table).

Validation

After you run the framework, you’ll find a schema (called rsql_blog) with one table (called blog_table) created. This table consists of four rows.


You can check the logs of the RSQL job in the CloudWatch log group (/ops/rsql-logs/) and also the run status of the workflow in the workflow audit DynamoDB table (rsql-blog-rsql-workflow-audit-table).


Clean up

To avoid ongoing charges for the resources that you created, delete them. AWS CDK deletes all resources except data resources such as DynamoDB tables.

  • First, delete all AWS CDK stacks
    cdk destroy --all

  • On the DynamoDB console, select the following tables and delete them:
    • rsql-blog-rsql-config-table
    • rsql-blog-rsql-job-audit-table
    • rsql-blog-rsql-workflow-audit-table

Conclusion

You can use Amazon Redshift RSQL, Systems Manager, EC2 instances, and Step Functions to create a modern and cost-effective orchestration framework for ETL workflows. There is no overhead to create and manage different state machines for each of your ETL workflows. In this post, we demonstrated how to use this configuration-based generic orchestration framework to trigger complex RSQL-based ETL workflows.

You can also trigger an email notification through Amazon Simple Notification Service (Amazon SNS) within the state machine to notify the operations team of the completion status of the ETL process. Further, you can achieve an event-driven ETL orchestration framework by using EventBridge to start the workflow trigger Lambda function.


About the Authors

Akhil is a Data Analytics Consultant at AWS Professional Services. He helps customers design & build scalable data analytics solutions and migrate data pipelines and data warehouses to AWS. In his spare time, he loves travelling, playing games and watching movies.


Ramesh Raghupathy is a Senior Data Architect with WWCO ProServe at AWS. He works with AWS customers to architect, deploy, and migrate to data warehouses and data lakes on the AWS Cloud. While not at work, Ramesh enjoys traveling, spending time with family, and yoga.

Raza Hafeez is a Senior Data Architect within the Shared Delivery Practice of AWS Professional Services. He has over 12 years of professional experience building and optimizing enterprise data warehouses and is passionate about enabling customers to realize the power of their data. He specializes in migrating enterprise data warehouses to AWS Modern Data Architecture.

Dipal Mahajan is a Lead Consultant with Amazon Web Services based out of India, where he guides global customers to build highly secure, scalable, reliable, and cost-efficient applications on the cloud. He brings extensive experience on Software Development, Architecture and Analytics from industries like finance, telecom, retail and healthcare.

Implement slowly changing dimensions in a data lake using AWS Glue and Delta

Post Syndicated from Nith Govindasivan original https://aws.amazon.com/blogs/big-data/implement-slowly-changing-dimensions-in-a-data-lake-using-aws-glue-and-delta/

In a data warehouse, a dimension is a structure that categorizes facts and measures in order to enable users to answer business questions. To illustrate an example, in a typical sales domain, customer, time or product are dimensions and sales transactions is a fact. Attributes within the dimension can change over time—a customer can change their address, an employee can move from a contractor position to a full-time position, or a product can have multiple revisions to it. A slowly changing dimension (SCD) is a data warehousing concept that contains relatively static data that can change slowly over a period of time. There are three major types of SCDs maintained in data warehousing: Type 1 (no history), Type 2 (full history), and Type 3 (limited history). Change data capture (CDC) is a characteristic of a database that provides an ability to identify the data that changed between two database loads, so that an action can be performed on the changed data.

As organizations across the globe modernize their data platforms with data lakes on Amazon Simple Storage Service (Amazon S3), handling SCDs in data lakes can be challenging. It becomes even more challenging when source systems don’t provide a mechanism to identify the changed data for processing within the data lake, and the processing becomes highly complex if the data source is semi-structured instead of a database. The key objective when handling Type 2 SCDs is to accurately define the start and end dates for each record so that changes can be tracked within the data lake, because this provides point-in-time reporting capability for the consuming applications.

In this post, we focus on demonstrating how to identify the changed data for a semi-structured source (JSON) and capture the full historical data changes (SCD Type 2) and store them in an S3 data lake, using AWS Glue and open data lake format Delta.io. This implementation supports the following use cases:

  • Track Type 2 SCDs with start and end dates to identify the current and full historical records and a flag to identify the deleted records in the data lake (logical deletes)
  • Use consumption tools such as Amazon Athena to query historical records seamlessly

Solution overview

This post demonstrates the solution with an end-to-end use case using a sample employee dataset. The dataset represents employee details such as ID, name, address, phone number, contractor or not, and more. To demonstrate the SCD implementation, consider the following assumptions:

  • The data engineering team receives daily files that are full snapshots of records and don’t contain any mechanism to identify source record changes
  • The team is tasked with implementing SCD Type 2 functionality for identifying new, updated, and deleted records from the source, and to preserve the historical changes in the data lake
  • Because the source systems don’t provide the CDC capability, a mechanism needs to be developed to identify the new, updated, and deleted records and persist them in the data lake layer
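
To make the last assumption concrete, the following is a minimal sketch of how two consecutive full snapshots could be compared to classify records as new, updated, or deleted. It uses plain Python dictionaries rather than the AWS Glue job's Spark code, and the function name and the trimmed-down records are illustrative assumptions.

```python
def diff_snapshots(previous, current, key="emp_id"):
    """Classify records as new, updated, or deleted between two full snapshots."""
    prev_by_key = {row[key]: row for row in previous}
    curr_by_key = {row[key]: row for row in current}

    # Present today but not yesterday: new records.
    new = [row for k, row in curr_by_key.items() if k not in prev_by_key]
    # Present yesterday but not today: deleted at the source.
    deleted = [row for k, row in prev_by_key.items() if k not in curr_by_key]
    # Present in both with different attribute values: updated records.
    updated = [
        row for k, row in curr_by_key.items()
        if k in prev_by_key and row != prev_by_key[k]
    ]
    return new, updated, deleted


day1 = [
    {"emp_id": 8, "first_name": "Teresa", "isContractor": False},
    {"emp_id": 12, "first_name": "Nicholas", "isContractor": True},
    {"emp_id": 25, "first_name": "John", "isContractor": False},
]
day2 = [
    {"emp_id": 12, "first_name": "Nicholas", "isContractor": False},  # updated
    {"emp_id": 25, "first_name": "John", "isContractor": False},  # unchanged
    {"emp_id": 26, "first_name": "John-copied", "isContractor": True},  # new
]

new, updated, deleted = diff_snapshots(day1, day2)
print("new:", [r["emp_id"] for r in new])
print("updated:", [r["emp_id"] for r in updated])
print("deleted:", [r["emp_id"] for r in deleted])
```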

The architecture is implemented as follows:

  • Source systems ingest files in the S3 landing bucket (this step is mimicked by generating the sample records using the provided AWS Lambda function into the landing bucket)
  • An AWS Glue job (Delta job) picks the source data file and processes the changed data from the previous file load (new inserts, updates to the existing records, and deleted records from the source) into the S3 data lake (processed layer bucket)
  • The architecture uses the open data lake format (Delta), and builds the S3 data lake as a Delta Lake, which is mutable, because the new changes can be updated, new inserts can be appended, and source deletions can be identified accurately and marked with a delete_flag value
  • An AWS Glue crawler catalogs the data, which can be queried by Athena

The following diagram illustrates our architecture.

Prerequisites

Before you get started, make sure you have the following prerequisites:

Deploy the solution

For this solution, we provide a CloudFormation template that sets up the services included in the architecture, to enable repeatable deployments. This template creates the following resources:

  • Two S3 buckets: a landing bucket for storing sample employee data and a processed layer bucket for the mutable data lake (Delta Lake)
  • A Lambda function to generate sample records
  • An AWS Glue extract, transform, and load (ETL) job to process the source data from the landing bucket to the processed bucket

To deploy the solution, complete the following steps:

  1. Choose Launch Stack to launch the CloudFormation stack.
  2. Enter a stack name.
  3. Select I acknowledge that AWS CloudFormation might create IAM resources with custom names.
  4. Choose Create stack.

After the CloudFormation stack deployment is complete, navigate to the AWS CloudFormation console to note the following resources on the Outputs tab:

  • Data lake resources – The S3 buckets scd-blog-landing-xxxx and scd-blog-processed-xxxx (referred to as scd-blog-landing and scd-blog-processed in the subsequent sections in this post)
  • Sample records generator Lambda function – SampleDataGenaratorLambda-<CloudFormation Stack Name> (referred to as SampleDataGeneratorLambda)
  • AWS Glue Data Catalog database – deltalake_xxxxxx (referred to as deltalake)
  • AWS Glue Delta job – <CloudFormation-Stack-Name>-src-to-processed (referred to as src-to-processed)

Note that deploying the CloudFormation stack in your account incurs AWS usage charges.

Test SCD Type 2 implementation

With the infrastructure in place, you’re ready to test out the overall solution design and query historical records from the employee dataset. This post is designed to be implemented for a real customer use case, where you get full snapshot data on a daily basis. We test the following aspects of SCD implementation:

  • Run an AWS Glue job for the initial load
  • Simulate a scenario where there are no changes to the source
  • Simulate insert, update, and delete scenarios by adding new records, and modifying and deleting existing records
  • Simulate a scenario where the deleted record comes back as a new insert

Generate a sample employee dataset

To test the solution, and before you can start your initial data ingestion, the data source needs to be identified. To simplify that step, a Lambda function has been deployed in the CloudFormation stack you just deployed.

Open the function and configure a test event, with the default hello-world template event JSON as seen in the following screenshot. Provide an event name without any changes to the template and save the test event.

Choose Test to invoke a test event, which invokes the Lambda function to generate the sample records.

When the Lambda function completes its invocation, you will be able to see the following sample employee dataset in the landing bucket.

Run the AWS Glue job

Confirm that you can see the employee dataset in the path s3://scd-blog-landing/dataset/employee/. You can download the dataset and open it in a code editor such as VS Code. The following is an example of the dataset:

{"emp_id":1,"first_name":"Melissa","last_name":"Parks","Address":"19892 Williamson Causeway Suite 737\nKarenborough, IN 11372","phone_number":"001-372-612-0684","isContractor":false}
{"emp_id":2,"first_name":"Laura","last_name":"Delgado","Address":"93922 Rachel Parkways Suite 717\nKaylaville, GA 87563","phone_number":"001-759-461-3454x80784","isContractor":false}
{"emp_id":3,"first_name":"Luis","last_name":"Barnes","Address":"32386 Rojas Springs\nDicksonchester, DE 05474","phone_number":"127-420-4928","isContractor":false}
{"emp_id":4,"first_name":"Jonathan","last_name":"Wilson","Address":"682 Pace Springs Apt. 011\nNew Wendy, GA 34212","phone_number":"761.925.0827","isContractor":true}
{"emp_id":5,"first_name":"Kelly","last_name":"Gomez","Address":"4780 Johnson Tunnel\nMichaelland, WI 22423","phone_number":"+1-303-418-4571","isContractor":false}
{"emp_id":6,"first_name":"Robert","last_name":"Smith","Address":"04171 Mitchell Springs Suite 748\nNorth Juliaview, CT 87333","phone_number":"261-155-3071x3915","isContractor":true}
{"emp_id":7,"first_name":"Glenn","last_name":"Martinez","Address":"4913 Robert Views\nWest Lisa, ND 75950","phone_number":"001-638-239-7320x4801","isContractor":false}
{"emp_id":8,"first_name":"Teresa","last_name":"Estrada","Address":"339 Scott Valley\nGonzalesfort, PA 18212","phone_number":"435-600-3162","isContractor":false}
{"emp_id":9,"first_name":"Karen","last_name":"Spencer","Address":"7284 Coleman Club Apt. 813\nAndersonville, AS 86504","phone_number":"484-909-3127","isContractor":true}
{"emp_id":10,"first_name":"Daniel","last_name":"Foley","Address":"621 Sarah Lock Apt. 537\nJessicaton, NH 95446","phone_number":"457-716-2354x4945","isContractor":true}
{"emp_id":11,"first_name":"Amy","last_name":"Stevens","Address":"94661 Young Lodge Suite 189\nCynthiamouth, PR 01996","phone_number":"241.375.7901x6915","isContractor":true}
{"emp_id":12,"first_name":"Nicholas","last_name":"Aguirre","Address":"7474 Joyce Meadows\nLake Billy, WA 40750","phone_number":"495.259.9738","isContractor":true}
{"emp_id":13,"first_name":"John","last_name":"Valdez","Address":"686 Brian Forges Suite 229\nSullivanbury, MN 25872","phone_number":"+1-488-011-0464x95255","isContractor":false}
{"emp_id":14,"first_name":"Michael","last_name":"West","Address":"293 Jones Squares Apt. 997\nNorth Amandabury, TN 03955","phone_number":"146.133.9890","isContractor":true}
{"emp_id":15,"first_name":"Perry","last_name":"Mcguire","Address":"2126 Joshua Forks Apt. 050\nPort Angela, MD 25551","phone_number":"001-862-800-3814","isContractor":true}
{"emp_id":16,"first_name":"James","last_name":"Munoz","Address":"74019 Banks Estates\nEast Nicolefort, GU 45886","phone_number":"6532485982","isContractor":false}
{"emp_id":17,"first_name":"Todd","last_name":"Barton","Address":"2795 Kelly Shoal Apt. 500\nWest Lindsaytown, TN 55404","phone_number":"079-583-6386","isContractor":true}
{"emp_id":18,"first_name":"Christopher","last_name":"Noble","Address":"Unit 7816 Box 9004\nDPO AE 29282","phone_number":"215-060-7721","isContractor":true}
{"emp_id":19,"first_name":"Sandy","last_name":"Hunter","Address":"7251 Sarah Creek\nWest Jasmine, CO 54252","phone_number":"8759007374","isContractor":false}
{"emp_id":20,"first_name":"Jennifer","last_name":"Ballard","Address":"77628 Owens Key Apt. 659\nPort Victorstad, IN 02469","phone_number":"+1-137-420-7831x43286","isContractor":true}
{"emp_id":21,"first_name":"David","last_name":"Morris","Address":"192 Leslie Groves Apt. 930\nWest Dylan, NY 04000","phone_number":"990.804.0382x305","isContractor":false}
{"emp_id":22,"first_name":"Paula","last_name":"Jones","Address":"045 Johnson Viaduct Apt. 732\nNorrisstad, AL 12416","phone_number":"+1-193-919-7527x2207","isContractor":true}
{"emp_id":23,"first_name":"Lisa","last_name":"Thompson","Address":"1295 Judy Ports Suite 049\nHowardstad, PA 11905","phone_number":"(623)577-5982x33215","isContractor":true}
{"emp_id":24,"first_name":"Vickie","last_name":"Johnson","Address":"5247 Jennifer Run Suite 297\nGlenberg, NC 88615","phone_number":"708-367-4447x9366","isContractor":false}
{"emp_id":25,"first_name":"John","last_name":"Hamilton","Address":"5899 Barnes Plain\nHarrisville, NC 43970","phone_number":"341-467-5286x20961","isContractor":false}

Download the dataset and keep it ready, because you will modify the dataset for future use cases to simulate the inserts, updates, and deletes. The sample dataset generated for you will be entirely different than what you see in the preceding example.

To run the job, complete the following steps:

  1. On the AWS Glue console, choose Jobs in the navigation pane.
  2. Choose the job src-to-processed.
  3. On the Runs tab, choose Run.

When the AWS Glue job is run for the first time, the job reads the employee dataset from the landing bucket path and ingests the data to the processed bucket as a Delta table.

When the job is complete, you can create a crawler to see the initial data load. The following screenshot shows the database available on the Databases page.

  1. Choose Crawlers in the navigation pane.
  2. Choose Create crawler.
  3. Name your crawler delta-lake-crawler, then choose Next.
  4. Select Not yet for data already mapped to AWS Glue tables.
  5. Choose Add a data source.
  6. On the Data source drop-down menu, choose Delta Lake.
  7. Enter the path to the Delta table.
  8. Select Create Native tables.
  9. Choose Add a Delta Lake data source.
  10. Choose Next.
  11. Choose the role that was created by the CloudFormation template, then choose Next.
  12. Choose the database that was created by the CloudFormation template, then choose Next.
  13. Choose Create crawler.
  14. Select your crawler and choose Run.
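
The same crawler could also be defined programmatically. The following is a minimal sketch that only builds the request for the AWS Glue CreateCrawler API and doesn't call AWS. The role ARN, database name, and Delta table path are placeholders, not values from this post; replace them with the values from your CloudFormation outputs.

```python
def build_delta_crawler_request(name, role_arn, database, delta_table_path):
    """Build keyword arguments for the AWS Glue CreateCrawler API call."""
    return {
        "Name": name,
        "Role": role_arn,
        "DatabaseName": database,
        "Targets": {
            "DeltaTargets": [
                {
                    "DeltaTables": [delta_table_path],
                    # Native Delta tables don't need manifest files.
                    "WriteManifest": False,
                    # Corresponds to "Create Native tables" on the console.
                    "CreateNativeDeltaTable": True,
                }
            ]
        },
    }


request = build_delta_crawler_request(
    name="delta-lake-crawler",
    role_arn="arn:aws:iam::111122223333:role/example-crawler-role",  # placeholder
    database="deltalake_xxxxxx",  # placeholder
    delta_table_path="s3://scd-blog-processed/example/path/",  # placeholder
)
print(request["Targets"]["DeltaTargets"][0])
```

With boto3, the request could then be passed as `boto3.client("glue").create_crawler(**request)`, followed by `start_crawler(Name="delta-lake-crawler")` to run it.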

Query the data

After the crawler is complete, you can see the table it created.

To query the data, complete the following steps:

  1. Choose the employee table and on the Actions menu, choose View data.

You’re redirected to the Athena console. If you don’t have the latest Athena engine, create a new Athena workgroup with the latest Athena engine.

  2. Under Administration in the navigation pane, choose Workgroups.
  3. Choose Create workgroup.
  4. Provide a name for the workgroup, such as DeltaWorkgroup.
  5. Select Athena SQL as the engine, and choose Athena engine version 3 for Query engine version.
  6. Choose Create workgroup.
  7. After you create the workgroup, select the workgroup (DeltaWorkgroup) on the drop-down menu in the Athena query editor.
  8. Run the following query on the employee table:
SELECT * FROM "deltalake_2438fbd0"."employee";

Note: Update the correct database name from the CloudFormation outputs before running the above query.

You can observe that the employee table has 25 records. The following screenshot shows the total employee records with some sample records.

The Delta table is stored with an emp_key, which is unique to each and every change and is used to track the changes. The emp_key is created for every insert, update, and delete, and can be used to find all the changes pertaining to a single emp_id.

The emp_key is created using the SHA256 hashing algorithm, as shown in the following code:

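from pyspark.sql.functions import col, concat_ws, sha2
# Concatenate the tracked columns with "||" and hash the result with SHA-256
df.withColumn("emp_key", sha2(concat_ws("||", col("emp_id"), col("first_name"), col("last_name"), col("Address"),
            col("phone_number"), col("isContractor")), 256))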
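
To see why the key changes whenever any tracked attribute changes, the same idea can be sketched in plain Python with hashlib. This is an illustration only: the function name is an assumption, booleans are assumed to be rendered as lowercase true/false, and the digests are not guaranteed to match the values that Spark produces in the Delta table.

```python
import hashlib

TRACKED_FIELDS = ["emp_id", "first_name", "last_name", "Address", "phone_number", "isContractor"]


def emp_key(record):
    """Hash the tracked attributes of one employee record with SHA-256."""
    parts = []
    for name in TRACKED_FIELDS:
        value = record[name]
        # Assumption: booleans are written as lowercase "true"/"false".
        if isinstance(value, bool):
            parts.append("true" if value else "false")
        else:
            parts.append(str(value))
    # Join with the same "||" separator before hashing.
    return hashlib.sha256("||".join(parts).encode("utf-8")).hexdigest()


before = {
    "emp_id": 12,
    "first_name": "Nicholas",
    "last_name": "Aguirre",
    "Address": "7474 Joyce Meadows\nLake Billy, WA 40750",
    "phone_number": "495.259.9738",
    "isContractor": True,
}
after = {**before, "isContractor": False}  # only one attribute changes

key_before = emp_key(before)
key_after = emp_key(after)
print(key_before)
print(key_after)
print("changed:", key_before != key_after)
```

Because the hash is deterministic, a record that comes back unchanged produces the same key as before, while a change to any tracked attribute produces a different key.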

Perform inserts, updates, and deletes

Before making changes to the dataset, let’s run the same job one more time. Assuming that the current load from the source is the same as the initial load with no changes, the AWS Glue job shouldn’t make any changes to the dataset. After the job is complete, run the previous Select query in the Athena query editor and confirm that there are still 25 active records with the following values:

  • All 25 records with the column isCurrent=true
  • All 25 records with the column end_date=Null
  • All 25 records with the column delete_flag=false

After you confirm the previous job run with these values, let’s modify our initial dataset with the following changes:

  1. Change the isContractor flag to false (change it to true if your dataset already shows false) for emp_id=12.
  2. Delete the entire row where emp_id=8 (make sure to save the record in a text editor, because we use this record in another use case).
  3. Copy the row for emp_id=25 and insert a new row. Change the emp_id to be 26, and make sure to change the values for other columns as well.

After we make these changes, the employee source dataset looks like the following code (for readability, we have only included the changed records as described in the preceding three steps):

{"emp_id":12,"first_name":"Nicholas","last_name":"Aguirre","Address":"7474 Joyce Meadows\nLake Billy, WA 40750","phone_number":"495.259.9738","isContractor":false}
{"emp_id":26,"first_name":"John-copied","last_name":"Hamilton-copied","Address":"6000 Barnes Plain\nHarrisville-city, NC 5000","phone_number":"444-467-5286x20961","isContractor":true}

  4. Upload the changed fake_emp_data.json file to the same source prefix.
  5. After you upload the changed employee dataset to Amazon S3, navigate to the AWS Glue console and run the job.
  6. When the job is complete, run the following query in the Athena query editor and confirm that there are 27 records in total:
SELECT * FROM "deltalake_2438fbd0"."employee";

Note: Update the correct database name from the CloudFormation output before running the above query.

  7. Run another query in the Athena query editor and confirm that there are 4 records returned with the following values:
SELECT * FROM "AwsDataCatalog"."deltalake_2438fbd0"."employee" where emp_id in (8, 12, 26)
order by emp_id;

Note: Update the correct database name from the CloudFormation output before running the above query.

You will see two records for emp_id=12:

  • One emp_id=12 record with the following values (for the record that was ingested as part of the initial load):
    • emp_key=44cebb094ef289670e2c9325d5f3e4ca18fdd53850b7ccd98d18c7a57cb6d4b4
    • isCurrent=false
    • delete_flag=false
    • end_date=’2023-03-02’
  • A second emp_id=12 record with the following values (for the record that was ingested as part of the change to the source):
    • emp_key=b60547d769e8757c3ebf9f5a1002d472dbebebc366bfbc119227220fb3a3b108
    • isCurrent=true
    • delete_flag=false
    • end_date=Null (or empty string)

The record for emp_id=8 that was deleted in the source as part of this run will still exist but with the following changes to the values:

  • isCurrent=false
  • end_date=’2023-03-02’
  • delete_flag=true

The new employee record will be inserted with the following values:

  • emp_id=26
  • isCurrent=true
  • end_date=NULL (or empty string)
  • delete_flag=false

Note that the emp_key values in your actual table may be different than what is provided here as an example.

  1. For the deletes, we check for the emp_id from the base table along with the new source file and inner join the emp_key.
  2. If the condition evaluates to true, we then check if the employee base table emp_key equals the new updates emp_key, and get the current, undeleted record (isCurrent=true and delete_flag=false).
  3. We merge the delete changes from the new file with the base table for all the matching delete condition rows and update the following:
    1. isCurrent=false
    2. delete_flag=true
    3. end_date=current_date

See the following code:

delete_join_cond = "employee.emp_id=employeeUpdates.emp_id and employee.emp_key = employeeUpdates.emp_key"
delete_cond = "employee.emp_key == employeeUpdates.emp_key and employee.isCurrent = true and employeeUpdates.delete_flag = true"

base_tbl.alias("employee")\
        .merge(union_updates_dels.alias("employeeUpdates"), delete_join_cond)\
        .whenMatchedUpdate(condition=delete_cond, set={"isCurrent": "false",
                                                        "end_date": current_date(),
                                                        "delete_flag": "true"}).execute()
  1. For both the updates and the inserts, we check for the condition if the base table employee.emp_id is equal to the new changes.emp_id and the employee.emp_key is equal to new changes.emp_key, while only retrieving the current records.
  2. If this condition evaluates to true, we then get the current record (isCurrent=true and delete_flag=false).
  3. We merge the changes by updating the following:
    1. If the second condition evaluates to true:
      1. isCurrent=false
      2. end_date=current_date
    2. Or we insert the entire row as follows if the second condition evaluates to false:
      1. emp_id=new record’s emp_id
      2. emp_key=new record’s emp_key
      3. first_name=new record’s first_name
      4. last_name=new record’s last_name
      5. address=new record’s address
      6. phone_number=new record’s phone_number
      7. isContractor=new record’s isContractor
      8. start_date=current_date
      9. end_date=NULL (or empty string)
      10. isCurrent=true
      11. delete_flag=false

See the following code:

upsert_cond = "employee.emp_id=employeeUpdates.emp_id and employee.emp_key = employeeUpdates.emp_key and employee.isCurrent = true"
upsert_update_cond = "employee.isCurrent = true and employeeUpdates.delete_flag = false"

base_tbl.alias("employee").merge(union_updates_dels.alias("employeeUpdates"), upsert_cond)\
    .whenMatchedUpdate(condition=upsert_update_cond, set={"isCurrent": "false",
                                                            "end_date": current_date()
                                                            }) \
    .whenNotMatchedInsert(
    values={
        "isCurrent": "true",
        "emp_id": "employeeUpdates.emp_id",
        "first_name": "employeeUpdates.first_name",
        "last_name": "employeeUpdates.last_name",
        "Address": "employeeUpdates.Address",
        "phone_number": "employeeUpdates.phone_number",
        "isContractor": "employeeUpdates.isContractor",
        "emp_key": "employeeUpdates.emp_key",
        "start_date": current_date(),
        "delete_flag":  "employeeUpdates.delete_flag",
        "end_date": "null"
    })\
    .execute()
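
To follow the delete and upsert rules without a Spark cluster, the following is a minimal in-memory sketch of the same SCD Type 2 behavior. It is not the AWS Glue job's implementation: it uses a list of dictionaries instead of a Delta table, the function names and load dates are assumptions, and the records are trimmed to three attributes.

```python
import hashlib
import json
from datetime import date


def record_key(record):
    """Stand-in for emp_key: SHA-256 over the record's attributes."""
    return hashlib.sha256(json.dumps(record, sort_keys=True).encode("utf-8")).hexdigest()


def apply_snapshot(table, snapshot, load_date):
    """Apply one full snapshot to an in-memory SCD Type 2 table."""
    current = {row["emp_id"]: row for row in table if row["isCurrent"]}
    incoming = {rec["emp_id"]: rec for rec in snapshot}

    # Deletes: current rows that are missing from the snapshot are closed and flagged.
    for emp_id, row in current.items():
        if emp_id not in incoming:
            row.update(isCurrent=False, delete_flag=True, end_date=load_date)

    # Updates and inserts.
    for emp_id, rec in incoming.items():
        key = record_key(rec)
        existing = current.get(emp_id)
        if existing is not None and existing["emp_key"] == key:
            continue  # unchanged record: nothing to do
        if existing is not None:
            # Changed record: close the old version.
            existing.update(isCurrent=False, end_date=load_date)
        # New, changed, or returning record: add a fresh current version.
        table.append({**rec, "emp_key": key, "start_date": load_date,
                      "end_date": None, "isCurrent": True, "delete_flag": False})
    return table


teresa = {"emp_id": 8, "first_name": "Teresa", "isContractor": False}
nicholas = {"emp_id": 12, "first_name": "Nicholas", "isContractor": True}
john = {"emp_id": 25, "first_name": "John", "isContractor": False}
nicholas_updated = {**nicholas, "isContractor": False}
new_hire = {"emp_id": 26, "first_name": "John-copied", "isContractor": True}

loads = [
    (date(2023, 3, 1), [teresa, nicholas, john]),  # initial load
    (date(2023, 3, 1), [teresa, nicholas, john]),  # no changes
    (date(2023, 3, 2), [nicholas_updated, john, new_hire]),  # update, delete, insert
    (date(2023, 3, 3), [teresa, nicholas_updated, john, new_hire]),  # deleted record returns
]

table, counts = [], []
for load_date, snapshot in loads:
    apply_snapshot(table, snapshot, load_date)
    counts.append(len(table))
print(counts)  # [3, 3, 5, 6]
```

The row counts follow the same pattern as the walkthrough: an unchanged load adds nothing, the combined update, delete, and insert adds two rows, and a returning record adds one more row with the same key as its deleted version.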

As a last step, let’s bring back the deleted record from the previous change to the source dataset and see how it is reinserted into the employee table in the data lake and observe how the complete history is maintained.

Let’s modify our changed dataset from the previous step and make the following changes.

  1. Add the deleted emp_id=8 back to the dataset.

After making these changes, the employee source dataset looks like the following code (for readability, we have only included the added record as described in the preceding step):

{"emp_id":8,"first_name":"Teresa","last_name":"Estrada","Address":"339 Scott Valley\nGonzalesfort, PA 18212","phone_number":"435-600-3162","isContractor":false}

  2. Upload the changed employee dataset file to the same source prefix.
  3. After you upload the changed fake_emp_data.json dataset to Amazon S3, navigate to the AWS Glue console and run the job again.
  4. When the job is complete, run the following query in the Athena query editor and confirm that there are 28 records in total:
SELECT * FROM "deltalake_2438fbd0"."employee";

Note: Update the correct database name from the CloudFormation output before running the above query.

  5. Run the following query and confirm there are 5 records:
SELECT * FROM "AwsDataCatalog"."deltalake_2438fbd0"."employee" where emp_id in (8, 12, 26)
order by emp_id;

Note: Update the correct database name from the CloudFormation output before running the above query.

You will see two records for emp_id=8:

  • One emp_id=8 record with the following values (the old record that was deleted):
    • emp_key=536ba1ba5961da07863c6d19b7481310e64b58b4c02a89c30c0137a535dbf94d
    • isCurrent=false
    • delete_flag=true
    • end_date=’2023-03-02’
  • Another emp_id=8 record with the following values (the new record that was inserted in the last run):
    • emp_key=536ba1ba5961da07863c6d19b7481310e64b58b4c02a89c30c0137a535dbf94d
    • isCurrent=true
    • delete_flag=false
    • end_date=NULL (or empty string)

The emp_key values in your actual table may be different from what is provided here as an example. Also note that because this is the same deleted record, reinserted in the subsequent load without any changes, there is no change to the emp_key.

End-user sample queries

The following are some sample end-user queries to demonstrate how the employee change data history can be traversed for reporting:

  • Query 1 – Retrieve a list of all the employees who left the organization in the current month (for example, March 2023).
SELECT * FROM "deltalake_2438fbd0"."employee" where delete_flag=true and date_format(CAST(end_date AS date),'%Y/%m') ='2023/03'

Note: Update the correct database name from the CloudFormation output before running the above query.

The preceding query would return two employee records who left the organization.

  • Query 2 – Retrieve a list of new employees who joined the organization in the current month (for example, March 2023).
SELECT * FROM "deltalake_2438fbd0"."employee" where date_format(start_date,'%Y/%m') ='2023/03' and iscurrent=true

Note: Update the correct database name from the CloudFormation output before running the above query.

The preceding query would return 23 active employee records who joined the organization.

  • Query 3 – Find the history of any given employee in the organization (in this case employee 18).
SELECT * FROM "deltalake_2438fbd0"."employee" where emp_id=18

Note: Update the correct database name from the CloudFormation output before running the above query.

In the preceding query, we can observe that employee 18 had two changes to their employee records before they left the organization.

Note that the data results provided in this example are different than what you will see in your specific records based on the sample data generated by the Lambda function.
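
The start and end dates also make point-in-time lookups possible. The following is a minimal sketch of that filter in plain Python, using a hypothetical two-version history for one employee. It assumes that end_date is exclusive, meaning a version stops being active on its end date; adjust the comparison if your reporting treats the end date as inclusive.

```python
from datetime import date


def as_of(rows, day):
    """Return the record versions that were active on the given day."""
    return [
        row for row in rows
        if row["start_date"] <= day
        and (row["end_date"] is None or day < row["end_date"])
    ]


# Hypothetical history for one employee: a contractor who later became full time.
history = [
    {"emp_id": 12, "isContractor": True,
     "start_date": date(2023, 3, 1), "end_date": date(2023, 3, 2)},
    {"emp_id": 12, "isContractor": False,
     "start_date": date(2023, 3, 2), "end_date": None},
]

print(as_of(history, date(2023, 3, 1)))
print(as_of(history, date(2023, 3, 5)))
```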

Clean up

When you have finished experimenting with this solution, clean up your resources, to prevent AWS charges from being incurred:

  1. Empty the S3 buckets.
  2. Delete the stack from the AWS CloudFormation console.

Conclusion

In this post, we demonstrated how to use AWS Glue to identify the changed data for a semi-structured data source and preserve the historical changes (SCD Type 2) in a Delta Lake on Amazon S3 when source systems are unable to provide the change data capture capability. You can further extend this solution to enable downstream applications to build additional customizations from CDC data captured in the data lake.

Additionally, you can extend this solution as part of an orchestration using AWS Step Functions or other commonly used orchestrators your organization is familiar with. You can also add partitions where appropriate and maintain the Delta table by compacting the small files.


About the authors

Nith Govindasivan is a Data Lake Architect with AWS Professional Services, where he helps onboard customers on their modern data architecture journey by implementing Big Data & Analytics solutions. Outside of work, Nith is an avid cricket fan who watches almost any cricket in his spare time, and he enjoys long drives and traveling internationally.

Vijay Velpula is a Data Architect with AWS Professional Services. He helps customers implement Big Data and Analytics Solutions. Outside of work, he enjoys spending time with family, traveling, hiking and biking.

Sriharsh Adari is a Senior Solutions Architect at Amazon Web Services (AWS), where he helps customers work backwards from business outcomes to develop innovative solutions on AWS. Over the years, he has helped multiple customers on data platform transformations across industry verticals. His core areas of expertise include Technology Strategy, Data Analytics, and Data Science. In his spare time, he enjoys playing sports, binge-watching TV shows, and playing Tabla.

AWS Glue crawlers support cross-account crawling to support data mesh architecture

Post Syndicated from Sandeep Adwankar original https://aws.amazon.com/blogs/big-data/aws-glue-crawlers-support-cross-account-crawling-to-support-data-mesh-architecture/

Data lakes have come a long way, and there’s been tremendous innovation in this space. Today’s modern data lakes are cloud native, work with multiple data types, and make this data easily available to diverse stakeholders across the business. As time has gone by, data lakes have grown significantly and have evolved to data meshes as a way to scale. Thoughtworks defines a data mesh as “a shift in a modern distributed architecture that applies platform thinking to create self-serve data infrastructure, treating data as the product.”

Data mesh advocates for decentralized ownership and delivery of enterprise data management systems that benefit several personas. Data producers can use the data mesh platform to create datasets and share them across business teams to ensure data availability, reliability, and interoperability across functions and data subject areas. Data consumers now have better data sharing with data mesh and federation across business units without compromising data security. The data governance team can support distributed data, where all data is accessible to those with the proper authority to access it. With data mesh, data doesn’t have to be consolidated into a single data lake or account and can remain within different databases and data lakes. An essential capability needed in such a data lake architecture is the ability to continuously understand changes in the data lakes in various other domains and make those available to data consumers. Without such a capability, manual work is needed to understand producers’ updates and make them available to consumers and governance.

AWS customers use a modern data architecture to facilitate governance and data sharing across logical or physical governance boundaries to create data domains aligned to lines of business. Each line of business creates and manages their dataset on Amazon Simple Storage Service (Amazon S3) and uses AWS Glue crawlers to discover new datasets and register them to the AWS Glue Data Catalog, add new tables and partitions, and detect schema changes. These datasets are shared with data consumers that access the data using services like Amazon Athena, Amazon Redshift, Amazon EMR, and more.

In the post Introducing AWS Glue crawlers using AWS Lake Formation permission management, we introduced a new set of capabilities in AWS Glue crawlers and AWS Lake Formation that simplifies crawler setup and supports centralized permissions for in-account and cross-account crawling of S3 data lakes. In this post, we demonstrate the same capability for a data mesh architecture in which we establish a central governance layer to catalog the data owned by the data producer and share it with the data consumer for ease of discovery. The AWS Glue crawler cross-account capability allows you to crawl data sources in different producer accounts while still having those changes cataloged in a centralized governance account. Customers prefer the central governance experience over writing bucket policies separately in each bucket-owning account of a data mesh producer. To build a data mesh architecture, you can now author permissions in a single Lake Formation governance account to manage access to data locations and crawlers spanning multiple accounts in the data mesh.

According to the Allstate Corporation:

“By leveraging the power of AWS Lake Formation in our modern data architecture, we will be able to further unlock the potential of our data and empower our analytics community to drive innovation and build data-driven applications. The granular data access and collaboration provided by this architecture will enable us to build a truly unified data and analytics experience, bringing us one step closer to realizing our vision of becoming a fully data-driven enterprise.”

– Prashant Mehrotra, Director – Machine Learning and R&D, Allstate

In this post, we walk through the creation of a simplified data mesh architecture that shows how to use an AWS Glue crawler with Lake Formation to automate bringing changes from data producer domains to data consumers while maintaining centralized governance.

Solution overview

In a data mesh architecture, you have several producer accounts that own S3 buckets, several consumer accounts that want to access shared datasets, and a central governance account to manage data shares between producers and consumers. This central governance account doesn’t own any S3 bucket or actual tables.

The following figure shows a simplified data mesh architecture with a single producer account, a centralized governance account, and a single consumer account. The data mesh producer account hosts the encrypted S3 bucket, which is shared with the central governance account. The central governance account registers the S3 bucket with Lake Formation using an AWS Identity and Access Management (IAM) role, which has permissions to the S3 bucket and AWS Key Management Service (AWS KMS). The central account creates the database for storing the dataset schema and shares it with the producer account. The producer account, as the S3 bucket owner, runs a crawler to crawl the buckets registered with the central account using Lake Formation permissions and populates the database. The shared database with the new datasets is now available to share with consumers in the data mesh. The central governance account can then share the database with a consumer admin, who can delegate access to other personas (such as data analysts) in the consumer account for data access.

Figure: A simplified data mesh architecture with a single producer account, a centralized governance account, and a single consumer account.

In the following sections, we provide AWS CloudFormation templates to set up the resources in each account. Then we provide the steps to configure the crawler, manage permissions and sharing, and validate the solution by running queries with Athena.

Prerequisites

Complete the following steps in each account (producer, central governance, and consumer) to update the Data Catalog settings to use Lake Formation permissions to control catalog resources instead of IAM-based access control:

  1. Sign in to the Lake Formation console as admin.
  2. If this is the first time accessing the Lake Formation console, add yourself as the data lake administrator.
  3. In the navigation pane, under Data catalog, choose Settings.
  4. Uncheck Use only IAM access control for new databases.
  5. Uncheck Use only IAM access control for new tables in new databases.
  6. Keep Version 3 as the current cross-account version.
  7. Choose Save.
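
If you prefer to script steps 4 and 5 rather than use the console, the following is a minimal sketch in Python. It assumes that clearing the two default-permission lists in the Lake Formation settings is the API-level counterpart of unchecking the two "Use only IAM access control" boxes; the helper names and the sample account ID are illustrative and not part of the original walkthrough. The function that calls AWS is defined but not invoked, because it needs credentials for each account.

```python
import copy


def disable_iam_only_defaults(settings: dict) -> dict:
    """Return a copy of DataLakeSettings with both IAM-only defaults cleared."""
    updated = copy.deepcopy(settings)
    # Empty lists mean new databases and tables no longer receive the
    # IAM_ALLOWED_PRINCIPALS default grant.
    updated["CreateDatabaseDefaultPermissions"] = []
    updated["CreateTableDefaultPermissions"] = []
    return updated


def apply_settings(region: str) -> None:
    """Read, modify, and write back the settings (requires AWS credentials)."""
    import boto3  # imported lazily so the helper above runs without boto3

    client = boto3.client("lakeformation", region_name=region)
    current = client.get_data_lake_settings()["DataLakeSettings"]
    client.put_data_lake_settings(
        DataLakeSettings=disable_iam_only_defaults(current)
    )


# Hypothetical settings document, shaped like the service response.
iam_default = {
    "Principal": {"DataLakePrincipalIdentifier": "IAM_ALLOWED_PRINCIPALS"},
    "Permissions": ["ALL"],
}
sample = {
    "DataLakeAdmins": [
        {"DataLakePrincipalIdentifier": "arn:aws:iam::111122223333:user/admin"}
    ],
    "CreateDatabaseDefaultPermissions": [iam_default],
    "CreateTableDefaultPermissions": [iam_default],
}
print(disable_iam_only_defaults(sample))
```

Because the helper works on a copy, you can compare the before and after documents before writing anything back. Run `apply_settings` once per account (producer, central governance, and consumer).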

Set up resources in the central governance account

The CloudFormation template for the central account creates a CentralDataMeshOwner user assigned as Lake Formation admin. The CentralDataMeshOwner user in the central governance account performs the necessary steps to share the central catalogs with the producer and consumer accounts. The CentralDataMeshOwner user also sets up a custom Lake Formation service role to register the S3 data lake location. Complete the following steps:

  1. Log in to the central governance account console as IAM administrator.
  2. Choose Launch Stack to deploy the CloudFormation template:
  3. For DataMeshOwnerUserName, keep the default (CentralDataMeshOwner).
  4. For ProducerAWSAccount, enter the producer account ID.
  5. Create the stack.
  6. After the stack launches, on the AWS CloudFormation console, navigate to the Resources tab of the stack.
  7. Note down the value of RegisterLocationServiceRole.
  8. Choose the LFUsersPassword value to navigate to the AWS Secrets Manager console.
  9. In the Secret value section, choose Retrieve secret value.
  10. Note down the secret value for the password for IAM user CentralDataMeshOwner.

Set up resources in the producer account

The CloudFormation template for the producer account creates the following resources:

  • IAM user LOBProducerSteward
  • S3 bucket retail-datalake-<producer account id>-<producer region>
  • KMS key used for bucket encryption
  • Required S3 bucket policies to provide access to the central governance account
  • AWS Glue crawler and crawler IAM role with necessary permissions

Complete the following steps:

  1. Log in to the producer account console as IAM administrator.
  2. Choose Launch Stack to deploy the CloudFormation template:
  3. For CentralAccountID, enter the central account ID.
  4. For CentralAccountLFServiceRole, enter the value of RegisterLocationServiceRole from CloudFormation noted earlier.
  5. Create the stack.
  6. When the stack is complete, on the AWS CloudFormation console, navigate to the Resources tab of the stack.
  7. Note down the AWSGlueServiceRole value.
  8. Choose the ProducerStewardUserCredentials value to navigate to the Secrets Manager console.
  9. In the Secret value section, choose Retrieve secret value.
  10. Note down the secret value for the password for IAM user LOBProducerSteward.
  11. On the Amazon S3 console, check the bucket policy for retail-datalake-<producer account id>-<producer region> and make sure the bucket is shared with the central governance account IAM role.

This is required for registering the bucket with Lake Formation in the central account so that the account can manage the data sharing.

  12. On the AWS KMS console, check that the bucket is encrypted with the customer managed key and the key is shared with the central governance account.
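
The bucket policy check can also be done programmatically. The following is a small sketch, not the policy the CloudFormation template actually creates: the sample policy, role name, and account IDs are hypothetical, and the helper only looks at AWS principals in Allow statements. In practice, the policy text would come from `boto3.client("s3").get_bucket_policy(Bucket=...)["Policy"]`, which returns a JSON string.

```python
import json


def allowed_aws_principals(policy_json: str) -> set:
    """Collect the AWS principals named in Allow statements of a bucket policy."""
    policy = json.loads(policy_json)
    statements = policy.get("Statement", [])
    if isinstance(statements, dict):  # a lone statement may not be in a list
        statements = [statements]
    found = set()
    for statement in statements:
        if statement.get("Effect") != "Allow":
            continue
        principal = statement.get("Principal", {})
        if not isinstance(principal, dict):  # for example the wildcard "*"
            continue
        aws = principal.get("AWS", [])
        found.update([aws] if isinstance(aws, str) else aws)
    return found


# Hypothetical role ARN standing in for RegisterLocationServiceRole.
CENTRAL_ROLE = "arn:aws:iam::444455556666:role/RegisterLocationServiceRole"
sample_policy = json.dumps({
    "Version": "2012-10-17",
    "Statement": [{
        "Effect": "Allow",
        "Principal": {"AWS": CENTRAL_ROLE},
        "Action": ["s3:GetObject", "s3:ListBucket"],
        "Resource": ["arn:aws:s3:::example-bucket", "arn:aws:s3:::example-bucket/*"],
    }],
})
print(CENTRAL_ROLE in allowed_aws_principals(sample_policy))
```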

Set up resources in the consumer account

The CloudFormation template for the consumer account creates the following resources:

  • IAM user ConsumerAdminUser assigned to the data lake admin
  • IAM user LFBusinessAnalyst1
  • S3 bucket for Athena output
  • Athena workgroup

Complete the following steps:

  1. Log in to the consumer account console as IAM administrator.
  2. Choose Launch Stack to deploy the CloudFormation template:
  3. Create the stack.
  4. When the stack is complete, on the AWS CloudFormation console, navigate to the Resources tab of the stack.
  5. Choose the AllConsumerUsersCredentials value to navigate to the Secrets Manager console.
  6. In the Secret value section, choose Retrieve secret value.
  7. Note down the secret value for the password for the IAM user ConsumerAdminUser.

Now that all the accounts have been set up, we set up cross-account sharing on AWS with a central governance account to manage sharing of permissions across producers and consumers.

Configure the central governance account to manage sharing with the producer account

Sign in to the central governance account as CentralDataMeshOwner using the password noted earlier through the central governance account CloudFormation stack. Then complete the following steps:

  1. On the Lake Formation console, choose Data lake locations under Register and ingest in the navigation pane.
  2. For Amazon S3 path, provide the path retail-datalake-<producer account id>-<region>.
  3. For IAM role, choose the IAM role created using the CloudFormation stack.

This role has permissions for accessing the encrypted S3 bucket and its key. Do not choose the role AWSServiceRoleForLakeFormationDataAccess.

  4. Choose Register location.
  5. In the navigation pane, choose Databases.
  6. Choose Create database.
  7. For Database name, enter datameshtestdatabase.
  8. Choose Create database.
  9. In the navigation pane, choose Data locations and choose Grant.
  10. Select External account and provide the producer account for AWS account ID, AWS organization ID, or IAM principal ARN.
  11. For Storage location, provide the data lake bucket path.
  12. Select Grantable, then choose Grant.
  13. Choose Data lake permissions, then choose Grant.
  14. Select External accounts and provide the producer account number.
  15. For Databases, choose datameshtestdatabase.
  16. For Database permissions and Grantable permissions, select Create table, Alter, and Describe.
  17. Choose Grant.
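
The two grants in the steps above (data location, then database permissions) can be expressed as request payloads for the Lake Formation `GrantPermissions` API. The following sketch only builds the payloads; the helper names, account ID, and bucket ARN are assumptions for illustration. Each dictionary could be passed to `boto3.client("lakeformation").grant_permissions(**request)` while signed in as the central governance admin.

```python
def principal(identifier: str) -> dict:
    """Wrap an account ID or IAM ARN in the structure the API expects."""
    return {"DataLakePrincipalIdentifier": identifier}


def data_location_grant(account_id: str, bucket_arn: str, grantable: bool = True) -> dict:
    """Payload granting DATA_LOCATION_ACCESS on a registered S3 location."""
    request = {
        "Principal": principal(account_id),
        "Resource": {"DataLocation": {"ResourceArn": bucket_arn}},
        "Permissions": ["DATA_LOCATION_ACCESS"],
    }
    if grantable:
        request["PermissionsWithGrantOption"] = ["DATA_LOCATION_ACCESS"]
    return request


def database_grant(account_id: str, database: str, permissions, grantable: bool = True) -> dict:
    """Payload granting database-level permissions to another account."""
    request = {
        "Principal": principal(account_id),
        "Resource": {"Database": {"Name": database}},
        "Permissions": list(permissions),
    }
    if grantable:
        request["PermissionsWithGrantOption"] = list(permissions)
    return request


PRODUCER_ACCOUNT = "111122223333"  # hypothetical producer account ID
BUCKET_ARN = "arn:aws:s3:::retail-datalake-111122223333-us-east-1"  # hypothetical

requests = [
    data_location_grant(PRODUCER_ACCOUNT, BUCKET_ARN),
    database_grant(PRODUCER_ACCOUNT, "datameshtestdatabase",
                   ["CREATE_TABLE", "ALTER", "DESCRIBE"]),
]
for request in requests:
    print(request)
```

Building the payloads separately from the call makes it easy to review exactly what will be granted, and with which grant options, before anything changes in the account.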

Configure the crawler in the producer account to populate the schema

Sign in to the producer account as LOBProducerSteward with the password noted earlier through the producer account CloudFormation stack, then complete the following steps:

  1. On the AWS RAM console, accept the pending resource share from the central account.
  2. On the Lake Formation console, choose Databases under Data catalog in the navigation pane.
  3. Choose datameshtestdatabase, and on the Action menu, choose Create resource link.
  4. For Resource link name, enter datameshtestdatabaselink.
  5. Choose Create.
  6. On the AWS Glue console, choose Crawlers in the navigation pane.
  7. Choose the crawler CrossAccountCrawler-<accountid>.
  8. Choose Edit, then choose Configure security settings.
  9. Select Use Lake Formation credentials for crawling S3 data source.
  10. Select In a different account and provide the account ID of the central governance account.
  11. Choose Next.
  12. Choose datameshtestdatabaselink as the database and choose Update.
  13. In the navigation pane, choose Data locations and choose Grant.
  14. Select My account, and choose the crawler IAM role for IAM users and roles.
  15. For Storage locations, choose the bucket retail-datalake-<accountid>-<region>.
  16. For Registered account location, enter the central account ID.
  17. Choose Grant.
    Alternatively, you can use the AWS CLI to grant data location permission on the bucket registered in the central account to the crawler role with the following command:

    aws lakeformation grant-permissions \
        --principal DataLakePrincipalIdentifier="<Crawler Role ARN>" \
        --permissions "DATA_LOCATION_ACCESS" \
        --resource '{ "DataLocation": {"ResourceArn":"<S3 bucket arn>", "CatalogId": "<Central Account id>"}}'

    To use the AWS CLI, refer to Installing or updating the latest version of the AWS CLI.

  18. In the navigation pane, choose Data lake permissions.
  19. Choose the crawler IAM role for the principal account.
  20. Choose datameshtestdatabase for the database.
  21. For Database permissions, select Create table, Describe, and Alter.
  22. Choose Grant.
  23. Choose the crawler IAM role for the principal account.
  24. Choose datameshtestdatabaselink for the database.
  25. For Resource link permissions, select Describe.
  26. Choose Grant.
  27. Run the crawler.
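
The resource link and crawler settings from the steps above can be sketched as AWS Glue API payloads. This is an illustration under stated assumptions: the helper names and account IDs are hypothetical, and the crawler name follows the CrossAccountCrawler-<accountid> pattern from the steps. The first payload is shaped for `glue.create_database`, where a `TargetDatabase` entry makes the new database a resource link; the second is shaped for `glue.update_crawler`.

```python
def resource_link_request(link_name: str, central_account_id: str, shared_database: str) -> dict:
    """Payload for glue.create_database that creates a resource link."""
    return {
        "DatabaseInput": {
            "Name": link_name,
            # Pointing at a database in another catalog makes this a link.
            "TargetDatabase": {
                "CatalogId": central_account_id,
                "DatabaseName": shared_database,
            },
        }
    }


def crawler_update_request(crawler_name: str, link_name: str, central_account_id: str) -> dict:
    """Payload for glue.update_crawler enabling Lake Formation credentials."""
    return {
        "Name": crawler_name,
        "DatabaseName": link_name,
        "LakeFormationConfiguration": {
            "UseLakeFormationCredentials": True,
            # Account where the S3 location is registered with Lake Formation.
            "AccountId": central_account_id,
        },
    }


CENTRAL_ACCOUNT = "444455556666"   # hypothetical central governance account ID
PRODUCER_ACCOUNT = "111122223333"  # hypothetical producer account ID

link = resource_link_request("datameshtestdatabaselink", CENTRAL_ACCOUNT, "datameshtestdatabase")
crawler = crawler_update_request(f"CrossAccountCrawler-{PRODUCER_ACCOUNT}",
                                 "datameshtestdatabaselink", CENTRAL_ACCOUNT)
print(link)
print(crawler)
```

With credentials for the producer account, the calls would be `glue.create_database(**link)` followed by `glue.update_crawler(**crawler)`, where `glue = boto3.client("glue")`.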

The following screenshot shows the details after a successful run.

When the crawler is complete, you can validate the table created under the database datameshtestdatabaselink.

This table is owned by the producer account and available in the central governance account under the shared database datameshtestdatabase. Now the data lake admin in the central governance account can share the database and populated table with the consumer account.

Configure the central governance account to manage sharing of read-only access with the consumer account

Sign in to the central governance account as CentralDataMeshOwner with the password noted earlier through the central governance account CloudFormation stack, then complete the following steps:

  1. Grant database permissions to the consumer account.
  2. For Principals, choose external account and provide <consumer accountID>.
  3. For Databases, select datameshtestdatabase.
  4. For Database permissions, select Describe.
  5. For Grantable permissions, select Describe.
  6. Choose Grant.

  7. Grant table permissions to the consumer account.
  8. For Principals, choose external account and provide <consumer accountID>.
  9. For Databases, select datameshtestdatabase.
  10. For Tables, select retail_datalake_<accountID>_<region>.
  11. For Table permissions, select Select and Describe.
  12. For Grantable permissions, select Select and Describe.
  13. Choose Grant.

Configure the consumer account as the consumer account data lake admin

Sign in to the consumer account as ConsumerAdminUser with the password noted earlier through the consumer account CloudFormation stack. (Note that in the consumer account Lake Formation configuration, both ConsumerAdminUser and LFBusinessAnalyst1 have the same password.)

  1. On the AWS RAM console, accept the resource share from the central account.
  2. On the Lake Formation console, validate that the shared database datameshtestdatabase is available and create the resource link datameshtestdatabaselink using the shared database.

The following screenshot shows the details after the resource link is created.

  3. On the Lake Formation console, choose Grant.
  4. Choose LFBusinessAnalyst1 for IAM users and roles.
  5. Choose datameshtestdatabase for the database under Named data catalog resources.
  6. Select Describe for Database permissions.
  7. On the Lake Formation console, choose Grant.
  8. Choose LFBusinessAnalyst1 for IAM users and roles.
  9. Choose datameshtestdatabaselink for the database under Named data catalog resources.
  10. Select Describe for Resource link permissions.
  11. On the Lake Formation console, choose Grant.
  12. Choose LFBusinessAnalyst1 for IAM users and roles.
  13. Choose retail_datalake_<accountid>_<region> for the table under Named data catalog resources.
  14. Select Select and Describe for Table permissions.

Run queries in the consumer account

Sign in to the consumer account console as LFBusinessAnalyst1 with the password noted earlier through the consumer account CloudFormation stack, then complete the following steps:

  1. On the Athena console, choose lfconsumer-workgroup as the Athena workgroup.
  2. Run the following query to validate access:
select * from datameshtestdatabaselink.retail_datalake_<accountid>_<region>
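
The validation query can also be submitted from code. The following sketch builds the query text and a request payload for the Athena `StartQueryExecution` API. It rests on two assumptions that are not stated in the steps: that the table name is the bucket name with hyphens replaced by underscores (which matches the naming shown in this post), and that a LIMIT clause is acceptable for a quick access check. The account ID and Region in the bucket name are hypothetical.

```python
def table_name_for_bucket(bucket: str) -> str:
    """Assumed naming: the bucket name with hyphens replaced by underscores."""
    return bucket.lower().replace("-", "_")


def validation_query(link_database: str, bucket: str, limit: int = 10) -> str:
    """Build a small SELECT against the table behind the resource link."""
    table = table_name_for_bucket(bucket)
    return f'SELECT * FROM "{link_database}"."{table}" LIMIT {limit}'


def query_request(link_database: str, bucket: str, workgroup: str = "lfconsumer-workgroup") -> dict:
    """Payload for athena.start_query_execution in the consumer account."""
    return {
        "QueryString": validation_query(link_database, bucket),
        "WorkGroup": workgroup,
    }


request = query_request("datameshtestdatabaselink",
                        "retail-datalake-111122223333-us-east-1")
print(request["QueryString"])
```

With credentials for LFBusinessAnalyst1, the call would be `boto3.client("athena").start_query_execution(**request)`. The workgroup must have a query result location configured.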

We have successfully registered the dataset and created a Data Catalog in the central governance account. We crawled the data lake that was registered with the central governance account using Lake Formation permissions from the producer account and populated the schema. We granted Lake Formation permission on the database and table from the central account to the consumer user and validated consumer user access to the data using Athena.

Clean up

To avoid unwanted charges to your AWS account, delete the AWS resources:

  1. Sign in to the CloudFormation console as the IAM admin used for creating the CloudFormation stack in all three accounts.
  2. Delete the stacks you created.

Conclusion

In this post, we showed how to set up cross-account crawling using a central governance account with the new AWS Glue crawler capability of Lake Formation integration. This capability allows data producers to set up crawling capabilities in their own domain so that changes are seamlessly available to data governance and data consumers. Implementing a data mesh with AWS Glue crawlers, Lake Formation, Athena, and other analytical services provides a well-understood, performant, scalable, and cost-effective solution to integrate, prepare, and serve data.

If you have questions or suggestions, submit them in the comments section.


About the authors

Sandeep Adwankar is a Senior Technical Product Manager at AWS. Based in the California Bay Area, he works with customers around the globe to translate business and technical requirements into products that enable customers to improve how they manage, secure, and access data.

Srividya Parthasarathy is a Senior Big Data Architect on the AWS Lake Formation team. She enjoys building data mesh solutions and sharing them with the community.

Piyali Kamra is a seasoned enterprise architect and a hands-on technologist who believes that building large scale enterprise systems is not an exact science but more like an art, in which tools and technologies must be carefully selected based on the team’s culture, strengths, weaknesses, and risks, in tandem with having a futuristic vision as to how you want to shape your product a few years down the road.

Visualize Confluent data in Amazon QuickSight using Amazon Athena

Post Syndicated from Ahmed Zamzam original https://aws.amazon.com/blogs/big-data/visualize-confluent-data-in-amazon-quicksight-using-amazon-athena/

This is a guest post written by Ahmed Saef Zamzam and Geetha Anne from Confluent.

Businesses are using real-time data streams to gain insights into their company’s performance and make informed, data-driven decisions faster. As real-time data has become essential for businesses, a growing number of companies are adapting their data strategy to focus on data in motion. Event streaming is the central nervous system of a data in motion strategy and, in many organizations, Apache Kafka is the tool that powers it.

Today, Kafka is well known and widely used for streaming data. However, managing and operating Kafka at scale can still be challenging. Confluent offers a solution through its fully managed, cloud-native service that simplifies running and operating data streams at scale. Confluent extends open-source Kafka through a suite of related services and features designed to enhance the data in motion experience for operators, developers, and architects in production.

In this post, we demonstrate how Amazon Athena, Amazon QuickSight, and Confluent work together to enable visualization of data streams in near-real time. We use the Kafka connector in Athena to do the following:

  • Join data inside Confluent with data stored in one of the many data sources supported by Athena, such as Amazon Simple Storage Service (Amazon S3)
  • Visualize Confluent data using QuickSight

Challenges

Purpose-built stream processing engines, like Confluent ksqlDB, often provide SQL-like semantics for real-time transformations, joins, aggregations, and filters on streaming data. With ksqlDB, you can create persistent queries, which continuously process streams of events according to specific logic, and materialize streaming data in views that can be queried at a point in time (pull queries) or subscribed to by clients (push queries).

ksqlDB is one solution that made stream processing accessible to a wider range of users. However, pull queries, like those supported by ksqlDB, may not be suitable for all stream processing use cases, and there may be complexities or unique requirements that pull queries are not designed for.

Data visualization for Confluent data

A frequent use case for enterprises is data visualization. To visualize data stored in Confluent, you can use one of over 120 pre-built connectors, provided by Confluent, to write streaming data to a destination data store of your choice. Next, you connect your business intelligence (BI) tool to the data store to begin visualizing the data.

The following diagram depicts a typical architecture utilized by many Confluent customers. In this workflow, data is written to Amazon S3 through the Confluent S3 sink connector and then analyzed with Athena, a serverless interactive analytics service that enables you to analyze and query data stored in Amazon S3 and various other data sources using standard SQL. You can then use Athena as an input data source to QuickSight, a highly scalable cloud native BI service, for further analysis.

Figure: Typical architecture utilized by many Confluent customers.

Although this approach works well for many use cases, it requires data to be moved, and therefore duplicated, before it can be visualized. This duplication not only adds time and effort for data engineers who may need to develop and test new scripts, but also creates data redundancy, making it more challenging to manage and secure the data, and increases storage cost.

Enriching data with reference data in another data store

With ksqlDB queries, the source and destination are always Kafka topics. Therefore, if you have a data stream that you need to enrich with external reference data, you have two options. One option is to import the reference data into Confluent, model it as a table, and use ksqlDB’s stream-table join to enrich the stream. The other option is to ingest the data stream into a separate data store and perform join operations there. Both require data movement and result in duplicate data storage.

Solution overview

So far, we have discussed two challenges that are not addressed by conventional stream processing tools. Is there a solution that addresses both challenges simultaneously?

When you want to analyze data without separate pipelines and jobs, a popular choice is Athena. With Athena, you can run SQL queries on a wide range of data sources—in addition to Amazon S3—without learning a new language, developing scripts to extract (and duplicate) data, or managing infrastructure.

Recently, Athena announced a connector for Kafka. As with Athena’s other connectors, queries on Kafka are processed within Kafka, and the results are returned to Athena. The connector supports predicate pushdown, which means that adding filters to your queries can reduce the amount of data scanned, improve query performance, and reduce cost.

For example, when using this connector, the amount of data scanned by the query SELECT * FROM CONFLUENT_TABLE could be significantly higher than the amount of data scanned by the query SELECT * FROM CONFLUENT_TABLE WHERE COUNTRY = 'UK'. The reason is that the AWS Lambda function, which provides the runtime environment for the Athena connector, filters data at the source before returning it to Athena.
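
To make the effect of predicate pushdown concrete, here is a toy model in Python. It is not the connector's implementation: the records, the `read_from_source` function, and the row counts are invented for illustration. The sketch compares fetching everything and filtering in the query engine with handing the filter to the source so that only matching rows are returned.

```python
from typing import Callable, List, Optional, Tuple

# Invented sample records standing in for messages in a topic.
RECORDS = [
    {"id": 1, "country": "UK"},
    {"id": 2, "country": "US"},
    {"id": 3, "country": "UK"},
    {"id": 4, "country": "DE"},
]


def read_from_source(
    records: List[dict],
    predicate: Optional[Callable[[dict], bool]] = None,
) -> Tuple[List[dict], int]:
    """Mimic a connector: apply the filter before rows leave the source.

    Returns the rows handed to the query engine and how many rows that was.
    """
    returned = [r for r in records if predicate is None or predicate(r)]
    return returned, len(returned)


def is_uk(record: dict) -> bool:
    return record["country"] == "UK"


# No pushdown: every row is returned, then filtered in the engine.
all_rows, transferred_without_pushdown = read_from_source(RECORDS)
uk_filtered_in_engine = [r for r in all_rows if is_uk(r)]

# Pushdown: the source applies the filter and returns only matching rows.
uk_rows, transferred_with_pushdown = read_from_source(RECORDS, is_uk)

print(transferred_without_pushdown, transferred_with_pushdown)  # 4 2
```

Both approaches produce the same result set. The difference is how many rows have to travel from the source to the engine.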

Let’s assume we have a stream of online transactions flowing into Confluent and customer reference data stored in Amazon S3. We want to use Athena to join both data sources together and produce a new dataset for QuickSight. Instead of using the S3 sink connector to load data into Amazon S3, we use Athena to query Confluent and join it with S3 data—all without moving data. The following diagram illustrates this architecture.

Figure: Using Athena to join both data sources together and produce a new dataset for QuickSight.

We perform the following steps:

  1. Register the schema of your Confluent data.
  2. Configure the Athena connector for Kafka.
  3. Optionally, interactively analyze Confluent data.
  4. Create a QuickSight dataset using Athena as the source.

Register the schema

To connect Athena to Confluent, the connector needs the schema of the topic to be registered in the AWS Glue Schema Registry, which Athena uses for query planning.

The following is a sample record in Confluent:

{
  "transaction_id": "23e5ed25-5818-4d4f-acb3-73ef04d51d21",
  "customer_id": "126-58-9758",
  "amount": 986,
  "timestamp": "2023-01-03T15:40:42",
  "product_category": "health_fitness"
}

The following is the schema of this record:

{
  "topicName": "transactions",
  "message": {
    "dataFormat": "json",
    "fields": [
      {
        "name": "transaction_id",
        "mapping": "transaction_id",
        "type": "VARCHAR"
      },
      {
        "name": "customer_id",
        "mapping": "customer_id",
        "type": "VARCHAR"
      },
      {
        "name": "amount",
        "mapping": "amount",
        "type": "INTEGER"
      },
      {
        "name": "timestamp",
        "mapping": "timestamp",
        "type": "TIMESTAMP",
        "formatHint": "yyyy-MM-dd'T'HH:mm:ss"
      },
      {
        "name": "product_category",
        "mapping": "product_category",
        "type": "VARCHAR"
      }
    ]
  }
}
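
Before registering a schema like this one, it can help to check it against a sample record. The following Python sketch is a local sanity check only, not something the connector or the schema registry runs: it reports field names that appear more than once and fields whose mapping has no counterpart in the record. The helper names are illustrative, and the `strptime` pattern is an assumed Python equivalent of the timestamp format hint.

```python
from collections import Counter
from datetime import datetime


def field(name: str, type_: str, **extra) -> dict:
    """Build one schema field whose mapping equals its name."""
    return {"name": name, "mapping": name, "type": type_, **extra}


def duplicate_field_names(schema: dict) -> list:
    """Return field names that appear more than once in the schema."""
    counts = Counter(f["name"] for f in schema["message"]["fields"])
    return sorted(name for name, count in counts.items() if count > 1)


def unmapped_fields(schema: dict, record: dict) -> list:
    """Return schema fields whose mapping is missing from the record."""
    return [f["name"] for f in schema["message"]["fields"]
            if f["mapping"] not in record]


RECORD = {
    "transaction_id": "23e5ed25-5818-4d4f-acb3-73ef04d51d21",
    "customer_id": "126-58-9758",
    "amount": 986,
    "timestamp": "2023-01-03T15:40:42",
    "product_category": "health_fitness",
}

SCHEMA = {
    "topicName": "transactions",
    "message": {
        "dataFormat": "json",
        "fields": [
            field("transaction_id", "VARCHAR"),
            field("customer_id", "VARCHAR"),
            field("amount", "INTEGER"),
            field("timestamp", "TIMESTAMP", formatHint="yyyy-MM-dd'T'HH:mm:ss"),
            field("product_category", "VARCHAR"),
        ],
    },
}

print(duplicate_field_names(SCHEMA))    # []
print(unmapped_fields(SCHEMA, RECORD))  # []
# Assumed Python counterpart of the format hint above.
print(datetime.strptime(RECORD["timestamp"], "%Y-%m-%dT%H:%M:%S"))
```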

The data producer writing the data can register this schema with the AWS Glue Schema Registry. Alternatively, you can use the AWS Management Console or AWS Command Line Interface (AWS CLI) to create a schema manually.

We create the schema registry manually by running the following CLI command. Replace <registry_name> with your registry name and make sure that the text in the description field includes the required string {AthenaFederationKafka}:

aws glue create-registry --registry-name <registry_name> --description "{AthenaFederationKafka}"

Next, we run the following command to create a schema inside the newly created schema registry:

aws glue create-schema --registry-id RegistryName=<registry_name> --schema-name <schema_name> --compatibility <Compatibility_Mode> --data-format JSON --schema-definition <Schema>

Before running the command, be sure to provide the following details:

  • Replace <registry_name> with our AWS Glue Schema Registry name
  • Replace <schema_name> with the name of our Confluent Cloud topic, for example, transactions
  • Replace <Compatibility_Mode> with one of the supported compatibility modes, for example, BACKWARD
  • Replace <Schema> with our schema
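
The same two calls can be prepared in Python. The sketch below only builds the request payloads for the AWS Glue `CreateRegistry` and `CreateSchema` APIs; the helper names, the registry name, and the one-field schema are placeholders. Serializing the schema with `json.dumps` avoids hand-escaping the definition on the command line.

```python
import json

REQUIRED_MARKER = "{AthenaFederationKafka}"  # string required in the description


def registry_request(registry_name: str) -> dict:
    """Payload for glue.create_registry with the required description."""
    return {"RegistryName": registry_name, "Description": REQUIRED_MARKER}


def schema_request(registry_name: str, topic: str, schema: dict,
                   compatibility: str = "BACKWARD") -> dict:
    """Payload for glue.create_schema; the schema is named after the topic."""
    return {
        "RegistryId": {"RegistryName": registry_name},
        "SchemaName": topic,
        "DataFormat": "JSON",
        "Compatibility": compatibility,
        "SchemaDefinition": json.dumps(schema),
    }


# Placeholder schema; use the full topic schema in practice.
schema = {
    "topicName": "transactions",
    "message": {
        "dataFormat": "json",
        "fields": [
            {"name": "transaction_id", "mapping": "transaction_id", "type": "VARCHAR"},
        ],
    },
}

print(registry_request("my_registry"))
print(schema_request("my_registry", "transactions", schema))
```

With credentials in place, the calls would be `glue.create_registry(**registry_request(...))` followed by `glue.create_schema(**schema_request(...))`, where `glue = boto3.client("glue")`.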

Configure and deploy the Athena Connector

With our schema created, we’re ready to deploy the Athena connector. Complete the following steps:

  1. On the Athena console, choose Data sources in the navigation pane.
  2. Choose Create data source.
  3. Search for and select Apache Kafka.
  4. For Data source name, enter the name for the data source.

This data source name will be referenced in your queries. For example:

SELECT * 
FROM <data_source_name>.<registry_name>.<schema_name>
WHERE COL1='SOMETHING'

Applying this to our use case and previously defined schema, our query would be as follows:

SELECT * 
FROM "Confluent"."transactions_db"."transactions"
WHERE product_category='Kids'
  5. In the Connection details section, choose Create Lambda function.

You’re redirected to the Applications page on the Lambda console. Some of the application settings are already filled.

The following are the important settings required for integrating with Confluent Cloud. For more information on these settings, refer to Parameters.

  6. For LambdaFunctionName, enter the name for the Lambda function the connector will use. For example, athena_confluent_connector.

We use this parameter in the next step.

  7. For KafkaEndpoint, enter the Confluent Cloud bootstrap URL.

You can find this on the Cluster settings page in the Confluent Cloud UI.

Confluent Cloud supports two authentication mechanisms: OAuth and SASL/PLAIN (API keys). The connector doesn’t support OAuth, which leaves us with SASL/PLAIN. SASL/PLAIN uses SSL as the security protocol and PLAIN as the SASL mechanism.

  8. For AuthType, enter SASL_SSL_PLAIN.

The API key and secret used by the connector to access Confluent need to be stored in AWS Secrets Manager.

  9. Get your Confluent API key or create a new one.
  10. Run the following AWS CLI command to create the secret in Secrets Manager:
    aws secretsmanager create-secret \
        --name <SecretNamePrefix> \
        --secret-string "{\"username\":\"<Confluent_API_KEY>\",\"password\":\"<Confluent_Secret>\"}"

The secret string should have two key-value pairs, one named username and the other password.

  11. For SecretNamePrefix, enter the secret name prefix created in the previous step.
  12. If the Confluent Cloud cluster is reachable over the internet, leave SecurityGroupIds and SubnetIds blank. Otherwise, your Lambda function needs to run in a VPC that has connectivity to your Confluent Cloud network; in that case, enter a security group ID and three private subnet IDs in this VPC.
  13. For SpillBucket, enter the name of an S3 bucket where the connector can spill data.

Athena connectors temporarily store (spill) data to Amazon S3 for further processing by Athena.

  14. Select I acknowledge that this app creates custom IAM roles and resource policies.
  15. Choose Deploy.
  16. Return to the Connection details section on the Athena console and for Lambda, enter the name of the Lambda function you created.
  17. Choose Next.
  18. Choose Create data source.

Perform interactive analysis on Confluent data

With the Athena connector set up, our streaming data is now queryable from the same service we use to analyze S3 data lakes. Next, we use Athena to conduct point-in-time analysis of transactions flowing through Confluent Cloud.

Aggregation

We can use standard SQL functions to aggregate the data. For example, we can get the revenue by product category:

SELECT product_category, SUM(amount) AS Revenue
FROM "Confluent"."athena_blog"."transactions"
GROUP BY product_category
ORDER BY Revenue desc

Enrich transaction data with customer data

The aggregation example is also available with ksqlDB pull queries. However, Athena’s connector allows us to join the data with other data sources like Amazon S3.

In our use case, the transactions streamed to Confluent Cloud lack detailed information about customers, apart from a customer_id. However, we have a reference dataset in Amazon S3 that has more information about the customers. With Athena, we can join both datasets together to gain insights about our customers. See the following code:

SELECT * 
FROM "Confluent"."athena_blog"."transactions" a
INNER JOIN "AwsDataCatalog"."athenablog"."customer" b 
ON a.customer_id=b.customer_id

You can see from the results that we were able to enrich the streaming data with customer details, stored in Amazon S3, including name and address.

Visualize data using QuickSight

Another powerful feature this connector brings is the ability to visualize data stored in Confluent using any BI tool that supports Athena as a data source. In this post, we use QuickSight. QuickSight is a machine learning (ML)-powered BI service built for the cloud. You can use it to deliver easy-to-understand insights to the people you work with, wherever they are.

For more information about signing up for QuickSight, see Signing up for an Amazon QuickSight subscription.

Complete the following steps to visualize your streaming data with QuickSight:

  1. On the QuickSight console, choose Datasets in the navigation pane.
  2. Choose New dataset.
  3. Choose Athena as the data source.
  4. For Data source name, enter a name.
  5. Choose Create data source.
  6. In the Choose your table section, choose Use custom SQL.
  7. Enter the join query like the one given previously, then choose Confirm query.
  8. Next, choose to import the data into SPICE (Super-fast, Parallel, In-memory Calculation Engine), a fully managed in-memory cache that boosts performance, or directly query the data.

Utilizing SPICE will enhance performance, but the data may need to be periodically updated. You can choose to incrementally refresh your dataset or schedule regular refreshes with SPICE. If you want near-real-time data reflected in your dashboards, select Directly query your data. Note that with the direct query option, user actions in QuickSight, such as applying a drill-down filter, may invoke a new Athena query.

  1. Choose Visualize.

That’s it. We have successfully connected QuickSight to Confluent through Athena. With just a few clicks, you can create visuals that display data from Confluent.

Clean up

To avoid incurring ongoing charges, delete the resources you provisioned by completing the following steps:

  1. Delete the AWS Glue schema and registry.
  2. Delete the Athena Kafka connector.
  3. Delete the QuickSight dataset.

Conclusion

In this post, we discussed use cases for Athena and Confluent. We provided examples of how you can use both for near-real-time data visualization with QuickSight and interactive analysis involving joins between streaming data in Confluent and data stored in Amazon S3.

The Athena connector for Kafka simplifies the process of querying and analyzing streaming data from Confluent Cloud. It removes the need to first move streaming data to persistent storage before it can be used in downstream use cases like business intelligence. This complements the existing integration between Confluent and Athena, using the S3 sink connector, which enables loading streaming data into a data lake, and is an additional option for customers who want to enable interactive analysis on Confluent data.


About the authors

Ahmed Zamzam is a Senior Partner Solutions Architect at Confluent, with a focus on the AWS partnership. In his role, he works with customers in the EMEA region across various industries to assist them in building applications that leverage their data using Confluent and AWS. Prior to Confluent, Ahmed was a Specialist Solutions Architect for Analytics at AWS, specializing in data streaming and search. In his free time, Ahmed enjoys traveling, playing tennis, and cycling.

Geetha Anne is a Partner Solutions Engineer at Confluent with previous experience in implementing solutions for data-driven business problems on the cloud, involving data warehousing and real-time streaming analytics. She fell in love with distributed computing during her undergraduate days and has followed her interest ever since. Geetha provides technical guidance, design advice, and thought leadership to key Confluent customers and partners. She also enjoys teaching complex technical concepts to both tech-savvy and general audiences.

Manage your data warehouse cost allocations with Amazon Redshift Serverless tagging

Post Syndicated from Sandeep Bajwa original https://aws.amazon.com/blogs/big-data/manage-your-data-warehouse-cost-allocations-with-amazon-redshift-serverless-tagging/

Amazon Redshift Serverless makes it simple to run and scale analytics without having to manage your data warehouse infrastructure. Developers, data scientists, and analysts can work across databases, data warehouses, and data lakes to build reporting and dashboarding applications, perform real-time analytics, share and collaborate on data, and even build and train machine learning (ML) models with Redshift Serverless.

Tags allow you to assign metadata to your AWS resources. You can define your own key and value for your resource tag, so that you can easily manage and filter your resources. Tags can also improve transparency and map costs to specific teams, products, or applications. This way, you can raise cost awareness and also make teams and users accountable for their own cost and usage.

You can now use tagging in Redshift Serverless to categorize the following resources based on your grouping needs:

  • Namespace – A collection of database objects and users
  • Workgroup – A collection of compute resources
  • Snapshot – Point-in-time backups of a cluster
  • Recovery point – Recovery points in Redshift Serverless are created every 30 minutes and saved for 24 hours

When using Redshift Serverless, you may have to manage data across many business departments, environments, and billing groups. In doing so, you’re usually faced with one of the following tasks:

  • Cost allocation and financial management – You want to know what you’re spending on AWS for a given project, line of business, or environment
  • Operations support and incident management – You want to send issues to the right teams and users
  • Access control – You want to constrain user access to certain resources
  • Security risk management – You want to group resources based on their level of security or data sensitivity and make sure proper controls are in place

In this post, we focus on tagging Redshift Serverless resources for cost allocation and reporting purposes. Knowing where you have incurred costs at the resource, workload, team, and organization level enhances your ability to budget and manage cost.

Solution overview

Let’s say that your company has two departments: marketing and finance. Each department has multiple cost centers and environments, as illustrated in the following figure. In AWS Cost Explorer, you want to create cost reports for Redshift Serverless by department, environment, and cost center.

We start by creating and applying user-defined tags to Redshift Serverless workgroups for the respective departments, environments, and cost centers. You can use either the AWS Command Line Interface (AWS CLI) or the Redshift Serverless console to tag serverless resources.

The high-level steps are as follows:

  1. Create tags.
  2. View and edit tags.
  3. Set up cost allocation tags.
  4. Create cost reports.

Create tags

To create tags, complete the following steps:

  1. On the Amazon Redshift console, choose Manage tags in the navigation pane.
  2. For Filter by resource type, you can filter by Workgroup, Namespace, Snapshot, and Recovery Point.
  3. Optionally, you can search for resources by an existing tag by entering values for Tag key or Tag value. For this post, we don’t include any tag filters, so we can view all the resources across our account.
  4. Select your resource from the search results and choose Manage tags to customize the tag key and value parameters.

Here, you can add new tags, remove tags, save changes, and cancel your changes if needed.

  1. Because we want to allocate cost across the various departments, we add a new key called department and a new value called marketing.
  2. Choose Save changes.
  3. Confirm the changes by choosing Apply changes.

For more details on tagging, refer to Tagging resources overview.
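The same tag can also be applied programmatically. The following is a minimal sketch using the boto3 Redshift Serverless client and its tag_resource call; the helper names, the placeholder ARN, and the environment and cost-center values are assumptions for illustration, not values from this post.

```python
def build_tag_request(resource_arn: str, tags: dict) -> dict:
    """Build the keyword arguments for the Redshift Serverless TagResource call."""
    return {
        "resourceArn": resource_arn,
        # Sorted only to make the output deterministic; the service does not require an order.
        "tags": [{"key": key, "value": value} for key, value in sorted(tags.items())],
    }


def tag_resource(resource_arn: str, tags: dict) -> None:
    """Apply the tags; needs AWS credentials and permission to tag the resource."""
    import boto3  # imported lazily so build_tag_request can be used without AWS access

    client = boto3.client("redshift-serverless")
    client.tag_resource(**build_tag_request(resource_arn, tags))


# Placeholder ARN and illustrative tag values (assumptions).
request = build_tag_request(
    "arn:aws:redshift-serverless:us-east-1:111122223333:workgroup/example-id",
    {"department": "marketing", "environment": "dev", "cost-center": "10001"},
)
print(request)
```

Keeping the request builder separate from the API call makes it possible to review the exact payload before anything is changed in the account.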

View and edit tags

If you already have resources such as workgroups (listed on the Workgroup configuration page) or snapshots (listed on the Data backup page), you can create new tags or edit existing tags on the given resource. In the following example, we manage tags on an existing workgroup.

  1. On the Amazon Redshift console, choose Workgroup configuration in the navigation pane.
  2. Select your workgroup and on the Actions menu, choose Manage tags.

Now we can remove existing tags or add new tags. For our use case, let’s assume that the marketing department is no longer using the default workgroup, so we want to remove the current tag.

  1. Choose Remove next to the marketing tag.

We are given the option to choose Undo if needed.

  1. Choose Save changes, then choose Apply changes to confirm.

After we apply the tags, we can view the full list of resources. The number of tags applied to each resource is found in the Tags column.

Set up cost allocation tags

After you create and apply the user-defined tags to your Redshift Serverless workgroups, it can take up to 24 hours for the tags to appear on your cost allocation tags page for activation. You can activate tags by using the AWS Billing console for cost allocation tracking with the following steps:

  1. On the AWS Billing console, choose Cost allocation tags in the navigation pane.
  2. Under User-defined cost allocation tags, select the tags you created and applied (for this example, cost-center).
  3. Choose Activate.

After we activate all the tags we created, we can view the full list by choosing Active on the drop-down menu.
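Activation can also be scripted once the tags have appeared on the cost allocation tags page. The sketch below assumes the boto3 Cost Explorer client and its update_cost_allocation_tags_status call; the helper names and the list of tag keys are illustrative assumptions.

```python
def build_activation_request(tag_keys) -> dict:
    """Build the request that marks each user-defined tag key as an active cost allocation tag."""
    return {
        "CostAllocationTagsStatus": [
            {"TagKey": key, "Status": "Active"} for key in tag_keys
        ]
    }


def activate_tags(tag_keys) -> None:
    """Send the activation request; needs AWS credentials with Cost Explorer permissions."""
    import boto3  # imported lazily so the builder works without AWS access

    boto3.client("ce").update_cost_allocation_tags_status(
        **build_activation_request(tag_keys)
    )


# Tag keys assumed for this example.
activation_request = build_activation_request(["department", "environment", "cost-center"])
print(activation_request)
```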

Create cost reports

After you activate the cost allocation tags, they appear on your cost allocation reports in Cost Explorer.

Cost Explorer helps you manage your AWS costs by giving you detailed insights into the line items in your bill. In Cost Explorer, you can visualize daily, monthly, and forecasted spend by combining an array of available filters. Filters allow you to narrow down costs according to AWS service type, linked accounts, and tags.

The following screenshot shows the preconfigured reports in Cost Explorer.

To create custom reports for your cost and usage data, complete the following steps:

  1. On the AWS Cost Management console, choose Reports in the navigation pane.
  2. Choose Create new report.
  3. Select the report type you want to create (for this example, we select Cost and usage).
  4. Choose Create Report.
  5. To view weekly Redshift Serverless cost by cost center, choose the applicable settings in the Report parameters pane. For this post, we group data by the cost-center tag and filter data by the department tag.
  6. Save the report for later use by choosing Save to report library.
  7. Enter a name for your report, then choose Save report.
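The report settings above (group by the cost-center tag, filter by the department tag) can also be expressed as a Cost Explorer API request. This is a minimal sketch that assumes the boto3 get_cost_and_usage call; the helper names, the date range, and the UnblendedCost metric are assumptions, and the request asks for daily granularity. It does not restrict the results to a single service, because the exact service dimension value isn't given in this post.

```python
from datetime import date


def build_cost_request(department: str, start: date, end: date) -> dict:
    """Build a cost-and-usage request grouped by cost-center and filtered by department."""
    return {
        "TimePeriod": {"Start": start.isoformat(), "End": end.isoformat()},
        "Granularity": "DAILY",
        "Metrics": ["UnblendedCost"],
        "GroupBy": [{"Type": "TAG", "Key": "cost-center"}],
        "Filter": {"Tags": {"Key": "department", "Values": [department]}},
    }


def fetch_costs(request: dict) -> dict:
    """Run the query; needs AWS credentials with Cost Explorer permissions."""
    import boto3  # imported lazily so the builder works without AWS access

    return boto3.client("ce").get_cost_and_usage(**request)


# Illustrative one-week window (assumption).
cost_request = build_cost_request("marketing", date(2023, 1, 1), date(2023, 1, 8))
print(cost_request)
```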

The following screenshot shows a sample report for daily Redshift Serverless cost by department.

The following screenshot shows an example report of weekly Redshift Serverless cost by environment.

Conclusion

Tagging resources in Amazon Redshift helps you maintain a central place to organize and view resources across the service for billing management. This feature saves you hours of manual work you would spend in grouping your Amazon Redshift resources via a spreadsheet or other manual alternatives.

For more tagging best practices, refer to Tagging AWS resources.


About the Authors

Sandeep Bajwa is a Sr. Analytics Specialist based out of Northern Virginia, specialized in the design and implementation of analytics and data lake solutions.

Michael Yitayew is a Product Manager for Amazon Redshift based out of New York. He works with customers and engineering teams to build new features that enable data engineers and data analysts to more easily load data, manage data warehouse resources, and query their data. He has supported AWS customers for over 3 years in both product marketing and product management roles.

Manage users and group memberships on Amazon QuickSight using SCIM events generated in IAM Identity Center with Azure AD

Post Syndicated from Wakana Vilquin-Sakashita original https://aws.amazon.com/blogs/big-data/manage-users-and-group-memberships-on-amazon-quicksight-using-scim-events-generated-in-iam-identity-center-with-azure-ad/

Amazon QuickSight is a cloud-native, scalable business intelligence (BI) service that supports identity federation. AWS Identity and Access Management (IAM) allows organizations to use the identities managed in their enterprise identity provider (IdP) and federate single sign-on (SSO) to QuickSight. As more organizations are building centralized user identity stores with all their applications, including on-premises apps, third-party apps, and applications on AWS, they need a solution to automate user provisioning into these applications and keep their attributes in sync with their centralized user identity store.

When architecting a user repository, some organizations decide to organize their users in groups or use attributes (such as department name), or a combination of both. If your organization uses Microsoft Azure Active Directory (Azure AD) for centralized authentication and utilizes its user attributes to organize the users, you can enable federation across all QuickSight accounts as well as manage users and their group membership in QuickSight using events generated in the AWS platform. This allows system administrators to centrally manage user permissions from Azure AD. Provisioning, updating, and de-provisioning users and groups in QuickSight no longer requires management in two places with this solution. This makes sure that users and groups in QuickSight stay consistent with information in Azure AD through automatic synchronization.

In this post, we walk you through the steps required to configure federated SSO between QuickSight and Azure AD via AWS IAM Identity Center (Successor to AWS Single Sign-On) where automatic provisioning is enabled for Azure AD. We also demonstrate automatic user and group membership update using a System for Cross-domain Identity Management (SCIM) event.

Solution overview

The following diagram illustrates the solution architecture and user flow.

In this post, IAM Identity Center provides a central place to bring together administration of users and their access to AWS accounts and cloud applications. Azure AD is the user repository and configured as the external IdP in IAM Identity Center. In this solution, we demonstrate the use of two user attributes (department, jobTitle) specifically in Azure AD. IAM Identity Center supports automatic provisioning (synchronization) of user and group information from Azure AD into IAM Identity Center using the SCIM v2.0 protocol. With this protocol, the attributes from Azure AD are passed along to IAM Identity Center, which inherits the defined attribute for the user’s profile in IAM Identity Center. IAM Identity Center also supports identity federation with SAML (Security Assertion Markup Language) 2.0. This allows IAM Identity Center to authenticate identities using Azure AD. Users can then SSO into applications that support SAML, including QuickSight. The first half of this post focuses on how to configure this end to end (see Sign-In Flow in the diagram).

Next, user information starts to synchronize between Azure AD and IAM Identity Center via the SCIM protocol. You can automate creating a user in QuickSight using an AWS Lambda function triggered by the CreateUser SCIM event that originates from IAM Identity Center and is captured in Amazon EventBridge. In the same Lambda function, you can then add the user to the specified group, whose name is composed of two user attributes (department-jobTitle), creating the group first if it doesn’t exist yet.

In this post, this automation part is omitted because it would be redundant with the content discussed in the following sections.

This post explores and demonstrates an UpdateUser SCIM event triggered by the user profile update on Azure AD. The event is captured in EventBridge, which invokes a Lambda function to update the group membership in QuickSight (see Update Flow in the diagram). Because a given user is supposed to belong to only one group at a time in this example, the function will replace the user’s current group membership with the new one.
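Because each user belongs to only one group at a time, the update reduces to a small set computation: leave every group that isn't the new one, and join the new one if the user isn't already in it. The following sketch shows only that planning step; the function name plan_membership_change and the sample group names are hypothetical, and the QuickSight calls themselves are left out.

```python
def plan_membership_change(current_groups, target_group):
    """Return (groups_to_leave, groups_to_join) so the user ends up only in target_group."""
    groups_to_leave = [group for group in current_groups if group != target_group]
    groups_to_join = [] if target_group in current_groups else [target_group]
    return groups_to_leave, groups_to_join


# Example: the user's combined attribute changed from one department-jobTitle value to another.
to_leave, to_join = plan_membership_change(["Marketing-Specialist"], "Marketing-Manager")
print(to_leave, to_join)
```

Planning the change before calling any API keeps the function idempotent: if the event is delivered again after the membership is already correct, both lists are empty and nothing is modified.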

In Part I, you set up SSO to QuickSight from Azure AD via IAM Identity Center (the sign-in flow):

  1. Configure Azure AD as the external IdP in IAM Identity Center.
  2. Add and configure an IAM Identity Center application in Azure AD.
  3. Complete configuration of IAM Identity Center.
  4. Set up SCIM automatic provisioning on both Azure AD and IAM Identity Center, and confirm in IAM Identity Center.
  5. Add and configure a QuickSight application in IAM Identity Center.
  6. Configure a SAML IdP and SAML 2.0 federation IAM role.
  7. Configure attributes in the QuickSight application.
  8. Create a user, group, and group membership manually via the AWS Command Line Interface (AWS CLI) or API.
  9. Verify the configuration by logging in to QuickSight from the IAM Identity Center portal.

In Part II, you set up automation to change group membership upon an SCIM event (the update flow):

  1. Understand SCIM events and event patterns for EventBridge.
  2. Create attribute mapping for the group name.
  3. Create a Lambda function.
  4. Add an EventBridge rule to trigger the event.
  5. Verify the configuration by changing the user attribute value at Azure AD.

Prerequisites

For this walkthrough, you should have the following prerequisites:

  • IAM Identity Center. For instructions, refer to Steps 1–2 in the AWS IAM Identity Center Getting Started guide.
  • A QuickSight account subscription.
  • Basic understanding of IAM and privileges required to create an IAM IdP, roles, and policies.
  • An Azure AD subscription. You need at least one user with the following attributes to be registered in Azure AD:
    • userPrincipalName – Mandatory field for Azure AD user.
    • displayName – Mandatory field for Azure AD user.
    • Mail – Mandatory field for IAM Identity Center to work with QuickSight.
    • jobTitle – Used to allocate user to group.
    • department – Used to allocate user to group.
    • givenName – Optional field.
    • surname – Optional field.

Part I: Set up SSO to QuickSight from Azure AD via IAM Identity Center

This section presents the steps to set up the sign-in flow.

Configure an external IdP as Azure AD in IAM Identity Center

To configure your external IdP, complete the following steps:

  1. On the IAM Identity Center console, choose Settings.
  2. Choose Actions on the Identity source tab, then choose Change identity source.
  3. Choose External identity provider, then choose Next.

The IdP metadata is displayed. Keep this browser tab open.

Add and configure an IAM Identity Center application in Azure AD

To set up your IAM Identity Center application, complete the following steps:

  1. Open a new browser tab.
  2. Log in to the Azure AD portal using your Azure administrator credentials.
  3. Under Azure services, choose Azure Active Directory.
  4. In the navigation pane, under Manage, choose Enterprise applications, then choose New application.
  5. In the Browse Azure AD Gallery section, search for IAM Identity Center, then choose AWS IAM Identity Center (successor to AWS Single Sign-On).
  6. Enter a name for the application (in this post, we use IIC-QuickSight) and choose Create.
  7. In the Manage section, choose Single sign-on, then choose SAML.
  8. In the Assign users and groups section, choose Assign users and groups.
  9. Choose Add user/group and add at least one user.
  10. Select User as its role.
  11. In the Set up single sign on section, choose Get started.
  12. In the Basic SAML Configuration section, choose Edit, and fill out the following parameters and values:
    • Identifier – The value in the IAM Identity Center issuer URL field.
    • Reply URL – The value in the IAM Identity Center Assertion Consumer Service (ACS) URL field.
    • Sign on URL – Leave blank.
    • Relay State – Leave blank.
    • Logout URL – Leave blank.
  13. Choose Save.

The configuration should look like the following screenshot.

  1. In the SAML Certificates section, download the Federation Metadata XML file and the Certificate (Raw) file.

You’re all set with Azure AD SSO configuration at this moment. Later on, you’ll return to this page to configure automated provisioning, so keep this browser tab open.

Complete configuration of IAM Identity Center

Complete your IAM Identity Center configuration with the following steps:

  1. Go back to the browser tab for the IAM Identity Center console, which you kept open in a previous step.
  2. For IdP SAML metadata under the Identity provider metadata section, choose Choose file.
  3. Choose the previously downloaded metadata file (IIC-QuickSight.xml).
  4. For IdP certificate under the Identity provider metadata section, choose Choose file.
  5. Choose the previously downloaded certificate file (IIC-QuickSight.cer).
  6. Choose Next.
  7. Enter ACCEPT, then choose Change Identity provider source.

Set up SCIM automatic provisioning on both Azure AD and IAM Identity Center

Your provisioning method is still set as Manual (non-SCIM). In this step, we enable automatic provisioning so that IAM Identity Center becomes aware of the users, which allows identity federation to QuickSight.

  1. In the Automatic provisioning section, choose Enable.
  2. Choose Access token to show your token.
  3. Go back to the browser tab (Azure AD), which you kept open in Step 1.
  4. In the Manage section, choose Enterprise applications.
  5. Choose IIC-QuickSight, then choose Provisioning.
  6. Choose Automatic in Provisioning Mode and enter the following values:
    • Tenant URL – The value in the SCIM endpoint field.
    • Secret Token – The value in the Access token field.
  7. Choose Test Connection.
  8. After the test connection completes successfully, set Provisioning Status to On.
  9. Choose Save.
  10. Choose Start provisioning to start automatic provisioning using the SCIM protocol.

When provisioning is complete, it will result in propagating one or more users from Azure AD to IAM Identity Center. The following screenshot shows the users that were provisioned in IAM Identity Center.

Note that upon this SCIM provisioning, the users in QuickSight should be created using the Lambda function triggered by the event originated from IAM Identity Center. In this post, we create a user and group membership via the AWS CLI (Step 8).

Add and configure a QuickSight application in IAM Identity Center

In this step, we create a QuickSight application in IAM Identity Center. You also configure an IAM SAML provider, role, and policy for the application to work. Complete the following steps:

  1. On the IAM Identity Center console, on the Applications page, choose Add Application.
  2. For Pre-integrated application under Select an application, enter quicksight.
  3. Select Amazon QuickSight, then choose Next.
  4. Enter a name for Display name, such as Amazon QuickSight.
  5. Choose Download under IAM Identity Center SAML metadata file and save it to your computer.
  6. Leave all other fields as they are, and save the configuration.
  7. Open the application you’ve just created, then choose Assign Users.

The users provisioned via SCIM earlier will be listed.

  1. Choose all of the users to assign to the application.

Configure a SAML IdP and a SAML 2.0 federation IAM role

To set up your IAM SAML IdP for IAM Identity Center and IAM role, complete the following steps:

  1. On the IAM console, in the navigation pane, choose Identity providers, then choose Add provider.
  2. Choose SAML as Provider type, and enter Azure-IIC-QS as Provider name.
  3. Under Metadata document, choose Choose file and upload the metadata file you downloaded earlier.
  4. Choose Add provider to save the configuration.
  5. In the navigation pane, choose Roles, then choose Create role.
  6. For Trusted entity type, select SAML 2.0 federation.
  7. For Choose a SAML 2.0 provider, select the SAML provider that you created, then choose Allow programmatic and AWS Management Console access.
  8. Choose Next.
  9. On the Add Permission page, choose Next.

In this post, we create QuickSight users via an AWS CLI command, so we don’t attach a permission policy. However, if you need the self-provisioning feature in QuickSight, the role requires a permission policy for the CreateReader, CreateUser, and CreateAdmin actions (depending on the role of the QuickSight users).

  1. On the Name, review, and create page, under Role details, enter qs-reader-azure for the role.
  2. Choose Create role.
  3. Note the ARN of the role.

You use the ARN to configure attributes in your IAM Identity Center application.

Configure attributes in the QuickSight application

To associate the IAM SAML IdP and IAM role to the QuickSight application in IAM Identity Center, complete the following steps:

  1. On the IAM Identity Center console, in the navigation pane, choose Applications.
  2. Select the Amazon QuickSight application, and on the Actions menu, choose Edit attribute mappings.
  3. Choose Add new attribute mapping.
  4. Configure the mappings in the following table.
User attribute in the application | Maps to this string value or user attribute in IAM Identity Center
Subject | ${user:email}
https://aws.amazon.com/SAML/Attributes/RoleSessionName | ${user:email}
https://aws.amazon.com/SAML/Attributes/Role | arn:aws:iam::<ACCOUNTID>:role/qs-reader-azure,arn:aws:iam::<ACCOUNTID>:saml-provider/Azure-IIC-QS
https://aws.amazon.com/SAML/Attributes/PrincipalTag:Email | ${user:email}

Note the following values:

  • Replace <ACCOUNTID> with your AWS account ID.
  • PrincipalTag:Email is for the email syncing feature for self-provisioning users that need to be enabled on the QuickSight admin page. In this post, don’t enable this feature because we register the user with an AWS CLI command.
  1. Choose Save changes.

Create a user, group, and group membership with the AWS CLI

As described earlier, users and groups in QuickSight are being created manually in this solution. We create them via the following AWS CLI commands.

The first step is to create a user in QuickSight specifying the IAM role created earlier and email address registered in Azure AD. The second step is to create a group with the group name as combined attribute values from Azure AD for the user created in the first step. The third step is to add the user into the group created earlier; member-name indicates the user name created in QuickSight that is comprised of <IAM Role name>/<session name>. See the following code:

# Step 1: register the user with the IAM role and the email address from Azure AD
aws quicksight register-user \
--aws-account-id <ACCOUNTID> --namespace default \
--identity-type IAM --email <email registered in Azure AD> \
--user-role READER --iam-arn arn:aws:iam::<ACCOUNTID>:role/qs-reader-azure \
--session-name <email registered in Azure AD>

# Step 2: create the group named after the combined Azure AD attributes
aws quicksight create-group \
--aws-account-id <ACCOUNTID> --namespace default \
--group-name Marketing-Specialist

# Step 3: add the user (<IAM Role name>/<session name>) to the group
aws quicksight create-group-membership \
--aws-account-id <ACCOUNTID> --namespace default \
--member-name qs-reader-azure/<email registered in Azure AD> \
--group-name Marketing-Specialist
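The member name format is the detail that is easiest to get wrong in the third command. As a minimal sketch, the same membership request can be built in Python for the boto3 QuickSight create_group_membership call; the helper names, the placeholder account ID, and the example email address are assumptions.

```python
def member_name(role_name: str, session_name: str) -> str:
    """QuickSight user name for an IAM-federated user: <IAM Role name>/<session name>."""
    return f"{role_name}/{session_name}"


def build_membership_request(account_id, role_name, email, group_name, namespace="default"):
    """Build the keyword arguments for the QuickSight CreateGroupMembership call."""
    return {
        "AwsAccountId": account_id,
        "Namespace": namespace,
        "MemberName": member_name(role_name, email),
        "GroupName": group_name,
    }


def add_to_group(request: dict) -> None:
    """Send the request; needs AWS credentials with QuickSight permissions."""
    import boto3  # imported lazily so the builders work without AWS access

    boto3.client("quicksight").create_group_membership(**request)


# Placeholder account ID and email address (assumptions).
membership_request = build_membership_request(
    "111122223333", "qs-reader-azure", "user@example.com", "Marketing-Specialist"
)
print(membership_request)
```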

At this point, the end-to-end configuration of Azure AD, IAM Identity Center, IAM, and QuickSight is complete.

Verify the configuration by logging in to QuickSight from the IAM Identity Center portal

Now you’re ready to log in to QuickSight using the IdP-initiated SSO flow:

  1. Open a new private window in your browser.
  2. Log in to the IAM Identity Center portal (https://d-xxxxxxxxxx.awsapps.com/start).

You’re redirected to the Azure AD login prompt.

  1. Enter your Azure AD credentials.

You’re redirected back to the IAM Identity Center portal.

  1. In the IAM Identity Center portal, choose Amazon QuickSight.

You’re automatically redirected to your QuickSight home.

Part II: Automate group membership change upon SCIM events

In this section, we configure the update flow.

Understand the SCIM event and event pattern for EventBridge

When an Azure AD administrator changes the attributes on a user profile, the change is synced with the user profile in IAM Identity Center via the SCIM protocol, and the activity is recorded in an AWS CloudTrail event called UpdateUser with sso-directory.amazonaws.com (IAM Identity Center) as the event source. Similarly, the CreateUser event is recorded when a user is created on Azure AD, and the DisableUser event is recorded when a user is disabled.

The following screenshot on the Event history page shows two CreateUser events: one is recorded by IAM Identity Center, and the other one is by QuickSight. In this post, we use the one from IAM Identity Center.

For EventBridge to route the event correctly, the rule’s event pattern must specify the event fields that you want to match. The following event pattern is an example that matches the UpdateUser event generated in IAM Identity Center upon SCIM synchronization:

{
  "source": ["aws.sso-directory"],
  "detail-type": ["AWS API Call via CloudTrail"],
  "detail": {
    "eventSource": ["sso-directory.amazonaws.com"],
    "eventName": ["UpdateUser"]
  }
}
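
To make the matching behavior concrete, the following is a minimal Python sketch of how such a pattern selects events. It is a simplification and not the EventBridge implementation: it only handles exact-value lists and nested objects, and the `matches` helper and both sample events are hypothetical.

```python
def matches(pattern, event):
    """Return True if every field named in the pattern is matched by the event.

    Simplified model: a leaf in the pattern is a list of allowed exact values,
    and a nested dict is matched recursively. Fields that the pattern does not
    mention are ignored.
    """
    for key, expected in pattern.items():
        if key not in event:
            return False
        actual = event[key]
        if isinstance(expected, dict):
            if not isinstance(actual, dict) or not matches(expected, actual):
                return False
        elif actual not in expected:
            return False
    return True


pattern = {
    "source": ["aws.sso-directory"],
    "detail-type": ["AWS API Call via CloudTrail"],
    "detail": {
        "eventSource": ["sso-directory.amazonaws.com"],
        "eventName": ["UpdateUser"],
    },
}

# Hypothetical, heavily trimmed events for illustration only
update_event = {
    "source": "aws.sso-directory",
    "detail-type": "AWS API Call via CloudTrail",
    "detail": {"eventSource": "sso-directory.amazonaws.com", "eventName": "UpdateUser"},
}
create_event = {
    "source": "aws.sso-directory",
    "detail-type": "AWS API Call via CloudTrail",
    "detail": {"eventSource": "sso-directory.amazonaws.com", "eventName": "CreateUser"},
}

print(matches(pattern, update_event))  # True
print(matches(pattern, create_event))  # False
```

Under this model, a CreateUser or DisableUser event from the same source is ignored because its eventName isn't listed in the pattern.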

In this post, we demonstrate an automatic update of group membership in QuickSight that is triggered by the UpdateUser SCIM event.

Create attribute mapping for the group name

In order for the Lambda function to manage group membership in QuickSight, it must obtain the two user attributes (department and jobTitle). To make the process simpler, we’re combining two attributes in Azure AD (department, jobTitle) into one attribute in IAM Identity Center (title), using the attribute mappings feature in Azure AD. IAM Identity Center then uses the title attribute as a designated group name for this user.

  1. Log in to the Azure AD console, navigate to Enterprise Applications, IIC-QuickSight, and Provisioning.
  2. Choose Edit attribute mappings.
  3. Under Mappings, choose Provision Azure Active Directory Users.
  4. Choose jobTitle from the list of Azure Active Directory Attributes.
  5. Change the following settings:
    1. Mapping Type: Expression
    2. Expression: Join("-", [department], [jobTitle])
    3. Target attribute: title
  6. Choose Save.
  7. You can leave the provisioning page.
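
As a rough illustration of what this mapping expression produces, the following Python sketch mimics the Join call with two non-empty values. The `join_attributes` helper and the attribute values are hypothetical; the actual evaluation happens inside Azure AD provisioning.

```python
def join_attributes(separator, *values):
    """Mimic the effect of Join("-", [department], [jobTitle]) for plain string values."""
    return separator.join(values)


# Hypothetical attribute values for one user
department = "Marketing"
job_title = "Specialist"

title = join_attributes("-", department, job_title)
print(title)  # Marketing-Specialist
```

With these values, the title attribute in IAM Identity Center, and therefore the desired QuickSight group name, would be Marketing-Specialist.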

The attribute is automatically updated in IAM Identity Center. The updated user profile looks like the following screenshots (Azure AD on the left, IAM Identity Center on the right).

updated user profile
Job related information

Create a Lambda function

Now we create a Lambda function to update QuickSight group membership upon the SCIM event. The core part of the function is to obtain the user’s title attribute value in IAM Identity Center based on the triggered event information, and then to ensure that the user exists in QuickSight. If the group name doesn’t exist yet, it creates the group in QuickSight and then adds the user into the group. Complete the following steps:

  1. On the Lambda console, choose Create function.
  2. For Name, enter UpdateQuickSightUserUponSCIMEvent.
  3. For Runtime, choose Python 3.9.
  4. For Timeout, enter 15 seconds.
  5. For Permissions, create and attach an IAM role that includes the following permissions (the trusted entity (principal) should be lambda.amazonaws.com):
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "MinimalPrivForScimQsBlog",
                "Effect": "Allow",
                "Action": [
                    "identitystore:DescribeUser",
                    "quicksight:RegisterUser",
                    "quicksight:DescribeUser",
                    "quicksight:CreateGroup",
                    "quicksight:DeleteGroup",
                    "quicksight:DescribeGroup",
                    "quicksight:ListUserGroups",
                    "quicksight:CreateGroupMembership",
                    "quicksight:DeleteGroupMembership",
                    "quicksight:DescribeGroupMembership",
                    "logs:CreateLogGroup",
                    "logs:CreateLogStream",
                    "logs:PutLogEvents"
                ],
                "Resource": "*"
            }
        ]
    }

  6. Write Python code using the Boto3 SDK for IdentityStore and QuickSight. The following is the entire sample Python code:
import sys
import boto3
import json
import logging
from datetime import datetime

# Set logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)

def lambda_handler(event, context):
  '''
  Modify QuickSight group membership upon SCIM event from IAM Identity Center originated from Azure AD.
  It works in this way:
    Azure AD -> SCIM -> Identity Center -> CloudTrail -> EventBridge -> Lambda -> QuickSight
  Note that this is a straightforward sample to show how to update QuickSight group membership upon certain SCIM event.
  For example, it assumes a 1:1 user-to-group assignment, only one (combined) SAML attribute, and so on.
  For production, take customer requirements into account and develop your own code.
  '''

  # Setting variables (hard-coded. get dynamically for production code)
  qs_namespace_name = 'default'
  qs_iam_role = 'qs-reader-azure'

  # Obtain account ID and region
  account_id = boto3.client('sts').get_caller_identity()['Account']
  region = boto3.session.Session().region_name

  # Setup clients
  qs = boto3.client('quicksight')
  iic = boto3.client('identitystore')

  # Check boto3 version
  logger.debug(f"## Your boto3 version: {boto3.__version__}")

  # Get user info from event data
  event_json = json.dumps(event)
  logger.debug(f"## Event: {event_json}")
  iic_store_id = event['detail']['requestParameters']['identityStoreId']
  iic_user_id = event['detail']['requestParameters']['userId']  # For UpdateUser event, userId is provided through requestParameters
  logger.info("## Getting user info from Identity Store.")
  try:
    res_iic_describe_user = iic.describe_user(
      IdentityStoreId = iic_store_id,
      UserId = iic_user_id
    )
  except Exception as e:
    logger.error("## Operation failed due to unknown error. Exiting.")
    logger.error(e)
    sys.exit()
  else:
    logger.info(f"## User info retrieval succeeded.")
    azure_user_attribute_title = res_iic_describe_user['Title']
    azure_user_attribute_userprincipalname = res_iic_describe_user['UserName']
    qs_user_name = qs_iam_role + "/" + azure_user_attribute_userprincipalname
    logger.info(f"#### Identity Center user name: {azure_user_attribute_userprincipalname}")
    logger.info(f"#### QuickSight group name desired: {azure_user_attribute_title}")
    logger.debug(f"#### res_iic_describe_user: {json.dumps(res_iic_describe_user)}, which is {type(res_iic_describe_user)}")

  # Exit if user is not present since this function is supposed to be called by UpdateUser event
  try:
    # Get QuickSight user name
    res_qs_describe_user = qs.describe_user(
      UserName = qs_user_name,
      AwsAccountId = account_id,
      Namespace = qs_namespace_name
    )
  except qs.exceptions.ResourceNotFoundException as e:
    logger.error(f"## User {qs_user_name} is not found in QuickSight.")
    logger.error(f"## Make sure the QuickSight user has been created in advance. Exiting.")
    logger.error(e)
    sys.exit()
  except Exception as e:
    logger.error("## Operation failed due to unknown error. Exiting.")
    logger.error(e)
    sys.exit()
  else:
    logger.info(f"## User {qs_user_name} is found in QuickSight.")

  # Remove current membership unless it's the desired one
  qs_new_group = azure_user_attribute_title  # Set "Title" SAML attribute as the desired QuickSight group name
  in_desired_group = False  # Set this flag True when the user is already a member of the desired group
  logger.info(f"## Starting group membership removal.")
  try:
    res_qs_list_user_groups = qs.list_user_groups(
      UserName = qs_user_name,
      AwsAccountId = account_id,
      Namespace = qs_namespace_name
    )
  except Exception as e:
    logger.error("## Operation failed due to unknown error. Exiting.")
    logger.error(e)
    sys.exit()
  else:
    # Skip if the array is empty (user is not member of any groups)
    if not res_qs_list_user_groups['GroupList']:
      logger.info(f"## User {qs_user_name} is not a member of any QuickSight group. Skipping removal.")
    else:
      for grp in res_qs_list_user_groups['GroupList']:
        qs_current_group = grp['GroupName']
        # Retain membership if the new and existing group names match
        if qs_current_group == qs_new_group:
          logger.info(f"## The user {qs_user_name} already belongs to the desired group. Skipping removal.")
          in_desired_group = True
        else:
          # Remove all unnecessary memberships
          logger.info(f"## Removing user {qs_user_name} from existing group {qs_current_group}.")
          try:
            res_qs_delete_group_membership = qs.delete_group_membership(
              MemberName = qs_user_name,
              GroupName = qs_current_group,
              AwsAccountId = account_id,
              Namespace = qs_namespace_name
            )
          except Exception as e:
            logger.error(f"## Operation failed due to unknown error. Exiting.")
            logger.error(e)
            sys.exit()
          else:
            logger.info(f"## The user {qs_user_name} has been removed from {qs_current_group}.")

  # Create group membership based on IIC attribute "Title"
  logger.info(f"## Starting group membership assignment.")
  if in_desired_group is True:
    logger.info(f"## The user already belongs to the desired group. Skipping assignment.")
  else:
    try:
      logger.info(f"## Checking if the desired group exists.")
      res_qs_describe_group = qs.describe_group(
        GroupName = qs_new_group,
        AwsAccountId = account_id,
        Namespace = qs_namespace_name
      )
    except qs.exceptions.ResourceNotFoundException as e:
      # Create a QuickSight group if not present
      logger.info(f"## Group {qs_new_group} is not present. Creating.")
      today = datetime.now()
      res_qs_create_group = qs.create_group(
        GroupName = qs_new_group,
        Description = 'Automatically created at ' + today.strftime('%Y.%m.%d %H:%M:%S'),
        AwsAccountId = account_id,
        Namespace = qs_namespace_name
      )
    except Exception as e:
      logger.error(f"## Operation failed due to unknown error. Exiting.")
      logger.error(e)
      sys.exit()
    else:
      logger.info(f"## Group {qs_new_group} is found in QuickSight.")

    # Add the user to the desired group
    logger.info("## Modifying group membership based on its latest attributes.")
    logger.info(f"#### QuickSight user name: {qs_user_name}")
    logger.info(f"#### QuickSight group name: {qs_new_group}")
    try:
      res_qs_create_group_membership = qs.create_group_membership(
        MemberName = qs_user_name,
        GroupName = qs_new_group,
        AwsAccountId = account_id,
        Namespace = qs_namespace_name
      )
    except Exception as e:
      logger.error("## Operation failed due to unknown error. Exiting.")
      logger.error(e)
      sys.exit()  # Exit here as well so a failed assignment is not reported as success
    else:
      logger.info("## Group membership modification succeeded.")
      qs_group_member_name = res_qs_create_group_membership['GroupMember']['MemberName']
      qs_group_member_arn = res_qs_create_group_membership['GroupMember']['Arn']
      logger.debug("## QuickSight group info:")
      logger.debug(f"#### qs_user_name: {qs_user_name}")
      logger.debug(f"#### qs_group_name: {qs_new_group}")
      logger.debug(f"#### qs_group_member_name: {qs_group_member_name}")
      logger.debug(f"#### qs_group_member_arn: {qs_group_member_arn}")
      logger.debug("## IIC info:")
      logger.debug(f"#### IIC user name: {azure_user_attribute_userprincipalname}")
      logger.debug(f"#### IIC user id: {iic_user_id}")
      logger.debug(f"#### Title: {azure_user_attribute_title}")
      logger.info(f"## User {qs_user_name} has been successfully added to the group {qs_new_group} in {qs_namespace_name} namespace.")
  
  # return response
  return {
    "namespaceName": qs_namespace_name,
    "userName": qs_user_name,
    "groupName": qs_new_group
  }

Note that this Lambda function requires Boto3 1.24.64 or later. If the Boto3 included in the Lambda runtime is older than this, use a Lambda layer to use the latest version of Boto3. For more details, refer to How do I resolve “unknown service”, “parameter validation failed”, and “object has no attribute” errors from a Python (Boto 3) Lambda function.
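
To try the function from the Lambda console before wiring up EventBridge, a test event only needs the fields the handler actually reads. The following sketch builds such an event and shows how the handler derives its inputs from it. All identifiers are placeholders, and the two helper functions are hypothetical names that mirror the corresponding lines of the handler.

```python
import json

# Placeholder identifiers; replace them with values from your own environment.
test_event = {
    "source": "aws.sso-directory",
    "detail-type": "AWS API Call via CloudTrail",
    "detail": {
        "eventSource": "sso-directory.amazonaws.com",
        "eventName": "UpdateUser",
        "requestParameters": {
            "identityStoreId": "d-xxxxxxxxxx",
            "userId": "00000000-0000-0000-0000-000000000000",
        },
    },
}


def extract_user_reference(event):
    """Read the identity store ID and user ID the same way the handler does."""
    params = event["detail"]["requestParameters"]
    return params["identityStoreId"], params["userId"]


def quicksight_user_name(iam_role, user_name):
    """Build the QuickSight user name for an IAM-federated user: <role>/<user name>."""
    return f"{iam_role}/{user_name}"


store_id, user_id = extract_user_reference(test_event)
print(json.dumps(test_event, indent=2))
print(store_id, user_id)
print(quicksight_user_name("qs-reader-azure", "user@example.com"))
```

The printed JSON can be pasted into a Lambda test event. The handler still calls IAM Identity Center and QuickSight, so the placeholder IDs must be replaced with real ones for the invocation to succeed.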

Add an EventBridge rule to trigger the event

To create an EventBridge rule to invoke the previously created Lambda function, complete the following steps:

  1. On the EventBridge console, create a new rule.
  2. For Name, enter updateQuickSightUponSCIMEvent.
  3. For Event pattern, enter the following code:
    {
      "source": ["aws.sso-directory"],
      "detail-type": ["AWS API Call via CloudTrail"],
      "detail": {
        "eventSource": ["sso-directory.amazonaws.com"],
        "eventName": ["UpdateUser"]
      }
    }

  4. For Targets, choose the Lambda function you created (UpdateQuickSightUserUponSCIMEvent).
  5. Enable the rule.
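
If you prefer to script these steps, the following is a minimal Boto3 sketch of the same configuration. It assumes that the Lambda function already exists and that you pass in its ARN. The statement ID and target ID are arbitrary hypothetical values. Unlike the console, the API doesn't grant EventBridge permission to invoke the function automatically, so the sketch adds that permission explicitly.

```python
import json

RULE_NAME = "updateQuickSightUponSCIMEvent"
FUNCTION_NAME = "UpdateQuickSightUserUponSCIMEvent"

EVENT_PATTERN = {
    "source": ["aws.sso-directory"],
    "detail-type": ["AWS API Call via CloudTrail"],
    "detail": {
        "eventSource": ["sso-directory.amazonaws.com"],
        "eventName": ["UpdateUser"],
    },
}


def build_rule_request():
    """Build the put_rule arguments; the event pattern must be passed as a JSON string."""
    return {
        "Name": RULE_NAME,
        "EventPattern": json.dumps(EVENT_PATTERN),
        "State": "ENABLED",
    }


def create_rule(function_arn):
    """Create the rule, attach the Lambda target, and allow EventBridge to invoke it."""
    import boto3  # imported here so the request can be inspected without AWS access

    events = boto3.client("events")
    lambda_client = boto3.client("lambda")

    rule_arn = events.put_rule(**build_rule_request())["RuleArn"]
    events.put_targets(
        Rule=RULE_NAME,
        Targets=[{"Id": "1", "Arn": function_arn}],  # "1" is an arbitrary target ID
    )
    lambda_client.add_permission(
        FunctionName=FUNCTION_NAME,
        StatementId="AllowEventBridgeInvoke",  # hypothetical statement ID
        Action="lambda:InvokeFunction",
        Principal="events.amazonaws.com",
        SourceArn=rule_arn,
    )
    return rule_arn


print(build_rule_request()["EventPattern"])
```

Calling `create_rule` with the function ARN performs the three API calls. Running the script as is only prints the serialized event pattern.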

Verify the configuration by changing a user attribute value at Azure AD

Let’s modify a user’s attribute at Azure AD, and then check if the new group is created and that the user is added into the new one.

  1. Go back to the Azure AD console.
  2. From Manage, choose Users.
  3. Choose one of the users you previously used to log in to QuickSight from the IAM Identity Center portal.
  4. Choose Edit properties, then edit the values for Job title and Department.
  5. Save the configuration.
  6. From Manage, choose Enterprise applications, your application name, and Provisioning.
  7. Choose Stop provisioning and then Start provisioning in sequence.

In Azure AD, the SCIM provisioning interval is fixed at 40 minutes. To get immediate results, we manually stop and start the provisioning.

Provisioning status

  8. Navigate to the QuickSight console.
  9. On the drop-down user name menu, choose Manage QuickSight.
  10. Choose Manage groups.

Now you should find that the new group is created and the user is assigned to this group.

Clean up

When you’re finished with the solution, clean up your environment to minimize cost impact. You may want to delete the following resources:

  • Lambda function
  • Lambda layer
  • IAM role for the Lambda function
  • CloudWatch log group for the Lambda function
  • EventBridge rule
  • QuickSight account
    • Note: There can only be one QuickSight account per AWS account, so your QuickSight account might already be used by other users in your organization. Delete the QuickSight account only if you explicitly set it up to follow this post and are absolutely sure that it is not being used by any other users.
  • IAM Identity Center instance
  • IAM ID Provider configuration for Azure AD
  • Azure AD instance

Summary

This post provided step-by-step instructions to configure IAM Identity Center SCIM provisioning and SAML 2.0 federation from Azure AD for centralized management of QuickSight users. We also demonstrated automated group membership updates in QuickSight based on user attributes in Azure AD, by using SCIM events generated in IAM Identity Center and setting up automation with EventBridge and Lambda.

With this event-driven approach to provisioning users and groups in QuickSight, system administrators have the flexibility to accommodate the different ways of managing users that an organization may require. It also ensures the consistency of users and groups between QuickSight and Azure AD whenever a user accesses QuickSight.

We are looking forward to hearing any questions or feedback.


About the authors

Takeshi Nakatani is a Principal Big Data Consultant on the Professional Services team in Tokyo. He has 25 years of experience in the IT industry, with expertise in architecting data infrastructure. On his days off, he can be a rock drummer or a motorcyclist.

Wakana Vilquin-Sakashita is a Specialist Solutions Architect for Amazon QuickSight. She works closely with customers to help them make sense of their data through visualization. Previously, Wakana worked for S&P Global, assisting customers in accessing data, insights, and research relevant to their business.

AWS Week in Review – March 20, 2023

Post Syndicated from Danilo Poccia original https://aws.amazon.com/blogs/aws/aws-week-in-review-march-20-2023/

This post is part of our Week in Review series. Check back each week for a quick roundup of interesting news and announcements from AWS!

A new week starts, and spring is almost here! If you’re curious about AWS news from the previous seven days, I’ve got you covered.

Last Week’s Launches
Here are the launches that got my attention last week:

Amazon S3 – Last week there was AWS Pi Day 2023 celebrating 17 years of innovation since Amazon S3 was introduced on March 14, 2006. For the occasion, the team released many new capabilities.

Amazon Linux 2023 – Our new Linux-based operating system is now generally available. Sébastien’s post is full of tips and info.

Application Auto Scaling – You can now use arithmetic operations and mathematical functions to customize the metrics used with Target Tracking policies. You can use it to scale based on your own application-specific metrics. Read how it works with Amazon ECS services.

AWS Data Exchange for Amazon S3 is now generally available – You can now share and find data files directly from S3 buckets, without the need to create or manage copies of the data.

Amazon Neptune – Now offers a graph summary API to help understand important metadata about property graphs (PG) and resource description framework (RDF) graphs. Neptune added support for Slow Query Logs to help identify queries that need performance tuning.

Amazon OpenSearch Service – The team introduced security analytics that provides new threat monitoring, detection, and alerting features. The service now supports OpenSearch version 2.5 that adds several new features such as support for Point in Time Search and improvements to observability and geospatial functionality.

AWS Lake Formation and Apache Hive on Amazon EMR – Introduced fine-grained access controls that allow data administrators to define and enforce fine-grained table and column level security for customers accessing data via Apache Hive running on Amazon EMR.

Amazon EC2 M1 Mac Instances – You can now update guest environments to a specific or the latest macOS version without having to tear down and recreate the existing macOS environments.

AWS Chatbot – Now integrates with Microsoft Teams to simplify the way you troubleshoot and operate your AWS resources.

Amazon GuardDuty RDS Protection for Amazon Aurora – Now generally available to help profile and monitor access activity to Aurora databases in your AWS account without impacting database performance.

AWS Database Migration Service – Now supports validation to ensure that data is migrated accurately to S3 and can now generate an AWS Glue Data Catalog when migrating to S3.

AWS Backup – You can now back up and restore virtual machines running on VMware vSphere 8 and with multiple vNICs.

Amazon Kendra – There are new connectors to index documents and search for information across these new content sources: Confluence Server, Confluence Cloud, Microsoft SharePoint OnPrem, and Microsoft SharePoint Cloud. This post shows how to use the Amazon Kendra connector for Microsoft Teams.

For a full list of AWS announcements, be sure to keep an eye on the What’s New at AWS page.

Other AWS News
A few more blog posts you might have missed:

Women founders Q&A – We’re talking to six women founders and leaders about how they’re making impacts in their communities, industries, and beyond.

What you missed at the 2023 IMAGINE: Nonprofit conference – Where hundreds of nonprofit leaders, technologists, and innovators gathered to learn and share how AWS can drive a positive impact for people and the planet.

Monitoring load balancers using Amazon CloudWatch anomaly detection alarms – The metrics emitted by load balancers provide crucial and unique insight into service health, service performance, and end-to-end network performance.

Extend geospatial queries in Amazon Athena with user-defined functions (UDFs) and AWS Lambda – Using a solution based on Uber’s Hexagonal Hierarchical Spatial Index (H3) to divide the globe into equally-sized hexagons.

How cities can use transport data to reduce pollution and increase safety – A guest post by Rikesh Shah, outgoing head of open innovation at Transport for London.

For AWS open-source news and updates, here’s the latest newsletter curated by Ricardo to bring you the most recent updates on open-source projects, posts, events, and more.

Upcoming AWS Events
Here are some opportunities to meet:

AWS Public Sector Day 2023 (March 21, London, UK) – An event dedicated to helping public sector organizations use technology to achieve more with less through the current challenging conditions.

Women in Tech at Skills Center Arlington (March 23, VA, USA) – Let’s celebrate the history and legacy of women in tech.

The AWS Summits season is warming up! You can sign up here to know when registration opens in your area.

That’s all from me for this week. Come back next Monday for another Week in Review!

Danilo

Accelerating revenue growth with real-time analytics: Poshmark’s journey

Post Syndicated from Mahesh Pasupuleti original https://aws.amazon.com/blogs/big-data/accelerating-revenue-growth-with-real-time-analytics-poshmarks-journey/

This post was co-written by Mahesh Pasupuleti and Gaurav Shah from Poshmark.

Poshmark is a leading social marketplace for new and secondhand styles for women, men, kids, pets, home, and more. By combining the human connection of physical shopping with the scale, ease, and selection benefits of Ecommerce, Poshmark makes buying and selling simple, social, and sustainable. Its community of more than 80 million registered users across the US, Canada, Australia, and India is driving a more sustainable future for the fashion industry.

An important goal to achieve for any organization is to grow the top line revenue. Top line revenue refers to the total value of sales of an organization’s services or products. The two main approaches organizations employ to increase revenue are to expand geographically to enter new markets and to increase market share within a market by improving customer experience (CX).

Improving CX is a well-known guideline to attract and retain customers and thereby increase the market share. In this post, we share how Poshmark improved CX and accelerated revenue growth by using a real-time analytics solution. We discuss how to create such a solution using Amazon Kinesis Data Streams, Amazon Managed Streaming for Apache Kafka (Amazon MSK), and Amazon Kinesis Data Analytics for Apache Flink; the design decisions that went into the architecture; and the business benefits observed by Poshmark.

High-level challenge: The need for real-time analytics

Previous efforts at Poshmark for improving CX through analytics were based on batch processing of analytics data and using it on a daily basis to improve CX. Although these batch analytics-based efforts were successful to some extent, they saw opportunities to improve the customer experience with real-time personalization and security guidance during the customer’s interaction with the Poshmark app. The customer insights gathered from the batch analytics couldn’t be paired with the current customer activities in real time due to the latencies involved in enriching the current activities with the knowledge gained through batch processes. Therefore, the opportunity to provide tailored offers or showcase products based on customers’ preference and behaviors in near-real time, which contributes to a much better customer experience, was missing. Similarly, the opportunity to catch fraud within a second, before checkout, was also missing.

To improve the customer experience, Poshmark decided to invest in building a real-time analytics platform to enable real-time capabilities, as explained further in this post. Poshmark engineers worked closely with AWS architects through the AWS Data Lab program. The AWS Data Lab offers accelerated, joint engineering engagements between customers and AWS technical resources to create tangible deliverables that accelerate data and analytics modernization initiatives. The Design Lab is a one-half to two-day engagement with the customer team that offers prescriptive guidance to arrive at the optimal solution architecture design before you embark on building the platform.

Designing the solution architecture through the AWS Data Lab process

The business and technical stakeholders from Poshmark and the AWS Data Lab architects discussed near-to-long-term business requirements along with the functional and non-functional capabilities required to decide on the architecture approach. They reviewed the current state architecture and constraints to understand data flow and technical integration points. The joint team discussed the pros and cons of various AWS services that already exist in Poshmark’s current architecture, as well as other AWS services that can meet the requirements.

Poshmark wanted to address the following business use cases via the real-time analytics platform:

  • Sessionization – Poshmark captures both server-side application events and client-side tracking events. They wanted to use these events to identify and analyze user sessions to track behavior.
  • Illegitimate sign-up and sign-in prevention – Poshmark wanted to detect and ban illegitimate sign-up or sign-in events from bots or non-human traffic in real time on the Poshmark application.
  • IP translation – The IP addresses present in events will be translated to city, state, and zip, and enriched with other information to implement near-real-time, location-aware services encompassing security-related functions as well as personalization functions.
  • Anonymization – Poshmark wanted to anonymize events and make the data available for internal users for querying in near-real time.
  • Personalized recommendations – User behavior based on clickstream events can be captured up to the last second before enriching it for personalization and sending it to the model to predict the recommendations.
  • Other use cases – Additional use cases relating to aggregations and machine learning (ML) inference use cases such as authorization to operate, listing spam detection, and avoiding account takeovers (ATOs), among others.

One common pattern identified for these use cases was the need for a central data enrichment pipeline to enrich incoming raw events before event data can be utilized for actual business processing. In the Design Lab, we focused on design for data enrichment pipelines aimed at enriching events with data from static files, dynamic data stores such as databases, APIs, or within the same event stream for the aforementioned streaming use cases. Later in this post, we cover the salient points discussed during the lab around design and architecture.

Batch analytics solution architecture

The following diagram shows the previous architecture at Poshmark. For brevity, only the flow pertaining to the real-time analytics platform is explained.

User interactions on Poshmark web and mobile applications generate server-side events. These events include add to cart, orders, transactions, and more on application servers, and page views, clicks, and more on tracking servers. Fluentd with an Amazon Kinesis plugin is set up on both the application and tracking servers to send these events to Amazon Kinesis Data Streams. The Fluentd Kinesis plugin aggregates events before sending them to Kinesis Data Streams. A single Kinesis data stream is currently set up to capture these events. A random partition key is configured in Fluentd for the events to allow even distribution of events across shards. The event data format is nested JSON. Poshmark maintains the same schema grammar at the first level of JSON for both server-side and client-side events. The attributes at the nested level can differ between server-side and client-side events.
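
The effect of a random partition key can be sketched in a few lines of Python. Kinesis Data Streams maps each partition key to a 128-bit integer with an MD5 hash and routes the record to the shard whose hash key range contains that value. The sketch below assumes a hypothetical stream of four shards that split the hash key space evenly, and uses random UUIDs as stand-ins for the keys Fluentd would generate.

```python
import hashlib
import uuid
from collections import Counter

NUM_SHARDS = 4          # hypothetical shard count
HASH_SPACE = 2 ** 128   # MD5 produces a 128-bit value


def shard_for(partition_key, num_shards=NUM_SHARDS):
    """Map a partition key to a shard index, assuming evenly split hash key ranges."""
    hashed = int(hashlib.md5(partition_key.encode("utf-8")).hexdigest(), 16)
    return hashed * num_shards // HASH_SPACE


# Random keys, one per event
counts = Counter(shard_for(str(uuid.uuid4())) for _ in range(10_000))
for shard in sorted(counts):
    print(f"shard {shard}: {counts[shard]} events")
```

Each shard ends up with roughly a quarter of the events. A fixed or low-cardinality key would instead send most of the traffic to a few shards.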

Poshmark receives around 1 billion events per day (100 million per hour during peak hours, 10 million per hour during non-peak hours). The average size of the event record is 1.2 KB.
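
As a back-of-the-envelope check, these figures translate into the following per-second rates and data volumes. The sketch assumes decimal units (1 KB = 1,000 bytes) and treats every event as average-sized. The results are estimates derived from the stated numbers, not measurements from Poshmark.

```python
EVENTS_PER_DAY = 1_000_000_000
PEAK_EVENTS_PER_HOUR = 100_000_000
OFF_PEAK_EVENTS_PER_HOUR = 10_000_000
AVG_EVENT_SIZE_KB = 1.2  # assuming 1 KB = 1,000 bytes

SECONDS_PER_HOUR = 3600

peak_events_per_sec = PEAK_EVENTS_PER_HOUR / SECONDS_PER_HOUR
off_peak_events_per_sec = OFF_PEAK_EVENTS_PER_HOUR / SECONDS_PER_HOUR

# KB/s -> MB/s
peak_mb_per_sec = peak_events_per_sec * AVG_EVENT_SIZE_KB / 1000
off_peak_mb_per_sec = off_peak_events_per_sec * AVG_EVENT_SIZE_KB / 1000

# KB/day -> TB/day
daily_volume_tb = EVENTS_PER_DAY * AVG_EVENT_SIZE_KB / 1_000_000_000

print(f"Peak:     {peak_events_per_sec:,.0f} events/s, {peak_mb_per_sec:.1f} MB/s")
print(f"Off-peak: {off_peak_events_per_sec:,.0f} events/s, {off_peak_mb_per_sec:.1f} MB/s")
print(f"Daily volume: {daily_volume_tb:.1f} TB")
```

Under these assumptions, the peak rate is about 27,778 events (roughly 33 MB) per second, and the daily volume is about 1.2 TB.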

The data from the Kinesis data stream is consumed by two applications:

  • A Spark streaming application on Amazon EMR is used to write data from the Kinesis data stream to a data lake hosted on Amazon Simple Storage Service (Amazon S3) in a partitioned way. The data from the S3 data lake is used for batch processing and analytics through Amazon EMR and Amazon Redshift.
  • Druid hosted on Amazon Elastic Compute Cloud (Amazon EC2) integrates with the Kinesis data stream for streaming ingestion and allows users to run slice-and-dice OLAP queries. Operational dashboards are hosted on Grafana integrated with Druid.

Desired enhancements to the initial solution

The use cases discussed during the architecture sessions fall into one or more combinations of the following stream processing requirements:

  • Stateless event processing – For example, near-real-time anonymization.
  • External lookup – Looking up a value from external stores. For example, IP address, city, zip, state, or ID.
  • Stateful data processing – Accessing past events or aggregations or ML inferences.

To meet these requirements, the streaming platform is divided into two layers:

  • Central data enrichment – This layer runs enrichments commonly required by downstream streaming applications. This will help avoid replication of the same enrichment logic in each application and enable better operational maintenance. The enrichment should strive for per-record processing in most cases.
  • Specific streaming applications – This layer will house specific streaming applications with respect to use cases and utilize enriched data from the central data enrichment pipeline.

For central data enrichment, we made the following enhancements to the platform:

  • The total latency, including ingestion and data enrichment, was critical and needed to stay in the double-digit millisecond range, based on Poshmark’s overall latency budget for real-time ML responses to events. Kafka achieved the lowest ingestion latency, so the team decided to go with the managed version of Kafka, Amazon MSK.
  • Similarly, low-latency processing of data was also required, so the processing framework had to be chosen accordingly.
  • Exactly-once delivery guarantees were required to avoid data duplication resulting in wrong calculations.
  • The enrichment source could be any source, such as static files, databases, and APIs, and latencies can vary between them. A number of server-side and client-side events are generated when a user interacts with a Poshmark application. As a result, the same information from the enrichment source is required to enrich each event. Caching this frequently accessed information in a centralized cache optimizes fetch time.
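
The caching idea in the last point can be illustrated with a small Python sketch. It is not Poshmark's implementation: an in-memory dictionary stands in for the centralized cache, and `geo_lookup`, the lookup table, and the IP addresses (taken from documentation ranges) are hypothetical.

```python
class CachedEnricher:
    """Enrich events with location data, calling the backing store once per distinct IP."""

    def __init__(self, lookup):
        self._lookup = lookup
        self._cache = {}        # stand-in for a centralized cache
        self.backend_calls = 0  # how often the enrichment source was actually queried

    def enrich(self, event):
        ip = event["ip"]
        if ip not in self._cache:
            self.backend_calls += 1
            self._cache[ip] = self._lookup(ip)
        return {**event, "location": self._cache[ip]}


# Hypothetical enrichment source
GEO_TABLE = {
    "203.0.113.7": {"city": "Example City", "state": "EX", "zip": "00000"},
    "198.51.100.9": {"city": "Sample Town", "state": "SA", "zip": "11111"},
}


def geo_lookup(ip):
    return GEO_TABLE.get(ip)


events = [
    {"event": "page_view", "ip": "203.0.113.7"},
    {"event": "click", "ip": "203.0.113.7"},
    {"event": "add_to_cart", "ip": "203.0.113.7"},
    {"event": "page_view", "ip": "198.51.100.9"},
    {"event": "order", "ip": "203.0.113.7"},
]

enricher = CachedEnricher(geo_lookup)
enriched = [enricher.enrich(e) for e in events]
print(f"{len(enriched)} events enriched with {enricher.backend_calls} backend calls")
```

Here, five events from two distinct IP addresses require only two calls to the enrichment source. The remaining events are served from the cache.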

Design decisions for the new solution

Poshmark made the following design decisions for central data enrichment:

  • Kafka can support double-digit millisecond latency from producer to consumer with appropriate performance tuning. Kafka can provide exactly-once semantics at both producer and consumer applications. AWS provides Kafka as part of its Amazon MSK offering, eliminating the operational overhead of maintaining and running Kafka cluster infrastructure on AWS, thereby allowing you to focus on developing and running Kafka-based applications. Poshmark decided to use Amazon MSK for their streaming ingestion and storage requirements.
  • We also decided to use Flink for streaming data enrichment applications for the following reasons:
    • Flink can provide low-latency processing even at higher throughput with exactly-once guarantees. Spark Structured Streaming, on the other hand, can provide low latency with low throughput because of its microbatch-based processing. Spark Structured Streaming continuous processing is an experimental feature and provides at-least-once guarantees.
    • If an enrichment request to an external store is modeled in a map function (Spark’s map API or Flink’s MapFunction API), it makes synchronous calls to the external store. Each call waits for a response from the external store before the next event is processed, which adds delays and reduces overall throughput. Asynchronous interaction allows requests to be sent and responses to be received concurrently, which reduces wait time and improves overall throughput. Flink supports async I/O operators natively, allowing users to use asynchronous request clients with data streams. The API handles the integration with data streams, as well as handling order, event time, fault tolerance, and more. Spark Structured Streaming doesn’t provide any such support natively and leaves it to users for custom implementation.
    • Poshmark selected Kinesis Data Analytics for Apache Flink to run the data enrichment application. Kinesis Data Analytics for Apache Flink provides the underlying infrastructure for your Apache Flink applications. It handles core capabilities like provisioning compute resources, parallel computation, automatic scaling, and application backups (implemented as checkpoints and snapshots).
  • An enrichment microservice accompanying Amazon ElastiCache for Redis was set up to abstract access from data enrichment applications. The AsyncFunction in the Flink async I/O operator isn’t multi-threaded and won’t work in a truly asynchronous way if the call is blocked or waiting for a response. The enrichment microservice handles requests and responses asynchronously coming from Flink async I/O operators. The data is also cached in ElastiCache for Redis to improve the latency of the microservice.
  • The Poshmark ML applications are the consumers of this enriched data. The team has built and deployed different ML models over time. These models include a learning to rank algorithm, fraud detection, personalization and recommendations, and online spam filtering. Previously, for deploying each model into production, the Poshmark team had to go through a series of infrastructure setup steps that involved data extraction from real-time sources, building real-time aggregate features from streaming data, storing these features in a low-latency database (Redis) for sub-millisecond inferences, and finally performing inferences via Amazon SageMaker hosted endpoints.
  • We also designed an ML feature storage pipeline that consumes data from the enriched streaming sources (Kinesis or Kafka), generates single-level and aggregated-level features, and ingests these generated features into a feature store repository with a very low latency of less than 80 milliseconds.
  • The ML models are now able to extract the needed features with latency less than 10 milliseconds from the feature repository and perform real-time model inferencing.
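
The difference between map-style synchronous lookups and async I/O in the design decisions above can be sketched in plain Python. This is only an illustration with asyncio, not Flink's async I/O API; the lookup function, its 50 ms delay, and the capacity parameter are assumptions made up for the example.

```python
import asyncio
import time

LOOKUP_DELAY_S = 0.05  # hypothetical latency of one call to the external store


async def lookup(user_id: str) -> dict:
    """Stand-in for a non-blocking request to an enrichment store."""
    await asyncio.sleep(LOOKUP_DELAY_S)
    return {"user_id": user_id, "segment": f"segment-{len(user_id)}"}


async def enrich_sequentially(events: list[dict]) -> list[dict]:
    """Map-style enrichment: each event waits for the previous lookup."""
    enriched = []
    for event in events:
        enriched.append({**event, **await lookup(event["user_id"])})
    return enriched


async def enrich_concurrently(events: list[dict], capacity: int = 10) -> list[dict]:
    """Async-I/O-style enrichment: up to `capacity` lookups in flight at once."""
    semaphore = asyncio.Semaphore(capacity)

    async def enrich_one(event: dict) -> dict:
        async with semaphore:
            return {**event, **await lookup(event["user_id"])}

    # gather() returns results in input order, so event order is preserved.
    return await asyncio.gather(*(enrich_one(e) for e in events))


def timed(coro) -> tuple[list[dict], float]:
    """Run a coroutine to completion and report how long it took."""
    start = time.perf_counter()
    result = asyncio.run(coro)
    return result, time.perf_counter() - start


events = [{"event_id": i, "user_id": f"u{i}"} for i in range(10)]
sequential, sequential_s = timed(enrich_sequentially(events))
concurrent, concurrent_s = timed(enrich_concurrently(events))
print(f"sequential: {sequential_s:.2f}s, concurrent: {concurrent_s:.2f}s")
```

Both variants produce the same enriched events in the same order; the concurrent one simply overlaps the waiting time, which is the throughput gain the async I/O operator is chosen for.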

Real-time analytics solution architecture

The following diagram illustrates the solution architecture for real-time analytics with Amazon MSK and Kinesis Data Analytics for Apache Flink.

The workflow is as follows:

  1. Users interact on Poshmark’s web or mobile application.
  2. Server-side events are captured on application servers and client-side events are captured on tracking servers. These events are written in the downstream MSK cluster.
  3. The raw events will be ingested into the MSK cluster using the Fluentd plugin to produce data for Kafka.
  4. The enrichment microservice consists of reactive (asynchronous) enrichment lookup APIs fetching data from persistent data stores. ElastiCache for Redis caches frequently accessed data, reducing fetch time for enrichment lookup APIs.
  5. The Flink application running on Kinesis Data Analytics for Apache Flink consumes raw events from Amazon MSK and runs data enrichment on a per-record basis. The Flink data enrichment application uses Flink’s async I/O to read external data from the enrichment lookup store for enriching stream events.
  6. Enriched events are written in the MSK cluster under different enriched events topics.
  7. The existing Spark streaming application consumes from the enriched events topic (or raw events topic) in Amazon MSK and writes the data into an S3 data lake.
  8. Druid streaming ingestion now reads from the enriched events topic or raw events topic in Amazon MSK depending on the requirements.

Enrichment of the captured event data

In this section, we discuss the different steps to enrich the captured event data.

Enrichment processing

Kinesis Data Analytics for Apache Flink provides the underlying infrastructure for the Apache Flink applications. It handles core capabilities like provisioning compute resources, parallel computation, automatic scaling, and application backups (implemented as checkpoints and snapshots). You can use the high-level Flink programming features (such as operators, functions, sources, and sinks) in the same way that you use them when hosting the Flink infrastructure yourself.

Flink on Amazon EMR gives the flexibility to choose your Flink version, installation, configuration, instances, and storage. However, you also have to take care of cluster management and operational requirements such as scaling, application backup, and provisioning.

Enrichment lookup store

The AsyncFunction in the Flink async I/O operator isn’t multi-threaded and won’t work in a truly asynchronous way if the call is blocked or waiting for a response. The enrichment lookup API should handle requests and responses asynchronously coming from Flink async I/O operators. The enrichment lookup API can be hosted on Amazon EC2 or containers such as Amazon Elastic Container Service (Amazon ECS) or Amazon Elastic Kubernetes Service (Amazon EKS).

A number of server-side and client-side events are generated when a user interacts with a Poshmark application. As a result, the same information is required to enrich each event. Caching this frequently accessed information in a centralized cache can optimize fetch time. The latency to the centralized cache can be further reduced by hosting the client (enrichment lookup API) and cache server in the same Availability Zone.
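
To illustrate why repeated lookups benefit from a cache, here is a minimal cache-aside sketch. An in-process dictionary stands in for the centralized cache, and LookupCache, fetch_user_profile, and the TTL value are hypothetical names chosen for this example rather than part of the Poshmark implementation.

```python
import time
from typing import Callable


class LookupCache:
    """Cache-aside wrapper: serve repeated keys from memory, fetch the rest."""

    def __init__(self, fetch: Callable[[str], dict], ttl_seconds: float = 60.0):
        self._fetch = fetch
        self._ttl = ttl_seconds
        self._entries: dict[str, tuple[float, dict]] = {}
        self.hits = 0
        self.misses = 0

    def get(self, key: str) -> dict:
        now = time.monotonic()
        cached = self._entries.get(key)
        if cached is not None and now - cached[0] < self._ttl:
            self.hits += 1
            return cached[1]
        self.misses += 1
        value = self._fetch(key)  # slow path: go to the enrichment source
        self._entries[key] = (now, value)
        return value


def fetch_user_profile(user_id: str) -> dict:
    """Hypothetical slow lookup against a database or API."""
    return {"user_id": user_id, "country": "US"}


cache = LookupCache(fetch_user_profile)
# One user session typically produces many events that need the same profile.
session_events = ["u1", "u1", "u2", "u1", "u2"]
profiles = [cache.get(user_id) for user_id in session_events]
print(cache.hits, cache.misses)
```

Only the first event for each user reaches the enrichment source; the remaining events are served from the cache.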

Reconciliation in case of pipeline errors

The event enrichment can fail in data enrichment applications for various reasons, such as the external store timing out or missing information in the store. The enriched fields may or may not be critical for downstream streaming applications. You should build your downstream streaming applications considering that these failures can occur and implement a fallback mechanism, for example retrying on-demand enrichment from the application. The failure handling will also be governed by latency tolerance of the application.
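
One possible shape for such a fallback is sketched below. It assumes a hypothetical lookup that raises EnrichmentUnavailable on a timeout or a missing record; the flag names enriched and enrichment_error are also made up for illustration.

```python
from typing import Callable, Optional


class EnrichmentUnavailable(Exception):
    """Raised by the hypothetical lookup when the store times out or has no data."""


def enrich_with_fallback(
    event: dict, lookup: Callable[[str], dict], max_attempts: int = 2
) -> dict:
    """Try the lookup a bounded number of times, then pass the event through unenriched."""
    last_error: Optional[Exception] = None
    for _ in range(max_attempts):
        try:
            return {**event, **lookup(event["user_id"]), "enriched": True}
        except EnrichmentUnavailable as error:
            last_error = error
    # Flag the record so downstream consumers can retry on demand or skip the fields.
    return {**event, "enriched": False, "enrichment_error": str(last_error)}


def flaky_lookup(user_id: str) -> dict:
    """Hypothetical store that has no record for one particular user."""
    if user_id == "missing":
        raise EnrichmentUnavailable(f"no profile for {user_id}")
    return {"country": "US"}


ok = enrich_with_fallback({"user_id": "u1"}, flaky_lookup)
failed = enrich_with_fallback({"user_id": "missing"}, flaky_lookup)
print(ok)
print(failed)
```

Keeping the number of attempts small reflects the latency tolerance mentioned above: a downstream application that cannot wait is better served by a flagged, unenriched event than by a delayed one.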

The processing of data is based on event time. In some situations, data can arrive late in the platform. Both Flink and Spark allow lateness and watermarks for users to handle late-arriving data by defining thresholds. Late-arriving data beyond the threshold is discarded from processing. It’s possible to get this discarded too-late data in Flink using a side output. There is no such provision in Spark Structured Streaming.
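
The idea of a lateness threshold with a side output can be modeled in a few lines. This is a deliberately simplified model, not the Flink or Spark API: the watermark here is just the largest event time seen so far minus an allowed lateness, and LatenessRouter is a hypothetical name.

```python
from dataclasses import dataclass, field


@dataclass
class LatenessRouter:
    """Tracks a watermark and separates on-time events from too-late ones."""

    allowed_lateness: int  # seconds an event may trail the newest one seen
    max_event_time: int = 0
    on_time: list = field(default_factory=list)
    too_late: list = field(default_factory=list)  # plays the role of a side output

    @property
    def watermark(self) -> int:
        return self.max_event_time - self.allowed_lateness

    def process(self, event: dict) -> None:
        if event["event_time"] < self.watermark:
            self.too_late.append(event)  # kept for later reconciliation, not dropped
            return
        self.max_event_time = max(self.max_event_time, event["event_time"])
        self.on_time.append(event)


router = LatenessRouter(allowed_lateness=10)
for event_time in [100, 105, 97, 120, 108, 111]:
    router.process({"event_time": event_time})

print([e["event_time"] for e in router.on_time])
print([e["event_time"] for e in router.too_late])
```

The event stamped 108 arrives after the watermark has advanced to 110, so it lands in the too-late list, where a batch job could still pick it up for reconciliation.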

A few streaming applications require their batch counterpart to reconcile data hourly or daily to handle data mismatch or data discrepancy due to late-arriving data or missing data.

Improved customer experience

The new real-time architecture offered the following benefits for an improved customer experience:

  • Anonymization – Poshmark is now able to provide and utilize real-time anonymized data for multiple functions both internally and externally because anonymization happens in real time.
  • Fraud mitigation – Poshmark was previously able to detect and prevent 45% of ATOs with the batch-based solution. With the real-time system, Poshmark is able to prevent 80% of ATOs.
  • Personalization – By providing personalized search results, Poshmark achieved an 8% improvement on clickthrough rates for search. This is a significant increase in the top of the funnel, increasing overall search conversions.

Improvement in these three factors helped end-customers gain confidence in the Poshmark app and website, which in turn enabled customers to increase their interaction with the app and helped accelerate customer engagement and growth.

Conclusion

In this post, we discussed the ingestion of real-time clickstream and log event data into Amazon MSK. We showed how enrichment of the captured data can be performed through Kinesis Data Analytics for Apache Flink. We broke up the enrichment processing into multiple components, such as Kinesis Data Analytics for Apache Flink, the enrichment microservices and the enrichment lookup store, and an enrichment cache. We discussed the downstream applications that used this enriched customer information to perform real-time security checks and offer personalized recommendations to end-users. We also discussed some of the areas that may need attention in case there are failures in the pipeline. Lastly, we showed how Poshmark improved their customer experience and gained market share by implementing this real-time analytics pipeline.


About the authors

Mahesh Pasupuleti is a VP of Data & Machine Learning Engineering at Poshmark. He has helped several startups succeed in different domains, including media streaming, healthcare, the financial sector, and marketplaces. He loves software engineering, building high performance teams, and strategy, and enjoys gardening and playing badminton in his free time.

Gaurav Shah is Director of Data Engineering and ML at Poshmark. He and his team help build data-driven solutions to drive growth at Poshmark.

Raghu Mannam is a Sr. Solutions Architect at AWS in San Francisco. He works closely with late-stage startups, many of which have had recent IPOs. His focus is end-to-end solutioning including security, DevOps automation, resilience, analytics, machine learning, and workload optimization in the cloud.

Deepesh Malviya is Solutions Architect Manager on the AWS Data Lab team. He and his team help customers architect and build data, analytics, and machine learning solutions to accelerate their key initiatives as part of the AWS Data Lab.

Account Security Analytics and Events: better visibility over all domains

Post Syndicated from Radwa Radwan original https://blog.cloudflare.com/account-security-analytics-and-events/

Cloudflare offers many security features like WAF, Bot management, DDoS, Zero Trust, and more! This suite of products is offered in the form of rules to give basic protection against common vulnerability attacks. These rules are usually configured and monitored per domain, which is very simple when we talk about one, two, maybe three domains (or what we call in Cloudflare’s terms, “zones”).

The zone-level overview sometimes is not time efficient

If you’re a Cloudflare customer with tens, hundreds, or even thousands of domains under your control, you’d spend hours going through these domains one by one, monitoring and configuring all security features. We know that’s a pain, especially for our Enterprise customers. That’s why last September we announced the Account WAF, where you can create one security rule and have it applied to the configuration of all your zones at once!

Account WAF makes it easy to deploy security configurations. Following the same philosophy, we want to empower our customers by providing visibility over these configurations, or even better, visibility on all HTTP traffic.

Today, Cloudflare is offering holistic views on the security suite by launching Account Security Analytics and Account Security Events. Now, across all your domains, you can monitor traffic, get insights quicker, and save hours of your time.

How do customers get visibility over security traffic today?

Before today, to view account analytics or events, customers either accessed each zone individually to check the events and analytics dashboards, or used the zone GraphQL Analytics API or logs to collect data and send it to their preferred storage provider, where they could collect, aggregate, and plot graphs to get insights for all zones under their account when ready-made dashboards were not provided.

Introducing Account Security Analytics and Events

The new views are security-focused, data-driven dashboards. Like the zone-level views, both show similar data, such as sampled logs and top filters over many source dimensions (for example, IP addresses, Host, Country, and ASN).

The main difference between them is that Account Security Events focuses on the current configurations on every zone you have, which makes reviewing mitigated requests (rule matches) easy. This step is essential in distinguishing actual threats from false positives, along with maintaining an optimal security configuration.

Part of the power of Security Events is showing events “by service”, which lists the security-related activity per security feature (for example, WAF, Firewall Rules, API Shield), and events “by Action” (for example, allow, block, challenge).

On the other hand, the Account Security Analytics view shows a wider angle with all HTTP traffic on all zones under the account, whether this traffic is mitigated (that is, the security configurations took an action to prevent the request from reaching your zone) or not mitigated. This is essential in fine-tuning your security configuration, finding possible false negatives, or onboarding new zones.

The view also provides quick filters or insights of what we think are interesting cases worth exploring for ease of use. Many of the view components are similar to zone level Security Analytics that we introduced recently.

To get to know the components and how they interact, let’s have a look at an actual example.

Analytics walk-through when investigating a spike in traffic

Traffic spikes happen to many customers’ accounts. To investigate the reason behind a spike and check what’s missing from the configurations, we recommend starting from Analytics, because it shows both mitigated and non-mitigated traffic. To review the mitigated requests and double-check for false positives, Security Events is the go-to place. That’s what we’ll do in this walk-through: start with Analytics, find a spike, and check whether we need further mitigation action.

Step 1: To navigate to the new views, sign into the Cloudflare dashboard and select the account you want to monitor. You will find Security Analytics and Security Events in the sidebar under Security Center.

Step 2: In the Analytics dashboard, if you see a big spike in traffic compared to the usual level, there’s a good chance it’s a layer 7 DDoS attack. Once you spot one, zoom into the time interval in the graph.

Zooming into a traffic spike on the timeseries scale

Expanding the top-N lists at the top of the analytics page reveals several observations:

We can confirm it’s a DDoS attack because the traffic peak doesn’t come from a single IP address; it’s distributed over multiple source IPs. The “edge status code” indicates that a rate limiting rule was applied to this attack, which uses the GET method over HTTP/2.

Looking at the right-hand side of the analytics, we can see “Attack Analysis” indicating that these requests were clean of XSS, SQLi, and common RCE attacks. The Bot Analysis indicates that this is automated traffic in the Bot Scores distribution. These two products add another layer of intelligence to the investigation process. We can easily deduce here that the attacker is sending clean requests in a high-volume attack from multiple IPs to take the web application down.
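
The "distributed over multiple source IPs" check can be expressed as a simple concentration measure. The sketch below is an illustration on made-up sampled logs, not a Cloudflare API; top_n_share and the sample data are hypothetical.

```python
from collections import Counter


def top_n_share(values: list[str], n: int = 3) -> float:
    """Fraction of requests contributed by the n most frequent values."""
    counts = Counter(values)
    top = sum(count for _, count in counts.most_common(n))
    return top / len(values)


# Hypothetical sampled logs: (source IP, HTTP method) per request.
single_source = [("203.0.113.7", "GET")] * 90 + [("198.51.100.1", "GET")] * 10
distributed = [(f"192.0.2.{i % 50}", "GET") for i in range(100)]

single_share = top_n_share([ip for ip, _ in single_source])
distributed_share = top_n_share([ip for ip, _ in distributed])
print(f"{single_share:.2f} vs {distributed_share:.2f}")
```

A share close to 1 means a handful of addresses dominate the spike, while a small share means the traffic is spread across many sources, which is the pattern described in this walk-through.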

Step 3: We can see that we have rules in place to mitigate this attack. With this visibility, we have the freedom to fine-tune our configurations for a better security posture, if needed. We can filter on the attack fingerprint, for instance by adding a filter on the referer `www.example.com`, which is receiving the bulk of the attack requests, along with filters on path equals `/`, HTTP method, query string, and automated traffic by Bot score.

Step 4: Jumping to Security Events to zoom in on our mitigation actions, we see that the spike fingerprint is mitigated using two actions: Managed Challenge and Block.

The mitigation happened on Firewall rules and DDoS configurations; the exact rules are shown in the top events.

Who gets the new views?

Starting this week all our customers on Enterprise plans will have access to Account Security Analytics and Security Events. We recommend having Account Bot Management, WAF Attack Score, and Account WAF to have access to the full visibility and actions.

What’s next?

The new Account Security Analytics and Events encompass metadata generated by the Cloudflare network for all domains in one place. In the upcoming period, we will be providing a better experience to save our customers’ time in a simple way. We’re currently in beta. Log into the dashboard, check out the views, and let us know your feedback.

Extend geospatial queries in Amazon Athena with UDFs and AWS Lambda

Post Syndicated from John Telford original https://aws.amazon.com/blogs/big-data/extend-geospatial-queries-in-amazon-athena-with-udfs-and-aws-lambda/

Amazon Athena is a serverless and interactive query service that allows you to easily analyze data in Amazon Simple Storage Service (Amazon S3) and 25-plus data sources, including on-premises data sources or other cloud systems using SQL or Python. Athena built-in capabilities include querying for geospatial data; for example, you can count the number of earthquakes in each Californian county. One disadvantage of analyzing at county-level is that it may give you a misleading impression of which parts of California have had the most earthquakes. This is because the counties aren’t equally sized; a county may have had more earthquakes simply because it’s a big county. What if we wanted a hierarchical system that allowed us to zoom in and out to aggregate data over different equally-sized geographic areas?

In this post, we present a solution that uses Uber’s Hexagonal Hierarchical Spatial Index (H3) to divide the globe into equally-sized hexagons. We then use an Athena user-defined function (UDF) to determine which hexagon each historical earthquake occurred in. Because the hexagons are equally-sized, this analysis gives a fair impression of where earthquakes tend to occur.

At the end, we’ll produce a visualization like the one below that shows the number of historical earthquakes in different areas of the western US.

H3 divides the globe into equal-sized regular hexagons. The number of hexagons depends on the chosen resolution, which may vary from 0 (122 hexagons, each with edge lengths of about 1,100 km) to 15 (569,707,381,193,162 hexagons, each with edge lengths of about 50 cm). H3 enables analysis at the area level, and each area has the same size and shape.
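
The cell counts quoted above follow a simple pattern: resolution 0 has 122 cells, and each finer resolution subdivides cells roughly sevenfold, giving 2 + 120 × 7^res cells in total. The following sketch reproduces the two figures; h3_cell_count is a helper name introduced here for illustration and is not part of the H3 library.

```python
def h3_cell_count(resolution: int) -> int:
    """Total number of H3 cells at a resolution (0 to 15)."""
    if not 0 <= resolution <= 15:
        raise ValueError("H3 resolutions range from 0 to 15")
    return 2 + 120 * 7 ** resolution


for resolution in (0, 4, 15):
    print(resolution, f"{h3_cell_count(resolution):,}")
```

Note that every resolution also contains 12 pentagons among these cells, so the hexagons cover the globe almost, rather than exactly, uniformly.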

Solution overview

The solution extends Athena’s built-in geospatial capabilities by creating a UDF powered by AWS Lambda. Finally, we use an Amazon SageMaker notebook to run Athena queries that are rendered as a choropleth map. The following diagram illustrates this architecture.

The end-to-end architecture is as follows:

  1. A CSV file of historical earthquakes is uploaded into an S3 bucket.
  2. An AWS Glue external table is created based on the earthquake CSV.
  3. A Lambda function calculates H3 hexagons for parameters (latitude, longitude, resolution). The function is written in Java and can be called as a UDF using queries in Athena.
  4. A SageMaker notebook uses an AWS SDK for pandas package to run a SQL query in Athena, including the UDF.
  5. A Plotly Express package renders a choropleth map of the number of earthquakes in each hexagon.

Prerequisites

For this post, we use Athena to read data in Amazon S3 using the table defined in the AWS Glue Data Catalog associated with our earthquake dataset. In terms of permissions, there are two main requirements:

Configure Amazon S3

The first step is to create an S3 bucket to store the earthquake dataset, as follows:

  1. Download the CSV file of historical earthquakes from GitHub.
  2. On the Amazon S3 console, choose Buckets in the navigation pane.
  3. Choose Create bucket.
  4. For Bucket name, enter a globally unique name for your data bucket.
  5. Choose Create folder, and enter the folder name earthquakes.
  6. Upload the file to the S3 bucket. In this example, we upload the earthquakes.csv file to the earthquakes prefix.

Create a table in Athena

Navigate to the Athena console to create a table. Complete the following steps:

  1. On the Athena console, choose Query editor.
  2. Select your preferred Workgroup using the drop-down menu.
  3. In the SQL editor, use the following code to create a table in the default database:
    CREATE external TABLE earthquakes
    (
      earthquake_date STRING,
      latitude DOUBLE,
      longitude DOUBLE,
      depth DOUBLE,
      magnitude DOUBLE,
      magtype STRING,
      mbstations STRING,
      gap STRING,
      distance STRING,
      rms STRING,
      source STRING,
      eventid STRING
    )
    ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
    STORED AS TEXTFILE LOCATION 's3://<MY-DATA-BUCKET>/earthquakes/';

Create a Lambda function for the Athena UDF

For a thorough explanation on how to build Athena UDFs, see Querying with user defined functions. We use Java 11 and Uber H3 Java binding to build the H3 UDF. We provide the implementation of the UDF on GitHub.

There are several options for deploying a UDF using Lambda. In this example, we use the AWS Management Console. For production deployments, you probably want to use infrastructure as code such as the AWS Cloud Development Kit (AWS CDK). For information about how to use the AWS CDK to deploy the Lambda function, refer to the project code repository. Another possible deployment option is using AWS Serverless Application Repository (SAR).

Deploy the UDF

Deploy the Uber H3 binding UDF using the console as follows:

  1. Go to the binary directory in the GitHub repository, and download aws-h3-athena-udf-*.jar to your local desktop.
  2. Create a Lambda function called H3UDF with Runtime set to Java 11 (Corretto), and Architecture set to x86_64.
  3. Upload the aws-h3-athena-udf*.jar file.
  4. Change the handler name to com.aws.athena.udf.h3.H3AthenaHandler.
  5. In the General configuration section, choose Edit to set the memory of the Lambda function to 4096 MB, which is an amount of memory that works for our examples. You may need to set the memory size larger for your use cases.

Use the Lambda function as an Athena UDF

After you create the Lambda function, you’re ready to use it as a UDF. The following screenshot shows the function details.

You can now use the function as an Athena UDF. On the Athena console, run the following command:

USING EXTERNAL FUNCTION lat_lng_to_cell_address(lat DOUBLE, lng DOUBLE, res INTEGER)
RETURNS VARCHAR
LAMBDA '<MY-LAMBDA-ARN>' -- Replace with ARN of your Lambda function.
SELECT *,
       lat_lng_to_cell_address(latitude, longitude, 4) AS h3_cell
FROM earthquakes
WHERE latitude BETWEEN 18 AND 70;

The udf/examples folder in the GitHub repository includes more examples of the Athena queries.

Developing the UDFs

Now that we showed you how to deploy a UDF for Athena using Lambda, let’s dive deeper into how to develop these kinds of UDFs. As explained in Querying with user defined functions, in order to develop a UDF, we first need to implement a class that inherits UserDefinedFunctionHandler. Then we need to implement the functions inside the class that can be used as UDFs of Athena.

We begin the UDF implementation by defining a class H3AthenaHandler that inherits the UserDefinedFunctionHandler. Then we implement functions that act as wrappers of functions defined in the Uber H3 Java binding. We make sure that all the functions defined in the H3 Java binding API are mapped, so that they can be used in Athena as UDFs. For example, we map the lat_lng_to_cell_address function used in the preceding example to the latLngToCellAddress function of the H3 Java binding.

On top of the call to the Java binding, many of the functions in the H3AthenaHandler check whether the input parameter is null. The null check is useful because we don’t assume the input to be non-null. In practice, null values for an H3 index or address are not unusual.

The following code shows the implementation of the get_resolution function:

    /** Returns the resolution of an index.
     *  @param h3 the H3 index.
     *  @return the resolution. Null when h3 is null.
     *  @throws IllegalArgumentException when index is out of range.
     */
    public Integer get_resolution(Long h3){
        final Integer result;
        if (h3 == null) {
            result = null;
        } else {
            result = h3Core.getResolution(h3);
        }
        return result;
    }

Some H3 API functions such as cellToLatLng return List<Double> of two elements, where the first element is the latitude and the second is longitude. The H3 UDF that we implement provides a function that returns well-known text (WKT) representation. For example, we provide cell_to_lat_lng_wkt, which returns a Point WKT string instead of List<Double>. We can then use the output of cell_to_lat_lng_wkt in combination with the built-in spatial Athena function ST_GeometryFromText as follows:

USING EXTERNAL FUNCTION cell_to_lat_lng_wkt(h3 BIGINT) 
RETURNS VARCHAR
LAMBDA '<MY-LAMBDA-ARN>'
SELECT ST_GeometryFromText(cell_to_lat_lng_wkt(622506764662964223))

Athena UDF only supports scalar data types and does not support nested types. However, some H3 APIs take or return nested types. For example, the polygonToCells function in H3 takes a List<List<List<GeoCoord>>>. Our implementation of the polygon_to_cells UDF receives a Polygon WKT instead. The following shows an example Athena query using this UDF:

-- get all h3 hexagons that cover Toulouse, Nantes, Lille, Paris, Nice 
USING EXTERNAL FUNCTION polygon_to_cells(polygonWKT VARCHAR, res INT)
RETURNS ARRAY(BIGINT)
LAMBDA '<MY-LAMBDA-ARN>'
SELECT polygon_to_cells('POLYGON ((43.604652 1.444209, 47.218371 -1.553621, 50.62925 3.05726, 48.864716 2.349014, 43.6961 7.27178, 43.604652 1.444209))', 2)
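
A Polygon WKT ring has to end on the point where it starts, which is easy to get wrong when typing coordinates by hand. The helper below is a small sketch for building such a string in the notebook before passing it to the UDF; polygon_wkt is a hypothetical name, and the coordinate order simply mirrors the query above, which is an assumption about what the UDF expects.

```python
def polygon_wkt(points: list[tuple[float, float]]) -> str:
    """Format coordinate pairs as a Polygon WKT string with a closed ring."""
    if len(points) < 3:
        raise ValueError("a polygon needs at least three distinct points")
    ring = list(points)
    if ring[0] != ring[-1]:
        ring.append(ring[0])  # WKT rings must end on their starting point
    coordinates = ", ".join(f"{a} {b}" for a, b in ring)
    return f"POLYGON (({coordinates}))"


# Same coordinate order as the query above (an assumption about what the UDF expects).
cities = [
    (43.604652, 1.444209),   # Toulouse
    (47.218371, -1.553621),  # Nantes
    (50.62925, 3.05726),     # Lille
    (48.864716, 2.349014),   # Paris
    (43.6961, 7.27178),      # Nice
]
print(polygon_wkt(cities))
```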

Use SageMaker notebooks for visualization

A SageMaker notebook is a managed machine learning compute instance that runs a Jupyter notebook application. In this example, we will use a SageMaker notebook to write and run our code to visualize our results, but if your use case includes Apache Spark then using Amazon Athena for Apache Spark would be a great choice. For advice on security best practices for SageMaker, see Building secure machine learning environments with Amazon SageMaker. You can create your own SageMaker notebook by following these instructions:

  1. On the SageMaker console, choose Notebook in the navigation pane.
  2. Choose Notebook instances.
  3. Choose Create notebook instance.
  4. Enter a name for the notebook instance.
  5. Choose an existing IAM role or create a role that allows you to run SageMaker and grants access to Amazon S3 and Athena.
  6. Choose Create notebook instance.
  7. Wait for the notebook status to change from Creating to InService.
  8. Open the notebook instance by choosing Jupyter or JupyterLab.

Explore the data

We’re now ready to explore the data.

  1. On the Jupyter console, under New, choose Notebook.
  2. On the Select Kernel drop-down menu, choose conda_python3.
  3. Add new cells by choosing the plus sign.
  4. In your first cell, download the following Python modules that aren’t included in the standard SageMaker environment:
    !pip install geojson
    !pip install awswrangler
    !pip install geomet
    !pip install shapely

    GeoJSON is a popular format for storing spatial data in a JSON format. The geojson module allows you to easily read and write GeoJSON data with Python. The second module we install, awswrangler, is the AWS SDK for pandas. This is a very easy way to read data from various AWS data sources into Pandas data frames. We use it to read earthquake data from the Athena table.

  5. Next, we import all the packages that we use to import the data, reshape it, and visualize it:
    from geomet import wkt
    import plotly.express as px
    from shapely.geometry import Polygon, mapping
    import awswrangler as wr
    import pandas as pd
    from shapely.wkt import loads
    import geojson
    import ast

  6. We begin importing our data using the athena.read_sql_query function in AWS SDK for pandas. The Athena query has a subquery that uses the UDF to add a column h3_cell to each row in the earthquakes table, based on the latitude and longitude of the earthquake. The aggregate function COUNT is then used to find out the number of earthquakes in each H3 cell. For this visualization, we’re only interested in earthquakes within the US, so the query filters out rows that are outside the area of interest:
    def run_query(lambda_arn, db, resolution):
        query = f"""USING EXTERNAL FUNCTION cell_to_boundary_wkt(cell VARCHAR)
                        RETURNS ARRAY(VARCHAR)
                        LAMBDA '{lambda_arn}'
                           SELECT h3_cell, cell_to_boundary_wkt(h3_cell) as boundary, quake_count FROM(
                            USING EXTERNAL FUNCTION lat_lng_to_cell_address(lat DOUBLE, lng DOUBLE, res INTEGER)
                             RETURNS VARCHAR
                            LAMBDA '{lambda_arn}'
                        SELECT h3_cell, COUNT(*) AS quake_count
                          FROM
                            (SELECT *,
                               lat_lng_to_cell_address(latitude, longitude, {resolution}) AS h3_cell
                             FROM earthquakes
                             WHERE latitude BETWEEN 18 AND 70        -- For this visualisation, we're only interested in earthquakes within the USA.
                               AND longitude BETWEEN -175 AND -50
                             )
                           GROUP BY h3_cell ORDER BY quake_count DESC) cell_quake_count"""
        return wr.athena.read_sql_query(query, database=db)
    
    lambda_arn = '<MY-LAMBDA-ARN>' # Replace with ARN of your lambda.
    db_name = '<MY-DATABASE-NAME>' # Replace with name of your Glue database.
    earthquakes_df = run_query(lambda_arn=lambda_arn,db=db_name, resolution=4)
    earthquakes_df.head()

    The following screenshot shows our results.

Follow along with the rest of the steps in our Jupyter notebook to see how we analyze and visualize our example with H3 UDF data.

Visualize the results

To visualize our results, we use the Plotly Express module to create a choropleth map of our data. A choropleth map is a type of visualization that is shaded based on quantitative values. This is a great visualization for our use case because we’re shading different regions based on the frequency of earthquakes.
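
A choropleth needs each region as a GeoJSON feature together with the value that drives its shading. The sketch below shows one way to get from boundary WKT strings to a GeoJSON FeatureCollection using only the standard library. It assumes each boundary has already been reduced to a single-ring Polygon WKT string; the sample rows and the wkt_polygon_to_feature helper are hypothetical, and the notebook in the repository may take a different route.

```python
import json
import re


def wkt_polygon_to_feature(cell: str, boundary_wkt: str, quake_count: int) -> dict:
    """Turn a simple single-ring 'POLYGON ((x y, ...))' string into a GeoJSON feature."""
    match = re.fullmatch(r"\s*POLYGON\s*\(\((.*)\)\)\s*", boundary_wkt)
    if match is None:
        raise ValueError(f"unsupported WKT: {boundary_wkt!r}")
    ring = [[float(n) for n in pair.split()] for pair in match.group(1).split(",")]
    return {
        "type": "Feature",
        "id": cell,  # lets a plotting library join the shape to its data row
        "properties": {"quake_count": quake_count},
        "geometry": {"type": "Polygon", "coordinates": [ring]},
    }


# Hypothetical rows shaped like (h3_cell, boundary, quake_count).
rows = [
    ("cell-a", "POLYGON ((-122.0 37.0, -121.9 37.1, -122.1 37.1, -122.0 37.0))", 12),
    ("cell-b", "POLYGON ((-118.0 34.0, -117.9 34.1, -118.1 34.1, -118.0 34.0))", 3),
]
feature_collection = {
    "type": "FeatureCollection",
    "features": [wkt_polygon_to_feature(*row) for row in rows],
}
print(json.dumps(feature_collection)[:80])
```

The resulting dictionary can then be handed to a plotting library that accepts GeoJSON, with quake_count as the value that determines each hexagon's shade.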

In the resulting visual, we can see the ranges of frequency of earthquakes in different areas of North America. Note, the H3 resolution in this map is lower than in the earlier map, which makes each hexagon cover a larger area of the globe.

Clean up

To avoid incurring extra charges on your account, delete the resources you created:

  1. On the SageMaker console, select the notebook and on the Actions menu, choose Stop.
  2. Wait for the status of the notebook to change to Stopped, then select the notebook again and on the Actions menu, choose Delete.
  3. On the Amazon S3 console, select the bucket you created and choose Empty.
  4. Enter the bucket name and choose Empty.
  5. Select the bucket again and choose Delete.
  6. Enter the bucket name and choose Delete bucket.
  7. On the Lambda console, select the function name and on the Actions menu, choose Delete.

Conclusion

In this post, you saw how to extend functions in Athena for geospatial analysis by adding your own user-defined function. Although we used Uber’s H3 geospatial index in this demonstration, you can bring your own geospatial index for your own custom geospatial analysis.

In this post, we used Athena, Lambda, and SageMaker notebooks to visualize the results of our UDFs in the western US. Code examples are in the h3-udf-for-athena GitHub repo.

As a next step, you can modify the code in this post and customize it for your own needs to gain further insights from your own geographical data. For example, you could visualize other cases such as droughts, flooding, and deforestation.


About the Authors

John Telford is a Senior Consultant at Amazon Web Services. He is a specialist in big data and data warehouses. John has a Computer Science degree from Brunel University.

Anwar Rizal is a Senior Machine Learning consultant based in Paris. He works with AWS customers to develop data and AI solutions to sustainably grow their business.

Pauline Ting is a Data Scientist in the AWS Professional Services team. She supports customers in achieving and accelerating their business outcome by developing sustainable AI/ML solutions. In her spare time, Pauline enjoys traveling, surfing, and trying new dessert places.