Tag Archives: Application Integration

Sending and receiving CloudEvents with Amazon EventBridge

Post Syndicated from David Boyne original https://aws.amazon.com/blogs/compute/sending-and-receiving-cloudevents-with-amazon-eventbridge/

Amazon EventBridge helps developers build event-driven architectures (EDA) by connecting loosely coupled publishers and consumers using event routing, filtering, and transformation. CloudEvents is an open-source specification for describing event data in a common way. Developers can publish CloudEvents directly to EventBridge, filter and route them, and use input transformers and API Destinations to send CloudEvents to downstream AWS services and third-party APIs.

Overview

Event design is an important aspect of any event-driven architecture, yet developers often overlook it. This leads to unwanted side effects such as exposed implementation details, a lack of standards, and version incompatibility.

Without event standards, it can be difficult to integrate events or streams of messages between systems, brokers, and organizations. Each system has to understand the event structure or rely on custom-built solutions for versioning or validation.

CloudEvents is a specification for describing event data in common formats to provide interoperability between services, platforms, and systems using Cloud Native Computing Foundation (CNCF) projects. As CloudEvents is a CNCF graduated project, many third-party brokers and systems adopt this specification.

Using CloudEvents as a standard format to describe events makes integration easier, lets you use open-source tooling to help build event-driven architectures, and helps future-proof your integrations. EventBridge can route and filter CloudEvents based on common metadata, without needing to understand the business logic within the event itself.

CloudEvents supports two implementation modes, structured mode and binary mode, and a range of protocols including HTTP, MQTT, AMQP, and Kafka. When publishing events to an EventBridge bus, you can structure events as CloudEvents and route them to downstream consumers. You can use input transformers to transform any event into the CloudEvents specification. Events can also be forwarded to public APIs using EventBridge API destinations, which support both structured and binary mode encodings, enhancing interoperability with external systems.

Standardizing events using Amazon EventBridge

When publishing events to an EventBridge bus, EventBridge uses its own event envelope and represents events as JSON objects. EventBridge requires that you define top-level fields, such as detail-type and source. You can use any event/payload in the detail field.

This example event shows an OrderPlaced event from the orders-service that is unstructured without any event standards. The data within the event contains the order_id, customer_id and order_total.

{
  "version": "0",
  "id": "dbc1c73a-c51d-0c0e-ca61-ab9278974c57",
  "account": "1234567890",
  "time": "2023-05-23T11:38:46Z",
  "region": "us-east-1",
  "detail-type": "OrderPlaced",
  "source": "myapp.orders-service",
  "resources": [],
  "detail": {
    "data": {
      "order_id": "c172a984-3ae5-43dc-8c3f-be080141845a",
      "customer_id": "dda98122-b511-4aaf-9465-77ca4a115ee6",
      "order_total": "120.00"
    }
  }
}

Publishers may also choose to add an additional metadata field along with the data field within the detail field to help define a set of standards for their events.

{
  "version": "0",
  "id": "dbc1c73a-c51d-0c0e-ca61-ab9278974c58",
  "account": "1234567890",
  "time": "2023-05-23T12:38:46Z",
  "region": "us-east-1",
  "detail-type": "OrderPlaced",
  "source": "myapp.orders-service",
  "resources": [],
  "detail": {
    "metadata": {
      "idempotency_key": "29d2b068-f9c7-42a0-91e3-5ba515de5dbe",
      "correlation_id": "dddd9340-135a-c8c6-95c2-41fb8f492222",
      "domain": "ORDERS",
      "time": "1707908605"
    },
    "data": {
      "order_id": "c172a984-3ae5-43dc-8c3f-be080141845a",
      "customer_id": "dda98122-b511-4aaf-9465-77ca4a115ee6",
      "order_total": "120.00"
    }
  }
}

This additional event information helps downstream consumers, improves debugging, and supports idempotency handling. While this approach offers practical benefits, it duplicates problems that the CloudEvents specification already solves.

Publishing CloudEvents using Amazon EventBridge

When publishing events to EventBridge, you can use CloudEvents structured mode. A structured-mode message is where the entire event (attributes and data) is encoded in the message body, according to a specific event format. A binary-mode message is where the event data is stored in the message body, and event attributes are stored as part of the message metadata.

CloudEvents has a list of required fields but also offers flexibility with optional attributes and extensions. CloudEvents also offers a solution to implement idempotency, requiring that the combination of id and source must uniquely identify an event, which can be used as the idempotency key in downstream implementations.
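As a rough illustration of the idempotency point, the sketch below treats the (source, id) pair as the deduplication key on the consumer side. The `Deduplicator` class and its in-memory set are assumptions made for this example; a real consumer would more likely keep the keys in a durable store.

```python
from typing import Set, Tuple


class Deduplicator:
    """Tracks CloudEvents already processed, keyed on (source, id)."""

    def __init__(self) -> None:
        # In-memory only (an assumption for this sketch); state is lost on restart.
        self._seen: Set[Tuple[str, str]] = set()

    def is_new(self, event: dict) -> bool:
        """Return True the first time an event is seen, False for repeats."""
        key = (event["source"], event["id"])
        if key in self._seen:
            return False
        self._seen.add(key)
        return True


dedup = Deduplicator()
order_placed = {
    "specversion": "1.0",
    "id": "bba4379f-b764-4d90-9fb2-9f572b2b0b61",
    "source": "myapp.orders-service",
    "type": "OrderPlaced",
}
first_delivery = dedup.is_new(order_placed)   # first time this pair is seen
second_delivery = dedup.is_new(order_placed)  # same source and id again
```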

{
  "version": "0",
  "id": "dbc1c73a-c51d-0c0e-ca61-ab9278974c58",
  "account": "1234567890",
  "time": "2023-05-23T12:38:46Z",
  "region": "us-east-1",
  "detail-type": "OrderPlaced",
  "source": "myapp.orders-service",
  "resources": [],
  "detail": {
    "specversion": "1.0",
    "id": "bba4379f-b764-4d90-9fb2-9f572b2b0b61",
    "source": "myapp.orders-service",
    "type": "OrderPlaced",
    "data": {
      "order_id": "c172a984-3ae5-43dc-8c3f-be080141845a",
      "customer_id": "dda98122-b511-4aaf-9465-77ca4a115ee6",
      "order_total": "120.00"
    },
    "time": "2024-01-01T17:31:00Z",
    "dataschema": "https://us-west-2.console.aws.amazon.com/events/home?region=us-west-2#/registries/discovered-schemas/schemas/myapp.orders-service%40OrderPlaced",
    "correlationid": "dddd9340-135a-c8c6-95c2-41fb8f492222",
    "domain": "ORDERS"
  }
}

By incorporating the required fields, the OrderPlaced event is now CloudEvents compliant. The event also contains optional and extension fields for additional information. Optional fields such as dataschema can be useful for brokers and consumers to retrieve a URI path to the published event schema. This example event references the schema in the EventBridge schema registry, so downstream consumers can fetch the schema to validate the payload.
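A minimal sketch of how a publisher might assemble such an event in Python follows. The helper names and the bus name `orders-bus` are hypothetical; the entry keys (`EventBusName`, `Source`, `DetailType`, `Detail`) are the ones the EventBridge PutEvents API expects.

```python
import json
import uuid
from datetime import datetime, timezone


def build_cloudevent(source: str, event_type: str, data: dict) -> dict:
    """Return a CloudEvents 1.0 envelope with the four required attributes."""
    return {
        "specversion": "1.0",
        "id": str(uuid.uuid4()),
        "source": source,
        "type": event_type,
        # Optional attribute: RFC 3339 timestamp of when the event was created.
        "time": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
        "data": data,
    }


def to_put_events_entry(cloudevent: dict, bus_name: str) -> dict:
    """Wrap a CloudEvent in the entry shape that PutEvents accepts."""
    return {
        "EventBusName": bus_name,
        "Source": cloudevent["source"],
        "DetailType": cloudevent["type"],
        # PutEvents expects Detail as a JSON string, not a nested object.
        "Detail": json.dumps(cloudevent),
    }


event = build_cloudevent(
    "myapp.orders-service",
    "OrderPlaced",
    {"order_id": "c172a984-3ae5-43dc-8c3f-be080141845a", "order_total": "120.00"},
)
entry = to_put_events_entry(event, "orders-bus")  # "orders-bus" is a made-up bus name
```

With boto3 installed and credentials configured, the entry could then be sent with `boto3.client("events").put_events(Entries=[entry])`.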

Mapping existing events into CloudEvents using input transformers

When you define a target in EventBridge, input transformations allow you to modify the event before it reaches its destination. Input transformers are configured per target, allowing you to convert events when your downstream consumer requires the CloudEvents format and you want to avoid duplicating information.

Input transformers allow you to map EventBridge fields, such as id, region, detail-type, and source, into corresponding CloudEvents attributes.

This example shows how to transform any EventBridge event into CloudEvents format using input transformers, so the target receives the required structure.

{
  "version": "0",
  "id": "dbc1c73a-c51d-0c0e-ca61-ab9278974c58",
  "account": "1234567890",
  "time": "2024-01-23T12:38:46Z",
  "region": "us-east-1",
  "detail-type": "OrderPlaced",
  "source": "myapp.orders-service",
  "resources": [],
  "detail": {
    "order_id": "c172a984-3ae5-43dc-8c3f-be080141845a",
    "customer_id": "dda98122-b511-4aaf-9465-77ca4a115ee6",
    "order_total": "120.00"
  }
}

Using this input path and input template, EventBridge transforms the event into the CloudEvents format for downstream consumers.

Input path for CloudEvents:

{
  "id": "$.id",
  "source": "$.source",
  "type": "$.detail-type",
  "time": "$.time",
  "data": "$.detail"
}

Input template for CloudEvents:

{
  "specversion": "1.0",
  "id": "<id>",
  "source": "<source>",
  "type": "<type>",
  "time": "<time>",
  "data": <data>
}

This example shows the event payload that is received by downstream targets, which is mapped to the CloudEvents specification.

{
  "specversion": "1.0",
  "id": "dbc1c73a-c51d-0c0e-ca61-ab9278974c58",
  "source": "myapp.orders-service",
  "type": "OrderPlaced",
  "time": "2024-01-23T12:38:46Z",
  "data": {
    "order_id": "c172a984-3ae5-43dc-8c3f-be080141845a",
    "customer_id": "dda98122-b511-4aaf-9465-77ca4a115ee6",
    "order_total": "120.00"
  }
}

For more information on using input transformers with CloudEvents, see this pattern on Serverless Land.
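To make the mapping easier to follow, the sketch below imitates the transformation locally in Python. It is an approximation for illustration only, not how EventBridge implements input transformers: it handles only simple dotted paths and assumes string values need no JSON escaping. The function names are invented for this example.

```python
import json
import re

INPUT_PATHS = {
    "id": "$.id",
    "source": "$.source",
    "type": "$.detail-type",
    "time": "$.time",
    "data": "$.detail",
}

INPUT_TEMPLATE = """{
  "specversion": "1.0",
  "id": "<id>",
  "source": "<source>",
  "type": "<type>",
  "time": "<time>",
  "data": <data>
}"""


def resolve_path(event, path):
    """Resolve a simple '$.a.b' path; only dotted keys are handled here."""
    value = event
    for key in path[2:].split("."):
        value = value[key]
    return value


def apply_input_transformer(event, input_paths, template):
    """Fill <name> placeholders in the template and parse the result as JSON."""
    values = {name: resolve_path(event, path) for name, path in input_paths.items()}

    def substitute(match):
        value = values[match.group(1)]
        # Strings land inside the quotes the template already supplies;
        # objects are serialized because their placeholder is unquoted.
        return value if isinstance(value, str) else json.dumps(value)

    return json.loads(re.sub(r"<(\w+)>", substitute, template))


event = {
    "id": "dbc1c73a-c51d-0c0e-ca61-ab9278974c58",
    "time": "2024-01-23T12:38:46Z",
    "detail-type": "OrderPlaced",
    "source": "myapp.orders-service",
    "detail": {
        "order_id": "c172a984-3ae5-43dc-8c3f-be080141845a",
        "order_total": "120.00",
    },
}
cloudevent = apply_input_transformer(event, INPUT_PATHS, INPUT_TEMPLATE)
```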

Transforming events into CloudEvents using API destinations

EventBridge API destinations allows you to trigger HTTP endpoints based on matched rules to integrate with third-party systems using public APIs. You can route events to APIs that support the CloudEvents format by using input transformations and custom HTTP headers to convert EventBridge events to CloudEvents. API destinations now supports custom content-type headers. This allows you to send structured or binary CloudEvents to downstream consumers.

Sending binary CloudEvents using API destinations

When sending binary CloudEvents over HTTP, you must use the HTTP binding specification and set the necessary CloudEvents headers. These headers tell the downstream consumer that the incoming payload uses the CloudEvents format. The body of the request is the event itself.

CloudEvents headers are prefixed with ce-. You can find the list of headers in the HTTP protocol binding documentation.

This example shows the headers for a binary event:

POST /order HTTP/1.1 
Host: webhook.example.com
ce-specversion: 1.0
ce-type: OrderPlaced
ce-source: myapp.orders-service
ce-id: bba4379f-b764-4d90-9fb2-9f572b2b0b61
ce-time: 2024-01-01T17:31:00Z
ce-dataschema: https://us-west-2.console.aws.amazon.com/events/home?region=us-west-2#/registries/discovered-schemas/schemas/myapp.orders-service%40OrderPlaced
ce-correlationid: dddd9340-135a-c8c6-95c2-41fb8f492222
ce-domain: ORDERS
Content-Type: application/json; charset=utf-8

This example shows the body for a binary event:

{
  "order_id": "c172a984-3ae5-43dc-8c3f-be080141845a",
  "customer_id": "dda98122-b511-4aaf-9465-77ca4a115ee6",
  "order_total": "120.00"
}

For more information when using binary CloudEvents with API destinations, explore this pattern available on Serverless Land.
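The split between headers and body can be sketched in a few lines of Python. This is a simplified illustration rather than a complete binding implementation: the function name is invented, and the percent-encoding that the HTTP binding requires for some header values is skipped. The HTTP binding prefixes every context attribute, extension attributes included, with ce-, and maps datacontenttype to the Content-Type header.

```python
import json
from typing import Dict, Tuple


def to_binary_http(cloudevent: dict) -> Tuple[Dict[str, str], bytes]:
    """Split a CloudEvent into ce- prefixed headers and a JSON body."""
    headers = {
        f"ce-{name}": str(value)
        for name, value in cloudevent.items()
        if name not in ("data", "datacontenttype")
    }
    # In binary mode the datacontenttype attribute travels as Content-Type.
    headers["Content-Type"] = cloudevent.get(
        "datacontenttype", "application/json; charset=utf-8"
    )
    # Only the event data goes in the body.
    body = json.dumps(cloudevent.get("data", {})).encode("utf-8")
    return headers, body


headers, body = to_binary_http(
    {
        "specversion": "1.0",
        "id": "bba4379f-b764-4d90-9fb2-9f572b2b0b61",
        "source": "myapp.orders-service",
        "type": "OrderPlaced",
        "domain": "ORDERS",
        "data": {"order_total": "120.00"},
    }
)
```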

Sending structured CloudEvents using API destinations

To support structured mode with CloudEvents, you must specify the content-type as application/cloudevents+json; charset=UTF-8, which tells the API consumer that the payload of the event is adhering to the CloudEvents specification.

POST /order HTTP/1.1
Host: webhook.example.com
Content-Type: application/cloudevents+json; charset=utf-8

{
    "specversion": "1.0",
    "id": "bba4379f-b764-4d90-9fb2-9f572b2b0b61",
    "source": "myapp.orders-service",
    "type": "OrderPlaced",
    "data": {
      "order_id": "c172a984-3ae5-43dc-8c3f-be080141845a",
      "customer_id": "dda98122-b511-4aaf-9465-77ca4a115ee6",
      "order_total": "120.00"
    },
    "time": "2024-01-01T17:31:00Z",
    "dataschema": "https://us-west-2.console.aws.amazon.com/events/home?region=us-west-2#/registries/discovered-schemas/schemas/myapp.orders-service%40OrderPlaced",
    "correlationid": "dddd9340-135a-c8c6-95c2-41fb8f492222",
    "domain": "ORDERS"
}
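For comparison with binary mode, here is a small Python sketch of structured mode from both sides: a sender that puts the whole event in the body, and a receiver that tells the modes apart by inspecting Content-Type, as the CloudEvents HTTP binding describes. The function names are assumptions made for this example.

```python
import json
from typing import Dict, Tuple

STRUCTURED_CONTENT_TYPE = "application/cloudevents+json; charset=UTF-8"


def to_structured_http(cloudevent: dict) -> Tuple[Dict[str, str], bytes]:
    """Encode the whole CloudEvent, attributes and data, in the body."""
    headers = {"Content-Type": STRUCTURED_CONTENT_TYPE}
    body = json.dumps(cloudevent).encode("utf-8")
    return headers, body


def content_mode(headers: Dict[str, str]) -> str:
    """Classify an incoming request by its Content-Type header."""
    content_type = headers.get("Content-Type", "").lower()
    if content_type.startswith("application/cloudevents-batch"):
        return "batched"
    if content_type.startswith("application/cloudevents"):
        return "structured"
    # Anything else is treated as binary mode (attributes in ce- headers).
    return "binary"


headers, body = to_structured_http(
    {
        "specversion": "1.0",
        "id": "bba4379f-b764-4d90-9fb2-9f572b2b0b61",
        "source": "myapp.orders-service",
        "type": "OrderPlaced",
        "data": {"order_total": "120.00"},
    }
)
mode = content_mode(headers)
```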

Conclusion

Carefully designing events plays an important role when building event-driven architectures to integrate producers and consumers effectively. The open-source CloudEvents specification helps developers to standardize integration processes, simplifying interactions between internal systems and external partners.

EventBridge allows you to use a flexible payload structure within an event’s detail property to standardize events. You can publish structured CloudEvents directly onto an event bus in the detail field and use payload transformations to allow downstream consumers to receive events in the CloudEvents format.

EventBridge simplifies integration with third-party systems using API destinations. Using the new custom content-type headers with input transformers to modify the event structure, you can send structured or binary CloudEvents to integrate with public APIs.

For more serverless learning resources, visit Serverless Land.

Manage EDI at scale with new AWS B2B Data Interchange

Post Syndicated from Sébastien Stormacq original https://aws.amazon.com/blogs/aws/introducing-aws-b2b-data-interchange-simplified-connections-with-your-trading-partners/

Today we’re launching AWS B2B Data Interchange, a fully managed service allowing organizations to automate and monitor the transformation of EDI-based business-critical transactions at cloud scale. With this launch, AWS brings automation, monitoring, elasticity, and pay-as-you-go pricing to the world of B2B document exchange.

Electronic data interchange (EDI) is the electronic exchange of business documents in a standard electronic format between business partners. While email is also an electronic approach, the documents exchanged via email must still be handled by people rather than computer systems. Having people involved slows down the processing of the documents and also introduces errors. Instead, EDI documents can flow straight through to the appropriate application on the receiver’s system, and processing can begin immediately. Electronic documents exchanged between computer systems help businesses reduce cost, accelerate transactional workflows, reduce errors, and improve relationships with business partners.

Work on EDI started in the 1970s. I remember reading a thesis about EDIFACT, a set of standards defining the structure of business documents, back in 1994. But despite being a more than 50-year-old technology, traditional self-managed EDI solutions deployed to parse, validate, map, and translate data from business applications to EDI data formats are difficult to scale as the volume of business changes. They typically do not provide much operational visibility into communication and content errors. These challenges often oblige businesses to fall back to error-prone email document exchanges, leading to high manual work, increased difficulty controlling compliance, and ultimately constraining growth and agility.

AWS B2B Data Interchange is a fully managed, easy-to-use, and cost-effective service for accelerating your data transformations and integrations. It eliminates the heavy lifting of establishing connections with your business partners and mapping the documents to your system’s data formats, and it gives visibility into documents that can’t be processed.

It provides a low-code interface for business partner onboarding and EDI data transformation to easily import the processed data to your business applications and analytics solutions. B2B Data Interchange gives you easy access to monitoring data, allowing you to build dashboards to monitor the volume of documents exchanged and the status of each document transformation. For example, it is easy to create alarms when incorrectly formatted documents can’t be transformed or imported into your business applications.

It is common for large enterprises to have thousands of business partners and hundreds of types of documents exchanged with each partner, leading to millions of combinations to manage. AWS B2B Data Interchange is not only available through the AWS Management Console, it is also accessible with the AWS Command Line Interface (AWS CLI) and AWS SDKs. This allows you to write applications or scripts to onboard new business partners and their specific data transformations and to programmatically add alarms and monitoring logic to new or existing dashboards.

B2B Data Interchange supports the X12 EDI data format. It makes it easier to validate and transform EDI documents to the formats expected by your business applications, such as JSON or XML. The raw documents and the transformed JSON or XML files are stored on Amazon Simple Storage Service (Amazon S3). This allows you to build event-driven applications for real-time business data processing or to integrate business documents with your existing analytics or AI/ML solutions.

For example, when you receive a new EDI business document, you can trigger additional routing, processing, and transformation logic using AWS Step Functions or Amazon EventBridge. When an error is detected in an incoming document, you can configure the sending of alarm messages by email or SMS or trigger an API call or additional processing logic using AWS Lambda.
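As one possible way to picture the EventBridge routing, the sketch below builds an event pattern for the "Object Created" events that Amazon S3 sends to EventBridge, along with a deliberately simplified local matcher for trying it out. The bucket name and key prefix are hypothetical, and the matcher covers only exact values and prefix matching, not the full EventBridge pattern language.

```python
def s3_object_created_pattern(bucket: str, key_prefix: str) -> dict:
    """Event pattern for new objects under a prefix in one bucket."""
    return {
        "source": ["aws.s3"],
        "detail-type": ["Object Created"],
        "detail": {
            "bucket": {"name": [bucket]},
            "object": {"key": [{"prefix": key_prefix}]},
        },
    }


def matches(pattern: dict, event: dict) -> bool:
    """Simplified matcher: exact values and {"prefix": ...} only."""
    for field, expected in pattern.items():
        if field not in event:
            return False
        actual = event[field]
        if isinstance(expected, dict):
            # Nested pattern: descend into the matching part of the event.
            if not isinstance(actual, dict) or not matches(expected, actual):
                return False
        elif not any(_matches_one(candidate, actual) for candidate in expected):
            return False
    return True


def _matches_one(candidate, actual) -> bool:
    if isinstance(candidate, dict) and "prefix" in candidate:
        return isinstance(actual, str) and actual.startswith(candidate["prefix"])
    return candidate == actual


# Hypothetical bucket and prefix, used only for this example.
pattern = s3_object_created_pattern("my-edi-output-bucket", "transformed/")
sample_event = {
    "source": "aws.s3",
    "detail-type": "Object Created",
    "detail": {
        "bucket": {"name": "my-edi-output-bucket"},
        "object": {"key": "transformed/invoice-001.json"},
    },
}
is_match = matches(pattern, sample_event)
```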

Let’s see how it works
As usual on this blog, let me show you how it works. Let’s imagine I am in charge of the supply chain for a large retail company, and I have hundreds of business partners with whom I exchange documents such as bills of lading, customs documents, advanced shipment notices, invoices, or receiving advice certificates.

In this demo, I use the AWS Management Console to onboard a new business partner. By onboarding, I mean defining the contact details of the business partner, the type of documents I will exchange with them, the technical data transformation to the JSON formats expected by my existing business apps, and where to receive the documents.

With this launch, the configuration of the transport mechanism for the EDI document is managed outside B2B Data Interchange. Typically, you will configure a transfer gateway and propose that your business partner transfer the document using SFTP or AS2.

There are no servers to manage or application packages to install and configure. I can get started in just four steps.

First, I create a profile for my business partner.

B2B Data Interchange - Create profile

Second, I create a transformer. A transformer defines the source document format and the mapping to my existing business application data format: JSON or XML. I can use the graphical editor to validate a sample document and see the result of the transformation directly from the console. We use the standard JSONata query and transformation language to define the transformation logic to JSON documents and standard XSLT when transforming to XML documents.

B2B Data Interchange - Create transformer - input

B2B Data Interchange - Create transformer - transformation

I activate the transformer once created.

B2B Data Interchange - Create transformer - activate

Third, I create a trading capability. This defines which Amazon Simple Storage Service (Amazon S3) buckets will receive the documents from a specific business partner and where the transformed data will be stored.

There is a one-time additional configuration to make sure proper permissions are defined on the S3 bucket policy. I select Copy policy and navigate to the Amazon S3 page of the console to apply the policies to the S3 bucket. One policy allows B2B Data Interchange to read from the incoming bucket, and one policy allows it to write to your outgoing bucket.

B2B Data Interchange - Create capability

B2B Data Interchange - Create capability - configure directory

While I am configuring the S3 bucket, it is also important to turn on Amazon EventBridge on the S3 bucket. This is the mechanism we use to trigger the data transformation upon the arrival of a new business document.

B2B Data Interchange - Enable EventBridge on S3 bucket

Finally, back at the B2B Data Interchange configuration, I create a partnership. Partnerships are dedicated resources that establish a relationship between you and your individual trading partners. Partnerships contain details about a specific trading partner, the types of EDI documents you receive from them, and how those documents should be transformed into custom JSON or XML formats. A partnership links the business profile I created in the first step with one or multiple document types and transformations I defined in step two.

B2B Data Interchange - Create partnership

This is also where I can monitor the status of the last set of documents I received and the status of their transformation. For more historical data, you can navigate to Amazon CloudWatch using the links provided in the console.

B2B Data Interchange - Log group

To test my setup, I upload an EDI 214 document to the incoming bucket and a few seconds later, I can see the transformed JSON document appearing in the destination bucket.

B2B Data Interchange - Transformed document on the bucket

I can observe the status of document processing and transformation using Invocations and TriggeredRules CloudWatch metrics from EventBridge. From there, together with the CloudWatch Logs, I can build dashboards and configure alarms as usual. I can also configure additional enrichment, routing, and processing of the incoming or transformed business documents by writing an AWS Lambda function or a workflow using AWS Step Functions.
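A Lambda function for this kind of additional processing could start from something like the following sketch. It assumes the function is the target of an EventBridge rule that matches S3 "Object Created" events for the output bucket; the handler body and return shape are illustrative placeholders, not part of the service.

```python
import json


def handler(event: dict, context) -> dict:
    """Summarize an S3 "Object Created" event delivered through EventBridge."""
    detail = event.get("detail", {})
    bucket = detail.get("bucket", {}).get("name")
    key = detail.get("object", {}).get("key")
    if event.get("detail-type") != "Object Created" or not bucket or not key:
        # Ignore anything that is not a new-object notification.
        return {"processed": False}
    # Placeholder for enrichment or routing; here we only log the location.
    print(json.dumps({"bucket": bucket, "key": key}))
    return {"processed": True, "location": f"s3://{bucket}/{key}"}


# Local invocation with a hand-written sample event (bucket name is made up).
result = handler(
    {
        "source": "aws.s3",
        "detail-type": "Object Created",
        "detail": {
            "bucket": {"name": "my-edi-output-bucket"},
            "object": {"key": "transformed/edi-214.json"},
        },
    },
    None,
)
```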

Pricing and availability
AWS B2B Data Interchange is available today in three AWS Regions: US East (Ohio), US East (N. Virginia), and US West (Oregon).

There is no one-time setup fee or recurring monthly subscription. AWS charges you on demand based on your real usage. There is a price per partnership per month and a price per document transformed. The B2B Data Interchange pricing page has the details.

AWS B2B Data Interchange makes it easy to manage your trading partner relationships so you can automatically exchange, transform, and monitor EDI workflows at cloud scale. It doesn’t require you to install or manage any infrastructure and makes it easy for you to integrate with your existing business applications and systems. You can use the AWS B2B Data Interchange API or the AWS SDK to automate the onboarding of your partners. Combined with a fully managed and scalable infrastructure, AWS B2B Data Interchange helps your business to be more agile and scale your operations.

Go build!

— seb

New – AWS Audit Manager now supports first third-party GRC integration

Post Syndicated from Veliswa Boya original https://aws.amazon.com/blogs/aws/new-aws-audit-manager-now-supports-first-third-party-grc-integration/

Auditing is a continuous and ongoing process, and every audit includes the collection of evidence. The evidence gathered helps confirm the state of resources and is used to demonstrate that the customer’s policies, procedures, and activities (controls) are in place and that each control has been operational for a specified period of time. AWS Audit Manager already automates this evidence collection for AWS usage. However, large enterprise organizations that deploy their workloads across a range of locations, such as cloud, on-premises, or a combination of both, manage this evidence data using a combination of third-party or homegrown tools, spreadsheets, and emails.

Today we’re excited to announce the integration of AWS Audit Manager with a third-party Governance, Risk, and Compliance (GRC) provider, MetricStream CyberGRC, an AWS Partner with GRC capabilities. This integration allows enterprises to manage compliance across AWS, on-premises, and other cloud environments in a centralized GRC environment.

Before this announcement, Audit Manager operated only in the AWS context, allowing customers to collect compliance evidence for resources in AWS. They would then relay that information to their GRC systems external to AWS for additional aggregation and analysis. This process left customers without an automated way to monitor and evaluate all compliance data in one centralized location, resulting in delays to compliance outcomes.

The GRC integration with Audit Manager allows you to use audit evidence collected by Audit Manager directly in MetricStream CyberGRC. Audit Manager now receives the controls in scope from MetricStream CyberGRC, collects evidence around these controls, and exports the data related to the audit into MetricStream CyberGRC for aggregation and analysis. You will now have aggregated compliance, real-time monitoring and centralized reporting. This will reduce compliance fatigue and improve stakeholder collaboration.

How It Works
Using Amazon Cognito User Pools, you’ll be onboarded into the multi-tenant instance of MetricStream CyberGRC.

Amazon Cognito User Pools diagram

Once onboarded, you’ll be able to view AWS assets and frameworks inside MetricStream CyberGRC. You can then begin by choosing the suitable Audit Manager framework to define the relationships between your existing enterprise controls and AWS controls. After creating this one-time control mapping, you can define the accounts in scope to create an assessment that MetricStream CyberGRC will manage in AWS Audit Manager on your behalf. This assessment triggers AWS Audit Manager to collect evidence in the context of the mapped controls. As a result, you get a unified view of compliance evidence inside your GRC application. Any standard controls that you have in Audit Manager will be provided to MetricStream CyberGRC by using the GetControl API to facilitate the manual mapping process wherever automated mapping fails or does not suffice. The EvidenceFinder API will send bulk evidence from Audit Manager to MetricStream CyberGRC.

Available Now
This feature is available today in the AWS Regions where both Audit Manager and MetricStream CyberGRC are available. There are no additional AWS Audit Manager charges for using this integration. To use this integration, please reach out to MetricStream for information about access and purchase of MetricStream CyberGRC software.

As part of the AWS Free Tier, AWS Audit Manager offers a free tier for first-time customers. The free tier will expire in two calendar months after the first subscription. For more information, see AWS Audit Manager pricing. To learn more about AWS Audit Manager integration with MetricStream CyberGRC, see Audit Manager documentation.

Veliswa

Build event-driven architectures with Amazon MSK and Amazon EventBridge

Post Syndicated from Florian Mair original https://aws.amazon.com/blogs/big-data/build-event-driven-architectures-with-amazon-msk-and-amazon-eventbridge/

Based on immutable facts (events), event-driven architectures (EDAs) allow businesses to gain deeper insights into their customers’ behavior, unlocking more accurate and faster decision-making processes that lead to better customer experiences. In EDAs, modern event brokers, such as Amazon EventBridge and Apache Kafka, play a key role in publishing and subscribing to events. EventBridge is a serverless event bus that ingests data from your own apps, software as a service (SaaS) apps, and AWS services and routes that data to targets. Although there is overlap in their role as the backbone in EDAs, both solutions emerged from different problem statements and provide unique features to solve specific use cases. With a solid understanding of both technologies and their primary use cases, developers can create easy-to-use, maintainable, and evolvable EDAs.

If the use case is well defined and directly maps to one event bus, such as event streaming and analytics with streaming events (Kafka) or application integration with simplified and consistent event filtering, transformation, and routing on discrete events (EventBridge), the decision for a particular broker technology is straightforward. However, organizations and business requirements are often more complex and beyond the capabilities of one broker technology. In almost any case, choosing an event broker should not be a binary decision. Combining complementary broker technologies and embracing their unique strengths is a solid approach to build easy-to-use, maintainable, and evolvable EDAs. To make the integration between Kafka and EventBridge even smoother, AWS open-sourced the EventBridge Connector based on Apache Kafka. This allows you to consume from on-premises Kafka deployments and avoid point-to-point communication, while using the existing knowledge and toolset of Kafka Connect.

Streaming applications enable stateful computations over unbounded datasets. This allows real-time use cases such as anomaly detection, event-time computations, and much more. These applications can be built using frameworks such as Apache Flink, Apache Spark, or Kafka Streams. Although some of those frameworks support sending events to downstream systems other than Apache Kafka, there is no standardized way of doing so across frameworks. It would require each application owner to build their own logic to send events downstream. In an EDA, the preferred way to handle such a scenario is to publish events to an event bus and then send them downstream.

There are two ways to send events from Apache Kafka to EventBridge: the preferred method using Amazon EventBridge Pipes or the EventBridge sink connector for Kafka Connect. In this post, we explore when to use which option and how to build an EDA using the EventBridge sink connector and Amazon Managed Streaming for Apache Kafka (Amazon MSK).

EventBridge sink connector vs. EventBridge Pipes

EventBridge Pipes connects sources to targets with a point-to-point integration, supporting event filtering, transformations, enrichment, and event delivery to over 14 AWS services and external HTTPS-based targets using API Destinations. This is the preferred and easiest method to send events from Kafka to EventBridge because it simplifies setup and operations with a delightful developer experience.

Alternatively, under the following circumstances you might want to choose the EventBridge sink connector to send events from Kafka directly to EventBridge Event Buses:

  • You have already invested in processes and tooling around the Kafka Connect framework as the platform of your choice to integrate with other systems and services
  • You need to integrate with a Kafka-compatible schema registry, such as the AWS Glue Schema Registry, supporting Avro and Protobuf data formats for event serialization and deserialization
  • You want to send events from on-premises Kafka environments directly to EventBridge Event Buses

Overview of solution

In this post, we show you how to use Kafka Connect and the EventBridge sink connector to send Avro-serialized events from Amazon Managed Streaming for Apache Kafka (Amazon MSK) to EventBridge. This enables a seamless and consistent data flow from Apache Kafka to dozens of supported EventBridge AWS and partner targets, such as Amazon CloudWatch, Amazon SQS, AWS Lambda, and HTTPS targets like Salesforce, Datadog, and Snowflake using EventBridge API destinations. The following diagram illustrates the event-driven architecture used in this blog post based on Amazon MSK and EventBridge.

Architecture Diagram

The workflow consists of the following steps:

  1. The demo application generates credit card transactions, which are sent to Amazon MSK using the Avro format.
  2. An analytics application running on Amazon Elastic Container Service (Amazon ECS) consumes the transactions and analyzes them for anomalies.
  3. If an anomaly is detected, the application emits a fraud detection event back to the MSK notification topic.
  4. The EventBridge connector consumes the fraud detection events from Amazon MSK in Avro format.
  5. The connector converts the events to JSON and sends them to EventBridge.
  6. In EventBridge, we use JSON filtering rules to filter our events and send them to other services or another Event Bus. In this example, fraud detection events are sent to Amazon CloudWatch Logs for auditing and introspection, and to a third-party SaaS provider to showcase how easy it is to integrate with third-party APIs, such as Salesforce.

Prerequisites

For this walkthrough, you should have the following prerequisites:

Deploy the AWS CDK stack

This walkthrough requires you to deploy an AWS CDK stack to your account. You can deploy the full stack end to end or just the required resources to follow along with this post.

  1. In your terminal, run the following command:
    git clone https://github.com/awslabs/eventbridge-kafka-connector/

  2. Navigate to the cdk directory:
    cd eventbridge-kafka-connector/cdk

  3. Deploy the AWS CDK stack based on your preferences:
    • If you want to see the complete setup explained in this post, run the following command:
      cdk deploy --context deployment=FULL

    • If you want to deploy the connector yourself but need the required resources created for you, including the MSK cluster, AWS Identity and Access Management (IAM) roles, security groups, data generator, and so on, run the following command:
      cdk deploy --context deployment=PREREQ

Deploy the EventBridge sink connector on Amazon MSK Connect

If you deployed the CDK stack in FULL mode, you can skip this section and move on to Create EventBridge rules.

The connector needs an IAM role that allows reading the data from the MSK cluster and sending records downstream to EventBridge.

Upload connector code to Amazon S3

Complete the following steps to upload the connector code to Amazon Simple Storage Service (Amazon S3):

  1. Navigate to the GitHub repo.
  2. Download the release 1.0.0 with the AWS Glue Schema Registry dependencies included.
  3. On the Amazon S3 console, choose Buckets in the navigation pane.
  4. Choose Create bucket.
  5. For Bucket name, enter eventbridgeconnector-bucket-${AWS_ACCOUNT_ID}.

Because S3 bucket names must be globally unique, replace ${AWS_ACCOUNT_ID} with your actual AWS account ID. For example, eventbridgeconnector-bucket-123456789012.

  6. Open the bucket and choose Upload.
  7. Select the .jar file that you downloaded from the GitHub repository and choose Upload.

S3 File Upload Console

Create a custom plugin

We now have our application code in Amazon S3. As a next step, we create a custom plugin in Amazon MSK Connect. Complete the following steps:

  1. On the Amazon MSK console, choose Custom plugins in the navigation pane under MSK Connect.
  2. Choose Create custom plugin.
  3. For S3 URI – Custom plugin object, browse to the file named kafka-eventbridge-sink-with-gsr-dependencies.jar in the S3 bucket eventbridgeconnector-bucket-${AWS_ACCOUNT_ID} for the EventBridge connector.
  4. For Custom plugin name, enter msk-eventBridge-sink-plugin-v1.
  5. Enter an optional description.
  6. Choose Create custom plugin.

MSK Connect Plugin Screen

  7. Wait for the plugin to transition to the status Active.

Create a connector

Complete the following steps to create a connector in MSK Connect:

  1. On the Amazon MSK console, choose Connectors in the navigation pane under MSK Connect.
  2. Choose Create connector.
  3. Select Use existing custom plugin and under Custom plugins, select the plugin msk-eventBridge-sink-plugin-v1 that you created earlier.
  4. Choose Next.
  5. For Connector name, enter msk-eventBridge-sink-connector.
  6. Enter an optional description.
  7. For Cluster type, select MSK cluster.
  8. For MSK clusters, select the cluster you created earlier.
  9. For Authentication, choose IAM.
  10. Under Connector configurations, enter the following settings (for more details on the configuration, see the GitHub repository):
    auto.offset.reset=earliest
    connector.class=software.amazon.event.kafkaconnector.EventBridgeSinkConnector
    topics=notifications
    aws.eventbridge.connector.id=avro-test-connector
    aws.eventbridge.eventbus.arn=arn:aws:events:us-east-1:123456789012:event-bus/eventbridge-sink-eventbus
    aws.eventbridge.region=us-east-1
    tasks.max=1
    key.converter=org.apache.kafka.connect.storage.StringConverter
    value.converter=com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter
    value.converter.region=us-east-1
    value.converter.registry.name=default-registry
    value.converter.avroRecordType=GENERIC_RECORD

  11. Make sure to replace aws.eventbridge.eventbus.arn, aws.eventbridge.region, and value.converter.region with the values from the prerequisite stack.
  12. In the Connector capacity section, select Autoscaled for Capacity type.
  13. Leave the default value of 1 for MCU count per worker.
  14. Keep all default values for Connector capacity.
  15. For Worker configuration, select Use the MSK default configuration.
  16. Under Access permissions, choose the custom IAM role KafkaEventBridgeSinkStack-connectorRole, which you created during the AWS CDK stack deployment.
  17. Choose Next.
  18. Choose Next again.
  19. For Log delivery, select Deliver to Amazon CloudWatch Logs.
  20. For Log group, choose /aws/mskconnect/eventBridgeSinkConnector.
  21. Choose Next.
  22. Under Review and Create, validate all the settings and choose Create connector.

The connector is now in the Creating state. It can take several minutes for the connector to transition to the Running status.
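Before creating the connector, it can help to sanity-check the region settings you replaced in the configuration. The following is a minimal sketch, not part of the connector or the AWS tooling: it parses the key=value settings shown above and compares aws.eventbridge.region with the region embedded in the event bus ARN. The function names parse_properties, region_from_arn, and check_regions are hypothetical helpers, and the check assumes the connector should be configured with the same region as the event bus.

```python
from typing import Dict, List


def parse_properties(text: str) -> Dict[str, str]:
    """Parse key=value lines, skipping blank lines and # comments."""
    config = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        config[key.strip()] = value.strip()
    return config


def region_from_arn(arn: str) -> str:
    """Return the region field of an ARN (arn:partition:service:region:account:resource)."""
    return arn.split(":")[3]


def check_regions(config: Dict[str, str]) -> List[str]:
    """Return a list of human-readable problems; an empty list means no mismatch was found."""
    problems = []
    bus_region = region_from_arn(config["aws.eventbridge.eventbus.arn"])
    if config["aws.eventbridge.region"] != bus_region:
        problems.append(
            "aws.eventbridge.region is %s but the event bus ARN is in %s"
            % (config["aws.eventbridge.region"], bus_region)
        )
    return problems


# Settings copied from the connector configuration in this post.
SAMPLE = """
aws.eventbridge.eventbus.arn=arn:aws:events:us-east-1:123456789012:event-bus/eventbridge-sink-eventbus
aws.eventbridge.region=us-east-1
value.converter.region=us-east-1
"""
```

Calling check_regions(parse_properties(SAMPLE)) returns an empty list for the sample values. The schema registry region (value.converter.region) is deliberately not compared, because this sketch makes no assumption about where your registry lives.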

Create EventBridge rules

Now that the connector is forwarding events to EventBridge, we can use EventBridge rules to filter and send events to other targets. Complete the following steps to create a rule:

  1. On the EventBridge console, choose Rules in the navigation pane.
  2. Choose Create rule.
  3. Enter eb-to-cloudwatch-logs-and-webhook for Name.
  4. Select eventbridge-sink-eventbus for Event bus.
  5. Choose Next.
  6. Select Custom pattern (JSON editor), choose Insert, and replace the event pattern with the following code:
    {
      "detail": {
        "value": {
          "eventType": ["suspiciousActivity"],
          "source": ["transactionAnalyzer"]
        }
      },
      "detail-type": [{
        "prefix": "kafka-connect-notification"
      }]
    }
    

  7. Choose Next.
  8. For Target 1, select CloudWatch log group and enter kafka-events for Log Group.
  9. Choose Add another target.
  10. (Optional: Create an API destination) For Target 2, select EventBridge API destination for Target types.
  11. Select Create a new API destination.
  12. Enter a descriptive name for Name.
  13. For API destination endpoint, enter the URL of your endpoint (for example, your Datadog or Salesforce endpoint).
  14. Select POST for HTTP method.
  15. Select Create a new connection for Connection.
  16. For Connection Name, enter a name.
  17. Select Other as Destination type and select API Key as Authorization Type.
  18. For API key name and Value, enter your keys.
  19. Choose Next.
  20. Validate your inputs and choose Create rule.
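To build intuition for what the event pattern in step 6 selects, the following minimal sketch re-implements a small subset of the matching semantics in plain Python: nested objects, lists of alternative values, and the prefix operator. It is an illustration only, not the EventBridge matching engine, and the sample event is a hypothetical payload whose field values are assumptions rather than output captured from the connector.

```python
from typing import Any

# The event pattern from the rule above.
PATTERN = {
    "detail": {
        "value": {
            "eventType": ["suspiciousActivity"],
            "source": ["transactionAnalyzer"],
        }
    },
    "detail-type": [{"prefix": "kafka-connect-notification"}],
}


def _matches_one(alternative: Any, actual: Any) -> bool:
    """Match one alternative: either a {"prefix": ...} operator or an exact value."""
    if isinstance(alternative, dict) and "prefix" in alternative:
        return isinstance(actual, str) and actual.startswith(alternative["prefix"])
    return alternative == actual


def matches(pattern: dict, event: dict) -> bool:
    """Return True if the event satisfies every field named in the pattern."""
    for key, expected in pattern.items():
        if key not in event:
            return False
        actual = event[key]
        if isinstance(expected, dict):
            # Nested object: recurse into the corresponding event field.
            if not isinstance(actual, dict) or not matches(expected, actual):
                return False
        elif not any(_matches_one(alt, actual) for alt in expected):
            # A list holds alternatives; at least one must match.
            return False
    return True


# Hypothetical event for illustration; fields not named in the pattern are ignored.
SAMPLE_EVENT = {
    "detail-type": "kafka-connect-notifications",
    "detail": {
        "value": {
            "eventType": "suspiciousActivity",
            "source": "transactionAnalyzer",
            "amount": 4200,
        }
    },
}
```

With these definitions, matches(PATTERN, SAMPLE_EVENT) is true, while an event with a different eventType or a detail-type that doesn't start with kafka-connect-notification is rejected.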

EventBridge Rule

The following screenshot of the CloudWatch Logs console shows several events from EventBridge.

CloudWatch Logs

Run the connector in production

In this section, we dive deeper into the operational aspects of the connector. Specifically, we discuss how the connector scales and how to monitor it using CloudWatch.

Scale the connector

Kafka connectors scale through the number of tasks. The code design of the EventBridge sink connector doesn’t limit the number of tasks that it can run. MSK Connect provides the compute capacity to run the tasks, which can be of the Provisioned or Autoscaled type. During the deployment of the connector, we chose the capacity type Autoscaled and 1 MCU per worker (which represents 1 vCPU and 4 GiB of memory). This means MSK Connect scales the infrastructure that runs the tasks, but not the number of tasks. The number of tasks is defined by the connector. By default, the connector starts with the number of tasks defined in tasks.max in the connector configuration. If this value is higher than the partition count of the processed topic, the number of tasks is set to the number of partitions during the Kafka Connect rebalance.
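The relationship between tasks.max and the partition count can be expressed in one line. The following sketch only restates the rule described above for a single topic; effective_task_count is a hypothetical helper, not a Kafka Connect API.

```python
def effective_task_count(tasks_max: int, partition_count: int) -> int:
    """Number of tasks that run for one topic: tasks.max, capped at the partition count."""
    if tasks_max < 1 or partition_count < 1:
        raise ValueError("tasks_max and partition_count must be positive")
    return min(tasks_max, partition_count)
```

For example, with tasks.max=4 and a topic with 3 partitions, 3 tasks run. With tasks.max=1, as in this walkthrough, a single task processes all partitions, so raising tasks.max is what lets Autoscaled capacity spread work across more workers.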

Monitor the connector

MSK Connect emits metrics to CloudWatch for monitoring by default. Besides MSK Connect metrics, the offset of the connector should also be monitored in production. Monitoring the offset shows whether the connector can keep up with the data produced in the Kafka cluster.
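Offset monitoring usually comes down to computing lag: the difference between the latest offset in each partition and the offset the connector's consumer group has committed. The following sketch shows only that arithmetic. How you obtain the offsets (for example, from your Kafka tooling of choice) is outside its scope, and partition_lag is a hypothetical helper. Treating a partition with no committed offset as offset 0 is an assumption made for simplicity.

```python
from typing import Dict


def partition_lag(end_offsets: Dict[int, int], committed_offsets: Dict[int, int]) -> Dict[int, int]:
    """Return per-partition lag (end offset minus committed offset, never negative)."""
    lag = {}
    for partition, end_offset in end_offsets.items():
        # Assumption: a partition without a committed offset has not been read yet.
        committed = committed_offsets.get(partition, 0)
        lag[partition] = max(end_offset - committed, 0)
    return lag
```

A total lag that keeps growing over time indicates that the connector can't keep up with the producers, which is a signal to revisit tasks.max and the connector capacity.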

Clean up

To clean up your resources and avoid ongoing charges, complete the following steps:

  1. On the Amazon MSK console, choose Connectors in the navigation pane under MSK Connect.
  2. Select the connectors you created and choose Delete.
  3. Choose Clusters in the navigation pane.
  4. Select the cluster you created and choose Delete on the Actions menu.
  5. On the EventBridge console, choose Rules in the navigation pane.
  6. Choose the event bus eventbridge-sink-eventbus.
  7. Select all the rules you created and choose Delete.
  8. Confirm the removal by entering delete, then choose Delete.

If you deployed the AWS CDK stack with the context PREREQ, delete the .jar file for the connector.

  1. On the Amazon S3 console, choose Buckets in the navigation pane.
  2. Navigate to the bucket where you uploaded your connector and delete the kafka-eventbridge-sink-with-gsr-dependencies.jar file.

Regardless of the chosen deployment mode, all other AWS resources can be deleted by using AWS CDK or AWS CloudFormation. Run cdk destroy from the repository directory to delete the CloudFormation stack.

Alternatively, on the AWS CloudFormation console, select the stack KafkaEventBridgeSinkStack and choose Delete.

Conclusion

In this post, we showed how you can use MSK Connect to run the AWS open-sourced Kafka connector for EventBridge, how to configure the connector to forward a Kafka topic to EventBridge, and how to use EventBridge rules to filter and forward events to CloudWatch Logs and a webhook.

To learn more about the Kafka connector for EventBridge, refer to Amazon EventBridge announces open-source connector for Kafka Connect, as well as the MSK Connect Developer Guide and the code for the connector on the GitHub repo.


About the Authors

Florian Mair is a Senior Solutions Architect and data streaming expert at AWS. He is a technologist who helps customers in Germany succeed and innovate by solving business challenges using AWS Cloud services. Besides working as a Solutions Architect, Florian is a passionate mountaineer, and has climbed some of the highest mountains across Europe.

Benjamin Meyer is a Senior Solutions Architect at AWS, focused on helping games businesses in Germany solve business challenges by using AWS Cloud services. Benjamin has been an avid technologist for 7 years, and when he’s not helping customers, he can be found developing mobile apps, building electronics, or tending to his cacti.

How Vercel Shipped Cron Jobs in 2 Months Using Amazon EventBridge Scheduler

Post Syndicated from Marcia Villalba original https://aws.amazon.com/blogs/aws/how-vercel-shipped-cron-jobs-in-2-months-using-amazon-eventbridge-scheduler/

Vercel implemented Cron Jobs using Amazon EventBridge Scheduler, enabling their customers to create, manage, and run scheduled tasks at scale. The adoption of this feature was rapid, reaching over 7 million weekly cron invocations within a few months of release. This article shows how they did it and how they handle the massive scale they’re experiencing.

Vercel builds a front-end cloud that makes it easier for engineers to deploy and run their front-end applications. With more than 100 million deployments in Vercel in the last two years, Vercel helps users take advantage of best-in-class AWS infrastructure with zero configuration by relying heavily on serverless technology. Vercel provides a lot of features that help developers host their front-end applications. However, until the beginning of this year, they hadn’t built Cron Jobs yet.

A cron job is a scheduled task that automates running specific commands or scripts at predetermined intervals or fixed times. It enables users to set up regular, repetitive actions, such as backups, sending notification emails to customers, or processing payments when a subscription needs to be renewed. Cron jobs are widely used in computing environments to improve efficiency and automate routine operations, and they were a commonly requested feature from Vercel’s customers.

In December 2022, Vercel hosted an internal hackathon to foster innovation. That’s where Vincent Voyer and Andreas Schneider joined forces to build a prototype cron job feature for the Vercel platform. They formed a team of five people and worked on the feature for a week. The team worked on different tasks, from building a user interface to display the cron jobs to creating the backend implementation of the feature.

Amazon EventBridge Scheduler
When the hackathon team started thinking about solving the cron job problem, their first idea was to use Amazon EventBridge rules that run on a schedule. However, they realized quickly that this feature has a limit of 300 rules per account per AWS Region, which wasn’t enough for their intended use. Luckily, one of the team members had read the announcement of Amazon EventBridge Scheduler in the AWS Compute blog and they thought this would be a perfect tool for their problem.

By using EventBridge Scheduler, they could schedule millions of one-time or recurring tasks across over 270 AWS services without provisioning or managing the underlying infrastructure.

How cron jobs work

To create a new cron job in Vercel, a customer defines how often the task runs and the API they want to invoke. In the backend, Vercel uses EventBridge Scheduler and creates a new schedule whenever a new cron job is created.

To call the endpoint, the team used an AWS Lambda function that receives the path to invoke as an input parameter.

How cron jobs work

When the time comes for the cron job to run, EventBridge Scheduler invokes the function, which then calls the customer website endpoint that was configured.
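As a rough illustration of this flow, the following sketch builds the request for a recurring schedule that targets an invoker Lambda function and passes the path as input. It is not Vercel's implementation: the function name build_cron_schedule, the cron- name prefix, the ARNs, and the shape of the input payload are all assumptions. The field names follow the EventBridge Scheduler CreateSchedule API.

```python
import json


def build_cron_schedule(job_id: str, cron_fields: str, lambda_arn: str, role_arn: str, path: str) -> dict:
    """Build CreateSchedule parameters for a recurring job that invokes a Lambda function.

    cron_fields uses the six-field EventBridge Scheduler syntax:
    minutes hours day-of-month month day-of-week year.
    """
    return {
        "Name": "cron-" + job_id,  # hypothetical naming scheme
        "ScheduleExpression": "cron(" + cron_fields + ")",
        "FlexibleTimeWindow": {"Mode": "OFF"},
        "Target": {
            "Arn": lambda_arn,
            "RoleArn": role_arn,
            # The invoker function receives the path to call as its input.
            "Input": json.dumps({"path": path}),
        },
    }


# Hypothetical values for illustration: run at minute 0 of every hour.
params = build_cron_schedule(
    job_id="example-job",
    cron_fields="0 * * * ? *",
    lambda_arn="arn:aws:lambda:us-east-1:123456789012:function:cron-invoker",
    role_arn="arn:aws:iam::123456789012:role/scheduler-invoke-role",
    path="/api/cron",
)
```

With boto3 installed and credentials configured, these parameters could be passed to the service as boto3.client("scheduler").create_schedule(**params).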

By the end of the week, Vincent and his team had a working prototype version of the cron jobs feature, and they won a prize at the hackathon.

Building Vercel Cron Jobs
After working for one week on this prototype in December, the hackathon ended, and Vincent and his team returned to their regular jobs. In early January 2023, Vincent and the Vercel team decided to take the project and turn it into a real product.

During the hackathon, the team built the fundamental parts of the feature, but there were some details that they needed to polish to make it production ready. Vincent and Andreas worked on the feature, and in less than two months, on February 22, 2023, they announced Vercel Cron Jobs to the public. The announcement tweet got over 400 thousand views, and the community loved the launch.

Tweet from Vercel announcing cron jobs

The adoption of this feature was very rapid. Within a few months of launching Cron Jobs, Vercel reached over 7 million cron invocations per week, and they expect the adoption to continue growing.

Cron jobs adoption

How Vercel Cron Jobs Handles Scale
With this pace of adoption, scaling this feature is crucial for Vercel. To scale the number of cron invocations at this pace, they had to make some business and architectural decisions.

From the business perspective, they defined limits for their free-tier customers. Free-tier customers can create a maximum of two cron jobs in their account, and they can only have hourly schedules. This means that free customers cannot run a cron job every 30 minutes; instead, they can do it at most every hour. Only customers on Vercel paid tiers can take advantage of EventBridge Scheduler minute granularity for scheduling tasks.

Also, for free customers, minute precision isn’t guaranteed. To achieve this, Vincent took advantage of the time window configuration from EventBridge Scheduler. The flexible time window configuration allows you to start a schedule within a window of time. This means that the scheduled tasks are dispersed across the time window to reduce the impact of multiple requests on downstream services. This is very useful if, for example, many customers want to run their jobs at midnight. By using the flexible time window, the load can spread across a set window of time.
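In the CreateSchedule API, the flexible time window is a small structure with a Mode and, when the mode is FLEXIBLE, a MaximumWindowInMinutes value. The following sketch shows how a tier-based choice could look. The tier-to-window mapping and the 59-minute default are assumptions for illustration, not Vercel's actual settings.

```python
def flexible_time_window(paid_tier: bool, window_minutes: int = 59) -> dict:
    """Return a FlexibleTimeWindow structure for a schedule.

    Paid-tier jobs run at the exact scheduled time; free-tier jobs may start
    anywhere within the window, which spreads load on downstream services.
    """
    if paid_tier:
        return {"Mode": "OFF"}
    if not 1 <= window_minutes <= 1440:
        # The API accepts windows from 1 minute to 1,440 minutes (24 hours).
        raise ValueError("window_minutes must be between 1 and 1440")
    return {"Mode": "FLEXIBLE", "MaximumWindowInMinutes": window_minutes}
```

The returned dictionary is meant to be used as the FlexibleTimeWindow value when creating or updating a schedule.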

From the architectural perspective, Vercel took advantage of hosting the APIs and owning the functions that the cron jobs invoke.

Validating the calls

This means that when the Lambda function is started by EventBridge Scheduler, the function ends its run without waiting for a response from the API. Vercel then validates that the cron job ran by checking, through its observability mechanisms, whether the API and the Vercel function ran correctly. In this way, the function duration is very short, less than 400 milliseconds. This allows Vercel to run a lot of functions per second without affecting their concurrency limits.
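One way to picture this fire-and-forget pattern is a handler that sends the HTTP request and closes the connection without reading the response. The following sketch uses only the Python standard library. It is not Vercel's code: the event fields host and path, the 2-second timeout, and the connection_factory parameter (added so the handler can be exercised without a network) are assumptions.

```python
import http.client


def handler(event, context, connection_factory=http.client.HTTPSConnection):
    """Send the cron request and return without waiting for the response."""
    conn = connection_factory(event["host"], timeout=2)
    try:
        # request() writes the request line and headers to the socket.
        conn.request("GET", event["path"])
        # Deliberately no getresponse(): success is verified elsewhere,
        # for example through the platform's own observability data.
    finally:
        conn.close()
    return {"dispatched": event["path"]}
```

Because the function does not block on the downstream API, its duration depends mostly on connection setup rather than on how long the customer's endpoint takes to respond. The trade-off is that the function itself can't report whether the call succeeded, which is why a separate validation mechanism is needed.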

Lambda invocations and duration dashboard

What Was The Impact?
Vercel’s implementation of Cron Jobs is an excellent example of what serverless technologies enable. In two months, with two people working full time, they were able to launch a feature that their community needed and enthusiastically adopted. This feature shows the completeness of Vercel’s platform and is an important feature to convince their customers to move to a paid account.

If you want to get started with EventBridge Scheduler, see Serverless Land patterns for EventBridge Scheduler, where you’ll find a broad range of examples to help you.

Marcia

Automatically delete schedules upon completion with Amazon EventBridge Scheduler

Post Syndicated from Marcia Villalba original https://aws.amazon.com/blogs/compute/automatically-delete-schedules-upon-completion-with-amazon-eventbridge-scheduler/

Amazon EventBridge Scheduler now supports configuring automatic deletion of schedules after completion. Now you can configure one-time and recurring schedules with an end date to be automatically deleted upon completion to avoid managing individual schedules.

Amazon EventBridge Scheduler allows you to create, run, and manage schedules at scale. Using EventBridge Scheduler, you can schedule millions of tasks to invoke over 270 AWS services and over 6,000 API operations, such as AWS Lambda, AWS Step Functions, and Amazon SNS.

By default, EventBridge Scheduler allows customers to have 1 million schedules per account, which can be increased as needed. However, completed schedules are counted towards the account quota limits. In addition, completed schedules are visible when listing schedules, and require customers to remove them. Some customers have created their own patterns to automatically remove completed schedules and since the EventBridge Scheduler announcement last November, this was one of the most requested features by customers.

Deleting after completion

When you configure automatic deletion for a schedule, EventBridge Scheduler deletes the schedule shortly after its last target invocation. You can set up automatic deletion when you create the schedule, or you can update the schedule settings at any point before its last invocation.

You can configure this setting in one-time and recurring schedules.

  • One-time schedules: your schedule is deleted after the schedule has invoked its target once.
  • Recurring schedules: set with rate or cron expressions, your schedule is deleted after the last invocation.

If all retries are exhausted because of failure for a schedule configured with automatic deletion, the schedule is deleted shortly after the last unsuccessful attempt.

Console action after schedule completion

With this new capability, you can save time, resources, and operational costs when managing your schedules.

Setting up schedules to delete after completion

You can create schedules that are automatically deleted after completion from the AWS Management Console, AWS SDK, or AWS CLI in all AWS Regions where EventBridge Scheduler is available.

For example, imagine that you are a developer on a platform that allows end users to receive notifications when a task is due. You are already using EventBridge Scheduler to implement this feature. For every task that your users create in your application, your code creates a new schedule in EventBridge Scheduler. You can now configure all these schedules to be deleted automatically after completion. And shortly after the schedules run, they are removed from your EventBridge Scheduler, allowing you to scale your system and keep on creating schedules, making it easier to manage your active schedules and quota limits.

Let’s see how you can implement this example with the new capability of EventBridge Scheduler. When a user creates a new task with a reminder, a function is triggered from your application. That function creates a one-time schedule in EventBridge Scheduler.

Create a schedule diagram

This example shows how you can use the AWS CLI to create a new one-time schedule that has SNS as a target and is automatically deleted after completion. Make sure that you update the AWS CLI to the latest version. Then you can create a new schedule with the parameter --action-after-completion 'DELETE'.

$ aws scheduler create-schedule --name SendEmailOnce \
--schedule-expression "at(2023-08-02T17:35:00)" \
--schedule-expression-timezone "Europe/Helsinki" \
--flexible-time-window "{\"Mode\": \"OFF\"}" \
--target "{\"Arn\": \"arn:aws:sns:us-east-1:xxx:test-send-email\", \"RoleArn\": \"arn:aws:iam::xxxx:role/sam_scheduler_role\" }" \
--action-after-completion 'DELETE'

This command creates a one-time schedule with the name SendEmailOnce, that runs at a specific date, defined in the schedule-expression, and in a specific time zone, defined in the schedule-expression-timezone. This schedule is not using the flexible time window feature. Next, you must define the target for this schedule. This one sends a message to an SNS topic.
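If your application creates schedules from code rather than from the CLI, the same request can be assembled for the AWS SDK. The following sketch builds the parameters for the CreateSchedule API, mirroring the CLI command above. The helper name one_time_schedule_request is hypothetical, and the ARNs are placeholders that you would replace with your own.

```python
from datetime import datetime


def one_time_schedule_request(name: str, run_at: datetime, timezone: str, target_arn: str, role_arn: str) -> dict:
    """Build CreateSchedule parameters for a one-time schedule that deletes itself."""
    return {
        "Name": name,
        # One-time schedules use the at(yyyy-mm-ddThh:mm:ss) expression.
        "ScheduleExpression": "at(" + run_at.strftime("%Y-%m-%dT%H:%M:%S") + ")",
        "ScheduleExpressionTimezone": timezone,
        "FlexibleTimeWindow": {"Mode": "OFF"},
        "Target": {"Arn": target_arn, "RoleArn": role_arn},
        # Remove the schedule shortly after its last target invocation.
        "ActionAfterCompletion": "DELETE",
    }


# Placeholder ARNs for illustration only.
request = one_time_schedule_request(
    name="SendEmailOnce",
    run_at=datetime(2023, 8, 2, 17, 35, 0),
    timezone="Europe/Helsinki",
    target_arn="arn:aws:sns:us-east-1:123456789012:test-send-email",
    role_arn="arn:aws:iam::123456789012:role/scheduler_role",
)
```

With boto3 installed and credentials configured, the request could be sent with boto3.client("scheduler").create_schedule(**request). As with the CLI, an up-to-date SDK version is needed for the ActionAfterCompletion parameter to be recognized.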

You can validate that your schedule is created correctly from the AWS CLI with the get-schedule command.

$ aws scheduler get-schedule --name SendEmailOnce
{
    "ActionAfterCompletion": "DELETE",
    "Arn": "arn:aws:scheduler:us-east-1:905614108351:schedule/default/SendEmailOnce",
    "CreationDate": 1690874334.83,
    "FlexibleTimeWindow": {
        "Mode": "OFF"
    },
    "GroupName": "default",
    "LastModificationDate": 1690874334.83,
    "Name": "SendEmailOnce",
    "ScheduleExpression": "at(2023-08-02T17:35:00)",
    "ScheduleExpressionTimezone": "Europe/Helsinki",
    "State": "ENABLED",
    "Target": {
        "Arn": "arn:aws:sns:us-east-1:xxxx:test-send-email",
        "RetryPolicy": {
            "MaximumEventAgeInSeconds": 86400,
            "MaximumRetryAttempts": 185
        },
        "RoleArn": "arn:aws:iam::XXXX:role/scheduler_role"
    }
}

In addition, you can see the details of the schedule from the AWS Management Console.

Schedule details

Now when the date of the notification arrives, EventBridge Scheduler invokes the target configured in the schedule, in this case SNS, and emails a notification to the customer.

Schedule starts

Shortly after this schedule is completed, if you list the schedules, you see that the schedule was deleted and it is no longer listed.

$ aws scheduler list-schedules
{
"Schedules": [
]
}

Benefits of automation

Traditionally, many problems that EventBridge Scheduler solves were addressed using batch processes and pull-based models.

Some organizations are using EventBridge Scheduler to replace their pull-based models for a more dynamic push-based model. Before implementing Scheduler, they were relying on the customer to ask for the data when they need it. Now, with EventBridge Scheduler, they are creating schedules to report back to their customers at critical times of their journey.

For example, an airline can use EventBridge Scheduler to create one-time schedules 24 hours, 4 hours, and 2 hours before the flight, to keep their passengers up to date with the flight status. Customers receive a notification with the link for the online check-in, the check-in counter number, baggage pick up information, and any flight changes that occur. In this way, passengers are always up to date on their flight status and they can take immediate action. This dynamic model not only helps to improve the customer experience but also improves the operational efficiency for the airline.
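The airline scenario maps naturally to three one-time schedules per flight. As a minimal sketch, the following function derives the at() schedule expressions for the 24-hour, 4-hour, and 2-hour reminders from a departure time. The function name and the offsets tuple are illustrative, and the departure time is assumed to be expressed in the time zone that you pass separately as the schedule's time zone.

```python
from datetime import datetime, timedelta
from typing import List

# Reminder lead times taken from the example above.
REMINDER_OFFSETS = (timedelta(hours=24), timedelta(hours=4), timedelta(hours=2))


def reminder_expressions(departure: datetime) -> List[str]:
    """Return one at(...) schedule expression per reminder, earliest first."""
    return [
        "at(" + (departure - offset).strftime("%Y-%m-%dT%H:%M:%S") + ")"
        for offset in REMINDER_OFFSETS
    ]
```

Each expression can be used as the schedule expression of its own one-time schedule. Combined with automatic deletion after completion, the three schedules disappear once the reminders have been sent.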

Other organizations use EventBridge Scheduler to replace batch operations, as you can configure a schedule that starts a batch process at the time of the day you need. Also, you can take advantage of EventBridge Scheduler time zones and run the processes at the time that makes sense for your end customer.

For example, consider an international financial institution that must send customers a statement of their account at the end of the day. You can use EventBridge Scheduler to set up a recurring schedule for each of your customers that sends a report at the end of the day in that customer’s time zone. In this way, you can improve the customer experience as now the system is personalized for their settings, and also reduce operational overhead, as the processing operations are distributed throughout the day.

In addition, EventBridge Scheduler solves many new use cases for customers. For example, if you are a financial institution that handles payments, you can create a one-time schedule for every large transaction that needs a confirmation. If the transaction is not confirmed when the schedule runs, you can cancel the transaction. This decreases the risk of handling transactions, improves the customer experience, and also improves the automation of your processes by making them real time.

Another use case is to handle credit card expiration dates. You can create a one-time schedule that emails the customer to update their credit card information one month before the expiration date. This solution removes operational overhead compared to the traditional implementation of using servers and batch processes.

Conclusion

In the preceding use cases, automation and task scheduling improve the end user experience, remove undifferentiated heavy lifting, and benefit from the new capability of removing schedules after their completion.

This blog post introduces the new capability from Amazon EventBridge Scheduler that automatically deletes the completed schedules. This feature simplifies the use of EventBridge Scheduler, reduces the operational overhead of managing schedules at scale, and allows you to scale even further.

To get started with EventBridge Scheduler, visit Serverless Land patterns where you can find over 20 patterns using this service.

Learn how to streamline and secure your SaaS applications at AWS Applications Innovation Day

Post Syndicated from Phil Goldstein original https://aws.amazon.com/blogs/aws/learn-how-to-streamline-and-secure-your-saas-applications-at-aws-applications-innovation-day/

Companies continue to adopt software as a service (SaaS) applications at a rapid clip, with recent research showing that the average SaaS portfolio now has at least 200 applications. While organizations purchase these purpose-built tools to make their employees more productive, they now must contend with growing security complexities, context switching, and data silos.

If your company faces these issues, or you want to avoid them in the future, join us on Tuesday, June 27, for a free-to-attend online event AWS Applications Innovation Day. AWS will stream the event simultaneously across multiple platforms, including LinkedIn Live, Twitter, YouTube, and Twitch. You can also join us in person in Seattle to hear from Dilip Kumar, Vice President of AWS Applications and an executive panel with AWS Partners Splunk, Asana, and Okta.

Join us for Applications Innovation Day June 27, 2023.

Applications Innovation Day is designed to give you the tools you need to improve how your organization uses and secures SaaS applications. Sessions throughout the day will show you how you can secure data while providing your employees with the best tools for the job. You’ll also learn how to support the right mix of applications to improve workforce collaboration, and how to use generative artificial intelligence securely and effectively to improve insights and enhance employee productivity.

We’ll start the virtual broadcast with a keynote from Dilip Kumar, Vice President of AWS Applications, who will discuss the way we use and govern SaaS applications at AWS. He’ll also discuss how we’ll make it easier to deploy purpose-built SaaS applications like Asana, Okta, Splunk, Zoom, and others across your business, including the announcement of some exciting new innovations from AWS.

AWS product leaders will present technical breakout sessions during the day on the productivity and security aspects of managing a SaaS application tech stack. Sessions will cover a wide range of topics, including how the nature of productivity at work is changing, how AI is transforming SaaS applications and collaboration, how you can improve your security observability across your applications, and how you can create custom analytics on SaaS application activity.

Overall, the event is a great opportunity for security leaders, IT administrators and operations leaders, and anyone leading digital workplace and transformation initiatives to learn how to better leverage and govern SaaS applications.

To register for AWS Applications Innovation Day, simply go to the event page.

How CyberGRX cut ML processing time from 8 days to 56 minutes with AWS Step Functions Distributed Map

Post Syndicated from Marcia Villalba original https://aws.amazon.com/blogs/aws/how-cybercrx-cut-ml-processing-time-from-8-days-to-56-minutes-with-aws-step-functions-distributed-map/

Last December, Sébastien Stormacq wrote about the availability of a distributed map state for AWS Step Functions, a new feature that allows you to orchestrate large-scale parallel workloads in the cloud. That’s when Charles Burton, a data systems engineer for a company called CyberGRX, found out about it and refactored his workflow, reducing the processing time for his machine learning (ML) processing job from 8 days to 56 minutes. Before, running the job required an engineer to constantly monitor it; now, it runs in less than an hour with no support needed. In addition, the new implementation with AWS Step Functions Distributed Map costs less than what it did originally.

What CyberGRX achieved with this solution is a perfect example of what serverless technologies embrace: letting the cloud do as much of the undifferentiated heavy lifting as possible so the engineers and data scientists have more time to focus on what’s important for the business. In this case, that means continuing to improve the model and the processes for one of the key offerings from CyberGRX, a cyber risk assessment of third parties using ML insights from its large and growing database.

What’s the business challenge?
CyberGRX shares third-party cyber risk management (TPCRM) data with their customers. They predict, with high confidence, how a third-party company will respond to a risk assessment questionnaire. To do this, they have to run their predictive model on every company in their platform; they currently have predictive data on more than 225,000 companies. Whenever there’s a new company or the data changes for a company, they regenerate their predictive model by processing their entire dataset. Over time, CyberGRX data scientists improve the model or add new features to it, which also requires the model to be regenerated.

The challenge is running this job for 225,000 companies in a timely manner, with as few hands-on resources as possible. The job runs a set of operations for each company, and every company calculation is independent of other companies. This means that in the ideal case, every company can be processed at the same time. However, implementing such a massive parallelization is a challenging problem to solve.

First iteration
With that in mind, the company built their first iteration of the pipeline using Kubernetes and Argo Workflows, an open-source container-native workflow engine for orchestrating parallel jobs on Kubernetes. These were tools they were familiar with, as they were already using them in their infrastructure.

But as soon as they tried to run the job for all the companies on the platform, they ran up against the limits of what their system could handle efficiently. Because the solution depended on a centralized controller, Argo Workflows, it was not robust, and the controller was scaled to its maximum capacity during this time. At that time, they only had 150,000 companies. And running the job with all of the companies took around 8 days, during which the system would crash and need to be restarted. It was very labor intensive, and it always required an engineer on call to monitor and troubleshoot the job.

The tipping point came when Charles joined the Analytics team at the beginning of 2022. One of his first tasks was to do a full model run on approximately 170,000 companies at that time. The model run lasted the whole week and ended at 2:00 AM on a Sunday. That’s when he decided their system needed to evolve.

Second iteration
With the pain of the last time he ran the model fresh in his mind, Charles thought through how he could rewrite the workflow. His first thought was to use AWS Lambda and SQS, but he realized that he needed an orchestrator in that solution. That’s why he chose Step Functions, a serverless service that helps you automate processes, orchestrate microservices, and create data and ML pipelines; plus, it scales as needed.

Charles got the new version of the workflow with Step Functions working in about 2 weeks. The first step he took was adapting his existing Docker image to run in Lambda using Lambda’s container image packaging format. Because the container already worked for his data processing tasks, this update was simple. He scheduled Lambda provisioned concurrency to make sure that all functions he needed were ready when he started the job. He also configured reserved concurrency to make sure that Lambda would be able to handle this maximum number of concurrent executions at a time. In order to support so many functions executing at the same time, he raised the concurrent execution quota for Lambda per account.
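As a rough sketch of the two Lambda concurrency settings mentioned above, the following Python builds the request parameters without calling AWS. The function name, alias, and the figure of 10,000 are hypothetical placeholders rather than CyberGRX's actual values, and the scheduling of provisioned concurrency is not shown. The parameter names follow the boto3 Lambda client calls put_function_concurrency and put_provisioned_concurrency_config.

```python
def concurrency_requests(function_name, alias, reserved, provisioned):
    """Build kwargs for the two boto3 Lambda calls (no AWS call is made here)."""
    # Lambda does not allow more provisioned than reserved concurrency on a function.
    if provisioned > reserved:
        raise ValueError("provisioned concurrency cannot exceed reserved concurrency")
    reserved_req = {
        "FunctionName": function_name,
        "ReservedConcurrentExecutions": reserved,
    }
    provisioned_req = {
        "FunctionName": function_name,
        "Qualifier": alias,  # provisioned concurrency applies to a version or alias
        "ProvisionedConcurrentExecutions": provisioned,
    }
    return reserved_req, provisioned_req


# Hypothetical names and numbers, for illustration only.
reserved_req, provisioned_req = concurrency_requests("company-model-run", "live", 10000, 10000)
# With credentials configured, these could be passed to:
#   boto3.client("lambda").put_function_concurrency(**reserved_req)
#   boto3.client("lambda").put_provisioned_concurrency_config(**provisioned_req)
```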

And to make sure that the steps were run in parallel, he used Step Functions and the map state. The map state allowed Charles to run a set of workflow steps for each item in a dataset. The iterations run in parallel. Because Step Functions map state offers 40 concurrent executions and CyberGRX needed more parallelization, they created a solution that launched multiple state machines in parallel; in this way, they were able to iterate fast across all the companies. Creating this complex solution required a preprocessor that handled the heuristics of the concurrency of the system and split the input data across multiple state machines.
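To give a feel for what such a preprocessor has to do, here is a minimal sketch of the splitting step only. It is not CyberGRX's code: the function name, the number of state machines (250), and the use of integer company IDs are assumptions made for illustration, and the real preprocessor also handled concurrency heuristics that are not modeled here.

```python
import math


def split_across_state_machines(company_ids, num_machines):
    """Split the input into at most num_machines contiguous slices of near-equal size."""
    if num_machines < 1:
        raise ValueError("num_machines must be at least 1")
    if not company_ids:
        return []
    size = math.ceil(len(company_ids) / num_machines)
    return [company_ids[i:i + size] for i in range(0, len(company_ids), size)]


# Hypothetical input: 200,000 company IDs spread over 250 state machine executions.
company_ids = list(range(200_000))
slices = split_across_state_machines(company_ids, num_machines=250)
# Each slice becomes the input of one state machine execution, whose map state
# then works through its slice with up to 40 iterations running at a time.
```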

This second iteration was already better than the first one, as now it was able to finish the execution with no problems, and it could iterate over 200,000 companies in 90 minutes. However, the preprocessor was a very complex part of the system, and it was hitting the limits of the Lambda and Step Functions APIs due to the amount of parallelization.

Second iteration with AWS Step Functions

Third and final iteration
Then, during AWS re:Invent 2022, AWS announced a distributed map for Step Functions, a new type of map state that allows you to write Step Functions to coordinate large-scale parallel workloads. Using this new feature, you can easily iterate over millions of objects stored in Amazon Simple Storage Service (Amazon S3), and then the distributed map can launch up to 10,000 parallel sub-workflows to process the data.

When Charles read in the News Blog article about the 10,000 parallel workflow executions, he immediately thought about trying this new state. In a couple of weeks, Charles built the new iteration of the workflow.

Because the distributed map state split the input into different processors and handled the concurrency of the different executions, Charles was able to drop the complex preprocessor code.

The new process was the simplest that it’s ever been; now whenever they want to run the job, they just upload a file to Amazon S3 with the input data. This action triggers an Amazon EventBridge rule that targets the state machine with the distributed map. The state machine then executes with that file as an input and publishes the results to an Amazon Simple Notification Service (Amazon SNS) topic.
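A state machine along these lines could look roughly like the following sketch, written as a Python dictionary that serializes to Amazon States Language. It is an illustration, not the CyberGRX definition: the state names, the CSV input format, the ARNs, and the assumption that the execution input is the S3 "Object Created" event from EventBridge (with the bucket and key under detail) are all hypothetical.

```python
import json


def distributed_map_definition(function_arn, topic_arn, max_concurrency=10000):
    """Return an ASL definition: distributed map over an S3 file, then an SNS publish."""
    return {
        "StartAt": "ProcessCompanies",
        "States": {
            "ProcessCompanies": {
                "Type": "Map",
                "MaxConcurrency": max_concurrency,
                # Read the items from the uploaded file (assumed here to be a CSV).
                "ItemReader": {
                    "Resource": "arn:aws:states:::s3:getObject",
                    "ReaderConfig": {"InputType": "CSV", "CSVHeaderLocation": "FIRST_ROW"},
                    "Parameters": {
                        "Bucket.$": "$.detail.bucket.name",
                        "Key.$": "$.detail.object.key",
                    },
                },
                # Each item is handled by its own child workflow execution.
                "ItemProcessor": {
                    "ProcessorConfig": {"Mode": "DISTRIBUTED", "ExecutionType": "STANDARD"},
                    "StartAt": "ScoreCompany",
                    "States": {
                        "ScoreCompany": {
                            "Type": "Task",
                            "Resource": "arn:aws:states:::lambda:invoke",
                            "Parameters": {"FunctionName": function_arn, "Payload.$": "$"},
                            "End": True,
                        }
                    },
                },
                "Next": "PublishResults",
            },
            "PublishResults": {
                "Type": "Task",
                "Resource": "arn:aws:states:::sns:publish",
                "Parameters": {"TopicArn": topic_arn, "Message": "Model run finished"},
                "End": True,
            },
        },
    }


# Hypothetical ARNs, for illustration only.
definition = distributed_map_definition(
    "arn:aws:lambda:us-east-1:123456789012:function:company-model-run",
    "arn:aws:sns:us-east-1:123456789012:model-run-results",
)
definition_json = json.dumps(definition, indent=2)
```

The point of the sketch is how little is left to write: the splitting and concurrency handling that the preprocessor used to do is expressed by the ItemReader and MaxConcurrency fields.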

Final iteration with AWS Step Functions

What was the impact?
A few weeks after completing the third iteration, they had to run the job on all 227,000 companies in their platform. When the job finished, Charles’ team was blown away; the whole process took only 56 minutes to complete. They estimated that during those 56 minutes, the job ran more than 57 billion calculations.

Processing of the Distributed Map State

The following image shows an Amazon CloudWatch graph of the concurrent executions for one Lambda function during the time that the workflow was running. There are almost 10,000 functions running in parallel during this time.

Lambda concurrency CloudWatch graph

Simplifying and shortening the time to run the job opens a lot of possibilities for CyberGRX and the data science team. The benefits started right away the moment one of the data scientists wanted to run the job to test some improvements they had made for the model. They were able to run it independently without requiring an engineer to help them.

And, because the predictive model itself is one of the key offerings from CyberGRX, the company now has a more competitive product since the predictive analysis can be refined on a daily basis.

Learn more about using AWS Step Functions:

You can also check the Serverless Workflows Collection that we have available in Serverless Land for you to test and learn more about this new capability.

Marcia

Serverless ICYMI Q4 2022

Post Syndicated from Marcia Villalba original https://aws.amazon.com/blogs/compute/serverless-icymi-q4-2022/

Welcome to the 20th edition of the AWS Serverless ICYMI (in case you missed it) quarterly recap. Every quarter, we share all the most recent product launches, feature enhancements, blog posts, webinars, Twitch live streams, and other interesting things that you might have missed! In case you missed our last ICYMI, check out what happened last quarter here.

AWS Lambda

For developers using Java, AWS Lambda has introduced Lambda SnapStart. SnapStart is a new capability that can improve the start-up performance of functions using Corretto (java11) runtime by up to 10 times, at no extra cost.

To use this capability, you must enable it in your function and then publish a new version. This triggers the optimization process. This process initializes the function, takes an immutable, encrypted snapshot of the memory and disk state, and caches it for reuse. When the function is invoked, the state is retrieved from the cache in chunks, on an as-needed basis, and it is used to populate the execution environment.
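The two steps (enable, then publish a version) can be sketched as follows. The function name is a hypothetical placeholder; the parameter names follow the boto3 Lambda client calls update_function_configuration and publish_version, and no AWS call is made by the snippet itself.

```python
def snapstart_requests(function_name):
    """Build kwargs for enabling SnapStart and publishing a version (no AWS call here)."""
    update_req = {
        "FunctionName": function_name,
        # Snapshots are taken when a version is published.
        "SnapStart": {"ApplyOn": "PublishedVersions"},
    }
    publish_req = {"FunctionName": function_name}
    return update_req, publish_req


# Hypothetical function name, for illustration only.
update_req, publish_req = snapstart_requests("my-java-function")
# With credentials configured, these could be passed to:
#   boto3.client("lambda").update_function_configuration(**update_req)
#   boto3.client("lambda").publish_version(**publish_req)
```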

The ICYMI: Serverless pre:Invent 2022 post shares some of the launches for Lambda before November 21, like the support of Lambda functions using Node.js 18 as a runtime, the Lambda Telemetry API, and new .NET tooling to support .NET 7 applications.

Also, now Amazon Inspector supports Lambda functions. You can enable Amazon Inspector to scan your functions continually for known vulnerabilities. The log4j vulnerability shows how important it is to scan your code for vulnerabilities continuously, not only after deployment. Vulnerabilities can be discovered at any time, and with Amazon Inspector, your functions and layers are rescanned whenever a new vulnerability is published.

AWS Step Functions

There were many new launches for AWS Step Functions, like intrinsic functions, cross-account access capabilities, and the new executions experience for Express Workflows covered in the pre:Invent post.

During AWS re:Invent this year, we announced Step Functions Distributed Map. If you need to process many files, or items inside CSV or JSON files, this new flow can help you. The new distributed map flow orchestrates large-scale parallel workloads.

This feature is optimized for files stored in Amazon S3. You can either process in parallel multiple files stored in a bucket, or process one large JSON or CSV file, in which each line contains an independent item. For example, you can convert a video file into multiple .gif animations using a distributed map, or process over 37 GB of aggregated weather data to find the highest temperature of the day. 

Amazon EventBridge

Amazon EventBridge launched two major features: Scheduler and Pipes. Amazon EventBridge Scheduler allows you to create, run, and manage scheduled tasks at scale. You can schedule one-time or recurring tasks across 270 services and over 6,000 APIs.

Amazon EventBridge Pipes allows you to create point-to-point integrations between event producers and consumers. With Pipes you can now connect different sources, like Amazon Kinesis Data Streams, Amazon DynamoDB Streams, Amazon SQS, Amazon Managed Streaming for Apache Kafka, and Amazon MQ to over 14 targets, such as Step Functions, Kinesis Data Streams, Lambda, and others. It not only allows you to connect these different event producers to consumers, but also provides filtering and enriching capabilities for events.

EventBridge now supports enhanced filtering capabilities including:

  • Matching against characters at the end of a value (suffix filtering)
  • Ignoring case sensitivity (equals-ignore-case)
  • OR matching: A single rule can match if any conditions across multiple separate fields are true.
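As an illustration of the three filters listed above, here are example event patterns written as Python dictionaries and serialized to the JSON that a rule expects. The field names (fileName, status, retries) and values are made up for the example; only the suffix, equals-ignore-case, and $or keywords come from EventBridge.

```python
import json

# Hypothetical event fields, for illustration only.
patterns = {
    # Match values that end with ".png".
    "suffix": {"detail": {"fileName": [{"suffix": ".png"}]}},
    # Match "Order Created", "order created", "ORDER CREATED", and so on.
    "ignore_case": {"detail-type": [{"equals-ignore-case": "order created"}]},
    # Match when either condition holds, across two separate fields.
    "or": {
        "detail": {
            "$or": [
                {"status": ["FAILED"]},
                {"retries": [{"numeric": [">", 3]}]},
            ]
        }
    },
}

# Event patterns are passed to EventBridge as JSON strings.
serialized = {name: json.dumps(pattern) for name, pattern in patterns.items()}
```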

It’s now also simpler to build rules, and you can generate AWS CloudFormation from the console pages and generate event patterns from a schema.

AWS Serverless Application Model (AWS SAM)

There were many announcements for AWS SAM during this quarter summarized in the ICYMI: Serverless pre:Invent 2022 post, like AWS SAM Connectors, SAM CLI Pipelines support for the OpenID Connect protocol, and AWS SAM CLI Terraform support.

AWS Application Composer

AWS Application Composer is a new visual designer that you can use to build serverless applications using multiple AWS services. This is ideal if you want to build a prototype, review architectures with others, generate diagrams for your projects, or onboard new team members to a project.

Within a simple user interface, you can drag and drop the different AWS resources and configure them visually. You can use AWS Application Composer together with AWS SAM Accelerate to build and test your applications in the AWS Cloud.

AWS Serverless digital learning badges

The new AWS Serverless digital learning badges let you show your AWS Serverless knowledge and skills. This is a verifiable digital badge that is aligned with the AWS Serverless Learning Plan.

This badge proves your knowledge and skills for Lambda, Amazon API Gateway, and designing serverless applications. To earn this badge, you must score at least 80 percent on the assessment associated with the Learning Plan. Visit this link if you are ready to get started learning or just jump directly to the assessment. 

News from other services:

Amazon SNS

Amazon SQS

AWS AppSync and AWS Amplify

Observability

AWS re:Invent 2022

AWS re:Invent was held in Las Vegas from November 28 to December 2, 2022. Werner Vogels, Amazon’s CTO, highlighted event-driven applications during his keynote. He stated that the world is asynchronous and showed how strange a synchronous world would be. During the keynote, he showcased Serverlesspresso as an example of an event-driven application. The Serverless DA team presented many breakouts, workshops, and chalk talks. Rewatch all our breakout content:

In addition, we brought Serverlesspresso back to Vegas. Serverlesspresso is a contactless, serverless order management system for a physical coffee bar. The architecture comprises several serverless apps that support an ordering process from a customer’s smartphone to a real espresso bar. The customer can check the virtual line, place an order, and receive a notification when their drink is ready for pickup.

Serverless blog posts

October

November

December

Videos

Serverless Office Hours – Tuesday 10 AM PT

Weekly live virtual office hours: In each session, we talk about a specific topic or technology related to serverless and open it up to helping with your real serverless challenges and issues. Ask us anything about serverless technologies and applications.

YouTube: youtube.com/serverlessland

Twitch: twitch.tv/aws

October

November

December

FooBar Serverless YouTube Channel

Marcia Villalba frequently publishes new videos on her popular FooBar Serverless YouTube channel.

October

November

December

Still looking for more?

The Serverless landing page has more information. The Lambda resources page contains case studies, webinars, whitepapers, customer stories, reference architectures, and even more Getting Started tutorials. If you want to learn more about event-driven architectures, read our new guide that will help you get started.

You can also follow the Serverless Developer Advocacy team on Twitter and LinkedIn to see the latest news, follow conversations, and interact with the team.

For more serverless learning resources, visit Serverless Land.

New — Create Point-to-Point Integrations Between Event Producers and Consumers with Amazon EventBridge Pipes

Post Syndicated from Donnie Prakoso original https://aws.amazon.com/blogs/aws/new-create-point-to-point-integrations-between-event-producers-and-consumers-with-amazon-eventbridge-pipes/

It is increasingly common to use multiple cloud services as building blocks to assemble a modern event-driven application. Using purpose-built services to accomplish a particular task ensures developers get the best capabilities for their use case. However, communication between services can be difficult if they use different technologies to communicate, meaning that you need to learn the nuances of each service and how to integrate them with each other. We usually need to create integration code (or “glue” code) to connect and bridge communication between services. Writing glue code slows our velocity, increases the risk of bugs, and means we spend our time writing undifferentiated code rather than building better experiences for our customers.

Introducing Amazon EventBridge Pipes
Today, I’m excited to announce Amazon EventBridge Pipes, a new feature of Amazon EventBridge that makes it easier for you to build event-driven applications by providing a simple, consistent, and cost-effective way to create point-to-point integrations between event producers and consumers, removing the need to write undifferentiated glue code.

The simplest pipe consists of a source and a target. An optional filtering step allows only specific source events to flow into the Pipe and an optional enrichment step using AWS Lambda, AWS Step Functions, Amazon EventBridge API Destinations, or Amazon API Gateway enriches or transforms events before they reach the target. With Amazon EventBridge Pipes, you can integrate supported AWS and self-managed services as event producers and event consumers into your application in a simple, reliable, consistent and cost-effective way.

Amazon EventBridge Pipes bring the most popular features of Amazon EventBridge Event Bus, such as event filtering, integration with more than 14 AWS services, and automatic delivery retries.

How Amazon EventBridge Pipes Works
Amazon EventBridge Pipes provides you a seamless means of integrating supported AWS and self-managed services, favouring configuration over code. To start integrating services with EventBridge Pipes, you need to take the following steps:

  1. Choose a source that is producing your events. Supported sources include: Amazon DynamoDB, Amazon Kinesis Data Streams, Amazon SQS, Amazon Managed Streaming for Apache Kafka, and Amazon MQ (both ActiveMQ and RabbitMQ).
  2. (Optional) Specify an event filter to only process events that match your filter (you’re not charged for events that are filtered out).
  3. (Optional) Transform and enrich your events using built-in free transformations, or AWS Lambda, AWS Step Functions, Amazon API Gateway, or EventBridge API Destinations to perform more advanced transformations and enrichments.
  4. Choose a target destination from more than 14 AWS services, including AWS Step Functions, Kinesis Data Streams, AWS Lambda, and third-party APIs using EventBridge API destinations.

Amazon EventBridge Pipes provides simplicity to accelerate development velocity by reducing the time needed to learn the services and write integration code, to get reliable and consistent integration.

EventBridge Pipes also comes with additional features that can help in building event-driven applications. For example, with event filtering, Pipes helps event-driven applications become more cost-effective by only processing the events of interest.

Get Started with Amazon EventBridge Pipes
Let’s see how to get started with Amazon EventBridge Pipes. In this post, I will show how to integrate an Amazon SQS queue with AWS Step Functions using Amazon EventBridge Pipes.

The following screenshot is my existing Amazon SQS queue and AWS Step Functions state machine. In my case, I need to run the state machine for every event in the queue. To do so, I need to connect my SQS queue and Step Functions state machine with EventBridge Pipes.

Existing Amazon SQS queue and AWS Step Functions state machine

First, I open the Amazon EventBridge console. In the navigation section, I select Pipes. Then I select Create pipe.

On this page, I can start configuring a pipe and set the AWS Identity and Access Management (IAM) permission, and I can navigate to the Pipe settings tab.

Navigate to Pipe Settings

In the Permissions section, I can define a new IAM role for this pipe or use an existing role. To improve the developer experience, the EventBridge Pipes console works out the IAM role for me, so I don’t need to configure the required permissions manually; EventBridge Pipes configures least-privilege permissions for the IAM role. Since this is my first time creating a pipe, I select Create a new role for this specific resource.

Setting IAM Permission for pipe

Then, I go back to the Build pipe section. On this page, I can see the available event sources supported by EventBridge Pipes.

List of available services as the event source

I select SQS and select my existing SQS queue. If I need to do batch processing, I can select Additional settings to start defining Batch size and Batch window. Then, I select Next.

Select SQS Queue as event source

On the next page, things get even more interesting because I can define Event filtering for the event source that I just selected. This step is optional, but the event filtering feature makes it easy for me to process only the events that my event-driven application needs to process. In addition, event filtering also helps me to be more cost-effective, as this pipe won’t process unnecessary events. For example, if I use Step Functions as the target, the state machine will only be executed for events that match the filter.

Event filtering in Amazon EventBridge Pipes

I can use sample events from AWS events or define custom events. For example, I want to process events for returned purchased items with a value of 100 or more. The following is the sample event in JSON format:

{
   "event-type":"RETURN_PURCHASE",
   "value":100
}

Then, in the event pattern section, I can define the pattern by referring to the Content filtering in Amazon EventBridge event patterns documentation. I define the event pattern as follows:

{
   "event-type": ["RETURN_PURCHASE"],
   "value": [{
      "numeric": [">=", 100]
   }]
}

I can also test by selecting test pattern to make sure this event pattern will match the custom event I’m going to use. Once I’m confident that this is the event pattern that I want, I select Next.
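To make the matching behaviour concrete, here is a deliberately small local approximation in Python. It is not EventBridge's matching engine: it only handles flat events with exact-value and numeric comparisons, which is enough to check the pattern above against a few sample events. The function names are made up for this sketch.

```python
import operator

_OPS = {"<": operator.lt, "<=": operator.le, "=": operator.eq,
        ">": operator.gt, ">=": operator.ge}


def _matches_one(candidate, value):
    """Check one allowed value (or numeric condition) against an event value."""
    if isinstance(candidate, dict) and "numeric" in candidate:
        if isinstance(value, bool) or not isinstance(value, (int, float)):
            return False
        spec = candidate["numeric"]
        # spec is a flat list of operator/operand pairs, e.g. [">=", 100, "<", 500]
        return all(_OPS[op](value, bound) for op, bound in zip(spec[0::2], spec[1::2]))
    return candidate == value


def matches(pattern, event):
    """Return True if every field in the pattern is matched by the event."""
    for field, candidates in pattern.items():
        if field not in event:
            return False
        if not any(_matches_one(c, event[field]) for c in candidates):
            return False
    return True


pattern = {"event-type": ["RETURN_PURCHASE"], "value": [{"numeric": [">=", 100]}]}

returned_100 = matches(pattern, {"event-type": "RETURN_PURCHASE", "value": 100})
returned_99 = matches(pattern, {"event-type": "RETURN_PURCHASE", "value": 99})
purchase_500 = matches(pattern, {"event-type": "PURCHASE", "value": 500})
```

Here the first event matches, while the second (value below 100) and the third (different event type) are filtered out.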

Defining and testing an event pattern for filtering

In the next optional step, I can use an Enrichment that will augment, transform, or expand the event before sending the event to the target destination. This enrichment is useful when I need to enrich the event using an existing AWS Lambda function, or an external SaaS API using EventBridge API destinations. Additionally, I can shape the event using the Enrichment Input Transformer.

The final step is to define a target for processing the events delivered by this pipe.

Defining target destination service

Here, I can select various AWS services supported by EventBridge Pipes.

I select my existing AWS Step Functions state machine, named pipes-statemachine.

In addition, I can also use Target Input Transformer by referring to the Transforming Amazon EventBridge target input documentation. For my case, I need to define a high priority for events going into this target. To do that, I define a sample custom event in Sample events/Event Payload and add the priority: HIGH in the Transformer section. Then in the Output section, I can see the final event to be passed to the target destination service. Then, I select Create pipe.

In less than a minute, my pipe was successfully created.

Pipe successfully created

To test this pipe, I need to put an event into the Amazon SQS queue.

Sending a message into Amazon SQS Queue
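The same test message could also be sent from code. The sketch below only builds the request; the queue URL is a hypothetical placeholder, and the parameter names follow the boto3 SQS client call send_message.

```python
import json


def build_send_message_request(queue_url, event):
    """Build kwargs for sqs.send_message with the event as a JSON message body."""
    return {"QueueUrl": queue_url, "MessageBody": json.dumps(event)}


# Hypothetical queue URL; the event is the sample used earlier in this post.
request = build_send_message_request(
    "https://sqs.us-east-1.amazonaws.com/123456789012/pipes-queue",
    {"event-type": "RETURN_PURCHASE", "value": 100},
)
# With credentials configured, this could be passed to:
#   boto3.client("sqs").send_message(**request)
```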

To check if my event is successfully processed by Step Functions, I can look into my state machine in Step Functions. On this page, I see my event is successfully processed.

I can also go to Amazon CloudWatch Logs to get more detailed logs.

Things to Know
Event Sources – At launch, Amazon EventBridge Pipes supports the following services as event sources: Amazon DynamoDB, Amazon Kinesis, Amazon Managed Streaming for Apache Kafka (Amazon MSK) alongside self-managed Apache Kafka, Amazon SQS (standard and FIFO), and Amazon MQ (both for ActiveMQ and RabbitMQ).

Event Targets – Amazon EventBridge Pipes supports 15 Amazon EventBridge targets, including AWS Lambda, Amazon API Gateway, Amazon SNS, Amazon SQS, and AWS Step Functions. To deliver events to any HTTPS endpoint, developers can use API destinations as the target.

Event Ordering – EventBridge Pipes maintains the ordering of events received from event sources that support ordering when sending those events to a destination service.

Programmatic Access – You can also interact with Amazon EventBridge Pipes and create a pipe using AWS Command Line Interface (CLI), AWS CloudFormation, and AWS Cloud Development Kit (AWS CDK).
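As a sketch of the programmatic route, the pipe built in the console walkthrough could be described with request parameters like the following. The ARNs, pipe name, and batch settings are hypothetical. The parameter names follow the boto3 Pipes client call create_pipe. One assumption to check against the Pipes filtering documentation: for an SQS source, the message payload is taken to sit under the body key, so the pattern from the walkthrough is nested there.

```python
import json


def build_pipe_request(name, role_arn, queue_arn, state_machine_arn):
    """Build kwargs for pipes.create_pipe: SQS source, filter, Step Functions target."""
    # Assumption: the SQS message payload is exposed under "body" for filtering.
    event_pattern = {
        "body": {
            "event-type": ["RETURN_PURCHASE"],
            "value": [{"numeric": [">=", 100]}],
        }
    }
    return {
        "Name": name,
        "RoleArn": role_arn,
        "Source": queue_arn,
        "SourceParameters": {
            "FilterCriteria": {"Filters": [{"Pattern": json.dumps(event_pattern)}]},
            # Hypothetical batching values, for illustration only.
            "SqsQueueParameters": {"BatchSize": 10, "MaximumBatchingWindowInSeconds": 5},
        },
        "Target": state_machine_arn,
        "TargetParameters": {
            "StepFunctionStateMachineParameters": {"InvocationType": "FIRE_AND_FORGET"},
        },
    }


# Hypothetical names and ARNs, for illustration only.
pipe_request = build_pipe_request(
    "returns-to-statemachine",
    "arn:aws:iam::123456789012:role/pipes-demo-role",
    "arn:aws:sqs:us-east-1:123456789012:pipes-queue",
    "arn:aws:states:us-east-1:123456789012:stateMachine:pipes-statemachine",
)
# With credentials configured, this could be passed to:
#   boto3.client("pipes").create_pipe(**pipe_request)
```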

Independent Usage – EventBridge Pipes can be used separately from Amazon EventBridge bus and Amazon EventBridge Scheduler. This flexibility helps developers to define source events from supported AWS and self-managed services as event sources without Amazon EventBridge Event Bus.

Availability – Amazon EventBridge Pipes is now generally available in all AWS commercial Regions, with the exception of Asia Pacific (Hyderabad) and Europe (Zurich).

Visit the Amazon EventBridge Pipes page to learn more about this feature and understand the pricing. You can also visit the documentation page to learn more about how to get started.

Happy building!

— Donnie

New — Amazon SageMaker Data Wrangler Supports SaaS Applications as Data Sources

Post Syndicated from Donnie Prakoso original https://aws.amazon.com/blogs/aws/new-amazon-sagemaker-data-wrangler-supports-saas-applications-as-data-sources/

Data fuels machine learning. In machine learning, data preparation is the process of transforming raw data into a format that is suitable for further processing and analysis. The common process for data preparation starts with collecting data, then cleaning it, labeling it, and finally validating and visualizing it. Getting the data right with high quality can often be a complex and time-consuming process.

This is why customers who build machine learning (ML) workloads on AWS appreciate the ability of Amazon SageMaker Data Wrangler. With SageMaker Data Wrangler, customers can simplify the process of data preparation and complete the required processes of the data preparation workflow on a single visual interface. Amazon SageMaker Data Wrangler helps to reduce the time it takes to aggregate and prepare data for ML.

However, due to the proliferation of data, customers generally have data spread out into multiple systems, including external software-as-a-service (SaaS) applications like SAP OData for manufacturing data, Salesforce for customer pipeline, and Google Analytics for web application data. To solve business problems using ML, customers have to bring all of these data sources together. They currently have to build their own solution or use third-party solutions to ingest data into Amazon S3 or Amazon Redshift. These solutions can be complex to set up and not cost-effective.

Introducing Amazon SageMaker Data Wrangler Supports SaaS Applications as Data Sources
I’m happy to share that starting today, you can aggregate external SaaS application data for ML in Amazon SageMaker Data Wrangler to prepare data for ML. With this feature, you can use more than 40 SaaS applications as data sources via Amazon AppFlow and have these data available on Amazon SageMaker Data Wrangler. Once the data sources are registered in AWS Glue Data Catalog by AppFlow, you can browse tables and schemas from these data sources using Data Wrangler SQL explorer. This feature provides seamless data integration between SaaS applications and SageMaker Data Wrangler using Amazon AppFlow.

Here is a quick preview of this new feature:

This new feature of Amazon SageMaker Data Wrangler works by integrating with Amazon AppFlow, a fully managed integration service that enables you to securely exchange data between SaaS applications and AWS services. With Amazon AppFlow, you can establish bidirectional data integration between SaaS applications (such as Salesforce, SAP, and Amplitude) and supported AWS services, and bring the data into Amazon S3 or Amazon Redshift.

Then, with Amazon AppFlow, you can catalog the data in AWS Glue Data Catalog. This is a new feature where with Amazon AppFlow, you can create an integration with AWS Glue Data Catalog for Amazon S3 destination connector. With this new integration, customers can catalog SaaS data applications into AWS Glue Data Catalog with a few clicks, directly from the Amazon AppFlow Flow configuration, without the need to run any crawlers.

Once you’ve established a flow and inserted it into the AWS Glue Data Catalog, you can use this data inside the Amazon SageMaker Data Wrangler. Then, you can do the data preparation as you usually do. You can write Amazon Athena queries to preview data, join data from multiple sources, or import data to prepare for ML model training.

With this feature, you only need a few simple steps to integrate data from SaaS applications into Amazon SageMaker Data Wrangler via Amazon AppFlow. This integration supports more than 40 SaaS applications; for a complete list of supported applications, please check the Supported source and destination applications documentation.

Get Started with Amazon SageMaker Data Wrangler Support for Amazon AppFlow
Let’s see how this feature works in detail. In my scenario, I need to get data from Salesforce, and do the data preparation using Amazon SageMaker Data Wrangler.

To start using this feature, the first thing I need to do is to create a flow in Amazon AppFlow that registers the data source into the AWS Glue Data Catalog. I already have an existing connection with my Salesforce account, and all I need now is to create a flow.

One important thing to note is that to make SaaS application data available in Amazon SageMaker Data Wrangler, I need to create a flow with Amazon S3 as the destination. Then, I need to enable Create a Data Catalog table in the AWS Glue Data Catalog settings. This option will automatically catalog my Salesforce data into AWS Glue Data Catalog.

On this page, I need to select a user role with the required AWS Glue Data Catalog permissions and define the database name and the table name prefix. In addition, in this section, I can define the data format preference, be it in JSON, CSV, or Apache Parquet formats, and filename preference if I want to add a timestamp into the file name section.

To learn more about how to register SaaS data in Amazon AppFlow and AWS Glue Data Catalog, you can read Cataloging the data output from an Amazon AppFlow flow documentation page.

Once I’ve finished registering SaaS data, I need to make sure the IAM role can view the data sources in Data Wrangler from AppFlow. Here is an example of a policy in the IAM role:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": "glue:SearchTables",
            "Resource": [
                "arn:aws:glue:*:*:table/*/*",
                "arn:aws:glue:*:*:database/*",
                "arn:aws:glue:*:*:catalog"
            ]
        }
    ]
} 

By enabling data cataloging with AWS Glue Data Catalog, from this point on, Amazon SageMaker Data Wrangler will be able to automatically discover this new data source and I can browse tables and schema using the Data Wrangler SQL Explorer.

Now it’s time to switch to the Amazon SageMaker Data Wrangler dashboard and select Connect to data sources.

On the following page, I need to Create connection and select the data source I want to import. In this section, I can see all the available connections for me to use. Here I see the Salesforce connection is already available for me to use.

If I would like to add additional data sources, I can see a list of external SaaS applications that I can integrate in the Set up new data sources section. To learn how to set up external SaaS applications as data sources, I can select How to enable access.

Now I will import datasets and select the Salesforce connection.

On the next page, I can define connection settings and import data from Salesforce. When I’m done with this configuration, I select Connect.

On the following page, I see my Salesforce data that I already configured with Amazon AppFlow and AWS Glue Data Catalog called appflowdatasourcedb. I can also see a table preview and schema for me to review if this is the data I need.

Then, I start building my dataset using this data by performing SQL queries inside the SageMaker Data Wrangler SQL Explorer. Then, I select Import query.

Then, I define a name for my dataset.

At this point, I can start doing the data preparation process. I can navigate to the Analysis tab to run the data insight report. The analysis will provide me with a report on the data quality issues and what transform I need to use next to fix the issues based on the ML problem I want to predict. To learn more about how to use the data analysis feature, see Accelerate data preparation with data quality and insights in the Amazon SageMaker Data Wrangler blog post.

In my case, there are several columns I don’t need, and I need to drop these columns. I select Add step.

One feature I like is that Amazon SageMaker Data Wrangler provides numerous ML data transforms. It helps me to streamline the process of cleaning, transforming, and feature engineering my data in one dashboard. For more about the data transforms that SageMaker Data Wrangler provides, please read the Transform Data documentation page.

In this list, I select Manage columns.

Then, in the Transform section, I select the Drop column option. Then, I select a few columns that I don’t need.

Once I’m done, the columns I don’t need are removed and the Drop column data preparation step I just created is listed in the Add step section.

I can also see the visual of my data flow inside the Amazon SageMaker Data Wrangler. In this example, my data flow is quite basic. But when my data preparation process becomes complex, this visual view makes it easy for me to see all the data preparation steps.

From this point on, I can do what I require with my Salesforce data. For example, I can export data directly to Amazon S3 by selecting Export to and choosing Amazon S3 from the Add destination menu. In my case, I specify Data Wrangler to store the data in Amazon S3 after it has processed it by selecting Add destination and then Amazon S3.

Amazon SageMaker Data Wrangler gives me the flexibility to automate the same data preparation flow using scheduled jobs. I can also automate feature engineering with SageMaker Pipelines (via Jupyter Notebook) and SageMaker Feature Store (via Jupyter Notebook), and deploy to an inference endpoint with SageMaker Inference Pipeline (via Jupyter Notebook).

Things to Know
Related news – This feature makes it easy for you to do data aggregation and preparation with Amazon SageMaker Data Wrangler. As this feature is an integration with Amazon AppFlow and AWS Glue Data Catalog, you might want to read the Amazon AppFlow now supports AWS Glue Data Catalog integration and provides enhanced data preparation page.

Availability – Amazon SageMaker Data Wrangler supports SaaS applications as data sources available in all the Regions currently supported by Amazon AppFlow.

Pricing – There is no additional cost to use the SaaS application support in Amazon SageMaker Data Wrangler, but there is a cost to running Amazon AppFlow to get the data into Amazon SageMaker Data Wrangler.

Visit the Import Data From Software as a Service (SaaS) Platforms documentation page to learn more about this feature, and follow the getting started guide to start aggregating and preparing SaaS application data with Amazon SageMaker Data Wrangler.

Happy building!
Donnie

Introducing Amazon EventBridge Scheduler

Post Syndicated from Marcia Villalba original https://aws.amazon.com/blogs/compute/introducing-amazon-eventbridge-scheduler/

Today, we are announcing Amazon EventBridge Scheduler. This is a new capability from Amazon EventBridge that allows you to create, run, and manage scheduled tasks at scale. With EventBridge Scheduler, you can schedule tens of millions of one-time or recurring tasks across many AWS services without provisioning or managing underlying infrastructure.

Previously, many customers used commercial off-the-shelf tools or built their own scheduling capabilities. This can increase application complexity, slow application development, and increase costs, which are magnified at scale. Most of these solutions are limited in what services they can trigger and create complexity in managing concurrency limitations of invoked targets that can affect application performance.

When to use EventBridge Scheduler?

For example, consider a company that develops a task management system. One feature that the application provides is that users can add a reminder for a task and be reminded by email one week before, two days before, or on the day of the task due date. You can automate the creation of all the schedules with EventBridge Scheduler, create the task for each of the reminders, and send it to Amazon SNS to send the notifications.
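The reminder scenario above can be sketched in a few lines of Python. This is only an illustration: the offset labels and the `reminder_expressions` helper are hypothetical names, and the sketch merely computes the at() expressions that a one-time schedule would use for each reminder.

```python
from datetime import datetime, timedelta

# Offsets before the due date at which a reminder is sent (taken from the
# example above). The labels are hypothetical names chosen for this sketch.
REMINDER_OFFSETS = {
    "one-week-before": timedelta(days=7),
    "two-days-before": timedelta(days=2),
    "due-date": timedelta(0),
}


def reminder_expressions(due: datetime) -> dict:
    """Return an at() schedule expression for each reminder of one task."""
    return {
        label: f"at({(due - offset).strftime('%Y-%m-%dT%H:%M:%S')})"
        for label, offset in REMINDER_OFFSETS.items()
    }


expressions = reminder_expressions(datetime(2022, 11, 15, 9, 0, 0))
for label, expression in expressions.items():
    print(label, expression)
```

Each expression would then become the schedule expression of its own one-time schedule, with Amazon SNS as the target.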

Or consider a large organization, like a supermarket, with thousands of AWS accounts and tens of thousands of Amazon EC2 instances. These instances are used in different parts of the world during business hours. You want to make sure that all the instances are started before the stores open and stopped after business hours to reduce costs as much as possible. You can use EventBridge Scheduler to start and stop all of these instances while respecting time zones.

SaaS providers can also benefit from EventBridge Scheduler, as they can now more easily manage all the different scheduled tasks that their customers have. For example, consider a SaaS provider with a subscription model in which customers pay a monthly or annual fee. You want to ensure that each customer’s license key is valid until the end of their current billing period. With Scheduler, you can create a schedule that removes access to the service when the billing period is over, or when the user cancels their subscription. You can also create a series of emails that let your customers know that their license is expiring so they can purchase a renewal.

Example using scheduler

Use cases for EventBridge Scheduler are diverse, from simplifying new feature development to improving your infrastructure operations.

How does EventBridge Scheduler work?

With EventBridge Scheduler, you can now create single or recurrent schedules that trigger over 270 services with more than 6,000 APIs. EventBridge Scheduler allows you to configure schedules with a minimum granularity of one minute.

EventBridge Scheduler provides at-least-once event delivery to targets, and you can create schedules that adjust to different delivery patterns by setting the window of delivery, the number of retries, the time for the event to be retained, and the dead letter queue (DLQ). You can learn more about each configuration from the Scheduler User Guide.

  • Time window allows you to start a schedule within a window of time. This means that the scheduled tasks are dispersed across the time window to reduce the impact of multiple requests on downstream services.
  • Maximum retention time of the event is the maximum time to keep an unprocessed event in the scheduler. If the target is not responding during this time, the event is dropped or sent to a DLQ.
  • Retries with exponential backoff help to retry a failed task with delayed attempts. This improves the success of the task when the target is available.
  • A dead letter queue is an Amazon SQS queue where events that failed to get delivered to the target are routed.

By default, EventBridge Scheduler tries to send the event for 24 hours and a maximum of 185 times. You can configure these values. If that fails, the message is dropped, since by default there is not a DLQ configured.
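As a rough sketch of how the retry and DLQ settings fit together, the following Python builds a target definition in the shape that the CreateSchedule API accepts (`RetryPolicy` and `DeadLetterConfig` on the target). The `build_target` helper and all ARNs are hypothetical; the defaults mirror the 24 hours and 185 attempts mentioned above.

```python
# Defaults stated above: keep the event for 24 hours, retry up to 185 times.
DEFAULT_MAX_EVENT_AGE_SECONDS = 24 * 60 * 60
DEFAULT_MAX_RETRY_ATTEMPTS = 185


def build_target(target_arn, role_arn, dlq_arn=None,
                 max_event_age_seconds=DEFAULT_MAX_EVENT_AGE_SECONDS,
                 max_retry_attempts=DEFAULT_MAX_RETRY_ATTEMPTS):
    """Build a schedule target with an explicit retry policy (hypothetical helper)."""
    target = {
        "Arn": target_arn,
        "RoleArn": role_arn,
        "RetryPolicy": {
            "MaximumEventAgeInSeconds": max_event_age_seconds,
            "MaximumRetryAttempts": max_retry_attempts,
        },
    }
    # Without a DLQ, events that exhaust the retry policy are dropped.
    if dlq_arn is not None:
        target["DeadLetterConfig"] = {"Arn": dlq_arn}
    return target


# Placeholder ARNs for illustration only.
target = build_target(
    "arn:aws:sns:us-east-1:111122223333:example-topic",
    "arn:aws:iam::111122223333:role/example-scheduler-role",
    dlq_arn="arn:aws:sqs:us-east-1:111122223333:example-dlq",
    max_retry_attempts=10,
)
print(target)
```

Serialized with `json.dumps`, the same dictionary could be passed as the value of the `--target` option in the AWS CLI.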

In addition, by default, all events in Scheduler are encrypted with a key that AWS owns and manages. You can also use your own AWS KMS encryption keys.

You can also schedule tasks using Amazon EventBridge rules, but EventBridge Scheduler is better suited to scheduling tasks at scale. The following table shows the main differences between EventBridge Scheduler and EventBridge rules:

 

| | Amazon EventBridge Scheduler | Amazon EventBridge rules |
| Quota on schedules | 1 million per account | 300 rule limit per account per Region |
| Event invocation throughput | Able to support throughput in 1,000s of TPS | Because of the schedule limit, you can only have 300 1-minute schedules for max throughput of 5 TPS |
| Targets | Over 270 services and over 6,000 API actions with AWS SDK targets | 20+ targets supported by EventBridge |
| Time expression and time zones | at(), cron(), rate(); all time zones and DST | cron(), rate(); UTC; no support for DST |
| One-time schedules | Yes | No |
| Time window schedules | Yes | No |
| Event bus support | No event bus is needed | Default bus only |
| Rule quota consumption | No. 1 million schedules soft limit | Yes, consumes from 2,000 rules per bus |
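The 5 TPS figure in the table follows from simple arithmetic, sketched below. It assumes that every rule fires exactly once per minute and that each firing produces one invocation; the function name is hypothetical.

```python
RULE_QUOTA = 300          # scheduled rules per account per Region, from the table
SECONDS_PER_MINUTE = 60   # the shortest rate a rule can use is one minute


def max_rule_throughput_tps(rule_quota=RULE_QUOTA, period_seconds=SECONDS_PER_MINUTE):
    """Invocations per second if every rule fires once per period."""
    return rule_quota / period_seconds


tps = max_rule_throughput_tps()
print(tps)
```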

Getting started with EventBridge Scheduler

This walkthrough builds a series of schedules to get started with EventBridge Scheduler. For that, you use the AWS Command Line (AWS CLI) to configure the schedules that send notifications using Amazon SNS.

Prerequisites

Update your AWS CLI to the latest version (v1.27.7).

As a prerequisite, you must create an SNS topic with an email subscription and an AWS IAM role that EventBridge Scheduler can assume to publish messages on your behalf. You can deploy these AWS resources using AWS SAM. Follow the instructions in the README file.

Scheduling a one-time schedule

Once configured, create your first schedule. This is a one-time schedule that publishes an event for the SNS topic you created.

For creating the schedule, run this command in your terminal and replace the schedule expression and time zone with values for your task:

$ aws scheduler create-schedule --name SendEmailOnce \
--schedule-expression "at(2022-11-01T11:00:00)" \
--schedule-expression-timezone "Europe/Helsinki" \
--flexible-time-window "{\"Mode\": \"OFF\"}" \
--target "{\"Arn\": \"arn:aws:sns:us-east-1:xxx:test-chronos-send-email\", \"RoleArn\": \"arn:aws:iam::xxxx:role/sam_scheduler_role\"}"

Let’s analyze the different parts of this command. The first parameter is the name of the schedule.

In the schedule expression attribute, you define whether the event is a one-time schedule or a recurrent schedule. Because this is a one-time schedule, it uses the at() expression with the date and time you want this schedule to run. You must also configure the time zone in which this schedule runs:

--schedule-expression "at(2022-11-01T11:00:00)" --schedule-expression-timezone "Europe/Helsinki"

Another setting that you can configure is the flexible time window. It’s not used for this example, but if you choose a time window, EventBridge Scheduler invokes the task within that timeframe. This setting helps to distribute the invocations across time and manage the downstream service limits.

--flexible-time-window "{\"Mode\": \"OFF\"}"
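A minimal sketch of the two shapes this setting can take, assuming the `Mode` and `MaximumWindowInMinutes` fields of the CreateSchedule API. The `flexible_time_window` helper is a hypothetical name, and the 15-minute window is an arbitrary example value.

```python
import json


def flexible_time_window(max_window_minutes=None):
    """Return the FlexibleTimeWindow structure for a schedule (hypothetical helper)."""
    if max_window_minutes is None:
        # Invoke the target at the scheduled time.
        return {"Mode": "OFF"}
    if max_window_minutes < 1:
        raise ValueError("the window must be at least one minute")
    # Invoke the target at some point within the window after the scheduled time.
    return {"Mode": "FLEXIBLE", "MaximumWindowInMinutes": max_window_minutes}


window_off = flexible_time_window()
window_flexible = flexible_time_window(15)

# json.dumps gives the string form that the CLI option expects.
print(json.dumps(window_off))
print(json.dumps(window_flexible))
```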

Next, pass the IAM role ARN. This is the role previously created with the AWS SAM template. EventBridge Scheduler assumes this role when publishing events to SNS, and the role has permissions to publish messages on that topic.

Finally, you must configure the target. Scheduler comes with predefined targets with simpler APIs, which include actions like putting events to Amazon EventBridge, invoking a Lambda function, and sending a message to an Amazon SQS queue. For this example, use the universal target, which allows you to invoke almost any AWS service. Learn more about the targets from the User Guide.

--target "{\"Arn\": \"arn:aws:sns:us-east-1:xxx:test-chronos-send-email\", \"RoleArn\": \"arn:aws:iam::xxxx:role/sam_scheduler_role\"}"
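For readers who prefer an SDK over the CLI, the same one-time schedule can be expressed as a request dictionary. This is a sketch under assumptions: `one_time_schedule` is a hypothetical helper, the ARNs are placeholders, and the final call (left as a comment) presumes that boto3 is installed with credentials configured.

```python
from datetime import datetime


def one_time_schedule(name, run_at, timezone_name, topic_arn, role_arn):
    """Build CreateSchedule parameters for a one-time schedule (hypothetical helper)."""
    return {
        "Name": name,
        # at() takes a local date and time; the time zone is passed separately.
        "ScheduleExpression": f"at({run_at.strftime('%Y-%m-%dT%H:%M:%S')})",
        "ScheduleExpressionTimezone": timezone_name,
        "FlexibleTimeWindow": {"Mode": "OFF"},
        "Target": {"Arn": topic_arn, "RoleArn": role_arn},
    }


request = one_time_schedule(
    "SendEmailOnce",
    datetime(2022, 11, 1, 11, 0, 0),
    "Europe/Helsinki",
    "arn:aws:sns:us-east-1:111122223333:example-topic",        # placeholder ARN
    "arn:aws:iam::111122223333:role/example-scheduler-role",   # placeholder ARN
)
print(request)

# With boto3 available, the request could be sent like this (not run here):
# import boto3
# boto3.client("scheduler").create_schedule(**request)
```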

Scheduling groups

Scheduling groups help you organize your schedules. Scheduling groups support tags that you can use for cost allocation, access control, and resource organization. When creating a new schedule, you can add it to a scheduling group.

To create a new scheduling group, run:

$ aws scheduler create-schedule-group --name ScheduleGroupTest

Scheduling a recurrent schedule

Now let’s create a recurrent schedule for that scheduling group. This schedule runs every five minutes and publishes a message to the SNS topic you created during the prerequisites.

$ aws scheduler create-schedule --name SendEmailTest \
--group-name ScheduleGroupTest \
--schedule-expression "rate(5 minutes)" \
--flexible-time-window "{\"Mode\": \"OFF\"}" \
--target "{\"Arn\": \"arn:aws:sns:us-east-1:xxxx:test-chronos-send-email\", \"RoleArn\": \"arn:aws:iam::xxxx:role/sam_scheduler_role\"}"

Recurrent schedules can be configured with a cron expression or rate expression, to define the frequency that this schedule should be triggered. For scheduling this to run every five minutes, you can use an expression like this one:

--schedule-expression "rate(5 minutes)"

Because this is a recurring schedule, you can define the timeframe in which it runs. You can optionally choose a start and end date and time for your schedule. If you don’t, the schedule starts as soon as you create it. These times are formatted in the same way as other AWS CLI timestamps.

--start-date "2022-11-01T18:48:00Z" --end-date "2022-11-01T19:00:00Z"
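The following sketch assembles the recurring schedule with a start and end date as a CreateSchedule request dictionary. The `recurring_schedule` helper and the ARNs are hypothetical, and the validation rules are a design choice for this example rather than documented service behavior.

```python
from datetime import datetime, timezone


def recurring_schedule(name, group_name, expression, start, end, target):
    """Build CreateSchedule parameters for a bounded recurring schedule."""
    # Timezone-aware datetimes avoid ambiguity about when the window opens.
    if start.tzinfo is None or end.tzinfo is None:
        raise ValueError("start and end must be timezone-aware")
    if end <= start:
        raise ValueError("end must be after start")
    return {
        "Name": name,
        "GroupName": group_name,
        "ScheduleExpression": expression,
        "StartDate": start,
        "EndDate": end,
        "FlexibleTimeWindow": {"Mode": "OFF"},
        "Target": target,
    }


request = recurring_schedule(
    "SendEmailTest",
    "ScheduleGroupTest",
    "rate(5 minutes)",
    datetime(2022, 11, 1, 18, 48, tzinfo=timezone.utc),
    datetime(2022, 11, 1, 19, 0, tzinfo=timezone.utc),
    {
        "Arn": "arn:aws:sns:us-east-1:111122223333:example-topic",            # placeholder
        "RoleArn": "arn:aws:iam::111122223333:role/example-scheduler-role",   # placeholder
    },
)
window_seconds = (request["EndDate"] - request["StartDate"]).total_seconds()
print(window_seconds)
```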

If you run the previous recurrent schedule for some time, and then check Amazon CloudWatch metrics, you find a metric called InvocationAttemptCount, for the schedule invocations that happened within the scheduling group you just created.

You can graph that metric in a dashboard and see how many times this schedule ran. Also, you can create alarms to get notified if the number of invocations exceeds a threshold. For example, you can set this threshold to be close to the limits of your downstream service, to prevent reaching those limits.

Graphed metric in dashboard

Cleaning up

Make sure that you delete all the recurrent schedules that you created without an end time.

To check all the schedules that you have configured:

$ aws scheduler list-schedules

To delete a schedule using the AWS CLI:

$ aws scheduler delete-schedule --name <name-of-schedule> --group-name <name-of-group>

Also delete the CloudFormation stack with the prerequisite infrastructure when you complete this demo, as described in the README file of that project.

Conclusion

This blog post introduces the new Amazon EventBridge Scheduler, its use cases and its differences with existing scheduling options. It shows you how to create a new schedule using Amazon EventBridge Scheduler to simplify the creation, execution, and managing of scheduled tasks at scale.

You can get started today with EventBridge Scheduler from the AWS Management Console, AWS CLI, AWS CloudFormation, AWS SDK, and AWS SAM.

For more serverless learning resources, visit Serverless Land.

Persist and analyze metadata in a transient Amazon MWAA environment

Post Syndicated from Praveen Kumar original https://aws.amazon.com/blogs/big-data/persist-and-analyze-metadata-in-a-transient-amazon-mwaa-environment/

Customers can harness sophisticated orchestration capabilities through the open-source tool Apache Airflow. Airflow can be installed on Amazon EC2 instances or can be dockerized and deployed as a container on AWS container services. Alternatively, customers can also opt to leverage Amazon Managed Workflows for Apache Airflow (MWAA).

Amazon MWAA is a fully managed service that enables customers to focus more of their efforts on high-impact activities such as programmatically authoring data pipelines and workflows, as opposed to maintaining or scaling the underlying infrastructure. Amazon MWAA offers auto-scaling capabilities where it can respond to surges in demand by scaling the number of Airflow workers out and back in.

With Amazon MWAA, there are no upfront commitments and you only pay for what you use based on instance uptime, additional auto-scaling capacity, and storage of the Airflow back-end metadata database. This database is provisioned and managed by Amazon MWAA and contains the necessary metadata to support the Airflow application.  It hosts key data points such as historical execution times for tasks and workflows and is valuable in understanding trends and behaviour of your data pipelines over time. Although the Airflow console does provide a series of visualisations that help you analyse these datasets, these are siloed from other Amazon MWAA environments you might have running, as well as the rest of your business data.

Data platforms encompass multiple environments. Typically, non-production environments are not subject to the same orchestration demands and schedule as those of production environments. In most instances, these non-production environments are idle outside of business hours and can be spun down to realise further cost-efficiencies. Unfortunately, terminating Amazon MWAA instances results in the purging of that critical metadata.

In this post, we discuss how to export, persist and analyse Airflow metadata in Amazon S3 enabling you to run and perform pipeline monitoring and analysis. In doing so, you can spin down Airflow instances without losing operational metadata.

Benefits of Airflow metadata

Persisting the metadata in the data lake enables customers to perform pipeline monitoring and analysis in a more meaningful manner:

  • Airflow operational logs can be joined and analysed across environments
  • Trend analysis can be conducted to explore how data pipelines are performing over time, what specific stages are taking the most time, and how performance is affected as data scales
  • Airflow operational data can be joined with business data for improved record level lineage and audit capabilities

These insights can help customers understand the performance of their pipelines over time and guide focus towards which processes need to be optimised.

The technique described below to extract metadata is applicable to any Airflow deployment type, but we will focus on Amazon MWAA in this blog.

Solution Overview

The below diagram illustrates the solution architecture. Please note, Amazon QuickSight is NOT included as part of the CloudFormation stack and is not covered in this tutorial. It has been placed in the diagram to illustrate that metadata can be visualised using a business intelligence tool.

As part of this tutorial, you will be performing the below high-level tasks:

  • Run CloudFormation stack to create all necessary resources
  • Trigger Airflow DAGs to perform sample ETL workload and generate operational metadata in back-end database
  • Trigger Airflow DAG to export operational metadata into Amazon S3
  • Perform analysis with Amazon Athena

This post comes with an AWS CloudFormation stack that automatically provisions the necessary AWS resources and infrastructure, including an active Amazon MWAA instance, for this solution. The entire code is available in the GitHub repository.

The Amazon MWAA instance will already have three directed-acyclic graphs (DAGs) imported:

  1. glue-etl – This ETL workflow leverages AWS Glue to perform transformation logic on a CSV file (customer_activity.csv). This file will be loaded as part of the CloudFormation template into the s3://<DataBucket>/raw/ prefix.

The first task glue_csv_to_parquet converts the ‘raw’ data to parquet format and stores the data in location s3://<DataBucket>/optimised/.  By converting the data in parquet format, you can achieve faster query performance and lower query costs.

The second task glue_transform runs an aggregation over the newly created parquet format and stores the aggregated data in location s3://<DataBucket>/conformed/.

  2. db_export_dag – This DAG consists of one task, export_db, which exports the data from the back-end Airflow database into Amazon S3 in the location s3://<DataBucket>/export/.

Please note that you may experience time-out issues when extracting large amounts of data. On busy Airflow instances, we recommend setting up frequent extracts in small chunks.

  3. run-simple-dag – This DAG does not perform any data transformation or manipulation. It is used in this blog for the purposes of populating the back-end Airflow database with sufficient operational data.
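The recommendation to extract in small chunks can be sketched as a generator that splits an export period into consecutive time windows. This is not the code of db_export_dag; `export_windows` and the five-hour chunk size are hypothetical, and each window would drive one filtered query against the metadata database.

```python
from datetime import datetime, timedelta


def export_windows(start, end, chunk):
    """Yield consecutive (window_start, window_end) pairs covering start to end."""
    if chunk <= timedelta(0):
        raise ValueError("chunk must be a positive duration")
    cursor = start
    while cursor < end:
        # The last window is clipped so it never extends past the end.
        window_end = min(cursor + chunk, end)
        yield cursor, window_end
        cursor = window_end


windows = list(export_windows(
    datetime(2022, 11, 1, 0, 0),
    datetime(2022, 11, 1, 12, 0),
    timedelta(hours=5),
))
for window_start, window_end in windows:
    print(window_start.isoformat(), window_end.isoformat())
```

Treating each window as half-open (start inclusive, end exclusive) keeps rows on a boundary from being exported twice.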

Prerequisites

To implement the solution outlined in this blog, you will need the following:

Steps to run a data pipeline using Amazon MWAA and save metadata to Amazon S3:

  1. Choose Launch Stack:
  2. Choose Next.
  3. For Stack name, enter a name for your stack.
  4. Choose Next.
  5. Keep the default settings on the ‘Configure stack options’ page, and choose Next.
  6. Acknowledge that the template may create AWS Identity and Access Management (IAM) resources.
  7. Choose Create stack. The stack can take up to 30 mins to complete.

The CloudFormation template generates the following resources:

    • VPC infrastructure that uses Public routing over the Internet.
    • Amazon S3 buckets required to support Amazon MWAA, detailed below:
      • The Data Bucket, referred to in this blog as s3://<DataBucket>, holds the data which will be optimised and transformed for further analytical consumption. This bucket will also hold the data from the Airflow back-end metadata database once extracted.
      • The Environment Bucket, referred to in this blog as s3://<EnvironmentBucket>, stores your DAGs, as well as any custom plugins and Python dependencies you may have.
    • Amazon MWAA environment that’s associated with the s3://<EnvironmentBucket>/dags location.
    • AWS Glue jobs for data processing and to help generate Airflow metadata.
    • AWS Lambda-backed custom resources to upload the sample data, AWS Glue scripts, and DAG configuration files to Amazon S3.
    • AWS Identity and Access Management (IAM) users, roles, and policies.
  8. Once the stack creation is successful, navigate to the Outputs tab of the CloudFormation stack and make a note of the DataBucket and EnvironmentBucket names. Store your Apache Airflow Directed Acyclic Graphs (DAGs), custom plugins in a plugins.zip file, and Python dependencies in a requirements.txt file.
  9. Open the Environments page on the Amazon MWAA console.
  10. Choose the environment created above (the environment name includes the stack name). Choose Open Airflow UI.
  11. Choose the glue-etl DAG, unpause it by clicking the radio button next to the name of the DAG, and click the Play button on the right-hand side to trigger the DAG. It may take up to a minute for the DAG to appear.
  12. Leave Configuration JSON empty and choose Trigger.
  13. Choose the run-simple-dag DAG, unpause it, and click Trigger DAG.
  14. Once both DAG executions have completed, select the db_export_dag DAG, unpause it, and click Trigger DAG. Leave Configuration JSON empty and choose Trigger.

This step extracts the DAG and task metadata to an S3 location. The exported tables are a sample list, and more tables can be added as required. The exported metadata is located in the s3://<DataBucket>/export/ folder.
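To make the export layout more concrete, here is a sketch of how one table's rows might be turned into a CSV object under a date partition. It is an assumption rather than the code of db_export_dag: the Athena tables defined later in this post are partitioned by dt and skip one header line, so the sketch uses a Hive-style dt=YYYY-MM-DD prefix and writes a header row. The helper names and the file name are hypothetical.

```python
import csv
import io
from datetime import date


def export_key(table, export_date, prefix="export"):
    """Object key for one table's export under a dt=YYYY-MM-DD partition."""
    return f"{prefix}/{table}/dt={export_date.isoformat()}/{table}.csv"


def rows_to_csv(columns, rows):
    """Serialize rows as comma-separated text with a single header line."""
    buffer = io.StringIO()
    writer = csv.writer(buffer, lineterminator="\n")
    writer.writerow(columns)
    writer.writerows(rows)
    return buffer.getvalue()


key = export_key("dagrun", date(2022, 11, 1))
body = rows_to_csv(["dag_id", "state"], [["glue-etl", "success"]])
print(key)
print(body)
```

The resulting body could then be uploaded to the data bucket under that key, for example with an S3 client.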

Visualise using Amazon QuickSight and Amazon Athena

Amazon Athena is a serverless interactive query service that can be used to run exploratory analysis on data stored in Amazon S3.

If you are using Amazon Athena for the first time, please find the steps here to set up the query result location. We can use Amazon Athena to explore and analyse the metadata generated from Airflow DAG runs.

  1. Navigate to the Athena console and choose Explore the query editor.
  2. Hit View Settings.
  3. Click Manage.
  4. Replace with s3://<DataBucket>/logs/athena/. Once completed, return to the query editor.
  5. Before we can perform our pipeline analysis, we need to create the below DDLs. Replace the <DataBucket> as part of the LOCATION clause with the parameter value as defined in the CloudFormation stack (noted in Step 8 above).
    CREATE EXTERNAL TABLE default.airflow_metadata_dagrun (
            sa_instance_state STRING,
            dag_id STRING,
            state STRING,
            start_date STRING,
            run_id STRING,
            external_trigger STRING,
            conf_name STRING,
            dag_hash STRING,
            id STRING,
            execution_date STRING,
            end_date STRING,
            creating_job_id STRING,
            run_type STRING,
            last_scheduling_decision STRING
       )
    PARTITIONED BY (dt string)
    ROW FORMAT DELIMITED
    FIELDS TERMINATED BY ','
    LOCATION 's3://<DataBucket>/export/dagrun/'
    TBLPROPERTIES ("skip.header.line.count"="1");
    MSCK REPAIR TABLE default.airflow_metadata_dagrun;
    
    CREATE EXTERNAL TABLE default.airflow_metadata_taskinstance (
            sa_instance_state STRING,
            start_date STRING,
            job_id STRING,
            pid STRING,
            end_date STRING,
            pool STRING,
            executor_config STRING,
            duration STRING,
            pool_slots STRING,
            external_executor_id STRING,
            state STRING,
            queue STRING,
            try_number STRING,
            max_tries STRING,
            priority_weight STRING,
            task_id STRING,
            hostname STRING,
            operator STRING,
            dag_id STRING,
            unixname STRING,
            queued_dttm STRING,
            execution_date STRING,
            queued_by_job_id STRING,
            test_mode STRING
       )
    PARTITIONED BY (dt string)
    ROW FORMAT DELIMITED
    FIELDS TERMINATED BY ','
    LOCATION 's3://<DataBucket>/export/taskinstance/'
    TBLPROPERTIES ("skip.header.line.count"="1");
    MSCK REPAIR TABLE default.airflow_metadata_taskinstance;

  6. You can preview the table in the query editor of Amazon Athena.

  7. With the metadata persisted, you can perform pipeline monitoring and derive some powerful insights on the performance of your data pipelines over time. As an example to illustrate this, execute the below SQL query in Athena.

This query returns pertinent metrics at a monthly grain, which include the number of executions of the DAG in that month, the success rate, the minimum/maximum/average duration for the month, and the variation compared to the previous month’s average.

Through the below SQL query, you will be able to understand how your data pipelines are performing over time.

select dag_run_prev_month_calcs.*
        , avg_duration - prev_month_avg_duration as var_duration
from
    (
select dag_run_monthly_calcs.*
            , lag(avg_duration, 1, avg_duration) over (partition by dag_id order by year_month) as prev_month_avg_duration
    from
        (
            select dag_id
                    , year_month
                    , sum(counter) as num_executions
                    , sum(success_ind) as num_success
                    , sum(failed_ind) as num_failed
                    , (cast(sum(success_ind) as double)/ sum(counter))*100 as success_rate
                    , min(duration) as min_duration
                    , max(duration) as max_duration
                    , avg(duration) as avg_duration
            from
                (
                    select dag_id
                            , 1 as counter
                            , case when state = 'success' then 1 else 0 end as success_ind
                            , case when state = 'failed' then 1 else 0 end as failed_ind
                            , date_parse(start_date,'%Y-%m-%d %H:%i:%s.%f+00:00') as start_date
                            , date_parse(end_date,'%Y-%m-%d %H:%i:%s.%f+00:00') as end_date
                            , date_parse(end_date,'%Y-%m-%d %H:%i:%s.%f+00:00') - date_parse(start_date,'%Y-%m-%d %H:%i:%s.%f+00:00') as duration
                            , date_format(date_parse(start_date,'%Y-%m-%d %H:%i:%s.%f+00:00'), '%Y-%m') as year_month
                    from "default"."airflow_metadata_dagrun"
                    where state <> 'running'
                )  dag_run_counters
            group by dag_id, year_month
        ) dag_run_monthly_calcs
    ) dag_run_prev_month_calcs
order by dag_id, year_month

  8. You can also visualize this data using your BI tool of choice. While the step-by-step details of creating a dashboard are not covered in this blog, please refer to the below dashboard built on Amazon QuickSight as an example of what can be built based on the metadata extracted above. If you are using Amazon QuickSight for the first time, please find the steps here on how to get started.

Through QuickSight, we can quickly visualise and derive that our data pipelines are completing successfully, but on average are taking a longer time to complete over time.
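The logic of the Athena query above (monthly grouping, success rate, average duration, and the difference from the previous month's average) can be followed in a short Python sketch. The sample rows are made up for illustration and the timestamp format is simplified; only the aggregation mirrors the SQL.

```python
from collections import defaultdict
from datetime import datetime

FMT = "%Y-%m-%d %H:%M:%S"

# Hypothetical sample rows standing in for airflow_metadata_dagrun.
runs = [
    {"dag_id": "glue-etl", "state": "success", "start": "2022-10-03 10:00:00", "end": "2022-10-03 10:04:00"},
    {"dag_id": "glue-etl", "state": "failed", "start": "2022-10-17 10:00:00", "end": "2022-10-17 10:06:00"},
    {"dag_id": "glue-etl", "state": "success", "start": "2022-11-02 10:00:00", "end": "2022-11-02 10:07:00"},
    {"dag_id": "glue-etl", "state": "running", "start": "2022-11-09 10:00:00", "end": None},
]


def monthly_stats(rows):
    """Aggregate finished runs per (dag_id, year_month), like the SQL query."""
    groups = defaultdict(list)
    for row in rows:
        if row["state"] == "running":  # where state <> 'running'
            continue
        start = datetime.strptime(row["start"], FMT)
        end = datetime.strptime(row["end"], FMT)
        groups[(row["dag_id"], start.strftime("%Y-%m"))].append(
            (row["state"], (end - start).total_seconds())
        )
    stats = {}
    previous_avg = {}
    for key in sorted(groups):  # order by dag_id, year_month
        dag_id, _ = key
        entries = groups[key]
        durations = [seconds for _, seconds in entries]
        successes = sum(1 for state, _ in entries if state == "success")
        avg = sum(durations) / len(durations)
        stats[key] = {
            "num_executions": len(entries),
            "success_rate": 100.0 * successes / len(entries),
            "avg_duration": avg,
            # lag(avg_duration, 1, avg_duration): the first month compares with itself.
            "var_duration": avg - previous_avg.get(dag_id, avg),
        }
        previous_avg[dag_id] = avg
    return stats


stats = monthly_stats(runs)
for key, values in stats.items():
    print(key, values)
```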

Clean up the environment

  1. Navigate to the S3 console and click on the <DataBucket> noted in step 8 above.
  2. Click on Empty bucket.
  3. Confirm the selection.
  4. Repeat these steps to empty the <EnvironmentBucket> bucket (noted in step 8 above).
  5. Run the below statements in the query editor to drop the two Amazon Athena tables. Run statements individually.
    DROP TABLE default.airflow_metadata_dagrun;
    DROP TABLE default.airflow_metadata_taskinstance;

  6. On the AWS CloudFormation console, select the stack you created and choose Delete.

Summary

In this post, we presented a solution to further optimise the costs of Amazon MWAA by tearing down instances whilst preserving the metadata. Storing this metadata in your data lake enables you to better perform pipeline monitoring and analysis. This process can be scheduled and orchestrated programmatically and is applicable to all Airflow deployments, such as Amazon MWAA, Apache Airflow installed on Amazon EC2, and even on-premises installations of Apache Airflow.

To learn more, please visit Amazon MWAA and Getting Started with Amazon MWAA.


About the Authors

Praveen Kumar is a Specialist Solution Architect at AWS with expertise in designing, building, and implementing modern data and analytics platforms using cloud-native services. His areas of interests are serverless technology, streaming applications, and modern cloud data warehouses.

Avnish Jain is a Specialist Solution Architect in Analytics at AWS with experience designing and implementing scalable, modern data platforms on the cloud for large scale enterprises. He is passionate about helping customers build performant and robust data-driven solutions and realise their data & analytics potential.

Create a serverless event-driven workflow to ingest and process Microsoft data with AWS Glue and Amazon EventBridge

Post Syndicated from Venkata Sistla original https://aws.amazon.com/blogs/big-data/create-a-serverless-event-driven-workflow-to-ingest-and-process-microsoft-data-with-aws-glue-and-amazon-eventbridge/

Microsoft SharePoint is a document management system for storing files, organizing documents, and sharing and editing documents in collaboration with others. Your organization may want to ingest SharePoint data into your data lake, combine the SharePoint data with other data that’s available in the data lake, and use it for reporting and analytics purposes. AWS Glue is a serverless data integration service that makes it easy to discover, prepare, and combine data for analytics, machine learning, and application development. AWS Glue provides all the capabilities needed for data integration so that you can start analyzing your data and putting it to use in minutes instead of months.

Organizations often manage their data on SharePoint in the form of files and lists, and you can use this data for easier discovery, better auditing, and compliance. SharePoint as a data source is not a typical relational database and the data is mostly semi-structured, which is why it’s often difficult to join the SharePoint data with other relational data sources. This post shows how to ingest and process SharePoint lists and files with AWS Glue and Amazon EventBridge, which enables you to join them with other data that is available in your data lake. We use SharePoint REST APIs with a standard open data protocol (OData) syntax. OData advocates a standard way of implementing REST APIs that allows for SQL-like querying capabilities. OData helps you focus on your business logic while building RESTful APIs without having to worry about the various approaches to define request and response headers, query options, and so on.

AWS Glue event-driven workflows

Unlike a traditional relational database, SharePoint data may or may not change frequently, and it’s difficult to predict the frequency at which your SharePoint server generates new data, which makes it difficult to plan and schedule data processing pipelines efficiently. Running data processing frequently can be expensive, whereas scheduling pipelines to run infrequently can deliver cold data. Similarly, triggering pipelines from an external process can increase complexity, cost, and job startup time.

AWS Glue supports event-driven workflows, a capability that lets developers start AWS Glue workflows based on events delivered by EventBridge. The main reason to choose EventBridge in this architecture is that it allows you to process events, update the target tables, and make information available to consume in near-real time. Because the frequency of data change in SharePoint is unpredictable, using EventBridge to capture events as they arrive enables you to run the data processing pipeline only when new data is available.

To get started, you simply create a new AWS Glue trigger of type EVENT and place it as the first trigger in your workflow. You can optionally specify a batching condition. Without event batching, the AWS Glue workflow is triggered every time an EventBridge rule matches, which may result in multiple concurrent workflows running. AWS Glue protects you by setting default limits that restrict the number of concurrent runs of a workflow. You can increase the required limits by opening a support case. Event batching allows you to configure the number of events to buffer or the maximum elapsed time before firing the particular trigger. When the batching condition is met, a workflow run is started. For example, you can trigger your workflow when 100 files are uploaded in Amazon Simple Storage Service (Amazon S3) or 5 minutes after the first upload. We recommend configuring event batching to avoid too many concurrent workflows, and optimize resource usage and cost.
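The batching example above (100 files, or 5 minutes after the first upload) can be sketched as the parameters of a Glue CreateTrigger request. The `event_trigger` helper and the trigger, workflow, and job names are hypothetical; the `EventBatchingCondition` fields follow the Glue API, where the batch window is given in seconds.

```python
def event_trigger(name, workflow_name, job_name, batch_size, batch_window_seconds):
    """Build CreateTrigger parameters for an EVENT trigger with batching."""
    return {
        "Name": name,
        "WorkflowName": workflow_name,
        "Type": "EVENT",
        "Actions": [{"JobName": job_name}],
        "EventBatchingCondition": {
            # Start a run after this many events ...
            "BatchSize": batch_size,
            # ... or this many seconds after the first event, whichever comes first.
            "BatchWindow": batch_window_seconds,
        },
    }


trigger = event_trigger(
    "example-event-trigger",      # hypothetical trigger name
    "example-workflow",           # hypothetical workflow name
    "example-processing-job",     # hypothetical job name
    batch_size=100,
    batch_window_seconds=5 * 60,
)
print(trigger)

# With boto3 available, the request could be sent like this (not run here):
# import boto3
# boto3.client("glue").create_trigger(**trigger)
```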

To illustrate this solution better, consider the following use case for a wine manufacturing and distribution company that operates across multiple countries. They currently host all their transactional system’s data on a data lake in Amazon S3. They also use SharePoint lists to capture feedback and comments on wine quality and composition from their suppliers and other stakeholders. The supply chain team wants to join their transactional data with the wine quality comments in SharePoint data to improve their wine quality and manage their production issues better. They want to capture those comments from the SharePoint server within an hour and publish that data to a wine quality dashboard in Amazon QuickSight. With an event-driven approach to ingest and process their SharePoint data, the supply chain team can consume the data in less than an hour.

Overview of solution

In this post, we walk through a solution to set up an AWS Glue job to ingest SharePoint lists and files into an S3 bucket and an AWS Glue workflow that listens to S3 PutObject data events captured by AWS CloudTrail. This workflow is configured with an event-based trigger to run when an AWS Glue ingest job adds new files into the S3 bucket. The following diagram illustrates the architecture.

To make it simple to deploy, we captured the entire solution in an AWS CloudFormation template that enables you to automatically ingest SharePoint data into Amazon S3. SharePoint uses ClientID and TenantID credentials for authentication and uses OAuth2 for authorization.

The template helps you perform the following steps:

  1. Create an AWS Glue Python shell job to make the REST API call to the SharePoint server and ingest files or lists into Amazon S3.
  2. Create an AWS Glue workflow with a starting trigger of EVENT type.
  3. Configure CloudTrail to log S3 data events, such as PutObject API calls.
  4. Create a rule in EventBridge to forward the PutObject API events to AWS Glue when they’re emitted by CloudTrail.
  5. Add an AWS Glue event-driven workflow as a target to the EventBridge rule. The workflow gets triggered when the SharePoint ingest AWS Glue job adds new files to the S3 bucket.

Prerequisites

For this walkthrough, you should have the following prerequisites:

Configure SharePoint server authentication details

Before launching the CloudFormation stack, you need to set up your SharePoint server authentication details, namely, TenantID, Tenant, ClientID, ClientSecret, and the SharePoint URL in AWS Systems Manager Parameter Store of the account you’re deploying in. This makes sure that no authentication details are stored in the code and they’re fetched in real time from Parameter Store when the solution is running.

To create your AWS Systems Manager parameters, complete the following steps:

  1. On the Systems Manager console, under Application Management in the navigation pane, choose Parameter Store.
  2. Choose Create Parameter.
  3. For Name, enter the parameter name /DataLake/GlueIngest/SharePoint/tenant.
  4. Leave the type as string.
  5. Enter your SharePoint tenant detail into the value field.
  6. Choose Create parameter.
  7. Repeat these steps to create the following parameters:
    1. /DataLake/GlueIngest/SharePoint/tenant
    2. /DataLake/GlueIngest/SharePoint/tenant_id
    3. /DataLake/GlueIngest/SharePoint/client_id/list
    4. /DataLake/GlueIngest/SharePoint/client_secret/list
    5. /DataLake/GlueIngest/SharePoint/client_id/file
    6. /DataLake/GlueIngest/SharePoint/client_secret/file
    7. /DataLake/GlueIngest/SharePoint/url/list
    8. /DataLake/GlueIngest/SharePoint/url/file
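
Because Parameter Store names are case-sensitive, it can help to generate the names from one place and check that none are missing before launching the stack. The following is a minimal sketch using only the names listed above; the helper functions are hypothetical and do not call AWS. The values themselves would still be created on the console or with an SDK.

```python
from typing import Dict, List

PREFIX = "/DataLake/GlueIngest/SharePoint"

# Suffixes in the same order as the list above.
SUFFIXES = (
    "tenant",
    "tenant_id",
    "client_id/list",
    "client_secret/list",
    "client_id/file",
    "client_secret/file",
    "url/list",
    "url/file",
)


def sharepoint_parameter_names(prefix: str = PREFIX) -> List[str]:
    """Return the full parameter names expected by this walkthrough."""
    return [f"{prefix}/{suffix}" for suffix in SUFFIXES]


def missing_parameters(provided: Dict[str, str], prefix: str = PREFIX) -> List[str]:
    """Return the expected names that have no (non-empty) value in `provided`."""
    return [name for name in sharepoint_parameter_names(prefix) if not provided.get(name)]


if __name__ == "__main__":
    for name in sharepoint_parameter_names():
        print(name)
```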

Deploy the solution with AWS CloudFormation

For a quick start of this solution, you can deploy the provided CloudFormation stack. This creates all the required resources in your account.

The CloudFormation template generates the following resources:

  • S3 bucket – Stores data, CloudTrail logs, job scripts, and any temporary files generated during the AWS Glue extract, transform, and load (ETL) job run.
  • CloudTrail trail with S3 data events enabled – Enables EventBridge to receive PutObject API call data in a specific bucket.
  • AWS Glue Job – A Python shell job that fetches the data from the SharePoint server.
  • AWS Glue workflow – A data processing pipeline that consists of a crawler, jobs, and triggers. This workflow converts uploaded data files into Apache Parquet format.
  • AWS Glue database – The AWS Glue Data Catalog database that holds the tables created in this walkthrough.
  • AWS Glue table – The Data Catalog table representing the Parquet files being converted by the workflow.
  • AWS Lambda function – The AWS Lambda function is used as an AWS CloudFormation custom resource to copy job scripts from an AWS Glue-managed GitHub repository and an AWS Big Data blog S3 bucket to your S3 bucket.
  • IAM roles and policies – We use the following AWS Identity and Access Management (IAM) roles:
    • LambdaExecutionRole – Runs the Lambda function that has permission to upload the job scripts to the S3 bucket.
    • GlueServiceRole – Runs the AWS Glue job that has permission to download the script, read data from the source, and write data to the destination after conversion.
    • EventBridgeGlueExecutionRole – Has permissions to invoke the NotifyEvent API for an AWS Glue workflow.
    • IngestGlueRole – Runs the AWS Glue job that has permission to ingest data into the S3 bucket.

To launch the CloudFormation stack, complete the following steps:

  1. Sign in to the AWS CloudFormation console.
  2. Choose Launch Stack:
  3. Choose Next.
  4. For pS3BucketName, enter the unique name of your new S3 bucket.
  5. Leave pWorkflowName and pDatabaseName as the default.

  1. For pDatasetName, enter the SharePoint list name or file name you want to ingest.
  2. Choose Next.

  1. On the next page, choose Next.
  2. Review the details on the final page and select I acknowledge that AWS CloudFormation might create IAM resources.
  3. Choose Create.

It takes a few minutes for the stack creation to complete; you can follow the progress on the Events tab.

You can run the ingest AWS Glue job either on a schedule or on demand. As the job successfully finishes and ingests data into the raw prefix of the S3 bucket, the AWS Glue workflow runs and transforms the ingested raw CSV files into Parquet files and loads them into the transformed prefix.

Review the EventBridge rule

The CloudFormation template created an EventBridge rule to forward S3 PutObject API events to AWS Glue. Let’s review the configuration of the EventBridge rule:

  1. On the EventBridge console, under Events, choose Rules.
  2. Choose the rule s3_file_upload_trigger_rule-<CloudFormation-stack-name>.
  3. Review the information in the Event pattern section.

The event pattern shows that this rule is triggered when an S3 object is uploaded to s3://<bucket_name>/data/SharePoint/tablename_raw/. CloudTrail captures the PutObject API calls made and relays them as events to EventBridge.

  1. In the Targets section, you can verify that this EventBridge rule is configured with an AWS Glue workflow as a target.
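
The rule's event pattern is not reproduced in this post. As a rough illustration, the sketch below builds a pattern of the shape commonly used for CloudTrail-delivered S3 PutObject events and checks a sample event against it. The matcher is deliberately simplified (exact values and `prefix` only) and is not EventBridge's matching engine; the bucket name and object key are placeholders.

```python
from typing import Any, Dict


def build_put_object_pattern(bucket_name: str, key_prefix: str) -> Dict[str, Any]:
    """Event pattern for PutObject calls under a key prefix (assumed shape)."""
    return {
        "source": ["aws.s3"],
        "detail-type": ["AWS API Call via CloudTrail"],
        "detail": {
            "eventSource": ["s3.amazonaws.com"],
            "eventName": ["PutObject"],
            "requestParameters": {
                "bucketName": [bucket_name],
                "key": [{"prefix": key_prefix}],
            },
        },
    }


def _value_matches(candidate: Any, actual: Any) -> bool:
    # Supports only exact matches and {"prefix": "..."} filters.
    if isinstance(candidate, dict) and "prefix" in candidate:
        return isinstance(actual, str) and actual.startswith(candidate["prefix"])
    return candidate == actual


def matches(pattern: Dict[str, Any], event: Dict[str, Any]) -> bool:
    """Simplified matcher: every pattern field must match the event."""
    for name, expected in pattern.items():
        if name not in event:
            return False
        actual = event[name]
        if isinstance(expected, dict):
            if not isinstance(actual, dict) or not matches(expected, actual):
                return False
        elif not any(_value_matches(candidate, actual) for candidate in expected):
            return False
    return True


def sample_event(bucket_name: str, key: str) -> Dict[str, Any]:
    """A trimmed-down event carrying only the fields the pattern inspects."""
    return {
        "source": "aws.s3",
        "detail-type": "AWS API Call via CloudTrail",
        "detail": {
            "eventSource": "s3.amazonaws.com",
            "eventName": "PutObject",
            "requestParameters": {"bucketName": bucket_name, "key": key},
        },
    }
```

With this pattern, an upload under data/SharePoint/tablename_raw/ matches, while an object written elsewhere in the same bucket (for example, the converted Parquet output) does not, so the workflow does not trigger itself.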

Run the ingest AWS Glue job and verify the AWS Glue workflow is triggered successfully

To test the workflow, we run the ingest-glue-job-SharePoint-file job using the following steps:

  1. On the AWS Glue console, select the ingest-glue-job-SharePoint-file job.

  1. On the Action menu, choose Run job.

  1. Choose the History tab and wait until the job succeeds.

You can now see the CSV files in the raw prefix of your S3 bucket.

Now the workflow should be triggered.

  1. On the AWS Glue console, validate that your workflow is in the RUNNING state.

  1. Choose the workflow to view the run details.
  2. On the History tab of the workflow, choose the current or most recent workflow run.
  3. Choose View run details.

When the workflow run status changes to Completed, let’s check the converted files in your S3 bucket.

  1. Switch to the Amazon S3 console, and navigate to your bucket.

You can see the Parquet files under s3://<bucket_name>/data/SharePoint/tablename_transformed/.

Congratulations! Your workflow ran successfully based on S3 events triggered by uploading files to your bucket. You can verify everything works as expected by running a query against the generated table using Amazon Athena.

Sample wine dataset

Let’s analyze a sample red wine dataset. The following screenshot shows a SharePoint list that contains various readings that relate to the characteristics of the wine and an associated wine category. This is populated by various wine tasters from multiple countries.

The following screenshot shows a supplier dataset from the data lake with wine categories ordered per supplier.

We process the red wine dataset using this solution and use Athena to query the red wine data and supplier data where wine quality is greater than or equal to 7.

We can visualize the processed dataset using QuickSight.

Clean up

To avoid incurring unnecessary charges, you can use the AWS CloudFormation console to delete the stack that you deployed. This removes all the resources you created when deploying the solution.

Conclusion

Event-driven architectures provide access to near-real-time information and help you make business decisions on fresh data. In this post, we demonstrated how to ingest and process SharePoint data using AWS serverless services like AWS Glue and EventBridge. We saw how to configure a rule in EventBridge to forward events to AWS Glue. You can use this pattern for your analytical use cases, such as joining SharePoint data with other data in your lake to generate insights, or auditing SharePoint data and compliance requirements.


About the Author

Venkata Sistla is a Big Data & Analytics Consultant on the AWS Professional Services team. He specializes in building data processing capabilities and helping customers remove constraints that prevent them from leveraging their data to develop business insights.

Build a serverless event-driven workflow with AWS Glue and Amazon EventBridge

Post Syndicated from Noritaka Sekiyama original https://aws.amazon.com/blogs/big-data/build-a-serverless-event-driven-workflow-with-aws-glue-and-amazon-eventbridge/

Customers are adopting event-driven architectures to improve the agility and resiliency of their applications. As a result, data engineers are increasingly looking for simple-to-use yet powerful and feature-rich data processing tools to build pipelines that enrich data, move data in and out of their data lake and data warehouse, and analyze data. AWS Glue is a serverless data integration service that makes it easy to discover, prepare, and combine data for analytics, machine learning, and application development. AWS Glue provides all the capabilities needed for data integration so that you can start analyzing your data and putting it to use in minutes instead of months.

Data integration jobs have varying degrees of priority and time sensitivity. For example, you can use batch processing to process weekly sales data, but in some cases data needs to be processed immediately. Fraud detection applications, for example, require near-real-time processing of security logs. Or if a partner uploads product information to your Amazon Simple Storage Service (Amazon S3) bucket, it needs to be processed right away to ensure that your website has the latest product information.

This post discusses how to configure AWS Glue workflows to run based on real-time events. You no longer need to set schedules or build complex solutions to trigger jobs based on events; AWS Glue event-driven workflows manage it all for you.

Get started with AWS Glue event-driven workflows

As a business requirement, most companies need to hydrate their data lake and data warehouse with data in near-real time. They run their pipelines on a schedule (hourly, daily, or even weekly) or trigger the pipeline through an external system. It’s difficult to predict the frequency at which upstream systems generate data, which makes it difficult to plan and schedule ETL pipelines to run efficiently. Scheduling ETL pipelines to run too frequently can be expensive, whereas scheduling pipelines to run infrequently can lead to making decisions based on stale data. Similarly, triggering pipelines from an external process can increase complexity, cost, and job startup time.

AWS Glue now supports event-driven workflows, a capability that lets developers start AWS Glue workflows based on events delivered by Amazon EventBridge. With this new feature, you can trigger a data integration workflow from any events from AWS services, software as a service (SaaS) providers, and any custom applications. For example, you can react to an S3 event generated when new buckets are created and when new files are uploaded to a specific S3 location. In addition, if your environment generates many events, AWS Glue allows you to batch them either by time duration or by the number of events. Event-driven workflows make it easy to start an AWS Glue workflow based on real-time events.

To get started, you simply create a new AWS Glue trigger of type EVENT and place it as the first trigger in your workflow. You can optionally specify a batching condition. Without event batching, the AWS Glue workflow is triggered every time an EventBridge rule matches, which may result in multiple concurrent workflow runs. In some environments, starting many concurrent workflow runs could lead to throttling, reaching service quota limits, and potential cost overruns. It can also cause workflow runs to fail if the concurrency limit specified on the workflow does not match the limits on the jobs within it. Event batching allows you to configure the number of events to buffer or the maximum elapsed time before firing the particular trigger. Once the batching condition is met, a workflow run is started. For example, you can trigger your workflow when 100 files are uploaded to Amazon S3 or 5 minutes after the first upload. We recommend configuring event batching to avoid too many concurrent workflow runs and to optimize resource usage and cost.

Overview of the solution

In this post, we walk through a solution to set up an AWS Glue workflow that listens to S3 PutObject data events captured by AWS CloudTrail. This workflow is configured to run when five new files are added or when the batching window of 900 seconds expires after the first file is added. The following diagram illustrates the architecture.

The steps in this solution are as follows:

  1. Create an AWS Glue workflow with a starting trigger of EVENT type and configure the batch size on the trigger to be five and batch window to be 900 seconds.
  2. Configure CloudTrail to log Amazon S3 data events, such as PutObject API calls.
  3. Create a rule in EventBridge to forward the PutObject API events to AWS Glue when they are emitted by CloudTrail.
  4. Add an AWS Glue event-driven workflow as a target to the EventBridge rule.
  5. To start the workflow, upload files to the S3 bucket. Remember you need to have at least five files before the workflow is triggered.
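
Step 1 can also be expressed as a request payload. The sketch below only builds and validates the dictionary; the field names follow the AWS Glue CreateTrigger API as I understand it (`Type`, `WorkflowName`, `Actions`, `EventBatchingCondition` with `BatchSize` and `BatchWindow`), so check them against the current API reference before use. The job name is a placeholder, and the helper function is hypothetical.

```python
from typing import Any, Dict


def build_event_trigger_request(
    workflow_name: str,
    job_name: str,
    batch_size: int = 5,
    batch_window: int = 900,
) -> Dict[str, Any]:
    """Build a CreateTrigger-style request for an EVENT trigger with batching."""
    if batch_size < 1 or batch_window < 1:
        raise ValueError("batch_size and batch_window must be positive")
    return {
        # Trigger name follows the <Workflow-name>_pre_job_trigger convention
        # used by the template in this post.
        "Name": f"{workflow_name}_pre_job_trigger",
        "WorkflowName": workflow_name,
        "Type": "EVENT",
        "EventBatchingCondition": {
            "BatchSize": batch_size,      # number of events to buffer
            "BatchWindow": batch_window,  # seconds after the first event
        },
        "Actions": [{"JobName": job_name}],  # job_name is a placeholder
    }


if __name__ == "__main__":
    import json

    request = build_event_trigger_request("my-workflow", "my-pre-job")
    print(json.dumps(request, indent=2))
```

If you use the AWS SDK for Python, a dictionary like this could be passed as keyword arguments to the Glue client's `create_trigger` call; that step is left out here so the sketch runs without AWS credentials.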

Deploy the solution with AWS CloudFormation

For a quick start of this solution, you can deploy the provided AWS CloudFormation stack. This creates all the required resources in your account.

The CloudFormation template generates the following resources:

  • S3 bucket – This is used to store data, CloudTrail logs, job scripts, and any temporary files generated during the AWS Glue ETL job run.
  • CloudTrail trail with S3 data events enabled – This enables EventBridge to receive PutObject API call data on a specific bucket.
  • AWS Glue workflow – A data processing pipeline that consists of a crawler, jobs, and triggers. This workflow converts uploaded data files into Apache Parquet format.
  • AWS Glue database – The AWS Glue Data Catalog database that is used to hold the tables created in this walkthrough.
  • AWS Glue table – The Data Catalog table representing the Parquet files being converted by the workflow.
  • AWS Lambda function – This is used as an AWS CloudFormation custom resource to copy job scripts from an AWS Glue-managed GitHub repository and an AWS Big Data blog S3 bucket to your S3 bucket.
  • IAM roles and policies – We use the following AWS Identity and Access Management (IAM) roles:
    • LambdaExecutionRole – Runs the Lambda function that has permission to upload the job scripts to the S3 bucket.
    • GlueServiceRole – Runs the AWS Glue job that has permission to download the script, read data from the source, and write data to the destination after conversion.
    • EventBridgeGlueExecutionRole – Has permissions to invoke the NotifyEvent API for an AWS Glue workflow.

To launch the CloudFormation stack, complete the following steps:

  1. Sign in to the AWS CloudFormation console.
  2. Choose Launch Stack:

  1. Choose Next.
  2. For S3BucketName, enter the unique name of your new S3 bucket.
  3. For WorkflowName, DatabaseName, and TableName, leave as the default.
  4. Choose Next.

  1. On the next page, choose Next.
  2. Review the details on the final page and select I acknowledge that AWS CloudFormation might create IAM resources.
  3. Choose Create.

It takes a few minutes for the stack creation to complete; you can follow the progress on the Events tab.

By default, the workflow runs whenever a single file is uploaded to the S3 bucket, resulting in a PutObject API call. In the next section, we configure the event batching to change this behavior.

Review the AWS Glue trigger and add event batching conditions

The CloudFormation template provisioned an AWS Glue workflow including a crawler, jobs, and triggers. The first trigger in the workflow is configured as an event-based trigger. Next, we update this trigger to batch five events or wait for 900 seconds after the first event before it starts the workflow.

Before we make any changes, let’s review the trigger on the AWS Glue console:

  1. On the AWS Glue console, under ETL, choose Triggers.
  2. Choose <Workflow-name>_pre_job_trigger.
  3. Choose Edit.

We can see the trigger’s type is set to EventBridge event, which means it’s an event-based trigger. Let’s change the event batching condition to run the workflow after five files are uploaded to Amazon S3.

  1. For Number of events, enter 5.
  2. For Time delay (sec), enter 900.
  3. Choose Next.

  1. On the next screen, under Choose jobs to trigger, leave as the default and choose Next.
  2. Choose Finish.

Review the EventBridge rule

The CloudFormation template created an EventBridge rule to forward S3 PutObject API events to AWS Glue. Let’s review the configuration of the EventBridge rule:

  1. On the EventBridge console, under Events, choose Rules.
  2. Choose s3_file_upload_trigger_rule-<CloudFormation-stack-name>.
  3. Review the information in the Event pattern section.

The event pattern shows that this rule is triggered when an S3 object is uploaded to s3://<bucket_name>/data/products_raw/. CloudTrail captures the PutObject API calls made and relays them as events to EventBridge.

  1. In the Targets section, you can verify that this EventBridge rule is configured with an AWS Glue workflow as a target.

Trigger the AWS Glue workflow by uploading files to Amazon S3

To test your workflow, we upload files to Amazon S3 using the AWS Command Line Interface (AWS CLI). If you don’t have the AWS CLI, see Installing, updating, and uninstalling the AWS CLI.

Let’s upload some small files to your S3 bucket.

  1. Run the following command to upload the first file to your S3 bucket:
$ echo '{"product_id": "00001", "product_name": "Television", "created_at": "2021-06-01"}' > product_00001.json
$ aws s3 cp product_00001.json s3://<bucket-name>/data/products_raw/
  1. Run the following command to upload the second file:
$ echo '{"product_id": "00002", "product_name": "USB charger", "created_at": "2021-06-02"}' > product_00002.json
$ aws s3 cp product_00002.json s3://<bucket-name>/data/products_raw/
  1. Run the following command to upload the third file:
$ echo '{"product_id": "00003", "product_name": "USB charger", "created_at": "2021-06-03"}' > product_00003.json
$ aws s3 cp product_00003.json s3://<bucket-name>/data/products_raw/
  1. Run the following command to upload the fourth file:
$ echo '{"product_id": "00004", "product_name": "USB charger", "created_at": "2021-06-04"}' > product_00004.json
$ aws s3 cp product_00004.json s3://<bucket-name>/data/products_raw/

These events didn’t trigger the workflow because they didn’t meet the batch condition of five events.

  1. Run the following command to upload the fifth file:
$ echo '{"product_id": "00005", "product_name": "USB charger", "created_at": "2021-06-05"}' > product_00005.json
$ aws s3 cp product_00005.json s3://<bucket-name>/data/products_raw/

Now the five JSON files have been uploaded to Amazon S3.
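
If you prefer to script the file creation rather than typing five echo commands, the following minimal sketch writes the same five JSON documents to a local directory. It does not upload anything; the upload itself would still be done with the aws s3 cp commands shown above. The function name is hypothetical.

```python
import json
import tempfile
from pathlib import Path
from typing import List

# Same product records as the echo commands in this section.
PRODUCTS = [
    ("00001", "Television", "2021-06-01"),
    ("00002", "USB charger", "2021-06-02"),
    ("00003", "USB charger", "2021-06-03"),
    ("00004", "USB charger", "2021-06-04"),
    ("00005", "USB charger", "2021-06-05"),
]


def write_product_files(directory: str) -> List[Path]:
    """Write one product_<id>.json file per product and return the paths."""
    paths = []
    for product_id, name, created_at in PRODUCTS:
        path = Path(directory) / f"product_{product_id}.json"
        record = {
            "product_id": product_id,
            "product_name": name,
            "created_at": created_at,
        }
        path.write_text(json.dumps(record) + "\n")
        paths.append(path)
    return paths


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        for path in write_product_files(tmp):
            print(path.name)
```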

Verify the AWS Glue workflow is triggered successfully

Now the workflow should be triggered. Open the AWS Glue console to validate that your workflow is in the RUNNING state.

To view the run details, complete the following steps:

  1. On the History tab of the workflow, choose the current or most recent workflow run.
  2. Choose View run details.

When the workflow run status changes to Completed, let’s see the converted files in your S3 bucket.

  1. Switch to the Amazon S3 console, and navigate to your bucket.

You can see the Parquet files under s3://<bucket-name>/data/products/.

Congratulations! Your workflow ran successfully based on S3 events triggered by uploading files to your bucket. You can verify everything works as expected by running a query against the generated table using Amazon Athena.

Verify the metrics for the EventBridge rule

Optionally, you can use Amazon CloudWatch metrics to validate the events were sent to the AWS Glue workflow.

  1. On the EventBridge console, in the navigation pane, choose Rules.
  2. Select your EventBridge rule s3_file_upload_trigger_rule-<Workflow-name> and choose Metrics for the rule.

When the target workflow is invoked by the rule, the metrics Invocations and TriggeredRules are published.

The metric FailedInvocations is published if the EventBridge rule is unable to trigger the AWS Glue workflow. In that case, we recommend you check the following configurations:

  • Verify the IAM role provided to the EventBridge rule allows the glue:NotifyEvent permission on the AWS Glue workflow.
  • Verify the trust relationship on the IAM role provides the events.amazonaws.com service principal the ability to assume the role.
  • Verify the starting trigger on your target AWS Glue workflow is an event-based trigger.
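
The first two checks concern two separate IAM documents: the permissions policy and the trust policy on the role used by the rule. The sketch below builds both as Python dictionaries so you can compare them with what is attached to your role. It assumes the usual Glue workflow ARN format; the Region, account ID, and workflow name are placeholders, and the helper functions are hypothetical.

```python
import json
from typing import Any, Dict


def glue_notify_policy(region: str, account_id: str, workflow_name: str) -> Dict[str, Any]:
    """Permissions policy allowing glue:NotifyEvent on one workflow."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": "glue:NotifyEvent",
                # Assumed ARN format for an AWS Glue workflow.
                "Resource": f"arn:aws:glue:{region}:{account_id}:workflow/{workflow_name}",
            }
        ],
    }


def eventbridge_trust_policy() -> Dict[str, Any]:
    """Trust policy letting the EventBridge service principal assume the role."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {"Service": "events.amazonaws.com"},
                "Action": "sts:AssumeRole",
            }
        ],
    }


if __name__ == "__main__":
    print(json.dumps(glue_notify_policy("us-east-1", "123456789012", "my-workflow"), indent=2))
    print(json.dumps(eventbridge_trust_policy(), indent=2))
```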

Clean up

Now to the final step, cleaning up the resources. Delete the CloudFormation stack to remove any resources you created as part of this walkthrough.

Conclusion

AWS Glue event-driven workflows enable data engineers to easily build event-driven ETL pipelines that respond in near-real time, delivering fresh data to business users. In this post, we demonstrated how to configure a rule in EventBridge to forward events to AWS Glue. We also saw how to create an event-based trigger that starts an AWS Glue ETL workflow either immediately, or after a set number of events or period of time. Migrating your existing AWS Glue workflows to make them event-driven is easy: change the first trigger in the workflow to type EVENT and add the workflow as a target to an EventBridge rule that captures the events you're interested in.

For more information about event-driven AWS Glue workflows, see Starting an AWS Glue Workflow with an Amazon EventBridge Event.


About the Authors

Noritaka Sekiyama is a Senior Big Data Architect on the AWS Glue and AWS Lake Formation team. In his spare time, he enjoys playing with his children. They are addicted to grabbing crayfish and worms in the park, and putting them in the same jar to observe what happens.

Karan Vishwanathan is a Software Development Engineer on the AWS Glue team. He enjoys working on distributed systems problems and playing golf.

Keerthi Chadalavada is a Software Development Engineer on the AWS Glue team. She is passionate about building fault tolerant and reliable distributed systems at scale.

Hydrate your data lake with SaaS application data using Amazon AppFlow

Post Syndicated from Ninad Phatak original https://aws.amazon.com/blogs/big-data/hydrate-your-data-lake-with-saas-application-data-using-amazon-appflow/

Organizations today want to make data-driven decisions. The data could lie in multiple source systems, such as line of business applications, log files, connected devices, social media, and many more. As organizations adopt software as a service (SaaS) applications, data becomes increasingly fragmented and trapped in different “data islands.” To make decision-making easier, organizations are building data lakes. A data lake is a centralized repository that allows you to store all your structured and unstructured data at any scale. You can store your data as is, without having to first structure the data, and run different types of analytics, from dashboards and visualizations to big data processing, ad hoc analytics, and machine learning (ML), to guide better decisions.

AWS provides services such as AWS Glue, AWS Lake Formation, AWS Database Migration Service (AWS DMS), and many third-party solutions on AWS Marketplace to integrate data from various source systems into the Amazon Simple Storage Service (Amazon S3) data lake. If you’re using SaaS applications like Salesforce, Marketo, Slack, and ServiceNow to run your business, you may need to integrate data from these sources into your data lake. You likely also want to easily integrate these data sources without writing or managing any code. This is precisely where you can use Amazon AppFlow.

Amazon AppFlow is a fully managed integration service that enables you to securely transfer data between SaaS applications like Salesforce, Marketo, Slack, and ServiceNow and AWS services like Amazon S3 and Amazon Redshift. With Amazon AppFlow, you can run data flows at nearly any scale at the frequency you choose: on a schedule, in response to a business event in real time, or on demand. You can configure data transformations such as data masking and concatenation of fields as well as validate and filter data (omitting records that don’t meet the criteria) to generate rich, ready-to-use data as part of the flow itself, without additional steps. Amazon AppFlow automatically encrypts data in motion, and optionally allows you to restrict data from flowing over the public internet for SaaS applications that are integrated with AWS PrivateLink, reducing exposure to security threats. For a complete list of all the SaaS applications that can be integrated with Amazon AppFlow, see Amazon AppFlow integrations.

In this post, we look at how to integrate data from Salesforce into a data lake and query the data via Amazon Athena. Amazon AppFlow recently announced multiple new capabilities such as availability of APIs and integration with AWS CloudFormation. We take advantage of these new capabilities and deploy the solution using a CloudFormation template.

Solution architecture

The following diagram depicts the architecture of the solution that we deploy using AWS CloudFormation.

As seen in the diagram, we use Amazon AppFlow to integrate data from Salesforce into a data lake on Amazon S3. We then use Athena to query this data with the table definitions residing in the AWS Glue Data Catalog.

Deploy the solution with AWS CloudFormation

We use AWS CloudFormation to deploy the solution components in your AWS account. Choose an AWS Region for deployment where the following services are available:

  • Amazon AppFlow
  • AWS Glue
  • Amazon S3
  • Athena

You need to meet the following prerequisites before deploying the solution:

  • Have a Salesforce account with credentials authorized to pull data using APIs.
  • If you’re deploying the stack in an account using the Lake Formation permission model, validate the following settings:
    • The AWS Identity and Access Management (IAM) user used to deploy the stack is added as a data lake administrator under Lake Formation, or the IAM user used to deploy the stack has IAM privileges to create databases in the AWS Glue Data Catalog.
    • The Data Catalog settings under Lake Formation are configured to use only IAM access control for new databases and new tables in new databases. This makes sure that all access to the newly created databases and tables in the Data Catalog is controlled solely using IAM permissions. The following screenshot shows the Data catalog settings page on the Lake Formation console, where you can set these permissions.

These Lake Formation settings are required so that all permissions to the Data Catalog objects are controlled using IAM only.

Although you need these Lake Formation settings for the CloudFormation stack to deploy properly, in a production setting we recommend you use Lake Formation to govern access to the data in the data lake. For more information about Lake Formation, see What Is AWS Lake Formation?

We now deploy the solution and the following components:

  • An Amazon AppFlow flow to integrate Salesforce account data into Amazon S3
  • An AWS Glue Data Catalog database
  • An AWS Glue crawler to crawl the data pulled into Amazon S3 so that it can be queried using Athena

  1. On the Amazon AppFlow console, on the Connections page, choose Create connection.
  2. For Connection name, enter a name for your connection.
  3. Choose Continue.

You’re redirected to the Salesforce login page, where you enter your Salesforce account credentials.

  1. Enter the appropriate credentials and grant OAuth2 access to the Amazon AppFlow client in the next step, after which a new connector profile is set up in your AWS account.
  2. To deploy the remaining solution components, choose Launch Stack:
  3. For Stack name, enter an appropriate name for the CloudFormation stack.
  4. For Parameters, enter the name of the Salesforce connection you created.
  5. Choose Next.
  6. Follow through the CloudFormation stack creation wizard, leaving the rest of the default values unchanged.
  7. On the final page, select I acknowledge that AWS CloudFormation might create IAM resources with custom names.
  8. Choose Create stack.
  9. Wait for the stack status to change to CREATE_COMPLETE.
  10. On the Outputs tab of the stack, record the name of the S3 bucket.

Run the flow

The CloudFormation stack has deployed a flow named SFDCAccount. Open the flow to see the configuration. The flow has been configured to do the following:

  • Pull the account object from your Salesforce account into an S3 bucket. The flow pulls certain attributes from the object in Parquet format.
  • Mask the last five digits of the phone number associated with the Salesforce account.
  • Build a validation on the Account ID field that ignores the record if the value is NULL.
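
To make the masking and validation rules concrete, here is a minimal local sketch of their effect on a few records. It is not how Amazon AppFlow implements them: the field names `Id` and `Phone`, the asterisk mask character, and the choice to preserve punctuation are all assumptions made for illustration.

```python
from typing import Any, Dict, List, Optional


def mask_last_digits(phone: str, count: int = 5, mask_char: str = "*") -> str:
    """Replace the last `count` digits of a phone number, keeping punctuation."""
    chars = list(phone)
    remaining = count
    for i in range(len(chars) - 1, -1, -1):  # walk from the end of the string
        if remaining == 0:
            break
        if chars[i].isdigit():
            chars[i] = mask_char
            remaining -= 1
    return "".join(chars)


def apply_flow_rules(records: List[Dict[str, Optional[Any]]]) -> List[Dict[str, Any]]:
    """Drop records with a NULL account ID and mask the phone number on the rest."""
    result = []
    for record in records:
        if record.get("Id") is None:  # validation: ignore the record
            continue
        cleaned = dict(record)        # avoid mutating the caller's data
        if cleaned.get("Phone"):
            cleaned["Phone"] = mask_last_digits(cleaned["Phone"])
        result.append(cleaned)
    return result


if __name__ == "__main__":
    sample = [
        {"Id": "001-placeholder", "Phone": "(415) 555-1212"},
        {"Id": None, "Phone": "(415) 555-0000"},
    ]
    for row in apply_flow_rules(sample):
        print(row)
```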

Make sure that all these attributes pulled by the flow are part of your account object in Salesforce. Make any additional changes that you may want to the flow and save the flow.

  1. Run the flow by choosing Run flow.
  2. When the flow is complete, navigate to the S3 bucket created by the CloudFormation stack to confirm its contents.

The Salesforce account data is stored in Parquet format in the SFDCData/SFDCAccount/ folder in the S3 bucket.

  1. On the AWS Glue console, run the crawler AppFlowGlueCrawler.

This crawler has been created by the CloudFormation stack and is configured to crawl the S3 bucket and create a table in the appflowblogdb database in the Data Catalog.

When the crawler is complete, a table named SFDCAccount exists in the appflowblogdb database.

  1. On the Athena console, run the following query:
    Select * from appflowblogdb.SFDCAccount limit 10;

The output shows the data pulled by the Amazon AppFlow flow into the S3 bucket.

Clean up

When you’re done exploring the solution, complete the following steps to clean up the resources deployed by AWS CloudFormation:

  1. Empty the S3 bucket created by the CloudFormation stack.
  2. Delete the CloudFormation stack.

Conclusion

In this post, we saw how you can easily set up an Amazon AppFlow flow to integrate data from Salesforce into your data lake. Amazon AppFlow allows you to integrate data from many other SaaS applications into your data lake. After the data lands in Amazon S3, you can take it further for downstream processing using services like Amazon EMR and AWS Glue. You can then use the data in the data lake for multiple analytics use cases ranging from dashboards to ad hoc analytics and ML.


About the Authors

Ninad Phatak is a Principal Data Architect at Amazon Development Center India. He specializes in data engineering and data warehousing technologies and helps customers architect their analytics use cases and platforms on AWS.

 

 

 

Vinay Kondapi is Head of Product for Amazon AppFlow. He specializes in application and data integration with SaaS products at AWS.

 

 

 

Integrating Datadog data with AWS using Amazon AppFlow for intelligent monitoring

Post Syndicated from Gopalakrishnan Ramaswamy original https://aws.amazon.com/blogs/big-data/integrating-datadog-data-with-aws-using-amazon-appflow-for-intelligent-monitoring/

Infrastructure and operation teams are often challenged with getting a full view into their IT environments to do monitoring and troubleshooting. New monitoring technologies are needed to provide an integrated view of all components of an IT infrastructure and application system.

Datadog provides intelligent application and service monitoring by bringing together data from servers, databases, containers, and third-party services in the form of a software as a service (SaaS) offering. It provides operations and development professionals the ability to measure application and infrastructure performance, visualize metrics with the help of a unified dashboard and create alerts and notifications.

Amazon AppFlow is a fully managed service that provides integration capabilities by enabling you to transfer data between SaaS applications like Datadog, Salesforce, Marketo, and Slack and AWS services like Amazon Simple Storage Service (Amazon S3) and Amazon Redshift. It provides capabilities to transform, filter, and validate data to generate enriched and usable data in a few easy steps.

In this post, I walk you through the process of extracting log data from Datadog using Amazon AppFlow, storing it in Amazon S3, and querying it with Amazon Athena.

Solution overview

The following diagram shows the flow of our solution.

The Datadog Agent is lightweight software that can be installed on many different platforms, either directly or as a containerized version. It collects events and metrics from hosts and sends them to Datadog. Amazon AppFlow extracts the log data from Datadog and stores it in Amazon S3, where it is then queried using Athena.

To implement the solution, you complete the following steps:

  1. Install and configure the Datadog Agent.
  2. Create a new Datadog application key.
  3. Create an Amazon AppFlow connection for Datadog.
  4. Create a flow in Amazon AppFlow.
  5. Run the flow and query the data.

Prerequisites

The walkthrough requires the following:

  • An AWS account
  • A Datadog account

Installing and configuring the Datadog Agent

The Datadog Agent is lightweight software installed on your hosts. With additional setup, the Agent can report live processes, logs, and traces. The Agent needs an API key, which is used to associate the Agent’s data with your organization. Complete the following steps to install and configure the Datadog Agent:

  1. Create a Datadog account if you haven’t already.
  2. Log in to your account.
  3. Under Integrations, choose APIs.
  4. Copy the API key.
  5. Download the Datadog Agent software for the selected platform.
  6. Install the Agent on the hosts using the API key you copied.

Log collection is disabled by default in the Datadog Agent. To enable Agent log collection and configure custom log collection, perform the following steps on your host:

  1. Update the Datadog Agent’s main configuration file (datadog.yaml) with the following code:
    logs_enabled: true

On Windows, this file is in C:\ProgramData\Datadog.

  1. Create custom log collection by customizing the conf.yaml file.

For example, on Windows, this file is in C:\ProgramData\Datadog\conf.d\win32_event_log.d. The following code is a sample entry in the conf.yaml file that enables collection of Windows security events:

logs:
  - type: windows_event
    channel_path: Security
    source: Security
    service: windowsOS
    sourcecategory: windowsevent

Getting the Datadog application key

Application keys, in conjunction with your organization’s API key, give you full access to Datadog’s programmatic API. Application keys are associated with the user account that created them. The application key is used to log all requests made to the API. Get your application key with the following steps:

  1. Log in to your Datadog account.
  2. Under Integrations, choose APIs.
  3. Expand Application Keys.
  4. For Application key name, enter a name.
  5. Choose Create Application key.

Creating an Amazon AppFlow connection for Datadog

A connection defines the source or destination to use in a flow. To create a new connection for Datadog, complete the following steps:

  1. On the Amazon AppFlow console, in the navigation pane, choose Connections. 
  2. For Connectors, choose Datadog.
  3. Choose Create Connection.
  4. For API key and Application Key, enter the keys procured from the previous steps.
  5. For Connection Name, enter a name; for example, myappflowconnection.
  6. Choose Connect.

Creating a flow in Amazon AppFlow

After you create the data connection, you can create a flow that uses the connection and defines the destination, data mapping, transformation, and filters.

Creating an S3 bucket

Create an S3 bucket as your Amazon AppFlow transfer destination.

  1. On the Amazon S3 console, choose Create bucket.
  2. Enter a name for your bucket; for example, mydatadoglogbucket.
  3. Ensure that Block all public access is selected.
  4. Enable bucket versioning and encryption (optional).
  5. Choose Create bucket.
  6. Enable Amazon S3 server access logging (optional).

Configuring the flow source

After you create the Datadog connection and the S3 bucket, complete the following steps to create a flow:

  1. On the Amazon AppFlow console, in the navigation pane, choose Flows.
  2. Choose Create flow.
  3. For Flow name, enter a name for your flow; for example, mydatadogflow.
  4. For Source name, choose Datadog.
  5. For Choose Datadog connection, choose the connection created earlier.
  6. For Choose Datadog object, choose Logs.

Choosing a destination

In the Destination details section, provide the following information:

  1. For Destination name, choose Amazon S3.
  2. For Bucket details, choose the name of the S3 bucket created earlier.

This step creates a folder with the flow name you specified within the bucket to store the logs.

Additional settings

You can provide additional settings for data format (JSON, CSV, Parquet), data transfer preference, filename preference, flow trigger and transfer mode. Leave all settings as default:

  • For Data format preference, choose JSON format.
  • For Data transfer preference, choose No aggregation.
  • For Filename preference, choose No timestamp.
  • For Folder structure preference, choose No timestamped folder.

Adding a flow trigger

Flows can be run on a schedule, based on an event or on demand. For this post, we choose Run on demand.

Mapping data fields

You can map manually or using a CSV file. This determines how data is transferred from source to destination. You can apply transformations like concatenation, masking, and truncation to the mappings.

  1. In the Map data fields section, for Mapping method, choose Manually map fields.
  2. For Source field name, choose Map all fields directly.
  3. Choose Next.

Validation

You can add validation to perform certain actions based on conditions on field values.

  1. In the Validations section, for Field name, choose Content.
  2. For Condition, choose Values are missing or null.
  3. For Action, choose Ignore record.

Filters

Filters specify which records to transfer. You can add multiple filters, each with its own criteria. For the Datadog data source, it’s mandatory to specify filters for Date_Range and Query. The filter query formats for metrics and for logs are different.

  1. In the Add filters section, for Field name, choose Date_Range.
  2. For Condition, choose is between.
  3. For Criterion 1 and Criterion 2, enter start and end dates for log collection.
  4. Choose Add filter.
  5. For your second filter, for Field name, choose
  6. For Condition, enter host:<yourhostname> AND service:(windowsOS OR LinuxOS).
  7. Choose Save.

The service names specified in the filter should have Datadog logs enabled (refer to the earlier step when you installed and configured the Datadog Agent).

The following are some examples of the filter Query for metrics:

  • load.1{*} by {host}
  • avg:system.cpu.idle{*}
  • avg:system.cpu.system{*}
  • avg:system.cpu.user{*}
  • avg:system.cpu.guest{*}
  • avg:system.cpu.user{host:yourhostname}

The following are some examples of the filter Query for logs:

  • service:servicename
  • host:myhostname
  • host:hostname1 AND service:(servicename1 OR servicename2) 
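
The log query examples above follow a simple pattern, which the following Python sketch reproduces. The helper name `build_log_query` is hypothetical, and the sketch only covers the host and service patterns shown above; it makes no attempt to escape special characters or to cover the rest of the Datadog query syntax.

```python
from typing import Optional, Sequence


def build_log_query(host: Optional[str] = None, services: Sequence[str] = ()) -> str:
    """Build a log filter query in the style of the examples above."""
    parts = []
    if host:
        parts.append(f"host:{host}")
    if len(services) == 1:
        parts.append(f"service:{services[0]}")
    elif len(services) > 1:
        # Several services are grouped in parentheses and joined with OR.
        parts.append("service:(" + " OR ".join(services) + ")")
    return " AND ".join(parts)


print(build_log_query(host="hostname1", services=["servicename1", "servicename2"]))
```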

Running the Flow and querying the data

If a flow is based on a trigger, you can activate or deactivate it. If it’s on demand, it must be run each time data needs to be transferred. When you run the flow, the logs or metrics are pulled into files residing in Amazon S3. In this example, the data is nested JSON. Use AWS Glue and Athena to create a schema and query the log data.

Querying data with Athena

When the Datadog data is in AWS, there are a host of possibilities to store, process, integrate with other data sources, and perform advanced analytics. One such method is to use Athena to query the data directly from Amazon S3.

  1. On the AWS Glue console, in the navigation pane, choose Databases.
  2. Choose Add database.
  3. For Database name, enter a name such as mydatadoglogdb.
  4. Choose Create.
  5. In the navigation pane, choose Crawlers.
  6. Choose Add Crawler.
  7. For Crawler name, enter a name, such as mylogcrawler.
  8. Choose Next.
  9. For Crawler source type, select Data stores.
  10. Choose Next.
  11. In the Add a data store section, choose S3 for the data store.
  12. Enter the path to the S3 folder that has the log files; for example, s3://mydatadoglogbucket/logfolder/.
  13. In the Choose an IAM role section, select Create an IAM role and provide a name.
  14. For Frequency, select Run on demand.
  15. In the Configure the crawler’s output section, for Database, select the database created previously.
  16. Choose Next.
  17. Review and choose Finish.
  18. When the crawler’s status changes to Active, select it and choose Run Crawler.

When the crawler finishes running, it creates the tables and populates them with data based on the schema it infers from the JSON log files.

  1. On the Athena console, choose Settings.
  2. Select an S3 bucket and folder where Athena results are stored.
  3. In the Athena query window, enter the following query:
    select * 
    from mydatadoglogdb.samplelogfile
    where content.attributes.level = 'Information'
    

  4. Choose Run Query.

This sample query gets all the log entries where the level is Information. We’re traversing a nested JSON object in the Athena query, simply with a dot notation.
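
The same dot-notation traversal can be reproduced locally to check what the query is expected to return. The following Python sketch is a minimal illustration. The record shape is an assumption inferred from the query (content.attributes.level), and the `id` and `service` fields and the helper name `get_path` are made up for the example.

```python
import json
from typing import Any


def get_path(record: dict, dotted_path: str, default: Any = None) -> Any:
    """Follow a dotted path such as 'content.attributes.level' through nested dicts."""
    current: Any = record
    for key in dotted_path.split("."):
        if not isinstance(current, dict) or key not in current:
            return default
        current = current[key]
    return current


# Hypothetical log lines, one JSON document per line.
sample_lines = [
    '{"id": "a1", "content": {"service": "windowsOS", "attributes": {"level": "Information"}}}',
    '{"id": "b2", "content": {"service": "windowsOS", "attributes": {"level": "Warning"}}}',
    '{"id": "c3", "content": {"service": "windowsOS"}}',
]

records = [json.loads(line) for line in sample_lines]
matches = [r for r in records if get_path(r, "content.attributes.level") == "Information"]
print([r["id"] for r in matches])
```

Records that lack the nested attribute are skipped rather than raising an error.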

Summary

In this post, I demonstrated how we can bring Datadog data into AWS. Doing so opens a host of opportunities to use the tools available in AWS to drive advanced analytics and monitoring while integrating with data from other sources.

With Amazon AppFlow, you can integrate applications in a few minutes, transfer data at massive scale, and enrich the data as it flows, using mapping, merging, masking, filtering, and validation. For more information about integrating SaaS applications and AWS, see Amazon AppFlow.


About the Author

Gopalakrishnan Ramaswamy is a Solutions Architect at AWS based out of India with extensive background in database, analytics, and machine learning. He helps customers of all sizes solve complex challenges by providing solutions using AWS products and services. Outside of work, he likes the outdoors, physical activities and spending time with friends and family.

Introducing Amazon Managed Workflows for Apache Airflow (MWAA)

Post Syndicated from Danilo Poccia original https://aws.amazon.com/blogs/aws/introducing-amazon-managed-workflows-for-apache-airflow-mwaa/

As the volume and complexity of your data processing pipelines increase, you can simplify the overall process by decomposing it into a series of smaller tasks and coordinate the execution of these tasks as part of a workflow. To do so, many developers and data engineers use Apache Airflow, a platform created by the community to programmatically author, schedule, and monitor workflows. With Airflow you can manage workflows as scripts, monitor them via the user interface (UI), and extend their functionality through a set of powerful plugins. However, manually installing, maintaining, and scaling Airflow, and at the same time handling security, authentication, and authorization for its users takes much of the time you’d rather use to focus on solving actual business problems.

For these reasons, I am happy to announce the availability of Amazon Managed Workflows for Apache Airflow (MWAA), a fully managed service that makes it easy to run open-source versions of Apache Airflow on AWS, and to build workflows to execute your extract-transform-load (ETL) jobs and data pipelines.

Airflow workflows retrieve input from sources like Amazon Simple Storage Service (S3) using Amazon Athena queries, perform transformations on Amazon EMR clusters, and can use the resulting data to train machine learning models on Amazon SageMaker. Workflows in Airflow are authored as Directed Acyclic Graphs (DAGs) using the Python programming language.

A key benefit of Airflow is its open extensibility through plugins, which allows you to create tasks that interact with AWS or on-premises resources required for your workflows, including AWS Batch, Amazon CloudWatch, Amazon DynamoDB, AWS DataSync, Amazon ECS and AWS Fargate, Amazon Elastic Kubernetes Service (EKS), Amazon Kinesis Firehose, AWS Glue, AWS Lambda, Amazon Redshift, Amazon Simple Queue Service (SQS), and Amazon Simple Notification Service (SNS).

To improve observability, Airflow metrics can be published as CloudWatch Metrics, and logs can be sent to CloudWatch Logs. Amazon MWAA provides automatic minor version upgrades and patches by default, with an option to designate a maintenance window in which these upgrades are performed.

You can use Amazon MWAA with these three steps:

  1. Create an environment – Each environment contains your Airflow cluster, including your scheduler, workers, and web server.
  2. Upload your DAGs and plugins to S3 – Amazon MWAA loads the code into Airflow automatically.
  3. Run your DAGs in Airflow – Run your DAGs from the Airflow UI or command line interface (CLI) and monitor your environment with CloudWatch.

Let’s see how this works in practice!

How to Create an Airflow Environment Using Amazon MWAA
In the Amazon MWAA console, I click on Create environment. I give the environment a name and select the Airflow version to use.

Then, I select the S3 bucket and the folder to load my DAG code. The bucket name must start with airflow-.

Optionally, I can specify a plugins file and a requirements file:

  • The plugins file is a ZIP file containing the plugins used by my DAGs.
  • The requirements file describes the Python dependencies to run my DAGs.

For plugins and requirements, I can select the S3 object version to use. In case the plugins or the requirements I use create a non-recoverable error in my environment, Amazon MWAA will automatically roll back to the previous working version.


I click Next to configure the advanced settings, starting with networking. Each environment runs in an Amazon Virtual Private Cloud (VPC) using private subnets in two Availability Zones. Web server access to the Airflow UI is always protected by a secure login using AWS Identity and Access Management (IAM). However, you can choose to have web server access on a public network so that you can log in over the Internet, or on a private network in your VPC. For simplicity, I select a Public network. I let Amazon MWAA create a new security group with the correct inbound and outbound rules. Optionally, I can add one or more existing security groups to fine-tune control of inbound and outbound traffic for my environment.

Now, I configure my environment class. Each environment includes a scheduler, a web server, and a worker. Workers automatically scale up and down according to my workload. We provide you a suggestion on which class to use based on the number of DAGs, but you can monitor the load on your environment and modify its class at any time.

Encryption is always enabled for data at rest, and while I can select a customized key managed by AWS Key Management Service (KMS) I will instead keep the default key that AWS owns and manages on my behalf.

For monitoring, I publish environment performance to CloudWatch Metrics. This is enabled by default, but I can disable CloudWatch Metrics after launch. For the logs, I can specify the log level and which Airflow components should send their logs to CloudWatch Logs. I leave the default to send only the task logs and use log level INFO.

I can modify the default settings for Airflow configuration options, such as default_task_retries or worker_concurrency. For now, I am not changing these values.

Finally, but most importantly, I configure the permissions that will be used by my environment to access my DAGs, write logs, and run DAGs accessing other AWS resources. I select Create a new role and click on Create environment. After a few minutes, the new Airflow environment is ready to be used.

Using the Airflow UI
In the Amazon MWAA console, I look for the new environment I just created and click on Open Airflow UI. A new browser window opens and I am authenticated with a secure login via AWS IAM.

There, I look for a DAG that I put on S3 in the movie_list_dag.py file. The DAG is downloading the MovieLens dataset, processing the files on S3 using Amazon Athena, and loading the result to a Redshift cluster, creating the table if missing.

Here’s the full source code of the DAG:

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators import HttpSensor, S3KeySensor
from airflow.contrib.operators.aws_athena_operator import AWSAthenaOperator
from airflow.utils.dates import days_ago
from datetime import datetime, timedelta
from io import StringIO
from io import BytesIO
from time import sleep
import csv
import requests
import json
import boto3
import zipfile
import io
s3_bucket_name = 'my-bucket'
s3_key='files/'
redshift_cluster='redshift-cluster-1'
redshift_db='dev'
redshift_dbuser='awsuser'
redshift_table_name='movie_demo'
test_http='https://grouplens.org/datasets/movielens/latest/'
download_http='http://files.grouplens.org/datasets/movielens/ml-latest-small.zip'
athena_db='demo_athena_db'
athena_results='athena-results/'
create_athena_movie_table_query="""
CREATE EXTERNAL TABLE IF NOT EXISTS Demo_Athena_DB.ML_Latest_Small_Movies (
  `movieId` int,
  `title` string,
  `genres` string 
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
WITH SERDEPROPERTIES (
  'serialization.format' = ',',
  'field.delim' = ','
) LOCATION 's3://pinwheeldemo1-pinwheeldagsbucketfeed0594-1bks69fq0utz/files/ml-latest-small/movies.csv/ml-latest-small/'
TBLPROPERTIES (
  'has_encrypted_data'='false',
  'skip.header.line.count'='1'
); 
"""
create_athena_ratings_table_query="""
CREATE EXTERNAL TABLE IF NOT EXISTS Demo_Athena_DB.ML_Latest_Small_Ratings (
  `userId` int,
  `movieId` int,
  `rating` int,
  `timestamp` bigint 
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
WITH SERDEPROPERTIES (
  'serialization.format' = ',',
  'field.delim' = ','
) LOCATION 's3://pinwheeldemo1-pinwheeldagsbucketfeed0594-1bks69fq0utz/files/ml-latest-small/ratings.csv/ml-latest-small/'
TBLPROPERTIES (
  'has_encrypted_data'='false',
  'skip.header.line.count'='1'
); 
"""
create_athena_tags_table_query="""
CREATE EXTERNAL TABLE IF NOT EXISTS Demo_Athena_DB.ML_Latest_Small_Tags (
  `userId` int,
  `movieId` int,
  `tag` string,
  `timestamp` bigint 
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
WITH SERDEPROPERTIES (
  'serialization.format' = ',',
  'field.delim' = ','
) LOCATION 's3://pinwheeldemo1-pinwheeldagsbucketfeed0594-1bks69fq0utz/files/ml-latest-small/tags.csv/ml-latest-small/'
TBLPROPERTIES (
  'has_encrypted_data'='false',
  'skip.header.line.count'='1'
); 
"""
join_tables_athena_query="""
SELECT REPLACE ( m.title , '"' , '' ) as title, r.rating
FROM demo_athena_db.ML_Latest_Small_Movies m
INNER JOIN (SELECT rating, movieId FROM demo_athena_db.ML_Latest_Small_Ratings WHERE rating > 4) r on m.movieId = r.movieId
"""
def download_zip():
    s3c = boto3.client('s3')
    indata = requests.get(download_http)
    n=0
    with zipfile.ZipFile(io.BytesIO(indata.content)) as z:       
        zList=z.namelist()
        print(zList)
        for i in zList: 
            print(i) 
            zfiledata = BytesIO(z.read(i))
            n += 1
            s3c.put_object(Bucket=s3_bucket_name, Key=s3_key+i+'/'+i, Body=zfiledata)
def clean_up_csv_fn(**kwargs):    
    ti = kwargs['task_instance']
    queryId = ti.xcom_pull(key='return_value', task_ids='join_athena_tables' )
    print(queryId)
    athenaKey=athena_results+"join_athena_tables/"+queryId+".csv"
    print(athenaKey)
    cleanKey=athena_results+"join_athena_tables/"+queryId+"_clean.csv"
    s3c = boto3.client('s3')
    obj = s3c.get_object(Bucket=s3_bucket_name, Key=athenaKey)
    infileStr=obj['Body'].read().decode('utf-8')
    outfileStr=infileStr.replace('"e"', '') 
    outfile = StringIO(outfileStr)
    s3c.put_object(Bucket=s3_bucket_name, Key=cleanKey, Body=outfile.getvalue())
def s3_to_redshift(**kwargs):    
    ti = kwargs['task_instance']
    queryId = ti.xcom_pull(key='return_value', task_ids='join_athena_tables' )
    print(queryId)
    athenaKey='s3://'+s3_bucket_name+"/"+athena_results+"join_athena_tables/"+queryId+"_clean.csv"
    print(athenaKey)
    sqlQuery="copy "+redshift_table_name+" from '"+athenaKey+"' iam_role 'arn:aws:iam::163919838948:role/myRedshiftRole' CSV IGNOREHEADER 1;"
    print(sqlQuery)
    rsd = boto3.client('redshift-data')
    resp = rsd.execute_statement(
        ClusterIdentifier=redshift_cluster,
        Database=redshift_db,
        DbUser=redshift_dbuser,
        Sql=sqlQuery
    )
    print(resp)
    return "OK"
def create_redshift_table():
    rsd = boto3.client('redshift-data')
    resp = rsd.execute_statement(
        ClusterIdentifier=redshift_cluster,
        Database=redshift_db,
        DbUser=redshift_dbuser,
        Sql="CREATE TABLE IF NOT EXISTS "+redshift_table_name+" (title	character varying, rating	int);"
    )
    print(resp)
    return "OK"
DEFAULT_ARGS = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False 
}
with DAG(
    dag_id='movie-list-dag',
    default_args=DEFAULT_ARGS,
    dagrun_timeout=timedelta(hours=2),
    start_date=days_ago(2),
    schedule_interval='*/10 * * * *',
    tags=['athena','redshift'],
) as dag:
    check_s3_for_key = S3KeySensor(
        task_id='check_s3_for_key',
        bucket_key=s3_key,
        wildcard_match=True,
        bucket_name=s3_bucket_name,
        s3_conn_id='aws_default',
        timeout=20,
        poke_interval=5,
        dag=dag
    )
    files_to_s3 = PythonOperator(
        task_id="files_to_s3",
        python_callable=download_zip
    )
    create_athena_movie_table = AWSAthenaOperator(task_id="create_athena_movie_table",query=create_athena_movie_table_query, database=athena_db, output_location='s3://'+s3_bucket_name+"/"+athena_results+'create_athena_movie_table')
    create_athena_ratings_table = AWSAthenaOperator(task_id="create_athena_ratings_table",query=create_athena_ratings_table_query, database=athena_db, output_location='s3://'+s3_bucket_name+"/"+athena_results+'create_athena_ratings_table')
    create_athena_tags_table = AWSAthenaOperator(task_id="create_athena_tags_table",query=create_athena_tags_table_query, database=athena_db, output_location='s3://'+s3_bucket_name+"/"+athena_results+'create_athena_tags_table')
    join_athena_tables = AWSAthenaOperator(task_id="join_athena_tables",query=join_tables_athena_query, database=athena_db, output_location='s3://'+s3_bucket_name+"/"+athena_results+'join_athena_tables')
    create_redshift_table_if_not_exists = PythonOperator(
        task_id="create_redshift_table_if_not_exists",
        python_callable=create_redshift_table
    )
    clean_up_csv = PythonOperator(
        task_id="clean_up_csv",
        python_callable=clean_up_csv_fn,
        provide_context=True     
    )
    transfer_to_redshift = PythonOperator(
        task_id="transfer_to_redshift",
        python_callable=s3_to_redshift,
        provide_context=True     
    )
    check_s3_for_key >> files_to_s3 >> create_athena_movie_table >> join_athena_tables >> clean_up_csv >> transfer_to_redshift
    files_to_s3 >> create_athena_ratings_table >> join_athena_tables
    files_to_s3 >> create_athena_tags_table >> join_athena_tables
    files_to_s3 >> create_redshift_table_if_not_exists >> transfer_to_redshift

In the code, different tasks are created using operators like PythonOperator, for generic Python code, or AWSAthenaOperator, to use the integration with Amazon Athena. To see how those tasks are connected in the workflow, look at the last few lines, which I repeat here (without indentation) for simplicity:

check_s3_for_key >> files_to_s3 >> create_athena_movie_table >> join_athena_tables >> clean_up_csv >> transfer_to_redshift
files_to_s3 >> create_athena_ratings_table >> join_athena_tables
files_to_s3 >> create_athena_tags_table >> join_athena_tables
files_to_s3 >> create_redshift_table_if_not_exists >> transfer_to_redshift

The Airflow code overloads the Python right shift operator (>>) to create a dependency: the task on the left runs first, and the task on the right runs only after it completes. Data is not handed over automatically; when a task needs a result from an upstream task, it reads it explicitly, as clean_up_csv_fn does with xcom_pull. Looking at the code, this is quite easy to read. Each of the four lines above adds dependencies, and they are all evaluated together to execute the tasks in the right order.
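
To illustrate the idea, the following Python sketch shows how overloading >> can record dependencies and how a valid run order follows from them. It is not Airflow's implementation: the `Task` class is a hypothetical stand-in that only tracks upstream task names, and the ordering uses `graphlib.TopologicalSorter` from the standard library (Python 3.9 or later).

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+


class Task:
    """Hypothetical stand-in for an Airflow task; it only tracks dependencies."""

    def __init__(self, task_id: str) -> None:
        self.task_id = task_id
        self.upstream: set = set()

    def __rshift__(self, other: "Task") -> "Task":
        # `a >> b` records that b depends on a, then returns b so that
        # chains such as `a >> b >> c` work from left to right.
        other.upstream.add(self.task_id)
        return other


names = [
    "check_s3_for_key", "files_to_s3", "create_athena_movie_table",
    "create_athena_ratings_table", "create_athena_tags_table",
    "join_athena_tables", "clean_up_csv",
    "create_redshift_table_if_not_exists", "transfer_to_redshift",
]
t = {name: Task(name) for name in names}

# The same four dependency lines as in the DAG above.
t["check_s3_for_key"] >> t["files_to_s3"] >> t["create_athena_movie_table"] >> t["join_athena_tables"] >> t["clean_up_csv"] >> t["transfer_to_redshift"]
t["files_to_s3"] >> t["create_athena_ratings_table"] >> t["join_athena_tables"]
t["files_to_s3"] >> t["create_athena_tags_table"] >> t["join_athena_tables"]
t["files_to_s3"] >> t["create_redshift_table_if_not_exists"] >> t["transfer_to_redshift"]

# Map each task to the tasks it waits for, then compute one valid run order.
graph = {name: task.upstream for name, task in t.items()}
order = list(TopologicalSorter(graph).static_order())
print(order)
```

Several orders are valid for the tasks in the middle of the graph, but check_s3_for_key always comes first and transfer_to_redshift always comes last.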

In the Airflow console, I can see a graph view of the DAG to have a clear representation of how tasks are executed:

Available Now
Amazon Managed Workflows for Apache Airflow (MWAA) is available today in US East (Northern Virginia), US West (Oregon), US East (Ohio), Asia Pacific (Singapore), Asia Pacific (Tokyo), Asia Pacific (Sydney), Europe (Ireland), Europe (Frankfurt), and Europe (Stockholm). You can launch a new Amazon MWAA environment from the console, AWS Command Line Interface (CLI), or AWS SDKs. Then, you can develop workflows in Python using Airflow’s ecosystem of integrations.

With Amazon MWAA, you pay based on the environment class and the workers you use. For more information, see the pricing page.

Upstream compatibility is a core tenet of Amazon MWAA. Our code changes to the Airflow platform are released back to open source.

With Amazon MWAA you can spend more time building workflows for your engineering and data science tasks, and less time managing and scaling the infrastructure of your Airflow platform.

Learn more about Amazon MWAA and get started today!

Danilo

New – Archive and Replay Events with Amazon EventBridge

Post Syndicated from Danilo Poccia original https://aws.amazon.com/blogs/aws/new-archive-and-replay-events-with-amazon-eventbridge/

Event-driven architectures use events to share information between the components of one or more applications. Events tell us that “something has happened”, maybe you received an API request, a file has been uploaded to a storage platform, or a database record has been updated. Business events describe something related to your activities, for example that […]

Building Serverless Land: Part 1 – Automating content aggregation

Post Syndicated from Benjamin Smith original https://aws.amazon.com/blogs/compute/building-serverless-land-part-1-automating-content-aggregation/

In this two-part blog series, I show how serverlessland.com is built. This is a static website that brings together all the latest blogs, videos, and training for AWS Serverless. It automatically aggregates content from a number of sources. The content exists in static JSON files, which generate a new site build each time they are updated. The result is a low-maintenance, low-latency serverless website, with almost limitless scalability.

This blog post explains how to automate the aggregation of content from multiple RSS feeds into a JSON file stored in GitHub. This workflow uses AWS Lambda and AWS Step Functions, triggered by Amazon EventBridge. The application can be downloaded and deployed from this GitHub repository.

The growing adoption of serverless technologies generates increasing amounts of helpful and insightful content from the developer community. This content can be difficult to discover. Serverless Land helps channel this into a single searchable location. By automating the collection of this content with scheduled serverless workflows, the process robustly scales to near infinite numbers. The Step Functions MAP state allows for dynamic parallel processing of multiple content sources, without the need to alter code. On-boarding a new content source is as fast and simple as making a single CLI command.

The architecture

Automating content aggregation with AWS Step Functions

The application consists of six Lambda functions orchestrated by a Step Functions workflow:

  1. The workflow is triggered every 2 hours by an EventBridge scheduler. The schedule event passes an RSS feed URL to the workflow.
  2. The first task invokes a Lambda function that runs an HTTP GET request to the RSS feed. It returns an array of recent blog URLs. The array of blog URLs is provided as the input to a MAP state. The MAP state type makes it possible to run a set of steps for each element of an input array in parallel. The number of items in the array can be different for each execution. This is referred to as dynamic parallelism.
  3. The next task invokes a Lambda function that uses the GitHub REST API to retrieve the static website’s JSON content file.
  4. The first Lambda function in the MAP state runs an HTTP GET request to the blog post URL provided in the payload. The URL is scraped for content and an object containing detailed metadata about the blog post is returned in the response.
  5. The blog post metadata is compared against the website’s JSON content file in GitHub.
  6. A CHOICE state determines if the blog post metadata has already been committed to the repository.
  7. If the blog post is new, it is added to an array of “content to commit”.
  8. As the workflow exits the MAP state, the results are passed to the final Lambda function. This uses a single git commit to add each blog post object to the website’s JSON content file in GitHub. This triggers an event that rebuilds the static site.
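
The control flow of these steps can be approximated in plain Python. The following sketch is not the Step Functions definition or the Lambda code from the repository: every function name and data shape is an assumption, the network calls are replaced with stubs, and a thread pool stands in for the MAP state.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Optional

# Stubbed data standing in for the RSS feed and the JSON content file in GitHub.
FEED = {"https://example.com/feed": ["https://example.com/a", "https://example.com/b"]}
EXISTING_CONTENT = [{"link": "https://example.com/a", "title": "Post a"}]


def get_recent_urls(feed_url: str) -> list:
    """Stand-in for the Lambda function that reads the RSS feed."""
    return FEED[feed_url]


def scrape_metadata(url: str) -> dict:
    """Stand-in for the Lambda function that scrapes one blog post."""
    return {"link": url, "title": "Post " + url.rsplit("/", 1)[-1]}


def process_url(url: str, existing_links: set) -> Optional[dict]:
    """One MAP iteration: scrape, compare, and keep the post only if it is new."""
    metadata = scrape_metadata(url)
    # CHOICE state: has this post already been committed?
    return None if metadata["link"] in existing_links else metadata


def run_workflow(feed_url: str) -> list:
    urls = get_recent_urls(feed_url)
    existing_links = {item["link"] for item in EXISTING_CONTENT}
    # The pool handles however many URLs the feed returned, in parallel.
    with ThreadPoolExecutor() as pool:
        results = list(pool.map(lambda u: process_url(u, existing_links), urls))
    # "Content to commit": only the new posts, to be written in a single commit.
    return [r for r in results if r is not None]


print(run_workflow("https://example.com/feed"))
```

Because the number of iterations depends only on the length of the input array, adding a feed with more posts needs no code change, which is the property the MAP state provides.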

Using Secrets in AWS Lambda

Two of the Lambda functions require a GitHub personal access token to commit files to a repository. Sensitive credentials or secrets such as this should be stored separately from the function code. Use AWS Systems Manager Parameter Store to store the personal access token as an encrypted string. The AWS Serverless Application Model (AWS SAM) template grants each Lambda function permission to access and decrypt the string in order to use it.

  1. Follow these steps to create a personal access token that grants permission to update files to repositories in your GitHub account.
  2. Use the AWS Command Line Interface (AWS CLI) to create a new parameter named GitHubAPIKey:
aws ssm put-parameter \
--name /GitHubAPIKey \
--value ReplaceThisWithYourGitHubAPIKey \
--type SecureString

The command returns the new parameter's version and tier:

{
    "Version": 1,
    "Tier": "Standard"
}

Deploying the application

  1. Fork this GitHub repository to your GitHub Account.
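  2. Clone the forked repository to your local machine and deploy the application using AWS SAM.
  3. In a terminal, enter the following, replacing aws-samples with your GitHub user name to clone your fork:
    git clone https://github.com/aws-samples/content-aggregator-example
    cd content-aggregator-example
    sam deploy -g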
  4. Enter the required parameters when prompted.

This deploys the application defined in the AWS SAM template file (template.yaml).

The business logic

Each Lambda function is written in Node.js and is stored inside a directory that contains the package dependencies in a `node_modules` folder. These are defined for each function by its own package.json file. The function dependencies are bundled and deployed using the sam build && sam deploy -g command.

The GetRepoContents and WriteToGitHub Lambda functions use the octokit/rest.js library to communicate with GitHub. The library authenticates to GitHub using the GitHub API key held in Parameter Store. The AWS SDK for Node.js obtains the API key from Parameter Store: a single getParameter call retrieves and decrypts the parameter value, which is then used to authenticate to GitHub.

const AWS = require('aws-sdk');
const { Octokit } = require('@octokit/rest');
const SSM = new AWS.SSM();

// Inside the async Lambda handler:
// get the GitHub API key from Parameter Store and authenticate
const singleParam = { Name: '/GitHubAPIKey', WithDecryption: true };
const GITHUB_ACCESS_TOKEN = await SSM.getParameter(singleParam).promise();
const octokit = new Octokit({
  auth: GITHUB_ACCESS_TOKEN.Parameter.Value,
});

Lambda environment variables store non-sensitive key-value data such as the repository name and JSON file location. These can be entered when deploying with the AWS SAM guided deploy command.

Environment:
  Variables:
    GitHubRepo: !Ref GitHubRepo
    JSONFile: !Ref JSONFile
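
Inside a function, these values are available on `process.env`. The following is a small sketch of reading them; the variable names come from the template snippet, while the `readRepoSettings` helper and its validation are assumptions added for illustration.

```javascript
// Reads the non-sensitive settings that the template passes in as
// environment variables. Fails fast if either one is missing, so a
// misconfigured deployment surfaces on the first invocation.
function readRepoSettings(env = process.env) {
  const { GitHubRepo, JSONFile } = env;
  if (!GitHubRepo || !JSONFile) {
    throw new Error('GitHubRepo and JSONFile environment variables must be set');
  }
  return { repo: GitHubRepo, jsonFile: JSONFile };
}
```

Passing the environment as an argument keeps the helper easy to exercise locally, for example `readRepoSettings({ GitHubRepo: 'my-site', JSONFile: 'content/blogs.json' })`, where both values are placeholders.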

The GetRepoContents function uses the GitHub REST API to retrieve the contents of the website’s JSON file. The response SHA and file contents are returned from the Lambda function and act as the input to the next task in the Step Functions workflow. This SHA is used in the final step of the workflow to save all new blog posts in a single commit.
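
The round trip of file contents and SHA can be sketched as two pure functions. This is an illustration under stated assumptions rather than the application's code: it assumes the JSON file holds an array of post objects, and the helper names and the output field `RepoBlogs` are hypothetical. It relies on two documented behaviors of the GitHub contents API: file bodies are returned base64-encoded alongside a blob SHA, and an update must supply the SHA of the file version being replaced.

```javascript
// Shapes a GitHub "get repository content" response into a task output.
// `response.data` carries the base64-encoded file body and its blob SHA.
function toTaskOutput(response) {
  const { sha, content } = response.data;
  const json = Buffer.from(content, 'base64').toString('utf8');
  return { sha, RepoBlogs: JSON.parse(json) };
}

// Builds the parameters for one commit that replaces the file with the
// existing posts plus the new ones. The field names follow GitHub's
// "create or update file contents" endpoint.
function toCommitParams({ owner, repo, path, sha, repoBlogs, newPosts }) {
  const updated = [...repoBlogs, ...newPosts];
  return {
    owner,
    repo,
    path,
    sha, // identifies the file version this commit replaces
    message: `Add ${newPosts.length} new blog post(s)`,
    content: Buffer.from(JSON.stringify(updated, null, 2)).toString('base64'),
  };
}
```

Carrying the SHA through the workflow lets the final function write every new post in one commit without fetching the file a second time.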

Map state iterations

The MAP state runs concurrently for each element in the input array (each blog post URL).

Each iteration must compare a blog post URL to the existing JSON content file and decide whether to ignore the post. To do this, the MAP state requires both the input array of blog post URLs and the existing JSON file contents. The ItemsPath, ResultPath, and Parameters are used to achieve this:

  • The ItemsPath sets the input array path to $.RSSBlogs.body.
  • The ResultPath states that the output of the branches is placed in $.mapResults.
  • The Parameters block replaces the input to the iterations with a JSON node. This contains both the current item data from the context object ($$.Map.Item.Value) and the contents of the GitHub JSON file ($.RepoBlogs).

"Type": "Map",
"InputPath": "$",
"ItemsPath": "$.RSSBlogs.body",
"ResultPath": "$.mapResults",
"Parameters": {
    "BlogUrl.$": "$$.Map.Item.Value",
    "RepoBlogs.$": "$.RepoBlogs"
},
"MaxConcurrency": 0,
"Iterator": {
    "StartAt": "getMeta",
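
The effect of these three fields on the data can be modeled in a few lines of Node.js. The sketch below hard-codes the paths from the definition above and is not a JSONPath implementation; the function names are hypothetical.

```javascript
// Models how the MAP state builds the input for each iteration.
function buildIterationInputs(stateInput) {
  const items = stateInput.RSSBlogs.body; // ItemsPath: $.RSSBlogs.body
  return items.map((item) => ({
    BlogUrl: item, // "BlogUrl.$": "$$.Map.Item.Value"
    RepoBlogs: stateInput.RepoBlogs, // "RepoBlogs.$": "$.RepoBlogs"
  }));
}

// Models ResultPath: the original state input is preserved and the array
// of iteration outputs is added under the mapResults key.
function applyResultPath(stateInput, iterationOutputs) {
  return { ...stateInput, mapResults: iterationOutputs };
}
```

Each iteration therefore receives one blog URL together with the full repository contents, and the state that follows the MAP state sees both the original input and the collected results.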

The Step Functions resource

The AWS SAM template uses the following Step Functions resource definition to create a Step Functions state machine:

  MyStateMachine:
    Type: AWS::Serverless::StateMachine
    Properties:
      DefinitionUri: statemachine/my_state_machine.asl.JSON
      DefinitionSubstitutions:
        GetBlogPostArn: !GetAtt GetBlogPost.Arn
        GetUrlsArn: !GetAtt GetUrls.Arn
        WriteToGitHubArn: !GetAtt WriteToGitHub.Arn
        CompareAgainstRepoArn: !GetAtt CompareAgainstRepo.Arn
        GetRepoContentsArn: !GetAtt GetRepoContents.Arn
        AddToListArn: !GetAtt AddToList.Arn
      Role: !GetAtt StateMachineRole.Arn

The workflow definition is stored in a separate file (statemachine/my_state_machine.asl.JSON). The DefinitionSubstitutions property specifies mappings for placeholder variables. This enables the template to inject Lambda function ARNs obtained by the GetAtt intrinsic function during template translation:

Step Functions mappings with placeholder variables
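
In the definition file, a placeholder is written as `${GetUrlsArn}` and is replaced with the mapped value at deployment. The idea can be illustrated with a small stand-alone function. This is a conceptual sketch only: `substitute` is a hypothetical helper, not how AWS SAM performs the substitution, and the ARN in the test data is made up.

```javascript
// Replaces ${Name} placeholders in a definition string with mapped values.
// Placeholders without a mapping are left untouched so they are easy to spot.
function substitute(definition, substitutions) {
  return definition.replace(/\$\{(\w+)\}/g, (match, name) =>
    Object.prototype.hasOwnProperty.call(substitutions, name)
      ? substitutions[name]
      : match
  );
}
```

Keeping ARNs out of the definition file means the same workflow definition can be deployed to any account or Region without edits.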

A state machine execution role is defined within the AWS SAM template. It grants the lambda:InvokeFunction action, tightly scoped to the six Lambda functions used in the workflow. This is the minimum set of permissions that Step Functions requires to carry out its task. Additional permissions can be granted as necessary, which follows the zero-trust security model.

Action: lambda:InvokeFunction
Resource:
- !GetAtt GetBlogPost.Arn
- !GetAtt GetUrls.Arn
- !GetAtt CompareAgainstRepo.Arn
- !GetAtt WriteToGitHub.Arn
- !GetAtt AddToList.Arn
- !GetAtt GetRepoContents.Arn

The Step Functions workflow definition is authored using the AWS Toolkit for Visual Studio Code. The toolkit's Step Functions support lets developers quickly generate workflow definitions from selectable examples. The render tool and automatic linting can help you debug and understand the workflow during development. Read more about the toolkit in this launch post.

Scheduling events and adding new feeds

The AWS SAM template creates a new EventBridge rule on the default event bus. This rule is scheduled to invoke the Step Functions workflow every 2 hours. A valid JSON string containing an RSS feed URL is sent as the input payload. The feed URL is obtained from a template parameter and can be set on deployment. The AWS Compute Blog is set as the default feed URL. To aggregate additional blog feeds, create a new rule to invoke the Step Functions workflow. Provide the RSS feed URL as a valid JSON input string in the following format:

{"feedUrl":"replace-this-with-your-rss-url"}

ScheduledEventRule:
    Type: "AWS::Events::Rule"
    Properties:
      Description: "Scheduled event to trigger Step Functions state machine"
      ScheduleExpression: rate(2 hours)
      State: "ENABLED"
      Targets:
        -
          Arn: !Ref MyStateMachine
          Id: !GetAtt MyStateMachine.Name
          RoleArn: !GetAtt ScheduledEventIAMRole.Arn
          Input: !Sub
            - >
              {
                "feedUrl" : "${RssFeedUrl}"
              }
            - RssFeedUrl: !Ref RSSFeed
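
When adding rules for further feeds, generating the input string programmatically avoids quoting mistakes. The following is a minimal sketch; `buildFeedInput` is a hypothetical helper, and the restriction to http and https URLs is an assumption rather than a requirement stated by the application.

```javascript
// Builds the JSON input string for a rule that targets the workflow.
function buildFeedInput(feedUrl) {
  const parsed = new URL(feedUrl); // throws a TypeError if the URL is invalid
  if (parsed.protocol !== 'http:' && parsed.protocol !== 'https:') {
    throw new Error(`Unsupported protocol: ${parsed.protocol}`);
  }
  // JSON.stringify produces straight double quotes and escapes as needed.
  return JSON.stringify({ feedUrl });
}
```

The returned string can be pasted into the rule's constant JSON input or passed as the Input value of a target.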

A completed workflow with step output

Conclusion

This blog post shows how to automate the aggregation of content from multiple RSS feeds into a single JSON file using serverless workflows.

The Step Functions MAP state allows for dynamic parallel processing of each item. The recent increase in the state payload size limit means that the contents of the static JSON file can be held within the workflow context. The application decision logic is separated from the business logic and events.

Lambda functions are scoped to finite business logic with Step Functions states managing decision logic and iterations. EventBridge is used to manage the inbound business events. The zero-trust security model is followed with minimum permissions granted to each service and Parameter Store used to hold encrypted secrets.

This application is used to pull together articles for http://serverlessland.com. Serverless Land brings together all the latest blogs, videos, and training for AWS Serverless. Download the code from this GitHub repository to start building your own automated content aggregation platform.