Connect your on-premises Kubernetes cluster to AWS APIs using IAM Roles Anywhere

Post Syndicated from Varun Sharma original https://aws.amazon.com/blogs/security/connect-your-on-premises-kubernetes-cluster-to-aws-apis-using-iam-roles-anywhere/

Many customers want to seamlessly integrate their on-premises Kubernetes workloads with AWS services, implement hybrid workloads, or migrate to AWS. Previously, a common approach involved creating long-term access keys, which pose security risks and are no longer recommended. Secrets vaults for Kubernetes and third-party options exist, but they don’t remove the underlying problem of long-term credentials.

One option to connect your on-premises Kubernetes workloads to AWS APIs is to use the service account issuer discovery feature. This allows the Kubernetes API server to act as an OpenID Connect (OIDC) identity provider and be federated with AWS Identity and Access Management (IAM). However, this approach requires public internet access to the Kubernetes API server, which might not be desirable for some customers.

To help eliminate the need for long-term access keys or exposing the Kubernetes API server to the public internet, AWS has introduced AWS IAM Roles Anywhere. This feature enables secure, seamless integration of on-premises Kubernetes workloads with AWS services, promoting robust security practices and minimizing potential risks associated with long-term credentials or public exposure.

IAM Roles Anywhere enables workloads outside of AWS to access AWS resources by exchanging X.509 bound identities for temporary AWS credentials. With IAM Roles Anywhere, you can use the same IAM roles and policies as your AWS workloads to access AWS resources, promoting consistency.

IAM Roles Anywhere can be combined with a standard public key infrastructure solution. In this blog post, we use AWS Private Certificate Authority, which has several advantages over using a self-signed certificate authority (CA). First, it reduces operational and management overhead, because AWS manages the CA for you. Second, the cryptographic key material can be stored in hardware security modules or at least vaulted, which helps you protect your private CA against key compromises. Additionally, certificates can be short-lived, which aligns with dynamic Kubernetes environments where pod lifetimes are typically shorter than traditional servers.

We also demonstrate how to integrate IAM Roles Anywhere without modifying your existing workload Dockerfiles, and how to automate the X.509 certificate lifecycle with cert-manager and an AWS Private CA backend in short-lived certificate mode. By using these capabilities, you can seamlessly integrate your on-premises Kubernetes workloads with AWS services, promoting robust security practices, minimizing risks associated with long-term credentials, and helping to ensure a streamlined, consistent access management experience.

This post is for customers who run their own Kubernetes cluster outside of AWS without using Amazon EKS Anywhere. If you’re using Amazon Elastic Kubernetes Service (Amazon EKS), use IAM roles for service accounts or Amazon EKS Pod Identity instead.

Background

“Why should I prefer X.509 certificates over IAM access keys?” Access keys are long-term credentials that must be rotated regularly to minimize the risk of unauthorized access. They need to be securely deployed onto servers hosting applications that use them, requiring procedures for secure transfer and deletion of transient copies. As the number of applications and access keys grows, tracking and managing them becomes operationally challenging.

In contrast, X.509 certificates use public key infrastructure (PKI). The private key is generated directly on the application server and doesn’t leave it. Only a certificate signing request, which doesn’t contain secrets, is sent to the CA for signing and returning the certificate. This alleviates the need for securely transmitting secret keys.
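As a rough illustration of that flow, the following sketch generates a private key and a CSR locally with OpenSSL. The function name make_csr, the output file names, and the common name passed to it are placeholders chosen for this example and are not part of the solution in this post.

```shell
#!/bin/sh
# Sketch: create a private key and a CSR locally; only the CSR is meant to leave the host.
# make_csr and the file names tls.key / tls.csr are hypothetical names for illustration.
make_csr() {
  common_name=$1
  out_dir=$2
  # umask 077 keeps the generated files readable by the current user only.
  (
    umask 077 &&
      openssl req -new -newkey rsa:2048 -nodes \
        -subj "/CN=${common_name}" \
        -keyout "${out_dir}/tls.key" \
        -out "${out_dir}/tls.csr"
  )
}
```

Calling make_csr iamra-demo /tmp/demo would leave tls.key and tls.csr in /tmp/demo. Only tls.csr would be submitted to the CA; the key file stays where it was created.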

However, you can argue that X.509 certificates are also long-lived credentials. This concern is valid, but certificates don’t have to be long-lived. As demonstrated by projects such as Let’s Encrypt, it’s possible to reduce certificate lifetimes from years to months by implementing automation for certificate renewal. After such a mechanism is in place, certificate lifetimes can be further limited to days or even hours.

In this post, we introduce mutually authenticated Transport Layer Security (mTLS), which uses certificates for high-assurance bidirectional authentication. Certificates establish trust between the client and server, so that both parties are authenticated and authorized to communicate securely. Implementing mTLS mitigates risks such as unauthorized access and man-in-the-middle attacks.

Here, we implement ephemeral certificates that are tied to the lifecycle of pods. When a pod starts, a certificate is automatically created. The certificate expires after a short period of time unless the pod is still using it, in which case cert-manager renews it automatically. This approach makes sure that certificates are valid only for the pod’s lifetime, minimizing the risk associated with long-lived credentials. Additionally, IAM Roles Anywhere supports certificate revocation list (CRL) checks, so you can explicitly revoke certificates if required, for example to promptly revoke access when credentials are compromised.

Throughout this post, we assume that you have a basic understanding of IAM Roles Anywhere. For more information you can see this blog post. Furthermore, we assume that you are familiar with Kubernetes, kubectl, Helm, and cert-manager.

Solution overview

This solution assumes that you have an existing Kubernetes cluster running outside of AWS.

Figure 1 shows the high-level architecture of our solution: an on-premises Kubernetes cluster accesses AWS APIs using IAM Roles Anywhere with X.509 certificates issued by AWS Private CA in short-lived certificate mode.

Figure 1: High level architecture of on-premises Kubernetes accessing AWS APIs

Here’s how the solution works, as shown in Figure 1:

  1. An AWS Private CA in short-lived certificate mode issues X.509 certificates for your pods.
  2. When you set up your AWS Private CA as a trusted source and establish a specific profile, IAM Roles Anywhere will validate and accept authentication requests that use certificates issued by your AWS Private CA.
  3. cert-manager, deployed into your Kubernetes cluster, orchestrates the issuance of AWS Private CA certificates to authorized pods.
  4. Each pod uses IAM Roles Anywhere to create an AWS session using its private key and X.509 certificate obtained from cert-manager.

Let’s explore the different parts of the architecture in more detail.

AWS Private CA short-lived certificates

AWS Private CA offers a short-lived certificate mode, in which the validity period of issued certificates is limited to 7 days or fewer. You can see this AWS Blog to learn how to use AWS Private CA short-lived certificates. You can use this mode to issue certificates for your Kubernetes pods and benefit from lower operating costs. By synchronizing the certificate lifecycle with the lifecycle of the pod, you can minimize the operational overhead for this solution. To help meet requirements for auditability and transparency, you can use the audit report feature to list the issued certificates in a machine-readable format.

IAM Roles Anywhere

Figure 2 shows a detailed overview of the components involved in authentication with IAM Roles Anywhere.

Figure 2: Components of IAM Roles Anywhere

IAM Roles Anywhere allows you to obtain temporary security credentials for workloads that run outside of AWS. Your workloads must use a certificate issued by a trusted PKI CA to authenticate with IAM Roles Anywhere. You establish trust between IAM Roles Anywhere and your CA by creating a trust anchor that points to the root of the CA.

cert-manager

Figure 3 shows a detailed overview of the cert-manager setup used in this post, including the aws-privateca-issuer add-on for the integration of AWS Private CA.

Figure 3: Detailed overview of cert-manager setup

cert-manager is a tool for managing X.509 certificates in Kubernetes. As shown in Figure 3, cert-manager will make sure that certificates are valid and up-to-date and attempt to renew them before they expire. By using add-ons, you can configure different backends for issuing X.509 certificates. In this post, we explore how to integrate cert-manager with AWS Private CA using the aws-privateca-issuer add-on. The aws-privateca-issuer add-on defines two custom resources, AWSPCAIssuer and AWSPCAClusterIssuer, which are used to configure the link to AWS Private CA. They are similar to the Issuer and ClusterIssuer resources that come with cert-manager, but specific to aws-privateca-issuer.

After the AWSPCAIssuer or AWSPCAClusterIssuer is available, aws-privateca-issuer authenticates towards AWS APIs using temporary security credentials obtained from IAM Roles Anywhere. cert-manager watches for the certificate resource, which references an AWSPCAIssuer, which in turn references AWS Private CA. aws-privateca-issuer requests a certificate from AWS Private CA. The auto-generated private key and the signed certificate are stored in Kubernetes secrets.

Using certificates and secrets

cert-manager supports multiple ways of integrating into your Kubernetes workloads. You can use certificate resources, which represent a human-readable definition of a certificate signing request (CSR) and contain information on certificate lifespan and renewal time. When using a certificate, the auto-generated private key and the signed certificate are stored in Kubernetes secrets.

With this option, an X.509 certificate is issued manually and saved as a secret. After a PKI is configured as an issuer, a certificate resource is created to automate the renewal of the certificate. With the certificate resource, the lifecycle of certificates is decoupled from the lifecycle of the pods that use them. This allows you to bootstrap the X.509 certificate even before the trusted PKI is deployed.

Using the CSI driver

Another way of integrating cert-manager is by using a CSI driver. In this case, the certificate lifecycle is bound to the lifecycle of the pod. An X.509 certificate and private key are mounted into a predefined folder where your workloads can read them. On pod creation, cert-manager automatically creates a private key and requests a certificate for the configured trusted PKI. When the pod is deleted, the private key and certificate are also deleted and become invalid because they aren’t renewed by cert-manager.

In this post, we use the CSI driver approach for workloads to create ephemeral certificates for IAM Roles Anywhere.
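To make the CSI driver approach more concrete, the following sketch prints a pod volume definition that requests a certificate through the cert-manager CSI driver. The helper name render_csi_volume, the volume name iamra-certs, and the 1h duration are assumptions made for this example. The volumeAttributes keys are those documented by the cert-manager csi-driver project, so check them against the version you install.

```shell
#!/bin/sh
# Sketch: print a pod volume definition for the cert-manager CSI driver.
# render_csi_volume, the volume name, and the duration are hypothetical example values.
render_csi_volume() {
  issuer_name=$1
  common_name=$2
  cat <<EOF
volumes:
  - name: iamra-certs
    csi:
      driver: csi.cert-manager.io
      readOnly: true
      volumeAttributes:
        csi.cert-manager.io/issuer-name: ${issuer_name}
        csi.cert-manager.io/issuer-kind: AWSPCAIssuer
        csi.cert-manager.io/issuer-group: awspca.cert-manager.io
        csi.cert-manager.io/common-name: ${common_name}
        csi.cert-manager.io/duration: 1h
EOF
}
```

For example, render_csi_volume my-issuer my-app prints a fragment that you could merge into a pod spec. Because the volume is defined on the pod, the certificate and key exist only as long as the pod does.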

Workload configuration

Figure 4 shows in detail how pods can be configured to use IAM Roles Anywhere without changing the underlying Docker images. A sidecar provides an IMDSv2 endpoint that mimics the behavior of the Amazon Elastic Compute Cloud (Amazon EC2) instance metadata endpoint.

Figure 4: Pod configuration using a sidecar

As shown in Figure 4, when using a certificate resource, the auto-generated private key and the signed certificate are stored in Kubernetes secrets and mounted into the pod. When using the CSI driver, a private key is generated locally (for the pod), a certificate is requested from cert-manager based on the given attributes and is issued by AWSPCAIssuer, and the certificates are mounted directly into the pod with no intermediate secret being created.

IAM Roles Anywhere uses the CreateSession API to authenticate requests with a SigV4a signature using the private key and its associated X.509 certificate. This exchange provides an IAM role session credential, as if you had assumed the IAM role. The aws_signing_helper binary is provided to call the CreateSession API from the command line. In this post, a sidecar container provides an IMDSv2 endpoint to the workload container. This sidecar runs the aws_signing_helper binary with its serve command.

This way, applications using AWS SDKs can use the AWS_EC2_METADATA_SERVICE_ENDPOINT environment variable to set the instance metadata endpoint to the correct port on the localhost interface. The X.509 certificate and private key are provided as files to the sidecar container.
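For debugging, you can also query the sidecar endpoint by hand. The following is a minimal sketch that assumes the sidecar exposes the same token and security-credentials paths as the EC2 instance metadata service, and that it listens on port 9911. The function names and the IMDS_ENDPOINT variable are hypothetical helpers for this example.

```shell
#!/bin/sh
# Sketch: query an IMDSv2-compatible endpoint with curl.
# IMDS_ENDPOINT and the imds_* function names are hypothetical; the paths are
# the standard EC2 instance metadata paths, assumed to be served by the sidecar.
IMDS_ENDPOINT=${AWS_EC2_METADATA_SERVICE_ENDPOINT:-http://localhost:9911}

# Request a session token (IMDSv2 requires a PUT with a TTL header).
imds_token() {
  curl -sf -X PUT "${IMDS_ENDPOINT%/}/latest/api/token" \
    -H "X-aws-ec2-metadata-token-ttl-seconds: 60"
}

# List the role name for which credentials are available.
imds_role_name() {
  curl -sf "${IMDS_ENDPOINT%/}/latest/meta-data/iam/security-credentials/" \
    -H "X-aws-ec2-metadata-token: $1"
}

# Fetch the temporary credentials document for a role name.
imds_credentials() {
  curl -sf "${IMDS_ENDPOINT%/}/latest/meta-data/iam/security-credentials/$2" \
    -H "X-aws-ec2-metadata-token: $1"
}
```

From a shell in the workload container, you would first store the result of imds_token in a variable and then pass it to imds_role_name and imds_credentials. AWS SDKs perform the same sequence internally when the environment variable is set.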

Solution deployment

In this section, we show the steps needed to deploy the solution in your AWS account.

Prerequisites

To deploy the solution in this post, make sure that you have the following in place:

  • AWS Command Line Interface (AWS CLI) v2
  • An AWS account and IAM permissions for IAM, IAM Roles Anywhere, and AWS Private CA
  • Latest stable Kubernetes
  • kubectl (matching your Kubernetes version)
  • Helm 3
  • jq

Note: As an alternative to using the AWS CLI, you can use the AWS Controllers for Kubernetes (ACK) service controller for AWS Private CA for creating and managing CertificateAuthority, Certificate, and CertificateAuthorityActivation resources directly within your Kubernetes cluster. After establishing your CA hierarchy using the ACK controller, you can proceed with the subsequent steps involving IAM Roles Anywhere integration, aws-privateca-issuer, and cert-manager as described in this post.

Step 1 – AWS Private CA

  1. Set up a root CA in AWS Private CA, which will issue short-lived certificates for your pods. In this example, you use only one CA; for production environments, you should check the considerations for designing CA hierarchies. Start by using the AWS CLI to create a configuration.
    cat <<EOF > ca-config.json
    {
       "KeyAlgorithm":"RSA_2048",
       "SigningAlgorithm":"SHA256WITHRSA",
       "Subject":{
          "Country":"DE",
          "Organization":"Example Corp",
          "OrganizationalUnit":"SREs",
          "State":"HE",
          "Locality":"FRANKFURT",
          "CommonName":"Blogpost CA"
       }
    }
    EOF

  2. Create the CA in AWS Private CA with short-lived certificates mode.
    aws acm-pca create-certificate-authority \
      --certificate-authority-configuration file://ca-config.json \
      --certificate-authority-type "ROOT" \
      --usage-mode SHORT_LIVED_CERTIFICATE

  3. The command will return a CertificateAuthorityArn, which you will need for further commands, so export it for later use. Replace <region> with your AWS Region.
    export PCA_ARN=arn:aws:acm-pca:<region>:012345678912:certificate-authority/8213159d-cad0-481c-bf14-a0ced4d6d479

  4. After creating the root CA, the CA is in a pending state. You need to create a CSR.
    aws acm-pca get-certificate-authority-csr \
         --certificate-authority-arn ${PCA_ARN} \
         --output text > ca.csr

  5. Now, the CSR needs to be signed by the root CA.
    aws acm-pca issue-certificate \
         --certificate-authority-arn ${PCA_ARN} \
         --csr fileb://ca.csr \
         --signing-algorithm SHA256WITHRSA \
         --template-arn arn:aws:acm-pca:::template/RootCACertificate/V1 \
         --validity Value=365,Type=DAYS

  6. This command returns a CertificateArn which you will need later. Export it.
    export ROOT_CA_CERTIFICATE_ARN=arn:aws:acm-pca:<region>:012345678912:certificate-authority/8213159d-cad0-481c-bf14-a0ced4d6d479/certificate/5830e475088eee553bd409b7f4964613

  7. Download the root CA certificate and import it into your AWS Private CA.
    aws acm-pca get-certificate \
        --certificate-authority-arn ${PCA_ARN} \
        --certificate-arn ${ROOT_CA_CERTIFICATE_ARN} \
        --output text > cert.pem
    
    aws acm-pca import-certificate-authority-certificate \
         --certificate-authority-arn ${PCA_ARN} \
         --certificate fileb://cert.pem

  8. Verify the status of the CA; it should be ACTIVE.
    aws acm-pca describe-certificate-authority \
        --certificate-authority-arn ${PCA_ARN} \
        --output json
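If you script these steps, you might want to wait until the CA reports ACTIVE before continuing. The following sketch polls the status with the --query option of the AWS CLI. The helper names ca_status and wait_for_ca_active, and the default retry values, are assumptions made for this example.

```shell
#!/bin/sh
# Sketch: poll the CA status until it reports ACTIVE.
# ca_status and wait_for_ca_active are hypothetical helper names, not AWS CLI commands.
ca_status() {
  aws acm-pca describe-certificate-authority \
    --certificate-authority-arn "$1" \
    --query 'CertificateAuthority.Status' \
    --output text
}

wait_for_ca_active() {
  arn=$1
  attempts=${2:-10}   # number of status checks before giving up
  delay=${3:-5}       # seconds to sleep between checks
  i=1
  while [ "$i" -le "$attempts" ]; do
    status=$(ca_status "$arn") || return 1
    if [ "$status" = "ACTIVE" ]; then
      echo "CA is ACTIVE"
      return 0
    fi
    echo "CA status is ${status}; retrying (${i}/${attempts})" 1>&2
    i=$((i + 1))
    sleep "$delay"
  done
  return 1
}
```

With the ARN exported as in the earlier steps, wait_for_ca_active "${PCA_ARN}" returns a non-zero exit code if the CA doesn’t become ACTIVE within the given number of attempts.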

Step 2 – IAM Roles Anywhere

At this point your root CA is set up and ready to use. The next step is to configure IAM Roles Anywhere.

  1. Start by defining a trust anchor that will refer to your newly created AWS Private CA and export the trustAnchorArn. Replace <value-of-trustAnchorArn> with the Amazon Resource Name (ARN) value of your IAM Roles Anywhere trust anchor.
    aws rolesanywhere create-trust-anchor \
    --name onprem-k8s-issuer \
    --enabled \
    --source sourceType=AWS_ACM_PCA,sourceData={acmPcaArn=${PCA_ARN}}
    
    export TRUST_ANCHOR_ARN=<value-of-trustAnchorArn>

  2. Create an IAM role to be used by the aws-privateca-issuer cert-manager plugin. The role’s trust policy needs to include the actions sts:AssumeRole, sts:SetSourceIdentity, and sts:TagSession, which are required by IAM Roles Anywhere (IAMRA). Replace <TA_ID> with the ID of your trust anchor.

    Note: You should specify a PrincipalTag with the CN. Furthermore, it should be scoped to the IAMRA service principal. This further restricts authorization based on attributes that are extracted from the X.509 certificate and provides an additional layer of security by helping to ensure that even if an unauthorized party gains access to a valid certificate, they cannot assume the role unless the certificate’s CN matches the specified value.

    cat <<EOF > trust-policy.json
    {
        "Version": "2012-10-17",
        "Statement": [{
            "Sid": "Statement1",
            "Effect": "Allow",
            "Principal": {
                "Service": "rolesanywhere.amazonaws.com"
            },
            "Action": [
                "sts:AssumeRole",
                "sts:SetSourceIdentity",
                "sts:TagSession"
            ],
            "Condition": {
                "StringEquals": {
                    "aws:PrincipalTag/x509Subject/CN": "iamra-issuer"
                },
                "ArnEquals": {
                    "aws:SourceArn": [
                        "arn:aws:rolesanywhere:<region>:012345678912:trust-anchor/<TA_ID>"
                    ]
                }
    
            }
        }]
    }
    EOF

    • Use the following to create the iamra-issuer role:
      aws iam create-role --role-name iamra-issuer \
        --assume-role-policy-document file://trust-policy.json

  3. The command will return a JSON document containing information about the newly created role. Export the ARN for later use.
    export IAMRA_ISSUER_ROLE=arn:aws:iam::012345678912:role/iamra-issuer

  4. Attach an inline policy that allows the role to request certificates from your PCA and retrieve them. Note that there is a condition limiting the AWS Private CA templates to only allow EndEntityCertificate.
    cat <<EOF > inline-policy.json
    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Sid": "awspcaissuerread",
          "Action": [
            "acm-pca:DescribeCertificateAuthority",
            "acm-pca:GetCertificate"
          ],
          "Effect": "Allow",
          "Resource": "$PCA_ARN"
        },
        {
          "Sid": "awspcaissuerwrite",
          "Action": [
            "acm-pca:IssueCertificate"
          ],
          "Effect": "Allow",
          "Resource": "$PCA_ARN",
          "Condition":{
            "StringEquals":{
              "acm-pca:TemplateArn":"arn:aws:acm-pca:::template/EndEntityCertificate/V1"
            }
          }
        }
      ]
    }
    EOF

    • Use the following to associate the inline policy (created in the preceding step) with the iamra-issuer role.
      aws iam put-role-policy --role-name iamra-issuer \
        --policy-name iamra-issuer \
        --policy-document file://inline-policy.json

  5. To finish, create a profile that defines which IAM roles can be assumed and then export the returned ARN.
    aws rolesanywhere create-profile --name iamra-issuer \
      --role-arns ${IAMRA_ISSUER_ROLE} \
      --enabled

    • Export the returned ARN:
      export IAMRA_PROFILE_ARN=arn:aws:rolesanywhere:<region>:012345678912:profile/<Profile_ID>

The created role iamra-issuer will only be used by the aws-privateca-issuer to integrate with AWS Private CA. You should repeat the process of creating IAM roles and IAMRA profiles for your workloads. It’s recommended to create a separate IAM role for each workload and limit its use with condition statements in the trust policy, checking for the workload identity and trust anchor (for example, matching the common name). Furthermore, it’s important that you add the IAMRA service principal to the trust policy and allow the sts:AssumeRole, sts:SetSourceIdentity, and sts:TagSession actions. Best practice with IAM roles is to apply least-privilege permissions.
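When you repeat the role setup for several workloads, a small template function can help keep the trust policies consistent. The following sketch reproduces the trust policy from this step with the common name and trust anchor ARN as parameters. The function name render_trust_policy and the example workload name are hypothetical.

```shell
#!/bin/sh
# Sketch: render a per-workload trust policy for IAM Roles Anywhere.
# render_trust_policy is a hypothetical helper; the policy body mirrors the
# trust policy shown earlier, with the CN and trust anchor ARN as arguments.
render_trust_policy() {
  common_name=$1
  trust_anchor_arn=$2
  cat <<EOF
{
    "Version": "2012-10-17",
    "Statement": [{
        "Effect": "Allow",
        "Principal": {
            "Service": "rolesanywhere.amazonaws.com"
        },
        "Action": [
            "sts:AssumeRole",
            "sts:SetSourceIdentity",
            "sts:TagSession"
        ],
        "Condition": {
            "StringEquals": {
                "aws:PrincipalTag/x509Subject/CN": "${common_name}"
            },
            "ArnEquals": {
                "aws:SourceArn": ["${trust_anchor_arn}"]
            }
        }
    }]
}
EOF
}
```

For example, you could redirect the output of render_trust_policy my-app "${TRUST_ANCHOR_ARN}" to a file and pass that file to aws iam create-role, in the same way as for the iamra-issuer role.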

Step 3 – Create the init container

To integrate IAM Roles Anywhere within your Kubernetes environment, you need to provide an IMDSv2 endpoint to your application containers by running the aws_signing_helper binary as a sidecar. You also need to configure your applications using an environment variable to use the new instance metadata endpoint. To do so, build a Docker image that works as a sidecar.

In this step, create a basic image that fulfills the preceding requirements. In your environment, you might want to adapt this example to use your own base image and implement your image hardening processes.

Copy the following script and save it as init.sh.

#!/bin/sh

# POSIX test syntax ([ ... ]) is used because the script runs under /bin/sh.
if [ -z "$TRUST_ANCHOR_ARN" ]; then
  echo "Must provide TRUST_ANCHOR_ARN environment variable." 1>&2
  exit 1
fi

if [ -z "$PROFILE_ARN" ]; then
  echo "Must provide PROFILE_ARN environment variable." 1>&2
  exit 1
fi

if [ -z "$ROLE_ARN" ]; then
  echo "Must provide ROLE_ARN environment variable." 1>&2
  exit 1
fi

# Start the IMDSv2-compatible endpoint using the mounted certificate and key.
echo "starting IMDSv2 endpoint with aws_signing_helper ..."
/aws_signing_helper serve \
  --certificate /iamra/tls.crt \
  --private-key /iamra/tls.key \
  --trust-anchor-arn "$TRUST_ANCHOR_ARN" \
  --profile-arn "$PROFILE_ARN" \
  --role-arn "$ROLE_ARN"

This script is the entry point of the sidecar container. It expects the environment variables TRUST_ANCHOR_ARN, PROFILE_ARN, and ROLE_ARN, which are required by aws_signing_helper. It also expects an X.509 certificate and its private key in the folder /iamra, which will be mounted in a later stage during pod initialization. Finally, it invokes the aws_signing_helper with the serve directive which creates an IMDSv2 endpoint listening on 9911 by default. This can be customized using the --port parameter.
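Because the certificate and key are mounted at runtime, a missing mount only shows up when the helper starts. As an optional addition, the following sketch checks for both files first. The function name check_iamra_files is hypothetical, and the file names assume the tls.crt and tls.key layout used by the script.

```shell
#!/bin/sh
# Sketch: fail fast when the mounted certificate or key is missing.
# check_iamra_files is a hypothetical helper; it defaults to the /iamra folder.
check_iamra_files() {
  dir=${1:-/iamra}
  for f in tls.crt tls.key; do
    if [ ! -r "${dir}/${f}" ]; then
      echo "Missing or unreadable file: ${dir}/${f}" 1>&2
      return 1
    fi
  done
}
```

Calling check_iamra_files || exit 1 before starting aws_signing_helper would make a misconfigured volume mount visible in the container logs right away.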

Now let’s inspect the Dockerfile.

Note: At the time of writing, we used the alpine:3.17.0 image. Use a hardened base image that’s designed to be secure and aligns with the requirements of your environment.

FROM alpine:3.17.0

COPY init.sh .
RUN apk add --no-cache libc6-compat libgcc wget
RUN wget https://rolesanywhere.amazonaws.com/releases/1.3.0/X86_64/Linux/aws_signing_helper
RUN chmod +x /aws_signing_helper /init.sh 
RUN ln -s /lib/libc.musl-x86_64.so.1 /lib/libresolv.so.2
ENTRYPOINT ["/bin/sh", "-c", "/init.sh"]

This Dockerfile copies the init.sh script and downloads the aws_signing_helper binary. The init.sh script is defined as the entry point of the container. Dynamic libraries required by aws_signing_helper are installed using the Alpine Linux package manager (apk).

Now build the Docker image, sign in to your registry, and push the image for later use. In the following commands, replace <my-docker-registry> with the hostname of your local registry, or use an Amazon ECR repository.

docker build . -t <my-docker-registry>/iamra-sidecar
docker login <my-docker-registry>
docker push <my-docker-registry>/iamra-sidecar

Step 4 – Install cert-manager

In this step, install cert-manager into your cluster and configure aws-privateca-issuer using a manually bootstrapped certificate. cert-manager-approver-policy is used to control which certificates can be requested by the workloads. Then, set up the cert-manager CSI driver to automatically provision X.509 certificates for your workload pods.

Start with the cert-manager setup:

  1. Add the cert-manager repository to Helm and install the chart.

    Note: At the time of writing, we used cert-manager version 1.16.2. Check for the latest stable version.

    helm repo add jetstack https://charts.jetstack.io
    helm repo update
    helm install \
      cert-manager jetstack/cert-manager \
      --namespace cert-manager \
      --create-namespace \
      --version v1.16.2 \
      --set installCRDs=true \
      --set extraArgs={--controllers='*\,-certificaterequests-approver'}
      
    helm install \
      cert-manager-approver-policy jetstack/cert-manager-approver-policy \
      --namespace cert-manager \
      --wait \
        --set app.approveSignerNames="{\
    issuers.cert-manager.io/*,clusterissuers.cert-manager.io/*,\
    awspcaclusterissuers.awspca.cert-manager.io/*,awspcaissuers.awspca.cert-manager.io/*\
    }"
    
    
    # Edit the cert-manager-approver-policy ClusterRole and add the following rules.
    
    kubectl edit clusterrole cert-manager-approver-policy
    
    - apiGroups:
      - awspca.cert-manager.io
      resources:
      - awspcaissuers
      - awspcaclusterissuers
      verbs:
      - get
      - list
      - watch
    - apiGroups:
      - cert-manager.io
      - awspca.cert-manager.io
      resources:
      - signers
      verbs:
      - approve

    Now, install the cert-manager aws-privateca-issuer plugin. This integration connects cert-manager with AWS Private CA and lets you issue short-lived certificates automatically. Currently, aws-privateca-issuer Helm chart doesn’t support IAMRA natively. So, you’re going to use the same init-container to set up IAMRA as for the workload pods.

    You need to issue the first X.509 certificate for aws-privateca-issuer IAMRA manually. Later, cert-manager will renew it automatically.

  2. Create the bootstrap certificate. When asked for a common name, enter iamra-issuer.
    openssl req -out iamra.csr -new -newkey rsa:2048 \
    -nodes -keyout iamra.key
    

    The previous command will create an RSA private key named iamra.key and a certificate signing request named iamra.csr. Now you need to call AWS Private CA to issue the bootstrap certificate.

  3. Set the validity period of the certificate to 1 day so that cert-manager will replace it after it’s set up. The IAM role that’s performing this action must have permissions to AWS Certificate Manager (ACM), IAM, and IAM Roles Anywhere to complete the setup.
    aws acm-pca issue-certificate \
          --certificate-authority-arn ${PCA_ARN} \
          --csr fileb://iamra.csr \
          --signing-algorithm "SHA256WITHRSA" \
          --validity Value=1,Type="DAYS"

  4. The command will return a CertificateArn for your iamra-issuer certificate. Export it and save the certificate to a file.
    export IAMRA_ISSUER_CERT_ARN=arn:aws:acm-pca:<region>:012345678912:certificate-authority/8213159d-cad0-481c-bf14-a0ced4d6d479/certificate/afc47911ed2ded9c2664fa597a33b9fb
    aws acm-pca get-certificate \
          --certificate-authority-arn ${PCA_ARN} \
          --certificate-arn ${IAMRA_ISSUER_CERT_ARN} | \
          jq -r .'Certificate' > iamra-cert.pem

  5. Create a Kubernetes secret that contains the certificate and private key.
    kubectl create secret tls -n cert-manager iamra-issuer \
      --cert=iamra-cert.pem \
      --key=iamra.key

    You’re ready to install the aws-privateca-issuer. You need to modify the Helm chart because it doesn’t currently support IAMRA. You will render the Helm chart into YAML manifests, which are then adapted for IAMRA.

  6. Install the Helm repository and render the charts into a file.
    helm repo add awspca https://cert-manager.github.io/aws-privateca-issuer
    helm template --release-name iamra --include-crds awspca/aws-privateca-issuer \
      -n cert-manager > privateca-issuer.yaml

  7. Add your previously built image as a sidecar and replace the environment variables with your exported values. Search for the deployment definition and add the following section:
    # Source: aws-privateca-issuer/templates/deployment.yaml
    apiVersion: apps/v1
    kind: Deployment
    metadata:
      name: iamra-aws-privateca-issuer
      namespace: cert-manager
      labels:
        helm.sh/chart: aws-privateca-issuer-v1.4.0
        app.kubernetes.io/name: aws-privateca-issuer
        app.kubernetes.io/instance: iamra
        app.kubernetes.io/version: "v1.4.0"
        app.kubernetes.io/managed-by: Helm
    spec:
      replicas: 1
      revisionHistoryLimit: 10
      selector:
        matchLabels:
          app.kubernetes.io/name: aws-privateca-issuer
          app.kubernetes.io/instance: iamra
      template:
        metadata:
          labels:
            app.kubernetes.io/name: aws-privateca-issuer
            app.kubernetes.io/instance: iamra
        spec:
          serviceAccountName: iamra-aws-privateca-issuer
          securityContext:
            runAsUser: 65532
          volumes:
            - name: "iamra-secret"
              projected:
                sources:
                  - secret:
                      name: iamra-issuer
          containers:
            - name: iamra-sidecar
              image: 012345678912.dkr.ecr.us-east-2.amazonaws.com/<replace-with-iamra-sidecar-repository>
              imagePullPolicy: Always
              env:
                - name: "TRUST_ANCHOR_ARN"
                  value: "arn:aws:rolesanywhere:us-east-2:012345678912:trust-anchor/05d183f8-a34e-4f0c-ad2a-de6f803"
                - name: "PROFILE_ARN"
                  value: "arn:aws:rolesanywhere:us-east-2:012345678912:profile/7b45f9a9-73fa-47f8-a20f-88aacbf57"
                - name: "ROLE_ARN"
                  value: "arn:aws:iam::012345678912:role/iamra-issuer"
              volumeMounts:
                - name: iamra-secret
                  mountPath: "/iamra"
                  readOnly: true
            - name: aws-privateca-issuer
              securityContext:
                allowPrivilegeEscalation: false
              image: "public.ecr.aws/k1n1h4h4/cert-manager-aws-privateca-issuer:latest"
              env:
               - name: "AWS_EC2_METADATA_SERVICE_ENDPOINT"
                 value: "http://localhost:9911/"
              imagePullPolicy: IfNotPresent
              command:
                - /manager
              args:
                - --leader-elect
              ports:
                - containerPort: 8080
                  name: http
              livenessProbe:
                httpGet:
                  path: /healthz
                  port: 8081
                initialDelaySeconds: 15
                periodSeconds: 20
              readinessProbe:
                httpGet:
                  path: /healthz
                  port: 8081
                initialDelaySeconds: 5
                periodSeconds: 10
          terminationGracePeriodSeconds: 10

  8. Apply your modified manifest to install aws-privateca-issuer and verify the deployment you have modified. It should show that one pod is ready and available.
    kubectl apply -f privateca-issuer.yaml
    
    kubectl get deployment -n cert-manager -l app.kubernetes.io/name=aws-privateca-issuer
    
    NAME                         READY   UP-TO-DATE   AVAILABLE   AGE
    iamra-aws-privateca-issuer   1/1     1            1           4d10h

  9. Define an AWSPCAIssuer, which will be used for renewal of the manually bootstrapped certificate for the aws-privateca-issuer add-on.

    Note: At the time of writing, we used awspca cert-manager API version v1beta1. Check for the latest stable version.

    export AWS_REGION=<region>
    cat <<EOF | kubectl apply -f -
    apiVersion: awspca.cert-manager.io/v1beta1
    kind: AWSPCAIssuer
    metadata:
      name: iamra-cm-issuer
      namespace: cert-manager
    spec:
      arn: ${PCA_ARN}
      region: ${AWS_REGION}
    EOF

  10. After at least one AWSPCAIssuer or AWSPCAClusterIssuer is available, aws-privateca-issuer authenticates to AWS by calling sts:GetCallerIdentity, which also confirms the authentication method. You can verify this in its logs, which should show the assumed role.
    kubectl logs -n cert-manager -l app.kubernetes.io/name=aws-privateca-issuer -c aws-privateca-issuer | grep -i getcalleridentity
    
    Defaulted container "aws-privateca-issuer" out of: aws-privateca-issuer, iamra-init (init)
    {"level":"info","ts":1669240040.2704494,"logger":"controllers.GenericIssuer","msg":"sts.GetCallerIdentity","genericissuer":"cert-manager/iamra-cm-issuer","arn":"arn:aws:sts::012345678912:assumed-role/iamra-issuer/5bafffcfb691969f0616a9b1a68032ec","account":"012345678912","user_id":"AROA2EIPPI5BVJ6SKBYOY:5bafffcfb691969f0616a9b1a68032ec"}

    Now, you can create a cert-manager Certificate resource that represents a desired certificate that should be issued by the referenced cert-manager Issuer. It combines information of a CSR with details on the validity period and renewal.

  11. Create the certificate object:
    cat <<EOF | kubectl apply -f - 
      apiVersion: cert-manager.io/v1
      kind: Certificate
      metadata:
        name: iamra-privateca-issuer-cert
        namespace: cert-manager
      spec:
        secretName: iamra-issuer
        duration: 168h # 7d
        renewBefore: 24h # 1d
        subject:
          organizations:
            - "Example Corp."
          organizationalUnits:
            - "Admin"
        commonName: "iamra-issuer"
        isCA: false
        usages:
          - "client auth"
          - "server auth"
        issuerRef:
          group: awspca.cert-manager.io
          kind: AWSPCAIssuer
          name: iamra-cm-issuer
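    EOF

    Install the cert-manager CSI driver, which the sample workload in Step 5 uses to mount certificates into pods:
      helm upgrade -i -n cert-manager cert-manager-csi-driver jetstack/cert-manager-csi-driver --wait

    Then create a CertificateRequestPolicy, together with a ClusterRole and ClusterRoleBinding that allow the cert-manager service account to use the policy:
      cat <<EOF | kubectl apply -f -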
      apiVersion: policy.cert-manager.io/v1alpha1
      kind: CertificateRequestPolicy
      metadata:
        name: iamra-issuer-policy
      spec:
        allowed:
          commonName:
            required: true
            value: "iamra-issuer"
          subject:
            organizations:
              values: ["Example Corp."]
              required: true
            organizationalUnits:
              values: ["Admin"]
              required: true
          usages:
          - "server auth"
          - "client auth"
        selector:
          issuerRef:
            group: awspca.cert-manager.io
            kind: AWSPCAIssuer
            name: iamra-cm-issuer
      ---
      apiVersion: rbac.authorization.k8s.io/v1
      kind: ClusterRole
      metadata:
        name: cert-manager-policy:iamra-issuer-policy
      rules:
        - apiGroups: ["policy.cert-manager.io"]
          resources: ["certificaterequestpolicies"]
          verbs: ["use"]
          resourceNames: ["iamra-issuer-policy"]
      ---
      apiVersion: rbac.authorization.k8s.io/v1
      kind: ClusterRoleBinding
      metadata:
        name: cert-manager-policy:iamra-issuer-policy
      roleRef:
        apiGroup: rbac.authorization.k8s.io
        kind: ClusterRole
        name: cert-manager-policy:iamra-issuer-policy
      subjects:
      - kind: ServiceAccount
        name: cert-manager
        namespace: cert-manager
      EOF
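
As a quick sanity check on the Certificate settings from sub-step 11, the following sketch works out when renewal would be expected to start. It assumes the usual cert-manager behavior of starting renewal `renewBefore` ahead of expiry; the helper names and the issuance timestamp are illustrative and not part of the original walkthrough.

```python
from datetime import datetime, timedelta, timezone


def parse_hours(value: str) -> timedelta:
    """Parse a duration such as '168h' (hours only, for this sketch)."""
    if not value.endswith("h"):
        raise ValueError(f"expected an hours value like '24h', got {value!r}")
    return timedelta(hours=int(value[:-1]))


def renewal_time(issued_at: datetime, duration: str, renew_before: str) -> datetime:
    """Return the moment renewal is expected to start: expiry minus renewBefore."""
    lifetime = parse_hours(duration)
    lead = parse_hours(renew_before)
    if lead >= lifetime:
        raise ValueError("renewBefore must be shorter than duration")
    return issued_at + lifetime - lead


# Hypothetical issuance time, used only for illustration.
issued = datetime(2024, 1, 1, tzinfo=timezone.utc)
renew_at = renewal_time(issued, "168h", "24h")
print(renew_at - issued)  # time from issuance to the start of renewal
```

With a 168h duration and a 24h renewBefore, renewal starts 144 hours (six days) after issuance, which leaves a one-day window before the certificate expires.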

Step 5 – Deploy your workload

In Step 4, sub-step 9, you created an AWSPCAIssuer named iamra-cm-issuer. You then used this AWSPCAIssuer to renew the manually bootstrapped certificate for the aws-privateca-issuer.

In Step 4, sub-step 11, you created the certificate iamra-privateca-issuer-cert, which is used by the aws-privateca-issuer.

In this step, you will deploy the sample workload. When deploying the sample workload, make sure to repeat the process of creating IAM roles and IAMRA profiles (from Step 2), the AWSPCAIssuer (Step 4, sub-step 9), and the CertificateRequestPolicy (Step 4, sub-step 11) for the certificate request.

For more information on certificate request policies, see the cert-manager documentation on approval policies.

Use the following code to deploy the workload.

cat <<'EOF' | kubectl apply -f -
apiVersion: v1
kind: Pod
metadata:
   creationTimestamp: null
   labels:
     run: acmpca-csi-test
   name: acmpca-csi-test
spec:
  containers:
      - name: iamra-sidecar
        image: 056930860237.dkr.ecr.us-east-2.amazonaws.com/aws_sighning:latest
        imagePullPolicy: Always
        env:
          - name: "TRUST_ANCHOR_ARN"
            value: "arn:aws:rolesanywhere:us-east-2:012345678912:trust-anchor/05d183f8-a34e-4f0c-ad2a-de6f803ac172"
          - name: "PROFILE_ARN"
            value: "arn:aws:rolesanywhere:us-east-2:012345678912:profile/7b45f9a9-73fa-47f8-a20f-88aacbf579d2"
          - name: "ROLE_ARN"
            value: "arn:aws:iam::012345678912:role/iam-roles-anywhere-s3-full-access"
        volumeMounts:
          - name: "iamra-csi"
            mountPath: "/iamra"
            readOnly: true
      - name: aws-cli
        image: amazon/aws-cli:latest
        env:
        - name: "AWS_EC2_METADATA_SERVICE_ENDPOINT"
          value: "http://127.0.0.1:9911/"
        command:
          - sleep
          - "3600"
  dnsPolicy: ClusterFirst
  restartPolicy: Never
  volumes:
    - name: "iamra-csi"
      csi:
        readOnly: true
        driver: csi.cert-manager.io
        volumeAttributes:
            csi.cert-manager.io/issuer-name: my-pca
            csi.cert-manager.io/issuer-group: awspca.cert-manager.io
            csi.cert-manager.io/issuer-kind: AWSPCAIssuer
            csi.cert-manager.io/common-name: "${SERVICE_ACCOUNT_NAME}.${POD_NAMESPACE}"
            csi.cert-manager.io/duration: 168h
            csi.cert-manager.io/renew-before: 24h
            csi.cert-manager.io/is-ca: "false"
            csi.cert-manager.io/key-usages: "client auth, server auth"
EOF

Step 6 – Test your deployment

To test the deployment, you can use kubectl exec to access the iamra-sidecar container. Navigate to the iamra directory and check if the certificate and key are mounted.

Command:
kubectl exec -it acmpca-csi-test -c iamra-sidecar -- sh
ls | grep iamra

Output: iamra

Command:
cd iamra
ls

Output: ca.crt   tls.crt  tls.key

You can also exec into the aws-cli container and verify the caller identity and make API calls to Amazon Simple Storage Service (Amazon S3):

Command:
kubectl exec -it acmpca-csi-test -c aws-cli -- sh
aws sts get-caller-identity

Output: You should see iam-roles-anywhere-s3-full-access in caller-identity.

Command:
aws s3 ls

Output: You should see a list of S3 buckets, based on the permissions associated with the assumed role.
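
If you would rather script the identity check than read the output by eye, a minimal sketch such as the following could parse the JSON that `aws sts get-caller-identity` prints and pull the role name out of the `Arn` field. The function name is illustrative, and the `UserId` and session name in the sample are placeholders rather than real output.

```python
import json


def assumed_role_name(caller_identity_json: str) -> str:
    """Extract the role name from the Arn field of get-caller-identity output."""
    arn = json.loads(caller_identity_json)["Arn"]
    # Assumed-role ARNs look like arn:aws:sts::<account>:assumed-role/<role>/<session>
    resource = arn.split(":", 5)[5]
    parts = resource.split("/", 2)
    if len(parts) != 3 or parts[0] != "assumed-role":
        raise ValueError(f"not an assumed-role ARN: {arn}")
    return parts[1]


# Sample output; UserId and session name are hypothetical placeholders.
sample = json.dumps({
    "UserId": "AROAEXAMPLEROLEID:example-session",
    "Account": "012345678912",
    "Arn": "arn:aws:sts::012345678912:assumed-role/iam-roles-anywhere-s3-full-access/example-session",
})
print(assumed_role_name(sample))
```

Comparing the returned name with the role you expect (here, iam-roles-anywhere-s3-full-access) gives a simple pass or fail signal for the test.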

Summary

In this post, you learned about a solution for securely connecting on-premises Kubernetes workloads to AWS services using IAM Roles Anywhere. The approach alleviates the need for long-term access keys or public internet exposure of the Kubernetes API server. By using this solution for containerized and full stack applications, you can benefit from:

  • Enhanced security: Use short-lived X.509 certificates instead of long-term credentials.
  • Simplified management: Automate the certificate lifecycle with cert-manager and AWS Private CA.
  • Seamless integration: No modifications are required to existing workload Dockerfiles.
  • Consistent policies: Use the same IAM roles and policies across AWS and on premises.

 
If you have feedback about this post, submit comments in the Comments section below. If you have questions about this post, contact AWS Support.
 

Varun Sharma

Varun is a Senior AWS Cloud Security Engineer who wears his security cape proudly. Varun is a go-to subject matter expert for Amazon Cognito and IAM. When he’s not busy securing the cloud, you’ll find him in the world of security penetration testing. Outside of work, Varun switches gears to capture the beauty of nature through the lens of his camera.
Nishant Mainro

Nishant is a Senior Security Consultant in the AWS Professional Services team and is based in Atlanta, Georgia. He is a technical and passionate Amazonian with over 16 years of professional experience with a specialization in security, risk, and compliance. He specializes in developing and enabling security controls at scale to empower customers to achieve the required security goals for their workloads.
Roshini Jagarapu

Roshini is an Amazon EKS subject matter expert and an AWS Cloud Support Engineer based in India. She works with services such as Amazon EKS and Amazon ECS, helping customers run at scale. Her day-to-day work involves troubleshooting issues related to container technologies. Roshini conducts learning sessions to educate customers and is passionate about cloud-native solutions.

From log analysis to rule creation: How AWS Network Firewall automates domain-based security for outbound traffic

Post Syndicated from Mary Kay Sondecker original https://aws.amazon.com/blogs/security/from-log-analysis-to-rule-creation-how-aws-network-firewall-automates-domain-based-security-for-outbound-traffic/

When it comes to controlling incoming (ingress) and outgoing (egress) network traffic, organizations typically focus heavily on inbound traffic controls—carefully restricting what traffic can enter their network perimeter. However, this approach addresses only inbound security challenges. Modern applications rely heavily on third-party code through operating systems, libraries, and packages. This dependency can create potential security vulnerabilities. If these components are compromised, affected workloads might attempt to connect to unauthorized command and control servers or send sensitive data to unauthorized destinations on the internet.

This is why implementing strong outbound traffic controls—particularly through domain-based allowlisting—has become a critical security best practice. Rather than allowing unrestricted outbound access or maintaining an ever-growing denylist of low-reputation domains, many organizations are shifting to domain-based allowlisting. This approach restricts outbound communications to explicitly trusted domains, reduces potential risk surfaces, and helps to protect against both known and unknown threats. However, manually identifying and maintaining these allowlists has traditionally been a complex and time-consuming process.

AWS Network Firewall automated domain lists improve visibility into network traffic patterns and simplify outbound traffic control management. This feature provides analytics for HTTP and HTTPS network traffic, helping organizations understand domain usage patterns. It also automates firewall log analysis to create rules based on your network traffic. By combining increased visibility with automation, this feature enhances your security awareness and helps to improve the effectiveness of your firewall rules.

In this blog post, we’ll guide you through the implementation of the AWS Network Firewall automated domain list feature, providing a detailed overview, step-by-step instructions, and best practices to optimize your network security.

Overview of automated domain lists and traffic insights

Domain-based security allows you to control network traffic based on the domain names that your applications and users are trying to access. This approach offers a more intuitive and flexible way to create firewall rules, focusing on the destinations your network is trying to reach rather than just IP addresses. However, effectively configuring and managing firewall rules remains challenging for some customers, especially in large environments where connected devices, applications, and traffic patterns are continuously growing and changing. Organizations might struggle to keep up with these changes, leading to outdated or ineffective firewall rules and policies that are either too permissive, exposing the network to risks, or too restrictive, blocking legitimate traffic.

Let’s explore how automated domain lists address these challenges through various use cases and benefits:

Preventive and detective security controls

  1. Domain control through allowlisting – Establishing domain allowlists aligns with the security principle of least privilege for network traffic. A least-privilege model adjusts the scope of what a workload can do across the network, from infinite and undefined to scoped-down and well-defined, enabling better insight into potentially risky behaviors. By limiting outbound connections to only approved domains, organizations can more effectively control and monitor workload communications.
  2. Rule audit and compliance – Domain allowlisting makes it clear which domains are allowed, supporting alignment with standards like the Payment Card Industry Data Security Standard (PCI DSS), Health Insurance Portability and Accountability Act (HIPAA), Cybersecurity Maturity Model Certification (CMMC), and General Data Protection Regulation (GDPR).
  3. Preventive controls enable detection – Preventive controls also act as detective controls, establishing a baseline for normal domain access patterns. With a domain allowlist in place, security teams can better detect workloads that show signs of unauthorized activity.
  4. Incident response support – Domain reporting provides the latest list of workload domains accessed, enabling quick identification of potentially malicious domains during security incidents. This information helps teams prioritize which workloads may need immediate attention.

Operational value

  1. Initial firewall setup and management – Automated allowlisting involves analyzing existing traffic patterns and recommending domain-based rules, which simplifies the process of establishing baseline firewall rules. This helps organizations quickly deploy effective security policies, potentially reducing the time and expertise needed for initial firewall configuration and ongoing management.
  2. Application modernization – Allowlisting supports adjusting firewall rules to accommodate rapidly changing traffic patterns in microservices and containerized environments, helping security to keep pace with evolving architectures.
  3. Cross-environment consistency – Allowlisting enables consistent firewall rule creation and management across multi-cloud and hybrid environments, regardless of where applications or data reside.

How the automated domain list feature works

Automated domain lists work by analyzing your HTTP and HTTPS traffic, generating reports on frequently accessed domains, and providing a convenient way to create rules based on actual network traffic patterns. To begin using automated domain lists in AWS Network Firewall, sign in to the AWS Management Console, access the Network Firewall service, and either work with an existing firewall or create a new one. Then follow the rest of the steps in this post.

Step 1: Enable traffic analysis mode to capture HTTP and HTTPS traffic domain logs

After you’ve selected a firewall, in the left navigation pane, choose Configure advanced settings. Select the Enable traffic analysis mode checkbox to enable it, as shown in Figure 1. Network Firewall uses this logging mode to collect data on observed domains for HTTP and HTTPS traffic to create domain reports.

Figure 1: Enabling traffic analysis mode for a firewall

To stop collecting data on frequently accessed domains in your network traffic, clear the checkbox to disable traffic analysis mode, as shown in Figure 2. Note that if you disable traffic analysis mode, you won’t be able to generate domain reports.

Figure 2: Disabling traffic analysis mode

Once traffic analysis mode is enabled, you’re ready to generate a domain report based on observed network traffic. Next, you can go to the Monitoring and observability tab and choose Create report.

Figure 3: Traffic analysis mode enabled: Now you’re ready to generate domain-based reports

Step 2: Create a domain report

The domain report summarizes the HTTP and HTTPS traffic observed by your firewall for up to 30 days (or for the duration since firewall activation if less than 30 days). Select the checkbox next to each traffic analysis type you want to include in the report—HTTP, HTTPS, or both.

Important: Use your monthly domain report to examine 30 days of traffic behavior. Each report type (HTTP, HTTPS) is available once every 30 days at no additional cost.

Figure 4: Create a domain report that includes traffic analysis types HTTP, HTTPS, or both

To see the status of your domain report, go to the Reports section in the console for your specific firewall. When the report is ready, you can review the report directly in the console or download it, as shown in Figure 5.

Figure 5: The list of domain reports in the Reports section of the console for your specific firewall

Step 3: Review the report details

The report details include the traffic type (HTTP or HTTPS) and the observation period (start and end dates). By default, the report covers the last 30 days, or the entire period since traffic analysis was enabled if that is less than 30 days. The report also shows these details:

  • The Domain list shows domains that are a fully qualified domain name (FQDN) observed in the network traffic, such as aws.com or subdomain.aws.com.
  • The Access attempt count refers to the overall count of connection requests to the domain, including both successful and failed attempts.
  • The Unique sources field shows the number of distinct source IP addresses connected to the domain, indicating its popularity. For example, if one workload connects to aws.com, the count is 1; if 1,000 workloads connect to aws.com, the count is 1,000.
  • The First accessed field shows when the domain was first seen in your traffic, while Last accessed shows when it was most recently seen. This includes both successful and failed attempts to access the domain.
  • The Protocol field indicates how the domain was observed—through either HTTP or HTTPS traffic (in other words, HTTP headers or a TLS handshake).

An example report is shown in Figure 6.

Figure 6: Example domain report details: 30-day analysis

Step 4: (Optional) Create a domain list rule group

You can copy the list of observed domains from the report to a stateful domain list rule group and update your firewall policy. To do so, in the Report details section, choose Create domain list group to use the firewall policy wizard to create or update your firewall rules. The selected domains are automatically copied to a domain list rule group, as shown in Figure 7. For detailed instructions, see the AWS Network Firewall documentation.

Figure 7: Option to copy over the observed domain lists and create a domain list rule group using the firewall policy wizard
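
If you download the report and want to prepare the domain list outside the console, a sketch along the following lines could turn report rows into a domain list rule source. The CSV column names (`domain`, `access_attempts`, `unique_sources`) and the counts are hypothetical, because the downloaded report layout is not described here; the output follows the shape of a Network Firewall `RulesSourceList` (`Targets`, `TargetTypes`, `GeneratedRulesType`). Filtering on unique sources is one possible choice, not a recommendation from the feature itself.

```python
import csv
import io


def build_allowlist(report_csv: str, min_unique_sources: int = 1) -> dict:
    """Turn report rows into an allowlist-style domain list rule source."""
    targets = set()
    for row in csv.DictReader(io.StringIO(report_csv)):
        # Keep only domains seen from enough distinct sources.
        if int(row["unique_sources"]) >= min_unique_sources:
            domain = row["domain"].strip().lower()
            if domain:
                targets.add(domain)
    return {
        "RulesSourceList": {
            "Targets": sorted(targets),
            "TargetTypes": ["TLS_SNI", "HTTP_HOST"],
            "GeneratedRulesType": "ALLOWLIST",
        }
    }


# Hypothetical report rows, for illustration only.
report = """domain,access_attempts,unique_sources
aws.com,120,14
subdomain.aws.com,8,2
example.org,1,1
"""
rule_source = build_allowlist(report, min_unique_sources=2)
print(rule_source["RulesSourceList"]["Targets"])
```

Raising `min_unique_sources` leaves rarely seen domains out of the generated list so that you can review them manually before deciding whether to allow them.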

Best practices for implementing domain allowlists

When you implement domain allowlisting, consider the following guidelines for operational success. We recommend that you also consult your own internal compliance and security policies.

  1. Start with a strategy of generous allowlisting:
    • Begin with broader, more generous allowlist rules rather than a refined list to reduce the risk of accidentally blocking legitimate domains.
    • Focus on getting to a Default Deny policy so that you can benefit from its risk surface reduction.
    • Create flexible rules for trusted domains, including second-level domains and top-level domains, such as allowing access to subdomains under your registered second-level domain. Or allow access to second-level domains under top-level domains that your organization trusts—for example, .mil, .gov, or .edu.
    • Use custom Suricata rules with regex capabilities to handle complex traffic efficiently. See Examples of stateful rules for Network Firewall.
    • Remember that even a broad allowlist provides better security than having no allowlist at all.
  2. Make iterative improvements:
    • After you establish an initial generous allowlist and Default Deny rules, evaluate the rules to determine which areas you might want to start narrowing down further. Use alert rules before pass rules in order to log the specific domains a pass rule might be allowing access to.
    • Adjust logging levels based on domain trust levels and monitoring requirements.
    • Review and update rules based on operational insights and changing requirements.
    • Take a pragmatic and iterative approach to rule refinement rather than attempting to make the ruleset very strict.
  3. Set up robust logging:
  4. Additional considerations:
    • After you enable traffic analysis mode, the automated domain lists feature provides visibility into your network traffic, reporting on observed connections. Although it doesn’t distinguish between allowed and blocked traffic, the domain list report can help you identify the most critical domains to include in your firewall rules.
    • The domain traffic data used to generate the list of domain recommendations is available for up to the last 30 days after traffic analysis has been enabled. This allows you to focus on the most relevant and recent network activity when optimizing your firewall policies.
    • Data collection for automated domain lists is opt-in and performed independently of the firewall policy and logging configuration. Enabling the feature doesn’t impact the performance of the firewall itself.
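
The "alert rules before pass rules" guidance above can be sketched as a small generator that emits a matching alert and pass rule pair for one domain suffix. This is an illustration under assumptions: it presumes a firewall policy with strict rule ordering, matches on the TLS SNI only, and uses a hypothetical function name, message text, and `sid` values. Check the rule syntax against the Network Firewall stateful rule examples before using it.

```python
from typing import List


def alert_then_pass_rules(domain_suffix: str, first_sid: int) -> List[str]:
    """Build an alert rule followed by a pass rule for one TLS SNI suffix."""
    if not domain_suffix.startswith("."):
        raise ValueError("expected a suffix such as '.example.com'")
    # Same match for both rules, so the alert logs exactly what the pass allows.
    match = f'tls.sni; dotprefix; content:"{domain_suffix}"; nocase; endswith'
    header = "tls $HOME_NET any -> $EXTERNAL_NET any"
    return [
        f'alert {header} ({match}; msg:"Observed {domain_suffix}"; '
        f"flow:to_server; sid:{first_sid}; rev:1;)",
        f'pass {header} ({match}; msg:"Allowed {domain_suffix}"; '
        f"flow:to_server; sid:{first_sid + 1}; rev:1;)",
    ]


# Hypothetical suffix and sid range, for illustration only.
rules = alert_then_pass_rules(".example.com", 1000)
print("\n".join(rules))
```

Because the two rules share one match expression, the alert log shows which specific hostnames a broad pass rule is letting through, which is the information you need when narrowing the allowlist later.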

Conclusion

With AWS Network Firewall automated domain lists, you can simplify your firewall management process, create more effective rules based on actual traffic patterns, and maintain a strong security posture with less manual effort. This feature helps you address common challenges such as keeping up with rapidly changing application landscapes, managing security across complex environments, and adhering to compliance requirements. To learn more about Network Firewall and its features, see the product page and service documentation.

If you have feedback about this post, submit comments in the Comments section below. If you have questions about this post, start a new thread on the AWS Network Firewall re:Post forum or contact AWS Support.
 

Mary Kay Sondecker

Mary Kay is a Senior Product Manager at AWS, focused on AWS Network Firewall. With over two decades of experience in the technology industry, she is passionate about helping customers easily implement effective, scalable cloud solutions to drive better business outcomes.
Jesse Lepich

Jesse is a Senior Security Solutions Architect at AWS based in Lake St. Louis, Missouri, focused on helping customers implement native AWS security services. Outside of cloud security, his interests include relaxing with family, barefoot waterskiing, snowboarding and snow skiing, surfing, boating, and mountain climbing.
Michael Leighty

Michael is a Senior Security Solutions Architect at AWS, based in Atlanta. He specializes in helping customers design and implement effective network security controls, drawing from extensive experience at leading network security vendors. At AWS, he works closely with service teams to drive continuous improvement in security services based on customer needs and feedback.
Jason Goode

Jason is a Senior Security GTM Content Specialist at AWS, where he develops content strategies that bridge technical concepts with practical business solutions. Based in Austin, Texas, he leverages his creative background and expertise to help organizations understand and use native AWS security services.

Enhance your workload resilience with new Amazon EMR instance fleet features

Post Syndicated from Deepmala Agarwal original https://aws.amazon.com/blogs/big-data/enhance-your-workload-resilience-with-new-amazon-emr-instance-fleet-features/

Big data processing and analytics have emerged as fundamental components of modern data architectures. Organizations worldwide use these capabilities to extract actionable insights and facilitate data-driven decision-making processes. Amazon EMR has long been a cornerstone for big data processing in the cloud. Now, with a suite of exciting new features for EMR instance fleets that enables you to effectively manage your compute, Amazon is taking cloud-based analytics to the next level.

Amazon EMR has introduced new features for instance fleets that address critical challenges in big data operations. This post explores how these innovations improve cluster resilience, scalability, and efficiency, enabling you to build more robust data processing architectures on AWS. It introduces instance fleets, demonstrates the new allocation strategies, explains how enhanced Availability Zone and subnet selection works, and examines how these features improve your cluster’s resilience. This technical exploration will equip you with the knowledge to implement more resilient and efficient EMR clusters for your organization’s big data processing needs.

The current challenges

Organizations using big data operations might face several challenges:

  • When preferred instance types are unavailable, finding suitable alternatives often delays cluster launches and disrupts workflows
  • Selecting the optimal Availability Zone for cluster launch is challenging due to constantly changing available compute capacity, especially when considering future scaling needs
  • Maintaining uninterrupted operation of mission-critical long-running clusters becomes complex as data processing requirements evolve over time
  • Organizations frequently struggle to scale their operations to meet growing data processing demands, leading to performance bottlenecks and delayed insights

These challenges underscore the need for more advanced, flexible, and intelligent solutions in the realm of big data operations, driving the demand for innovative features in cloud-based data processing platforms.

Introducing improved EMR instance fleets

Amazon EMR, a cloud-based big data platform, allows you to process large datasets using various open source tools such as Apache Spark, Apache Flink, and Trino. To address the aforementioned challenges, Amazon EMR introduced instance fleets, with a robust set of features.

When setting up an EMR cluster, Amazon EMR offers two configuration options for configuring the primary, core, and task nodes: uniform instance groups or instance fleets.

Uniform instance groups offer a streamlined approach to cluster setup, allowing up to 50 instance groups per cluster. An EMR cluster has a primary instance group for the primary node, a core instance group with one or more Amazon Elastic Compute Cloud (Amazon EC2) instances, and the option to add up to 48 task instance groups. Core and task instance groups can each contain any number of EC2 instances, and each node type (primary, core, or task) consists of instances sharing the same specifications and purchasing model (On-Demand or Spot). However, this approach limits the ability to mix different instance types or purchasing options within a single group.

Instance fleets provide a versatile approach to provisioning EC2 instances, offering unparalleled flexibility in cluster configuration. This setup assigns one instance fleet each for primary and core nodes, with the task instance fleet being optional. It allows you to specify up to five EC2 instance types (or up to 30 when using the AWS Command Line Interface (AWS CLI) or API with an instance allocation strategy) for each node type in a cluster, providing enhanced instance diversity to optimize cost and performance while increasing the likelihood of fulfilling capacity requirements. Instance fleets automatically manage the mix of instance types to meet specified target capacities for On-Demand and Spot, reducing operational overhead and improving compute availability.

Key benefits of instance fleets include improved cluster resilience to capacity fluctuations, superior management of Spot Instances with the ability to set timeouts and specify actions if Spot capacity can’t be provisioned, and faster cluster provisioning. The feature also allows you to select multiple subnets for different Availability Zones, enabling Amazon EMR to optimally launch clusters and automatically route traffic away from impacted zones during large-scale events. Additionally, instance fleets offer capacity reservation options for On-Demand Instances and support allocation strategies that prioritize instance types based on user-defined criteria, further enhancing the flexibility and efficiency of EMR cluster management.

Achieve resiliency with instance fleets

Now that you have a good understanding of instance fleets, let’s explore how the new instance fleet capabilities help achieve resiliency for your workloads through the following methods:

  • EC2 instance allocation – Enables precise control over instance type selection and prioritization
  • Enhanced subnet selection – Optimizes cluster deployment across Availability Zones

EC2 instance allocation

EMR instance fleets now offer newer allocation strategies for both Spot and On-Demand Instances, giving you control over selection and prioritization of instance types and allowing you to optimize for greater flexibility, resilience, and cost-efficiency.

Amazon EMR supports the following allocation strategies for On-Demand Instances:

  • Prioritized (new) – Allows you to define a priority order for instance types, giving you precise control over instance selection
  • Lowest-price (existing) – Selects the lowest-priced instance type from the available options

Amazon EMR supports the following allocation strategies for Spot Instances:

  • Price-capacity optimized (new) – Selects instances with the lowest price while also considering the available capacity
  • Capacity-optimized-prioritized (new) – Similar to capacity-optimized, but respects instance type priorities that you specify, on a best-effort basis
  • Capacity-optimized (existing) – Selects instances from the pools with the most available capacity
  • Lowest-price (existing) – Selects the lowest-priced Spot Instances
  • Diversified (existing) – Distributes instances across all pools

When you use the prioritized On-Demand allocation strategy and set priorities, Amazon EMR applies the same priority value to both your On-Demand and Spot Instances.

For Spot Instances, Amazon EMR recommends the capacity-optimized allocation strategy. This approach allocates instances from the most available capacity pools, thereby reducing the chance of interruptions and enhancing cluster stability. Amazon EMR also allows you to launch a cluster without an allocation strategy. However, using an allocation strategy is recommended for faster cluster provisioning, more accurate Spot Instance allocation, and fewer Spot Instance interruptions.

Enhanced subnet selection

Amazon EMR on EC2 offers improved reliability and cluster launch experience for instance fleet clusters through the newly launched enhanced subnet selection. With this feature, EMR on EC2 reduces cluster launch failures resulting from an IP address shortage. Previously, the subnet selection for EMR clusters only considered the available IP addresses for the core instance fleet. Amazon EMR now employs subnet filtering at cluster launch and selects one of the subnets that have adequate available IP addresses to successfully launch all instance fleets. If Amazon EMR can’t find a subnet with sufficient IP addresses to launch the whole cluster, it will prioritize the subnet that can at least launch the core and primary instance fleets. In this scenario, Amazon EMR will also publish an Amazon CloudWatch alert event to notify the user. If none of the configured subnets can be used to provision the core and primary fleet, Amazon EMR will fail the cluster launch and provide a critical error event. These CloudWatch events enable you to monitor your clusters and take remedial actions as necessary. This capability is enabled by default when you configure more than one subnet for cluster launch, and you don’t need to make any configuration changes to benefit from it.
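
The fallback order described above can be modeled with a short sketch. This is a simplified illustration, not Amazon EMR's actual implementation: the `Subnet` type, the per-fleet IP counts, and the tie-break of preferring the subnet with the most free addresses are all assumptions made for the example.

```python
from dataclasses import dataclass
from typing import Dict, List, Tuple


@dataclass
class Subnet:
    subnet_id: str
    available_ips: int


def choose_subnet(subnets: List[Subnet], fleet_ips: Dict[str, int]) -> Tuple[str, str]:
    """Return (subnet_id, event) following the fallback order described above."""
    all_fleets = sum(fleet_ips.values())
    essential = fleet_ips["primary"] + fleet_ips["core"]

    # First choice: a subnet that can hold every instance fleet.
    fits_all = [s for s in subnets if s.available_ips >= all_fleets]
    if fits_all:
        return max(fits_all, key=lambda s: s.available_ips).subnet_id, "none"

    # Fallback: a subnet that can hold at least the primary and core fleets.
    fits_essential = [s for s in subnets if s.available_ips >= essential]
    if fits_essential:
        return max(fits_essential, key=lambda s: s.available_ips).subnet_id, "warning"

    # No usable subnet: the launch fails with a critical error.
    raise RuntimeError("critical: no subnet can host the primary and core fleets")


# Hypothetical subnets and fleet sizes, for illustration only.
subnets = [Subnet("subnet-a", 10), Subnet("subnet-b", 40)]
full_fit = choose_subnet(subnets, {"primary": 1, "core": 10, "task": 20})
partial_fit = choose_subnet(subnets, {"primary": 1, "core": 10, "task": 50})
print(full_fit, partial_fit)
```

In the second call no subnet can hold all 61 addresses, so the sketch falls back to a subnet that fits the primary and core fleets and reports a warning, mirroring the CloudWatch alert event described above.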

Solution overview

Now that you have a comprehensive grasp of the two new features, let’s integrate the elements of instance fleets and look at the implementation flow for each feature.

EC2 instance allocation

The following diagram illustrates the instance fleet lifecycle management architecture.

The workflow consists of the following steps:

  1. Create a cluster configuration with the prioritized allocation strategy, specifying instance types, their priority, and a list of potential subnets.
  2. When you launch an EMR cluster, it evaluates compute capacity and available IPs across the specified subnets. Amazon EMR then selects a single Availability Zone that best meets capacity and instance availability needs for the entire cluster.
  3. Amazon EMR launches the cluster using available instance types in one of the configured Availability Zones based on enhanced subnet selection.
  4. During a scale-up scenario, Amazon EMR adds new instances to the clusters while following the configured compute allocation strategy.
  5. If a specific instance type is unavailable, Amazon EMR will select the next available instance types based on the priority order. This flexibility provides capacity availability for production workloads while maintaining scalability.
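The fallback in step 5 can be illustrated with a small Python sketch. This is not Amazon EMR's implementation; the function name, the instance types beyond m5.xlarge, and the availability set are hypothetical. It assumes that a lower priority number means a higher priority.

```python
from typing import Optional


def pick_instance_type(priorities: dict[str, int], available: set[str]) -> Optional[str]:
    """Return the available instance type with the best priority, or None.

    Assumption for this sketch: a lower number means a higher priority.
    """
    candidates = [itype for itype in priorities if itype in available]
    if not candidates:
        return None
    # Ties are broken by name only to keep the sketch deterministic.
    return min(candidates, key=lambda itype: (priorities[itype], itype))


# Hypothetical fleet: m5.xlarge is preferred, the others are fallbacks.
fleet = {"m5.xlarge": 1, "m5.2xlarge": 2, "r5.xlarge": 3}

# m5.xlarge has no capacity, so the next priority is chosen.
chosen = pick_instance_type(fleet, {"m5.2xlarge", "r5.xlarge"})
print(chosen)
```

When the preferred type is missing from the available set, the sketch falls through to the next priority instead of failing, which is the behavior the workflow describes.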

The following example code provisions an EMR cluster with a primary and core instance fleet configuration with both Spot and On-Demand Instances, using the Capacity-optimized-prioritized allocation strategy for Spot Instances and the Prioritized strategy for On-Demand Instances:

{
  "AWSTemplateFormatVersion": "2010-09-09",
  "Resources": {
    "myCluster": {
      "Type": "AWS::EMR::Cluster",
      "Properties": {
        "Instances": {
          "MasterInstanceFleet": {
            "Name": "cfnPrimary",
            "InstanceTypeConfigs": [
              {
                "BidPrice": "10.50",
                "InstanceType": "m5.xlarge",
                "Priority": "1",
                "EbsConfiguration": {
                  "EbsBlockDeviceConfigs": [
                    {
                      "VolumeSpecification": {
                        "VolumeType": "gp2",
                        "SizeInGB": 32
                      }
                    }
                  ]
                }
              }
            ],
            "TargetOnDemandCapacity": 1
          },
          "CoreInstanceFleet": {
            "Name": "cfnCore",
            "InstanceTypeConfigs": [
              {
                "BidPrice": "10.50",
                "InstanceType": "m5.xlarge",
                "Priority": "1",
                "WeightedCapacity": "1",
                "EbsConfiguration": {
                  "EbsBlockDeviceConfigs": [
                    {
                      "VolumeSpecification": {
                        "VolumeType": "gp2",
                        "SizeInGB": 32
                      }
                    }
                  ]
                }
              }
            ],
            "LaunchSpecifications": {
              "SpotSpecification": {
                "TimeoutAction": "SWITCH_TO_ON_DEMAND",
                "TimeoutDurationMinutes": 20,
                "AllocationStrategy": "CAPACITY_OPTIMIZED_PRIORITIZED"
              },
              "OnDemandSpecification": {
                "AllocationStrategy": "PRIORITIZED"
              }
            },
            "TargetOnDemandCapacity": "5",
            "TargetSpotCapacity": "0"
          }
        },
        "Name": "blog-test",
        "JobFlowRole": "EMR_EC2_DefaultRole",
        "ServiceRole": "EMR_DefaultRole",
        "ReleaseLabel": "emr-7.2.0"
      }
    }
  }
}

Enhanced subnet selection

To better understand Step 3 in the preceding workflow, let’s explore how enhanced subnet selection works with instance fleet EMR clusters.

For our example, let’s configure an EMR instance fleet as follows:

  • Primary fleet (1 unit) – r8g.xlarge, r6g.xlarge, r8g.2xlarge
  • Core fleet (48 units) – r6g.xlarge, r6g.2xlarge, m7g.2xlarge
  • Task fleet (48 units) – m7g.2xlarge, r6g.xlarge, r6a.4xlarge

For this example, let’s use the lowest price allocation strategy. Next, let’s check the available IP addresses in our subnets using the AWS CLI:

aws ec2 describe-subnets \
  --query "sort_by(Subnets, &SubnetId)[*].[SubnetId, AvailableIpAddressCount, AvailabilityZoneId]" \
  --output table

We get the following results:

--------------------------------------------------
|                 DescribeSubnets                |
+---------------------------+-------+------------+
|subnet-XXXXXXXXXXXXXXXX1   |  27  |  us-east-1a |
|subnet-XXXXXXXXXXXXXXXX2   |  251 |  us-east-1b |
|subnet-XXXXXXXXXXXXXXXX3   |  11  |  us-east-1a |
-------------------------------------------------

When launching an EMR cluster, Amazon EMR follows a specific subnet filtering process. First, EMR on EC2 evaluates subnets based on the total IP addresses required for all node types: primary, core, and task nodes. If multiple subnets have sufficient IP capacity to accommodate all instance fleets, Amazon EMR selects one based on the cluster’s allocation strategy. However, if no subnet has enough IPs to support all node types, Amazon EMR considers subnets that can at least accommodate the primary and core nodes, again using the allocation strategy to make the final selection. In our case, Amazon EMR selected a subnet in Availability Zone us-east-1b that had 251 available IPs that can support 97 instances to launch the whole cluster, bypassing smaller subnets with only 27 or 11 available IPs because they didn’t meet the minimum IP requirements for the cluster configuration.

  • Primary fleet (1 unit) – r6g.xlarge
  • Core fleet (48 units) – m7g.2xlarge
  • Task fleet (48 units) – r6g.xlarge

The EMR and CloudWatch event for this cluster would be:

Amazon EMR cluster j-X40BEI1Oxxx (Cluster) 
is being created in subnet (subnet-XXXXXXXXXXXXXXXX2) 
in VPC (vpc-XXXXXXXXXXXXXXXX1) in Availability Zone (us-east-1b), 
which was chosen from the specified VPC options.
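If you want to act on this event programmatically, one option is to extract the subnet and Availability Zone from the message text. The following Python sketch assumes the message keeps the wording shown above; the function name and the regular expression are illustrative and are not part of any AWS SDK.

```python
import re
from typing import Optional

# Pattern written against the sample message above; adjust it if the wording differs.
EVENT_PATTERN = re.compile(
    r"cluster (?P<cluster>\S+) .*?"
    r"subnet \((?P<subnet>[^)]+)\) "
    r"in VPC \((?P<vpc>[^)]+)\) "
    r"in Availability Zone \((?P<az>[^)]+)\)"
)


def parse_subnet_event(message: str) -> Optional[dict]:
    """Return the cluster, subnet, VPC, and AZ named in the event message, or None."""
    normalized = " ".join(message.split())  # collapse line breaks and extra spaces
    match = EVENT_PATTERN.search(normalized)
    return match.groupdict() if match else None


sample = """Amazon EMR cluster j-X40BEI1Oxxx (Cluster)
is being created in subnet (subnet-XXXXXXXXXXXXXXXX2)
in VPC (vpc-XXXXXXXXXXXXXXXX1) in Availability Zone (us-east-1b),
which was chosen from the specified VPC options."""

details = parse_subnet_event(sample)
print(details)
```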

If Amazon EMR can’t find a subnet with sufficient IP addresses to launch the entire cluster, it will prioritize launching the core and primary instance fleets. If no configured subnet can accommodate even the core and primary fleets, Amazon EMR will fail the cluster launch and provide a critical error event. These CloudWatch events enable you to monitor your clusters and take necessary actions.
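The two-tier filtering described in this section can be sketched in Python as follows. This is a simplified model, not Amazon EMR's actual logic: it assumes one IP address per fleet unit, uses shortened hypothetical subnet names, and stops before the final choice that the allocation strategy makes among the candidates.

```python
def filter_subnets(available_ips: dict[str, int], primary: int, core: int, task: int):
    """Return (outcome, candidate_subnets) for a simplified subnet filter.

    Assumption for this sketch: each fleet unit needs exactly one IP address.
    """
    whole_cluster = primary + core + task
    primary_and_core = primary + core

    # Tier 1: subnets that can host every instance fleet.
    full = [name for name, ips in available_ips.items() if ips >= whole_cluster]
    if full:
        return "all_fleets", full

    # Tier 2: subnets that can host at least the primary and core fleets.
    partial = [name for name, ips in available_ips.items() if ips >= primary_and_core]
    if partial:
        return "primary_and_core_only", partial

    # No subnet qualifies, so the launch would fail.
    return "launch_fails", []


# Available IP counts from the example above (27, 251, and 11).
subnets = {"subnet-1": 27, "subnet-2": 251, "subnet-3": 11}
result = filter_subnets(subnets, primary=1, core=48, task=48)
print(result)
```

With 1 + 48 + 48 = 97 addresses required, only the subnet with 251 available IPs passes the first tier, which matches the outcome described for the example cluster.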

Conclusion

The latest enhancements to EMR instance fleets mark a significant advancement in cloud-based big data processing, addressing key challenges in resource allocation, scalability, and reliability. These features, including priority-based instance selection and enhanced subnet selection, provide you with greater control over resource strategies, improved cluster availability, enhanced capacity optimization across Availability Zones, and more efficient fallback mechanisms for production workloads. Instance fleets help you tackle current resource management challenges while laying the groundwork for future scalability.

Get started today by setting up an EMR cluster using the example configuration provided in this post. For additional configuration options and implementation guidance, refer here or reach out to your AWS account team.


About the Authors

Deepmala Agarwal works as an AWS Data Specialist Solutions Architect. She is passionate about helping customers build out scalable, distributed, and data-driven solutions on AWS. When not at work, Deepmala likes spending time with family, walking, listening to music, watching movies, and cooking!

Ravi Kumar Singh is a Senior Product Manager Technical-ES (PMT) at Amazon Web Services, specialized in building petabyte-scale data infrastructure and analytics platforms. With a passion for building innovative tools, he helps customers unlock valuable insights from their structured and unstructured data. Ravi’s expertise lies in creating robust data foundations using open source technologies and advanced cloud computing that power advanced artificial intelligence and machine learning use cases. A recognized thought leader in the field, he advances the data and AI ecosystem through pioneering solutions and collaborative industry initiatives. As a strong advocate for customer-centric solutions, Ravi constantly seeks ways to simplify complex data challenges and enhance user experiences. Outside of work, Ravi is an avid technology enthusiast who enjoys exploring emerging trends in data science, cloud computing, and machine learning.

Mandisa Nxumalo is a Cloud Engineer at Amazon Web Services (AWS) with over 5 years of experience in topics related to cloud services (databases, automation, and others). She currently specializes in Amazon EMR, a big data service. She is passionate about engaging customers to effectively adopt and use data-driven approaches to improve their big data workflows. Outside work, Mandisa enjoys hiking mountains, chasing waterfalls, and traveling across countries.

Kashif Khan is a Sr. Analytics Specialist Solutions Architect at AWS, specializing in big data services like Amazon EMR, AWS Lake Formation, AWS Glue, Amazon Athena, and Amazon DataZone. With over a decade of experience in the big data domain, he possesses extensive expertise in architecting scalable and robust solutions. His role involves providing architectural guidance and collaborating closely with customers to design tailored solutions using AWS analytics services to unlock the full potential of their data.

Gaurav Sharma is a Specialist Solutions Architect (Analytics) at AWS, supporting US public sector customers on their cloud journey. Outside of work, Gaurav enjoys spending time with his family and reading books.

Amazon Redshift announces history mode for zero-ETL integrations to simplify historical data tracking and analysis

Post Syndicated from Raks Khare original https://aws.amazon.com/blogs/big-data/amazon-redshift-announces-history-mode-for-zero-etl-integrations-to-simplify-historical-data-tracking-and-analysis/

In the ever-evolving landscape of cloud computing and data management, AWS has consistently been at the forefront of innovation. One of the groundbreaking developments in recent years is zero-ETL integration, a set of fully managed integrations by AWS that minimizes the need to build extract, transform, and load (ETL) data pipelines. This post explores a brief history of zero-ETL and its importance for customers, and introduces an exciting new feature: history mode for Amazon Aurora PostgreSQL-Compatible Edition, Amazon Aurora MySQL-Compatible Edition, Amazon Relational Database Service (Amazon RDS) for MySQL, and Amazon DynamoDB zero-ETL integration with Amazon Redshift.

A brief history of zero-ETL integrations

The concept of zero-ETL integrations emerged as a response to the growing complexities and inefficiencies in traditional ETL processes. Traditional ETL processes are time-consuming and complex to develop, maintain, and scale. Although not all use cases can be replaced with zero-ETL, it simplifies the replication and allows you to apply transformation post-replication. This eliminates the need for additional ETL technology between the source database and Amazon Redshift. We at AWS recognized the need for a more streamlined approach to data integration, particularly between operational databases and the cloud data warehouses. The journey of zero-ETL began in late 2022 when we introduced the feature for Aurora MySQL with Amazon Redshift. This feature marked a pivotal moment in streamlining complex data workflows, enabling near real-time data replication and analysis while eliminating the need for ETL processes.

Building on the success of our first zero-ETL integration, we’ve made continuous strides in this space by working backward from our customers’ needs and launching features like data filtering, auto and incremental refresh of materialized views, refresh interval, and more. Furthermore, we increased the breadth of sources to include Aurora PostgreSQL, DynamoDB, and Amazon RDS for MySQL to Amazon Redshift integrations, solidifying our commitment to making it seamless for you to run analytics on your data. The introduction of zero-ETL was not just a technological advancement; it represented a paradigm shift in how organizations could approach their data strategies. By removing the need for intermediate data processing steps, we opened up new possibilities for near real-time analytics and decision-making.

Introducing history mode: A new frontier in data analysis

Zero-ETL has already simplified data integration, and we’re excited to take it a step further by announcing a new feature: history mode with Amazon Redshift. Using history mode with zero-ETL integrations, you can streamline your historical data analysis by maintaining full change data capture (CDC) from the source in Amazon Redshift. History mode enables you to unlock the full potential of your data by seamlessly capturing and retaining historical versions of records across your zero-ETL data sources. You can perform advanced historical analysis, build lookback reports, perform trend analysis, and create slowly changing dimension (SCD) Type 2 tables on Amazon Redshift. This allows you to consolidate your core analytical assets and derive insights across multiple applications, gaining cost savings and operational efficiencies. History mode also helps organizations comply with regulatory requirements for maintaining historical records, facilitating comprehensive data governance and informed decision-making.

By default, zero-ETL integrations provide a current view of records in near real time, meaning only the latest state of each source record is retained in Amazon Redshift. History mode changes this. You can now configure your zero-ETL integrations to track every version of the records in your source tables directly in Amazon Redshift, with a source timestamp on each record version indicating when the record was inserted, modified, or deleted. Because data changes are tracked and retained by Amazon Redshift, this can help you meet your compliance requirements without having to maintain duplicate copies in data sources. In addition, you no longer have to keep historical data in your source databases or manage partitioned tables there to preserve older record versions.

In a data warehouse, the most common dimensional modeling technique is the star schema, in which a fact table at the center is surrounded by a number of associated dimension tables. A dimension is a structure that categorizes facts and measures in order to enable users to answer business questions. For example, in a typical sales domain, customer, time, and product are dimensions and sales transactions are facts. An SCD is a dimension that contains relatively static data that can change slowly over a period of time. There are three major types of SCDs maintained in data warehousing: Type 1 (no history), Type 2 (full history), and Type 3 (limited history). CDC is a characteristic of a database that provides the ability to identify the data that changed between two database loads, so that an action can be performed on the changed data.
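To make the difference between the three SCD types concrete, the following Python sketch applies the same address change under each approach. The column names (city, previous_city, valid_from, valid_to, is_active) and the sample values are hypothetical and chosen only for illustration; they are not the columns used later in this post.

```python
from datetime import date


def apply_type1(row: dict, new_city: str) -> dict:
    """Type 1: overwrite the value in place, so no history is kept."""
    return {**row, "city": new_city}


def apply_type2(history: list, new_city: str, changed_on: date) -> list:
    """Type 2: close the active row and append a new version (full history)."""
    closed = [
        {**r, "is_active": False, "valid_to": changed_on} if r["is_active"] else r
        for r in history
    ]
    new_row = {
        "customer_id": history[0]["customer_id"],
        "city": new_city,
        "is_active": True,
        "valid_from": changed_on,
        "valid_to": None,
    }
    return closed + [new_row]


def apply_type3(row: dict, new_city: str) -> dict:
    """Type 3: keep only the previous value in an extra column (limited history)."""
    return {**row, "previous_city": row["city"], "city": new_city}


current = {"customer_id": "C1", "city": "Austin"}
versions = [{"customer_id": "C1", "city": "Austin", "is_active": True,
             "valid_from": date(2024, 1, 1), "valid_to": None}]

type1 = apply_type1(current, "New York")
type2 = apply_type2(versions, "New York", date(2024, 6, 1))
type3 = apply_type3(current, "New York")
print(type1, type2, type3, sep="\n")
```

Only the Type 2 result still lets you answer where the customer lived on a given date, which is why this post builds an SCD Type 2 table.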

In this post, we demonstrate how to enable history mode for tables in a zero-ETL integration and capture the full historical data changes as SCD2.

Solution overview

In this use case, we explore how a fictional nationwide retail chain, AnyCompany, uses AWS services to gain valuable insights into their customer base. With multiple locations across the country, AnyCompany aims to enhance their understanding of customer behavior and improve their marketing strategies through two key initiatives:

  • Customer migration analysis – AnyCompany seeks to track and analyze customer relocation patterns, focusing on how geographical moves impact purchasing behavior. By monitoring these changes, the company can adapt its inventory, services, and local marketing efforts to better serve customers in their new locations.
  • Marketing campaign effectiveness – The retailer wants to evaluate the impact of targeted marketing campaigns based on customer demographics at the time of campaign execution. This analysis can help AnyCompany refine its marketing strategies, optimize resource allocation, and improve overall campaign performance.

By closely tracking changes in customer profiles for both geographic movement and marketing responsiveness, AnyCompany is positioning itself to make more informed, data-driven decisions.

In this demonstration, we begin by loading a sample dataset into the source table, customer, in Aurora PostgreSQL-Compatible. To maintain historical records, we enable history mode on the customer table, which automatically tracks changes in Amazon Redshift.

When history mode is turned on, the following columns are automatically added to the target table, customer, in Amazon Redshift to keep track of changes in the source.

Column name | Data type | Description
_record_is_active | Boolean | Indicates if a record in the target is currently active in the source. True indicates the record is active.
_record_create_time | Timestamp | Starting time (UTC) when the source record is active.
_record_delete_time | Timestamp | Ending time (UTC) when the source record is updated or deleted.

Next, we create a dimension table, customer_dim, in Amazon Redshift with an additional surrogate key column to show an example of creating an SCD table. To optimize query performance for different queries, some of which might be analyzing active or inactive records only while other queries might be analyzing data as of a certain date, we defined the sort key consisting of _record_is_active, _record_create_time, and _record_delete_time attributes in the customer_dim table.
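As an illustration of an "as of a certain date" lookup over these three columns, the following Python sketch picks the record version that was in effect at a given timestamp. It assumes that an active record has no _record_delete_time and that a version is in effect from its create time up to, but not including, its delete time. The function name and sample rows are hypothetical.

```python
from datetime import datetime
from typing import Optional


def as_of(versions: list, ts: datetime) -> Optional[dict]:
    """Return the version in effect at ts, or None if the record did not exist yet."""
    for version in versions:
        started = version["_record_create_time"] <= ts
        deleted_at = version["_record_delete_time"]
        ended = deleted_at is not None and deleted_at <= ts
        if started and not ended:
            return version
    return None


# Hypothetical history for one customer who moved on June 1, 2024.
history = [
    {"ca_city": "Austin", "_record_is_active": False,
     "_record_create_time": datetime(2024, 1, 1),
     "_record_delete_time": datetime(2024, 6, 1)},
    {"ca_city": "New York", "_record_is_active": True,
     "_record_create_time": datetime(2024, 6, 1),
     "_record_delete_time": None},
]

before_move = as_of(history, datetime(2024, 3, 15))
after_move = as_of(history, datetime(2024, 6, 1))
print(before_move["ca_city"], after_move["ca_city"])
```

In SQL, the same lookup becomes a range predicate on _record_create_time and _record_delete_time, which is the kind of query the sort key is meant to help.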

The following figure provides the schema of the source table in Aurora PostgreSQL-Compatible, and the target table and target customer dimension table in Amazon Redshift.

To streamline the data population process, we developed a stored procedure named SP_Customer_Type2_SCD(). This procedure is designed to populate incremental data into the customer_dim table from the replicated customer table. It handles updates, inserts, and deletes in the source table and implements an SCD2 approach.

Prerequisites

Before you get started, complete the following steps:

  1. Configure your Aurora DB cluster and your Redshift data warehouse with the required parameters and permissions. For instructions, refer to Getting started with Aurora zero-ETL integrations with Amazon Redshift.
  2. Create an Aurora zero-ETL integration with Amazon Redshift.
  3. From an Amazon Elastic Compute Cloud (Amazon EC2) terminal or AWS CloudShell, run the following commands to install psql, which you use to connect to the Aurora PostgreSQL cluster:
sudo dnf install postgresql15
psql --version
  4. Load the sample source data:
    • Download the TPC-DS sample dataset for the customer table onto the machine running psql.
    • From the EC2 terminal, run the following command to connect to the Aurora PostgreSQL DB using the default super user postgres:
      psql -h <RDS Write Instance Endpoint> -p 5432 -U postgres

    • Run the following SQL command to create the database zetl:
      create database zetl template template1;

    • Change the connection to the newly created database:
      \c zetl

    • Create the customer table (the following example creates it in the public schema):
      CREATE TABLE customer(
          c_customer_id char(16) NOT NULL PRIMARY KEY,
          c_salutation char(10),
          c_first_name char(20),
          c_last_name char(30),
          c_preferred_cust_flag char(1),
          c_birth_day int4,
          c_birth_month int4,
          c_birth_year int4,
          c_birth_country varchar(20),
          c_login char(13),
          c_email_address char(50),
          ca_street_number char(10),
          ca_street_name varchar(60),
          ca_street_type char(15),
          ca_suite_number char(10),
          ca_city varchar(60),
          ca_county varchar(30),
          ca_state char(2),
          ca_zip char(10),
          ca_country varchar(20),
          ca_gmt_offset numeric(5, 2),
          ca_location_type char(20)
      );

    • Run the following command to load customer data from the downloaded dataset, after changing the dataset location to your directory path:
      \copy customer from '/home/ec2-user/customer_sample_data.dat' WITH DELIMITER '|' CSV;

    • Run the following query to validate the successful creation of the table and loading of sample data:
      SELECT table_catalog, table_schema, table_name, n_live_tup AS row_count
      FROM information_schema.tables JOIN pg_stat_user_tables ON table_name = relname
      WHERE table_type = 'BASE TABLE'
      ORDER BY row_count DESC;

The SQL output should be as follows:

table_catalog | table_schema | table_name | row_count
---------------+--------------+------------+-----------
zetl          | public       | customer   |   1200585
(1 row)

Create a target database in Amazon Redshift

To replicate data from your source into Amazon Redshift, you must create a target database from your integration in Amazon Redshift. For this post, we have already created a source database called zetl in Aurora PostgreSQL-Compatible as part of the prerequisites. Complete the following steps to create the target database:

  1. On the Amazon Redshift console, choose Query editor v2 in the navigation pane.
  2. Run the following commands to create a database called postgres in Amazon Redshift using the zero-ETL integration_id with history mode turned on.
-- Amazon Redshift SQL commands to create database
SELECT integration_id FROM svv_integration; -- copy this result, use in the next sql
CREATE DATABASE "postgres" FROM INTEGRATION '<result from above>' DATABASE "zetl" SET HISTORY_MODE = TRUE;

When you turn on history mode at the time of target database creation in Amazon Redshift, it is enabled for existing tables and for tables created in the future.

  3. Run the following query to validate the successful replication of the initial data from the source into Amazon Redshift:
select is_history_mode, table_name, table_state, * from svv_integration_table_state;

The table customer should show table_state as Synced with is_history_mode as true.

Enable history mode for existing zero-ETL integrations

History mode can be enabled for your existing zero-ETL integrations using either the Amazon Redshift console or SQL commands. Based on your use case, you can turn on history mode at the database, schema, or table level. To use the Amazon Redshift console, complete the following steps:

  1. On the Amazon Redshift console, choose Zero-ETL integrations in the navigation pane.
  2. Choose your desired integration.
  3. Choose Manage history mode.

On this page, you can either enable or disable history mode for all tables or a subset of tables.

  4. Select Manage history mode for individual tables and select Turn on for the history mode for the customer table.
  5. Choose Save changes.
  6. To confirm changes, choose Table statistics and make sure History mode is On for the customer table.
  7. Optionally, you can run the following SQL command in Amazon Redshift to enable history mode for the customer table:
ALTER DATABASE "postgres" INTEGRATION SET HISTORY_MODE = TRUE FOR TABLE public.customer;
  8. Optionally, you can enable history mode for all current and future tables in the database:
ALTER DATABASE "postgres" INTEGRATION SET HISTORY_MODE = TRUE FOR ALL TABLES;
  9. Optionally, you can enable history mode for all current and future tables in one or more schemas. The following query enables history mode for all current and future tables in the public schema:
ALTER DATABASE "postgres" INTEGRATION SET HISTORY_MODE = TRUE FOR ALL TABLES IN SCHEMA public;
  10. Run the following query to validate that the customer table has been successfully changed to history mode, with the is_history_mode column as true, so that it can begin tracking every version (including updates and deletes) of all records changed in the source:
select is_history_mode, table_name, table_state, * from svv_integration_table_state;

Initially, the table will be in ResyncInitiated state before changing to Synced.

  11. Run the following query in the zetl database of Aurora PostgreSQL-Compatible to modify a source record and observe the behavior of history mode in the Amazon Redshift target:
UPDATE customer
SET
    ca_suite_number = 'Suite 100',
    ca_street_number = '500',
    ca_street_name = 'Main',
    ca_street_type = 'St.',
    ca_city = 'New York',
    ca_county = 'Manhattan',
    ca_state = 'NY',
    ca_zip = '10001'
WHERE c_customer_id = 'AAAAAAAAAAAKNAAA';
  12. Now run the following query in the postgres database of Amazon Redshift to see all versions of the same record:
SELECT   
    c_customer_id,
    ca_street_number,
    ca_street_name,
    ca_suite_number,
    ca_city,
    ca_county,
    ca_state,
    ca_zip,
    _record_is_active,
    _record_create_time,
    _record_delete_time
FROM postgres.public.customer
WHERE c_customer_id = 'AAAAAAAAAAAKNAAA';

With history mode, the zero-ETL integration has inactivated the old record by setting its _record_is_active column to false and has created a new record with _record_is_active set to true. You can also see how it maintains the _record_create_time and _record_delete_time column values for both records. The inactive record has a delete timestamp that matches the active record’s create timestamp.

Load incremental data in an SCD2 table

Complete the following steps to create an SCD2 table and implement an incremental data load process in a regular database of Amazon Redshift, in this case dev:

  1. Create an empty customer SCD2 table called customer_dim with SCD fields. The table is distributed on the surrogate key c_customer_sk and has a sort key on the _record_is_active, _record_create_time, and _record_delete_time columns. When you define a sort key on a table, Amazon Redshift can skip reading entire blocks of data for that column. It can do so because it tracks the minimum and maximum column values stored on each block and can skip blocks that don’t apply to the predicate range.
CREATE TABLE dev.public.customer_dim (
    c_customer_sk bigint NOT NULL DEFAULT 0 ENCODE raw distkey,
    c_customer_id character varying(19) DEFAULT '' :: character varying ENCODE lzo,
    c_salutation character varying(12) ENCODE bytedict,
    c_first_name character varying(24) ENCODE lzo,
    c_last_name character varying(36) ENCODE lzo,
    c_preferred_cust_flag character varying(1) ENCODE lzo,
    c_birth_day integer ENCODE az64,
    c_birth_month integer ENCODE az64,
    c_birth_year integer ENCODE az64,
    c_birth_country character varying(24) ENCODE bytedict,
    c_login character varying(15) ENCODE lzo,
    c_email_address character varying(60) ENCODE lzo,
    ca_street_number character varying(12) ENCODE lzo,
    ca_street_name character varying(72) ENCODE lzo,
    ca_street_type character varying(18) ENCODE bytedict,
    ca_suite_number character varying(12) ENCODE bytedict,
    ca_city character varying(72) ENCODE lzo,
    ca_county character varying(36) ENCODE lzo,
    ca_state character varying(2) ENCODE lzo,
    ca_zip character varying(12) ENCODE lzo,
    ca_country character varying(24) ENCODE lzo,
    ca_gmt_offset numeric(5, 2) ENCODE az64,
    ca_location_type character varying(24) ENCODE bytedict,
    _record_is_active boolean ENCODE raw,
    _record_create_time timestamp without time zone ENCODE az64,
    _record_delete_time timestamp without time zone ENCODE az64,
    PRIMARY KEY (c_customer_sk)
) SORTKEY (
    _record_is_active,
    _record_create_time,
    _record_delete_time
);
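The block-skipping behavior mentioned in step 1 can be pictured with a toy model in Python. This sketch is not how Amazon Redshift is implemented; it only shows the idea of comparing a predicate range against per-block minimum and maximum values. The function name and the block ranges are made up for illustration.

```python
def blocks_to_scan(zone_map: list, low: int, high: int) -> list:
    """Return indexes of blocks whose [min, max] range overlaps [low, high]."""
    return [
        index
        for index, (block_min, block_max) in enumerate(zone_map)
        if block_max >= low and block_min <= high
    ]


# Hypothetical min/max values per block for a sorted column.
zone_map = [(1, 100), (101, 200), (201, 300), (301, 400)]

# A predicate such as "value BETWEEN 150 AND 220" only touches two blocks.
needed = blocks_to_scan(zone_map, 150, 220)
print(needed)  # [1, 2]
```

Because the column is sorted, each block covers a narrow range of values, so most blocks fall outside the predicate range and are never read.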

Next, you create a stored procedure called SP_Customer_Type2_SCD() to populate incremental data in the customer_dim SCD2 table created in the preceding step. The stored procedure contains the following components:

    • First, it fetches the max _record_create_time and max _record_delete_time for each customer_id.
    • Then, it compares the output of the preceding step with the ongoing zero-ETL integration replicated table for records created after the max creation time in the dimension table or the record in the replicated table with _record_delete_time after the max _record_delete_time in the dimension table for each customer_id.
    • The output of the preceding step captures the changed data between the replicated customer table and target customer_dim dimension table. The interim data is staged to a customer_stg table, which is ready to be merged with the target table.
    • During the merge process, records that need to be deleted are marked with _record_delete_time and _record_is_active is set to false, whereas newly created records are inserted into the target table customer_dim with _record_is_active as true.
  2. Create the stored procedure with the following code:
CREATE OR REPLACE PROCEDURE public.sp_customer_type2_scd()
LANGUAGE plpgsql
AS $$
    BEGIN

    DROP TABLE IF EXISTS cust_latest;

    -- Create temp table with latest record timestamps
    CREATE TEMP TABLE cust_latest DISTKEY (c_customer_id)
    AS
        SELECT
            c_customer_id,
            max(_record_create_time) AS _record_create_time,
            max(_record_delete_time) AS _record_delete_time
        FROM customer_dim
        GROUP BY c_customer_id;

    DROP TABLE IF EXISTS customer_stg;

    -- Identify and stage changed records
    CREATE TEMP TABLE customer_stg
    AS
        SELECT
            ABS(fnv_hash(cust.c_customer_id)) AS customer_sk,
            cust.*
        FROM postgres.public.customer cust
        LEFT OUTER JOIN cust_latest
            ON cust.c_customer_id = cust_latest.c_customer_id
        WHERE (cust._record_create_time > NVL(cust_latest._record_create_time, '1099-01-01 01:01:01')
               AND cust._record_is_active IS TRUE)
           OR (cust._record_delete_time > NVL(cust_latest._record_delete_time, '1099-01-01 01:01:01')
               AND cust._record_is_active IS FALSE);

    -- Merge changes to customer dimension table
    MERGE INTO public.customer_dim 
    USING customer_stg stg 
    ON customer_dim.c_customer_id = stg.c_customer_id
        AND customer_dim._record_is_active = TRUE
        AND stg._record_is_active = false
    WHEN MATCHED THEN
        UPDATE
        SET
            _record_is_active = stg._record_is_active,
            _record_create_time = stg._record_create_time,
            _record_delete_time = stg._record_delete_time
    WHEN NOT MATCHED THEN
        INSERT
        VALUES
            (
                stg.customer_sk,
                stg.c_customer_id,
                stg.c_salutation,
                stg.c_first_name,
                stg.c_last_name,
                stg.c_preferred_cust_flag,
                stg.c_birth_day,
                stg.c_birth_month,
                stg.c_birth_year,
                stg.c_birth_country,
                stg.c_login,
                stg.c_email_address,
                stg.ca_street_number,
                stg.ca_street_name,
                stg.ca_street_type,
                stg.ca_suite_number,
                stg.ca_city,
                stg.ca_county,
                stg.ca_state,
                stg.ca_zip,
                stg.ca_country,
                stg.ca_gmt_offset,
                stg.ca_location_type,
                stg._record_is_active,
                stg._record_create_time,
                stg._record_delete_time
            );

    END;
    $$
  3. Run and schedule the stored procedure to load the initial and ongoing incremental data into the customer_dim SCD2 table:
CALL SP_Customer_Type2_SCD();
  1. Validate the data in the customer_dim table for the same customer with a changed address:
SELECT
    c_customer_id,
    ca_street_number,
    ca_street_name,
    ca_suite_number,
    ca_city,
    ca_county,
    ca_state,
    ca_zip,
    _record_is_active,
    _record_create_time,
    _record_delete_time
FROM customer_dim
WHERE c_customer_id = 'AAAAAAAAAAAKNAAA';


You have successfully implemented an incremental load strategy for the customer SCD2 table. Going forward, all changes to customer will be tracked and maintained in this customer dimension table by running the stored procedure. This enables you to analyze customer data as of a desired point in time for varying use cases, for example, customer migration analysis (seeing how geographical moves affect purchasing behavior) or marketing campaign effectiveness (analyzing the impact of targeted campaigns on customer demographics at the time of campaign execution).
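The point-in-time analysis described above can be sketched in a few lines of Python. This is a minimal illustration, not part of the original solution: the sample rows, the `as_of` helper, and the assumption that the active version has no `_record_delete_time` are all hypothetical.

```python
from datetime import datetime

# Hypothetical versions of one customer in customer_dim (illustrative data only).
ROWS = [
    {"c_customer_id": "C1", "ca_city": "Austin",
     "_record_is_active": False,
     "_record_create_time": datetime(2024, 1, 1),
     "_record_delete_time": datetime(2024, 6, 1)},
    {"c_customer_id": "C1", "ca_city": "Denver",
     "_record_is_active": True,
     "_record_create_time": datetime(2024, 6, 1),
     "_record_delete_time": None},  # assumption: open-ended for the active version
]


def as_of(rows, customer_id, when):
    """Return the version of a customer that was valid at `when`, or None."""
    for row in rows:
        if row["c_customer_id"] != customer_id:
            continue
        started = row["_record_create_time"] <= when
        deleted_at = row["_record_delete_time"]
        ended = deleted_at is not None and deleted_at <= when
        if started and not ended:
            return row
    return None


print(as_of(ROWS, "C1", datetime(2024, 3, 15))["ca_city"])
print(as_of(ROWS, "C1", datetime(2024, 7, 1))["ca_city"])
```

Under the same assumption, the equivalent SQL predicate would compare `_record_create_time` and `_record_delete_time` against the chosen timestamp, treating a missing delete time as still valid.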

Industry use cases for history mode

The following are other industry use cases enabled by history mode between operational data stores and Amazon Redshift:

  • Financial auditing or regulatory compliance – Track changes in financial records over time to support compliance and audit requirements. History mode allows auditors to reconstruct the state of financial data at any point in time, which is crucial for investigations and regulatory reporting.
  • Customer journey analysis – Understand how customer data evolves to gain insights into behavior patterns and preferences. Marketers can analyze how customer profiles change over time, informing personalization strategies and lifetime value calculations.
  • Supply chain optimization – Analyze historical inventory and order data to identify trends and optimize stock levels. Supply chain managers can review how demand patterns have shifted over time, improving forecasting accuracy.
  • HR analytics – Track employee data changes over time for better workforce planning and performance analysis. HR professionals can analyze career progression, salary changes, and skill development trends across the organization.
  • Machine learning model auditing – Data scientists can use historical data to train models, compare predictions vs. actuals to improve accuracy, and help explain model behavior and identify potential biases over time.
  • Hospitality and airline industry use cases – For example:
    • Customer service – Access historical reservation data to swiftly address customer queries, enhancing service quality and customer satisfaction.
    • Crew scheduling – Track crew schedule changes to help comply with union contracts, maintaining positive labor relations and optimizing workforce management.
    • Data science applications – Use historical data to train models on multiple scenarios from different time periods. Compare predictions against actuals to improve model accuracy for key operations such as airport gate management, flight prioritization, and crew scheduling optimization.

Best practices

If your requirement is to separate active and inactive records, you can use _record_is_active as the first sort key. For other patterns where you want to analyze data as of a specific date in the past, irrespective of whether data is active or inactive, _record_create_time and _record_delete_time can be added as sort keys.

History mode retains record versions, which increases the table size in Amazon Redshift and could impact query performance. Therefore, periodically perform DML deletes for outdated record versions (delete data beyond a certain timeframe if it isn't needed for analysis). When executing these deletions, maintain data integrity by deleting across all related tables. Vacuuming also becomes necessary after you perform DML deletes on records whose versioning is no longer required. Amazon Redshift auto vacuum delete is more efficient when operating on bulk deletes, so prefer fewer, larger delete operations. You can monitor vacuum progress using the SYS_VACUUM_HISTORY view.
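As a rough sketch of the periodic cleanup described above, the following Python helper builds a single bulk DELETE for inactive record versions older than a retention window. The helper name, the 365-day window, and the choice to filter on `_record_delete_time` are assumptions for illustration; adapt the predicate to your own retention rules and run the matching delete on every related table.

```python
from datetime import datetime, timedelta


def build_purge_sql(table, retention_days, now):
    """Build a DELETE for inactive record versions older than the retention window."""
    # Basic guard so only simple schema.table identifiers are interpolated.
    if not table.replace("_", "").replace(".", "").isalnum():
        raise ValueError(f"unexpected table name: {table!r}")
    cutoff = (now - timedelta(days=retention_days)).strftime("%Y-%m-%d %H:%M:%S")
    return (
        f"DELETE FROM {table} "
        f"WHERE _record_is_active = false "
        f"AND _record_delete_time < '{cutoff}';"
    )


# Hypothetical example: keep one year of inactive versions.
print(build_purge_sql("public.customer_dim", 365, datetime(2025, 1, 1)))
```

Issuing one statement per table, rather than many small deletes, fits the note that auto vacuum delete works best on bulk deletes.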

Clean up

Complete the following steps to clean up your resources:

  1. Delete the Aurora PostgreSQL cluster.
  2. Delete the Redshift cluster.
  3. Delete the EC2 instance.

Conclusion

Zero-ETL integrations have already made significant strides in simplifying data integration and enabling near real-time analytics. With the addition of history mode, AWS continues to innovate, providing you with even more powerful tools to derive value from your data.

As businesses increasingly rely on data-driven decision-making, zero-ETL with history mode will be crucial in maintaining a competitive edge in the digital economy. These advancements not only streamline data processes but also open up new avenues for analysis and insight generation.

To learn more about zero-ETL integration with history mode, refer to Zero-ETL integrations and Limitations. Get started with zero-ETL on AWS by creating a free account today!


About the Authors

Raks Khare is a Senior Analytics Specialist Solutions Architect at AWS based out of Pennsylvania. He helps customers across varying industries and regions architect data analytics solutions at scale on the AWS platform. Outside of work, he likes exploring new travel and food destinations and spending quality time with his family.

Jyoti Aggarwal is a Product Management Lead for AWS zero-ETL. She leads the product and business strategy, including driving initiatives around performance, customer experience, and security. She brings along an expertise in cloud compute, data pipelines, analytics, artificial intelligence (AI), and data services including databases, data warehouses and data lakes.

Gopal Paliwal is a Principal Engineer for Amazon Redshift, leading the software development of ZeroETL initiatives for Amazon Redshift.

Harman Nagra is a Principal Solutions Architect at AWS, based in San Francisco. He works with global financial services organizations to design, develop, and optimize their workloads on AWS.

Sumanth Punyamurthula is a Senior Data and Analytics Architect at Amazon Web Services with more than 20 years of experience in leading large analytical initiatives, including analytics, data warehouse, data lakes, data governance, security, and cloud infrastructure across travel, hospitality, financial, and healthcare industries.

Streamline AWS WAF log analysis with Apache Iceberg and Amazon Data Firehose

Post Syndicated from Charishma Makineni original https://aws.amazon.com/blogs/big-data/streamline-aws-waf-log-analysis-with-apache-iceberg-and-amazon-data-firehose/

Organizations are rapidly expanding their digital presence, creating opportunities to serve customers better through web applications. AWS WAF logs play a vital role in this expansion by enabling organizations to proactively monitor security, enforce compliance, and strengthen application defense. AWS WAF log analysis is essential across many industries, including banking, retail, and healthcare, each needing to deliver secure digital experiences.

To optimize their security operations, organizations are adopting modern approaches that combine real-time monitoring with scalable data analytics. They are using data lake architectures and Apache Iceberg to efficiently process large volumes of security data while minimizing operational overhead. Apache Iceberg combines enterprise reliability with SQL simplicity when working with security data stored in Amazon Simple Storage Service (Amazon S3), enabling organizations to focus on security insights rather than infrastructure management.

Apache Iceberg enhances security analytics through several key capabilities. It seamlessly integrates with various AWS services and analysis tools while supporting concurrent read-write operations for simultaneous log ingestion and analysis. Its time travel feature enables thorough security forensics and incident investigation, and its schema evolution support allows teams to adapt to emerging security patterns without disrupting existing workflows. These capabilities make Apache Iceberg an ideal choice for building robust security analytics solutions. However, organizations often struggle when building their own solutions to deliver data to Apache Iceberg tables. These include managing complex extract, transform, and load (ETL) processes, handling schema validation, providing reliable delivery, and maintaining custom code for data transformations. Teams must also build resilient error handling, implement retry logic, and manage scaling infrastructure—all while maintaining data consistency and high availability. These challenges take valuable time away from analyzing security data and deriving insights.

To address these challenges, Amazon Data Firehose provides real-time data delivery to Apache Iceberg tables within seconds. Firehose delivers high reliability across multiple Availability Zones while automatically scaling to match throughput requirements. It is fully managed and requires no infrastructure management or custom code development. Firehose delivers streaming data with configurable buffering options that can be optimized for near-zero latency. It also provides built-in data transformation, compression, and encryption capabilities, along with automatic retry mechanisms to provide reliable data delivery. This makes it an ideal choice for streaming AWS WAF logs directly into a data lake while minimizing operational overhead.

In this post, we demonstrate how to build a scalable AWS WAF log analysis solution using Firehose and Apache Iceberg. Firehose simplifies the entire process—from log ingestion to storage—by allowing you to configure a delivery stream that delivers AWS WAF logs directly to Apache Iceberg tables in Amazon S3. The solution requires no infrastructure setup and you pay only for the data you process.

Solution overview

To implement this solution, you first configure AWS WAF logging to capture web traffic information. This captures detailed information about traffic analyzed by the web access control lists (ACLs). Each log entry includes the request timestamp, detailed request information, and rule matches that were triggered. These logs are continuously streamed to Firehose in real time.
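To make the shape of a log entry concrete, here is a small sketch that parses one record and pulls out the fields mentioned above. The record is made up and heavily trimmed; the field names follow the AWS WAF log format as commonly documented, and the `summarize` helper is hypothetical.

```python
import json
from datetime import datetime, timezone

# Illustrative record: values are invented, and most fields are omitted.
SAMPLE = """
{
  "timestamp": 1735689600000,
  "formatVersion": 1,
  "webaclId": "arn:aws:wafv2:us-east-1:111122223333:regional/webacl/example/abc",
  "terminatingRuleId": "ExampleRule",
  "terminatingRuleType": "REGULAR",
  "action": "BLOCK",
  "httpRequest": {
    "clientIp": "192.0.2.10",
    "country": "US",
    "uri": "/login",
    "httpMethod": "POST"
  }
}
"""


def summarize(raw):
    """Extract the request time, action, matching rule, and client IP from one log entry."""
    record = json.loads(raw)
    # The timestamp is expressed in milliseconds since the Unix epoch.
    when = datetime.fromtimestamp(record["timestamp"] / 1000, tz=timezone.utc)
    return {
        "time": when.isoformat(),
        "action": record["action"],
        "rule": record["terminatingRuleId"],
        "client_ip": record["httpRequest"]["clientIp"],
    }


print(summarize(SAMPLE))
```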

Firehose writes these logs into an Apache Iceberg table, which is stored in Amazon S3. When Firehose delivers data to the S3 table, it uses the AWS Glue Data Catalog to store and manage table metadata. This metadata includes schema information, partition details, and file locations, enabling seamless data discovery and querying across AWS analytics services.

Finally, security teams can analyze data in the Apache Iceberg tables using various AWS services, including Amazon Redshift, Amazon Athena, Amazon EMR, and Amazon SageMaker. For this demonstration, we use Athena to run SQL queries against the security logs.

The following diagram illustrates the solution architecture.

 

The implementation consists of four steps:

  1. Deploy the base infrastructure using AWS CloudFormation.
  2. Create an Apache Iceberg table using an AWS Glue notebook.
  3. Create a Firehose stream to handle the log data.
  4. Configure AWS WAF logging to send data to the Apache Iceberg table through the Firehose stream.

You can deploy the required resources into your AWS environment in the US East (N. Virginia) AWS Region using a CloudFormation template. This template creates an S3 bucket for storing AWS WAF logs, an AWS Glue database for the Apache Iceberg tables, and the AWS Identity and Access Management (IAM) roles and policies needed for the solution.

Prerequisites

Before you get started, make sure you have the following prerequisites:

  • An AWS account with access to the US East (N. Virginia) Region
  • AWS WAF configured with a web ACL in the US East (N. Virginia) Region

If you don’t have AWS WAF set up, refer to the AWS WAF Workshop to create a sample web application with AWS WAF.

AWS WAF logs use case-sensitive field names (like httpRequest and webaclId). For successful log ingestion, this solution uses the Apache Iceberg API through an AWS Glue job to create tables—this is a reliable approach that preserves the exact field names from the AWS WAF logs. Although AWS Glue crawlers and Athena DDLs offer convenient ways to create Apache Iceberg tables, they convert mixed-case column names to lowercase, which can affect AWS WAF log processing. By using an AWS Glue job with the Apache Iceberg API, case-sensitivity of column names is preserved, providing proper mapping between AWS WAF log fields and table columns.
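The effect of lowercasing column names can be illustrated with a toy model of exact-name matching. This is only a sketch of the idea, not how Firehose or the Data Catalog actually resolve columns; the field list is a small subset and the helper is hypothetical.

```python
# A few top-level AWS WAF log field names (subset, for illustration).
WAF_FIELDS = ["timestamp", "webaclId", "httpRequest", "terminatingRuleId"]


def unmatched_fields(log_fields, table_columns):
    """Return log fields that have no identically named (case-sensitive) column."""
    columns = set(table_columns)
    return [name for name in log_fields if name not in columns]


# Columns that keep the original casing, as when created through the Iceberg API.
preserved = list(WAF_FIELDS)
# Columns after a lowercase conversion.
lowercased = [name.lower() for name in WAF_FIELDS]

print(unmatched_fields(WAF_FIELDS, preserved))
print(unmatched_fields(WAF_FIELDS, lowercased))
```

With preserved casing every field finds its column; after lowercasing, only the fields that were already lowercase still match.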

Deploy the CloudFormation stack

Complete the following steps to deploy the solution resources with AWS CloudFormation:

  1. Sign in to the AWS CloudFormation console.
  2. Choose Launch Stack.
  3. Choose Next.
  4. For Stack name, leave as WAF-Firehose-Iceberg-Stack.
  5. Under Parameters, specify whether AWS Lake Formation permissions are to be used for the AWS Glue tables.
  6. Choose Next.

  1. Select I acknowledge that AWS CloudFormation might create IAM resources with custom names and choose Next.

 

  1. Review the deployment and choose Submit.

 

The stack takes several minutes to deploy. After the deployment is complete, you can review the resources created by navigating to the Resources tab on the CloudFormation stack.

Create an Apache Iceberg table

Before setting up the Firehose delivery stream, you must create the destination Apache Iceberg table in the Data Catalog. This is done using AWS Glue jobs and the Apache Iceberg API, as discussed earlier. Complete the following steps to create an Apache Iceberg table:

  1. On the AWS Glue console, choose Notebooks under ETL jobs in the navigation pane.

 

  1. Choose the Notebook option under Create job.

 

  1. Under Options, select Start fresh.
  2. For IAM role, choose WAF-Firehose-Iceberg-Stack-GlueServiceRole-*.
  3. Choose Create notebook.

  1. Enter the following configuration command in the notebook to configure the Spark session with Apache Iceberg extensions. Be sure to update the configuration for sql.catalog.glue_catalog.warehouse to the S3 bucket created by the CloudFormation template.
%%configure
{
    "--conf": "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions --conf spark.sql.catalog.glue_catalog=org.apache.iceberg.spark.SparkCatalog --conf spark.sql.catalog.glue_catalog.warehouse=s3://<S3BucketName>/waflogdata --conf spark.sql.catalog.glue_catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog --conf spark.sql.catalog.glue_catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO",
    "--datalake-formats": "iceberg"
}

  1. Enter the following SQL in the AWS Glue notebook to create the Apache Iceberg table:
# Note: This code uses Glue version 5.0 (as of April 2024)
# Please check AWS Glue release notes for the latest version and update accordingly:
# https://docs.aws.amazon.com/glue/latest/dg/release-notes.html
# To update: Change the %glue_version parameter below to the latest version

%idle_timeout 2880
%glue_version 5.0
%worker_type G.1X
%number_of_workers 5

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.conf import SparkConf

sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)

spark.sql(""" CREATE TABLE glue_catalog.waf_logs_db.firehose_waf_logs(
  `timestamp` bigint,
  `formatVersion` int,
  `webaclId` string,
  `terminatingRuleId` string,
  `terminatingRuleType` string,
  `action` string,
  `terminatingRuleMatchDetails` array <
                                    struct <
                                        conditiontype: string,
                                        sensitivitylevel: string,
                                        location: string,
                                        matcheddata: array < string >
                                          >
                                     >,
  `httpSourceName` string,
  `httpSourceId` string,
  `ruleGroupList` array <
                      struct <
                          rulegroupid: string,
                          terminatingrule: struct <
                                              ruleid: string,
                                              action: string,
                                              rulematchdetails: array <
                                                                   struct <
                                                                       conditiontype: string,
                                                                       sensitivitylevel: string,
                                                                       location: string,
                                                                       matcheddata: array < string >
                                                                          >
                                                                    >
                                                >,
                          nonterminatingmatchingrules: array <
                                                              struct <
                                                                  ruleid: string,
                                                                  action: string,
                                                                  overriddenaction: string,
                                                                  rulematchdetails: array <
                                                                                       struct <
                                                                                           conditiontype: string,
                                                                                           sensitivitylevel: string,
                                                                                           location: string,
                                                                                           matcheddata: array < string >
                                                                                              >
                                                                   >,
                                                                  challengeresponse: struct <
                                                                            responsecode: string,
                                                                            solvetimestamp: string
                                                                              >,
                                                                  captcharesponse: struct <
                                                                            responsecode: string,
                                                                            solvetimestamp: string
                                                                              >
                                                                    >
                                                             >,
                          excludedrules: string
                            >
                       >,
  `rateBasedRuleList` array <
                         struct <
                             ratebasedruleid: string,
                             limitkey: string,
                             maxrateallowed: int
                               >
                          >,
  `nonTerminatingMatchingRules` array <
                                    struct <
                                        ruleid: string,
                                        action: string,
                                        rulematchdetails: array <
                                                             struct <
                                                                 conditiontype: string,
                                                                 sensitivitylevel: string,
                                                                 location: string,
                                                                 matcheddata: array < string >
                                                                    >
                                                             >,
                                        challengeresponse: struct <
                                                            responsecode: string,
                                                            solvetimestamp: string
                                                             >,
                                        captcharesponse: struct <
                                                            responsecode: string,
                                                            solvetimestamp: string
                                                             >
                                          >
                                     >,
  `requestHeadersInserted` array <
                                struct <
                                    name: string,
                                    value: string
                                      >
                                 >,
  `responseCodeSent` string,
  `httpRequest` struct <
                    clientip: string,
                    country: string,
                    headers: array <
                                struct <
                                    name: string,
                                    value: string
                                      >
                                 >,
                    uri: string,
                    args: string,
                    httpversion: string,
                    httpmethod: string,
                    requestid: string
                      >,
  `labels` array <
               struct <
                   name: string
                     >
                >,
  `CaptchaResponse` struct <
                        responsecode: string,
                        solvetimestamp: string,
                        failureReason: string
                          >,
  `ChallengeResponse` struct <
                        responsecode: string,
                        solvetimestamp: string,
                        failureReason: string
                        >,
  `ja3Fingerprint` string,
  `overSizeFields` string,
  `requestBodySize` int,
  `requestBodySizeInspectedByWAF` int
)
USING iceberg
TBLPROPERTIES ("format-version"="2")
""")
job.commit()

  1. Navigate to the Data Catalog and waf_logs_db database to confirm the table firehose_waf_logs is created.

Create a Firehose stream

Complete the following steps to create a Firehose stream:

  1. On the Data Firehose console, choose Create Firehose stream.

  1. Choose Direct PUT for Source and Apache Iceberg Tables for Destination.

  1. For Firehose stream name, enter aws-waf-logs-firehose-iceberg-1.
  1. In the Destination settings section, enable Inline parsing for routing information. Because we’re sending all records to one table, specify the destination database and table names:
    1. For Database expression, enter "waf_logs_db".
    2. For Table expression, enter "firehose_waf_logs".

Make sure to include double quotation marks to use the literal value for the database and table name. If you don’t use double quotation marks, Firehose assumes that this is a JSON query expression and will attempt to parse the expression when processing your stream and fail. Firehose can also route to different Apache Iceberg Tables based on the content of the data. For more information, refer to Route incoming records to different Iceberg Tables.

  1. For S3 backup bucket, enter the S3 bucket created by the CloudFormation template.
  2. For S3 backup bucket error output prefix, enter error/events-1/.

  1. Under Advanced settings, select Enable server-side encryption for source records in Firehose stream.

  1. For Existing IAM roles, choose the role that starts with WAF-Firehose-Iceberg-stack-FirehoseIAMRole-*, created by the CloudFormation template.
  2. Choose Create Firehose stream.

Configure AWS WAF logs to the Firehose stream

Complete the following steps to configure AWS WAF logs to the Firehose stream.

  1. On the AWS WAF console, choose Web ACLs in the navigation pane.

  1. Choose your web ACL.
  2. On the Logging and metrics tab, choose Enable.

  1. For Amazon Data Firehose stream, choose the stream aws-waf-logs-firehose-iceberg-1.
  2. Choose Save.
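If you prefer to script this step, the console actions above correspond to the WAFV2 `PutLoggingConfiguration` API. The sketch below only builds the request payload and checks the stream name; the helper name and the ARNs are hypothetical placeholders. AWS WAF expects the Firehose stream name to begin with `aws-waf-logs-`, which is why the stream created earlier uses that prefix.

```python
REQUIRED_PREFIX = "aws-waf-logs-"


def build_logging_configuration(web_acl_arn, firehose_stream_arn):
    """Build the LoggingConfiguration payload for wafv2 put_logging_configuration."""
    # The stream name is the last path segment of the Firehose ARN.
    stream_name = firehose_stream_arn.rsplit("/", 1)[-1]
    if not stream_name.startswith(REQUIRED_PREFIX):
        raise ValueError(
            f"Firehose stream name must start with {REQUIRED_PREFIX!r}: {stream_name!r}"
        )
    return {
        "ResourceArn": web_acl_arn,
        "LogDestinationConfigs": [firehose_stream_arn],
    }


# Placeholder ARNs; substitute the values from your own account.
config = build_logging_configuration(
    "arn:aws:wafv2:us-east-1:111122223333:regional/webacl/example/abc",
    "arn:aws:firehose:us-east-1:111122223333:deliverystream/aws-waf-logs-firehose-iceberg-1",
)
print(config)

# With credentials configured, the payload could then be sent with boto3:
#   boto3.client("wafv2").put_logging_configuration(LoggingConfiguration=config)
```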

Query and analyze the logs

You can query the data you’ve written to your Apache Iceberg tables using different processing engines, such as Apache Spark, Apache Flink, or Trino. In this example, we use Athena to query AWS WAF logs data stored in Apache Iceberg tables. Complete the following steps:

  1. On the Athena console, choose Settings in the top right corner.
  2. For Location of query result, enter the S3 bucket created by the CloudFormation template:

s3://<S3BucketName>/athena/

  1. Enter the AWS account ID for Expected bucket owner and choose Save.

  1. In the query editor, in Tables and views, choose the options menu next to firehose_waf_logs and choose Preview Table.

You should be able to see the AWS WAF logs in the Apache Iceberg tables by using Athena.

The following are some additional useful example queries:

  • Identify potential attack sources by analyzing blocked IP addresses:
-- Top 10 blocked IP addresses
SELECT httpRequest.clientip, COUNT(*) as block_count
FROM waf_logs_db.firehose_waf_logs
WHERE action = 'BLOCK'
GROUP BY httpRequest.clientip
ORDER BY block_count DESC
LIMIT 10;
  • Monitor attack patterns and trends over time:
-- Rate of blocked requests over time
SELECT DATE_TRUNC('hour', FROM_UNIXTIME(timestamp/1000)) as hour,
       COUNT(*) as request_count
FROM waf_logs_db.firehose_waf_logs
WHERE action = 'BLOCK'
GROUP BY DATE_TRUNC('hour', FROM_UNIXTIME(timestamp/1000))
ORDER BY hour;
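To sanity-check what the two queries compute, the same aggregations can be reproduced locally on a handful of records. This is a simplified sketch: the records are invented, flattened to only the fields the queries touch, and the helper names are hypothetical.

```python
from collections import Counter
from datetime import datetime, timezone

# Simplified, made-up records: timestamps are epoch milliseconds.
RECORDS = [
    {"timestamp": 1735689600000, "action": "BLOCK", "clientip": "192.0.2.10"},
    {"timestamp": 1735691400000, "action": "BLOCK", "clientip": "192.0.2.10"},
    {"timestamp": 1735693200000, "action": "ALLOW", "clientip": "198.51.100.7"},
    {"timestamp": 1735693260000, "action": "BLOCK", "clientip": "198.51.100.7"},
]


def top_blocked_ips(records, limit=10):
    """Mirror of the first query: count BLOCK actions per client IP."""
    counts = Counter(r["clientip"] for r in records if r["action"] == "BLOCK")
    return counts.most_common(limit)


def blocked_per_hour(records):
    """Mirror of the second query: count BLOCK actions per hour (UTC)."""
    counts = Counter()
    for r in records:
        if r["action"] != "BLOCK":
            continue
        # Divide by 1000 to convert milliseconds to seconds, as the SQL does.
        when = datetime.fromtimestamp(r["timestamp"] / 1000, tz=timezone.utc)
        counts[when.replace(minute=0, second=0, microsecond=0)] += 1
    return sorted(counts.items())


print(top_blocked_ips(RECORDS))
print(blocked_per_hour(RECORDS))
```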

Apache Iceberg table optimization

Although Firehose enables efficient streaming of AWS WAF logs into Apache Iceberg tables, the nature of streaming writes can result in many small files being created. This is because Firehose delivers data based on its buffering configuration, which can lead to suboptimal query performance. To address this, regular table optimization is recommended.

There are two recommended table optimization approaches:

  • Compaction – Data compaction merges small data files to reduce storage usage and improve read performance. Data files are merged and rewritten to remove obsolete data and consolidate fragmented data into larger, more efficient files.
  • Storage optimization – You can manage storage overhead by removing older, unnecessary snapshots and their associated underlying files. Additionally, this includes periodically deleting orphan files to maintain efficient storage utilization and optimal query performance.

These optimizations can be implemented using either the Data Catalog or Athena.

Table optimization using the Data Catalog

The Data Catalog provides automatic table optimization features. Within the table optimization feature, you can configure specific optimizers for compaction, snapshot retention, and orphan file deletion. A table optimization schedule can be managed and status can be monitored from the AWS Glue console.

Table optimization using Athena

Athena supports manual optimization through SQL commands. The OPTIMIZE command rewrites small files into larger files and applies file compaction:

OPTIMIZE waf_logs_db.firehose_waf_logs REWRITE DATA USING BIN_PACK 

The VACUUM command removes old snapshots and cleans up expired data files:

ALTER TABLE waf_logs_db.firehose_waf_logs SET TBLPROPERTIES (
  'vacuum_max_snapshot_age_seconds'='259200'
)
VACUUM waf_logs_db.firehose_waf_logs
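The value 259200 in the statement above is three days expressed in seconds. As a small sketch, the helper below derives the property value from a retention period so the unit conversion is explicit; the helper name is hypothetical and the three-day retention is simply the value used in this post.

```python
from datetime import timedelta


def vacuum_property_sql(table, max_age):
    """Build the ALTER TABLE statement that sets the snapshot age used by VACUUM."""
    seconds = int(max_age.total_seconds())
    return (
        f"ALTER TABLE {table} SET TBLPROPERTIES (\n"
        f"  'vacuum_max_snapshot_age_seconds'='{seconds}'\n"
        f")"
    )


print(int(timedelta(days=3).total_seconds()))
print(vacuum_property_sql("waf_logs_db.firehose_waf_logs", timedelta(days=3)))
```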

You can monitor the table’s optimization status using the following query:

SELECT * FROM "waf_logs_db"."firehose_waf_logs$files"

Clean up

To avoid future charges, complete the following steps:

  1. Empty the S3 bucket.
  2. Delete the CloudFormation stack.
  3. Delete the Firehose stream.
  4. Disable AWS WAF logging.

Conclusion

In this post, we demonstrated how to build an AWS WAF log analytics pipeline using Firehose to deliver AWS WAF logs to Apache Iceberg tables on Amazon S3. The solution handles large-scale AWS WAF log processing without requiring complex code or infrastructure management. Although this post focused on Apache Iceberg tables as the destination, Data Firehose also integrates with Amazon S3 Tables. To optimize your tables for querying, Amazon S3 Tables continuously performs automatic maintenance operations, such as compaction, snapshot management, and unreferenced file removal. These operations increase table performance by compacting smaller objects into fewer, larger files.

To get started with your own implementation, try the solution in your AWS account and explore the following resources for additional features and best practices:


About the Authors

Charishma Makineni is a Senior Technical Account Manager at AWS. She provides strategic technical guidance for Independent Software Vendors (ISVs) to build and optimize solutions on AWS. She specializes in Big Data and Analytics technologies, helping organizations optimize their data-driven initiatives on AWS.

Phaneendra Vuliyaragoli is a Product Management Lead for Amazon Data Firehose at AWS. In this role, Phaneendra leads the product and go-to-market strategy for Amazon Data Firehose.

Improving Security in Amazon WorkMail with MFA

Post Syndicated from Zip Zieper original https://aws.amazon.com/blogs/messaging-and-targeting/improving-security-in-amazon-workmail-with-mfa/

Securing your business email is more critical than ever in today’s digital workplace. To help you protect your users and data in Amazon WorkMail, we have introduced enhanced security features that give organizations more control and protection for their communication platforms. With the integration of AWS Identity and Access Management (IAM) Identity Center, WorkMail now offers robust multi-factor authentication and personal access token capabilities that can help prevent unauthorized access to user accounts and protect sensitive business communications. In this post, we’ll explore how these new security features can strengthen your organization’s email security strategy.

Introduction

Email remains a critical business communication channel, yet it’s also one of the most targeted by cybercriminals. When you’re managing an organization’s communications, a single compromised account can lead to significant financial losses, damage your reputation, and serve as a gateway for additional cyber attacks. Traditional username and password protection is no longer adequate against growing cyber threats.

With Amazon WorkMail, you now have powerful tools to enhance your email security. Our support for Multi-Factor Authentication (MFA) and Personal Access Token (PAT) capabilities provides administrators with essential additional security layers to prevent unauthorized account access.

This blog demonstrates WorkMail’s integration with the IAM Identity Center’s default identity store to enable these advanced security features. If you’re using third-party identity providers like Microsoft Entra ID or Okta Universal Directory, you’ll find dedicated integration guides in our documentation.

High-level Overview

Amazon WorkMail’s default authentication uses a unique username and password:

  1. Users of the WorkMail web-app sign in using their username and password.
  2. Users who access WorkMail from a desktop or mobile email application sign in using their username and password.

After you integrate WorkMail with IAM Identity Center, you can configure Amazon WorkMail with enhanced authentication, which works as follows:

  1. Users of the WorkMail web app log in through their unique AWS access portal using their username, password, and an MFA token. After a successful login, they select the WorkMail web app and are redirected to it.
  2. Users who access WorkMail from a desktop or mobile email app continue to sign in with their username; however, they must use a personal access token (PAT) instead of their WorkMail password.

WorkMail via MFA or PAT

More details about WorkMail authentication can be found in our documentation.

Prerequisites

  1. Administrator access to an AWS account
    1. You can evaluate the integration in this post for a limited period of time using an AWS Free Tier account (https://aws.amazon.com/free)
  2. Administrator access to an Amazon WorkMail organization
    1. Your WorkMail organization should have at least three users for testing
  3. Administrator access to AWS IAM Identity Center
  4. Your WorkMail organization and IAM Identity Center instance must be in the same AWS Region

High-level Configuration Steps

  1. Configure Identity Center (see our documentation for detailed information).
  2. Configure WorkMail to use Identity Center (see our documentation for detailed information).
  3. Assign IAM Identity Center users and groups to your WorkMail organization.
    1. Associate Amazon WorkMail users with IAM Identity Center users. This step is not necessary if your IdC and WorkMail usernames are exactly the same; see our documentation for details.
  4. Check the Authentication mode (see documentation) and the Personal access token configuration (see documentation).
    1. Allow both WorkMail Directory (no MFA or PAT) and Identity Center (requires MFA and PAT) modes for testing.
  5. Test your users’ access to WorkMail with MFA and PAT.
  6. Notify your WorkMail users of upcoming changes to login procedures.
  7. Switch the WorkMail Authentication mode to Identity Center only.
    1. When your users are ready for MFA and PAT, switch the authentication mode so that web-app logins require MFA and desktop and mobile email clients use PATs.
  8. Review additional WorkMail security guidance in AWS blogs and documentation to stay up to date.
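
Step 3 notes that the association step is unnecessary when IdC and WorkMail usernames match exactly. One way to check this before you start is to export both user lists and compare them. The following is a minimal sketch, not an official procedure: `missing_from` is a hypothetical helper, the file names are illustrative, and the organization and identity store IDs in the commented commands are placeholders you would need to supply.

```shell
#!/bin/sh
# Sketch: list names present in the first file but missing from the second.
# Each file holds one username per line. The comparison is case-sensitive,
# which matches the "exactly the same" requirement.
missing_from() {
  # -F fixed strings, -x whole-line match, -v invert:
  # keep the lines of $1 that do not appear in $2.
  grep -Fxv -f "$2" "$1"
}

# Hypothetical export steps (IDs are placeholders; not run here):
#   aws workmail list-users --organization-id <workmail-org-id> \
#     --query 'Users[].Name' --output text | tr '\t' '\n' > workmail_users.txt
#   aws identitystore list-users --identity-store-id <identity-store-id> \
#     --query 'Users[].UserName' --output text | tr '\t' '\n' > idc_users.txt
#   missing_from workmail_users.txt idc_users.txt
```

Any name this prints exists in WorkMail but has no identically named IdC user, so it would either need a matching IdC user or a manual association.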

Detailed Configuration Guidance

Configure AWS IAM Identity Center

    1. Open the IdC console in the same AWS Region as your WorkMail organization.
      1. If this is your first time accessing IAM Identity Center, the IdC console home page displays “Enable IAM Identity Center”. Click Enable.
      2. Unless you have a reason to use an account instance of IdC, choose Enable to use an organization instance.
      3. In a new browser window, open the WorkMail console in the same AWS Region as the IdC instance you just enabled.
      4. Arrange the IdC console window next to the WorkMail console window so you can easily copy and paste between the two services.
      5. Sync IdC and WorkMail users: in the IdC console’s left navigation rail, choose Users and click Add user.
        1. Create several IdC user accounts with the same usernames and email addresses as your WorkMail users.
          1. Using identical usernames in Amazon WorkMail and IAM Identity Center simplifies user synchronization and reduces authentication errors during integration. It also streamlines troubleshooting and user lifecycle management and keeps access control consistent across both services.
        2. Make sure “Send an email to this user with password setup instructions.” is selected.
          1. The user will receive an email with a link to set up a password and instructions to connect to the AWS access portal. The link will be valid for up to 7 days. You can grant this user permissions to accounts or applications (such as WorkMail) when they sign in to the AWS access portal.
      6. In the IdC console’s left navigation rail, choose Groups and create a new group called “workmail_users”.
        1. Add the IdC users created above to the workmail_users group.
  1. Configure WorkMail to use Identity Center
    1. In the WorkMail console’s left navigation rail, click the link for Identity Center.
    2. Click the down arrow for Multi-factor authentication setup guide.
    3. Click Step 1 – Enable identity center and click Enable.
  2. Assign IAM Identity Center users/groups to WorkMail organization
    1. Click the down arrow for Multi-factor authentication setup guide.
    2. Click Step 2 – Add and Assign users and click Next.
    3. In the Assigning users and groups panel (users and groups synced to your Identity Center directory are available to assign to your application):
      1. Click Get Started.
      2. Type workmail_users and select it from the drop-down list.
      3. Click Assign.
        1. You will see the message “Successfully assigned group workmail_users. Please continue with step 3 by associating users within this group with WorkMail users.”
  3. Authentication mode & Personal access token configuration
    1. The default Authentication mode is WorkMail directory and Identity center. Don’t change this yet.
      1. This allows WorkMail web-app users to continue to log in to the WorkMail client directly, without MFA.
    2. By default, Personal access token configuration is set to active, with a token lifespan of 365 days. Desktop and mobile email clients use PATs to log in to WorkMail.
      1. With the default authentication mode, desktop and mobile email clients can continue to log in to WorkMail with their username and password, without a PAT.
  4. Test WorkMail logins to verify that a few users can access their WorkMail accounts through both the WorkMail web-app and your organization’s unique AWS access portal URL.
    1. Open the Amazon WorkMail web application and log in as one of your test users.
      1. You should have an email invitation to join AWS IAM Identity Center.
      2. Accept the invitation.
      3. Create an IdC password.
      4. Use your username and the new password to log in to IdC.
      5. Register an MFA device.
      6. Click Next.
      7. You’ll be redirected to the AWS access portal.
        1. Enter your username and password.
        2. Provide your MFA token.
      8. Click the tile for Amazon WorkMail to log in to the WorkMail web-app.
    2. Desktop or mobile email software users need to create PATs to access WorkMail once the WorkMail administrator disables the WorkMail directory Authentication mode and logins go through the Identity Center AWS access portal URL only. Note: individual users retrieve PATs from within the WorkMail web client after logging in through the AWS access portal URL (with MFA).
      1. Open the AWS access portal URL.
        1. You can find the URL in the Identity Center console under Settings > AWS access portal URL.
      2. Log in with your username and password.
      3. Register an MFA device.
      4. You’ll be redirected to the AWS access portal.
        1. Enter your username and password.
        2. Provide your MFA token.
      5. Click the tile for Amazon WorkMail to log in to the WorkMail web-app.
      6. In the web-app, click the settings icon (the gear in the top right).
        1. In settings, click Personal access token and then Create token.
        2. Enter a token name (typically the device on which you’ll use this PAT) and select Create token.
        3. Copy the token value (this is the only time you can retrieve it).
        4. Open your desktop or mobile email software and enter your username and your PAT (the PAT replaces your existing user password).
  5. Notify your WorkMail users of upcoming changes to login procedures
    1. After you have tested the integration between Amazon WorkMail and IAM Identity Center with a few test users, prepare your WorkMail users for the increased login security. For example, you could send them an email that explains:
      1. The organization is adding a login security step to help protect their inboxes.
      2. They should expect an email from AWS IAM Identity Center with information about the upcoming implementation of MFA for web-app users and PATs for desktop and mobile client users.
      3. They should accept the invitation and create a new password for AWS IAM Identity Center.
      4. Once WorkMail MFA is enabled, all WorkMail web-app users will be required to log in with their username, password, and MFA.
      5. Once WorkMail PATs are enabled, all desktop and mobile email client users will need to log in to the WorkMail web client (with MFA) through the AWS access portal URL and create one PAT per software client (the same PAT cannot be used on both desktop and mobile). They must then update their desktop or mobile email software to use their username and PAT instead of their current password. The PAT becomes their email client application password, and their existing desktop or mobile email software passwords will no longer work.
      6. How to request support.
  6. Switch WorkMail Authentication Mode to Identity Center only
    1. Once you are satisfied that your WorkMail users have incorporated MFA and/or PATs into their WorkMail login routines, the WorkMail administrator should disable the WorkMail directory Authentication mode found in the WorkMail console under Organization > Identity Center.
  7. Review additional guidance to improve WorkMail security via AWS blogs and documentation.
    1. WorkMail Audit Logging Overview: https://aws.amazon.com/blogs/messaging-and-targeting/an-introduction-to-amazon-workmail-audit-logging/
    2. Custom Security Alarm Setup: https://aws.amazon.com/blogs/messaging-and-targeting/how-to-create-a-big-yellow-taxi-to-help-protect-amazon-workmail/
    3. For comprehensive security guidelines, refer to the Amazon WorkMail Security Documentation: https://docs.aws.amazon.com/workmail/latest/adminguide/security.html

Conclusion – Strengthen your Amazon WorkMail security with IAM Identity Center

By integrating Amazon WorkMail with IAM Identity Center you can more fully protect your organization’s email communications. This integration also centralizes user access management, allowing you to:

  • Manage WorkMail access alongside other AWS applications
  • Reduce security risks in a landscape of constant cyber threats
  • Simplify administrative tasks through a single dashboard

Take control of your email communications today with Amazon WorkMail

To keep your email environment secure, we recommend you:

  • Enable IAM Identity Center Integration
  • Connect your WorkMail organization to centralized access management
  • Configure WorkMail to require:
    • Multi-Factor Authentication (MFA) – Adds an extra layer of security for web-app users
    • Personal Access Tokens (PAT) – Add an extra layer of security for desktop and mobile client access
  • Visit the WorkMail Console (https://aws.amazon.com/workmail/) to begin configuration

Need guidance? Contact your AWS account team or check out our technical documentation.

Join the conversation and connect with other administrators and security professionals on the AWS re:Post community to share insights and learn best practices.

Migrate from Standard brokers to Express brokers in Amazon MSK using Amazon MSK Replicator

Post Syndicated from Subham Rakshit original https://aws.amazon.com/blogs/big-data/migrate-from-standard-brokers-to-express-brokers-in-amazon-msk-using-amazon-msk-replicator/

Amazon Managed Streaming for Apache Kafka (Amazon MSK) now offers a new broker type called Express brokers. It’s designed to deliver up to 3 times more throughput per broker, scale up to 20 times faster, and reduce recovery time by 90% compared to Standard brokers running Apache Kafka. Express brokers come preconfigured with Kafka best practices by default, support Kafka APIs, and provide the same low latency performance that Amazon MSK customers expect, so you can continue using existing client applications without any changes. Express brokers provide straightforward operations with hands-free storage management by offering unlimited storage without pre-provisioning, eliminating disk-related bottlenecks. To learn more about Express brokers, refer to Introducing Express brokers for Amazon MSK to deliver high throughput and faster scaling for your Kafka clusters.

Creating a new cluster with Express brokers is straightforward, as described in Amazon MSK Express brokers. However, if you have an existing MSK cluster, you need to migrate to a new Express-based cluster. In this post, we discuss how you should plan and perform the migration to Express brokers for your existing MSK workloads on Standard brokers. Express brokers offer a different user experience and a different shared responsibility boundary, so using them on an existing cluster is not possible. However, you can use Amazon MSK Replicator to copy all data and metadata from your existing MSK cluster to a new cluster consisting of Express brokers.

MSK Replicator offers a built-in replication capability to seamlessly replicate data from one cluster to another. It automatically scales the underlying resources, so you can replicate data on demand without having to monitor or scale capacity. MSK Replicator also replicates Kafka metadata, including topic configurations, access control lists (ACLs), and consumer group offsets.

In the following sections, we discuss how to use MSK Replicator to replicate the data from a Standard broker MSK cluster to an Express broker MSK cluster and the steps involved in migrating the client applications from the old cluster to the new cluster.

Planning your migration

Migrating from Standard brokers to Express brokers requires thorough planning and careful consideration of various factors. In this section, we discuss key aspects to address during the planning phase.

Assessing the source cluster’s infrastructure and needs

It’s crucial to evaluate the capacity and health of the current (source) cluster to make sure it can handle additional consumption during migration, because MSK Replicator will retrieve data from the source cluster. Key checks include:

    • CPU utilization – The combined CPU User and CPU System utilization per broker should remain below 60%.
    • Network throughput – The cluster-to-cluster replication process adds extra egress traffic, because it might need to replicate the existing data based on business requirements along with the incoming data. For instance, if the ingress volume is X GB/day and data is retained in the cluster for 2 days, replicating the data from the earliest offset would cause the total egress volume for replication to be 2X GB. The cluster must accommodate this increased egress volume.

Let’s take an example where your existing source cluster has an average data ingress of 100 MBps and a peak data ingress of 400 MBps, with retention of 48 hours. Let’s assume you have one consumer of the data you produce to your Kafka cluster, which means that your egress traffic will be the same as your ingress traffic. Based on this requirement, you can use the Amazon MSK sizing guide to calculate the broker capacity you need to safely handle this workload. In the spreadsheet, you will need to provide your average and maximum ingress/egress traffic in the cells, as shown in the following screenshot.

Because you need to replicate all the data produced in your Kafka cluster, consumption will be higher than the regular workload. Taking this into account, your overall egress traffic will be at least twice your ingress traffic.
However, when you run a replication tool, the resulting egress traffic will be higher than twice the ingress because you also need to replicate the existing data along with the new incoming data in the cluster. In the preceding example, you have an average ingress of 100 MBps and you retain data for 48 hours, which means that you have approximately 17.3 TB (100 MBps × 172,800 seconds) of existing data in your source cluster that needs to be copied over on top of the new data that’s coming through. Let’s further assume that your goal for the replicator is to catch up in 30 hours. In this case, your replicator needs to copy data at 260 MBps (100 MBps for ingress traffic + 160 MBps (17.3 TB/30 hours) for existing data) to catch up in 30 hours. The following figure illustrates this process.

Therefore, in the sizing guide’s egress cells, you need to add an additional 260 MBps to your average data out and peak data out to estimate the size of the cluster you should provision to complete the replication safely and on time.
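
The arithmetic behind the 260 MBps figure can be sketched in a few lines of shell. This is a rough estimate only: it assumes decimal units (1 TB = 1,000,000 MB), a constant ingress rate, and integer math. The function names are illustrative helpers, not part of any Kafka or AWS tool.

```shell
#!/bin/sh
# Rough sizing sketch for the example above (decimal units, integer math).
# Function names are hypothetical helpers, not part of Kafka or AWS tooling.

# Existing data retained in the source cluster, in MB.
# $1 = average ingress in MBps, $2 = retention in hours
backlog_mb() {
  echo $(( $1 * $2 * 3600 ))
}

# Throughput the replicator needs in order to catch up, in MBps.
# $1 = average ingress in MBps, $2 = retention in hours,
# $3 = catch-up window in hours
replication_rate_mbps() {
  backlog=$(backlog_mb "$1" "$2")
  # Rate needed to drain the backlog within the window, rounded up.
  drain=$(( (backlog + $3 * 3600 - 1) / ($3 * 3600) ))
  # New data keeps arriving at the ingress rate while the backlog drains.
  echo $(( $1 + drain ))
}

echo "Backlog: $(backlog_mb 100 48) MB"
echo "Required replication rate: $(replication_rate_mbps 100 48 30) MBps"
```

With the example’s inputs, this prints a backlog of 17,280,000 MB (about 17.3 TB) and a required rate of 260 MBps. Changing the third argument shows how a shorter or longer catch-up window changes the extra egress you need to plan for.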

Replication tools act as a consumer of the source cluster, so the replication consumer can use enough bandwidth to negatively impact existing clients’ produce and consume requests. To control its throughput, you can use a consumer-side Kafka quota in the source cluster. The replicator consumer is then throttled when it exceeds the limit, which safeguards the other consumers. However, if the quota is set too low, replication throughput will suffer and the replication might never finish. Based on the preceding example, set the replicator quota to at least 260 MBps; otherwise, the replication will not finish in 30 hours.
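
As a sketch of how such a quota might be expressed, the following script computes a per-broker byte rate and prints (but does not run) a `kafka-configs.sh` command. Several things here are assumptions rather than facts from this post: Kafka applies byte-rate quotas per broker, so the 260 MBps target is divided across an assumed three source brokers with evenly balanced partition leaders; MB is treated as 1,000,000 bytes; and the principal name is a placeholder, because the exact principal that MSK Replicator uses should be confirmed in the Amazon MSK documentation.

```shell
#!/bin/sh
# Dry-run sketch: compute a per-broker consumer quota and print (not run)
# the kafka-configs.sh command that would apply it.
# Assumptions: decimal MB, evenly balanced partition leaders, and a
# placeholder principal name that you must replace.

# $1 = cluster-wide replication target in MBps, $2 = number of source brokers
per_broker_quota_bytes() {
  # Byte-rate quotas are enforced per broker, so split the target and round up.
  echo $(( ($1 * 1000000 + $2 - 1) / $2 ))
}

# $1 = bootstrap address, $2 = quota in bytes/sec, $3 = replicator principal
print_quota_command() {
  printf '%s\n' "/home/ec2-user/kafka/bin/kafka-configs.sh --bootstrap-server $1 --alter --add-config 'consumer_byte_rate=$2' --entity-type users --entity-name '$3' --command-config /home/ec2-user/kafka/config/client_iam.properties"
}

bootstrap="${BS_SRC:-SOURCE_MSK_BOOTSTRAP_ADDRESS}"
quota=$(per_broker_quota_bytes 260 3)
print_quota_command "$bootstrap" "$quota" "REPLICATOR_PRINCIPAL_PLACEHOLDER"
```

Printing the command instead of running it lets you review the computed rate and substitute the real principal before changing anything on the source cluster.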

  • Volume throughput – Data replication might involve reading from the earliest offset (based on business requirement), impacting your primary storage volume, which in this case is Amazon Elastic Block Store (Amazon EBS). The VolumeReadBytes and VolumeWriteBytes metrics should be checked to make sure the source cluster volume throughput has additional bandwidth to handle any additional read from the disk. Depending on the cluster size and replication data volume, you should provision storage throughput in the cluster. With provisioned storage throughput, you can increase the Amazon EBS throughput up to 1000 MBps depending on the broker size. The maximum volume throughput can be specified depending on broker size and type, as mentioned in Manage storage throughput for Standard brokers in a Amazon MSK cluster. Based on the preceding example, the replicator will start reading from the disk and the volume throughput of 260 MBps will be shared across all the brokers. However, existing consumers can lag, which will cause reading from the disk, thereby increasing the storage read throughput. Also, there is storage write throughput due to incoming data from the producer. In this scenario, enabling provisioned storage throughput will increase the overall EBS volume throughput (read + write) so that existing producer and consumer performance doesn’t get impacted due to the replicator reading data from EBS volumes.
  • Balanced partitions – Make sure partitions are well-distributed across brokers, with no skewed leader partitions.

Depending on the assessment, you might need to vertically scale up or horizontally scale out the source cluster before migration.

Assessing the target cluster’s infrastructure and needs

Use the same sizing tool to estimate the size of your Express broker cluster. Typically, fewer Express brokers might be needed compared to Standard brokers for the same workload because depending on the instance size, Express brokers allow up to three times more ingress throughput.

Configuring Express Brokers

Express brokers employ opinionated and optimized Kafka configurations, so it’s important to differentiate between configurations that are read-only and those that are read/write during planning. Read/write broker-level configurations should be configured separately as a pre-migration step in the target cluster. Although MSK Replicator will replicate most topic-level configurations, certain topic-level configurations are always set to default values in an Express cluster: replication-factor, min.insync.replicas, and unclean.leader.election.enable. If the default values differ from the source cluster, these configurations will be overridden.

As part of the metadata, MSK Replicator also copies certain ACL types, as mentioned in Metadata replication. It doesn’t explicitly copy the write ACLs except the deny ones. Therefore, if you’re using SASL/SCRAM or mTLS authentication with ACLs rather than AWS Identity and Access Management (IAM) authentication, write ACLs need to be explicitly created in the target cluster.

Client connectivity to the target cluster

Deployment of the target cluster can occur within the same virtual private cloud (VPC) or a different one. Consider any changes to client connectivity, including updates to security groups and IAM policies, during the planning phase.

Migration strategy: All at once vs. wave

Two migration strategies can be adopted:

  • All at once – All topics are replicated to the target cluster simultaneously, and all clients are migrated at once. Although this approach simplifies the process, it generates significant egress traffic and involves risks to multiple clients if issues arise. However, if there is any failure, you can roll back by redirecting the clients to use the source cluster. It’s recommended to perform the cutover during non-business hours and communicate with stakeholders beforehand.
  • Wave – Migration is broken into phases, moving a subset of clients (based on business requirements) in each wave. After each phase, the target cluster’s performance can be evaluated before proceeding. This reduces risks and builds confidence in the migration but requires meticulous planning, especially for large clusters with many microservices.

Each strategy has its pros and cons. Choose the one that aligns best with your business needs. For insights, refer to Goldman Sachs’ migration strategy to move from on-premises Kafka to Amazon MSK.

Cutover plan

Although MSK Replicator facilitates seamless data replication with minimal downtime, it’s essential to devise a clear cutover plan. This includes coordinating with stakeholders, stopping producers and consumers in the source cluster, and restarting them in the target cluster. If a failure occurs, you can roll back by redirecting the clients to use the source cluster.

Schema registry

When migrating from a Standard broker to an Express broker cluster, schema registry considerations remain unaffected. Clients can continue using existing schemas for both producing and consuming data with Amazon MSK.

Solution overview

In this setup, two Amazon MSK provisioned clusters are deployed: one with Standard brokers (source) and the other with Express brokers (target). Both clusters are located in the same AWS Region and VPC, with IAM authentication enabled. MSK Replicator is used to replicate topics, data, and configurations from the source cluster to the target cluster. The replicator is configured to maintain identical topic names across both clusters, providing seamless replication without requiring client-side changes.

During the first phase, the source MSK cluster handles client requests. Producers write to the clickstream topic in the source cluster, and a consumer group with the group ID clickstream-consumer reads from the same topic. The following diagram illustrates this architecture.

When data replication to the target MSK cluster is complete, we need to evaluate the health of the target cluster. After confirming the cluster is healthy, we need to migrate the clients in a controlled manner. First, we need to stop the producers, reconfigure them to write to the target cluster, and then restart them. Then, we need to stop the consumers after they have processed all remaining records in the source cluster, reconfigure them to read from the target cluster, and restart them. The following diagram illustrates the new architecture.

After verifying that all clients are functioning correctly with the target cluster using Express brokers, we can safely decommission the source MSK cluster with Standard brokers and the MSK Replicator.

Deployment Steps

In this section, we discuss the step-by-step process to replicate data from an MSK Standard broker cluster to an Express broker cluster using MSK Replicator, along with the client migration strategy. For the purposes of this post, we use the “all at once” migration strategy.

Provision the MSK cluster

Download the AWS CloudFormation template to provision the MSK clusters. Deploy it in us-east-1 with the stack name migration.

This will create the VPC, subnets, and two Amazon MSK provisioned clusters: one with Standard brokers (source) and another with Express brokers (target) within the VPC configured with IAM authentication. It will also create a Kafka client Amazon Elastic Compute Cloud (Amazon EC2) instance from which we can use the Kafka command line to create and view Kafka topics and to produce and consume messages to and from the topic.

Configure the MSK client

On the Amazon EC2 console, connect to the EC2 instance named migration-KafkaClientInstance1 using Session Manager, a capability of AWS Systems Manager.

After you log in, you need to configure the source MSK cluster bootstrap address to create a topic and publish data to the cluster. You can get the bootstrap address for IAM authentication from the details page for the MSK cluster (migration-standard-broker-src-cluster) on the Amazon MSK console, under View Client Information. You also need to update the producer.properties and consumer.properties files to reflect the bootstrap address of the standard broker cluster.

sudo su - ec2-user

export BS_SRC=<<SOURCE_MSK_BOOTSTRAP_ADDRESS>>
sed -i "s/BOOTSTRAP_SERVERS_CONFIG=/BOOTSTRAP_SERVERS_CONFIG=${BS_SRC}/g" producer.properties 
sed -i "s/bootstrap.servers=/bootstrap.servers=${BS_SRC}/g" consumer.properties

Create a topic

Create a clickstream topic using the following commands:

/home/ec2-user/kafka/bin/kafka-topics.sh --bootstrap-server=$BS_SRC \
--create --replication-factor 3 --partitions 3 \
--topic clickstream \
--command-config=/home/ec2-user/kafka/config/client_iam.properties

Produce and consume messages to and from the topic

Run the clickstream producer to generate events in the clickstream topic:

cd /home/ec2-user/clickstream-producer-for-apache-kafka/

java -jar target/KafkaClickstreamClient-1.0-SNAPSHOT.jar -t clickstream \
-pfp /home/ec2-user/producer.properties -nt 8 -rf 3600 -iam \
-gsr -gsrr <<REGION>> -grn default-registry -gar

Open another Session Manager instance and from that shell, run the clickstream consumer to consume from the topic:

cd /home/ec2-user/clickstream-consumer-for-apache-kafka/

java -jar target/KafkaClickstreamConsumer-1.0-SNAPSHOT.jar -t clickstream \
-pfp /home/ec2-user/consumer.properties -nt 3 -rf 3600 -iam \
-gsr -gsrr <<REGION>> -grn default-registry

Keep the producer and consumer running. If not interrupted, the producer and consumer will run for 60 minutes before they exit. The -rf parameter controls how long, in seconds, the producer and consumer run.

Create an MSK replicator

To create an MSK replicator, complete the following steps:

  1. On the Amazon MSK console, choose Replicators in the navigation pane.
  2. Choose Create replicator.
  3. In the Replicator details section, enter a name and optional description.

  4. In the Source cluster section, provide the following information:
    1. For Cluster region, choose us-east-1.
    2. For MSK cluster, enter the MSK cluster Amazon Resource Name (ARN) for the Standard broker.

After the source cluster is selected, the console automatically selects the subnets and the security group associated with the source cluster. You can also select additional security groups.

Make sure that the security groups have outbound rules to allow traffic to your cluster’s security groups. Also make sure that your cluster’s security groups have inbound rules that accept traffic from the replicator security groups provided here.

  5. In the Target cluster section, for MSK cluster, enter the MSK cluster ARN for the Express broker.

After the target cluster is selected, the console automatically selects the subnets and the security group associated with the target cluster. You can also select additional security groups.

Now let’s provide the replicator settings.

  6. In the Replicator settings section, provide the following information:
    1. For this example, we keep Topics to replicate at its default value, which replicates all topics from the source cluster to the target cluster.
    2. For Replicator starting position, we configure it to replicate from the earliest offset, so that we can get all the events from the start of the source topics.
    3. To keep topic names in the target cluster identical to those in the source cluster, we select Keep the same topic names for Copy settings. This makes sure that the MSK clients don’t need to add a prefix to the topic names.
    4. For this example, we keep the Consumer Group Replication setting as default (make sure it’s enabled so that redirected clients can resume processing data from the last processed offset).
    5. We set Target Compression type as None.

The Amazon MSK console will automatically create the required IAM policies. If you’re deploying using the AWS Command Line Interface (AWS CLI), SDK, or AWS CloudFormation, you have to create the IAM policy and use it as per your deployment process.

  7. Choose Create to create the replicator.

The process will take around 15–20 minutes to deploy the replicator. When the MSK replicator is running, this will be reflected in the status.

Monitor replication

When the MSK replicator is up and running, monitor the MessageLag metric. This metric indicates how many messages are yet to be replicated from the source MSK cluster to the target MSK cluster. The MessageLag metric should come down to 0.
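
If you prefer to check the metric from a script rather than the console, the following sketch shows one possible shape. The `caught_up` helper is hypothetical and only evaluates datapoints passed on standard input. The `fetch_message_lag` function uses the AWS CLI `get-metric-statistics` command, but the `AWS/Kafka` namespace and the dimension you pass in are assumptions here; you can confirm both with `aws cloudwatch list-metrics --metric-name MessageLag` before relying on them.

```shell
#!/bin/sh
# Sketch: decide whether replication has caught up from MessageLag datapoints.

# Reads whitespace-separated values on stdin. Succeeds only if there is at
# least one datapoint and every datapoint is a literal zero (0 or 0.0).
# Non-numeric output such as "None" is treated as not caught up.
caught_up() {
  awk '{ for (i = 1; i <= NF; i++) { n++; if ($i !~ /^0+(\.0+)?$/) bad = 1 } }
       END { exit (n > 0 && !bad) ? 0 : 1 }'
}

# Fetch recent MessageLag maxima. Namespace and dimension are assumptions;
# verify them with `aws cloudwatch list-metrics` first.
# $1 = dimension string, for example "Name=<dimension-name>,Value=<replicator-name>"
# $2 = start time, $3 = end time (ISO 8601)
fetch_message_lag() {
  aws cloudwatch get-metric-statistics \
    --namespace AWS/Kafka --metric-name MessageLag \
    --dimensions "$1" \
    --start-time "$2" --end-time "$3" \
    --period 60 --statistics Maximum \
    --query 'Datapoints[].Maximum' --output text
}

# Example (not run here):
#   fetch_message_lag "<dimension>" "<start>" "<end>" | caught_up \
#     && echo "Replication has caught up"
```

Requiring at least one datapoint avoids treating an empty or failed query as a lag of zero.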

Migrate clients from source to target cluster

When the MessageLag metric reaches 0, it indicates that all messages have been replicated from the source MSK cluster to the target MSK cluster. At this stage, you can cut over client applications from the source to the target cluster. Before initiating this step, confirm the health of the target cluster by reviewing the Amazon MSK metrics in Amazon CloudWatch and making sure that the client applications are functioning properly. Then complete the following steps:

  1. Stop the producers writing data to the source (old) cluster with Standard brokers and reconfigure them to write to the target (new) cluster with Express brokers.
  2. Before migrating the consumers, make sure that the MaxOffsetLag metric for the consumers has dropped to 0, confirming that they have processed all existing data in the source cluster.
  3. When this condition is met, stop the consumers and reconfigure them to read from the target cluster.

The offset lag happens if the consumer is consuming slower than the rate the producer is producing data. The flat line in the following metric visualization shows that the producer has stopped producing to the source cluster while the consumer attached to it continues to consume the existing data and eventually consumes all the data, therefore the metric goes to 0.

  1. Now you can update the bootstrap address in properties and consumer.properties to point to the target Express based MSK cluster. You can get the bootstrap address for IAM authentication from the MSK cluster (migration-express-broker-dest-cluster) on the Amazon MSK console under View Client Information.
export BS_TGT=<<TARGET_MSK_BOOTSTRAP_ADDRESS>>
sed -i "s/BOOTSTRAP_SERVERS_CONFIG=.*/BOOTSTRAP_SERVERS_CONFIG=${BS_TGT}/g" producer.properties
sed -i "s/bootstrap.servers=.*/bootstrap.servers=${BS_TGT}/g" consumer.properties

  1. Run the clickstream producer to generate events in the clickstream topic:
cd /home/ec2-user/clickstream-producer-for-apache-kafka/

java -jar target/KafkaClickstreamClient-1.0-SNAPSHOT.jar -t clickstream \
-pfp /home/ec2-user/producer.properties -nt 8 -rf 60 -iam \
-gsr -gsrr <<REGION>> -grn default-registry -gar

  1. Open another Session Manager session and, from that shell, run the clickstream consumer to consume from the topic:
cd /home/ec2-user/clickstream-consumer-for-apache-kafka/

java -jar target/KafkaClickstreamConsumer-1.0-SNAPSHOT.jar -t clickstream \
-pfp /home/ec2-user/consumer.properties -nt 3 -rf 60 -iam \
-gsr -gsrr <<REGION>> -grn default-registry

The producers and consumers are now producing to and consuming from the target Express based MSK cluster. They run for 60 seconds before they exit.

The following screenshot shows the producer writing messages to the new Express based MSK cluster for 60 seconds.

Migrate stateful applications

Stateful applications such as Kafka Streams, KSQL, Apache Spark, and Apache Flink use their own checkpointing mechanisms to store consumer offsets instead of relying on Kafka’s consumer group offset mechanism. When migrating topics from a source cluster to a target cluster, the Kafka offsets in the source will differ from those in the target. As a result, migrating a stateful application along with its state requires careful consideration, because the existing offsets are incompatible with the target cluster’s offsets. Before migrating stateful applications, it is crucial to stop producers and make sure that consumer applications have processed all data from the source MSK cluster.

Migrate Kafka Streams and KSQL applications

Kafka Streams and KSQL store consumer offsets in internal changelog topics. It is advisable not to replicate these internal changelog topics to the target MSK cluster. Instead, the Kafka Streams application should be configured to start from the earliest offset of the source topics in the target cluster. This allows the state to be rebuilt. However, this method results in duplicate processing, because all the data in the topic is reprocessed. Therefore, the target destination (such as a database) must be idempotent to handle these duplicates effectively.
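
The idempotency requirement can be sketched as follows. This example uses an in-memory SQLite table as a stand-in for the target database and assumes every record carries a stable, unique event ID; the table, column, and function names are hypothetical. Writing the same event twice leaves a single row, so reprocessing from the earliest offset does not create duplicates.

```python
import sqlite3


def open_sink() -> sqlite3.Connection:
    """Create an in-memory table keyed on the event ID (illustrative schema)."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE clicks ("
        "event_id TEXT PRIMARY KEY, user_id TEXT NOT NULL, page TEXT NOT NULL)"
    )
    return conn


def write_event(conn: sqlite3.Connection, event_id: str,
                user_id: str, page: str) -> None:
    """Upsert: a replayed event overwrites its earlier copy instead of adding a row."""
    conn.execute(
        "INSERT OR REPLACE INTO clicks (event_id, user_id, page) VALUES (?, ?, ?)",
        (event_id, user_id, page),
    )
    conn.commit()


def row_count(conn: sqlite3.Connection) -> int:
    """Number of distinct events stored."""
    return conn.execute("SELECT COUNT(*) FROM clicks").fetchone()[0]
```

The same idea applies to any sink that supports keyed writes; the key must come from the record itself rather than from the Kafka offset, because offsets differ between the source and target clusters.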

Express brokers manage segment.bytes to optimize performance and don’t allow you to configure it. Kafka Streams sets this property when it creates its internal topics, so the internal topics need to be manually created before the Kafka Streams application is migrated to the new Express based cluster. For more information, refer to Using Kafka Streams with MSK Express brokers and MSK Serverless.

Migrate Spark applications

Spark stores offsets in its checkpoint location, which should be a file system compatible with HDFS, such as Amazon Simple Storage Service (Amazon S3). After migrating the Spark application to the target MSK cluster, you should remove the checkpoint location, causing the Spark application to lose its state. To rebuild the state, configure the Spark application to start processing from the earliest offset of the source topics in the target cluster. This will lead to re-processing all the data from the start of the topic and therefore will generate duplicate data. Consequently, the target destination (such as a database) must be idempotent to effectively handle these duplicates.
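
A minimal sketch of the reader configuration, assuming a Structured Streaming job that uses Spark's Kafka source. The helper name is hypothetical; the option keys are the ones the Kafka source accepts. Authentication options for the MSK cluster are omitted and would need to be added.

```python
from typing import Dict


def kafka_source_options(bootstrap_servers: str, topic: str) -> Dict[str, str]:
    """Options for Spark's Kafka source that restart from the earliest offset."""
    return {
        "kafka.bootstrap.servers": bootstrap_servers,
        "subscribe": topic,
        # Only honored when the query starts without an existing checkpoint,
        # which is why the old checkpoint location is removed first.
        "startingOffsets": "earliest",
    }


# Intended use inside a PySpark job (not executed here):
#   options = kafka_source_options("<<TARGET_MSK_BOOTSTRAP_ADDRESS>>", "clickstream")
#   stream = spark.readStream.format("kafka").options(**options).load()
# The writer would then point its checkpointLocation option at a new, empty path.
```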

Migrate Flink applications

Flink stores consumer offsets within the state of its Kafka source operator. When checkpoints are completed, the Kafka source commits the current consuming offset to provide consistency between Flink’s checkpoint state and the offsets committed on Kafka brokers. Unlike other systems, Flink applications don’t rely on the __consumer_offsets topic to track offsets; instead, they use the offsets stored in Flink’s state.

During Flink application migration, one approach is to start the application without a Savepoint. This approach discards the entire state and reverts to reading from the last committed offset of the consumer group. However, this prevents the application from accurately rebuilding the state of downstream Flink operators, leading to discrepancies in computation results. To address this, you can either avoid replicating the consumer group of the Flink application or assign a new consumer group to the application when restarting it in the target cluster. Additionally, configure the application to start reading from the earliest offset of the source topics. This enables re-processing all data from the source topics and rebuilding the state. However, this method will result in duplicate data, so the target system (such as a database) must be idempotent to handle these duplicates effectively.

Alternatively, you can reset the state of the Kafka source operator. Flink uses operator IDs (UIDs) to map the state to specific operators. When restarting the application from a Savepoint, Flink matches the state to operators based on their assigned IDs. It is recommended to assign a unique ID to each operator to enable seamless state restoration from Savepoints. To reset the state of the Kafka source operator, change its operator ID. Passing the operator ID as a parameter in a configuration file can simplify this process. Restart the Flink application with parameter --allowNonRestoredState (if you are running self-managed Flink). This will reset only the state of the Kafka source operator, leaving other operator states unaffected. As a result, the Kafka source operator resumes from the last committed offset of the consumer group, avoiding full reprocessing and state rebuilding. Although this might still produce some duplicates in the output, it results in no data loss. This approach is applicable only when using the DataStream API to build Flink applications.
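
One way to pass the operator ID as a parameter is to derive it from configuration, so that resetting the Kafka source state only requires a configuration change. The sketch below is an assumption about how this could be organized; the configuration keys and the naming scheme are hypothetical.

```python
from typing import Mapping


def kafka_source_uid(config: Mapping[str, object]) -> str:
    """Build the Kafka source operator ID from configuration.

    Incrementing the (hypothetical) generation value yields a new ID, so a
    restart with --allowNonRestoredState drops only this operator's state.
    """
    base = str(config.get("kafka.source.uid.base", "kafka-source"))
    generation = int(config.get("kafka.source.uid.generation", 0))
    return f"{base}-v{generation}"


# The returned string is what the job would pass to the uid(...) call on the
# Kafka source operator; the IDs of all other operators stay unchanged.
```

Keeping the other operator IDs fixed is what lets Flink restore their state from the Savepoint while the source starts fresh.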

Conclusion

Migrating from a Standard broker MSK cluster to an Express broker MSK cluster using MSK Replicator provides a seamless, efficient transition with minimal downtime. By following the steps and strategies discussed in this post, you can take advantage of the high-performance, cost-effective benefits of Express brokers while maintaining data consistency and application uptime.

Ready to optimize your Kafka infrastructure? Start planning your migration to Amazon MSK Express brokers today and experience improved scalability, speed, and reliability. For more details, refer to the Amazon MSK Developer Guide.


About the Author

Subham Rakshit is a Senior Streaming Solutions Architect for Analytics at AWS based in the UK. He works with customers to design and build streaming architectures so they can get value from analyzing their streaming data. His two little daughters keep him occupied most of the time outside work, and he loves solving jigsaw puzzles with them. Connect with him on LinkedIn.

Foundational blocks of Amazon SageMaker Unified Studio: An admin’s guide to implement unified access to all your data, analytics, and AI

Post Syndicated from Lakshmi Nair original https://aws.amazon.com/blogs/big-data/foundational-blocks-of-amazon-sagemaker-unified-studio-an-admins-guide-to-implement-unified-access-to-all-your-data-analytics-and-ai/

Amazon SageMaker Unified Studio (preview) provides a unified experience for using data, analytics, and AI capabilities. You can use familiar AWS services for model development, generative AI, data processing, and analytics—all within a single, governed environment. Users can now build, deploy, and execute end-to-end workflows from a single interface. SageMaker Unified Studio is built on the foundations of Amazon DataZone, where it uses domains to categorize and structure the data assets, while offering project-based collaboration features that allow teams to securely share artifacts and work together across various compute services. This experience allows multiple personas to seamlessly collaborate, while operating under appropriate access controls and governance policies.

In this post, we focus on the admin persona and deep dive into the foundational building blocks while implementing the self-service access to all your data.

Conceptual framework

SageMaker Unified Studio offers an integrated development experience organized into three distinct planes, each serving different personas and purposes within the development lifecycle. This architecture enables seamless collaboration while maintaining clear boundaries of responsibility.

As shown in the following figure, each plane represents a distinct layer of functionality that works in harmony with the others to create a complete data and machine learning (ML) solution.

foundational planes

The planes are as follows:

  • Infrastructure plane – The infrastructure plane forms the foundation of SageMaker Unified Studio. Here administrators and domain owners of the organization provision the underlying infrastructure and define rules for users of the data factory plane to deploy the compute resources for data and ML operations in self-service mode. They can also decide to onboard existing resources or pre-create them. They can set up access controls and permissions to govern how resources are allocated to different teams and projects. This layer makes sure that all necessary computational resources are available and properly governed for downstream computation.
  • Data factory plane – The data factory plane functions like a sophisticated vending machine for compute resources, where data scientists and ML engineers can select and utilize preconfigured compute resources or deploy new ones. The data product developers, data engineers, and data scientists can create collaboration spaces and build data products by consuming infrastructure resources, with all the underlying complexity abstracted away.
  • Product experience plane – At the outermost layer, the product experience plane serves as a discovery and collaboration hub where business units (data producers and data consumers) can explore available data products from the asset catalog. This plane drives users to engage in data-driven conversations with knowledge and insights shared across the organization. Through the product experience plane, data product owners can use automated workflows to capture data lineage and data quality metrics and oversee access controls. They can track how their data products are being used and continuously improve the value proposition of their data assets.

In this post, we focus on the infrastructure plane deployment steps from an administrator’s perspective. We outline the key responsibilities and actions required, and show how to configure and organize your assets under specific business units and teams and how to authorize policies during the initial setup phase.

Roles and responsibilities of the domain owner (admin) for the infrastructure plane

As shown in the following figure, the infrastructure plane revolves around three pivotal operational paradigms: onboard, organize, and authorize.

The details of the three essential functions in the foundational layer are as follows:

  • Onboard – The domain owner establishes a foundational environment by creating a domain, which represents an organizational entity that connects your assets, users, resources, and code repository configs. They can onboard the users who have authorization to access the self-serve unified studio. The self-serve unified studio is a browser-based web application where you can analyze, discover, catalog, govern, and share data in a self-serve manner. The admin can enable the necessary blueprints and create project profiles to set up the underlying data infrastructure. In a multi-account (Mesh) scenario, the admin can also onboard the business units by associating the AWS accounts.
  • Organize – Here the domain owner creates hierarchies to organize and isolate projects within individual business units. The method of creating hierarchical representation of business units or team-level organization is through domain units. This makes sure that each business unit takes ownership of their assets. The admin can also delegate ownership within these business units.
  • Authorize – The admin or owners of individual business units or line of business (domain unit owners) can manage user policies—project-specific policies that dictate certain actions these principals can perform under a domain unit.

Now that we have discussed the core functions, let’s delve into the workflow that brings these concepts together.

Process workflow (infrastructure plane)

In the following figure, we break down the roles and responsibilities, from domain owners to domain unit administrators, as a sequence of operations for infrastructure deployment and management.

process workflow

The workflow consists of the following steps:

  1. The root domain owner (admin) creates a SageMaker Unified Studio domain from the console. After the domain is created, you get a SageMaker Unified Studio URL—a browser-based web application that can authenticate you with your AWS Identity and Access Management (IAM) user credentials or with credentials from your identity provider (IdP) through AWS IAM Identity Center or with your SAML credentials.
  2. As part of the onboarding process, the admin onboards single sign-on (SSO) users, SSO groups, and IAM users who are authorized to log in to SageMaker Unified Studio. IAM roles can be onboarded on the domain as well, but can be used for programmatic access only. During the quick setup deployment of the domain, default project profile templates are created. A project profile is a collection of blueprints that holds configurations of AWS tools and services. You can create the following project profiles:
    1. Generative AI application development – Provides you with the tooling capabilities to build generative AI applications using Amazon Bedrock foundation models (FMs) and tools.
    2. SQL analytics – Provides you with a SQL editor to query the data in Amazon SageMaker Lakehouse, Amazon Redshift, and Amazon Athena.
    3. Data analytics and AI-ML model development – Provides you tools to build and orchestrate ML and generative AI models powered by AWS Glue, Athena, Amazon Managed Workflows for Apache Airflow (Amazon MWAA), Amazon SageMaker AI, and SageMaker Lakehouse.
    4. Custom project profile – Provides capabilities to build custom templates that can bundle multiple blueprints with varied tooling capabilities to suit your business needs.

Admins can also authorize specific users and groups to use project profile templates, which lets them control resource deployment based on user personas. By default, all users are authorized to use the default project profiles. However, the admin can change this to limit access to certain project profiles to certain users and groups.

The quick setup also establishes a default Git connection to AWS CodeCommit for users to manage their code repository. However, you also have the option to create and enable new Git connections to GitHub, GitHub Enterprise Server, GitLab, and GitLab self-managed. The Free Tier release of Amazon Q is enabled by default for all users of the SageMaker Unified Studio domain. Amazon Q Developer Pro can be configured if IAM Identity Center is configured for users of the domain.

Finally, as part of the initial setup, the admin provides access to Amazon Bedrock serverless models.

In a multi-account scenario, the central admin associates AWS accounts, and the associated account admins accept the association and enable the blueprints for the project profiles that the central admin would create. Refer to the appendix at the end of this post for more details.

  3. To organize the data assets within the organization, the admin logs in to the SageMaker Unified Studio URL and creates domain units aligned with the business divisions.
  4. Each domain unit receives delegated ownership, enabling autonomous management of assets within their designated scope. This domain-based isolation provides clear boundaries while allowing unit owners to independently govern their assets and enforce relevant policies.

Steps 3 and 4 are optional as part of the quick deployment setup. Users can directly log in to SageMaker Unified Studio to build data products for their business use case if domain units are not an immediate requirement. If no domain units are created, all users and groups fall under the root domain and authorization policies are applied on the root domain.

Behind the scenes

While users interact with a streamlined project creation interface in SageMaker Unified Studio, a sophisticated orchestration of components operates beneath the surface. This abstraction allows the admin to deploy infrastructure through simple selections while the system handles resource provisioning automatically. Let’s examine the underlying process behind the scenes, as illustrated in the following figure.

conceptual diagram of blueprints

This workflow consists of the following steps:

  1. Administrators enable the blueprints containing the AWS CloudFormation templates that have information on how to create and set up the underlying data infrastructure. These blueprints are automatically enabled during the quick setup deployment.
  2. Project profiles bundle these blueprint configurations into templates. These templates determine which infrastructure components deploy when a project is created.
  3. When users select a project profile within SageMaker Unified Studio, the system automatically triggers the relevant CloudFormation stack and deploys the necessary infrastructure resources in the form of environments. Environments are the actual data infrastructure behind a project.

In a multi-account scenario, the associated account admin enables the blueprints. However, the project profile creation happens at the root domain account. The project profile template will include the associated account details and the linked blueprints from the associated account. Refer to the appendix at the end of this post for more details.

Now that we have understood the functional building blocks of SageMaker Unified Studio, let’s proceed with the deployment walkthrough. We will create a domain using the quick setup deployment for single account. Refer to the appendix for multi-account deployment steps.

Prerequisites

You will need to complete the following prerequisites before you can follow the instructions in the next section:

  1. Sign up for an AWS account.
  2. Create a user with administrative access.
  3. Enable IAM Identity Center in the same AWS Region where you want to create your SageMaker Unified Studio domain. Confirm the Regions in which SageMaker Unified Studio is currently available. Set up your IdP and synchronize identities and groups with IAM Identity Center. For more information, refer to IAM Identity Center Identity source tutorials.
  4. To use Amazon Bedrock FMs, grant access to base models.

Set up domain

Complete the following steps to create a new SageMaker Unified Studio domain:

  1. Sign in to the SageMaker console in the Region in which IAM Identity Center is enabled.
  2. Choose Create a Unified Studio domain.

create domain

  1. Select Quick setup (recommended for exploration).
  2. Choose Create VPC (you can also use your own VPC but to simplify the cleanup, we opted to use a new VPC).

create vpc

This will open a new tab to deploy the CloudFormation stack to create the VPC and the necessary private and public subnets.

  1. For Stack name, enter a unique name for the stack (if the default name already exists).
  2. Keep the parameter for useVpcEndpoints as false.
  3. Choose Create stack.

create stack

  1. After the stack is created, go to the domain creation page and refresh the page, as shown in the following screenshot.

refresh

  1. For Name, enter a unique name for the domain.
  2. Keep the default selections for Domain Execution role, Domain Service role, Provisioning role, and Manage Access role.
  3. The configuration automatically selects the VPC and private subnets.

domain roles

service roles

  1. Keep the default selection for Model provisioning role and Model consumption role.
  2. Choose Continue.

prov roles

  1. Provide the email address of the SSO user that exists in IAM Identity Center.

The SSO user selected here is used as the administrator in SageMaker Unified Studio. If the account doesn’t have IAM Identity Center set up, the setup creates an IAM Identity Center account instance, as long as the account is permitted to do so. An SSO or IAM user is required so that someone can log in to the studio after the domain is created.

  1. Choose Create domain.

create IdC

  1. After the domain is created, a dialog box pops up. You can close the dialog box to set up authorization policies and onboard users.

dialog box

On the domain detail page, the Amazon SageMaker Unified Studio URL is listed. You can authenticate with your IAM user credentials or with credentials from your IdP through IAM Identity Center or with your SAML credentials. To authorize users to log in to the URL, the administrator must onboard the users to the domain. We cover this in the next steps.

Unified Studio URL

Onboard users and associated accounts

Complete the following steps:

  1. To onboard users, go to the User management tab and choose Add.
  2. On the Add menu, choose either Add SSO users and groups or Add IAM users.

You can also add IAM roles for the purpose of managing the domain programmatically. However, you can’t use IAM roles to log in to the SageMaker Unified Studio URL. After you add the users, they will appear with the status Assigned. The status changes to Activated only when the user logs in to the SageMaker Unified Studio URL.

onboard users

  1. If you want to onboard multiple AWS accounts to your domain account, go to the Account associations tab and choose Request association.

This enables domain users to publish and consume data from these AWS accounts.

associate accounts

For a multi-account setup, by sending an association request to another AWS account, you share the root domain with the other AWS account through AWS Resource Access Manager (AWS RAM). The admin of the associated account accepts the invitation. To access the compute resources of the associated accounts from SageMaker Unified Studio, the admin of the associated account must enable the necessary blueprints. Refer to the appendix to understand the cross-account deployment steps.

Project profiles and authorizing users

For the quick setup deployment, when you navigate to the Blueprints tab, you will notice all the blueprints are automatically enabled. Also, on the Project profiles tab, you will find default project profiles are available to the user.

project profiles

Leave the rest of the tabs with the default options.

Create a custom project profile and authorize users (optional)

In the following example, we show the steps to create a custom project profile by bundling selected blueprints, and to authorize only restricted users to use this project profile template. The resulting profile enables the user to create a data lake environment with an AWS Glue database and an Athena workgroup to query the data. The user can also create an Amazon MWAA environment for orchestration. You can change or override the configuration parameters of a blueprint by using the Tooling configurations option within the project profile.

Because SageMaker Unified Studio is in preview mode, the naming conventions of some visual elements might appear different in the current version.

When you create a project profile, you can add blueprint deployment settings in two modes: on create and on demand. In on create mode, the blueprint is deployed as soon as the project is created. In on demand mode, the blueprint is deployed only when users need it.
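
The difference between the two modes can be shown with a toy Python model. This is a conceptual sketch only and does not reflect the SageMaker Unified Studio API; the class names, mode strings, and blueprint labels are all hypothetical.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass(frozen=True)
class BlueprintSetting:
    blueprint: str  # illustrative label, for example "data-lake"
    mode: str       # "ON_CREATE" or "ON_DEMAND" (hypothetical constants)


@dataclass
class Project:
    name: str
    profile: List[BlueprintSetting]
    environments: List[str] = field(default_factory=list)


def create_project(name: str, profile: List[BlueprintSetting]) -> Project:
    """On create settings are deployed immediately with the project."""
    project = Project(name, list(profile))
    for setting in profile:
        if setting.mode == "ON_CREATE":
            project.environments.append(setting.blueprint)
    return project


def deploy_on_demand(project: Project, blueprint: str) -> None:
    """On demand settings are deployed later, when a user asks for them."""
    allowed = {s.blueprint for s in project.profile if s.mode == "ON_DEMAND"}
    if blueprint not in allowed:
        raise ValueError(f"{blueprint} is not an on demand blueprint in this profile")
    if blueprint not in project.environments:
        project.environments.append(blueprint)
```

In this model, a profile with a data lake blueprint set to on create and a workflow blueprint set to on demand yields a project that starts with only the data lake environment.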

Create a project, create domain units, and delegate ownership (optional)

In the following example, the administrator logs in to SageMaker Unified Studio and creates the retail domain unit. The admin also delegates ownership to the retail business user. The retail business user logs in to SageMaker Unified Studio and creates a project with the authorized project profile template.

With these configurations in place, you have successfully completed the initial infrastructure plane deployment from an administrative perspective.

Authorization of blueprints (optional)

By default, all domain users have authorization to create projects with the enabled blueprints across domain units. If you want to restrict the usage of the blueprint within a specific domain unit (in this case, the retail domain unit, as shown in the following screenshot), you need to revoke the existing permissions and authorize the specific domain units. By limiting the use of blueprints to a particular domain unit, users can only create projects using the blueprint within that domain unit. To apply authorization settings to child domain units, enable the Cascade to all child domain units option.
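
The effect of the cascade option can be illustrated with a small Python model of a domain unit hierarchy. This is a conceptual sketch under stated assumptions, not the service's policy engine: grants are modeled as a mapping from domain unit name to a cascade flag, and the unit names are hypothetical.

```python
from typing import Dict, Optional


def is_authorized(unit: str,
                  grants: Dict[str, bool],
                  parents: Dict[str, Optional[str]]) -> bool:
    """Check whether a blueprint can be used in a domain unit.

    grants maps a domain unit to True when the grant cascades to all child
    domain units, and to False when it applies to that unit only.
    parents maps each domain unit to its parent (None for the root).
    """
    if unit in grants:
        return True  # granted directly on this unit
    ancestor = parents.get(unit)
    while ancestor is not None:
        if grants.get(ancestor, False):
            return True  # inherited from an ancestor with cascade enabled
        ancestor = parents.get(ancestor)
    return False
```

With a grant on the retail domain unit and cascade enabled, a child unit of retail is authorized while a sibling unit such as finance is not.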

blueprints authorization

Clean up

Make sure you remove the SageMaker Unified Studio resources to mitigate any unexpected costs. This involves a few steps:

  1. If you had multiple projects and subscribed to assets, unsubscribe from all assets.
  2. Note the names of all AWS Glue databases and Athena workgroups created by your projects.
  3. Delete any connections you created in the data explorer that you don’t want to keep.
  4. Note the project IDs.
  5. Delete the projects. If you encounter any errors, check the AWS CloudFormation console and find the failed stack. Fix the error that failed the stack deletion and delete the projects.
  6. Note down the domain ID.
  7. Delete the domain.
  8. Delete the S3 bucket named amazon-datazone-AWSACCOUNTID-AWSREGION-DOMAINID.
  9. Delete the AWS Glue databases and Athena workgroups you noted earlier.
  10. Delete the CloudFormation stack for the VPC (if you followed that step in the setup).
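
Because the bucket name is derived from values you noted during cleanup, a small helper can assemble it. The function below is a hypothetical convenience that only follows the naming pattern given in step 8; the example values are placeholders.

```python
def datazone_bucket_name(account_id: str, region: str, domain_id: str) -> str:
    """Build the bucket name amazon-datazone-AWSACCOUNTID-AWSREGION-DOMAINID."""
    if not (account_id.isdigit() and len(account_id) == 12):
        raise ValueError("an AWS account ID is a 12-digit number")
    return f"amazon-datazone-{account_id}-{region}-{domain_id}"
```

Confirm that the resulting name matches a bucket in your account before deleting anything.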

If you have additional resources that haven’t been deleted, you can also use tags to identify and delete specific resources.

Conclusion

In this post, we discussed the foundational building blocks of SageMaker Unified Studio and how, by abstracting complex technical implementations behind user-friendly interfaces, organizations can maintain standardized governance while enabling efficient resource management across business units. This approach provides consistency in infrastructure deployment while providing the flexibility needed for diverse business requirements.

To learn more, refer to the Amazon SageMaker Unified Studio Administrator Guide.

Appendix: Multi-account administration

This section illustrates the cross-account association. After the account invitation is accepted by the associated account owner, follow the instructions as shown in the following example to understand how to enable the blueprints. After the blueprints are enabled in the associated accounts, the root domain account can create project profile templates with the parameters of the associated account, including its linked blueprints. The example then demonstrates how the retail domain unit user can deploy compute resources and create data using the resources from the associated account.


About the Authors

Lakshmi Nair is a Senior Analytics Specialist Solutions Architect at AWS. She specializes in designing advanced analytics systems across industries. She focuses on crafting cloud-based data platforms, enabling real-time streaming, big data processing, and robust data governance. She can be reached via LinkedIn.

Fabrizio Napolitano is a Principal Specialist Solutions Architect for DB and Analytics. He has worked in the analytics space for the last 20 years, and has recently and quite by surprise become a Hockey Dad after moving to Canada.

Building AI-powered customer experiences using a modern communications hub

Post Syndicated from Osman Duman original https://aws.amazon.com/blogs/messaging-and-targeting/building-ai-powered-customer-experiences-using-a-modern-communications-hub/

Customers expect organizations to anticipate and seamlessly fulfill their needs, engaging them with personalized content when, where, and how they prefer. They want context-sensitive, dynamic interactions with nuanced conversations across all communication channels. Organizations are under growing pressure to modernize customer experience workflows to drive loyalty and improve operational efficiency. Leveraging the latest advancements in Generative AI (GenAI), such as hyper-personalization and Agentic AI, presents new challenges. Organizations require a scalable, reusable architecture to integrate GenAI into their customer engagement systems without a complete system overhaul, amid the disparate solutions they currently operate.

This blog post explores how to build an AI-powered modern communications hub using open-source GitHub samples that integrate SMS/MMS and WhatsApp services with GenAI capabilities. Organizations can create innovative AI-powered customer experiences with a quick proof-of-concept without disrupting existing systems.

In combination with Vector Databases and Retrieval Augmented Generation (RAG), GenAI makes it possible to reorganize knowledge into a single system and query from a single user interface through natural language conversation with a chatbot or virtual assistant. Funneling customer communications through a multi-channel communications hub linked with GenAI capabilities helps unify customer engagement mechanisms and streamlines the creation of rich customer experiences. Customers meet AI agents and Q&A bots on the communication channel that is most convenient for self-serving their needs. Organizations can build communications-channel-agnostic customer experiences while collecting channel engagement event and conversational data into a centralized data store for real-time insights, ad-hoc queries, analytics, and ML training.

Solution overview

At the core of the solution is the Modern Communications Hub, which connects digital communication channels with key GenAI services, like Amazon Bedrock and Amazon Q, along with AWS ML, database, storage, and serverless computing services. AWS End User Messaging and Amazon SES provide API-level access to digital communication channels, offering secure, scalable, high-performance, and cost-effective services for enterprise applications to exchange SMS/MMS, WhatsApp, push and voice notifications, and email with customers.

A collection of open-source sample code, published in the AWS-samples GitHub repository, illustrates how to facilitate generative conversations on SMS/MMS and WhatsApp channels. This will be extended to include email services. Two key components form the foundation of the GenAI Integration Samples: the Multi-channel Chat with AI Agents and Q&A Bots and the Engagement Database and Analytics for End User Messaging and SES. We will simply refer to these as the Conversation Processor and Engagement Database in the solution diagram.

This diagram shows the solution architecture at Level 300

The Conversation Processor receives customer messages via AWS End User Messaging and Amazon Simple Email Service (SES), stores the conversation details, and invokes the relevant Amazon Bedrock Agent. Amazon Bedrock Agents use Large Language Models (LLMs) and knowledge bases to analyze tasks, break them into actionable steps, execute those steps or search the knowledge base, observe outcomes, and iteratively refine their approach until they complete the task and produce a response. Alternatively, the Conversation Processor can function as a Q&A bot, in which case it uses Amazon Bedrock Knowledge Bases along with its RAG feature to generate an LLM answer and send it back on the same channel as the customer’s message.
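
The agent invocation step might look like the following Python sketch. It is not taken from the sample repository: the function name is hypothetical, and the client is passed in so the logic can be exercised without AWS credentials. The client is assumed to be a boto3 `bedrock-agent-runtime` client, whose `invoke_agent` call returns the answer as a stream of chunk events.

```python
from typing import Any


def ask_agent(client: Any, agent_id: str, agent_alias_id: str,
              session_id: str, text: str) -> str:
    """Send one customer message to a Bedrock agent and collect the reply.

    client is expected to behave like boto3.client("bedrock-agent-runtime").
    Reusing one session_id per customer conversation (an assumption here)
    lets the agent keep context across messages.
    """
    response = client.invoke_agent(
        agentId=agent_id,
        agentAliasId=agent_alias_id,
        sessionId=session_id,
        inputText=text,
    )
    parts = []
    for event in response["completion"]:
        chunk = event.get("chunk")
        if chunk and "bytes" in chunk:
            parts.append(chunk["bytes"].decode("utf-8"))
        # Other event types (for example trace events) are ignored here.
    return "".join(parts)
```

The returned text is what the Conversation Processor would send back on the channel the customer used.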

The Engagement Database collects and combines customer engagement data and conversational logs from across communication channels, storing the information in a centralized data lake on Amazon S3. By converting the data into a common, canonical format, the solution simplifies querying and analysis of these inbound events. A Lambda Transformer function leverages Apache Velocity Templates to transform the incoming JSON data, enabling real-time insights.
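
To illustrate what a common, canonical format means, the sketch below maps events from two channels onto one record shape. It uses plain Python in place of the Velocity templates used by the solution, and the input and output field names are assumptions chosen for illustration rather than the sample's actual schema.

```python
from typing import Any, Dict


def to_canonical(channel: str, raw: Dict[str, Any]) -> Dict[str, Any]:
    """Map a channel-specific inbound event to one shared record shape.

    All field names are illustrative assumptions.
    """
    if channel == "sms":
        return {
            "channel": "sms",
            "customer": raw["originationNumber"],
            "text": raw["messageBody"],
            "timestamp": raw["timestamp"],
        }
    if channel == "whatsapp":
        return {
            "channel": "whatsapp",
            "customer": raw["from"],
            "text": raw["text"]["body"],
            "timestamp": raw["timestamp"],
        }
    raise ValueError(f"unsupported channel: {channel}")
```

Once every event has the same keys, a single query can cover all channels without channel-specific parsing.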

The raw event data stored in the Amazon S3 data lake can then be fed into other AWS services for further processing. For example, the data can flow into Amazon Connect Customer Data Profiles or Amazon SageMaker to support machine learning model training. Data analysts can use Amazon Athena to issue direct queries for detailed ad-hoc reporting, or to send the data to Amazon QuickSight for advanced visualizations and natural language querying capabilities through Amazon Q in QuickSight.

NOTE: There is the potential for end users to send Personally Identifiable Information (PII) in messages. To protect customer privacy, please consider using Amazon Comprehend to assist in redacting PII before storing messages in S3. The following blog post provides a good overview of how to use Comprehend to redact PII: Redact sensitive data from streaming data in near-real time using Amazon Comprehend and Amazon Kinesis Data Firehose.

Amazon Bedrock provides core GenAI capabilities such as LLMs, Knowledge Bases, Retrieval Augmented Generation (RAG), AI agents, and Guardrails, to understand customer asks, determine what action to take, and what to communicate back. Amazon Bedrock Knowledge Bases provide organization specific business knowledge and reasoning, while Amazon Bedrock Agents automate multistep tasks by seamlessly connecting with company systems, APIs, and data sources.

Prerequisites

The following prerequisites are necessary to build your modern communications hub:

  • An AWS account. Sign up for an AWS account at the AWS website if you don’t have one.
  • Appropriate AWS Identity and Access Management (IAM) roles and permissions for Amazon Bedrock, AWS End User Messaging, and Amazon S3. For more information, see Create a service role for model import.
  • AWS End User Messaging configuration: You’ll need to configure the necessary origination identity in the AWS End User Messaging service to deliver messages via SMS or WhatsApp. If configuring SMS, a registered and active SMS origination phone number must be provisioned in AWS End User Messaging SMS. (Within the United States, use 10DLC or Toll-Free Numbers (TFNs).) If configuring WhatsApp, an active number that has been registered with Meta/WhatsApp should be provisioned in AWS End User Messaging Social.
  • Amazon Bedrock models: Anthropic Claude 3 Sonnet and Titan Text Embeddings V2 enabled in your Region. These are the default models used by the solution; however, you are free to experiment with different models.
  • Docker installed and running. This is used locally to package resources for deployment.
  • Node (> v18) and NPM (> v8.19) installed and configured on your computer.
  • The AWS Command Line Interface (AWS CLI) installed and configured.
  • AWS CDK (v2) installed and configured on your computer.

Deploy the Conversation Processor and Engagement Database

Deploy the following two solutions. While not required, it is best to deploy them in this order, as outputs from the Engagement Database can be used in the Multi-Channel Chat example:

  1. Engagement Database and Analytics for End User Messaging and SES
  2. Multi-channel Chat with AI Agents and Q&A Bots

Each solution contains detailed instructions to deploy the required services using the AWS Cloud Development Kit (CDK). The first Engagement Database solution will create an Amazon Data Firehose stream that can be used as an input to the second Multi-Channel Chat application so that data can be stored and queried in the Engagement Database.

Multi-Channel Chat with AI Agents and Q&A Bot Data Sources
This solution demonstrates how users can interact with three different knowledge sources. You may not need all three; however, this should serve as a good example for building the right knowledge source for your particular use case:

NOTE: The starter project creates an S3 bucket to store the documents used for the Bedrock Knowledge Base. Please consider using Amazon Macie to assist in the discovery of potentially sensitive data in S3 buckets. Amazon Macie can be enabled on a free trial for 30 days, up to 150GB per account.

  • Build your Knowledge Base on Amazon Bedrock using a Web Crawler. Optionally configure your knowledge base to scan or crawl website(s) to populate your knowledge base.
  • Amazon Bedrock Agents: Optionally enable your users to chat with an Amazon Bedrock agent. Agents have the added benefit of supporting knowledge bases for answering questions and walking users through collecting the information needed to automate a task such as making a reservation. Sample agents are available in the Amazon Bedrock Agent Samples repository. Note that you will need to have an Amazon Bedrock agent created in your Region prior to deploying the solution.

Conclusion

A Modern Communications Hub, loosely coupled with core Generative AI services, will establish a composable foundation to build communication-channel-agnostic customer experiences on. Build one by leveraging the GenAI Integration Samples, Conversation Processor and Engagement Database, combining with the secure, scalable, high-performance, and cost-effective digital communication services by AWS End User Messaging and Amazon SES. This will provide a single point of conversational access to knowledge bases and agentic AI capabilities on Amazon Bedrock. Start experimenting with AI-powered customer experience innovations with a quick proof-of-concept that won’t interfere with your present customer engagement setup.

Announcing ASCP integration with Pod Identity: Enhanced security for secrets management in Amazon EKS

Post Syndicated from Rodrigo Bersa original https://aws.amazon.com/blogs/security/announcing-ascp-integration-with-pod-identity-enhanced-security-for-secrets-management-in-amazon-eks/

In 2021, Amazon Web Services (AWS) introduced the AWS Secrets and Configuration Provider (ASCP) for the Kubernetes Secrets Store Container Storage Interface (CSI) Driver, offering a reliable way to manage secrets in Amazon Elastic Kubernetes Service (Amazon EKS). Today, we’re excited to announce the integration of ASCP with Pod Identity, the new standard for AWS Identity and Access Management (IAM) integration with Amazon EKS. This integration provides enhanced security, simplified configuration, and improved Day-2 operation for applications running on EKS that need access to secrets stored in AWS Secrets Manager and AWS Systems Manager Parameter Store. In this post, we’ll cover use-case scenarios for single and cross-account secrets using Pod Identity integration with ASCP.

Amazon EKS introduced Pod Identity in 2023 as a feature that streamlines the process of configuring IAM permissions for Kubernetes applications. Pod Identity simplifies the identity management of applications running on top of Amazon EKS by allowing you to set up permissions directly through EKS interfaces, reducing the number of steps required and alleviating the need to switch between the EKS and IAM services. Pod Identity enables the use of a single IAM role across multiple clusters without updating trust policies, and it supports role session tags for more granular access control. This approach not only simplifies policy management by allowing the reuse of permission policies across roles, but also enhances security by enabling access to AWS resources based on matching tags.
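To make the tag-matching idea concrete, here is a deliberately simplified Python model of a condition that compares tags on a resource with tags on the role session. It is not how IAM evaluates policies internally; `tags_match` and `REQUIRED_KEYS` are hypothetical names, and the two tag keys are the ones used in the permission policies later in this post.

```python
# Tag keys compared by the permission policies in this post.
REQUIRED_KEYS = ("kubernetes-namespace", "eks-cluster-name")


def tags_match(resource_tags: dict, session_tags: dict, keys=REQUIRED_KEYS) -> bool:
    """Return True only if every listed tag exists on both sides with equal values."""
    for key in keys:
        if key not in resource_tags or key not in session_tags:
            return False  # a missing tag never satisfies the comparison
        if resource_tags[key] != session_tags[key]:
            return False  # the comparison is exact and case-sensitive
    return True


secret_tags = {"kubernetes-namespace": "default", "eks-cluster-name": "ascp-podidentity"}
pod_session = {"kubernetes-namespace": "default", "eks-cluster-name": "ascp-podidentity"}
other_session = {"kubernetes-namespace": "payments", "eks-cluster-name": "ascp-podidentity"}

print(tags_match(secret_tags, pod_session))    # same namespace and cluster
print(tags_match(secret_tags, other_session))  # different namespace
```

The practical consequence is that one role and one policy can serve many workloads, because access follows the tags rather than a hard-coded list of principals.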

Background

Applications running on Amazon EKS often require sensitive information, such as credentials to access a database or a key to authenticate through an API. This kind of information is called a secret. You can secure, store, and manage secrets in AWS Secrets Manager. ASCP allows Kubernetes applications to securely retrieve the secrets stored in Secrets Manager and Systems Manager Parameter Store. Previously, ASCP relied on IAM roles for service accounts (IRSA) for authentication. Although IRSA provided improvements over previous methods, Pod Identity offers even greater security and simplicity.

Pod Identity provides a more secure and efficient way to integrate IAM roles with applications running on EKS, granting more granular AWS permissions to individual Pods, so that you don’t need instance-level credentials or IRSA.

This integration provides the following key benefits over using IRSA:

  • Enhanced security: Pod Identity provides a more granular and secure way to manage permissions at the Pod level, alleviating the need to expose the IAM role annotation on Kubernetes ServiceAccount objects.
  • Simplified configuration: Pod Identity streamlines and simplifies the setup process, reducing the potential for misconfiguration, especially in high-scale environments.
  • Improved operation: Pod Identity reduces the operational overhead compared to previous methods, centralizing the management with the AWS API.
  • Native EKS integration: Pod Identity is the new standard for IAM integration for applications running on EKS and provides a more cohesive experience.

Solution overview

With this integration, ASCP uses Pod Identity to authenticate and authorize access to AWS services. When a Pod requires access to a secret, the workflow is as shown in Figure 1.

Figure 1: The workflow performed by the Pod Identity and ASCP integration to provide access to a secret stored on AWS Secrets Manager for a Pod running on Amazon EKS

The workflow is as follows:

  1. A user creates an IAM role and a Pod Identity association between the IAM role and the Kubernetes ServiceAccount assigned to the Pod.
  2. EKS API validates the Pod Identity association, allowing ASCP to use this role to authenticate with AWS services.
  3. If authorized, the Pod Identity agent allows ASCP to assume the IAM role assigned to the Pod through the use of a ServiceAccount token.
  4. ASCP retrieves the requested secret values and makes them available to the Pod through a mounted volume.

Prerequisites

You need to have the following prerequisites in place in order to implement this solution:

Implement the solution

This guide presents two scenarios: single-account setup and cross-account setup. Complete the single-account steps in Account A before proceeding to the cross-account configuration, which involves Account B. The cross-account setup builds on the single-account foundation to demonstrate secure secrets management across AWS accounts.

Amazon EKS cluster setup

Before you start, you’ll need to set up an Amazon EKS cluster with the required add-ons in a single account (Account A).

  1. (Optional) Use the following commands to set environment variables and create an Amazon EKS cluster:
    export CLUSTER_NAME=<YOUR_CLUSTER_NAME>
    export AWS_REGION=<YOUR_AWS_REGION>
    export NAMESPACE=<YOUR_NAMESPACE>
    export ACCOUNT_A=<ACCOUNT_ID_1>
    export ACCOUNT_B=<ACCOUNT_ID_2>
    export SECRET_NAME=<NAME_OF_YOUR_SECRET>
    export SECRET_NAME_CROSS=<NAME_OF_YOUR_CROSS_SECRET>

    Here are the environment variables for this demonstration:

    export CLUSTER_NAME=ascp-podidentity
    export AWS_REGION=$(aws configure get region)
    export NAMESPACE=default
    export SECRET_NAME=secret-a
    export SECRET_NAME_CROSS=secret-b

    Create the EKS cluster:

    eksctl create cluster --name $CLUSTER_NAME --version=1.31

  2. Enable Pod Identity on your EKS cluster by installing the Pod Identity Agent add-on (if it’s not already enabled):
    eksctl create addon --cluster $CLUSTER_NAME --name eks-pod-identity-agent

  3. Update your kubeconfig file and install the Secrets Store CSI driver:
    aws eks --region $AWS_REGION update-kubeconfig --name $CLUSTER_NAME
    helm repo add secrets-store-csi-driver https://kubernetes-sigs.github.io/secrets-store-csi-driver/charts
    helm install -n kube-system csi-secrets-store secrets-store-csi-driver/secrets-store-csi-driver

  4. Verify the Secrets Store CSI Driver installation:
    kubectl --namespace=kube-system get pods -l "app=secrets-store-csi-driver"

    You should see this expected output:

    NAME READY STATUS RESTARTS AGE
    csi-secrets-store-secrets-store-csi-driver-fcxf6 3/3 Running 0 41s
    csi-secrets-store-secrets-store-csi-driver-j9wqh 3/3 Running 0 41s

  5. Install the ASCP plugin:
    kubectl apply -f https://raw.githubusercontent.com/aws/secrets-store-csi-driver-provider-aws/main/deployment/aws-provider-installer.yaml

Consuming secrets in a single account

Now that you’ve set up your Amazon EKS cluster, in this use case, you’ll create an AWS Secrets Manager secret in the same account where the cluster and the applications reside (Account A).

  1. Create a secret in AWS Secrets Manager with tags kubernetes-namespace and eks-cluster-name within the same AWS account as the EKS cluster:
    aws secretsmanager create-secret --region $AWS_REGION \
    --name $SECRET_NAME \
    --tags Key=kubernetes-namespace,Value=$NAMESPACE Key=eks-cluster-name,Value=$CLUSTER_NAME \
    --secret-string '{"user":"user1","password":"passwd1"}'

    You should see this expected output:

    {
    "ARN": "arn:aws:secretsmanager:$AWS_REGION:$ACCOUNT_A:secret:secret-a-q1w2e3",
    "Name": "secret-a",
    "VersionId": "abcdefgh-1234-5678-ijklmnopqrst"
    }

    Note the Amazon Resource Name (ARN) of the new secret. You will use it in the next step.

  2. Create an IAM role and trust policy with the required permissions to retrieve the newly created secret within the same AWS account:
    SECRET_ARN=<NOTED_SECRET_ARN_FROM_PREVIOUS_STEP>
    cat << EOF > policy.json
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": ["secretsmanager:GetSecretValue", "secretsmanager:DescribeSecret"],
                "Resource": ["$SECRET_ARN"],
                "Condition": {
                    "StringEquals": {
                        "secretsmanager:ResourceTag/kubernetes-namespace": "\${aws:PrincipalTag/kubernetes-namespace}",
                        "secretsmanager:ResourceTag/eks-cluster-name": "\${aws:PrincipalTag/eks-cluster-name}"
                    }
                }
            }
        ]
    }
    EOF

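    In the preceding example IAM role permission policy, the conditions on the kubernetes-namespace and eks-cluster-name tags enforce fine-grained access control: the secretsmanager actions are allowed only if the secret’s tags match the kubernetes-namespace and eks-cluster-name session tags that Pod Identity attaches to the role session. As a result, the secret can be accessed only by Pods from the matching namespace and cluster. The backslash before each \${aws:PrincipalTag/...} keeps the shell from expanding the policy variable inside the heredoc, whereas $SECRET_ARN is expanded intentionally.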

    cat << EOF > trust.json
    {
        "Version": "2012-10-17",
        "Statement": [
            {  
                "Effect": "Allow",
                "Principal": {
                    "Service": "pods.eks.amazonaws.com" 
                },
                "Action": [
                    "sts:AssumeRole",
                    "sts:TagSession"
                ]
            }
        ]
    }
    EOF

    Notice that the preceding example IAM role trust policy allows the Amazon EKS Pod Identity service (pods.eks.amazonaws.com) to assume the role and tag the session. These actions are necessary for Pod Identity to function correctly, enabling Pods to securely access AWS resources.

    Finally, create the role with the trust policy and attach the permission policy:

    aws iam create-role --role-name ascp-podidentity --assume-role-policy-document file://trust.json
    aws iam put-role-policy --role-name ascp-podidentity --policy-name ascp-podidentity --policy-document file://policy.json

    Note the ARN of the new role. You will use it in the next step.

  3. Create a Kubernetes ServiceAccount and add the Pod Identity association between the ServiceAccount and the IAM role:
    ROLE_ARN=<NOTED_ROLE_ARN_FROM_PREVIOUS_STEP>
    kubectl create serviceaccount serviceaccount-a
    
    aws eks create-pod-identity-association \
    --cluster-name "$CLUSTER_NAME" \
    --namespace "default" \
    --service-account serviceaccount-a \
    --role-arn $ROLE_ARN

  4. Create a SecretProviderClass to use the newly created secret in your Amazon EKS cluster.
    cat << EOF | kubectl apply -f -
    
    apiVersion: secrets-store.csi.x-k8s.io/v1
    kind: SecretProviderClass
    metadata:
      name: aws-secrets-manager
    spec:
      provider: aws
      parameters:
        objects: |
          - objectName: "secret-a" # Secret name or ARN to be mounted to the Pod
            objectType: "secretsmanager"
        usePodIdentity: "true" # Indicator to use Pod Identity instead of IRSA
        
    EOF

  5. Create a new deployment to consume your newly created secret using Pod Identity as a mounted volume.
    cat << EOF | kubectl apply -f -
    
    apiVersion: apps/v1
    kind: Deployment
    metadata:
      labels:
        app: application-a
      name: application-a
    spec:
      replicas: 1
      selector:
        matchLabels:
          app: application-a
      template:
        metadata:
          labels:
            app: application-a
        spec:
          containers:
          - image: public.ecr.aws/amazonlinux/amazonlinux:2023-minimal
            name: amazonlinux
            args:
            - infinity
            command:
            - sleep
            volumeMounts:
            - name: secrets-store-inline
              mountPath: "/mnt/secret" # Directory where secret will be mounted to
              readOnly: true
          volumes:
          - name: secrets-store-inline
            csi:
              driver: secrets-store.csi.k8s.io
              readOnly: true
              volumeAttributes:
                secretProviderClass: "aws-secrets-manager" # Match SecretProviderClass name created
          serviceAccountName: serviceaccount-a # Specify service account created
    
    EOF

  6. Validate that the deployment was created successfully and confirm the secret was mounted correctly.
    kubectl get pods -l app=application-a
    kubectl exec -it $(kubectl get pods -l app=application-a -o name) -- cat /mnt/secret/secret-a

    You should see this expected output:

    NAME READY STATUS RESTARTS AGE
    application-a-b98d44bb8-bzvn4 1/1 Running 0 3s
    
    {"user":"user1","password":"passwd1"}
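Once the secret is mounted, an application reads it like any other file. The following Python sketch shows one way to do that; `read_json_secret` is a hypothetical helper, and the demo writes to a temporary directory that stands in for /mnt/secret so the snippet runs outside a cluster.

```python
import json
import tempfile
from pathlib import Path


def read_json_secret(mount_dir: str, file_name: str) -> dict:
    """Read a secret file from the CSI mount directory and parse it as JSON."""
    raw = Path(mount_dir, file_name).read_text(encoding="utf-8")
    return json.loads(raw)


# Demo: a temporary directory stands in for the /mnt/secret mount path.
with tempfile.TemporaryDirectory() as mount_dir:
    Path(mount_dir, "secret-a").write_text(
        '{"user": "user1", "password": "passwd1"}', encoding="utf-8"
    )
    creds = read_json_secret(mount_dir, "secret-a")
    print(creds["user"])  # print only the non-sensitive field
```

Inside the Pod from this walkthrough, the call would be read_json_secret("/mnt/secret", "secret-a"). Reading the file at the point of use, rather than caching it at startup, lets the application pick up a new value if the mounted file is refreshed.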

Working with cross-account secrets through resource policies

For this second use case, you’ll also use the Amazon EKS cluster on Account A, and create a new Secrets Manager secret in a different account (Account B).

To access a secret from a different account, you can’t encrypt the secret with the default AWS managed key for Secrets Manager (aws/secretsmanager), because that key can’t be used across accounts. You first need to create a customer managed AWS Key Management Service (AWS KMS) key that allows cross-account access, and then use it to encrypt the secret.

On Account B

  1. Create a customer managed key on AWS KMS with cross-account permissions:
    cat << EOF > key-policy.json
    {
      "Version": "2012-10-17",
      "Id": "cross-account-access",
      "Statement": [
        {
          "Sid": "Enable IAM User Permissions",
          "Effect": "Allow",
          "Principal": {
            "AWS": "arn:aws:iam::${ACCOUNT_B}:root"
          },
          "Action": "kms:*",
          "Resource": "arn:aws:kms:${AWS_REGION}:${ACCOUNT_B}:key/*"
        },
        {
          "Sid": "Allow access for Key Administrators",
          "Effect": "Allow",
          "Principal": {
            "AWS": "arn:aws:iam::${ACCOUNT_B}:role/Admin"
          },
          "Action": [
            "kms:Create*",
            "kms:Describe*",
            "kms:Enable*",
            "kms:List*",
            "kms:Put*",
            "kms:Update*",
            "kms:Revoke*",
            "kms:Disable*",
            "kms:Get*",
            "kms:Delete*",
            "kms:TagResource",
            "kms:UntagResource",
            "kms:ScheduleKeyDeletion",
            "kms:CancelKeyDeletion",
            "kms:RotateKeyOnDemand"
          ],
          "Resource": "arn:aws:kms:${AWS_REGION}:${ACCOUNT_B}:key/*"
        },
        {
          "Sid": "Allow access through AWS Secrets Manager",
          "Effect": "Allow",
          "Principal": {
            "AWS": "*"
          },
          "Action": [
            "kms:Encrypt",
            "kms:Decrypt",
            "kms:ReEncrypt*",
            "kms:CreateGrant",
            "kms:DescribeKey"
          ],
          "Resource": "arn:aws:kms:${AWS_REGION}:${ACCOUNT_B}:key/*",
          "Condition": {
            "StringEquals": {
              "kms:ViaService": "secretsmanager.${AWS_REGION}.amazonaws.com",
              "kms:CallerAccount": [
                "$ACCOUNT_A",
                "$ACCOUNT_B"
              ]
            }
          }
        },
        {
          "Sid": "Allow access through AWS Secrets Manager for Generate Data Key",
          "Effect": "Allow",
          "Principal": {
            "AWS": "*"
          },
          "Action": "kms:GenerateDataKey",
          "Resource": "arn:aws:kms:${AWS_REGION}:${ACCOUNT_B}:key/*",
          "Condition": {
            "StringEquals": {
              "kms:CallerAccount": [
                "$ACCOUNT_A",
                "$ACCOUNT_B"
              ]
            },
            "StringLike": {
              "kms:ViaService": "secretsmanager.${AWS_REGION}.amazonaws.com"
            }
          }
        },
        {
          "Sid": "Allow direct access to key metadata to the account",
          "Effect": "Allow",
          "Principal": {
            "AWS": [
              "arn:aws:iam::${ACCOUNT_A}:root",
              "arn:aws:iam::${ACCOUNT_B}:root"
            ]
          },
          "Action": [
            "kms:Describe*",
            "kms:Get*",
            "kms:List*",
            "kms:RevokeGrant"
          ],
          "Resource": "arn:aws:kms:${AWS_REGION}:${ACCOUNT_B}:key/*"
        }
      ]
    }
    EOF

    Then create the KMS key:

    aws kms create-key --region $AWS_REGION --policy file://key-policy.json

    You should see this expected output:

    {
        "KeyMetadata": {
            "AWSAccountId": "$ACCOUNT_B",
            "KeyId": "abcdefgh-1234-5678-90ij-klmnopqrstuv",
            "Arn": "arn:aws:kms:${AWS_REGION}:${ACCOUNT_B}:key/abcdefgh-1234-5678-90ij-klmnopqrstuv",
            "CreationDate": "2025-01-23T18:00:00.756000-05:00",
            "Enabled": true,
            "Description": "",
            "KeyUsage": "ENCRYPT_DECRYPT",
            "KeyState": "Enabled",
            "Origin": "AWS_KMS",
            "KeyManager": "CUSTOMER",
            "CustomerMasterKeySpec": "SYMMETRIC_DEFAULT",
            "KeySpec": "SYMMETRIC_DEFAULT",
            "EncryptionAlgorithms": [
                "SYMMETRIC_DEFAULT"
            ],
            "MultiRegion": false
        }
    }

    Note the KeyId and the ARN of the key. Use the key ID to create an alias for the newly created key:

    KEY_ID=<NOTED_KEY_ID_FROM_PREVIOUS_STEP>
    aws kms create-alias --region $AWS_REGION --alias-name alias/secretsmanager-cross --target-key-id $KEY_ID

  2. Create a new secret in Secrets Manager, using the new KMS key to encrypt it:
    aws secretsmanager create-secret --region $AWS_REGION \
    --name $SECRET_NAME_CROSS \
    --tags Key=kubernetes-namespace,Value=$NAMESPACE Key=eks-cluster-name,Value=$CLUSTER_NAME \
    --secret-string "This is a Cross Account Secret" \
    --kms-key-id $KEY_ID

    You should see this expected output:

    {
    "ARN": "arn:aws:secretsmanager:$AWS_REGION:$ACCOUNT_B:secret:secret-b-12345",
    "Name": "secret-b",
    "VersionId": "abcdefgh-1234-5678-ijklmnopqrst"
    }

    Note the ARN of the new secret. You will use it in the next step.

  3. Add a resource policy that allows the IAM role on Account A to access the newly created secret:
    SECRET_CROSS_ARN=<NOTED_SECRET_CROSS_ARN_FROM_PREVIOUS_STEP>
    cat << EOF > resource-policy.json
    
    {
      "Version" : "2012-10-17",
      "Statement" : [ {
        "Effect" : "Allow",
        "Principal" : {
          "AWS" : "arn:aws:iam::${ACCOUNT_A}:root"
        },
        "Action" : [ "secretsmanager:GetSecretValue", "secretsmanager:DescribeSecret" ],
        "Resource" : "$SECRET_CROSS_ARN"
        } 
      ]
    }
    
    EOF
    aws secretsmanager put-resource-policy --secret-id $SECRET_NAME_CROSS --resource-policy file://resource-policy.json

Now let’s go back to Account A.

On Account A

  1. Create a new IAM role and trust policy with the required permissions to retrieve the newly created secret within Account B:
    KEY_ARN=<NOTED_KEY_ARN_FROM_PREVIOUS_STEP>
    cat << EOF > cross-policy.json
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": ["secretsmanager:GetSecretValue", "secretsmanager:DescribeSecret"],
                "Resource": ["$SECRET_CROSS_ARN"],
                "Condition": {
                    "StringEquals": {
                        "secretsmanager:ResourceTag/kubernetes-namespace": "\${aws:PrincipalTag/kubernetes-namespace}",
                        "secretsmanager:ResourceTag/eks-cluster-name": "\${aws:PrincipalTag/eks-cluster-name}"
                    }
                }
            },
            {
                "Effect": "Allow",
                "Action": [
                    "kms:Decrypt",
                    "kms:DescribeKey"
                    ],
                "Resource": "$KEY_ARN"
            }
        ]
    }
    EOF

    In the preceding example IAM role permission policy, note that the resource ARN is pointing to the secret created on Account B.

    cat << EOF > trust.json
    {
        "Version": "2012-10-17",
        "Statement": [
            {  
                "Effect": "Allow",
                "Principal": {
                    "Service": "pods.eks.amazonaws.com" 
                },
                "Action": [
                    "sts:AssumeRole",
                    "sts:TagSession"
                ]
            }
        ]
    }
    EOF

    The IAM role trust policy needs to have pods.eks.amazonaws.com as a trusted entity to perform the sts:AssumeRole and sts:TagSession actions.

    Next, create the role and apply the policy:

    aws iam create-role --role-name ascp-podidentity-cross --assume-role-policy-document file://trust.json
    aws iam put-role-policy --role-name ascp-podidentity-cross --policy-name ascp-podidentity-cross --policy-document file://cross-policy.json

    Note the ARN of the new role. You will use it in the next step.

  2. Create the second Kubernetes ServiceAccount and add the cross-account Pod Identity association:
    kubectl create serviceaccount serviceaccount-b
    
    ROLE_CROSS_ARN=<NOTED_ROLE_CROSS_ARN_FROM_PREVIOUS_STEP>
    aws eks create-pod-identity-association \
    --cluster-name "$CLUSTER_NAME" \
    --namespace "default" \
    --service-account serviceaccount-b \
    --role-arn $ROLE_CROSS_ARN

  3. Create a SecretProviderClass to use the cross-account secret created in Account B in your Amazon EKS cluster:
    cat << EOF | kubectl apply -f -
    
    apiVersion: secrets-store.csi.x-k8s.io/v1
    kind: SecretProviderClass
    metadata:
      name: aws-secrets-manager-cross
    spec:
      provider: aws
      parameters:
        objects: |
          - objectName: "$SECRET_CROSS_ARN" # Full ARN of the Secret in the ACCOUNT B to be mounted on Pod
            objectType: "secretsmanager"
        usePodIdentity: "true" # Indicator to use Pod Identity instead of IRSA
        
    EOF

  4. Create a new deployment to consume your newly created secret using Pod Identity as a mounted volume:
    cat << EOF | kubectl apply -f -
    
    apiVersion: apps/v1
    kind: Deployment
    metadata:
      labels:
        app: application-b
      name: application-b
    spec:
      replicas: 1
      selector:
        matchLabels:
          app: application-b
      template:
        metadata:
          labels:
            app: application-b
        spec:
          containers:
          - image: public.ecr.aws/amazonlinux/amazonlinux:2023-minimal
            name: amazonlinux
            args:
            - infinity
            command:
            - sleep
            volumeMounts:
            - name: secrets-store-cross
              mountPath: "/mnt/secret" # Directory where secret will be mounted to
              readOnly: true
          volumes:
          - name: secrets-store-cross
            csi:
              driver: secrets-store.csi.k8s.io
              readOnly: true
              volumeAttributes:
                secretProviderClass: "aws-secrets-manager-cross" # Match SecretProviderClass name created for cross-account access
          serviceAccountName: serviceaccount-b # Specify service account created for cross-account access
    
    EOF

  5. Validate that the deployment was created successfully and confirm the mounted secret:
    kubectl get pods -l app=application-b
    kubectl exec -it $(kubectl get pods -l app=application-b -o name) -- cat /mnt/secret/$SECRET_CROSS_ARN

    Note that the name of the mounted file is the secret ARN, because the SecretProviderClass references the cross-account secret by its full ARN.

    You should see this expected output:

    NAME READY STATUS RESTARTS AGE
    application-b-67b755444f-ngrhv 1/1 Running 0 8s
    
    This is a Cross Account Secret

Conclusion

The integration of ASCP with Pod Identity marks a significant step forward in secrets management for Amazon EKS. It offers enhanced security, simplified configuration, and improved operations. We encourage all EKS users to explore this new integration and take advantage of its benefits.

The integration of ASCP with Pod Identity offers these benefits over IRSA:

  1. Simplified setup: With Pod Identity, you don’t need to annotate service accounts with IAM role ARNs or update role trust policies for each cluster.
  2. Enhanced security: Pod Identity provides more granular control over permissions at the Pod level.
  3. Improved scalability: Pod Identity is easier to implement in large-scale environments.
  4. Consistent AWS experience: Pod Identity aligns more closely with AWS best practices for IAM management.

For more information, see our documentation for AWS Secrets Manager, AWS Secrets and Configuration Provider (ASCP), and Amazon EKS Pod Identity.


Rodrigo Bersa

Rodrigo is a Senior Specialist Solutions Architect for Containers and AppMod, with a focus on security and infrastructure-as-code automation. In this role, Rodrigo aims to help customers achieve their business goals by leveraging best practices for AWS container services when building new environments or migrating existing technologies.
Akshay Aggarwal

Akshay is a Senior Technical Product Manager on the AWS Secrets Manager team. As part of the AWS Cryptography team, Akshay drives technologies and defines best practices that help improve customers’ experiences of building secure, reliable workloads in the AWS Cloud. Akshay is passionate about building technologies that are easy to use, secure, and scalable.
Stephanie Shen

Stephanie Shen is a Software Development Engineer at Amazon Web Services (AWS), working on the AWS Security Services team. She has over 8 years of experience, previously working as a Product Manager. Stephanie aims to help customers improve their security posture by enhancing the features on AWS security services.

Use DeepSeek with Amazon OpenSearch Service vector databases and Amazon SageMaker

Post Syndicated from Jon Handler original https://aws.amazon.com/blogs/big-data/use-deepseek-with-amazon-opensearch-service-vector-databases-and-amazon-sagemaker/

DeepSeek-R1 is a powerful and cost-effective AI model that excels at complex reasoning tasks. When combined with Amazon OpenSearch Service, it enables robust Retrieval Augmented Generation (RAG) applications. This post shows you how to set up RAG using DeepSeek-R1 on Amazon SageMaker with an OpenSearch Service vector database as the knowledge base. This example provides a solution for enterprises looking to enhance their AI capabilities.

OpenSearch Service provides rich capabilities for RAG use cases, as well as vector embedding-powered semantic search. You can use the flexible connector framework and search flow pipelines in OpenSearch to connect to models hosted by DeepSeek, Cohere, and OpenAI, as well as models hosted on Amazon Bedrock and SageMaker. In this post, we build a connection to DeepSeek’s text generation model, supporting a RAG workflow to generate text responses to user queries.

Solution overview

The following diagram illustrates the solution architecture.

In this walkthrough, you will use a set of scripts to create the preceding architecture and data flow. First, you will create an OpenSearch Service domain, and deploy DeepSeek-R1 to SageMaker. You will execute scripts to create an AWS Identity and Access Management (IAM) role for invoking SageMaker, and a role for your user to create a connector to SageMaker. You will create an OpenSearch connector and model that will enable the retrieval_augmented_generation processor within OpenSearch to execute a user query, perform a search, and use DeepSeek to generate a text response. You will create a connector to SageMaker with Amazon Titan Text Embeddings V2 to create embeddings for a set of documents with population statistics. Finally, you will execute the query to compare population growth in Miami and New York City.

Prerequisites

We’ve created and open-sourced a GitHub repo with all the code you need to follow along with the post and deploy it for yourself. You will need the following prerequisites:

Deploy DeepSeek on Amazon SageMaker

You will need to have or deploy DeepSeek with an Amazon SageMaker endpoint. To learn more about deploying DeepSeek-R1 on SageMaker, refer to Deploying DeepSeek-R1 Distill Model on AWS using Amazon SageMaker AI.

Create an OpenSearch Service domain

Refer to Create an Amazon OpenSearch Service domain for instructions on how to create your domain. Make note of the domain Amazon Resource Name (ARN) and domain endpoint, both of which can be found in the General information section of each domain on the OpenSearch Service console.

Download and prepare the code

Run the following steps from your local computer or workspace that has Python and git:

  1. If you haven’t already, clone the repo into a local folder using the following command:
git clone https://github.com/Jon-AtAWS/opensearch-examples.git
  2. Create a Python virtual environment:
cd opensearch-examples/opensearch-deepseek-rag
python -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt

The example scripts use environment variables for setting some common parameters. Set these up now using the following commands. Be sure to update with your AWS Region, your SageMaker endpoint ARN and URL, your OpenSearch Service domain’s endpoint and ARN, and your domain’s primary user and password.

export DEEPSEEK_AWS_REGION='<your current region>'
export SAGEMAKER_MODEL_INFERENCE_ARN='<your SageMaker endpoint’s ARN>' 
export SAGEMAKER_MODEL_INFERENCE_ENDPOINT='<your SageMaker endpoint’s URL>'
export OPENSEARCH_SERVICE_DOMAIN_ARN='<your domain’s ARN>'
export OPENSEARCH_SERVICE_DOMAIN_ENDPOINT='<your domain’s API endpoint>'
export OPENSEARCH_SERVICE_ADMIN_USER='<your domain’s master user name>'
export OPENSEARCH_SERVICE_ADMIN_PASSWORD='<your domain’s master user password>'
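
Because every later script reads these variables, it can help to confirm they are all set before you continue. The following is a minimal sketch, not part of the repo: the variable names come from the export commands above, while the helper name missing_vars and the placeholder check are illustrative assumptions.

```python
import os

# Variable names are taken from the export commands above.
REQUIRED_VARS = [
    "DEEPSEEK_AWS_REGION",
    "SAGEMAKER_MODEL_INFERENCE_ARN",
    "SAGEMAKER_MODEL_INFERENCE_ENDPOINT",
    "OPENSEARCH_SERVICE_DOMAIN_ARN",
    "OPENSEARCH_SERVICE_DOMAIN_ENDPOINT",
    "OPENSEARCH_SERVICE_ADMIN_USER",
    "OPENSEARCH_SERVICE_ADMIN_PASSWORD",
]


def missing_vars(env=None):
    """Return required variables that are unset, empty, or still a <placeholder>."""
    env = os.environ if env is None else env
    missing = []
    for name in REQUIRED_VARS:
        value = env.get(name, "").strip()
        # Hypothetical check: treat an unreplaced "<...>" template value as missing.
        if not value or value.startswith("<"):
            missing.append(name)
    return missing


if __name__ == "__main__":
    problems = missing_vars()
    if problems:
        print("Set these variables before continuing:", ", ".join(problems))
    else:
        print("All required environment variables are set.")
```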

You now have the code base and have your virtual environment set up. You can examine the contents of the opensearch-deepseek-rag directory. For clarity of purpose and reading, we’ve encapsulated each of seven steps in its own Python script. This post will guide you through running these scripts. We’ve also chosen to use environment variables to pass parameters between scripts. In an actual solution, you would encapsulate the code in classes and pass the values where needed. Coding this way is clearer, but is less efficient and doesn’t follow coding best practices. Use these scripts as examples to pull from.

First, you will set up permissions for your OpenSearch Service domain to connect to your SageMaker endpoint.

Set up permissions

You will create two IAM roles. The first will allow OpenSearch to call your SageMaker endpoint. The second will allow you to make the create connector API call to OpenSearch.

  1. Examine the code in create_invoke_role.py.
  2. Return to the command line, and execute the script:
python create_invoke_role.py
  3. Execute the command line from the script’s output to set the INVOKE_DEEPSEEK_ROLE environment variable.

You have created a role named invoke_deepseek_role, with a trust relationship for OpenSearch Service to assume the role, and with a permission policy that allows OpenSearch Service to invoke your SageMaker endpoint. The script outputs the ARNs for your role and policy and additionally a command line command to add the role to your environment. Execute that command before running the next script. Make a note of the role ARN in case you need to return at a later time.
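
To make the two halves of that role concrete, here is a hedged sketch of what the trust policy and permission policy documents might look like. It is not the code from create_invoke_role.py. The service principal opensearchservice.amazonaws.com and the single sagemaker:InvokeEndpoint action are assumptions based on how IAM roles for OpenSearch Service connectors are commonly written, and the endpoint ARN in the usage lines is a made-up example.

```python
import json


def build_trust_policy():
    """Trust policy: who is allowed to assume the role."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                # Assumption: OpenSearch Service assumes the role via this principal.
                "Principal": {"Service": "opensearchservice.amazonaws.com"},
                "Action": "sts:AssumeRole",
            }
        ],
    }


def build_invoke_policy(endpoint_arn):
    """Permission policy: what the role may do once assumed."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": ["sagemaker:InvokeEndpoint"],
                # Scope the permission to the one endpoint that hosts DeepSeek.
                "Resource": endpoint_arn,
            }
        ],
    }


if __name__ == "__main__":
    # Hypothetical ARN, for illustration only.
    example_arn = "arn:aws:sagemaker:us-east-1:111122223333:endpoint/example-endpoint"
    print(json.dumps(build_trust_policy(), indent=2))
    print(json.dumps(build_invoke_policy(example_arn), indent=2))
```

Scoping Resource to a single endpoint ARN, rather than a wildcard, keeps the role limited to the model this walkthrough uses.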

Now you need to create a role for your user to be able to create a connector in OpenSearch Service.

  1. Examine the code in create_connector_role.py.
  2. Return to the command line and execute the script:
python create_connector_role.py
  3. Execute the command line from the script’s output to set the CREATE_DEEPSEEK_CONNECTOR_ROLE environment variable.

You have created a role named create_deepseek_connector_role, with a trust relationship with the current user and permissions to write to OpenSearch Service. You need these permissions to call the OpenSearch create_connector API, which packages a connection to a remote model host, DeepSeek in this case. The script prints the policy’s and role’s ARNs, and additionally a command line command to add the role to your environment. Execute that command before running the next script. Again, make note of the role ARN, just in case.

Now that you have your roles created, you will tell OpenSearch about them. The fine-grained access control feature includes an OpenSearch role, ml_full_access, that will allow authenticated entities to execute API calls within OpenSearch.

  1. Examine the code in setup_opensearch_security.py.
  2. Return to the command line and execute the script:
python setup_opensearch_security.py

You set up the OpenSearch Service security plugin to recognize two AWS roles: create_deepseek_connector_role and LambdaInvokeOpenSearchMLCommonsRole. You will use the second role later, when you connect with an embedding model and load data into OpenSearch to use as a RAG knowledge base. Now that you have permissions in place, you can create the connector.
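
Mapping IAM roles to an OpenSearch role comes down to one call to the security plugin's role mapping API. The sketch below only builds the request path and body; it assumes the mapping is expressed through backend_roles and does not reproduce setup_opensearch_security.py. The ARNs in the usage lines are placeholders.

```python
# Path of the security plugin's role mapping API for the ml_full_access role.
ROLE_MAPPING_PATH = "_plugins/_security/api/rolesmapping/ml_full_access"


def build_role_mapping(role_arns):
    """Build the body for PUT <ROLE_MAPPING_PATH>.

    backend_roles lists the IAM role ARNs that should be treated as
    members of the OpenSearch role. Duplicates are dropped.
    """
    return {"backend_roles": sorted(set(role_arns))}


if __name__ == "__main__":
    # Placeholder ARNs, for illustration only.
    body = build_role_mapping(
        [
            "arn:aws:iam::111122223333:role/example-connector-role",
            "arn:aws:iam::111122223333:role/example-embedding-role",
        ]
    )
    print("PUT", ROLE_MAPPING_PATH)
    print(body)
```

Note that a PUT replaces the existing mapping for the role, so any backend roles already mapped need to be included in the list.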

Create the connector

You create a connector with configuration that tells OpenSearch how to connect, provides credentials for the target model host, and provides prompt details. For more information, see Creating connectors for third-party ML platforms.

  1. Examine the code in create_connector.py.
  2. Return to the command line and execute the script:
python create_connector.py
  3. Execute the command line from the script’s output to set the DEEPSEEK_CONNECTOR_ID environment variable.

The script will create the connector to call the SageMaker endpoint and return the connector ID. The connector is an OpenSearch construct that tells OpenSearch how to connect to an external model host. You don’t use it directly; you create an OpenSearch model for that.
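
For orientation, a connector definition is a JSON document with three parts: how to sign requests, which role to use, and what the request to the model host looks like. The following sketch builds such a document in the shape used by OpenSearch connector blueprints for SageMaker. It is an approximation under stated assumptions, not the body that create_connector.py sends: the name, description, and request_body template are illustrative.

```python
import json


def build_connector_body(role_arn, region, endpoint_url):
    """Build a request body for POST /_plugins/_ml/connectors/_create."""
    return {
        "name": "DeepSeek on SageMaker (example)",
        "description": "Sketch of a connector to a SageMaker endpoint",
        "version": 1,
        # Requests to SageMaker are signed with AWS Signature Version 4.
        "protocol": "aws_sigv4",
        # The role OpenSearch Service assumes to invoke the endpoint.
        "credential": {"roleArn": role_arn},
        "parameters": {"region": region, "service_name": "sagemaker"},
        "actions": [
            {
                "action_type": "predict",
                "method": "POST",
                "headers": {"content-type": "application/json"},
                "url": endpoint_url,
                # Assumed template: OpenSearch substitutes ${parameters.inputs}
                # with the "inputs" value passed to the _predict API.
                "request_body": '{ "inputs": "${parameters.inputs}" }',
            }
        ],
    }


if __name__ == "__main__":
    body = build_connector_body(
        "arn:aws:iam::111122223333:role/example-invoke-role",  # placeholder
        "us-east-1",
        "https://runtime.sagemaker.us-east-1.amazonaws.com/endpoints/example/invocations",
    )
    print(json.dumps(body, indent=2))
```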

Create an OpenSearch model

When you work with machine learning (ML) models in OpenSearch, you use OpenSearch’s ml-commons plugin to create a model. An ML model is an OpenSearch abstraction that lets you perform ML tasks like sending text for embeddings during indexing, or calling out to a large language model (LLM) to generate text in a search pipeline. The model interface provides you with a model ID in a model group that you then use in your ingest pipelines and search pipelines.

  1. Examine the code in create_deepseek_model.py.
  2. Return to the command line and execute the script:
python create_deepseek_model.py
  3. Execute the command line from the script’s output to set the DEEPSEEK_MODEL_ID environment variable.

You created an OpenSearch ML model group and model that you can use to create ingest and search pipelines. The _register API places the model in the model group and references your SageMaker endpoint through the connector (connector_id) you created.
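
The two calls involved can be sketched as plain request bodies. This is an illustration of the ml-commons model group and register APIs, assuming a remote model that is deployed at registration time. The names and descriptions are made up, and create_deepseek_model.py may differ in its details.

```python
# ml-commons API paths used for the two calls.
MODEL_GROUP_PATH = "_plugins/_ml/model_groups/_register"
# deploy=true asks OpenSearch to deploy the model as part of registration.
MODEL_REGISTER_PATH = "_plugins/_ml/models/_register?deploy=true"


def build_model_group_body(name):
    """Body for POST <MODEL_GROUP_PATH>; the response carries a model_group_id."""
    return {"name": name, "description": "Model group for the DeepSeek example"}


def build_register_body(name, model_group_id, connector_id):
    """Body for POST <MODEL_REGISTER_PATH>; the response carries the model ID."""
    return {
        "name": name,
        # "remote" marks a model hosted outside the OpenSearch cluster.
        "function_name": "remote",
        "description": "DeepSeek hosted on SageMaker, reached through a connector",
        "model_group_id": model_group_id,
        "connector_id": connector_id,
    }


if __name__ == "__main__":
    print(build_model_group_body("example_deepseek_group"))
    # The two IDs below are placeholders for values returned by earlier calls.
    print(build_register_body("example_deepseek_model", "<group id>", "<connector id>"))
```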

Verify your setup

You can run a query to verify your setup and make sure that you can connect to DeepSeek on SageMaker and receive generated text. Complete the following steps:

  1. On the OpenSearch Service console, choose Dashboard under Managed clusters in the navigation pane.
  2. Choose your domain’s dashboard.

Amazon OpenSearch Service console on the AWS console showing where to click to reveal a domain’s details

  3. Choose the OpenSearch Dashboards URL (dual stack) link to open OpenSearch Dashboards.
  4. Log in to OpenSearch Dashboards with your primary user name and password.
  5. Dismiss the welcome dialog by choosing Explore on my own.
  6. Dismiss the new look and feel dialog.
  7. Confirm the global tenant in the Select your tenant dialog.
  8. Navigate to the Dev Tools tab.
  9. Dismiss the welcome dialog.

You can also get to Dev Tools by expanding the navigation menu (three lines) to reveal the navigation pane, and scrolling down to Dev Tools.

OpenSearch Dashboards home screen, with an indicator on where to click to open the Dev Tools tab

The Dev Tools page provides a left pane where you enter REST API calls. When you run a command, the right pane shows its output. Enter the following command in the left pane, replace <your model ID> with the ID of the model you created, and run the command by placing the cursor anywhere in the command and choosing the run icon.

POST _plugins/_ml/models/<your model ID>/_predict
{
  "parameters": {
    "inputs": "Hello"
  }
}

You should see output like the following screenshot.

Congratulations! You’ve now created and deployed an ML model that can use the connector you created to call to your SageMaker endpoint, and use DeepSeek to generate text. Next, you will use your model in an OpenSearch search pipeline to automate a RAG workflow.

Set up a RAG workflow

RAG is a way of adding information to the prompt so that the LLM generating the response is more accurate. An overall generative application like a chatbot orchestrates a call to external knowledge bases and augments the prompt with knowledge from those sources. We’ve created a small knowledge base comprising population information.

OpenSearch provides search pipelines, which are sets of OpenSearch search processors that are applied to the search request sequentially to build a final result. OpenSearch has processors for hybrid search, reranking, and RAG, among others. You define your processor and then send your queries to the pipeline. OpenSearch responds with the final result.

When you build a RAG application, you choose a knowledge base and a retrieval mechanism. In most cases, you will use an OpenSearch Service vector database as a knowledge base, performing a k-nearest neighbor (k-NN) search to incorporate semantic information in the retrieval with vector embeddings. OpenSearch Service provides integrations with vector embedding models hosted in Amazon Bedrock and SageMaker (among other options).

Make sure that your domain is running OpenSearch 2.9 or later, and that fine-grained access control is enabled for the domain. Then complete the following steps:

  1. On the OpenSearch Service console, choose Integrations in the navigation pane.
  2. Choose Configure domain under Integration with text embedding models through Amazon SageMaker.

  1. Choose Configure public domain. If you created a virtual private cloud (VPC) domain instead, choose Configure VPC domain.

You will be redirected to the AWS CloudFormation console.

  1. For Amazon OpenSearch Endpoint, enter your endpoint.
  2. Leave everything else as default values.

The CloudFormation stack requires a role to create a connector to the all-MiniLM-L6-v2 model, hosted on SageMaker, called LambdaInvokeOpenSearchMLCommonsRole. You enabled access for this role when you ran setup_opensearch_security.py. If you changed the name in that script, be sure to change it in the Lambda Invoke OpenSearch ML Commons Role Name field.

  1. Select I acknowledge that AWS CloudFormation might create IAM resources with custom names, and choose Create stack.

For simplicity, we’ve elected to use the open source all-MiniLM-L6-v2 model, hosted on SageMaker for embedding generation. To achieve high search quality for production workloads, you should fine-tune lightweight models like all-MiniLM-L6-v2, or use OpenSearch Service integrations with models such as Cohere Embed V3 on Amazon Bedrock or Amazon Titan Text Embedding V2, which are designed to deliver high out-of-the-box quality.

Wait for CloudFormation to deploy your stack and for the status to change to CREATE_COMPLETE.

  1. Choose the stack’s Outputs tab on the CloudFormation console and copy the value for ModelID.

The AWS CloudFormation console showing the template results for the integration template and where to find the model ID

You will use this model ID to connect with your embedding model.

  1. Examine the code in load_data.py.
  2. Return to the command line and set an environment variable with the model ID of the embedding model:
export EMBEDDING_MODEL_ID='<the model ID from CloudFormation’s output>'
  3. Execute the script to load data into your domain:
python load_data.py

The script creates the population_data index and an OpenSearch ingest pipeline that calls SageMaker using the connector referenced by the embedding model ID. The ingest pipeline’s field mapping tells OpenSearch the source and destination fields for each document’s embedding.
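
The relationship between the ingest pipeline and the index mapping can be sketched as two request bodies. This is an illustration rather than the contents of load_data.py: the field names and the pipeline name are hypothetical, while the 384-dimension vector size reflects the all-MiniLM-L6-v2 model used in this walkthrough.

```python
# all-MiniLM-L6-v2 produces 384-dimensional embeddings.
EMBEDDING_DIMENSION = 384


def build_ingest_pipeline(embedding_model_id, source_field, vector_field):
    """Body for PUT /_ingest/pipeline/<name>."""
    return {
        "description": "Create an embedding for each document at index time",
        "processors": [
            {
                "text_embedding": {
                    "model_id": embedding_model_id,
                    # field_map: read text from source_field, write the
                    # resulting vector to vector_field.
                    "field_map": {source_field: vector_field},
                }
            }
        ],
    }


def build_index_body(pipeline_name, source_field, vector_field):
    """Body for PUT /<index name>, wired to the ingest pipeline above."""
    return {
        "settings": {"index": {"knn": True, "default_pipeline": pipeline_name}},
        "mappings": {
            "properties": {
                source_field: {"type": "text"},
                vector_field: {"type": "knn_vector", "dimension": EMBEDDING_DIMENSION},
            }
        },
    }


if __name__ == "__main__":
    # Hypothetical field and pipeline names, for illustration only.
    print(build_ingest_pipeline("<embedding model id>", "description", "description_embedding"))
    print(build_index_body("example_embedding_pipeline", "description", "description_embedding"))
```

The destination field in field_map and the knn_vector field in the mapping have to be the same name, and the mapping's dimension has to match the embedding model's output size.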

Now that you have your knowledge base prepared, you can run a RAG query.

  1. Examine the code in run_rag.py.
  2. Return to the command line and execute the script:
python run_rag.py

The script creates a search pipeline with an OpenSearch retrieval_augmented_generation processor. The processor automates running an OpenSearch k-NN query to retrieve relevant information and adding that information to the prompt. It uses the generation_model_id and connector to the DeepSeek model on SageMaker to generate a text response for the user’s question. The OpenSearch neural query (line 55 of run_rag.py) takes care of generating the embedding for the k-NN query using the embedding_model_id. In the ext section of the query, you provide the user’s question for the LLM. The llm_model is set to bedrock/claude because the parameterization and actions are the same as they are for DeepSeek. You’re still using DeepSeek to generate text.
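
As a rough picture of the two pieces described above, the following sketch builds a search pipeline body and a matching query. It follows the documented shape of the retrieval_augmented_generation processor and the neural query, but the field names, prompts, tag, and defaults are assumptions and will not match run_rag.py line for line.

```python
def build_rag_pipeline(generation_model_id, context_field):
    """Body for PUT /_search/pipeline/<name>."""
    return {
        "response_processors": [
            {
                "retrieval_augmented_generation": {
                    "tag": "example_rag",
                    "description": "Send retrieved documents to the LLM",
                    # The OpenSearch model that fronts DeepSeek on SageMaker.
                    "model_id": generation_model_id,
                    # Document fields whose contents are added to the prompt.
                    "context_field_list": [context_field],
                    "system_prompt": "You are a helpful assistant.",
                    "user_instructions": "Answer using only the search results.",
                }
            }
        ]
    }


def build_rag_query(question, embedding_model_id, vector_field, k=5):
    """Body for GET /<index>/_search?search_pipeline=<name>."""
    return {
        # The neural query embeds the question and runs a k-NN search.
        "query": {
            "neural": {
                vector_field: {
                    "query_text": question,
                    "model_id": embedding_model_id,
                    "k": k,
                }
            }
        },
        # The ext section carries the question for the LLM.
        "ext": {
            "generative_qa_parameters": {
                "llm_model": "bedrock/claude",
                "llm_question": question,
                "context_size": k,
            }
        },
    }


if __name__ == "__main__":
    print(build_rag_pipeline("<generation model id>", "description"))
    print(build_rag_query("How did the population change?", "<embedding model id>", "description_embedding"))
```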

Examine the output from OpenSearch Service. The user asked the question “What’s the population increase of New York City from 2021 to 2023? How is the trending comparing with Miami?” The first portion of the result shows the hits—documents OpenSearch retrieved from the semantic query—as the population statistics for New York City and Miami. The next section of the response includes the prompt, as well as DeepSeek’s answer.

Okay, so I need to figure out the population increase of New York City from 2021 to 2023 and compare it with Miami's growth. 
Let me start by looking at the data provided in the search results.
From SEARCH RESULT 2, I see that in 2021, NYC had a population of 18,823,000. 
In 2022, it was 18,867,000, and in 2023, it's 18,937,000. 
So, the increase from 2021 to 2022 is 18,867,000 - 18,823,000 = 44,000. 
Then from 2022 to 2023, it's 18,937,000 - 18,867,000 = 70,000. 
Adding those together, the total increase from 2021 to 2023 is 44,000 + 70,000 = 114,000.
Now, looking at Miami's data in SEARCH RESULT 1. In 2021, Miami's population was 6,167,000, in 2022 it was 6,215,000, and in 2023 it's 6,265,000. 
The increase from 2021 to 2022 is 6,215,000 - 6,167,000 = 48,000. From 2022 to 2023, it's 6,265,000 - 6,215,000 = 50,000. 
So, the total increase is 48,000 + 50,000 = 98,000.
Comparing the two, NYC's increase of 114,000 is higher than Miami's 98,000. 
So, NYC's population increased more over that period.
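
Because LLM arithmetic is worth double-checking, here is a short script that recomputes the totals from the figures quoted in the response above. The population values are copied from that output. The percentage calculation is an extra step added for comparison and is not something the model reported.

```python
# Population figures as quoted in the generated response above.
POPULATION = {
    "New York City": {2021: 18_823_000, 2022: 18_867_000, 2023: 18_937_000},
    "Miami": {2021: 6_167_000, 2022: 6_215_000, 2023: 6_265_000},
}


def increase(city, start, end):
    """Absolute population change between two years."""
    return POPULATION[city][end] - POPULATION[city][start]


def percent_increase(city, start, end):
    """Population change relative to the starting year, in percent."""
    return 100 * increase(city, start, end) / POPULATION[city][start]


if __name__ == "__main__":
    for city in POPULATION:
        change = increase(city, 2021, 2023)
        pct = percent_increase(city, 2021, 2023)
        print(f"{city}: +{change:,} ({pct:.2f}%)")
```

The absolute increases match the model's answer (114,000 and 98,000). Relative to each city's 2021 population, the same figures work out to roughly 0.61% for New York City and 1.59% for Miami, so which city "grew more" depends on whether you compare absolute or relative change.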

Congratulations! You’ve connected to an embedding model, created a knowledge base, and used that knowledge base, along with DeepSeek, to generate a text response to a question on population changes in New York City and Miami. You can adapt the code from this post to create your own knowledge base and run your own queries.

Clean up

To avoid incurring additional charges, clean up the resources you deployed:

  1. Delete the SageMaker deployment of DeepSeek. For instructions, see Cleaning Up.
  2. If your Jupyter notebook has lost context, you can delete the endpoint:
    1. On the SageMaker console, under Inference in the navigation pane, choose Endpoints.
    2. Select your endpoint and choose Delete.
  3. Delete the CloudFormation stack that connects to SageMaker for the embedding model.
  4. Delete the OpenSearch Service domain you created.

Conclusion

The OpenSearch connector framework is a flexible way for you to access models you host on other platforms. In this example, you connected to the open source DeepSeek model that you deployed on SageMaker. DeepSeek’s reasoning capabilities, augmented with a knowledge base in the OpenSearch Service vector engine, enabled it to answer a question comparing population growth in New York and Miami.

Find out more about AI/ML capabilities of OpenSearch Service, and let us know how you are using DeepSeek and other generative models to build!


About the Authors

Jon Handler is the Director of Solutions Architecture for Search Services at Amazon Web Services, based in Palo Alto, CA. Jon works closely with OpenSearch and Amazon OpenSearch Service, providing help and guidance to a broad range of customers who have search and log analytics workloads for OpenSearch. Prior to joining AWS, Jon’s career as a software developer included four years of coding a large-scale ecommerce search engine. Jon holds a Bachelor of the Arts from the University of Pennsylvania, and a Master of Science and a Ph.D. in Computer Science and Artificial Intelligence from Northwestern University.

Yaliang Wu is a Software Engineering Manager at AWS, focusing on OpenSearch projects, machine learning, and generative AI applications.

Learning AWS best practices from Amazon Q in the Console

Post Syndicated from Brendan Jenkins original https://aws.amazon.com/blogs/devops/learning-aws-best-practices-from-amazon-q-in-the-console/

Operators, administrators, developers, and other AWS users regularly run into common issues in the AWS console, such as missing permissions or bugs in AWS Lambda code. To help with these situations, AWS released Amazon Q, a generative AI-powered assistant that helps write code, answer questions, generate content, solve problems, manage AWS resources, and take actions. One component of Amazon Q is Amazon Q Developer. To learn about AWS services, you can chat with Amazon Q Developer in the AWS Management Console, the AWS Console Mobile Application, AWS websites, AWS Documentation websites, and chat channels integrated with AWS Chatbot. You can ask Q Developer about best practices, recommendations, step-by-step instructions for AWS tasks, and architecting your AWS resources and workflows.

In this blog post, we will highlight best practices for interacting with Q Developer in the console including topics such as using Q Developer in the console to generate code snippets, architect workloads, and understand your costs.

Prerequisites

To follow along with these examples, the following prerequisites are required:

Overview

Here are some of the examples on how Amazon Q Developer in the console can be utilized:

Note: Amazon Q in the console may generate output that differs from the examples below because its responses are non-deterministic.

To start, access the Amazon Q Developer service by signing into the AWS console and clicking the Amazon Q icon on the right-hand panel as shown below in Figure 1. Authenticate if necessary:

Figure 1: Accessing Amazon Q Chat

Use Q to learn about AWS services and best practices

In this section, we will look at how Amazon Q can help you learn about AWS services and outline the best practices for using them.

Learn about services available in AWS

Whether you are just learning or are an experienced user, Amazon Q provides a simple way to discover AWS capabilities and get helpful information whenever needed.
For example, if you want to learn how to automatically scale your compute instances based on a metric, you can ask Amazon Q in the console.

Sample prompt –
I need to set a autoscaling group for EC2 instances with this requirement, if
CPU utilization goes above 50% for 5 minutes then it should add new instance
and if CPU utilization drops below 50% for 5 minutes then it should delete 1
instance.

User entering prompt about setting up an auto scaling compute Instance based on a specific metric and threshold and Q generating a response.

Figure 2: User Prompt and Response from Amazon Q for Auto Scaling Compute Instance based on a specific metric and threshold

The response from Amazon Q lists the steps to set up an Auto Scaling group based on the requirements provided in the prompt.

Ask specific questions about AWS services

Amazon Q in the console can also help you run your systems in line with best practices. With natural language prompts, you can learn the best practices for the AWS services you use.

Sample Prompt –

I am using API Gateway for REST APIs. I have configured timeout for requests. I would like to learn additional best practices to reduce and handle long running requests.

User entering prompt to Q about keeping compute costs low and Q generating a response.

Figure 3: User Prompt and Response from Amazon Q keeping compute costs low

As shown above in Figure 3, Amazon Q summarizes several ways to handle long-running requests in Amazon API Gateway, such as implementing timeouts and using asynchronous invocation for long-running operations where possible.

Use Q to generate code snippets or scripts to automate tasks using AWS SDK or AWS CLI

Developers or System Administrators can use Q to generate code snippets or scripts to automate tasks instead of spending time going through documentation.

How to write an AWS Lambda function to read data from S3

For example, a developer may want to write an AWS Lambda function that reads data from an Amazon S3 bucket but not know how to get started. Amazon Q can help.

User Prompt and Response from Amazon Q on instructions on how to write the lambda function

Figure 4: User Prompt and Response from Amazon Q on instructions to write a Lambda function

Figure 5: Response from Amazon Q with sample code to write a Lambda function

As shown above in Figures 4 and 5, Amazon Q returns step-by-step instructions on how to write the Lambda function, including sample code for reference.
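
Since the sample code appears only in the screenshots, here is a minimal sketch of what such a function could look like. It is not Amazon Q's output. It uses the boto3 S3 client's get_object call, and the event shape (bucket and key fields) and the helper name read_text_object are assumptions made for illustration.

```python
import json


def read_text_object(s3_client, bucket, key):
    """Fetch an S3 object and decode its contents as UTF-8 text."""
    response = s3_client.get_object(Bucket=bucket, Key=key)
    return response["Body"].read().decode("utf-8")


def lambda_handler(event, context):
    # boto3 ships with the Lambda Python runtime. It is imported here so the
    # helper above can be exercised locally with a stand-in client.
    import boto3

    # Assumption: the caller passes the bucket name and object key in the event.
    text = read_text_object(boto3.client("s3"), event["bucket"], event["key"])
    return {
        "statusCode": 200,
        "body": json.dumps({"characters": len(text)}),
    }
```

The function's execution role needs s3:GetObject permission on the bucket for the call to succeed.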

How do I upload a file to an S3 bucket using the AWS CLI?

If a developer or IT professional uses the AWS CLI frequently but struggles to find the right commands for a task, Amazon Q can help.

Sample Prompt

How do I upload a file to an S3 bucket using the AWS CLI?

User Prompt and Response from Q with CLI command to upload a file to an S3 bucket

Figure 6: User Prompt and Response from Q with CLI command to upload file to S3 bucket

As shown above in Figure 6, Amazon Q returns the CLI command to upload a file to an S3 bucket. The response also suggests the command to verify the upload.

Use Console-to-Code to write code to automate use of other services

Console-to-Code records your console actions, then uses generative AI to suggest code in your preferred format. It currently supports CLI commands, CDK Java, CDK Python, CDK TypeScript, and CloudFormation JSON/YAML.

Let’s say a developer wants to generate a CloudFormation YAML template with an Amazon EC2 instance and an Amazon RDS database instance. To do this, the developer can go to the Amazon EC2 and Amazon RDS consoles, choose the Console-to-Code icon on the right side, and choose Start Recording.

As shown in the figures below, after the Amazon EC2 instance and the Amazon RDS DB instance are launched, stop the recording and download the CloudFormation YAML template.

Figure 7: Console-to-Code recording Amazon EC2 and Amazon RDS Database instance launch

Figure 8: Generate Infrastructure-As-Code from the Console-to-Code recording

Figure 9: Generated CloudFormation YAML template from the Console-to-Code recording

Use Q to architect a workload & optimize

In this section, we will focus on leveraging Amazon Q to architect new workloads and optimize existing workloads.

Architecting Workloads

Now, a developer wants to build a new financial services application with the following requirements:

  • The architecture must be serverless and event driven.
  • It must provide functionality such as deposits and withdrawals.
  • It should use low-cost, efficient AWS services.
  • It should scale with traffic spikes that can reach thousands of requests per second.
  • It should support REST APIs.

Let’s prompt Amazon Q Developer by copying the following prompt and pasting it into the chat window:

Sample Prompt:

How can I build a new financial services application with the following requirements: it should be serverless and event driven, must provide functionality as deposits and withdrawals, It should use low-cost efficient AWS services, It should scale with traffic spikes that can reach thousands of requests per second, and lastly it should support REST APIs.

Amazon Q should provide a response on how to architect this workload. Here is a sample response received from Amazon Q:

Figure 10: Response from Amazon Q for architecture

Amazon Q was able to provide the following response in full:

________________________________________________

To build a new financial services application with the requirements you described, I would recommend the following AWS services and architecture:

  1. Serverless and Event-Driven Architecture:
    • Use AWS Lambda for the core application logic, which will allow you to run your code without provisioning or managing servers.
    • Integrate AWS Lambda with other serverless services like Amazon API Gateway for the REST API, Amazon DynamoDB for the database, and Amazon SNS/SQS for event-driven messaging.
  2. Deposits and Withdrawals Functionality:
    • Use Amazon DynamoDB as the database to store customer account information and transaction details.
    • Implement the deposit and withdrawal logic in AWS Lambda functions, which can be triggered by events from Amazon API Gateway or Amazon SNS/SQS.
  3. Low-Cost and Efficient AWS Services:
    • Leverage AWS Lambda, which charges only for the compute time you consume, and Amazon DynamoDB, which provides a pay-per-request pricing model.
    • Use Amazon API Gateway’s pay-per-call pricing to keep costs low for the REST API.
    • Consider using Amazon SQS and Amazon SNS for their low-cost, highly available, and scalable messaging capabilities.
  4. Scalability and High-Performance:
    • AWS Lambda and the other serverless services you choose will automatically scale to handle traffic spikes of thousands of requests per second.
    • You can further optimize performance by using features like Amazon API Gateway’s caching, Amazon DynamoDB’s auto-scaling, and Amazon SQS’s message batching.
  5. REST API Support:
    • Integrate your application logic in AWS Lambda with Amazon API Gateway to expose a RESTful API. Amazon API Gateway will handle the API management, including routing, authentication, and throttling.

To get started, I recommend reviewing the following AWS documentation:

Please let me know if you have any other questions!
Sources:
Key AWS services – Financial Services Industry Lens
Harnessing the scale of AWS for financial simulations | AWS HPC Blog
RPC vs REST – Difference Between API Architectures – AWS

________________________________________________

As shown above, Amazon Q provided service recommendations for our architecture based on the stated requirements, along with links to the recommended services.

Optimizing workloads

Now, let’s say a developer or IT professional has an architecture built around Amazon EC2, with an instance called Server-1-demo deployed in AWS, and wants to optimize it to save on costs.

Similar to the previous section, open a new chat window within the Amazon Q chat in the console and enter the following prompt:

Sample Prompt:

Based on the current CPU utilization of my EC2 Server-1-demo, what do you recommend I do to cost optimize?

As a result, Q provides the following response:

Prompt to Q about the optimization of an EC2 instance based on CPU utilization and Response from Amazon Q

Figure 11: Optimization Response from Amazon Q

As shown, Amazon Q took the CPU utilization of Server-1-demo into account and, among other recommendations, suggested moving to newer instance types such as those based on AWS Graviton, which is designed to deliver the best price performance for cloud workloads running in Amazon Elastic Compute Cloud (Amazon EC2).

Use Q to understand your costs

Another way to leverage Q in the console is to analyze costs. A developer or IT professional can use Amazon Q to retrieve and analyze cost data from AWS Cost Explorer, asking questions about AWS costs and receiving natural language answers that reflect the actual costs of the AWS account.

Now, open a new Amazon Q chat window in the console and let’s try the following prompt as an example:

Sample Prompt:

Show me the breakdown of EC2 costs by instance type for the last 30 days.

Figure 12: Response from Amazon Q for the breakdown of EC2 costs in the last month

As shown above in Figure 12, Q Developer provides a detailed breakdown of EC2 costs by instance type for the last 30 days.

Now, try another example:

Sample Prompt:

What was my cost breakdown by service for the past three months?

Figure 13: Response from Amazon Q for last 3 months spend analysis

As shown in Figure 13, Amazon Q provides detailed cost breakdowns, including percentages of total spend, making it easy to understand your AWS usage and expenses. This feature allows you to quickly identify your highest-cost services and track spending trends over time. Always verify your cost data with AWS Cost Explorer for the most accurate information. For more information on this capability, check out this blog covering the feature in more detail.

Best practices for using Amazon Q in the console

The previous sections showcased examples of leveraging Amazon Q capabilities for AWS application architecture and account management. In both cases, the input given to Amazon Q directly affects the quality of its output. Your question should be concise and clear, and it should contain the details the tool needs to understand the scenario and what should be answered. The practice of crafting effective input for a generative AI chatbot is called prompt engineering. By adhering to the following best practices, you can achieve improved outcomes when using Amazon Q:

  • Specify the task you want Q to do: explain a concept, compare services, list pros and cons, generate a CLI command, generate a code snippet, suggest architecture options for a scenario.
  • Provide context: Why do you need to know this concept? For which part of your application or architecture will you apply the knowledge?

Figure 14: Amazon Q asks for additional details to better answer the question.

In this example, we asked for scenarios, which specifies the type of answer we want. We also specified the service we want scenarios about and the edge case of the scenario (after instance creation). Amazon Q summarized our question and provided scenarios and sources in accordance with what we asked.

  • Break a series of questions into multiple questions.
  • Ask for one task at a time.
  • Don’t stop at the first answer; keep asking questions that use the information to help Q provide more enriched responses.

Figure 15: Amazon Q uses chat context to give an answer.

In this scenario, the provided input was not enough for Q to provide a good answer, so it asked for more details and considered both inputs as a context to the answer.

  • Be mindful about security-related questions about your account. Amazon Q in the console may not provide answers that address security in your account.
  • Your input can have a maximum of 1,000 characters. This is another reason to be concise when providing input.
  • Create a new conversation if you are going to start a new topic. Unnecessary context reduces the specificity of Q’s answers to your new situation.

Figure 16: Amazon Q does not provide security tips. Create a new conversation. Maximum allowed characters are 1000.

Conclusion

In this blog post, we explored the various ways in which Amazon Q, AWS’s generative AI-powered assistant, is used in the AWS Console to enhance productivity and reduce ramp-up time for developers, DevOps engineers, and architects. Amazon Q functions as an AWS consultant, offering advice on various tasks, such as understanding AWS services and implementing best practices, as well as generating code snippets and automating CLI commands. The tool’s capability to help architect new workloads and enhance existing ones based on specific needs was demonstrated with detailed examples. The importance of prompt engineering (crafting clear, concise prompts to elicit high-quality responses from the AI assistant) was also discussed. By embracing the capabilities of Amazon Q in the console, AWS users can streamline their workflows and speed up their cloud journey. Whether you’re a seasoned cloud architect or starting out, this AI-powered assistant can serve as a partner, helping you navigate the AWS landscape and unlock new levels of efficiency. As you continue exploring the possibilities of Amazon Q, follow the best practices outlined in this post and experiment!

About the authors

Brendan Jenkins

Brendan Jenkins is a Tech Lead Solutions Architect at Amazon Web Services (AWS) working with Enterprise AWS customers providing them with technical guidance and helping achieve their business goals. He has an area of specialization in DevOps and Machine Learning technology.

Renu Yadav

Renu Yadav is a Solutions Architect at Amazon Web Services (AWS), where she works with enterprise-level AWS customers, providing them with technical guidance and helping them achieve their business objectives. Renu has a strong passion for learning, with her area of specialization in DevOps. She leverages her expertise in this domain to assist AWS customers in optimizing their cloud infrastructure and streamlining their software development and deployment processes.

Maria Mendes

Maria Mendes is a Solutions Architect and has been part of the CSC SA team since 2022, working with Small and Medium-sized business customers. Maria’s daily work consists of architecture reviews, providing AWS services best practices guidance, executing workshops with customers, and participating in multi-customer AWS event speaking activities. She is a generalist solutions architect and is also part of a technical field community inside AWS that is focused on DevOps services.

Handle errors in Apache Flink applications on AWS

Post Syndicated from Alexis Tekin original https://aws.amazon.com/blogs/big-data/error-handling-in-apache-flink-applications/

Data streaming applications continuously process incoming data, much like a never-ending query against a database. Unlike traditional database queries where you request data one time and receive a single response, streaming data applications constantly receive new data in real time. This introduces some complexity, particularly around error handling. This post discusses the strategies for handling errors in Apache Flink applications. However, the general principles discussed here apply to stream processing applications at large.

Error handling in streaming applications

When developing stream processing applications, navigating complexities—especially around error handling—is crucial. Fostering data integrity and system reliability requires effective strategies to tackle failures while maintaining high performance. Striking this balance is essential for building resilient streaming applications that can handle real-world demands. In this post, we explore the significance of error handling and outline best practices for achieving both reliability and efficiency.

Before we can talk about how to handle errors in our consumer applications, we first need to consider the two most common types of errors that we encounter: transient and nontransient.

Transient errors, or retryable errors, are temporary issues that usually resolve themselves without requiring significant intervention. These can include network timeouts, temporary service unavailability, or minor glitches that don’t indicate a fundamental problem with the system. The key characteristic of transient errors is that they’re often short-lived and retrying the operation after a brief delay is usually enough to successfully complete the task. We dive deeper into how to implement retries in your system in the following section.

Nontransient errors, on the other hand, are persistent issues that don’t go away with retries and may indicate a more serious underlying problem. These could involve things such as data corruption or business logic violations. Nontransient errors require more comprehensive solutions, such as alerting operators, skipping the problematic data, or routing it to a dead letter queue (DLQ) for manual review and remediation. These errors need to be addressed directly to prevent ongoing issues within the system. For these types of errors, we explore DLQ topics as a viable solution.
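
As a rough illustration of the distinction above, the following sketch classifies a failed call as transient or nontransient. The class name, the enum, and the choice of which HTTP status codes and exception types count as retryable are assumptions made for this example; they are not taken from the sample application.

```java
import java.io.IOException;
import java.util.concurrent.TimeoutException;

// Hypothetical helper: the names and the classification rules are illustrative assumptions.
final class ErrorClassifier {

    enum ErrorKind { TRANSIENT, NONTRANSIENT }

    // For a failed call: treat throttling and server-side failures as temporary,
    // and every other failure status as persistent.
    static ErrorKind classifyStatus(int statusCode) {
        if (statusCode == 429 || (statusCode >= 500 && statusCode <= 599)) {
            return ErrorKind.TRANSIENT;
        }
        return ErrorKind.NONTRANSIENT;
    }

    // Timeouts and I/O failures often resolve on retry; other failures are assumed not to.
    static ErrorKind classifyException(Throwable error) {
        if (error instanceof TimeoutException || error instanceof IOException) {
            return ErrorKind.TRANSIENT;
        }
        return ErrorKind.NONTRANSIENT;
    }
}
```

In practice, the right classification depends on the external system you call, so treat these rules as a starting point rather than a fixed list.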

Retries

As previously mentioned, retries are mechanisms used to handle transient errors by reprocessing messages that initially failed due to temporary issues. The goal of retries is to make sure that messages are successfully processed when the necessary conditions—such as resource availability—are met. By incorporating a retry mechanism, messages that can’t be processed immediately are reattempted after a delay, increasing the likelihood of successful processing.

We explore this approach through the use of an example based on the Amazon Managed Service for Apache Flink retries with Async I/O code sample. The example focuses on implementing a retry mechanism in a streaming application that calls an external endpoint during processing for purposes such as data enrichment or real-time validation.

The application does the following:

  1. Generates data simulating a streaming data source
  2. Makes an asynchronous API call to an Amazon API Gateway or AWS Lambda endpoint, which randomly returns success, failure, or timeout. This call is made to emulate the enrichment of the stream with external data, potentially stored in a database or data store.
  3. Processes the application based on the response returned from the API Gateway endpoint:
    1. If the API Gateway response is successful, processing will continue as normal
    2. If the API Gateway response times out or returns a retryable error, the record will be retried a configurable number of times
  4. Reformats the message in a readable format, extracting the result
  5. Sends messages to the sink topic in our streaming storage layer

In this example, we use an asynchronous request that allows our system to handle many requests and their responses concurrently—increasing the overall throughput of our application. For more information on how to implement asynchronous API calls in Amazon Managed Service for Apache Flink, refer to Enrich your data stream asynchronously using Amazon Kinesis Data Analytics for Apache Flink.

Before we explain the application of retries for the Async function call, here is the AsyncInvoke implementation that will call our external API:

@Override
public void asyncInvoke(IncomingEvent incomingEvent, ResultFuture<ProcessedEvent> resultFuture) {

    // Create a new ProcessedEvent instance
    ProcessedEvent processedEvent = new ProcessedEvent(incomingEvent.getMessage());
    LOG.debug("New request: {}", incomingEvent);

    // Note: The Async Client used must return a Future object or equivalent
    Future<Response> future = client.prepareGet(apiUrl)
            .setHeader("x-api-key", apiKey)
            .execute();

    // Process the request via a Completable Future, in order to not block request synchronously
    // Notice we are passing executor service for thread management
    CompletableFuture.supplyAsync(() ->
        {
            try {
                LOG.debug("Trying to get response for {}", incomingEvent.getId());
                Response response = future.get();
                return response.getStatusCode();
            } catch (InterruptedException | ExecutionException e) {
                LOG.error("Error during async HTTP call: {}", e.getMessage());
                return -1;
            }
        }, org.apache.flink.util.concurrent.Executors.directExecutor()).thenAccept(statusCode -> {
        if (statusCode == 200) {
            LOG.debug("Success! {}", incomingEvent.getId());
            resultFuture.complete(Collections.singleton(processedEvent));
        } else if (statusCode == 500) { // Retryable error
            LOG.error("Status code 500, retrying shortly...");
            resultFuture.completeExceptionally(new Throwable(statusCode.toString()));
        } else {
            LOG.error("Unexpected status code: {}", statusCode);
            resultFuture.completeExceptionally(new Throwable(statusCode.toString()));
        }
    });
}

This example uses an AsyncHttpClient to call an HTTP endpoint that is a proxy to calling a Lambda function. The Lambda function is relatively straightforward, in that it merely returns SUCCESS. Async I/O in Apache Flink allows for making asynchronous requests to an HTTP endpoint for individual records and handles responses as they arrive back to the application. However, Async I/O can work with any asynchronous client that returns a Future or CompletableFuture object. This means that you can also query databases and other endpoints that support this return type. If the client in question makes blocking requests or can’t support asynchronous requests with Future return types, there isn’t any benefit to using Async I/O.

Some helpful notes when defining your Async I/O function:

  • Increasing the capacity parameter in your Async I/O function call will increase the number of in-flight requests. Keep in mind this will cause some overhead on checkpointing, and will introduce more load to your external system.
  • Keep in mind that your external requests are saved in application state. If the resulting object from the Async I/O function call is complex, object serialization may fall back to Kryo serialization which can impact performance.

The Async I/O function can process multiple requests concurrently without waiting for each one to be complete before processing the next. Apache Flink’s Async I/O function provides functionality for both ordered and unordered results when receiving responses back from an asynchronous call, giving flexibility based on your use case.
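
To make the difference between ordered and unordered results concrete, here is a minimal plain-Java sketch that uses CompletableFuture rather than Flink's operators (in Flink, the corresponding entry points are AsyncDataStream.orderedWait and AsyncDataStream.unorderedWait). The class and method names are hypothetical, and the requests are completed manually so that the completion order is under our control.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Plain-Java illustration (not Flink code): the class and method names are hypothetical.
final class EmissionOrderDemo {

    // Unordered: each result is emitted as soon as its request completes.
    static List<String> emitUnordered(List<CompletableFuture<String>> requests,
                                      Runnable completeRequests) {
        List<String> emitted = new ArrayList<>();
        for (CompletableFuture<String> request : requests) {
            // The callback runs when the request completes, whatever the issue order was.
            request.thenAccept(emitted::add);
        }
        completeRequests.run();
        return emitted;
    }

    // Ordered: results are emitted in the order the requests were issued,
    // even if a later request finished first.
    static List<String> emitOrdered(List<CompletableFuture<String>> requests,
                                    Runnable completeRequests) {
        completeRequests.run();
        List<String> emitted = new ArrayList<>();
        for (CompletableFuture<String> request : requests) {
            emitted.add(request.join());
        }
        return emitted;
    }
}
```

If the second request completes before the first, the unordered variant emits the second result first, while the ordered variant still emits results in request order. Ordered emission preserves stream order at the cost of holding back results that finished early.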

Errors during Async I/O requests

In the case that there is a transient error in your HTTP endpoint, there could be a timeout in the Async HTTP request. The timeout could be caused by the Apache Flink application overwhelming your HTTP endpoint, for example. This will, by default, result in an exception in the Apache Flink job, forcing a job restart from the latest checkpoint, effectively retrying all data from an earlier point in time. This restart strategy is expected and typical for Apache Flink applications, built to withstand errors without data loss or reprocessing of data. Restoring from the checkpoint should result in a fast restart with 30 seconds (P90) of downtime.

Because network errors could be temporary, backing off for a period and retrying the HTTP request could have a different result. Network errors could mean receiving an error status code back from the endpoint, but it could also mean not getting a response at all, and the request timing out. We can handle such cases within the Async I/O framework and use an Async retry strategy to retry the requests as needed. Async retry strategies are invoked when the ResultFuture for a request to an external endpoint is completed with an exception, as done through completeExceptionally() in the preceding code snippet. The Async retry strategy is defined as follows:

// async I/O transformation with retry
AsyncRetryStrategy<ProcessedEvent> retryStrategy =
        new AsyncRetryStrategies.FixedDelayRetryStrategyBuilder<ProcessedEvent>(
                3, 1000) // maxAttempts=3, fixed delay between attempts=1000 (in ms)
                .ifResult(RetryPredicates.EMPTY_RESULT_PREDICATE)
                .ifException(RetryPredicates.HAS_EXCEPTION_PREDICATE)
                .build();

When implementing this retry strategy, it’s important to have a solid understanding of the system you will be querying. How will retries impact performance? In the code snippet, we’re using a FixedDelayRetryStrategy that retries a failed request up to three times with a fixed delay of one second between attempts. The FixedDelayRetryStrategy is only one of several available options. Other retry strategies built into Apache Flink’s Async I/O library include the ExponentialBackoffDelayRetryStrategy, which increases the delay between retries exponentially upon every retry. It’s important to tailor your retry strategy to the specific needs and constraints of your target system.
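
The difference between a fixed delay and an exponential backoff can be sketched with plain arithmetic. The helper below is a hypothetical illustration of the two delay schedules, not Flink's implementation; the parameter names and the cap on the maximum delay are assumptions chosen for the example.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper showing delay schedules; this is not Flink's implementation.
final class BackoffSchedule {

    // Fixed delay: every retry waits the same amount of time.
    static List<Long> fixedDelays(int maxAttempts, long delayMillis) {
        List<Long> delays = new ArrayList<>();
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            delays.add(delayMillis);
        }
        return delays;
    }

    // Exponential backoff: each retry waits `multiplier` times longer than the
    // previous one, capped at maxDelayMillis.
    static List<Long> exponentialDelays(int maxAttempts, long initialDelayMillis,
                                        long maxDelayMillis, double multiplier) {
        List<Long> delays = new ArrayList<>();
        long delay = initialDelayMillis;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            delays.add(Math.min(delay, maxDelayMillis));
            delay = (long) Math.min(delay * multiplier, (double) maxDelayMillis);
        }
        return delays;
    }
}
```

For example, fixedDelays(3, 1000) yields three waits of 1,000 ms each, while exponentialDelays(4, 1000, 5000, 2.0) yields waits of 1,000, 2,000, 4,000, and 5,000 ms. The growing delays give an overloaded endpoint more time to recover between attempts.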

Additionally, within the retry strategy, you can optionally define what happens when there are no results returned from the system or when there are exceptions. The retry strategy builder accepts two important predicates through its ifResult and ifException methods.

The ifResult predicate evaluates the returned value and decides whether it should lead to a retry. If the predicate returns true, as EMPTY_RESULT_PREDICATE does for empty or null responses, it will trigger a retry attempt.

The ifException predicate evaluates whether a given exception should lead to a retry. If the predicate returns true for a particular exception, it will initiate a retry. Otherwise, the exception will be propagated and the job will fail.

If there is a timeout, you can override the timeout function within the Async I/O function to return zero results, which will result in a retry in the preceding block. This is also true for exceptions, which will result in retries, depending on the logic you use to trigger the completeExceptionally() function.
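
The idea of turning a timeout into an empty result can be sketched in plain Java. In a Flink job you would override the timeout method of the async function; the stand-in below instead uses CompletableFuture.completeOnTimeout (available since Java 9), and its class and method names are hypothetical.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

// Plain-Java stand-in for a timeout handler; class and method names are hypothetical.
final class TimeoutToEmptyResult {

    // If the request has not completed within timeoutMillis, complete it with an
    // empty collection instead of failing, so that an "empty result" check can
    // decide whether to retry.
    static <T> CompletableFuture<Collection<T>> withEmptyOnTimeout(
            CompletableFuture<Collection<T>> request, long timeoutMillis) {
        return request.completeOnTimeout(
                Collections.<T>emptyList(), timeoutMillis, TimeUnit.MILLISECONDS);
    }
}
```

A request that completes in time keeps its own result; only a request that is still pending after the timeout is completed with the empty collection.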

By carefully configuring these predicates, you can fine-tune your retry logic to handle various scenarios, such as timeouts, network issues, or specific application-level exceptions, making sure your asynchronous processing is robust and efficient.
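
The way a result predicate and an exception predicate drive retries can be sketched as a simple loop. This is a simplified, synchronous illustration with hypothetical names; Flink's Async I/O operator schedules retries asynchronously and applies the configured delay, both of which are omitted here.

```java
import java.util.Collection;
import java.util.function.Predicate;
import java.util.function.Supplier;

// Simplified, synchronous illustration of predicate-driven retries (not Flink code).
final class PredicateRetry {

    static <T> Collection<T> callWithRetry(Supplier<Collection<T>> call,
                                           int maxRetries,
                                           Predicate<Collection<T>> retryOnResult,
                                           Predicate<Throwable> retryOnException) {
        int retries = 0;
        while (true) {
            try {
                Collection<T> result = call.get();
                // Accept the result unless the result predicate asks for a retry
                // and there are retries left.
                if (!retryOnResult.test(result) || retries >= maxRetries) {
                    return result;
                }
            } catch (RuntimeException e) {
                // Propagate unless the exception predicate asks for a retry
                // and there are retries left.
                if (!retryOnException.test(e) || retries >= maxRetries) {
                    throw e;
                }
            }
            retries++;
        }
    }
}
```

With a result predicate that matches empty collections and an exception predicate that matches every exception, a call that first throws, then returns nothing, and finally returns a value succeeds on the third invocation. With an exception predicate that never matches, the first exception is propagated immediately.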

One key factor to keep in mind when implementing retries is the potential impact on overall system performance. Retrying operations too aggressively or with insufficient delays can lead to resource contention and reduced throughput. Therefore, it’s crucial to thoroughly test your retry configuration with representative data and loads to make sure you strike the right balance between resilience and efficiency.

A full code sample can be found at the amazon-managed-service-for-apache-flink-examples repository.

Dead letter queue

Although retries are effective for managing transient errors, not all issues can be resolved by reattempting the operation. Nontransient errors, such as data corruption or validation failures, persist despite retries and require a different approach to protect the integrity and reliability of the streaming application. In these cases, the concept of DLQs comes into play as a vital mechanism for capturing and isolating individual messages that can’t be processed successfully.

DLQs are intended to handle nontransient errors affecting individual messages, not system-wide issues, which require a different approach. Additionally, the use of DLQs might impact the order of messages being processed. In cases where processing order is important, implementing a DLQ may require a more detailed approach to make sure it aligns with your specific business use case.

Data corruption can’t be handled in the source operator of the Apache Flink application and will cause the application to fail and restart from the latest checkpoint. This issue will persist unless the message is handled outside of the source operator, downstream in a map operator or similar. Otherwise, the application will continue retrying and retrying.
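
One way to picture handling corrupt data downstream is a parsing step that never throws for bad input. The sketch below assumes the source passes the payload through as raw bytes and that a valid message is an integer; both are arbitrary stand-ins for real deserialization and validation, and all names are hypothetical.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical downstream parsing step; in a Flink job this logic would sit in a
// map or process function after a source that emits raw bytes.
final class SafeParser {

    static final class Outcome {
        final Integer value;   // parsed value, or null if parsing failed
        final String error;    // failure reason, or null on success

        Outcome(Integer value, String error) {
            this.value = value;
            this.error = error;
        }

        boolean isError() {
            return error != null;
        }
    }

    // Never throws for bad input: a corrupt record becomes an error outcome that
    // can be routed to a dead letter sink instead of failing the job.
    static Outcome parse(byte[] payload) {
        if (payload == null) {
            return new Outcome(null, "missing payload");
        }
        String text = new String(payload, StandardCharsets.UTF_8).trim();
        try {
            return new Outcome(Integer.valueOf(text), null);
        } catch (NumberFormatException e) {
            return new Outcome(null, "not a number: " + text);
        }
    }
}
```

Because the failure is captured as data rather than raised as an exception, the job does not restart on a corrupt record, and the error outcome can be inspected and routed separately.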

In this section, we focus on how DLQs in the form of a dead letter sink can be used to separate messages from the main processing application and isolate them for a more focused or manual processing mechanism.

Consider an application that is receiving messages, transforming the data, and sending the results to a message sink. If a message is identified by the system as corrupt, and therefore can’t be processed, merely retrying the operation won’t fix the issue. This could result in the application getting stuck in a continuous loop of retries and failures. To prevent this from happening, such messages can be rerouted to a dead letter sink for further downstream exception handling.

This implementation results in our application having two different sinks: one for successfully processed messages (sink-topic) and one for messages that couldn’t be processed (exception-topic), as shown in the following diagram. To achieve this data flow, we need to “split” our stream so that each message goes to its appropriate sink topic. To do this in our Flink application, we can use side outputs.

The diagram demonstrates the DLQ concept through Amazon Managed Streaming for Apache Kafka topics and an Amazon Managed Service for Apache Flink application. However, this concept can be implemented through other AWS streaming services such as Amazon Kinesis Data Streams.

Flink writing to an exception topic and a sink topic while reading from MSK

Side outputs

Using side outputs in Apache Flink, you can direct specific parts of your data stream to different logical streams based on conditions, enabling the efficient management of multiple data flows within a single job. In the context of handling nontransient errors, you can use side outputs to split your stream into two paths: one for successfully processed messages and another for those requiring additional handling (i.e. routing to a dead letter sink). The dead letter sink, often external to the application, means that problematic messages are captured without disrupting the main flow. This approach maintains the integrity of your primary data stream while making sure errors are managed efficiently and in isolation from the overall application.

The following shows how to implement side outputs into your Flink application.

Suppose you have a map transformation that identifies poison messages and produces a stream of tuples:

// Validate stream for invalid messages
SingleOutputStreamOperator<Tuple2<IncomingEvent, ProcessingOutcome>> validatedStream = source
        .map(incomingEvent -> {
            ProcessingOutcome result = "Poison".equals(incomingEvent.message)
                    ? ProcessingOutcome.ERROR
                    : ProcessingOutcome.SUCCESS;
            return Tuple2.of(incomingEvent, result);
        }, TypeInformation.of(new TypeHint<Tuple2<IncomingEvent, ProcessingOutcome>>() {
        }));

Based on the processing result, you know whether you want to send this message to your dead letter sink or continue processing it in your application. Therefore, you need to split the stream to handle the message accordingly:

// Create an invalid events tag
private static final OutputTag<IncomingEvent> invalidEventsTag = new OutputTag<IncomingEvent>("invalid-events") {};

// Split the stream based on validation
SingleOutputStreamOperator<IncomingEvent> mainStream = validatedStream
        .process(new ProcessFunction<Tuple2<IncomingEvent, ProcessingOutcome>, IncomingEvent>() {
            @Override
            public void processElement(Tuple2<IncomingEvent, ProcessingOutcome> value, Context ctx,
                    Collector<IncomingEvent> out) throws Exception {
                if (value.f1.equals(ProcessingOutcome.ERROR)) {
                    // Invalid event (true), send to DLQ sink
                    ctx.output(invalidEventsTag, value.f0);
                } else {
                    // Valid event (false), continue processing
                    out.collect(value.f0);
                }
            }
        });


// Retrieve exception stream as Side Output
DataStream<IncomingEvent> exceptionStream = mainStream.getSideOutput(invalidEventsTag);

First create an OutputTag to route invalid events to a side output stream. This OutputTag is a typed and named identifier you can use to separately manage and direct specific events, such as invalid ones, to a distinct stream for further handling.

Next, apply a ProcessFunction to the stream. The ProcessFunction is a low-level stream processing operation that gives access to the basic building blocks of streaming applications. This operation will process each event and decide its path based on its validity. If an event is marked as invalid, it’s sent to the side output stream defined by the OutputTag. Valid events are emitted to the main output stream, allowing for continued processing without disruption.

Then retrieve the side output stream for invalid events using getSideOutput(invalidEventsTag). You can use this to independently access the events that were tagged and send them to the dead letter sink. The remainder of the messages will remain in the mainStream, where they can either continue to be processed or be sent to their respective sink:

// Send messages to appropriate sink
exceptionStream
        .map(value -> String.format("%s", value.message))
        .sinkTo(createSink(applicationParameters.get("DLQOutputStream")));
mainStream
        .map(value -> String.format("%s", value.message))
        .sinkTo(createSink(applicationParameters.get("ProcessedOutputStreams")));

The following diagram shows this workflow.

If a message is not poison, it is routed to the not-poison side of the chart, but if it is, it is routed to the exception stream

A full code sample can be found at the amazon-managed-service-for-apache-flink-examples repository.

What to do with messages in the DLQ

After successfully routing problematic messages to a DLQ using side outputs, the next step is determining how to handle these messages downstream. There isn’t a one-size-fits-all approach for managing dead letter messages. The best strategy depends on your application’s specific needs and the nature of the errors encountered. Some messages might be resolved through specialized applications or automated processing, while others might require manual intervention. Regardless of the approach, it’s crucial to make sure there is sufficient visibility and control over failed messages to facilitate any necessary manual handling.

A common approach is to send notifications through services such as Amazon Simple Notification Service (Amazon SNS), alerting administrators that certain messages weren’t processed successfully. This can help make sure that issues are promptly addressed, reducing the risk of prolonged data loss or system inefficiencies. Notifications can include details about the nature of the failure, enabling quick and informed responses.

Another effective strategy is to store dead letter messages externally from the stream, such as in an Amazon Simple Storage Service (Amazon S3) bucket. By archiving these messages in a central, accessible location, you enhance visibility into what went wrong and provide a long-term record of unprocessed data. This stored data can be reviewed, corrected, and even re-ingested into the stream if necessary.

Ultimately, the goal is to design a downstream handling process that fits your operational needs, providing the right balance of automation and manual oversight.

Conclusion

In this post, we looked at how you can leverage concepts such as retries and dead letter sinks for maintaining the integrity and efficiency of your streaming applications. We demonstrated how you can implement these concepts through Apache Flink code samples highlighting Async I/O and Side Output capabilities:

To supplement, we’ve included the code examples highlighted in this post in the above list. For more details, refer to the respective code samples. It’s best to test these solutions with sample data and known results to understand their respective behaviors.


About the Authors

Alexis Tekin is a Solutions Architect at AWS, working with startups to help them scale and innovate using AWS services. Previously, she supported financial services customers by developing prototype solutions, leveraging her expertise in software development and cloud architecture. Alexis is a former Texas Longhorn, where she graduated with a degree in Management Information Systems from the University of Texas at Austin.

Jeremy Ber has been in the software space for over 10 years with experience ranging from Software Engineering, Data Engineering, Data Science and most recently Streaming Data. He currently serves as a Streaming Specialist Solutions Architect at Amazon Web Services, focused on Amazon Managed Streaming for Apache Kafka (MSK) and Amazon Managed Service for Apache Flink (MSF).

Cyber Security Cloud, Inc. accelerates sales with CloudSmart Insights and Amazon SES

Post Syndicated from Anne Grahn original https://aws.amazon.com/blogs/messaging-and-targeting/cyber-security-cloud-inc-accelerates-sales-with-cloudsmart-insights-and-amazon-ses/

In today’s rapidly evolving digital landscape, effective content curation is essential for businesses to stand out and connect with their target audience.

Optimizing customer outreach can be a difficult task. Sales intelligence can help you use data to understand customer behavior, attract prospects with relevant messaging, and focus sales and marketing efforts where they’ll make the most impact.

Web security service provider Cyber Security Cloud Inc. (CSC) is using CloudSmart Insights and Amazon Simple Email Service (SES) to curate and deliver targeted content, and to drive sales of its web application firewall (WAF) automation service, WafCharm.

What is CloudSmart Insights?
CloudSmart Insights is a go-to-market (GTM) and co-sell intelligence solution for Amazon Web Services (AWS) Marketplace sellers. CloudSmart Insights helps remove guesswork, and the need for manual authoring and analyzing of reports from AWS Marketplace seller operations. With CloudSmart Insights, AWS Marketplace sellers can easily visualize sales and forecasts without the need for custom coding, business intelligence (BI) authoring, or data science skills.

CloudSmart Insights’ private offer feature on the AWS Marketplace empowers other Marketplace sellers to deliver personalized customer experiences tailored to individual needs. By curating targeted messages, CloudSmart Insights can provide their customers with valuable resources, guidance, and access to relevant features, helping to maximize investments from the outset. The feature allows CloudSmart Insights’ customers to create customized rules for cost, quantity, and duration, streamlining both single private offers and large-scale sales plays.

What is Amazon SES?
Amazon Simple Email Service (Amazon SES) is a cloud-based email service provider that can integrate into any application for high-volume email automation. Amazon SES supports a variety of deployments including dedicated, shared, or owned IP addresses. Reports on sender statistics and email deliverability tools can help you make every email count. Whether you use email software to send transactional emails, marketing emails, or promotional emails, you pay only for what you use.

Who is Cyber Security Cloud, Inc.?
CSC provides web application security services powered by advanced artificial intelligence (AI) and global threat intelligence. CSC’s WafCharm is a managed cloud-based web application firewall (WAF) service that seamlessly integrates with AWS WAF to enhance the security of web applications deployed on AWS. WafCharm simplifies the process of configuring, managing, and updating AWS WAF rules, making it easier for your organization to protect web applications from threats.

The opportunity
CSC wanted to increase customer engagement and provide detailed guidance to facilitate the acceptance of private offers from AWS Marketplace. Delivering curated content was a central objective to increase the efficacy of communications. CSC turned to CloudSmart Insights to support customized messaging built on Amazon SES.

The solution
CSC chose CloudSmart Insights’ private offer curation feature to engage with existing and prospective customers using AWS Marketplace. Customers who discover, purchase, and deploy CSC WafCharm now receive personalized communications directly from CloudSmart Insights through Amazon SES.

CSC uses the CloudSmart Insights offer report to preview upcoming renewals, and creates curated messages via the CloudSmart Insights private offer messaging feature. The integration with Amazon SES allows transactional messages to be curated to the customer’s needs, providing additional instructions, resources, and details of the offer. With this flexibility, CSC can manage renewals efficiently and deploy targeted promotional offers that increase engagement with buyers. Amazon SES also allows CSC to confirm that messages are sent from a trusted source.

CloudSmart Insights uses an Amazon QuickSight serverless architecture to allow automatic scaling and meet user requirements, without manual server management. This architecture helps keep dashboards responsive during peak usage periods.

By embedding Amazon QuickSight into CloudSmart Insights, CSC uses the systems they have already found to be effective and decreases the amount of individual configuration needed to examine data. AWS Marketplace provides CSC with APIs for creating and managing catalog products, offers, and agreements. The APIs also provide read-and-write actions to create, list, and manage private offers.

The steps for creating a custom private offer with CloudSmart Insights are fully detailed in this blog post.

The outcome

Integrating CloudSmart Insights with Amazon SES allowed CSC to target specific customer segments based on their interests, purchasing behavior, or demographics, reducing the time taken to send private offers from one hour to 5 minutes per offer extended.

“With CloudSmart Insights, CSC was able to incorporate Amazon SES features such as verified identities into their sales cycle for WafCharm. This helped to improve email deliverability by establishing the authenticity of sellers’ emails, and enhance security by protecting accounts from unauthorized use.” – Takashi Yoshimi, U.S. COO, Cyber Security Cloud Inc.

By tailoring email messages to provide acceptance instructions for individual recipients, CSC increased their closure rate by 5%. Automated email workflows allowed them to nurture leads and drive sales, making it easier for customers to understand the capabilities of WafCharm.

Errors and repetitive work within the CSC marketplace deal desk were reduced, allowing CSC’s customer satisfaction, marketing, and sales teams to gather and analyze areas of customer improvement more efficiently.

Reach the right targets
CloudSmart Insights is available through AWS Marketplace to help your organization create curated private offers, and enhance your AWS Marketplace journey. Visit AWS Marketplace for more information.

To learn more about optimizing email sending, visit Amazon SES. To learn more about CSC WafCharm, please visit the WafCharm website or contact Anri Nakayama, Vice President, Partner Relations at CSC.

Testing and evaluating GuardDuty detections

Post Syndicated from Marshall Jones original https://aws.amazon.com/blogs/security/testing-and-evaluating-guardduty-detections/

Amazon GuardDuty is a threat detection service that continuously monitors, analyzes, and processes Amazon Web Services (AWS) data sources and logs in your AWS environment. GuardDuty uses threat intelligence feeds, such as lists of malicious IP addresses and domains, file hashes, and machine learning (ML) models to identify suspicious and potentially malicious activity in your AWS environment. When GuardDuty identifies a potential security issue, it creates a GuardDuty finding that gives you information about what the potential security issue is, the resources involved, and contextualized information that’s key to remediating the issue. GuardDuty helps you monitor for the latest threats by continually expanding threat detection to emerging and common threats.

Whether you’re new to GuardDuty or are a long-time user, it’s recommended that you understand the different GuardDuty finding types and finding details and practice responding to them as suggested in the security pillar of AWS Well-Architected.

In this blog post, I dive deep into an open source tool for testing GuardDuty findings and then walk through three examples of how you can use this tool to test and improve your response to GuardDuty findings.

Overview

If you want to learn more about GuardDuty, you can read about the finding types in this AWS documentation. However, customers often want realistic findings in their environment to understand what a finding looks like and to practice responding hands on. While you can use GuardDuty to create sample findings in your environment, these findings are approximations populated with placeholder values and look different from real findings. Additionally, you cannot practice remediation with these findings because they’re not tied to actual resources in your account. This can be helpful if you only want to see what details are in a finding, but if you want to practice a real-world scenario, these sample findings might not be adequate.

To address this use case and provide customers with a secure and reliable way to test the threat detection capabilities of GuardDuty, the GuardDuty service team launched an open source project called GuardDuty Tester. The GuardDuty Tester creates infrastructure in your environment to simulate different security issues so that you can test GuardDuty findings that mirror actual security issues that you might encounter, such as crypto mining or a reverse shell being created on an Amazon Elastic Compute Cloud (Amazon EC2) instance. The GuardDuty Tester was originally released in 2018 as an AWS CloudFormation template and was focused more on testing investigation workflows than on a wide range of finding types. AWS has since released an updated version that uses the AWS Cloud Development Kit (AWS CDK) to make the infrastructure code easier to read and expanded the test coverage to over 100 unique finding types and resource combinations.

The ability to create findings across different resource types such as Amazon EC2, Amazon Simple Storage Service (Amazon S3), and Amazon Elastic Kubernetes Service (Amazon EKS) is a valuable resource for your security team, allowing them to simulate various types of threats with isolated infrastructure so that you don’t need to compromise your deployed workloads to improve response actions and techniques. Remember that the GuardDuty Tester doesn’t cover every possible scenario, but is instead focused on threat intelligence and rules-based findings. Anomaly-based findings, which require learning about how you operate your environment, aren’t included in the GuardDuty Tester.

Getting started with the GuardDuty Tester

The GuardDuty Tester is deployed by using the AWS CDK to create the required infrastructure and scripts to generate the GuardDuty findings. For safety, AWS recommends that you deploy the GuardDuty Tester in a nonproduction environment in an account that’s used specifically for this purpose. This way, your security team can differentiate between test GuardDuty findings and findings for other workloads that they’re monitoring.

In this post, I won’t walk through configuring the GuardDuty Tester because this is already documented in the GuardDuty documentation. Instead, I will go over what you need to know about the GuardDuty Tester and some of the benefits.

Figure 1 shows the GuardDuty Tester architecture, which includes the resources necessary to create GuardDuty findings for various protection plans such as Amazon S3 buckets, Amazon EC2 instances, and an Amazon EKS cluster. The tester also deploys a dedicated GuardDuty Tester instance where you will run the scripts needed to create the GuardDuty findings.

Figure 1: GuardDuty Tester architecture

The GuardDuty Tester provides key features including:

  • A wide range of threat scenario simulations: The GuardDuty Tester covers over 105 threat scenarios and can create findings for resources including Amazon S3, AWS Identity and Access Management (IAM), Amazon Elastic Container Service (Amazon ECS) for both Amazon EC2 and AWS Fargate hosted workloads, Amazon EKS, and AWS Lambda. This includes GuardDuty runtime monitoring as well as other GuardDuty protection plans.
  • Access through AWS Systems Manager: You reach the GuardDuty Tester only through Systems Manager, which minimizes the ports that are open to the internet.
  • Modular scripts: With an expanded library of tests available, the GuardDuty Tester accepts user parameters to set the scope of the tests to run, which gives you greater flexibility for different testing scenarios.

Setting up the GuardDuty Tester environment is straightforward and requires only a few commands. As outlined in the documentation and the README file in the repository, there are a number of prerequisites to set up the stack. These prerequisites include Python 3+, git, the AWS Command Line Interface (AWS CLI), the AWS Systems Manager Session Manager plugin, npm, Docker, and a subscription to the Kali Linux image for Amazon EC2. You have to subscribe to the Kali Linux image in AWS Marketplace, but you are charged for the instance only while the GuardDuty Tester is deployed. After these prerequisites are met, you can clone the repository, install the packages, and deploy the GuardDuty Tester to your AWS account.

Deploying the GuardDuty Tester can take 20–30 minutes. For the rest of this post, I assume that you have deployed the GuardDuty Tester into your environment and have started your Systems Manager session as described in Part A of Step 3 – Run tester scripts in the GuardDuty documentation. Now, I will dive into the first testing example.

Manual investigation

The first test use case is about getting familiar with what GuardDuty findings look like and the details that a finding gives you. This might be one of your first steps after turning on GuardDuty, or this might be an activity that you perform to help new team members understand GuardDuty findings.

To start a manual investigation:

  1. Run the following command in your Systems Manager session to view the GuardDuty Tester options.
    python3 guardduty_tester.py --help
  2. Run the following command in your Systems Manager session to create the first test finding.
    python3 guardduty_tester.py --ec2 --runtime only --tactics impact
  3. Before creating the findings, the GuardDuty Tester prompts you to confirm that it’s allowed to change GuardDuty settings in the environment. For example, if you’ve chosen to create findings related to the GuardDuty runtime monitoring feature but don’t have this feature enabled, the GuardDuty Tester will enable it for the tests and then disable it after testing is complete.

    Note: This will start the 30-day trial of the enabled features in this account, in this AWS Region, even if the feature is disabled after testing is complete. More information about GuardDuty pricing and free trials can be found on the GuardDuty pricing page.

  4. After you enter y (for “yes”), the GuardDuty Tester reports the number of domain reputation findings it’s expecting. Figure 2 shows an example of the expected findings. You can learn more about domain reputation findings in the GuardDuty finding documentation.

    Figure 2: Generated GuardDuty findings in the console

  5. After the GuardDuty Tester is finished, wait a few minutes and then go to the AWS Management Console for GuardDuty to see the findings. In this example, there are four new GuardDuty findings as expected from step 4 and shown in Figure 3. With the findings generated, you can start your manual investigation.

    Figure 3: GuardDuty finding details

In the preceding figure, you can see some of the finding details presented—such as the action type and the process information—that can help you quickly identify what trigger started the suspicious communication. From here, I encourage you to use this finding to practice your runbooks for investigation and response. For example, you might start with validating and triaging the finding before moving into evidence collection and remediation. If you don’t have incident response runbooks already built, you can use this finding as an example to get started. There are multiple open source examples such as AWS incident response playbooks and AWS customer response playbook. A runbook will help your team evaluate the information provided in the GuardDuty finding and understand what else they need to know about your specific environment to properly respond to the finding. For example, in the finding, you will have resource and actor information but not things such as who is the account owner or point of contact for security for that account.
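
As a starting point for a triage runbook, the following is a minimal sketch that pulls the first fields a responder usually looks at out of a finding. It assumes the finding arrives in the same shape that the Lambda example later in this post reads (the detail section of an EventBridge event). The helper names dig and summarize_finding, the sample values, and the security_contact field are hypothetical and exist only for illustration.

```python
from typing import Any, Dict, Optional


def dig(data: Dict[str, Any], *keys: str) -> Optional[Any]:
    """Walk nested dictionaries and return None if any key is missing."""
    current: Any = data
    for key in keys:
        if not isinstance(current, dict) or key not in current:
            return None
        current = current[key]
    return current


def summarize_finding(detail: Dict[str, Any]) -> Dict[str, Any]:
    """Collect the fields a responder usually needs first during triage."""
    return {
        "type": detail.get("type"),
        "severity": detail.get("severity"),
        "account_id": detail.get("accountId"),
        "region": detail.get("region"),
        "instance_id": dig(detail, "resource", "instanceDetails", "instanceId"),
        # Not part of the finding: this has to come from your own inventory.
        "security_contact": None,
    }


# Hypothetical, heavily trimmed finding used only for illustration.
sample_detail = {
    "type": "CryptoCurrency:EC2/BitcoinTool.B!DNS",
    "severity": 8,
    "accountId": "111122223333",
    "region": "us-east-1",
    "resource": {"instanceDetails": {"instanceId": "i-0123456789abcdef0"}},
}

summary = summarize_finding(sample_detail)
print(summary)
```

The empty security_contact entry is deliberate: it marks the kind of environment-specific information that a runbook has to source from outside the finding.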

Creating alerts

The next use case highlights how to create alerts based on GuardDuty findings. When setting up alerting automation with tools such as Amazon Simple Notification Service (Amazon SNS) and Slack, you should create a finding using the GuardDuty Tester to test that you’ve configured your alert correctly. See Creating custom responses to GuardDuty findings for information about creating alerts with either of these tools. Figure 4 shows a sample EventBridge rule that will send GuardDuty findings to SNS.

Figure 4: EventBridge rule to send GuardDuty findings to SNS

For this post, I assume that you’ve already configured an Amazon EventBridge rule and Amazon SNS alert.
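
If you have not set this up yet, the following is a rough sketch of what such a rule could look like when created with the AWS SDK for Python. The event pattern matches all GuardDuty finding events. The function names, the target ID, and the rule_name and topic_arn arguments are placeholders, and events_client is assumed to be a boto3 EventBridge client that you create yourself.

```python
import json
from typing import Any, Dict


def guardduty_event_pattern() -> Dict[str, Any]:
    """Event pattern that matches every GuardDuty finding event."""
    return {
        "source": ["aws.guardduty"],
        "detail-type": ["GuardDuty Finding"],
    }


def create_alert_rule(events_client: Any, rule_name: str, topic_arn: str) -> None:
    """Create the rule and point it at an SNS topic.

    events_client is expected to be boto3.client("events"); rule_name and
    topic_arn are placeholders that you supply.
    """
    events_client.put_rule(
        Name=rule_name,
        EventPattern=json.dumps(guardduty_event_pattern()),
        State="ENABLED",
    )
    events_client.put_targets(
        Rule=rule_name,
        # The target ID is an arbitrary label chosen for this sketch.
        Targets=[{"Id": "guardduty-sns-alert", "Arn": topic_arn}],
    )
```

Note that the SNS topic also needs an access policy that allows EventBridge to publish to it, which this sketch does not configure.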

To test alerts:

  1. Run the following command in your Systems Manager session to create a privileged container finding.
    python3 guardduty_tester.py --finding 'PrivilegeEscalation:Kubernetes/PrivilegedContainer'
  2. Shortly after creating this finding, you should see an SNS alert based on the finding type.

Figure 5: SNS notification from a GuardDuty finding

If you’ve configured the alert correctly, you will see an email similar to Figure 5. The email demonstrates that SNS notifications were successfully configured and tested using the GuardDuty Tester. If this is a new finding, you will receive this SNS notification shortly after the GuardDuty Tester generates the finding, but if this is an updated finding, then the timing will be based on the notification frequency configured in the account.

There are many ways that customers consume GuardDuty findings in their environments. Whether you’re using Amazon SNS or another mechanism such as a chat application, ticketing system, or a security information and event management (SIEM) solution, you can use this example of an EventBridge rule and the GuardDuty Tester to test out your notification pipeline.

Automated response

For the third use case, I show you how to create an automated action based on a GuardDuty finding. In this example, I create a finding based on an EC2 instance connecting to a Bitcoin mining domain. Then, based on this finding, I use Lambda to tag the instance to assist with identification during the investigation steps that follow. Although this is a simple example, it shows you what you can do by combining EventBridge rules and Lambda functions. If you want to create an automated response for GuardDuty runtime monitoring findings that requires making a host-level modification, you can use EventBridge rules with AWS Systems Manager Run Command to run commands locally on a host to remediate a security issue.

Start by creating a Lambda function that will take a GuardDuty event delivered by EventBridge, pull out the instance ID information, and then use that as a parameter in the create_tags API call. See the following example code.

import json
import boto3
import logging

logger = logging.getLogger()
logger.setLevel(logging.INFO)

def lambda_handler(event, context):
    # Initialize first so the except block can log safely if extraction fails
    instance_id = None
    try:
        # Extract the necessary information from the GuardDuty finding
        instance_id = event['detail']['resource']['instanceDetails']['instanceId']
        account_id = event['detail']['accountId']
        region = event['detail']['region']

        # Create an EC2 client
        ec2 = boto3.client('ec2', region_name=region)

        # Add the "infected" and "cryptomining" tag value pair to the instance
        ec2.create_tags(
            Resources=[instance_id],
            Tags=[
                {
                    'Key': 'infected',
                    'Value': 'cryptomining'
                }
            ]
        )

        logger.info(f"Tagged instance {instance_id} with 'infected=cryptomining' in account {account_id} and region {region}")
        return {
            'statusCode': 200,
            'body': 'Instance tagged successfully'
        }
    except Exception as e:
        logger.error(f"Error tagging instance {instance_id}: {str(e)}")
        return {
            'statusCode': 500,
            'body': f"Error tagging instance: {str(e)}"
        }

Next, I create an EventBridge rule specific to the Bitcoin mining finding that I want to test, shown in Figure 6. The target is the Lambda function that I just created.
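
For readers who cannot see the figure, the following sketch shows one way such an event pattern could be written, together with a small local check that a sample event would match it. The matches function is a simplified stand-in written for this example and only handles exact-value matching; it is not how EventBridge itself is implemented. The sample events are hypothetical and heavily trimmed.

```python
from typing import Any, Dict

BITCOIN_FINDING_TYPE = "CryptoCurrency:EC2/BitcoinTool.B!DNS"

# Pattern narrowed to a single GuardDuty finding type.
event_pattern = {
    "source": ["aws.guardduty"],
    "detail-type": ["GuardDuty Finding"],
    "detail": {"type": [BITCOIN_FINDING_TYPE]},
}


def matches(pattern: Dict[str, Any], event: Dict[str, Any]) -> bool:
    """Simplified local check that supports exact-value matching only.

    A list in the pattern means "any of these values"; a dict means
    "descend into the same key of the event". Other EventBridge
    operators (prefix, numeric, and so on) are not handled here.
    """
    for key, expected in pattern.items():
        actual = event.get(key)
        if isinstance(expected, dict):
            if not isinstance(actual, dict) or not matches(expected, actual):
                return False
        elif actual not in expected:
            return False
    return True


# Hypothetical sample events for a quick local sanity check.
bitcoin_event = {
    "source": "aws.guardduty",
    "detail-type": "GuardDuty Finding",
    "detail": {"type": BITCOIN_FINDING_TYPE},
}
other_event = {
    "source": "aws.guardduty",
    "detail-type": "GuardDuty Finding",
    "detail": {"type": "SomeOther:EC2/FindingType"},
}

print(matches(event_pattern, bitcoin_event))
print(matches(event_pattern, other_event))
```

Scoping the rule to one finding type keeps the Lambda function from tagging instances for unrelated findings.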

Figure 6: EventBridge rule for crypto mining GuardDuty finding

Now that the EventBridge rule is in place with the Lambda function as the target, I can use the GuardDuty Tester to trigger a Bitcoin mining finding and test my solution with the following command.

python3 guardduty_tester.py --finding 'CryptoCurrency:EC2/BitcoinTool.B!DNS'

After the finding is generated, I go to my EC2 instance, where there’s a new instance tag with a key of infected and a value of cryptomining, shown in Figure 7.

Figure 7: Updated tags after automated response

Although this is a general example, you can use the same approach across various actions that you might take in response to a GuardDuty finding and then test them using the GuardDuty Tester. Examples include using Lambda to add logic in AWS WAF, a network access control list (network ACL), or AWS Network Firewall to block suspicious traffic, or use Systems Manager Run Command to end a malicious process that’s running on a host.
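
To illustrate the Run Command option, here is a minimal sketch that builds the request for the Systems Manager SendCommand API using the AWS-RunShellScript document. The function name is hypothetical, and the process ID is passed in by the caller; where it comes from in a runtime monitoring finding is left out because this post does not show that structure. Treat the kill command as an example action only.

```python
from typing import Any, Dict


def build_kill_process_request(instance_id: str, pid: int) -> Dict[str, Any]:
    """Build keyword arguments for the Systems Manager SendCommand API."""
    # Accept only a plain integer so that nothing else can end up in the
    # shell command, and refuse PID 0 and PID 1.
    if not isinstance(pid, int) or isinstance(pid, bool) or pid <= 1:
        raise ValueError(f"Refusing to target PID {pid!r}")
    return {
        "InstanceIds": [instance_id],
        "DocumentName": "AWS-RunShellScript",
        "Parameters": {"commands": [f"kill -9 {pid}"]},
        "Comment": "Automated response to a GuardDuty finding",
    }


# Hypothetical values for illustration.
request = build_kill_process_request("i-0123456789abcdef0", 4242)
print(request["Parameters"]["commands"])
```

In a Lambda function you could pass the result to boto3.client("ssm").send_command(**request), provided that the instance is managed by Systems Manager and the function's role is allowed to send commands.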

Conclusion

The updated GuardDuty Tester represents a significant advancement in helping organizations validate and gain confidence in GuardDuty threat detection. The GuardDuty Tester now provides more comprehensive coverage of GuardDuty runtime monitoring and protection plans across various AWS services.

By using the GuardDuty Tester and following the use cases in this post, you can proactively assess your threat detection readiness, identify potential gaps, and implement necessary measures to help you fortify your AWS environments against evolving cyber threats.

 
If you have feedback about this post, submit comments in the Comments section below. If you have questions about this post, contact AWS Support.
 

Marshall Jones

Marshall is a Worldwide Security Specialist Solutions Architect at AWS. His background is in AWS consulting and security architecture, with a focus on a variety of security domains including edge, threat detection, and compliance. Today, he’s focused on helping enterprise AWS customers adopt and operationalize AWS security services to increase security effectiveness and reduce risk.

AWS Firewall Manager retrofitting: Harmonizing central security with application team flexibility

Post Syndicated from Ian Olson original https://aws.amazon.com/blogs/security/aws-firewall-manager-retrofitting-harmonizing-central-security-with-application-team-flexibility/

AWS Firewall Manager is a powerful tool that organizations can use to define common AWS WAF rules with centralized security policies. These policies specify which accounts and resources are in scope. Firewall Manager creates a web access control list (web ACL) that adheres to the organization’s policy requirements and associates it with the in-scope resources. Figure 1 shows a Firewall Manager security policy and web ACL created in each in-scope account.

Figure 1: A Firewall Manager security policy and Firewall Manager created web ACLs in each in-scope account

In this post, we’ll talk about the benefits of retrofitting and how you can use this feature to allow Firewall Manager to manage existing web ACLs. When retrofitting is enabled, a Firewall Manager security policy doesn’t replace existing web ACLs. Instead, Firewall Manager adds the top and bottom rule sections to existing web ACLs associated with in-scope resources. For application teams, Firewall Manager no longer restricts how they configure and deploy AWS WAF. Teams can use either the AWS Management Console or infrastructure as code (IaC) tools to customize rules in web ACLs, even if those web ACLs are managed by Firewall Manager.

Firewall Manager before retrofitting

Firewall Manager offers significant benefits, but the existing approach results in several challenges:

  1. Compatibility with infrastructure as code (IaC): Firewall Manager creates and associates AWS WAF web ACLs with in-scope resources. IaC tools expect to create and manage resources (in other words, own their lifecycle). Application teams cannot use IaC to manage the WAF rules and other web ACL configuration components that are created by Firewall Manager; there are custom solutions that inject locally defined rules into Firewall Manager-created web ACLs, but these are complex and have risks such as drift. For AWS WAF customers and application teams, this introduces an operational challenge.
  2. Existing WAF migration: Customers who are already using AWS WAF must migrate existing rules to Firewall Manager-managed web ACLs.
  3. Application-specific rule complexity: Forcing all in-scope resources in the same account and AWS Region to use the same web ACL makes application-specific or exception-based rules more complex. In addition, changes to one application’s rules could impact others that share the same web ACL.
  4. Increased costs: When many applications share a single web ACL, application-specific rules are part of a single web ACL, which can increase total WAF capacity units (WCU) usage, sometimes resulting in higher AWS WAF request costs.

Firewall Manager with retrofitting addresses challenges

To address these challenges, Firewall Manager now offers the ability to retrofit existing web ACLs. Let’s get specific about when and how Firewall Manager retrofitting works:

  • Firewall Manager will only retrofit a web ACL when all associated resources (for example, Application Load Balancers, API Gateways, and Amazon CloudFront distributions) are in scope. If a not-in-scope resource is also associated, Firewall Manager will not retrofit the web ACL and, if the web ACL was already retrofitted, will not apply future security policy changes to it. In that scenario, associated in-scope resources and the web ACL are marked noncompliant with security policies.
  • Retrofitting only modifies customer-created web ACLs. Retrofitting will not act on web ACLs retrofitted by another security policy or managed by Firewall Manager. If either scenario occurs, the web ACL and associated resources are marked noncompliant with security policies.
  • Retrofitting adds the following on top of an existing web ACL that is associated with one or more in-scope resources:
    • Retrofitting adds first rule groups and last rule groups defined in a security policy to the web ACL. Existing rules within the web ACL are not changed. The order of rule evaluation changes to the following:
      1. Security policy–defined first rule group rules
      2. Existing rules in the web ACL
      3. Security policy–defined last rule group rules
    • Retrofitting adds a WAF logging configuration if one is defined by the security policy. If the web ACL already has a logging configuration, Firewall Manager does not replace the existing logging configuration and marks the web ACL noncompliant.
    • Retrofitting does not verify or configure other attributes that are defined by the security policy. This includes the following properties: default action, custom request headers, web ACL Captcha or challenge configurations, and token domain list. These properties are used only when Firewall Manager creates a web ACL.
  • If an in-scope resource that supports AWS WAF does not have a web ACL, Firewall Manager creates and associates a Firewall Manager-managed web ACL with that resource.
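
The rule ordering and logging behavior described in this list can be sketched as a small model. This is only an illustration of the behavior as described in this post, not a Firewall Manager API: the WebACL and SecurityPolicy classes, the retrofit function, and all rule names are hypothetical. It assumes that existing rules are evaluated between the policy's first and last rule groups, in line with the top and bottom rule sections mentioned earlier.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional


@dataclass
class WebACL:
    name: str
    rules: List[str] = field(default_factory=list)
    logging_destination: Optional[str] = None


@dataclass
class SecurityPolicy:
    first_rule_groups: List[str]
    last_rule_groups: List[str]
    logging_destination: Optional[str] = None


def retrofit(acl: WebACL, policy: SecurityPolicy) -> Dict[str, Any]:
    """Return the evaluation order and logging outcome after retrofitting."""
    # Policy rule groups wrap the existing rules, which stay untouched.
    evaluation_order = policy.first_rule_groups + acl.rules + policy.last_rule_groups

    compliant = True
    logging_destination = acl.logging_destination
    if policy.logging_destination is not None:
        if acl.logging_destination is None:
            # No logging yet, so the policy's configuration is added.
            logging_destination = policy.logging_destination
        else:
            # Existing logging is kept and the web ACL is marked noncompliant.
            compliant = False

    return {
        "evaluation_order": evaluation_order,
        "logging_destination": logging_destination,
        "compliant": compliant,
    }


policy = SecurityPolicy(["policy-first-rules"], ["policy-last-rules"], "policy-log-destination")
result = retrofit(WebACL("MyCustomWebACL", ["app-specific-rule"]), policy)
print(result["evaluation_order"])
```

The model leaves out the default action, Captcha and challenge configurations, and the token domain list, because retrofitting does not touch those properties.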

Figure 2 shows the rules and logging configuration before and after a Firewall Manager security policy retrofits a web ACL that is associated with in-scope resources.

Figure 2: Using retrofitting to update an existing web ACL

This new retrofitting capability solves the previous challenges in the following ways:

  1. IaC compatible: Application teams can provision and manage AWS WAF with IaC tools. Firewall Manager retrofits existing web ACLs by adding rules defined in the security policy to web ACLs created by IaC tools. Application teams can manage AWS WAF exactly the same as when Firewall Manager was not in use.
  2. Existing WAF integration: Customers with existing AWS WAF deployments can adopt Firewall Manager without the need to migrate existing WAF rules.
  3. Application-specific rules: Multiple resources in the same account can use separate web ACLs, which simplifies application-specific rules.
  4. Help prevent additional costs: Application-specific rules are applied only to the relevant web ACLs. This helps prevent increased AWS WAF request costs from shared web ACLs that have a high WCU usage.

By enhancing Firewall Manager to retrofit existing web ACLs, customers can use the power of centralized WAF management without restricting how AWS WAF is deployed and configured by application teams in member accounts.

Firewall Manager security policy for AWS WAF – Enabling retrofitting

Figure 3 shows an example of a Firewall Manager security policy.

Figure 3: An example Firewall Manager security policy that uses the new Retrofit existing webACLs feature

This security policy defines WAF rules in the first rule group section. It applies to all Application Load Balancers (ALBs) across your organization in AWS Organizations in the Region where this security policy is created. The policy action is set to automatically remediate. Under Web ACL management, there is a new section, Managed web ACL source, with two options, Default and Retrofit existing webACLs. Default is the existing behavior: Firewall Manager creates and associates a Firewall Manager managed web ACL. Retrofit existing webACLs applies the WAF rules and logging configuration (if any) defined by a security policy to existing web ACLs when they are associated with an in-scope resource. This policy specifies Retrofit existing webACLs. If an in-scope resource does not have a web ACL, Firewall Manager still creates and associates a web ACL by default.

Retrofitting in action

Let’s walk through what happens when you set Managed web ACL source to Retrofit existing webACLs. Figure 4 shows two ALBs that are in-scope of our security policy, LoadBalancer1 and LoadBalancer2.

Figure 4: Two existing ALBs, one with an existing web ACL and the other without

LoadBalancer1 has the following characteristics:

  • LoadBalancer1 does NOT have a web ACL associated.
  • After the Firewall Manager security policy applies, LoadBalancer1 is associated with a Firewall Manager created web ACL, as shown in Figure 5.

    Figure 5: LoadBalancer1 is now associated with a Firewall Manager managed and created web ACL

  • The Firewall Manager-created web ACL contains the WAF rules defined in the security policy.

LoadBalancer2 has the following characteristics:

  • LoadBalancer2 has an existing customer created web ACL associated with it, as shown in Figure 6.

    Figure 6: LoadBalancer2 is associated with a customer-created web ACL

  • This web ACL was created by the application team with an application-specific rule. The web ACL could have been created with the AWS Management Console, AWS CloudFormation, or other IaC tools like Terraform.
  • After the Firewall Manager security policy takes effect, LoadBalancer2 remains associated with the existing customer-created web ACL MyCustomWebACL.
  • Retrofitting adds WAF rules in the first rule group according to the security policy, as shown in Figure 7. Existing WAF rules and other aspects of the web ACL are not changed and can continue to be managed exactly as you would without Firewall Manager present.

    Figure 7: Firewall Manager has retrofitted a customer-created web ACL and added the security policy–defined first rule groups rules

Figure 8 shows that both ALBs are now compliant with our security policy. LoadBalancer1 has a web ACL created by Firewall Manager; future ALBs that don’t have a web ACL associated would also become associated to this web ACL. LoadBalancer2 is associated with a web ACL the application team previously created. The application-defined WAF rules are not changed and the WAF rules defined by the security policy are added to this web ACL.

Figure 8: Multiple ALBs in scope for the same security policy

Associate a web ACL with Firewall Manager already in place

Let’s continue from our previous scenario. The application team for LoadBalancer1 now configures application-specific rules for their ALB by using the following process.

  1. The application team creates a web ACL and defines their own WAF rules. They might do this with the console, AWS CloudFormation, or other IaC tools like Terraform.
  2. The application team associates LoadBalancer1 with the custom web ACL.

    Figure 9: From the web ACL, only LoadBalancer1 is associated with the app team’s web ACL

  3. After a moment, Firewall Manager detects the change to an in-scope resource (LoadBalancer1). Firewall Manager retrofits the application team’s web ACL AppTeamNewWebACL, making it align with the security policy.

    Figure 10: Firewall Manager has retrofitted the app team–created web ACL and added the security policy–defined first rule group rules

The diagram in Figure 11 shows the workflow that happens when the application team creates their own web ACL and associates it with LoadBalancer1. Firewall Manager detects the association change and retrofits the application team’s web ACL again, bringing LoadBalancer1 into alignment with security policies.

Figure 11: Firewall Manager retrofitting a web ACL in response to being associated with an existing in-scope resource

Out-of-scope resources associated with a retrofitted web ACL

Let’s make a change to our security policy. In Figure 12, the policy scope has been updated to only apply to resources when they have the resource tag Tier: Production. Application teams add this tag to LoadBalancer1 and LoadBalancer2.

Figure 12: The Firewall Manager security policy scope has been updated to include a resource tag

Later, an application team creates LoadBalancer3 and associates it with AppTeamNewWebACL (Figure 13).

Figure 13: A new ALB associated with a custom WAF web ACL

The application team does not tag LoadBalancer3, making it out of scope with the security policy, as shown in Figure 14.

Figure 14: A new ALB that is out of scope with a security policy

This web ACL is currently retrofitted by our security policy and associated with LoadBalancer2 (in-scope), as shown in Figure 15.

Figure 15: A retrofitted web ACL associated with in-scope and out-of-scope ALBs

Firewall Manager detects that an out-of-scope resource is associated with a retrofitted web ACL. As shown in Figure 16, Firewall Manager marks the web ACL AppTeamNewWebACL noncompliant. It also marks LoadBalancer2 noncompliant.

Figure 16: Shows retrofit-specific reasons a resource or web ACL will be marked noncompliant. In this case, a not-in-scope resource is associated with a web ACL where another associated resource is in-scope.

As long as the web ACL is noncompliant, Firewall Manager will not retrofit a non-retrofitted web ACL, and future security policy changes will not be applied to that web ACL. Existing retrofitted WAF rules for that web ACL are not modified or removed. When the web ACL later becomes compliant, Firewall Manager retrofits it again with the latest state of the security policy. Figure 17 shows how Firewall Manager keeps the existing retrofitting but does not apply updates. Using our example, one of three things would need to happen for the web ACL to become compliant:

  1. LoadBalancer3 can be made in-scope with the current security policy. To do this, the application team can add the resource tag Tier: Production to this ALB.
  2. The resource tag can be removed from the policy scope.
  3. LoadBalancer3 can be disassociated from the web ACL.

Note: We recommend that you promptly address not-in-scope resources associated with web ACLs that are shared by in-scope resources. For example, if the preceding scenario was not addressed and LoadBalancer3 was deleted three months later, Firewall Manager would retrofit the web ACL only at that point. This is not necessarily a problem, but it could trigger unexpected changes to a web ACL’s rules.

Figure 17: Firewall Manager retains existing but not future changes as long as a not-in-scope resource is associated with this web ACL

In summary: Firewall Manager initially retrofits and will apply future updates to existing web ACLs while all associated resources are in-scope. When an out-of-scope resource is associated, an initial retrofit is delayed and security policy retrofit updates are paused until the out-of-scope resource is addressed.
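
The rules in this summary can be written down as a small decision function. The following is a minimal sketch of the logic as described in this post, not Firewall Manager's implementation; the WebAcl class and the retrofit_decision function are hypothetical names introduced for illustration only.

```python
from dataclasses import dataclass
from typing import List, Set


@dataclass
class WebAcl:
    name: str
    associated_resources: List[str]
    retrofitted: bool = False  # True once the policy rules were added at least once


def retrofit_decision(acl: WebAcl, in_scope: Set[str]) -> str:
    """Return what happens to a web ACL on the next policy evaluation."""
    out_of_scope = [r for r in acl.associated_resources if r not in in_scope]
    if not out_of_scope:
        # Every associated resource is in scope: the web ACL is compliant.
        return "apply updates" if acl.retrofitted else "initial retrofit"
    # At least one associated resource is out of scope: the web ACL is noncompliant.
    return "updates paused" if acl.retrofitted else "initial retrofit delayed"


# The scenario from this post: LoadBalancer3 is untagged and therefore out of scope.
in_scope = {"LoadBalancer1", "LoadBalancer2"}
acl = WebAcl("AppTeamNewWebACL", ["LoadBalancer2", "LoadBalancer3"], retrofitted=True)
print(retrofit_decision(acl, in_scope))  # prints "updates paused"
```

Adding LoadBalancer3 to the in-scope set, or removing it from the web ACL's associations, moves the result back to "apply updates", which corresponds to the three remediation options listed earlier.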

Figure 18 demonstrates how Firewall Manager will not perform an initial retrofit of a web ACL associated with both in-scope and not-in-scope resources.

Figure 18: Firewall Manager will only perform an initial retrofit when only in-scope resources are associated with a web ACL

Conclusion

Firewall Manager verifies that in-scope resources adhere to relevant security policies or are marked noncompliant. You can enable retrofitting for Firewall Manager for AWS WAF to seamlessly enforce these security policies without changing how your application teams manage the configuration of their WAF rules.

Note: Retrofitting is only available for AWS Firewall Manager security policies for AWS WAF; it is not available for AWS WAF Classic.

If you have feedback about this post, submit comments in the Comments section below. If you have questions about this post, start a new thread on the AWS Firewall Manager re:Post or contact AWS Support.
 

Ian Olson

Ian is a Senior Security Specialist Solutions Architect at AWS. He helps customers automate security services to safeguard against threats like DDoS and web exploitations. Through intelligent automation, he delivers security solutions tailored to organizations of any scale. When he’s not working, Ian enjoys spending quality time with his two young children.
Bryan Van Hook

Bryan is a Senior Security Solutions Architect at AWS. He has over 25 years of experience in software engineering, cloud operations, and internet security. He spends most of his time helping customers gain the most value from native AWS security services. Outside of his day job, Bryan can be found playing tabletop games and acoustic guitar.

Generate vector embeddings for your data using AWS Lambda as a processor for Amazon OpenSearch Ingestion

Post Syndicated from Jagadish Kumar original https://aws.amazon.com/blogs/big-data/generate-vector-embeddings-for-your-data-using-aws-lambda-as-a-processor-for-amazon-opensearch-ingestion/

On Nov 22, 2024, Amazon OpenSearch Ingestion launched support for AWS Lambda processors. With this launch, you now have more flexibility enriching and transforming your logs, metrics, and trace data in an OpenSearch Ingestion pipeline. Some examples include using foundation models (FMs) to generate vector embeddings for your data and looking up external data sources like Amazon DynamoDB to enrich your data.

Amazon OpenSearch Ingestion is a fully managed, serverless data pipeline that delivers real-time log, metric, and trace data to Amazon OpenSearch Service domains and Amazon OpenSearch Serverless collections.

Processors are components within an OpenSearch Ingestion pipeline that enable you to filter, transform, and enrich events using your desired format before publishing records to a destination of your choice. If no processor is defined in the pipeline configuration, then the events are published in the format specified by the source component. You can incorporate multiple processors within a single pipeline, and they are run sequentially as defined in the pipeline configuration.

OpenSearch Ingestion gives you the option of using Lambda functions as processors along with built-in native processors when transforming data. You can batch events into a single payload based on event count or size before invoking Lambda to optimize the pipeline for performance and cost. Lambda enables you to run code without provisioning or managing servers, eliminating the need to create workload-aware cluster scaling logic, maintain event integrations, or manage runtimes.

In this post, we demonstrate how to use the OpenSearch Ingestion Lambda processor to generate embeddings for your source data and ingest them into an OpenSearch Serverless vector collection. This solution uses the flexibility of OpenSearch Ingestion pipelines with a Lambda processor to dynamically generate embeddings. The Lambda function invokes the Amazon Titan Text Embeddings model hosted in Amazon Bedrock, allowing for efficient and scalable embedding creation. This architecture simplifies various use cases, including recommendation engines, personalized chatbots, and fraud detection systems.

Integrating OpenSearch Ingestion, Lambda, and OpenSearch Serverless creates a fully serverless pipeline for embedding generation and search. This combination offers automatic scaling to match workload demands and a usage-driven model. Operations are simplified because AWS manages infrastructure, updates, and maintenance. This serverless approach allows you to focus on developing search and analytics solutions rather than managing infrastructure.

Note that Amazon OpenSearch Service also provides neural search, which transforms text into vectors and facilitates vector search both at ingestion time and at search time. During ingestion, neural search transforms document text into vector embeddings and indexes both the text and its vector embeddings in a vector index. Neural search is available for managed clusters running version 2.9 and above.

Solution overview

This solution builds embeddings on a dataset stored in Amazon Simple Storage Service (Amazon S3). We use the Lambda function to invoke the Amazon Titan model on the payload delivered by OpenSearch Ingestion.

Prerequisites

You should have an appropriate role with permissions to invoke your Lambda function and Amazon Bedrock model and also write to the OpenSearch Serverless collection.

To provide access to the collection, you must configure an AWS Identity and Access Management (IAM) pipeline role with a permissions policy that grants access to the collection. For more details, see Granting Amazon OpenSearch Ingestion pipelines access to collections. The pipeline role also needs permission to invoke the Lambda function. The following example policy statement grants that permission:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "allowinvokeFunction",
            "Effect": "Allow",
            "Action": [
                "lambda:InvokeFunction"
            ],
            "Resource": "arn:aws:lambda:{{region}}:{{account-id}}:function:{{function-name}}"
        }
    ]
}

The role must have the following trust relationship, which allows OpenSearch Ingestion to assume it:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "Service": "osis-pipelines.amazonaws.com"
            },
            "Action": "sts:AssumeRole"
        }
    ]
}

Create an ingestion pipeline

You can create a pipeline using a blueprint. For this post, we select the AWS Lambda custom enrichment blueprint.

We use the IMDB title basics dataset, which contains movie information, including originalTitle, runtimeMinutes, and genres.

The OpenSearch Ingestion pipeline uses a Lambda processor to create embeddings for the field originalTitle and store the embeddings as originalTitle_embeddings along with other data.

See the following pipeline code:

version: "2"
s3-log-pipeline:
  source:
    s3:
      acknowledgments: true
      compression: "none"
      codec:
        csv:
      aws:
        # Provide the region to use for aws credentials
        region: "us-west-2"
        # Provide the role to assume for requests to SQS and S3
        sts_role_arn: "<<arn:aws:iam::123456789012:role/Example-Role>>"
      scan:
        buckets:
          - bucket:
              name: "lambdaprocessorblog"
      
  processor:
     - aws_lambda:
        function_name: "generate_embeddings_bedrock"
        response_events_match: true
        tags_on_failure: ["lambda_failure"]
        batch:
          key_name: "documents"
          threshold:
            event_count: 4
        aws:
          region: us-west-2
          sts_role_arn: "<<arn:aws:iam::123456789012:role/Example-Role>>"
  sink:
    - opensearch:
        hosts:
          - 'https://myserverlesscollection.us-region.aoss.amazonaws.com'
        index: imdb-data-embeddings
        aws:
          sts_role_arn: "<<arn:aws:iam::123456789012:role/Example-Role>>"
          region: us-west-2
          serverless: true

Let’s take a closer look at the Lambda processor in the ingestion pipeline. Pay attention to the key_name parameter. You can choose any value for key_name, and your Lambda function must reference this key when processing the payload from OpenSearch Ingestion. The payload size is determined by the batch setting. When batching is enabled in the Lambda processor, OpenSearch Ingestion groups multiple events into a single payload before invoking the Lambda function. A batch is sent to Lambda when any of the following thresholds are met:

    • event_count – The number of events reaches the specified limit
    • maximum_size – The total size of the batch reaches the specified size (for example, 5 MB); this is configurable up to 6 MB, the invocation payload limit for AWS Lambda
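
To make the two thresholds concrete, the following is a simplified simulation of how events could be grouped into payloads. It is a sketch under our own assumptions, not the OpenSearch Ingestion implementation: the batch_events function is a hypothetical helper, sizes are approximated by the length of the JSON encoding, and a batch is closed just before it would exceed maximum_size.

```python
import json


def batch_events(events, event_count=4, maximum_size=5 * 1024 * 1024, key_name="documents"):
    """Yield payloads of the form {key_name: [...]} whenever a threshold is reached."""
    batch, size = [], 0
    for event in events:
        event_size = len(json.dumps(event).encode("utf-8"))
        # Close the current batch first if this event would push it past maximum_size.
        if batch and size + event_size > maximum_size:
            yield {key_name: batch}
            batch, size = [], 0
        batch.append(event)
        size += event_size
        # Close the batch as soon as it holds event_count events.
        if len(batch) >= event_count:
            yield {key_name: batch}
            batch, size = [], 0
    if batch:
        # Simplification: send whatever is left when the input ends.
        yield {key_name: batch}


events = [{"originalTitle": f"Movie {i}"} for i in range(10)]
payloads = list(batch_events(events))
print([len(p["documents"]) for p in payloads])  # prints [4, 4, 2]
```

With event_count set to 4, as in the pipeline above, ten events produce three Lambda invocations. Each payload wraps its events under the documents key, which is why the Lambda function reads event['documents'].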

Lambda function

The Lambda function receives the data from OpenSearch Ingestion, invokes Amazon Bedrock to generate the embedding, and adds it to the source record. “documents” is used to reference the events coming in from OpenSearch Ingestion and matches the key_name declared in the pipeline. We add the embedding from Amazon Bedrock back to the original record. This new record with the appended embedding value is then sent to the OpenSearch Serverless sink by OpenSearch Ingestion. See the following code:

import json
import boto3

# Initialize the Bedrock runtime client once, outside the handler,
# so that it is reused across invocations
bedrock = boto3.client('bedrock-runtime')

def generate_embedding(text):
    """Generate embedding for the given text using Bedrock."""
    response = bedrock.invoke_model(
        modelId="amazon.titan-embed-text-v1",
        contentType="application/json",
        accept="application/json",
        body=json.dumps({"inputText": text})
    )
    embedding = json.loads(response['body'].read())['embedding']
    return embedding

def lambda_handler(event, context):
    # The batch arrives as a list of JSON documents under the key
    # configured as key_name in the pipeline ("documents")
    documents = event['documents']
    
    processed_documents = []
    
    for doc in documents:
        if 'originalTitle' in doc:
            # Generate embedding for the 'originalTitle' field
            embedding = generate_embedding(doc['originalTitle'])
            
            # Add the embedding to the document
            doc['originalTitle_embeddings'] = embedding
        
        processed_documents.append(doc)
    
    # Return the processed documents
    return processed_documents
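
Before deploying, it can help to exercise the per-document logic locally without calling Amazon Bedrock. The following sketch repeats the handler's loop with the embedding call passed in as a parameter, so a stand-in function can replace the model. The enrich and fake_embed functions, the tconst field, and the sample values are our own illustrative assumptions and are not part of the solution.

```python
def enrich(documents, embed):
    """Same per-document logic as the handler, with the embedding call injected."""
    for doc in documents:
        if "originalTitle" in doc:
            doc["originalTitle_embeddings"] = embed(doc["originalTitle"])
    return documents


def fake_embed(text):
    # Stand-in for Amazon Bedrock: a tiny deterministic "vector" for testing only.
    return [float(len(text)), float(text.count(" "))]


# Shape of a batched payload when key_name is "documents" (values are illustrative).
event = {
    "documents": [
        {"tconst": "tt0000001", "originalTitle": "Carmencita", "genres": "Documentary,Short"},
        {"tconst": "tt0000002", "genres": "Animation,Short"},  # no title: passed through unchanged
    ]
}

result = enrich(event["documents"], fake_embed)
print(len(result))                               # prints 2
print("originalTitle_embeddings" in result[1])   # prints False
```

The function returns one document for every document it receives, in the same order, including documents that have no originalTitle. This mirrors the handler above, which appends every document to processed_documents whether or not it was enriched.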

If an exception occurs in the Lambda processor, all the documents in the batch are considered failed events. They are forwarded to the next processor in the chain, if there is one, or to the sink, with a failure tag. You can configure the tag in the pipeline with the tags_on_failure parameter. The errors are also sent to CloudWatch Logs for further action.

After the pipeline runs, you can see that the embeddings were created and stored as originalTitle_embeddings within the document in a k-NN index, imdb-data-embeddings. The following screenshot shows an example.
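
Once the embeddings are indexed, they can be searched with a k-NN query. The following sketch only builds the request body; the knn_query function is a hypothetical helper, and the query vector is assumed to come from the same Amazon Titan model that produced the indexed embeddings (for example, by reusing generate_embedding on the search text).

```python
def knn_query(vector, k=5, field="originalTitle_embeddings"):
    """Build an OpenSearch k-NN search body for the embedding field."""
    return {
        "size": k,
        "query": {"knn": {field: {"vector": list(vector), "k": k}}},
        # Leave the large vectors out of the returned documents.
        "_source": {"excludes": [field]},
    }


body = knn_query([0.12, -0.03, 0.5], k=3)
print(body["query"]["knn"]["originalTitle_embeddings"]["k"])  # prints 3
```

You would send this body to the _search endpoint of the imdb-data-embeddings index with a signed request. A real query vector must have the same number of dimensions as the vectors stored in the index; the three-element vector here is only a placeholder.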

Summary

In this post, we showed how you can use Lambda as part of your OpenSearch Ingestion pipeline to enable complex transformation and enrichment of your data. For more details on the feature, refer to Using an OpenSearch Ingestion pipeline with AWS Lambda.


About the Authors

Jagadish Kumar (Jag) is a Senior Specialist Solutions Architect at AWS focused on Amazon OpenSearch Service. He is deeply passionate about Data Architecture and helps customers build analytics solutions at scale on AWS.

Sam Selvan is a Principal Specialist Solution Architect with Amazon OpenSearch Service.

Srikanth Govindarajan is a Software Development Engineer at Amazon OpenSearch Service. Srikanth is passionate about architecting infrastructure and building scalable solutions for search, analytics, security, AI, and machine learning based use cases.

Automate topic provisioning and configuration using Terraform with Amazon MSK

Post Syndicated from Vijay Kardile original https://aws.amazon.com/blogs/big-data/automate-topic-provisioning-and-configuration-using-terraform-with-amazon-msk/

As organizations deploy Amazon Managed Streaming for Apache Kafka (Amazon MSK) clusters across multiple use cases, the manual management of topic configurations can be challenging. This can lead to several issues:

  • Inefficiency – Manual configuration is time-consuming and error-prone, especially for large deployments. Maintaining consistency across multiple configurations can be difficult. To avoid this, Kafka administrators often set the create.topics.enable property on brokers, which leads to cluster operation inefficiency.
  • Human error – Manual configuration increases the risk of mistakes that can disrupt data flow and impact applications relying on Amazon MSK.
  • Scalability challenges – Scaling an Amazon MSK environment with manual configuration is cumbersome. Adding new topics or modifying existing ones requires manual intervention, hindering agility.

These challenges highlight the need for a more automated and robust approach to MSK topic configuration management.

In this post, we address this problem by using Terraform to optimize the configuration of MSK topics. This solution supports both provisioned and serverless MSK clusters.

Solution overview

Customers want a better way to manage the overhead of topics and their configurations. Manually handling topic configurations can be cumbersome and error-prone, making it difficult to keep track of changes and updates.

To address these challenges, you can use Terraform, an infrastructure as code (IaC) tool by HashiCorp. Terraform allows you to manage and provision infrastructure declaratively. It uses human-readable configuration files written in HashiCorp Configuration Language (HCL) to define the desired state of infrastructure resources. These resources can span virtual machines, networks, databases, and a vast array of cloud provider-specific offerings.

Terraform offers a compelling solution to the challenges of manual Kafka topic configuration. Terraform allows you to define and manage your Kafka topics through code. This approach provides several key benefits:

  • Automation – Terraform automates the creation, modification, and deletion of MSK topics.
  • Consistency and repeatability – Terraform configurations provide consistent topic structures and settings across your entire Amazon MSK environment. This simplifies management and reduces the likelihood of configuration drift.
  • Scalability – Terraform enables you to provision and manage large numbers of MSK topics, facilitating the growth of your Amazon MSK environment.
  • Version control – Terraform configurations are stored in version control systems, allowing you to track changes, roll back if needed, and collaborate effectively on your Amazon MSK infrastructure.

By using Terraform for MSK topic configuration management, you can streamline your operations, minimize errors, and have a robust and scalable Amazon MSK environment.

In this post, we provide a comprehensive guide for using Terraform to manage Amazon MSK configurations. We explore the process of installing Terraform on Amazon Elastic Compute Cloud (Amazon EC2), defining and decentralizing topic configurations, and deploying and updating configurations in an automated manner.

Prerequisites

Before proceeding with the solution, make sure you have the following resources and access:

By making sure you have these prerequisites in place, you will be ready to streamline your topic configurations with Terraform.

Install Terraform on your client machine

When your cluster and client machine are ready, SSH to your client machine (Amazon EC2) and install Terraform.

  1. Run the following commands to install Terraform:
    sudo yum update -y
    sudo yum install -y yum-utils shadow-utils
    sudo yum-config-manager --add-repo https://rpm.releases.hashicorp.com/AmazonLinux/hashicorp.repo
    sudo yum -y install terraform

  2. Run the following command to check the installation:
    terraform -v
    

If the command prints the installed Terraform version, the installation was successful and you are ready to automate your MSK topic configuration.

Provision an MSK topic using Terraform

To provision the MSK topic, complete the following steps:

  1. Create a new file called main.tf and copy the following code into this file, replacing the BOOTSTRAP_SERVERS and AWS_REGION information with the details for your cluster. For instructions on retrieving the bootstrap_servers information for IAM authentication from your MSK cluster, see Getting the bootstrap brokers for an Amazon MSK cluster. This script is common for Amazon MSK provisioned and MSK Serverless.
    terraform {
      required_providers {
        kafka = {
          source = "Mongey/kafka"
        }
      }
    }

    provider "kafka" {
      bootstrap_servers = [{BOOTSTRAP_SERVERS}]
      tls_enabled       = true
      sasl_mechanism    = "aws-iam"
      sasl_aws_region   = {AWS_REGION}
      sasl_aws_profile  = "dev"
    }

    resource "kafka_topic" "sampleTopic" {
      name               = "sampleTopic"
      replication_factor = 1
      partitions         = 50
    }

  2. Add the IAM bootstrap server endpoints as a comma-separated list of quoted strings:
    BOOTSTRAP_SERVERS = ["b-2.mskcluster…. ","b-3.mskcluster…. ","b-1.mskcluster…. "]

  3. Run the command terraform init to initialize Terraform and download the required providers.

The terraform init command initializes a working directory containing Terraform configuration files (main.tf). This is the first command that should be run after writing a new Terraform configuration.

  4. Run the command terraform plan to review the run plan.

This command shows the changes that Terraform will make to the infrastructure based on the provided configuration. This step is optional but is often used as a preview of the changes Terraform will make.

  5. If the plan looks correct, run the command terraform apply to apply the configuration.
  6. When prompted for confirmation before proceeding, enter yes.

The terraform apply command runs the actions proposed in a Terraform plan. Terraform will create the sampleTopic topic in your MSK cluster.

  7. After the terraform apply command is complete, verify that the topic has been created by using the kafka-topics.sh utility:
    kafka/bin/kafka-topics.sh \
    --bootstrap-server "b-1…..amazonaws.com:9098" \
    --command-config ./kafka/bin/client.properties \
    --list

You can use the kafka-topics.sh tool with the --list option to retrieve a list of topics associated with your MSK cluster. For more information, refer to the createtopic documentation.
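
Amazon MSK returns the IAM bootstrap brokers as a single comma-separated string, whereas the bootstrap_servers argument in main.tf expects a list of quoted strings. The following sketch shows one way to convert between the two. The brokers_to_hcl function and the host names are hypothetical and used only for illustration.

```python
import json


def brokers_to_hcl(broker_string):
    """Turn 'host1:9098,host2:9098' into an HCL list literal of quoted strings."""
    brokers = [b.strip() for b in broker_string.split(",") if b.strip()]
    # A JSON array of plain strings is also a valid HCL list expression.
    return json.dumps(brokers)


# Hypothetical broker string; use the value returned for your own cluster.
print(brokers_to_hcl("b-1.example.internal:9098, b-2.example.internal:9098"))
# prints ["b-1.example.internal:9098", "b-2.example.internal:9098"]
```

The resulting text is a complete list expression, brackets included, so it replaces the entire right-hand side of bootstrap_servers in main.tf.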

Update the MSK topic configuration using Terraform

To update the MSK topic configuration, let’s assume we want to change the number of partitions from 50 to 10 on our topic. We need to perform the following steps:

  1. Verify the number of partitions on the topic using the --describe command:
    kafka/bin/kafka-topics.sh \
    --bootstrap-server "b-1…...amazonaws.com:9098" \
    --command-config ./kafka/bin/client.properties \
    --describe \
    --topic sampleTopic

This command will show 50 partitions on the sampleTopic topic.

  2. Modify the Terraform file main.tf and change the value of the partitions parameter to 10:
    resource "kafka_topic" "sampleTopic" {
      name               = "sampleTopic"
      replication_factor = 1
      partitions         = 10
    }

  3. Run the command terraform plan to review the run plan.

  4. If the plan shows the changes, run the command terraform apply to apply the configuration.
  5. When prompted for confirmation before proceeding, enter yes.

Terraform will drop and recreate the sampleTopic topic with the changed configuration. Kafka does not support reducing the partition count of an existing topic, so the topic has to be recreated, and any data stored in the original topic is deleted.

  6. To verify the changed number of partitions on the topic, rerun the --describe command:
    kafka/bin/kafka-topics.sh \
    --bootstrap-server "b-1…...amazonaws.com:9098" \
    --command-config ./kafka/bin/client.properties \
    --describe --topic sampleTopic

Now, this command will show 10 partitions on the sampleTopic topic.
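
If you want to check the partition count in a script rather than by eye, you can parse the --describe output. The following sketch assumes the summary line format printed by recent Kafka versions (Topic, PartitionCount, ReplicationFactor on one line); the partition_count function and the sample text are illustrative, so compare them with the output of your own Kafka version.

```python
import re


def partition_count(describe_output, topic):
    """Return the PartitionCount reported for `topic`, or None if it is absent."""
    topic_pattern = re.compile(rf"Topic:\s*{re.escape(topic)}(?:\s|$)")
    for line in describe_output.splitlines():
        if topic_pattern.search(line):
            # Only the summary line carries PartitionCount; per-partition lines do not.
            match = re.search(r"PartitionCount:\s*(\d+)", line)
            if match:
                return int(match.group(1))
    return None


# Illustrative output, not captured from a real cluster.
sample = (
    "Topic: sampleTopic\tTopicId: abc\tPartitionCount: 10\tReplicationFactor: 1\tConfigs: \n"
    "\tTopic: sampleTopic\tPartition: 0\tLeader: 1\tReplicas: 1\tIsr: 1\n"
)
print(partition_count(sample, "sampleTopic"))  # prints 10
```

A script can capture the output of kafka-topics.sh, pass it to this function, and compare the result with the value of partitions in main.tf.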

Delete the MSK topic using Terraform

When you no longer need the infrastructure, you can remove all resources created by your Terraform file.

  1. Run the command terraform destroy to remove the topic.
  2. When prompted for confirmation before proceeding, enter yes.

Terraform will delete the sampleTopic topic from your MSK cluster.

  3. To verify, rerun the --list command:
    kafka/bin/kafka-topics.sh \
    --bootstrap-server "b-1…..amazonaws.com:9098" \
    --command-config ./kafka/bin/client.properties \
    --list

Now, this command will not show the sampleTopic topic.

Conclusion

In this post, we addressed the common challenges associated with manual MSK topic configuration management and presented a robust Terraform-based solution. Using Terraform for automated topic provisioning and configuration streamlines your processes, fosters scalability, and enhances flexibility. Additionally, it facilitates automated deployments and centralized management.

We encourage you to explore Terraform as a means to optimize Amazon MSK configurations and unlock further efficiencies within your streaming data pipelines.


About the author

Vijay Kardile is a Sr. Technical Account Manager with Enterprise Support, India. With over two decades of experience in IT Consulting and Engineering, he specializes in Analytics services, particularly Amazon EMR and Amazon MSK. He has empowered numerous enterprise clients by facilitating their adoption of various AWS services and offering expert guidance on attaining operational excellence.

Batch data ingestion into Amazon OpenSearch Service using AWS Glue

Post Syndicated from Ravikiran Rao original https://aws.amazon.com/blogs/big-data/batch-data-ingestion-into-amazon-opensearch-service-using-aws-glue/

Organizations constantly work to process and analyze vast volumes of data to derive actionable insights. Effective data ingestion and search capabilities have become essential for use cases like log analytics, application search, and enterprise search. These use cases demand a robust pipeline that can handle high data volumes and enable efficient data exploration.

Apache Spark, an open source powerhouse for large-scale data processing, is widely recognized for its speed, scalability, and ease of use. Its ability to process and transform massive datasets has made it an indispensable tool in modern data engineering. Amazon OpenSearch Service—a community-driven search and analytics solution—empowers organizations to search, aggregate, visualize, and analyze data seamlessly. Together, Spark and OpenSearch Service offer a compelling solution for building powerful data pipelines. However, ingesting data from Spark into OpenSearch Service can present challenges, especially with diverse data sources.

This post showcases how to use Spark on AWS Glue to seamlessly ingest data into OpenSearch Service. We cover batch ingestion methods, share practical examples, and discuss best practices to help you build optimized and scalable data pipelines on AWS.

Overview of solution

AWS Glue is a serverless data integration service that simplifies data preparation and integration tasks for analytics, machine learning, and application development. In this post, we focus on batch data ingestion into OpenSearch Service using Spark on AWS Glue.

AWS Glue offers multiple integration options with OpenSearch Service using various open source and AWS managed libraries, including:

In the following sections, we explore each integration method in detail, guiding you through the setup and implementation. As we progress, we incrementally build the architecture diagram shown in the following figure, providing a clear path for creating robust data pipelines on AWS. Each implementation is independent of the others. We chose to showcase them separately, because in a real-world scenario, only one of the three integration methods is likely to be used.

Image showing the high level architecture diagram

You can find the code base in the accompanying GitHub repo. In the following sections, we walk through the steps to implement the solution.

Prerequisites

Before you deploy this solution, make sure the following prerequisites are in place:

Clone the repository to your local machine

Clone the repository to your local machine and set the BLOG_DIR environment variable. All the relative paths assume BLOG_DIR is set to the repository location in your machine. If BLOG_DIR is not being used, adjust the path accordingly.

git clone git@github.com:aws-samples/opensearch-glue-integration-patterns.git
cd opensearch-glue-integration-patterns
export BLOG_DIR=$(pwd)

Deploy the AWS CloudFormation template to create the necessary infrastructure

The main focus of this post is to demonstrate how to use the mentioned libraries in Spark on AWS Glue to ingest data into OpenSearch Service. Though we center on this core topic, several key AWS components need to be pre-provisioned for the integration examples, such as an Amazon Virtual Private Cloud (Amazon VPC), multiple subnets, an AWS Key Management Service (AWS KMS) key, an Amazon Simple Storage Service (Amazon S3) bucket, an AWS Glue role, and an OpenSearch Service cluster with domains for OpenSearch Service and Elasticsearch. To simplify the setup, we’ve automated the provisioning of this core infrastructure using the cloudformation/opensearch-glue-infrastructure.yaml AWS CloudFormation template.

  1. Run the following commands.

The CloudFormation template will deploy the necessary networking components (such as VPC and subnets), Amazon CloudWatch logging, AWS Glue role, and OpenSearch Service and Elasticsearch domains required to implement the proposed architecture. Use a strong password (8–128 characters, including at least three of the following character types: lowercase letters, uppercase letters, numbers, and special characters; no /, “, or spaces) and adhere to your organization’s security standards for ESMasterUserPassword and OSMasterUserPassword in the following command:

cd ${BLOG_DIR}/cloudformation/
aws cloudformation deploy \
--template-file ${BLOG_DIR}/cloudformation/opensearch-glue-infrastructure.yaml \
--stack-name GlueOpenSearchStack \
--capabilities CAPABILITY_NAMED_IAM \
--region <AWS_REGION> \
--parameter-overrides \
ESMasterUserPassword=<ES_MASTER_USER_PASSWORD> \
OSMasterUserPassword=<OS_MASTER_USER_PASSWORD>

You should see a success message such as "Successfully created/updated stack – GlueOpenSearchStack" after the resources have been provisioned successfully. Provisioning this CloudFormation stack typically takes approximately 30 minutes to complete.

  2. On the AWS CloudFormation console, locate the GlueOpenSearchStack stack, and confirm that its status is CREATE_COMPLETE.

Image showing the "CREATE_COMPLETE" status of cloudformation template

You can review the deployed resources on the Resources tab, as shown in the following screenshot. The screenshot does not display all the created resources.

Image showing the "Resources" tab of cloudformation template

Additional setup steps

In this section, we collect essential information, including the S3 bucket name and the OpenSearch Service and Elasticsearch domain endpoints. These details are required for executing the code in subsequent sections.

Capture the details of the provisioned resources

Use the following AWS CLI command to extract and save the output values from the CloudFormation stack to a file named GlueOpenSearchStack_outputs.txt. We refer to the values in this file in upcoming steps.

aws cloudformation describe-stacks \
--stack-name GlueOpenSearchStack \
--query 'sort_by(Stacks[0].Outputs[], &OutputKey)[].{Key:OutputKey,Value:OutputValue}' \
--output table \
--no-cli-pager \
--region <AWS_REGION> > ${BLOG_DIR}/GlueOpenSearchStack_outputs.txt

Download NY Green Taxi December 2022 dataset and copy to S3 bucket

The purpose of this post is to demonstrate the technical implementation of ingesting data into OpenSearch Service using AWS Glue. Understanding the dataset itself is not essential, aside from its data format, which we discuss in AWS Glue notebooks in later sections. To learn more about the dataset, you can find additional information on the NYC Taxi and Limousine Commission website.

We specifically request that you download the December 2022 dataset, because we have tested the solution using this particular dataset:

S3_BUCKET_NAME=$(awk -F '|' '$2 ~ /S3Bucket/ {gsub(/^[ \t]+|[ \t]+$/, "", $3); print $3}' ${BLOG_DIR}/GlueOpenSearchStack_outputs.txt)
mkdir -p ${BLOG_DIR}/datasets && cd ${BLOG_DIR}/datasets
curl -O https://d37ci6vzurychx.cloudfront.net/trip-data/green_tripdata_2022-12.parquet
aws s3 cp green_tripdata_2022-12.parquet s3://${S3_BUCKET_NAME}/datasets/green_tripdata_2022-12.parquet
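
The awk one-liner above splits each row of the table-formatted output on the | character, matches the key column against S3Bucket, and prints the trimmed value column. If you prefer to read the same file from Python, the following sketch does the equivalent. The stack_output function and the sample table are hypothetical, and the table layout is assumed to match what the AWS CLI writes with --output table.

```python
def stack_output(table_text, key):
    """Return the Value cell of the first row whose Key cell contains `key`."""
    for line in table_text.splitlines():
        cells = [c.strip() for c in line.split("|")]
        # A data row looks like "|  Key  |  Value  |" -> ['', 'Key', 'Value', '']
        if len(cells) >= 4 and key in cells[1]:
            return cells[2]
    return None


# Illustrative table; the real file contains the outputs of your stack.
sample = """
-------------------------------
|       DescribeStacks        |
+-----------+-----------------+
|    Key    |      Value      |
+-----------+-----------------+
|  S3Bucket |  example-bucket |
+-----------+-----------------+
"""
print(stack_output(sample, "S3Bucket"))  # prints example-bucket
```

Like the awk pattern, the lookup is a substring match on the key column, so choose a key that identifies exactly one row.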

Download the required JARs from the Maven repository and copy to S3 bucket

We’ve specified a particular JAR file version to ensure a stable deployment experience. However, we recommend adhering to your organization’s security best practices and reviewing any known vulnerabilities in the version of the JAR files before deployment. AWS does not guarantee the security of any open-source code used here. Additionally, please verify the downloaded JAR file’s checksum against the published value to confirm its integrity and authenticity.

mkdir -p ${BLOG_DIR}/jars && cd ${BLOG_DIR}/jars
# OpenSearch Service jar
curl -O https://repo1.maven.org/maven2/org/opensearch/client/opensearch-spark-30_2.12/1.0.1/opensearch-spark-30_2.12-1.0.1.jar
aws s3 cp opensearch-spark-30_2.12-1.0.1.jar s3://${S3_BUCKET_NAME}/jars/opensearch-spark-30_2.12-1.0.1.jar
# Elasticsearch jar
curl -O https://repo1.maven.org/maven2/org/elasticsearch/elasticsearch-spark-30_2.12/7.17.23/elasticsearch-spark-30_2.12-7.17.23.jar
aws s3 cp elasticsearch-spark-30_2.12-7.17.23.jar s3://${S3_BUCKET_NAME}/jars/elasticsearch-spark-30_2.12-7.17.23.jar
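
The checksum verification recommended above can be scripted. Maven Central publishes a .sha1 file next to each artifact (the JAR URL with .sha1 appended). The following sketch computes the digest of a downloaded file and compares it with a published value that you supply; the file_digest and matches_published functions are hypothetical helpers, and no real checksum values are included here.

```python
import hashlib


def file_digest(path, algorithm="sha1", chunk_size=1024 * 1024):
    """Hash a file in chunks so that large JARs need not fit in memory."""
    digest = hashlib.new(algorithm)
    with open(path, "rb") as handle:
        for chunk in iter(lambda: handle.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


def matches_published(path, published_hex, algorithm="sha1"):
    """Compare against the published value, ignoring case and surrounding whitespace."""
    return file_digest(path, algorithm) == published_hex.strip().lower()
```

For example, after downloading opensearch-spark-30_2.12-1.0.1.jar and its .sha1 file, you could call matches_published with the JAR path and the contents of the .sha1 file, and copy the JAR to Amazon S3 only if the function returns True.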

In the following sections, we implement the individual data ingestion methods as outlined in the architecture diagram.

Ingest data into OpenSearch Service using the OpenSearch Spark library

In this section, we load an OpenSearch Service index using Spark and the OpenSearch Spark library. We demonstrate this implementation by using AWS Glue notebooks, employing basic authentication using user name and password.

To demonstrate the ingestion mechanisms, we have provided the Spark-and-OpenSearch-Code-Steps.ipynb notebook with detailed instructions. Follow the steps in this section in conjunction with the instructions in the notebook.

Set up the AWS Glue Studio notebook

Complete the following steps:

  1. On the AWS Glue console, choose ETL jobs in the navigation pane.
  2. Under Create job, choose Notebook.

Image showing AWS console page for AWS Glue to open notebook

  3. Upload the notebook file located at ${BLOG_DIR}/glue_jobs/Spark-and-OpenSearch-Code-Steps.ipynb.
  4. For IAM role, choose the AWS Glue job IAM role that begins with GlueOpenSearchStack-GlueRole-*.

Image showing AWS console page for AWS Glue to open notebook

  5. Enter a name for the notebook (for example, Spark-and-OpenSearch-Code-Steps) and choose Save.

Image showing AWS Glue OpenSearch Notebook

Replace the placeholder values in the notebook

Complete the following steps to update the placeholders in the notebook:

  1. In Step 1 in the notebook, replace the placeholder <GLUE-INTERACTIVE-SESSION-CONNECTION-NAME> with the AWS Glue interactive session connection name. You can get the connection name by running the following command:
cd ${BLOG_DIR}
awk -F '|' '$2 ~ /GlueInteractiveSessionConnectionName/ {gsub(/^[ \t]+|[ \t]+$/, "", $3); print $3}' ${BLOG_DIR}/GlueOpenSearchStack_outputs.txt
  2. In Step 1 in the notebook, replace the placeholder <S3-BUCKET-NAME> and populate the variable s3_bucket with the bucket name. You can get the name of the S3 bucket by running the following command:
awk -F '|' '$2 ~ /S3Bucket/ {gsub(/^[ \t]+|[ \t]+$/, "", $3); print $3}' ${BLOG_DIR}/GlueOpenSearchStack_outputs.txt
  3. In Step 4 in the notebook, replace <OPEN-SEARCH-DOMAIN-WITHOUT-HTTPS> with the OpenSearch Service domain name. You can get the domain name by running the following command:
awk -F '|' '$2 ~ /OpenSearchDomainEndpoint/ {gsub(/^[ \t]+|[ \t]+$/, "", $3); print $3}' ${BLOG_DIR}/GlueOpenSearchStack_outputs.txt
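
If you prefer to read these values from Python (for example, from a setup script), the awk lookups above can be mirrored as follows. This is a sketch that assumes GlueOpenSearchStack_outputs.txt is a pipe-delimited table with the output key in the second column and the value in the third, which is what the awk commands imply. The function name stack_output_value is hypothetical.

```python
import re


def stack_output_value(table_text, key_pattern):
    """Return the value column of the first row whose key column matches."""
    for line in table_text.splitlines():
        fields = line.split("|")
        # awk's $2 and $3 are fields[1] and fields[2] after splitting on '|'.
        if len(fields) >= 3 and re.search(key_pattern, fields[1]):
            return fields[2].strip()
    # Unlike awk, which prints every matching row, this returns one value or None.
    return None
```

For example, stack_output_value(open("GlueOpenSearchStack_outputs.txt").read(), "S3Bucket") would return the bucket name under the layout assumed here.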

Run the notebook

Run each cell of the notebook to load data into the OpenSearch Service domain and read it back to verify the successful load. Refer to the detailed instructions within the notebook for execution-specific guidance.

Spark write modes (append vs. overwrite)

It is recommended to write data incrementally into OpenSearch Service indexes using the append mode, as demonstrated in Step 8 in the notebook. However, in certain cases, you may need to refresh the entire dataset in the OpenSearch Service index. In these scenarios, you can use the overwrite mode, though it is not advised for large indexes. When using overwrite mode, the Spark library deletes rows from the OpenSearch Service index one by one and then rewrites the data, which can be inefficient for large datasets. To avoid this, you can implement a preprocessing step in Spark to identify insertions and updates, and then write the data into OpenSearch Service using append mode.
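
The preprocessing idea can be illustrated outside Spark with plain Python. This is a minimal sketch rather than the notebook's code: it assumes each record carries a unique identifier (the field name trip_id is hypothetical, because the sample dataset has no such column) and that you keep a fingerprint of every document already indexed. In Spark, the same classification would typically be done with a join against the previously loaded keys.

```python
import hashlib
import json


def row_fingerprint(row):
    """Stable hash of a record's content, independent of key order."""
    payload = json.dumps(row, sort_keys=True, default=str)
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


def split_changes(incoming, indexed_fingerprints, id_field="trip_id"):
    """Classify incoming rows as inserts or updates; unchanged rows are dropped."""
    inserts, updates = [], []
    for row in incoming:
        doc_id = row[id_field]
        fingerprint = row_fingerprint(row)
        if doc_id not in indexed_fingerprints:
            inserts.append(row)  # never seen before
        elif indexed_fingerprints[doc_id] != fingerprint:
            updates.append(row)  # same id, different content
    return inserts, updates
```

Only the rows returned by split_changes need to be written. Writing the updates in append mode without creating duplicates additionally assumes that the connector maps the identifier field to the document ID, which is not shown here.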

Ingest data into Elasticsearch using the Elasticsearch Hadoop library

In this section, we load an Elasticsearch index using Spark and the Elasticsearch Hadoop Library. We demonstrate this implementation by using AWS Glue as the engine for Spark.

Set up the AWS Glue Studio notebook

Complete the following steps to set up the notebook:

  1. On the AWS Glue console, choose ETL jobs in the navigation pane.
  2. Under Create job, choose Notebook.

Image showing AWS console page for AWS Glue to open notebook

  3. Upload the notebook file located at ${BLOG_DIR}/glue_jobs/Spark-and-Elasticsearch-Code-Steps.ipynb.
  4. For IAM role, choose the AWS Glue job IAM role that begins with GlueOpenSearchStack-GlueRole-*.

Image showing AWS console page for AWS Glue to open notebook

  5. Enter a name for the notebook (for example, Spark-and-ElasticSearch-Code-Steps) and choose Save.

Image showing AWS Glue Elasticsearch Notebook

Replace the placeholder values in the notebook

Complete the following steps:

  1. In Step 1 in the notebook, replace the placeholder <GLUE-INTERACTIVE-SESSION-CONNECTION-NAME> with the AWS Glue interactive session connection name. You can get the connection name by running the following command:
awk -F '|' '$2 ~ /GlueInteractiveSessionConnectionName/ {gsub(/^[ \t]+|[ \t]+$/, "", $3); print $3}' ${BLOG_DIR}/GlueOpenSearchStack_outputs.txt
  2. In Step 1 in the notebook, replace the placeholder <S3-BUCKET-NAME> and populate the variable s3_bucket with the bucket name. You can get the name of the S3 bucket by running the following command:
awk -F '|' '$2 ~ /S3Bucket/ {gsub(/^[ \t]+|[ \t]+$/, "", $3); print $3}' ${BLOG_DIR}/GlueOpenSearchStack_outputs.txt
  3. In Step 4 in the notebook, replace <ELASTIC-SEARCH-DOMAIN-WITHOUT-HTTPS> with the Elasticsearch domain name. You can get the domain name by running the following command:
awk -F '|' '$2 ~ /ElasticsearchDomainEndpoint/ {gsub(/^[ \t]+|[ \t]+$/, "", $3); print $3}' ${BLOG_DIR}/GlueOpenSearchStack_outputs.txt

Run the notebook

Run each cell in the notebook to load data to the Elasticsearch domain and read it back to verify the successful load. Refer to the detailed instructions within the notebook for execution-specific guidance.

Ingest data into OpenSearch Service using the AWS Glue OpenSearch Service connection

In this section, we load an OpenSearch Service index using Spark and the AWS Glue OpenSearch Service connection.

Create the AWS Glue job

Complete the following steps to create an AWS Glue Visual ETL job:

  1. On the AWS Glue console, choose ETL jobs in the navigation pane.
  2. Under Create job, choose Visual ETL.

This will open the AWS Glue job visual editor.

Image showing AWS console page for AWS Glue to open Visual ETL

  3. Choose the plus sign, and under Sources, choose Amazon S3.

Image showing AWS console page for AWS Glue Visual Editor

  4. In the visual editor, choose the Data Source – S3 bucket node.
  5. In the Data source properties – S3 pane, configure the data source as follows:
    • For S3 source type, select S3 location.
    • For S3 URL, choose Browse S3, and choose the green_tripdata_2022-12.parquet file from the designated S3 bucket.
    • For Data format, choose Parquet.
  6. Choose Infer schema to let AWS Glue detect the schema of the data.

This will set up your data source from the specified S3 bucket.

Image showing AWS console page for AWS Glue Visual Editor

  7. Choose the plus sign again to add a new node.
  8. For Transforms, choose Drop Fields to include this transformation step.

This will allow you to remove any unnecessary fields from your dataset before loading it into OpenSearch Service.

Image showing AWS console page for AWS Glue Visual Editor

  9. Choose the Drop Fields transform node, then select the following fields to drop from the dataset:
    • payment_type
    • trip_type
    • congestion_surcharge

This will remove these fields from the data before it is loaded into OpenSearch Service.

Image showing AWS console page for AWS Glue Visual Editor

  10. Choose the plus sign again to add a new node.
  11. For Targets, choose Amazon OpenSearch Service.

This will configure OpenSearch Service as the destination for the data being processed.

Image showing AWS console page for AWS Glue Visual Editor

  12. Choose the Data target – Amazon OpenSearch Service node and configure it as follows:
    • For Amazon OpenSearch Service connection, choose the connection GlueOpenSearchServiceConnec-* from the drop down.
    • For Index, enter green_taxi. The green_taxi index was created earlier in the “Ingest data into OpenSearch Service using the OpenSearch Spark library” section.

This configures the job to write the processed data to the specified OpenSearch Service index.

Image showing AWS console page for AWS Glue Visual Editor

  13. On the Job details tab, update the job details as follows:
    • For Name, enter a name (for example, Spark-and-Glue-OpenSearch-Connector).
    • For Description, enter an optional description (for example, AWS Glue job using Glue OpenSearch Connection to load data into Amazon OpenSearch Service).
    • For IAM role, choose the role starting with GlueOpenSearchStack-GlueRole-*.
    • For the Glue version, choose Glue 4.0 – Supports spark 3.3, Scala 2, Python 3
    • Leave the rest of the fields as default.
    • Choose Save to save the changes.

Image showing AWS console page for AWS Glue Visual Editor

  14. To run the AWS Glue job Spark-and-Glue-OpenSearch-Connector, choose Run.

This will initiate the job execution.

Image showing AWS console page for AWS Glue Visual Editor

  15. Choose the Runs tab and wait for the AWS Glue job to complete successfully.

You will see the status change to Succeeded when the job is complete.

Image showing AWS console page for AWS Glue job run status

Clean up

To clean up your resources, complete the following steps:

  1. Delete the CloudFormation stack:
aws cloudformation delete-stack \
--stack-name GlueOpenSearchStack \
--region <AWS_REGION>
  2. Delete the AWS Glue jobs:
    • On the AWS Glue console, under ETL jobs in the navigation pane, choose Visual ETL.
    • Select the jobs you created (Spark-and-Glue-OpenSearch-Connector, Spark-and-ElasticSearch-Code-Steps, and Spark-and-OpenSearch-Code-Steps) and on the Actions menu, choose Delete.

Conclusion

In this post, we explored several ways to ingest data into OpenSearch Service using Spark on AWS Glue. We demonstrated the use of three key libraries: the AWS Glue OpenSearch Service connection, the OpenSearch Spark Library, and the Elasticsearch Hadoop Library. The methods outlined in this post can help you streamline your data ingestion into OpenSearch Service.

If you’re interested in learning more and getting hands-on experience, we’ve created a workshop that walks you through the entire process in detail. You can explore the full setup for ingesting data into OpenSearch Service, handling both batch and real-time streams, and building dashboards. Check out the workshop Unified Real-Time Data Processing and Analytics Using Amazon OpenSearch and Apache Spark to deepen your understanding and apply these techniques step by step.


About the Authors

Ravikiran Rao is a Data Architect at Amazon Web Services and is passionate about solving complex data challenges for various customers. Outside of work, he is a theater enthusiast and amateur tennis player.

Vishwa Gupta is a Senior Data Architect with the AWS Professional Services Analytics Practice. He helps customers implement big data and analytics solutions. Outside of work, he enjoys spending time with family, traveling, and trying new food.

Suvojit Dasgupta is a Principal Data Architect at Amazon Web Services. He leads a team of skilled engineers in designing and building scalable data solutions for AWS customers. He specializes in developing and implementing innovative data architectures to address complex business challenges.

Build a high-performance quant research platform with Apache Iceberg

Post Syndicated from Guy Bachar original https://aws.amazon.com/blogs/big-data/build-a-high-performance-quant-research-platform-with-apache-iceberg/

In our previous post Backtesting index rebalancing arbitrage with Amazon EMR and Apache Iceberg, we showed how to use Apache Iceberg in the context of strategy backtesting. In this post, we focus on data management implementation options such as accessing data directly in Amazon Simple Storage Service (Amazon S3), using popular data formats like Parquet, or using open table formats like Iceberg. Our experiments are based on real-world historical full order book data, provided by our partner CryptoStruct, and compare the trade-offs between these choices, focusing on performance, cost, and quant developer productivity.

Data management is the foundation of quantitative research. Quant researchers spend approximately 80% of their time on necessary but not impactful data management tasks such as data ingestion, validation, correction, and reformatting. Traditional data management choices include relational, SQL, NoSQL, and specialized time series databases. In recent years, advances in parallel computing in the cloud have made object stores like Amazon S3 and columnar file formats like Parquet a preferred choice.

This post explores how Iceberg can enhance quant research platforms by improving query performance, reducing costs, and increasing productivity, ultimately enabling faster and more efficient strategy development in quantitative finance. Our analysis shows that Iceberg can accelerate query performance by up to 52%, reduce operational costs, and significantly improve data management at scale.

Having chosen Amazon S3 as our storage layer, a key decision is whether to access Parquet files directly or use an open table format like Iceberg. Iceberg offers distinct advantages through its metadata layer over Parquet, such as improved data management, performance optimization, and integration with various query engines.

In this post, we use the term vanilla Parquet to refer to Parquet files stored directly in Amazon S3 and accessed through standard query engines like Apache Spark, without the additional features provided by table formats such as Iceberg.

Quant developer and researcher productivity

In this section, we focus on the productivity features offered by Iceberg and how it compares to directly reading files in Amazon S3. As mentioned earlier, approximately 80% of quantitative research work is attributed to data management tasks. Business impact heavily relies on quality data (“garbage in, garbage out”). Quants and platform teams have to ingest data from multiple sources with different velocities and update frequencies, and then validate and correct the data. These activities translate into the ability to run append, insert, update, and delete operations. For simple append operations, both Parquet on Amazon S3 and Iceberg offer similar convenience and productivity. However, real-world data is never perfect and needs to be corrected. Gap filling (inserts), error corrections and restatements (updates), and duplicate removal (deletes) are the most obvious examples. When writing data in the Parquet format directly to Amazon S3 without using an open table format like Iceberg, you have to write code to identify the affected partition, correct errors, and rewrite the partition. Moreover, if the write job fails, or a downstream job reads the partition while it is being rewritten, downstream jobs can read inconsistent data. In contrast, Iceberg has built-in insert, update, and delete features with ACID (Atomicity, Consistency, Isolation, Durability) properties, and the framework itself manages the Amazon S3 mechanics on your behalf.

Guarding against lookahead bias is an essential capability of any quant research platform—what backtests as a profitable trading strategy can render itself useless and unprofitable in real time. Iceberg provides time travel and snapshotting capabilities out of the box to manage lookahead bias that could be embedded in the data (such as delayed data delivery).

Simplified data corrections and updates

Iceberg enhances data management for quants in capital markets through its robust insert, delete, and update capabilities. These features allow efficient data corrections, gap-filling in time series, and historical data updates without disrupting ongoing analyses or compromising data integrity.

Unlike direct Amazon S3 access, Iceberg supports these operations on petabyte-scale data lakes without requiring complex custom code. This simplifies data modification processes, which is crucial for ingesting and updating large volumes of market and trade data, quickly iterating on backtesting and reprocessing workflows, and maintaining detailed audit trails for risk and compliance requirements.
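
As an illustration of how little code a correction needs, the following sketch builds an Iceberg MERGE INTO statement that upserts corrected rows. It assumes a Spark session with the Iceberg SQL extensions enabled. The table and view names in the usage note are hypothetical, and the key columns are the ones used in the queries later in this post.

```python
def merge_corrections_sql(target, source, key_columns):
    """Build an Iceberg MERGE INTO statement that upserts corrected rows."""
    if not key_columns:
        raise ValueError("at least one key column is required")
    # Rows match when every key column is equal in target (t) and source (s).
    on_clause = " AND ".join(f"t.{col} = s.{col}" for col in key_columns)
    return (
        f"MERGE INTO {target} t USING {source} s ON {on_clause} "
        "WHEN MATCHED THEN UPDATE SET * "
        "WHEN NOT MATCHED THEN INSERT *"
    )
```

In a Spark job, you could run spark.sql(merge_corrections_sql("glue_catalog.quant.order_book", "corrections", ["exchange_code", "instrument", "adapterTimestamp_ts_utc"])) after registering the corrected rows as a temporary view named corrections.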

Iceberg’s table format separates data files from metadata files, enabling efficient data modifications without full dataset rewrites. This approach also reduces expensive ListObjects API calls typically needed when directly accessing Parquet files in Amazon S3.

Additionally, Iceberg offers merge on read (MoR) and copy on write (CoW) approaches, providing flexibility for different quant research needs. MoR enables faster writes, suitable for frequently updated datasets, and CoW provides faster reads, beneficial for read-heavy workflows like backtesting.
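
The choice between the two approaches is made per table through Iceberg table properties. The following sketch builds the ALTER TABLE statement that sets the write.delete.mode, write.update.mode, and write.merge.mode properties together. The helper name and the table name in the usage note are hypothetical, and setting all three to the same value is a simplification.

```python
VALID_MODES = ("copy-on-write", "merge-on-read")


def row_level_mode_sql(table, mode):
    """Build ALTER TABLE SQL that sets Iceberg's delete, update, and merge modes."""
    if mode not in VALID_MODES:
        raise ValueError(f"mode must be one of {VALID_MODES}")
    properties = ", ".join(
        f"'write.{operation}.mode'='{mode}'"
        for operation in ("delete", "update", "merge")
    )
    return f"ALTER TABLE {table} SET TBLPROPERTIES ({properties})"
```

For a frequently corrected tick table, you might run spark.sql(row_level_mode_sql("db.ticks", "merge-on-read")), and keep copy-on-write for tables that are mostly read by backtests.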

For example, when a new data source or attribute is added, quant researchers can seamlessly incorporate it into their Iceberg tables and then reprocess historical data, confident they’re using correct, time-appropriate information. This capability is particularly valuable in maintaining the integrity of backtests and the reliability of trading strategies.

In scenarios involving large-scale data corrections or updates, such as adjusting for stock splits or dividend payments across historical data, Iceberg’s efficient update mechanisms significantly reduce processing time and resource usage compared to traditional methods.

These features collectively improve productivity and data management efficiency in quant research environments, allowing researchers to focus more on strategy development and less on data handling complexities.

Historical data access for backtesting and validation

Iceberg’s time travel feature can enable quant developers and researchers to access and analyze historical snapshots of their data. This capability can be useful while performing tasks like backtesting, model validation, and understanding data lineage.

Iceberg simplifies time travel workflows on Amazon S3 by introducing a metadata layer that tracks the history of changes made to the table. You can refer to this metadata layer to create a mental model of how Iceberg’s time travel capability works.

Iceberg’s time travel capability is driven by a concept called snapshots, which are recorded in metadata files. These metadata files act as a central repository that stores table metadata, including the history of snapshots. Additionally, Iceberg uses manifest files to provide a representation of data files, their partitions, and any associated delete files. These manifest files are referenced in the metadata snapshots, allowing Iceberg to identify the relevant data for a specific point in time.

When a user requests a time travel query, the typical workflow involves querying a specific snapshot. Iceberg uses the snapshot identifier to locate the corresponding metadata snapshot in the metadata files. The time travel capability is invaluable to quants, enabling them to backtest and validate strategies against historical data, reproduce and debug issues, perform what-if analysis, comply with regulations by maintaining audit trails and reproducing past states, and roll back and recover from data corruption or errors. Quants can also gain deeper insights into current market trends and correlate them with historical patterns. Also, the time travel feature can further mitigate any risks of lookahead bias. Researchers can access the exact data snapshots that were present in the past, and then run their models and strategies against this historical data, without the risk of inadvertently incorporating future information.
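
To make the mental model concrete, the following toy sketch resolves a point-in-time request against a snapshot history. It is not Iceberg's implementation: the history is modeled as a plain list of (commit time in milliseconds, snapshot ID) pairs, and the function names are hypothetical. In Spark, the equivalent request is expressed with the Iceberg read options as-of-timestamp (epoch milliseconds) or snapshot-id.

```python
from bisect import bisect_right
from datetime import datetime, timezone


def to_epoch_millis(moment):
    """Convert a timezone-aware datetime to milliseconds since the Unix epoch."""
    if moment.tzinfo is None:
        raise ValueError("use a timezone-aware datetime to avoid local-time ambiguity")
    return int(moment.timestamp() * 1000)


def snapshot_as_of(history, as_of_millis):
    """Return the id of the latest snapshot committed at or before as_of_millis.

    history is a list of (committed_at_millis, snapshot_id) sorted by commit time.
    """
    commit_times = [committed_at for committed_at, _ in history]
    position = bisect_right(commit_times, as_of_millis)
    if position == 0:
        raise LookupError("no snapshot existed at the requested time")
    return history[position - 1][1]
```

A backtest that must only see data known at a given decision time would resolve that time to a snapshot first and then read the table at that snapshot, so that later corrections or late-arriving records cannot leak into the result.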

Seamless integration with familiar tools

Iceberg provides a variety of interfaces that enable seamless integration with the open source tools and AWS services that quant developers and researchers are familiar with.

Iceberg provides a comprehensive SQL interface that allows quant teams to interact with their data using familiar SQL syntax. This SQL interface is compatible with popular query engines and data processing frameworks, such as Spark, Trino, Amazon Athena, and Hive. Quant developers and researchers can use their existing SQL knowledge and tools to query, filter, aggregate, and analyze their data stored in Iceberg tables.

In addition to the primary interface of SQL, Iceberg also provides the DataFrame API, which allows quant teams to programmatically interact with their data with popular distributed data processing frameworks like Spark and Flink as well as thin clients like PyIceberg. Quants can further use this API to build more programmatic approaches to access and manipulate data, allowing for the implementation of custom logic and integration of Iceberg with other AWS ecosystems like Amazon EMR.

Although accessing Parquet data directly in Amazon S3 is a viable option, Iceberg provides several advantages, such as metadata management, performance optimization using partition pruning, data manipulation, and integration with AWS services like Athena and Amazon EMR, for a more seamless and feature-rich data processing experience.

Undifferentiated heavy lifting

Data partitioning is one of the major contributing factors to optimizing aggregate throughput to and from Amazon S3, contributing to overall High Performance Computing (HPC) environment price-performance.

Quant researchers often face performance bottlenecks and complex data management challenges when dealing with large-scale datasets in Amazon S3. As discussed in Best practices design patterns: optimizing Amazon S3 performance, single prefix performance is limited to 3,500 PUT/COPY/POST/DELETE or 5,500 GET/HEAD requests per second per partitioned Amazon S3 prefix. Iceberg’s metadata layer and intelligent partitioning strategies automatically optimize data access patterns, reducing the likelihood of I/O throttling and minimizing the need for manual performance tuning. This automation allows quant teams to focus on developing and refining trading strategies rather than troubleshooting data access issues or optimizing storage layouts.
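
As a back-of-the-envelope illustration of these limits, the following sketch estimates how many prefixes a workload needs to stay under the per-prefix request rates quoted above. It assumes requests are spread evenly across prefixes, which real workloads rarely achieve, so treat the result as a lower bound. The function name is hypothetical.

```python
import math

GET_LIMIT_PER_PREFIX = 5500  # GET/HEAD requests per second, as quoted above
PUT_LIMIT_PER_PREFIX = 3500  # PUT/COPY/POST/DELETE requests per second


def prefixes_needed(requests_per_second, limit_per_prefix):
    """Smallest number of prefixes that keeps each one at or under its limit."""
    if requests_per_second <= 0:
        return 1
    return math.ceil(requests_per_second / limit_per_prefix)
```

For example, a read-heavy job issuing 20,000 GET requests per second needs at least prefixes_needed(20000, GET_LIMIT_PER_PREFIX) prefixes, which is 4.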

In this section, we discuss situations we discovered while running our experiments at scale and solutions provided by Iceberg vs. vanilla Parquet when accessing data in Amazon S3.

As we mentioned in the introduction, the nature of quant research is “fail fast”—new ideas have to be quickly evaluated and then either prioritized for a deep dive or dismissed. This makes it impossible to come up with universal partitioning that works all the time and for all research styles.

When accessing data directly as Parquet files in Amazon S3, without using an open table format like Iceberg, partitioning and throttling issues can arise. Partitioning in this case is determined by the physical layout of files in Amazon S3, and a mismatch between the intended partitioning and the actual file layout can lead to I/O throttling exceptions. Additionally, listing directories in Amazon S3 can also result in throttling exceptions due to the high number of API calls required.

In contrast, Iceberg provides a metadata layer that abstracts away the physical file layout in Amazon S3. Partitioning is defined at the table level, and Iceberg handles the mapping between logical partitions and the underlying file structure. This abstraction helps mitigate partitioning issues and reduces the likelihood of I/O throttling exceptions. Furthermore, Iceberg’s metadata caching mechanism minimizes the number of List API calls required, addressing the directory listing throttling issue.

Although both approaches involve direct access to Amazon S3, Iceberg is an open table format that introduces a metadata layer, providing better partitioning management and reducing the risk of throttling exceptions. It doesn’t act as a database or a processing engine itself, but rather as a table format that query engines such as Spark use on top of the underlying storage (in this case, Amazon S3).

One of the most effective techniques to address Amazon S3 API quota limits is salting (random hash prefixes)—a method that adds random partition IDs to Amazon S3 paths. This increases the probability of prefixes residing on different physical partitions, helping distribute API requests more evenly. Iceberg supports this functionality out of the box for both data ingestion and reading.

Implementing salting directly in Amazon S3 requires complex custom code to create and use partitioning schemes with random keys in the naming hierarchy. This approach necessitates a custom data catalog and metadata system to map physical paths to logical paths, allowing direct partition access without relying on Amazon S3 List API calls. Without such a system, applications risk exceeding Amazon S3 API quotas when accessing specific partitions.
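
The following sketch shows what such custom code might look like in its simplest form. It is an illustration, not Iceberg's implementation: it derives the salt from a hash of the logical path rather than a random value, so the physical key can be recomputed without a lookup, and it uses an in-memory dictionary as a stand-in for the catalog. All names are hypothetical.

```python
import hashlib


def salted_key(base_prefix, logical_path, hash_chars=4):
    """Prepend a short hash of the logical path so keys spread across prefixes."""
    digest = hashlib.sha256(logical_path.encode("utf-8")).hexdigest()
    return f"{base_prefix.rstrip('/')}/{digest[:hash_chars]}/{logical_path.lstrip('/')}"


def build_path_map(base_prefix, logical_paths):
    """Map each logical path to its physical key, standing in for a catalog."""
    return {path: salted_key(base_prefix, path) for path in logical_paths}
```

Readers resolve a logical partition path through the map (or recompute the hash) and fetch the object directly, which avoids listing the bucket. Even this toy version shows why a metadata layer is needed once files are no longer laid out by partition name alone.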

At petabyte scale, Iceberg’s advantages become clear. It efficiently manages data through the following features:

  • Directory caching
  • Configurable partitioning strategies (range, bucket)
  • Data management functionality (compaction)
  • Catalog, metadata, and statistics use for optimal execution plans

These built-in features eliminate the need for custom solutions to manage Amazon S3 API quotas and data organization at scale, reducing development time and maintenance costs while improving query performance and reliability.
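
To illustrate how metadata and statistics shrink the work a query has to do, the following toy sketch prunes data files using partition values and per-file timestamp bounds. It is a simplified model under stated assumptions, not Iceberg's planner: each manifest entry is reduced to a path, a partition tuple, and minimum and maximum values of adapterTimestamp_ts_utc stored as sortable strings. The class and function names are hypothetical.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class DataFileEntry:
    """Toy stand-in for one manifest entry: a file plus per-column bounds."""
    path: str
    partition: tuple  # (exchange_code, instrument)
    min_ts: str       # lower bound of adapterTimestamp_ts_utc in the file
    max_ts: str       # upper bound of adapterTimestamp_ts_utc in the file


def files_to_scan(entries, start_ts, end_ts, partition=None):
    """Keep only files whose partition and timestamp bounds can match the filter."""
    selected = []
    for entry in entries:
        if partition is not None and entry.partition != partition:
            continue  # partition pruning
        if entry.max_ts < start_ts or entry.min_ts > end_ts:
            continue  # bounds do not overlap the requested range
        selected.append(entry.path)
    return selected
```

Because the decision is made from metadata alone, files outside the requested range are never opened and no directory listing is required, which is the effect a date range filter benefits from when the data is sorted on the time column.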

Performance

We highlighted a lot of the functionality of Iceberg that eliminates undifferentiated heavy lifting and improves developer and quant productivity. What about performance?

This section evaluates whether Iceberg’s metadata layer introduces overhead or delivers optimization for quantitative research use cases, comparing it with vanilla Parquet access on Amazon S3. We examine how these approaches impact common quant research queries and workflows.

We also discuss overlapping optimization techniques, such as data distribution and sorting, and why no single partitioning and sorting scheme fits every quant research workload. Our benchmarks show that Iceberg performs comparably to direct Amazon S3 access, with additional optimizations from its metadata and statistics usage, similar to database indexing.

Vanilla Parquet vs Iceberg: Amazon S3 read performance

We created four different datasets: two using Iceberg and two with direct Amazon S3 Parquet access, each with both sorted and unsorted write distributions. The purpose of this exercise was to compare the performance of direct Amazon S3 Parquet access vs. the Iceberg open table format, taking into account the impact of write distribution patterns when running various queries commonly used in quantitative trading research.

Query 1

We first run a simple count query to get the total number of records in the table. This query helps understand the baseline performance for a straightforward operation. For example, if the table contains tick-level market data for various financial instruments, the count can give an idea of the total number of data points available for analysis.

The following is the code for vanilla Parquet:

count = spark.read.parquet("s3://example-s3-bucket/path/to/data").count()

The following is the code for Iceberg:

import pyspark.sql.functions as f

count = spark.read.table(table_name).count()
# We used a typical count query for the performance comparison. The same count can also be
# computed from the files metadata table, as shown below, which completes in a few seconds.
spark.read.format("iceberg").load(f"{table_name}.files").select(f.sum("record_count")).show(truncate=False)

Query 2

Our second query is a grouping and counting query to find the number of records for each combination of exchange_code and instrument. This query is commonly used in quantitative trading research to analyze market liquidity and trading activity across different instruments and exchanges.

The following is the code for vanilla Parquet:

spark.read.parquet("s3://example-s3-bucket/path/to/data") \
         .groupBy("exchange_code", "instrument") \
         .count() \
         .orderBy("count", ascending=False) \
         .show(truncate=False)

The following is the code for Iceberg:

spark.read.table(table_name) \
        .groupBy("exchange_code", "instrument") \
        .count() \
        .orderBy("count", ascending=False) \
        .show(truncate=False)

Query 3

Next, we run a distinct query to retrieve the distinct combinations of year, month, and day from the adapterTimestamp_ts_utc column. In quantitative trading research, this query can be helpful for understanding the time range covered by the dataset. Researchers can use this information to identify periods of interest for their analysis, such as specific market events, economic cycles, or seasonal patterns.

The following is the code for vanilla Parquet:

import pyspark.sql.functions as f

spark.read.parquet("s3://example-s3-bucket/path/to/data") \
         .select(f.year("adapterTimestamp_ts_utc").alias("year"),
                 f.month("adapterTimestamp_ts_utc").alias("month"),
                 f.dayofmonth("adapterTimestamp_ts_utc").alias("day")) \
         .distinct() \
         .show(truncate=False)

The following is the code for Iceberg:

spark.read.table(table_name) \
        .select(f.year("adapterTimestamp_ts_utc").alias("year"),
                f.month("adapterTimestamp_ts_utc").alias("month"),
                f.dayofmonth("adapterTimestamp_ts_utc").alias("day")) \
        .distinct() \
        .show(truncate=False)

Query 4

Lastly, we run a grouping and counting query with a date range filter on the adapterTimestamp_ts_utc column. This query is similar to Query 2 but focuses on a specific time period. You could use this query to analyze market activity or liquidity during specific time periods, such as periods of high volatility, market crashes, or economic events. Researchers can use this information to identify potential trading opportunities or investigate the impact of these events on market dynamics.

The following is the code for vanilla Parquet:

spark.read.parquet("s3://example-s3-bucket/path/to/data") \
         .filter((f.col("adapterTimestamp_ts_utc") >= "2023-04-17 00:00:00") &
                 (f.col("adapterTimestamp_ts_utc") <= "2023-04-18 23:59:59.999")) \
         .groupBy("exchange_code", "instrument") \
         .count() \
         .orderBy("count", ascending=False) \
         .show(truncate=False)

The following is the code for Iceberg:

spark.read.table(table_name) \
        .filter((f.col("adapterTimestamp_ts_utc") >= "2023-04-17 00:00:00") &
                (f.col("adapterTimestamp_ts_utc") <= "2023-04-18 23:59:59.999")) \
        .groupBy("exchange_code", "instrument") \
        .count() \
        .orderBy("count", ascending=False) \
        .show(truncate=False)

Test results

To evaluate the performance and cost benefits of using Iceberg for our quant research data lake, we created four different datasets: two with Iceberg tables and two with direct Amazon S3 Parquet access, each using both sorted and unsorted write distributions. We first ran AWS Glue write jobs to create the Iceberg tables and then mirrored the same write processes for the Amazon S3 Parquet datasets. For the unsorted datasets, we partitioned the data by exchange and instrument, and for the sorted datasets, we added a sort key on the time column.
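
For reference, the partitioning and sort order described above can be declared on an Iceberg table with SQL. The following sketch builds those statements. It is not the code of the AWS Glue write jobs used in the benchmark: it assumes an existing Iceberg table, a Spark session with the Iceberg SQL extensions enabled, and a hypothetical helper name and table name.

```python
def layout_statements(table, partition_columns, sort_column=None):
    """Build Iceberg DDL for partition fields and an optional write sort order."""
    statements = [
        f"ALTER TABLE {table} ADD PARTITION FIELD {column}"
        for column in partition_columns
    ]
    if sort_column is not None:
        # A declared write order lets the engine sort rows during writes.
        statements.append(f"ALTER TABLE {table} WRITE ORDERED BY {sort_column}")
    return statements
```

Calling layout_statements("db.order_book", ["exchange_code", "instrument"]) corresponds to the unsorted layout, and passing sort_column="adapterTimestamp_ts_utc" adds the sort key used for the sorted datasets. Each returned statement would be passed to spark.sql.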

Next, we ran a series of queries commonly used in quantitative trading research, including simple count queries, grouping and counting, distinct value queries, and queries with date range filters. Our benchmarking process involved reading data from Amazon S3, performing various transformations and joins, and writing the processed data back to Amazon S3 as Parquet files.

By comparing runtimes and costs across different data formats and write distributions, we quantified the benefits of Iceberg’s optimized data organization, metadata management, and efficient Amazon S3 data handling. The results showed that Iceberg not only enhanced query performance without introducing significant overhead, but also reduced the likelihood of task failures, reruns, and throttling issues, leading to more stable and predictable job execution, particularly with large datasets stored in Amazon S3.

AWS Glue write jobs

In the following table, we compare the performance and the cost implications of using Iceberg vs. vanilla Parquet access on Amazon S3, taking into account the following use cases:

  • Iceberg table (unsorted) – We created an Iceberg table partitioned by exchange_code and instrument. This means that the data was physically partitioned in Amazon S3 based on the unique combinations of exchange_code and instrument values. Partitioning the data in this way can improve query performance, because Iceberg can prune partitions that aren’t relevant to a particular query, reducing the amount of data that needs to be scanned. The data was not sorted on any column in this case, which is the default behavior.
  • Vanilla Parquet (unsorted) – For this use case, we wrote the data directly as Parquet files to Amazon S3, without using Iceberg. We repartitioned the data by the exchange_code and instrument columns using standard hash partitioning before writing it out. Repartitioning was necessary to avoid potential throttling issues when reading the data later, because accessing data directly from Amazon S3 without intelligent partitioning can lead to too many requests hitting the same S3 prefix. Like the Iceberg table, the data was not sorted on any column in this case. To make the comparison fair, we used the exact repartition count that Iceberg uses.
  • Iceberg table (sorted) – We created another Iceberg table, again partitioned by exchange_code and instrument. Additionally, we sorted the data in this table on the adapterTimestamp_ts_utc column. Sorting the data can improve query performance for certain types of queries, such as those that involve range filters or ordered outputs. Iceberg handles the sorting and partitioning of the data transparently to the user.
  • Vanilla Parquet (sorted) – For this use case, we again wrote the data directly as Parquet files to Amazon S3, without using Iceberg. We repartitioned the data by range on the exchange_code, instrument, and adapterTimestamp_ts_utc columns before writing it out, using standard range partitioning with a partition count of 1,996, which is the count Iceberg used according to the Spark UI. Repartitioning on the time column (adapterTimestamp_ts_utc) was necessary to achieve a sorted write distribution, because Parquet files are sorted within each partition. This sorted write distribution can improve query performance for certain types of queries, similar to the sorted Iceberg table.

Write Distribution Pattern | Iceberg Table (Unsorted) | Vanilla Parquet (Unsorted) | Iceberg Table (Sorted) | Vanilla Parquet (Sorted)
DPU Hours                  | 899.46639                | 915.70222                  | 1402                   | 1365
Number of S3 Objects       | 7444                     | 7288                       | 9283                   | 9283
Size of S3 Parquet Objects | 567.7 GB                 | 629.8 GB                   | 525.6 GB               | 627.1 GB
Runtime                    | 1h 51m 40s               | 1h 53m 29s                 | 2h 52m 7s              | 2h 47m 36s
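
The two vanilla Parquet write distributions described in the list above can be sketched roughly as follows. This is a minimal illustration, not the code used in the benchmark: the function names, the overwrite save mode, and the explicit sortWithinPartitions call are assumptions, and the sketch does not show whether the original job also called partitionBy on the writer. The column names and the 1,996 partition count come from the text.

```python
from typing import TYPE_CHECKING

if TYPE_CHECKING:  # imported for type hints only; Spark is needed only at run time
    from pyspark.sql import DataFrame

PARTITION_COLS = ("exchange_code", "instrument")
SORT_COLS = PARTITION_COLS + ("adapterTimestamp_ts_utc",)


def write_unsorted_parquet(df: "DataFrame", path: str, num_partitions: int) -> None:
    """Hash-repartition by exchange_code and instrument, then write Parquet."""
    (
        df.repartition(num_partitions, *PARTITION_COLS)
        .write.mode("overwrite")  # assumption: save mode is not stated in the post
        .parquet(path)
    )


def write_sorted_parquet(df: "DataFrame", path: str, num_partitions: int = 1996) -> None:
    """Range-repartition on the sort columns, sort each partition, then write Parquet."""
    (
        df.repartitionByRange(num_partitions, *SORT_COLS)
        .sortWithinPartitions(*SORT_COLS)  # assumption: explicit per-partition sort
        .write.mode("overwrite")  # assumption: save mode is not stated in the post
        .parquet(path)
    )
```

Hash repartitioning groups rows with the same exchange_code and instrument into the same task, whereas range repartitioning also orders the partitions by the time column, which is what makes the written files suitable for range filters.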

AWS Glue read jobs

For the AWS Glue read jobs, we ran a series of queries commonly used in quantitative trading research, such as simple counts, grouping and counting, distinct value queries, and queries with date range filters. We compared the performance of these queries between the Iceberg tables and the vanilla Parquet files read from Amazon S3. The following table compares two AWS Glue jobs, one per format, and shows the performance and cost implications of the access patterns described earlier.

Read Queries / Runtime in Seconds            | Iceberg Table | Vanilla Parquet
COUNT(1) on unsorted                         | 35.76s        | 74.62s
GROUP BY and ORDER BY on unsorted            | 34.29s        | 67.99s
DISTINCT and SELECT on unsorted              | 51.40s        | 82.95s
FILTER and GROUP BY and ORDER BY on unsorted | 25.84s        | 49.05s
COUNT(1) on sorted                           | 15.29s        | 24.25s
GROUP BY and ORDER BY on sorted              | 15.88s        | 28.73s
DISTINCT and SELECT on sorted                | 30.85s        | 42.06s
FILTER and GROUP BY and ORDER BY on sorted   | 15.51s        | 31.51s
AWS Glue DPU hours (entire job)              | 45.98         | 67.97

Test results insights

These test results offered the following insights:

  • Accelerated query performance – Iceberg improved read operations by up to 52% for unsorted data and 51% for sorted data. This speed boost enables quant researchers to analyze larger datasets and test trading strategies more rapidly. In quantitative finance, where speed is crucial, this performance gain allows teams to uncover market insights faster, potentially gaining a competitive edge.
  • Reduced operational costs – For read-intensive workloads, Iceberg reduced DPU hours by 32.4% and achieved a 10–16% reduction in Amazon S3 storage. These efficiency gains translate to cost savings in data-intensive quant operations. With Iceberg, firms can run more comprehensive analyses within the same budget or reallocate resources to other high-value activities, optimizing their research capabilities.
  • Enhanced data management and scalability – Iceberg showed comparable write performance for unsorted data (899.47 DPU hours vs. 915.70 for vanilla Parquet) and produced object counts in line with vanilla Parquet in both the unsorted and sorted scenarios (7,444 and 9,283 objects, respectively, vs. 7,288 and 9,283). This consistency leads to more reliable and predictable job execution. For quant teams dealing with large-scale datasets, this reduces time spent on troubleshooting data infrastructure issues and increases focus on developing trading strategies.
  • Improved productivity – Iceberg outperformed vanilla Parquet access across various query types. Simple counts were 52.1% faster, grouping and ordering operations improved by 49.6%, and filtered queries were 47.3% faster for unsorted data. This performance enhancement boosts productivity in quant research workflows. It reduces query completion times, allowing quant developers and researchers to spend more time on model development and market analysis, leading to faster iteration on trading strategies.
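
The percentages quoted in the list above can be reproduced from the two results tables. The following is a small sketch of that arithmetic; the helper name pct_reduction and the dictionary layout are illustrative, while the numbers are copied from the tables.

```python
def pct_reduction(baseline: float, improved: float) -> float:
    """Percentage by which `improved` is lower than `baseline`."""
    return (baseline - improved) / baseline * 100


# Read runtimes in seconds as (Iceberg, vanilla Parquet), from the read jobs table.
read_runtimes_s = {
    "COUNT(1) on unsorted": (35.76, 74.62),
    "GROUP BY and ORDER BY on unsorted": (34.29, 67.99),
    "DISTINCT and SELECT on unsorted": (51.40, 82.95),
    "FILTER and GROUP BY and ORDER BY on unsorted": (25.84, 49.05),
    "COUNT(1) on sorted": (15.29, 24.25),
    "GROUP BY and ORDER BY on sorted": (15.88, 28.73),
    "DISTINCT and SELECT on sorted": (30.85, 42.06),
    "FILTER and GROUP BY and ORDER BY on sorted": (15.51, 31.51),
}

# Runtime reduction per query, with vanilla Parquet as the baseline.
runtime_reduction = {
    query: round(pct_reduction(parquet, iceberg), 1)
    for query, (iceberg, parquet) in read_runtimes_s.items()
}

# DPU hours for the read jobs, and stored object sizes (GB) from the write jobs table.
dpu_reduction = round(pct_reduction(67.97, 45.98), 1)
storage_reduction_unsorted = round(pct_reduction(629.8, 567.7), 1)
storage_reduction_sorted = round(pct_reduction(627.1, 525.6), 1)

for query, pct in runtime_reduction.items():
    print(f"{query}: {pct}% faster with Iceberg")
print(f"Read job DPU hours: {dpu_reduction}% lower")
print(f"S3 storage (unsorted): {storage_reduction_unsorted}% smaller")
print(f"S3 storage (sorted): {storage_reduction_sorted}% smaller")
```

The largest gains are on COUNT(1) for unsorted data and on the filtered query for sorted data, which correspond to the "up to 52%" and "51%" figures after rounding.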

Conclusion

Quant research platforms often avoid adopting new data management solutions like Iceberg, fearing performance penalties and increased costs. Our analysis disproves these concerns, demonstrating that Iceberg not only matches or enhances performance compared to direct Amazon S3 access, but also provides substantial additional benefits.

Our tests reveal that Iceberg significantly accelerates query performance, with improvements of up to 52% for unsorted data and 51% for sorted data. This speed boost enables quant researchers to analyze larger datasets and test trading strategies more rapidly, potentially uncovering valuable market insights faster.

Iceberg streamlines data management tasks, allowing researchers to focus on strategy development. Its robust insert, update, and delete capabilities, combined with time travel features, enable effortless management of complex datasets, improving backtest accuracy and facilitating rapid strategy iteration.

The platform’s intelligent handling of partitioning and Amazon S3 API quota issues eliminates undifferentiated heavy lifting, freeing quant teams from low-level data engineering tasks. This automation redirects efforts to high-value activities such as model development and market analysis. Moreover, our tests show that for read-intensive workloads, Iceberg reduced DPU hours by 32.4% and achieved a 10–16% reduction in Amazon S3 storage, leading to significant cost savings.

Flexibility is a key advantage of Iceberg. Its various interfaces, including SQL, DataFrames, and programmatic APIs, integrate seamlessly with existing quant research workflows, accommodating diverse analysis needs and coding preferences.

By adopting Iceberg, quant research teams gain both performance enhancements and powerful data management tools. This combination creates an environment where researchers can push analytical boundaries, maintain high data integrity standards, and focus on generating valuable insights. The improved productivity and reduced operational costs enable quant teams to allocate resources more effectively, ultimately giving them a stronger competitive edge in quantitative finance.


About the Authors

Guy Bachar is a Senior Solutions Architect at AWS based in New York. He specializes in assisting capital markets customers with their cloud transformation journeys. His expertise encompasses identity management, security, and unified communication.

Sercan Karaoglu is a Senior Solutions Architect specializing in capital markets. He is a former data engineer and is passionate about quantitative investment research.

Boris Litvin is a Principal Solutions Architect at AWS. His job is in financial services industry innovation. Boris joined AWS from the industry, most recently Goldman Sachs, where he held a variety of quantitative roles across equity, FX, and interest rates, and was CEO and Founder of a quantitative trading FinTech startup.

Salim Tutuncu is a Senior Partner Solutions Architect Specialist on Data & AI, based in Dubai with a focus on EMEA. With a background in the technology sector that spans roles as a data engineer, data scientist, and machine learning engineer, Salim has built a formidable expertise in navigating the complex landscape of data and artificial intelligence. His current role involves working closely with partners to develop long-term, profitable businesses using the AWS platform, particularly in data and AI use cases.

Alex Tarasov is a Senior Solutions Architect working with Fintech startup customers, helping them to design and run their data workloads on AWS. He is a former data engineer and is passionate about all things data and machine learning.

Jiwan Panjiker is a Solutions Architect at Amazon Web Services, based in the Greater New York City area. He works with AWS enterprise customers, helping them in their cloud journey to solve complex business problems by making effective use of AWS services. Outside of work, he likes spending time with his friends and family, going for long drives, and exploring local cuisine.