
Gain insights from historical location data using Amazon Location Service and AWS analytics services

Post Syndicated from Alan Peaty original https://aws.amazon.com/blogs/big-data/gain-insights-from-historical-location-data-using-amazon-location-service-and-aws-analytics-services/

Many organizations around the world rely on the use of physical assets, such as vehicles, to deliver a service to their end-customers. By tracking these assets in real time and storing the results, asset owners can derive valuable insights on how their assets are being used to continuously deliver business improvements and plan for future changes. For example, a delivery company operating a fleet of vehicles may need to ascertain the impact from local policy changes outside of their control, such as the announced expansion of an Ultra-Low Emission Zone (ULEZ). By combining historical vehicle location data with information from other sources, the company can devise empirical approaches for better decision-making. For example, the company’s procurement team can use this information to make decisions about which vehicles to prioritize for replacement before policy changes go into effect.

Developers can use the support in Amazon Location Service for publishing device position updates to Amazon EventBridge to build a near-real-time data pipeline that stores locations of tracked assets in Amazon Simple Storage Service (Amazon S3). Additionally, you can use AWS Lambda to enrich incoming location data with data from other sources, such as an Amazon DynamoDB table containing vehicle maintenance details. Then a data analyst can use the geospatial querying capabilities of Amazon Athena to gain insights, such as the number of days their vehicles have operated in the proposed boundaries of an expanded ULEZ. Because vehicles that do not meet ULEZ emissions standards are subjected to a daily charge to operate within the zone, you can use the location data, along with maintenance data such as age of the vehicle, current mileage, and current emissions standards to estimate the amount the company would have to spend on daily fees.

This post shows how you can use Amazon Location, EventBridge, Lambda, Amazon Data Firehose, and Amazon S3 to build a location-aware data pipeline, and use this data to drive meaningful insights using AWS Glue and Athena.

Overview of solution

This is a fully serverless solution for location-based asset management. The solution consists of the following interfaces:

  • IoT or mobile application – A mobile application or an Internet of Things (IoT) device allows the tracking of a company vehicle while it is in use and transmits its current location securely to the data ingestion layer in AWS. The ingestion approach is not in scope of this post. Instead, a Lambda function in our solution simulates sample vehicle journeys and directly updates Amazon Location tracker objects with randomized locations.
  • Data analytics – Business analysts gather operational insights from multiple data sources, including the location data collected from the vehicles. Data analysts are looking for answers to questions such as, “How long did a given vehicle historically spend inside a proposed zone, and how much would the fees have cost had the policy been in place over the past 12 months?”

The following diagram illustrates the solution architecture.
Architecture diagram

The workflow consists of the following key steps:

  1. The tracking functionality of Amazon Location is used to track the vehicle. Using EventBridge integration, filtered positional updates are published to an EventBridge event bus. This solution uses distance-based filtering to reduce costs and jitter. Distance-based filtering ignores location updates in which devices have moved less than 30 meters (98.4 feet).
  2. Amazon Location device position events arrive on the EventBridge default bus with source: ["aws.geo"] and detail-type: ["Location Device Position Event"]. One rule is created to forward these events to two downstream targets: a Lambda function, and a Firehose delivery stream.
  3. Two patterns, one for each target, are described in this post to demonstrate different approaches to committing the data to an S3 bucket:
    1. Lambda function – The first approach uses a Lambda function to demonstrate how you can use code in the data pipeline to directly transform the incoming location data. You can modify the Lambda function to fetch additional vehicle information from a separate data store (for example, a DynamoDB table or a Customer Relationship Management system) to enrich the data, before storing the results in an S3 bucket. In this model, the Lambda function is invoked for each incoming event.
    2. Firehose delivery stream – The second approach uses a Firehose delivery stream to buffer and batch the incoming positional updates, before storing them in an S3 bucket without modification. This method uses GZIP compression to optimize storage consumption and query performance. You can also use the data transformation feature of Data Firehose to invoke a Lambda function to perform data transformation in batches.
  4. AWS Glue crawls both S3 bucket paths, populates the AWS Glue database tables based on the inferred schemas, and makes the data available to other analytics applications through the AWS Glue Data Catalog.
  5. Athena is used to run geospatial queries on the location data stored in the S3 buckets. The Data Catalog provides metadata that allows analytics applications using Athena to find, read, and process the location data stored in Amazon S3.
  6. This solution includes a Lambda function that continuously updates the Amazon Location tracker with simulated location data from fictitious journeys. The Lambda function is triggered at regular intervals using a scheduled EventBridge rule.

You can test this solution yourself using the AWS Samples GitHub repository. The repository contains the AWS Serverless Application Model (AWS SAM) template and Lambda code required to try out this solution. Refer to the instructions in the README file for steps on how to provision and decommission this solution.

Visual layouts in some screenshots in this post may look different from those in your AWS Management Console.

Data generation

In this section, we discuss the steps to manually or automatically generate journey data.

Manually generate journey data

You can manually update device positions using the AWS Command Line Interface (AWS CLI) command aws location batch-update-device-position. Replace the tracker-name, device-id, Position, and SampleTime values with your own, and make sure that successive updates are more than 30 meters in distance apart to place an event on the default EventBridge event bus:

aws location batch-update-device-position --tracker-name <tracker-name> --updates "[{\"DeviceId\": \"<device-id>\", \"Position\": [<longitude>, <latitude>], \"SampleTime\": \"<YYYY-MM-DDThh:mm:ssZ>\"}]"
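
The same call is available in the AWS SDKs if you prefer to script the updates. The following is a minimal boto3 sketch (the tracker name, device ID, and coordinates are placeholders to replace with your own values); as with the CLI, successive updates need to be more than 30 meters apart to generate an event on the default event bus.

import datetime

import boto3

location = boto3.client("location")

# Placeholder tracker and device names; replace with your own values.
location.batch_update_device_position(
    TrackerName="<tracker-name>",
    Updates=[
        {
            "DeviceId": "<device-id>",
            "Position": [-0.1278, 51.5074],  # [longitude, latitude]
            "SampleTime": datetime.datetime.now(datetime.timezone.utc),
        }
    ],
)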

Automatically generate journey data using the simulator

The provided AWS CloudFormation template deploys an EventBridge scheduled rule and an accompanying Lambda function that simulates tracker updates from vehicles. This rule is enabled by default, and runs at a frequency specified by the SimulationIntervalMinutes CloudFormation parameter. The data generation Lambda function updates the Amazon Location tracker with a randomized position offset from the vehicles’ base locations.

Vehicle names and base locations are stored in the vehicles.json file. A vehicle’s starting position is reset each day, and base locations have been chosen to give them the ability to drift in and out of the ULEZ on a given day to provide a realistic journey simulation.

You can disable the rule temporarily by navigating to the scheduled rule details on the EventBridge console. Alternatively, change the parameter State: ENABLED to State: DISABLED for the scheduled rule resource GenerateDevicePositionsScheduleRule in the template.yml file. Rebuild and re-deploy the AWS SAM template for this change to take effect.

Location data pipeline approaches

The configurations outlined in this section are deployed automatically by the provided AWS SAM template. The information in this section is provided to describe the pertinent parts of the solution.

Amazon Location device position events

Amazon Location sends device position update events to EventBridge in the following format:

{
    "version":"0",
    "id":"<event-id>",
    "detail-type":"Location Device Position Event",
    "source":"aws.geo",
    "account":"<account-number>",
    "time":"<YYYY-MM-DDThh:mm:ssZ>",
    "region":"<region>",
    "resources":[
        "arn:aws:geo:<region>:<account-number>:tracker/<tracker-name>"
    ],
    "detail":{
        "EventType":"UPDATE",
        "TrackerName":"<tracker-name>",
        "DeviceId":"<device-id>",
        "SampleTime":"<YYYY-MM-DDThh:mm:ssZ>",
        "ReceivedTime":"<YYYY-MM-DDThh:mm:ss.sssZ>",
        "Position":[
            <longitude>, 
            <latitude>
	]
    }
}

You can optionally specify an input transformation to modify the format and contents of the device position event data before it reaches the target.

Data enrichment using Lambda

Data enrichment in this pattern is facilitated through the invocation of a Lambda function. In this example, we call this function ProcessDevicePosition, and use a Python runtime. A custom transformation is applied in the EventBridge target definition to receive the event data in the following format:

{
    "EventType":<EventType>,
    "TrackerName":<TrackerName>,
    "DeviceId":<DeviceId>,
    "SampleTime":<SampleTime>,
    "ReceivedTime":<ReceivedTime>,
    "Position":[<Longitude>,<Latitude>]
}

You could apply additional transformations, such as the refactoring of Latitude and Longitude data into separate key-value pairs if this is required by the downstream business logic processing the events.

The following code demonstrates the Python application logic that is run by the ProcessDevicePosition Lambda function. Error handling has been skipped in this code snippet for brevity. The full code is available in the GitHub repo.

import json
import os
import uuid
import boto3

# Import environment variables from Lambda function.
bucket_name = os.environ["S3_BUCKET_NAME"]
bucket_prefix = os.environ["S3_BUCKET_LAMBDA_PREFIX"]

s3 = boto3.client("s3")

def lambda_handler(event, context):
    key = "%s/%s/%s-%s.json" % (bucket_prefix,
                                event["DeviceId"],
                                event["SampleTime"],
                                str(uuid.uuid4())
    body = json.dumps(event, separators=(",", ":"))
    body_encoded = body.encode("utf-8")
    s3.put_object(Bucket=bucket_name, Key=key, Body=body_encoded)
    return {
        "statusCode": 200,
        "body": "success"
    }

The preceding code creates an S3 object for each device position event received by EventBridge. The code uses the DeviceId as a prefix to write the objects to the bucket.

You can add additional logic to the preceding Lambda function code to enrich the event data using other sources. The example in the GitHub repo demonstrates enriching the event with data from a DynamoDB vehicle maintenance table.
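
The repository contains the full enrichment code; as a rough sketch of the idea (the environment variable name and table layout below are assumptions for illustration), the lookup might look like the following, with the returned attributes merged into the event before it is written to Amazon S3.

import os
from decimal import Decimal

import boto3

dynamodb = boto3.resource("dynamodb")
# Assumed environment variable holding the maintenance table name.
maintenance_table = dynamodb.Table(os.environ["VEHICLE_MAINTENANCE_TABLE"])

def enrich_event(event):
    # Look up vehicle maintenance details keyed by DeviceId and merge them into the event.
    response = maintenance_table.get_item(Key={"DeviceId": event["DeviceId"]})
    item = response.get("Item", {})
    for field in ("MeetsEmissionStandards", "Mileage", "PurchaseDate"):
        if field in item:
            value = item[field]
            # DynamoDB returns numbers as Decimal; convert before JSON serialization.
            event[field] = float(value) if isinstance(value, Decimal) else value
    return event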

In addition to the prerequisite AWS Identity and Access Management (IAM) permissions provided by the AWS managed policy AWSLambdaBasicExecutionRole, the ProcessDevicePosition function requires permissions to perform the S3 put_object action and any other actions required by the data enrichment logic. The IAM permissions required by the solution are documented in the template.yml file; the S3 permissions are as follows:

{
    "Version":"2012-10-17",
    "Statement":[
        {
            "Action":[
                "s3:ListBucket"
            ],
            "Resource":[
                "arn:aws:s3:::<S3_BUCKET_NAME>"
            ],
            "Effect":"Allow"
        },
        {
            "Action":[
                "s3:PutObject"
            ],
            "Resource":[
                "arn:aws:s3:::<S3_BUCKET_NAME>/<S3_BUCKET_LAMBDA_PREFIX>/*"
            ],
            "Effect":"Allow"
        }
    ]
}

Data pipeline using Amazon Data Firehose

Complete the following steps to create your Firehose delivery stream:

  1. On the Amazon Data Firehose console, choose Firehose streams in the navigation pane.
  2. Choose Create Firehose stream.
  3. For Source, choose Direct PUT.
  4. For Destination, choose Amazon S3.
  5. For Firehose stream name, enter a name (for this post, ProcessDevicePositionFirehose).
    Create Firehose stream
  6. Configure the destination settings with details about the S3 bucket in which the location data is stored, along with the partitioning strategy:
    1. Use <S3_BUCKET_NAME> and <S3_BUCKET_FIREHOSE_PREFIX> to determine the bucket and object prefixes.
    2. Use DeviceId as an additional prefix to write the objects to the bucket.
  7. Enable Dynamic partitioning and New line delimiter to make sure partitioning is automatic based on DeviceId, and that new line delimiters are added between records in objects that are delivered to Amazon S3.

These settings are required for AWS Glue to crawl the data later, and for Athena to recognize individual records.
Destination settings for Firehose stream
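
If you prefer to create the delivery stream programmatically rather than through the console, the following boto3 sketch shows the equivalent configuration. The role ARN, bucket name, and prefixes are placeholders, and the processor parameter names should be verified against the Amazon Data Firehose API reference for your SDK version.

import boto3

firehose = boto3.client("firehose")

firehose.create_delivery_stream(
    DeliveryStreamName="ProcessDevicePositionFirehose",
    DeliveryStreamType="DirectPut",
    ExtendedS3DestinationConfiguration={
        "RoleARN": "arn:aws:iam::<account-id>:role/<firehose-delivery-role>",
        "BucketARN": "arn:aws:s3:::<S3_BUCKET_NAME>",
        # Dynamic partitioning on DeviceId, extracted from each JSON record.
        "Prefix": "<S3_BUCKET_FIREHOSE_PREFIX>/!{partitionKeyFromQuery:DeviceId}/",
        "ErrorOutputPrefix": "<S3_BUCKET_FIREHOSE_PREFIX>-errors/",
        "CompressionFormat": "GZIP",
        "DynamicPartitioningConfiguration": {"Enabled": True},
        "ProcessingConfiguration": {
            "Enabled": True,
            "Processors": [
                {
                    # Extract DeviceId from the record to use as the partition key.
                    "Type": "MetadataExtraction",
                    "Parameters": [
                        {"ParameterName": "MetadataExtractionQuery",
                         "ParameterValue": "{DeviceId: .DeviceId}"},
                        {"ParameterName": "JsonParsingEngine",
                         "ParameterValue": "JQ-1.6"},
                    ],
                },
                # Append a new line delimiter between records.
                {"Type": "AppendDelimiterToRecord"},
            ],
        },
    },
)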

Create an EventBridge rule and attach targets

The EventBridge rule ProcessDevicePosition defines two targets: the ProcessDevicePosition Lambda function, and the ProcessDevicePositionFirehose delivery stream. Complete the following steps to create the rule and attach targets:

  1. On the EventBridge console, create a new rule.
  2. For Name, enter a name (for this post, ProcessDevicePosition).
  3. For Event bus, choose default.
  4. For Rule type, select Rule with an event pattern.
    EventBridge rule detail
  5. For Event source, select AWS events or EventBridge partner events.
    EventBridge event source
  6. For Method, select Use pattern form.
  7. In the Event pattern section, specify AWS services as the source, Amazon Location Service as the specific service, and Location Device Position Event as the event type.
    EventBridge creation method
  8. For Target 1, attach the ProcessDevicePosition Lambda function as a target.
    EventBridge target 1
  9. We use Input transformer to customize the event that is committed to the S3 bucket.
    EventBridge target 1 transformer
  10. Configure Input paths map and Input template to organize the payload into the desired format.
    1. The following code is the input paths map:
      {
          "EventType": "$.detail.EventType",
          "TrackerName": "$.detail.TrackerName",
          "DeviceId": "$.detail.DeviceId",
          "SampleTime": "$.detail.SampleTime",
          "ReceivedTime": "$.detail.ReceivedTime",
          "Longitude": "$.detail.Position[0]",
          "Latitude": "$.detail.Position[1]"
      }

    2. The following code is the input template:
      {
          "EventType":<EventType>,
          "TrackerName":<TrackerName>,
          "DeviceId":<DeviceId>,
          "SampleTime":<SampleTime>,
          "ReceivedTime":<ReceivedTime>,
          "Position":[<Longitude>, <Latitude>]
      }

  11. For Target 2, choose the ProcessDevicePositionFirehose delivery stream as a target.
    EventBridge target 2

This target requires an IAM role that allows one or multiple records to be written to the Firehose delivery stream:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Action": [
                "firehose:PutRecord",
                "firehose:PutRecords"
            ],
            "Resource": [
                "arn:aws:firehose:<region>:<account-id>:deliverystream/<delivery-stream-name>"
            ],
            "Effect": "Allow"
        }
    ]
}
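
If you script this configuration instead of using the console, a boto3 sketch of the rule and its two targets could look like the following. The ARNs are placeholders, and the Lambda target additionally needs a resource-based permission that allows events.amazonaws.com to invoke the function.

import json

import boto3

events = boto3.client("events")

# Event pattern matching Amazon Location device position events.
event_pattern = {
    "source": ["aws.geo"],
    "detail-type": ["Location Device Position Event"],
}

events.put_rule(
    Name="ProcessDevicePosition",
    EventBusName="default",
    EventPattern=json.dumps(event_pattern),
    State="ENABLED",
)

events.put_targets(
    Rule="ProcessDevicePosition",
    EventBusName="default",
    Targets=[
        {
            # Target 1: Lambda function, using the input transformer shown earlier.
            "Id": "ProcessDevicePositionLambda",
            "Arn": "arn:aws:lambda:<region>:<account-id>:function:ProcessDevicePosition",
            "InputTransformer": {
                "InputPathsMap": {
                    "EventType": "$.detail.EventType",
                    "TrackerName": "$.detail.TrackerName",
                    "DeviceId": "$.detail.DeviceId",
                    "SampleTime": "$.detail.SampleTime",
                    "ReceivedTime": "$.detail.ReceivedTime",
                    "Longitude": "$.detail.Position[0]",
                    "Latitude": "$.detail.Position[1]",
                },
                "InputTemplate": (
                    '{"EventType":<EventType>,"TrackerName":<TrackerName>,'
                    '"DeviceId":<DeviceId>,"SampleTime":<SampleTime>,'
                    '"ReceivedTime":<ReceivedTime>,"Position":[<Longitude>,<Latitude>]}'
                ),
            },
        },
        {
            # Target 2: Firehose delivery stream, using the IAM role described above.
            "Id": "ProcessDevicePositionFirehose",
            "Arn": "arn:aws:firehose:<region>:<account-id>:deliverystream/ProcessDevicePositionFirehose",
            "RoleArn": "arn:aws:iam::<account-id>:role/<eventbridge-to-firehose-role>",
        },
    ],
)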

Crawl and catalog the data using AWS Glue

After sufficient data has been generated, complete the following steps:

  1. On the AWS Glue console, choose Crawlers in the navigation pane.
  2. Select the crawlers that have been created, location-analytics-glue-crawler-lambda and location-analytics-glue-crawler-firehose.
  3. Choose Run.

The crawlers automatically classify the data as JSON, group the records into tables and partitions, and commit associated metadata to the AWS Glue Data Catalog.
Crawlers

  1. When the Last run statuses of both crawlers show as Succeeded, confirm that two tables (lambda and firehose) have been created on the Tables page.

The solution partitions the incoming location data based on the deviceid field. Therefore, as long as there are no new devices or schema changes, the crawlers don’t need to run again. However, if new devices are added, or a different field is used for partitioning, the crawlers need to run again.
Tables
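
If you prefer to trigger the crawlers programmatically (for example, after onboarding new devices), a minimal boto3 sketch follows. The crawler names match the ones deployed by the AWS SAM template.

import time

import boto3

glue = boto3.client("glue")

crawlers = [
    "location-analytics-glue-crawler-lambda",
    "location-analytics-glue-crawler-firehose",
]

for name in crawlers:
    glue.start_crawler(Name=name)

# Wait until both crawlers have returned to the READY state.
while any(glue.get_crawler(Name=name)["Crawler"]["State"] != "READY"
          for name in crawlers):
    time.sleep(30)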

You’re now ready to query the tables using Athena.

Query the data using Athena

Athena is a serverless, interactive analytics service built to analyze unstructured, semi-structured, and structured data where it is hosted. If this is your first time using the Athena console, follow the instructions to set up a query result location in Amazon S3. To query the data with Athena, complete the following steps:

  1. On the Athena console, open the query editor.
  2. For Data source, choose AwsDataCatalog.
  3. For Database, choose location-analytics-glue-database.
  4. On the options menu (three vertical dots), choose Preview Table to query the content of both tables.
    Preview table

The query displays 10 sample positional records currently stored in the table. The following screenshot is an example from previewing the firehose table. The firehose table stores raw, unmodified data from the Amazon Location tracker.
Query results
You can now experiment with geospatial queries. The GeoJSON file for the 2021 London ULEZ expansion is part of the repository, and has already been converted into a query compatible with both Athena tables.

  1. Copy and paste the content from the 1-firehose-athena-ulez-2021-create-view.sql file found in the examples/firehose folder into the query editor.

This query uses the ST_Within geospatial function to determine if a recorded position is inside or outside the ULEZ zone defined by the polygon. A new view called ulezvehicleanalysis_firehose is created with a new column, insidezone, which captures whether the recorded position exists within the zone.

A simple Python utility is provided, which converts the polygon features found in the downloaded GeoJSON file into ST_Polygon strings based on the well-known text format that can be used directly in an Athena query.
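
The utility in the repository is the reference implementation; as a rough illustration of the idea (the file name below is hypothetical), converting the outer ring of a GeoJSON polygon feature into a WKT string could look like this:

import json

def polygon_to_wkt(feature):
    # GeoJSON stores coordinates as [longitude, latitude] pairs, which
    # matches the "lon lat" order expected by WKT.
    ring = feature["geometry"]["coordinates"][0]
    points = ", ".join(f"{coord[0]} {coord[1]}" for coord in ring)
    return f"POLYGON(({points}))"

with open("ulez-expansion-2021.geojson") as f:  # hypothetical file name
    geojson = json.load(f)

wkt = polygon_to_wkt(geojson["features"][0])
# The resulting string can be passed to ST_Polygon() inside an Athena query.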

  1. Choose Preview View on the ulezvehicleanalysis_firehose view to explore its content.
    Preview view

You can now run queries against this view to gain overarching insights.

  1. Copy and paste the content from the 2-firehose-athena-ulez-2021-query-days-in-zone.sql file found in the examples/firehose folder into the query editor.

This query establishes the total number of days each vehicle has entered ULEZ, and what the expected total charges would be. The query has been parameterized using the ? placeholder character. Parameterized queries allow you to rerun the same query with different parameter values.
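
Outside the console, you could run the same parameterized query through the Athena API, passing the daily fee as an execution parameter. In the following boto3 sketch, the results bucket is a placeholder and the fee value is only an example.

import time

import boto3

athena = boto3.client("athena")

# Read the parameterized query shipped with the repository.
with open("examples/firehose/2-firehose-athena-ulez-2021-query-days-in-zone.sql") as f:
    query = f.read()

execution = athena.start_query_execution(
    QueryString=query,
    QueryExecutionContext={"Database": "location-analytics-glue-database"},
    ResultConfiguration={"OutputLocation": "s3://<athena-results-bucket>/"},
    ExecutionParameters=["12.50"],  # example daily fee for the "?" placeholder
)

query_id = execution["QueryExecutionId"]
while athena.get_query_execution(QueryExecutionId=query_id)["QueryExecution"]["Status"]["State"] in ("QUEUED", "RUNNING"):
    time.sleep(2)

for row in athena.get_query_results(QueryExecutionId=query_id)["ResultSet"]["Rows"]:
    print([col.get("VarCharValue") for col in row["Data"]])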

  1. Enter the daily fee amount for Parameter 1, then run the query.
    Query editor

The results display each vehicle, the total number of days spent in the proposed ULEZ, and the total charges based on the daily fee you entered.
Query results
You can repeat this exercise using the lambda table. Data in the lambda table is augmented with additional vehicle details present in the vehicle maintenance DynamoDB table at the time it is processed by the Lambda function. The solution supports the following fields:

  • MeetsEmissionStandards (Boolean)
  • Mileage (Number)
  • PurchaseDate (String, in YYYY-MM-DD format)

You can also enrich the new data as it arrives.
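
If you would rather create the maintenance records programmatically than through the console steps that follow, a minimal boto3 sketch is shown below. The table name comes from the VehicleMaintenanceDynamoTable stack output, and the attribute values are only samples.

import boto3

dynamodb = boto3.resource("dynamodb")
# Use the table name from the VehicleMaintenanceDynamoTable CloudFormation output.
table = dynamodb.Table("<vehicle-maintenance-table-name>")

table.put_item(
    Item={
        "DeviceId": "vehicle1",
        "PurchaseDate": "2005-10-01",
        "Mileage": 10000,
        "MeetsEmissionStandards": False,
    }
)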

  1. On the DynamoDB console, find the vehicle maintenance table under Tables. The table name is provided as output VehicleMaintenanceDynamoTable in the deployed CloudFormation stack.
  2. Choose Explore table items to view the content of the table.
  3. Choose Create item to create a new record for a vehicle.
    Create item
  4. Enter DeviceId (such as vehicle1 as a String), PurchaseDate (such as 2005-10-01 as a String), Mileage (such as 10000 as a Number), and MeetsEmissionStandards (with a value such as False as Boolean).
  5. Choose Create item to create the record.
    Create item
  6. Duplicate the newly created record with additional entries for other vehicles (such as for vehicle2 or vehicle3), modifying the values of the attributes slightly each time.
  7. Rerun the location-analytics-glue-crawler-lambda AWS Glue crawler after new data has been generated to confirm that the update to the schema with new fields is registered.
  8. Copy and paste the content from the 1-lambda-athena-ulez-2021-create-view.sql file found in the examples/lambda folder into the query editor.
  9. Preview the ulezvehicleanalysis_lambda view to confirm that the new columns have been created.

If errors such as Column 'mileage' cannot be resolved are displayed, the data enrichment is not taking place, or the AWS Glue crawler has not yet detected updates to the schema.

If the Preview table option only returns results from before you created records in the DynamoDB table, sort the query results in descending order by sampletime (for example, order by sampletime desc limit 100;).
Query results
Now we focus on the vehicles that don’t currently meet emissions standards, and order the vehicles in descending order based on the mileage per year (calculated using the latest mileage / age of vehicle in years).

  1. Copy and paste the content from the 2-lambda-athena-ulez-2021-query-days-in-zone.sql file found in the examples/lambda folder into the query editor.
    Query results

In this example, we can see that out of our fleet of vehicles, five have been reported as not meeting emission standards. We can also see the vehicles that have accumulated high mileage per year, and the number of days spent in the proposed ULEZ. The fleet operator may now decide to prioritize these vehicles for replacement. Because location data is enriched with the most up-to-date vehicle maintenance data at the time it is ingested, you can further evolve these queries to run over a defined time window. For example, you could factor in mileage changes within the past year.

Due to the dynamic nature of the data enrichment, any new data being committed to Amazon S3, along with the query results, will be altered as and when records are updated in the DynamoDB vehicle maintenance table.

Clean up

Refer to the instructions in the README file to clean up the resources provisioned for this solution.

Conclusion

This post demonstrated how you can use Amazon Location, EventBridge, Lambda, Amazon Data Firehose, and Amazon S3 to build a location-aware data pipeline, and use the collected device position data to drive analytical insights using AWS Glue and Athena. By tracking these assets in real time and storing the results, companies can derive valuable insights on how effectively their fleets are being utilized and better react to changes in the future. You can now explore extending this sample code with your own device tracking data and analytics requirements.


About the Authors

Alan Peaty is a Senior Partner Solutions Architect at AWS. Alan helps Global Systems Integrators (GSIs) and Global Independent Software Vendors (GISVs) solve complex customer challenges using AWS services. Prior to joining AWS, Alan worked as an architect at systems integrators to translate business requirements into technical solutions. Outside of work, Alan is an IoT enthusiast and a keen runner who loves to hit the muddy trails of the English countryside.

Parag Srivastava is a Solutions Architect at AWS, helping enterprise customers with successful cloud adoption and migration. During his professional career, he has been extensively involved in complex digital transformation projects. He is also passionate about building innovative solutions around geospatial aspects of addresses.

How the GoDaddy data platform achieved over 60% cost reduction and 50% performance boost by adopting Amazon EMR Serverless

Post Syndicated from Brandon Abear original https://aws.amazon.com/blogs/big-data/how-the-godaddy-data-platform-achieved-over-60-cost-reduction-and-50-performance-boost-by-adopting-amazon-emr-serverless/

This is a guest post co-written with Brandon Abear, Dinesh Sharma, John Bush, and Ozcan Ilikhan from GoDaddy.

GoDaddy empowers everyday entrepreneurs by providing all the help and tools to succeed online. With more than 20 million customers worldwide, GoDaddy is the place people come to name their ideas, build a professional website, attract customers, and manage their work.

At GoDaddy, we take pride in being a data-driven company. Our relentless pursuit of valuable insights from data fuels our business decisions and ensures customer satisfaction. Our commitment to efficiency is unwavering, and we’ve undertaken an exciting initiative to optimize our batch processing jobs. In this journey, we have identified a structured approach that we refer to as the seven layers of improvement opportunities. This methodology has become our guide in the pursuit of efficiency.

In this post, we discuss how we enhanced operational efficiency with Amazon EMR Serverless. We share our benchmarking results and methodology, and insights into the cost-effectiveness of EMR Serverless vs. fixed capacity Amazon EMR on EC2 transient clusters on our data workflows orchestrated using Amazon Managed Workflows for Apache Airflow (Amazon MWAA). We share our strategy for the adoption of EMR Serverless in areas where it excels. Our findings reveal significant benefits, including over 60% cost reduction, 50% faster Spark workloads, a remarkable five-times improvement in development and testing speed, and a significant reduction in our carbon footprint.

Background

In late 2020, GoDaddy’s data platform initiated its AWS Cloud journey, migrating an 800-node Hadoop cluster with 2.5 PB of data from its data center to EMR on EC2. This lift-and-shift approach facilitated a direct comparison between on-premises and cloud environments, ensuring a smooth transition to AWS pipelines, minimizing data validation issues and migration delays.

By early 2022, we successfully migrated our big data workloads to EMR on EC2. Using best practices learned from the AWS FinHack program, we fine-tuned resource-intensive jobs, converted Pig and Hive jobs to Spark, and reduced our batch workload spend by 22.75% in 2022. However, scalability challenges emerged due to the multitude of jobs. This prompted GoDaddy to embark on a systematic optimization journey, establishing a foundation for more sustainable and efficient big data processing.

Seven layers of improvement opportunities

In our quest for operational efficiency, we have identified seven distinct layers of opportunities for optimization within our batch processing jobs, as shown in the following figure. These layers range from precise code-level enhancements to more comprehensive platform improvements. This multi-layered approach has become our strategic blueprint in the ongoing pursuit of better performance and higher efficiency.

Seven layers of improvement opportunities

The layers are as follows:

  • Code optimization – Focuses on refining the code logic and how it can be optimized for better performance. This involves performance enhancements through selective caching, partition and projection pruning, join optimizations, and other job-specific tuning. Using AI coding solutions is also an integral part of this process.
  • Software updates – Updating to the latest versions of open source software (OSS) to capitalize on new features and improvements. For example, Adaptive Query Execution in Spark 3 brings significant performance and cost improvements.
  • Custom Spark configurations – Tuning of custom Spark configurations to maximize resource utilization, memory, and parallelism. We can achieve significant improvements by right-sizing tasks, such as through spark.sql.shuffle.partitions, spark.sql.files.maxPartitionBytes, spark.executor.cores, and spark.executor.memory. However, these custom configurations might be counterproductive if they are not compatible with the specific Spark version.
  • Resource provisioning time – The time it takes to launch resources like ephemeral EMR clusters on Amazon Elastic Compute Cloud (Amazon EC2). Although some factors influencing this time are outside of an engineer’s control, identifying and addressing the factors that can be optimized can help reduce overall provisioning time.
  • Fine-grained scaling at task level – Dynamically adjusting resources such as CPU, memory, disk, and network bandwidth based on each stage’s needs within a task. The aim here is to avoid fixed cluster sizes that could result in resource waste.
  • Fine-grained scaling across multiple tasks in a workflow – Given that each task has unique resource requirements, maintaining a fixed resource size may result in under- or over-provisioning for certain tasks within the same workflow. Traditionally, the size of the largest task determines the cluster size for a multi-task workflow. However, dynamically adjusting resources across multiple tasks and steps within a workflow results in a more cost-effective implementation.
  • Platform-level enhancements – Enhancements at preceding layers can only optimize a given job or a workflow. Platform improvement aims to attain efficiency at the company level. We can achieve this through various means, such as updating or upgrading the core infrastructure, introducing new frameworks, allocating appropriate resources for each job profile, balancing service usage, optimizing the use of Savings Plans and Spot Instances, or implementing other comprehensive changes to boost efficiency across all tasks and workflows.

Layers 1–3: Previous cost reductions

After we migrated from on premises to AWS Cloud, we primarily focused our cost-optimization efforts on the first three layers shown in the diagram. By transitioning our most costly legacy Pig and Hive pipelines to Spark and optimizing Spark configurations for Amazon EMR, we achieved significant cost savings.

For example, a legacy Pig job took 10 hours to complete and ranked among the top 10 most expensive EMR jobs. Upon reviewing TEZ logs and cluster metrics, we discovered that the cluster was vastly over-provisioned for the data volume being processed and remained under-utilized for most of the runtime. Transitioning from Pig to Spark was more efficient. Although no automated tools were available for the conversion, manual optimizations were made, including:

  • Reduced unnecessary disk writes, saving serialization and deserialization time (Layer 1)
  • Replaced Airflow task parallelization with Spark, simplifying the Airflow DAG (Layer 1)
  • Eliminated redundant Spark transformations (Layer 1)
  • Upgraded from Spark 2 to 3, using Adaptive Query Execution (Layer 2)
  • Addressed skewed joins and optimized smaller dimension tables (Layer 3)

As a result, job cost decreased by 95%, and job completion time was reduced to 1 hour. However, this approach was labor-intensive and not scalable for numerous jobs.

Layers 4–6: Find and adopt the right compute solution

In late 2022, following our significant accomplishments in optimization at the previous levels, our attention moved towards enhancing the remaining layers.

Understanding the state of our batch processing

We use Amazon MWAA to orchestrate our data workflows in the cloud at scale. Apache Airflow is an open source tool used to programmatically author, schedule, and monitor sequences of processes and tasks referred to as workflows. In this post, the terms workflow and job are used interchangeably, referring to the Directed Acyclic Graphs (DAGs) consisting of tasks orchestrated by Amazon MWAA. Each workflow runs sequential or parallel tasks (or a combination of both) in a DAG between create_emr and terminate_emr tasks, on a transient EMR cluster with fixed compute capacity throughout the workflow run. Even after optimizing a portion of our workload, we still had numerous non-optimized workflows that were under-utilized due to over-provisioning of compute resources based on the most resource-intensive task in the workflow, as shown in the following figure.

This highlighted the impracticality of static resource allocation and led us to recognize the necessity of a dynamic resource allocation (DRA) system. Before proposing a solution, we gathered extensive data to thoroughly understand our batch processing. Analyzing the cluster step time, excluding provisioning and idle time, revealed significant insights: a right-skewed distribution with over half of the workflows completing in 20 minutes or less and only 10% taking more than 60 minutes. This distribution guided our choice of a fast-provisioning compute solution, dramatically reducing workflow runtimes. The following diagram illustrates step times (excluding provisioning and idle time) of EMR on EC2 transient clusters in one of our batch processing accounts.

Furthermore, based on the step time (excluding provisioning and idle time) distribution of the workflows, we categorized our workflows into three groups:

  • Quick run – Lasting 20 minutes or less
  • Medium run – Lasting between 20 and 60 minutes
  • Long run – Exceeding 60 minutes, often spanning several hours or more

Another factor we needed to consider was the extensive use of transient clusters for reasons such as security, job and cost isolation, and purpose-built clusters. Additionally, there was a significant variation in resource needs between peak hours and periods of low utilization.

Instead of fixed-size clusters, we could potentially use managed scaling on EMR on EC2 to achieve some cost benefits. However, migrating to EMR Serverless appears to be a more strategic direction for our data platform. In addition to potential cost benefits, EMR Serverless offers additional advantages such as a one-click upgrade to the newest Amazon EMR versions, a simplified operational and debugging experience, and automatic upgrades to the latest generations upon rollout. These features collectively simplify the process of operating a platform on a larger scale.

Evaluating EMR Serverless: A case study at GoDaddy

EMR Serverless is a serverless option in Amazon EMR that eliminates the complexities of configuring, managing, and scaling clusters when running big data frameworks like Apache Spark and Apache Hive. With EMR Serverless, businesses can enjoy numerous benefits, including cost-effectiveness, faster provisioning, simplified developer experience, and improved resilience to Availability Zone failures.

Recognizing the potential of EMR Serverless, we conducted an in-depth benchmark study using real production workflows. The study aimed to assess EMR Serverless performance and efficiency while also creating an adoption plan for large-scale implementation. The findings were highly encouraging, showing EMR Serverless can effectively handle our workloads.

Benchmarking methodology

We split our data workflows into three categories based on total step time (excluding provisioning and idle time): quick run (0–20 minutes), medium run (20–60 minutes), and long run (over 60 minutes). We analyzed the impact of the EMR deployment type (Amazon EC2 vs. EMR Serverless) on two key metrics: cost-efficiency and total runtime speedup, which served as our overall evaluation criteria. Although we did not formally measure ease of use and resiliency, these factors were considered throughout the evaluation process.

The high-level steps to assess the environment are as follows:

  1. Prepare the data and environment:
    1. Choose three to five random production jobs from each job category.
    2. Implement required adjustments to prevent interference with production.
  2. Run tests:
    1. Run scripts over several days or through multiple iterations to gather precise and consistent data points.
    2. Perform tests using EMR on EC2 and EMR Serverless.
  3. Validate data and test runs:
    1. Validate input and output datasets, partitions, and row counts to ensure identical data processing.
  4. Gather metrics and analyze results:
    1. Gather relevant metrics from the tests.
    2. Analyze results to draw insights and conclusions.

Benchmark results

Our benchmark results showed significant enhancements across all three job categories for both runtime speedup and cost-efficiency. The improvements were most pronounced for quick jobs, directly resulting from faster startup times. For instance, a 20-minute (including cluster provisioning and shut down) data workflow running on an EMR on EC2 transient cluster of fixed compute capacity finishes in 10 minutes on EMR Serverless, providing a shorter runtime with cost benefits. Overall, the shift to EMR Serverless delivered substantial performance improvements and cost reductions at scale across job brackets, as seen in the following figure.

Historically, we devoted more time to tuning our long-run workflows. Interestingly, we discovered that the existing custom Spark configurations for these jobs did not always translate well to EMR Serverless. In cases where the results were insignificant, a common approach was to discard previous Spark configurations related to executor cores. By allowing EMR Serverless to autonomously manage these Spark configurations, we often observed improved outcomes. The following graph shows the average runtime and cost improvement per job when comparing EMR Serverless to EMR on EC2.

Per Job Improvement

The following table shows a sample comparison of results for the same workflow running on different deployment options of Amazon EMR (EMR on EC2 and EMR Serverless).

Metric | EMR on EC2 (Average) | EMR Serverless (Average) | EMR on EC2 vs EMR Serverless
Total Run Cost ($) | $5.82 | $2.60 | 55%
Total Run Time (Minutes) | 53.40 | 39.40 | 26%
Provisioning Time (Minutes) | 10.20 | 0.05 | .
Provisioning Cost ($) | $1.19 | . | .
Steps Time (Minutes) | 38.20 | 39.16 | -3%
Steps Cost ($) | $4.30 | . | .
Idle Time (Minutes) | 4.80 | . | .
EMR Release Label | emr-6.9.0 | .
Hadoop Distribution | Amazon 3.3.3 | .
Spark Version | Spark 3.3.0 | .
Hive/HCatalog Version | Hive 3.1.3, HCatalog 3.1.3 | .
Job Type | Spark | .

AWS Graviton2 on EMR Serverless performance evaluation

After seeing compelling results with EMR Serverless for our workloads, we decided to further analyze the performance of the AWS Graviton2 (arm64) architecture within EMR Serverless. AWS had benchmarked Spark workloads on Graviton2 EMR Serverless using the TPC-DS 3TB scale, showing a 27% overall price-performance improvement.

To better understand the integration benefits, we ran our own study using GoDaddy’s production workloads on a daily schedule and observed an impressive 23.8% price-performance enhancement across a range of jobs when using Graviton2. For more details about this study, see GoDaddy benchmarking results in up to 24% better price-performance for their Spark workloads with AWS Graviton2 on Amazon EMR Serverless.

Adoption strategy for EMR Serverless

We strategically implemented a phased rollout of EMR Serverless via deployment rings, enabling systematic integration. This gradual approach let us validate improvements and halt further adoption of EMR Serverless, if needed. It served both as a safety net to catch issues early and a means to refine our infrastructure. The process mitigated the impact of change while keeping operations smooth and building expertise within our Data Engineering and DevOps teams. Additionally, it fostered tight feedback loops, allowing prompt adjustments and ensuring efficient EMR Serverless integration.

We divided our workflows into three main adoption groups, as shown in the following image:

  • Canaries – This group aids in detecting and resolving any potential problems early in the deployment stage.
  • Early adopters – This is the second batch of workflows that adopt the new compute solution after initial issues have been identified and rectified by the canaries group.
  • Broad deployment rings – The largest group of rings, this group represents the wide-scale deployment of the solution. These are deployed after successful testing and implementation in the previous two groups.

Rings

We further broke down these workflows into granular deployment rings to adopt EMR Serverless, as shown in the following table.

Ring # | Name | Details
Ring 0 | Canary | Low adoption risk jobs that are expected to yield some cost saving benefits.
Ring 1 | Early Adopters | Low risk quick-run Spark jobs that are expected to yield high gains.
Ring 2 | Quick-run | Rest of the quick-run (step_time <= 20 min) Spark jobs.
Ring 3 | LargerJobs_EZ | High potential gain, easy move, medium-run and long-run Spark jobs.
Ring 4 | LargerJobs | Rest of the medium-run and long-run Spark jobs with potential gains.
Ring 5 | Hive | Hive jobs with potentially higher cost savings.
Ring 6 | Redshift_EZ | Easy migration Redshift jobs that suit EMR Serverless.
Ring 7 | Glue_EZ | Easy migration Glue jobs that suit EMR Serverless.

Production adoption results summary

The encouraging benchmarking and canary adoption results generated considerable interest in wider EMR Serverless adoption at GoDaddy. To date, the EMR Serverless rollout remains underway. Thus far, it has reduced costs by 62.5% and accelerated total batch workflow completion by 50.4%.

Based on preliminary benchmarks, our team expected substantial gains for quick jobs. To our surprise, actual production deployments surpassed projections, averaging 64.4% faster vs. 42% projected, and 71.8% cheaper vs. 40% predicted.

Remarkably, long-running jobs also saw significant performance improvements due to the rapid provisioning of EMR Serverless and aggressive scaling enabled by dynamic resource allocation. We observed substantial parallelization during high-resource segments, resulting in a 40.5% faster total runtime compared to traditional approaches. The following chart illustrates the average enhancements per job category.

Prod Jobs Savings

Additionally, we observed the highest degree of dispersion for speed improvements within the long-run job category, as shown in the following box-and-whisker plot.

Whisker Plot

Sample workflows adopted EMR Serverless

For a large workflow migrated to EMR Serverless, comparing 3-week averages pre- and post-migration revealed impressive cost savings—a 75.30% decrease based on retail pricing with 10% improvement in total runtime, boosting operational efficiency. The following graph illustrates the cost trend.

Although quick-run jobs realized minimal per-dollar cost reductions, they delivered the most significant percentage cost savings. With thousands of these workflows running daily, the accumulated savings are substantial. The following graph shows the cost trend for a small workload migrated from EMR on EC2 to EMR Serverless. Comparing 3-week pre- and post-migration averages revealed a remarkable 92.43% cost savings on the retail on-demand pricing, alongside an 80.6% acceleration in total runtime.

Sample workflows adopted EMR Serverless 2

Layer 7: Platform-wide improvements

We aim to revolutionize compute operations at GoDaddy, providing simplified yet powerful solutions for all users with our Intelligent Compute Platform. Built on AWS compute solutions such as EMR Serverless and EMR on EC2, the platform provides optimized runs of data processing and machine learning (ML) workloads. An ML-powered job broker intelligently determines when and how to run jobs based on various parameters, while still allowing power users to customize. Additionally, an ML-powered compute resource manager pre-provisions resources based on load and historical data, providing efficient, fast provisioning at optimum cost. Intelligent compute empowers users with out-of-the-box optimization, catering to diverse personas without compromising power users.

The following diagram shows a high-level illustration of the intelligent compute architecture.

Insights and recommended best practices

The following section discusses the insights we’ve gathered and the recommended best practices we’ve developed during our preliminary and wider adoption stages.

Infrastructure preparation

Although EMR Serverless is a deployment method within EMR, it requires some infrastructure preparedness to optimize its potential. Consider the following requirements and practical guidance on implementation:

  • Use large subnets across multiple Availability Zones – When running EMR Serverless workloads within your VPC, make sure the subnets span across multiple Availability Zones and are not constrained by IP addresses. Refer to Configuring VPC access and Best practices for subnet planning for details.
  • Modify maximum concurrent vCPU quota – For extensive compute requirements, it is recommended to increase your max concurrent vCPUs per account service quota.
  • Amazon MWAA version compatibility – When adopting EMR Serverless, GoDaddy’s decentralized Amazon MWAA ecosystem for data pipeline orchestration created compatibility issues from disparate AWS Providers versions. Directly upgrading Amazon MWAA was more efficient than updating numerous DAGs. We facilitated adoption by upgrading Amazon MWAA instances ourselves, documenting issues, and sharing findings and effort estimates for accurate upgrade planning.
  • GoDaddy EMR operator – To streamline migrating numerous Airflow DAGs from EMR on EC2 to EMR Serverless, we developed custom operators adapting existing interfaces. This allowed seamless transitions while retaining familiar tuning options. Data engineers could easily migrate pipelines with simple find-replace imports and immediately use EMR Serverless.

Unexpected behavior mitigation

The following are unexpected behaviors we ran into and what we did to mitigate them:

  • Spark DRA aggressive scaling – For some jobs (8.33% of initial benchmarks, 13.6% of production), cost increased after migrating to EMR Serverless. This was due to Spark DRA excessively assigning new workers briefly, prioritizing performance over cost. To counteract this, we set maximum executor thresholds by adjusting spark.dynamicAllocation.maxExecutors, effectively limiting EMR Serverless scaling aggression. When migrating from EMR on EC2, we suggest observing the max core count in the Spark History UI to replicate similar compute limits in EMR Serverless, such as --conf spark.executor.cores and --conf spark.dynamicAllocation.maxExecutors.
  • Managing disk space for large-scale jobs – When transitioning jobs that process large data volumes with substantial shuffles and significant disk requirements to EMR Serverless, we recommend configuring spark.emr-serverless.executor.disk by referring to existing Spark job metrics. Furthermore, configurations like spark.executor.cores combined with spark.emr-serverless.executor.disk and spark.dynamicAllocation.maxExecutors allow control over the underlying worker size and total attached storage when advantageous. For example, a shuffle-heavy job with relatively low disk usage may benefit from using a larger worker to increase the likelihood of local shuffle fetches.
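
The following boto3 sketch shows one way such limits could be applied when submitting an EMR Serverless Spark job. The application ID, role ARN, script location, and the specific values are placeholders; derive the actual sizing from the metrics observed in your Spark History UI.

import boto3

emr_serverless = boto3.client("emr-serverless")

# Placeholder sizing values; replicate the compute limits of the EMR on EC2 job.
spark_submit_parameters = " ".join([
    "--conf spark.executor.cores=4",
    "--conf spark.executor.memory=16g",
    "--conf spark.emr-serverless.executor.disk=64g",
    "--conf spark.dynamicAllocation.maxExecutors=50",
])

emr_serverless.start_job_run(
    applicationId="<application-id>",
    executionRoleArn="arn:aws:iam::<account-id>:role/<job-execution-role>",
    jobDriver={
        "sparkSubmit": {
            "entryPoint": "s3://<bucket>/jobs/example_etl_job.py",
            "sparkSubmitParameters": spark_submit_parameters,
        }
    },
)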

Conclusion

As discussed in this post, our experiences with adopting EMR Serverless on arm64 have been overwhelmingly positive. The impressive results we’ve achieved, including a 60% reduction in cost, 50% faster runs of batch Spark workloads, and an astounding five-times improvement in development and testing speed, speak volumes about the potential of this technology. Furthermore, our current results suggest that by widely adopting Graviton2 on EMR Serverless, we could potentially reduce the carbon footprint by up to 60% for our batch processing.

However, it’s crucial to understand that these results are not a one-size-fits-all scenario. The enhancements you can expect are subject to factors including, but not limited to, the specific nature of your workflows, cluster configurations, resource utilization levels, and fluctuations in computational capacity. Therefore, we strongly advocate for a data-driven, ring-based deployment strategy when considering the integration of EMR Serverless, which can help optimize its benefits to the fullest.

Special thanks to Mukul Sharma and Boris Berlin for their contributions to benchmarking. Many thanks to Travis Muhlestein (CDO), Abhijit Kundu (VP Eng), Vincent Yung (Sr. Director Eng.), and Wai Kin Lau (Sr. Director Data Eng.) for their continued support.


About the Authors

Brandon Abear is a Principal Data Engineer in the Data & Analytics (DnA) organization at GoDaddy. He enjoys all things big data. In his spare time, he enjoys traveling, watching movies, and playing rhythm games.

Dinesh Sharma is a Principal Data Engineer in the Data & Analytics (DnA) organization at GoDaddy. He is passionate about user experience and developer productivity, always looking for ways to optimize engineering processes and saving cost. In his spare time, he loves reading and is an avid manga fan.

John Bush is a Principal Software Engineer in the Data & Analytics (DnA) organization at GoDaddy. He is passionate about making it easier for organizations to manage data and use it to drive their businesses forward. In his spare time, he loves hiking, camping, and riding his ebike.

Ozcan Ilikhan is the Director of Engineering for the Data and ML Platform at GoDaddy. He has over two decades of multidisciplinary leadership experience, spanning startups to global enterprises. He has a passion for leveraging data and AI in creating solutions that delight customers, empower them to achieve more, and boost operational efficiency. Outside of his professional life, he enjoys reading, hiking, gardening, volunteering, and embarking on DIY projects.

Harsh Vardhan is an AWS Solutions Architect, specializing in big data and analytics. He has over 8 years of experience working in the field of big data and data science. He is passionate about helping customers adopt best practices and discover insights from their data.

Physics on AWS: Optimizing wind turbine performance using OpenFAST in a digital twin

Post Syndicated from Marco Masciola original https://aws.amazon.com/blogs/architecture/physics-on-aws-optimizing-wind-turbine-performance-using-openfast-in-a-digital-twin/

Wind energy plays a crucial role in global decarbonization efforts by generating emission-free power from an abundant resource. In 2022, wind energy produced 2100 terawatt-hours (TWh) globally, or over 7% of global electricity, with expectations to reach 7400 TWh by 2030.

Despite its potential, several challenges must be addressed to help meet grid decarbonization targets. As wind energy adoption grows, issues like gearbox fatigue and leading-edge erosion need to be resolved to ensure a predictable supply of energy. For example, in the United States, wind turbines underperform by as much as 10% after 11 years of operation, despite expectations for the machine to operate at full potential for 25 years.

This blog reveals a digital twin architecture using the National Renewable Energy Laboratory’s (NREL) OpenFAST, an open-source multi-physics wind turbine simulation tool, to characterize operational anomalies and continuously improve wind farm performance. This approach can be used to support an overall maintenance strategy to optimize performance and profitability while reducing risk.

While a digital twin can take many forms, this architecture represents it with a physical wind turbine connected to the cloud using IoT devices to monitor performance and augmented knowledge using on-demand simulations. The insight gained from simulations can update the physical asset control system in near real-time to balance operational performance.

Why build this?

This digital twin can catch reliability assessment discrepancies by benchmarking real-world time series with simulations. Aeroelastic simulators like OpenFAST define operational envelopes as part of wind turbine design and certification in accordance with IEC 61400-1 and 61400-3. However, subtle, unanticipated changes in environmental conditions not accounted for in the initial design certification, such as higher turbulence intensity, accelerate degradation.

This architecture can be used to validate whether a controller change can limit gradual performance damage before the controller changes are deployed, by using the same simulation software used for wind turbine design. This example scenario, one that operators currently struggle with, is threaded through the following sections.

Digital twin architecture

Figure 1 illustrates the event-driven architecture in which resources launch on-demand simulations as anomalies occur.


Figure 1. Architecture for wind turbine digital twin solution

Simulation and real-world results can feed into a calculation engine to update the wind turbine controller software and improve operational performance through this workflow:

  1. Wind turbine sensors are connected to the AWS cloud using AWS IoT Core.
  2. An IoT rule forwards sensor data to Amazon Timestream, a purpose-built time series database.
  3. A scheduled AWS Lambda function queries Timestream to detect anomalies in time-series data.
  4. Upon anomaly detection, Amazon Simple Notification Service (Amazon SNS) publishes notifications and OpenFAST simulation input files are prepared in the Lambda preprocessor.
  5. Simulations are executed on demand, where the latest OpenFAST simulation is pulled from Amazon Elastic Container Registry (Amazon ECR).
  6. Simulations are dispatched through RESTful API and run using AWS Fargate.
  7. Simulation results are uploaded to Amazon Simple Storage Service (Amazon S3).
  8. Simulation time-series data is processed using AWS Lambda, where a decision is made to update the controller software based on the anomaly.
  9. The Lambda post-processer initiates a wind turbine controller software update, which is communicated to the wind turbine through AWS IoT Core.
  10. Results are visualized in Amazon Managed Grafana.

An example of an anomaly in step 3 is a controller overspeed alarm. Simple rule-based anomaly detection can be used to detect exceedance thresholds; you can also incorporate more sophisticated forms of anomaly detection using machine learning through Amazon SageMaker.
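
As a hedged illustration of such a rule-based check (the database, table, measure name, and threshold below are hypothetical), the scheduled Lambda function could query Timestream for recent samples that exceed a limit and publish a notification:

import os

import boto3

timestream = boto3.client("timestream-query")
sns = boto3.client("sns")

# Hypothetical names and threshold, for illustration only.
DATABASE = os.environ.get("TIMESTREAM_DATABASE", "wind_farm")
TABLE = os.environ.get("TIMESTREAM_TABLE", "turbine_telemetry")
TOPIC_ARN = os.environ["ANOMALY_TOPIC_ARN"]
ROTOR_SPEED_LIMIT_RPM = 14.0

def lambda_handler(event, context):
    # Look for rotor speed samples in the last 10 minutes that exceed the limit.
    query = f"""
        SELECT turbine_id, time, measure_value::double AS rotor_speed
        FROM "{DATABASE}"."{TABLE}"
        WHERE measure_name = 'rotor_speed'
          AND time > ago(10m)
          AND measure_value::double > {ROTOR_SPEED_LIMIT_RPM}
    """
    rows = timestream.query(QueryString=query)["Rows"]
    if rows:
        # Notify downstream consumers; the Lambda preprocessor then prepares
        # the OpenFAST input files for an on-demand simulation.
        sns.publish(
            TopicArn=TOPIC_ARN,
            Subject="Rotor overspeed detected",
            Message=f"{len(rows)} overspeed samples detected in the last 10 minutes.",
        )
    return {"anomalies": len(rows)}

The preceding workflow relies on four elements to create a digital twin. We explore these four elements in the following sections.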

Event-driven architecture

Event-driven architectures enable decoupled systems and asynchronous communications between services. An event-driven workflow is initiated automatically as events occur. An event might be an active alarm or an OpenFAST output file uploaded to Amazon S3. This means that the number of actively monitored wind turbines can scale from one to 100 (or more) without allocating new resources.

AWS Lambda provides instant scaling to increase the number of OpenFAST simulations available for processing. Additionally, Fargate removes the need to provision or manage the underlying OpenFAST compute instances. Leveraging serverless compute services removes the need to manage underlying infrastructure, provides demand-based scaling, and reduces costs compared to statically provisioned infrastructure.

In practice, event-driven architecture provides teams with flexibility to automatically prepare input files, dispatch simulations, and post-process results without manually provisioning resources.

Containerization

Containerization is the process of packaging an application together with the libraries it needs to run. Docker creates a container image that bundles the OpenFAST executable. FastAPI is also included in the OpenFAST container so that simulations can be dispatched through a web RESTful API, as demonstrated in Figure 2. Note that OpenFAST and FastAPI are independent projects. The RESTful API for OpenFAST is provisioned with commands to:

  • Run the simulation with initial conditions (PUT: /execute)
  • Upload simulation results to Amazon S3 (POST: /upload_to_s3)
  • Provide simulation status (GET: /status)
  • Delete simulation results (DELETE: /simulation)
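
The container in the repository is the reference implementation; as a rough sketch of the pattern (endpoint bodies reduced to placeholders), a FastAPI wrapper exposing these routes might look like the following.

from fastapi import BackgroundTasks, FastAPI

app = FastAPI()

@app.put("/execute")
def execute(initial_conditions: dict, background_tasks: BackgroundTasks):
    # Launch OpenFAST asynchronously with the supplied initial conditions.
    background_tasks.add_task(run_openfast, initial_conditions)
    return {"status": "started"}

@app.post("/upload_to_s3")
def upload_to_s3():
    # Copy the simulation output files to Amazon S3 (placeholder).
    return {"status": "uploaded"}

@app.get("/status")
def status():
    # Report whether a simulation is queued, running, or complete (placeholder).
    return {"status": "idle"}

@app.delete("/simulation")
def delete_simulation():
    # Remove local simulation results (placeholder).
    return {"status": "deleted"}

def run_openfast(initial_conditions: dict) -> None:
    # Placeholder for invoking the OpenFAST binary, for example via subprocess.
    pass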

This setup enables engineering teams to pull an OpenFAST simulation version aligned with physical wind turbines in operation without manual configuration.


Figure 2. Web frontend showing the RESTful API commands available for dispatching OpenFAST simulations

Load balancing and auto scaling

The architecture uses Amazon EC2 Auto Scaling and an Application Load Balancer (ALB) to manage fluctuating processing demands and enable concurrent OpenFAST simulations. EC2 Auto Scaling dynamically scales the number of OpenFAST containers based on the volume of simulation requests and offers cost savings by avoiding idle resources. Paired with an ALB, this setup evenly distributes simulation requests across OpenFAST containers, ensuring desired performance levels and high availability.

Data visualization

Amazon Timestream collects and archives real-time metrics from physical wind turbines. Timestream can store any metric from the physical asset collected through IoT Core, including rotor speed, generator power, generator speed, generator torque, or wind turbine control system alarms, as shown in Figure 3. One distinctive Timestream feature is scheduled queries that can regularly perform automated tasks like measuring 10-minute average wind speeds or tracking down units with controller alarms.
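
For example, a 10-minute average wind speed could be computed with a query like the following sketch, which uses the Timestream query API from Python; the database, table, and measure names are hypothetical placeholders. A Timestream scheduled query could run the same SQL on a fixed cadence and write the aggregates to a target table.

# Sketch: compute 10-minute average wind speed from Timestream telemetry.
# Database, table, and measure names are hypothetical placeholders.
import boto3

timestream = boto3.client("timestream-query")

query = """
    SELECT bin(time, 10m) AS ten_minute_window,
           avg(measure_value::double) AS avg_wind_speed
    FROM "turbine_telemetry"."scada"
    WHERE measure_name = 'wind_speed'
      AND time > ago(24h)
    GROUP BY bin(time, 10m)
    ORDER BY 1
"""

response = timestream.query(QueryString=query)
for row in response["Rows"]:
    print([col.get("ScalarValue") for col in row["Data"]])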

This provides operations teams the ability to view granular insights in real time or query historical data using SQL. Amazon Managed Grafana is also connected to OpenFAST results stored in Amazon S3 to compare simulation results with real-world operational data and view the response of simulated components. Engineering teams benefit from Amazon Managed Grafana because it provides a window into how the simulation responds to controller changes. Engineers can then verify whether the physical machine responds in the expected manner.

Example Amazon Managed Grafana dashboard

Figure 3. Example Amazon Managed Grafana dashboard

Conclusion

The AWS Cloud offers services and infrastructure that give organizations the resources to process data and build digital twins. Organizations can leverage open source models to improve operational performance and physics-based simulations to improve accuracy. By integrating technology paradigms such as event-driven architectures, wind turbine operators can make data-driven decisions in real time. Organizations can create virtual replicas of physical wind turbines to diagnose the source of alarms and adopt strategies to limit excessive wear before permanent damage occurs.

Real-time cost savings for Amazon Managed Service for Apache Flink

Post Syndicated from Jeremy Ber original https://aws.amazon.com/blogs/big-data/real-time-cost-savings-for-amazon-managed-service-for-apache-flink/

When running Apache Flink applications on Amazon Managed Service for Apache Flink, you have the unique benefit of taking advantage of its serverless nature. This means that cost-optimization exercises can happen at any time—they no longer need to happen in the planning phase. With Managed Service for Apache Flink, you can add and remove compute with the click of a button.

Apache Flink is an open source stream processing framework used by hundreds of companies in critical business applications, and by thousands of developers who have stream-processing needs for their workloads. It is highly available and scalable, offering high throughput and low latency for the most demanding stream-processing applications. These scalable properties of Apache Flink can be key to optimizing your cost in the cloud.

Managed Service for Apache Flink is a fully managed service that reduces the complexity of building and managing Apache Flink applications. Managed Service for Apache Flink manages the underlying infrastructure and Apache Flink components that provide durable application state, metrics, logs, and more.

In this post, you can learn about the Managed Service for Apache Flink cost model, areas to save on cost in your Apache Flink applications, and how to gain a better overall understanding of your data processing pipelines. We dive deep into understanding your costs, determining whether your application is overprovisioned, how to think about scaling automatically, and ways to optimize your Apache Flink applications to save on cost. Lastly, we ask important questions about your workload to determine if Apache Flink is the right technology for your use case.

How costs are calculated on Managed Service for Apache Flink

To optimize for costs with regards to your Managed Service for Apache Flink application, it can help to have a good idea of what goes into the pricing for the managed service.

Managed Service for Apache Flink applications are composed of Kinesis Processing Units (KPUs), which are compute instances consisting of 1 virtual CPU and 4 GB of memory. The total number of KPUs assigned to the application is determined by two parameters that you control directly:

  • Parallelism – The level of parallel processing in the Apache Flink application
  • Parallelism per KPU – The number of parallel tasks that can be scheduled on each KPU

The number of KPUs is determined by the simple formula: KPU = Parallelism / ParallelismPerKPU, rounded up to the next integer.

An additional KPU per application is also charged for orchestration and not directly used for data processing.

The total number of KPUs determines the compute, memory, and application storage resources allocated to the application. For each KPU, the application receives 1 vCPU and 4 GB of memory, of which 3 GB are allocated by default to the running application and the remaining 1 GB is used for application state store management. Each KPU also comes with 50 GB of storage attached to the application. Apache Flink retains application state in memory up to a configurable limit, and spills over to the attached storage.

The third cost component is durable application backups, or snapshots. This is entirely optional and its impact on the overall cost is small, unless you retain a very large number of snapshots.

At the time of writing, each KPU in the US East (Ohio) AWS Region costs $0.11 per hour, and attached application storage costs $0.10 per GB per month. The cost of durable application backup (snapshots) is $0.023 per GB per month. Refer to Amazon Managed Service for Apache Flink Pricing for up-to-date pricing and different Regions.
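
As a quick sketch, you can turn these numbers into a back-of-the-envelope estimate. The figures below use the US East (Ohio) prices just quoted, assume the 50 GB attached to each processing KPU is billed at the storage rate, and ignore snapshot storage.

# Back-of-the-envelope monthly cost estimate for a Managed Service for Apache Flink
# application, using the US East (Ohio) prices quoted above. Verify current pricing
# before relying on the output; durable backup (snapshot) storage is ignored.
import math

KPU_HOURLY_RATE = 0.11          # USD per KPU-hour
STORAGE_RATE_GB_MONTH = 0.10    # USD per GB-month of attached application storage
HOURS_PER_MONTH = 730           # approximation

def estimate_monthly_cost(parallelism: int, parallelism_per_kpu: int) -> float:
    # KPU = Parallelism / ParallelismPerKPU, rounded up to the next integer
    processing_kpus = math.ceil(parallelism / parallelism_per_kpu)
    billed_kpus = processing_kpus + 1        # one additional KPU for orchestration
    compute = billed_kpus * KPU_HOURLY_RATE * HOURS_PER_MONTH
    storage = processing_kpus * 50 * STORAGE_RATE_GB_MONTH   # 50 GB per KPU
    return round(compute + storage, 2)

# Example: parallelism 16 with the default parallelism per KPU of 1
print(estimate_monthly_cost(parallelism=16, parallelism_per_kpu=1))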

The following diagram illustrates the relative proportions of cost components for a running application on Managed Service for Apache Flink. You control the number of KPUs via the parallelism and parallelism per KPU parameters. Durable application backup storage is not represented.

pricing model

In the following sections, we examine how to monitor your costs, optimize the usage of application resources, and find the required number of KPUs to handle your throughput profile.

AWS Cost Explorer and understanding your bill

To see what your current Managed Service for Apache Flink spend is, you can use AWS Cost Explorer.

On the Cost Explorer console, you can filter by date range, usage type, and service to isolate your spend for Managed Service for Apache Flink applications. The following screenshot shows the past 12 months of cost broken down into the price categories described in the previous section. The majority of spend in many of these months was from interactive KPUs from Amazon Managed Service for Apache Flink Studio.

Analyse the cost of your Apache Flink application with AWS Cost Explorer

Using Cost Explorer not only helps you understand your bill, but also helps you further optimize particular applications that may have scaled beyond expectations, either automatically or due to throughput requirements. With proper application tagging, you could also break this spend down by application to see which applications account for the cost.

Signs of overprovisioning or inefficient use of resources

To minimize costs associated with Managed Service for Apache Flink applications, a straightforward approach involves reducing the number of KPUs your applications use. However, it’s crucial to recognize that this reduction could adversely affect performance if not thoroughly assessed and tested. To quickly gauge whether your applications might be overprovisioned, examine key indicators such as CPU and memory usage, application functionality, and data distribution. However, although these indicators can suggest potential overprovisioning, it’s essential to conduct performance testing and validate your scaling patterns before making any adjustments to the number of KPUs.

Metrics

Analyzing metrics for your application on Amazon CloudWatch can reveal clear signals of overprovisioning. If the containerCPUUtilization and containerMemoryUtilization metrics consistently remain below 20% over a statistically significant period for your application’s traffic patterns, it might be viable to scale down and allocate more data to fewer machines. Generally, we consider applications appropriately sized when containerCPUUtilization hovers between 50–75%. Although containerMemoryUtilization can fluctuate throughout the day and be influenced by code optimization, a consistently low value for a substantial duration could indicate potential overprovisioning.
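
As a sketch, the following checks two weeks of containerCPUUtilization data for a single application. The application name is a hypothetical placeholder, and it assumes the metrics are published in the AWS/KinesisAnalytics namespace with an Application dimension.

# Sketch: inspect average containerCPUUtilization over the past two weeks to spot
# potential overprovisioning. The application name is a hypothetical placeholder.
from datetime import datetime, timedelta, timezone

import boto3

cloudwatch = boto3.client("cloudwatch")
now = datetime.now(timezone.utc)

response = cloudwatch.get_metric_statistics(
    Namespace="AWS/KinesisAnalytics",
    MetricName="containerCPUUtilization",
    Dimensions=[{"Name": "Application", "Value": "my-flink-application"}],
    StartTime=now - timedelta(days=14),
    EndTime=now,
    Period=3600,                 # hourly data points
    Statistics=["Average"],
)

averages = [dp["Average"] for dp in response["Datapoints"]]
if averages and max(averages) < 20:
    print("CPU consistently below 20% - the application may be overprovisioned")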

Parallelism per KPU underutilized

Another subtle sign that your application is overprovisioned is if your application is purely I/O bound, or only does simple call-outs to databases and non-CPU intensive operations. If this is the case, you can use the parallelism per KPU parameter within Managed Service for Apache Flink to load more tasks onto a single processing unit.

You can view the parallelism per KPU parameter as a measure of density of workload per unit of compute and memory resources (the KPU). Increasing parallelism per KPU above the default value of 1 makes the processing more dense, allocating more parallel processes on a single KPU.

The following diagram illustrates how, by keeping the application parallelism constant (for example, 4) and increasing parallelism per KPU (for example, from 1 to 2), your application uses fewer resources with the same level of parallel runs.

How KPUs are calculated

The decision to increase parallelism per KPU, like all recommendations in this post, should be taken with great care. Increasing the parallelism per KPU value puts more load on a single KPU, and the workload must be able to tolerate that load. I/O-bound operations will not increase CPU or memory utilization in any meaningful way, but a process function that calculates many complex operations against the data would not be an ideal operation to collate onto a single KPU, because it could overwhelm the resources. Performance test and evaluate whether this is a good option for your applications.

How to approach sizing

Before you stand up a Managed Service for Apache Flink application, it can be difficult to estimate the number of KPUs you should allocate for your application. In general, you should have a good sense of your traffic patterns before estimating. Understanding your traffic patterns on a megabyte-per-second ingestion rate basis can help you approximate a starting point.

As a general rule, you can start with one KPU per 1 MB/s that your application will process. For example, if your application processes 10 MB/s (on average), you would allocate 10 KPUs as a starting point for your application. Keep in mind that this is a very high-level approximation that we have seen effective for a general estimate. However, you also need to performance test and evaluate whether or not this is an appropriate sizing in the long term based on metrics (CPU, memory, latency, overall job performance) over a long period of time.

To find the appropriate sizing for your application, you need to scale up and down the Apache Flink application. As mentioned, in Managed Service for Apache Flink you have two separate controls: parallelism and parallelism per KPU. Together, these parameters determine the level of parallel processing within the application and the overall compute, memory, and storage resources available.

The recommended testing methodology is to change parallelism or parallelism per KPU separately, while experimenting to find the right sizing. In general, only change parallelism per KPU to increase the number of parallel I/O-bound operations, without increasing the overall resources. For all other cases, only change parallelism—KPU will change consequentially—to find the right sizing for your workload.

You can also set parallelism at the operator level to restrict sources, sinks, or any other operator that needs to be restricted independently of the scaling mechanisms. You could use this for an Apache Flink application that reads from an Apache Kafka topic that has 10 partitions. With the setParallelism() method, you could restrict the KafkaSource to 10, but scale the Managed Service for Apache Flink application to a parallelism higher than 10 without creating idle tasks for the Kafka source. For other data processing cases, it is recommended not to set operator parallelism to a static value, but rather to define it as a function of the application parallelism so that it scales when the overall application scales.

Scaling and auto scaling

In Managed Service for Apache Flink, modifying parallelism or parallelism per KPU is an update of the application configuration. It causes the service to automatically take a snapshot (unless disabled), stop the application, and restart it with the new sizing, restoring the state from the snapshot. Scaling operations don’t cause data loss or inconsistencies, but they do pause data processing for a short period of time while infrastructure is added or removed. This is something you need to consider when rescaling in a production environment.

During the testing and optimization process, we recommend disabling automatic scaling and modifying parallelism and parallelism per KPU to find the optimal values. As mentioned, manual scaling is just an update of the application configuration, and can be run via the AWS Management Console or API with the UpdateApplication action.
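
A manual rescale through the API could look like the following sketch, which sets explicit parallelism values and turns off automatic scaling while you experiment. The application name is hypothetical, and you should confirm the request shape against the current UpdateApplication documentation.

# Sketch: manually rescale an application by updating its parallelism configuration.
# The application name is a hypothetical placeholder.
import boto3

client = boto3.client("kinesisanalyticsv2")
app_name = "my-flink-application"

version_id = client.describe_application(ApplicationName=app_name)[
    "ApplicationDetail"]["ApplicationVersionId"]

client.update_application(
    ApplicationName=app_name,
    CurrentApplicationVersionId=version_id,
    ApplicationConfigurationUpdate={
        "FlinkApplicationConfigurationUpdate": {
            "ParallelismConfigurationUpdate": {
                "ConfigurationTypeUpdate": "CUSTOM",
                "ParallelismUpdate": 8,
                "ParallelismPerKPUUpdate": 1,
                "AutoScalingEnabledUpdate": False,  # disable auto scaling while testing
            }
        }
    },
)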

When you have found the optimal sizing, if you expect your ingested throughput to vary considerably, you may decide to enable auto scaling.

In Managed Service for Apache Flink, you can use multiple types of automatic scaling:

  • Out-of-the-box automatic scaling – You can enable this to adjust the application parallelism automatically based on the containerCPUUtilization metric. Automatic scaling is enabled by default on new applications. For details about the automatic scaling algorithm, refer to Automatic Scaling.
  • Fine-grained, metric-based automatic scaling – This is straightforward to implement. The automation can be based on virtually any metrics, including custom metrics your application exposes.
  • Scheduled scaling – This may be useful if you expect peaks of workload at given times of the day or days of the week.

Out-of-the-box automatic scaling and fine-grained metric-based scaling are mutually exclusive. For more details about fine-grained metric-based auto scaling and scheduled scaling, and a fully working code example, refer to Enable metric-based and scheduled scaling for Amazon Managed Service for Apache Flink.

Code optimizations

Another way to approach cost savings for your Managed Service for Apache Flink applications is through code optimization. Un-optimized code will require more machines to perform the same computations. Optimizing the code could allow for lower overall resource utilization, which in turn could allow for scaling down and cost savings accordingly.

The first step to understanding your code performance is through the built-in utility within Apache Flink called Flame Graphs.

Flame graph

Flame Graphs, which are accessible via the Apache Flink dashboard, give you a visual representation of your stack trace. Each time a method is called, the bar that represents that method call in the stack trace grows in proportion to the number of samples in which it appears. This means that if you have an inefficient piece of code with a very long bar in the flame graph, this could be cause for investigation as to how to make this code more efficient. Additionally, you can use Amazon CodeGuru Profiler to monitor and optimize your Apache Flink applications running on Managed Service for Apache Flink.

When designing your applications, it is recommended to use the highest-level API that supports the operation you need at a given time. Apache Flink offers four levels of API: Flink SQL, Table API, DataStream API, and ProcessFunction API, with increasing levels of complexity and responsibility. If your application can be written entirely in Flink SQL or the Table API, using these can help you take advantage of the Apache Flink framework rather than managing state and computations manually.
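
As a small illustration of staying at the highest-level API, the following PyFlink sketch expresses an aggregation in Flink SQL through the Table API instead of managing state by hand in the DataStream API. The sample rows and field names are made up for illustration.

# Sketch: express an aggregation in Flink SQL via the Table API rather than managing
# state manually. Sample rows and field names are illustrative only.
from pyflink.table import EnvironmentSettings, TableEnvironment

t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

orders = t_env.from_elements(
    [(1, "store-a", 10.0), (2, "store-b", 7.5), (3, "store-a", 3.2)],
    ["order_id", "store", "amount"],
)
t_env.create_temporary_view("orders", orders)

# Flink SQL manages the aggregation state on our behalf
result = t_env.execute_sql(
    "SELECT store, SUM(amount) AS total_amount FROM orders GROUP BY store"
)
result.print()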

Data skew

On the Apache Flink dashboard, you can gather other useful information about your Managed Service for Apache Flink jobs.

Open the Flink Dashboard

On the dashboard, you can inspect individual tasks within your job application graph. Each blue box represents a task, and each task is composed of subtasks, or distributed units of work for that task. You can identify data skew among subtasks this way.

Flink dashboard

Data skew is an indicator that more data is being sent to one subtask than another, and that a subtask receiving more data is doing more work than the other. If you have such symptoms of data skew, you can work to eliminate it by identifying the source. For example, a GroupBy or KeyedStream could have a skew in the key. This would mean that data is not evenly spread among keys, resulting in an uneven distribution of work across Apache Flink compute instances. Imagine a scenario where you are grouping by userId, but your application receives data from one user significantly more than the rest. This can result in data skew. To eliminate this, you can choose a different grouping key to evenly distribute the data across subtasks. Keep in mind that this will require code modification to choose a different key.
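
To make the effect concrete, the following plain-Python sketch (not Flink code) compares how records spread across four subtasks when keyed by a skewed userId versus a composite key. The field names are made up, and remember that changing the key also changes what your aggregation means, so the new key must still fit your business logic.

# Sketch: compare record distribution across 4 subtasks for a skewed key versus a
# composite key. Field names are hypothetical; this only illustrates the concept.
from collections import Counter

records = [{"userId": "hot-user", "sessionId": str(i % 7)} for i in range(90)] + \
          [{"userId": f"user-{i}", "sessionId": "0"} for i in range(10)]

subtasks = 4

skewed = Counter(hash(r["userId"]) % subtasks for r in records)
balanced = Counter(hash((r["userId"], r["sessionId"])) % subtasks for r in records)

print("keyBy(userId)            ->", dict(skewed))    # one subtask receives most records
print("keyBy(userId, sessionId) ->", dict(balanced))  # work spreads more evenly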

When the data skew is eliminated, you can return to the containerCPUUtilization and containerMemoryUtilization metrics to reduce the number of KPUs.

Other areas for code optimization include making sure that you’re accessing external systems via the Async I/O API or via a data stream join, because a synchronous query out to a data store can create slowdowns and issues in checkpointing. Additionally, refer to Troubleshooting Performance for issues you might experience with slow checkpoints or logging, which can cause application backpressure.

How to determine if Apache Flink is the right technology

If your application doesn’t use any of the powerful capabilities behind the Apache Flink framework and Managed Service for Apache Flink, you could potentially save on cost by using something simpler.

Apache Flink’s tagline is “Stateful Computations over Data Streams.” Stateful, in this context, means that you are using the Apache Flink state construct. State, in Apache Flink, allows you to remember messages you have seen in the past for longer periods of time, making things like streaming joins, deduplication, exactly-once processing, windowing, and late-data handling possible. It does so by using an in-memory state store. On Managed Service for Apache Flink, it uses RocksDB to maintain its state.

If your application doesn’t involve stateful operations, you may consider alternatives such as AWS Lambda, containerized applications, or an Amazon Elastic Compute Cloud (Amazon EC2) instance running your application. The complexity of Apache Flink may not be necessary in such cases. Stateful computations, including cached data or enrichment procedures requiring independent stream position memory, may warrant Apache Flink’s stateful capabilities. If there’s a potential for your application to become stateful in the future, whether through prolonged data retention or other stateful requirements, continuing to use Apache Flink could be more straightforward. Organizations emphasizing Apache Flink for stream processing capabilities may prefer to stick with Apache Flink for stateful and stateless applications so all their applications process data in the same way. You should also factor in its orchestration features like exactly-once processing, fan-out capabilities, and distributed computation before transitioning from Apache Flink to alternatives.

Another consideration is your latency requirements. Because Apache Flink excels at real-time data processing, using it for an application with a 6-hour or 1-day latency requirement does not make sense. The cost savings by switching to a temporal batch process out of Amazon Simple Storage Service (Amazon S3), for example, would be significant.

Conclusion

In this post, we covered some aspects to consider when attempting cost-savings measures for Managed Service for Apache Flink. We discussed how to identify your overall spend on the managed service, some useful metrics to monitor when scaling down your KPUs, how to optimize your code for scaling down, and how to determine if Apache Flink is right for your use case.

Implementing these cost-saving strategies not only enhances your cost efficiency but also provides a streamlined and well-optimized Apache Flink deployment. By staying mindful of your overall spend, using key metrics, and making informed decisions about scaling down resources, you can achieve a cost-effective operation without compromising performance. As you navigate the landscape of Apache Flink, constantly evaluating whether it aligns with your specific use case becomes pivotal, so you can achieve a tailored and efficient solution for your data processing needs.

If any of the recommendations discussed in this post resonate with your workloads, we encourage you to try them out. With the metrics specified, and the tips on how to understand your workloads better, you should now have what you need to efficiently optimize your Apache Flink workloads on Managed Service for Apache Flink. The following are some helpful resources you can use to supplement this post:


About the Authors

Jeremy Ber has been working in the telemetry data space for the past 10 years as a Software Engineer, Machine Learning Engineer, and most recently a Data Engineer. At AWS, he is a Streaming Specialist Solutions Architect, supporting both Amazon Managed Streaming for Apache Kafka (Amazon MSK) and Amazon Managed Service for Apache Flink.

Lorenzo Nicora works as Senior Streaming Solution Architect at AWS, helping customers across EMEA. He has been building cloud-native, data-intensive systems for over 25 years, working in the finance industry both through consultancies and for FinTech product companies. He has leveraged open-source technologies extensively and contributed to several projects, including Apache Flink.

Best practices to implement near-real-time analytics using Amazon Redshift Streaming Ingestion with Amazon MSK

Post Syndicated from Poulomi Dasgupta original https://aws.amazon.com/blogs/big-data/best-practices-to-implement-near-real-time-analytics-using-amazon-redshift-streaming-ingestion-with-amazon-msk/

Amazon Redshift is a fully managed, scalable cloud data warehouse that accelerates your time to insights with fast, straightforward, and secure analytics at scale. Tens of thousands of customers rely on Amazon Redshift to analyze exabytes of data and run complex analytical queries, making it the most widely used cloud data warehouse. You can run and scale analytics in seconds on all your data, without having to manage your data warehouse infrastructure.

You can use the Amazon Redshift Streaming Ingestion capability to update your analytics databases in near-real time. Amazon Redshift streaming ingestion simplifies data pipelines by letting you create materialized views directly on top of data streams. With this capability in Amazon Redshift, you can use Structured Query Language (SQL) to connect to and directly ingest data from data streams, such as Amazon Kinesis Data Streams or Amazon Managed Streaming for Apache Kafka (Amazon MSK) data streams, and pull data directly to Amazon Redshift.

In this post, we discuss the best practices to implement near-real-time analytics using Amazon Redshift streaming ingestion with Amazon MSK.

Overview of solution

We walk through an example pipeline to ingest data from an MSK topic into Amazon Redshift using Amazon Redshift streaming ingestion. We also show how to unnest JSON data using dot notation in Amazon Redshift. The following diagram illustrates our solution architecture.

The process flow consists of the following steps:

  1. Create a streaming materialized view in your Redshift cluster to consume live streaming data from the MSK topics.
  2. Use a stored procedure to implement change data capture (CDC) using the unique combination of Kafka Partition and Kafka Offset at the record level for the ingested MSK topic.
  3. Create a user-facing table in the Redshift cluster and use dot notation to unnest the JSON document from the streaming materialized view into data columns of the table. You can continuously load fresh data by calling the stored procedure at regular intervals.
  4. Establish connectivity between an Amazon QuickSight dashboard and Amazon Redshift to deliver visualization and insights.

As part of this post, we also discuss the following topics:

  • Steps to configure cross-account streaming ingestion from Amazon MSK to Amazon Redshift
  • Best practices to achieve optimized performance from streaming materialized views
  • Monitoring techniques to track failures in Amazon Redshift streaming ingestion

Prerequisites

You must have the following:

Considerations while setting up your MSK topic

Keep in mind the following considerations when configuring your MSK topic:

  • Make sure that the name of your MSK topic is no longer than 128 characters.
  • As of this writing, MSK records containing compressed data can’t be directly queried in Amazon Redshift. Amazon Redshift doesn’t support any native decompression methods for client-side compressed data in an MSK topic.
  • Follow best practices while setting up your MSK cluster.
  • Review the streaming ingestion limitations for any other considerations.

Set up streaming ingestion

To set up streaming ingestion, complete the following steps:

  1. Set up the AWS Identity and Access Management (IAM) role and trust policy required for streaming ingestion. For instructions, refer to Setting up IAM and performing streaming ingestion from Kafka.
  2. Make sure that data is flowing into your MSK topic using Amazon CloudWatch metrics (for example, BytesOutPerSec).
  3. Launch the query editor v2 from the Amazon Redshift console or use your preferred SQL client to connect to your Redshift cluster for the next steps. The following steps were run in query editor v2.
  4. Create an external schema to map to the MSK cluster. Replace your IAM role ARN and the MSK cluster ARN in the following statement:
    CREATE EXTERNAL SCHEMA custschema
    FROM MSK
    IAM_ROLE  'iam-role-arn' 
    AUTHENTICATION { none | iam }
    CLUSTER_ARN 'msk-cluster-arn';
    

  5. Optionally, if your topic names are case sensitive, you need to enable enable_case_sensitive_identifier to be able to access them in Amazon Redshift. To use case-sensitive identifiers, set enable_case_sensitive_identifier to true at either the session, user, or cluster level:
    SET ENABLE_CASE_SENSITIVE_IDENTIFIER TO TRUE;

  6. Create a materialized view to consume the stream data from the MSK topic:
    CREATE MATERIALIZED VIEW Orders_Stream_MV AS
    SELECT kafka_partition, 
     kafka_offset, 
     refresh_time,
     JSON_PARSE(kafka_value) as Data
    FROM custschema."ORDERTOPIC"
    WHERE CAN_JSON_PARSE(kafka_value);
    

The metadata column kafka_value that arrives from Amazon MSK is stored in VARBYTE format in Amazon Redshift. For this post, you use the JSON_PARSE function to convert kafka_value to a SUPER data type. You also use the CAN_JSON_PARSE function in the filter condition to skip invalid JSON records and guard against errors due to JSON parsing failures. We discuss how to store the invalid data for future debugging later in this post.

  1. Refresh the streaming materialized view, which triggers Amazon Redshift to read from the MSK topic and load data into the materialized view:
    REFRESH MATERIALIZED VIEW Orders_Stream_MV;

You can also set your streaming materialized view to use auto refresh capabilities. This will automatically refresh your materialized view as data arrives in the stream. See CREATE MATERIALIZED VIEW for instructions to create a materialized view with auto refresh.

Unnest the JSON document

The following is a sample of a JSON document that was ingested from the MSK topic to the Data column of SUPER type in the streaming materialized view Orders_Stream_MV:

{
   "EventType":"Orders",
   "OrderID":"103",
   "CustomerID":"C104",
   "CustomerName":"David Smith",
   "OrderDate":"2023-09-02",
   "Store_Name":"Store-103",
   "ProductID":"P004",
   "ProductName":"Widget-X-003",
   "Quatity":"5",
   "Price":"2500",
   "OrderStatus":"Initiated"
}

Use dot notation as shown in the following code to unnest your JSON payload:

SELECT 
    data."OrderID"::INT4 as OrderID
    ,data."ProductID"::VARCHAR(36) as ProductID
    ,data."ProductName"::VARCHAR(36) as ProductName
    ,data."CustomerID"::VARCHAR(36) as CustomerID
    ,data."CustomerName"::VARCHAR(36) as CustomerName
    ,data."Store_Name"::VARCHAR(36) as Store_Name
    ,data."OrderDate"::TIMESTAMPTZ as OrderDate
    ,data."Quatity"::INT4 as Quatity
    ,data."Price"::DOUBLE PRECISION as Price
    ,data."OrderStatus"::VARCHAR(36) as OrderStatus
    ,"kafka_partition"::BIGINT  
    ,"kafka_offset"::BIGINT
FROM orders_stream_mv;

The following screenshot shows what the result looks like after unnesting.

If you have arrays in your JSON document, consider unnesting your data using PartiQL statements in Amazon Redshift. For more information, refer to the section Unnest the JSON document in the post Near-real-time analytics using Amazon Redshift streaming ingestion with Amazon Kinesis Data Streams and Amazon DynamoDB.

Incremental data load strategy

Complete the following steps to implement an incremental data load:

  1. Create a table called Orders in Amazon Redshift, which end-users will use for visualization and business analysis:
    CREATE TABLE public.Orders (
        orderid integer ENCODE az64,
        productid character varying(36) ENCODE lzo,
        productname character varying(36) ENCODE lzo,
        customerid character varying(36) ENCODE lzo,
        customername character varying(36) ENCODE lzo,
        store_name character varying(36) ENCODE lzo,
        orderdate timestamp with time zone ENCODE az64,
        quatity integer ENCODE az64,
        price double precision ENCODE raw,
        orderstatus character varying(36) ENCODE lzo
    ) DISTSTYLE AUTO;
    

Next, you create a stored procedure called SP_Orders_Load to implement CDC from a streaming materialized view and load into the final Orders table. You use the combination of Kafka_Partition and Kafka_Offset available in the streaming materialized view as system columns to implement CDC. The combination of these two columns will always be unique within an MSK topic, which makes sure that none of the records are missed during the process. The stored procedure contains the following components:

  • To use case-sensitive identifiers, set enable_case_sensitive_identifier to true at either the session, user, or cluster level.
  • Refresh the streaming materialized view manually if auto refresh is not enabled.
  • Create an audit table called Orders_Streaming_Audit if it doesn’t exist to keep track of the last offset for a partition that was loaded into Orders table during the last run of the stored procedure.
  • Unnest and insert only new or changed data into a staging table called Orders_Staging_Table, reading from the streaming materialized view Orders_Stream_MV, where Kafka_Offset is greater than the last processed Kafka_Offset recorded in the audit table Orders_Streaming_Audit for the Kafka_Partition being processed.
  • When loading for the first time using this stored procedure, there will be no data in the Orders_Streaming_Audit table and all the data from Orders_Stream_MV will get loaded into the Orders table.
  • Insert only business-relevant columns to the user-facing Orders table, selecting from the staging table Orders_Staging_Table.
  • Insert the max Kafka_Offset for every loaded Kafka_Partition into the audit table Orders_Streaming_Audit

We have added the intermediate staging table Orders_Staging_Table in this solution to help with the debugging in case of unexpected failures and trackability. Skipping the staging step and directly loading into the final table from Orders_Stream_MV can provide lower latency depending on your use case.

  1. Create the stored procedure with the following code:
    CREATE OR REPLACE PROCEDURE SP_Orders_Load()
        AS $$
        BEGIN
    
        SET ENABLE_CASE_SENSITIVE_IDENTIFIER TO TRUE;
        REFRESH MATERIALIZED VIEW Orders_Stream_MV;
    
        --create an audit table if not exists to keep track of Max Offset per Partition that was loaded into Orders table  
    
        CREATE TABLE IF NOT EXISTS Orders_Streaming_Audit
        (
        "kafka_partition" BIGINT,
        "kafka_offset" BIGINT
        )
        SORTKEY("kafka_partition", "kafka_offset"); 
    
        DROP TABLE IF EXISTS Orders_Staging_Table;  
    
        --Insert only newly available data into staging table from streaming View based on the max offset for new/existing partitions
      --When loading for 1st time i.e. there is no data in Orders_Streaming_Audit table then all the data gets loaded from streaming View  
        CREATE TABLE Orders_Staging_Table as 
        SELECT 
        data."OrderID"::INT4 as OrderID
        ,data."ProductID"::VARCHAR(36) as ProductID
        ,data."ProductName"::VARCHAR(36) as ProductName
        ,data."CustomerID"::VARCHAR(36) as CustomerID
        ,data."CustomerName"::VARCHAR(36) as CustomerName
        ,data."Store_Name"::VARCHAR(36) as Store_Name
        ,data."OrderDate"::TIMESTAMPTZ as OrderDate
        ,data."Quatity"::INT4 as Quatity
        ,data."Price"::DOUBLE PRECISION as Price
        ,data."OrderStatus"::VARCHAR(36) as OrderStatus
        , s."kafka_partition"::BIGINT , s."kafka_offset"::BIGINT
        FROM Orders_Stream_MV s
        LEFT JOIN (
        SELECT
        "kafka_partition",
        MAX("kafka_offset") AS "kafka_offset"
        FROM Orders_Streaming_Audit
        GROUP BY "kafka_partition"
        ) AS m
        ON nvl(s."kafka_partition",0) = nvl(m."kafka_partition",0)
        WHERE
        m."kafka_offset" IS NULL OR
        s."kafka_offset" > m."kafka_offset";
    
        --Insert only business relevant column to final table selecting from staging table
        Insert into Orders 
        SELECT 
        OrderID
        ,ProductID
        ,ProductName
        ,CustomerID
        ,CustomerName
        ,Store_Name
        ,OrderDate
        ,Quatity
        ,Price
        ,OrderStatus
        FROM Orders_Staging_Table;
    
        --Insert the max kafka_offset for every loaded Kafka partitions into Audit table 
        INSERT INTO Orders_Streaming_Audit
        SELECT kafka_partition, MAX(kafka_offset)
        FROM Orders_Staging_Table
        GROUP BY kafka_partition;   
    
        END;
        $$ LANGUAGE plpgsql;
    

  2. Run the stored procedure to load data into the Orders table:
    call SP_Orders_Load();

  3. Validate data in the Orders table.

Establish cross-account streaming ingestion

If your MSK cluster belongs to a different account, complete the following steps to create IAM roles to set up cross-account streaming ingestion. Let’s assume the Redshift cluster is in account A and the MSK cluster is in account B, as shown in the following diagram.

Complete the following steps:

  1. In account B, create an IAM role called MyRedshiftMSKRole that allows Amazon Redshift (account A) to communicate with the MSK cluster (account B) named MyTestCluster. Depending on whether your MSK cluster uses IAM authentication or unauthenticated access to connect, you need to create an IAM role with one of the following policies:
    • An IAM policy for Amazon MSK when using unauthenticated access:
      {
          "Version": "2012-10-17",
          "Statement": [
              {
                  "Sid": "RedshiftMSKPolicy",
                  "Effect": "Allow",
                  "Action": [
                      "kafka:GetBootstrapBrokers"
                  ],
                  "Resource": "*"
              }
          ]
      }

    • An IAM policy for Amazon MSK when using IAM authentication:
      {
          "Version": "2012-10-17",
          "Statement": [
              {
                  "Sid": "RedshiftMSKIAMpolicy",
                  "Effect": "Allow",
                  "Action": [
                      "kafka-cluster:ReadData",
                      "kafka-cluster:DescribeTopic",
                      "kafka-cluster:Connect"
                  ],
                  "Resource": [
                      "arn:aws:kafka:us-east-1:0123456789:cluster/MyTestCluster/abcd1234-0123-abcd-5678-1234abcd-1",
                      "arn:aws:kafka:us-east-1:0123456789:topic/MyTestCluster/*"
                  ]
              },
              {
                  "Sid": "RedshiftMSKPolicy",
                  "Effect": "Allow",
                  "Action": [
                      "kafka:GetBootstrapBrokers"
                  ],
                  "Resource": "*"
              }
          ]
      }
      

The resource section in the preceding example gives access to all topics in the MyTestCluster MSK cluster. If you need to restrict the IAM role to specific topics, you need to replace the topic resource with a more restrictive resource policy.

  1. After you create the IAM role in account B, take note of the IAM role ARN (for example, arn:aws:iam::0123456789:role/MyRedshiftMSKRole).
  2. In account A, create a Redshift customizable IAM role called MyRedshiftRole that Amazon Redshift will assume when connecting to Amazon MSK. The role should have a policy like the following, which allows the Amazon Redshift IAM role in account A to assume the Amazon MSK role in account B:
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "RedshiftMSKAssumePolicy",
                "Effect": "Allow",
                "Action": "sts:AssumeRole",
                "Resource": "arn:aws:iam::0123456789:role/MyRedshiftMSKRole"        
           }
        ]
    }
    

  3. Take note of the role ARN for the Amazon Redshift IAM role (for example, arn:aws:iam::9876543210:role/MyRedshiftRole).
  4. Go back to account B and add this role in the trust policy of the IAM role arn:aws:iam::0123456789:role/MyRedshiftMSKRole to allow account B to trust the IAM role from account A. The trust policy should look like the following code:
    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": "sts:AssumeRole",
          "Principal": {
            "AWS": "arn:aws:iam::9876543210:role/MyRedshiftRole"
          }
        }
      ]
    } 
    

  5. Sign in to the Amazon Redshift console as account A.
  6. Launch the query editor v2 or your preferred SQL client and run the following statements to access the MSK topic in account B. To map to the MSK cluster, create an external schema using role chaining by specifying IAM role ARNs, separated by a comma without any spaces around it. The role attached to the Redshift cluster comes first in the chain.
    CREATE EXTERNAL SCHEMA custschema
    FROM MSK
    IAM_ROLE  
    'arn:aws:iam::9876543210:role/MyRedshiftRole,arn:aws:iam::0123456789:role/MyRedshiftMSKRole' 
    AUTHENTICATION { none | iam }
    CLUSTER_ARN 'msk-cluster-arn'; --replace with ARN of MSK cluster 
    

Performance considerations

Keep in mind the following performance considerations:

  • Keep the streaming materialized view simple and move transformations like unnesting, aggregation, and case expressions to a later step—for example, by creating another materialized view on top of the streaming materialized view.
  • Consider creating only one streaming materialized view in a single Redshift cluster or workgroup for a given MSK topic. Creation of multiple materialized views per MSK topic can slow down the ingestion performance because each materialized view becomes a consumer for that topic and shares the Amazon MSK bandwidth for that topic. Live streaming data in a streaming materialized view can be shared across multiple Redshift clusters or Redshift Serverless workgroups using data sharing.
  • While defining your streaming materialized view, avoid using Json_Extract_Path_Text to pre-shred data, because Json_extract_path_text operates on the data row by row, which significantly impacts ingestion throughput. It is preferable to land the data as is from the stream and then shred it later.
  • Where possible, consider skipping the sort key in the streaming materialized view to accelerate the ingestion speed. When a streaming materialized view has a sort key, a sort operation will occur with every batch of ingested data from the stream. Sorting has a performance overhead depending on the sort key data type, number of sort key columns, and amount of data ingested in each batch. This sorting step can increase the latency before the streaming data is available to query. You should weigh which is more important: latency on ingestion or latency on querying the data.
  • For optimized performance of the streaming materialized view and to reduce storage usage, occasionally purge data from the materialized view using delete, truncate, or alter table append.
  • If you need to ingest multiple MSK topics in parallel into Amazon Redshift, start with a smaller number of streaming materialized views and keep adding more materialized views to evaluate the overall ingestion performance within a cluster or workgroup.
  • Increasing the number of nodes in a Redshift provisioned cluster or the base RPU of a Redshift Serverless workgroup can help boost the ingestion performance of a streaming materialized view. For optimal performance, you should aim to have as many slices in your Redshift provisioned cluster as there are partitions in your MSK topic, or 8 RPU for every four partitions in your MSK topic.

Monitoring techniques

Records in the topic that exceed the size of the target materialized view column at the time of ingestion will be skipped. Records that are skipped by the materialized view refresh will be logged in the SYS_STREAM_SCAN_ERRORS system table.

Errors that occur when processing a record due to a calculation or a data type conversion or some other logic in the materialized view definition will result in the materialized view refresh failure until the offending record has expired from the topic. To avoid these types of issues, test the logic of your materialized view definition carefully; otherwise, land the records into the default VARBYTE column and process them later.

The following are available monitoring views:

  • SYS_MV_REFRESH_HISTORY – Use this view to gather information about the refresh history of your streaming materialized views. The results include the refresh type, such as manual or auto, and the status of the most recent refresh. The following query shows the refresh history for a streaming materialized view:
    select mv_name, refresh_type, status, duration  from SYS_MV_REFRESH_HISTORY where mv_name='mv_store_sales'

  • SYS_STREAM_SCAN_ERRORS – Use this view to check the reason why a record failed to load via streaming ingestion from an MSK topic. As of writing this post, when ingesting from Amazon MSK, this view only logs errors when the record is larger than the materialized view column size. This view will also show the unique identifier (offset) of the MSK record in the position column. The following query shows the error code and error reason when a record exceeded the maximum size limit:
    select mv_name, external_schema_name, stream_name, record_time, query_id, partition_id, "position", error_code, error_reason
    from SYS_STREAM_SCAN_ERRORS  where mv_name='test_mv' and external_schema_name ='streaming_schema'	;
    

  • SYS_STREAM_SCAN_STATES – Use this view to monitor the number of records scanned at a given record_time. This view also tracks the offset of the last record read in the batch. The following query shows topic data for a specific materialized view:
    select mv_name,external_schema_name,stream_name,sum(scanned_rows) total_records,
    sum(scanned_bytes) total_bytes 
    from SYS_STREAM_SCAN_STATES where mv_name='test_mv' and external_schema_name ='streaming_schema' group by 1,2,3;
    

  • SYS_QUERY_HISTORY – Use this view to check the overall metrics for a streaming materialized view refresh. This will also log errors in the error_message column for errors that don’t show up in SYS_STREAM_SCAN_ERRORS. The following query shows the error causing the refresh failure of a streaming materialized view:
    select  query_id, query_type, status, query_text, error_message from sys_query_history where status='failed' and start_time>='2024-02-03 03:18:00' order by start_time desc

Additional considerations for implementation

You have the choice to optionally generate a materialized view on top of a streaming materialized view, allowing you to unnest and precompute results for end-users. This approach eliminates the need to store the results in a final table using a stored procedure.

In this post, you use the CAN_JSON_PARSE function to guard against any errors to more successfully ingest data—in this case, the streaming records that can’t be parsed are skipped by Amazon Redshift. However, if you want to keep track of your error records, consider storing them in a column using the following SQL when creating the streaming materialized view:

CREATE MATERIALIZED VIEW Orders_Stream_MV AS 
SELECT
kafka_partition, 
kafka_offset, 
refresh_time, 
case when CAN_JSON_PARSE(kafka_value) = true then json_parse(kafka_value) end Data,
case when CAN_JSON_PARSE(kafka_value) = false then kafka_value end Invalid_Data
FROM custschema."ORDERTOPIC";

You can also consider unloading data from the view SYS_STREAM_SCAN_ERRORS into an Amazon Simple Storage Service (Amazon S3) bucket and get alerts by sending a report via email using Amazon Simple Notification Service (Amazon SNS) notifications whenever a new S3 object is created.

Lastly, based on your data freshness requirement, you can use Amazon EventBridge to schedule the jobs in your data warehouse to call the aforementioned SP_Orders_Load stored procedure on a regular basis. EventBridge does this at fixed intervals, and you may need to have a mechanism (for example, an AWS Step Functions state machine) to monitor if the previous call to the procedure completed. For more information, refer to Creating an Amazon EventBridge rule that runs on a schedule. You can also refer to Accelerate orchestration of an ELT process using AWS Step Functions and Amazon Redshift Data API. Another option is to use Amazon Redshift query editor v2 to schedule the refresh. For details, refer to Scheduling a query with query editor v2.
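
For example, a minimal Lambda handler invoked by an EventBridge schedule could call the stored procedure through the Redshift Data API, as in the following sketch. The cluster identifier, database name, and secret ARN are hypothetical placeholders.

# Sketch: Lambda handler that calls SP_Orders_Load through the Redshift Data API.
# Cluster identifier, database, and secret ARN are hypothetical placeholders.
import boto3

redshift_data = boto3.client("redshift-data")

def lambda_handler(event, context):
    response = redshift_data.execute_statement(
        ClusterIdentifier="my-redshift-cluster",
        Database="dev",
        SecretArn="arn:aws:secretsmanager:us-east-1:123456789012:secret:redshift-creds",
        Sql="CALL SP_Orders_Load();",
    )
    # The Data API is asynchronous; use describe_statement to track completion
    return {"statement_id": response["Id"]}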

Conclusion

In this post, we discussed best practices to implement near-real-time analytics using Amazon Redshift streaming ingestion with Amazon MSK. We showed you an example pipeline to ingest data from an MSK topic into Amazon Redshift using streaming ingestion. We also showed a reliable strategy to perform incremental streaming data load into Amazon Redshift using Kafka Partition and Kafka Offset. Additionally, we demonstrated the steps to configure cross-account streaming ingestion from Amazon MSK to Amazon Redshift and discussed performance considerations for optimized ingestion rate. Lastly, we discussed monitoring techniques to track failures in Amazon Redshift streaming ingestion.

If you have any questions, leave them in the comments section.


About the Authors

Poulomi Dasgupta is a Senior Analytics Solutions Architect with AWS. She is passionate about helping customers build cloud-based analytics solutions to solve their business problems. Outside of work, she likes travelling and spending time with her family.

Adekunle Adedotun is a Sr. Database Engineer with Amazon Redshift service. He has been working on MPP databases for 6 years with a focus on performance tuning. He also provides guidance to the development team for new and existing service features.

Petabyte-scale log analytics with Amazon S3, Amazon OpenSearch Service, and Amazon OpenSearch Ingestion

Post Syndicated from Jagadish Kumar original https://aws.amazon.com/blogs/big-data/petabyte-scale-log-analytics-with-amazon-s3-amazon-opensearch-service-and-amazon-opensearch-ingestion/

Organizations often need to manage a high volume of data that is growing at an extraordinary rate. At the same time, they need to optimize operational costs to unlock the value of this data for timely insights and do so with a consistent performance.

With this massive data growth, data proliferation across your data stores, data warehouse, and data lakes can become equally challenging. With a modern data architecture on AWS, you can rapidly build scalable data lakes; use a broad and deep collection of purpose-built data services; ensure compliance via unified data access, security, and governance; scale your systems at a low cost without compromising performance; and share data across organizational boundaries with ease, allowing you to make decisions with speed and agility at scale.

You can take all your data from various silos, aggregate that data in your data lake, and perform analytics and machine learning (ML) directly on top of that data. You can also store other data in purpose-built data stores to analyze and get fast insights from both structured and unstructured data. This data movement can be inside-out, outside-in, around the perimeter, or shared across organizational boundaries.

For example, application logs and traces from web applications can be collected directly in a data lake, and a portion of that data can be moved out to a log analytics store like Amazon OpenSearch Service for daily analysis. We think of this concept as inside-out data movement. The analyzed and aggregated data stored in Amazon OpenSearch Service can again be moved to the data lake to run ML algorithms for downstream consumption from applications. We refer to this concept as outside-in data movement.

Let’s look at an example use case. Example Corp. is a leading Fortune 500 company that specializes in social content. They have hundreds of applications generating data and traces at approximately 500 TB per day and have the following criteria:

  • Have logs available for fast analytics for 2 days
  • Beyond 2 days, have data available in a storage tier that can be made available for analytics with a reasonable SLA
  • Retain the data beyond 1 week in cold storage for 30 days (for purposes of compliance, auditing, and others)

In the following sections, we discuss three possible solutions to address similar use cases:

  • Tiered storage in Amazon OpenSearch Service and data lifecycle management
  • On-demand ingestion of logs using Amazon OpenSearch Ingestion
  • Amazon OpenSearch Service direct queries with Amazon Simple Storage Service (Amazon S3)

Solution 1: Tiered storage in OpenSearch Service and data lifecycle management

OpenSearch Service supports three integrated storage tiers: hot, UltraWarm, and cold storage. Based on your data retention, query latency, and budgeting requirements, you can choose the best strategy to balance cost and performance. You can also migrate data between different storage tiers.

Hot storage is used for indexing and updating, and provides the fastest access to data. Hot storage takes the form of an instance store or Amazon Elastic Block Store (Amazon EBS) volumes attached to each node.

UltraWarm offers significantly lower costs per GiB for read-only data that you query less frequently and doesn’t need the same performance as hot storage. UltraWarm nodes use Amazon S3 with related caching solutions to improve performance.

Cold storage is optimized to store infrequently accessed or historical data. When you use cold storage, you detach your indexes from the UltraWarm tier, making them inaccessible. You can reattach these indexes in a few seconds when you need to query that data.

For more details on data tiers within OpenSearch Service, refer to Choose the right storage tier for your needs in Amazon OpenSearch Service.

Solution overview

The workflow for this solution consists of the following steps:

  1. Incoming data generated by the applications is streamed to an S3 data lake.
  2. Data is ingested into Amazon OpenSearch Service using S3-SQS near-real-time ingestion through notifications set up on the S3 buckets.
  3. After 2 days, hot data is migrated to UltraWarm storage to support read queries.
  4. After 5 days in UltraWarm, the data is migrated to cold storage for 21 days and detached from any compute. The data can be reattached to UltraWarm when needed. Data is deleted from cold storage after 21 days.
  5. Daily indexes are maintained for easy rollover. An Index State Management (ISM) policy automates the rollover or deletion of indexes that are older than 2 days.

The following is a sample ISM policy that rolls over data into the UltraWarm tier after 2 days, moves it to cold storage after 5 days, and deletes it from cold storage after 21 days:

{
    "policy": {
        "description": "hot warm delete workflow",
        "default_state": "hot",
        "schema_version": 1,
        "states": [
            {
                "name": "hot",
                "actions": [
                    {
                        "rollover": {
                            "min_index_age": "2d",
                            "min_primary_shard_size": "30gb"
                        }
                    }
                ],
                "transitions": [
                    {
                        "state_name": "warm"
                    }
                ]
            },
            {
                "name": "warm",
                "actions": [
                    {
                        "replica_count": {
                            "number_of_replicas": 5
                        }
                    }
                ],
                "transitions": [
                    {
                        "state_name": "cold",
                        "conditions": {
                            "min_index_age": "5d"
                        }
                    }
                ]
            },
            {
                "name": "cold",
                "actions": [
                    {
                        "retry": {
                            "count": 5,
                            "backoff": "exponential",
                            "delay": "1h"
                        },
                        "cold_migration": {
                            "start_time": null,
                            "end_time": null,
                            "timestamp_field": "@timestamp",
                            "ignore": "none"
                        }
                    }
                ],
                "transitions": [
                    {
                        "state_name": "delete",
                        "conditions": {
                            "min_index_age": "21d"
                        }
                    }
                ]
            },
            {
                "name": "delete",
                "actions": [
                    {
                        "retry": {
                            "count": 3,
                            "backoff": "exponential",
                            "delay": "1m"
                        },
                        "cold_delete": {}
                    }
                ],
                "transitions": []
            }
        ],
        "ism_template": {
            "index_patterns": [
                "log*"
            ],
            "priority": 100
        }
    }
}

Considerations

UltraWarm uses sophisticated caching techniques to enable querying for infrequently accessed data. Although the data access is infrequent, the compute for UltraWarm nodes needs to be running all the time to make this access possible.

When operating at PB scale, to reduce the area of effect of any errors, we recommend decomposing the implementation into multiple OpenSearch Service domains when using tiered storage.

The next two patterns remove the need to have long-running compute and describe on-demand techniques where the data is either brought when needed or queried directly where it resides.

Solution 2: On-demand ingestion of logs data through OpenSearch Ingestion

OpenSearch Ingestion is a fully managed data collector that delivers real-time log and trace data to OpenSearch Service domains. OpenSearch Ingestion is powered by the open source data collector Data Prepper. Data Prepper is part of the open source OpenSearch project.

With OpenSearch Ingestion, you can filter, enrich, transform, and deliver your data for downstream analysis and visualization. You configure your data producers to send data to OpenSearch Ingestion. It automatically delivers the data to the domain or collection that you specify. You can also configure OpenSearch Ingestion to transform your data before delivering it. OpenSearch Ingestion is serverless, so you don’t need to worry about scaling your infrastructure, operating your ingestion fleet, and patching or updating the software.

There are two ways that you can use Amazon S3 as a source to process data with OpenSearch Ingestion. The first option is S3-SQS processing. You can use S3-SQS processing when you require near-real-time scanning of files after they are written to S3. It requires an Amazon Simple Queue Service (Amazon SQS) queue that receives S3 Event Notifications. You can configure your S3 buckets to raise an event any time an object is stored or modified within the bucket, so that the object is processed by the pipeline.
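As a sketch of the Amazon S3 side of S3-SQS processing, the following Python (boto3) snippet configures an S3 bucket to send object-created notifications to an SQS queue that the pipeline polls. The bucket name, queue ARN, and key prefix are placeholders, and the queue's access policy must already allow Amazon S3 to send messages to it.

import boto3

s3 = boto3.client("s3")

# Assumptions: bucket, queue ARN, and key prefix are illustrative placeholders
s3.put_bucket_notification_configuration(
    Bucket="my-application-logs-bucket",
    NotificationConfiguration={
        "QueueConfigurations": [
            {
                "Id": "logs-to-osis-sqs",
                "QueueArn": "arn:aws:sqs:us-east-1:111122223333:osis-ingest-queue",
                "Events": ["s3:ObjectCreated:*"],
                "Filter": {
                    "Key": {"FilterRules": [{"Name": "prefix", "Value": "applogs/"}]}
                },
            }
        ]
    },
)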

Alternatively, you can use a one-time or recurring scheduled scan to batch process data in an S3 bucket. To set up a scheduled scan, configure your pipeline with a schedule at the scan level that applies to all your S3 buckets, or at the bucket level. You can configure scheduled scans with either a one-time scan or a recurring scan for batch processing.

For a comprehensive overview of OpenSearch Ingestion, see Amazon OpenSearch Ingestion. For more information about the Data Prepper open source project, visit Data Prepper.

Solution overview

We present an architecture pattern with the following key components:

  • Application logs are streamed into the data lake, which helps feed hot data into OpenSearch Service in near-real time using OpenSearch Ingestion S3-SQS processing.
  • ISM policies within OpenSearch Service handle index rollovers or deletions. ISM policies let you automate these periodic administrative operations by triggering them based on changes in the index age, index size, or number of documents. For example, you can define a policy that moves your index into a read-only state after 2 days and then deletes it after a set period of 3 days.
  • Cold data is available in the S3 data lake to be consumed on demand into OpenSearch Service using OpenSearch Ingestion scheduled scans.

The following diagram illustrates the solution architecture.

The workflow includes the following steps:

  1. Incoming data generated by the applications is streamed to the S3 data lake.
  2. For the current day, data is ingested into OpenSearch Service using S3-SQS near-real-time ingestion through notifications set up in the S3 buckets.
  3. Daily indexes are maintained for easy rollover. An ISM policy automates the rollover or deletion of indexes that are older than 2 days.
  4. If a request is made for analysis of data older than 2 days and the data is not in the UltraWarm tier, the data is ingested using the Amazon S3 one-time scan feature for the specified time window.

For example, if the present day is January 1, 2024, and you need data from December 28 to December 31, 2023 for analysis, you can create an OpenSearch Ingestion pipeline with an Amazon S3 scan in your YAML configuration, using start_time and end_time to specify when you want the objects in the bucket to be scanned:

version: "2"
ondemand-ingest-pipeline:
  source:
    s3:
      codec:
        newline:
      compression: "gzip"
      scan:
        start_time: 2023-12-28T01:00:00
        end_time: 2023-12-31T09:00:00
        buckets:
          - bucket:
              name: <bucket-name>
      aws:
        region: "us-east-1"
        sts_role_arn: "arn:aws:iam::<acct num>:role/PipelineRole"
    
    acknowledgments: true
  processor:
    - parse_json:
    - date:
        from_time_received: true
        destination: "@timestamp"           
  sink:
    - opensearch:                  
        index: "logs_ondemand_20231231"
        hosts: [ "https://search-XXXX-domain-XXXXXXXXXX.us-east-1.es.amazonaws.com" ]
        aws:                  
          sts_role_arn: "arn:aws:iam::<acct num>:role/PipelineRole"
          region: "us-east-1"

Considerations

Take advantage of compression

Data in Amazon S3 can be compressed, which reduces your overall data footprint and results in significant cost savings. For example, if you are generating 15 PB of raw JSON application logs per month, a compression mechanism like GZIP can reduce that footprint to approximately 1 PB or less.

Stop the pipeline when possible

OpenSearch Ingestion scales automatically between the minimum and maximum OCUs set for the pipeline. After the pipeline has completed the Amazon S3 scan for the specified duration mentioned in the pipeline configuration, the pipeline continues to run for continuous monitoring at the minimum OCUs.

For on-demand ingestion for past time durations where you don’t expect new objects to be created, consider using supported pipeline metrics such as recordsOut.count to create Amazon CloudWatch alarms that can stop the pipeline. For a list of supported metrics, refer to Monitoring pipeline metrics.

CloudWatch alarms perform an action when a CloudWatch metric exceeds a specified value for some amount of time. For example, you might want to monitor recordsOut.count to be 0 for longer than 5 minutes to initiate a request to stop the pipeline through the AWS Command Line Interface (AWS CLI) or API.
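The following sketch illustrates one way to wire this up: a CloudWatch alarm that fires when the pipeline's recordsOut.count metric stays at 0 for 5 minutes, plus a small handler (for example, run in AWS Lambda behind the alarm's SNS action) that stops the pipeline. The SNS topic ARN is a placeholder, and the metric namespace, metric name, and dimensions are assumptions; copy the exact values from the CloudWatch console for your pipeline before using this.

import boto3

cloudwatch = boto3.client("cloudwatch")
osis = boto3.client("osis")

PIPELINE_NAME = "ondemand-ingest-pipeline"

# Alarm: recordsOut.count stays at 0 for 5 minutes.
# Assumptions: namespace, metric name, and dimension names; verify them in the
# CloudWatch console for your pipeline before relying on this alarm.
cloudwatch.put_metric_alarm(
    AlarmName=f"{PIPELINE_NAME}-idle",
    Namespace="AWS/OSIS",
    MetricName=f"{PIPELINE_NAME}.recordsOut.count",
    Dimensions=[{"Name": "PipelineName", "Value": PIPELINE_NAME}],
    Statistic="Sum",
    Period=300,
    EvaluationPeriods=1,
    Threshold=0,
    ComparisonOperator="LessThanOrEqualToThreshold",
    TreatMissingData="breaching",
    AlarmActions=["arn:aws:sns:us-east-1:111122223333:stop-pipeline-topic"],  # placeholder
)

def handler(event, context):
    """Invoked (for example, by a Lambda function subscribed to the SNS topic) to stop the pipeline."""
    osis.stop_pipeline(PipelineName=PIPELINE_NAME)
    return {"stopped": PIPELINE_NAME}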

Solution 3: OpenSearch Service direct queries with Amazon S3

OpenSearch Service direct queries with Amazon S3 (preview) is a new way to query operational logs in Amazon S3 and S3 data lakes without needing to switch between services. You can now analyze infrequently queried data in cloud object stores and simultaneously use the operational analytics and visualization capabilities of OpenSearch Service.

OpenSearch Service direct queries with Amazon S3 provides zero-ETL integration to reduce the operational complexity of duplicating data or managing multiple analytics tools by enabling you to directly query your operational data, reducing costs and time to action. This zero-ETL integration is configurable within OpenSearch Service, where you can take advantage of various log type templates, including predefined dashboards, and configure data accelerations tailored to that log type. Templates include VPC Flow Logs, Elastic Load Balancing logs, and NGINX logs, and accelerations include skipping indexes, materialized views, and covering indexes.

With OpenSearch Service direct queries with Amazon S3, you can perform complex queries that are critical to security forensics and threat analysis and correlate data across multiple data sources, which aids teams in investigating service downtime and security events. After you create an integration, you can start querying your data directly from OpenSearch Dashboards or the OpenSearch API. You can audit connections to ensure that they are set up in a scalable, cost-efficient, and secure way.

Direct queries from OpenSearch Service to Amazon S3 use Spark tables within the AWS Glue Data Catalog. After the table is cataloged in your AWS Glue metadata catalog, you can run queries directly on your data in your S3 data lake through OpenSearch Dashboards.

Solution overview

The following diagram illustrates the solution architecture.

This solution consists of the following key components:

  • The hot data for the current day is stream processed into OpenSearch Service domains through the event-driven architecture pattern using the OpenSearch Ingestion S3-SQS processing feature
  • The hot data lifecycle is managed through ISM policies attached to daily indexes
  • The cold data resides in your Amazon S3 bucket, and is partitioned and cataloged

The following screenshot shows a sample http_logs table that is cataloged in the AWS Glue metadata catalog. For detailed steps, refer to Data Catalog and crawlers in AWS Glue.

Before you create a data source, you should have an OpenSearch Service domain with version 2.11 or later and a target S3 table in the AWS Glue Data Catalog with the appropriate AWS Identity and Access Management (IAM) permissions. The IAM role needs access to the desired S3 buckets and read and write access to the AWS Glue Data Catalog. The following is a sample trust policy that allows OpenSearch Service to assume the role used to access the AWS Glue Data Catalog:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "Service": "directquery.opensearchservice.amazonaws.com"
            },
            "Action": "sts:AssumeRole"
        }
    ]
}

The following is a sample custom policy with access to Amazon S3 and AWS Glue:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "Statement1",
            "Effect": "Allow",
            "Action": "es:ESHttp*",
            "Resource": "arn:aws:es:*:<acct_num>:domain/*"
        },
        {
            "Sid": "Statement2",
            "Effect": "Allow",
            "Action": [
                "s3:Get*",
                "s3:List*",
                "s3:Put*",
                "s3:Describe*"
            ],
            "Resource": [
                "arn:aws:s3:::<bucket-name>",
                "arn:aws:s3:::<bucket-name>/*"
            ]
        },
        {
            "Sid": "GlueCreateAndReadDataCatalog",
            "Effect": "Allow",
            "Action": [
                "glue:GetDatabase",
                "glue:CreateDatabase",
                "glue:GetDatabases",
                "glue:CreateTable",
                "glue:GetTable",
                "glue:UpdateTable",
                "glue:DeleteTable",
                "glue:GetTables",
                "glue:GetPartition",
                "glue:GetPartitions",
                "glue:CreatePartition",
                "glue:BatchCreatePartition",
                "glue:GetUserDefinedFunctions"
            ],
            "Resource": [
                "arn:aws:glue:us-east-1:<acct_num>:catalog",
                "arn:aws:glue:us-east-1:<acct_num>:database/*",
                "arn:aws:glue:us-east-1:<acct_num>:table/*"
            ]
        }
    ]
}

To create a new data source on the OpenSearch Service console, provide the name of your new data source, specify the data source type as Amazon S3 with the AWS Glue Data Catalog, and choose the IAM role for your data source.
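As an alternative to the console, the following sketch creates the same kind of data source with the boto3 OpenSearch Service client (the AddDataSource API). The domain name, data source name, and role ARN are placeholders, and this assumes a recent SDK version that includes the direct query data source operations.

import boto3

opensearch = boto3.client("opensearch")

# Assumptions: domain name, data source name, and IAM role ARN are placeholders
response = opensearch.add_data_source(
    DomainName="my-opensearch-domain",
    Name="s3-glue-datasource",
    DataSourceType={
        "S3GlueDataCatalog": {
            "RoleArn": "arn:aws:iam::111122223333:role/direct-query-role"
        }
    },
    Description="Direct query data source for the S3 data lake",
)
print(response)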

After you create a data source, you can go to OpenSearch Dashboards for the domain, which you use to configure access control, define tables, set up log type-based dashboards for popular log types, and query your data.

After you set up your tables, you can query your data in your S3 data lake through OpenSearch Dashboards. You can run a sample SQL query for the http_logs table you created in the AWS Glue Data Catalog tables, as shown in the following screenshot.

Best practices

Ingest only the data you need

Work backward from your business needs and establish the right datasets you’ll need. Evaluate if you can avoid ingesting noisy data and ingest only curated, sampled, or aggregated data. Using these cleaned and curated datasets will help you optimize the compute and storage resources needed to ingest this data.

Reduce the size of data before ingestion

When you design your data ingestion pipelines, use strategies such as compression, filtering, and aggregation to reduce the size of the ingested data. This will permit smaller data sizes to be transferred over the network and stored in your data layer.
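As a simple illustration of compression before ingestion, the following sketch batches JSON log lines, compresses them with GZIP, and writes the compressed object to Amazon S3, where the OpenSearch Ingestion S3 source shown earlier (compression: "gzip") can read it. The bucket name and key are placeholders, and the records would come from your application.

import gzip
import json
import boto3

s3 = boto3.client("s3")

# Assumptions: bucket and key are placeholders; records come from your application
records = [{"level": "INFO", "message": f"event {i}"} for i in range(1000)]
payload = "\n".join(json.dumps(r) for r in records).encode("utf-8")

compressed = gzip.compress(payload)  # typically shrinks JSON logs substantially

s3.put_object(
    Bucket="my-application-logs-bucket",
    Key="applogs/2024/01/01/batch-0001.json.gz",
    Body=compressed,
    ContentEncoding="gzip",
    ContentType="application/json",
)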

Conclusion

In this post, we discussed solutions that enable petabyte-scale log analytics using OpenSearch Service in a modern data architecture. You learned how to create a serverless ingestion pipeline to deliver logs to an OpenSearch Service domain, manage indexes through ISM policies, configure IAM permissions to start using OpenSearch Ingestion, and create the pipeline configuration for data in your data lake. You also learned how to set up and use the OpenSearch Service direct queries with Amazon S3 feature (preview) to query data from your data lake.

To choose the right architecture pattern for your workloads when using OpenSearch Service at scale, consider performance, latency, cost, and data volume growth over time:

  • Use the tiered storage architecture with Index State Management policies when you need fast access to your hot data and want to balance cost and performance with UltraWarm nodes for read-only data.
  • Use on-demand ingestion of your data into OpenSearch Service when you can tolerate ingestion latencies for querying data that isn't retained in your hot nodes. You can achieve significant cost savings when using compressed data in Amazon S3 and ingesting data on demand into OpenSearch Service.
  • Use the direct queries with Amazon S3 feature when you want to directly analyze your operational logs in Amazon S3 with the rich analytics and visualization features of OpenSearch Service.

As a next step, refer to the Amazon OpenSearch Service Developer Guide to explore log and metric pipelines that you can use to build a scalable observability solution for your enterprise applications.


About the Authors

Jagadish Kumar (Jag) is a Senior Specialist Solutions Architect at AWS focused on Amazon OpenSearch Service. He is deeply passionate about Data Architecture and helps customers build analytics solutions at scale on AWS.


Muthu Pitchaimani is a Senior Specialist Solutions Architect with Amazon OpenSearch Service. He builds large-scale search applications and solutions. Muthu is interested in the topics of networking and security, and is based out of Austin, Texas.


Sam Selvan is a Principal Specialist Solutions Architect with Amazon OpenSearch Service.

Use AWS Glue ETL to perform merge, partition evolution, and schema evolution on Apache Iceberg

Post Syndicated from Satyanarayana Adimula original https://aws.amazon.com/blogs/big-data/use-aws-glue-etl-to-perform-merge-partition-evolution-and-schema-evolution-on-apache-iceberg/

As enterprises collect increasing amounts of data from various sources, the structure and organization of that data often need to change over time to meet evolving analytical needs. However, altering schema and table partitions in traditional data lakes can be a disruptive and time-consuming task, requiring renaming or recreating entire tables and reprocessing large datasets. This hampers agility and time to insight.

Schema evolution enables adding, deleting, renaming, or modifying columns without needing to rewrite existing data. This is critical for fast-moving enterprises to augment data structures to support new use cases. For example, an ecommerce company may add new customer demographic attributes or order status flags to enrich analytics. Apache Iceberg manages these schema changes in a backward-compatible way through its innovative metadata table evolution architecture.

Similarly, partition evolution allows seamless adding, dropping, or splitting partitions. For instance, an ecommerce marketplace may initially partition order data by day. As orders accumulate, and querying by day becomes inefficient, they may split to day and customer ID partitions. Table partitioning organizes big datasets most efficiently for query performance. Iceberg gives enterprises the flexibility to incrementally adjust partitions rather than requiring tedious rebuild procedures. New partitions can be added in a fully compatible way without downtime or having to rewrite existing data files.

This post demonstrates how you can harness Iceberg, Amazon Simple Storage Service (Amazon S3), AWS Glue, AWS Lake Formation, and AWS Identity and Access Management (IAM) to implement a transactional data lake supporting seamless evolution. By allowing for painless schema and partition adjustments as data insights evolve, you can benefit from the future-proof flexibility needed for business success.

Overview of solution

For our example use case, a fictional large ecommerce company processes thousands of orders each day. When orders are received, updated, cancelled, shipped, delivered, or returned, the changes are made in their on-premises system, and those changes need to be replicated to an S3 data lake so that data analysts can run queries through Amazon Athena. The changes can contain schema updates as well. Due to the security requirements of different organizations, they need to manage fine-grained access control for the analysts through Lake Formation.

The following diagram illustrates the solution architecture.

The solution workflow includes the following key steps:

  1. Ingest data from on premises into a Dropzone location using a data ingestion pipeline.
  2. Merge the data from the Dropzone location into Iceberg using AWS Glue.
  3. Query the data using Athena.

Prerequisites

For this walkthrough, you should have the following prerequisites:

Set up the infrastructure with AWS CloudFormation

To create your infrastructure with an AWS CloudFormation template, complete the following steps:

  1. Log in as an administrator to your AWS account.
  2. Open the AWS CloudFormation console.
  3. Choose Launch Stack:
  4. For Stack name, enter a name (for this post, icebergdemo1).
  5. Choose Next.
  6. Provide information for the following parameters:
    1. DatalakeUserName
    2. DatalakeUserPassword
    3. DatabaseName
    4. TableName
    5. DatabaseLFTagKey
    6. DatabaseLFTagValue
    7. TableLFTagKey
    8. TableLFTagValue
  7. Choose Next.
  8. Choose Next again.
  9. In the Review section, review the values you entered.
  10. Select I acknowledge that AWS CloudFormation might create IAM resources with custom names and choose Submit.

In a few minutes, the stack status will change to CREATE_COMPLETE.

You can go to the Outputs tab of the stack to see all the resources it has provisioned. The resources are prefixed with the stack name you provided (for this post, icebergdemo1).

Create an Iceberg table using Lambda and grant access using Lake Formation

To create an Iceberg table and grant access on it, complete the following steps:

  1. Navigate to the Resources tab of the CloudFormation stack icebergdemo1 and search for logical ID named LambdaFunctionIceberg.
  2. Choose the hyperlink of the associated physical ID.

You’re redirected to the Lambda function icebergdemo1-Lambda-Create-Iceberg-and-Grant-access.

  3. On the Configuration tab, choose Environment variables in the left pane.
  4. On the Code tab, you can inspect the function code.

The function uses the AWS SDK for Python (Boto3) APIs to provision the resources. It assumes the provisioned data lake admin role to perform the following tasks:

  • Grant DATA_LOCATION_ACCESS access to the data lake admin role on the registered data lake location
  • Create Lake Formation Tags (LF-Tags)
  • Create a database in the AWS Glue Data Catalog using the AWS Glue create_database API
  • Assign LF-Tags to the database
  • Grant DESCRIBE access on the database using LF-Tags to the data lake IAM user and AWS Glue ETL IAM role
  • Create an Iceberg table using the AWS Glue create_table API:
response_create_table = glue_client.create_table(
    DatabaseName='icebergdb1',
    OpenTableFormatInput={
        'IcebergInput': {
            'MetadataOperation': 'CREATE',
            'Version': '2'
        }
    },
    TableInput={
        'Name': 'ecomorders',
        'StorageDescriptor': {
            'Columns': [
                {'Name': 'ordernum', 'Type': 'int'},
                {'Name': 'sku', 'Type': 'string'},
                {'Name': 'quantity', 'Type': 'int'},
                {'Name': 'category', 'Type': 'string'},
                {'Name': 'status', 'Type': 'string'},
                {'Name': 'shipping_id', 'Type': 'string'}
            ],
            'Location': 's3://icebergdemo1-s3bucketiceberg-vthvwwblrwe8/iceberg/'
        },
        'TableType': 'EXTERNAL_TABLE'
    }
)
  • Assign LF-Tags to the table
  • Grant DESCRIBE and SELECT on the Iceberg table LF-Tags for the data lake IAM user
  • Grant ALL, DESCRIBE, SELECT, INSERT, DELETE, and ALTER access on the Iceberg table LF-Tags to the AWS Glue ETL IAM role
  5. On the Test tab, choose Test to run the function.

When the function is complete, you will see the message “Executing function: succeeded.”

Lake Formation helps you centrally manage, secure, and globally share data for analytics and machine learning. With Lake Formation, you can manage fine-grained access control for your data lake data on Amazon S3 and its metadata in the Data Catalog.

To add an Amazon S3 location as Iceberg storage in your data lake, register the location with Lake Formation. You can then use Lake Formation permissions for fine-grained access control to the Data Catalog objects that point to this location, and to the underlying data in the location.

The CloudFormation stack registered the data lake location.

Data location permissions in Lake Formation enable principals to create and alter Data Catalog resources that point to the designated registered Amazon S3 locations. Data location permissions work in addition to Lake Formation data permissions to secure information in your data lake.

Lake Formation tag-based access control (LF-TBAC) is an authorization strategy that defines permissions based on attributes. In Lake Formation, these attributes are called LF-Tags. You can attach LF-Tags to Data Catalog resources, Lake Formation principals, and table columns. You can assign and revoke permissions on Lake Formation resources using these LF-Tags. Lake Formation allows operations on those resources when the principal’s tag matches the resource tag.

Verify the Iceberg table from the Lake Formation console

To verify the Iceberg table, complete the following steps:

  1. On the Lake Formation console, choose Databases in the navigation pane.
  2. Open the details page for icebergdb1.

You can see the associated database LF-Tags.

  1. Choose Tables in the navigation pane.
  2. Open the details page for ecomorders.

In the Table details section, you can observe the following:

  • Table format shows as Apache Iceberg
  • Table management shows as Managed by Data Catalog
  • Location lists the data lake location of the Iceberg table

In the LF-Tags section, you can see the associated table LF-Tags.

In the Table details section, expand Advanced table properties to view the following:

  • metadata_location points to the location of the Iceberg table’s metadata file
  • table_type shows as ICEBERG

On the Schema tab, you can view the columns defined on the Iceberg table.

Integrate Iceberg with the AWS Glue Data Catalog and Amazon S3

Iceberg tracks individual data files in a table instead of directories. When there is an explicit commit on the table, Iceberg creates data files and adds them to the table. Iceberg maintains the table state in metadata files. Any change in table state creates a new metadata file that atomically replaces the older metadata. Metadata files track the table schema, partitioning configuration, and other properties.

Iceberg requires file systems that support the operations to be compatible with object stores like Amazon S3.

Iceberg creates snapshots for the table contents. Each snapshot is a complete set of data files in the table at a point in time. Data files in snapshots are stored in one or more manifest files that contain a row for each data file in the table, its partition data, and its metrics.

The following diagram illustrates this hierarchy.

When you create an Iceberg table, it creates the metadata folder first and a metadata file in the metadata folder. The data folder is created when you load data into the Iceberg table.

Contents of the Iceberg metadata file

The Iceberg metadata file contains a lot of information, including the following:

  • format-version – Version of the Iceberg table
  • location – Amazon S3 location of the table
  • schemas – Name and data type of all columns on the table
  • partition-specs – Partitioned columns
  • sort-orders – Sort order of columns
  • properties – Table properties
  • current-snapshot-id – Current snapshot
  • refs – Table references
  • snapshots – List of snapshots, each containing the following information:
    • sequence-number – Sequence number of snapshots in chronological order (the highest number represents the current snapshot, 1 for the first snapshot)
    • snapshot-id – Snapshot ID
    • timestamp-ms – Timestamp when the snapshot was committed
    • summary – Summary of changes committed
    • manifest-list – List of manifests; this file name starts with snap-<snapshot-id>
  • schema-id – Sequence number of the schema in chronological order (the highest number represents the current schema)
  • snapshot-log – List of snapshots in chronological order
  • metadata-log – List of metadata files in chronological order

The metadata file has all the historical changes to the table's data and schema. Reviewing the contents of the metadata file directly can be a time-consuming task. Fortunately, you can query the Iceberg metadata using Athena.
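For example, the following sketch runs an Athena query against the table's $history metadata table using boto3. The database, table, and workgroup names follow this post's example, the workgroup is assumed to have a query result location configured, and other metadata tables such as $files and $partitions can be queried the same way.

import time
import boto3

athena = boto3.client("athena")

query = 'SELECT * FROM "icebergdb1"."ecomorders$history"'

execution = athena.start_query_execution(
    QueryString=query,
    QueryExecutionContext={"Database": "icebergdb1"},
    WorkGroup="icebergdemo1-workgroup",
)
query_id = execution["QueryExecutionId"]

# Poll until the query finishes, then print the rows
while True:
    state = athena.get_query_execution(QueryExecutionId=query_id)["QueryExecution"]["Status"]["State"]
    if state in ("SUCCEEDED", "FAILED", "CANCELLED"):
        break
    time.sleep(1)

if state == "SUCCEEDED":
    for row in athena.get_query_results(QueryExecutionId=query_id)["ResultSet"]["Rows"]:
        print([col.get("VarCharValue") for col in row["Data"]])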

Iceberg framework in AWS Glue

AWS Glue 4.0 supports Iceberg tables registered with Lake Formation. In the AWS Glue ETL jobs, you need the following code to enable the Iceberg framework:

import sys
import boto3
from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from pyspark.context import SparkContext
from pyspark.conf import SparkConf

aws_account_id = boto3.client('sts').get_caller_identity().get('Account')

args = getResolvedOptions(sys.argv, ['JOB_NAME', 'warehouse_path'])
    
# Set up configuration for AWS Glue to work with Apache Iceberg
conf = SparkConf()
conf.set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
conf.set("spark.sql.catalog.glue_catalog", "org.apache.iceberg.spark.SparkCatalog")
conf.set("spark.sql.catalog.glue_catalog.warehouse", args['warehouse_path'])
conf.set("spark.sql.catalog.glue_catalog.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog")
conf.set("spark.sql.catalog.glue_catalog.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
conf.set("spark.sql.catalog.glue_catalog.glue.lakeformation-enabled", "true")
conf.set("spark.sql.catalog.glue_catalog.glue.id", aws_account_id)

sc = SparkContext(conf=conf)
glueContext = GlueContext(sc)
spark = glueContext.spark_session

For read/write access to underlying data, in addition to Lake Formation permissions, the AWS Glue IAM role used to run the AWS Glue ETL jobs was granted the lakeformation:GetDataAccess IAM permission. With this permission, Lake Formation grants the request for temporary credentials to access the data.

The CloudFormation stack provisioned the four AWS Glue ETL jobs for you. The name of each job starts with your stack name (icebergdemo1). Complete the following steps to view the jobs:

  1. Log in as an administrator to your AWS account.
  2. On the AWS Glue console, choose ETL jobs in the navigation pane.
  3. Search for jobs with icebergdemo1 in the name.

Merge data from Dropzone into the Iceberg table

For our use case, the company ingests their ecommerce orders data daily from their on-premises location into an Amazon S3 Dropzone location. The CloudFormation stack loaded three files with sample orders for 3 days, as shown in the following figures. You see the data in the Dropzone location s3://icebergdemo1-s3bucketdropzone-kunftrcblhsk/data.

The AWS Glue ETL job icebergdemo1-GlueETL1-merge will run daily to merge the data into the Iceberg table. It has the following logic to add or update the data on Iceberg:

  • Create a Spark DataFrame from input data:
df = spark.read.format(dropzone_dataformat).option("header", True).load(dropzone_path)
df = df.withColumn("ordernum", df["ordernum"].cast(IntegerType())) \
    .withColumn("quantity", df["quantity"].cast(IntegerType()))
df.createOrReplaceTempView("input_table")
  • For a new order, add it to the table
  • If the table has a matching order, update the status and shipping_id:
stmt_merge = f"""
    MERGE INTO glue_catalog.{database_name}.{table_name} AS t
    USING input_table AS s 
    ON t.ordernum= s.ordernum
    WHEN MATCHED 
            THEN UPDATE SET 
                t.status = s.status,
                t.shipping_id = s.shipping_id
    WHEN NOT MATCHED THEN INSERT *
    """
spark.sql(stmt_merge)

Complete the following steps to run the AWS Glue merge job:

  1. On the AWS Glue console, choose ETL jobs in the navigation pane.
  2. Select the ETL job icebergdemo1-GlueETL1-merge.
  3. On the Actions dropdown menu, choose Run with parameters.
  4. On the Run parameters page, go to Job parameters.
  5. For the --dropzone_path parameter, provide the S3 location of the input data (icebergdemo1-s3bucketdropzone-kunftrcblhsk/data/merge1).
  6. Run the job to add all the orders: 1001, 1002, 1003, and 1004.
  7. For the --dropzone_path parameter, change the S3 location to icebergdemo1-s3bucketdropzone-kunftrcblhsk/data/merge2.
  8. Run the job again to add orders 2001 and 2002, and update orders 1001, 1002, and 1003.
  9. For the --dropzone_path parameter, change the S3 location to icebergdemo1-s3bucketdropzone-kunftrcblhsk/data/merge3.
  10. Run the job again to add order 3001 and update orders 1001, 1003, 2001, and 2002.

Go to the data folder of the table to see the data files written by Iceberg when you merged the data into the table using the AWS Glue ETL job icebergdemo1-GlueETL1-merge.

Query Iceberg using Athena

The CloudFormation stack created the IAM user iceberguser1, which has read access on the Iceberg table using LF-Tags. To query Iceberg using Athena via this user, complete the following steps:

  1. Log in as iceberguser1 to the AWS Management Console.
  2. On the Athena console, choose Workgroups in the navigation pane.
  3. Locate the workgroup that CloudFormation provisioned (icebergdemo1-workgroup).
  4. Verify Athena engine version 3.

Athena engine version 3 supports the Iceberg table format, with underlying data files in Parquet, ORC, and Avro formats.

  5. Go to the Athena query editor.
  6. Choose the workgroup icebergdemo1-workgroup on the dropdown menu.
  7. For Database, choose icebergdb1. You will see the table ecomorders.
  8. Run the following query to see the data in the Iceberg table:
    SELECT * FROM "icebergdb1"."ecomorders" ORDER BY ordernum ;

  9. Run the following query to see the table's current partitions:
    DESCRIBE icebergdb1.ecomorders ;

Partition-spec describes how the table is partitioned. In this example, there are no partitioned fields because you didn't define any partitions on the table.

Iceberg partition evolution

You may need to change your partition structure; for example, due to trend changes of common query patterns in downstream analytics. A change of partition structure for traditional tables is a significant operation that requires an entire data copy.

Iceberg makes this straightforward. When you change the partition structure on Iceberg, it doesn’t require you to rewrite the data files. The old data written with earlier partitions remains unchanged. New data is written using the new specifications in a new layout. Metadata for each of the partition versions is kept separately.

Let’s add the partition field category to the Iceberg table using the AWS Glue ETL job icebergdemo1-GlueETL2-partition-evolution:

ALTER TABLE glue_catalog.icebergdb1.ecomorders
    ADD PARTITION FIELD category ;

On the AWS Glue console, run the ETL job icebergdemo1-GlueETL2-partition-evolution. When the job is complete, you can query partitions using Athena.

DESCRIBE icebergdb1.ecomorders ;

SELECT * FROM "icebergdb1"."ecomorders$partitions";

You can see the partition field category, but the partition values are null. There are no new data files in the data folder, because partition evolution is a metadata operation and doesn’t rewrite data files. When you add or update data, you will see the corresponding partition values populated.

Iceberg schema evolution

Iceberg supports in-place table evolution. You can evolve a table schema just like SQL. Iceberg schema updates are metadata changes, so no data files need to be rewritten to perform the schema evolution.

To explore the Iceberg schema evolution, run the ETL job icebergdemo1-GlueETL3-schema-evolution via the AWS Glue console. The job runs the following SparkSQL statements:

ALTER TABLE glue_catalog.icebergdb1.ecomorders
    ADD COLUMNS (shipping_carrier string) ;

ALTER TABLE glue_catalog.icebergdb1.ecomorders
    RENAME COLUMN shipping_id TO tracking_number ;

ALTER TABLE glue_catalog.icebergdb1.ecomorders
    ALTER COLUMN ordernum TYPE bigint ;

In the Athena query editor, run the following query:

SELECT * FROM "icebergdb1"."ecomorders" ORDER BY ordernum asc ;

You can verify the schema changes to the Iceberg table:

  • A new column has been added called shipping_carrier
  • The column shipping_id has been renamed to tracking_number
  • The data type of the column ordernum has changed from int to bigint
    DESCRIBE icebergdb1.ecomorders;

Positional update

The data in tracking_number contains the shipping carrier concatenated with the tracking number. Let’s assume that we want to split this data in order to keep the shipping carrier in the shipping_carrier field and the tracking number in the tracking_number field.

On the AWS Glue console, run the ETL job icebergdemo1-GlueETL4-update-table. The job runs the following SparkSQL statement to update the table:

UPDATE glue_catalog.icebergdb1.ecomorders
SET shipping_carrier = substring(tracking_number,1,3),
    tracking_number = substring(tracking_number,4,50)
WHERE tracking_number != '' ;

Query the Iceberg table to verify the updated data on tracking_number and shipping_carrier.

SELECT * FROM "icebergdb1"."ecomorders" ORDER BY ordernum ;

Now that the data has been updated on the table, you should see the partition values populated for category:

SELECT * FROM "icebergdb1"."ecomorders$partitions"
ORDER BY partition;

Clean up

To avoid incurring future charges, clean up the resources you created:

  1. On the Lambda console, open the details page for the function icebergdemo1-Lambda-Create-Iceberg-and-Grant-access.
  2. In the Environment variables section, choose the key Task_To_Perform and update the value to CLEANUP.
  3. Run the function, which drops the database, table, and their associated LF-Tags.
  4. On the AWS CloudFormation console, delete the stack icebergdemo1.

Conclusion

In this post, you created an Iceberg table using the AWS Glue API and used Lake Formation to control access on the Iceberg table in a transactional data lake. With AWS Glue ETL jobs, you merged data into the Iceberg table, and performed schema evolution and partition evolution without rewriting or recreating the Iceberg table. With Athena, you queried the Iceberg data and metadata.

Based on the concepts and demonstrations from this post, you can now build a transactional data lake in an enterprise using Iceberg, AWS Glue, Lake Formation, and Amazon S3.


About the Author

Satya Adimula is a Senior Data Architect at AWS based in Boston. With over two decades of experience in data and analytics, Satya helps organizations derive business insights from their data at scale.

Introducing the AWS WAF traffic overview dashboard

Post Syndicated from Dmitriy Novikov original https://aws.amazon.com/blogs/security/introducing-the-aws-waf-traffic-overview-dashboard/

For many network security operators, protecting application uptime can be a time-consuming challenge of baselining network traffic, investigating suspicious senders, and determining how best to mitigate risks. Simplifying this process and understanding network security posture at all times is the goal of most IT organizations that are trying to scale their applications without also needing to scale their security operations center staff. To help you with this challenge, AWS WAF introduced traffic overview dashboards so that you can make informed decisions about your security posture when your application is protected by AWS WAF.

In this post, we introduce the new dashboards and delve into a few use cases to help you gain better visibility into the overall security of your applications using AWS WAF and make informed decisions based on insights from the dashboards.

Introduction to traffic overview dashboards

The traffic overview dashboard in AWS WAF displays an overview of security-focused metrics so that you can identify and take action on security risks in a few clicks, such as adding rate-based rules during distributed denial of service (DDoS) events. The dashboards include near real-time summaries of the Amazon CloudWatch metrics that AWS WAF collects when it evaluates your application web traffic.

These dashboards are available by default and require no additional setup. They show metrics—total requests, blocked requests, allowed requests, bot compared to non-bot requests, bot categories, CAPTCHA solve rate, top 10 matched rules, and more—for each web access control list (web ACL) that you monitor with AWS WAF.

You can access default metrics such as the total number of requests, blocked requests, and common attacks blocked, or you can customize your dashboard with the metrics and visualizations that are most important to you.

These dashboards provide enhanced visibility and help you answer questions such as these:

  • What percent of the traffic that AWS WAF inspected is getting blocked?
  • What are the top originating countries for the traffic that’s getting blocked?
  • What are common attacks that AWS WAF detects and protects me from?
  • How do my traffic patterns from this week compare with last week?

The dashboard has native and out-of-the-box integration with CloudWatch. Using this integration, you can navigate back and forth between the dashboard and CloudWatch; for example, you can get a more granular metric overview by viewing the dashboard in CloudWatch. You can also add existing CloudWatch widgets and metrics to the traffic overview dashboard, bringing your tried-and-tested visibility structure into the dashboard.
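If you want to pull the same underlying metrics into your own tooling or widgets, the following sketch retrieves the BlockedRequests metric for a web ACL from CloudWatch with boto3. The web ACL name, Region, and time range are placeholders, and the dimensions follow the AWS/WAFV2 namespace (Rule set to ALL aggregates across the web ACL).

from datetime import datetime, timedelta
import boto3

cloudwatch = boto3.client("cloudwatch")

# Assumptions: web ACL name and Region are placeholders
response = cloudwatch.get_metric_statistics(
    Namespace="AWS/WAFV2",
    MetricName="BlockedRequests",
    Dimensions=[
        {"Name": "WebACL", "Value": "my-web-acl"},
        {"Name": "Region", "Value": "us-east-1"},
        {"Name": "Rule", "Value": "ALL"},
    ],
    StartTime=datetime.utcnow() - timedelta(hours=3),
    EndTime=datetime.utcnow(),
    Period=300,
    Statistics=["Sum"],
)
for point in sorted(response["Datapoints"], key=lambda p: p["Timestamp"]):
    print(point["Timestamp"], point["Sum"])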

With the introduction of the traffic overview dashboard, one AWS WAF tool—Sampled requests—is now a standalone tab inside a web ACL. In this tab, you can view a graph of the rule matches for web requests that AWS WAF has inspected. Additionally, if you have enabled request sampling, you can see a table view of a sample of the web requests that AWS WAF has inspected.

The sample of requests contains up to 100 requests that matched the criteria for a rule in the web ACL and another 100 for requests that didn't match any rules and thus had the web ACL's default action applied. The requests in the sample come from the protected resources that have received requests for your content in the previous three hours.
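You can also retrieve this sample programmatically. The following sketch calls GetSampledRequests through boto3 for the last three hours; the web ACL ARN and rule metric name are placeholders, and you would use Scope='CLOUDFRONT' for web ACLs associated with CloudFront distributions.

from datetime import datetime, timedelta
import boto3

wafv2 = boto3.client("wafv2")

# Assumptions: web ACL ARN and rule metric name are placeholders
response = wafv2.get_sampled_requests(
    WebAclArn="arn:aws:wafv2:us-east-1:111122223333:regional/webacl/my-web-acl/abcd1234",
    RuleMetricName="my-rule-metric",
    Scope="REGIONAL",
    TimeWindow={
        "StartTime": datetime.utcnow() - timedelta(hours=3),
        "EndTime": datetime.utcnow(),
    },
    MaxItems=100,
)
for sample in response["SampledRequests"]:
    print(sample["Action"], sample["Request"]["Country"], sample["Request"]["URI"])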

The following figure shows a typical layout for the traffic overview dashboard. It categorizes inspected requests with a breakdown of each of the categories that display actionable insights, such as attack types, client device types, and countries. Using this information and comparing it with your expected traffic profile, you can decide whether to investigate further or block the traffic right away. For the example in Figure 1, you might want to block France-originating requests from mobile devices if your web application isn’t supposed to receive traffic from France and is a desktop-only application.

Figure 1: Dashboard with sections showing multiple categories serves as a single pane of glass

Use case 1: Analyze traffic patterns with the dashboard

In addition to visibility into your web traffic, you can use the new dashboard to analyze patterns that could indicate potential threats or issues. By reviewing the dashboard’s graphs and metrics, you can spot unusual spikes or drops in traffic that deserve further investigation.

The top-level overview shows the high-level traffic volume and patterns. From there, you can drill down into the web ACL metrics to see traffic trends and metrics for specific rules and rule groups. The dashboard displays metrics such as allowed requests, blocked requests, and more.

Notifications or alerts about a deviation from expected traffic patterns provide you a signal to explore the event. During your exploration, you can use the dashboard to understand the broader context and not just the event in isolation. This makes it simpler to detect a trend in anomalies that could signify a security event or misconfigured rules. For example, if you normally get 2,000 requests per minute from a particular country, but suddenly see 10,000 requests per minute from it, you should investigate. Using the dashboard, you can look at the traffic across various dimensions. The spike in requests alone might not be a clear indication of a threat, but if you see an additional indicator, such as an unexpected device type, this could be a strong reason for you to take follow-up action.

The following figure shows the actions taken by rules in a web ACL and which rule matched the most.

Figure 2: Multidimensional overview of the web requests

The dashboard also shows the top blocked and allowed requests over time. Check whether unusual spikes in blocked requests correspond to spikes in traffic from a particular IP address, country, or user agent. That could indicate attempted malicious activity or bot traffic.

The following figure shows a disproportionately larger number of matches to a rule indicating that a particular vector is used against a protected web application.

Figure 3: The top terminating rule could indicate a particular vector of an attack

Likewise, review the top allowed requests. If you see a spike in traffic to a specific URL, you should investigate whether your application is working properly.

Next steps after you analyze traffic

After you’ve analyzed the traffic patterns, here are some next steps to consider:

  • Tune your AWS WAF rules to better match legitimate or malicious traffic based on your findings. You might be able to fine-tune rules to reduce false positives or false negatives. Tune rules that are blocking legitimate traffic by adjusting regular expressions or conditions.
  • Configure AWS WAF logging, and if you have a dedicated security information and event management (SIEM) solution, integrate the logging to enable automated alerting for anomalies.
  • Set up AWS WAF to automatically block known malicious IPs. You can maintain an IP block list based on identified threat actors. Additionally, you can use the Amazon IP reputation list managed rule group, which the Amazon Threat Research Team regularly updates.
  • If you see spikes in traffic to specific pages, check that your web applications are functioning properly to rule out application issues driving unusual patterns.
  • Add new rules to block new attack patterns that you spot in the traffic flows. Then review the metrics to help confirm the impact of the new rules.
  • Monitor source IPs for DDoS events and other malicious spikes. Use AWS WAF rate-based rules to help mitigate these spikes.
  • If you experience traffic floods, implement additional layers of protection by using CloudFront with DDoS protection.

The new dashboard gives you valuable insight into the traffic that reaches your applications and takes the guesswork out of traffic analysis. Using the insights that it provides, you can fine-tune your AWS WAF protections and block threats before they affect availability or data. Analyze the data regularly to help detect potential threats and make informed decisions about optimizing your protections.

As an example, if you see an unexpected spike of traffic, which looks conspicuous in the dashboard compared to historical traffic patterns, from a country where you don’t anticipate traffic originating from, you can create a geographic match rule statement in your web ACL to block this traffic and prevent it from reaching your web application.
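The following sketch shows what that could look like with boto3: it appends a geographic match rule that blocks requests from a country you don't expect traffic from to an existing web ACL. The web ACL name, ID, country code, and priority are placeholders, and because update_web_acl replaces the full rule list, the existing rules are fetched first.

import boto3

wafv2 = boto3.client("wafv2")

# Assumptions: web ACL name, ID, scope, country code, and priority are placeholders
NAME, ACL_ID, SCOPE = "my-web-acl", "abcd1234-5678-90ab-cdef-example11111", "REGIONAL"

current = wafv2.get_web_acl(Name=NAME, Scope=SCOPE, Id=ACL_ID)
web_acl, lock_token = current["WebACL"], current["LockToken"]

geo_block_rule = {
    "Name": "block-unexpected-country",
    "Priority": 50,
    "Statement": {"GeoMatchStatement": {"CountryCodes": ["FR"]}},
    "Action": {"Block": {}},
    "VisibilityConfig": {
        "SampledRequestsEnabled": True,
        "CloudWatchMetricsEnabled": True,
        "MetricName": "block-unexpected-country",
    },
}

kwargs = dict(
    Name=NAME,
    Scope=SCOPE,
    Id=ACL_ID,
    DefaultAction=web_acl["DefaultAction"],
    Rules=web_acl["Rules"] + [geo_block_rule],
    VisibilityConfig=web_acl["VisibilityConfig"],
    LockToken=lock_token,
)
if web_acl.get("Description"):
    kwargs["Description"] = web_acl["Description"]

wafv2.update_web_acl(**kwargs)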

The dashboard is a great tool to gain insights and to understand how AWS WAF managed rules help protect your traffic.

Use case 2: Understand bot traffic during onboarding and fine-tune your bot control rule group

With AWS WAF Bot Control, you can monitor, block, or rate limit bots such as scrapers, scanners, crawlers, status monitors, and search engines. If you use the targeted inspection level of the rule group, you can also challenge bots that don’t self-identify, making it harder and more expensive for malicious bots to operate against your website.

On the traffic overview dashboard, under the Bot Control overview tab, you can see how much of your current traffic is coming from bots, based on request sampling (if you don’t have Bot Control enabled) and real-time CloudWatch metrics (if you do have Bot Control enabled).

During your onboarding phase, use this dashboard to monitor your traffic and understand how much of it comes from various types of bots. You can use this as a starting point to customize your bot management. For example, you can enable common bot control rule groups in count mode and see if desired traffic is being mislabeled. Then you can add rule exceptions, as described in AWS WAF Bot Control example: Allow a specific blocked bot.

The following figure shows a collection of widgets that visualize various dimensions of requests detected as generated by bots. By understanding categories and volumes, you can make an informed decision to either investigate by further delving into logs or block a specific category if it’s clear that it’s unwanted traffic.

Figure 4: Collection of bot-related metrics on the dashboard

After you get started, you can use the same dashboard to monitor your bot traffic and evaluate adding targeted detection for sophisticated bots that don’t self-identify. Targeted protections use detection techniques such as browser interrogation, fingerprinting, and behavior heuristics to identify bad bot traffic. AWS WAF tokens are an integral part of these enhanced protections.

AWS WAF creates, updates, and encrypts tokens for clients that successfully respond to silent challenges and CAPTCHA puzzles. When a client with a token sends a web request, it includes the encrypted token, and AWS WAF decrypts the token and verifies its contents.

In the Bot Control dashboard, the token status pane shows counts for the various token status labels, paired with the rule action that was applied to the request. The IP token absent thresholds pane shows data for requests from IPs that sent too many requests without a token. You can use this information to fine-tune your AWS WAF configuration.

For example, within a Bot Control rule group, it’s possible for a request without a valid token to exit the rule group evaluation and continue to be evaluated by the web ACL. To block requests that are missing their token or for which the token is rejected, you can add a rule to run immediately after the managed rule group to capture and block requests that the rule group doesn’t handle for you. Using the Token status pane, illustrated in Figure 5, you can also monitor the volume of requests that acquire tokens and decide if you want to rate limit or block such requests.

Figure 5: Token status enables monitoring of the volume of requests that acquire tokens
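As a sketch of what such a follow-up rule could look like, the following Python dictionary defines a rule that blocks requests carrying the token-absent label, placed at a priority after the Bot Control managed rule group. The label key and priority are assumptions; confirm the exact label names emitted by your Bot Control rule group in the AWS WAF documentation or in your logs before using it.

# Assumption: label key and priority are illustrative; verify the token labels
# that your Bot Control rule group emits before deploying a rule like this.
block_missing_token_rule = {
    "Name": "block-token-absent",
    "Priority": 20,  # evaluated after the Bot Control managed rule group
    "Statement": {
        "LabelMatchStatement": {
            "Scope": "LABEL",
            "Key": "awswaf:managed:token:absent",
        }
    },
    "Action": {"Block": {}},
    "VisibilityConfig": {
        "SampledRequestsEnabled": True,
        "CloudWatchMetricsEnabled": True,
        "MetricName": "block-token-absent",
    },
}
# The rule can be appended to the web ACL's rule list with update_web_acl,
# as in the earlier geographic match sketch.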

Comparison with CloudFront security dashboard

The AWS WAF traffic overview dashboard provides enhanced overall visibility into web traffic reaching resources that are protected with AWS WAF. In contrast, the CloudFront security dashboard brings AWS WAF visibility and controls directly to your CloudFront distribution. If you want the detailed visibility and analysis of patterns that could indicate potential threats or issues, then the AWS WAF traffic overview dashboard is the best fit. However, if your goal is to manage application delivery and security in one place without navigating between service consoles and to gain visibility into your application’s top security trends, allowed and blocked traffic, and bot activity, then the CloudFront security dashboard could be a better option.

Availability and pricing

The new dashboards are available in the AWS WAF console, and you can use them to better monitor your traffic. These dashboards are available by default, at no cost, and require no additional setup. CloudWatch logging has a separate pricing model, and if you have full logging enabled, you will incur CloudWatch charges; refer to Amazon CloudWatch pricing for more information. You can customize the dashboards if you want to tailor the displayed data to the needs of your environment.

Conclusion

With the AWS WAF traffic overview dashboard, you can get actionable insights on your web security posture and traffic patterns that might need your attention to improve your perimeter protection.

In this post, you learned how to use the dashboard to help secure your web application. You walked through traffic patterns analysis and possible next steps. Additionally, you learned how to observe traffic from bots and follow up with actions related to them according to the needs of your application.

The AWS WAF traffic overview dashboard is designed to meet most use cases and be a go-to default option for security visibility over web traffic. However, if you’d prefer to create a custom solution, see the guidance in the blog post Deploy a dashboard for AWS WAF with minimal effort.

 
If you have feedback about this post, submit comments in the Comments section below. If you have questions about this post, contact AWS Support.

Dmitriy Novikov

As a Senior Solutions Architect at AWS, Dmitriy supports AWS customers to use emerging technologies to generate business value. He’s a technology enthusiast who loves finding innovative solutions to complex challenges. He enjoys sharing his learnings on architecture and best practices in blog posts and whitepapers and at events. Outside of work, Dmitriy has a passion for reading and triathlons.

Harith Gaddamanugu

Harith works at AWS as a Senior Edge Specialist Solutions Architect. He stays motivated by solving problems for customers across AWS Perimeter Protection and Edge services. When he’s not working, he enjoys spending time outdoors with friends and family.

Simplify data streaming ingestion for analytics using Amazon MSK and Amazon Redshift

Post Syndicated from Sebastian Vlad original https://aws.amazon.com/blogs/big-data/simplify-data-streaming-ingestion-for-analytics-using-amazon-msk-and-amazon-redshift/

Towards the end of 2022, AWS announced the general availability of real-time streaming ingestion to Amazon Redshift for Amazon Kinesis Data Streams and Amazon Managed Streaming for Apache Kafka (Amazon MSK), eliminating the need to stage streaming data in Amazon Simple Storage Service (Amazon S3) before ingesting it into Amazon Redshift.

Streaming ingestion from Amazon MSK into Amazon Redshift represents a cutting-edge approach to real-time data processing and analysis. Amazon MSK serves as a highly scalable and fully managed service for Apache Kafka, allowing for seamless collection and processing of vast streams of data. Integrating streaming data into Amazon Redshift brings immense value by enabling organizations to harness the potential of real-time analytics and data-driven decision-making.

This integration enables you to achieve low latency, measured in seconds, while ingesting hundreds of megabytes of streaming data per second into Amazon Redshift. At the same time, this integration helps make sure that the most up-to-date information is readily available for analysis. Because the integration doesn’t require staging data in Amazon S3, Amazon Redshift can ingest streaming data at a lower latency and without intermediary storage cost.

You can configure Amazon Redshift streaming ingestion on a Redshift cluster using SQL statements to authenticate and connect to an MSK topic. This solution is an excellent option for data engineers who are looking to simplify data pipelines and reduce operational costs.
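As a preview of what that configuration involves, the following sketch uses the Redshift Data API (boto3) to run the two SQL statements: an external schema that maps to the MSK cluster with IAM authentication, and an auto-refreshing materialized view over a topic. The cluster identifier, database, user, role ARN, cluster ARN, and topic name are placeholders, the topic is assumed to carry JSON-encoded values, and the exact CREATE EXTERNAL SCHEMA and streaming ingestion syntax should be confirmed against the current Amazon Redshift documentation.

import boto3

redshift_data = boto3.client("redshift-data")

# Assumptions: identifiers, ARNs, and the topic name are placeholders
CLUSTER_ID, DATABASE, DB_USER = "redshift-cluster", "dev", "awsuser"

create_schema_sql = """
CREATE EXTERNAL SCHEMA msk_schema
FROM MSK
IAM_ROLE 'arn:aws:iam::111122223333:role/redshift-role'
AUTHENTICATION iam
CLUSTER_ARN 'arn:aws:kafka:us-east-1:111122223333:cluster/msk-cluster/xxxx';
"""

create_mv_sql = """
CREATE MATERIALIZED VIEW customer_mv AUTO REFRESH YES AS
SELECT kafka_partition,
       kafka_offset,
       kafka_timestamp,
       JSON_PARSE(kafka_value) AS customer_data
FROM msk_schema."customer";
"""

for sql in (create_schema_sql, create_mv_sql):
    redshift_data.execute_statement(
        ClusterIdentifier=CLUSTER_ID,
        Database=DATABASE,
        DbUser=DB_USER,
        Sql=sql,
    )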

In this post, we provide a complete overview on how to configure Amazon Redshift streaming ingestion from Amazon MSK.

Solution overview

The following architecture diagram describes the AWS services and features you will be using.


The workflow includes the following steps:

  1. You start by configuring an Amazon MSK Connect source connector to create an MSK topic, generate mock data, and write it to the MSK topic. For this post, we work with mock customer data.
  2. The next step is to connect to a Redshift cluster using the Query Editor v2.
  3. Finally, you configure an external schema and create a materialized view in Amazon Redshift, to consume the data from the MSK topic. This solution does not rely on an MSK Connect sink connector to export the data from Amazon MSK to Amazon Redshift.

The following solution architecture diagram describes in more detail the configuration and integration of the AWS services you will be using.
The workflow includes the following steps:

  1. You deploy an MSK Connect source connector, an MSK cluster, and a Redshift cluster within the private subnets on a VPC.
  2. The MSK Connect source connector uses granular permissions defined in an AWS Identity and Access Management (IAM) inline policy attached to an IAM role, which allows the source connector to perform actions on the MSK cluster.
  3. The MSK Connect source connector logs are captured and sent to an Amazon CloudWatch log group.
  4. The MSK cluster uses a custom MSK cluster configuration, allowing the MSK Connect connector to create topics on the MSK cluster.
  5. The MSK cluster logs are captured and sent to an Amazon CloudWatch log group.
  6. The Redshift cluster uses granular permissions defined in an IAM inline policy attached to an IAM role, which allows the Redshift cluster to perform actions on the MSK cluster.
  7. You can use the Query Editor v2 to connect to the Redshift cluster.

Prerequisites

To simplify the provisioning and configuration of the prerequisite resources, you can use the following AWS CloudFormation template:

Complete the following steps when launching the stack:

  1. For Stack name, enter a meaningful name for the stack, for example, prerequisites.
  2. Choose Next.
  3. Choose Next.
  4. Select I acknowledge that AWS CloudFormation might create IAM resources with custom names.
  5. Choose Submit.

The CloudFormation stack creates the following resources:

  • A VPC custom-vpc, created across three Availability Zones, with three public subnets and three private subnets:
    • The public subnets are associated with a public route table, and outbound traffic is directed to an internet gateway.
    • The private subnets are associated with a private route table, and outbound traffic is sent to a NAT gateway.
  • An internet gateway attached to the Amazon VPC.
  • A NAT gateway that is associated with an elastic IP and is deployed in one of the public subnets.
  • Three security groups:
    • msk-connect-sg, which will be later associated with the MSK Connect connector.
    • redshift-sg, which will be later associated with the Redshift cluster.
    • msk-cluster-sg, which will be later associated with the MSK cluster. It allows inbound traffic from msk-connect-sg, and redshift-sg.
  • Two CloudWatch log groups:
    • msk-connect-logs, to be used for the MSK Connect logs.
    • msk-cluster-logs, to be used for the MSK cluster logs.
  • Two IAM Roles:
    • msk-connect-role, which includes granular IAM permissions for MSK Connect.
    • redshift-role, which includes granular IAM permissions for Amazon Redshift.
  • A custom MSK cluster configuration, allowing the MSK Connect connector to create topics on the MSK cluster.
  • An MSK cluster, with three brokers deployed across the three private subnets of custom-vpc. The msk-cluster-sg security group and the custom-msk-cluster-configuration configuration are applied to the MSK cluster. The broker logs are delivered to the msk-cluster-logs CloudWatch log group.
  • A Redshift cluster subnet group, which is using the three private subnets of custom-vpc.
  • A Redshift cluster, with one single node deployed in a private subnet within the Redshift cluster subnet group. The redshift-sg security group and redshift-role IAM role are applied to the Redshift cluster.

Create an MSK Connect custom plugin

For this post, we use the Amazon MSK data generator, deployed in MSK Connect, to generate mock customer data and write it to an MSK topic.

Complete the following steps:

  1. Download the Amazon MSK data generator JAR file with dependencies from GitHub.
    awslabs github page for downloading the jar file of the amazon msk data generator
  2. Upload the JAR file into an S3 bucket in your AWS account.
    amazon s3 console image showing the uploaded jar file in an s3 bucket
  3. On the Amazon MSK console, choose Custom plugins under MSK Connect in the navigation pane.
  4. Choose Create custom plugin.
  5. Choose Browse S3, search for the Amazon MSK data generator JAR file you uploaded to Amazon S3, then choose Choose.
  6. For Custom plugin name, enter msk-datagen-plugin.
  7. Choose Create custom plugin.

When the custom plugin is created, you will see that its status is Active, and you can move to the next step.
amazon msk console showing the msk connect custom plugin being successfully created
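
If you prefer to register the plugin programmatically, the following boto3 sketch performs the same action. The bucket name and JAR key are placeholders for the Amazon S3 location you used in step 2.

import boto3

# Placeholders: replace with the S3 location of the MSK data generator JAR file.
BUCKET_NAME = "my-msk-datagen-bucket"
JAR_KEY = "msk-data-generator.jar"

kafkaconnect = boto3.client("kafkaconnect")

# Register the JAR file uploaded to Amazon S3 as an MSK Connect custom plugin
response = kafkaconnect.create_custom_plugin(
    name="msk-datagen-plugin",
    contentType="JAR",
    location={
        "s3Location": {
            "bucketArn": f"arn:aws:s3:::{BUCKET_NAME}",
            "fileKey": JAR_KEY,
        }
    },
)
print(response["customPluginArn"])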

Create an MSK Connect connector

Complete the following steps to create your connector:

  1. On the Amazon MSK console, choose Connectors under MSK Connect in the navigation pane.
  2. Choose Create connector.
  3. For Custom plugin type, choose Use existing plugin.
  4. Select msk-datagen-plugin, then choose Next.
  5. For Connector name, enter msk-datagen-connector.
  6. For Cluster type, choose Self-managed Apache Kafka cluster.
  7. For VPC, choose custom-vpc.
  8. For Subnet 1, choose the private subnet within your first Availability Zone.

For the custom-vpc created by the CloudFormation template, we are using odd CIDR ranges for public subnets, and even CIDR ranges for the private subnets:

    • The CIDRs for the public subnets are 10.10.1.0/24, 10.10.3.0/24, and 10.10.5.0/24
    • The CIDRs for the private subnets are 10.10.2.0/24, 10.10.4.0/24, and 10.10.6.0/24
  1. For Subnet 2, select the private subnet within your second Availability Zone.
  2. For Subnet 3, select the private subnet within your third Availability Zone.
  3. For Bootstrap servers, enter the list of bootstrap servers for TLS authentication of your MSK cluster.

To retrieve the bootstrap servers for your MSK cluster, navigate to the Amazon MSK console, choose Clusters, choose msk-cluster, then choose View client information. Copy the TLS values for the bootstrap servers.

  1. For Security groups, choose Use specific security groups with access to this cluster, and choose msk-connect-sg.
  2. For Connector configuration, replace the default settings with the following:
connector.class=com.amazonaws.mskdatagen.GeneratorSourceConnector
tasks.max=2
genkp.customer.with=#{Code.isbn10}
genv.customer.name.with=#{Name.full_name}
genv.customer.gender.with=#{Demographic.sex}
genv.customer.favorite_beer.with=#{Beer.name}
genv.customer.state.with=#{Address.state}
genkp.order.with=#{Code.isbn10}
genv.order.product_id.with=#{number.number_between '101','109'}
genv.order.quantity.with=#{number.number_between '1','5'}
genv.order.customer_id.matching=customer.key
global.throttle.ms=2000
global.history.records.max=1000
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
  1. For Connector capacity, choose Provisioned.
  2. For MCU count per worker, choose 1.
  3. For Number of workers, choose 1.
  4. For Worker configuration, choose Use the MSK default configuration.
  5. For Access permissions, choose msk-connect-role.
  6. Choose Next.
  7. For Encryption, select TLS encrypted traffic.
  8. Choose Next.
  9. For Log delivery, choose Deliver to Amazon CloudWatch Logs.
  10. Choose Browse, select msk-connect-logs, and choose Choose.
  11. Choose Next.
  12. Review and choose Create connector.

After the custom connector is created, you will see that its status is Running, and you can move to the next step.
amazon msk console showing the msk connect connector being successfully created
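
The same connector can also be created with the AWS SDK. The following boto3 sketch mirrors the console settings used in this post; the plugin ARN, role ARN, bootstrap servers, subnet IDs, and security group ID are placeholders, and the client authentication type is an assumption that you should align with your own cluster setup.

import boto3

# Placeholders: replace with the values from your own environment.
PLUGIN_ARN = "arn:aws:kafkaconnect:region:111122223333:custom-plugin/msk-datagen-plugin/abc"
ROLE_ARN = "arn:aws:iam::111122223333:role/msk-connect-role"
BOOTSTRAP_TLS = "b-1.mskcluster.example:9094,b-2.mskcluster.example:9094,b-3.mskcluster.example:9094"
SUBNETS = ["subnet-aaa", "subnet-bbb", "subnet-ccc"]  # the three private subnets
SECURITY_GROUPS = ["sg-mskconnect"]                   # msk-connect-sg

kafkaconnect = boto3.client("kafkaconnect")

response = kafkaconnect.create_connector(
    connectorName="msk-datagen-connector",
    kafkaConnectVersion="2.7.1",
    capacity={"provisionedCapacity": {"mcuCount": 1, "workerCount": 1}},
    plugins=[{"customPlugin": {"customPluginArn": PLUGIN_ARN, "revision": 1}}],
    serviceExecutionRoleArn=ROLE_ARN,
    kafkaCluster={
        "apacheKafkaCluster": {
            "bootstrapServers": BOOTSTRAP_TLS,
            "vpc": {"subnets": SUBNETS, "securityGroups": SECURITY_GROUPS},
        }
    },
    # Assumption: match these two settings to your cluster's client
    # authentication and encryption-in-transit configuration.
    kafkaClusterClientAuthentication={"authenticationType": "IAM"},
    kafkaClusterEncryptionInTransit={"encryptionType": "TLS"},
    logDelivery={
        "workerLogDelivery": {
            "cloudWatchLogs": {"enabled": True, "logGroup": "msk-connect-logs"}
        }
    },
    # Use the same generator properties shown in the console step above.
    connectorConfiguration={
        "connector.class": "com.amazonaws.mskdatagen.GeneratorSourceConnector",
        "tasks.max": "2",
        "genkp.customer.with": "#{Code.isbn10}",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": "false",
        # ... remaining genv.* and global.* properties from the listing above
    },
)
print(response["connectorArn"])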

Configure Amazon Redshift streaming ingestion for Amazon MSK

Complete the following steps to set up streaming ingestion:

  1. Connect to your Redshift cluster using Query Editor v2, and authenticate with the database user name awsuser and the password Awsuser123.
  2. Create an external schema from Amazon MSK using the following SQL statement.

In the following code, enter the ARN of the redshift-role IAM role and the ARN of the msk-cluster MSK cluster.

CREATE EXTERNAL SCHEMA msk_external_schema
FROM MSK
IAM_ROLE '<insert your redshift-role arn>'
AUTHENTICATION iam
CLUSTER_ARN '<insert your msk-cluster arn>';
  1. Choose Run to run the SQL statement.

redshift query editor v2 showing the SQL statement used to create an external schema from amazon msk

  1. Create a materialized view using the following SQL statement:
CREATE MATERIALIZED VIEW msk_mview AUTO REFRESH YES AS
SELECT
    "kafka_partition",
    "kafka_offset",
    "kafka_timestamp_type",
    "kafka_timestamp",
    "kafka_key",
    JSON_PARSE(kafka_value) as Data,
    "kafka_headers"
FROM
    "dev"."msk_external_schema"."customer"
  1. Choose Run to run the SQL statement.

redshift query editor v2 showing the SQL statement used to create a materialized view

  1. You can now query the materialized view using the following SQL statement:
select * from msk_mview LIMIT 100;
  1. Choose Run to run the SQL statement.

redshift query editor v2 showing the SQL statement used to query the materialized view

  1. To monitor the progress of records loaded via streaming ingestion, you can take advantage of the SYS_STREAM_SCAN_STATES monitoring view using the following SQL statement:
select * from SYS_STREAM_SCAN_STATES;
  1. Choose Run to run the SQL statement.

redshift query editor v2 showing the SQL statement used to query the sys stream scan states monitoring view

  1. To monitor errors encountered on records loaded via streaming ingestion, you can take advantage of the SYS_STREAM_SCAN_ERRORS monitoring view using the following SQL statement:
select * from SYS_STREAM_SCAN_ERRORS;
  1. Choose Run to run the SQL statement.

redshift query editor v2 showing the SQL statement used to query the sys stream scan errors monitoring view
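
You can also run these queries programmatically with the Redshift Data API. The following boto3 sketch navigates the SUPER Data column of the materialized view; the cluster identifier is a placeholder, and the attribute names assume the lowercase JSON keys produced by the data generator (name, favorite_beer, state).

import time
import boto3

# Placeholders: the Redshift cluster created by the prerequisites stack.
CLUSTER_ID = "redshift-cluster"
DATABASE = "dev"
DB_USER = "awsuser"

# Navigate the SUPER "data" column produced by JSON_PARSE(kafka_value).
SQL = """
SELECT data.name, data.favorite_beer, data.state
FROM msk_mview
LIMIT 10;
"""

client = boto3.client("redshift-data")
stmt = client.execute_statement(
    ClusterIdentifier=CLUSTER_ID, Database=DATABASE, DbUser=DB_USER, Sql=SQL
)

# Poll until the statement finishes, then print the rows.
while True:
    desc = client.describe_statement(Id=stmt["Id"])
    if desc["Status"] in ("FINISHED", "FAILED", "ABORTED"):
        break
    time.sleep(1)

if desc["Status"] == "FINISHED":
    result = client.get_statement_result(Id=stmt["Id"])
    for row in result["Records"]:
        print([list(col.values())[0] for col in row])
else:
    print(desc.get("Error"))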

Clean up

After following along, if you no longer need the resources you created, delete them in the following order to prevent incurring additional charges:

  1. Delete the MSK Connect connector msk-datagen-connector.
  2. Delete the MSK Connect plugin msk-datagen-plugin.
  3. Delete the Amazon MSK data generator JAR file you downloaded, and delete the S3 bucket you created.
  4. After you delete your MSK Connect connector, you can delete the CloudFormation stack. All the resources created by the CloudFormation template will be automatically deleted from your AWS account.

Conclusion

In this post, we demonstrated how to configure Amazon Redshift streaming ingestion from Amazon MSK, with a focus on privacy and security.

Combining the ability of Amazon MSK to handle high-throughput data streams with the robust analytical capabilities of Amazon Redshift empowers businesses to derive actionable insights promptly. This real-time data integration enhances the agility and responsiveness of organizations in understanding changing data trends, customer behaviors, and operational patterns. It allows for timely and informed decision-making, thereby gaining a competitive edge in today’s dynamic business landscape.

This solution is also applicable for customers that are looking to use Amazon MSK Serverless and Amazon Redshift Serverless.

We hope this post was a good opportunity to learn more about AWS service integration and configuration. Let us know your feedback in the comments section.


About the authors

Sebastian Vlad is a Senior Partner Solutions Architect with Amazon Web Services, with a passion for data and analytics solutions and customer success. Sebastian works with enterprise customers to help them design and build modern, secure, and scalable solutions to achieve their business outcomes.

Sharad Pai is a Lead Technical Consultant at AWS. He specializes in streaming analytics and helps customers build scalable solutions using Amazon MSK and Amazon Kinesis. He has over 16 years of industry experience and is currently working with media customers who are hosting live streaming platforms on AWS, managing peak concurrency of over 50 million. Prior to joining AWS, Sharad’s career as a lead software developer included 9 years of coding, working with open source technologies like JavaScript, Python, and PHP.

Combine AWS Glue and Amazon MWAA to build advanced VPC selection and failover strategies

Post Syndicated from Michael Greenshtein original https://aws.amazon.com/blogs/big-data/combine-aws-glue-and-amazon-mwaa-to-build-advanced-vpc-selection-and-failover-strategies/

AWS Glue is a serverless data integration service that makes it straightforward to discover, prepare, move, and integrate data from multiple sources for analytics, machine learning (ML), and application development.

AWS Glue customers often have to meet strict security requirements, which sometimes involve locking down the network connectivity allowed to the job, or running inside a specific VPC to access another service. To run inside the VPC, the job needs to be assigned to a single subnet, but the most suitable subnet can change over time (for instance, based on usage and availability), so you may prefer to make that decision at runtime, based on your own strategy.

Amazon Managed Workflows for Apache Airflow (Amazon MWAA) is an AWS service to run managed Airflow workflows, which allow writing custom logic to coordinate how tasks such as AWS Glue jobs run.

In this post, we show how to run an AWS Glue job as part of an Airflow workflow, with dynamic configurable selection of the VPC subnet assigned to the job at runtime.

Solution overview

To run inside a VPC, an AWS Glue job needs to be assigned at least one connection that includes a network configuration. Any connection type allows specifying a VPC, subnet, and security group, but for simplicity, this post uses connections of type NETWORK, which just define the network configuration and don’t involve external systems.

If the job has a fixed subnet assigned by a single connection, the job can’t run when there is a service outage in that subnet’s Availability Zone or when the subnet isn’t available for other reasons. Furthermore, each node (driver or worker) in an AWS Glue job requires an IP address assigned from the subnet. When running many large jobs concurrently, this could lead to an IP address shortage and the job running with fewer nodes than intended or not running at all.

AWS Glue extract, transform, and load (ETL) jobs allow multiple connections to be specified with multiple network configurations. However, the job will always try to use the connections’ network configuration in the order listed and pick the first one that passes the health checks and has at least two IP addresses to get the job started, which might not be the optimal option.

With this solution, you can enhance and customize that behavior by reordering the connections dynamically and defining the selection priority. If a retry is needed, the connections are reprioritized again based on the strategy, because the conditions might have changed since the last run.

As a result, it helps prevent the job from failing to run or running under capacity due to subnet IP address shortage or even an outage, while meeting the network security and connectivity requirements.

The following diagram illustrates the solution architecture.

Prerequisites

To follow the steps of the post, you need a user that can log in to the AWS Management Console and has permission to access Amazon MWAA, Amazon Virtual Private Cloud (Amazon VPC), and AWS Glue. The AWS Region where you choose to deploy the solution needs the capacity to create a VPC and two elastic IP addresses. The default Regional quota for both types of resources is five, so you might need to request an increase via the console.

You also need an AWS Identity and Access Management (IAM) role suitable to run AWS Glue jobs if you don’t have one already. For instructions, refer to Create an IAM role for AWS Glue.

Deploy an Airflow environment and VPC

First, you’ll deploy a new Airflow environment, including the creation of a new VPC with two public subnets and two private ones. This is because Amazon MWAA requires Availability Zone failure tolerance, so it needs to run in two subnets in two different Availability Zones in the Region. The public subnets are used so the NAT gateway can provide internet access for the private subnets.

Complete the following steps:

  1. Create an AWS CloudFormation template on your computer by copying the template from the following quick start guide into a local text file.
  2. On the AWS CloudFormation console, choose Stacks in the navigation pane.
  3. Choose Create stack with the option With new resources (standard).
  4. Choose Upload a template file and choose the local template file.
  5. Choose Next.
  6. Complete the setup steps, entering a name for the environment, and leave the rest of the parameters as default.
  7. On the last step, acknowledge that resources will be created and choose Submit.

The creation can take 20–30 minutes, until the status of the stack changes to CREATE_COMPLETE.

The resource that will take most of the time is the Airflow environment. While it’s being created, you can continue with the following steps, until you are required to open the Airflow UI.

  1. On the stack’s Resources tab, note the IDs for the VPC and two private subnets (PrivateSubnet1 and PrivateSubnet2), to use in the next step.

Create AWS Glue connections

The CloudFormation template deploys two private subnets. In this step, you create an AWS Glue connection to each one so AWS Glue jobs can run in them. Amazon MWAA recently added the capacity to run the Airflow cluster on shared VPCs, which reduces cost and simplifies network management. For more information, refer to Introducing shared VPC support on Amazon MWAA.

Complete the following steps to create the connections:

  1. On the AWS Glue console, choose Data connections in the navigation pane.
  2. Choose Create connection.
  3. Choose Network as the data source.
  4. Choose the VPC and private subnet (PrivateSubnet1) created by the CloudFormation stack.
  5. Use the default security group.
  6. Choose Next.
  7. For the connection name, enter MWAA-Glue-Blog-Subnet1.
  8. Review the details and complete the creation.
  9. Repeat these steps using PrivateSubnet2 and name the connection MWAA-Glue-Blog-Subnet2.
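
If you prefer to create the connections with the AWS SDK, the following boto3 sketch creates both NETWORK connections; the subnet IDs, Availability Zones, and security group ID are placeholders for the values from the CloudFormation stack, and the empty ConnectionProperties map reflects that NETWORK connections carry no external system properties.

import boto3

glue = boto3.client("glue")

# Placeholders: use the IDs noted from the CloudFormation stack resources.
connections = {
    "MWAA-Glue-Blog-Subnet1": {"SubnetId": "subnet-111", "AvailabilityZone": "us-east-1a"},
    "MWAA-Glue-Blog-Subnet2": {"SubnetId": "subnet-222", "AvailabilityZone": "us-east-1b"},
}
SECURITY_GROUP_ID = "sg-0123456789abcdef0"  # the VPC default security group

for name, net in connections.items():
    glue.create_connection(
        ConnectionInput={
            "Name": name,
            "ConnectionType": "NETWORK",
            "ConnectionProperties": {},
            "PhysicalConnectionRequirements": {
                "SubnetId": net["SubnetId"],
                "AvailabilityZone": net["AvailabilityZone"],
                "SecurityGroupIdList": [SECURITY_GROUP_ID],
            },
        }
    )
    print(f"Created connection {name}")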

Create the AWS Glue job

Now you create the AWS Glue job that will be triggered later by the Airflow workflow. The job uses the connections created in the previous section, but instead of assigning them directly on the job, as you would normally do, in this scenario you leave the job connections list empty and let the workflow decide which one to use at runtime.

The job script in this case is not significant; it’s just intended to demonstrate that the job ran in one of the two subnets, depending on the connection used.

  1. On the AWS Glue console, choose ETL jobs in the navigation pane, then choose Script editor.
  2. Leave the default options (Spark engine and Start fresh) and choose Create script.
  3. Replace the placeholder script with the following Python code:
    import ipaddress
    import socket
    
    subnets = {
        "PrivateSubnet1": "10.192.20.0/24",
        "PrivateSubnet2": "10.192.21.0/24"
    }
    
    ip = socket.gethostbyname(socket.gethostname())
    subnet_name = "unknown"
    for subnet, cidr in subnets.items():
        if ipaddress.ip_address(ip) in ipaddress.ip_network(cidr):
            subnet_name = subnet
    
    print(f"The driver node has been assigned the ip: {ip}"
          + f" which belongs to the subnet: {subnet_name}")
    

  4. Rename the job to AirflowBlogJob.
  5. On the Job details tab, for IAM Role, choose any role and enter 2 for the number of workers (just for frugality).
  6. Save these changes so the job is created.
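
As an alternative to the console, the job can also be defined through the API. The following boto3 sketch assumes you have uploaded the script to an S3 location of your choosing and that you use an AWS Glue role following the AWSGlueServiceRole naming convention referenced later in the IAM policy; note that no connections are assigned, because the Airflow DAG sets them at runtime.

import boto3

glue = boto3.client("glue")

# Placeholders: the S3 location where you saved the script and the job role.
SCRIPT_LOCATION = "s3://my-glue-assets-bucket/scripts/AirflowBlogJob.py"
GLUE_ROLE = "AWSGlueServiceRole-AirflowBlog"

glue.create_job(
    Name="AirflowBlogJob",
    Role=GLUE_ROLE,
    Command={
        "Name": "glueetl",
        "ScriptLocation": SCRIPT_LOCATION,
        "PythonVersion": "3",
    },
    GlueVersion="4.0",
    WorkerType="G.1X",
    NumberOfWorkers=2,
    # No Connections are assigned here; the Airflow DAG sets them at runtime.
)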

Grant AWS Glue permissions to the Airflow environment role

The role created for Airflow by the CloudFormation template provides the basic permissions to run workflows but not to interact with other services such as AWS Glue. In a production project, you would define your own templates with these additional permissions, but in this post, for simplicity, you add the additional permissions as an inline policy. Complete the following steps:

  1. On the IAM console, choose Roles in the navigation pane.
  2. Locate the role created by the template; it will start with the name you assigned to the CloudFormation stack and then -MwaaExecutionRole-.
  3. On the role details page, on the Add permissions menu, choose Create inline policy.
  4. Switch from Visual to JSON mode and enter the following JSON on the textbox. It assumes that the AWS Glue role you have follows the convention of starting with AWSGlueServiceRole. For enhanced security, you can replace the wildcard resource on the ec2:DescribeSubnets permission with the ARNs of the two private subnets from the CloudFormation stack.
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": [
                    "glue:GetConnection"
                ],
                "Resource": [
                    "arn:aws:glue:*:*:connection/MWAA-Glue-Blog-Subnet*",
                    "arn:aws:glue:*:*:catalog"
                ]
            },
            {
                "Effect": "Allow",
                "Action": [
                    "glue:UpdateJob",
                    "glue:GetJob",
                    "glue:StartJobRun",
                    "glue:GetJobRun"
                ],
                "Resource": [
                    "arn:aws:glue:*:*:job/AirflowBlogJob",
                    "arn:aws:glue:*:*:job/BlogAirflow"
                ]
            },
            {
                "Effect": "Allow",
                "Action": [
                    "ec2:DescribeSubnets"
                ],
                "Resource": "*"
            },
            {
                "Effect": "Allow",
                "Action": [
                    "iam:GetRole",
                    "iam:PassRole"
                ],
                "Resource": "arn:aws:iam::*:role/service-role/AWSGlueServiceRole*"
            }
        ]
    }
    

  5. Choose Next.
  6. Enter GlueRelatedPermissions as the policy name and complete the creation.

In this example, we use an ETL script job; for a visual job, because it generates the script automatically on save, the Airflow role would need permission to write to the configured script path on Amazon Simple Storage Service (Amazon S3).

Create the Airflow DAG

An Airflow workflow is based on a Directed Acyclic Graph (DAG), which is defined by a Python file that programmatically specifies the different tasks involved and their interdependencies. Complete the following steps to create the DAG:

  1. Create a local file named glue_job_dag.py using a text editor.

In each of the following steps, we provide a code snippet to enter into the file and an explanation of what it does.

  1. The following snippet adds the required Python module imports. The modules are already installed on Airflow; if that weren’t the case, you would need to use a requirements.txt file to indicate to Airflow which modules to install. It also defines the Boto3 clients that the code will use later. By default, they use the same role and Region as Airflow, which is why you previously set up the role with the additional permissions required.
    import boto3
    from pendulum import datetime, duration
    from random import shuffle
    from airflow import DAG
    from airflow.decorators import dag, task
    from airflow.models import Variable
    from airflow.providers.amazon.aws.operators.glue import GlueJobOperator
    
    glue_client = boto3.client('glue')
    ec2 = boto3.client('ec2')
    

  2. The following snippet adds three functions to implement the connection order strategy, which defines how to reorder the given connections to establish their priority. This is just an example; you can build your own custom code to implement your own logic, as per your needs. The code first checks the IPs available on each connection subnet and separates the connections that have enough IPs available to run the job at full capacity from those that could still be used because they have at least two IPs available, which is the minimum a job needs to start. If the strategy is set to random, it randomizes the order within each of the connection groups previously described and adds any other connections. If the strategy is capacity, it orders them from most free IPs to fewest.
    def get_available_ips_from_connection(glue_connection_name):
        conn_response = glue_client.get_connection(Name=glue_connection_name)
        connection_properties = conn_response['Connection']['PhysicalConnectionRequirements']
        subnet_id = connection_properties['SubnetId']
        subnet_response = ec2.describe_subnets(SubnetIds=[subnet_id])
        return subnet_response['Subnets'][0]['AvailableIpAddressCount']
    
    def get_connections_free_ips(glue_connection_names, num_workers):
        good_connections = []
        usable_connections = []    
        for connection_name in glue_connection_names:
            try:
                available_ips = get_available_ips_from_connection(connection_name)
                # Priority to connections that can hold the full cluster and we haven't just tried
                if available_ips >= num_workers:
                    good_connections.append((connection_name, available_ips))
                elif available_ips >= 2: # The bare minimum to start a Glue job
                    usable_connections.append((connection_name, available_ips))                
            except Exception as e:
                print(f"[WARNING] Failed to check the free ips for:{connection_name}, will skip. Exception: {e}")  
        return good_connections, usable_connections
    
    def prioritize_connections(connection_list, num_workers, strategy):
        (good_connections, usable_connections) = get_connections_free_ips(connection_list, num_workers)
        print(f"Good connections: {good_connections}")
        print(f"Usable connections: {usable_connections}")
        all_conn = []
        if strategy=="random":
            shuffle(good_connections)
            shuffle(usable_connections)
            # Good connections have priority
            all_conn = good_connections + usable_connections
        elif strategy=="capacity":
            # We can sort both at the same time
            all_conn = good_connections + usable_connections
            all_conn.sort(key=lambda x: -x[1])
        else: 
            raise ValueError(f"Unknown strategy specified: {strategy}")    
        result = [c[0] for c in all_conn] # Just need the name
        # Keep at the end any other connections that could not be checked for ips
        result += [c for c in connection_list if c not in result]
        return result
    

  3. The following code creates the DAG itself with the run job task, which updates the job with the connection order defined by the strategy, runs it, and waits for the results. The job name, connections, and strategy come from Airflow variables, so they can be easily configured and updated. The task is configured with two retries and exponential backoff, so if it fails, the full task is repeated, including the connection selection. By then, the best choice might be a different connection, or the randomly picked subnet might be in an Availability Zone that is suffering an outage, and picking a different one allows the run to recover.
    with DAG(
        dag_id='glue_job_dag',
        schedule_interval=None, # Run on demand only
        start_date=datetime(2000, 1, 1), # A start date is required
        max_active_runs=1,
        catchup=False
    ) as glue_dag:
        
        @task(
            task_id="glue_task", 
            retries=2,
            retry_delay=duration(seconds = 30),
            retry_exponential_backoff=True
        )
        def run_job_task(**ctx):    
            glue_connections = Variable.get("glue_job_dag.glue_connections").strip().split(',')
            glue_jobname = Variable.get("glue_job_dag.glue_job_name").strip()
            strategy= Variable.get('glue_job_dag.strategy', 'random') # random or capacity
            print(f"Connections available: {glue_connections}")
            print(f"Glue job name: {glue_jobname}")
            print(f"Strategy to use: {strategy}")
            job_props = glue_client.get_job(JobName=glue_jobname)['Job']            
            num_workers = job_props['NumberOfWorkers']
            
            glue_connections = prioritize_connections(glue_connections, num_workers, strategy)
            print(f"Running Glue job with the connection order: {glue_connections}")
            existing_connections = job_props.get('Connections',{}).get('Connections', [])
            # Preserve other connections that we don't manage
            other_connections = [con for con in existing_connections if con not in glue_connections]
            job_props['Connections'] = {"Connections": glue_connections + other_connections}
            # Clean up properties so we can reuse the dict for the update request
            for prop_name in ['Name', 'CreatedOn', 'LastModifiedOn', 'AllocatedCapacity', 'MaxCapacity']:
                del job_props[prop_name]
    
            GlueJobOperator(
                task_id='submit_job',
                job_name=glue_jobname,
                iam_role_name=job_props['Role'].split('/')[-1],
                update_config=True,
                create_job_kwargs=job_props,
                wait_for_completion=True
            ).execute(ctx)   
            
        run_job_task()
    

Create the Airflow workflow

Now you create a workflow that invokes the AWS Glue job you just created:

  1. On the Amazon S3 console, locate the bucket created by the CloudFormation template, which will have a name starting with the name of the stack and then -environmentbucket- (for example, myairflowstack-environmentbucket-ap1qks3nvvr4).
  2. Inside that bucket, create a folder called dags, and inside that folder, upload the DAG file glue_job_dag.py that you created in the previous section.
  3. On the Amazon MWAA console, navigate to the environment you deployed with the CloudFormation stack.

If the status is not yet Available, wait until it reaches that state. It shouldn’t take longer than 30 minutes since you deployed the CloudFormation stack.

  1. Choose the environment link on the table to see the environment details.

It’s configured to pick up DAGs from the bucket and folder you used in the previous steps. Airflow will monitor that folder for changes.

  1. Choose Open Airflow UI to open a new tab accessing the Airflow UI, using the integrated IAM security to log you in.

If there’s any issue with the DAG file you created, it will display an error on top of the page indicating the lines affected. In that case, review the steps and upload again. After a few seconds, it will parse it and update or remove the error banner.

  1. On the Admin menu, choose Variables.
  2. Add three variables with the following keys and values:
    1. Key glue_job_dag.glue_connections with value MWAA-Glue-Blog-Subnet1,MWAA-Glue-Blog-Subnet2.
    2. Key glue_job_dag.glue_job_name with value AirflowBlogJob.
    3. Key glue_job_dag.strategy with value capacity.
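
If you prefer to set these variables programmatically, you can call the Airflow CLI through the Amazon MWAA web server endpoint. The following sketch assumes the environment name you chose when deploying the CloudFormation stack and requires the requests library.

import base64
import boto3
import requests

ENV_NAME = "my-mwaa-environment"  # the name you gave the environment in the stack

variables = {
    "glue_job_dag.glue_connections": "MWAA-Glue-Blog-Subnet1,MWAA-Glue-Blog-Subnet2",
    "glue_job_dag.glue_job_name": "AirflowBlogJob",
    "glue_job_dag.strategy": "capacity",
}

# Request a short-lived CLI token and run "variables set" for each entry.
mwaa = boto3.client("mwaa")
token = mwaa.create_cli_token(Name=ENV_NAME)

for key, value in variables.items():
    resp = requests.post(
        f"https://{token['WebServerHostname']}/aws_mwaa/cli",
        headers={
            "Authorization": f"Bearer {token['CliToken']}",
            "Content-Type": "text/plain",
        },
        data=f"variables set {key} {value}",
    )
    output = resp.json()
    print(key, base64.b64decode(output["stdout"]).decode(),
          base64.b64decode(output["stderr"]).decode())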

Run the job with a dynamic subnet assignment

Now you’re ready to run the workflow and see the strategy dynamically reordering the connections.

  1. On the Airflow UI, choose DAGs, and on the row glue_job_dag, choose the play icon.
  2. On the Browse menu, choose Task instances.
  3. On the instances table, scroll right to display the Log Url and choose the icon on it to open the log.

The log will update as the task runs; you can locate the line starting with “Running Glue job with the connection order:” and the previous lines showing details of the connection IPs and the category assigned. If an error occurs, you’ll see the details in this log.

  1. On the AWS Glue console, choose ETL jobs in the navigation pane, then choose the job AirflowBlogJob.
  2. On the Runs tab, choose the run instance, then the Output logs link, which will open a new tab.
  3. On the new tab, use the log stream link to open it.

It will display the IP that the driver was assigned and which subnet it belongs to, which should match the connection indicated by Airflow (if the log is not displayed, choose Resume so it gets updated as soon as it’s available).

  1. On the Airflow UI, edit the Airflow variable glue_job_dag.strategy to set it to random.
  2. Run the DAG multiple times and see how the ordering changes.

Clean up

If you no longer need the deployment, delete the resources to avoid any further charges:

  1. Delete the Python script you uploaded, so the S3 bucket can be automatically deleted in the next step.
  2. Delete the CloudFormation stack.
  3. Delete the AWS Glue job.
  4. Delete the script that the job saved in Amazon S3.
  5. Delete the connections you created as part of this post.

Conclusion

In this post, we showed how AWS Glue and Amazon MWAA can work together to build more advanced custom workflows, while minimizing the operational and management overhead. This solution gives you more control over how your AWS Glue job runs to meet special operational, network, or security requirements.

You can deploy your own Amazon MWAA environment in multiple ways, such as with the template used in this post, on the Amazon MWAA console, or using the AWS CLI. You can also implement your own strategies to orchestrate AWS Glue jobs, based on your network architecture and requirements (for instance, to run the job closer to the data when possible).


About the authors

Michael Greenshtein is an Analytics Specialist Solutions Architect for the Public Sector.

Gonzalo Herreros is a Senior Big Data Architect on the AWS Glue team.

Deploying an EMR cluster on AWS Outposts to process data from an on-premises database

Post Syndicated from Macey Neff original https://aws.amazon.com/blogs/compute/deploying-an-emr-cluster-on-aws-outposts-to-process-data-from-an-on-premises-database/

This post is written by Eder de Mattos, Sr. Cloud Security Consultant, AWS and Fernando Galves, Outpost Solutions Architect, AWS.

In this post, you will learn how to deploy an Amazon EMR cluster on AWS Outposts and use it to process data from an on-premises database. Many organizations have regulatory, contractual, or corporate policy requirements to process and store data in a specific geographical location. These strict requirements become a challenge for organizations to find flexible solutions that balance regulatory compliance with the agility of cloud services. Amazon EMR is the industry-leading cloud big data platform for data processing, interactive analysis, and machine learning (ML) that uses open-source frameworks. With Amazon EMR on Outposts, you can seamlessly use data analytics solutions to process data locally in your on-premises environment without moving data to the cloud. This post focuses on creating and configuring an Amazon EMR cluster on AWS Outposts rack using Amazon Virtual Private Cloud (Amazon VPC) endpoints and keeping the networking traffic in the on-premises environment.

Architecture overview

In this architecture, there is an Amazon EMR cluster created in an AWS Outposts subnet. The cluster retrieves data from an on-premises PostgreSQL database, employs a PySpark Step for data processing, and then stores the result in a new table within the same database. The following diagram shows this architecture.

Architecture overview

Figure 1 Architecture overview

Networking traffic on premises: The communication between the EMR cluster and the on-premises PostgreSQL database is through the Local Gateway. The core Amazon Elastic Compute Cloud (Amazon EC2) instances of the EMR cluster are associated with Customer-owned IP addresses (CoIP), and each instance has two IP addresses: an internal IP and a CoIP IP. The internal IP is used to communicate locally in the subnet, and the CoIP IP is used to communicate with the on-premises network.

Amazon VPC endpoints: Amazon EMR establishes communication with the VPC through an interface VPC endpoint. This communication is private and conducted entirely within the AWS network instead of connecting over the internet. In this architecture, VPC endpoints are created on a subnet in the AWS Region.

The support files used to create the EMR cluster are stored in an Amazon Simple Storage Service (Amazon S3) bucket. The communication between the VPC and Amazon S3 stays within the AWS network. The following files are stored in this S3 bucket:

  • get-postgresql-driver.sh: This is a bootstrap script to download the PostgreSQL driver to allow the Spark step to communicate to the PostgreSQL database through JDBC. You can download it through the GitHub repository for this Amazon EMR on Outposts blog post.
  • postgresql-42.6.0.jar: PostgreSQL binary JAR file for the JDBC driver.
  • spark-step-example.py: Example of a Step application in PySpark to simulate the connection to the PostgreSQL database.

AWS Systems Manager is configured to manage the EC2 instances that belong to the EMR cluster. It uses an interface VPC endpoint to allow the VPC to communicate privately with the Systems Manager.

The database credentials to connect to the PostgreSQL database are stored in AWS Secrets Manager. Amazon EMR integrates with Secrets Manager. This allows the secret to be stored in the Secrets Manager and be used through its ARN in the cluster configuration. During the creation of the EMR cluster, the secret is accessed privately through an interface VPC endpoint and stored in the variable DBCONNECTION in the EMR cluster.
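
If you need to create such a secret yourself, the following boto3 sketch shows one way to do it. The secret name is hypothetical, and the key names are an assumption chosen to match what the PySpark step shown later parses from the DBCONNECTION variable (username, password, host, port, and dbname).

import json
import boto3

secretsmanager = boto3.client("secretsmanager")

# Assumption: key names align with what the PySpark step parses later.
secret_value = {
    "username": "postgres",
    "password": "REPLACE_ME",
    "host": "10.0.0.50",   # on-premises PostgreSQL IP or DNS name
    "port": "5432",
    "dbname": "mydatabase",
}

response = secretsmanager.create_secret(
    Name="emr/onprem-postgresql-connection",  # hypothetical secret name
    Description="Connection details for the on-premises PostgreSQL database",
    SecretString=json.dumps(secret_value),
)
print(response["ARN"])  # reference this ARN in the EMR software settings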

In this solution, we are creating a small EMR cluster with one primary and one core node. For the correct sizing of your cluster, see Estimating Amazon EMR cluster capacity.

There is additional information to improve the security posture for organizations that use AWS Control Tower landing zone and AWS Organizations. The post Architecting for data residency with AWS Outposts rack and landing zone guardrails is a great place to start.

Prerequisites

Before deploying the EMR cluster on Outposts, you must make sure the following resources are created and configured in your AWS account:

  1. An Outposts rack is installed, up, and running.
  2. An Amazon EC2 key pair is created. To create it, you can follow the instructions in Create a key pair using Amazon EC2 in the Amazon EC2 User Guide.

Deploying the EMR cluster on Outposts

1. Deploy the CloudFormation template to create the infrastructure for the EMR cluster

You can use this AWS CloudFormation template to create the infrastructure for the EMR cluster. To create a stack, you can follow the instructions in Creating a stack on the AWS CloudFormation console in the AWS CloudFormation user guide.

2. Create an EMR cluster

To launch a cluster with Spark installed using the console:

Step 1: Configure Name and Applications

  1. Sign in to the AWS Management Console, and open the Amazon EMR console.
  2. Under EMR on EC2, in the left navigation pane, select Clusters, and then choose Create Cluster.
  3. On the Create cluster page, enter a unique cluster name for the Name field.
  4. For Amazon EMR release, choose emr-6.13.0.
  5. In the Application bundle field, select Spark 3.4.1 and Zeppelin 0.10.1, and unselect all the other options.
  6. For the Operating system options, select Amazon Linux release.

Create Cluster

Figure 2: Create Cluster

Step 2: Choose Cluster configuration method

  1. Under the Cluster configuration, select Uniform instance groups.
  2. For the Primary and the Core, select the EC2 instance type available in the Outposts rack that is supported by the EMR cluster.
  3. Remove the instance group Task 1 of 1.

Remove the instance group Task 1 of 1

Figure 3: Remove the instance group Task 1 of 1

Step 3: Set up Cluster scaling and provisioning, Networking and Cluster termination

  1. In the Cluster scaling and provisioning option, choose Set cluster size manually and type the value 1 for the Core instance group.
  2. Under Networking, select the VPC and the Outposts subnet.
  3. For Cluster termination, choose Manually terminate cluster.

Step 4: Configure the Bootstrap actions

A. In the Bootstrap actions, add an action with the following information:

    1. Name: copy-postgresql-driver.sh
    2. Script location: s3://<bucket-name>/copy-postgresql-driver.sh. Modify the <bucket-name> variable to the bucket name you specified as a parameter in Step 1.

Add bootstrap action

Figure 4: Add bootstrap action

Step 5: Configure Cluster logs and Tags

a. Under Cluster logs, choose Publish cluster-specific logs to Amazon S3 and enter s3://<bucket-name>/logs for the field Amazon S3 location. Modify the <bucket-name> variable to the bucket name you specified as a parameter in Step 1.

Amazon S3 location for cluster logs

Figure 5: Amazon S3 location for cluster logs

b. In Tags, add a new tag. You must enter for-use-with-amazon-emr-managed-policies for the Key field and true for Value.

Add tags

Figure 6: Add tags

Step 6: Set up Software settings and Security configuration and EC2 key pair

a. In the Software settings, enter the following configuration replacing the Secret ARN created in Step 1:

[
          {
                    "Classification": "spark-defaults",
                    "Properties": {
                              "spark.driver.extraClassPath": "/opt/spark/postgresql/driver/postgresql-42.6.0.jar",
                              "spark.executor.extraClassPath": "/opt/spark/postgresql/driver/postgresql-42.6.0.jar",
                              "[email protected]":
                                         "arn:aws:secretsmanager:<region>:<account-id>:secret:<secret-name>"
                    }
          }
]

This is an example of the Secret ARN replaced:

Example of the Secret ARN replaced

Figure 7: Example of the Secret ARN replaced

b. For the Security configuration and EC2 key pair, choose the SSH key pair.

Step 7: Choose Identity and Access Management (IAM) roles

a. Under Identity and Access Management (IAM) roles:

    1. In the Amazon EMR service role:
      • Choose AmazonEMR-outposts-cluster-role for the Service role.
    2. In EC2 instance profile for Amazon EMR:
      • Choose AmazonEMR-outposts-EC2-role.

Choose the service role and instance profile

Figure 8: Choose the service role and instance profile

Step 8: Create cluster

  1. Choose Create cluster to launch the cluster and open the cluster details page.

Now, the EMR cluster is starting. When your cluster is ready to process tasks, its status changes to Waiting. This means the cluster is up, running, and ready to accept work.

Result of the cluster creation

Figure 9: Result of the cluster creation

3. Add CoIPs to EMR core nodes

You need to allocate an Elastic IP address from the CoIP pool and associate it with the EC2 instance of each EMR core node. This is necessary to allow the core nodes to access the on-premises environment. To allocate an Elastic IP, follow the instructions in Allocate an Elastic IP address in the Amazon EC2 User Guide for Linux Instances. In Step 5, choose the Customer-owned pool of IPv4 addresses.

Once the CoIP IP is allocated, associate it with each EC2 instance of the EMR core node. Follow the instructions in Associate an Elastic IP address with an instance or network interface in Amazon EC2 User Guide for Linux Instances.
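
If you prefer to automate this step, the following boto3 sketch lists the core node instances of the cluster, allocates an Elastic IP from the customer-owned pool, and associates it with each instance. The cluster ID and CoIP pool ID are placeholders.

import boto3

CLUSTER_ID = "j-XXXXXXXXXXXXX"                     # your EMR cluster ID
COIP_POOL_ID = "ipv4pool-coip-0123456789abcdef0"   # your customer-owned IP pool

emr = boto3.client("emr")
ec2 = boto3.client("ec2")

# Find the EC2 instances behind the EMR core nodes.
core_instances = emr.list_instances(ClusterId=CLUSTER_ID, InstanceGroupTypes=["CORE"])

for instance in core_instances["Instances"]:
    instance_id = instance["Ec2InstanceId"]
    # Allocate an Elastic IP from the customer-owned pool and attach it.
    allocation = ec2.allocate_address(Domain="vpc", CustomerOwnedIpv4Pool=COIP_POOL_ID)
    ec2.associate_address(AllocationId=allocation["AllocationId"], InstanceId=instance_id)
    print(f"Associated {allocation['CustomerOwnedIp']} with {instance_id}")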

Checking the configuration

  1. Make sure the EC2 instances of the core nodes can ping the IP of the PostgreSQL database.

Connect to the Core node EC2 instance using Systems Manager and ping the IP address of the PostgreSQL database.

Connectivity test

Figure 10: Connectivity test

  1. Make sure the Status of the EMR cluster is Waiting.

Cluster is ready and waiting

Figure 11: Cluster is ready and waiting

Adding a step to the Amazon EMR cluster

You can use the following Spark application to simulate the data processing from the PostgreSQL database.

spark-step-example.py:

import os
from pyspark.sql import SparkSession

if __name__ == "__main__":

    # ---------------------------------------------------------------------
    # Step 1: Get the database connection information from the EMR cluster 
    #         configuration
    dbconnection = os.environ.get('DBCONNECTION')
    #    Remove brackets
    dbconnection_info = (dbconnection[1:-1]).split(",")
    #    Initialize variables
    dbusername = ''
    dbpassword = ''
    dbhost = ''
    dbport = ''
    dbname = ''
    dburl = ''
    #    Parse the database connection information
    for dbconnection_attribute in dbconnection_info:
        (key_data, key_value) = dbconnection_attribute.split(":", 1)

        if key_data == "username":
            dbusername = key_value
        elif key_data == "password":
            dbpassword = key_value
        elif key_data == 'host':
            dbhost = key_value
        elif key_data == 'port':
            dbport = key_value
        elif key_data == 'dbname':
            dbname = key_value

    dburl = "jdbc:postgresql://" + dbhost + ":" + dbport + "/" + dbname

    # ---------------------------------------------------------------------
    # Step 2: Connect to the PostgreSQL database and select data from the 
    #         pg_catalog.pg_tables table
    spark_db = SparkSession.builder.config("spark.driver.extraClassPath",                                          
               "/opt/spark/postgresql/driver/postgresql-42.6.0.jar") \
               .appName("Connecting to PostgreSQL") \
               .getOrCreate()

    #    Connect to the database
    data_db = spark_db.read.format("jdbc") \
        .option("url", dburl) \
        .option("driver", "org.postgresql.Driver") \
        .option("query", "select count(*) from pg_catalog.pg_tables") \
        .option("user", dbusername) \
        .option("password", dbpassword) \
        .load()

    # ---------------------------------------------------------------------
    # Step 3: To do the data processing
    #
    #    TO-DO

    # ---------------------------------------------------------------------
    # Step 4: Save the data into the new table in the PostgreSQL database
    #
    data_db.write \
        .format("jdbc") \
        .option("url", dburl) \
        .option("dbtable", "results_proc") \
        .option("user", dbusername) \
        .option("password", dbpassword) \
        .save()

    # ---------------------------------------------------------------------
    # Step 5: Close the Spark session
    #
    spark_db.stop()
    # ---------------------------------------------------------------------

You must upload the file spark-step-example.py to the bucket created in Step 1 of this post before submitting the Spark application to the EMR cluster. You can get the file at this GitHub repository for a Spark step example.

Submitting the Spark application step using the Console

To submit the Spark application to the EMR cluster, follow the instructions in To submit a Spark step using the console in the Amazon EMR Release Guide. In Step 4 of this Amazon EMR guide, provide the following parameters to add a step:

  1. Choose Cluster mode for the Deploy mode.
  2. Enter a name for your step (such as Step 1).
  3. For the Application location, choose s3://<bucket-name>/spark-step-example.py, replacing the <bucket-name> variable with the bucket name you specified as a parameter in Step 1.
  4. Leave the Spark-submit options field blank.

Add a step to the EMR cluster

Figure 12: Add a step to the EMR cluster
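
Alternatively, you can submit the same step through the API. The following boto3 sketch mirrors the console settings above (cluster deploy mode, no additional spark-submit options); the cluster ID and bucket name are placeholders for the values from Step 1.

import boto3

CLUSTER_ID = "j-XXXXXXXXXXXXX"          # your EMR cluster ID
BUCKET_NAME = "my-emr-outposts-bucket"  # the bucket created in Step 1

emr = boto3.client("emr")

response = emr.add_job_flow_steps(
    JobFlowId=CLUSTER_ID,
    Steps=[
        {
            "Name": "Step 1",
            "ActionOnFailure": "CONTINUE",
            "HadoopJarStep": {
                "Jar": "command-runner.jar",
                "Args": [
                    "spark-submit",
                    "--deploy-mode", "cluster",
                    f"s3://{BUCKET_NAME}/spark-step-example.py",
                ],
            },
        }
    ],
)
print(response["StepIds"])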

The Step is created with the Status Pending. When it is done, the Status changes to Completed.

Step executed successfully

Figure 13: Step executed successfully

Cleaning up

When the EMR cluster is no longer needed, you can delete the resources created to avoid incurring future costs by following these steps:

  1. Follow the instructions in Terminate a cluster with the console in the Amazon EMR Management Guide. Remember to turn off the Termination protection.
  2. Dissociate and release the CoIP IPs allocated to the EC2 instances of the EMR core nodes.
  3. Delete the stack in AWS CloudFormation using the instructions in Deleting a stack on the AWS CloudFormation console in the AWS CloudFormation User Guide.

Conclusion

Amazon EMR on Outposts allows you to use the managed services offered by AWS to perform big data processing close to your data that needs to remain on-premises. This architecture eliminates the need to transfer on-premises data to the cloud, providing a robust solution for organizations with regulatory, contractual, or corporate policy requirements to store and process data in a specific location. With the EMR cluster accessing the on-premises database directly through local networking, you can expect faster and more efficient data processing without compromising on compliance or agility. To learn more, visit the Amazon EMR on AWS Outposts product overview page.

Simplify authentication with native LDAP integration on Amazon EMR

Post Syndicated from Stefano Sandona original https://aws.amazon.com/blogs/big-data/simplify-authentication-with-native-ldap-integration-on-amazon-emr/

Many companies have corporate identities stored inside identity providers (IdPs) like Active Directory (AD) or OpenLDAP. Previously, customers using Amazon EMR could integrate their clusters with Active Directory by configuring a one-way realm trust between their AD domain and the EMR cluster Kerberos realm. For more details, refer to Tutorial: Configure a cross-realm trust with an Active Directory domain.

This setup has been a key enabler to make corporate users and groups available inside EMR clusters and define access control policies to control their data access (for example, through the Amazon EMR native Apache Ranger integration).

Although this option is still available, Amazon EMR has released support for native LDAP authentication, a new security feature that simplifies the integration with OpenLDAP and Active Directory.

This feature enables the following:

  • Automatic configuration of security for the supported applications (HiveServer2, Trino, Presto, and Livy) to use the Kerberos protocol under the hood and LDAP as the external authentication. External tools no longer have to set up Kerberos authentication to connect to the cluster endpoints; instead, they can simply be configured to provide an LDAP user name and password (see the short client example after this list).
  • Fine-grained access control (FGAC) over who can access your EMR clusters through SSH.
  • Fine-grained authorization policies on top of Hive Metastore databases and tables, if used in combination with the native Amazon EMR Apache Ranger integration.
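
As a simple illustration of the first point, the following sketch creates a Livy session by supplying only LDAP credentials over HTTP basic authentication. The primary node DNS name, port, scheme, and credentials are assumptions; adjust them to your cluster and security configuration (for example, use HTTPS and certificate verification if in-transit encryption is enabled).

import requests

# Assumptions: EMR primary node DNS name, default Livy port (8998), plain HTTP.
LIVY_URL = "http://ip-10-0-1-10.eu-west-1.compute.internal:8998"
CREDENTIALS = ("john", "LDAP_PASSWORD")  # corporate LDAP user name and password

# Create a PySpark session through Livy: no Kerberos client setup is needed,
# only the LDAP user name and password.
response = requests.post(
    f"{LIVY_URL}/sessions",
    auth=CREDENTIALS,
    json={"kind": "pyspark"},
)
session = response.json()
print(session["id"], session["state"])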

In this post, we dive deep into the Amazon EMR LDAP authentication, showing how the authentication flow works, how to retrieve and test the needed LDAP configurations, and how to confirm an EMR cluster is properly LDAP integrated.

Using the information in this post:

  • Teams managing EMR clusters can coordinate more effectively with their LDAP IdP administrators, request the proper information, and perform pre-configuration tests.
  • EMR cluster end-users can understand how straightforward it is to connect from external tools to LDAP-enabled EMR clusters compared to the previous Kerberos-based authentication.

How Amazon EMR LDAP integration works

When talking about authentication in the context of EMR frameworks, we can distinguish between two levels:

  • External authentication – Used by users and external components to interact with the installed frameworks
  • Internal authentication – Used within the frameworks to authenticate the communications of internal components

With this new feature, internal framework authentication is still managed through Kerberos, but this is transparent to end-users and external services, which instead authenticate with a user name and password.

The supported frameworks installed on the EMR cluster implement an LDAP-based authentication method that, given a user name and password, validates the credentials against the LDAP endpoint and, in the case of success, enables the use of the framework.

The following diagram summarizes how the authentication flow works.

The workflow includes the following steps:

  1. A user connects with one of the supported endpoints (such as HiveServer2, Trino/Presto Coordinator, or Hue WebUI) and provides their corporate credentials (user name and password).
  2. The contacted framework uses a custom authenticator that performs the authentication using the EMR Secret Agent service running inside the cluster instances.
  3. The EMR Secret Agent service validates the provided credentials against the LDAP endpoint.
  4. In the case of success, the following occurs:
    • A Kerberos principal is created for the specific user on the cluster MIT key distribution center (MIT KDC) running inside the primary node.
    • The Kerberos principal keytab is created inside the home directory of the user on the primary node.

After the authentication is complete, the user can start using the framework.

Inside all the cluster instances, the SSSD service is configured to retrieve users and groups from the LDAP endpoint and make them available as system users.

The authentication flow when connecting with SSH is a bit different, and is summarized in the following diagram.

The workflow includes the following steps:

  1. A user connects with SSH to the EMR primary instance, providing the corporate credentials (user name and password).
  2. The contacted SSHD service uses the SSSD service to validate the provided credentials.
  3. The SSSD service validates the provided credentials against the LDAP endpoint. In the case of success, the user lands on the related home directory. At this point, the user can use the different CLIs (beeline, trino-cli, presto-cli, curl) to access Hive, Trino/Presto, or Livy.
  4. To use the Spark CLIs (spark-submit, pyspark, spark-shell), the user has to invoke the ldap-kinit script and provide the requested user name and password.
  5. The authentication is performed using the EMR Secret Agent service running inside the cluster instances.
  6. The EMR Secret Agent service validates the provided credentials against the LDAP endpoint.
  7. In the case of success, the following occurs:
    • A Kerberos principal is created for the specific user on the cluster MIT KDC running inside the primary node.
    • The Kerberos principal keytab is created inside the home directory of the user on the primary node.
    • A Kerberos ticket is obtained and stored in the user’s Kerberos ticket cache on the primary node.

After the ldap-kinit script completes, the user can start using the Spark CLIs.

In the following sections, we show how to retrieve the required LDAP setting values and investigate how to launch a cluster with EMR LDAP authentication and test it.

Find the proper LDAP parameters

To configure LDAP authentication for Amazon EMR, the first step is to retrieve the LDAP properties to be used to set up your cluster. You need the following information:

  • The LDAP server DNS name
  • A certificate in PEM format to be used to interact over Secure LDAP (LDAPS) with the LDAP endpoint
  • The LDAP user search base, which is a path (or branch) on the LDAP tree from where to search users (only users belonging to this branch will be retrieved)
  • The LDAP groups search base, which is a path (or branch) on the LDAP tree from where to search groups (only groups belonging to this branch will be retrieved)
  • The LDAP server bind user credentials, which are the user name and password for a service user (usually called a bind user) to be used to trigger LDAP queries and retrieve user information such as user name and group membership.

With Active Directory, an AD admin can retrieve this information directly from the Active Directory Users and Computers tool. When you choose a user in this tool, you can see the related attributes (for example, distinguishedName). The following screenshot shows an example.

From the screenshot, we can see that the distinguishedName for the user john is CN=john,OU=users,OU=italy,OU=emr,DC=awsemr,DC=com, which means that john belongs to the following search bases, ordered from the narrowest to the widest:

  • OU=users,OU=italy,OU=emr,DC=awsemr,DC=com
  • OU=italy,OU=emr,DC=awsemr,DC=com
  • OU=emr,DC=awsemr,DC=com
  • DC=awsemr,DC=com

Depending on the number of entries in a company’s LDAP directory, using a wide search base may lead to long retrieval times and timeouts. It’s a good practice to configure the search base to be as narrow as possible while still including all the needed users. In the preceding example, OU=users,OU=italy,OU=emr,DC=awsemr,DC=com may be a good search base if all the users you want to provide access to the EMR cluster are part of that Organizational Unit.

Another way to retrieve user attributes is by using the ldapsearch tool. You can use this method for Active Directory as well as OpenLDAP, and it’s extremely useful to test the connectivity with the LDAP endpoint.

The following is an example with Active Directory (OpenLDAP is similar).

The LDAP endpoint should be resolvable and reachable by the Amazon Elastic Compute Cloud (Amazon EC2) EMR cluster instances via TCP on port 636. We suggest running the test from an Amazon Linux 2 EC2 instance that belongs to the same subnet as the EMR cluster and has the same EMR security group associated as the EMR cluster instances.

After you launch an EC2 instance, install the nc tool and test the DNS resolution and connectivity. Assuming that DC1.awsemr.com is the DNS name for the LDAP endpoint, run the following commands:

sudo yum install nc
nc -vz DC1.awsemr.com 636

If the DNS resolution isn’t working properly, you should receive an error like the following:

Ncat: Version 7.50 ( https://nmap.org/ncat )
Ncat: Could not resolve hostname "DC1.awsemr.com": Name or service not known. QUITTING.

If the endpoint is not reachable, you should receive an error like the following:

Ncat: Version 7.50 ( https://nmap.org/ncat )
Ncat: Connection timed out.

In either of these cases, the networking and DNS team should be involved in order to troubleshoot and solve the issues.

In case of success, the output should look like the following:

Ncat: Version 7.50 ( https://nmap.org/ncat )
Ncat: Connected to 10.0.1.235:636.
Ncat: 0 bytes sent, 0 bytes received in 0.01 seconds.

If everything works, proceed with the testing and install the openldap clients as follows:

sudo yum install openldap-clients

Then run ldapsearch commands to retrieve information about users and groups from the LDAP endpoint. The following are sample ldapsearch commands:

#Customize these 6 variables
LDAPS_CERTIFICATE=/path/to/ldaps_cert.pem
LDAPS_ENDPOINT=DC1.awsemr.com
BINDUSER="CN=binduser,CN=Users,DC=awsemr,DC=com"
BINDUSER_PASSWORD=binduserpassword
SEARCH_BASE=DC=awsemr,DC=com
USER_TO_SEARCH=john
FILTER=(sAMAccountName=${USER_TO_SEARCH})
INFO_TO_SEARCH="*"

#Search user
LDAPTLS_CACERT=${LDAPS_CERTIFICATE} ldapsearch -LLL -x -H ldaps://${LDAPS_ENDPOINT} -v -D "${BINDUSER}" -w "${BINDUSER_PASSWORD}" -b "${SEARCH_BASE}" "${FILTER}" "${INFO_TO_SEARCH}"

We use the following parameters:

  • -x – This enables simple authentication.
  • -D – This indicates the user to perform the search.
  • -w – This indicates the user password.
  • -H – This indicates the URL of the LDAP server.
  • -b – This is the base search.
  • LDAPTLS_CACERT – This indicates the LDAPS endpoint SSL PEM public certificate or the LDAPS endpoint root certificate authority SSL PEM public certificate. This can be obtained from an AD or OpenLDAP admin user.

The following is a sample output of the preceding command:

filter: (sAMAccountName=john)
requesting: *
dn: CN=john,OU=users,OU=italy,OU=emr,DC=awsemr,DC=com
objectClass: top
objectClass: person
objectClass: organizationalPerson
objectClass: user
cn: john
givenName: john
distinguishedName: CN=john,OU=users,OU=italy,OU=emr,DC=awsemr,DC=com
instanceType: 4
whenCreated: 20230804094021.0Z
whenChanged: 20230804094021.0Z
displayName: john
uSNCreated: 262459
memberOf: CN=data-engineers,OU=groups,OU=italy,OU=emr,DC=awsemr,DC=com
uSNChanged: 262466
name: john
objectGUID:: gTxn8qYvy0SVL+mYAAbb8Q==
userAccountControl: 66048
badPwdCount: 0
codePage: 0
countryCode: 0
badPasswordTime: 0
lastLogoff: 0
lastLogon: 0
pwdLastSet: 133356156212864439
primaryGroupID: 513
objectSid:: AQUAAAAAAAUVAAAAIKyNe7Dn3azp7Sh+rgQAAA==
accountExpires: 9223372036854775807
logonCount: 0
sAMAccountName: john
sAMAccountType: 805306368
userPrincipalName: john@awsemr.com
objectCategory: CN=Person,CN=Schema,CN=Configuration,DC=awsemr,DC=com
dSCorePropagationData: 20230804094021.0Z
dSCorePropagationData: 16010101000000.0Z

As we can see from the sample output, the user john is identified by the distinguished name CN=john,OU=users,OU=italy,OU=emr,DC=awsemr,DC=com, and the data-engineers group to which the user belongs (memberOf value) is identified by the distinguished name CN=data-engineers,OU=groups,OU=italy,OU=emr,DC=awsemr,DC=com.

We can run our ldapsearch queries to retrieve the user and group information using a narrowed search base:

#Customize these 9 variables
LDAPS_CERTIFICATE=/path/to/ldaps_cert.pem
LDAPS_ENDPOINT=DC1.awsemr.com
BINDUSER="CN=binduser,CN=Users,DC=awsemr,DC=com"
BINDUSER_PASSWORD=binduserpassword
SEARCH_BASE=DC=awsemr,DC=com
USER_SEARCH_BASE=OU=users,OU=italy,OU=emr,DC=awsemr,DC=com
GROUPS_SEARCH_BASE=OU=groups,OU=italy,OU=emr,DC=awsemr,DC=com
USER_TO_SEARCH=john
GROUP_TO_SEARCH=data-engineers

#Search User
LDAPTLS_CACERT=${LDAPS_CERTIFICATE} ldapsearch -LLL -x -H ldaps://${LDAPS_ENDPOINT} -v -D "${BINDUSER}" -w "${BINDUSER_PASSWORD}" -b "${USER_SEARCH_BASE}" "(sAMAccountName=${USER_TO_SEARCH})" "*"

#Search Group
LDAPTLS_CACERT=${LDAPS_CERTIFICATE} ldapsearch -LLL -x -H ldaps://${LDAPS_ENDPOINT} -v -D "${BINDUSER}" -w "${BINDUSER_PASSWORD}" -b "${GROUPS_SEARCH_BASE}" "(sAMAccountName=${GROUP_TO_SEARCH})" "*"

You can also apply other filters while searching. For more information about how to create LDAP filters, refer to LDAP Filters.

By running ldapsearch commands, you can test the LDAP connectivity and LDAP properties, and determine the needed setup.

Test the solution

After you have verified that the connectivity to the LDAP endpoint is open and the LDAP configurations are correct, proceed with setting up the environment to launch an EMR LDAP-enabled cluster.

Create AWS Secrets Manager secrets

Before you create the EMR security configuration, you need to create two AWS Secrets Manager secrets. You use these credentials to interact with the LDAP endpoint and retrieve user details such as user name and group membership.

  1. On the Secrets Manager console, choose Secrets in the navigation pane.
  2. Choose Store a new secret.
  3. For Secret type, select Other type of secret.
  4. Create a new secret specifying the binduser distinguished name as the key and the binduser password as the value.
  5. Create a second secret specifying in plaintext the LDAPS endpoint SSL public certificate or the LDAPS root certificate authority public certificate.
    This certificate is trusted by the cluster, allowing secure communication between the EMR cluster and the LDAPS endpoint. If you prefer the AWS CLI over the console, a sample approach for creating both secrets follows this list.
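The following is a minimal AWS CLI sketch for creating these two secrets; the secret names are hypothetical, and the bind user DN, password, and certificate path must match your environment:

#Secret 1: bind user credentials (key = bind user distinguished name, value = password)
aws secretsmanager create-secret \
--name emr-ldap-binduser-credentials \
--secret-string '{"CN=binduser,CN=Users,DC=awsemr,DC=com":"binduserpassword"}' \
--region <your-aws-region>

#Secret 2: LDAPS public certificate (or root CA certificate) stored as plaintext
aws secretsmanager create-secret \
--name emr-ldap-server-certificate \
--secret-string file:///path/to/ldaps_cert.pem \
--region <your-aws-region>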

Create the EMR security configuration

Complete the following steps to create the EMR security configuration:

  1. On the Amazon EMR console, choose Security configurations under EMR on EC2 in the navigation pane.
  2. Choose Create.
  3. For Security configuration name, enter a name.
  4. For Security configuration setup options, select Choose custom settings.
  5. For Encryption, select Turn on in-transit encryption.
  6. For Certificate provider type, select PEM.
  7. For Choose PEM certificate location, enter either a PEM bundle located in Amazon Simple Storage Service (Amazon S3) or a Java custom certificate provider.
    Note that in-transit encryption is mandatory in order to use the LDAP authentication feature. For more information about in-transit encryption, refer to Providing certificates for encrypting data in transit with Amazon EMR encryption.
  8. Choose Next.
  9. Select LDAP for Authentication protocol.
  10. For LDAP server location, enter the LDAPS endpoint (ldaps://<ldap_endpoint_DNS_name>).
  11. For LDAP SSL certificate, enter the second secret you created in Secrets Manager.
  12. For LDAP access filter, enter an LDAP filter that is applied in order to restrict access to a subset of users retrieved from the LDAP user search base. If the field is left empty, no filters are applied and all users belonging to the LDAP user search base can access the EMR LDAP-protected endpoints with their corporate credentials. The following are example filters and their functions:
    • (objectClass=person) – Filter users with the attribute objectClass set as person
    • (memberOf=CN=admins,OU=groups,OU=italy,OU=emr,DC=awsemr,DC=com) – Filter users belonging to the admins group
    • (|(memberof=CN=data-engineers,OU=groups,OU=italy,OU=emr,DC=awsemr,DC=com)(memberof=CN=admins,OU=groups,OU=italy,OU=emr,DC=awsemr,DC=com)) – Filter users belonging either to the data-engineers or the admins group (which we use for this post)
  13. Enter values for LDAP user search base and LDAP group search base. Note that the two search bases do not support inline filters (for example, the following is not supported: OU=users,OU=italy,OU=emr,DC=awsemr,DC=com?subtree?(|(memberof=CN=data-engineers,OU=groups,OU=italy,OU=emr,DC=awsemr,DC=com)(memberof=CN=admins,OU=groups,OU=italy,OU=emr,DC=awsemr,DC=com))).
  14. Select Turn on SSH login. This is needed only if you want your LDAP users to be able to SSH inside cluster instances with their corporate credentials. If SSH login is enabled, the LDAP access filter is needed—otherwise, SSH authentication will fail.
  15. For LDAP server bind credentials, enter the first secret you created in Secrets Manager.
  16. In the Authorization section, keep the defaults selected:
    • For IAM role for applications, select Instance profile.
    • For Fine-grained access control method, select None.
  17. Choose Next.
  18. Review the configuration summary and choose Create.

Launch the EMR cluster

You can launch the EMR cluster using the AWS Management Console, the AWS Command Line Interface (AWS CLI), or any AWS SDK.

When you're creating the EMR on EC2 cluster, be sure to specify the following configurations (a sample AWS CLI launch command follows the list):

  • EMR version – Use Amazon EMR 6.12.0 or above.
  • Applications – Select Hadoop, Spark, Hive, Hue, Livy and Presto/Trino.
  • Security configuration – Specify the security configuration you created in the previous step.
  • EC2 key pair – Use an existing key pair.
  • Network and security groups – Use a configuration that allows the EMR EC2 instances to reach the LDAPS endpoint. You already confirmed a valid setup in the Find the proper LDAP parameters section.
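The following AWS CLI command is a minimal sketch of such a launch; the cluster name, security configuration name, key pair, subnet, and instance settings shown here are placeholders to replace with your own values (selecting Presto instead of Trino is equally valid):

aws emr create-cluster \
--name "emr-ldap-cluster" \
--release-label emr-6.12.0 \
--applications Name=Hadoop Name=Spark Name=Hive Name=Hue Name=Livy Name=Trino \
--security-configuration "my-ldap-security-configuration" \
--ec2-attributes KeyName=mykeypair,SubnetId=subnet-XXXXXXXX \
--instance-type m5.xlarge \
--instance-count 3 \
--use-default-roles \
--region <your-aws-region>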

Confirm the LDAP authentication is working

When the cluster is up and running, you can check that the LDAP authentication is working properly.

If SSH login was enabled as part of LDAP authentication inside the EMR SecurityConfiguration, you can SSH into your cluster by specifying an LDAP user, providing the related password when prompted:

ssh myldapuser@<emr_primary_node>

If SSH login was disabled, you can SSH inside the cluster by using the EC2 key pair specified during cluster creation:

ssh -i mykeypair.pem ec2-user@<emr_primary_node>

An alternative way to access the primary instance, if you prefer, is to use Session Manager, a capability of AWS Systems Manager. For more information, refer to Connect to your Linux instance with AWS Systems Manager Session Manager.
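For example, assuming the primary instance has the Systems Manager agent running and an instance profile that allows Session Manager, you can start a session from the AWS CLI (the instance ID is a placeholder):

aws ssm start-session --target i-0123456789abcdef0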

When you’re inside the primary instance, you can test that the LDAP users and groups are properly retrieved by using the id command. The following is a sample command to check if the user john is properly retrieved with the related groups:

[ec2-user@ip-10-0-2-237 ~]# id john
uid=941601122(john) gid=941600513(users-group) groups=941600513(users-group),941601123(data-engineers)

You can then test authentication on the different installed frameworks.

First, let's retrieve the frameworks' public certificate and store it inside a truststore. All the frameworks share the same public certificate (the one we used to set up in-transit encryption), so you can use any of the SSL-protected endpoints (Hive port 10000, Presto/Trino port 8446, Livy port 8998) to retrieve it. In this example, we take the certificate from the HiveServer2 endpoint (port 10000):

#Export Hive Server 2 public SSL certificate to a PEM file
openssl s_client -showcerts -connect $(hostname -f):10000 </dev/null 2>/dev/null|openssl x509 -outform PEM > certificate.pem

#Import the PEM certificate inside a truststore
echo "yes" | keytool -import -alias hive_cert -file certificate.pem -storetype JKS -keystore truststore.jks -storepass myStrongPassword

Then use this truststore to securely communicate with the different frameworks.

Use the following code to test HiveServer2 authentication with beeline:

#Use the truststore to connect to the Hive Server 2
beeline -u "jdbc:hive2://$(hostname -f):10000/default;ssl=true;sslTrustStore=truststore.jks;trustStorePassword=myStrongPassword" -n john -p johnPassword 

If using Presto, test Presto authentication with the presto CLI (provide the user password when requested):

#Use the truststore to connect to the Presto coordinator
presto-cli \
--user john \
--password \
--catalog hive \
--server https://$(hostname -f):8446 \
--truststore-path truststore.jks \
--truststore-password myStrongPassword

If using Trino, test Trino authentication with the trino CLI (provide the user password when requested):

#Use the truststore to connect to the Trino coordinator
trino-cli \
--user john \
--password \
--catalog hive \
--server https://$(hostname -f):8446 \
--truststore-path truststore.jks \
--truststore-password myStrongPassword

Test Livy authentication with curl:

#Trust the PEM certificate to connect to the Livy server

#Start session
curl --cacert certificate.pem -X POST \
-u "john:johnPassword" \
--data '{"kind": "spark"}' \
-H "Content-Type: application/json" \
https://$(hostname -f):8998/sessions \
-c cookies.txt

#Example of output
#{"id":0,"name":null,"appId":null,"owner":"john","proxyUser":"john","state":"starting","kind":"spark","appInfo":{"driverLogUrl":null,"sparkUiUrl":null},"log":["stdout: ","\nstderr: ","\nYARN Diagnostics: "]}

Test Spark commands with pyspark:

#SSH inside the primary instance with the specific user
ssh john@<emr-primary-node>
#Or impersonate the user
sudo su - john

#Create a keytab and obtain a kerberos ticket running the ldap-kinit tool
$ ldap-kinit
Username: john
Password: 

#Output
{"message":"ok","contents":{"username":"john","expirationTime":"2023-09-14T15:24:06.303Z[UTC]"}}

#Check the kerberos ticket has been created
$ klist

# Test spark CLIs
$ pyspark

>>> spark.sql("show databases").show()
>>> quit()

Note that here we tested authentication from within the cluster, but you can interact with Trino, Hive, Presto, and Livy from outside the cluster as well, as long as connectivity and DNS resolution are properly configured. The Spark CLIs are the only ones that can be used only from inside the cluster.

To test Hue authentication, complete the following steps:

  1. Navigate to the Hue web UI hosted on http://<emr_primary_node>:8888/ and provide an LDAP user name and password.
  2. Test SQL queries inside the Hive and Trino/Presto editors.

To test with an external SQL tool (such as DBeaver connecting to Trino), first make sure the EMR primary node security group allows TCP traffic from the DBeaver client IP to the desired framework endpoint port (for example, 10000 for HiveServer2 or 8446 for Trino/Presto), and that the DBeaver client machine can resolve the EMR primary node hostname.
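The following AWS CLI sketch shows one way to open the Trino/Presto port; the security group ID and client IP are placeholders, and you may prefer to manage this rule through the console or infrastructure as code:

#Allow the DBeaver client IP to reach the Trino/Presto endpoint on the EMR primary node
aws ec2 authorize-security-group-ingress \
--group-id sg-0123456789abcdef0 \
--protocol tcp \
--port 8446 \
--cidr 203.0.113.10/32 \
--region <your-aws-region>

With connectivity and DNS resolution in place, complete the following steps: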

  1. From your EMR cluster primary instance, copy the truststore.jks file (created previously) and /usr/lib/trino/trino-jdbc/trino-jdbc-XXX-amzn-0.jar (replace XXX with the version for your EMR release) to an S3 bucket.
  2. Download the truststore.jks and trino-jdbc-XXX-amzn-0.jar files to your DBeaver client machine.
  3. Open DBeaver and choose Database, then choose Driver Manager.
  4. Choose New to create a new driver.
  5. On the Settings tab, provide the following information:
    • For Driver Name, enter EMR Trino.
    • For Class Name, enter io.trino.jdbc.TrinoDriver.
    • For URL Template, enter jdbc:trino://{host}:{port}.
  6. On the Libraries tab, complete the following steps:
    • Choose Add File.
    • Choose the Trino JDBC driver JAR file from the local file system (trino-jdbc-XXX-amzn-0.jar).
  7. Choose OK to create the driver.
  8. Choose Database and New Database Connection.
  9. On the Main tab, specify the following:
    • For Connect by, select Host.
    • For Host, enter the EMR primary node.
    • For Port, enter the Trino port (8446 by default).
  10. On the Driver properties tab, add the following properties:
    • Add SSL with True as the value.
    • Add SSLTrustStorePath with the truststore.jks file location as the value.
    • Add SSLTrustStorePassword with the truststore.jks password that you used to create it as the value.
  11. Choose Finish.
  12. Choose the created connection and choose the Connect icon.
  13. Enter your LDAP user name and password, then choose OK.

If everything is working, you should be able to browse the Trino catalogs, databases, and tables in the navigation pane. To run queries, choose SQL Editor, then choose Open SQL Editor.

From the SQL Editor, you can query your tables.

Next steps

The new Amazon EMR LDAP authentication feature simplifies the way users can gain access to EMR installed frameworks. When users are using a framework, you may want to govern the data they can access. For this specific topic, you can use LDAP authentication in combination with the native EMR Apache Ranger integration. For more information, refer to Integrate Amazon EMR with Apache Ranger.

Clean up

Complete the following cleanup actions to remove the resources you created following this post and avoid incurring additional costs. For this post, we clean up using the AWS CLI. You can also clean up using similar actions via the console.

  1. If you launched an EC2 instance to check the LDAP connectivity and don’t need it anymore, delete it with the following command (specify your instance ID):
    aws ec2 terminate-instances \
    --instance-ids i-XXXXXXXX \
    --region <your-aws-region>

  2. If you launched an EC2 instance to test DBeaver and don’t need it anymore, you can use the preceding command to delete it.
  3. Delete the EMR cluster with the following command (specify your EMR cluster ID):
    aws emr terminate-clusters \
    --cluster-ids j-XXXXXXXXXXXXX \
    --region <your-aws-region>

    Note that if the EMR cluster has Termination Protection enabled, before you run the preceding terminate-clusters command, you have to disable it. You can do so with the following command (specify your EMR cluster ID):

    aws emr modify-cluster-attributes \
    --cluster-id j-XXXXXXXXXXXXX \
    --no-termination-protected \
    --region <your-aws-region>

  4. Delete the EMR security configuration with the following command:
    aws emr delete-security-configuration \
    --name <your-security-configuration> \
    --region <your-aws-region>

  5. Delete the Secrets Manager secrets with the following commands:
    aws secretsmanager delete-secret \
    --secret-id <first-secret-name> \
    --force-delete-without-recovery \
    --region <your-aws-region>
    
    aws secretsmanager delete-secret \
    --secret-id <second-secret-name> \
    --force-delete-without-recovery \
    --region <your-aws-region>

Conclusion

In this post, we discussed how you can configure and test LDAP authentication on EMR on EC2 clusters. We discussed how to retrieve the needed LDAP settings, test connectivity with the LDAP endpoint, configure your EMR security configuration, and test that the LDAP authentication is properly working. This post also highlighted how the authentication flow is simplified compared to the standard Active Directory cross-realm trust configuration. To learn more about this feature, refer to Use Active Directory or LDAP servers for authentication with Amazon EMR.


About the Authors

Stefano Sandona is a Senior Big Data Solution Architect at AWS. He loves data, distributed systems and security. He helps customers around the world architecting secure, scalable and reliable big data platforms.

Adnan Hemani is a Software Development Engineer at AWS working with the EMR team. He focuses on the security posture of applications running on EMR clusters. He is interested in modern Big Data applications and how customers interact with them.

Best practices for managing Terraform State files in AWS CI/CD Pipeline

Post Syndicated from Arun Kumar Selvaraj original https://aws.amazon.com/blogs/devops/best-practices-for-managing-terraform-state-files-in-aws-ci-cd-pipeline/

Introduction

Today, customers want to reduce manual operations for deploying and maintaining their infrastructure. The recommended method to deploy and manage infrastructure on AWS is to follow the Infrastructure as Code (IaC) model using tools like AWS CloudFormation, AWS Cloud Development Kit (AWS CDK), or Terraform.

One of the critical components of terraform is managing the state file, which keeps track of your configuration and resources. When you run terraform in an AWS CI/CD pipeline, the state file has to be stored in a secured, common location that the pipeline can access. You also need a mechanism to lock it when multiple developers in the team want to access it at the same time.

In this blog post, we will explain how to manage terraform state files in AWS, best practices for configuring them in AWS, and an example of how you can manage them efficiently in your Continuous Integration pipeline in AWS when used with AWS Developer Tools such as AWS CodeCommit and AWS CodeBuild. This blog post assumes you have a basic knowledge of terraform, AWS Developer Tools, and AWS CI/CD pipelines. Let's dive in!

Challenges with handling state files

By default, the state file is stored locally where terraform runs, which is not a problem if you are a single developer working on the deployment. If you are not, storing state files locally is not ideal because you may run into the following problems:

  • When working in teams or collaborative environments, multiple people need access to the state file
  • Data in the state file is stored in plain text and may contain secrets or sensitive information
  • Local files can get lost, corrupted, or deleted

Best practices for handling state files

The recommended practice for managing state files is to use terraform's built-in support for remote backends. The main options on AWS are:

Remote backend on Amazon Simple Storage Service (Amazon S3): You can configure terraform to store state files in an Amazon S3 bucket which provides a durable and scalable storage solution. Storing on Amazon S3 also enables collaboration that allows you to share state file with others.

Remote backend on Amazon S3 with Amazon DynamoDB: In addition to using an Amazon S3 bucket for managing the files, you can use an Amazon DynamoDB table to lock the state file. This will allow only one person to modify a particular state file at any given time. It will help to avoid conflicts and enable safe concurrent access to the state file.

There are other options available as well such as remote backend on terraform cloud and third party backends. Ultimately, the best method for managing terraform state files on AWS will depend on your specific requirements.

When deploying terraform on AWS, the preferred choice of managing state is using Amazon S3 with Amazon DynamoDB.
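As a minimal sketch, assuming your terraform configuration declares an empty backend "s3" {} block, you can supply the backend settings at initialization time; the bucket, key, and table names below are placeholders:

#Initialize terraform with an S3 backend and DynamoDB state locking
terraform init \
-backend-config="bucket=my-terraform-state-bucket" \
-backend-config="key=myproject/terraform.tfstate" \
-backend-config="region=eu-central-1" \
-backend-config="dynamodb_table=tfstate-lock" \
-backend-config="encrypt=true"

You can also hardcode the same settings directly in the backend block of your configuration, which is the approach used in the example repository later in this post.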

AWS configurations for managing state files

  1. Create an Amazon S3 bucket using terraform. Implement security measures for the Amazon S3 bucket by creating an AWS Identity and Access Management (AWS IAM) policy or an Amazon S3 bucket policy. This lets you restrict access, configure object versioning for data protection and recovery, and enable server-side encryption (for example, SSE-KMS) for encryption control.
  2. Next, create an Amazon DynamoDB table using terraform with the partition key set to LockID. You can also set any additional configuration options such as read/write capacity units. Once the table is created, you configure the terraform backend to use it for state locking by specifying the table name in the terraform block of your configuration.
  3. For a single AWS account with multiple environments and projects, you can use a single Amazon S3 bucket. If you have multiple applications in multiple environments across multiple AWS accounts, you can create one Amazon S3 bucket for each account. In that Amazon S3 bucket, you can create appropriate folders for each environment, storing project state files with specific prefixes.

Now that you know how to handle terraform state files on AWS, let’s look at an example of how you can configure them in a Continuous Integration pipeline in AWS.

Architecture


Figure 1: Example architecture on how to use terraform in an AWS CI pipeline

This diagram outlines the workflow implemented in this blog:

  1. The AWS CodeCommit repository contains the application code
  2. The AWS CodeBuild job contains the buildspec files and references the source code in AWS CodeCommit
  3. The AWS Lambda function contains the application code created after running terraform apply
  4. Amazon S3 contains the state file created after running terraform apply. Amazon DynamoDB locks the state file present in Amazon S3

Implementation

Pre-requisites

Before you begin, you must complete the following prerequisites:

Setting up the environment

  1. You need an AWS access key ID and secret access key to configure AWS CLI. To learn more about configuring the AWS CLI, follow these instructions.
  2. Clone the repo for complete example: git clone https://github.com/aws-samples/manage-terraform-statefiles-in-aws-pipeline
  3. After cloning, you will see the following folder structure:

Figure 2: AWS CodeCommit repository structure

Let’s break down the terraform code into 2 parts – one for preparing the infrastructure and another for preparing the application.

Preparing the Infrastructure

  1. The main.tf file is the core component that does the following:
      • It creates an Amazon S3 bucket to store the state file. We configure bucket ACL, bucket versioning and encryption so that the state file is secure.
      • It creates an Amazon DynamoDB table which will be used to lock the state file.
      • It creates two AWS CodeBuild projects, one for ‘terraform plan’ and another for ‘terraform apply’.

    Note – It also has the code block (commented out by default) to create AWS Lambda which you will use at a later stage.

  2. AWS CodeBuild projects should be able to access Amazon S3, Amazon DynamoDB, AWS CodeCommit, and AWS Lambda, so an AWS IAM role with the appropriate permissions to access these resources is created via the iam.tf file.
  3. Next, you will find two buildspec files named buildspec-plan.yaml and buildspec-apply.yaml that execute the terraform commands terraform plan and terraform apply, respectively.
  4. Modify the AWS Region in the provider.tf file.
  5. Update the Amazon S3 bucket name, Amazon DynamoDB table name, AWS CodeBuild compute types, and AWS Lambda role and policy names to the required values using the variable.tf file. You can also use this file to easily customize parameters for different environments.

With this, the infrastructure setup is complete.

You can use your local terminal and execute below commands in the same order to deploy the above-mentioned resources in your AWS account.

terraform init
terraform validate
terraform plan
terraform apply

Once the apply is successful and all the above resources have been successfully deployed in your AWS account, proceed with deploying your application. 

Preparing the Application

  1. In the cloned repository, use the backend.tf file to create your own Amazon S3 backend to store the state file. By default, it has the following values, which you can override with your required values:
bucket = "tfbackend-bucket" 
key    = "terraform.tfstate" 
region = "eu-central-1"
  2. The repository has sample Python code stored in main.py that returns a simple message when invoked.
  3. In the main.tf file, you can find the following block of code to create and deploy the Lambda function that uses the main.py code (uncomment these code blocks).
data "archive_file" "lambda_archive_file" {
    ……
}

resource "aws_lambda_function" "lambda" {
    ……
}
  4. Now you can deploy the application using AWS CodeBuild instead of running terraform commands locally, which is the main advantage of this setup.
  5. Run the two AWS CodeBuild projects to execute terraform plan and terraform apply again.
  6. Once successful, you can verify your deployment by testing the code in AWS Lambda. To test the Lambda function (console):
    • Open AWS Lambda console and select your function “tf-codebuild”
    • In the navigation pane, in Code section, click Test to create a test event
    • Provide your required name, for example “test-lambda”
    • Accept default values and click Save
    • Click Test again to trigger your test event “test-lambda”

It should return the sample message you provided in your main.py file. In the default case, it displays the "Hello from AWS Lambda !" message, as shown below.


Figure 3: Sample Amazon Lambda function response

  7. To verify your state file, go to the Amazon S3 console and select the backend bucket created (tfbackend-bucket). It will contain your state file.

Figure 4: Amazon S3 bucket with terraform state file

  8. Open the Amazon DynamoDB console and check your table tfstate-lock; it will have an entry with a LockID.

Figure 5: Amazon DynamoDB table with LockID

Thus, you have securely stored and locked your terraform state file using terraform backend in a Continuous Integration pipeline.

Cleanup

To delete all the resources created as part of the repository, run the following command from your terminal.

terraform destroy

Conclusion

In this blog post, we explored the fundamentals of terraform state files, discussed best practices for their secure storage within AWS environments, and covered mechanisms for locking these files to avoid conflicts from concurrent access. And finally, we showed you an example of how efficiently you can manage them in a Continuous Integration pipeline in AWS.

You can apply the same methodology to manage state files in a Continuous Delivery pipeline in AWS. For more information, see CI/CD pipeline on AWS, Terraform backends types, Purpose of terraform state.

Arun Kumar Selvaraj

Arun Kumar Selvaraj is a Cloud Infrastructure Architect with AWS Professional Services. He loves building world class capability that provides thought leadership, operating standards and platform to deliver accelerated migration and development paths for his customers. His interests include Migration, CCoE, IaC, Python, DevOps, Containers and Networking.

Manasi Bhutada

Manasi Bhutada is an ISV Solutions Architect based in the Netherlands. She helps customers design and implement well architected solutions in AWS that address their business problems. She is passionate about data analytics and networking. Beyond work she enjoys experimenting with food, playing pickleball, and diving into fun board games.

Improve your ETL performance using multiple Redshift warehouses for writes

Post Syndicated from Ryan Waldorf original https://aws.amazon.com/blogs/big-data/improve-your-etl-performance-using-multiple-redshift-warehouses-for-writes/

Amazon Redshift is a fast, petabyte-scale, cloud data warehouse that tens of thousands of customers rely on to power their analytics workloads. Thousands of customers use Amazon Redshift read data sharing to enable instant, granular, and fast data access across Redshift provisioned clusters and serverless workgroups. This allows you to scale your read workloads to thousands of concurrent users without having to move or copy the data.

Now, at Amazon Redshift we are announcing multi-data warehouse writes through data sharing in public preview. This allows you to achieve better performance for extract, transform, and load (ETL) workloads by using different warehouses of different types and sizes based on your workload needs. Additionally, this allows you to easily keep your ETL jobs running more predictably as you can split them between warehouses in a few clicks, monitor and control costs as each warehouse has its own monitoring and cost controls, and foster collaboration as you can enable different teams to write to another team’s databases in just a few clicks.

The data is live and available across all warehouses as soon as it is committed, even when it’s written to cross-account or cross-region. For preview you can use a combination of ra3.4xl clusters, ra3.16xl clusters, or serverless workgroups.

In this post, we discuss when you should consider using multiple warehouses to write to the same databases, explain how multi-warehouse writes through data sharing works, and walk you through an example on how to use multiple warehouses to write to the same database.

Reasons for using multiple warehouses to write to the same databases

In this section, we discuss some of the reasons why you should consider using multiple warehouses to write to the same database.

Better performance and predictability for mixed workloads

Customers often start with a warehouse sized to fit their initial workload needs. For example, if you need to support occasional user queries and nightly ingestion of 10 million rows of purchase data, a 32 RPU workgroup may be perfectly suited for your needs. However, adding a new hourly ingestion of 400 million rows of user website and app interactions could slow existing users’ response times as the new workload consumes significant resources. You could resize to a larger workgroup so read and write workloads complete quickly without fighting over resources. However, this may provide unneeded power and cost for existing workloads. Also, because workloads share compute, a spike in one workload can affect the ability of other workloads to meet their SLAs.

The following diagram illustrates a single-warehouse architecture.

Single-Warehouse ETL Architecture. Three separate workloads--a Purchase History ETL job ingesting 10M rows nightly, Users running 25 read queries per hour, and a Web Interactions ETL job ingesting 400M rows/hour--all using the same 256 RPU Amazon Redshift serverless workgroup to read and write from the database called Customer DB.

With the ability to write through datashares, you can now separate the new user website and app interactions ETL into a separate, larger workgroup so that it completes quickly with the performance you need without impacting the cost or completion time of your existing workloads. The following diagram illustrates this multi-warehouse architecture.

Multi-Warehouse ETL Architecture. Two workloads--a Purchase History ETL job ingesting 10M rows nightly and users running 25 read queries per hour--using a 32 RPU serverless workgroup to read from and write to the database Customer DB. It shows a separate workload--a Web Interactions ETL job ingesting 400M rows/hour--using a separate 128 RPU serverless workgroup to write to the database Customer DB.

The multi-warehouse architecture enables you to have all write workloads complete on time with less combined compute, and subsequently lower cost, than a single warehouse supporting all workloads.

Control and monitor costs

When you use a single warehouse for all your ETL jobs, it can be difficult to understand which workloads are contributing to your costs. For instance, you may have one team running an ETL workload ingesting data from a CRM system while another team is ingesting data from internal operational systems. It’s hard for you to monitor and control the costs for the workloads because queries are running together using the same compute in the warehouse. By splitting the write workloads into separate warehouses, you can separately monitor and control costs while ensuring the workloads can progress independently without resource conflict.

Collaborate on live data with ease

There are times when two teams use different warehouses for data governance, compute performance, or cost reasons, but also at times need to write to the same shared data. For instance, you may have a set of customer 360 tables that need to be updated live as customers interact with your marketing, sales, and customer service teams. When these teams use different warehouses, keeping this data live can be difficult because you may have to build a multi-service ETL pipeline using tools like Amazon Simple Storage Service (Amazon S3), Amazon Simple Notification Service (Amazon SNS), Amazon Simple Queue Service (Amazon SQS), and AWS Lambda to track live changes in each team’s data and ingest it into a single source.

With the ability to write through datashares, you can grant granular permissions on your database objects (for example, SELECT on one table, and SELECT, INSERT, and TRUNCATE on another) to different teams using different warehouses in a few clicks. This enables teams to start writing to the shared objects using their own warehouses. The data is live and available to all warehouses as soon as it is committed, and this even works if the warehouses are using different accounts and regions.

In the following sections, we walk you through how to use multiple warehouses to write to the same databases via data sharing.

Solution overview

We use the following terminology in this solution:

  • Namespace – A logical container for database objects, users and roles, their permissions on database objects, and compute (serverless workgroups and provisioned clusters).
  • Datashare – The unit of sharing for data sharing. You grant permissions on objects to datashares.
  • Producer – The warehouse that creates the datashare, grants permissions on objects to datashares, and grants other warehouses and accounts access to the datashare.
  • Consumer – The warehouse that is granted access to the datashare. You can think of consumers as datashare tenants.

This use case involves a customer with two warehouses: a primary warehouse attached to the primary namespace and used for most read and write queries, and a secondary warehouse attached to a secondary namespace that is primarily used to write to the primary namespace. We use the publicly available 10 GB TPCH dataset from AWS Labs, hosted in an S3 bucket. You can copy and paste many of the commands to follow along. Although it’s small for a data warehouse, this dataset allows easy functional testing of this feature.

The following diagram illustrates our solution architecture.

Architecture Diagram showing Two Warehouses for ETL

We set up the primary namespace by connecting to it via its warehouse, creating a marketing database in it with a prod and staging schema, and creating three tables in the prod schema called region, nation, and af_customer. We then load data into the region and nation tables using the warehouse. We do not ingest data into the af_customer table.

We then create a datashare in the primary namespace. We grant the datashare the ability to create objects in the staging schema and the ability to select, insert, update, and delete from objects in the prod schema. We then grant usage on the schema to another namespace in the account.

At that point, we connect to the secondary warehouse. We create a database from a datashare in that warehouse as well as a new user. We then grant permissions on the datashare object to the new user. Then we reconnect to the secondary warehouse as the new user.

We then create a customer table in the datashare’s staging schema and copy data from the TPCH 10 customer dataset into the staging table. We insert the staging customer table data into the shared af_customer production table, and then truncate the staging table.

At this point, the ETL is complete and you are able to read the data in the primary namespace, inserted by the secondary ETL warehouse, from both the primary warehouse and the secondary ETL warehouse.

Prerequisites

To follow along with this post, you should have the following prerequisites:

  • Two warehouses created with the PREVIEW_2023 track. The warehouses can be a mix of serverless workgroups, ra3.4xl clusters, and ra3.16xl clusters.
  • Access to a superuser in both warehouses.
  • An AWS Identity and Access Management (IAM) role that is able to ingest data from Amazon S3 into Amazon Redshift (Amazon Redshift creates one by default when you create a cluster or serverless workgroup).
  • For cross-account only, you need access to an IAM user or role that is allowed to authorize datashares. For the IAM policy, refer to Sharing datashares.

Refer to Sharing both read and write data within an AWS account or across accounts (preview) for the most up-to-date information.

Set up the primary namespace (producer)

In this section, we show how to set up the primary (producer) namespace we will use to store our data.

Connect to producer

Complete the following steps to connect to the producer:

  1. On the Amazon Redshift console, choose Query editor v2 in the navigation pane.

In the query editor v2, you can see all the warehouses you have access to in the left pane. You can expand them to see their databases.

  2. Connect to your primary warehouse using a superuser.
  3. Run the following command to create the marketing database:
CREATE DATABASE marketing;

Create the database objects to share

Complete the following steps to create your database objects to share:

  1. After you create the marketing database, switch your database connection to the marketing database.

You may need to refresh your page to be able to see it.

  2. Run the following commands to create the two schemas you intend to share:
CREATE SCHEMA staging;
CREATE SCHEMA prod;
  3. Create the tables to share with the following code. These are standard DDL statements coming from the AWS Labs DDL file with modified table names.
create table prod.region (
  r_regionkey int4 not null,
  r_name char(25) not null ,
  r_comment varchar(152) not null,
  Primary Key(R_REGIONKEY)
);

create table prod.nation (
  n_nationkey int4 not null,
  n_name char(25) not null ,
  n_regionkey int4 not null,
  n_comment varchar(152) not null,
  Primary Key(N_NATIONKEY)
);

create table prod.af_customer (
  c_custkey int8 not null ,
  c_name varchar(25) not null,
  c_address varchar(40) not null,
  c_nationkey int4 not null,
  c_phone char(15) not null,
  c_acctbal numeric(12,2) not null,
  c_mktsegment char(10) not null,
  c_comment varchar(117) not null,
  Primary Key(C_CUSTKEY)
) distkey(c_custkey) sortkey(c_custkey);

Copy data into the region and nation tables

Run the following commands to copy data from the AWS Labs S3 bucket into the region and nation tables. If you created a cluster while keeping the default created IAM role, you can copy and paste the following commands to load data into your tables:

copy prod.nation from 's3://redshift-downloads/TPC-H/2.18/10GB/nation.tbl' iam_role default delimiter '|' region 'us-east-1';
copy prod.region from 's3://redshift-downloads/TPC-H/2.18/10GB/region.tbl' iam_role default delimiter '|' region 'us-east-1';

Create the datashare

Create the datashare using the following command:

create datashare marketing publicaccessible true;

The publicaccessible setting specifies whether or not a datashare can be used by consumers with publicly accessible provisioned clusters and serverless workgroups. If your warehouses are not publicly accessible, you can ignore that field.

Grant permissions on schemas to the datashare

To add objects with permissions to the datashare, use the grant syntax, specifying the datashare you’d like to grant the permissions to:

grant usage on schema prod to datashare marketing;
grant usage, create on schema staging to datashare marketing;

This allows the datashare consumers to use objects added to the prod schema and use and create objects added to the staging schema. To maintain backward compatibility, if you use the alter datashare command to add a schema, it will be the equivalent of granting usage on the schema.

Grant permissions on tables to the datashare

Now you can grant access to tables to the datashare using the grant syntax, specifying the permissions and the datashare. The following code grants all privileges on the af_customer table to the datashare:

grant all on table prod.af_customer to datashare marketing;

To maintain backward compatibility, if you use the alter datashare command to add a table, it will be the equivalent of granting select on the table.

Additionally, we’ve added scoped permissions that allow you to grant the same permission to all current and future objects within the datashare. We add the scoped select permission on the prod schema tables to the datashare:

grant select for tables in schema prod to datashare marketing;

After this grant, the customer will have select permissions on all current and future tables in the prod schema. This gives them select access on the region and nation tables.

View permissions granted to the datashare

You can view permissions granted to the datashare by running the following command:

show access for datashare marketing;

Grant permissions to the secondary ETL namespace

You can grant permissions to the secondary ETL namespace using the existing syntax. You do this by specifying the namespace ID. You can find the namespace on the namespace details page if your secondary ETL namespace is serverless, as part of the namespace ID in the cluster details page if your secondary ETL namespace is provisioned, or by connecting to the secondary ETL warehouse in the query editor v2 and running select current_namespace. You can then grant access to the other namespace with the following command (change the consumer namespace to the namespace UID of your own secondary ETL warehouse):

grant usage on datashare marketing to namespace '<consumer_namespace>';

Set up the secondary ETL namespace (consumer)

At this point, you’re ready to set up your secondary (consumer) ETL warehouse to start writing to the shared data.

Create a database from the datashare

Complete the following steps to create your database:

  1. In the query editor v2, switch to the secondary ETL warehouse.
  2. Run the command show datashares to see the marketing datashare as well as the datashare producer’s namespace.
  3. Use that namespace to create a database from the datashare, as shown in the following code:
create database marketing_ds_db with permissions from datashare marketing of namespace '<producer_namespace>';

Specifying with permissions allows you to grant granular permissions to individual database users and roles. Without this, if you grant usage permissions on the datashare database, users and roles get all permissions on all objects within the datashare database.

Create a user and grant permissions to that user

Create a user using the CREATE USER command:

create user data_engineer password '[choose a secure password]';
grant usage on database marketing_ds_db to data_engineer;
grant all on schema marketing_ds_db.prod to data_engineer;
grant all on schema marketing_ds_db.staging to data_engineer;
grant all on all tables in schema marketing_ds_db.staging to data_engineer;
grant all on all tables in schema marketing_ds_db.prod to data_engineer;

With these grants, you’ve given the user data_engineer all permissions on all objects in the datashare. Additionally, you’ve granted all permissions available in the schemas as scoped permissions for data_engineer. Any permissions on any objects added to those schemas will be automatically granted to data_engineer.

At this point, you can continue the steps using either the admin user you’re currently signed in as or the data_engineer.

Options for writing to the datashare database

You can write data to the datashare database three ways.

Use three-part notation while connected to a local database

Like with read data sharing, you can use three-part notation to reference the datashare database objects. For instance, insert into marketing_ds_db.prod.customer. Note that you can’t use multi-statement transactions to write to objects in the datashare database like this.

Connect directly to the datashare database

You can connect directly to the datashare database via the Redshift JDBC, ODBC, or Python driver, in addition to the Amazon Redshift Data API (new). To connect like this, specify the datashare database name in the connection string. This allows you to write to the datashare database using two-part notation and use multi-statement transactions to write to the datashare database. Note that some system and catalog tables are not available this way.
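For example, the following AWS CLI sketch uses the Redshift Data API to run a statement directly against the datashare database; the workgroup name is a placeholder, and the caller needs IAM permissions for the redshift-data API:

#Run a statement against the datashare database from the secondary workgroup
aws redshift-data execute-statement \
--workgroup-name etl-secondary-wg \
--database marketing_ds_db \
--sql "select count(*) from prod.af_customer" \
--region <your-aws-region>

#Fetch the result using the statement ID returned by the previous command
aws redshift-data get-statement-result --id <statement-id> --region <your-aws-region>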

Run the use command

You can now specify that you want to use another database with the command use <database_name>. This allows you to write to the datashare database using two-part notation and use multi-statement transactions to write to the datashare database. Note that some system and catalog tables are not available this way. Also, when querying system and catalog tables, you will be querying the system and catalog tables of the database you are connected to, not the database you are using.

To try this method, run the following command:

use marketing_ds_db;

Start writing to the datashare database

In this section, we show how to write to the datashare database using the second and third options we discussed (direct connection or use command). We use the AWS Labs provided SQL to write to the datashare database.

Create a staging table

Create a table within the staging schema, because you’ve been granted create privileges. We create a table within the datashare’s staging schema with the following DDL statement:

create table staging.customer (
  c_custkey int8 not null ,
  c_name varchar(25) not null,
  c_address varchar(40) not null,
  c_nationkey int4 not null,
  c_phone char(15) not null,
  c_acctbal numeric(12,2) not null,
  c_mktsegment char(10) not null,
  c_comment varchar(117) not null,
  Primary Key(C_CUSTKEY)
) distkey(c_nationkey) sortkey(c_nationkey);

You can use two-part notation because you used the USE command or directly connected to the datashare database. If not, you need to specify the datashare database names as well.

Copy data into the staging table

Copy the customer TPCH 10 data from the AWS Labs public S3 bucket into the table using the following command:

copy staging.customer from 's3://redshift-downloads/TPC-H/2.18/10GB/customer.tbl' iam_role default delimiter '|' region 'us-east-1';

As before, this requires you to have set up the default IAM role when creating this warehouse.

Ingest African customer data to the table prod.af_customer

Run the following command to ingest only the African customer data to the table prod.af_customer:

insert into prod.af_customer
select c.* from staging.customer c
  join prod.nation n on c.c_nationkey = n.n_nationkey
  join prod.region r on n.n_regionkey = r.r_regionkey
  where r.r_regionkey = 0; --0 is the region key for Africa

This requires you to join on the nation and region tables you have select permission for.

Truncate the staging table

You can truncate the staging table so that you can write to it without recreating it in a future job. The truncate action will run transactionally and can be rolled back if you are connected directly to the datashare database or you are using the use command (even if you’re not using a datashare database). Use the following code:

truncate staging.customer;

At this point, you’ve completed ingesting the data to the primary namespace. You can query the af_customer table from both the primary warehouse and secondary ETL warehouse and see the same data.

Conclusion

In this post, we showed how to use multiple warehouses to write to the same database. This solution has the following benefits:

  • You can use provisioned clusters and serverless workgroups of different sizes to write to the same databases
  • You can write across accounts and regions
  • Data is live and available to all warehouses as soon as it is committed
  • Writes work even if the producer warehouse (the warehouse that owns the database) is paused

To learn more about this feature, see Sharing both read and write data within an AWS account or across accounts (preview). Additionally, if you have any feedback, please email us at [email protected].


About the authors

Ryan Waldorf is a Senior Product Manager at Amazon Redshift. Ryan focuses on features that enable customers to define and scale compute including data sharing and concurrency scaling.

Harshida Patel is an Analytics Specialist Principal Solutions Architect with Amazon Web Services (AWS).

Sudipto Das is a Senior Principal Engineer at Amazon Web Services (AWS). He leads the technical architecture and strategy of multiple database and analytics services in AWS with special focus on Amazon Redshift and Amazon Aurora.

Secure connectivity patterns for Amazon MSK Serverless cross-account access

Post Syndicated from Tamer Soliman original https://aws.amazon.com/blogs/big-data/secure-connectivity-patterns-for-amazon-msk-serverless-cross-account-access/

Amazon MSK Serverless is a cluster type of Amazon Managed Streaming for Apache Kafka (Amazon MSK) that makes it straightforward for you to run Apache Kafka without having to manage and scale cluster capacity. MSK Serverless automatically provisions and scales compute and storage resources. With MSK Serverless, you can use Apache Kafka on demand and pay for the data you stream and retain on a usage basis.

Deploying infrastructure across multiple VPCs and multiple accounts is considered best practice, facilitating scalability while maintaining isolation boundaries. In a multi-account environment, Kafka producers and consumers can exist within the same VPC—however, they are often located in different VPCs, sometimes within the same account, in a different account, or even in multiple different accounts. There is a need for a solution that can extend access to MSK Serverless clusters to producers and consumers from multiple VPCs within the same AWS account and across multiple AWS accounts. The solution needs to be scalable and straightforward to maintain.

In this post, we walk you through multiple solution approaches that address the MSK Serverless cross-VPC and cross-account access connectivity options, and we discuss the advantages and limitations of each approach.

MSK Serverless connectivity and authentication

When an MSK Serverless cluster is created, AWS manages the cluster infrastructure on your behalf and extends private connectivity back to your VPCs through VPC endpoints powered by AWS PrivateLink. You bootstrap your connection to the cluster through a bootstrap server that holds a record of all your underlying brokers.

At creation, a fully qualified domain name (FQDN) is assigned to your cluster bootstrap server. The bootstrap server FQDN has the general format of boot-ClusterUniqueID.xx.kafka-serverless.Region.amazonaws.com, and your cluster brokers follow the format of bxxxx-ClusterUniqueID.xx.kafka-serverless.Region.amazonaws.com, where ClusterUniqueID.xx is unique to your cluster and bxxxx is a dynamic broker range (b0001, b0037, and b0523 can be some of your assigned brokers at a point of time). It’s worth noting that the brokers assigned to your cluster are dynamic and change over time, but your bootstrap address remains the same for the cluster. All your communication with the cluster starts with the bootstrap server that can respond with the list of active brokers when required. For proper Kafka communication, your MSK client needs to be able to resolve the domain names of your bootstrap server as well as all your brokers.

At cluster creation, you specify the VPCs that you would like the cluster to communicate with (up to five VPCs in the same account as your cluster). For each VPC specified during cluster creation, cluster VPC endpoints are created along with a private hosted zone that includes a list of your bootstrap server and all dynamic brokers kept up to date. The private hosted zones facilitate resolving the FQDNs of your bootstrap server and brokers, from within the associated VPCs defined during cluster creation, to the respective VPC endpoints for each.

Cross-account access

To be able to extend private connectivity of your Kafka producers and consumers to your MSK Serverless cluster, you need to consider three main aspects: private connectivity, authentication and authorization, and DNS resolution.

The following diagram highlights the possible connectivity options. Although the diagram shows them all here for demonstration purposes, in most cases, you would use one or more of these options depending on your architecture, not necessary all in the same setup.

MSK cross account connectivity options

In this section, we discuss the different connectivity options along with their pros and cons. We also cover the authentication and DNS resolution aspects associated with the relevant connectivity options.

Private connectivity layer

This is the underlying private network connectivity. You can achieve this connectivity using VPC peering, AWS Transit Gateway, or PrivateLink, as indicated in the preceding diagram. VPC peering simplifies the setup, but it lacks the support for transitive routing. In most cases, peering is used when you have a limited number of VPCs or if your VPCs generally communicate with some limited number of core services VPCs without the need of lateral connectivity or transitive routing. On the other hand, AWS Transit Gateway facilitates transitive routing and can simplify the architecture when you have a large number of VPCs, and especially when lateral connectivity is required. PrivateLink is more suited for extending connectivity to a specific resource unidirectionally across VPCs or accounts without exposing full VPC-to-VPC connectivity, thereby adding a layer of isolation. PrivateLink is useful if you have overlapping CIDRs, which is a case that is not supported by Transit Gateway or VPC peering. PrivateLink is also useful when your connected parties are administrated separately, and when one-way connectivity and isolation are required.

If you choose PrivateLink as a connectivity option, you need to use a Network Load Balancer (NLB) with an IP type target group with its registered targets set as the IP addresses of the zonal endpoints of your MSK Serverless cluster.
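As a rough sketch, the following AWS CLI commands create such a target group and register the endpoint IP addresses; the VPC ID, target group ARN, and IP addresses are placeholders, and port 9098 is the MSK Serverless port for IAM-authenticated clients:

#Create an IP-type TCP target group for the Kafka IAM port
aws elbv2 create-target-group \
--name msk-serverless-tg \
--protocol TCP \
--port 9098 \
--target-type ip \
--vpc-id vpc-XXXXXXXX

#Register the IP addresses of the cluster's zonal VPC endpoints as targets
aws elbv2 register-targets \
--target-group-arn <target-group-arn> \
--targets Id=10.0.1.10 Id=10.0.2.10 Id=10.0.3.10

You then attach this target group to an NLB listener and, if needed, expose the NLB through a VPC endpoint service for consumers in other VPCs or accounts.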

Cluster authentication and authorization

In addition to having private connectivity and being able to resolve the bootstrap server and brokers domain names, for your producers and consumers to have access to your cluster, you need to configure your clients with proper credentials. MSK Serverless supports AWS Identity and Access Management (IAM) authentication and authorization. For cross-account access, your MSK client needs to assume a role that has proper credentials to access the cluster. This post focuses mainly on the cross-account connectivity and name resolution aspects. For more details on cross-account authentication and authorization, refer to the following GitHub repo.

DNS resolution

For Kafka producers and consumers located in accounts across the organization to be able to produce and consume to and from the centralized MSK Serverless cluster, they need to be able to resolve the FQDNs of the cluster bootstrap server as well as each of the cluster brokers. Understanding the dynamic nature of broker allocation, the solution will have to accommodate such a requirement. In the next section, we address how we can satisfy this part of the requirements.

Cluster cross-account DNS resolution

Now that we have discussed how MSK Serverless works, how private connectivity is extended, and the authentication and authorization requirements, let’s discuss how DNS resolution works for your cluster.

For every VPC associated with your cluster during cluster creation, a VPC endpoint is created along with a private hosted zone. Private hosted zones enable name resolution of the FQDNs of the cluster bootstrap server and the dynamically allocated brokers from within each respective VPC. This works well when requests come from within any of the VPCs that were added during cluster creation because they already have the required VPC endpoints and relevant private hosted zones.

Let’s discuss how you can extend name resolution to other VPCs within the same account that were not included during cluster creation, and to others that may be located in other accounts.

You’ve already chosen the private connectivity option that best fits your architecture requirements, be it VPC peering, PrivateLink, or Transit Gateway. Assuming you have also configured your MSK clients to assume roles with the right IAM credentials to facilitate cluster access, you now need to address the name resolution aspect of connectivity. Although we list VPC peering, Transit Gateway, and PrivateLink as connectivity options, in most cases only one or two of them are present. You don’t need all of them; they are listed to demonstrate your options, and you are free to choose the ones that best fit your architecture and requirements.

In the following sections, we describe two different methods to address DNS resolution. For each method, there are advantages and limitations.

Private hosted zones

The following diagram highlights the solution architecture and its components. Note that, to simplify the diagram, and to make room for more relevant details required in this section, we have eliminated some of the connectivity options.

Cross-account access using Private Hosted Zones

The solution starts with creating a private hosted zone, followed by creating a VPC association.

Create a private hosted zone

We start by creating a private hosted zone for name resolution. To keep the solution scalable and straightforward to maintain, you can create this private hosted zone in the MSK Serverless cluster account, which facilitates centralized management of the private hosted zone alongside the MSK cluster. You can then associate the centralized private hosted zone with VPCs in the same account or in other accounts. Alternatively, creating the private hosted zone in a centralized networking account is also a viable option to consider.

The purpose of the private hosted zone is to be able to resolve the FQDNs of the bootstrap server as well as all the dynamically assigned cluster-associated brokers. As discussed earlier, the bootstrap server FQDN format is boot-ClusterUniqueID.xx.kafka-serverless.Region.amazonaws.com, and the cluster brokers use the format bxxxx-ClusterUniqueID.xx.kafka-serverless.Region.amazonaws.com, with bxxxx being the broker ID. You need to create the new private hosted zone with the primary domain set as kafka-serverless.Region.amazonaws.com, with an A-Alias record called *.kafka-serverless.Region.amazonaws.com pointing to the Regional VPC endpoint of the MSK Serverless cluster in the MSK cluster VPC. This should be sufficient to direct all traffic targeting your cluster to the primary cluster VPC endpoints that you specified in your private hosted zone.
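
The following Python (boto3) sketch shows one way to script this, under the assumption that you have already looked up the hosted zone ID and DNS name of the cluster's Regional VPC endpoint; the Region, VPC ID, and endpoint values are placeholders for illustration.

import time
import boto3

route53 = boto3.client("route53")
REGION = "us-east-1"  # assumed Region placeholder

# Assumed placeholders: the MSK cluster VPC and the hosted zone ID / DNS name
# of the cluster's Regional VPC endpoint (taken from the endpoint's DNS entries)
CLUSTER_VPC_ID = "vpc-0123456789abcdef0"
VPCE_DNS_NAME = "vpce-0abc123-example.kafka-serverless.us-east-1.vpce.amazonaws.com"
VPCE_HOSTED_ZONE_ID = "Z7HUB22UULQXV"

# Private hosted zone for the MSK Serverless domain, associated with the cluster VPC
zone = route53.create_hosted_zone(
    Name=f"kafka-serverless.{REGION}.amazonaws.com",
    VPC={"VPCRegion": REGION, "VPCId": CLUSTER_VPC_ID},
    CallerReference=str(time.time()),
    HostedZoneConfig={"Comment": "MSK Serverless cross-account resolution", "PrivateZone": True},
)
zone_id = zone["HostedZone"]["Id"]

# Wildcard A-Alias record pointing all cluster FQDNs at the Regional VPC endpoint
route53.change_resource_record_sets(
    HostedZoneId=zone_id,
    ChangeBatch={
        "Changes": [{
            "Action": "UPSERT",
            "ResourceRecordSet": {
                "Name": f"*.kafka-serverless.{REGION}.amazonaws.com",
                "Type": "A",
                "AliasTarget": {
                    "HostedZoneId": VPCE_HOSTED_ZONE_ID,
                    "DNSName": VPCE_DNS_NAME,
                    "EvaluateTargetHealth": False,
                },
            },
        }]
    },
)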

Now that you have created the private hosted zone, for name resolution to work, you need to associate the private hosted zone with every VPC where you have clients for the MSK cluster (producer or consumer).

Associate a private hosted zone with VPCs in the same account

For VPCs that are in the same account as the MSK cluster and weren’t included in the configuration during cluster creation, you can associate them with the private hosted zone using the AWS Management Console by editing the private hosted zone settings and adding the respective VPCs. For more information, refer to Associating more VPCs with a private hosted zone.

Associate a private hosted zone in cross-account VPCs

For VPCs that are in a different account other than the MSK cluster account, refer to Associating an Amazon VPC and a private hosted zone that you created with different AWS accounts. The key steps are as follows:

  1. Create a VPC association authorization in the account where the private hosted zone is created (in this case, the same account as the MSK Serverless cluster account) to authorize the remote VPCs to be associated with the hosted zone:
aws route53 create-vpc-association-authorization --hosted-zone-id HostedZoneID --vpc VPCRegion=Region,VPCId=vpc-ID
  2. Associate the VPC with the private hosted zone from the account where you have the VPCs with the MSK clients (the remote account), referencing the association authorization you created earlier:
aws route53 list-vpc-association-authorizations --hosted-zone-id HostedZoneID
aws route53 associate-vpc-with-hosted-zone --hosted-zone-id HostedZoneID --vpc VPCRegion=Region,VPCId=vpc-ID
  3. Delete the VPC association authorization now that the VPC has been associated with the hosted zone:
aws route53 delete-vpc-association-authorization --hosted-zone-id HostedZoneID --vpc VPCRegion=Region,VPCId=vpc-ID

Deleting the authorization doesn’t affect the association; it just prevents you from re-associating the VPC with the hosted zone in the future. If you want to re-associate the VPC with the hosted zone, repeat steps 1 and 2 of this procedure.

Note that your VPC needs to have the enableDnsSupport and enableDnsHostnames DNS attributes enabled for this to work. These two settings can be configured under the VPC DNS settings. For more information, refer to DNS attributes in your VPC.

These procedures work well for all remote accounts when connectivity is extended using VPC peering or Transit Gateway. If your connectivity option uses PrivateLink, the private hosted zone needs to be created in the remote account instead (the account where the PrivateLink VPC endpoints are). In addition, an A-Alias record that resolves to the PrivateLink endpoint instead of the MSK cluster endpoint needs to be created as indicated in the earlier diagram. This will facilitate name resolution to the PrivateLink endpoint. If other VPCs need access to the cluster through that same PrivateLink setup, you need to follow the same private hosted zone association procedure as described earlier and associate your other VPCs with the private hosted zone created for your PrivateLink VPC.

Limitations

The private hosted zones solution has some key limitations.

Firstly, because you’re using kafka-serverless.Region.amazonaws.com as the primary domain for your private hosted zone, and your A-Alias record uses *.kafka-serverless.Region.amazonaws.com, all traffic to the MSK Serverless service originating from any VPC associated with this private hosted zone will be directed to the one specific cluster VPC Regional endpoint that you specified in the hosted zone A-Alias record.

This solution is valid if you have one MSK Serverless cluster in your centralized service VPC. If you need to provide access to multiple MSK Serverless clusters, you can use the same solution but adopt a distributed private hosted zone approach as opposed to a centralized one. In a distributed approach, each private hosted zone points to a specific cluster, and the VPCs associated with that private hosted zone communicate only with the cluster listed under it.

In addition, after you establish a VPC association with a private hosted zone resolving *.kafka-serverless.Region.amazonaws.com, the respective VPC will only be able to communicate with the cluster defined in that specific private hosted zone and no other cluster. An exception to this rule is if a local cluster is created within the same client VPC, in which case the clients within the VPC will only be able to communicate with the local cluster.

You can also use PrivateLink to accommodate multiple clusters by creating a PrivateLink plus private hosted zone per cluster, replicating the configuration steps described earlier.

Both solutions, using distributed private hosted zones or PrivateLink, are still subject to the limitation that for each client VPC, you can only communicate with the one MSK Serverless cluster that your associated private hosted zone is configured for.

In the next section, we discuss another possible solution.

Resolver rules and AWS Resource Access Manager

The following diagram shows a high-level overview of the solution using Amazon Route 53 resolver rules and AWS Resource Access Manager.

Cross-account access using Resolver Rules and Resolver Endpoints

The solution starts with creating Route 53 inbound and outbound resolver endpoints, which are associated with the MSK cluster VPC. Then you create a resolver forwarding rule in the MSK account that is not associated with any VPC. Next, you share the resolver rule across accounts using Resource Access Manager. In each remote account where you need to extend name resolution (the account where the MSK clients are located), you accept the resource share and associate the resolver rule with your target VPCs.
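
As a sketch of the sharing side, assuming the inbound and outbound resolver endpoints already exist, the following Python (boto3) snippet creates the forwarding rule and shares it through Resource Access Manager. The endpoint ID, target IP addresses, and principal ARN are placeholders for illustration.

import boto3

r53resolver = boto3.client("route53resolver")
ram = boto3.client("ram")
REGION = "us-east-1"  # assumed placeholders throughout

# Forward queries for the MSK Serverless domain through the outbound resolver
# endpoint to the inbound endpoint IPs in the MSK cluster VPC.
rule = r53resolver.create_resolver_rule(
    CreatorRequestId="msk-serverless-rule-001",
    Name="msk-serverless-forwarding",
    RuleType="FORWARD",
    DomainName=f"kafka-serverless.{REGION}.amazonaws.com",
    TargetIps=[{"Ip": "10.0.1.10", "Port": 53}, {"Ip": "10.0.2.10", "Port": 53}],
    ResolverEndpointId="rslvr-out-0123456789abcdef0",
)["ResolverRule"]

# Share the rule with the consumer accounts (or an OU) through AWS RAM
ram.create_resource_share(
    name="msk-serverless-resolver-rule-share",
    resourceArns=[rule["Arn"]],
    principals=["arn:aws:organizations::111122223333:ou/o-exampleorgid/ou-exampleouid"],
)

# In each consumer account, after accepting the RAM share, associate the rule
# with every client VPC, for example:
#   boto3.client("route53resolver").associate_resolver_rule(
#       ResolverRuleId=rule["Id"], VPCId="vpc-remote-client")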

For more information about this approach, refer to the third use case in Simplify DNS management in a multi-account environment with Route 53 Resolver.

This solution accommodates multiple centralized MSK Serverless clusters in a more scalable and flexible way, because it directs DNS requests to be resolved in the VPC where the MSK clusters reside. Multiple MSK Serverless clusters can coexist, and clients in a particular VPC can communicate with one or more of them at the same time. This is not supported with the private hosted zone approach.

Limitations

Although this solution has its advantages, it also has a few limitations.

Firstly, for a particular target consumer or producer account, all your MSK Serverless clusters need to be in the same core service VPC in the MSK account. This is because the resolver rule is set at the account level and uses kafka-serverless.Region.amazonaws.com as the primary domain, directing its resolution to one specific inbound/outbound resolver endpoint pair within that service VPC. If you need to have separate clusters in different VPCs, consider creating separate accounts.

The second limitation is that all your client VPCs need to be in the same Region as your core MSK Serverless VPC. The reason is that a resolver rule points to a resolver endpoint pair (in practice, it points to the outbound endpoint, which loops back into the inbound endpoint), the endpoints must be in the same Region as the rule, and Resource Access Manager extends the share only within the same Region. That said, this solution works well when you have multiple MSK clusters in the same core VPC and your remote clients, although in different VPCs and accounts, are still within the same Region. A workaround for this limitation is to duplicate the resolver rule and outbound resolver endpoint in a second Region, where the outbound endpoint loops back through the original first-Region inbound resolver endpoint associated with the MSK Serverless cluster VPC (assuming IP connectivity is in place). This second-Region resolver rule can then be shared using Resource Access Manager within the second Region.

Conclusion

You can configure MSK Serverless cross-VPC and cross-account access in multi-account environments using private hosted zones or Route 53 resolver rules. The solution discussed in this post allows you to centralize your configuration while extending cross-account access, making it a scalable and straightforward-to-maintain solution. You can create your MSK Serverless clusters with cross-account access for producers and consumers, keep your focus on your business outcomes, and gain insights from sources of data across your organization without having to right-size and manage a Kafka infrastructure.


About the Author

Tamer Soliman is a Senior Solutions Architect at AWS. He helps Independent Software Vendor (ISV) customers innovate, build, and scale on AWS. He has over two decades of industry experience and is an inventor with three granted patents. His experience spans multiple technology domains, including telecom, networking, application integration, data analytics, AI/ML, and cloud deployments. He specializes in AWS networking and has a profound passion for machine learning, AI, and generative AI.

SaaS access control using Amazon Verified Permissions with a per-tenant policy store

Post Syndicated from Manuel Heinkel original https://aws.amazon.com/blogs/security/saas-access-control-using-amazon-verified-permissions-with-a-per-tenant-policy-store/

Access control is essential for multi-tenant software as a service (SaaS) applications. SaaS developers must manage permissions, fine-grained authorization, and isolation.

In this post, we demonstrate how you can use Amazon Verified Permissions for access control in a multi-tenant document management SaaS application using a per-tenant policy store approach. We also describe how to enforce the tenant boundary.

We usually see the following access control needs in multi-tenant SaaS applications:

  • Application developers need to define policies that apply across all tenants.
  • Tenant users need to control who can access their resources.
  • Tenant admins need to manage all resources for a tenant.

Additionally, independent software vendors (ISVs) implement tenant isolation to prevent one tenant from accessing the resources of another tenant. Enforcing tenant boundaries is imperative for SaaS businesses and is one of the foundational topics for SaaS providers.

Verified Permissions is a scalable, fine-grained permissions management and authorization service that helps you build and modernize applications without having to implement authorization logic within the code of your application.

Verified Permissions uses the Cedar language to define policies. A Cedar policy is a statement that declares which principals are explicitly permitted, or explicitly forbidden, to perform an action on a resource. The collection of policies defines the authorization rules for your application. Verified Permissions stores the policies in a policy store. A policy store is a container for policies and templates. You can learn more about Cedar policies from the Using Open Source Cedar to Write and Enforce Custom Authorization Policies blog post.

Before Verified Permissions, you had to implement authorization logic within the code of your application. Now, we’ll show you how Verified Permissions helps remove this undifferentiated heavy lifting in an example application.

Multi-tenant document management SaaS application

The application allows users to add, share, access, and manage documents. It requires the following access controls:

  • Application developers who can define policies that apply across all tenants.
  • Tenant users who can control who can access their documents.
  • Tenant admins who can manage all documents for a tenant.

Let’s start by describing the application architecture and then dive deeper into the design details.

Application architecture overview

There are two approaches to multi-tenant design in Verified Permissions: a single shared policy store and a per-tenant policy store. You can learn about the considerations, trade-offs and guidance for these approaches in the Verified Permissions user guide.

For the example document management SaaS application, we decided to use the per-tenant policy store approach for the following reasons:

  • Low-effort tenant policies isolation
  • The ability to customize templates and schema per tenant
  • Low-effort tenant off-boarding
  • Per-tenant policy store resource quotas

We decided to accept the following trade-offs:

  • High effort to implement global policies management (because the application use case doesn’t require frequent changes to these policies)
  • Medium effort to implement the authorization flow (because we decided that in this context, the above reasons outweigh implementing a mapping from tenant ID to policy store ID)

Figure 1 shows the document management SaaS application architecture. For simplicity, we omitted the frontend and focused on the backend.


Figure 1: Document management SaaS application architecture

  1. A tenant user signs in to an identity provider such as Amazon Cognito. They get a JSON Web Token (JWT), which they use for API requests. The JWT contains claims such as the user_id, which identifies the tenant user, and the tenant_id, which defines which tenant the user belongs to.
  2. The tenant user makes API requests with the JWT to the application.
  3. Amazon API Gateway verifies the validity of the JWT with the identity provider.
  4. If the JWT is valid, API Gateway forwards the request to the compute provider, in this case an AWS Lambda function, for it to run the business logic.
  5. The Lambda function assumes an AWS Identity and Access Management (IAM) role with an IAM policy that allows access to the Amazon DynamoDB table that provides tenant-to-policy-store mapping. The IAM policy scopes down access such that the Lambda function can only access data for the current tenant_id.
  6. The Lambda function looks up the Verified Permissions policy_store_id for the current request. To do this, it extracts the tenant_id from the JWT. The function then retrieves the policy_store_id from the tenant-to-policy-store mapping table.
  7. The Lambda function assumes another IAM role with an IAM policy that allows access to the Verified Permissions policy store, the document metadata table, and the document store. The IAM policy uses tenant_id and policy_store_id to scope down access.
  8. The Lambda function gets or stores document metadata in a DynamoDB table. The function uses the metadata for Verified Permissions authorization requests.
  9. Using the information from steps 5 and 6, the Lambda function calls Verified Permissions to make an authorization decision or create Cedar policies (see the sketch after this list).
  10. If authorized, the application can then access or store a document.
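
To make steps 6, 9, and 10 more concrete, here is a minimal, hypothetical Python sketch of the Lambda handler. The table name, entity types, claim names, and event shape (an HTTP API JWT authorizer) are assumptions for illustration, not the exact implementation of the sample application.

import boto3

dynamodb = boto3.resource("dynamodb")
avp = boto3.client("verifiedpermissions")

# Assumed table name and schema for the tenant-to-policy-store mapping
mapping_table = dynamodb.Table("TenantPolicyStoreMapping")

def handler(event, context):
    # Claims are assumed to have been validated by the API Gateway JWT authorizer
    claims = event["requestContext"]["authorizer"]["jwt"]["claims"]
    tenant_id = claims["tenant_id"]
    user_id = claims["user_id"]

    # Step 6: look up the tenant's policy store
    item = mapping_table.get_item(Key={"tenant_id": tenant_id})["Item"]
    policy_store_id = item["policy_store_id"]

    # Step 9: ask Verified Permissions for an authorization decision
    decision = avp.is_authorized(
        policyStoreId=policy_store_id,
        principal={"entityType": "DocumentsAPI::User", "entityId": user_id},
        action={"actionType": "DocumentsAPI::Action", "actionId": "addDocument"},
        resource={"entityType": "DocumentsAPI::Document", "entityId": "placeholder-doc-id"},
    )
    if decision["decision"] != "ALLOW":
        return {"statusCode": 403, "body": "Forbidden"}

    # Step 10: proceed with storing document metadata and content
    return {"statusCode": 200, "body": "Authorized"}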

Application architecture deep dive

Now that you know the architecture for the use cases, let’s review them in more detail and work backwards from the user experience to the related part of the application architecture. The architecture focuses on permissions management. Accessing and storing the actual document is out of scope.

Define policies that apply across all tenants

The application developer must define global policies that include a basic set of access permissions for all tenants. We use Cedar policies to implement these permissions.

Because we’re using a per-tenant policy store approach, the tenant onboarding process should create these policies for each new tenant. To update the policies later, the deployment pipeline needs to apply the changes to all policy stores.

The “Add a document” and “Manage all the documents for a tenant” sections that follow include examples of global policies.

Make sure that a tenant can’t edit the policies of another tenant

The application uses IAM to isolate the resources of one tenant from another. Because we’re using a per-tenant policy store approach, we can use IAM to isolate one tenant’s policy store from another’s.

Architecture


Figure 2: Tenant isolation

  1. A tenant user calls an API endpoint using a valid JWT.
  2. The Lambda function uses AWS Security Token Service (AWS STS) to assume an IAM role with an IAM policy that allows access to the tenant-to-policy-store mapping DynamoDB table. The IAM policy only allows access to the table and the entries that belong to the requesting tenant. When the function assumes the role, it uses tenant_id to scope access to the items whose partition key matches the tenant_id. See the How to implement SaaS tenant isolation with ABAC and AWS IAM blog post for examples of such policies.
  3. The Lambda function uses the user’s tenant_id to get the Verified Permissions policy_store_id.
  4. The Lambda function uses the same mechanism as in step 2 to assume a different IAM role using tenant_id and policy_store_id which only allows access to the tenant policy store.
  5. The Lambda function accesses the tenant policy store (a sketch of the scoped role assumption in step 2 follows this list).
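
The following Python (boto3) sketch illustrates the scoped role assumption from step 2, using an inline session policy that restricts DynamoDB access to items whose partition key matches the tenant_id. The role ARN, table name, account ID, and Region are placeholders.

import json
import boto3

sts = boto3.client("sts")

def scoped_credentials(tenant_id: str):
    # Hypothetical role and table names; the session policy further scopes the
    # role's permissions to items whose partition key equals the tenant_id.
    session_policy = {
        "Version": "2012-10-17",
        "Statement": [{
            "Effect": "Allow",
            "Action": ["dynamodb:GetItem", "dynamodb:Query"],
            "Resource": "arn:aws:dynamodb:us-east-1:111122223333:table/TenantPolicyStoreMapping",
            "Condition": {"ForAllValues:StringEquals": {"dynamodb:LeadingKeys": [tenant_id]}},
        }],
    }
    resp = sts.assume_role(
        RoleArn="arn:aws:iam::111122223333:role/tenant-mapping-access-role",
        RoleSessionName=f"tenant-{tenant_id}",
        Policy=json.dumps(session_policy),
    )
    return resp["Credentials"]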

Add a document

When a user first accesses the application, they don’t own any documents. To add a document, the frontend calls the POST /documents endpoint and supplies a document_name in the request’s body.

Cedar policy

We need a global policy that allows every tenant user to add a new document. The tenant onboarding process creates this policy in the tenant’s policy store.

permit (    
  principal,
  action == DocumentsAPI::Action::"addDocument",
  resource
);

This policy allows any principal to add a document. Because we’re using a per-tenant policy store approach, there’s no need to scope the principal to a tenant.

Architecture


Figure 3: Adding a document

  1. A tenant user calls the POST /documents endpoint to add a document.
  2. The Lambda function uses the user’s tenant_id to get the Verified Permissions policy_store_id.
  3. The Lambda function calls the Verified Permissions policy store to check if the tenant user is authorized to add a document.
  4. After successful authorization, the Lambda function adds a new document to the documents metadata database and uploads the document to the documents storage.

The database structure is described in the following table:

tenant_id (Partition key): String | document_id (Sort key): String | document_name: String | document_owner: String
<TENANT_ID>                        | <DOCUMENT_ID>                   | <DOCUMENT_NAME>       | <USER_ID>
  • tenant_id: The tenant_id from the JWT claims.
  • document_id: A random identifier for the document, created by the application.
  • document_name: The name of the document supplied with the API request.
  • document_owner: The user who created the document. The value is the user_id from the JWT claims.
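
A minimal Python (boto3) sketch of writing one such item follows; the table name is an assumption, and in practice the write happens only after the Verified Permissions authorization check succeeds.

import uuid
import boto3

dynamodb = boto3.resource("dynamodb")
documents_table = dynamodb.Table("DocumentsMetadata")  # assumed table name

def add_document_metadata(tenant_id: str, user_id: str, document_name: str) -> str:
    document_id = str(uuid.uuid4())
    documents_table.put_item(
        Item={
            "tenant_id": tenant_id,        # partition key from the JWT claims
            "document_id": document_id,    # sort key generated by the application
            "document_name": document_name,
            "document_owner": user_id,
        }
    )
    return document_id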

Share a document with another user of a tenant

After a tenant user has created one or more documents, they might want to share them with other users of the same tenant. To share a document, the frontend calls the POST /shares endpoint and provides the document_id of the document the user wants to share and the user_id of the receiving user.

Cedar policy

We need a global document owner policy that allows the document owner to manage the document, including sharing. The tenant onboarding process creates this policy in the tenant’s policy store.

permit (    
  principal,
  action,
  resource
) when {
  resource.owner == principal && 
  resource.type == "document"
};

The policy allows principals to perform actions on available resources (the document) when the principal is the document owner. This policy allows the shareDocument action, which we describe next, to share a document.

We also need a share policy that allows the receiving user to access the document. The application creates these policies for each successful share action. We recommend that you use policy templates to define the share policy. Policy templates allow a policy to be defined once and then attached to multiple principals and resources. Policies that use a policy template are called template-linked policies. Updates to the policy template are reflected across the principals and resources that use the template. The tenant onboarding process creates the share policy template in the tenant’s policy store.

We define the share policy template as follows:

permit (    
  principal == ?principal,  
  action == DocumentsAPI::Action::"accessDocument",
  resource == ?resource 
);

The following is an example of a template-linked policy using the share policy template:

permit (    
  principal == DocumentsAPI::User::"<user_id>",
  action == DocumentsAPI::Action::"accessDocument",
  resource == DocumentsAPI::Document::"<document_id>" 
);

The policy includes the user_id of the receiving user (principal) and the document_id of the document (resource).

Architecture


Figure 4: Sharing a document

  1. A tenant user calls the POST /shares endpoint to share a document.
  2. The Lambda function uses the user’s tenant_id to get the Verified Permissions policy_store_id and policy template IDs for each action from the DynamoDB table that stores the tenant to policy store mapping. In this case the function needs to use the share_policy_template_id.
  3. The function queries the documents metadata DynamoDB table to retrieve the document_owner attribute for the document the user wants to share.
  4. The Lambda function calls Verified Permissions to check if the user is authorized to share the document. The request context uses the user_id from the JWT claims as the principal, shareDocument as the action, and the document_id as the resource. The document entity includes the document_owner attribute, which came from the documents metadata DynamoDB table.
  5. If the user is authorized to share the resource, the function creates a new template-linked share policy in the tenant’s policy store. This policy includes the user_id of the receiving user as the principal and the document_id as the resource (see the sketch after this list).
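
Step 5 can be implemented with the Verified Permissions CreatePolicy API. The following Python (boto3) sketch creates the template-linked policy; the entity type names mirror the earlier Cedar examples, and the identifiers are placeholders supplied by the caller.

import boto3

avp = boto3.client("verifiedpermissions")

def create_share_policy(policy_store_id: str, share_policy_template_id: str,
                        receiving_user_id: str, document_id: str):
    # Instantiate the share policy template for the receiving user and the document
    return avp.create_policy(
        policyStoreId=policy_store_id,
        definition={
            "templateLinked": {
                "policyTemplateId": share_policy_template_id,
                "principal": {"entityType": "DocumentsAPI::User", "entityId": receiving_user_id},
                "resource": {"entityType": "DocumentsAPI::Document", "entityId": document_id},
            }
        },
    )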

Access a shared document

After a document has been shared, the receiving user wants to access the document. To access the document, the frontend calls the GET /documents endpoint and provides the document_id of the document the user wants to access.

Cedar policy

As shown in the previous section, during the sharing process, the application creates a template-linked share policy that allows the receiving user to access the document. Verified Permissions evaluates this policy when the user tries to access the document.

Architecture


Figure 5: Accessing a shared document

  1. A tenant user calls the GET /documents endpoint to access the document.
  2. The Lambda function uses the user’s tenant_id to get the Verified Permissions policy_store_id.
  3. The Lambda function calls Verified Permissions to check if the user is authorized to access the document. The request context uses the user_id from the JWT claims as the principal, accessDocument as the action, and the document_id as the resource.

Manage all the documents for a tenant

When a customer signs up for a SaaS application, the application creates the tenant admin user. The tenant admin must have permissions to perform all actions on all documents for the tenant.

Cedar policy

We need a global policy that allows tenant admins to manage all documents. The tenant onboarding process creates this policy in the tenant’s policy store.

permit (    
  principal in DocumentsAPI::Group::"<admin_group_id>",
  action,
  resource
);

This policy allows every member of the <admin_group_id> group to perform any action on any document.

Architecture


Figure 6: Managing documents

  1. A tenant admin calls the POST /documents endpoint to manage a document. 
  2. The Lambda function uses the user’s tenant_id to get the Verified Permissions policy_store_id.
  3. The Lambda function calls Verified Permissions to check if the user is authorized to manage the document.

Conclusion

In this blog post, we showed you how Amazon Verified Permissions helps to implement fine-grained authorization decisions in a multi-tenant SaaS application. You saw how to apply the per-tenant policy store approach to the application architecture. See the Verified Permissions user guide for how to choose between using a per-tenant policy store or one shared policy store. To learn more, visit the Amazon Verified Permissions documentation and workshop.

If you have feedback about this post, submit comments in the Comments section below. If you have questions about this post, start a new thread on the Amazon Verified Permissions re:Post or contact AWS Support.


Manuel Heinkel

Manuel is a Solutions Architect at AWS, working with software companies in Germany to build innovative and secure applications in the cloud. He supports customers in solving business challenges and achieving success with AWS. Manuel has a track record of diving deep into security and SaaS topics. Outside of work, he enjoys spending time with his family and exploring the mountains.


Alex Pulver

Alex is a Principal Solutions Architect at AWS. He works with customers to help design processes and solutions for their business needs. His current areas of interest are product engineering, developer experience, and platform strategy. He’s the creator of Application Design Framework, which aims to align business and technology, reduce rework, and enable evolutionary architecture.

Generative AI Meets AWS Security

Post Syndicated from Trevor Morse original https://aws.amazon.com/blogs/devops/generative-ai-meets-aws-security/

A Case Study Presented by CodeWhisperer Customizations

Amazon CodeWhisperer is an AI-powered coding assistant that is trained on a wide variety of data, including Amazon and open-source code. With the launch of CodeWhisperer Customizations, customers can create a customization resource. The customization is produced by augmenting CodeWhisperer using a customer’s private code repositories. This enables organization-specific code recommendations tailored to the customer’s own internal APIs, libraries, and frameworks.

When we started designing CodeWhisperer Customizations, we considered what our guiding principles, our tenets, should be. Customer trust was at the top of the list, but that posed new questions. How could we best earn our customer’s trust with a feature that fundamentally relies on a customer’s sensitive information? How could we properly secure this data so that customers could safely leverage the advanced capabilities we launched for them?

When considering these questions, we analyzed several design principles. It was important to ensure that a customer’s data is never combined with, or used alongside, another customer’s. In other words, we needed to store each customer’s data in isolation. Additionally, we wanted to restrict data processing to single-tenant compute. By this, we mean that any access of the data itself should be done on short-lived and non-shared compute, whenever possible. Another principle we considered was how to prevent unauthorized access of customer data. Across AWS, we build our systems to not only ensure that no customer data is intermingled during normal service operation, but also to mitigate any risk of unauthorized users gaining unintended access to customer data.

These design principles pointed to a set of security controls available via native AWS technologies. We needed to provide data and compute isolation as well as mitigate confused deputy risks at each step of the process. In this blog post, we will consider how each of these security considerations is addressed, utilizing AWS best practices. We will first consider the flow of data through the admin’s management of customization resources. Next, we will outline data interactions when developers send runtime requests to a given customization from their integrated development environment (IDE).

In reading this blog post, you will learn how we developed CodeWhisperer Customizations with security at the forefront. We also hope that you are inspired to leverage some of the same AWS technologies in your own applications.

Diagram

This diagram depicts the flow of customer data through the CodeWhisperer service when managing, and using, a customization.
The diagram above depicts the flow of data during an administrator’s management of a customization as well as during a developer’s usage of the customization from their IDE.

  1. API Layer: Authenticates and authorizes each request. Passes data references to the downstream dependencies.
  2. Data Ingestion Layer: Ingests and processes customer data into the format required for CodeWhisperer.
  3. Customization Layer: Produces a customization resource based on the internal representation of the customer data. Shares the customization artifacts for inference.
  4. Model Inference Layer: Provides customer-specific recommendations based on the customization.
  5. AWS IAM Identity Center: Provides user-level authentication.
  6. Amazon Verified Permissions: Provides customization-level authorization.

Customization Management

Organization admins are responsible for managing their customizations. To enable CodeWhisperer to produce these resources, the admin provides access to their private code repositories. CodeWhisperer uses AWS Key Management Service (AWS KMS) encryption for all customization data, and admins can optionally configure their own profile-level encryption keys. Based on the role assumed by the admin in the AWS console, CodeWhisperer accesses and ingests the referenced code data on the user’s behalf.

Data Isolation

During customization management, data storage occurs in two forms:

  1. Longer-term/persistent (e.g. service-owned Amazon Simple Storage Service (Amazon S3) buckets)
  2. Short-term/transient (e.g. ephemeral disks on service-managed, serverless compute)

When persisting data in any form, the best security control to apply is encryption. By encrypting the data, only entities with access to the encryption key will be able to see, or use, the data. For example, when encrypted data is stored in Amazon S3, users with access to the bucket can see that the data exists, but will be unable to view the content, unless they also have access to the encryption key.

Within CodeWhisperer, long-term customer data storage in Amazon S3 is cryptographically isolated using KMS keys with customer-level encryption context metadata. The encryption context provides a further safeguard which prevents unauthorized users from accessing the content even if they gain access to the key. It also prevents unintentional, cross-customer data access as the context value is tied to a particular customer’s identity. Having access to the KMS key without this context is like having the physical invitation to a private meeting without knowing the spoken passphrase for the event.
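
The following Python (boto3) sketch illustrates the general encryption context pattern, not CodeWhisperer's actual implementation; the key ARN and context values are placeholders.

import boto3

kms = boto3.client("kms")
KEY_ID = "arn:aws:kms:us-east-1:111122223333:key/1234abcd-12ab-34cd-56ef-1234567890ab"  # placeholder

# Encrypt with a customer-scoped encryption context
ciphertext = kms.encrypt(
    KeyId=KEY_ID,
    Plaintext=b"customer artifact bytes",
    EncryptionContext={"customer-id": "customer-1234"},
)["CiphertextBlob"]

# Decryption succeeds only when the identical context is supplied; a caller
# that can use the key but supplies the wrong context is denied the plaintext.
plaintext = kms.decrypt(
    CiphertextBlob=ciphertext,
    EncryptionContext={"customer-id": "customer-1234"},
)["Plaintext"]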

CodeWhisperer gives customers the option to configure their own KMS keys for AWS to use when encrypting their data. Additionally, we restrict programmatic access (i.e. service usage) to Amazon S3 data via scoped-down IAM roles assigned to specific internal components. By doing this, AWS ensures that the KMS grants created for each key are strictly limited to the services that need access to the data for service operation.

When data needs to be persisted for short-term processing, we also encrypt it. CodeWhisperer leverages client-side encryption with service-owned keys for such ephemeral disks. Data is only stored on the disk while the process is executing, and any on-disk data is explicitly deleted before the process terminates, with alarms raised on any deletion failures. To ensure that there is no cross-over of customer data, each instance of the serverless compute is spun up for a specific operation on a specific resource. No two customer resources are processed by the same workflow or serverless function execution.

Compute Isolation

When creating or activating a customization, customer data is handled in a series of serverless environments. Most of this processing is facilitated through AWS Step Functions workflows – comprised of AWS Lambda, AWS Batch (on AWS Fargate), and nested Step Functions tasks. Each of these serverless tasks are instantiated for a given job in the system. In other words, the compute will not be shared, or reused, between two operations.

The general principle that can be observed here is the reuse of existing AWS services. By leveraging these various serverless options, we did not have to spend undifferentiated development effort on securing the compute usage. Instead, we inherited the security controls baked into these services and focused our energy on enabling the unique capabilities of customizing CodeWhisperer.

Confused Deputy Mitigations

When building a multi-tenant service, it is important to be mindful not only of how data is accessed in the expected cases, but also of how it might be accessed in accidental or malicious scenarios. This is where confused deputy mitigations come into the picture.

To prevent cross-customer data access during data ingestion, we have two mitigations in place:

  1. We explicitly check that the AWS credentials received in the request correspond to the account that owns the data reference (i.e. AWS CodeStar Connections ARN).
  2. We utilize a secure token, based on the administrator’s role, to gain permissions to download the data from the customer-provided reference.

Once the data is inside the CodeWhisperer service boundaries though, we are not done. Since CodeWhisperer is built on top of a microservice-based architecture, we also need to ensure that only the expected internal components are able to interact with their respective consumers and dependencies. To prevent unauthorized users from invoking these internal services that handle the customer data, we utilize account-based allowlists. Each internal service is restricted to a set of CodeWhisperer-owned service accounts that have a need to invoke the service’s APIs. No external actors are aware of these internal accounts.

As further protection for the data inside these services, we utilize customer-managed key encryption for all Amazon S3 data. When a customer does not explicitly provide their own key, we utilize a CodeWhisperer-owned KMS key for the same encryption.

KMS key usage requires a grant. These grants provide a given entity the ability to use the key to read, or write, data. To mitigate the risk of improper usage of these grants, we installed certain controls. To limit the number of entities with top-level grant permissions, all grants are managed by a single microservice. To restrict the usage of the grants to the expected CodeWhisperer workflows, the grants are created for the minimum lifecycle. They are immediately retired once the CodeWhisperer operation is complete.
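
As a generic illustration of this short-lived grant pattern (not CodeWhisperer's actual code), the following Python (boto3) sketch creates a grant constrained to a customer-scoped encryption context and retires it as soon as the work completes; all identifiers are placeholders.

import boto3

kms = boto3.client("kms")

# Hypothetical identifiers; the grantee principal is the internal role that
# needs temporary use of the key for a single workflow run.
key_id = "arn:aws:kms:us-east-1:111122223333:key/1234abcd-12ab-34cd-56ef-1234567890ab"
grantee = "arn:aws:iam::111122223333:role/internal-ingestion-role"

grant = kms.create_grant(
    KeyId=key_id,
    GranteePrincipal=grantee,
    Operations=["Encrypt", "Decrypt", "GenerateDataKey"],
    Constraints={"EncryptionContextSubset": {"customer-id": "customer-1234"}},
)

# ... the workflow reads or writes the encrypted data here ...

# Retire the grant as soon as the operation completes so the permission
# does not outlive the workflow that needed it.
kms.retire_grant(KeyId=key_id, GrantId=grant["GrantId"])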

Customization Usage

After an admin creates, activates, and grants access to a customization resource, a developer can select the customization within their IDE. Upon invocation, the CodeWhisperer IDE extension captures the user’s code context and sends it to the CodeWhisperer service. The request also includes their authentication token and a reference to their target customization resource. Given successful authentication and authorization, CodeWhisperer responds with the customized recommendation(s).

Data Isolation

There is no persistent data storage used during invocations of a customization. These invocations are stateless, meaning that any data passed within the request is not persisted beyond the life of the request itself. To mitigate any data risks within the lifetime of the request, we authenticate and authorize users via IAM Identity Center.

Since a customization is tied to proprietary company data and its recommendations can reproduce such data, it is crucial to maintain tight authorization around the resource access. CodeWhisperer authorizes individual users against the customization resource via Amazon Verified Permissions policies. These policies are configured by a customer admin in the AWS Console when they assign users and groups to a given customization. (Note: CodeWhisperer manages these Verified Permissions policies on behalf of our customers, which is why admins will not see the policies themselves listed in the console directly.) The service internally resolves the policy to the corresponding service-owned resources constituting the customization.

Compute Isolation

The primary compute for CodeWhisperer invocations is an instance hosting the generative model. Generative models run multi-tenanted on a physical host, i.e. each model runs on a dedicated compute resource within a host that has multiple such resources. By tying each request to a particular compute resource, inference calls cannot interact or communicate with any other ongoing inference.

All other runtime processing is executed in independent threads on Amazon Elastic Container Service (Amazon ECS) container instances with Fargate technology. No computation on user data spans across more than one of these threads within a given CodeWhisperer service.

Confused Deputy Mitigations

As we discussed for customization management, confused deputy mitigations are applied to reduce the risk of accidental and malicious access to customer data by unauthorized entities. To address this when a customization is used, we restrict customers, via Verified Permissions policies, to accessing only the internal resources tied to their selected customization. We further protect against confused deputy risks by configuring a session policy for each inference request. This session policy scopes down the permission to a specific resource name, which is internally managed and not exposed publicly.

Conclusion

In the age of generative AI, data is a chief differentiator for the efficacy of end applications. CodeWhisperer’s foundational model has been trained on a wide array of generic data. This enables CodeWhisperer to boost developer productivity from the baseline and utilize open-source packages that are commonly included throughout software development. To further improve developer productivity, customers can leverage CodeWhisperer’s customization capability to ingest their private data and securely provide tailored recommendations to their developers.

CodeWhisperer Customizations was built with security and customer trust at the forefront. We have the following security invariants baked in from day one:

  • All asynchronous customer data workloads are fully data isolated.
  • All customer data is KMS key encrypted at rest, and when possible, encrypted with a customer KMS key.
  • All customer data access is gated by authorization derived from authenticated contexts obtained from trusted authorities (IAM, Identity Center).
  • All customer data in customization management workflows is stored in cryptographically enforced isolation.

We hope you are as excited as us about this capability with generative AI! Give CodeWhisperer Customizations a try today: https://docs.aws.amazon.com/codewhisperer/latest/userguide/customizations.html

Deploy CloudFormation Hooks to an Organization with service-managed StackSets

Post Syndicated from Kirankumar Chandrashekar original https://aws.amazon.com/blogs/devops/deploy-cloudformation-hooks-to-an-organization-with-service-managed-stacksets/

This post demonstrates using AWS CloudFormation StackSets to deploy CloudFormation Hooks from a centralized delegated administrator account to all accounts within an Organizational Unit (OU). It provides step-by-step guidance to deploy controls at scale to your AWS Organization as Hooks using StackSets. By following this post, you will learn how to deploy a hook to hundreds of AWS accounts in minutes.

AWS CloudFormation StackSets help deploy CloudFormation stacks to multiple accounts and regions with a single operation. Using service-managed permissions, StackSets automatically generate the IAM roles required to deploy stack instances, eliminating the need for manual creation in each target account prior to deployment. StackSets provide auto-deploy capabilities to deploy stacks to new accounts as they’re added to an Organizational Unit (OU) in AWS Organization. With StackSets, you can deploy AWS well-architected multi-account solutions organization-wide in a single click and target stacks to selected accounts in OUs. You can also leverage StackSets to auto deploy foundational stacks like networking, policies, security, monitoring, disaster recovery, billing, and analytics to new accounts. This ensures consistent security and governance reflecting AWS best practices.

AWS CloudFormation Hooks allow customers to invoke custom logic to validate resource configurations before a CloudFormation stack create/update/delete operation. This helps enforce infrastructure-as-code policies by preventing non-compliant resources. Hooks enable policy-as-code to support consistency and compliance at scale. Without hooks, controlling CloudFormation stack operations centrally across accounts is more challenging, because governance checks and enforcement have to be implemented through disjointed workarounds across disparate services after the resources are deployed. Other options, like AWS Config rules, evaluate resource configurations on a timed basis rather than on stack operations, and SCPs manage account permissions but don’t include custom logic tailored to granular resource configurations. In contrast, CloudFormation Hooks allow customer-defined automation to validate each resource as new stacks are deployed or existing ones are updated. This enables stronger compliance guarantees and rapid feedback compared to asynchronous or indirect policy enforcement via other mechanisms.

Follow the later sections of this post that provide a step-by-step implementation for deploying hooks across accounts in an organization unit (OU) with a StackSet including:

  1. Configure service-managed permissions to automatically create IAM roles
  2. Create the StackSet in the delegated administrator account
  3. Target the OU to distribute hook stacks to member accounts

This shows how to easily enable a policy-as-code framework organization-wide.
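
The step-by-step walkthrough later in this post drives this through a CloudFormation template, but for orientation, the following Python (boto3) sketch shows the equivalent API calls for creating a service-managed StackSet from the delegated administrator account and targeting an OU. The stack set name, template URL, OU ID, capabilities, and Region are placeholder assumptions.

import boto3

cfn = boto3.client("cloudformation")

# Placeholder values: template URL, OU ID, and Region are illustrative
TEMPLATE_URL = "https://example-bucket.s3.us-west-2.amazonaws.com/hooks-template.yaml"
OU_ID = "ou-examp-12345678"

# Service-managed StackSet created from the delegated administrator account
cfn.create_stack_set(
    StackSetName="MyTestHookStackSet",
    TemplateURL=TEMPLATE_URL,
    Capabilities=["CAPABILITY_NAMED_IAM"],  # assumed, since the template creates IAM roles
    PermissionModel="SERVICE_MANAGED",
    AutoDeployment={"Enabled": True, "RetainStacksOnAccountRemoval": False},
    CallAs="DELEGATED_ADMIN",
)

# Deploy stack instances to every account in the target OU
cfn.create_stack_instances(
    StackSetName="MyTestHookStackSet",
    DeploymentTargets={"OrganizationalUnitIds": [OU_ID]},
    Regions=["us-west-2"],
    CallAs="DELEGATED_ADMIN",
)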

I will show you how to register a custom CloudFormation hook as a private extension, restricting permissions and usage to internal administrators and automation. Registering the hook as a private extension limits discoverability and access. Only approved accounts and roles within the organization can invoke the hook, following security best practices of least privilege.

StackSets Architecture

As depicted in the following AWS StackSets architecture diagram, a dedicated Delegated Administrator Account handles creation, configuration, and management of the StackSet that defines the template for standardized provisioning. In addition, the centrally managed StackSet deploys a private CloudFormation hook into all member accounts that belong to the given Organizational Unit. Registering this as a private CloudFormation hook enables administrative control over the deployment lifecycle events it can respond to. Private hooks prevent public usage, ensuring the hook can only be invoked by approved accounts, roles, or resources inside your organization.

Architecture for deploying CloudFormation Hooks to accounts in an Organization

Diagram 1: StackSets Delegated Administration and Member Account Diagram

In the above architecture, Member accounts join the StackSet through their inclusion in a central Organization Unit. By joining, these accounts receive deployed instances of the StackSet template which provisions resources consistently across accounts, including the controlled private hook for administrative visibility and control.

The delegation of StackSet administration responsibilities to the Delegated Admin Account follows security best practices. Rather than having the sensitive central Management Account handle deployment logistics, delegation isolates these controls to an admin account with purpose-built permissions. The Management Account representing the overall AWS Organization focuses more on high-level compliance governance and organizational oversight. The Delegated Admin Account translates broader guardrails and policies into specific infrastructure automation leveraging StackSets capabilities. This separation of duties ensures administrative privileges are restricted through delegation while also enabling an organization-wide StackSet solution deployment at scale.

Centralized StackSets facilitate account governance through code-based infrastructure management rather than manual account-by-account changes. In summary, the combination of account delegation roles, StackSet administration, and joining through Organization Units creates an architecture to allow governed, infrastructure-as-code deployments across any number of accounts in an AWS Organization.

Sample Hook Development and Deployment

In this section, we will develop a hook on a workstation using the AWS CloudFormation CLI, package it, and upload it to the hooks package S3 bucket. Then we will deploy a CloudFormation stack that in turn deploys the hook across member accounts within an Organizational Unit (OU) using StackSets.

The sample hook used in this blog post enforces that server-side encryption must be enabled for any S3 buckets and SQS queues created or updated on a CloudFormation stack. This policy requires that all S3 buckets and SQS queues be configured with server-side encryption when provisioned, ensuring security is built into our infrastructure by default. By enforcing encryption at the CloudFormation level, we prevent data from being stored unencrypted and minimize risk of exposure. Rather than manually enabling encryption post-resource creation, our developers simply enable it as a basic CloudFormation parameter. Adding this check directly into provisioning stacks leads to a stronger security posture across environments and applications. This example hook demonstrates functionality for mandating security best practices on infrastructure-as-code deployments.

Prerequisites

On the AWS Organization:

On the workstation where the hooks will be developed:

In the Delegated Administrator account:

Create a hooks package S3 bucket within the delegated administrator account. Upload the hooks package and CloudFormation templates that StackSets will deploy. Ensure the S3 bucket policy allows access from the AWS accounts within the OU. This access lets AWS CloudFormation access the hooks package objects and CloudFormation template objects in the S3 bucket from the member accounts during stack deployment.

Follow these steps to deploy a CloudFormation template that sets up the S3 bucket and permissions:

  1. Click here to download the admin-cfn-hook-deployment-s3-bucket.yaml template file in to your local workstation.
    Note: Make sure you model the S3 bucket and IAM policies as close to least privilege as possible. For the above S3 bucket policy, you can add a list of the IAM role ARNs created by the StackSets service-managed permissions instead of AWS: “*”, which allows S3 bucket access to all IAM entities from the accounts in the OU. The ARN of this role will be “arn:aws:iam:::role/stacksets-exec-” in every member account within the OU. For more information about applying least-privilege access to IAM policies and S3 bucket policies, refer to the IAM Policies and Bucket Policies and ACLs! Oh, My! (Controlling Access to S3 Resources) blog post.
  2. Execute the following command to deploy the template admin-cfn-hook-deployment-s3-bucket.yaml using AWS CLI. For more information see Creating a stack using the AWS Command Line Interface. If using AWS CloudFormation console, see Creating a stack on the AWS CloudFormation console.
    To get the OU Id, see Viewing the details of an OU. The OU Id starts with “ou-”. To get the Organization Id, see Viewing details about your organization. The Organization Id starts with “o-”.

    aws cloudformation create-stack \
    --stack-name hooks-asset-stack \
    --template-body file://admin-cfn-hook-deployment-s3-bucket.yaml \
    --parameters ParameterKey=OrgId,ParameterValue="<Org_id>" \
    ParameterKey=OUId,ParameterValue="<OU_id>"
  3. After deploying the stack, note down the AWS S3 bucket name from the CloudFormation Outputs.

Hook Development

In this section, you will develop a sample CloudFormation hook package that enforces encryption for S3 buckets and SQS queues within the preCreate and preDelete hooks. Follow the steps in the walkthrough to develop a sample hook and generate a zip package for deploying and enabling it in all the accounts within an OU. While following the walkthrough, within the Registering hooks section, make sure that you stop right after executing the cfn submit --dry-run command. The --dry-run option ensures that your hook is built and packaged without registering it with CloudFormation in your account. If you created a new directory named mycompany-testing-mytesthook when initiating the hook project, the hook package will be generated as a zip file named mycompany-testing-mytesthook.zip at the root of your hooks project.

Upload mycompany-testing-mytesthook.zip file to the hooks package S3 bucket within the Delegated Administrator account. The packaged zip file can then be distributed to enable the encryption hooks across all accounts in the target OU.

Note: Even if you are using your own hooks project rather than following the tutorial, make sure that you execute the cfn submit command with the --dry-run option. This ensures you have a hooks package that can be distributed and reused across multiple accounts.

Hook Deployment using CloudFormation Stack Sets

In this section, deploy the sample hook developed previously across all accounts within an OU. Use a centralized CloudFormation stack deployed from the delegated administrator account via StackSets.

Deploying hooks via CloudFormation requires these key resources:

  1. AWS::CloudFormation::HookVersion: Publishes a new hook version to the CloudFormation registry
  2. AWS::CloudFormation::HookDefaultVersion: Specifies the default hook version for the AWS account and region
  3. AWS::CloudFormation::HookTypeConfig: Defines the hook configuration
  4. AWS::IAM::Role #1: Task execution role that grants the hook permissions
  5. AWS::IAM::Role #2: (Optional) role for CloudWatch logging that CloudFormation will assume to send log entries during hook execution
  6. AWS::Logs::LogGroup: (Optional) Enables CloudWatch error logging for hook executions

Follow these steps to deploy CloudFormation Hooks to accounts within the OU using StackSets:

  1. Click here to download the hooks-template.yaml template file into your local workstation and upload it into the Hooks package S3 bucket in the Delegated Administrator account.
  2. Deploy the hooks CloudFormation template hooks-template.yaml to all accounts within an OU using StackSets. Leverage service-managed permissions for automatic IAM role creation across the OU.
    To deploy the hooks template hooks-template.yaml across the OU using StackSets, click here to download the CloudFormation StackSets template hooks-stack-sets-template.yaml locally, and upload it to the hooks package S3 bucket in the delegated administrator account. This StackSets template contains an AWS::CloudFormation::StackSet resource that will deploy the necessary hooks resources from hooks-template.yaml to all accounts in the target OU. Using the SERVICE_MANAGED permissions model automatically handles provisioning the required IAM execution roles in each account within the OU.
  3. Execute the following command to deploy the template hooks-stack-sets-template.yaml using the AWS CLI. For more information, see Creating a stack using the AWS Command Line Interface. If using the AWS CloudFormation console, see Creating a stack on the AWS CloudFormation console. To get the S3 HTTPS URL for the hooks template, hooks package, and StackSets template, log in to the Amazon S3 service on the AWS console, select the respective object, and click the Copy URL button as shown in the following screenshot:
    Diagram 2: S3 Https URL

    To get the OU Id, see Viewing the details of an OU. OU Id starts with “ou-“.
    Make sure to replace the <S3BucketName> and then <OU_Id> accordingly in the following command:

    aws cloudformation create-stack --stack-name hooks-stack-set-stack \
    --template-url https://<S3BucketName>.s3.us-west-2.amazonaws.com/hooks-stack-sets-template.yaml \
    --parameters ParameterKey=OuId,ParameterValue="<OU_Id>" \
    ParameterKey=HookTypeName,ParameterValue="MyCompany::Testing::MyTestHook" \
    ParameterKey=s3TemplateURL,ParameterValue="https://<S3BucketName>.s3.us-west-2.amazonaws.com/hooks-template.yaml" \
    ParameterKey=SchemaHandlerPackageS3URL,ParameterValue="https://<S3BucketName>.s3.us-west-2.amazonaws.com/mycompany-testing-mytesthook.zip"
  4. Check the progress of the stack deployment using the aws cloudformation describe-stacks command. Move to the next section when the stack status is CREATE_COMPLETE.
    aws cloudformation describe-stacks --stack-name hooks-stack-set-stack
  5. If you navigate to the AWS CloudFormation Service’s StackSets section in the console, you can view the stack instances deployed to the accounts within the OU. Alternatively, you can execute the AWS CloudFormation list-stack-instances CLI command below to list the deployed stack instances:
    aws cloudformation list-stack-instances --stack-set-name MyTestHookStackSet
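
If you script these steps, the AWS CLI also provides a waiter that blocks until the stack from step 3 finishes creating, which is one way to implement the optional wait mentioned in step 4:

    # Returns when the stack reaches CREATE_COMPLETE (non-zero exit code if creation fails)
    aws cloudformation wait stack-create-complete --stack-name hooks-stack-set-stack

    # Print only the current stack status
    aws cloudformation describe-stacks --stack-name hooks-stack-set-stack \
      --query "Stacks[0].StackStatus" --output text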

Testing the deployed hook

Deploy the following sample templates into any AWS account within the OU where the hook was deployed and activated. Follow the steps in Creating a stack on the AWS CloudFormation console. If you are using the AWS CLI, follow the steps in Creating a stack using the AWS Command Line Interface.

  1. Provision a non-compliant stack without server-side encryption using the following template:
    AWSTemplateFormatVersion: 2010-09-09
    Description: |
      This CloudFormation template provisions an S3 Bucket
    Resources:
      S3Bucket:
        Type: 'AWS::S3::Bucket'
        Properties: {}

    The stack deployment will fail with the following error message:

    The following hook(s) failed: [MyCompany::Testing::MyTestHook]

    The hook status reason is shown in the following screenshot:

    Diagram 3: S3 Bucket creation failure with hooks execution

  2. Provision a stack using the following template that has server-side encryption for the S3 Bucket.
    AWSTemplateFormatVersion: 2010-09-09
    Description: |
      This CloudFormation template provisions an encrypted S3 Bucket. **WARNING** This template creates an Amazon S3 bucket and a KMS key that you will be charged for. You will be billed for the AWS resources used if you create a stack from this template.
    Resources:
      EncryptedS3Bucket:
        Type: "AWS::S3::Bucket"
        Properties:
          BucketName: !Sub "encryptedbucket-${AWS::Region}-${AWS::AccountId}"
          BucketEncryption:
            ServerSideEncryptionConfiguration:
              - ServerSideEncryptionByDefault:
                  SSEAlgorithm: "aws:kms"
                  KMSMasterKeyID: !Ref EncryptionKey
                BucketKeyEnabled: true
      EncryptionKey:
        Type: "AWS::KMS::Key"
        DeletionPolicy: Retain
        UpdateReplacePolicy: Retain
        Properties:
          Description: KMS key used to encrypt the resource type artifacts
          EnableKeyRotation: true
          KeyPolicy:
            Version: 2012-10-17
            Statement:
              - Sid: Enable full access for owning account
                Effect: Allow
                Principal:
                  AWS: !Ref "AWS::AccountId"
                Action: "kms:*"
                Resource: "*"
    Outputs:
      EncryptedBucketName:
        Value: !Ref EncryptedS3Bucket

    The deployment will succeed because it passes the hook validation, with the hook status reason shown in the following screenshot:

    Diagram 4: S3 Bucket creation success with hooks execution

Updating the hooks package

To update the hooks package, follow the same steps described in the Hooks Development section to change the hook code as needed. Then, execute the cfn submit --dry-run command to build and generate the hooks package file without registering the type with the CloudFormation registry. Make sure to rename the zip file with a name that is unique from the one previously used; otherwise, when you update the CloudFormation StackSets stack, CloudFormation will not detect any changes in the template and will not deploy the updates. The best practice is to manage the hook package through a CI/CD pipeline and to assign a unique version number to each hooks package so that stacks with the new changes get deployed.
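
As a sketch of that versioning practice, the following commands show one way an update might look. The versioned file name is hypothetical; the stack name and parameter keys are the ones used earlier in this post, and unchanged parameters are reused from the original deployment.

    # Rebuild the hook package locally without registering it
    cfn submit --dry-run

    # Give the package a unique, versioned name and upload it to the hooks bucket
    mv mycompany-testing-mytesthook.zip mycompany-testing-mytesthook-v2.zip
    aws s3 cp mycompany-testing-mytesthook-v2.zip s3://<S3BucketName>/

    # Point the StackSets stack at the new package; other parameters keep their values
    aws cloudformation update-stack --stack-name hooks-stack-set-stack \
      --use-previous-template \
      --parameters ParameterKey=OuId,UsePreviousValue=true \
      ParameterKey=HookTypeName,UsePreviousValue=true \
      ParameterKey=s3TemplateURL,UsePreviousValue=true \
      ParameterKey=SchemaHandlerPackageS3URL,ParameterValue="https://<S3BucketName>.s3.us-west-2.amazonaws.com/mycompany-testing-mytesthook-v2.zip"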

Cleanup

Navigate to the AWS CloudFormation console in the delegated administrator account, note the hooks package S3 bucket name, and empty the bucket's contents. Refer to Emptying a bucket for more information.

Delete the CloudFormation stacks in the following order:

  1. Test stack that failed
  2. Test stack that passed
  3. The StackSets CloudFormation stack. This stack has a DeletionPolicy of Retain, so either update the stack to remove the DeletionPolicy and then initiate a stack deletion, or delete the StackSet instances and the StackSet from the console or CLI by following Delete stack instances from your stack set and then Delete a stack set.
  4. Hooks asset CloudFormation stack

Refer to the following documentation to delete CloudFormation Stacks: Deleting a stack on the AWS CloudFormation console or Deleting a stack using AWS CLI.
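
If you prefer to script the cleanup, the following hedged sketch shows the general flow. The test stack names are placeholders for whatever names you chose when deploying the sample templates.

    # Empty the hooks package bucket before deleting the stack that created it
    aws s3 rm s3://<S3BucketName> --recursive

    # Delete the test stacks, then the StackSets stack, then the hooks asset stack
    aws cloudformation delete-stack --stack-name <failed-test-stack-name>
    aws cloudformation delete-stack --stack-name <passed-test-stack-name>
    aws cloudformation delete-stack --stack-name hooks-stack-set-stack   # after removing the Retain DeletionPolicy
    aws cloudformation delete-stack --stack-name <hooks-asset-stack-name>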

Conclusion

Throughout this blog post, you have explored how AWS StackSets enable the scalable and centralized deployment of CloudFormation Hooks across all accounts within an organizational unit (OU). By implementing hooks as reusable code templates, StackSets provide consistency benefits and reduce the administrative effort associated with fragmented, manual installs. As organizations aim to strengthen governance, compliance, and security through hooks, StackSets offer a turnkey mechanism to efficiently reach hundreds of accounts.

By using the described architecture of delegated StackSets administration and member account enrollment, organizations can implement a single hook across hundreds of accounts rather than manually enabling hooks per account. Centralizing your hook code base within StackSets templates facilitates uniform adoption while also simplifying maintenance: administrators can update hooks in one location instead of attempting fragmented, account-by-account changes, and by enclosing new hooks within reusable StackSets templates, they benefit from infrastructure-as-code descriptiveness and version control instead of one-off scripts.

Once configured, StackSets provide automated hook propagation without additional overhead. The delegated administrator merely needs to include target accounts through their OU membership rather than handling individual permissions, and new accounts added to the OU automatically receive hook deployments through the StackSets orchestration engine.

About the Author

Kirankumar Chandrashekar is a Sr. Solutions Architect for Strategic Accounts at AWS. He focuses on guiding customers in architecting DevOps and modernization using serverless, containers, and container orchestration technologies such as Docker, Amazon ECS, and Amazon EKS. Kirankumar is passionate about DevOps, infrastructure as code, modernization, and solving complex customer issues. He enjoys music, cooking, and traveling.

OT/IT convergence security maturity model

Post Syndicated from James Hobbs original https://aws.amazon.com/blogs/security/ot-it-convergence-security-maturity-model/

For decades, we’ve watched energy companies attempt to bring off-the-shelf information technology (IT) systems into operations technology (OT) environments. These attempts have had varying degrees of success. While converging OT and IT brings new efficiencies, it also brings new risks. There are many moving parts to convergence, and there are several questions that you must answer, such as, “Are systems, processes, and organizations at the same point in their convergence journey?” and “Are risks still being managed well?”

To help you answer these questions, this post provides an aid in the form of a maturity model focused on the security of OT/IT convergence.

OT environments consist of industrial systems that measure, automate, and control physical machines. Because of this, OT risk management must consider potential risks to the environment, health, and safety. Adding common IT components can add to these risks. For example, OT networks were typically highly segmented to reduce exposure to external untrusted networks, while IT has a seemingly ever-growing network surface. Because of this growing surface, IT networks have developed built-in resiliency against cyber threats, though they weren't originally designed for the operational requirements found in OT. However, you can use the strengths of Amazon Web Services (AWS) to help meet regulatory requirements and manage risks in both OT and IT.

The convergence of OT and IT has begun at most companies and includes the merging of systems, organizations, and policies. These components are often at different points along their journey, making it necessary to identify where each one is in the process and where additional attention is needed. Another purpose of the OT/IT convergence security maturity model is to help identify those maturity points.

Patterns in this model often reference specific aspects of how much involvement OT teams have in overall IT and cloud strategies. It's important to understand that OT is no longer an air-gapped system hidden away from cyber risks, so it now shares many of the same risks as IT. This understanding improves your preparedness for a safe and secure industrial digital transformation and helps you use AWS to accelerate your convergence journey.

Getting started with secure OT/IT convergence

The first step in a secure OT/IT convergence is to ask questions. The answers to these questions lead to establishing maturity patterns. For example, the answer might indicate a quick win for convergence, or it might demonstrate a more optimized maturity level. In this section, we review the questions you should ask about your organization:

  1. When was the last time your organization conducted an OT/IT cybersecurity risk assessment using a common framework (such as ISA/IEC 62443) and used it to inform system design?

    When taking advantage of IT technologies in OT environments, it’s important to conduct a cybersecurity risk assessment to fully understand and proactively manage risks. For risk assessments, companies with maturing OT/IT convergence display common patterns. Some patterns to be aware of are:

    • The frequency of risk assessments is driven by risk measures and data
    • IT technologies are successfully being adopted into OT environments
    • Specific cybersecurity risk assessments are conducted in OT
    • Risk assessments are conducted at the start of Industrial Internet of Things (IIoT) projects
    • Risk assessments inform system designs
    • Proactively managing risks, gaps, and vulnerabilities between OT and IT
    • Up-to-date threat modeling capabilities for both OT and IT

  2. What is the extent and maturity of IIoT enabled digital transformation in your organization?

    There are several good indicators of the maturity of IIoT-enabled digital transformation in an organization, such as the number of IIoT implementations with well-defined security controls and the number of IIoT digital use cases developed and realized. Additionally, some maturing IIoT convergence practices are:

    • Simplification and standardization of IIoT security controls
    • Scaling digital use cases across the shop floor
    • IIoT being consumed collaboratively or within organizational silos
    • Integrated IIoT enterprise applications
    • Identifying connections to external networks and how they are routed
    • IoT use cases identified and implemented across multiple industrial sites

  3. Does your organization maintain an inventory of connected assets and use it to manage risk effectively?

    A critical aspect of a good security program is having visibility into your entire OT and IIoT system and knowing which systems don’t support open networks and modern security controls. Since you can’t protect what you can’t see, your organization must have comprehensive asset visibility. Highly capable asset management processes typically demonstrate the following considerations:

    • Visibility across your entire OT and IIoT system
    • Identifies systems not supporting open networks and modern security controls
    • Vulnerabilities and threats readily map to assets and asset owners
    • Asset visibility is used to improve cybersecurity posture
    • An up-to-date and clear understanding of the OT/IIoT network architecture
    • Defined locations for OT data including asset and configuration data
    • Automated asset inventory with modern discovery processes in OT
    • Asset inventory collections that are non-disruptive and do not introduce new vulnerabilities to OT

  4. Does your organization have an incident response plan for converged OT and IT environments?

    Incident response planning is essential for critical infrastructure organizations to minimize the impacts of cyber events. Some considerations are:

    • An incident response plan that aims to minimize the effects of a cyber event
    • The effect of incidents on an organization’s operations, reputation, and assets
    • Developed and tested incident response runbooks
    • A plan identifying potential risks and vulnerabilities
    • A plan prioritizing and allocating response personnel
    • Established clear roles and responsibilities
    • Documented communication procedures, backup, and recovery
    • Defined incident escalation procedures
    • Frequency of response plan testing and cyber drills
    • Incident response collaboration between OT and IT authorities
    • Relying on individuals versus team processes for incident response
    • Measuring incident response OT/IT coordination hesitation during drills
    • An authoritative decision maker across OT and IT for seamless incident response leadership

  5. With reference to corporate governance, are OT and IT using separate policies and controls to manage cybersecurity risks or are they using the same policy?

    The ongoing maturity and adoption of cloud within IT and now within OT creates a more common environment. A comprehensive enterprise and OT security policy will encompass risks across the entirety of the business. This allows for OT risks such as safety to be recognized and addressed within IT. Conversely, this allows for IT risks such as bots and ransomware to be addressed within OT. While policies might converge, mitigation strategies will still differ in many cases. Some considerations are:

    • OT and IT maintaining separate risk policies.
    • Assuming air-gapped OT systems.
    • The degree of isolation for process control and safety networks.
    • Interconnectedness of OT and IT systems and networks.
    • Security risks that were applicable to either IT or OT might now apply to both.
    • OT comprehension of risks related to lateral movement.
    • Singular security control policy that governs both OT and IT.
    • Different mitigation strategies as appropriate for OT and for IT. For example, the speed of patching is often different between OT and IT by design.
    • Different risk measures maintained between OT and IT.
    • A common view of risk to the business.
    • The use of holistic approaches to manage OT and IT risk.

  6. Is there a central cloud center of excellence (CCoE) with equivalent representation from OT and IT?

    Consolidating resources into centers of excellence has proven to be an effective way to bring focus to new or transforming enterprises. Many companies have created CCoEs around security within the past two decades to consolidate experts from around the company. Such focused areas provide a central point of technical authority and accelerate decision making. Some considerations are:

    • Consolidating resources into centers of excellence.
    • Security experts consolidated from around the company into a singular organization.
    • Defining security focus areas based on risk priorities.
    • Having a central point of security authority.
    • OT and IT teams operating uniformly.
    • Well understood and applied incident response decision rights in OT.

  7. Is there a clear definition of the business value of converging OT and IT?

    Security projects face extra scrutiny from multiple parties ranging from shareholders to regulators. Because of this, each project must be tied to business and operational outcomes. The value of securing converged OT and IT technologies is realized by maintaining and improving operations and resilience. Some considerations are:

    • Security projects are tied to appropriate outcomes
    • The same measures are used to track security program benefits across OT and IT.
    • OT and IT security budgets merged.
    • The CISO has visibility to OT security risk data.
    • OT personnel are invited to cloud strategy meetings.
    • OT and IT security reporting is through a singular leader such as a CISO.
    • Engagement of OT personnel in IT security meetings.

  8. Does your organization have security monitoring across the full threat surface?

    With the increasing convergence of OT and IT, the digital threat surface has expanded. Organizations must deploy security audit and monitoring mechanisms across OT, IIoT, edge, and cloud environments, and collect security logs for analysis using security information and event management (SIEM) tools within a security operations center (SOC). Without full visibility of traffic entering and exiting OT networks, a quickly spreading event between OT and IT might go undetected. Some considerations are:

    • Awareness of the expanding digital attack surface.
    • Security audit and monitoring mechanisms across OT, IIoT, edge, and cloud environments.
    • Security logs collected for analysis using SIEM tools within a SOC.
    • Full visibility and control of traffic entering and exiting OT networks.
    • Malicious threat actor capabilities for destructive consequences to physical cyber systems.
    • The downstream impacts resulting in OT networks being shut down due to safety concerns.
    • The ability to safely operate and monitor OT networks during a security event.
    • Benefits of a unified SOC.
    • Coordinated threat detection and immediate sharing of indicators enabled.
    • Access to teams that can map potential attack paths and origins.

  9. Does your IT team fully comprehend the differences in priority between OT and IT with regard to availability, integrity, and confidentiality?

    In OT, downtime equals lost revenue. While this is true in IT as well, the impact is less direct and can often be mitigated with a variety of redundancy strategies. The OT priority order for data and systems is availability, integrity, then confidentiality, with an additional focus on safety and reliability. To develop a holistic picture of corporate security risks, you must understand that systems in OT have been, and will continue to be, built with availability as the key component. Some considerations are:

    • Availability is vital in OT. Systems must run in order to produce and manufacture product. Downtime equals lost revenue.
    • IT redundancy strategies might not directly translate to OT.
    • OT owners previously relied on air-gapped systems or layers of defenses to achieve confidentiality.
    • Must have a holistic picture of all corporate security risks.
    • Security defenses are often designed to wrap around OT zones while restricting the conduits between them.
    • OT and IT risks are managed collectively.
    • Implement common security controls between OT and IT.

  10. Are your OT support teams engaged with cloud strategy?

    Given the historical nature of the separation of IT and OT, organizations might still operate in silos. An indication of converging maturity is how well those teams are working across divisions or have even removed silos altogether. OT systems are part of larger safety and risk management programs within industrial systems from which many IT systems have typically remained separated. As the National Institute of Standards and Technology states, “To properly address security in an industrial control system (ICS), it is essential for a cross-functional cybersecurity team to share their varied domain knowledge and experience to evaluate and mitigate risk to the ICS.” [NIST 800-82r2, Pg. 3]. Some considerations are:

    • OT experts should be directly involved in security and cloud strategy.
    • OT systems are part of larger safety and risk management programs.
    • Make sure that communications between OT and IT aren’t limited or strained.
    • OT and IT personnel should interact regularly.
    • OT personnel should not only be informed of cloud strategies, but should be active participants.

  11. How much of your cloud security across OT and IT is managed manually and how much is automated?

    Security automation means addressing threats automatically by providing predefined response and remediation actions based on compliance standards or best practices. Automation can resolve common security findings to improve your posture within AWS. It also allows you to quickly respond to threat events. Some considerations are:

    • Cyber responses are predefined and are real-time.
    • Playbooks exist and include OT scenarios.
    • Automated remediations are routine practice.
    • Foundational security is automated.
    • Audit trails are enabled with notifications for automated actions.
    • OT events are aggregated, prioritized, and consumed into orchestration tools.
    • Cloud security postures for OT and IT are understood and documented.

  12. To what degree are your OT and IT networks segmented?

    Network segmentation has been well established as a foundational security practice. NIST SP800-82r3 pg.72 states, “Implementing network segmentation utilizing levels, tiers, or zones allows organizations to control access to sensitive information and components while also considering operational performance and safety.”

    Minimizing network access to OT systems reduces the available threat surface. Typically, firewalls are used as control points between different segments. Several models exist showing separation of OT and IT networks and maintaining boundary zones between the two. As stated in section 5.2.3.1 of NIST SP800-82r3, "A good practice for network architectures is to characterize, segment, and isolate IT and OT devices."

    AWS provides multiple ways to segment and firewall network boundaries depending upon requirements and customer needs. Some considerations are:

    • Existence of a perimeter network between OT and IT.
    • Level of audit and inspection of perimeter network traffic.
    • Amount of direct connectivity between OT and IT.
    • Segmentation is regularly tested for threat surface vulnerabilities.
    • Use of cloud-native tools to manage networks.
    • Identification and use of high-risk ports.
    • OT and IT personnel are collaborative network boundary decision makers.
    • Network boundary changes include OT risk management methodologies.
    • Defense in depth measures are evident.
    • Network flow log data analysis.

The following list describes the typical patterns seen at each maturity level, from Phase 1 (Quick wins) through Phase 4 (Optimized).

  1. When was the last time your organization conducted an OT/IT cybersecurity risk assessment using a common framework (such as ISA/IEC 62443) and used it to inform system design?
    • Phase 1 (Quick wins): A basic risk assessment performed to identify risks, gaps, and vulnerabilities
    • Phase 2 (Foundational): Organization has manual threat modeling capabilities and maintains an up-to-date threat model
    • Phase 3 (Efficient): Organization has automated threat modeling capabilities using the latest tools
    • Phase 4 (Optimized): Organization maintains threat modeling automation as code and an agile ability to use the latest tools
  2. What is the extent and maturity of IIoT enabled digital transformation in your organization?
    • Phase 1 (Quick wins): Organization is actively introducing IIoT on proof-of-value projects
    • Phase 2 (Foundational): Organization is moving from proof-of-value projects to production pilots
    • Phase 3 (Efficient): Organization is actively identifying and prioritizing business opportunities and use cases and using the lessons learned from pilot sites to reduce the time-to-value at other sites
    • Phase 4 (Optimized): Organization is scaling the use of IIoT across multiple use cases, sites, and assets and can rapidly iterate new IIoT to meet changing business needs
  3. Does your organization maintain an inventory of connected assets and use it to manage risk effectively?
    • Phase 1 (Quick wins): Manual tracking of connected assets with no automated tools for new asset discovery
    • Phase 2 (Foundational): Introduction of asset discovery tools to discover and create an inventory of all connected assets
    • Phase 3 (Efficient): Automated tools for asset discovery, inventory management, and frequent reporting
    • Phase 4 (Optimized): Near real-time asset discovery and consolidated inventory in a configuration management database (CMDB)
  4. Does your organization have an incident response plan for converged OT and IT environments?
    • Phase 1 (Quick wins): Organization has separate incident response plans for OT and IT environments
    • Phase 2 (Foundational): Organization has an ICS-specific incident response plan to account for the complexities and operational necessities of responding in operational environments
    • Phase 3 (Efficient): Cyber operators are trained to ensure process safety and system reliability when responding to security events in converged OT/IT environments
    • Phase 4 (Optimized): Organization has incident response plans and playbooks for converged OT/IT environments
  5. With reference to corporate governance, are OT and IT using separate policies and controls to manage cybersecurity risks or are they using the same policy?
    • Phase 1 (Quick wins): Organization has separate risk policies for OT and IT environments
    • Phase 2 (Foundational): Organization has some combined policies across OT and IT but might not account for all OT risks
    • Phase 3 (Efficient): Organization accounts for OT risks such as health and safety in a central cyber risk register
    • Phase 4 (Optimized): Organization has a codified central risk management policy accounting for both OT and IT risks
  6. Is there a central cloud center of excellence (CCoE) with equivalent representation from OT and IT?
    • Phase 1 (Quick wins): Cloud CoE exists with as-needed engagement from OT on special projects
    • Phase 2 (Foundational): Some representation from OT in the cloud CoE
    • Phase 3 (Efficient): Increasing representation from OT in the cloud CoE
    • Phase 4 (Optimized): Cloud CoE exists with good representation from OT and IT
  7. Is there a clear definition of the business value of converging OT and IT?
    • Phase 1 (Quick wins): No clear definition of business value from convergence projects
    • Phase 2 (Foundational): Organization working towards defining key performance indicators (KPIs) for convergence projects
    • Phase 3 (Efficient): Organization has identified KPIs and created a baseline of their current as-is state
    • Phase 4 (Optimized): Organization is actively measuring KPIs on convergence projects
  8. Does your organization have security monitoring across the full threat surface?
    • Phase 1 (Quick wins): Stand-alone monitoring systems in OT and IT with no integration between systems
    • Phase 2 (Foundational): Limited integration between OT and IT monitoring systems, possibly with separate SOCs
    • Phase 3 (Efficient): Increasing integration between OT and IT systems with some holistic SOC activities
    • Phase 4 (Optimized): Convergence of OT and IT security monitoring in a unified and global SOC
  9. Does your IT team fully comprehend the differences in priority between OT and IT with regard to availability, integrity, and confidentiality?
    • Phase 1 (Quick wins): IT teams lack an understanding of the priorities in OT as they relate to safety and availability
    • Phase 2 (Foundational): IT teams have a high-level understanding of the differences between OT and IT risks, and OT teams understand cyber threat models
    • Phase 3 (Efficient): IT teams are being trained on the priorities and differences of OT systems and include them in cyber risk measures
    • Phase 4 (Optimized): IT fully understands the differences and increased risk from OT/IT convergence, and members work on cross-functional teams as interchangeable experts
  10. Are your OT support teams engaged with cloud strategy?
    • Phase 1 (Quick wins): Separate OT and IT teams with limited collaboration on projects
    • Phase 2 (Foundational): Limited OT team engagement in cloud strategy
    • Phase 3 (Efficient): Increasing OT team engagement in cloud security
    • Phase 4 (Optimized): OT teams actively engaged with cloud strategy
  11. How much of your cloud security across OT and IT is managed manually and how much is automated?
    • Phase 1 (Quick wins): Processes are manual and use non-enterprise-grade tools
    • Phase 2 (Foundational): Automation exists in pockets within OT
    • Phase 3 (Efficient): Security decisions are increasingly automated and iterated upon with guardrails in place
    • Phase 4 (Optimized): Manual steps are minimized and security decisions are automated as code
  12. To what degree are your OT and IT networks segmented?
    • Phase 1 (Quick wins): Limited segmentation between OT and IT networks
    • Phase 2 (Foundational): Introduction of an industrial perimeter network between OT and IT networks
    • Phase 3 (Efficient): Industrial perimeter network exists with some OT network segmentation
    • Phase 4 (Optimized): Industrial perimeter network between OT and IT networks with micro-network segmentation within OT and IT networks

Conclusion

In this post, you learned how you can use this OT/IT convergence security maturity model to help identify areas for improvement. The 12 questions and associated patterns are examples that you can build upon. This model isn't the end, but a guide for getting started. Successful implementation of OT/IT convergence for industrial digital transformation requires ongoing strategic security management because it's not just about technology integration; the risks of cyber events that OT/IT convergence exposes must be addressed. Organizations fall into various levels of maturity: quick wins, foundational, efficient, and optimized. AWS tools, guidance, and professional services can help accelerate your journey to both technical and organizational maturity.

If you have feedback about this post, submit comments in the Comments section below. If you have questions about this post, contact AWS Support.

Want more AWS Security news? Follow us on Twitter.

James Hobbs

James is a Security Principal for AWS Energy & Utilities Professional Services. He has over 30 years of IT experience, including 20 years leading cybersecurity and incident response teams for the world’s largest Energy companies. He spent years focused on OT security and controls. He guides internal security practices and advises Energy customers on OT and IT cybersecurity practices.

Ryan Dsouza

Ryan is a Principal IoT Security Solutions Architect at AWS. Based in NYC, Ryan helps customers design, develop, and operate more secure, scalable, and innovative IIoT solutions using the breadth and depth of AWS capabilities to deliver measurable business outcomes. He is passionate about bringing security to all connected devices and being a champion of building a better, safer, and more resilient world for everyone.

How to customize access tokens in Amazon Cognito user pools

Post Syndicated from Edward Sun original https://aws.amazon.com/blogs/security/how-to-customize-access-tokens-in-amazon-cognito-user-pools/

With Amazon Cognito, you can implement customer identity and access management (CIAM) into your web and mobile applications. You can add user authentication and access control to your applications in minutes.

In this post, I introduce you to the new access token customization feature for Amazon Cognito user pools and show you how to use it. Access token customization is included in the advanced security features (ASF) of Amazon Cognito. Note that ASF is subject to additional pricing as described on the Amazon Cognito pricing page.

What is access token customization?

When a user signs in to your app, Amazon Cognito verifies their sign-in information, and if the user is authenticated successfully, returns the ID, access, and refresh tokens. The access token, which uses the JSON Web Token (JWT) format following the RFC7519 standard, contains claims in the token payload that identify the principal being authenticated, and session attributes such as authentication time and token expiration time. More importantly, the access token also contains authorization attributes in the form of user group memberships and OAuth scopes. Your applications or API resource servers can evaluate the token claims to authorize specific actions on behalf of users.

With access token customization, you can add application-specific claims to the standard access token and then make fine-grained authorization decisions to provide a differentiated end-user experience. You can refine the original scope claims to further restrict access to your resources and enforce the least privileged access. You can also enrich access tokens with claims from other sources, such as user subscription information stored in an Amazon DynamoDB table. Your application can use this enriched claim to determine the level of access and content available to the user. This reduces the need to build a custom solution to look up attributes in your application’s code, thereby reducing application complexity, improving performance, and smoothing the integration experience with downstream applications.

How do I use the access token customization feature?

Amazon Cognito works with AWS Lambda functions to modify your user pool’s authentication behavior and end-user experience. In this section, you’ll learn how to configure a pre token generation Lambda trigger function and invoke it during the Amazon Cognito authentication process. I’ll also show you an example function to help you write your own Lambda function.

Lambda trigger flow

During a user authentication, you can choose to have Amazon Cognito invoke a pre token generation trigger to enrich and customize your tokens.

Figure 1: Pre token generation trigger flow

Figure 1 illustrates the pre token generation trigger flow. This flow has the following steps:

  1. An end user signs in to your app and authenticates with an Amazon Cognito user pool.
  2. After the user completes the authentication, Amazon Cognito invokes the pre token generation Lambda trigger, and sends event data to your Lambda function, such as userAttributes and scopes, in a pre token generation trigger event.
  3. Your Lambda function code processes token enrichment logic, and returns a response event to Amazon Cognito to indicate the claims that you want to add or suppress.
  4. Amazon Cognito vends a customized JWT to your application.

The pre token generation trigger flow supports OAuth 2.0 grant types, such as the authorization code grant flow and implicit grant flow, and also supports user authentication through the AWS SDK.

Enable access token customization

Your Amazon Cognito user pool delivers two different versions of the pre token generation trigger event to your Lambda function. Trigger event version 1 includes userAttributes, groupConfiguration, and clientMetadata in the event request, which you can use to customize ID token claims. Trigger event version 2 adds scope in the event request, which you can use to customize scopes in the access token in addition to customizing other claims.

In this section, I’ll show you how to update your user pool to trigger event version 2 and enable access token customization.

To enable access token customization

  1. Open the Amazon Cognito console, and then choose User pools.
  2. Choose the target user pool for token customization.
  3. On the User pool properties tab, in the Lambda triggers section, choose Add Lambda trigger.

    Figure 2: Add Lambda trigger

  4. In the Lambda triggers section, do the following:
    1. For Trigger type, select Authentication.
    2. For Authentication, select Pre token generation trigger.
    3. For Trigger event version, select Basic features + access token customization – Recommended. If this option isn't available, make sure that you have enabled advanced security features, which are required for access token customization.

    Figure 3: Select Lambda trigger

  5. Select your Lambda function and assign it as the pre token generation trigger. Then choose Add Lambda trigger.

    Figure 4: Add Lambda trigger
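
If you prefer to automate this configuration, the following AWS CLI sketch shows roughly equivalent steps. The ARNs and IDs are placeholders, and because update-user-pool overwrites settings that you don't specify, treat this as an illustration rather than a drop-in replacement for the console flow.

# Allow the user pool to invoke your pre token generation function
aws lambda add-permission \
  --function-name <PreTokenGenerationFunctionName> \
  --statement-id CognitoPreTokenGeneration \
  --action lambda:InvokeFunction \
  --principal cognito-idp.amazonaws.com \
  --source-arn arn:aws:cognito-idp:us-east-1:<AccountId>:userpool/<UserPoolId>

# Attach the trigger and request the version 2 event for access token customization
aws cognito-idp update-user-pool \
  --user-pool-id <UserPoolId> \
  --lambda-config 'PreTokenGenerationConfig={LambdaArn=<PreTokenGenerationFunctionArn>,LambdaVersion=V2_0}'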

Example pre token generation trigger

Now that you have enabled access token customization, I'll walk you through a code example of the pre token generation Lambda trigger and the version 2 trigger event. This code example examines the trigger event request, and adds a new custom claim and a custom OAuth scope in the response for Amazon Cognito to customize the access token to suit various authorization schemes.

Here is an example version 2 trigger event. The event request contains the user attributes from the Amazon Cognito user pool, the original scope claims, and the original group configurations. It has two custom attributes—membership and location—which are collected during the user registration process and stored in the Cognito user pool.

{
  "version": "2",
  "triggerSource": "TokenGeneration_HostedAuth",
  "region": "us-east-1",
  "userPoolId": "us-east-1_01EXAMPLE",
  "userName": "mytestuser",
  "callerContext": {
    "awsSdkVersion": "aws-sdk-unknown-unknown",
    "clientId": "1example23456789"
  },
  "request": {
    "userAttributes": {
      "sub": "a1b2c3d4-5678-90ab-cdef-EXAMPLE11111",
      "cognito:user_status": "CONFIRMED",
      "email": "[email protected]",
      "email_verified": "true",
      "custom:membership": "Premium",
      "custom:location": "USA"
    },
    "groupConfiguration": {
      "groupsToOverride": [],
      "iamRolesToOverride": [],
      "preferredRole": null
    },
    "scopes": [
      "openid",
      "profile",
      "email"
    ]
  },
  "response": {
    "claimsAndScopeOverrideDetails": null
  }
}

In the following code example, I transformed the user's location attribute and membership attribute to add a custom claim and a custom scope. I used the claimsToAddOrOverride field to create a new custom claim called demo:membershipLevel with the value Premium taken from the event request. I also constructed a new scope with the value membership:USA.Premium through the scopesToAdd field, and added the new claim and scope to the event response.

export const handler = function(event, context) {
  // Retrieve user attribute from event request
  const userAttributes = event.request.userAttributes;
  // Add scope to event response
  event.response = {
    "claimsAndScopeOverrideDetails": {
      "idTokenGeneration": {},
      "accessTokenGeneration": {
        "claimsToAddOrOverride": {
          "demo:membershipLevel": userAttributes['custom:membership']
        },
        "scopesToAdd": ["membership:" + userAttributes['custom:location'] + "." + userAttributes['custom:membership']]
      }
    }
  };
  // Return to Amazon Cognito
  context.done(null, event);
};

With the preceding code, the Lambda trigger sends the following response back to Amazon Cognito to indicate the customization that was needed for the access tokens.

"response": {
  "claimsAndScopeOverrideDetails": {
    "idTokenGeneration": {},
    "accessTokenGeneration": {
      "claimsToAddOrOverride": {
        "demo:membershipLevel": "Premium"
      },
      "scopesToAdd": [
        "membership:USA.Premium"
      ]
    }
  }
}

Then Amazon Cognito issues tokens with these customizations at runtime:

{
  "sub": "a1b2c3d4-5678-90ab-cdef-EXAMPLE11111",
  "iss": "https://cognito-idp.us-east-1.amazonaws.com/us-east-1_01EXAMPLE",
  "version": 2,
  "client_id": "1example23456789",
  "event_id": "01faa385-562d-4730-8c3b-458e5c8f537b",
  "token_use": "access",
  "demo:membershipLevel": "Premium",
  "scope": "openid profile email membership:USA.Premium",
  "auth_time": 1702270800,
  "exp": 1702271100,
  "iat": 1702270800,
  "jti": "d903dcdf-8c73-45e3-bf44-51bf7c395e06",
  "username": "mytestuser"
}

Your application can then use the newly-minted, custom scope and claim to authorize users and provide them with a personalized experience.
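
For example, a Node.js backend could verify the token and branch on the custom claim and scope. The following sketch uses the open source aws-jwt-verify library; the user pool ID and app client ID simply echo the example token above, and the function name is illustrative.

// npm install aws-jwt-verify
import { CognitoJwtVerifier } from "aws-jwt-verify";

// Verifier bound to the user pool and app client from the example token
const verifier = CognitoJwtVerifier.create({
  userPoolId: "us-east-1_01EXAMPLE",
  tokenUse: "access",
  clientId: "1example23456789",
});

// Returns true when the caller holds the Premium claim and the matching scope
export async function isPremiumUsaMember(accessToken) {
  const payload = await verifier.verify(accessToken); // throws if invalid or expired
  const scopes = (payload.scope ?? "").split(" ");
  return (
    payload["demo:membershipLevel"] === "Premium" &&
    scopes.includes("membership:USA.Premium")
  );
}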

Considerations and best practices

There are four general considerations and best practices that you can follow:

  1. Some claims and scopes aren't customizable. For example, you can't customize claims such as auth_time, iss, and sub, or scopes such as aws.cognito.signin.user.admin. For the full list, see Excluded claims and scopes.
  2. Work backwards from authorization. When you customize access tokens, you should start with your existing authorization schema and then decide whether to customize the scopes or claims, or both. Standard OAuth-based authorization scenarios, such as Amazon API Gateway authorizers, typically use custom scopes to provide access. However, if you have complex or fine-grained authorization requirements, you should consider using both scopes and custom claims to pass additional contextual data to the application or to a policy-based access control service such as Amazon Verified Permissions.
  3. Establish governance in token customization. You should have a consistent company engineering policy to provide nomenclature guidance for scopes and claims. A syntax standard promotes globally unique variables and avoids a name collision across different application teams. For example, Application X at AnyCompany can choose to name their scope as ac.appx.claim_name, where ac represents AnyCompany as a global identifier and appx.claim_name represents Application X’s custom claim.
  4. Be aware of limits. Because tokens are passed through various networks and systems, you need to be aware of potential token size limitations in your systems. You should keep scope and claim names as short as possible, while still being descriptive.

Conclusion

In this post, you learned how to integrate a pre token generation Lambda trigger with your Amazon Cognito user pool to customize access tokens. You can use the access token customization feature to provide differentiated services to your end users based on claims and OAuth scopes. For more information, see pre token generation Lambda trigger in the Amazon Cognito Developer Guide.

If you have feedback about this post, submit comments in the Comments section below. If you have questions about this post, contact AWS Support.

Want more AWS Security news? Follow us on Twitter.

Edward Sun

Edward is a Security Specialist Solutions Architect focused on identity and access management. He loves helping customers throughout their cloud transformation journey with architecture design, security best practices, migration, and cost optimizations. Outside of work, Edward enjoys hiking, golfing, and cheering for his alma mater, the Georgia Bulldogs.