Tag Archives: Customer Solutions

Orchestrate an end-to-end ETL pipeline using Amazon S3, AWS Glue, and Amazon Redshift Serverless with Amazon MWAA

Post Syndicated from Radhika Jakkula original https://aws.amazon.com/blogs/big-data/orchestrate-an-end-to-end-etl-pipeline-using-amazon-s3-aws-glue-and-amazon-redshift-serverless-with-amazon-mwaa/

Amazon Managed Workflows for Apache Airflow (Amazon MWAA) is a managed orchestration service for Apache Airflow that you can use to set up and operate data pipelines in the cloud at scale. Apache Airflow is an open source tool used to programmatically author, schedule, and monitor sequences of processes and tasks, referred to as workflows. With Amazon MWAA, you can use Apache Airflow and Python to create workflows without having to manage the underlying infrastructure for scalability, availability, and security.

By using multiple AWS accounts, organizations can effectively scale their workloads and manage their complexity as they grow. This approach provides a robust mechanism to mitigate the potential impact of disruptions or failures, making sure that critical workloads remain operational. Additionally, it enables cost optimization by aligning resources with specific use cases, making sure that expenses are well controlled. By isolating workloads with specific security requirements or compliance needs, organizations can maintain the highest levels of data privacy and security. Furthermore, the ability to organize multiple AWS accounts in a structured manner allows you to align your business processes and resources according to your unique operational, regulatory, and budgetary requirements. This approach promotes efficiency, flexibility, and scalability, enabling large enterprises to meet their evolving needs and achieve their goals.

This post demonstrates how to orchestrate an end-to-end extract, transform, and load (ETL) pipeline using Amazon Simple Storage Service (Amazon S3), AWS Glue, and Amazon Redshift Serverless with Amazon MWAA.

Solution overview

For this post, we consider a use case where a data engineering team wants to build an ETL process and give the best experience to their end-users when they want to query the latest data after new raw files are added to Amazon S3 in the central account (Account A in the following architecture diagram). The data engineering team wants to separate the raw data into its own AWS account (Account B in the diagram) for increased security and control. They also want to perform the data processing and transformation work in their own account (Account B) to compartmentalize duties and prevent any unintended changes to the source raw data present in the central account (Account A). This approach allows the team to process the raw data extracted from Account A to Account B, which is dedicated for data handling tasks. This makes sure the raw and processed data can be maintained securely separated across multiple accounts, if required, for enhanced data governance and security.

Our solution uses an end-to-end ETL pipeline orchestrated by Amazon MWAA that looks for new incremental files in an Amazon S3 location in Account A, where the raw data is present. This is done by invoking AWS Glue ETL jobs and writing to data objects in a Redshift Serverless cluster in Account B. The pipeline then starts running stored procedures and SQL commands on Redshift Serverless. As the queries finish running, an UNLOAD operation is invoked from the Redshift data warehouse to the S3 bucket in Account A.

Because security is important, this post also covers how to configure an Airflow connection using AWS Secrets Manager to avoid storing database credentials within Airflow connections and variables.

The following diagram illustrates the architectural overview of the components involved in the orchestration of the workflow.

The workflow consists of the following components:

  • The source and target S3 buckets are in a central account (Account A), whereas Amazon MWAA, AWS Glue, and Amazon Redshift are in a different account (Account B). Cross-account access has been set up between S3 buckets in Account A with resources in Account B to be able to load and unload data.
  • In the second account, Amazon MWAA is hosted in one VPC and Redshift Serverless in a different VPC, which are connected through VPC peering. A Redshift Serverless workgroup is secured inside private subnets across three Availability Zones.
  • Secrets like user name, password, DB port, and AWS Region for Redshift Serverless are stored in Secrets Manager.
  • VPC endpoints are created for Amazon S3 and Secrets Manager to interact with other resources.
  • Usually, data engineers create an Airflow Directed Acyclic Graph (DAG) and commit their changes to GitHub. With GitHub actions, they are deployed to an S3 bucket in Account B (for this post, we upload the files into S3 bucket directly). The S3 bucket stores Airflow-related files like DAG files, requirements.txt files, and plugins. AWS Glue ETL scripts and assets are stored in another S3 bucket. This separation helps maintain organization and avoid confusion.
  • The Airflow DAG uses various operators, sensors, connections, tasks, and rules to run the data pipeline as needed.
  • The Airflow logs are logged in Amazon CloudWatch, and alerts can be configured for monitoring tasks. For more information, see Monitoring dashboards and alarms on Amazon MWAA.

Prerequisites

Because this solution centers around using Amazon MWAA to orchestrate the ETL pipeline, you need to set up certain foundational resources across accounts beforehand. Specifically, you need to create the S3 buckets and folders, AWS Glue resources, and Redshift Serverless resources in their respective accounts prior to implementing the full workflow integration using Amazon MWAA.

Deploy resources in Account A using AWS CloudFormation

In Account A, launch the provided AWS CloudFormation stack to create the following resources:

  • The source and target S3 buckets and folders. As a best practice, the input and output bucket structures are formatted with hive style partitioning as s3://<bucket>/products/YYYY/MM/DD/.
  • A sample dataset called products.csv, which we use in this post.

Upload the AWS Glue job to Amazon S3 in Account B

In Account B, create an Amazon S3 location called aws-glue-assets-<account-id>-<region>/scripts (if not present). Replace the parameters for the account ID and Region in the sample_glue_job.py script and upload the AWS Glue job file to the Amazon S3 location.

Deploy resources in Account B using AWS CloudFormation

In Account B, launch the provided CloudFormation stack template to create the following resources:

  • The S3 bucket airflow-<username>-bucket to store Airflow-related files with the following structure:
    • dags – The folder for DAG files.
    • plugins – The file for any custom or community Airflow plugins.
    • requirements – The requirements.txt file for any Python packages.
    • scripts – Any SQL scripts used in the DAG.
    • data – Any datasets used in the DAG.
  • A Redshift Serverless environment. The name of the workgroup and namespace are prefixed with sample.
  • An AWS Glue environment, which contains the following:
    • An AWS Glue crawler, which crawls the data from the S3 source bucket sample-inp-bucket-etl-<username> in Account A.
    • A database called products_db in the AWS Glue Data Catalog.
    • An ELT job called sample_glue_job. This job can read files from the products table in the Data Catalog and load data into the Redshift table products.
  • A VPC gateway endpointto Amazon S3.
  • An Amazon MWAA environment. For detailed steps to create an Amazon MWAA environment using the Amazon MWAA console, refer to Introducing Amazon Managed Workflows for Apache Airflow (MWAA).

launch stack 1

Create Amazon Redshift resources

Create two tables and a stored procedure on an Redshift Serverless workgroup using the products.sql file.

In this example, we create two tables called products and products_f. The name of the stored procedure is sp_products.

Configure Airflow permissions

After the Amazon MWAA environment is created successfully, the status will show as Available. Choose Open Airflow UI to view the Airflow UI. DAGs are automatically synced from the S3 bucket and visible in the UI. However, at this stage, there are no DAGs in the S3 folder.

Add the customer managed policy AmazonMWAAFullConsoleAccess, which grants Airflow users permissions to access AWS Identity and Access Management (IAM) resources, and attach this policy to the Amazon MWAA role. For more information, see Accessing an Amazon MWAA environment.

The policies attached to the Amazon MWAA role have full access and must only be used for testing purposes in a secure test environment. For production deployments, follow the least privilege principle.

Set up the environment

This section outlines the steps to configure the environment. The process involves the following high-level steps:

  1. Update any necessary providers.
  2. Set up cross-account access.
  3. Establish a VPC peering connection between the Amazon MWAA VPC and Amazon Redshift VPC.
  4. Configure Secrets Manager to integrate with Amazon MWAA.
  5. Define Airflow connections.

Update the providers

Follow the steps in this section if your version of Amazon MWAA is less than 2.8.1 (the latest version as of writing this post).

Providers are packages that are maintained by the community and include all the core operators, hooks, and sensors for a given service. The Amazon provider is used to interact with AWS services like Amazon S3, Amazon Redshift Serverless, AWS Glue, and more. There are over 200 modules within the Amazon provider.

Although the version of Airflow supported in Amazon MWAA is 2.6.3, which comes bundled with the Amazon provided package version 8.2.0, support for Amazon Redshift Serverless was not added until the Amazon provided package version 8.4.0. Because the default bundled provider version is older than when Redshift Serverless support was introduced, the provider version must be upgraded in order to use that functionality.

The first step is to update the constraints file and requirements.txt file with the correct versions. Refer to Specifying newer provider packages for steps to update the Amazon provider package.

  1. Specify the requirements as follows:
    --constraint "/usr/local/airflow/dags/constraints-3.10-mod.txt"
    apache-airflow-providers-amazon==8.4.0

  2. Update the version in the constraints file to 8.4.0 or higher.
  3. Add the constraints-3.11-updated.txt file to the /dags folder.

Refer to Apache Airflow versions on Amazon Managed Workflows for Apache Airflow for correct versions of the constraints file depending on the Airflow version.

  1. Navigate to the Amazon MWAA environment and choose Edit.
  2. Under DAG code in Amazon S3, for Requirements file, choose the latest version.
  3. Choose Save.

This will update the environment and new providers will be in effect.

  1. To verify the providers version, go to Providers under the Admin table.

The version for the Amazon provider package should be 8.4.0, as shown in the following screenshot. If not, there was an error while loading requirements.txt. To debug any errors, go to the CloudWatch console and open the requirements_install_ip log in Log streams, where errors are listed. Refer to Enabling logs on the Amazon MWAA console for more details.

Set up cross-account access

You need to set up cross-account policies and roles between Account A and Account B to access the S3 buckets to load and unload data. Complete the following steps:

  1. In Account A, configure the bucket policy for bucket sample-inp-bucket-etl-<username> to grant permissions to the AWS Glue and Amazon MWAA roles in Account B for objects in bucket sample-inp-bucket-etl-<username>:
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {
                    "AWS": [
                        "arn:aws:iam::<account-id-of- AcctB>:role/service-role/<Glue-role>",
                        "arn:aws:iam::<account-id-of-AcctB>:role/service-role/<MWAA-role>"
                    ]
                },
                "Action": [
                    "s3:GetObject",
    "s3:PutObject",
    		   "s3:PutObjectAcl",
    		   "s3:ListBucket"
                ],
                "Resource": [
                    "arn:aws:s3:::sample-inp-bucket-etl-<username>/*",
                    "arn:aws:s3:::sample-inp-bucket-etl-<username>"
                ]
            }
        ]
    }
    

  2. Similarly, configure the bucket policy for bucket sample-opt-bucket-etl-<username> to grant permissions to Amazon MWAA roles in Account B to put objects in this bucket:
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {
                    "AWS": "arn:aws:iam::<account-id-of-AcctB>:role/service-role/<MWAA-role>"
                },
                "Action": [
                    "s3:GetObject",
                    "s3:PutObject",
                    "s3:PutObjectAcl",
                    "s3:ListBucket"
                ],
                "Resource": [
                    "arn:aws:s3:::sample-opt-bucket-etl-<username>/*",
                    "arn:aws:s3:::sample-opt-bucket-etl-<username>"
                ]
            }
        ]
    }
    

  3. In Account A, create an IAM policy called policy_for_roleA, which allows necessary Amazon S3 actions on the output bucket:
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "VisualEditor0",
                "Effect": "Allow",
                "Action": [
                    "kms:Decrypt",
                    "kms:Encrypt",
                    "kms:GenerateDataKey"
                ],
                "Resource": [
                    "<KMS_KEY_ARN_Used_for_S3_encryption>"
                ]
            },
            {
                "Sid": "VisualEditor1",
                "Effect": "Allow",
                "Action": [
                    "s3:PutObject",
                    "s3:GetObject",
                    "s3:GetBucketAcl",
                    "s3:GetBucketCors",
                    "s3:GetEncryptionConfiguration",
                    "s3:GetBucketLocation",
                    "s3:ListAllMyBuckets",
                    "s3:ListBucket",
                    "s3:ListBucketMultipartUploads",
                    "s3:ListBucketVersions",
                    "s3:ListMultipartUploadParts"
                ],
                "Resource": [
                    "arn:aws:s3:::sample-opt-bucket-etl-<username>",
                    "arn:aws:s3:::sample-opt-bucket-etl-<username>/*"
                ]
            }
        ]
    }

  4. Create a new IAM role called RoleA with Account B as the trusted entity role and add this policy to the role. This allows Account B to assume RoleA to perform necessary Amazon S3 actions on the output bucket.
  5. In Account B, create an IAM policy called s3-cross-account-access with permission to access objects in the bucket sample-inp-bucket-etl-<username>, which is in Account A.
  6. Add this policy to the AWS Glue role and Amazon MWAA role:
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": [
                    "s3:GetObject",
                    "s3:PutObject",
                    "s3:PutObjectAcl"
                ],
                "Resource": "arn:aws:s3:::sample-inp-bucket-etl-<username>/*"
            }
        ]
    }

  7. In Account B, create the IAM policy policy_for_roleB specifying Account A as a trusted entity. The following is the trust policy to assume RoleA in Account A:
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "CrossAccountPolicy",
                "Effect": "Allow",
                "Action": "sts:AssumeRole",
                "Resource": "arn:aws:iam::<account-id-of-AcctA>:role/RoleA"
            }
        ]
    }

  8. Create a new IAM role called RoleB with Amazon Redshift as the trusted entity type and add this policy to the role. This allows RoleB to assume RoleA in Account A and also to be assumable by Amazon Redshift.
  9. Attach RoleB to the Redshift Serverless namespace, so Amazon Redshift can write objects to the S3 output bucket in Account A.
  10. Attach the policy policy_for_roleB to the Amazon MWAA role, which allows Amazon MWAA to access the output bucket in Account A.

Refer to How do I provide cross-account access to objects that are in Amazon S3 buckets? for more details on setting up cross-account access to objects in Amazon S3 from AWS Glue and Amazon MWAA. Refer to How do I COPY or UNLOAD data from Amazon Redshift to an Amazon S3 bucket in another account? for more details on setting up roles to unload data from Amazon Redshift to Amazon S3 from Amazon MWAA.

Set up VPC peering between the Amazon MWAA and Amazon Redshift VPCs

Because Amazon MWAA and Amazon Redshift are in two separate VPCs, you need to set up VPC peering between them. You must add a route to the route tables associated with the subnets for both services. Refer to Work with VPC peering connections for details on VPC peering.

Make sure that CIDR range of the Amazon MWAA VPC is allowed in the Redshift security group and the CIDR range of the Amazon Redshift VPC is allowed in the Amazon MWAA security group, as shown in the following screenshot.

If any of the preceding steps are configured incorrectly, you are likely to encounter a “Connection Timeout” error in the DAG run.

Configure the Amazon MWAA connection with Secrets Manager

When the Amazon MWAA pipeline is configured to use Secrets Manager, it will first look for connections and variables in an alternate backend (like Secrets Manager). If the alternate backend contains the needed value, it is returned. Otherwise, it will check the metadata database for the value and return that instead. For more details, refer to Configuring an Apache Airflow connection using an AWS Secrets Manager secret.

Complete the following steps:

  1. Configure a VPC endpoint to link Amazon MWAA and Secrets Manager (com.amazonaws.us-east-1.secretsmanager).

This allows Amazon MWAA to access credentials stored in Secrets Manager.

  1. To provide Amazon MWAA with permission to access Secrets Manager secret keys, add the policy called SecretsManagerReadWrite to the IAM role of the environment.
  2. To create the Secrets Manager backend as an Apache Airflow configuration option, go to the Airflow configuration options, add the following key-value pairs, and save your settings.

This configures Airflow to look for connection strings and variables at the airflow/connections/* and airflow/variables/* paths:

secrets.backend: airflow.providers.amazon.aws.secrets.secrets_manager.SecretsManagerBackend secrets.backend_kwargs: {"connections_prefix" : "airflow/connections", "variables_prefix" : "airflow/variables"}

  1. To generate an Airflow connection URI string, go to AWS CloudShell and enter into a Python shell.
  2. Run the following code to generate the connection URI string:
    import urllib.parse
    conn_type = 'redshift'
    host = 'sample-workgroup.<account-id-of-AcctB>.us-east-1.redshift-serverless.amazonaws.com' #Specify the Amazon Redshift workgroup endpoint
    port = '5439'
    login = 'admin' #Specify the username to use for authentication with Amazon Redshift
    password = '<password>' #Specify the password to use for authentication with Amazon Redshift
    role_arn = urllib.parse.quote_plus('arn:aws:iam::<account_id>:role/service-role/<MWAA-role>')
    database = 'dev'
    region = 'us-east-1' #YOUR_REGION
    conn_string = '{0}://{1}:{2}@{3}:{4}?role_arn={5}&database={6}&region={7}'.format(conn_type, login, password, host, port, role_arn, database, region)
    print(conn_string)
    

The connection string should be generated as follows:

redshift://admin:<password>@sample-workgroup.<account_id>.us-east-1.redshift-serverless.amazonaws.com:5439?role_arn=<MWAA role ARN>&database=dev&region=<region>
  1. Add the connection in Secrets Manager using the following command in the AWS Command Line Interface (AWS CLI).

This can also be done from the Secrets Manager console. This will be added in Secrets Manager as plaintext.

aws secretsmanager create-secret --name airflow/connections/secrets_redshift_connection --description "Apache Airflow to Redshift Cluster" --secret-string "redshift://admin:<password>@sample-workgroup.<account_id>.us-east-1.redshift-serverless.amazonaws.com:5439?role_arn=<MWAA role ARN>&database=dev&region=us-east-1" --region=us-east-1

Use the connection airflow/connections/secrets_redshift_connection in the DAG. When the DAG is run, it will look for this connection and retrieve the secrets from Secrets Manager. In case of RedshiftDataOperator, pass the secret_arn as a parameter instead of connection name.

You can also add secrets using the Secrets Manager console as key-value pairs.

  1. Add another secret in Secrets Manager in and save it as airflow/connections/redshift_conn_test.

Create an Airflow connection through the metadata database

You can also create connections in the UI. In this case, the connection details will be stored in an Airflow metadata database. If the Amazon MWAA environment is not configured to use the Secrets Manager backend, it will check the metadata database for the value and return that. You can create an Airflow connection using the UI, AWS CLI, or API. In this section, we show how to create a connection using the Airflow UI.

  1. For Connection Id, enter a name for the connection.
  2. For Connection Type, choose Amazon Redshift.
  3. For Host, enter the Redshift endpoint (without port and database) for Redshift Serverless.
  4. For Database, enter dev.
  5. For User, enter your admin user name.
  6. For Password, enter your password.
  7. For Port, use port 5439.
  8. For Extra, set the region and timeout parameters.
  9. Test the connection, then save your settings.

Create and run a DAG

In this section, we describe how to create a DAG using various components. After you create and run the DAG, you can verify the results by querying Redshift tables and checking the target S3 buckets.

Create a DAG

In Airflow, data pipelines are defined in Python code as DAGs. We create a DAG that consists of various operators, sensors, connections, tasks, and rules:

  • The DAG starts with looking for source files in the S3 bucket sample-inp-bucket-etl-<username> under Account A for the current day using S3KeySensor. S3KeySensor is used to wait for one or multiple keys to be present in an S3 bucket.
    • For example, our S3 bucket is partitioned as s3://bucket/products/YYYY/MM/DD/, so our sensor should check for folders with the current date. We derived the current date in the DAG and passed this to S3KeySensor, which looks for any new files in the current day folder.
    • We also set wildcard_match as True, which enables searches on bucket_key to be interpreted as a Unix wildcard pattern. Set the mode to reschedule so that the sensor task frees the worker slot when the criteria is not met and it’s rescheduled at a later time. As a best practice, use this mode when poke_interval is more than 1 minute to prevent too much load on a scheduler.
  • After the file is available in the S3 bucket, the AWS Glue crawler runs using GlueCrawlerOperator to crawl the S3 source bucket sample-inp-bucket-etl-<username> under Account A and updates the table metadata under the products_db database in the Data Catalog. The crawler uses the AWS Glue role and Data Catalog database that were created in the previous steps.
  • The DAG uses GlueCrawlerSensor to wait for the crawler to complete.
  • When the crawler job is complete, GlueJobOperator is used to run the AWS Glue job. The AWS Glue script name (along with location) and is passed to the operator along with the AWS Glue IAM role. Other parameters like GlueVersion, NumberofWorkers, and WorkerType are passed using the create_job_kwargs parameter.
  • The DAG uses GlueJobSensor to wait for the AWS Glue job to complete. When it’s complete, the Redshift staging table products will be loaded with data from the S3 file.
  • You can connect to Amazon Redshift from Airflow using three different operators:
    • PythonOperator.
    • SQLExecuteQueryOperator, which uses a PostgreSQL connection and redshift_default as the default connection.
    • RedshiftDataOperator, which uses the Redshift Data API and aws_default as the default connection.

In our DAG, we use SQLExecuteQueryOperator and RedshiftDataOperator to show how to use these operators. The Redshift stored procedures are run RedshiftDataOperator. The DAG also runs SQL commands in Amazon Redshift to delete the data from the staging table using SQLExecuteQueryOperator.

Because we configured our Amazon MWAA environment to look for connections in Secrets Manager, when the DAG runs, it retrieves the Redshift connection details like user name, password, host, port, and Region from Secrets Manager. If the connection is not found in Secrets Manager, the values are retrieved from the default connections.

In SQLExecuteQueryOperator, we pass the connection name that we created in Secrets Manager. It looks for airflow/connections/secrets_redshift_connection and retrieves the secrets from Secrets Manager. If Secrets Manager is not set up, the connection created manually (for example, redshift-conn-id) can be passed.

In RedshiftDataOperator, we pass the secret_arn of the airflow/connections/redshift_conn_test connection created in Secrets Manager as a parameter.

  • As final task, RedshiftToS3Operator is used to unload data from the Redshift table to an S3 bucket sample-opt-bucket-etl in Account B. airflow/connections/redshift_conn_test from Secrets Manager is used for unloading the data.
  • TriggerRule is set to ALL_DONE, which enables the next step to run after all upstream tasks are complete.
  • The dependency of tasks is defined using the chain() function, which allows for parallel runs of tasks if needed. In our case, we want all tasks to run in sequence.

The following is the complete DAG code. The dag_id should match the DAG script name, otherwise it won’t be synced into the Airflow UI.

from datetime import datetime
from airflow import DAG 
from airflow.decorators import task
from airflow.models.baseoperator import chain
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
from airflow.providers.amazon.aws.operators.glue import GlueJobOperator
from airflow.providers.amazon.aws.operators.glue_crawler import GlueCrawlerOperator
from airflow.providers.amazon.aws.sensors.glue import GlueJobSensor
from airflow.providers.amazon.aws.sensors.glue_crawler import GlueCrawlerSensor
from airflow.providers.amazon.aws.operators.redshift_data import RedshiftDataOperator
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
from airflow.providers.amazon.aws.transfers.redshift_to_s3 import RedshiftToS3Operator
from airflow.utils.trigger_rule import TriggerRule


dag_id = "data_pipeline"
vYear = datetime.today().strftime("%Y")
vMonth = datetime.today().strftime("%m")
vDay = datetime.today().strftime("%d")
src_bucket_name = "sample-inp-bucket-etl-<username>"
tgt_bucket_name = "sample-opt-bucket-etl-<username>"
s3_folder="products"
#Please replace the variable with the glue_role_arn
glue_role_arn_key = "arn:aws:iam::<account_id>:role/<Glue-role>"
glue_crawler_name = "products"
glue_db_name = "products_db"
glue_job_name = "sample_glue_job"
glue_script_location="s3://aws-glue-assets-<account_id>-<region>/scripts/sample_glue_job.py"
workgroup_name = "sample-workgroup"
redshift_table = "products_f"
redshift_conn_id_name="secrets_redshift_connection"
db_name = "dev"
secret_arn="arn:aws:secretsmanager:us-east-1:<account_id>:secret:airflow/connections/redshift_conn_test-xxxx"
poll_interval = 10

@task
def get_role_name(arn: str) -> str:
    return arn.split("/")[-1]

@task
def get_s3_loc(s3_folder: str) -> str:
    s3_loc  = s3_folder + "/year=" + vYear + "/month=" + vMonth + "/day=" + vDay + "/*.csv"
    return s3_loc

with DAG(
    dag_id=dag_id,
    schedule="@once",
    start_date=datetime(2021, 1, 1),
    tags=["example"],
    catchup=False,
) as dag:
    role_arn = glue_role_arn_key
    glue_role_name = get_role_name(role_arn)
    s3_loc = get_s3_loc(s3_folder)


    # Check for new incremental files in S3 source/input bucket
    sensor_key = S3KeySensor(
        task_id="sensor_key",
        bucket_key=s3_loc,
        bucket_name=src_bucket_name,
        wildcard_match=True,
        #timeout=18*60*60,
        #poke_interval=120,
        timeout=60,
        poke_interval=30,
        mode="reschedule"
    )

    # Run Glue crawler
    glue_crawler_config = {
        "Name": glue_crawler_name,
        "Role": role_arn,
        "DatabaseName": glue_db_name,
    }

    crawl_s3 = GlueCrawlerOperator(
        task_id="crawl_s3",
        config=glue_crawler_config,
    )

    # GlueCrawlerOperator waits by default, setting as False to test the Sensor below.
    crawl_s3.wait_for_completion = False

    # Wait for Glue crawler to complete
    wait_for_crawl = GlueCrawlerSensor(
        task_id="wait_for_crawl",
        crawler_name=glue_crawler_name,
    )

    # Run Glue Job
    submit_glue_job = GlueJobOperator(
        task_id="submit_glue_job",
        job_name=glue_job_name,
        script_location=glue_script_location,
        iam_role_name=glue_role_name,
        create_job_kwargs={"GlueVersion": "4.0", "NumberOfWorkers": 10, "WorkerType": "G.1X"},
    )

    # GlueJobOperator waits by default, setting as False to test the Sensor below.
    submit_glue_job.wait_for_completion = False

    # Wait for Glue Job to complete
    wait_for_job = GlueJobSensor(
        task_id="wait_for_job",
        job_name=glue_job_name,
        # Job ID extracted from previous Glue Job Operator task
        run_id=submit_glue_job.output,
        verbose=True,  # prints glue job logs in airflow logs
    )

    wait_for_job.poke_interval = 5

    # Execute the Stored Procedure in Redshift Serverless using Data Operator
    execute_redshift_stored_proc = RedshiftDataOperator(
        task_id="execute_redshift_stored_proc",
        database=db_name,
        workgroup_name=workgroup_name,
        secret_arn=secret_arn,
        sql="""CALL sp_products();""",
        poll_interval=poll_interval,
        wait_for_completion=True,
    )

    # Execute the Stored Procedure in Redshift Serverless using SQL Operator
    delete_from_table = SQLExecuteQueryOperator(
        task_id="delete_from_table",
        conn_id=redshift_conn_id_name,
        sql="DELETE FROM products;",
        trigger_rule=TriggerRule.ALL_DONE,
    )

    # Unload the data from Redshift table to S3
    transfer_redshift_to_s3 = RedshiftToS3Operator(
        task_id="transfer_redshift_to_s3",
        s3_bucket=tgt_bucket_name,
        s3_key=s3_loc,
        schema="PUBLIC",
        table=redshift_table,
        redshift_conn_id=redshift_conn_id_name,
    )

    transfer_redshift_to_s3.trigger_rule = TriggerRule.ALL_DONE

    #Chain the tasks to be executed
    chain(
        sensor_key,
        crawl_s3,
        wait_for_crawl,
        submit_glue_job,
        wait_for_job,
        execute_redshift_stored_proc,
        delete_from_table,
        transfer_redshift_to_s3
        )
    

Verify the DAG run

After you create the DAG file (replace the variables in the DAG script) and upload it to the s3://sample-airflow-instance/dags folder, it will be automatically synced with the Airflow UI. All DAGs appear on the DAGs tab. Toggle the ON option to make the DAG runnable. Because our DAG is set to schedule="@once", you need to manually run the job by choosing the run icon under Actions. When the DAG is complete, the status is updated in green, as shown in the following screenshot.

In the Links section, there are options to view the code, graph, grid, log, and more. Choose Graph to visualize the DAG in a graph format. As shown in the following screenshot, each color of the node denotes a specific operator, and the color of the node outline denotes a specific status.

Verify the results

On the Amazon Redshift console, navigate to the Query Editor v2 and select the data in the products_f table. The table should be loaded and have the same number of records as S3 files.

On the Amazon S3 console, navigate to the S3 bucket s3://sample-opt-bucket-etl in Account B. The product_f files should be created under the folder structure s3://sample-opt-bucket-etl/products/YYYY/MM/DD/.

Clean up

Clean up the resources created as part of this post to avoid incurring ongoing charges:

  1. Delete the CloudFormation stacks and S3 bucket that you created as prerequisites.
  2. Delete the VPCs and VPC peering connections, cross-account policies and roles, and secrets in Secrets Manager.

Conclusion

With Amazon MWAA, you can build complex workflows using Airflow and Python without managing clusters, nodes, or any other operational overhead typically associated with deploying and scaling Airflow in production. In this post, we showed how Amazon MWAA provides an automated way to ingest, transform, analyze, and distribute data between different accounts and services within AWS. For more examples of other AWS operators, refer to the following GitHub repository; we encourage you to learn more by trying out some of these examples.


About the Authors


Radhika Jakkula is a Big Data Prototyping Solutions Architect at AWS. She helps customers build prototypes using AWS analytics services and purpose-built databases. She is a specialist in assessing wide range of requirements and applying relevant AWS services, big data tools, and frameworks to create a robust architecture.

Sidhanth Muralidhar is a Principal Technical Account Manager at AWS. He works with large enterprise customers who run their workloads on AWS. He is passionate about working with customers and helping them architect workloads for costs, reliability, performance, and operational excellence at scale in their cloud journey. He has a keen interest in data analytics as well.

Architecting for Disaster Recovery on AWS Outposts Racks with AWS Elastic Disaster Recovery

Post Syndicated from Macey Neff original https://aws.amazon.com/blogs/compute/architecting-for-disaster-recovery-on-aws-outposts-racks-with-aws-elastic-disaster-recovery/

This blog post is written by Brianna Rosentrater, Hybrid Edge Specialist SA.

AWS Elastic Disaster Recovery Service (AWS DRS) now supports disaster recovery (DR) architectures that include on-premises Windows and Linux workloads running on AWS Outposts. AWS DRS minimizes downtime and data loss with fast, reliable recovery of on-premises and cloud-based applications using affordable storage, minimal compute, and point-in-time recovery. Both services are billed and managed from your AWS Management Console.

Like workloads running in AWS Regions, it’s critical to plan for failures. Outposts are designed with resiliency in mind, providing redundant power, networking, and are available to order with N+M active compute instance capacity. In other words, for every physical N compute servers, you have the option of including M redundant hosts capable of handling the workload during a failure. When leveraging AWS DRS with Outpost, you can plan for larger-scale failure modes, such as data center outages, by replicating mission-critical workloads to other remote data center locations or the AWS Region.

In this post, you’ll learn how AWS DRS can be used with Outpost rack to architect for high availability in the event of a site failure. The post will examine several different architectures enabled by AWS DRS that provide DR for Outpost, and the benefits of each method described.

Prerequisites

Each of these architectures described below need the following:

Public internet access isn’t needed, AWS PrivateLink and AWS Direct Connect are supported for replication and failback which is a significant security benefit.

Planning for failure

Disasters come in many forms and are often unplanned and unexpected events. Regardless of whether your workload resides on premises, in a colocation facility, or in an AWS Region, it’s critical to define the Recovery Time Objective (RTO) and Recovery Point Objective (RPO) which are often workload-specific. These two metrics profile how long a service can be down during recovery and quantify the acceptable amount of data loss. RTO and RPO guide you in choosing the appropriate strategy such as backup and recovery, pilot light, warm standby, or a multi-site (active-active) approach.

With AWS DRS, while failing back to a test machine (not the original source server), replication of the source server continues. This allows failback drills without impacting RPO, and non-disruptive failback drills are an important part of disaster planning to validate your recovery plan meets your expected RPO/RTO as per your business requirements.

How AWS DRS integrates with Outpost

AWS DRS uses an AWS Replication Agent at the source to capture the workload and transfer it to a lightweight staging area, which resides on an Outpost equipped with Amazon S3 on Outposts. This method also provides the ability to perform low-effort, non-disruptive DR drills before making the final cutover. The AWS Replication Agent doesn’t need a reboot nor does it impact your applications during installation.

When an Outpost’s subnet is selected as the target for replication or launch, all associated AWS DRS components remain within the Outpost, including the AWS DRS server conversion technology. These conversion servers convert source disks of servers being migrated so that they can boot and run in the target infrastructure, Amazon EBS volumes, snapshots, and replication servers. The replication servers replicate the disks to the target infrastructure. With AWS DRS you can control the data replication path using private connectivity options such as a virtual private network (VPN), AWS Direct Connect, VPC peering, or another private connection. Learn more about using a private IP for data replication.

AWS DRS provides nearly continuous replication for mission-critical workloads and supports deployment patterns including on-premises to Outpost, Outpost to Region, Region to Outpost, and between two logical Outposts through local networks. To leverage Outpost with AWS DRS, simply select the Outpost subnet as your target or source for replication when configuring AWS DRS for your workload. If you are currently using CloudEndure DR for disaster recovery with Outpost, see these detailed instructions for migrating to AWS DRS from CloudEndure DR.

DR from on-premises to Outpost

Outpost can be used as a DR target for on-premises workloads. By deploying an Outpost in a remote data center or colocation a significant distance from the source within the same geo-political boundary, you can replicate workloads across great distances and increase resiliency of the data while ensuring adherence to data residency policies or legislation.

DR from on-premises to Outposts

Figure 1 – DR from on-premises to Outposts

In Figure 1, on premises sources replicate traffic from a LAN to a staging area residing in an Outpost subnet via the local gateway. This allows workloads to failover from their on-premises environment to an Outpost in a different physical location during a disaster.

The staging areas and replication servers run on Amazon Elastic Compute Cloud (Amazon EC2) with Amazon EBS volumes and require Amazon S3 on Outposts where the Amazon EBS snapshots reside.

The replication agent is responsible for providing nearly continuous, block-level replication from your LAN using TCP/1500 with traffic routing to Amazon EC2 instances using the Outposts local gateway.

DR from Outpost to Region

Since its initial release, Outpost has supported Amazon EBS snapshots written to Amazon S3 located in the AWS Region. Backup to an AWS Region is one of the most cost-effective and easiest-to-configure DR approaches, enabling data redundancy outside of your Outpost and data center.

This method also offers flexibility for restoration within an AWS Region if the original deployment is irrecoverable. However, depending on the frequency of the snapshots and the timing of the failure, backup, and recovery to the Region has the potential to have an RPO/RTO spanning hours depending on the throughput of the service link.

For critical workloads, AWS DRS can reduce RTO to minutes and RPO in the sub-second range. After creating an initial replication of workloads that reside on the Outpost, AWS DRS provides nearly continuous, block-level replication in the Region. Just like replication from non-AWS virtual machines or bare metal servers, AWS DRS resources, including Replication Servers, Conversion Servers, Amazon EBS Volumes, and Snapshots reside in the Region.

DR from Outpost to Region

Figure 2 – DR from Outpost to Region

In Figure 2, data replication is performed over the service link from Amazon EC2 instances running locally on an Outpost to an AWS Region. The service link traverses either public Region connectivity or AWS Direct Connect.

AWS Direct Connect is the recommended option because it provides low latency and consistent bandwidth for the service link back to a Region, which also improves the reliability of transmission for AWS DRS replication traffic.

The service link is comprised of redundant, encrypted VPN tunnels. Replication traffic can also be sent privately without traversing the public internet by leveraging Private Virtual Interfaces with Direct Connect for the service link.

With this architecture in place, you can mitigate disasters and reduce downtime by failing over to the AWS Region using AWS DRS.

DR from Region to Outpost

AWS provides multiple Availability Zones (AZs) within a Region and isolated AWS Regions globally for the greatest possible fault tolerance and stability. The reliability pillar of AWS’s Well-Architected Framework encourages distributing workloads across AZs and replicating data between Regions when the need for distances exceeds those of AZs.

AWS DRS supports nearly continuous replication of workloads from a Region to an Outpost within your data center or colocation facility for DR. This deployment model provides increased durability from a source AWS Region to an Outpost anchored to a different Region.

In this model, AWS DRS components remain on-premises within the Outpost, but data charges are applicable as data egresses from the Region back to the data center and Amazon S3 on Outposts is required on the destination Outpost.

DR from Region to Outpost

Figure 3 – DR from Region to Outpost

Implementing the preceding architecture diagram enables failover of critical workloads from the Region to on-premises Outposts seamlessly. Keep in mind that AWS Regions provide the management and control plane for Outpost, making it critical to consider probability and frequency of service link interruptions as a part of your DR planning. Scenarios such as warm standby with pre-allocated Amazon EC2 and Amazon EBS resources may prove more resilient during service link disruptions.

DR between two Outposts

Each logical Outpost is comprised of one or more physical racks. Logical Outposts are in independent colocations of one another, and support deployments in disparate data centers or colocation facilities. You can elect to have multiple logical Outposts anchored to different Availability Zones or Regions. AWS DRS unlocks options for replication between two logical Outposts, leading to increased resiliency and reducing the impact of your data center as a single point of failure. In the following architecture, nearly continuous replication captured from a single Outpost source is applied at a second logical Outpost.

DR between two Outposts

Figure 4 – DR between two Outposts

Supporting both directional and bidirectional replication between Outposts can minimize disruption caused by events that take down a data center, Availability Zone, or even the entire Region result in minimal disruption. In the following architecture diagram, bidirectional data replication occurs between the Outposts by routing traffic via the local gateways, minimizing outbound data charges from the Region and allowing for more direct routing between deployment sites that could potentially span significant distances. AWS DRS cannot communicate with resources directly utilizing a customer-owned IP address pool (CoIP pool).

Figure 5 – DR between two Outposts – bidirectional 

Architecture Considerations

When planning an Outpost deployment leveraging AWS DRS, it’s critical to consider the impact on storage. As a general best practice, AWS recommends planning for a 2:1 ratio consisting of EBS volumes used for nearly continuous replication and Amazon EBS snapshots on Amazon S3 for point-in-time recovery. While it’s unlikely that all servers would need recovery simultaneously, it’s also important to allocate a reserve of EBS volume capacity, which will launch at the time of recovery. Amazon S3 on Outpost is needed for each Outpost used as a replication destination, and the recommendation is to plan for a 1:1 ratio consisting of S3 on Outposts storage, plus the rate of data change. For example, if your data change rate is 10%, you’d want to plan for 110% S3 on Outpost use with AWS DRS.

Amazon CloudWatch has integrated metrics for EC2, Amazon EBS, and Amazon S3 capacity on Outposts, making it easy to create custom tailored dashboards and integrate with Simple Notification Service (Amazon SNS) for alerts at defined thresholds. Monitoring these metrics is critical in making sure that proper free space is available for data replication to occur unimpeded. CloudWatch has metrics available for AWS DRS as well. You can also use the AWS DRS service page in the AWS Console to monitor the status of your recovery instances.

Consider taking advantage of Recovery Plans within AWS DRS to make sure that related services are recovered in a particular order. For example, during a disaster, it might be critical to first bring up a database before recovering application tiers. Recovery plans provide the ability to group related services and apply wait times to individual targets.

Conclusion

AWS Outpost enables low latency, data residency, or data gravity-constrained workloads by supplying managed cloud compute and storage services within your data center or colocation. When coupled with AWS DRS, you can decrease RPO and RTO through a variety of flexible deployment models with sources and destinations ranging from on-premises, the Region, or another AWS Outpost.

How Salesforce optimized their detection and response platform using AWS managed services

Post Syndicated from Atul Khare original https://aws.amazon.com/blogs/big-data/how-salesforce-optimized-their-detection-and-response-platform-using-aws-managed-services/

This is a guest blog post co-authored with Atul Khare and Bhupender Panwar from Salesforce.

Headquartered in San Francisco, Salesforce, Inc. is a cloud-based customer relationship management (CRM) software company building artificial intelligence (AI)-powered business applications that allow businesses to connect with their customers in new and personalized ways.

The Salesforce Trust Intelligence Platform (TIP) log platform team is responsible for data pipeline and data lake infrastructure, providing log ingestion, normalization, persistence, search, and detection capability to ensure Salesforce is safe from threat actors. It runs miscellaneous services to facilitate investigation, mitigation, and containment for security operations. The TIP team is critical to securing Salesforce’s infrastructure, detecting malicious threat activities, and providing timely responses to security events. This is achieved by collecting and inspecting petabytes of security logs across dozens of organizations, some with thousands of accounts.

In this post, we discuss how the Salesforce TIP team optimized their architecture using Amazon Web Services (AWS) managed services to achieve better scalability, cost, and operational efficiency.

TIP existing architecture bird’s eye view and scale of the platform

The main key performance indicator (KPI) for the TIP platform is its capability to ingest a high volume of security logs from a variety of Salesforce internal systems in real time and process them with high velocity. The platform ingests more than 1 PB of data per day, more than 10 million events per second, and more than 200 different log types. The platform ingests log files in JSON, text, and Common Event Format (CEF) formats.

The message bus in TIP’s existing architecture mainly uses Apache Kafka for ingesting different log types coming from the upstream systems. Kafka had a single topic for all the log types before they were consumed by different downstream applications including Splunk, Streaming Search, and Log Normalizer. The Normalized Parquet Logs are stored in an Amazon Simple Storage Service (Amazon S3) data lake and cataloged into Hive Metastore (HMS) on an Amazon Relational Database Service (Amazon RDS) instance based on S3 event notifications. The data lake consumers then use Apache Presto running on Amazon EMR cluster to perform one-time queries. Other teams including the Data Science and Machine Learning teams use the platform to detect, analyze, and control security threats.

Challenges with the existing TIP log platform architecture

Some of the main challenges that TIP’s existing architecture was facing include:

  • Heavy operational overhead and maintenance cost managing the Kafka cluster
  • High cost to serve (CTS) to meet growing business needs
  • Compute threads limited by partitions’ numbers
  • Difficult to scale out when traffic increases
  • Weekly patching creates lags
  • Challenges with HMS scalability

All these challenges motivated the TIP team to embark on a journey to create a more optimized platform that’s easier to scale with less operational overhead and lower CTS.

New TIP log platform architecture

The Salesforce TIP log platform engineering team, in collaboration with AWS, started building the new architecture to replace the Kafka-based message bus solution with the fully managed AWS messaging and notification solutions Amazon Simple Queue Service (Amazon SQS) and Amazon Simple Notification Service (Amazon SNS). In the new design, the upstream systems send their logs to a central Amazon S3 storage location, which invokes a process to partition the logs and store them in an S3 data lake. Consumer applications such as Splunk get the messages delivered to their system using Amazon SQS. Similarly, the partitioned log data through Amazon SQS events initializes a log normalization process that delivers the normalized log data to open source Delta Lake tables on an S3 data lake. One of the major changes in the new architecture is the use of an AWS Glue Data Catalog to replace the previous Hive Metastore. The one-time analysis applications use Apache Trino on an Amazon EMR cluster to query the Delta Tables cataloged in AWS Glue. Other consumer applications also read the data from S3 data lake files stored in Delta Table format. More details on some of the important processes are as follows:

Log partitioner (Spark structured stream)

This service ingests logs from the Amazon S3 SNS SQS-based store and stores them in the partitioned (by log types) format in S3 for further downstream consumptions from the Amazon SNS SQS subscription. This is the bronze layer of the TIP data lake.

Log normalizer (Spark structured stream)

One of the downstream consumers of log partitioner (Splunk Ingestor is another one), the log normalizer ingests the data from Partitioned Output S3, using Amazon SNS SQS notifications, and enriches them using Salesforce custom parsers and tags. Finally, this enriched data is landed in the data lake on S3. This is the silver layer of the TIP data lake.

Machine learning and other data analytics consumers (Trino, Flink, and Spark Jobs)

These consumers consume from the silver layer of the TIP data lake and run analytics for security detection use cases. The earlier Kafka interface is now converted to delta streams ingestion, which concludes the total removal of the Kafka bus from the TIP data pipeline.

Advantages of the new TIP log platform architecture

The main advantages realized by the Salesforce TIP team based on this new architecture using Amazon S3, Amazon SNS, and Amazon SQS include:

  • Cost savings of approximately $400 thousand per month
  • Auto scaling to meet growing business needs
  • Zero DevOps maintenance overhead
  • No mapping of partitions to compute threads
  • Compute resources can be scaled up and down independently
  • Fully managed Data Catalog to reduce the operational overhead of managing HMS

Summary

In this blog post we discussed how the Salesforce Trust Intelligence Platform (TIP) optimized their data pipeline by replacing the Kafka-based message bus solution with fully managed AWS messaging and notification solutions using Amazon SQS and Amazon SNS. Salesforce and AWS teams worked together to make sure this new platform seamlessly scales to ingest more than 1 PB of data per day, more than 10 millions events per second, and more than 200 different log types. Reach out to your AWS account team if you have similar use cases and you need help architecting your platform to achieve operational efficiencies and scale.


About the authors

Atul Khare is a Director of Engineering at Salesforce Security, where he spearheads the Security Log Platform and Data Lakehouse initiatives. He supports diverse security customers by building robust big data ETL pipeline that is elastic, resilient, and easy to use, providing uniform & consistent security datasets for threat detection and response operations, AI, forensic analysis, analytics, and compliance needs across all Salesforce clouds. Beyond his professional endeavors, Atul enjoys performing music with his band to raise funds for local charities.

Bhupender Panwar is a Big Data Architect at Salesforce and seasoned advocate for big data and cloud computing. His background encompasses the development of data-intensive applications and pipelines, solving intricate architectural and scalability challenges, and extracting valuable insights from extensive datasets within the technology industry. Outside of his big data work, Bhupender loves to hike, bike, enjoy travel and is a great foodie.

Avijit Goswami is a Principal Solutions Architect at AWS specialized in data and analytics. He supports AWS strategic customers in building high-performing, secure, and scalable data lake solutions on AWS using AWS managed services and open-source solutions. Outside of his work, Avijit likes to travel, hike in the San Francisco Bay Area trails, watch sports, and listen to music.

Vikas Panghal is the Principal Product Manager leading the product management team for Amazon SNS and Amazon SQS. He has deep expertise in event-driven and messaging applications and brings a wealth of knowledge and experience to his role, shaping the future of messaging services. He is passionate about helping customers build highly scalable, fault-tolerant, and loosely coupled systems. Outside of work, he enjoys spending time with his family outdoors, playing chess, and running.

How Aura from Unity revolutionized their big data pipeline with Amazon Redshift Serverless

Post Syndicated from Yonatan Dolan original https://aws.amazon.com/blogs/big-data/how-aura-from-unity-revolutionized-their-big-data-pipeline-with-amazon-redshift-serverless/

This post is co-written with  Amir Souchami and  Fabian Szenkier from Unity.

Aura from Unity (formerly known as ironSource) is the market standard for creating rich device experiences that engage and retain customers. With a powerful set of solutions, Aura enables complete digital transformation, letting operators promote key services outside the store, directly on-device.

Amazon Redshift is a recommended service for online analytical processing (OLAP) workloads such as cloud data warehouses, data marts, and other analytical data stores. You can use simple SQL to analyze structured and semi-structured data, operational databases, and data lakes to deliver the best price/performance at any scale. The Amazon Redshift data sharing feature provides instant, granular, and high-performance access without data copies and data movement across multiple Redshift data warehouses in the same or different AWS accounts and across AWS Regions. Data sharing provides live access to data so that you always see the most up-to-date and consistent information as it’s updated in the data warehouse.

Amazon Redshift Serverless makes it straightforward to run and scale analytics in seconds without the need to set up and manage data warehouse clusters. Redshift Serverless automatically provisions and intelligently scales data warehouse capacity to deliver fast performance for even the most demanding and unpredictable workloads, and you pay only for what you use. You can load your data and start querying right away in the Amazon Redshift Query Editor or in your favorite business intelligence (BI) tool and continue to enjoy the best price/performance and familiar SQL features in an easy-to-use, zero administration environment.

In this post, we describe Aura’s successful and swift adoption of Redshift Serverless, which allowed them to optimize their overall bidding advertisement campaigns’ time to market from 24 hours to 2 hours. We explore why Aura chose this solution and what technological challenges it helped solve.

Aura’s initial data pipeline

Aura is a pioneer in using Redshift RA3 clusters with data sharing for extract, transform, and load (ETL) and BI workloads. One of Aura’s operations is bidding advertisement campaigns. These campaigns are optimized by using an AI-based bid process that requires running hundreds of analytical queries per campaign. These queries are run on data that resides in an RA3 provisioned Redshift cluster.

The integrated pipeline is comprised of various AWS services:

The following diagram illustrates this architecture.

Aura architecture

Challenges of the initial architecture

The queries for each campaign run in the following manner:

First, a preparation query filters and aggregates raw data, preparing it for the subsequent operation. This is followed by the main query, which carries out the logic according to the preparation query result set.

As the number of campaigns grew, Aura’s Data team was required to run hundreds of concurrent queries for each of these steps. Aura’s existing provisioned cluster was already heavily utilized with data ingestion, ETL, and BI workloads, so they were looking for cost-effective ways to isolate this workload with dedicated compute resources.

The team evaluated a variety of options, including unloading data to Amazon S3 and a multi-cluster architecture using data sharing and Redshift serverless. The team gravitated towards the multi-cluster architecture with data sharing, as it requires no query rewrite, allows for dedicated compute for this specific workload, avoids the need to duplicate or move data from the main cluster, and provides high concurrency and automatic scaling. Lastly, it’s billed in a pay-for-what-you-use model, and provisioning is straightforward and quick.

Proof of concept

After evaluating the options, Aura’s Data team decided to conduct a proof of concept using Redshift Serverless as a consumer of their main Redshift provisioned cluster, sharing just the relevant tables for running the required queries. Redshift Serverless measures data warehouse capacity in Redshift Processing Units (RPUs). A single RPU provides 16 GB of memory and a serverless endpoint can range from 8 RPU to 512 RPU.

Aura’s Data team started the proof of concept using a 256 RPU Redshift Serverless endpoint and gradually lowered the RPU to reduce costs while making sure the query runtime was below the required target.

Eventually, the team decided to use a 128 RPU (2 TB RAM) Redshift Serverless endpoint as the base RPU, while using the Redshift Serverless auto scaling feature, which allows hundreds of concurrent queries to run by automatically upscaling the RPU as needed.

Aura’s new solution with Redshift Serverless

After a successful proof of concept, the production setup included adding code to switch between the provisioned Redshift cluster and the Redshift Serverless endpoint. This was done using a configurable threshold based on the number of queries waiting to be processed in a specific MSK topic consumed at the beginning of the pipeline. Small-scale campaign queries would still run on the provisioned cluster, and large-scale queries would use the Redshift Serverless endpoint. The new solution uses an Amazon MWAA pipeline that fetches configuration information from a DynamoDB table, consumes jobs that represent ad campaigns, and then runs hundreds of EKS jobs triggered using EKSPodOperator. Each job runs the two serial queries (the preparation query followed by a main query, which outputs the results to Amazon S3). This happens several hundred times concurrently using Redshift Serverless compute resources.

Then the process initiates another set of EKSPodOperator operators to run the AI training code based on the data result that was saved on Amazon S3.

The following diagram illustrates the solution architecture.

Aura new architecture

Outcome

The overall runtime of the pipeline was reduced from 24 hours to just 2 hours, a 12-times improvement. This integration of Redshift Serverless, coupled with data sharing, led to a 90% reduction in pipeline duration, negating the necessity for data duplication or query rewriting. Moreover, the introduction of a dedicated consumer as an exclusive compute resource significantly eased the load of the producer cluster, enabling running small-scale queries even faster.

“Redshift Serverless and data sharing enabled us to provision and scale our data warehouse capacity to deliver fast performance, high concurrency and handle challenging ML workloads with very minimal effort.”

– Amir Souchami, Aura’s Principal Technical Systems Architect.

Learnings

Aura’s Data team is highly focused on working in a cost-effective manner and has therefore implemented several cost controls in their Redshift Serverless endpoint:

  • Limit the overall spend by setting a maximum RPU-hour usage limit (per day, week, month) for the workgroup. Aura configured that limit so when it is reached, Amazon Redshift will send an alert to the relevant Amazon Redshift administrator team. This feature also allows writing an entry to a system table and even turning off user queries.
  • Use a maximum RPU configuration, which defines the upper limit of compute resources that Redshift Serverless can use at any given time. When the maximum RPU limit is set for the workgroup, Redshift Serverless scales within that limit to continue to run the workload.
  • Implement query monitoring rules that prevent wasteful resource utilization and runaway costs caused by poorly written queries.

Conclusion

A data warehouse is a crucial part of any modern data-driven company, enabling you to answer complex business questions and provide insights. The evolution of Amazon Redshift allowed Aura to quickly adapt to business requirements by combining data sharing between provisioned and Redshift Serverless data warehouses. Aura’s journey with Redshift Serverless underscores the vast potential of strategic tech integration in driving efficiency and operational excellence.

If Aura’s journey has sparked your interest and you are considering implementing a similar solution in your organization, here are some strategic steps to consider:

  • Start by thoroughly understanding your organization’s data needs and how such a solution can address them.
  • Reach out to AWS experts, who can provide you with guidance based on their own experiences. Consider engaging in seminars, workshops, or online forums that discuss these technologies. The following resources are recommended for getting started:
  • An important part of this journey would be to implement a proof of concept. Such hands-on experience will provide valuable insights before moving to production.

Elevate your Redshift expertise. Already enjoying the power of Amazon Redshift? Enhance your data journey with the latest features and expert guidance. Reach out to your dedicated AWS account team for personalized support, discover cutting-edge capabilities, and unlock even greater value from your data with Amazon Redshift.


About the Authors

Amir Souchami, Chief Architect of Aura from Unity, focusing on creating resilient and performant cloud systems and mobile apps at major scale.

Fabian Szenkier is the ML and Big Data Architect at Aura by Unity, works on building modern AI/ML solutions and state of the art data engineering pipelines at scale.

Liat Tzur is a Senior Technical Account Manager at Amazon Web Services. She serves as the customer’s advocate and assists her customers in achieving cloud operational excellence in alignment with their business goals.

Adi Jabkowski is a Sr. Redshift Specialist in EMEA, part of the Worldwide Specialist Organization (WWSO) at AWS.

Yonatan Dolan is a Principal Analytics Specialist at Amazon Web Services. He is located in Israel and helps customers harness AWS analytical services to leverage data, gain insights, and derive value.

Terraform CI/CD and testing on AWS with the new Terraform Test Framework

Post Syndicated from Kevon Mayers original https://aws.amazon.com/blogs/devops/terraform-ci-cd-and-testing-on-aws-with-the-new-terraform-test-framework/

Image of HashiCorp Terraform logo and Amazon Web Services (AWS) Logo. Underneath the AWS Logo are the service logos for AWS CodeCommit, AWS CodeBuild, AWS CodePipeline, and Amazon S3. Graphic created by Kevon Mayers

Graphic created by Kevon Mayers

 Introduction

Organizations often use Terraform Modules to orchestrate complex resource provisioning and provide a simple interface for developers to enter the required parameters to deploy the desired infrastructure. Modules enable code reuse and provide a method for organizations to standardize deployment of common workloads such as a three-tier web application, a cloud networking environment, or a data analytics pipeline. When building Terraform modules, it is common for the module author to start with manual testing. Manual testing is performed using commands such as terraform validate for syntax validation, terraform plan to preview the execution plan, and terraform apply followed by manual inspection of resource configuration in the AWS Management Console. Manual testing is prone to human error, not scalable, and can result in unintended issues. Because modules are used by multiple teams in the organization, it is important to ensure that any changes to the modules are extensively tested before the release. In this blog post, we will show you how to validate Terraform modules and how to automate the process using a Continuous Integration/Continuous Deployment (CI/CD) pipeline.

Terraform Test

Terraform test is a new testing framework for module authors to perform unit and integration tests for Terraform modules. Terraform test can create infrastructure as declared in the module, run validation against the infrastructure, and destroy the test resources regardless if the test passes or fails. Terraform test will also provide warnings if there are any resources that cannot be destroyed. Terraform test uses the same HashiCorp Configuration Language (HCL) syntax used to write Terraform modules. This reduces the burden for modules authors to learn other tools or programming languages. Module authors run the tests using the command terraform test which is available on Terraform CLI version 1.6 or higher.

Module authors create test files with the extension *.tftest.hcl. These test files are placed in the root of the Terraform module or in a dedicated tests directory. The following elements are typically present in a Terraform tests file:

  • Provider block: optional, used to override the provider configuration, such as selecting AWS region where the tests run.
  • Variables block: the input variables passed into the module during the test, used to supply non-default values or to override default values for variables.
  • Run block: used to run a specific test scenario. There can be multiple run blocks per test file, Terraform executes run blocks in order. In each run block you specify the command Terraform (plan or apply), and the test assertions. Module authors can specify the conditions such as: length(var.items) != 0. A full list of condition expressions can be found in the HashiCorp documentation.

Terraform tests are performed in sequential order and at the end of the Terraform test execution, any failed assertions are displayed.

Basic test to validate resource creation

Now that we understand the basic anatomy of a Terraform tests file, let’s create basic tests to validate the functionality of the following Terraform configuration. This Terraform configuration will create an AWS CodeCommit repository with prefix name repo-.

# main.tf

variable "repository_name" {
  type = string
}
resource "aws_codecommit_repository" "test" {
  repository_name = format("repo-%s", var.repository_name)
  description     = "Test repository."
}

Now we create a Terraform test file in the tests directory. See the following directory structure as an example:

├── main.tf 
└── tests 
└── basic.tftest.hcl

For this first test, we will not perform any assertion except for validating that Terraform execution plan runs successfully. In the tests file, we create a variable block to set the value for the variable repository_name. We also added the run block with command = plan to instruct Terraform test to run Terraform plan. The completed test should look like the following:

# basic.tftest.hcl

variables {
  repository_name = "MyRepo"
}

run "test_resource_creation" {
  command = plan
}

Now we will run this test locally. First ensure that you are authenticated into an AWS account, and run the terraform init command in the root directory of the Terraform module. After the provider is initialized, start the test using the terraform test command.

❯ terraform test
tests/basic.tftest.hcl... in progress
run "test_resource_creation"... pass
tests/basic.tftest.hcl... tearing down
tests/basic.tftest.hcl... pass

Our first test is complete, we have validated that the Terraform configuration is valid and the resource can be provisioned successfully. Next, let’s learn how to perform inspection of the resource state.

Create resource and validate resource name

Re-using the previous test file, we add the assertion block to checks if the CodeCommit repository name starts with a string repo- and provide error message if the condition fails. For the assertion, we use the startswith function. See the following example:

# basic.tftest.hcl

variables {
  repository_name = "MyRepo"
}

run "test_resource_creation" {
  command = plan

  assert {
    condition = startswith(aws_codecommit_repository.test.repository_name, "repo-")
    error_message = "CodeCommit repository name ${var.repository_name} did not start with the expected value of ‘repo-****’."
  }
}

Now, let’s assume that another module author made changes to the module by modifying the prefix from repo- to my-repo-. Here is the modified Terraform module.

# main.tf

variable "repository_name" {
  type = string
}
resource "aws_codecommit_repository" "test" {
  repository_name = format("my-repo-%s", var.repository_name)
  description = "Test repository."
}

We can catch this mistake by running the the terraform test command again.

❯ terraform test
tests/basic.tftest.hcl... in progress
run "test_resource_creation"... fail
╷
│ Error: Test assertion failed
│
│ on tests/basic.tftest.hcl line 9, in run "test_resource_creation":
│ 9: condition = startswith(aws_codecommit_repository.test.repository_name, "repo-")
│ ├────────────────
│ │ aws_codecommit_repository.test.repository_name is "my-repo-MyRepo"
│
│ CodeCommit repository name MyRepo did not start with the expected value 'repo-***'.
╵
tests/basic.tftest.hcl... tearing down
tests/basic.tftest.hcl... fail

Failure! 0 passed, 1 failed.

We have successfully created a unit test using assertions that validates the resource name matches the expected value. For more examples of using assertions see the Terraform Tests Docs. Before we proceed to the next section, don’t forget to fix the repository name in the module (revert the name back to repo- instead of my-repo-) and re-run your Terraform test.

Testing variable input validation

When developing Terraform modules, it is common to use variable validation as a contract test to validate any dependencies / restrictions. For example, AWS CodeCommit limits the repository name to 100 characters. A module author can use the length function to check the length of the input variable value. We are going to use Terraform test to ensure that the variable validation works effectively. First, we modify the module to use variable validation.

# main.tf

variable "repository_name" {
  type = string
  validation {
    condition = length(var.repository_name) <= 100
    error_message = "The repository name must be less than or equal to 100 characters."
  }
}

resource "aws_codecommit_repository" "test" {
  repository_name = format("repo-%s", var.repository_name)
  description = "Test repository."
}

By default, when variable validation fails during the execution of Terraform test, the Terraform test also fails. To simulate this, create a new test file and insert the repository_name variable with a value longer than 100 characters.

# var_validation.tftest.hcl

variables {
  repository_name = “this_is_a_repository_name_longer_than_100_characters_7rfD86rGwuqhF3TH9d3Y99r7vq6JZBZJkhw5h4eGEawBntZmvy”
}

run “test_invalid_var” {
  command = plan
}

Notice on this new test file, we also set the command to Terraform plan, why is that? Because variable validation runs prior to Terraform apply, thus we can save time and cost by skipping the entire resource provisioning. If we run this Terraform test, it will fail as expected.

❯ terraform test
tests/basic.tftest.hcl… in progress
run “test_resource_creation”… pass
tests/basic.tftest.hcl… tearing down
tests/basic.tftest.hcl… pass
tests/var_validation.tftest.hcl… in progress
run “test_invalid_var”… fail
╷
│ Error: Invalid value for variable
│
│ on main.tf line 1:
│ 1: variable “repository_name” {
│ ├────────────────
│ │ var.repository_name is “this_is_a_repository_name_longer_than_100_characters_7rfD86rGwuqhF3TH9d3Y99r7vq6JZBZJkhw5h4eGEawBntZmvy”
│
│ The repository name must be less than or equal to 100 characters.
│
│ This was checked by the validation rule at main.tf:3,3-13.
╵
tests/var_validation.tftest.hcl… tearing down
tests/var_validation.tftest.hcl… fail

Failure! 1 passed, 1 failed.

For other module authors who might iterate on the module, we need to ensure that the validation condition is correct and will catch any problems with input values. In other words, we expect the validation condition to fail with the wrong input. This is especially important when we want to incorporate the contract test in a CI/CD pipeline. To prevent our test from failing due introducing an intentional error in the test, we can use the expect_failures attribute. Here is the modified test file:

# var_validation.tftest.hcl

variables {
  repository_name = “this_is_a_repository_name_longer_than_100_characters_7rfD86rGwuqhF3TH9d3Y99r7vq6JZBZJkhw5h4eGEawBntZmvy”
}

run “test_invalid_var” {
  command = plan

  expect_failures = [
    var.repository_name
  ]
}

Now if we run the Terraform test, we will get a successful result.

❯ terraform test
tests/basic.tftest.hcl… in progress
run “test_resource_creation”… pass
tests/basic.tftest.hcl… tearing down
tests/basic.tftest.hcl… pass
tests/var_validation.tftest.hcl… in progress
run “test_invalid_var”… pass
tests/var_validation.tftest.hcl… tearing down
tests/var_validation.tftest.hcl… pass

Success! 2 passed, 0 failed.

As you can see, the expect_failures attribute is used to test negative paths (the inputs that would cause failures when passed into a module). Assertions tend to focus on positive paths (the ideal inputs). For an additional example of a test that validates functionality of a completed module with multiple interconnected resources, see this example in the Terraform CI/CD and Testing on AWS Workshop.

Orchestrating supporting resources

In practice, end-users utilize Terraform modules in conjunction with other supporting resources. For example, a CodeCommit repository is usually encrypted using an AWS Key Management Service (KMS) key. The KMS key is provided by end-users to the module using a variable called kms_key_id. To simulate this test, we need to orchestrate the creation of the KMS key outside of the module. In this section we will learn how to do that. First, update the Terraform module to add the optional variable for the KMS key.

# main.tf

variable "repository_name" {
  type = string
  validation {
    condition = length(var.repository_name) <= 100
    error_message = "The repository name must be less than or equal to 100 characters."
  }
}

variable "kms_key_id" {
  type = string
  default = ""
}

resource "aws_codecommit_repository" "test" {
  repository_name = format("repo-%s", var.repository_name)
  description = "Test repository."
  kms_key_id = var.kms_key_id != "" ? var.kms_key_id : null
}

In a Terraform test, you can instruct the run block to execute another helper module. The helper module is used by the test to create the supporting resources. We will create a sub-directory called setup under the tests directory with a single kms.tf file. We also create a new test file for KMS scenario. See the updated directory structure:

├── main.tf
└── tests
├── setup
│ └── kms.tf
├── basic.tftest.hcl
├── var_validation.tftest.hcl
└── with_kms.tftest.hcl

The kms.tf file is a helper module to create a KMS key and provide its ARN as the output value.

# kms.tf

resource "aws_kms_key" "test" {
  description = "test KMS key for CodeCommit repo"
  deletion_window_in_days = 7
}

output "kms_key_id" {
  value = aws_kms_key.test.arn
}

The new test will use two separate run blocks. The first run block (setup) executes the helper module to generate a KMS key. This is done by assigning the command apply which will run terraform apply to generate the KMS key. The second run block (codecommit_with_kms) will then use the KMS key ARN output of the first run as the input variable passed to the main module.

# with_kms.tftest.hcl

run "setup" {
  command = apply
  module {
    source = "./tests/setup"
  }
}

run "codecommit_with_kms" {
  command = apply

  variables {
    repository_name = "MyRepo"
    kms_key_id = run.setup.kms_key_id
  }

  assert {
    condition = aws_codecommit_repository.test.kms_key_id != null
    error_message = "KMS key ID attribute value is null"
  }
}

Go ahead and run the Terraform init, followed by Terraform test. You should get the successful result like below.

❯ terraform test
tests/basic.tftest.hcl... in progress
run "test_resource_creation"... pass
tests/basic.tftest.hcl... tearing down
tests/basic.tftest.hcl... pass
tests/var_validation.tftest.hcl... in progress
run "test_invalid_var"... pass
tests/var_validation.tftest.hcl... tearing down
tests/var_validation.tftest.hcl... pass
tests/with_kms.tftest.hcl... in progress
run "create_kms_key"... pass
run "codecommit_with_kms"... pass
tests/with_kms.tftest.hcl... tearing down
tests/with_kms.tftest.hcl... pass

Success! 4 passed, 0 failed.

We have learned how to run Terraform test and develop various test scenarios. In the next section we will see how to incorporate all the tests into a CI/CD pipeline.

Terraform Tests in CI/CD Pipelines

Now that we have seen how Terraform Test works locally, let’s see how the Terraform test can be leveraged to create a Terraform module validation pipeline on AWS. The following AWS services are used:

  • AWS CodeCommit – a secure, highly scalable, fully managed source control service that hosts private Git repositories.
  • AWS CodeBuild – a fully managed continuous integration service that compiles source code, runs tests, and produces ready-to-deploy software packages.
  • AWS CodePipeline – a fully managed continuous delivery service that helps you automate your release pipelines for fast and reliable application and infrastructure updates.
  • Amazon Simple Storage Service (Amazon S3) – an object storage service offering industry-leading scalability, data availability, security, and performance.
Terraform module validation pipeline Architecture. Multiple interconnected AWS services such as AWS CodeCommit, CodeBuild, CodePipeline, and Amazon S3 used to build a Terraform module validation pipeline.

Terraform module validation pipeline

In the above architecture for a Terraform module validation pipeline, the following takes place:

  • A developer pushes Terraform module configuration files to a git repository (AWS CodeCommit).
  • AWS CodePipeline begins running the pipeline. The pipeline clones the git repo and stores the artifacts to an Amazon S3 bucket.
  • An AWS CodeBuild project configures a compute/build environment with Checkov installed from an image fetched from Docker Hub. CodePipeline passes the artifacts (Terraform module) and CodeBuild executes Checkov to run static analysis of the Terraform configuration files.
  • Another CodeBuild project configured with Terraform from an image fetched from Docker Hub. CodePipeline passes the artifacts (repo contents) and CodeBuild runs Terraform command to execute the tests.

CodeBuild uses a buildspec file to declare the build commands and relevant settings. Here is an example of the buildspec files for both CodeBuild Projects:

# Checkov
version: 0.1
phases:
  pre_build:
    commands:
      - echo pre_build starting

  build:
    commands:
      - echo build starting
      - echo starting checkov
      - ls
      - checkov -d .
      - echo saving checkov output
      - checkov -s -d ./ > checkov.result.txt

In the above buildspec, Checkov is run against the root directory of the cloned CodeCommit repository. This directory contains the configuration files for the Terraform module. Checkov also saves the output to a file named checkov.result.txt for further review or handling if needed. If Checkov fails, the pipeline will fail.

# Terraform Test
version: 0.1
phases:
  pre_build:
    commands:
      - terraform init
      - terraform validate

  build:
    commands:
      - terraform test

In the above buildspec, the terraform init and terraform validate commands are used to initialize Terraform, then check if the configuration is valid. Finally, the terraform test command is used to run the configured tests. If any of the Terraform tests fails, the pipeline will fail.

For a full example of the CI/CD pipeline configuration, please refer to the Terraform CI/CD and Testing on AWS workshop. The module validation pipeline mentioned above is meant as a starting point. In a production environment, you might want to customize it further by adding Checkov allow-list rules, linting, checks for Terraform docs, or pre-requisites such as building the code used in AWS Lambda.

Choosing various testing strategies

At this point you may be wondering when you should use Terraform tests or other tools such as Preconditions and Postconditions, Check blocks or policy as code. The answer depends on your test type and use-cases. Terraform test is suitable for unit tests, such as validating resources are created according to the naming specification. Variable validations and Pre/Post conditions are useful for contract tests of Terraform modules, for example by providing error warning when input variables value do not meet the specification. As shown in the previous section, you can also use Terraform test to ensure your contract tests are running properly. Terraform test is also suitable for integration tests where you need to create supporting resources to properly test the module functionality. Lastly, Check blocks are suitable for end to end tests where you want to validate the infrastructure state after all resources are generated, for example to test if a website is running after an S3 bucket configured for static web hosting is created.

When developing Terraform modules, you can run Terraform test in command = plan mode for unit and contract tests. This allows the unit and contract tests to run quicker and cheaper since there are no resources created. You should also consider the time and cost to execute Terraform test for complex / large Terraform configurations, especially if you have multiple test scenarios. Terraform test maintains one or many state files within the memory for each test file. Consider how to re-use the module’s state when appropriate. Terraform test also provides test mocking, which allows you to test your module without creating the real infrastructure.

Conclusion

In this post, you learned how to use Terraform test and develop various test scenarios. You also learned how to incorporate Terraform test in a CI/CD pipeline. Lastly, we also discussed various testing strategies for Terraform configurations and modules. For more information about Terraform test, we recommend the Terraform test documentation and tutorial. To get hands on practice building a Terraform module validation pipeline and Terraform deployment pipeline, check out the Terraform CI/CD and Testing on AWS Workshop.

Authors

Kevon Mayers

Kevon Mayers is a Solutions Architect at AWS. Kevon is a Terraform Contributor and has led multiple Terraform initiatives within AWS. Prior to joining AWS he was working as a DevOps Engineer and Developer, and before that was working with the GRAMMYs/The Recording Academy as a Studio Manager, Music Producer, and Audio Engineer. He also owns a professional production company, MM Productions.

Welly Siauw

Welly Siauw is a Principal Partner Solution Architect at Amazon Web Services (AWS). He spends his day working with customers and partners, solving architectural challenges. He is passionate about service integration and orchestration, serverless and artificial intelligence (AI) and machine learning (ML). He has authored several AWS blog posts and actively leads AWS Immersion Days and Activation Days. Welly spends his free time tinkering with espresso machines and outdoor hiking.

London Stock Exchange Group uses chaos engineering on AWS to improve resilience

Post Syndicated from Elias Bedmar original https://aws.amazon.com/blogs/architecture/london-stock-exchange-group-uses-chaos-engineering-on-aws-to-improve-resilience/

This post was co-written with Luke Sudgen, Lead DevOps Engineer Post Trade, and Padraig Murphy, Solutions Architect Post Trade, from London Stock Exchange Group.

In this post, we’ll discuss some failure scenarios that were tested by London Stock Exchange Group (LSEG) Post Trade Technology teams during a chaos engineering event supported by AWS. Chaos engineering allows LSEG to simulate real-world failures in their cloud systems as part of controlled experiments. This methodology improves resilience and observability, which reduces risk and helps achieve compliance with regulators before deploying to production.

Introduction, tooling, and methodology

As a heavily regulated provider of global financial markets infrastructure, LSEG is always looking for opportunities to enhance workload resilience. LSEG and AWS teamed up to organize and run a 3-day AWS Experience-Based Acceleration (EBA) event to perform chaos engineering experiments against key workloads. The event was sponsored and led by the architecture function and included cross-functional Post Trade technical teams across various workstreams. The experiments were run using AWS Fault Injection Service (FIS) following the experiment methodology described in the Verify the resilience of your workloads using Chaos Engineering blog post.

Resilience of modern distributed cloud systems can be continuously improved through reviewing workload architectures and recovery, assessing standard operating procedures (SOPs), and building SOP alerts and recovery automations. AWS Resilience Hub provides a comprehensive tooling suite to get started on these activities.

Another key activity to validate and enhance your resilience posture is chaos engineering, a methodology that induces controlled chaos into customer systems through real-world controlled experiments. Chaos engineering helps customers create real-world failure conditions that can uncover hidden bugs, monitor blind spots, and manage bottlenecks that are difficult to find in distributed systems. This makes it a very useful tool in regulated industries such as financial services.

Architectural overview

The architectural diagram in Figure 1 comprises a three-tier application deployed in virtual private clouds (VPCs) with a multi-AZ setup.

Chaos engineering pattern for hybrid architecture (3-tier application)

Figure 1. Chaos engineering pattern for hybrid architecture (3-tier application)

Operating within a public subnet, the web application creates a hybrid architecture by using an Amazon Elastic Compute Cloud (Amazon EC2) Auto Scaling group and connecting to an Amazon Relational Database Service (Amazon RDS) database that’s located in a private subnet and connected with on-premises services. Additionally, a number of internal services are hosted in a separate VPC, housed within containers. FIS provides a controlled environment to validate the robustness of the architecture against various failure scenarios, such as:

  • Amazon EC2 instance failure that causes the application or container pod on the machine to also fail
  • Amazon RDS database instance reboot or failover
  • Severe network latency degradation
  • Network connectivity disruption
  • Amazon Elastic Block Store (Amazon EBS) volume failure (IOPS pause, disk full)

Amazon EC2 instance and container failure

The objective of this use case is to evaluate the resilience of the application or container pod running on Amazon EC2 instances and identify how the system can adapt itself and continue functioning during unexpected disruptions or instability of an instance. You can use aws:ec2:stop-instances or aws:ec2:terminate-instances FIS actions to mimic different EC2 instance failure modes. The response of running containers to the different instance failures was also assessed. If you’re running containers within a managed AWS service such as Amazon Elastic Container Service (Amazon ECS) or Amazon Elastic Kubernetes Service (Amazon EKS), you can use FIS failure scenarios for ECS tasks and EKS pods.

Amazon RDS failure

RDS failure is another common scenario you can use to identify and troubleshoot database managed service failures from failovers and node reboots at a large scale. FIS can be used to inject reboot/failover failure conditions into the managed RDS instances to understand the bottlenecks and issues from disaster failovers, sync failures, and other database-related problems.

Severe network latency degradation

Network latency degradation injects latency in the network interface that connects two systems. This helps you understand how these systems handle a data transfer delay and your operational response readiness (alerts, metrics, and correction). This FIS action (aws:ssm:send-command/AWSFIS-Run-Network-Latency) uses the Linux traffic control (tc) utility.

Network connectivity disruption

Connectivity issues like traffic disruption or other network issues can be simulated with FIS network actions. FIS supports the aws:network:disrupt-connectivity action to test your application’s resilience in the event of total or partial connectivity loss within its subnet, as well as disruption (including cross-Region) with other AWS networking components such as route tables or AWS Transit Gateway.

Amazon EBS volume failure (IOPS pause)

Disk failure is a problematic issue in real-time operations-based systems. It can lead to transactions failing due to I/O failures or storage failure during peak activity in heavy workloads. The EBS volume failure actions test system performance under different disk failure scenarios. FIS supports the aws:ebs:pause-volume-io action to pause I/O operations on target EBS volumes, as well as other failure modes. The target volumes must be in the same Availability Zone and must be attached to instances built on the AWS Nitro System.

Outcomes and conclusion

Following the experiment, the teams from LSEG successfully identified a series of architectural improvements to reduce application recovery time and enhance metric granularity and alerting. As a second tangible output, the teams now have a reusable chaos engineering methodology and toolset. Running regular in-person cross-functional events is a great way to implement a chaos engineering practice in your organization.

You can start your resilience journey on AWS today with AWS Resilience Hub.

Nexthink scales to trillions of events per day with Amazon MSK

Post Syndicated from Moe Haidar original https://aws.amazon.com/blogs/big-data/nexthink-scales-to-trillions-of-events-per-day-with-amazon-msk/

Real-time data streaming and event processing present scalability and management challenges. AWS offers a broad selection of managed real-time data streaming services to effortlessly run these workloads at any scale.

In this post, Nexthink shares how Amazon Managed Streaming for Apache Kafka (Amazon MSK) empowered them to achieve massive scale in event processing. Experiencing business hyper-growth, Nexthink migrated to AWS to overcome the scaling limitations of on-premises solutions. With Amazon MSK, Nexthink now seamlessly processes trillions of events per day, reaching over 5 GB per second of aggregated throughput.

In the following sections, Nexthink introduces their product and the need for scalability. They then highlight the challenges of their legacy on-premises application and present their transition to a cloud-centered software as a service (SaaS) architecture powered by Amazon MSK. Finally, Nexthink details the benefits achieved by adopting Amazon MSK.

Nexthink’s need to scale

Nexthink is the leader in digital employee experience (DeX). The company is shaping the future of work by providing IT leaders and C-levels with insights into employees’ daily technology experiences at the device and application level. This allows IT to evolve from reactive problem-solving to proactive optimization.

The Nexthink Infinity platform combines analytics, monitoring, automation, and more to manage the employee digital experience. By collecting device and application events, processing them in real time, and storing them, our platform analyzes data to solve problems and boost experiences for over 15 million employees across five continents.

In just 3 years, Nexthink’s business grew tenfold, and with the introduction of more real-time data our application had to scale from processing 200 MB per second to 5 GB per second and trillions of events daily. To enable this growth, we modernized our application from an on-premises single-tenant monolith to a cloud-based scalable SaaS solution powered by Amazon MSK.

The next sections detail our modernization journey, including the challenges we faced and the benefits we realized with our new cloud-centered, AWS-based architecture.

The on-premises solution and its challenges

Let’s first explore our previous on-premises solution, Nexthink V6, before examining how Amazon MSK addressed its challenges. The following diagram illustrates its architecture.

Nexthink v6

V6 was made up of two monolithic, single-tenant Java and C++ applications that were tightly coupled. The portal was a backend-for-frontend Java application, and the core engine was an in-house C++ in-memory database application that was also handling device connections, data ingestion, aggregation, and querying. By bundling all these functions together, the engine became difficult to manage and improve.

V6 also lacked scalability. Initially supporting 10,000 devices, some new tenants had over 300,000 devices. We reacted by deploying multiple V6 engines per tenant, increasing complexity and cost, hampering user experience, and delaying time to market. This also led to longer proof of concept and onboarding cycles, which hurt the business.

Furthermore, the absence of a streaming platform like Kafka created dependencies between teams through tight HTTP/gRPC coupling. Additionally, teams couldn’t access real-time events before ingestion into the database, limiting feature development. We also lacked a data buffer, risking potential data loss during outages. Such constraints impeded innovation and increased risks.

In summary, although the V6 system served its initial purpose, reinventing it with cloud-centered technologies became imperative to enhance scalability, reliability, and foster innovation by our engineering and product teams.

Transitioning to a cloud-centered architecture with Amazon MSK

To achieve our modernization goals, after thorough research and iterations, we implemented an event-driven microservices design on Amazon Elastic Kubernetes Service (Amazon EKS), using Kafka on Amazon MSK for distributed event storage and streaming.

Our transition from the v6 on-prem solution to the cloud-centered platform was phased over four iterations:

  • Phase 1 – We lifted and shifted from on premises to virtual machines in the cloud, reducing operational complexities and accelerating proof of concept cycles while transparently migrating customers.
  • Phase 2 – We extended the cloud architecture by implementing new product features with microservices and self-managed Kafka on Kubernetes. However, operating Kafka clusters ourselves proved overly difficult, leading us to Phase 3.
  • Phase 3 – We switched from self-managed Kafka to Amazon MSK, improving stability and reducing operational costs. We realized that managing Kafka wasn’t our core competency or differentiator, and the overhead was high. Amazon MSK enabled us to focus on our core application, freeing us from the burden of undifferentiated Kafka management.
  • Phase 4 – Finally, we eliminated all legacy components, completing the transition to a fully cloud-centered SaaS platform. This multi-year journey of learning and transformation took 3 years.

Today, after our successful transition, we use Amazon MSK for two key functions:

  • Real-time data ingestion and processing of trillions of daily events from over 15 million devices worldwide, as illustrated in the following figure.

Nexthink Architecture Ingestion

  • Enabling an event-driven system that decouples data producers and consumers, as depicted in the following figure.

Nexthink Architecture Event Driven

To further enhance our scalability and resilience, we adopted a cell-based architecture using the wide availability of Amazon MSK across AWS Regions. We currently operate over 10 cells, each representing an independent regional deployment of our SaaS solution. This cell-based approach minimizes the area of impact in case of issues, addresses data residency requirements, and enables horizontal scaling across AWS Regions, as illustrated in the following figure.

Nexthink Architecture Cells

Benefits of Amazon MSK

Amazon MSK has been critical in enabling our event-driven design. In this section, we outline the main benefits we gained from its adoption.

Improved data resilience

In our new architecture, data from devices is pushed directly to Kafka topics in Amazon MSK, which provides high availability and resilience. This makes sure that events can be safely received and stored at any time. Our services consuming this data inherit the same resilience from Amazon MSK. If our backend ingestion services face disruptions, no event is lost, because Kafka retains all published messages. When our services resume, they seamlessly continue processing from where they left off, thanks to Kafka’s producer semantics, which allow processing messages exactly-once, at-least-once, or at-most-once based on application needs.

Amazon MSK enables us to tailor the data retention duration to our specific requirements, ranging from seconds to unlimited duration. This flexibility grants uninterrupted data availability to our application, which wasn’t possible with our previous architecture. Furthermore, to safeguard data integrity in the event of processing errors or corruption, Kafka enabled us to implement a data replay mechanism, ensuring data consistency and reliability.

Organizational scaling

By adopting an event-driven architecture with Amazon MSK, we decomposed our monolithic application into loosely coupled, stateless microservices communicating asynchronously via Kafka topics. This approach enabled our engineering organization to scale rapidly from just 4–5 teams in 2019 to over 40 teams and approximately 350 engineers today.

The loose coupling between event publishers and subscribers empowered teams to focus on distinct domains, such as data ingestion, identification services, and data lakes. Teams could develop solutions independently within their domains, communicating through Kafka topics without tight coupling. This architecture accelerated feature development by minimizing the risk of new features impacting existing ones. Teams could efficiently consume events published by others, offering new capabilities more rapidly while reducing cross-team dependencies.

The following figure illustrates the seamless workflow of adding new domains to our system.

Adding domains

Furthermore, the event-driven design allowed teams to build stateless services that could seamlessly auto scale based on MSK metrics like messages per second. This event-driven scalability eliminated the need for extensive capacity planning and manual scaling efforts, freeing up development time.

By using an event-driven microservices architecture on Amazon MSK, we achieved organizational agility, enhanced scalability, and accelerated innovation while minimizing operational overhead.

Seamless infrastructure scaling

Nexthink’s business grew tenfold in 3 years, and many new capabilities were added to the product, leading to a substantial increase in traffic from 200 MB per second to 5 GB per second. This exponential data growth was enabled by the robust scalability of Amazon MSK. Achieving such scale with an on-premises solution would have been challenging and expensive, if not infeasible.

Attempting to self-manage Kafka imposed unnecessary operational overhead without providing business value. Running it with just 5% of today’s traffic was already complex and required two engineers. At today’s volumes, we estimated needing 6–10 dedicated staff, increasing costs and diverting resources away from core priorities.

Real-time capabilities

By channeling all our data through Amazon MSK, we enabled real-time processing of events. This unlocked capabilities like real-time alerts, event-driven triggers, and webhooks that were previously unattainable. As such, Amazon MSK was instrumental in facilitating our event-driven architecture and powering impactful innovations.

Secure data access

Transitioning to our new architecture, we met our security and data integrity goals. With Kafka ACLs, we enforced strict access controls, allowing consumers and producers to only interact with authorized topics. We based these granular data access controls on criteria like data type, domain, and team.

To securely scale decentralized management of topics, we introduced proprietary Kubernetes Custom Resource Definitions (CRDs). These CRDs enabled teams to independently manage their own topics, settings, and ACLs without compromising security.

Amazon MSK encryption made sure that the data remained encrypted at rest and in transit. We also introduced a Bring Your Own Key (BYOK) option, allowing application-level encryption with customer keys for all single-tenant and multi-tenant topics.

Enhanced observability

Amazon MSK gave us great visibility into our data flows. The out-of-the-box Amazon CloudWatch metrics let us see the amount and types of data flowing through each topic and cluster. This helped us quantify the usage of our product features by tracking data volumes at the topic level. The Amazon MSK operational metrics enabled effortless monitoring and right-sizing of clusters and brokers. Overall, the rich observability of Amazon MSK facilitated data-driven decisions about architecture and product features.

Conclusion

Nexthink’s journey from an on-premises monolith to a cloud SaaS was streamlined by using Amazon MSK, a fully managed Kafka service. Amazon MSK allowed us to scale seamlessly while benefiting from enterprise-grade reliability and security. By offloading Kafka management to AWS, we could stay focused on our core business and innovate faster.

Going forward, we plan to further improve performance, costs, and scalability by adopting Amazon MSK capabilities such as tiered storage and AWS Graviton-based EC2 instance types.

We are also working closely with the Amazon MSK team to prepare for upcoming service features. Rapidly adopting new capabilities will help us remain at the forefront of innovation while continuing to grow our business.

To learn more about how Nexthink uses AWS to serve its global customer base, explore the Nexthink on AWS case study. Additionally, discover other customer success stories with Amazon MSK by visiting the Amazon MSK blog category.


About the Authors

Moe HaidarMoe Haidar is a principal engineer and special projects lead @ CTO office of Nexthink. He has been involved with AWS since 2018 and is a key contributor to the cloud transformation of the Nexthink platform to AWS. His focus is on product and technology incubation and architecture, but he also loves doing hands-on activities to keep his knowledge of technologies sharp and up to date. He still contributes heavily to the code base and loves to tackle complex problems.
Simone PomataSimone Pomata is Senior Solutions Architect at AWS. He has worked enthusiastically in the tech industry for more than 10 years. At AWS, he helps customers succeed in building new technologies every day.
Magdalena GargasMagdalena Gargas is a Solutions Architect passionate about technology and solving customer challenges. At AWS, she works mostly with software companies, helping them innovate in the cloud. She participates in industry events, sharing insights and contributing to the advancement of the containerization field.

Krones real-time production line monitoring with Amazon Managed Service for Apache Flink

Post Syndicated from Florian Mair original https://aws.amazon.com/blogs/big-data/krones-real-time-production-line-monitoring-with-amazon-managed-service-for-apache-flink/

Krones provides breweries, beverage bottlers, and food producers all over the world with individual machines and complete production lines. Every day, millions of glass bottles, cans, and PET containers run through a Krones line. Production lines are complex systems with lots of possible errors that could stall the line and decrease the production yield. Krones wants to detect the failure as early as possible (sometimes even before it happens) and notify production line operators to increase reliability and output. So how to detect a failure? Krones equips their lines with sensors for data collection, which can then be evaluated against rules. Krones, as the line manufacturer, as well as the line operator have the possibility to create monitoring rules for machines. Therefore, beverage bottlers and other operators can define their own margin of error for the line. In the past, Krones used a system based on a time series database. The main challenges were that this system was hard to debug and also queries represented the current state of machines but not the state transitions.

This post shows how Krones built a streaming solution to monitor their lines, based on Amazon Kinesis and Amazon Managed Service for Apache Flink. These fully managed services reduce the complexity of building streaming applications with Apache Flink. Managed Service for Apache Flink manages the underlying Apache Flink components that provide durable application state, metrics, logs, and more, and Kinesis enables you to cost-effectively process streaming data at any scale. If you want to get started with your own Apache Flink application, check out the GitHub repository for samples using the Java, Python, or SQL APIs of Flink.

Overview of solution

Krones’s line monitoring is part of the Krones Shopfloor Guidance system. It provides support in the organization, prioritization, management, and documentation of all activities in the company. It allows them to notify an operator if the machine is stopped or materials are required, regardless where the operator is in the line. Proven condition monitoring rules are already built-in but can also be user defined via the user interface. For example, if a certain data point that is monitored violates a threshold, there can be a text message or trigger for a maintenance order on the line.

The condition monitoring and rule evaluation system is built on AWS, using AWS analytics services. The following diagram illustrates the architecture.

Architecture Diagram for Krones Production Line Monitoring

Almost every data streaming application consists of five layers: data source, stream ingestion, stream storage, stream processing, and one or more destinations. In the following sections, we dive deeper into each layer and how the line monitoring solution, built by Krones, works in detail.

Data source

The data is gathered by a service running on an edge device reading several protocols like Siemens S7 or OPC/UA. Raw data is preprocessed to create a unified JSON structure, which makes it easier to process later on in the rule engine. A sample payload converted to JSON might look like the following:

{
  "version": 1,
  "timestamp": 1234,
  "equipmentId": "84068f2f-3f39-4b9c-a995-d2a84d878689",
  "tag": "water_temperature",
  "value": 13.45,
  "quality": "Ok",
  "meta": {      
    "sequenceNumber": 123,
    "flags": ["Fst", "Lst", "Wmk", "Syn", "Ats"],
    "createdAt": 12345690,
    "sourceId": "filling_machine"
  }
}

Stream ingestion

AWS IoT Greengrass is an open source Internet of Things (IoT) edge runtime and cloud service. This allows you to act on data locally and aggregate and filter device data. AWS IoT Greengrass provides prebuilt components that can be deployed to the edge. The production line solution uses the stream manager component, which can process data and transfer it to AWS destinations such as AWS IoT Analytics, Amazon Simple Storage Service (Amazon S3), and Kinesis. The stream manager buffers and aggregates records, then sends it to a Kinesis data stream.

Stream storage

The job of the stream storage is to buffer messages in a fault tolerant way and make it available for consumption to one or more consumer applications. To achieve this on AWS, the most common technologies are Kinesis and Amazon Managed Streaming for Apache Kafka (Amazon MSK). For storing our sensor data from production lines, Krones choose Kinesis. Kinesis is a serverless streaming data service that works at any scale with low latency. Shards within a Kinesis data stream are a uniquely identified sequence of data records, where a stream is composed of one or more shards. Each shard has 2 MB/s of read capacity and 1 MB/s write capacity (with max 1,000 records/s). To avoid hitting those limits, data should be distributed among shards as evenly as possible. Every record that is sent to Kinesis has a partition key, which is used to group data into a shard. Therefore, you want to have a large number of partition keys to distribute the load evenly. The stream manager running on AWS IoT Greengrass supports random partition key assignments, which means that all records end up in a random shard and the load is distributed evenly. A disadvantage of random partition key assignments is that records aren’t stored in order in Kinesis. We explain how to solve this in the next section, where we talk about watermarks.

Watermarks

A watermark is a mechanism used to track and measure the progress of event time in a data stream. The event time is the timestamp from when the event was created at the source. The watermark indicates the timely progress of the stream processing application, so all events with an earlier or equal timestamp are considered as processed. This information is essential for Flink to advance event time and trigger relevant computations, such as window evaluations. The allowed lag between event time and watermark can be configured to determine how long to wait for late data before considering a window complete and advancing the watermark.

Krones has systems all around the globe, and needed to handle late arrivals due to connection losses or other network constraints. They started out by monitoring late arrivals and setting the default Flink late handling to the maximum value they saw in this metric. They experienced issues with time synchronization from the edge devices, which lead them to a more sophisticated way of watermarking. They built a global watermark for all the senders and used the lowest value as the watermark. The timestamps are stored in a HashMap for all incoming events. When the watermarks are emitted periodically, the smallest value of this HashMap is used. To avoid stalling of watermarks by missing data, they configured an idleTimeOut parameter, which ignores timestamps that are older than a certain threshold. This increases latency but gives strong data consistency.

public class BucketWatermarkGenerator implements WatermarkGenerator<DataPointEvent> {
private HashMap <String, WatermarkAndTimestamp> lastTimestamps;
private Long idleTimeOut;
private long maxOutOfOrderness;
}

Stream processing

After the data is collected from sensors and ingested into Kinesis, it needs to be evaluated by a rule engine. A rule in this system represents the state of a single metric (such as temperature) or a collection of metrics. To interpret a metric, more than one data point is used, which is a stateful calculation. In this section, we dive deeper into the keyed state and broadcast state in Apache Flink and how they’re used to build the Krones rule engine.

Control stream and broadcast state pattern

In Apache Flink, state refers to the ability of the system to store and manage information persistently across time and operations, enabling the processing of streaming data with support for stateful computations.

The broadcast state pattern allows the distribution of a state to all parallel instances of an operator. Therefore, all operators have the same state and data can be processed using this same state. This read-only data can be ingested by using a control stream. A control stream is a regular data stream, but usually with a much lower data rate. This pattern allows you to dynamically update the state on all operators, enabling the user to change the state and behavior of the application without the need for a redeploy. More precisely, the distribution of the state is done by the use of a control stream. By adding a new record into the control stream, all operators receive this update and are using the new state for the processing of new messages.

This allows users of Krones application to ingest new rules into the Flink application without restarting it. This avoids downtime and gives a great user experience as changes happen in real time. A rule covers a scenario in order to detect a process deviation. Sometimes, the machine data is not as easy to interpret as it might look at first glance. If a temperature sensor is sending high values, this might indicate an error, but also be the effect of an ongoing maintenance procedure. It’s important to put metrics in context and filter some values. This is achieved by a concept called grouping.

Grouping of metrics

The grouping of data and metrics allows you to define the relevance of incoming data and produce accurate results. Let’s walk through the example in the following figure.

Grouping of metrics

In Step 1, we define two condition groups. Group 1 collects the machine state and which product is going through the line. Group 2 uses the value of the temperature and pressure sensors. A condition group can have different states depending on the values it receives. In this example, group 1 receives data that the machine is running, and the one-liter bottle is selected as the product; this gives this group the state ACTIVE. Group 2 has metrics for temperature and pressure; both metrics are above their thresholds for more than 5 minutes. This results in group 2 being in a WARNING state. This means group 1 reports that everything is fine and group 2 does not. In Step 2, weights are added to the groups. This is needed in some situations, because groups might report conflicting information. In this scenario, group 1 reports ACTIVE and group 2 reports WARNING, so it’s not clear to the system what the state of the line is. After adding the weights, the states can be ranked, as shown in step 3. Lastly, the highest ranked state is chosen as the winning one, as shown in Step 4.

After the rules are evaluated and the final machine state is defined, the results will be further processed. The action taken depends on the rule configuration; this can be a notification to the line operator to restock materials, do some maintenance, or just a visual update on the dashboard. This part of the system, which evaluates metrics and rules and takes actions based on the results, is referred to as a rule engine.

Scaling the rule engine

By letting users build their own rules, the rule engine can have a high number of rules that it needs to evaluate, and some rules might use the same sensor data as other rules. Flink is a distributed system that scales very well horizontally. To distribute a data stream to several tasks, you can use the keyBy() method. This allows you to partition a data stream in a logical way and send parts of the data to different task managers. This is often done by choosing an arbitrary key so you get an evenly distributed load. In this case, Krones added a ruleId to the data point and used it as a key. Otherwise, data points that are needed are processed by another task. The keyed data stream can be used across all rules just like a regular variable.

Destinations

When a rule changes its state, the information is sent to a Kinesis stream and then via Amazon EventBridge to consumers. One of the consumers creates a notification from the event that is transmitted to the production line and alerts the personnel to act. To be able to analyze the rule state changes, another service writes the data to an Amazon DynamoDB table for fast access and a TTL is in place to offload long-term history to Amazon S3 for further reporting.

Conclusion

In this post, we showed you how Krones built a real-time production line monitoring system on AWS. Managed Service for Apache Flink allowed the Krones team to get started quickly by focusing on application development rather than infrastructure. The real-time capabilities of Flink enabled Krones to reduce machine downtime by 10% and increase efficiency up to 5%.

If you want to build your own streaming applications, check out the available samples on the GitHub repository. If you want to extend your Flink application with custom connectors, see Making it Easier to Build Connectors with Apache Flink: Introducing the Async Sink. The Async Sink is available in Apache Flink version 1.15.1 and later.


About the Authors

Florian Mair is a Senior Solutions Architect and data streaming expert at AWS. He is a technologist that helps customers in Europe succeed and innovate by solving business challenges using AWS Cloud services. Besides working as a Solutions Architect, Florian is a passionate mountaineer, and has climbed some of the highest mountains across Europe.

Emil Dietl is a Senior Tech Lead at Krones specializing in data engineering, with a key field in Apache Flink and microservices. His work often involves the development and maintenance of mission-critical software. Outside of his professional life, he deeply values spending quality time with his family.

Simon Peyer is a Solutions Architect at AWS based in Switzerland. He is a practical doer and is passionate about connecting technology and people using AWS Cloud services. A special focus for him is data streaming and automations. Besides work, Simon enjoys his family, the outdoors, and hiking in the mountains.

How Amazon optimized its high-volume financial reconciliation process with Amazon EMR for higher scalability and performance

Post Syndicated from Jeeshan Khetrapal original https://aws.amazon.com/blogs/big-data/how-amazon-optimized-its-high-volume-financial-reconciliation-process-with-amazon-emr-for-higher-scalability-and-performance/

Account reconciliation is an important step to ensure the completeness and accuracy of financial statements. Specifically, companies must reconcile balance sheet accounts that could contain significant or material misstatements. Accountants go through each account in the general ledger of accounts and verify that the balance listed is complete and accurate. When discrepancies are found, accountants investigate and take appropriate corrective action.

As part of Amazon’s FinTech organization, we offer a software platform that empowers the internal accounting teams at Amazon to conduct account reconciliations. To optimize the reconciliation process, these users require high performance transformation with the ability to scale on demand, as well as the ability to process variable file sizes ranging from as low as a few MBs to more than 100 GB. It’s not always possible to fit data onto a single machine or process it with one single program in a reasonable time frame. This computation has to be done fast enough to provide practical services where programming logic and underlying details (data distribution, fault tolerance, and scheduling) can be separated.

We can achieve these simultaneous computations on multiple machines or threads of the same function across groups of elements of a dataset by using distributed data processing solutions. This encouraged us to reinvent our reconciliation service powered by AWS services, including Amazon EMR and the Apache Spark distributed processing framework, which uses PySpark. This service enables users to process files over 100 GB containing up to 100 million transactions in less than 30 minutes. The reconciliation service has become a powerhouse for data processing, and now users can seamlessly perform a variety of operations, such as Pivot, JOIN (like an Excel VLOOKUP operation), arithmetic operations, and more, providing a versatile and efficient solution for reconciling vast datasets. This enhancement is a testament to the scalability and speed achieved through the adoption of distributed data processing solutions.

In this post, we explain how we integrated Amazon EMR to build a highly available and scalable system that enabled us to run a high-volume financial reconciliation process.

Architecture before migration

The following diagram illustrates our previous architecture.

Our legacy service was built with Amazon Elastic Container Service (Amazon ECS) on AWS Fargate. We processed the data sequentially using Python. However, due to its lack of parallel processing capability, we frequently had to increase the cluster size vertically to support larger datasets. For context, 5 GB of data with 50 operations took around 3 hours to process. This service was configured to scale horizontally to five ECS instances that polled messages from Amazon Simple Queue Service (Amazon SQS), which fed the transformation requests. Each instance was configured with 4 vCPUs and 30 GB of memory to allow horizontal scaling. However, we couldn’t expand its capacity on performance because the process happened sequentially, picking chunks of data from Amazon Simple Storage Service (Amazon S3) for processing. For example, a VLOOKUP operation where two files are to be joined required both files to be read in memory chunk by chunk to obtain the output. This became an obstacle for users because they had to wait for long periods of time to process their datasets.

As part of our re-architecture and modernization, we wanted to achieve the following:

  • High availability – The data processing clusters should be highly available, providing three 9s of availability (99.9%)
  • Throughput – The service should handle 1,500 runs per day
  • Latency – It should be able to process 100 GB of data within 30 minutes
  • Heterogeneity – The cluster should be able to support a wide variety of workloads, with files ranging from a few MBs to hundreds of GBs
  • Query concurrency – The implementation demands the ability to support a minimum of 10 degrees of concurrency
  • Reliability of jobs and data consistency – Jobs need to run reliably and consistently to avoid breaking Service Level Agreements (SLAs)
  • Cost-effective and scalable – It must be scalable based on the workload, making it cost-effective
  • Security and compliance – Given the sensitivity of data, it must support fine-grained access control and appropriate security implementations
  • Monitoring – The solution must offer end-to-end monitoring of the clusters and jobs

Why Amazon EMR

Amazon EMR is the industry-leading cloud big data solution for petabyte-scale data processing, interactive analytics, and machine learning (ML) using open source frameworks such as Apache Spark, Apache Hive, and Presto. With these frameworks and related open-source projects, you can process data for analytics purposes and BI workloads. Amazon EMR lets you transform and move large amounts of data in and out of other AWS data stores and databases, such as Amazon S3 and Amazon DynamoDB.

A notable advantage of Amazon EMR lies in its effective use of parallel processing with PySpark, marking a significant improvement over traditional sequential Python code. This innovative approach streamlines the deployment and scaling of Apache Spark clusters, allowing for efficient parallelization on large datasets. The distributed computing infrastructure not only enhances performance, but also enables the processing of vast amounts of data at unprecedented speeds. Equipped with libraries, PySpark facilitates Excel-like operations on DataFrames, and the higher-level abstraction of DataFrames simplifies intricate data manipulations, reducing code complexity. Combined with automatic cluster provisioning, dynamic resource allocation, and integration with other AWS services, Amazon EMR proves to be a versatile solution suitable for diverse workloads, ranging from batch processing to ML. The inherent fault tolerance in PySpark and Amazon EMR promotes robustness, even in the event of node failures, making it a scalable, cost-effective, and high-performance choice for parallel data processing on AWS.

Amazon EMR extends its capabilities beyond the basics, offering a variety of deployment options to cater to diverse needs. Whether it’s Amazon EMR on EC2, Amazon EMR on EKS, Amazon EMR Serverless, or Amazon EMR on AWS Outposts, you can tailor your approach to specific requirements. For those seeking a serverless environment for Spark jobs, integrating AWS Glue is also a viable option. In addition to supporting various open-source frameworks, including Spark, Amazon EMR provides flexibility in choosing deployment modes, Amazon Elastic Compute Cloud (Amazon EC2) instance types, scaling mechanisms, and numerous cost-saving optimization techniques.

Amazon EMR stands as a dynamic force in the cloud, delivering unmatched capabilities for organizations seeking robust big data solutions. Its seamless integration, powerful features, and adaptability make it an indispensable tool for navigating the complexities of data analytics and ML on AWS.

Redesigned architecture

The following diagram illustrates our redesigned architecture.

The solution operates under an API contract, where clients can submit transformation configurations, defining the set of operations alongside the S3 dataset location for processing. The request is queued through Amazon SQS, then directed to Amazon EMR via a Lambda function. This process initiates the creation of an Amazon EMR step for Spark framework implementation on a dedicated EMR cluster. Although Amazon EMR accommodates an unlimited number of steps over a long-running cluster’s lifetime, only 256 steps can be running or pending simultaneously. For optimal parallelization, the step concurrency is set at 10, allowing 10 steps to run concurrently. In case of request failures, the Amazon SQS dead-letter queue (DLQ) retains the event. Spark processes the request, translating Excel-like operations into PySpark code for an efficient query plan. Resilient DataFrames store input, output, and intermediate data in-memory, optimizing processing speed, reducing disk I/O cost, enhancing workload performance, and delivering the final output to the specified Amazon S3 location.

We define our SLA in two dimensions: latency and throughput. Latency is defined as the amount of time taken to perform one job against a deterministic dataset size and the number of operations performed on the dataset. Throughput is defined as the maximum number of simultaneous jobs the service can perform without breaching the latency SLA of one job. The overall scalability SLA of the service depends on the balance of horizontal scaling of elastic compute resources and vertical scaling of individual servers.

Because we had to run 1,500 processes per day with minimal latency and high performance, we choose to integrate Amazon EMR on EC2 deployment mode with managed scaling enabled to support processing variable file sizes.

The EMR cluster configuration provides many different selections:

  • EMR node types – Primary, core, or task nodes
  • Instance purchasing options – On-Demand Instances, Reserved Instances, or Spot Instances
  • Configuration options – EMR instance fleet or uniform instance group
  • Scaling optionsAuto Scaling or Amazon EMR managed scaling

Based on our variable workload, we configured an EMR instance fleet (for best practices, see Reliability). We also decided to use Amazon EMR managed scaling to scale the core and task nodes (for scaling scenarios, refer to Node allocation scenarios). Lastly, we chose memory-optimized AWS Graviton instances, which provide up to 30% lower cost and up to 15% improved performance for Spark workloads.

The following code provides a snapshot of our cluster configuration:

Concurrent steps:10

EMR Managed Scaling:
minimumCapacityUnits: 64
maximumCapacityUnits: 512
maximumOnDemandCapacityUnits: 512
maximumCoreCapacityUnits: 512

Master Instance Fleet:
r6g.xlarge
- 4 vCore, 30.5 GiB memory, EBS only storage
- EBS Storage:250 GiB
- Maximum Spot price: 100 % of On-demand price
- Each instance counts as 1 units
r6g.2xlarge
- 8 vCore, 61 GiB memory, EBS only storage
- EBS Storage:250 GiB
- Maximum Spot price: 100 % of On-demand price
- Each instance counts as 1 units

Core Instance Fleet:
r6g.2xlarge
- 8 vCore, 61 GiB memory, EBS only storage
- EBS Storage:100 GiB
- Maximum Spot price: 100 % of On-demand price
- Each instance counts as 8 units
r6g.4xlarge
- 16 vCore, 122 GiB memory, EBS only storage
- EBS Storage:100 GiB
- Maximum Spot price: 100 % of On-demand price
- Each instance counts as 16 units

Task Instances:
r6g.2xlarge
- 8 vCore, 61 GiB memory, EBS only storage
- EBS Storage:100 GiB
- Maximum Spot price: 100 % of On-demand price
- Each instance counts as 8 units
r6g.4xlarge
- 16 vCore, 122 GiB memory, EBS only storage
- EBS Storage:100 GiB
- Maximum Spot price: 100 % of On-demand price
- Each instance counts as 16 units

Performance

With our migration to Amazon EMR, we were able to achieve a system performance capable of handling a variety of datasets, ranging from as low as 273 B to as high as 88.5 GB with a p99 of 491 seconds (approximately 8 minutes).

The following figure illustrates the variety of file sizes processed.

The following figure shows our latency.

To compare against sequential processing, we took two datasets containing 53 million records and ran a VLOOKUP operation against each other, along with 49 other Excel-like operations. This took 26 minutes to process in the new service, compared to 5 days to process in the legacy service. This improvement is almost 300 times greater over the previous architecture in terms of performance.

Considerations

Keep in mind the following when considering this solution:

  • Right-sizing clusters – Although Amazon EMR is resizable, it’s important to right-size the clusters. Right-sizing mitigates a slow cluster, if undersized, or higher costs, if the cluster is oversized. To anticipate these issues, you can calculate the number and type of nodes that will be needed for the workloads.
  • Parallel steps – Running steps in parallel allows you to run more advanced workloads, increase cluster resource utilization, and reduce the amount of time taken to complete your workload. The number of steps allowed to run at one time is configurable and can be set when a cluster is launched and any time after the cluster has started. You need to consider and optimize the CPU/memory usage per job when multiple jobs are running in a single shared cluster.
  • Job-based transient EMR clusters – If applicable, it is recommended to use a job-based transient EMR cluster, which delivers superior isolation, verifying that each task operates within its dedicated environment. This approach optimizes resource utilization, helps prevent interference between jobs, and enhances overall performance and reliability. The transient nature enables efficient scaling, providing a robust and isolated solution for diverse data processing needs.
  • EMR Serverless – EMR Serverless is the ideal choice if you prefer not to handle the management and operation of clusters. It allows you to effortlessly run applications using open-source frameworks available within EMR Serverless, offering a straightforward and hassle-free experience.
  • Amazon EMR on EKS – Amazon EMR on EKS offers distinct advantages, such as faster startup times and improved scalability resolving compute capacity challenges—which is particularly beneficial for Graviton and Spot Instance users. The inclusion of a broader range of compute types enhances cost-efficiency, allowing tailored resource allocation. Furthermore, Multi-AZ support provides increased availability. These compelling features provide a robust solution for managing big data workloads with improved performance, cost optimization, and reliability across various computing scenarios.

Conclusion

In this post, we explained how Amazon optimized its high-volume financial reconciliation process with Amazon EMR for higher scalability and performance. If you have a monolithic application that’s dependent on vertical scaling to process additional requests or datasets, then migrating it to a distributed processing framework such as Apache Spark and choosing a managed service such as Amazon EMR for compute may help reduce the runtime to lower your delivery SLA, and also may help reduce the Total Cost of Ownership (TCO).

As we embrace Amazon EMR for this particular use case, we encourage you to explore further possibilities in your data innovation journey. Consider evaluating AWS Glue, along with other dynamic Amazon EMR deployment options such as EMR Serverless or Amazon EMR on EKS, to discover the best AWS service tailored to your unique use case. The future of the data innovation journey holds exciting possibilities and advancements to be explored further.


About the Authors

Jeeshan Khetrapal is a Sr. Software Development Engineer at Amazon, where he develops fintech products based on cloud computing serverless architectures that are responsible for companies’ IT general controls, financial reporting, and controllership for governance, risk, and compliance.

Sakti Mishra is a Principal Solutions Architect at AWS, where he helps customers modernize their data architecture and define their end-to-end data strategy, including data security, accessibility, governance, and more. He is also the author of the book Simplify Big Data Analytics with Amazon EMR. Outside of work, Sakti enjoys learning new technologies, watching movies, and visiting places with family.

Create an end-to-end data strategy for Customer 360 on AWS

Post Syndicated from Ismail Makhlouf original https://aws.amazon.com/blogs/big-data/create-an-end-to-end-data-strategy-for-customer-360-on-aws/

Customer 360 (C360) provides a complete and unified view of a customer’s interactions and behavior across all touchpoints and channels. This view is used to identify patterns and trends in customer behavior, which can inform data-driven decisions to improve business outcomes. For example, you can use C360 to segment and create marketing campaigns that are more likely to resonate with specific groups of customers.

In 2022, AWS commissioned a study conducted by the American Productivity and Quality Center (APQC) to quantify the Business Value of Customer 360. The following figure shows some of the metrics derived from the study. Organizations using C360 achieved 43.9% reduction in sales cycle duration, 22.8% increase in customer lifetime value, 25.3% faster time to market, and 19.1% improvement in net promoter score (NPS) rating.

Without C360, businesses face missed opportunities, inaccurate reports, and disjointed customer experiences, leading to customer churn. However, building a C360 solution can be complicated. A Gartner Marketing survey found only 14% of organizations have successfully implemented a C360 solution, due to lack of consensus on what a 360-degree view means, challenges with data quality, and lack of cross-functional governance structure for customer data.

In this post, we discuss how you can use purpose-built AWS services to create an end-to-end data strategy for C360 to unify and govern customer data that address these challenges. We structure it in five pillars that power C360: data collection, unification, analytics, activation, and data governance, along with a solution architecture that you can use for your implementation.

The five pillars of a mature C360

When you embark on creating a C360, you work with multiple use cases, types of customer data, and users and applications that require different tools. Building a C360 on the right datasets, adding new datasets over time while maintaining the quality of data, and keeping it secure needs an end-to-end data strategy for your customer data. You also need to provide tools that make it straightforward for your teams to build products that mature your C360.

We recommend building your data strategy around five pillars of C360, as shown in the following figure. This starts with basic data collection, unifying and linking data from various channels related to unique customers, and progresses towards basic to advanced analytics for decision-making, and personalized engagement through various channels. As you mature in each of these pillars, you progress towards responding to real-time customer signals.

The following diagram illustrates the functional architecture that combines the building blocks of a Customer Data Platform on AWS with additional components used to design an end-to-end C360 solution. This is aligned to the five pillars we discuss in this post.

Pillar 1: Data collection

As you start building your customer data platform, you have to collect data from various systems and touchpoints, such as your sales systems, customer support, web and social media, and data marketplaces. Think of the data collection pillar as a combination of ingestion, storage, and processing capabilities.

Data ingestion

You have to build ingestion pipelines based on factors like types of data sources (on-premises data stores, files, SaaS applications, third-party data), and flow of data (unbounded streams or batch data). AWS provides different services for building data ingestion pipelines:

  • AWS Glue is a serverless data integration service that ingests data in batches from on-premises databases and data stores in the cloud. It connects to more than 70 data sources and helps you build extract, transform, and load (ETL) pipelines without having to manage pipeline infrastructure. AWS Glue Data Quality checks for and alerts on poor data, making it straightforward to spot and fix issues before they harm your business.
  • Amazon AppFlow ingests data from software as a service (SaaS) applications like Google Analytics, Salesforce, SAP, and Marketo, giving you the flexibility to ingest data from more than 50 SaaS applications.
  • AWS Data Exchange makes it straightforward to find, subscribe to, and use third-party data for analytics. You can subscribe to data products that help enrich customer profiles, for example demographics data, advertising data, and financial markets data.
  • Amazon Kinesis ingests streaming events in real time from point-of-sales systems, clickstream data from mobile apps and websites, and social media data. You could also consider using Amazon Managed Streaming for Apache Kafka (Amazon MSK) for streaming events in real time.

The following diagram illustrates the different pipelines to ingest data from various source systems using AWS services.

Data storage

Structured, semi-structured, or unstructured batch data is stored in an object storage because these are cost-efficient and durable. Amazon Simple Storage Service (Amazon S3) is a managed storage service with archiving features that can store petabytes of data with eleven 9’s of durability. Streaming data with low latency needs is stored in Amazon Kinesis Data Streams for real-time consumption. This allows immediate analytics and actions for various downstream consumers—as seen with Riot Games’ central Riot Event Bus.

Data processing

Raw data is often cluttered with duplicates and irregular formats. You need to process this to make it ready for analysis. If you are consuming batch data and streaming data, consider using a framework that can handle both. A pattern such as the Kappa architecture views everything as a stream, simplifying the processing pipelines. Consider using Amazon Managed Service for Apache Flink to handle the processing work. With Managed Service for Apache Flink, you can clean and transform the streaming data and direct it to the appropriate destination based on latency requirements. You can also implement batch data processing using Amazon EMR on open source frameworks such as Apache Spark at 3.5 times better performance than the self-managed version. The architecture decision of using a batch or streaming processing system will depend on various factors; however, if you want to enable real-time analytics on your customer data, we recommend using a Kappa architecture pattern.

Pillar 2: Unification

To link the diverse data arriving from various touchpoints to a unique customer, you need to build an identity processing solution that identifies anonymous logins, stores useful customer information, links them to external data for better insights, and groups customers in domains of interest. Although the identity processing solution helps build the unified customer profile, we recommend considering this as part of your data processing capabilities. The following diagram illustrates the components of such a solution.

The key components are as follows:

  • Identity resolution – Identity resolution is a deduplication solution, where records are matched to identify a unique customer and prospects by linking multiple identifiers such as cookies, device identifiers, IP addresses, email IDs, and internal enterprise IDs to a known person or anonymous profile using privacy-compliant methods. This can be achieved using AWS Entity Resolution, which enables using rules and machine learning (ML) techniques to match records and resolve identities. Alternatively, you can build identity graphs using Amazon Neptune for a single unified view of your customers.
  • Profile aggregation – When you’ve uniquely identified a customer, you can build applications in Managed Service for Apache Flink to consolidate all their metadata, from name to interaction history. Then, you transform this data into a concise format. Instead of showing every transaction detail, you can offer an aggregated spend value and a link to their Customer Relationship Management (CRM) record. For customer service interactions, provide an average CSAT score and a link to the call center system for a deeper dive into their communication history.
  • Profile enrichment – After you resolve a customer to a single identity, enhance their profile using various data sources. Enrichment typically involves adding demographic, behavioral, and geolocation data. You can use third-party data products from AWS Marketplace delivered through AWS Data Exchange to gain insights on income, consumption patterns, credit risk scores, and many more dimensions to further refine the customer experience.
  • Customer segmentation – After uniquely identifying and enriching a customer’s profile, you can segment them based on demographics like age, spend, income, and location using applications in Managed Service for Apache Flink. As you advance, you can incorporate AI services for more precise targeting techniques.

After you have done the identity processing and segmentation, you need a storage capability to store the unique customer profile and provide search and query capabilities on top of it for downstream consumers to use the enriched customer data.

The following diagram illustrates the unification pillar for a unified customer profile and single view of the customer for downstream applications.

Unified customer profile

Graph databases excel in modeling customer interactions and relationships, offering a comprehensive view of the customer journey. If you are dealing with billions of profiles and interactions, you can consider using Neptune, a managed graph database service on AWS. Organizations such as Zeta and Activision have successfully used Neptune to store and query billions of unique identifiers per month and millions of queries per second at millisecond response time.

Single customer view

Although graph databases provide in-depth insights, yet they can be complex for regular applications. It is prudent to consolidate this data into a single customer view, serving as a primary reference for downstream applications, ranging from ecommerce platforms to CRM systems. This consolidated view acts as a liaison between the data platform and customer-centric applications. For such purposes, we recommend using Amazon DynamoDB for its adaptability, scalability, and performance, resulting in an up-to-date and efficient customer database. This database will accept a lot of write queries back from the activation systems that learn new information about the customers and feed them back.

Pillar 3: Analytics

The analytics pillar defines capabilities that help you generate insights on top of your customer data. Your analytics strategy applies to the wider organizational needs, not just C360. You can use the same capabilities to serve financial reporting, measure operational performance, or even monetize data assets. Strategize based on how your teams explore data, run analyses, wrangle data for downstream requirements, and visualize data at different levels. Plan on how you can enable your teams to use ML to move from descriptive to prescriptive analytics.

The AWS modern data architecture shows a way to build a purpose-built, secure, and scalable data platform in the cloud. Learn from this to build querying capabilities across your data lake and the data warehouse.

The following diagram breaks down the analytics capability into data exploration, visualization, data warehousing, and data collaboration. Let’s find out what role each of these components play in the context of C360.

Data exploration

Data exploration helps unearth inconsistencies, outliers, or errors. By spotting these early on, your teams can have cleaner data integration for C360, which in turn leads to more accurate analytics and predictions. Consider the personas exploring the data, their technical skills, and the time to insight. For instance, data analysts who know to write SQL can directly query the data residing in Amazon S3 using Amazon Athena. Users interested in visual exploration can do so using AWS Glue DataBrew. Data scientists or engineers can use Amazon EMR Studio or Amazon SageMaker Studio to explore data from the notebook, and for a low-code experience, you can use Amazon SageMaker Data Wrangler. Because these services directly query S3 buckets, you can explore the data as it lands in the data lake, reducing time to insight.

Visualization

Turning complex datasets into intuitive visuals unravels hidden patterns in the data, and is crucial for C360 use cases. With this capability, you can design reports for different levels catering to varying needs: executive reports offering strategic overviews, management reports highlighting operational metrics, and detailed reports diving into the specifics. Such visual clarity helps your organization make informed decisions across all tiers, centralizing the customer’s perspective.

The following diagram shows a sample C360 dashboard built on Amazon QuickSight. QuickSight offers scalable, serverless visualization capabilities. You can benefit from its ML integrations for automated insights like forecasting and anomaly detection or natural language querying with Amazon Q in QuickSight, direct data connectivity from various sources, and pay-per-session pricing. With QuickSight, you can embed dashboards to external websites and applications, and the SPICE engine enables rapid, interactive data visualization at scale. The following screenshot shows an example C360 dashboard built on QuickSight.

Data warehouse

Data warehouses are efficient in consolidating structured data from multifarious sources and serving analytics queries from a large number of concurrent users. Data warehouses can provide a unified, consistent view of a vast amount of customer data for C360 use cases. Amazon Redshift addresses this need by adeptly handling large volumes of data and diverse workloads. It provides strong consistency across datasets, allowing organizations to derive reliable, comprehensive insights about their customers, which is essential for informed decision-making. Amazon Redshift offers real-time insights and predictive analytics capabilities for analyzing data from terabytes to petabytes. With Amazon Redshift ML, you can embed ML on top of the data stored in the data warehouse with minimum development overhead. Amazon Redshift Serverless simplifies application building and makes it straightforward for companies to embed rich data analytics capabilities.

Data collaboration

You can securely collaborate and analyze collective datasets from your partners without sharing or copying one another’s underlying data using AWS Clean Rooms. You can bring together disparate data from across engagement channels and partner datasets to form a 360-degree view of your customers. AWS Clean Rooms can enhance C360 by enabling use cases like cross-channel marketing optimization, advanced customer segmentation, and privacy-compliant personalization. By safely merging datasets, it offers richer insights and robust data privacy, meeting business needs and regulatory standards.

Pillar 4: Activation

The value of data diminishes the older it gets, leading to higher opportunity costs over time. In a survey conducted by Intersystems, 75% of the organizations surveyed believe untimely data inhibited business opportunities. In another survey, 58% of organizations (out of 560 respondents of HBR Advisory council and readers) stated they saw an increase in customer retention and loyalty using real-time customer analytics.

You can achieve a maturity in C360 when you build the ability to act on all the insights acquired from the previous pillars we discussed in real time. For example, at this maturity level, you can act on customer sentiment based on the context you automatically derived with an enriched customer profile and integrated channels. For this you need to implement prescriptive decision-making on how to address the customer’s sentiment. To do this at scale, you have to use AI/ML services for decision-making. The following diagram illustrates the architecture to activate insights using ML for prescriptive analytics and AI services for targeting and segmentation.

Use ML for the decision-making engine

With ML, you can improve the overall customer experience—you can create predictive customer behavior models, design hyper-personalized offers, and target the right customer with the right incentive. You can build them using Amazon SageMaker, which features a suite of managed services mapped to the data science lifecycle, including data wrangling, model training, model hosting, model inference, model drift detection, and feature storage. SageMaker enables you to build and operationalize your ML models, infusing them back into your applications to produce the right insight to the right person at the right time.

Amazon Personalize supports contextual recommendations, through which you can improve the relevance of recommendations by generating them within a context—for instance, device type, location, or time of day. Your team can get started without any prior ML experience using APIs to build sophisticated personalization capabilities in a few clicks. For more information, see Customize your recommendations by promoting specific items using business rules with Amazon Personalize.

Activate channels across marketing, advertising, direct-to-consumer, and loyalty

Now that you know who your customers are and who to reach out to, you can build solutions to run targeting campaigns at scale. With Amazon Pinpoint, you can personalize and segment communications to engage customers across multiple channels. For example, you can use Amazon Pinpoint to build engaging customer experiences through various communication channels like email, SMS, push notifications, and in-app notifications.

Pillar 5: Data governance

Establishing the right governance that balances control and access gives users trust and confidence in data. Imagine offering promotions on products that a customer doesn’t need, or bombarding the wrong customers with notifications. Poor data quality can lead to such situations, and ultimately results in customer churn. You have to build processes that validate data quality and take corrective actions. AWS Glue Data Quality can help you build solutions that validate the quality of data at rest and in transit, based on predefined rules.

To set up a cross-functional governance structure for customer data, you need a capability for governing and sharing data across your organization. With Amazon DataZone, admins and data stewards can manage and govern access to data, and consumers such as data engineers, data scientists, product managers, analysts, and other business users can discover, use, and collaborate with that data to drive insights. It streamlines data access, letting you find and use customer data, promotes team collaboration with shared data assets, and provides personalized analytics either via a web app or API on a portal. AWS Lake Formation makes sure data is accessed securely, guaranteeing the right people see the right data for the right reasons, which is crucial for effective cross-functional governance in any organization. Business metadata is stored and managed by Amazon DataZone, which is underpinned by technical metadata and schema information, which is registered in the AWS Glue Data Catalog. This technical metadata is also used both by other governance services such as Lake Formation and Amazon DataZone, and analytics services such as Amazon Redshift, Athena, and AWS Glue.

Bringing it all together

Using the following diagram as a reference, you can create projects and teams for building and operating different capabilities. For example, you can have a data integration team focus on the data collection pillar—you can then align functional roles, like data architects and data engineers. You can build your analytics and data science practices to focus on the analytics and activation pillars, respectively. Then you can create a specialized team for customer identity processing and for building the unified view of the customer. You can establish a data governance team with data stewards from different functions, security admins, and data governance policymakers to design and automate policies.

Conclusion

Building a robust C360 capability is fundamental for your organization to gain insights into your customer base. AWS Databases, Analytics, and AI/ML services can help streamline this process, providing scalability and efficiency. Following the five pillars to guide your thinking, you can build an end-to-end data strategy that defines the C360 view across the organization, makes sure data is accurate, and establishes cross-functional governance for customer data. You can categorize and prioritize the products and features you have to build within each pillar, select the right tool for the job, and build the skills you need in your teams.

Visit AWS for Data Customer Stories to learn how AWS is transforming customer journeys, from the world’s largest enterprises to growing startups.


About the Authors

Ismail Makhlouf is a Senior Specialist Solutions Architect for Data Analytics at AWS. Ismail focuses on architecting solutions for organizations across their end-to-end data analytics estate, including batch and real-time streaming, big data, data warehousing, and data lake workloads. He primarily works with organizations in retail, ecommerce, FinTech, HealthTech, and travel to achieve their business objectives with well architected data platforms.

Sandipan Bhaumik (Sandi) is a Senior Analytics Specialist Solutions Architect at AWS. He helps customers modernize their data platforms in the cloud to perform analytics securely at scale, reduce operational overhead, and optimize usage for cost-effectiveness and sustainability.

Simplify document search at scale with intelligent search bot on AWS

Post Syndicated from Rostislav Markov original https://aws.amazon.com/blogs/architecture/simplify-document-search-at-scale-with-intelligent-search-bot-on-aws/

Enterprise document management systems (EDMS) manage the lifecycle and distribution of documents. They often rely on keyword-based search functionality. However, it increasingly becomes hard to discover documents as such repositories grow to tens of thousands of items.

In this blog, we discuss how Amazon Web Services (AWS) built an intelligent search bot on top of the document repository of a global life sciences company. Before this enhancement, the native repository search function relied solely on keywords and document names, leading to a trial-and-error process. Now, scientists can effortlessly query the repository using natural language to locate the right document in a few seconds—even through voice commands while working with lab equipment.

Use case

In life sciences, documentation is critical for regulatory compliance with GxP. Scientists in life sciences use EDMS on a daily basis to retrieve standard operating procedures (SOPs). SOPs contain task-level instructions (for example, how to monitor lab equipment or use utilities such as steam generators and chilled water circulation pumps).

EDMS search capability is often limited to file names and metadata. In our use case, file names were numerical and metadata was typically a single-sentence description plus keywords.

Scientists wanted to query for a particular context and type of task they’re about to perform. However, this data was not extracted from document content and thus not readily available for search purposes. Scientists also wanted to be able to search by using a voice interface (for example, when they operate on lab equipment).

To address this, we designed a conversational bot interface. This bot locates the most relevant SOP based on pre-generated document extracts and returns a hyperlink to the most suitable document, as shown in Figure 1.

Example of document search prompt and chatbot response

Figure 1. Example of document search prompt and chatbot response

Overview of the intelligent search bot solution

Our solution requirements for intelligent search were:

  • Semantic search index and ranking based on full text
  • Search capability through voice and text
  • Out-of-the-box integration with web applications and mobile devices

We choose Amazon Lex to provide the conversational interface using text or speech. Lex bots can be integrated with web and mobile applications using AWS Amplify Interactions. We used Amazon Kendra to create an intelligent search index on top of the data repository, which we hosted on Amazon Simple Storage Service (Amazon S3).

The advantage of using an Amazon Kendra index is its out-of-the-box semantic search and ranking capability based on document content. We use AWS Lambda to take care of Amazon S3 path mappings, and document attribute formatting for replicated documents, in order to make them retrievable by Amazon Kendra.

Intelligent search bot for enterprise document management systems

Figure 2. Intelligent search bot for enterprise document management systems

Benefits of integrating intelligent search bot with your EDMS

The benefits of extending EDMS with intelligent search bot include:

  • Improved usability by adding text- and speech-based channels to match user situations (for example, scientists operating on lab equipment)
  • Native, out-of-the-box integration with third-party systems (for example, Adobe Experience Manager, Alfresco, HubSpot, Marketo, Salesforce)
  • Implementation timeboxed in a two-week agile sprint and requires no data science skills

Extensibility to large language models

The Amazon Kendra Retrieve API allows the extension to a document retrieval chain pattern using a large language model (LLM) from Amazon Bedrock or Amazon SageMaker Jumpstart. With this pattern, Amazon Kendra generated document summaries can be passed to the LLM for correlation, as shown in Figure 3. Consult the LangChain documentation to learn how to configure retrieval chains.

Extending the document search bot to large language model

Figure 3. Extending the document search bot to large language model

In our use case, the effort for such extension went beyond the incremental optimizations and limited migration timeframe. Also, preference for a chain pattern should be given to complex correlations across documents.

This wasn’t the case here, as documents were functionally disjunct (for example, by device type, geographical site, and process task). Therefore, document retrieval with the Amazon Kendra API was sufficient and we deferred the extra effort associated with custom-built LLM prompt layers.

Implementation considerations

We started the EDMS migration to AWS by replicating the data repository to Amazon S3 by using AWS DataSync. We stored every document and corresponding metadata files as separate Amazon S3 objects.

For the Amazon Kendra index mappings to initialize properly:

  • Metadata must be a JSON Amazon S3 object
  • Follow the name convention <document>.<extension>.metadata.json
  • Have reserved or common document attributes correctly formatted

The EDMS system did not adhere to this when generating metadata files so we offloaded the transformation to a Lambda function. The function fixed metadata attributes such as, changing version type (_version) from numeric to string and date (_created_at) from string to ISO8601. It also changed metadata names/Amazon S3 paths by enacting new objects (PutObject API) and deleting the original objects (DeleteObject API).

We configured Lambda invocation on Amazon S3 PutObject operations using Amazon S3 event notifications. We set the sync run schedule for the Amazon Kendra index to run on demand.

Alternatively, you can run it on a predefined schedule or as part of each Lambda invocation (using the update_index boto3 operation). Finally, we monitor for sync run fails associated with the Amazon Kendra index using Amazon CloudWatch.

Conclusion

This blog showed how you can enhance the keyword-based search of your EDMS. We embedded document search queries behind a chatbot to simplify user interaction.

You can build this solution quickly, with no machine learning skills, as part of your EDMS cloud migration. In more advanced use cases, including complete refactoring, consider extending this solution to use a large language model from Amazon Bedrock or Amazon SageMaker Jumpstart.

Related information

Driving Development Forward: How the PGA TOUR speeds up Development with the AWS CDK

Post Syndicated from Evgeny Karasik original https://aws.amazon.com/blogs/devops/driving-development-forward-how-the-pga-tour-speeds-up-development-with-the-aws-cdk/

This post is written by Jeff Kammerer, Senior Solutions Architect.

The PGA TOUR is the world’s premier membership organization for touring professional golfers, co-sanctioning tournaments on the PGA TOUR along with several other developmental, senior, and international tournament series.

The PGA TOUR is passionate about bringing its fans closer to the players, tournaments, and courses. They developed a new mobile app and the PGATOUR.com website to give fans immersive, enhanced, and personalized access to near-real-time leaderboards, shot-by-shot data, video highlights, sports news, statistics, and 3D shot tracking. It is critical for PGA TOUR, which operates in a highly competitive space, to keep up with fans’ demands and deliver engaging content. The maturing DevOps culture, partnered with accelerating the development process, was crucial to the PGA TOUR’s fan engagement transformation.

The PGA TOUR’s fans want near real-time and highly accurate data. To deliver and evolve engaging fan experiences, the PGA TOUR needed to empower their team of developers to quickly release new updates and features. However, the TOUR’s previous architecture required separate code bases for their website and mobile app in a monolithic technology stack. Each update required changes in both code bases, causing feature turnaround time of a minimum of two weeks. The cost and time required to deliver features fans wanted to see in both the app and website were not sustainable. As a result, the TOUR redesigned their mobile app and website using AWS native services and a microservice based architecture to alleviate these pain points.

Accelerating Development with Infrastructure as Code (IaC)

The TOUR’s cloud infrastructure team used AWS CloudFormation for several years to model, provision, and manage their cloud infrastructure. However, the app and web development team within the PGA Tour were not familiar with and did not want to use the JSON and YAML templates that CloudFormation requires, and preferred the coding languages that the AWS Cloud Development Kit (CDK) supports. The developers use TypeScript to develop the new mobile app and website using services like AWS AppSync, AWS Lambda, AWS Step Functions, and AWS Batch.  Additionally, the PGA TOUR wanted to simplify how they assigned the correct and minimal IAM permissions needed. As a result, the TOUR developers started using the CDK for IaC because it offered a natural extension to how they were already writing code.

The TOUR leverages all three layers of the AWS CDK Construct Library. They take advantage of higher-layer Pattern Constructs for key services like AWS Lambda and AWS Elastic Container Service (Amazon ECS). The CDK pattern constructs provide a reference architecture or design patterns intended to help complete common tasks. The pattern constructs for AWS Lambda, Amazon ECS, and existing patterns saved the TOUR hours and weeks of development time. They also use the lower-level Layer 2 and Layer 1 Constructs for services like Amazon DynamoDB and AWS AppSync.

PGA TOUR’s New Mobile App

Figure1. Welcome to the PGA TOUR’s New App

PGA TOUR Benefits From Using AWS CDK

Using AWS CDK enabled and empowered the platform and development teams and changed how the PGA TOUR operates their technical environments. They create and de-provision environments as needed to build, test, and deploy new features into production. Automating changes in their underlying infrastructure has become very easy for the PGA TOUR. As an example, the TOUR wanted to update their Lambda runtimes to release 18. With AWS CDK, this change was implemented with a single-line change in their Lambda Common stack and pushed to the over 300 Lambda functions they deployed.

The CDK provides flexibility and agility, which helps the TOUR manage constant change in the appearance of their mobile app and website content given they run different tournaments each week. The TOUR uses the CDK to provision parallel environments where they prepare for the next tournament with unique functions and content without risking impact to services during the current tournament. Once the current tournament is complete, they can flip to the new stack and de-provision the old. The CDK has allowed the TOUR to move from a bi-weekly three-hour maintenance window release schedule to multiple as needed releases per day that take approximately 7 minutes. It has enabled the TOUR to push production releases and fixes, even in the middle of tournament play which previously had been deemed too risky under the prior monolithic technology stack. In one case, the TOUR developers could go from identifying a bug to coding a fix with push through User Acceptance Testing (UAT) and into production in 42 minutes. This is a process that was previously measured in hours or days.

High level AWS CDK/App Architecture

Figure2. High level AWS CDK/App Architecture

Expressing the organizational capability change AWS CDK facilitates for the PGA TOUR Digital team in context of the widely accepted DevOps Research & Assessment (DORA) metrics which assesses organizational maturity in DevOps:

DORA Metrics

One of the best benefits the TOUR realized using AWS CDK was how much it helped reduce complexity of managing AWS Identity and Access Management (IAM) permissions. The TOUR understands how important it is to maintain granular control of IAM trust policies, especially when working in a serverless architecture. David Provan, shared “AWS CDK encourages security by design and you end up considering security through the entire project rather than coming back to do security hardening after development”. AWS CDK automates the necessary IAM permissions at an atomic level in a manner where they are set and managed correctly. When the PGA TOUR takes resources down, AWS CDK removes the IAM permissions.

Lessons Learned and Looking Forward

The steepest learning curve for the PGA TOUR was in the granularity of their CDK Stacks. They initially started with a single large stack, but found that breaking the application into smaller stacks allowed them to be more surgical with granular deployments and updates. They found some services like AWS Lambda update very quickly, whereas DynamoDB deployed with global tables across multiple regions takes longer and benefit from being in their own stack. This balance is something the TOUR is still working on as they iterate after the initial launch.

Looking forward, the PGA TOUR sees longer-range benefits where the CDK will allow them to reuse their stacks and accelerate development for other departments or entities in the future. They also see benefit for reusing code and patterns across different workloads entirely.

Conclusion

The AWS Cloud Development Kit has been transformational to how the PGA TOUR is deploying their services on AWS and working to bring exciting and immersive experiences to fans. To learn more, review the AWS CDK Developer Guide to read about best practices for developing cloud applications, and review this blog that provides an overview of Working with the AWS Cloud Development kit and AWS Construct Library. Also, explore what CDK can do for you.

How VMware Tanzu CloudHealth migrated from self-managed Kafka to Amazon MSK

Post Syndicated from Rivlin Pereira original https://aws.amazon.com/blogs/big-data/how-vmware-tanzu-cloudhealth-migrated-from-self-managed-kafka-to-amazon-msk/

This is a post co-written with Rivlin Pereira & Vaibhav Pandey from Tanzu CloudHealth (VMware by Broadcom).

VMware Tanzu CloudHealth is the cloud cost management platform of choice for more than 20,000 organizations worldwide, who rely on it to optimize and govern their largest and most complex multi-cloud environments. In this post, we discuss how the VMware Tanzu CloudHealth DevOps team migrated their self-managed Apache Kafka workloads (running version 2.0) to Amazon Managed Streaming for Apache Kafka (Amazon MSK) running version 2.6.2. We discuss the system architectures, deployment pipelines, topic creation, observability, access control, topic migration, and all the issues we faced with the existing infrastructure, along with how and why we migrated to the new Kafka setup and some lessons learned.

Kafka cluster overview

In the fast-evolving landscape of distributed systems, VMware Tanzu CloudHealth’s next-generation microservices platform relies on Kafka as its messaging backbone. For us, Kafka’s high-performance distributed log system excels in handling massive data streams, making it indispensable for seamless communication. Serving as a distributed log system, Kafka efficiently captures and stores diverse logs, from HTTP server access logs to security event audit logs.

Kafka’s versatility shines in supporting key messaging patterns, treating messages as basic logs or structured key-value stores. Dynamic partitioning and consistent ordering ensure efficient message organization. The unwavering reliability of Kafka aligns with our commitment to data integrity.

The integration of Ruby services with Kafka is streamlined through the Karafka library, acting as a higher-level wrapper. Our other language stack services use similar wrappers. Kafka’s robust debugging features and administrative commands play a pivotal role in ensuring smooth operations and infrastructure health.

Kafka as an architectural pillar

In VMware Tanzu CloudHealth’s next-generation microservices platform, Kafka emerges as a critical architectural pillar. Its ability to handle high data rates, support diverse messaging patterns, and guarantee message delivery aligns seamlessly with our operational needs. As we continue to innovate and scale, Kafka remains a steadfast companion, enabling us to build a resilient and efficient infrastructure.

Why we migrated to Amazon MSK

For us, migrating to Amazon MSK came down to three key decision points:

  • Simplified technical operations – Running Kafka on a self-managed infrastructure was an operational overhead for us. We hadn’t updated Kafka version 2.0.0 for a while, and Kafka brokers were going down in production, causing issues with topics going offline. We also had to run scripts manually for increasing replication factors and rebalancing leaders, which was additional manual effort.
  • Deprecated legacy pipelines and simplified permissions – We were looking to move away from our existing pipelines written in Ansible to create Kafka topics on the cluster. We also had a cumbersome process of giving team members access to Kafka machines in staging and production, and we wanted to simplify this.
  • Cost, patching, and support – Because Apache Zookeeper is completely managed and patched by AWS, moving to Amazon MSK was going to save us time and money. In addition, we discovered that running Amazon MSK with the same type of brokers on Amazon Elastic Compute Cloud (Amazon EC2) was cheaper to run on Amazon MSK. Combined with the fact that we get security patches applied on brokers by AWS, migrating to Amazon MSK was an easy decision. This also meant that the team was freed up to work on other important things. Finally, getting enterprise support from AWS was also critical in our final decision to move to a managed solution.

How we migrated to Amazon MSK

With the key drivers identified, we moved ahead with a proposed design to migrate existing self-managed Kafka to Amazon MSK. We conducted the following pre-migration steps before the actual implementation:

  • Assessment:
    • Conducted a meticulous assessment of the existing EC2 Kafka cluster, understanding its configurations and dependencies
    • Verified Kafka version compatibility with Amazon MSK
  • Amazon MSK setup with Terraform
  • Network configuration:
    • Ensured seamless network connectivity between the EC2 Kafka and MSK clusters, fine-tuning security groups and firewall settings

After the pre-migration steps, we implemented the following for the new design:

  • Automated deployment, upgrade, and topic creation pipelines for MSK clusters:
    • In the new setup, we wanted to have automated deployments and upgrades of the MSK clusters in a repeatable fashion using an IaC tool. Therefore, we created custom Terraform modules for MSK cluster deployments as well as upgrades. These modules where called from a Jenkins pipeline for automated deployments and upgrades of the MSK clusters. For Kafka topic creation, we were using an Ansible-based home-grown pipeline, which wasn’t stable and led to a lot of complaints from dev teams. As a result, we evaluated options for deployments to Kubernetes clusters and used the Strimzi Topic Operator to create topics on MSK clusters. Topic creation was automated using Jenkins pipelines, which dev teams could self-service.
  • Better observability for clusters:
    • The old Kafka clusters didn’t have good observability. We only had alerts on Kafka broker disk size. With Amazon MSK, we took advantage of open monitoring using Prometheus. We stood up a standalone Prometheus server that scraped metrics from MSK clusters and sent them to our internal observability tool. As a result of improved observability, we were able to set up robust alerting for Amazon MSK, which wasn’t possible with our old setup.
  • Improved COGS and better compute infrastructure:
    • For our old Kafka infrastructure, we had to pay for managing Kafka, Zookeeper instances, plus any additional broker storage costs and data transfer costs. With the move to Amazon MSK, because Zookeeper is completely managed by AWS, we only have to pay for Kafka nodes, broker storage, and data transfer costs. As a result, in final Amazon MSK setup for production, we saved not only on infrastructure costs but also operational costs.
  • Simplified operations and enhanced security:
    • With the move to Amazon MSK, we didn’t have to manage any Zookeeper instances. Broker security patching was also taken care by AWS for us.
    • Cluster upgrades became simpler with the move to Amazon MSK; it’s a straightforward process to initiate from the Amazon MSK console.
    • With Amazon MSK, we got broker automatic scaling out of the box. As a result, we didn’t have to worry about brokers running out of disk space, thereby leading to additional stability of the MSK cluster.
    • We also got additional security for the cluster because Amazon MSK supports encryption at rest by default, and various options for encryption in transit are also available. For more information, refer to Data protection in Amazon Managed Streaming for Apache Kafka.

During our pre-migration steps, we validated the setup on the staging environment before moving ahead with production.

Kafka topic migration strategy

With the MSK cluster setup complete, we performed a data migration of Kafka topics from the old cluster running on Amazon EC2 to the new MSK cluster. To achieve this, we performed the following steps:

  • Set up MirrorMaker with Terraform – We used Terraform to orchestrate the deployment of a MirrorMaker cluster consisting of 15 nodes. This demonstrated the scalability and flexibility by adjusting the number of nodes based on the migration’s concurrent replication needs.
  • Implement a concurrent replication strategy – We implemented a concurrent replication strategy with 15 MirrorMaker nodes to expedite the migration process. Our Terraform-driven approach contributed to cost optimization by efficiently managing resources during the migration and ensured the reliability and consistency of the MSK and MirrorMaker clusters. It also showcased how the chosen setup accelerates data transfer, optimizing both time and resources.
  • Migrate data – We successfully migrated 2 TB of data in a remarkably short timeframe, minimizing downtime and showcasing the efficiency of the concurrent replication strategy.
  • Set up post-migration monitoring – We implemented robust monitoring and alerting during the migration, contributing to a smooth process by identifying and addressing issues promptly.

The following diagram illustrates the architecture after the topic migration was complete.
Mirror-maker setup

Challenges and lessons learned

Embarking on a migration journey, especially with large datasets, is often accompanied by unforeseen challenges. In this section, we delve into the challenges encountered during the migration of topics from EC2 Kafka to Amazon MSK using MirrorMaker, and share valuable insights and solutions that shaped the success of our migration.

Challenge 1: Offset discrepancies

One of the challenges we encountered was the mismatch in topic offsets between the source and destination clusters, even with offset synchronization enabled in MirrorMaker. The lesson learned here was that offset values don’t necessarily need to be identical, as long as offset sync is enabled, which makes sure the topics have the correct position to read the data from.

We addressed this problem by using a custom tool to run tests on consumer groups, confirming that the translated offsets were either smaller or caught up, indicating synchronization as per MirrorMaker.

Challenge 2: Slow data migration

The migration process faced a bottleneck—data transfer was slower than anticipated, especially with a substantial 2 TB dataset. Despite a 20-node MirrorMaker cluster, the speed was insufficient.

To overcome this, the team strategically grouped MirrorMaker nodes based on unique port numbers. Clusters of five MirrorMaker nodes, each with a distinct port, significantly boosted throughput, allowing us to migrate data within hours instead of days.

Challenge 3: Lack of detailed process documentation

Navigating the uncharted territory of migrating large datasets using MirrorMaker highlighted the absence of detailed documentation for such scenarios.

Through trial and error, the team crafted an IaC module using Terraform. This module streamlined the entire cluster creation process with optimized settings, enabling a seamless start to the migration within minutes.

Final setup and next steps

As a result of the move to Amazon MSK, our final setup after topic migration looked like the following diagram.
MSK Blog
We’re considering the following future improvements:

Conclusion.

In this post, we discussed how VMware Tanzu CloudHealth migrated their existing Amazon EC2-based Kafka infrastructure to Amazon MSK. We walked you through the new architecture, deployment and topic creation pipelines, improvements to observability and access control, topic migration challenges, and the issues we faced with the existing infrastructure, along with how and why we migrated to the new Amazon MSK setup. We also talked about all the advantages that Amazon MSK gave us, the final architecture we achieved with this migration, and lessons learned.

For us, the interplay of offset synchronization, strategic node grouping, and IaC proved pivotal in overcoming obstacles and ensuring a successful migration from Amazon EC2 Kafka to Amazon MSK. This post serves as a testament to the power of adaptability and innovation in migration challenges, offering insights for others navigating a similar path.

If you’re running self-managed Kafka on AWS, we encourage you to try the managed Kafka offering, Amazon MSK.


About the Authors

Rivlin Pereira is Staff DevOps Engineer at VMware Tanzu Division. He is very passionate about Kubernetes and works on CloudHealth Platform building and operating cloud solutions that are scalable, reliable and cost effective.

Vaibhav Pandey, a Staff Software Engineer at Broadcom, is a key contributor to the development of cloud computing solutions. Specializing in architecting and engineering data storage layers, he is passionate about building and scaling SaaS applications for optimal performance.

Raj Ramasubbu is a Senior Analytics Specialist Solutions Architect focused on big data and analytics and AI/ML with Amazon Web Services. He helps customers architect and build highly scalable, performant, and secure cloud-based solutions on AWS. Raj provided technical expertise and leadership in building data engineering, big data analytics, business intelligence, and data science solutions for over 18 years prior to joining AWS. He helped customers in various industry verticals like healthcare, medical devices, life science, retail, asset management, car insurance, residential REIT, agriculture, title insurance, supply chain, document management, and real estate.

Todd McGrath is a data streaming specialist at Amazon Web Services where he advises customers on their streaming strategies, integration, architecture, and solutions. On the personal side, he enjoys watching and supporting his 3 teenagers in their preferred activities as well as following his own pursuits such as fishing, pickleball, ice hockey, and happy hour with friends and family on pontoon boats. Connect with him on LinkedIn.

Satya Pattanaik is a Sr. Solutions Architect at AWS. He has been helping ISVs build scalable and resilient applications on AWS Cloud. Prior joining AWS, he played significant role in Enterprise segments with their growth and success. Outside of work, he spends time learning “how to cook a flavorful BBQ” and trying out new recipes.

How the GoDaddy data platform achieved over 60% cost reduction and 50% performance boost by adopting Amazon EMR Serverless

Post Syndicated from Brandon Abear original https://aws.amazon.com/blogs/big-data/how-the-godaddy-data-platform-achieved-over-60-cost-reduction-and-50-performance-boost-by-adopting-amazon-emr-serverless/

This is a guest post co-written with Brandon Abear, Dinesh Sharma, John Bush, and Ozcan IIikhan from GoDaddy.

GoDaddy empowers everyday entrepreneurs by providing all the help and tools to succeed online. With more than 20 million customers worldwide, GoDaddy is the place people come to name their ideas, build a professional website, attract customers, and manage their work.

At GoDaddy, we take pride in being a data-driven company. Our relentless pursuit of valuable insights from data fuels our business decisions and ensures customer satisfaction. Our commitment to efficiency is unwavering, and we’ve undertaken an exciting initiative to optimize our batch processing jobs. In this journey, we have identified a structured approach that we refer to as the seven layers of improvement opportunities. This methodology has become our guide in the pursuit of efficiency.

In this post, we discuss how we enhanced operational efficiency with Amazon EMR Serverless. We share our benchmarking results and methodology, and insights into the cost-effectiveness of EMR Serverless vs. fixed capacity Amazon EMR on EC2 transient clusters on our data workflows orchestrated using Amazon Managed Workflows for Apache Airflow (Amazon MWAA). We share our strategy for the adoption of EMR Serverless in areas where it excels. Our findings reveal significant benefits, including over 60% cost reduction, 50% faster Spark workloads, a remarkable five-times improvement in development and testing speed, and a significant reduction in our carbon footprint.

Background

In late 2020, GoDaddy’s data platform initiated its AWS Cloud journey, migrating an 800-node Hadoop cluster with 2.5 PB of data from its data center to EMR on EC2. This lift-and-shift approach facilitated a direct comparison between on-premises and cloud environments, ensuring a smooth transition to AWS pipelines, minimizing data validation issues and migration delays.

By early 2022, we successfully migrated our big data workloads to EMR on EC2. Using best practices learned from the AWS FinHack program, we fine-tuned resource-intensive jobs, converted Pig and Hive jobs to Spark, and reduced our batch workload spend by 22.75% in 2022. However, scalability challenges emerged due to the multitude of jobs. This prompted GoDaddy to embark on a systematic optimization journey, establishing a foundation for more sustainable and efficient big data processing.

Seven layers of improvement opportunities

In our quest for operational efficiency, we have identified seven distinct layers of opportunities for optimization within our batch processing jobs, as shown in the following figure. These layers range from precise code-level enhancements to more comprehensive platform improvements. This multi-layered approach has become our strategic blueprint in the ongoing pursuit of better performance and higher efficiency.

Seven layers of improvement opportunities

The layers are as follows:

  • Code optimization – Focuses on refining the code logic and how it can be optimized for better performance. This involves performance enhancements through selective caching, partition and projection pruning, join optimizations, and other job-specific tuning. Using AI coding solutions is also an integral part of this process.
  • Software updates – Updating to the latest versions of open source software (OSS) to capitalize on new features and improvements. For example, Adaptive Query Execution in Spark 3 brings significant performance and cost improvements.
  • Custom Spark configurations Tuning of custom Spark configurations to maximize resource utilization, memory, and parallelism. We can achieve significant improvements by right-sizing tasks, such as through spark.sql.shuffle.partitions, spark.sql.files.maxPartitionBytes, spark.executor.cores, and spark.executor.memory. However, these custom configurations might be counterproductive if they are not compatible with the specific Spark version.
  • Resource provisioning time The time it takes to launch resources like ephemeral EMR clusters on Amazon Elastic Compute Cloud (Amazon EC2). Although some factors influencing this time are outside of an engineer’s control, identifying and addressing the factors that can be optimized can help reduce overall provisioning time.
  • Fine-grained scaling at task level Dynamically adjusting resources such as CPU, memory, disk, and network bandwidth based on each stage’s needs within a task. The aim here is to avoid fixed cluster sizes that could result in resource waste.
  • Fine-grained scaling across multiple tasks in a workflow Given that each task has unique resource requirements, maintaining a fixed resource size may result in under- or over-provisioning for certain tasks within the same workflow. Traditionally, the size of the largest task determines the cluster size for a multi-task workflow. However, dynamically adjusting resources across multiple tasks and steps within a workflow result in a more cost-effective implementation.
  • Platform-level enhancements – Enhancements at preceding layers can only optimize a given job or a workflow. Platform improvement aims to attain efficiency at the company level. We can achieve this through various means, such as updating or upgrading the core infrastructure, introducing new frameworks, allocating appropriate resources for each job profile, balancing service usage, optimizing the use of Savings Plans and Spot Instances, or implementing other comprehensive changes to boost efficiency across all tasks and workflows.

Layers 1–3: Previous cost reductions

After we migrated from on premises to AWS Cloud, we primarily focused our cost-optimization efforts on the first three layers shown in the diagram. By transitioning our most costly legacy Pig and Hive pipelines to Spark and optimizing Spark configurations for Amazon EMR, we achieved significant cost savings.

For example, a legacy Pig job took 10 hours to complete and ranked among the top 10 most expensive EMR jobs. Upon reviewing TEZ logs and cluster metrics, we discovered that the cluster was vastly over-provisioned for the data volume being processed and remained under-utilized for most of the runtime. Transitioning from Pig to Spark was more efficient. Although no automated tools were available for the conversion, manual optimizations were made, including:

  • Reduced unnecessary disk writes, saving serialization and deserialization time (Layer 1)
  • Replaced Airflow task parallelization with Spark, simplifying the Airflow DAG (Layer 1)
  • Eliminated redundant Spark transformations (Layer 1)
  • Upgraded from Spark 2 to 3, using Adaptive Query Execution (Layer 2)
  • Addressed skewed joins and optimized smaller dimension tables (Layer 3)

As a result, job cost decreased by 95%, and job completion time was reduced to 1 hour. However, this approach was labor-intensive and not scalable for numerous jobs.

Layers 4–6: Find and adopt the right compute solution

In late 2022, following our significant accomplishments in optimization at the previous levels, our attention moved towards enhancing the remaining layers.

Understanding the state of our batch processing

We use Amazon MWAA to orchestrate our data workflows in the cloud at scale. Apache Airflow is an open source tool used to programmatically author, schedule, and monitor sequences of processes and tasks referred to as workflows. In this post, the terms workflow and job are used interchangeably, referring to the Directed Acyclic Graphs (DAGs) consisting of tasks orchestrated by Amazon MWAA. For each workflow, we have sequential or parallel tasks, and even a combination of both in the DAG between create_emr and terminate_emr tasks running on a transient EMR cluster with fixed compute capacity throughout the workflow run. Even after optimizing a portion of our workload, we still had numerous non-optimized workflows that were under-utilized due to over-provisioning of compute resources based on the most resource-intensive task in the workflow, as shown in the following figure.

This highlighted the impracticality of static resource allocation and led us to recognize the necessity of a dynamic resource allocation (DRA) system. Before proposing a solution, we gathered extensive data to thoroughly understand our batch processing. Analyzing the cluster step time, excluding provisioning and idle time, revealed significant insights: a right-skewed distribution with over half of the workflows completing in 20 minutes or less and only 10% taking more than 60 minutes. This distribution guided our choice of a fast-provisioning compute solution, dramatically reducing workflow runtimes. The following diagram illustrates step times (excluding provisioning and idle time) of EMR on EC2 transient clusters in one of our batch processing accounts.

Furthermore, based on the step time (excluding provisioning and idle time) distribution of the workflows, we categorized our workflows into three groups:

  • Quick run – Lasting 20 minutes or less
  • Medium run – Lasting between 20–60 minutes
  • Long run – Exceeding 60 minutes, often spanning several hours or more

Another factor we needed to consider was the extensive use of transient clusters for reasons such as security, job and cost isolation, and purpose-built clusters. Additionally, there was a significant variation in resource needs between peak hours and periods of low utilization.

Instead of fixed-size clusters, we could potentially use managed scaling on EMR on EC2 to achieve some cost benefits. However, migrating to EMR Serverless appears to be a more strategic direction for our data platform. In addition to potential cost benefits, EMR Serverless offers additional advantages such as a one-click upgrade to the newest Amazon EMR versions, a simplified operational and debugging experience, and automatic upgrades to the latest generations upon rollout. These features collectively simplify the process of operating a platform on a larger scale.

Evaluating EMR Serverless: A case study at GoDaddy

EMR Serverless is a serverless option in Amazon EMR that eliminates the complexities of configuring, managing, and scaling clusters when running big data frameworks like Apache Spark and Apache Hive. With EMR Serverless, businesses can enjoy numerous benefits, including cost-effectiveness, faster provisioning, simplified developer experience, and improved resilience to Availability Zone failures.

Recognizing the potential of EMR Serverless, we conducted an in-depth benchmark study using real production workflows. The study aimed to assess EMR Serverless performance and efficiency while also creating an adoption plan for large-scale implementation. The findings were highly encouraging, showing EMR Serverless can effectively handle our workloads.

Benchmarking methodology

We split our data workflows into three categories based on total step time (excluding provisioning and idle time): quick run (0–20 minutes), medium run (20–60 minutes), and long run (over 60 minutes). We analyzed the impact of the EMR deployment type (Amazon EC2 vs. EMR Serverless) on two key metrics: cost-efficiency and total runtime speedup, which served as our overall evaluation criteria. Although we did not formally measure ease of use and resiliency, these factors were considered throughout the evaluation process.

The high-level steps to assess the environment are as follows:

  1. Prepare the data and environment:
    1. Choose three to five random production jobs from each job category.
    2. Implement required adjustments to prevent interference with production.
  2. Run tests:
    1. Run scripts over several days or through multiple iterations to gather precise and consistent data points.
    2. Perform tests using EMR on EC2 and EMR Serverless.
  3. Validate data and test runs:
    1. Validate input and output datasets, partitions, and row counts to ensure identical data processing.
  4. Gather metrics and analyze results:
    1. Gather relevant metrics from the tests.
    2. Analyze results to draw insights and conclusions.

Benchmark results

Our benchmark results showed significant enhancements across all three job categories for both runtime speedup and cost-efficiency. The improvements were most pronounced for quick jobs, directly resulting from faster startup times. For instance, a 20-minute (including cluster provisioning and shut down) data workflow running on an EMR on EC2 transient cluster of fixed compute capacity finishes in 10 minutes on EMR Serverless, providing a shorter runtime with cost benefits. Overall, the shift to EMR Serverless delivered substantial performance improvements and cost reductions at scale across job brackets, as seen in the following figure.

Historically, we devoted more time to tuning our long-run workflows. Interestingly, we discovered that the existing custom Spark configurations for these jobs did not always translate well to EMR Serverless. In cases where the results were insignificant, a common approach was to discard previous Spark configurations related to executor cores. By allowing EMR Serverless to autonomously manage these Spark configurations, we often observed improved outcomes. The following graph shows the average runtime and cost improvement per job when comparing EMR Serverless to EMR on EC2.

Per Job Improvement

The following table shows a sample comparison of results for the same workflow running on different deployment options of Amazon EMR (EMR on EC2 and EMR Serverless).

Metric EMR on EC2
(Average)
EMR Serverless
(Average)
EMR on EC2 vs
EMR Serverless
Total Run Cost ($) $ 5.82 $ 2.60 55%
Total Run Time (Minutes) 53.40 39.40 26%
Provisioning Time (Minutes) 10.20 0.05 .
Provisioning Cost ($) $ 1.19 . .
Steps Time (Minutes) 38.20 39.16 -3%
Steps Cost ($) $ 4.30 . .
Idle Time (Minutes) 4.80 . .
EMR Release Label emr-6.9.0 .
Hadoop Distribution Amazon 3.3.3 .
Spark Version Spark 3.3.0 .
Hive/HCatalog Version Hive 3.1.3, HCatalog 3.1.3 .
Job Type Spark .

AWS Graviton2 on EMR Serverless performance evaluation

After seeing compelling results with EMR Serverless for our workloads, we decided to further analyze the performance of the AWS Graviton2 (arm64) architecture within EMR Serverless. AWS had benchmarked Spark workloads on Graviton2 EMR Serverless using the TPC-DS 3TB scale, showing a 27% overall price-performance improvement.

To better understand the integration benefits, we ran our own study using GoDaddy’s production workloads on a daily schedule and observed an impressive 23.8% price-performance enhancement across a range of jobs when using Graviton2. For more details about this study, see GoDaddy benchmarking results in up to 24% better price-performance for their Spark workloads with AWS Graviton2 on Amazon EMR Serverless.

Adoption strategy for EMR Serverless

We strategically implemented a phased rollout of EMR Serverless via deployment rings, enabling systematic integration. This gradual approach let us validate improvements and halt further adoption of EMR Serverless, if needed. It served both as a safety net to catch issues early and a means to refine our infrastructure. The process mitigated change impact through smooth operations while building team expertise of our Data Engineering and DevOps teams. Additionally, it fostered tight feedback loops, allowing prompt adjustments and ensuring efficient EMR Serverless integration.

We divided our workflows into three main adoption groups, as shown in the following image:

  • Canaries This group aids in detecting and resolving any potential problems early in the deployment stage.
  • Early adopters This is the second batch of workflows that adopt the new compute solution after initial issues have been identified and rectified by the canaries group.
  • Broad deployment rings The largest group of rings, this group represents the wide-scale deployment of the solution. These are deployed after successful testing and implementation in the previous two groups.

Rings

We further broke down these workflows into granular deployment rings to adopt EMR Serverless, as shown in the following table.

Ring # Name Details
Ring 0 Canary Low adoption risk jobs that are expected to yield some cost saving benefits.
Ring 1 Early Adopters Low risk Quick-run Spark jobs that expect to yield high gains.
Ring 2 Quick-run Rest of the Quick-run (step_time <= 20 min) Spark jobs
Ring 3 LargerJobs_EZ High potential gain, easy move, medium-run and long-run Spark jobs
Ring 4 LargerJobs Rest of the medium-run and long-run Spark jobs with potential gains
Ring 5 Hive Hive jobs with potentially higher cost savings
Ring 6 Redshift_EZ Easy migration Redshift jobs that suit EMR Serverless
Ring 7 Glue_EZ Easy migration Glue jobs that suit EMR Serverless

Production adoption results summary

The encouraging benchmarking and canary adoption results generated considerable interest in wider EMR Serverless adoption at GoDaddy. To date, the EMR Serverless rollout remains underway. Thus far, it has reduced costs by 62.5% and accelerated total batch workflow completion by 50.4%.

Based on preliminary benchmarks, our team expected substantial gains for quick jobs. To our surprise, actual production deployments surpassed projections, averaging 64.4% faster vs. 42% projected, and 71.8% cheaper vs. 40% predicted.

Remarkably, long-running jobs also saw significant performance improvements due to the rapid provisioning of EMR Serverless and aggressive scaling enabled by dynamic resource allocation. We observed substantial parallelization during high-resource segments, resulting in a 40.5% faster total runtime compared to traditional approaches. The following chart illustrates the average enhancements per job category.

Prod Jobs Savings

Additionally, we observed the highest degree of dispersion for speed improvements within the long-run job category, as shown in the following box-and-whisker plot.

Whisker Plot

Sample workflows adopted EMR Serverless

For a large workflow migrated to EMR Serverless, comparing 3-week averages pre- and post-migration revealed impressive cost savings—a 75.30% decrease based on retail pricing with 10% improvement in total runtime, boosting operational efficiency. The following graph illustrates the cost trend.

Although quick-run jobs realized minimal per-dollar cost reductions, they delivered the most significant percentage cost savings. With thousands of these workflows running daily, the accumulated savings are substantial. The following graph shows the cost trend for a small workload migrated from EMR on EC2 to EMR Serverless. Comparing 3-week pre- and post-migration averages revealed a remarkable 92.43% cost savings on the retail on-demand pricing, alongside an 80.6% acceleration in total runtime.

Sample workflows adopted EMR Serverless 2

Layer 7: Platform-wide improvements

We aim to revolutionize compute operations at GoDaddy, providing simplified yet powerful solutions for all users with our Intelligent Compute Platform. With AWS compute solutions like EMR Serverless and EMR on EC2, it provided optimized runs of data processing and machine learning (ML) workloads. An ML-powered job broker intelligently determines when and how to run jobs based on various parameters, while still allowing power users to customize. Additionally, an ML-powered compute resource manager pre-provisions resources based on load and historical data, providing efficient, fast provisioning at optimum cost. Intelligent compute empowers users with out-of-the-box optimization, catering to diverse personas without compromising power users.

The following diagram shows a high-level illustration of the intelligent compute architecture.

Insights and recommended best-practices

The following section discusses the insights we’ve gathered and the recommended best practices we’ve developed during our preliminary and wider adoption stages.

Infrastructure preparation

Although EMR Serverless is a deployment method within EMR, it requires some infrastructure preparedness to optimize its potential. Consider the following requirements and practical guidance on implementation:

  • Use large subnets across multiple Availability Zones – When running EMR Serverless workloads within your VPC, make sure the subnets span across multiple Availability Zones and are not constrained by IP addresses. Refer to Configuring VPC access and Best practices for subnet planning for details.
  • Modify maximum concurrent vCPU quota For extensive compute requirements, it is recommended to increase your max concurrent vCPUs per account service quota.
  • Amazon MWAA version compatibility When adopting EMR Serverless, GoDaddy’s decentralized Amazon MWAA ecosystem for data pipeline orchestration created compatibility issues from disparate AWS Providers versions. Directly upgrading Amazon MWAA was more efficient than updating numerous DAGs. We facilitated adoption by upgrading Amazon MWAA instances ourselves, documenting issues, and sharing findings and effort estimates for accurate upgrade planning.
  • GoDaddy EMR operator To streamline migrating numerous Airflow DAGs from EMR on EC2 to EMR Serverless, we developed custom operators adapting existing interfaces. This allowed seamless transitions while retaining familiar tuning options. Data engineers could easily migrate pipelines with simple find-replace imports and immediately use EMR Serverless.

Unexpected behavior mitigation

The following are unexpected behaviors we ran into and what we did to mitigate them:

  • Spark DRA aggressive scaling For some jobs (8.33% of initial benchmarks, 13.6% of production), cost increased after migrating to EMR Serverless. This was due to Spark DRA excessively assigning new workers briefly, prioritizing performance over cost. To counteract this, we set maximum executor thresholds by adjusting spark.dynamicAllocation.maxExecutor, effectively limiting EMR Serverless scaling aggression. When migrating from EMR on EC2, we suggest observing the max core count in the Spark History UI to replicate similar compute limits in EMR Serverless, such as --conf spark.executor.cores and --conf spark.dynamicAllocation.maxExecutors.
  • Managing disk space for large-scale jobs When transitioning jobs that process large data volumes with substantial shuffles and significant disk requirements to EMR Serverless, we recommend configuring spark.emr-serverless.executor.disk by referring to existing Spark job metrics. Furthermore, configurations like spark.executor.cores combined with spark.emr-serverless.executor.disk and spark.dynamicAllocation.maxExecutors allow control over the underlying worker size and total attached storage when advantageous. For example, a shuffle-heavy job with relatively low disk usage may benefit from using a larger worker to increase the likelihood of local shuffle fetches.

Conclusion

As discussed in this post, our experiences with adopting EMR Serverless on arm64 have been overwhelmingly positive. The impressive results we’ve achieved, including a 60% reduction in cost, 50% faster runs of batch Spark workloads, and an astounding five-times improvement in development and testing speed, speak volumes about the potential of this technology. Furthermore, our current results suggest that by widely adopting Graviton2 on EMR Serverless, we could potentially reduce the carbon footprint by up to 60% for our batch processing.

However, it’s crucial to understand that these results are not a one-size-fits-all scenario. The enhancements you can expect are subject to factors including, but not limited to, the specific nature of your workflows, cluster configurations, resource utilization levels, and fluctuations in computational capacity. Therefore, we strongly advocate for a data-driven, ring-based deployment strategy when considering the integration of EMR Serverless, which can help optimize its benefits to the fullest.

Special thanks to Mukul Sharma and Boris Berlin for their contributions to benchmarking. Many thanks to Travis Muhlestein (CDO), Abhijit Kundu (VP Eng), Vincent Yung (Sr. Director Eng.), and Wai Kin Lau (Sr. Director Data Eng.) for their continued support.


About the Authors

Brandon Abear is a Principal Data Engineer in the Data & Analytics (DnA) organization at GoDaddy. He enjoys all things big data. In his spare time, he enjoys traveling, watching movies, and playing rhythm games.

Dinesh Sharma is a Principal Data Engineer in the Data & Analytics (DnA) organization at GoDaddy. He is passionate about user experience and developer productivity, always looking for ways to optimize engineering processes and saving cost. In his spare time, he loves reading and is an avid manga fan.

John Bush is a Principal Software Engineer in the Data & Analytics (DnA) organization at GoDaddy. He is passionate about making it easier for organizations to manage data and use it to drive their businesses forward. In his spare time, he loves hiking, camping, and riding his ebike.

Ozcan Ilikhan is the Director of Engineering for the Data and ML Platform at GoDaddy. He has over two decades of multidisciplinary leadership experience, spanning startups to global enterprises. He has a passion for leveraging data and AI in creating solutions that delight customers, empower them to achieve more, and boost operational efficiency. Outside of his professional life, he enjoys reading, hiking, gardening, volunteering, and embarking on DIY projects.

Harsh Vardhan is an AWS Solutions Architect, specializing in big data and analytics. He has over 8 years of experience working in the field of big data and data science. He is passionate about helping customers adopt best practices and discover insights from their data.

Use AWS Glue ETL to perform merge, partition evolution, and schema evolution on Apache Iceberg

Post Syndicated from Satyanarayana Adimula original https://aws.amazon.com/blogs/big-data/use-aws-glue-etl-to-perform-merge-partition-evolution-and-schema-evolution-on-apache-iceberg/

As enterprises collect increasing amounts of data from various sources, the structure and organization of that data often need to change over time to meet evolving analytical needs. However, altering schema and table partitions in traditional data lakes can be a disruptive and time-consuming task, requiring renaming or recreating entire tables and reprocessing large datasets. This hampers agility and time to insight.

Schema evolution enables adding, deleting, renaming, or modifying columns without needing to rewrite existing data. This is critical for fast-moving enterprises to augment data structures to support new use cases. For example, an ecommerce company may add new customer demographic attributes or order status flags to enrich analytics. Apache Iceberg manages these schema changes in a backward-compatible way through its innovative metadata table evolution architecture.

Similarly, partition evolution allows seamless adding, dropping, or splitting partitions. For instance, an ecommerce marketplace may initially partition order data by day. As orders accumulate, and querying by day becomes inefficient, they may split to day and customer ID partitions. Table partitioning organizes big datasets most efficiently for query performance. Iceberg gives enterprises the flexibility to incrementally adjust partitions rather than requiring tedious rebuild procedures. New partitions can be added in a fully compatible way without downtime or having to rewrite existing data files.

This post demonstrates how you can harness Iceberg, Amazon Simple Storage Service (Amazon S3), AWS Glue, AWS Lake Formation, and AWS Identity and Access Management (IAM) to implement a transactional data lake supporting seamless evolution. By allowing for painless schema and partition adjustments as data insights evolve, you can benefit from the future-proof flexibility needed for business success.

Overview of solution

For our example use case, a fictional large ecommerce company processes thousands of orders each day. When orders are received, updated, cancelled, shipped, delivered, or returned, the changes are made in their on-premises system, and those changes need to be replicated to an S3 data lake so that data analysts can run queries through Amazon Athena. The changes can contain schema updates as well. Due to the security requirements of different organizations, they need to manage fine-grained access control for the analysts through Lake Formation.

The following diagram illustrates the solution architecture.

The solution workflow includes the following key steps:

  1. Ingest data from on premises into a Dropzone location using a data ingestion pipeline.
  2. Merge the data from the Dropzone location into Iceberg using AWS Glue.
  3. Query the data using Athena.

Prerequisites

For this walkthrough, you should have the following prerequisites:

Set up the infrastructure with AWS CloudFormation

To create your infrastructure with an AWS CloudFormation template, complete the following steps:

  1. Log in as an administrator to your AWS account.
  2. Open the AWS CloudFormation console.
  3. Choose Launch Stack:
  4. For Stack name, enter a name (for this post, icebergdemo1).
  5. Choose Next.
  6. Provide information for the following parameters:
    1. DatalakeUserName
    2. DatalakeUserPassword
    3. DatabaseName
    4. TableName
    5. DatabaseLFTagKey
    6. DatabaseLFTagValue
    7. TableLFTagKey
    8. TableLFTagValue
  7. Choose Next.
  8. Choose Next again.
  9. In the Review section, review the values you entered.
  10. Select I acknowledge that AWS CloudFormation might create IAM resources with custom names and choose Submit.

In a few minutes, the stack status will change to CREATE_COMPLETE.

You can go to the Outputs tab of the stack to see all the resources it has provisioned. The resources are prefixed with the stack name you provided (for this post, icebergdemo1).

Create an Iceberg table using Lambda and grant access using Lake Formation

To create an Iceberg table and grant access on it, complete the following steps:

  1. Navigate to the Resources tab of the CloudFormation stack icebergdemo1 and search for logical ID named LambdaFunctionIceberg.
  2. Choose the hyperlink of the associated physical ID.

You’re redirected to the Lambda function icebergdemo1-Lambda-Create-Iceberg-and-Grant-access.

  1. On the Configuration tab, choose Environment variables in the left pane.
  1. On the Code tab, you can inspect the function code.

The function uses the AWS SDK for Python (Boto3) APIs to provision the resources. It assumes the provisioned data lake admin role to perform the following tasks:

  • Grant DATA_LOCATION_ACCESS access to the data lake admin role on the registered data lake location
  • Create Lake Formation Tags (LF-Tags)
  • Create a database in the AWS Glue Data Catalog using the AWS Glue create_database API
  • Assign LF-Tags to the database
  • Grant DESCRIBE access on the database using LF-Tags to the data lake IAM user and AWS Glue ETL IAM role
  • Create an Iceberg table using the AWS Glue create_table API:
response_create_table = glue_client.create_table(
DatabaseName= 'icebergdb1',
OpenTableFormatInput= { 
 'IcebergInput': { 
 'MetadataOperation': 'CREATE',
 'Version': '2'
 }
},
TableInput={
    'Name': ‘ecomorders’,
    'StorageDescriptor': {
        'Columns': [
            {'Name': 'ordernum', 'Type': 'int'},
            {'Name': 'sku', 'Type': 'string'},
            {'Name': 'quantity','Type': 'int'},
            {'Name': 'category','Type': 'string'},
            {'Name': 'status','Type': 'string'},
            {'Name': 'shipping_id','Type': 'string'}
        ],  
        'Location': 's3://icebergdemo1-s3bucketiceberg-vthvwwblrwe8/iceberg/'
    },
    'TableType': 'EXTERNAL_TABLE'
    }
)
  • Assign LF-Tags to the table
  • Grant DESCRIBE and SELECT on the Iceberg table LF-Tags for the data lake IAM user
  • Grant ALL, DESCRIBE, SELECT, INSERT, DELETE, and ALTER access on the Iceberg table LF-Tags to the AWS Glue ETL IAM role
  1. On the Test tab, choose Test to run the function.

When the function is complete, you will see the message “Executing function: succeeded.”

Lake Formation helps you centrally manage, secure, and globally share data for analytics and machine learning. With Lake Formation, you can manage fine-grained access control for your data lake data on Amazon S3 and its metadata in the Data Catalog.

To add an Amazon S3 location as Iceberg storage in your data lake, register the location with Lake Formation. You can then use Lake Formation permissions for fine-grained access control to the Data Catalog objects that point to this location, and to the underlying data in the location.

The CloudFormation stack registered the data lake location.

Data location permissions in Lake Formation enable principals to create and alter Data Catalog resources that point to the designated registered Amazon S3 locations. Data location permissions work in addition to Lake Formation data permissions to secure information in your data lake.

Lake Formation tag-based access control (LF-TBAC) is an authorization strategy that defines permissions based on attributes. In Lake Formation, these attributes are called LF-Tags. You can attach LF-Tags to Data Catalog resources, Lake Formation principals, and table columns. You can assign and revoke permissions on Lake Formation resources using these LF-Tags. Lake Formation allows operations on those resources when the principal’s tag matches the resource tag.

Verify the Iceberg table from the Lake Formation console

To verify the Iceberg table, complete the following steps:

  1. On the Lake Formation console, choose Databases in the navigation pane.
  2. Open the details page for icebergdb1.

You can see the associated database LF-Tags.

  1. Choose Tables in the navigation pane.
  2. Open the details page for ecomorders.

In the Table details section, you can observe the following:

  • Table format shows as Apache Iceberg
  • Table management shows as Managed by Data Catalog
  • Location lists the data lake location of the Iceberg table

In the LF-Tags section, you can see the associated table LF-Tags.

In the Table details section, expand Advanced table properties to view the following:

  • metadata_location points to the location of the Iceberg table’s metadata file
  • table_type shows as ICEBERG

On the Schema tab, you can view the columns defined on the Iceberg table.

Integrate Iceberg with the AWS Glue Data Catalog and Amazon S3

Iceberg tracks individual data files in a table instead of directories. When there is an explicit commit on the table, Iceberg creates data files and adds them to the table. Iceberg maintains the table state in metadata files. Any change in table state creates a new metadata file that atomically replaces the older metadata. Metadata files track the table schema, partitioning configuration, and other properties.

Iceberg requires file systems that support the operations to be compatible with object stores like Amazon S3.

Iceberg creates snapshots for the table contents. Each snapshot is a complete set of data files in the table at a point in time. Data files in snapshots are stored in one or more manifest files that contain a row for each data file in the table, its partition data, and its metrics.

The following diagram illustrates this hierarchy.

When you create an Iceberg table, it creates the metadata folder first and a metadata file in the metadata folder. The data folder is created when you load data into the Iceberg table.

Contents of the Iceberg metadata file

The Iceberg metadata file contains a lot of information, including the following:

  • format-version –Version of the Iceberg table
  • Location – Amazon S3 location of the table
  • Schemas – Name and data type of all columns on the table
  • partition-specs – Partitioned columns
  • sort-orders – Sort order of columns
  • properties – Table properties
  • current-snapshot-id – Current snapshot
  • refs – Table references
  • snapshots – List of snapshots, each containing the following information:
    • sequence-number – Sequence number of snapshots in chronological order (the highest number represents the current snapshot, 1 for the first snapshot)
    • snapshot-id – Snapshot ID
    • timestamp-ms – Timestamp when the snapshot was committed
    • summary – Summary of changes committed
    • manifest-list – List of manifests; this file name starts with snap-< snapshot-id >
  • schema-id – Sequence number of the schema in chronological order (the highest number represents the current schema)
  • snapshot-log – List of snapshots in chronological order
  • metadata-log – List of metadata files in chronological order

The metadata file has all the historical changes to the table’s data and schema. Reviewing the contents on the metafile file directly can be a time-consuming task. Fortunately, you can query the Iceberg metadata using Athena.

Iceberg framework in AWS Glue

AWS Glue 4.0 supports Iceberg tables registered with Lake Formation. In the AWS Glue ETL jobs, you need the following code to enable the Iceberg framework:

from awsglue.context import GlueContext
from pyspark.context import SparkContext
from pyspark.conf import SparkConf
aws_account_id = boto3.client('sts').get_caller_identity().get('Account')

args = getResolvedOptions(sys.argv, ['JOB_NAME','warehouse_path']
    
# Set up configuration for AWS Glue to work with Apache Iceberg
conf = SparkConf()
conf.set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
conf.set("spark.sql.catalog.glue_catalog", "org.apache.iceberg.spark.SparkCatalog")
conf.set("spark.sql.catalog.glue_catalog.warehouse", args['warehouse_path'])
conf.set("spark.sql.catalog.glue_catalog.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog")
conf.set("spark.sql.catalog.glue_catalog.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
conf.set("spark.sql.catalog.glue_catalog.glue.lakeformation-enabled", "true")
conf.set("spark.sql.catalog.glue_catalog.glue.id", aws_account_id)

sc = SparkContext(conf=conf)
glueContext = GlueContext(sc)
spark = glueContext.spark_session

For read/write access to underlying data, in addition to Lake Formation permissions, the AWS Glue IAM role to run the AWS Glue ETL jobs was granted lakeformation: GetDataAccess IAM permission. With this permission, Lake Formation grants the request for temporary credentials to access the data.

The CloudFormation stack provisioned the four AWS Glue ETL jobs for you. The name of each job starts with your stack name (icebergdemo1). Complete the following steps to view the jobs:

  1. Log in as an administrator to your AWS account.
  2. On the AWS Glue console, choose ETL jobs in the navigation pane.
  3. Search for jobs with icebergdemo1 in the name.

Merge data from Dropzone into the Iceberg table

For our use case, the company ingests their ecommerce orders data daily from their on-premises location into an Amazon S3 Dropzone location. The CloudFormation stack loaded three files with sample orders for 3 days, as shown in the following figures. You see the data in the Dropzone location s3://icebergdemo1-s3bucketdropzone-kunftrcblhsk/data.

The AWS Glue ETL job icebergdemo1-GlueETL1-merge will run daily to merge the data into the Iceberg table. It has the following logic to add or update the data on Iceberg:

  • Create a Spark DataFrame from input data:
df = spark.read.format(dropzone_dataformat).option("header", True).load(dropzone_path)
df = df.withColumn("ordernum", df["ordernum"].cast(IntegerType())) \
    .withColumn("quantity", df["quantity"].cast(IntegerType()))
df.createOrReplaceTempView("input_table")
  • For a new order, add it to the table
  • If the table has a matching order, update the status and shipping_id:
stmt_merge = f"""
    MERGE INTO glue_catalog.{database_name}.{table_name} AS t
    USING input_table AS s 
    ON t.ordernum= s.ordernum
    WHEN MATCHED 
            THEN UPDATE SET 
                t.status = s.status,
                t.shipping_id = s.shipping_id
    WHEN NOT MATCHED THEN INSERT *
    """
spark.sql(stmt_merge)

Complete the following steps to run the AWS Glue merge job:

  1. On the AWS Glue console, choose ETL jobs in the navigation pane.
  2. Select the ETL job icebergdemo1-GlueETL1-merge.
  3. On the Actions dropdown menu, choose Run with parameters.
  4. On the Run parameters page, go to Job parameters.
  5. For the --dropzone_path parameter, provide the S3 location of the input data (icebergdemo1-s3bucketdropzone-kunftrcblhsk/data/merge1).
  6. Run the job to add all the orders: 1001, 1002, 1003, and 1004.
  7. For the --dropzone_path parameter, change the S3 location to icebergdemo1-s3bucketdropzone-kunftrcblhsk/data/merge2.
  8. Run the job again to add orders 2001 and 2002, and update orders 1001, 1002, and 1003.
  9. For the --dropzone_path parameter, change the S3 location to icebergdemo1-s3bucketdropzone-kunftrcblhsk/data/merge3.
  10. Run the job again to add order 3001 and update orders 1001, 1003, 2001, and 2002.

Go to the data folder of table to see the data files written by Iceberg when you merged the data into the table using the Glue ETL job icebergdemo1-GlueETL1-merge.

Query Iceberg using Athena

The CloudFormation stack created the IAM user iceberguser1, which has read access on the Iceberg table using LF-Tags. To query Iceberg using Athena via this user, complete the following steps:

  1. Log in as iceberguser1 to the AWS Management Console.
  2. On the Athena console, choose Workgroups in the navigation pane.
  3. Locate the workgroup that CloudFormation provisioned (icebergdemo1-workgroup)
  4. Verify Athena engine version 3.

The Athena engine version 3 supports Iceberg file formats, including Parquet, ORC, and Avro.

  1. Go to the Athena query editor.
  2. Choose the workgroup icebergdemo1-workgroup on the dropdown menu.
  3. For Database, choose icebergdb1. You will see the table ecomorders.
  4. Run the following query to see the data in the Iceberg table:
    SELECT * FROM "icebergdb1"."ecomorders" ORDER BY ordernum ;

  5. Run the following query to see table’s current partitions:
    DESCRIBE icebergdb1.ecomorders ;

Partition-spec describes how table is partitioned. In this example, there are no partitioned fields because you didn’t define any partitions on the table.

Iceberg partition evolution

You may need to change your partition structure; for example, due to trend changes of common query patterns in downstream analytics. A change of partition structure for traditional tables is a significant operation that requires an entire data copy.

Iceberg makes this straightforward. When you change the partition structure on Iceberg, it doesn’t require you to rewrite the data files. The old data written with earlier partitions remains unchanged. New data is written using the new specifications in a new layout. Metadata for each of the partition versions is kept separately.

Let’s add the partition field category to the Iceberg table using the AWS Glue ETL job icebergdemo1-GlueETL2-partition-evolution:

ALTER TABLE glue_catalog.icebergdb1.ecomorders
    ADD PARTITION FIELD category ;

On the AWS Glue console, run the ETL job icebergdemo1-GlueETL2-partition-evolution. When the job is complete, you can query partitions using Athena.

DESCRIBE icebergdb1.ecomorders ;

SELECT * FROM "icebergdb1"."ecomorders$partitions";

You can see the partition field category, but the partition values are null. There are no new data files in the data folder, because partition evolution is a metadata operation and doesn’t rewrite data files. When you add or update data, you will see the corresponding partition values populated.

Iceberg schema evolution

Iceberg supports in-place table evolution. You can evolve a table schema just like SQL. Iceberg schema updates are metadata changes, so no data files need to be rewritten to perform the schema evolution.

To explore the Iceberg schema evolution, run the ETL job icebergdemo1-GlueETL3-schema-evolution via the AWS Glue console. The job runs the following SparkSQL statements:

ALTER TABLE glue_catalog.icebergdb1.ecomorders
    ADD COLUMNS (shipping_carrier string) ;

ALTER TABLE glue_catalog.icebergdb1.ecomorders
    RENAME COLUMN shipping_id TO tracking_number ;

ALTER TABLE glue_catalog.icebergdb1.ecomorders
    ALTER COLUMN ordernum TYPE bigint ;

In the Athena query editor, run the following query:

SELECT * FROM "icebergdb1"."ecomorders" ORDER BY ordernum asc ;

You can verify the schema changes to the Iceberg table:

  • A new column has been added called shipping_carrier
  • The column shipping_id has been renamed to tracking_number
  • The data type of the column ordernum has changed from int to bigint
    DESCRIBE icebergdb1.ecomorders;

Positional update

The data in tracking_number contains the shipping carrier concatenated with the tracking number. Let’s assume that we want to split this data in order to keep the shipping carrier in the shipping_carrier field and the tracking number in the tracking_number field.

On the AWS Glue console, run the ETL job icebergdemo1-GlueETL4-update-table. The job runs the following SparkSQL statement to update the table:

UPDATE glue_catalog.icebergdb1.ecomorders
SET shipping_carrier = substring(tracking_number,1,3),
    tracking_number = substring(tracking_number,4,50)
WHERE tracking_number != '' ;

Query the Iceberg table to verify the updated data on tracking_number and shipping_carrier.

SELECT * FROM "icebergdb1"."ecomorders" ORDER BY ordernum ;

Now that the data has been updated on the table, you should see the partition values populated for category:

SELECT * FROM "icebergdb1"."ecomorders$partitions"
ORDER BY partition;

Clean up

To avoid incurring future charges, clean up the resources you created:

  1. On the Lambda console, open the details page for the function icebergdemo1-Lambda-Create-Iceberg-and-Grant-access.
  2. In the Environment variables section, choose the key Task_To_Perform and update the value to CLEANUP.
  3. Run the function, which drops the database, table, and their associated LF-Tags.
  4. On the AWS CloudFormation console, delete the stack icebergdemo1.

Conclusion

In this post, you created an Iceberg table using the AWS Glue API and used Lake Formation to control access on the Iceberg table in a transactional data lake. With AWS Glue ETL jobs, you merged data into the Iceberg table, and performed schema evolution and partition evolution without rewriting or recreating the Iceberg table. With Athena, you queried the Iceberg data and metadata.

Based on the concepts and demonstrations from this post, you can now build a transactional data lake in an enterprise using Iceberg, AWS Glue, Lake Formation, and Amazon S3.


About the Author

Satya Adimula is a Senior Data Architect at AWS based in Boston. With over two decades of experience in data and analytics, Satya helps organizations derive business insights from their data at scale.

How BMO improved data security with Amazon Redshift and AWS Lake Formation

Post Syndicated from Amy Tseng original https://aws.amazon.com/blogs/big-data/how-bmo-improved-data-security-with-amazon-redshift-and-aws-lake-formation/

This post is cowritten with Amy Tseng, Jack Lin and Regis Chow from BMO.

BMO is the 8th largest bank in North America by assets. It provides personal and commercial banking, global markets, and investment banking services to 13 million customers. As they continue to implement their Digital First strategy for speed, scale and the elimination of complexity, they are always seeking ways to innovate, modernize and also streamline data access control in the Cloud. BMO has accumulated sensitive financial data and needed to build an analytic environment that was secure and performant. One of the bank’s key challenges related to strict cybersecurity requirements is to implement field level encryption for personally identifiable information (PII), Payment Card Industry (PCI), and data that is classified as high privacy risk (HPR). Data with this secured data classification is stored in encrypted form both in the data warehouse and in their data lake. Only users with required permissions are allowed to access data in clear text.

Amazon Redshift is a fully managed data warehouse service that tens of thousands of customers use to manage analytics at scale. Amazon Redshift supports industry-leading security with built-in identity management and federation for single sign-on (SSO) along with multi-factor authentication. The Amazon Redshift Spectrum feature enables direct query of your Amazon Simple Storage Service (Amazon S3) data lake, and many customers are using this to modernize their data platform.

AWS Lake Formation is a fully managed service that simplifies building, securing, and managing data lakes. It provides fine-grained access control, tagging (tag-based access control (TBAC)), and integration across analytical services. It enables simplifying the governance of data catalog objects and accessing secured data from services like Amazon Redshift Spectrum.

In this post, we share the solution using Amazon Redshift role based access control (RBAC) and AWS Lake Formation tag-based access control for federated users to query your data lake using Amazon Redshift Spectrum.

Use-case

BMO had more than Petabyte(PB) of financial sensitive data classified as follows:

  1. Personally Identifiable Information (PII)
  2. Payment Card Industry (PCI)
  3. High Privacy Risk (HPR)

The bank aims to store data in their Amazon Redshift data warehouse and Amazon S3 data lake. They have a large, diverse end user base across sales, marketing, credit risk, and other business lines and personas:

  1. Business analysts
  2. Data engineers
  3. Data scientists

Fine-grained access control needs to be applied to the data on both Amazon Redshift and data lake data accessed using Amazon Redshift Spectrum. The bank leverages AWS services like AWS Glue and Amazon SageMaker on this analytics platform. They also use an external identity provider (IdP) to manage their preferred user base and integrate it with these analytics tools. End users access this data using third-party SQL clients and business intelligence tools.

Solution overview

In this post, we’ll use synthetic data very similar to BMO data with data classified as PII, PCI, or HPR. Users and groups exists in External IdP. These users federate for single sign on to Amazon Redshift using native IdP federation. We’ll define the permissions using Redshift role based access control (RBAC) for the user roles. For users accessing the data in data lake using Amazon Redshift Spectrum, we’ll use Lake Formation policies for access control.

Technical Solution

To implement customer needs for securing different categories of data, it requires the definition of multiple AWS IAM roles, which requires knowledge in IAM policies and maintaining those when permission boundary changes.

In this post, we show how we simplified managing the data classification policies with minimum number of Amazon Redshift AWS IAM roles aligned by data classification, instead of permutations and combinations of roles by lines of business and data classifications. Other organizations (e.g., Financial Service Institute [FSI]) can benefit from the BMO’s implementation of data security and compliance.

As a part of this blog, the data will be uploaded into Amazon S3. Access to the data is controlled using policies defined using Redshift RBAC for corresponding Identity provider user groups and TAG Based access control will be implemented using AWS Lake Formation for data on S3.

Solution architecture

The following diagram illustrates the solution architecture along with the detailed steps.

  1. IdP users with groups like lob_risk_public, Lob_risk_pci, hr_public, and hr_hpr are assigned in External IdP (Identity Provider).
  2. Each users is mapped to the Amazon Redshift local roles that are sent from IdP, and including aad:lob_risk_pci, aad:lob_risk_public, aad:hr_public, and aad:hr_hpr in Amazon Redshift. For example, User1 who is part of Lob_risk_public and hr_hpr will grant role usage accordingly.
  3. Attach iam_redshift_hpr, iam_redshift_pcipii, and iam_redshift_public AWS IAM roles to Amazon Redshift cluster.
  4. AWS Glue databases which are backed on s3 (e.g., lobrisk,lobmarket,hr and their respective tables) are referenced in Amazon Redshift. Using Amazon Redshift Spectrum, you can query these external tables and databases (e.g., external_lobrisk_pci, external_lobrisk_public, external_hr_public, and external_hr_hpr), which are created using AWS IAM roles iam_redshift_pcipii, iam_redshift_hpr, iam_redshift_public as shown in the solutions steps.
  5. AWS Lake Formation is used to control access to the external schemas and tables.
  6. Using AWS Lake Formation tags, we apply the fine-grained access control to these external tables for AWS IAM roles (e.g., iam_redshift_hpr, iam_redshift_pcipii, and iam_redshift_public).
  7. Finally, grant usage for these external schemas to their Amazon Redshift roles.

Walkthrough

The following sections walk you through implementing the solution using synthetic data.

Download the data files and place your files into buckets

Amazon S3 serves as a scalable and durable data lake on AWS. Using Data Lake you can bring any open format data like CSV, JSON, PARQUET, or ORC into Amazon S3 and perform analytics on your data.

The solutions utilize CSV data files containing information classified as PCI, PII, HPR, or Public. You can download input files using the provided links below. Using the downloaded files upload into Amazon S3 by creating folder and files as shown in below screenshot by following the instruction here. The detail of each file is provided in the following list:

Register the files into AWS Glue Data Catalog using crawlers

The following instructions demonstrate how to register files downloaded into the AWS Glue Data Catalog using crawlers. We organize files into databases and tables using AWS Glue Data Catalog, as per the following steps. It is recommended to review the documentation to learn how to properly set up an AWS Glue Database. Crawlers can automate the process of registering our downloaded files into the catalog rather than doing it manually. You’ll create the following databases in the AWS Glue Data Catalog:

  • lobrisk
  • lobmarket
  • hr

Example steps to create an AWS Glue database for lobrisk data are as follows:

  • Go to the AWS Glue Console.
  • Next, select Databases under Data Catalog.
  • Choose Add database and enter the name of databases as lobrisk.
  • Select Create database, as shown in the following screenshot.

Repeat the steps for creating other database like lobmarket and hr.

An AWS Glue Crawler scans the above files and catalogs metadata about them into the AWS Glue Data Catalog. The Glue Data Catalog organizes this Amazon S3 data into tables and databases, assigning columns and data types so the data can be queried using SQL that Amazon Redshift Spectrum can understand. Please review the AWS Glue documentation about creating the Glue Crawler. Once AWS Glue crawler finished executing, you’ll see the following respective database and tables:

  • lobrisk
    • lob_risk_high_confidential_public
    • lob_risk_high_confidential
  • lobmarket
    • credit_card_transaction_pci
    • credit_card_transaction_pci_public
  • hr
    • customers_pii_hpr_public
    • customers_pii_hpr

Example steps to create an AWS Glue Crawler for lobrisk data are as follows:

  • Select Crawlers under Data Catalog in AWS Glue Console.
  • Next, choose Create crawler. Provide the crawler name as lobrisk_crawler and choose Next.

Make sure to select the data source as Amazon S3 and browse the Amazon S3 path to the lob_risk_high_confidential_public folder and choose an Amazon S3 data source.

  • Crawlers can crawl multiple folders in Amazon S3. Choose Add a data source and include path S3://<<Your Bucket >>/ lob_risk_high_confidential.

  • After adding another Amazon S3 folder, then choose Next.

  • Next, create a new IAM role in the Configuration security settings.
  • Choose Next.

  • Select the Target database as lobrisk. Choose Next.

  • Next, under Review, choose Create crawler.
  • Select Run Crawler. This creates two tables : lob_risk_high_confidential_public and lob_risk_high_confidential under database lobrisk.

Similarly, create an AWS Glue crawler for lobmarket and hr data using the above steps.

Create AWS IAM roles

Using AWS IAM, create the following IAM roles with Amazon Redshift, Amazon S3, AWS Glue, and AWS Lake Formation permissions.

You can create AWS IAM roles in this service using this link. Later, you can attach a managed policy to these IAM roles:

  • iam_redshift_pcipii (AWS IAM role attached to Amazon Redshift cluster)
    • AmazonRedshiftFullAccess
    • AmazonS3FullAccess
    • Add inline policy (Lakeformation-inline) for Lake Formation permission as follows:
      {
         "Version": "2012-10-17",
          "Statement": [
              {
                  "Sid": "RedshiftPolicyForLF",
                  "Effect": "Allow",
                  "Action": [
                      "lakeformation:GetDataAccess"
                  ],
                  "Resource": "*"
              }
          ]

    • iam_redshift_hpr (AWS IAM role attached to Amazon Redshift cluster): Add the following managed:
      • AmazonRedshiftFullAccess
      • AmazonS3FullAccess
      • Add inline policy (Lakeformation-inline), which was created previously.
    • iam_redshift_public (AWS IAM role attached to Amazon Redshift cluster): Add the following managed policy:
      • AmazonRedshiftFullAccess
      • AmazonS3FullAccess
      • Add inline policy (Lakeformation-inline), which was created previously.
    • LF_admin (Lake Formation Administrator): Add the following managed policy:
      • AWSLakeFormationDataAdmin
      • AWSLakeFormationCrossAccountManager
      • AWSGlueConsoleFullAccess

Use Lake Formation tag-based access control (LF-TBAC) to access control the AWS Glue data catalog tables.

LF-TBAC is an authorization strategy that defines permissions based on attributes. Using LF_admin Lake Formation administrator, you can create LF-tags, as mentioned in the following details:

Key Value
Classification:HPR no, yes
Classification:PCI no, yes
Classification:PII no, yes
Classifications non-sensitive, sensitive

Follow the below instructions to create Lake Formation tags:

  • Log into Lake Formation Console (https://console.aws.amazon.com/lakeformation/) using LF-Admin AWS IAM role.
  • Go to LF-Tags and permissions in Permissions sections.
  • Select Add LF-Tag.

  • Create the remaining LF-Tags as directed in table earlier. Once created you find the LF-Tags as show below.

Assign LF-TAG to the AWS Glue catalog tables

Assigning Lake Formation tags to tables typically involves a structured approach. The Lake Formation Administrator can assign tags based on various criteria, such as data source, data type, business domain, data owner, or data quality. You have the ability to allocate LF-Tags to Data Catalog assets, including databases, tables, and columns, which enables you to manage resource access effectively. Access to these resources is restricted to principals who have been given corresponding LF-Tags (or those who have been granted access through the named resource approach).

Follow the instruction in the give link to assign  LF-TAGS to Glue Data Catalog Tables:

Glue Catalog Tables Key Value
customers_pii_hpr_public Classification non-sensitive
customers_pii_hpr Classification:HPR yes
credit_card_transaction_pci Classification:PCI yes
credit_card_transaction_pci_public Classifications non-sensitive
lob_risk_high_confidential_public Classifications non-sensitive
lob_risk_high_confidential Classification:PII yes

Follow the below instructions to assign a LF-Tag to Glue Tables from AWS Console as follows:

  • To access the databases in Lake Formation Console, go to the Data catalog section and choose Databases.
  • Select the lobrisk database and choose View Tables.
  • Select lob_risk_high_confidential table and edit the LF-Tags.
  • Assign the Classification:HPR as Assigned Keys and Values as Yes. Select Save.

  • Similarly, assign the Classification Key and Value as non-sensitive for the lob_risk_high_confidential_public table.

Follow the above instructions to assign tables to remaining tables for lobmarket and hr databases.

Grant permissions to resources using a LF-Tag expression grant to Redshift IAM Roles

Grant select, describe Lake Formation permission to LF-Tags and Redshift IAM role using Lake Formation Administrator in Lake formation console. To grant, please follow the documentation.

Use the following table to grant the corresponding IAM role to LF-tags:

IAM role LF-Tags Key LF-Tags Value Permission
iam_redshift_pcipii Classification:PII yes Describe, Select
. Classification:PCI yes .
iam_redshift_hpr Classification:HPR yes Describe, Select
iam_redshift_public Classifications non-sensitive Describe, Select

Follow the below instructions to grant permissions to LF-tags and IAM roles:

  • Choose Data lake permissions in Permissions section in the AWS Lake Formation Console.
  • Choose Grants. Select IAM users and roles in Principals.
  • In LF-tags or catalog resources select Key as Classifications and values as non-sensitive.

  • Next, select Table permissions as Select & Describe. Choose grants.

Follow the above instructions for remaining LF-Tags and their IAM roles, as shown in the previous table.

Map the IdP user groups to the Redshift roles

In Redshift, use Native IdP federation to map the IdP user groups to the Redshift roles. Use Query Editor V2.

create role aad:rs_lobrisk_pci_role;
create role aad:rs_lobrisk_public_role;
create role aad:rs_hr_hpr_role;
create role aad:rs_hr_public_role;
create role aad:rs_lobmarket_pci_role;
create role aad:rs_lobmarket_public_role;

Create External schemas

In Redshift, create External schemas using AWS IAM roles and using AWS Glue Catalog databases. External schema’s are created as per data classification using iam_role.

create external schema external_lobrisk_pci
from data catalog
database 'lobrisk'
iam_role 'arn:aws:iam::571750435036:role/iam_redshift_pcipii';

create external schema external_hr_hpr
from data catalog
database 'hr'
iam_role 'arn:aws:iam::571750435036:role/iam_redshift_hpr';

create external schema external_lobmarket_pci
from data catalog
database 'lobmarket'
iam_role 'arn:aws:iam::571750435036:role/iam_redshift_pcipii';

create external schema external_lobrisk_public
from data catalog
database 'lobrisk'
iam_role 'arn:aws:iam::571750435036:role/iam_redshift_public';

create external schema external_hr_public
from data catalog
database 'hr'
iam_role 'arn:aws:iam::571750435036:role/iam_redshift_public';

create external schema external_lobmarket_public
from data catalog
database 'lobmarket'
iam_role 'arn:aws:iam::571750435036:role/iam_redshift_public';

Verify list of tables

Verify list of tables in each external schema. Each schema lists only the tables Lake Formation has granted to IAM_ROLES used to create external schema. Below is the list of tables in Redshift query edit v2 output on top left hand side.

Grant usage on external schemas to different Redshift local Roles

In Redshift, grant usage on external schemas to different Redshift local Roles as follows:

grant usage on schema external_lobrisk_pci to role aad:rs_lobrisk_pci_role;
grant usage on schema external_lobrisk_public to role aad:rs_lobrisk_public_role;

grant usage on schema external_lobmarket_pci to role aad:rs_lobmarket_pci_role;
grant usage on schema external_lobmarket_public to role aad:rs_lobmarket_public_role;

grant usage on schema external_hr_hpr_pci to role aad:rs_hr_hpr_role;
grant usage on schema external_hr_public to role aad:rs_hr_public_role;

Verify access to external schema

Verify access to external schema using user from Lob Risk team. User lobrisk_pci_user federated into Amazon Redshift local role rs_lobrisk_pci_role. Role rs_lobrisk_pci_role only has access to external schema external_lobrisk_pci.

set session_authorization to creditrisk_pci_user;
select * from external_lobrisk_pci.lob_risk_high_confidential limit 10;

On querying table from external_lobmarket_pci schema, you’ll see that your permission is denied.

set session_authorization to lobrisk_pci_user;
select * from external_lobmarket_hpr.lob_card_transaction_pci;

BMO’s automated access provisioning

Working with the bank, we developed an access provisioning framework that allows the bank to create a central repository of users and what data they have access to. The policy file is stored in Amazon S3. When the file is updated, it is processed, messages are placed in Amazon SQS. AWS Lambda using Data API is used to apply access control to Amazon Redshift roles. Simultaneously, AWS Lambda is used to automate tag-based access control in AWS Lake Formation.

Benefits of adopting this model were:

  1. Created a scalable automation process to allow dynamically applying changing policies.
  2. Streamlined the user accesses on-boarding and processing with existing enterprise access management.
  3. Empowered each line of business to restrict access to sensitive data they own and protect customers data and privacy at enterprise level.
  4. Simplified the AWS IAM role management and maintenance by greatly reduced number of roles required.

With the recent release of Amazon Redshift integration with AWS Identity center which allows identity propagation across AWS service can be leveraged to simplify and scale this implementation.

Conclusion

In this post, we showed you how to implement robust access controls for sensitive customer data in Amazon Redshift, which were challenging when trying to define many distinct AWS IAM roles. The solution presented in this post demonstrates how organizations can meet data security and compliance needs with a consolidated approach—using a minimal set of AWS IAM roles organized by data classification rather than business lines.

By using Amazon Redshift’s native integration with External IdP and defining RBAC policies in both Redshift and AWS Lake Formation, granular access controls can be applied without creating an excessive number of distinct roles. This allows the benefits of role-based access while minimizing administrative overhead.

Other financial services institutions looking to secure customer data and meet compliance regulations can follow a similar consolidated RBAC approach. Careful policy definition, aligned to data sensitivity rather than business functions, can help reduce the proliferation of AWS IAM roles. This model balances security, compliance, and manageability for governance of sensitive data in Amazon Redshift and broader cloud data platforms.

In short, a centralized RBAC model based on data classification streamlines access management while still providing robust data security and compliance. This approach can benefit any organization managing sensitive customer information in the cloud.


About the Authors

Amy Tseng is a Managing Director of Data and Analytics(DnA) Integration at BMO. She is one of the AWS Data Hero. She has over 7 years of experiences in Data and Analytics Cloud migrations in AWS. Outside of work, Amy loves traveling and hiking.

Jack Lin is a Director of Engineering on the Data Platform at BMO. He has over 20 years of experience working in platform engineering and software engineering. Outside of work, Jack loves playing soccer, watching football games and traveling.

Regis Chow is a Director of DnA Integration at BMO. He has over 5 years of experience working in the cloud and enjoys solving problems through innovation in AWS. Outside of work, Regis loves all things outdoors, he is especially passionate about golf and lawn care.

Nishchai JM is an Analytics Specialist Solutions Architect at Amazon Web services. He specializes in building Big-data applications and help customer to modernize their applications on Cloud. He thinks Data is new oil and spends most of his time in deriving insights out of the Data.

Harshida Patel is a Principal Solutions Architect, Analytics with AWS.

Raghu Kuppala is an Analytics Specialist Solutions Architect experienced working in the databases, data warehousing, and analytics space. Outside of work, he enjoys trying different cuisines and spending time with his family and friends.

Empowering data-driven excellence: How the Bluestone Data Platform embraced data mesh for success

Post Syndicated from Toney Thomas original https://aws.amazon.com/blogs/big-data/empowering-data-driven-excellence-how-the-bluestone-data-platform-embraced-data-mesh-for-success/

This post is co-written with Toney Thomas and Ben Vengerovsky from Bluestone.

In the ever-evolving world of finance and lending, the need for real-time, reliable, and centralized data has become paramount. Bluestone, a leading financial institution, embarked on a transformative journey to modernize its data infrastructure and transition to a data-driven organization. In this post, we explore how Bluestone uses AWS services, notably the cloud data warehousing service Amazon Redshift, to implement a cutting-edge data mesh architecture, revolutionizing the way they manage, access, and utilize their data assets.

The challenge: Legacy to modernization

Bluestone was operating with a legacy SQL-based lending platform, as illustrated in the following diagram. To stay competitive and responsive to changing market dynamics, they decided to modernize their infrastructure. This modernization involved transitioning to a software as a service (SaaS) based loan origination and core lending platforms. Because these new systems produced vast amounts of data, the challenge of ensuring a single source of truth for all data consumers emerged.

Birth of the Bluestone Data Platform

To address the need for centralized, scalable, and governable data, Bluestone introduced the Bluestone Data Platform. This platform became the hub for all data-related activities across the organization. AWS played a pivotal role in bringing this vision to life.

The following are the key components of the Bluestone Data Platform:

  • Data mesh architecture – Bluestone adopted a data mesh architecture, a paradigm that distributes data ownership across different business units. Each data producer within the organization has its own data lake in Apache Hudi format, ensuring data sovereignty and autonomy.
  • Four-layered data lake and data warehouse architecture – The architecture comprises four layers, including the analytical layer, which houses purpose-built facts and dimension datasets that are hosted in Amazon Redshift. These datasets are pivotal for reporting and analytics use cases, powered by services like Amazon Redshift and tools like Power BI.
  • Machine learning analytics – Various business units, such as Servicing, Lending, Sales & Marketing, Finance, and Credit Risk, use machine learning analytics, which run on top of the dimensional model within the data lake and data warehouse. This enables data-driven decision-making across the organization.
  • Governance and self-service – The Bluestone Data Platform provides a governed, curated, and self-service avenue for all data use cases. AWS services like AWS Lake Formation in conjunction with Atlan help govern data access and policies.
  • Data quality framework – To ensure data reliability, they implemented a data quality framework. It continuously assesses data quality and syncs quality scores to the Atlan governance tool, instilling confidence in the data assets within the platform.

The following diagram illustrates the architecture of their updated data platform.

AWS and third-party services

AWS played a pivotal and multifaceted role in empowering Bluestone’s Data Platform to thrive. The following AWS and third-party services were instrumental in shaping Bluestone’s journey toward becoming a data-driven organization:

  • Amazon Redshift – Bluestone harnessed the power of Amazon Redshift and its features like data sharing to create a centralized repository of data assets. This strategic move facilitated seamless data sharing and collaboration across diverse business units, paving the way for more informed and data-driven decision-making.
  • Lake Formation – Lake Formation emerged as a cornerstone in Bluestone’s data governance strategy. It played a critical role in enforcing data access controls and implementing data policies. With Lake Formation, Bluestone achieved protection of sensitive data and compliance with regulatory requirements.
  • Data quality monitoring – To maintain data reliability and accuracy, Bluestone deployed a robust data quality framework. AWS services were essential in this endeavor, because they complemented open source tools to establish an in-house data quality monitoring system. This system continuously assesses data quality, providing confidence in the reliability of the organization’s data assets.
  • Data governance tooling – Bluestone chose Atlan, available through AWS Marketplace, to implement comprehensive data governance tooling. This SaaS service played a pivotal role in onboarding multiple business teams and fostering a data-centric culture within Bluestone. It empowered teams to efficiently manage and govern data assets.
  • Orchestration using Amazon MWAA – Bluestone heavily relied on Amazon Managed Workflows for Apache Airflow (Amazon MWAA) to manage workflow orchestrations efficiently. This orchestration framework seamlessly integrated with various data quality rules, which were evaluated using Great Expectations operators within the Airflow environment.
  • AWS DMS – Bluestone used AWS Database Migration Service (AWS DMS) to streamline the consolidation of legacy data into the data platform. This service facilitated the smooth transfer of data from legacy SQL Server warehouses to the data lake and data warehouse, providing data continuity and accessibility.
  • AWS Glue – Bluestone used the AWS Glue PySpark environment for implementing data extract, transform, and load (ETL) processes. It played a pivotal role in processing data originating from various source systems, providing data consistency and suitability for analytical use.
  • AWS Glue Data Catalog – Bluestone centralized their data management using the AWS Glue Data Catalog. This catalog served as the backbone for managing data assets within the Bluestone data estate, enhancing data discoverability and accessibility.
  • AWS CloudTrail – Bluestone implemented AWS CloudTrail to monitor and audit platform activities rigorously. This security-focused service provided essential visibility into platform actions, providing compliance and security in data operations.

AWS’s comprehensive suite of services has been integral in propelling the Bluestone Data Platform towards data-driven success. These services have not only enabled efficient data governance, quality assurance, and orchestration, but have also fostered a culture of data centricity within the organization, ultimately leading to better decision-making and competitive advantage. Bluestone’s journey showcases the power of AWS in transforming organizations into data-driven leaders in their respective industries.

Bluestone data architecture

Bluestone’s data architecture has undergone a dynamic transformation, transitioning from a lake house framework to a data mesh architecture. This evolution was driven by the organization’s need for data products with distributed ownership and the necessity for a centralized mechanism to govern and access these data products across various business units.

The following diagram illustrates the solution architecture and its use of AWS and third-party services.

Let’s delve deeper into how this architecture shift has unfolded and what it entails:

  • The need for change – The catalyst for this transformation was the growing demand for discrete data products tailored to the unique requirements of each business unit within Bluestone. Because these business units generated their own data assets in their respective domains, the challenge lay in efficiently managing, governing, and accessing these diverse data stores. Bluestone recognized the need for a more structured and scalable approach.
  • Data products with distributed ownership – In response to this demand, Bluestone adopted a data mesh architecture, which allowed for the creation of distinct data products aligned with each business unit’s needs. Each of these data products exists independently, generating and curating data assets specific to its domain. These data products serve as individual data hubs, ensuring data autonomy and specialization.
  • Centralized catalog integration – To streamline the discovery and accessibility of the data assets that are dispersed across these data products, Bluestone introduced a centralized catalog. This catalog acts as a unified repository where all data products register their respective data assets. It serves as a critical component for data discovery and management.
  • Data governance tool integration – Ensuring data governance and lineage tracking across the organization was another pivotal consideration. Bluestone implemented a robust data governance tool that connects to the centralized catalog. This integration makes sure that the overarching lineage of data assets is comprehensively mapped and captured. Data governance processes are thereby enforced consistently, guaranteeing data quality and compliance.
  • Amazon Redshift data sharing for control and access – To facilitate controlled and secure access to data assets residing within individual data product Redshift instances, Bluestone used Amazon Redshift data sharing. This capability allows data assets to be exposed and shared selectively, providing granular control over access while maintaining data security and integrity.

In essence, Bluestone’s journey from a lake house to a data mesh architecture represents a strategic shift in data management and governance. This transformation empowers different business units to operate autonomously within their data domains while ensuring centralized control, governance, and accessibility. The integration of a centralized catalog and data governance tooling, coupled with the flexibility of Amazon Redshift data sharing, creates a harmonious ecosystem where data-driven decision-making thrives, ultimately contributing to Bluestone’s success in the ever-evolving financial landscape.

Conclusion

Bluestone’s journey from a legacy SQL-based system to a modern data mesh architecture on AWS has improved the way the organization interacts with data and positioned them as a data-driven powerhouse in the financial industry. By embracing AWS services, Bluestone has successfully achieved a centralized, scalable, and governable data platform that empowers its teams to make informed decisions, drive innovation, and stay ahead in the competitive landscape. This transformation serves as compelling proof that Amazon Redshift and AWS Cloud data sharing capabilities are a great pathway for organizations looking to embark on their own data-driven journeys with AWS.


About the Authors

Toney Thomas is a Data Architect and Data Engineering Lead at Bluestone, renowned for his role in envisioning and coining the company’s pioneering data strategy. With a strategic focus on harnessing the power of advanced technology to tackle intricate business challenges, Toney leads a dynamic team of Data Engineers, Reporting Engineers, Quality Assurance specialists, and Business Analysts at Bluestone. His leadership extends to driving the implementation of robust data governance frameworks across diverse organizational units. Under his guidance, Bluestone has achieved remarkable success, including the deployment of innovative platforms such as a fully governed data mesh business data system with embedded data quality mechanisms, aligning seamlessly with the organization’s commitment to data democratization and excellence.

Ben Vengerovsky is a Data Platform Product Manager at Bluestone. He is passionate about using cloud technology to revolutionize the company’s data infrastructure. With a background in mortgage lending and a deep understanding of AWS services, Ben specializes in designing scalable and efficient data solutions that drive business growth and enhance customer experiences. He thrives on collaborating with cross-functional teams to translate business requirements into innovative technical solutions that empower data-driven decision-making.

Rada Stanic is a Chief Technologist at Amazon Web Services, where she helps ANZ customers across different segments solve their business problems using AWS Cloud technologies. Her special areas of interest are data analytics, machine learning/AI, and application modernization.

Deploying an EMR cluster on AWS Outposts to process data from an on-premises database

Post Syndicated from Macey Neff original https://aws.amazon.com/blogs/compute/deploying-an-emr-cluster-on-aws-outposts-to-process-data-from-an-on-premises-database/

seThis post is written by Eder de Mattos, Sr. Cloud Security Consultant, AWS and Fernando Galves, Outpost Solutions Architect, AWS.

In this post, you will learn how to deploy an Amazon EMR cluster on AWS Outposts and use it to process data from an on-premises database. Many organizations have regulatory, contractual, or corporate policy requirements to process and store data in a specific geographical location. These strict requirements become a challenge for organizations to find flexible solutions that balance regulatory compliance with the agility of cloud services. Amazon EMR is the industry-leading cloud big data platform for data processing, interactive analysis, and machine learning (ML) that uses open-source frameworks. With Amazon EMR on Outposts, you can seamlessly use data analytics solutions to process data locally in your on-premises environment without moving data to the cloud. This post focuses on creating and configuring an Amazon EMR cluster on AWS Outposts rack using Amazon Virtual Private Cloud (Amazon VPC) endpoints and keeping the networking traffic in the on-premises environment.

Architecture overview

In this architecture, there is an Amazon EMR cluster created in an AWS Outposts subnet. The cluster retrieves data from an on-premises PostgreSQL database, employs a PySpark Step for data processing, and then stores the result in a new table within the same database. The following diagram shows this architecture.

Architecture overview

Figure 1 Architecture overview

Networking traffic on premises: The communication between the EMR cluster and the on-premises PostgreSQL database is through the Local Gateway. The core Amazon Elastic Compute Cloud (Amazon EC2) instances of the EMR cluster are associated with Customer-owned IP addresses (CoIP), and each instance has two IP addresses: an internal IP and a CoIP IP. The internal IP is used to communicate locally in the subnet, and the CoIP IP is used to communicate with the on-premises network.

Amazon VPC endpoints: Amazon EMR establishes communication with the VPC through an interface VPC endpoint. This communication is private and conducted entirely within the AWS network instead of connecting over the internet. In this architecture, VPC endpoints are created on a subnet in the AWS Region.

The support files used to create the EMR cluster are stored in an Amazon Simple Storage Service (Amazon S3) bucket. The communication between the VPC and Amazon S3 stays within the AWS network. The following files are stored in this S3 bucket:

  • get-postgresql-driver.sh: This is a bootstrap script to download the PostgreSQL driver to allow the Spark step to communicate to the PostgreSQL database through JDBC. You can download it through the GitHub repository for this Amazon EMR on Outposts blog post.
  • postgresql-42.6.0.jar: PostgreSQL binary JAR file for the JDBC driver.
  • spark-step-example.py: Example of a Step application in PySpark to simulate the connection to the PostgreSQL database.

AWS Systems Manager is configured to manage the EC2 instances that belong to the EMR cluster. It uses an interface VPC endpoint to allow the VPC to communicate privately with the Systems Manager.

The database credentials to connect to the PostgreSQL database are stored in AWS Secrets Manager. Amazon EMR integrates with Secrets Manager. This allows the secret to be stored in the Secrets Manager and be used through its ARN in the cluster configuration. During the creation of the EMR cluster, the secret is accessed privately through an interface VPC endpoint and stored in the variable DBCONNECTION in the EMR cluster.

In this solution, we are creating a small EMR cluster with one primary and one core node. For the correct sizing of your cluster, see Estimating Amazon EMR cluster capacity.

There is additional information to improve the security posture for organizations that use AWS Control Tower landing zone and AWS Organizations. The post Architecting for data residency with AWS Outposts rack and landing zone guardrails is a great place to start.

Prerequisites

Before deploying the EMR cluster on Outposts, you must make sure the following resources are created and configured in your AWS account:

  1. Outposts rack are installed, up and running.
  2. Amazon EC2 key pair is created. To create it, you can follow the instructions in Create a key pair using Amazon EC2 in the Amazon EC2 user guide.

Deploying the EMR cluster on Outposts

1.      Deploy the CloudFormation template to create the infrastructure for the EMR cluster

You can use this AWS CloudFormation template to create the infrastructure for the EMR cluster. To create a stack, you can follow the instructions in Creating a stack on the AWS CloudFormation console in the AWS CloudFormation user guide.

2.      Create an EMR cluster

To launch a cluster with Spark installed using the console:

Step 1: Configure Name and Applications

  1. Sign in to the AWS Management Console, and open the Amazon EMR console.
  2. Under EMR on EC2, in the left navigation pane, select Clusters, and then choose Create Cluster.
  3. On the Create cluster page, enter a unique cluster name for the Name
  4. For Amazon EMR release, choose emr-6.13.0.
  5. In the Application bundle field, select Spark 3.4.1 and Zeppelin 0.10.1, and unselect all the other options.
  6. For the Operating system options, select Amazon Linux release.

Create Cluster Figure 2: Create Cluster

Step 2: Choose Cluster configuration method

  1. Under the Cluster configuration, select Uniform instance groups.
  2. For the Primary and the Core, select the EC2 instance type available in the Outposts rack that is supported by the EMR cluster.
  3. Remove the instance group Task 1 of 1.

Remove the instance group Task 1 of 1

Figure 3: Remove the instance group Task 1 of 1

Step 3: Set up Cluster scaling and provisioning, Networking and Cluster termination

  1. In the Cluster scaling and provisioning option, choose Set cluster size manually and type the value 1 for the Core
  2. On the Networking, select the VPC and the Outposts subnet.
  3. For Cluster termination, choose Manually terminate cluster.

Step 4: Configure the Bootstrap actions

A. In the Bootstrap actions, add an action with the following information:

    1. Name: copy-postgresql-driver.sh
    2. Script location: s3://<bucket-name>/copy-postgresql-driver.sh. Modify the <bucket-name> variable to the bucket name you specified as a parameter in Step 1.

Add bootstrap action

Figure 4: Add bootstrap action

Step 5: Configure Cluster logs and Tags

a. Under Cluster logs, choose Publish cluster-specific logs to Amazon S3 and enter s3://<bucket-name>/logs for the field Amazon S3 location. Modify the <bucket-name> variable to the bucket name you specified as a parameter in Step 1.

Amazon S3 location for cluster logs

Figure 5: Amazon S3 location for cluster logs

b. In Tags, add new tag. You must enter for-use-with-amazon-emr-managed-policies for the Key field and true for Value.

Add tags

Figure 6: Add tags

Step 6: Set up Software settings and Security configuration and EC2 key pair

a. In the Software settings, enter the following configuration replacing the Secret ARN created in Step 1:

[
          {
                    "Classification": "spark-defaults",
                    "Properties": {
                              "spark.driver.extraClassPath": "/opt/spark/postgresql/driver/postgresql-42.6.0.jar",
                              "spark.executor.extraClassPath": "/opt/spark/postgresql/driver/postgresql-42.6.0.jar",
                              "[email protected]":
                                         "arn:aws:secretsmanager:<region>:<account-id>:secret:<secret-name>"
                    }
          }
]

This is an example of the Secret ARN replaced:

Example of the Secret ARN replaced

Figure 7: Example of the Secret ARN replaced

b. For the Security configuration and EC2 key pair, choose the SSH key pair.

Step 7: Choose Identity and Access Management (IAM) roles

a. Under Identity and Access Management (IAM) roles:

    1. In the Amazon EMR service role:
      • Choose AmazonEMR-outposts-cluster-role for the Service role.
    2. In EC2 instance profile for Amazon EMR
      • Choose AmazonEMR-outposts-EC2-role.

Choose the service role and instance profile

Figure 8: Choose the service role and instance profile

Step 8: Create cluster

  1. Choose Create cluster to launch the cluster and open the cluster details page.

Now, the EMR cluster is starting. When your cluster is ready to process tasks, its status changes to Waiting. This means the cluster is up, running, and ready to accept work.

Result of the cluster creation

Figure 9: Result of the cluster creation

3.      Add CoIPs to EMR core nodes

You need to allocate an Elastic IP from the CoIP pool and associate it with the EC2 instance of the EMR core nodes. This is necessary to allow the core nodes to access the on-premises environment. To allocate an Elastic IP, follow the instructions in Allocate an Elastic IP address in Amazon EC2 User Guide for Linux Instances. In Step 5, choose the Customer-owned pool of IPV4 addresses.

Once the CoIP IP is allocated, associate it with each EC2 instance of the EMR core node. Follow the instructions in Associate an Elastic IP address with an instance or network interface in Amazon EC2 User Guide for Linux Instances.

Checking the configuration

  1. Make sure the EC2 instance of the core nodes can ping the IP of the PostgreSQL database.

Connect to the Core node EC2 instance using Systems Manager and ping the IP address of the PostgreSQL database.

Connectivity test

Figure 10: Connectivity test

  1. Make sure the Status of the EMR cluster is Waiting.

: Cluster is ready and waiting

Figure 11: Cluster is ready and waiting

Adding a step to the Amazon EMR cluster

You can use the following Spark application to simulate the data processing from the PostgreSQL database.

spark-step-example.py:

import os
from pyspark.sql import SparkSession

if __name__ == "__main__":

    # ---------------------------------------------------------------------
    # Step 1: Get the database connection information from the EMR cluster 
    #         configuration
    dbconnection = os.environ.get('DBCONNECTION')
    #    Remove brackets
    dbconnection_info = (dbconnection[1:-1]).split(",")
    #    Initialize variables
    dbusername = ''
    dbpassword = ''
    dbhost = ''
    dbport = ''
    dbname = ''
    dburl = ''
    #    Parse the database connection information
    for dbconnection_attribute in dbconnection_info:
        (key_data, key_value) = dbconnection_attribute.split(":", 1)

        if key_data == "username":
            dbusername = key_value
        elif key_data == "password":
            dbpassword = key_value
        elif key_data == 'host':
            dbhost = key_value
        elif key_data == 'port':
            dbport = key_value
        elif key_data == 'dbname':
            dbname = key_value

    dburl = "jdbc:postgresql://" + dbhost + ":" + dbport + "/" + dbname

    # ---------------------------------------------------------------------
    # Step 2: Connect to the PostgreSQL database and select data from the 
    #         pg_catalog.pg_tables table
    spark_db = SparkSession.builder.config("spark.driver.extraClassPath",                                          
               "/opt/spark/postgresql/driver/postgresql-42.6.0.jar") \
               .appName("Connecting to PostgreSQL") \
               .getOrCreate()

    #    Connect to the database
    data_db = spark_db.read.format("jdbc") \
        .option("url", dburl) \
        .option("driver", "org.postgresql.Driver") \
        .option("query", "select count(*) from pg_catalog.pg_tables") \
        .option("user", dbusername) \
        .option("password", dbpassword) \
        .load()

    # ---------------------------------------------------------------------
    # Step 3: To do the data processing
    #
    #    TO-DO

    # ---------------------------------------------------------------------
    # Step 4: Save the data into the new table in the PostgreSQL database
    #
    data_db.write \
        .format("jdbc") \
        .option("url", dburl) \
        .option("dbtable", "results_proc") \
        .option("user", dbusername) \
        .option("password", dbpassword) \
        .save()

    # ---------------------------------------------------------------------
    # Step 5: Close the Spark session
    #
    spark_db.stop()
    # ---------------------------------------------------------------------

You must upload the file spark-step-example.py to the bucket created in Step 1 of this post before submitting the Spark application to the EMR cluster. You can get the file at this GitHub repository for a Spark step example.

Submitting the Spark application step using the Console

To submit the Spark application to the EMR cluster, follow the instructions in To submit a Spark step using the console in the Amazon EMR Release Guide. In Step 4 of this Amazon EMR guide, provide the following parameters to add a step:

  1. choose Cluster mode for the Deploy mode
  2. type a name for your step (such as Step 1)
  3. for the Application location, choose s3://<bucket-name>/spark-step-example.py and replace the <bucket-name> variable to the bucket name you specified as a parameter in Step 1
  4. leave the Spark-submit options field blank

Add a step to the EMR cluster

Figure 12: Add a step to the EMR cluster

The Step is created with the Status Pending. When it is done, the Status changes to Completed.

Step executed successfully

Figure 13: Step executed successfully

Cleaning up

When the EMR cluster is no longer needed, you can delete the resources created to avoid incurring future costs by following these steps:

  1. Follow the instructions in Terminate a cluster with the console in the Amazon EMR Documentation Management Guide. Remember to turn off the Termination protection.
  2. Dissociate and release the CoIP IPs allocated to the EC2 instances of the EMR core nodes.
  3. Delete the stack in the AWS CloudFormation using the instructions in Deleting a Stack on the AWS CloudFormation console in the AWS CloudFormation User Guide

Conclusion

Amazon EMR on Outposts allows you to use the managed services offered by AWS to perform big data processing close to your data that needs to remain on-premises. This architecture eliminates the need to transfer on-premises data to the cloud, providing a robust solution for organizations with regulatory, contractual, or corporate policy requirements to store and process data in a specific location. With the EMR cluster accessing the on-premises database directly through local networking, you can expect faster and more efficient data processing without compromising on compliance or agility. To learn more, visit the Amazon EMR on AWS Outposts product overview page.

Build an analytics pipeline that is resilient to schema changes using Amazon Redshift Spectrum

Post Syndicated from Swapna Bandla original https://aws.amazon.com/blogs/big-data/build-an-analytics-pipeline-that-is-resilient-to-schema-changes-using-amazon-redshift-spectrum/

You can ingest and integrate data from multiple Internet of Things (IoT) sensors to get insights. However, you may have to integrate data from multiple IoT sensor devices to derive analytics like equipment health information from all the sensors based on common data elements. Each of these sensor devices could be transmitting data with unique schemas and different attributes.

You can ingest data from all your IoT sensors to a central location on Amazon Simple Storage Service (Amazon S3). Schema evolution is a feature where a database table’s schema can evolve to accommodate for changes in the attributes of the files getting ingested. With the schema evolution functionality available in AWS Glue, Amazon Redshift Spectrum can automatically handle schema changes when new attributes get added or existing attributes get dropped. This is achieved with an AWS Glue crawler by reading schema changes based on the S3 file structures. The crawler creates a hybrid schema that works with both old and new datasets. You can read from all the ingested data files at a specified Amazon S3 location with different schemas through a single Amazon Redshift Spectrum table by referring to the AWS Glue metadata catalog.

In this post, we demonstrate how to use the AWS Glue schema evolution feature to read from multiple JSON formatted files with various schemas that are stored in a single Amazon S3 location. We also show how to query this data in Amazon S3 with Redshift Spectrum without redefining the schema or loading the data into Redshift tables.

Solution overview

The solution consists of the following steps:

  • Create an Amazon Data Firehose delivery stream with Amazon S3 as its destination.
  • Generate sample stream data from the Amazon Kinesis Data Generator (KDG) with the Firehose delivery stream as the destination.
  • Upload the initial data files to the Amazon S3 location.
  • Create and run an AWS Glue crawler to populate the Data Catalog with external table definition by reading the data files from Amazon S3.
  • Create the external schema called iotdb_ext in Amazon Redshift and query the Data Catalog table.
  • Query the external table from Redshift Spectrum to read data from the initial schema.
  • Add additional data elements to the KDG template and send the data to the Firehose delivery stream.
  • Validate that the additional data files are loaded to Amazon S3 with additional data elements.
  • Run an AWS Glue crawler to update the external table definitions.
  • Query the external table from Redshift Spectrum again to read the combined dataset from two different schemas.
  • Delete a data element from the template and send the data to the Firehose delivery stream.
  • Validate that the additional data files are loaded to Amazon S3 with one less data element.
  • Run an AWS Glue crawler to update the external table definitions.
  • Query the external table from Redshift Spectrum to read the combined dataset from three different schemas.

This solution is depicted in the following architecture diagram.

Prerequisites

This solution requires the following prerequisites:

Implement the solution

Complete the following steps to build the solution:

  • On the Kinesis console, create a Firehose delivery stream with the following parameters:
    • For Source, choose Direct PUT.
    • For Destination, choose Amazon S3.
    • For S3 bucket, enter your S3 bucket.
    • For Dynamic partitioning, select Enabled.

    • Add the following dynamic partitioning keys:
      • Key year with expression .connectionTime | strptime("%d/%m/%Y:%H:%M:%S") | strftime("%Y")
      • Key month with expression .connectionTime | strptime("%d/%m/%Y:%H:%M:%S") | strftime("%m")
      • Key day with expression .connectionTime | strptime("%d/%m/%Y:%H:%M:%S") | strftime("%d")
      • Key hour with expression .connectionTime | strptime("%d/%m/%Y:%H:%M:%S") | strftime("%H")
    • For S3 bucket prefix, enter year=!{partitionKeyFromQuery:year}/month=!{partitionKeyFromQuery:month}/day=!{partitionKeyFromQuery:day}/hour=!{partitionKeyFromQuery:hour}/

You can review your delivery stream details on the Kinesis Data Firehose console.

Your delivery stream configuration details should be similar to the following screenshot.

  • Generate sample stream data from the KDG with the Firehose delivery stream as the destination with the following template:
    {
    "sensorId": {{random.number(999999999)}},
    "sensorType": "{{random.arrayElement( ["Thermostat","SmartWaterHeater","HVACTemperatureSensor","WaterPurifier"] )}}",
    "internetIP": "{{internet.ip}}",
    "recordedDate": "{{date.past}}",
    "connectionTime": "{{date.now("DD/MM/YYYY:HH:mm:ss")}}",
    "currentTemperature": "{{random.number({"min":10,"max":150})}}",
    "serviceContract": "{{random.arrayElement( ["ActivePartsService","Inactive","SCIP","ActiveServiceOnly"] )}}",
    "status": "{{random.arrayElement( ["OK","FAIL","WARN"] )}}" }

  • On the Amazon S3 console, validate that the initial set of files got loaded into the S3 bucket.
  • On the AWS Glue console, create and run an AWS Glue Crawler with the data source as the S3 bucket that you used in the earlier step.

When the crawler is complete, you can validate that the table was created on the AWS Glue console.

  • In Amazon Redshift Query Editor v2, connect to the Redshift instance and create an external schema pointing to the AWS Glue Data Catalog database. In the following code, use the Amazon Resource Name (ARN) for the IAM role that your cluster uses for authentication and authorization. As a minimum, the IAM role must have permission to perform a LIST operation on the S3 bucket to be accessed and a GET operation on the S3 objects the bucket contains.
    CREATE external SCHEMA iotdb_ext FROM data catalog DATABASE 'iotdb' IAM_ROLE 'arn:aws:iam::&lt;AWS account-id&gt;:role/&lt;role-name&gt;' 
    CREATE external DATABASE if not exists;

  • Query the table defined in the Data Catalog from the Redshift external schema and note the columns defined in the KDG template:
    select * from iotdb_ext.sensorsiotschemaevol;

  • Add an additional data element in the KDG template and send the data to the Firehose delivery stream:
    "serviceRecommendedDate": "{{date.future}}",

  • Validate that the new data was added to the S3 bucket.
  • Rerun the AWS Glue crawler.
  • Query the table again from the Redshift external schema and note the newly populated dataset vs. the previous dataset for the servicerecommendeddate column:
    select * from iotdb_ext.sensorsiotschemaevol where servicerecommendeddate is not null;

    select * from iotdb_ext.sensorsiotschemaevol where servicerecommendeddate is null;

  • Delete the data element status from the KDG template and resend the data to the Firehose delivery stream.
  • Validate that new data was added to the S3 bucket.
  • Rerun the AWS Glue crawler.
  • Query the table again from the Redshift external schema and note the newly populated dataset vs. previous datasets with values for the status column:
    select * from iotdb_ext.sensorsiotschemaevol order by connectiontime desc;

    select * from iotdb_ext.sensorsiotschemaevol order by connectiontime;

Troubleshooting

If data is not loaded into Amazon S3 after sending it from the KDG template to the Firehose delivery stream, refresh and make sure you are logged in to the KDG.

Clean up

You may want to delete your S3 data and Redshift cluster if you are not planning to use it further to avoid unnecessary cost to your AWS account.

Conclusion

With the emergence of requirements for predictive and prescriptive analytics based on big data, there is a growing demand for data solutions that integrate data from multiple heterogeneous data models with minimal effort. In this post, we showcased how you can derive metrics from common atomic data elements from different data sources with unique schemas. You can store data from all the data sources in a common S3 location, either in the same folder or multiple subfolders by each data source. You can define and schedule an AWS Glue crawler to run at the same frequency as the data refresh requirements for your data consumption. With this solution, you can create a Redshift Spectrum table to read from an S3 location with varying file structures using the AWS Glue Data Catalog and schema evolution functionality.

If you have any questions or suggestions, please leave your feedback in the comment section. If you need further assistance with building analytics solutions with data from various IoT sensors, please contact your AWS account team.


About the Authors

Swapna Bandla is a Senior Solutions Architect in the AWS Analytics Specialist SA Team. Swapna has a passion towards understanding customers data and analytics needs and empowering them to develop cloud-based well-architected solutions. Outside of work, she enjoys spending time with her family.

Indira Balakrishnan is a Principal Solutions Architect in the AWS Analytics Specialist SA Team. She is passionate about helping customers build cloud-based analytics solutions to solve their business problems using data-driven decisions. Outside of work, she volunteers at her kids’ activities and spends time with her family.

How Gupshup built their multi-tenant messaging analytics platform on Amazon Redshift

Post Syndicated from Gaurav Singh original https://aws.amazon.com/blogs/big-data/how-gupshup-built-their-multi-tenant-messaging-analytics-platform-on-amazon-redshift/

Gupshup is a leading conversational messaging platform, powering over 10 billion messages per month. Across verticals, thousands of large and small businesses in emerging markets use Gupshup to build conversational experiences across marketing, sales, and support. Gupshup’s carrier-grade platform provides a single messaging API for 30+ channels, a rich conversational experience-building tool kit for any use case, and a network of emerging market partnerships across messaging channels, device manufacturers, ISVs, and operators.

Objective

Gupshup wanted to build a messaging analytics platform that provided:

  • Build a platform to get detailed insights, data, and reports about WhatsApp/SMS campaigns and track the success of every text message sent by the end customers.
  • Easily gain insight into trends, delivery rates, and speed.
  • Save time and eliminate unnecessary processes.

About Redshift and some relevant features for the use case

Amazon Redshift is a fully managed, petabyte-scale, massively parallel data warehouse that offers simple operations and high performance. It makes it fast, simple, and cost-effective to analyze all your data using standard SQL and your existing business intelligence (BI) tools. Amazon Redshift extends beyond traditional data warehousing workloads, by integrating with the AWS cloud with features such as querying the data lake with Spectrum, semistructured data ingestion and querying with PartiQL, streaming ingestion from Amazon Kinesis and Amazon MSK, Redshift ML, federated queries to Amazon Aurora and Amazon RDS operational databases, and federated materialized views.

In this use case, Gupshup is heavily relying on Amazon Redshift as their data warehouse to process billions of streaming events every month, performing intricate data-pipeline-like operations on such data and incrementally maintaining a hierarchy of aggregations on top of raw data. They have been enjoying the flexibility and convenience that Amazon Redshift has brought to their business. By leveraging the Amazon Redshift materialized views, Gupshup has been able to dramatically improve query performance on recurring and predictable workloads, such as dashboard queries from Business Intelligence (BI) tools. Additionally, extract, load, and transform (ELT) data processing is sped up and made easier. To store commonly used pre-computations and seamlessly utilize them to reduce latency on ensuing analytical queries, Redshift materialized views feature incremental refresh capability which enables Gupshup to be more agile while using less code. Without writing complicated code for incremental updates, they were able to deliver data latency of roughly 15 minutes for some use cases.

Overall architecture and implementation details with Redshift Materialized views

Gupshup uses a CDC mechanism to extract data from their source systems and persist it in S3 in order to meet these needs. A series of materialized view refreshes are used to calculate metrics, after which the incremental data from S3 is loaded into Redshift. This compiled data is then imported into Aurora PostgreSQL Serverless for operational reporting. The ability of Redshift to incrementally refresh materialized views, enabling it to process massive amounts of data progressively, the capacity for scaling, which utilizes concurrency and elastic resizing for vertical scaling, as well as the RA3 architecture, delivers the separation of storage and compute to scale one without worrying about the other, led Gupshup to make this choice. Gupshup chose Aurora PostgreSQL as the operational reporting layer due to its anticipated increase in concurrency and cost-effectiveness for queries that retrieve only precalculated metrics.

Incremental analytics is the main reason for Gupshup to use Redshift. The diagram shows a simplified version of a typical data processing pipeline where data comes via multiple streams. The streams need to be joined together, then enriched by joining with master data tables. This is followed by series of joins and aggregations. All this needs to be performed in incremental manner, providing 30 minutes of latency.

Gupshup uses Redshift’s incremental materialized view feature to accomplish this. All of the join, enrich, and aggregation statements are written using sql statements. The stream-to-stream joins are performed by ingesting both streams in a table sorted by the key fields. Then an incremental MV aggregates data by the key fields. Redshift then automatically takes care of keeping the MVs refreshed incrementally with incoming data. The incremental view maintenance feature works even for hierarchical aggregations with MVs based on other MVs. This allows Gupshup to build an entire processing pipeline incrementally. It has actually helped Gupshup reduce cycle time during the POC and prototyping phases. Moreover, no separate effort is required to process historical data versus live streaming data.

Apart from incremental analytics, Redshift simplifies a lot of operational aspects. E.g., use the snapshot-restore feature to quickly create a green experimental cluster from an existing blue serving cluster. In case the processing logic changes (which happens quite often in prototyping stages), they need to reprocess all historical data. Gupshup uses Redshift’s elastic scaling feature to temporarily scale the cluster up and then scale it down when done. They also use Redshift to directly power some of their high-concurrency dashboards. For such cases, the concurrency scaling feature of Redshift really comes in handy. Apart from this, they have a lot of in-house data analysts who need to run ad hoc queries on live production data. They use the workload management features of Redshift to make sure their analysts can run queries while ensuring that production queries do not get affected.

Benefits realized with Amazon Redshift

  • On-Demand Scaling
  • Ease of use and maintenance with less code
  • Performance benefits with an incremental MV refresh

Conclusion

Gupshup, an enterprise messaging company, needed a scalable data warehouse solution to analyze billions of events generated each month. They chose Amazon Redshift to build a cloud data warehouse that could handle this scale of data and enable fast analytics.

By combining Redshift’s scalability, snapshots, workload management, and low-operational approach, Gupshup provides data-driven insights in less than 15 minutes analytics refresh rate.

Overall, Redshift’s scalability, performance, ease of management, and cost effectiveness have allowed Gupshup to gain data-driven insights from billions of events in near real-time. A scalable and robust data foundation is enabling Gupshup to build innovative messaging products and a competitive advantage.

The incremental refresh of materialized views feature of Redshift allowed us to be more agile with less code:

  • For some use cases, we are able to provide data latency of about 15 minutes, without having to write complex code for incremental updates.
  • The incremental refresh feature is a main differentiating factor that gives Redshift an edge over some of its competitors. I request that you keep improving and enhancing it.

“The incremental refresh of materialized views feature of Redshift allowed us to be more agile with less code”

Pankaj Bisen, Director of AI and Analytics at Gupshup.


About the Authors

Shabi Abbas Sayed is a Senior Technical Account Manager at AWS. He is passionate about building scalable data warehouses and big data solutions working closely with the customers. He works with large ISVs customers, in helping them build and operate secure, resilient, scalable, and high-performance SaaS applications in the cloud.

Gaurav Singh is a Senior Solutions Architect at AWS, specializing in AI/ML and Generative AI. Based in Pune, India, he focuses on helping customers build, deploy, and migrate ML production workloads to SageMaker at scale. In his spare time, Gaurav loves to explore nature, read, and run.