Tag Archives: Technical How-to

Temporary elevated access management with IAM Identity Center

Post Syndicated from Taiwo Awoyinfa original https://aws.amazon.com/blogs/security/temporary-elevated-access-management-with-iam-identity-center/

AWS recommends using automation where possible to keep people away from systems—yet not every action can be automated in practice, and some operations might require access by human users. Depending on their scope and potential impact, some human operations might require special treatment.

One such treatment is temporary elevated access, also known as just-in-time access. This is a way to request access for a specified time period, validate whether there is a legitimate need, and grant time-bound access. It also allows you to monitor activities performed, and revoke access if conditions change. Temporary elevated access can help you to reduce risks associated with human access without hindering operational capabilities.

In this post, we introduce a temporary elevated access management solution (TEAM) that integrates with AWS IAM Identity Center (successor to AWS Single Sign-On) and allows you to manage temporary elevated access to your multi-account AWS environment. You can download the TEAM solution from AWS Samples, deploy it to your AWS environment, and customize it to meet your needs.

The TEAM solution provides the following features:

  • Workflow and approval — TEAM provides a workflow that allows authorized users to request, review, and approve or reject temporary access. If a request is approved, TEAM activates access for the requester with the scope and duration specified in the request.
  • Invoke access using IAM Identity Center — When temporary elevated access is active, a requester can use the IAM Identity Center AWS access portal to access the AWS Management Console or retrieve temporary credentials. A requester can also invoke access directly from the command line by configuring AWS Command Line Interface (AWS CLI) to integrate with IAM Identity Center.
  • View request details and session activity — Authorized users can view request details and session activity related to current and historical requests from within the application’s web interface.
  • Ability to use managed identities and group memberships — You can either sync your existing managed identities and group memberships from an external identity provider into IAM Identity Center, or manage them directly in IAM Identity Center, in order to control user authorization in TEAM. Similarly, users can authenticate directly in IAM Identity Center, or they can federate from an external identity provider into IAM Identity Center, to access TEAM.
  • A rich authorization model — TEAM uses group memberships to manage eligibility (authorization to request temporary elevated access with a given scope) and approval (authorization to approve temporary elevated access with a given scope). It also uses group memberships to determine whether users can view historical and current requests and session activity, and whether they can administer the solution. You can manage both eligibility and approval policies at different levels of granularity within your organization in AWS Organizations.

TEAM overview

You can download the TEAM solution and deploy it into the same organization where you enable IAM Identity Center. TEAM consists of a web interface that you access from the IAM Identity Center access portal, a workflow component that manages requests and approvals, an orchestration component that activates temporary elevated access, and additional components involved in security and monitoring.

Figure 1 shows an organization with TEAM deployed alongside IAM Identity Center.

Figure 1: An organization using TEAM alongside IAM Identity Center


Figure 1 shows three main components:

  • TEAM — a self-hosted solution that allows users to create, approve, monitor and manage temporary elevated access with a few clicks in a web interface.
  • IAM Identity Center — an AWS service which helps you to securely connect your workforce identities and manage their access centrally across accounts.
  • AWS target environment — the accounts where you run your workloads, and for which you want to securely manage both persistent access and temporary elevated access.

There are four personas who can use TEAM:

  • Requesters — users who request temporary elevated access to perform operational tasks within your AWS target environment.
  • Approvers — users who review and approve or reject requests for temporary elevated access.
  • Auditors — users with read-only access who can view request details and session activity relating to current and historical requests.
  • Admins — users who can manage global settings and define policies for eligibility and approval.

TEAM determines a user’s persona from their group memberships, which can either be managed directly in IAM Identity Center or synced from an external identity provider into IAM Identity Center. This allows you to use your existing access governance processes and tools to manage the groups and thereby control which actions users can perform within TEAM.
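As a minimal sketch of the idea above, the following Python maps a user's group memberships to personas. The `TeamGroups` structure, its field names, and the group names in the usage note are hypothetical, and the dedicated auditor group is an assumption; this illustrates the concept and is not TEAM's actual code.

```python
from dataclasses import dataclass, field


@dataclass(frozen=True)
class TeamGroups:
    """Hypothetical settings: which groups grant which persona."""
    admin_group: str
    auditor_group: str  # assumption: auditors are identified by one named group
    eligibility_groups: frozenset = field(default_factory=frozenset)
    approver_groups: frozenset = field(default_factory=frozenset)


def personas_for(user_groups, cfg):
    """Return the set of personas implied by a user's group memberships."""
    groups = set(user_groups)
    personas = set()
    if cfg.admin_group in groups:
        personas.add("admin")
    if cfg.auditor_group in groups:
        personas.add("auditor")
    # A user is a requester if any of their groups appears in an eligibility policy.
    if groups & cfg.eligibility_groups:
        personas.add("requester")
    # A user is an approver if any of their groups appears in an approval policy.
    if groups & cfg.approver_groups:
        personas.add("approver")
    return personas
```

For example, with `dev-team` in `eligibility_groups` and `dev-leads` in `approver_groups`, a member of both groups would hold the requester and approver personas at the same time.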

The following steps describe how you use TEAM during normal operations to request, approve, and invoke temporary elevated access. The steps correspond to the numbered items in Figure 1:

  1. Access the AWS access portal in IAM Identity Center (all personas)
  2. Access the TEAM application (all personas)
  3. Request elevated access (requester persona)
  4. Approve elevated access (approver persona)
  5. Activate elevated access (automatic)
  6. Invoke elevated access (requester persona)
  7. Log session activity (automatic)
  8. End elevated access (automatic; or requester or approver persona)
  9. View request details and session activity (requester, approver, or auditor persona)

In the TEAM walkthrough section later in this post, we provide details on each of these steps.

Deploy and set up TEAM

Before you can use TEAM, you need to deploy and set up the solution.

Prerequisites

To use TEAM, you first need to have an organization set up in AWS Organizations with IAM Identity Center enabled. If you haven’t done so already, create an organization, and then follow the Getting started steps in the IAM Identity Center User Guide.

Before you deploy TEAM, you need to nominate a member account for delegated administration in IAM Identity Center. This has the additional benefit of reducing the need to use your organization’s management account. We strongly recommend that you use this account only for IAM Identity Center delegated administration, TEAM, and associated services; that you do not deploy any other workloads into this account; and that you carefully manage access to this account using the principle of least privilege.

We recommend that you enforce multi-factor authentication (MFA) for users, either in IAM Identity Center or in your external identity provider. If you want to statically assign access to users or groups (persistent access), you can do that in IAM Identity Center, independently of TEAM, as described in Multi-account permissions.

Deploy TEAM

To deploy TEAM, follow the solution deployment steps in the TEAM documentation. You need to deploy TEAM in the same account that you nominate for IAM Identity Center delegated administration.

Access TEAM

After you deploy TEAM, you can access it through the IAM Identity Center web interface, known as the AWS access portal. You do this using the AWS access portal URL, which is configured when you enable IAM Identity Center. Depending on how you set up IAM Identity Center, you are either prompted to authenticate directly in IAM Identity Center, or you are redirected to an external identity provider to authenticate. After you authenticate, the AWS access portal appears, as shown in Figure 2.

Figure 2: TEAM application icon in the AWS access portal of IAM Identity Center


You configure TEAM as an IAM Identity Center Custom SAML 2.0 application, which means it appears as an icon in the AWS access portal. To access TEAM, choose TEAM IDC APP.

When you first access TEAM, it automatically retrieves your identity and group membership information from IAM Identity Center. It uses this information to determine what actions you can perform and which navigation links are visible.

Set up TEAM

Before users can request temporary elevated access in TEAM, a user with the admin persona needs to set up the application. This includes defining policies for eligibility and approval. A user takes on the admin persona if they are a member of a named IAM Identity Center group that is specified during TEAM deployment.

Manage eligibility policies

Eligibility policies determine who can request temporary elevated access with a given scope. You can define eligibility policies to ensure that people in specific teams can only request the access that you anticipate they’ll need as part of their job function.

  • To manage eligibility policies, in the left navigation pane, under Administration, select Eligibility policy. Figure 3 shows this view with three eligibility policies already defined.
     
Figure 3: Manage eligibility policies


An eligibility policy has four main parts:

  • Name and Type — An IAM Identity Center user or group
  • Accounts or OUs — One or more accounts, organizational units (OUs), or both, which belong to your organization
  • Permissions — One or more IAM Identity Center permission sets (representing IAM roles)
  • Approval required — whether requests for temporary elevated access require approval.

Each eligibility policy allows the specified IAM Identity Center user, or a member of the specified group, to log in to TEAM and request temporary elevated access using the specified permission sets in the specified accounts. When you choose a permission set, you can either use a predefined permission set provided by IAM Identity Center, or you can create your own permission set using custom permissions to provide least-privilege access for particular tasks.

Note: If you specify an OU in an eligibility or approval policy, TEAM includes the accounts directly under that OU, but not those under its child OUs.
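The eligibility check, including the OU behavior in the note above, could be sketched as follows. This is an illustrative model under stated assumptions: `EligibilityPolicy`, `accounts_in_scope`, `is_eligible`, and the `direct_accounts_by_ou` mapping are hypothetical names, not part of TEAM. The mapping holds only the accounts directly under each OU, so accounts in child OUs are never pulled in.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class EligibilityPolicy:
    principal: str              # IAM Identity Center user or group name
    accounts: frozenset         # account IDs named directly in the policy
    ous: frozenset              # OU IDs named in the policy
    permission_sets: frozenset  # permission set names
    approval_required: bool = True


def accounts_in_scope(policy, direct_accounts_by_ou):
    """Accounts named directly, plus accounts directly under each listed OU.

    Child OUs are deliberately not traversed, matching the note above.
    """
    scope = set(policy.accounts)
    for ou in policy.ous:
        scope |= set(direct_accounts_by_ou.get(ou, ()))
    return scope


def is_eligible(principals, account_id, permission_set, policies, direct_accounts_by_ou):
    """True if any policy covers one of the user's principals for this scope."""
    return any(
        p.principal in principals
        and permission_set in p.permission_sets
        and account_id in accounts_in_scope(p, direct_accounts_by_ou)
        for p in policies
    )
```

Here `principals` is the set containing the user's own name and the names of their groups. To cover accounts in a child OU, you would list that child OU in the policy as well.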

Manage approval policies

Approval policies work in a similar way to eligibility policies, except that they authorize users to approve temporary elevated access requests, rather than create them. If a specific account is referenced in an eligibility policy that is configured to require approval, then you need to create a corresponding approval policy for the same account. If there is no corresponding approval policy, or if one exists but its groups have no members, then TEAM won’t allow users to create temporary elevated access requests for that account, because no one would be able to approve them.

  • To manage approval policies, in the left navigation pane, under Administration, select Approval policy. Figure 4 shows this view with three approval policies already defined.
     
Figure 4: Manage approval policies


An approval policy has two main parts:

  • Id, Name, and Type — Identifiers for an account or organizational unit (OU)
  • Approver groups — One or more IAM Identity Center groups

Each approval policy allows a member of a specified group to log in to TEAM and approve temporary elevated access requests for the specified account, or all accounts under the specified OU, regardless of permission set.

Note: If you specify the same group for both eligibility and approval in the same account, this means approvers can be in the same team as requesters for that account. This is a valid approach, sometimes known as peer approval. Nevertheless, TEAM does not allow an individual to approve their own request. If you prefer requesters and approvers to be in different teams, specify different groups for eligibility and approval.
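The approval rules described in this section can be summarized in a short sketch. The function names and the two dictionaries (approver groups per account, members per group) are hypothetical, and OUs are assumed to have been expanded to account IDs already; this models the rules in the text rather than TEAM's implementation.

```python
def approver_candidates(account_id, approver_groups_by_account, members_by_group):
    """Users who belong to any approver group configured for the account."""
    users = set()
    for group in approver_groups_by_account.get(account_id, ()):
        users |= set(members_by_group.get(group, ()))
    return users


def can_create_request(account_id, approval_required,
                       approver_groups_by_account, members_by_group):
    """A request that needs approval is only allowed if someone could approve it."""
    if not approval_required:
        return True
    return bool(approver_candidates(account_id, approver_groups_by_account, members_by_group))


def can_approve(approver, requester, account_id,
                approver_groups_by_account, members_by_group):
    """Peer approval is allowed, but nobody may approve their own request."""
    if approver == requester:
        return False
    return approver in approver_candidates(
        account_id, approver_groups_by_account, members_by_group
    )
```

With a single group used for both eligibility and approval, two members of that group can approve each other's requests, while each member's own request still needs a second person.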

TEAM walkthrough

Now that the admin persona has defined eligibility and approval policies, you are ready to use TEAM.

To begin this walkthrough, imagine that you are a requester, and you need to perform an operational task that requires temporary elevated access to your AWS target environment. For example, you might need to fix a broken deployment pipeline or make some changes as part of a deployment. As a requester, you must belong to a group specified in at least one eligibility policy that was defined by the admin persona.

Step 1: Access the AWS access portal in IAM Identity Center

To access the AWS access portal in IAM Identity Center, use the AWS access portal URL, as described in the Access TEAM section earlier in this post.

Step 2: Access the TEAM application

To access the TEAM application, select the TEAM IDC APP icon, as described in the Access TEAM section earlier.

Step 3: Request elevated access

The next step is to create a new elevated access request as follows:

  1. Under Requests, in the left navigation pane, choose Create request.
  2. In the Elevated access request section, do the following, as shown in Figure 5:
    1. Select the account where you need to perform your task.
    2. For Role, select a permission set that will give you sufficient permissions to perform the task.
    3. Enter a start date and time, duration, ticket ID (typically representing a change ticket or incident ticket related to your task), and business justification.
    4. Choose Submit.
Figure 5: Create a new request


When creating a request, consider the following:

  • In each request, you can specify exactly one account and one permission set.
  • You can only select an account and permission set combination for which you are eligible based on the eligibility policies defined by the admin persona.
  • As a requester, you should apply the principle of least privilege by selecting the least-privileged permission set, and the shortest time window, that will allow you to complete your task safely.
  • TEAM captures a ticket identifier for audit purposes only; it does not try to validate it.
  • The duration specified in a request determines the time window for which elevated access is active, if your request is approved. During this time window, you can invoke sessions to access the AWS target environment. It doesn’t affect the duration of each session.
  • Session duration is configured independently for each permission set by an IAM Identity Center administrator, and determines the time period for which IAM temporary credentials are valid for sessions using that permission set.
  • Sessions invoked just before elevated access ends might remain valid beyond the end of the approved elevated access period. If this is a concern, consider minimizing the session duration configured in your permission sets, for example by setting them to 1 hour.
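The interaction between request duration and session duration in the last three points is simple arithmetic, worked through below. The function name `access_window` and the example times are hypothetical; the only assumption taken from the text is that a session started just before elevated access ends can stay valid for up to one full session duration.

```python
from datetime import datetime, timedelta, timezone


def access_window(start, access_duration, session_duration):
    """Return (access_end, latest_session_expiry_bound).

    access_end is when the elevated access window closes.
    The second value is an upper bound on when credentials from a session
    invoked at the very end of the window could still be valid.
    """
    access_end = start + access_duration
    latest_session_expiry = access_end + session_duration
    return access_end, latest_session_expiry


if __name__ == "__main__":
    start = datetime(2024, 1, 1, 9, 0, tzinfo=timezone.utc)
    end, bound = access_window(start, timedelta(hours=2), timedelta(hours=1))
    print("Access window closes:", end.isoformat())
    print("Sessions may remain valid until just before:", bound.isoformat())
```

In this example, a 2-hour request starting at 09:00 UTC closes at 11:00, but with a 1-hour session duration a session invoked shortly before 11:00 could remain usable until almost 12:00. Shortening the session duration on the permission set narrows that gap.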

Step 4: Approve elevated access

After you submit your request, TEAM notifies approvers by email. Approvers are notified only about new requests that fall within the scope of what they are authorized to approve, based on the approval policies defined earlier.

For this walkthrough, imagine that you are now the approver. You will perform the following steps to approve the request. As an approver, you must belong to a group specified in an approval policy that the admin persona configured.

  1. Access the TEAM application in exactly the same way as for the other personas.
  2. In the left navigation pane, under Approvals, choose Approve requests. TEAM displays requests awaiting your review, as shown in Figure 6.
    • To view the information provided by the requester, select a request and then choose View details.
    Figure 6: Requests awaiting review


  3. Select a pending request, and then do one of the following:
    • To approve the request, select Actions and then choose Approve.
    • To reject the request, select Actions and then choose Reject.

    Figure 7 shows what TEAM displays when you approve a request.

    Figure 7: Approve a request


  4. After you approve or reject a request, the original requester is notified by email.

A requester can view the status of their requests in the TEAM application.

  • To see the status of your open requests in the TEAM application, in the left navigation pane, under Requests, select My requests. Figure 8 shows this view with one approved request.
     
Figure 8: Approved request


Step 5: Automatic activation of elevated access

After a request is approved, the TEAM application waits until the start date and time specified in the request and then automatically activates elevated access. To activate access, a TEAM orchestration workflow creates a temporary account assignment, which links the requester’s user identity in IAM Identity Center with the permission set and account in their request. Then TEAM notifies the requester by email that their request is active.

A requester can now view their active request in the TEAM application.

  1. To see active requests, in the left navigation pane under Elevated access, choose Active access. Figure 9 shows this view with one active request.
     
    Figure 9: Active request


  2. To see further details for an active request, select a request and then choose View details. Figure 10 shows an example of these details.
    Figure 10: Details of an active request


Step 6: Invoke elevated access

During the time period in which elevated access is active, the requester can invoke sessions to access the AWS target environment to complete their task. Each session has the scope (permission set and account) approved in their request. There are three ways to invoke access.

The first two methods involve accessing IAM Identity Center using the AWS access portal URL. Figure 11 shows the AWS access portal while a request is active.

Figure 11: Invoke access from the AWS access portal


From the AWS access portal, you can select an account and permission set that is currently active. You’ll also see the accounts and permission sets that have been statically assigned to you using IAM Identity Center, independently of TEAM. From here, you can do one of the following:

  • Choose Management console to federate to the AWS Management Console.
  • Choose Command line or programmatic access to copy and paste temporary credentials.

The third method is to initiate access directly from the command line using AWS CLI. To use this method, you first need to configure AWS CLI to integrate with IAM Identity Center. This method provides a smooth user experience for AWS CLI users, since you don’t need to copy and paste temporary credentials to your command line.
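As a rough illustration of that configuration, the following Python renders an AWS CLI config profile that uses an IAM Identity Center session. The profile name, session name, start URL, account ID, role name, and Regions in the usage note are placeholders, and `render_sso_profile` is a hypothetical helper; the setting names (`sso_session`, `sso_account_id`, `sso_role_name`, `sso_start_url`, `sso_region`, `sso_registration_scopes`) are the ones AWS CLI v2 documents for this purpose.

```python
import configparser
import io


def render_sso_profile(profile, session, start_url, sso_region,
                       account_id, role_name, region):
    """Render text suitable for appending to ~/.aws/config."""
    cfg = configparser.ConfigParser(interpolation=None)
    # The named profile points at an account and permission set (role name).
    cfg[f"profile {profile}"] = {
        "sso_session": session,
        "sso_account_id": account_id,
        "sso_role_name": role_name,
        "region": region,
    }
    # The sso-session section holds the portal URL shared by all profiles.
    cfg[f"sso-session {session}"] = {
        "sso_start_url": start_url,
        "sso_region": sso_region,
        "sso_registration_scopes": "sso:account:access",
    }
    buf = io.StringIO()
    cfg.write(buf)
    return buf.getvalue()


if __name__ == "__main__":
    print(render_sso_profile(
        profile="team-elevated",                         # placeholder
        session="my-sso",                                # placeholder
        start_url="https://example.awsapps.com/start",   # placeholder
        sso_region="us-east-1",
        account_id="111111111111",
        role_name="ReadOnlyAccess",
        region="us-east-1",
    ))
```

With a profile like this in place, running `aws sso login --profile team-elevated` signs you in through the access portal, and later commands that pass `--profile team-elevated` use the resulting temporary credentials while your elevated access is active.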

Regardless of how you invoke access, IAM Identity Center provides temporary credentials for the IAM role and account specified in your request, which allow you to assume that role in that account. The temporary credentials are valid for the duration specified in the role’s permission set, defined by an IAM Identity Center administrator.

After you invoke access, you can complete the operational tasks that you need to perform in the AWS target environment. During the period in which your elevated access is active, you can invoke multiple sessions if necessary.

Step 7: Log session activity

When you access the AWS target environment, your activity is logged to AWS CloudTrail. Actions you perform in the AWS control plane are recorded as CloudTrail events.

Note: Each CloudTrail event contains the unique identifier of the user who performed the action, which gives you traceability back to the human individual who requested and invoked temporary elevated access.
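To illustrate that traceability, here is a hedged sketch that collects a user's CloudTrail events for a time window using the CloudTrail `lookup_events` API. It is not how TEAM gathers session activity. The helper name `session_events` is hypothetical, and it assumes that the CloudTrail `Username` attribute matches the user's IAM Identity Center user name for these sessions. The `cloudtrail` argument is expected to be a client such as `boto3.client("cloudtrail")` created in the target account and Region.

```python
def session_events(cloudtrail, username, start, end):
    """Collect CloudTrail events for one user between start and end.

    cloudtrail: an object exposing lookup_events(**kwargs), for example
                boto3.client("cloudtrail") (assumption: boto3 is installed).
    start, end: datetime objects bounding the elevated access window.
    """
    kwargs = {
        "LookupAttributes": [
            {"AttributeKey": "Username", "AttributeValue": username}
        ],
        "StartTime": start,
        "EndTime": end,
    }
    events = []
    while True:
        page = cloudtrail.lookup_events(**kwargs)
        events.extend(page.get("Events", []))
        token = page.get("NextToken")
        if not token:
            return events
        # Follow pagination until CloudTrail stops returning a token.
        kwargs["NextToken"] = token
```

Each returned event includes fields such as `EventName` and `EventTime`, which is enough to list what was done during a given elevated access window.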

Step 8: End elevated access

Elevated access ends when either the requested duration elapses or it is explicitly revoked in the TEAM application. The requester or an approver can revoke elevated access whenever they choose.

When elevated access ends, or is revoked, the TEAM orchestration workflow automatically deletes the temporary account assignment for this request. This unlinks the permission set, the account, and the user in IAM Identity Center. The requester is then notified by email that their elevated access has ended.
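The account assignment lifecycle from Steps 5 and 8 can be sketched with the IAM Identity Center `CreateAccountAssignment` and `DeleteAccountAssignment` APIs. This is a minimal illustration, not TEAM's orchestration code: the `Assignment` class and the two helper functions are hypothetical, and `sso_admin` is assumed to be a client such as `boto3.client("sso-admin")`.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Assignment:
    instance_arn: str        # ARN of the IAM Identity Center instance
    account_id: str          # target AWS account
    permission_set_arn: str  # permission set approved in the request
    user_id: str             # IAM Identity Center user ID of the requester

    def as_kwargs(self):
        """Parameters shared by the create and delete API calls."""
        return {
            "InstanceArn": self.instance_arn,
            "TargetId": self.account_id,
            "TargetType": "AWS_ACCOUNT",
            "PermissionSetArn": self.permission_set_arn,
            "PrincipalType": "USER",
            "PrincipalId": self.user_id,
        }


def activate_access(sso_admin, assignment):
    """Link the user to the permission set in the account (Step 5)."""
    return sso_admin.create_account_assignment(**assignment.as_kwargs())


def end_access(sso_admin, assignment):
    """Remove the same link when access expires or is revoked (Step 8)."""
    return sso_admin.delete_account_assignment(**assignment.as_kwargs())
```

Both API calls are asynchronous and return a status object, so a real workflow would also need to poll for completion and handle failures.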

Step 9: View request details and session activity

You can view request details and session activity for current and historical requests from within the TEAM application. Each persona can see the following information:

  • Requesters can inspect elevated access requested by them.
  • Approvers can inspect elevated access that falls within the scope of what they are authorized to approve.
  • Auditors can inspect elevated access for all TEAM requests.

Session activity is recorded based on the log delivery times provided by AWS CloudTrail, and you can view session activity while elevated access is in progress or after it has ended. Figure 12 shows activity logs for a session displayed in the TEAM application.

Figure 12: Session activity logs


Security and resiliency considerations

The TEAM application controls access to your AWS environment, and you must manage it with great care to prevent unauthorized access. This solution is built using AWS Amplify to ease the reference deployment. Before operationalizing this solution, consider how to align it with your existing development and security practices.

Further security and resiliency considerations including setting up emergency break-glass access are available in the TEAM documentation.

Additional resources

AWS Security Partners provide temporary elevated access solutions that integrate with IAM Identity Center, and AWS has validated the integration of these partner offerings and assessed their capabilities against a common set of customer requirements. For further information, see temporary elevated access in the IAM Identity Center User Guide.

The blog post Managing temporary elevated access to your AWS environment describes an alternative self-hosted solution for temporary elevated access which integrates directly with an external identity provider using OpenID Connect, and federates users directly into AWS Identity and Access Management (IAM) roles in your accounts. The TEAM solution described in this blog post, on the other hand, integrates with IAM Identity Center, which provides a way to centrally manage user access to accounts across your organization and optionally integrates with an external identity provider.

Conclusion

In this blog post, you learned that your first priority should be to use automation to avoid the need to give human users persistent access to your accounts. You also learned that in the rare cases in which people need access to your accounts, not all access is equal; there are times when you need a process to verify that access is needed, and to provide temporary elevated access.

We introduced you to a temporary elevated access management solution (TEAM) that you can download from AWS Samples and use alongside IAM Identity Center to give your users temporary elevated access. We showed you the TEAM workflow, described the TEAM architecture, and provided links where you can get started by downloading and deploying TEAM.

 

Taiwo Awoyinfa


Taiwo is a senior cloud architect with AWS Professional Services. At AWS, he helps global customers with cloud transformation, migration and security initiatives. Taiwo has expertise in cloud architecture, networking, security and application development. He is passionate about identifying and solving problems that deliver value.


James Greenwood

James is a principal security solutions architect who helps AWS Financial Services customers meet their security and compliance objectives in the AWS Cloud. James has a background in identity and access management, authentication, credential management, and data protection with more than 20 years of experience in the financial services industry.

Varvara Semenova


Varvara is a cloud infrastructure architect with AWS Professional Services. She specialises in building microservices-based serverless applications to address the needs of AWS enterprise customers. Varvara uses her background in DevOps to help customers innovate faster when developing cost-effective, secure, and reliable solutions.

Implement alerts in Amazon OpenSearch Service with PagerDuty

Post Syndicated from Manikanta Gona original https://aws.amazon.com/blogs/big-data/implement-alerts-in-amazon-opensearch-service-with-pagerduty/

In today’s fast-paced digital world, businesses rely heavily on their data to make informed decisions. This data is often stored and analyzed using various tools, such as Amazon OpenSearch Service, a powerful search and analytics service offered by AWS. OpenSearch Service provides real-time insights into your data to support use cases like interactive log analytics, real-time application monitoring, website search, and more. Analyzing logs can help businesses quickly identify and troubleshoot issues.

However, with the increasing amount of data, it can be challenging to monitor everything manually. Manual monitoring consumes a lot of resources and is hard to maintain as the application landscape changes. We need a sustainable and automated approach to monitor critical applications and infrastructure.

By combining a third-party incident management platform such as PagerDuty with the alerting plugin provided by OpenSearch Service, businesses can proactively manage and respond to critical events. You can use this proactive alerting to monitor data patterns for existing data, monitor clusters, detect patterns, and more.

OpenSearch Dashboards provides an alerting plugin that you can use to set up several types of monitors, including monitors for cluster health, an individual document, a custom query, or aggregated data. These monitors can be used to send alerts to users.

In this post, we demonstrate how to implement PagerDuty as the notification mechanism to get notified based on cluster health status. These notifications can be delivered via various channels, including email, SMS, or custom webhooks (like PagerDuty). The OpenSearch Service alerting plugin supports complex alert rules and provides a user interface to manage them.

Solution overview

PagerDuty is a cloud-based incident management platform that helps businesses handle their alerts and incidents in real time. PagerDuty works by consolidating alerts from various monitoring tools and routing them to the right team member, ensuring that issues are addressed promptly. Many businesses are using PagerDuty for real-time incident notifications via multiple channels, ensuring that the right team members are alerted quickly.

In this post, we describe how to set up PagerDuty and integrate it with an OpenSearch Service custom webhook for alert notifications when a threshold is met.

The following diagram illustrates OpenSearch Service running within an Amazon VPC, using monitors and triggers to send a notification to the PagerDuty service through an Events API custom webhook.

We need to set up a service and integration on PagerDuty to begin receiving incident notifications from OpenSearch Service. A service in PagerDuty represents an application, component, or team that we can trigger the notification against.

Prerequisites

Before you get started, create the following resources, if not already available:

  • An OpenSearch Service domain
  • A PagerDuty account

Create a service on PagerDuty

To create a service on PagerDuty, complete the following steps:

  1. Log in to PagerDuty using your personal or enterprise account that is being used to enable the integration with OpenSearch Service.
  2. On the Services tab, choose New Service.
  3. Enter a name and optional description, then choose Next.

In the next step, we create or assign an escalation policy for the service. An escalation policy represents the order of responsibility for reacting to the issues detected on a service.

  4. If you already have an escalation policy defined within the organization or team, select Select an existing Escalation Policy and specify your policy. Otherwise, select Generate a new Escalation Policy, then choose Next.

In the next step, we can group the alerts based on time or content:

    • To group alerts together based on the alert content, select Content-Based grouping.
    • To group them based on a specific time duration, select Time-Based grouping.
    • To let PagerDuty group the alerts based on content or time, select Intelligent grouping.
  5. Leave the defaults and choose Next.
  6. On the Integrations page, select the Events API V2 integration (this will be used for integration with OpenSearch Service) and choose Create Service.

If you don’t select the integration during this step, you can add it later.

  7. Take note of the integration key on the Integrations tab.

Create a notification channel on OpenSearch Service with a custom webhook

Custom webhooks provide the ability to send these notifications to third-party services like PagerDuty using a REST API. After we configure the notification channel, we can use it for other monitors beyond this use case and to detect data patterns that are stored within the cluster.

Complete the following steps to configure the notification channel:

  1. On the OpenSearch Dashboards page, choose Notifications under Amazon OpenSearch Plugins in the navigation pane.
  2. On the Channels tab, choose Create channel.
  3. Enter a name for the channel and an optional description.
  4. For Channel type, choose Custom webhook.
  5. For Method, choose POST.
  6. For Define endpoints by, select Custom attributes URL.
  7. For Host, enter events.pagerduty.com.
  8. For Path, enter v2/enqueue.
  9. Under Webhook headers, choose Add header.
  10. Enter X-Routing-Key as the key and the integration key you obtained earlier as the value.
  11. Choose Create and ensure the channel is successfully created.
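To make the channel settings concrete, the following sketch builds the equivalent HTTP request in Python using only the standard library. It mirrors the method, host, path, and header configured above; the function name `build_pagerduty_request` and the example values are hypothetical, and the code is meant for understanding or manually testing the integration key, not as part of the OpenSearch Service setup.

```python
import json
import urllib.request

# Host and path as configured in the notification channel.
PAGERDUTY_URL = "https://events.pagerduty.com/v2/enqueue"


def build_pagerduty_request(integration_key, summary, source, severity="critical"):
    """Build (but do not send) a POST request that triggers a PagerDuty event."""
    body = {
        "event_action": "trigger",
        "payload": {
            "summary": summary,
            "source": source,
            "severity": severity,
        },
    }
    return urllib.request.Request(
        PAGERDUTY_URL,
        data=json.dumps(body).encode("utf-8"),
        headers={
            "Content-Type": "application/json",
            # Same header the custom webhook channel sends.
            "X-Routing-Key": integration_key,
        },
        method="POST",
    )
```

Passing the returned object to `urllib.request.urlopen` would send the event, which creates a real incident on the PagerDuty service that owns the integration key.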

Configure OpenSearch Service alerts to send notifications to PagerDuty

We can monitor OpenSearch cluster health in two different ways:

  • Using the OpenSearch Dashboards alerting plugin by setting up a per cluster metrics monitor. This provides a query to retrieve metrics related to the cluster health.
  • Integrating with Amazon CloudWatch, a monitoring and observability service.

In this use case, we use the alerting plugin. Complete the following steps:

  1. On the OpenSearch Dashboards page, choose Alerting under Amazon OpenSearch Plugins in the navigation pane.
  2. On the Monitors tab, choose Create monitor.
  3. For Monitor name, enter a name (for example, Monitor Cluster Health).
  4. For Monitor type, select Per cluster metrics monitor.
  5. Under Schedule, configure the monitor to run every minute.
  6. In the Query section, for Request type, choose Cluster health.
  7. Choose Preview query.
  8. Create a trigger by choosing Add trigger.
  9. For Trigger name, enter a name (for example, Cluster Health Status is Red).
  10. Leave Severity level at 1 (Highest).
  11. Under Trigger condition, delete the default code and enter the following:
ctx.results[0].status == "red"
  12. Choose Preview condition response to confirm that Trigger condition response shows as false, indicating that the cluster is healthy.
  13. Under Actions, choose Add action.
  14. For Action name, enter a name (for example, Send a PagerDuty notification).
  15. For Channels, choose the channel you created earlier.
  16. For Message, enter the following code:
{
  "event_action": "trigger",
  "payload": {
    "summary": "{{ctx.trigger.name}}",
    "source": "{{ctx.monitor.name}}",
    "severity": "critical",
    "custom_details": {
      "-Severity": "{{ctx.trigger.severity}}",
      "-Period start": "{{ctx.periodStart}}",
      "-Period end": "{{ctx.periodEnd}}"
    }
  }
}

Note that apart from the custom_details section in the code, the rest of the fields are mandatory for PagerDuty.

  17. Choose Send test message and test to make sure you receive an alert on the PagerDuty service.
  18. Choose Create and ensure the monitor was created successfully.

A notification will be sent to the PagerDuty service as part of the test, which will trigger a notification via a phone call or text message for the person who is available based on the escalation policy defined earlier. This notification can be safely acknowledged and resolved from PagerDuty because this was a test.
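
Because every field outside custom_details is mandatory, it can help to check a rendered message before relying on it. The following is a minimal sketch of such a check; the helper name is hypothetical, and the list of accepted severity values (critical, error, warning, info) reflects our understanding of the PagerDuty Events API rather than anything configured in this post.

```python
import json
from typing import List

REQUIRED_TOP_LEVEL = ("event_action", "payload")
REQUIRED_PAYLOAD = ("summary", "source", "severity")
# Severity values we assume PagerDuty accepts.
ALLOWED_SEVERITIES = {"critical", "error", "warning", "info"}


def missing_fields(message: str) -> List[str]:
    """Return the mandatory fields that are absent or empty in a rendered message."""
    event = json.loads(message)
    missing = [key for key in REQUIRED_TOP_LEVEL if key not in event]
    payload = event.get("payload", {})
    missing += ["payload." + key for key in REQUIRED_PAYLOAD if not payload.get(key)]
    severity = payload.get("severity")
    if severity and severity not in ALLOWED_SEVERITIES:
        missing.append("payload.severity (unsupported value)")
    return missing


# Hypothetical rendered message, as it might look after the monitor fills in the template.
rendered = json.dumps({
    "event_action": "trigger",
    "payload": {
        "summary": "Cluster Health Status is Red",
        "source": "Monitor Cluster Health",
        "severity": "critical",
    },
})
print(missing_fields(rendered))
```

An empty list means the message carries all of the mandatory fields; anything else names what still needs to be filled in.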

Clean up

To clean up the infrastructure and avoid additional charges, complete the following steps:

  1. Delete the PagerDuty service.
  2. Delete the OpenSearch Service domain that was created as part of the prerequisites.

Conclusion

The integration of OpenSearch Service alerts with PagerDuty provides a powerful and efficient solution for managing and responding to critical events in real time. With this integration, you can easily set up alerts and notifications to stay informed about potential issues within your OpenSearch Service clusters or issues related to data and documents stored within the cluster, and proactively take action to resolve any problems that arise. Additionally, the integration allows for seamless collaboration between teams, enabling them to work together to identify and troubleshoot issues as they occur.

For more information about anomaly detection and alerts in OpenSearch Service, refer to Anomaly Detection in Amazon OpenSearch and Configuring Alerts in Amazon OpenSearch.


About the Authors

Manikanta Gona is a Data and ML Engineer at AWS Professional Services. He joined AWS in 2021 with 6+ years of experience in IT. At AWS, he is focused on Data Lake implementations and on search and analytical workloads using Amazon OpenSearch Service. In his spare time, he loves to garden, and go on hikes and biking with his husband.

Vivek Shrivastava is a Principal Data Architect, Data Lake in AWS Professional Services. He is a big data enthusiast and holds 14 AWS Certifications. He is passionate about helping customers build scalable and high-performance data analytics solutions in the cloud. In his spare time, he loves reading and finding areas for home automation.

Ravikiran Rao is a Data Architect at AWS and is passionate about solving complex data challenges for various customers. Outside of work, he is a theatre enthusiast and an amateur tennis player.

Hari Krishna KC is a Data Architect with the AWS Professional Services Team. He specializes in AWS Data Lakes and Amazon OpenSearch Service and has helped numerous clients migrate their workloads to data lakes and search data stores.

How to deploy workloads in a multicloud environment with AWS developer tools

Post Syndicated from Brent Van Wynsberge original https://aws.amazon.com/blogs/devops/how-to-deploy-workloads-in-a-multicloud-environment-with-aws-developer-tools/

As organizations embrace cloud computing as part of a “cloud first” strategy and migrate to the cloud, some enterprises end up in a multicloud environment. We see that enterprise customers get the best experience, performance, and cost structure when they choose a primary cloud provider. However, for a variety of reasons, some organizations end up operating in a multicloud environment. For example, after a merger or acquisition, an organization may acquire an entity that runs on a different cloud platform. Another example is an ISV (Independent Software Vendor) that provides services to customers operating on different cloud providers. A third example is an organization that needs to adhere to data residency and data sovereignty requirements, and ends up with workloads deployed to multiple cloud platforms across locations.

In the scenarios described above, one of the challenges organizations face when operating such a complex environment is managing the release process (building, testing, and deploying applications at scale) across multiple cloud platforms. If an organization’s primary cloud provider is AWS, they may want to continue using AWS developer tools to deploy workloads in other cloud platforms. Organizations facing such scenarios can leverage AWS services to develop their end-to-end CI/CD and release process instead of developing a release pipeline for each platform, which is complex and not sustainable in the long run.

In this post we show how organizations can continue using AWS developer tools in a hybrid and multicloud environment. We walk the audience through a scenario where we deploy an application to VMs running on-premises and Azure, showcasing AWS’ hybrid and multicloud DevOps capabilities.

Solution and scenario overview

In this post we’re demonstrating the following steps:

  • Set up a CI/CD pipeline using AWS CodePipeline, and show how it runs when application code is updated and checked in to the code repository (GitHub).
  • Check out application code from the code repository, use an IDE (Visual Studio Code) to make changes, and check the code in to the code repository.
  • Check in the modified application code to automatically run the release process built using AWS CodePipeline. It uses AWS CodeBuild to retrieve the latest version of the code from the code repository, compile it, build the deployment package, and test the application.
  • Deploy the updated application to VMs on premises and in Azure using AWS CodeDeploy.

The high-level solution is shown below. This post does not show all of the possible combinations and integrations available to build the CI/CD pipeline. As an example, you can integrate the pipeline with your existing tools for test and build such as Selenium, Jenkins, SonarQube etc.

This post focuses on deploying an application in a multicloud environment, and how AWS Developer Tools can support virtually any scenario or use case specific to your organization. We will be deploying a sample application from this AWS tutorial to an on-premises server, and an Azure Virtual Machine (VM) running Red Hat Enterprise Linux (RHEL). In future posts in this series, we will cover how you can deploy any type of workload using AWS tools, including containers, and serverless applications.

Architecture Diagram

CI/CD pipeline setup

This section describes instructions for setting up a multicloud CI/CD pipeline.

Note: The CI/CD pipeline setup and the related sub-sections in this post are a one-time activity; you do not need to perform these steps every time an application is deployed or modified.

Install CodeDeploy agent

The AWS CodeDeploy agent is a software package that is used to execute deployments on an instance. You can install the CodeDeploy agent on an on-premises server and Azure VM by either using the command line, or AWS Systems Manager.

Set up GitHub code repository

Set up the GitHub code repository using the following steps:

  1. Create a new GitHub code repository or use a repository that already exists.
  2. Copy the Sample_App_Linux app (zip) from Amazon S3 as described in Step 3 of Upload a sample application to your GitHub repository tutorial.
  3. Commit the files to the code repository:
    git add .
    git commit -m 'Initial Commit'
    git push

You will use this repository to deploy your code across environments.

Configure AWS CodePipeline

Follow the steps outlined below to set up and configure CodePipeline to orchestrate the CI/CD pipeline of our application.

  1. Navigate to CodePipeline in the AWS console and click on ‘Create pipeline’.
  2. Give your pipeline a name (for example, MyWebApp-CICD) and allow CodePipeline to create a service role on your behalf.
  3. For the source stage, select GitHub (v2) as your source provider and click on the Connect to GitHub button to give CodePipeline access to your git repository.
  4. Create a new GitHub connection and click on the Install a new App button to install the AWS Connector in your GitHub account.
  5. Back in the CodePipeline console select the repository and branch you would like to build and deploy.

Image showing the configured source stage

  6. Now create the build stage. Select AWS CodeBuild as the build provider.
  7. Click on the ‘Create project’ button to create the project for your build stage, and give your project a name.
  8. Select Ubuntu as the operating system for your managed image, choose the standard runtime, and select the ‘aws/codebuild/standard’ image with the latest version.

Image showing the configured environment

  9. In the Buildspec section select “Insert build commands” and click on switch to editor. Enter the following YAML code as your build commands:
version: 0.2
phases:
    build:
        commands:
            - echo "This is a dummy build command"
artifacts:
    files:
        # Include every file in the source tree, recursively
        - "**/*"

Note: you can also integrate build commands to your git repository by using a buildspec yaml file. More information can be found at Build specification reference for CodeBuild.

  10. Leave all other options as default and click on ‘Continue to CodePipeline’.

Image showing the configured buildspec

  11. Back in the CodePipeline console your Project name will automatically be filled in. You can now continue to the next step.
  12. Click the “Skip deploy stage” button; we will create this stage in the next section.
  13. Review your changes and click “Create pipeline”. Your newly created pipeline will now build for the first time!

Image showing the first execution of the CI/CD pipeline

Configure AWS CodeDeploy on Azure and on-premises VMs

Now that we have built our application, we want to deploy it to both environments: Azure and on-premises. In the “Install CodeDeploy agent” section we’ve already installed the CodeDeploy agent. As a one-time step we now have to give the CodeDeploy agents access to the AWS environment. You can leverage AWS Identity and Access Management (IAM) Roles Anywhere in combination with the code-deploy-session-helper to give access to the AWS resources needed.
The IAM role should have at least the AWSCodeDeployFullAccess AWS managed policy and read-only access to the CodePipeline S3 bucket in your account (called codepipeline-<region>-<account-id>).
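
As an illustration of the read-only bucket access described above, the following sketch builds an IAM policy document for the CodePipeline artifact bucket. It is one possible shape under stated assumptions, not the policy used in this post: the function name, statement IDs, and the choice of S3 actions are ours, and you may need additional permissions (for example, for a customer managed encryption key).

```python
import json


def codepipeline_bucket_read_policy(region, account_id):
    """Return a read-only policy document for the default CodePipeline bucket.

    The bucket name follows the codepipeline-<region>-<account-id> pattern
    mentioned in the text; everything else here is an illustrative assumption.
    """
    bucket_arn = "arn:aws:s3:::codepipeline-{}-{}".format(region, account_id)
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "ListArtifactBucket",
                "Effect": "Allow",
                "Action": ["s3:ListBucket"],
                "Resource": bucket_arn,
            },
            {
                "Sid": "ReadArtifacts",
                "Effect": "Allow",
                "Action": ["s3:GetObject", "s3:GetObjectVersion"],
                "Resource": bucket_arn + "/*",
            },
        ],
    }


# Placeholder Region and account ID for demonstration only.
print(json.dumps(codepipeline_bucket_read_policy("eu-west-1", "123456789012"), indent=2))
```

You can attach the resulting document to the role as an inline or customer managed policy, alongside AWSCodeDeployFullAccess.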

For more information on how to set up IAM Roles Anywhere, refer to how to extend AWS IAM roles to workloads outside of AWS with IAM Roles Anywhere. Alternative ways to configure access can be found in the AWS CodeDeploy user guide. Follow the steps below for each instance you want to configure.

  1. Configure your CodeDeploy agent as described in the user guide. Ensure the AWS Command Line Interface (AWS CLI) is installed on your VM, then run the following command to register the instance with CodeDeploy:
    aws deploy register-on-premises-instance --instance-name <name_for_your_instance> --iam-session-arn <arn_of_your_iam_session>
  2. Tag the instance as follows:
    aws deploy add-tags-to-on-premises-instances --instance-names <name_for_your_instance> --tags Key=Application,Value=MyWebApp
  3. You should now see both instances registered in the “CodeDeploy > On-premises instances” panel. You can now deploy applications to your Azure VM and on-premises VMs!
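
If you prefer to script the registration, the same two operations are available in the AWS SDK for Python (Boto3). The following is a minimal sketch, assuming a CodeDeploy client is passed in; the helper name is hypothetical, and the Boto3 call identifies the instance by an IAM session ARN (iamSessionArn) or an IAM user ARN (iamUserArn).

```python
def register_and_tag(codedeploy, instance_name, iam_session_arn):
    """Register an on-premises instance and tag it for the deployment group.

    `codedeploy` is expected to be a Boto3 CodeDeploy client, for example
    boto3.client("codedeploy"); it is passed in so the function can also be
    exercised with a stub.
    """
    codedeploy.register_on_premises_instance(
        instanceName=instance_name,
        iamSessionArn=iam_session_arn,
    )
    codedeploy.add_tags_to_on_premises_instances(
        # Same tag as in the CLI command above.
        tags=[{"Key": "Application", "Value": "MyWebApp"}],
        instanceNames=[instance_name],
    )


# Usage (requires boto3 and AWS credentials; the names are placeholders):
#   import boto3
#   register_and_tag(boto3.client("codedeploy"), "azure-vm-1", "<arn_of_your_iam_session>")
```

Run it once per instance, using a distinct instance name for the Azure VM and for the on-premises server.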

Image showing the registered instances

Configure AWS CodeDeploy to deploy WebApp

Follow the steps mentioned below to modify the CI/CD pipeline to deploy the application to Azure, and on-premises environments.

  1. Create an IAM role named CodeDeployServiceRole and select CodeDeploy > CodeDeploy as your use case. IAM will automatically select the right policy for you. CodeDeploy will use this role to manage the deployments of your application.
  2. In the AWS console navigate to CodeDeploy > Applications. Click on “Create application”.
  3. Give your application a name and choose “EC2/On-premises” as the compute platform.
  4. Configure the instances we want to deploy to. In the detail view of your application click on “Create deployment group”.
  5. Give your deployment group a name and select the CodeDeployServiceRole.
  6. In the environment configuration section choose On-premises Instances.
  7. Add the tag key-value pair with Application as the key and MyWebApp as the value.
  8. Disable load balancing and leave all other options default.
  9. Click on create deployment group. You should now see your newly created deployment group.
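
The console steps above can also be expressed with the AWS SDK for Python (Boto3). The following sketch is an approximation under assumptions: the helper names are hypothetical, the client is passed in, and options that the console sets by default (such as the deployment configuration) are left out.

```python
def deployment_group_request(application_name, group_name, service_role_arn):
    """Build the arguments for create_deployment_group, targeting tagged on-premises instances."""
    return {
        "applicationName": application_name,
        "deploymentGroupName": group_name,
        "serviceRoleArn": service_role_arn,
        # Matches instances tagged Application=MyWebApp during registration.
        "onPremisesInstanceTagFilters": [
            {"Key": "Application", "Value": "MyWebApp", "Type": "KEY_AND_VALUE"}
        ],
    }


def create_app_and_group(codedeploy, application_name, group_name, service_role_arn):
    """Create the application (EC2/On-premises platform) and its deployment group."""
    codedeploy.create_application(
        applicationName=application_name,
        computePlatform="Server",  # "Server" is the API value for EC2/On-premises
    )
    return codedeploy.create_deployment_group(
        **deployment_group_request(application_name, group_name, service_role_arn)
    )


# Usage (requires boto3 and AWS credentials; the names are placeholders):
#   import boto3
#   create_app_and_group(boto3.client("codedeploy"), "MyWebApp", "MyWebApp-OnPrem",
#                        "arn:aws:iam::<account-id>:role/CodeDeployServiceRole")
```

Keeping the request in its own function makes it easy to review the tag filter before anything is created.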

Image showing the created CodeDeploy Application and Deployment group

  10. We can now edit our pipeline to deploy to the newly created deployment group.
  11. Navigate to your previously created pipeline in the CodePipeline section and click edit. Add the deploy stage by clicking on Add stage and name it Deploy. Afterwards, click Add action.
  12. Name your action and choose CodeDeploy as your action provider.
  13. Select “BuildArtifact” as your input artifact and select your newly created application and deployment group.
  14. Click on Done and on Save in your pipeline to confirm the changes. You have now added the deploy step to your pipeline!

Image showing the updated pipeline

This completes the one-time DevOps pipeline setup, and you will not need to repeat the process.

Automated DevOps pipeline in action

This section demonstrates how the DevOps pipeline operates end-to-end, and automatically deploys the application to the Azure VM and the on-premises server when the application code changes.

  1. Click on Release Change to deploy your application for the first time. The release change button manually triggers CodePipeline to update your code. In the next section we will make changes to the repository which triggers the pipeline automatically.
  2. During the “Source” stage your pipeline fetches the latest version from GitHub.
  3. During the “Build” stage your pipeline uses CodeBuild to build your application and generate the deployment artifacts for your pipeline. It uses the build commands in the buildspec to determine the build steps.
  4. During the “Deploy” stage your pipeline uses CodeDeploy to deploy the build artifacts to the configured deployment group (the Azure VM and the on-premises VM). Navigate to the URL of your application to see the results of the deployment process.

Image showing the deployed sample application

Update application code in IDE

You can modify the application code using your favorite IDE. In this example we will change the background color and a paragraph of the sample application.

Image showing modifications being made to the file

Once you’ve modified the code, save the updated file and push the code to the code repository.

git add .
git commit -m "I made changes to the index.html file"
git push

DevOps pipeline (CodePipeline) – compile, build, and test

Once the code is updated and pushed to GitHub, the DevOps pipeline (CodePipeline) automatically compiles, builds, and tests the modified application. You can navigate to your pipeline (CodePipeline) in the AWS Console, where you should see the pipeline running (or recently completed). CodePipeline automatically executes the Build and Deploy steps. In this case we’re not adding any complex logic, but based on your organization’s requirements you can add any build step, or integrate with other tools.

Image showing CodePipeline in action

Deployment process using CodeDeploy

In this section, we describe how the modified application is deployed to the Azure, and on-premises VMs.

  1. Open your pipeline in the CodePipeline console, and click on the “AWS CodeDeploy” link in the Deploy step to navigate to your deployment group. Open the “Deployments” tab.

Image showing application deployment history

  2. Click on the first deployment in the Application deployment history section. This will show the details of your latest deployment.

Image showing deployment lifecycle events for the deployment

  3. In the “Deployment lifecycle events” section click on one of the “View events” links. This shows you the lifecycle steps executed by CodeDeploy and will display the error log output if any of the steps have failed.

Image showing deployment events on instance

  4. Navigate back to your application. You should now see your changes in the application. You’ve successfully set up a multicloud DevOps pipeline!

Image showing a new version of the deployed application

Conclusion

In summary, this post demonstrated how AWS DevOps tools and services can help organizations build a single release pipeline to deploy applications and workloads in a hybrid and multicloud environment. The post also showed how to set up a CI/CD pipeline to deploy applications to AWS, on-premises, and Azure VMs.

If you have any questions or feedback, leave them in the comments section.

About the Authors

Picture of Amandeep

Amandeep Bajwa

Amandeep Bajwa is a Senior Solutions Architect at AWS supporting Financial Services enterprises. He helps organizations achieve their business outcomes by identifying the appropriate cloud transformation strategy based on industry trends, and organizational priorities. Some of the areas Amandeep consults on are cloud migration, cloud strategy (including hybrid & multicloud), digital transformation, data & analytics, and technology in general.

Picture of Pawan

Pawan Shrivastava

Pawan Shrivastava is a Partner Solution Architect at AWS in the WWPS team. He focuses on working with partners to provide technical guidance on AWS, collaborating with them to understand their technical requirements, and designing solutions to meet their specific needs. Pawan is passionate about DevOps, automation, and CI/CD pipelines. He enjoys watching MMA, playing cricket, and working out in the gym.

Picture of Brent

Brent Van Wynsberge

Brent Van Wynsberge is a Solutions Architect at AWS supporting enterprise customers. He guides organizations in their digital transformation and innovation journey and accelerates cloud adoption. Brent is an IoT enthusiast, specifically in the application of IoT in manufacturing. He is also interested in DevOps, data analytics, containers, and innovative technologies in general.

Picture of Mike

Mike Strubbe

Mike is a Cloud Solutions Architect Manager at AWS with a strong focus on cloud strategy, digital transformation, business value, leadership, and governance. He helps Enterprise customers achieve their business goals through cloud expertise, coupled with strong business acumen skills. Mike is passionate about implementing cloud strategies that enable cloud transformations, increase operational efficiency and drive business value.

How Cargotec uses metadata replication to enable cross-account data sharing

Post Syndicated from Sumesh M R original https://aws.amazon.com/blogs/big-data/how-cargotec-uses-metadata-replication-to-enable-cross-account-data-sharing/

This is a guest blog post co-written with Sumesh M R from Cargotec and Tero Karttunen from Knowit Finland.

Cargotec (Nasdaq Helsinki: CGCBV) is a Finnish company that specializes in cargo handling solutions and services. They are headquartered in Helsinki, Finland, and operate globally in over 100 countries. With their leading cargo handling solutions and services, they are pioneers in their field. Through their unique position in ports, at sea, and on roads, they optimize global cargo flows and create sustainable customer value.

Cargotec captures terabytes of IoT telemetry data from their machinery operated by numerous customers across the globe. This data needs to be ingested into a data lake, transformed, and made available for analytics, machine learning (ML), and visualization. For this, Cargotec built an Amazon Simple Storage Service (Amazon S3) data lake and cataloged the data assets in AWS Glue Data Catalog. They chose AWS Glue as their preferred data integration tool due to its serverless nature, low maintenance, ability to control compute resources in advance, and scale when needed.

In this post, we discuss the technical challenges faced by Cargotec in replicating their AWS Glue metadata across AWS accounts, and how they navigated these challenges successfully to enable cross-account data sharing. By sharing their story, we hope to inspire readers facing similar challenges and provide insights into how our services can be customized to meet your specific needs.

Challenges

Like many customers, Cargotec’s data lake is distributed across multiple AWS accounts that are owned by different teams. Cargotec wanted to find a solution to share datasets across accounts and use Amazon Athena to query them. To share the datasets, they needed a way to share access to the data and access to catalog metadata in the form of tables and views. Cargotec’s use cases also required them to create views that span tables and views across catalogs. Cargotec’s implementation covers three discrete AWS accounts, 25 databases, 150 tables, and 10 views.

Solution overview

Cargotec required a single catalog per account that contained metadata from their other AWS accounts. The solution that best fit their needs was to replicate metadata using an in-house version of a publicly available utility called Metastore Migration utility. Cargotec extended the utility by changing the overall orchestration layer, adding an Amazon SNS notification and an AWS Lambda function. The approach was to programmatically copy and make available each catalog entity (databases, tables, and views) to all consumer accounts. This makes the tables or views local to the account where the query is being run, while the data still remains in its source S3 bucket.

Cargotec’s solution architecture

The following diagram summarizes the architecture and overall flow of events in Cargotec’s design.

Solution Architecture

Catalog entries from a source account are programmatically replicated to multiple target accounts using the following series of steps.

  1. An AWS Glue job (metadata exporter) runs daily on the source account. It reads the table and partition information from the source AWS Glue Data Catalog. Since the target account is used for analytical purposes and does not require real-time schema changes, the metadata exporter runs only once a day. Cargotec uses partition projection, which ensures that the new partitions are available in real-time.
  2. The job then writes the metadata to an S3 bucket in the same account. Please note that the solution doesn’t involve movement of the data across accounts. The target accounts read data from the source account S3 buckets. For guidance on setting up the right permissions, please see the Amazon Athena User Guide.
  3. After the metadata export has been completed, the AWS Glue job pushes a notification to an Amazon Simple Notification Service (Amazon SNS) topic. This message contains the S3 path to the latest metadata export. The SNS notification is Cargotec’s customization to the existing open-source utility.
  4. Every target account runs an AWS Lambda function that is notified when the source account SNS topic receives a push. In short, there are multiple subscriber Lambda functions (one per target account) for the source account SNS topics that get triggered when an export job is completed.
  5. Once triggered, the Lambda function then initiates an AWS Glue job (metadata importer) on the respective target account. The job receives as input the source account’s S3 path to the metadata that has been recently exported.
  6. Based on the path provided, the metadata importer reads the exported metadata from the source S3 bucket.
  7. The metadata importer now uses this information to create or update the corresponding catalog information in the target account.

All along the way, any errors are published to a separate SNS topic for logging and monitoring purposes. With this approach, Cargotec was able to create and consume views that span tables and views from multiple catalogs spread across different AWS accounts.
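
Step 3 of the flow above, where the exporter announces a finished export, could be sketched as follows. This is not Cargotec’s actual code: the function name, the subject line, and the JSON message shape (a single export_path field) are assumptions made for illustration, and the SNS client is passed in rather than created.

```python
import json


def notify_export_complete(sns, topic_arn, export_path):
    """Publish the S3 path of the latest metadata export to an SNS topic.

    `sns` is expected to be a Boto3 SNS client, for example boto3.client("sns").
    The message format is a hypothetical convention shared with the subscribers.
    """
    message = json.dumps({"export_path": export_path})
    return sns.publish(
        TopicArn=topic_arn,
        Subject="Glue metadata export complete",
        Message=message,
    )


# Usage at the end of the exporter job (placeholders, requires boto3 and credentials):
#   import boto3
#   notify_export_complete(boto3.client("sns"), "<topic_arn>", "s3://<bucket>/<export-prefix>/")
```

Publishing only the path keeps the message small; each subscriber reads the exported files directly from the source bucket.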

Implementation

The core of the catalog replication utility is two AWS Glue scripts:

  • Metadata exporter – An AWS Glue job that reads the source data catalog and creates an export of the databases, tables, and partitions in an S3 bucket in the source account.
  • Metadata importer – An AWS Glue job that reads the export that was created by the metadata exporter and applies the metadata to target databases. This code is triggered by a Lambda function once files are written to S3. The job runs in the target account.

Metadata exporter

This section provides details on the AWS Glue job that exports the AWS Glue Data Catalog into an S3 location. The source code for the application is hosted in the AWS Glue GitHub repository. Though this may need to be customized to suit your needs, we will go over the core components of the code in this post.

Metadata exporter inputs

The application takes a few job input parameters as described below:

  • --mode key accepts either to-s3 or to-jdbc. The latter is used when the code is moving the metadata directly into a JDBC Hive Metastore. In the case of Cargotec, since we are moving the metadata to files on S3, the value for --mode will remain to-s3.
  • --output-path accepts an S3 location to which the exported metadata should be written. The code creates subdirectories corresponding to databases, tables, and partitions.
  • --database-names accepts a semicolon-separated list of databases on the source catalog that need to be replicated to the target.
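
To make the parameter formats concrete, the following sketch parses the three inputs with Python’s argparse module and splits the semicolon-separated database list. It illustrates the inputs described above and is not the utility’s own argument handling; the function name and sample values are hypothetical.

```python
import argparse


def parse_exporter_args(argv):
    """Parse exporter-style job parameters and return (mode, output_path, databases)."""
    parser = argparse.ArgumentParser(description="Illustrative exporter argument parser")
    parser.add_argument("--mode", required=True, choices=["to-s3", "to-jdbc"])
    parser.add_argument("--output-path", help="S3 location for the exported metadata")
    parser.add_argument("--database-names", required=True,
                        help="Semicolon-separated list of source databases")
    args = parser.parse_args(argv)
    # Split on ';' and drop empty entries such as a trailing separator.
    databases = [name.strip() for name in args.database_names.split(";") if name.strip()]
    return args.mode, args.output_path, databases


# Sample values for demonstration only.
print(parse_exporter_args([
    "--mode", "to-s3",
    "--output-path", "s3://example-bucket/exports/",
    "--database-names", "sales;telemetry",
]))
```

In an AWS Glue job, the same keys are supplied as job parameters when the job run is started.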

Reading the catalog

The metadata about the databases, tables, and partitions is read from the AWS Glue Data Catalog.

# Read the catalog metadata for the given database into an AWS Glue DynamicFrame.
dyf = glue_context.create_dynamic_frame.from_options(
    connection_type='com.amazonaws.services.glue.connections.DataCatalogConnection',
    connection_options={
        'catalog.name': 'datacatalog',
        'catalog.database': database,
        'catalog.region': region
    })

The above code snippet reads the metadata into an AWS Glue DynamicFrame. The frame is then converted to a Spark DataFrame. It is filtered into individual DataFrames based on it being either part of a database, table, or partition. A schema is attached to the data frame using one of the below:

from pyspark.sql.types import ArrayType, StringType, StructField, StructType

# The *_ITEM_SCHEMA constants are defined alongside these schemas in the utility (see GitHub).
DATACATALOG_DATABASE_SCHEMA = StructType([
    StructField('items', ArrayType(DATACATALOG_DATABASE_ITEM_SCHEMA, False), True),
    StructField('type', StringType(), False)
])
DATACATALOG_TABLE_SCHEMA = StructType([
    StructField('database', StringType(), False),
    StructField('type', StringType(), False),
    StructField('items', ArrayType(DATACATALOG_TABLE_ITEM_SCHEMA, False), True)
])
DATACATALOG_PARTITION_SCHEMA = StructType([
    StructField('database', StringType(), False),
    StructField('table', StringType(), False),
    StructField('items', ArrayType(DATACATALOG_PARTITION_ITEM_SCHEMA, False), True),
    StructField('type', StringType(), False)
])

For details on the individual item schema, refer to the schema definition on GitHub.

Persisting the metadata

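After the metadata is converted to DataFrames with a schema, it is persisted to the S3 location specified by the output-path parameter. As written, the paths are built by string concatenation, so output_path is expected to end with a trailing slash.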

databases.write.format('json').mode('overwrite').save(output_path + 'databases')
tables.write.format('json').mode('overwrite').save(output_path + 'tables')
partitions.write.format('json').mode('overwrite').save(output_path + 'partitions')

Exploring the output

Navigate to the S3 bucket that contains the output location, and you should be able to see the output metadata in JSON format. An example export for a table would look like the following code snippet.

{
    "database": "default",
    "type": "table",
    "item": {
        "createTime": "1651241372000",
        "lastAccessTime": "0",
        "owner": "spark",
        "retention": 0,
        "name": "an_example_table",
        "tableType": "EXTERNAL_TABLE",
        "parameters": {
            "totalSize": "2734148",
            "EXTERNAL": "TRUE",
            "last_commit_time_sync": "20220429140907",
            "spark.sql.sources.schema.part.0": "{redacted_schema}",
            "numFiles": "1",
            "transient_lastDdlTime": "1651241371",
            "spark.sql.sources.schema.numParts": "1",
            "spark.sql.sources.provider": "hudi"
        },
        "partitionKeys": [],
        "storageDescriptor": {
            "inputFormat": "org.apache.hudi.hadoop.HoodieParquetInputFormat",
            "compressed": false,
            "storedAsSubDirectories": false,
            "location": "s3://redacted_bucket_name/table/an_example_table",
            "numberOfBuckets": -1,
            "outputFormat": "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat",
            "bucketColumns": [],
            "columns": [{
                    "name": "_hoodie_commit_time",
                    "type": "string"
                },
                {
                    "name": "_hoodie_commit_seqno",
                    "type": "string"
                }
            ],
            "parameters": {},
            "serdeInfo": {
                "serializationLibrary": "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe",
                "parameters": {
                    "hoodie.query.as.ro.table": "false",
                    "path": "s3://redacted_bucket_name/table/an_example_table",
                    "serialization.format": "1"
                }
            },
            "skewedInfo": {
                "skewedColumnNames": [],
                "skewedColumnValueLocationMaps": {},
                "skewedColumnValues": []
            },
            "sortColumns": []
        }
    }
}
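
To get a quick overview of an export without opening each file, you can summarize the table records with a few lines of Python. The following is a minimal sketch, assuming the export files contain one JSON object per line with the fields shown in the example above; the function name and the sample record are hypothetical.

```python
import json


def summarize_table_export(lines):
    """Return database, table name, location, and column names for each table record."""
    summaries = []
    for line in lines:
        line = line.strip()
        if not line:
            continue
        record = json.loads(line)
        if record.get("type") != "table":
            continue  # skip database and partition records
        item = record["item"]
        descriptor = item["storageDescriptor"]
        summaries.append({
            "database": record["database"],
            "table": item["name"],
            "location": descriptor["location"],
            "columns": [column["name"] for column in descriptor["columns"]],
        })
    return summaries


# A shortened, made-up record in the same shape as the example export.
sample = json.dumps({
    "database": "default",
    "type": "table",
    "item": {
        "name": "an_example_table",
        "storageDescriptor": {
            "location": "s3://example-bucket/table/an_example_table",
            "columns": [{"name": "_hoodie_commit_time", "type": "string"}],
        },
    },
})
print(summarize_table_export([sample]))
```

To use it on a real export, download one of the files from the tables prefix and pass its lines to the function.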

Once the export job is complete, the output S3 path will be pushed to an SNS topic. A Lambda function at the target account processes this message and invokes the import AWS Glue job by passing the S3 import location.
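
The Lambda function on the target account could look roughly like the following sketch. It is not Cargotec’s implementation: the job name, the JSON message shape (an export_path field), and the helper names are assumptions, while the job parameter keys are the ones listed in the importer inputs section. The AWS Glue client is passed in so the logic can be tested without AWS access.

```python
import json

IMPORT_JOB_NAME = "metadata-importer"  # hypothetical AWS Glue job name


def build_job_arguments(export_path, region):
    """Derive the importer job parameters from the exported S3 path."""
    base = export_path if export_path.endswith("/") else export_path + "/"
    return {
        "--mode": "from-s3",
        "--region": region,
        "--database-input-path": base + "databases",
        "--table-input-path": base + "tables",
        "--partition-input-path": base + "partitions",
    }


def handle_event(event, glue, region):
    """Start one importer job run per SNS record and return the job run IDs.

    `glue` is expected to be a Boto3 Glue client, for example boto3.client("glue").
    """
    run_ids = []
    for record in event["Records"]:
        # Assumed message format: {"export_path": "s3://..."}
        message = json.loads(record["Sns"]["Message"])
        response = glue.start_job_run(
            JobName=IMPORT_JOB_NAME,
            Arguments=build_job_arguments(message["export_path"], region),
        )
        run_ids.append(response["JobRunId"])
    return run_ids


# In a Lambda function, the entry point could delegate like this (requires boto3):
#   def lambda_handler(event, context):
#       import boto3, os
#       return handle_event(event, boto3.client("glue"), os.environ["AWS_REGION"])
```

Separating the argument construction from the handler keeps the path convention (databases, tables, partitions) in one place.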

Metadata importer

The import job runs on the target account. The code for the job is available on GitHub. As with the exporter, you may need to customize it to suit your specific requirements, but the code as-is should work for most scenarios.

Metadata importer inputs

The inputs to the application are provided as job parameters. Below is a list of parameters that are used for the import process:

  • --mode key accepts either from-s3 or from-jdbc. The latter is used when migration is from a JDBC source to the AWS Glue Data Catalog. At Cargotec, the metadata is already written to Amazon S3, and hence the value for this key is always set to from-s3.
  • --region key accepts a valid AWS Region for the AWS Glue Catalog. The target Region is specified using this key.
  • --database-input-path key accepts the path to the file containing the database metadata. This is the output of the previous export job.
  • --table-input-path key accepts the path to the file containing the table metadata. This is the output of the previous export job.
  • --partition-input-path key accepts the path to the file containing the partition metadata. This is the output of the previous export job.

Reading the metadata

The metadata, as previously discussed, is stored as files on Amazon S3. The files are read into individual Spark DataFrames with their respective schema information.

databases = sql_context.read.json(path=db_input_dir, schema=METASTORE_DATABASE_SCHEMA)
tables = sql_context.read.json(path=tbl_input_dir, schema=METASTORE_TABLE_SCHEMA)
partitions = sql_context.read.json(path=parts_input_dir, schema=METASTORE_PARTITION_SCHEMA)

Loading the catalog

Once the Spark DataFrames are read, they are converted to AWS Glue DynamicFrames and then loaded into the catalog, as shown in the following snippet.

glue_context.write_dynamic_frame.from_options(
    frame=dyf_databases,
    connection_type='catalog',
    connection_options={
        'catalog.name': datacatalog_name,
        'catalog.region': region
    }
)
glue_context.write_dynamic_frame.from_options(
    frame=dyf_tables,
    connection_type='catalog',
    connection_options={
        'catalog.name': datacatalog_name,
        'catalog.region': region
    }
)
glue_context.write_dynamic_frame.from_options(
    frame=dyf_partitions,
    connection_type='catalog',
    connection_options={
        'catalog.name': datacatalog_name,
        'catalog.region': region
    }
)

Once the job concludes, you can query the target AWS Glue catalog to ensure the tables from the source have been synced with the destination. To keep things simple and easy to manage, instead of implementing a mechanism to identify tables that change over time, Cargotec updates the catalog information of all databases or tables that are configured in the export job.

Considerations

Though the setup works effectively for Cargotec’s current business requirements, there are a few drawbacks to this approach, which are highlighted below:

  1. The solution involves code. Customizations were made to the existing open-source utility to be able to publish an SNS notification once an export is complete and a Lambda function to trigger the import process.
  2. The export process on the source account is a scheduled job. Hence there is no real-time sync between the source and target accounts. This was not a requirement for Cargotec’s business process.
  3. For tables that don’t use Athena partition projection, query results may be outdated until the new partitions are added to the metastore through MSCK REPAIR TABLE, ALTER TABLE ADD PARTITION, AWS Glue crawler, and so on.
  4. The current approach requires syncing all the tables across the source and target. If the requirement is to capture only the ones that changed instead of a scheduled daily export, the design needs to change and could benefit from the Amazon EventBridge integration with AWS Glue. An example implementation of using AWS Glue APIs to identify changes is shown in Identify source schema changes using AWS Glue.

Conclusion

In this blog post, we have explored a solution for cross-account sharing of data and tables that makes it possible for Cargotec to create views that combine data from multiple AWS accounts. We’re excited to share Cargotec’s success and believe the post has provided you with valuable insights and inspiration for your own projects.

We encourage you to explore our range of services and see how they can help you achieve your goals. Lastly, for more data and analytics blogs, feel free to bookmark the AWS Blogs.


About the Authors

Sumesh M R is a Full Stack Machine Learning Architect at Cargotec. He has several years of software engineering and ML background. Sumesh is an expert in Amazon SageMaker and other AWS ML/Analytics services. He is passionate about data science and loves to explore the latest ML libraries and techniques. Before joining Cargotec, he worked as a Solution Architect at TCS. In his spare time, he loves to play cricket and badminton.

Tero Karttunen is a Senior Cloud Architect at Knowit Finland. He advises clients on architecting and adopting Data Architectures that best serve their Data Analytics and Machine Learning needs. He has helped Cargotec in their data journey for more than two years. Outside of work, he enjoys running, winter sports, and role-playing games.

Arun A K is a Big Data Specialist Solutions Architect at AWS. He works with customers to provide architectural guidance for running analytics solutions on AWS Glue, AWS Lake Formation, Amazon Athena, and Amazon EMR. In his free time, he likes to spend time with his friends and family.

Advanced patterns with AWS SDK for pandas on AWS Glue for Ray

Post Syndicated from Abdel Jaidi original https://aws.amazon.com/blogs/big-data/advanced-patterns-with-aws-sdk-for-pandas-on-aws-glue-for-ray/

AWS SDK for pandas is a popular Python library among data scientists, data engineers, and developers. It simplifies interaction between AWS data and analytics services and pandas DataFrames. It allows easy integration and data movement between 22 types of data stores, including Amazon Simple Storage Service (Amazon S3), Amazon Athena, Amazon Redshift, and Amazon OpenSearch Service.

In the previous post, we discussed how you can use AWS SDK for pandas to scale your workloads on AWS Glue for Ray. We explained how using both Ray and Modin within the library enabled us to distribute workloads across a compute cluster. To illustrate these capabilities, we explored examples of writing Parquet files to Amazon S3 at scale and querying data in parallel with Athena.

In this post, we show some more advanced ways to use this library on AWS Glue for Ray. We cover features and APIs from AWS services such as S3 Select, Amazon DynamoDB, and Amazon Timestream.

Solution overview

The Ray and Modin frameworks allow scaling of pandas workloads easily. You can write code on your laptop that uses the SDK for pandas to get data from an AWS data or analytics service to a pandas DataFrame, transform it using pandas, and then write it back to the AWS service. By using the distributed version of the SDK for pandas and replacing pandas with Modin, exactly the same code will scale on a Ray runtime—all logic about task coordination and distribution is hidden. Taking advantage of these abstractions, the AWS SDK for pandas team has made considerable use of Ray primitives to distribute some of the existing APIs (for the full list, see Supported APIs).

In this post, we show how to use some of these APIs in an AWS Glue for Ray job, namely querying with S3 Select, writing to and reading from a DynamoDB table, and writing to a Timestream table. Because AWS Glue for Ray is a fully managed environment, it’s by far the easiest way to run jobs because you don’t need to worry about cluster management. If you want to create your own cluster on Amazon Elastic Compute Cloud (Amazon EC2), refer to Distributing Calls on Ray Remote Cluster.

Configure solution resources

We use an AWS CloudFormation stack to provision the solution resources. Complete the following steps:

  1. Choose Launch stack to provision the stack in your AWS account:

Launch CloudFormation Stack

This takes about 2 minutes to complete. On successful deployment, the CloudFormation stack shows the status as CREATE_COMPLETE.

CloudFormation CREATE_COMPLETE

  2. Navigate to AWS Glue Studio to find an AWS Glue job named AdvancedGlueRayJob.

Glue for Ray Job Script

  3. On the Job details tab, scroll down and choose Advanced Properties.

Under Job Parameters, AWS SDK for pandas is specified as an additional Python module to install, along with Modin as an extra dependency.

Glue for Ray Job Details

  4. To run the job, choose Run and navigate to the Runs tab to monitor the job’s progress.

Glue for Ray Job Runs

Import the library

To import the library, use the following code:

import awswrangler as wr

AWS SDK for pandas detects if the runtime supports Ray, and automatically initializes a cluster with the default parameters. Advanced users can override this process by starting the Ray runtime before the import command.

Scale S3 Select workflows

S3 Select allows you to use SQL statements to query and filter S3 objects, including compressed files. This can be particularly useful if you have large files of several TBs and want to extract some information. Because the workload is delegated to Amazon S3, you don’t have to download and filter objects on the client side, leading to lower latency, lower cost, and higher performance.

With AWS SDK for pandas, these calls to S3 Select can be distributed across Ray workers in the cluster. In the following example, we query Amazon reviews data in Parquet format, filtering for reviews with 5-star ratings in the Mobile_Electronics partition. star_rating is a column in the Parquet data itself, while the partition is a directory.

# Filter for 5-star reviews with S3 Select within a partition
df_select = wr.s3.select_query(
    sql="SELECT * FROM s3object s where s.\"star_rating\" >= 5",
    path="s3://amazon-reviews-pds/parquet/product_category=Mobile_Electronics/",
    input_serialization="Parquet",
    input_serialization_params={},
    scan_range_chunk_size=1024*1024*16,
)

scan_range_chunk_size is an important parameter to calibrate when using S3 Select. It specifies the range of bytes to query the S3 object, thereby determining the amount of work delegated to each worker. For this example, it’s set to 16 MB, meaning the work of scanning the object is parallelized into separate S3 Select requests each 16 MB in size. A higher value equates to larger chunks per worker but fewer workers, and vice versa.
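To get a feel for this trade-off, the following sketch estimates how many byte ranges a given chunk size produces for a single object. It is plain arithmetic under the assumption that an object is split into consecutive ranges of scan_range_chunk_size bytes; the helper name count_scan_ranges and the 1 GiB object size are made up for illustration, and the way the library actually schedules requests may differ.

```python
def count_scan_ranges(object_size_bytes, scan_range_chunk_size):
    """Number of consecutive byte ranges needed to cover an object."""
    if scan_range_chunk_size <= 0:
        raise ValueError("scan_range_chunk_size must be positive")
    # Integer ceiling division: a partial last range still counts as one.
    return (object_size_bytes + scan_range_chunk_size - 1) // scan_range_chunk_size


MiB = 1024 * 1024
object_size = 1024 * MiB  # a hypothetical 1 GiB object

print(count_scan_ranges(object_size, 16 * MiB))  # 64 ranges of 16 MiB
print(count_scan_ranges(object_size, 64 * MiB))  # 16 ranges of 64 MiB
```

Quadrupling the chunk size cuts the number of ranges by a factor of four, so each request does more work but there is less parallelism to spread across workers.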

The results are returned in a Modin DataFrame, which is a drop-in replacement for pandas. It exposes the same APIs but enables you to use all the workers in the cluster. The data in the Modin DataFrame is distributed along with all the operations among the workers.

Scale DynamoDB workflows

DynamoDB is a scalable NoSQL database service that provides high-performance, low-latency, and managed storage.

AWS SDK for pandas uses Ray to scale DynamoDB workflows, allowing parallel data retrieval and insertion operations. The wr.dynamodb.read_items function retrieves data from DynamoDB in parallel across multiple workers, and the results are returned as a Modin DataFrame. Similarly, data insertion into DynamoDB can be parallelized using the wr.dynamodb.put_df function.

For example, the following code inserts the Amazon Reviews DataFrame obtained from S3 Select into a DynamoDB table and then reads it back:

# Write Modin DataFrame to DynamoDB
wr.dynamodb.put_df(
    df=df_select,
    table_name=dynamodb_table_name,
    use_threads=4,
)
# Read data back from DynamoDB to Modin
df_dynamodb = wr.dynamodb.read_items(
    table_name=dynamodb_table_name,
    allow_full_scan=True,
)

DynamoDB calls are subject to AWS service quotas. The concurrency can be limited using the use_threads parameter.

Scale Timestream workflows

Timestream is a fast, scalable, fully managed, purpose-built time series database that makes it easy to store and analyze trillions of time series data points per day. With AWS SDK for pandas, you can distribute Timestream write operations across multiple workers in your cluster.

Data can be written to Timestream using the wr.timestream.write function, which parallelizes the data insertion process for improved performance.

In this example, we use sample data from Amazon S3 loaded into a Modin DataFrame. Familiar pandas commands such as selecting columns or resetting the index are applied at scale with Modin:

from datetime import datetime

# Select columns
df_timestream = df_timestream.loc[:, ["region", "az", "hostname", "measure_kind", "measure", "time"]]
# Overwrite the time column
df_timestream["time"] = datetime.now()
# Reset the index
df_timestream.reset_index(inplace=True, drop=False)
# Filter a measure
df_timestream = df_timestream[df_timestream.measure_kind == "cpu_utilization"]

The Timestream write operation is parallelized across blocks in your dataset. If the blocks are too big, you can use Ray to repartition the dataset and increase the throughput, because each block will be handled by a separate thread:

import ray

# Repartition the data into 100 blocks
df_timestream = ray.data.from_modin(df_timestream).repartition(100).to_modin()

We are now ready to insert the data into Timestream, and a final query confirms the number of rows in the table:

# Write data to Timestream
rejected_records = wr.timestream.write(
    df=df_timestream,
    database=timestream_database_name,
    table=timestream_table_name,
    time_col="time",
    measure_col="measure",
    dimensions_cols=["index", "region", "az", "hostname"],
)

# Query
df = wr.timestream.query(f'SELECT COUNT(*) AS counter FROM "{timestream_database_name}"."{timestream_table_name}"')

Clean up

To prevent unwanted charges to your AWS account, we recommend deleting the AWS resources that you used in this post:

  1. On the Amazon S3 console, empty the data from the S3 bucket with prefix glue-ray-blog-script.

S3 Bucket

  2. On the AWS CloudFormation console, delete the AdvancedSDKPandasOnGlueRay stack.

All resources will be automatically deleted with it.

Conclusion

In this post, we showcased some more advanced patterns to run your workloads using AWS SDK for pandas. In particular, these examples demonstrated how Ray is used within the library to distribute operations for several other AWS services, not just Amazon S3. When used in combination with AWS Glue for Ray, this gives you access to a fully managed environment to run at scale. We hope this solution can help with migrating your existing pandas jobs to achieve higher performance and speedups across multiple data stores on AWS.


About the Authors

Abdel Jaidi is a Senior Cloud Engineer for AWS Professional Services. He works on open-source projects focused on AWS Data & Analytics services. In his spare time, he enjoys playing tennis and hiking.

Anton Kukushkin is a Data Engineer for AWS Professional Services based in London, UK. In his spare time, he enjoys playing musical instruments.

Leon Luttenberger is a Data Engineer for AWS Professional Services based in Austin, Texas. He works on AWS open-source solutions that help our customers analyze their data at scale. In his spare time, he enjoys reading and traveling.

Enable complex row-level security in embedded dashboards for non-provisioned users in Amazon QuickSight with OR-based tags

Post Syndicated from Srikanth Baheti original https://aws.amazon.com/blogs/big-data/enable-complex-row-level-security-in-embedded-dashboards-for-non-provisioned-users-in-amazon-quicksight-with-or-based-tags/

Amazon QuickSight is a fully managed, cloud-native business intelligence (BI) service that makes it easy to connect to your data, create interactive dashboards, and share these with tens of thousands of users, both within QuickSight and embedded in your software as a service (SaaS) applications.

QuickSight Enterprise edition now supports nested conditions within row-level security (RLS) tags, so you can combine AND and OR conditions to simplify multi-tenant access patterns. Previously, QuickSight supported only the AND operator across all tags. When users are assigned multiple roles that let them view data along multiple dimensions, you need both AND and OR operators to express RLS rules. QuickSight lets authors and developers use the OR operator in the form of OR of AND, that is, a list of AND groups joined by OR, which covers even complex data security scenarios. In this post, we look at how to implement this.

Feature overview

When you embed QuickSight dashboards in your application for users who aren’t provisioned (registered) in QuickSight, this is called anonymous embedding. In this scenario, even though the user is anonymous to QuickSight, you can still customize the data that user sees in the dashboard using RLS tags.

You can do this in three simple steps:

  1. Add RLS tags to a dataset.
  2. Add the OR condition to RLS tags.
  3. Assign values to those tags at runtime using the GenerateEmbedUrlForAnonymousUser API operation. For more information, see Embedding QuickSight data dashboards for anonymous (unregistered) users.

To see this feature in action, see Using tag-based rules.

Use case overview

AnyHealth Inc. is a fictitious independent software vendor (ISV) in the healthcare space. They have a SaaS application for different hospitals across different regions of the country to manage their revenue. AnyHealth Inc has thousands of healthcare employees accessing their application portal. Part of their application portal has embedded operational insights related to their business within a QuickSight dashboard. AnyHealth doesn’t want to manage their users in QuickSight separately, and wants to secure data based on who the user is and the hospital the user is affiliated to. AnyHealth decided to authorize data access to their users at runtime, enabling row-level security using tags.

AnyHealth has hospitals (North Hospital, South Hospital, and Downtown Hospital) in regions Central, East, South, and West.

In this example, the following users access AnyHealth’s application with the embedded dashboard. Each user has a certain level of data restriction that defines what they can access in the dashboards. PowerUser is a super user that can see the data for all hospitals and regions.

AnyHealth’s Application Users | Hospital | Region | Condition | Payor | State
NorthMedicaidUser | North Hospital | Central and East | OR | Medicaid | New York
SouthMedicareUser | South Hospital | South | OR | Medicare | All states
NorthAdmin | North Hospital | All regions | | |
SouthAdmin | South Hospital | All regions | | |
PowerUser | All hospitals | All regions | | |

These users exist only at the application level and haven’t been provisioned in QuickSight. AnyHealth wants to keep managing users and their roles at the application level as the single source of truth. As a result, when a user accesses the embedded QuickSight dashboard from the application, AnyHealth must secure the data on the dashboard based on that user’s roles and permissions. AnyHealth has different combinations of user permissions; for example, AnyHealth administrators have access to all the data, which is achieved with PowerUser permissions. A hospital admin, for example NorthAdmin, is the administrator at North Hospital and can view only the data related to that hospital. A hospital user, for example SouthUser, has access to data at South Hospital in a specific region.

Additionally, when there are Medicaid and Medicare claims, there are special users who monitor these programs. For example, there can be a user at North Hospital who has access to all the data in North Hospital in regions Central and East. But this user also manages Medicaid for New York. In this case, to show all the relevant data, RLS rules have to be defined such that the user can see data where (Hospital = North Hospital and Region in (Central, East)) or (payor = Medicaid and State = New York). This can be achieved with the new RLS with OR tags feature in QuickSight.

Solution overview

Setup involves two steps:

  1. Create tag keys.
  2. Set SessionTags for each user.

Create tag keys

AnyHealth creates tag keys on the dataset they’re using to power the dashboard. This can be done in two ways, either through an UpdateDataset API call or through the QuickSight console.

Configuration using the API

In the UpdateDataset API call, the RowLevelPermissionTagConfiguration element is set as follows. Note that the tag keys within a single item of TagRuleConfigurations are always combined with a logical AND, and if the list contains more than one item, the items are combined with a logical OR. We use the following sample configuration to address our use case:

"RowLevelPermissionTagConfiguration": {
            "Status": "ENABLED",
            "TagRules": [
                {
                    "TagKey": "region",
                    "ColumnName": "Region",
                    "TagMultiValueDelimiter": ",",
                    "MatchAllValue": "*"
                },
                {
                    "TagKey": "hospital",
                    "ColumnName": "Hospital",
                    "TagMultiValueDelimiter": ",",
                    "MatchAllValue": "*"
                },
                {
                    "TagKey": "payor",
                    "ColumnName": "Payor Segment",
                    "TagMultiValueDelimiter": ",",
                    "MatchAllValue": "*"
                },
                {
                    "TagKey": "state",
                    "ColumnName": "State",
                    "TagMultiValueDelimiter": ",",
                    "MatchAllValue": "*"
                }
            ],
            "TagRuleConfigurations": [
                [
                    "region",
                    "hospital"
                ],
                [
                    "payor",
                    "state"
                ]
            ]
        }
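The OR of AND semantics can be illustrated with a small evaluator. This is a sketch of the logic as described in this post, not QuickSight's implementation: the function row_visible is hypothetical, every tag uses "," as the delimiter and "*" as the match-all value, the sample rows (including the Commercial payor and the Ohio and Texas states) are made up, and treating a missing session tag as a non-match is our assumption.

```python
TAG_RULES = [
    {"TagKey": key, "ColumnName": column, "TagMultiValueDelimiter": ",", "MatchAllValue": "*"}
    for key, column in [
        ("region", "Region"),
        ("hospital", "Hospital"),
        ("payor", "Payor Segment"),
        ("state", "State"),
    ]
]
TAG_RULE_CONFIGURATIONS = [["region", "hospital"], ["payor", "state"]]


def row_visible(row, session_tags, tag_rules=TAG_RULES, configurations=TAG_RULE_CONFIGURATIONS):
    """Return True if any group (OR) has all of its tags matching the row (AND)."""
    rules = {rule["TagKey"]: rule for rule in tag_rules}

    def tag_matches(tag_key):
        rule = rules[tag_key]
        value = session_tags.get(tag_key)
        if value is None:
            return False  # assumption: a tag that was not passed never matches
        if value == rule["MatchAllValue"]:
            return True
        allowed = value.split(rule["TagMultiValueDelimiter"])
        return row[rule["ColumnName"]] in allowed

    return any(all(tag_matches(key) for key in group) for group in configurations)


# Tags for a North Hospital user who also monitors Medicaid in New York.
tags = {"hospital": "North Hospital", "region": "Central,East",
        "payor": "Medicaid", "state": "New York"}

rows = [
    {"Hospital": "North Hospital", "Region": "East", "Payor Segment": "Commercial", "State": "Ohio"},
    {"Hospital": "South Hospital", "Region": "West", "Payor Segment": "Medicaid", "State": "New York"},
    {"Hospital": "South Hospital", "Region": "West", "Payor Segment": "Medicare", "State": "Texas"},
]
print([row_visible(row, tags) for row in rows])  # [True, True, False]
```

The first row passes through the region and hospital group, the second through the payor and state group, and the third satisfies neither group.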

Configuration using the QuickSight console

To use the QuickSight console, complete the following steps:

  1. On the QuickSight console, choose Datasets in the navigation pane.
  2. Choose the dataset from the list to apply tag-based RLS tags (for this post, we use the patientinfo dataset).
  3. Choose Edit under Row-level security.
  4. On the Set up row-level security page, expand Tag-based rules.
  5. To begin adding rules, choose columns on the Column drop-down menu under Manage tags.
  6. Create rules as per the permissions table.

To grant access to QuickSight provisioned users, you still need to configure user-based rules.

  7. Repeat these steps to add the required tags.
  8. After all the tags are added, choose Add OR Condition under Manage rules.
  9. Choose your tags for the OR condition and choose Update.

Note that the first condition is created automatically as an AND of all the fields you added, so you need to update it explicitly.

  10. Once the rules are created, choose Apply.

Set SessionTags

At runtime, when embedding the dashboards via the GenerateEmbedUrlForAnonymousUser API, set SessionTags for each user.

SessionTags for NorthAdmin are as follows:

{
    "SessionTags": [
        {
            "Key": "hospital",
            "Value": "North Hospital"
        },
        {
            "Key": "region",
            "Value": "*"
        }
    ]
}

SessionTags for SouthAdmin are as follows:

{
    "SessionTags": [
        {
            "Key": "hospital",
            "Value": "South Hospital"
        },
        {
            "Key": "region",
            "Value": "*"
        }
    ]
}

SessionTags for PowerUser are as follows:

{
    "SessionTags": [
        {
            "Key": "hospital",
            "Value": "*"
        },
        {
            "Key": "region",
            "Value": "*"
        }
    ]
}

SessionTags for NorthMedicaidUser are as follows:

{
    "SessionTags": [
        {
            "Key": "hospital",
            "Value": "North Hospital"
        },
        {
            "Key": "region",
            "Value": "East"
        }, 
        {
            "Key": "payor",
            "Value": "Medicaid"
        },
        {
            "Key": "state",
            "Value": "New York"
        }
    ]
}

SessionTags for SouthMedicareUser are as follows:

{
    "SessionTags": [
        {
            "Key": "hospital",
            "Value": "South Hospital"
        },
        {
            "Key": "region",
            "Value": "South"
        }, 
        {
            "Key": "payor",
            "Value": "Medicare"
        },
        {
            "Key": "state",
            "Value": "*"
        }
    ]
}
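As a rough sketch of how an application backend might pass these tags, the following code builds the request for the boto3 generate_embed_url_for_anonymous_user call. The helper names, the account ID, the dashboard ID, the Region, and the default namespace are placeholder assumptions; adapt them to your own setup and check the current API reference for additional options such as allowed domains.

```python
def build_embed_request(account_id, dashboard_id, region, session_tags, lifetime_minutes=60):
    """Build keyword arguments for generate_embed_url_for_anonymous_user."""
    dashboard_arn = f"arn:aws:quicksight:{region}:{account_id}:dashboard/{dashboard_id}"
    return {
        "AwsAccountId": account_id,
        "Namespace": "default",  # assumption: the default QuickSight namespace
        "SessionLifetimeInMinutes": lifetime_minutes,
        "AuthorizedResourceArns": [dashboard_arn],
        "ExperienceConfiguration": {"Dashboard": {"InitialDashboardId": dashboard_id}},
        "SessionTags": [{"Key": key, "Value": value} for key, value in session_tags.items()],
    }


def generate_embed_url(request, client=None):
    """Call QuickSight and return the embed URL for the anonymous user."""
    if client is None:
        import boto3  # imported lazily so the request builder works offline
        client = boto3.client("quicksight")
    return client.generate_embed_url_for_anonymous_user(**request)["EmbedUrl"]


# Placeholder identifiers for illustration.
request = build_embed_request(
    account_id="111122223333",
    dashboard_id="example-dashboard-id",
    region="us-east-1",
    session_tags={"hospital": "North Hospital", "region": "*"},
)
print(request["SessionTags"])
```

Because the tags are assigned by the backend at URL generation time, the application remains the single source of truth for who can see what.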

The following screenshot shows what NorthMedicaidUser sees pertaining to all North hospitals in the East region and Medicaid in New York state.

The following screenshot shows what SouthMedicareUser sees pertaining to all South hospitals in the South region or Medicare in all states.

With session tags and support for OR of AND conditions, AnyHealth has secured the data on the embedded dashboards so that each user sees only the data they’re allowed to access. You can access the dashboard as one of the users (by changing the user on the drop-down menu on the top right) and see how the data changes based on the user selected.

Overall, with row-level security using OR of AND, AnyHealth is able to provide a compelling analytics experience within their SaaS application, while making sure that each user only sees the appropriate data without having to provision and manage users in QuickSight. QuickSight provides a highly scalable, secure analytics option that you can set up and roll out to production in days, rather than the weeks or months previously required.

Conclusion

The combination of embedding dashboards for users not provisioned in QuickSight and row-level security using tags with OR of AND enables developers and ISVs to quickly set up sophisticated, customized analytics for their application users—all without any infrastructure setup or user management, while scaling to millions of users. For more updates from QuickSight embedded analytics, see What’s New in the Amazon QuickSight User Guide.

If you have any questions or feedback, please leave a comment. For additional discussions and help getting answers to your questions, check out the QuickSight Community.


About the Authors

Srikanth Baheti is a Specialized World Wide Principal Solution Architect for Amazon QuickSight. He started his career as a consultant and worked for multiple private and government organizations. Later he worked for PerkinElmer Health and Sciences & eResearch Technology Inc, where he was responsible for designing and developing high traffic web applications, highly scalable and maintainable data pipelines for reporting platforms using AWS services and Serverless computing.

Raji Sivasubramaniam is a Sr. Solutions Architect at AWS, focusing on Analytics. Raji is specialized in architecting end-to-end Enterprise Data Management, Business Intelligence and Analytics solutions for Fortune 500 and Fortune 100 companies across the globe. She has in-depth experience in integrated healthcare data and analytics with wide variety of healthcare datasets including managed market, physician targeting and patient analytics.

Mayank Agarwal is a product manager for Amazon QuickSight, AWS’ cloud-native, fully managed BI service. He focuses on embedded analytics and developer experience. He started his career as an embedded software engineer developing handheld devices. Prior to QuickSight he was leading engineering teams at Credence ID, developing custom mobile embedded device and web solutions using AWS services that make biometric enrollment and identification fast, intuitive, and cost-effective for Government sector, healthcare and transaction security applications.

Real-time inference using deep learning within Amazon Kinesis Data Analytics for Apache Flink

Post Syndicated from Jeremy Ber original https://aws.amazon.com/blogs/big-data/real-time-inference-using-deep-learning-within-amazon-kinesis-data-analytics-for-apache-flink/

Apache Flink is a framework and distributed processing engine for stateful computations over data streams. Amazon Kinesis Data Analytics for Apache Flink is a fully managed service that enables you to use an Apache Flink application to process streaming data. The Deep Java Library (DJL) is an open-source, high-level, engine-agnostic Java framework for deep learning.

In this blog post, we demonstrate how you can use DJL within Kinesis Data Analytics for Apache Flink for real-time machine learning inference. Real-time inference can be valuable in a variety of applications and industries where it is essential to make predictions or take actions based on new data as quickly as possible. We show how to load a pre-trained deep learning model from the DJL model zoo into a Flink job and apply the model to classify data objects in a continuous data stream. The DJL model zoo includes a wide variety of pre-trained models for image classification, semantic segmentation, speech recognition, text embedding generation, question answering, and more. It supports Hugging Face, PyTorch, MXNet, and TensorFlow model frameworks and also helps developers create and publish their own models. We will focus on image classification and use a popular classifier called ResNet-18 to produce predictions in real time. The model has been pre-trained on ImageNet with 1.2 million images belonging to 1,000 class labels.

We provide sample code, architecture diagrams, and an AWS CloudFormation template so you can follow along and employ ResNet-18 as your classifier to make real-time predictions. The solution we provide here is a powerful design pattern for continuously producing ML-based predictions on streaming data within Kinesis Data Analytics for Apache Flink. You can adapt the provided solution for your use case and choose an alternative model from the model zoo or even provide your own model.

Image classification is a classic problem that takes an image and selects the best-fitting class, such as whether the image from an autonomous driving system is that of a bicycle or a pedestrian. Common use cases for real-time inference on streams of images include classifying images from vehicle cameras and license plate recognition systems, and classifying images uploaded to social media and ecommerce websites. The use cases typically need low latency while handling high throughput and potentially bursty streams. For example, in ecommerce websites, real-time classification of uploaded images can help in marking pictures of banned goods or hazardous materials that have been supplied by sellers. Immediate determination through streaming inference is needed to trigger alerts and follow-up actions to prevent these images from being part of the catalog. This enables faster decision-making compared to batch jobs that run on a periodic basis. The data stream pipeline can involve multiple models for different purposes, such as classifying uploaded images into ecommerce categories of electronics, toys, fashion, and so on.

Solution overview

The following diagram illustrates the workflow of our solution.

architecture showcasing a kinesis data analytics for apache flink application reading from Images in an Amazon S3 bucket, classifying those images and then writing out to another S3 bucket called "classifications"

The application performs the following steps:

  1. Read in images from Amazon Simple Storage Service (Amazon S3) using the Apache Flink Filesystem File Source connector.
  2. Window the images into a collection of records.
  3. Classify the batches of images using the DJL image classification class.
  4. Write inference results to Amazon S3 at the path specified.

We recommend keeping images to a reasonable size so that they fit within a Kinesis Processing Unit (KPU). Images larger than 50 MB may increase processing and classification latency.

The main class for this Apache Flink job is located at src/main/java/com.amazon.embeddedmodelinference/EMI.java. Here you can find the main() method and entry point to our Flink job.

Prerequisites

To get started, configure the following prerequisites on your local machine:

Once this is set up, you can clone the code base to access the source code for this solution. The Java application code for this example is available on GitHub. To download the application code, clone the remote repository using the following command:

git clone https://github.com/aws-samples/amazon-kinesis-data-analytics-examples

Find and navigate to the folder of the image classification example, called image-classification.

An example set of images to stream and test the code is available in the imagenet-sample-images folder.

Let’s walk through the code step by step.

Test on your local machine

To test this application on your local machine, make sure you have AWS credentials set up locally. Additionally, download the Flink S3 Filesystem Hadoop JAR to use with your Apache Flink installation and place it in a folder named plugins/s3 in the root of your project. Then configure the following environment variables either in your IDE or in your machine’s local variable scope:

IS_LOCAL=true;
plugins.dir=<<path-to-flink-s3-fs-hadoop jar>>
s3.access.key=<<aws access key>>
s3.secret.key=<<aws secret key>>

Replace these values with your own.

Screenshot showing the environment properties to replace in IntelliJ

After configuring the environment variables and downloading the necessary plugin JAR, let’s look at the code.

In the main method, after setting up our StreamExecutionEnvironment, we define our FileSource to read files from Amazon S3. By default, this source operator reads from a sample bucket. You can replace this bucket name with your own by changing the variable called bucket, or setting the application property on Kinesis Data Analytics for Apache Flink once deployed.

final FileSource<StreamedImage> source =
        FileSource.forRecordStreamFormat(new ImageReaderFormat(), new Path(s3SourcePath))
                .monitorContinuously(Duration.ofSeconds(10))
                .build();

The FileSource is configured to read in files in the ImageReaderFormat, and will check Amazon S3 for new images every 10 seconds. This can be configured as well.
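
To illustrate what continuous monitoring means in practice, here is a minimal Python sketch of the polling idea: on each poll, list the source path and keep only the paths that have not been seen before. This is not how the Flink FileSource is implemented; the function name discover_new_files and the sample paths are hypothetical and exist only for illustration.

```python
from typing import Callable, Iterable, List, Set


def discover_new_files(list_files: Callable[[], Iterable[str]], seen: Set[str]) -> List[str]:
    """Return paths that were not seen on earlier polls and remember them."""
    new_paths = sorted(p for p in list_files() if p not in seen)
    seen.update(new_paths)
    return new_paths


# Hypothetical listings returned by two consecutive polls of the source path.
listings = iter([
    ["s3://bucket/a.jpg"],
    ["s3://bucket/a.jpg", "s3://bucket/b.jpg"],
])
seen: Set[str] = set()
first_poll = discover_new_files(lambda: next(listings), seen)
second_poll = discover_new_files(lambda: next(listings), seen)
print(first_poll, second_poll)
```

A shorter polling interval finds new images sooner at the cost of more listing requests against the bucket.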

After we have read in our images, we convert our FileSource into a stream that can be processed:

DataStream<StreamedImage> stream =
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");

Next, we create a tumbling window of a variable time window duration, specified in the configuration, defaulting to 60 seconds. Every window close creates a batch (list) of images to be classified using a ProcessWindowFunction.

This ProcessWindowFunction calls the classifier predict function on the list of images and returns the best probability of classification from each image. This result is then sent back to the Flink operator, where it’s promptly written out to the S3 bucket path of your configuration.

.process(new ProcessWindowFunction<StreamedImage, String, String, TimeWindow>() {
    @Override
    public void process(String s,
                        ProcessWindowFunction<StreamedImage, String, String, TimeWindow>.Context context,
                        Iterable<StreamedImage> iterableImages,
                        Collector<String> out) throws Exception {

        // Collect the images in this window into a single batch
        List<Image> listOfImages = new ArrayList<>();
        iterableImages.forEach(x -> listOfImages.add(x.getImage()));

        try {
            // Batch classify the images
            List<Classifications> list = classifier.predict(listOfImages);
            for (Classifications classifications : list) {
                // Emit the most probable class for each image
                Classifications.Classification cl = classifications.best();
                String ret = cl.getClassName() + ": " + cl.getProbability();
                out.collect(ret);
            }
        } catch (ModelException | IOException | TranslateException e) {
            logger.error("Failed predict", e);
        }
    }
});
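
The windowing step above can be pictured with a small, framework-free Python sketch. It groups timestamped items into fixed, non-overlapping windows, which is the general idea behind a tumbling window. The function tumbling_windows and the sample events are hypothetical; the Flink job itself assigns windows inside the runtime rather than with code like this.

```python
from collections import defaultdict
from typing import Dict, Iterable, List, Tuple


def tumbling_windows(events: Iterable[Tuple[float, str]],
                     size_seconds: float = 60.0) -> Dict[int, List[str]]:
    """Group (timestamp, item) pairs into fixed, non-overlapping windows."""
    windows: Dict[int, List[str]] = defaultdict(list)
    for timestamp, item in events:
        # Every timestamp belongs to exactly one window index.
        window_index = int(timestamp // size_seconds)
        windows[window_index].append(item)
    return dict(windows)


# Hypothetical arrival times (in seconds) and image names.
events = [(5.0, "cat.jpg"), (42.0, "dog.jpg"), (61.0, "bird.jpg")]
batches = tumbling_windows(events)
print(batches)
```

Each value in the resulting dictionary corresponds to one batch that would be passed to the classifier when its window closes.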

In Classifier.java, we read the image, apply crop, transpose, and reshape operations, and finally convert it to an N-dimensional array that can be processed by the deep learning model. Then we feed the array to the model and apply a forward pass. During the forward pass, the model computes the neural network layer by layer. Finally, the output object contains the probabilities for each object class that the model was trained on. We map the probabilities to the object names and return them to the map function.
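
As a rough illustration of the last step, the following Python sketch turns raw model scores into probabilities and picks the most likely class name. It assumes a softmax output layer, which is common for image classifiers but is an assumption here; the function names, scores, and class names are hypothetical and are not taken from Classifier.java.

```python
import math
from typing import List, Sequence, Tuple


def softmax(logits: Sequence[float]) -> List[float]:
    """Convert raw scores to probabilities that sum to 1."""
    peak = max(logits)
    exps = [math.exp(x - peak) for x in logits]  # subtract the max for numerical stability
    total = sum(exps)
    return [e / total for e in exps]


def best_class(logits: Sequence[float], class_names: Sequence[str]) -> Tuple[str, float]:
    """Return the class name with the highest probability."""
    probabilities = softmax(logits)
    index = max(range(len(probabilities)), key=probabilities.__getitem__)
    return class_names[index], probabilities[index]


# Hypothetical scores for three classes.
label, probability = best_class([2.0, 0.5, 0.1], ["tabby cat", "golden retriever", "goldfish"])
print(f"{label}: {probability:.3f}")
```

This mirrors the "class name: probability" string that the window function emits for each image.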

Deploy the solution with AWS CloudFormation

To run this code base on Kinesis Data Analytics for Apache Flink, we provide a CloudFormation template that spins up the necessary resources. Open AWS CloudShell or your local machine’s terminal and complete the following steps to deploy the solution:

  1. If you don’t have the AWS Cloud Development Kit (AWS CDK) bootstrapped in your account, run the following command, providing your account number and current Region:
cdk bootstrap aws://ACCOUNT-NUMBER/REGION

The script will clone a GitHub repo of images to classify and upload them to your source S3 bucket. Then it will launch the CloudFormation stack given your input parameters.

  2. Enter the following code and replace the BUCKET variables with your own source bucket and sink bucket, which will contain the source images and the classifications, respectively:
export SOURCE_BUCKET=s3://SAMPLE-BUCKET/PATH;
export SINK_BUCKET=s3://SAMPLE-BUCKET/PATH;
git clone https://github.com/EliSchwartz/imagenet-sample-images; cd imagenet-sample-images;
aws s3 cp . $SOURCE_BUCKET --recursive --exclude "*/";
aws cloudformation create-stack --stack-name KDAImageClassification --template-url https://aws-blogs-artifacts-public.s3.amazonaws.com/artifacts/BDB-3098/BlogStack.template.json --parameters ParameterKey=inputBucketPath,ParameterValue=$SOURCE_BUCKET ParameterKey=outputBucketPath,ParameterValue=$SINK_BUCKET --capabilities CAPABILITY_IAM;

This CloudFormation stack creates the following resources:

    • A Kinesis Data Analytics application with 1 Kinesis Processing Unit (KPU) preconfigured with some application properties
    • An S3 bucket for your output results
  3. When the stack is complete, navigate to the Kinesis Data Analytics for Apache Flink console.
  4. Find the application called blog-DJL-flink-ImageClassification-application and choose Run.
  5. On the Amazon S3 console, navigate to the bucket you specified in the outputBucketPath variable.

If you have readable images in the source bucket listed, you should see classifications of those images within the checkpoint interval of the running application.

Deploy the solution manually

If you prefer to use your own code base, you can follow the manual steps in this section:

  • After you clone the application locally, create your application JAR by navigating to the directory that contains your pom.xml and running the following command:
mvn clean package

This builds your application JAR, called embedded-model-inference-1.0-SNAPSHOT.jar, in the target/ directory.

application properties on KDA console

  1. Upload this application JAR to an S3 bucket, either the one created from the CloudFormation template, or another one to store code artifacts.
  2. You can then configure your Kinesis Data Analytics application to point to this newly uploaded S3 JAR file.
  3. This is also a great opportunity to configure your runtime properties, as shown in the following screenshot.
  4. Choose Run to start your application.

You can open the Apache Flink Dashboard to check for application exceptions or to see data flowing through the tasks defined in the code.

image of flink dashboard showing successful running of the application

Validate the results

To validate our results, let’s check the results in Amazon S3 by navigating to the Amazon S3 console and finding our S3 bucket. We can find the output in a folder called output-kda.

image showing folders within amazon s3 partitioned by datetime

When we choose one of the date-partitioned folders, we can see partition files. Make sure that the part file name has no leading underscore; an underscore indicates that the results are still being finalized according to the rollover interval defined in Apache Flink’s FileSink connector. After the underscores have disappeared, we can use Amazon S3 Select to view our data.
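
If you list the output folder programmatically, the underscore check described above can be sketched in a few lines of Python. The helper finalized_part_files and the sample object keys are hypothetical; the exact file names that your application writes may differ.

```python
import posixpath
from typing import Iterable, List


def finalized_part_files(keys: Iterable[str]) -> List[str]:
    """Keep only object keys whose file name has no leading underscore."""
    return [key for key in keys if not posixpath.basename(key).startswith("_")]


# Hypothetical object keys under the output folder.
keys = [
    "output-kda/2023-06-01--12/_part-0-0",
    "output-kda/2023-06-01--12/part-0-1",
]
ready = finalized_part_files(keys)
print(ready)
```

Only the keys returned by the helper would be safe to query, because the others are still being written.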

partition files as they land in Amazon S3

We now have a solution that continuously performs classification on incoming images in real time using Kinesis Data Analytics for Apache Flink. It extracts a pre-trained classification model (ResNet-18) from the DJL model zoo, applies some preprocessing, loads the model into a Flink operator’s memory, and continuously applies the model for online predictions on streaming images.

Although we used ResNet-18 in this post, you can choose another model by modifying the classifier. The DJL model zoo provides many other models, both for classification and other applications, that can be used out of the box. You can also load your custom model by providing an S3 link or URL to the criteria. DJL supports models in a large number of engines such as PyTorch, ONNX, TensorFlow, and MXNet. Using a model in the solution is relatively simple. All of the preprocessing and postprocessing code is encapsulated in the (built-in) translator, so all we have to do is load the model, create a predictor, and call predict(). This is done within the data source operator, which processes the stream of input data and sends the links to the data to the inference operator where the model you selected produces the prediction. Then the sink operator writes the results.

The CloudFormation template in this example focused on a simple 1 KPU application. You could extend the solution to further scale out to large models and high-throughput streams, and support multiple models within the pipeline.

Clean up

To clean up the CloudFormation stack you launched, complete the following steps:

  1. Empty the source bucket you specified in the bash script.
  2. On the AWS CloudFormation console, locate the CloudFormation stack called KDAImageClassification.
  3. Delete the stack, which will remove all of the remaining resources created for this post.
  4. Optionally, delete the bootstrapping CloudFormation stack, CDKToolkit, which you launched earlier.

Conclusion

In this post, we presented a solution for real-time classification using the Deep Java Library within Kinesis Data Analytics for Apache Flink. We shared a working example and discussed how you can adapt the solution for your specific ML use case. If you have any feedback or questions, please leave them in the comments section.


About the Authors

Jeremy Ber has been working in the telemetry data space for the past 9 years as a Software Engineer, Machine Learning Engineer, and most recently a Data Engineer. At AWS, he is a Streaming Specialist Solutions Architect, supporting both Amazon Managed Streaming for Apache Kafka (Amazon MSK) and AWS’s managed offering for Apache Flink.

Deepthi Mohan is a Principal Product Manager for Amazon Kinesis Data Analytics, AWS’s managed offering for Apache Flink.

Gaurav Rele is a Data Scientist at the Amazon ML Solution Lab, where he works with AWS customers across different verticals to accelerate their use of machine learning and AWS Cloud services to solve their business challenges.

Managing data confidentiality for Scope 3 emissions using AWS Clean Rooms

Post Syndicated from Sundeep Ramachandran original https://aws.amazon.com/blogs/architecture/managing-data-confidentiality-for-scope-3-emissions-using-aws-clean-rooms/

Scope 3 emissions are indirect greenhouse gas emissions that are a result of a company’s activities, but occur outside the company’s direct control or ownership. Measuring these emissions requires collecting data from a wide range of external sources, like raw material suppliers, transportation providers, and other third parties. One of the main challenges with Scope 3 data collection is ensuring data confidentiality when sharing proprietary information between third-party suppliers. Organizations are hesitant to share information that could potentially be used by competitors. This can make it difficult for companies to accurately measure and report on their Scope 3 emissions, which in turn limits their ability to manage climate-related impacts and risks.

In this blog post, we show how to use AWS Clean Rooms to share Scope 3 emissions data between a reporting company and two of their value chain partners (a raw material purchased goods supplier and a transportation provider). Data confidentiality requirements are specified by each organization before participating in the AWS Clean Rooms collaboration (see Figure 1).

Figure 1. Data confidentiality requirements of reporting company and value chain partners

Each account has confidential data described as follows:

  • Column 1 lists the raw material Region of origin. This is business confidential information for the supplier.
  • Column 2 lists the emission factors at the raw material level. This is sensitive information for the supplier.
  • Column 3 lists the mode of transportation. This is business confidential information for the transportation provider.
  • Column 4 lists the emissions in transporting individual items. This is sensitive information for the transportation provider.
  • Rows in column 5 list the product recipe at the ingredient level. This is trade secret information for the reporting company.

Overview of solution

In this architecture, AWS Clean Rooms is used to analyze and collaborate on emission datasets without sharing, moving, or revealing underlying data to collaborators (shown in Figure 2).

Figure 2. Architecture for AWS Clean Rooms Scope 3 collaboration

Three AWS accounts are used to demonstrate this approach. The Reporting Account creates a collaboration in AWS Clean Rooms and invites the Purchased Goods Account and Transportation Account to join as members. All accounts can protect their underlying data with privacy-enhancing controls and contribute data directly from Amazon Simple Storage Service (Amazon S3) using AWS Glue tables.

The Purchased Goods Account includes users who can update the purchased goods bucket. Similarly, the Transportation Account has users who can update the transportation bucket. The Reporting Account can run SQL queries on the configured tables. AWS Clean Rooms only returns results complying with the analysis rules set by all participating accounts.

Prerequisites

For this walkthrough, you should have the following prerequisites:

Although Amazon S3 and AWS Clean Rooms are free-tier eligible, a low fee applies to AWS Glue. Clean-up actions are provided later in this blog post to minimize costs.

Configuration

We configured the S3 buckets for each AWS account as follows:

  • Reporting Account: reportingcompany.csv
  • Purchased Goods Account: purchasedgood.csv
  • Transportation Account: transportation.csv

Create an AWS Glue Data Catalog for each S3 data source following the method in the Glue Data Catalog Developer Guide. The AWS Glue tables should match the schema detailed previously in Figure 1, for each respective account (see Figure 3).

Figure 3. Configured AWS Glue table for ‘Purchased Goods’

Data consumers can be configured to ingest, analyze, and visualize queries (refer back to Figure 2). We will tag the Reporting Account Glue Database as “reporting-db” and the Glue Table as “reporting.” Likewise, the Purchased Goods Account will have “purchase-db” and “purchase” tags.

Security

Additional actions are recommended to secure each account in a production environment. To configure encryption, AWS Identity and Access Management (IAM) roles, and Amazon CloudWatch, review the Further Reading section at the end of this post.

Walkthrough

This walkthrough consists of four steps:

  1. The Reporting Account creates the AWS Clean Rooms collaboration and invites the Purchased Goods Account and Transportation Account to share data.
  2. The Purchased Goods Account and Transportation Account accept this invitation.
  3. Rules are applied for each collaboration account restricting how data is shared between AWS Clean Rooms collaboration accounts.
  4. The SQL query is created and run in the Reporting Account.

1. Create the AWS Clean Rooms collaboration in the Reporting Account

(The steps covered in this section require you to be logged into the Reporting Account.)

  • Navigate to the AWS Clean Rooms console and click Create collaboration.
  • In the Details section, type “Scope 3 Clean Room Collaboration” in the Name field.
  • Scroll to the Member 1 section. Enter “Reporting Account” in the Member display name field.
  • In Member 2 section, enter “Purchased Goods Account” for your first collaboration member name, with their account number in the Member AWS account ID box.
  • Click Add another member and add “Transportation Account” as the third collaborator with their AWS account number.
  • Choose the “Reporting Account” as the Member who can query and receive results in the Member abilities section. Click Next.
  • Select Yes, join by creating membership now. Click Next.
  • Verify the collaboration settings on the Review and Create page, then select Create and join collaboration and create membership.

Both invited accounts will then receive an invitation to accept the collaboration (see Figure 4). The console shows each member status as “Invited” until the invitation is accepted. Next, we will show how the invited members apply query restrictions on their data.

Figure 4. New collaboration created in AWS Clean Rooms

2. Accept invitations and configure table collaboration rules

Steps in this section are applied to the Purchased Goods Account and Transportation Account following collaboration environment setup. For brevity, we will demonstrate steps using the Purchased Goods Account. Differences for the Transportation Account are noted.

  • Log in to the Purchased Goods Account to accept the collaboration invitation.
  • Open the AWS Clean Rooms console and select Collaborations on the left-hand navigation pane, then click Available to join.
  • You will see an invitation from the Scope 3 Clean Room Collaboration. Click on Scope 3 Clean Room Collaboration and then Create membership.
  • Select Tables, then Associate table. Click Configure new table.

The next action is to associate the Glue table created from the purchasedgoods.csv file. This sequence restricts access to the origin_region column (transportation_mode for the Transportation Account table) in the collaboration.

  • In the Scope 3 Clean Room Collaboration, select Configured tables in the left-hand pane, then Configure new table. Select the AWS Glue table associated with purchasedgoods.csv (shown in Figure 5).
  • Select the AWS Glue Database (purchase-db) and AWS Glue Table (purchase).
  • Verify that the correct table is selected by toggling View schema from the AWS Glue slider bar.
  • In the Columns allowed in collaboration section, select all fields except for origin_region. This action prevents the origin_region column from being accessed and viewed in the collaboration.
  • Complete this step by selecting Configure new table.

Figure 5. Purchased Goods account table configuration

  • Select Configure analysis rule (see Figure 6).
  • Select Aggregation type then Next.
  • Select SUM as the Aggregate function and s3_upstream_purchased_good for the column.
  • Under Join controls, select Specify Join column. Select “item” from the list of options. This permits SQL join queries to execute on the “item” column. Click Next.

Figure 6. Table rules for the Purchased Goods account

  • The next page specifies the minimum number of unique rows to aggregate for the “join” command. Select “item” for Column name and “2” for the Minimum number of distinct values. Click Next.
  • To confirm the table configuration query rules, click Configure analysis rule.
  • The final step is to click Associate to collaboration and select Scope 3 Clean Room Collaboration in the pulldown menu. Select Associate table after page refresh.

The procedure in this section is repeated for the Transportation Account, with the following exceptions:

  1. The columns shared in this collaboration are item, s3_upstream_transportation, and unit.
  2. The Aggregation function is a SUM applied on the s3_upstream_transportation column.
  3. The item column has an Aggregation constraint minimum of two distinct values.

3. Configure table collaboration rules inside the Reporting Account

At this stage, member account tables are created and shared in the collaboration. The next step is to configure the Reporting Account tables in the Reporting Account’s AWS account.

  • Navigate to AWS Clean Rooms. Select Configured tables, then Configure new table.
  • Select the Glue database and table associated with the file reportingcompany.csv.
  • Under Columns allowed in collaboration, select All columns, then Configure new table.
  • Configure collaboration rules by clicking Configure analysis rule using the Guided workflow.
  • Select Aggregation type, then Next.
  • Select SUM as the Aggregate function and ingredient for the column (see Figure 7).
  • In the Specify join columns section, select ingredient. This permits SQL join queries to execute only on the ingredient column.
  • In the Dimension controls, select product. This option permits grouping by product name in the SQL query. Select Next.
  • Select None in the Scalar functions section. Click Next. Read more about scalar functions in the AWS Clean Rooms User Guide.

Figure 7. Table rules for the Reporting account

  • On the next page, select ingredient for Column name and 2 for the Minimum number of distinct values. Click Next. To confirm query control submission, select Configure analysis rule on the next page.
  • Validate the setting in the Review and Configure window, then select Next.
  • Inside the Configured tables tab, select Associate to collaboration. Assign the table to the Scope 3 Clean Room Collaboration.
  • Select the Scope 3 Clean Room Collaboration in the dropdown menu. Select Choose collaboration.
  • On the Scope 3 Clean Room Collaboration page, select reporting, then Associate table.

4. Create and run the SQL query

Queries can now be run inside the Reporting Account (shown in Figure 8).

Figure 8. Query results in the Clean Rooms Reporting Account

  • Select an S3 destination to output the query results. Select Action, then Set results settings.
  • Enter the S3 bucket name, then click Save changes.
  • Paste this SQL snippet inside the query text editor (see Figure 8):

SELECT
  r.product AS "Product",
  SUM(p.s3_upstream_purchased_good) AS "Scope_3_Purchased_Goods_Emissions",
  SUM(t.s3_upstream_transportation) AS "Scope_3_Transportation_Emissions"
FROM
  reporting r
  INNER JOIN purchase p ON r.ingredient = p.item
  INNER JOIN transportation t ON p.item = t.item
GROUP BY
  r.product

  • Click Run query. The query results should appear after a few minutes on the initial query, but will take less time for subsequent queries.
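
To build intuition for what the query and the analysis rules do together, the following Python sketch imitates the logic on a few made-up rows: it inner-joins the three tables on the ingredient and item columns, sums the two emission columns per product, and drops any product whose group contains fewer than two distinct items. All data values are hypothetical, each item is assumed to appear once per partner table, and the sketch is only a simplified model of how AWS Clean Rooms enforces aggregation constraints.

```python
from collections import defaultdict

# Hypothetical rows standing in for the three configured tables.
reporting = [
    {"product": "cake", "ingredient": "flour"},
    {"product": "cake", "ingredient": "sugar"},
    {"product": "bread", "ingredient": "flour"},
]
purchase = {"flour": 1.5, "sugar": 2.0}         # item -> s3_upstream_purchased_good
transportation = {"flour": 0.5, "sugar": 0.25}  # item -> s3_upstream_transportation

MIN_DISTINCT_ITEMS = 2  # minimum number of distinct values set in the analysis rules


def scope3_by_product(min_distinct: int = MIN_DISTINCT_ITEMS) -> dict:
    groups = defaultdict(lambda: {"items": set(), "purchased": 0.0, "transport": 0.0})
    for row in reporting:
        item = row["ingredient"]
        # Inner joins: keep the row only if both partner tables know the item.
        if item in purchase and item in transportation:
            group = groups[row["product"]]
            group["items"].add(item)
            group["purchased"] += purchase[item]
            group["transport"] += transportation[item]
    # Suppress groups that aggregate fewer distinct items than allowed.
    return {
        product: (group["purchased"], group["transport"])
        for product, group in groups.items()
        if len(group["items"]) >= min_distinct
    }


results = scope3_by_product()
print(results)
```

In this toy data, the bread row is suppressed because it aggregates a single ingredient, which would otherwise reveal one supplier's per-item emission value.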

Conclusion

This example shows how AWS Clean Rooms can aggregate data across collaborators to produce total Scope 3 emissions for each product from purchased goods and transportation. This query was performed between three organizations without revealing underlying emission factors or proprietary product recipes to one another. This alleviates data confidentiality concerns and improves sustainability reporting transparency.

Clean Up

The following steps are taken to clean up all resources created in this walkthrough:

  • Member and Collaboration Accounts:
    1. AWS Clean Rooms: Disassociate and delete collaboration tables
    2. AWS Clean Rooms: Remove member account in the collaboration
    3. AWS Glue: Delete the crawler, database, and tables
    4. AWS IAM: Delete the AWS Clean Rooms service policy
    5. Amazon S3: Delete the CSV file storage buckets
  • Collaboration Account only:
    1. Amazon S3: delete the SQL query bucket
    2. AWS Clean Rooms: delete the Scope 3 Clean Room Collaboration

Further Reading:

Security Practices

Get custom data into Amazon Security Lake through ingesting Azure activity logs

Post Syndicated from Adam Plotzker original https://aws.amazon.com/blogs/security/get-custom-data-into-amazon-security-lake-through-ingesting-azure-activity-logs/

Amazon Security Lake automatically centralizes security data from both cloud and on-premises sources into a purpose-built data lake stored on a particular AWS delegated administrator account for Amazon Security Lake.

In this blog post, I will show you how to configure your Amazon Security Lake solution with cloud activity data from Microsoft Azure Monitor activity log, which you can query alongside your existing AWS CloudTrail data. I will walk you through the required steps — from configuring the required AWS Identity and Access Management (IAM) permissions, AWS Glue jobs, and Amazon Kinesis Data Streams required on the AWS side to forwarding that data from within Azure.

When you turn on Amazon Security Lake, it begins to collect actionable security data from various AWS sources. However, many enterprises today have complex environments that include a mix of different cloud resources in addition to on-premises data centers.

Although the AWS data sources in Amazon Security Lake encompass a large amount of the security data needed for analysis, you may miss the full picture if your infrastructure operates across multiple cloud vendors (for example, AWS, Azure, and Google Cloud Platform) and on-premises at the same time. By querying data from across your entire infrastructure, you can increase the number of indicators of compromise (IOC) that you identify, and thus increase the likelihood that those indicators will lead to actionable outputs.

Solution architecture

Figure 1 shows how to configure data to travel from an Azure event hub to Amazon Security Lake.

Figure 1: Solution architecture

As shown in Figure 1, the solution involves the following steps:

  1. An AWS user instantiates the required AWS services and features that enable the process to function, including AWS Identity and Access Management (IAM) permissions, Kinesis data streams, AWS Glue jobs, and Amazon Simple Storage Service (Amazon S3) buckets, either manually or through an AWS CloudFormation template, such as the one we will use in this post.
  2. In response to the custom source created from the CloudFormation template, a Security Lake table is generated in AWS Glue.
  3. From this point on, Azure activity logs in their native format are stored within an Azure cloud event hub within an Azure account. An Azure function is deployed to respond to new events within the Azure event hub and forward these logs over the internet to the Kinesis data stream that was created in the preceding step.
  4. The Kinesis data stream forwards the data to an AWS Glue streaming job that is fronted by the Kinesis data stream.
  5. The AWS Glue job then performs the extract, transform, and load (ETL) mapping to the appropriate Open Cybersecurity Schema Framework (OCSF) schema (specified for API Activity events at OCSF API Activity Mappings).
  6. The Azure events are partitioned according to the partitioning requirements of Amazon Security Lake tables and stored in Amazon S3.
  7. The user can query these tables by using Amazon Athena alongside the rest of their data inside Amazon Security Lake.
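
The partitioning in step 6 can be sketched as follows. The prefix layout used here (ext/, then the source name, Region, account ID, and event day) reflects my understanding of how Amazon Security Lake lays out custom source data and should be treated as an assumption to verify against the current documentation; the function partition_prefix and the sample values are hypothetical.

```python
from datetime import datetime, timezone


def partition_prefix(source: str, region: str, account_id: str, event_time_ms: int) -> str:
    """Build an S3 key prefix from an event timestamp given in epoch milliseconds."""
    # Assumed layout: partitions by Region, account ID, and UTC event day.
    day = datetime.fromtimestamp(event_time_ms / 1000, tz=timezone.utc).strftime("%Y%m%d")
    return f"ext/{source}/region={region}/accountId={account_id}/eventDay={day}/"


# Hypothetical event that occurred on 2023-06-01 at 00:00:00 UTC.
prefix = partition_prefix("AZURE_ACTIVITY", "us-east-2", "111122223333", 1_685_577_600_000)
print(prefix)
```

Deriving the day in UTC keeps events from different time zones in consistent partitions.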

Prerequisites

Before you implement the solution, complete the following prerequisites:

  • Verify that you have enabled Amazon Security Lake in the AWS Regions that correspond to the Azure Activity logs that you will forward. For more information, see What is Amazon Security Lake?
  • Preconfigure the custom source logging for the source AZURE_ACTIVITY in your Region. To configure this custom source in Amazon Security Lake, open the Amazon Security Lake console, navigate to Create custom data source, and do the following, as shown in Figure 2:
    • For Data source name, enter AZURE_ACTIVITY.
    • For Event class, select API_ACTIVITY.
    • For Account Id, enter the ID of the account which is authorized to write data to your data lake.
    • For External Id, enter “AZURE_ACTIVITY-<YYYYMMDD>”.
    Figure 2: Configure custom data source

For more information on how to configure custom sources for Amazon Security Lake, see Collecting data from custom sources.

Step 1: Configure AWS services for Azure activity logging

The first step is to configure the AWS services for Azure activity logging.

  1. To configure Azure activity logging in Amazon Security Lake, first prepare the assets required in the target AWS account. You can automate this process by using the provided CloudFormation template — Security Lake CloudFormation — which will do the heavy lifting for this portion of the setup.

    Note: I have predefined these scripts to create the AWS assets required to ingest Azure activity logs, but you can generalize this process for other external log sources, as well.

    The CloudFormation template has the following components:

    • securitylakeGlueStreamingRole — includes the following managed policies:
      • AWSLambdaKinesisExecutionRole
      • AWSGlueServiceRole
    • securitylakeGlueStreamingPolicy — includes the following attributes:
      • “s3:GetObject”
      • “s3:PutObject”
    • securitylakeAzureActivityStream — This Kinesis data stream is the endpoint that acts as the connection point between Azure and AWS and the frontend of the AWS Glue stream that feeds Azure activity logs to Amazon Security Lake.
    • securitylakeAzureActivityJob — This is an AWS Glue streaming job that is used to take in feeds from the Kinesis data stream and map the Azure activity logs within that stream to OCSF.
    • securitylake-glue-assets S3 bucket — This is the S3 bucket that is used to store the ETL scripts used in the AWS Glue job to map Azure activity logs.

    Running the CloudFormation template will instantiate the aforementioned assets in your AWS delegated administrator account for Amazon Security Lake.

  2. The CloudFormation template creates a new S3 bucket with the following syntax: securityLake-glue-assets-<ACCOUNT-ID><REGION>. After the CloudFormation run is complete, navigate to this bucket within the S3 console.
  3. Within the S3 bucket, create a scripts folder and a temporary folder, as shown in Figure 4.
    Figure 4: Glue assets bucket

  4. Update the Azure AWS Glue PySpark script by replacing the following values in the file. You will attach this script to your AWS Glue job and use it to generate the AWS assets required for the implementation.
    • Replace <AWS_REGION_NAME> with the Region that you are operating in — for example, us-east-2.
    • Replace <AWS_ACCOUNT_ID> with the account ID of your delegated administrator account for Amazon Security Lake — for example, 111122223333.
    • Replace <SECURITYLAKE-AZURE-STREAM-ARN> with the ARN of the Kinesis data stream created through the CloudFormation template. To find the ARN, open the Kinesis console, navigate to the Kinesis stream with the name securityLakeAzureActivityStream<STREAM-UID>, and copy the Amazon Resource Name (ARN), as shown in the following figure.

      Figure 5: Kinesis stream ARN

    • Replace <SECURITYLAKE-BUCKET-NAME> with the root name of your data lake S3 bucket, for example, s3://aws-security-data-lake-DOC-EXAMPLE-BUCKET.

    After you replace these values, navigate within the scripts folder and upload the AWS Glue PySpark Python script named azure-activity-pyspark.py, as shown in Figure 6.

    Figure 6: AWS Glue script

  5. Within your AWS Glue job, choose Job details and configure the job as follows:
    • For Type, select Spark Streaming.
    • For Language, select Python 3.
    • For Script path, select the S3 path of the scripts folder that you created earlier.
    • For Temporary path, select the S3 path of the temporary folder that you created earlier.
  6. Choose Save to save the changes, and then choose Run to start the AWS Glue job.
  7. Choose the Runs tab, and make sure that the Run status of the job is Running.
    Figure 7: AWS Glue job status

At this point, you have finished the configuration on the AWS side.
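The placeholder replacement from step 4 can also be scripted rather than done by hand. The following is a minimal sketch, assuming the script is handled as plain text and that the placeholders appear exactly as written in step 4. The function name fill_placeholders, the one-line template, and the stream ARN value are hypothetical stand-ins and are not taken from the actual azure-activity-pyspark.py file.

```python
# Placeholders named in step 4 of this post.
PLACEHOLDERS = (
    "<AWS_REGION_NAME>",
    "<AWS_ACCOUNT_ID>",
    "<SECURITYLAKE-AZURE-STREAM-ARN>",
    "<SECURITYLAKE-BUCKET-NAME>",
)


def fill_placeholders(script_text: str, values: dict) -> str:
    """Return script_text with every known placeholder replaced by its value."""
    missing = [p for p in PLACEHOLDERS if p not in values]
    if missing:
        # Fail early instead of uploading a half-edited script.
        raise ValueError(f"no value supplied for: {missing}")
    for placeholder in PLACEHOLDERS:
        script_text = script_text.replace(placeholder, values[placeholder])
    return script_text


# Hypothetical one-line stand-in for the real script contents.
template = 'REGION = "<AWS_REGION_NAME>"; ACCOUNT = "<AWS_ACCOUNT_ID>"'

values = {
    "<AWS_REGION_NAME>": "us-east-2",
    "<AWS_ACCOUNT_ID>": "111122223333",
    # Hypothetical ARN; copy the real one from the Kinesis console.
    "<SECURITYLAKE-AZURE-STREAM-ARN>": "arn:aws:kinesis:us-east-2:111122223333:stream/example-stream",
    "<SECURITYLAKE-BUCKET-NAME>": "s3://aws-security-data-lake-DOC-EXAMPLE-BUCKET",
}

filled = fill_placeholders(template, values)
print(filled)
```

Raising an error when a value is missing is a design choice in this sketch: it avoids uploading a script that still contains an unreplaced placeholder.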

Step 2: Configure Azure services for Azure activity log forwarding

You will complete the next steps in the Azure Cloud console. You need to configure Azure to export activity logs to an Azure cloud event hub within your desired Azure account or organization. Additionally, you need to create an Azure function to respond to new events within the Azure event hub and forward those logs over the internet to the Kinesis data stream that the CloudFormation template created in the initial steps of this post.

For information about how to set up and configure Azure Functions to respond to event hubs, see Azure Event Hubs Trigger for Azure Functions in the Azure documentation.

Configure the following Python script — Azure Event Hub Function — in an Azure function app. This function is designed to respond to event hub events, create a connection to AWS, and forward those events to Kinesis as deserialized JSON blobs.

In the script, replace the following variables with your own information:

  • For <SECURITYLAKE-AZURE-STREAM-ARN>, enter the Kinesis data stream ARN.
  • For <SECURITYLAKE-AZURE-STREAM-NAME>, enter the Kinesis data stream name.
  • For <SECURITYLAKE-AZURE-STREAM-KEYID>, enter the AWS Key Management Service (AWS KMS) key ID created through the CloudFormation template.

The <SECURITYLAKE-AZURE-STREAM-ARN> and securityLakeAzureActivityStream<STREAM-UID> are the same values that you obtained earlier in this post (see Figure 5).

You can find the AWS KMS key ID within the AWS KMS managed key policy associated with securityLakeAzureActivityStream. For example, in the key policy shown in Figure 8, the <SECURITYLAKE-AZURE-STREAM-KEYID> is shown in line 3.

Figure 8: Kinesis data stream inputs


Important: When you are working with KMS keys retrieved from the AWS console or AWS API keys within Azure, you should be extremely mindful of how you approach key management. Improper or poor handling of keys could result in the interception of data from the Kinesis stream or Azure function.

It’s a security best practice to use a trusted key management architecture that uses sufficient encryption and security protocols when working with keys that safeguard sensitive security information. Within Azure, consider using services such as the AWS Azure AD integration for seamless and ephemeral credential usage inside the Azure function. For more information, see Azure AD Integration, which describes how the integration helps safeguard and manage stored security keys and helps make sure that no keys are accessible to unauthorized parties or stored as unencrypted text outside the AWS console.
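To illustrate the forwarding logic that Step 2 describes, the following is a minimal sketch and not the Azure Event Hub Function script referenced in this post. It assumes the events have already been parsed into Python dictionaries, and that the client object exposes a put_records method with the same keyword arguments as the boto3 Kinesis client. The names to_kinesis_records, forward_events, and RecordingClient, as well as the fixed partition key, are hypothetical.

```python
import json

# The Kinesis PutRecords API accepts at most 500 records per request.
MAX_RECORDS_PER_CALL = 500


def to_kinesis_records(events, partition_key="azure-activity"):
    """Encode each event dict as a PutRecords entry (JSON bytes plus a key)."""
    return [
        {"Data": json.dumps(event).encode("utf-8"), "PartitionKey": partition_key}
        for event in events
    ]


def forward_events(client, stream_name, events):
    """Send events in batches and return the number of records submitted."""
    records = to_kinesis_records(events)
    for start in range(0, len(records), MAX_RECORDS_PER_CALL):
        batch = records[start:start + MAX_RECORDS_PER_CALL]
        client.put_records(StreamName=stream_name, Records=batch)
    return len(records)


class RecordingClient:
    """Offline stand-in for a Kinesis client so that the sketch runs anywhere."""

    def __init__(self):
        self.calls = []

    def put_records(self, StreamName, Records):
        self.calls.append((StreamName, len(Records)))
        return {"FailedRecordCount": 0}


client = RecordingClient()
sent = forward_events(client, "example-stream", [{"id": i} for i in range(1201)])
print(sent, client.calls)
```

A production function would also inspect FailedRecordCount in each response and retry the failed entries. The sketch omits that, along with credential handling, to keep the batching logic visible.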

Step 3: Validate the workflow and query Athena

After you complete the preceding steps, your logs should be flowing. To make sure that the process is working correctly, complete the following steps.

  1. In the Kinesis Data Streams console, verify that the logs are flowing to your data stream. Open the Kinesis stream that you created previously, choose the Data viewer tab, and then choose Get records, as shown in Figure 9.
    Figure 9: Kinesis data stream inputs


  2. Verify that the logs are partitioned and stored within the correct Security Lake bucket associated with the configured Region. The log partitions within the Security Lake bucket should have the following format: “region=<region>/account_id=<account_id>/eventDay=<YYYYMMDD>/”, and they should be stored with the expected Parquet compression.
    Figure 10: S3 bucket with object

  3. Assuming that CloudTrail logs exist within your Amazon Security Lake instance as well, you can now create a query in Athena that pulls data from the newly created Azure activity table and examine it alongside your existing CloudTrail logs by running queries such as the following:
    SELECT 
        api.operation,
        actor.user.uid,
        actor.user.name,
        src_endpoint.ip,
        time,
        severity,
        metadata.version,
        metadata.product.name,
        metadata.product.vendor_name,
        category_name,
        activity_name,
        type_uid
    FROM {SECURITY-LAKE-DB}.{SECURITY-LAKE-AZURE-TABLE}
    UNION ALL
    SELECT 
        api.operation,
        actor.user.uid,
        actor.user.name,
        src_endpoint.ip,
        time,
        severity,
        metadata.version,
        metadata.product.name,
        metadata.product.vendor_name,
        category_name,
        activity_name,
        type_uid
    FROM {SECURITY-LAKE-DB}.{SECURITY-LAKE-CLOUDTRAIL-TABLE}

    Figure 11: Query Azure activity and CloudTrail together in Athena
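A UNION ALL query like the one in step 3 requires both SELECT lists to contain the same columns in the same order. One way to keep them aligned is to generate both halves from a single column list. The following is a minimal sketch of that idea. The function name build_union_query is hypothetical, and the database and table names are left as the same placeholders used in this post.

```python
# Columns selected from both the Azure activity table and the CloudTrail table.
COLUMNS = [
    "api.operation",
    "actor.user.uid",
    "actor.user.name",
    "src_endpoint.ip",
    "time",
    "severity",
    "metadata.version",
    "metadata.product.name",
    "metadata.product.vendor_name",
    "category_name",
    "activity_name",
    "type_uid",
]


def build_union_query(database, tables, columns=COLUMNS):
    """Build one SELECT per table from a shared column list and join them."""
    select_list = ",\n    ".join(columns)  # no comma after the last column
    selects = [
        f"SELECT\n    {select_list}\nFROM {database}.{table}" for table in tables
    ]
    return "\nUNION ALL\n".join(selects)


query = build_union_query(
    "{SECURITY-LAKE-DB}",
    ["{SECURITY-LAKE-AZURE-TABLE}", "{SECURITY-LAKE-CLOUDTRAIL-TABLE}"],
)
print(query)
```

Replace the placeholders with your own database and table names before you run the generated query in Athena.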

For additional guidance on how to configure access and query Amazon Security Lake in Athena, see the following resources:

Conclusion

In this blog post, you learned how to create and deploy the AWS and Microsoft Azure assets needed to bring your own data to Amazon Security Lake. By creating an AWS Glue streaming job that can transform Azure activity data streams and by fronting that AWS Glue job with a Kinesis stream, you can open Amazon Security Lake to intake from external Azure activity data streams.

You also learned how to configure Azure assets so that your Azure activity logs can stream to your Kinesis endpoint. The combination of these two creates a working, custom source solution for Azure activity logging.

To get started with Amazon Security Lake, see the Getting Started page, or if you already use Amazon Security Lake and want to read additional blog posts and articles about this service, see Blog posts and articles.

If you have feedback about this blog post, submit comments in the Comments section below. If you have questions about this blog post, start a new thread on Amazon Security Lake re:Post or contact AWS Support.

Want more AWS Security news? Follow us on Twitter.

Adam Plotzker


Adam is currently a Security Engineer at AWS, working primarily on the Amazon Security Lake solution. One of the things he enjoys most about his work at AWS is his ability to be creative when exploring customer needs and coming up with unique solutions that meet those needs.

Amazon Security Lake is now generally available

Post Syndicated from Ross Warren original https://aws.amazon.com/blogs/security/amazon-security-lake-is-now-generally-available/

Today we are thrilled to announce the general availability of Amazon Security Lake, first announced in a preview release at re:Invent 2022. Security Lake centralizes security data from Amazon Web Services (AWS) environments, software as a service (SaaS) providers, on-premises, and cloud sources into a purpose-built data lake that is stored in your AWS account. With Open Cybersecurity Schema Framework (OCSF) support, the service normalizes and combines security data from AWS and a broad range of security data sources. This helps provide your team of analysts and security engineers with broad visibility to investigate and respond to security events, which can facilitate timely responses and help improve your security across multicloud and hybrid environments.

Figure 1 shows how Security Lake works, step by step. In this post, we discuss these steps, highlight some of the most popular use cases for Security Lake, and share the latest enhancements and updates that we have made since the preview launch.

Figure 1: How Security Lake works


Target use cases

In this section, we showcase some of the use cases that customers have found to be most valuable while the service was in preview.

Facilitate your security investigations with elevated visibility

Amazon Security Lake helps to streamline security investigations by aggregating, normalizing, and optimizing data storage in a single security data lake. Security Lake automatically normalizes AWS logs and security findings to the OCSF schema. This includes AWS CloudTrail management events, Amazon Virtual Private Cloud (Amazon VPC) Flow Logs, Amazon Route 53 Resolver query logs, and AWS Security Hub security findings from Amazon security services, including Amazon GuardDuty, Amazon Inspector, and AWS IAM Access Analyzer, as well as security findings from over 50 partner solutions. By having security-related logs and findings in a centralized location, and in the same format, Security Operations teams can streamline their process and devote more time to investigating security issues. This centralization reduces the need to spend valuable time collecting and normalizing logs into a specific format.

Figure 2 shows the Security Lake activation page, which presents users with options to enable log sources, AWS Regions, and accounts.

Figure 2: Security Lake activation page with options to enable log sources, Regions, and accounts


Figure 3 shows another section of the Security Lake activation page, which presents users with options to set rollup Regions and storage classes.

Figure 3: Security Lake activation page with options to select a rollup Region and set storage classes


Simplify your compliance monitoring and reporting

With Security Lake, customers can centralize security data into one or more rollup Regions, which can help teams to simplify their regional compliance and reporting obligations. Teams often face challenges when monitoring for compliance across multiple log sources, Regions, and accounts. By using Security Lake to collect and centralize this evidence, security teams can significantly reduce the time spent on log discovery and allocate more time towards compliance monitoring and reporting.

Analyze multiple years of security data quickly

Security Lake offers integration with third-party security services such as security information and event management (SIEM) and extended detection and response (XDR) tools, as well as popular data analytics services like Amazon Athena and Amazon OpenSearch Service to quickly analyze petabytes of data. This enables security teams to gain deep insights into their security data and take nimble measures to help protect their organization. Security Lake helps enforce least-privilege controls for teams across organizations by centralizing data and implementing robust access controls, automatically applying policies that are scoped to the required subscribers and sources. Data custodians can use the built-in features to create and enforce granular access controls, such as to restrict access to the data in the security lake to only those who require it.

Figure 4 depicts the process of creating a data access subscriber within Security Lake.

Figure 4: Creating a data access subscriber in Security Lake


Unify security data management across hybrid environments

The centralized data repository in Security Lake provides a comprehensive view of security data across hybrid and multicloud environments, helping security teams to better understand and respond to threats. You can use Security Lake to store security-related logs and data from various sources, including cloud-based and on-premises systems, making it simpler to collect and analyze security data. Additionally, by using automation and machine learning solutions, security teams can help identify anomalies and potential security risks more efficiently. This can ultimately lead to better risk management and enhance the overall security posture for the organization. Figure 5 illustrates the process of querying AWS CloudTrail and Microsoft Azure audit logs simultaneously by using Amazon Athena.

Figure 5: Querying AWS CloudTrail and Microsoft Azure audit logs together in Amazon Athena


Updates since preview launch

Security Lake automatically normalizes logs and events from natively supported AWS services to the OCSF schema. With the general availability release, Security Lake now supports the latest version of OCSF, which is version 1 rc2. CloudTrail management events are now normalized into three distinct OCSF event classes: Authentication, Account Change, and API Activity.

We made various improvements to resource names and schema mapping to enhance the usability of logs. Onboarding is made simpler with automated AWS Identity and Access Management (IAM) role creation from the console. Additionally, you have the flexibility to collect CloudTrail sources independently, including management events, Amazon Simple Storage Service (Amazon S3) data events, and AWS Lambda events.

To enhance query performance, we made a transition from hourly to daily time partitioning in Amazon S3, resulting in faster and more efficient data retrieval. Also, we added Amazon CloudWatch metrics to enable proactive monitoring of your log ingestion process to facilitate the identification of collection gaps or surges.

New Security Lake account holders are eligible for a 15-day free trial in supported Regions. Security Lake is now generally available in the following AWS Regions: US East (Ohio), US East (N. Virginia), US West (Oregon), Asia Pacific (Singapore), Asia Pacific (Sydney), Asia Pacific (Tokyo), Europe (Frankfurt), Europe (Ireland), Europe (London), and South America (São Paulo).

Ecosystem integrations

We have expanded our support for third-party integrations and have added 23 new partners. This includes 10 source partners (Aqua Security, Claroty, Confluent, Darktrace, ExtraHop, Gigamon, Sentra, Torq, Trellix, and Uptycs), enabling them to send data directly to Security Lake. Additionally, we have integrated with nine new subscribing partners (ChaosSearch, New Relic, Ripjar, SOC Prime, Stellar Cyber, Swimlane, Tines, Torq, and Wazuh). We have also established six new services partners: Booz Allen Hamilton, CMD Solutions (part of Mantel Group), Infosys, Insbuilt, Leidos, and Tata Consultancy Services.

In addition, Security Lake supports third-party sources that provide OCSF security data. Notable partners include Barracuda Networks, Cisco, Cribl, CrowdStrike, CyberArk, Lacework, Laminar, NETSCOUT, Netskope, Okta, Orca, Palo Alto Networks, Ping Identity, Tanium, The Falco Project, Trend Micro, Vectra AI, VMware, Wiz, and Zscaler. We have integrated with various third-party security, automation, and analytics tools, including Datadog, IBM, Rapid7, SentinelOne, Splunk, Sumo Logic, and Trellix. Lastly, we have partnered with service partners such as Accenture, Eviden, Deloitte, DXC Technology, Kyndryl, PwC, and Wipro that can work with you and Security Lake to deliver comprehensive solutions.

Get help from AWS Professional Services

The AWS Professional Services organization is a global team of experts that can help customers realize their desired business outcomes when using AWS. Our teams of data architects and security engineers engage with customer Security, IT, and business leaders to develop enterprise solutions. We follow current recommendations to support customers in their journey to integrate data into Security Lake. We integrate ready-built data transformations, visualizations, and AI/machine learning (ML) workflows that help Security Operations teams rapidly realize value. If you are interested in learning more, reach out to your AWS Professional Services account representative.

Summary

We invite you to explore the benefits of using Amazon Security Lake by taking advantage of our 15-day free trial and providing your feedback on your experiences, use cases, and solutions. We have several resources to help you get started and build your first data lake, including comprehensive documentation, demo videos, and webinars. By giving Security Lake a try, you can experience firsthand how it helps you centralize, normalize, and optimize your security data, and ultimately streamline your organization’s security incident detection and response across multicloud and hybrid environments.

 
If you have feedback about this post, submit comments in the Comments section below. If you have questions about this post, contact AWS Support.

Want more AWS Security news? Follow us on Twitter.

Author

Ross Warren

Ross is a Senior Product SA at AWS for Amazon Security Lake based in Northern Virginia. Prior to his work at AWS, Ross’ areas of focus included cyber threat hunting and security operations. Outside of work, he likes to spend time with his family, bake bread, make sawdust and enjoy time outside.

Nisha Amthul


Nisha is a Senior Product Marketing Manager at AWS Security, specializing in detection and response solutions. She has a strong foundation in product management and product marketing within the domains of information security and data protection. When not at work, you’ll find her cake decorating, strength training, and chasing after her two energetic kiddos, embracing the joys of motherhood.

Jonathan Garzon


Jonathan is a Senior Product Management leader at AWS with a passion for building products with delightful customer experiences and solving complex problems. He has launched and managed products in various domains, including networking, cybersecurity, and data analytics. Outside of work, Jonathan enjoys spending time with friends and family, playing soccer, mountain biking, hiking, and playing the guitar.

Join a streaming data source with CDC data for real-time serverless data analytics using AWS Glue, AWS DMS, and Amazon DynamoDB

Post Syndicated from Manish Kola original https://aws.amazon.com/blogs/big-data/join-streaming-source-cdc-glue/

Customers have been using data warehousing solutions to perform their traditional analytics tasks. Recently, data lakes have gained a lot of traction as the foundation for analytical solutions, because they come with benefits such as scalability, fault tolerance, and support for structured, semi-structured, and unstructured datasets.

Data lakes are not transactional by default; however, there are multiple open-source frameworks that enhance data lakes with ACID properties, providing a best-of-both-worlds solution between transactional and non-transactional storage mechanisms.

Traditional batch ingestion and processing pipelines that involve operations such as data cleaning and joining with reference data are straightforward to create and cost-efficient to maintain. However, there is a challenge to ingest datasets, such as Internet of Things (IoT) and clickstreams, at a fast rate with near-real-time delivery SLAs. You will also want to apply incremental updates with change data capture (CDC) from the source system to the destination. To make data-driven decisions in a timely manner, you need to account for missed records and backpressure, and maintain event ordering and integrity, especially if the reference data also changes rapidly.

In this post, we aim to address these challenges. We provide a step-by-step guide to join streaming data to a reference table changing in real time using AWS Glue, Amazon DynamoDB, and AWS Database Migration Service (AWS DMS). We also demonstrate how to ingest streaming data to a transactional data lake using Apache Hudi to achieve incremental updates with ACID transactions.

Solution overview

For our example use case, streaming data is coming through Amazon Kinesis Data Streams, and reference data is managed in MySQL. The reference data is continuously replicated from MySQL to DynamoDB through AWS DMS. The requirement here is to enrich the real-time stream data by joining it with the reference data in near-real time, and to make it queryable from a query engine such as Amazon Athena while keeping consistency. In this use case, reference data in MySQL can be updated when requirements change, and subsequent queries need to return results that reflect those updates.

This solution addresses the issue of users wanting to join streams with changing reference datasets when the size of the reference dataset is small. The reference data is maintained in DynamoDB tables, and the streaming job loads the full table into memory for each micro-batch, joining a high-throughput stream to a small reference dataset.
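The per-micro-batch join described above can be sketched in plain Python. This is a simplified illustration and not the AWS Glue job from this post: lists of dictionaries stand in for the DynamoDB table and the stream, the function names load_reference and enrich_batch are hypothetical, and joining the stream's country field to the reference table's countryname column is an assumption made for the example.

```python
def load_reference(rows):
    """Index the reference rows by country name (stand-in for a full table load)."""
    return {row["countryname"]: row for row in rows}


def enrich_batch(batch, reference):
    """Left-join each stream record to the in-memory reference data."""
    enriched = []
    for record in batch:
        match = reference.get(record["country"])
        combined = match["combinedname"] if match else None
        enriched.append({**record, "combinedname": combined})
    return enriched


# Stand-in for the replicated reference table.
reference_rows = [
    {"code": "IN", "countryname": "India", "combinedname": "IN-India"},
    {"code": "US", "countryname": "USA", "combinedname": "US-USA"},
]
batch = [{"uuid": "1", "country": "India"}, {"uuid": "2", "country": "Brazil"}]

# First micro-batch: the reference data is loaded fresh.
first = enrich_batch(batch, load_reference(reference_rows))

# Simulate a CDC update that arrives before the next micro-batch.
reference_rows[0]["combinedname"] = "IND-India"

# Second micro-batch: reloading picks up the change without restarting the job.
second = enrich_batch(batch, load_reference(reference_rows))
print(first[0]["combinedname"], second[0]["combinedname"])
```

Reloading the full table for every micro-batch is what keeps the join current, and it is only practical because the reference dataset is small.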

The following diagram illustrates the solution architecture.

Architecture

Prerequisites

For this walkthrough, you should have the following prerequisites:

Create IAM roles and S3 bucket

In this section, you create an Amazon Simple Storage Service (Amazon S3) bucket and two AWS Identity and Access Management (IAM) roles: one for the AWS Glue job, and one for AWS DMS. We do this using an AWS CloudFormation template. Complete the following steps:

  1. Sign in to the AWS CloudFormation console.
  2. Choose Launch Stack.
  3. Choose Next.
  4. For Stack name, enter a name for your stack.
  5. For DynamoDBTableName, enter tgt_country_lookup_table. This is the name of your new DynamoDB table.
  6. For S3BucketNamePrefix, enter the prefix of your new S3 bucket.
  7. Select I acknowledge that AWS CloudFormation might create IAM resources with custom names.
  8. Choose Create stack.

Stack creation can take about 1 minute.

Create a Kinesis data stream

In this section, you create a Kinesis data stream:

  1. On the Kinesis console, choose Data streams in the navigation pane.
  2. Choose Create data stream.
  3. For Data stream name, enter your stream name.
  4. Leave the remaining settings as default and choose Create data stream.

The Kinesis data stream is created in on-demand mode.
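If you prefer to create the stream programmatically, the console steps above correspond to a CreateStream request with the stream mode set to on-demand. The following is a minimal sketch that only builds the request parameters. The function name build_create_stream_request and the stream name my-data-stream are hypothetical, and sending the request with boto3 is shown as a comment because it assumes that boto3 is installed and that credentials are configured.

```python
def build_create_stream_request(stream_name):
    """Return CreateStream parameters for an on-demand Kinesis data stream."""
    if not stream_name:
        raise ValueError("stream_name must not be empty")
    return {
        "StreamName": stream_name,
        # On-demand mode needs no shard count.
        "StreamModeDetails": {"StreamMode": "ON_DEMAND"},
    }


request = build_create_stream_request("my-data-stream")
print(request)

# With boto3 installed and credentials configured, this could be sent as:
#   import boto3
#   boto3.client("kinesis").create_stream(**request)
```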

Create and configure an Aurora MySQL cluster

In this section, you create and configure an Aurora MySQL cluster as the source database. First, configure your source Aurora MySQL database cluster to enable CDC through AWS DMS to DynamoDB.

Create a parameter group

Complete the following steps to create a new parameter group:

  1. On the Amazon RDS console, choose Parameter groups in the navigation pane.
  2. Choose Create parameter group.
  3. For Parameter group family, select aurora-mysql5.7.
  4. For Type, choose DB Cluster Parameter Group.
  5. For Group name, enter my-mysql-dynamodb-cdc.
  6. For Description, enter Parameter group for demo Aurora MySQL database.
  7. Choose Create.
  8. Select my-mysql-dynamodb-cdc, and choose Edit under Parameter group actions.
  9. Edit the parameter group as follows:
    Name                 Value
    binlog_row_image     full
    binlog_format        ROW
    binlog_checksum      NONE
    log_slave_updates    1
  10. Choose Save changes.

RDS parameter group
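The parameter values from the preceding steps can also be expressed as the parameter list that the Amazon RDS ModifyDBClusterParameterGroup API expects. The following is a minimal sketch that only builds that list. The function name to_parameter_list is hypothetical, and the pending-reboot apply method is an assumption; check which apply method each parameter supports before you use it.

```python
# Binary logging settings from the parameter group steps in this post.
BINLOG_SETTINGS = {
    "binlog_row_image": "full",
    "binlog_format": "ROW",
    "binlog_checksum": "NONE",
    "log_slave_updates": "1",
}


def to_parameter_list(settings, apply_method="pending-reboot"):
    """Convert name/value pairs into the Parameters list shape used by RDS."""
    return [
        {
            "ParameterName": name,
            "ParameterValue": value,
            "ApplyMethod": apply_method,  # assumption, see the note above
        }
        for name, value in settings.items()
    ]


parameters = to_parameter_list(BINLOG_SETTINGS)
print(parameters)

# With boto3 installed and credentials configured, this could be applied as:
#   import boto3
#   boto3.client("rds").modify_db_cluster_parameter_group(
#       DBClusterParameterGroupName="my-mysql-dynamodb-cdc",
#       Parameters=parameters,
#   )
```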

Create the Aurora MySQL cluster

Complete the following steps to create the Aurora MySQL cluster:

  1. On the Amazon RDS console, choose Databases in the navigation pane.
  2. Choose Create database.
  3. For Choose a database creation method, choose Standard create.
  4. Under Engine options, for Engine type, choose Aurora (MySQL Compatible).
  5. For Engine version, choose Aurora (MySQL 5.7) 2.11.2.
  6. For Templates, choose Production.
  7. Under Settings, for DB cluster identifier, enter a name for your database.
  8. For Master username, enter your primary user name.
  9. For Master password and Confirm master password, enter your primary password.
  10. Under Instance configuration, for DB instance class, choose Burstable classes (includes t classes) and choose db.t3.small.
  11. Under Availability & durability, for Multi-AZ deployment, choose Don’t create an Aurora Replica.
  12. Under Connectivity, for Compute resource, choose Don’t connect to an EC2 compute resource.
  13. For Network type, choose IPv4.
  14. For Virtual private cloud (VPC), choose your VPC.
  15. For DB subnet group, choose your public subnet.
  16. For Public access, choose Yes.
  17. For VPC security group (firewall), choose the security group for your public subnet.
  18. Under Database authentication, for Database authentication options, choose Password authentication.
  19. Under Additional configuration, for DB cluster parameter group, choose the cluster parameter group you created earlier.
  20. Choose Create database.

Grant permissions to the source database

The next step is to grant the required permission on the source Aurora MySQL database. Now you can connect to the DB cluster using the MySQL utility. You can run queries to complete the following tasks:

  • Create a demo database and table and run queries on the data
  • Grant permission for a user used by the AWS DMS endpoint

Complete the following steps:

  1. Log in to the EC2 instance that you’re using to connect to your DB cluster.
  2. Enter the following command at the command prompt to connect to the primary DB instance of your DB cluster:
$ mysql -h mycluster.cluster-123456789012.us-east-1.rds.amazonaws.com -P 3306 -u admin -p
  3. Run the following SQL command to create a database:
> CREATE DATABASE mydev;
  4. Run the following SQL command to create a table:
> use mydev;
> CREATE TABLE country_lookup_table
(
code varchar(5),
countryname varchar(40) not null,
combinedname varchar(40) not null
);
  5. Run the following SQL command to populate the table with data:
> INSERT INTO country_lookup_table(code, countryname, combinedname) VALUES ('IN', 'India', 'IN-India'), ('US', 'USA', 'US-USA'), ('CA', 'Canada', 'CA-Canada'), ('CN', 'China', 'CN-China');
  6. Run the following SQL command to create a user for the AWS DMS endpoint and grant permissions for CDC tasks (replace the placeholder with your preferred password):
> CREATE USER repl IDENTIFIED BY '<your-password>';
> GRANT REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO 'repl'@'%';
> GRANT SELECT ON mydev.country_lookup_table TO 'repl'@'%';

Create and configure AWS DMS resources to load data into the DynamoDB reference table

In this section, you create and configure AWS DMS to replicate data into the DynamoDB reference table.

Create an AWS DMS replication instance

First, create an AWS DMS replication instance by completing the following steps:

  1. On the AWS DMS console, choose Replication instances in the navigation pane.
  2. Choose Create replication instance.
  3. Under Settings, for Name, enter a name for your instance.
  4. Under Instance configuration, for High Availability, choose Dev or test workload (Single-AZ).
  5. Under Connectivity and security, for VPC security groups, choose default.
  6. Choose Create replication instance.

Create Amazon VPC endpoints

Optionally, you can create Amazon VPC endpoints for DynamoDB when you need to connect to your DynamoDB table from the AWS DMS instance in a private network. Also make sure that you enable Publicly accessible when you need to connect to a database outside of your VPC.

Create an AWS DMS source endpoint

Create an AWS DMS source endpoint by completing the following steps:

  1. On the AWS DMS console, choose Endpoints in the navigation pane.
  2. Choose Create endpoint.
  3. For Endpoint type, choose Source endpoint.
  4. Under Endpoint configuration, for Endpoint identifier, enter a name for your endpoint.
  5. For Source engine, choose Amazon Aurora MySQL.
  6. For Access to endpoint database, choose Provide access information manually.
  7. For Server Name, enter the endpoint name of your Aurora writer instance (for example, mycluster.cluster-123456789012.us-east-1.rds.amazonaws.com).
  8. For Port, enter 3306.
  9. For User name, enter a user name for your AWS DMS task.
  10. For Password, enter a password.
  11. Choose Create endpoint.

Create an AWS DMS target endpoint

Create an AWS DMS target endpoint by completing the following steps:

  1. On the AWS DMS console, choose Endpoints in the navigation pane.
  2. Choose Create endpoint.
  3. For Endpoint type, choose Target endpoint.
  4. Under Endpoint configuration, for Endpoint identifier, enter a name for your endpoint.
  5. For Target engine, choose Amazon DynamoDB.
  6. For Service access role ARN, enter the IAM role for your AWS DMS task.
  7. Choose Create endpoint.

Create AWS DMS migration tasks

Create AWS DMS database migration tasks by completing the following steps:

  1. On the AWS DMS console, choose Database migration tasks in the navigation pane.
  2. Choose Create task.
  3. Under Task configuration, for Task identifier, enter a name for your task.
  4. For Replication instance, choose your replication instance.
  5. For Source database endpoint, choose your source endpoint.
  6. For Target database endpoint, choose your target endpoint.
  7. For Migration type, choose Migrate existing data and replicate ongoing changes.
  8. Under Task settings, for Target table preparation mode, choose Do nothing.
  9. For Stop task after full load completes, choose Don’t stop.
  10. For LOB column settings, choose Limited LOB mode.
  11. For Task logs, enable Turn on CloudWatch logs and Turn on batch-optimized apply.
  12. Under Table mappings, choose JSON Editor and enter the following rules.

Here you can add values to the column. With the following rules, the AWS DMS CDC task first creates a new DynamoDB table with the name specified in target-table-name. Then it replicates all the records, mapping the columns in the DB table to the attributes in the DynamoDB table.

{
    "rules": [
        {
            "rule-type": "selection",
            "rule-id": "1",
            "rule-name": "1",
            "object-locator": {
                "schema-name": "mydev",
                "table-name": "country_lookup_table"
            },
            "rule-action": "include"
        },
        {
            "rule-type": "object-mapping",
            "rule-id": "2",
            "rule-name": "2",
            "rule-action": "map-record-to-record",
            "object-locator": {
                "schema-name": "mydev",
                "table-name": "country_lookup_table"
            },
            "target-table-name": "tgt_country_lookup_table",
            "mapping-parameters": {
                "partition-key-name": "code",
                "sort-key-name": "countryname",
                "exclude-columns": [
                    "code",
                    "countryname"
                ],
                "attribute-mappings": [
                    {
                        "target-attribute-name": "code",
                        "attribute-type": "scalar",
                        "attribute-sub-type": "string",
                        "value": "${code}"
                    },
                    {
                        "target-attribute-name": "countryname",
                        "attribute-type": "scalar",
                        "attribute-sub-type": "string",
                        "value": "${countryname}"
                    }
                ],
                "apply-during-cdc": true
            }
        }
    ]
}

DMS table mapping

  13. Choose Create task.

Now the AWS DMS replication task has been started.

  14. Wait for the Status to show as Load complete.

DMS task

  15. On the DynamoDB console, choose Tables in the navigation pane.
  16. Select the DynamoDB reference table, and choose Explore table items to review the replicated records.

DynamoDB reference table initial
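To see how the object-mapping rule in the table mappings turns a MySQL row into a DynamoDB item, the following sketch imitates the map-record-to-record behavior in plain Python. It is a simplified reading of the rule and not how AWS DMS is implemented: columns listed in exclude-columns are dropped, the remaining columns are carried over as attributes, and each attribute mapping fills in its ${column} references. The function name map_record_to_record is hypothetical, and attribute types are ignored.

```python
def map_record_to_record(row, mapping_parameters):
    """Approximate how one source row becomes one target item."""
    excluded = set(mapping_parameters.get("exclude-columns", []))
    # Columns that are not excluded are carried over unchanged.
    item = {col: val for col, val in row.items() if col not in excluded}
    for mapping in mapping_parameters.get("attribute-mappings", []):
        value = mapping["value"]
        # Substitute ${column} references with the row's values.
        for col, val in row.items():
            value = value.replace("${" + col + "}", str(val))
        item[mapping["target-attribute-name"]] = value
    return item


# The mapping parameters from the rules in this post, without the key names.
mapping_parameters = {
    "exclude-columns": ["code", "countryname"],
    "attribute-mappings": [
        {"target-attribute-name": "code", "value": "${code}"},
        {"target-attribute-name": "countryname", "value": "${countryname}"},
    ],
}

row = {"code": "IN", "countryname": "India", "combinedname": "IN-India"}
item = map_record_to_record(row, mapping_parameters)
print(item)
```

In this reading, code and countryname are excluded and then mapped back explicitly as string attributes, while combinedname is carried over because it is not excluded.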

Create an AWS Glue Data Catalog table and an AWS Glue streaming ETL job

In this section, you create an AWS Glue Data Catalog table and an AWS Glue streaming extract, transform, and load (ETL) job.

Create a Data Catalog table

Create an AWS Glue Data Catalog table for the source Kinesis data stream with the following steps:

  1. On the AWS Glue console, choose Databases under Data Catalog in the navigation pane.
  2. Choose Add database.
  3. For Name, enter my_kinesis_db.
  4. Choose Create database.
  5. Choose Tables under Databases, then choose Add table.
  6. For Name, enter my_stream_src_table.
  7. For Database, choose my_kinesis_db.
  8. For Select the type of source, choose Kinesis.
  9. For Kinesis data stream is located in, choose my account.
  10. For Kinesis stream name, enter a name for your data stream.
  11. For Classification, select JSON.
  12. Choose Next.
  13. Choose Edit schema as JSON, enter the following JSON, then choose Save.
[
  {
    "Name": "uuid",
    "Type": "string",
    "Comment": ""
  },
  {
    "Name": "country",
    "Type": "string",
    "Comment": ""
  },
  {
    "Name": "itemtype",
    "Type": "string",
    "Comment": ""
  },
  {
    "Name": "saleschannel",
    "Type": "string",
    "Comment": ""
  },
  {
    "Name": "orderpriority",
    "Type": "string",
    "Comment": ""
  },
  {
    "Name": "orderdate",
    "Type": "string",
    "Comment": ""
  },
  {
    "Name": "region",
    "Type": "string",
    "Comment": ""
  },
  {
    "Name": "shipdate",
    "Type": "string",
    "Comment": ""
  },
  {
    "Name": "unitssold",
    "Type": "string",
    "Comment": ""
  },
  {
    "Name": "unitprice",
    "Type": "string",
    "Comment": ""
  },
  {
    "Name": "unitcost",
    "Type": "string",
    "Comment": ""
  },
  {
    "Name": "totalrevenue",
    "Type": "string",
    "Comment": ""
  },
  {
    "Name": "totalcost",
    "Type": "string",
    "Comment": ""
  },
  {
    "Name": "totalprofit",
    "Type": "string",
    "Comment": ""
  },
  {
    "Name": "impressiontime",
    "Type": "string",
    "Comment": ""
  }
]

Glue Catalog table schema

    1. Choose Next, then choose Create.
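Because every column in this schema is a string with an empty comment, the JSON above can also be generated rather than typed by hand. The following is a minimal sketch; the helper name build_schema is hypothetical, and the column list is copied from the schema shown above.

```python
import json

# Column names copied from the schema in this post.
COLUMNS = [
    "uuid", "country", "itemtype", "saleschannel", "orderpriority",
    "orderdate", "region", "shipdate", "unitssold", "unitprice",
    "unitcost", "totalrevenue", "totalcost", "totalprofit", "impressiontime",
]


def build_schema(columns, column_type="string"):
    """Return a list of column definitions in the format used by Edit schema as JSON."""
    return [{"Name": name, "Type": column_type, "Comment": ""} for name in columns]


schema_json = json.dumps(build_schema(COLUMNS), indent=2)
print(schema_json)
```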

Create an AWS Glue streaming ETL job

Next, you create an AWS Glue streaming job. AWS Glue 3.0 and later supports Apache Hudi natively, so we use this native integration to ingest into a Hudi table. Complete the following steps to create the AWS Glue streaming job:

  1. On the AWS Glue Studio console, choose Spark script editor and choose Create.
  2. On the Job details tab, for Name, enter a name for your job.
  3. For IAM Role, choose the IAM role for your AWS Glue job.
  4. For Type, select Spark Streaming.
  5. For Glue version, choose Glue 4.0 – Supports spark 3.3, Scala 2, Python 3.
  6. For Requested number of workers, enter 3.
  7. Under Advanced properties, for Job parameters, choose Add new parameter.
  8. For Key, enter --conf.
  9. For Value, enter spark.serializer=org.apache.spark.serializer.KryoSerializer --conf spark.sql.hive.convertMetastoreParquet=false.
  10. Choose Add new parameter.
  11. For Key, enter --datalake-formats.
  12. For Value, enter hudi.
  13. For Script path, enter s3://<S3BucketName>/scripts/.
  14. For Temporary path, enter s3://<S3BucketName>/temporary/.
  15. Optionally, for Spark UI logs path, enter s3://<S3BucketName>/sparkHistoryLogs/.

Glue job parameter
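If you prefer to script the job definition instead of using the console, the settings above map onto the parameters of the AWS Glue CreateJob API. The following is a sketch only: the job name, role ARN, bucket name, and script file name are placeholders, and the G.1X worker type is an assumption because the steps above do not specify one.

```python
def build_streaming_job_request(job_name, role_arn, bucket, script_name="streaming_job.py"):
    """Assemble keyword arguments for the Glue create_job call (values mirror the console steps)."""
    base = f"s3://{bucket}"
    return {
        "Name": job_name,
        "Role": role_arn,
        "GlueVersion": "4.0",
        "WorkerType": "G.1X",  # assumption: not stated in the console steps
        "NumberOfWorkers": 3,
        "Command": {
            "Name": "gluestreaming",  # Spark Streaming job type
            "ScriptLocation": f"{base}/scripts/{script_name}",
            "PythonVersion": "3",
        },
        "DefaultArguments": {
            "--conf": (
                "spark.serializer=org.apache.spark.serializer.KryoSerializer "
                "--conf spark.sql.hive.convertMetastoreParquet=false"
            ),
            "--datalake-formats": "hudi",
            "--TempDir": f"{base}/temporary/",
            "--enable-spark-ui": "true",
            "--spark-event-logs-path": f"{base}/sparkHistoryLogs/",
        },
    }


request = build_streaming_job_request(
    "my-streaming-job",                           # placeholder job name
    "arn:aws:iam::123456789012:role/MyGlueRole",  # placeholder role ARN
    "my-bucket",                                  # placeholder bucket name
)
print(request["DefaultArguments"])
# To create the job: boto3.client("glue").create_job(**request)
```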

  1. On the Script tab, enter the following script into the AWS Glue Studio editor and choose Create.

The near-real-time streaming job enriches data by joining a Kinesis data stream with a DynamoDB table that contains frequently updated reference data. The enriched dataset is loaded into the target Hudi table in the data lake. Replace <S3BucketName> with your bucket that you created via AWS CloudFormation:

import sys, json
import boto3
from pyspark.sql import DataFrame, Row
from pyspark.context import SparkContext
from pyspark.sql.types import *
from pyspark.sql.functions import *
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from awsglue.job import Job

args = getResolvedOptions(sys.argv,["JOB_NAME"])

# Initialize spark session and Glue context
sc = SparkContext() 
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

# Job parameters
dydb_lookup_table = "tgt_country_lookup_table"
kin_src_database_name = "my_kinesis_db" 
kin_src_table_name = "my_stream_src_table" 
hudi_write_operation = "upsert" 
hudi_record_key = "uuid" 
hudi_precomb_key = "orderdate" 
checkpoint_path = "s3://<S3BucketName>/streamlab/checkpoint/" 
s3_output_folder = "s3://<S3BucketName>/output/"
hudi_table = "hudi_table"
hudi_database = "my_kinesis_db"

# hudi options 
additional_options={
    "hoodie.datasource.hive_sync.use_jdbc": "false",
    "hoodie.datasource.write.recordkey.field": hudi_record_key,
    "hoodie.datasource.hive_sync.database": hudi_database,
    "hoodie.table.name": hudi_table,
    "hoodie.consistency.check.enabled": "true",
    "hoodie.datasource.write.keygenerator.class": "org.apache.hudi.keygen.NonpartitionedKeyGenerator",
    "hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.NonPartitionedExtractor",
    "hoodie.datasource.write.hive_style_partitioning": "false",
    "hoodie.datasource.write.precombine.field": hudi_precomb_key,
    "hoodie.bulkinsert.shuffle.parallelism": "4",
    "hoodie.datasource.hive_sync.enable": "true",
    "hoodie.datasource.write.operation": hudi_write_operation,
    "hoodie.datasource.write.storage.type": "COPY_ON_WRITE",
}

# Scan the reference data table in DynamoDB with the boto3 API and load it into a Spark DataFrame.
def readDynamoDb():
    dynamodb = boto3.resource("dynamodb")
    table = dynamodb.Table(dydb_lookup_table)
    response = table.scan()
    items = response["Items"]
    jsondata = sc.parallelize(items)
    lookupDf = glueContext.read.json(jsondata)
    return lookupDf


# Load the Amazon Kinesis data stream from the AWS Glue Data Catalog.
source_df = glueContext.create_data_frame.from_catalog(
    database=kin_src_database_name,
    table_name=kin_src_table_name,
    transformation_ctx="source_df",
    additional_options={"startingPosition": "TRIM_HORIZON"},
)

# As part of batch processing, implement the transformation logic for joining streaming data frames with reference data frames.
def processBatch(data_frame, batchId):
    if data_frame.count() > 0:

        # Refresh the DynamoDB table to pull the latest snapshot for each micro-batch
        country_lookup_df = readDynamoDb() 
                
        final_frame = data_frame.join(
            country_lookup_df, 
            data_frame["country"] == country_lookup_df["countryname"], 
            'left'
        ).drop(
            "countryname",
            "country",
            "unitprice", 
            "unitcost",
            "totalrevenue",
            "totalcost",
            "totalprofit"
        )

        # Script generated for node my-lab-hudi-connector
        final_frame.write.format("hudi") \
            .options(**additional_options) \
            .mode("append") \
            .save(s3_output_folder)
        
try:
    glueContext.forEachBatch(
        frame=source_df,
        batch_function=processBatch,
        options={"windowSize": "60 seconds", "checkpointLocation": checkpoint_path},
    )
except Exception as e:
    print(f"Error is @@@ ....{e}")
  1. Choose Run to start the streaming job.

The following screenshot shows examples of the DataFrames data_frame, country_lookup_df, and final_frame.

Glue job log output initial

The AWS Glue job successfully joined records coming from the Kinesis data stream and the reference table in DynamoDB, and then ingested the joined records into Amazon S3 in Hudi format.
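One caveat about the readDynamoDb function in the script: a single DynamoDB Scan call returns at most 1 MB of data, and larger tables return a LastEvaluatedKey that must be passed back as ExclusiveStartKey to fetch the next page. The small reference table in this post fits in one page, but for larger tables a paginated read could look like the following sketch. The helper scan_all_items and the FakeTable stand-in are hypothetical; FakeTable exists only so that the example runs without AWS access.

```python
def scan_all_items(table):
    """Return every item in the table, following LastEvaluatedKey across pages."""
    items = []
    scan_kwargs = {}
    while True:
        response = table.scan(**scan_kwargs)
        items.extend(response.get("Items", []))
        last_key = response.get("LastEvaluatedKey")
        if last_key is None:
            return items
        # Continue the scan where the previous page stopped.
        scan_kwargs["ExclusiveStartKey"] = last_key


class FakeTable:
    """Stand-in for a boto3 DynamoDB Table that returns one item per page (illustration only)."""

    def __init__(self, items):
        self._items = items

    def scan(self, ExclusiveStartKey=None):
        index = 0 if ExclusiveStartKey is None else ExclusiveStartKey["index"] + 1
        response = {"Items": [self._items[index]]}
        if index < len(self._items) - 1:
            response["LastEvaluatedKey"] = {"index": index}
        return response


fake = FakeTable([{"code": "US"}, {"code": "CN"}, {"code": "IN"}])
print(scan_all_items(fake))
```

In the Glue script, scan_all_items(dynamodb.Table(dydb_lookup_table)) could replace the single table.scan() call under the same assumptions.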

Create and run a Python script to generate sample data and load it into the Kinesis data stream

In this section, you create and run a Python script to generate sample data and load it into the source Kinesis data stream. Complete the following steps:

  1. Log in to AWS Cloud9, your EC2 instance, or any other computing host that puts records in your data stream.
  2. Create a Python file called generate-data-for-kds.py:
$ touch generate-data-for-kds.py
  1. Open the Python file and enter the following script:
import json
import random
import boto3
import time

STREAM_NAME = "<mystreamname>"

def get_data():
    return {
        "uuid": random.randrange(0, 1000001, 1),
        "country": random.choice( [ "United Arab Emirates", "China", "India", "United Kingdom", "United States of America", ] ),
        "itemtype": random.choice( [ "Snacks", "Cereals", "Cosmetics", "Fruits", "Clothes", "Babycare", "Household", ] ),
        "saleschannel": random.choice( [ "Snacks", "Cereals", "Cosmetics", "Fruits", "Clothes", "Babycare", "Household", ] ),
        "orderpriority": random.choice(["H", "L", "M", "C"]),
        "orderdate": random.choice( [ "1/4/10", "2/28/10", "2/15/11", "11/8/11", "2/1/12", "2/18/12", "3/1/12", "9/24/12",
                                      "10/13/12", "12/2/12", "12/29/12", "3/30/13", "7/29/13", "3/23/14", "6/14/14", 
                                      "7/15/14", "10/19/14", "5/7/15", "10/11/15", "11/22/15", "8/23/16", "1/15/17", 
                                      "1/27/17", "2/25/17", "3/10/17", "4/1/17", ] ),
        "region": random.choice( ["Asia", "Europe", "Americas", "Middle Eastern", "Africa"] ),
        "shipdate": random.choice( [ "1/4/10", "2/28/10", "2/15/11", "11/8/11", "2/1/12", "2/18/12", "3/1/12", "9/24/12", 
                                    "10/13/12", "12/2/12", "12/29/12", "3/30/13", "7/29/13", "3/23/14", "6/14/14", "7/15/14",
                                      "10/19/14", "5/7/15", "10/11/15", "11/22/15", "8/23/16", "1/15/17", "1/27/17", 
                                      "2/25/17", "3/10/17", "4/1/17", ] ),
        "unitssold": random.choice( [ "8217", "3465", "8877", "2882", "70", "7044", "6307", "2384", "1327", "2572", "8794", 
                                     "4131", "5793", "9091", "4314", "9085", "5270", "5459", "1982", "8245", "4860", "4656", 
                                     "8072", "65", "7864", "9778", ] ),
        "unitprice": random.choice( [ "97.44", "117.11", "364.69", "502.54", "263.33", "117.11", "35.84", "6.92", "35.84", 
                                     "6.92", "35.84", "56.67", "159.42", "502.54", "117.11", "56.67", "524.96", "502.54", 
                                     "56.67", "56.67", "159.42", "56.67", "35.84", "159.42", "502.54", "31.79", ] ),
        "unitcost": random.choice( [ "97.44", "117.11", "364.69", "502.54", "263.33", "117.11", "35.84", "6.92", "35.84", 
                                    "6.92", "35.84", "56.67", "159.42", "502.54", "117.11", "56.67", "524.96", "502.54", 
                                    "56.67", "56.67", "159.42", "56.67", "35.84", "159.42", "502.54", "31.79", ] ),
        "totalrevenue": random.choice( [ "1253749.86", "712750.5", "3745117.53", "1925954.14", "30604", "1448950.8", 
                                        "689228.96", "22242.72", "145014.56", "23996.76", "961008.32", "337626.63", 
                                        "1478837.04", "6075242.57", "887389.8", "742517.05", "3431876.7", "3648085.93", 
                                        "161988.86", "673863.85", "1240660.8", "380534.88", "882108.16", "16593.2", 
                                        "5255275.28", "463966.1", ] ),
        "totalcost": random.choice( [ "800664.48", "405786.15", "3237353.13", "1448320.28", "18433.1", "824922.84", 
                                     "226042.88", "16497.28", "47559.68", "17798.24", "315176.96", "234103.77", "923520.06",
                                       "4568591.14", "505212.54", "514846.95", "2766539.2", "2743365.86", 
                                       "112319.94", "467244.15", "774781.2", "263855.52", "289300.48", "10362.3", 
                                       "3951974.56", "310842.62", ] ),
        "totalprofit": random.choice( [ "453085.38", "306964.35", "507764.4", "477633.86", "12170.9", "624027.96", 
                                       "463186.08", "5745.44", "97454.88", "6198.52", "645831.36", "103522.86", "555316.98", 
                                       "1506651.43", "382177.26", "227670.1", "665337.5", "904720.07", "49668.92", "206619.7",
                                         "465879.6", "116679.36", "592807.68", "6230.9", "1303300.72", "153123.48", ] ),
        "impressiontime": random.choice( [ "2022-10-24T02:27:41Z", "2022-10-24T02:27:41Z", "2022-11-24T02:27:41Z", 
                                          "2022-12-24T02:27:41Z", "2022-13-24T02:27:41Z", "2022-14-24T02:27:41Z", 
                                          "2022-15-24T02:27:41Z", ] ),
    }

def generate(stream_name, kinesis_client):
    while True:
        data = get_data()
        print(data)
        kinesis_client.put_record(
            StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey"
        )
        time.sleep(2)

if __name__ == "__main__":
    generate(STREAM_NAME, boto3.client("kinesis"))

Run the script with python3 generate-data-for-kds.py. It puts a record into the Kinesis data stream every 2 seconds.
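The generator uses a constant partition key, so every record is routed to the same shard. That is fine for a single-shard test stream. If your stream has more than one shard, a varying partition key spreads records across shards. The following is a minimal sketch of that variation, assuming the record's uuid field is used as the key; the helper name build_put_record_args and the stream name are hypothetical.

```python
import json


def build_put_record_args(stream_name, record):
    """Build put_record arguments that use the record's uuid as the partition key."""
    return {
        "StreamName": stream_name,
        "Data": json.dumps(record),
        "PartitionKey": str(record["uuid"]),  # partition keys must be strings
    }


args = build_put_record_args("my-stream", {"uuid": 42, "country": "India"})
print(args)
# To send the record: boto3.client("kinesis").put_record(**args)
```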

Simulate updating the reference table in the Aurora MySQL cluster

Now all the resources and configurations are ready. For this example, we want to add a 3-digit country code to the reference table. Let’s update records in the Aurora MySQL table to simulate changes. Complete the following steps:

  1. Make sure that the AWS Glue streaming job is already running.
  2. Connect to the primary DB instance again, as described earlier.
  3. Enter your SQL commands to update records:
> UPDATE country_lookup_table SET combinedname='US-USA-US' WHERE code='US';
> UPDATE country_lookup_table SET combinedname='CA-CAN-Canada' WHERE code='CA';
> UPDATE country_lookup_table SET combinedname='CN-CHN-China' WHERE code='CN';
> UPDATE country_lookup_table SET combinedname='IN-IND-India' WHERE code='IN';

The reference table in the Aurora MySQL source database has now been updated, and AWS DMS automatically replicates the changes to the reference table in DynamoDB.

DynamoDB reference table updated

The following tables show records in data_frame, country_lookup_df, and final_frame. In country_lookup_df and final_frame, the combinedname column has values formatted as <2-digit-country-code>-<3-digit-country-code>-<country-name>, which shows that the changed records in the reference table are reflected in the join output without restarting the AWS Glue streaming job. In other words, the AWS Glue job successfully joins the incoming records from the Kinesis data stream with the reference table even while the reference table is changing.

Glue job log output updated
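The reason the job picks up changes without a restart is that the reference data is read again inside every micro-batch. The following plain-Python sketch imitates that behavior without Spark. The function enrich_batch and the sample rows are illustrative only, and the initial combinedname value is an assumed example of the <2-digit-country-code>-<country-name> format.

```python
def enrich_batch(batch, read_lookup):
    """Left-join one micro-batch with a freshly read snapshot of the reference data."""
    lookup = {row["countryname"]: row for row in read_lookup()}
    enriched = []
    for record in batch:
        match = lookup.get(record["country"], {})  # no match leaves the joined fields empty
        enriched.append({
            "uuid": record["uuid"],
            "code": match.get("code"),
            "combinedname": match.get("combinedname"),
        })
    return enriched


# Illustrative reference row and stream record.
reference = [{"code": "IN", "countryname": "India", "combinedname": "IN-India"}]
batch = [{"uuid": 1, "country": "India"}]

before = enrich_batch(batch, lambda: reference)
reference[0]["combinedname"] = "IN-IND-India"  # simulates the replicated update
after = enrich_batch(batch, lambda: reference)

print(before[0]["combinedname"], "->", after[0]["combinedname"])
```

Reading the lookup once outside the batch function would avoid repeated scans, but the job would then keep joining against stale reference data until it was restarted.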

Query the Hudi table using Athena

Let’s query the Hudi table using Athena to see the records in the destination table. Complete the following steps:

  1. Make sure that the script and the AWS Glue streaming job are still working:
    1. The Python script (generate-data-for-kds.py) is still running.
    2. The generated data is being sent to the data stream.
    3. The AWS Glue streaming job is still running.
  2. On the Athena console, run the following SQL in the query editor:
select shipdate, unitssold, impressiontime, code,combinedname from <database>.<table>
where combinedname is not null
limit 10;

The following query result shows the records that were processed before the reference table was changed. Values in the combinedname column are formatted as <2-digit-country-code>-<country-name>.

Athena query result initial

The following query result shows the records that were processed after the reference table was changed. Values in the combinedname column are formatted as <2-digit-country-code>-<3-digit-country-code>-<country-name>.

Athena query result updated

This confirms that the changed reference data is reflected in the target Hudi table, which joins records from the Kinesis data stream with the reference data in DynamoDB.

Clean up

As the final step, clean up the resources:

  1. Delete the Kinesis data stream.
  2. Delete the AWS DMS migration task, endpoint, and replication instance.
  3. Stop and delete the AWS Glue streaming job.
  4. Delete the AWS Cloud9 environment.
  5. Delete the CloudFormation stack.

Conclusion

Building and maintaining a transactional data lake that involves real-time data ingestion and processing has multiple variable components and decisions to be made, such as what ingestion service to use, how to store your reference data, and what transactional data lake framework to use. In this post, we provided the implementation details of such a pipeline, using AWS native components as the building blocks and Apache Hudi as the open-source framework for a transactional data lake.

We believe that this solution can be a starting point for organizations looking to implement a new data lake with such requirements. Additionally, the different components are fully pluggable and can be mixed and matched with existing data lakes to target new requirements or migrate existing ones, addressing their pain points.


About the authors

Manish Kola is a Data Lab Solutions Architect at AWS, where he works closely with customers across various industries to architect cloud-native solutions for their data analytics and AI needs. He partners with customers on their AWS journey to solve their business problems and build scalable prototypes. Before joining AWS, Manish’s experience includes helping customers implement data warehouse, BI, data integration, and data lake projects.

Santosh Kotagiri is a Solutions Architect at AWS with experience in data analytics and cloud solutions leading to tangible business results. His expertise lies in designing and implementing scalable data analytics solutions for clients across industries, with a focus on cloud-native and open-source services. He is passionate about leveraging technology to drive business growth and solve complex problems.

Chiho Sugimoto is a Cloud Support Engineer on the AWS Big Data Support team. She is passionate about helping customers build data lakes using ETL workloads. She loves planetary science and enjoys studying the asteroid Ryugu on weekends.

Noritaka Sekiyama is a Principal Big Data Architect on the AWS Glue team. He is responsible for building software artifacts to help customers. In his spare time, he enjoys cycling with his new road bike.

Introducing the Enhanced Document API for DynamoDB in the AWS SDK for Java 2.x

Post Syndicated from John Viegas original https://aws.amazon.com/blogs/devops/introducing-the-enhanced-document-api-for-dynamodb-in-the-aws-sdk-for-java-2-x/

We are excited to announce that the AWS SDK for Java 2.x now offers the Enhanced Document API for DynamoDB, providing an enhanced way of working with Amazon DynamoDB items.
This post covers using the Enhanced Document API for DynamoDB with the DynamoDB Enhanced Client. By using the Enhanced Document API, you can create an EnhancedDocument instance to represent an item with no fixed schema, and then use the DynamoDB Enhanced Client to read and write to DynamoDB.
Furthermore, unlike the Document APIs of aws-sdk-java 1.x, which provided arguments and return types that were not type-safe, the EnhancedDocument provides strongly-typed APIs for working with documents. This interface simplifies the development process and ensures that the data is correctly typed.

Prerequisites:

Before getting started, ensure you are using an up-to-date version of the AWS Java SDK dependency with all the latest released bug-fixes and features. For Enhanced Document API support, you must use version 2.20.33 or later. See our “Set up an Apache Maven project” guide for details on how to manage the AWS Java SDK dependency in your project.

Add dependency for dynamodb-enhanced in pom.xml.

<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>dynamodb-enhanced</artifactId>
<version>2.20.33</version>
</dependency>

Quick walk-through for using Enhanced Document API to interact with DynamoDB

Step 1 : Create a DynamoDB Enhanced Client

Create an instance of the DynamoDbEnhancedClient class, which provides a high-level interface for Amazon DynamoDB that simplifies working with DynamoDB tables.

DynamoDbEnhancedClient enhancedClient = DynamoDbEnhancedClient.builder()
                                               .dynamoDbClient(DynamoDbClient.create())
                                               .build();

Step 2 : Create a DynamoDbTable resource object with Document table schema

To execute commands against a DynamoDB table using the Enhanced Document API, you must associate the table with your Document table schema to create a DynamoDbTable resource object. The Document table schema builder requires the primary index key and attribute converter providers. Use AttributeConverterProvider.defaultProvider() to convert document attributes of default types. An optional secondary index key can be added to the builder.


DynamoDbTable<EnhancedDocument> documentTable = enhancedClient.table("my_table",
                                              TableSchema.documentSchemaBuilder()
                                                         .addIndexPartitionKey(TableMetadata.primaryIndexName(),"hashKey", AttributeValueType.S)
                                                         .addIndexSortKey(TableMetadata.primaryIndexName(), "sortKey", AttributeValueType.N)
                                                         .attributeConverterProviders(AttributeConverterProvider.defaultProvider())
                                                         .build());
                                                         
// call documentTable.createTable() if "my_table" does not exist in DynamoDB

Step 3 : Write a DynamoDB item using an EnhancedDocument

The EnhancedDocument class has static factory methods along with a builder method to add attributes to a document. The following snippet demonstrates the type safety provided by EnhancedDocument when you construct a document item.

EnhancedDocument simpleDoc = EnhancedDocument.builder()
 .attributeConverterProviders(defaultProvider())
 .putString("hashKey", "sampleHash")
 .putNull("nullKey")
 .putNumber("sortKey", 1.0)
 .putBytes("byte", SdkBytes.fromUtf8String("a"))
 .putBoolean("booleanKey", true)
 .build();
 
documentTable.putItem(simpleDoc);

Step 4 : Read a DynamoDB item as an EnhancedDocument

Attributes of the documents retrieved from a DynamoDB table can be accessed with getter methods.

EnhancedDocument docGetItem = documentTable.getItem(r -> r.key(k -> k.partitionValue("sampleHash").sortValue(1)));

docGetItem.getString("hashKey");
docGetItem.isNull("nullKey");
docGetItem.getNumber("sortKey").floatValue();
docGetItem.getBytes("byte");
docGetItem.getBoolean("booleanKey"); 

AttributeConverterProviders for accessing document attributes as custom objects

You can provide a custom AttributeConverterProvider instance to an EnhancedDocument to convert document attributes to a specific object type.
These providers can be set on either DocumentTableSchema or EnhancedDocument to read or write attributes as custom objects.

TableSchema.documentSchemaBuilder()
           .attributeConverterProviders(CustomClassConverterProvider.create(), defaultProvider())
           .build();
    
// Insert a custom class instance into an EnhancedDocument as attribute 'customMapOfAttribute'.
EnhancedDocument customAttributeDocument =
EnhancedDocument.builder().put("customMapOfAttribute", customClassInstance, CustomClass.class).build();

// Retrieve attribute 'customMapOfAttribute' as CustomClass object.
CustomClass customClassObject = customAttributeDocument.get("customMapOfAttribute", CustomClass.class);

Convert Documents to JSON and vice-versa

The Enhanced Document API allows you to convert a JSON string to an EnhancedDocument and vice-versa.

// Enhanced document created from JSON string using defaultConverterProviders.
EnhancedDocument documentFromJson = EnhancedDocument.fromJson("{\"key\": \"Value\"}");
                                              
// Converting an EnhancedDocument to JSON string "{\"key\": \"Value\"}"                                                 
String jsonFromDocument = documentFromJson.toJson();

Define a Custom Attribute Converter Provider

Custom attribute converter providers are implementations of AttributeConverterProvider that provide converters for custom classes.
Below is an example of a CustomClassForDocumentAPI class, which has a single field stringAttribute of type String, and its corresponding AttributeConverterProvider implementation.

public class CustomClassForDocumentAPI {
    private final String stringAttribute;

    public CustomClassForDocumentAPI(Builder builder) {
        this.stringAttribute = builder.stringAttribute;
    }
    public static Builder builder() {
        return new Builder();
    }
    public String stringAttribute() {
        return stringAttribute;
    }
    public static final class Builder {
        private String stringAttribute;
        private Builder() {
        }
        public Builder stringAttribute(String stringAttribute) {
            this.stringAttribute = stringAttribute;
            return this;
        }
        public CustomClassForDocumentAPI build() {
            return new CustomClassForDocumentAPI(this);
        }
    }
}

import java.util.Map;
import software.amazon.awssdk.enhanced.dynamodb.AttributeConverter;
import software.amazon.awssdk.enhanced.dynamodb.AttributeConverterProvider;
import software.amazon.awssdk.enhanced.dynamodb.EnhancedType;
import software.amazon.awssdk.utils.ImmutableMap;

public class CustomAttributeForDocumentConverterProvider implements AttributeConverterProvider {
    private final Map<EnhancedType<?>, AttributeConverter<?>> converterCache = ImmutableMap.of(
        EnhancedType.of(CustomClassForDocumentAPI.class), new CustomClassForDocumentAttributeConverter());
        // Different types of converters can be added to this map.

    public static CustomAttributeForDocumentConverterProvider create() {
        return new CustomAttributeForDocumentConverterProvider();
    }

    @Override
    public <T> AttributeConverter<T> converterFor(EnhancedType<T> enhancedType) {
        return (AttributeConverter<T>) converterCache.get(enhancedType);
    }
}

A custom attribute converter is an implementation of AttributeConverter that converts a custom class to and from a map of attribute values, as shown below.

import java.util.LinkedHashMap;
import java.util.Map;
import software.amazon.awssdk.enhanced.dynamodb.AttributeConverter;
import software.amazon.awssdk.enhanced.dynamodb.AttributeValueType;
import software.amazon.awssdk.enhanced.dynamodb.EnhancedType;
import software.amazon.awssdk.enhanced.dynamodb.internal.converter.attribute.EnhancedAttributeValue;
import software.amazon.awssdk.enhanced.dynamodb.internal.converter.attribute.StringAttributeConverter;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;

public class CustomClassForDocumentAttributeConverter implements AttributeConverter<CustomClassForDocumentAPI> {
    public static CustomClassForDocumentAttributeConverter create() {
        return new CustomClassForDocumentAttributeConverter();
    }
    @Override
    public AttributeValue transformFrom(CustomClassForDocumentAPI input) {
        Map<String, AttributeValue> attributeValueMap = new LinkedHashMap<>();
        if (input.stringAttribute() != null) {
            attributeValueMap.put("stringAttribute", AttributeValue.fromS(input.stringAttribute()));
        }
        return EnhancedAttributeValue.fromMap(attributeValueMap).toAttributeValue();
    }

    @Override
    public CustomClassForDocumentAPI transformTo(AttributeValue input) {
        Map<String, AttributeValue> customAttr = input.m();
        CustomClassForDocumentAPI.Builder builder = CustomClassForDocumentAPI.builder();
        if (customAttr.get("stringAttribute") != null) {
            builder.stringAttribute(StringAttributeConverter.create().transformTo(customAttr.get("stringAttribute")));
        }
        return builder.build();
    }
    @Override
    public EnhancedType<CustomClassForDocumentAPI> type() {
        return EnhancedType.of(CustomClassForDocumentAPI.class);
    }
    @Override
    public AttributeValueType attributeValueType() {
        return AttributeValueType.M;
    }
}

Attribute Converter Provider for EnhancedDocument Builder

When working outside of a DynamoDB table context, make sure to set the attribute converter providers explicitly on the EnhancedDocument builder. When used within a DynamoDB table context, the table schema’s converter provider will be used automatically for the EnhancedDocument.
The code snippet below shows how to set an AttributeConverterProvider using the EnhancedDocument builder method.

// Enhanced document created from JSON string using custom AttributeConverterProvider.
EnhancedDocument documentFromJson = EnhancedDocument.builder()
                                                    .attributeConverterProviders(CustomAttributeForDocumentConverterProvider.create())
                                                    .json("{\"key\": \"Values\"}")
                                                    .build();
                                                    
CustomClassForDocumentAPI customClass = documentFromJson.get("key", CustomClassForDocumentAPI.class);

Conclusion

In this blog post, we showed you how to set up and begin using the Enhanced Document API, both with the DynamoDB Enhanced Client and standalone with the EnhancedDocument class. The enhanced client is open-source and resides in the same repository as the AWS SDK for Java 2.0.
We hope you’ll find this new feature useful. You can always share your feedback on our GitHub issues page.

How to use managed dedicated IPs for email sending

Post Syndicated from Tyler Holmes original https://aws.amazon.com/blogs/messaging-and-targeting/how-to-use-managed-dedicated-ips-for-email-sending/

Starting to use dedicated IPs has always been a long, complicated process driven by factors such as the large effort to monitor and maintain the IPs and the costs, both in infrastructure and management of IP and Domain reputation. The Dedicated IP (Managed) feature in Amazon SES eliminates much of the complexity of sending email via dedicated IPs and allows you to start sending through dedicated IPs much faster and with less management overhead.

What’s the Difference Between standard dedicated IPs and managed dedicated IPs?

You can use dedicated IP addresses in SES in two ways: standard and managed. Both allow you to lease dedicated IP addresses for an additional fee, but they differ in how they are configured and managed. They have much in common, but each has unique advantages depending on your use case; see here for a comparison.

Standard dedicated IPs are manually set up and managed in SES. They allow you full control over your sending reputation but require you to fully manage your dedicated IPs, including warming them up, scaling them out, and managing your pools.

Managed dedicated IPs are a quick and easy way to start using dedicated IP addresses. These dedicated IP addresses leverage machine learning to remove the need to manage the IP warm-up process. The feature also scales your IPs up or down as your volume increases or decreases, which makes it a cost-efficient way to start using dedicated IP addresses that are managed by SES.

How Does the Managed Dedicated IP Feature Automate IP Warming?

Great deliverability through your dedicated IP address requires that you have a good reputation with the receiving ISPs, also known as mailbox providers. Mailbox providers will only accept a small volume of email from an IP that they don’t recognize. When you’re first allocated an IP, it’s new and won’t be recognized by the receiving mailbox provider because it doesn’t have any reputation associated with it. For an IP’s reputation to be established, it must gradually build trust with receiving mailbox providers. This gradual trust-building process is referred to as warming up. Adding to the complexity, each mailbox provider has its own concept of warming and accepts varying volumes of email as you work through the warm-up process.

The concept of IP warming has always been a misnomer, with customers thinking that once their IP is “warm” that it stays that way when in reality the process is an ongoing one, fluctuating as your recipient domain mix changes and your volume changes. Ensuring that you have the best deliverability when sending via dedicated IPs requires monitoring more than just recipient engagement rates (opens, clicks, bounces, complaints, opt-ins, etc.), you also need to manage volume per mailbox provider. Understanding the volumes that recipient mailbox providers will accept is very difficult if not impossible for senders using standard Dedicated IPs. Managing this aspect of the warm up creates risk for sending too little, meaning warm-up takes longer, or too much, which means receiving mailbox providers may throttle, reduce IP reputation, or even filter out email being sent by an IP that is not properly warming up.

This process is a costly, risky, and time consuming one that can be eliminated using the managed feature. Managed dedicated IPs will automatically apportion the right amount of traffic per mailbox provider to your dedicated IPs and any leftover email volume is sent over the shared network of IPs, allowing you to send as you normally would. As time goes on, the proportion of email traffic being sent over your dedicated IPs increases until they are warm, at which point all of your emails will be sent through your dedicated IPs. In later stages, any sending that is in excess of your normal patterns is proactively queued to ensure the best deliverability to each mailbox provider.

As an example, if you’ve been sending all your traffic to Gmail, the IP addresses are considered warmed up only for Gmail and cold for other mailbox providers. If your customer domain mix changes and includes a large proportion of email sends to Hotmail, SES ramps up traffic slowly for Hotmail as the IP addresses are not warmed up yet while continuing to deliver all the traffic to Gmail via your dedicated IPs. The warmup adjustment is adaptive and is based on your actual sending patterns.
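To make the apportioning idea concrete, here is a minimal sketch of how a day's traffic might be split between dedicated and shared IPs per mailbox provider. It is not the algorithm SES uses, which is not described in this post. The function name `split_traffic`, the per-provider daily allowances, and all of the numbers are assumptions made up for illustration.

```python
from typing import Dict, Tuple


def split_traffic(
    volume_by_provider: Dict[str, int],
    dedicated_capacity: Dict[str, int],
) -> Dict[str, Tuple[int, int]]:
    """Return {provider: (via_dedicated, via_shared)} for one day of sending.

    dedicated_capacity is a hypothetical per-provider daily allowance for the
    dedicated IPs; providers missing from it are treated as fully cold (0).
    """
    plan = {}
    for provider, volume in volume_by_provider.items():
        allowance = dedicated_capacity.get(provider, 0)
        via_dedicated = min(volume, allowance)
        # Whatever the dedicated IPs cannot take yet goes over the shared pool.
        plan[provider] = (via_dedicated, volume - via_dedicated)
    return plan


# Warm for Gmail, still cold for Hotmail (illustrative numbers only).
plan = split_traffic(
    {"gmail": 50_000, "hotmail": 20_000},
    {"gmail": 100_000, "hotmail": 500},
)
print(plan)
```

In this toy model, all Gmail traffic goes through the dedicated IPs while most Hotmail traffic overflows to the shared pool until the Hotmail allowance grows.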

The managed feature is a good fit for senders who want to stay in control of their reputation without spending the time and effort to manage the warm-up process or the scaling of IPs as volume grows. A full breakdown of the use cases that are a good fit for the managed feature can be found here.

How to Configure Managed Pools and Configuration Sets

Managed dedicated IPs can be enabled in just a few steps, either from the console or programmatically. The first step is to create a managed IP pool. The managed dedicated IPs feature then determines how many dedicated IPs you require based on your sending patterns, provisions them for you, and manages how they scale based on your sending requirements. Note that this process is not instantaneous: depending on your sending patterns, it may take more or less time for the dedicated IPs to be provisioned. You need consistent email volume coming from your account for the feature to provision the correct number of IPs.

Once enabled, you can utilize managed dedicated IPs in your email sending by associating the managed IP pool with a configuration set, and then specifying that configuration set when sending email. The configuration set can also be applied to a sending identity by using a default configuration set, which can simplify your sending, as anytime the associated sending identity is used to send email your managed dedicated IPs will be used.

Instructions

Configure Via The Console

To enable Dedicated IPs (Managed) via the Amazon SES console:

  • Sign in to the AWS Management Console and open the Amazon SES console at https://console.aws.amazon.com/ses/.
  • In the left navigation pane, choose Dedicated IPs.
  • Do one of the following (Note: You will begin to incur charges after creating a Dedicated IPs (Managed) pool below)
    • If you don’t have existing dedicated IPs in your account:
      • The Dedicated IPs onboarding page is displayed. In the Dedicated IPs (Managed) overview panel, choose Enable dedicated IPs. The Create IP Pool page opens.
    • If you have existing dedicated IPs in your account:
      • Select the Managed IP pools tab on the Dedicated IPs page.
      • In the All Dedicated IP (managed) pools panel, choose Create Managed IP pool. The Create IP Pool page opens.
  • In the Pool details panel,
    • Choose Managed (auto managed) in the Scaling mode field.
    • Enter a name for your managed pool in the IP pool name field.
    • Note
      • The IP pool name must be unique. It can’t be a duplicate of a standard dedicated IP pool name in your account.
      • You can have a mix of up to 50 pools split between both Standard and Dedicated IPs (Managed) per AWS Region in your account.
  • (Optional) You can associate this managed IP pool with a configuration set by choosing one from the dropdown list in the Configuration sets field.
    • Note
      • If you choose a configuration set that’s already associated with an IP pool, it will become associated with this managed pool, and no longer be associated with the previous pool.
      • To add or remove associated configuration sets after this managed pool is created, edit the configuration set’s Sending IP pool parameter in the General details panel.
      • If you haven’t created any configuration sets yet, see Using configuration sets in Amazon SES.
  • (Optional) You can add one or more Tags to your IP pool by including a tag key and an optional value for the key.
    • Choose Add new tag and enter the Key. You can also add an optional Value for the tag. You can add up to 50 tags; if you make a mistake, choose Remove.
    • To add the tags, choose Save changes. After you create the pool, you can add, remove, or edit tags by selecting the managed pool and choosing Edit.
  • Select Create pool.
    • Note
      • After you create a managed IP pool, it can’t be converted to a standard IP pool.
      • When using Dedicated IPs (Managed), you can’t have more than 10,000 sending identities (domains and email addresses, in any combination) per AWS Region in your account.
  • After you create your managed IP pool, a link is automatically generated in the CloudWatch metrics column of the All Dedicated IPs (Managed) pools table in the SES console. Selecting it opens the Amazon CloudWatch console and displays your sending reputation at an effective daily rate with specific mailbox providers for the managed pool, using the following metrics:

 

Metric | Description
Available24HourSend | Indicates how much volume the managed pool has available to send towards a specific mailbox provider.
SentLast24Hours | Indicates how much volume of email has been sent through the managed pool by dedicated IPs towards a specific mailbox provider.

You can also track the managed pool’s sending performance by using event publishing.

Configure Via The API

You can also configure your managed dedicated IP pool through the API. To make a dedicated pool managed, set the ScalingMode parameter to “MANAGED” when you create the pool.
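As a rough illustration of the API route, the following sketch wraps the three SES v2 calls in one function that accepts any boto3-style `sesv2` client. The function name `configure_managed_pool` and the example identity are assumptions. The pool and configuration set names are the ones used in this post's CLI example.

```python
from typing import Any


def configure_managed_pool(ses: Any, pool_name: str, config_set: str, identity: str) -> None:
    """Create a managed pool, route a configuration set through it, and make
    that configuration set the default for a sending identity."""
    # ScalingMode="MANAGED" is what makes the pool a Dedicated IPs (Managed) pool.
    ses.create_dedicated_ip_pool(PoolName=pool_name, ScalingMode="MANAGED")
    ses.create_configuration_set(
        ConfigurationSetName=config_set,
        DeliveryOptions={"SendingPoolName": pool_name},
    )
    ses.put_email_identity_configuration_set_attributes(
        EmailIdentity=identity,
        ConfigurationSetName=config_set,
    )


# Usage (needs AWS credentials; creating the pool incurs charges):
#   import boto3
#   ses = boto3.client("sesv2", region_name="us-east-1")
#   configure_managed_pool(ses, "dedicated1", "cs_dedicated1", "example.com")
```

Passing the client in, rather than creating it inside the function, keeps the sketch easy to exercise with a stub before pointing it at a real account.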

Configure Via The CLI

You can configure your SES resources through the CLI. A dedicated pool can be specified to be managed by setting the --scaling-mode MANAGED parameter when creating the dedicated pool.

# Specify which AWS region to use
export AWS_DEFAULT_REGION='us-east-1'

# Create a managed dedicated pool
aws sesv2 create-dedicated-ip-pool --pool-name dedicated1 --scaling-mode MANAGED

# Create a configuration set that will send through the dedicated pool
aws sesv2 create-configuration-set --configuration-set-name cs_dedicated1 --delivery-options SendingPoolName=dedicated1

# Configure the configuration set as the default for your sending identity
aws sesv2 put-email-identity-configuration-set-attributes --email-identity {{YOUR-SENDING-IDENTITY-HERE}} --configuration-set-name cs_dedicated1

# Send SES email through the API or SMTP without requiring any code changes.
# Emails will be sent out through the dedicated pool.
aws sesv2 send-email --from-email-address "{{YOUR-SENDING-IDENTITY-HERE}}" --destination "[email protected]" --content '{"Simple": {"Subject": {"Data": "Sent from a Dedicated IP Managed pool"},"Body": {"Text": {"Data": "Hello"}}}}'

Monitoring

We recommend customers onboard to event destinations and delivery delay events to more accurately track the sending performance of their dedicated sending.

Conclusion

In this blog post we explained the benefits of sending via the Dedicated IPs (Managed) feature, as well as how to configure a managed dedicated IP pool and begin sending through it. Dedicated IPs (Managed) pricing can be reviewed at the pricing page for SES here.

Reserving EC2 Capacity across Availability Zones by utilizing On Demand Capacity Reservations (ODCRs)

Post Syndicated from Sheila Busser original https://aws.amazon.com/blogs/compute/reserving-ec2-capacity-across-availability-zones-by-utilizing-on-demand-capacity-reservations-odcrs/

This post is written by Johan Hedlund, Senior Solutions Architect, Enterprise PUMA.

Many customers have successfully migrated business critical legacy workloads to AWS, utilizing services such as Amazon Elastic Compute Cloud (Amazon EC2) and Auto Scaling Groups (ASGs), as well as multiple Availability Zones (AZs) and Regions for Business Continuity and High Availability.

These critical applications require increased levels of availability to meet strict business Service Level Agreements (SLAs), even in extreme scenarios such as when EC2 functionality is impaired (see Advanced Multi-AZ Resilience Patterns for examples). Following AWS best practices such as architecting for flexibility will help here, but for some more rigid designs there can still be challenges around EC2 instance availability.

In this post, I detail an approach for Reserving Capacity for this type of scenario to mitigate the risk of the instance type(s) that your application needs being unavailable, including code for building it and ways of testing it.

Baseline: Multi-AZ application with restrictive instance needs

To focus on the problem of Capacity Reservation, our reference architecture is a simple horizontally scalable monolith. This consists of a single executable running across multiple instances as a cluster in an Auto Scaling group across three AZs for High Availability.

Architecture diagram featuring an Auto Scaling Group spanning three Availability Zones within one Region for high availability.

The application in this example is both business critical and memory intensive. It needs six r6i.4xlarge instances to meet the required specifications. R6i has been chosen to meet the required memory to vCPU requirements.

The third-party application we need to run has a significant license cost, so we want to optimize our workload to make sure we run only the minimally required number of instances for the shortest amount of time.

The application should be resilient to issues in a single AZ. In the case of multi-AZ impact, it should failover to Disaster Recovery (DR) in an alternate Region, where service level objectives are instituted to return operations to defined parameters. But this is outside the scope for this post.

The problem: capacity during AZ failover

In this solution, the Auto Scaling Group automatically balances its instances across the selected AZs, providing a layer of resilience in the event of a disruption in a single AZ. However, this hinges on those instances being available for use in the Amazon EC2 capacity pools. The criticality of our application comes with SLAs which dictate that even the very low likelihood of instance types being unavailable in AWS must be mitigated.

The solution: Reserving Capacity

There are 2 main ways of Reserving Capacity for this scenario: (a) Running extra capacity 24/7, (b) On Demand Capacity Reservations (ODCRs).

In the past, another recommendation would have been to utilize Zonal Reserved Instances (Non Zonal will not Reserve Capacity). But although Zonal Reserved Instances do provide similar functionality as On Demand Capacity Reservations combined with Savings Plans, they do so in a less flexible way. Therefore, the recommendation from AWS is now to instead use On Demand Capacity Reservations in combination with Savings Plans for scenarios where Capacity Reservation is required.

The TCO impact of the licensing situation rules out the first of the two valid options. Merely keeping the spare capacity up and running all the time also doesn’t cover the scenario in which an instance needs to be stopped and started, for example for maintenance or patching. Without Capacity Reservation, there is a theoretical possibility that that instance type would not be available to start up again.

This leads us to the second option: On Demand Capacity Reservations.

How much capacity to reserve?

Our failure scenario is when functionality in one AZ is impaired and the Auto Scaling Group must shift its instances to the remaining AZs while maintaining the total number of instances. With a minimum requirement of six instances, this means that we need 6/2 = 3 instances worth of Reserved Capacity in each AZ (as we can’t know in advance which one will be affected).

Illustration of number of instances required per Availability Zone, in order to keep the total number of instances at six when one Availability Zone is removed. When using three AZs there are two instances per AZ. When using two AZs there are three instances per AZ.
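The sizing rule above can be written as a small calculation. This is a sketch under the post's assumptions: instances are balanced evenly, and we plan for losing one AZ at a time. The function name `reserved_per_az` and its `az_failures` parameter are illustrative.

```python
import math


def reserved_per_az(min_instances: int, az_count: int, az_failures: int = 1) -> int:
    """Instances to reserve in each AZ so the surviving AZs can still host
    min_instances after az_failures zones are lost."""
    surviving = az_count - az_failures
    if surviving < 1:
        raise ValueError("at least one AZ must survive")
    # Round up so an uneven split never leaves the group short.
    return math.ceil(min_instances / surviving)


per_az = reserved_per_az(min_instances=6, az_count=3)
print(per_az, per_az * 3)  # 3 per AZ, 9 reservations in total
```

Because we can't know which AZ will be affected, the per-AZ figure is reserved in all three AZs, which is where the total of nine reservations comes from.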

Spinning up the solution

If you want to get hands-on experience with On Demand Capacity Reservations, refer to this CloudFormation template and its accompanying README file for details on how to spin up the solution that we’re using. The README also contains more information about the Stack architecture. Upon successful creation, you have the following architecture running in your account.

Architecture diagram featuring adding a Resource Group of On Demand Capacity Reservations with 3 On Demand Capacity Reservations per Availability Zone.

Note that the default instance type for the AWS CloudFormation stack has been downgraded to t2.micro to keep our experiment within the AWS Free Tier.

Testing the solution

Now we have a fully functioning solution with Reserved Capacity dedicated to this specific Auto Scaling Group. However, we haven’t tested it yet.

The tests utilize the AWS Command Line Interface (AWS CLI), which we execute using AWS CloudShell.

To interact with the resources created by CloudFormation, we need some names and IDs that have been collected in the “Outputs” section of the stack. These can be accessed from the console in a tab under the Stack that you have created.

Example of outputs from running the CloudFormation stack. AutoScalingGroupName, SubnetForManuallyAddedInstance, and SubnetsToKeepWhenDroppingASGAZ.

We set these as variables for easy access later (replace the values with the values from your stack):

export AUTOSCALING_GROUP_NAME=ASGWithODCRs-CapacityBackedASG-13IZJWXF9QV8E
export SUBNET_FOR_MANUALLY_ADDED_INSTANCE=subnet-03045a72a6328ef72
export SUBNETS_TO_KEEP=subnet-03045a72a6328ef72,subnet-0fd00353b8a42f251

How does the solution react to scaling out the Auto Scaling Group beyond the Capacity Reservation?

First, let’s look at what happens if the Auto Scaling Group wants to Scale Out. Our requirements state that we should have a minimum of six instances running at any one time. But the solution should still adapt to increased load. Before knowing anything about how this works in AWS, imagine two scenarios:

  1. The Auto Scaling Group can scale out to a total of nine instances, as that’s how many On Demand Capacity Reservations we have. But it can’t go beyond that even if there is On Demand capacity available.
  2. The Auto Scaling Group can scale just as much as it could when On Demand Capacity Reservations weren’t used, and it continues to launch unreserved instances when the On Demand Capacity Reservations run out (assuming that capacity is in fact available, which is why we have the On Demand Capacity Reservations in the first place).

The instances section of the Amazon EC2 Management Console can be used to show our existing Capacity Reservations, as created by the CloudFormation stack.

Listing of consumed Capacity Reservations across the three Availability Zones, showing two used per Availability Zone.

As expected, this shows that we are currently using six out of our nine On Demand Capacity Reservations, with two in each AZ.

Now let’s scale out our Auto Scaling Group to 12, thus using up all On Demand Capacity Reservations in each AZ, as well as requesting one extra Instance per AZ.

aws autoscaling set-desired-capacity \
--auto-scaling-group-name $AUTOSCALING_GROUP_NAME \
--desired-capacity 12

The Auto Scaling Group now has the desired Capacity of 12:

Group details of the Auto Scaling Group, showing that Desired Capacity is set to 12.

And in the Capacity Reservation screen we can see that all our On Demand Capacity Reservations have been used up:

Listing of consumed Capacity Reservations across the three Availability Zones, showing that all nine On Demand Capacity Reservations are used.

In the Auto Scaling Group we see that – as expected – we weren’t restricted to nine instances. Instead, the Auto Scaling Group fell back on launching unreserved instances when our On Demand Capacity Reservations ran out:

Listing of Instances in the Auto Scaling Group, showing that the total count is 12.
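The numbers observed here can be reproduced with a simple model. It assumes the Auto Scaling Group balances instances evenly across AZs and that unreserved capacity is available. It is a sketch of the arithmetic, not of the Auto Scaling Group's actual placement logic, and the function names and AZ labels are made up.

```python
from typing import Dict, List


def spread_evenly(desired: int, azs: List[str]) -> Dict[str, int]:
    """Balance `desired` instances across AZs; earlier AZs get any remainder."""
    base, extra = divmod(desired, len(azs))
    return {az: base + (1 if i < extra else 0) for i, az in enumerate(azs)}


def reservation_usage(desired: int, azs: List[str], reserved_per_az: int) -> Dict[str, int]:
    """Count instances that land in a reservation versus unreserved capacity."""
    placement = spread_evenly(desired, azs)
    reserved = sum(min(count, reserved_per_az) for count in placement.values())
    return {"reserved": reserved, "unreserved": desired - reserved}


azs = ["a", "b", "c"]  # placeholder AZ labels
print(reservation_usage(6, azs, 3))   # {'reserved': 6, 'unreserved': 0}
print(reservation_usage(12, azs, 3))  # {'reserved': 9, 'unreserved': 3}
```

With a desired capacity of 12, each AZ holds four instances: three consume reservations and one runs as an ordinary On Demand instance.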

How does the solution react to adding a matching instance outside the Auto Scaling Group?

But what if someone else, or another process in the account, starts an EC2 instance of the same type for which we have the On Demand Capacity Reservations? Won’t that instance consume a reservation and leave our Auto Scaling Group short of its three instances per AZ, so that we no longer have enough reservations for our minimum of six instances if there are issues with an AZ?

This all comes down to the type of On Demand Capacity Reservation that we have created, or the “Eligibility”. Looking at our Capacity Reservations, we can see that they are all of the “targeted” type. This means that they are only used if explicitly referenced, like we’re doing through the Resource Group for the Auto Scaling Group.

Listing of existing Capacity Reservations, showing that they are of the targeted type.

It’s time to prove that. First, we scale in our Auto Scaling Group so that only six instances are used, resulting in there being one unused capacity reservation in each AZ. Then, we try to add an EC2 instance manually, outside the Auto Scaling Group.

First, scale in the Auto Scaling Group:

aws autoscaling set-desired-capacity \
--auto-scaling-group-name $AUTOSCALING_GROUP_NAME \
--desired-capacity 6

Listing of consumed Capacity Reservations across the three Availability Zones, showing two used reservations per Availability Zone.

Listing of Instances in the Auto Scaling Group, showing that the total count is six.

Then, spin up the new instance, and save its ID for later when we clean up:

export MANUALLY_CREATED_INSTANCE_ID=$(aws ec2 run-instances \
--image-id resolve:ssm:/aws/service/ami-amazon-linux-latest/amzn2-ami-hvm-x86_64-gp2 \
--instance-type t2.micro \
--subnet-id $SUBNET_FOR_MANUALLY_ADDED_INSTANCE \
--query 'Instances[0].InstanceId' --output text) 

Listing of the newly created instance, showing that it is running.

We still have the three unutilized On Demand Capacity Reservations, as expected, proving that the On Demand Capacity Reservations with the “targeted” eligibility only get used when explicitly referenced:

Listing of consumed Capacity Reservations across the three Availability Zones, showing two used reservations per Availability Zone.

How does the solution react to an AZ being removed?

Now we’re comfortable that the Auto Scaling Group can grow beyond the On Demand Capacity Reservations if needed, as long as there is capacity, and that other EC2 instances in our account won’t use the On Demand Capacity Reservations specifically purchased for the Auto Scaling Group. It’s time for the big test. How does it all behave when an AZ becomes unavailable?

For our purposes, we can simulate this scenario by changing the Auto Scaling Group to be across two AZs instead of the original three.

First, we scale out to seven instances so that we can see the impact of overflow outside the On Demand Capacity Reservations when we subsequently remove one AZ:

aws autoscaling set-desired-capacity \
--auto-scaling-group-name $AUTOSCALING_GROUP_NAME \
--desired-capacity 7

Then, we change the Auto Scaling Group to only cover two AZs:

aws autoscaling update-auto-scaling-group \
--auto-scaling-group-name $AUTOSCALING_GROUP_NAME \
--vpc-zone-identifier $SUBNETS_TO_KEEP

Give it some time, and we see that the Auto Scaling Group is now spread across two AZs, On Demand Capacity Reservations cover the minimum six instances as per our requirements, and the rest is handled by instances without Capacity Reservation:

Network details for the Auto Scaling Group, showing that it is configured for two Availability Zones.

Listing of consumed Capacity Reservations across the three Availability Zones, showing two Availability Zones using three On Demand Capacity Reservations each, with the third Availability Zone not using any of its On Demand Capacity Reservations.

Listing of Instances in the Auto Scaling Group, showing that there are 4 instances in the eu-west-2a Availability Zone.

Cleanup

It’s time to clean up, as those Instances and On Demand Capacity Reservations come at a cost!

  1. First, remove the EC2 instance that we made:
    aws ec2 terminate-instances --instance-ids $MANUALLY_CREATED_INSTANCE_ID
  2. Then, delete the CloudFormation stack.

Conclusion

Using a combination of Auto Scaling Groups, Resource Groups, and On Demand Capacity Reservations (ODCRs), we have built a solution that provides High Availability backed by reserved capacity. It is intended for workloads where the requirements for availability when an AZ becomes temporarily unavailable outweigh the increased cost of reserving capacity, and where the best practices for architecting for flexibility cannot be followed due to limitations on applicable architectures.

We have tested the solution and confirmed that the Auto Scaling Group falls back on using unreserved capacity when the On Demand Capacity Reservations are exhausted. Moreover, we confirmed that targeted On Demand Capacity Reservations won’t risk getting accidentally used by other solutions in our account.

Now it’s time for you to try it yourself! Download the IaC template and give it a try! And if you are planning on using On Demand Capacity Reservations, then don’t forget to look into Savings Plans, as they significantly reduce the cost of that Reserved Capacity.

Simplify AWS Glue job orchestration and monitoring with Amazon MWAA

Post Syndicated from Rushabh Lokhande original https://aws.amazon.com/blogs/big-data/simplify-aws-glue-job-orchestration-and-monitoring-with-amazon-mwaa/

Organizations across all industries have complex data processing requirements for their analytical use cases across different analytics systems, such as data lakes on AWS, data warehouses (Amazon Redshift), search (Amazon OpenSearch Service), NoSQL (Amazon DynamoDB), machine learning (Amazon SageMaker), and more. Analytics professionals are tasked with deriving value from data stored in these distributed systems to create better, secure, and cost-optimized experiences for their customers. For example, digital media companies seek to combine and process datasets in internal and external databases to build unified views of their customer profiles, spur ideas for innovative features, and increase platform engagement.

In these scenarios, customers looking for a serverless data integration offering use AWS Glue as a core component for processing and cataloging data. AWS Glue is well integrated with AWS services and partner products, and provides low-code/no-code extract, transform, and load (ETL) options to enable analytics, machine learning (ML), or application development workflows. AWS Glue ETL jobs may be one component in a more complex pipeline. Orchestrating the run of and managing dependencies between these components is a key capability in a data strategy. Amazon Managed Workflows for Apache Airflow (Amazon MWAA) orchestrates data pipelines using distributed technologies including on-premises resources, AWS services, and third-party components.

In this post, we show how to simplify monitoring an AWS Glue job orchestrated by Airflow using the latest features of Amazon MWAA.

Overview of solution

This post discusses the following:

  • How to upgrade an Amazon MWAA environment to version 2.4.3.
  • How to orchestrate an AWS Glue job from an Airflow Directed Acyclic Graph (DAG).
  • The Airflow Amazon provider package’s observability enhancements in Amazon MWAA. You can now consolidate run logs of AWS Glue jobs on the Airflow console to simplify troubleshooting data pipelines. The Amazon MWAA console becomes a single reference to monitor and analyze AWS Glue job runs. Previously, support teams needed to access the AWS Management Console and take manual steps for this visibility. This feature is available by default from Amazon MWAA version 2.4.3.

The following diagram illustrates our solution architecture.

Prerequisites

You need the following prerequisites:

Set up the Amazon MWAA environment

For instructions on creating your environment, refer to Create an Amazon MWAA environment. For existing users, we recommend upgrading to version 2.4.3 to take advantage of the observability enhancements featured in this post.

The steps to upgrade Amazon MWAA to version 2.4.3 differ depending on whether the current version is 1.10.12 or a 2.x version (2.0.2 or 2.2.2). We discuss both options in this post.

Prerequisites for setting up an Amazon MWAA environment

You must meet the following prerequisites:

Upgrade from version 1.10.12 to 2.4.3

If you’re using Amazon MWAA version 1.10.12, refer to Migrating to a new Amazon MWAA environment to upgrade to 2.4.3.

Upgrade from version 2.0.2 or 2.2.2 to 2.4.3

If you’re using Amazon MWAA environment version 2.2.2 or lower, complete the following steps:

  1. Create a requirements.txt for any custom dependencies with specific versions required for your DAGs.
  2. Upload the file to Amazon S3 in the appropriate location where the Amazon MWAA environment points to the requirements.txt for installing dependencies.
  3. Follow the steps in Migrating to a new Amazon MWAA environment and select version 2.4.3.

Update your DAGs

Customers who upgraded from an older Amazon MWAA environment may need to make updates to existing DAGs. In Airflow version 2.4.3, the Airflow environment will use the Amazon provider package version 6.0.0 by default. This package may include some potentially breaking changes, such as changes to operator names. For example, the AWSGlueJobOperator has been deprecated and replaced with the GlueJobOperator. To maintain compatibility, update your Airflow DAGs by replacing any deprecated or unsupported operators from previous versions with the new ones. Complete the following steps:

  1. Navigate to Amazon AWS Operators.
  2. Select the appropriate version installed in your Amazon MWAA instance (6.0.0 by default) to find a list of supported Airflow operators.
  3. Make the necessary changes in the existing DAG code and upload the modified files to the DAG location in Amazon S3.
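Before editing DAGs by hand, it can help to scan them for operators that were renamed. The following is a minimal sketch of such a scan. The mapping contains only the Glue rename mentioned above; the function name and the sample DAG text are illustrative assumptions. Matching is case-insensitive so that alternative spellings of the old name, such as AwsGlueJobOperator, are also caught.

```python
import re
from typing import Dict, List, Tuple

# Old name -> new name. Only the Glue entry comes from this post; extend the
# mapping from the provider documentation for your installed version.
RENAMED_OPERATORS: Dict[str, str] = {"AWSGlueJobOperator": "GlueJobOperator"}


def find_deprecated_operators(dag_source: str) -> List[Tuple[int, str, str]]:
    """Return (line_number, old_name, new_name) for each deprecated reference."""
    hits = []
    for lineno, line in enumerate(dag_source.splitlines(), start=1):
        for old, new in RENAMED_OPERATORS.items():
            # Word boundaries avoid matching the new name, which is a suffix of the old one.
            if re.search(rf"\b{re.escape(old)}\b", line, flags=re.IGNORECASE):
                hits.append((lineno, old, new))
    return hits


# Illustrative DAG snippet, held as text so Airflow is not needed to run the scan.
sample_dag = """\
from airflow.providers.amazon.aws.operators.glue import AwsGlueJobOperator
task = AwsGlueJobOperator(task_id="run_glue", job_name="my_job")
"""
for lineno, old, new in find_deprecated_operators(sample_dag):
    print(f"line {lineno}: replace {old} with {new}")
```

The scan only reports locations. Arguments may also have changed between provider versions, so each hit still deserves a manual check against the operator documentation.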

Orchestrate the AWS Glue job from Airflow

This section covers the details of orchestrating an AWS Glue job within Airflow DAGs. Airflow eases the development of data pipelines with dependencies between heterogeneous systems such as on-premises processes, external dependencies, other AWS services, and more.

Orchestrate CloudTrail log aggregation with AWS Glue and Amazon MWAA

In this example, we go through a use case of using Amazon MWAA to orchestrate an AWS Glue Python Shell job that persists aggregated metrics based on CloudTrail logs.

CloudTrail enables visibility into AWS API calls that are being made in your AWS account. A common use case with this data would be to gather usage metrics on principals acting on your account’s resources for auditing and regulatory needs.

As CloudTrail events are being logged, they are delivered as JSON files in Amazon S3, which aren’t ideal for analytical queries. We want to aggregate this data and persist it as Parquet files to allow for optimal query performance. As a first step, we can use Athena to query the data before doing additional aggregations in our AWS Glue job. For more information about creating an AWS Glue Data Catalog table, refer to Creating the table for CloudTrail logs in Athena using partition projection data. After we’ve explored the data via Athena and decided what metrics we want to retain in aggregate tables, we can create an AWS Glue job.

Create a CloudTrail table in Athena

First, we need to create a table in our Data Catalog that allows CloudTrail data to be queried via Athena. The following sample query creates a table with two partitions on the Region and date (called snapshot_date). Be sure to replace the placeholders for your CloudTrail bucket, AWS account ID, and CloudTrail table name:

create external table if not exists `<<<CLOUDTRAIL_TABLE_NAME>>>`(
  `eventversion` string comment 'from deserializer', 
  `useridentity` struct<type:string,principalid:string,arn:string,accountid:string,invokedby:string,accesskeyid:string,username:string,sessioncontext:struct<attributes:struct<mfaauthenticated:string,creationdate:string>,sessionissuer:struct<type:string,principalid:string,arn:string,accountid:string,username:string>>> comment 'from deserializer', 
  `eventtime` string comment 'from deserializer', 
  `eventsource` string comment 'from deserializer', 
  `eventname` string comment 'from deserializer', 
  `awsregion` string comment 'from deserializer', 
  `sourceipaddress` string comment 'from deserializer', 
  `useragent` string comment 'from deserializer', 
  `errorcode` string comment 'from deserializer', 
  `errormessage` string comment 'from deserializer', 
  `requestparameters` string comment 'from deserializer', 
  `responseelements` string comment 'from deserializer', 
  `additionaleventdata` string comment 'from deserializer', 
  `requestid` string comment 'from deserializer', 
  `eventid` string comment 'from deserializer', 
  `resources` array<struct<arn:string,accountid:string,type:string>> comment 'from deserializer', 
  `eventtype` string comment 'from deserializer', 
  `apiversion` string comment 'from deserializer', 
  `readonly` string comment 'from deserializer', 
  `recipientaccountid` string comment 'from deserializer', 
  `serviceeventdetails` string comment 'from deserializer', 
  `sharedeventid` string comment 'from deserializer', 
  `vpcendpointid` string comment 'from deserializer')
PARTITIONED BY ( 
  `region` string,
  `snapshot_date` string)
ROW FORMAT SERDE 
  'com.amazon.emr.hive.serde.CloudTrailSerde' 
STORED AS INPUTFORMAT 
  'com.amazon.emr.cloudtrail.CloudTrailInputFormat' 
OUTPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
  's3://<<<CLOUDTRAIL_BUCKET>>>/AWSLogs/<<<ACCOUNT_ID>>>/CloudTrail/'
TBLPROPERTIES (
  'projection.enabled'='true', 
  'projection.region.type'='enum',
  'projection.region.values'='us-east-2,us-east-1,us-west-1,us-west-2,af-south-1,ap-east-1,ap-south-1,ap-northeast-3,ap-northeast-2,ap-southeast-1,ap-southeast-2,ap-northeast-1,ca-central-1,eu-central-1,eu-west-1,eu-west-2,eu-south-1,eu-west-3,eu-north-1,me-south-1,sa-east-1',
  'projection.snapshot_date.format'='yyyy/MM/dd', 
  'projection.snapshot_date.interval'='1', 
  'projection.snapshot_date.interval.unit'='days', 
  'projection.snapshot_date.range'='2020/10/01,now', 
  'projection.snapshot_date.type'='date',
  'storage.location.template'='s3://<<<CLOUDTRAIL_BUCKET>>>/AWSLogs/<<<ACCOUNT_ID>>>/CloudTrail/${region}/${snapshot_date}')

Run the preceding query on the Athena console, and note the table name and AWS Glue Data Catalog database where it was created. We use these values later in the Airflow DAG code.
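
To make the partition projection settings in the table definition more concrete, the following minimal Python sketch shows how a region and a snapshot date expand into the S3 prefix described by storage.location.template. The bucket name, account ID, and helper function are hypothetical and only illustrate the mapping; they are not part of Athena or any AWS SDK.

```python
from datetime import date


def projected_location(bucket, account_id, region, day):
    """Return the S3 prefix that one (region, snapshot_date) partition maps to."""
    # CloudTrail prefixes use a year/month/day layout, for example 2023/05/01
    snapshot_date = day.strftime("%Y/%m/%d")
    return (
        f"s3://{bucket}/AWSLogs/{account_id}/CloudTrail/"
        f"{region}/{snapshot_date}"
    )


# Hypothetical values, for illustration only
location = projected_location(
    "example-cloudtrail-bucket", "111122223333", "us-east-1", date(2023, 5, 1)
)
print(location)
```

Because the partition values are computed from the table properties, a query that filters on region and snapshot_date only needs to read the matching prefixes.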

Sample AWS Glue job code

The following code is a sample AWS Glue Python Shell job that does the following:

  • Takes arguments (which we pass from our Amazon MWAA DAG) on what day’s data to process
  • Uses the AWS SDK for Pandas to run an Athena query to do the initial filtering of the CloudTrail JSON data outside AWS Glue
  • Uses Pandas to do simple aggregations on the filtered data
  • Outputs the aggregated data to the AWS Glue Data Catalog in a table
  • Uses logging during processing, which will be visible in Amazon MWAA

import awswrangler as wr
import pandas as pd
import sys
import logging
from awsglue.utils import getResolvedOptions
from datetime import datetime, timedelta

# Logging setup, redirects all logs to stdout
LOGGER = logging.getLogger()
formatter = logging.Formatter('%(asctime)s.%(msecs)03d %(levelname)s %(module)s - %(funcName)s: %(message)s')
streamHandler = logging.StreamHandler(sys.stdout)
streamHandler.setFormatter(formatter)
LOGGER.addHandler(streamHandler)
LOGGER.setLevel(logging.INFO)

LOGGER.info(f"Passed Args :: {sys.argv}")

sql_query_template = """
select
region,
useridentity.arn,
eventsource,
eventname,
useragent

from "{cloudtrail_glue_db}"."{cloudtrail_table}"
where snapshot_date='{process_date}'
and region in ('us-east-1','us-east-2')
"""

required_args = ['CLOUDTRAIL_GLUE_DB',
                'CLOUDTRAIL_TABLE',
                'TARGET_BUCKET',
                'TARGET_DB',
                'TARGET_TABLE',
                'ACCOUNT_ID']
arg_keys = [*required_args, 'PROCESS_DATE'] if '--PROCESS_DATE' in sys.argv else required_args
JOB_ARGS = getResolvedOptions(sys.argv, arg_keys)

LOGGER.info(f"Parsed Args :: {JOB_ARGS}")

# if process date was not passed as an argument, process yesterday's data
process_date = (
    JOB_ARGS['PROCESS_DATE']
    if JOB_ARGS.get('PROCESS_DATE','NONE') != "NONE" 
    else (datetime.today() - timedelta(days=1)).strftime("%Y-%m-%d") 
)

LOGGER.info(f"Taking snapshot for :: {process_date}")

RAW_CLOUDTRAIL_DB = JOB_ARGS['CLOUDTRAIL_GLUE_DB']
RAW_CLOUDTRAIL_TABLE = JOB_ARGS['CLOUDTRAIL_TABLE']
TARGET_BUCKET = JOB_ARGS['TARGET_BUCKET']
TARGET_DB = JOB_ARGS['TARGET_DB']
TARGET_TABLE = JOB_ARGS['TARGET_TABLE']
ACCOUNT_ID = JOB_ARGS['ACCOUNT_ID']

final_query = sql_query_template.format(
    process_date=process_date.replace("-","/"),
    cloudtrail_glue_db=RAW_CLOUDTRAIL_DB,
    cloudtrail_table=RAW_CLOUDTRAIL_TABLE
)

LOGGER.info(f"Running Query :: {final_query}")

raw_cloudtrail_df = wr.athena.read_sql_query(
    sql=final_query,
    database=RAW_CLOUDTRAIL_DB,
    ctas_approach=False,
    s3_output=f"s3://{TARGET_BUCKET}/athena-results",
)

raw_cloudtrail_df['ct']=1

agg_df = raw_cloudtrail_df.groupby(['arn','region','eventsource','eventname','useragent'],as_index=False).agg({'ct':'sum'})
agg_df['snapshot_date']=process_date

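# DataFrame.info() prints its summary to stdout and returns None,
# so call it directly instead of logging its return value
agg_df.info(verbose=True)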

upload_path = f"s3://{TARGET_BUCKET}/{TARGET_DB}/{TARGET_TABLE}"

if not agg_df.empty:
    LOGGER.info(f"Upload to {upload_path}")
    try:
        response = wr.s3.to_parquet(
            df=agg_df,
            path=upload_path,
            dataset=True,
            database=TARGET_DB,
            table=TARGET_TABLE,
            mode="overwrite_partitions",
            schema_evolution=True,
            partition_cols=["snapshot_date"],
            compression="snappy",
            index=False
        )
        LOGGER.info(response)
    except Exception as exc:
        LOGGER.error("Uploading to S3 failed")
        LOGGER.exception(exc)
        raise exc
else:
    LOGGER.info(f"Dataframe was empty, nothing to upload to {upload_path}")

The following are some key advantages in this AWS Glue job:

  • We use an Athena query to ensure initial filtering is done outside of our AWS Glue job. As such, a Python Shell job with minimal compute is still sufficient for aggregating a large CloudTrail dataset.
  • We ensure the analytics library-set option is turned on when creating our AWS Glue job to use the AWS SDK for Pandas library.
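
The aggregation step in the job can be tried locally on a few rows. The following sketch assumes pandas is installed and uses made-up rows in place of the Athena query result; the ARNs and values are hypothetical.

```python
import pandas as pd

# Hypothetical rows standing in for the filtered CloudTrail query result
raw_df = pd.DataFrame(
    {
        "arn": [
            "arn:aws:iam::111122223333:role/etl",
            "arn:aws:iam::111122223333:role/etl",
            "arn:aws:iam::111122223333:user/alice",
        ],
        "region": ["us-east-1", "us-east-1", "us-east-2"],
        "eventsource": ["s3.amazonaws.com"] * 3,
        "eventname": ["GetObject", "GetObject", "PutObject"],
        "useragent": ["boto3", "boto3", "aws-cli"],
    }
)

# Same pattern as the job: add a counter column, then sum it per group
raw_df["ct"] = 1
agg_df = raw_df.groupby(
    ["arn", "region", "eventsource", "eventname", "useragent"], as_index=False
).agg({"ct": "sum"})
agg_df["snapshot_date"] = "2023-05-01"

print(agg_df)
```

One design point to keep in mind: by default, pandas groupby drops rows whose key columns contain missing values. If some of your events can have an empty useragent or arn, you might want to pass dropna=False or fill those values first.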

Create an AWS Glue job

Complete the following steps to create your AWS Glue job:

  1. Copy the script in the preceding section and save it in a local file. For this post, the file is called script.py.
  2. On the AWS Glue console, choose ETL jobs in the navigation pane.
  3. Create a new job and select Python Shell script editor.
  4. Select Upload and edit an existing script and upload the file you saved locally.
  5. Choose Create.

  6. On the Job details tab, enter a name for your AWS Glue job.
  7. For IAM role, choose an existing role or create a new role that has the required permissions for Amazon S3, AWS Glue, and Athena. The role needs to query the CloudTrail table you created earlier and write to an output location.

You can use the following sample policy code. Replace the placeholders with your CloudTrail logs bucket, output table name, output AWS Glue database, output S3 bucket, CloudTrail table name, AWS Glue database containing the CloudTrail table, and your AWS account ID.

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Action": [
                "s3:List*",
                "s3:Get*"
            ],
            "Resource": [
                "arn:aws:s3:::<<<CLOUDTRAIL_LOGS_BUCKET>>>/*",
                "arn:aws:s3:::<<<CLOUDTRAIL_LOGS_BUCKET>>>*"
            ],
            "Effect": "Allow",
            "Sid": "GetS3CloudtrailData"
        },
        {
            "Action": [
                "glue:Get*",
                "glue:BatchGet*"
            ],
            "Resource": [
                "arn:aws:glue:us-east-1:<<<YOUR_AWS_ACCT_ID>>>:catalog",
                "arn:aws:glue:us-east-1:<<<YOUR_AWS_ACCT_ID>>>:database/<<<GLUE_DB_WITH_CLOUDTRAIL_TABLE>>>",
                "arn:aws:glue:us-east-1:<<<YOUR_AWS_ACCT_ID>>>:table/<<<GLUE_DB_WITH_CLOUDTRAIL_TABLE>>>/<<<CLOUDTRAIL_TABLE>>>*"
            ],
            "Effect": "Allow",
            "Sid": "GetGlueCatalogCloudtrailData"
        },
        {
            "Action": [
                "s3:PutObject*",
                "s3:Abort*",
                "s3:DeleteObject*",
                "s3:GetObject*",
                "s3:GetBucket*",
                "s3:List*",
                "s3:Head*"
            ],
            "Resource": [
                "arn:aws:s3:::<<<OUTPUT_S3_BUCKET>>>",
                "arn:aws:s3:::<<<OUTPUT_S3_BUCKET>>>/<<<OUTPUT_GLUE_DB>>>/<<<OUTPUT_TABLE_NAME>>>/*"
            ],
            "Effect": "Allow",
            "Sid": "WriteOutputToS3"
        },
        {
            "Action": [
                "glue:CreateTable",
                "glue:CreatePartition",
                "glue:UpdatePartition",
                "glue:UpdateTable",
                "glue:DeleteTable",
                "glue:DeletePartition",
                "glue:BatchCreatePartition",
                "glue:BatchDeletePartition",
                "glue:Get*",
                "glue:BatchGet*"
            ],
            "Resource": [
                "arn:aws:glue:us-east-1:<<<YOUR_AWS_ACCT_ID>>>:catalog",
                "arn:aws:glue:us-east-1:<<<YOUR_AWS_ACCT_ID>>>:database/<<<OUTPUT_GLUE_DB>>>",
                "arn:aws:glue:us-east-1:<<<YOUR_AWS_ACCT_ID>>>:table/<<<OUTPUT_GLUE_DB>>>/<<<OUTPUT_TABLE_NAME>>>*"
            ],
            "Effect": "Allow",
            "Sid": "AllowOutputToGlue"
        },
        {
            "Action": [
                "logs:CreateLogGroup",
                "logs:CreateLogStream",
                "logs:PutLogEvents"
            ],
            "Resource": "arn:aws:logs:*:*:/aws-glue/*",
            "Effect": "Allow",
            "Sid": "LogsAccess"
        },
        {
            "Action": [
                "s3:GetObject*",
                "s3:GetBucket*",
                "s3:List*",
                "s3:DeleteObject*",
                "s3:PutObject",
                "s3:PutObjectLegalHold",
                "s3:PutObjectRetention",
                "s3:PutObjectTagging",
                "s3:PutObjectVersionTagging",
                "s3:Abort*"
            ],
            "Resource": [
                "arn:aws:s3:::<<<ATHENA_RESULTS_BUCKET>>>",
                "arn:aws:s3:::<<<ATHENA_RESULTS_BUCKET>>>/*"
            ],
            "Effect": "Allow",
            "Sid": "AccessToAthenaResults"
        },
        {
            "Action": [
                "athena:StartQueryExecution",
                "athena:StopQueryExecution",
                "athena:GetDataCatalog",
                "athena:GetQueryResults",
                "athena:GetQueryExecution"
            ],
            "Resource": [
                "arn:aws:glue:us-east-1:<<<YOUR_AWS_ACCT_ID>>>:catalog",
                "arn:aws:athena:us-east-1:<<<YOUR_AWS_ACCT_ID>>>:datacatalog/AwsDataCatalog",
                "arn:aws:athena:us-east-1:<<<YOUR_AWS_ACCT_ID>>>:workgroup/primary"
            ],
            "Effect": "Allow",
            "Sid": "AllowAthenaQuerying"
        }
    ]
}
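
If you prefer to fill in the placeholders programmatically, the following sketch shows one possible approach. It assumes the <<<NAME>>> marker style used in the policy above; the helper function and the bucket name are hypothetical, and the template is shortened to a single statement.

```python
import json
import re

# Matches markers such as <<<CLOUDTRAIL_LOGS_BUCKET>>>
PLACEHOLDER = re.compile(r"<<<([A-Za-z0-9_]+)>>>")


def fill_placeholders(template, values):
    """Replace each <<<NAME>>> marker with values[NAME]; fail on unknown names."""

    def substitute(match):
        name = match.group(1)
        if name not in values:
            raise KeyError(f"No value supplied for placeholder {name}")
        return values[name]

    return PLACEHOLDER.sub(substitute, template)


# A shortened, single-statement version of the policy, used as the template
template = """
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "GetS3CloudtrailData",
      "Effect": "Allow",
      "Action": ["s3:List*", "s3:Get*"],
      "Resource": ["arn:aws:s3:::<<<CLOUDTRAIL_LOGS_BUCKET>>>/*"]
    }
  ]
}
"""

policy_text = fill_placeholders(
    template, {"CLOUDTRAIL_LOGS_BUCKET": "example-cloudtrail-bucket"}
)
policy = json.loads(policy_text)  # raises ValueError if the result is not valid JSON
print(policy["Statement"][0]["Resource"])
```

Failing on an unknown placeholder, rather than leaving it in place, helps catch a forgotten value before the policy is attached to a role.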

  8. For Python version, choose Python 3.9.
  9. Select Load common analytics libraries.
  10. For Data processing units, choose 1 DPU.
  11. Leave the other options as default or adjust as needed.
  12. Choose Save to save your job configuration.
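
The same job settings can also be expressed as input for the AWS Glue CreateJob API. The following is a minimal sketch, not the method used in this post: the job name, role ARN, and script location are hypothetical, and the library-set job parameter is our assumption for the equivalent of the Load common analytics libraries option, so verify it against the AWS Glue documentation before relying on it.

```python
def build_python_shell_job(name, role_arn, script_s3_path):
    """Build keyword arguments for boto3's glue.create_job call."""
    return {
        "Name": name,
        "Role": role_arn,
        "Command": {
            "Name": "pythonshell",  # Python Shell job type
            "ScriptLocation": script_s3_path,
            "PythonVersion": "3.9",
        },
        # Assumption: job parameter that loads the common analytics libraries
        "DefaultArguments": {"library-set": "analytics"},
        "MaxCapacity": 1.0,  # 1 DPU
    }


# Hypothetical values, for illustration only
job_definition = build_python_shell_job(
    "cloudtrail-aggregation",
    "arn:aws:iam::111122223333:role/example-glue-role",
    "s3://example-scripts-bucket/script.py",
)
print(job_definition)

# To create the job (requires boto3 and AWS credentials):
# import boto3
# boto3.client("glue").create_job(**job_definition)
```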

Configure an Amazon MWAA DAG to orchestrate the AWS Glue job

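The following code is for a DAG that can orchestrate the AWS Glue job that we created. The DAG accepts an optional process_date through the DAG run configuration, which allows backfills, and it sets verbose=True on GlueJobOperator so that the AWS Glue job logs are relayed to the Airflow task logs.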

"""Sample DAG"""
import airflow.utils.dates
from airflow.providers.amazon.aws.operators.glue import GlueJobOperator
from airflow import DAG
from datetime import timedelta

# allow backfills via DAG run parameters
process_date = '{{ dag_run.conf.get("process_date") if dag_run.conf.get("process_date") else "NONE" }}'

dag = DAG(
    dag_id = "CLOUDTRAIL_LOGS_PROCESSING",
    default_args = {
        'depends_on_past':False, 
        'start_date':airflow.utils.dates.days_ago(0),
        'retries':1,
        'retry_delay':timedelta(minutes=5)
    },
    catchup = False, # DAG-level argument; it has no effect inside default_args
    schedule_interval = None, # None for unscheduled or a cron expression - E.G. "00 12 * * 2" - at 12noon Tuesday
    dagrun_timeout = timedelta(minutes=30),
    max_active_runs = 1,
    max_active_tasks = 1 # since there is only one task in our DAG
)

## Log ingest. Assumes Glue Job is already created
glue_ingestion_job = GlueJobOperator(
    task_id="<<<some-task-id>>>",
    job_name="<<<GLUE_JOB_NAME>>>",
    script_args={
        "--ACCOUNT_ID":"<<<YOUR_AWS_ACCT_ID>>>",
        "--CLOUDTRAIL_GLUE_DB":"<<<GLUE_DB_WITH_CLOUDTRAIL_TABLE>>>",
        "--CLOUDTRAIL_TABLE":"<<<CLOUDTRAIL_TABLE>>>",
        "--TARGET_BUCKET": "<<<OUTPUT_S3_BUCKET>>>",
        "--TARGET_DB": "<<<OUTPUT_GLUE_DB>>>", # should already exist
        "--TARGET_TABLE": "<<<OUTPUT_TABLE_NAME>>>",
        "--PROCESS_DATE": process_date
    },
    region_name="us-east-1",
    dag=dag,
    verbose=True
)

glue_ingestion_job
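
To show how the process_date value travels from the DAG run configuration to the AWS Glue job, the following sketch mirrors both halves in plain Python. The two helper functions are hypothetical stand-ins: the first imitates the Jinja expression in the DAG, and the second imitates the fallback logic in the job script.

```python
from datetime import date, timedelta


def render_process_date(conf):
    """Mirror the DAG's Jinja expression: use conf['process_date'] if set, else 'NONE'."""
    value = (conf or {}).get("process_date")
    return value if value else "NONE"


def resolve_process_date(arg, today):
    """Mirror the Glue job: fall back to yesterday when the argument is 'NONE'."""
    if arg != "NONE":
        return arg
    return (today - timedelta(days=1)).strftime("%Y-%m-%d")


today = date(2023, 5, 2)

# Triggered without configuration: the job processes yesterday's data
scheduled = resolve_process_date(render_process_date({}), today)

# Triggered with {"process_date": "2023-04-15"}: the job processes that date
backfill = resolve_process_date(
    render_process_date({"process_date": "2023-04-15"}), today
)

print(scheduled)  # 2023-05-01
print(backfill)   # 2023-04-15
```

The date is passed in YYYY-MM-DD form because the job script replaces the hyphens with slashes to match the snapshot_date partition values.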

Increase observability of AWS Glue jobs in Amazon MWAA

The AWS Glue jobs write logs to Amazon CloudWatch. With the recent observability enhancements to Airflow’s Amazon provider package, these logs are now integrated with Airflow task logs. This consolidation provides Airflow users with end-to-end visibility directly in the Airflow UI, eliminating the need to search in CloudWatch or the AWS Glue console.

To use this feature, ensure the IAM role attached to the Amazon MWAA environment has the following permissions to retrieve and write the necessary logs. In the resource ARN, replace the placeholder with your Amazon MWAA environment name so that the pattern matches your environment’s log groups:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "logs:CreateLogGroup",
        "logs:CreateLogStream",
        "logs:PutLogEvents",
        "logs:GetLogEvents",
        "logs:GetLogRecord",
        "logs:DescribeLogStreams",
        "logs:FilterLogEvents",
        "logs:GetLogGroupFields",
        "logs:GetQueryResults"
      ],
      "Resource": [
        "arn:aws:logs:*:*:log-group:airflow-243-<<<Your environment name>>>-*"
      ]
    }
  ]
}

If verbose=True is set on GlueJobOperator, the AWS Glue job run logs show in the Airflow task logs. The default is False. For more information, refer to Parameters.

When enabled, the DAGs read from the AWS Glue job’s CloudWatch log stream and relay them to the Airflow DAG AWS Glue job step logs. This provides detailed insights into an AWS Glue job’s run in real time via the DAG logs. Note that AWS Glue jobs generate an output and error CloudWatch log group based on the job’s STDOUT and STDERR, respectively. All logs in the output log group and exception or error logs from the error log group are relayed into Amazon MWAA.
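
The relay idea can be sketched as a loop that pages through a CloudWatch log stream and forwards each message. This is an illustration under stated assumptions, not the Airflow provider's implementation: the function name is hypothetical, and the client is passed in so the code does not need AWS access to be read or tested. With boto3, you would pass boto3.client("logs").

```python
def relay_log_events(logs_client, log_group, log_stream, emit=print):
    """Page through one CloudWatch log stream and pass each message to emit()."""
    token = None
    count = 0
    while True:
        kwargs = {
            "logGroupName": log_group,
            "logStreamName": log_stream,
            "startFromHead": True,
        }
        if token:
            kwargs["nextToken"] = token
        response = logs_client.get_log_events(**kwargs)
        for event in response["events"]:
            emit(event["message"].rstrip())
            count += 1
        next_token = response.get("nextForwardToken")
        # The end of the stream is reached when the same token comes back
        if not next_token or next_token == token:
            return count
        token = next_token


# Example usage (requires boto3 and AWS credentials). The log group name is
# an assumption for Python Shell jobs, and the stream is the job run ID:
# import boto3
# relay_log_events(boto3.client("logs"), "/aws-glue/python-jobs/output", "<job run id>")
```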

AWS admins can now limit a support team’s access to only Airflow, making Amazon MWAA the single pane of glass on job orchestration and job health management. Previously, users needed to check AWS Glue job run status in the Airflow DAG steps and retrieve the job run identifier. They then needed to access the AWS Glue console to find the job run history, search for the job of interest using the identifier, and finally navigate to the job’s CloudWatch logs to troubleshoot.

Create the DAG

To create the DAG, complete the following steps:

  1. Save the preceding DAG code to a local .py file, replacing the indicated placeholders.

The values for your AWS account ID, AWS Glue job name, AWS Glue database with CloudTrail table, and CloudTrail table name should already be known. You can adjust the output S3 bucket, output AWS Glue database, and output table name as needed, but make sure the AWS Glue job’s IAM role that you used earlier is configured accordingly.

  2. On the Amazon MWAA console, navigate to your environment to see where the DAG code is stored.

The DAGs folder is the prefix within the S3 bucket where your DAG file should be placed.

  3. Upload your edited file there.

  4. Open the Amazon MWAA console to confirm that the DAG appears in the table.

Run the DAG

To run the DAG, complete the following steps:

  1. Choose from the following options:
    • Trigger DAG – This causes yesterday’s data to be used as the data to process
    • Trigger DAG w/ config – With this option, you can pass in a different date, potentially for backfills, which is retrieved using dag_run.conf in the DAG code and then passed into the AWS Glue job as a parameter

The following screenshot shows the additional configuration options if you choose Trigger DAG w/ config.

  2. Monitor the DAG as it runs.
  3. When the DAG is complete, open the run’s details.

On the right pane, you can view the logs, or choose Task Instance Details for a full view.

  4. View the AWS Glue job output logs in Amazon MWAA without using the AWS Glue console thanks to the GlueJobOperator verbose flag.

The AWS Glue job will have written results to the output table you specified.

  5. Query this table via Athena to confirm it was successful.

Summary

Amazon MWAA now provides a single place to track AWS Glue job status and enables you to use the Airflow console as the single pane of glass for job orchestration and health management. In this post, we walked through the steps to orchestrate AWS Glue jobs via Airflow using GlueJobOperator. With the new observability enhancements, you can seamlessly troubleshoot AWS Glue jobs in a unified experience. We also demonstrated how to upgrade your Amazon MWAA environment to a compatible version, update dependencies, and change the IAM role policy accordingly.

For more information about common troubleshooting steps, refer to Troubleshooting: Creating and updating an Amazon MWAA environment. For in-depth details of migrating to an Amazon MWAA environment, refer to Upgrading from 1.10 to 2. To learn about the open-source code changes for increased observability of AWS Glue jobs in the Airflow Amazon provider package, refer to the relay logs from AWS Glue jobs.

Finally, we recommend visiting the AWS Big Data Blog for other material on analytics, ML, and data governance on AWS.


About the Authors

Rushabh Lokhande is a Data & ML Engineer with the AWS Professional Services Analytics Practice. He helps customers implement big data, machine learning, and analytics solutions. Outside of work, he enjoys spending time with family, reading, running, and golf.

Ryan Gomes is a Data & ML Engineer with the AWS Professional Services Analytics Practice. He is passionate about helping customers achieve better outcomes through analytics and machine learning solutions in the cloud. Outside of work, he enjoys fitness, cooking, and spending quality time with friends and family.

Vishwa Gupta is a Senior Data Architect with the AWS Professional Services Analytics Practice. He helps customers implement big data and analytics solutions. Outside of work, he enjoys spending time with family, traveling, and trying new food.

Amazon SES – How to set up EasyDKIM for a new domain

Post Syndicated from Vinay Ujjini original https://aws.amazon.com/blogs/messaging-and-targeting/amazon-ses-how-to-set-up-easydkim-for-a-new-domain/

What is email authentication and why is it important?

Amazon Simple Email Service (SES) lets you reach customers confidently without an on-premises Simple Mail Transfer Protocol (SMTP) system. Amazon SES provides built-in support for email authentication protocols, including DKIM, SPF, and DMARC, which help improve the deliverability and authenticity of outgoing emails.

Email authentication is the process of verifying the authenticity of an email message to ensure that it is sent from a legitimate source and has not been tampered with during transmission. Email authentication methods use cryptographic techniques to add digital signatures or authentication headers to outgoing emails, which can be verified by email receivers to confirm the legitimacy of the email.

Email authentication helps establish a sender’s reputation as a trusted sender: when email receivers can verify that emails are legitimately sent from a sender’s domain, they have a reliable basis for trusting that domain. Email authentication involves one or more technical processes used by mail systems (sending and receiving) that make certain key information in an email message verifiable. Email authentication generates signals about the email, which can be used in decision-making processes related to spam filtering and other email handling tasks.

There are currently two widely used email authentication mechanisms: SPF (Sender Policy Framework) and DKIM (DomainKeys Identified Mail). They provide information that the receiving domain can use to verify that the sending of the message was authorized in some way by the sending domain. DKIM can also help determine that the content was not altered in transit. In addition, the DMARC (Domain-based Message Authentication, Reporting and Conformance) protocol allows sending domains to publish verifiable policies that can help receiving domains decide how best to handle messages that fail authentication by SPF and DKIM.

Email authentication protocols:

  1. SPF (Sender Policy Framework): SPF is an email authentication protocol that checks which IP addresses are authorized to send mail on behalf of the originating domain. Domain owners use SPF to tell email providers which servers are allowed to send email from their domains. This is an email validation standard that’s designed to prevent email spoofing.
  2. DKIM (DomainKeys Identified Mail): DKIM is an email authentication protocol that allows a domain to attach its identifier to a message. This asserts some level of responsibility or involvement with the message. A sequence of messages signed with the same domain name is assumed to provide a reliable base of information about mail associated with the domain name’s owner, which may feed into an evaluation of the domain’s “reputation”. DKIM uses public-key cryptography to sign an email with a private key. Recipient servers can then use a public key published in the domain’s DNS to verify that the signed parts of the email have not been modified in transit.
  3. DMARC (Domain-based Message Authentication, Reporting and Conformance): DMARC is an email authentication protocol that uses Sender Policy Framework (SPF) and DomainKeys Identified Mail (DKIM) to detect email spoofing. In order to comply with DMARC, messages must be authenticated through either SPF or DKIM, or both.
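
As a rough illustration of how a receiver might combine these signals, the following sketch parses a DMARC TXT record and applies the rule that a message passes if either SPF or DKIM authenticates it. This is a simplification under stated assumptions: the function names, the "deliver" label, and the example record are hypothetical, and a real DMARC evaluation also checks identifier alignment and other tags that are ignored here.

```python
def parse_dmarc_record(txt):
    """Split a DMARC TXT record such as 'v=DMARC1; p=reject' into a tag dictionary."""
    tags = {}
    for part in txt.split(";"):
        if "=" in part:
            key, value = part.split("=", 1)
            tags[key.strip().lower()] = value.strip()
    return tags


def dmarc_disposition(record, spf_pass, dkim_pass):
    """Return 'deliver' if either check passes, else the policy the domain published."""
    if spf_pass or dkim_pass:
        return "deliver"
    return parse_dmarc_record(record).get("p", "none")


# Hypothetical record published by the sending domain
record = "v=DMARC1; p=quarantine; rua=mailto:dmarc-reports@example.com"

print(dmarc_disposition(record, spf_pass=False, dkim_pass=True))
print(dmarc_disposition(record, spf_pass=False, dkim_pass=False))
```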

Let us dive deep into DKIM in this blog. Amazon SES provides three options for signing your messages using a DKIM signature:

  1. Easy DKIM: Amazon SES generates a public-private key pair and automatically adds a DKIM signature to every message that you send from that identity.
  2. BYODKIM (Bring Your Own DKIM): You provide your own public-private key pair, and Amazon SES adds a DKIM signature to every message that you send from that identity. See Provide your own DKIM authentication token (BYODKIM) in Amazon SES.
  3. Manually add DKIM signature: You add your own DKIM signature to email that you send using the SendRawEmail API. See Manual DKIM signing in Amazon SES.

The purpose of EasyDKIM is to simplify the process of generating DKIM keys, adding DKIM signatures to outgoing emails, and managing DKIM settings, making it easier for users to implement DKIM authentication for their email messages. Using EasyDKIM, Amazon SES aims to improve email deliverability, prevent email fraud and phishing attacks, establish sender reputation, enhance brand reputation, and comply with industry regulations or legal requirements. EasyDKIM doubles as domain verification (simplification) and it eliminates the need for customers to worry about DKIM key rotation (managed automation). By automating and simplifying the DKIM process, EasyDKIM helps users ensure the integrity and authenticity of their email communications, while reducing the risk of fraudulent activities and improving the chances of emails being delivered to recipients’ inboxes.

Setting up Easy DKIM in Amazon SES:

When you set up Easy DKIM for a domain identity, Amazon SES automatically adds a 2048-bit DKIM signature to every email that you send from that identity. You can configure EasyDKIM by using the Amazon SES console, or by using the API.

The procedure in this section is streamlined to show only the steps necessary to configure Easy DKIM on a domain identity that you’ve already created. If you haven’t yet created a domain identity or you want to see all available options for customizing a domain identity, such as using a default configuration set, custom MAIL FROM domain, and tags, see Creating a domain identity. Part of creating an Easy DKIM domain identity is configuring its DKIM-based verification, where you can either accept the Amazon SES default of 2048 bits or override the default by selecting 1024 bits.

Steps to set up Easy DKIM for a verified identity:

  1. Sign in to the AWS Management Console and open the Amazon SES console at https://console.aws.amazon.com/ses/
  2. In the navigation pane, under Configuration, choose Verified identities.
    [Figure: List of verified identities in the SES console]

  3. In the list of identities, choose an identity where the Identity type is Domain.
  4. Under the Authentication tab, in the DomainKeys Identified Mail (DKIM) container, choose Edit.
  5. In the Advanced DKIM settings container, choose the Easy DKIM button in the Identity type field.

    [Figure: DKIM settings, with Easy DKIM as the identity type, RSA_2048_BIT as the DKIM signing key length, and the Enabled check box selected under DKIM signatures]

  6. In the DKIM signing key length field, choose either RSA_2048_BIT or RSA_1024_BIT.
  7. In the DKIM signatures field, check the Enabled box.
  8. Choose Save changes.
  9. After configuring your domain identity with Easy DKIM, you must complete the verification process with your DNS provider – proceed to Verifying a DKIM domain identity with your DNS provider and follow the DNS authentication procedures for Easy DKIM.
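
The console steps above can also be approximated with the Amazon SES API v2. The following sketch assumes a boto3 sesv2 client is passed in (for example, boto3.client("sesv2")) and that the domain identity already exists. The function names are hypothetical, and the CNAME record format is an assumption, so treat the values shown in the Amazon SES console as authoritative.

```python
def enable_easy_dkim(sesv2_client, domain, key_length="RSA_2048_BIT"):
    """Switch a domain identity to Easy DKIM and turn DKIM signing on."""
    response = sesv2_client.put_email_identity_dkim_signing_attributes(
        EmailIdentity=domain,
        SigningAttributesOrigin="AWS_SES",  # AWS_SES selects Easy DKIM
        SigningAttributes={"NextSigningKeyLength": key_length},
    )
    sesv2_client.put_email_identity_dkim_attributes(
        EmailIdentity=domain, SigningEnabled=True
    )
    return response.get("DkimTokens", [])


def dkim_cname_records(domain, tokens):
    """Build (name, value) CNAME pairs to publish with your DNS provider."""
    # Assumed record format; the target domain can differ by AWS Region
    return [
        (f"{token}._domainkey.{domain}", f"{token}.dkim.amazonses.com")
        for token in tokens
    ]


# Example usage (requires boto3 and AWS credentials):
# import boto3
# tokens = enable_easy_dkim(boto3.client("sesv2"), "example.com")
# for name, value in dkim_cname_records("example.com", tokens):
#     print(name, "CNAME", value)
```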

Conclusion:

Email authentication, especially DKIM, is crucial in securing your emails, establishing sender reputation, and improving email deliverability. EasyDKIM provides a simplified and automated way to implement DKIM authentication. It removes the hassles of generating DKIM keys and managing settings, while also reducing risks and enhancing sender authenticity. By following the steps outlined in this blog post, you can easily set up EasyDKIM in Amazon SES and start using DKIM authentication for your email campaigns.

About the Author

Vinay Ujjini is an Amazon Pinpoint and Amazon Simple Email Service Worldwide Principal Specialist Solutions Architect at AWS. He has been solving customers’ omni-channel challenges for over 15 years. He is an avid sports enthusiast and in his spare time, enjoys playing tennis and cricket.

Share and query encrypted data in AWS Clean Rooms

Post Syndicated from Jonathan Herzog original https://aws.amazon.com/blogs/security/share-and-query-encrypted-data-in-aws-clean-rooms/

In this post, we’d like to introduce you to the cryptographic computing feature of AWS Clean Rooms. With AWS Clean Rooms, customers can run collaborative data-query sessions on sensitive data sets that live in different AWS accounts, and can do so without having to share, aggregate, or replicate the data. When customers also use the cryptographic computing feature, their data remains cryptographically protected even while it is being processed by an AWS Clean Rooms collaboration.

Where would AWS Clean Rooms be useful? Consider a scenario where two different insurance companies want to identify duplicate claims so they can identify potential fraud. This would be simple if they could compare their claims with each other, but they might not be able to do so due to privacy constraints.

Alternately, consider an advertising network and a client that want to measure the effectiveness of an advertising campaign. To that end, they would like to know how many of the people who saw the campaign (exposures) went on to make a purchase from the client (purchasers). However, confidentiality concerns might prevent the advertising network from sharing their list of exposures with the client or prevent the client from sharing their list of purchasers with the advertising network.

As these examples show, there can be many situations in which different organizations want to collaborate on a joint analysis of their pooled data, but cannot share their individual datasets directly. One solution to this problem is a data clean room, which is a service trusted by a collaboration’s participants to do the following:

  • Hold the data of individual parties
  • Enforce access-control rules that collaborators specify regarding their data
  • Perform analyses over the pooled data

To serve customers with these needs, AWS recently launched a new data clean-room service called AWS Clean Rooms. This service provides AWS customers with a way to collaboratively analyze data (stored in other AWS services as SQL tables) without having to replicate the data, move the data outside of the AWS Cloud, or allow their collaborators to see the data itself.

Additionally, AWS Clean Rooms provides a feature that gives customers even more control over their data: cryptographic computing. This feature allows AWS Clean Rooms to operate over data that customers encrypt themselves and that the service cannot actually read. Specifically, customers can use this feature to select which portions of their data should be encrypted and to encrypt that data themselves. Collaborators can continue to analyze that data as if it were in the clear, even though it remains encrypted while it is being processed in AWS Clean Rooms collaborations. In this way, customers can use AWS Clean Rooms to securely collaborate on data they may not have been able to share due to internal policies or regulations.

Cryptographic computing

Using the cryptographic computing feature of AWS Clean Rooms involves these steps:

  • Users create AWS Clean Rooms collaborations and set collaboration-wide encryption settings. They then invite collaborators to support the analysis process.
  • Outside of AWS Clean Rooms, those collaborators agree on a shared secret: a common, secret, cryptographic key.
  • Collaborators individually encrypt their tables outside of the AWS Cloud (typically on their own premises) using the shared secret, the collaboration ID of the intended collaboration, and the Cryptographic Computing for Clean Rooms (C3R) encryption client (which AWS provides as an open-source package). Collaborators then provide the encrypted tables to AWS Clean Rooms, just as they would have provided plaintext tables.
  • Collaborators continue to use AWS Clean Rooms for their data analysis. They impose access-control rules on their tables, submit SQL queries over the tables in the collaboration, and retrieve results.
  • These results might contain encrypted columns, and so collaborators decrypt the results by using the shared secret and the C3R encryption client.

As a result, data that enters AWS Clean Rooms in encrypted format will remain encrypted from input tables to intermediate values to result sets. AWS Clean Rooms will be unable to decrypt or read the data even while performing the queries.

Note: For those interested in the academic aspects of this process, the cryptographic computing feature of AWS Clean Rooms is based on server-aided private set intersection (PSI). Server-aided PSI allows two or more participants to submit sets of values to a server and learn which elements are found in all sets, but without (1) allowing the participants to learn anything about the other (non-shared) elements, or (2) allowing the server to learn anything about the underlying data (aside from the degrees to which the sets overlap). PSI is just one example of the field of cryptographic computing, which provides a variety of new methods by which encrypted data can be processed for various purposes and without decryption. These techniques allow our customers to use the scale and power of AWS systems on data that AWS will not be able to read. See our Cryptographic Computing webpage for more about our work in this area.

Let’s dive deeper into each new step in the process for using cryptographic computing in AWS Clean Rooms.

Key agreement. Each collaboration needs its own shared secret: a secure cryptographic secret (of at least 256 bits). Customers sometimes have a regulatory need to maintain ownership of their encryption keys. Therefore, the cryptographic computing feature supports the case where customers generate, distribute, and store their collaboration’s secret themselves. In this way, customers’ encryption keys are never stored on an AWS system.
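
As an illustration of generating such a secret outside of AWS, the following sketch uses Python's standard secrets module to produce 256 bits of randomness. The function name is hypothetical, and base64 encoding is only our assumption for making the secret easy to store and exchange; check the C3R encryption client documentation for the format it expects.

```python
import base64
import secrets


def generate_shared_secret(num_bytes=32):
    """Return a random secret of num_bytes (32 bytes = 256 bits), base64-encoded."""
    if num_bytes < 32:
        raise ValueError("Use at least 32 bytes (256 bits)")
    return base64.b64encode(secrets.token_bytes(num_bytes)).decode("ascii")


shared_secret = generate_shared_secret()

# Print only the size, never the secret itself
print(len(base64.b64decode(shared_secret)) * 8, "bits")
```

However the secret is generated, collaborators still need a secure channel to distribute it among themselves.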

Encryption. AWS Clean Rooms allows table owners to control how tables are encrypted on a column-by-column basis. In particular, each column in an encrypted table will be one of three types: cleartext, sealed, or fingerprint. These types map directly to both how columns are used in queries and how they are protected with cryptography, described as follows:

  • Cleartext columns are not cryptographically processed at all. They are copied to encrypted tables verbatim, and can be used anywhere in a SQL query.
  • Sealed columns are encrypted. The encryption scheme used (AES-GCM) is randomized, meaning that encrypting the same value multiple times yields different ciphertexts each time. This helps prevent the statistical analysis of these columns, but also means that these columns cannot be used in JOIN clauses. They can be used in SELECT clauses, however, which allows them to appear in query results.
  • Fingerprint columns are hashed using the Hash-based Message Authentication Code (HMAC) algorithm. There is no way to decrypt these values, and therefore no reason for them to appear in the SELECT clause of a query. They can, however, be used in JOIN clauses: HMAC will map a given value to the same fingerprint every time, meaning that JOINs will be able to unify common values across different fingerprint columns.
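To make the fingerprint behavior concrete, here is a minimal sketch of an equality JOIN over HMAC fingerprints. It assumes HMAC with SHA-256 and base64 output, and uses made-up tables and a placeholder secret; the C3R client's actual construction and encoding may differ.

```python
import base64
import hashlib
import hmac

def fingerprint(secret: bytes, value: str) -> str:
    """Deterministic keyed hash of a value, base64-encoded."""
    digest = hmac.new(secret, value.encode("utf-8"), hashlib.sha256).digest()
    return base64.b64encode(digest).decode("ascii")

secret = b"\x01" * 32  # placeholder 256-bit secret, for illustration only

# Two collaborators fingerprint their own email columns independently.
table_a = {fingerprint(secret, email): purchases
           for email, purchases in [("ann@example.com", 3), ("bob@example.com", 5)]}
table_b = {fingerprint(secret, email): segment
           for email, segment in [("bob@example.com", "gold"), ("cat@example.com", "new")]}

# An equality JOIN on the fingerprints finds the shared value
# without either table containing a plaintext email.
joined = [(table_a[fp], table_b[fp]) for fp in table_a.keys() & table_b.keys()]
print(joined)  # [(5, 'gold')]
```

The same value always maps to the same fingerprint under a given secret, which is what makes the JOIN work and also what makes the statistical concerns discussed next possible.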

Encryption settings. This last point—that fingerprint values will always map a given plaintext value to the same fingerprint—might give pause to some readers. If this is true, won’t the encrypted table be vulnerable to statistical analysis? That is absolutely correct: it will. For this reason, users might wish to set collaboration-wide encryption settings to control these forms of analysis.

To see how statistical analysis might be a concern, imagine a table where one fingerprint column is named US_State. In this case, a simple frequency analysis will reverse-engineer the plaintext values relatively quickly: the most common fingerprint is almost certain to be “California”, followed by “Texas”, “Florida”, and so on. Also, imagine that the same table has another fingerprint column called US_City, and that a given fingerprint appears in both columns. In that case, the fingerprint in question is almost certain to be “New York”. If a row has a fingerprint in the US_City column but a NULL in the US_State column, furthermore, it’s very likely that the fingerprint is for “District of Columbia”. And finally, imagine that the table has a cleartext column called Time_Zone. In this case, values of “HST” (Hawaii standard time) or “AKST” (Alaska standard time) reveal the value in the US_State column regardless of the cryptography.
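The frequency analysis described above can be sketched in a few lines. The column contents, the placeholder secret, and the use of HMAC with SHA-256 are all assumptions for illustration; the point is only that the attacker never needs the secret.

```python
from collections import Counter
import hashlib
import hmac

def fingerprint(secret: bytes, value: str) -> str:
    return hmac.new(secret, value.encode("utf-8"), hashlib.sha256).hexdigest()

secret = b"\x02" * 32  # placeholder secret; the attacker never sees it

# Made-up column in which row counts loosely follow state population.
plaintext_column = ["California"] * 6 + ["Texas"] * 4 + ["Florida"] * 3 + ["Vermont"]
fingerprint_column = [fingerprint(secret, v) for v in plaintext_column]

# The attacker sees only fingerprints, but can rank them by frequency
# and line them up against a public population ranking.
ranked = [fp for fp, _ in Counter(fingerprint_column).most_common()]
public_ranking = ["California", "Texas", "Florida", "Vermont"]
guesses = dict(zip(ranked, public_ranking))

correct = sum(guesses[fp] == v for fp, v in zip(fingerprint_column, plaintext_column))
print(f"{correct}/{len(plaintext_column)} rows recovered")  # 14/14 rows recovered
```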

Not all datasets will be vulnerable to these kinds of statistical analysis, but some will. Only customers can determine which types of analysis may reveal their data and which may not. Because of this, the cryptographic computing feature allows the customer to decide which protections will be needed. At the time of collaboration creation, that is, the creator of the AWS Clean Rooms collaboration can configure the following collaboration-wide encryption settings:

  • Whether or not fingerprint columns can contain duplicate plaintext values (addressing the “California” example)
  • Whether or not fingerprint columns with different names should fingerprint values in the same way (addressing the “New York” example)
  • Whether or not NULL values in the plaintext table should be left as NULL in the encrypted table (addressing the “District of Columbia” example)
  • Whether or not encrypted tables should be allowed to have cleartext columns at all (addressing the time zone example)

Security is maximized when all of these options are set to “no,” but each “no” will limit the queries that C3R will be able to support. For example, the choice of whether or not encrypted tables should be allowed to have cleartext columns will determine which WHERE clauses will be supported: If cleartext columns are not supported, then the Time_Zone column must be cryptographically processed, meaning that the clause WHERE Time_Zone='EST' will not act as intended. There might be reasons to set these options to “yes” in order to enable a wider variety of queries, which we discuss in the Query behavior section later in this post.

Decryption. AWS Clean Rooms will write query results to an Amazon Simple Storage Service (Amazon S3) bucket. The recipient copies these results from the bucket to some on-premises storage and then runs the C3R encryption client. The client will find encrypted elements of the output and decrypt them. Note that the client can only decrypt elements from sealed columns. If the output contains elements from a fingerprint column, the client will warn you, but will also leave these elements untouched, as cryptographic fingerprints can’t be decrypted.

Having finished our overview, let’s return to the discussion regarding how encryption can affect the behavior of queries.

Query behavior

Implicit in the discussion so far is something worth calling out explicitly: AWS Clean Rooms runs queries over the data that is provided to it. If the data given to AWS Clean Rooms is encrypted, therefore, queries will be run on the ciphertexts and not the plaintexts. This will not affect the results returned, so long as the columns are used for their intended purposes:

  • Fingerprint columns are used in JOIN clauses
  • Sealed columns are used in SELECT clauses

(Cleartext columns can be used anywhere.) Queries might produce unexpected results, however, if the columns are used outside of their intended purposes:

  • Sometimes queries will fail when they would have succeeded on the plaintext. For example, ciphertexts and fingerprints will be string values, even if the original plaintext values were another type. Therefore, SUM() or AVG() calls on fingerprint or sealed columns will yield errors even if the corresponding plaintext columns were numeric.
  • Sometimes queries will omit results that would have been found by querying the plaintext. For example, attempting to JOIN on sealed columns will yield empty result sets: no two ciphertexts will be the same, even if they encrypt the same plaintext value. (Also, performing a JOIN on fingerprint columns with different names will exhibit the same behavior, if the collaboration-wide encryption settings specified that fingerprint columns of different names should fingerprint values differently.)
  • Sometimes results will include rows that would not be found by querying the plaintext. As mentioned, ciphertexts and fingerprints will be string values—base64 encodings of random-looking bytes, specifically. This means that a clause such as WHERE ‘US_State’ CONTAINS ‘CA’ will match some ciphertexts or fingerprints even when they would not match the plaintext.

To avoid these issues, fingerprint and sealed columns should only be used for their intended purposes (JOIN and SELECT clauses, respectively).
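Two of these failure modes can be reproduced outside of SQL. The sketch below stands in for a fingerprint column with base64-encoded HMAC-SHA-256 values (an assumption, as are the sample numbers) and shows that aggregation fails on strings and that a substring filter tests the encoding rather than the plaintext.

```python
import base64
import hashlib
import hmac

def fingerprint(secret: bytes, value: str) -> str:
    digest = hmac.new(secret, value.encode("utf-8"), hashlib.sha256).digest()
    return base64.b64encode(digest).decode("ascii")

secret = b"\x03" * 32  # placeholder secret
order_totals = [10, 25, 40]
fingerprinted = [fingerprint(secret, str(v)) for v in order_totals]

# Aggregating the plaintext works, but the fingerprints are strings.
print(sum(order_totals))
try:
    sum(fingerprinted)
except TypeError as err:
    print("SUM over fingerprints fails:", err)

# A substring filter inspects base64 text, not the plaintext, so any
# match is an accident of the encoding.
accidental = [fp for fp in fingerprinted if "CA" in fp]
print(len(accidental), "fingerprints happen to contain 'CA'")
```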

Conclusion

In this blog post, you have learned how AWS Clean Rooms can help you harness the power of AWS services to query and analyze your most-sensitive data. By using cryptographic computing, you can work with collaborators to perform joint analyses over pooled data without sharing your “raw” data with each other—or with AWS. If you believe that you can benefit from cryptographic computing (in AWS Clean Rooms or elsewhere), we’d like to hear from you. Please contact us with any questions or feedback. Also, we invite you to learn more about AWS Clean Rooms (including its use of cryptographic computing). Finally, the C3R client is open source, and can be downloaded from its GitHub page.

 
If you have feedback about this post, submit comments in the Comments section below. If you have questions about this post, contact AWS Support.

Want more AWS Security news? Follow us on Twitter.

Jonathan Herzog

Jonathan is a Principal Security Engineer in AWS Cryptography and has worked in cryptography for 25 years. He received his PhD in crypto from MIT, and has developed cryptographic systems for the US Air Force, the National Security Agency, Akamai Technologies, and (now) Amazon.

How Zoom implemented streaming log ingestion and efficient GDPR deletes using Apache Hudi on Amazon EMR

Post Syndicated from Sekar Srinivasan original https://aws.amazon.com/blogs/big-data/how-zoom-implemented-streaming-log-ingestion-and-efficient-gdpr-deletes-using-apache-hudi-on-amazon-emr/

In today’s digital age, logging is a critical aspect of application development and management, but efficiently managing logs while complying with data protection regulations can be a significant challenge. Zoom, in collaboration with the AWS Data Lab team, developed an innovative architecture to overcome these challenges and streamline their logging and record deletion processes. In this post, we explore the architecture and the benefits it provides for Zoom and its users.

Application log challenges: Data management and compliance

Application logs are an essential component of any application; they provide valuable information about the usage and performance of the system. These logs are used for a variety of purposes, such as debugging, auditing, performance monitoring, business intelligence, system maintenance, and security. However, although these application logs are necessary for maintaining and improving the application, they also pose an interesting challenge. These application logs may contain personally identifiable data, such as user names, email addresses, IP addresses, and browsing history, which creates a data privacy concern.

Laws such as the General Data Protection Regulation (GDPR) and the California Consumer Privacy Act (CCPA) require organizations to retain application logs for a specific period of time. The exact length of time required for data storage varies depending on the specific regulation and the type of data being stored. The reason for these data retention periods is to ensure that companies aren’t keeping personal data longer than necessary, which could increase the risk of data breaches and other security incidents. This also helps ensure that companies aren’t using personal data for purposes other than those for which it was collected, which could be a violation of privacy laws. These laws also give individuals the right to request the deletion of their personal data, also known as the “right to be forgotten.” Individuals have the right to have their personal data erased, without undue delay.

So, on one hand, organizations need to collect application log data to ensure the proper functioning of their services, and keep the data for a specific period of time. But on the other hand, they may receive requests from individuals to delete their personal data from the logs. This creates a balancing act for organizations because they must comply with both data retention and data deletion requirements.

This issue becomes increasingly challenging for larger organizations that operate in multiple countries and states, because each country and state may have their own rules and regulations regarding data retention and deletion. For example, the Personal Information Protection and Electronic Documents Act (PIPEDA) in Canada and the Australian Privacy Act in Australia are similar laws to GDPR, but they may have different retention periods or different exceptions. Therefore, organizations big or small must navigate this complex landscape of data retention and deletion requirements, while also ensuring that they are in compliance with all applicable laws and regulations.

Zoom’s initial architecture

During the COVID-19 pandemic, the use of Zoom skyrocketed as more and more people were asked to work and attend classes from home. The company had to rapidly scale its services to accommodate the surge and worked with AWS to deploy capacity across most Regions globally. With the sudden increase in the number of application endpoints, Zoom had to rapidly evolve its log analytics architecture, and it worked with the AWS Data Lab team to quickly prototype and deploy an architecture for its compliance use case.

At Zoom, the data ingestion throughput and performance needs are very stringent. Data had to be ingested from several thousand application endpoints that produced over 30 million messages every minute, resulting in over 100 TB of log data per day. The existing ingestion pipeline consisted of writing the data to Apache Hadoop HDFS storage through Apache Kafka first and then running daily jobs to move the data to persistent storage. This took several hours while also slowing the ingestion and creating the potential for data loss. Scaling the architecture was also an issue because HDFS data would have to be moved around whenever nodes were added or removed. Furthermore, transactional semantics on billions of records were necessary to help meet compliance-related data delete requests, and the existing architecture of daily batch jobs was operationally inefficient.

It was at this time, through conversations with the AWS account team, that the AWS Data Lab team got involved to assist in building a solution for Zoom’s hyper-scale.

Solution overview

The AWS Data Lab offers accelerated, joint engineering engagements between customers and AWS technical resources to create tangible deliverables that accelerate data, analytics, artificial intelligence (AI), machine learning (ML), serverless, and container modernization initiatives. The Data Lab has three offerings: the Build Lab, the Design Lab, and Resident Architect. During the Build and Design Labs, AWS Data Lab Solutions Architects and AWS experts supported Zoom specifically by providing prescriptive architectural guidance, sharing best practices, building a working prototype, and removing technical roadblocks to help meet their production needs.

Zoom and the AWS team (collectively referred to as “the team” going forward) identified two major workflows for data ingestion and deletion.

Data ingestion workflow

The following diagram illustrates the data ingestion workflow.

Data Ingestion Workflow

The team needed to quickly populate millions of Kafka messages in the dev/test environment to achieve this. To expedite the process, we (the team) opted to use Amazon Managed Streaming for Apache Kafka (Amazon MSK), which makes it simple to ingest and process streaming data in real time, and we were up and running in under a day.

To generate test data that resembled production data, the AWS Data Lab team created a custom Python script that evenly populated over 1.2 billion messages across several Kafka partitions. To match the production setup in the development account, we had to increase the cloud quota limit via a support ticket.

We used Amazon MSK and the Spark Structured Streaming capability in Amazon EMR to ingest and process the incoming Kafka messages with high throughput and low latency. Specifically, we inserted the data from the source into EMR clusters at a maximum incoming rate of 150 million Kafka messages every 5 minutes, with each Kafka message holding 7–25 log data records.
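For a sense of scale, the figures above can be converted to per-second rates. This is plain arithmetic on the numbers quoted in this post; no additional measurements are assumed.

```python
messages_per_batch = 150_000_000      # maximum incoming rate per 5-minute window
batch_seconds = 5 * 60
records_per_message = (7, 25)         # each Kafka message holds 7 to 25 log records

messages_per_second = messages_per_batch / batch_seconds
records_per_second = tuple(messages_per_second * r for r in records_per_message)

print(f"{messages_per_second:,.0f} messages/s")
print(f"{records_per_second[0]:,.0f} to {records_per_second[1]:,.0f} records/s")
# 500,000 messages/s
# 3,500,000 to 12,500,000 records/s
```

The 500,000 messages per second figure is consistent with the 30 million messages per minute mentioned earlier.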

To store the data, we chose to use Apache Hudi as the table format. We opted for Hudi because it’s an open-source data management framework that provides record-level insert, update, and delete capabilities on top of an immutable storage layer like Amazon Simple Storage Service (Amazon S3). Additionally, Hudi is optimized for handling large datasets and works well with Spark Structured Streaming, which was already being used at Zoom.

After 150 million messages were buffered, we processed the messages using Spark Structured Streaming on Amazon EMR and wrote the data into Amazon S3 in Apache Hudi-compatible format every 5 minutes. We first flattened the message array, creating a single record from the nested array of messages. Then we added a unique key, known as the Hudi record key, to each message. This key allows Hudi to perform record-level insert, update, and delete operations on the data. We also extracted the field values, including the Hudi partition keys, from incoming messages.
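The flattening and keying step might look roughly like the following. This is a plain-Python sketch rather than the Spark job itself; the field names (logs, ts, user_id, body), the UUID-based key, and the table name are hypothetical, because the post does not describe the message schema or how the record key was derived. The Hudi option names are standard Hudi write configurations.

```python
import uuid
from typing import Iterator

def flatten(kafka_message: dict) -> Iterator[dict]:
    """Yield one flat record per entry in the message's nested log array."""
    for entry in kafka_message["logs"]:        # "logs" is a hypothetical field name
        yield {
            "record_key": str(uuid.uuid4()),    # assumed way to build a unique record key
            "event_date": entry["ts"][:10],     # hypothetical partition field (YYYY-MM-DD)
            "user_id": entry["user_id"],
            "body": entry["body"],
        }

message = {"logs": [
    {"ts": "2023-01-15T10:00:00Z", "user_id": "u1", "body": "joined"},
    {"ts": "2023-01-15T10:00:02Z", "user_id": "u2", "body": "left"},
]}
records = list(flatten(message))

# Hudi write options that name the record key and partition fields.
hudi_options = {
    "hoodie.table.name": "app_logs",  # hypothetical table name
    "hoodie.datasource.write.recordkey.field": "record_key",
    "hoodie.datasource.write.partitionpath.field": "event_date",
    "hoodie.datasource.write.operation": "upsert",
}
print(len(records), records[0]["event_date"])  # 2 2023-01-15
```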

This architecture allowed end-users to query the data stored in Amazon S3 using Amazon Athena with the AWS Glue Data Catalog or using Apache Hive and Presto.

Data deletion workflow

The following diagram illustrates the data deletion workflow.

Data Deletion Workflow

Our architecture allowed for efficient data deletions. To help comply with the customer-initiated data retention policy for GDPR deletes, scheduled jobs ran daily to identify the data to be deleted in batch mode.

We then spun up a transient EMR cluster to run the GDPR upsert job to delete the records. The data was stored in Amazon S3 in Hudi format, and Hudi’s built-in index allowed us to efficiently delete records using bloom filters and file ranges. Because only those files that contained the record keys needed to be read and rewritten, it only took about 1–2 minutes to delete 1,000 records out of the 1 billion records, which had previously taken hours to complete as entire partitions were read.
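To illustrate why only a few files need to be rewritten, here is a simplified model of pruning by key range and Bloom filter. It is not Hudi's implementation; the class names, filter sizes, and file names are invented for the sketch.

```python
import hashlib

class BloomFilter:
    """Tiny Bloom filter: no false negatives, occasional false positives."""

    def __init__(self, size_bits: int = 1024, num_hashes: int = 3):
        self.size_bits = size_bits
        self.num_hashes = num_hashes
        self.bits = 0

    def _positions(self, key: str):
        for i in range(self.num_hashes):
            digest = hashlib.sha256(f"{i}:{key}".encode("utf-8")).digest()
            yield int.from_bytes(digest[:8], "big") % self.size_bits

    def add(self, key: str) -> None:
        for pos in self._positions(key):
            self.bits |= 1 << pos

    def might_contain(self, key: str) -> bool:
        return all((self.bits >> pos) & 1 for pos in self._positions(key))


class FileStats:
    """Per-file metadata: min/max record key plus a Bloom filter of keys."""

    def __init__(self, name: str, keys: list):
        self.name = name
        self.min_key, self.max_key = min(keys), max(keys)
        self.bloom = BloomFilter()
        for key in keys:
            self.bloom.add(key)

    def may_hold(self, key: str) -> bool:
        # Cheap range check first, then the Bloom filter.
        in_range = self.min_key <= key <= self.max_key
        return in_range and self.bloom.might_contain(key)


files = [
    FileStats("file-a.parquet", ["k001", "k002", "k003"]),
    FileStats("file-b.parquet", ["k101", "k102", "k103"]),
    FileStats("file-c.parquet", ["k201", "k202", "k203"]),
]

keys_to_delete = ["k102"]
to_rewrite = {f.name for f in files for k in keys_to_delete if f.may_hold(k)}
print(to_rewrite)  # {'file-b.parquet'}
```

Only the candidate files are then opened and rewritten, instead of every file in the partition.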

Overall, our solution enabled efficient deletion of data, which provided an additional layer of data security that was critical for Zoom, in light of its GDPR requirements.

Architecting to optimize scale, performance, and cost

In this section, we share the following strategies Zoom took to optimize scale, performance, and cost:

  • Optimizing ingestion
  • Optimizing throughput and Amazon EMR utilization
  • Decoupling ingestion and GDPR deletion using EMRFS
  • Efficient deletes with Apache Hudi
  • Optimizing for low-latency reads with Apache Hudi
  • Monitoring

Optimizing ingestion

To keep the storage in Kafka lean and optimal, as well as to get a real-time view of data, we created a Spark job to read incoming Kafka messages in batches of 150 million messages and wrote to Amazon S3 in Hudi-compatible format every 5 minutes. Even during the initial stages of the iteration, when we hadn’t started scaling and tuning yet, we were able to successfully load all Kafka messages consistently under 2.5 minutes using the Amazon EMR runtime for Apache Spark.

Optimizing throughput and Amazon EMR utilization

We launched a cost-optimized EMR cluster and switched from uniform instance groups to using EMR instance fleets. We chose instance fleets because we needed the flexibility to use Spot Instances for task nodes and wanted to diversify the risk of running out of capacity for a specific instance type in our Availability Zone.

We started experimenting with test runs by first changing the number of Kafka partitions from 400 to 1,000, and then changing the number of task nodes and instance types. Based on the results of the run, the AWS team came up with the recommendation to use Amazon EMR with three core nodes (r5.16xlarge (64 vCPUs each)) and 18 task nodes using Spot fleet instances (a combination of r5.16xlarge (64 vCPUs), r5.12xlarge (48 vCPUs), r5.8xlarge (32 vCPUs)). These recommendations helped Zoom to reduce their Amazon EMR costs by more than 80% while meeting their desired performance goals of ingesting 150 million Kafka messages under 5 minutes.

Decoupling ingestion and GDPR deletion using EMRFS

A well-known benefit of separation of storage and compute is that you can scale the two independently. But a not-so-obvious advantage is that you can decouple continuous workloads from sporadic workloads. Previously data was stored in HDFS. Resource-intensive GDPR delete jobs and data movement jobs would compete for resources with the stream ingestion, causing a backlog of more than 5 hours in upstream Kafka clusters, which was close to filling up the Kafka storage (which only had 6 hours of data retention) and potentially causing data loss. Offloading data from HDFS to Amazon S3 allowed us the freedom to launch independent transient EMR clusters on demand to perform data deletion, helping to ensure that the ongoing data ingestion from Kafka into Amazon EMR is not starved for resources. This enabled the system to ingest data every 5 minutes and complete each Spark Streaming read in 2–3 minutes. Another side effect of using EMRFS is a cost-optimized cluster, because we removed reliance on Amazon Elastic Block Store (Amazon EBS) volumes for over 300 TB storage that was used for three copies (including two replicas) of HDFS data. We now pay for only one copy of the data in Amazon S3, which provides 11 9s of durability and is relatively inexpensive storage.

Efficient deletes with Apache Hudi

What about the conflict between ingest writes and GDPR deletes when running concurrently? This is where the power of Apache Hudi stands out.

Apache Hudi provides a table format for data lakes with transactional semantics that enables the separation of ingestion workloads and updates when run concurrently. The system was able to consistently delete 1,000 records in less than a minute. There were some limitations in concurrent writes in Apache Hudi 0.7.0, but the Amazon EMR team quickly addressed this by back-porting Apache Hudi 0.8.0, which supports optimistic concurrency control, to the current (at the time of the AWS Data Lab collaboration) Amazon EMR 6.4 release. This allowed for a quick transition to the new version with minimal additional testing. It also enabled us to quickly query the data directly using Athena, without having to spin up a cluster to run ad hoc queries, as well as to query the data using Presto, Trino, and Hive. The decoupling of the storage and compute layers provided the flexibility to not only query data across different EMR clusters, but also delete data using a completely independent transient cluster.
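For readers who want to try optimistic concurrency control, the writer options below show one possible setup. The option names are standard Hudi configurations (available from 0.8.0), but the choice of a ZooKeeper-based lock provider and all host, key, and path values are assumptions; the post does not say which lock provider Zoom used.

```python
# Hudi writer options for optimistic concurrency control (Hudi 0.8.0+).
# The ZooKeeper host, lock key, and base path are placeholders.
occ_options = {
    "hoodie.write.concurrency.mode": "optimistic_concurrency_control",
    "hoodie.cleaner.policy.failed.writes": "LAZY",
    "hoodie.write.lock.provider":
        "org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider",
    "hoodie.write.lock.zookeeper.url": "<zookeeper_host>",
    "hoodie.write.lock.zookeeper.port": "2181",
    "hoodie.write.lock.zookeeper.lock_key": "<table_name>",
    "hoodie.write.lock.zookeeper.base_path": "/hudi/locks",
}

def with_occ(writer_options: dict) -> dict:
    """Merge job-specific options with the shared concurrency settings."""
    return {**writer_options, **occ_options}

# Both writers target the same table and share the same lock settings.
ingest_options = with_occ({"hoodie.datasource.write.operation": "upsert"})
gdpr_delete_options = with_occ({"hoodie.datasource.write.operation": "delete"})
```

Each job would pass its merged dictionary to the Spark writer, so that the streaming ingest and the transient delete cluster coordinate through the same lock.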

Optimizing for low-latency reads with Apache Hudi

To optimize for low-latency reads with Apache Hudi, we needed to address the issue of too many small files being created within Amazon S3 due to the continuous streaming of data into the data lake.

We utilized Apache Hudi’s features to tune file sizes for optimal querying. Specifically, we reduced the degree of parallelism in Hudi from the default value of 1,500 to a lower number. Parallelism refers to the number of threads used to write data to Hudi; by reducing it, we were able to create larger files that were more optimal for querying.

Because we needed to optimize for high-volume streaming ingestion, we chose to implement the merge on read table type (instead of copy on write) for our workload. This table type allowed us to quickly ingest the incoming data into delta files in row format (Avro) and asynchronously compact the delta files into columnar Parquet files for fast reads. To do this, we ran the Hudi compaction job in the background. Compaction is the process of merging row-based delta files to produce new versions of columnar files. Because the compaction job would use additional compute resources, we adjusted the degree of parallelism for insertion to a lower value of 1,000 to account for the additional resource usage. This adjustment allowed us to create larger files without sacrificing performance throughput.
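The settings discussed here correspond to a handful of Hudi write options, sketched below as a Python dictionary. The insert parallelism of 1,000 comes from this post; the file-size values are illustrative assumptions, not Zoom's actual configuration.

```python
MB = 1024 * 1024

mor_options = {
    # Write row-based delta files first, compact to Parquet later.
    "hoodie.datasource.write.table.type": "MERGE_ON_READ",
    # Lower insert parallelism, as described above.
    "hoodie.insert.shuffle.parallelism": "1000",
    # Let compaction run asynchronously alongside the streaming writer.
    "hoodie.datasource.compaction.async.enable": "true",
    # Illustrative file-size targets (assumed values).
    "hoodie.parquet.max.file.size": str(128 * MB),
    "hoodie.parquet.small.file.limit": str(100 * MB),
}

for name, value in mor_options.items():
    print(f"{name}={value}")
```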

Overall, our approach to optimizing for low-latency reads with Apache Hudi allowed us to better manage file sizes and improve the overall performance of our data lake.

Monitoring

The team monitored MSK clusters with Prometheus (an open-source monitoring tool). Additionally, we showcased how to monitor Spark streaming jobs using Amazon CloudWatch metrics. For more information, refer to Monitor Spark streaming applications on Amazon EMR.

Outcomes

The collaboration between Zoom and the AWS Data Lab demonstrated significant improvements in data ingestion, processing, storage, and deletion using an architecture with Amazon EMR and Apache Hudi. One key benefit of the architecture was a reduction in infrastructure costs, which was achieved through the use of cloud-native technologies and the efficient management of data storage. Another benefit was an improvement in data management capabilities.

We showed that the costs of EMR clusters can be reduced by about 82% while bringing the storage costs down by about 90% compared to the prior HDFS-based architecture. All of this while making the data available in the data lake within 5 minutes of ingestion from the source. We also demonstrated that data deletions from a data lake containing multiple petabytes of data can be performed much more efficiently. With our optimized approach, we were able to delete approximately 1,000 records in just 1–2 minutes, as compared to the previously required 3 hours or more.

Conclusion

In conclusion, the log analytics process, which involves collecting, processing, storing, analyzing, and deleting log data from various sources such as servers, applications, and devices, is critical to aid organizations in working to meet their service resiliency, security, performance monitoring, troubleshooting, and compliance needs, such as GDPR.

This post shared what Zoom and the AWS Data Lab team have accomplished together to solve critical data pipeline challenges, and Zoom has extended the solution further to optimize extract, transform, and load (ETL) jobs and resource efficiency. However, you can also use the architecture patterns presented here to quickly build cost-effective and scalable solutions for other use cases. Please reach out to your AWS team for more information or contact Sales.


About the Authors

Sekar Srinivasan is a Sr. Specialist Solutions Architect at AWS focused on Big Data and Analytics. Sekar has over 20 years of experience working with data. He is passionate about helping customers build scalable solutions modernizing their architecture and generating insights from their data. In his spare time he likes to work on non-profit projects focused on underprivileged children’s education.

Chandra Dhandapani is a Senior Solutions Architect at AWS, where he specializes in creating solutions for customers in Analytics, AI/ML, and Databases. He has a lot of experience in building and scaling applications across different industries including Healthcare and Fintech. Outside of work, he is an avid traveler and enjoys sports, reading, and entertainment.

Amit Kumar Agrawal is a Senior Solutions Architect at AWS, based out of San Francisco Bay Area. He works with large strategic ISV customers to architect cloud solutions that address their business challenges. During his free time he enjoys exploring the outdoors with his family.

Viral Shah is an Analytics Sales Specialist who has worked with AWS for 5 years, helping customers to be successful in their data journey. He has over 20 years of experience working with enterprise customers and startups, primarily in the data and database space. He loves to travel and spend quality time with his family.

Perform secure database write-backs with Amazon QuickSight

Post Syndicated from Srikanth Baheti original https://aws.amazon.com/blogs/big-data/perform-secure-database-write-backs-with-amazon-quicksight/

Amazon QuickSight is a scalable, serverless, machine learning (ML)-powered business intelligence (BI) solution that makes it easy to connect to your data, create interactive dashboards, get access to ML-enabled insights, and share visuals and dashboards with tens of thousands of internal and external users, either within QuickSight itself or embedded into any application.

A write-back is the ability to update a data mart, data warehouse, or any other database backend from within BI dashboards and analyze the updated data in near-real time within the dashboard itself. In this post, we show how to perform secure database write-backs with QuickSight.

Use case overview

To demonstrate how to enable a write-back capability with QuickSight, let’s consider a fictional company, AnyCompany Inc. AnyCompany is a professional services firm that specializes in providing workforce solutions to their customers. AnyCompany determined that running workloads in the cloud to support its growing global business needs is a competitive advantage and uses the cloud to host all its workloads. AnyCompany decided to enhance the way its branches provide quotes to its customers. Currently, the branches generate customer quotes manually, and as a first step in this innovation journey, AnyCompany is looking to develop an enterprise solution for customer quote generation with the capability to dynamically apply local pricing data at the time of quote generation.

AnyCompany currently uses Amazon Redshift as their enterprise data warehouse platform and QuickSight as their BI solution.

Building a new solution comes with the following challenges:

  • AnyCompany wants a solution that is easy to build and maintain, and they don’t want to invest in building a separate user interface.
  • AnyCompany wants to extend the capabilities of their existing QuickSight BI dashboard to also enable quote generation and quote acceptance. This will simplify feature rollouts because their employees already use QuickSight dashboards and enjoy the easy-to-use interface that QuickSight provides.
  • AnyCompany wants to store the quote negotiation history that includes generated, reviewed, and accepted quotes.
  • AnyCompany wants to build a new dashboard with quote history data for analysis and business insights.

This post goes through the steps to enable write-back functionality to Amazon Redshift from QuickSight. Note that traditional BI tools are read-only, with few or no options to update source data.

Solution overview

This solution uses the following AWS services:

  • Amazon API Gateway – Hosts and secures the write-back REST API that will be invoked by QuickSight
  • AWS Lambda – Runs the compute function required to generate the hash and a second function to securely perform the write-back
  • Amazon QuickSight – Offers BI dashboards and quote generation capabilities
  • Amazon Redshift – Stores quotes, prices, and other relevant datasets
  • AWS Secrets Manager – Stores and manages keys to sign hashes (message digest)

Although this solution uses Amazon Redshift as the data store, a similar approach can be implemented with any database that supports creating user-defined functions (UDFs) that can invoke Lambda.

The following figure shows the workflow to perform write-backs from QuickSight.

The first step in the solution is to generate a hash or a message digest of the set of attributes in Amazon Redshift by invoking a Lambda function. This step prevents request tampering. To generate a hash, Amazon Redshift invokes a scalar Lambda UDF. The hashing mechanism used here is the popular BLAKE2 function (available in the Python library hashlib). To further secure the hash, keyed hashing is used, which is a faster and simpler alternative to hash-based message authentication code (HMAC). This key is generated and stored by Secrets Manager and should be accessible only to allowed applications. After the secure hash is generated, it’s returned to Amazon Redshift and combined in an Amazon Redshift view.

Writing the generated quote back to Amazon Redshift is performed by the write-back Lambda function, and an API Gateway REST API endpoint is created to secure and pass requests to the write-back function. The write-back function performs the following actions:

  1. Generate the hash based on the API input parameters received from QuickSight.
  2. Sign the hash by applying the key from Secrets Manager.
  3. Compare the generated hash with the hash received from the input parameters using the compare_digest function available in Python's hmac module.
  4. Upon successful validation, write the record to the quote submission table in Amazon Redshift.
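The validation logic in these steps can be sketched with the standard library alone. The key, the quote attributes, and the function names below are placeholders; in the solution the key comes from Secrets Manager, and the write to Amazon Redshift is omitted here.

```python
import hmac
import json
from hashlib import blake2b

SECRET_KEY = b"placeholder-signing-key"  # stand-in for the Secrets Manager value
AUTH_SIZE = 16

def sign(payload: bytes) -> str:
    """Keyed BLAKE2b digest of the payload, hex-encoded."""
    return blake2b(payload, digest_size=AUTH_SIZE, key=SECRET_KEY).hexdigest()

def is_untampered(attributes: list, received_hash: str) -> bool:
    """Recompute the keyed hash and compare it in constant time."""
    expected = sign(json.dumps(attributes).encode("utf-8"))
    return hmac.compare_digest(expected, received_hash)

quote = ["branch-42", "customer-7", 1250.0]   # made-up quote attributes
issued_hash = sign(json.dumps(quote).encode("utf-8"))

print(is_untampered(quote, issued_hash))                              # True
print(is_untampered(["branch-42", "customer-7", 1.0], issued_hash))   # False
```

Using compare_digest rather than == avoids leaking, through timing differences, how many leading characters of a forged hash were correct.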

The following sections provide detailed steps with sample payloads and code snippets.

Generate the hash

The hash is generated using a Lambda UDF in Amazon Redshift. Additionally, a Secrets Manager key is used to sign the hash. To create the hash, complete the following steps:

  1. Create the Secrets Manager key from the AWS Command Line Interface (AWS CLI):
aws secretsmanager create-secret --name "name_of_secret" --description "Secret key to sign hash" --secret-string '{"name_of_key":"value"}' --region us-east-1
  2. Create the Lambda function that backs the UDF and generates the hash:
import json
from hashlib import blake2b

import boto3
from botocore.exceptions import ClientError


def get_secret():
    """Fetch the key that the Lambda function uses to sign the hash."""
    secret_name = "<name_of_secret>"
    region_name = "<aws_region_name>"

    # Create a Secrets Manager client
    session = boto3.session.Session()
    client = session.client(service_name="secretsmanager", region_name=region_name)

    # In this sample we only handle the specific exceptions for the 'GetSecretValue' API.
    # See https://docs.aws.amazon.com/secretsmanager/latest/apireference/API_GetSecretValue.html
    # We rethrow the exception by default.
    try:
        get_secret_value_response = client.get_secret_value(SecretId=secret_name)
    except ClientError:
        raise

    if "SecretString" in get_secret_value_response:
        access_token = get_secret_value_response["SecretString"]
    else:
        access_token = get_secret_value_response["SecretBinary"]

    # Replace the placeholder with the JSON key name used when creating the secret.
    return json.loads(access_token)["<token_key_name>"]


# blake2b requires the key as bytes (at most 64 bytes long).
SECRET_KEY = get_secret().encode("utf-8")
AUTH_SIZE = 16


def sign(payload):
    """Return the keyed BLAKE2b digest of payload as hex-encoded bytes."""
    h = blake2b(digest_size=AUTH_SIZE, key=SECRET_KEY)
    h.update(payload)
    return h.hexdigest().encode("utf-8")


def lambda_handler(event, context):
    ret = dict()
    try:
        res = []
        # Amazon Redshift sends one entry in 'arguments' per row in the batch.
        for argument in event["arguments"]:
            try:
                msg = json.dumps(argument)
                signed_key = sign(str.encode(msg))
                res.append(signed_key.decode("utf-8"))
            except Exception:
                # A row that cannot be hashed yields NULL rather than failing the batch.
                res.append(None)
        ret["success"] = True
        ret["results"] = res
    except Exception as e:
        ret["success"] = False
        ret["error_msg"] = str(e)

    return json.dumps(ret)
  3. Define an Amazon Redshift UDF to call the Lambda function to create a hash:
CREATE OR REPLACE EXTERNAL FUNCTION udf_get_digest (par1 varchar)
RETURNS varchar STABLE
LAMBDA 'redshift_get_digest'
IAM_ROLE 'arn:aws:iam::<AWSACCOUNTID>:role/service-role/<role_name>';

The AWS Identity and Access Management (IAM) role in the preceding step should have the following policy attached to be able to invoke the Lambda function:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": "lambda:InvokeFunction",
            "Resource": "arn:aws:lambda:us-east-1:<AWSACCOUNTID>:function:redshift_get_digest"
        }
    ]
}
  4. Fetch the key from Secrets Manager.

This key is used by the Lambda function to further secure the hash. This is indicated in the get_secret function in Step 2.
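
To check the request and response shape without deploying anything, the handler logic can be exercised locally. The following is only a sketch: `DEMO_KEY` stands in for the Secrets Manager value, `handler` is a simplified stand-in for the Lambda function, and the sample rows are made-up values.

```python
import json
from hashlib import blake2b

DEMO_KEY = b"demo-key-not-for-production"  # assumption: placeholder for the Secrets Manager key
AUTH_SIZE = 16


def sign(payload: bytes) -> str:
    """Keyed BLAKE2b digest as a hex string."""
    h = blake2b(digest_size=AUTH_SIZE, key=DEMO_KEY)
    h.update(payload)
    return h.hexdigest()


def handler(event, context=None):
    """Return one digest per row found in event['arguments']."""
    results = [sign(json.dumps(row).encode("utf-8")) for row in event["arguments"]]
    return json.dumps({"success": True, "results": results})


# Each inner list holds the UDF arguments for one row (hypothetical values).
sample_event = {"arguments": [["AcmeYWelderDayWA25.0"], ["AcmeNWelderNightWA30.0"]]}
response = json.loads(handler(sample_event))
print(response["success"], len(response["results"]))
```

Note that the digest is computed over the JSON-serialized argument list for each row, not over the raw string, which matters when another function has to reproduce the same value.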

Set up Amazon Redshift datasets in QuickSight

The quote generation dashboard uses the following Amazon Redshift view.

Create an Amazon Redshift view that uses all the preceding columns along with the hash column:

create view quote_gen_vw as
select *, udf_get_digest(customername || BGCheckRequired || Skill || Shift || State || Cost) as hashvalue
from billing_input_tbl;

The records will look like the following screenshot.

The preceding view will be used as the QuickSight dataset to generate quotes. A QuickSight analysis will be created using the dataset. For near-real-time analysis, you can use QuickSight direct query mode.

Create API Gateway resources

The write-back operation is initiated by QuickSight invoking an API Gateway resource, which invokes the Lambda write-back function. As a prerequisite for creating the calculated field in QuickSight to call the write-back API, you must first create these resources.

API Gateway secures and invokes the write-back Lambda function, passing the parameters as URL query string parameters through mapping templates. You can avoid the mapping templates by using the Lambda proxy integration.
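
With the Lambda proxy integration, API Gateway hands the query string to the function under `queryStringParameters` and expects a response containing `statusCode` and `body`. A minimal sketch of that variant follows; the helper names are hypothetical and the parameter names are taken from the write-back URL used later in this post.

```python
import json

# Query string parameters expected by the write-back endpoint.
EXPECTED_PARAMS = (
    "cust", "bgc", "billrate", "skilledtrades", "shift", "jobdutydescription", "hash",
)


def extract_params(event: dict) -> dict:
    """Pull the write-back parameters out of a Lambda proxy integration event."""
    # queryStringParameters is null when the request carries no query string.
    query = event.get("queryStringParameters") or {}
    missing = [name for name in EXPECTED_PARAMS if name not in query]
    if missing:
        raise ValueError("missing query string parameters: " + ", ".join(missing))
    return {name: query[name] for name in EXPECTED_PARAMS}


def proxy_response(status_code: int, message: str) -> dict:
    """Build the response shape that a proxy integration expects from Lambda."""
    return {
        "statusCode": status_code,
        "headers": {"Content-Type": "application/json"},
        "body": json.dumps({"message": message}),
    }
```

The trade-off is that the function, rather than API Gateway, becomes responsible for rejecting requests with missing parameters.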

Create a REST API resource of method type GET that uses Lambda functions (created in the next step) as the integration type. For instructions, refer to Creating a REST API in Amazon API Gateway and Set up Lambda integrations in API Gateway.

The following screenshot shows the details for creating a query string parameter for each parameter passed to API Gateway.

The following screenshot shows the details for creating a mapping template parameter for each parameter passed to API Gateway.

Create the Lambda function

Create a new Lambda function for the API Gateway to invoke. The Lambda function performs the following steps:

  1. Receive parameters from QuickSight through API Gateway and hash the concatenated parameters.

The following code example retrieves parameters from the API Gateway call using the event object of the Lambda function:

customer = event['customer']
bgc = event['bgc']

The function performs the hashing logic as shown in the create hash step earlier using the concatenated parameters passed by QuickSight.

  2. Compare the hashed output with the hash parameter.

If these don’t match, the write-back won’t happen.

  3. If the hashes match, perform a write-back. Check for the presence of a record in the quote generation table by generating a query from the table using the parameters passed from QuickSight:
query_str = "select * from tbquote where cust = '" + cust + "' and bgc = '" + bgc + "' and skilledtrades = '" + skilledtrades + "' and shift = '" + shift + "' and jobdutydescription = '" + jobdutydescription + "'"
  4. Complete the following action based on the results of the query:
    1. If no record exists for the preceding combination, generate and run an insert query using all parameters with the status as generated.
    2. If a record exists for the preceding combination, generate and run an insert query with the status as in review. The quote_Id for the existing combination will be reused.
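
Because the lookup values arrive from a URL, it is safer to bind them as parameters than to concatenate them into the SQL text. The sketch below assumes the function talks to Amazon Redshift through the Redshift Data API, whose `execute_statement` call accepts named `:placeholders` with a `Parameters` list; the post does not say which client the authors used, and `build_lookup` and `next_status` are hypothetical helpers.

```python
# Columns used to look up an existing quote in tbquote.
LOOKUP_COLUMNS = ("cust", "bgc", "skilledtrades", "shift", "jobdutydescription")


def build_lookup(params: dict) -> tuple:
    """Return the lookup SQL with named placeholders plus the parameter list."""
    where = " and ".join(f"{col} = :{col}" for col in LOOKUP_COLUMNS)
    sql = f"select * from tbquote where {where}"
    # Values travel separately from the SQL text, so quotes in them are harmless.
    parameters = [{"name": col, "value": str(params[col])} for col in LOOKUP_COLUMNS]
    return sql, parameters


def next_status(record_exists: bool) -> str:
    """Status for the row to insert, following the rules in the list above."""
    return "in review" if record_exists else "generated"
```

Under that assumption, the pair returned by `build_lookup` would be passed as the `Sql` and `Parameters` arguments of `execute_statement`.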

Create a QuickSight visual

This step involves creating a table visual that uses a calculated field to pass parameters to API Gateway and invoke the preceding Lambda function.

  1. Add a QuickSight calculated field named Generate Quote to hold the API Gateway hosted URL that will be triggered to write back the quote history into Amazon Redshift:
concat("https://xxxxx.execute-api.us-east-1.amazonaws.com/stage_name/apiresourcename/?cust=",customername,"&bgc=",bgcheckrequired,"&billrate=",toString(billrate),"&skilledtrades=",skilledtrades,"&shift=",shift,"&jobdutydescription=",jobdutydescription,"&hash=",hashvalue)
  2. Create a QuickSight table visual.
  3. Add required fields such as Customer, Skill, and Cost.
  4. Add the Generate Quote calculated field and style this as a hyperlink.

Choosing this link will write the record into Amazon Redshift, provided that the Lambda function computes the same hash value when it hashes the parameters.
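
To exercise the endpoint outside QuickSight, the same URL can be assembled in Python. This is a sketch for testing only: `build_writeback_url` is a hypothetical helper, the host and path are the placeholders from the calculated field, and the row values are invented. It also shows that values containing spaces or `&` need percent-encoding to survive as a single query parameter.

```python
from urllib.parse import parse_qs, urlencode, urlsplit

# Placeholder endpoint copied from the calculated field above.
BASE_URL = "https://xxxxx.execute-api.us-east-1.amazonaws.com/stage_name/apiresourcename/"


def build_writeback_url(row: dict, hashvalue: str) -> str:
    """Assemble the write-back URL with percent-encoded query parameters."""
    query = {
        "cust": row["customername"],
        "bgc": row["bgcheckrequired"],
        "billrate": str(row["billrate"]),
        "skilledtrades": row["skilledtrades"],
        "shift": row["shift"],
        "jobdutydescription": row["jobdutydescription"],
        "hash": hashvalue,
    }
    return BASE_URL + "?" + urlencode(query)


row = {
    "customername": "Smith & Sons", "bgcheckrequired": "Y", "billrate": 25.5,
    "skilledtrades": "Welder", "shift": "Day", "jobdutydescription": "Pipe welding",
}
url = build_writeback_url(row, "abc123")
print(parse_qs(urlsplit(url).query)["cust"])
```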

The following screenshot shows a sample table visual.

Write to the Amazon Redshift database

The Secrets Manager key is fetched and used by the Lambda function to generate the hash for comparison. The write-back will be performed only if the hash matches with the hash passed in the parameter.

The following Amazon Redshift table will capture the quote history as populated by the Lambda function. Records in green represent the most recent records for the quote.

Considerations and next steps

Using secure hashes prevents the tampering of payload parameters that are visible in the browser window when the write-back URL is invoked. To further secure the write-back URL, you can employ the following techniques:

  • Deploy the REST API in a private VPC that is accessible only to QuickSight users.
  • To prevent replay attacks, a timestamp can be generated alongside the hashing function and passed as an additional parameter in the write-back URL. The backend Lambda function can then be modified to only allow write-backs within a certain time-based threshold.
  • Follow the API Gateway access control and security best practices.
  • Mitigate potential denial-of-service attacks against public-facing APIs.
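
The timestamp idea for preventing replay attacks could look roughly like the following. It is a sketch under stated assumptions: the five-minute window, the `|` separator, and the function names are illustrative choices, not part of the solution described in this post. The key point is that the timestamp is covered by the hash, so it cannot be changed without invalidating the signature.

```python
import hmac
import time
from hashlib import blake2b

AUTH_SIZE = 16
MAX_AGE_SECONDS = 300  # assumption: accept write-backs for five minutes


def sign(payload: str, issued_at: int, key: bytes) -> str:
    """Keyed BLAKE2b digest over the payload and its issue time."""
    h = blake2b(digest_size=AUTH_SIZE, key=key)
    h.update(f"{payload}|{issued_at}".encode("utf-8"))
    return h.hexdigest()


def is_fresh_and_valid(payload: str, issued_at: int, received_hash: str,
                       key: bytes, now=None) -> bool:
    """Reject requests that are too old, from the future, or tampered with."""
    now = int(time.time()) if now is None else now
    if not 0 <= now - issued_at <= MAX_AGE_SECONDS:
        return False
    expected = sign(payload, issued_at, key)
    return hmac.compare_digest(expected.encode("utf-8"), received_hash.encode("utf-8"))
```

A time window limits how long a captured URL stays usable but does not stop a replay within the window; that would need additional state, such as recording which hashes have already been used.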

You can further enhance this solution to render a web-based form when the write-back URL is opened. This could be implemented by dynamically generating an HTML form in the backend Lambda function to support the input of additional information. If your workload requires a high number of write-backs that require higher throughput or concurrency, a purpose-built data store like Amazon Aurora PostgreSQL-Compatible Edition might be a better choice. For more information, refer to Invoking an AWS Lambda function from an Aurora PostgreSQL DB cluster. These updates can then be synchronized into Amazon Redshift tables using federated queries.

Conclusion

This post showed how to use QuickSight along with Lambda, API Gateway, Secrets Manager, and Amazon Redshift to capture user input data and securely update your Amazon Redshift data warehouse without leaving your QuickSight BI environment. This solution eliminates the need to create an external application or user interface for database update or insert operations, and reduces related development and maintenance overhead. The API Gateway call can also be secured using a key or token to ensure only calls originating from QuickSight are accepted by the API Gateway. This will be covered in subsequent posts.


About the Authors

Srikanth Baheti is a Specialized World Wide Principal Solutions Architect for Amazon QuickSight. He started his career as a consultant and worked for multiple private and government organizations. Later he worked for PerkinElmer Health and Sciences & eResearch Technology Inc, where he was responsible for designing and developing high traffic web applications, highly scalable and maintainable data pipelines for reporting platforms using AWS services and Serverless computing.

Raji Sivasubramaniam is a Sr. Solutions Architect at AWS, focusing on Analytics. Raji specializes in architecting end-to-end Enterprise Data Management, Business Intelligence and Analytics solutions for Fortune 500 and Fortune 100 companies across the globe. She has in-depth experience in integrated healthcare data and analytics with a wide variety of healthcare datasets including managed market, physician targeting and patient analytics.