Tag Archives: Analytics

Orchestrate an end-to-end ETL pipeline using Amazon S3, AWS Glue, and Amazon Redshift Serverless with Amazon MWAA

Post Syndicated from Radhika Jakkula original https://aws.amazon.com/blogs/big-data/orchestrate-an-end-to-end-etl-pipeline-using-amazon-s3-aws-glue-and-amazon-redshift-serverless-with-amazon-mwaa/

Amazon Managed Workflows for Apache Airflow (Amazon MWAA) is a managed orchestration service for Apache Airflow that you can use to set up and operate data pipelines in the cloud at scale. Apache Airflow is an open source tool used to programmatically author, schedule, and monitor sequences of processes and tasks, referred to as workflows. With Amazon MWAA, you can use Apache Airflow and Python to create workflows without having to manage the underlying infrastructure for scalability, availability, and security.

By using multiple AWS accounts, organizations can effectively scale their workloads and manage their complexity as they grow. This approach provides a robust mechanism to mitigate the potential impact of disruptions or failures, making sure that critical workloads remain operational. Additionally, it enables cost optimization by aligning resources with specific use cases, making sure that expenses are well controlled. By isolating workloads with specific security requirements or compliance needs, organizations can maintain the highest levels of data privacy and security. Furthermore, the ability to organize multiple AWS accounts in a structured manner allows you to align your business processes and resources according to your unique operational, regulatory, and budgetary requirements. This approach promotes efficiency, flexibility, and scalability, enabling large enterprises to meet their evolving needs and achieve their goals.

This post demonstrates how to orchestrate an end-to-end extract, transform, and load (ETL) pipeline using Amazon Simple Storage Service (Amazon S3), AWS Glue, and Amazon Redshift Serverless with Amazon MWAA.

Solution overview

For this post, we consider a use case where a data engineering team wants to build an ETL process and give the best experience to their end-users when they want to query the latest data after new raw files are added to Amazon S3 in the central account (Account A in the following architecture diagram). The data engineering team wants to separate the raw data into its own AWS account (Account B in the diagram) for increased security and control. They also want to perform the data processing and transformation work in their own account (Account B) to compartmentalize duties and prevent any unintended changes to the source raw data present in the central account (Account A). This approach allows the team to process the raw data extracted from Account A to Account B, which is dedicated to data handling tasks. This makes sure that the raw and processed data can be kept securely separated across multiple accounts, if required, for enhanced data governance and security.

Our solution uses an end-to-end ETL pipeline orchestrated by Amazon MWAA that looks for new incremental files in an Amazon S3 location in Account A, where the raw data is present. When new files arrive, the pipeline invokes AWS Glue ETL jobs that write to data objects in a Redshift Serverless workgroup in Account B. The pipeline then starts running stored procedures and SQL commands on Redshift Serverless. As the queries finish running, an UNLOAD operation is invoked from the Redshift data warehouse to the S3 bucket in Account A.

Because security is important, this post also covers how to configure an Airflow connection using AWS Secrets Manager to avoid storing database credentials within Airflow connections and variables.

The following diagram illustrates the architectural overview of the components involved in the orchestration of the workflow.

The workflow consists of the following components:

  • The source and target S3 buckets are in a central account (Account A), whereas Amazon MWAA, AWS Glue, and Amazon Redshift are in a different account (Account B). Cross-account access has been set up between the S3 buckets in Account A and the resources in Account B to load and unload data.
  • In the second account, Amazon MWAA is hosted in one VPC and Redshift Serverless in a different VPC, which are connected through VPC peering. A Redshift Serverless workgroup is secured inside private subnets across three Availability Zones.
  • Secrets like user name, password, DB port, and AWS Region for Redshift Serverless are stored in Secrets Manager.
  • VPC endpoints are created for Amazon S3 and Secrets Manager to interact with other resources.
  • Usually, data engineers create an Airflow Directed Acyclic Graph (DAG) and commit their changes to GitHub. With GitHub Actions, they are deployed to an S3 bucket in Account B (for this post, we upload the files into the S3 bucket directly). The S3 bucket stores Airflow-related files like DAG files, requirements.txt files, and plugins. AWS Glue ETL scripts and assets are stored in another S3 bucket. This separation helps maintain organization and avoid confusion.
  • The Airflow DAG uses various operators, sensors, connections, tasks, and rules to run the data pipeline as needed.
  • The Airflow logs are logged in Amazon CloudWatch, and alerts can be configured for monitoring tasks. For more information, see Monitoring dashboards and alarms on Amazon MWAA.

Prerequisites

Because this solution centers around using Amazon MWAA to orchestrate the ETL pipeline, you need to set up certain foundational resources across accounts beforehand. Specifically, you need to create the S3 buckets and folders, AWS Glue resources, and Redshift Serverless resources in their respective accounts prior to implementing the full workflow integration using Amazon MWAA.

Deploy resources in Account A using AWS CloudFormation

In Account A, launch the provided AWS CloudFormation stack to create the following resources:

  • The source and target S3 buckets and folders. As a best practice, the input and output bucket structures are formatted with hive style partitioning as s3://<bucket>/products/YYYY/MM/DD/.
  • A sample dataset called products.csv, which we use in this post.

Upload the AWS Glue job to Amazon S3 in Account B

In Account B, create an Amazon S3 location called aws-glue-assets-<account-id>-<region>/scripts (if not present). Replace the parameters for the account ID and Region in the sample_glue_job.py script and upload the AWS Glue job file to the Amazon S3 location.
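
If you prefer to script this step, the following is a minimal boto3 sketch of the upload; the account ID, Region, and local path of the edited script are placeholders you need to replace.

import boto3

# Placeholders: replace with your Account B ID, Region, and the local path of the edited script.
account_id = "<account-id>"
region = "<region>"
bucket = f"aws-glue-assets-{account_id}-{region}"

s3 = boto3.client("s3", region_name=region)

# Upload the edited Glue job script to the scripts/ prefix (create the bucket first if it doesn't exist).
s3.upload_file("sample_glue_job.py", bucket, "scripts/sample_glue_job.py")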

Deploy resources in Account B using AWS CloudFormation

In Account B, launch the provided CloudFormation stack template to create the following resources:

  • The S3 bucket airflow-<username>-bucket to store Airflow-related files with the following structure:
    • dags – The folder for DAG files.
    • plugins – The folder for any custom or community Airflow plugins.
    • requirements – The folder for the requirements.txt file that lists any Python packages.
    • scripts – Any SQL scripts used in the DAG.
    • data – Any datasets used in the DAG.
  • A Redshift Serverless environment. The name of the workgroup and namespace are prefixed with sample.
  • An AWS Glue environment, which contains the following:
    • An AWS Glue crawler, which crawls the data from the S3 source bucket sample-inp-bucket-etl-<username> in Account A.
    • A database called products_db in the AWS Glue Data Catalog.
    • An ETL job called sample_glue_job. This job can read files from the products table in the Data Catalog and load data into the Redshift table products.
  • A VPC gateway endpoint to Amazon S3.
  • An Amazon MWAA environment. For detailed steps to create an Amazon MWAA environment using the Amazon MWAA console, refer to Introducing Amazon Managed Workflows for Apache Airflow (MWAA).


Create Amazon Redshift resources

Create two tables and a stored procedure on a Redshift Serverless workgroup using the products.sql file.

In this example, we create two tables called products and products_f. The name of the stored procedure is sp_products.

Configure Airflow permissions

After the Amazon MWAA environment is created successfully, the status will show as Available. Choose Open Airflow UI to view the Airflow UI. DAGs are automatically synced from the S3 bucket and visible in the UI. However, at this stage, there are no DAGs in the S3 folder.

Add the customer managed policy AmazonMWAAFullConsoleAccess, which grants Airflow users permissions to access AWS Identity and Access Management (IAM) resources, and attach this policy to the Amazon MWAA role. For more information, see Accessing an Amazon MWAA environment.

The policies attached to the Amazon MWAA role in this post grant full access and must only be used for testing purposes in a secure test environment. For production deployments, follow the least privilege principle.

Set up the environment

This section outlines the steps to configure the environment. The process involves the following high-level steps:

  1. Update any necessary providers.
  2. Set up cross-account access.
  3. Establish a VPC peering connection between the Amazon MWAA VPC and Amazon Redshift VPC.
  4. Configure Secrets Manager to integrate with Amazon MWAA.
  5. Define Airflow connections.

Update the providers

Follow the steps in this section if your Amazon MWAA environment runs an Apache Airflow version earlier than 2.8.1 (the latest version available as of this writing).

Providers are packages that are maintained by the community and include all the core operators, hooks, and sensors for a given service. The Amazon provider is used to interact with AWS services like Amazon S3, Amazon Redshift Serverless, AWS Glue, and more. There are over 200 modules within the Amazon provider.

Although the version of Airflow supported in Amazon MWAA is 2.6.3, which comes bundled with the Amazon provider package version 8.2.0, support for Amazon Redshift Serverless was not added until the Amazon provider package version 8.4.0. Because the default bundled provider version is older than when Redshift Serverless support was introduced, the provider version must be upgraded in order to use that functionality.

The first step is to update the constraints file and requirements.txt file with the correct versions. Refer to Specifying newer provider packages for steps to update the Amazon provider package.

  1. Specify the requirements as follows:
    --constraint "/usr/local/airflow/dags/constraints-3.10-mod.txt"
    apache-airflow-providers-amazon==8.4.0

  2. Update the version in the constraints file to 8.4.0 or higher.
  3. Add the modified constraints file (constraints-3.10-mod.txt, as referenced in the requirements) to the /dags folder.

Refer to Apache Airflow versions on Amazon Managed Workflows for Apache Airflow for correct versions of the constraints file depending on the Airflow version.

  4. Navigate to the Amazon MWAA environment and choose Edit.
  5. Under DAG code in Amazon S3, for Requirements file, choose the latest version.
  6. Choose Save.

This will update the environment and new providers will be in effect.

  7. To verify the provider version, go to Providers under the Admin menu.

The version for the Amazon provider package should be 8.4.0, as shown in the following screenshot. If not, there was an error while loading requirements.txt. To debug any errors, go to the CloudWatch console and open the requirements_install_ip log in Log streams, where errors are listed. Refer to Enabling logs on the Amazon MWAA console for more details.

Set up cross-account access

You need to set up cross-account policies and roles between Account A and Account B to access the S3 buckets to load and unload data. Complete the following steps:

  1. In Account A, configure the bucket policy for bucket sample-inp-bucket-etl-<username> to grant permissions to the AWS Glue and Amazon MWAA roles in Account B for objects in bucket sample-inp-bucket-etl-<username>:
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {
                    "AWS": [
                        "arn:aws:iam::<account-id-of-AcctB>:role/service-role/<Glue-role>",
                        "arn:aws:iam::<account-id-of-AcctB>:role/service-role/<MWAA-role>"
                    ]
                },
                "Action": [
                    "s3:GetObject",
                    "s3:PutObject",
                    "s3:PutObjectAcl",
                    "s3:ListBucket"
                ],
                "Resource": [
                    "arn:aws:s3:::sample-inp-bucket-etl-<username>/*",
                    "arn:aws:s3:::sample-inp-bucket-etl-<username>"
                ]
            }
        ]
    }
    

  2. Similarly, configure the bucket policy for bucket sample-opt-bucket-etl-<username> to grant permissions to Amazon MWAA roles in Account B to put objects in this bucket:
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {
                    "AWS": "arn:aws:iam::<account-id-of-AcctB>:role/service-role/<MWAA-role>"
                },
                "Action": [
                    "s3:GetObject",
                    "s3:PutObject",
                    "s3:PutObjectAcl",
                    "s3:ListBucket"
                ],
                "Resource": [
                    "arn:aws:s3:::sample-opt-bucket-etl-<username>/*",
                    "arn:aws:s3:::sample-opt-bucket-etl-<username>"
                ]
            }
        ]
    }
    

  3. In Account A, create an IAM policy called policy_for_roleA, which allows necessary Amazon S3 actions on the output bucket:
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "VisualEditor0",
                "Effect": "Allow",
                "Action": [
                    "kms:Decrypt",
                    "kms:Encrypt",
                    "kms:GenerateDataKey"
                ],
                "Resource": [
                    "<KMS_KEY_ARN_Used_for_S3_encryption>"
                ]
            },
            {
                "Sid": "VisualEditor1",
                "Effect": "Allow",
                "Action": [
                    "s3:PutObject",
                    "s3:GetObject",
                    "s3:GetBucketAcl",
                    "s3:GetBucketCors",
                    "s3:GetEncryptionConfiguration",
                    "s3:GetBucketLocation",
                    "s3:ListAllMyBuckets",
                    "s3:ListBucket",
                    "s3:ListBucketMultipartUploads",
                    "s3:ListBucketVersions",
                    "s3:ListMultipartUploadParts"
                ],
                "Resource": [
                    "arn:aws:s3:::sample-opt-bucket-etl-<username>",
                    "arn:aws:s3:::sample-opt-bucket-etl-<username>/*"
                ]
            }
        ]
    }

  4. Create a new IAM role called RoleA with Account B as the trusted entity role and add this policy to the role. This allows Account B to assume RoleA to perform necessary Amazon S3 actions on the output bucket.
  5. In Account B, create an IAM policy called s3-cross-account-access with permission to access objects in the bucket sample-inp-bucket-etl-<username>, which is in Account A.
  6. Add this policy to the AWS Glue role and Amazon MWAA role:
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": [
                    "s3:GetObject",
                    "s3:PutObject",
                    "s3:PutObjectAcl"
                ],
                "Resource": "arn:aws:s3:::sample-inp-bucket-etl-<username>/*"
            }
        ]
    }

  7. In Account B, create the IAM policy policy_for_roleB, which grants permission to assume RoleA in Account A:
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "CrossAccountPolicy",
                "Effect": "Allow",
                "Action": "sts:AssumeRole",
                "Resource": "arn:aws:iam::<account-id-of-AcctA>:role/RoleA"
            }
        ]
    }

  8. Create a new IAM role called RoleB with Amazon Redshift as the trusted entity type and add this policy to the role. This allows RoleB to assume RoleA in Account A and also to be assumable by Amazon Redshift.
  9. Attach RoleB to the Redshift Serverless namespace, so Amazon Redshift can write objects to the S3 output bucket in Account A.
  10. Attach the policy policy_for_roleB to the Amazon MWAA role, which allows Amazon MWAA to access the output bucket in Account A.

Refer to How do I provide cross-account access to objects that are in Amazon S3 buckets? for more details on setting up cross-account access to objects in Amazon S3 from AWS Glue and Amazon MWAA. Refer to How do I COPY or UNLOAD data from Amazon Redshift to an Amazon S3 bucket in another account? for more details on setting up roles to unload data from Amazon Redshift to Amazon S3 from Amazon MWAA.
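
If you prefer to script parts of this setup, the following hedged boto3 sketch applies the Account A input bucket policy from step 1. Run it with credentials for Account A, and replace the placeholder account ID and role names with the roles created in Account B.

import json
import boto3

# Placeholders: Account B ID and the Glue and Amazon MWAA role names from Account B.
acct_b = "<account-id-of-AcctB>"
bucket = "sample-inp-bucket-etl-<username>"

policy = {
    "Version": "2012-10-17",
    "Statement": [{
        "Effect": "Allow",
        "Principal": {"AWS": [
            f"arn:aws:iam::{acct_b}:role/service-role/<Glue-role>",
            f"arn:aws:iam::{acct_b}:role/service-role/<MWAA-role>",
        ]},
        "Action": ["s3:GetObject", "s3:PutObject", "s3:PutObjectAcl", "s3:ListBucket"],
        "Resource": [f"arn:aws:s3:::{bucket}/*", f"arn:aws:s3:::{bucket}"],
    }],
}

# Apply the policy to the input bucket in Account A.
boto3.client("s3").put_bucket_policy(Bucket=bucket, Policy=json.dumps(policy))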

Set up VPC peering between the Amazon MWAA and Amazon Redshift VPCs

Because Amazon MWAA and Amazon Redshift are in two separate VPCs, you need to set up VPC peering between them. You must add a route to the route tables associated with the subnets for both services. Refer to Work with VPC peering connections for details on VPC peering.

Make sure that the CIDR range of the Amazon MWAA VPC is allowed in the Redshift security group and that the CIDR range of the Amazon Redshift VPC is allowed in the Amazon MWAA security group, as shown in the following screenshot.

If any of the preceding steps are configured incorrectly, you are likely to encounter a “Connection Timeout” error in the DAG run.
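
The peering connection and security group rules can also be created with boto3. The following is a hedged sketch with placeholder VPC IDs, CIDR ranges, and security group IDs; you still need to add routes for the peer CIDR range to the route tables of both VPCs.

import boto3

ec2 = boto3.client("ec2", region_name="us-east-1")

# Placeholders for the Amazon MWAA and Amazon Redshift VPCs in Account B.
mwaa_vpc_id = "<mwaa-vpc-id>"
redshift_vpc_id = "<redshift-vpc-id>"
mwaa_vpc_cidr = "10.0.0.0/16"
redshift_vpc_cidr = "10.1.0.0/16"

# Request and accept the peering connection (both VPCs are in the same account).
peering = ec2.create_vpc_peering_connection(VpcId=mwaa_vpc_id, PeerVpcId=redshift_vpc_id)
pcx_id = peering["VpcPeeringConnection"]["VpcPeeringConnectionId"]
ec2.accept_vpc_peering_connection(VpcPeeringConnectionId=pcx_id)

# Allow the Amazon MWAA CIDR range on the Redshift port in the Redshift security group.
ec2.authorize_security_group_ingress(
    GroupId="<redshift-security-group-id>",
    IpPermissions=[{"IpProtocol": "tcp", "FromPort": 5439, "ToPort": 5439,
                    "IpRanges": [{"CidrIp": mwaa_vpc_cidr}]}],
)

# Allow the Amazon Redshift CIDR range in the Amazon MWAA security group.
ec2.authorize_security_group_ingress(
    GroupId="<mwaa-security-group-id>",
    IpPermissions=[{"IpProtocol": "-1", "IpRanges": [{"CidrIp": redshift_vpc_cidr}]}],
)

# Finally, add routes for the peer CIDR range via pcx_id in both VPCs' route tables.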

Configure the Amazon MWAA connection with Secrets Manager

When the Amazon MWAA pipeline is configured to use Secrets Manager, it will first look for connections and variables in an alternate backend (like Secrets Manager). If the alternate backend contains the needed value, it is returned. Otherwise, it will check the metadata database for the value and return that instead. For more details, refer to Configuring an Apache Airflow connection using an AWS Secrets Manager secret.

Complete the following steps:

  1. Configure a VPC endpoint to link Amazon MWAA and Secrets Manager (com.amazonaws.us-east-1.secretsmanager).

This allows Amazon MWAA to access credentials stored in Secrets Manager.

  2. To provide Amazon MWAA with permission to access Secrets Manager secret keys, add the policy called SecretsManagerReadWrite to the IAM role of the environment.
  3. To create the Secrets Manager backend as an Apache Airflow configuration option, go to the Airflow configuration options, add the following key-value pairs, and save your settings.

This configures Airflow to look for connection strings and variables at the airflow/connections/* and airflow/variables/* paths:

secrets.backend: airflow.providers.amazon.aws.secrets.secrets_manager.SecretsManagerBackend
secrets.backend_kwargs: {"connections_prefix" : "airflow/connections", "variables_prefix" : "airflow/variables"}
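
If you manage the environment programmatically, the same options can be set through the UpdateEnvironment API. The following is a hedged boto3 sketch; the environment name is a placeholder, and applying the change updates (and briefly restarts) the environment.

import boto3

mwaa = boto3.client("mwaa", region_name="us-east-1")

# Placeholder environment name; the options mirror the key-value pairs above.
mwaa.update_environment(
    Name="<your-mwaa-environment>",
    AirflowConfigurationOptions={
        "secrets.backend": "airflow.providers.amazon.aws.secrets.secrets_manager.SecretsManagerBackend",
        "secrets.backend_kwargs": '{"connections_prefix": "airflow/connections", "variables_prefix": "airflow/variables"}',
    },
)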

  4. To generate an Airflow connection URI string, open a Python shell in AWS CloudShell.
  5. Run the following code to generate the connection URI string:
    import urllib.parse
    conn_type = 'redshift'
    host = 'sample-workgroup.<account-id-of-AcctB>.us-east-1.redshift-serverless.amazonaws.com' #Specify the Amazon Redshift workgroup endpoint
    port = '5439'
    login = 'admin' #Specify the username to use for authentication with Amazon Redshift
    password = '<password>' #Specify the password to use for authentication with Amazon Redshift
    role_arn = urllib.parse.quote_plus('arn:aws:iam::<account_id>:role/service-role/<MWAA-role>')
    database = 'dev'
    region = 'us-east-1' #YOUR_REGION
    conn_string = '{0}://{1}:{2}@{3}:{4}?role_arn={5}&database={6}&region={7}'.format(conn_type, login, password, host, port, role_arn, database, region)
    print(conn_string)
    

The connection string should be generated as follows:

redshift://admin:<password>@sample-workgroup.<account_id>.us-east-1.redshift-serverless.amazonaws.com:5439?role_arn=<MWAA role ARN>&database=dev&region=<region>

  6. Add the connection in Secrets Manager using the following command in the AWS Command Line Interface (AWS CLI).

This can also be done from the Secrets Manager console. This will be added in Secrets Manager as plaintext.

aws secretsmanager create-secret --name airflow/connections/secrets_redshift_connection --description "Apache Airflow to Redshift Cluster" --secret-string "redshift://admin:<password>@sample-workgroup.<account_id>.us-east-1.redshift-serverless.amazonaws.com:5439?role_arn=<MWAA role ARN>&database=dev&region=us-east-1" --region=us-east-1

Use the connection airflow/connections/secrets_redshift_connection in the DAG. When the DAG is run, it will look for this connection and retrieve the secrets from Secrets Manager. In the case of RedshiftDataOperator, pass the secret_arn as a parameter instead of the connection name.

You can also add secrets using the Secrets Manager console as key-value pairs.

  7. Add another secret in Secrets Manager and save it as airflow/connections/redshift_conn_test.

Create an Airflow connection through the metadata database

You can also create connections in the UI. In this case, the connection details will be stored in an Airflow metadata database. If the Amazon MWAA environment is not configured to use the Secrets Manager backend, it will check the metadata database for the value and return that. You can create an Airflow connection using the UI, AWS CLI, or API. In this section, we show how to create a connection using the Airflow UI.

  1. For Connection Id, enter a name for the connection.
  2. For Connection Type, choose Amazon Redshift.
  3. For Host, enter the Redshift endpoint (without port and database) for Redshift Serverless.
  4. For Database, enter dev.
  5. For User, enter your admin user name.
  6. For Password, enter your password.
  7. For Port, use port 5439.
  8. For Extra, set the region and timeout parameters.
  9. Test the connection, then save your settings.

Create and run a DAG

In this section, we describe how to create a DAG using various components. After you create and run the DAG, you can verify the results by querying Redshift tables and checking the target S3 buckets.

Create a DAG

In Airflow, data pipelines are defined in Python code as DAGs. We create a DAG that consists of various operators, sensors, connections, tasks, and rules:

  • The DAG starts with looking for source files in the S3 bucket sample-inp-bucket-etl-<username> under Account A for the current day using S3KeySensor. S3KeySensor is used to wait for one or multiple keys to be present in an S3 bucket.
    • For example, our S3 bucket is partitioned as s3://bucket/products/YYYY/MM/DD/, so our sensor should check for folders with the current date. We derived the current date in the DAG and passed this to S3KeySensor, which looks for any new files in the current day folder.
    • We also set wildcard_match as True, which enables bucket_key to be interpreted as a Unix wildcard pattern. Set the mode to reschedule so that the sensor task frees the worker slot when the criteria are not met and the task is rescheduled for a later time. As a best practice, use this mode when poke_interval is more than 1 minute to prevent too much load on the scheduler.
  • After the file is available in the S3 bucket, the AWS Glue crawler runs using GlueCrawlerOperator to crawl the S3 source bucket sample-inp-bucket-etl-<username> under Account A and updates the table metadata under the products_db database in the Data Catalog. The crawler uses the AWS Glue role and Data Catalog database that were created in the previous steps.
  • The DAG uses GlueCrawlerSensor to wait for the crawler to complete.
  • When the crawler job is complete, GlueJobOperator is used to run the AWS Glue job. The AWS Glue script name (along with its location) is passed to the operator along with the AWS Glue IAM role. Other parameters like GlueVersion, NumberofWorkers, and WorkerType are passed using the create_job_kwargs parameter.
  • The DAG uses GlueJobSensor to wait for the AWS Glue job to complete. When it’s complete, the Redshift staging table products will be loaded with data from the S3 file.
  • You can connect to Amazon Redshift from Airflow using three different operators:
    • PythonOperator.
    • SQLExecuteQueryOperator, which uses a PostgreSQL connection and redshift_default as the default connection.
    • RedshiftDataOperator, which uses the Redshift Data API and aws_default as the default connection.

In our DAG, we use SQLExecuteQueryOperator and RedshiftDataOperator to show how to use these operators. The Redshift stored procedures are run using RedshiftDataOperator. The DAG also runs SQL commands in Amazon Redshift to delete the data from the staging table using SQLExecuteQueryOperator.

Because we configured our Amazon MWAA environment to look for connections in Secrets Manager, when the DAG runs, it retrieves the Redshift connection details like user name, password, host, port, and Region from Secrets Manager. If the connection is not found in Secrets Manager, the values are retrieved from the default connections.

In SQLExecuteQueryOperator, we pass the connection name that we created in Secrets Manager. It looks for airflow/connections/secrets_redshift_connection and retrieves the secrets from Secrets Manager. If Secrets Manager is not set up, the connection created manually (for example, redshift-conn-id) can be passed.

In RedshiftDataOperator, we pass the secret_arn of the airflow/connections/redshift_conn_test connection created in Secrets Manager as a parameter.

  • As the final task, RedshiftToS3Operator is used to unload data from the Redshift table to the S3 bucket sample-opt-bucket-etl in Account A. The airflow/connections/redshift_conn_test connection from Secrets Manager is used for unloading the data.
  • TriggerRule is set to ALL_DONE, which enables the next step to run after all upstream tasks are complete.
  • The dependency of tasks is defined using the chain() function, which allows for parallel runs of tasks if needed. In our case, we want all tasks to run in sequence.

The following is the complete DAG code. The dag_id should match the DAG script name, otherwise it won’t be synced into the Airflow UI.

from datetime import datetime
from airflow import DAG 
from airflow.decorators import task
from airflow.models.baseoperator import chain
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
from airflow.providers.amazon.aws.operators.glue import GlueJobOperator
from airflow.providers.amazon.aws.operators.glue_crawler import GlueCrawlerOperator
from airflow.providers.amazon.aws.sensors.glue import GlueJobSensor
from airflow.providers.amazon.aws.sensors.glue_crawler import GlueCrawlerSensor
from airflow.providers.amazon.aws.operators.redshift_data import RedshiftDataOperator
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
from airflow.providers.amazon.aws.transfers.redshift_to_s3 import RedshiftToS3Operator
from airflow.utils.trigger_rule import TriggerRule


dag_id = "data_pipeline"
vYear = datetime.today().strftime("%Y")
vMonth = datetime.today().strftime("%m")
vDay = datetime.today().strftime("%d")
src_bucket_name = "sample-inp-bucket-etl-<username>"
tgt_bucket_name = "sample-opt-bucket-etl-<username>"
s3_folder="products"
#Please replace the variable with the glue_role_arn
glue_role_arn_key = "arn:aws:iam::<account_id>:role/<Glue-role>"
glue_crawler_name = "products"
glue_db_name = "products_db"
glue_job_name = "sample_glue_job"
glue_script_location="s3://aws-glue-assets-<account_id>-<region>/scripts/sample_glue_job.py"
workgroup_name = "sample-workgroup"
redshift_table = "products_f"
redshift_conn_id_name="secrets_redshift_connection"
db_name = "dev"
secret_arn="arn:aws:secretsmanager:us-east-1:<account_id>:secret:airflow/connections/redshift_conn_test-xxxx"
poll_interval = 10

@task
def get_role_name(arn: str) -> str:
    return arn.split("/")[-1]

@task
def get_s3_loc(s3_folder: str) -> str:
    s3_loc  = s3_folder + "/year=" + vYear + "/month=" + vMonth + "/day=" + vDay + "/*.csv"
    return s3_loc

with DAG(
    dag_id=dag_id,
    schedule="@once",
    start_date=datetime(2021, 1, 1),
    tags=["example"],
    catchup=False,
) as dag:
    role_arn = glue_role_arn_key
    glue_role_name = get_role_name(role_arn)
    s3_loc = get_s3_loc(s3_folder)


    # Check for new incremental files in S3 source/input bucket
    sensor_key = S3KeySensor(
        task_id="sensor_key",
        bucket_key=s3_loc,
        bucket_name=src_bucket_name,
        wildcard_match=True,
        #timeout=18*60*60,
        #poke_interval=120,
        timeout=60,
        poke_interval=30,
        mode="reschedule"
    )

    # Run Glue crawler
    glue_crawler_config = {
        "Name": glue_crawler_name,
        "Role": role_arn,
        "DatabaseName": glue_db_name,
    }

    crawl_s3 = GlueCrawlerOperator(
        task_id="crawl_s3",
        config=glue_crawler_config,
    )

    # GlueCrawlerOperator waits by default, setting as False to test the Sensor below.
    crawl_s3.wait_for_completion = False

    # Wait for Glue crawler to complete
    wait_for_crawl = GlueCrawlerSensor(
        task_id="wait_for_crawl",
        crawler_name=glue_crawler_name,
    )

    # Run Glue Job
    submit_glue_job = GlueJobOperator(
        task_id="submit_glue_job",
        job_name=glue_job_name,
        script_location=glue_script_location,
        iam_role_name=glue_role_name,
        create_job_kwargs={"GlueVersion": "4.0", "NumberOfWorkers": 10, "WorkerType": "G.1X"},
    )

    # GlueJobOperator waits by default, setting as False to test the Sensor below.
    submit_glue_job.wait_for_completion = False

    # Wait for Glue Job to complete
    wait_for_job = GlueJobSensor(
        task_id="wait_for_job",
        job_name=glue_job_name,
        # Job ID extracted from previous Glue Job Operator task
        run_id=submit_glue_job.output,
        verbose=True,  # prints glue job logs in airflow logs
    )

    wait_for_job.poke_interval = 5

    # Execute the Stored Procedure in Redshift Serverless using Data Operator
    execute_redshift_stored_proc = RedshiftDataOperator(
        task_id="execute_redshift_stored_proc",
        database=db_name,
        workgroup_name=workgroup_name,
        secret_arn=secret_arn,
        sql="""CALL sp_products();""",
        poll_interval=poll_interval,
        wait_for_completion=True,
    )

    # Execute the Stored Procedure in Redshift Serverless using SQL Operator
    delete_from_table = SQLExecuteQueryOperator(
        task_id="delete_from_table",
        conn_id=redshift_conn_id_name,
        sql="DELETE FROM products;",
        trigger_rule=TriggerRule.ALL_DONE,
    )

    # Unload the data from Redshift table to S3
    transfer_redshift_to_s3 = RedshiftToS3Operator(
        task_id="transfer_redshift_to_s3",
        s3_bucket=tgt_bucket_name,
        s3_key=s3_loc,
        schema="PUBLIC",
        table=redshift_table,
        redshift_conn_id=redshift_conn_id_name,
    )

    transfer_redshift_to_s3.trigger_rule = TriggerRule.ALL_DONE

    #Chain the tasks to be executed
    chain(
        sensor_key,
        crawl_s3,
        wait_for_crawl,
        submit_glue_job,
        wait_for_job,
        execute_redshift_stored_proc,
        delete_from_table,
        transfer_redshift_to_s3
        )
    

Verify the DAG run

After you create the DAG file (replace the variables in the DAG script) and upload it to the s3://sample-airflow-instance/dags folder, it will be automatically synced with the Airflow UI. All DAGs appear on the DAGs tab. Toggle the ON option to make the DAG runnable. Because our DAG is set to schedule="@once", you need to manually run the job by choosing the run icon under Actions. When the DAG is complete, the status is updated in green, as shown in the following screenshot.
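
If you prefer to trigger the run programmatically rather than from the Airflow UI, the following hedged sketch uses the Amazon MWAA CLI token endpoint; the environment name is a placeholder, and the requests library is assumed to be available.

import base64
import boto3
import requests  # assumed to be installed

# Placeholder environment name; the DAG id matches the dag_id in the script.
env_name = "<your-mwaa-environment>"
dag_id = "data_pipeline"

mwaa = boto3.client("mwaa", region_name="us-east-1")
token = mwaa.create_cli_token(Name=env_name)

# Send an Airflow CLI command to the MWAA web server endpoint.
resp = requests.post(
    f"https://{token['WebServerHostname']}/aws_mwaa/cli",
    headers={"Authorization": f"Bearer {token['CliToken']}", "Content-Type": "text/plain"},
    data=f"dags trigger {dag_id}",
)
payload = resp.json()

# The CLI output is returned base64 encoded.
print(base64.b64decode(payload["stdout"]).decode())
print(base64.b64decode(payload["stderr"]).decode())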

In the Links section, there are options to view the code, graph, grid, log, and more. Choose Graph to visualize the DAG in a graph format. As shown in the following screenshot, each color of the node denotes a specific operator, and the color of the node outline denotes a specific status.

Verify the results

On the Amazon Redshift console, navigate to the Query Editor v2 and select the data in the products_f table. The table should be loaded and have the same number of records as the S3 files.
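
You can also check the row count programmatically with the Redshift Data API. The following hedged sketch reuses the workgroup, database, and secret referenced in the DAG; replace the placeholder secret ARN.

import time
import boto3

client = boto3.client("redshift-data", region_name="us-east-1")

# Placeholders: the workgroup, database, and secret match the DAG configuration.
resp = client.execute_statement(
    WorkgroupName="sample-workgroup",
    Database="dev",
    SecretArn="arn:aws:secretsmanager:us-east-1:<account_id>:secret:airflow/connections/redshift_conn_test-xxxx",
    Sql="SELECT COUNT(*) FROM products_f;",
)

# Wait for the statement to finish, then print the count.
status = client.describe_statement(Id=resp["Id"])["Status"]
while status not in ("FINISHED", "FAILED", "ABORTED"):
    time.sleep(1)
    status = client.describe_statement(Id=resp["Id"])["Status"]

if status == "FINISHED":
    result = client.get_statement_result(Id=resp["Id"])
    print("Row count:", result["Records"][0][0]["longValue"])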

On the Amazon S3 console, navigate to the S3 bucket s3://sample-opt-bucket-etl in Account A. The products_f files should be created under the folder structure s3://sample-opt-bucket-etl/products/YYYY/MM/DD/.
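
Similarly, a quick boto3 listing can confirm the unloaded objects. The prefix below assumes the year=/month=/day= layout that the DAG used when it built s3_loc; adjust it if your key structure differs.

from datetime import datetime

import boto3

# Placeholder bucket name; the prefix follows the DAG's s3_loc pattern for today's run.
bucket = "sample-opt-bucket-etl-<username>"
prefix = datetime.today().strftime("products/year=%Y/month=%m/day=%d/")

s3 = boto3.client("s3")
for obj in s3.list_objects_v2(Bucket=bucket, Prefix=prefix).get("Contents", []):
    print(obj["Key"], obj["Size"])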

Clean up

Clean up the resources created as part of this post to avoid incurring ongoing charges:

  1. Delete the CloudFormation stacks and S3 bucket that you created as prerequisites.
  2. Delete the VPCs and VPC peering connections, cross-account policies and roles, and secrets in Secrets Manager.

Conclusion

With Amazon MWAA, you can build complex workflows using Airflow and Python without managing clusters, nodes, or any other operational overhead typically associated with deploying and scaling Airflow in production. In this post, we showed how Amazon MWAA provides an automated way to ingest, transform, analyze, and distribute data between different accounts and services within AWS. For more examples of other AWS operators, refer to the example DAGs in the Apache Airflow Amazon provider GitHub repository; we encourage you to learn more by trying out some of these examples.


About the Authors


Radhika Jakkula is a Big Data Prototyping Solutions Architect at AWS. She helps customers build prototypes using AWS analytics services and purpose-built databases. She is a specialist in assessing a wide range of requirements and applying relevant AWS services, big data tools, and frameworks to create a robust architecture.

Sidhanth Muralidhar is a Principal Technical Account Manager at AWS. He works with large enterprise customers who run their workloads on AWS. He is passionate about working with customers and helping them architect workloads for costs, reliability, performance, and operational excellence at scale in their cloud journey. He has a keen interest in data analytics as well.

How Salesforce optimized their detection and response platform using AWS managed services

Post Syndicated from Atul Khare original https://aws.amazon.com/blogs/big-data/how-salesforce-optimized-their-detection-and-response-platform-using-aws-managed-services/

This is a guest blog post co-authored with Atul Khare and Bhupender Panwar from Salesforce.

Headquartered in San Francisco, Salesforce, Inc. is a cloud-based customer relationship management (CRM) software company building artificial intelligence (AI)-powered business applications that allow businesses to connect with their customers in new and personalized ways.

The Salesforce Trust Intelligence Platform (TIP) log platform team is responsible for data pipeline and data lake infrastructure, providing log ingestion, normalization, persistence, search, and detection capability to ensure Salesforce is safe from threat actors. It runs miscellaneous services to facilitate investigation, mitigation, and containment for security operations. The TIP team is critical to securing Salesforce’s infrastructure, detecting malicious threat activities, and providing timely responses to security events. This is achieved by collecting and inspecting petabytes of security logs across dozens of organizations, some with thousands of accounts.

In this post, we discuss how the Salesforce TIP team optimized their architecture using Amazon Web Services (AWS) managed services to achieve better scalability, cost, and operational efficiency.

TIP existing architecture bird’s eye view and scale of the platform

The main key performance indicator (KPI) for the TIP platform is its capability to ingest a high volume of security logs from a variety of Salesforce internal systems in real time and process them with high velocity. The platform ingests more than 1 PB of data per day, more than 10 million events per second, and more than 200 different log types. The platform ingests log files in JSON, text, and Common Event Format (CEF) formats.

The message bus in TIP’s existing architecture mainly uses Apache Kafka for ingesting different log types coming from the upstream systems. Kafka had a single topic for all the log types before they were consumed by different downstream applications including Splunk, Streaming Search, and Log Normalizer. The Normalized Parquet Logs are stored in an Amazon Simple Storage Service (Amazon S3) data lake and cataloged into Hive Metastore (HMS) on an Amazon Relational Database Service (Amazon RDS) instance based on S3 event notifications. The data lake consumers then use Apache Presto running on an Amazon EMR cluster to perform one-time queries. Other teams including the Data Science and Machine Learning teams use the platform to detect, analyze, and control security threats.

Challenges with the existing TIP log platform architecture

Some of the main challenges that TIP’s existing architecture was facing include:

  • Heavy operational overhead and maintenance cost managing the Kafka cluster
  • High cost to serve (CTS) to meet growing business needs
  • Compute threads limited by partitions’ numbers
  • Difficult to scale out when traffic increases
  • Weekly patching creates lags
  • Challenges with HMS scalability

All these challenges motivated the TIP team to embark on a journey to create a more optimized platform that’s easier to scale with less operational overhead and lower CTS.

New TIP log platform architecture

The Salesforce TIP log platform engineering team, in collaboration with AWS, started building the new architecture to replace the Kafka-based message bus solution with the fully managed AWS messaging and notification solutions Amazon Simple Queue Service (Amazon SQS) and Amazon Simple Notification Service (Amazon SNS). In the new design, the upstream systems send their logs to a central Amazon S3 storage location, which invokes a process to partition the logs and store them in an S3 data lake. Consumer applications such as Splunk get the messages delivered to their system using Amazon SQS. Similarly, Amazon SQS events for the partitioned log data initiate a log normalization process that delivers the normalized log data to open source Delta Lake tables on an S3 data lake. One of the major changes in the new architecture is the use of the AWS Glue Data Catalog to replace the previous Hive Metastore. The one-time analysis applications use Apache Trino on an Amazon EMR cluster to query the Delta Tables cataloged in AWS Glue. Other consumer applications also read the data from S3 data lake files stored in Delta Table format. More details on some of the important processes are as follows:

Log partitioner (Spark structured stream)

This service ingests logs from the central Amazon S3 location (using Amazon SNS and Amazon SQS notifications) and stores them in S3, partitioned by log type, for further downstream consumption through Amazon SNS and Amazon SQS subscriptions. This is the bronze layer of the TIP data lake.
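
To make the pattern concrete, the following is an illustrative Spark Structured Streaming sketch, not Salesforce's implementation: it reads raw JSON logs from an S3 location and writes them to a bronze layer partitioned by log type. The bucket paths, the schema, and the availability of the Delta Lake package are all assumptions.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("log-partitioner").getOrCreate()

# Assumed input location and a simplified schema for the raw JSON logs.
raw_logs = (
    spark.readStream
    .format("json")
    .schema("logType STRING, eventTime STRING, payload STRING")
    .load("s3://central-raw-logs/incoming/")
)

# Write the stream to the bronze layer, partitioned by log type (assumes Delta Lake is on the classpath).
query = (
    raw_logs.writeStream
    .format("delta")
    .option("checkpointLocation", "s3://tip-data-lake/checkpoints/log-partitioner/")
    .partitionBy("logType")
    .start("s3://tip-data-lake/bronze/logs/")
)
query.awaitTermination()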

Log normalizer (Spark structured stream)

One of the downstream consumers of the log partitioner (the Splunk Ingestor is another), the log normalizer ingests the data from the partitioned output on S3, using Amazon SNS and Amazon SQS notifications, and enriches it using Salesforce custom parsers and tags. Finally, the enriched data lands in the data lake on S3. This is the silver layer of the TIP data lake.

Machine learning and other data analytics consumers (Trino, Flink, and Spark Jobs)

These consumers consume from the silver layer of the TIP data lake and run analytics for security detection use cases. The earlier Kafka interface is now converted to Delta streams ingestion, which completes the removal of the Kafka bus from the TIP data pipeline.

Advantages of the new TIP log platform architecture

The main advantages realized by the Salesforce TIP team based on this new architecture using Amazon S3, Amazon SNS, and Amazon SQS include:

  • Cost savings of approximately $400 thousand per month
  • Auto scaling to meet growing business needs
  • Zero DevOps maintenance overhead
  • No mapping of partitions to compute threads
  • Compute resources can be scaled up and down independently
  • Fully managed Data Catalog to reduce the operational overhead of managing HMS

Summary

In this blog post, we discussed how the Salesforce Trust Intelligence Platform (TIP) optimized their data pipeline by replacing the Kafka-based message bus solution with fully managed AWS messaging and notification solutions using Amazon SQS and Amazon SNS. Salesforce and AWS teams worked together to make sure this new platform seamlessly scales to ingest more than 1 PB of data per day, more than 10 million events per second, and more than 200 different log types. Reach out to your AWS account team if you have similar use cases and you need help architecting your platform to achieve operational efficiencies and scale.


About the authors

Atul Khare is a Director of Engineering at Salesforce Security, where he spearheads the Security Log Platform and Data Lakehouse initiatives. He supports diverse security customers by building robust big data ETL pipeline that is elastic, resilient, and easy to use, providing uniform & consistent security datasets for threat detection and response operations, AI, forensic analysis, analytics, and compliance needs across all Salesforce clouds. Beyond his professional endeavors, Atul enjoys performing music with his band to raise funds for local charities.

Bhupender Panwar is a Big Data Architect at Salesforce and seasoned advocate for big data and cloud computing. His background encompasses the development of data-intensive applications and pipelines, solving intricate architectural and scalability challenges, and extracting valuable insights from extensive datasets within the technology industry. Outside of his big data work, Bhupender loves to hike, bike, enjoy travel and is a great foodie.

Avijit Goswami is a Principal Solutions Architect at AWS specialized in data and analytics. He supports AWS strategic customers in building high-performing, secure, and scalable data lake solutions on AWS using AWS managed services and open-source solutions. Outside of his work, Avijit likes to travel, hike in the San Francisco Bay Area trails, watch sports, and listen to music.

Vikas Panghal is the Principal Product Manager leading the product management team for Amazon SNS and Amazon SQS. He has deep expertise in event-driven and messaging applications and brings a wealth of knowledge and experience to his role, shaping the future of messaging services. He is passionate about helping customers build highly scalable, fault-tolerant, and loosely coupled systems. Outside of work, he enjoys spending time with his family outdoors, playing chess, and running.

Amazon OpenSearch Service Under the Hood: OpenSearch Optimized Instances (OR1)

Post Syndicated from Bukhtawar Khan original https://aws.amazon.com/blogs/big-data/amazon-opensearch-service-under-the-hood-opensearch-optimized-instancesor1/

Amazon OpenSearch Service recently introduced the OpenSearch Optimized Instance family (OR1), which delivers up to 30% price-performance improvement over existing memory optimized instances in internal benchmarks, and uses Amazon Simple Storage Service (Amazon S3) to provide 11 9s of durability. With this new instance family, OpenSearch Service uses OpenSearch innovation and AWS technologies to reimagine how data is indexed and stored in the cloud.

Today, customers widely use OpenSearch Service for operational analytics because of its ability to ingest high volumes of data while also providing rich and interactive analytics. In order to provide these benefits, OpenSearch is designed as a high-scale distributed system with multiple independent instances indexing data and processing requests. As your operational analytics data velocity and volume of data grows, bottlenecks may emerge. To sustainably support high indexing volume and provide durability, we built the OR1 instance family.

In this post, we discuss how the reimagined data flow works with OR1 instances and how it can provide high indexing throughput and durability using a new physical replication protocol. We also dive deep into some of the challenges we solved to maintain correctness and data integrity.

Designing for high throughput with 11 9s of durability

OpenSearch Service manages tens of thousands of OpenSearch clusters. We’ve gained insights into typical cluster configurations that customers use to meet high throughput and durability goals. To achieve higher throughput, customers often choose to drop replica copies to save on the replication latency; however, this configuration results in sacrificing availability and durability. Other customers require high durability and as a result need to maintain multiple replica copies, resulting in higher operating costs for them.

The OpenSearch Optimized Instance family provides additional durability while also keeping costs lower by storing a copy of the data on Amazon S3. With OR1 instances, you can configure multiple replica copies for high read availability while maintaining indexing throughput.
The following diagram illustrates an indexing flow involving a metadata update in OR1.

Indexing Request Flow in OR1

During indexing operations, individual documents are indexed into Lucene and also appended to a write-ahead log also known as a translog. Before sending back an acknowledgement to the client, all translog operations are persisted to the remote data store backed by Amazon S3. If any replica copies are configured, the primary copy performs checks to detect the possibility of multiple writers (control flow) on all replica copies for correctness reasons.
The following diagram illustrates the segment generation and replication flow in OR1 instances.

Replication Flow in OR1

Periodically, as new segment files are created, the OR1 instances copy those segments to Amazon S3. When the transfer is complete, the primary publishes new checkpoints to all replica copies, notifying them of a new segment being available for download. The replica copies subsequently download newer segments and make them searchable. This model decouples the data flow that happens using Amazon S3 and the control flow (checkpoint publication and term validation) that happens over inter-node transport communication.

The following diagram illustrates the recovery flow in OR1 instances.

Recovery Flow in OR1

OR1 instances persist not only the data, but the cluster metadata like index mappings, templates, and settings in Amazon S3. This makes sure that in the event of a cluster-manager quorum loss, which is a common failure mode in non-dedicated cluster-manager setups, OpenSearch can reliably recover the last acknowledged metadata.

In the event of an infrastructure failure, an OpenSearch domain can end up losing one or more nodes. In such an event, the new instance family guarantees recovery of both the cluster metadata and the index data up to the latest acknowledged operation. As new replacement nodes join the cluster, the internal cluster recovery mechanism bootstraps the new set of nodes and then recovers the latest cluster metadata from the remote cluster metadata store. After the cluster metadata is recovered, the recovery mechanism starts to hydrate the missing segment data and translog from Amazon S3. Then all uncommitted translog operations, up to the last acknowledged operation, are replayed to reinstate the lost copy.

The new design doesn’t modify the way searches work. Queries are processed normally by either the primary or replica shard for each shard in the index. You may see longer delays (in the 10-second range) before all copies are consistent to a particular point in time because the data replication is using Amazon S3.

A key advantage of this architecture is that it serves as a foundational building block for future innovations, like separation of readers and writers, and helps segregate compute and storage layers.

How redefining the replication strategy boosts the indexing throughput

OpenSearch supports two replication strategies: logical (document) and physical (segment) replication. In the case of logical replication, the data is indexed on all the copies independently, leading to redundant computation on the cluster. The OR1 instances use the new physical replication model, where data is indexed only on the primary copy and additional copies are created by copying data from the primary. With a high number of replica copies, the node hosting the primary copy requires significant network bandwidth, replicating the segment to all the copies. The new OR1 instances solve this problem by durably persisting the segment to Amazon S3, which is configured as a remote storage option. They also help with scaling replicas without bottlenecking on the primary.
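
From the user's side, replicas are still declared through standard index settings; on OR1 instances, only the primary indexes documents, and the replicas receive segments copied through Amazon S3. The following is a minimal sketch using the opensearch-py client; the domain endpoint and credentials are placeholders.

from opensearchpy import OpenSearch

# Placeholder endpoint and credentials for an OR1-backed domain.
client = OpenSearch(
    hosts=[{"host": "<your-or1-domain-endpoint>", "port": 443}],
    http_auth=("<user>", "<password>"),
    use_ssl=True,
)

# Create an index with two replica copies for read availability; indexing happens only
# on the primary, and replicas download the segments copied to Amazon S3.
client.indices.create(
    index="app-logs",
    body={"settings": {"index": {"number_of_shards": 3, "number_of_replicas": 2}}},
)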

After the segments are uploaded to Amazon S3, the primary sends out a checkpoint request, notifying all replicas to download the new segments. The replica copies then need to download the incremental segments. Because this process frees up compute resources on replicas that would otherwise be required to redundantly index data, and removes the network overhead incurred on primaries to replicate data, the cluster is able to deliver more throughput. In the event the replicas aren’t able to process the newly created segments, due to overload or slow network paths, the replicas beyond a point are marked as failed to prevent them from returning stale results.

Why high durability is a good idea, but hard to do well

Although all committed segments are durably persisted to Amazon S3 whenever they get created, one of the key challenges in achieving high durability is synchronously writing all uncommitted operations to a write-ahead log on Amazon S3, before acknowledging the request back to the client, without sacrificing throughput. The new semantics introduce additional network latency for individual requests, but the way we’ve made sure there is no impact to throughput is by batching and draining requests on a single thread for up to a specified interval, while making sure other threads continue to index requests. As a result, you can drive higher throughput with more concurrent client connections by optimally batching your bulk payloads.

Other challenges in designing a highly durable system include enforcing data integrity and correctness at all times. Although some events like network partitions are rare, they can break the correctness of the system and therefore the system needs to be prepared to deal with these failure modes. Therefore, while switching to the new segment replication protocol, we also introduced a few other protocol changes, like detecting multiple writers on each replica. The protocol makes sure that an isolated writer can’t acknowledge a write request, while another newly promoted primary, based on the cluster-manager quorum, is concurrently accepting newer writes.

The new instance family automatically detects the loss of a primary shard while recovering data, and performs extensive checks on network reachability before the data can be re-hydrated from Amazon S3 and the cluster is brought back to a healthy state.

For data integrity, all files are extensively checksummed to make sure we are able to detect and prevent network or file system corruption that may result in data being unreadable. Furthermore, all files including metadata are designed to be immutable, providing additional safety against corruptions and versioned to prevent accidental mutating changes.

Reimagining how data flows

The OR1 instances hydrate copies directly from Amazon S3 in order to perform recovery of lost shards during an infrastructure failure. By using Amazon S3, we are able to free up the primary node’s network bandwidth, disk throughput, and compute, and therefore provide a more seamless in-place scaling and blue/green deployment experience by orchestrating the entire process with minimal primary node coordination.

OpenSearch Service provides automatic data backups called snapshots at hourly intervals, which means in case of accidental modifications to data, you have the option to go back to a previous point in time state. However, with the new OpenSearch instance family, we’ve discussed that the data is already durably persisted on Amazon S3. So how do snapshots work when we already have the data present on Amazon S3?

With the new instance family, snapshots serve as checkpoints, referencing the already present segment data as it exists at a point in time. This makes snapshots more lightweight and faster because they don’t need to re-upload any additional data. Instead, they upload metadata files that capture the view of the segments at that point in time, which we call shallow snapshots. The benefit of shallow snapshots extends to all operations, namely creation, deletion, and cloning of snapshots. You still have the option to snapshot an independent copy with manual snapshots for other administrative operations.

Summary

OpenSearch is an open source, community-driven software. Most of the foundational changes including the replication model, remote-backed storage, and remote cluster metadata have been contributed to open source; in fact, we follow an open source first development model.

Efforts to improve throughput and reliability are a never-ending cycle as we continue to learn and improve. The new OpenSearch optimized instances serve as a foundational building block, paving the way for future innovations. We are excited to continue our efforts in improving reliability and performance and to see what new and existing solutions builders can create using OpenSearch Service. We hope this leads to a deeper understanding of the new OpenSearch instance family, how this offering achieves high durability and better throughput, and how it can help you configure clusters based on the needs of your business.

If you’re excited to contribute to OpenSearch, open up a GitHub issue and let us know your thoughts. We would also love to hear about your success stories achieving high throughput and durability on OpenSearch Service. If you have other questions, please leave a comment.


About the Authors

Bukhtawar Khan is a Principal Engineer working on Amazon OpenSearch Service. He is interested in building distributed and autonomous systems. He is a maintainer and an active contributor to OpenSearch.

Gaurav Bafna is a Senior Software Engineer working on OpenSearch at Amazon Web Services. He is fascinated about solving problems in distributed systems. He is a maintainer and an active contributor to OpenSearch.

Sachin Kale is a senior software development engineer at AWS working on OpenSearch.

Rohin Bhargava is a Sr. Product Manager with the Amazon OpenSearch Service team. His passion at AWS is to help customers find the correct mix of AWS services to achieve success for their business goals.

Ranjith Ramachandra is a Senior Engineering Manager working on Amazon OpenSearch Service. He is passionate about highly scalable distributed systems, high performance and resilient systems.

Uplevel your data architecture with real-time streaming using Amazon Data Firehose and Snowflake

Post Syndicated from Swapna Bandla original https://aws.amazon.com/blogs/big-data/uplevel-your-data-architecture-with-real-time-streaming-using-amazon-data-firehose-and-snowflake/

Today’s fast-paced world demands timely insights and decisions, which is driving the importance of streaming data. Streaming data refers to data that is continuously generated from a variety of sources. The sources of this data, such as clickstream events, change data capture (CDC), application and service logs, and Internet of Things (IoT) data streams are proliferating. Snowflake offers two options to bring streaming data into its platform: Snowpipe and Snowflake Snowpipe Streaming. Snowpipe is suitable for file ingestion (batching) use cases, such as loading large files from Amazon Simple Storage Service (Amazon S3) to Snowflake. Snowpipe Streaming, a newer feature released in March 2023, is suitable for rowset ingestion (streaming) use cases, such as loading a continuous stream of data from Amazon Kinesis Data Streams or Amazon Managed Streaming for Apache Kafka (Amazon MSK).

Before Snowpipe Streaming, AWS customers used Snowpipe for both use cases: file ingestion and rowset ingestion. First, you ingested streaming data to Kinesis Data Streams or Amazon MSK, then used Amazon Data Firehose to aggregate and write streams to Amazon S3, followed by using Snowpipe to load the data into Snowflake. However, this multi-step process can result in delays of up to an hour before data is available for analysis in Snowflake. Moreover, it’s expensive, especially when you have small files that Snowpipe has to upload to the Snowflake customer cluster.

To solve this issue, Amazon Data Firehose now integrates with Snowpipe Streaming, enabling you to capture, transform, and deliver data streams from Kinesis Data Streams, Amazon MSK, and Firehose Direct PUT to Snowflake in seconds at a low cost. With a few clicks on the Amazon Data Firehose console, you can set up a Firehose stream to deliver data to Snowflake. There are no commitments or upfront investments to use Amazon Data Firehose, and you only pay for the amount of data streamed.

Some key features of Amazon Data Firehose include:

  • Fully managed serverless service – You don’t need to manage resources, and Amazon Data Firehose automatically scales to match the throughput of your data source without ongoing administration.
  • Straightforward to use with no code – You don’t need to write applications.
  • Real-time data delivery – You can get data to your destinations quickly and efficiently in seconds.
  • Integration with over 20 AWS services – Seamless integration is available for many AWS services, such as Kinesis Data Streams, Amazon MSK, Amazon VPC Flow Logs, AWS WAF logs, Amazon CloudWatch Logs, Amazon EventBridge, AWS IoT Core, and more.
  • Pay-as-you-go model – You only pay for the data volume that Amazon Data Firehose processes.
  • Connectivity – Amazon Data Firehose can connect to public or private subnets in your VPC.

This post explains how you can bring streaming data from AWS into Snowflake within seconds to perform advanced analytics. We explore common architectures and illustrate how to set up a low-code, serverless, cost-effective solution for low-latency data streaming.

Overview of solution

The following are the steps to implement the solution to stream data from AWS to Snowflake:

  1. Create a Snowflake database, schema, and table.
  2. Create a Kinesis data stream.
  3. Create a Firehose delivery stream with Kinesis Data Streams as the source and Snowflake as its destination using a secure private link.
  4. To test the setup, generate sample stream data from the Amazon Kinesis Data Generator (KDG) with the Firehose delivery stream as the destination.
  5. Query the Snowflake table to validate the data loaded into Snowflake.

The solution is depicted in the following architecture diagram.

Prerequisites

You should have the following prerequisites:

Create a Snowflake database, schema, and table

Complete the following steps to set up your data in Snowflake:

  • Log in to your Snowflake account and create the database:
    create database adf_snf;

  • Create a schema in the new database:
    create schema adf_snf.kds_blog;

  • Create a table in the new schema:
    create or replace table iot_sensors
    (sensorId number,
    sensorType varchar,
    internetIP varchar,
    connectionTime timestamp_ntz,
    currentTemperature number
    );

Create a Kinesis data stream

Complete the following steps to create your data stream (a scripted alternative follows the list):

  • On the Kinesis Data Streams console, choose Data streams in the navigation pane.
  • Choose Create data stream.
  • For Data stream name, enter a name (for example, KDS-Demo-Stream).
  • Leave the remaining settings as default.
  • Choose Create data stream.
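
If you prefer to script this step, the following boto3 sketch creates an equivalent stream. The single provisioned shard and the Region are assumptions for illustration; adjust them (or switch to on-demand capacity) to suit your workload.

import boto3

kinesis = boto3.client("kinesis", region_name="us-east-1")  # Region is an assumption

# Create a provisioned stream with one shard, mirroring the console walkthrough
kinesis.create_stream(StreamName="KDS-Demo-Stream", ShardCount=1)

# Wait until the stream is ACTIVE before sending data to it
kinesis.get_waiter("stream_exists").wait(StreamName="KDS-Demo-Stream")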

Create a Firehose delivery stream

Complete the following steps to create a Firehose delivery stream with Kinesis Data Streams as the source and Snowflake as its destination:

  • On the Amazon Data Firehose console, choose Create Firehose stream.
  • For Source, choose Amazon Kinesis Data Streams.
  • For Destination, choose Snowflake.
  • For Kinesis data stream, browse to the data stream you created earlier.
  • For Firehose stream name, leave the default generated name or enter a name of your preference.
  • Under Connection settings, provide the following information to connect Amazon Data Firehose to Snowflake:
    • For Snowflake account URL, enter your Snowflake account URL.
    • For User, enter the user name generated in the prerequisites.
    • For Private key, enter the private key generated in the prerequisites. Make sure the private key is in PKCS8 format. Do not include the PEM BEGIN header and END footer as part of the private key. If the key is split across multiple lines, remove the line breaks.
    • For Role, select Use custom Snowflake role and enter the IAM role that has access to write to the database table.

You can connect to Snowflake using public or private connectivity. If you don’t provide a VPC endpoint, the default connectivity mode is public. To add the Firehose IP addresses to the allowlist in your Snowflake network policy, refer to Choose Snowflake for Your Destination. If you’re using a private link URL, provide the VPCE ID using SYSTEM$GET_PRIVATELINK_CONFIG:

select SYSTEM$GET_PRIVATELINK_CONFIG();

This function returns a JSON representation of the Snowflake account information necessary to facilitate the self-service configuration of private connectivity to the Snowflake service, as shown in the following screenshot.

  • For this post, we’re using a private link, so for VPCE ID, enter the VPCE ID.
  • Under Database configuration settings, enter your Snowflake database, schema, and table names.
  • In the Backup settings section, for S3 backup bucket, enter the bucket you created as part of the prerequisites.
  • Choose Create Firehose stream.

Alternatively, you can use an AWS CloudFormation template to create the Firehose delivery stream with Snowflake as the destination rather than using the Amazon Data Firehose console.

To use the CloudFormation stack, choose BDB-4100-CFN-Launch-Stack.

Generate sample stream data

Generate sample stream data from the KDG with the Kinesis data stream you created (a boto3 alternative is shown after the template):

{ 
"sensorId": {{random.number(999999999)}}, 
"sensorType": "{{random.arrayElement( ["Thermostat","SmartWaterHeater","HVACTemperatureSensor","WaterPurifier"] )}}", 
"internetIP": "{{internet.ip}}", 
"connectionTime": "{{date.now("YYYY-MM-DDTHH:m:ss")}}", 
"currentTemperature": {{random.number({"min":10,"max":150})}} 
}
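
If you prefer not to use the KDG, a small boto3 script can put similar records directly onto the Kinesis data stream. The field values below are illustrative and mirror the template above; the stream name matches the one created earlier.

import json
import random
import boto3

kinesis = boto3.client("kinesis", region_name="us-east-1")  # Region is an assumption

record = {
    "sensorId": random.randint(0, 999999999),
    "sensorType": random.choice(
        ["Thermostat", "SmartWaterHeater", "HVACTemperatureSensor", "WaterPurifier"]
    ),
    "internetIP": "203.0.113.10",  # illustrative address from the documentation range
    "connectionTime": "2024-01-01T12:00:00",
    "currentTemperature": random.randint(10, 150),
}

# Firehose reads from this stream and delivers the record to Snowflake
kinesis.put_record(
    StreamName="KDS-Demo-Stream",
    Data=json.dumps(record).encode("utf-8"),
    PartitionKey=str(record["sensorId"]),
)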

Query the Snowflake table

Query the Snowflake table:

select * from adf_snf.kds_blog.iot_sensors;

You can confirm that the data generated by the KDG that was sent to Kinesis Data Streams is loaded into the Snowflake table through Amazon Data Firehose.

Troubleshooting

If data is not loaded into Kinesis Data Streams after the KDG sends data to the Firehose delivery stream, refresh and make sure you are logged in to the KDG.

If you made any changes to the Snowflake destination table definition, recreate the Firehose delivery stream.

Clean up

To avoid incurring future charges, delete the resources you created as part of this exercise if you are not planning to use them further.

Conclusion

Amazon Data Firehose provides a straightforward way to deliver data to Snowpipe Streaming, enabling you to save costs and reduce latency to seconds. To try Amazon Data Firehose with Snowflake, refer to the Amazon Data Firehose with Snowflake as destination lab.


About the Authors

Swapna Bandla is a Senior Solutions Architect in the AWS Analytics Specialist SA Team. Swapna has a passion towards understanding customers data and analytics needs and empowering them to develop cloud-based well-architected solutions. Outside of work, she enjoys spending time with her family.

Mostafa Mansour is a Principal Product Manager – Tech at Amazon Web Services where he works on Amazon Kinesis Data Firehose. He specializes in developing intuitive product experiences that solve complex challenges for customers at scale. When he’s not hard at work on Amazon Kinesis Data Firehose, you’ll likely find Mostafa on the squash court, where he loves to take on challengers and perfect his dropshots.

Bosco Albuquerque is a Sr. Partner Solutions Architect at AWS and has over 20 years of experience working with database and analytics products from enterprise database vendors and cloud providers. He has helped technology companies design and implement data analytics solutions and products.

How Aura from Unity revolutionized their big data pipeline with Amazon Redshift Serverless

Post Syndicated from Yonatan Dolan original https://aws.amazon.com/blogs/big-data/how-aura-from-unity-revolutionized-their-big-data-pipeline-with-amazon-redshift-serverless/

This post is co-written with Amir Souchami and Fabian Szenkier from Unity.

Aura from Unity (formerly known as ironSource) is the market standard for creating rich device experiences that engage and retain customers. With a powerful set of solutions, Aura enables complete digital transformation, letting operators promote key services outside the store, directly on-device.

Amazon Redshift is a recommended service for online analytical processing (OLAP) workloads such as cloud data warehouses, data marts, and other analytical data stores. You can use simple SQL to analyze structured and semi-structured data, operational databases, and data lakes to deliver the best price/performance at any scale. The Amazon Redshift data sharing feature provides instant, granular, and high-performance access without data copies and data movement across multiple Redshift data warehouses in the same or different AWS accounts and across AWS Regions. Data sharing provides live access to data so that you always see the most up-to-date and consistent information as it’s updated in the data warehouse.

Amazon Redshift Serverless makes it straightforward to run and scale analytics in seconds without the need to set up and manage data warehouse clusters. Redshift Serverless automatically provisions and intelligently scales data warehouse capacity to deliver fast performance for even the most demanding and unpredictable workloads, and you pay only for what you use. You can load your data and start querying right away in the Amazon Redshift Query Editor or in your favorite business intelligence (BI) tool and continue to enjoy the best price/performance and familiar SQL features in an easy-to-use, zero administration environment.

In this post, we describe Aura’s successful and swift adoption of Redshift Serverless, which allowed them to optimize their overall bidding advertisement campaigns’ time to market from 24 hours to 2 hours. We explore why Aura chose this solution and what technological challenges it helped solve.

Aura’s initial data pipeline

Aura is a pioneer in using Redshift RA3 clusters with data sharing for extract, transform, and load (ETL) and BI workloads. One of Aura’s operations is bidding advertisement campaigns. These campaigns are optimized by using an AI-based bid process that requires running hundreds of analytical queries per campaign. These queries are run on data that resides in an RA3 provisioned Redshift cluster.

The integrated pipeline comprises various AWS services.

The following diagram illustrates this architecture.

Aura architecture

Challenges of the initial architecture

The queries for each campaign run in the following manner:

First, a preparation query filters and aggregates raw data, preparing it for the subsequent operation. This is followed by the main query, which carries out the logic according to the preparation query result set.

As the number of campaigns grew, Aura’s Data team was required to run hundreds of concurrent queries for each of these steps. Aura’s existing provisioned cluster was already heavily utilized with data ingestion, ETL, and BI workloads, so they were looking for cost-effective ways to isolate this workload with dedicated compute resources.

The team evaluated a variety of options, including unloading data to Amazon S3 and a multi-cluster architecture using data sharing and Redshift Serverless. The team gravitated towards the multi-cluster architecture with data sharing, as it requires no query rewrite, allows for dedicated compute for this specific workload, avoids the need to duplicate or move data from the main cluster, and provides high concurrency and automatic scaling. Lastly, it’s billed in a pay-for-what-you-use model, and provisioning is straightforward and quick.

Proof of concept

After evaluating the options, Aura’s Data team decided to conduct a proof of concept using Redshift Serverless as a consumer of their main Redshift provisioned cluster, sharing just the relevant tables for running the required queries. Redshift Serverless measures data warehouse capacity in Redshift Processing Units (RPUs). A single RPU provides 16 GB of memory and a serverless endpoint can range from 8 RPU to 512 RPU.

Aura’s Data team started the proof of concept using a 256 RPU Redshift Serverless endpoint and gradually lowered the RPU to reduce costs while making sure the query runtime was below the required target.

Eventually, the team decided to use a 128 RPU (2 TB RAM) Redshift Serverless endpoint as the base RPU, while using the Redshift Serverless auto scaling feature, which allows hundreds of concurrent queries to run by automatically upscaling the RPU as needed.

Aura’s new solution with Redshift Serverless

After a successful proof of concept, the production setup included adding code to switch between the provisioned Redshift cluster and the Redshift Serverless endpoint. This was done using a configurable threshold based on the number of queries waiting to be processed in a specific MSK topic consumed at the beginning of the pipeline. Small-scale campaign queries would still run on the provisioned cluster, and large-scale queries would use the Redshift Serverless endpoint. The new solution uses an Amazon MWAA pipeline that fetches configuration information from a DynamoDB table, consumes jobs that represent ad campaigns, and then runs hundreds of EKS jobs triggered using EKSPodOperator. Each job runs the two serial queries (the preparation query followed by a main query, which outputs the results to Amazon S3). This happens several hundred times concurrently using Redshift Serverless compute resources.
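
The following is a minimal, hypothetical sketch of the threshold-based routing described above; it is not Aura's actual code. The DynamoDB table name, keys, task IDs, and the way the pending-query count reaches the DAG are all assumptions.

from datetime import datetime

import boto3
from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import BranchPythonOperator

def choose_redshift_endpoint(**context):
    # Threshold kept in a DynamoDB configuration table (table name and keys are assumptions)
    item = boto3.resource("dynamodb").Table("campaign_pipeline_config").get_item(
        Key={"config_key": "serverless_switch_threshold"}
    )["Item"]
    threshold = int(item["value"])

    # Pending campaign queries, e.g. derived from MSK topic lag and passed as a DAG param
    pending_queries = int(context["params"]["pending_queries"])

    return "run_on_serverless" if pending_queries > threshold else "run_on_provisioned"

with DAG(
    dag_id="campaign_query_router",
    start_date=datetime(2024, 1, 1),
    schedule=None,
    params={"pending_queries": 0},
) as dag:
    route = BranchPythonOperator(task_id="route_workload", python_callable=choose_redshift_endpoint)
    run_on_provisioned = EmptyOperator(task_id="run_on_provisioned")
    run_on_serverless = EmptyOperator(task_id="run_on_serverless")

    route >> [run_on_provisioned, run_on_serverless]

In a production pipeline, the two downstream branches would point the campaign queries at the provisioned cluster or the Redshift Serverless endpoint, respectively.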

Then the process initiates another set of EKSPodOperator operators to run the AI training code based on the data result that was saved on Amazon S3.

The following diagram illustrates the solution architecture.

Aura new architecture

Outcome

The overall runtime of the pipeline was reduced from 24 hours to just 2 hours, a 12-times improvement. This integration of Redshift Serverless, coupled with data sharing, led to a 90% reduction in pipeline duration, negating the necessity for data duplication or query rewriting. Moreover, the introduction of a dedicated consumer as an exclusive compute resource significantly eased the load of the producer cluster, enabling running small-scale queries even faster.

“Redshift Serverless and data sharing enabled us to provision and scale our data warehouse capacity to deliver fast performance, high concurrency and handle challenging ML workloads with very minimal effort.”

– Amir Souchami, Aura’s Principal Technical Systems Architect.

Learnings

Aura’s Data team is highly focused on working in a cost-effective manner and has therefore implemented several cost controls in their Redshift Serverless endpoint:

  • Limit the overall spend by setting a maximum RPU-hour usage limit (per day, week, or month) for the workgroup. Aura configured this limit so that, when it is reached, Amazon Redshift sends an alert to the relevant Amazon Redshift administrator team. This feature can also write an entry to a system table or even turn off user queries. A scripted sketch of this control is shown after this list.
  • Use a maximum RPU configuration, which defines the upper limit of compute resources that Redshift Serverless can use at any given time. When the maximum RPU limit is set for the workgroup, Redshift Serverless scales within that limit to continue to run the workload.
  • Implement query monitoring rules that prevent wasteful resource utilization and runaway costs caused by poorly written queries.
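
The first two controls can also be applied programmatically. The following boto3 sketch is illustrative only: the workgroup name, ARN, and limit values are placeholders, and the parameter names should be verified against the Redshift Serverless API version you use.

import boto3

redshift_serverless = boto3.client("redshift-serverless", region_name="us-east-1")  # Region is an assumption

# Cap total usage at 1,000 RPU-hours per day and log when the limit is breached
redshift_serverless.create_usage_limit(
    resourceArn="<workgroup-arn>",
    usageType="serverless-compute",
    amount=1000,
    period="daily",
    breachAction="log",
)

# Cap the compute that the workgroup can scale to at any point in time
redshift_serverless.update_workgroup(
    workgroupName="<workgroup-name>",
    maxCapacity=512,
)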

Conclusion

A data warehouse is a crucial part of any modern data-driven company, enabling you to answer complex business questions and provide insights. The evolution of Amazon Redshift allowed Aura to quickly adapt to business requirements by combining data sharing between provisioned and Redshift Serverless data warehouses. Aura’s journey with Redshift Serverless underscores the vast potential of strategic tech integration in driving efficiency and operational excellence.

If Aura’s journey has sparked your interest and you are considering implementing a similar solution in your organization, here are some strategic steps to consider:

  • Start by thoroughly understanding your organization’s data needs and how such a solution can address them.
  • Reach out to AWS experts, who can provide you with guidance based on their own experiences. Consider engaging in seminars, workshops, or online forums that discuss these technologies. The following resources are recommended for getting started:
  • An important part of this journey would be to implement a proof of concept. Such hands-on experience will provide valuable insights before moving to production.

Elevate your Redshift expertise. Already enjoying the power of Amazon Redshift? Enhance your data journey with the latest features and expert guidance. Reach out to your dedicated AWS account team for personalized support, discover cutting-edge capabilities, and unlock even greater value from your data with Amazon Redshift.


About the Authors

Amir Souchami, Chief Architect of Aura from Unity, focusing on creating resilient and performant cloud systems and mobile apps at major scale.

Fabian Szenkier is the ML and Big Data Architect at Aura by Unity, works on building modern AI/ML solutions and state of the art data engineering pipelines at scale.

Liat Tzur is a Senior Technical Account Manager at Amazon Web Services. She serves as the customer’s advocate and assists her customers in achieving cloud operational excellence in alignment with their business goals.

Adi Jabkowski is a Sr. Redshift Specialist in EMEA, part of the Worldwide Specialist Organization (WWSO) at AWS.

Yonatan Dolan is a Principal Analytics Specialist at Amazon Web Services. He is located in Israel and helps customers harness AWS analytical services to leverage data, gain insights, and derive value.

Automate large-scale data validation using Amazon EMR and Apache Griffin

Post Syndicated from Dipal Mahajan original https://aws.amazon.com/blogs/big-data/automate-large-scale-data-validation-using-amazon-emr-and-apache-griffin/

Many enterprises are migrating their on-premises data stores to the AWS Cloud. During data migration, a key requirement is to validate all the data that has been moved from source to target. This data validation is a critical step, and if not done correctly, may result in the failure of the entire project. However, developing custom solutions to determine migration accuracy by comparing the data between the source and target can often be time-consuming.

In this post, we walk through a step-by-step process to validate large datasets after migration using a configuration-based tool using Amazon EMR and the Apache Griffin open source library. Griffin is an open source data quality solution for big data, which supports both batch and streaming mode.

In today’s data-driven landscape, where organizations deal with petabytes of data, the need for automated data validation frameworks has become increasingly critical. Manual validation processes are not only time-consuming but also prone to errors, especially when dealing with vast volumes of data. Automated data validation frameworks offer a streamlined solution by efficiently comparing large datasets, identifying discrepancies, and ensuring data accuracy at scale. With such frameworks, organizations can save valuable time and resources while maintaining confidence in the integrity of their data, thereby enabling informed decision-making and enhancing overall operational efficiency.

The following are standout features for this framework:

  • Utilizes a configuration-driven framework
  • Offers plug-and-play functionality for seamless integration
  • Conducts count comparison to identify any disparities
  • Implements robust data validation procedures
  • Ensures data quality through systematic checks
  • Provides access to a file containing mismatched records for in-depth analysis
  • Generates comprehensive reports for insights and tracking purposes

Solution overview

This solution uses the following services:

  • Amazon Simple Storage Service (Amazon S3) or Hadoop Distributed File System (HDFS) as the source and target.
  • Amazon EMR to run the PySpark script. We use a Python wrapper on top of Griffin to validate data between Hadoop tables created over HDFS or Amazon S3.
  • AWS Glue to catalog the technical table, which stores the results of the Griffin job.
  • Amazon Athena to query the output table to verify the results.

We use tables that store the count for each source and target table and also create files that show the difference of records between source and target.

The following diagram illustrates the solution architecture.

Architecture_Diagram

In the depicted architecture and our typical data lake use case, our data either resides in Amazon S3 or is migrated from on premises to Amazon S3 using replication tools such as AWS DataSync or AWS Database Migration Service (AWS DMS). Although this solution is designed to seamlessly interact with both Hive Metastore and the AWS Glue Data Catalog, we use the Data Catalog as our example in this post.

This framework operates within Amazon EMR, automatically running scheduled tasks on a daily basis, as per the defined frequency. It generates and publishes reports in Amazon S3, which are then accessible via Athena. A notable feature of this framework is its capability to detect count mismatches and data discrepancies, in addition to generating a file in Amazon S3 containing full records that didn’t match, facilitating further analysis.

In this example, we use three tables in an on-premises database to validate between the source and target: balance_sheet, covid, and survery_financial_report.

Prerequisites

Before getting started, make sure you have the following prerequisites:

Deploy the solution

To make it straightforward for you to get started, we have created a CloudFormation template that automatically configures and deploys the solution for you. Complete the following steps:

  1. Create an S3 bucket in your AWS account called bdb-3070-griffin-datavalidation-blog-${AWS::AccountId}-${AWS::Region} (provide your AWS account ID and AWS Region).
  2. Unzip the following file to your local system.
  3. After unzipping the file to your local system, change <bucket name> to the one you created in your account (bdb-3070-griffin-datavalidation-blog-${AWS::AccountId}-${AWS::Region}) in the following files:
    1. bootstrap-bdb-3070-datavalidation.sh
    2. Validation_Metrics_Athena_tables.hql
    3. datavalidation/totalcount/totalcount_input.txt
    4. datavalidation/accuracy/accuracy_input.txt
  4. Upload all the folders and files in your local folder to your S3 bucket:
    aws s3 cp . s3://<bucket_name>/ --recursive

  5. Run the following CloudFormation template in your account.

The CloudFormation template creates a database called griffin_datavalidation_blog and an AWS Glue crawler called griffin_data_validation_blog on top of the data folder in the .zip file.

  1. Choose Next.
    Cloudformation_template_1
  2. Choose Next again.
  3. On the Review page, select I acknowledge that AWS CloudFormation might create IAM resources with custom names.
  4. Choose Create stack.

You can view the stack outputs on the AWS Management Console or by using the following AWS CLI command:

aws cloudformation describe-stacks --stack-name <stack-name> --region us-east-1 --query Stacks[0].Outputs
  1. Run the AWS Glue crawler and verify that six tables have been created in the Data Catalog.
  2. Run the following CloudFormation template in your account.

This template creates an EMR cluster with a bootstrap script to copy Griffin-related JARs and artifacts. It also runs three EMR steps:

  • Create two Athena tables and two Athena views to see the validation matrix produced by the Griffin framework
  • Run count validation for all three tables to compare the source and target table
  • Run record-level and column-level validations for all three tables to compare between the source and target table
  1. For SubnetID, enter your subnet ID.
  2. Choose Next.
    Cloudformation_template_2
  3. Choose Next again.
  4. On the Review page, select I acknowledge that AWS CloudFormation might create IAM resources with custom names.
  5. Choose Create stack.

You can view the stack outputs on the console or by using the following AWS CLI command:

aws cloudformation describe-stacks --stack-name <stack-name> --region us-east-1 --query Stacks[0].Outputs

It takes approximately 5 minutes for the deployment to complete. When the stack is complete, you should see the EMRCluster resource launched and available in your account.

When the EMR cluster is launched, it runs the following steps as part of the post-cluster launch:

  • Bootstrap action – It installs the Griffin JAR file and directories for this framework. It also downloads sample data files to use in the next step.
  • Athena_Table_Creation – It creates tables in Athena to read the result reports.
  • Count_Validation – It runs the job to compare the data count between source and target data from the Data Catalog table and stores the results in an S3 bucket, which will be read via an Athena table.
  • Accuracy – It runs the job to compare the data rows between the source and target data from the Data Catalog table and stores the results in an S3 bucket, which will be read via the Athena table.

Athena_table

When the EMR steps are complete, your table comparison is done and ready to view in Athena automatically. No manual intervention is needed for validation.

Validate data with Python Griffin

When your EMR cluster is ready and all the jobs are complete, it means the count validation and data validation are complete. The results have been stored in Amazon S3, and the Athena tables are already created on top of them. You can query the Athena tables to view the results, either on the Athena console (as shown in the following screenshots) or programmatically, as in the following sketch.
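
The following boto3 call is illustrative: the database name comes from the CloudFormation stack, while the view name and output location are placeholders to replace with the objects created by the Athena_Table_Creation step in your account.

import boto3

athena = boto3.client("athena", region_name="us-east-1")  # Region is an assumption

query = athena.start_query_execution(
    # <count_validation_view> is a placeholder for the view created by the EMR step
    QueryString="SELECT * FROM <count_validation_view> LIMIT 10",
    QueryExecutionContext={"Database": "griffin_datavalidation_blog"},
    ResultConfiguration={"OutputLocation": "s3://<bucket_name>/athena-results/"},
)
print(query["QueryExecutionId"])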

The following screenshot shows the count results for all tables.

Summary_table

The following screenshot shows the data accuracy results for all tables.

Detailed_view

The following screenshot shows the files created for each table with mismatched records. Individual folders are generated for each table directly from the job.

mismatched_records

Every table folder contains a directory for each day the job is run.

S3_path_mismatched

Within that specific date, a file named __missRecords contains records that do not match.

S3_path_mismatched_2

The following screenshot shows the contents of the __missRecords file.

__missRecords

Clean up

To avoid incurring additional charges, complete the following steps to clean up your resources when you’re done with the solution:

  1. Drop the AWS Glue database griffin_datavalidation_blog with the CASCADE option so that its tables are removed as well.
  2. Delete the prefixes and objects you created from the bucket bdb-3070-griffin-datavalidation-blog-${AWS::AccountId}-${AWS::Region}.
  3. Delete the CloudFormation stack, which removes your additional resources.

Conclusion

This post showed how you can use Python Griffin to accelerate the post-migration data validation process. Python Griffin helps you calculate count and row- and column-level validation, identifying mismatched records without writing any code.

For more information about data quality use cases, refer to Getting started with AWS Glue Data Quality from the AWS Glue Data Catalog and AWS Glue Data Quality.


About the Authors

Dipal Mahajan serves as a Lead Consultant at Amazon Web Services, providing expert guidance to global clients in developing highly secure, scalable, reliable, and cost-efficient cloud applications. With a wealth of experience in software development, architecture, and analytics across diverse sectors such as finance, telecom, retail, and healthcare, he brings invaluable insights to his role. Beyond the professional sphere, Dipal enjoys exploring new destinations, having already visited 14 out of 30 countries on his wish list.

Akhil is a Lead Consultant at AWS Professional Services. He helps customers design & build scalable data analytics solutions and migrate data pipelines and data warehouses to AWS. In his spare time, he loves travelling, playing games and watching movies.

Ramesh Raghupathy is a Senior Data Architect with WWCO ProServe at AWS. He works with AWS customers to architect, deploy, and migrate to data warehouses and data lakes on the AWS Cloud. While not at work, Ramesh enjoys traveling, spending time with family, and yoga.

Amazon DataZone now integrates with AWS Glue Data Quality and external data quality solutions

Post Syndicated from Andrea Filippo La Scola original https://aws.amazon.com/blogs/big-data/amazon-datazone-now-integrates-with-aws-glue-data-quality-and-external-data-quality-solutions/

Today, we are pleased to announce that Amazon DataZone is now able to present data quality information for data assets. This information empowers end-users to make informed decisions as to whether or not to use specific assets.

Many organizations already use AWS Glue Data Quality to define and enforce data quality rules on their data, validate data against predefined rules, track data quality metrics, and monitor data quality over time using artificial intelligence (AI). Other organizations monitor the quality of their data through third-party solutions.

Amazon DataZone now integrates directly with AWS Glue to display data quality scores for AWS Glue Data Catalog assets. Additionally, Amazon DataZone now offers APIs for importing data quality scores from external systems.

In this post, we discuss the latest features of Amazon DataZone for data quality, the integration between Amazon DataZone and AWS Glue Data Quality, and how you can import data quality scores produced by external systems into Amazon DataZone via API.

Challenges

One of the most common questions we get from customers is related to displaying data quality scores in the Amazon DataZone business data catalog to let business users have visibility into the health and reliability of the datasets.

As data becomes increasingly crucial for driving business decisions, Amazon DataZone users are keenly interested in providing the highest standards of data quality. They recognize the importance of accurate, complete, and timely data in enabling informed decision-making and fostering trust in their analytics and reporting processes.

Amazon DataZone data assets can be updated at varying frequencies. As data is refreshed and updated, changes can happen through upstream processes that put it at risk of not maintaining the intended quality. Data quality scores help you understand if data has maintained the expected level of quality for data consumers to use (through analysis or downstream processes).

From a producer’s perspective, data stewards can now set up Amazon DataZone to automatically import the data quality scores from AWS Glue Data Quality (scheduled or on demand) and include this information in the Amazon DataZone catalog to share with business users. Additionally, you can now use new Amazon DataZone APIs to import data quality scores produced by external systems into the data assets.

With the latest enhancement, Amazon DataZone users can now accomplish the following:

  • Access insights about data quality standards directly from the Amazon DataZone web portal
  • View data quality scores on various KPIs, including data completeness, uniqueness, and accuracy
  • Gain a holistic view of the quality and trustworthiness of their data

In the first part of this post, we walk through the integration between AWS Glue Data Quality and Amazon DataZone. We discuss how to visualize data quality scores in Amazon DataZone, enable AWS Glue Data Quality when creating a new Amazon DataZone data source, and enable data quality for an existing data asset.

In the second part of this post, we discuss how you can import data quality scores produced by external systems into Amazon DataZone via API. In this example, we use Amazon EMR Serverless in combination with the open source library Pydeequ to act as an external system for data quality.

Visualize AWS Glue Data Quality scores in Amazon DataZone

You can now visualize AWS Glue Data Quality scores in data assets that have been published in the Amazon DataZone business catalog and that are searchable through the Amazon DataZone web portal.

If the asset has AWS Glue Data Quality enabled, you can now quickly visualize the data quality score directly in the catalog search pane.

By selecting the corresponding asset, you can understand its content through the readme, glossary terms, and technical and business metadata. Additionally, the overall quality score indicator is displayed in the Asset Details section.

A data quality score serves as an overall indicator of a dataset’s quality, calculated based on the rules you define.

On the Data quality tab, you can access the details of data quality overview indicators and the results of the data quality runs.

The indicators shown on the Overview tab are calculated based on the results of the rulesets from the data quality runs.

Each rule is assigned an attribute that contributes to the calculation of the indicator. For example, rules that have the Completeness attribute will contribute to the calculation of the corresponding indicator on the Overview tab.

To filter data quality results, choose the Applicable column dropdown menu and choose your desired filter parameter.

You can also visualize column-level data quality starting on the Schema tab.

When data quality is enabled for the asset, the data quality results become available, providing insightful quality scores that reflect the integrity and reliability of each column within the dataset.

When you choose one of the data quality result links, you’re redirected to the data quality detail page, filtered by the selected column.

Data quality historical results in Amazon DataZone

Data quality can change over time for many reasons:

  • Data formats may change because of changes in the source systems
  • As data accumulates over time, it may become outdated or inconsistent
  • Data quality can be affected by human errors in data entry, data processing, or data manipulation

In Amazon DataZone, you can now track data quality over time to confirm reliability and accuracy. By analyzing the historical report snapshot, you can identify areas for improvement, implement changes, and measure the effectiveness of those changes.

Enable AWS Glue Data Quality when creating a new Amazon DataZone data source

In this section, we walk through the steps to enable AWS Glue Data Quality when creating a new Amazon DataZone data source.

Prerequisites

To follow along, you should have a domain for Amazon DataZone, an Amazon DataZone project, and a new Amazon DataZone environment (with a DataLakeProfile). For instructions, refer to Amazon DataZone quickstart with AWS Glue data.

You also need to define and run a ruleset against your data, which is a set of data quality rules in AWS Glue Data Quality. To set up the data quality rules and for more information on the topic, refer to the following posts:

After you create the data quality rules, make sure that Amazon DataZone has the permissions to access the AWS Glue database managed through AWS Lake Formation. For instructions, see Configure Lake Formation permissions for Amazon DataZone.

In our example, we have configured a ruleset against a table containing patient data within a healthcare synthetic dataset generated using Synthea. Synthea is a synthetic patient generator that creates realistic patient data and associated medical records that can be used for testing healthcare software applications.

The ruleset contains 27 individual rules (one of them failing), so the overall data quality score is 96%.

If you use Amazon DataZone managed policies, there is no action needed because these will get automatically updated with the needed actions. Otherwise, you need to allow Amazon DataZone to have the required permissions to list and get AWS Glue Data Quality results, as shown in the Amazon DataZone user guide.

Create a data source with data quality enabled

In this section, we create a data source and enable data quality. You can also update an existing data source to enable data quality. We use this data source to import metadata information related to our datasets. Amazon DataZone will also import data quality information related to the (one or more) assets contained in the data source.

  1. On the Amazon DataZone console, choose Data sources in the navigation pane.
  2. Choose Create data source.
  3. For Name, enter a name for your data source.
  4. For Data source type, select AWS Glue.
  5. For Environment, choose your environment.
  6. For Database name, enter a name for the database.
  7. For Table selection criteria, choose your criteria.
  8. Choose Next.
  9. For Data quality, select Enable data quality for this data source.

If data quality is enabled, Amazon DataZone will automatically fetch data quality scores from AWS Glue at each data source run.

  1. Choose Next.

Now you can run the data source.

While running the data source, Amazon DataZone imports the last 100 AWS Glue Data Quality run results. This information is now visible on the asset page and will be visible to all Amazon DataZone users after publishing the asset.

Enable data quality for an existing data asset

In this section, we enable data quality for an existing asset. This might be useful for users that already have data sources in place and want to enable the feature afterwards.

Prerequisites

To follow along, you should have already run the data source and produced an AWS Glue table data asset. Additionally, you should have defined a ruleset in AWS Glue Data Quality over the target table in the Data Catalog.

For this example, we ran the data quality job multiple times against the table, producing the related AWS Glue Data Quality scores, as shown in the following screenshot.

Import data quality scores into the data asset

Complete the following steps to import the existing AWS Glue Data Quality scores into the data asset in Amazon DataZone:

  1. Within the Amazon DataZone project, navigate to the Inventory data pane and choose the data source.

If you choose the Data quality tab, you can see that there’s still no information on data quality because AWS Glue Data Quality integration is not enabled for this data asset yet.

  1. On the Data quality tab, choose Enable data quality.
  2. In the Data quality section, select Enable data quality for this data source.
  3. Choose Save.

Now, back on the Inventory data pane, you can see a new tab: Data quality.

On the Data quality tab, you can see data quality scores imported from AWS Glue Data Quality.

Ingest data quality scores from an external source using Amazon DataZone APIs

Many organizations already use systems that calculate data quality by performing tests and assertions on their datasets. Amazon DataZone now supports importing third-party originated data quality scores via API, allowing users that navigate the web portal to view this information.

In this section, we simulate a third-party system pushing data quality scores into Amazon DataZone via APIs through Boto3 (Python SDK for AWS).

For this example, we use the same synthetic dataset as earlier, generated with Synthea.

The following diagram illustrates the solution architecture.

The workflow consists of the following steps:

  1. Read a dataset of patients in Amazon Simple Storage Service (Amazon S3) directly from Amazon EMR using Spark.

The dataset is created as a generic S3 asset collection in Amazon DataZone.

  2. In Amazon EMR, perform data validation rules against the dataset.
  3. The metrics are saved in Amazon S3 to have a persistent output.
  4. Use Amazon DataZone APIs through Boto3 to push custom data quality metadata.
  5. End-users can see the data quality scores by navigating to the data portal.

Prerequisites

We use Amazon EMR Serverless and Pydeequ to run a fully managed Spark environment. To learn more about Pydeequ as a data testing framework, see Testing Data quality at scale with Pydeequ.

To allow Amazon EMR to send data to the Amazon DataZone domain, make sure that the IAM role used by Amazon EMR has the permissions to do the following:

  • Read from and write to the S3 buckets
  • Call the post_time_series_data_points action for Amazon DataZone:
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "Statement1",
                "Effect": "Allow",
                "Action": [
                    "datazone:PostTimeSeriesDataPoints"
                ],
                "Resource": [
                    "<datazone_domain_arn>"
                ]
            }
        ]
    }

Make sure that you added the EMR role as a project member in the Amazon DataZone project. On the Amazon DataZone console, navigate to the Project members page and choose Add members.

Add the EMR role as a contributor.

Ingest and analyze PySpark code

In this section, we analyze the PySpark code that we use to perform data quality checks and send the results to Amazon DataZone. You can download the complete PySpark script.

To run the script entirely, you can submit a job to EMR Serverless. The service will take care of scheduling the job and automatically allocating the resources needed, enabling you to track the job run statuses throughout the process.

You can submit a job to EMR within the Amazon EMR console using EMR Studio or programmatically, using the AWS CLI or using one of the AWS SDKs.
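
As an illustration, the following boto3 sketch submits the script to EMR Serverless. The application ID, role ARN, bucket, and script name are placeholders; the two arguments map to the input and output paths read by the script (sys.argv[1] and sys.argv[2] below).

import boto3

emr_serverless = boto3.client("emr-serverless", region_name="us-east-1")  # Region is an assumption

response = emr_serverless.start_job_run(
    applicationId="<emr-serverless-application-id>",
    executionRoleArn="<job-execution-role-arn>",
    jobDriver={
        "sparkSubmit": {
            # The PySpark script discussed in this section, uploaded to S3 (name is a placeholder)
            "entryPoint": "s3://<bucket_name>/scripts/patients_data_validation.py",
            # Matches sys.argv[1] (input CSV) and sys.argv[2] (metrics output location)
            "entryPointArguments": [
                "s3://<bucket_name>/patients/patients.csv",
                "s3://<bucket_name>/dq-metrics/",
            ],
        }
    },
)
print(response["jobRunId"])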

In Apache Spark, a SparkSession is the entry point for interacting with DataFrames and Spark’s built-in functions. The script starts by initializing a SparkSession:

import sys
import pydeequ
from pyspark.sql import SparkSession

with SparkSession.builder.appName("PatientsDataValidation") \
        .config("spark.jars.packages", pydeequ.deequ_maven_coord) \
        .config("spark.jars.excludes", pydeequ.f2j_maven_coord) \
        .getOrCreate() as spark:

We read a dataset from Amazon S3. For increased modularity, you can use the script input to refer to the S3 path:

s3inputFilepath = sys.argv[1]
s3outputLocation = sys.argv[2]

df = spark.read.format("csv") \
            .option("header", "true") \
            .option("inferSchema", "true") \
            .load(s3inputFilepath) #s3://<bucket_name>/patients/patients.csv

Next, we set up a metrics repository. This can be helpful to persist the run results in Amazon S3. Here we point the repository at the output location passed as the second script argument:

metricsRepository = FileSystemMetricsRepository(spark, s3outputLocation)

Pydeequ allows you to create data quality rules using the builder pattern, which is a well-known software engineering design pattern, chaining instructions to instantiate a VerificationSuite object:

key_tags = {'tag': 'patient_df'}
resultKey = ResultKey(spark, ResultKey.current_milli_time(), key_tags)

check = Check(spark, CheckLevel.Error, "Integrity checks")

checkResult = VerificationSuite(spark) \
    .onData(df) \
    .useRepository(metricsRepository) \
    .addCheck(
        check.hasSize(lambda x: x >= 1000) \
        .isComplete("birthdate")  \
        .isUnique("id")  \
        .isComplete("ssn") \
        .isComplete("first") \
        .isComplete("last") \
        .hasMin("healthcare_coverage", lambda x: x == 1000.0)) \
    .saveOrAppendResult(resultKey) \
    .run()

checkResult_df = VerificationResult.checkResultsAsDataFrame(spark, checkResult)
checkResult_df.show()

The following is the output for the data validation rules:

+----------------+-----------+------------+----------------------------------------------------+-----------------+----------------------------------------------------+
|check           |check_level|check_status|constraint                                          |constraint_status|constraint_message                                  |
+----------------+-----------+------------+----------------------------------------------------+-----------------+----------------------------------------------------+
|Integrity checks|Error      |Error       |SizeConstraint(Size(None))                          |Success          |                                                    |
|Integrity checks|Error      |Error       |CompletenessConstraint(Completeness(birthdate,None))|Success          |                                                    |
|Integrity checks|Error      |Error       |UniquenessConstraint(Uniqueness(List(id),None))     |Success          |                                                    |
|Integrity checks|Error      |Error       |CompletenessConstraint(Completeness(ssn,None))      |Success          |                                                    |
|Integrity checks|Error      |Error       |CompletenessConstraint(Completeness(first,None))    |Success          |                                                    |
|Integrity checks|Error      |Error       |CompletenessConstraint(Completeness(last,None))     |Success          |                                                    |
|Integrity checks|Error      |Error       |MinimumConstraint(Minimum(healthcare_coverage,None))|Failure          |Value: 0.0 does not meet the constraint requirement!|
+----------------+-----------+------------+----------------------------------------------------+-----------------+----------------------------------------------------+

At this point, we want to insert these data quality values in Amazon DataZone. To do so, we use the post_time_series_data_points function in the Boto3 Amazon DataZone client.

The PostTimeSeriesDataPoints DataZone API allows you to insert new time series data points for a given asset or listing, without creating a new revision.

At this point, you might also want to have more information on which fields are sent as input for the API. You can use the APIs to obtain the specification for Amazon DataZone form types; in our case, it’s amazon.datazone.DataQualityResultFormType.

You can also use the AWS CLI to invoke the API and display the form structure:

aws datazone get-form-type --domain-identifier <your_domain_id> --form-type-identifier amazon.datazone.DataQualityResultFormType --region <domain_region> --output text --query 'model.smithy'

This output helps identify the required API parameters, including fields and value limits:

$version: "2.0"
namespace amazon.datazone
structure DataQualityResultFormType {
    @amazon.datazone#timeSeriesSummary
    @range(min: 0, max: 100)
    passingPercentage: Double
    @amazon.datazone#timeSeriesSummary
    evaluationsCount: Integer
    evaluations: EvaluationResults
}
@length(min: 0, max: 2000)
list EvaluationResults {
    member: EvaluationResult
}

@length(min: 0, max: 20)
list ApplicableFields {
    member: String
}

@length(min: 0, max: 20)
list EvaluationTypes {
    member: String
}

enum EvaluationStatus {
    PASS,
    FAIL
}

string EvaluationDetailType

map EvaluationDetails {
    key: EvaluationDetailType
    value: String
}

structure EvaluationResult {
    description: String
    types: EvaluationTypes
    applicableFields: ApplicableFields
    status: EvaluationStatus
    details: EvaluationDetails
}

To send the appropriate form data, we need to convert the Pydeequ output to match the DataQualityResultFormType contract. This can be achieved with a Python function that processes the results.

For each DataFrame row, we extract information from the constraint column. For example, take the following code:

CompletenessConstraint(Completeness(birthdate,None))

We convert it to the following:

{
  "constraint": "CompletenessConstraint",
  "statisticName": "Completeness_custom",
  "column": "birthdate"
}

Make sure to send an output that matches the KPIs that you want to track. In our case, we are appending _custom to the statistic name, resulting in the following format for KPIs:

  • Completeness_custom
  • Uniqueness_custom

In a real-world scenario, you might want to set a value that matches with your data quality framework in relation to the KPIs that you want to track in Amazon DataZone.

After applying a transformation function, we have a Python object for each rule evaluation:

..., {
   'applicableFields': ["healthcare_coverage"],
   'types': ["Minimum_custom"],
   'status': 'FAIL',
   'description': 'MinimumConstraint - Minimum - Value: 0.0 does not meet the constraint requirement!'
 },...

We also use the constraint_status column to compute the overall score:

(number of successful evaluations / total number of evaluations) * 100

In our example, this results in a passing percentage of 85.71%.
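
The following is a minimal sketch of such a transformation, assuming the checkResult_df produced earlier; the helper name is illustrative and the constraint parsing is simplified compared to the complete script.

import re

# Hypothetical helper: turn the Pydeequ results DataFrame into the evaluations
# list and passing percentage expected by the DataQualityResultFormType form
def to_evaluations(check_results_df):
    evaluations = []
    for row in check_results_df.collect():
        # Simplified parsing of strings such as
        # "CompletenessConstraint(Completeness(birthdate,None))"; multi-column
        # constraints (for example, Uniqueness over a list) need extra handling
        match = re.match(r"(\w+)\((\w+)\(([^,)]*)", row["constraint"])
        constraint, statistic, column = match.groups() if match else ("Unknown", "Unknown", "")
        evaluations.append(
            {
                "description": f"{constraint} - {statistic} - {row['constraint_message']}".strip(" -"),
                "applicableFields": [column] if column and column != "None" else [],
                "types": [f"{statistic}_custom"],  # suffix matches the KPI naming above
                "status": "PASS" if row["constraint_status"] == "Success" else "FAIL",
            }
        )
    passed = sum(1 for e in evaluations if e["status"] == "PASS")
    passing_percentage = (passed / len(evaluations)) * 100 if evaluations else 0.0
    return evaluations, passing_percentage

evaluations, passing_percentage = to_evaluations(checkResult_df)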

We set this value in the passingPercentage input field along with the other information related to the evaluations in the input of the Boto3 method post_time_series_data_points:

import json
import boto3

# Instantiate the client library to communicate with Amazon DataZone Service
#
datazone = boto3.client(
    service_name='datazone', 
    region_name=<Region(String) example: us-east-1>
)

# Perform the API operation to push the Data Quality information to Amazon DataZone
#
datazone.post_time_series_data_points(
    domainIdentifier=<DataZone domain ID>,
    entityIdentifier=<DataZone asset ID>,
    entityType='ASSET',
    forms=[
        {
            "content": json.dumps({
                    "evaluationsCount":<Number of evaluations (number)>,
                    "evaluations": [<List of objects {
                        'description': <Description (String)>,
                        'applicableFields': [<List of columns involved (String)>],
                        'types': [<List of KPIs (String)>],
                        'status': <FAIL/PASS (string)>
                        }>
                     ],
                    "passingPercentage":<Score (number)>
                }),
            "formName": <Form name(String) example: PydeequRuleSet1>,
            "typeIdentifier": "amazon.datazone.DataQualityResultFormType",
            "timestamp": <Date (timestamp)>
        }
    ]
)

Boto3 invokes the Amazon DataZone APIs. In these examples, we used Boto3 and Python, but you can use any of the AWS SDKs in the language you prefer.

After setting the appropriate domain and asset ID and running the method, we can check on the Amazon DataZone console that the asset data quality is now visible on the asset page.

We can observe that the overall score matches with the API input value. We can also see that we were able to add customized KPIs on the overview tab through custom types parameter values.

With the new Amazon DataZone APIs, you can load data quality results from third-party systems into a specific data asset. With this capability, Amazon DataZone allows you to extend the types of indicators present in AWS Glue Data Quality (such as completeness, minimum, and uniqueness) with custom indicators.

Clean up

We recommend deleting any potentially unused resources to avoid incurring unexpected costs. For example, you can delete the Amazon DataZone domain and the EMR application you created during this process.

Conclusion

In this post, we highlighted the latest features of Amazon DataZone for data quality, empowering end-users with enhanced context and visibility into their data assets. Furthermore, we delved into the seamless integration between Amazon DataZone and AWS Glue Data Quality. You can also use the Amazon DataZone APIs to integrate with external data quality providers, enabling you to maintain a comprehensive and robust data strategy within your AWS environment.

To learn more about Amazon DataZone, refer to the Amazon DataZone User Guide.


About the Authors


Andrea Filippo is a Partner Solutions Architect at AWS supporting Public Sector partners and customers in Italy. He focuses on modern data architectures and helping customers accelerate their cloud journey with serverless technologies.

Emanuele is a Solutions Architect at AWS, based in Italy, after living and working for more than 5 years in Spain. He enjoys helping large companies with the adoption of cloud technologies, and his area of expertise is mainly focused on Data Analytics and Data Management. Outside of work, he enjoys traveling and collecting action figures.

Varsha Velagapudi is a Senior Technical Product Manager with Amazon DataZone at AWS. She focuses on improving data discovery and curation required for data analytics. She is passionate about simplifying customers’ AI/ML and analytics journey to help them succeed in their day-to-day tasks. Outside of work, she enjoys nature and outdoor activities, reading, and traveling.

Iris – Turning observations into actionable insights for enhanced decision making

Post Syndicated from Grab Tech original https://engineering.grab.com/iris

Introduction

Iris (/ˈaɪrɪs/), a name inspired by the Olympian mythological figure who personified the rainbow and served as the messenger of the gods, is a comprehensive observability platform for Extract, Transform, Load (ETL) jobs. Just as the mythological Iris connected the gods to humanity, our Iris platform bridges the gap between raw data and meaningful insights, serving the needs of data-driven organisations. Specialising in meticulous monitoring and tracking of Spark and Presto jobs, Iris stands as a transformative tool for peak observability and effective decision-making.

  • Iris captures critical job metrics right at the Java Virtual Machine (JVM) level, including but not limited to runtime, CPU and memory utilisation rates, garbage collection statistics, stage and task execution details, and much more.
  • Iris not only regularly records these metrics but also supports real-time monitoring and offline analytics of metrics in the data lake. This gives you multi-faceted control and insights into the operational aspects of your workloads.
  • Iris gives you an overview of your jobs, predicts if your jobs are over or under-provisioned, and provides suggestions on how to optimise resource usage and save costs.

Understanding the needs

When examining ETL job monitoring across various platforms, a common deficiency became apparent. Existing tools could only provide CPU and memory usage data at the instance level, where an instance could refer to an EC2 unit or a Kubernetes pod with resources bound to the container level.

However, this CPU and memory usage data included usage from the operating system and other background tasks, making it difficult to isolate usage specific to Spark jobs (JVM level). A sizeable fraction of resource consumption, thus, could not be attributed directly to our ETL jobs. This lack of granularity posed significant challenges when trying to perform effective resource optimisation for individual jobs.

Gap between total instance and JVM provisioned resources

The situation was further complicated when compute instances were shared among various jobs. In such cases, determining the precise resource consumption for a specific job became nearly impossible. This made in-depth analysis and performance optimisation of specific jobs a complex and often ineffective process.

In the initial stages of my career in Spark, I took the reins of handling SEGP ETL jobs deployed in Chimera. At that time, Chimera did not have any tool for observing and understanding SEGP jobs. The lack of an efficient tool for close-to-real-time visualisation of Spark cluster/job metrics, profiling code class/function runtime durations, and investigating deep-level job metrics to assess CPU and memory usage posed a significant challenge even back then.

In the quest for solutions within Grab, I found no tool that could fulfill all these needs. This prompted me to extend my search beyond the organisation, leading me to discover that Uber had an exceptional tool known as the JVM Profiler. This tool could collect JVM metrics and profile the job. Further research also led me to sparkMeasure, a standalone tool known for its ability to measure Spark metrics on-the-fly without any code changes.

This personal research and journey highlights the importance of a comprehensive, in-depth observability tool – emphasising the need that Iris aims to fulfill in the world of ETL job monitoring. Through this journey, Iris was ideated, named after the Greek deity, encapsulating the mission to bridge the gap between the realm of raw ETL job metrics and the world of actionable insights.

Observability with Iris

Platform architecture

Platform architecture of Iris

Iris’s robust architecture is designed to deliver observability into Spark jobs with high reliability. It consists of three main modules: the Metrics Collector, a Kafka queue, and the Telegraf, InfluxDB, and Grafana (TIG) stack.

Metrics Collector: This module listens to Spark jobs, collects metrics, and funnels them to the Kafka queue. What sets it apart is its unobtrusive nature – there is no need for end-users to update their application code or notebooks.

Kafka Queue: Serving as an asynchronous deliverer of metrics messages, Kafka is leveraged to prevent Iris from becoming another bottleneck slowing down user jobs. By functioning as a message queue, it enables the efficient processing of metric data.

TIG Stack: This component is utilised for real-time monitoring, making visualising performance metrics a cinch. The TIG stack proves to be an effective solution for real-time data visualisation.

For offline analytics, Iris pushes metrics data from Kafka into our data lake. This creates a wealth of historical data that can be utilised for future research, analysis, and predictions. The strategic combination of real-time monitoring and offline analysis forms the basis of Iris’s ability to provide valuable insights.

Next, we will delve into how Iris collects the metrics.

Data collection

Iris’s metrics collection is primarily driven by two tools that operate under the Metrics Collector module: JVM Profiler and sparkMeasure.

JVM Profiler

As mentioned earlier, JVM Profiler is an exceptional tool that helps to collect and profile metrics at JVM level.

Java process for the JVM Profiler tool

Uber JVM Profiler supports the following features:

  • Debug memory usage for all your Spark application executors, including Java heap memory, non-heap memory, native memory (VmRSS, VmHWM), memory pool, and buffer pool (direct/mapped buffer).
  • Debug CPU usage and garbage collection time for all Spark executors.
  • Debug arbitrary Java class methods (how many times they run, how long they take), also called Duration Profiling.
  • Debug arbitrary Java class method calls and trace their argument values, also known as Argument Profiling.
  • Do Stacktrace Profiling and generate a flamegraph to visualise CPU time spent by the Spark application.
  • Debug I/O metrics (disk read/write bytes for the application, CPU iowait for the machine).
  • Debug JVM thread metrics such as total thread count, peak threads, live/active threads, and new threads.
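
For context, the profiler is attached to Spark as a Java agent through the driver and executor extra JVM options. The following is a minimal PySpark sketch; the jar path, broker address, and topic prefix are placeholders, while the reporter class and the brokerList/topicPrefix arguments follow the Uber JVM Profiler documentation:

from pyspark.sql import SparkSession

# Placeholder agent configuration: jar location, Kafka brokers, and topic
# prefix are assumptions for illustration.
profiler_agent = (
    "-javaagent:/opt/profilers/jvm-profiler-1.0.0.jar="
    "reporter=com.uber.profiling.reporters.KafkaOutputReporter,"
    "brokerList=kafka-broker:9092,"
    "topicPrefix=prd-iris-chimera-jvmprofiler-"
)

spark = (
    SparkSession.builder
    .appName("segp-etl-job")
    # Attach the agent to both the driver and every executor JVM
    .config("spark.driver.extraJavaOptions", profiler_agent)
    .config("spark.executor.extraJavaOptions", profiler_agent)
    .getOrCreate()
)

In Iris, this wiring is done at the platform level, so end-users do not need to modify their jobs or notebooks.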

Example metrics (Source code)

{
        "nonHeapMemoryTotalUsed": 11890584.0,
        "bufferPools": [
                {
                        "totalCapacity": 0,
                        "name": "direct",
                        "count": 0,
                        "memoryUsed": 0
                },
                {
                        "totalCapacity": 0,
                        "name": "mapped",
                        "count": 0,
                        "memoryUsed": 0
                }
        ],
        "heapMemoryTotalUsed": 24330736.0,
        "epochMillis": 1515627003374,
        "nonHeapMemoryCommitted": 13565952.0,
        "heapMemoryCommitted": 257425408.0,
        "memoryPools": [
                {
                        "peakUsageMax": 251658240,
                        "usageMax": 251658240,
                        "peakUsageUsed": 1194496,
                        "name": "Code Cache",
                        "peakUsageCommitted": 2555904,
                        "usageUsed": 1173504,
                        "type": "Non-heap memory",
                        "usageCommitted": 2555904
                },
                {
                        "peakUsageMax": -1,
                        "usageMax": -1,
                        "peakUsageUsed": 9622920,
                        "name": "Metaspace",
                        "peakUsageCommitted": 9830400,
                        "usageUsed": 9622920,
                        "type": "Non-heap memory",
                        "usageCommitted": 9830400
                },
                {
                        "peakUsageMax": 1073741824,
                        "usageMax": 1073741824,
                        "peakUsageUsed": 1094160,
                        "name": "Compressed Class Space",
                        "peakUsageCommitted": 1179648,
                        "usageUsed": 1094160,
                        "type": "Non-heap memory",
                        "usageCommitted": 1179648
                },
                {
                        "peakUsageMax": 1409286144,
                        "usageMax": 1409286144,
                        "peakUsageUsed": 24330736,
                        "name": "PS Eden Space",
                        "peakUsageCommitted": 67108864,
                        "usageUsed": 24330736,
                        "type": "Heap memory",
                        "usageCommitted": 67108864
                },
                {
                        "peakUsageMax": 11010048,
                        "usageMax": 11010048,
                        "peakUsageUsed": 0,
                        "name": "PS Survivor Space",
                        "peakUsageCommitted": 11010048,
                        "usageUsed": 0,
                        "type": "Heap memory",
                        "usageCommitted": 11010048
                },
                {
                        "peakUsageMax": 2863661056,
                        "usageMax": 2863661056,
                        "peakUsageUsed": 0,
                        "name": "PS Old Gen",
                        "peakUsageCommitted": 179306496,
                        "usageUsed": 0,
                        "type": "Heap memory",
                        "usageCommitted": 179306496
                }
        ],
        "processCpuLoad": 0.0008024004394748531,
        "systemCpuLoad": 0.23138430784607697,
        "processCpuTime": 496918000,
        "appId": null,
        "name": "24103@machine01",
        "host": "machine01",
        "processUuid": "3c2ec835-749d-45ea-a7ec-e4b9fe17c23a",
        "tag": "mytag",
        "gc": [
                {
                        "collectionTime": 0,
                        "name": "PS Scavenge",
                        "collectionCount": 0
                },
                {
                        "collectionTime": 0,
                        "name": "PS MarkSweep",
                        "collectionCount": 0
                }
        ]
}

A list of all metrics and information corresponding to them can be found here.

sparkMeasure

Complementing the JVM Profiler is sparkMeasure, a standalone tool that was built to robustly capture Spark job-specific metrics.

Architecture of Spark Task Metrics, Listener Bus, and sparkMeasure (Source)

It is registered as a custom listener and operates by collecting the built-in metrics that Spark exchanges between the driver and executor nodes. Its standout feature is the ability to collect all metrics supported by Spark, as defined in Spark’s official documentation here.
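
As an illustration, sparkMeasure also exposes an interactive API (distinct from the listener mode described above, which requires no code changes). A minimal PySpark sketch, assuming the spark-measure package and its jar are available on the cluster:

from pyspark.sql import SparkSession
from sparkmeasure import StageMetrics  # pip install sparkmeasure; also needs the
                                        # ch.cern.sparkmeasure:spark-measure jar (version is a placeholder)

spark = SparkSession.builder.appName("stage-metrics-demo").getOrCreate()

stage_metrics = StageMetrics(spark)
stage_metrics.begin()

# Any Spark workload goes here; this query is just a placeholder
spark.range(0, 2000).selectExpr("id % 8 as k", "id").groupBy("k").count().show()

stage_metrics.end()
# Prints aggregated stage metrics, similar to the report shown below
stage_metrics.print_report()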

Example stage metrics collected by sparkMeasure (Source code)

Scheduling mode = FIFO

Spark Context default degree of parallelism = 8

Aggregated Spark stage metrics:

numStages => 3
numTasks => 17
elapsedTime => 1291 (1 s)
stageDuration => 1058 (1 s)
executorRunTime => 2774 (3 s)
executorCpuTime => 2004 (2 s)
executorDeserializeTime => 2868 (3 s)
executorDeserializeCpuTime => 1051 (1 s)
resultSerializationTime => 5 (5 ms)
jvmGCTime => 88 (88 ms)
shuffleFetchWaitTime => 0 (0 ms)
shuffleWriteTime => 16 (16 ms)
resultSize => 16091 (15.0 KB)
diskBytesSpilled => 0 (0 Bytes)
memoryBytesSpilled => 0 (0 Bytes)
peakExecutionMemory => 0
recordsRead => 2000
bytesRead => 0 (0 Bytes)
recordsWritten => 0
bytesWritten => 0 (0 Bytes)
shuffleRecordsRead => 8
shuffleTotalBlocksFetched => 8
shuffleLocalBlocksFetched => 8
shuffleRemoteBlocksFetched => 0
shuffleTotalBytesRead => 472 (472 Bytes)
shuffleLocalBytesRead => 472 (472 Bytes)
shuffleRemoteBytesRead => 0 (0 Bytes)
shuffleRemoteBytesReadToDisk => 0 (0 Bytes)
shuffleBytesWritten => 472 (472 Bytes)
shuffleRecordsWritten => 8

Stages and their duration:
Stage 0 duration => 593 (0.6 s)
Stage 1 duration => 416 (0.4 s)
Stage 3 duration => 49 (49 ms)

Data organisation

The architecture of Iris is designed to efficiently route metrics to two key destinations:

  • Real-time datasets: InfluxDB
  • Offline datasets: GrabTech Datalake in AWS

Real-time dataset

Freshness/latency: 5 to 10 seconds

All metrics flowing in through Kafka topics are instantly wired into InfluxDB. A crucial part of this process is accomplished by Telegraf, a plugin-driven server agent used for collecting and sending metrics. Acting as a Kafka consumer, Telegraf listens to each Kafka topic according to the metrics profile it corresponds to. It parses the incoming JSON messages and extracts crucial data points (such as role, hostname, and job name). Once the data is processed, Telegraf writes it into InfluxDB.
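
For illustration only, the following Python sketch approximates this consume-parse-write path (Iris uses Telegraf for this; the topic name, tags, and fields below are assumptions based on the JVM Profiler payload shown earlier):

import json
from kafka import KafkaConsumer          # pip install kafka-python
from influxdb import InfluxDBClient      # pip install influxdb

consumer = KafkaConsumer(
    "prd-iris-chimera-jvmprofiler-cpuandmemory",   # assumed topic name
    bootstrap_servers="kafka-broker:9092",
    value_deserializer=lambda raw: json.loads(raw.decode("utf-8")),
)
influx = InfluxDBClient(host="influxdb", port=8086, database="iris")

for message in consumer:
    metric = message.value
    influx.write_points([{
        "measurement": "CpuAndMemory",
        "tags": {
            "host": metric.get("host"),
            "tag": metric.get("tag"),              # for example, the job name
            "processUuid": metric.get("processUuid"),
        },
        "fields": {
            "processCpuLoad": metric.get("processCpuLoad"),
            "heapMemoryTotalUsed": metric.get("heapMemoryTotalUsed"),
        },
        "time": metric.get("epochMillis"),
    }], time_precision="ms")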

InfluxDB organises the stored data in what we call ‘measurements’, which could analogously be considered as tables in traditional relational databases.

In Iris’s context, we have structured our real-time data into the following crucial measurements:

  1. CpuAndMemory: This measures CPU and memory-related metrics, giving us insights into resource utilisation by Spark jobs.
  2. I/O: This records input/output metrics, providing data on the reading and writing operations happening during the execution of jobs.
  3. ThreadInfo: This measurement holds data related to job threading, allowing us to monitor concurrency and synchronisation aspects.
  4. application_started and application_ended: These measurements allow us to track Spark application lifecycles, from initiation to completion.
  5. executors_started and executors_removed: These measurements give us a look at the executor dynamics during Spark application execution.

  6. jobs_started and jobs_ended: These provide vital data points relating to the lifecycle of individual Spark jobs within applications.
  7. queries_started and queries_ended: These measurements are designed to track the lifecycle of individual Spark SQL queries.
  8. stage_metrics, stages_started, and stages_ended: These measurements help monitor individual stages within Spark jobs, a valuable resource for tracking job progress and identifying potential bottlenecks.

The real-time data collected in these measurements form the backbone of the monitoring capabilities of Iris, providing an accurate and current picture of Spark job performances.

Offline dataset

Freshness/latency: 1 hour

In addition to real-time data management with InfluxDB, Iris is also responsible for routing metrics to our offline data storage in the Grab Tech Datalake for long-term trend studies, pattern analysis, and anomaly detection.

The metrics from Kafka are periodically synchronised to the Amazon S3 tables under the iris schema in the Grab Tech AWS catalogue. This valuable historical data is meticulously organised with a one-to-one mapping between the platform or Kafka topic and the table in the iris schema. For example, the iris.chimera_jvmprofiler_cpuandmemory table maps to the prd-iris-chimera-jvmprofiler-cpuandmemory Kafka topic.


This streamlined organisation means you can write queries to retrieve information from the AWS dataset very similarly to how you would from InfluxDB. Whether it’s CPU and memory usage, I/O, thread info, or Spark metrics, you can conveniently fetch historical data for your analysis.
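
For example, a developer could aggregate CPU load per job straight from the data lake with Spark SQL. The table name comes from the mapping above; the column names are assumptions based on the JVM Profiler payload shown earlier:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("iris-offline-analysis").getOrCreate()

# Average and peak process CPU load per job tag over the last 7 days
# (column names assumed from the CpuAndMemory payload above).
cpu_by_job = spark.sql("""
    SELECT tag AS job_name,
           AVG(processCpuLoad) AS avg_cpu_load,
           MAX(processCpuLoad) AS peak_cpu_load
    FROM iris.chimera_jvmprofiler_cpuandmemory
    WHERE to_date(from_unixtime(epochMillis / 1000)) >= date_sub(current_date(), 7)
    GROUP BY tag
    ORDER BY avg_cpu_load DESC
""")
cpu_by_job.show(20, truncate=False)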

Data visualisation

A well-designed visual representation makes it easier to see patterns, trends, and outliers in groups of data. Iris employs different visualisation tools based on whether the data is real-time or historical.

Real-Time data visualisation – Grafana

Iris uses Grafana for showcasing real-time data. For each platform, two primary dashboards have been set up: JVM metrics and Spark metrics.

JVM metrics dashboard: This dashboard is designed to display information related to the JVM.
Spark metrics dashboard: This dashboard primarily focuses on visualising Spark-specific elements.

Offline data visualisation

While real-time visualisation is crucial for immediate awareness and decision-making, visualising historical data provides invaluable insights about long-term trends, patterns, and anomalies. Developers can query the raw or aggregated data from the Iris tables for their specific analyses.

Moreover, to assist platform owners and end-users in obtaining a quick summary of their job data, we provide built-in dashboards with pre-aggregated visuals. These dashboards contain a wealth of information expressed in an easy-to-understand format. Key metrics include:

  • Total instances
  • Total CPU cores
  • Total memory
  • CPU and memory utilisation
  • Total machine runtimes

    Besides visualisations for individual jobs, we have designed an overview dashboard providing a comprehensive summary of all resources consumed by all ETL jobs. This is particularly useful for platform owners and tech leads, allowing them to have all-encompassing visibility of the performance and resource usage across ETL jobs.

    Dashboard for monitoring ETL jobs

    These dashboards’ visuals effectively turn the historical metrics data into clear, comprehensible, and insightful information, guiding users towards objective-driven decision-making.

    Transforming observations into insights

    While our journey with Iris is just in the early stages, we’ve already begun harnessing its ability to transform raw data into concrete insights. The strength of Iris lies not just in its data collection capabilities but also in its potential to analyse and infer patterns from the collated data.

    Currently, we’re experimenting with a job classification model that aims to predict resource allocation efficiency (i.e. identifying jobs as over or under-provisioned). This information, once accurately predicted, can help optimise the usage of resources by fine-tuning the provisions for each job. While this model is still in its early stages of testing and lacks sufficient validation data, it exemplifies the direction we’re heading – integrating advanced analytics with operational observability.

    As we continue to refine Iris and develop more models, our aim is to empower users with deep insights into their Spark applications. These insights can potentially identify bottlenecks, optimise resource allocation and ultimately, enhance overall performance. In the long run, we see Iris evolving from being a data collection tool to a platform that can provide actionable recommendations and enable data-driven decision-making.

    Job classification feature set

    At the core of our job classification model, there are two carefully selected metrics:

    1. CPU cores per hour: This represents the number of tasks a job can handle concurrently in a given hour. A higher number would mean more tasks being processed simultaneously.

    2. Total Terabytes of data input per core: This considers only the input from the underlying HDFS/S3 input, excluding shuffle data. It represents the volume of data one CPU core needs to process. A larger input would mean more CPUs are required to complete the job in a reasonable timeframe.

    The choice of these two metrics for building feature sets is based on a nuanced understanding of Spark job dynamics:

  • Allocating the right CPU cores is crucial as a higher number of cores means more tasks being processed concurrently. This is especially important for jobs with larger input data and more partitioned files, as they often require more concurrent processing capacity, hence, more CPU cores.
  • The total data input helps to estimate the data processing load of a job. A job tasked with processing a high volume of input data but assigned low CPU cores might be under-provisioned and result in an extended runtime.

  • As for CPU and memory utilisation, while it could offer useful insights, we’ve found it may not always contribute to predicting if a job is over or under-provisioned because utilisation can vary run-to-run. Thus, to keep our feature set robust and consistent, we primarily focus on CPU cores per hour and total terabytes of input data.
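
    A minimal sketch of how these two features could be derived from per-job aggregates follows. The column names and the exact formulas here are assumptions for illustration, not Iris's internal definitions:

    import pandas as pd

    # Assumed per-job aggregates pulled from the Iris offline tables
    jobs = pd.DataFrame({
        "job_name":        ["job_a", "job_b"],
        "total_cpu_cores": [64, 16],            # cores allocated to the job
        "runtime_seconds": [1800, 14400],
        "input_bytes":     [2.5e12, 0.2e12],    # HDFS/S3 input only, no shuffle
    })

    runtime_hours = jobs["runtime_seconds"] / 3600.0

    # Feature 1: CPU cores per hour of runtime (one plausible reading of the metric)
    jobs["cpu_cores_per_hour"] = jobs["total_cpu_cores"] / runtime_hours

    # Feature 2: terabytes of input data per CPU core
    jobs["tb_input_per_core"] = (jobs["input_bytes"] / 1e12) / jobs["total_cpu_cores"]

    print(jobs[["job_name", "cpu_cores_per_hour", "tb_input_per_core"]])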

    With these metrics as our foundation, we are developing models that can classify jobs into over-provisioned or under-provisioned, helping us optimise resource allocation and improve job performance in the long run.

    As always, treat any information related to our job classification feature set and the insights derived from it with utmost care for data confidentiality and integrity.

    We’d like to reiterate that these models are still in the early stages of testing and we are constantly working to enhance their predictive accuracy. The true value of this model will be unlocked as it is refined and as we gather more validation data.

    Model training and optimisation

    Choosing the right model is crucial for deriving meaningful insights from datasets. We decided to start with a simple, yet powerful algorithm – K-means clustering, for job classification. K-means is a type of unsupervised machine learning algorithm used to classify items into groups (or clusters) based on their features.

    Here is our process:

    1. Model exploration: We began by exploring the K-means algorithm using a small dataset for validation.
    2. Platform-specific cluster numbers: To account for the uniqueness of every platform, we ran a Score Test (an evaluation method to determine the optimal number of clusters) for each platform. The derived optimal number of clusters is then used in the monthly job for that respective platform’s data.
    3. Set up a scheduled job: After ensuring the code was functioning correctly, we set up a job to run the model on a monthly schedule. Monthly re-training was chosen to encapsulate possible changes in the data patterns over time.
    4. Model saving and utilisation: The trained model is saved to our S3 bucket and used to classify jobs as over-provisioned or under-provisioned based on the daily job runs.

    This iterative learning approach, through which our model learns from an ever-increasing pool of historical data, helps maintain its relevance and improve its accuracy over time.
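
    As a minimal sketch of steps 1 to 4, using the silhouette score as one possible "Score Test" and a synthetic feature matrix in place of the real Iris data (the actual pipeline may differ):

    import joblib
    import numpy as np
    from sklearn.cluster import KMeans
    from sklearn.metrics import silhouette_score
    from sklearn.preprocessing import StandardScaler

    # Placeholder feature matrix: one row per job with
    # [cpu_cores_per_hour, tb_input_per_core]; in practice this comes from
    # the Iris offline tables described earlier.
    rng = np.random.default_rng(42)
    X_raw = rng.lognormal(mean=1.0, sigma=0.8, size=(500, 2))
    X = StandardScaler().fit_transform(X_raw)

    # "Score Test": choose the cluster count with the best silhouette score
    scores = {
        k: silhouette_score(X, KMeans(n_clusters=k, n_init=10, random_state=42).fit_predict(X))
        for k in range(2, 8)
    }
    best_k = max(scores, key=scores.get)

    # Monthly training: fit and persist the model (uploaded to S3 in our pipeline)
    model = KMeans(n_clusters=best_k, n_init=10, random_state=42).fit(X)
    joblib.dump(model, "iris_job_classifier.joblib")

    # Daily runs: assign each job to a cluster (over-/under-provisioned groups)
    cluster_labels = model.predict(X)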

    Here is an example output from Databricks train run:

  • Blue green group: Input per core is too large but the CPU per hour is small, so the job may take a lot of time to complete.
  • Purple group: Input per core is too small but the CPU per hour is too high. There may be a lot of wasted CPU here.
  • Yellow group: This is likely the ideal group, where neither input per core nor CPU per hour is high.

    Keep in mind that the classification insights provided by our K-means model are still in the experimental stage. As we continue to refine the approach, the reliability of these insights is expected to grow, providing increasingly valuable direction for resource allocation optimisation.

    Seeing Iris in action

    This section provides practical examples and real-case scenarios that demonstrate Iris’s capacity for delivering insights from ETL job observations.

    Case study 1: Spark benchmarking

    From August to September 2023, we carried out a Spark benchmarking exercise to measure and compare the cost and performance of Grab’s Spark platforms: Open Source Spark on Kubernetes (Chimera), Databricks, and AWS EMR. Since each platform has its own way to measure a job’s performance and cost, Iris was used to collect the necessary Spark metrics in order to calculate the cost for each job. Furthermore, Iris collected many other metrics, such as CPU and memory utilisation and runtime, to compare the platforms’ performance.

    Case study 2: Improving Databricks Infra Cost Unit (DBIU) Accuracy with Iris

    Being able to accurately calculate and fairly distribute Databricks infrastructure costs has always been a challenge, primarily due to difficulties in distinguishing between on-demand and Spot instance usage. This was further complicated by two conditions:

    • Fallback to on-demand instances: Databricks has a feature that automatically falls back to on-demand instances when Spot instances are not readily available. While beneficial for job execution, this feature has traditionally made it difficult to accurately track per-job Spot vs. on-demand usage.
    • User configurable hybrid policy: Users can specify a mix of on-demand and Spot instances for their jobs. This flexible, hybrid approach often results in complex, non-uniform usage patterns, further complicating cost categorisation.

    Iris has made a key difference in resolving these dilemmas. By providing granular, instance-level metrics including whether each instance is on-demand or Spot, Iris has greatly improved our visibility into per-job instance usage.

    This precise data enables us to isolate the on-demand instance usage, which was previously bundled in the total cost calculation. Similarly, it allows us to accurately gauge and consider the usage ratio of on-demand instances in hybrid policy scenarios.

    The enhanced transparency provided by Iris metrics allows us to standardise DBIU cost calculations, making them fairer for users who mainly or exclusively use Spot instances. In other words, users pay more if they intentionally choose, or fall back to, on-demand instances for their jobs.

    The practical application of Iris in enhancing DBIU accuracy illustrates its potential in driving data-informed decisions and fostering fairness in resource usage and cost distribution.

    Case study 3: Optimising job configuration for better performance and cost efficiency

    One of the key utilities of Iris is its potential to assist with job optimisation. For instance, we have been able to pinpoint jobs that were consistently over-provisioned and work with end-users to tune their job configurations.

    Through this exercise and continuous monitoring, we’ve seen substantial results from the job optimisations:

  • Cost reductions ranging from 20% to 50% for most jobs.
  • Positive feedback from users about improvements in job performance and cost efficiency.

    Interestingly, our analysis also led us to identify the following recurring patterns, which could be leveraged to widen the impact of our optimisation efforts across multiple use cases on our platforms:

    Pattern: Job duration < 20 minutes, input per core < 1 GB, and the total number of instances used is 2x/3x the maximum worker nodes.
    Recommendation: Use a fixed number of worker nodes, potentially speeding up performance and certainly reducing costs.

    Pattern: CPU utilisation < 25%.
    Recommendation: Cut the maximum workers in half (for example, from 10 to 5 workers) and downgrade the instance size by half (for example, 4xlarge -> 2xlarge).

    Pattern: The job shuffles a lot of data.
    Recommendation: Bump up the instance size and reduce the number of workers (for example, 2xlarge -> 4xlarge and 100 -> 50 workers).

    However, we acknowledge that these findings may not apply uniformly to every instance. The optimisation recommendations derived from these patterns might not yield the desired outcomes in all cases.

    The future of Iris

    Building upon its firm foundation as a robust Spark observability tool, we envision a future for Iris wherein it not only monitors metrics but provides actionable insights, discerns usage patterns, and drives predictions.

    Our plans to make Iris more accessible include developing API endpoints for platform teams to query performance by job name. Another addition we’re aiming for is the ability for Iris to provide resource tuning recommendations. By making platform-specific and job-specific recommendations easily accessible, we hope to assist platform teams in making informed, data-driven decisions on resource allocation and cost efficiency.

    We’re also looking to expand Iris’s capabilities with the development of a listener for Presto jobs, similar to the sparkMeasure tool currently used for Spark jobs. The listener would provide valuable metrics and insights into the performance of Presto jobs, opening up new avenues for optimisation and cost management.

    Another major focus will be building a feedback loop for Iris to further enhance accuracy, continually refine its models, and improve insights provided. This effort would greatly benefit from the close collaboration and inputs from platform teams and other tech leads, as their expertise aids in interpreting Iris’s metrics and predictions and validating its meaningfulness.

    In conclusion, as Iris continues to develop and mature, we foresee it evolving into a crucial tool for data-driven decision-making and proactive management of Spark applications, playing a significant role in the efficient usage of cloud computing resources.

    Conclusion

    The role of Iris as an observability tool for Spark jobs in the world of Big Data is rapidly evolving. Iris has proven to be more than a simple data collection tool; it is a platform that integrates advanced analytics with operational observability.

    Even though Iris is in its early stages, it’s already been instrumental in creating detailed visualisations of both real-time and historical data from varied platforms. Besides that, Iris has started making strides in its journey towards using machine learning models like K-means clustering to classify jobs, demonstrating its potential in helping operators fine-tune resource allocation.

    Using instance-level metrics, Iris is helping improve cost distribution fairness and accuracy, making it a potent tool for resource optimisation. Furthermore, the successful case study of reducing job costs and enhancing performance through resource reallocation provides a promising outlook into Iris’s future applicability.

    With ongoing development plans, such as the Presto listener and the creation of endpoints for broader accessibility, Iris is poised to become an integral tool for data-informed decision-making. As we strive to enhance Iris, we will continue to collaborate with platform teams and tech leads whose feedback is invaluable in fulfilling Iris’s potential.

    Our journey with Iris is a testament to Grab’s commitment to creating a data-informed and efficient cloud computing environment. Iris, with its observed and planned capabilities, is on its way to revolutionising the way resource allocation is managed and optimised.

    Join us

    Grab is the leading superapp platform in Southeast Asia, providing everyday services that matter to consumers. More than just a ride-hailing and food delivery app, Grab offers a wide range of on-demand services in the region, including mobility, food, package and grocery delivery services, mobile payments, and financial services across 428 cities in eight countries.

    Powered by technology and driven by heart, our mission is to drive Southeast Asia forward by creating economic empowerment for everyone. If this mission speaks to you, join our team today!

    AI recommendations for descriptions in Amazon DataZone for enhanced business data cataloging and discovery is now generally available

    Post Syndicated from Varsha Velagapudi original https://aws.amazon.com/blogs/big-data/ai-recommendations-for-descriptions-in-amazon-datazone-for-enhanced-business-data-cataloging-and-discovery-is-now-generally-available/

    In March 2024, we announced the general availability of generative artificial intelligence (AI) generated data descriptions in Amazon DataZone. In this post, we share what we heard from our customers that led us to add AI-generated data descriptions, and discuss specific customer use cases addressed by this capability. We also detail how the feature works and what criteria were applied for model and prompt selection while building on Amazon Bedrock.

    Amazon DataZone enables you to discover, access, share, and govern data at scale across organizational boundaries, reducing the undifferentiated heavy lifting of making data and analytics tools accessible to everyone in the organization. With Amazon DataZone, data users like data engineers, data scientists, and data analysts can share and access data across AWS accounts using a unified data portal, allowing them to discover, use, and collaborate on this data across their teams and organizations. Additionally, data owners and data stewards can make data discovery simpler by adding business context to data while balancing access governance to the data in the user interface.

    What we hear from customers

    Organizations are adopting enterprise-wide data discovery and governance solutions like Amazon DataZone to unlock the value from petabytes, and even exabytes, of data spread across multiple departments, services, on-premises databases, and third-party sources (such as partner solutions and public datasets). Data consumers need detailed descriptions of the business context of a data asset and documentation about its recommended use cases to quickly identify the relevant data for their intended use case. Without the right metadata and documentation, data consumers overlook valuable datasets relevant to their use case or spend more time going back and forth with data producers to understand the data and its relevance for their use case—or worse, misuse the data for a purpose it was not intended for. For instance, a dataset designated for testing might mistakenly be used for financial forecasting, resulting in poor predictions. Data producers find it tedious and time consuming to maintain extensive and up-to-date documentation on their data and respond to continued questions from data consumers. As data proliferates across the data mesh, these challenges only intensify, often resulting in under-utilization of their data.

    Introducing generative AI-powered data descriptions

    With AI-generated descriptions in Amazon DataZone, data consumers have these recommended descriptions to identify data tables and columns for analysis, which enhances data discoverability and cuts down on back-and-forth communications with data producers. Data consumers have more contextualized data at their fingertips to inform their analysis. The automatically generated descriptions enable a richer search experience for data consumers because search results are now also based on detailed descriptions, possible use cases, and key columns. This feature also elevates data discovery and interpretation by providing recommendations on analytical applications for a dataset giving customers additional confidence in their analysis. Because data producers can generate contextual descriptions of data, its schema, and data insights with a single click, they are incentivized to make more data available to data consumers. With the addition of automatically generated descriptions, Amazon DataZone helps organizations interpret their extensive and distributed data repositories.

    The following is an example of the asset summary and use cases detailed description.

    Use cases served by generative AI-powered data descriptions

    The automatically generated descriptions capability in Amazon DataZone streamlines the creation of relevant descriptions, provides usage recommendations, and ultimately enhances the overall efficiency of data-driven decision-making. It saves organizations time on catalog curation and speeds up the discovery of relevant use cases for the data. It offers the following benefits:

    • Aid search and discovery of valuable datasets – With the clarity provided by automatically generated descriptions, data consumers are less likely to overlook critical datasets through enhanced search and faster understanding, so every valuable insight from the data is recognized and utilized.
    • Guide data application – Misapplying data can lead to incorrect analyses, missed opportunities, or skewed results. Automatically generated descriptions offer AI-driven recommendations on how best to use datasets, helping customers apply them in contexts where they are appropriate and effective.
    • Increase efficiency in data documentation and discovery – Automatically generated descriptions streamline the traditionally tedious and manual process of data cataloging. This reduces the need for time-consuming manual documentation, making data more easily discoverable and comprehensible.

    Solution overview

    The AI recommendations feature in Amazon DataZone was built on Amazon Bedrock, a fully managed service that offers a choice of high-performing foundation models. To generate high-quality descriptions and impactful use cases, we use the available metadata on the asset such as the table name, column names, and optional metadata provided by the data producers. The recommendations don’t use any data that resides in the tables unless explicitly provided by the user as content in the metadata.

    To get the customized generations, we first infer the domain corresponding to the table (such as automotive industry, finance, or healthcare), which then guides the rest of the workflow towards generating customized descriptions and use cases. The generated table description contains information about how the columns are related to each other, as well as the overall meaning of the table, in the context of the identified industry segment. The table description also contains a narrative style description of the most important constituent columns. The use cases provided are also tailored to the domain identified, which are suitable not just for expert practitioners from the specific domain, but also for generalists.

    The generated descriptions are composed from LLM-produced outputs for the table description, column descriptions, and use cases, generated in a sequential order. For instance, the column descriptions are generated first by jointly passing the table name, schema (list of column names and their data types), and other available optional metadata. The obtained column descriptions are then used in conjunction with the table schema and metadata to obtain table descriptions, and so on. This follows a consistent order, similar to how a human would approach understanding a table.

    The following diagram illustrates this workflow.
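
    The following Python sketch is purely illustrative of that sequential ordering; it is not the Amazon DataZone implementation. It assumes a foundation model reachable through the Amazon Bedrock Converse API, and the model ID, prompts, and helper function are placeholders:

    import boto3

    bedrock = boto3.client("bedrock-runtime")
    MODEL_ID = "anthropic.claude-3-haiku-20240307-v1:0"  # placeholder model choice


    def generate(prompt: str) -> str:
        """Single-turn call to a Bedrock model via the Converse API."""
        response = bedrock.converse(
            modelId=MODEL_ID,
            messages=[{"role": "user", "content": [{"text": prompt}]}],
        )
        return response["output"]["message"]["content"][0]["text"]


    table_name = "customer_orders"  # placeholder table and schema
    schema = "order_id bigint, customer_id bigint, order_total decimal, order_date date"

    # 1. Column descriptions first, from the table name and schema
    column_descriptions = generate(
        f"Describe each column of table {table_name} with schema: {schema}"
    )

    # 2. Table description next, reusing the column descriptions as context
    table_description = generate(
        f"Summarize table {table_name} given these column descriptions:\n{column_descriptions}"
    )

    # 3. Use cases last, grounded in the table description
    use_cases = generate(
        f"Suggest analytical use cases for this table:\n{table_description}"
    )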

    Evaluating and selecting the foundation model and prompts

    Amazon DataZone manages the selection of the model(s) for recommendation generation. The model(s) used can be updated or changed from time to time. Selecting the appropriate models and prompting strategies is a critical step in confirming the quality of the generated content, while also achieving low costs and low latencies. To realize this, we evaluated our workflow using multiple criteria on datasets spanning more than 20 different industry domains before finalizing a model. Our evaluation mechanisms can be summarized as follows:

    • Tracking automated metrics for quality assessment – We tracked a combination of more than 10 supervised and unsupervised metrics to evaluate essential quality factors such as informativeness, conciseness, reliability, semantic coverage, coherence, and cohesiveness. This allowed us to capture and quantify the nuanced attributes of generated content, confirming that it meets our high standards for clarity and relevance.
    • Detecting inconsistencies and hallucinations – Next, we addressed the challenge of content reliability generated by LLMs through our self-consistency-based hallucination detection. This identifies any potential non-factuality in the generated content, and also serves as a proxy for confidence scores, as an additional layer of quality assurance.
    • Using large language models as judges – Lastly, our evaluation process incorporates a method of judgment: using multiple state-of-the-art large language models (LLMs) as evaluators. By using bias-mitigation techniques and aggregating the scores from these advanced models, we can obtain a well-rounded assessment of the content’s quality.

    The approach of using LLMs as a judge, hallucination detection, and automated metrics brings diverse perspectives into our evaluation, as a proxy for expert human evaluations.

    Getting started with generative AI-powered data descriptions

    To get started, log in to the Amazon DataZone data portal. Go to your asset in your data project and choose Generate summary to obtain the detailed description of the asset and its columns. Amazon DataZone uses the available metadata on the asset to generate the descriptions. You can optionally provide additional context as metadata in the readme section or metadata form content on the asset for more customized descriptions. For detailed instructions, refer to New generative AI capabilities for Amazon DataZone further simplify data cataloging and discovery (preview). For API instructions, see Using machine learning and generative AI.

    Amazon DataZone AI recommendations for descriptions is generally available in Amazon DataZone domains provisioned in the following AWS Regions: US East (N. Virginia), US West (Oregon), Asia Pacific (Tokyo), and Europe (Frankfurt).

    For pricing, you will be charged for input and output tokens for generating column descriptions, asset descriptions, and analytical use cases in AI recommendations for descriptions. For more details, see Amazon DataZone Pricing.

    Conclusion

    In this post, we discussed the challenges and key use cases for the new AI recommendations for descriptions feature in Amazon DataZone. We detailed how the feature works and how the model and prompt selection were done to provide the most useful recommendations.

    If you have any feedback or questions, leave them in the comments section.


    About the Authors

    Varsha Velagapudi is a Senior Technical Product Manager with Amazon DataZone at AWS. She focuses on improving data discovery and curation required for data analytics. She is passionate about simplifying customers’ AI/ML and analytics journey to help them succeed in their day-to-day tasks. Outside of work, she enjoys playing with her 3-year old, reading, and traveling.

    Zhengyuan Shen is an Applied Scientist at Amazon AWS, specializing in advancements in AI, particularly in large language models and their application in data comprehension. He is passionate about leveraging innovative ML scientific solutions to enhance products or services, thereby simplifying the lives of customers through a seamless blend of science and engineering. Outside of work, he enjoys cooking, weightlifting, and playing poker.

    Balasubramaniam Srinivasan is an Applied Scientist at Amazon AWS, working on foundational models for structured data and natural sciences. He enjoys enriching ML models with domain-specific knowledge and inductive biases to delight customers. Outside of work, he enjoys playing and watching tennis and soccer.

    Deliver decompressed Amazon CloudWatch Logs to Amazon S3 and Splunk using Amazon Data Firehose

    Post Syndicated from Ranjit Kalidasan original https://aws.amazon.com/blogs/big-data/deliver-decompressed-amazon-cloudwatch-logs-to-amazon-s3-and-splunk-using-amazon-data-firehose/

    You can use Amazon Data Firehose to aggregate and deliver log events from your applications and services captured in Amazon CloudWatch Logs to your Amazon Simple Storage Service (Amazon S3) bucket and Splunk destinations, for use cases such as data analytics, security analysis, and application troubleshooting. By default, CloudWatch Logs are delivered as gzip-compressed objects. You might want the data to be decompressed, or want logs to be delivered to Splunk, which requires decompressed data input, for application monitoring and auditing.

    AWS released a feature to support decompression of CloudWatch Logs in Firehose. With this new feature, you can specify an option in Firehose to decompress CloudWatch Logs. You no longer have to perform additional processing using AWS Lambda or post-processing to get decompressed logs, and can deliver decompressed data to Splunk. Additionally, you can use optional Firehose features such as record format conversion to convert CloudWatch Logs to Parquet or ORC, and dynamic partitioning to automatically group streaming records based on keys in the data (for example, by month) and deliver the grouped records to corresponding Amazon S3 prefixes.

    In this post, we look at how to enable the decompression feature for Splunk and Amazon S3 destinations. We start with Splunk and then Amazon S3 for new streams, then we address migration steps to take advantage of this feature and simplify your existing pipeline.

    Decompress CloudWatch Logs for Splunk

    You can use a subscription filter in CloudWatch log groups to ingest data directly into Firehose or through Amazon Kinesis Data Streams.
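
    For example, the following boto3 sketch attaches a subscription filter that sends a log group's events directly to a Firehose stream. The log group name, stream ARN, and role ARN are placeholders; the IAM role must allow CloudWatch Logs to put records into Firehose:

    import boto3

    logs = boto3.client("logs")

    logs.put_subscription_filter(
        logGroupName="/aws/cloudtrail/management-events",      # placeholder log group
        filterName="firehose-splunk-delivery",
        filterPattern="",                                      # empty pattern forwards all events
        destinationArn="arn:aws:firehose:us-east-1:111122223333:deliverystream/cw-logs-to-splunk",
        roleArn="arn:aws:iam::111122223333:role/CWLtoFirehoseRole",
    )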

    Note: For the CloudWatch Logs decompression feature, you need an HTTP Event Collector (HEC) data input created in Splunk, with indexer acknowledgement enabled and a source type configured. This is required to map the decompressed logs to the right source type. When creating the HEC input, include the source type mapping (for example, aws:cloudtrail).

    To create a Firehose delivery stream for the decompression feature, complete the following steps:

    1. Provide your destination settings and select Raw endpoint as endpoint type.

    You can use a raw endpoint for the decompression feature to ingest both raw and JSON-formatted event data to Splunk. For example, VPC Flow Logs data is raw data, and AWS CloudTrail data is in JSON format.

    2. Enter the HEC token for Authentication token.
    3. To enable the decompression feature, deselect Transform source records with AWS Lambda under Transform records.
    4. Select Turn on decompression and Turn on message extraction for Decompress source records from Amazon CloudWatch Logs.
    5. Select Turn on message extraction for the Splunk destination.

    Message extraction feature

    After decompression, CloudWatch Logs are in JSON format, as shown in the following figure. You can see the decompressed data has metadata information such as logGroup, logStream, and subscriptionFilters, and the actual data is included within the message field under logEvents (the following example shows CloudTrail events in CloudWatch Logs).

    When you enable message extraction, Firehose will extract just the contents of the message fields and concatenate the contents with a new line between them, as shown in the following figure. With the CloudWatch Logs metadata filtered out by this feature, Splunk will successfully parse the actual log data and map it to the source type configured in the HEC token.

    Additionally, if you want to deliver these CloudWatch events to your Splunk destination in real time, you can use zero buffering, a feature recently launched in Firehose. With it, you can set the buffer interval to 0 seconds, or to any interval between 0–60 seconds, to deliver data to the Splunk destination within seconds.

    With these settings, you can now seamlessly ingest decompressed CloudWatch log data into Splunk using Firehose.

    Decompress CloudWatch Logs for Amazon S3

    The CloudWatch Logs decompression feature for an Amazon S3 destination works similarly to the Splunk destination: you can turn off data transformation using Lambda and turn on the decompression and message extraction options. You can use the decompression feature to write the log data as a text file to the Amazon S3 destination, or use it with other Amazon S3 destination features such as record format conversion to Parquet or ORC, or dynamic partitioning to partition the data.

    Dynamic partitioning with decompression

    For Amazon S3 destination, Firehose supports dynamic partitioning, which enables you to continuously partition streaming data by using keys within data, and then deliver the data grouped by these keys into corresponding Amazon S3 prefixes. This enables you to run high-performance, cost-efficient analytics on streaming data in Amazon S3 using services such as Amazon Athena, Amazon EMR, Amazon Redshift Spectrum, and Amazon QuickSight. Partitioning your data minimizes the amount of data scanned, optimizes performance, and reduces costs of your analytics queries on Amazon S3.

    With the new decompression feature, you can perform dynamic partitioning without any Lambda function for mapping the partitioning keys on CloudWatch Logs. You can enable the Inline parsing for JSON option, scan the decompressed log data, and select the partitioning keys. The following screenshot shows an example where inline parsing is enabled for CloudTrail log data with a partitioning schema selected for account ID and AWS Region in the CloudTrail record.
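
    As an illustration, the relevant fragment of an Amazon S3 destination configuration for this setup might look like the following. This is a partial sketch: the bucket and CloudTrail field choices are placeholders, and the processor and parameter names should be checked against the Firehose API reference:

    # Fragment of an ExtendedS3DestinationConfiguration for
    # firehose.create_delivery_stream(...), shown as a Python dict.
    extended_s3_configuration_fragment = {
        "BucketARN": "arn:aws:s3:::my-cloudtrail-logs-bucket",   # placeholder bucket
        # Partition keys extracted from the decompressed CloudTrail records
        "DynamicPartitioningConfiguration": {"Enabled": True},
        "Prefix": (
            "account_id=!{partitionKeyFromQuery:account_id}/"
            "region=!{partitionKeyFromQuery:region}/"
        ),
        "ErrorOutputPrefix": "errors/",
        "ProcessingConfiguration": {
            "Enabled": True,
            "Processors": [
                {
                    "Type": "MetadataExtraction",
                    "Parameters": [
                        {
                            "ParameterName": "MetadataExtractionQuery",
                            "ParameterValue": "{account_id: .recipientAccountId, region: .awsRegion}",
                        },
                        {"ParameterName": "JsonParsingEngine", "ParameterValue": "JQ-1.6"},
                    ],
                }
            ],
        },
    }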

    Record format conversion with decompression

    For CloudWatch Logs data, you can use the record format conversion feature on decompressed data for Amazon S3 destination. Firehose can convert the input data format from JSON to Apache Parquet or Apache ORC before storing the data in Amazon S3. Parquet and ORC are columnar data formats that save space and enable faster queries compared to row-oriented formats like JSON. You can use the features for record format conversion under the Transform and convert records settings to convert the CloudWatch log data to Parquet or ORC format. The following screenshot shows an example of record format conversion settings for Parquet format using an AWS Glue schema and table for CloudTrail log data. When the dynamic partitioning settings are configured, record format conversion works along with dynamic partitioning to create the files in the output format with a partition folder structure in the target S3 bucket.

    Migrate existing delivery streams for decompression

    If you want to migrate an existing Firehose stream that uses Lambda for decompression to this new decompression feature of Firehose, refer to the steps outlined in Enabling and disabling decompression.

    Pricing

    The Firehose decompression feature decompresses the data and charges per GB of decompressed data. To understand decompression pricing, refer to Amazon Data Firehose pricing.

    Clean up

    To avoid incurring future charges, delete the resources you created in the following order:

    1. Delete the CloudWatch Logs subscription filter.
    2. Delete the Firehose delivery stream.
    3. Delete the S3 buckets.

    Conclusion

    The decompression and message extraction feature of Firehose simplifies delivery of CloudWatch Logs to Amazon S3 and Splunk destinations without requiring any code development or additional processing. For an Amazon S3 destination, you can use Parquet or ORC conversion and dynamic partitioning capabilities on decompressed data.

    For more information, refer to the following resources:


    About the Authors

    Ranjit Kalidasan is a Senior Solutions Architect with Amazon Web Services based in Boston, Massachusetts. He is a Partner Solutions Architect helping security ISV partners co-build and co-market solutions with AWS. He brings over 25 years of experience in information technology helping global customers implement complex solutions for security and analytics. You can connect with Ranjit on LinkedIn.

    Phaneendra Vuliyaragoli is a Product Management Lead for Amazon Data Firehose at AWS. In this role, Phaneendra leads the product and go-to-market strategy for Amazon Data Firehose.

    Nexthink scales to trillions of events per day with Amazon MSK

    Post Syndicated from Moe Haidar original https://aws.amazon.com/blogs/big-data/nexthink-scales-to-trillions-of-events-per-day-with-amazon-msk/

    Real-time data streaming and event processing present scalability and management challenges. AWS offers a broad selection of managed real-time data streaming services to effortlessly run these workloads at any scale.

    In this post, Nexthink shares how Amazon Managed Streaming for Apache Kafka (Amazon MSK) empowered them to achieve massive scale in event processing. Experiencing business hyper-growth, Nexthink migrated to AWS to overcome the scaling limitations of on-premises solutions. With Amazon MSK, Nexthink now seamlessly processes trillions of events per day, reaching over 5 GB per second of aggregated throughput.

    In the following sections, Nexthink introduces their product and the need for scalability. They then highlight the challenges of their legacy on-premises application and present their transition to a cloud-centered software as a service (SaaS) architecture powered by Amazon MSK. Finally, Nexthink details the benefits achieved by adopting Amazon MSK.

    Nexthink’s need to scale

    Nexthink is the leader in digital employee experience (DeX). The company is shaping the future of work by providing IT leaders and C-levels with insights into employees’ daily technology experiences at the device and application level. This allows IT to evolve from reactive problem-solving to proactive optimization.

    The Nexthink Infinity platform combines analytics, monitoring, automation, and more to manage the employee digital experience. By collecting device and application events, processing them in real time, and storing them, our platform analyzes data to solve problems and boost experiences for over 15 million employees across five continents.

    In just 3 years, Nexthink’s business grew tenfold, and with the introduction of more real-time data our application had to scale from processing 200 MB per second to 5 GB per second and trillions of events daily. To enable this growth, we modernized our application from an on-premises single-tenant monolith to a cloud-based scalable SaaS solution powered by Amazon MSK.

    The next sections detail our modernization journey, including the challenges we faced and the benefits we realized with our new cloud-centered, AWS-based architecture.

    The on-premises solution and its challenges

    Let’s first explore our previous on-premises solution, Nexthink V6, before examining how Amazon MSK addressed its challenges. The following diagram illustrates its architecture.

    Nexthink v6

    V6 was made up of two monolithic, single-tenant Java and C++ applications that were tightly coupled. The portal was a backend-for-frontend Java application, and the core engine was an in-house C++ in-memory database application that was also handling device connections, data ingestion, aggregation, and querying. By bundling all these functions together, the engine became difficult to manage and improve.

    V6 also lacked scalability. Initially supporting 10,000 devices, some new tenants had over 300,000 devices. We reacted by deploying multiple V6 engines per tenant, increasing complexity and cost, hampering user experience, and delaying time to market. This also led to longer proof of concept and onboarding cycles, which hurt the business.

    Furthermore, the absence of a streaming platform like Kafka created dependencies between teams through tight HTTP/gRPC coupling. Additionally, teams couldn’t access real-time events before ingestion into the database, limiting feature development. We also lacked a data buffer, risking potential data loss during outages. Such constraints impeded innovation and increased risks.

    In summary, although the V6 system served its initial purpose, reinventing it with cloud-centered technologies became imperative to enhance scalability, reliability, and foster innovation by our engineering and product teams.

    Transitioning to a cloud-centered architecture with Amazon MSK

    To achieve our modernization goals, after thorough research and iterations, we implemented an event-driven microservices design on Amazon Elastic Kubernetes Service (Amazon EKS), using Kafka on Amazon MSK for distributed event storage and streaming.

    Our transition from the v6 on-prem solution to the cloud-centered platform was phased over four iterations:

    • Phase 1 – We lifted and shifted from on premises to virtual machines in the cloud, reducing operational complexities and accelerating proof of concept cycles while transparently migrating customers.
    • Phase 2 – We extended the cloud architecture by implementing new product features with microservices and self-managed Kafka on Kubernetes. However, operating Kafka clusters ourselves proved overly difficult, leading us to Phase 3.
    • Phase 3 – We switched from self-managed Kafka to Amazon MSK, improving stability and reducing operational costs. We realized that managing Kafka wasn’t our core competency or differentiator, and the overhead was high. Amazon MSK enabled us to focus on our core application, freeing us from the burden of undifferentiated Kafka management.
    • Phase 4 – Finally, we eliminated all legacy components, completing the transition to a fully cloud-centered SaaS platform. This journey of learning and transformation took 3 years.

    Today, after our successful transition, we use Amazon MSK for two key functions:

    • Real-time data ingestion and processing of trillions of daily events from over 15 million devices worldwide, as illustrated in the following figure.

    Nexthink Architecture Ingestion

    • Enabling an event-driven system that decouples data producers and consumers, as depicted in the following figure.

    Nexthink Architecture Event Driven

    To further enhance our scalability and resilience, we adopted a cell-based architecture using the wide availability of Amazon MSK across AWS Regions. We currently operate over 10 cells, each representing an independent regional deployment of our SaaS solution. This cell-based approach minimizes the area of impact in case of issues, addresses data residency requirements, and enables horizontal scaling across AWS Regions, as illustrated in the following figure.

    Nexthink Architecture Cells

    Benefits of Amazon MSK

    Amazon MSK has been critical in enabling our event-driven design. In this section, we outline the main benefits we gained from its adoption.

    Improved data resilience

    In our new architecture, data from devices is pushed directly to Kafka topics in Amazon MSK, which provides high availability and resilience. This makes sure that events can be safely received and stored at any time. Our services consuming this data inherit the same resilience from Amazon MSK. If our backend ingestion services face disruptions, no event is lost, because Kafka retains all published messages. When our services resume, they seamlessly continue processing from where they left off, thanks to Kafka’s delivery semantics, which allow processing messages exactly-once, at-least-once, or at-most-once based on application needs.

    Amazon MSK enables us to tailor the data retention duration to our specific requirements, ranging from seconds to unlimited duration. This flexibility grants uninterrupted data availability to our application, which wasn’t possible with our previous architecture. Furthermore, to safeguard data integrity in the event of processing errors or corruption, Kafka enabled us to implement a data replay mechanism, ensuring data consistency and reliability.
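
    A minimal Python sketch of such a replay, seeking a consumer group back to a chosen timestamp (the broker address and topic name are placeholders, not Nexthink's actual configuration):

    from datetime import datetime, timedelta, timezone
    from kafka import KafkaConsumer, TopicPartition   # pip install kafka-python

    consumer = KafkaConsumer(
        bootstrap_servers="msk-broker:9092",           # placeholder MSK bootstrap broker
        group_id="ingestion-replay",
        enable_auto_commit=False,
    )

    topic = "device-events"                            # placeholder topic
    partitions = [TopicPartition(topic, p) for p in consumer.partitions_for_topic(topic)]
    consumer.assign(partitions)

    # Replay everything published in the last 6 hours
    replay_from = datetime.now(timezone.utc) - timedelta(hours=6)
    replay_ms = int(replay_from.timestamp() * 1000)
    offsets = consumer.offsets_for_times({tp: replay_ms for tp in partitions})

    for tp, offset_and_ts in offsets.items():
        if offset_and_ts is not None:
            consumer.seek(tp, offset_and_ts.offset)

    for message in consumer:
        # Re-process the event; downstream handling must be idempotent
        print(message.topic, message.partition, message.offset)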

    Organizational scaling

    By adopting an event-driven architecture with Amazon MSK, we decomposed our monolithic application into loosely coupled, stateless microservices communicating asynchronously via Kafka topics. This approach enabled our engineering organization to scale rapidly from just 4–5 teams in 2019 to over 40 teams and approximately 350 engineers today.

    The loose coupling between event publishers and subscribers empowered teams to focus on distinct domains, such as data ingestion, identification services, and data lakes. Teams could develop solutions independently within their domains, communicating through Kafka topics without tight coupling. This architecture accelerated feature development by minimizing the risk of new features impacting existing ones. Teams could efficiently consume events published by others, offering new capabilities more rapidly while reducing cross-team dependencies.

    The following figure illustrates the seamless workflow of adding new domains to our system.

    Adding domains

    Furthermore, the event-driven design allowed teams to build stateless services that could seamlessly auto scale based on MSK metrics like messages per second. This event-driven scalability eliminated the need for extensive capacity planning and manual scaling efforts, freeing up development time.

    By using an event-driven microservices architecture on Amazon MSK, we achieved organizational agility, enhanced scalability, and accelerated innovation while minimizing operational overhead.

    Seamless infrastructure scaling

    Nexthink’s business grew tenfold in 3 years, and many new capabilities were added to the product, leading to a substantial increase in traffic from 200 MB per second to 5 GB per second. This exponential data growth was enabled by the robust scalability of Amazon MSK. Achieving such scale with an on-premises solution would have been challenging and expensive, if not infeasible.

    Attempting to self-manage Kafka imposed unnecessary operational overhead without providing business value. Running it with just 5% of today’s traffic was already complex and required two engineers. At today’s volumes, we estimated needing 6–10 dedicated staff, increasing costs and diverting resources away from core priorities.

    Real-time capabilities

    By channeling all our data through Amazon MSK, we enabled real-time processing of events. This unlocked capabilities like real-time alerts, event-driven triggers, and webhooks that were previously unattainable. As such, Amazon MSK was instrumental in facilitating our event-driven architecture and powering impactful innovations.

    Secure data access

    Transitioning to our new architecture, we met our security and data integrity goals. With Kafka ACLs, we enforced strict access controls, allowing consumers and producers to only interact with authorized topics. We based these granular data access controls on criteria like data type, domain, and team.
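
    The following sketch shows what such a topic-level ACL could look like when created with the Kafka AdminClient. The principal, topic name, and broker address are placeholders, and client security settings are omitted for brevity; this is not our production setup.

    import java.util.Collections;
    import java.util.Properties;

    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.common.acl.AccessControlEntry;
    import org.apache.kafka.common.acl.AclBinding;
    import org.apache.kafka.common.acl.AclOperation;
    import org.apache.kafka.common.acl.AclPermissionType;
    import org.apache.kafka.common.resource.PatternType;
    import org.apache.kafka.common.resource.ResourcePattern;
    import org.apache.kafka.common.resource.ResourceType;

    public class TopicAclExample {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "b-1.example.kafka.eu-west-1.amazonaws.com:9094");

            try (Admin admin = Admin.create(props)) {
                // Allow one consumer principal to read from a single authorized topic.
                AclBinding readBinding = new AclBinding(
                        new ResourcePattern(ResourceType.TOPIC, "device-events", PatternType.LITERAL),
                        new AccessControlEntry("User:identification-service", "*",
                                AclOperation.READ, AclPermissionType.ALLOW));
                admin.createAcls(Collections.singleton(readBinding)).all().get();
            }
        }
    }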

    To securely scale decentralized management of topics, we introduced proprietary Kubernetes Custom Resource Definitions (CRDs). These CRDs enabled teams to independently manage their own topics, settings, and ACLs without compromising security.

    Amazon MSK encryption made sure that the data remained encrypted at rest and in transit. We also introduced a Bring Your Own Key (BYOK) option, allowing application-level encryption with customer keys for all single-tenant and multi-tenant topics.

    Enhanced observability

    Amazon MSK gave us great visibility into our data flows. The out-of-the-box Amazon CloudWatch metrics let us see the amount and types of data flowing through each topic and cluster. This helped us quantify the usage of our product features by tracking data volumes at the topic level. The Amazon MSK operational metrics enabled effortless monitoring and right-sizing of clusters and brokers. Overall, the rich observability of Amazon MSK facilitated data-driven decisions about architecture and product features.
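
    As an example of the kind of automation this visibility enables, the following sketch pulls broker throughput from the AWS/Kafka CloudWatch namespace with the AWS SDK for Java. The cluster name and broker ID are placeholders, and the metrics and dimensions actually available depend on the monitoring level configured for the cluster.

    import java.time.Duration;
    import java.time.Instant;

    import software.amazon.awssdk.services.cloudwatch.CloudWatchClient;
    import software.amazon.awssdk.services.cloudwatch.model.Dimension;
    import software.amazon.awssdk.services.cloudwatch.model.GetMetricStatisticsRequest;
    import software.amazon.awssdk.services.cloudwatch.model.GetMetricStatisticsResponse;
    import software.amazon.awssdk.services.cloudwatch.model.Statistic;

    public class MskMetricsExample {
        public static void main(String[] args) {
            try (CloudWatchClient cloudWatch = CloudWatchClient.create()) {
                GetMetricStatisticsRequest request = GetMetricStatisticsRequest.builder()
                        .namespace("AWS/Kafka")
                        .metricName("BytesInPerSec")
                        .dimensions(
                                Dimension.builder().name("Cluster Name").value("my-msk-cluster").build(),
                                Dimension.builder().name("Broker ID").value("1").build())
                        .startTime(Instant.now().minus(Duration.ofHours(1)))
                        .endTime(Instant.now())
                        .period(300) // 5-minute data points
                        .statistics(Statistic.AVERAGE)
                        .build();

                GetMetricStatisticsResponse response = cloudWatch.getMetricStatistics(request);
                response.datapoints().forEach(dp ->
                        System.out.println(dp.timestamp() + " avg bytes/sec: " + dp.average()));
            }
        }
    }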

    Conclusion

    Nexthink’s journey from an on-premises monolith to a cloud SaaS was streamlined by using Amazon MSK, a fully managed Kafka service. Amazon MSK allowed us to scale seamlessly while benefiting from enterprise-grade reliability and security. By offloading Kafka management to AWS, we could stay focused on our core business and innovate faster.

    Going forward, we plan to further improve performance, costs, and scalability by adopting Amazon MSK capabilities such as tiered storage and AWS Graviton-based EC2 instance types.

    We are also working closely with the Amazon MSK team to prepare for upcoming service features. Rapidly adopting new capabilities will help us remain at the forefront of innovation while continuing to grow our business.

    To learn more about how Nexthink uses AWS to serve its global customer base, explore the Nexthink on AWS case study. Additionally, discover other customer success stories with Amazon MSK by visiting the Amazon MSK blog category.


    About the Authors

    Moe HaidarMoe Haidar is a principal engineer and special projects lead @ CTO office of Nexthink. He has been involved with AWS since 2018 and is a key contributor to the cloud transformation of the Nexthink platform to AWS. His focus is on product and technology incubation and architecture, but he also loves doing hands-on activities to keep his knowledge of technologies sharp and up to date. He still contributes heavily to the code base and loves to tackle complex problems.
    Simone PomataSimone Pomata is Senior Solutions Architect at AWS. He has worked enthusiastically in the tech industry for more than 10 years. At AWS, he helps customers succeed in building new technologies every day.
    Magdalena GargasMagdalena Gargas is a Solutions Architect passionate about technology and solving customer challenges. At AWS, she works mostly with software companies, helping them innovate in the cloud. She participates in industry events, sharing insights and contributing to the advancement of the containerization field.

    Krones real-time production line monitoring with Amazon Managed Service for Apache Flink

    Post Syndicated from Florian Mair original https://aws.amazon.com/blogs/big-data/krones-real-time-production-line-monitoring-with-amazon-managed-service-for-apache-flink/

    Krones provides breweries, beverage bottlers, and food producers all over the world with individual machines and complete production lines. Every day, millions of glass bottles, cans, and PET containers run through a Krones line. Production lines are complex systems with lots of possible errors that could stall the line and decrease the production yield. Krones wants to detect failures as early as possible (sometimes even before they happen) and notify production line operators to increase reliability and output. So how can a failure be detected? Krones equips its lines with sensors for data collection, which can then be evaluated against rules. Both Krones, as the line manufacturer, and the line operators can create monitoring rules for machines, so beverage bottlers and other operators can define their own margin of error for the line. In the past, Krones used a system based on a time series database. The main challenges were that this system was hard to debug and that queries represented only the current state of machines, not the state transitions.

    This post shows how Krones built a streaming solution to monitor their lines, based on Amazon Kinesis and Amazon Managed Service for Apache Flink. These fully managed services reduce the complexity of building streaming applications with Apache Flink. Managed Service for Apache Flink manages the underlying Apache Flink components that provide durable application state, metrics, logs, and more, and Kinesis enables you to cost-effectively process streaming data at any scale. If you want to get started with your own Apache Flink application, check out the GitHub repository for samples using the Java, Python, or SQL APIs of Flink.

    Overview of solution

    Krones's line monitoring is part of the Krones Shopfloor Guidance system. It provides support in the organization, prioritization, management, and documentation of all activities in the company. It allows Krones to notify an operator if a machine is stopped or materials are required, regardless of where the operator is on the line. Proven condition monitoring rules are already built in, but rules can also be user defined via the user interface. For example, if a monitored data point violates a threshold, the system can send a text message or trigger a maintenance order on the line.

    The condition monitoring and rule evaluation system is built on AWS, using AWS analytics services. The following diagram illustrates the architecture.

    Architecture Diagram for Krones Production Line Monitoring

    Almost every data streaming application consists of five layers: data source, stream ingestion, stream storage, stream processing, and one or more destinations. In the following sections, we dive deeper into each layer and how the line monitoring solution, built by Krones, works in detail.

    Data source

    The data is gathered by a service running on an edge device reading several protocols like Siemens S7 or OPC/UA. Raw data is preprocessed to create a unified JSON structure, which makes it easier to process later on in the rule engine. A sample payload converted to JSON might look like the following:

    {
      "version": 1,
      "timestamp": 1234,
      "equipmentId": "84068f2f-3f39-4b9c-a995-d2a84d878689",
      "tag": "water_temperature",
      "value": 13.45,
      "quality": "Ok",
      "meta": {      
        "sequenceNumber": 123,
        "flags": ["Fst", "Lst", "Wmk", "Syn", "Ats"],
        "createdAt": 12345690,
        "sourceId": "filling_machine"
      }
    }

    Stream ingestion

    AWS IoT Greengrass is an open source Internet of Things (IoT) edge runtime and cloud service. It allows you to act on data locally and to aggregate and filter device data. AWS IoT Greengrass provides prebuilt components that can be deployed to the edge. The production line solution uses the stream manager component, which can process data and transfer it to AWS destinations such as AWS IoT Analytics, Amazon Simple Storage Service (Amazon S3), and Kinesis. The stream manager buffers and aggregates records, then sends them to a Kinesis data stream.

    Stream storage

    The job of the stream storage layer is to buffer messages in a fault-tolerant way and make them available for consumption by one or more consumer applications. To achieve this on AWS, the most common technologies are Kinesis and Amazon Managed Streaming for Apache Kafka (Amazon MSK). For storing our sensor data from production lines, Krones chose Kinesis. Kinesis is a serverless streaming data service that works at any scale with low latency. Shards within a Kinesis data stream are a uniquely identified sequence of data records, where a stream is composed of one or more shards. Each shard has 2 MB/s of read capacity and 1 MB/s of write capacity (with a maximum of 1,000 records/s). To avoid hitting those limits, data should be distributed among shards as evenly as possible. Every record that is sent to Kinesis has a partition key, which is used to group data into a shard. Therefore, you want to have a large number of partition keys to distribute the load evenly. The stream manager running on AWS IoT Greengrass supports random partition key assignment, which means that each record ends up in a random shard and the load is distributed evenly. A disadvantage of random partition key assignment is that records aren't stored in order in Kinesis. We explain how to solve this in the next section, where we talk about watermarks.
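
    The following sketch illustrates the partition key mechanism with the AWS SDK for Java: a random partition key per record spreads the load evenly across shards, at the cost of ordering. The stream name and payload are placeholders, and this is not the Greengrass stream manager implementation itself.

    import java.util.UUID;

    import software.amazon.awssdk.core.SdkBytes;
    import software.amazon.awssdk.services.kinesis.KinesisClient;
    import software.amazon.awssdk.services.kinesis.model.PutRecordRequest;

    public class RandomPartitionKeyExample {
        public static void main(String[] args) {
            try (KinesisClient kinesis = KinesisClient.create()) {
                String payload = "{\"tag\": \"water_temperature\", \"value\": 13.45}";
                PutRecordRequest request = PutRecordRequest.builder()
                        .streamName("production-line-stream")       // placeholder stream name
                        .partitionKey(UUID.randomUUID().toString()) // random key: even distribution, no ordering
                        .data(SdkBytes.fromUtf8String(payload))
                        .build();
                kinesis.putRecord(request);
            }
        }
    }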

    Watermarks

    A watermark is a mechanism used to track and measure the progress of event time in a data stream. The event time is the timestamp from when the event was created at the source. The watermark indicates the timely progress of the stream processing application, so all events with an earlier or equal timestamp are considered as processed. This information is essential for Flink to advance event time and trigger relevant computations, such as window evaluations. The allowed lag between event time and watermark can be configured to determine how long to wait for late data before considering a window complete and advancing the watermark.

    Krones has systems all around the globe and needed to handle late arrivals due to connection losses or other network constraints. They started out by monitoring late arrivals and setting the default Flink late handling to the maximum value they saw in this metric. They experienced issues with time synchronization from the edge devices, which led them to a more sophisticated way of watermarking. They built a global watermark across all the senders and used the lowest value as the watermark. The timestamps of all incoming events are stored in a HashMap, and when the watermarks are emitted periodically, the smallest value in this HashMap is used. To avoid watermarks stalling because of missing data, they configured an idleTimeOut parameter, which ignores timestamps that are older than a certain threshold. This increases latency but gives strong data consistency.

    public class BucketWatermarkGenerator implements WatermarkGenerator<DataPointEvent> {
        private HashMap<String, WatermarkAndTimestamp> lastTimestamps; // latest timestamp seen per sender
        private Long idleTimeOut;       // senders silent longer than this are ignored
        private long maxOutOfOrderness; // allowed lag between event time and watermark
    }
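
    The snippet above shows the fields of the Krones generator. The following is a simplified, self-contained sketch of how a generator that follows the described approach (track the latest timestamp per sender, ignore idle senders, emit the lowest remaining value) could be written against Flink's WatermarkGenerator interface. The event type, field names, and bookkeeping details are assumptions for illustration and not the actual Krones implementation.

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.flink.api.common.eventtime.Watermark;
    import org.apache.flink.api.common.eventtime.WatermarkGenerator;
    import org.apache.flink.api.common.eventtime.WatermarkOutput;

    // Minimal event type used for this sketch only.
    record SensorReading(String sourceId, long eventTimeMillis) {}

    public class LowestSenderWatermarkGenerator implements WatermarkGenerator<SensorReading> {

        private final Map<String, Long> lastEventTimePerSender = new HashMap<>();
        private final Map<String, Long> lastSeenWallClockPerSender = new HashMap<>();
        private final long maxOutOfOrderness; // allowed lag between event time and watermark
        private final long idleTimeOut;       // senders silent longer than this are ignored

        public LowestSenderWatermarkGenerator(long maxOutOfOrderness, long idleTimeOut) {
            this.maxOutOfOrderness = maxOutOfOrderness;
            this.idleTimeOut = idleTimeOut;
        }

        @Override
        public void onEvent(SensorReading event, long eventTimestamp, WatermarkOutput output) {
            // Remember the newest event time per sender and when we last heard from it.
            lastEventTimePerSender.merge(event.sourceId(), eventTimestamp, Math::max);
            lastSeenWallClockPerSender.put(event.sourceId(), System.currentTimeMillis());
        }

        @Override
        public void onPeriodicEmit(WatermarkOutput output) {
            long now = System.currentTimeMillis();
            long minEventTime = Long.MAX_VALUE;
            // The watermark follows the slowest sender that is not considered idle.
            for (Map.Entry<String, Long> entry : lastEventTimePerSender.entrySet()) {
                Long lastSeen = lastSeenWallClockPerSender.get(entry.getKey());
                if (lastSeen != null && now - lastSeen <= idleTimeOut) {
                    minEventTime = Math.min(minEventTime, entry.getValue());
                }
            }
            if (minEventTime != Long.MAX_VALUE) {
                output.emitWatermark(new Watermark(minEventTime - maxOutOfOrderness - 1));
            }
        }
    }

    In a Flink job, a generator like this would typically be plugged in through WatermarkStrategy.forGenerator().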
    

    Stream processing

    After the data is collected from sensors and ingested into Kinesis, it needs to be evaluated by a rule engine. A rule in this system represents the state of a single metric (such as temperature) or a collection of metrics. To interpret a metric, more than one data point is used, which is a stateful calculation. In this section, we dive deeper into the keyed state and broadcast state in Apache Flink and how they’re used to build the Krones rule engine.

    Control stream and broadcast state pattern

    In Apache Flink, state refers to the ability of the system to store and manage information persistently across time and operations, enabling the processing of streaming data with support for stateful computations.

    The broadcast state pattern allows the distribution of a state to all parallel instances of an operator. Therefore, all operator instances have the same state, and data can be processed using this shared state. This read-only state is ingested via a control stream, which is a regular data stream, but usually with a much lower data rate. This pattern lets you dynamically update the state on all operators, so the user can change the state and behavior of the application without a redeploy. When a new record is added to the control stream, all operator instances receive the update and use the new state when processing new messages.

    This allows users of the Krones application to ingest new rules into the Flink application without restarting it. This avoids downtime and provides a great user experience, because changes happen in real time. A rule covers a scenario in order to detect a process deviation. Sometimes, machine data is not as easy to interpret as it might look at first glance. If a temperature sensor is sending high values, this might indicate an error, but it could also be the effect of an ongoing maintenance procedure. It's important to put metrics in context and filter some values. This is achieved by a concept called grouping.

    Grouping of metrics

    The grouping of data and metrics allows you to define the relevance of incoming data and produce accurate results. Let’s walk through the example in the following figure.

    Grouping of metrics

    In Step 1, we define two condition groups. Group 1 collects the machine state and which product is going through the line. Group 2 uses the value of the temperature and pressure sensors. A condition group can have different states depending on the values it receives. In this example, group 1 receives data that the machine is running, and the one-liter bottle is selected as the product; this gives this group the state ACTIVE. Group 2 has metrics for temperature and pressure; both metrics are above their thresholds for more than 5 minutes. This results in group 2 being in a WARNING state. This means group 1 reports that everything is fine and group 2 does not. In Step 2, weights are added to the groups. This is needed in some situations, because groups might report conflicting information. In this scenario, group 1 reports ACTIVE and group 2 reports WARNING, so it’s not clear to the system what the state of the line is. After adding the weights, the states can be ranked, as shown in step 3. Lastly, the highest ranked state is chosen as the winning one, as shown in Step 4.

    After the rules are evaluated and the final machine state is defined, the results will be further processed. The action taken depends on the rule configuration; this can be a notification to the line operator to restock materials, do some maintenance, or just a visual update on the dashboard. This part of the system, which evaluates metrics and rules and takes actions based on the results, is referred to as a rule engine.

    Scaling the rule engine

    Because users can build their own rules, the rule engine may have a high number of rules to evaluate, and some rules might use the same sensor data as other rules. Flink is a distributed system that scales very well horizontally. To distribute a data stream to several tasks, you can use the keyBy() method. This allows you to partition a data stream in a logical way and send parts of the data to different task managers. This is often done by choosing an arbitrary key to get an evenly distributed load. In this case, Krones added a ruleId to each data point and used it as the key, so all data points needed to evaluate a rule are processed by the same task. The keyed data stream can be used across all rules just like a regular variable.
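
    The following condensed sketch shows how a keyed sensor stream and a broadcast rule stream could be wired together for such a rule engine. The event and rule types, field names, and evaluation logic are simplified assumptions for illustration; they are not the actual Krones code.

    import org.apache.flink.api.common.state.MapStateDescriptor;
    import org.apache.flink.streaming.api.datastream.BroadcastStream;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
    import org.apache.flink.util.Collector;

    // Minimal placeholder types used for this sketch only.
    record SensorEvent(String ruleId, String tag, double value) {}
    record Rule(String ruleId, String tag, double threshold) {}
    record Alert(String ruleId, String message) {}

    public class RuleEngineSketch {

        public static DataStream<Alert> buildRuleEngine(DataStream<SensorEvent> events, DataStream<Rule> rules) {
            MapStateDescriptor<String, Rule> ruleStateDescriptor =
                    new MapStateDescriptor<>("rules", String.class, Rule.class);

            // Low-volume control stream: every parallel instance receives all rules.
            BroadcastStream<Rule> broadcastRules = rules.broadcast(ruleStateDescriptor);

            return events
                    // Partition sensor data by ruleId so all data points for a rule land on the same task.
                    .keyBy(SensorEvent::ruleId)
                    .connect(broadcastRules)
                    .process(new KeyedBroadcastProcessFunction<String, SensorEvent, Rule, Alert>() {
                        @Override
                        public void processElement(SensorEvent event, ReadOnlyContext ctx, Collector<Alert> out) throws Exception {
                            Rule rule = ctx.getBroadcastState(ruleStateDescriptor).get(event.ruleId());
                            if (rule != null && rule.tag().equals(event.tag()) && event.value() > rule.threshold()) {
                                out.collect(new Alert(rule.ruleId(), event.tag() + " above threshold: " + event.value()));
                            }
                        }

                        @Override
                        public void processBroadcastElement(Rule rule, Context ctx, Collector<Alert> out) throws Exception {
                            // New or updated rules arrive via the control stream; no redeploy is needed.
                            ctx.getBroadcastState(ruleStateDescriptor).put(rule.ruleId(), rule);
                        }
                    });
        }
    }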

    Destinations

    When a rule changes its state, the information is sent to a Kinesis stream and then via Amazon EventBridge to consumers. One of the consumers creates a notification from the event that is transmitted to the production line and alerts the personnel to act. To be able to analyze the rule state changes, another service writes the data to an Amazon DynamoDB table for fast access and a TTL is in place to offload long-term history to Amazon S3 for further reporting.

    Conclusion

    In this post, we showed you how Krones built a real-time production line monitoring system on AWS. Managed Service for Apache Flink allowed the Krones team to get started quickly by focusing on application development rather than infrastructure. The real-time capabilities of Flink enabled Krones to reduce machine downtime by 10% and increase efficiency up to 5%.

    If you want to build your own streaming applications, check out the available samples on the GitHub repository. If you want to extend your Flink application with custom connectors, see Making it Easier to Build Connectors with Apache Flink: Introducing the Async Sink. The Async Sink is available in Apache Flink version 1.15.1 and later.


    About the Authors

    Florian Mair is a Senior Solutions Architect and data streaming expert at AWS. He is a technologist that helps customers in Europe succeed and innovate by solving business challenges using AWS Cloud services. Besides working as a Solutions Architect, Florian is a passionate mountaineer, and has climbed some of the highest mountains across Europe.

    Emil Dietl is a Senior Tech Lead at Krones specializing in data engineering, with a key field in Apache Flink and microservices. His work often involves the development and maintenance of mission-critical software. Outside of his professional life, he deeply values spending quality time with his family.

    Simon Peyer is a Solutions Architect at AWS based in Switzerland. He is a practical doer and is passionate about connecting technology and people using AWS Cloud services. A special focus for him is data streaming and automations. Besides work, Simon enjoys his family, the outdoors, and hiking in the mountains.

    Improve healthcare services through patient 360: A zero-ETL approach to enable near real-time data analytics

    Post Syndicated from Saeed Barghi original https://aws.amazon.com/blogs/big-data/improve-healthcare-services-through-patient-360-a-zero-etl-approach-to-enable-near-real-time-data-analytics/

    Healthcare providers have an opportunity to improve the patient experience by collecting and analyzing broader and more diverse datasets. This includes patient medical history, allergies, immunizations, family disease history, and individuals’ lifestyle data such as workout habits. Having access to those datasets and forming a 360-degree view of patients allows healthcare providers such as claim analysts to see a broader context about each patient and personalize the care they provide for every individual. This is underpinned by building a complete patient profile that enables claim analysts to identify patterns, trends, potential gaps in care, and adherence to care plans. They can then use the result of their analysis to understand a patient’s health status, treatment history, and past or upcoming doctor consultations to make more informed decisions, streamline the claim management process, and improve operational outcomes. Achieving this will also improve general public health through better and more timely interventions, identify health risks through predictive analytics, and accelerate the research and development process.

    AWS has invested in a zero-ETL (extract, transform, and load) future so that builders can focus more on creating value from data, instead of having to spend time preparing data for analysis. The solution proposed in this post follows a zero-ETL approach to data integration to facilitate near real-time analytics and deliver a more personalized patient experience. The solution uses AWS services such as AWS HealthLake, Amazon Redshift, Amazon Kinesis Data Streams, and AWS Lake Formation to build a 360 view of patients. These services enable you to collect and analyze data in near real time and put a comprehensive data governance framework in place that uses granular access control to secure sensitive data from unauthorized users.

    Zero-ETL refers to a set of features on the AWS Cloud that enable integrating different data sources with Amazon Redshift without building traditional ETL pipelines. In this post, we use three of these capabilities: querying data in Amazon S3 through the AWS Glue Data Catalog, zero-ETL integration with Amazon Aurora MySQL, and streaming ingestion from Amazon Kinesis Data Streams.

    Solution overview

    Organizations in the healthcare industry are currently spending a significant amount of time and money on building complex ETL pipelines for data movement and integration. This means data will be replicated across multiple data stores via bespoke and in some cases hand-written ETL jobs, resulting in data inconsistency, latency, and potential security and privacy breaches.

    With support for querying cross-account Apache Iceberg tables via Amazon Redshift, you can now build a more comprehensive patient-360 analysis by querying all patient data from one place. This means you can seamlessly combine information such as clinical data stored in HealthLake with data stored in operational databases such as a patient relationship management system, together with data produced from wearable devices in near real-time. Having access to all this data enables healthcare organizations to form a holistic view of patients, improve care coordination across multiple organizations, and provide highly personalized care for each individual.

    The following diagram depicts the high-level solution we build to achieve these outcomes.

    Deploy the solution

    You can use the following AWS CloudFormation template to deploy the solution components:

    This stack creates the following resources and necessary permissions to integrate the services:

    AWS Solution setup

    AWS HealthLake

    AWS HealthLake enables organizations in the health industry to securely store, transform, transact, and analyze health data. It stores data in HL7 FHIR format, which is an interoperability standard designed for quick and efficient exchange of health data. When you create a HealthLake data store, a Fast Healthcare Interoperability Resources (FHIR) data repository is made available via a RESTful API endpoint. Simultaneously and as part of AWS HealthLake managed service, the nested JSON FHIR data undergoes an ETL process and is stored in Apache Iceberg open table format in Amazon S3.

    To create an AWS HealthLake data store, refer to Getting started with AWS HealthLake. Make sure to select the option Preload sample data when creating your data store.

    In real-world scenarios and when you use AWS HealthLake in production environments, you don’t need to load sample data into your AWS HealthLake data store. Instead, you can use FHIR REST API operations to manage and search resources in your AWS HealthLake data store.

    We use two tables from the sample data stored in HealthLake: patient and allergyintolerance.

    Query AWS HealthLake tables with Redshift Serverless

    Amazon Redshift is the data warehousing service available on the AWS Cloud that provides up to six times better price-performance than any other cloud data warehouse in the market, with a fully managed, AI-powered, massively parallel processing (MPP) data warehouse built for performance, scale, and availability. With continuous innovations added to Amazon Redshift, it is now more than just a data warehouse. It enables organizations of different sizes and in different industries to access all the data they have in their AWS environments and analyze it from one single location with a set of features under the zero-ETL umbrella. Amazon Redshift integrates with AWS HealthLake and data lakes through Redshift Spectrum and Amazon S3 auto-copy features, enabling you to query data directly from files on Amazon S3.

    Query AWS HealthLake data with Amazon Redshift

    Amazon Redshift makes it straightforward to query the data stored in S3-based data lakes with automatic mounting of an AWS Glue Data Catalog in the Redshift query editor v2. This means you no longer have to create an external schema in Amazon Redshift to use the data lake tables cataloged in the Data Catalog. To get started with this feature, see Querying the AWS Glue Data Catalog. After it is set up and you’re connected to the Redshift query editor v2, complete the following steps:

    1. Validate that your tables are visible in the query editor v2. The Data Catalog objects are listed under the awsdatacatalog database.

    FHIR data stored in AWS HealthLake is highly nested. To learn about how to un-nest semi-structured data with Amazon Redshift, see Tutorial: Querying nested data with Amazon Redshift Spectrum.

    2. Use the following query to un-nest the allergyintolerance and patient tables, join them together, and get patient details and their allergies:
      WITH patient_allergy AS 
      (
          SELECT
              resourcetype, 
              c AS allergy_category,
              a."patient"."reference",
              SUBSTRING(a."patient"."reference", 9, LEN(a."patient"."reference")) AS patient_id,
              a.recordeddate AS allergy_record_date,
              NVL(cd."code", 'NA') AS allergy_code,
              NVL(cd.display, 'NA') AS allergy_description
      
          FROM "awsdatacatalog"."datastore_01_179674d36391d68926a8d74c12599306_healthlake_view"."allergyintolerance" a
                  LEFT JOIN a.category c ON TRUE
                  LEFT JOIN a.reaction r ON TRUE
                  LEFT JOIN r.manifestation m ON TRUE
                  LEFT JOIN m.coding cd ON TRUE
      ), patient_info AS
      (
          SELECT id,
                  gender,
                  g as given_name,
                  n.family as family_name,
                  pr as prefix
      
          FROM "awsdatacatalog"."datastore_01_179674d36391d68926a8d74c12599306_healthlake_view"."patient" p
                  LEFT JOIN p.name n ON TRUE
                  LEFT JOIN n.given g ON TRUE
                  LEFT JOIN n.prefix pr ON TRUE
      )
      SELECT DISTINCT p.id, 
              p.gender, 
              p.prefix,
              p.given_name,
              p.family_name,
              pa.allergy_category,
              pa.allergy_code,
              pa.allergy_description
      from patient_allergy pa
          JOIN patient_info p
              ON pa.patient_id = p.id
      ORDER BY p.id, pa.allergy_code
      ;
      

    To eliminate the need for Amazon Redshift to un-nest data every time a query is run, you can create a materialized view to hold un-nested and flattened data. Materialized views are an effective mechanism to deal with complex and repeating queries. They contain a precomputed result set, based on a SQL query over one or more base tables. You can issue SELECT statements to query a materialized view, in the same way that you can query other tables or views in the database.

    3. Use the following SQL to create a materialized view. You use it later to build a complete view of patients:
      CREATE MATERIALIZED VIEW patient_allergy_info AUTO REFRESH YES AS
      WITH patient_allergy AS 
      (
          SELECT
              resourcetype, 
              c AS allergy_category,
              a."patient"."reference",
              SUBSTRING(a."patient"."reference", 9, LEN(a."patient"."reference")) AS patient_id,
              a.recordeddate AS allergy_record_date,
              NVL(cd."code", 'NA') AS allergy_code,
              NVL(cd.display, 'NA') AS allergy_description
      
          FROM
              "awsdatacatalog"."datastore_01_179674d36391d68926a8d74c12599306_healthlake_view"."allergyintolerance" a
                  LEFT JOIN a.category c ON TRUE
                  LEFT JOIN a.reaction r ON TRUE
                  LEFT JOIN r.manifestation m ON TRUE
                  LEFT JOIN m.coding cd ON TRUE
      ), patient_info AS
      (
          SELECT id,
                  gender,
                  g as given_name,
                  n.family as family_name,
                  pr as prefix
      
          FROM "awsdatacatalog"."datastore_01_179674d36391d68926a8d74c12599306_healthlake_view"."patient" p
                  LEFT JOIN p.name n ON TRUE
                  LEFT JOIN n.given g ON TRUE
                  LEFT JOIN n.prefix pr ON TRUE
      )
      SELECT DISTINCT p.id AS patient_id,
              p.gender, 
              p.prefix,
              p.given_name,
              p.family_name,
              pa.allergy_category,
              pa.allergy_code,
              pa.allergy_description
      from patient_allergy pa
          JOIN patient_info p
              ON pa.patient_id = p.id
      ORDER BY p.id, pa.allergy_code
      ;
      

    You have confirmed you can query data in AWS HealthLake via Amazon Redshift. Next, you set up zero-ETL integration between Amazon Redshift and Amazon Aurora MySQL.

    Set up zero-ETL integration between Amazon Aurora MySQL and Redshift Serverless

    Applications such as front-desk software, which are used to schedule appointments and register new patients, store data in OLTP databases such as Aurora. To get data out of OLTP databases and have them ready for analytics use cases, data teams might have to spend a considerable amount of time to build, test, and deploy ETL jobs that are complex to maintain and scale.

    With the Amazon Redshift zero-ETL integration with Amazon Aurora MySQL, you can run analytics on the data stored in OLTP databases and combine them with the rest of the data in Amazon Redshift and AWS HealthLake in near real time. In the next steps in this section, we connect to a MySQL database and set up zero-ETL integration with Amazon Redshift.

    Connect to an Aurora MySQL database and set up data

    Connect to your Aurora MySQL database with your editor of choice, using the AdminUsername and AdminPassword values that you entered when running the CloudFormation stack. (For simplicity, they are the same for Amazon Redshift and Aurora.)

    When you’re connected to your database, complete the following steps:

    1. Create a new database by running the following command:
      CREATE DATABASE front_desk_app_db;

    2. Create a new table. This table simulates storing patient information as they visit clinics and other healthcare centers. For simplicity and to demonstrate specific capabilities, we assume that patient IDs are the same in AWS HealthLake and the front-of-office application. In real-world scenarios, this can be a hashed version of a national health care number:
      CREATE TABLE patient_appointment ( 
            patient_id varchar(250), 
            gender varchar(1), 
            date_of_birth date, 
            appointment_datetime datetime, 
            phone_number varchar(15), 
            PRIMARY KEY (patient_id, appointment_datetime) 
      );

    Having a primary key in the table is mandatory for zero-ETL integration to work.

    3. Insert new records into the source table in the Aurora MySQL database. To demonstrate the required functionalities, make sure the patient_id values of the sample records inserted into the MySQL database match the ones in AWS HealthLake. Replace [PATIENT_ID_1] and [PATIENT_ID_2] in the following query with the ones from the Redshift query you ran previously (the query that joined allergyintolerance and patient):
      INSERT INTO front_desk_app_db.patient_appointment (patient_id, gender, date_of_birth, appointment_datetime, phone_number)
      
      VALUES('[PATIENT_ID_1]', 'F', '1988-7-04', '2023-12-19 10:15:00', '0401401401'),
      ('[PATIENT_ID_1]', 'F', '1988-7-04', '2023-09-19 11:00:00', '0401401401'),
      ('[PATIENT_ID_1]', 'F', '1988-7-04', '2023-06-06 14:30:00', '0401401401'),
      ('[PATIENT_ID_2]', 'F', '1972-11-14', '2023-12-19 08:15:00', '0401401402'),
      ('[PATIENT_ID_2]', 'F', '1972-11-14', '2023-01-09 12:15:00', '0401401402');

    Now that your source table is populated with sample records, you can set up zero-ETL and have data ingested into Amazon Redshift.

    Set up zero-ETL integration between Amazon Aurora MySQL and Amazon Redshift

    Complete the following steps to create your zero-ETL integration:

    1. On the Amazon RDS console, choose Databases in the navigation pane.
    2. Choose the DB identifier of your cluster (not the instance).
    3. On the Zero-ETL Integration tab, choose Create zero-ETL integration.
    4. Follow the steps to create your integration.

    Create a Redshift database from the integration

    Next, you create a target database from the integration. You can do this by running a couple of simple SQL commands on Amazon Redshift. Log in to the query editor v2 and run the following commands:

    1. Get the integration ID of the zero-ETL you set up between your source database and Amazon Redshift:
      SELECT * FROM svv_integration;

    2. Create a database using the integration ID:
      CREATE DATABASE ztl_demo FROM INTEGRATION '[INTEGRATION_ID]';

    3. Query the database and validate that a new table is created and populated with data from your source MySQL database:
      SELECT * FROM ztl_demo.front_desk_app_db.patient_appointment;

    It might take a few seconds for the first set of records to appear in Amazon Redshift.

    This shows that the integration is working as expected. To validate it further, you can insert a new record in your Aurora MySQL database, and it will be available in Amazon Redshift for querying in near real time within a few seconds.
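
    If you want to script this validation instead of checking manually, a minimal sketch using JDBC might look like the following. The endpoints, credentials, patient ID, and wait time are placeholders, and it assumes the MySQL and Amazon Redshift JDBC drivers are on the classpath.

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;
    import java.sql.Statement;

    public class ZeroEtlValidationSketch {
        public static void main(String[] args) throws Exception {
            // Placeholder Aurora MySQL endpoint and credentials.
            try (Connection mysql = DriverManager.getConnection(
                    "jdbc:mysql://aurora-cluster-endpoint:3306/front_desk_app_db", "admin", "password");
                 PreparedStatement insert = mysql.prepareStatement(
                    "INSERT INTO patient_appointment (patient_id, gender, date_of_birth, appointment_datetime, phone_number) "
                    + "VALUES (?, 'F', '1988-07-04', '2024-01-15 09:30:00', '0401401401')")) {
                insert.setString(1, "[PATIENT_ID_1]"); // placeholder patient ID
                insert.executeUpdate();
            }

            Thread.sleep(15_000); // allow a few seconds for the zero-ETL integration to replicate

            // Placeholder Redshift Serverless endpoint and credentials.
            try (Connection redshift = DriverManager.getConnection(
                    "jdbc:redshift://workgroup-endpoint:5439/dev", "admin", "password");
                 Statement stmt = redshift.createStatement();
                 ResultSet rs = stmt.executeQuery(
                    "SELECT COUNT(*) FROM ztl_demo.front_desk_app_db.patient_appointment")) {
                rs.next();
                System.out.println("Rows visible in Amazon Redshift: " + rs.getLong(1));
            }
        }
    }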

    Set up streaming ingestion for Amazon Redshift

    Another aspect of zero-ETL on AWS, for real-time and streaming data, is realized through Amazon Redshift Streaming Ingestion. It provides low-latency, high-speed ingestion of streaming data from Kinesis Data Streams and Amazon MSK. It lowers the effort required to have data ready for analytics workloads, lowers the cost of running such workloads on the cloud, and decreases the operational burden of maintaining the solution.

    In the context of healthcare, understanding an individual’s exercise and movement patterns can help with overall health assessment and better treatment planning. In this section, you send simulated data from wearable devices to Kinesis Data Streams and integrate it with the rest of the data you already have access to from your Redshift Serverless data warehouse.

    For step-by-step instructions, refer to Real-time analytics with Amazon Redshift streaming ingestion. Note the following steps when you set up streaming ingestion for Amazon Redshift:

    1. Select wearables_stream and use the following template when sending data to Amazon Kinesis Data Streams via Kinesis Data Generator, to simulate data generated by wearable devices. Replace [PATIENT_ID_1] and [PATIENT_ID_2] with the patient IDs you used earlier when inserting new records into your Aurora MySQL table:
      {
         "patient_id": "{{random.arrayElement(["[PATIENT_ID_1]"," [PATIENT_ID_2]"])}}",
         "steps_increment": "{{random.arrayElement(
            [0,1]
         )}}",
         "heart_rate": {{random.number( 
            {
               "min":45,
               "max":120}
         )}}
      }

    2. Create an external schema called from_kds by running the following query and replacing [IAM_ROLE_ARN] with the ARN of the role created by the CloudFormation stack (Patient360BlogRole):
      CREATE EXTERNAL SCHEMA from_kds
      FROM KINESIS
      IAM_ROLE '[IAM_ROLE_ARN]';

    3. Use the following SQL when creating a materialized view to consume data from the stream:
      CREATE MATERIALIZED VIEW patient_wearable_data AUTO REFRESH YES AS 
      SELECT approximate_arrival_timestamp, 
            JSON_PARSE(kinesis_data) as Data FROM from_kds."wearables_stream" 
      WHERE CAN_JSON_PARSE(kinesis_data);

    4. To validate that streaming ingestion works as expected, refresh the materialized view to get the data you already sent to the data stream and query the table to make sure data has landed in Amazon Redshift:
      REFRESH MATERIALIZED VIEW patient_wearable_data;
      
      SELECT *
      FROM patient_wearable_data
      ORDER BY approximate_arrival_timestamp DESC;

    Query and analyze patient wearable data

    The results in the data column of the preceding query are in JSON format. Amazon Redshift makes it straightforward to work with such semi-structured data: it uses the PartiQL language to offer SQL-compatible access to relational, semi-structured, and nested data. Use the following query to flatten the data:

    SELECT data."patient_id"::varchar AS patient_id,       
          data."steps_increment"::integer as steps_increment,       
          data."heart_rate"::integer as heart_rate, 
          approximate_arrival_timestamp 
    FROM patient_wearable_data 
    ORDER BY approximate_arrival_timestamp DESC;

    The result looks like the following screenshot.

    Now that you know how to flatten JSON data, you can analyze it further. Use the following query to get the number of minutes a patient has been physically active per day, based on their heart rate (greater than 80):

    WITH patient_wearable_flattened AS
    (
       SELECT data."patient_id"::varchar AS patient_id,
          data."steps_increment"::integer as steps_increment,
          data."heart_rate"::integer as heart_rate,
          approximate_arrival_timestamp,
          DATE(approximate_arrival_timestamp) AS date_received,
          extract(hour from approximate_arrival_timestamp) AS hour_received,
          extract(minute from approximate_arrival_timestamp) AS minute_received
       FROM patient_wearable_data
    ), patient_active_minutes AS
    (
       SELECT patient_id,
          date_received,
          hour_received,
          minute_received,
          avg(heart_rate) AS heart_rate
       FROM patient_wearable_flattened
       GROUP BY patient_id,
          date_received,
          hour_received,
          minute_received
       HAVING avg(heart_rate) > 80
    )
    SELECT patient_id,
          date_received,
          COUNT(heart_rate) AS active_minutes_count
    FROM patient_active_minutes
    GROUP BY patient_id,
          date_received
    ORDER BY patient_id,
          date_received;

    Create a complete patient 360

    Now that you are able to query all patient data with Redshift Serverless, you can combine the three datasets you used in this post and form a comprehensive patient 360 view with the following query:

    WITH patient_appointment_info AS
    (
          SELECT "patient_id",
             "gender",
             "date_of_birth",
             "appointment_datetime",
             "phone_number"
          FROM ztl_demo.front_desk_app_db.patient_appointment
    ),
    patient_wearable_flattened AS
    (
          SELECT data."patient_id"::varchar AS patient_id,
             data."steps_increment"::integer as steps_increment,
             data."heart_rate"::integer as heart_rate,
             approximate_arrival_timestamp,
             DATE(approximate_arrival_timestamp) AS date_received,
             extract(hour from approximate_arrival_timestamp) AS hour_received,
             extract(minute from approximate_arrival_timestamp) AS minute_received
          FROM patient_wearable_data
    ), patient_active_minutes AS
    (
          SELECT patient_id,
             date_received,
             hour_received,
             minute_received,
             avg(heart_rate) AS heart_rate
          FROM patient_wearable_flattened
          GROUP BY patient_id,
             date_received,
             hour_received,
             minute_received
             HAVING avg(heart_rate) > 80
    ), patient_active_minutes_count AS
    (
          SELECT patient_id,
             date_received,
             COUNT(heart_rate) AS active_minutes_count
          FROM patient_active_minutes
          GROUP BY patient_id,
             date_received
    )
    SELECT pai.patient_id,
          pai.gender,
          pai.prefix,
          pai.given_name,
          pai.family_name,
          pai.allergy_category,
          pai.allergy_code,
          pai.allergy_description,
          ppi.date_of_birth,
          ppi.appointment_datetime,
          ppi.phone_number,
          pamc.date_received,
          pamc.active_minutes_count
    FROM patient_allergy_info pai
          LEFT JOIN patient_active_minutes_count pamc
                ON pai.patient_id = pamc.patient_id
          LEFT JOIN patient_appointment_info ppi
                ON pai.patient_id = ppi.patient_id
    GROUP BY pai.patient_id,
          pai.gender,
          pai.prefix,
          pai.given_name,
          pai.family_name,
          pai.allergy_category,
          pai.allergy_code,
          pai.allergy_description,
          ppi.date_of_birth,
          ppi.appointment_datetime,
          ppi.phone_number,
          pamc.date_received,
          pamc.active_minutes_count
    ORDER BY pai.patient_id,
          pai.gender,
          pai.prefix,
          pai.given_name,
          pai.family_name,
          pai.allergy_category,
          pai.allergy_code,
          pai.allergy_description,
          ppi.date_of_birth DESC,
          ppi.appointment_datetime DESC,
          ppi.phone_number DESC,
          pamc.date_received,
          pamc.active_minutes_count

    You can use the solution and queries used here to expand the datasets used in your analysis. For example, you can include other tables from AWS HealthLake as needed.

    Clean up

    To clean up resources you created, complete the following steps:

    1. Delete the zero-ETL integration between Amazon RDS and Amazon Redshift.
    2. Delete the CloudFormation stack.
    3. Delete the AWS HealthLake data store.

    Conclusion

    Forming a comprehensive 360 view of patients by integrating data from various sources offers numerous benefits for organizations operating in the healthcare industry. It enables healthcare providers to gain a holistic understanding of a patient's medical journey, enhances clinical decision-making, and allows for more accurate diagnoses and tailored treatment plans. With zero-ETL features for data integration on AWS, you can build such a view of patients securely, cost-effectively, and with minimal effort.

    You can then use visualization tools such as Amazon QuickSight to build dashboards or use Amazon Redshift ML to enable data analysts and database developers to train machine learning (ML) models with the data integrated through Amazon Redshift zero-ETL. The result is a set of ML models that are trained with a broader view into patients, their medical history, and their lifestyle, and that therefore enable you to make more accurate predictions about their upcoming health needs.


    About the Authors

    Saeed Barghi is a Sr. Analytics Specialist Solutions Architect specializing in architecting enterprise data platforms. He has extensive experience in the fields of data warehousing, data engineering, data lakes, and AI/ML. Based in Melbourne, Australia, Saeed works with public sector customers in Australia and New Zealand.

    Satesh Sonti is a Sr. Analytics Specialist Solutions Architect based out of Atlanta, specialized in building enterprise data platforms, data warehousing, and analytics solutions. He has over 17 years of experience in building data assets and leading complex data platform programs for banking and insurance clients across the globe.

    Successfully conduct a proof of concept in Amazon Redshift

    Post Syndicated from Ziad Wali original https://aws.amazon.com/blogs/big-data/successfully-conduct-a-proof-of-concept-in-amazon-redshift/

    Amazon Redshift is a fast, scalable, and fully managed cloud data warehouse that allows you to process and run your complex SQL analytics workloads on structured and semi-structured data. It also helps you securely access your data in operational databases, data lakes, or third-party datasets with minimal movement or copying of data. Tens of thousands of customers use Amazon Redshift to process large amounts of data, modernize their data analytics workloads, and provide insights for their business users.

    In this post, we discuss how to successfully conduct a proof of concept in Amazon Redshift by going through the main stages of the process, available tools that accelerate implementation, and common use cases.

    Proof of concept overview

    A proof of concept (POC) is a process that uses representative data to validate whether a technology or service fulfills a customer’s technical and business requirements. By testing the solution against key metrics, a POC provides insights that allow you to make an informed decision on the suitability of the technology for the intended use case.

    There are three major POC validation areas:

    • Workload – Take a representative portion of an existing workload and test it on Amazon Redshift, such as an extract, transform, and load (ETL) process, reporting, or management
    • Capability – Demonstrate how a specific Amazon Redshift feature, such as zero-ETL integration with Amazon Redshift, data sharing, or Amazon Redshift Spectrum, can simplify or enhance your overall architecture
    • Architecture – Understand how Amazon Redshift fits into a new or existing architecture along with other AWS services and tools

    A POC is not:

    • Planning and implementing a large-scale migration
    • User-facing deployments, such as deploying a configuration for user testing and validation over extended periods (this is more of a pilot)
    • End-to-end implementation of a use case (this is more of a prototype)

    Proof of concept process

    For a POC to be successful, it is recommended to follow and apply a well-defined and structured process. For a POC on Amazon Redshift, we recommend a three-phase process of discovery, implementation, and evaluation.

    Discovery phase

    The discovery phase is considered the most essential of the three phases, and it is the longest. Through multiple sessions, it defines the scope of the POC and the list of tasks that need to be completed and later evaluated. The scope should contain inputs and data points on the current architecture as well as the target architecture. The following items need to be defined and documented to establish the scope of the POC:

    • Current state architecture and its challenges
    • Business goals and the success criteria of the POC (such as cost, performance, and security) along with their associated priorities
    • Evaluation criteria that will be used to evaluate and interpret the success criteria, such as service-level agreements (SLAs)
    • Target architecture (the communication between the services and tools that will be used during the implementation of the POC)
    • Dataset and the list of tables and schemas

    After the scope has been clearly defined, you should proceed with defining and planning the list of tasks that need to be run during the next phase in order to implement the scope. Depending on the team's familiarity with the latest developments in Amazon Redshift, a technical enablement session on Amazon Redshift is also highly recommended before starting the implementation phase.

    Optionally, a responsibility assignment matrix (RAM) is recommended, especially in large POCs.

    Implementation phase

    The implementation phase takes the output of the previous phase as input. It consists of the following steps:

    1. Set up the environment by respecting the defined POC architecture.
    2. Complete the implementation tasks such as data ingestion and performance testing.
    3. Collect data metrics and statistics on the completed tasks.
    4. Analyze the data and then optimize as necessary.

    Evaluation phase

    The evaluation phase is the POC assessment and the final step of the process. It aggregates the implementation results of the preceding phase, interprets them, and evaluates the success criteria described in the discovery phase.

    It is recommended to use percentiles instead of averages whenever possible for a better interpretation.
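
    As a quick illustration of why, the following snippet compares an average with percentiles over a handful of made-up query latencies; a single slow run skews the average while barely moving the p50 and p90.

    import java.util.Arrays;

    public class PercentileExample {
        // Returns the value at or below which the given fraction of samples fall.
        static long percentile(long[] samplesMillis, double fraction) {
            long[] sorted = samplesMillis.clone();
            Arrays.sort(sorted);
            int index = (int) Math.ceil(fraction * sorted.length) - 1;
            return sorted[Math.max(index, 0)];
        }

        public static void main(String[] args) {
            long[] queryLatenciesMillis = {320, 410, 390, 2800, 350, 405, 380, 395, 360, 370};
            double average = Arrays.stream(queryLatenciesMillis).average().orElse(0);
            System.out.println("average = " + average + " ms");                             // 618.0 ms
            System.out.println("p50 = " + percentile(queryLatenciesMillis, 0.50) + " ms");  // 380 ms
            System.out.println("p90 = " + percentile(queryLatenciesMillis, 0.90) + " ms");  // 410 ms
        }
    }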

    Challenges

    In this section, we discuss the major challenges that you may encounter while planning your POC.

    Scope

    You may face challenges during the discovery phase while defining the scope of the POC, especially in complex environments. Focus on the crucial requirements and prioritized success criteria that need to be evaluated, so you avoid ending up with a small migration project instead of a POC. In terms of technical content (such as data structures, transformation jobs, and reporting queries), identify the smallest subset that will still provide you with all the necessary information at the end of the implementation phase to assess the defined success criteria. Additionally, document any assumptions you are making.

    Time

    A time period should be defined for any POC project to ensure it stays focused and achieves clear results. Without an established time frame, scope creep can occur as requirements shift and unnecessary features get added. This may lead to misleading evaluations about the technology or concept being tested. The duration set for the POC depends on factors like workload complexity and resource availability. If a period such as 3 weeks has been committed to already without accounting for these considerations, the scope and planned content should be scaled to feasibly fit that fixed time period.

    Cost

    Cloud services operate on a pay-as-you-go model, and estimating costs accurately can be challenging during a POC. Overspending or underestimating resource requirements can impact budget allocations. It’s important to carefully estimate the initial sizing of the Redshift cluster, monitor resource usage closely, and consider setting service limits along with AWS Budget alerts to avoid unexpected expenditures.

    Technical

    The team running the POC has to be ready for initial technical challenges, especially during environment setup, data ingestion, and performance testing. Each data warehouse technology has its own design and architecture, which sometimes requires some initial tuning at the data structure or query level. This is an expected challenge that needs to be considered in the implementation phase timeline. Having a technical enablement session beforehand can alleviate such hurdles.

    Amazon Redshift POC tools and features

    In this section, we discuss tools that you can adapt based on the specific requirements and nature of the POC being conducted. It’s essential to choose tools that align with the scope and technologies involved.

    AWS Analytics Automation Toolkit

    The AWS Analytics Automation Toolkit enables automatic provisioning and integration of not only Amazon Redshift, but database migration services like AWS Database Migration Service (AWS DMS), AWS Schema Conversion Tool (AWS SCT), and Apache JMeter. This toolkit is essential in most POCs because it automates the provisioning of infrastructure and setup of the necessary environment.

    AWS SCT

    The AWS SCT makes heterogeneous database migrations predictable, secure, and fast by automatically converting the majority of the database code and storage objects to a format that is compatible with the target database. Any objects that can’t be automatically converted are clearly marked so that they can be manually converted to complete the migration.

    In the context of a POC, the AWS SCT becomes crucial by streamlining and enhancing the efficiency of the schema conversion process from one database system to another. Given the time-sensitive nature of POCs, the AWS SCT automates the conversion process, facilitating planning, and estimation of time and efforts. Additionally, the AWS SCT plays a role in identifying potential compatibility issues, data mapping challenges, or other hurdles at an early stage of the process.

    Furthermore, the database migration assessment report summarizes all the action items for schemas that can’t be converted automatically to your target database. Getting started with AWS SCT is a straightforward process. Also, consider following the best practices for AWS SCT.

    Amazon Redshift auto-copy

    The Amazon Redshift auto-copy (preview) feature can automate data ingestion from Amazon Simple Storage Service (Amazon S3) to Amazon Redshift with a simple SQL command. COPY statements are invoked and start loading data when Amazon Redshift auto-copy detects new files in the specified S3 prefixes. This also makes sure that end-users have the latest data available in Amazon Redshift shortly after the source files are available.

    You can use this feature for the purpose of data ingestion throughout the POC. To learn more about ingesting from files located in Amazon S3 using a SQL command, refer to Simplify data ingestion from Amazon S3 to Amazon Redshift using auto-copy (preview). The post also shows you how to enable auto-copy using COPY jobs, how to monitor jobs, and considerations and best practices.

    Redshift Auto Loader

    The custom Redshift Auto Loader framework automatically creates schemas and tables in the target database and continuously loads data from Amazon S3 to Amazon Redshift. You can use this during the data ingestion phase of the POC. Deploying and setting up the Redshift Auto Loader framework to transfer files from Amazon S3 to Amazon Redshift is a straightforward process.

    For more information, refer to Migrate from Google BigQuery to Amazon Redshift using AWS Glue and Custom Auto Loader Framework.

    Apache JMeter

    Apache JMeter is an open-source load testing application written in Java that you can use to load test web applications, backend server applications, databases, and more. In a database context, it’s an extremely valuable tool for repeating benchmark tests in a consistent manner, simulating concurrency workloads, and scalability testing on different database configurations.

    When implementing your POC, benchmarking Amazon Redshift is often one of the main components of evaluation and a key source of insight into the price-performance of different Amazon Redshift configurations. With Apache JMeter, you can construct high-quality benchmark tests for Amazon Redshift.

    Workload Replicator

    If you are currently using Amazon Redshift and looking to replicate your existing production workload or isolate specific workloads in a POC, you can use the Workload Replicator to run them across different configurations of Redshift clusters (ra3.xlplus, ra3.4xl, ra3.16xl, serverless) for performance evaluation and comparison.

    This utility has the ability to mimic COPY and UNLOAD workloads and can run the transactions and queries in the same time interval as they’re run in the production cluster. However, it’s crucial to assess the limitations of the utility and AWS Identity and Access Management (IAM) security and compliance requirements.

    Node Configuration Comparison utility

    If you’re using Amazon Redshift and have stringent SLAs for query performance in your Amazon Redshift cluster, or you want to explore different Amazon Redshift configurations based on the price-performance of your workload, you can use the Amazon Redshift Node Configuration Comparison utility.

    This utility helps evaluate the performance of your queries using different Redshift cluster configurations in parallel and compares the end results to find the best cluster configuration that meets your needs. Similarly, if you're already using Amazon Redshift and want to migrate from your existing DC2 or DS2 instances to RA3, you can refer to our recommendations on node count and type when upgrading. Before doing that, you can use this utility in your POC to evaluate the new cluster's performance by replaying your past workloads; it integrates with the Workload Replicator utility to evaluate performance metrics for different Amazon Redshift configurations.

This utility functions in a fully automated manner and has limitations similar to those of the Workload Replicator. However, it requires full permissions across various services for the user running the AWS CloudFormation stack.

    Use cases

    You have the opportunity to explore various functionalities and aspects of Amazon Redshift by defining and selecting a business use case you want to validate during the POC. In this section, we discuss some specific use cases you can explore using a POC.

    Functionality evaluation

    Amazon Redshift consists of a set of functionalities and options that simplify data pipelines and effortlessly integrate with other services. You can use a POC to test and evaluate one or more of those capabilities before refactoring your data pipeline and implementing them in your ecosystem. Functionalities could be existing features or new ones such as zero-ETL integration, streaming ingestion, federated queries, or machine learning.

    Workload isolation

    You can use the data sharing feature of Amazon Redshift to achieve workload isolation across diverse analytics use cases and achieve business-critical SLAs without duplicating or moving the data.

    Amazon Redshift data sharing enables a producer cluster to share data objects with one or more consumer clusters, thereby eliminating data duplication. This facilitates collaboration across isolated clusters, allowing data to be shared for innovation and analytic services. Sharing can occur at various levels such as databases, schemas, tables, views, columns, and user-defined functions, offering fine-grained access control. It is recommended to use Workload Replicator for performance evaluation and comparison in a workload isolation POC.
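To make the producer and consumer sides concrete, the following is a minimal sketch of the data sharing statements, submitted here through the Redshift Data API with boto3. The share, schema, table, cluster, workgroup, database, and namespace values are placeholders; adapt them to your own producer and consumer environments.

import boto3

redshift_data = boto3.client("redshift-data")

# Producer side: create a datashare, add a schema and table, and grant it to the
# consumer namespace. Object names and namespace GUIDs are placeholders.
producer_sql = [
    "CREATE DATASHARE sales_share;",
    "ALTER DATASHARE sales_share ADD SCHEMA sales;",
    "ALTER DATASHARE sales_share ADD TABLE sales.orders;",
    "GRANT USAGE ON DATASHARE sales_share TO NAMESPACE 'consumer-namespace-guid';",
]
for sql in producer_sql:
    redshift_data.execute_statement(
        ClusterIdentifier="producer-cluster", Database="dev", DbUser="awsuser", Sql=sql
    )

# Consumer side: create a local database from the shared data.
redshift_data.execute_statement(
    WorkgroupName="consumer-workgroup",
    Database="dev",
    Sql="CREATE DATABASE sales_db FROM DATASHARE sales_share OF NAMESPACE 'producer-namespace-guid';",
)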

    The following sample architectures explain workload isolation using data sharing. The first diagram illustrates the architecture before using data sharing.

    The following diagram illustrates the architecture with data sharing.

    Migrating to Amazon Redshift

    If you’re interested in migrating from your existing data warehouse platform to Amazon Redshift, you can try out Amazon Redshift by developing a POC on a selected business use case. In this type of POC, it is recommended to use the AWS Analytics Automation Toolkit for setting up the environment, auto-copy or Redshift Auto Loader for data ingestion, and AWS SCT for schema conversion. When the development is complete, you can perform performance testing using Apache JMeter, which provides data points to measure price-performance and compare results with your existing platform. The following diagram illustrates this process.

    Moving to Amazon Redshift Serverless

    You can migrate your unpredictable and variable workloads to Amazon Redshift Serverless, which enables you to scale as and when needed and pay as per usage, making your infrastructure scalable and cost-efficient. If you’re migrating your full workload from provisioned (DC2, RA3) to serverless, you can use the Node Configuration Comparison utility for performance evaluation. The following diagram illustrates this workflow.

    Conclusion

    In a competitive environment, conducting a successful proof of concept is a strategic imperative for businesses aiming to validate the feasibility and effectiveness of new solutions. Amazon Redshift provides you with better price-performance compared to other cloud-centered data warehouses, and a large list of features that help you modernize and optimize your data pipelines. For more details, see Amazon Redshift continues its price-performance leadership.

    With the process discussed in this post and by choosing the tools needed for your specific use case, you can accelerate the process of conducting a POC. This allows you to collect the data metrics that can help you understand the potential challenges, benefits, and implications of implementing the proposed solution on a larger scale. A POC provides essential data points that evaluate price-performance as well as feasibility, which plays a vital role in decision-making.


    About the Authors

    Ziad WALI is an Acceleration Lab Solutions Architect at Amazon Web Services. He has over 10 years of experience in databases and data warehousing, where he enjoys building reliable, scalable, and efficient solutions. Outside of work, he enjoys sports and spending time in nature.

    Omama Khurshid is an Acceleration Lab Solutions Architect at Amazon Web Services. She focuses on helping customers across various industries build reliable, scalable, and efficient solutions. Outside of work, she enjoys spending time with her family, watching movies, listening to music, and learning new technologies.

    Srikant Das is an Acceleration Lab Solutions Architect at Amazon Web Services. His expertise lies in constructing robust, scalable, and efficient solutions. Beyond the professional sphere, he finds joy in travel and shares his experiences through insightful blogging on social media platforms.

    Create an end-to-end data strategy for Customer 360 on AWS

    Post Syndicated from Ismail Makhlouf original https://aws.amazon.com/blogs/big-data/create-an-end-to-end-data-strategy-for-customer-360-on-aws/

    Customer 360 (C360) provides a complete and unified view of a customer’s interactions and behavior across all touchpoints and channels. This view is used to identify patterns and trends in customer behavior, which can inform data-driven decisions to improve business outcomes. For example, you can use C360 to segment and create marketing campaigns that are more likely to resonate with specific groups of customers.

    In 2022, AWS commissioned a study conducted by the American Productivity and Quality Center (APQC) to quantify the Business Value of Customer 360. The following figure shows some of the metrics derived from the study. Organizations using C360 achieved 43.9% reduction in sales cycle duration, 22.8% increase in customer lifetime value, 25.3% faster time to market, and 19.1% improvement in net promoter score (NPS) rating.

    Without C360, businesses face missed opportunities, inaccurate reports, and disjointed customer experiences, leading to customer churn. However, building a C360 solution can be complicated. A Gartner Marketing survey found only 14% of organizations have successfully implemented a C360 solution, due to lack of consensus on what a 360-degree view means, challenges with data quality, and lack of cross-functional governance structure for customer data.

In this post, we discuss how you can use purpose-built AWS services to create an end-to-end data strategy for C360 that unifies and governs customer data and addresses these challenges. We structure it around five pillars that power C360: data collection, unification, analytics, activation, and data governance, along with a solution architecture that you can use for your implementation.

    The five pillars of a mature C360

    When you embark on creating a C360, you work with multiple use cases, types of customer data, and users and applications that require different tools. Building a C360 on the right datasets, adding new datasets over time while maintaining the quality of data, and keeping it secure needs an end-to-end data strategy for your customer data. You also need to provide tools that make it straightforward for your teams to build products that mature your C360.

    We recommend building your data strategy around five pillars of C360, as shown in the following figure. This starts with basic data collection, unifying and linking data from various channels related to unique customers, and progresses towards basic to advanced analytics for decision-making, and personalized engagement through various channels. As you mature in each of these pillars, you progress towards responding to real-time customer signals.

    The following diagram illustrates the functional architecture that combines the building blocks of a Customer Data Platform on AWS with additional components used to design an end-to-end C360 solution. This is aligned to the five pillars we discuss in this post.

    Pillar 1: Data collection

    As you start building your customer data platform, you have to collect data from various systems and touchpoints, such as your sales systems, customer support, web and social media, and data marketplaces. Think of the data collection pillar as a combination of ingestion, storage, and processing capabilities.

    Data ingestion

    You have to build ingestion pipelines based on factors like types of data sources (on-premises data stores, files, SaaS applications, third-party data), and flow of data (unbounded streams or batch data). AWS provides different services for building data ingestion pipelines:

    • AWS Glue is a serverless data integration service that ingests data in batches from on-premises databases and data stores in the cloud. It connects to more than 70 data sources and helps you build extract, transform, and load (ETL) pipelines without having to manage pipeline infrastructure. AWS Glue Data Quality checks for and alerts on poor data, making it straightforward to spot and fix issues before they harm your business.
    • Amazon AppFlow ingests data from software as a service (SaaS) applications like Google Analytics, Salesforce, SAP, and Marketo, giving you the flexibility to ingest data from more than 50 SaaS applications.
    • AWS Data Exchange makes it straightforward to find, subscribe to, and use third-party data for analytics. You can subscribe to data products that help enrich customer profiles, for example demographics data, advertising data, and financial markets data.
    • Amazon Kinesis ingests streaming events in real time from point-of-sales systems, clickstream data from mobile apps and websites, and social media data. You could also consider using Amazon Managed Streaming for Apache Kafka (Amazon MSK) for streaming events in real time.

    The following diagram illustrates the different pipelines to ingest data from various source systems using AWS services.

    Data storage

Structured, semi-structured, or unstructured batch data is stored in object storage because it is cost-efficient and durable. Amazon Simple Storage Service (Amazon S3) is a managed storage service with archiving features that can store petabytes of data with 11 nines of durability. Streaming data with low-latency needs is stored in Amazon Kinesis Data Streams for real-time consumption. This allows immediate analytics and actions for various downstream consumers, as seen with Riot Games’ central Riot Event Bus.

    Data processing

    Raw data is often cluttered with duplicates and irregular formats. You need to process this to make it ready for analysis. If you are consuming batch data and streaming data, consider using a framework that can handle both. A pattern such as the Kappa architecture views everything as a stream, simplifying the processing pipelines. Consider using Amazon Managed Service for Apache Flink to handle the processing work. With Managed Service for Apache Flink, you can clean and transform the streaming data and direct it to the appropriate destination based on latency requirements. You can also implement batch data processing using Amazon EMR on open source frameworks such as Apache Spark at 3.5 times better performance than the self-managed version. The architecture decision of using a batch or streaming processing system will depend on various factors; however, if you want to enable real-time analytics on your customer data, we recommend using a Kappa architecture pattern.

    Pillar 2: Unification

    To link the diverse data arriving from various touchpoints to a unique customer, you need to build an identity processing solution that identifies anonymous logins, stores useful customer information, links them to external data for better insights, and groups customers in domains of interest. Although the identity processing solution helps build the unified customer profile, we recommend considering this as part of your data processing capabilities. The following diagram illustrates the components of such a solution.

    The key components are as follows:

    • Identity resolution – Identity resolution is a deduplication solution, where records are matched to identify unique customers and prospects by linking multiple identifiers such as cookies, device identifiers, IP addresses, email IDs, and internal enterprise IDs to a known person or anonymous profile using privacy-compliant methods. This can be achieved using AWS Entity Resolution, which uses rules and machine learning (ML) techniques to match records and resolve identities. Alternatively, you can build identity graphs using Amazon Neptune for a single unified view of your customers.
    • Profile aggregation – When you’ve uniquely identified a customer, you can build applications in Managed Service for Apache Flink to consolidate all their metadata, from name to interaction history. Then, you transform this data into a concise format. Instead of showing every transaction detail, you can offer an aggregated spend value and a link to their Customer Relationship Management (CRM) record. For customer service interactions, provide an average CSAT score and a link to the call center system for a deeper dive into their communication history.
    • Profile enrichment – After you resolve a customer to a single identity, enhance their profile using various data sources. Enrichment typically involves adding demographic, behavioral, and geolocation data. You can use third-party data products from AWS Marketplace delivered through AWS Data Exchange to gain insights on income, consumption patterns, credit risk scores, and many more dimensions to further refine the customer experience.
    • Customer segmentation – After uniquely identifying and enriching a customer’s profile, you can segment them based on demographics like age, spend, income, and location using applications in Managed Service for Apache Flink. As you advance, you can incorporate AI services for more precise targeting techniques.

    After you have done the identity processing and segmentation, you need a storage capability to store the unique customer profile and provide search and query capabilities on top of it for downstream consumers to use the enriched customer data.

    The following diagram illustrates the unification pillar for a unified customer profile and single view of the customer for downstream applications.

    Unified customer profile

    Graph databases excel in modeling customer interactions and relationships, offering a comprehensive view of the customer journey. If you are dealing with billions of profiles and interactions, you can consider using Neptune, a managed graph database service on AWS. Organizations such as Zeta and Activision have successfully used Neptune to store and query billions of unique identifiers per month and millions of queries per second at millisecond response time.

    Single customer view

Although graph databases provide in-depth insights, they can be complex for regular applications to work with. It is prudent to consolidate this data into a single customer view, serving as a primary reference for downstream applications, ranging from ecommerce platforms to CRM systems. This consolidated view acts as a liaison between the data platform and customer-centric applications. For such purposes, we recommend using Amazon DynamoDB for its adaptability, scalability, and performance, resulting in an up-to-date and efficient customer database. This database also absorbs a high volume of writes from the activation systems as they learn new information about customers and feed it back.
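The following is a minimal sketch of how downstream applications might write and read a consolidated profile in DynamoDB. The table name and attribute shape are illustrative assumptions, not a prescribed schema.

import boto3

# Sketch: persist and fetch a consolidated customer profile in DynamoDB.
# Table name and attributes are illustrative only.
dynamodb = boto3.resource("dynamodb")
profiles = dynamodb.Table("unified_customer_profile")

profiles.put_item(
    Item={
        "customer_id": "C-12345",             # partition key
        "full_name": "Jane Doe",
        "lifetime_spend": 1842,               # aggregated value, not raw transactions
        "avg_csat": 4,
        "crm_record_url": "https://crm.example.com/customers/C-12345",
        "segments": ["high-value", "travel"],
    }
)

item = profiles.get_item(Key={"customer_id": "C-12345"}).get("Item")
print(item)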

    Pillar 3: Analytics

    The analytics pillar defines capabilities that help you generate insights on top of your customer data. Your analytics strategy applies to the wider organizational needs, not just C360. You can use the same capabilities to serve financial reporting, measure operational performance, or even monetize data assets. Strategize based on how your teams explore data, run analyses, wrangle data for downstream requirements, and visualize data at different levels. Plan on how you can enable your teams to use ML to move from descriptive to prescriptive analytics.

    The AWS modern data architecture shows a way to build a purpose-built, secure, and scalable data platform in the cloud. Learn from this to build querying capabilities across your data lake and the data warehouse.

    The following diagram breaks down the analytics capability into data exploration, visualization, data warehousing, and data collaboration. Let’s find out what role each of these components play in the context of C360.

    Data exploration

Data exploration helps unearth inconsistencies, outliers, or errors. By spotting these early on, your teams can have cleaner data integration for C360, which in turn leads to more accurate analytics and predictions. Consider the personas exploring the data, their technical skills, and the time to insight. For instance, data analysts who know how to write SQL can directly query the data residing in Amazon S3 using Amazon Athena. Users interested in visual exploration can do so using AWS Glue DataBrew. Data scientists or engineers can use Amazon EMR Studio or Amazon SageMaker Studio to explore data from a notebook, and for a low-code experience, you can use Amazon SageMaker Data Wrangler. Because these services directly query S3 buckets, you can explore the data as it lands in the data lake, reducing time to insight.
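As a small illustration of SQL-based exploration, the following Python sketch runs an Athena query against raw data in the data lake and prints the results. The database, table, and results location are placeholders.

import time
import boto3

# Sketch: explore raw data in the S3 data lake with Athena.
# Database, table, and output location are placeholders.
athena = boto3.client("athena")

query = athena.start_query_execution(
    QueryString="SELECT channel, COUNT(*) AS events FROM c360_raw.clickstream GROUP BY channel",
    QueryExecutionContext={"Database": "c360_raw"},
    ResultConfiguration={"OutputLocation": "s3://my-athena-results-bucket/c360/"},
)

query_id = query["QueryExecutionId"]
while True:
    state = athena.get_query_execution(QueryExecutionId=query_id)["QueryExecution"]["Status"]["State"]
    if state in ("SUCCEEDED", "FAILED", "CANCELLED"):
        break
    time.sleep(1)

if state == "SUCCEEDED":
    for row in athena.get_query_results(QueryExecutionId=query_id)["ResultSet"]["Rows"]:
        print([col.get("VarCharValue") for col in row["Data"]])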

    Visualization

    Turning complex datasets into intuitive visuals unravels hidden patterns in the data, and is crucial for C360 use cases. With this capability, you can design reports for different levels catering to varying needs: executive reports offering strategic overviews, management reports highlighting operational metrics, and detailed reports diving into the specifics. Such visual clarity helps your organization make informed decisions across all tiers, centralizing the customer’s perspective.

Amazon QuickSight offers scalable, serverless visualization capabilities. You can benefit from its ML integrations for automated insights like forecasting and anomaly detection or natural language querying with Amazon Q in QuickSight, direct data connectivity from various sources, and pay-per-session pricing. With QuickSight, you can embed dashboards into external websites and applications, and the SPICE engine enables rapid, interactive data visualization at scale. The following screenshot shows an example C360 dashboard built on QuickSight.

    Data warehouse

    Data warehouses are efficient in consolidating structured data from multifarious sources and serving analytics queries from a large number of concurrent users. Data warehouses can provide a unified, consistent view of a vast amount of customer data for C360 use cases. Amazon Redshift addresses this need by adeptly handling large volumes of data and diverse workloads. It provides strong consistency across datasets, allowing organizations to derive reliable, comprehensive insights about their customers, which is essential for informed decision-making. Amazon Redshift offers real-time insights and predictive analytics capabilities for analyzing data from terabytes to petabytes. With Amazon Redshift ML, you can embed ML on top of the data stored in the data warehouse with minimum development overhead. Amazon Redshift Serverless simplifies application building and makes it straightforward for companies to embed rich data analytics capabilities.

    Data collaboration

    You can securely collaborate and analyze collective datasets from your partners without sharing or copying one another’s underlying data using AWS Clean Rooms. You can bring together disparate data from across engagement channels and partner datasets to form a 360-degree view of your customers. AWS Clean Rooms can enhance C360 by enabling use cases like cross-channel marketing optimization, advanced customer segmentation, and privacy-compliant personalization. By safely merging datasets, it offers richer insights and robust data privacy, meeting business needs and regulatory standards.

    Pillar 4: Activation

The value of data diminishes the older it gets, leading to higher opportunity costs over time. In a survey conducted by InterSystems, 75% of the organizations surveyed believe untimely data inhibited business opportunities. In another survey, 58% of organizations (out of 560 respondents from the HBR Advisory Council and readers) stated they saw an increase in customer retention and loyalty using real-time customer analytics.

    You can achieve a maturity in C360 when you build the ability to act on all the insights acquired from the previous pillars we discussed in real time. For example, at this maturity level, you can act on customer sentiment based on the context you automatically derived with an enriched customer profile and integrated channels. For this you need to implement prescriptive decision-making on how to address the customer’s sentiment. To do this at scale, you have to use AI/ML services for decision-making. The following diagram illustrates the architecture to activate insights using ML for prescriptive analytics and AI services for targeting and segmentation.

    Use ML for the decision-making engine

    With ML, you can improve the overall customer experience—you can create predictive customer behavior models, design hyper-personalized offers, and target the right customer with the right incentive. You can build them using Amazon SageMaker, which features a suite of managed services mapped to the data science lifecycle, including data wrangling, model training, model hosting, model inference, model drift detection, and feature storage. SageMaker enables you to build and operationalize your ML models, infusing them back into your applications to produce the right insight to the right person at the right time.

    Amazon Personalize supports contextual recommendations, through which you can improve the relevance of recommendations by generating them within a context—for instance, device type, location, or time of day. Your team can get started without any prior ML experience using APIs to build sophisticated personalization capabilities in a few clicks. For more information, see Customize your recommendations by promoting specific items using business rules with Amazon Personalize.
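The following sketch shows how an application might request contextual recommendations from a Personalize campaign with boto3. The campaign ARN, user ID, and context keys are placeholders; context keys have to match the metadata fields defined in your Personalize dataset schema.

import boto3

# Sketch: request contextual recommendations from an Amazon Personalize campaign.
# Campaign ARN, user ID, and context keys are placeholders.
personalize_runtime = boto3.client("personalize-runtime")

response = personalize_runtime.get_recommendations(
    campaignArn="arn:aws:personalize:us-east-1:111122223333:campaign/c360-offers",
    userId="C-12345",
    numResults=5,
    context={
        "DEVICE_TYPE": "mobile",
        "LOCATION": "NYC",
    },
)

for item in response["itemList"]:
    print(item["itemId"], item.get("score"))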

    Activate channels across marketing, advertising, direct-to-consumer, and loyalty

    Now that you know who your customers are and who to reach out to, you can build solutions to run targeting campaigns at scale. With Amazon Pinpoint, you can personalize and segment communications to engage customers across multiple channels. For example, you can use Amazon Pinpoint to build engaging customer experiences through various communication channels like email, SMS, push notifications, and in-app notifications.

    Pillar 5: Data governance

    Establishing the right governance that balances control and access gives users trust and confidence in data. Imagine offering promotions on products that a customer doesn’t need, or bombarding the wrong customers with notifications. Poor data quality can lead to such situations, and ultimately results in customer churn. You have to build processes that validate data quality and take corrective actions. AWS Glue Data Quality can help you build solutions that validate the quality of data at rest and in transit, based on predefined rules.
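As one possible sketch, the following Python snippet registers a Glue Data Quality ruleset, written in Data Quality Definition Language (DQDL), against a Data Catalog table. The database and table names, rule choices, and thresholds are assumptions for illustration; verify the rule names and API shape against the Glue Data Quality documentation before using them.

import boto3

# Sketch: register a Glue Data Quality ruleset (DQDL) against a Data Catalog table.
# Database/table names and thresholds are placeholders.
glue = boto3.client("glue")

ruleset = """
Rules = [
    IsComplete "customer_id",
    Uniqueness "email" > 0.95,
    ColumnValues "age" between 0 and 120
]
"""

glue.create_data_quality_ruleset(
    Name="c360-customer-profile-rules",
    Ruleset=ruleset,
    TargetTable={"DatabaseName": "c360_curated", "TableName": "customer_profile"},
)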

To set up a cross-functional governance structure for customer data, you need a capability for governing and sharing data across your organization. With Amazon DataZone, admins and data stewards can manage and govern access to data, and consumers such as data engineers, data scientists, product managers, analysts, and other business users can discover, use, and collaborate with that data to drive insights. It streamlines data access, letting you find and use customer data, promotes team collaboration with shared data assets, and provides personalized analytics either via a web app or API on a portal. AWS Lake Formation makes sure data is accessed securely, guaranteeing the right people see the right data for the right reasons, which is crucial for effective cross-functional governance in any organization. Business metadata is stored and managed by Amazon DataZone and is underpinned by technical metadata and schema information registered in the AWS Glue Data Catalog. This technical metadata is used both by governance services such as Lake Formation and Amazon DataZone and by analytics services such as Amazon Redshift, Athena, and AWS Glue.

    Bringing it all together

    Using the following diagram as a reference, you can create projects and teams for building and operating different capabilities. For example, you can have a data integration team focus on the data collection pillar—you can then align functional roles, like data architects and data engineers. You can build your analytics and data science practices to focus on the analytics and activation pillars, respectively. Then you can create a specialized team for customer identity processing and for building the unified view of the customer. You can establish a data governance team with data stewards from different functions, security admins, and data governance policymakers to design and automate policies.

    Conclusion

    Building a robust C360 capability is fundamental for your organization to gain insights into your customer base. AWS Databases, Analytics, and AI/ML services can help streamline this process, providing scalability and efficiency. Following the five pillars to guide your thinking, you can build an end-to-end data strategy that defines the C360 view across the organization, makes sure data is accurate, and establishes cross-functional governance for customer data. You can categorize and prioritize the products and features you have to build within each pillar, select the right tool for the job, and build the skills you need in your teams.

    Visit AWS for Data Customer Stories to learn how AWS is transforming customer journeys, from the world’s largest enterprises to growing startups.


    About the Authors

    Ismail Makhlouf is a Senior Specialist Solutions Architect for Data Analytics at AWS. Ismail focuses on architecting solutions for organizations across their end-to-end data analytics estate, including batch and real-time streaming, big data, data warehousing, and data lake workloads. He primarily works with organizations in retail, ecommerce, FinTech, HealthTech, and travel to achieve their business objectives with well architected data platforms.

    Sandipan Bhaumik (Sandi) is a Senior Analytics Specialist Solutions Architect at AWS. He helps customers modernize their data platforms in the cloud to perform analytics securely at scale, reduce operational overhead, and optimize usage for cost-effectiveness and sustainability.

    Exploring real-time streaming for generative AI Applications

    Post Syndicated from Ali Alemi original https://aws.amazon.com/blogs/big-data/exploring-real-time-streaming-for-generative-ai-applications/

    Foundation models (FMs) are large machine learning (ML) models trained on a broad spectrum of unlabeled and generalized datasets. FMs, as the name suggests, provide the foundation to build more specialized downstream applications, and are unique in their adaptability. They can perform a wide range of different tasks, such as natural language processing, classifying images, forecasting trends, analyzing sentiment, and answering questions. This scale and general-purpose adaptability are what makes FMs different from traditional ML models. FMs are multimodal; they work with different data types such as text, video, audio, and images. Large language models (LLMs) are a type of FM and are pre-trained on vast amounts of text data and typically have application uses such as text generation, intelligent chatbots, or summarization.

    Streaming data facilitates the constant flow of diverse and up-to-date information, enhancing the models’ ability to adapt and generate more accurate, contextually relevant outputs. This dynamic integration of streaming data enables generative AI applications to respond promptly to changing conditions, improving their adaptability and overall performance in various tasks.

    To better understand this, imagine a chatbot that helps travelers book their travel. In this scenario, the chatbot needs real-time access to airline inventory, flight status, hotel inventory, latest price changes, and more. This data usually comes from third parties, and developers need to find a way to ingest this data and process the data changes as they happen.

    Batch processing is not the best fit in this scenario. When data changes rapidly, processing it in a batch may result in stale data being used by the chatbot, providing inaccurate information to the customer, which impacts the overall customer experience. Stream processing, however, can enable the chatbot to access real-time data and adapt to changes in availability and price, providing the best guidance to the customer and enhancing the customer experience.

Another example is an AI-driven observability and monitoring solution where FMs monitor real-time internal metrics of a system and produce alerts. When the model finds an anomaly or abnormal metric value, it should immediately produce an alert and notify the operator. However, the value of such important data diminishes significantly over time. These notifications should ideally be received within seconds or even while the issue is happening. If operators receive these notifications minutes or hours after the fact, such an insight is not actionable and has potentially lost its value. You can find similar use cases in other industries such as retail, car manufacturing, energy, and the financial industry.

    In this post, we discuss why data streaming is a crucial component of generative AI applications due to its real-time nature. We discuss the value of AWS data streaming services such as Amazon Managed Streaming for Apache Kafka (Amazon MSK), Amazon Kinesis Data Streams, Amazon Managed Service for Apache Flink, and Amazon Kinesis Data Firehose in building generative AI applications.

    In-context learning

LLMs are trained with point-in-time data and have no inherent ability to access fresh data at inference time. As new data appears, you will have to continuously fine-tune or further train the model. This is not only an expensive operation, but also very limiting in practice because the rate of new data generation far outpaces the speed of fine-tuning. Additionally, LLMs lack contextual understanding and rely solely on their training data, and are therefore prone to hallucinations. This means they can generate a fluent, coherent, and syntactically sound but factually incorrect response. They are also devoid of relevance, personalization, and context.

    LLMs, however, have the capacity to learn from the data they receive from the context to more accurately respond without modifying the model weights. This is called in-context learning, and can be used to produce personalized answers or provide an accurate response in the context of organization policies.

    For example, in a chatbot, data events could pertain to an inventory of flights and hotels or price changes that are constantly ingested to a streaming storage engine. Furthermore, data events are filtered, enriched, and transformed to a consumable format using a stream processor. The result is made available to the application by querying the latest snapshot. The snapshot constantly updates through stream processing; therefore, the up-to-date data is provided in the context of a user prompt to the model. This allows the model to adapt to the latest changes in price and availability. The following diagram illustrates a basic in-context learning workflow.

A commonly used in-context learning approach is a technique called Retrieval Augmented Generation (RAG). In RAG, you provide the relevant information, such as the most relevant policy documents and customer records, along with the user question in the prompt. This way, the LLM generates an answer to the user question using the additional information provided as context. To learn more about RAG, refer to Question answering using Retrieval Augmented Generation with foundation models in Amazon SageMaker JumpStart.
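The assembly step of RAG is simple to sketch in Python. In the following minimal example, query_vector_store() and call_llm() are hypothetical placeholders for your own retrieval layer (for example, a vector database lookup) and FM endpoint; only the prompt construction logic is shown.

# Minimal RAG sketch: retrieve context for the user question, then pass both to the model.
# query_vector_store() and call_llm() are hypothetical placeholders for your retrieval
# layer and FM endpoint.

def build_rag_prompt(question, context_documents):
    context_block = "\n\n".join(context_documents)
    return (
        "Answer the question using only the context below. "
        "If the answer is not in the context, say you don't know.\n\n"
        f"Context:\n{context_block}\n\n"
        f"Question: {question}\n"
        "Answer:"
    )

def answer_with_rag(question):
    documents = query_vector_store(question, top_k=3)   # hypothetical retrieval call
    prompt = build_rag_prompt(question, documents)
    return call_llm(prompt)                              # hypothetical FM invocation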

    A RAG-based generative AI application can only produce generic responses based on its training data and the relevant documents in the knowledge base. This solution falls short when a near-real-time personalized response is expected from the application. For example, a travel chatbot is expected to consider the user’s current bookings, available hotel and flight inventory, and more. Moreover, the relevant customer personal data (commonly known as the unified customer profile) is usually subject to change. If a batch process is employed to update the generative AI’s user profile database, the customer may receive dissatisfying responses based on old data.

    In this post, we discuss the application of stream processing to enhance a RAG solution used for building question answering agents with context from real-time access to unified customer profiles and organizational knowledge base.

    Near-real-time customer profile updates

    Customer records are typically distributed across data stores within an organization. For your generative AI application to provide a relevant, accurate, and up-to-date customer profile, it is vital to build streaming data pipelines that can perform identity resolution and profile aggregation across the distributed data stores. Streaming jobs constantly ingest new data to synchronize across systems and can perform enrichment, transformations, joins, and aggregations across windows of time more efficiently. Change data capture (CDC) events contain information about the source record, updates, and metadata such as time, source, classification (insert, update, or delete), and the initiator of the change.

    The following diagram illustrates an example workflow for CDC streaming ingestion and processing for unified customer profiles.

    In this section, we discuss the main components of a CDC streaming pattern required to support RAG-based generative AI applications.

    CDC streaming ingestion

A CDC replicator is a process that collects data changes from a source system (usually by reading transaction logs or binlogs) and writes CDC events with the exact same order they occurred in a streaming data stream or topic. This involves a log-based capture with tools such as AWS Database Migration Service (AWS DMS) or open source connectors such as Debezium for Apache Kafka connect. Apache Kafka Connect is part of the Apache Kafka environment, allowing data to be ingested from various sources and delivered to a variety of destinations. You can run your Apache Kafka connector on Amazon MSK Connect within minutes without worrying about configuration, setup, and operating an Apache Kafka cluster. You only need to upload your connector’s compiled code to Amazon Simple Storage Service (Amazon S3) and set up your connector with your workload’s specific configuration.

    There are also other methods for capturing data changes. For example, Amazon DynamoDB provides a feature for streaming CDC data to Amazon DynamoDB Streams or Kinesis Data Streams. Amazon S3 provides a trigger to invoke an AWS Lambda function when a new document is stored.

    Streaming storage

    Streaming storage functions as an intermediate buffer to store CDC events before they get processed. Streaming storage provides reliable storage for streaming data. By design, it is highly available and resilient to hardware or node failures and maintains the order of the events as they are written. Streaming storage can store data events either permanently or for a set period of time. This allows stream processors to read from part of the stream if there is a failure or a need for re-processing. Kinesis Data Streams is a serverless streaming data service that makes it straightforward to capture, process, and store data streams at scale. Amazon MSK is a fully managed, highly available, and secure service provided by AWS for running Apache Kafka.

    Stream processing

Stream processing systems should be designed for parallelism to handle high data throughput. They should partition the input stream between multiple tasks running on multiple compute nodes. Tasks should be able to send the result of one operation to the next one over the network, making it possible to process data in parallel while performing operations such as joins, filtering, enrichment, and aggregations. Stream processing applications should be able to process events with regard to event time for use cases where events could arrive late or where correct computation relies on the time events occur rather than the system time. For more information, refer to Notions of Time: Event Time and Processing Time.

Stream processors continuously produce results in the form of data events that need to be output to a target system. A target system could be any system that integrates directly with the processor or via streaming storage as an intermediary. Depending on the framework you choose for stream processing, you will have different options for target systems based on the available sink connectors. If you decide to write the results to an intermediary streaming storage, you can build a separate process that reads events and applies changes to the target system, such as running an Apache Kafka sink connector. Regardless of which option you choose, CDC data needs extra handling due to its nature. Because CDC events carry information about updates or deletes, it’s important that they merge into the target system in the right order. If changes are applied in the wrong order, the target system will be out of sync with its source.

    Apache Flink is a powerful stream processing framework known for its low latency and high throughput capabilities. It supports event time processing, exactly-once processing semantics, and high fault tolerance. Additionally, it provides native support for CDC data via a special structure called dynamic tables. Dynamic tables mimic the source database tables and provide a columnar representation of the streaming data. The data in dynamic tables changes with every event that is processed. New records can be appended, updated, or deleted at any time. Dynamic tables abstract away the extra logic you need to implement for each record operation (insert, update, delete) separately. For more information, refer to Dynamic Tables.

    With Amazon Managed Service for Apache Flink, you can run Apache Flink jobs and integrate with other AWS services. There are no servers and clusters to manage, and there is no compute and storage infrastructure to set up.

    AWS Glue is a fully managed extract, transform, and load (ETL) service, which means AWS handles the infrastructure provisioning, scaling, and maintenance for you. Although it’s primarily known for its ETL capabilities, AWS Glue can also be used for Spark streaming applications. AWS Glue can interact with streaming data services such as Kinesis Data Streams and Amazon MSK for processing and transforming CDC data. AWS Glue can also seamlessly integrate with other AWS services such as Lambda, AWS Step Functions, and DynamoDB, providing you with a comprehensive ecosystem for building and managing data processing pipelines.

    Unified customer profile

Unifying the customer profile across a variety of source systems requires the development of robust data pipelines. You need data pipelines that can bring and synchronize all records into one data store. This data store provides your organization with the holistic customer records view that is needed for operational efficiency of RAG-based generative AI applications. For building such a data store, an unstructured data store works best.

    An identity graph is a useful structure for creating a unified customer profile because it consolidates and integrates customer data from various sources, ensures data accuracy and deduplication, offers real-time updates, connects cross-systems insights, enables personalization, enhances customer experience, and supports regulatory compliance. This unified customer profile empowers the generative AI application to understand and engage with customers effectively, and adhere to data privacy regulations, ultimately enhancing customer experiences and driving business growth. You can build your identity graph solution using Amazon Neptune, a fast, reliable, fully managed graph database service.

    AWS provides a few other managed and serverless NoSQL storage service offerings for unstructured key-value objects. Amazon DocumentDB (with MongoDB compatibility) is a fast, scalable, highly available, and fully managed enterprise document database service that supports native JSON workloads. DynamoDB is a fully managed NoSQL database service that provides fast and predictable performance with seamless scalability.

    Near-real-time organizational knowledge base updates

Similar to customer records, internal knowledge repositories such as company policies and organizational documents are siloed across storage systems. This is typically unstructured data and is updated in a non-incremental fashion. Unstructured data can be made usable for AI applications through vector embeddings, a technique for representing high-dimensional data such as text files, images, and audio files as multi-dimensional numeric vectors.

AWS provides several vector engine services, such as Amazon OpenSearch Serverless, Amazon Kendra, and Amazon Aurora PostgreSQL-Compatible Edition with the pgvector extension for storing vector embeddings. Generative AI applications can enhance the user experience by transforming the user prompt into a vector and using it to query the vector engine to retrieve contextually relevant information. Both the prompt and the vector data retrieved are then passed to the LLM to generate a more precise and personalized response.

    The following diagram illustrates an example stream-processing workflow for vector embeddings.

    Knowledge base contents need to be converted to vector embeddings before being written to the vector data store. Amazon Bedrock or Amazon SageMaker can help you access the model of your choice and expose a private endpoint for this conversion. Furthermore, you can use libraries such as LangChain to integrate with these endpoints. Building a batch process can help you convert your knowledge base content to vector data and store it in a vector database initially. However, you need to rely on an interval to reprocess the documents to synchronize your vector database with changes in your knowledge base content. With a large number of documents, this process can be inefficient. Between these intervals, your generative AI application users will receive answers according to the old content, or will receive an inaccurate answer because the new content is not vectorized yet.

Stream processing is an ideal solution for these challenges. It initially produces events for the existing documents and then continuously monitors the source system, creating a document change event as soon as a change occurs. These events can be stored in streaming storage until they are processed by a streaming job. A streaming job reads these events, loads the content of the document, and transforms the content into an array of related tokens of words. Each token is further transformed into vector data via an API call to an embedding FM. Results are sent for storage to the vector store via a sink operator.
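The embedding call inside the streaming job might look like the following Python sketch, which uses Amazon Bedrock with the Titan text embeddings model as one possible embedding endpoint. The model ID, chunking strategy, and error handling are simplifying assumptions.

import json
import boto3

# Sketch of the embedding step inside the streaming job: convert a document chunk to a
# vector via Amazon Bedrock. The Titan embeddings model is one possible choice.
bedrock_runtime = boto3.client("bedrock-runtime")

def embed_text(chunk):
    response = bedrock_runtime.invoke_model(
        modelId="amazon.titan-embed-text-v1",
        contentType="application/json",
        accept="application/json",
        body=json.dumps({"inputText": chunk}),
    )
    payload = json.loads(response["body"].read())
    return payload["embedding"]

vector = embed_text("Employees may carry over up to five days of unused leave per year.")
print(len(vector))  # embedding dimension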

    If you’re using Amazon S3 for storing your documents, you can build an event-source architecture based on S3 object change triggers for Lambda. A Lambda function can create an event in the desired format and write that to your streaming storage.
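Such a Lambda function could be as simple as the following sketch, which converts each S3 event record into a compact document-change event and writes it to a Kinesis data stream. The stream name and event shape are illustrative assumptions.

import json
import boto3

# Sketch of a Lambda function triggered by S3 object-change notifications: convert each
# record into a document-change event and write it to a Kinesis data stream.
kinesis = boto3.client("kinesis")
STREAM_NAME = "document-change-events"  # placeholder

def handler(event, context):
    for record in event["Records"]:
        change_event = {
            "bucket": record["s3"]["bucket"]["name"],
            "key": record["s3"]["object"]["key"],
            "eventName": record["eventName"],
            "eventTime": record["eventTime"],
        }
        kinesis.put_record(
            StreamName=STREAM_NAME,
            Data=json.dumps(change_event),
            PartitionKey=change_event["key"],
        )
    return {"processed": len(event["Records"])}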

    You can also use Apache Flink to run as a streaming job. Apache Flink provides the native FileSystem source connector, which can discover existing files and read their contents initially. After that, it can continuously monitor your file system for new files and capture their content. The connector supports reading a set of files from distributed file systems such as Amazon S3 or HDFS with a format of plain text, Avro, CSV, Parquet, and more, and produces a streaming record. As a fully managed service, Managed Service for Apache Flink removes the operational overhead of deploying and maintaining Flink jobs, allowing you to focus on building and scaling your streaming applications. With seamless integration into the AWS streaming services such as Amazon MSK or Kinesis Data Streams, it provides features like automatic scaling, security, and resiliency, providing reliable and efficient Flink applications for handling real-time streaming data.

    Based on your DevOps preference, you can choose between Kinesis Data Streams or Amazon MSK for storing the streaming records. Kinesis Data Streams simplifies the complexities of building and managing custom streaming data applications, allowing you to focus on deriving insights from your data rather than infrastructure maintenance. Customers using Apache Kafka often opt for Amazon MSK due to its straightforwardness, scalability, and dependability in overseeing Apache Kafka clusters within the AWS environment. As a fully managed service, Amazon MSK takes on the operational complexities associated with deploying and maintaining Apache Kafka clusters, enabling you to concentrate on constructing and expanding your streaming applications.

Because a RESTful API integration suits the nature of this process, you need a framework that supports a stateful enrichment pattern via RESTful API calls to track failures and retry failed requests. Apache Flink again is a framework that can perform stateful operations at in-memory speed. To understand the best ways to make API calls via Apache Flink, refer to Common streaming data enrichment patterns in Amazon Kinesis Data Analytics for Apache Flink.

    Apache Flink provides native sink connectors for writing data to vector datastores such as Amazon Aurora for PostgreSQL with pgvector or Amazon OpenSearch Service with VectorDB. Alternatively, you can stage the Flink job’s output (vectorized data) in an MSK topic or a Kinesis data stream. OpenSearch Service provides support for native ingestion from Kinesis data streams or MSK topics. For more information, refer to Introducing Amazon MSK as a source for Amazon OpenSearch Ingestion and Loading streaming data from Amazon Kinesis Data Streams.

    Feedback analytics and fine-tuning

It’s important for data operation managers and AI/ML developers to get insight into the performance of the generative AI application and the FMs in use. To achieve that, you need to build data pipelines that calculate important key performance indicator (KPI) data based on user feedback and a variety of application logs and metrics. This information is useful for stakeholders to gain real-time insight into the performance of the FM, the application, and overall user satisfaction with the quality of support they receive from your application. You also need to collect and store the conversation history for further fine-tuning of your FMs to improve their ability to perform domain-specific tasks.

This use case fits very well in the streaming analytics domain. Your application should store each conversation in streaming storage. Your application can prompt users to rate each answer’s accuracy and their overall satisfaction. This data can be in the form of a binary choice or free-form text. It can be stored in a Kinesis data stream or MSK topic and processed to generate KPIs in real time. You can also put FMs to work on user sentiment analysis: FMs can analyze each answer and assign a category of user satisfaction.

Apache Flink’s architecture allows for complex data aggregation over windows of time. It also provides support for SQL querying over streams of data events. Therefore, by using Apache Flink, you can quickly analyze raw user inputs and generate KPIs in real time by writing familiar SQL queries. For more information, refer to Table API & SQL.
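The following PyFlink sketch illustrates the idea with a 5-minute tumbling-window KPI over a stream of feedback events. The source table definition, Kinesis connector options, stream name, and Region are assumptions to adapt to your environment, and the Kinesis connector dependency must be available to the Flink runtime (Managed Service for Apache Flink provides supported connectors).

from pyflink.table import EnvironmentSettings, TableEnvironment

# Illustrative Flink SQL KPI computation over a stream of user feedback events.
# Connector options, stream name, and Region are placeholders.
t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

t_env.execute_sql("""
CREATE TABLE user_feedback (
    conversation_id STRING,
    rating INT,
    event_time TIMESTAMP(3),
    WATERMARK FOR event_time AS event_time - INTERVAL '30' SECOND
) WITH (
    'connector' = 'kinesis',
    'stream' = 'user-feedback-stream',
    'aws.region' = 'us-east-1',
    'format' = 'json'
)
""")

# 5-minute tumbling-window KPIs: response volume and average rating per window.
kpis = t_env.sql_query("""
SELECT window_start, COUNT(*) AS responses, AVG(CAST(rating AS DOUBLE)) AS avg_rating
FROM TABLE(TUMBLE(TABLE user_feedback, DESCRIPTOR(event_time), INTERVAL '5' MINUTE))
GROUP BY window_start, window_end
""")
kpis.execute().print()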

    With Amazon Managed Service for Apache Flink Studio, you can build and run Apache Flink stream processing applications using standard SQL, Python, and Scala in an interactive notebook. Studio notebooks are powered by Apache Zeppelin and use Apache Flink as the stream processing engine. Studio notebooks seamlessly combine these technologies to make advanced analytics on data streams accessible to developers of all skill sets. With support for user-defined functions (UDFs), Apache Flink allows for building custom operators to integrate with external resources such as FMs for performing complex tasks such as sentiment analysis. You can use UDFs to compute various metrics or enrich user feedback raw data with additional insights such as user sentiment. To learn more about this pattern, refer to Proactively addressing customer concern in real-time with GenAI, Flink, Apache Kafka, and Kinesis.

    With Managed Service for Apache Flink Studio, you can deploy your Studio notebook as a streaming job with one click. You can use native sink connectors provided by Apache Flink to send the output to your storage of choice or stage it in a Kinesis data stream or MSK topic. Amazon Redshift and OpenSearch Service are both ideal for storing analytical data. Both engines provide native ingestion support from Kinesis Data Streams and Amazon MSK via a separate streaming pipeline to a data lake or data warehouse for analysis.

    Amazon Redshift uses SQL to analyze structured and semi-structured data across data warehouses and data lakes, using AWS-designed hardware and machine learning to deliver the best price-performance at scale. OpenSearch Service offers visualization capabilities powered by OpenSearch Dashboards and Kibana (1.5 to 7.10 versions).

You can use the outcome of such analysis combined with user prompt data for fine-tuning the FM when needed. SageMaker is the most straightforward way to fine-tune your FMs. Using Amazon S3 with SageMaker provides a powerful and seamless integration for fine-tuning your models. Amazon S3 serves as a scalable and durable object storage solution, enabling straightforward storage and retrieval of large datasets, training data, and model artifacts. SageMaker is a fully managed ML service that simplifies the entire ML lifecycle. By using Amazon S3 as the storage backend for SageMaker, you can benefit from the scalability, reliability, and cost-effectiveness of Amazon S3, while seamlessly integrating it with SageMaker training and deployment capabilities. This combination enables efficient data management, facilitates collaborative model development, and makes sure that ML workflows are streamlined and scalable, ultimately enhancing the overall agility and performance of the ML process. For more information, refer to Fine-tune Falcon 7B and other LLMs on Amazon SageMaker with @remote decorator.

    With a file system sink connector, Apache Flink jobs can deliver data to Amazon S3 in open format (such as JSON, Avro, Parquet, and more) files as data objects. If you prefer to manage your data lake using a transactional data lake framework (such as Apache Hudi, Apache Iceberg, or Delta Lake), all of these frameworks provide a custom connector for Apache Flink. For more details, refer to Create a low-latency source-to-data lake pipeline using Amazon MSK Connect, Apache Flink, and Apache Hudi.

    Summary

    For a generative AI application based on a RAG model, you need to consider building two data storage systems, and you need to build data operations that keep them up to date with all the source systems. Traditional batch jobs are not sufficient to process the size and diversity of the data you need to integrate with your generative AI application. Delays in processing the changes in source systems result in an inaccurate response and reduce the efficiency of your generative AI application. Data streaming enables you to ingest data from a variety of databases across various systems. It also allows you to transform, enrich, join, and aggregate data across many sources efficiently in near-real time. Data streaming provides a simplified data architecture to collect and transform users’ real-time reactions or comments on the application responses, helping you deliver and store the results in a data lake for model fine-tuning. Data streaming also helps you optimize data pipelines by processing only the change events, allowing you to respond to data changes more quickly and efficiently.

    Learn more about AWS data streaming services and get started building your own data streaming solution.


    About the Authors

    Ali Alemi is a Streaming Specialist Solutions Architect at AWS. Ali advises AWS customers with architectural best practices and helps them design real-time analytics data systems which are reliable, secure, efficient, and cost-effective. He works backward from customer’s use cases and designs data solutions to solve their business problems. Prior to joining AWS, Ali supported several public sector customers and AWS consulting partners in their application modernization journey and migration to the Cloud.

    Imtiaz (Taz) Sayed is the World-Wide Tech Leader for Analytics at AWS. He enjoys engaging with the community on all things data and analytics. He can be reached via LinkedIn.

    Introducing enhanced functionality for worker configuration management in Amazon MSK Connect

    Post Syndicated from Chinmayi Narasimhadevara original https://aws.amazon.com/blogs/big-data/introducing-enhanced-functionality-for-worker-configuration-management-in-amazon-msk-connect/

    Amazon MSK Connect is a fully managed service for Apache Kafka Connect. With a few clicks, MSK Connect allows you to deploy connectors that move data between Apache Kafka and external systems.

    MSK Connect now supports the ability to delete MSK Connect worker configurations, tag resources, and manage worker configurations and custom plugins using AWS CloudFormation. Together, these new capabilities make it straightforward to manage your MSK Connect resources and automate deployments through CI/CD pipelines.

    MSK Connect makes it effortless to stream data to and from Apache Kafka over a private connection without requiring infrastructure management expertise. With a few clicks, you can deploy connectors like an Amazon S3 sink connector for loading streaming data to Amazon Simple Storage Service (Amazon S3), deploy connectors developed by third parties like Debezium for streaming change logs from databases into Apache Kafka, or deploy your own connector customized for your use case.

MSK Connect integrates external systems or AWS services with Apache Kafka by continuously copying streaming data from a data source into your Apache Kafka cluster, or continuously copying data from your Apache Kafka cluster into a data sink. The connector can also perform lightweight tasks such as transformation, format conversion, or filtering data before delivering the data to a destination. You can use a plugin to create the connector; these custom plugins are resources that contain the code that defines connector logic.

The primary components of MSK Connect are workers. Each worker is a Java virtual machine (JVM) process that runs the connector logic based on the worker configuration provided. Worker configurations are resources that contain your connector configuration properties and can be reused across multiple connectors. Each worker comprises a set of tasks that copy the data in parallel.

    Today, we are announcing three new capabilities in MSK Connect:

    • The ability to delete worker configurations
    • Support for resource tags for enabling resource grouping, cost allocation and reporting, and access control with tag-based policies
    • Support in AWS CloudFormation to manage worker configurations and custom plugins

    In the following sections, we look at the new functionalities in more detail.

    Delete worker configurations

    Connectors for integrating Amazon Managed Streaming for Apache Kafka (Amazon MSK) with other AWS and partner services are usually created using a worker configuration (default or custom). The number of these configurations can grow as connectors are created and deleted, potentially creating configuration management issues.

    You can now use the new delete worker configuration API to delete unused configurations. The service checks that the worker configuration is not in use by any connectors before deleting it. Additionally, you can now use a prefix filter to list worker configurations and custom plugins using the ListWorkerConfigurations and ListCustomPlugins API calls. The prefix filter lets you list only the resources whose names start with a given prefix, so you can quickly identify and delete the ones you no longer need.
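    As an illustration, the following Boto3 sketch lists worker configurations whose names start with a given prefix and then deletes one that is no longer in use; the prefix and the worker configuration ARN are placeholders, and the delete call is rejected if a connector still references the configuration.

    import boto3
    
    client = boto3.client('kafkaconnect')
    
    # List only the worker configurations whose names start with a given prefix
    response = client.list_worker_configurations(namePrefix='dev-')
    for config in response['workerConfigurations']:
        print(config['name'], config['workerConfigurationArn'])
    
    # Delete an unused worker configuration by ARN; the service rejects the call
    # if the configuration is still used by a connector
    client.delete_worker_configuration(
        workerConfigurationArn='<UPDATE_WORKER_CONFIGURATION_ARN_HERE>'
    )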

    To test the new delete API, complete the following steps:

    1. On the Amazon MSK console, create a new worker configuration.
    2. Provide a name and optional description.
    3. In the Worker configuration section, enter your configuration code.

    MSK Connect Worker Configuration

    After you create the configuration, a Delete option is available on the configuration detail page (see the following screenshot) if the configuration is not being used in any connector.

    To support this new API, an additional field, workerConfigurationState, has been added so you can more easily track the state of a worker configuration. This new state is returned in the API responses for CreateWorkerConfiguration, DescribeWorkerConfiguration, and ListWorkerConfigurations.

    MSK Connect Worker Configuration

    4. Choose Delete to delete the worker configuration.
    5. In the confirmation pop-up, enter the name of the worker configuration, then choose Delete.

    Delete MSKC Worker Configuration

    If the worker configuration is being used with any connector, the Delete option is disabled, as shown in the following screenshot.

    Resource tags

    MSK Connect now also has support for resource tags. Tags are key-value metadata that can be associated with AWS service resources. You can add tags to connectors, custom plugins, and worker configurations to organize and find resources used across AWS services. In the following screenshots, our example MSK Connect connector, plugin, and worker configuration have been tagged with the resource tag key project and value demo-tags.

    You can now tag your Amazon Elastic Compute Cloud (Amazon EC2) and Amazon S3 resources with the same project name, for example. Then you can use the tag to search for all resources linked to this particular project for cost allocation, reporting, resource grouping, or access control. MSK Connect supports adding tags when creating resources, applying tags to an existing resource, removing tags from a resource, and querying tags associated with a resource.
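    For reference, the following Boto3 sketch applies the same project tag to an MSK Connect resource and reads it back; the resource ARN is a placeholder for your connector, custom plugin, or worker configuration ARN.

    import boto3
    
    client = boto3.client('kafkaconnect')
    resource_arn = '<UPDATE_MSK_CONNECT_RESOURCE_ARN_HERE>'
    
    # Apply the project tag used in this example
    client.tag_resource(resourceArn=resource_arn, tags={'project': 'demo-tags'})
    
    # Query the tags associated with the resource
    print(client.list_tags_for_resource(resourceArn=resource_arn)['tags'])
    
    # Remove the tag when it is no longer needed
    client.untag_resource(resourceArn=resource_arn, tagKeys=['project'])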

    AWS CloudFormation support

    Previously, you were only able to provision an MSK Connect connector with AWS CloudFormation by using an existing worker configuration. With this new feature, you can now perform CREATE, READ, UPDATE, DELETE, and LIST operations on connectors, and create and add new worker configurations using AWS CloudFormation.

    The following code is an example of creating a worker configuration:

    {
      "Type": "AWS::KafkaConnect::WorkerConfiguration",
      "Properties": {
        "Name": "WorkerConfigurationName",
        "Description": "WorkerConfigurationDescription",
        "PropertiesFileContent": String,
        "Tags": [ Tag, … ]
      }
    }

    The return values are as follows:

    • ARN of the newly created worker configuration
    • State of the new worker configuration
    • Creation time of new worker configuration
    • Latest revision of the new worker configuration

    Conclusion

    MSK Connect is a fully managed service that provisions the required resources, monitors the health and delivery state of connectors, maintains the underlying hardware, and auto scales connectors to balance the workloads. In this post, we discussed the new features that were added to MSK Connect, which streamline connector and worker management with the introduction of APIs for deleting worker configurations, tagging MSK Connect resources, and support in AWS CloudFormation to create non-default worker configurations.

    These capabilities are available in all AWS Regions where Amazon MSK Connect is available. For a list of Region availability, refer to AWS Services by Region. To learn more about MSK Connect, visit the Amazon MSK Connect Developer Guide.


    About the Authors

    Chinmayi Narasimhadevara is a Solutions Architect focused on Big Data and Analytics at Amazon Web Services. Chinmayi has over 20 years of experience in information technology. She helps AWS customers build advanced, highly scalable, and performant solutions.

    Harita Pappu is a Technical Account Manager based out of California. She has over 18 years of experience in the software industry building and scaling applications. She is passionate about new technologies and is focused on helping customers achieve cost optimization and operational excellence.

    Build an end-to-end serverless streaming pipeline with Apache Kafka on Amazon MSK using Python

    Post Syndicated from Masudur Rahaman Sayem original https://aws.amazon.com/blogs/big-data/build-an-end-to-end-serverless-streaming-pipeline-with-apache-kafka-on-amazon-msk-using-python/

    The volume of data generated globally continues to surge, from gaming, retail, and finance, to manufacturing, healthcare, and travel. Organizations are looking for more ways to quickly use the constant inflow of data to innovate for their businesses and customers. They have to reliably capture, process, analyze, and load the data into a myriad of data stores, all in real time.

    Apache Kafka is a popular choice for these real-time streaming needs. However, it can be challenging to set up a Kafka cluster along with other data processing components that scale automatically depending on your application’s needs. You risk under-provisioning for peak traffic, which can lead to downtime, or over-provisioning for base load, leading to wastage. AWS offers multiple serverless services like Amazon Managed Streaming for Apache Kafka (Amazon MSK), Amazon Data Firehose, Amazon DynamoDB, and AWS Lambda that scale automatically depending on your needs.

    In this post, we explain how you can use some of these services, including MSK Serverless, to build a serverless data platform to meet your real-time needs.

    Solution overview

    Let’s imagine a scenario. You’re responsible for managing thousands of modems for an internet service provider deployed across multiple geographies. You want to monitor the modem connectivity quality that has a significant impact on customer productivity and satisfaction. Your deployment includes different modems that need to be monitored and maintained to ensure minimal downtime. Each device transmits thousands of 1 KB records every second, such as CPU usage, memory usage, alarm, and connection status. You want real-time access to this data so you can monitor performance in real time, and detect and mitigate issues quickly. You also need longer-term access to this data for machine learning (ML) models to run predictive maintenance assessments, find optimization opportunities, and forecast demand.

    Your clients that gather the data onsite are written in Python, and they can send all the data to Apache Kafka topics in Amazon MSK. For your application’s low-latency and real-time data access, you can use Lambda and DynamoDB. For longer-term data storage, you can use the managed, serverless delivery service Amazon Data Firehose to send data to your data lake.

    The following diagram shows how you can build this end-to-end serverless application.

    end-to-end serverless application

    Let’s follow the steps in the following sections to implement this architecture.

    Create a serverless Kafka cluster on Amazon MSK

    We use Amazon MSK to ingest real-time telemetry data from modems. Creating a serverless Kafka cluster is straightforward on Amazon MSK. It only takes a few minutes using the AWS Management Console or the AWS SDK. To use the console, refer to Getting started using MSK Serverless clusters. You create a serverless cluster, an AWS Identity and Access Management (IAM) role, and a client machine.
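    If you prefer the AWS SDK to the console, the following minimal Boto3 sketch creates an MSK Serverless cluster with IAM authentication; the subnet and security group IDs are placeholders for your VPC, and the cluster name matches the one used later in this post.

    import boto3
    
    kafka = boto3.client('kafka', region_name='<UPDATE_AWS_REGION_NAME_HERE>')
    
    # Create a serverless cluster with IAM client authentication
    response = kafka.create_cluster_v2(
        ClusterName='myCluster',
        Serverless={
            'VpcConfigs': [
                {
                    'SubnetIds': ['<UPDATE_SUBNET_ID_1>', '<UPDATE_SUBNET_ID_2>'],
                    'SecurityGroupIds': ['<UPDATE_SECURITY_GROUP_ID>'],
                }
            ],
            'ClientAuthentication': {'Sasl': {'Iam': {'Enabled': True}}},
        },
    )
    print(response['ClusterArn'])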

    Create a Kafka topic using Python

    When your cluster and client machine are ready, SSH to your client machine and install Kafka Python and the MSK IAM library for Python.

    • Run the following commands to install Kafka Python and the MSK IAM library:
    pip install kafka-python
    
    pip install aws-msk-iam-sasl-signer-python
    • Create a new file called createTopic.py.
    • Copy the following code into this file, replacing the bootstrap_servers and region information with the details for your cluster. For instructions on retrieving the bootstrap_servers information for your MSK cluster, see Getting the bootstrap brokers for an Amazon MSK cluster.
    from kafka.admin import KafkaAdminClient, NewTopic
    from aws_msk_iam_sasl_signer import MSKAuthTokenProvider
    
    # AWS region where MSK cluster is located
    region= '<UPDATE_AWS_REGION_NAME_HERE>'
    
    # Class to provide MSK authentication token
    class MSKTokenProvider():
        def token(self):
            token, _ = MSKAuthTokenProvider.generate_auth_token(region)
            return token
    
    # Create an instance of MSKTokenProvider class
    tp = MSKTokenProvider()
    
    # Initialize KafkaAdminClient with required configurations
    admin_client = KafkaAdminClient(
        bootstrap_servers='<UPDATE_BOOTSTRAP_SERVER_STRING_HERE>',
        security_protocol='SASL_SSL',
        sasl_mechanism='OAUTHBEARER',
        sasl_oauth_token_provider=tp,
        client_id='client1',
    )
    
    # create topic
    topic_name="mytopic"
    topic_list =[NewTopic(name=topic_name, num_partitions=1, replication_factor=2)]
    existing_topics = admin_client.list_topics()
    if(topic_name not in existing_topics):
        admin_client.create_topics(topic_list)
        print("Topic has been created")
    else:
        print("Topic already exists. Existing topics: " + str(existing_topics))
    
    • Run the createTopic.py script to create a new Kafka topic called mytopic on your serverless cluster:
    python createTopic.py

    Produce records using Python

    Let’s generate some sample modem telemetry data.

    • Create a new file called kafkaDataGen.py.
    • Copy the following code into this file, updating the BROKERS and region information with the details for your cluster:
    from kafka import KafkaProducer
    from aws_msk_iam_sasl_signer import MSKAuthTokenProvider
    import json
    import random
    from datetime import datetime
    topicname='mytopic'
    
    BROKERS = '<UPDATE_BOOTSTRAP_SERVER_STRING_HERE>'
    region= '<UPDATE_AWS_REGION_NAME_HERE>'
    class MSKTokenProvider():
        def token(self):
            token, _ = MSKAuthTokenProvider.generate_auth_token(region)
            return token
    
    tp = MSKTokenProvider()
    
    producer = KafkaProducer(
        bootstrap_servers=BROKERS,
        value_serializer=lambda v: json.dumps(v).encode('utf-8'),
        retry_backoff_ms=500,
        request_timeout_ms=20000,
        security_protocol='SASL_SSL',
        sasl_mechanism='OAUTHBEARER',
        sasl_oauth_token_provider=tp,)
    
    # Method to get a random model name
    def getModel():
        products=["Ultra WiFi Modem", "Ultra WiFi Booster", "EVG2000", "Sagemcom 5366 TN", "ASUS AX5400"]
        randomnum = random.randint(0, 4)
        return (products[randomnum])
    
    # Method to get a random interface status
    def getInterfaceStatus():
        status=["connected", "connected", "connected", "connected", "connected", "connected", "connected", "connected", "connected", "connected", "connected", "connected", "down", "down"]
        randomnum = random.randint(0, 13)
        return (status[randomnum])
    
    # Method to get a random CPU usage
    def getCPU():
        i = random.randint(50, 100)
        return (str(i))
    
    # Method to get a random memory usage
    def getMemory():
        i = random.randint(1000, 1500)
        return (str(i))
        
    # Method to generate sample data
    def generateData():
        
        model=getModel()
        deviceid='dvc' + str(random.randint(1000, 10000))
        interface='eth4.1'
        interfacestatus=getInterfaceStatus()
        cpuusage=getCPU()
        memoryusage=getMemory()
        now = datetime.now()
        event_time = now.strftime("%Y-%m-%d %H:%M:%S")
        
        modem_data={}
        modem_data["model"]=model
        modem_data["deviceid"]=deviceid
        modem_data["interface"]=interface
        modem_data["interfacestatus"]=interfacestatus
        modem_data["cpuusage"]=cpuusage
        modem_data["memoryusage"]=memoryusage
        modem_data["event_time"]=event_time
        return modem_data
    
    # Continuously generate and send data
    while True:
        data =generateData()
        print(data)
        try:
            future = producer.send(topicname, value=data)
            producer.flush()
            record_metadata = future.get(timeout=10)
            
        except Exception as e:
            print(f"Failed to send record: {e}")
    
    • Run the kafkaDataGen.py to continuously generate random data and publish it to the specified Kafka topic:
    python kafkaDataGen.py

    Store events in Amazon S3

    Now you store all the raw event data in an Amazon Simple Storage Service (Amazon S3) data lake for analytics. You can use the same data to train ML models. The integration with Amazon Data Firehose allows Amazon MSK to seamlessly load data from your Apache Kafka clusters into an S3 data lake. Complete the following steps to continuously stream data from Kafka to Amazon S3, eliminating the need to build or manage your own connector applications:

    • On the Amazon S3 console, create a new bucket. You can also use an existing bucket.
    • Create a new folder in your S3 bucket called streamingDataLake.
    • On the Amazon MSK console, choose your MSK Serverless cluster.
    • On the Actions menu, choose Edit cluster policy.

    cluster policy

    • Select Include Firehose service principal and choose Save changes.

    firehose service principal

    • On the S3 delivery tab, choose Create delivery stream.

    delivery stream

    • For Source, choose Amazon MSK.
    • For Destination, choose Amazon S3.

    source and destination

    • For Amazon MSK cluster connectivity, select Private bootstrap brokers.
    • For Topic, enter a topic name (for this post, mytopic).

    source settings

    • For S3 bucket, choose Browse and choose your S3 bucket.
    • Enter streamingDataLake as your S3 bucket prefix.
    • Enter streamingDataLakeErr as your S3 bucket error output prefix.

    destination settings

    • Choose Create delivery stream.

    create delivery stream

    You can verify that the data was written to your S3 bucket. You should see that the streamingDataLake directory was created and the files are stored in partitions.

    amazon s3
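    The console steps above can also be scripted. The following Boto3 sketch creates a comparable Firehose stream with the MSK Serverless cluster as its source and your S3 bucket as its destination; the ARNs are placeholders, and the IAM role is an assumption that must grant Firehose permission to read from the cluster and write to the bucket.

    import boto3
    
    firehose = boto3.client('firehose', region_name='<UPDATE_AWS_REGION_NAME_HERE>')
    
    firehose.create_delivery_stream(
        DeliveryStreamName='msk-to-s3-delivery-stream',
        DeliveryStreamType='MSKAsSource',
        MSKSourceConfiguration={
            'MSKClusterARN': '<UPDATE_MSK_CLUSTER_ARN_HERE>',
            'TopicName': 'mytopic',
            'AuthenticationConfiguration': {
                'RoleARN': '<UPDATE_FIREHOSE_ROLE_ARN_HERE>',
                'Connectivity': 'PRIVATE',  # private bootstrap brokers, as selected in the console steps
            },
        },
        ExtendedS3DestinationConfiguration={
            'RoleARN': '<UPDATE_FIREHOSE_ROLE_ARN_HERE>',
            'BucketARN': 'arn:aws:s3:::<UPDATE_S3_BUCKET_NAME_HERE>',
            'Prefix': 'streamingDataLake/',
            'ErrorOutputPrefix': 'streamingDataLakeErr/',
        },
    )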

    Store events in DynamoDB

    For the last step, you store the most recent modem data in DynamoDB. This allows the client application to access the modem status and interact with the modem remotely from anywhere, with low latency and high availability. Lambda seamlessly works with Amazon MSK. Lambda internally polls for new messages from the event source and then synchronously invokes the target Lambda function. Lambda reads the messages in batches and provides these to your function as an event payload.

    Let’s first create a table in DynamoDB. Refer to DynamoDB API permissions: Actions, resources, and conditions reference to verify that your client machine has the necessary permissions.

    • Create a new file called createTable.py.
    • Copy the following code into the file, updating the region information:
    import boto3
    region='<UPDATE_AWS_REGION_NAME_HERE>'
    dynamodb = boto3.client('dynamodb', region_name=region)
    table_name = 'device_status'
    key_schema = [
        {
            'AttributeName': 'deviceid',
            'KeyType': 'HASH'
        }
    ]
    attribute_definitions = [
        {
            'AttributeName': 'deviceid',
            'AttributeType': 'S'
        }
    ]
    # Create the table with on-demand capacity mode
    dynamodb.create_table(
        TableName=table_name,
        KeySchema=key_schema,
        AttributeDefinitions=attribute_definitions,
        BillingMode='PAY_PER_REQUEST'
    )
    print(f"Table '{table_name}' created with on-demand capacity mode.")
    • Run the createTable.py script to create a table called device_status in DynamoDB:
    python createTable.py

    Now let’s configure the Lambda function.

    • On the Lambda console, choose Functions in the navigation pane.
    • Choose Create function.
    • Select Author from scratch.
    • For Function name, enter a name (for example, my-notification-kafka).
    • For Runtime, choose Python 3.11.
    • For Permissions, select Use an existing role and choose a role with permissions to read from your cluster.
    • Create the function.

    On the Lambda function configuration page, you can now configure sources, destinations, and your application code.

    • Choose Add trigger.
    • For Trigger configuration, enter MSK to configure Amazon MSK as a trigger for the Lambda source function.
    • For MSK cluster, enter myCluster.
    • Deselect Activate trigger, because you haven’t configured your Lambda function yet.
    • For Batch size, enter 100.
    • For Starting position, choose Latest.
    • For Topic name, enter a name (for example, mytopic).
    • Choose Add.
    • On the Lambda function details page, on the Code tab, enter the following code:
    import base64
    import boto3
    import json
    
    # Create the DynamoDB client once so it is reused across invocations
    dynamodb = boto3.client('dynamodb')
    table_name = 'device_status'
    
    def lambda_handler(event, context):
        # The MSK trigger groups records by "<topic>-<partition>"
        for topic_partition, base64records in event['records'].items():
            raw_records = [base64.b64decode(x["value"]).decode('utf-8') for x in base64records]
            
            for record in raw_records:
                item = json.loads(record)
                
                # Write the latest status for this modem to the DynamoDB table
                dynamodb.put_item(
                    TableName=table_name,
                    Item={
                        'deviceid': {'S': item['deviceid']},
                        'interface': {'S': item['interface']},
                        'interfacestatus': {'S': item['interfacestatus']},
                        'cpuusage': {'S': item['cpuusage']},
                        'memoryusage': {'S': item['memoryusage']},
                        'event_time': {'S': item['event_time']},
                    }
                )
                
                print(f"Item for device {item['deviceid']} written to DynamoDB")
    • Deploy the Lambda function.
    • On the Configuration tab, choose Edit to edit the trigger.

    edit trigger

    • Select Activate trigger to enable it, then choose Save.
    • On the DynamoDB console, choose Explore items in the navigation pane.
    • Select the table device_status.

    You will see Lambda is writing events generated in the Kafka topic to DynamoDB.

    ddb table
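    Because the table uses deviceid as its key and each put_item call overwrites the previous status for that device, a client application can read a modem’s latest status with a single low-latency lookup. The following sketch assumes a device ID that appears in your generated data:

    import boto3
    
    dynamodb = boto3.client('dynamodb', region_name='<UPDATE_AWS_REGION_NAME_HERE>')
    
    # Fetch the most recent status for a single modem
    response = dynamodb.get_item(
        TableName='device_status',
        Key={'deviceid': {'S': '<UPDATE_DEVICE_ID_HERE>'}}
    )
    print(response.get('Item'))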

    Summary

    Streaming data pipelines are critical for building real-time applications. However, setting up and managing the infrastructure can be daunting. In this post, we walked through how to build a serverless streaming pipeline on AWS using Amazon MSK, Lambda, DynamoDB, Amazon Data Firehose, and other services. The key benefits are no servers to manage, automatic scalability of the infrastructure, and a pay-as-you-go model using fully managed services.

    Ready to build your own real-time pipeline? Get started today with a free AWS account. With the power of serverless, you can focus on your application logic while AWS handles the undifferentiated heavy lifting. Let’s build something awesome on AWS!


    About the Authors

    Masudur Rahaman Sayem is a Streaming Data Architect at AWS. He works with AWS customers globally to design and build data streaming architectures to solve real-world business problems. He specializes in optimizing solutions that use streaming data services and NoSQL. Sayem is very passionate about distributed computing.

    Michael Oguike is a Product Manager for Amazon MSK. He is passionate about using data to uncover insights that drive action. He enjoys helping customers from a wide range of industries improve their businesses using data streaming. Michael also loves learning about behavioral science and psychology from books and podcasts.

    Announcing data filtering for Amazon Aurora MySQL zero-ETL integration with Amazon Redshift

    Post Syndicated from Jyoti Aggarwal original https://aws.amazon.com/blogs/big-data/announcing-data-filtering-for-amazon-aurora-mysql-zero-etl-integration-with-amazon-redshift/

    As your organization becomes more data driven and uses data as a source of competitive advantage, you’ll want to run analytics on your data to better understand your core business drivers to grow sales, reduce costs, and optimize your business. To run analytics on your operational data, you might build a solution that is a combination of a database, a data warehouse, and an extract, transform, and load (ETL) pipeline. ETL is the process data engineers use to combine data from different sources.

    To reduce the effort involved in building and maintaining ETL pipelines between transactional databases and data warehouses, AWS announced Amazon Aurora zero-ETL integration with Amazon Redshift at AWS re:Invent 2022; it is now generally available (GA) for Amazon Aurora MySQL-Compatible Edition 3.05.0.

    AWS is now announcing data filtering on zero-ETL integrations, enabling you to bring in selective data from the database instance on zero-ETL integrations between Amazon Aurora MySQL and Amazon Redshift. This feature allows you to select individual databases and tables to be replicated to your Redshift data warehouse for analytics use cases.

    In this post, we provide an overview of use cases where you can use this feature, and provide step-by-step guidance on how to get started with near real time operational analytics using this feature.

    Data filtering use cases

    Data filtering allows you to choose the databases and tables to be replicated from Amazon Aurora MySQL to Amazon Redshift. You can apply multiple filters to the zero-ETL integration, allowing you to tailor the replication to your specific needs. Data filtering applies either an exclude or include filter rule, and can use regular expressions to match multiple databases and tables.

    In this section, we discuss some common use cases for data filtering.

    Improve data security by excluding tables containing PII data from replication

    Operational databases often contain personally identifiable information (PII). This is information that is sensitive in nature, and can include information such as mailing addresses, customer verification documentation, or credit card information.

    Due to strict security compliance regulations, you may not want to use PII for your analytics use cases. Data filtering allows you to filter out databases or tables containing PII data, excluding them from replication to Amazon Redshift. This improves data security and compliance for your analytics workloads.

    Save on storage costs and manage analytics workloads by replicating tables required for specific use cases

    Operational databases often contain many different datasets that aren’t useful for analytics. This includes supplementary data, specific application data, and multiple copies of the same dataset for different applications.

    Moreover, it’s common to build different use cases on different Redshift warehouses. This architecture requires different datasets to be available in individual endpoints.

    Data filtering allows you to only replicate the datasets that are required for your use cases. This can save costs by eliminating the need to store data that is not being used.

    You can also modify existing zero-ETL integrations to apply more restrictive data replication where desired. If you add a data filter to an existing integration, Aurora will fully reevaluate the data being replicated with the new filter. This will remove the newly filtered data from the target Redshift endpoint.

    For more information about quotas for Aurora zero-ETL integrations with Amazon Redshift, refer to Quotas.

    Start with small data replication and incrementally add tables as required

    As more analytics use cases are developed on Amazon Redshift, you may want to add more tables to an individual zero-ETL replication. Rather than replicating all tables to Amazon Redshift to satisfy the chance that they may be used in the future, data filtering allows you to start small with a subset of tables from your Aurora database and incrementally add more tables to the filter as they’re required.

    After a data filter on a zero-ETL integration is updated, Aurora will fully reevaluate the entire filter as if the previous filter didn’t exist, so workloads using previously replicated tables aren’t impacted by the addition of new tables.

    Improve individual workload performance by load balancing replication processes

    For large transactional databases, you may need to load balance the replication and any downstream processing across multiple Redshift clusters. This reduces the compute requirements for any individual Redshift endpoint and lets you split workloads across multiple endpoints. By load balancing workloads across multiple Redshift endpoints, you can effectively create a data mesh architecture, where endpoints are appropriately sized for individual workloads. This can improve performance and lower overall cost.

    Data filtering allows you to replicate different databases and tables to separate Redshift endpoints.

    The following figure shows how you could use data filters on zero-ETL integrations to split different databases in Aurora to separate Redshift endpoints.

    Example use case

    Consider the TICKIT database. The TICKIT sample database contains data from a fictional company where users can buy and sell tickets for various events. The company’s business analysts want to use the data that is stored in their Aurora MySQL database to generate various metrics, and would like to perform this analysis in near real time. For this reason, the company has identified zero-ETL as a potential solution.

    Throughout their investigation of the datasets required, the company’s analysts noted that the users table contains personal information about their customers that is not useful for their analytics requirements. Therefore, they want to replicate all data except the users table and will use zero-ETL’s data filtering to do so.

    Setup

    Start by following the steps in Getting started guide for near-real time operational analytics using Amazon Aurora zero-ETL integration with Amazon Redshift to create a new Aurora MySQL database, Amazon Redshift Serverless endpoint, and zero-ETL integration. Then open the Redshift query editor v2 and run the following query to show that data from the users table has been replicated successfully:

    select * from aurora_zeroetl.demodb.users;

    Data filters

    Data filters are applied directly to the zero-ETL integration on Amazon Relational Database Service (Amazon RDS). You can define multiple filters for a single integration, and each filter is defined as either an Include or Exclude filter type. Data filters apply a pattern to existing and future database tables to determine which filter should be applied.
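    The walkthrough that follows applies a filter through the Amazon RDS console, and a later section shows the AWS CLI equivalent. If you manage integrations with the AWS SDK for Python (Boto3) instead, the same change can be sketched as follows; the integration identifier is a placeholder, and this assumes the ModifyIntegration API with its DataFilter parameter is available in your SDK version.

    import boto3
    
    rds = boto3.client('rds')
    
    # Replicate everything except the demodb.users table
    # (filters are evaluated left to right, so the exclude overrides the include)
    rds.modify_integration(
        IntegrationIdentifier='<integration identifier>',
        DataFilter='include: *.*, exclude: demodb.users',
    )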

    Apply a data filter

    To apply a filter to remove the users table from the zero-ETL integration, complete the following steps:

    1. On the Amazon RDS console, choose Zero-ETL integrations in the navigation pane.
    2. Choose the zero-ETL integration to add a filter to.

    The default filter is to include all databases and tables represented by an include:*.* filter.

    3. Choose Modify.
    4. Choose Add filter in the Source section.
    5. For Choose filter type, choose Exclude.
    6. For Filter expression, enter the expression demodb.users.

    Filter expression order matters. Filters are evaluated left to right, top to bottom, and subsequent filters will override previous filters. In this example, Aurora will evaluate that every table should be included (filter 1) and then evaluate that the demodb.users table should be excluded (filter 2). The exclusion filter therefore overrides the inclusion because it’s after the inclusion filter.

    7. Choose Continue.
    8. Review the changes, making sure that the order of the filters is correct, and choose Save changes.

    The filter will be added and the integration will be in a Modifying state until the changes have been applied. This can take up to 30 minutes. To check whether the changes have finished applying, choose the zero-ETL integration and check its status. When it shows as Active, the changes have been applied.

    Verify the change

    To verify the zero-ETL integration has been updated, complete the following steps:

    1. In the Redshift query editor v2, connect to your Redshift cluster.
    2. Choose (right-click) the aurora-zeroetl database you created and choose Refresh.
    3. Expand demodb and Tables.

    The users table is no longer available because it has been removed from the replication. All other tables are still available.

    4. If you run the same SELECT statement from earlier, you will receive an error stating the object does not exist in the database:
      select * from aurora_zeroetl.demodb.users;

    Apply a data filter using the AWS CLI

    The company’s business analysts now understand that more databases are being added to the Aurora MySQL database and they want to ensure only the demodb database is replicated to their Redshift cluster. To this end, they want to update the filters on the zero-ETL integration with the AWS Command Line Interface (AWS CLI).

    To add data filters to a zero-ETL integration using the AWS CLI, you can call the modify-integration command. In addition to the integration identifier, specify the --data-filter parameter with a comma-separated list of include and exclude filters.

    Complete the following steps to alter the filter on the zero-ETL integration:

    1. Open a terminal with the AWS CLI installed.
    2. Enter the following command to list all available integrations:
      aws rds describe-integrations

    3. Find the integration you want to update and copy the integration identifier.

    The integration identifier is an alphanumeric string at the end of the integration ARN.

    4. Run the following command, updating <integration identifier> with the identifier copied from the previous step:
      aws rds modify-integration --integration-identifier "<integration identifier>" --data-filter 'exclude: *.*, include: demodb.*, exclude: demodb.users'

    When Aurora assesses this filter, it will first exclude everything, then include only the demodb database, and finally exclude the demodb.users table.

    Data filters can use regular expressions for database and table names. For example, if you want to filter out any tables starting with user, you can run the following:

    aws rds modify-integration --integration-identifier "<integration identifier>" --data-filter 'exclude: *.*, include: demodb.*, exclude: *./^user/'

    As with the previous filter change, the integration will be in a Modifying state until the changes have been applied, which can take up to 30 minutes. When it shows as Active, the changes have been applied.

    Clean up

    To remove the filter added to the zero-ETL integration, complete the following steps:

    1. On the Amazon RDS console, choose Zero-ETL integrations in the navigation pane.
    2. Choose your zero-ETL integration.
    3. Choose Modify.
    4. Choose Remove next to the filters you want to remove.
    5. You can also change the Exclude filter type to Include.

    Alternatively, you can use the AWS CLI to run the following:

    aws rds modify-integration --integration-identifier "<integration identifier>" --data-filter 'include: *.*'
    6. Choose Continue.
    7. Choose Save changes.

    The data filter will take up to 30 minutes to apply the changes. After you remove data filters, Aurora reevaluates the remaining filters as if the removed filter had never existed. Any data that previously didn’t match the filtering criteria but now does is replicated into the target Redshift data warehouse.

    Conclusion

    In this post, we showed you how to set up data filtering on your Aurora zero-ETL integration from Amazon Aurora MySQL to Amazon Redshift. This allows you to enable near real time analytics on transactional and operational data while replicating only the data required.

    With data filtering, you can split workloads into separate Redshift endpoints, limit the replication of private or confidential datasets, and increase performance of workloads by only replicating required datasets.

    To learn more about Aurora zero-ETL integration with Amazon Redshift, see Working with Aurora zero-ETL integrations with Amazon Redshift and Working with zero-ETL integrations.


    About the authors

    Jyoti Aggarwal is a Product Management Lead for AWS zero-ETL. She leads the product and business strategy, including driving initiatives around performance, customer experience, and security. She brings expertise in cloud compute, data pipelines, analytics, artificial intelligence (AI), and data services, including databases, data warehouses, and data lakes.


    Sean Beath is an Analytics Solutions Architect at Amazon Web Services. He has experience in the full delivery lifecycle of data platform modernisation using AWS services, and works with customers to help drive analytics value on AWS.

    Gokul Soundararajan is a principal engineer at AWS. He received a PhD from the University of Toronto and works in the areas of storage, databases, and analytics.

    Invoke AWS Lambda functions from cross-account Amazon Kinesis Data Streams

    Post Syndicated from Amar Surjit original https://aws.amazon.com/blogs/big-data/invoke-aws-lambda-functions-from-cross-account-amazon-kinesis-data-streams/

    A multi-account architecture on AWS is essential for enhancing security, compliance, and resource management by isolating workloads, enabling granular cost allocation, and facilitating collaboration across distinct environments. It also mitigates risks, improves scalability, and allows for advanced networking configurations.

    In a streaming architecture, you may have event producers, stream storage, and event consumers in a single account or spread across different accounts, depending on your business and IT requirements. For example, your company may want to centralize its clickstream data or log data from multiple producers across different accounts. Data consumers from marketing, product engineering, or analytics require access to the same streaming data across accounts, which requires the ability to deliver a multi-account streaming architecture.

    To build a multi-account streaming architecture, you can use Amazon Kinesis Data Streams as the stream storage and AWS Lambda as the event consumer. Amazon Kinesis Data Streams enables real-time processing of streaming data at scale. When integrated with Lambda, it allows for serverless data processing, enabling you to analyze and react to data streams in real time without managing infrastructure. This integration supports various use cases, including real-time analytics, log processing, Internet of Things (IoT) data ingestion, and more, making it valuable for businesses requiring timely insights from their streaming data. In this post, we demonstrate how you can process data ingested into a stream in one account with a Lambda function in another account.

    The recent launch of Kinesis Data Streams support for resource-based policies enables invoking a Lambda from another account. With a resource-based policy, you can specify AWS accounts, AWS Identity and Access Management (IAM) users, or IAM roles and the exact Kinesis Data Streams actions for which you want to grant access. After access is granted, you can configure a Lambda function in another account to start processing the data stream belonging to your account. This reduces cost and simplifies the data processing pipeline, because you no longer have to copy streaming data using Lambda functions in both accounts. Sharing access to your data streams or registered consumers does not incur additional charges to your account. Cross-account usage of Kinesis Data Streams resources will continue to be billed to the resource owners.
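    In this post, the resource policies are created for you by CLI commands that the CloudFormation stacks output, but for reference, attaching such a policy with the AWS SDK for Python (Boto3) can be sketched as follows; the account IDs, role name, and stream name are placeholders that mirror the sample policy shown later in this post.

    import boto3
    import json
    
    kinesis = boto3.client('kinesis', region_name='<region id>')
    
    stream_arn = 'arn:aws:kinesis:<region id>:<Account 1 - Amazon KDS account id>:stream/kds-cross-account-stream'
    
    policy = {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "StreamEFOReadStatementID",
                "Effect": "Allow",
                "Principal": {
                    "AWS": "arn:aws:iam::<AWS Lambda - Consumer account id>:role/kds-cross-account-stream-consumer-lambda-execution-role"
                },
                "Action": [
                    "kinesis:DescribeStreamSummary",
                    "kinesis:ListShards",
                    "kinesis:DescribeStream",
                    "kinesis:GetRecords",
                    "kinesis:GetShardIterator"
                ],
                "Resource": stream_arn
            }
        ]
    }
    
    # Attach the resource-based policy to the data stream in Account 1
    kinesis.put_resource_policy(ResourceARN=stream_arn, Policy=json.dumps(policy))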

    In this post, we use Kinesis Data Streams with the enhanced fan-out feature, which gives consumers dedicated read throughput tailored to their applications. By default, Kinesis Data Streams offers shared read throughput of 2 MB/sec per shard across consumers, but with enhanced fan-out, each consumer gets dedicated throughput of 2 MB/sec per shard. This flexibility allows you to seamlessly adapt Kinesis Data Streams to your specific requirements, choosing between enhanced fan-out for dedicated throughput or shared throughput according to your needs.
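    The enhanced fan-out consumer used in this post is registered by the CloudFormation template in Account 1; for reference, a minimal Boto3 sketch of registering such a consumer looks like the following, with the stream ARN as a placeholder.

    import boto3
    
    kinesis = boto3.client('kinesis', region_name='<region id>')
    
    # Register a dedicated-throughput (enhanced fan-out) consumer on the stream
    response = kinesis.register_stream_consumer(
        StreamARN='arn:aws:kinesis:<region id>:<Account 1 - Amazon KDS account id>:stream/kds-cross-account-stream',
        ConsumerName='kds-cross-account-stream-efo-consumer',
    )
    print(response['Consumer']['ConsumerARN'])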

    Solution overview

    For our solution, we deploy Kinesis Data Streams in Account 1 and Lambda as the consumer in Account 2 to receive data from the data stream. The following diagram illustrates the high-level architecture.

    Amazon KDS-Lambda cross acct solution architecture

    The setup requires the following key elements:

    • Kinesis data stream in Account 1 and Lambda function in Account 2
    • Kinesis Data Streams resource policies in Account 1, allowing a cross-account Lambda execution role to perform operations on the Kinesis data stream
    • A Lambda execution role in Account 2 and an enhanced fan-out consumer resource policy in Account 1, allowing the cross-account Lambda execution role to perform operations on the Kinesis data stream

    For the setup, you use three AWS CloudFormation templates to create the key resources:

    • CloudFormation template 1 creates the following key resources in Account 1:
      • Kinesis data stream
      • Kinesis data stream enhanced fan-out consumer
    • CloudFormation template 2 creates the following key resources in Account 2:
      • Consumer Lambda function
      • Consumer Lambda function execution role
    • CloudFormation template 3 creates the following resource in Account 2:
      • Consumer Lambda function event source mapping

    The solution supports single-Region deployment, and the CloudFormation templates must be deployed in the same Region across different AWS accounts. In this solution, we use Kinesis Data Streams enhanced fan-out, which is a best practice for deploying architectures requiring large throughput across multiple consumers. Complete the steps in the following sections to deploy this solution.

    Prerequisites

    You should have two AWS accounts and the required permissions to run a CloudFormation template to create the services mentioned in the solution architecture. You also need the AWS Command Line Interface (AWS CLI) installed, version 2.15 or above.

    Launch CloudFormation template 1

    Complete the following steps to launch the first CloudFormation template:

    1. Sign in to the AWS Management Console as Account 1 and select the appropriate AWS Region.
    2. Download and launch CloudFormation template 1 where you want to deploy your Kinesis data stream.
    3. For LambdaConsumerAccountId, enter your Lambda consumer account ID and choose Submit. The CloudFormation template deployment will take a few minutes to complete.
    4. When the stack is complete, on the AWS CloudFormation console, navigate to the stack Outputs tab and copy the values of the following parameters:
      • KinesisStreamArn
      • KinesisStreamEFOConsumerArn
      • KMSKeyArn

    You will need these values in later steps.

    Launch CloudFormation template 2

    Complete the following steps to launch the second CloudFormation template:

    1. Sign in to the console as Account 2 and select the appropriate Region.
    2. Download and launch CloudFormation template 2 where you want to host the Lambda consumer.
    3. Provide the following input parameters captured from the previous step:
      • KinesisStreamArn
      • KinesisStreamEFOConsumerArn
      • KMSKeyArn

    The CloudFormation template creates the following key resources:

    • Lambda consumer
    • Lambda execution role

    The Lambda function’s execution role is an IAM role that grants the function permission to access AWS services and resources. Here, you create a Lambda execution role that has the required Kinesis Data Streams and Lambda invocation permissions.

    The CloudFormation template deployment will take a few minutes to complete.

    4. When the stack is complete, on the AWS CloudFormation console, navigate to the stack Outputs tab and copy the values of the following parameters:
      • KinesisStreamCreateResourcePolicyCommand
      • KinesisStreamEFOConsumerCreateResourcePolicyCommand
    5. Run the following AWS CLI commands in Account 1 using AWS CloudShell. We recommend using CloudShell because it comes with the latest version of the AWS CLI, which helps avoid version-related failures.
      • KinesisStreamCreateResourcePolicyCommand – This creates the resource policy in Account 1 for Kinesis Data Stream. The following is a sample resource policy:
        {
          "Version": "2012-10-17",
          "Statement": [
            {
              "Sid": "StreamEFOReadStatementID",
              "Effect": "Allow",
              "Principal": {
                "AWS": [
                  "arn:aws:iam::<AWS Lambda - Consumer account id>:role/kds-cross-account-stream-consumer-lambda-execution-role"
                ]
              },
              "Action": [
                "kinesis:DescribeStreamSummary",
                "kinesis:ListShards",
                "kinesis:DescribeStream",
                "kinesis:GetRecords",
                "kinesis:GetShardIterator"
              ],
              "Resource": "arn:aws:kinesis:<region id>:<Account 1 - Amazon KDS account id>:stream/kds-cross-account-stream"
            }
          ]
        }

      • KinesisStreamEFOConsumerCreateResourcePolicyCommand – This creates the resource policy for the enhanced fan-out consumer for the Kinesis data stream in Account 1. The following is a sample resource policy:
        {
          "Version": "2012-10-17",
          "Statement": [
            {
              "Sid": "ConsumerEFOReadStatementID",
              "Effect": "Allow",
              "Principal": {
                "AWS": [
                  "arn:aws:iam::<AWS Lambda - Consumer account id>:role/kds-cross-account-stream-consumer-lambda-execution-role"
                ]
              },
              "Action": [
                "kinesis:DescribeStreamConsumer",
                "kinesis:SubscribeToShard"
              ],
              "Resource": "arn:aws:kinesis:<region id>:<Account 1 - Amazon KDS account id>:stream/kds-cross-account-stream/consumer/kds-cross-account-stream-efo-consumer:1706616477"
            }
          ]
        }

    You can also access this policy on the Kinesis Data Streams console, under Enhanced fan-out, Consumer name, and Consumer sharing resource-based policy.

    Launch CloudFormation template 3

    Now that you have created resource policies in Account 1 for the Kinesis data stream and its enhanced fan-out consumer, you can create Lambda event source mapping for the consumer Lambda function in Account 2. Complete the following steps:

    1. Sign in to the console as Account 2 and select the appropriate Region.
    2. Download and launch CloudFormation template 3 to update the stack you created using CloudFormation template 2.

    The CloudFormation template creates the Lambda event source mapping.

    Validate the solution

    At this point, the deployment is complete. A Kinesis data stream is available to ingest messages, and a Lambda function in the destination account receives them. To send sample messages to the data stream in Account 1, run the following AWS CLI command using CloudShell:

    aws kinesis put-record --stream-name kds-cross-account-stream --data sampledatarecord --partition-key samplepartitionkey3 --region <region id>

    The Lambda function in Account 2 receives the messages, and you can verify this using Amazon CloudWatch Logs:

    1. On the CloudWatch console, choose Log groups in the navigation pane.
    2. Locate the log group /aws/lambda/kds-cross-account-stream-efo-consumer.
    3. Choose Search log group to view the relevant log messages. The following is an example message:
      "Records": [
        {
          "kinesis": {
            "kinesisSchemaVersion": "1.0",
            "partitionKey": "samplepartitionkey3",
            "sequenceNumber": "49648798411111169765201534322676841348246990356337393698",
            "data": "sampledatarecord",
            "approximateArrivalTimestamp": 1706623274.658
          },
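    For reference, a minimal handler that processes such an event could look like the following sketch; the consumer Lambda function deployed by the CloudFormation template may differ, and this simply decodes and logs each record.

    import base64
    
    def lambda_handler(event, context):
        # The data field of each Kinesis record is base64 encoded
        for record in event['Records']:
            payload = base64.b64decode(record['kinesis']['data']).decode('utf-8')
            print(f"partitionKey={record['kinesis']['partitionKey']} data={payload}")
        return {'recordsProcessed': len(event['Records'])}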

    Clean up

    It’s always a good practice to clean up all the resources you created as part of this post to avoid any additional cost.

    To clean up your resources, delete the respective CloudFormation stacks from Accounts 1 and 2, and stop the producer from pushing events to the Kinesis data stream. This makes sure that you are not charged unnecessarily.

    Summary

    In this post, we demonstrated how to configure a cross-account Lambda integration with Kinesis Data Streams using AWS resource-based policies. This enables processing of data ingested into a stream within one AWS account through a Lambda function located in another account. To support customers who use a Kinesis data stream in their central account and have multiple consumers reading data from it, we have used the Kinesis Data Streams enhanced fan-out feature.

    To get started, open the Kinesis Data Streams console or use the new API PutResourcePolicy to attach a resource policy to your data stream or consumer.


    About the authors

    Pratik Patel is a Sr. Technical Account Manager and streaming analytics specialist. He works with AWS customers and provides ongoing support and technical guidance to help them plan and build solutions using best practices, and proactively keeps their AWS environments operationally healthy.

    Amar is a Senior Solutions Architect at AWS in the UK. He works with power, utilities, manufacturing, and automotive customers on strategic implementations, specializing in AWS streaming and advanced data analytics solutions to drive optimal business outcomes.