Testing and evaluating GuardDuty detections

Post Syndicated from Marshall Jones original https://aws.amazon.com/blogs/security/testing-and-evaluating-guardduty-detections/

Amazon GuardDuty is a threat detection service that continuously monitors, analyzes, and processes Amazon Web Services (AWS) data sources and logs in your AWS environment. GuardDuty uses threat intelligence feeds, such as lists of malicious IP addresses and domains, file hashes, and machine learning (ML) models to identify suspicious and potentially malicious activity in your AWS environment. When GuardDuty identifies a potential security issue, it creates a GuardDuty finding that gives you information about what the potential security issue is, the resources involved, and contextualized information that’s key to remediating the issue. GuardDuty helps you monitor for the latest threats by continually expanding threat detection to emerging and common threats.

Whether you’re new to GuardDuty or are a long-time user, it’s recommended that you understand the different GuardDuty finding types and finding details and practice responding to them as suggested in the security pillar of AWS Well-Architected.

In this blog post, I dive deep into an open source tool for testing GuardDuty findings and then walk through three examples of how you can use this tool to test and improve your response to GuardDuty findings.

Overview

If you want to learn more about GuardDuty, you can read about the finding types in this AWS documentation. However, customers often want realistic findings in their environment to understand what a finding looks like and to practice responding hands on. While you can use GuardDuty to create sample findings in your environment, these findings are approximations populated with placeholder values and look different from real findings. Additionally, you cannot practice remediation with these findings because they’re not tied to actual resources in your account. This can be helpful if you only want to see what details are in a finding, but if you want to practice a real-world scenario, these sample findings might not be adequate.

To address this use case and provide customers with a secure and reliable way to test the threat detection capabilities of GuardDuty, the GuardDuty service team launched an open source project called GuardDuty Tester. The GuardDuty Tester creates infrastructure in your environment to simulate different security issues so that you can test GuardDuty findings that mirror actual security issues that you might encounter, such as crypto mining or a reverse shell being created on an Amazon Elastic Compute Cloud (Amazon EC2) instance. The GuardDuty Tester was originally released in 2018 as an AWS CloudFormation template and was focused more on testing investigation workflows than on a wide range of finding types. AWS has since released an updated version that uses the AWS Cloud Development Kit (AWS CDK) to make the infrastructure code easier to read and expanded the test coverage to over 100 unique finding types and resource combinations.

The ability to create findings across different resource types such as Amazon EC2, Amazon Simple Storage Service (Amazon S3), and Amazon Elastic Kubernetes Service (Amazon EKS) is a valuable resource for your security team, allowing them to simulate various types of threats with isolated infrastructure so that you don’t need to compromise your deployed workloads to improve response actions and techniques. Remember that the GuardDuty Tester doesn’t cover every possible scenario, but is instead focused on threat intelligence and rules-based findings. Anomaly-based findings, which require learning about how you operate your environment, aren’t included in the GuardDuty Tester.

Getting started with the GuardDuty Tester

The GuardDuty Tester is deployed by using the AWS CDK to create the required infrastructure and scripts to generate the GuardDuty findings. For safety, AWS recommends that you deploy the GuardDuty Tester in a nonproduction environment in an account that’s used specifically for this purpose. This way, your security team can differentiate between test GuardDuty findings and findings for other workloads that they’re monitoring.

In this post, I won’t walk through configuring the GuardDuty Tester because this is already documented in the GuardDuty documentation. Instead, I will go over what you need to know about the GuardDuty Tester and some of the benefits.

Figure 1 shows the GuardDuty Tester architecture, which includes the resources necessary to create GuardDuty findings for various protection plans such as Amazon S3 buckets, Amazon EC2 instances, and an Amazon EKS cluster. The tester also deploys a dedicated GuardDuty Tester instance where you will run the scripts needed to create the GuardDuty findings.

Figure 1: GuardDuty Tester architecture

The GuardDuty Tester provides key features including:

  • A wide range of threat scenario simulations: The GuardDuty Tester can create findings for resources including Amazon S3, AWS Identity and Access Management (IAM), Amazon Elastic Container Service (Amazon ECS) for both Amazon EC2 and AWS Fargate hosted workloads, Amazon EKS, and AWS Lambda, covering over 105 threat scenarios. This includes GuardDuty runtime monitoring as well as other GuardDuty protection plans.
  • Access through AWS Systems Manager: The GuardDuty Tester provides secure access by using Systems Manager, which minimizes the ports open to the internet by allowing access only through Systems Manager.
  • Modular scripts: With an expanded library of tests available, the GuardDuty Tester accepts user parameters to set the scope of the tests to run, which gives you greater flexibility for different testing scenarios.

Setting up the GuardDuty Tester environment is straightforward and requires only a few commands. As outlined in the documentation and the README file in the repository, there are a number of prerequisites to set up the stack. These prerequisites include Python 3+, git, the AWS Command Line Interface (AWS CLI), the AWS Systems Manager Session Manager plugin, npm, Docker, and a subscription to the Kali Linux image for Amazon EC2. You will have to subscribe to the Kali Linux image in AWS Marketplace, but will be charged for the instance only while the GuardDuty Tester is deployed. After these prerequisites are met, you can clone the repository, install the packages, and deploy the GuardDuty Tester to your AWS account.

Deploying the GuardDuty Tester can take 20–30 minutes. If you’re following along with this post, I assume that you have deployed the GuardDuty Tester into your environment and have started your Systems Manager session as described in Part A of Step 3 – Run tester scripts in the GuardDuty documentation. Now, I will dive into the first testing example.

Manual investigation

The first test use case is about getting familiar with what GuardDuty findings look like and the details that a finding gives you. This might be one of your first steps after turning on GuardDuty, or this might be an activity that you perform to help new team members understand GuardDuty findings.

To start a manual investigation:

  1. Run the following command in your Systems Manager session to view the GuardDuty Tester options.
    python3 guardduty_tester.py --help
  2. Run the following command in your Systems Manager session to create the first test finding.
    python3 guardduty_tester.py --ec2 --runtime only --tactics impact
  3. Before creating the findings, the GuardDuty Tester prompts you to confirm that it’s allowed to change GuardDuty settings in the environment. For example, if you’ve chosen to create findings related to the GuardDuty runtime monitoring feature but don’t have this feature enabled, the GuardDuty Tester will enable it for the tests and then disable it after testing is complete.

    Note: This will start the 30-day trial of the enabled features in this account, in this AWS Region, even if the feature is disabled after testing is complete. More information about GuardDuty pricing and free trials can be found on the GuardDuty pricing page.

  4. After you enter y for “yes”, the GuardDuty Tester reports the number of domain reputation findings it’s expecting. Figure 2 shows an example of the expected findings. You can learn more about domain reputation findings in the GuardDuty finding documentation.

    Figure 2: Generated GuardDuty findings in the console

  5. After the GuardDuty Tester is finished, wait a few minutes and then go to the AWS Management Console for GuardDuty to see the findings. In this example, there are four new GuardDuty findings as expected from step 4 and shown in Figure 3. With the findings generated, you can start your manual investigation.

    Figure 3: GuardDuty finding details

In the preceding figure, you can see some of the finding details presented—such as the action type and the process information—that can help you quickly identify what trigger started the suspicious communication. From here, I encourage you to use this finding to practice your runbooks for investigation and response. For example, you might start with validating and triaging the finding before moving into evidence collection and remediation. If you don’t have incident response runbooks already built, you can use this finding as an example to get started. There are multiple open source examples such as AWS incident response playbooks and AWS customer response playbook. A runbook will help your team evaluate the information provided in the GuardDuty finding and understand what else they need to know about your specific environment to properly respond to the finding. For example, in the finding, you will have resource and actor information but not things such as who is the account owner or point of contact for security for that account.
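If you prefer to review the generated findings programmatically instead of in the console, the following is a minimal sketch of one way to do it. It assumes you pass in an object that behaves like boto3.client("guardduty"); the function names fetch_recent_findings and summarize are hypothetical, and only the first page of results (up to 50 findings per detector) is read to keep the example short.

```python
from typing import Any, Dict, List


def fetch_recent_findings(guardduty: Any, finding_types: List[str]) -> List[Dict[str, Any]]:
    """Return full finding records that match the given finding types.

    `guardduty` is expected to behave like boto3.client("guardduty").
    Only the first page (up to 50 findings) per detector is read.
    """
    findings: List[Dict[str, Any]] = []
    for detector_id in guardduty.list_detectors()["DetectorIds"]:
        listed = guardduty.list_findings(
            DetectorId=detector_id,
            FindingCriteria={"Criterion": {"type": {"Equals": finding_types}}},
            MaxResults=50,
        )
        finding_ids = listed["FindingIds"]
        if not finding_ids:
            continue  # nothing generated (yet) for this detector
        detail = guardduty.get_findings(DetectorId=detector_id, FindingIds=finding_ids)
        findings.extend(detail["Findings"])
    return findings


def summarize(finding: Dict[str, Any]) -> str:
    """Build a one-line summary from top-level fields of a finding."""
    return f'{finding["Severity"]:>4} {finding["Type"]} ({finding["Id"]})'
```

In a real session, you might call fetch_recent_findings(boto3.client("guardduty"), [...]) with the finding types that the tester reported in step 4 and print summarize(f) for each result. Findings can take a few minutes to appear, so an empty list right after the tester finishes is not necessarily a problem.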

Creating alerts

The next use case highlights how to create alerts based on GuardDuty findings. When setting up alerting automation with tools such as Amazon Simple Notification Service (Amazon SNS) and Slack, you should create a finding using the GuardDuty Tester to test that you’ve configured your alert correctly. See Creating custom responses to GuardDuty findings for information about creating alerts with either of these tools. Figure 4 shows a sample EventBridge rule that will send GuardDuty findings to SNS.

Figure 4: EventBridge rule to send GuardDuty findings to SNS

For this post, I assume that you’ve already configured an Amazon EventBridge rule and Amazon SNS alert.
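If you haven’t set this up yet, the following is a rough sketch of how such a rule could be created in code rather than in the console. It assumes an object that behaves like boto3.client("events"); the function names, the rule name, and the target ID sns-alert are hypothetical. The event pattern matches events whose source is aws.guardduty and whose detail type is GuardDuty Finding, optionally narrowed to specific finding types.

```python
import json
from typing import Any, Dict, List, Optional


def guardduty_event_pattern(finding_types: Optional[List[str]] = None) -> Dict[str, Any]:
    """Build an EventBridge event pattern that matches GuardDuty findings."""
    pattern: Dict[str, Any] = {
        "source": ["aws.guardduty"],
        "detail-type": ["GuardDuty Finding"],
    }
    if finding_types:
        # Narrow the rule to specific finding types, for example while testing
        pattern["detail"] = {"type": finding_types}
    return pattern


def create_sns_rule(events: Any, rule_name: str, topic_arn: str,
                    finding_types: Optional[List[str]] = None) -> None:
    """Create (or update) a rule and point it at an SNS topic.

    `events` is expected to behave like boto3.client("events").
    """
    events.put_rule(
        Name=rule_name,
        EventPattern=json.dumps(guardduty_event_pattern(finding_types)),
        State="ENABLED",
    )
    events.put_targets(Rule=rule_name, Targets=[{"Id": "sns-alert", "Arn": topic_arn}])
```

Note that the SNS topic’s access policy also has to allow EventBridge (events.amazonaws.com) to publish to it; that part is not covered by this sketch.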

To test alerts:

  1. Run the following command in your Systems Manager session to create a privileged container finding.
    python3 guardduty_tester.py --finding 'PrivilegeEscalation:Kubernetes/PrivilegedContainer'
  2. Shortly after creating this finding, you should see an SNS alert based on the finding type.

Figure 5: SNS notification from a GuardDuty finding

If you’ve configured the alert correctly, you will see an email similar to Figure 5. The email demonstrates that SNS notifications were successfully configured and tested using the GuardDuty Tester. If this is a new finding, you will receive this SNS notification shortly after the GuardDuty Tester generates the finding, but if this is an updated finding, then the timing will be based on the notification frequency configured in the account.

There are many ways that customers consume GuardDuty findings in their environments. Whether you’re using Amazon SNS or another mechanism such as a chat application, ticketing system, or a security information and event management (SIEM) solution, you can use this example of an EventBridge rule and the GuardDuty Tester to test out your notification pipeline.

Automated response

For the third use case, I show you how to create an automated action based on a GuardDuty finding. In this example, I create a finding based on an EC2 instance connecting to a Bitcoin mining domain, then based on this finding, I use Lambda to tag the instance to assist with identification during the investigation steps that follow. Although this is a simple example, it shows you what you can do by combining EventBridge rules and Lambda functions. If you want to create an automated response for GuardDuty runtime monitoring findings that requires making a host-level modification, you can use EventBridge rules with AWS Systems Manager Run Command to run commands locally on a host to remediate a security issue.

Start by creating a Lambda function that will take a GuardDuty event delivered by EventBridge, pull out the instance ID information, and then use that as a parameter in the create_tags API call. See the following example code.

import json
import boto3
import logging

logger = logging.getLogger()
logger.setLevel(logging.INFO)

def lambda_handler(event, context):
    # Initialize before the try block so the except handler can always log it
    instance_id = None
    try:
        # Extract the necessary information from the GuardDuty finding
        instance_id = event['detail']['resource']['instanceDetails']['instanceId']
        account_id = event['detail']['accountId']
        region = event['detail']['region']

        # Create an EC2 client
        ec2 = boto3.client('ec2', region_name=region)

        # Add the "infected" and "cryptomining" tag value pair to the instance
        ec2.create_tags(
            Resources=[instance_id],
            Tags=[
                {
                    'Key': 'infected',
                    'Value': 'cryptomining'
                }
            ]
        )

        logger.info(f"Tagged instance {instance_id} with 'infected=cryptomining' in account {account_id} and region {region}")
        return {
            'statusCode': 200,
            'body': 'Instance tagged successfully'
        }
    except Exception as e:
        logger.error(f"Error tagging instance {instance_id}: {str(e)}")
        return {
            'statusCode': 500,
            'body': f"Error tagging instance: {str(e)}"
        }
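Findings that aren’t tied to an EC2 instance (for example, a Kubernetes finding) may not include instanceDetails at all, so it can help to check for the instance ID before calling any EC2 API. The following is a small sketch of that check; extract_instance_id is a hypothetical helper, and sample_event is a trimmed, made-up event that contains only the fields the handler reads, not a complete GuardDuty finding.

```python
from typing import Any, Dict, Optional


def extract_instance_id(event: Dict[str, Any]) -> Optional[str]:
    """Return the EC2 instance ID from a GuardDuty finding event, or None."""
    resource = event.get("detail", {}).get("resource", {})
    return resource.get("instanceDetails", {}).get("instanceId")


# Trimmed, hypothetical event with only the fields used by the handler
sample_event = {
    "detail": {
        "type": "CryptoCurrency:EC2/BitcoinTool.B!DNS",
        "accountId": "111122223333",
        "region": "us-east-1",
        "resource": {"instanceDetails": {"instanceId": "i-0123456789abcdef0"}},
    }
}

print(extract_instance_id(sample_event))               # i-0123456789abcdef0
print(extract_instance_id({"detail": {"resource": {}}}))  # None
```

A handler could return early when this helper returns None instead of raising a KeyError.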

Next, I create an EventBridge rule specific to the Bitcoin mining finding that I want to test, shown in Figure 6. The target is the Lambda function that I just created.

Figure 6: EventBridge rule for crypto mining GuardDuty finding

Now that the EventBridge rule is in place with the Lambda function as the target, I can use the GuardDuty Tester to trigger a Bitcoin mining finding and test my solution with the following command.

python3 guardduty_tester.py --finding 'CryptoCurrency:EC2/BitcoinTool.B!DNS'

After the finding is generated, I go to my EC2 instance, where there’s a new instance tag with a key of infected and a value of cryptomining, shown in Figure 7.

Figure 7: Updated tags after automated response

Although this is a general example, you can use the same approach across various actions that you might take in response to a GuardDuty finding and then test them using the GuardDuty Tester. Examples include using Lambda to add logic in AWS WAF, a network access control list (network ACL), or AWS Network Firewall to block suspicious traffic, or using Systems Manager Run Command to end a malicious process that’s running on a host.
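As an illustration of the Run Command option, the following sketch builds the arguments for a Systems Manager send_command call that ends a process by its process ID. It assumes the instance is managed by Systems Manager and that you’ve already obtained the process ID from the finding; build_kill_process_command is a hypothetical helper, and the kill command shown is only one possible response.

```python
from typing import Any, Dict


def build_kill_process_command(instance_id: str, pid: int) -> Dict[str, Any]:
    """Build keyword arguments for ssm.send_command to end one process.

    The PID is validated so that only a plain integer is placed in the
    shell command, and PID 1 (init) is refused.
    """
    if not isinstance(pid, int) or pid <= 1:
        raise ValueError("pid must be an integer greater than 1")
    return {
        "InstanceIds": [instance_id],
        "DocumentName": "AWS-RunShellScript",
        "Comment": "Stop process flagged by a GuardDuty finding",
        "Parameters": {"commands": [f"kill -9 {pid}"]},
    }


kwargs = build_kill_process_command("i-0123456789abcdef0", 4242)
print(kwargs["Parameters"]["commands"])  # ['kill -9 4242']
```

In a Lambda function, you could pass the result to boto3.client("ssm").send_command(**kwargs). Before automating a destructive action like this, consider testing it with a GuardDuty Tester finding first.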

Conclusion

The updated GuardDuty Tester represents a significant advancement in helping organizations validate and gain confidence in GuardDuty threat detection. The GuardDuty Tester now provides more comprehensive coverage of GuardDuty runtime monitoring and protection plans across various AWS services.

By using the GuardDuty Tester and following the use cases in this post, you can proactively assess your threat detection readiness, identify potential gaps, and implement necessary measures to help you fortify your AWS environments against evolving cyber threats.

 
If you have feedback about this post, submit comments in the Comments section below. If you have questions about this post, contact AWS Support.
 

Marshall Jones

Marshall is a Worldwide Security Specialist Solutions Architect at AWS. His background is in AWS consulting and security architecture, with a focus on a variety of security domains including edge, threat detection, and compliance. Today, he’s focused on helping enterprise AWS customers adopt and operationalize AWS security services to increase security effectiveness and reduce risk.

AWS Firewall Manager retrofitting: Harmonizing central security with application team flexibility

Post Syndicated from Ian Olson original https://aws.amazon.com/blogs/security/aws-firewall-manager-retrofitting-harmonizing-central-security-with-application-team-flexibility/

AWS Firewall Manager is a powerful tool that organizations can use to define common AWS WAF rules with centralized security policies. These policies specify which accounts and resources are in scope. Firewall Manager creates a web access control list (web ACL) that adheres to the organization’s policy requirements and associates it with the in-scope resources. Figure 1 shows a Firewall Manager security policy and web ACL created in each in-scope account.

Figure 1: A Firewall Manager security policy and Firewall Manager created web ACLs in each in-scope account

In this post, we’ll talk about the benefits of retrofitting and how you can use this feature to allow Firewall Manager to manage existing web ACLs. When retrofitting is enabled, a Firewall Manager security policy doesn’t replace existing web ACLs. Instead, Firewall Manager adds the top and bottom rule sections to existing web ACLs associated with in-scope resources. For application teams, Firewall Manager no longer restricts how they configure and deploy AWS WAF. Teams can use either the AWS Management Console or infrastructure as code (IaC) tools to customize rules in web ACLs, even if those web ACLs are managed by Firewall Manager.

Firewall Manager before retrofitting

Firewall Manager offers significant benefits, but the existing approach results in several challenges:

  1. Compatibility with infrastructure as code (IaC): Firewall Manager creates and associates AWS WAF web ACLs with in-scope resources. IaC tools expect to create and manage resources (in other words, own their lifecycle). Application teams cannot use IaC to manage the WAF rules and other web ACL configuration components that are created by Firewall Manager; there are custom solutions that inject locally defined rules into Firewall Manager-created web ACLs, but these are complex and have risks such as drift. For AWS WAF customers and application teams, this introduces an operational challenge.
  2. Existing WAF migration: Customers who are already using AWS WAF must migrate existing rules to Firewall Manager-managed web ACLs.
  3. Application-specific rule complexity: Forcing all in-scope resources in the same account and AWS Region to use the same web ACL makes application-specific or exception-based rules more complex. In addition, changes to one application’s rules could impact others that share the same web ACL.
  4. Increased costs: When many applications share a single web ACL, application-specific rules are part of a single web ACL, which can increase total WAF capacity units (WCU) usage, sometimes resulting in higher AWS WAF request costs.

Firewall Manager with retrofitting addresses challenges

To address these challenges, Firewall Manager now offers the ability to retrofit existing web ACLs. Let’s get specific about when and how Firewall Manager retrofitting works:

  • Firewall Manager will only retrofit a web ACL when all associated resources (for example, Application Load Balancers, API Gateways, and Amazon CloudFront distributions) are in scope. If a not-in-scope resource is also associated, Firewall Manager will not perform an initial retrofit, and it will not apply future security policy changes to a web ACL that was already retrofitted. In that scenario, associated in-scope resources and the web ACL are marked noncompliant with security policies.
  • Retrofitting only modifies customer-created web ACLs. Retrofitting will not act on web ACLs retrofitted by another security policy or managed by Firewall Manager. If either scenario occurs, the web ACL and associated resources are marked noncompliant with security policies.
  • Retrofitting adds the following on top of an existing web ACL that is associated with one or more in-scope resources:
    • Retrofitting adds first rule groups and last rule groups defined in a security policy to the web ACL. Existing rules within the web ACL are not changed. The order of rule evaluation becomes the following:
      1. Security policy–defined first rule group rules
      2. Existing web ACL rules
      3. Security policy–defined last rule group rules
    • Retrofitting adds a WAF logging configuration if one is defined by the security policy. If the web ACL already has a logging configuration, Firewall Manager does not replace the existing logging configuration and marks the web ACL noncompliant.
    • Retrofitting does not verify or configure other attributes that are defined by the security policy. This includes the following properties: default action, custom request headers, web ACL Captcha or challenge configurations, and token domain list. These properties are used only when Firewall Manager creates a web ACL.
  • If an in-scope resource that supports AWS WAF does not have a web ACL, Firewall Manager creates and associates a Firewall Manager-managed web ACL with that resource.
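To make these conditions easier to reason about, here is a small illustrative model of the decision described in the list above. It is not a Firewall Manager API; the WebAcl class, its field names, and the returned strings are all hypothetical and exist only to restate the rules as code.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class WebAcl:
    name: str
    managed_by_firewall_manager: bool = False
    retrofitted_by_other_policy: bool = False
    # One flag per associated resource: True if that resource is in scope
    associated_in_scope: List[bool] = field(default_factory=list)


def retrofit_decision(web_acl: Optional[WebAcl]) -> str:
    """Model what happens for an in-scope resource and its web ACL (if any).

    Assumes the web ACL, when present, is associated with at least one
    in-scope resource.
    """
    if web_acl is None:
        # No web ACL on the resource: Firewall Manager creates and associates one
        return "create-managed-web-acl"
    if web_acl.managed_by_firewall_manager or web_acl.retrofitted_by_other_policy:
        return "noncompliant"
    if not all(web_acl.associated_in_scope):
        # At least one associated resource is not in scope
        return "noncompliant"
    return "retrofit"


print(retrofit_decision(None))                                       # create-managed-web-acl
print(retrofit_decision(WebAcl("a", associated_in_scope=[True])))    # retrofit
print(retrofit_decision(WebAcl("b", associated_in_scope=[True, False])))  # noncompliant
```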

Figure 2 shows the rules and logging configuration before and after a Firewall Manager security policy retrofits a web ACL that is associated with in-scope resources.
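The before-and-after state can also be sketched in a few lines of code. This is a simplified model, not Firewall Manager’s implementation: the rule names and logging destinations are made up, and the two helper functions are hypothetical. It shows policy-defined first rules placed ahead of the existing rules, policy-defined last rules placed after them, and an existing logging configuration being kept (and flagged) rather than replaced.

```python
from typing import List, Optional, Tuple


def retrofitted_rule_order(first: List[str], existing: List[str], last: List[str]) -> List[str]:
    """Evaluation order after retrofitting: first rules, existing rules, last rules."""
    return [*first, *existing, *last]


def resolve_logging(existing: Optional[str], policy: Optional[str]) -> Tuple[Optional[str], bool]:
    """Return (logging configuration in effect, compliant?)."""
    if policy is None:
        return existing, True   # the policy doesn't define logging
    if existing is None:
        return policy, True     # the policy's logging configuration is added
    return existing, False      # existing configuration is kept; marked noncompliant


# Hypothetical rule names and logging destinations, for illustration only
order = retrofitted_rule_order(
    first=["policy-first-managed-rules"],
    existing=["app-rate-limit", "app-allow-partner"],
    last=["policy-last-block-list"],
)
print(order)
print(resolve_logging(existing=None, policy="central-log-destination"))
print(resolve_logging(existing="app-log-destination", policy="central-log-destination"))
```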

Figure 2: Using retrofitting to update an existing web ACL

This new retrofitting capability solves the previous challenges in the following ways:

  1. IaC compatible: Application teams can provision and manage AWS WAF with IaC tools. Firewall Manager retrofits existing web ACLs by adding rules defined in the security policy to web ACLs created by IaC tools. Application teams can manage AWS WAF exactly the same as when Firewall Manager was not in use.
  2. Existing WAF integration: Customers with existing AWS WAF deployments can adopt Firewall Manager without the need to migrate existing WAF rules.
  3. Application-specific rules: Multiple resources in the same account can use separate web ACLs, which simplifies application-specific rules.
  4. Help prevent additional costs: Application-specific rules are applied only to the relevant web ACLs. This helps prevent increased AWS WAF request costs from shared web ACLs that have a high WCU usage.

By enhancing Firewall Manager to retrofit existing web ACLs, customers can use the power of centralized WAF management without restricting how AWS WAF is deployed and configured by application teams in member accounts.

Firewall Manager security policy for AWS WAF – Enabling retrofitting

Figure 3 shows an example of a Firewall Manager security policy.

Figure 3: An example Firewall Manager security policy that uses the new Retrofit existing webACLs feature

This security policy defines WAF rules in the first rule group section. It applies to all Application Load Balancers (ALBs) across your organization in AWS Organizations in the Region where this security policy is created. The policy action is set to automatically remediate. Under Web ACL management, there is a new section, Managed web ACL source, with two options, Default and Retrofit existing webACLs. Default is the existing behavior: Firewall Manager creates and associates a Firewall Manager managed web ACL. Retrofit existing webACLs applies the WAF rules and logging configuration (if any) defined by a security policy to existing web ACLs when they are associated with an in-scope resource. This policy specifies Retrofit existing webACLs. If an in-scope resource does not have a web ACL, Firewall Manager still creates and associates a web ACL by default.

Retrofitting in action

Let’s walk through what happens when you set Managed web ACL source to Retrofit existing webACLs. Figure 4 shows two ALBs that are in-scope of our security policy, LoadBalancer1 and LoadBalancer2.

Figure 4: Two existing ALBs, one with an existing web ACL and the other without

LoadBalancer1 has the following characteristics:

  • LoadBalancer1 does NOT have a web ACL associated.
  • After the Firewall Manager security policy applies, LoadBalancer1 is associated with a Firewall Manager created web ACL, as shown in Figure 5.

    Figure 5: LoadBalancer1 is now associated with a Firewall Manager managed and created web ACL

  • The Firewall Manager-created web ACL contains the WAF rules defined in the security policy.

LoadBalancer2 has the following characteristics:

  • LoadBalancer2 has an existing customer created web ACL associated with it, as shown in Figure 6.

    Figure 6: LoadBalancer2 is associated with a customer-created web ACL

  • This web ACL was created by the application team with an application-specific rule. The web ACL could have been created with the AWS Management Console, AWS CloudFormation, or other IaC tools like Terraform.
  • After the Firewall Manager security policy takes effect, LoadBalancer2 remains associated with the existing customer-created web ACL MyCustomWebACL.
  • Retrofitting adds WAF rules in the first rule group according to the security policy, as shown in Figure 7. Existing WAF rules and other aspects of the web ACL are not changed and can continue to be managed exactly as you would without Firewall Manager present.

    Figure 7: Firewall Manager has retrofitted a customer-created web ACL and added the security policy–defined first rule groups rules

Figure 8 shows that both ALBs are now compliant with our security policy. LoadBalancer1 has a web ACL created by Firewall Manager; future ALBs that don’t have a web ACL associated would also become associated with this web ACL. LoadBalancer2 is associated with a web ACL the application team previously created. The application-defined WAF rules are not changed and the WAF rules defined by the security policy are added to this web ACL.

Figure 8: Multiple ALBs in scope for the same security policy

Associate a web ACL with Firewall Manager already in place

Let’s continue from our previous scenario. The application team for LoadBalancer1 now configures application-specific rules for their ALB by using the following process.

  1. The application team creates a web ACL and defines their own WAF rules. They might do this with the console, AWS CloudFormation, or other IaC tools like Terraform.
  2. The application team associates LoadBalancer1 with the custom web ACL.

    Figure 9: From the web ACL, only LoadBalancer1 is associated with the app team’s web ACL

  3. After a moment, Firewall Manager detects the change to an in-scope resource (LoadBalancer1). Firewall Manager retrofits the application team’s web ACL AppTeamNewWebACL, making it align with the security policy.

    Figure 10: Firewall Manager has retrofitted the app team–created web ACL and added the security policy–defined first rule group rules

The diagram in Figure 11 shows the workflow that happens when the application team creates their own web ACL and associates it with LoadBalancer1. Firewall Manager detects the association change and retrofits the application team’s web ACL, bringing LoadBalancer1 into alignment with security policies.

Figure 11: Firewall Manager retrofitting a web ACL in response to being associated with an existing in-scope resource

Out-of-scope resources associated with a retrofitted web ACL

Let’s make a change to our security policy. In Figure 12, the policy scope has been updated to only apply to resources when they have the resource tag Tier: Production. Application teams add this tag to LoadBalancer1 and LoadBalancer2.

Figure 12: The Firewall Manager security policy scope has been updated to include a resource tag

Later, an application team creates LoadBalancer3 and associates it with AppTeamNewWebACL (Figure 13).

Figure 13: A new ALB associated with a custom WAF web ACL

The application team does not tag LoadBalancer3, making it out of scope with the security policy, as shown in Figure 14.

Figure 14: A new ALB that is out of scope with a security policy

This web ACL is currently retrofitted by our security policy and associated with LoadBalancer2 (in-scope), as shown in Figure 15.

Figure 15: A retrofitted web ACL associated with in-scope and out-of-scope ALBs

Firewall Manager detects that an out-of-scope resource is associated with a retrofitted web ACL. As shown in Figure 16, Firewall Manager marks the web ACL AppTeamNewWebACL noncompliant. It also marks LoadBalancer2 noncompliant.

Figure 16: Shows retrofit-specific reasons a resource or web ACL will be marked noncompliant. In this case, a not-in-scope resource is associated with a web ACL where another associated resource is in-scope.

As long as the web ACL is noncompliant, Firewall Manager will not retrofit it if it has not been retrofitted yet, and future security policy changes will not be applied to it. Existing retrofitted WAF rules for that web ACL are not modified or removed. When the web ACL later becomes compliant, Firewall Manager will again retrofit it to the latest state of the security policy. Figure 17 shows how Firewall Manager keeps the existing retrofitting but does not apply updates. Using our example, one of three things would need to happen for the web ACL to become compliant:

  1. LoadBalancer3 can be made in-scope with the current security policy by adding the resource tag Tier: Production to this ALB.
  2. The resource tag can be removed from the policy scope.
  3. LoadBalancer3 can be disassociated from the web ACL.

Note: We recommend that you promptly address not-in-scope resources associated with web ACLs that are shared by in-scope resources. For example, if the preceding scenario was not addressed and LoadBalancer3 was deleted three months later, Firewall Manager would retrofit the web ACL only at that point. This is not necessarily a problem, but could trigger unexpected changes to a web ACL’s rules.
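One way to catch this situation early is to check which resources associated with a web ACL fall outside the policy scope. The following sketch models the tag-based scope from this example. The helper names and the dictionary of associations are hypothetical; in practice, you would gather the associations and tags from your own inventory or the relevant AWS APIs.

```python
from typing import Dict, List

# Tag required by the example policy scope (Tier: Production)
REQUIRED_TAG_KEY = "Tier"
REQUIRED_TAG_VALUE = "Production"


def in_scope(tags: Dict[str, str]) -> bool:
    """A resource is in scope when it carries the required tag and value."""
    return tags.get(REQUIRED_TAG_KEY) == REQUIRED_TAG_VALUE


def blocking_resources(associated: Dict[str, Dict[str, str]]) -> List[str]:
    """Return the associated resources that keep the web ACL noncompliant."""
    return sorted(name for name, tags in associated.items() if not in_scope(tags))


# Resources associated with the retrofitted web ACL, mapped to their tags
associated = {
    "LoadBalancer2": {"Tier": "Production"},
    "LoadBalancer3": {},  # untagged, so out of scope
}
print(blocking_resources(associated))  # ['LoadBalancer3']

# Option 1 from the list above: tag LoadBalancer3 so it becomes in scope
associated["LoadBalancer3"] = {"Tier": "Production"}
print(blocking_resources(associated))  # []
```

An empty list means that every associated resource is in scope, which is the condition under which Firewall Manager retrofits the web ACL and applies later policy updates.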

Figure 17: Firewall Manager retains existing but not future changes as long as a not-in-scope resource is associated with this web ACL

In summary: Firewall Manager initially retrofits and will apply future updates to existing web ACLs while all associated resources are in-scope. When an out-of-scope resource is associated, an initial retrofit is delayed and security policy retrofit updates are paused until the out-of-scope resource is addressed.
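The rules in this summary can be sketched as a small decision function. This is only an illustration of the behavior described above, not Firewall Manager code; the function name, the action labels, and the resource names in the example are hypothetical.

```python
def retrofit_action(associated, in_scope, already_retrofitted):
    """Return what the summary above says happens to one web ACL.

    associated: resources (for example, ALBs) associated with the web ACL
    in_scope: resources covered by the security policy
    already_retrofitted: whether an initial retrofit has happened before
    """
    associated = set(associated)
    in_scope = set(in_scope)

    # A web ACL with no in-scope resource is not touched by the policy at all.
    if not associated & in_scope:
        return "not_applicable"

    # At least one out-of-scope resource shares the web ACL: noncompliant.
    if associated - in_scope:
        # Existing retrofitted rules stay in place, but nothing new is applied.
        return "pause_updates" if already_retrofitted else "delay_initial_retrofit"

    # Every associated resource is in scope: compliant.
    return "apply_updates" if already_retrofitted else "initial_retrofit"


if __name__ == "__main__":
    policy_scope = {"alb-in-scope-1", "alb-in-scope-2"}  # hypothetical names
    print(retrofit_action(["alb-in-scope-1"], policy_scope, False))
    print(retrofit_action(["alb-in-scope-1", "alb-other"], policy_scope, True))
```

Once the out-of-scope resource is addressed (brought into scope, or disassociated), the same call returns an action that applies the latest policy state again.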

Figure 18 demonstrates how Firewall Manager will not perform an initial retrofit of a web ACL associated with both in-scope and not-in-scope resources.

Figure 18: Firewall Manager will only perform an initial retrofit when only in-scope resources are associated with a web ACL

Conclusion

Firewall Manager verifies that in-scope resources adhere to relevant security policies or are marked noncompliant. You can enable retrofitting for Firewall Manager for AWS WAF to seamlessly enforce these security policies without changing how your application teams manage the configuration of their WAF rules.

Note: Retrofitting is only available for AWS Firewall Manager security policies for AWS WAF; it is not available for AWS WAF Classic.

If you have feedback about this post, submit comments in the Comments section below. If you have questions about this post, start a new thread on the AWS Firewall Manager re:Post or contact AWS Support.
 

Ian Olson

Ian is a Senior Security Specialist Solutions Architect at AWS. He helps customers automate security services to safeguard against threats like DDoS and web exploitations. Through intelligent automation, he delivers security solutions tailored to organizations of any scale. When he’s not working, Ian enjoys spending quality time with his two young children.
Bryan Van Hook

Bryan is a Senior Security Solutions Architect at AWS. He has over 25 years of experience in software engineering, cloud operations, and internet security. He spends most of his time helping customers gain the most value from native AWS security services. Outside of his day job, Bryan can be found playing tabletop games and acoustic guitar.

Generate vector embeddings for your data using AWS Lambda as a processor for Amazon OpenSearch Ingestion

Post Syndicated from Jagadish Kumar original https://aws.amazon.com/blogs/big-data/generate-vector-embeddings-for-your-data-using-aws-lambda-as-a-processor-for-amazon-opensearch-ingestion/

On Nov 22, 2024, Amazon OpenSearch Ingestion launched support for AWS Lambda processors. With this launch, you now have more flexibility enriching and transforming your logs, metrics, and trace data in an OpenSearch Ingestion pipeline. Some examples include using foundation models (FMs) to generate vector embeddings for your data and looking up external data sources like Amazon DynamoDB to enrich your data.

Amazon OpenSearch Ingestion is a fully managed, serverless data pipeline that delivers real-time log, metric, and trace data to Amazon OpenSearch Service domains and Amazon OpenSearch Serverless collections.

Processors are components within an OpenSearch Ingestion pipeline that enable you to filter, transform, and enrich events using your desired format before publishing records to a destination of your choice. If no processor is defined in the pipeline configuration, then the events are published in the format specified by the source component. You can incorporate multiple processors within a single pipeline, and they are run sequentially as defined in the pipeline configuration.

OpenSearch Ingestion gives you the option of using Lambda functions as processors along with built-in native processors when transforming data. You can batch events into a single payload based on event count or size before invoking Lambda to optimize the pipeline for performance and cost. Lambda enables you to run code without provisioning or managing servers, eliminating the need to create workload-aware cluster scaling logic, maintain event integrations, or manage runtimes.

In this post, we demonstrate how to use the OpenSearch Ingestion’s Lambda processor to generate embeddings for your source data and ingest them to an OpenSearch Serverless vector collection. This solution uses the flexibility of OpenSearch Ingestion pipelines with a Lambda processor to dynamically generate embeddings. The Lambda function will invoke the Amazon Titan Text Embeddings Model hosted in Amazon Bedrock, allowing for efficient and scalable embedding creation. This architecture simplifies various use cases, including recommendation engines, personalized chatbots, and fraud detection systems.

Integrating OpenSearch Ingestion, Lambda, and OpenSearch Serverless creates a fully serverless pipeline for embedding generation and search. This combination offers automatic scaling to match workload demands and a usage-driven model. Operations are simplified because AWS manages infrastructure, updates, and maintenance. This serverless approach allows you to focus on developing search and analytics solutions rather than managing infrastructure.

Note that Amazon OpenSearch Service also provides neural search, which transforms text into vectors and facilitates vector search both at ingestion time and at search time. During ingestion, neural search transforms document text into vector embeddings and indexes both the text and its vector embeddings in a vector index. Neural search is available for managed clusters running version 2.9 and above.

Solution overview

This solution builds embeddings on a dataset stored in Amazon Simple Storage Service (Amazon S3). We use the Lambda function to invoke the Amazon Titan model on the payload delivered by OpenSearch Ingestion.

Prerequisites

You should have an appropriate role with permissions to invoke your Lambda function and Amazon Bedrock model and also write to the OpenSearch Serverless collection.

To provide access to the collection, you must configure an AWS Identity and Access Management (IAM) pipeline role with a permissions policy that grants access to the collection. For more details, see Granting Amazon OpenSearch Ingestion pipelines access to collections. The following is example code:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "allowinvokeFunction",
            "Effect": "Allow",
            "Action": [
                "lambda:InvokeFunction"
                
            ],
            "Resource": "arn:aws:lambda:{{region}}:{{account-id}}:function:{{function-name}}"
            
        }
    ]
}

The role must have the following trust relationship, which allows OpenSearch Ingestion to assume it:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "Service": "osis-pipelines.amazonaws.com"
            },
            "Action": "sts:AssumeRole"
        }
    ]
}
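If you prefer to generate these two documents rather than edit the placeholders by hand, a minimal sketch could look like the following. The helper names are hypothetical; the region, account ID, and function name in the example are the sample values used elsewhere in this post.

```python
import json


def lambda_invoke_policy(region, account_id, function_name):
    """Permissions policy: lets the pipeline role invoke one Lambda function."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "allowinvokeFunction",
                "Effect": "Allow",
                "Action": ["lambda:InvokeFunction"],
                "Resource": (
                    f"arn:aws:lambda:{region}:{account_id}:function:{function_name}"
                ),
            }
        ],
    }


def osis_trust_policy():
    """Trust policy: lets OpenSearch Ingestion assume the pipeline role."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {"Service": "osis-pipelines.amazonaws.com"},
                "Action": "sts:AssumeRole",
            }
        ],
    }


if __name__ == "__main__":
    policy = lambda_invoke_policy(
        "us-west-2", "123456789012", "generate_embeddings_bedrock"
    )
    print(json.dumps(policy, indent=4))
    print(json.dumps(osis_trust_policy(), indent=4))
```

The printed JSON can be pasted into the IAM console or passed to your own tooling; the sketch itself does not call any AWS API.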

Create an ingestion pipeline

You can create a pipeline using a blueprint. For this post, we select the AWS Lambda custom enrichment blueprint.

We use the IMDB title basics dataset, which contains movie information, including originalTitle, runtimeMinutes, and genres.

The OpenSearch Ingestion pipeline uses a Lambda processor to create embeddings for the field originalTitle and store the embeddings as originalTitle_embeddings along with other data.

See the following pipeline code:

version: "2"
s3-log-pipeline:
  source:
    s3:
      acknowledgments: true
      compression: "none"
      codec:
        csv:
      aws:
        # Provide the region to use for aws credentials
        region: "us-west-2"
        # Provide the role to assume for requests to SQS and S3
        sts_role_arn: "<<arn:aws:iam::123456789012:role/Example-Role>>"
      scan:
        buckets:
          - bucket:
              name: "lambdaprocessorblog"
      
  processor:
     - aws_lambda:
        function_name: "generate_embeddings_bedrock"
        response_events_match: true
        tags_on_failure: ["lambda_failure"]
        batch:
          key_name: "documents"
          threshold:
            event_count: 4
        aws:
          region: us-west-2
          sts_role_arn: "<<arn:aws:iam::123456789012:role/Example-Role>>"
  sink:
    - opensearch:
        hosts:
          - 'https://myserverlesscollection.us-region.aoss.amazonaws.com'
        index: imdb-data-embeddings
        aws:
          sts_role_arn: "<<arn:aws:iam::123456789012:role/Example-Role>>"
          region: us-west-2
          serverless: true

Let’s take a closer look at the Lambda processor in the ingestion pipeline. Pay attention to the key_name parameter. You can choose any value for key_name, and your Lambda function must reference this key when processing the payload from OpenSearch Ingestion. The payload size is determined by the batch setting. When batching is enabled in the Lambda processor, OpenSearch Ingestion groups multiple events into a single payload before invoking the Lambda function. A batch is sent to Lambda when any of the following thresholds are met:

    • event_count – The number of events reaches the specified limit
    • maximum_size – The total size of the batch reaches the specified size (for example, 5 MB); this value is configurable up to 6 MB (the invocation payload limit for AWS Lambda)
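To make the two thresholds concrete, the following sketch groups events the way the list above describes. It is a simplified model under stated assumptions, not the OpenSearch Ingestion implementation: the function names are hypothetical, event size is approximated by the length of the JSON-encoded event, and any other flush conditions the service may apply are ignored.

```python
import json


def batch_events(events, event_count=4, maximum_size=5 * 1024 * 1024):
    """Group events into batches, closing a batch when either threshold is hit."""
    batches, current, current_size = [], [], 0
    for event in events:
        size = len(json.dumps(event).encode("utf-8"))
        # Close the current batch first if this event would push it over the size limit.
        if current and current_size + size > maximum_size:
            batches.append(current)
            current, current_size = [], 0
        current.append(event)
        current_size += size
        # Close the batch as soon as the event-count threshold is reached.
        if len(current) >= event_count:
            batches.append(current)
            current, current_size = [], 0
    if current:
        batches.append(current)  # leftover events form a final, smaller batch
    return batches


def to_payload(batch, key_name="documents"):
    """Wrap one batch under key_name, the key the Lambda function reads."""
    return {key_name: batch}


if __name__ == "__main__":
    sample = [{"originalTitle": f"Title {i}"} for i in range(10)]
    for batch in batch_events(sample, event_count=4):
        print(len(to_payload(batch)["documents"]))
```

With event_count set to 4, as in the pipeline above, ten events are split into batches of four, four, and two events.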

Lambda function

The Lambda function receives the data from OpenSearch Ingestion, invokes Amazon Bedrock to generate the embedding, and adds it to the source record. “documents” is used to reference the events coming in from OpenSearch Ingestion and matches the key_name declared in the pipeline. We add the embedding from Amazon Bedrock back to the original record. This new record with the appended embedding value is then sent to the OpenSearch Serverless sink by OpenSearch Ingestion. See the following code:

import json
import boto3
import os

# Initialize Bedrock client
bedrock = boto3.client('bedrock-runtime')

def generate_embedding(text):
    """Generate embedding for the given text using Bedrock."""
    response = bedrock.invoke_model(
        modelId="amazon.titan-embed-text-v1",
        contentType="application/json",
        accept="application/json",
        body=json.dumps({"inputText": text})
    )
    embedding = json.loads(response['body'].read())['embedding']
    return embedding

def lambda_handler(event, context):
    # Assuming the input is a list of JSON documents
    documents = event['documents']
    
    processed_documents = []
    
    for doc in documents:
        if 'originalTitle' in doc:
            # Generate embedding for the 'originalTitle' field
            embedding = generate_embedding(doc['originalTitle'])
            
            # Add the embedding to the document
            doc['originalTitle_embeddings'] = embedding
        
        processed_documents.append(doc)
    
    # Return the processed documents
    return processed_documents

If an exception occurs in the Lambda processor, all the documents in the batch are considered failed events and are forwarded to the next processor in the chain, if any, or to the sink, with a failure tag. You can configure the tag in the pipeline with the tags_on_failure parameter. The errors are also sent to Amazon CloudWatch Logs for further action.
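Because a single exception fails the whole batch, it can be useful to exercise the handler logic locally before deploying it. The following sketch repeats the same flow as the function above but takes the embedding call as an argument, so a stand-in can replace Amazon Bedrock. The names process_batch and fake_embed and the sample records are hypothetical. The sketch also assumes that response_events_match: true in the pipeline expects one returned record per input event, in the same order.

```python
def process_batch(event, embed, key_name="documents", field="originalTitle"):
    """Same flow as the handler above, with the embedding function injected."""
    processed = []
    for doc in event[key_name]:
        if field in doc:
            # Store the vector next to the source field, as in the handler.
            doc[f"{field}_embeddings"] = embed(doc[field])
        # Documents without the field are passed through unchanged.
        processed.append(doc)
    return processed


def fake_embed(text):
    """Hypothetical stand-in for Amazon Bedrock: a tiny deterministic vector."""
    return [float(len(text)), float(sum(map(ord, text)) % 97)]


if __name__ == "__main__":
    event = {
        "documents": [
            {"originalTitle": "Sample Title", "runtimeMinutes": "1"},
            {"runtimeMinutes": "5"},  # no title, so no embedding is added
        ]
    }
    result = process_batch(event, fake_embed)
    # One output record per input record.
    print(len(result) == len(event["documents"]))
```

In the deployed function, generate_embedding would be passed in place of fake_embed.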

After the pipeline runs, you can see that the embeddings were created and stored as originalTitle_embeddings within the document in a k-NN index, imdb-data-embeddings. The following screenshot shows an example.

Summary

In this post, we showed how you can use Lambda as part of your OpenSearch Ingestion pipeline to enable complex transformation and enrichment of your data. For more details on the feature, refer to Using an OpenSearch Ingestion pipeline with AWS Lambda.


About the Authors

Jagadish Kumar (Jag) is a Senior Specialist Solutions Architect at AWS focused on Amazon OpenSearch Service. He is deeply passionate about Data Architecture and helps customers build analytics solutions at scale on AWS.

Sam Selvan is a Principal Specialist Solution Architect with Amazon OpenSearch Service.

Srikanth Govindarajan is a Software Development Engineer at Amazon OpenSearch Service. Srikanth is passionate about architecting infrastructure and building scalable solutions for search, analytics, security, AI, and machine learning based use cases.

Automate topic provisioning and configuration using Terraform with Amazon MSK

Post Syndicated from Vijay Kardile original https://aws.amazon.com/blogs/big-data/automate-topic-provisioning-and-configuration-using-terraform-with-amazon-msk/

As organizations deploy Amazon Managed Streaming for Apache Kafka (Amazon MSK) clusters across multiple use cases, the manual management of topic configurations can be challenging. This can lead to several issues:

  • Inefficiency – Manual configuration is time-consuming and error-prone, especially for large deployments. Maintaining consistency across multiple configurations can be difficult. To avoid this, Kafka administrators often set the create.topics.enable property on brokers, which leads to cluster operation inefficiency.
  • Human error – Manual configuration increases the risk of mistakes that can disrupt data flow and impact applications relying on Amazon MSK.
  • Scalability challenges – Scaling an Amazon MSK environment with manual configuration is cumbersome. Adding new topics or modifying existing ones requires manual intervention, hindering agility.

These challenges highlight the need for a more automated and robust approach to MSK topic configuration management.

In this post, we address this problem by using Terraform to optimize the configuration of MSK topics. This solution supports both provisioned and serverless MSK clusters.

Solution overview

Customers want a better way to manage the overhead of topics and their configurations. Manually handling topic configurations can be cumbersome and error-prone, making it difficult to keep track of changes and updates.

To address these challenges, you can use Terraform, an infrastructure as code (IaC) tool by HashiCorp. Terraform allows you to manage and provision infrastructure declaratively. It uses human-readable configuration files written in HashiCorp Configuration Language (HCL) to define the desired state of infrastructure resources. These resources can span virtual machines, networks, databases, and a vast array of cloud provider-specific offerings.

Terraform offers a compelling solution to the challenges of manual Kafka topic configuration. Terraform allows you to define and manage your Kafka topics through code. This approach provides several key benefits:

  • Automation – Terraform automates the creation, modification, and deletion of MSK topics.
  • Consistency and repeatability – Terraform configurations provide consistent topic structures and settings across your entire Amazon MSK environment. This simplifies management and reduces the likelihood of configuration drift.
  • Scalability – Terraform enables you to provision and manage large numbers of MSK topics, facilitating the growth of your Amazon MSK environment.
  • Version control – Terraform configurations are stored in version control systems, allowing you to track changes, roll back if needed, and collaborate effectively on your Amazon MSK infrastructure.

By using Terraform for MSK topic configuration management, you can streamline your operations, minimize errors, and have a robust and scalable Amazon MSK environment.

In this post, we provide a comprehensive guide for using Terraform to manage Amazon MSK configurations. We explore the process of installing Terraform on Amazon Elastic Compute Cloud (Amazon EC2), defining and decentralizing topic configurations, and deploying and updating configurations in an automated manner.

Prerequisites

Before proceeding with the solution, make sure you have the following resources and access:

By making sure you have these prerequisites in place, you will be ready to streamline your topic configurations with Terraform.

Install Terraform on your client machine

When your cluster and client machine are ready, SSH to your client machine (Amazon EC2) and install Terraform.

  1. Run the following commands to install Terraform:
    sudo yum update -y
    sudo yum install -y yum-utils shadow-utils
    sudo yum-config-manager --add-repo https://rpm.releases.hashicorp.com/AmazonLinux/hashicorp.repo
    sudo yum -y install terraform

  2. Run the following command to check the installation:
    terraform -v
    

If the command prints the installed Terraform version, the installation was successful and you are ready to automate your MSK topic configuration.

Provision an MSK topic using Terraform

To provision the MSK topic, complete the following steps:

  1. Create a new file called main.tf and copy the following code into this file, replacing the BOOTSTRAP_SERVERS and AWS_REGION information with the details for your cluster. For instructions on retrieving the bootstrap_servers information for IAM authentication from your MSK cluster, see Getting the bootstrap brokers for an Amazon MSK cluster. This script is common for Amazon MSK provisioned and MSK Serverless.
    terraform {
      required_providers {
        kafka = {
          source = "Mongey/kafka"
        }
      }
    }

    provider "kafka" {
      bootstrap_servers = [{BOOTSTRAP_SERVERS}]
      tls_enabled       = true
      sasl_mechanism    = "aws-iam"
      sasl_aws_region   = {AWS_REGION}
      sasl_aws_profile  = "dev"
    }

    resource "kafka_topic" "sampleTopic" {
      name               = "sampleTopic"
      replication_factor = 1
      partitions         = 50
    }

  2. Add the IAM bootstrap server endpoints in a comma-separated list format:
    BOOTSTRAP_SERVERS = ["b-2.mskcluster…. ","b-3.mskcluster…. ","b-1.mskcluster…. "]

  3. Run the command terraform init to initialize Terraform and download the required providers.

The terraform init command initializes a working directory containing Terraform configuration files (main.tf). This is the first command that should be run after writing a new Terraform configuration.

  4. Run the command terraform plan to review the run plan.

This command shows the changes that Terraform will make to the infrastructure based on the provided configuration. This step is optional but is often used as a preview of the changes Terraform will make.

  5. If the plan looks correct, run the command terraform apply to apply the configuration.
  6. When prompted for confirmation before proceeding, enter yes.

The terraform apply command runs the actions proposed in a Terraform plan. Terraform will create the sampleTopic topic in your MSK cluster.

  7. After the terraform apply command is complete, verify the infrastructure has been created with the help of the kafka-topics.sh utility:
    kafka/bin/kafka-topics.sh \
    --bootstrap-server "b-1…..amazonaws.com:9098" \
    --command-config ./kafka/bin/client.properties \
    --list

You can use the kafka-topics.sh tool with the --list option to retrieve a list of topics associated with your MSK cluster. For more information, refer to the createtopic documentation.
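When you manage more than a handful of topics, writing each kafka_topic block by hand becomes repetitive. One possible approach, sketched below, is to keep the topic definitions in a list and render the resource blocks from it. The function name render_topics and the second topic in the example are hypothetical; the attributes are limited to the three used in main.tf above.

```python
import re

TOPIC_TEMPLATE = '''resource "kafka_topic" "{label}" {{
  name               = "{name}"
  replication_factor = {replication_factor}
  partitions         = {partitions}
}}
'''


def render_topics(topics):
    """Render one kafka_topic resource block per topic definition."""
    blocks = []
    for topic in topics:
        # Resource labels may contain letters, digits, underscores, and hyphens,
        # and must not start with a digit; topic names can also contain dots.
        label = re.sub(r"[^A-Za-z0-9_-]", "_", topic["name"])
        if label[0].isdigit():
            label = "_" + label
        blocks.append(
            TOPIC_TEMPLATE.format(
                label=label,
                name=topic["name"],
                replication_factor=topic["replication_factor"],
                partitions=topic["partitions"],
            )
        )
    return "\n".join(blocks)


if __name__ == "__main__":
    topics = [
        {"name": "sampleTopic", "replication_factor": 1, "partitions": 50},
        {"name": "orders.events", "replication_factor": 1, "partitions": 10},
    ]
    print(render_topics(topics))
```

The rendered text can be written to a separate .tf file next to main.tf, so that the terraform and provider blocks stay hand-maintained.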

Update the MSK topic configuration using Terraform

To update the MSK topic configuration, let’s assume we want to change the number of partitions from 50 to 10 on our topic. We need to perform the following steps:

  1. Verify the number of partitions on the topic using the --describe option:
    kafka/bin/kafka-topics.sh \
    --bootstrap-server "b-1…...amazonaws.com:9098" \
    --command-config ./kafka/bin/client.properties \
    --describe \
    --topic sampleTopic

This command will show 50 partitions on the sampleTopic topic.

  2. Modify the Terraform file main.tf and change the value of the partitions parameter to 10:
    resource "kafka_topic" "sampleTopic" {
      name               = "sampleTopic"
      replication_factor = 1
      partitions         = 10
    }

  3. Run the command terraform plan to review the run plan.
  4. If the plan shows the changes, run the command terraform apply to apply the configuration.
  5. When prompted for confirmation before proceeding, enter yes.

Kafka does not support reducing the number of partitions of an existing topic, so Terraform will drop and recreate the sampleTopic topic with the changed configuration. Any data in the existing topic is lost when it is dropped.

  6. To verify the changed number of partitions on the topic, rerun the --describe command:
    kafka/bin/kafka-topics.sh \
    --bootstrap-server "b-1…...amazonaws.com:9098" \
    --command-config ./kafka/bin/client.properties \
    --describe --topic sampleTopic

Now, this command will show 10 partitions on the sampleTopic topic.
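If you want to check the partition count in a script rather than by eye, you can parse the text printed by the --describe command. The sketch below assumes the summary line contains "Topic:" and "PartitionCount:" fields, which is how recent Kafka versions print it; the sample text is illustrative, not captured from a real cluster, and the function name is hypothetical.

```python
import re


def partition_count(describe_output, topic):
    """Return the PartitionCount reported for topic, or None if not found."""
    for line in describe_output.splitlines():
        # Only the per-topic summary line carries PartitionCount;
        # the per-partition lines that follow do not match.
        match = re.search(r"Topic:\s*(\S+).*?PartitionCount:\s*(\d+)", line)
        if match and match.group(1) == topic:
            return int(match.group(2))
    return None


if __name__ == "__main__":
    # Assumed output shape, for illustration only.
    sample = (
        "Topic: sampleTopic\tTopicId: abc123\tPartitionCount: 10\t"
        "ReplicationFactor: 1\tConfigs: \n"
        "\tTopic: sampleTopic\tPartition: 0\tLeader: 1\tReplicas: 1\tIsr: 1\n"
    )
    print(partition_count(sample, "sampleTopic"))
```

You could feed this function the captured standard output of the kafka-topics.sh command shown above.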

Delete the MSK topic using Terraform

When you no longer need the infrastructure, you can remove all resources created by your Terraform file.

  1. Run the command terraform destroy to remove the topic.
  2. When prompted for confirmation before proceeding, enter yes.

Terraform will delete the sampleTopic topic from your MSK cluster.

  3. To verify, rerun the --list command:
    kafka/bin/kafka-topics.sh \
    --bootstrap-server "b-1…..amazonaws.com:9098" \
    --command-config ./kafka/bin/client.properties \
    --list

Now, this command will not show the sampleTopic topic.

Conclusion

In this post, we addressed the common challenges associated with manual MSK topic configuration management and presented a robust Terraform-based solution. Using Terraform for automated topic provisioning and configuration streamlines your processes, fosters scalability, and enhances flexibility. Additionally, it facilitates automated deployments and centralized management.

We encourage you to explore Terraform as a means to optimize Amazon MSK configurations and unlock further efficiencies within your streaming data pipelines.


About the author

Vijay Kardile is a Sr. Technical Account Manager with Enterprise Support, India. With over two decades of experience in IT Consulting and Engineering, he specializes in Analytics services, particularly Amazon EMR and Amazon MSK. He has empowered numerous enterprise clients by facilitating their adoption of various AWS services and offering expert guidance on attaining operational excellence.

Batch data ingestion into Amazon OpenSearch Service using AWS Glue

Post Syndicated from Ravikiran Rao original https://aws.amazon.com/blogs/big-data/batch-data-ingestion-into-amazon-opensearch-service-using-aws-glue/

Organizations constantly work to process and analyze vast volumes of data to derive actionable insights. Effective data ingestion and search capabilities have become essential for use cases like log analytics, application search, and enterprise search. These use cases demand a robust pipeline that can handle high data volumes and enable efficient data exploration.

Apache Spark, an open source powerhouse for large-scale data processing, is widely recognized for its speed, scalability, and ease of use. Its ability to process and transform massive datasets has made it an indispensable tool in modern data engineering. Amazon OpenSearch Service—a community-driven search and analytics solution—empowers organizations to search, aggregate, visualize, and analyze data seamlessly. Together, Spark and OpenSearch Service offer a compelling solution for building powerful data pipelines. However, ingesting data from Spark into OpenSearch Service can present challenges, especially with diverse data sources.

This post showcases how to use Spark on AWS Glue to seamlessly ingest data into OpenSearch Service. We cover batch ingestion methods, share practical examples, and discuss best practices to help you build optimized and scalable data pipelines on AWS.

Overview of solution

AWS Glue is a serverless data integration service that simplifies data preparation and integration tasks for analytics, machine learning, and application development. In this post, we focus on batch data ingestion into OpenSearch Service using Spark on AWS Glue.

AWS Glue offers multiple integration options with OpenSearch Service using various open source and AWS managed libraries, including:

In the following sections, we explore each integration method in detail, guiding you through the setup and implementation. As we progress, we incrementally build the architecture diagram shown in the following figure, providing a clear path for creating robust data pipelines on AWS. Each implementation is independent of the others. We chose to showcase them separately, because in a real-world scenario, only one of the three integration methods is likely to be used.

Image showing the high level architecture diagram

You can find the code base in the accompanying GitHub repo. In the following sections, we walk through the steps to implement the solution.

Prerequisites

Before you deploy this solution, make sure the following prerequisites are in place:

Clone the repository to your local machine

Clone the repository to your local machine and set the BLOG_DIR environment variable. All the relative paths assume BLOG_DIR is set to the repository location in your machine. If BLOG_DIR is not being used, adjust the path accordingly.

git clone git@github.com:aws-samples/opensearch-glue-integration-patterns.git
cd opensearch-glue-integration-patterns
export BLOG_DIR=$(pwd)

Deploy the AWS CloudFormation template to create the necessary infrastructure

The main focus of this post is to demonstrate how to use the mentioned libraries in Spark on AWS Glue to ingest data into OpenSearch Service. Though we center on this core topic, several key AWS components will need to be pre-provisioned for the integration examples, such as an Amazon Virtual Private Cloud (Amazon VPC), multiple subnets, an AWS Key Management Service (AWS KMS) key, an Amazon Simple Storage Service (Amazon S3) bucket, an AWS Glue role, and an OpenSearch Service cluster with domains for OpenSearch Service and Elasticsearch. To simplify the setup, we’ve automated the provisioning of this core infrastructure using the cloudformation/opensearch-glue-infrastructure.yaml AWS CloudFormation template.

The CloudFormation template will deploy the necessary networking components (such as VPC and subnets), Amazon CloudWatch logging, AWS Glue role, and OpenSearch Service and Elasticsearch domains required to implement the proposed architecture. Use a strong password (8–128 characters, including at least three of the following character types: lowercase letters, uppercase letters, numbers, and special characters; it must not contain /, ", or spaces) and adhere to your organization’s security standards for ESMasterUserPassword and OSMasterUserPassword in the following command.

  1. Run the following commands:

cd ${BLOG_DIR}/cloudformation/
aws cloudformation deploy \
--template-file ${BLOG_DIR}/cloudformation/opensearch-glue-infrastructure.yaml \
--stack-name GlueOpenSearchStack \
--capabilities CAPABILITY_NAMED_IAM \
--region <AWS_REGION> \
--parameter-overrides \
ESMasterUserPassword=<ES_MASTER_USER_PASSWORD> \
OSMasterUserPassword=<OS_MASTER_USER_PASSWORD>

You should see a success message such as "Successfully created/updated stack – GlueOpenSearchStack" after the resources have been provisioned successfully. Provisioning this CloudFormation stack typically takes approximately 30 minutes to complete.

  2. On the AWS CloudFormation console, locate the GlueOpenSearchStack stack, and confirm that its status is CREATE_COMPLETE.

Image showing the "CREATE_COMPLETE" status of cloudformation template

You can review the deployed resources on the Resources tab, as shown in the following screenshot. The screenshot does not display all the created resources.

Image showing the "Resources" tab of cloudformation template

Additional setup steps

In this section, we collect essential information, including the S3 bucket name and the OpenSearch Service and Elasticsearch domain endpoints. These details are required for executing the code in subsequent sections.

Capture the details of the provisioned resources

Use the following AWS CLI command to extract and save the output values from the CloudFormation stack to a file named GlueOpenSearchStack_outputs.txt. We refer to the values in this file in upcoming steps.

aws cloudformation describe-stacks \
--stack-name GlueOpenSearchStack \
--query 'sort_by(Stacks[0].Outputs[], &OutputKey)[].{Key:OutputKey,Value:OutputValue}' \
--output table \
--no-cli-pager \
--region <AWS_REGION> > ${BLOG_DIR}/GlueOpenSearchStack_outputs.txt

Download NY Green Taxi December 2022 dataset and copy to S3 bucket

The purpose of this post is to demonstrate the technical implementation of ingesting data into OpenSearch Service using AWS Glue. Understanding the dataset itself is not essential, aside from its data format, which we discuss in AWS Glue notebooks in later sections. To learn more about the dataset, you can find additional information on the NYC Taxi and Limousine Commission website.

We specifically request that you download the December 2022 dataset, because we have tested the solution using this particular dataset:

S3_BUCKET_NAME=$(awk -F '|' '$2 ~ /S3Bucket/ {gsub(/^[ \t]+|[ \t]+$/, "", $3); print $3}' ${BLOG_DIR}/GlueOpenSearchStack_outputs.txt)
mkdir -p ${BLOG_DIR}/datasets && cd ${BLOG_DIR}/datasets
curl -O https://d37ci6vzurychx.cloudfront.net/trip-data/green_tripdata_2022-12.parquet
aws s3 cp green_tripdata_2022-12.parquet s3://${S3_BUCKET_NAME}/datasets/green_tripdata_2022-12.parquet
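The awk expression above reads the table that the AWS CLI wrote to GlueOpenSearchStack_outputs.txt: it splits each row on the | character, matches the key in the second field, and prints the trimmed third field. If you would rather do the same lookup from Python, a minimal sketch follows. The function name and the sample table are hypothetical. Unlike the awk pattern, which matches any key containing the text, this version matches the key exactly.

```python
def stack_output(table_text, key):
    """Look up one output value in AWS CLI --output table text."""
    for line in table_text.splitlines():
        cells = line.split("|")
        # Data rows look like "|  Key  |  Value  |", which splits into
        # ['', '  Key  ', '  Value  ', '']; border rows have no such cells.
        if len(cells) >= 4 and cells[1].strip() == key:
            return cells[2].strip()
    return None


if __name__ == "__main__":
    # Illustrative table with made-up values.
    sample = """\
---------------------------------------------
|              DescribeStacks               |
+---------------------+---------------------+
|         Key         |        Value        |
+---------------------+---------------------+
|  S3Bucket           |  example-bucket     |
+---------------------+---------------------+
"""
    print(stack_output(sample, "S3Bucket"))
```

To use it with the real file, read ${BLOG_DIR}/GlueOpenSearchStack_outputs.txt and pass its contents as table_text.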

Download the required JARs from the Maven repository and copy to S3 bucket

We’ve specified a particular JAR file version to ensure a stable deployment experience. However, we recommend adhering to your organization’s security best practices and reviewing any known vulnerabilities in the version of the JAR files before deployment. AWS does not guarantee the security of any open-source code used here. Additionally, please verify the downloaded JAR file’s checksum against the published value to confirm its integrity and authenticity.

mkdir -p ${BLOG_DIR}/jars && cd ${BLOG_DIR}/jars
# OpenSearch Service jar
curl -O https://repo1.maven.org/maven2/org/opensearch/client/opensearch-spark-30_2.12/1.0.1/opensearch-spark-30_2.12-1.0.1.jar
aws s3 cp opensearch-spark-30_2.12-1.0.1.jar s3://${S3_BUCKET_NAME}/jars/opensearch-spark-30_2.12-1.0.1.jar
# Elasticsearch jar
curl -O https://repo1.maven.org/maven2/org/elasticsearch/elasticsearch-spark-30_2.12/7.17.23/elasticsearch-spark-30_2.12-7.17.23.jar
aws s3 cp elasticsearch-spark-30_2.12-7.17.23.jar s3://${S3_BUCKET_NAME}/jars/elasticsearch-spark-30_2.12-7.17.23.jar
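The checksum verification recommended above can be done with a few lines of Python. This is a minimal sketch: the function names are hypothetical, and the expected value is something you must obtain yourself from the repository where the JAR is published (Maven Central typically serves a .sha1 file next to each artifact). No real checksum is included here.

```python
import hashlib


def file_digest(path, algorithm="sha1", chunk_size=1024 * 1024):
    """Compute the hex digest of a file, reading it in chunks."""
    digest = hashlib.new(algorithm)
    with open(path, "rb") as handle:
        for chunk in iter(lambda: handle.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


def checksum_matches(path, expected_hex, algorithm="sha1"):
    """Compare a file's digest with the published value, ignoring case and whitespace."""
    return file_digest(path, algorithm) == expected_hex.strip().lower()


if __name__ == "__main__":
    import sys

    # Usage: python verify_jar.py <path-to-jar> <published-sha1>
    if len(sys.argv) == 3:
        ok = checksum_matches(sys.argv[1], sys.argv[2])
        print("checksum OK" if ok else "checksum MISMATCH")
```

Run the check before copying each JAR to the S3 bucket, and stop if it reports a mismatch.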

In the following sections, we implement the individual data ingestion methods as outlined in the architecture diagram.

Ingest data into OpenSearch Service using the OpenSearch Spark library

In this section, we load an OpenSearch Service index using Spark and the OpenSearch Spark library. We demonstrate this implementation by using AWS Glue notebooks, employing basic authentication using user name and password.

To demonstrate the ingestion mechanisms, we have provided the Spark-and-OpenSearch-Code-Steps.ipynb notebook with detailed instructions. Follow the steps in this section in conjunction with the instructions in the notebook.

Set up the AWS Glue Studio notebook

Complete the following steps:

  1. On the AWS Glue console, choose ETL jobs in the navigation pane.
  2. Under Create job, choose Notebook.

Image showing AWS console page for AWS Glue to open notebook

  3. Upload the notebook file located at ${BLOG_DIR}/glue_jobs/Spark-and-OpenSearch-Code-Steps.ipynb.
  4. For IAM role, choose the AWS Glue job IAM role that begins with GlueOpenSearchStack-GlueRole-*.

Image showing AWS console page for AWS Glue to open notebook

  5. Enter a name for the notebook (for example, Spark-and-OpenSearch-Code-Steps) and choose Save.

Image showing AWS Glue OpenSearch Notebook

Replace the placeholder values in the notebook

Complete the following steps to update the placeholders in the notebook:

  1. In Step 1 in the notebook, replace the placeholder <GLUE-INTERACTIVE-SESSION-CONNECTION-NAME> with the AWS Glue interactive session connection name. You can get the name of the interactive session by executing the following command:
cd ${BLOG_DIR}
awk -F '|' '$2 ~ /GlueInteractiveSessionConnectionName/ {gsub(/^[ \t]+|[ \t]+$/, "", $3); print $3}' ${BLOG_DIR}/GlueOpenSearchStack_outputs.txt
  2. In Step 1 in the notebook, replace the placeholder <S3-BUCKET-NAME> and populate the variable s3_bucket with the bucket name. You can get the name of the S3 bucket by executing the following command:
awk -F '|' '$2 ~ /S3Bucket/ {gsub(/^[ \t]+|[ \t]+$/, "", $3); print $3}' ${BLOG_DIR}/GlueOpenSearchStack_outputs.txt
  3. In Step 4 in the notebook, replace <OPEN-SEARCH-DOMAIN-WITHOUT-HTTPS> with the OpenSearch Service domain name. You can get the domain name by executing the following command:
awk -F '|' '$2 ~ /OpenSearchDomainEndpoint/ {gsub(/^[ \t]+|[ \t]+$/, "", $3); print $3}' ${BLOG_DIR}/GlueOpenSearchStack_outputs.txt
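
If you prefer to look up these values from Python (for example, inside a notebook cell), the awk commands above can be mirrored roughly as follows. This is a sketch under assumptions: the function name stack_output_value is hypothetical, the outputs file is assumed to be a '|'-delimited table whose second field holds the output key and third field holds the value (which is what the awk commands imply), and unlike awk it returns only the first matching row.

```python
import re
from typing import Optional


def stack_output_value(table_text: str, key_pattern: str) -> Optional[str]:
    """Return the trimmed third '|'-separated field of the first row whose second field matches."""
    for line in table_text.splitlines():
        fields = line.split("|")
        # fields[0] is the text before the first '|', so fields[1] and fields[2]
        # correspond to awk's $2 and $3 when -F '|' is used.
        if len(fields) >= 3 and re.search(key_pattern, fields[1]):
            return fields[2].strip()
    return None


# Example usage (the file path comes from the post; reading it is up to you):
# text = open("GlueOpenSearchStack_outputs.txt").read()
# bucket = stack_output_value(text, "S3Bucket")
```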

Run the notebook

Run each cell of the notebook to load data into the OpenSearch Service domain and read it back to verify the successful load. Refer to the detailed instructions within the notebook for execution-specific guidance.
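
For orientation, the connection settings that such a notebook passes to the OpenSearch Spark library can be collected in a plain dictionary. The following is a hedged sketch, not the notebook's actual code: the helper name opensearch_write_options is hypothetical, and the option keys follow the opensearch-hadoop naming convention as we understand it, so check them against the notebook and the library documentation before relying on them.

```python
def opensearch_write_options(endpoint: str, user: str, password: str) -> dict:
    """Build Spark connector options for basic authentication over HTTPS.

    `endpoint` is the domain endpoint without the https:// scheme, matching the
    <OPEN-SEARCH-DOMAIN-WITHOUT-HTTPS> placeholder used in the notebook.
    """
    if endpoint.startswith(("http://", "https://")):
        raise ValueError("pass the endpoint without a scheme")
    return {
        "opensearch.nodes": endpoint,
        "opensearch.port": "443",
        "opensearch.net.ssl": "true",
        "opensearch.nodes.wan.only": "true",  # assumption: managed domain reached by endpoint only
        "opensearch.net.http.auth.user": user,
        "opensearch.net.http.auth.pass": password,
    }


# Example usage in a Spark session (assumes the connector JAR is on the classpath):
# df.write.format("opensearch").options(**opts).mode("append").save("green_taxi")
```

Avoid hardcoding the password in the notebook; reading it from a secrets store at run time is a safer design.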

Spark write modes (append vs. overwrite)

It is recommended to write data incrementally into OpenSearch Service indexes using the append mode, as demonstrated in Step 8 in the notebook. However, in certain cases, you may need to refresh the entire dataset in the OpenSearch Service index. In these scenarios, you can use the overwrite mode, though it is not advised for large indexes. When using overwrite mode, the Spark library deletes rows from the OpenSearch Service index one by one and then rewrites the data, which can be inefficient for large datasets. To avoid this, you can implement a preprocessing step in Spark to identify insertions and updates, and then write the data into OpenSearch Service using append mode.
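
The preprocessing idea described above (find only the inserted and updated rows, then append them) can be illustrated with plain Python. This is a simplified sketch of the logic only: in Spark you would typically express it as a join between the incoming data and the previously loaded data. The function names are hypothetical, and the sketch assumes that documents are keyed by an ID column so that a re-sent row replaces the existing document instead of duplicating it.

```python
import hashlib
import json
from typing import Dict, Iterable, List


def row_fingerprint(row: dict) -> str:
    """Hash a row's content in a key-order-independent way."""
    canonical = json.dumps(row, sort_keys=True, default=str)
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


def rows_to_append(incoming: Iterable[dict], indexed: Dict[str, str], key: str) -> List[dict]:
    """Return rows that are new or changed relative to `indexed`.

    `indexed` maps each already-loaded document ID to the fingerprint of the
    row that was last written for it.
    """
    changed = []
    for row in incoming:
        if indexed.get(str(row[key])) != row_fingerprint(row):
            changed.append(row)
    return changed
```

Only the rows returned by rows_to_append need to be written, which avoids the delete-and-rewrite cost of overwrite mode.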

Ingest data into Elasticsearch using the Elasticsearch Hadoop library

In this section, we load an Elasticsearch index using Spark and the Elasticsearch Hadoop Library. We demonstrate this implementation by using AWS Glue as the engine for Spark.

Set up the AWS Glue Studio notebook

Complete the following steps to set up the notebook:

  1. On the AWS Glue console, choose ETL jobs in the navigation pane.
  2. Under Create job, choose Notebook.

Image showing AWS console page for AWS Glue to open notebook

  3. Upload the notebook file located at ${BLOG_DIR}/glue_jobs/Spark-and-Elasticsearch-Code-Steps.ipynb.
  4. For IAM role, choose the AWS Glue job IAM role that begins with GlueOpenSearchStack-GlueRole-*.

Image showing AWS console page for AWS Glue to open notebook

  5. Enter a name for the notebook (for example, Spark-and-ElasticSearch-Code-Steps) and choose Save.

Image showing AWS Glue Elasticsearch Notebook

Replace the placeholder values in the notebook

Complete the following steps:

  1. In Step 1 in the notebook, replace the placeholder <GLUE-INTERACTIVE-SESSION-CONNECTION-NAME> with the AWS Glue interactive session connection name. You can get the name of the interactive session by executing the following command:
awk -F '|' '$2 ~ /GlueInteractiveSessionConnectionName/ {gsub(/^[ \t]+|[ \t]+$/, "", $3); print $3}' ${BLOG_DIR}/GlueOpenSearchStack_outputs.txt
  2. In Step 1 in the notebook, replace the placeholder <S3-BUCKET-NAME> and populate the variable s3_bucket with the bucket name. You can get the name of the S3 bucket by executing the following command:
awk -F '|' '$2 ~ /S3Bucket/ {gsub(/^[ \t]+|[ \t]+$/, "", $3); print $3}' ${BLOG_DIR}/GlueOpenSearchStack_outputs.txt
  3. In Step 4 in the notebook, replace <ELASTIC-SEARCH-DOMAIN-WITHOUT-HTTPS> with the Elasticsearch domain name. You can get the domain name by executing the following command:
awk -F '|' '$2 ~ /ElasticsearchDomainEndpoint/ {gsub(/^[ \t]+|[ \t]+$/, "", $3); print $3}' ${BLOG_DIR}/GlueOpenSearchStack_outputs.txt

Run the notebook

Run each cell in the notebook to load data to the Elasticsearch domain and read it back to verify the successful load. Refer to the detailed instructions within the notebook for execution-specific guidance.

Ingest data into OpenSearch Service using the AWS Glue OpenSearch Service connection

In this section, we load an OpenSearch Service index using Spark and the AWS Glue OpenSearch Service connection.

Create the AWS Glue job

Complete the following steps to create an AWS Glue Visual ETL job:

  1. On the AWS Glue console, choose ETL jobs in the navigation pane.
  2. Under Create job, choose Visual ETL.

This will open the AWS Glue job visual editor.

Image showing AWS console page for AWS Glue to open Visual ETL

  3. Choose the plus sign, and under Sources, choose Amazon S3.

Image showing AWS console page for AWS Glue Visual Editor

  4. In the visual editor, choose the Data Source – S3 bucket node.
  5. In the Data source properties – S3 pane, configure the data source as follows:
    • For S3 source type, select S3 location.
    • For S3 URL, choose Browse S3, and choose the green_tripdata_2022-12.parquet file from the designated S3 bucket.
    • For Data format, choose Parquet.
  6. Choose Infer schema to let AWS Glue detect the schema of the data.

This will set up your data source from the specified S3 bucket.

Image showing AWS console page for AWS Glue Visual Editor

  7. Choose the plus sign again to add a new node.
  8. For Transforms, choose Drop Fields to include this transformation step.

This will allow you to remove any unnecessary fields from your dataset before loading it into OpenSearch Service.

Image showing AWS console page for AWS Glue Visual Editor

  9. Choose the Drop Fields transform node, then select the following fields to drop from the dataset:
    • payment_type
    • trip_type
    • congestion_surcharge

This will remove these fields from the data before it is loaded into OpenSearch Service.

Image showing AWS console page for AWS Glue Visual Editor

  10. Choose the plus sign again to add a new node.
  11. For Targets, choose Amazon OpenSearch Service.

This will configure OpenSearch Service as the destination for the data being processed.

Image showing AWS console page for AWS Glue Visual Editor

  12. Choose the Data target – Amazon OpenSearch Service node and configure it as follows:
    • For Amazon OpenSearch Service connection, choose the connection GlueOpenSearchServiceConnec-* from the drop down.
    • For Index, enter green_taxi. The green_taxi index was created earlier in the “Ingest data into OpenSearch Service using the OpenSearch Spark library” section.

This configures the target node to write the processed data to the specified OpenSearch Service index.

Image showing AWS console page for AWS Glue Visual Editor

  13. On the Job details tab, update the job details as follows:
    • For Name, enter a name (for example, Spark-and-Glue-OpenSearch-Connector).
    • For Description, enter an optional description (for example, AWS Glue job using Glue OpenSearch Connection to load data into Amazon OpenSearch Service).
    • For IAM role, choose the role starting with GlueOpenSearchStack-GlueRole-*.
    • For the Glue version, choose Glue 4.0 – Supports spark 3.3, Scala 2, Python 3
    • Leave the rest of the fields as default.
    • Choose Save to save the changes.

Image showing AWS console page for AWS Glue Visual Editor

  14. To run the AWS Glue job Spark-and-Glue-OpenSearch-Connector, choose Run.

This will initiate the job execution.

Image showing AWS console page for AWS Glue Visual Editor

  15. Choose the Runs tab and wait for the AWS Glue job to complete successfully.

You will see the status change to Succeeded when the job is complete.

Image showing AWS console page for AWS Glue job run status

Clean up

To clean up your resources, complete the following steps:

  1. Delete the CloudFormation stack:
aws cloudformation delete-stack \
--stack-name GlueOpenSearchStack \
--region <AWS_REGION>
  2. Delete the AWS Glue jobs:
    • On the AWS Glue console, under ETL jobs in the navigation pane, choose Visual ETL.
    • Select the jobs you created (Spark-and-Glue-OpenSearch-Connector, Spark-and-ElasticSearch-Code-Steps, and Spark-and-OpenSearch-Code-Steps) and on the Actions menu, choose Delete.

Conclusion

In this post, we explored several ways to ingest data into OpenSearch Service using Spark on AWS Glue. We demonstrated the use of three key libraries: the AWS Glue OpenSearch Service connection, the OpenSearch Spark Library, and the Elasticsearch Hadoop Library. The methods outlined in this post can help you streamline your data ingestion into OpenSearch Service.

If you’re interested in learning more and getting hands-on experience, we’ve created a workshop that walks you through the entire process in detail. You can explore the full setup for ingesting data into OpenSearch Service, handling both batch and real-time streams, and building dashboards. Check out the workshop Unified Real-Time Data Processing and Analytics Using Amazon OpenSearch and Apache Spark to deepen your understanding and apply these techniques step by step.


About the Authors

Ravikiran Rao is a Data Architect at Amazon Web Services and is passionate about solving complex data challenges for various customers. Outside of work, he is a theater enthusiast and amateur tennis player.

Vishwa Gupta is a Senior Data Architect with the AWS Professional Services Analytics Practice. He helps customers implement big data and analytics solutions. Outside of work, he enjoys spending time with family, traveling, and trying new food.

Suvojit Dasgupta is a Principal Data Architect at Amazon Web Services. He leads a team of skilled engineers in designing and building scalable data solutions for AWS customers. He specializes in developing and implementing innovative data architectures to address complex business challenges.

This post showcases how to use Spark on AWS Glue to seamlessly ingest data into OpenSearch Service. We cover batch ingestion methods, share practical examples, and discuss best practices to help you build optimized and scalable data pipelines on AWS.

Build a high-performance quant research platform with Apache Iceberg

Post Syndicated from Guy Bachar original https://aws.amazon.com/blogs/big-data/build-a-high-performance-quant-research-platform-with-apache-iceberg/

In our previous post Backtesting index rebalancing arbitrage with Amazon EMR and Apache Iceberg, we showed how to use Apache Iceberg in the context of strategy backtesting. In this post, we focus on data management implementation options such as accessing data directly in Amazon Simple Storage Service (Amazon S3), using popular data formats like Parquet, or using open table formats like Iceberg. Our experiments are based on real-world historical full order book data, provided by our partner CryptoStruct, and compare the trade-offs between these choices, focusing on performance, cost, and quant developer productivity.

Data management is the foundation of quantitative research. Quant researchers spend approximately 80% of their time on necessary but not impactful data management tasks such as data ingestion, validation, correction, and reformatting. Traditional data management choices include relational, SQL, NoSQL, and specialized time series databases. In recent years, advances in parallel computing in the cloud have made object stores like Amazon S3 and columnar file formats like Parquet a preferred choice.

This post explores how Iceberg can enhance quant research platforms by improving query performance, reducing costs, and increasing productivity, ultimately enabling faster and more efficient strategy development in quantitative finance. Our analysis shows that Iceberg can accelerate query performance by up to 52%, reduce operational costs, and significantly improve data management at scale.

Having chosen Amazon S3 as our storage layer, a key decision is whether to access Parquet files directly or use an open table format like Iceberg. Iceberg offers distinct advantages through its metadata layer over Parquet, such as improved data management, performance optimization, and integration with various query engines.

In this post, we use the term vanilla Parquet to refer to Parquet files stored directly in Amazon S3 and accessed through standard query engines like Apache Spark, without the additional features provided by table formats such as Iceberg.

Quant developer and researcher productivity

In this section, we focus on the productivity features offered by Iceberg and how it compares to directly reading files in Amazon S3. As mentioned earlier, approximately 80% of quantitative research work is attributed to data management tasks. Business impact heavily relies on quality data (“garbage in, garbage out”). Quants and platform teams have to ingest data from multiple sources with different velocities and update frequencies, and then validate and correct the data. These activities translate into the ability to run append, insert, update, and delete operations. For simple append operations, both Parquet on Amazon S3 and Iceberg offer similar convenience and productivity. However, real-world data is never perfect and needs to be corrected. Gap filling (inserts), error corrections and restatements (updates), and duplicate removal (deletes) are the most obvious examples. When writing data in the Parquet format directly to Amazon S3 without using an open table format like Iceberg, you have to write code to identify the affected partition, correct errors, and rewrite the partition. Moreover, if the write job fails or a downstream job reads the data during this write operation, that downstream job can read inconsistent data. In contrast, Iceberg has built-in insert, update, and delete features with ACID (atomicity, consistency, isolation, durability) properties, and the framework itself manages the Amazon S3 mechanics on your behalf.

Guarding against lookahead bias is an essential capability of any quant research platform—what backtests as a profitable trading strategy can render itself useless and unprofitable in real time. Iceberg provides time travel and snapshotting capabilities out of the box to manage lookahead bias that could be embedded in the data (such as delayed data delivery).

Simplified data corrections and updates

Iceberg enhances data management for quants in capital markets through its robust insert, delete, and update capabilities. These features allow efficient data corrections, gap-filling in time series, and historical data updates without disrupting ongoing analyses or compromising data integrity.

Unlike direct Amazon S3 access, Iceberg supports these operations on petabyte-scale data lakes without requiring complex custom code. This simplifies data modification processes, which is crucial for ingesting and updating large volumes of market and trade data, quickly iterating on backtesting and reprocessing workflows, and maintaining detailed audit trails for risk and compliance requirements.

Iceberg’s table format separates data files from metadata files, enabling efficient data modifications without full dataset rewrites. This approach also reduces expensive ListObjects API calls typically needed when directly accessing Parquet files in Amazon S3.

Additionally, Iceberg offers merge on read (MoR) and copy on write (CoW) approaches, providing flexibility for different quant research needs. MoR enables faster writes, suitable for frequently updated datasets, and CoW provides faster reads, beneficial for read-heavy workflows like backtesting.
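
As a rough illustration of how the choice between the two approaches can be expressed, Iceberg exposes table properties that set the mode per operation type. The following sketch only builds the SQL statement; the helper name write_mode_ddl and the table name in the usage comment are hypothetical, and you should confirm the property names against the Iceberg version you run.

```python
# Iceberg table properties that control how row-level changes are applied.
WRITE_MODE_PROPERTIES = ("write.delete.mode", "write.update.mode", "write.merge.mode")
VALID_MODES = {"copy-on-write", "merge-on-read"}


def write_mode_ddl(table: str, mode: str) -> str:
    """Build an ALTER TABLE statement that applies one mode to deletes, updates, and merges."""
    if mode not in VALID_MODES:
        raise ValueError(f"mode must be one of {sorted(VALID_MODES)}")
    props = ", ".join(f"'{name}'='{mode}'" for name in WRITE_MODE_PROPERTIES)
    return f"ALTER TABLE {table} SET TBLPROPERTIES ({props})"


# Example usage in a Spark session (table name is a placeholder):
# spark.sql(write_mode_ddl("glue_catalog.quant.order_book", "merge-on-read"))
```

A frequently updated ingestion table might use merge-on-read, while a table that mostly serves backtests might stay on copy-on-write.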

For example, when a new data source or attribute is added, quant researchers can seamlessly incorporate it into their Iceberg tables and then reprocess historical data, confident they’re using correct, time-appropriate information. This capability is particularly valuable in maintaining the integrity of backtests and the reliability of trading strategies.

In scenarios involving large-scale data corrections or updates, such as adjusting for stock splits or dividend payments across historical data, Iceberg’s efficient update mechanisms significantly reduce processing time and resource usage compared to traditional methods.

These features collectively improve productivity and data management efficiency in quant research environments, allowing researchers to focus more on strategy development and less on data handling complexities.

Historical data access for backtesting and validation

Iceberg’s time travel feature can enable quant developers and researchers to access and analyze historical snapshots of their data. This capability can be useful while performing tasks like backtesting, model validation, and understanding data lineage.

Iceberg simplifies time travel workflows on Amazon S3 by introducing a metadata layer that tracks the history of changes made to the table. You can refer to this metadata layer to create a mental model of how Iceberg’s time travel capability works.

Iceberg’s time travel capability is driven by a concept called snapshots, which are recorded in metadata files. These metadata files act as a central repository that stores table metadata, including the history of snapshots. Additionally, Iceberg uses manifest files to provide a representation of data files, their partitions, and any associated deleted files. These manifest files are referenced in the metadata snapshots, allowing Iceberg to identify the relevant data for a specific point in time.

When a user requests a time travel query, the typical workflow involves querying a specific snapshot. Iceberg uses the snapshot identifier to locate the corresponding metadata snapshot in the metadata files. The time travel capability is invaluable to quants, enabling them to backtest and validate strategies against historical data, reproduce and debug issues, perform what-if analysis, comply with regulations by maintaining audit trails and reproducing past states, and roll back and recover from data corruption or errors. Quants can also gain deeper insights into current market trends and correlate them with historical patterns. Also, the time travel feature can further mitigate any risks of lookahead bias. Researchers can access the exact data snapshots that were present in the past, and then run their models and strategies against this historical data, without the risk of inadvertently incorporating future information.
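
To make the point-in-time read concrete, the following sketch prepares the read option that Iceberg's Spark integration accepts for time travel by timestamp (as-of-timestamp, expressed in milliseconds since the Unix epoch). The helper names are hypothetical, and the snippet stops short of opening a Spark session, so treat it as an outline, not a complete job.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def as_of_millis(moment: datetime) -> int:
    """Convert a timezone-aware datetime to whole milliseconds since the Unix epoch."""
    if moment.tzinfo is None:
        raise ValueError("use a timezone-aware datetime to avoid ambiguous snapshots")
    return (moment - EPOCH) // timedelta(milliseconds=1)


def time_travel_read_options(moment: datetime) -> dict:
    """Build the option dictionary for reading the table state as of `moment`."""
    return {"as-of-timestamp": str(as_of_millis(moment))}


# Example usage in a Spark session (table_name as used elsewhere in this post):
# opts = time_travel_read_options(datetime(2023, 4, 17, tzinfo=timezone.utc))
# df = spark.read.format("iceberg").options(**opts).load(table_name)
```

Requiring timezone-aware datetimes is a deliberate choice here: a backtest that silently uses local time could pick a later snapshot and reintroduce lookahead bias.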

Seamless integration with familiar tools

Iceberg provides a variety of interfaces that enable seamless integration with the open source tools and AWS services that quant developers and researchers are familiar with.

Iceberg provides a comprehensive SQL interface that allows quant teams to interact with their data using familiar SQL syntax. This SQL interface is compatible with popular query engines and data processing frameworks, such as Spark, Trino, Amazon Athena, and Hive. Quant developers and researchers can use their existing SQL knowledge and tools to query, filter, aggregate, and analyze their data stored in Iceberg tables.

In addition to the primary interface of SQL, Iceberg also provides the DataFrame API, which allows quant teams to programmatically interact with their data with popular distributed data processing frameworks like Spark and Flink as well as thin clients like PyIceberg. Quants can further use this API to build more programmatic approaches to access and manipulate data, allowing for the implementation of custom logic and integration of Iceberg with other AWS ecosystems like Amazon EMR.

Although accessing data directly from Amazon S3 is a viable option, Iceberg provides several advantages, such as metadata management, performance optimization through partition pruning, data manipulation, and rich integration with AWS services like Athena and Amazon EMR, resulting in a more seamless and feature-rich data processing experience.

Undifferentiated heavy lifting

Data partitioning is one of the major contributing factors to optimizing aggregate throughput to and from Amazon S3, and it contributes to the overall price-performance of a High Performance Computing (HPC) environment.

Quant researchers often face performance bottlenecks and complex data management challenges when dealing with large-scale datasets in Amazon S3. As discussed in Best practices design patterns: optimizing Amazon S3 performance, single prefix performance is limited to 3,500 PUT/COPY/POST/DELETE or 5,500 GET/HEAD requests per second per partitioned Amazon S3 prefix. Iceberg’s metadata layer and intelligent partitioning strategies automatically optimize data access patterns, reducing the likelihood of I/O throttling and minimizing the need for manual performance tuning. This automation allows quant teams to focus on developing and refining trading strategies rather than troubleshooting data access issues or optimizing storage layouts.
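
A back-of-the-envelope calculation shows why prefix layout matters. The sketch below uses only the per-prefix request rates quoted above; the function name and the example workloads are hypothetical, and real behavior also depends on how Amazon S3 scales prefixes over time, so read the result as a lower bound, not a sizing guarantee.

```python
import math

# Per-prefix request rates quoted in the Amazon S3 performance guidance.
GET_RPS_PER_PREFIX = 5500
PUT_RPS_PER_PREFIX = 3500


def min_prefixes(target_rps: int, per_prefix_rps: int) -> int:
    """Smallest number of prefixes needed if requests are spread perfectly evenly."""
    if target_rps <= 0 or per_prefix_rps <= 0:
        raise ValueError("request rates must be positive")
    return math.ceil(target_rps / per_prefix_rps)


# Hypothetical workloads: 20,000 reads per second and 10,000 writes per second.
read_prefixes = min_prefixes(20_000, GET_RPS_PER_PREFIX)
write_prefixes = min_prefixes(10_000, PUT_RPS_PER_PREFIX)
print(f"read prefixes: {read_prefixes}, write prefixes: {write_prefixes}")
```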

In this section, we discuss situations we discovered while running our experiments at scale and solutions provided by Iceberg vs. vanilla Parquet when accessing data in Amazon S3.

As we mentioned in the introduction, the nature of quant research is “fail fast”—new ideas have to be quickly evaluated and then either prioritized for a deep dive or dismissed. This makes it impossible to come up with universal partitioning that works all the time and for all research styles.

When accessing data directly as Parquet files in Amazon S3, without using an open table format like Iceberg, partitioning and throttling issues can arise. Partitioning in this case is determined by the physical layout of files in Amazon S3, and a mismatch between the intended partitioning and the actual file layout can lead to I/O throttling exceptions. Additionally, listing directories in Amazon S3 can also result in throttling exceptions due to the high number of API calls required.

In contrast, Iceberg provides a metadata layer that abstracts away the physical file layout in Amazon S3. Partitioning is defined at the table level, and Iceberg handles the mapping between logical partitions and the underlying file structure. This abstraction helps mitigate partitioning issues and reduces the likelihood of I/O throttling exceptions. Furthermore, Iceberg’s metadata caching mechanism minimizes the number of List API calls required, addressing the directory listing throttling issue.

Although both approaches involve direct access to Amazon S3, Iceberg is an open table format that introduces a metadata layer, providing better partitioning management and reducing the risk of throttling exceptions. It doesn’t act as a database or a processing engine itself, but rather as a table format and metadata layer on top of the underlying storage (in this case, Amazon S3).

One of the most effective techniques to address Amazon S3 API quota limits is salting (random hash prefixes)—a method that adds random partition IDs to Amazon S3 paths. This increases the probability of prefixes residing on different physical partitions, helping distribute API requests more evenly. Iceberg supports this functionality out of the box for both data ingestion and reading.

Implementing salting directly in Amazon S3 requires complex custom code to create and use partitioning schemes with random keys in the naming hierarchy. This approach necessitates a custom data catalog and metadata system to map physical paths to logical paths, allowing direct partition access without relying on Amazon S3 List API calls. Without such a system, applications risk exceeding Amazon S3 API quotas when accessing specific partitions.
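
To show what a hand-rolled version involves, here is a minimal sketch of key salting in Python. It is an illustration of the general technique, not Iceberg's implementation: the function name and bucket count are hypothetical, and it derives the prefix from a hash of the logical key instead of a random value, so the physical path can be recomputed without a separate lookup table.

```python
import hashlib


def salted_key(logical_key: str, buckets: int = 16) -> str:
    """Prepend a deterministic hash-derived prefix to spread keys across S3 prefixes."""
    if buckets <= 0:
        raise ValueError("buckets must be positive")
    digest = hashlib.sha256(logical_key.encode("utf-8")).digest()
    bucket = int.from_bytes(digest[:4], "big") % buckets
    return f"{bucket:04x}/{logical_key}"


# Example usage with a hypothetical partition path:
# salted_key("exchange_code=XYZ/instrument=ABC/part-0000.parquet")
```

With truly random prefixes, as described above, you would additionally need a catalog that maps each logical path to its physical path, which is the bookkeeping Iceberg's metadata layer handles for you.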

At petabyte scale, Iceberg’s advantages become clear. It efficiently manages data through the following features:

  • Directory caching
  • Configurable partitioning strategies (range, bucket)
  • Data management functionality (compaction)
  • Catalog, metadata, and statistics use for optimal execution plans

These built-in features eliminate the need for custom solutions to manage Amazon S3 API quotas and data organization at scale, reducing development time and maintenance costs while improving query performance and reliability.

Performance

We highlighted a lot of the functionality of Iceberg that eliminates undifferentiated heavy lifting and improves developer and quant productivity. What about performance?

This section evaluates whether Iceberg’s metadata layer introduces overhead or delivers optimization for quantitative research use cases, comparing it with vanilla Parquet access on Amazon S3. We examine how these approaches impact common quant research queries and workflows.

We then discuss overlapping optimization techniques, such as data distribution and sorting, and explain why no single partitioning and sorting scheme fits every quant research workload. Our benchmarks show that Iceberg performs comparably to direct Amazon S3 access, with additional optimizations from its metadata and statistics usage, similar to database indexing.

Vanilla Parquet vs Iceberg: Amazon S3 read performance

We created four different datasets: two using Iceberg and two with direct Amazon S3 Parquet access, each with both sorted and unsorted write distributions. The purpose of this exercise was to compare the performance of direct Amazon S3 Parquet access vs. the Iceberg open table format, taking into account the impact of write distribution patterns when running various queries commonly used in quantitative trading research.

Query 1

We first run a simple count query to get the total number of records in the table. This query helps understand the baseline performance for a straightforward operation. For example, if the table contains tick-level market data for various financial instruments, the count can give an idea of the total number of data points available for analysis.

The following is the code for vanilla Parquet:

count = spark.read.parquet("s3://example-s3-bucket/path/to/data").count()

The following is the code for Iceberg:

from pyspark.sql import functions as f

count = spark.read.table(table_name).count()
# We used a typical count query for the performance comparison. The same number can also be
# read from the Iceberg files metadata table, which completes in a few seconds:
spark.read.format("iceberg").load(f"{table_name}.files").select(f.sum("record_count")).show(truncate=False)

Query 2

Our second query is a grouping and counting query to find the number of records for each combination of exchange_code and instrument. This query is commonly used in quantitative trading research to analyze market liquidity and trading activity across different instruments and exchanges.

The following is the code for vanilla Parquet:

spark.read.parquet("s3://example-s3-bucket/path/to/data") \
         .groupBy("exchange_code", "instrument") \
         .count() \
         .orderBy("count", ascending=False) \
         .show(truncate=False)

The following is the code for Iceberg:

spark.read.table(table_name) \
        .groupBy("exchange_code", "instrument") \
        .count() \
        .orderBy("count", ascending=False) \
        .show(truncate=False)

Query 3

Next, we run a distinct query to retrieve the distinct combinations of year, month, and day from the adapterTimestamp_ts_utc column. In quantitative trading research, this query can be helpful for understanding the time range covered by the dataset. Researchers can use this information to identify periods of interest for their analysis, such as specific market events, economic cycles, or seasonal patterns.

The following is the code for vanilla Parquet:

from pyspark.sql import functions as f

spark.read.parquet("s3://example-s3-bucket/path/to/data") \
         .select(f.year("adapterTimestamp_ts_utc").alias("year"),
                 f.month("adapterTimestamp_ts_utc").alias("month"),
                 f.dayofmonth("adapterTimestamp_ts_utc").alias("day")) \
         .distinct() \
         .show(truncate=False)

The following is the code for Iceberg:

from pyspark.sql import functions as f

spark.read.table(table_name) \
        .select(f.year("adapterTimestamp_ts_utc").alias("year"),
                f.month("adapterTimestamp_ts_utc").alias("month"),
                f.dayofmonth("adapterTimestamp_ts_utc").alias("day")) \
        .distinct() \
        .show(truncate=False)

Query 4

Lastly, we run a grouping and counting query with a date range filter on the adapterTimestamp_ts_utc column. This query is similar to Query 2 but focuses on a specific time period. You could use this query to analyze market activity or liquidity during specific time periods, such as periods of high volatility, market crashes, or economic events. Researchers can use this information to identify potential trading opportunities or investigate the impact of these events on market dynamics.

The following is the code for vanilla Parquet:

spark.read.parquet("s3://example-s3-bucket/path/to/data") \
         .filter((f.col("adapterTimestamp_ts_utc") >= "2023-04-17 00:00:00") &
                 (f.col("adapterTimestamp_ts_utc") <= "2023-04-18 23:59:59.999")) \
         .groupBy("exchange_code", "instrument") \
         .count() \
         .orderBy("count", ascending=False) \
         .show(truncate=False)

The following is the code for Iceberg. Because Iceberg has a metadata layer, the row count can be fetched from metadata:

spark.read.table(table_name) \
        .filter((f.col("adapterTimestamp_ts_utc") >= "2023-04-17 00:00:00") &
                (f.col("adapterTimestamp_ts_utc") <= "2023-04-18 23:59:59.999")) \
        .groupBy("exchange_code", "instrument") \
        .count() \
        .orderBy("count", ascending=False) \
        .show(truncate=False)

Test results

To evaluate the performance and cost benefits of using Iceberg for our quant research data lake, we created four different datasets: two with Iceberg tables and two with direct Amazon S3 Parquet access, each using both sorted and unsorted write distributions. We first ran AWS Glue write jobs to create the Iceberg tables and then mirrored the same write processes for the Amazon S3 Parquet datasets. For the unsorted datasets, we partitioned the data by exchange and instrument, and for the sorted datasets, we added a sort key on the time column.
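
For readers who want to reproduce a similar layout, the partitioning and sort key described above can be declared on an Iceberg table with SQL. The sketch below only assembles the statements and is not the code used for the benchmark: the helper names, the table name, and the column types are hypothetical, and WRITE ORDERED BY assumes the Iceberg SQL extensions are enabled in the Spark session.

```python
from typing import Sequence


def create_partitioned_table_sql(table: str, columns_ddl: str, partition_columns: Sequence[str]) -> str:
    """Build a CREATE TABLE statement for an Iceberg table with identity partitions."""
    partitions = ", ".join(partition_columns)
    return f"CREATE TABLE {table} ({columns_ddl}) USING iceberg PARTITIONED BY ({partitions})"


def write_order_sql(table: str, sort_columns: Sequence[str]) -> str:
    """Build the statement that sets the table's write sort order."""
    return f"ALTER TABLE {table} WRITE ORDERED BY {', '.join(sort_columns)}"


# Hypothetical table and column types; only the column names come from this post.
TABLE = "glue_catalog.quant.order_book"
COLUMNS = "exchange_code string, instrument string, adapterTimestamp_ts_utc timestamp"

unsorted_ddl = create_partitioned_table_sql(TABLE, COLUMNS, ["exchange_code", "instrument"])
sorted_ddl = write_order_sql(TABLE, ["adapterTimestamp_ts_utc"])
print(unsorted_ddl)
print(sorted_ddl)
```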

Next, we ran a series of queries commonly used in quantitative trading research, including simple count queries, grouping and counting, distinct value queries, and queries with date range filters. Our benchmarking process involved reading data from Amazon S3, performing various transformations and joins, and writing the processed data back to Amazon S3 as Parquet files.

By comparing runtimes and costs across different data formats and write distributions, we quantified the benefits of Iceberg’s optimized data organization, metadata management, and efficient Amazon S3 data handling. The results showed that Iceberg not only enhanced query performance without introducing significant overhead, but also reduced the likelihood of task failures, reruns, and throttling issues, leading to more stable and predictable job execution, particularly with large datasets stored in Amazon S3.

AWS Glue write jobs

In the following table, we compare the performance and the cost implications of using Iceberg vs. vanilla Parquet access on Amazon S3, taking into account the following use cases:

  • Iceberg table (unsorted) – We created an Iceberg table partitioned by exchange_code and instrument. This means that the data was physically partitioned in Amazon S3 based on the unique combinations of exchange_code and instrument values. Partitioning the data in this way can improve query performance, because Iceberg can prune out partitions that aren’t relevant to a particular query, reducing the amount of data that needs to be scanned. The data was not sorted on any column in this case, which is the default behavior.
  • Vanilla Parquet (unsorted) – For this use case, we wrote the data directly as Parquet files to Amazon S3, without using Iceberg. We repartitioned the data by the exchange_code and instrument columns using standard hash partitioning before writing it out. Repartitioning was necessary to avoid potential throttling issues when reading the data later, because accessing data directly from Amazon S3 without intelligent partitioning can lead to too many requests hitting the same S3 prefix. Like the Iceberg table, the data was not sorted on any column in this case. To make the comparison fair, we used the exact repartition count that Iceberg uses.
  • Iceberg table (sorted) – We created another Iceberg table, this time partitioned by exchange_code and instrument. Additionally, we sorted the data in this table on the adapterTimestamp_ts_utc column. Sorting the data can improve query performance for certain types of queries, such as those that involve range filters or ordered outputs. Iceberg handles the sorting and partitioning of the data transparently to the user.
  • Vanilla Parquet (sorted) – For this use case, we again wrote the data directly as Parquet files to Amazon S3, without using Iceberg. We repartitioned the data by range on the exchange_code, instrument, and adapterTimestamp_ts_utc columns before writing it out, using standard range partitioning with a partition count of 1,996, because this is what Iceberg used according to the Spark UI. Repartitioning on the time column (adapterTimestamp_ts_utc) was necessary to achieve a sorted write distribution, because Parquet files are sorted within each partition. This sorted write distribution can improve query performance for certain types of queries, similar to the sorted Iceberg table.

| Write Distribution Pattern | Iceberg Table (Unsorted) | Vanilla Parquet (Unsorted) | Iceberg Table (Sorted) | Vanilla Parquet (Sorted) |
| --- | --- | --- | --- | --- |
| DPU Hours | 899.46639 | 915.70222 | 1402 | 1365 |
| Number of S3 Objects | 7444 | 7288 | 9283 | 9283 |
| Size of S3 Parquet Objects | 567.7 GB | 629.8 GB | 525.6 GB | 627.1 GB |
| Runtime | 1h 51m 40s | 1h 53m 29s | 2h 52m 7s | 2h 47m 36s |

AWS Glue read jobs

For the AWS Glue read jobs, we ran a series of queries commonly used in quantitative trading research, such as simple counts, grouping and counting, distinct value queries, and queries with date range filters. We compared the performance of these queries between the Iceberg tables and the vanilla Parquet files read in Amazon S3. In the following table, you can see two AWS Glue jobs that show the performance and cost implications of access patterns described earlier.

| Read Queries / Runtime in Seconds | Iceberg Table | Vanilla Parquet |
| --- | --- | --- |
| COUNT(1) on unsorted | 35.76s | 74.62s |
| GROUP BY and ORDER BY on unsorted | 34.29s | 67.99s |
| DISTINCT and SELECT on unsorted | 51.40s | 82.95s |
| FILTER and GROUP BY and ORDER BY on unsorted | 25.84s | 49.05s |
| COUNT(1) on sorted | 15.29s | 24.25s |
| GROUP BY and ORDER BY on sorted | 15.88s | 28.73s |
| DISTINCT and SELECT on sorted | 30.85s | 42.06s |
| FILTER and GROUP BY and ORDER BY on sorted | 15.51s | 31.51s |
| AWS Glue DPU hours | 45.98 | 67.97 |

Test results insights

These test results offered the following insights:

  • Accelerated query performance – Iceberg improved read operations by up to 52% for unsorted data and 51% for sorted data. This speed boost enables quant researchers to analyze larger datasets and test trading strategies more rapidly. In quantitative finance, where speed is crucial, this performance gain allows teams to uncover market insights faster, potentially gaining a competitive edge.
  • Reduced operational costs – For read-intensive workloads, Iceberg reduced DPU hours by 32.4% and achieved a 10–16% reduction in Amazon S3 storage. These efficiency gains translate to cost savings in data-intensive quant operations. With Iceberg, firms can run more comprehensive analyses within the same budget or reallocate resources to other high-value activities, optimizing their research capabilities.
  • Enhanced data management and scalability – Iceberg showed comparable write performance for unsorted data (899.47 DPU hours vs. 915.70 for vanilla Parquet) and produced object counts close to or matching those of vanilla Parquet in both the unsorted and sorted scenarios (7,444 and 9,283 objects, respectively). This consistency leads to more reliable and predictable job execution. For quant teams dealing with large-scale datasets, this reduces time spent on troubleshooting data infrastructure issues and increases focus on developing trading strategies.
  • Improved productivity – Iceberg outperformed vanilla Parquet access across various query types. Simple counts were 52.1% faster, grouping and ordering operations improved by 49.6%, and filtered queries were 47.3% faster for unsorted data. This performance enhancement boosts productivity in quant research workflows. It reduces query completion times, allowing quant developers and researchers to spend more time on model development and market analysis, leading to faster iteration on trading strategies.
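
The percentages quoted in these insights follow directly from the runtimes and DPU hours in the read-job table. As a minimal sketch of that arithmetic (the helper name pct_reduction is only illustrative and not part of the original benchmark code):

```python
# (Iceberg, vanilla Parquet) figures copied from the read-job table.
read_runtimes_s = {
    "COUNT(1) on unsorted": (35.76, 74.62),
    "GROUP BY and ORDER BY on unsorted": (34.29, 67.99),
    "DISTINCT and SELECT on unsorted": (51.40, 82.95),
    "FILTER and GROUP BY and ORDER BY on unsorted": (25.84, 49.05),
    "COUNT(1) on sorted": (15.29, 24.25),
    "GROUP BY and ORDER BY on sorted": (15.88, 28.73),
    "DISTINCT and SELECT on sorted": (30.85, 42.06),
    "FILTER and GROUP BY and ORDER BY on sorted": (15.51, 31.51),
}
dpu_hours = (45.98, 67.97)


def pct_reduction(iceberg, parquet):
    """Reduction of the Iceberg figure relative to vanilla Parquet, in percent."""
    return (parquet - iceberg) / parquet * 100


for query, (iceberg, parquet) in read_runtimes_s.items():
    print(f"{query}: {pct_reduction(iceberg, parquet):.1f}% faster with Iceberg")
print(f"Read-job DPU hours: {pct_reduction(*dpu_hours):.1f}% lower with Iceberg")
```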

Conclusion

Quant research platforms often avoid adopting new data management solutions like Iceberg, fearing performance penalties and increased costs. Our analysis disproves these concerns, demonstrating that Iceberg not only matches or enhances performance compared to direct Amazon S3 access, but also provides substantial additional benefits.

Our tests reveal that Iceberg significantly accelerates query performance, with improvements of up to 52% for unsorted data and 51% for sorted data. This speed boost enables quant researchers to analyze larger datasets and test trading strategies more rapidly, potentially uncovering valuable market insights faster.

Iceberg streamlines data management tasks, allowing researchers to focus on strategy development. Its robust insert, update, and delete capabilities, combined with time travel features, enable effortless management of complex datasets, improving backtest accuracy and facilitating rapid strategy iteration.

The platform’s intelligent handling of partitioning and Amazon S3 API quota issues eliminates undifferentiated heavy lifting, freeing quant teams from low-level data engineering tasks. This automation redirects efforts to high-value activities such as model development and market analysis. Moreover, our tests show that for read-intensive workloads, Iceberg reduced DPU hours by 32.4% and achieved a 10–16% reduction in Amazon S3 storage, leading to significant cost savings.

Flexibility is a key advantage of Iceberg. Its various interfaces, including SQL, DataFrames, and programmatic APIs, integrate seamlessly with existing quant research workflows, accommodating diverse analysis needs and coding preferences.

By adopting Iceberg, quant research teams gain both performance enhancements and powerful data management tools. This combination creates an environment where researchers can push analytical boundaries, maintain high data integrity standards, and focus on generating valuable insights. The improved productivity and reduced operational costs enable quant teams to allocate resources more effectively, ultimately leading to a more competitive edge in quantitative finance.


About the Authors

Guy Bachar is a Senior Solutions Architect at AWS based in New York. He specializes in assisting capital markets customers with their cloud transformation journeys. His expertise encompasses identity management, security, and unified communication.

Sercan Karaoglu is a Senior Solutions Architect, specialized in capital markets. He is a former data engineer and passionate about quantitative investment research.

Boris Litvin is a Principal Solutions Architect at AWS. His job is in financial services industry innovation. Boris joined AWS from the industry, most recently Goldman Sachs, where he held a variety of quantitative roles across equity, FX, and interest rates, and was CEO and Founder of a quantitative trading FinTech startup.

Salim Tutuncu is a Senior Partner Solutions Architect Specialist on Data & AI, based in Dubai with a focus on EMEA. With a background in the technology sector that spans roles as a data engineer, data scientist, and machine learning engineer, Salim has built a formidable expertise in navigating the complex landscape of data and artificial intelligence. His current role involves working closely with partners to develop long-term, profitable businesses using the AWS platform, particularly in data and AI use cases.

Alex Tarasov is a Senior Solutions Architect working with Fintech startup customers, helping them to design and run their data workloads on AWS. He is a former data engineer and is passionate about all things data and machine learning.

Jiwan Panjiker is a Solutions Architect at Amazon Web Services, based in the Greater New York City area. He works with AWS enterprise customers, helping them in their cloud journey to solve complex business problems by making effective use of AWS services. Outside of work, he likes spending time with his friends and family, going for long drives, and exploring local cuisine.

Cost Optimized Vector Database: Introduction to Amazon OpenSearch Service quantization techniques

Post Syndicated from Aruna Govindaraju original https://aws.amazon.com/blogs/big-data/cost-optimized-vector-database-introduction-to-amazon-opensearch-service-quantization-techniques/

The rise of generative AI applications has heightened the necessity to implement semantic search and natural language search. These advanced search features help find and retrieve conceptually relevant documents from enterprise content repositories to serve as prompts for generative AI models. Raw data within various source repositories in the form of text, images, audio, video, and so on are converted, with the help of embedding models, to a standard numerical representation called vectors that powers the semantic and natural language search. As organizations harness more sophisticated large language and foundational models to power their generative AI applications, supplemental embedding models are also evolving to handle large, high-dimension vector embedding. As the vector volume expands, there is a proportional increase in memory usage and computational requirements, resulting in higher operational costs. To mitigate this issue, various compression techniques can be used to optimize memory usage and computational efficiency.

Quantization is a lossy data compression technique that aims to lower computation and memory usage, leading to lower costs, especially for high-volume data workloads. There are various techniques to compress data depending on the type and volume of the data. The usual approach is to map an infinite set of values (or a relatively large finite set) to a smaller, discrete set of values. Vector compression can be achieved through two primary techniques: product quantization and scalar quantization. In product quantization, the original vector is broken into multiple sub-vectors, and each sub-vector is encoded into a fixed number of bits that represent the original values. With this method, you only store and search across the encoded sub-vectors instead of the original vector. In scalar quantization, each dimension of the input vector is mapped from a 32-bit floating-point representation to a smaller data type.

Amazon OpenSearch Service, as a vector database, supports scalar and product quantization techniques to optimize memory usage and reduce operational costs.

OpenSearch as a vector database

OpenSearch is a distributed search and analytics service. The OpenSearch k-nearest neighbor (k-NN) plugin allows you to index, store, and search vectors. Vectors are stored in OpenSearch as 32-bit float arrays of type knn_vector, which supports up to 16,000 dimensions per vector.

OpenSearch uses approximate nearest neighbor search to provide scalable vector search. The approximate k-NN algorithm retrieves results based on an estimation of the nearest vectors to a given query vector. Two main methods for performing approximate k-NN are the graph-based Hierarchical Navigable Small-World (HNSW) and the cluster-based Inverted File (IVF). These data structures are constructed and loaded into memory during the initial vector search operation. As vector volume grows, both the data structures and associated memory requirements for search operations scale proportionally.

For example, each HNSW graph with 32-bit float data takes approximately 1.1 * (4 * d + 8 * m) * num_vectors bytes of memory. Here, num_vectors represents the total number of vectors to be indexed, d is the number of dimensions determined by the embedding model you use to generate the vectors, and m is the number of edges in the HNSW graph, an index parameter that can be tuned for performance. Using this formula, memory requirements for vector storage for a configuration of 384 dimensions and an m value of 16 would be:

  • 1 million vectors: 1.83 GB (1.1 * (4 * 384 + 8 * 16) * 1,000,000 bytes)
  • 1 billion vectors: 1830 GB (1.1 * (4 * 384 + 8 * 16) * 1,000,000,000 bytes)
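
The estimate above is easy to script when you want to try other dimensions or m values. The following is a minimal sketch of the formula as stated in this post; the function name hnsw_memory_bytes is hypothetical, and GB is taken as 10^9 bytes to match the figures above.

```python
def hnsw_memory_bytes(d, m, num_vectors, bytes_per_dimension=4):
    """Approximate HNSW memory: 1.1 * (bytes_per_dimension * d + 8 * m) * num_vectors."""
    return 1.1 * (bytes_per_dimension * d + 8 * m) * num_vectors


# 384 dimensions and m = 16, as in the example above.
for num_vectors in (1_000_000, 1_000_000_000):
    gigabytes = hnsw_memory_bytes(384, 16, num_vectors) / 1e9
    print(f"{num_vectors:>13,} vectors: {gigabytes:,.2f} GB")
```

Raising d (for example, when switching to a higher-dimension embedding model) grows the estimate almost linearly, because the 4 * d term dominates the 8 * m term for typical settings.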

Although approximate nearest neighbor search can be optimized to handle massive datasets with billions of vectors efficiently, the memory requirements for loading 32-bit full-precision vectors into memory during the search process can become prohibitively costly. To mitigate this, OpenSearch Service supports the following four quantization techniques:

  • Binary quantization
  • Byte quantization
  • FP16 quantization
  • Product quantization

These techniques fall within the broader categories of scalar and product quantization discussed earlier. This post covers quantization techniques for optimizing vector workloads on OpenSearch Service, focusing on memory reduction and cost efficiency. It also introduces the new disk-based vector search approach, which enables efficient querying of vectors stored on disk without loading them into memory. This approach integrates with quantization through two key configurations: the mode parameter (set to on_disk) and the compression_level parameter. These settings provide built-in, out-of-the-box scalar quantization at indexing time.

Binary quantization (up to 32x compression)

Binary quantization (BQ) is a type of scalar quantization. OpenSearch uses the FAISS engine’s binary quantization, enabling up to 32x compression during indexing. This technique reduces each vector dimension from the default 32-bit float to a 1-bit binary value, compressing the vector into 0s and 1s. OpenSearch supports indexing, storing, and searching binary vectors. You can also choose to encode each vector dimension using 1, 2, or 4 bits, depending on the desired compression factor, as shown in the example below. The compression factor is adjusted using the bits setting: the default value of 1 yields 32x compression, a value of 2 yields 16x compression, and a value of 4 results in 8x compression. In binary quantization, training is handled natively at indexing time, so you avoid an additional preprocessing step.
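
To make the 32x figure concrete, the following sketch packs one bit per dimension into bytes and compares two packed vectors by Hamming distance. It is only an illustration of the idea: the fixed threshold of 0.0 and the function names are assumptions, whereas the engine learns how to quantize during indexing.

```python
def binarize(vector, threshold=0.0):
    """Pack one bit per dimension: 1 if the value exceeds the threshold, else 0.

    The fixed threshold is an assumption made for this sketch.
    """
    packed = bytearray((len(vector) + 7) // 8)
    for i, value in enumerate(vector):
        if value > threshold:
            packed[i // 8] |= 1 << (7 - i % 8)
    return bytes(packed)


def hamming_distance(a, b):
    """Number of differing bits between two packed binary vectors."""
    return sum(bin(x ^ y).count("1") for x, y in zip(a, b))


vector = [1.5, -2.5, 3.5, -4.5, 5.5, -6.5, 7.5, -8.5]
packed = binarize(vector)
float32_bytes = 4 * len(vector)
print(f"{float32_bytes} bytes as float32, {len(packed)} byte(s) packed, "
      f"{float32_bytes // len(packed)}x smaller")
```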

To implement binary quantization, define the vector type as knn_vector and specify the encoder name as binary with the desired number of encoding bits. Note that the encoder parameter refers to the method used to compress vector data before storing it in the index. You can tune performance using the space_type, m, and ef_construction parameters. See the OpenSearch documentation for information about the underlying configuration of approximate k-NN.

PUT my-vector-index
{
  "settings": {
    "index.knn": true
  },
  "mappings": {
    "properties": {
      "my_vector_field": {
        "type": "knn_vector",
        "dimension": 8,
        "method": {
          "name": "hnsw",
          "engine": "faiss",
          "space_type": "l2",
          "parameters": {
            "m": 16,
            "ef_construction": 512,
            "encoder": {
              "name": "binary",
              "parameters": {
                "bits": 1
              }
            }
          }
        }
      }
    }
  }
}

Memory requirements for implementing binary quantization with FAISS-HNSW:

1.1 * (bits * (d/8)+ 8 * m) * num_vectors bytes.

| Compression | Encoding bits | Memory required for 1 billion vectors with d=384 and m=16 (in GB) |
| --- | --- | --- |
| 32x | 1 | 193.6 |
| 16x | 2 | 246.4 |
| 8x | 4 | 352.0 |

For detailed implementation steps on binary quantization, see the OpenSearch documentation.

Byte quantization (4x compression)

Byte quantization compresses 32-bit floating-point dimensions to 8-bit integers, ranging from –128 to +127, reducing memory usage by 75%. OpenSearch supports indexing, storing, and searching byte vectors, which must be converted to 8-bit format prior to ingestion. To implement byte vectors, specify the k-NN vector field data_type as byte in the index mapping. This feature is compatible with both Lucene and FAISS engines. An example of creating an index for byte-quantized vectors follows.

PUT /my-vector-index
{
  "settings": {
    "index": {
      "knn": true,
      "knn.algo_param.ef_search": 100
    }
  },
  "mappings": {
    "properties": {
      "my_vector": {
        "type": "knn_vector",
        "dimension": 3,
        "data_type": "byte",
        "space_type": "l2",
        "method": {
          "name": "hnsw",
          "engine": "faiss",
          "parameters": {
            "ef_construction": 100,
            "m": 16
          }
        }
      }
    }
  }
}

This method requires ingesting a byte-quantized vector into OpenSearch for direct storage in the k-NN vector field (of byte type). However, the recently introduced disk-based vector search feature eliminates the need for external vector quantization. This feature will be discussed in detail later in this blog.
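
When you quantize externally, the conversion to the byte range happens in your own ingestion code. The following sketch shows one possible approach, symmetric linear scaling to the range [–128, 127]. The scaling scheme and the function name quantize_to_bytes are assumptions for illustration; the post does not prescribe a particular conversion, and the right scheme depends on your data distribution.

```python
def quantize_to_bytes(vectors):
    """Scale float vectors so the largest magnitude maps to 127, then round.

    Returns the quantized vectors and the scale factor that was used.
    """
    max_abs = max(abs(value) for vector in vectors for value in vector)
    scale = 127.0 / max_abs if max_abs else 1.0
    quantized = [
        [max(-128, min(127, round(value * scale))) for value in vector]
        for vector in vectors
    ]
    return quantized, scale


float_vectors = [[1.0, -0.4, 0.1], [0.2, 0.7, -0.9]]
byte_vectors, scale = quantize_to_bytes(float_vectors)
print(byte_vectors)
```

The same scale factor must be applied to query vectors so that indexed and query vectors share one value range.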

Memory requirements for implementing byte quantization with FAISS-HNSW:

1.1 * (1 * d + 8 * m) * num_vectors bytes.

For detailed implementation steps, see the OpenSearch documentation. For performance metrics regarding accuracy, throughput, and latency, see Byte-quantized vectors in OpenSearch.

FAISS FP16 quantization (2x compression)

FP16 quantization is a technique that uses 16-bit floating-point scalar representation, reducing the memory usage by 50%. Each vector dimension is converted from 32-bit to 16-bit floating-point, effectively halving the memory requirements. The compressed vector dimensions must be in the range [–65504.0, 65504.0]. To implement FP16 quantization, create the index with the k-NN vector field and configure the following:

  • Set k-NN vector field method and engine to HNSW and FAISS, respectively.
  • Define encoder parameter and set name to sq and type to fp16.

Upon uploading 32-bit floating-point vectors to OpenSearch, the scalar quantization FP16 (SQfp16) automatically quantizes them to 16-bit floating-point vectors during ingestion and stores them in the vector field. The following example demonstrates the creation of the index for quantizing and storing FP16-quantized vectors.

PUT /my-vector-index
{
  "settings": {
    "index": {
      "knn": true,
      "knn.algo_param.ef_search": 100
    }
  },
  "mappings": {
    "properties": {
      "my_vector1": {
        "type": "knn_vector",
        "dimension": 3,
        "space_type": "l2",
        "method": {
          "name": "hnsw",
          "engine": "faiss",
          "parameters": {
            "encoder": {
              "name": "sq",
              "parameters": {
                "type": "fp16",
                "clip": true
              }
            },
            "ef_construction": 256,
            "m": 8
          }
        }
      }
    }
  }
}

Memory requirements for implementing FP16 quantization with FAISS-HNSW:

(1.1 * (2 * d + 8 * m) * num_vectors) bytes.

The preceding FP16 example introduces an optional Boolean parameter called clip, which defaults to false. When false, vectors with out-of-range values (values not between –65504.0 and +65504.0) are rejected. Setting clip to true enables rounding of out-of-range vector values to fit within the supported range. For detailed implementation steps, see the OpenSearch documentation. For performance metrics regarding accuracy, throughput, and latency, see Optimizing OpenSearch with Faiss FP16 scalar quantization: Enhancing memory efficiency and cost-effectiveness.
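
The effect of FP16 conversion and of the clip parameter can be imitated locally with Python's struct module, which supports the IEEE 754 half-precision format through the "e" format code. This is only a sketch of the behavior described above, not the engine's implementation; the function name to_fp16 is hypothetical, and clamping to the nearest bound is an assumption about how out-of-range values are handled when clip is true.

```python
import struct

FP16_MAX = 65504.0  # largest finite half-precision value


def to_fp16(value, clip=False):
    """Round-trip a float through 16-bit floating point.

    With clip=False, out-of-range values are rejected.
    With clip=True, they are clamped to the nearest supported bound.
    """
    if clip:
        value = max(-FP16_MAX, min(FP16_MAX, value))
    elif abs(value) > FP16_MAX:
        raise ValueError(f"{value} is outside the FP16 range")
    return struct.unpack("<e", struct.pack("<e", value))[0]


print(to_fp16(0.1))                  # slightly different from 0.1: precision is lost
print(to_fp16(70000.0, clip=True))   # clamped to the upper bound
```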

Product quantization

Product quantization (PQ) is an advanced dimension-reduction technique that offers significantly higher levels of compression. While conventional scalar quantization methods typically achieve up to 32x compression, PQ can provide compression levels of up to 64x, making it a more efficient solution for optimizing storage and cost. OpenSearch supports PQ with both the IVF and HNSW methods from the FAISS engine. Product quantization partitions each vector into m sub-vectors, each encoded with a number of bits determined by the code size (code_size). The resulting vector’s memory footprint is m * code_size bits.

FAISS product quantization involves three key steps:

  1. Create and populate a training index to build the PQ model, optimizing for accuracy.
  2. Execute the _train API on the training index to generate the quantizer model.
  3. Construct the vector index, configuring the kNN field to use the prepared quantizer model.

The following example demonstrates the three steps to setting up product quantization.

Step 1: Create the training index. Populate the training index with an appropriate dataset, making sure that the vector dimension matches the dimension specified for train-index. Note that the training index requires a minimum of 256 documents.

PUT /train-index
{
  "settings": {
    "number_of_shards": 1,
    "number_of_replicas": 2
  },
  "mappings": {
    "properties": {
      "train-field": {
        "type": "knn_vector",
        "dimension": 4
      }
    }
  }
}

Step 2: Create a quantizer model called my-model by running the _train API on the training index you just created. Note that the encoder with name defined as pq facilitates native vector quantization. Other parameters for the encoder include code_size and m. FAISS-HNSW requires a code_size of 8 and a training dataset of at least 256 (2^code_size) documents. For detailed parameter specifications, see the PQ parameter reference.

POST /_plugins/_knn/models/my-model/_train
{
  "training_index": "train-index",
  "training_field": "train-field",
  "dimension": 4,
  "description": "My test model description",
  "method": {
    "name": "hnsw",
    "engine": "faiss",
    "parameters": {
      "encoder": {
        "name": "pq", 
         "parameters": {
           "code_size":8,
           "m":2
         }
      },
      "ef_construction": 256,
      "m": 8
    }
  }
}

Step 3: Map the quantizer model to your vector index.

PUT /my-vector-index
{
  "settings": {
    "number_of_shards": 1,
    "number_of_replicas": 2,
    "index.knn": true
  },
  "mappings": {
    "properties": {
      "target-field": {
        "type": "knn_vector",
        "model_id": "my-model"
      }
    }
  }
}

Ingest the complete dataset into the newly created index, my-vector-index. The encoder will automatically process the incoming vectors, applying encoding and quantization based on the compression parameters (code_size and m) specified in the quantizer model configuration.

Memory requirements for implementing product quantization with FAISS-HNSW:

1.1 * ((code_size / 8) * pq_m + 24 + 8 * m) * num_vectors bytes. Here, code_size and pq_m are the code_size and m parameters of the encoder (pq_m is the number of sub-vectors), m is the HNSW m parameter, and num_vectors is the total number of vectors.

During quantization, each training vector is broken into multiple sub-vectors (sub-spaces), the number of which is set by the configurable value m. The number of bits used to encode each sub-vector is controlled by the code_size parameter. Each sub-vector is then quantized separately by running k-means clustering with k set to 2^code_size. With this technique, each vector is compressed to roughly m * code_size bits.
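
The encoding step can be illustrated with a toy example. In the following sketch, the codebooks are hand-picked rather than learned with k-means, and code_size is 2 (four centroids per sub-space) instead of the 8 used above, purely to keep the example readable. The function names pq_encode and pq_decode are hypothetical.

```python
def pq_encode(vector, codebooks):
    """Replace each sub-vector with the index of its nearest centroid."""
    m = len(codebooks)
    sub_dim = len(vector) // m
    codes = []
    for i, codebook in enumerate(codebooks):
        sub_vector = vector[i * sub_dim:(i + 1) * sub_dim]
        # Nearest centroid by squared Euclidean distance.
        nearest = min(
            range(len(codebook)),
            key=lambda c: sum((a - b) ** 2 for a, b in zip(sub_vector, codebook[c])),
        )
        codes.append(nearest)
    return codes


def pq_decode(codes, codebooks):
    """Approximate the original vector by concatenating the chosen centroids."""
    approximation = []
    for code, codebook in zip(codes, codebooks):
        approximation.extend(codebook[code])
    return approximation


# Toy setup: dimension 4, m = 2 sub-vectors, code_size = 2 bits (4 centroids each).
centroids = [[0.0, 0.0], [0.0, 1.0], [1.0, 0.0], [1.0, 1.0]]
codebooks = [centroids, centroids]
vector = [0.9, 0.1, 0.2, 0.8]

codes = pq_encode(vector, codebooks)
print("codes:", codes)
print("approximation:", pq_decode(codes, codebooks))
print("bits stored:", len(codebooks) * 2, "instead of", 32 * len(vector))
```

Only the centroid indexes are stored per vector, which is why the stored size is m * code_size bits regardless of the original dimension.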

For detailed implementation guidelines and understanding of the configurable parameters during product quantization, see the OpenSearch documentation. For performance metrics regarding accuracy, throughput and latency using FAISS IVF for PQ, see Choose the k-NN algorithm for your billion-scale use case with OpenSearch.

Disk-based vector search

Disk-based vector search optimizes query efficiency by using compressed vectors in memory while maintaining full-precision vectors on disk. This approach enables OpenSearch to perform searches across large vector datasets without the need to load entire vectors into memory, thus improving scalability and resource utilization. Implementation is achieved through two new configurations at index creation: mode and compression level. As of OpenSearch 2.17, the mode parameter can be set to either in_memory or on_disk during indexing. The previously discussed methods default to the in_memory mode. In this configuration, the vector index is constructed using either a graph (HNSW) or bucket (IVF) structure, which is then loaded into native memory during search operations. While offering excellent recall, this approach can increase memory usage and limit scalability for high-volume vector workloads.

The on_disk mode optimizes vector search efficiency by storing full-precision vectors on disk while using real-time, native quantization during indexing. Coupled with adjustable compression levels, this approach allows only compressed vectors to be loaded into memory, thereby improving memory and resource utilization and search performance. The following compression levels correspond to various scalar quantization methods discussed earlier.

  • 32x: Binary quantization (1-bit dimensions)
  • 4x: Byte and integer quantization (8-bit dimensions)
  • 2x: FP16 quantization (16-bit dimensions)

This method also supports other compression levels such as 16x and 8x that aren’t available with the in-memory mode. To enable disk-based vector search, create the index with mode set to on_disk as shown in the following example.

PUT /my-vector-index
{
  "settings" : {
    "index": {
      "knn": true
    }
  },
  "mappings": {
    "properties": {
      "my_vector_field": {
        "type": "knn_vector",
        "dimension": 8,
        "space_type": "innerproduct",
        "data_type": "float",
        "mode": "on_disk"
      }
    }
  }
}

Configuring just the mode as on_disk employs the default configuration, which uses the FAISS engine and HNSW method with a 32x compression level (1-bit, binary quantization). The ef_construction parameter, which can be tuned to optimize indexing latency, defaults to 100. For more granular fine-tuning, you can override these k-NN parameters as shown in the example that follows.

PUT /my-vector-index
{
  "settings" : {
    "index": {
      "knn": true
    }
  },
  "mappings": {
    "properties": {
      "my_vector_field": {
        "type": "knn_vector",
        "dimension": 8,
        "space_type": "innerproduct",
        "data_type": "float",
        "mode": "on_disk",
        "compression_level": "16x",
        "method": {
          "name": "hnsw",
          "engine": "faiss",
          "parameters": {
            "ef_construction": 512
          }
        }
      }
    }
  }
}

Because quantization is a lossy compression technique, higher compression levels typically result in lower recall. To improve recall during quantization, you can configure the disk-based vector search to run in two phases using the search time configuration parameter ef_search and the oversample_factor as shown in the following example.

GET my-vector-index/_search
{
  "query": {
    "knn": {
      "my_vector_field": {
        "vector": [1.5, 2.5, 3.5, 4.5, 5.5, 6.5, 7.5, 8.5],
        "k": 5,
        "method_parameters": {
            "ef_search": 512
        },
        "rescore": {
            "oversample_factor": 10.0
        }
      }
    }
  }
}

In the first phase, oversample_factor * k results are retrieved from the quantized vectors in memory and the scores are approximated. In the second phase, the full-precision vectors of those oversample_factor * k results are loaded into memory from disk, and scores are recomputed against the full-precision query vector. The results are then reduced to the top k.

The oversample_factor for rescoring is determined by the configured dimension and compression level at indexing. For dimensions below 1,000, the factor is fixed at 5. For dimensions exceeding 1,000, the default factor varies based on the compression level, as shown in the following table.

| Compression level | Default oversample_factor for rescoring |
| --- | --- |
| 32x (default) | 3 |
| 16x | 2 |
| 8x | 2 |
| 4x | No default rescoring |
| 2x | No default rescoring |

As previously discussed, the oversample_factor can be dynamically adjusted at search time. This value presents a critical trade-off between accuracy and search efficiency. While a higher factor improves accuracy, it proportionally increases memory usage and reduces search throughput. See the OpenSearch documentation to learn more about disk-based vector search and understand the right usage for oversample_factor.

Performance assessment of quantization methods: Reviewing memory, recall, and query latency

The OpenSearch documentation on approximate k-NN search provides a starting point for implementing vector similarity search. Additionally, Choose the k-NN algorithm for your billion-scale use case with OpenSearch offers valuable insights into designing efficient vector workloads for handling billions of vectors in production environments. It introduces product quantization techniques as a potential solution to reduce memory requirements and associated costs by scaling down the memory footprint.

The following table illustrates the memory requirements for storing and searching through 1 billion vectors using various quantization techniques. The table compares the default memory consumption of full-precision vectors using the HNSW method against the memory consumed by quantized vectors. The model employed in this analysis is sentence-transformers/all-MiniLM-L12-v2, which operates with 384 dimensions. The raw metadata is assumed to be no more than 100 GB.

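| | Without quantization (in GB) | Product quantization (in GB) | Scalar quantization: FP16 vectors (in GB) | Scalar quantization: Byte vectors (in GB) | Scalar quantization: Binary vectors (in GB) |
| --- | --- | --- | --- | --- | --- |
| m value | 16 | 16 | 16 | 16 | 16 |
| pq_m, code_size | | 16, 8 | | | |
| Native memory consumption (GB) | 1830.4 | 184.8 | 985.6 | 563.2 | 193.6 |
| Total storage = 100 GB + vector | 1930.4 | 284.8 | 1085.6 | 663.2 | 293.6 |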
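
These figures can be reproduced from the per-technique memory formulas given earlier in this post. The following sketch does so for 1 billion vectors with d=384 and m=16; the variable and function names are illustrative, and GB is taken as 10^9 bytes.

```python
D, M, NUM_VECTORS = 384, 16, 1_000_000_000
PQ_M, PQ_CODE_SIZE = 16, 8
METADATA_GB = 100

# Bytes stored per vector for the vector payload, by technique.
payload_bytes = {
    "Without quantization": 4 * D,
    "Product quantization": (PQ_CODE_SIZE / 8) * PQ_M + 24,
    "FP16 vectors": 2 * D,
    "Byte vectors": 1 * D,
    "Binary vectors": 1 * D / 8,  # 1 bit per dimension
}


def native_memory_gb(payload, m=M, num_vectors=NUM_VECTORS):
    """1.1 * (payload + 8 * m) * num_vectors, expressed in GB."""
    return 1.1 * (payload + 8 * m) * num_vectors / 1e9


for technique, payload in payload_bytes.items():
    memory = native_memory_gb(payload)
    print(f"{technique:<22} memory = {memory:7.1f} GB, "
          f"total storage = {memory + METADATA_GB:7.1f} GB")
```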

Reviewing the preceding table reveals that for a dataset comprising 1 billion vectors, the HNSW graph with 32-bit full-precision vectors requires approximately 1830 GB of memory. Compression techniques such as product quantization can reduce this to 184.8 GB, while scalar quantization offers varying levels of compression. The following table summarizes the correlation between compression techniques and their impact on key performance indicators, including cost savings, recall rate, and query latency. This analysis builds upon the preceding assessment of memory usage to help you select a compression technique that meets your requirements.

The table presents two key search metrics: search latency at the 90th percentile (p90) and recall at 100.

  • Search latency @p90 – The latency within which 90% of search queries complete.
  • Recall@100 – The fraction of the top 100 ground truth neighbors found in the 100 results returned.
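
Both metrics are straightforward to compute from your own benchmark runs. The following sketch uses the nearest-rank method for the percentile, which is one of several common percentile definitions; the function names are hypothetical.

```python
import math


def recall_at_k(ground_truth, returned, k):
    """Fraction of the top-k ground truth neighbors present in the k results returned."""
    return len(set(ground_truth[:k]) & set(returned[:k])) / k


def p90_latency(latencies_ms):
    """90th percentile latency using the nearest-rank method."""
    ordered = sorted(latencies_ms)
    rank = math.ceil(0.9 * len(ordered))
    return ordered[rank - 1]


ground_truth_ids = [1, 2, 3, 4]
returned_ids = [1, 2, 9, 4]
print(recall_at_k(ground_truth_ids, returned_ids, k=4))

latencies_ms = [10, 20, 30, 40, 50, 60, 70, 80, 90, 100]
print(p90_latency(latencies_ms))
```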

| | Without quantization | Product quantization | Scalar quantization: FP16 quantization [mode=in_memory] | Scalar quantization: Byte quantization [mode=in_memory] | Scalar quantization: Binary quantization [mode=on_disk] |
| --- | --- | --- | --- | --- | --- |
| Preconditions/Datasets | Applicable to all datasets | Recall depends on the nature of the training data | Works for dimension values in the range [-65504, 65504] | Works for dimension values in the range [-128, 127] | Works well for larger dimensions (>= 768) |
| Preprocessing required? | No | Yes, preprocessing/training is required | No | No | No |
| Rescoring | No | No | No | No | Yes |
| Recall@100 | >= 0.99 | > 0.7 | >= 0.95 | >= 0.95 | >= 0.90 |
| p90 query latency (ms) | < 50 ms | < 50 ms | < 50 ms | < 50 ms | < 200 ms |
| Cost (baseline $X) | $X | $0.1*X (up to 90% savings) | $0.5*X (up to 50% savings) | $0.25*X (up to 75% savings) | $0.15*X (up to 85% savings) |
| Sample cost for a billion vectors | $20,923.14 | $2,092.31 | $10,461.57 | $5,230.79 | $3,138.47 |

The sample cost estimate for a billion vectors is based on a configuration optimized for cost. Actual savings may vary based on your specific workload requirements and chosen configuration parameters. Notably, product quantization offers up to 90% cost reduction compared to the baseline HNSW graph-based vector search cost ($X). Scalar quantization similarly yields cost savings of 50% to 85%, in proportion to the compressed memory footprint. The choice of compression technique involves balancing cost-effectiveness, accuracy, and performance, because it affects both precision and latency.
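
One way to apply the preceding table is to filter techniques by your recall and latency requirements and then pick the cheapest remaining option. The following Python sketch does this using the table's bounds as if they were guarantees, which is a simplification; the class and function names are hypothetical.

```python
# Sketch: choose the cheapest technique that satisfies recall and latency
# requirements. Values are the bounds from the table, treated as a recall
# floor and a p90 latency ceiling (a simplifying assumption).
from dataclasses import dataclass


@dataclass(frozen=True)
class Technique:
    name: str
    recall_floor: float    # lower bound on recall@100 from the table
    p90_ceiling_ms: int    # upper bound on p90 query latency from the table
    cost_factor: float     # cost as a multiple of the baseline $X


TECHNIQUES = [
    Technique("no quantization", 0.99, 50, 1.00),
    Technique("product quantization", 0.70, 50, 0.10),
    Technique("fp16 scalar quantization", 0.95, 50, 0.50),
    Technique("byte scalar quantization", 0.95, 50, 0.25),
    Technique("binary quantization", 0.90, 200, 0.15),
]


def cheapest_technique(required_recall, latency_budget_ms):
    """Return the lowest-cost technique meeting both requirements, or None."""
    candidates = [
        t for t in TECHNIQUES
        if t.recall_floor >= required_recall and t.p90_ceiling_ms <= latency_budget_ms
    ]
    return min(candidates, key=lambda t: t.cost_factor, default=None)


choice = cheapest_technique(required_recall=0.95, latency_budget_ms=50)
print(choice.name if choice else "no technique meets the requirements")
```

With a recall requirement of 0.95 and a 50 ms budget, the sketch selects byte scalar quantization; relaxing the budget to 200 ms and the recall to 0.90 selects binary quantization.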

Conclusion

By leveraging OpenSearch’s quantization techniques, organizations can make informed tradeoffs between cost efficiency, performance, and recall, empowering them to fine-tune their vector database operations for optimal results. These quantization techniques significantly reduce memory requirements, improve query efficiency and offer built-in encoders for seamless compression. Whether you’re dealing with large-scale text embeddings, image features, or any other high-dimensional data, OpenSearch’s quantization techniques offer efficient solutions for vector search requirements, enabling the development of cost-effective, scalable, and high-performance systems.

As you move forward with your vector database projects, we encourage you to:

  1. Explore OpenSearch’s compression techniques in-depth
  2. Evaluate applicability of the right technique to your specific use case
  3. Determine the appropriate compression levels based on your requirements for recall and search latency
  4. Measure and compare cost savings based on accuracy, throughput, and latency

Stay informed about the latest developments in this rapidly evolving field, and don’t hesitate to experiment with different quantization techniques to find the optimal balance between cost, performance, and accuracy for your applications.


About the Authors

Aruna Govindaraju is an Amazon OpenSearch Specialist Solutions Architect and has worked with many commercial and open-source search engines. She is passionate about search, relevancy, and user experience. Her expertise with correlating end-user signals with search engine behavior has helped many customers improve their search experience.

Vamshi Vijay Nakkirtha is a software engineering manager working on the OpenSearch Project and Amazon OpenSearch Service. His primary interests include distributed systems. He is an active contributor to various OpenSearch projects such as k-NN, Geospatial, and dashboard-maps.

Use CI/CD best practices to automate Amazon OpenSearch Service cluster management operations

Post Syndicated from Camille BIRBES original https://aws.amazon.com/blogs/big-data/use-ci-cd-best-practices-to-automate-amazon-opensearch-service-cluster-management-operations/

Quick and reliable access to information is crucial for making smart business decisions. That’s why companies are turning to Amazon OpenSearch Service to power their search and analytics capabilities. OpenSearch Service makes it straightforward to deploy, operate, and scale search systems in the cloud, enabling use cases like log analysis, application monitoring, and website search.

Efficiently managing OpenSearch Service indexes and cluster resources can lead to significant improvements in performance, scalability, and reliability – all of which directly impact a company’s bottom line. However, the industry lacks built-in and well-documented solutions to automate these important operational tasks.

Applying continuous integration and continuous deployment (CI/CD) to managing OpenSearch index resources can help do that. For instance, storing index configurations in a source repository allows for better tracking, collaboration, and rollback. Using infrastructure as code (IaC) tools can help automate resource creation, providing consistency and reducing manual work. Finally, using a CI/CD pipeline can automate deployments and streamline workflow.

In this post, we discuss two options to achieve this: the Terraform OpenSearch provider and the Evolution library. Which one is best suited to your use case depends on the tooling you are familiar with, your language of choice, and your existing pipeline.

Solution overview

Let’s walk through a straightforward implementation. For this use case, we use the AWS Cloud Development Kit (AWS CDK) to provision the relevant infrastructure as described in the following architecture diagram, AWS Lambda to trigger Evolution scripts, and AWS CodeBuild to apply Terraform files. You can find the code for the entire solution in the GitHub repo.

Solution Architecture Diagram

Prerequisites

To follow along with this post, you need to have the following:

  • Familiarity with Java and OpenSearch
  • Familiarity with the AWS CDK, Terraform, and the command line
  • The following software versions installed on your machine: Python 3.12, NodeJS 20, and AWS CDK 2.170.0 or higher
  • An AWS account, with an AWS Identity and Access Management (IAM) role configured with the relevant permissions

Build the solution

To build an automated solution for OpenSearch Service cluster management, follow these steps:

  1. Enter the following commands in a terminal to download the solution code; build the Java application; build the required Lambda layer; create an OpenSearch domain, two Lambda functions and a CodeBuild project; and deploy the code:
git clone https://github.com/aws-samples/opensearch-automated-cluster-management
cd opensearch-automated-cluster-management
cd app/openSearchMigration
mvn package
cd ../../lambda_layer
chmod a+x create_layer.sh
./create_layer.sh
cd ../infra
npm install
npx cdk bootstrap
aws iam create-service-linked-role --aws-service-name es.amazonaws.com
npx cdk deploy --require-approval never
  2. Wait 15 to 20 minutes for the infrastructure to finish deploying, then check that your OpenSearch domain is up and running, and that the Lambda functions and CodeBuild project have been created, as shown in the following screenshots.

OpenSearch domain provisioned successfully

OpenSearch Migration Lambda function created successfully

OpenSearchQuery Lambda function created successfully

CodeBuild project created successfully

Before you use automated tools to create index templates, you can verify that none already exist using the OpenSearchQuery Lambda function.

  1. On the Lambda console, navigate to the OpenSearchQuery function.
  2. On the Test tab, choose Test.

The function should return the message “No index patterns created by Terraform or Evolution,” as shown in the following screenshot.

Check that no index patterns have been created

Apply Terraform files

First, you use Terraform with CodeBuild. The code is ready for you to test, so let’s look at a few important pieces of configuration:

  1. Define the required variables for your environment:
variable "OpenSearchDomainEndpoint" {
  type = string
  description = "OpenSearch domain URL"
}

variable "IAMRoleARN" {
  type = string
  description = "IAM Role ARN to interact with OpenSearch"
}
  2. Define and configure the provider:
terraform {
  required_providers {
    opensearch = {
      source = "opensearch-project/opensearch"
      version = "2.3.1"
    }
  }
}

provider "opensearch" {
  url = "https://${var.OpenSearchDomainEndpoint}"
  aws_assume_role_arn = var.IAMRoleARN
}

NOTE: As of the publication date of this post, there is a bug in the Terraform OpenSearch provider that will trigger when launching your CodeBuild project and that will prevent successful execution. Until it is fixed, please use the following version:

terraform {
  required_providers {
    opensearch = {
      source = "gnuletik/opensearch"
      version = "2.7.0"
    }
  }
}
  3. Create an index template:
resource "opensearch_index_template" "template_1" {
  name = "cicd_template_terraform"
  body = <<EOF
{
  "index_patterns": ["terraform_index_*"],
  "template": {
    "settings": {
      "number_of_shards": "1"
    },
    "mappings": {
        "_source": {
            "enabled": false
        },
        "properties": {
            "host_name": {
                "type": "keyword"
            },
            "created_at": {
                "type": "date",
                "format": "EEE MMM dd HH:mm:ss Z YYYY"
            }
        }
    }
  }
}
EOF
}
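
To see what this template applies to, the following Python sketch checks index names against the terraform_index_* pattern and builds a sample document whose created_at value follows the template's date format. The strftime string is our approximation of the Java-style pattern in the mapping, and the helper names are hypothetical.

```python
# Sketch: which index names the template covers, and what a matching
# document might look like. The strftime pattern approximates the
# "EEE MMM dd HH:mm:ss Z YYYY" format from the template (an assumption).
from datetime import datetime, timezone
from fnmatch import fnmatchcase

INDEX_PATTERN = "terraform_index_*"
CREATED_AT_STRFTIME = "%a %b %d %H:%M:%S %z %Y"


def template_applies(index_name: str) -> bool:
    """True if a new index with this name would pick up the template."""
    return fnmatchcase(index_name, INDEX_PATTERN)


def sample_document(host_name: str, created_at: datetime) -> dict:
    """Build a document with the two fields defined in the mapping."""
    return {
        "host_name": host_name,
        "created_at": created_at.strftime(CREATED_AT_STRFTIME),
    }


print(template_applies("terraform_index_2025"))  # True
print(template_applies("evolution_index_1"))     # False
print(sample_document("web-01", datetime(2025, 3, 4, 10, 15, 30, tzinfo=timezone.utc)))
```

Because the template disables _source, documents indexed this way are searchable but their original JSON is not returned by queries.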

You are now ready to test.

  1. On the CodeBuild console, navigate to the relevant Project and choose Start Build.

The build should complete successfully, and you should see the following lines in the logs:

opensearch_index_template.template_1: Creating...
opensearch_index_template.template_1: Creation complete after 0s (id=cicd_template_terraform)
Apply complete! Resources: 1 added, 0 changed, 0 destroyed.

You can check that the index template has been properly created using the same Lambda function as earlier, and should see the following results.

Terraform index properly created

Run Evolution scripts

In the next step, you use the Evolution library. The code is ready for you to test, so let’s look at a few important pieces of code and configuration:

  1. To begin with, you need to add the latest version of the Evolution core library and AWS SDK as Maven dependencies. The full xml file is available in the GitHub repository; to check the Evolution library’s compatibility with different OpenSearch versions, see here.
<dependency>
    <groupId>com.senacor.elasticsearch.evolution</groupId>
    <artifactId>elasticsearch-evolution-core</artifactId>
    <version>0.6.0</version><!--check the latest version-->
</dependency>
<dependency>
   <groupId>software.amazon.awssdk</groupId>
   <artifactId>auth</artifactId>
</dependency>
  2. Create an Evolution Bean and an AWS interceptor (which implements HttpRequestInterceptor).

Interceptors are open-ended mechanisms in which the SDK calls code that you write to inject behavior into the request and response lifecycle. The function of the AWS interceptor is to hook into the execution of API requests and create an AWS signed request stamped with proper IAM roles. You can use the following code to create your own implementation to sign all the requests made to OpenSearch within AWS.

  3. Create your own OpenSearch client to manage automatic creation of indexes, mappings, templates, and aliases.

The default Elasticsearch client bundled with the Maven dependency can’t be used to make PUT calls to the OpenSearch cluster. Therefore, you need to bypass the default REST client instance and add a callback that registers the AwsRequestSigningInterceptor.

The following is a sample implementation:

private RestClient getOpenSearchEvolutionRestClient() {
    return RestClient.builder(getHttpHost())
        .setHttpClientConfigCallback(hacb -> 
            hacb.addInterceptorLast(getAwsRequestSigningInterceptor()))
        .build();
}
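
The signing interceptor's job is to attach an AWS Signature Version 4 (SigV4) signature to each request. In the Java application this is delegated to the AWS SDK, but the core key derivation can be illustrated in a few lines. The following Python sketch is for illustration only, uses placeholder credentials, and is not the implementation used in the repository.

```python
# Sketch: the SigV4 signing-key derivation that a request-signing interceptor
# relies on. Illustrative only; in practice, let the AWS SDK sign requests.
import hashlib
import hmac


def _hmac_sha256(key: bytes, message: str) -> bytes:
    return hmac.new(key, message.encode("utf-8"), hashlib.sha256).digest()


def derive_signing_key(secret_key: str, date_stamp: str, region: str, service: str = "es") -> bytes:
    """Chain HMACs over date, Region, service, and the fixed terminator string."""
    k_date = _hmac_sha256(("AWS4" + secret_key).encode("utf-8"), date_stamp)
    k_region = _hmac_sha256(k_date, region)
    k_service = _hmac_sha256(k_region, service)
    return _hmac_sha256(k_service, "aws4_request")


def sign(string_to_sign: str, signing_key: bytes) -> str:
    """Hex signature that goes into the Authorization header."""
    return hmac.new(signing_key, string_to_sign.encode("utf-8"), hashlib.sha256).hexdigest()


# Placeholder values, not real credentials.
key = derive_signing_key("EXAMPLE-SECRET-KEY", "20250304", "us-east-1")
print(len(key), sign("example string to sign", key)[:16])
```

The service name es is the SigV4 signing name for OpenSearch Service domains; the string to sign itself is built from the canonical request, which this sketch leaves out.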
  4. Use the Evolution Bean to call your migrate method, which is responsible for initiating the migration of the scripts defined using either a classpath or a file path:
public void executeOpensearchScripts() {
    ElasticsearchEvolution opensearchEvolution = ElasticsearchEvolution.configure()
        .setEnabled(true) // true or false
        .setLocations(Arrays.asList("classpath:opensearch_migration/base",
            "classpath:opensearch_migration/dev")) // List of all locations where scripts are located.
        .setHistoryIndex("opensearch_changelog") // Tracker index to store history of scripts executed.
        .setValidateOnMigrate(false) // true or false
        .setOutOfOrder(true) // true or false
        .setPlaceholders(Collections.singletonMap("env","dev")) // list of placeholders which will get replaced in the script during execution.
        .load(getOpenSearchEvolutionRestClient()); // REST client with the AWS signing interceptor, defined in the previous step
    opensearchEvolution.migrate();
}
  5. An Evolution migration script represents a REST call to the OpenSearch API (for example, PUT /_index_template/cicd_template_evolution), where you define index patterns, settings, and mappings in JSON format. Evolution interprets these scripts, manages their versioning, and provides ordered execution. See the following example:
PUT /_index_template/cicd_template_evolution
Content-Type: application/json

{
  "index_patterns": ["evolution_index_*"],
  "template": {
    "settings": {
      "number_of_shards": "1"
    },
    "mappings": {
        "_source": {
            "enabled": false
        },
        "properties": {
            "host_name": {
                "type": "keyword"
            },
            "created_at": {
                "type": "date",
                "format": "EEE MMM dd HH:mm:ss Z YYYY"
            }
        }
    }
  }
}

The first two lines (the request line and the header) must be followed by a blank line. Evolution also supports comment lines in its migration scripts. Every line starting with # or // is interpreted as a comment line. Comment lines aren’t sent to OpenSearch; instead, Evolution filters them out.
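
The script layout described above can be sketched as a small parser. The following Python function is our own illustration of the format (request line, headers, blank line, body, with comment lines removed) and is not Evolution's actual parser.

```python
# Sketch: split a migration script into method, path, headers, and body.
# Illustrates the file layout only; Evolution's real parser may differ.

def parse_migration_script(text: str) -> dict:
    # Drop comment lines, which are never sent to OpenSearch.
    lines = [ln for ln in text.splitlines() if not ln.startswith(("#", "//"))]
    if not lines:
        raise ValueError("script is empty")

    method, path = lines[0].split(maxsplit=1)

    # Header lines run until the first blank line.
    headers = {}
    index = 1
    while index < len(lines) and lines[index].strip():
        name, value = lines[index].split(":", 1)
        headers[name.strip()] = value.strip()
        index += 1

    body = "\n".join(lines[index + 1:]).strip()
    return {"method": method, "path": path.strip(), "headers": headers, "body": body}


script = """# create the template
PUT /_index_template/cicd_template_evolution
Content-Type: application/json

{"index_patterns": ["evolution_index_*"]}
"""
print(parse_migration_script(script))
```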

The migration script file naming convention must follow a pattern:

  • Start with esMigrationPrefix which is by default V or the value that has been configured using the configuration option esMigrationPrefix
  • Followed by a version number, which must be numeric and can be structured by separating the version parts with a period (.)
  • Followed by the versionDescriptionSeparator: __ (the double underscore symbol)
  • Followed by a description, which can be any text your filesystem supports
  • End with esMigrationSuffixes which is by default .http and is configurable and case insensitive
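
The naming rules above can be checked before a script reaches the pipeline. The following Python sketch validates a file name against the default prefix, separator, and suffix; the function is hypothetical, and it assumes the prefix is matched case sensitively.

```python
# Sketch: validate a migration script file name against the convention
# (prefix, numeric dotted version, double underscore, description, suffix).
import re


def parse_migration_filename(filename, prefix="V", separator="__", suffixes=(".http",)):
    """Return (version_tuple, description), or None if the name is invalid."""
    # Suffix matching is case insensitive.
    suffix = next((s for s in suffixes if filename.lower().endswith(s.lower())), None)
    if suffix is None or not filename.startswith(prefix):
        return None

    stem = filename[len(prefix):len(filename) - len(suffix)]
    version, found_separator, description = stem.partition(separator)
    if not found_separator or not description:
        return None
    if not re.fullmatch(r"\d+(\.\d+)*", version):
        return None

    return tuple(int(part) for part in version.split(".")), description


print(parse_migration_filename("V1.0__create_template.http"))  # ((1, 0), 'create_template')
print(parse_migration_filename("1.0__create_template.http"))   # None
```

Comparing versions as integer tuples means V1.10 sorts after V1.9, which a plain string comparison would get wrong.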

You’re now ready to execute your first automated change. An example of a migration script has already been created for you, which you can refer to in a previous section. It will create an index template named cicd_template_evolution.

  1. On the Lambda console, navigate to your function.
  2. On the Test tab, choose Test.

After a few seconds, the function should successfully complete. You can review the log output in the Details section, as shown in the following screenshots.

Migration function finished successfully

The index template now exists, and you can check that its configuration is indeed in line with the script, as shown in the following screenshot.

Evolution index template properly created

Clean up

To clean up the resources that were created as part of this post, run the following commands (in the infra folder):

npx cdk destroy --all

Conclusion

In this post, we demonstrated how to automate OpenSearch index templates using CI/CD practices and tools such as Terraform or the Evolution library.

To learn more about OpenSearch Service, refer to the Amazon OpenSearch Service Developer Guide. To further explore the Evolution library, refer to the documentation. To learn more about the Terraform OpenSearch provider, refer to the documentation.

We hope this detailed guide and accompanying code will help you get started. Try it out, let us know your thoughts in the comments section, and feel free to reach out to us for questions!


About the Authors

Camille Birbes is a Senior Solutions Architect with AWS and is based in Hong Kong. He works with major financial institutions to design and build secure, scalable, and highly available solutions in the cloud. Outside of work, Camille enjoys any form of gaming, from board games to the latest video game.

Sriharsha Subramanya Begolli works as a Senior Solutions Architect with AWS, based in Bengaluru, India. His primary focus is assisting large enterprise customers in modernizing their applications and developing cloud-based systems to meet their business objectives. His expertise lies in the domains of data and analytics.

How to enhance Amazon Macie data discovery capabilities using Amazon Textract

Post Syndicated from ZhiWei Huang original https://aws.amazon.com/blogs/security/how-to-enhance-amazon-macie-data-discovery-capabilities-using-amazon-textract/

Amazon Macie is a managed service that uses machine learning (ML) and deterministic pattern matching to help discover sensitive data that’s stored in Amazon Simple Storage Service (Amazon S3) buckets. Macie can detect sensitive data in many different formats, including commonly used compression and archive formats. However, Macie doesn’t support the discovery of sensitive data within images, audio, video, or other types of multimedia content. Customers often ask how to effectively detect whether there’s sensitive data in images. This can be a significant challenge for organizations, especially those operating in highly regulated industries with strict data protection requirements.

In this post, we show you how to gain visibility of sensitive data embedded in images that are stored within your S3 buckets by adding an additional conversion layer to extract image-based data into a format supported by Macie. The solution also uses the recommended set of managed identifiers and custom data identifiers supported by Macie to cover most use cases.

Solution overview

In this section, we walk through the components of the solution. The solution is deployed using AWS Serverless Application Model (AWS SAM), which is an open source framework for building serverless applications. AWS SAM helps to organize related components and operate on a single stack. When used together with the AWS SAM CLI, it’s a useful tool for developing, testing, and building serverless applications on AWS. We provide an AWS SAM template that you can use to set up the required services and AWS Lambda functions. Figure 1 illustrates the architecture of the solution.

Figure 1: Solution architecture and workflow

The solution workflow is as follows:

  1. A user uploads images that might contain sensitive data into the S3 bucket.
  2. After you have verified that potentially sensitive data has been uploaded into the S3 bucket, you can manually invoke the Lambda function textract-trigger to start the process. This function calls Amazon Textract asynchronously to process files in the S3 bucket with filename extensions such as .png, .jpg, and .jpeg. Amazon Textract creates a job for each image and extracts the text found in each image.
  3. Because the operation is asynchronous, the job ID and status of each call are stored in an Amazon DynamoDB table to track the status of jobs and make sure that all of the jobs are completed before Macie is triggered to scan the S3 bucket.
  4. The resulting JSON file from the Amazon Textract job is stored within the same S3 bucket as the original image.
  5. For each analysis job, Amazon Textract sends a job completion notification to the registered Amazon Simple Notification Service (Amazon SNS) topic AmazonTextractJobSNSTopic. The Lambda function macie-trigger is subscribed to the SNS topic and triggered every time an SNS message is received for a completed Amazon Textract analysis job.
  6. Further post-processing is done by the Lambda function macie-trigger to extract the values from the JSON file into a text file. This text file is then uploaded into the same S3 bucket.
  7. The function then checks for other in-progress Amazon Textract jobs in the DynamoDB table. If there are pending jobs, the function exits and waits to be triggered again.
  8. After all of the Amazon Textract jobs are marked as complete in the DynamoDB table, the Lambda function macie-trigger creates a Macie classification job.
  9. Macie then scans the bucket for sensitive data based on managed identifiers and your custom data identifiers.
  10. Macie will continuously publish the classification job status to Amazon CloudWatch Logs.
  11. It might take some time to scan all the files in the S3 bucket, and you will be notified through SNS email when the Macie job is completed. The Lambda function MacieCompletedSNSLambda will filter for completed job status and send an email notification using the SNS topic MacieSnsTopic.
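
Steps 6 through 8 of the workflow can be sketched as follows. This Python example uses an in-memory dictionary in place of the DynamoDB table and a callback in place of the Macie API call; the function names are hypothetical, and the code is not taken from the solution's Lambda functions.

```python
# Sketch: post-process a Textract result and start Macie only when every
# tracked job has finished. A dict stands in for the DynamoDB table.

def extract_lines(textract_result: dict) -> str:
    """Join the text of LINE blocks from a Textract response into plain text."""
    return "\n".join(
        block["Text"]
        for block in textract_result.get("Blocks", [])
        if block.get("BlockType") == "LINE" and "Text" in block
    )


def all_jobs_complete(job_table: dict) -> bool:
    """True when no job is still marked IN_PROGRESS."""
    return all(status != "IN_PROGRESS" for status in job_table.values())


def on_job_completed(job_table: dict, job_id: str, start_macie_job, status: str = "SUCCEEDED") -> bool:
    """Record a finished job; start the Macie scan if it was the last one."""
    job_table[job_id] = status
    if all_jobs_complete(job_table):
        start_macie_job()
        return True
    return False  # other jobs are pending; wait for the next notification


jobs = {"job-1": "IN_PROGRESS", "job-2": "IN_PROGRESS"}
started = []
on_job_completed(jobs, "job-1", lambda: started.append("macie"))
on_job_completed(jobs, "job-2", lambda: started.append("macie"))
print(started)  # the scan starts once, after the second job completes
```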

When deploying the solution, you can specify an existing S3 bucket in your AWS account that’s already storing data that might be sensitive or deploy a new S3 bucket as part of the setup. If you specify an existing S3 bucket, make sure that there are no additional statements in the bucket policy or KMS key policy that will deny the relevant solution components access to the S3 bucket. If no existing S3 bucket is specified, a new S3 bucket will be created with the name s3-with-sensitive-data-<account-id>-<random-string>.

Prerequisites

Before deploying the solution, make sure the following prerequisites are in place.

  • Enable Macie in your account. For instructions, see Getting Started with Amazon Macie.
  • Determine the regular expression (regex) pattern for sensitive textual data that you want Macie to detect. This will allow you to create custom data identifiers that complement the managed data identifiers provided by Macie. For more information, see Building custom data identifiers in Amazon Macie. There’s an example in the pre-deployment steps that you can follow, with the sample images that come with the solution.
  • Make sure that you have the permissions to deploy the AWS services detailed in the solution: Lambda, Amazon S3, Amazon Textract, Amazon SNS, Macie, Amazon CloudWatch, and DynamoDB.
  • Install the AWS SAM CLI, which you will use to deploy the solution. To learn more about how AWS SAM works, see The AWS SAM project and AWS SAM template.

Pre-deployment steps

With the prerequisites in place, you need to set up one or more custom identifiers through the AWS Management Console for Macie before you can deploy the solution. Use the following steps to set up an example custom identifier for the images provided in this post.

To set up custom identifiers:

  1. Navigate to the Amazon Macie console.
  2. Choose Custom data identifiers in the navigation pane, and then choose Create.
  3. Enter a name and description for the custom identifier, such as the following examples:
    1. Name: Singapore NRIC Number
    2. Description: This expression can be used to find or validate a Singapore NRIC Number that begins with the character S, F, T, or G, followed by seven digits and ending with any character from A to Z.
  4. For Regular expression, enter: [SFTG]\d{7}[A-Z].
  5. For Keywords, enter: Singapore, Identity, Card.
    Keywords are important because they can help to improve the accuracy of the detection and refine the results.
  6. Leave the other fields as default and choose Submit.
  7. Navigate to the newly created custom identifier and note the ID. This ID is required as an input when deploying the AWS SAM solution.
    Figure 2: ID of a newly created Macie custom identifier

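
Before creating the custom data identifier, you can try the regular expression locally against sample text. The following Python sketch tests only the pattern itself; it doesn't model how Macie applies keywords, and the sample values are made up.

```python
# Sketch: try the custom data identifier regex on sample text.
# Only the pattern is tested; Macie's keyword handling is not modeled.
import re

NRIC_PATTERN = re.compile(r"[SFTG]\d{7}[A-Z]", re.ASCII)


def find_nric_candidates(text: str) -> list:
    """Return every substring that matches the NRIC pattern."""
    return NRIC_PATTERN.findall(text)


print(find_nric_candidates("Identity Card No. S1234567D"))  # ['S1234567D']
print(find_nric_candidates("Reference X1234567D"))          # []
```

The pattern checks only the shape of the value, not the NRIC checksum letter, so expect some false positives that the keywords help to reduce.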

Deploy the solution

With the prerequisites in place and pre-deployment steps complete, you’re ready to deploy the solution.

To deploy the solution:

  1. Open a CLI window, navigate to your preferred local directory, and run git clone https://github.com/aws-samples/enhancing-macie-with-textract.
  2. Navigate to this directory by using cd enhancing-macie-with-textract.
  3. Run sam deploy --guided and follow the step-by-step instructions to indicate the deployment details such as the desired CloudFormation stack name, AWS Region, and other details. The following are descriptions of some of the requested parameters:
    • ExistingS3BucketName: This is the name of the S3 bucket that you want the solution to scan. This is an optional parameter. If it’s left blank, the solution will create an S3 bucket for you to store the objects that you want to scan.
    • MacieCustomCustomIdentifierIDList: This is the ID that you noted in the final pre-deployment step. Use this field to enter a list of custom identifiers for Macie to detect with. If there is more than one ID, each ID should be separated by a comma (for example, 59fd2814-0ba8-41cc-adb2-1ffec6a0bb3c, 665cf948-ea30-42df-9f63-9a858cbfe1a8).
    • EmailAddress: This is the email address that you want Amazon SNS email notifications to be sent to when a Macie job is complete.
    • MacieLogGroupExists: This checks if you have an existing Macie CloudWatch Log Group (/aws/macie/classificationjobs). If this is your first time running a Macie job, enter No or n. Otherwise, enter Yes or y.
  4. When completed, a confirmation request will be presented for the creation of the required resources. AWS SAM creates a default S3 bucket to store the necessary resources and then proceeds to the deployment prompt. Enter y to deploy and wait for deployment to complete.
  5. After deployment is complete, you should see the following output: Successfully created/updated stack – {StackName} in {AWSRegion}. You can review the resources and stack in the CloudFormation console.
    Figure 3: CloudFormation console of the deployed stack

  6. An email will be sent from Amazon SNS to the email address that you entered in step 3. Choose Confirm subscription to allow SNS to send you Macie job completion emails.
    Figure 4: Sample email from Amazon SNS for subscription confirmation

Test the solution

With the solution deployed, use a set of sample images to verify that it can detect sensitive data within images.

To test the solution:

  1. Use the Amazon S3 console to navigate to the bucket you specified during deployment. If you didn’t specify an S3 bucket to scan, look for a new bucket named s3-with-sensitive-data-<account-id>-<random-string>.
  2. In your project directory, there are sample images in sample-images.zip. Unzip the file and upload the sample images into the S3 bucket. The sample images include a US driver’s license, social security card, passport, and a Singapore National Registration Identity Card (NRIC).
  3. Navigate to the AWS Lambda console and select the {StackName}-TextractTriggerLambda-<random-string> function.
  4. Choose the Test tab and then choose Test to start the automated sensitive data discovery process for the uploaded images.
    Figure 5: Trigger an Amazon Textract scan on all images in the S3 bucket

  5. The whole process will take about 15 minutes to complete. You will receive an email notification after the Macie scan is completed.
    Figure 6: Sample email from Amazon SNS for Macie job completion

  6. Navigate to the Amazon Macie console and select Jobs in the navigation pane. You should see the job Scan for [number of] objects [datetime stamp] that matches the job name shown in the email notification.
  7. In the details panel, choose Show results and then choose Show findings.
    Figure 7: Show Macie data discovery job findings

  8. You will see the findings related to the Macie sensitive data discovery job ID that you selected.
    Figure 8: Findings from the data discovery job

Understanding the findings

In this section, we take a closer look at each finding.

  1. In the console, look in the Resources affected column for the finding that ends with singapore-pink-nric-postprocessed.txt and select it. The finding type SensitiveData:S3Object/CustomIdentifier means that the resource contains text that matches the detection criteria of a custom data identifier. The other finding types in this example are from managed data identifiers. See Types of sensitive data findings for more information about Macie finding types.
  2. In the finding information panel, you can also see:
    1. In the Overview section, the resource indicates which resource contains sensitive data. The resource identifies the text file; however, you can identify the original image file because it has the same object name (other than the file type).
    2. In the Custom data identifiers section, you can see the type of sensitive data found. In this case, the finding involves data that matches the regex of a Singapore NRIC.

By using this solution, you can use Macie to detect sensitive data within the images in your S3 bucket and identify which image each finding corresponds to.

Using the solution

In this post, you have configured a single custom data identifier and a recommended set of managed identifiers. However, you can create and use multiple custom data identifiers in the solution by providing them as a comma-separated list, as mentioned in step 3 of Deploy the solution.
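
If you manage several custom data identifiers, it can help to validate the comma-separated list before passing it to the deployment. The following Python sketch assumes that the IDs are UUID-formatted, as in the examples shown in this post; the function name is hypothetical.

```python
# Sketch: split and validate a comma-separated list of custom data
# identifier IDs. Assumes IDs are UUID-formatted, as in the post's examples.
import uuid


def parse_identifier_ids(raw: str) -> list:
    """Return the cleaned list of IDs; raise ValueError on a malformed ID."""
    ids = []
    for part in raw.split(","):
        candidate = part.strip()
        if not candidate:
            continue  # tolerate trailing commas and extra spaces
        uuid.UUID(candidate)  # raises ValueError if not a valid UUID
        ids.append(candidate)
    return ids


print(parse_identifier_ids(
    "59fd2814-0ba8-41cc-adb2-1ffec6a0bb3c, 665cf948-ea30-42df-9f63-9a858cbfe1a8"
))
```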

This solution has been designed to enable sensitive data discovery of text in image objects within a single S3 bucket. To expand the scope to include multiple S3 buckets, some additional code and permission changes are required to allow the Lambda functions to process and access multiple existing S3 buckets.

It’s important to note the language capabilities of Amazon Textract. Amazon Textract can extract printed text and handwriting from the standard English alphabet and ASCII symbols. Currently, Amazon Textract supports extraction in English, German, French, Italian, and Portuguese. For more information on what textual information Amazon Textract can identify, see Amazon Textract FAQs.

Clean up the resources

To clean up the resources that you created for this example:

  1. If you didn’t set up the scan on your own S3 bucket, empty the S3 bucket that was created as part of the solution. Open the Amazon S3 console, search for the bucket name s3-with-sensitive-data-<account-id>-<random-string> and choose Empty.
  2. Use one of the following methods to delete the CloudFormation stack:
    • Use the CloudFormation console to delete the stack.
    • Use AWS SAM CLI to run sam delete in your terminal. Follow the instructions and enter y when prompted to delete the stack.

Conclusion

In this post, you learned how to enhance the capabilities of Amazon Macie to conduct sensitive data discovery within image files. With this solution, you can extend the benefits of Amazon Macie beyond structured file formats.

If you want to extend the benefits of Amazon Macie to scan your databases for sensitive data, you might find these blog posts useful:

If you have feedback about this post, submit comments in the Comments section. If you have questions about this post, start a new thread on the AWS Security, Identity, & Compliance re:Post or contact AWS Support.

ZhiWei Huang

ZhiWei is a Financial Services Solutions Architect at AWS. He works with FSI customers across the ASEAN region, providing guidance for establishing robust security controls and networking foundations as customers build on and scale with AWS. Outside of work, he finds joy in travelling the world and spending quality time with his family.

Edmund Yeo

Edmund is a Security Solutions Architect based in Singapore. He works with customers across the ASEAN region, helping them to continually raise their security bar and posture so that they can innovate rapidly and securely. Outside of work, Edmund enjoys having a cup of coffee, seeing the world, and bouldering.

Ying Ting Ng

Ying Ting is an Associate Security Solutions Architect at AWS, where she supports ASEAN growth customers in scaling securely on the cloud. She also advises on architectural best practices to help customers meet industry compliance standards. An active member in Amazon Women in Security, Ying Ting shares insights on making an impact as an early-career cybersecurity professional.

Implement a custom subscription workflow for unmanaged Amazon S3 assets published with Amazon DataZone

Post Syndicated from Somdeb Bhattacharjee original https://aws.amazon.com/blogs/big-data/implement-a-custom-subscription-workflow-for-unmanaged-amazon-s3-assets-published-with-amazon-datazone/

Organizational data is often fragmented across multiple lines of business, leading to inconsistent and sometimes duplicate datasets. This fragmentation can delay decision-making and erode trust in available data. Amazon DataZone, a data management service, helps you catalog, discover, share, and govern data stored across AWS, on-premises systems, and third-party sources. Although Amazon DataZone automates subscription fulfillment for structured data assets—such as data stored in Amazon Simple Storage Service (Amazon S3), cataloged with the AWS Glue Data Catalog, or stored in Amazon Redshift—many organizations also rely heavily on unstructured data. For these customers, extending the streamlined data discovery and subscription workflows in Amazon DataZone to unstructured data, such as files stored in Amazon S3, is critical.

For example, Genentech, a leading biotechnology company, has vast sets of unstructured gene sequencing data organized across multiple S3 buckets and prefixes. They need to enable direct access to these data assets for downstream applications efficiently, while maintaining governance and access controls.

In this post, we demonstrate how to implement a custom subscription workflow using Amazon DataZone, Amazon EventBridge, and AWS Lambda to automate the fulfillment process for unmanaged data assets, such as unstructured data stored in Amazon S3. This solution enhances governance and simplifies access to unstructured data assets across the organization.

Solution overview

For our use case, the data producer has unstructured data stored in S3 buckets, organized with S3 prefixes. We want to publish this data to Amazon DataZone as discoverable S3 data. On the consumer side, users need to search for these assets, request subscriptions, and access the data within an Amazon SageMaker notebook, using their own custom AWS Identity and Access Management (IAM) roles.

The proposed solution involves creating a custom subscription workflow that uses the event-driven architecture of Amazon DataZone. Amazon DataZone keeps you informed of key activities (events) within your data portal, such as subscription requests, updates, comments, and system events. These events are delivered through the EventBridge default event bus.

An EventBridge rule captures subscription events and invokes a custom Lambda function. This Lambda function contains the logic to manage access policies for the subscribed unmanaged asset, automating the subscription process for unstructured S3 assets. This approach streamlines data access while ensuring proper governance.

To learn more about working with events using EventBridge, refer to Events via Amazon EventBridge default bus.

The solution architecture is shown in the following diagram.

Custom subscription workflow architecture diagram

To implement the solution, we complete the following steps:

  1. As a data producer, publish an unstructured S3-based data asset as S3ObjectCollectionType to Amazon DataZone.
  2. For the consumer, create a custom AWS service environment in the consumer Amazon DataZone project and add a subscription target for the IAM role attached to a SageMaker notebook instance. Now, as a consumer, request access to the unstructured asset published in the previous step.
  3. When the request is approved, capture the subscription created event using an EventBridge rule.
  4. Invoke a Lambda function as the target for the EventBridge rule and pass the event payload to it.
  5. The Lambda function does two things:
    1. Fetches the asset details, including the Amazon Resource Name (ARN) of the S3 published asset and the IAM role ARN from the subscription target.
    2. Uses the information to update the S3 bucket policy, granting List/Get access to the IAM role.

Prerequisites

To follow along with the post, you should have an AWS account. If you don’t have one, you can sign up for one.

For this post, we assume you know how to create an Amazon DataZone domain and Amazon DataZone projects. For more information, see Create domains and Working with projects and environments in Amazon DataZone.

Also, for simplicity, we use the same IAM role for the Amazon DataZone admin (creating domains) as well as the producer and consumer personas.

Publish unstructured S3 data to Amazon DataZone

We have uploaded some sample unstructured data into an S3 bucket. This is the data that will be published to Amazon DataZone. You can use any unstructured data, such as an image or text file.

On the Properties tab of the S3 folder, note the ARN of the S3 bucket prefix.

Complete the following steps to publish the data:

  1. Create an Amazon DataZone domain in the account and navigate to the domain portal using the link for Data portal URL.

DataZone domain creation

  1. Create a new Amazon DataZone project (for this post, we name it unstructured-data-producer-project) for publishing the unstructured S3 data asset.
  2. On the Data tab of the project, choose Create data asset.

Data asset creation

  1. Enter a name for the asset.
  2. For Asset type, choose S3 object collection.
  3. For S3 location ARN, enter the ARN of the S3 prefix.

After you create the asset, you can add glossaries or metadata forms, but it’s not necessary for this post. Publish the data asset to make it discoverable within the Amazon DataZone portal.

Set up the SageMaker notebook and SageMaker instance IAM role

Create an IAM role which will be attached to the SageMaker notebook instance. For the trust policy, allow SageMaker to assume this role and leave the Permissions tab blank. We refer to this role as the instance-role throughout the post.

SageMaker instance role
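
The trust policy described above can be sketched as a Python dictionary. This is a minimal sketch, assuming the standard SageMaker service principal (sagemaker.amazonaws.com); the function name is hypothetical and not part of the original post.

```python
import json


def sagemaker_trust_policy() -> dict:
    """Trust policy that lets the SageMaker service assume the instance-role."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                # Assumption: the standard SageMaker service principal
                "Principal": {"Service": "sagemaker.amazonaws.com"},
                "Action": "sts:AssumeRole",
            }
        ],
    }


if __name__ == "__main__":
    # Print the document so it can be pasted into the IAM console
    print(json.dumps(sagemaker_trust_policy(), indent=2))
```

No permissions policy is attached at this stage, because access to the S3 data is granted later through the bucket policy rather than through the role itself.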

Next, create a SageMaker notebook instance from the SageMaker console. Attach the instance-role to the notebook instance.

SageMaker instance

Set up the consumer Amazon DataZone project, custom AWS service environment, and subscription target

Complete the following steps:

  1. Log in to the Amazon DataZone portal and create a consumer project (for this post, we call it custom-blueprint-consumer-project), which will be used by the consumer persona to subscribe to the unstructured data asset.

Custom blueprint project name

We use the recently launched custom blueprints for AWS services for creating the environment in this consumer project. The custom blueprint allows you to bring your own environment IAM role to integrate your existing AWS resources with Amazon DataZone. For this post, we create a custom environment to directly integrate SageMaker notebook access from the Amazon DataZone portal.

  1. Before you create the custom environment, create the environment IAM role that will be used in the custom blueprint. The role should have a trust policy as shown in the following screenshot. For the permissions, attach the AWS managed policy AmazonSageMakerFullAccess. We refer to this role as the environment-role throughout the post.

Custom Environment role

  1. To create the custom environment, first enable the Custom AWS Service blueprint on the Amazon DataZone console.

Enable custom blueprint

  1. Open the blueprint to create a new environment as shown in the following screenshot.
  2. For Owning project, use the consumer project that you created earlier and for Permissions, use the environment-role.

Custom environment project and role

  1. After you create the environment, open it to create a customized URL for the SageMaker notebook access.

SageMaker custom URL

  1. Create a new custom AWS link and enter the URL from the SageMaker notebook.

You can find it by navigating to the SageMaker console and choosing Notebooks in the navigation pane.

  1. Choose Customize to add the custom link.

Add the custom link

  1. Next, create a subscription target in the custom environment to pass the instance role that needs access to the unstructured data.

A subscription target is an Amazon DataZone engineering concept that allows Amazon DataZone to fulfill subscription requests for managed assets by granting access based on the information defined in the target like domain-id, environment-id, or authorized-principals.

Currently, creation of subscription targets is only allowed using the AWS Command Line Interface (AWS CLI). You can use the command create-subscription-target to create the subscription target.

The following is an example JSON payload for the subscription target creation. Create it as a JSON file on your workstation (for this post, we call it blog-sub-target.json). Replace the domain ID and the environment ID with the corresponding values for your domain and environment.

{
  "domainIdentifier": "<<your-domain-id>>",
  "environmentIdentifier": "<<your-environment-id>>",
  "name": "custom-s3-target-consumerenv",
  "type": "GlueSubscriptionTargetType",
  "manageAccessRole": "<<provide the environment-role here>>",
  "applicableAssetTypes": ["S3ObjectCollectionAssetType"],
  "provider": "Custom Provider",
  "authorizedPrincipals": ["<<provide the instance-role here>>"],
  "subscriptionTargetConfig": [{
    "formName": "GlueSubscriptionTargetConfigForm",
    "content": "{\"databaseName\":\"customdb1\"}"
  }]
}

You can get the domain ID from the user name button in the upper right of the Amazon DataZone data portal; it’s in the format dzd_<<some-random-characters>>.

For the environment ID, you can find it on the Settings tab of the environment within your consumer project.
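
If you prefer to generate the payload rather than edit it by hand, the following is a minimal sketch that builds the same structure in Python. The function name and the example IDs and ARNs are hypothetical placeholders; the field names and fixed values are taken from the JSON shown earlier.

```python
import json


def build_subscription_target(domain_id, environment_id,
                              environment_role_arn, instance_role_arn):
    """Build the create-subscription-target payload used in this post."""
    return {
        "domainIdentifier": domain_id,
        "environmentIdentifier": environment_id,
        "name": "custom-s3-target-consumerenv",
        "type": "GlueSubscriptionTargetType",
        "manageAccessRole": environment_role_arn,
        "applicableAssetTypes": ["S3ObjectCollectionAssetType"],
        "provider": "Custom Provider",
        "authorizedPrincipals": [instance_role_arn],
        "subscriptionTargetConfig": [{
            "formName": "GlueSubscriptionTargetConfigForm",
            # The form content is itself a JSON string; json.dumps handles the escaping
            "content": json.dumps({"databaseName": "customdb1"}),
        }],
    }


if __name__ == "__main__":
    payload = build_subscription_target(
        domain_id="dzd_example123",        # hypothetical domain ID
        environment_id="exampleenv456",    # hypothetical environment ID
        environment_role_arn="arn:aws:iam::111122223333:role/environment-role",
        instance_role_arn="arn:aws:iam::111122223333:role/instance-role",
    )
    # Save this output as blog-sub-target.json
    print(json.dumps(payload, indent=2))
```

Generating the nested content string with json.dumps avoids hand-escaping the inner quotes, which is an easy place to introduce a malformed payload.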

  1. Open an AWS CloudShell environment and upload the JSON payload file using the Actions option in the CloudShell terminal.
  2. You can now create a new subscription target using the following AWS CLI command:

aws datazone create-subscription-target --cli-input-json file://blog-sub-target.json

Create subscription target

  1. To verify the subscription target was created successfully, run the list-subscription-targets command from the AWS CloudShell environment:

aws datazone list-subscription-targets --domain-identifier <<domain-id>> --environment-identifier <<environment-id>>

Create a function to respond to subscription events

Now that you have the consumer environment and subscription target set up, the next step is to implement a custom workflow for handling subscription requests.

The simplest mechanism to handle subscription events is a Lambda function. The exact implementation may vary based on environment; for this post, we walk through the steps to create a simple function to handle subscription creation and cancellation.

  1. On the Lambda console, choose Functions in the navigation pane.
  2. Choose Create function.
  3. Select Author from scratch.
  4. For Function name, enter a name (for example, create-s3policy-for-subscription-target).
  5. For Runtime, choose Python 3.12.
  6. Choose Create function.

Author Lambda function

This should open the Code tab for the function and allow editing of the Python code for the function. Let’s look at some of the key components of a function to handle the subscription for unmanaged S3 assets.

Handle only relevant events

When the function gets invoked, we check to make sure it’s one of the events that’s relevant for managing access. Otherwise, the function can simply return a message without taking further action.

def lambda_handler(event, context):
    # Get the basic info about the event
    event_detail = event['detail']

    # Make sure it's one of the events we're interested in
    event_source = event['source']
    event_type = event['detail-type']

    if event_source != 'aws.datazone':
        return '{"Response" : "Not a DataZone event"}'
    elif event_type not in ['Subscription Created', 'Subscription Cancelled', 
                               'Subscription Revoked']:
        return '{"Response" : "Not a subscription created, cancelled, or revoked event"}'
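
The same check can be pulled out into a small helper so it is easy to exercise without deploying anything. This is a sketch only; the helper name and the sample events are hypothetical and carry just the two fields the check reads.

```python
# Detail types the handler acts on, matching the check above
RELEVANT_DETAIL_TYPES = {
    "Subscription Created",
    "Subscription Cancelled",
    "Subscription Revoked",
}


def is_relevant_event(event: dict) -> bool:
    """Return True only for DataZone subscription events the handler manages."""
    return (
        event.get("source") == "aws.datazone"
        and event.get("detail-type") in RELEVANT_DETAIL_TYPES
    )


if __name__ == "__main__":
    created = {"source": "aws.datazone", "detail-type": "Subscription Created"}
    other_source = {"source": "aws.s3", "detail-type": "Subscription Created"}
    print(is_relevant_event(created))       # True
    print(is_relevant_event(other_source))  # False
```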

These subscription events should include both the domain ID and a request ID (among other attributes). You can use these to look up the details of the subscription request in Amazon DataZone:

sub_request = dz.get_subscription_request_details(
    domainIdentifier=domain_id,
    identifier=sub_request_id
)
asset_listing = sub_request['subscribedListings'][0]['item']['assetListing']
form_data = json.loads(asset_listing['forms'])
asset_id = asset_listing['entityId']
asset_version = asset_listing['entityRevision']
asset_type = asset_listing['entityType']
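
The snippet above assumes that domain_id and sub_request_id are already set. The following sketch shows one way to pull them out of the event, using the same keys as the completed function later in this post. The helper name and the sample values are hypothetical.

```python
def extract_ids(event: dict) -> dict:
    """Pull the identifiers the handler needs out of a DataZone event."""
    detail = event["detail"]
    return {
        "domain_id": detail["metadata"]["domain"],
        "project_id": detail["metadata"]["owningProjectId"],
        "sub_request_id": detail["data"]["subscriptionRequestId"],
    }


if __name__ == "__main__":
    # Trimmed-down, hypothetical event containing only the fields read above
    sample_event = {
        "source": "aws.datazone",
        "detail-type": "Subscription Created",
        "detail": {
            "metadata": {"domain": "dzd_example123", "owningProjectId": "proj789"},
            "data": {"subscriptionRequestId": "req456"},
        },
    }
    print(extract_ids(sample_event))
```

A missing key raises KeyError here, which makes a malformed event fail loudly instead of silently skipping the policy update.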

Part of the subscription request should include the ARN for the S3 bucket in question, so you can retrieve that:

    # We only want to take action if this is a S3 asset
    if asset_type == 'S3ObjectCollectionAssetType':
        # Get the bucket ARN from the form info for the asset
        bucket_arn = form_data['S3ObjectCollectionForm']['bucketArn']
        
        #Get the principal from the subscription target
        principal = get_principal(domain_id,project_id)

        try:
            # Get the bucket name from the ARN                    
            bucket_name_with_prefix = bucket_arn.split(':')[5]
            bucket_name = bucket_name_with_prefix.split('/')[0]
           
        except IndexError:
            response = '{"Response" : "Could not find bucket name in ARN"}'
            return response
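
The ARN handling can also be written as a standalone helper that returns both the bucket name and the prefix. This is a sketch with a hypothetical function name and example ARNs, not the code the post deploys.

```python
from typing import Tuple


def split_s3_arn(arn: str) -> Tuple[str, str]:
    """Split 'arn:aws:s3:::bucket/prefix/' into (bucket, prefix)."""
    # An S3 ARN has six colon-separated fields; the last one is bucket[/key]
    parts = arn.split(":", 5)
    if len(parts) != 6 or parts[0] != "arn" or parts[2] != "s3" or not parts[5]:
        raise ValueError(f"Not an S3 ARN: {arn!r}")
    bucket, _, prefix = parts[5].partition("/")
    return bucket, prefix


if __name__ == "__main__":
    print(split_s3_arn("arn:aws:s3:::example-bucket/genomics/run1/"))
    print(split_s3_arn("arn:aws:s3:::example-bucket"))
```

Raising ValueError on a malformed ARN plays the same role as the IndexError branch above, while also rejecting strings that happen to contain enough colons but are not S3 ARNs.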

You can also use the Amazon DataZone API calls to get the environment associated with the project making the subscription request for this S3 asset. After retrieving the environment ID, you can check which IAM principals have been authorized to access unmanaged S3 assets using the subscription target:

        list_sub_target = dz.list_subscription_targets(
            domainIdentifier=domain_id,
            environmentIdentifier=environment_id,
            maxResults=50,
            sortBy='CREATED_AT',
            sortOrder='DESCENDING'
            )
        
        print('asset type:', list_sub_target['items'][0]['applicableAssetTypes'])
        
        if list_sub_target['items'][0]['applicableAssetTypes'] == ['S3ObjectCollectionAssetType']:
            role_arn = list_sub_target['items'][0]['authorizedPrincipals']
            print('role arn',role_arn)
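
The snippet above inspects only the first subscription target returned. If an environment might hold more than one target, a sketch like the following scans all of them. The helper name is hypothetical, and it assumes each item carries the applicableAssetTypes and authorizedPrincipals fields used above.

```python
def principals_for_asset_type(items, asset_type="S3ObjectCollectionAssetType"):
    """Collect authorized principals from every target that covers asset_type."""
    principals = []
    for target in items:
        if asset_type in target.get("applicableAssetTypes", []):
            for arn in target.get("authorizedPrincipals", []):
                if arn not in principals:  # keep order, drop duplicates
                    principals.append(arn)
    return principals


if __name__ == "__main__":
    # Hypothetical, trimmed-down 'items' list from list_subscription_targets
    items = [
        {"applicableAssetTypes": ["GlueTableAssetType"],
         "authorizedPrincipals": ["arn:aws:iam::111122223333:role/glue-role"]},
        {"applicableAssetTypes": ["S3ObjectCollectionAssetType"],
         "authorizedPrincipals": ["arn:aws:iam::111122223333:role/instance-role"]},
    ]
    print(principals_for_asset_type(items))
```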

If this is a new subscription, add the relevant IAM principal to the S3 bucket policy by appending a statement that allows the desired S3 actions on this bucket for the new principal:

        if event_type == 'Subscription Created':
            if bucket_arn[-1] == '/':
                statement_block.append({
                    'Sid' : sid_string,
                    'Action': S3_ACTION_STRING,
                    'Resource': [
                        bucket_arn,
                        bucket_arn + '*'
                    ],
                    'Effect': 'Allow',
                    'Principal': {'AWS': principal}
                })

Conversely, if this is a subscription being revoked or cancelled, remove the previously added statement from the bucket policy to make sure the IAM principal no longer has access:

        elif event_type == 'Subscription Cancelled' or event_type == 'Subscription Revoked':
            # Remove the statement from the policy if it's there
            # Made sure to handle case where there's no Sid for a statement
            pruned_statement_block = []
            for statement in statement_block:
                if 'Sid' not in statement or statement['Sid'] != sid_string:
                    pruned_statement_block.append(statement)
            statement_block = pruned_statement_block
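
To see the add and remove steps working together, the following self-contained sketch grants access and then revokes it, leaving the original statements untouched. The helper names, the example bucket, and the example role are hypothetical; the Sid construction follows the same approach as the completed function later in this post.

```python
import copy


def make_sid(principal, sub_request_id):
    """Build an alphanumeric Sid from the principal and the request ID."""
    return "".join(c for c in f"DZ{principal}{sub_request_id}" if c.isalnum())


def grant(statements, principal, sub_request_id, bucket_arn, actions):
    """Return a new statement list with an Allow statement appended."""
    objects_arn = bucket_arn + ("*" if bucket_arn.endswith("/") else "/*")
    new_statement = {
        "Sid": make_sid(principal, sub_request_id),
        "Effect": "Allow",
        "Principal": {"AWS": principal},
        "Action": actions,
        "Resource": [bucket_arn, objects_arn],
    }
    return copy.deepcopy(statements) + [new_statement]


def revoke(statements, principal, sub_request_id):
    """Return the statements without the one added for this subscription."""
    sid = make_sid(principal, sub_request_id)
    # Statements with no Sid are kept, because .get returns None for them
    return [s for s in statements if s.get("Sid") != sid]


if __name__ == "__main__":
    existing = [{"Effect": "Deny", "Principal": "*", "Action": "s3:DeleteBucket",
                 "Resource": "arn:aws:s3:::example-bucket"}]
    role = ["arn:aws:iam::111122223333:role/instance-role"]
    granted = grant(existing, role, "req456",
                    "arn:aws:s3:::example-bucket/data/", ["s3:GetObject"])
    print(len(granted))                               # 2
    print(revoke(granted, role, "req456") == existing)  # True
```

Returning new lists instead of mutating the input keeps the original policy available for comparison or rollback if the write to Amazon S3 fails.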

The completed function should be able to handle adding or removing principals like IAM roles or users to a bucket policy. Be sure to handle cases where there is no existing bucket policy or where a cancellation means removing the only statement in the policy, meaning the entire bucket policy is no longer needed.

The following is an example of a completed function:

import json
import boto3
import os


dz = boto3.client('datazone')
s3 = boto3.client('s3')

# The list of actions to be permitted on the bucket in the newly granted policy
S3_ACTION_STRING = 's3:*'

def build_policy_statements(event_type, statement_block, principal, sub_request_id, bucket_arn):
        # Generate a Sid that should be unique, so the statement can be found
        # again if the subscription is later cancelled or revoked
        sid_string = ''.join(c for c in f'DZ{principal}{sub_request_id}' if c.isalnum())
        # Add a new policy statement that gives the principal access to the whole bucket.
        # If it turns out something other than bucket ARN is allowed in asset, we can
        # get more granular than that
        print('statement block :',statement_block)
        if event_type == 'Subscription Created':
            if bucket_arn[-1] == '/':
                statement_block.append({
                    'Sid' : sid_string,
                    'Action': S3_ACTION_STRING,
                    'Resource': [
                        bucket_arn,
                        bucket_arn + '*'
                    ],
                    'Effect': 'Allow',
                    'Principal': {'AWS': principal}
                })
            else:
                statement_block.append({
                    'Sid' : sid_string,
                    'Action': S3_ACTION_STRING,
                    'Resource': [
                        bucket_arn,
                        bucket_arn + '/*'
                    ],
                    'Effect': 'Allow',
                    'Principal': {'AWS': principal}
                })
        elif event_type == 'Subscription Cancelled' or event_type == 'Subscription Revoked':
            # Remove the statement from the policy if it's there
            # Made sure to handle case where there's no Sid for a statement
            pruned_statement_block = []
            for statement in statement_block:
                if 'Sid' not in statement or statement['Sid'] != sid_string:
                    pruned_statement_block.append(statement)
            statement_block = pruned_statement_block
           

        return statement_block

def lambda_handler(event, context):
    """Lambda function reacting to DataZone subscribe events

    Parameters
    ----------
    event: dict, required
        Event Bridge Events Format

    context: object, required
        Lambda Context runtime methods and attributes

    Returns
    ------
        Simple response indicating success or failure reason
    """
    # Get the basic info about the event
    event_detail = event['detail']

    # Make sure it's one of the events we're interested in
    event_source = event['source']
    event_type = event['detail-type']

    if event_source != 'aws.datazone':
        return '{"Response" : "Not a DataZone event"}'
    elif event_type not in ['Subscription Created', 'Subscription Cancelled', 
                               'Subscription Revoked']:
        return '{"Response" : "Not a subscription created, cancelled, or revoked event"}'

    
    # get the domain_id and other information
    domain_id = event_detail['metadata']['domain']
    project_id = event_detail['metadata']['owningProjectId']
    sub_request_id = event_detail['data']['subscriptionRequestId']
    listing_id = event_detail['data']['subscribedListing']['id']
    listing_version = event_detail['data']['subscribedListing']['version']
    
    print('domain-id',domain_id)
    print('project-id:',project_id)
    
    sub_request = dz.get_subscription_request_details(
        domainIdentifier = domain_id,
        identifier= sub_request_id
    )
   
    # Retrieve info about the asset from the request
    asset_listing = sub_request['subscribedListings'][0]['item']['assetListing']
    form_data = json.loads(asset_listing['forms'])
    asset_id = asset_listing['entityId']
    asset_version = asset_listing['entityRevision']
    asset_type = asset_listing['entityType']

    # We only want to take action if this is a S3 asset
    if asset_type == 'S3ObjectCollectionAssetType':
        # Get the bucket ARN from the form info for the asset
        bucket_arn = form_data['S3ObjectCollectionForm']['bucketArn']
        
        #Get the principal from the subscription target
        principal = get_principal(domain_id,project_id)

        try:
            # Get the bucket name from the ARN                    
            bucket_name_with_prefix = bucket_arn.split(':')[5]
            bucket_name = bucket_name_with_prefix.split('/')[0]
           
        except IndexError:
            response = '{"Response" : "Could not find bucket name in ARN"}'
            return response

        # Get the current bucket policy, or else make a blank one if there currently
        # is no policy
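        try:
            bucket_policy = json.loads(s3.get_bucket_policy(Bucket=bucket_name)['Policy'])
        except s3.exceptions.ClientError as e:
            # Check the error code explicitly so that other errors (for example,
            # AccessDenied) are not mistaken for a missing bucket policy
            if e.response['Error']['Code'] == 'NoSuchBucketPolicy':
                bucket_policy = {'Statement': []}
            else:
                response = '{"Response" : "Could not get bucket policy"}'
                return response
        except Exception:
            response = '{"Response" : "Could not get bucket policy"}'
            return response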
        
        # Gets new policy with the subscribing principal either added or removed based on
        # event type
        new_policy_statements = build_policy_statements(event_type, bucket_policy['Statement'], principal, 
                                               sub_request_id, bucket_arn)

            
        # Write back the new policy. This can fail if the new policy is too big
        # or if for some reason the function role doesn't have rights to do this
        # If we removed the only policy statement, then just delete the policy
        try: 
            if not new_policy_statements:
                s3.delete_bucket_policy(Bucket = bucket_name)
            else:
                bucket_policy['Statement'] = new_policy_statements
                policy_string = json.dumps(bucket_policy)
                print('policy string :',policy_string)
                s3.put_bucket_policy(
                    Bucket=bucket_name,
                    Policy = policy_string
                )
        except Exception as e: 
            response = f'{{"Response" : "Error updating bucket policy: {e.args}"}}'
            return response
        
        # If we got here everything went as planned
        response = f'{{"Response" : "Updated policy for {bucket_name}"}}'
    else:
        response = '{"Response" : "Not an S3 asset"}'


    return response

def get_principal(domain_id,project_id):
    # Call list environments to get the environment id
    listenv_request = dz.list_environments(
        domainIdentifier = domain_id,
        projectIdentifier= project_id
    )
    
    # In our example environment, there is only one of these
    environment_id = listenv_request['items'][0]['id']

    # Get the role we want to give access to from the subscription target info
    list_sub_target = dz.list_subscription_targets(
        domainIdentifier=domain_id,
        environmentIdentifier=environment_id,
        maxResults=50,
        sortBy='CREATED_AT',
        sortOrder='DESCENDING'
    )

    if list_sub_target['items'][0]['applicableAssetTypes'] == ['S3ObjectCollectionAssetType']:
        role_arn = list_sub_target['items'][0]['authorizedPrincipals']
    else:
        role_arn = []

    return role_arn

Because this Lambda function is intended to manage bucket policies, the role assigned to it will need a policy that allows the following actions on any buckets it is intended to manage:

  • s3:GetBucketPolicy
  • s3:PutBucketPolicy
  • s3:DeleteBucketPolicy
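
The three actions above can be expressed as an IAM policy document scoped to specific buckets. The following is a minimal sketch; the function name and bucket name are hypothetical. The function also calls Amazon DataZone APIs, so its role will presumably need permissions for those calls as well, which the post does not list and this sketch leaves out.

```python
import json


def bucket_policy_admin_permissions(bucket_names):
    """IAM policy allowing bucket-policy management on the named buckets only."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": [
                    "s3:GetBucketPolicy",
                    "s3:PutBucketPolicy",
                    "s3:DeleteBucketPolicy",
                ],
                # Bucket-policy actions apply to the bucket ARN itself
                "Resource": [f"arn:aws:s3:::{name}" for name in bucket_names],
            }
        ],
    }


if __name__ == "__main__":
    print(json.dumps(bucket_policy_admin_permissions(["example-bucket"]), indent=2))
```

Listing bucket ARNs explicitly, rather than using a wildcard resource, limits the function to the buckets it is meant to manage.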

Now you have a function that is capable of editing bucket policies to add or remove the principals configured for your subscription targets, but you need something to invoke this function any time a subscription is created, cancelled, or revoked. In the next section, we cover how to use EventBridge to integrate this new function with Amazon DataZone.

Respond to subscription events in EventBridge

Amazon DataZone publishes information about each event that takes place within it to EventBridge. You can watch for any of these events and invoke actions based on matching predefined rules. In this case, we’re interested in asset subscriptions being created, cancelled, or revoked, because those will determine when we grant or revoke access to the data in Amazon S3.

  1. On the EventBridge console, choose Rules in the navigation pane.

The default event bus should automatically be present; we use it for creating the Amazon DataZone subscription rule.

  1. Choose Create rule.
  2. In the Rule detail section, enter the following:
    1. For Name, enter a name (for example, DataZoneSubscriptions).
    2. For Description, enter a description that explains the purpose of the rule.
    3. For Event bus, choose default.
    4. Turn on Enable the rule on the selected event bus.
    5. For Rule type, select Rule with an event pattern.
  3. Choose Next.

EventBridge rule

  1. In the Event source section, select AWS Events or EventBridge partner events as the source of the events.

Define Event source

  1. In the Creation method section, select Custom Pattern (JSON editor) to enable exact specification of the events needed for this solution.

Choose custom pattern

  1. In the Event pattern section, enter the following code:

{
  "detail-type": ["Subscription Created", "Subscription Cancelled", "Subscription Revoked"],
  "source": ["aws.datazone"]
}

Define custom pattern JSON

  1. Choose Next.

Now that we’ve defined the events to watch for, we can make sure those Amazon DataZone events get sent to the Lambda function we defined in the previous section.

  1. On the Select target(s) page, enter the following for Target 1:
    1. For Target types, select AWS service.
    2. For Select a target, choose Lambda function.
    3. For Function, choose create-s3policy-for-subscription-target.
  2. Choose Skip to Review and create.

Define event target

  1. On the Review and create page, choose Create rule.
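
The console steps above can also be approximated programmatically. The following sketch only builds the keyword arguments for the EventBridge put_rule and put_targets calls and the Lambda add_permission call; it does not call AWS itself. The helper names, the target Id, and the statement ID are hypothetical. The add_permission step is included because the console normally adds that invoke permission for you, whereas a scripted setup has to add it explicitly.

```python
import json

EVENT_PATTERN = {
    "detail-type": ["Subscription Created", "Subscription Cancelled",
                    "Subscription Revoked"],
    "source": ["aws.datazone"],
}


def put_rule_kwargs(rule_name="DataZoneSubscriptions"):
    """Arguments for events.put_rule on the default event bus."""
    return {
        "Name": rule_name,
        "EventBusName": "default",
        "EventPattern": json.dumps(EVENT_PATTERN),
        "State": "ENABLED",
        "Description": "Route DataZone subscription events to the policy handler",
    }


def put_targets_kwargs(rule_name, function_arn):
    """Arguments for events.put_targets pointing the rule at the function."""
    return {
        "Rule": rule_name,
        "EventBusName": "default",
        "Targets": [{"Id": "subscription-handler", "Arn": function_arn}],
    }


def add_permission_kwargs(function_name, rule_arn):
    """Arguments for lambda.add_permission so EventBridge can invoke the function."""
    return {
        "FunctionName": function_name,
        "StatementId": "AllowEventBridgeInvoke",
        "Action": "lambda:InvokeFunction",
        "Principal": "events.amazonaws.com",
        "SourceArn": rule_arn,
    }
```

With boto3, these would be passed as, for example, boto3.client('events').put_rule(**put_rule_kwargs()), followed by put_targets and then add_permission on a Lambda client, using the rule ARN returned by put_rule.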

Subscribe to the unstructured data asset

Now that you have the custom subscription workflow in place, you can test the workflow by subscribing to the unstructured data asset.

  1. In the Amazon DataZone portal, search for the unstructured data asset you published by browsing the catalog.

Search unstructured asset

  1. Subscribe to the unstructured data asset using the consumer project, which starts the Amazon DataZone approval workflow.

Subscribe to unstructured asset

  1. You should get a notification for the subscription request; follow the link and approve it.

When the subscription is approved, it will invoke the custom EventBridge Lambda workflow, which will create the S3 bucket policies for the instance role to access the S3 object. You can verify that by navigating to the S3 bucket and reviewing the permissions.
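
If you would rather check the result in code than in the console, the following sketch looks for an Allow statement that names the instance role in a bucket policy document. The helper name and example values are hypothetical. It only checks that the principal is present; it does not evaluate the effective permissions.

```python
def principal_has_allow(policy: dict, role_arn: str) -> bool:
    """Return True if any Allow statement lists role_arn as an AWS principal."""
    for statement in policy.get("Statement", []):
        if statement.get("Effect") != "Allow":
            continue
        principal = statement.get("Principal", {})
        aws = principal.get("AWS", []) if isinstance(principal, dict) else []
        if isinstance(aws, str):  # a single principal may be a plain string
            aws = [aws]
        if role_arn in aws:
            return True
    return False


if __name__ == "__main__":
    role = "arn:aws:iam::111122223333:role/instance-role"
    policy = {"Statement": [{
        "Sid": "DZexample",
        "Effect": "Allow",
        "Principal": {"AWS": [role]},
        "Action": "s3:GetObject",
        "Resource": ["arn:aws:s3:::example-bucket/data/*"],
    }]}
    print(principal_has_allow(policy, role))  # True
```

The policy argument is the parsed JSON document, for example the result of json.loads on the Policy field returned by the S3 get_bucket_policy call.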

Access the subscribed asset from the Amazon DataZone portal

Now that the consumer project has been given access to the unstructured asset, you can access it from the Amazon DataZone portal.

  1. In the Amazon DataZone portal, open the consumer project and navigate to the Environments
  2. Choose the SageMaker-Notebook

Choose SageMaker notebook on the consumer project

  1. In the confirmation pop-up, choose Open custom.

Choose Custom

This will redirect you to the SageMaker notebook assuming the environment role. You can see the SageMaker notebook instance.

  1. Choose Open JupyterLab.

Open JupyterLab Notebook

  1. Choose conda_python3 to launch a new notebook.

Launch Notebook

  1. Add code to run get_object on the unstructured S3 data that you subscribed to earlier and run the cells.

Now, because the S3 bucket policy has been updated to allow the instance role access to the S3 objects, you should see the get_object call return an HTTPStatusCode of 200.
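
A notebook cell for this check could look like the following sketch. The helper takes the S3 client as a parameter so it can be exercised without AWS access; the function name is hypothetical.

```python
def fetch_status(s3_client, bucket: str, key: str) -> int:
    """Call get_object and return the HTTP status code from the response."""
    response = s3_client.get_object(Bucket=bucket, Key=key)
    return response["ResponseMetadata"]["HTTPStatusCode"]
```

In the notebook, you would pass a real client and the location of one of your own objects, for example fetch_status(boto3.client('s3'), '<<your-bucket>>', '<<your-prefix>>/<<your-file>>'). If the bucket policy has not been updated, the call raises an access-denied error instead of returning a status code.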

Multi-account implementation

In the instructions so far, we’ve deployed everything in a single AWS account, but in larger organizations, resources can be distributed throughout AWS accounts, often managed by AWS Organizations. The same pattern can be applied in a multi-account environment, with some minor additions. Instead of directly acting on a bucket, the Lambda function in the domain account can assume a role in other accounts that contain S3 buckets to be managed. In each account with an S3 bucket containing assets, create a role that allows editing the bucket policy and has a trust policy referencing the Lambda role in the domain account as a principal.
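
The cross-account pattern described above can be sketched as follows. The first helper builds the trust policy for the role in the bucket-owning account, and the second turns an STS assume_role response into client arguments. The helper names, session name, and example ARNs are hypothetical, and the STS client is passed in so the sketch does not call AWS on its own.

```python
def cross_account_trust_policy(lambda_role_arn: str) -> dict:
    """Trust policy letting the Lambda role in the domain account assume this role."""
    return {
        "Version": "2012-10-17",
        "Statement": [{
            "Effect": "Allow",
            "Principal": {"AWS": lambda_role_arn},
            "Action": "sts:AssumeRole",
        }],
    }


def assumed_role_client_kwargs(sts_client, role_arn: str,
                               session_name: str = "datazone-subscription-handler") -> dict:
    """Assume the bucket-management role and return credentials as client kwargs."""
    credentials = sts_client.assume_role(
        RoleArn=role_arn,
        RoleSessionName=session_name,
    )["Credentials"]
    return {
        "aws_access_key_id": credentials["AccessKeyId"],
        "aws_secret_access_key": credentials["SecretAccessKey"],
        "aws_session_token": credentials["SessionToken"],
    }
```

Inside the Lambda function, the returned dictionary could be unpacked into boto3.client('s3', **kwargs) so that the get, put, and delete bucket policy calls run in the account that owns the bucket.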

Clean up

If you’ve finished experimenting and don’t want to incur any further cost for the resources deployed, you can clean up the components as follows:

  1. Delete the Amazon DataZone domain.
  2. Delete the Lambda function.
  3. Delete the SageMaker instance.
  4. Delete the S3 bucket that hosted the unstructured asset.
  5. Delete the IAM roles.

Conclusion

By implementing this custom workflow, organizations can extend the simplified subscription and access workflows provided by Amazon DataZone to their unstructured data stored in Amazon S3. This approach provides greater control over unstructured data assets, facilitating discovery and access across the enterprise.

We encourage you to try out the solution for your own use case, and share your feedback in the comments.


About the Authors

Somdeb Bhattacharjee is a Senior Solutions Architect specializing in data and analytics. He is part of the global Healthcare and Life Sciences industry at AWS, helping his customers modernize their data platform solutions to achieve their business outcomes.

Sam Yates is a Senior Solutions Architect in the Healthcare and Life Sciences business unit at AWS. He has spent most of the past two decades helping life sciences companies apply technology in pursuit of their missions to help patients. Sam holds BS and MS degrees in Computer Science.

Introducing the AWS Network Firewall CloudWatch Dashboard

Post Syndicated from Ajinkya Patil original https://aws.amazon.com/blogs/security/introducing-the-aws-network-firewall-cloudwatch-dashboard/

Amazon CloudWatch dashboards are customizable pages in the CloudWatch console that you can use to monitor your resources in a single view. This post focuses on deploying a CloudWatch dashboard that you can use to create a customizable monitoring solution for your AWS Network Firewall firewall. It’s designed to provide deeper insights into your firewall’s performance and security events, simplifying security monitoring.

Network Firewall is a managed service that you can use to deploy essential network protections to Amazon Virtual Private Clouds (Amazon VPCs). Network Firewall provides comprehensive logs and metrics through CloudWatch, and we’re expanding its capabilities with this CloudWatch dashboard. This enhancement makes it easier to visualize, analyze, and act on the wealth of data generated by your firewall.

This open source solution streamlines network security monitoring with a user-friendly AWS CloudFormation template that quickly deploys a dedicated monitoring dashboard. This solution incorporates a suite of CloudWatch features—basic monitoring metrics, vended logs, Logs Insights queries, Contributor Insights rules, and the dashboard itself—into a centralized view. Preconfigured widgets provide instant insights into critical areas such as top talkers, protocol distributions, and alert log trends, in addition to HTTP and TLS flow analysis. A consolidated view of key metrics and logs enables faster identification of potential security threats or performance issues. With all of this relevant network firewall data in one place, your team can respond more quickly to emerging security events.

In this blog post, we provide an overview of the dashboard and a step-by-step guide to deploy it in your environment.

Solution overview

The CloudWatch dashboard can be deployed in all AWS Regions where Network Firewall is available today, including the AWS GovCloud (US) Regions and China Regions. While the dashboard comes pre-configured, you can quickly adjust queries, time ranges, and refresh intervals to help meet your specific needs. By default, the dashboard queries firewall flow and alert log events over a 3-hour period, impacting the number of log events scanned. Logs Insights and Contributor Insights widgets showcase the top 10 data points by default, but you can enhance results by modifying queries or adjusting the Top Contributors value, though this might lead to increased costs. You can configure the auto-refresh interval of the widgets to get real-time visibility and optimize costs. See the Amazon CloudWatch Pricing guide for up-to-date free and paid tier pricing considerations.

The dashboard, shown in Figure 1, can be deployed using CloudFormation and includes data and analytics from the following sources:

  1. Native CloudWatch metrics from the AWS/NetworkFirewall and AWS/PrivateLinkEndpoints namespaces
  2. CloudWatch Logs Insights queries that analyze Network Firewall flow and alert logs
  3. CloudWatch Contributor Insights rules that aggregate data from Network Firewall flow and alert logs.

Figure 1: CloudWatch dashboard

Walkthrough

In the dashboard, the Logs Insights and Contributor Insights widgets display the top 10 data points by default. You can edit the Insights queries or change the Top Contributors to a larger value to display more results, as shown in Figure 2.

Figure 2: Top Talkers dashboard showing a change to the Top Contributors value
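If you want to experiment with your own top-talkers query outside the dashboard, the following Python sketch builds a Logs Insights query string and the arguments for the CloudWatch Logs start_query call. It is an illustration only, not the query the dashboard ships with: the helper names are hypothetical, and the field names (event.event_type, event.netflow.bytes, event.src_ip) assume the standard Network Firewall flow log layout.

```python
import time


def top_talkers_query(limit=10):
    """Build a Logs Insights query that ranks source IPs by bytes sent.

    Field names assume the Network Firewall flow (netflow) log layout.
    """
    return (
        'filter event.event_type = "netflow"\n'
        "| stats sum(event.netflow.bytes) as total_bytes by event.src_ip\n"
        "| sort total_bytes desc\n"
        f"| limit {int(limit)}"
    )


def start_query_kwargs(log_group, hours=3, limit=10, now=None):
    """Return keyword arguments for the CloudWatch Logs start_query call.

    The default 3-hour window mirrors the dashboard default described above.
    """
    end = int(now if now is not None else time.time())
    return {
        "logGroupName": log_group,
        "startTime": end - hours * 3600,
        "endTime": end,
        "queryString": top_talkers_query(limit),
    }


if __name__ == "__main__":
    # Print a query that returns the top 25 sources instead of the top 10.
    print(top_talkers_query(25))
```

Widening the time window or raising the limit increases the number of log events scanned, which is the cost trade-off mentioned earlier.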

You can also manually refresh the data within a single or multiple widgets, or you can configure the entire dashboard to automatically refresh at a configured time interval as shown in Figure 3. The dashboard won’t automatically refresh the widget data by default.

Figure 3: Configuring the dashboard to automatically refresh

Prerequisites

Deploying the Network Firewall CloudWatch Dashboard is straightforward. You will need the following:

  1. A Network Firewall in your VPC.
  2. Your Network Firewall must be configured to publish firewall flow and alert logs to two different CloudWatch log groups. For example, firewall flow logs are published to /my-firewall-flow-logs and alert logs are published to /my-firewall-alert-logs.

If you haven’t deployed Network Firewall in your VPC, you can use one of the available AWS Network Firewall Deployment Architecture templates to create a firewall. After creating a firewall, configure CloudWatch log groups for the firewall flow and alert logs and configure stateful logging as described previously. Fine-tune your firewall policy and rule configuration and make sure that you’re routing traffic symmetrically through the firewall. With the firewall now in the routed path and publishing metrics and log events, you can proceed with this Network Firewall CloudWatch dashboard template.

Deployment

The Network Firewall dashboard CloudFormation template creates a monitoring dashboard for a single Network Firewall firewall. Make sure that you launch this CloudFormation stack in the same AWS Region and account as the firewall, regardless of whether the firewall is set up centrally or in a distributed manner.

To deploy the dashboard:

  1. Choose Launch Stack for the relevant AWS Region. Make sure that you’re signed in to the appropriate AWS account and Region.
    • Region: China

      Launch Stack

    • Region: AWS GovCloud (US)

      Launch Stack

    • Region: All other regions supported by AWS Network Firewall

      Launch Stack

  2. You will be redirected to the Create stack page in the AWS Management Console for CloudFormation. Make sure that you’re in the correct Region and using the correct template. Choose Next. The following are the Regions and their template names:
    1. China Region: nfw-cloudwatch-dashboard-china.yaml
    2. AWS GovCloud (US) Regions: nfw-cloudwatch-dashboard-govcloud.yaml
    3. All other Regions: nfw-cloudwatch-dashboard.yaml
Figure 4: Make sure that you’re using the correct template

When launching the stack, you will need to enter the following parameters:

  1. Stack name: A descriptive name for this CloudFormation stack. For example, my-firewall-dashboard.
  2. Firewall name: The firewall name as seen in the Amazon VPC console. In the Amazon VPC console, choose Network Firewall in the navigation pane, then choose Firewalls.
  3. Firewall subnets: The IDs of the firewall subnets to which your firewall endpoints are attached. You can find the firewall subnets on the Firewall details tab of your firewall in the Amazon VPC console.
  4. Flow log group name: The name of the CloudWatch log group where your firewall flow logs are stored.
  5. Alert log group name: The name of the CloudWatch log group where your firewall alert logs are stored.
  6. Contributor Insights rule state: Enable or disable the Contributor Insights rules (the template defaults to enabled). Disabling the rules stops them from scanning log data and displaying results in the Contributor Insights widgets. After the rules are created, you can change the state of one or more Contributor Insights rules from the CloudWatch console by choosing Insights from the navigation pane, and then choosing Contributor Insights.

After the stack reaches CREATE_COMPLETE status, go to the Outputs tab and choose the FirewallDashboardURI link to open the new dashboard in the CloudWatch Dashboards console. It might take a few minutes for the Logs Insights and Contributor Insights widgets to start displaying data. For more details about each widget, see the README. If you don’t have log events matching the query parameters in the widgets, some widgets might not show data points.
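If you deploy the stack from a script rather than the console, you can read the same output programmatically. The following sketch assumes boto3 is installed and credentials are configured; the function names are hypothetical, and only the FirewallDashboardURI output key comes from this post.

```python
def find_output(describe_stacks_response, key="FirewallDashboardURI"):
    """Return the value of a stack output from a describe_stacks response."""
    for stack in describe_stacks_response.get("Stacks", []):
        for output in stack.get("Outputs", []):
            if output.get("OutputKey") == key:
                return output.get("OutputValue")
    return None


def dashboard_url(stack_name, region):
    """Look up the dashboard link for a deployed stack (calls AWS)."""
    import boto3  # imported lazily so find_output can be used without boto3

    cfn = boto3.client("cloudformation", region_name=region)
    return find_output(cfn.describe_stacks(StackName=stack_name))
```

For example, dashboard_url("my-firewall-dashboard", "us-east-1") would return the link once the stack has reached CREATE_COMPLETE, or None if the output is not present.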

Troubleshooting

If you encounter issues during or after deployment, review the following:

  • Firewall logging is enabled and configured to use CloudWatch instead of Amazon Simple Storage Service (Amazon S3) or Amazon Data Firehose (formerly Amazon Kinesis Data Firehose).
  • Both firewall flow and alert logging are enabled, not just one.
  • Log group names are entered correctly; incorrect names will cause widgets to point to invalid data.
  • Correct subnets are selected. Incorrect choices can impact the PrivateLink metrics widgets.
  • Firewall name is entered correctly. An incorrect name can disrupt metrics widgets, dashboard, and Contributor Insights widget names and break the firewall link.
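The logging-related checks in this list can be scripted. The following sketch is one possible approach, not part of the solution itself: check_logging is a hypothetical helper that inspects the LogDestinationConfigs list returned by the Network Firewall describe_logging_configuration API, and fetch_configs assumes boto3 and valid credentials.

```python
REQUIRED_LOG_TYPES = ("FLOW", "ALERT")


def check_logging(log_destination_configs):
    """Return a list of problems found in a firewall logging configuration."""
    problems = []
    by_type = {cfg["LogType"]: cfg for cfg in log_destination_configs}
    log_groups = []
    for log_type in REQUIRED_LOG_TYPES:
        cfg = by_type.get(log_type)
        if cfg is None:
            problems.append(f"{log_type} logging is not enabled")
        elif cfg["LogDestinationType"] != "CloudWatchLogs":
            problems.append(
                f"{log_type} logs go to {cfg['LogDestinationType']}, not CloudWatch Logs"
            )
        else:
            log_groups.append(cfg["LogDestination"].get("logGroup"))
    # The prerequisites call for two different log groups.
    if len(log_groups) == 2 and log_groups[0] == log_groups[1]:
        problems.append("FLOW and ALERT logs share the same log group")
    return problems


def fetch_configs(firewall_name, region):
    """Fetch the logging configuration for a firewall (calls AWS)."""
    import boto3  # imported lazily so check_logging works without boto3

    client = boto3.client("network-firewall", region_name=region)
    response = client.describe_logging_configuration(FirewallName=firewall_name)
    return response.get("LoggingConfiguration", {}).get("LogDestinationConfigs", [])
```

An empty list from check_logging(fetch_configs("my-firewall", "us-east-1")) suggests that the logging prerequisites are met; the firewall name and Region here are placeholders.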

Cleaning up

You can delete the Network Firewall CloudWatch dashboard and all of the associated resources with a few clicks. Deleting the dashboard will not impact the routing and network traffic inspection performed by the firewall.

  1. Sign in to the CloudFormation console in the Region where you launched the stack and choose Stacks from the navigation pane.
  2. Select the Stack name you chose when launching the stack. For example, my-firewall-dashboard.
  3. Choose Delete.

Conclusion

We encourage you to see for yourself how this new dashboard can enhance your network security management. To get started with the AWS Network Firewall CloudWatch Dashboard, visit our GitHub repository for detailed instructions and the CloudFormation template. For a visual overview of the dashboard and its capabilities, check out our YouTube video.

If you have feedback about this post, submit comments in the Comments section below. If you have questions about this post, contact AWS Support.
 

Ajinkya Patil

Ajinkya is a Security Consultant at Amazon Professional Services, specializing in security consulting for AWS customers within the automotive industry since 2019. He has presented at AWS re:Inforce and contributed articles to the AWS Security blog and AWS Prescriptive Guidance. Beyond his professional commitments, he indulges in travel and photography.
Todd Pula

Todd is a Sr. Cloud Support Engineer at AWS and a service experience owner for AWS Network Firewall. He is an accomplished problem solver helping customers build and troubleshoot complex cloud networking and security solutions. He has a Master’s degree in Information Technology and is a Cisco Certified Internetwork Expert (CCIE).
Amish Shah

Amish is a seasoned product leader with over 15 years of experience in developing innovative and scalable solutions for networking, security, and cloud use cases. He currently leads the AWS Network Firewall service, where he helps to develop security solutions that protect AWS workloads. Outside of work, Amish enjoys playing cricket and soccer, loves to travel, and has recently picked up a hobby of collecting niche fragrances.

Building end-to-end data lineage for one-time and complex queries using Amazon Athena, Amazon Redshift, Amazon Neptune and dbt

Post Syndicated from Nancy Wu original https://aws.amazon.com/blogs/big-data/building-end-to-end-data-lineage-for-one-time-and-complex-queries-using-amazon-athena-amazon-redshift-amazon-neptune-and-dbt/

One-time and complex queries are two common scenarios in enterprise data analytics. One-time queries are flexible and suitable for instant analysis and exploratory research. Complex queries, on the other hand, refer to large-scale data processing and in-depth analysis based on petabyte-level data warehouses in massive data scenarios. These complex queries typically involve data sources from multiple business systems, requiring multilevel nested SQL or associations with numerous tables for highly sophisticated analytical tasks.

However, combining the data lineage of these two query types presents several challenges:

  1. Diversity of data sources
  2. Varying query complexity
  3. Inconsistent granularity in lineage tracking
  4. Different real-time requirements
  5. Difficulties in cross-system integration

Moreover, maintaining the accuracy and completeness of lineage information without sacrificing system performance and scalability is a crucial consideration. Addressing these challenges requires a carefully designed architecture and advanced technical solutions.

Amazon Athena offers serverless, flexible SQL analytics for one-time queries, enabling direct querying of Amazon Simple Storage Service (Amazon S3) data for rapid, cost-effective instant analysis. Amazon Redshift, optimized for complex queries, provides high-performance columnar storage and massively parallel processing (MPP) architecture, supporting large-scale data processing and advanced SQL capabilities. Amazon Neptune, as a graph database, is ideal for data lineage analysis, offering efficient relationship traversal and complex graph algorithms to handle large-scale, intricate data lineage relationships. The combination of these three services provides a powerful, comprehensive solution for end-to-end data lineage analysis.

In the context of comprehensive data governance, Amazon DataZone offers organization-wide data lineage visualization using Amazon Web Services (AWS) services, while dbt provides project-level lineage through model analysis and supports cross-project integration between data lakes and warehouses.

In this post, we use dbt for data modeling on both Amazon Athena and Amazon Redshift. dbt on Athena supports one-time queries, while dbt on Amazon Redshift handles complex queries, unifying the development language and significantly reducing the technical learning curve. Using a single dbt modeling language not only simplifies the development process but also automatically generates consistent data lineage information. This approach offers robust adaptability, easily accommodating changes in data structures.

By integrating Amazon Neptune graph database to store and analyze complex lineage relationships, combined with AWS Step Functions and AWS Lambda functions, we achieve a fully automated data lineage generation process. This combination promotes consistency and completeness of lineage data while enhancing the efficiency and scalability of the entire process. The result is a powerful and flexible solution for end-to-end data lineage analysis.

Architecture overview

This experiment is based on a customer that already uses Amazon Athena for one-time queries. To better accommodate massive data processing and complex query scenarios, the customer wants to adopt a unified data modeling language across different data platforms. This led to implementing both dbt on Athena and dbt on Amazon Redshift.

AWS Glue crawler crawls data lake information from Amazon S3, generating a Data Catalog to support dbt on Amazon Athena data modeling. For complex query scenarios, AWS Glue performs extract, transform, and load (ETL) processing, loading data into the petabyte-scale data warehouse, Amazon Redshift. Here, data modeling uses dbt on Amazon Redshift.

The original lineage files from both parts are loaded into an S3 bucket, providing the data for end-to-end data lineage analysis.

The following image is the architecture diagram for the solution.

Figure 1-Architecture diagram of DBT modeling based on Athena and Redshift

Some important considerations:

This experiment uses the following data dictionary:

| Source table | Tool | Target table |
| --- | --- | --- |
| imdb.name_basics | DBT/Athena | stg_imdb__name_basics |
| imdb.title_akas | DBT/Athena | stg_imdb__title_akas |
| imdb.title_basics | DBT/Athena | stg_imdb__title_basics |
| imdb.title_crew | DBT/Athena | stg_imdb__title_crews |
| imdb.title_episode | DBT/Athena | stg_imdb__title_episodes |
| imdb.title_principals | DBT/Athena | stg_imdb__title_principals |
| imdb.title_ratings | DBT/Athena | stg_imdb__title_ratings |
| stg_imdb__name_basics | DBT/Redshift | new_stg_imdb__name_basics |
| stg_imdb__title_akas | DBT/Redshift | new_stg_imdb__title_akas |
| stg_imdb__title_basics | DBT/Redshift | new_stg_imdb__title_basics |
| stg_imdb__title_crews | DBT/Redshift | new_stg_imdb__title_crews |
| stg_imdb__title_episodes | DBT/Redshift | new_stg_imdb__title_episodes |
| stg_imdb__title_principals | DBT/Redshift | new_stg_imdb__title_principals |
| stg_imdb__title_ratings | DBT/Redshift | new_stg_imdb__title_ratings |
| new_stg_imdb__name_basics | DBT/Redshift | int_primary_profession_flattened_from_name_basics |
| new_stg_imdb__name_basics | DBT/Redshift | int_known_for_titles_flattened_from_name_basics |
| new_stg_imdb__name_basics | DBT/Redshift | names |
| new_stg_imdb__title_akas | DBT/Redshift | titles |
| new_stg_imdb__title_basics | DBT/Redshift | int_genres_flattened_from_title_basics |
| new_stg_imdb__title_basics | DBT/Redshift | titles |
| new_stg_imdb__title_crews | DBT/Redshift | int_directors_flattened_from_title_crews |
| new_stg_imdb__title_crews | DBT/Redshift | int_writers_flattened_from_title_crews |
| new_stg_imdb__title_episodes | DBT/Redshift | titles |
| new_stg_imdb__title_principals | DBT/Redshift | titles |
| new_stg_imdb__title_ratings | DBT/Redshift | titles |
| int_known_for_titles_flattened_from_name_basics | DBT/Redshift | titles |
| int_primary_profession_flattened_from_name_basics | DBT/Redshift | |
| int_directors_flattened_from_title_crews | DBT/Redshift | names |
| int_genres_flattened_from_title_basics | DBT/Redshift | genre_titles |
| int_writers_flattened_from_title_crews | DBT/Redshift | names |
| genre_titles | DBT/Redshift | |
| names | DBT/Redshift | |
| titles | DBT/Redshift | |

The lineage data generated by dbt on Athena includes partial lineage diagrams, as exemplified in the following images. The first image shows the lineage of name_basics in dbt on Athena. The second image shows the lineage of title_crew in dbt on Athena.

Figure 3-Lineage of name_basics in DBT on Athena

Figure 4-Lineage of title_crew in DBT on Athena

The lineage data generated by dbt on Amazon Redshift includes partial lineage diagrams, as illustrated in the following image.

Figure 5-Lineage of name_basics and title_crew in DBT on Redshift

Referring to the data dictionary and screenshots, it’s evident that the complete data lineage information is highly dispersed, spread across 29 lineage diagrams. Understanding the end-to-end comprehensive view requires significant time. In real-world environments, the situation is often more complex, with complete data lineage potentially distributed across hundreds of files. Consequently, integrating a complete end-to-end data lineage diagram becomes crucial and challenging.

This experiment will provide a detailed introduction to processing and merging data lineage files stored in Amazon S3, as illustrated in the following diagram.

Figure 6-Merging data lineage from Athena and Redshift into Neptune

Prerequisites

To implement this solution, you need the following prerequisites in place:

  • The Lambda function for preprocessing lineage files must have permissions to access Amazon S3 and Amazon Redshift.
  • The Lambda function for constructing the directed acyclic graph (DAG) must have permissions to access Amazon S3 and Amazon Neptune.

Solution walkthrough

To implement the solution, follow the steps in the next sections.

Preprocess raw lineage data for DAG generation using Lambda functions

Use Lambda to preprocess the raw lineage data generated by dbt, converting it into key-value pair JSON files that are easily understood by Neptune: athena_dbt_lineage_map.json and redshift_dbt_lineage_map.json.

  1. To create a new Lambda function in the Lambda console, enter a Function name, select the Runtime (Python in this example), configure the Architecture and Execution role, and then choose Create function.

Figure 7-Basic configuration of athena-data-lineage-process Lambda

  2. Open the created Lambda function and on the Configuration tab, in the navigation pane, select Environment variables and choose your configurations. Using Athena on dbt processing as an example, configure the environment variables as follows (the process for Amazon Redshift on dbt is similar):
    • INPUT_BUCKET: data-lineage-analysis-24-09-22 (replace with the S3 bucket path storing the original Athena on dbt lineage files)
    • INPUT_KEY: athena_manifest.json (the original Athena on dbt lineage file)
    • OUTPUT_BUCKET: data-lineage-analysis-24-09-22 (replace with the S3 bucket path for storing the preprocessed output of Athena on dbt lineage files)
    • OUTPUT_KEY: athena_dbt_lineage_map.json (the output file after preprocessing the original Athena on dbt lineage file)

Figure 8-Environment variable configuration for athena-data-lineage-process-Lambda

  3. On the Code tab, in the lambda_function.py file, enter the preprocessing code for the raw lineage data. The athena_manifest.json, redshift_manifest.json, and other files used in this experiment can be obtained from the Data Lineage Graph Construction GitHub repository. The following reference code preprocesses the original Athena on dbt lineage file (the process for Amazon Redshift on dbt is similar):

import json
import boto3
import os

def lambda_handler(event, context):
    # Set up S3 client
    s3 = boto3.client('s3')

    # Get input and output paths from environment variables
    input_bucket = os.environ['INPUT_BUCKET']
    input_key = os.environ['INPUT_KEY']
    output_bucket = os.environ['OUTPUT_BUCKET']
    output_key = os.environ['OUTPUT_KEY']

    # Define helper function
    def dbt_nodename_format(node_name):
        return node_name.split(".")[-1]

    # Read input JSON file from S3
    response = s3.get_object(Bucket=input_bucket, Key=input_key)
    file_content = response['Body'].read().decode('utf-8')
    data = json.loads(file_content)
    lineage_map = data["child_map"]
    node_dict = {}
    dbt_lineage_map = {}

    # Process data
    for item in lineage_map:
        lineage_map[item] = [dbt_nodename_format(child) for child in lineage_map[item]]
        node_dict[item] = dbt_nodename_format(item)

    # Update key names
    lineage_map = {node_dict[old]: value for old, value in lineage_map.items()}
    dbt_lineage_map["lineage_map"] = lineage_map

    # Convert result to JSON string
    result_json = json.dumps(dbt_lineage_map)

    # Write JSON string to S3
    s3.put_object(Body=result_json, Bucket=output_bucket, Key=output_key)
    print(f"Data written to s3://{output_bucket}/{output_key}")

    return {
        'statusCode': 200,
        'body': json.dumps('Athena data lineage processing completed successfully')
    }
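To see what the preprocessing step produces, the following sketch applies the same transformation to a tiny in-memory manifest instead of an S3 object. The sample child_map is a hand-written assumption (the project name imdb_dbt is hypothetical); a real dbt manifest.json contains many more nodes and fields.

```python
import json


def dbt_nodename_format(node_name):
    """Keep only the last dot-separated part of a dbt unique ID."""
    return node_name.split(".")[-1]


def build_lineage_map(manifest):
    """Convert a dbt manifest's child_map into the parent -> children map."""
    child_map = manifest["child_map"]
    return {
        "lineage_map": {
            dbt_nodename_format(parent): [dbt_nodename_format(c) for c in children]
            for parent, children in child_map.items()
        }
    }


# Hypothetical two-node excerpt of a dbt on Athena manifest.
sample_manifest = {
    "child_map": {
        "source.imdb_dbt.imdb.title_crew": ["model.imdb_dbt.stg_imdb__title_crews"],
        "model.imdb_dbt.stg_imdb__title_crews": [],
    }
}

if __name__ == "__main__":
    print(json.dumps(build_lineage_map(sample_manifest), indent=2))
```

Because only the last part of each unique ID is kept, two nodes whose IDs end in the same name (for example, a source and a model with identical table names) collapse into one key. That is what lets the Athena and Redshift lineage join on shared table names, but it is worth keeping in mind if your project reuses names across schemas.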

Merge preprocessed lineage data and write to Neptune using Lambda functions

  1. Before processing data with the Lambda function, create a Lambda layer by uploading the required Gremlin plugin. For detailed steps on creating and configuring Lambda Layers, see the AWS Lambda Layers documentation.

Because connecting Lambda to Neptune for constructing a DAG requires the Gremlin plugin, it needs to be uploaded before using Lambda. The Gremlin package can be obtained from the Data Lineage Graph Construction GitHub repository.

Figure 9-Lambda layers

  2. Create a new Lambda function and choose it to configure it. To add the recently created layer, at the bottom of the page, choose Add a layer.

Figure 10-Add a layer

Create another Lambda layer for the requests library, similar to how you created the layer for the Gremlin plugin. This library will be used for HTTP client functionality in the Lambda function.

  3. Choose the recently created Lambda function to configure. Connect to Neptune through Lambda to merge the two datasets and construct a DAG. On the Code tab, the reference code to execute is as follows:
import json
import boto3
import os
import requests
from botocore.auth import SigV4Auth
from botocore.awsrequest import AWSRequest
from botocore.credentials import get_credentials
from botocore.session import Session
from concurrent.futures import ThreadPoolExecutor, as_completed

def read_s3_file(s3_client, bucket, key):
    try:
        response = s3_client.get_object(Bucket=bucket, Key=key)
        data = json.loads(response['Body'].read().decode('utf-8'))
        return data.get("lineage_map", {})
    except Exception as e:
        print(f"Error reading S3 file {bucket}/{key}: {str(e)}")
        raise

def merge_data(athena_data, redshift_data):
    return {**athena_data, **redshift_data}

def sign_request(request):
    credentials = get_credentials(Session())
    auth = SigV4Auth(credentials, 'neptune-db', os.environ['AWS_REGION'])
    auth.add_auth(request)
    return dict(request.headers)

def send_request(url, headers, data):
    try:
        response = requests.post(url, headers=headers, data=data, timeout=30)
        response.raise_for_status()
        return response.text
    except requests.exceptions.RequestException as e:
        print(f"Request Error: {str(e)}")
        if hasattr(e.response, 'text'):
            print(f"Response content: {e.response.text}")
        raise

def write_to_neptune(data):
    # Replace <your-neptune-endpoint> with your Neptune cluster endpoint name
    endpoint = 'https://<your-neptune-endpoint>:8182/gremlin'

    # Clear Neptune database
    clear_query = "g.V().drop()"
    request = AWSRequest(method='POST', url=endpoint, data=json.dumps({'gremlin': clear_query}))
    signed_headers = sign_request(request)
    response = send_request(endpoint, signed_headers, json.dumps({'gremlin': clear_query}))
    print(f"Clear database response: {response}")

    # Verify if the database is empty
    verify_query = "g.V().count()"
    request = AWSRequest(method='POST', url=endpoint, data=json.dumps({'gremlin': verify_query}))
    signed_headers = sign_request(request)
    response = send_request(endpoint, signed_headers, json.dumps({'gremlin': verify_query}))
    print(f"Vertex count after clearing: {response}")
    
    def process_node(node, children):
        # Add node
        query = f"g.V().has('lineage_node', 'node_name', '{node}').fold().coalesce(unfold(), addV('lineage_node').property('node_name', '{node}'))"
        request = AWSRequest(method='POST', url=endpoint, data=json.dumps({'gremlin': query}))
        signed_headers = sign_request(request)
        response = send_request(endpoint, signed_headers, json.dumps({'gremlin': query}))
        print(f"Add node response for {node}: {response}")

        for child_node in children:
            # Add child node
            query = f"g.V().has('lineage_node', 'node_name', '{child_node}').fold().coalesce(unfold(), addV('lineage_node').property('node_name', '{child_node}'))"
            request = AWSRequest(method='POST', url=endpoint, data=json.dumps({'gremlin': query}))
            signed_headers = sign_request(request)
            response = send_request(endpoint, signed_headers, json.dumps({'gremlin': query}))
            print(f"Add child node response for {child_node}: {response}")

            # Add edge
            query = f"g.V().has('lineage_node', 'node_name', '{node}').as('a').V().has('lineage_node', 'node_name', '{child_node}').coalesce(inE('lineage_edge').where(outV().as('a')), addE('lineage_edge').from('a').property('edge_name', ' '))"
            request = AWSRequest(method='POST', url=endpoint, data=json.dumps({'gremlin': query}))
            signed_headers = sign_request(request)
            response = send_request(endpoint, signed_headers, json.dumps({'gremlin': query}))
            print(f"Add edge response for {node} -> {child_node}: {response}")

    with ThreadPoolExecutor(max_workers=10) as executor:
        futures = [executor.submit(process_node, node, children) for node, children in data.items()]
        for future in as_completed(futures):
            try:
                future.result()
            except Exception as e:
                print(f"Error in processing node: {str(e)}")

def lambda_handler(event, context):
    # Initialize S3 client
    s3_client = boto3.client('s3')

    # S3 bucket and file paths
    bucket_name = 'data-lineage-analysis' # Replace with your S3 bucket name
    athena_key = 'athena_dbt_lineage_map.json' # Replace with your athena lineage key value output json name
    redshift_key = 'redshift_dbt_lineage_map.json' # Replace with your redshift lineage key value output json name

    try:
        # Read Athena lineage data
        athena_data = read_s3_file(s3_client, bucket_name, athena_key)
        print(f"Athena data size: {len(athena_data)}")

        # Read Redshift lineage data
        redshift_data = read_s3_file(s3_client, bucket_name, redshift_key)
        print(f"Redshift data size: {len(redshift_data)}")

        # Merge data
        combined_data = merge_data(athena_data, redshift_data)
        print(f"Combined data size: {len(combined_data)}")

        # Write to Neptune (including clearing the database)
        write_to_neptune(combined_data)

        return {
            'statusCode': 200,
            'body': json.dumps('Data successfully written to Neptune')
        }
    except Exception as e:
        print(f"Error in lambda_handler: {str(e)}")
        return {
            'statusCode': 500,
            'body': json.dumps(f'Error: {str(e)}')
        }
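One detail to be aware of in the reference code: merge_data uses dictionary unpacking, so when the same node name is a key in both lineage maps, the Redshift child list replaces the Athena one. If your models can give a node children on both sides, a merge that unions the child lists may be safer. The following is a minimal sketch of that alternative; merge_lineage_maps and the node names in the example are hypothetical.

```python
def merge_lineage_maps(*lineage_maps):
    """Merge parent -> children maps, unioning children for shared parents.

    Child order is preserved and duplicates are dropped.
    """
    merged = {}
    for lineage_map in lineage_maps:
        for node, children in lineage_map.items():
            bucket = merged.setdefault(node, [])
            for child in children:
                if child not in bucket:
                    bucket.append(child)
    return merged


if __name__ == "__main__":
    athena = {"stg_a": ["athena_child"]}
    redshift = {"stg_a": ["redshift_child"]}
    # Dictionary unpacking keeps only the Redshift children for stg_a.
    print({**athena, **redshift})
    # The union merge keeps both child lists.
    print(merge_lineage_maps(athena, redshift))
```

In the dataset used in this post, shared nodes such as stg_imdb__title_crews have no children on the Athena side, so both approaches produce the same graph.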

Create Step Functions workflow

  1. On the Step Functions console, choose State machines, and then choose Create state machine. On the Choose a template page, select Blank template.

Figure 11-Step Functions blank template

  2. In the Blank template, choose Code to define your state machine. Use the following example code, replacing each FunctionName value with the name of your own Lambda function (the two preprocessing functions and the function that loads data into Neptune):
{
  "Comment": "Daily Data Lineage Processing Workflow",
  "StartAt": "Parallel Processing",
  "States": {
    "Parallel Processing": {
      "Type": "Parallel",
      "Branches": [
        {
          "StartAt": "Process Athena Data",
          "States": {
            "Process Athena Data": {
              "Type": "Task",
              "Resource": "arn:aws:states:::lambda:invoke",
              "Parameters": {
                "FunctionName": "athena-data-lineage-process-Lambda",
                "Payload": {
                  "input.$": "$"
                }
              },
              "End": true
            }
          }
        },
        {
          "StartAt": "Process Redshift Data",
          "States": {
            "Process Redshift Data": {
              "Type": "Task",
              "Resource": "arn:aws:states:::lambda:invoke",
              "Parameters": {
                "FunctionName": "redshift-data-lineage-process-Lambda",
                "Payload": {
                  "input.$": "$"
                }
              },
              "End": true
            }
          }
        }
      ],
      "Next": "Load Data to Neptune"
    },
    "Load Data to Neptune": {
      "Type": "Task",
      "Resource": "arn:aws:states:::lambda:invoke",
      "Parameters": {
        "FunctionName": "data-lineage-analysis-lambda"
      },
      "End": true
    }
  }
}
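State machine definitions are written in Amazon States Language, which is JSON and therefore has no comment syntax. If you prefer to keep the function names configurable, one option is to generate the definition from code. The following sketch builds the same workflow structure; build_definition and lambda_task are hypothetical helper names.

```python
import json


def lambda_task(function_name, forward_input=True):
    """Return a terminal Task state that invokes a Lambda function."""
    parameters = {"FunctionName": function_name}
    if forward_input:
        # Pass the state input to the function as {"input": ...}.
        parameters["Payload"] = {"input.$": "$"}
    return {
        "Type": "Task",
        "Resource": "arn:aws:states:::lambda:invoke",
        "Parameters": parameters,
        "End": True,
    }


def build_definition(athena_fn, redshift_fn, loader_fn):
    """Build the two-branch preprocessing workflow followed by the Neptune load."""

    def branch(state_name, function_name):
        return {"StartAt": state_name, "States": {state_name: lambda_task(function_name)}}

    return {
        "Comment": "Daily Data Lineage Processing Workflow",
        "StartAt": "Parallel Processing",
        "States": {
            "Parallel Processing": {
                "Type": "Parallel",
                "Branches": [
                    branch("Process Athena Data", athena_fn),
                    branch("Process Redshift Data", redshift_fn),
                ],
                "Next": "Load Data to Neptune",
            },
            "Load Data to Neptune": lambda_task(loader_fn, forward_input=False),
        },
    }


if __name__ == "__main__":
    definition = build_definition(
        "athena-data-lineage-process-Lambda",
        "redshift-data-lineage-process-Lambda",
        "data-lineage-analysis-lambda",
    )
    # json.dumps guarantees valid JSON that can be pasted into the Code editor.
    print(json.dumps(definition, indent=2))
```

The Parallel state waits for both preprocessing branches to finish before the load step starts, so the Neptune loader always reads freshly written lineage map files.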
  3. After completing the configuration, choose the Design tab to view the workflow shown in the following diagram.

Figure 12-Step Functions design view

Create scheduling rules with Amazon EventBridge

Configure Amazon EventBridge to generate lineage data daily during off-peak business hours. To do this:

  1. Create a new rule in the EventBridge console with a descriptive name.
  2. Set the rule type to “Schedule” and configure it to run once daily (using either a fixed rate or the Cron expression “0 0 * * ? *”).
  3. Select the AWS Step Functions state machine as the target and specify the state machine you created earlier.
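The same schedule can be created with the AWS SDK. The following sketch assumes boto3, an existing state machine, and an IAM role that EventBridge can assume to start executions; the function names, the target ID, and the ARNs you pass in are placeholders. EventBridge evaluates cron expressions in UTC, so cron(0 0 * * ? *) runs at 00:00 UTC each day.

```python
def schedule_requests(rule_name, state_machine_arn, role_arn,
                      schedule="cron(0 0 * * ? *)"):
    """Build the arguments for the EventBridge put_rule and put_targets calls."""
    put_rule = {
        "Name": rule_name,
        "ScheduleExpression": schedule,
        "State": "ENABLED",
    }
    put_targets = {
        "Rule": rule_name,
        "Targets": [
            {
                "Id": "lineage-state-machine",  # hypothetical target ID
                "Arn": state_machine_arn,
                "RoleArn": role_arn,
            }
        ],
    }
    return put_rule, put_targets


def create_schedule(rule_name, state_machine_arn, role_arn):
    """Create the daily rule and attach the state machine (calls AWS)."""
    import boto3  # imported lazily so schedule_requests works without boto3

    events = boto3.client("events")
    rule_kwargs, target_kwargs = schedule_requests(rule_name, state_machine_arn, role_arn)
    events.put_rule(**rule_kwargs)
    events.put_targets(**target_kwargs)
```

Adjust the hour field of the cron expression if off-peak hours for your business do not fall at midnight UTC.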

Query results in Neptune

  1. On the Neptune console, select Notebooks. Open an existing notebook or create a new one.

Figure 13-Neptune notebook

  2. In the notebook, create a new code cell to perform a query. The following code example shows the query statement and its results:
%%gremlin -d node_name -de edge_name
g.V().hasLabel('lineage_node').outE('lineage_edge').inV().hasLabel('lineage_node').path().by(elementMap())

You can now see the end-to-end data lineage graph information for both dbt on Athena and dbt on Amazon Redshift. The following image shows the merged DAG data lineage graph in Neptune.

Figure 14-Merged DAG data lineage graph in Neptune

You can query the generated data lineage graph for data related to a specific table, such as title_crew.

The sample query statement and its results are shown in the following code example:

%%gremlin -d node_name -de edge_name
g.V().has('lineage_node', 'node_name', 'title_crew')
  .repeat(
    union(
      __.inE('lineage_edge').outV(),
      __.outE('lineage_edge').inV()
    )
  )
  .until(
    __.has('node_name', within('names', 'genre_titles', 'titles'))
    .or()
    .loops().is(gt(10))
  )
  .path()
  .by(elementMap())

The following image shows the filtered results based on title_crew table in Neptune.

Figure 15-Filtered results based on title_crew table in Neptune
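Before loading data into Neptune, it can be useful to sanity-check the merged lineage map locally. The following sketch is not a replacement for the Gremlin query; it only walks downstream edges in an in-memory parent-to-children map with a breadth-first search. The downstream function is a hypothetical helper, and the sample map is a hand-copied excerpt of the data dictionary shown earlier.

```python
from collections import deque


def downstream(lineage_map, start):
    """Return every node reachable from start, in breadth-first order."""
    seen = {start}
    order = []
    queue = deque([start])
    while queue:
        node = queue.popleft()
        for child in lineage_map.get(node, []):
            if child not in seen:
                seen.add(child)
                order.append(child)
                queue.append(child)
    return order


# Excerpt of the merged lineage map for the title_crew table.
sample_map = {
    "title_crew": ["stg_imdb__title_crews"],
    "stg_imdb__title_crews": ["new_stg_imdb__title_crews"],
    "new_stg_imdb__title_crews": [
        "int_directors_flattened_from_title_crews",
        "int_writers_flattened_from_title_crews",
    ],
    "int_directors_flattened_from_title_crews": ["names"],
    "int_writers_flattened_from_title_crews": ["names"],
}

if __name__ == "__main__":
    for table in downstream(sample_map, "title_crew"):
        print(table)
```

The seen set also protects the walk against accidental cycles, which a valid DAG should not contain.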

Clean up

To clean up your resources, complete the following steps:

  1. Delete EventBridge rules
# Stop new events from triggering while removing dependencies
aws events disable-rule --name <rule-name>
# Break connections between the rule and its targets (in this solution, the Step Functions state machine)
aws events remove-targets --rule <rule-name> --ids <target-id>
# Remove the rule completely from EventBridge
aws events delete-rule --name <rule-name>
  2. Delete Step Functions state machine
# Stop a running execution (repeat for each execution that is still running)
aws stepfunctions stop-execution --execution-arn <execution-arn>
# Delete the state machine
aws stepfunctions delete-state-machine --state-machine-arn <state-machine-arn>
  3. Delete Lambda functions
# Delete Lambda function
aws lambda delete-function --function-name <function-name>
# Delete Lambda layers (if used)
aws lambda delete-layer-version --layer-name <layer-name> --version-number <version>
  4. Clean up the Neptune database
# Delete all snapshots
aws neptune delete-db-cluster-snapshot --db-cluster-snapshot-identifier <snapshot-id>
# Delete database instance
aws neptune delete-db-instance --db-instance-identifier <instance-id> --skip-final-snapshot
# Delete database cluster
aws neptune delete-db-cluster --db-cluster-identifier <cluster-id> --skip-final-snapshot
  5. Follow the instructions at Deleting a single object to clean up the S3 buckets.
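The stop-execution command in step 2 acts on one execution at a time. If several executions are still running, a short script can list and stop them before you delete the state machine. This is a sketch that assumes a boto3 Step Functions client is passed in; the function name and the cause text are hypothetical.

```python
def stop_running_executions(sfn, state_machine_arn, cause="Cleaning up lineage workflow"):
    """Stop every RUNNING execution of a state machine and return their ARNs.

    sfn is expected to behave like boto3.client("stepfunctions").
    """
    stopped = []
    paginator = sfn.get_paginator("list_executions")
    pages = paginator.paginate(stateMachineArn=state_machine_arn, statusFilter="RUNNING")
    for page in pages:
        for execution in page["executions"]:
            sfn.stop_execution(executionArn=execution["executionArn"], cause=cause)
            stopped.append(execution["executionArn"])
    return stopped
```

For example, stop_running_executions(boto3.client("stepfunctions"), "<state-machine-arn>") returns the list of execution ARNs that were stopped, after which delete-state-machine can be run.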

Conclusion

In this post, we demonstrated how dbt enables unified data modeling across Amazon Athena and Amazon Redshift, integrating data lineage from both one-time and complex queries. By using Amazon Neptune, this solution provides comprehensive end-to-end lineage analysis. The architecture uses AWS serverless computing and managed services, including Step Functions, Lambda, and EventBridge, providing a highly flexible and scalable design.

This approach significantly lowers the learning curve through a unified data modeling method while enhancing development efficiency. The end-to-end data lineage graph visualization and analysis not only strengthen data governance capabilities but also offer deep insights for decision-making.

The solution’s flexible and scalable architecture effectively optimizes operational costs and improves business responsiveness. This comprehensive approach balances technical innovation, data governance, operational efficiency, and cost-effectiveness, thus supporting long-term business growth with the adaptability to meet evolving enterprise needs.

With OpenLineage-compatible data lineage now generally available in Amazon DataZone, we plan to explore integration possibilities to further enhance the system’s capability to handle complex data lineage analysis scenarios.

If you have any questions, please feel free to leave a comment in the comments section.


About the authors


Nancy Wu is a Solutions Architect at AWS, responsible for cloud computing architecture consulting and design for multinational enterprise customers. Nancy has many years of experience in big data, enterprise digital transformation research and development, consulting, and project management across the telecommunications, entertainment, and financial industries.

Xu Feng is a Senior Industry Solution Architect at AWS, responsible for designing, building, and promoting industry solutions for the Media & Entertainment and Advertising sectors, such as intelligent customer service and business intelligence. With 20 years of software industry experience, Xu Feng currently focuses on researching and implementing generative AI and AI-powered data solutions.

Xu Da is an Amazon Web Services (AWS) Partner Solutions Architect based out of Shanghai, China. He has more than 25 years of experience in the IT industry, software development, and solution architecture. He is passionate about collaborative learning, knowledge sharing, and guiding the community in its cloud technology journey.

Accelerate Amazon Redshift secure data use with Satori – Part 2

Post Syndicated from Rohit Vashishtha original https://aws.amazon.com/blogs/big-data/accelerate-amazon-redshift-secure-data-use-with-satori-part-2/

This post is co-written by Adam Gaulding, Solution Architect at Satori.

In this post, we continue from Accelerate Amazon Redshift secure data use with Satori – Part 1, and explain how Satori, an Amazon Redshift Ready partner, simplifies both the user experience of gaining access to data and the admin practice of granting and revoking access to data in Amazon Redshift. Satori enables both just-in-time and self-service access to data.

Solution overview

Satori creates a transparent layer providing visibility and control capabilities that is deployed in front of your existing Redshift data warehouse. When adding a new data store to Satori, a new, Satori-provided URL is generated for the data store, which data consumers use instead of connecting directly.

The following diagram illustrates the solution architecture.

Data consumers don’t have to change how they work with data, such as installing different database drivers, changing their queries, or compromising on features or functionality. Satori is not a data virtualization or database federation solution that abstracts your existing data stores.

Self-service access to data is fully automated. The admin is responsible for setting up the access rules. User access privileges can be preconfigured for automated dataset access. The user can see the datasets that are available to them in their personalized data portal. The user then selects the dataset they want to use and Satori automatically applies the appropriate security, privacy, and compliance requirements.

Just-in-time access to data is also flexible but requires approval from an admin. From their personalized data portal, users can see the available datasets; the datasets they already have self-service access to appear in their My Data folder. If they see a dataset that they need but don’t have access to, they can request access to it on demand. The request is sent to the admin and, based on the user’s credentials, the admin can choose to approve or deny access.

The ability to facilitate and automate access to data provides the following benefits:

  • Satori improves the user experience by providing quick access to data. This shortens the time-to-value of data and drives innovative decision-making.
  • Admins benefit from automating the process, significantly reducing the amount of time spent on granting and revoking access to data.

Prerequisites

Follow the steps outlined in Accelerate Amazon Redshift secure data use with Satori – Part 1 to complete the following prerequisite steps:

  1. Prepare the data.
  2. Connect to Amazon Redshift.
  3. Create a dataset and give Satori control over access to the dataset.
  4. Optionally, create security policies and revisit the concepts related to secure data access and masking policies.

After you complete the prerequisites, you’re ready to explore self-service and just-in-time access to data.

Self-service access

The following steps explain how to create self-service rules from admin and user perspectives.

Create access request and self-service rules (admin perspective)

After the admin gives Satori control over access to the dataset, they need to first preconfigure the user access rules. Complete the following steps:

  1. Navigate to the Datasets page and choose User Access Requests.
  2. In the Self-Service Access section, choose Self-Service Rule.

  3. Specify the required level of access.

The admin has several options when configuring the access rules. You can set the level of access by user or group, define when it expires, and set revocation rules.

The following screenshot shows the configuration rule for data access requests we created. In this example, the self-service user group has read-only access for the next 30 days, and that access is set to be revoked if it’s not used within 7 days.

The following figure shows an example configuration rule to add a user.

The newly created access rule and details are displayed in the list of self-service rules.

The next steps outline the data user view and steps to gain self-service access to data.

Create access request and self-service rules (user perspective)

As a user, complete the following steps:

  1. Enter the Satori personalized data portal using the Data Portal option on the options menu (three vertical dots).

The data portal will display all available datasets. Any datasets that the user already has self-service access to will appear under My Data, as shown in the following screenshot. All other datasets appear under Available Datasets.

  2. Choose the desired dataset (in this case, CustomerDataset) and request immediate access to this dataset by choosing Ask for Access to Dataset.

  3. For Access Request, choose Self Service.
  4. For Request Message, enter a reason for the request.
  5. Choose Request.

Based on the user’s identity, preconfigured access rules match the user to their respective qualifications and authorizations. In this case, the user is automatically granted access to CustomerDataset using the preconfigured self-service rules. The requested dataset appears with Status – Access Granted under My Data.

The preconfigured access rules are applied so that when this user runs their queries, certain sensitive data is redacted.

Now that access is granted, query the data using a SQL editor of your choice. In this post, we use DBeaver to connect to a Redshift cluster using the Satori hostname on the data stores tab.

When you query the data, you will see the security policies applied to the result set at runtime. In the following example, the customer table is displayed with redacted field values based on security policies.

In the following example, the credit_cards table is displayed with masking policies applied to the result values.

Just-in-time access

Just-in-time access is similar to self-service access; the only difference is that it includes an additional step of requesting access from the admin.

Create access request and self-service rules (user perspective)

The user enters the Satori personalized data portal with the same view as shown in the self-service access to data.

If the data that you need isn’t included under My Data but shows under Available Datasets, you can request access to this dataset. For this example, we consider a new user John Doe trying to access CustomerDataset from the available datasets. The process consists of the following steps:

  1. User John Doe logs in to the Satori portal and finds the Available Datasets section in their data portal.
  2. The user submits a request for CustomerDataset.

The request from user John Doe for CustomerDataset stays in Pending Approval status until it is approved by the admin.

  3. The admin receives the request from user John Doe through email and portal notifications for dataset requests.

The admin can approve or deny the request and might also designate the level of access and when that access expires.

The following screenshot shows an example email notification.

  4. The admin can choose View Request in the email and then approve or deny the request on the Satori portal.

  5. The admin can choose the pencil icon to edit the request before approval and modify the approval conditions.

In this example, the admin modifies a couple of criteria as shown and then approves the request.

Create access request rules (admin perspective)

Users can request access to datasets and the admin can approve or reject those requests, but the admin can also preconfigure the user access rules. Complete the following steps as the admin:

  1. On the Datasets page, choose User Access Requests.
  2. Fill out the access request rule.
  3. Choose Add.

The access request rule creation will be treated as an approval workflow when dataset requests are placed from the data portal.

Dataset requests from users will follow the course of action configured by the admin during access request rules creation. The preconfigured access rules specific to that user are applied so that when this user runs their queries, security policies and masking conditions are applied, and sensitive data is redacted or masked as applicable. The access control is maintained according to the admin settings for both just-in-time access and self-service access.

Clean up

To avoid unintended costs, clean up the resources provisioned as part of Accelerate Amazon Redshift secure data use with Satori – Part 1 or provisioned for this post. Make sure to delete the following resources:

  • Redshift cluster or serverless endpoint
  • Security group to allow inbound traffic from Satori
  • Configurations within your Satori account

Summary

In this post, we described how Satori can help automate secure data access for both data users and admins. The ability to automate this process shortens the time-to-value of data for users and reduces the time and resources admins need to allocate for granting and revoking data access.

Satori is available on the AWS Marketplace. To learn more, start a free trial or request a demo meeting.

Amazon Redshift provides comprehensive security and governance features to protect your data, and continues to expand its out-of-the-box capabilities. For the latest features and updates, explore Amazon Redshift What’s New.


About the Authors

Rohit Vashishtha is a Senior Analytics Specialist Solutions Architect at AWS based in Dallas, Texas. He has over 17 years of experience architecting, building, leading, and maintaining big data platforms. Rohit helps customers modernize their analytic workloads using the breadth of AWS services and ensures that customers get the best price/performance with utmost security and data governance.

Jagadish Kumar (Jag) is a Senior Specialist Solutions Architect at AWS focused on Amazon OpenSearch Service. He is deeply passionate about Data Architecture and helps customers build analytics solutions at scale on AWS.

 Adam Gaulding is a Solution Architect at Satori. At Satori, Adam is helping customers implement data security controls on databases, data lakes and data warehouses. Adam has been in and around the data space throughout his 20+ year career. He’s worked with companies large and small and prides himself in building creative solutions for technical problems.

How to Make Simple Email Service Resilient Across Two AWS Regions with Global Endpoints

Post Syndicated from Zip Zieper original https://aws.amazon.com/blogs/messaging-and-targeting/how-to-make-simple-email-service-resilient-across-two-aws-regions-with-global-endpoints/

Introduction

Amazon Simple Email Service (SES) recently announced Global Endpoints, a major enhancement to its email sending features. This new capability improves the availability and reliability of SES API v2 email sending workloads by automatically distributing messages across the Primary and Secondary AWS regions in an active/active configuration. When Global Endpoints detects degradation in either the Primary or Secondary SES region, the feature automatically shifts all traffic to the healthy region; no customer intervention is needed.

Multi-Region SES Configuration Challenges

Customers face significant difficulties in correctly implementing a multi-region or disaster recovery setup. The process requires careful curation of systems along the failover path and ensuring high availability of these systems. Ironically, the system designed to trigger failover can itself fail when needed most. For many SES customers, the effort required to design, build, test, monitor, and maintain a two-region SES system outweighs the benefits.

SES Global Endpoints eliminates the need for these complex, custom workarounds. The feature provides a straightforward solution for maintaining email sending during unexpected regional disruptions. Global Endpoints’ built-in safeguards ensure email infrastructure remains resilient when it matters most. Please note that at launch, Global Endpoints does not support SMTP or VPC endpoint access.

SES Global Endpoints: Key Technological Components

Global Endpoints utilizes four new SES components that work together to provide a seamless and reliable multi-region email sending experience:

  1. Multi-Region Endpoint (MREP) is a new type of SES endpoint that automatically distributes email traffic across two AWS regions.
  2. Deterministic Easy DKIM Identities (DEED) makes it easy to set up multi-region identities without having to make any DNS changes.
    1. Read the blog introducing SES DEED for more information.
  3. Updated AWS SES Console Tool walks you through the process and simplifies the duplication of Domain Identities, Configuration sets, and Sending limits across regions.
  4. Readiness Checks in the SES console verify uniformity between configurations of key resources in both the Primary and Secondary SES regions.

How SES Global Endpoints work


Figure 1 – SES Global Endpoints with two healthy regions.

Global Endpoints are resources that distribute your SES outbound workloads across two AWS Regions. When you set up Global Endpoints, you select a Primary Region (where the actual Endpoint is created) and a Secondary Region. When configured, a new Global Endpoints resource, called “multi-region endpoint” (MREP) is created and managed. Developers will need to update their SES v2 API enabled applications and services to use their unique MREP as the entry point to SES for their email sending requests.

The Global Endpoints configuration requires that your sending domain identities are verified in both the Primary and Secondary regions. SES Global Endpoints uses DEED to simplify this process. DEED is a new feature that generates consistent DKIM tokens across all AWS Regions based on a Parent Domain Identity that is configured with Easy DKIM. This consistency allows SES to automatically verify a domain identity in the Secondary region once it’s verified in the Primary region, without requiring additional DNS record updates. Customers do not need to make any changes other than activating the DEED identity type. When customers expand their sending workload’s geographic footprint, or reconfigure their Global Endpoints settings in the future, their DEED identities will continue to be available and managed automatically. You can learn more about DEED from this post.

Global Endpoints works alongside other SES services, such as Virtual Deliverability Manager (VDM). Once Global Endpoints are enabled, you’ll continue to see per-region data on email sending performance in VDM. If you’ve configured event destinations like CloudWatch, SNS, or Firehose, you can make use of those same monitoring and alerting tools in your second region as soon as you’re ready. As noted below, although Configuration Sets are automatically duplicated in the Secondary region, you must manually duplicate your SES event destinations in those Configuration Sets.

It is important to understand that Global Endpoints is not a failover solution for SES; it’s an active-active implementation. When no regional impairment exists, SES Global Endpoints distributes sending traffic across two AWS regions. Customers who use SES’ shared IP sending pool do not need to make any changes; Global Endpoints will use the SES shared IP pool in the Secondary region. Customers who use standard dedicated IPs must manually set up an equivalent number of dedicated IPs in the Secondary region. Once properly configured, Global Endpoints will keep the dedicated IPs warm in both regions as long as you use the MREP and maintain a steady sending volume.

For example, when SES’s regional health monitoring detects degradation in the Primary region (as shown in the diagram below), the MREP automatically shifts all traffic to the healthy region. This illustrates the need for matching configurations in both regions, since all traffic will be sent through a single region (in this example, the Secondary region) for as long as the impairment exists. When SES’s regional health monitoring detects that the impaired region is back to normal, traffic is once again redistributed across both regions. Importantly, no customer intervention is needed; SES Global Endpoints automatically and dynamically monitors and manages the traffic distribution via the MREP.


Figure 2 – SES Global Endpoints with one impaired region.
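
The routing behaviour described above can be pictured with a toy model. The sketch below is not how SES implements health monitoring; it only encodes the rule stated in this post (an even split when both regions are healthy, and all traffic to the healthy region otherwise). The function names are hypothetical, and the case where both regions are impaired is not covered by the post, so the sketch raises an error there.

```python
def traffic_split(primary_healthy, secondary_healthy):
    """Return the share of sends each region receives in this toy model."""
    if primary_healthy and secondary_healthy:
        return {"primary": 0.5, "secondary": 0.5}
    if primary_healthy:
        return {"primary": 1.0, "secondary": 0.0}
    if secondary_healthy:
        return {"primary": 0.0, "secondary": 1.0}
    # Assumption: the post does not describe this case, so the model refuses it.
    raise ValueError("no healthy region; behaviour not described in the post")


def per_region_rate(total_per_second, split):
    """Translate a total send rate into the rate each region must carry."""
    return {region: total_per_second * share for region, share in split.items()}
```

For a workload of 20 emails per second, the model gives each region 10 per second when both are healthy, and the full 20 per second to the surviving region during an impairment, which is why both regions need limits sized for the whole volume.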

The key benefits of using Global Endpoints include:

  • Simplified multi-region configuration
  • Automatic routing between two regions with no delay
  • More resilient email sending

Global Endpoints: Setup and Use

Using the SES Console, the Global Endpoint setup process assists in duplicating the key artifacts and sending limits from your Primary Region to your Secondary Region. This process ensures that both regions have equivalent verified identities, configuration sets, and approved sending limits sufficient for all of the expected volume. Customers can manually duplicate these key artifacts using the SES v2 API or CloudFormation, but we recommend using the SES console because it simplifies these steps.

Once the Global Endpoint is ready, the key resources are duplicated, and the application or service has been updated to use the new MREP, SES Global Endpoints automatically routes your outbound traffic evenly between your primary and secondary regions. If the MREP detects degraded performance in the Primary or Secondary region, it will automatically route all SES traffic to the healthy region until the impairment is resolved.

Preparing the Primary Region

The high-level steps to set up Global Endpoints using the SES Console are below. Note – you must already have a primary SES region fully operational, with at least one fully verified sending identity with production access, before setting up Global Endpoints.

Create Global Endpoint

  1. Open the SES console in the Primary AWS region.
  2. In the navigation pane, choose Global Endpoints.
  3. Choose Create Global Endpoint.
  4. Select a Secondary Region from the dropdown menu. (Your Primary Region defaults to the region you are signed in to in the console.)
  5. Review the configuration and choose Create.

The creation process may take a few seconds. Once completed, the status of your Global Endpoint will change to “Ready.”


Preparing the secondary region

Once your Global Endpoint is ready, you must ensure that your email sending configuration, including all its components (Domain Identities, Configuration sets, email templates, and sending limits), is consistent across the primary and secondary regions before using the Global endpoint to send emails. This alignment is crucial to avoid potential issues and ensure proper email delivery and tracking.

The new Region duplication feature in the SES console assists you by automatically duplicating resources and duplicating account-level settings from the primary to the secondary region, ensuring that both regions have equivalent configurations.

The high-level steps to set up the secondary region for use with Global Endpoints using the SES Console are below. If you’d like to use the AWS CLI to manually duplicate these resources, consult the SES v2 API documentation.

Duplicating verified domain identities

Next you’ll use the Duplicate verified domain identities feature in the SES console to create one or more domain identities in the Secondary Region. SES will then use DEED to verify the domain identities in the Secondary Region. Note that DEED can only be used if the Primary Domain Identity is already configured with Easy DKIM. Domain identities that are verified with BYODKIM will need to be created manually in the Secondary Region, as DEED is not applicable in this case.

Important – It’s crucial that both Primary and Secondary Regions have the equivalent verified identities and configuration sets that you intend to send email with, along with matching sending limits to ensure proper functionality of the Global endpoint. Any discrepancies could cause delivery failures, diminished failover reliability, and missing metrics.

To duplicate identities from the SES console:

  1. On the Global endpoints page, choose the Global endpoint you want to duplicate by selecting it from the Name column.
  2. Under the Region duplication tab, choose Duplicate identities.
  3. Select the identities you want to duplicate followed by Confirm.

To duplicate configuration sets from the SES console:

  1. On the Global endpoints page, choose the Global endpoint you want to duplicate by selecting it from the Name column.
  2. From the Region duplication tab, choose Duplicate configuration sets.
  3. Select the configuration sets you want to duplicate followed by Confirm.

Important Notes:

  • When duplicating configuration sets across regions, Event destinations and Reputation settings require manual reconfiguration in the Secondary Region to match the Primary Region’s setup. Since SES event destinations are region-specific, you’ll need to manually recreate these configurations in each region. For cross-regional monitoring and event routing, you can refer to additional AWS documentation for services like CloudWatch (cross-region dashboards), SNS (cross-region message delivery), and EventBridge (cross-region event routing) to develop a comprehensive multi-region event strategy.
  • If you are using SES email template resources, those templates need to be manually duplicated into the Secondary Region (the console is currently unable to perform this action).
  • You must manually synchronize any changes made to the Primary Region’s configuration sets to the Secondary Region to maintain sending integrity. This includes adding or removing standard dedicated IPs in both regions so that either region has the IPs required to handle the expected throughput during a regional event or impairment.
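
Because email templates are not duplicated by the console, one way to copy them is through the SES v2 API. The following is a minimal sketch, assuming two boto3 `sesv2` clients (one per region) are passed in. The helper names are illustrative, and the sketch only copies templates that are missing from the secondary region; it does not update templates that already exist there.

```python
def list_template_names(ses):
    """Return every email template name visible to one regional SES v2 client."""
    names, kwargs = [], {}
    while True:
        page = ses.list_email_templates(**kwargs)
        names.extend(t["TemplateName"] for t in page.get("TemplatesMetadata", []))
        token = page.get("NextToken")
        if not token:
            return names
        kwargs["NextToken"] = token


def copy_missing_templates(primary, secondary):
    """Create in the secondary region each template that exists only in the primary."""
    existing = set(list_template_names(secondary))
    copied = []
    for name in list_template_names(primary):
        if name in existing:
            continue  # assumption: existing templates are left untouched
        template = primary.get_email_template(TemplateName=name)
        secondary.create_email_template(
            TemplateName=name,
            TemplateContent=template["TemplateContent"],
        )
        copied.append(name)
    return copied
```

With boto3 installed, the two clients could be created with `boto3.client("sesv2", region_name="<primary-region>")` and `boto3.client("sesv2", region_name="<secondary-region>")`.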

The Duplicate production limits feature allows you to:

  • Check if production limits are aligned between primary and secondary regions
  • Request a limit increase in the secondary region if needed

To duplicate production limits from the SES console:

  1. On the Global endpoints page, choose the Global endpoint you want to duplicate by selecting it from the Name column.
  2. From the Region duplication tab, verify the status in the Duplicate production limits card. If the status displays Sending limits not aligned, choose Duplicate production limits.
  3. The Service Quotas page opens in the secondary region where you can request increases to “Sending quota” and “Sending rate” to match the values from the primary region.

Note – SES checks if sending limits are aligned between regions and allows you to request limit increases in the secondary region if needed. We recommend that you request the maximum quota you’re eligible for in both regions. While email traffic is distributed amongst both regions under normal operating conditions, during a failover event, the full volume of email traffic will be sent to one region and its limits should be enough to handle the full volume load.
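
The alignment check can also be reproduced outside the console. The sketch below compares two quota dictionaries shaped like the `SendQuota` structure returned by the SES v2 GetAccount API (`Max24HourSend` and `MaxSendRate`) and reports how far the secondary region falls short. The function name is hypothetical, and the numbers in the usage note are made up for illustration.

```python
def quota_gaps(primary_quota, secondary_quota):
    """Return the shortfall of the secondary region for each sending limit."""
    gaps = {}
    for key in ("Max24HourSend", "MaxSendRate"):
        shortfall = primary_quota[key] - secondary_quota[key]
        if shortfall > 0:
            gaps[key] = shortfall
    return gaps
```

For example, `quota_gaps({"Max24HourSend": 50000.0, "MaxSendRate": 14.0}, {"Max24HourSend": 200.0, "MaxSendRate": 1.0})` reports a shortfall for both limits, which would indicate that a limit increase is needed in the secondary region. With boto3, each dictionary could come from `boto3.client("sesv2", region_name="<region>").get_account()["SendQuota"]`.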

If any manual steps are required to complete the Global Endpoint creation, they will be shown in the SES Console.

When the Global Endpoint is fully configured, an MREP will be created with an Endpoint ID (see below). You will use this new endpoint ID when reconfiguring your SendEmail and SendBulkEmail API calls. (Note – Global Endpoints MREPs are only supported by SES API v2. The feature is not available using SMTP or VPC endpoints.)

Now you’re ready to send your first email through SES Global Endpoints and your MREP!

Once you’ve obtained the Endpoint ID of your Global endpoint (this is the MREP), you should update your applications’ SendEmail or SendBulkEmail API calls to include the Endpoint ID value for the --endpoint-id parameter.

Here’s an example of how to specify the Endpoint ID in a SendEmail API call using the AWS CLI (modify the from & to email addresses and the --endpoint-id accordingly):

aws sesv2 send-email \
--from-email-address "sender@example.com" \
--destination "ToAddresses=recipient@example.com" \
--content "Simple={Subject={Data=Test email,Charset=UTF-8},Body={Text={Data=This is a test email sent using Amazon SES Global endpoints.,Charset=UTF-8}}}" \
--endpoint-id "abcdef12.g3h"
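
For applications that call SES through an SDK rather than the CLI, the same request can be sketched in Python. This is a minimal example, assuming a boto3 `sesv2` client is passed in as `ses` and that the installed boto3 version is recent enough to accept the `EndpointId` parameter of `send_email`. The wrapper function and its argument names are illustrative.

```python
def send_via_global_endpoint(ses, endpoint_id, sender, recipient, subject, body):
    """Send one plain-text email through a multi-region endpoint (MREP)."""
    response = ses.send_email(
        FromEmailAddress=sender,
        Destination={"ToAddresses": [recipient]},
        Content={
            "Simple": {
                "Subject": {"Data": subject, "Charset": "UTF-8"},
                "Body": {"Text": {"Data": body, "Charset": "UTF-8"}},
            }
        },
        # The Endpoint ID of the Global endpoint; without it the call is a
        # normal single-region send.
        EndpointId=endpoint_id,
    )
    return response["MessageId"]
```

The client would typically be created in the Primary Region, for example with `boto3.client("sesv2", region_name="<primary-region>")`.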

The Global Endpoints console page provides summary observability metrics on the combined workload and a unified view of your email sending volume across both the primary and secondary regions. You can access these metrics through the Cross-region metrics tab on the Global endpoint details page in the SES console.

Conclusion

By using a SES Global Endpoint in their SES API v2 applications and services, SES customers benefit from uninterrupted email delivery during regional service issues. SES Global Endpoints automatically distributes sending workloads across two AWS Regions, significantly enhancing resilience against regional outages. The Global Endpoints feature maintains warmed-up dedicated IP addresses in both regions, when used, and automatically shifts traffic to the healthy region when the other is impaired, without requiring customer intervention. SES Global Endpoints eliminates the pain points typically associated with manually-built, multi-region SES sending systems.

Global Endpoints’ console tools provide quick and easy setup and include readiness checks to identify and mitigate potential misconfigurations. These enhancements simplify the configuration and management process, making it easier for customers to maintain a robust email sending infrastructure.

Overall, SES Global Endpoints addresses customer needs for a more reliable and easily managed email sending system, automating critical processes and providing robust tools for setup and maintenance. This significant improvement to the email sending experience is expected to benefit AWS customers across various industries and use cases.

Call to Action

Get started with SES Global Endpoints today to enhance your email sending resilience!

  • Visit the AWS Console to enable this feature for your account
  • Review the comprehensive documentation for step-by-step guidance.
  • For personalized assistance, don’t hesitate to contact AWS Support or your AWS account team.

Elevate your email infrastructure now to ensure uninterrupted communication with your customers, even in the face of regional disruptions.

Simplify Multi-Region Email Sending with Simple Email Service’s Deterministic Easy DKIM

Post Syndicated from Zip Zieper original https://aws.amazon.com/blogs/messaging-and-targeting/simplify-multi-region-email-sending-with-simple-email-services-deterministic-easy-dkim/

Introduction

Amazon Simple Email Service (SES) provides customers with a robust, scalable email solution to send large-scale, global email communications. The service offers customers many benefits, including scalability, high deliverability rates, cost effective pay-as-you go pricing, availability in over a dozen AWS regions, and tight integration with other AWS services.

We’ve heard from customers who want to synchronise domain identities across multiple AWS regions that it can be difficult to configure and maintain. These customers have shared that establishing and maintaining AWS region specific domain verifications can be confusing, time-consuming, and that it’s difficult to coordinate the many details across their users, customers and service providers.

In this post, we’ll explore the newly introduced SES feature called Deterministic Easy DKIM (“DEED”), which solves critical challenges in multi-region email identity management for SES customers. DEED generates consistent DKIM tokens across multiple AWS Regions based on a Parent (domain) Identity that is configured with SES Easy DKIM. DEED uses the Parent Identity’s Easy DKIM configuration to automatically provide the same DKIM signing configuration for a Replica (domain) Identity in an AWS Replica Region. With DEED, you only need to publish DNS records once for the Parent Identity. The Replica Identity will automatically use these same DNS records to verify domain ownership and manage DKIM signing. DEED streamlines multi-region email operations by simplifying DNS management and ensuring consistent DKIM signing across AWS regions, maintaining best-practice email authentication while reducing operational complexity.

Background and key challenges

SES introduced Easy DKIM over a decade ago as an innovative, streamlined solution to help customers create, verify, and manage domain identities with automated DKIM signing. Easy DKIM is simple to set up, and works by generating pre-determined tokens that customers add to their DNS configurations. Once Easy DKIM is enabled, SES generates a public/private signing key for each domain identity and updates the verified identity’s CNAME public key. SES’ Easy DKIM simplifies email authentication by managing and rotating DKIM keys on behalf of customers.

Before DEED, customers who wanted to expand their SES email infrastructure across multiple regions faced complexity, operational challenges, and a substantial administrative burden configuring and maintaining DKIM across AWS regions. This made it very difficult for customers to scale their email infrastructure efficiently, and often dissuaded customers from fully leveraging the potential of a multi-region email sending strategy. These challenges are more pronounced for Independent Software Vendors (ISVs) and email service providers using SES. Frequently these organizations don’t own or control the domains used by their end-customers, who must manually update their DNS entries each time the ISV expands or shifts its SES sending infrastructure between AWS regions.

Solution Overview

Deterministic Easy DKIM (DEED) allows SES customers to set up email identities across multiple regions, leveraging an existing domain identity configuration in a single region without the need to make companion DNS changes in all other regions. This innovative approach eliminates the manual overhead of creating region-specific DNS entries, and provides a streamlined solution to configure and maintain global email infrastructure for organizations and ISVs alike.

The key benefits of using DEED include:

  • Simplified DNS Management – Publish DNS records once for the Parent Identity and the replica identity is automatically synced.
  • Easier Multi-Region Operations – Simplifies the process of expanding email sending operations to new AWS regions.
  • Reduced Administrative Overhead – Manage DKIM configurations centrally from the Parent Identity.

Terminology:

To understand DEED, let’s explore the key terminology that underpins this innovative approach:

  • Deterministic – a process or system where the same input will always produce the same output, with no randomness or unpredictability involved. In other words, if all the starting conditions are known, the outcome can be precisely predicted or determined.
  • Parent Region – The original AWS Region where the primary email identity is initially established.
  • Parent Identity – A verified email identity configured with Easy DKIM that serves as the authoritative source for DKIM configuration across regions.
  • Replica Region – An AWS Region where an identical email identity is replicated without additional configuration.
  • Replica Identity – An identity that shares identical DNS configuration and DKIM signing configuration of a parent identity.
  • DEED Identity – Any identity that is used as either a parent identity or a replica identity. (When a new identity is created, it is initially treated as a regular (non-DEED) identity. However, once a replica is created, the identity is then considered a DEED identity.)

How DEED works

DEED flow

DEED is built on the existing Easy DKIM framework:

  1. Using Easy DKIM, SES generates a public-private key pair and automatically adds a DKIM signature to every message sent from a SES verified identity in the Parent Region.
  2. To accommodate authenticated SES sending from multiple AWS regions, DEED automatically synchronizes the signing keys from the Parent Region and Parent Identity to the Replica Region and Replica Identity.
  3. This automated process ensures that both the Parent and Replica Identities receive and use identical keys for DKIM signing, maintaining consistent authentication across different AWS regions.
  4. SES manages the complex process of key rotation across Parent Identity and Replica Identities, further simplifying email infrastructure management across multiple AWS regions.
  5. Replica Identities inherit the DKIM signing configuration of the parent identity. Because of this dependency, you cannot delete a Parent Identity until all Replica Identities are deleted.
  6. The receiving email server/service validates DKIM from DNS.

We recommend customers take advantage of DEED for multi-Region sending, as this new capability is included, at no additional cost, in the base SES pricing.

Steps to set up an SES DEED Replica in a second AWS region

These steps assume that you are already using SES in the Parent Region, and have a fully verified domain identity that is configured to use Easy DKIM. You can also use the AWS CLI.

Step 1 – Update the Parent Identity

  1. Login to the AWS SES Console in the Parent Region
  2. Click on the Identities link in the SES navigation panel (far left) and click on the verified identity you want to use in other AWS regions.
    1. The Parent Identity must have Easy DKIM enabled.
    2. You cannot create Replicas of Identities that use BYODKIM or self-signed identities.
  3. Click on the Authorization tab
  4. Click Create Policy and select Create custom policy from the drop-down menu.
  5. Name the policy (for example, DEED-example_com)
  6. Modify the IAM policy (below) with your AWS account ID and AWS region, and paste it into the IAM Policy document to grant permission on the Parent Identity to allow the desired Replica Region to replicate the Parent Identity’s DKIM Signing Attributes:
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "AllowDKIMReplication",
      "Effect": "Allow",
      "Principal": {
        "AWS": "arn:aws:iam::YOUR_ACCOUNT_ID:root"
      },
      "Action": "ses:ReplicateEmailIdentityDKIMSigningKey",
      "Resource": "arn:aws:ses:us-east-1:123456789124:identity/example.com",
      "Condition": {
        "ForAllValues:StringEquals": {
          "ses:ReplicaRegion": ["us-west-2", "eu-west-1"]
        }
      }
    }
  ]
}

Note – Keep the IAM policy consistent, and make sure it allows DKIM replication to every intended Replica Region.
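If you manage several domains or Replica Regions, it can help to generate this policy rather than edit it by hand. The following is a minimal sketch, not an official tool: `build_deed_policy` is a hypothetical helper, and it assumes the Parent Identity and the principal belong to the same AWS account.

```python
import json


def build_deed_policy(account_id: str, parent_region: str, domain: str,
                      replica_regions: list[str]) -> str:
    """Return the Parent Identity authorization policy as a JSON string.

    Assumption: the same account ID is used for the principal and the identity ARN.
    """
    policy = {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "AllowDKIMReplication",
                "Effect": "Allow",
                "Principal": {"AWS": f"arn:aws:iam::{account_id}:root"},
                "Action": "ses:ReplicateEmailIdentityDKIMSigningKey",
                "Resource": f"arn:aws:ses:{parent_region}:{account_id}:identity/{domain}",
                "Condition": {
                    "ForAllValues:StringEquals": {
                        "ses:ReplicaRegion": list(replica_regions)
                    }
                },
            }
        ],
    }
    return json.dumps(policy, indent=2)


if __name__ == "__main__":
    # Example values only; replace them with your own account, Region, and domain.
    print(build_deed_policy("123456789124", "us-east-1", "example.com",
                            ["us-west-2", "eu-west-1"]))
```

Paste the printed JSON into the policy document field described in the previous step.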

Step 2 – Set up the DEED Replica Identity

  1. Login to the AWS SES Console in the Replica Region
  2. Click on the Identities link in the SES navigation panel (far left) and click Create identity
  3. Under Identity details, Identity type, click Domain
  4. Type the domain name exactly as it appears in the Parent Identity
  5. Under Verifying your domain, click Deterministic Easy DKIM
  6. Select the Parent Region in the Parent region drop-down
  7. Ensure DKIM Signature is Enabled
  8. Click Create identity

create DEED ID

  1. The Replica Identity in the Replica Region will now automatically synchronize DKIM with the Parent Identity in the Parent Region and your apps/services can be configured to use either Parent or Replica Region & Identity to send DKIM authenticated email.
  2. Repeat the process if you want to create additional Replica Regions.

You can verify that the replica identity was configured correctly with the parent identity’s DKIM signing configuration by using the get-email-identity command and specifying the Replica’s domain name and region:

`aws sesv2 get-email-identity --email-identity example.com --region us-west-2`

The response will include the value of the Parent Region in the SigningAttributesOrigin parameter, signifying that the Replica Identity has been successfully configured with the Parent Identity’s DKIM signing configuration:

{
  "DkimAttributes": {
    "SigningAttributesOrigin": "AWS_SES_US_EAST_1"
  }
}
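The same check can be scripted. The sketch below works on the parsed response as a plain dictionary; `expected_origin` and `is_deed_replica` are hypothetical helper names, and the Region-to-origin mapping is an assumption inferred from the sample response above (`us-east-1` becomes `AWS_SES_US_EAST_1`).

```python
def expected_origin(parent_region: str) -> str:
    """Map a Region code to the origin value shown in the sample response.

    Assumption: the value is "AWS_SES_" plus the Region code in upper case
    with hyphens replaced by underscores.
    """
    return "AWS_SES_" + parent_region.upper().replace("-", "_")


def is_deed_replica(response: dict, parent_region: str) -> bool:
    """Return True when the identity's DKIM origin points at the Parent Region."""
    origin = response.get("DkimAttributes", {}).get("SigningAttributesOrigin")
    return origin == expected_origin(parent_region)


# Sample response trimmed to the field of interest.
sample = {"DkimAttributes": {"SigningAttributesOrigin": "AWS_SES_US_EAST_1"}}
print(is_deed_replica(sample, "us-east-1"))
```

If you use boto3, the dictionary returned by the `sesv2` client's `get_email_identity(EmailIdentity="example.com")` call in the Replica Region can be passed to `is_deed_replica` directly.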

Conclusion

Deterministic Easy DKIM (DEED) represents a significant leap forward in multi-region email management for Amazon SES users. By eliminating the need for manually configured region-specific DNS configurations, DEED streamlines the process of expanding email operations across multiple AWS regions. This innovation not only reduces administrative overhead but also opens up new possibilities for organizations and ISVs to implement robust, globally distributed email infrastructures. With DEED, businesses can now leverage the full potential of Amazon SES across regions, ensuring consistent authentication, improved disaster recovery, and optimal performance without the previous complexities of multi-region setup.

Call to Action

Are you ready to simplify your multi-region email strategy with Amazon SES and DEED? Take the next step in optimizing your email infrastructure:

  1. Explore the DEED feature in your Amazon SES console today.
  2. Set up a test environment to experience the seamless multi-region configuration firsthand.
  3. For more detailed information, check out our comprehensive documentation on implementing DEED.
  4. Have questions or need assistance? Reach out to our AWS support team or join the AWS community forums to connect with other users.

Don’t let regional boundaries limit your email capabilities. Embrace the power of Deterministic Easy DKIM and transform your global email strategy with Amazon SES. Start your DEED journey today and unlock new levels of efficiency and scalability in your email operations.

Federate to Amazon Redshift Query Editor v2 with Microsoft Entra ID

Post Syndicated from Koushik Konjeti original https://aws.amazon.com/blogs/big-data/federate-to-amazon-redshift-query-editor-v2-with-microsoft-entra-id/

Amazon Redshift is a fast, petabyte-scale, cloud data warehouse that tens of thousands of customers rely on to power their analytics workloads. With its massively parallel processing (MPP) architecture and columnar data storage, Amazon Redshift delivers high price-performance for complex analytical queries against large datasets.

To interact with and analyze data stored in Amazon Redshift, AWS provides the Amazon Redshift Query Editor V2, a web-based tool that allows you to explore, analyze, and share data using SQL. The Query Editor V2 offers a user-friendly interface for connecting to your Redshift clusters, executing queries, and visualizing results.

As organizations increasingly adopt cloud-based solutions and centralized identity management, the need for seamless and secure access to data warehouses like Amazon Redshift becomes crucial. Many customers have already implemented identity providers (IdPs) like Microsoft Entra ID (formerly Azure Active Directory) for single sign-on (SSO) access across their applications and services. For more information about using Microsoft Entra ID for federation to Amazon Redshift with SQL clients, see Federate Amazon Redshift access with Microsoft Azure AD single sign-on. This post focuses on setting up federation for accessing the Redshift Query Editor.

Through this federated setup, users can connect to the Redshift Query Editor using their existing Microsoft Entra ID credentials, allowing you to control permissions for database objects based on business groups defined in your Active Directory. This approach provides a seamless user experience while centralizing the governance of authentication and permissions for end-users, eliminating the need to manage separate credentials for data warehousing. Additionally, you can restrict access to specific datasets based on the user’s business group, so users only have access to the data they are authorized to view and manage.

In the following sections, we explore the process of federating into AWS using Microsoft Entra ID and AWS Identity and Access Management (IAM), and how to restrict access to datasets based on permissions linked to AD groups. Although the integration with AWS IAM Identity Center is the recommended approach, this post focuses on setups where IAM Identity Center might not be applicable due to compliance constraints, such as organizations requiring FedRAMP Moderate compliance, which IAM Identity Center doesn’t yet meet. We cover the prerequisites, guide you through the setup process, and demonstrate how to seamlessly connect to the Redshift Query Editor while making sure data access permissions are accurately enforced based on your Microsoft Entra ID groups.

Solution overview

The following diagram illustrates the authentication flow of Microsoft Entra ID with a Redshift cluster using federated IAM roles.

Federation between Microsoft Entra ID and IAM, which enables seamless access to Amazon Redshift through a SQL client such as the Redshift Query Editor V2, works through the following steps:

  1. Users start by authenticating with their Microsoft Entra ID credentials by accessing the enterprise application’s user access URL.
  2. Upon successful authentication, the custom claims provider triggers the custom authentication extension’s token issuance start event listener.
  3. The custom authentication extension calls an Azure function (your REST API endpoint) with information about the event, user profile, session data, and other context.
  4. The Azure function makes a call to the Microsoft Graph API to retrieve the authenticated user’s group membership information.
  5. The Microsoft Graph API responds with the user’s group membership details.
  6. The Azure function takes the group information and transforms it into a colon-separated list, such as group1:group2:group3, and passes this colon-separated group information back to the custom authentication extension as a response payload.
  7. The custom authentication extension processes the response and augments the token with the user’s group information as SAML claims (principal tags). The token, now enriched with the group membership, is returned to the enterprise application.
  8. The enterprise application in Azure AD generates a SAML assertion with principal tags. It sends an HTTP POST to the user’s browser containing an HTML form. This form includes the SAML assertion and specifies the AWS sign-in SAML endpoint (https://signin.aws.amazon.com/saml) as the destination where the SAML assertion should be submitted.
  9. The browser automatically submits this SAML assertion, sending an HTTP POST to the AWS SAML endpoint. This endpoint validates and processes the SAML assertion. If multiple IAM roles are available, the user selects one. The AWS SAML endpoint then uses AWS Security Token Service (AWS STS) to generate temporary credentials for that specific role, creates a console sign-in URL, and redirects the user to the AWS Management Console. From there, the user can access the Redshift Query Editor V2. To learn more about this process, refer to Enabling SAML 2.0 federated users to access the AWS Management Console.
  10. Inside Redshift Query Editor V2, the user selects the option to authenticate using their IAM identity. This triggers the Redshift Query Editor V2 to call the GetClusterCredentialsWithIAM API, which checks the principal tags to determine the user’s database roles. If this is the user’s first login, the API automatically creates a database user and assigns the necessary database roles.
  11. The GetClusterCredentialsWithIAM API issues a temporary user name and password to the user. Using these credentials, the user logs in to the Redshift database. This login authorizes the user based on the Redshift database roles assigned earlier and allows them to run queries on the datasets.
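Query Editor V2 performs the last two steps for you, but it can be useful to see what the credential request looks like. The following is a minimal sketch using the boto3 Redshift client's `get_cluster_credentials_with_iam` call; the helper name, cluster identifier, and database name are hypothetical, and the client is passed in so the function can be exercised without AWS access.

```python
def fetch_temporary_credentials(redshift_client, cluster_id: str, database: str) -> dict:
    """Request temporary database credentials for the caller's IAM identity.

    `redshift_client` is expected to behave like boto3.client("redshift").
    """
    response = redshift_client.get_cluster_credentials_with_iam(
        ClusterIdentifier=cluster_id,
        DbName=database,
    )
    return {
        "user": response["DbUser"],
        "password": response["DbPassword"],
        "expires": response["Expiration"],
    }


# Example usage (requires boto3 and credentials from the federated role):
#   import boto3
#   creds = fetch_temporary_credentials(boto3.client("redshift"), "my-cluster", "dev")
```

The returned user name and password are short-lived, so request them again rather than storing them.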

Prerequisites

On the Microsoft Entra ID side, you need the following prerequisites to set up this solution:

  • A Microsoft Entra ID tenant – Required to set up and configure the Microsoft Entra ID service for managing and securing access to AWS resources through federation.
  • An Azure Subscription – Needed to access and use Azure services like Azure Functions.

Users should be members of specific Azure AD groups based on their access needs:

  • User A – Member of the "redshift_sales" group for access to sales datasets in Amazon Redshift, and the "AWS-<acctno>_dev-bdt-team" group for access to AWS services in the development environment. <acctno> is the AWS account where you have your Redshift cluster.
  • User B – Member of the "redshift_product" group for access to product datasets in Amazon Redshift, and the "AWS-<acctno>_dev-bdt-team" group for access to AWS services.
  • User C – Member of both "redshift_sales" and "redshift_product" groups for access to both datasets, and the "AWS-<acctno>_dev-bdt-team" group for access to AWS services.

The "AWS-<acctno>_dev-bdt-team" group in Azure AD is configured to allow users to assume an IAM role in AWS, providing the necessary permissions to access the AWS account. For a multi-account setup, create multiple groups for different environments or accounts and add users based on their access needs. For example, "AWS-<acctno>_prd-bdt-team" could be used for access to the production environment, where <acctno> reflects the account number for the production account.

On the Amazon Redshift side, you need the following resources:

  • Redshift cluster – A Redshift cluster should be available in the AWS account specified by <acctno> in the AWS-<acctno>_dev-bdt-team group. If not, follow the instructions to create a sample Redshift cluster.
  • Redshift database roles – Create database roles in Amazon Redshift that correspond to Microsoft Entra ID groups:
    • redshift_sales – For users with access to sales datasets.
    • redshift_product – For users with access to product datasets.
  • Redshift schemas – In the dev database, you need a Redshift schema named sales with the table sales_table, which can be accessed by users of the group redshift_sales. You also need a Redshift schema named product with the table product_table, which can be accessed by users of the group redshift_product. You can use the following SQL statements on your Redshift cluster to create the roles, schemas, and tables, insert sample data into the tables, and grant access to the appropriate roles:
-- Create Redshift Roles
CREATE ROLE redshift_sales;
CREATE ROLE redshift_product;
-- Create sales schema and sales_table
CREATE SCHEMA sales;
CREATE TABLE sales.sales_table (
    id INT PRIMARY KEY,
    item VARCHAR(255),
    quantity INT,
    price DECIMAL(10,2)
);

-- Create product schema and product_table
CREATE SCHEMA product;
CREATE TABLE product.product_table (
    id INT PRIMARY KEY,
    name VARCHAR(255),
    category VARCHAR(255),
    price DECIMAL(10,2)
);
-- Insert data into sales_table
INSERT INTO sales.sales_table (id, item, quantity, price) VALUES
(1, 'Laptop', 10, 999.99),
(2, 'Smartphone', 20, 499.99),
(3, 'Headphones', 15, 199.99),
(4, 'Keyboard', 12, 89.99),
(5, 'Mouse', 30, 29.99);
-- Insert data into product_table
INSERT INTO product.product_table (id, name, category, price) VALUES
(1, 'Laptop', 'Electronics', 999.99),
(2, 'Smartphone', 'Electronics', 499.99),
(3, 'Blender', 'Home Appliances', 199.99),
(4, 'Mixer', 'Home Appliances', 89.99),
(5, 'Desk Lamp', 'Furniture', 29.99);
-- Grant usage on schema and select on all tables in the schema to redshift_sales
GRANT USAGE ON SCHEMA sales TO ROLE redshift_sales;
GRANT SELECT ON ALL TABLES IN SCHEMA sales TO ROLE redshift_sales;
-- Grant usage on schema and select on all tables in the schema to redshift_product
GRANT USAGE ON SCHEMA product TO ROLE redshift_product;
GRANT SELECT ON ALL TABLES IN SCHEMA product TO ROLE redshift_product;

Set up Azure Functions and custom authentication extensions

Complete the steps in this section to set up Azure Functions and custom authentication extensions.

Create a new function app

Complete the following steps to create a new function app:

  • Open your web browser and navigate to the Azure Portal (portal.azure.com).
  • Log in with your Azure account credentials.
  • Choose Create a resource.
  • Choose Create under Function App.

  • Select the Consumption hosting plan and then choose Select.

  • On the Basics tab, for Subscription, provide the subscription you want to use. For this example, we use our default subscription, Azure subscription 1.
  • Choose or create a new resource group to organize your Azure resources. We name our resource group rg-redshift-federated-sso.

  • Under Instance Details, enter a globally unique name. For this post, we use the name fn-entra-id-transformer.
  • For Runtime stack, choose Python.
  • For Version, choose 3.11.
  • For Region, choose East US.
  • For Operating System, select Linux.
  • Choose Review + create to review the app configuration.

  • Choose Create to create the Azure Functions app.

  • Choose Go to resource in the notification message or deployment output window to navigate directly to your newly created app.

Create a function

Next, we create an HTTP trigger function in the newly created function app, called fn-entra-id-transformer.

  1. In the function app, choose Overview, then choose Create function in the Functions section.
  2. In the Create function pane, provide the following information:
    1. For Select a template, choose v2 Programming Model.
    2. For Programming Model, choose the HTTP trigger template.
    3. Choose Next.
  3. In the Template details section, provide the following information:
    1. For Job type, choose Create new app.
    2. For Provide a function name, enter CustomAuthenticationFunction.
    3. Leave the Authorization level unchanged, which is set to Function by default.
    4. Choose Create.
  4. After the function is created, choose Get function URL and copy the value for default (Function key).
  5. Store the copied URL securely; you’ll need it later in this section when you set up the custom authentication extension.

We will come back to this function later to update the code to retrieve group information.

Create a custom authentication extension

Next, we create a custom authentication extension. Complete the following steps:

  1. Navigate to Microsoft Entra ID, Enterprise applications, Custom authentication extensions.
  2. Choose Create a custom extension.
  3. In the Basics section, provide the following information:
    1. Leave Event type as TokenIssuanceStart (which is the default option).
    2. Select it and choose Next.
  4. In the Endpoint Configuration section, provide the following information:
    1. For Name, enter Retrieve_user_group_information.
    2. For Target URL, enter the function URL you stored earlier.
    3. Leave Timeout in milliseconds and Maximum Retries as the default values.
    4. Choose Next.
  5. In the API Authentication section, provide the following information:
    1. Select Create new app registration for App registration type.
    2. For Name, enter Retrieve_user_group_information.
    3. Choose Next.
  6. In the Claims section, provide the following information:
    1. For Claim name, enter dbGroupsqueryeditor and dbGroupssqltools.
    2. Choose Next.

  7. In the Review section, review the configuration details, and if everything looks correct, choose Create. After the creation is completed, you will be redirected to the overview page of the newly created custom authentication extension. On the overview page, in the API Authentication section, you will see a message indicating that admin consent is required.
  8. Choose Grant admin consent to grant the required permissions.

After the admin consent is granted successfully, the API Authentication section will show the status as Configured.

Now you can proceed to create the enterprise application.

Set up the Azure enterprise application

Complete the steps in this section to configure the Azure enterprise application.

Create a new Azure enterprise application

Complete the following steps to create an Azure Enterprise application:

  1. Navigate to Microsoft Entra ID, Enterprise applications, New application.

  2. Under Cloud platforms, choose Amazon Web Services (AWS).
  3. For Name, enter AWS Single-Account Access.
  4. Choose Create.

When the create process is complete, you will be redirected to the newly created enterprise application.

Configure SSO

Complete the following steps to configure SSO for your application:

  1. On the enterprise application page, choose Get started under Set up single sign on.
  2. Choose SAML.

  3. In the Basic SAML Configuration section, choose Edit.
    1. For Identifier (Entity ID) and Reply URL, enter https://signin.aws.amazon.com/saml.
    2. Choose Save.
  4. In the Attributes & Claims section, choose Edit.
    1. In the Advanced settings section, choose Configure next to the custom claims provider setting.
    2. For Custom claims provider, choose Retrieve_user_group_information.
    3. Choose Save.

Configure a group claim

We use the group claim to transform the Azure AD group assignments into corresponding IAM roles. By applying a regular expression pattern, the group names are mapped to appropriate Amazon Resource Names (ARNs) for IAM roles and SAML providers. Complete the following steps to configure the group claim:

  1. On the Attributes & Claims page, delete claim name https://aws.amazon.com/SAML/Attributes/Role.
  2. Choose Add a group claim.
  3. Select Groups assigned to the application for the associated groups.
  4. For Source attribute, choose Cloud-only group display names.
  5. Under Advanced options, select Filter groups and provide the following information:
    1. For Attribute to match, choose Display name.
    2. For Match with, choose Prefix.
    3. For String, enter AWS-.
  6. Select Customize the name of group claim and provide the following information:
    1. For Name, choose Role.
    2. For Namespace, enter https://aws.amazon.com/SAML/Attributes.
    3. Select Apply regex replace to groups claim content.
    4. For Regex pattern, enter AWS-(?'accountid'[\d]{12})_(?'env'[a-z]+)-(?'app'[a-z]+)-(?'role'[a-z]+).
    5. For Regex replacement pattern, enter arn:aws:iam::{accountid}:saml-provider/AzureADDemo,arn:aws:iam::{accountid}:role/{env}-{app}-{role}
  7. Choose Save.
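To see what this transformation produces before you save it, you can reproduce it locally. The sketch below is an approximation in Python, not the Entra ID implementation: Python writes named groups as `(?P<name>...)` instead of `(?'name'...)`, the helper name is hypothetical, and matching the whole group name is an assumption.

```python
import re
from typing import Optional

# Python equivalent of the regex pattern entered in the group claim settings.
GROUP_PATTERN = re.compile(
    r"AWS-(?P<accountid>\d{12})_(?P<env>[a-z]+)-(?P<app>[a-z]+)-(?P<role>[a-z]+)"
)

# Same replacement pattern as in the claim configuration.
REPLACEMENT = (
    "arn:aws:iam::{accountid}:saml-provider/AzureADDemo,"
    "arn:aws:iam::{accountid}:role/{env}-{app}-{role}"
)


def group_to_role_claim(group_name: str) -> Optional[str]:
    """Return the Role claim value for a group name, or None if it does not match."""
    match = GROUP_PATTERN.fullmatch(group_name)
    if match is None:
        return None
    return REPLACEMENT.format(**match.groupdict())


print(group_to_role_claim("AWS-123456789012_dev-bdt-team"))
# arn:aws:iam::123456789012:saml-provider/AzureADDemo,arn:aws:iam::123456789012:role/dev-bdt-team
```

Groups such as redshift_sales do not match the pattern and return `None` here; in the claim configuration, the AWS- prefix filter excludes them from the Role claim.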

Add new claims

Complete the following steps to add new claims:

  1. On the Attributes & Claims page, choose Add new claim.
  2. Add claims with the following values:
    1. Choose Add a new claim, name the new claim https://aws.amazon.com/SAML/Attributes/PrincipalTag:RedshiftDbRoles, select Attribute for Source, enter customclaimsprovider.dbGroupsqueryeditor for Source attribute, and choose Save.

    2. Choose Add a new claim, name the new claim https://aws.amazon.com/SAML/Attributes/PrincipalTag:RedshiftDbUser, select Attribute for Source, enter user.userprincipalname for Source attribute, and choose Save.
    3. Choose Add a new claim, name the new claim https://redshift.amazon.com/SAML/Attributes/AutoCreate, select Attribute for Source, enter true for Source attribute, and choose Save.

The values of PrincipalTag:RedshiftDbUser and PrincipalTag:RedshiftDbGroups must be lowercase; begin with a letter; contain only alphanumeric characters, underscore (_), plus sign (+), dot (.), at (@), or hyphen (-); and be less than 128 characters.
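These constraints are easy to check before a user's first sign-in. The following is a minimal sketch of a validator for a single tag value, based only on the rules in the preceding paragraph; the function name is hypothetical.

```python
import re

# A lowercase letter first, then lowercase letters, digits, or _ + . @ -
_TAG_VALUE = re.compile(r"[a-z][a-z0-9_+.@-]*")


def is_valid_principal_tag_value(value: str) -> bool:
    """Check one tag value against the rules stated above."""
    return len(value) < 128 and _TAG_VALUE.fullmatch(value) is not None


print(is_valid_principal_tag_value("user.a@example.com"))
print(is_valid_principal_tag_value("User.A@example.com"))
```

The first call prints `True` and the second prints `False`, because the second value contains uppercase letters. If your user principal names contain uppercase letters, the claim value needs to be lowercased before it reaches AWS.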

When you complete adding all the claims, your Attributes & Claims page should look like the following screenshot.

Save the federation metadata XML file

You use the federation metadata file to configure the IAM IdP in a later step. Complete the following steps to download the file:

  1. Navigate back to your SAML-based sign-in page.
  2. In the Single sign-on section, under SAML Certificates, choose Download for Federation Metadata XML.
  3. Save this file locally.

The name of the file is often the same as the application name; for example, AWS Single-Account Access.xml.

Create a new client secret

Complete the following steps to create a new client secret:

  1. Return to the Azure directory overview and navigate to App registrations.
  2. Choose the application AWS Single-Account Access.
  3. If you don’t see your application in the list, choose the All applications tab and register it if it’s not registered.
  4. Record the values for Application (client) ID and Directory (tenant) ID.
  5. Under Certificates & secrets, choose New client secret.
  6. In the Add a client secret pane, provide the following information:
    1. For Description, enter AWSRedshiftFederationsecret.
    2. For Expires, select Microsoft’s recommended value of 180 days.
    3. Choose Add.
  7. Copy the secret value and store it securely.

The secret expires after 180 days. Make sure there is a process in place to update with a new secret before the current secret expires in your environment.

Add permissions

Complete the following steps to add permissions:

  1. Navigate to API permissions for application AWS Single-Account Access.
  2. Choose Add a permission and provide the following information:
    1. For Select an API, choose Microsoft Graph.
    2. Select Delegated permissions for the type of permission your application requires.
    3. In Select permissions, choose User and then User.Read.

    4. Choose Application permissions for the type of permission your application requires.
    5. In Select permissions, choose Directory and then Directory.Read.All.

  3. Choose Add permissions.
    This allows the Redshift enterprise application to grant admin consent to read the user profile and group data associated with the user and perform the login using SSO.
  4. Under Configured permissions, choose Grant admin consent for added permissions.
  5. In the confirmation pane, choose Yes to grant consent for the requested permissions for all accounts to the enterprise application.
  6. Navigate to your enterprise applications and select AWS Single-Account Access and choose Users and groups.
  7. Choose Add user/group.
  8. Under Users and groups select the groups redshift_product, redshift_sales, and AWS-<acctno>_dev-bdt-team, which are created as part of the prerequisites, and choose Select.
  9. On the Add Assignment page, choose Assign.

Update Azure Function code

Complete the following steps to update the Azure Function code:

  1. Return to Home and navigate to fn-entra-id-transformer under Function App.
  2. Choose CustomAuthenticationFunction under Functions.
  3. On the Code + Test page, replace the sample code with the following code, which retrieves the user’s group membership, and choose Save.

In this code, replace the values of client_id, client_secret, and tenant_id with the values recorded previously. In enterprise environments, use a secrets management service to store these secrets, and use a requirements file to install required packages such as requests.

import azure.functions as func
import logging
import json
import re  # required by install() to validate the package name
import sys
import subprocess

def install(package):
    allowed_pattern = r'^[a-zA-Z0-9\-_\.]+$'
    if not re.match(allowed_pattern, package):
        raise ValueError("Invalid package name")

    subprocess.check_call([sys.executable, "-m", "pip", "install", package], shell=False)

# Ensure the requests package is installed
try:
    import requests
except ImportError:
    install("requests")
    import requests

app = func.FunctionApp(http_auth_level=func.AuthLevel.FUNCTION)

@app.route(route="custom-extension")
def custom_extension(req: func.HttpRequest) -> func.HttpResponse:
    logging.info("Azure AD Custom Extension function triggered")

    try:
        request_body = req.get_body().decode('utf-8')
        data = json.loads(request_body)
        user_id = data['data']['authenticationContext']['user']['id']

        # Fetch access token for Microsoft Graph API
        access_token = get_access_token()
        if not access_token:
            error_response = {"error": "Failed to obtain access token for Graph API"}
            return func.HttpResponse(body=json.dumps(error_response), status_code=200, headers={"Content-Type": "application/json"})

        # Fetch user groups
        user_groups = fetch_user_groups(user_id, access_token)
        if user_groups is None:
            error_response = {"error": "Failed to fetch user groups"}
            return func.HttpResponse(body=json.dumps(error_response), status_code=200, headers={"Content-Type": "application/json"})

        # Format groups as colon-separated values, as needed by Redshift Query Editor
        groups_colon_separated = ":".join(user_groups)

        # Construct response as per the required JSON structure
        response_content = {
            "data": {
                "@odata.type": "microsoft.graph.onTokenIssuanceStartResponseData",
                "actions": [
                    {
                        "@odata.type": "microsoft.graph.tokenIssuanceStart.provideClaimsForToken",
                        "claims": {
                            "dbGroupsqueryeditor": groups_colon_separated,
                            "dbGroupssqltools": user_groups
                        }
                    }
                ]
            }
        }

        return func.HttpResponse(body=json.dumps(response_content), status_code=200, headers={"Content-Type": "application/json"})

    except Exception as e:
        logging.error(f"Error in function execution: {str(e)}")
        error_response = {"error": str(e)}
        return func.HttpResponse(body=json.dumps(error_response), status_code=200, headers={"Content-Type": "application/json"})

def get_access_token():
    # Hardcoded credentials for demonstration; replace with secure storage before production
    client_id = 'client_id'
    client_secret = 'client_secret' 
    tenant_id = 'tenant_id'
    token_url = f"https://login.microsoftonline.com/{tenant_id}/oauth2/v2.0/token"

    body = {
        "client_id": client_id,
        "scope": "https://graph.microsoft.com/.default",
        "client_secret": client_secret,
        "grant_type": "client_credentials"
    }

    try:
        response = requests.post(token_url, data=body, timeout=10)
        response.raise_for_status()
        return response.json()['access_token']
    except requests.RequestException as e:
        logging.error(f"Failed to retrieve access token: {str(e)}")
        return None

def fetch_user_groups(user_id, access_token):
    graph_url = f"https://graph.microsoft.com/v1.0/users/{user_id}/memberOf?$select=displayName"

    headers = {
        "Authorization": f"Bearer {access_token}"
    }

    try:
        response = requests.get(graph_url, headers=headers, timeout=10)
        response.raise_for_status()
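        # Keep only directory groups; use .get() so entries without a type annotation are skipped instead of raising KeyError
        return [group['displayName'] for group in response.json().get('value', []) if group.get("@odata.type") == "#microsoft.graph.group"]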
    except requests.RequestException as e:
        logging.error(f"Failed to fetch user groups: {str(e)}")
        return None
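
To make the shape of the two claims concrete, the following is a minimal sketch of the payload construction used in the function above. The helper name build_claims_response is hypothetical, the group names are sample values, and the sketch makes no network calls.

```python
import json


def build_claims_response(user_groups):
    """Build the token-issuance response body from a list of group display names.

    Hypothetical helper for illustration; it mirrors the structure returned above.
    """
    return {
        "data": {
            "@odata.type": "microsoft.graph.onTokenIssuanceStartResponseData",
            "actions": [
                {
                    "@odata.type": "microsoft.graph.tokenIssuanceStart.provideClaimsForToken",
                    "claims": {
                        # One colon-separated string for the query editor claim
                        "dbGroupsqueryeditor": ":".join(user_groups),
                        # The same groups as a list for the SQL tools claim
                        "dbGroupssqltools": list(user_groups),
                    },
                }
            ],
        }
    }


if __name__ == "__main__":
    # Sample group names; replace with the groups returned for your user
    sample = build_claims_response(["redshift_sales", "redshift_product"])
    print(json.dumps(sample, indent=2))
```

Keeping the payload construction in a pure function like this makes it possible to unit test the claim format without calling Microsoft Graph.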

Now you can create an IAM IdP and role.

In IAM, an IdP represents a trusted external authentication service like Microsoft Entra ID that supports SAML 2.0, allowing AWS to recognize user identities authenticated by that service. It’s crucial to name this IdP AzureADDemo to match the previously configured SAML claims for role creation.

Create your IAM SAML IdP

Complete the following steps to create your IAM SAML IdP:

  1. On the IAM console, choose Identity providers in the navigation pane.
  2. Choose Add provider.
  3. For Provider type, select SAML.
  4. For Provider name, enter AzureADDemo. The name must match the IdP name used in the SAML claims you configured earlier.
  5. Upload the SAML metadata document, which you downloaded as Federation Metadata.xml and stored as AWS Single-Account Access.xml.
  6. Choose Add provider.

Create an IAM role

Next, you create an IAM role for SAML-based federation, which will be used to grant access to the Redshift Query Editor and Redshift cluster. Complete the following steps:

  1. On the IAM console, choose Roles in the navigation pane.
  2. Choose Create role.
  3. For Trusted identity type, select SAML 2.0 federation.
  4. For SAML 2.0-based provider, choose AzureADDemo.
  5. For Access to be allowed, select Allow programmatic and AWS Management Console access.
  6. Choose Next.
  7. Add the permissions AmazonRedshiftQueryEditorV2ReadSharing and ReadOnlyAccess, and choose Next.
  8. For Role name, enter a descriptive name, such as dev-bdt-team.
  9. Choose Create role.

Update trust policy

  1. On the IAM console, choose Roles in the navigation pane, and search for and choose the role dev-bdt-team.
  2. In the Trusted entities section, choose Edit trust policy.
  3. Add the action sts:TagSession by removing the Action line and adding the following code:
    "Action": [
        "sts:AssumeRoleWithSAML",
        "sts:TagSession"
    ],
  4. Choose Update policy.
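
As a cross-check for the edited trust policy, the following sketch builds the complete document in Python. The function name, the sample account ID, and the Principal and Condition elements are assumptions based on what the IAM console typically generates for SAML 2.0 federation with console access; compare the output with the policy on your own role rather than treating it as authoritative.

```python
import json


def build_saml_trust_policy(account_id, provider_name="AzureADDemo"):
    """Return a trust policy dict that allows SAML sign-in and session tagging.

    Hypothetical helper; Principal and Condition are assumed console defaults.
    """
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {
                    "Federated": f"arn:aws:iam::{account_id}:saml-provider/{provider_name}"
                },
                # Both actions are needed: the second lets the IdP pass session tags
                "Action": ["sts:AssumeRoleWithSAML", "sts:TagSession"],
                "Condition": {
                    "StringEquals": {"SAML:aud": "https://signin.aws.amazon.com/saml"}
                },
            }
        ],
    }


if __name__ == "__main__":
    # 123456789012 is a placeholder account ID
    print(json.dumps(build_saml_trust_policy("123456789012"), indent=4))
```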

Create an IAM policy

In the following steps, you create an IAM policy to allow the dev-bdt-team role to obtain temporary credentials for connecting to Amazon Redshift using IAM:

  1. On the IAM console, choose Policies in the navigation pane.
  2. Choose Create policy.
  3. On the JSON tab, enter the following policy document, replacing placeholders with appropriate values:
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "VisualEditor0",
                "Effect": "Allow",
                "Action": "redshift:GetClusterCredentialsWithIAM",
                "Resource": "arn:aws:redshift:<YOUR-REGION>:<AWS-ACCOUNT-NUMBER>:dbname:<YOUR-REDSHIFT-CLUSTER-NAME>/*"
            }
        ]
    }
    
  4. Review the policy details and provide a descriptive name for your policy, such as redshiftAccessPolicy.
  5. Review the policy summary and resolve any warnings or errors.
  6. Choose Create policy to finalize the policy creation process.
  7. On the Roles page, search for and open the dev-bdt-team role.
  8. On the Add permissions menu, choose Attach policies.
  9. Attach redshiftAccessPolicy to the role.

Your permissions under the role dev-bdt-team should look like the following screenshot.
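
If you prefer to generate the policy document instead of editing placeholders by hand, a minimal sketch follows. The function names and sample values are hypothetical, and the Resource is assumed to follow the Amazon Redshift dbname ARN format (arn:aws:redshift:region:account:dbname:cluster/database).

```python
import json


def redshift_dbname_arn(region, account_id, cluster_name, database="*"):
    """Build a Redshift dbname ARN; '*' covers every database in the cluster."""
    return f"arn:aws:redshift:{region}:{account_id}:dbname:{cluster_name}/{database}"


def build_redshift_access_policy(region, account_id, cluster_name):
    """Return a policy dict that allows temporary credentials through IAM."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "VisualEditor0",
                "Effect": "Allow",
                "Action": "redshift:GetClusterCredentialsWithIAM",
                "Resource": redshift_dbname_arn(region, account_id, cluster_name),
            }
        ],
    }


if __name__ == "__main__":
    # Placeholder Region, account ID, and cluster name
    policy = build_redshift_access_policy("us-east-1", "123456789012", "examplecluster")
    print(json.dumps(policy, indent=4))
```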

Test the SSO setup

You can now test the SSO setup. Complete the following steps:

  1. On the Azure Portal, for your AWS Single-Account Access application, choose Single sign-on.
  2. Choose Test this application.
  3. Choose Sign in as current user.

If the setup is correct, you’re redirected to the AWS Management Console (which might be in a new tab for some browsers).

Test with Redshift Query Editor

Complete the following steps:

  • Navigate to Microsoft Entra ID, Enterprise applications, AWS Single-Account Access.
  • Go to Properties and copy the user access URL.
  • Launch your preferred web browser and enter the user access URL to navigate to the Microsoft sign-in page.
  • Log in with user A credentials.
  • You will be directed to the AWS Management Console and logged in with the dev-bdt-team role.
  • Open the Amazon Redshift console and choose Provisioned clusters dashboard.
  • Choose the cluster examplecluster.
  • On the Query data menu, choose Query in query editor v2.
  • Select Temporary credentials using your IAM identity.
  • For Database, enter dev.
  • Choose Create connection.

After the connection is established, you should be able to see your dev database and schemas under it, as shown in the following screenshot.

Because user A is only part of group redshift_sales, they will be able to see only the sales schema.

  • Run a SQL statement to get data from sales_table.

Because user A has access to the table, you can see output like the following screenshot.

  • Log in as user C to test access for user C.

User C is able to see both the product and sales schemas because they’re part of both the redshift_product and redshift_sales groups.

  • Run a SQL statement to get data from both sales_table and product_table.

User C has access to both tables, as you can see in the following screenshot.

Clean up

To avoid incurring future charges, delete the resources you created, including the Redshift cluster, IAM role, IAM policy, Microsoft Entra ID application, and Azure Functions app.

Conclusion

In this post, we demonstrated how to use Microsoft Entra ID to federate into your AWS account and use the Redshift Query Editor V2 to connect to a Redshift cluster and access the schemas based on the AD groups associated with the user.


About the author

Koushik Konjeti is a Senior Solutions Architect at Amazon Web Services. He has a passion for aligning architectural guidance with customer goals, ensuring solutions are tailored to their unique requirements. Outside of work, he enjoys playing cricket and tennis.

AWS Network Firewall Geographic IP Filtering launch

Post Syndicated from Prasanjit Tiwari original https://aws.amazon.com/blogs/security/aws-network-firewall-geographic-ip-filtering-launch/

AWS Network Firewall is a managed service that provides a convenient way to deploy essential network protections for your virtual private clouds (VPCs). In this blog post, we discuss Geographic IP Filtering, a new feature of Network Firewall that you can use to filter traffic based on geographic location and meet compliance requirements.

Customers with internet-facing applications are constantly in need of advanced security features to protect their applications from threat actors. This includes restricting traffic to and from their workloads in Amazon Web Services (AWS) to certain geographies because of security risk. Customers operating in highly regulated industries—such as banking, public sector, or insurance—might have specific security requirements that can be addressed by Geographic IP Filtering.

Previously, customers had to rely on third-party tools for retrieving an IP address list of specific countries and updating their firewall rules on a regular basis to meet applicable requirements. Now, with Geographic IP Filtering on Network Firewall, you can protect your application workloads based on the geolocation of the IP address. As new IP addresses are assigned by the Internet Assigned Numbers Authority (IANA), the Geographic IP database underneath Network Firewall is automatically updated so that the service can consistently filter inbound and outbound traffic from specific countries based on country codes. It supports IPv4 and IPv6 traffic.

Geographic IP Filtering is supported in all AWS Regions where Network Firewall is available today, including the AWS GovCloud (US) Regions.

Set up Geographic IP Filtering in Network Firewall

You can use Network Firewall to inspect network traffic and protect your VPCs using layer 3–7 rules (network layer to application layer of the OSI model). When traffic reaches Network Firewall, it will identify the location of the source and destination IP address from the Geographic IP database and block traffic if you have a firewall rule to block that location. You can choose to pass, drop, reject, or create an alert for traffic coming from or going to specific countries.

Before setting up Geographic IP Filtering rules, you need to deploy Network Firewall and attach a firewall policy. You can learn more about these steps in the Network Firewall Getting Started guide. You can configure Network Firewall Geographic IP Filtering in minutes using the AWS Management Console, AWS Command Line Interface (AWS CLI), AWS SDK, or the Network Firewall API.

To configure Geographic IP Filtering rules using the console:

  1. Sign in to the AWS Management Console and open the Amazon VPC console.
  2. In the navigation pane, under Network Firewall, choose Network Firewall rule groups.
  3. Choose Create rule group.
  4. In the Create rule group page, for the Rule group type, select Stateful rule group.
  5. For the Rule group format, select Standard stateful rule.
  6. For Rule evaluation order, select either Strict order (recommended) or Action order.
  7. Enter a name for the stateful rule group.
  8. For Capacity, enter the maximum capacity you want to allow for the stateful rule group.
  9. Under Standard stateful rules, for Geographic IP Filtering, select whether you want to Disable Geographic IP filtering, Match only selected countries, or Match all but selected countries.
  10. If you opt for Geographic IP Filtering, then select the Geographic IP traffic direction and Country codes that you want to filter the traffic for.
  11. Enter the appropriate values for Protocol, Source, Source port, Destination, and Destination port.
  12. For Action, select the action that you want Network Firewall to take when a packet matches the rule settings.

    Figure 1: Standard stateful rule

  13. Choose Add rule, review the rule, and then create the rule group.

    Figure 2: Geographic IP Filtering rules

Suricata compatibility

You can also use Geographic IP filtering with Suricata-compatible rule strings using the geoip keyword.

To create a Suricata compatible rule string:

  1. Follow steps 1 through 4 of the previous procedure.
  2. For the Rule group format, select Suricata compatible rule string.
  3. For Rule evaluation order, select either Strict order (recommended) or Action order.
  4. Enter a name for the stateful rule group.
  5. For Capacity, enter the maximum capacity you want to allow for the stateful rule group.
  6. Under Suricata compatible rule string, enter an appropriate string based on your source and destination along with the country code to filter traffic for. To use a Geographic IP filter, provide the geoip keyword, the filter type, and the country codes for the countries that you want to filter for.
  7. Suricata supports filtering for source and destination IPs. You can filter on either of these types by itself, by specifying dst or src. You can filter on the two types together with AND or OR logic, by specifying both or any.

For example, the following sample Suricata rule string drops traffic originating from Japan:

drop ip any any -> any any (msg:"Geographic IP from JP,Japan"; geoip:src,JP; sid:55555555; rev:1;)
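
When you manage many country rules, generating the rule strings can reduce copy-and-paste mistakes. The following is a minimal sketch: build_geoip_rule and build_rule_group_request are hypothetical helpers, and the request keys (RuleGroupName, Type, Capacity, RuleGroup, RulesSource, RulesString, StatefulRuleOptions, RuleOrder) are assumed to match the Network Firewall CreateRuleGroup API, so verify them against the API reference before use. The sketch only builds strings and dictionaries and doesn't call AWS.

```python
VALID_ACTIONS = {"pass", "drop", "reject", "alert"}
VALID_FILTERS = {"src", "dst", "both", "any"}


def build_geoip_rule(action, protocol, filter_type, country_codes, msg, sid, rev=1):
    """Return a Suricata-compatible rule string matching any address and port."""
    if action not in VALID_ACTIONS:
        raise ValueError(f"unsupported action: {action}")
    if filter_type not in VALID_FILTERS:
        raise ValueError(f"unsupported geoip filter type: {filter_type}")
    codes = [code.upper() for code in country_codes]
    if not codes or any(len(code) != 2 or not code.isalpha() for code in codes):
        raise ValueError("country codes must be two-letter codes")
    code_list = ",".join(codes)
    return (
        f"{action} {protocol} any any -> any any "
        f'(msg:"{msg}"; geoip:{filter_type},{code_list}; sid:{sid}; rev:{rev};)'
    )


def build_rule_group_request(name, capacity, rules, strict_order=True):
    """Assemble keyword arguments for a stateful rule group (assumed API shape)."""
    return {
        "RuleGroupName": name,
        "Type": "STATEFUL",
        "Capacity": capacity,
        "RuleGroup": {
            "RulesSource": {"RulesString": "\n".join(rules)},
            "StatefulRuleOptions": {
                "RuleOrder": "STRICT_ORDER" if strict_order else "DEFAULT_ACTION_ORDER"
            },
        },
    }


if __name__ == "__main__":
    rule = build_geoip_rule("drop", "ip", "src", ["JP"], "Geographic IP from JP,Japan", 55555555)
    print(rule)
    # The rule group name and capacity are sample values
    print(build_rule_group_request("geo-block-demo", 100, [rule]))
```

Validating the action and filter type up front surfaces typos before the rule string reaches the firewall, where a malformed rule would otherwise be rejected at creation time.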

Note that Suricata determines the location of requests using MaxMind GeoIP databases. MaxMind reports very high accuracy of their data at the country level, although accuracy varies according to factors such as country and type of IP. For more information about MaxMind, see MaxMind IP Geolocation.

If you think any of the Geographic IP data is incorrect, you can submit a correction request to MaxMind at MaxMind Correct GeoIP Data.

Logging Geographic IP Filtering

You can configure Network Firewall logging for your firewall’s stateful engine to get detailed information about the packet and any stateful rule action taken against the packet. There are no changes to the logging and monitoring mechanism with the introduction of the Geographic IP Filtering feature. However, by explicitly specifying the msg and metadata keywords, you can see additional geographic information in the alert logs that can help with troubleshooting. If these keywords aren’t specified in the Suricata rule string, the log event will not show any geographic information.

Suricata rule examples

In this section, you will find examples of Suricata rule strings to pass, block, reject, and alert on traffic to or from a specific country.

Example 1: To pass ingress traffic from a specific country

The following example passes ingress traffic from India.

Note: The rule evaluation order should be set to Strict for alert logs to be generated in this example. If the rule evaluation order is set to Action, then although the traffic will pass, alert logs will not be generated.

alert ip $EXTERNAL_NET any -> $HOME_NET any (msg:"Ingress traffic from IN allowed"; flow:to_server; geoip:src,IN; metadata:geo IN; sid:202409301;)
pass ip $EXTERNAL_NET any -> $HOME_NET any (msg:"Ingress traffic from IN allowed"; flow:to_server; geoip:src,IN; metadata:geo IN; sid:202409302;)

The following are the alert and flow logs for Example 1.

Alert logs:

{
    "firewall_name": "Test-NFW",
    "availability_zone": "eu-north-1a",
    "event_timestamp": "1731102856",
    "event": {
        "src_ip": "13.127.20.X",
        "src_port": 56630,
        "event_type": "alert",
        "alert": {
            "severity": 3,
            "signature_id": 202409301,
            "rev": 0,
            "metadata": {
                "geo": ["IN"]
            },
            "signature": "Ingress traffic from IN allowed",
            "action": "allowed",
            "category": ""
        },
        "flow_id": 234143298308779,
        "dest_ip": "172.31.2.4",
        "proto": "TCP",
        "verdict": {
            "action": "pass"
        },
        "dest_port": 80,
        "pkt_src": "geneve encapsulation",
        "timestamp": "2024-11-08T21:54:16.972019+0000",
        "direction": "to_server"
    }
}
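
For troubleshooting, it can help to reduce each alert record to the few fields you care about. The following sketch parses a trimmed copy of the alert log above; summarize_alert is a hypothetical helper, and it assumes the record layout shown in this example.

```python
import json
from datetime import datetime, timezone

# Trimmed copy of the alert log shown above
SAMPLE_ALERT = """
{
    "firewall_name": "Test-NFW",
    "event_timestamp": "1731102856",
    "event": {
        "src_ip": "13.127.20.X",
        "dest_ip": "172.31.2.4",
        "dest_port": 80,
        "event_type": "alert",
        "alert": {
            "signature_id": 202409301,
            "metadata": {"geo": ["IN"]},
            "signature": "Ingress traffic from IN allowed",
            "action": "allowed"
        },
        "verdict": {"action": "pass"}
    }
}
"""


def summarize_alert(record):
    """Extract the time, addresses, geographic metadata, and verdict from one alert record."""
    event = record["event"]
    alert = event.get("alert", {})
    when = datetime.fromtimestamp(int(record["event_timestamp"]), tz=timezone.utc)
    return {
        "time": when.isoformat(),
        "src_ip": event.get("src_ip"),
        "dest_ip": event.get("dest_ip"),
        "signature_id": alert.get("signature_id"),
        # Empty when the rule has no metadata keyword
        "geo": alert.get("metadata", {}).get("geo", []),
        "verdict": event.get("verdict", {}).get("action"),
    }


if __name__ == "__main__":
    print(summarize_alert(json.loads(SAMPLE_ALERT)))
```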

Flow logs from source to destination:

{
    "firewall_name": "Test-NFW",
    "availability_zone": "eu-north-1a",
    "event_timestamp": "1731102918",
    "event": {
        "tcp": {
            "tcp_flags": "13",
            "syn": true,
            "fin": true,
            "ack": true
        },
        "app_proto": "unknown",
        "src_ip": "13.127.20.X",
        "src_port": 56630,
        "netflow": {
            "pkts": 4,
            "bytes": 216,
            "start": "2024-11-08T21:54:16.972019+0000",
            "end": "2024-11-08T21:54:17.263030+0000",
            "age": 1,
            "min_ttl": 112,
            "max_ttl": 112
        },
        "event_type": "netflow",
        "flow_id": 234143298308779,
        "dest_ip": "172.31.2.4",
        "proto": "TCP",
        "dest_port": 80,
        "timestamp": "2024-11-08T21:55:18.257416+0000"
    }
}

Flow logs from destination to source:

{
    "firewall_name": "Test-NFW",
    "availability_zone": "eu-north-1a",
    "event_timestamp": "1731102918",
    "event": {
        "tcp": {
            "tcp_flags": "13",
            "syn": true,
            "fin": true,
            "ack": true
        },
        "app_proto": "unknown",
        "src_ip": "172.31.2.4",
        "src_port": 80,
        "netflow": {
            "pkts": 2,
            "bytes": 112,
            "start": "2024-11-08T21:54:16.972019+0000",
            "end": "2024-11-08T21:54:17.263030+0000",
            "age": 1,
            "min_ttl": 126,
            "max_ttl": 126
        },
        "event_type": "netflow",
        "flow_id": 234143298308779,
        "dest_ip": "13.127.20.X",
        "proto": "TCP",
        "dest_port": 56630,
        "timestamp": "2024-11-08T21:55:18.257449+0000"
    }
}
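
The two flow records share a flow_id, so you can combine both directions into one total per connection. The following is a minimal sketch using the packet and byte counts from the logs above; total_by_flow is a hypothetical helper that assumes the netflow record layout shown here.

```python
from collections import defaultdict

# Only the fields needed for the totals, copied from the two flow logs above
FLOW_RECORDS = [
    {"event": {"event_type": "netflow", "flow_id": 234143298308779,
               "src_ip": "13.127.20.X", "netflow": {"pkts": 4, "bytes": 216}}},
    {"event": {"event_type": "netflow", "flow_id": 234143298308779,
               "src_ip": "172.31.2.4", "netflow": {"pkts": 2, "bytes": 112}}},
]


def total_by_flow(records):
    """Sum packets and bytes across both directions for each flow_id."""
    totals = defaultdict(lambda: {"pkts": 0, "bytes": 0})
    for record in records:
        event = record["event"]
        if event.get("event_type") != "netflow":
            continue  # Skip alert records and other event types
        flow_total = totals[event["flow_id"]]
        flow_total["pkts"] += event["netflow"]["pkts"]
        flow_total["bytes"] += event["netflow"]["bytes"]
    return dict(totals)


if __name__ == "__main__":
    print(total_by_flow(FLOW_RECORDS))
```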

Example 2: To block ingress traffic from a specific country

The following example blocks ingress traffic from Japan.

drop ip $EXTERNAL_NET any -> $HOME_NET any (msg:"Ingress traffic from JP blocked"; flow:to_server; geoip:src,JP; metadata:geo JP; sid:202409303;)

Example 3: To block ingress SSH traffic from a specific country

The following example blocks ingress SSH traffic from Russia.

drop ssh $EXTERNAL_NET any -> $HOME_NET any (msg:"Ingress SSH traffic from RU blocked"; flow:to_server; geoip:src,RU; metadata:geo RU; sid:202409304;)

Example 4: To reject egress TCP traffic to a specific country

The following example rejects egress TCP traffic to Iran.

reject tcp $HOME_NET any -> $EXTERNAL_NET any (msg:"Egress traffic to IR rejected"; flow:to_server; geoip:dst,IR; metadata:geo IR; sid:202409305;)

Example 5: To alert on traffic originating from or destined to a specific country

The following example alerts on traffic that originates from or is destined to Venezuela.

alert ip any any -> any any (msg:"Geographic IP is from VE, Venezuela"; geoip:any,VE; sid: 202409306;)

Conclusion

You can use the new Geographic IP Filtering feature in AWS Network Firewall to enhance your security posture by controlling traffic based on geographic locations. In this post, you learned about the key concepts, configuration steps, and examples for implementing the Geographic IP Filtering feature in Network Firewall. By using this feature, businesses can protect their networks from potentially harmful traffic and control which geographic locations can interact with their infrastructure. As cyber threats continue to evolve, the Geographic IP Filtering feature serves as a vital tool for strengthening network security.

If you have feedback about this post, submit comments in the Comments section below. If you have questions about this post, contact AWS Support.
 

Prasanjit Tiwari

Prasanjit is a Cloud Support Engineer II based out of Virginia, USA. He has a Master of Science in Telecommunications Engineering from the University of Maryland. He is a WAF and Route 53 subject matter expert and enjoys working on networking and perimeter security services. He is enthusiastic about using innovative solutions to address customer challenges.
Dhiren Patel

Dhiren is a Cloud Support Engineer-II based out of Virginia, USA. He has a Master of Science in Electrical and Computer Engineering from New York University. As a WAF and Route 53 subject matter expert, he specializes in AWS networking and security services. He’s passionate about helping customers solve their AWS issues and get the best cloud experience possible through AWS.

Develop a business chargeback model within your organization using Amazon Redshift multi-warehouse writes

Post Syndicated from Raks Khare original https://aws.amazon.com/blogs/big-data/develop-a-business-chargeback-model-within-your-organization-using-amazon-redshift-multi-warehouse-writes/

Amazon Redshift is a fast, petabyte-scale, cloud data warehouse that tens of thousands of customers rely on to power their analytics workloads. Thousands of customers use Amazon Redshift data sharing to enable instant, granular, and fast data access shared across Redshift provisioned clusters and serverless workgroups. This allows you to scale your read workloads to thousands of concurrent users without having to move or copy data.

Now, we are announcing general availability (GA) of Amazon Redshift multi-data warehouse writes through data sharing. This new capability allows you to scale your write workloads and achieve better performance for extract, transform, and load (ETL) workloads by using different warehouses of different types and sizes based on your workload needs. You can make your ETL job runs more predictable by distributing them across different data warehouses with just a few clicks. Other benefits include the ability to monitor and control costs for each data warehouse, and enabling data collaboration across different teams because you can write to each other’s databases. The data is live and available across all warehouses as soon as it’s committed, even when it’s written across accounts or across Regions. To learn more about the reasons for using multiple warehouses to write to the same databases, refer to this previous blog on multi-warehouse writes through data sharing.

As organizations continue to migrate workloads to AWS, they are also looking for mechanisms to manage costs efficiently. A good understanding of the cost of running your business workload, and the value that business workload brings to the organization, allows you to have confidence in the efficiency of your financial management strategy in AWS.

In this post, we demonstrate how you can develop a business chargeback model by adopting the multi-warehouse architecture of Amazon Redshift using data sharing. You can now attribute cost to different business units and at the same time gain more insights to drive efficient spending.

Use case

In this use case, we consider a fictional retail company (AnyCompany) that operates several Redshift provisioned clusters and serverless workgroups, each tailored to a particular business unit, such as the sales, marketing, and development teams. AnyCompany is a large enterprise that previously migrated large volumes of enterprise workloads into Amazon Redshift and is now breaking down data silos by migrating business-owned workloads into Amazon Redshift. AnyCompany has a highly technical community of business users who want to keep their autonomy over the pipelines that enrich the enterprise data with their business-centric data. The enterprise IT team wants to eliminate data silos and the data duplication they cause. Despite the segregation of workloads, the team therefore mandates that all business units access a shared, centralized database, which also helps the centralized enterprise IT team with data governance. In this intended architecture, each team is responsible for data ingestion and transformation before writing to the same or different tables residing in the central database. To facilitate this, teams use their own Redshift workgroup or cluster for computation, enabling separate chargeback to their respective cost centers.

In the following sections, we walk you through how to use multi-warehouse writes to ingest data to the same databases using data sharing and develop an end-to-end business chargeback model. This chargeback model can help you attribute cost to individual business units, have higher visibility on your spending, and implement more cost control and optimizations.

Solution overview

The following diagram illustrates the solution architecture.

The workflow includes the following steps:

  • Steps 1a, 1b, and 1c – In this section, we isolate ingestion from various sources by using separate Amazon Redshift Serverless workgroups and a Redshift provisioned cluster.
  • Steps 2a, 2b, and 2c – All producers write data to the primary ETL storage in their own respective schemas and tables. For example, the Sales workgroup writes data into the Sales schema, and the Marketing workgroup writes data into the Marketing schema, both belonging to the storage of the ETL provisioned cluster. They can also apply transformations at the schema object level depending on their business requirements.
  • Step 2d – Both the Redshift Serverless producer workgroups and the Redshift producer cluster can insert and update data into a common table, ETL_Audit, residing in the Audit schema in the primary ETL storage.
  • Steps 3a, 3b, and 3c – The same Redshift Serverless workgroups and provisioned cluster used for ingestion are also used for consumption and are maintained by different business teams and billed separately.

The high-level steps to implement this architecture are as follows:

  1. Set up the primary ETL cluster (producer)
    • Create the datashare
    • Grant permissions on schemas and objects
    • Grant permissions to the Sales and Marketing consumer namespaces
  2. Set up the Sales warehouse (consumer)
    • Create a sales database from the datashare
    • Start writing to the etl and sales datashare
  3. Set up the Marketing warehouse (consumer)
    • Create a marketing database from the datashare
    • Start writing to the etl and marketing datashare
  4. Calculate the cost for chargeback to sales and marketing business units

Prerequisites

To follow along with this post, you should have the following prerequisites:

  • Three Redshift warehouses of desired sizes, with one as the provisioned cluster and another two as serverless workgroups in the same account and AWS Region.
  • Access to a superuser in each warehouse.
  • An AWS Identity and Access Management (IAM) role that is able to ingest data from Amazon Simple Storage Service (Amazon S3) to Amazon Redshift.
  • For cross-account only, you need access to an IAM user or role that is allowed to authorize datashares. For the IAM policy, refer to Sharing datashares.

Refer to Getting started with multi-warehouse for the most up-to-date information.

Set up the primary ETL cluster (producer)

In this section, we show how to set up the primary ETL producer cluster to store your data.

Connect to the producer

Complete the following steps to connect to the producer:

  1. On the Amazon Redshift console, choose Query editor v2 in the navigation pane.

In the query editor v2, you can see all the warehouses you have access to in the left pane. You can expand them to see their databases.

  2. Connect to your primary ETL warehouse using a superuser.
  3. Run the following command to create the prod database:
CREATE DATABASE prod;

Create the database objects to share

Complete the following steps to create your database objects to share:

  1. After you create the prod database, switch your database connection to prod.

You may need to refresh your page to be able to see it.

  2. Run the following commands to create the three schemas you intend to share:
CREATE SCHEMA prod.etl;
CREATE SCHEMA prod.sales;
CREATE SCHEMA prod.marketing;
  3. Create the tables in the ETL schema to share with the Sales and Marketing consumer warehouses. These are standard DDL statements coming from the AWS Labs TPCDS DDL file with modified table names.
CREATE TABLE prod.etl.etl_audit_logs (
    id bigint identity(0, 1) not null,
    job_name varchar(100),
    creation_date timestamp,
    last_execution_date timestamp
);

create table prod.etl.inventory (
    inv_date_sk int4 not null,
    inv_item_sk int4 not null,
    inv_warehouse_sk int4 not null,
    inv_quantity_on_hand int4,
    primary key (inv_date_sk, inv_item_sk, inv_warehouse_sk)
) distkey(inv_item_sk) sortkey(inv_date_sk);
  4. Create the tables in the SALES schema to share with the Sales consumer warehouse:
create table prod.sales.store_sales (
    ss_sold_date_sk int4,
    ss_sold_time_sk int4,
    ss_item_sk int4 not null,
    ss_customer_sk int4,
    ss_cdemo_sk int4,
    ss_hdemo_sk int4,
    ss_addr_sk int4,
    ss_store_sk int4,
    ss_promo_sk int4,
    ss_ticket_number int8 not null,
    ss_quantity int4,
    ss_wholesale_cost numeric(7, 2),
    ss_list_price numeric(7, 2),
    ss_sales_price numeric(7, 2),
    ss_ext_discount_amt numeric(7, 2),
    ss_ext_sales_price numeric(7, 2),
    ss_ext_wholesale_cost numeric(7, 2),
    ss_ext_list_price numeric(7, 2),
    ss_ext_tax numeric(7, 2),
    ss_coupon_amt numeric(7, 2),
    ss_net_paid numeric(7, 2),
    ss_net_paid_inc_tax numeric(7, 2),
    ss_net_profit numeric(7, 2),
    primary key (ss_item_sk, ss_ticket_number)
) distkey(ss_item_sk) sortkey(ss_sold_date_sk);

create table prod.sales.web_sales (
    ws_sold_date_sk int4,
    ws_sold_time_sk int4,
    ws_ship_date_sk int4,
    ws_item_sk int4 not null,
    ws_bill_customer_sk int4,
    ws_bill_cdemo_sk int4,
    ws_bill_hdemo_sk int4,
    ws_bill_addr_sk int4,
    ws_ship_customer_sk int4,
    ws_ship_cdemo_sk int4,
    ws_ship_hdemo_sk int4,
    ws_ship_addr_sk int4,
    ws_web_page_sk int4,
    ws_web_site_sk int4,
    ws_ship_mode_sk int4,
    ws_warehouse_sk int4,
    ws_promo_sk int4,
    ws_order_number int8 not null,
    ws_quantity int4,
    ws_wholesale_cost numeric(7, 2),
    ws_list_price numeric(7, 2),
    ws_sales_price numeric(7, 2),
    ws_ext_discount_amt numeric(7, 2),
    ws_ext_sales_price numeric(7, 2),
    ws_ext_wholesale_cost numeric(7, 2),
    ws_ext_list_price numeric(7, 2),
    ws_ext_tax numeric(7, 2),
    ws_coupon_amt numeric(7, 2),
    ws_ext_ship_cost numeric(7, 2),
    ws_net_paid numeric(7, 2),
    ws_net_paid_inc_tax numeric(7, 2),
    ws_net_paid_inc_ship numeric(7, 2),
    ws_net_paid_inc_ship_tax numeric(7, 2),
    ws_net_profit numeric(7, 2),
    primary key (ws_item_sk, ws_order_number)
) distkey(ws_order_number) sortkey(ws_sold_date_sk);
  5. Create the tables in the MARKETING schema to share with the Marketing consumer warehouse:
create table prod.marketing.customer (
    c_customer_sk int4 not null,
    c_customer_id char(16) not null,
    c_current_cdemo_sk int4,
    c_current_hdemo_sk int4,
    c_current_addr_sk int4,
    c_first_shipto_date_sk int4,
    c_first_sales_date_sk int4,
    c_salutation char(10),
    c_first_name char(20),
    c_last_name char(30),
    c_preferred_cust_flag char(1),
    c_birth_day int4,
    c_birth_month int4,
    c_birth_year int4,
    c_birth_country varchar(20),
    c_login char(13),
    c_email_address char(50),
    c_last_review_date_sk int4,
    primary key (c_customer_sk)
) distkey(c_customer_sk);

create table prod.marketing.promotion (
    p_promo_sk integer not null,
    p_promo_id char(16) not null,
    p_start_date_sk integer,
    p_end_date_sk integer,
    p_item_sk integer,
    p_cost decimal(15, 2),
    p_response_target integer,
    p_promo_name char(50),
    p_channel_dmail char(1),
    p_channel_email char(1),
    p_channel_catalog char(1),
    p_channel_tv char(1),
    p_channel_radio char(1),
    p_channel_press char(1),
    p_channel_event char(1),
    p_channel_demo char(1),
    p_channel_details varchar(100),
    p_purpose char(15),
    p_discount_active char(1),
    primary key (p_promo_sk)
) diststyle all;

Create the datashare

Create datashares for the Sales and Marketing business units with the following commands:

CREATE DATASHARE sales_ds;
CREATE DATASHARE marketing_ds;

Grant permissions on schemas to the datashare

To add objects with permissions to the datashare, use the grant syntax, specifying the datashare you want to grant the permissions to.

  1. Allow the datashare consumers (Sales and Marketing business units) to use objects added to the ETL schema:
GRANT USAGE ON SCHEMA prod.etl TO DATASHARE sales_ds;
GRANT USAGE ON SCHEMA prod.etl TO DATASHARE marketing_ds;
  2. Allow the datashare consumer (Sales business unit) to use objects added to the SALES schema:
GRANT USAGE ON SCHEMA prod.sales TO DATASHARE sales_ds;
  3. Allow the datashare consumer (Marketing business unit) to use objects added to the MARKETING schema:
GRANT USAGE ON SCHEMA prod.marketing TO DATASHARE marketing_ds;

Grant permissions on tables to the datashare

Now you can grant access to tables to the datashare using the grant syntax, specifying the permissions and the datashare.

  1. Grant select and insert scoped privileges on the etl_audit_logs table to the Sales and Marketing datashares:
GRANT SELECT ON TABLE prod.etl.etl_audit_logs TO DATASHARE sales_ds;
GRANT SELECT ON TABLE prod.etl.etl_audit_logs TO DATASHARE marketing_ds;
GRANT INSERT ON TABLE prod.etl.etl_audit_logs TO DATASHARE sales_ds;
GRANT INSERT ON TABLE prod.etl.etl_audit_logs TO DATASHARE marketing_ds;
  2. Grant all privileges on all tables in the SALES schema to the Sales datashare:
GRANT ALL ON ALL TABLES IN SCHEMA prod.sales TO DATASHARE sales_ds;
  3. Grant all privileges on all tables in the MARKETING schema to the Marketing datashare:
GRANT ALL ON ALL TABLES IN SCHEMA prod.marketing TO DATASHARE marketing_ds;

You can optionally choose to include new objects to be automatically shared. The following code will automatically add new objects in the etl, sales, and marketing schemas to the two datashares:

ALTER DATASHARE sales_ds SET INCLUDENEW = TRUE FOR SCHEMA sales;
ALTER DATASHARE sales_ds SET INCLUDENEW = TRUE FOR SCHEMA etl;
ALTER DATASHARE marketing_ds SET INCLUDENEW = TRUE FOR SCHEMA marketing;
ALTER DATASHARE marketing_ds SET INCLUDENEW = TRUE FOR SCHEMA etl;

Grant permissions to the Sales and Marketing namespaces

You can grant permissions to the Sales and Marketing namespaces by specifying the namespace IDs. There are two ways to find namespace IDs:

  1. On the Redshift Serverless console, find the namespace ID on the namespace details page
  2. From the Redshift query editor v2, run select current_namespace; on both consumers

You can then grant access to the other namespace with the following command (change the consumer namespace to the namespace UID of your own Sales and Marketing warehouse):

-- Sales Redshift Serverless namespace
GRANT USAGE ON DATASHARE sales_ds TO namespace '<sales namespace>';

-- Marketing Redshift Serverless namespace
GRANT USAGE ON DATASHARE marketing_ds TO namespace '<marketing namespace>';
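
A mistyped namespace ID only surfaces when the GRANT statement runs, so it can help to check the value before building the statement. The following Python sketch assumes that namespace IDs are UUID-formatted strings, as returned by select current_namespace; the function name grant_usage_statement and the sample ID are hypothetical and not part of any AWS tooling.

```python
import uuid


def grant_usage_statement(datashare: str, namespace_id: str) -> str:
    """Render a GRANT USAGE statement for a consumer namespace.

    Assumption: namespace IDs are UUID strings. A ValueError is raised
    for anything else, including an unreplaced placeholder.
    """
    # uuid.UUID raises ValueError when the string is not a valid UUID.
    parsed = uuid.UUID(namespace_id)
    if not datashare.isidentifier():
        raise ValueError(f"unexpected datashare name: {datashare!r}")
    return f"GRANT USAGE ON DATASHARE {datashare} TO NAMESPACE '{parsed}';"


if __name__ == "__main__":
    # Hypothetical namespace ID, for illustration only.
    print(grant_usage_statement("sales_ds", "11111111-2222-3333-4444-555555555555"))
```

Passing the unreplaced placeholder '<sales namespace>' raises ValueError instead of producing a statement that fails later in the query editor.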

Set up and run an ETL job in the ETL producer

Complete the following steps to set up and run an ETL job:

  1. Create a stored procedure to perform the following steps:
    • Copy data from the S3 bucket to the inventory table in the ETL schema
    • Insert an audit record in the etl_audit_logs table in the ETL schema
CREATE OR REPLACE PROCEDURE load_inventory() 
LANGUAGE plpgsql 
AS $$ 
BEGIN 
    COPY etl.inventory
    FROM 's3://redshift-downloads/TPC-DS/2.13/1TB/inventory/inventory_1_25.dat.gz' 
    iam_role default gzip delimiter '|' EMPTYASNULL region 'us-east-1';

    INSERT INTO etl.etl_audit_logs (job_name, creation_date, last_execution_date)
    values ('etl copy job', sysdate, sysdate);

END;
$$;
  2. Run the stored procedure and validate data in the ETL logging table:
CALL load_inventory();

SELECT * from etl.etl_audit_logs order by last_execution_date desc;

Set up the Sales warehouse (consumer)

At this point, you’re ready to set up your Sales consumer warehouse to start writing data to the shared objects in the ETL producer namespace.

Create a database from the datashare

Complete the following steps to create your database:

  1. In the query editor v2, switch to the Sales warehouse.
  2. Run the command show datashares; to see the etl and sales datashares as well as the datashare producer’s namespace.
  3. Use that namespace to create a database from the datashare, as shown in the following code:
CREATE DATABASE sales_db WITH PERMISSIONS FROM DATASHARE sales_ds OF NAMESPACE '<<producer-namespace>>'

Specifying WITH PERMISSIONS lets you grant granular permissions to individual database users and roles. Without it, granting usage permissions on the datashare database gives users and roles all permissions on all objects within that database.

Start writing to the datashare database

In this section, we show you how to write to the datashare database using the use <database_name> command and using three-part notation: <database_name>.<schema_name>.<table_name>.

Let’s try the use command method first. Run the following command:

use sales_db;

Ingest data into the datashare tables

Complete the following steps to ingest the data:

  1. Copy the TPC-DS data from the AWS Labs public S3 bucket into the tables in the producer’s sales schema:
copy sales.store_sales from 's3://redshift-downloads/TPC-DS/2.13/3TB/store_sales/store_sales_9_4293.dat.gz' iam_role default gzip delimiter '|' EMPTYASNULL region 'us-east-1';

copy sales.web_sales from 's3://redshift-downloads/TPC-DS/2.13/3TB/web_sales/web_sales_9_1630.dat.gz' iam_role default gzip delimiter '|' EMPTYASNULL region 'us-east-1';
  2. Insert an entry in the etl_audit_logs table in the producer’s etl schema. To insert the data, let’s try three-part notation this time:
INSERT INTO sales_db.etl.etl_audit_logs (job_name, creation_date, last_execution_date)
  values ('sales copy job', sysdate, sysdate);

Set up the Marketing warehouse (consumer)

Now, you’re ready to set up your Marketing consumer warehouse to start writing data to the shared objects in the ETL producer namespace. The following steps are similar to the ones previously completed while setting up the Sales warehouse consumer.

Create a database from the datashare

Complete the following steps to create your database:

  1. In the query editor v2, switch to the Marketing warehouse.
  2. Run the command show datashares; to see the etl and marketing datashares as well as the datashare producer’s namespace.
  3. Use that namespace to create a database from the datashare, as shown in the following code:
CREATE DATABASE marketing_db WITH PERMISSIONS FROM DATASHARE marketing_ds OF NAMESPACE '<<producer-namespace>>';

Start writing to the datashare database

In this section, we show you how to write to the datashare database by calling a stored procedure.

Set up and run an ETL job in the ETL producer

Complete the following steps to set up and run an ETL job:

  1. Create a stored procedure to perform the following steps:
    1. Copy data from the S3 bucket to the customer and promotion tables in the MARKETING schema of the producer’s namespace.
    2. Insert an audit record in the etl_audit_logs table in the ETL schema of the producer’s namespace.
CREATE OR REPLACE PROCEDURE load_marketing_data() 
LANGUAGE plpgsql 
AS $$ 
BEGIN 
    copy marketing_db.marketing.customer
    from 's3://redshift-downloads/TPC-DS/2.13/3TB/customer/' 
    iam_role default gzip delimiter '|' EMPTYASNULL region 'us-east-1';

    copy marketing_db.marketing.promotion
    from 's3://redshift-downloads/TPC-DS/2.13/3TB/promotion/' 
    iam_role default gzip delimiter '|' EMPTYASNULL region 'us-east-1';

    INSERT INTO marketing_db.etl.etl_audit_logs (job_name, creation_date, last_execution_date)
    values('marketing copy job', sysdate, sysdate);
END;
$$;
  2. Run the stored procedure:
CALL load_marketing_data();

At this point, you’ve completed ingesting the data to the primary ETL namespace. You can query the tables in the etl, sales, and marketing schemas from both the ETL producer warehouse and Sales and Marketing consumer warehouses and see the same data.

Calculate chargeback to business units

Because the business units’ specific workloads have been isolated to dedicated consumers, you can now attribute the cost based on compute capacity utilization. The compute capacity in Redshift Serverless is measured in Redshift Processing Units (RPUs) and metered for the workloads that you run in RPU-seconds on a per-second basis. A Redshift administrator can use the SYS_SERVERLESS_USAGE view on individual consumer workgroups to view the details of Redshift Serverless usage of resources and related cost.

For example, to get the total charges for RPU hours used for a time interval, run the following query on the Sales and Marketing business units’ respective consumer workgroups:

select
    trunc(start_time) "Day",
    (sum(charged_seconds) / 3600 :: double precision) * < Price for 1 RPU > as cost_incurred
from
    sys_serverless_usage
group by 1
order by 1;
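
The arithmetic behind this query can be sketched in Python. The example follows the query in treating charged_seconds as RPU-seconds; the function name daily_chargeback, the sample rows, and the price of 0.50 per RPU-hour are all made up for illustration, so substitute the actual price for your Region.

```python
from collections import defaultdict
from datetime import date


def daily_chargeback(usage_rows, price_per_rpu_hour):
    """Sum charged RPU-seconds per day and convert them to cost.

    usage_rows: iterable of (day, charged_seconds) pairs, mirroring
    trunc(start_time) and charged_seconds in the query.
    """
    seconds_by_day = defaultdict(int)
    for day, charged_seconds in usage_rows:
        seconds_by_day[day] += charged_seconds
    # RPU-seconds / 3600 = RPU-hours; multiply by the hourly price.
    return {
        day: seconds / 3600 * price_per_rpu_hour
        for day, seconds in sorted(seconds_by_day.items())
    }


if __name__ == "__main__":
    rows = [
        (date(2024, 1, 1), 7200),
        (date(2024, 1, 1), 1800),
        (date(2024, 1, 2), 3600),
    ]
    # 0.50 is a made-up price that keeps the arithmetic easy to follow.
    for day, cost in daily_chargeback(rows, price_per_rpu_hour=0.50).items():
        print(day, round(cost, 2))
```

Running the same calculation against each consumer workgroup gives a per-business-unit figure that can be used for chargeback.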

Clean up

When you’re done, remove any resources that you no longer need to avoid ongoing charges:

  1. Delete the Redshift provisioned cluster.
  2. Delete the Redshift Serverless workgroups and namespaces.

Conclusion

In this post, we showed you how you can isolate business units’ specific workloads to multiple consumer warehouses writing the data to the same producer database. This solution has the following benefits:

  • Straightforward cost attribution and chargeback to business
  • Ability to use provisioned clusters and serverless workgroups of different sizes to write to the same databases
  • Ability to write across accounts and Regions
  • Data is live and available to all warehouses as soon as it’s committed
  • Writes work even if the producer warehouse (the warehouse that owns the database) is paused

You can engage an Amazon Redshift specialist to answer questions, and discuss how we can further help your organization.


About the authors

Raks Khare is a Senior Analytics Specialist Solutions Architect at AWS based out of Pennsylvania. He helps customers across varying industries and regions architect data analytics solutions at scale on the AWS platform. Outside of work, he likes exploring new travel and food destinations and spending quality time with his family.

Poulomi Dasgupta is a Senior Analytics Solutions Architect with AWS. She is passionate about helping customers build cloud-based analytics solutions to solve their business problems. Outside of work, she likes travelling and spending time with her family.

Saurav Das is part of the Amazon Redshift Product Management team. He has more than 16 years of experience in working with relational databases technologies and data protection. He has a deep interest in solving customer challenges centered around high availability and disaster recovery.

Unlocking near real-time analytics with petabytes of transaction data using Amazon Aurora Zero-ETL integration with Amazon Redshift and dbt Cloud

Post Syndicated from BP Yau original https://aws.amazon.com/blogs/big-data/unlocking-near-real-time-analytics-with-petabytes-of-transaction-data-using-amazon-aurora-zero-etl-integration-with-amazon-redshift-and-dbt-cloud/

While customers can perform some basic analysis within their operational or transactional databases, many still need to build custom data pipelines that use batch or streaming jobs to extract, transform, and load (ETL) data into their data warehouse for more comprehensive analysis.

Zero-ETL integration with Amazon Redshift reduces the need for custom pipelines, preserves resources for your transactional systems, and gives you access to powerful analytics. Within seconds of transactional data being written into Amazon Aurora (a fully managed modern relational database service offering performance and high availability at scale), the data is seamlessly made available in Amazon Redshift for analytics and machine learning. The data in Amazon Redshift is transactionally consistent and updates are automatically and continuously propagated.

Amazon Redshift is a fast, scalable, secure, and fully managed cloud data warehouse that makes it simple and cost-effective to analyze all your data using standard SQL and your existing ETL, business intelligence (BI), and reporting tools. Together with price-performance, Amazon Redshift offers capabilities such as serverless architecture, machine learning integration within your data warehouse and secure data sharing across the organization.

dbt helps manage data transformation by enabling teams to deploy analytics code following software engineering best practices such as modularity, continuous integration and continuous deployment (CI/CD), and embedded documentation.

dbt Cloud is a hosted service that helps data teams productionize dbt deployments. dbt Cloud offers turnkey support for job scheduling, CI/CD integrations, serving documentation, native git integrations, monitoring and alerting, and an integrated developer environment (IDE), all within a web-based UI.

In this post, we explore how to use Aurora MySQL-Compatible Edition Zero-ETL integration with Amazon Redshift and dbt Cloud to enable near real-time analytics. By using dbt Cloud for data transformation, data teams can focus on writing business rules to drive insights from their transaction data to respond effectively to critical, time sensitive events. This enables the line of business (LOB) to better understand their core business drivers so they can maximize sales, reduce costs, and further grow and optimize their business.

Solution overview

Let’s consider TICKIT, a fictional website where users buy and sell tickets online for sporting events, shows, and concerts. The transactional data from this website is loaded into an Aurora MySQL 3.05.0 (or a later version) database. The company’s business analysts want to generate metrics to identify ticket movement over time, success rates for sellers, and the best-selling events, venues, and seasons. Analysts can use this information to provide incentives to buyers and sellers who frequently use the site, to attract new users, and to drive advertising and promotions.

The Zero-ETL integration between Aurora MySQL and Amazon Redshift is set up by using a CloudFormation template to replicate raw ticket sales information to a Redshift data warehouse. After the data is in Amazon Redshift, dbt models are used to transform the raw data into key metrics such as ticket trends, seller performance, and event popularity. These insights help analysts make data-driven decisions to improve promotions and user engagement.

The following diagram illustrates the solution architecture at a high level.

To implement this solution, complete the following steps:

  1. Set up Zero-ETL integration from the AWS Management Console for Amazon Relational Database Service (Amazon RDS).
  2. Create dbt models in dbt Cloud.
  3. Deploy dbt models to Amazon Redshift.

Prerequisites

Set up resources with CloudFormation

This post provides a CloudFormation template as a general guide. You can review and customize it to suit your needs. Some of the resources that this stack deploys incur costs when in use.

The CloudFormation template provisions the following components:

  • An Aurora MySQL provisioned cluster (source)
  • An Amazon Redshift Serverless data warehouse (target)
  • Zero-ETL integration between the source (Aurora MySQL) and target (Amazon Redshift Serverless)

To create your resources:

  1. Sign in to the console.
  2. Choose the us-east-1 AWS Region in which to create the stack.
  3. Choose Launch Stack

  4. Choose Next.

This automatically launches CloudFormation in your AWS account with a template. It prompts you to sign in as needed. You can view the CloudFormation template from within the console.

  5. For Stack name, enter a stack name.
  6. Keep the default values for the rest of the Parameters and choose Next.
  7. On the next screen, choose Next.
  8. Review the details on the final screen and select I acknowledge that AWS CloudFormation might create IAM resources.
  9. Choose Submit.

Stack creation can take up to 30 minutes.

  10. After the stack creation is complete, go to the Outputs tab of the stack and record the values of the following keys, which you will use in a later step:
  • NamespaceName
  • PortNumber
  • RDSPassword
  • RDSUsername
  • RedshiftClusterSecurityGroupName
  • RedshiftPassword
  • RedshiftUsername
  • VPC
  • Workinggroupname
  • ZeroETLServicesRoleNameArn

  11. Configure your Amazon Redshift data warehouse security group settings to allow inbound traffic from dbt IP addresses.
  12. You’re now ready to sign in to both the Aurora MySQL cluster and the Amazon Redshift Serverless data warehouse and run some basic commands to test them.

Create a database from integration in Amazon Redshift

To create a target database using Redshift query editor V2:

  1. On the Amazon Redshift Serverless console, choose the zero-etl-destination workgroup.
  2. Choose Query data to open Query Editor v2.
  3. Connect to an Amazon Redshift Serverless data warehouse using the username and password from the CloudFormation resource creation step.
  4. Get the integration_id from the svv_integration system table.
select integration_id from svv_integration; ---- copy this result, use in the next sql
  1. Use the integration_id from the preceding step to create a new database from the integration.
CREATE DATABASE aurora_zeroetl_integration FROM INTEGRATION '<result from above>';

The integration between Aurora MySQL and the Amazon Redshift Serverless data warehouse is now complete.

Populate source data in Aurora MySQL

You’re now ready to populate source data in Amazon Aurora MySQL.

You can use your favorite query editor installed on either an Amazon Elastic Compute Cloud (Amazon EC2) instance or your local system to interact with Aurora MySQL. However, you need to provide access to Aurora MySQL from the machine where the query editor is installed. To achieve this, modify the security group inbound rules to allow the IP address of your machine and make Aurora publicly accessible.

To populate source data:

  1. Run the following script in your query editor to create the sample database demodb and the tables inside it.
create database demodb;

create table demodb.users(
userid integer not null primary key,
username char(8),
firstname varchar(30),
lastname varchar(30),
city varchar(30),
state char(2),
email varchar(100),
phone char(14),
likesports boolean,
liketheatre boolean,
likeconcerts boolean,
likejazz boolean,
likeclassical boolean,
likeopera boolean,
likerock boolean,
likevegas boolean,
likebroadway boolean,
likemusicals boolean);

create table demodb.venue(
venueid integer not null primary key,
venuename varchar(100),
venuecity varchar(30),
venuestate char(2),
venueseats integer);

create table demodb.category(
catid integer not null primary key,
catgroup varchar(10),
catname varchar(10),
catdesc varchar(50));

create table demodb.date (
dateid integer not null primary key,
caldate date not null,
day character(3) not null,
week smallint not null,
month character(5) not null,
qtr character(5) not null,
year smallint not null,
holiday boolean default FALSE );

create table demodb.event(
eventid integer not null primary key,
venueid integer not null,
catid integer not null,
dateid integer not null,
eventname varchar(200),
starttime timestamp);

create table demodb.listing(
listid integer not null primary key,
sellerid integer not null,
eventid integer not null,
dateid integer not null,
numtickets smallint not null,
priceperticket decimal(8,2),
totalprice decimal(8,2),
listtime timestamp);

create table demodb.sales(
salesid integer not null primary key,
listid integer not null,
sellerid integer not null,
buyerid integer not null,
eventid integer not null,
dateid integer not null,
qtysold smallint not null,
pricepaid decimal(8,2),
commission decimal(8,2),
saletime timestamp);
  2. Load data from Amazon Simple Storage Service (Amazon S3) to the corresponding table using the following commands:
LOAD DATA FROM S3 PREFIX 's3-us-east-1://aws-bigdata-blog/artifacts/BDB-3864/data/tickit/users/' 
INTO TABLE demodb.users FIELDS TERMINATED BY '|';

LOAD DATA FROM S3 PREFIX 's3-us-east-1://aws-bigdata-blog/artifacts/BDB-3864/data/tickit/venue/' 
INTO TABLE demodb.venue FIELDS TERMINATED BY '|';

LOAD DATA FROM S3 PREFIX 's3-us-east-1://aws-bigdata-blog/artifacts/BDB-3864/data/tickit/category/' 
INTO TABLE demodb.category FIELDS TERMINATED BY '|';

LOAD DATA FROM S3 PREFIX 's3-us-east-1://aws-bigdata-blog/artifacts/BDB-3864/data/tickit/date/' 
INTO TABLE demodb.date FIELDS TERMINATED BY '|';

LOAD DATA FROM S3 PREFIX 's3-us-east-1://aws-bigdata-blog/artifacts/BDB-3864/data/tickit/event/' 
INTO TABLE demodb.event FIELDS TERMINATED BY '|';

LOAD DATA FROM S3 PREFIX 's3-us-east-1://aws-bigdata-blog/artifacts/BDB-3864/data/tickit/listing/' 
INTO TABLE demodb.listing FIELDS TERMINATED BY '|';

LOAD DATA FROM S3 PREFIX 's3-us-east-1://aws-bigdata-blog/artifacts/BDB-3864/data/tickit/sales/' 
INTO TABLE demodb.sales FIELDS TERMINATED BY '|';

The following are common errors associated with load from Amazon S3:

  • For the current version of the Aurora MySQL cluster, set the aws_default_s3_role parameter in the database cluster parameter group to the role Amazon Resource Name (ARN) that has the necessary Amazon S3 access permissions.
  • If you get an error for missing credentials, such as the following, you probably haven’t associated your IAM role to the cluster. In this case, add the intended IAM role to the source Aurora MySQL cluster.

Error 63985 (HY000): S3 API returned error: Missing Credentials: Cannot instantiate S3 Client

Validate the source data in your Amazon Redshift data warehouse

To validate the source data:

  1. Navigate to the Redshift Serverless dashboard, open Query Editor v2, and select the workgroup and the database created from the integration from the drop-down list. Expand the aurora_zeroetl_integration database and the demodb schema; you should see the seven tables.
  2. Wait a few seconds and run the following SQL query to see integration in action.
select * from aurora_zeroetl_integration.demodb.category;

Transforming data with dbt Cloud

Connect dbt Cloud to Amazon Redshift

  1. Create a new project in dbt Cloud. From Account settings (using the gear menu in the top right corner), choose + New Project.
  2. Enter a project name and choose Continue.

  1. For Connection, select Add new connection from the drop-down list.
  2. Select Redshift and enter the following information:
    1. Connection name: A name for the connection.
    2. Server Hostname: Your Amazon Redshift Serverless endpoint.
    3. Port: 5439 (the Amazon Redshift default).
    4. Database name: dev.
  3. Make sure you allowlist your dbt Cloud IP address in your Redshift data warehouse security group inbound traffic.
  4. Choose Save to set up your connection.

  1. Set your development credentials. These credentials will be used by dbt Cloud to connect to your Amazon Redshift data warehouse. See the CloudFormation template output for the credentials.
  2. For Schema, enter dbt_zetl. dbt Cloud automatically generates a schema name for you; by convention, this is dbt_<first-initial><last-name>. This is the schema connected directly to your development environment, and it’s where your models will be built when running dbt within the Cloud integrated development environment (IDE).

  1. Choose Test Connection. This verifies that dbt Cloud can access your Redshift data warehouse.
  2. Choose Next if the test succeeded. If it failed, check your Amazon Redshift settings and credentials.

Set up a dbt Cloud managed repository

When you develop in dbt Cloud, you can use git to version control your code. For the purposes of this post, use a dbt Cloud-hosted managed repository.

To set up a managed repository:

  1. Under Setup a repository, select Managed.
  2. Enter a name for your repo, such as dbt-zeroetl.
  3. Choose Create. It will take a few seconds for your repository to be created and imported.

Initialize your dbt project and start developing

Now that you have a repository configured, initialize your project and start developing in dbt Cloud.

To start development in dbt Cloud:

  1. In dbt Cloud, choose Start developing in the IDE. It might take a few minutes for your project to spin up for the first time as it establishes your git connection, clones your repo, and tests the connection to the warehouse.

  1. Above the file tree to the left, choose Initialize dbt project. This builds out your folder structure with example models.

  1. Make your initial commit by choosing Commit and sync. Use the commit message initial commit and choose Commit Changes. This creates the first commit to your managed repo and allows you to open a branch where you can add new dbt code.

To build your models

  1. Under Version Control on the left, choose Create branch. Enter a name, such as add-redshift-models. You need to create a new branch because the main branch is set to read-only mode.
  2. Choose dbt_project.yml.
  3. Update the models section of dbt_project.yml at the bottom of the file. Change example to staging and make sure the materialized value is set to table.

models:
  my_new_project:
    # Applies to all files under models/staging/
    staging:
      materialized: table

  1. Choose the three-dot icon next to the models directory, then select Create Folder.
  2. Name the folder staging, then choose Create.
  3. Choose the three-dot icon next to the models directory, then select Create Folder.
  4. Name the folder dept_finance, then choose Create.
  5. Choose the three-dot icon next to the staging directory, then select Create File.

  1. Name the file sources.yml, then choose Create.
  2. Copy the following content into the file and choose Save.
version: 2
sources:
  - name: ops
    database: aurora_zeroetl_integration
    schema: demodb
    tables:
      - name: category
      - name: date
      - name: event
      - name: listing
      - name: users
      - name: venue
      - name: sales

Be aware that the database created from the integration on your Amazon Redshift data warehouse is a special read-only database, and you cannot connect to it directly to create objects. You need to connect to another regular database and use three-part notation, as defined in sources.yml, to query data from it.
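
To make the three-part notation concrete, the following Python sketch shows how a call such as source('ops', 'event') maps to a fully qualified name using the values from sources.yml. It illustrates the lookup only and is not dbt’s implementation (dbt also handles identifier quoting); the SOURCES dictionary and the resolve_source function are hypothetical names.

```python
# Mirrors the sources.yml content above; names here are illustrative only.
SOURCES = {
    "ops": {
        "database": "aurora_zeroetl_integration",
        "schema": "demodb",
        "tables": {"category", "date", "event", "listing", "users", "venue", "sales"},
    }
}


def resolve_source(source_name: str, table_name: str) -> str:
    """Return the three-part relation name for a declared source table."""
    source = SOURCES[source_name]
    if table_name not in source["tables"]:
        raise KeyError(f"{table_name!r} is not declared under source {source_name!r}")
    # <database_name>.<schema_name>.<table_name>
    return f"{source['database']}.{source['schema']}.{table_name}"


if __name__ == "__main__":
    print(resolve_source("ops", "event"))
```

Because the models reference the source by name, the read-only integration database is only ever read from, while the models themselves are built in the regular dev database.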

  1. Choose the three-dot icon next to the staging directory, then select Create File.
  2. Name the file staging_event.sql, then choose Create.
  3. Copy the following query into the file and choose Save.
with source as (
select * from {{ source('ops', 'event') }}
)
SELECT
eventid::integer AS eventid,
venueid::smallint AS venueid,
catid::smallint AS catid,
dateid::smallint AS dateid,
eventname::varchar(200) AS eventname,
starttime::timestamp AS starttime,
current_timestamp as etl_load_timestamp
from source
  1. Choose the three-dot icon next to the staging directory, then select Create File.
  2. Name the file staging_sales.sql, then choose Create.
  3. Copy the following query into the file and choose Save.
with store_source as (
select * from {{ source('ops', 'sales') }}
)
SELECT
salesid::integer AS salesid,
'store' as salestype,
listid::integer AS listid,
sellerid::integer AS sellerid,
buyerid::integer AS buyerid,
eventid::integer AS eventid,
dateid::smallint AS dateid,
qtysold::smallint AS qtysold,
pricepaid::decimal(8,2) AS pricepaid,
commission::decimal(8,2) AS commission,
saletime::timestamp AS saletime,
current_timestamp as etl_load_timestamp
from store_source
  1. Choose the three-dot icon next to the dept_finance directory, then select Create File.
  2. Name the file rpt_finance_qtr_total_sales_by_event.sql, then choose Create.
  3. Copy the following query into the file and choose Save.
select
date_part('year', a.saletime) as year,
date_part('quarter', a.saletime) as quarter,
b.eventname,
count(a.salesid) as sales_made,
sum(a.pricepaid) as sales_revenue,
sum(a.commission) as staff_commission,
staff_commission / sales_revenue as commission_pcnt
from {{ref('staging_sales')}} a
left join {{ref('staging_event')}} b on a.eventid = b.eventid
group by
year,
quarter,
b.eventname
order by
year,
quarter,
b.eventname
  1. Choose the three-dot icon next to the dept_finance directory, then select Create File.
  2. Name the file rpt_finance_qtr_top_event_by_sales.sql, then choose Create.
  3. Copy the following query into the file and choose Save.
select *
from
(
select
*,
rank() over (partition by year, quarter order by sales_revenue desc) as row_num
from {{ref('rpt_finance_qtr_total_sales_by_event')}}
)
where row_num <= 3
  1. Choose the three-dot icon next to the example directory, then select Delete.
  2. Enter dbt run in the command prompt at the bottom of the screen and press Enter.

  1. You should get a successful run and see the four models.

  1. Now that you have successfully run the dbt model, you should be able to find it in the Amazon Redshift data warehouse. Go to Redshift Query Editor v2, refresh the dev database, and verify that you have a new dbt_zetl schema with the staging_event and staging_sales tables and rpt_finance_qtr_top_event_by_sales and rpt_finance_qtr_total_sales_by_event views in it.

  1. Run the following SQL statement to verify that data has been loaded into your Amazon Redshift table.
    SELECT * FROM dbt_zetl.rpt_finance_qtr_total_sales_by_event;
    SELECT * FROM dbt_zetl.rpt_finance_qtr_top_event_by_sales;
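
One detail of rpt_finance_qtr_top_event_by_sales is worth knowing when you read its output: the model filters on rank(), which gives tied rows the same rank, so a quarter can return more than three events when revenues tie. The following Python sketch reproduces that behavior on made-up rows; the function name top_events and the sample data are hypothetical.

```python
from itertools import groupby


def _partition_key(row):
    return (row["year"], row["quarter"])


def top_events(rows, limit=3):
    """Keep rows whose rank within (year, quarter) is <= limit.

    Like SQL rank(), tied revenues share a rank and the next rank skips
    ahead, so a partition can yield more than `limit` rows.
    """
    result = []
    ordered_rows = sorted(rows, key=_partition_key)
    for _, group in groupby(ordered_rows, key=_partition_key):
        ordered = sorted(group, key=lambda r: r["sales_revenue"], reverse=True)
        for row in ordered:
            # rank = 1 + number of rows with strictly higher revenue.
            rank = 1 + sum(o["sales_revenue"] > row["sales_revenue"] for o in ordered)
            if rank <= limit:
                result.append({**row, "row_num": rank})
    return result


if __name__ == "__main__":
    sample = [
        {"year": 2008, "quarter": 1, "eventname": name, "sales_revenue": revenue}
        for name, revenue in [("A", 100), ("B", 90), ("C", 90), ("D", 90), ("E", 50)]
    ]
    for row in top_events(sample):
        print(row["eventname"], row["row_num"])
```

If exactly three rows per quarter are required, row_number() with a deterministic tie-breaker would be the usual alternative; that is a design choice rather than something the original model prescribes.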

Add tests to your models

Adding tests to a project helps validate that your models are working correctly.

To add tests to your project:

  1. Create a new YAML file in the models directory and name it models/schema.yml.
  2. Add the following contents to the file:
version: 2
models:
  - name: rpt_finance_qtr_top_event_by_sales
    columns:
      - name: year
        tests:
          - not_null
  - name: rpt_finance_qtr_total_sales_by_event
    columns:
      - name: year
        tests:
          - not_null
  - name: staging_event
    columns:
      - name: eventid
        tests:
          - not_null
  - name: staging_sales
    columns:
      - name: salesid
        tests:
          - not_null
  1. Run dbt test, and confirm that all your tests passed.
  2. When you run dbt test, dbt iterates through your YAML files and constructs a query for each test. Each query will return the number of records that fail the test. If this number is 0, then the test is successful.
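
The idea of a test as a query that counts failing rows can be sketched with Python’s built-in sqlite3 module. This is an illustration of the principle for a not_null test, not the SQL that dbt actually generates; the helper name not_null_failures and the in-memory sample table are assumptions made for the example.

```python
import sqlite3


def not_null_failures(conn, table, column):
    """Count rows where `column` is NULL; 0 means the test passes."""
    # Identifiers are interpolated directly, so pass trusted names only.
    query = f"select count(*) from {table} where {column} is null"
    return conn.execute(query).fetchone()[0]


if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")
    conn.execute("create table staging_sales (salesid integer, pricepaid real)")
    conn.executemany(
        "insert into staging_sales values (?, ?)",
        [(1, 10.0), (2, 20.0), (None, 5.0)],
    )
    failures = not_null_failures(conn, "staging_sales", "salesid")
    print("PASS" if failures == 0 else f"FAIL ({failures} failing rows)")
```

In this sample, one row has a NULL salesid, so the check reports a failure; removing that row brings the count to 0 and the check passes.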

Document your models

By adding documentation to your project, you can describe your models in detail and share that information with your team.

To add documentation:

  1. Run dbt docs generate to generate the documentation for your project. dbt inspects your project and your warehouse to generate a JSON file documenting your project.

  1. Choose the book icon in the Develop interface to launch documentation in a new tab.

Commit your changes

Now that you’ve built your models, you need to commit the changes you made to the project so that the repository has your latest code.

To commit the changes:

  1. Under Version Control on the left, choose Commit and sync and add a message. For example, Add Aurora zero-ETL integration with Redshift models.

  1. Choose Merge this branch to main to add these changes to the main branch on your repo.

Deploy dbt

Use dbt Cloud’s Scheduler to deploy your production jobs confidently and build observability into your processes. You’ll learn to create a deployment environment and run a job in the following steps.

To create a deployment environment:

  1. In the left pane, select Deploy, then choose Environments.

  1. Choose Create Environment.
  2. In the Name field, enter the name of your deployment environment. For example, Production.
  3. In the dbt Version field, select Versionless from the dropdown.
  4. In the Connection field, select the connection used earlier in development.
  5. Under Deployment Credentials, enter the credentials used to connect to your Redshift data warehouse. Choose Test Connection.

  1. Choose Save.

Create and run a job

Jobs are a set of dbt commands that you want to run on a schedule.

To create and run a job:

  1. After creating your deployment environment, you should be directed to the page for a new environment. If not, select Deploy in the left pane, then choose Jobs.
  2. Choose Create job and select Deploy job.
  3. Enter a Job name, such as Production run, and link to the environment you just created.
  4. Under Execution Settings, select Generate docs on run.
  5. Under Commands, add this command as part of your job if you don’t see it:
    • dbt build
  6. For this exercise, don’t set a schedule for your project to run—while your organization’s project should run regularly, there’s no need to run this example project on a schedule. Scheduling a job is sometimes referred to as deploying a project.

  1. Choose Save, then choose Run now to run your job.
  2. Choose the run and watch its progress under Run history.
  3. After the run is complete, choose View Documentation to see the docs for your project.

Clean up

When you’re finished, delete the CloudFormation stack since some of the AWS resources in this walkthrough incur a cost if you continue to use them. Complete the following steps:

  1. On the CloudFormation console, choose Stacks.
  2. Choose the stack you launched in this walkthrough. The stack must be currently running.
  3. In the stack details pane, choose Delete.
  4. Choose Delete stack.
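
If you prefer to script the cleanup, the console steps above can be sketched with the AWS SDK for Python (boto3). This is a minimal sketch, not part of the original walkthrough: the helper name delete_stack_and_wait is hypothetical, the stack name is a placeholder you need to replace, and the client is passed in so the function can be tried without AWS credentials.

```python
def delete_stack_and_wait(cfn_client, stack_name: str) -> None:
    """Delete a CloudFormation stack and block until the deletion has finished.

    cfn_client is expected to be a boto3 CloudFormation client.
    """
    # Start the deletion; this call returns immediately.
    cfn_client.delete_stack(StackName=stack_name)
    # Poll until CloudFormation reports the stack as deleted.
    cfn_client.get_waiter("stack_delete_complete").wait(StackName=stack_name)


# Usage (assumes boto3 is installed and credentials are configured):
#   import boto3
#   delete_stack_and_wait(boto3.client("cloudformation"), "<your_stack_name>")
```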

Summary

In this post, we showed you how to set up Amazon Aurora MySQL Zero-ETL integration from Aurora MySQL to Amazon Redshift, which eliminates complex data pipelines and enables near real-time analytics on transactional and operational data. We also showed you how to build dbt models on Aurora MySQL Zero-ETL integration tables in Amazon Redshift to transform the data to get insight.

We look forward to hearing from you about your experience. If you have questions or suggestions, leave a comment.


About the authors

BP Yau is a Sr Partner Solutions Architect at AWS. His role is to help customers architect big data solutions to process data at scale. Before AWS, he helped Amazon.com Supply Chain Optimization Technologies migrate its Oracle data warehouse to Amazon Redshift and build its next generation big data analytics platform using AWS technologies.

Saman Irfan is a Senior Specialist Solutions Architect at Amazon Web Services, based in Berlin, Germany. She collaborates with customers across industries to design and implement scalable, high-performance analytics solutions using cloud technologies. Saman is passionate about helping organizations modernize their data architectures and unlock the full potential of their data to drive innovation and business transformation. Outside of work, she enjoys spending time with her family, watching TV series, and staying updated with the latest advancements in technology.

Raghu Kuppala is an Analytics Specialist Solutions Architect experienced working in the databases, data warehousing, and analytics space. Outside of work, he enjoys trying different cuisines and spending time with his family and friends.

Neela Kulkarni is a Solutions Architect with Amazon Web Services. She primarily serves independent software vendors in the Northeast US, providing architectural guidance and best practice recommendations for new and existing workloads. Outside of work, she enjoys traveling, swimming, and spending time with her family.

Run Apache XTable in AWS Lambda for background conversion of open table formats

Post Syndicated from Matthias Rudolph original https://aws.amazon.com/blogs/big-data/run-apache-xtable-in-aws-lambda-for-background-conversion-of-open-table-formats/

This post was co-written with Dipankar Mazumdar, Staff Data Engineering Advocate with AWS Partner OneHouse.

Data architecture has evolved significantly to handle growing data volumes and diverse workloads. Initially, data warehouses were the go-to solution for structured data and analytical workloads but were limited by proprietary storage formats and their inability to handle unstructured data. This led to the rise of data lakes based on columnar formats like Apache Parquet, which came with different challenges like the lack of ACID capabilities.

Eventually, transactional data lakes emerged to add the transactional consistency and performance of a data warehouse to the data lake. Central to a transactional data lake are open table formats (OTFs) such as Apache Hudi, Apache Iceberg, and Delta Lake, which act as a metadata layer over columnar formats. These formats provide essential features like schema evolution, partitioning, ACID transactions, and time-travel capabilities that address traditional problems in data lakes.

In practice, OTFs are used in a broad range of analytical workloads, from business intelligence to machine learning. Moreover, they can be combined to benefit from individual strengths. For instance, a streaming data pipeline can write tables using Hudi because of its strength in low-latency, write-heavy workloads. In later pipeline stages, data is converted to Iceberg, to benefit from its read performance. Traditionally, this conversion required time-consuming rewrites of data files, resulting in data duplication, higher storage, and increased compute costs. In response, the industry is shifting toward interoperability between OTFs, with tools that allow conversions without data duplication. Apache XTable (incubating), an emerging open source project, facilitates seamless conversions between OTFs, eliminating many of the challenges associated with table format conversion.

In this post, we explore how Apache XTable, combined with the AWS Glue Data Catalog, enables background conversions between OTFs residing on Amazon Simple Storage Service (Amazon S3) based data lakes, with minimal to no changes to existing pipelines in a scalable and cost-effective way, as shown in the following diagram.

This post is one of multiple posts about XTable on AWS. For more examples and references to other posts, refer to the following GitHub repository.

Apache XTable

Apache XTable (incubating) is an open source project designed to enable interoperability among various data lake table formats, allowing omnidirectional conversions between formats without the need to copy or rewrite data. It was originally open sourced in November 2023 under the name OneTable and the Apache 2.0 license, with contributions from OneHouse, among others. In March 2024, the project was donated to the Apache Software Foundation (ASF) and rebranded as Apache XTable, where it is now incubating. XTable isn’t a new table format but provides abstractions and tools to translate the metadata associated with existing formats. The primary objective of XTable is to allow users to start with any table format and have the flexibility to switch to another as needed.

Inner workings and features

At a fundamental level, Hudi, Iceberg, and Delta Lake share similarities in their structure. When data is written to a distributed file system, these formats consist of a data layer, typically Parquet files, and a metadata layer that provides the necessary abstraction (see the following diagram). XTable uses these commonalities to enable interoperability between formats.

The synchronization process in XTable works by translating table metadata using the existing APIs of these table formats. It reads the current metadata from the source table and generates the corresponding metadata for one or more target formats. This metadata is then stored in a designated directory within the base path of your table, such as _delta_log for Delta Lake, metadata for Iceberg, and .hoodie for Hudi. This allows the existing data to be interpreted as if it were originally written in any of these formats.
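
To make the directory layout concrete, the following minimal sketch maps each format to the metadata directory named above and resolves it against a table base path. The helper name metadata_path and the example bucket are hypothetical and only serve as an illustration.

```python
from posixpath import join

# Metadata directory per table format, as listed in the paragraph above.
METADATA_DIRS = {
    "DELTA": "_delta_log",
    "ICEBERG": "metadata",
    "HUDI": ".hoodie",
}


def metadata_path(table_base_path: str, table_format: str) -> str:
    """Return the metadata location of a format under the table base path."""
    return join(table_base_path.rstrip("/"), METADATA_DIRS[table_format.upper()])


# All three metadata directories can live next to the same data files.
for fmt in METADATA_DIRS:
    print(metadata_path("s3://my-bucket/sales/", fmt))
```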

XTable provides two metadata translation methods: Full Sync, which translates all commits, and Incremental Sync, which only translates new, unsynced commits for greater efficiency with large tables. If issues arise with Incremental Sync, XTable automatically falls back to Full Sync to provide uninterrupted translation.
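
The choice between the two modes can be pictured with the following conceptual sketch. It is not XTable’s implementation: the function name, the commit identifiers, and the fallback condition (the last synced commit can no longer be found) are assumptions made for illustration.

```python
from typing import List, Optional


def commits_to_translate(all_commits: List[str], last_synced: Optional[str]) -> List[str]:
    """Pick the commits that a sync run would translate.

    all_commits is ordered from oldest to newest. last_synced is the newest
    commit that was already translated, or None if the table was never synced.
    """
    if last_synced is not None and last_synced in all_commits:
        # Incremental sync: translate only the commits after the last synced one.
        return all_commits[all_commits.index(last_synced) + 1:]
    # Full sync: first run, or the incremental starting point is unavailable.
    return list(all_commits)


print(commits_to_translate(["c1", "c2", "c3"], "c1"))
print(commits_to_translate(["c1", "c2", "c3"], None))
```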

Community and future

In terms of future plans, XTable is focused on achieving feature parity with OTFs’ built-in features, including adding critical capabilities like support for Merge-on-Read (MoR) tables. The project also plans to facilitate synchronization of table formats across multiple catalogs, such as AWS Glue, Hive, and Unity Catalog.

Run XTable as a continuous background conversion mechanism

In this post, we describe a background conversion mechanism for OTFs that doesn’t require changes to data pipelines. The mechanism periodically scans a data catalog like the AWS Glue Data Catalog for tables to convert with XTable.

On a data platform, a data catalog stores table metadata and typically contains the data model and physical storage location of the datasets. It serves as the central integration with analytical services. To maximize ease of use, compatibility, and scalability on AWS, the conversion mechanism described in this post is built around the AWS Glue Data Catalog.

The following diagram illustrates the solution at a glance. We design this conversion mechanism based on Lambda, AWS Glue, and XTable.

For the Lambda function to detect the tables inside the Data Catalog, the following information needs to be associated with a table: source format and target formats. For each detected table, the Lambda function invokes the XTable application, which is packaged into the function’s environment. XTable then translates between source and target formats and writes the new metadata to the same data store.

Solution overview

We implement the solution with the AWS Cloud Development Kit (AWS CDK), an open source software development framework for defining cloud infrastructure in code, and provide it on GitHub. The AWS CDK solution deploys the following components:

  • A converter Lambda function that contains the XTable application and starts the conversion job for the detected tables
  • A detector Lambda function that scans the Data Catalog for tables that are to be converted and invokes the converter Lambda function
  • An Amazon EventBridge schedule that invokes the detector Lambda function on an hourly basis

Currently, the XTable application needs to be built from source. We therefore provide a Dockerfile that implements the required build steps and use the resulting Docker image as the Lambda function runtime environment.

In case you don’t have sample data available for testing, we provide scripts for generating sample datasets on GitHub. Data and metadata are shown in blue in the following detail diagram.

Converter Lambda function: Run XTable

The converter Lambda function invokes the XTable JAR, wrapped with the third-party library jpype, and converts the metadata layer of the respective data lake tables.

The function is defined in the AWS CDK through the DockerImageFunction, which uses a Dockerfile and builds a Docker container as part of the deploy step. With this mechanism, we can bundle the XTable application inside our Lambda function.

First, we download the XTable GitHub repository and build the JAR with the Maven CLI. This is done as part of the Docker container build process:

# Dockerfile
# clone sources
RUN git clone --depth 1 --branch <xtable_branch> https://github.com/apache/incubator-xtable.git

# build xtable jar
WORKDIR /incubator-xtable
RUN /apache-maven-<maven_version>/bin/mvn package -DskipTests=true
WORKDIR /

To automatically build and upload the Docker image, we create a DockerImageFunction in the AWS CDK and reference the Dockerfile in its definition. To successfully run Spark, and therefore XTable, in a Lambda function, we need to set Spark’s SPARK_LOCAL_IP environment variable to localhost (127.0.0.1):

# cdk_stack.py
converter = _lambda.DockerImageFunction(
    scope=self,
    id="Converter",
    # Dockerfile in ./src directory
    code=_lambda.DockerImageCode.from_image_asset(
        directory="src", cmd=["converter.handler"]
    ),
    environment={"SPARK_LOCAL_IP": "127.0.0.1"},
    # ...
)

To call the XTable JAR, we use a third-party Python library called jpype, which handles the communication with the Java virtual machine. In our Python code, the XTable call is as follows:

# call java class with configuration files
run_sync = jpype.JPackage("org").apache.xtable.utilities.RunSync.main
run_sync(
    [
        "--datasetConfig",
        "<path_to_dataset_config>",
        "--icebergCatalogConfig",
        "<path_to_catalog_config>",
    ]
)

For more information on XTable application parameters, see Creating your first interoperable table.
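
The dataset config passed with --datasetConfig is a YAML file. The following sketch shows one way the converter could generate it per table before calling RunSync. The helper names are hypothetical, and the key names (sourceFormat, targetFormats, datasets, tableBasePath, tableName) follow the layout shown in the XTable documentation; verify them against the XTable version you build.

```python
import tempfile
from typing import List


def build_dataset_config(
    source_format: str, target_formats: List[str], table_base_path: str, table_name: str
) -> str:
    """Render a minimal XTable dataset config as YAML text."""
    lines = [f"sourceFormat: {source_format}", "targetFormats:"]
    lines += [f"  - {fmt}" for fmt in target_formats]
    lines += [
        "datasets:",
        f"  - tableBasePath: {table_base_path}",
        f"    tableName: {table_name}",
    ]
    return "\n".join(lines) + "\n"


def write_dataset_config(config_text: str) -> str:
    """Write the config to a temporary file and return its path."""
    # The system temp directory is used; on Lambda, /tmp is the writable location.
    with tempfile.NamedTemporaryFile("w", suffix=".yaml", delete=False) as handle:
        handle.write(config_text)
        return handle.name


config = build_dataset_config("HUDI", ["ICEBERG"], "s3://my-bucket/sales", "sales")
print(config)
```

The returned file path can then be used in place of <path_to_dataset_config> in the call above.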

Detector Lambda function: Identify tables to convert in the Data Catalog

The detector Lambda function scans the tables in the Data Catalog. For a table that will be converted, it invokes the converter Lambda function through an event. This decouples the scanning and conversion parts and makes our solution more resilient to potential failures.
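
Invoking the converter through an event corresponds to an asynchronous Lambda invocation. The following is a minimal sketch of that hand-off with boto3; the helper name invoke_converter and the payload fields are assumptions, not the exact code from the repository. The client is passed in so the function can be exercised without AWS access.

```python
import json


def invoke_converter(lambda_client, function_name: str, table: dict) -> int:
    """Asynchronously invoke the converter for one AWS Glue table.

    lambda_client is expected to be a boto3 Lambda client, and table is one
    entry of the TableList returned by the Glue get_tables call.
    Returns the HTTP status code of the invocation.
    """
    payload = {
        "database": table["DatabaseName"],
        "table": table["Name"],
        "parameters": table.get("Parameters", {}),
    }
    response = lambda_client.invoke(
        FunctionName=function_name,
        # "Event" queues the invocation and returns without waiting for the result.
        InvocationType="Event",
        Payload=json.dumps(payload).encode("utf-8"),
    )
    return response["StatusCode"]


# Usage (assumes boto3 and a deployed converter function):
#   import boto3
#   invoke_converter(boto3.client("lambda"), "<converter_function_name>", table)
```

Because the detector doesn’t wait for the conversion to finish, a failing conversion of one table doesn’t block the scan of the remaining tables.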

The detection mechanism searches in the table parameters for the parameters xtable_table_type and xtable_target_formats. If they exist, the conversion is invoked. See the following code:

# detector.py
# create paginator to loop through AWS Glue tables
tables = glue_client.get_paginator("get_tables").paginate(
    DatabaseName=database["Name"]
)
for table_list in tables:
    table_list = table_list["TableList"]
# ...
# loop through all tables and check for required custom glue parameters
for table in table_list:
    required_parameters = {"xtable_table_type", "xtable_target_formats"}
    # if required table parameters exist pass on table for conversion
    # (tables without any parameters are skipped instead of raising a KeyError)
    if required_parameters <= table.get("Parameters", {}).keys():
        yield table
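
The filter can be tried locally on plain dictionaries that mimic AWS Glue table entries. The following self-contained sketch does that and also splits the comma-separated target formats; the helper names and the sample tables are hypothetical.

```python
from typing import Dict, Iterable, Iterator, List

REQUIRED_PARAMETERS = {"xtable_table_type", "xtable_target_formats"}


def tables_to_convert(tables: Iterable[Dict]) -> Iterator[Dict]:
    """Yield only the tables that carry both required XTable parameters."""
    for table in tables:
        if REQUIRED_PARAMETERS <= table.get("Parameters", {}).keys():
            yield table


def parse_target_formats(value: str) -> List[str]:
    """Split a value such as "ICEBERG, DELTA" into a list of format names."""
    return [fmt.strip().upper() for fmt in value.split(",") if fmt.strip()]


sample_tables = [
    {"Name": "orders", "Parameters": {"xtable_table_type": "HUDI", "xtable_target_formats": "ICEBERG, DELTA"}},
    {"Name": "customers", "Parameters": {"classification": "parquet"}},
    {"Name": "events"},
]
for table in tables_to_convert(sample_tables):
    print(table["Name"], parse_target_formats(table["Parameters"]["xtable_target_formats"]))
```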

EventBridge Scheduler rule

In the AWS CDK, you define an EventBridge Scheduler rule as follows. Based on the rule, EventBridge will then call the Lambda detector function every hour:

# cdk_stack.py
event = events.Rule(
    scope=self,
    id="DetectorSchedule",
    schedule=events.Schedule.rate(Duration.hours(1)),
)
event.add_target(targets.LambdaFunction(detector))

Prerequisites

Let’s dive deeper into how to deploy the provided AWS CDK stack. You need one of the following container runtimes:

  • Finch (an open source client for container development)
  • Docker

You also need the AWS CDK configured. For more details, see Getting started with the AWS CDK.

Build and deploy the solution

Complete the following steps:

  1. To deploy the stack, clone the GitHub repo, change into the folder for this post (xtable_lambda), and deploy the AWS CDK stack:
    git clone https://github.com/aws-samples/apache-xtable-on-aws-samples.git
    cd apache-xtable-on-aws-samples/xtable_lambda
    cdk deploy

This deploys the described Lambda functions and the EventBridge Scheduler rule.

  2. When using Finch, you need to set the CDK_DOCKER environment variable before deployment:
    export CDK_DOCKER=finch

After successful deployment, the conversion mechanism starts to run every hour.

  3. The following parameters need to exist on the AWS Glue table that will be converted:
    1. "xtable_table_type": "<source_format>"
    2. "xtable_target_formats": "<target_format>, <target_format>"

On the AWS Glue console, the parameters look like the following screenshot and can be set under Table properties when editing an AWS Glue table.

  4. Optionally, if you don’t have sample data, the following scripts can help you set up a test environment either with your local machine or in an AWS Glue for Spark job:
    # local: create hudi dataset on S3
    cd scripts
    pip install -r requirements.txt
    python ./create_hudi_s3.py
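
Instead of editing the table on the console, the xtable_table_type and xtable_target_formats parameters can also be set with boto3. The following is a minimal sketch under a few assumptions: the helper name is hypothetical, and the key list is a subset of the fields that the AWS Glue TableInput structure accepts, so check it against the boto3 documentation for your tables.

```python
from typing import Dict, List

# get_table returns read-only fields (for example CreateTime) that update_table
# rejects, so only fields accepted by TableInput are copied over.
TABLE_INPUT_KEYS = {
    "Name", "Description", "Owner", "LastAccessTime", "LastAnalyzedTime",
    "Retention", "StorageDescriptor", "PartitionKeys", "ViewOriginalText",
    "ViewExpandedText", "TableType", "Parameters", "TargetTable",
}


def set_xtable_parameters(
    glue_client, database: str, table_name: str, source_format: str, target_formats: List[str]
) -> Dict[str, str]:
    """Add the XTable parameters to an existing AWS Glue table."""
    table = glue_client.get_table(DatabaseName=database, Name=table_name)["Table"]
    table_input = {key: value for key, value in table.items() if key in TABLE_INPUT_KEYS}
    # Keep existing parameters and add or overwrite the two XTable ones.
    parameters = dict(table_input.get("Parameters", {}))
    parameters["xtable_table_type"] = source_format
    parameters["xtable_target_formats"] = ", ".join(target_formats)
    table_input["Parameters"] = parameters
    glue_client.update_table(DatabaseName=database, TableInput=table_input)
    return parameters


# Usage (assumes boto3 and an existing table):
#   import boto3
#   set_xtable_parameters(boto3.client("glue"), "<database>", "<table>", "HUDI", ["ICEBERG"])
```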

Convert a streaming table (Hudi to Iceberg)

Let’s assume we have a Hudi table on Amazon S3, which is registered in the Data Catalog, and want to periodically translate it to Iceberg format. Data is streaming in continuously. We have deployed the provided AWS CDK stack and set the required AWS Glue table properties to translate the dataset to the Iceberg format. In the following steps, we run the background job, see the results in AWS Glue and Amazon S3, and query it with Amazon Athena, a serverless and interactive analytics service that provides a simplified and flexible way to analyze petabytes of data.

In Amazon S3 and AWS Glue, we can see our Hudi dataset and table along with the metadata folder .hoodie. On the AWS Glue console, we set the following table properties:

  • "xtable_table_type": "HUDI"
  • "xtable_target_formats": "ICEBERG"

Our Lambda function is invoked every hour. After the run, we can find the Iceberg-specific metadata folder, which was generated by XTable, in our S3 bucket.

If we look at the Data Catalog, we can see the new table <table_name>_converted was registered as an Iceberg table.


With the Iceberg format, we can now take advantage of the time travel feature by querying the dataset with a downstream analytical service like Athena. In the following screenshot, you can see at Name: that the table is in Iceberg format.

Querying all snapshots, we can see that we created three snapshots with overwrites after the initial one.

We then take the current time and query the dataset representation of 180 minutes ago, resulting in the data from the first snapshot committed.
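
The two queries described above can be sketched as follows. The helpers only build the SQL text; the database and table names are placeholders, and the $snapshots metadata table and the FOR TIMESTAMP AS OF clause are the Athena syntax for Iceberg tables as we understand it, so check the Athena documentation for your engine version.

```python
def snapshots_query(database: str, table: str) -> str:
    """SQL that lists all snapshots of an Iceberg table in Athena."""
    return f'SELECT * FROM "{database}"."{table}$snapshots"'


def time_travel_query(database: str, table: str, minutes_ago: int) -> str:
    """SQL that reads the table as it was a given number of minutes ago."""
    minutes = int(minutes_ago)  # only a plain integer is placed into the SQL text
    return (
        f'SELECT * FROM "{database}"."{table}" '
        f"FOR TIMESTAMP AS OF (current_timestamp - interval '{minutes}' minute)"
    )


print(snapshots_query("my_database", "my_table_converted"))
print(time_travel_query("my_database", "my_table_converted", 180))
```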

Summary

In this post, we demonstrated how to build a background conversion job for OTFs, using XTable and the Data Catalog, which is independent from data pipelines and transformation jobs. XTable allows for efficient translation between OTFs because data files are reused and only the metadata layer is processed. The integration with the Data Catalog provides wide compatibility with AWS analytical services.

You can reuse the Lambda-based XTable deployment in other solutions. For instance, you could use it in a reactive mechanism for near real-time conversion of OTFs, which is invoked by Amazon S3 object events resulting from changes to OTF metadata.
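
As a starting point for such a reactive mechanism, the following sketch extracts the affected table paths from an Amazon S3 event notification. It isn’t part of the provided solution: the function name is hypothetical, and it assumes a Hudi source table, so it reacts only to objects under the .hoodie folder.

```python
from typing import Set
from urllib.parse import unquote_plus


def changed_table_paths(event: dict, marker: str = "/.hoodie/") -> Set[str]:
    """Return the base paths of tables whose source metadata changed.

    event is an S3 event notification as received by a Lambda function.
    marker is the metadata folder of the source format.
    """
    paths = set()
    for record in event.get("Records", []):
        bucket = record["s3"]["bucket"]["name"]
        # Object keys in S3 events are URL-encoded.
        key = "/" + unquote_plus(record["s3"]["object"]["key"])
        if marker in key:
            base = key.split(marker, 1)[0].lstrip("/")
            paths.add(f"s3://{bucket}/{base}")
    return paths


sample_event = {
    "Records": [
        {"s3": {"bucket": {"name": "my-bucket"}, "object": {"key": "sales/orders/.hoodie/001.commit"}}},
        {"s3": {"bucket": {"name": "my-bucket"}, "object": {"key": "sales/orders/part-0001.parquet"}}},
    ]
}
print(changed_table_paths(sample_event))
```

Matching only the source format’s metadata folder matters here: XTable writes the target metadata under the same base path, so reacting to every object event could cause the conversion to trigger itself.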

For further information about XTable, see the project’s official website. For more examples and references to other posts on using XTable on AWS, refer to the following GitHub repository.


About the authors

Matthias Rudolph is a Solutions Architect at AWS, digitalizing the German manufacturing industry, focusing on analytics and big data. Before that he was a lead developer at the German manufacturer KraussMaffei Technologies, responsible for the development of data platforms.

Dipankar Mazumdar is a Staff Data Engineer Advocate at Onehouse.ai, focusing on open-source projects like Apache Hudi and XTable to help engineering teams build and scale robust analytics platforms, with prior contributions to critical projects such as Apache Iceberg and Apache Arrow.

Stephen Said is a Senior Solutions Architect and works with Retail/CPG customers. His areas of interest are data platforms and cloud-native software engineering.