Tag Archives: Amazon Simple Notification Service (SNS)

ICYMI: Serverless pre:Invent 2019

Post Syndicated from Eric Johnson original https://aws.amazon.com/blogs/compute/icymi-serverless-preinvent-2019/

With Contributions from Chris Munns – Sr Manager – Developer Advocacy – AWS Serverless

The last two weeks have been a frenzy of AWS service and feature launches, building up to AWS re:Invent 2019. Because so much has been announced, we thought we’d ship an ICYMI post summarizing the serverless-specific features. We’ve also included some announcements from services that are commonly used in serverless application architectures or development.

AWS re:Invent 2019

We also want you to know that we’ll be talking about many of these features (as well as those coming) in sessions at re:Invent.

Here’s what’s new!

AWS Lambda

On September 3, AWS Lambda started rolling out a major improvement to how AWS Lambda functions work with your Amazon VPC networks. This change brings both scale and performance improvements, and addresses several of the limitations of the previous networking model with VPCs.

On November 25, Lambda announced that the rollout of this new capability has completed in 6 additional regions, including US East (N. Virginia) and US West (Oregon).

New VPC to VPC NAT for Lambda functions

On November 18, Lambda announced three new runtime updates. Lambda now supports Node.js 12, Java 11, and Python 3.8. Each of these new runtimes has new language features and benefits so be sure to check out the linked release posts. These new runtimes are all based on an Amazon Linux 2 execution environment.

Lambda has released a number of controls for both stream and async based invocations:

  • For Lambda functions consuming events from Amazon Kinesis Data Streams or Amazon DynamoDB Streams, it’s now possible to limit the retry count, limit the age of records being retried, configure a failure destination, or split a batch to isolate a problem record. These capabilities will help you deal with potential “poison pill” records that would previously cause streams to pause in processing.
  • For asynchronous Lambda invocations, you can now set the maximum event age and retry attempts on the event. If either configured condition is met, the event can be routed to a dead letter queue (DLQ), Lambda destination, or it can be discarded.
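As a rough sketch, the stream controls from the first bullet could be expressed as the request parameters for an event source mapping. The field names follow the Lambda CreateEventSourceMapping API; the helper name `buildStreamMappingParams`, the function name, the ARNs, and all numeric values are placeholders chosen for illustration, not recommendations.

```javascript
// Hypothetical helper that assembles the parameters for a stream event source mapping.
// Field names follow the Lambda CreateEventSourceMapping API; every value is a placeholder.
function buildStreamMappingParams({ functionName, streamArn, failureQueueArn }) {
  return {
    FunctionName: functionName,
    EventSourceArn: streamArn,
    StartingPosition: 'LATEST',
    MaximumRetryAttempts: 2,          // give up on a failing batch after two retries
    MaximumRecordAgeInSeconds: 3600,  // records older than one hour are discarded
    BisectBatchOnFunctionError: true, // split a failing batch to isolate a bad record
    DestinationConfig: {
      OnFailure: { Destination: failureQueueArn } // receives details of discarded batches
    }
  };
}

const streamParams = buildStreamMappingParams({
  functionName: 'my-stream-consumer',
  streamArn: 'arn:aws:kinesis:us-east-1:123456789012:stream/my-stream',
  failureQueueArn: 'arn:aws:sqs:us-east-1:123456789012:my-failed-batches'
});
console.log(JSON.stringify(streamParams, null, 2));
```

An object like this would typically be handed to an SDK call that creates the mapping; the sketch only builds and prints it so that it runs without AWS credentials.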

In addition to the above controls, Lambda Destinations is a new feature that allows developers to designate an asynchronous target for Lambda function invocation results. You can set one destination for a success, and another for a failure. This unlocks useful patterns for distributed event-based applications and can reduce the code you would otherwise write to send function results to a destination manually.

Lambda Destinations

Lambda also now supports setting a Parallelization Factor, which allows you to set multiple Lambda invocations per shard for Amazon Kinesis Data Streams and Amazon DynamoDB Streams. This allows for faster processing without the need to increase your shard count, while still guaranteeing the order of records processed.

Lambda Parallelization Factor diagram

Lambda now supports Amazon SQS FIFO queues as an event source. FIFO queues guarantee the order of record processing, unlike standard queues, which are unordered. FIFO queues group messages via a MessageGroupId attribute, which allows parallel Lambda consumers to process a single FIFO queue. This enables high-throughput record processing by Lambda.
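To make the message group idea concrete, here is a minimal in-memory sketch. It is not how SQS is implemented; it only shows that splitting an ordered stream by `MessageGroupId` yields independent sub-streams that separate consumers could process in parallel while order inside each group is kept. The helper name and sample messages are made up for illustration.

```javascript
// Partition an ordered list of messages by MessageGroupId.
// Order is preserved inside each group; groups are independent of each other.
function groupByMessageGroupId(messages) {
  const groups = new Map();
  for (const message of messages) {
    if (!groups.has(message.MessageGroupId)) {
      groups.set(message.MessageGroupId, []);
    }
    groups.get(message.MessageGroupId).push(message.Body);
  }
  return groups;
}

// Hypothetical sample: events for two customers, interleaved in arrival order.
const fifoMessages = [
  { MessageGroupId: 'customer-a', Body: 'a1' },
  { MessageGroupId: 'customer-b', Body: 'b1' },
  { MessageGroupId: 'customer-a', Body: 'a2' },
  { MessageGroupId: 'customer-b', Body: 'b2' }
];

const fifoGroups = groupByMessageGroupId(fifoMessages);
for (const [groupId, bodies] of fifoGroups) {
  console.log(groupId, bodies.join(','));
}
```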

Lambda now supports Environment Variables in AWS China (Beijing) Region, operated by Sinnet and the AWS China (Ningxia) Region, operated by NWCD.

Lastly, you can now view percentile statistics for the duration metric of your Lambda functions. Percentile statistics tell you the relative standing of a value in a dataset, and are useful when applied to metrics that exhibit large variances. They can help you understand the distribution of a metric, spot outliers, and find hard-to-spot situations that create a poor customer experience for a subset of your users.
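The following sketch shows why a percentile can reveal what an average hides. It uses the simple nearest-rank definition of a percentile, which is an assumption made for illustration; CloudWatch may compute its percentile statistics differently. The duration samples are invented.

```javascript
// Nearest-rank percentile: the smallest sample such that at least
// p percent of all samples are less than or equal to it.
function percentile(values, p) {
  if (values.length === 0) {
    throw new Error('percentile needs at least one sample');
  }
  const sorted = [...values].sort((a, b) => a - b);
  const rank = Math.max(1, Math.ceil((p / 100) * sorted.length));
  return sorted[rank - 1];
}

// Hypothetical function durations in milliseconds, with one slow outlier.
const durationsMs = [12, 15, 11, 14, 13, 16, 12, 900, 14, 13];
const averageMs = durationsMs.reduce((sum, d) => sum + d, 0) / durationsMs.length;

console.log('average:', averageMs);
console.log('p50:', percentile(durationsMs, 50));
console.log('p99:', percentile(durationsMs, 99));
```

In this sample the median stays close to the typical duration while the p99 value surfaces the single slow invocation that the average blurs.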

AWS SAM CLI

AWS SAM CLI deploy command

The SAM CLI team simplified the bucket management and deployment process in the SAM CLI. You no longer need to manage a bucket for deployment artifacts – SAM CLI handles this for you. The deployment process has also been streamlined from multiple flagged commands to a single command, sam deploy.

AWS Step Functions

One of the powerful features of Step Functions is its ability to integrate directly with AWS services without you needing to write complicated application code. Step Functions has expanded its integration with Amazon SageMaker to simplify machine learning workflows, and added a new integration with Amazon EMR, making it faster to build and easier to monitor EMR big data processing workflows.

Step Functions step with EMR

Step Functions now provides the ability to track state transition usage by integrating with AWS Budgets, allowing you to monitor and react to usage and spending trends on your AWS accounts.

You can now view CloudWatch Metrics for Step Functions at a one-minute frequency. This makes it easier to set up detailed monitoring for your workflows. You can use one-minute metrics to set up CloudWatch Alarms based on your Step Functions API usage, Lambda functions, service integrations, and execution details.

AWS Step Functions now supports higher throughput workflows, making it easier to coordinate applications with high event rates.

In US East (N. Virginia), US West (Oregon), and EU (Ireland), throughput has increased from 1,000 state transitions per second to 1,500 state transitions per second with bucket capacity of 5,000 state transitions. The default start rate for state machine executions has also increased from 200 per second to 300 per second, with bucket capacity of up to 1,300 starts in these regions.

In all other regions, throughput has increased from 400 state transitions per second to 500 state transitions per second with bucket capacity of 800 state transitions. The default start rate for AWS Step Functions state machine executions has also increased from 25 per second to 150 per second, with bucket capacity of up to 800 state machine executions.
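One common way to read a rate paired with a bucket capacity is as a token bucket: tokens refill at the steady rate, the bucket caps how many can accumulate, and each state transition spends one token. Whether Step Functions throttles in exactly this way is an assumption here; the sketch below only illustrates the general model, using the US East (N. Virginia) figures quoted above.

```javascript
// Minimal token bucket with explicit time steps (no real clock involved).
class TokenBucket {
  constructor(refillPerSecond, capacity) {
    this.refillPerSecond = refillPerSecond;
    this.capacity = capacity;
    this.tokens = capacity; // assume the bucket starts full
  }

  // Advance simulated time, refilling tokens up to the capacity.
  advance(seconds) {
    this.tokens = Math.min(this.capacity, this.tokens + seconds * this.refillPerSecond);
  }

  // Try to spend `count` tokens; returns how many requests were accepted.
  take(count) {
    const accepted = Math.min(count, Math.floor(this.tokens));
    this.tokens -= accepted;
    return accepted;
  }
}

const transitions = new TokenBucket(1500, 5000);
const acceptedInBurst = transitions.take(6000);   // burst larger than the bucket
transitions.advance(1);                           // one second of refill
const acceptedAfterRefill = transitions.take(6000);

console.log(acceptedInBurst, acceptedAfterRefill);
```

Under this model, a burst can briefly exceed the steady rate by draining the bucket, after which throughput settles at the refill rate.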

Amazon SNS

Amazon SNS now supports the use of dead letter queues (DLQ) to help capture unhandled events. By enabling a DLQ, you can catch events that are not processed, then resubmit them or analyze them to locate processing issues.

Amazon CloudWatch

CloudWatch announced Amazon CloudWatch ServiceLens to provide a “single pane of glass” to observe health, performance, and availability of your application.

CloudWatch ServiceLens

CloudWatch also announced a preview of a capability called Synthetics. CloudWatch Synthetics allows you to test your application endpoints and URLs using configurable scripts that mimic what a real customer would do. This enables the outside-in view of your customers’ experiences, and your service’s availability from their point of view.

On November 18, CloudWatch launched Embedded Metric Format to help you ingest complex high-cardinality application data in the form of logs and easily generate actionable metrics from them. You can publish these metrics from your Lambda function by using the PutLogEvents API or, for Node.js or Python applications, by using an open source library.
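For orientation, a log entry in Embedded Metric Format is a JSON object with an `_aws` metadata block that names which top-level fields are metrics and which are dimensions. The sketch below builds such an object by hand. The helper name, namespace, dimension, and metric are hypothetical, and how the line actually reaches CloudWatch Logs (function log output, the PutLogEvents API, or the open source library) is outside the scope of this sketch.

```javascript
// Hypothetical helper: builds one Embedded Metric Format log entry
// carrying a single metric and a single dimension set.
function buildEmfLogEntry({ namespace, dimensions, metricName, unit, value, timestamp }) {
  return {
    _aws: {
      Timestamp: timestamp, // milliseconds since the Unix epoch
      CloudWatchMetrics: [
        {
          Namespace: namespace,
          Dimensions: [Object.keys(dimensions)], // one dimension set
          Metrics: [{ Name: metricName, Unit: unit }]
        }
      ]
    },
    ...dimensions,       // dimension values live at the top level
    [metricName]: value  // so does the metric value
  };
}

const emfEntry = buildEmfLogEntry({
  namespace: 'WildRydes/Demo',
  dimensions: { FunctionName: 'event-destinations' },
  metricName: 'ProcessingLatency',
  unit: 'Milliseconds',
  value: 87,
  timestamp: Date.now()
});
console.log(JSON.stringify(emfEntry));
```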

Lastly, CloudWatch announced a preview of Contributor Insights, a capability to identify who or what is impacting your system or application performance by identifying outliers or patterns in log data.

AWS X-Ray

X-Ray announced trace maps, which enable you to map the end to end path of a single request. Identifiers will show issues and how they affect other services in the request’s path. These can help you to identify and isolate service points that are causing degradation or failures.

X-Ray also announced support for Amazon CloudWatch Synthetics, currently in preview. X-Ray supports tracing canary scripts throughout the application providing metrics on performance or application issues.

X-Ray Service map with CloudWatch Synthetics

Amazon DynamoDB

DynamoDB announced support for customer managed customer master keys (CMKs) to encrypt data in DynamoDB. This allows you to bring your own key (BYOK), giving you full control over how you encrypt and manage the security of your DynamoDB data.

It is now possible to add global replicas to existing DynamoDB tables to provide enhanced availability across the globe.

Currently in preview is another new DynamoDB capability that identifies frequently accessed keys and database traffic trends. With it, you can more easily identify “hot keys” and understand usage of your DynamoDB tables.

CloudWatch Contributor Insights for DynamoDB

Last but far from least for DynamoDB is adaptive capacity, a feature that helps you handle imbalanced workloads by isolating frequently accessed items automatically and shifting data across partitions to rebalance them. This helps reduce cost by letting you provision throughput for a more balanced workload instead of over-provisioning for uneven data access patterns.

AWS Serverless Application Repository

The AWS Serverless Application Repository (SAR) now offers Verified Author badges. These badges enable consumers to quickly and reliably know who you are. The badge will appear next to your name in the SAR and will deep-link to your GitHub profile.

SAR Verified developer badges

AWS Code Services

AWS CodeCommit launched the ability for you to enforce rule workflows for pull requests, making it easier to ensure that code has passed through specific rule requirements. You can now create an approval rule specifically for a pull request, or create approval rule templates to be applied to all future pull requests in a repository.

AWS CodeBuild added beta support for test reporting. With test reporting, you can now view the detailed results, trends, and history for tests executed on CodeBuild for any framework that supports the JUnit XML or Cucumber JSON test format.

CodeBuild test trends

AWS Amplify and AWS AppSync

Instead of trying to summarize all the awesome things that our peers over in the Amplify and AppSync teams have done recently we’ll instead link you to their own recent summary: “A round up of the recent pre-re:Invent 2019 AWS Amplify Launches”.

AWS AppSync

Still looking for more?

We only covered a small bit of all the awesome new things that were recently announced. Keep your eyes peeled for more exciting announcements next week during re:Invent and for a future ICYMI Serverless Q4 roundup. We’ll also be kicking off a fresh series of Tech Talks in 2020 with new content helping to dive deeper on everything new coming out of AWS for serverless application developers.

Introducing AWS Lambda Destinations

Post Syndicated from Julian Wood original https://aws.amazon.com/blogs/compute/introducing-aws-lambda-destinations/

Today we’re announcing AWS Lambda Destinations for asynchronous invocations. This is a feature that provides visibility into Lambda function invocations and routes the execution results to AWS services, simplifying event-driven applications and reducing code complexity.

Asynchronous invocations

When a function is invoked asynchronously, Lambda sends the event to an internal queue. A separate process reads events from the queue and executes your Lambda function. When the event is added to the queue, Lambda previously only returned a 2xx status code to confirm that the queue has received this event. There was no additional information to confirm whether the event had been processed successfully.

A common event-driven microservices architectural pattern is to use a queue or message bus for communication. This helps with resilience and scalability. Lambda asynchronous invocations can put an event or message on Amazon Simple Notification Service (SNS), Amazon Simple Queue Service (SQS), or Amazon EventBridge for further processing. Previously, you needed to write the SQS/SNS/EventBridge handling code within your Lambda function and manage retries and failures yourself.

With Destinations, you can route asynchronous function results as an execution record to a destination resource without writing additional code. An execution record contains details about the request and response in JSON format including version, timestamp, request context, request payload, response context, and response payload. For each execution status such as Success or Failure you can choose one of four destinations: another Lambda function, SNS, SQS, or EventBridge. Lambda can also be configured to route different execution results to different destinations.
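A destination that receives such a record could reduce it to the handful of fields it cares about. The sketch below assumes the field names used in the sample records shown later in this post (`requestContext`, `responseContext`, `responsePayload`); the function names and the sample record are hypothetical.

```javascript
// Reduce a Destinations execution record to a short summary.
function summarizeExecutionRecord(record) {
  const { requestContext, responseContext, responsePayload } = record;
  return {
    requestId: requestContext.requestId,
    condition: requestContext.condition,
    attempts: requestContext.approximateInvokeCount,
    failed: Boolean(responseContext && responseContext.functionError),
    errorMessage:
      responsePayload && responsePayload.errorMessage ? responsePayload.errorMessage : null
  };
}

// A destination Lambda function could wrap the summary like this
// (it would be exported as the function's handler).
const destinationHandler = async (record) => {
  const summary = summarizeExecutionRecord(record);
  console.log('Invocation result:', JSON.stringify(summary));
  return summary;
};

// Hypothetical failure record for a local dry run.
const sampleFailureRecord = {
  version: '1.0',
  requestContext: { requestId: 'example-id', condition: 'RetriesExhausted', approximateInvokeCount: 3 },
  requestPayload: { Success: false },
  responseContext: { statusCode: 200, executedVersion: '$LATEST', functionError: 'Handled' },
  responsePayload: { errorMessage: 'I am failing!', errorType: 'Error' }
};
const sampleSummary = summarizeExecutionRecord(sampleFailureRecord);
console.log(sampleSummary);
```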

Asynchronous Function Execution Result

Success

When a function is invoked successfully, Lambda routes the record to the destination resource for every successful invocation. You can use this to monitor the health of your serverless applications via execution status or build workflows based on the invocation result.

You no longer need to chain long-running Lambda functions together synchronously. Previously you needed to complete the entire workflow within the Lambda 15-minute function timeout, pay for idle time, and wait for a response. Destinations allows you to return a Success response to the calling function and then handle the remaining chaining functions asynchronously.

Failure

Alongside today’s announcement of Maximum Event Age and Maximum Retry Attempt for asynchronous invocations, Destinations gives you the ability to handle the Failure of function invocations along with their Success. When a function invocation fails, such as when retries are exhausted or the event age has been exceeded (hitting its TTL), Destinations routes the record to the destination resource for every failed invocation for further investigation or processing.

Dead Letter Queues (DLQ) have been available since 2016 and are a great way to handle asynchronous failure situations. Destinations provide more useful capabilities by passing additional function execution information, including code exception stack traces, to more destination services.

Destinations and DLQs can be used together at the same time, although Destinations should be considered the preferred solution. If you already have DLQs set up, existing functionality does not change and Destinations does not replace existing DLQ configurations. If both Destinations and DLQ are used for Failure notifications, function invoke errors are sent to both DLQ and Destinations targets.

How to configure Destinations

Adding Destinations is a straightforward process. This walkthrough uses the AWS Management Console but you can also use the AWS CLI, AWS SAM, AWS CloudFormation, or language-specific SDKs for Lambda.

  1. Open the Lambda console Functions page. Choose an existing Lambda function, or create a new one. In this example, I create a new Lambda function. Choose Create Function.
  2. Enter a Function name, select Node.js 12.x for Runtime, and choose or create an execution role. Ensure that your Lambda function execution role includes access to the destination resource.
  3. Choose Create function.
  4. Within the Function code pane, paste the following Lambda function code. The code generates a function execution result of either Success or Failure depending on a JSON input ("Success": true or "Success": false).
    // Lambda Destinations tester: Success completes with an empty (null) result, Failure returns an error
    
    exports.handler = function(event, context, callback) {
        var event_received_at = new Date().toISOString();
        console.log('Event received at: ' + event_received_at);
        console.log('Received event:', JSON.stringify(event, null, 2));
    
        if (event.Success) {
            console.log("Success");
            context.callbackWaitsForEmptyEventLoop = false;
            callback(null);
        } else {
            console.log("Failure");
            context.callbackWaitsForEmptyEventLoop = false;
            callback(new Error("Failure from event, Success = false, I am failing!"), 'Destination Function Error Thrown');
        }
    };
    
  5. Choose Save.
  6. To configure Destinations, within the Designer pane, choose Add destination.
  7. Select the Source as Asynchronous invocation. Select the Condition as On failure or On success, depending on your use case. In this example, I select On Success.
  8. Enter the Amazon Resource Name (ARN) for the Destination SQS queue, SNS topic, Lambda function, or EventBridge event bus. In this example, I use the ARN of an SNS topic I have already configured.
  9. Choose Save. The Destination is added to SNS for On Success.
  10. Add another Destination for Failure to Lambda. Within the Designer pane, choose Add destination.
  11. Select the Source as Asynchronous invocation and the Condition as On failure, enter a Destination Lambda function ARN, then choose Save.
  12. The Destination is added to Lambda for On Failure.

Success testing

To test invoking the asynchronous Lambda function to generate a Success result, use the AWS CLI:

aws lambda invoke --function-name event-destinations --invocation-type Event --payload '{ "Success": true }' response.json

The Lambda function is invoked successfully with a response "StatusCode": 202.

And an SNS notification email is received, showing the invocation details with "condition":"Success" and the requestPayload.

{
	"version": "1.0",
	"timestamp": "2019-11-24T23:08:25.651Z",
	"requestContext": {
		"requestId": "c2a6f2ae-7dbb-4d22-8782-d0485c9877e2",
		"functionArn": "arn:aws:lambda:sa-east-1:123456789123:function:event-destinations:$LATEST",
		"condition": "Success",
		"approximateInvokeCount": 1
	},
	"requestPayload": {
		"Success": true
	},
	"responseContext": {
		"statusCode": 200,
		"executedVersion": "$LATEST"
	},
	"responsePayload": null
}

Failure testing

The Lambda function can be set to Failure by throwing an exception within the code. To test invoking the asynchronous Lambda function to generate a Failure result, use the AWS CLI:

aws lambda invoke --function-name event-destinations --invocation-type Event --payload '{ "Success": false }' response.json

The Lambda function is invoked, and the CLI reports that the event was successfully added to the Lambda processing queue. If Lambda is not able to add the event to the queue, the error message appears in the command output.

However, due to the exception error within the code, the function invocation will fail. Destinations then routes the invoke failure to the configured destination Lambda function. You can see the failed function invocation information in the Amazon CloudWatch Logs for the Destination function including "condition": "RetriesExhausted", along with the requestPayload, errorMessage, and stackTrace.

2019-11-24T21:52:47.855Z	d123456-c0dd-4871-a123-a356cb1b3ba6	EVENT
{
    "version": "1.0",
    "timestamp": "2019-11-24T21:52:47.333Z",
    "requestContext": {
        "requestId": "8ea123e4-1db7-4aca-ad10-d9ca1234c1fd",
        "functionArn": "arn:aws:lambda:sa-east-1:123456678912:function:event-destinations:$LATEST",
        "condition": "RetriesExhausted",
        "approximateInvokeCount": 3
    },
    "requestPayload": {
        "Success": false
    },
    "responseContext": {
        "statusCode": 200,
        "executedVersion": "$LATEST",
        "functionError": "Handled"
    },
    "responsePayload": {
        "errorMessage": "Failure from event, Success = false, I am failing!",
        "errorType": "Error",
        "stackTrace": [ "exports.handler (/var/task/index.js:18:18)" ]
    }
}

Destination-specific JSON format

  • For SNS/SQS, the JSON object is passed as the Message to the destination.
  • For Lambda, the JSON is passed as the payload to the function. The destination function cannot be the same as the source function. For example, if LambdaA has a Destination configuration attached for Success, LambdaA is not a valid destination ARN. This prevents recursive functions.
  • For EventBridge, the JSON is passed as the Detail in the PutEvents call. The source is lambda, and detail type is either Lambda Function Invocation Result - Success or Lambda Function Invocation Result - Failure. The resource fields contain the function and destination ARNs.
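The differences listed above matter mostly to whatever consumes the record. As a hedged sketch, a downstream Lambda function could normalize the four cases as follows. It assumes the consumer is itself a Lambda function triggered by the SNS topic, the SQS queue, or an EventBridge rule, so the record arrives wrapped in that service's usual event envelope. The helper name is hypothetical.

```javascript
// Return the Destinations execution record(s) contained in an incoming event,
// whichever service delivered it.
function extractExecutionRecords(event) {
  if (Array.isArray(event.Records)) {
    return event.Records.map((record) => {
      if (record.Sns) {
        return JSON.parse(record.Sns.Message); // delivered through an SNS topic
      }
      if (typeof record.body === 'string') {
        return JSON.parse(record.body); // delivered through an SQS queue
      }
      throw new Error('Unrecognized record shape');
    });
  }
  if (event['detail-type'] && event.detail) {
    return [event.detail]; // delivered through EventBridge
  }
  return [event]; // passed directly to a destination Lambda function
}

// Local dry run with a minimal, made-up execution record.
const demoRecord = { version: '1.0', requestContext: { condition: 'Success' } };
const viaSns = extractExecutionRecords({ Records: [{ Sns: { Message: JSON.stringify(demoRecord) } }] });
const viaSqs = extractExecutionRecords({ Records: [{ body: JSON.stringify(demoRecord) }] });
const viaEventBridge = extractExecutionRecords({
  source: 'lambda',
  'detail-type': 'Lambda Function Invocation Result - Success',
  detail: demoRecord
});
const viaLambda = extractExecutionRecords(demoRecord);
console.log(viaSns[0].requestContext.condition, viaSqs.length, viaEventBridge.length, viaLambda.length);
```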

AWS CloudFormation configuration

Destinations CloudFormation configuration is created via the following YAML.

Resources:
  EventInvokeConfig:
    Type: AWS::Lambda::EventInvokeConfig
    Properties:
      FunctionName: "YourLambdaFunctionWithEventInvokeConfig"
      Qualifier: "$LATEST"
      MaximumEventAgeInSeconds: 600
      MaximumRetryAttempts: 0
      DestinationConfig:
        OnSuccess:
          Destination: "arn:aws:sns:us-east-1:123456789012:YourSNSTopicOnSuccess"
        OnFailure:
          Destination: "arn:aws:lambda:us-east-1:123456789012:function:YourLambdaFunctionOnFailure"
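
For readers who prefer an SDK, the same settings can be sketched as the request parameters of the Lambda PutFunctionEventInvokeConfig API. The helper name `buildEventInvokeConfig` is hypothetical, the ARNs are the placeholders from the template above, and the range checks reflect the documented limits as I understand them (0 to 2 retries, 60 to 21,600 seconds of event age); verify them against the current API reference before relying on them.

```javascript
// Hypothetical helper: validates and assembles PutFunctionEventInvokeConfig parameters.
function buildEventInvokeConfig({ functionName, qualifier, maxEventAgeSeconds, maxRetryAttempts, onSuccessArn, onFailureArn }) {
  if (!Number.isInteger(maxRetryAttempts) || maxRetryAttempts < 0 || maxRetryAttempts > 2) {
    throw new RangeError('MaximumRetryAttempts must be 0, 1, or 2');
  }
  if (!Number.isInteger(maxEventAgeSeconds) || maxEventAgeSeconds < 60 || maxEventAgeSeconds > 21600) {
    throw new RangeError('MaximumEventAgeInSeconds must be between 60 and 21600');
  }
  return {
    FunctionName: functionName,
    Qualifier: qualifier,
    MaximumEventAgeInSeconds: maxEventAgeSeconds,
    MaximumRetryAttempts: maxRetryAttempts,
    DestinationConfig: {
      OnSuccess: { Destination: onSuccessArn },
      OnFailure: { Destination: onFailureArn }
    }
  };
}

const invokeConfig = buildEventInvokeConfig({
  functionName: 'YourLambdaFunctionWithEventInvokeConfig',
  qualifier: '$LATEST',
  maxEventAgeSeconds: 600,
  maxRetryAttempts: 0,
  onSuccessArn: 'arn:aws:sns:us-east-1:123456789012:YourSNSTopicOnSuccess',
  onFailureArn: 'arn:aws:lambda:us-east-1:123456789012:function:YourLambdaFunctionOnFailure'
});
console.log(JSON.stringify(invokeConfig, null, 2));
```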

Conclusion

AWS Lambda Destinations gives you more visibility and control of function execution results. This helps you build better event-driven applications with less code, using Lambda’s native failure handling controls.

There are no additional costs for enabling Lambda Destinations. However, calls made to destination target services may be charged.

To learn more, see Lambda Destinations in the AWS Lambda Developer Guide.

Application integration patterns for microservices: Fan-out strategies

Post Syndicated from Rachel Richardson original https://aws.amazon.com/blogs/compute/application-integration-patterns-for-microservices-fan-out-strategies/

This post is courtesy of Dirk Fröhner

The first blog in this series introduced asynchronous messaging for building loosely coupled systems that can scale, operate, and evolve individually. It considered messaging as a communications model for microservices architectures. This post covers concrete architectural considerations, focusing on the messaging architecture.

Wild Rydes

Wild Rydes is a fictional technology start-up. You may have heard of it – it disrupts individual transportation by replacing traditional taxis with unicorns. We use the Wild Rydes storyline in several hands-on AWS workshops. It illustrates concepts such as serverless development, event-driven design, API management, and messaging in microservices.

This blog post explores the decision-making process in building the Wild Rydes workshop, with a goal of helping you apply these concepts to your applications.

In the workshop, a customer requests a ‘unicorn’ ride using the Wild Rydes customer application. Registered unicorn drivers can use the application to manage their rides. Unicorn drivers submit a ride completion message after they have successfully delivered a customer to their destination.

Wild Rydes app image

Submit a ride completion

API exposed by the unicorn management service

At Wild Rydes, end-user clients are implemented as mobile applications and communicate via REST APIs (also known as hypermedia APIs) with the backend services.

For this use case, the application interacts with the API exposed by the unicorn management service. It uses the submit-ride-completion resource that it discovered from the API’s home document to send the relevant details of a ride to the backend. In response, the backend persists these details, creates a new completed-ride resource, and returns the respective status code, the location, and a representation of the new resource to the client. The API details are shown below.

Request from client to submit the details of a completed ride:

POST /<submit-ride-completion-resource-path> HTTP/1.1
Content-Type: application/json;charset=UTF-8
...

{
    "from": "...",
    "to": "...",
    "duration": "...",
    "distance": "...",
    "customer": "...",
    "fare": "..."
}

Response from the unicorn management service:

HTTP/1.1 201 Created
Date: Sat, 31 Aug 2019 12:00:00 GMT
Location: <url-of-newly-created-completed-ride-resource>
Content-Location: <url-of-newly-created-completed-ride-resource>
Content-Type: application/json;charset=UTF-8
...

{
    "links": {
        "self": {
            "href": "https://..."
        }
    },
    <completed-ride-resource-representation-properties>
}

Schematic architecture for the use case

The schematic architecture for the use case is shown in diagram 1 below:

Diagram 1: Multiple microservices need information about ride completion

There are other microservices in Wild Rydes that are also interested in a new completed ride. The examples from the diagram are:

  • Customer notification service: Customers should receive a notification in the app about their latest completed ride.
  • Customer accounting service: After all, Wild Rydes is a business, so this service is responsible for collecting the fare from the customer.
  • Customer loyalty service: Everybody wants to collect miles and would like to receive benefits for being a loyal customer.
  • Data lake ingestion service: Wild Rydes is a data-driven company and they want to ingest all data generated from any process into their data lake for arbitrary analytics.
  • Extraordinary rides service: This special service is interested in rides with fares or distances above certain thresholds for preparing insights for business managers.

Based on this scenario, let’s review the integration options.

Integration options

Integration via database

The unicorn management service stores the details of a completed ride in a database. It could share the database with the other services directly, but that creates tight coupling. Sharing the database also restricts your flexibility to scale and evolve your services.

Integration via REST APIs

What about using REST APIs for the integration? The HTTP-based implementation of the REST architectural style uses the distributed architecture concepts of the web. However, what does this mean for the implementation?

Diagram 2: Using REST APIs to communicate to microservices

As shown in diagram 2 above:

  • Effectively, all interested services on the right-hand side would have to expose an API resource. These would be called by the unicorn management service for each newly completed ride.
  • To enable elasticity behind a single resource URL, you may need a load balancer in front of each interested service.
  • The unicorn management service would have to know about all these interested services and their respective APIs. Hopefully, each service uses a streamlined API resource.
  • Lastly, the unicorn management service must store, retry, and track all request attempts in case an interested service is not available. This ensures durability so we don’t lose any of these notifications.
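
To give a feel for the "store, retry, and track" burden in the last bullet, here is a deliberately small sketch of what the unicorn management service would have to own. The `send` function is a hypothetical stand-in for an HTTP call to a recipient's API, the recipient names are invented, and real code would also need persistence and backoff, which are omitted.

```javascript
// Deliver one ride to every recipient, retrying failures and tracking the outcome.
// `send(recipient, ride)` is a hypothetical function that returns true on success
// and may throw when the recipient is unavailable.
function deliverToAll(recipients, ride, send, maxAttempts = 3) {
  const report = {};
  for (const recipient of recipients) {
    let attempts = 0;
    let delivered = false;
    while (!delivered && attempts < maxAttempts) {
      attempts += 1;
      try {
        delivered = send(recipient, ride) === true;
      } catch (err) {
        delivered = false; // treat an exception like an unavailable recipient
      }
    }
    report[recipient] = { delivered, attempts };
  }
  return report;
}

// Simulated recipients: one healthy, one that fails once, one that is down.
const callCounts = {};
function flakySend(recipient) {
  callCounts[recipient] = (callCounts[recipient] || 0) + 1;
  if (recipient === 'loyalty') throw new Error('service unavailable');
  if (recipient === 'accounting' && callCounts[recipient] === 1) return false;
  return true;
}

const deliveryReport = deliverToAll(['notification', 'accounting', 'loyalty'], { fare: 42 }, flakySend);
console.log(deliveryReport);
```

Even in this toy form, the sender has to know every recipient and decide what to do with the ride that never reached the loyalty service.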

One approach is to manage a recipient list in the unicorn management service. This adds additional complexity to the unicorn management service and coupling on both sides. Although there are self-registration and discovery approaches, managing a recipient list is not the core use case of the unicorn management service.

Diagram 3: Using a separate service to manage the fan-out to other services

A better approach would be to externalize the recipient list into a separate Request Distribution Service, as diagram 3 shows. This decouples both sides, but binds each side to the new service. However, the unicorn management service is still responsible for the delivery of the ride data to all the recipients. Again, this heavy lifting is not the main task of this service.

Diagram 4: Filtering information for extraordinary rides

In diagram 4, the information filtering for the Extraordinary Rides Service is self-managed. This means that there is code on one side to either not send or to discard irrelevant ride data.

For this use case, integration via REST APIs potentially adds coupling to the services. And it adds heavy lifting to the services that is beyond their actual domain.

Integration via messaging

A third option could use messaging for the integration.

Publish-subscribe pattern

Both Amazon SNS and Amazon EventBridge can be used to implement the publish-subscribe pattern. In this use case, we recommend Amazon SNS, which scales to support high throughput and fan-out applications. Amazon EventBridge includes direct integrations with software as a service (SaaS) applications and other AWS services. It’s ideal for publish-subscribe use cases involving these types of integrations.

Diagram 5: Using Amazon SNS to implement a publish-subscribe pattern

Diagram 5 shows an SNS topic called Ride Completion Topic. The unicorn management service can now send the details about a completed ride into that topic. All interested services on the right-hand side can subscribe to this topic.

Using a message topic to publish the details of a completed ride frees us from managing the recipient list and from ensuring reliable delivery of the messages ourselves. It also decouples both sides as much as possible. Services on the right-hand side can autonomously subscribe to the topic. The Unicorn Management Service does not know anything about the topic’s subscribers.
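
On the publishing side, the unicorn management service only needs to know the topic. A minimal sketch of the parameters for an SNS Publish request might look like this. The topic ARN, the helper name, and the choice to copy `fare` and `distance` into message attributes are assumptions for illustration; the ride fields come from the API request shown earlier.

```javascript
// Hypothetical helper: builds SNS Publish parameters for a completed ride.
// Numeric message attributes are sent as strings with DataType 'Number'.
function buildRideCompletionPublishParams(topicArn, ride) {
  return {
    TopicArn: topicArn,
    Message: JSON.stringify(ride),
    MessageAttributes: {
      fare: { DataType: 'Number', StringValue: String(ride.fare) },
      distance: { DataType: 'Number', StringValue: String(ride.distance) }
    }
  };
}

const publishParams = buildRideCompletionPublishParams(
  'arn:aws:sns:us-east-1:123456789012:RideCompletionTopic',
  { from: 'Berlin', to: 'Potsdam', duration: 45, distance: 35, customer: 'c-123', fare: 105.5 }
);
console.log(JSON.stringify(publishParams, null, 2));
```

Nothing in these parameters refers to a subscriber, which is the decoupling the paragraph above describes.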

Message filter pattern

Looking at the Extraordinary Rides Service, the message filter functionality of Amazon SNS can autonomously and individually discard irrelevant messages. The Extraordinary Rides Service can specify the threshold values for the fare and distance.

Diagram 6: Filtering extraordinary rides using Amazon SNS
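
To illustrate the effect of such a filter, the sketch below evaluates an SNS-style subscription filter policy locally. SNS applies filter policies itself, so subscribers never need code like this; the matcher exists only to show which messages would get through. It supports numeric conditions only, the threshold of 50 is invented, and it assumes the publisher sets `fare` as a message attribute. Within one policy, all attribute keys must match, so a "fare or distance" rule would need a different arrangement that the post does not specify.

```javascript
const comparators = {
  '>': (a, b) => a > b,
  '>=': (a, b) => a >= b,
  '<': (a, b) => a < b,
  '<=': (a, b) => a <= b,
  '=': (a, b) => a === b
};

// Check one condition such as { numeric: ['>=', 50] } or { numeric: ['>', 0, '<=', 100] }.
function matchesNumericCondition(value, condition) {
  const parts = condition.numeric;
  for (let i = 0; i < parts.length; i += 2) {
    if (!comparators[parts[i]](value, parts[i + 1])) return false;
  }
  return true;
}

// Every key in the policy must match (AND); any condition listed for a key may match (OR).
function matchesFilterPolicy(policy, attributes) {
  return Object.entries(policy).every(([key, conditions]) =>
    key in attributes &&
    conditions.some((condition) => matchesNumericCondition(Number(attributes[key]), condition))
  );
}

// Hypothetical policy for the Extraordinary Rides Service.
const extraordinaryFarePolicy = { fare: [{ numeric: ['>=', 50] }] };

console.log(matchesFilterPolicy(extraordinaryFarePolicy, { fare: 105.5, distance: 35 }));
console.log(matchesFilterPolicy(extraordinaryFarePolicy, { fare: 12, distance: 3 }));
```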

Topic-queue-chaining pattern

Consider the publish-subscribe channel between the Unicorn Management Service and the subscribing services on the right-hand side.

One of the consuming services may go offline for maintenance. Or the code that processes messages from the ride completion topic could run into an exception. These are two examples where a subscriber service could potentially miss topic messages.

A good pattern to apply here is topic-queue-chaining. That means that you add a queue, in our case an SQS queue, between the ride completion topic and each of the subscriber services. As messages are buffered persistently in an SQS queue, it prevents lost messages if a subscriber process runs into problems for many hours or days.

Diagram 7: Chaining topics and queues to buffer messages persistently
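
The difference a chained queue makes can be shown with a small in-memory simulation. It is a simplification: the classes are invented for this sketch, a plain array stands in for the SQS queue, and the topic makes a single delivery attempt, ignoring any retry behavior a real messaging service may have.

```javascript
// Toy topic: forwards each published message to every subscriber once.
class InMemoryTopic {
  constructor() { this.subscribers = []; }
  subscribe(deliver) { this.subscribers.push(deliver); }
  publish(message) { this.subscribers.forEach((deliver) => deliver(message)); }
}

// Toy service that can be taken offline; returns false when it cannot process.
class SubscriberService {
  constructor() { this.online = true; this.processed = []; }
  handle(message) {
    if (!this.online) return false;
    this.processed.push(message);
    return true;
  }
}

// Hand buffered messages to the service, oldest first, until it stops accepting.
function drain(queue, service) {
  while (queue.length > 0 && service.handle(queue[0])) {
    queue.shift();
  }
}

const rideTopic = new InMemoryTopic();
const directService = new SubscriberService();  // subscribed to the topic directly
const chainedService = new SubscriberService(); // reads from a queue behind the topic
const rideQueue = [];                           // stands in for the SQS queue

rideTopic.subscribe((message) => directService.handle(message));
rideTopic.subscribe((message) => rideQueue.push(message));

directService.online = false;
chainedService.online = false;
rideTopic.publish('ride-1');
rideTopic.publish('ride-2');
drain(rideQueue, chainedService); // nothing is processed while the service is offline

directService.online = true;
chainedService.online = true;
rideTopic.publish('ride-3');
drain(rideQueue, chainedService);

console.log('direct:', directService.processed);
console.log('chained:', chainedService.processed);
```

The directly subscribed service only sees the ride published after it came back, while the service behind the queue catches up on everything it missed.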

Queues as buffering load balancers

An SQS queue in front of each subscriber service also acts as a buffering load balancer.

Since every message is delivered to one of potentially many consumer processes, you can scale out the subscriber services, and the message load is distributed over the available consumer processes.

As messages are buffered in the queue, they are preserved during a scaling event, such as when you must wait until an additional consumer process becomes operational.

Lastly, these queue characteristics help flatten peak loads for your consumer processes, buffering messages until consumers are available. This allows you to process messages at a pace decoupled from the message source.
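
A small tick-based simulation can make the peak-flattening effect visible. The arrival pattern, the number of consumers, and the per-consumer rate are invented numbers, and the model is intentionally crude: each tick, the consumers together take at most a fixed number of messages from the backlog.

```javascript
// Simulate a queue in front of a pool of consumers.
// Returns, per tick, how many messages were processed and how many remained queued.
function simulateQueue(arrivalsPerTick, consumerCount, perConsumerRate) {
  let backlog = 0;
  const processed = [];
  const backlogs = [];
  for (const arrivals of arrivalsPerTick) {
    backlog += arrivals;
    const handled = Math.min(backlog, consumerCount * perConsumerRate);
    backlog -= handled;
    processed.push(handled);
    backlogs.push(backlog);
  }
  return { processed, backlogs };
}

// A spike of 20 messages hits two consumers that each handle 3 messages per tick.
const simulation = simulateQueue([2, 20, 2, 0, 0, 0], 2, 3);
console.log('processed per tick:', simulation.processed);
console.log('backlog per tick:  ', simulation.backlogs);
```

The consumers never see more than six messages per tick; the queue absorbs the spike and releases it over the following ticks.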

Conclusion

The Wild Rydes example shows how messaging can provide decoupling and greater flexibility for your microservices landscape.

In contrast to REST APIs, a messaging system takes care of message delivery outside of your service code. Using a publish-subscribe channel provides simple fan-out capability. And message filters allow for selective message reception without the effort of implementing that logic into your code.

With the topic-queue-chaining pattern, you can add queue characteristics to a fan-out scenario so that you can easily scale out on the consumer side, and flatten peak loads.

For a deeper dive into queues and topics and how to use them in your microservices architecture, please use the following resources:

  1. AWS whitepaper: Implementing Microservices on AWS
  2. AWS blog: Implementing enterprise integration patterns with AWS messaging services: point-to-point channels
  3. AWS blog: Implementing enterprise integration patterns with AWS messaging services: publish-subscribe channels
  4. AWS blog: Building Scalable Applications and Microservices: Adding Messaging to Your Toolbox

Understanding asynchronous messaging for microservices

Post Syndicated from Rachel Richardson original https://aws.amazon.com/blogs/compute/understanding-asynchronous-messaging-for-microservices/

This post is courtesy of Dirk Fröhner

One of the implications of applying the microservices architectural style is that much communication between components happens over the network. After all, your microservices landscape is a distributed system. To achieve the promises of microservices, such as being able to individually scale, operate, and evolve each service, this communication must happen in a loosely coupled and reliable manner.

A common way to loosely couple services is to expose an API following the REST architectural style. REST APIs are based on the architecture of the web and provide loose coupling between communicating parties. REST APIs offer a great way to decouple interfaces from concrete implementations, and to advise clients about what they can do next, by the use of links and link relations.

While REST APIs are common and useful in microservices design, REST APIs tend to be designed with synchronous communications, where a response is required. A request coming from an end-user client can trigger a complex communications path within your services landscape, which can effectively add coupling between the services at runtime. After all, this is why there are mitigation patterns like circuit-breaker in the first place. REST APIs can also add some heavy lifting to your infrastructure that we will discuss further below.

Asynchronous messaging

If loose-coupling is important, especially in a system that requires high resilience and has unpredictable scale, another option is asynchronous messaging.

Asynchronous messaging is a fundamental approach for integrating independent systems, or building up a set of loosely coupled systems that can operate, scale, and evolve independently and flexibly. As our colleague Tim Bray said, “If your application is cloud-native, or large-scale, or distributed, and doesn’t include a messaging component, that’s probably a bug.” In this blog post, we will outline some fundamental benefits of asynchronous messaging for the communications between microservices.

For a refresher on the fundamental messaging patterns and their implementations with Amazon SQS, Amazon SNS, and Amazon MQ, please read our previous blog posts.

For a summary of the semantics of queues and topics:

  • A queue is like a buffer. You can put messages into a queue, and you can retrieve messages from a queue. Message queues operate so that any given message is only consumed by one receiver, although multiple receivers can be connected to the queue.
  • A topic is like a broadcasting station. You can publish messages to a topic, and anyone interested in these messages can subscribe to the topic. In this model, any message published to a topic is immediately received by all of the subscribers of the topic (unless you have applied the message filter pattern).

Use-case

Consider a typical scenario illustrated in the diagram below. An end-user client (EUC) addresses an API resource of one of our services, through Amazon API Gateway in this example. From there, the request can potentially follow a path across the microservices landscape to get completely processed.

To provide the final result, there will be potentially cascading subsequent requests sent between other microservices. This example illustrates the complexity involved in processing a single end user request.

Diagram 1: End-User Client accessing a service using an API

End-user clients (EUCs) often communicate with services via REST APIs in a synchronous manner. However, the communication can also be designed using an asynchronous approach. For instance, if an EUC submits a request that takes some time to process, the respective API resource can respond with HTTP status 202 Accepted, and a link to a resource that provides the current processing status. Downstream, the communication between the service that receives that request, and other services that are involved in processing the request, can happen asynchronously using messaging services.
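
The 202 Accepted flow described above might be sketched as follows. Everything here is illustrative: the /jobs/ path, the function names, and the in-memory deque and dictionary that stand in for a message queue and a status store are assumptions, not part of any AWS API.

```python
import uuid
from collections import deque

work_queue = deque()   # stand-in for a message queue
job_status = {}        # stand-in for a status store


def submit_request(payload):
    """Accept a request, enqueue it, and answer with 202 plus a status link."""
    job_id = str(uuid.uuid4())
    job_status[job_id] = "PENDING"
    work_queue.append({"jobId": job_id, "payload": payload})
    status_url = f"/jobs/{job_id}"   # hypothetical status resource
    return 202, {"Location": status_url}, {"status": "PENDING", "statusUrl": status_url}


def process_next():
    """Worker side: take one queued request and mark it as done."""
    job = work_queue.popleft()
    job_status[job["jobId"]] = "DONE"
    return job["jobId"]


def get_status(job_id):
    """Status resource: 200 with the current state, or 404 for unknown jobs."""
    if job_id not in job_status:
        return 404, {"error": "unknown job"}
    return 200, {"status": job_status[job_id]}
```

The client receives its answer as soon as the request is queued. It can then poll the status link while the actual work happens later, at the pace of the worker.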

There are situations where a communications model using asynchronous messaging can make your life easier than using REST APIs.

Infrastructure complexity

Start with looking at the infrastructure complexity for the backends of your services. Depending on your implementation paradigm, you have to include different components in your infrastructure that you don’t have to deal with when using messaging.

Imagine your services each expose a REST API. Typically, this means you add a load balancer in front of your compute layer, and your backend implementation includes an HTTP server. It is usually a good idea to decouple your services APIs from their concrete implementations, so you could also consider adding Amazon API Gateway in front of your load balancer.

For a serverless approach, you don’t need to worry about load balancing and scaling out infrastructure. Amazon API Gateway with AWS Lambda integration provides a fully managed solution for removing complexities around infrastructure management.

Using Amazon SQS as a cloud-native messaging service for queues, you don’t employ any of the above mentioned components. As described in a prior post, an SQS queue can act as a load balancer in itself. The consumers, or target services, don’t need an HTTP server, but simply ask a queue for available messages. If you use AWS Lambda for your consumers, this process is even simpler, as the Lambda functions are automatically invoked when messages appear in an SQS queue. See Using AWS Lambda with Amazon SQS to learn more.

The same applies to serverless architectures implementing a publish/subscribe pattern. Lambda function executions can be directly triggered by SNS messages. Without AWS Lambda, you need load balancers and web servers in your backend implementations to receive SNS notifications, as those are injected via webhooks into your services. SNS also provides the fan-out functionality that you would otherwise have to build using an intermediary component to implement a recipient list of subscribers.

Reliability, resilience

For synchronous systems, if a service crashes while it processes the payload of an API request, the information is lost. A good way to prevent this on a microservice is to explicitly persist an incoming request immediately after receiving it. Then process and reprocess, until the request is finally marked as resolved.

This approach requires additional work, and it requires the microservice to not crash while persisting an incoming API request. The microservice sending a request must also resend if the target service doesn’t acknowledge receipt. For example, it doesn’t respond with a successful HTTP status code, or the connection drops.

When sending messages to a queue, this additional work is addressed by the messaging infrastructure. A message will remain in a queue unless a consumer explicitly states that processing is finished by acknowledging the message reception. As long as message reception is not acknowledged by a consumer, it will stay in the queue. Messages can be retained in an SQS queue for a maximum of 14 days.
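
The acknowledgement behavior can be modeled with a toy queue. This sketch assumes a visibility timeout in the style of SQS, where a received message is hidden from other consumers for a while and reappears if it is not deleted in time. The AckQueue class and the explicit now parameter are simplifications invented for this example.

```python
class AckQueue:
    """Toy queue: a received message reappears unless it is deleted in time."""

    def __init__(self, visibility_timeout=30):
        self.visibility_timeout = visibility_timeout
        self._messages = {}   # message id -> [body, time at which it is visible again]
        self._next_id = 0

    def send(self, body):
        self._next_id += 1
        self._messages[self._next_id] = [body, 0]
        return self._next_id

    def receive(self, now):
        """Return (id, body) of one visible message and hide it for the timeout."""
        for message_id, entry in self._messages.items():
            if entry[1] <= now:
                entry[1] = now + self.visibility_timeout
                return message_id, entry[0]
        return None

    def delete(self, message_id):
        """Acknowledge successful processing; only now is the message removed."""
        self._messages.pop(message_id, None)


q = AckQueue(visibility_timeout=30)
q.send("order-42")

first = q.receive(now=0)      # consumer A receives the message, then crashes
hidden = q.receive(now=10)    # still invisible to other consumers
second = q.receive(now=31)    # timeout elapsed: message is delivered again
q.delete(second[0])           # consumer B finishes and acknowledges
empty = q.receive(now=100)
```

A crash between receiving and acknowledging therefore costs time, not data: the message is handed out again once the timeout has passed.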

Scale out latency

Under increased load, your services must scale out to process the requests. You must then consider scale-out latency, which may be managed for you with serverless implementations. It takes a few moments from when an Auto Scaling group triggers the launch of additional instances until these are ready to operate. Launching new container tasks also takes time. When your scaling threshold is not optimal and the scaling event occurs late, your available resources may be unable to serve all incoming requests. These requests may be lost or answered with HTTP status code 5xx.

Using message queues that buffer messages during a scaling event helps prevent this. Even in use cases where the EUC is waiting for an immediate response, this is the more reliable architecture. If your infrastructure needs time to scale out and you are not able to process all requests in time, the requests are persisted.

When messaging is your only choice

What happens when your services must respond to peak loads at scale?

For many applications, the scale-out latency, including load balancer pre-warming, will eventually become too large to handle steeply ascending loads fast enough. With a serverless architecture, exposing your Lambda functions with API Gateway can handle steeply ascending loads. But you must still consider downstream systems, which may be easily overwhelmed.

In these scenarios, where rapid scaling without overwhelming downstream systems is important, messaging may be your best choice. Message queues help protect your downstream services by buffering incoming payloads for consumption at the pace of the consuming service. This helps not only for the communications between microservices, but also when peak loads flood your client-facing API. Often, the most important goal is to accept an incoming request, while the actual processing of that request can happen later. You decouple these steps from each other by using queues.

Serverless messaging systems like Amazon SQS and Amazon SNS can respond quickly to support high scale. These are often the best solution when scale is unpredictable. While the instance-based messaging system, Amazon MQ, provides compatibility with open standards, it requires manual scaling for large workloads, unlike serverless messaging services.

Conclusion

We hope you got some inspiration to also employ asynchronous messaging for your microservices communications architecture. The next post in this series provides concrete examples of these patterns. For more information, feel free to consume the following resources:

  1. AWS whitepaper: Implementing Microservices on AWS
  2. AWS blog: Implementing enterprise integration patterns with AWS messaging services: point-to-point channels
  3. AWS blog: Implementing enterprise integration patterns with AWS messaging services: publish-subscribe channels
  4. AWS blog: Building Scalable Applications and Microservices: Adding Messaging to Your Toolbox

Read the next blog in the series, Application Integration Patterns for Microservices: Fan-out Strategies.

Designing durable serverless apps with DLQs for Amazon SNS, Amazon SQS, AWS Lambda

Post Syndicated from Rachel Richardson original https://aws.amazon.com/blogs/compute/designing-durable-serverless-apps-with-dlqs-for-amazon-sns-amazon-sqs-aws-lambda/

This post is courtesy of Otavio Ferreira, Sr Manager, SNS.

In a postal system, a dead-letter office is a facility for processing undeliverable mail. In pub/sub messaging, a dead-letter queue (DLQ) is a queue to which messages published to a topic can be sent, in case those messages cannot be delivered to a subscribed endpoint.

Amazon SNS supports DLQs, making your applications more resilient and durable when message deliveries fail.

Understanding message delivery failures and retries

The delivery of a message fails when it’s not possible for Amazon SNS to access the subscribed endpoint. There are two reasons why this might happen:

  • Client errors, where the client is SNS (the message sender).
  • Server errors, where the server is the system that hosts the subscription endpoint (the message receiver), such as Amazon SQS or AWS Lambda.

Client errors

Client errors happen when SNS has stale subscription metadata. One common cause of client errors is when you (the endpoint owner) delete the endpoint. For example, you might delete the SQS queue that is subscribed to your SNS topic, without also deleting the SNS subscription corresponding to the queue. Another common cause is when you change the resource policy attached to your endpoint in a way that prevents SNS from delivering messages to that endpoint.

These errors are considered client errors because the client has attempted the delivery of a message to a destination that, from the client’s perspective, is no longer accessible. SNS does not retry the delivery of messages that failed as the result of client errors.

Server errors

Server errors happen when the system that powers the subscribed endpoint is unavailable, or when it returns an exception response indicating that it failed to process a valid request from SNS.

When server errors occur, SNS retries the failed deliveries according to a backoff function, which can be either linear or exponential. When a server error occurs for an AWS managed endpoint, backed by either SQS or Lambda, SNS retries the delivery up to 100,015 times, over 23 days.

Server errors can also happen with customer managed endpoints, namely HTTP, SMS, email, and mobile push endpoints. SNS also retries the delivery for these types of endpoints. HTTP endpoints support customer-defined retry policies, while SNS sets an internal delivery retry policy for SMS, email, and mobile push endpoints to 50 times, over 6 hours.
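
To get a feel for the difference between linear and exponential backoff, consider the following simplified sketch. The backoff_delays function, its parameters, and the formulas are assumptions for illustration only. The actual SNS retry policy has several phases and its own definitions of these functions, so consult the SNS documentation for the exact behavior.

```python
def backoff_delays(retries, min_delay, max_delay, function="exponential"):
    """Return the wait time in seconds before each retry, capped at max_delay."""
    delays = []
    for attempt in range(retries):
        if function == "linear":
            delay = min_delay * (attempt + 1)
        elif function == "exponential":
            delay = min_delay * (2 ** attempt)
        else:
            raise ValueError(f"unknown backoff function: {function}")
        delays.append(min(delay, max_delay))
    return delays


linear = backoff_delays(5, min_delay=1, max_delay=20, function="linear")
exponential = backoff_delays(5, min_delay=1, max_delay=20, function="exponential")
```

In this model the linear schedule grows by a constant step, while the exponential schedule doubles until it reaches the cap, which spreads retries over a much longer period for the same number of attempts.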

Delivery retries

SNS may receive a client error, or continue to receive a server error for a message beyond the number of retries defined by the corresponding retry policy. In that event, SNS discards the message. Attaching a DLQ to your SNS subscription enables you to keep this message, regardless of the type of error, either client or server. DLQs give you more control over messages that cannot be delivered.

For more information on the delivery retry policy for each delivery protocol supported by SNS, see Amazon SNS Message Delivery Retry.

Using DLQs for AWS services

SNS, SQS, and Lambda support DLQs, addressing different failure modes. All DLQs are regular queues powered by SQS.

In SNS, DLQs store the messages that failed to be delivered to subscribed endpoints. For more information, see Amazon SNS Dead-Letter Queues.

In SQS, DLQs store the messages that failed to be processed by your consumer application. This failure mode can happen when producers and consumers fail to interpret aspects of the protocol that they use to communicate. In that case, the consumer receives the message from the queue, but fails to process it, as the message doesn’t have the structure or content that the consumer expects. The consumer can’t delete the message from the queue either. After exhausting the receive count in the redrive policy, SQS can sideline the message to the DLQ. For more information, see Amazon SQS Dead-Letter Queues.
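
For the SQS case, the redrive policy is a small JSON document that names the DLQ and the maximum receive count. The sketch below builds such a document and mimics the decision to sideline a message. The helper names and the value 5 are hypothetical, the queue ARN is an example value, and the code does not call SQS.

```python
import json


def build_redrive_policy(dlq_arn, max_receive_count):
    """Serialize a redrive policy as the JSON string used for the queue attribute."""
    if max_receive_count < 1:
        raise ValueError("max_receive_count must be at least 1")
    return json.dumps({
        "deadLetterTargetArn": dlq_arn,
        "maxReceiveCount": max_receive_count,
    })


def should_move_to_dlq(receive_count, redrive_policy):
    """Mimic the decision: sideline once the receive count exceeds the maximum."""
    policy = json.loads(redrive_policy)
    return receive_count > policy["maxReceiveCount"]


policy = build_redrive_policy(
    "arn:aws:sqs:us-east-1:123456789012:Rental-Fulfilment-DLQ",  # example ARN
    max_receive_count=5,                                         # hypothetical value
)
```

A low maximum moves problem messages out of the way quickly, whereas a higher one gives transient consumer failures more chances to recover before the message is sidelined.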

In Lambda, DLQs store the messages that resulted in failed asynchronous executions of your Lambda function. An execution can result in an error for several reasons. Your code might raise an exception, time out, or run out of memory. The runtime executing your code might encounter an error and stop. Your function might hit its concurrency limit and be throttled. Regardless of the error type, when the error occurs, your code might have run completely, partially, or not at all. By default, Lambda retries an asynchronous execution twice. After exhausting the retries, Lambda can sideline the message to the DLQ. For more information, see AWS Lambda Dead-Letter Queues.

When you have a fan-out architecture, with SQS queues and Lambda functions subscribed to an SNS topic, we recommend that you set DLQs to your SNS subscriptions, and to your destination queues and functions as well. This approach gives your application resilience against message delivery failures, message processing failures, and function execution failures too.

Applying DLQs in a use case

Here’s how everything comes together. The following diagram shows a serverless backend architecture that supports a car rental application. This is a durable serverless architecture based on DLQs for SNS, SQS, and Lambda.

Dead Letter Queue - DLQ SNS use case with architecture diagram

When a customer places an order to rent a car, the application sends that request to an API, which is powered by Amazon API Gateway. The REST API is backed by an SNS topic named Rental-Orders, and deployed onto an Amazon VPC subnet. The topic then fans out that order to the following two subscribed endpoints, for parallel processing:

  • An SQS queue, named Rental-Fulfilment, which feeds the integration with an internal fulfilment system hosted on Amazon EC2.
  • A Lambda function, named Rental-Billing, which processes and loads the customer order into a third-party billing system, also hosted on Amazon EC2.

To increase the durability of this serverless backend API, the following DLQs have been set up:

  • Two SNS DLQs, namely Rental-Fulfilment-Fanout-DLQ and Rental-Billing-Fanout-DLQ, which store the order in case either the subscribed SQS queue or Lambda function ever becomes unreachable.
  • An SQS DLQ, named Rental-Fulfilment-DLQ, which stores the order when the fulfilment system fails to process the order.
  • A Lambda DLQ, named Rental-Billing-DLQ, which stores the order when the function fails to process and load the order into the billing system.

When the DLQ captures the message, you can inspect the message for troubleshooting purposes. After you address the error at hand, you can poll the DLQ to retry the processing of the message.

Setting up DLQs for subscriptions, queues, and functions can be done using the AWS Management Console, SDK, CLI, API, or AWS CloudFormation. You can use the SDK, CLI, and API for polling the DLQs as well.

Configuring DLQs for subscriptions

You can attach a DLQ to an SNS subscription by setting the subscription’s RedrivePolicy parameter. The policy is a JSON object that refers to the DLQ ARN. The ARN must point to an SQS queue in the same AWS account as that of the SNS subscription. Also, both the DLQ and the subscription must be in the same AWS Region.

Here’s how you can configure one of the SNS DLQs applied in the car rental application example, presented earlier.

The following JSON object is a CloudFormation template that subscribes the SQS queue Rental-Fulfilment to the SNS topic Rental-Orders. The template also sets a RedrivePolicy that targets Rental-Fulfilment-Fanout-DLQ as a DLQ.

Lastly, the template sets a FilterPolicy value. It makes SNS deliver a message to the subscribed queue only if the published message carries an attribute named order-status with value set to either confirmed or canceled. As Amazon SNS Message Filtering happens before message delivery, messages that are filtered out aren’t sent to that subscription’s DLQ.

Internally, the CloudFormation template uses the SNS Subscribe API action for deploying the subscription and setting both policies, all part of the same API request.

{  
   "Resources": {
      "mySubscription": {
         "Type" : "AWS::SNS::Subscription",
         "Properties" : {
            "Protocol": "sqs",
            "Endpoint": "arn:aws:sqs:us-east-1:123456789012:Rental-Fulfilment",
            "TopicArn": "arn:aws:sns:us-east-1:123456789012:Rental-Orders",
            "RedrivePolicy": {
               "deadLetterTargetArn": 
                  "arn:aws:sqs:us-east-1:123456789012:Rental-Fulfilment-Fanout-DLQ"
            },
            "FilterPolicy": { 
               "order-status": [ "confirmed", "canceled" ]
            }
         }
      }
   }
}

Maybe the SNS topic and subscription are already deployed. In that case, you can use the SNS SetSubscriptionAttributes API action to set the RedrivePolicy, as shown by the following code examples, based on the AWS CLI and the AWS SDK for Java.

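$ aws sns set-subscription-attributes \
   --region us-east-1 \
   --subscription-arn arn:aws:sns:us-east-1:123456789012:Rental-Orders:44019880-ffa0-4067-9cb4-b974443bcck2 \
   --attribute-name RedrivePolicy \
   --attribute-value '{"deadLetterTargetArn":"arn:aws:sqs:us-east-1:123456789012:Rental-Fulfilment-Fanout-DLQ"}'

import com.amazonaws.services.sns.AmazonSNS;
import com.amazonaws.services.sns.AmazonSNSClientBuilder;
import com.amazonaws.services.sns.model.SetSubscriptionAttributesRequest;

// Create a client with the default credential and Region provider chain.
AmazonSNS sns = AmazonSNSClientBuilder.defaultClient();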

String subscriptionArn = "arn:aws:sns:us-east-1:123456789012:Rental-Orders:44019880-ffa0-4067-9cb4-b974443bcck2";

String redrivePolicy = "{\"deadLetterTargetArn\":\"arn:aws:sqs:us-east-1:123456789012:Rental-Fulfilment-Fanout-DLQ\"}";

SetSubscriptionAttributesRequest request = new SetSubscriptionAttributesRequest(
  subscriptionArn, 
  "RedrivePolicy", 
  redrivePolicy
);

sns.setSubscriptionAttributes(request);

Monitoring DLQs

You can use Amazon CloudWatch metrics and alarms to monitor the DLQs associated with your SNS subscriptions. In the car rental example, you can monitor the DLQs to be notified when the API fails to distribute any car rental order to the fulfilment or billing systems.

As regular SQS queues, the DLQs in SNS emit a number of metrics to CloudWatch, in 5-minute data points, such as NumberOfMessagesSent, NumberOfMessagesReceived and NumberOfMessagesDeleted. You can use these SQS metrics to be notified upon activity in your DLQs in SNS, so you may trigger a message recovery protocol.

You might have a case where you expect the DLQ to be always empty. In that case, create a CloudWatch alarm on NumberOfMessagesSent, set the alarm threshold to zero, and provide a separate SNS topic to be notified when the alarm goes off. The SNS topic, in turn, can deliver your alarm notification to any endpoint type that you choose, such as email address, phone number, or mobile pager app.
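
Such an alarm could be described with parameters along the following lines. This is a sketch that only builds a dictionary using the parameter names of the CloudWatch PutMetricAlarm action. The alarm name scheme and the Ops-Alerts topic ARN are hypothetical, and nothing is sent to AWS.

```python
def dlq_alarm_parameters(queue_name, notification_topic_arn):
    """Build PutMetricAlarm-style parameters that fire on any message sent to the DLQ."""
    return {
        "AlarmName": f"{queue_name}-not-empty",   # hypothetical naming scheme
        "Namespace": "AWS/SQS",
        "MetricName": "NumberOfMessagesSent",
        "Dimensions": [{"Name": "QueueName", "Value": queue_name}],
        "Statistic": "Sum",
        "Period": 300,                            # matches the 5-minute data points
        "EvaluationPeriods": 1,
        "Threshold": 0,
        "ComparisonOperator": "GreaterThanThreshold",
        "AlarmActions": [notification_topic_arn],
    }


params = dlq_alarm_parameters(
    "Rental-Fulfilment-Fanout-DLQ",
    "arn:aws:sns:us-east-1:123456789012:Ops-Alerts",  # hypothetical alert topic
)
```

If you use an SDK, a dictionary like this would typically be passed to its put-metric-alarm call. Verify the parameter names against the SDK documentation before relying on them.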

Additionally, SNS itself provides its own set of metrics that are relevant to DLQs. Specifically, SNS metrics include the following:

  • NumberOfNotificationsRedrivenToDlq – Used when sending the message to the DLQ succeeds.
  • NumberOfNotificationsFailedToRedriveToDlq – Used when sending the message to the DLQ fails. This can happen because the DLQ either doesn’t exist anymore or doesn’t have the required access permissions to allow SNS to send messages to it. For more information about setting up the required access policy, see Giving Permissions for Amazon SNS to Send Messages to Amazon SQS.

Debugging with DLQs

Use CloudWatch Logs to see the exceptions that caused your SNS deliveries to fail and your messages to be sidelined to DLQs. In the car rental example, you can inspect the rental orders in the DLQs, as well as the logs associated with these queues. Then you can understand why those orders failed to be fanned out to the fulfilment or billing systems.

SNS can log both successful and failed deliveries in CloudWatch. You can enable Amazon SNS Delivery Status Logging by setting three SNS topic attributes, which are delivery protocol-specific. As an example, for SNS deliveries to SQS queues, you must set the following topic attributes: SQSSuccessFeedbackRoleArn, SQSFailureFeedbackRoleArn, and SQSSuccessFeedbackSampleRate.

The following JSON object represents a successful SNS delivery in a CloudWatch Logs entry. The status code logged is 200 (SUCCESS). The redrivePolicy attribute shows that the SNS subscription in question had its DLQ set.

{
  "notification": {
    "messageMD5Sum": "7bb3327ac55e49485bad42e159ca4d4b",
    "messageId": "e8c2bb09-235c-5f5d-b583-efd8df0f7d74",
    "topicArn": "arn:aws:sns:us-east-1:123456789012:Rental-Orders",
    "timestamp": "2019-10-04 05:13:55.876"
  },
  "delivery": {
    "deliveryId": "6adf232e-fb12-5062-a564-27ff3741051f",
    "redrivePolicy": "{\"deadLetterTargetArn\": \"arn:aws:sqs:us-east-1:123456789012:Rental-Fulfilment-Fanout-DLQ\"}",
    "destination": "arn:aws:sqs:us-east-1:123456789012:Rental-Fulfilment",
    "providerResponse": "{\"sqsRequestId\":\"b2608a46-ccc4-51cc-003d-de972097debc\",\"sqsMessageId\":\"05fecd22-60a1-4d7d-bb79-026d49700b5a\"}",
    "dwellTimeMs": 58,
    "attempts": 1,
    "statusCode": 200
  },
  "status": "SUCCESS"
}

The following JSON object represents a failed SNS delivery in CloudWatch Logs. In the following code example, the subscribed queue doesn’t exist. Because this is a client error, the status code logged is 400 (FAILURE). Again, the redrivePolicy attribute refers to a DLQ.

{
  "notification": {
    "messageMD5Sum": "81c395cbd350da6bedfe3b24db9517b0",
    "messageId": "9959db9d-25c8-57a6-9439-8e5be8f71a1f",
    "topicArn": "arn:aws:sns:us-east-1:123456789012:Rental-Orders",
    "timestamp": "2019-10-04 05:16:51.116"
  },
  "delivery": {
    "deliveryId": "be743821-4c2c-5acc-a586-6cf0807f6fb1",
    "redrivePolicy": "{\"deadLetterTargetArn\": \"arn:aws:sqs:us-east-1:123456789012:Rental-Fulfilment-Fanout-DLQ\"}",
    "destination": "arn:aws:sqs:us-east-1:123456789012:Rental-Fulfilment",
    "providerResponse": "{\"ErrorCode\":\"AWS.SimpleQueueService.NonExistentQueue\", \"ErrorMessage\":\"The specified queue does not exist or you do not have access to it.\",\"sqsRequestId\":\"Unrecoverable\"}",
    "dwellTimeMs": 53,
    "attempts": 1,
    "statusCode": 400
  },
  "status": "FAILURE"
}

When the message delivery fails and there is a DLQ attached to the subscription, the message is sent to the DLQ and an additional entry is logged in CloudWatch. This new entry is specific to the delivery to the DLQ and refers to the DLQ ARN as the destination, as shown in the following JSON object.

{
  "notification": {
    "messageMD5Sum": "81c395cbd350da6bedfe3b24db9517b0",
    "messageId": "8959db9d-25c8-57a6-9439-8e5be8f71a1f",
    "topicArn": "arn:aws:sns:us-east-1:123456789012:Rental-Orders",
    "timestamp": "2019-10-04 05:16:52.876"
  },
  "delivery": {
    "deliveryId": "a877c79f-a3ee-5105-9bbd-92596eae0232",
    "destination":"arn:aws:sqs:us-east-1:123456789012:Rental-Fulfilment-Fanout-DLQ",
    "providerResponse": "{\"sqsRequestId\":\"8cef1af5-e86a-519e-ad36-4f33252aa5ec\",\"sqsMessageId\":\"2b742c5c-0750-4ec5-a717-b95897adda8e\"}",
    "dwellTimeMs": 51,
    "attempts": 1,
    "statusCode": 200
  },
  "status": "SUCCESS"
}

By analyzing Amazon CloudWatch Logs entries, you can understand why an SNS message was moved to a DLQ, and then take the required set of steps to recover the message. When you enable delivery status logging in SNS, you can configure the sample rate at which deliveries are logged, from 0% to 100%.
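
When you go through many such entries, a small helper can condense each one to the fields that matter for triage. The following sketch assumes the entry layout shown in the examples above. The summarize_delivery function and its output keys are made up for this illustration, and the sample entry is abbreviated.

```python
import json


def summarize_delivery(log_entry_json, dlq_arns):
    """Condense one delivery status log entry into the fields useful for triage."""
    entry = json.loads(log_entry_json)
    delivery = entry["delivery"]
    # providerResponse is itself a JSON document embedded as a string.
    response = json.loads(delivery.get("providerResponse", "{}"))
    return {
        "messageId": entry["notification"]["messageId"],
        "destination": delivery["destination"],
        "status": entry["status"],
        "statusCode": delivery["statusCode"],
        "errorCode": response.get("ErrorCode"),
        "sentToDlq": delivery["destination"] in dlq_arns,
    }


DLQ = "arn:aws:sqs:us-east-1:123456789012:Rental-Fulfilment-Fanout-DLQ"
failed_entry = json.dumps({
    "notification": {"messageId": "9959db9d-25c8-57a6-9439-8e5be8f71a1f"},
    "delivery": {
        "destination": "arn:aws:sqs:us-east-1:123456789012:Rental-Fulfilment",
        "providerResponse": json.dumps(
            {"ErrorCode": "AWS.SimpleQueueService.NonExistentQueue"}
        ),
        "statusCode": 400,
    },
    "status": "FAILURE",
})
summary = summarize_delivery(failed_entry, dlq_arns={DLQ})
```

Comparing the destination against the known DLQ ARNs distinguishes the original failed delivery from the follow-up entry that records the delivery to the DLQ.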

Encrypting DLQs

When your SNS subscription targets an SQS encrypted queue, you probably want your DLQ to be an SQS encrypted queue as well. This configuration provides consistency in how your messages are encrypted at rest.

To follow this security recommendation, give the CMK you used to encrypt your DLQ a key policy that grants the SNS service principal access to AWS KMS API actions. For example, see the following sample key policy:

{
    "Sid": "GrantSnsAccessToKms",
    "Effect": "Allow",
    "Principal": { "Service": "sns.amazonaws.com" },
    "Action": [ "kms:Decrypt", "kms:GenerateDataKey*" ],
    "Resource": "*"
}

If you have an SNS encrypted topic, but a subscription in this topic points to a DLQ that isn’t an SQS encrypted queue, then messages sidelined to the DLQ aren’t encrypted at rest.

For more information, see Enabling Server-Side Encryption (SSE) for an Amazon SNS Topic with an Amazon SQS Encrypted Queue Subscribed.

Summary

DLQs for SNS, SQS, and Lambda increase the resiliency and durability of your applications. These DLQs address different failure modes, and can be used together.

  • SNS DLQs store messages that failed to be delivered to subscribed endpoints.
  • SQS DLQs store messages that the consumer system failed to process.
  • Lambda DLQs store the messages that resulted in failed asynchronous executions of your functions.

Setting up DLQs for subscriptions, queues, and functions can be done using the AWS Management Console, SDK, CLI, API, or CloudFormation. DLQs are available in all AWS Regions. Start today by running the tutorials:

Sending Push Notifications to iOS 13 Devices with Amazon SNS

Post Syndicated from Brent Meyer original https://aws.amazon.com/blogs/messaging-and-targeting/sending-push-notifications-to-ios-13-devices-with-amazon-sns/

Note: This post was written by Alan Chuang, a Senior Product Manager on the AWS Messaging team.


On September 19, 2019, Apple released iOS 13. This update introduced changes to the Apple Push Notification service (APNs) that can impact your existing workloads. Amazon SNS has made some changes that ensure uninterrupted delivery of push notifications to iOS 13 devices.

iOS 13 introduced a new and required header called apns-push-type. The value of this header informs APNs of the contents of your notification’s payload so that APNs can respond to the message appropriately. The possible values for this header are:

  • alert
  • background
  • voip
  • complication
  • fileprovider
  • mdm

Apple’s documentation indicates that the value of this header “must accurately reflect the contents of your notification’s payload. If there is a mismatch, or if the header is missing on required systems, APNs may return an error, delay the delivery of the notification, or drop it altogether.”

We’ve made some changes to the Amazon SNS API that make it easier for developers to handle this breaking change. When you send push notifications of the alert or background type, Amazon SNS automatically sets the apns-push-type header to the appropriate value for your message. For more information about creating an alert type and a background type notification, see Generating a Remote Notification and Pushing Background Updates to Your App on the Apple Developer website.

Because you might not have enough time to react to this breaking change, Amazon SNS provides two options:

  • If you want to set the apns-push-type header value yourself, or the contents of your notification’s payload require the header value to be set to voip, complication, fileprovider, or mdm, Amazon SNS lets you set the header value as a message attribute using the Amazon SNS Publish API action. For more information, see Specifying Custom APNs Header Values and Reserved Message Attributes for Mobile Push Notifications in the Amazon SNS Developer Guide.
  • If you send push notifications as alert or background type, and if the contents of your notification’s payload follow the format described in the Apple Developer documentation, then Amazon SNS automatically sets the correct header value. To send a background notification, the format requires the aps dictionary to have only the content-available field set to 1. For more information about creating an alert type and a background type notification, see Generating a Remote Notification and Pushing Background Updates to Your App on the Apple Developer website.
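
The rule for telling a background notification from an alert can be illustrated with a short sketch. The infer_push_type function is hypothetical and only mirrors the rule stated above. It is not the logic that Amazon SNS runs, and it does not cover the voip, complication, fileprovider, or mdm types, which you have to set explicitly.

```python
def infer_push_type(payload):
    """Guess the apns-push-type for a payload: 'background' or 'alert'."""
    aps = payload.get("aps", {})
    # Background: the aps dictionary holds only content-available set to 1.
    if aps == {"content-available": 1}:
        return "background"
    return "alert"
```

For example, a payload whose aps dictionary also carries an alert field is treated as an alert here, even if content-available is set.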

We hope these changes make it easier for you to send messages to your customers who use iOS 13. If you have questions about these changes, please open a ticket with our Customer Support team through the AWS Management Console.

Build and automate a serverless data lake using an AWS Glue trigger for the Data Catalog and ETL jobs

Post Syndicated from Saurabh Shrivastava original https://aws.amazon.com/blogs/big-data/build-and-automate-a-serverless-data-lake-using-an-aws-glue-trigger-for-the-data-catalog-and-etl-jobs/

Today, data is flowing from everywhere, whether it is unstructured data from resources like IoT sensors, application logs, and clickstreams, or structured data from transaction applications, relational databases, and spreadsheets. Data has become a crucial part of every business. This has resulted in a need to maintain a single source of truth and automate the entire pipeline, from data ingestion to transformation and analytics, to extract value from the data quickly.

There is a growing concern over the complexity of data analysis as data volume, velocity, and variety increase. The concern stems from the number and complexity of steps it takes to get data to a state that is usable by business users. Data engineering teams often spend most of their time building and optimizing extract, transform, and load (ETL) pipelines. Automating the entire process can reduce the time to value and cost of operations. In this post, we describe how to create a fully automated data cataloging and ETL pipeline to transform your data.

Architecture

In this post, you learn how to build and automate the following architecture.

You build your serverless data lake with Amazon Simple Storage Service (Amazon S3) as the primary data store. Given the scalability and high availability of Amazon S3, it is best suited as the single source of truth for your data.

You can use various techniques to ingest and store data in Amazon S3. For example, you can use Amazon Kinesis Data Firehose to ingest streaming data. You can use AWS Database Migration Service (AWS DMS) to ingest relational data from existing databases. And you can use AWS DataSync to ingest files from an on-premises Network File System (NFS).

Ingested data lands in an Amazon S3 bucket that we refer to as the raw zone. To make that data available, you have to catalog its schema in the AWS Glue Data Catalog. You can do this using an AWS Lambda function invoked by an Amazon S3 trigger to start an AWS Glue crawler that catalogs the data. When the crawler is finished creating the table definition, you invoke a second Lambda function using an Amazon CloudWatch Events rule. This step starts an AWS Glue ETL job to process and output the data into another Amazon S3 bucket that we refer to as the processed zone.

The AWS Glue ETL job converts the data to Apache Parquet format and stores it in the processed S3 bucket. You can modify the ETL job to achieve other objectives, like more granular partitioning, compression, or enriching of the data. Monitoring and notification are integral parts of the automation process. As soon as the ETL job finishes, another CloudWatch rule sends you an email notification using an Amazon Simple Notification Service (Amazon SNS) topic. This notification indicates that your data was successfully processed.

In summary, this pipeline classifies and transforms your data, sending you an email notification upon completion.

Deploy the automated data pipeline using AWS CloudFormation

First, you use AWS CloudFormation templates to create all of the necessary resources. This removes opportunities for manual error, increases efficiency, and ensures consistent configurations over time.

Launch the AWS CloudFormation template with the following Launch stack button.

Be sure to choose the US East (N. Virginia) Region (us-east-1). Then enter the appropriate stack name, email address, and AWS Glue crawler name to create the Data Catalog. Add the AWS Glue database name to save the metadata tables. Acknowledge the IAM resource creation as shown in the following screenshot, and choose Create.

Note: It is important to enter your valid email address so that you get a notification when the ETL job is finished.

This AWS CloudFormation template creates the following resources in your AWS account:

  • Two Amazon S3 buckets to store both the raw data and processed Parquet data.
  • Two AWS Lambda functions: one to create the AWS Glue Data Catalog and another function to publish topics to Amazon SNS.
  • An Amazon Simple Queue Service (Amazon SQS) queue for maintaining the retry logic.
  • An Amazon SNS topic to inform you that your data has been successfully processed.
  • Two CloudWatch Events rules: one rule on the AWS Glue crawler and another on the AWS Glue ETL job.
  • AWS Identity and Access Management (IAM) roles for accessing AWS Glue, Amazon SNS, Amazon SQS, and Amazon S3.

When the AWS CloudFormation stack is ready, check your email and confirm the SNS subscription. Choose the Resources tab and find the details.

Follow these steps to verify your email subscription so that you receive an email alert as soon as your ETL job finishes.

  1. On the Amazon SNS console, in the navigation pane, choose Topics. An SNS topic named SNSProcessedEvent appears in the display.

  2. Choose the ARN. The topic details page appears, listing the email subscription as Pending confirmation. Be sure to confirm the subscription for your email address as provided in the Endpoint column.

If you don’t see an email address, or the link is showing as not valid in the email, choose the corresponding subscription endpoint. Then choose Request confirmation to confirm your subscription. Be sure to check your email junk folder for the request confirmation link.

Configure an Amazon S3 bucket event trigger

In this section, you configure a trigger on the raw S3 bucket. When new data lands in the bucket, the trigger invokes GlueTriggerLambda, which was created in the AWS CloudFormation deployment.

To configure notifications:

  1. Open the Amazon S3 console.
  2. Choose the source bucket. In this case, the bucket name contains raws3bucket, for example, <stackname>-raws3bucket-1k331rduk5aph.
  3. Go to the Properties tab, and under Advanced settings, choose Events.

  4. Choose Add notification and configure a notification with the following settings:
    • Name: Enter a name of your choice. In this example, it is crawlerlambdaTrigger.
    • Events: Select the All object create events check box to create the AWS Glue Data Catalog when you upload the file.
    • Send to: Choose Lambda function.
    • Lambda: Choose the Lambda function that was created in the deployment section. Your Lambda function should contain the string GlueTriggerLambda.

See the following screenshot for all the settings. When you’re finished, choose Save.

For more details on configuring events, see How Do I Enable and Configure Event Notifications for an S3 Bucket? in the Amazon S3 Console User Guide.
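If you prefer to script the same notification instead of using the console, the settings could be expressed roughly as follows. This is a sketch under assumptions: the function ARN and bucket name are placeholders, and `s3_client` is expected to be a boto3 S3 client that you create yourself.

```python
def build_notification_configuration(lambda_arn, name="crawlerlambdaTrigger"):
    """Mirror the console settings: all object-create events go to one Lambda function."""
    return {
        "LambdaFunctionConfigurations": [
            {
                "Id": name,
                "LambdaFunctionArn": lambda_arn,
                "Events": ["s3:ObjectCreated:*"],
            }
        ]
    }


def configure_trigger(s3_client, bucket, lambda_arn):
    """Apply the configuration; s3_client is a boto3 S3 client, e.g. boto3.client("s3")."""
    s3_client.put_bucket_notification_configuration(
        Bucket=bucket,
        NotificationConfiguration=build_notification_configuration(lambda_arn),
    )


# Placeholder ARN for the function whose name contains GlueTriggerLambda.
config = build_notification_configuration(
    "arn:aws:lambda:us-east-1:123456789012:function:example-GlueTriggerLambda"
)
```

Note that the console grants Amazon S3 permission to invoke the function for you. When you configure the notification through the API, you need to add that permission to the function yourself before the call succeeds.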

Download the dataset

For this post, you use a publicly available New York green taxi dataset in CSV format. You upload monthly data to your raw zone and perform automated data cataloging using an AWS Glue crawler. After cataloging, an automated AWS Glue ETL job triggers to transform the monthly green taxi data to Parquet format and store it in the processed zone.

You can download the raw dataset from the NYC Taxi & Limousine Commission trip record data site. Download the monthly green taxi dataset and upload only one month of data. For example, first upload only the green taxi January 2018 data to the raw S3 bucket.

Automate the Data Catalog with an AWS Glue crawler

One of the important aspects of a modern data lake is to catalog the available data so that it’s easily discoverable. To run ETL jobs or ad hoc queries against your data lake, you must first determine the schema of the data along with other metadata information like location, format, and size. An AWS Glue crawler makes this process easy.

After you upload the data into the raw zone, the Amazon S3 trigger that you created earlier in the post invokes the GlueTriggerLambda function. This function creates an AWS Glue Data Catalog that stores metadata information inferred from the data that was crawled.
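The function deployed by the stack is not reproduced in this post, but a handler of this kind might look like the following sketch. The environment variable `CRAWLER_NAME` and its fallback value are assumptions for illustration, and the optional `glue_client` argument exists only so the handler can be exercised without AWS credentials.

```python
import os


def lambda_handler(event, context, glue_client=None):
    """Start an AWS Glue crawler when Amazon S3 reports newly created objects."""
    if glue_client is None:
        import boto3  # imported lazily so the sketch can be tested with a stub client

        glue_client = boto3.client("glue")

    # Hypothetical configuration: the crawler name is read from an environment variable.
    crawler_name = os.environ.get("CRAWLER_NAME", "my-crawler")

    # Each S3 event record names the object that was created.
    keys = [record["s3"]["object"]["key"] for record in event.get("Records", [])]

    glue_client.start_crawler(Name=crawler_name)
    return {"crawler": crawler_name, "objects": keys}
```

This sketch does not attempt the retry logic for which the stack provisions an SQS queue, so a call made while the crawler is already running would simply raise an error.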

Open the AWS Glue console. You should see the database, table, and crawler that were created using the AWS CloudFormation template. Your AWS Glue crawler should appear as follows.

Browse to the table using the left navigation, and you will see the table in the database that you created earlier.

Choose the table name, and further explore the metadata discovered by the crawler, as shown following.

You can also view the columns, data types, and other details. In the following screenshot, the AWS Glue crawler has created a schema from the files available in Amazon S3 by determining each column name and its data type. You can use this schema to create an external table.

Author ETL jobs with AWS Glue

AWS Glue provides a managed Apache Spark environment to run your ETL job without maintaining any infrastructure, with a pay-as-you-go pricing model.

Open the AWS Glue console and choose Jobs under the ETL section to start authoring an AWS Glue ETL job. Give the job a name of your choice, and note the name because you’ll need it later. Choose the already created IAM role with the name containing <stackname>-GlueLabRole, as shown following. Keep the other default options.

AWS Glue generates the required Python or Scala code, which you can customize as per your data transformation needs. In the Advanced properties section, choose Enable in the Job bookmark list to avoid reprocessing old data.

On the next page, choose your raw Amazon S3 bucket as the data source, and choose Next. On the Data target page, choose the processed Amazon S3 bucket as the data target path, and choose Parquet as the Format.

On the next page, you can make schema changes as required, such as changing column names, dropping ones that you’re less interested in, or even changing data types. AWS Glue generates the ETL code accordingly.

Lastly, review your job parameters, and choose Save Job and Edit Script, as shown following.

On the next page, you can modify the script further as per your data transformation requirements. For this post, you can leave the script as is. In the next section, you automate the execution of this ETL job.

Automate ETL job execution

As the frequency of data ingestion increases, you will want to automate the ETL job to transform the data. Automating this process helps reduce operational overhead and frees your data engineering team to focus on more critical tasks.

AWS Glue is optimized for processing data in batches. You can configure it to process data in batches on a set time interval. How often you run a job is determined by how recent the end user expects the data to be and the cost of processing. For information about the different methods, see Triggering Jobs in AWS Glue in the AWS Glue Developer Guide.

First, you need to make one-time changes and configure your ETL job name in the Lambda function and the CloudWatch Events rule. On the console, open the ETLJobLambda Lambda function, which was created using the AWS CloudFormation stack.

Choose the Lambda function link that appears, and explore the code. Change the JobName value to the ETL job name that you created in the previous step, and then choose Save.

As shown in the following screenshot, you will see an Amazon CloudWatch Events rule named CrawlerEventRule that is associated with an AWS Lambda function. When the CloudWatch Events rule receives a success status, it triggers the ETLJobLambda Lambda function.
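To make the crawler-to-job handoff concrete, here is a rough Python sketch of what such a rule and function could look like. The job name is a placeholder for the name you chose earlier, and the event pattern is an assumption about how a rule on crawler success is typically written; the rule and function that the stack deploys may differ.

```python
# Placeholder: replace with the ETL job name you created earlier.
JOB_NAME = "my-etl-job"

# Assumed event pattern for a rule that fires when a crawler finishes successfully.
CRAWLER_SUCCEEDED_PATTERN = {
    "source": ["aws.glue"],
    "detail-type": ["Glue Crawler State Change"],
    "detail": {"state": ["Succeeded"]},
}


def lambda_handler(event, context, glue_client=None):
    """Start the AWS Glue ETL job in response to the crawler success event."""
    if glue_client is None:
        import boto3  # imported lazily so the sketch can be tested with a stub client

        glue_client = boto3.client("glue")

    response = glue_client.start_job_run(JobName=JOB_NAME)
    return response["JobRunId"]
```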

Now you are all set to trigger your AWS Glue ETL job as soon as you upload a file in the raw S3 bucket. Before testing your data pipeline, set up the monitoring and alerts.

Monitoring and notification with Amazon CloudWatch Events

Suppose that you want to receive a notification over email when your AWS Glue ETL job is completed. To achieve that, the CloudWatch Events rule OpsEventRule was deployed from the AWS CloudFormation template in the data pipeline deployment section. This CloudWatch Events rule monitors the status of the AWS Glue ETL job and sends an email notification using an SNS topic upon successful completion of the job.

As the following image shows, you configure your AWS Glue job name in the Event pattern section in CloudWatch. The event triggers an SNS topic configured as a target when the AWS Glue job state changes to SUCCEEDED. This SNS topic sends an email notification to the email address that you provided in the deployment section to receive notification.
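An event pattern of this shape could be written as follows. This is a sketch: the job name is a placeholder, and the `matches` helper is a deliberately simplified stand-in for CloudWatch Events matching that handles only exact-value lists, included so that you can see which events the pattern would select.

```python
def build_job_succeeded_pattern(job_name):
    """Event pattern that selects successful runs of one AWS Glue job."""
    return {
        "source": ["aws.glue"],
        "detail-type": ["Glue Job State Change"],
        "detail": {"jobName": [job_name], "state": ["SUCCEEDED"]},
    }


def matches(pattern, event):
    """Simplified matcher: every pattern leaf is a list of allowed exact values."""
    for key, expected in pattern.items():
        if key not in event:
            return False
        if isinstance(expected, dict):
            if not isinstance(event[key], dict) or not matches(expected, event[key]):
                return False
        elif event[key] not in expected:
            return False
    return True


pattern = build_job_succeeded_pattern("my-etl-job")  # placeholder job name

succeeded = {
    "source": "aws.glue",
    "detail-type": "Glue Job State Change",
    "detail": {"jobName": "my-etl-job", "state": "SUCCEEDED", "jobRunId": "jr_1"},
}
failed = {**succeeded, "detail": {**succeeded["detail"], "state": "FAILED"}}

print(matches(pattern, succeeded), matches(pattern, failed))
```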

Let’s make one-time configuration changes in the CloudWatch Events rule OpsEventRule to capture the status of the AWS Glue ETL job.

  1. Open the CloudWatch console.
  2. In the navigation pane, under Events, choose Rules. Choose the rule name that contains OpsEventRule, as shown following.

  3. In the upper-right corner, choose Actions, Edit.

  4. Replace Your-ETL-jobName with the ETL job name that you created in the previous step.

  5. Scroll down and choose Configure details. Then choose Update rule.

Now that you have set up an entire data pipeline in an automated way with the appropriate notifications and alerts, it’s time to test your pipeline. If you upload new monthly data to the raw Amazon S3 bucket (for example, upload the NY green taxi February 2018 CSV), it triggers the GlueTriggerLambda AWS Lambda function. You can navigate to the AWS Glue console, where you can see that the AWS Glue crawler is running.

Upon completion of the crawler, the CloudWatch Events rule CrawlerEventRule triggers your ETLJobLambda Lambda function. You can now see that the AWS Glue ETL job is running.

When the ETL job is successful, the CloudWatch Events rule OpsEventRule sends an email notification to you using an Amazon SNS topic, as shown following, hence completing the automation cycle.

Be sure to check your processed Amazon S3 bucket, where you will find transformed data processed by your automated ETL pipeline. Now that the processed data is ready in Amazon S3, you need to run the AWS Glue crawler on this Amazon S3 location. The crawler creates a metadata table with the relevant schema in the AWS Glue Data Catalog.

After the Data Catalog table is created, you can execute standard SQL queries using Amazon Athena and visualize the data using Amazon QuickSight. To learn more, see the blog post Harmonize, Query, and Visualize Data from Various Providers using AWS Glue, Amazon Athena, and Amazon QuickSight.
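As a small illustration of querying the cataloged data, the following sketch builds the parameters for an Athena query. The database name, table name, and results location are all placeholders, because the actual names depend on what you entered during deployment and on what the crawler generated.

```python
def build_athena_query(database, table, output_location):
    """Build parameters for Athena's StartQueryExecution that count rows in one table."""
    return {
        "QueryString": f'SELECT COUNT(*) AS trips FROM "{table}"',
        "QueryExecutionContext": {"Database": database},
        "ResultConfiguration": {"OutputLocation": output_location},
    }


params = build_athena_query(
    database="my_glue_database",                     # placeholder database name
    table="processed_green_taxi",                    # placeholder table name
    output_location="s3://my-athena-results/demo/",  # placeholder results bucket
)

# To run: boto3.client("athena").start_query_execution(**params)
```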

Conclusion

Having an automated serverless data lake architecture lessens the burden of managing data from its source to destination—including discovery, audit, monitoring, and data quality. With an automated data pipeline across organizations, you can identify relevant datasets and extract value much faster than before. The advantage of reducing the time to analysis is that businesses can analyze the data as it becomes available in real time. From the BI tools, queries return results much faster for a single dataset than for multiple databases.

Business analysts can now get their job done faster, and data engineering teams can free themselves from repetitive tasks. You can extend it further by loading your data into a data warehouse like Amazon Redshift or making it available for machine learning via Amazon SageMaker.

 


About the Authors

Saurabh Shrivastava is a partner solutions architect and big data specialist working with global systems integrators. He works with AWS partners and customers to provide them with architectural guidance for building scalable architecture in hybrid and AWS environments. He enjoys spending time with his family outdoors and traveling to new destinations to discover new cultures.

 

 

 

Luis Lopez Soria is a partner solutions architect and serverless specialist working with global systems integrators. He works with AWS partners and customers to help them with adoption of the cloud operating model at a large scale. He enjoys doing sports in addition to traveling around the world exploring new foods and cultures.

 

 

 

Chirag Oswal is a partner solutions architect and AR/VR specialist working with global systems integrators. He works with AWS partners and customers to help them with adoption of the cloud operating model at a large scale. He enjoys video games and travel.

Enriching Event-Driven Architectures with AWS Event Fork Pipelines

Post Syndicated from Rachel Richardson original https://aws.amazon.com/blogs/compute/enriching-event-driven-architectures-with-aws-event-fork-pipelines/

This post is courtesy of Otavio Ferreira, Mgr, Amazon SNS, and James Hood, Sr. Software Dev Engineer

Many customers are choosing to build event-driven applications in which subscriber services automatically perform work in response to events triggered by publisher services. This architectural pattern can make services more reusable, interoperable, and scalable.

Customers often fork event processing into pipelines that address common event handling requirements, such as event storage, backup, search, analytics, or replay. To help you build event-driven applications even faster, AWS introduces Event Fork Pipelines, a collection of open-source event handling pipelines that you can subscribe to Amazon SNS topics in your AWS account.

Event Fork Pipelines is a suite of open-source nested applications, based on the AWS Serverless Application Model (AWS SAM). You can deploy it directly from the AWS Serverless Application Repository into your AWS account.

Event Fork Pipelines is built on top of serverless services, including Amazon SNS, Amazon SQS, and AWS Lambda. These services provide serverless building blocks that help you build fully managed, highly available, and scalable event-driven platforms. Lambda enables you to build event-driven microservices as serverless functions. SNS and SQS provide serverless topics and queues for integrating these microservices and other distributed systems in your architecture. These building blocks are at the core of modern application development best practices.

Surfacing the event fork pattern

At AWS, we’ve worked closely with customers across market segments and geographies on event-driven architectures. For example:

  • Financial platforms that handle events related to bank transactions and stock ticks
  • Retail platforms that trigger checkout and fulfillment events

At scale, event-driven architectures often require a set of supporting services to address common requirements such as system auditability, data discoverability, compliance, business insights, and disaster recovery. Translated to AWS, customers often connect event-driven applications to services such as Amazon S3 for event storage and backup, and to Amazon Elasticsearch Service for event search and analytics. Also, customers often implement an event replay mechanism to recover from failure modes in their applications.

AWS created Event Fork Pipelines to encapsulate these common requirements, reducing the amount of effort required for you to connect your event-driven architectures to these supporting AWS services.

AWS then started sharing this pattern more broadly, so more customers could benefit. At the 2018 AWS re:Invent conference in Las Vegas, Amazon CTO Werner Vogels announced the launch of nested applications in his keynote. Werner shared the Event Fork Pipelines pattern with the audience as an example of common application logic that had been encapsulated as a set of nested applications.

The following reference architecture diagram shows an application supplemented by three nested applications:

Each pipeline is subscribed to the same SNS topic, and can process events in parallel as these events are published to the topic. Each pipeline is independent and can set its own subscription filter policy. That way, it processes only the subset of events that it’s interested in, rather than all events published to the topic.

Figure 1 – Reference architecture using Event Fork Pipelines

The three event fork pipelines are placed alongside your regular event processing pipelines, which are potentially already subscribed to your SNS topic. Therefore, you don’t have to change any portion of your current message publisher to take advantage of Event Fork Pipelines in your existing workloads. The following sections describe these pipelines and how to deploy them in your system architecture.

Understanding the catalog of event fork pipelines

In the abstract, Event Fork Pipelines is a serverless design pattern. Concretely, Event Fork Pipelines is also a suite of nested serverless applications, based on AWS SAM. You deploy the nested applications directly from the AWS Serverless Application Repository to your AWS account, to enrich your event-driven platforms. You can deploy them individually in your architecture, as needed.

Here’s more information about each nested application in the Event Fork Pipelines suite.

Event Storage & Backup pipeline

Figure 2 – Event Fork Pipeline for Event Storage & Backup

The preceding diagram shows the Event Storage & Backup pipeline. You can subscribe this pipeline to your SNS topic to automatically back up the events flowing through your system. This pipeline is composed of the following resources:

  • An SQS queue that buffers the events delivered by the SNS topic
  • A Lambda function that automatically polls for these events in the queue and pushes them into an Amazon Kinesis Data Firehose delivery stream
  • An S3 bucket that durably backs up the events loaded by the stream

You can configure this pipeline to fine-tune the behavior of your delivery stream. For example, you can configure your pipeline so that the underlying delivery stream buffers, transforms, and compresses your events before loading them into the bucket. As events are loaded, you can use Amazon Athena to query the bucket using standard SQL queries. Also, you can configure the pipeline to either reuse an existing S3 bucket or create a new one for you.
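The Lambda function in this pipeline is not listed in the post, but its role can be sketched in Python as follows. The sketch assumes that raw message delivery is turned off, so each SQS message body is the SNS JSON envelope, and it uses a placeholder delivery stream name. The optional `firehose_client` argument exists only so the handler can be exercised without AWS credentials.

```python
import json


def to_firehose_records(sqs_event):
    """Unwrap SNS envelopes from an SQS batch into Firehose records."""
    records = []
    for record in sqs_event.get("Records", []):
        envelope = json.loads(record["body"])  # SNS envelope (raw delivery assumed off)
        # Newline-delimit events so that the objects written to S3 are easy to query.
        records.append({"Data": (envelope["Message"] + "\n").encode("utf-8")})
    return records


def lambda_handler(event, context, firehose_client=None, stream_name="my-backup-stream"):
    """Forward a batch of events from the queue to the delivery stream."""
    if firehose_client is None:
        import boto3  # imported lazily so the sketch can be tested with a stub client

        firehose_client = boto3.client("firehose")

    records = to_firehose_records(event)
    if records:
        firehose_client.put_record_batch(DeliveryStreamName=stream_name, Records=records)
    return len(records)
```

A production function would also inspect the response for partially failed batches, which this sketch ignores.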

Event Search & Analytics pipeline

Figure 3 – Event Fork Pipeline for Event Search & Analytics

The preceding diagram shows the Event Search & Analytics pipeline. You can subscribe this pipeline to your SNS topic to index in a search domain the events flowing through your system, and then run analytics on them. This pipeline is composed of the following resources:

  • An SQS queue that buffers the events delivered by the SNS topic
  • A Lambda function that polls events from the queue and pushes them into a Data Firehose delivery stream
  • An Amazon ES domain that indexes the events loaded by the delivery stream
  • An S3 bucket that stores the dead-letter events that couldn’t be indexed in the search domain

You can configure this pipeline to fine-tune your delivery stream in terms of event buffering, transformation and compression. You can also decide whether the pipeline should reuse an existing Amazon ES domain in your AWS account or create a new one for you. As events are indexed in the search domain, you can use Kibana to run analytics on your events and update visual dashboards in real time.

Event Replay pipeline

Figure 4 – Event Fork Pipeline for Event Replay

The preceding diagram shows the Event Replay pipeline. You can subscribe this pipeline to your SNS topic to record the events that have been processed by your system for up to 14 days. You can then reprocess them in case your platform is recovering from a failure or a disaster. This pipeline is composed of the following resources:

  • An SQS queue that buffers the events delivered by the SNS topic
  • A Lambda function that polls events from the queue and redrives them into your regular event processing pipeline, which is also subscribed to your topic

By default, the replay function is disabled, which means it isn’t redriving your events. If the events need to be reprocessed, your operators must enable the replay function.
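Conceptually, the replay function copies each buffered event into the queue of your regular pipeline. The following Python sketch illustrates that idea only; it is not the code of the published pipeline. The destination queue URL is a placeholder, and the optional `sqs_client` argument exists only so the handler can be exercised without AWS credentials.

```python
# Placeholder URL for the queue of the regular event processing pipeline.
DESTINATION_QUEUE_URL = "https://sqs.us-east-1.amazonaws.com/123456789012/CheckoutQueue"


def lambda_handler(event, context, sqs_client=None, queue_url=DESTINATION_QUEUE_URL):
    """Redrive each buffered event into the regular processing queue."""
    if sqs_client is None:
        import boto3  # imported lazily so the sketch can be tested with a stub client

        sqs_client = boto3.client("sqs")

    redriven = 0
    for record in event.get("Records", []):
        # Forward the body unchanged so the regular consumer sees the original event.
        sqs_client.send_message(QueueUrl=queue_url, MessageBody=record["body"])
        redriven += 1
    return redriven
```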

Applying event fork pipelines in a use case

This is how everything comes together. The following scenario describes an event-driven, serverless ecommerce application that uses the Event Fork Pipelines pattern. This example ecommerce application is available in AWS Serverless Application Repository. You can deploy it to your AWS account using the Lambda console, test it, and look at its source code in GitHub.

Figure 5 – Example ecommerce application using Event Fork Pipelines

The ecommerce application takes orders from buyers through a RESTful API hosted by Amazon API Gateway and backed by a Lambda function named CheckoutFunction. This function publishes all orders received to an SNS topic named CheckoutEventsTopic, which in turn fans out the orders to four different pipelines. The first pipeline is the regular checkout-processing pipeline designed and implemented by you as the ecommerce application owner. This pipeline has the following resources:

  • An SQS queue named CheckoutQueue that buffers all orders received
  • A Lambda function named CheckoutFunction that polls the queue to process these orders
  • An Amazon DynamoDB table named CheckoutTable that securely saves all orders as they’re placed

The components of the system described thus far handle what you might think of as the core business logic. But in addition, you should address the set of elements necessary for making the system resilient, compliant, and searchable:

  • Backing up all orders securely. Compressed backups must be encrypted at rest, with sensitive payment details removed for security and compliance purposes.
  • Searching and running analytics on orders, if the amount is $100 or more. Analytics are needed for key ecommerce metrics, such as average ticket size, average shipping time, most popular products, and preferred payment options.
  • Replaying recent orders. If the fulfillment process is disrupted at any point, you should be able to replay the most recent orders from up to two weeks. This is a key requirement that guarantees the continuity of the ecommerce business.

Rather than implementing all the event processing logic yourself, you can choose to subscribe Event Fork Pipelines to your existing SNS topic CheckoutEventsTopic. The pipelines are configured as follows:

  • The Event Storage & Backup pipeline is configured to transform data as follows:
    • Remove credit card details
    • Buffer data for 60 seconds
    • Compress data using GZIP
    • Encrypt data using the default customer master key (CMK) for S3

This CMK is managed by AWS and powered by AWS Key Management Service (AWS KMS). For more information, see Choosing Amazon S3 for Your Destination, Data Transformation, and Configuration Settings in the Amazon Kinesis Data Firehose Developer Guide.

  • The Event Search & Analytics pipeline is configured with:
    • An index retry duration of 30 seconds
    • A bucket for storing orders that failed to be indexed in the search domain
    • A filter policy to restrict the set of orders that are indexed

For more information, see Choosing Amazon ES for Your Destination, in the Amazon Kinesis Data Firehose Developer Guide.

  • The Event Replay pipeline is configured with the SQS queue name that is part of the regular checkout processing pipeline. For more information, see Queue Name and URL in the Amazon SQS Developer Guide.

The filter policy, shown in JSON format, is set in the configuration for the Event Search & Analytics pipeline. This filter policy matches only incoming orders in which the total amount is $100 or more. For more information, see Message Filtering in the Amazon SNS Developer Guide.


{
    "amount": [
        { "numeric": [ ">=", 100 ] }
    ]
}
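To see which orders this policy lets through, the following Python sketch evaluates it against a few sample message attributes. It is a simplified stand-in for the matching that Amazon SNS performs, covering only exact values and numeric comparisons, and the sample amounts are made up.

```python
import operator

FILTER_POLICY = {"amount": [{"numeric": [">=", 100]}]}

_OPERATORS = {
    "=": operator.eq,
    ">": operator.gt,
    ">=": operator.ge,
    "<": operator.lt,
    "<=": operator.le,
}


def _condition_matches(condition, value):
    """Check one policy condition: a numeric comparison or an exact value."""
    if isinstance(condition, dict) and "numeric" in condition:
        if isinstance(value, bool) or not isinstance(value, (int, float)):
            return False
        spec = condition["numeric"]
        # The spec is a flat list of (operator, operand) pairs; all must hold.
        return all(
            _OPERATORS[spec[i]](value, spec[i + 1]) for i in range(0, len(spec), 2)
        )
    return condition == value


def matches_policy(policy, attributes):
    """Every attribute named in the policy must satisfy at least one condition."""
    for name, conditions in policy.items():
        if name not in attributes:
            return False
        if not any(_condition_matches(c, attributes[name]) for c in conditions):
            return False
    return True


for amount in (99.99, 100, 250):
    print(amount, matches_policy(FILTER_POLICY, {"amount": amount}))
```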

By using the Event Fork Pipelines pattern, you avoid the development overhead associated with coding undifferentiated logic for handling events.

Event Fork Pipelines can be deployed directly from AWS Serverless Application Repository into your AWS account.

Deploying event fork pipelines

Event Fork Pipelines is available as a set of public apps in the AWS Serverless Application Repository (to find the apps, select the ‘Show apps that create custom IAM roles or resource policies’ check box under the search bar). It can be deployed and tested manually via the Lambda console. In a production scenario, we recommend embedding fork pipelines within the AWS SAM template of your overall application. The nested applications feature enables you to do this by adding an AWS::Serverless::Application resource to your AWS SAM template. The resource references the ApplicationId and SemanticVersion values of the application to nest.

For example, you can include the Event Storage & Backup pipeline as a nested application by adding the following YAML snippet to the Resources section of your AWS SAM template:


Backup:
  Type: AWS::Serverless::Application
  Properties:
    Location:
      ApplicationId: arn:aws:serverlessrepo:us-east-1:012345678901:applications/fork-event-storage-backup-pipeline
      SemanticVersion: 1.0.0
    Parameters:
      # SNS topic ARN whose messages should be backed up to the S3 bucket.
      TopicArn: !Ref MySNSTopic

When specifying parameter values, you can use AWS CloudFormation intrinsic functions to reference other resources in your template. In the preceding example, the TopicArn parameter is filled in by referencing an AWS::SNS::Topic called MySNSTopic, defined elsewhere in the AWS SAM template. For more information, see Intrinsic Function Reference in the AWS CloudFormation User Guide.

To copy the YAML required for nesting, in the Lambda console page for an AWS Serverless Application Repository application, choose Copy as SAM Resource.

Authoring new event fork pipelines

We invite you to fork the Event Fork Pipelines repository in GitHub and submit pull requests to contribute new pipelines. In addition to event storage and backup, event search and analytics, and event replay, what other common event handling requirements have you seen?

We look forward to seeing what you’ll come up with for extending the Event Fork Pipelines suite.

Summary

Event Fork Pipelines is a serverless design pattern and a suite of open-source nested serverless applications, based on AWS SAM. You can deploy it directly from AWS Serverless Application Repository to enrich your event-driven system architecture. Event Fork Pipelines lets you store, back up, replay, search, and run analytics on the events flowing through your system. There’s no need to write code, manually stitch resources together, or set up infrastructure.

You can deploy Event Fork Pipelines in any AWS Region that supports the underlying AWS services used in the pipelines. There are no additional costs associated with Event Fork Pipelines itself, and you pay only for using the AWS resources inside each nested application.

Get started today by deploying the example ecommerce application or searching for Event Fork Pipelines in AWS Serverless Application Repository.

Using AWS Lambda and Amazon SNS to Get File Change Notifications from AWS CodeCommit

Post Syndicated from Eason Cao original https://aws.amazon.com/blogs/devops/using-aws-lambda-and-amazon-sns-to-get-file-change-notifications-from-aws-codecommit/

Notifications are an important part of DevOps workflows. Although you can set them up from any stage in the CI or CD pipelines, in this blog post, I will show you how to integrate AWS Lambda and Amazon SNS to extend AWS CodeCommit. Specifically, the solution described in this post makes it possible for you to receive detailed notifications from Amazon SNS about file changes and commit messages when an update is pushed to AWS CodeCommit.

Amazon SNS is a flexible, fully managed notifications service. It coordinates the delivery of messages to receivers. With Amazon SNS, you can fan out messages to a large number of subscribers, including distributed systems and services, and mobile devices. It is easy to set up and operate, and it reliably sends notifications to all your endpoints at any scale.

AWS Lambda is our popular serverless service that lets you run code without provisioning or managing servers. In the example used in this post, I use a Lambda function to publish a message to an Amazon SNS topic so that subscribers get an update notification.

Amazon CloudWatch is a monitoring and management service. It can collect operational data of AWS resources in the form of events. You can set up simple rules in Amazon CloudWatch to detect changes to your AWS resources. After CloudWatch captures the update event from your AWS resources, it can trigger specific targets to perform other actions (for example, to invoke a Lambda function).

To help you quickly deploy the solution, I have created an AWS CloudFormation template. AWS CloudFormation is a management tool that provides a common language to describe and provision all of the infrastructure resources in AWS.

 

Overview

The following diagram shows how to use AWS services to receive the CodeCommit file change event and details.

AWS CodeCommit supports several useful CloudWatch events, which can notify you of changes to AWS resources. By setting up simple rules, you can detect branch or repository changes. In this example, I create a CloudWatch event rule for an AWS CodeCommit repository so that any designated event invokes a Lambda function. When a change is made to the CodeCommit repository, CloudWatch detects the event and invokes the customized Lambda function.
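
A rule like this is driven by an event pattern. As a minimal sketch (not taken from the sample template), the pattern below would match branch creation and update events for a single repository. The account ID in the ARN and the helper name `build_event_pattern` are placeholders chosen for illustration.

```python
import json


def build_event_pattern(repo_arn, branch):
    """Build a CloudWatch Events pattern for pushes to one CodeCommit branch."""
    return {
        "source": ["aws.codecommit"],
        "detail-type": ["CodeCommit Repository State Change"],
        "resources": [repo_arn],
        "detail": {
            "event": ["referenceCreated", "referenceUpdated"],
            "referenceType": ["branch"],
            "referenceName": [branch],
        },
    }


# Placeholder ARN: 111122223333 is an illustrative account ID.
pattern = build_event_pattern(
    "arn:aws:codecommit:us-east-2:111122223333:sample-repo", "master"
)
print(json.dumps(pattern, indent=2))
```

Restricting `referenceName` keeps pushes to other branches from invoking the function.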

When this Lambda function is triggered, the following steps are executed:

  1. Use the GetCommit operation in the CodeCommit API to get the latest commit. I want to compare the parent commit IDs with the last commit.
  2. For each commit, use the GetDifferences operation to get a list of each file that was added, modified, or deleted.
  3. Group the modification information from the comparison result and publish the message template to an SNS topic defined in the Lambda environment variable.
  4. Allow reviewers to subscribe to the SNS topic. Any update message from CodeCommit is published to subscribers.
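
The steps above can be sketched roughly as follows. This is an illustration under stated assumptions, not the published source code: it compares the commit only with its first parent, the environment variable name `SNS_TOPIC_ARN` is hypothetical, and the clients are passed in as arguments so that the logic can be exercised without AWS credentials.

```python
import os

CHANGE_TYPES = {"A": "Addition", "M": "Modification", "D": "Deletion"}


def collect_differences(codecommit, repo, commit_id):
    """Return the commit and the files changed relative to its first parent."""
    commit = codecommit.get_commit(repositoryName=repo, commitId=commit_id)["commit"]
    kwargs = {"repositoryName": repo, "afterCommitSpecifier": commit_id}
    if commit.get("parents"):
        kwargs["beforeCommitSpecifier"] = commit["parents"][0]
    differences = []
    while True:
        page = codecommit.get_differences(**kwargs)
        differences.extend(page.get("differences", []))
        if not page.get("NextToken"):
            return commit, differences
        kwargs["NextToken"] = page["NextToken"]


def format_message(commit, differences):
    """Render the commit metadata and one line per changed file."""
    author = commit["author"]
    lines = [
        f"Commit ID: {commit['commitId']}",
        f"author: {author['name']} ({author['email']}) - {author['date']}",
        f"message: {commit['message'].strip()}",
        "",
    ]
    for diff in differences:
        # Deleted files only have a beforeBlob; added files only an afterBlob.
        blob = diff.get("afterBlob") or diff["beforeBlob"]
        lines.append(
            f"File: {blob['path']} {CHANGE_TYPES[diff['changeType']]}"
            f" - Blob ID: {blob['blobId']}"
        )
    return "\n".join(lines)


def lambda_handler(event, context, codecommit=None, sns=None):
    if codecommit is None or sns is None:
        import boto3  # provided by the Lambda Python runtime

        codecommit = codecommit or boto3.client("codecommit")
        sns = sns or boto3.client("sns")
    detail = event["detail"]
    commit, differences = collect_differences(
        codecommit, detail["repositoryName"], detail["commitId"]
    )
    message = format_message(commit, differences)
    sns.publish(
        TopicArn=os.environ["SNS_TOPIC_ARN"],  # hypothetical variable name
        Subject=f"Update on {detail['repositoryName']}",
        Message=message,
    )
    return message
```

Injecting the clients is a design choice made for this sketch; a deployed function would normally create them once at module load.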

I’ve used Python and Boto3 to implement this function. The full source code has been published on GitHub. You can find the example in the aws-codecommit-file-change-publisher repository.

 

Getting started

There is an AWS CloudFormation template, codecommit-sns-publisher.yml, in the source code. This template uses the AWS Serverless Application Model to define required components of the CodeCommit notification serverless application in simple and clean syntax.

The template is translated to an AWS CloudFormation stack and deploys an SNS topic, CloudWatch event rule, and Lambda function. The Lambda function code already demonstrates a simple notification use case. You can use the sample code to define your own logic and extend the function by using other APIs provided in the AWS SDK for Python (Boto3).

Prerequisites

Before you deploy this example, you must use the AWS CloudFormation template to create a CodeCommit repository. In this example, I have created an empty repository, sample-repo, in the Ohio (us-east-2) Region to demonstrate a scenario in which your repository has a file change or other update on a CodeCommit branch. If you already have a CodeCommit repository, follow these steps to deploy the template and Lambda function.

To deploy the AWS CloudFormation template and Lambda function

1. Download the source code from the aws-codecommit-file-change-publisher repository.

2. Sign in to the AWS Management Console and choose the AWS Region where your CodeCommit repository is located. Create an S3 bucket to hold the AWS Lambda deployment package, codecommit-sns-publisher.zip. For information, see How Do I Create an S3 Bucket? in the Amazon S3 Console User Guide.

3. Upload the Lambda deployment package to the S3 bucket.

In this example, I created an S3 bucket named codecommit-sns-publisher in the Ohio (us-east-2) Region and uploaded the deployment package from the Amazon S3 console.

4. In the AWS Management Console, choose CloudFormation. You can also open the AWS CloudFormation console directly at https://console.aws.amazon.com/cloudformation.

5. Choose Create Stack.

6. On the Select Template page, choose Upload a template to Amazon S3, and then choose the codecommit-sns-publisher.yml template.

7. Specify the following parameters:

  • Stack Name: codecommit-sns-publisher (You can use your own stack name, if you prefer.)
  • CodeS3BucketLocation: codecommit-sns-publisher (This is the S3 bucket name where you put the sample code.)
  • CodeS3KeyLocation: codecommit-sns-publisher.zip (This is the key name of the sample code S3 object. The object should be a zip file.)
  • CodeCommitRepo: sample-repo (The name of your CodeCommit repository.)
  • MainBranchName: master (Specify the branch whose updates should trigger a message to the SNS topic.)
  • NotificationEmailAddress: [email protected] (This is the email address you would like to use to subscribe to the SNS topic. The CloudFormation template creates an SNS topic to publish notifications to subscribers.)

8. Choose Next.

9. On the Review page, under Capabilities, choose the following options:

  • I acknowledge that AWS CloudFormation might create IAM resources.
  • I acknowledge that AWS CloudFormation might create IAM resources with custom names.

10. Under Transforms, choose Create Change Set. AWS CloudFormation starts to perform the template transformation and then creates a change set.

11. After the transformation, choose Execute to create the AWS CloudFormation stack.
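
The console steps for creating the change set can also be scripted. The following is a minimal sketch, not part of the original sample, that builds the arguments for the CloudFormation CreateChangeSet API from the parameter values listed above. The change set name, the email address, the template placeholder string, and the helper name `change_set_request` are illustrative assumptions.

```python
def change_set_request(stack_name, template_body, parameters):
    """Build keyword arguments for CloudFormation's CreateChangeSet call."""
    return {
        "StackName": stack_name,
        "ChangeSetName": f"{stack_name}-create",  # hypothetical naming scheme
        "ChangeSetType": "CREATE",
        "TemplateBody": template_body,
        "Parameters": [
            {"ParameterKey": key, "ParameterValue": value}
            for key, value in parameters.items()
        ],
        # These correspond to the two IAM acknowledgements in the console.
        "Capabilities": ["CAPABILITY_IAM", "CAPABILITY_NAMED_IAM"],
    }


request = change_set_request(
    "codecommit-sns-publisher",
    "<contents of codecommit-sns-publisher.yml>",  # placeholder, not a real template
    {
        "CodeS3BucketLocation": "codecommit-sns-publisher",
        "CodeS3KeyLocation": "codecommit-sns-publisher.zip",
        "CodeCommitRepo": "sample-repo",
        "MainBranchName": "master",
        "NotificationEmailAddress": "you@example.com",  # illustrative address
    },
)

# With credentials configured, the request could be submitted and executed:
# import boto3
# cfn = boto3.client("cloudformation")
# cfn.create_change_set(**request)
# cfn.execute_change_set(StackName=request["StackName"],
#                        ChangeSetName=request["ChangeSetName"])
```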

After the stack has been created, you should receive an SNS subscription confirmation in your email account:

After you subscribe to the SNS topic, you can go to the AWS CloudFormation console and check the created AWS resources. If you would like to monitor the Lambda function, choose Resource to open the SNSPublisherFunction Lambda function.

Now, you can try to push a commit to the remote AWS CodeCommit repository.

1. Clone the CodeCommit repository to your local computer. For information, see Connect to an AWS CodeCommit Repository in the AWS CodeCommit User Guide. The following example shows how to clone a repository named sample-repo in the US East (Ohio) Region:

git clone ssh://git-codecommit.us-east-2.amazonaws.com/v1/repos/sample-repo

2. Enter the folder and create a plain text file:

cd sample-repo/
echo 'This is a sample file' > newfile

3. Add and commit this file change:

git add newfile
git commit -m 'Create initial file'

Look for this output:

[master (root-commit) 810d192] Create initial file
1 file changed, 1 insertion(+)
create mode 100644 newfile

4. Push the commit to the remote CodeCommit repository:

git push -u origin master:master

Look for this output:

Counting objects: 100% (3/3), done.
Writing objects: 100% (3/3), 235 bytes | 235.00 KiB/s, done.
…
* [new branch]      master -> master
Branch 'master' set up to track remote branch 'master' from 'origin'.

After the local commit has been pushed to the remote CodeCommit repository, the CloudWatch event detects this update. You should see the following notification message in your email account:

Commit ID: <Commit ID>
author: [YourName] ([email protected]) - <Timestamp> +0000
message: Create initial file

File: newfile Addition - Blob ID: <Blob ID>

Summary

In this blog post, I showed you how to use an AWS CloudFormation template to quickly build a sample solution that can help your operations team or development team track updates to a CodeCommit repository.

The example CloudFormation template and Lambda function can be found in the aws-codecommit-file-change-publisher GitHub repository. Using the sample code, you can customize the email content with HTML or add other information to your email message.

If you have questions or other feedback about this example, please open an issue or submit a pull request.

Building Simpler Genomics Workflows on AWS Step Functions

Post Syndicated from Christie Gifrin original https://aws.amazon.com/blogs/compute/building-simpler-genomics-workflows-on-aws-step-functions/

This post is courtesy of Ryan Ulaszek, AWS Genomics Partner Solutions Architect and Aaron Friedman, AWS Healthcare and Life Sciences Partner Solutions Architect

In 2017, we published a four-part blog series on how to build a genomics workflow on AWS. In part 1, we introduced a general architecture highlighting three common layers: job, batch, and workflow.  In part 2, we described building the job layer with Docker and Amazon Elastic Container Registry (Amazon ECR).  In part 3, we tackled the batch layer and built a batch engine using AWS Batch.  In part 4, we built out the workflow layer using AWS Step Functions and AWS Lambda.

Since then, we’ve worked with many AWS customers and APN partners to implement this solution in genomics as well as in other workloads of interest. Today, we want to highlight a new feature in Step Functions that simplifies how customers and partners can build high-throughput genomics workflows on AWS.

Step Functions now supports native integration with AWS Batch, which simplifies how you can create an AWS Batch state that submits an asynchronous job and waits for that job to finish.

Before, you needed to build a state machine building block that submitted a job to AWS Batch, and then polled and checked its execution. Now, you can just submit the job to AWS Batch using the new AWS Batch task type.  Step Functions waits to proceed until the job is completed. This reduces the complexity of your state machine and makes it easier to build a genomics workflow with asynchronous AWS Batch steps.

The new integrations include support for the following API actions:

  • AWS Batch SubmitJob
  • Amazon SNS Publish
  • Amazon SQS SendMessage
  • Amazon ECS RunTask
  • AWS Fargate RunTask
  • Amazon DynamoDB
    • PutItem
    • GetItem
    • UpdateItem
    • DeleteItem
  • Amazon SageMaker
    • CreateTrainingJob
    • CreateTransformJob
  • AWS Glue
    • StartJobRun

You can also pass parameters to the service API.  To use the new integrations, the role that you assume when running a state machine needs to have the appropriate permissions.  For more information, see the AWS Step Functions Developer Guide.

Using a job status poller

In our 2017 post series, we created a job poller “pattern” with two separate Lambda functions. When the job finishes, the state machine proceeds to the next step and operates according to the necessary business logic.  This is a useful pattern to manage asynchronous jobs when a direct integration is unavailable.

The steps in this building block state machine are as follows:

  1. A job is submitted through a Lambda function.
  2. The state machine queries the AWS Batch API for the job status in another Lambda function.
  3. The job status is checked to see if the job has completed.  If the job status equals SUCCEEDED, the final job status is logged. If the job status equals FAILED, the execution of the state machine ends. In all other cases, wait 30 seconds and go back to Step 2.

Both the Submit Job and Get Job Lambda functions are available as example Lambda functions in the console.  The job status poller is available in the Step Functions console as a sample project.
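
For comparison, the same polling logic can be sketched as a plain Python loop. This is only an illustration of the pattern, not the code from the sample project. The function name `wait_for_job` is hypothetical, and the Batch client and sleep function are passed in so the loop can be tried without AWS access.

```python
import time

TERMINAL_STATES = {"SUCCEEDED", "FAILED"}


def wait_for_job(batch, job_id, interval_seconds=30, sleep=time.sleep):
    """Poll AWS Batch until the job reaches a terminal state, then return it."""
    while True:
        job = batch.describe_jobs(jobs=[job_id])["jobs"][0]
        if job["status"] in TERMINAL_STATES:
            return job["status"]
        # Any other status (SUBMITTED, PENDING, RUNNABLE, STARTING, RUNNING)
        # means the job is still in flight, so wait and check again.
        sleep(interval_seconds)


# With credentials configured, the loop could be used like this:
# import boto3
# final_status = wait_for_job(boto3.client("batch"), "<job-id>")
```

Every pass through this loop corresponds to one trip through the GetJobStatus, CheckJobStatus, and Wait30Seconds states.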

Here is the JSON representing this state machine.

{
  "Comment": "A simple example that submits a job to AWS Batch",
  "StartAt": "SubmitJob",
  "States": {
    "SubmitJob": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:us-east-1:<account-id>:function:batchSubmitJob",
      "Next": "GetJobStatus"
    },
    "GetJobStatus": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:us-east-1:<account-id>:function:batchGetJobStatus",
      "Next": "CheckJobStatus",
      "InputPath": "$",
      "ResultPath": "$.status"
    },
    "CheckJobStatus": {
      "Type": "Choice",
      "Choices": [
        {
          "Variable": "$.status",
          "StringEquals": "FAILED",
          "Next": "JobFailed"
        },
        {
          "Variable": "$.status",
          "StringEquals": "SUCCEEDED",
          "Next": "GetFinalJobStatus"
        }
      ],
      "Default": "Wait30Seconds"
    },
    "Wait30Seconds": {
      "Type": "Wait",
      "Seconds": 30,
      "Next": "GetJobStatus"
    },
    "JobFailed": {
      "Type": "Fail",
      "Error": "JobFailed",
      "Cause": "The AWS Batch job returned FAILED"
    },
    "GetFinalJobStatus": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:us-east-1:<account-id>:function:batchGetJobStatus",
      "End": true
    }
  }
}

With Step Functions Service Integrations

With Step Functions service integrations, it is now simpler to submit and wait for an AWS Batch job, or any other supported service.

The following code block is the JSON representing the new state machine for an asynchronous batch job. If you are familiar with the AWS Batch SubmitJob API action, you may notice that the parameters are consistent with what you would see in that API call. You can also use the optional AWS Batch parameters in addition to JobDefinition, JobName, and JobQueue.

{
  "StartAt": "RunIsaacJob",
  "States": {
    "RunIsaacJob": {
      "Type": "Task",
      "Resource": "arn:aws:states:::batch:submitJob.sync",
      "Parameters": {
        "JobDefinition": "Isaac",
        "JobName.$": "$.isaac.JobName",
        "JobQueue": "HighPriority",
        "Parameters.$": "$.isaac"
      },
      "TimeoutSeconds": 900,
      "HeartbeatSeconds": 60,
      "InputPath": "$",
      "ResultPath": "$.status",
      "Retry": [
        {
          "ErrorEquals": [ "States.Timeout" ],
          "IntervalSeconds": 3,
          "MaxAttempts": 2,
          "BackoffRate": 1.5
        }
      ],
      "End": true
    }
  }
}
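
Because every AWS Batch step in a workflow has this same shape, the state can be generated instead of copied. The helper below is a sketch under that assumption: `batch_task_state` and its arguments are hypothetical names, and the timeout, heartbeat, and retry values are copied from the example above.

```python
import json


def batch_task_state(job_definition, input_key, job_queue="HighPriority", next_state=None):
    """Build a Task state that submits an AWS Batch job and waits for it."""
    state = {
        "Type": "Task",
        "Resource": "arn:aws:states:::batch:submitJob.sync",
        "Parameters": {
            "JobDefinition": job_definition,
            # Keys ending in ".$" are resolved from the state input at run time.
            "JobName.$": f"$.{input_key}.JobName",
            "JobQueue": job_queue,
            "Parameters.$": f"$.{input_key}",
        },
        "TimeoutSeconds": 900,
        "HeartbeatSeconds": 60,
        "ResultPath": "$.status",
        "Retry": [
            {
                "ErrorEquals": ["States.Timeout"],
                "IntervalSeconds": 3,
                "MaxAttempts": 2,
                "BackoffRate": 1.5,
            }
        ],
    }
    # A state must either transition to another state or end the execution.
    if next_state:
        state["Next"] = next_state
    else:
        state["End"] = True
    return state


isaac_state = batch_task_state("Isaac", "isaac", next_state="Parallel")
print(json.dumps(isaac_state, indent=2))
```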

Here is an example of the workflow input JSON.  Pass all of the container parameters that were being constructed in the submit job Lambda function.

{
  "isaac": {
    "WorkingDir": "/scratch",
    "JobName": "isaac-1",
    "FastQ1S3Path": "s3://aws-batch-genomics-resources/fastq/SRR1919605_1.fastq.gz",
    "BAMS3FolderPath": "s3://batch-genomics-pipeline-jobresultsbucket-1kzdu216m2b0k/NA12878_states_3/bam",
    "FastQ2S3Path": "s3://aws-batch-genomics-resources/fastq/SRR1919605_2.fastq.gz",
    "ReferenceS3Path": "s3://aws-batch-genomics-resources/reference/isaac/"
  }
}

When you deploy the job definition, add the command attribute that was previously being constructed in the Lambda function launching the AWS Batch job.

IsaacJobDefinition:
    Type: AWS::Batch::JobDefinition
    Properties:
      JobDefinitionName: "Isaac"
      Type: container
      RetryStrategy:
        Attempts: 1
      Parameters:
        BAMS3FolderPath: !Sub "s3://${JobResultsBucket}/NA12878_states_1/bam"
        FastQ1S3Path: "s3://aws-batch-genomics-resources/fastq/SRR1919605_1.fastq.gz"
        FastQ2S3Path: "s3://aws-batch-genomics-resources/fastq/SRR1919605_2.fastq.gz"
        ReferenceS3Path: "s3://aws-batch-genomics-resources/reference/isaac/"
        WorkingDir: "/scratch"
      ContainerProperties:
        Image: "rulaszek/isaac"
        Vcpus: 32
        Memory: 80000
        JobRoleArn:
          Fn::ImportValue: !Sub "${RoleStackName}:ECSTaskRole"
        Command:
          - "--bam_s3_folder_path"
          - "Ref::BAMS3FolderPath"
          - "--fastq1_s3_path"
          - "Ref::FastQ1S3Path"
          - "--fastq2_s3_path"
          - "Ref::FastQ2S3Path"
          - "--reference_s3_path"
          - "Ref::ReferenceS3Path"
          - "--working_dir"
          - "Ref::WorkingDir"
        MountPoints:
          - ContainerPath: "/scratch"
            ReadOnly: false
            SourceVolume: docker_scratch
        Volumes:
          - Name: docker_scratch
            Host:
              SourcePath: "/docker_scratch"

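The key-value parameters passed into the workflow are mapped through Parameters.$ onto the job definition parameters with the same keys.  Values supplied in the workflow input override the defaults in the job definition, and each Ref:: placeholder in the command is replaced with the resulting value. The Docker run looks like the following: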

docker run <isaac_container_uri> --bam_s3_folder_path s3://batch-genomics-pipeline-jobresultsbucket-1kzdu216m2b0k/NA12878_states_3/bam \
                                 --fastq1_s3_path s3://aws-batch-genomics-resources/fastq/SRR1919605_1.fastq.gz \
                                 --fastq2_s3_path s3://aws-batch-genomics-resources/fastq/SRR1919605_2.fastq.gz \
                                 --reference_s3_path s3://aws-batch-genomics-resources/reference/isaac/ \
                                 --working_dir /scratch
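
To make the substitution concrete, here is a small simulation of how the command is resolved. It is a sketch of the documented behavior rather than AWS Batch code: `render_command` is a hypothetical helper, it handles only whole-token `Ref::` placeholders, and the bucket name is a placeholder.

```python
def render_command(command, defaults, overrides):
    """Resolve Ref::Name placeholders from job definition and submitted parameters."""
    values = {**defaults, **overrides}  # submitted parameters win over defaults
    rendered = []
    for token in command:
        if token.startswith("Ref::"):
            rendered.append(values[token[len("Ref::"):]])
        else:
            rendered.append(token)
    return rendered


# Defaults as they might appear in the job definition (placeholder bucket name).
defaults = {
    "BAMS3FolderPath": "s3://example-results-bucket/NA12878_states_1/bam",
    "WorkingDir": "/scratch",
}
# Parameters passed in through the workflow input; JobName is simply unused here.
overrides = {
    "BAMS3FolderPath": "s3://example-results-bucket/NA12878_states_3/bam",
    "JobName": "isaac-1",
}
command = ["--bam_s3_folder_path", "Ref::BAMS3FolderPath", "--working_dir", "Ref::WorkingDir"]

args = render_command(command, defaults, overrides)
print(" ".join(args))
```

The override replaces the default output folder, while `WorkingDir` falls back to the value from the job definition.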

Genomics workflow: Before and after

Overall, service integrations dramatically simplify your genomics workflow.  The following workflow is a simple genomics secondary analysis pipeline, which we highlighted in our original post series.

The first step aligns the sample against a reference genome.  When alignment is complete, variant calling and QA metrics are calculated in two parallel steps.  When variant calling is complete, variant annotation is performed.  Before, our genomics workflow looked like this:

Now it looks like this:

Here is the new workflow JSON:

{
   "Comment":"A simple genomics secondary-analysis workflow",
   "StartAt":"RunIsaacJob",
   "States":{
      "RunIsaacJob":{
         "Type":"Task",
         "Resource":"arn:aws:states:::batch:submitJob.sync",
         "Parameters":{
            "JobDefinition":"Isaac",
            "JobName.$":"$.isaac.JobName",
            "JobQueue":"HighPriority",
            "Parameters.$": "$.isaac"
         },
         "TimeoutSeconds": 900,
         "HeartbeatSeconds": 60,
         "Next":"Parallel",
         "InputPath":"$",
         "ResultPath":"$.status",
         "Retry" : [
            {
              "ErrorEquals": [ "States.Timeout" ],
              "IntervalSeconds": 3,
              "MaxAttempts": 2,
              "BackoffRate": 1.5
            }
         ]
      },
      "Parallel":{
         "Type":"Parallel",
         "Next":"FinalState",
         "Branches":[
            {
               "StartAt":"RunStrelkaJob",
               "States":{
                  "RunStrelkaJob":{
                     "Type":"Task",
                     "Resource":"arn:aws:states:::batch:submitJob.sync",
                     "Parameters":{
                        "JobDefinition":"Strelka",
                        "JobName.$":"$.strelka.JobName",
                        "JobQueue":"HighPriority",
                        "Parameters.$": "$.strelka"
                     },
                     "TimeoutSeconds": 900,
                     "HeartbeatSeconds": 60,
                     "Next":"RunSnpEffJob",
                     "InputPath":"$",
                     "ResultPath":"$.status",
                     "Retry" : [
                        {
                          "ErrorEquals": [ "States.Timeout" ],
                          "IntervalSeconds": 3,
                          "MaxAttempts": 2,
                          "BackoffRate": 1.5
                        }
                     ]
                  },
                  "RunSnpEffJob":{
                     "Type":"Task",
                     "Resource":"arn:aws:states:::batch:submitJob.sync",
                     "Parameters":{
                        "JobDefinition":"SNPEff",
                        "JobName.$":"$.snpeff.JobName",
                        "JobQueue":"HighPriority",
                        "Parameters.$": "$.snpeff"
                     },
                     "TimeoutSeconds": 900,
                     "HeartbeatSeconds": 60,
                     "Retry" : [
                        {
                          "ErrorEquals": [ "States.Timeout" ],
                          "IntervalSeconds": 3,
                          "MaxAttempts": 2,
                          "BackoffRate": 1.5
                        }
                     ],
                     "End":true
                  }
               }
            },
            {
               "StartAt":"RunSamtoolsStatsJob",
               "States":{
                  "RunSamtoolsStatsJob":{
                     "Type":"Task",
                     "Resource":"arn:aws:states:::batch:submitJob.sync",
                     "Parameters":{
                        "JobDefinition":"SamtoolsStats",
                        "JobName.$":"$.samtools.JobName",
                        "JobQueue":"HighPriority",
                        "Parameters.$": "$.samtools"
                     },
                     "TimeoutSeconds": 900,
                     "HeartbeatSeconds": 60,
                     "End":true,
                     "Retry" : [
                        {
                          "ErrorEquals": [ "States.Timeout" ],
                          "IntervalSeconds": 3,
                          "MaxAttempts": 2,
                          "BackoffRate": 1.5
                        }
                     ]
                  }
               }
            }
         ]
      },
      "FinalState":{
         "Type":"Pass",
         "End":true
      }
   }
}
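
Each Task state in this workflow reads its parameters from a key in the execution input (`$.isaac`, `$.strelka`, `$.snpeff`, and `$.samtools`), and each of those objects needs a `JobName`. The following sketch builds such an input. The helper name, the job naming scheme, and the state machine ARN in the commented call are assumptions made for illustration.

```python
import json

REQUIRED_KEYS = ("isaac", "strelka", "snpeff", "samtools")


def build_workflow_input(sample_id, overrides=None):
    """Create execution input with one parameter object per AWS Batch step."""
    overrides = overrides or {}
    workflow_input = {}
    for key in REQUIRED_KEYS:
        params = {"JobName": f"{key}-{sample_id}"}  # hypothetical naming scheme
        params.update(overrides.get(key, {}))
        workflow_input[key] = params
    return workflow_input


execution_input = json.dumps(
    build_workflow_input("1", {"isaac": {"WorkingDir": "/scratch"}})
)
print(execution_input)

# With credentials configured, the execution could be started like this
# (the ARN is a placeholder):
# import boto3
# boto3.client("stepfunctions").start_execution(
#     stateMachineArn="arn:aws:states:us-east-1:111122223333:stateMachine:GenomicWorkflow",
#     input=execution_input,
# )
```

Because the states that set a result path write to `$.status`, the original input keys remain available to the states that run later.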

Here is the new AWS CloudFormation template for deploying the AWS Batch job definitions for each tool:

AWSTemplateFormatVersion: 2010-09-09

Description: Batch job definitions for batch genomics

Parameters:
  RoleStackName:
    Description: "Stack that deploys roles for genomic workflow"
    Type: String
  VPCStackName:
    Description: "Stack that deploys VPC for genomic workflow"
    Type: String
  JobResultsBucket:
    Description: "Bucket that holds workflow job results"
    Type: String

Resources:
  IsaacJobDefinition:
    Type: AWS::Batch::JobDefinition
    Properties:
      JobDefinitionName: "Isaac"
      Type: container
      RetryStrategy:
        Attempts: 1
      Parameters:
        BAMS3FolderPath: !Sub "s3://${JobResultsBucket}/NA12878_states_1/bam"
        FastQ1S3Path: "s3://aws-batch-genomics-resources/fastq/SRR1919605_1.fastq.gz"
        FastQ2S3Path: "s3://aws-batch-genomics-resources/fastq/SRR1919605_2.fastq.gz"
        ReferenceS3Path: "s3://aws-batch-genomics-resources/reference/isaac/"
        WorkingDir: "/scratch"
      ContainerProperties:
        Image: "rulaszek/isaac"
        Vcpus: 32
        Memory: 80000
        JobRoleArn:
          Fn::ImportValue: !Sub "${RoleStackName}:ECSTaskRole"
        Command:
          - "--bam_s3_folder_path"
          - "Ref::BAMS3FolderPath"
          - "--fastq1_s3_path"
          - "Ref::FastQ1S3Path"
          - "--fastq2_s3_path"
          - "Ref::FastQ2S3Path"
          - "--reference_s3_path"
          - "Ref::ReferenceS3Path"
          - "--working_dir"
          - "Ref::WorkingDir"
        MountPoints:
          - ContainerPath: "/scratch"
            ReadOnly: false
            SourceVolume: docker_scratch
        Volumes:
          - Name: docker_scratch
            Host:
              SourcePath: "/docker_scratch"

  StrelkaJobDefinition:
    Type: AWS::Batch::JobDefinition
    Properties:
      JobDefinitionName: "Strelka"
      Type: container
      RetryStrategy:
        Attempts: 1
      Parameters:
        BAMS3Path: !Sub "s3://${JobResultsBucket}/NA12878_states_1/bam/sorted.bam"
        BAIS3Path: !Sub "s3://${JobResultsBucket}/NA12878_states_1/bam/sorted.bam.bai"
        ReferenceS3Path: "s3://aws-batch-genomics-resources/reference/hg38.fa"
        ReferenceIndexS3Path: "s3://aws-batch-genomics-resources/reference/hg38.fa.fai"
        VCFS3Path: !Sub "s3://${JobResultsBucket}/NA12878_states_1/vcf"
        WorkingDir: "/scratch"
      ContainerProperties:
        Image: "rulaszek/strelka"
        Vcpus: 32
        Memory: 32000
        JobRoleArn:
          Fn::ImportValue: !Sub "${RoleStackName}:ECSTaskRole"
        Command:
          - "--bam_s3_path"
          - "Ref::BAMS3Path"
          - "--bai_s3_path"
          - "Ref::BAIS3Path"
          - "--reference_s3_path"
          - "Ref::ReferenceS3Path"
          - "--reference_index_s3_path"
          - "Ref::ReferenceIndexS3Path"
          - "--vcf_s3_path"
          - "Ref::VCFS3Path"
          - "--working_dir"
          - "Ref::WorkingDir"
        MountPoints:
          - ContainerPath: "/scratch"
            ReadOnly: false
            SourceVolume: docker_scratch
        Volumes:
          - Name: docker_scratch
            Host:
              SourcePath: "/docker_scratch"

  SnpEffJobDefinition:
    Type: AWS::Batch::JobDefinition
    Properties:
      JobDefinitionName: "SNPEff"
      Type: container
      RetryStrategy:
        Attempts: 1
      Parameters:
        VCFS3Path: !Sub "s3://${JobResultsBucket}/NA12878_states_1/vcf/variants/genome.vcf.gz"
        AnnotatedVCFS3Path: !Sub "s3://${JobResultsBucket}/NA12878_states_1/vcf/genome.anno.vcf"
        CommandArgs: " -t hg38 "
        WorkingDir: "/scratch"
      ContainerProperties:
        Image: "rulaszek/snpeff"
        Vcpus: 4
        Memory: 10000
        JobRoleArn:
          Fn::ImportValue: !Sub "${RoleStackName}:ECSTaskRole"
        Command:
          - "--annotated_vcf_s3_path"
          - "Ref::AnnotatedVCFS3Path"
          - "--vcf_s3_path"
          - "Ref::VCFS3Path"
          - "--cmd_args"
          - "Ref::CommandArgs"
          - "--working_dir"
          - "Ref::WorkingDir"
        MountPoints:
          - ContainerPath: "/scratch"
            ReadOnly: false
            SourceVolume: docker_scratch
        Volumes:
          - Name: docker_scratch
            Host:
              SourcePath: "/docker_scratch"

  SamtoolsStatsJobDefinition:
    Type: AWS::Batch::JobDefinition
    Properties:
      JobDefinitionName: "SamtoolsStats"
      Type: container
      RetryStrategy:
        Attempts: 1
      Parameters:
        ReferenceS3Path: "s3://aws-batch-genomics-resources/reference/hg38.fa"
        BAMS3Path: !Sub "s3://${JobResultsBucket}/NA12878_states_1/bam/sorted.bam"
        BAMStatsS3Path: !Sub "s3://${JobResultsBucket}/NA12878_states_1/bam/sorted.bam.stats"
        WorkingDir: "/scratch"
      ContainerProperties:
        Image: "rulaszek/samtools-stats"
        Vcpus: 4
        Memory: 10000
        JobRoleArn:
          Fn::ImportValue: !Sub "${RoleStackName}:ECSTaskRole"
        Command:
          - "--bam_s3_path"
          - "Ref::BAMS3Path"
          - "--bam_stats_s3_path"
          - "Ref::BAMStatsS3Path"
          - "--reference_s3_path"
          - "Ref::ReferenceS3Path"
          - "--working_dir"
          - "Ref::WorkingDir"
        MountPoints:
          - ContainerPath: "/scratch"
            ReadOnly: false
            SourceVolume: docker_scratch
        Volumes:
          - Name: docker_scratch
            Host:
              SourcePath: "/docker_scratch"

Here is the new CloudFormation script that deploys the new workflow:

AWSTemplateFormatVersion: 2010-09-09

Description: State Machine for batch genomics

Parameters:
  RoleStackName:
    Description: "Stack that deploys roles for genomic workflow"
    Type: String
  VPCStackName:
    Description: "Stack that deploys VPC for genomic workflow"
    Type: String

Resources:
  # Step Functions state machine
  GenomicWorkflow:
    Type: AWS::StepFunctions::StateMachine
    Properties:
      RoleArn:
        Fn::ImportValue: !Sub "${RoleStackName}:StatesExecutionRole"
      DefinitionString: !Sub |-
        {
           "Comment":"A simple example that submits a job to AWS Batch",
           "StartAt":"RunIsaacJob",
           "States":{
              "RunIsaacJob":{
                 "Type":"Task",
                 "Resource":"arn:aws:states:::batch:submitJob.sync",
                 "Parameters":{
                    "JobDefinition":"Isaac",
                    "JobName.$":"$.isaac.JobName",
                    "JobQueue":"HighPriority",
                    "Parameters.$": "$.isaac"
                 },
                 "TimeoutSeconds": 900,
                 "HeartbeatSeconds": 60,
                 "Next":"Parallel",
                 "InputPath":"$",
                 "ResultPath":"$.status",
                 "Retry" : [
                    {
                      "ErrorEquals": [ "States.Timeout" ],
                      "IntervalSeconds": 3,
                      "MaxAttempts": 2,
                      "BackoffRate": 1.5
                    }
                 ]
              },
              "Parallel":{
                 "Type":"Parallel",
                 "Next":"FinalState",
                 "Branches":[
                    {
                       "StartAt":"RunStrelkaJob",
                       "States":{
                          "RunStrelkaJob":{
                             "Type":"Task",
                             "Resource":"arn:aws:states:::batch:submitJob.sync",
                             "Parameters":{
                                "JobDefinition":"Strelka",
                                "JobName.$":"$.strelka.JobName",
                                "JobQueue":"HighPriority",
                                "Parameters.$": "$.strelka"
                             },
                             "TimeoutSeconds": 900,
                             "HeartbeatSeconds": 60,
                             "Next":"RunSnpEffJob",
                             "InputPath":"$",
                             "ResultPath":"$.status",
                             "Retry" : [
                                {
                                  "ErrorEquals": [ "States.Timeout" ],
                                  "IntervalSeconds": 3,
                                  "MaxAttempts": 2,
                                  "BackoffRate": 1.5
                                }
                             ]
                          },
                          "RunSnpEffJob":{
                             "Type":"Task",
                             "Resource":"arn:aws:states:::batch:submitJob.sync",
                             "Parameters":{
                                "JobDefinition":"SNPEff",
                                "JobName.$":"$.snpeff.JobName",
                                "JobQueue":"HighPriority",
                                "Parameters.$": "$.snpeff"
                             },
                             "TimeoutSeconds": 900,
                             "HeartbeatSeconds": 60,
                             "Retry" : [
                                {
                                  "ErrorEquals": [ "States.Timeout" ],
                                  "IntervalSeconds": 3,
                                  "MaxAttempts": 2,
                                  "BackoffRate": 1.5
                                }
                             ],
                             "End":true
                          }
                       }
                    },
                    {
                       "StartAt":"RunSamtoolsStatsJob",
                       "States":{
                          "RunSamtoolsStatsJob":{
                             "Type":"Task",
                             "Resource":"arn:aws:states:::batch:submitJob.sync",
                             "Parameters":{
                                "JobDefinition":"SamtoolsStats",
                                "JobName.$":"$.samtools.JobName",
                                "JobQueue":"HighPriority",
                                "Parameters.$": "$.samtools"
                             },
                             "TimeoutSeconds": 900,
                             "HeartbeatSeconds": 60,
                             "End":true,
                             "Retry" : [
                                {
                                  "ErrorEquals": [ "States.Timeout" ],
                                  "IntervalSeconds": 3,
                                  "MaxAttempts": 2,
                                  "BackoffRate": 1.5
                                }
                             ]
                          }
                       }
                    }
                 ]
              },
              "FinalState":{
                 "Type":"Pass",
                 "End":true
              }
           }
        }

Outputs:
  GenomicsWorkflowArn:
    Description: GenomicWorkflow ARN
    Value: !Ref GenomicWorkflow
  StackName:
    Description: StackName
    Value: !Sub ${AWS::StackName}
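
After the stack is deployed, the state machine ARN can be read back from the stack outputs. The helper below is a small sketch that flattens a CloudFormation DescribeStacks response into a dictionary. The function name and the stack name in the commented call are hypothetical.

```python
def stack_outputs(describe_stacks_response):
    """Flatten the Outputs list of a DescribeStacks response into a dict."""
    stack = describe_stacks_response["Stacks"][0]
    return {
        output["OutputKey"]: output["OutputValue"]
        for output in stack.get("Outputs", [])
    }


# With credentials configured, the outputs could be fetched like this
# (the stack name is a placeholder):
# import boto3
# response = boto3.client("cloudformation").describe_stacks(StackName="<stack-name>")
# workflow_arn = stack_outputs(response)["GenomicsWorkflowArn"]
```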

Conclusion

AWS Step Functions service integrations are a great way to simplify creating complex workflows with asynchronous steps. While we highlighted the use case with AWS Batch today, there are many other ways that healthcare and life sciences customers can use this new feature, such as with message processing.

For more information about how AWS can enable your genomics workloads, be sure to check out the AWS Genomics page.

We’ve updated the open-source project to take advantage of the new AWS Batch integration in Step Functions. You can find the changes in the aws-batch-genomics/tree/v2.0.0 folder.

Original posts in this four-part series:

Happy coding!

Implementing enterprise integration patterns with AWS messaging services: point-to-point channels

Post Syndicated from Rachel Richardson original https://aws.amazon.com/blogs/compute/implementing-enterprise-integration-patterns-with-aws-messaging-services-point-to-point-channels/

This post is courtesy of Christian Mueller, Sr. Solutions Architect, AWS and Dirk Fröhner, Sr. Solutions Architect, AWS

At AWS, we see our customers increasingly moving toward managed services to reduce the time and money that they spend managing infrastructure. This also applies to the messaging domain, where AWS provides a collection of managed services.

Asynchronous messaging is a fundamental approach for integrating independent systems or building up a set of loosely coupled systems that can scale and evolve independently and flexibly. The well-known collection of enterprise integration patterns (EIPs) provides a “technology-independent vocabulary” to “design and document integration solutions.” This post is the first of two that describe how you can implement the core EIPs using AWS messaging services. Let’s first look at the relevant AWS messaging services.

When organizations migrate their traditional messaging and existing applications to the cloud gradually, they usually want to do it without rewriting their code. Amazon MQ is a managed message broker service for Apache ActiveMQ that makes it easy to set up and operate message brokers in the cloud. It supports industry-standard APIs and protocols such as JMS, AMQP, and MQTT, so you can switch from any standards-based message broker to Amazon MQ without rewriting the messaging code in your applications. Amazon MQ is recommended if you’re using messaging with existing applications and want to move your messaging to the cloud without rewriting existing code.

However, if you build new applications for the cloud, we recommend that you consider using cloud-native messaging services such as Amazon SQS and Amazon SNS. These serverless, fully managed message queue and topic services scale to meet your demands and provide simple, easy-to-use APIs. You can use Amazon SQS and Amazon SNS to decouple and scale microservices, distributed systems, and serverless applications and improve overall reliability.

This blog looks at the first part of some fundamental integration patterns. We describe the patterns and apply them to these AWS messaging services. This will help you apply the right pattern to your use case and architect for scale in a secure and cost-efficient manner. For all variants, we employ both traditional and cloud-native messaging services: Amazon MQ for the former and Amazon SQS and Amazon SNS for the latter.

Integration Patterns

Let’s start with some fundamental integration patterns.

Message exchange patterns

First, we inspect the two major message exchange patterns: one-way and request-response.

One-way messaging

With one-way messaging, a message producer (sender) sends a message to a messaging channel and doesn’t expect or want a response from whatever process (receiver) consumes the message. Examples of one-way messaging include a data transfer and a notification about an event that happened.

Request-response messaging

With request-response messaging, a message producer (requester) sends out a message: for example, a command to instruct the responder to execute something. The requester expects a response from each message consumer (responder) who received that message, likely to know what the result of all executions was. To know where to send the response message to, the request message contains a return address that the responder uses. To make sure that the requester can assign an incoming response to a request, the requester adds a correlation identifier to the request, which the responders echo in their responses.
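
To make the roles of the return address and the correlation identifier concrete, here is a minimal in-memory sketch. It is not tied to any AWS service: a java.util.concurrent.BlockingQueue stands in for a messaging channel, and the Msg type and all method names are assumptions made for illustration.

```java
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// In-memory illustration only: a BlockingQueue stands in for a messaging channel.
final class RequestResponseSketch {

    // Hypothetical message type carrying the two headers the pattern relies on.
    static final class Msg {
        final String body;
        final String correlationId;
        final BlockingQueue<Msg> replyTo; // return address; null on responses

        Msg(String body, String correlationId, BlockingQueue<Msg> replyTo) {
            this.body = body;
            this.correlationId = correlationId;
            this.replyTo = replyTo;
        }
    }

    // Responder: takes one request and replies to its return address,
    // echoing the correlation identifier unchanged.
    static void respondOnce(BlockingQueue<Msg> requests) {
        try {
            Msg request = requests.take();
            request.replyTo.put(new Msg("done: " + request.body, request.correlationId, null));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Requester: sends a command and waits for the response that matches it.
    static String roundTrip(String command) {
        BlockingQueue<Msg> requests = new LinkedBlockingQueue<>();
        BlockingQueue<Msg> replies = new LinkedBlockingQueue<>();
        new Thread(() -> respondOnce(requests)).start();

        String correlationId = UUID.randomUUID().toString();
        try {
            requests.put(new Msg(command, correlationId, replies));
            Msg response = replies.poll(5, TimeUnit.SECONDS);
            if (response == null || !correlationId.equals(response.correlationId)) {
                throw new IllegalStateException("no matching response within the timeout");
            }
            return response.body;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("resize image")); // prints "done: resize image"
    }
}
```

The responder never needs to know who the requester is. Everything it needs to route and label the response travels with the request message.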

Messaging channels: point-to-point

Next, we look at the point-to-point messaging channel, one of the most important patterns for messaging channels. We will continue our consideration with publish-subscribe in our second post.

A point-to-point channel is usually implemented by message queues. Message queues operate so that any given message is only consumed by one receiver, although multiple receivers can be connected to the queue. The queue ensures once-only consumption. Messages are usually buffered in queues so that they’re available for consumption for a certain amount of time, even if no receiver is currently connected.
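
The following sketch illustrates the single-consumer property with several competing receivers. It assumes an in-memory java.util.concurrent.ConcurrentLinkedQueue as a stand-in for a message queue; the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

// In-memory illustration only: the queue stands in for a point-to-point channel.
final class PointToPointSketch {

    // Buffers messageCount messages, lets receiverCount receivers compete for them,
    // and returns how many times each message was delivered.
    static Map<String, Integer> run(int messageCount, int receiverCount) {
        Queue<String> queue = new ConcurrentLinkedQueue<>();
        Map<String, Integer> deliveries = new ConcurrentHashMap<>();

        // Messages are buffered even though no receiver is connected yet.
        for (int i = 1; i <= messageCount; i++) {
            queue.add("Message " + i);
        }

        List<Thread> receivers = new ArrayList<>();
        for (int r = 0; r < receiverCount; r++) {
            Thread receiver = new Thread(() -> {
                String message;
                // poll() removes the head atomically, so two receivers never get the same message.
                while ((message = queue.poll()) != null) {
                    deliveries.merge(message, 1, Integer::sum);
                }
            });
            receivers.add(receiver);
            receiver.start();
        }
        for (Thread receiver : receivers) {
            try {
                receiver.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return deliveries;
    }

    public static void main(String[] args) {
        Map<String, Integer> deliveries = run(100, 4);
        boolean eachOnce = deliveries.values().stream().allMatch(count -> count == 1);
        System.out.println(deliveries.size() + " messages, each delivered once: " + eachOnce);
    }
}
```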

Point-to-point channels are often used for loosely coupled message transmission, though there are two other common uses. First, they can support horizontal scaling of message processing on the receiver side. Depending on the message load in the channel, the number of receiver processes can be elastically adjusted to cope with the load as needed. The queue acts as a buffering load balancer. Second, they can flatten peak loads of messages and prevent your receivers from being flooded when you can’t scale out fast enough or you don’t want additional scaling.
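
As a small worked example of flattening a peak, the following sketch simulates a burst of messages arriving in one time step while the receiver side processes at a fixed rate. The tick-based model, the numbers, and all names are assumptions chosen for illustration.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Queue;

// Deterministic simulation only: no real messaging service is involved.
final class PeakLoadSketch {

    // Returns the queue depth at the end of each tick.
    static int[] backlogPerTick(int[] arrivalsPerTick, int receiverCapacityPerTick) {
        Queue<Integer> queue = new ArrayDeque<>();
        int[] backlog = new int[arrivalsPerTick.length];
        int nextId = 0;

        for (int tick = 0; tick < arrivalsPerTick.length; tick++) {
            // Senders enqueue as fast as they like.
            for (int i = 0; i < arrivalsPerTick[tick]; i++) {
                queue.add(nextId++);
            }
            // Receivers never process more than their capacity per tick.
            for (int i = 0; i < receiverCapacityPerTick && !queue.isEmpty(); i++) {
                queue.remove();
            }
            backlog[tick] = queue.size();
        }
        return backlog;
    }

    public static void main(String[] args) {
        // A burst of 10 messages, then silence; receivers handle 3 messages per tick.
        System.out.println(Arrays.toString(backlogPerTick(new int[] {10, 0, 0, 0}, 3)));
        // prints [7, 4, 1, 0]
    }
}
```

The burst is absorbed by the queue and drained over four ticks, so the receivers see a steady load of at most three messages per tick instead of ten at once.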

Integration scenarios

In this section, we apply these fundamental patterns to AWS messaging services. The code examples are written in Java, but only by author preference. You can implement the same integration scenarios in C++, .NET, Node.js, Python, Ruby, Go, and other programming languages for which AWS provides an SDK and an Apache ActiveMQ client library is available.

Point-to-point channels: one-way messaging

The diagrams in the following subsections show the principle of one-way messaging for point-to-point channels, using Amazon MQ queues and Amazon SQS queues. The sender produces a message and sends it into a queue, and the receiver consumes the message from the queue for processing. For traditional messaging (that is, Amazon MQ), the senders and consumers can use protocols such as JMS or AMQP. For cloud-native messaging, they can use the Amazon SQS API.

Traditional messaging

To follow this example, open the Amazon MQ console and create a broker. The following diagram shows the components described above for the traditional messaging scenario: a sender sends messages into an Amazon MQ queue, and a receiver consumes messages from that queue.

Point to point traditional messaging

In the following code example, sender and receiver use the Apache ActiveMQ client library and the standard Java Message Service (JMS) API to send and receive messages to and from an Amazon MQ queue. You can run the code on any AWS compute service, in your on-premises data center, or on your personal computer. For simplicity, the code launches sender and receiver in the same Java virtual machine (JVM).

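import java.util.UUID;

import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQSslConnectionFactory;

public class PointToPointOneWayTraditional {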

    public static void main(String... args) throws Exception {
        ActiveMQSslConnectionFactory connFact = new ActiveMQSslConnectionFactory("failover:(ssl://<broker-1>.amazonaws.com:61617,ssl://<broker-2>.amazonaws.com:61617)");
        connFact.setConnectResponseTimeout(10000);
        Connection conn = connFact.createConnection("user", "password");
        conn.setClientID("PointToPointOneWayTraditional");
        conn.start();

        new Thread(new Receiver(conn.createSession(false, Session.CLIENT_ACKNOWLEDGE), "Queue.PointToPoint.OneWay.Traditional")).start();
        new Thread(new Sender(conn.createSession(false, Session.CLIENT_ACKNOWLEDGE), "Queue.PointToPoint.OneWay.Traditional")).start();
    }

    public static class Sender implements Runnable {

        private Session session;
        private String destination;

        public Sender(Session session, String destination) {
            this.session = session;
            this.destination = destination;
        }

        public void run() {
            try {
                MessageProducer messageProducer = session.createProducer(session.createQueue(destination));
                long counter = 0;

                while (true) {
                    TextMessage message = session.createTextMessage("Message " + ++counter);
                    message.setJMSMessageID(UUID.randomUUID().toString());
                    messageProducer.send(message);
                }
            } catch (JMSException e) {
                throw new RuntimeException(e);
            }
        }
    }

    public static class Receiver implements Runnable, MessageListener {

        private Session session;
        private String destination;

        public Receiver(Session session, String destination) {
            this.session = session;
            this.destination = destination;
        }

        public void run() {
            try {
                MessageConsumer consumer = session.createConsumer(session.createQueue(destination));
                consumer.setMessageListener(this);
            } catch (JMSException e) {
                throw new RuntimeException(e);
            }
        }

        public void onMessage(Message message) {
            try {
                System.out.println(String.format("received message '%s' with message id '%s'", ((TextMessage) message).getText(), message.getJMSMessageID()));
                message.acknowledge();
            } catch (JMSException e) {
                throw new RuntimeException(e);
            }
        }
    }
}

Cloud-native messaging

To follow this example, open the Amazon SQS console and create a standard SQS queue, using the queue name P2POneWayCloudNative. The following diagram shows the components described above for the cloud-native messaging scenario: a sender sends messages into an Amazon SQS queue, and a receiver consumes messages from that queue.

Point to point cloud-native messaging

In the sample code below, the example sender uses the AWS SDK for Java to send messages to an Amazon SQS queue in an endless loop. You can run the code on any AWS compute service, in your on-premises data center, or on your personal computer.

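import java.util.UUID;

import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
import com.amazonaws.services.sqs.model.MessageAttributeValue;
import com.amazonaws.services.sqs.model.SendMessageRequest;

public class PointToPointOneWayCloudNative {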

    public static void main(String... args) throws Exception {
        final AmazonSQS sqs = AmazonSQSClientBuilder.standard().build();

        new Thread(new Sender(sqs, "https://sqs.<region>.amazonaws.com/<account-number>/P2POneWayCloudNative")).start();
    }

    public static class Sender implements Runnable {

        private AmazonSQS sqs;
        private String destination;

        public Sender(AmazonSQS sqs, String destination) {
            this.sqs = sqs;
            this.destination = destination;
        }

        public void run() {
            long counter = 0;

            while (true) {
                sqs.sendMessage(
                    new SendMessageRequest()
                        .withQueueUrl(destination)
                        .withMessageBody("Message " + ++counter)
                        .addMessageAttributesEntry("MessageID", new MessageAttributeValue().withDataType("String").withStringValue(UUID.randomUUID().toString())));
            }
        }
    }
}

We implement the receiver below in a serverless manner as an AWS Lambda function, using Amazon SQS as the event source. The name of the SQS queue is configured outside the function’s code, which is why it doesn’t appear in this code example.

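import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.SQSEvent;

public class Receiver implements RequestHandler<SQSEvent, Void> {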

    @Override
    public Void handleRequest(SQSEvent request, Context context) {
        for (SQSEvent.SQSMessage message: request.getRecords()) {
            System.out.println(String.format("received message '%s' with message id '%s'", message.getBody(), message.getMessageAttributes().get("MessageID").getStringValue()));
        }

        return null;
    }
}

If this approach is new to you, you can find more details in AWS Lambda Adds Amazon Simple Queue Service to Supported Event Sources. Using Lambda comes with a number of benefits. For example, you don’t have to manage the compute environment for the receiver, and you can use an event (or push) model instead of having to poll for new messages.

Point-to-point channels: request-response messaging

In addition to the one-way scenario, we now have a return channel. We therefore call the involved processes requester and responder rather than sender and receiver. The requester sends a message into the request queue, and the responder sends the response into the response queue. Remember that the requester enriches the message with a return address (the name of the response queue) so that the responder knows where to send the response. The requester also sends a correlation ID that the responder copies into the response message so that the requester can match the incoming response with a request.

Traditional messaging

In this example, we reuse the Amazon MQ broker that we set up earlier. The following diagram shows the components described above for the traditional messaging scenario, with one Amazon MQ queue for the request messages and one for the response messages.

Point to point request response traditional messaging

Using Amazon MQ, we don’t have to create queues explicitly because they’re implicitly created as needed when we start sending messages to them. This example is similar to the point-to-point one-way traditional example.

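import java.util.UUID;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TemporaryQueue;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQSslConnectionFactory;

public class PointToPointRequestResponseTraditional {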

    public static void main(String... args) throws Exception {
        ActiveMQSslConnectionFactory connFact = new ActiveMQSslConnectionFactory("failover:(ssl://<broker-1>.amazonaws.com:61617,ssl://<broker-2>.amazonaws.com:61617)");
        connFact.setConnectResponseTimeout(10000);
        Connection conn = connFact.createConnection("user", "password");
        conn.setClientID("PointToPointRequestResponseTraditional");
        conn.start();

        new Thread(new Responder(conn.createSession(false, Session.CLIENT_ACKNOWLEDGE), "Queue.PointToPoint.RequestResponse.Traditional")).start();
        new Thread(new Requester(conn.createSession(false, Session.CLIENT_ACKNOWLEDGE), "Queue.PointToPoint.RequestResponse.Traditional")).start();
    }

    public static class Requester implements Runnable {

        private Session session;
        private String destination;

        public Requester(Session session, String destination) {
            this.session = session;
            this.destination = destination;
        }

        public void run() {
            MessageProducer messageProducer = null;
            try {
                messageProducer = session.createProducer(session.createQueue(destination));
                long counter = 0;

                while (true) {
                    TemporaryQueue replyTo = session.createTemporaryQueue();
                    String correlationId = UUID.randomUUID().toString();
                    TextMessage message = session.createTextMessage("Message " + ++counter);
                    message.setJMSMessageID(UUID.randomUUID().toString());
                    message.setJMSCorrelationID(correlationId);
                    message.setJMSReplyTo(replyTo);
                    messageProducer.send(message);

                    MessageConsumer consumer = session.createConsumer(replyTo, "JMSCorrelationID='" + correlationId + "'");
                    try {
                        Message receivedMessage = consumer.receive(5000);
                        // receive() returns null if no response arrives within the timeout
                        if (receivedMessage != null) {
                            System.out.println(String.format("received message '%s' with message id '%s'", ((TextMessage) receivedMessage).getText(), receivedMessage.getJMSMessageID()));
                            receivedMessage.acknowledge();
                        }
                    } finally {
                        if (consumer != null) {
                            consumer.close();
                        }
                    }
                }
            } catch (JMSException e) {
                throw new RuntimeException(e);
            }
        }
    }

    public static class Responder implements Runnable, MessageListener {

        private Session session;
        private String destination;

        public Responder(Session session, String destination) {
            this.session = session;
            this.destination = destination;
        }

        public void run() {
            try {
                MessageConsumer consumer = session.createConsumer(session.createQueue(destination));
                consumer.setMessageListener(this);
            } catch (JMSException e) {
                throw new RuntimeException(e);
            }
        }

        public void onMessage(Message message) {
            try {
                String correlationId = message.getJMSCorrelationID();
                Destination replyTo = message.getJMSReplyTo();

                TextMessage responseMessage = session.createTextMessage(((TextMessage) message).getText() + " with CorrelationID " + correlationId);
                responseMessage.setJMSMessageID(UUID.randomUUID().toString());
                responseMessage.setJMSCorrelationID(correlationId);

                MessageProducer messageProducer = session.createProducer(replyTo);
                try {
                    messageProducer.send(responseMessage);

                    message.acknowledge();
                } finally {
                    if (messageProducer != null) {
                        messageProducer.close();
                    }
                }
            } catch (JMSException e) {
                throw new RuntimeException(e);
            }
        }
    }
}

Cloud-native messaging

Open the Amazon SQS console and create two standard SQS queues using the queue names P2PReqRespCloudNative and P2PReqRespCloudNative-Resp. The following diagram shows the components described above for the cloud-native scenario, with one Amazon SQS queue for the request messages and one for the response messages.

Point to point request response cloud native messaging

The following example requester is almost identical to the point-to-point one-way cloud-native example sender. It also provides a reply-to address and a correlation ID.

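import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
import com.amazonaws.services.sqs.model.DeleteMessageRequest;
import com.amazonaws.services.sqs.model.Message;
import com.amazonaws.services.sqs.model.MessageAttributeValue;
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
import com.amazonaws.services.sqs.model.ReceiveMessageResult;
import com.amazonaws.services.sqs.model.SendMessageRequest;

public class PointToPointRequestResponseCloudNative {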

    public static void main(String... args) throws Exception {
        final AmazonSQS sqs = AmazonSQSClientBuilder.standard().build();

        new Thread(new Requester(sqs, "https://sqs.<region>.amazonaws.com/<account-number>/P2PReqRespCloudNative", "https://sqs.<region>.amazonaws.com/<account-number>/P2PReqRespCloudNative-Resp")).start();
    }

    public static class Requester implements Runnable {

        private AmazonSQS sqs;
        private String destination;
        private String replyDestination;
        private Map<String, SendMessageRequest> inflightMessages = new ConcurrentHashMap<>();

        public Requester(AmazonSQS sqs, String destination, String replyDestination) {
            this.sqs = sqs;
            this.destination = destination;
            this.replyDestination = replyDestination;
        }

        public void run() {
            long counter = 0;

            while (true) {
                String correlationId = UUID.randomUUID().toString();
                SendMessageRequest request = new SendMessageRequest()
                    .withQueueUrl(destination)
                    .withMessageBody("Message " + ++counter)
                    .addMessageAttributesEntry("CorrelationID", new MessageAttributeValue().withDataType("String").withStringValue(correlationId))
                    .addMessageAttributesEntry("ReplyTo", new MessageAttributeValue().withDataType("String").withStringValue(replyDestination));
                sqs.sendMessage(request);

                inflightMessages.put(correlationId, request);

                ReceiveMessageResult receiveMessageResult = sqs.receiveMessage(
                    new ReceiveMessageRequest()
                        .withQueueUrl(replyDestination)
                        .withMessageAttributeNames("CorrelationID")
                        .withMaxNumberOfMessages(5)
                        .withWaitTimeSeconds(2));

                for (Message receivedMessage : receiveMessageResult.getMessages()) {
                    System.out.println(String.format("received message '%s' with message id '%s'", receivedMessage.getBody(), receivedMessage.getMessageId()));

                    String receivedCorrelationId = receivedMessage.getMessageAttributes().get("CorrelationID").getStringValue();
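                    SendMessageRequest originalRequest = inflightMessages.remove(receivedCorrelationId);
                    // Standard queues deliver at least once, so a duplicate response may find no in-flight request
                    if (originalRequest != null) {
                        System.out.println(String.format("Corresponding request message '%s'", originalRequest.getMessageBody()));
                    }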

                    sqs.deleteMessage(
                        new DeleteMessageRequest()
                            .withQueueUrl(replyDestination)
                            .withReceiptHandle(receivedMessage.getReceiptHandle()));
                }
            }
        }
    }
}

The following example responder is almost identical to the point-to-point one-way cloud-native example receiver. It also creates a message and sends it back to the reply-to address provided in the received message.

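import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.SQSEvent;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
import com.amazonaws.services.sqs.model.MessageAttributeValue;
import com.amazonaws.services.sqs.model.SendMessageRequest;

public class Responder implements RequestHandler<SQSEvent, Void> {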

    private final AmazonSQS sqs = AmazonSQSClientBuilder.standard().build();

    @Override
    public Void handleRequest(SQSEvent request, Context context) {
        for (SQSEvent.SQSMessage message: request.getRecords()) {
            System.out.println(String.format("received message '%s' with message id '%s'", message.getBody(), message.getMessageId()));
            String correlationId = message.getMessageAttributes().get("CorrelationID").getStringValue();
            String replyTo = message.getMessageAttributes().get("ReplyTo").getStringValue();

            System.out.println(String.format("sending message with correlation id '%s' to '%s'", correlationId, replyTo));
            sqs.sendMessage(
                new SendMessageRequest()
                    .withQueueUrl(replyTo)
                    .withMessageBody(message.getBody() + " with CorrelationID " + correlationId)
                    .addMessageAttributesEntry("CorrelationID", new MessageAttributeValue().withDataType("String").withStringValue(correlationId)));
        }

        return null;
    }
}

Go build!

We look forward to hearing about what you build and will continue innovating our services on your behalf.

What’s next?

We have introduced the first fundamental EIPs and shown how you can apply them to the AWS messaging services. If you are keen to dive deeper, continue reading with the second part of this series, where we will cover publish-subscribe messaging.

Read Part 2: Publish-Subscribe Messaging

Implementing enterprise integration patterns with AWS messaging services: publish-subscribe channels

Post Syndicated from Rachel Richardson original https://aws.amazon.com/blogs/compute/implementing-enterprise-integration-patterns-with-aws-messaging-services-publish-subscribe-channels/

This post is courtesy of Christian Mueller, Sr. Solutions Architect, AWS and Dirk Fröhner, Sr. Solutions Architect, AWS

In this blog, we look at the second part of some fundamental enterprise integration patterns and how you can implement them with AWS messaging services. If you missed the first part, we encourage you to start there.

Read Part 1: Point-to-Point Messaging

Integration patterns

Messaging channels: publish-subscribe

As mentioned in the first blog, we continue with the second major messaging channel pattern: publish-subscribe.

A publish-subscribe channel is usually implemented using message topics. In this model, any message published to a topic is immediately received by all of the subscribers of the topic (unless you have applied the message filter pattern). However, if there is no subscriber, messages are usually discarded. The durable subscriber pattern describes an exception where messages are kept for a while in case the subscriber is offline. Publish-subscribe is used when multiple parties are interested in certain messages. Sometimes, this pattern is also referred to as fan-out.
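
The fan-out behavior described above can be sketched in a few lines of plain Java. This is an in-memory illustration under simplifying assumptions (synchronous delivery, no durable subscribers, no message filtering), and the Topic class is hypothetical rather than an API of any AWS service.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// In-memory illustration only: Topic is a hypothetical stand-in for a message topic.
final class PublishSubscribeSketch {

    static final class Topic {
        private final List<Consumer<String>> subscribers = new CopyOnWriteArrayList<>();

        void subscribe(Consumer<String> subscriber) {
            subscribers.add(subscriber);
        }

        // Delivers a copy of the message to every current subscriber and returns
        // how many received it. A return value of 0 means the message was discarded.
        int publish(String message) {
            for (Consumer<String> subscriber : subscribers) {
                subscriber.accept(message);
            }
            return subscribers.size();
        }
    }

    public static void main(String[] args) {
        Topic topic = new Topic();
        int receivedByNobody = topic.publish("Message 0"); // no subscriber yet: discarded

        List<String> inboxA = new ArrayList<>();
        List<String> inboxB = new ArrayList<>();
        topic.subscribe(inboxA::add);
        topic.subscribe(inboxB::add);
        topic.publish("Message 1"); // both subscribers receive their own copy

        System.out.println(receivedByNobody + " " + inboxA + " " + inboxB);
        // prints "0 [Message 1] [Message 1]"
    }
}
```

Contrast this with a queue, where "Message 0" would have been buffered and "Message 1" would have reached only one of the two receivers.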

Let’s apply this pattern to the different AWS messaging services and get our hands dirty. To follow our examples, sign in to your AWS account (or create an account as described in How do I create and activate a new Amazon Web Services account?).

Integration scenarios

Publish-subscribe channels: one-way messaging

Publish-subscribe one-way patterns are often involved in notification style use cases, where the publisher sends out an event and doesn’t care who is interested in this event. For example, Amazon CloudWatch Events publishes state changes in the environment, and you can subscribe and act accordingly.

The diagrams in the following subsections show the principles of one-way messaging for publish-subscribe channels, using both Amazon MQ and Amazon SNS topics. A publisher produces a message and sends it into a topic, and subscribers consume the message from the topic for processing.

For traditional messaging, senders and consumers can use APIs and protocols such as JMS or AMQP. For cloud-native messaging, they can use the Amazon SNS API.

Traditional messaging

In this example, we reuse the Amazon MQ broker we set up in part one of this blog. As the following diagram shows, messages are published into an Amazon MQ topic, and multiple subscribers can consume messages from it.

Publish Subscribe One Way Traditional Messaging

This example is similar to the point-to-point one-way traditional example using the Apache ActiveMQ client library, but we use topics instead of queues, as shown in the following code.

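import java.util.UUID;

import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQSslConnectionFactory;

public class PublishSubscribeOneWayTraditional {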

    public static void main(String... args) throws Exception {
        ActiveMQSslConnectionFactory connFact = new ActiveMQSslConnectionFactory("failover:(ssl://<broker-1>.amazonaws.com:61617,ssl://<broker-2>.amazonaws.com:61617)");
        connFact.setConnectResponseTimeout(10000);
        Connection conn = connFact.createConnection("user", "password");
        conn.setClientID("PubSubOneWayTraditional");
        conn.start();

        new Thread(new Subscriber(conn.createSession(false, Session.CLIENT_ACKNOWLEDGE), "Topic.PubSub.OneWay.Traditional")).start();
        new Thread(new Publisher(conn.createSession(false, Session.CLIENT_ACKNOWLEDGE), "Topic.PubSub.OneWay.Traditional")).start();
    }

    public static class Publisher implements Runnable {

        private Session session;
        private String destination;

        public Publisher(Session session, String destination) {
            this.session = session;
            this.destination = destination;
        }

        public void run() {
            try {
                MessageProducer messageProducer = session.createProducer(session.createTopic(destination));
                long counter = 0;

                while (true) {
                    TextMessage message = session.createTextMessage("Message " + ++counter);
                    message.setJMSMessageID(UUID.randomUUID().toString());
                    messageProducer.send(message);
                }
            } catch (JMSException e) {
                throw new RuntimeException(e);
            }
        }
    }

    public static class Subscriber implements Runnable, MessageListener {

        private Session session;
        private String destination;

        public Subscriber(Session session, String destination) {
            this.session = session;
            this.destination = destination;
        }

        public void run() {
            try {
                MessageConsumer consumer = session.createDurableSubscriber(session.createTopic(destination), "subscriber-1");
                consumer.setMessageListener(this);
            } catch (JMSException e) {
                throw new RuntimeException(e);
            }
        }

        public void onMessage(Message message) {
            try {
                System.out.println(String.format("received message '%s' with message id '%s'", ((TextMessage) message).getText(), message.getJMSMessageID()));
                message.acknowledge();
            } catch (JMSException e) {
                throw new RuntimeException(e);
            }
        }
    }
}

Cloud-native messaging

To follow a similar example using Amazon SNS, open the Amazon SNS console and create an Amazon SNS topic named PubSubOneWayCloudNative. The following diagram shows a publisher sending messages into an Amazon SNS topic, from which the subscribers of this topic consume them.

Publish Subscribe One Way Cloud Native Messaging

We use the AWS SDK for Java to send messages to our Amazon SNS topic in an endless loop. You can run the following code on any AWS compute service, in your on-premises data center, or on your personal computer.

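import java.util.UUID;

import com.amazonaws.services.sns.AmazonSNS;
import com.amazonaws.services.sns.AmazonSNSClientBuilder;
import com.amazonaws.services.sns.model.MessageAttributeValue;
import com.amazonaws.services.sns.model.PublishRequest;

public class PublishSubscribeOneWayCloudNative {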

    public static void main(String... args) throws Exception {
        final AmazonSNS sns = AmazonSNSClientBuilder.standard().build();

        new Thread(new Publisher(sns, "arn:aws:sns:<region>:<account-number>:PubSubOneWayCloudNative")).start();
    }

    public static class Publisher implements Runnable {

        private AmazonSNS sns;
        private String destination;

        public Publisher(AmazonSNS sns, String destination) {
            this.sns = sns;
            this.destination = destination;
        }

        public void run() {
            long counter = 0;

            while (true) {
                sns.publish(
                    new PublishRequest()
                        .withTargetArn(destination)
                        .withSubject("PubSubOneWayCloudNative sample")
                        .withMessage("Message " + ++counter)
                        .addMessageAttributesEntry("MessageID", new MessageAttributeValue().withDataType("String").withStringValue(UUID.randomUUID().toString())));
            }
        }
    }
}

The subscriber is implemented as an AWS Lambda function, using Amazon SNS as the event source. For more information on how to set this up, see Using Amazon SNS for System-to-System Messaging with a Lambda Function as a Subscriber.

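import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.SNSEvent;
import com.amazonaws.services.lambda.runtime.events.SNSEvent.SNS;

public class Subscriber implements RequestHandler<SNSEvent, Void> {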

    @Override
    public Void handleRequest(SNSEvent request, Context context) {
        for (SNSEvent.SNSRecord record: request.getRecords()) {
            SNS sns = record.getSNS();

            System.out.println(String.format("received message '%s' with message id '%s'", sns.getMessage(), sns.getMessageAttributes().get("MessageID").getValue()));
        }

        return null;
    }
}

Publish-subscribe channels: request-response messaging

Publish-subscribe request-response patterns are beneficial in use cases where it’s important to communicate with multiple services that do their work in parallel, but all their responses need to be aggregated afterward. One example is an order service, which needs to enrich the order message with data from multiple backend services.

The diagrams in the following subsections show the principles of request-response messaging for publish-subscribe channels, using both Amazon MQ and Amazon SNS topics. A publisher produces a message and sends it into a topic, and subscribers consume the message from the topic for processing.

Although we use a publish-subscribe channel for the request messages, we would usually use a point-to-point channel for the response messages. This assumes that the requester application or at least a dedicated application is the one entity that works on processing all the responses.
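
The combination of a publish-subscribe request channel and a point-to-point response channel can be sketched as follows. This is a single-threaded, in-memory illustration: each responder is modeled as a plain function, the response queue is an ArrayDeque, and the Response type and method names are assumptions, not part of any AWS API.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.UUID;
import java.util.function.Function;

// In-memory illustration only: no real topic or queue is involved.
final class ScatterGatherSketch {

    // Hypothetical response type: the body plus the echoed correlation identifier.
    static final class Response {
        final String correlationId;
        final String body;

        Response(String correlationId, String body) {
            this.correlationId = correlationId;
            this.body = body;
        }
    }

    static List<String> scatterGather(String request, List<Function<String, String>> responders) {
        Queue<Response> responseQueue = new ArrayDeque<>();
        String correlationId = UUID.randomUUID().toString();

        // Scatter: the topic hands a copy of the request to every responder.
        // Each responder sends its result into the one shared response queue.
        for (Function<String, String> responder : responders) {
            responseQueue.add(new Response(correlationId, responder.apply(request)));
        }

        // Gather: a single consumer drains the queue and keeps the responses
        // that belong to this request.
        List<String> aggregated = new ArrayList<>();
        Response response;
        while ((response = responseQueue.poll()) != null) {
            if (correlationId.equals(response.correlationId)) {
                aggregated.add(response.body);
            }
        }
        return aggregated;
    }

    public static void main(String[] args) {
        List<Function<String, String>> responders = new ArrayList<>();
        responders.add(order -> order + " + customer data");
        responders.add(order -> order + " + stock level");
        System.out.println(scatterGather("order-42", responders));
        // prints [order-42 + customer data, order-42 + stock level]
    }
}
```

In a real deployment the responders run in parallel and responses arrive in any order, which is why the aggregation step relies on the correlation identifier rather than on arrival order.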

Traditional messaging

As the following diagram shows, an Amazon MQ topic is used to send out all the request messages, while all the response messages are sent into an Amazon MQ queue.

Publish Subscribe Request Response Traditional Messaging

In our code sample below, we use two responders.

public class PublishSubscribeRequestResponseTraditional {

    public static void main(String... args) throws Exception {
        ActiveMQSslConnectionFactory connFact = new ActiveMQSslConnectionFactory("failover:(ssl://<broker-1>.amazonaws.com:61617,ssl://<broker-2>.amazonaws.com:61617)");
        connFact.setConnectResponseTimeout(10000);
        Connection conn = connFact.createConnection("user", "password");
        conn.setClientID("PubSubReqRespTraditional");
        conn.start();

        new Thread(new Responder(conn.createSession(false, Session.CLIENT_ACKNOWLEDGE), "Topic.PubSub.ReqResp.Traditional", "subscriber-1")).start();
        new Thread(new Responder(conn.createSession(false, Session.CLIENT_ACKNOWLEDGE), "Topic.PubSub.ReqResp.Traditional", "subscriber-2")).start();
        new Thread(new Requester(conn.createSession(false, Session.CLIENT_ACKNOWLEDGE), "Topic.PubSub.ReqResp.Traditional")).start();
    }

    public static class Requester implements Runnable {

        private Session session;
        private String destination;

        public Requester(Session session, String destination) {
            this.session = session;
            this.destination = destination;
        }

        public void run() {
            MessageProducer messageProducer = null;
            try {
                messageProducer = session.createProducer(session.createTopic(destination));
                long counter = 0;

                while (true) {
                    TemporaryQueue replyTo = session.createTemporaryQueue();
                    String correlationId = UUID.randomUUID().toString();
                    TextMessage message = session.createTextMessage("Message " + ++counter);
                    message.setJMSMessageID(UUID.randomUUID().toString());
                    message.setJMSCorrelationID(correlationId);
                    message.setJMSReplyTo(replyTo);
                    messageProducer.send(message);

                    MessageConsumer consumer = session.createConsumer(replyTo, "JMSCorrelationID='" + correlationId + "'");
                    try {
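                        Message receivedMessage1 = consumer.receive(5000);
                        Message receivedMessage2 = consumer.receive(5000);
                        if (receivedMessage1 == null || receivedMessage2 == null) {
                            // receive() returns null when the timeout expires before a response arrives
                            System.out.println("timed out waiting for both responses");
                            continue;
                        }
                        System.out.println(String.format("received 2 messages '%s' and '%s'", ((TextMessage) receivedMessage1).getText(), ((TextMessage) receivedMessage2).getText()));
                        // In CLIENT_ACKNOWLEDGE mode, this acknowledges all messages the session has consumed so far
                        receivedMessage2.acknowledge();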
                    } finally {
                        if (consumer != null) {
                            consumer.close();
                        }
                    }
                }
            } catch (JMSException e) {
                throw new RuntimeException(e);
            }
        }
    }

    public static class Responder implements Runnable, MessageListener {

        private Session session;
        private String destination;
        private String name;

        public Responder(Session session, String destination, String name) {
            this.session = session;
            this.destination = destination;
            this.name = name;
        }

        public void run() {
            try {
                MessageConsumer consumer = session.createDurableSubscriber(session.createTopic(destination), name);
                consumer.setMessageListener(this);
            } catch (JMSException e) {
                throw new RuntimeException(e);
            }
        }

        public void onMessage(Message message) {
            try {
                String correlationId = message.getJMSCorrelationID();
                Destination replyTo = message.getJMSReplyTo();

                TextMessage responseMessage = session.createTextMessage(((TextMessage) message).getText() + " from responder " + name);
                responseMessage.setJMSMessageID(UUID.randomUUID().toString());
                responseMessage.setJMSCorrelationID(correlationId);

                MessageProducer messageProducer = session.createProducer(replyTo);
                try {
                    messageProducer.send(responseMessage);

                    message.acknowledge();
                } finally {
                    if (messageProducer != null) {
                        messageProducer.close();
                    }
                }
            } catch (JMSException e) {
                throw new RuntimeException(e);
            }
        }
    }
}

Cloud-native messaging

To implement a similar pattern with Amazon SNS, open the Amazon SNS console and create a new SNS topic named PubSubReqRespCloudNative. Then open the Amazon SQS console and create a standard SQS queue named PubSubReqRespCloudNative-Resp. The following diagram illustrates that we now use an Amazon SNS topic for request messages and an Amazon SQS queue for response messages.

Publish Subscribe Request Response Cloud Native Messaging

This example requester is almost identical to the publish-subscribe one-way cloud-native example sender. The requester also specifies a reply-to address and a correlation ID as message attributes. This way, responders know where to send the responses, and the receiver of the responses can match each response to its request.

public class PublishSubscribeReqRespCloudNative {

    public static void main(String... args) throws Exception {
        final AmazonSNS sns = AmazonSNSClientBuilder.standard().build();
        final AmazonSQS sqs = AmazonSQSClientBuilder.standard().build();

        new Thread(new Requester(sns, sqs, "arn:aws:sns:<region>:<account-number>:PubSubReqRespCloudNative", "https://sqs.<region>.amazonaws.com/<account-number>/PubSubReqRespCloudNative-Resp")).start();
    }

    public static class Requester implements Runnable {

        private AmazonSNS sns;
        private AmazonSQS sqs;
        private String destination;
        private String replyDestination;
        private Map<String, PublishRequest> inflightMessages = new ConcurrentHashMap<>();

        public Requester(AmazonSNS sns, AmazonSQS sqs, String destination, String replyDestination) {
            this.sns = sns;
            this.sqs = sqs;
            this.destination = destination;
            this.replyDestination = replyDestination;
        }

        public void run() {
            long counter = 0;

            while (true) {
                String correlationId = UUID.randomUUID().toString();
                PublishRequest request = new PublishRequest()
                    .withTopicArn(destination)
                    .withMessage("Message " + ++counter)
                    .addMessageAttributesEntry("CorrelationID", new MessageAttributeValue().withDataType("String").withStringValue(correlationId))
                    .addMessageAttributesEntry("ReplyTo", new MessageAttributeValue().withDataType("String").withStringValue(replyDestination));
                sns.publish(request);

                inflightMessages.put(correlationId, request);

                ReceiveMessageResult receiveMessageResult = sqs.receiveMessage(
                    new ReceiveMessageRequest()
                        .withQueueUrl(replyDestination)
                        .withMessageAttributeNames("CorrelationID")
                        .withMaxNumberOfMessages(5)
                        .withWaitTimeSeconds(2));

                for (Message receivedMessage : receiveMessageResult.getMessages()) {
                    System.out.println(String.format("received message '%s' with message id '%s'", receivedMessage.getBody(), receivedMessage.getMessageId()));

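                    String receivedCorrelationId = receivedMessage.getMessageAttributes().get("CorrelationID").getStringValue();
                    // remove() returns null if the correlation ID is unknown or was already handled,
                    // for example when a second responder replies to the same request
                    PublishRequest originalRequest = inflightMessages.remove(receivedCorrelationId);
                    if (originalRequest != null) {
                        System.out.println(String.format("Corresponding request message '%s'", originalRequest.getMessage()));
                    }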

                    sqs.deleteMessage(
                        new DeleteMessageRequest()
                            .withQueueUrl(replyDestination)
                            .withReceiptHandle(receivedMessage.getReceiptHandle()));
                }
            }
        }
    }
}

This example responder is almost identical to the publish-subscribe one-way cloud-native example receiver. It also creates a message, enriches it with the correlation ID, and sends it back to the reply-to address provided in the received message.

public class Responder implements RequestHandler<SNSEvent, Void> {

    private final AmazonSQS sqs = AmazonSQSClientBuilder.standard().build();

    @Override
    public Void handleRequest(SNSEvent request, Context context) {
        for (SNSEvent.SNSRecord record: request.getRecords()) {
            System.out.println(String.format("received record '%s' with message id '%s'", record.getSNS().getMessage(), record.getSNS().getMessageId()));
            String correlationId = record.getSNS().getMessageAttributes().get("CorrelationID").getValue();
            String replyTo = record.getSNS().getMessageAttributes().get("ReplyTo").getValue();

            System.out.println(String.format("sending message with correlation id '%s' to '%s'", correlationId, replyTo));
            sqs.sendMessage(
                new SendMessageRequest()
                    .withQueueUrl(replyTo)
                    .withMessageBody(record.getSNS().getMessage() + " with CorrelationID " + correlationId)
                    .addMessageAttributesEntry("CorrelationID", new MessageAttributeValue().withDataType("String").withStringValue(correlationId)));
        }

        return null;
    }
}

Go Build!

We look forward to hearing about what you build and will continue innovating our services on your behalf.

Additional Resources

Encrypting messages published to Amazon SNS with AWS KMS

Post Syndicated from Michelle Mercier original https://aws.amazon.com/blogs/compute/encrypting-messages-published-to-amazon-sns-with-aws-kms/

Amazon Simple Notification Service (Amazon SNS) is a fully managed pub/sub messaging service for decoupling event-driven microservices, distributed systems, and serverless applications. Enterprises around the world use Amazon SNS to support applications that handle private and sensitive data. Many of these enterprises operate in regulated markets, and their systems are subject to stringent security and compliance standards, such as HIPAA for healthcare, PCI DSS for finance, and FedRAMP for government. To address the requirements of highly critical workloads, Amazon SNS provides message encryption in transit, based on Amazon Trust Services (ATS) certificates, as well as message encryption at rest, using AWS Key Management Service (AWS KMS) keys. 

Message encryption in transit

The Amazon SNS API is served through Secure HTTP (HTTPS) and encrypts all messages in transit with Transport Layer Security (TLS) certificates issued by ATS. These certificates verify the identity of the Amazon SNS API server whenever an encrypted connection is established. A certificate authority (CA) issues the certificate to a specific domain. Thus, when a domain presents a certificate issued by a trusted CA, the API client can determine that it’s safe to establish a connection.

Message encryption at rest

Amazon SNS supports encrypted topics. When you publish messages to encrypted topics, Amazon SNS uses customer master keys (CMKs), powered by AWS KMS, to encrypt your messages. Amazon SNS supports customer-managed as well as AWS-managed CMKs. As soon as Amazon SNS receives your messages, the encryption takes place on the server, using a 256-bit AES-GCM algorithm. The messages are stored in encrypted form across multiple Availability Zones (AZs) for durability and are decrypted just before being delivered to subscribed endpoints, such as Amazon Simple Queue Service (Amazon SQS) queues, AWS Lambda functions, and HTTP and HTTPS webhooks.

Notes:

  • Amazon SNS encrypts only the body of the messages that you publish. It doesn’t encrypt message metadata (identifier, subject, timestamp, and attributes), topic metadata (name and attributes), or topic metrics. Thus, encryption doesn’t affect the operations of Amazon SNS, such as message fanout and message filtering.
  • Amazon SNS doesn’t retroactively encrypt messages that were published before server-side encryption (SSE) was enabled for the topic. In addition, any encrypted message retains its encryption even if you disable the encryption of its topic. It can take up to a minute for encryption to take effect after it is enabled.
  • Amazon SNS uses envelope encryption internally. It uses your configured CMK to generate a short-lived data encryption key (DEK) and then reuses this DEK to encrypt your published messages for 5 minutes. When the DEK expires, Amazon SNS automatically rotates to generate a new DEK from AWS KMS.
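To make the envelope encryption idea in the last note more concrete, here is a minimal local sketch that uses only the JDK. It is an illustration under stated assumptions, not the Amazon SNS implementation: a locally generated AES key stands in for the CMK (which in reality never leaves AWS KMS), and the class and method names are hypothetical. It shows a data key being generated, wrapped under the master key, and reused for 5 minutes.

```java
import javax.crypto.Cipher;
import javax.crypto.KeyGenerator;
import javax.crypto.SecretKey;
import javax.crypto.spec.GCMParameterSpec;
import javax.crypto.spec.SecretKeySpec;
import java.nio.charset.StandardCharsets;
import java.security.SecureRandom;

/** Local illustration of envelope encryption with a reusable data encryption key (DEK). */
class EnvelopeEncryptionSketch {

    private static final long DEK_TTL_MILLIS = 5 * 60 * 1000; // reuse window from the note above
    private static final SecureRandom RANDOM = new SecureRandom();

    private final SecretKey masterKey; // assumption: local stand-in for the CMK
    private SecretKey dataKey;         // plaintext DEK, kept in memory only
    private byte[] wrappedDataKey;     // DEK encrypted under the master key
    private long dataKeyCreatedAt;

    EnvelopeEncryptionSketch(SecretKey masterKey) {
        this.masterKey = masterKey;
    }

    static SecretKey newAesKey() throws Exception {
        KeyGenerator generator = KeyGenerator.getInstance("AES");
        generator.init(256);
        return generator.generateKey();
    }

    /** AES-256-GCM encryption; the random 12-byte IV is prepended to the ciphertext. */
    static byte[] encrypt(SecretKey key, byte[] plaintext) throws Exception {
        byte[] iv = new byte[12];
        RANDOM.nextBytes(iv);
        Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
        cipher.init(Cipher.ENCRYPT_MODE, key, new GCMParameterSpec(128, iv));
        byte[] ciphertext = cipher.doFinal(plaintext);
        byte[] out = new byte[iv.length + ciphertext.length];
        System.arraycopy(iv, 0, out, 0, iv.length);
        System.arraycopy(ciphertext, 0, out, iv.length, ciphertext.length);
        return out;
    }

    /** Reverses encrypt(): reads the IV from the first 12 bytes, then decrypts the rest. */
    static byte[] decrypt(SecretKey key, byte[] ivAndCiphertext) throws Exception {
        Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
        cipher.init(Cipher.DECRYPT_MODE, key, new GCMParameterSpec(128, ivAndCiphertext, 0, 12));
        return cipher.doFinal(ivAndCiphertext, 12, ivAndCiphertext.length - 12);
    }

    /** Returns the current DEK, generating and wrapping a new one once the old one has expired. */
    SecretKey currentDataKey(long nowMillis) throws Exception {
        if (dataKey == null || nowMillis - dataKeyCreatedAt >= DEK_TTL_MILLIS) {
            dataKey = newAesKey();
            wrappedDataKey = encrypt(masterKey, dataKey.getEncoded());
            dataKeyCreatedAt = nowMillis;
        }
        return dataKey;
    }

    /** Recovers the DEK from its wrapped form, as a reader holding only the master key would. */
    SecretKey unwrapDataKey() throws Exception {
        return new SecretKeySpec(decrypt(masterKey, wrappedDataKey), "AES");
    }

    byte[] encryptMessage(String message, long nowMillis) throws Exception {
        return encrypt(currentDataKey(nowMillis), message.getBytes(StandardCharsets.UTF_8));
    }

    public static void main(String[] args) throws Exception {
        EnvelopeEncryptionSketch sketch = new EnvelopeEncryptionSketch(newAesKey());
        byte[] encrypted = sketch.encryptMessage("{\"patient\": 2911}", System.currentTimeMillis());
        byte[] decrypted = decrypt(sketch.unwrapDataKey(), encrypted);
        System.out.println(new String(decrypted, StandardCharsets.UTF_8));
    }
}
```

The design point is that the master key only ever encrypts small data keys, while the data key handles the message payloads. Reusing the data key for a short window keeps the number of master-key operations low.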

Applying encrypted topics in a use case

You can use encrypted topics for a variety of scenarios, especially for processing sensitive data, such as personally identifiable information (PII) and protected health information (PHI).

The following example illustrates an electronic medical record (EMR) system deployed in several clinics and hospitals. The system manages patients’ medical histories, diagnoses, medications, immunizations, visits, lab results, and doctors’ notes.

The clinic’s EMR system is integrated with three auxiliary systems. Each system, hosted on an Amazon EC2 instance, polls incoming patient records from an Amazon SQS queue and takes action:

  • The Billing system manages the clinic’s revenue cycles and processes accounts receivable and reimbursements.
  • The Scheduling system keeps patients informed of their upcoming clinic and lab appointments and reminds them to take their medications.
  • The Prescription system transmits electronic prescriptions to authorized pharmacies and tracks medication-filling history.

When a physician inputs a new record into a patient’s file, the EMR system publishes a message to an Amazon SNS topic. The topic in turn fans out a copy of the message to each one of the three subscribing Amazon SQS queues for parallel processing. When the message is retrieved from the queue, the Billing system invoices the patient, the Scheduling system books the patient’s next clinic or lab appointment, and the Prescription system orders the required medication from an authorized pharmacy.
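The fanout behavior described above can be modeled in a few lines of plain Java. This is only an in-memory sketch of the message flow, with a hypothetical `FanoutSketch` class standing in for the topic and `java.util.Queue` instances standing in for the Amazon SQS queues. It involves no AWS calls.

```java
import java.util.ArrayDeque;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Queue;

/** In-memory model of a topic that copies each message to every subscribed queue. */
class FanoutSketch {

    private final Map<String, Queue<String>> subscribers = new LinkedHashMap<>();

    /** Creates (or returns) the queue with the given name and subscribes it to the topic. */
    Queue<String> subscribe(String queueName) {
        return subscribers.computeIfAbsent(queueName, name -> new ArrayDeque<>());
    }

    /** Delivers one copy of the message to each subscribed queue. */
    void publish(String message) {
        for (Queue<String> queue : subscribers.values()) {
            queue.add(message);
        }
    }

    public static void main(String[] args) {
        FanoutSketch topic = new FanoutSketch();
        Queue<String> billing = topic.subscribe("Billing-Integration");
        Queue<String> scheduling = topic.subscribe("Scheduling-Integration");
        Queue<String> prescription = topic.subscribe("Prescription-Integration");

        topic.publish("{\"patient\": 2911, \"medication\": 151}");

        // Each system polls its own queue and processes its copy independently
        System.out.println("billing: " + billing.poll());
        System.out.println("scheduling: " + scheduling.poll());
        System.out.println("prescription: " + prescription.poll());
    }
}
```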

The Amazon SNS topic and all Amazon SQS queues described in this use case are encrypted using AWS KMS keys that the clinic creates. The communication among services is based on HTTPS API calls. This end-to-end encryption protects patients’ medical records by making their content unavailable to unauthorized or anonymous users while messages are either in transit or at rest.

Creating, subscribing, and publishing to encrypted topics

You can create an Amazon SNS encrypted topic or an Amazon SQS encrypted queue by setting its attribute KmsMasterKeyId, which expects an AWS KMS key identifier. The key identifier can be a key ID, key ARN, or key alias. You can use the identifier of either a customer-managed CMK, such as alias/MyKey, or the AWS-managed CMK in your account, whose alias is alias/aws/sns.

The following code snippets work with the AWS SDK for Java. You can use these code samples for the healthcare system scenario in the previous section.

First, the principal publishing messages to the Amazon SNS encrypted topic must have access permission to execute the AWS KMS operations GenerateDataKey and Decrypt, in addition to the Amazon SNS operation Publish. The principal can be either an IAM user or an IAM role. The following IAM policy grants the required access permission to the principal.

{
  "Version": "2012-10-17",
  "Statement": {
    "Effect": "Allow",
    "Action": [
      "kms:GenerateDataKey",
      "kms:Decrypt",
      "sns:Publish"
    ],
    "Resource": "*"
  }
}

If you want to use a customer-managed CMK, a CMK needs to be created and secured by granting the publisher access to the same AWS KMS operations GenerateDataKey and Decrypt. The access permission is granted using KMS key policies. The following JSON document shows an example policy statement for the customer-managed CMK used by the healthcare system. For more information on creating and securing keys, see Creating Keys and Using Key Policies in the AWS Key Management Service Developer Guide.

{
  "Version": "2012-10-17",
  "Id": "EMR-System-KeyPolicy",
  "Statement": [
    {
      "Sid": "Allow access for Root User",
      "Effect": "Allow",
      "Principal": {"AWS": "arn:aws:iam::123456789012:root"},
      "Action": "kms:*",
      "Resource": "*"
    },
    {
      "Sid": "Allow access for Key Administrator",
      "Effect": "Allow",
      "Principal": {"AWS": "arn:aws:iam::123456789012:user/EMRAdmin"},
      "Action": [
        "kms:Create*",
        "kms:Describe*",
        "kms:Enable*",
        "kms:List*",
        "kms:Put*",
        "kms:Update*",
        "kms:Revoke*",
        "kms:Disable*",
        "kms:Get*",
        "kms:Delete*",
        "kms:TagResource",
        "kms:UntagResource",
        "kms:ScheduleKeyDeletion",
        "kms:CancelKeyDeletion"
      ],
      "Resource": "*"
    },
    {
      "Sid": "Allow access for Key User (SNS Publisher)",
      "Effect": "Allow",
      "Principal": {"AWS": "arn:aws:iam::123456789012:user/EMRUser"},
      "Action": [
        "kms:GenerateDataKey*",
        "kms:Decrypt"
      ],
      "Resource": "*"
    }
  ]
}

The following snippet uses the CMK to create an encrypted topic, three encrypted queues, and their corresponding subscriptions.

// Create API clients

String userArn = "arn:aws:iam::123456789012:user/EMRUser";

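// getCredentials is a placeholder for however your application obtains credentials for this principal
AWSCredentialsProvider credentials = getCredentials(userArn);

// The client builders replace the deprecated AmazonSNSClient and AmazonSQSClient constructors
AmazonSNS sns = AmazonSNSClientBuilder.standard().withCredentials(credentials).build();
AmazonSQS sqs = AmazonSQSClientBuilder.standard().withCredentials(credentials).build();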

// Create an attributes collection for the topic and queues

String keyId = "arn:aws:kms:us-east-1:123456789012:alias/EMRKey"; 

Map<String, String> attributes = new HashMap<>();
attributes.put("KmsMasterKeyId", keyId);

// Create an encrypted topic

String topicArn = sns.createTopic(
    new CreateTopicRequest("Patient-Records")
        .withAttributes(attributes)).getTopicArn();

// Create encrypted queues

String billingQueueUrl = sqs.createQueue(
    new CreateQueueRequest("Billing-Integration")
        .withAttributes(attributes)).getQueueUrl();

String schedulingQueueUrl = sqs.createQueue(
    new CreateQueueRequest("Scheduling-Integration")
        .withAttributes(attributes)).getQueueUrl();

String prescriptionQueueUrl = sqs.createQueue(
    new CreateQueueRequest("Prescription-Integration")
        .withAttributes(attributes)).getQueueUrl();

// Create subscriptions

Topics.subscribeQueue(sns, sqs, topicArn, billingQueueUrl);
Topics.subscribeQueue(sns, sqs, topicArn, schedulingQueueUrl);
Topics.subscribeQueue(sns, sqs, topicArn, prescriptionQueueUrl);

Next, the following code composes a JSON message and publishes it to the encrypted topic.

// Publish message to encrypted topic

String messageBody = "{\"patient\": 2911, \"medication\": 151}";
String messageSubject = "Electronic Medical Record - 3472";

sns.publish(
    new PublishRequest()
        .withSubject(messageSubject)
        .withMessage(messageBody)
        .withTopicArn(topicArn));

Note:

Publishing messages to encrypted topics isn’t different from publishing messages to standard, unencrypted topics. Your publisher needs access to perform AWS KMS operations GenerateDataKey and Decrypt on the configured CMK. All of the encryption logic is offloaded to Amazon SNS, and the message is delivered to all subscribed endpoints.

A copy of the message is now available in each subscribing queue. The final code snippet retrieves the messages from the encrypted queues.

// Retrieve messages from encrypted queues

List<Message> messagesForBilling = sqs.receiveMessage(
    new ReceiveMessageRequest(billingQueueUrl)).getMessages();

List<Message> messagesForScheduling = sqs.receiveMessage(
    new ReceiveMessageRequest(schedulingQueueUrl)).getMessages();

List<Message> messagesForPrescription = sqs.receiveMessage(
    new ReceiveMessageRequest(prescriptionQueueUrl)).getMessages();

Note:

Retrieving messages from encrypted queues isn’t different from retrieving messages from standard, unencrypted queues. All of the decryption logic is offloaded to Amazon SQS.

Enabling compatibility between encrypted topics and event sources

Several AWS services publish events to Amazon SNS topics. To allow these event sources to work with encrypted topics, you must first create a customer-managed CMK and then add the following statement to the policy of the CMK.

{
    "Version": "2012-10-17",
    "Statement": [{
        "Effect": "Allow",
        "Principal": {"Service": "service.amazonaws.com"},
        "Action": ["kms:GenerateDataKey*", "kms:Decrypt"],
        "Resource": "*"
    }]
}

You can use the following example service principals in the statement:

Other Amazon SNS event sources require you to provide an IAM role, as opposed to their service principal, in the KMS key policy. This set of event sources includes the following:

Once the CMK key policy has been configured, you can enable encryption on the topic using the CMK, and then provide the encrypted topic’s ARN to the event source.

Note:

As of November 2018, Amazon CloudWatch alarms don’t yet work with Amazon SNS encrypted topics. For information on publishing alarms to standard, unencrypted topics, see Using Amazon CloudWatch Alarms in the Amazon CloudWatch User Guide.

Publishing messages privately to encrypted topics through VPC endpoints

In addition to encrypting messages in transit and at rest, you can further harden the security and privacy of your applications by publishing messages to encrypted topics privately, without traversing the public Internet. Amazon SNS supports VPC endpoints via AWS PrivateLink. You can use VPC endpoints to privately publish messages to both standard and encrypted topics from a virtual private cloud (VPC) subnet. When you use AWS PrivateLink, you don’t have to set up an internet gateway, network address translation (NAT) device, or virtual private network (VPN) connection. For more information, see Publishing to Amazon SNS topics from Amazon Virtual Private Cloud in the Amazon Simple Notification Service Developer Guide.

Auditing the usage of encrypted topics

You can use AWS CloudTrail to audit the usage of the AWS KMS keys applied to your Amazon SNS topics. AWS CloudTrail creates log files that contain a history of AWS API calls and related events for your account. These log files include all AWS KMS API requests made with the AWS Management Console, SDKs, and Command Line Tools, as well as those made through integrated AWS services. You can use these log files to get information about when your CMK was used, the operation that was requested, the identity of the requester, and the IP address that the request came from. For more information, see Logging AWS KMS API calls with AWS CloudTrail in the AWS Key Management Service Developer Guide.

Summary

Amazon SNS provides a full set of security features to protect your data from unauthorized and anonymous access, including message encryption in transit with ATS certificates, message encryption at rest with AWS KMS keys, message privacy with AWS PrivateLink, and auditing with AWS CloudTrail. Moreover, you can subscribe Amazon SQS encrypted queues to Amazon SNS encrypted topics to establish end-to-end encryption in your messaging use cases.

Amazon SNS encrypted topics are available in all AWS Regions where AWS KMS is available. For pricing details, see AWS KMS pricing and Amazon SNS pricing. There is no increase in Amazon SNS charges for using encrypted topics, beyond the AWS KMS request charges incurred. For more information, see Protecting Amazon SNS Data Using Server-Side Encryption (SSE) in the Amazon Simple Notification Service Developer Guide.

Get started today by creating your Amazon SNS encrypted topics via the AWS Management Console and AWS SDKs.

How to build a front-line concussion monitoring system using AWS IoT and serverless data lakes – Part 2

Post Syndicated from Saurabh Shrivastava original https://aws.amazon.com/blogs/big-data/how-to-build-a-front-line-concussion-monitoring-system-using-aws-iot-and-serverless-data-lakes-part-2/

In part 1 of this series, we demonstrated how to build a data pipeline in support of a data lake. We used key AWS services such as Amazon Kinesis Data Streams, Kinesis Data Analytics, Kinesis Data Firehose, and AWS Lambda. In part 2, we discuss how to process and visualize the data by creating a serverless data lake that uses key analytics to create actionable data.

Create a serverless data lake and explore data using AWS Glue, Amazon Athena, and Amazon QuickSight

As we discussed in part 1, you can store heart rate data in an Amazon S3 bucket using Kinesis Data Streams. However, storing data in a repository is not enough. You also need to be able to catalog and store the associated metadata related to your repository so that you can extract the meaningful pieces for analytics.

For a serverless data lake, you can use AWS Glue, which is a fully managed data catalog and ETL (extract, transform, and load) service. AWS Glue simplifies and automates the difficult and time-consuming tasks of data discovery, conversion, and job scheduling. Once your AWS Glue Data Catalog data is partitioned and compressed for optimal performance, you can use Amazon Athena to query the S3 data directly. You can then visualize the data using Amazon QuickSight.

The following diagram depicts the data lake that is created in this demonstration:

Amazon S3 now has the raw data stored from the Kinesis process. The first task is to prepare the Data Catalog and identify what data attributes are available to query and analyze. To do this task, you need to create a database in AWS Glue that will hold the table created by the AWS Glue crawler.

An AWS Glue crawler scans through the raw data available in an S3 bucket and creates a data table in the Data Catalog. You can add a scheduler to the crawler to run periodically and scan new data as required. For specific steps to create a database and crawler in AWS Glue, see the blog post Build a Data Lake Foundation with AWS Glue and Amazon S3.

The following figure shows the summary screen for a crawler configuration in AWS Glue:

After configuring the crawler, choose Finish, and then choose Crawler in the navigation bar. Select the crawler that you created, and choose Run crawler.

The crawler process can take 20–60 seconds to initiate, depending on the Data Catalog. When it finishes, it creates a table in your database as defined during the crawler configuration.

You can choose the table name and explore the Data Catalog and table:

In the demonstration table details, our data has three attributes: the time stamp as value_time, the person’s ID as id, and the heart rate as colvalue. These attributes are identified and listed by the AWS Glue crawler. You can also see other information, such as the data format (text) and the record count (approximately 15,000 records of 61 bytes each).

You can use Athena to query the raw data. To access Athena directly from the AWS Glue console, choose the table, and then choose View data on the Actions menu, as shown following:

As noted, the data is currently in a JSON format and we haven’t partitioned it. This means that Athena has to scan more data than necessary, which increases the query cost. The best practice is to always partition data and to convert it into a columnar format like Apache Parquet or Apache ORC. This reduces the amount of data scanned while running a query. Scanning less data means better query performance at a lower cost.

To accomplish this, AWS Glue generates an ETL script for you. You can schedule it to run periodically for your data processing, which removes the necessity for complex code writing. AWS Glue is a managed service that runs on top of a warm Apache Spark cluster that is managed by AWS. You can run your own script in AWS Glue or modify a script provided by AWS Glue that meets your requirements. For examples of how to build a custom script for your solution, see Providing Your Own Custom Scripts in the AWS Glue Developer Guide.

For detailed steps to create a job, see the blog post Build a Data Lake Foundation with AWS Glue and Amazon S3. The following figure shows the final AWS Glue job configuration summary for this demonstration:

In this example configuration, we enabled the job bookmark, which helps AWS Glue maintain state information and prevents the reprocessing of old data. You only want to process new data when rerunning on a scheduled interval.
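The idea behind a job bookmark can be illustrated with a small stand-alone sketch. It is a deliberate simplification and not the AWS Glue implementation: it assumes that state is a single high-water mark on object modification time, and the class `JobBookmarkSketch` and its method are hypothetical names.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Simplified bookmark: remembers the newest modification time processed so far. */
class JobBookmarkSketch {

    private Instant bookmark = Instant.EPOCH;

    /** Returns the keys modified after the bookmark, then advances the bookmark past them. */
    List<String> selectNewObjects(Map<String, Instant> lastModifiedByKey) {
        List<String> fresh = new ArrayList<>();
        Instant newest = bookmark;
        for (Map.Entry<String, Instant> entry : lastModifiedByKey.entrySet()) {
            if (entry.getValue().isAfter(bookmark)) {
                fresh.add(entry.getKey());
                if (entry.getValue().isAfter(newest)) {
                    newest = entry.getValue();
                }
            }
        }
        bookmark = newest;
        return fresh;
    }

    public static void main(String[] args) {
        JobBookmarkSketch job = new JobBookmarkSketch();
        Map<String, Instant> bucket = new LinkedHashMap<>();
        bucket.put("heartrate-0001.json", Instant.parse("2018-11-05T10:00:00Z"));

        System.out.println("first run:  " + job.selectNewObjects(bucket));

        bucket.put("heartrate-0002.json", Instant.parse("2018-11-05T11:00:00Z"));
        System.out.println("second run: " + job.selectNewObjects(bucket));
    }
}
```

On the second run, only the object that arrived after the first run is selected, which is the behavior you want from a job that reruns on a schedule.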

When you choose Finish, AWS Glue generates a Python script. This script processes your data and stores it in a columnar format in the destination S3 bucket specified in the job configuration.

If you choose Run Job, it takes time to complete depending on the amount of data and data processing units (DPUs) configured. By default, a job is configured with 10 DPUs, which can be increased. A single DPU provides processing capacity that consists of 4 vCPUs of compute and 16 GB of memory.
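As a quick check of the capacity figures above, the following sketch multiplies them out for a given DPU count. The class name is hypothetical, and the per-DPU constants are the ones quoted in this post.

```java
/** Computes the total compute and memory for a number of DPUs, using the figures quoted above. */
class DpuCapacitySketch {

    static final int VCPUS_PER_DPU = 4;
    static final int MEMORY_GB_PER_DPU = 16;

    static int totalVcpus(int dpus) {
        return dpus * VCPUS_PER_DPU;
    }

    static int totalMemoryGb(int dpus) {
        return dpus * MEMORY_GB_PER_DPU;
    }

    public static void main(String[] args) {
        int dpus = 10; // the default job configuration mentioned above
        System.out.println(dpus + " DPUs = " + totalVcpus(dpus) + " vCPUs and "
                + totalMemoryGb(dpus) + " GB of memory");
    }
}
```

For the default of 10 DPUs, this works out to 40 vCPUs and 160 GB of memory.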

After the job is complete, inspect your destination S3 bucket, and you will find that your data is now in columnar Parquet format.

Partitioning has emerged as an important technique for organizing datasets so that they can be queried efficiently by a variety of big data systems. Data is organized in a hierarchical directory structure based on the distinct values of one or more columns. For information about efficiently processing partitioned datasets using AWS Glue, see the blog post Work with partitioned data in AWS Glue.
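The hierarchical layout described above can be sketched as follows. This example assumes a Hive-style `key=value` naming convention and partition columns named year, month, and day. Both are illustrative choices, not details taken from this demonstration.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.Locale;

/** Builds date-based partition prefixes and checks whether an object falls under one. */
class PartitionPathSketch {

    /** Returns a prefix such as year=2018/month=11/day=05/ for the record's UTC date. */
    static String partitionPrefix(Instant recordTime) {
        ZonedDateTime utc = recordTime.atZone(ZoneOffset.UTC);
        return String.format(Locale.ROOT, "year=%04d/month=%02d/day=%02d/",
                utc.getYear(), utc.getMonthValue(), utc.getDayOfMonth());
    }

    /** A query restricted to one day only needs to read objects under that day's prefix. */
    static boolean matchesDay(String objectKey, Instant day) {
        return objectKey.startsWith(partitionPrefix(day));
    }

    public static void main(String[] args) {
        Instant recordTime = Instant.parse("2018-11-05T13:45:00Z");
        String key = partitionPrefix(recordTime) + "part-0000.parquet";
        System.out.println(key);
        System.out.println(matchesDay(key, Instant.parse("2018-11-05T00:00:00Z")));
        System.out.println(matchesDay(key, Instant.parse("2018-11-06T00:00:00Z")));
    }
}
```

Because the partition values are encoded in the object key, a query engine can skip every prefix that doesn't match the filter instead of scanning the whole dataset.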

You can create triggers for your job that run the job periodically to process new data as it is transmitted to your S3 bucket. For detailed steps on how to configure a job trigger, see Triggering Jobs in AWS Glue.

The next step is to create a crawler for the Parquet data so that a table can be created. The following image shows the configuration for our Parquet crawler:

Choose Finish, and execute the crawler.

Explore your database, and you will notice that one more table was created in the Parquet format.

You can use this new table for direct queries to reduce costs and to increase the query performance of this demonstration.

Because AWS Glue is integrated with Athena, you will find in the Athena console an AWS Glue catalog already available with the table catalog. Fetch 10 rows from the new Parquet table in Athena, like you did for the JSON data table in the previous steps.

As the following image shows, we fetched the first 10 rows of heartbeat data from a Parquet format table. The same Athena query scanned only 4.99 KB of data, compared to the 205 KB scanned for the raw-format table (about 2.4% of the original). Also, there was a significant improvement in query performance in terms of run time.

Visualize data in Amazon QuickSight

Amazon QuickSight is a data visualization service that you can use to analyze data that has been combined. For more detailed instructions, see the Amazon QuickSight User Guide.

The first step in Amazon QuickSight is to create a new Amazon Athena data source. Choose the heartbeat database created in AWS Glue, and then choose the table that was created by the AWS Glue crawler.

Choose Import to SPICE for quicker analytics. This option creates a data cache and improves graph loading. All non-database datasets must use SPICE. To learn more about SPICE, see Managing SPICE Capacity.

Choose Visualize, and wait for SPICE to import the data to the cache. You can also schedule a periodic refresh so that new data is loaded to SPICE as the data is pipelined to the S3 bucket.

When the SPICE import is complete, you can create a visual dashboard easily. The following figure shows graphs displaying the occurrence of heart rate records per device. The first graph is a horizontally stacked bar chart, which shows the percentage of heart rate occurrence per device. The second graph shows the heart rate count grouped by heart rate device.

Conclusion

Processing streaming data at scale is relevant in every industry. Whether you process data from wearables to tackle human health issues or address predictive maintenance in manufacturing centers, AWS can help you simplify your data ingestion and analysis while keeping your overall IT expenditure manageable.

In this two-part series, you learned how to ingest streaming data from a heart rate sensor and visualize it in a way that creates actionable insights. The current state of the art available in the big data and machine learning space makes it possible to ingest terabytes and petabytes of data and extract useful and actionable information from that process.


Additional Reading

If you found this post useful, be sure to check out Work with partitioned data in AWS Glue and 10 visualizations to try in Amazon QuickSight with sample data.

About the Authors

Saurabh Shrivastava is a partner solutions architect and big data specialist working with global systems integrators. He works with AWS partners and customers to provide them with architectural guidance for building scalable architecture in hybrid and AWS environments.

Abhinav Krishna Vadlapatla is a Solutions Architect with Amazon Web Services. He supports startups and small businesses with their cloud adoption to build scalable and secure solutions using AWS. During his free time, he likes to cook and travel.

John Cupit is a partner solutions architect for AWS’ Global Telecom Alliance Team. His passion is leveraging the cloud to transform the carrier industry. He has a son and daughter who have both graduated from college. His daughter is gainfully employed, while his son is in his first year of law school at Tulane University. As such, he has no spare money and no spare time to work a second job.

David Cowden is a partner solutions architect and IoT specialist working with AWS emerging partners. He works with customers to provide them with architectural guidance for building scalable architecture in the IoT space.

Josh Ragsdale is an enterprise solutions architect at AWS. His focus is on adapting to a cloud operating model at very large scale. He enjoys cycling and spending time with his family outdoors.

Pierre-Yves Aquilanti, Ph.D., is a senior specialized HPC solutions architect at AWS. He spent several years in the oil & gas industry optimizing R&D applications for large-scale HPC systems and enabling the potential of machine learning for the upstream. He and his family crave to live in Singapore again for the human and cultural experience, and to eat fresh durians.

Manuel Puron is an enterprise solutions architect at AWS. He has been working in cloud security and IT service management for over 10 years. He is focused on the telecommunications industry. He enjoys video games and traveling to new destinations to discover new cultures.

How to build a front-line concussion monitoring system using AWS IoT and serverless data lakes – Part 1

Post Syndicated from Saurabh Shrivastava original https://aws.amazon.com/blogs/big-data/how-to-build-a-front-line-concussion-monitoring-system-using-aws-iot-and-serverless-data-lakes-part-1/

Sports-related minor traumatic brain injuries (mTBI) continue to incite concern among different groups in the medical, sports, and parenting communities. At the recreational level, approximately 1.6–3.8 million related mTBI incidents occur in the United States every year, and most of them are not treated at the hospital. (See “The epidemiology and impact of traumatic brain injury: a brief overview” in Additional resources.) The estimated medical and indirect costs of minor traumatic brain injury are reaching $60 billion annually.

Although emergency facilities in North America collect data on admitted traumatic brain injury (TBI) cases, there isn’t meaningful data on the number of unreported mTBIs among athletes. Recent studies indicate a significant rate of under-reporting of sports-related mTBI due to many factors. These factors include the simple inability of team staff to either recognize the signs and symptoms or to actually witness the impact. (See “A prospective study of physician-observed concussions during junior ice hockey: implications for incidence rates” in Additional resources.)

The majority of players involved in hockey and football are not college or professional athletes. There are over 3 million youth hockey players and approximately 5 million registered participants in football. (See “Head Impact Exposure in Youth Football” in Additional resources.) These recreational athletes don’t have basic access to medical staff trained in concussion recognition and sideline injury assessment. A user-friendly measurement and smartphone-based assessment tool would streamline the process from identifying potential head injuries, through assessment, to applying return to play (RTP) criteria.

Recently, the use of instrumented sports helmets, including the Head Impact Telemetry System (HITS), has allowed for detailed recording of impacts to the head in many research trials. This practice has led to recommendations to alter contact in practices and certain helmet design parameters. (See “Head impact severity measures for evaluating mild traumatic brain injury risk exposure” in Additional resources.) However, due to the higher costs of the HITS system and complexity of the equipment, it is not a practical impact alert device for the general recreational population.

A simple, practical, and affordable system is required for measuring head trauma within the sports environment in the absence of trained medical personnel.

Given the proliferation of smartphones, we felt that this was a practical device to investigate to provide this type of monitoring. All smartphone devices have an embedded Bluetooth communication system to receive and transmit data at various ranges. For the purposes of this demonstration, we chose a class 1 Bluetooth device as the hardware communication method. We chose it because of its simplicity, widely accepted standard, and compatibility with existing smartphones and IoT devices.

Remote monitoring typically involves collecting information from devices (for example, wearables) at the edge, integrating that information into a data lake, and generating inferences that can then be served back to the relevant stakeholders. Additionally, in some cases, compute and inference must also be done at the edge to shorten the feedback loop between data collection and response.

This use case can be extended to many other use cases in myriad verticals. In this two-part series, we show you how to build a data pipeline in support of a data lake. We use key AWS services such as Amazon Kinesis Data Streams, Kinesis Data Analytics, Kinesis Data Firehose, and AWS Lambda. In part 2, we focus on generating simple inferences from that data that can support RTP parameters.

Architectural overview

Here is the AWS architecture that we cover in this two-part series:

Note: For the purposes of our demonstration, we chose to use heart rate monitoring sensors rather than helmet sensors because they are significantly easier to acquire. Both types of sensors are very similar in how they transmit data. They are also very similar in terms of how they are integrated into a data lake solution.

The resulting demonstration transfers the heartbeat data using the following components:

  • AWS Greengrass set up with a Raspberry Pi 3 to stream heart rate data into the cloud.
  • Data is ingested via Amazon Kinesis Data Streams, and raw data is stored in an Amazon S3 bucket using Kinesis Data Firehose. Find more details about writing to Kinesis Data Firehose using Kinesis Data Streams.
  • Kinesis Data Analytics averages out the heartbeat-per-minute data during stream data ingestion and passes the average to an AWS Lambda function.
  • AWS Lambda enriches the heartbeat data by comparing the real-time data with baseline information stored in Amazon DynamoDB.
  • AWS Lambda sends SMS/email alerts via an Amazon SNS topic if the heartbeat rate is greater than 120 BPM, for example.
  • AWS Glue runs an extract, transform, and load (ETL) job. This job transforms the data stored in JSON format to a compressed Apache Parquet columnar format and partitions the transformed data for faster query processing. AWS Glue is a fully managed ETL service for crawling data stored in an Amazon S3 bucket and building a metadata catalog.
  • Amazon Athena is used for ad hoc query analysis on the data that is processed by AWS Glue. This data is also available for machine learning processing using predictive analysis to reduce heart disease risk.
  • Amazon QuickSight is a fully managed visualization tool. It uses Amazon Athena as a data source and presents the heart rate data as line and pie charts in a visual dashboard.

All data pipelines are serverless and are refreshed periodically to provide up-to-date data.

You can use Kinesis Data Firehose to transform the data in the pipeline to a compressed Parquet format without needing to use AWS Glue. For the purposes of this post, we are using AWS Glue to highlight its capabilities, including a centralized AWS Glue Data Catalog. This Data Catalog can be used by Athena for ad hoc queries and by Apache Spark on Amazon EMR to run complex machine learning processes. AWS Glue also lets you edit generated ETL scripts and supports “bring your own ETL” to process data for more complex use cases.

Configuring key processes to support the pipeline

The following sections describe how to set up and configure the devices and services used in the demonstration to build a data pipeline in support of a data lake.

Remote sensors and IoT devices

You can use commercially available heart rate monitors to collect electrocardiography (ECG) information such as heart rate. The monitor is strapped around the chest area with the sensor placed over the sternum for better accuracy. The monitor measures the heart rate and sends the data over Bluetooth Low Energy (BLE) to a Raspberry Pi 3. The following figure depicts the device-side architecture for our demonstration.

The Raspberry Pi 3 is host to both the IoT device and the AWS Greengrass core. The IoT device is responsible for connecting to the heart rate monitor over BLE and collecting the heart rate data. The collected data is then sent locally to the AWS Greengrass core, where it can be processed and routed to the cloud through a secure connection. The AWS Greengrass core serves as the “edge” gateway for the heart rate monitor.

Set up AWS Greengrass core software on Raspberry Pi 3

To prepare your Raspberry Pi for running AWS Greengrass software, follow the instructions in Environment Setup for Greengrass in the AWS Greengrass Developer Guide.

After setting up your Raspberry Pi, you are ready to install AWS Greengrass and create your first Greengrass group. Create a Greengrass group by following the steps in Configure AWS Greengrass on AWS IoT. Then install the appropriate certificates to the Raspberry Pi by following the steps to start AWS Greengrass on a core device.

The preceding steps deploy a Greengrass group that consists of three discrete configurable items: a device, a subscription list, and the connectivity information.

The IoT device is code that collects the heart rate information from the sensor and sends it to the AWS Greengrass core. This device uses the AWS IoT Device SDK for Python, including the Greengrass Discovery API.

Use the following AWS CLI command to create a Greengrass group:

aws greengrass create-group --name heartRateGroup

To complete the setup, follow the steps in Create AWS IoT Devices in an AWS Greengrass Group.

After you complete the setup, the heart rate data is routed from the device to the AWS IoT Core service using AWS Greengrass. As such, you need to add a single subscription in the Greengrass group to facilitate this message route:

Here, your device is named Heartrate_Sensor, and the target is the IoT Cloud on the topic iot/heartrate. That means that when your device publishes to the iot/heartrate topic, AWS Greengrass also sends this message to the AWS IoT Core service on the same topic. Then you can use the breadth of AWS services to process the data.

The connectivity information is configured to use the local host because the IoT device resides on the Raspberry Pi 3 along with the AWS Greengrass core software. The IoT device uses the Discovery API, which is responsible for retrieving the connectivity information of the AWS Greengrass core that the IoT device is associated with.

The IoT device then uses the endpoint and port information to open a secure TLS connection to AWS Greengrass core, where the heart rate data is sent. The AWS Greengrass core connectivity information should be depicted as follows:

The power of AWS Greengrass core is that you can deploy AWS Lambda functions and new subscriptions to process the heart rate information locally on the Raspberry Pi 3. For example, you can deploy an AWS Lambda function that triggers a reaction if the detected heart rate reaches a set threshold. In this scenario, different individuals might require different thresholds and responses, so you could theoretically deploy unique Lambda functions on a per-individual basis if needed.
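As a rough illustration of the per-individual threshold idea, the following Python sketch shows the kind of check such a local function might perform. It is not the code used in the demonstration. The names Thresholds, PER_DEVICE, and check_heart_rate are hypothetical, the 120 BPM upper bound echoes the example alert threshold from the architecture overview, and the other values are made up for illustration.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class Thresholds:
    low_bpm: int
    high_bpm: int


# Hypothetical defaults: 120 BPM echoes the example alert threshold in the
# architecture overview; the lower bound is an illustrative assumption.
DEFAULT = Thresholds(low_bpm=40, high_bpm=120)

# Hypothetical per-device overrides, for example for a well-trained athlete.
PER_DEVICE = {"7": Thresholds(low_bpm=45, high_bpm=150)}


def check_heart_rate(device_id: str, bpm: int) -> Optional[str]:
    """Return an alert message when bpm is outside the device's limits, else None."""
    limits = PER_DEVICE.get(device_id, DEFAULT)
    if bpm > limits.high_bpm:
        return f"device {device_id}: {bpm} BPM is above {limits.high_bpm}"
    if bpm < limits.low_bpm:
        return f"device {device_id}: {bpm} BPM is below {limits.low_bpm}"
    return None
```

Keeping the limits in a lookup table rather than in separate functions is one possible design choice; deploying a distinct Lambda function per individual, as described above, would work as well.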

Configure AWS Greengrass and AWS IoT Core

To enable further processing and storage of the heart rate data messages published from AWS Greengrass core to AWS IoT Core, create an AWS IoT rule. The AWS IoT rule retrieves messages published to the iot/heartrate topic and sends them to the Kinesis data stream through an AWS IoT rule action for Kinesis.

Simulate heart rate data

You might not have access to an IoT device, but you still want to run a proof of concept (PoC) around heart rate use cases. You can simulate data by creating a shell script and deploying that data simulation script on an Amazon EC2 instance. Refer to the EC2 user guide to get started with Amazon EC2 Linux instances.

On the Amazon EC2 instance, create a shell script kinesis_client_HeartRate.sh, and copy the provided code to start writing some records into the Kinesis data stream. Be sure to create your Kinesis data stream and replace the variable <your_stream_name> in the following script.

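#!/bin/bash
# Simulates heart rate sensors by writing "deviceID,heartRate" records to a Kinesis data stream.
# Requires bash: $RANDOM and $'\n' are not available in plain sh.
while true
do
  deviceID=$(( (RANDOM % 10) + 1 ))     # device IDs 1-10
  heartRate=$(( (RANDOM % 81) + 60 ))   # heart rates 60-140 BPM, without depending on jot
  echo "$deviceID,$heartRate"
  # With AWS CLI version 2, add --cli-binary-format raw-in-base64-out so that --data is sent as raw text.
  aws kinesis put-record --stream-name <your_stream_name> --data "$deviceID,$heartRate"$'\n' --partition-key "$deviceID" --region us-east-1
done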

You can also use the Kinesis Data Generator to create data and then stream it to your solution or demonstration. For details on its use, see the blog post Test Your Streaming Data Solution with the New Amazon Kinesis Data Generator.

Ingest data using Kinesis and manage alerts with Lambda, DynamoDB, and Amazon SNS

Now you need to ingest data from the IoT device, which can be processed for real-time notifications when abnormal heart rates are detected.

Streaming data from the heart rate monitoring device is ingested to Kinesis Data Streams. Amazon Kinesis makes it easy to collect, process, and analyze real-time, streaming data. For this project, the data stream was configured with one open shard and a data retention period of 24 hours. This lets you send 1 MB of data or 1,000 events per second and read 2 MB of data per second. If you need to support more devices, you can scale up and add more shards using the UpdateShardCount API or the Amazon Kinesis scaling utility.
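To make the capacity figures concrete, here is a small Python sketch that estimates how many shards a given number of devices would need on the write side. It only applies the per-shard write limits quoted above; the function name shards_needed and the sample workloads are hypothetical, and the 1 MB limit is interpreted as 1 MiB, which is an assumption.

```python
import math

# Per-shard write limits quoted in the text above.
MAX_BYTES_PER_SEC = 1024 * 1024  # "1 MB", interpreted here as 1 MiB (assumption)
MAX_RECORDS_PER_SEC = 1_000


def shards_needed(devices: int, records_per_device_per_sec: float, bytes_per_record: int) -> int:
    """Estimate the shard count needed to absorb the write load (reads are ignored)."""
    records_per_sec = devices * records_per_device_per_sec
    by_records = math.ceil(records_per_sec / MAX_RECORDS_PER_SEC)
    by_bytes = math.ceil(records_per_sec * bytes_per_record / MAX_BYTES_PER_SEC)
    # A stream always has at least one shard.
    return max(1, by_records, by_bytes)


# Ten simulated devices sending one small record per second fit in one shard.
print(shards_needed(devices=10, records_per_device_per_sec=1, bytes_per_record=6))
# 2,500 devices at the same rate exceed the 1,000 records per second limit.
print(shards_needed(devices=2500, records_per_device_per_sec=1, bytes_per_record=6))
```

An estimate like this is only a starting point; uneven partition keys can overload a single shard even when the total load fits.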

You can create your data stream by using the following AWS CLI command. To turn on server-side encryption, run the start-stream-encryption command after the stream is created.

aws kinesis create-stream --stream-name heartrate_stream --shard-count 1

You can use an AWS CloudFormation template to create the entire stack depicted in the following architecture diagram.

When launching an AWS CloudFormation template, be sure to enter your email address or mobile phone number with the appropriate endpoint protocol (“Email” or “SMS”) as parameters:

Alternatively, you can follow the manual steps in the documentation links that are provided in this post.

Streaming data in Kinesis can be processed and analyzed in real time by Kinesis clients. Refer to the Kinesis Data Streams Developer Guide to learn how to create a Kinesis data stream.

To identify abnormal heart rate information, you must use real-time analytics to detect abnormal behavior. You can use Kinesis Data Analytics to perform analytics on streaming data in real time. Kinesis Data Analytics consists of three configurable components: source, real-time analytics, and destination. Refer to the AWS documentation to learn the detailed steps to configure Kinesis Data Analytics.

Kinesis Data Analytics uses Kinesis Data Streams as the source stream for the data. In the source configuration process, if filtering or masking records is required, you can preprocess records using AWS Lambda. The data in this particular case is relatively simple, so you don’t need to preprocess the records.

The Kinesis Data Analytics schema editor lets you edit and transform the schema if required. In the following example, we transformed the second column to Value instead of COL_Value.

The SQL code to perform the real-time analysis of the data has to be copied to the SQL Editor for real-time analytics. The following is the sample code that was used for this demonstration.

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
    VALUEROWTIME TIMESTAMP,
    ID INTEGER,
    COLVALUE INTEGER);

CREATE OR REPLACE PUMP "STREAM_PUMP" AS
  INSERT INTO "DESTINATION_SQL_STREAM"
    SELECT STREAM ROWTIME,
                  ID,
                  AVG("Value") AS HEARTRATE
    FROM "SOURCE_SQL_STREAM_001"
    GROUP BY ID,
             STEP("SOURCE_SQL_STREAM_001".ROWTIME BY INTERVAL '60' SECOND)
    HAVING AVG("Value") > 120 OR AVG("Value") < 40;

This code generates DESTINATION_SQL_STREAM. It inserts values into the stream only when the average heart rate received from SOURCE_SQL_STREAM_001 is greater than 120 or less than 40 in the 60-second time window.

For more information about the tumbling window concept, see Tumbling Windows (Aggregations Using GROUP BY).
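If the windowing logic is unfamiliar, the following Python sketch mimics it on a small in-memory batch: records are grouped by device and by non-overlapping 60-second window, and only the averages outside the 40 to 120 range are kept. This is an offline approximation for illustration, not how Kinesis Data Analytics evaluates the query; the function name abnormal_averages and the sample records are hypothetical.

```python
from collections import defaultdict

WINDOW_SECONDS = 60


def abnormal_averages(records, low=40, high=120):
    """Group (epoch_seconds, device_id, bpm) records into tumbling windows.

    Returns (window_start, device_id, average) tuples for windows whose
    average is above `high` or below `low`.
    """
    buckets = defaultdict(list)
    for timestamp, device_id, bpm in records:
        # Each timestamp falls into exactly one non-overlapping window.
        window_start = int(timestamp) // WINDOW_SECONDS * WINDOW_SECONDS
        buckets[(window_start, device_id)].append(bpm)

    alerts = []
    for (window_start, device_id), values in sorted(buckets.items()):
        average = sum(values) / len(values)
        if average > high or average < low:
            alerts.append((window_start, device_id, average))
    return alerts


sample = [
    (0, 1, 130), (30, 1, 140), (59, 1, 120),  # device 1, first window: average 130
    (60, 1, 80),                              # device 1, second window: average 80
    (10, 2, 70),                              # device 2, first window: average 70
]
print(abnormal_averages(sample))
```

Only the first window of device 1 is reported, because it is the only group whose average leaves the normal range.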

Next, add an AWS Lambda function as one of your destinations, and configure it as follows:

In the destination editor, make sure that the stream name selected is the DESTINATION_SQL_STREAM. You only want to trigger the Lambda function when anomalies in the heart rate are detected. The output format can be JSON or CSV. In this example, our Lambda function expects the data in JSON format, so we chose JSON.

Athlete and athletic trainer registration information is stored in the heartrate Registrations DynamoDB table. Amazon DynamoDB offers fully managed encryption at rest using an AWS Key Management Service (AWS KMS) managed encryption key for DynamoDB. You need to create a table with encryption at rest enabled. Follow the detailed steps in Amazon DynamoDB Encryption at Rest.

Each record in the table should include deviceid, customerid, firstname, lastname, and mobile. The following is an example table record for reference.

{
  "customerid": {
    "S": "3"
  },
  "deviceid": {
    "S": "7"
  },
  "email": {
    "S": "[email protected]"
  },
  "firstname": {
    "S": "John"
  },
  "lastname": {
    "S": "Smith"
  },
  "mobile": {
    "S": "19999999999"
  }
}

Refer to the DynamoDB Developer Guide for complete instructions for creating and populating a DynamoDB table.

The Lambda function is created to process the record passed from the Kinesis Data Analytics application. The Node.js Lambda function retrieves the athlete and athletic trainer information from the DynamoDB registrations table. It then alerts the athletic trainer to the event by sending a cellular text message via the Amazon Simple Notification Service (Amazon SNS).

Note: The default AWS account limit for Amazon SNS for mobile messages is $1.00 per month. You can increase this limit through an SNS Limit Increase case as described in AWS Service Limits.

You now create a new Lambda function with a runtime of Node.js 6.10 and choose the Create a custom role option for IAM permissions.  If you are new to deploying Lambda functions, see Create a Simple Lambda Function.

You must configure the new Lambda function with a specific IAM role, providing privileges to Amazon CloudWatch Logs, Amazon DynamoDB, and Amazon SNS as provided in the supplied AWS CloudFormation template.

The provided AWS Lambda function retrieves the HR Monitor Device ID and HR Average from the base64-encoded JSON message that is passed from Kinesis Data Analytics.  After retrieving the HR Monitor Device ID, the function then queries the DynamoDB Athlete registration table to retrieve the athlete and athletic trainer information.

Finally, the AWS Lambda function sends a mobile text notification (which does not contain any sensitive information) to the athletic trainer’s mobile number retrieved from the athlete data by using the Amazon SNS service.
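The flow described above can be sketched as follows. The function used in the demonstration is written in Node.js and ships with the AWS CloudFormation template, so this Python version is only an illustration of the steps. It assumes the event carries a records list whose entries have a recordId and base64-encoded data, that the JSON keys match the ID and COLVALUE columns of DESTINATION_SQL_STREAM, and that a per-record result of Ok or DeliveryFailed is returned. The make_handler, lookup_registration, and send_sms names are hypothetical stand-ins for the DynamoDB query and the Amazon SNS publish call.

```python
import base64
import json


def make_handler(lookup_registration, send_sms):
    """Build a handler; the two callables stand in for DynamoDB and Amazon SNS."""

    def handler(event, context):
        results = []
        for record in event["records"]:
            try:
                # The payload arrives base64-encoded and holds one JSON document.
                payload = json.loads(base64.b64decode(record["data"]))
                device_id = str(payload["ID"])  # assumed column name
                payload["COLVALUE"]             # assumed column name; must be present
                registration = lookup_registration(device_id)
                if registration is not None:
                    # The message deliberately omits names and readings.
                    send_sms(registration["mobile"], f"Heart rate alert for device {device_id}")
                results.append({"recordId": record["recordId"], "result": "Ok"})
            except (KeyError, ValueError):
                results.append({"recordId": record["recordId"], "result": "DeliveryFailed"})
        return {"records": results}

    return handler
```

Passing the lookup and notification functions in as arguments keeps the sketch runnable without AWS credentials and makes the handler easy to test with stubs.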

To store the streaming data to an S3 bucket for further analysis and visualization using other tools, you can use Kinesis Data Firehose to connect the pipeline to Amazon S3 storage.  To learn more, see Create a Kinesis Data Firehose Delivery Stream.

Kinesis Data Firehose delivers the streaming data in intervals to the destination S3 bucket. The intervals can be defined using an S3 buffer size, an S3 buffer interval, or both, in which case delivery is triggered by whichever condition is met first. The data in the Data Firehose delivery stream can be transformed. It also lets you back up the source record before applying any transformation. The data can be encrypted and compressed in GZIP, ZIP, or Snappy format, and it can be converted to a columnar format like Apache Parquet or Apache ORC. This improves the query performance and reduces the storage footprint. You should enable error logging for operational and production troubleshooting.

Conclusion

In part 1 of this blog series, we demonstrated how to build a data pipeline in support of a data lake. We used key AWS services such as Kinesis Data Streams, Kinesis Data Analytics, Kinesis Data Firehose, and Lambda. In part 2, we’ll discuss how to deploy a serverless data lake and use key analytics to create actionable insights from the data lake.

Additional resources

Langlois, J.A., Rutland-Brown, W. & Wald, M., “The epidemiology and impact of traumatic brain injury: a brief overview,” Journal of Head Trauma Rehabilitation, Vol. 21, No. 5, 2006, pp. 375-378.

Echlin, S. E., Tator, C. H., Cusimano, M. D., Cantu, R. C., Taunton, J. E., Upshur E. G., Hall, C. R., Johnson, A. M., Forwell, L. A., Skopelja, E. N., “A prospective study of physician-observed concussions during junior ice hockey: implications for incidence rates,” Neurosurg Focus, 29 (5):E4, 2010

Daniel, R. W., Rowson, S., Duma, S. M., “Head Impact Exposure in Youth Football,” Annals of Biomedical Engineering, Vol. 10, 2012, 1007.

Greenwald, R. M., Gwin, J. T., Chu, J. J., Crisco, J. J., “Head impact severity measures for evaluating mild traumatic brain injury risk exposure,” Neurosurgery Vol. 62, 2008, pp. 789–79


Additional Reading

If you found this post useful, be sure to check out Setting Up Just-in-Time Provisioning with AWS IoT Core and Real-time Clickstream Anomaly Detection with Amazon Kinesis Analytics.

About the Authors

Saurabh Shrivastava is a partner solutions architect and big data specialist working with global systems integrators. He works with AWS partners and customers to provide them with architectural guidance for building scalable architecture in hybrid and AWS environments.

Abhinav Krishna Vadlapatla is a Solutions Architect with Amazon Web Services. He supports startups and small businesses with their cloud adoption to build scalable and secure solutions using AWS. During his free time, he likes to cook and travel.

John Cupit is a partner solutions architect for AWS’ Global Telecom Alliance Team. His passion is leveraging the cloud to transform the carrier industry. He has a son and daughter who have both graduated from college. His daughter is gainfully employed, while his son is in his first year of law school at Tulane University. As such, he has no spare money and no spare time to work a second job.

David Cowden is a partner solutions architect and IoT specialist working with AWS emerging partners. He works with customers to provide them with architectural guidance for building scalable architecture in the IoT space.

Josh Ragsdale is an enterprise solutions architect at AWS. His focus is on adapting to a cloud operating model at very large scale. He enjoys cycling and spending time with his family outdoors.

Pierre-Yves Aquilanti, Ph.D., is a senior specialized HPC solutions architect at AWS. He spent several years in the oil & gas industry optimizing R&D applications for large-scale HPC systems and enabling the potential of machine learning for the upstream. He and his family crave to live in Singapore again for the human and cultural experience, and to eat fresh durians.

Manuel Puron is an enterprise solutions architect at AWS. He has been working in cloud security and IT service management for over 10 years. He is focused on the telecommunications industry. He enjoys video games and traveling to new destinations to discover new cultures.

Managing Amazon SNS Subscription Attributes with AWS CloudFormation

Post Syndicated from Rachel Richardson original https://aws.amazon.com/blogs/compute/managing-amazon-sns-subscription-attributes-with-aws-cloudformation/

This post is courtesy of Otavio Ferreira, Manager, Amazon SNS, AWS Messaging.

Amazon SNS is a fully managed pub/sub messaging and event-driven computing service that can decouple distributed systems and microservices. By default, when your publisher system posts a message to an Amazon SNS topic, all systems subscribed to the topic receive a copy of the message. By using Amazon SNS subscription attributes, you can customize this default behavior and make Amazon SNS fit your use cases even more naturally. The available set of Amazon SNS subscription attributes includes FilterPolicy, DeliveryPolicy, and RawMessageDelivery.

You can manually manage your Amazon SNS subscription attributes via the AWS Management Console or programmatically via AWS Development Tools (SDK and AWS CLI). Now you can automate their provisioning via AWS CloudFormation templates as well. AWS CloudFormation lets you use a simple text file to model and provision all the Amazon SNS resources for your messaging use cases, across AWS Regions and accounts, in an automated and secure manner.

The following sections describe how you can simultaneously create Amazon SNS subscriptions and set their attributes via AWS CloudFormation templates.

Setting the FilterPolicy attribute

The FilterPolicy attribute is valid in the context of message filtering, regardless of the delivery protocol, and defines which type of message the subscriber expects to receive from the topic. Hence, by applying the FilterPolicy attribute, you can offload the message-filtering logic from subscribers and the message-routing logic from publishers.

To set the FilterPolicy attribute in your AWS CloudFormation template, use the syntax in the following JSON snippet. This snippet creates an Amazon SNS subscription whose endpoint is an AWS Lambda function. Simultaneously, this code also sets a subscription filter policy that matches messages carrying an attribute whose key is “pet” and value is either “dog” or “cat.”

{
   "Resources": {
      "mySubscription": {
         "Type" : "AWS::SNS::Subscription",
         "Properties" : {
            "Protocol": "lambda",
            "Endpoint": "arn:aws:lambda:us-east-1:000000000000:function:SavePet",
            "TopicArn": "arn:aws:sns:us-east-1:000000000000:PetTopic",
            "FilterPolicy": {
               "pet": ["dog", "cat"]
            }
         }
      }
   }
}
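To show what this policy means for an individual message, here is a minimal Python sketch of exact string matching: every key in the policy must be present among the message attributes, and the attribute value must be one of the listed values. It is a simplified model for illustration, not the Amazon SNS implementation, and it ignores other matching operators. The function name matches is hypothetical.

```python
def matches(filter_policy, message_attributes):
    """Return True when every policy key is satisfied by the message attributes.

    Keys are combined with AND; the values listed for one key are combined with OR.
    Only exact string matching is modeled.
    """
    for key, allowed_values in filter_policy.items():
        if key not in message_attributes:
            return False
        value = message_attributes[key]
        # Treat a single attribute value and a list of values the same way.
        candidates = value if isinstance(value, list) else [value]
        if not any(candidate in allowed_values for candidate in candidates):
            return False
    return True


policy = {"pet": ["dog", "cat"]}
print(matches(policy, {"pet": "dog"}))                    # delivered
print(matches(policy, {"pet": "bird"}))                   # filtered out
print(matches(policy, {"color": "black"}))                # filtered out, no "pet" attribute
print(matches(policy, {"pet": "cat", "color": "black"}))  # delivered, extra attributes are ignored
```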

Setting the DeliveryPolicy attribute

The DeliveryPolicy attribute is valid in the context of message delivery to HTTP endpoints and defines a delivery-retry policy. By applying the DeliveryPolicy attribute, you can control the maximum number of retries the subscriber expects, the time delay between each retry, and the backoff function. You should fine-tune these values based on the traffic volume your subscribing HTTP server can handle.

To set the DeliveryPolicy attribute in your AWS CloudFormation template, use the syntax in the following JSON snippet. This snippet creates an Amazon SNS subscription whose endpoint is an HTTP address. The code also sets a delivery policy capped at 10 retries for this subscription, with a linear backoff function.

{
   "Resources": {
      "mySubscription": {
         "Type" : "AWS::SNS::Subscription",
         "Properties" : {
            "Protocol": "https",
            "Endpoint": "https://api.myendpoint.ca/pets",
            "TopicArn": "arn:aws:sns:us-east-1:000000000000:PetTopic",
            "DeliveryPolicy": {
               "healthyRetryPolicy": {
                  "numRetries": 10,
                  "minDelayTarget": 10,
                  "maxDelayTarget": 30,
                  "numMinDelayRetries": 3,
                  "numMaxDelayRetries": 7,
                  "numNoDelayRetries": 0,
                  "backoffFunction": "linear"
               }
            }
         }
      }
   }
}
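The retry counts in this policy can be read as four consecutive phases: retries with no delay, retries at the minimum delay, backoff retries, and retries at the maximum delay. The following Python sketch lays out the resulting delay schedule under that reading. The backoff phase is modeled as a simple linear interpolation between the two delay targets, which is an assumption rather than the documented formula, and the function name retry_delays is hypothetical.

```python
def retry_delays(retry_policy):
    """Return the delay in seconds before each retry, in order.

    Phases: no delay, minimum delay, backoff, maximum delay. The backoff phase
    receives whatever retries are left over from numRetries.
    """
    total = retry_policy["numRetries"]
    no_delay = retry_policy["numNoDelayRetries"]
    at_min = retry_policy["numMinDelayRetries"]
    at_max = retry_policy["numMaxDelayRetries"]
    in_backoff = total - no_delay - at_min - at_max
    if in_backoff < 0:
        raise ValueError("numRetries is smaller than the sum of the phase counts")

    low = retry_policy["minDelayTarget"]
    high = retry_policy["maxDelayTarget"]
    # Assumption: linear steps strictly between the minimum and maximum delay.
    backoff = [low + (high - low) * (i + 1) / (in_backoff + 1) for i in range(in_backoff)]
    return [0] * no_delay + [low] * at_min + backoff + [high] * at_max


healthy_retry_policy = {
    "numRetries": 10,
    "minDelayTarget": 10,
    "maxDelayTarget": 30,
    "numMinDelayRetries": 3,
    "numMaxDelayRetries": 7,
    "numNoDelayRetries": 0,
}
print(retry_delays(healthy_retry_policy))
```

With the values from the snippet, the three minimum-delay retries and seven maximum-delay retries already account for all 10 retries, so the backoff phase is empty in this particular policy.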

Setting the RawMessageDelivery attribute

The RawMessageDelivery attribute is valid in the context of message delivery to Amazon SQS queues and HTTP endpoints. This Boolean attribute eliminates the need for the subscriber to process the JSON formatting that is created by default to decorate all published messages with Amazon SNS metadata. When you set RawMessageDelivery to true, you get two outcomes. First, your message is delivered as is, with no metadata added. Second, your message attributes propagate from Amazon SNS to Amazon SQS, when the subscribing endpoint is an Amazon SQS queue.

To set the RawMessageDelivery attribute in your AWS CloudFormation template, use the syntax in the following JSON snippet. This snippet creates an Amazon SNS subscription whose endpoint is an Amazon SQS queue. This code also enables raw message delivery for the subscription, which prevents Amazon SNS metadata from being added to the message payload.

{
   "Resources": {
      "mySubscription": {
         "Type" : "AWS::SNS::Subscription",
         "Properties" : {
            "Protocol": "sqs",
            "Endpoint": "arn:aws:sqs:us-east-1:000000000000:PetQueue",
            "TopicArn": "arn:aws:sns:us-east-1:000000000000:PetTopic",
            "RawMessageDelivery": "true"
         }
      }
   }
}
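From the subscriber's point of view, the difference comes down to whether the received body has to be unwrapped. The Python sketch below illustrates this with an abridged notification envelope; the real envelope carries more fields, the MessageId is a placeholder, and the function name extract_payload is hypothetical.

```python
import json


def extract_payload(body: str, raw_delivery: bool) -> str:
    """Return the published message from a received body."""
    if raw_delivery:
        # Raw delivery: the body is exactly what the publisher sent.
        return body
    # Default delivery: the message sits inside a JSON envelope of metadata.
    envelope = json.loads(body)
    return envelope["Message"]


published = json.dumps({"pet": "dog", "name": "Rex"})

# Abridged example of the default envelope (placeholder MessageId).
wrapped = json.dumps({
    "Type": "Notification",
    "MessageId": "00000000-0000-0000-0000-000000000000",
    "TopicArn": "arn:aws:sns:us-east-1:000000000000:PetTopic",
    "Message": published,
})

print(extract_payload(wrapped, raw_delivery=False) == published)
print(extract_payload(published, raw_delivery=True) == published)
```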

Applying subscription attributes in a use case

Here’s how everything comes together. The following example is based on a car dealer company, which operates with the following distributed systems hosted on Amazon EC2 instances:

  • Car-Dealer-System – Front-office system that takes orders placed by car buyers
  • ERP-System – Enterprise resource planning, the back-office system that handles finance, accounting, human resources, and related business activities
  • CRM-System – Customer relationship management, the back-office system responsible for storing car buyers’ profile information and running sales workflows
  • SCM-System – Supply chain management, the back-office system that handles inventory tracking and demand forecast and planning

 

Whenever an order is placed in the car dealer system, this event is broadcast to all back-office systems interested in this type of event. As shown in the preceding diagram, the company applied AWS messaging services to decouple their distributed systems, promoting more scalability and maintainability for their architecture. The queues and topic used are the following:

  • Car-Sales – Amazon SNS topic that receives messages from the car dealer system. All orders placed by car buyers are published to this topic, then delivered to subscribers (two Amazon SQS queues and one HTTP endpoint).
  • ERP-Integration – Amazon SQS queue that feeds the ERP system with orders published by the car dealer system. The ERP pulls messages from this queue to track revenue and trigger related bookkeeping processes.
  • CRM-Integration – Amazon SQS queue that feeds the CRM system with orders published by the car dealer system. The CRM pulls messages from this queue to track car buyers’ interests and update sales workflows.

The company created the following three Amazon SNS subscriptions:

  • The first subscription refers to the ERP-Integration queue. This subscription has the RawMessageDelivery attribute set to true. Hence, no metadata is added to the message payload, and message attributes are propagated from Amazon SNS to Amazon SQS.
  • The second subscription refers to the CRM-Integration queue. Like the first subscription, this one also has the RawMessageDelivery attribute set to true. Additionally, it has the FilterPolicy attribute set to {"buyer-class": ["vip"]}. This policy defines that only orders placed by VIP buyers are managed in the CRM system, and orders from other buyers are filtered out.
  • The third subscription points to the HTTP endpoint that serves the SCM-System. Unlike ERP and CRM, the SCM system provides its own HTTP API. Therefore, its HTTP endpoint was subscribed to the topic directly without a queue in between. This subscription has a DeliveryPolicy that caps the number of retries at 20, with an exponential backoff function.
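
As a rough illustration of the three subscriptions just described, the following Python sketch generates the corresponding `AWS::SNS::Subscription` resources as a CloudFormation fragment. The logical IDs (`CarSalesTopic`, `ErpIntegrationQueue`, `CrmIntegrationQueue`), the `ScmEndpointUrl` parameter, and the delay targets are assumptions made for this example, not values taken from the company's actual template. The topic, the queues, and the queue policies are assumed to be defined elsewhere in the same template.

```python
import json

# Hypothetical logical ID of the Car-Sales topic, defined elsewhere.
TOPIC = {"Ref": "CarSalesTopic"}

def subscription(protocol, endpoint, **attributes):
    """Build an AWS::SNS::Subscription resource with optional attributes."""
    properties = {"Protocol": protocol, "Endpoint": endpoint, "TopicArn": TOPIC}
    properties.update(attributes)
    return {"Type": "AWS::SNS::Subscription", "Properties": properties}

resources = {
    # ERP: raw delivery only.
    "ErpSubscription": subscription(
        "sqs",
        {"Fn::GetAtt": ["ErpIntegrationQueue", "Arn"]},
        RawMessageDelivery=True,
    ),
    # CRM: raw delivery plus a filter policy for VIP buyers.
    "CrmSubscription": subscription(
        "sqs",
        {"Fn::GetAtt": ["CrmIntegrationQueue", "Arn"]},
        RawMessageDelivery=True,
        FilterPolicy={"buyer-class": ["vip"]},
    ),
    # SCM: direct HTTP(S) delivery with a capped, exponential retry policy.
    "ScmSubscription": subscription(
        "https",  # use "http" if the endpoint URL is not HTTPS
        {"Ref": "ScmEndpointUrl"},
        DeliveryPolicy={
            "healthyRetryPolicy": {
                "numRetries": 20,
                "minDelayTarget": 10,  # assumed value, in seconds
                "maxDelayTarget": 30,  # assumed value, in seconds
                "backoffFunction": "exponential",
            }
        },
    ),
}

print(json.dumps({"Resources": resources}, indent=3))
```

Generating the fragment from a small helper keeps the three subscriptions consistent, but writing the same JSON by hand in the template works equally well.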

The company didn’t want to create all these resources manually, though. They wanted to turn this infrastructure into versionable code, and to be able to quickly spin up and tear down this infrastructure in an automated manner. Therefore, they created an AWS CloudFormation template to manage these AWS messaging resources: Amazon SNS topic, Amazon SNS subscriptions, Amazon SNS subscription attributes, and Amazon SQS queues.

Executing the AWS CloudFormation template

Now you’re ready to execute this AWS CloudFormation template yourself. To bootstrap this architecture in your AWS account:

    1. Download the sample AWS CloudFormation template from the repository.
    2. Go to the AWS CloudFormation console.
    3. Choose Create Stack.
    4. For Select Template, choose to upload a template to Amazon S3, and choose Browse.
    5. Select the template you downloaded and choose Next.
    6. For Specify Details:
      • Enter the following stack name: Car-Dealer-Stack.
      • Enter the HTTP endpoint to be subscribed to your topic. If you don’t have an HTTP endpoint, create a temporary one.
      • Choose Next.
    7. For Options, choose Next.
    8. For Review, choose Create.
    9. Wait until your stack creation process is complete.

Now that all the infrastructure is in place, verify the Amazon SNS subscriptions attributes set by the AWS CloudFormation template as follows:

  1. Go to the Amazon SNS console.
  2. Choose Topics and then select the ARN associated with Car-Sales.
  3. Verify the first subscription:
    • Select the subscription related to ERP-Integration (Amazon SQS protocol).
    • Choose Other subscription actions and then choose Edit subscription attributes.
    • Note that raw message delivery is enabled, and choose Cancel to go back.
  4. Verify the second subscription:
    • Select the subscription related to CRM-Integration (Amazon SQS protocol).
    • Choose Other subscription actions and then choose Edit subscription attributes.
    • Note that raw message delivery is enabled and then choose Cancel to go back.
    • Choose Other subscription actions and then choose Edit subscription filter policy.
    • Note that the filter policy is set, and then choose Cancel to go back.
  5. Confirm the third subscription. An HTTP endpoint must confirm its subscription before it can receive messages.
  6. Verify the third subscription:
    • Select the subscription related to SCM-System (HTTP protocol).
    • Choose Other subscription actions and then choose Edit subscription delivery policy.
    • Choose Advanced view.
    • Note that an exponential delivery retry policy is set, and then choose Cancel to go back.

Now that you have verified all subscription attributes, you can delete your AWS CloudFormation stack as follows:

  1. Go to the AWS CloudFormation console.
  2. In the list of stacks, select Car-Dealer-Stack.
  3. Choose Actions, choose Delete Stack, and then choose Yes Delete.
  4. Wait for the stack deletion process to complete.

That’s it! At this point, you have deleted all Amazon SNS and Amazon SQS resources created in this exercise from your AWS account.

Summary

AWS CloudFormation templates enable the simultaneous creation of Amazon SNS subscriptions and their attributes (such as FilterPolicy, DeliveryPolicy, and RawMessageDelivery) in an automated and secure manner. AWS CloudFormation support for Amazon SNS subscription attributes is available now in all AWS Regions.

For information about pricing, see AWS CloudFormation Pricing. For more information on setting up Amazon SNS resources via AWS CloudFormation templates, see:

Powering HIPAA-compliant workloads using AWS Serverless technologies

Post Syndicated from Chris Munns original https://aws.amazon.com/blogs/compute/powering-hipaa-compliant-workloads-using-aws-serverless-technologies/

This post courtesy of Mayank Thakkar, AWS Senior Solutions Architect

Serverless computing refers to an architecture discipline that allows you to build and run applications or services without thinking about servers. You can focus on your applications, without worrying about provisioning, scaling, or managing any servers. You can use serverless architectures for nearly any type of application or backend service. AWS handles the heavy lifting around scaling, high availability, and running those workloads.

The AWS HIPAA program enables covered entities—and those business associates subject to the U.S. Health Insurance Portability and Accountability Act of 1996 (HIPAA)—to use the secure AWS environment to process, maintain, and store protected health information (PHI). Based on customer feedback, AWS is trying to add more services to the HIPAA program, including serverless technologies.

AWS recently announced that AWS Step Functions has achieved HIPAA-eligibility status and has been added to the AWS Business Associate Addendum (BAA), adding to a growing list of HIPAA-eligible services. The BAA is an AWS contract that is required under HIPAA rules to ensure that AWS appropriately safeguards PHI. The BAA also serves to clarify and limit, as appropriate, the permissible uses and disclosures of PHI by AWS, based on the relationship between AWS and customers and the activities or services being performed by AWS.

Along with HIPAA eligibility for most of the rest of the serverless platform at AWS, Step Functions inclusion is a major win for organizations looking to process PHI using serverless technologies, opening up numerous new use cases and patterns. You can still use non-eligible services to orchestrate the storage, transmission, and processing of the metadata around PHI, but not the PHI itself.

In this post, I examine some common serverless use cases that I see in the healthcare and life sciences industry and show how AWS Serverless can be used to build powerful, cost-efficient, HIPAA-eligible architectures.

Provider directory web application

Running HIPAA-compliant web applications (like provider directories) on AWS is a common use case in the healthcare industry. Healthcare providers are often looking for ways to build and run applications and services without thinking about servers. They are also looking for ways to provide the most cost-effective and scalable delivery of secure health-related information to members, providers, and partners worldwide.

Unpredictable access patterns and spiky workloads often force organizations to provision for peak in these cases, and they end up paying for idle capacity. AWS Auto Scaling solves this challenge to a great extent but you still have to manage and maintain the underlying servers from a patching, high availability, and scaling perspective. AWS Lambda (along with other serverless technologies from AWS) removes this constraint.

The above architecture shows a serverless way to host a customer-facing website, with Amazon S3 being used for hosting static files (.js, .css, images, and so on). If your website is based on client-side technologies, you can eliminate the need to run a web server farm. In addition, you can use S3 features like server-side encryption and bucket access policies to lock down access to the content.

Using Amazon CloudFront, a global content delivery network, with S3 origins can bring your content closer to the end user and cut down S3 access costs, by caching the content at the edge. In addition, using AWS Lambda@Edge gives you an ability to bring and execute your own code to customize the content that CloudFront delivers. That significantly reduces latency and improves the end user experience while maintaining the same Lambda development model. Some common examples include checking cookies, inspecting headers or authorization tokens, rewriting URLs, and making calls to external resources to confirm user credentials and generate HTTP responses.
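
As one hedged example of the header inspection mentioned above, the following Python sketch shows a Lambda@Edge function attached to a CloudFront viewer request event. It rejects requests that carry no `Authorization` header and forwards the rest unchanged. The policy itself (reject when the header is missing, without validating the token) is an assumption chosen to keep the sketch short.

```python
def handler(event, context):
    """Viewer-request handler: require an Authorization header."""
    request = event["Records"][0]["cf"]["request"]
    headers = request["headers"]  # header names are lowercased keys

    if "authorization" not in headers:
        # Returning a response object short-circuits the request at the edge.
        return {
            "status": "401",
            "statusDescription": "Unauthorized",
            "headers": {
                "www-authenticate": [
                    {"key": "WWW-Authenticate", "value": "Bearer"}
                ]
            },
        }

    # Returning the request lets CloudFront continue processing it.
    return request
```

A production function would also validate the token, for example by checking its signature, before letting the request through.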

You can power the APIs needed for your client application by using Amazon API Gateway, which takes care of creating, publishing, maintaining, monitoring, and securing APIs at any scale. API Gateway also provides robust ways to provide traffic management, authorization and access control, monitoring, API version management, and the other tasks involved in accepting and processing up to hundreds of thousands of concurrent API calls. This allows you to focus on your business logic. Direct, secure, and authenticated integration with Lambda functions allows this serverless architecture to scale up and down seamlessly with incoming traffic.

The CloudFront integration with AWS WAF provides a reliable way to protect your application against common web exploits that could affect application availability, compromise security, or consume excessive resources.

API Gateway can integrate directly with Lambda, which by default can access the public resources. Lambda functions can be configured to access your Amazon VPC resources as well. If you have extended your data center to AWS using AWS Direct Connect or a VPN connection, Lambda can access your on-premises resources, with the traffic flowing over your VPN connection (or Direct Connect) instead of the public internet.

All the services mentioned above (except Amazon EC2) are fully managed by AWS in terms of high availability, scaling, provisioning, and maintenance, giving you a cost-effective way to host your web applications. It’s pay-as-you-go vs. pay-as-you-provision. Spikes in demand, typically encountered during the enrollment season, are handled gracefully, with these services scaling up automatically to meet demand and then scaling back down. You get to keep your costs in control.

All AWS services referenced in the above architecture are HIPAA-eligible, thus enabling you to store, process, and transmit PHI, as long as it complies with the BAA.

Medical device telemetry (ingesting data @ scale)

The ever-increasing presence of IoT devices in the healthcare industry has created the challenges of ingesting this data at scale and making it available for processing as soon as it is produced. Processing this data in real time (or near-real time) is key to delivering urgent care to patients.

The theoretically unlimited scalability and low startup times offered by Lambda make it a great candidate for these kinds of use cases. Balancing ballooning healthcare costs and timely delivery of care is a never-ending challenge. With subsecond billing and no charge when your code is not running, Lambda is a cost-effective choice for these workloads.

These end-user medical devices emit a lot of telemetry data, which requires constant analysis and real-time tracking and updating. For example, devices like infusion pumps, personal use dialysis machines, and so on require tracking and alerting of device consumables and calibration status. They also require updates for these settings. Consider the following architecture:

Typically, these devices are connected to an edge node or collector, which provides sufficient computing resources to authenticate itself to AWS and start streaming data to Amazon Kinesis Streams. The collector uses the Kinesis Producer Library to simplify high throughput to a Kinesis data stream. You can also use the server-side encryption feature, supported by Kinesis Streams, to achieve encryption-at-rest. Kinesis provides a scalable, highly available way to achieve loose coupling between data-producing (medical devices) and data-consuming (Lambda) layers.

After the data is transported via Kinesis, Lambda can then be used to process this data in real time, storing derived insights in Amazon DynamoDB, which can then power a near-real time health dashboard. Caregivers can access this real-time data to provide timely care and manage device settings.
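
The processing step can be sketched as a Lambda function that decodes each Kinesis record and derives a simple insight. This is a minimal sketch, not the architecture's actual code: the telemetry fields `device_id` and `consumable_pct`, the 10 percent threshold, and the helper names are all hypothetical, and the write to DynamoDB is left as a comment. The sketch relies only on Kinesis record data arriving base64-encoded under `Records[].kinesis.data`.

```python
import base64
import json

def decode_records(event):
    """Decode the base64 payload of every Kinesis record in the batch."""
    readings = []
    for record in event["Records"]:
        raw = base64.b64decode(record["kinesis"]["data"])
        readings.append(json.loads(raw))
    return readings

def low_consumable_alerts(readings, threshold_pct=10):
    """Return IDs of devices whose consumable level is below the threshold."""
    return [r["device_id"] for r in readings if r["consumable_pct"] < threshold_pct]

def handler(event, context):
    readings = decode_records(event)
    alerts = low_consumable_alerts(readings)
    # A real function would persist the derived insights to DynamoDB here.
    return {"processed": len(readings), "alerts": alerts}
```

Keeping the decoding and the alert logic in plain functions makes them easy to test locally with synthetic events before deploying.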

End-user medical devices, via the edge node, can also connect to and poll an API hosted on API Gateway to check for calibration settings, firmware updates, and so on. The modifications can be easily updated by admins, providing a scalable way to manage these devices.

For historical analysis and pattern prediction, the staged data (stored in S3) can be processed in batches. Use AWS Batch, Amazon EMR, or any custom logic running on a fleet of Amazon EC2 instances to gain actionable insights. Lambda can also be used to process data in a MapReduce fashion, as detailed in the Ad Hoc Big Data Processing Made Simple with Serverless MapReduce post.

You can also build high-throughput batch workflows or orchestrate Apache Spark applications using Step Functions, as detailed in the Orchestrate Apache Spark applications using AWS Step Functions and Apache Livy post. These insights can then be used to calibrate the medical devices to achieve effective outcomes.

Use Lambda to load data into Amazon Redshift, a cost-effective, petabyte-scale data warehouse offering. One of my colleagues, Ian Meyers, pointed this out in his Zero-Administration Amazon Redshift Database Loader post.

Mobile diagnostics

Another use case that I see is using mobile devices to provide diagnostic care in out-patient settings. These environments typically lack the robust IT infrastructure that clinics and hospitals can provide, and often are subjected to intermittent internet connectivity as well. Various biosensors (otoscopes, thermometers, heart rate monitors, and so on) can easily talk to smartphones, which can then act as aggregators and analyzers before forwarding the data to a central processing system. After the data is in the system, caregivers and practitioners can then view and act on the data.

In the above diagram, an application running on a mobile device (iOS or Android) talks to various biosensors and collects diagnostic data. Using AWS mobile SDKs along with Amazon Cognito, these smart devices can authenticate themselves to AWS and access the APIs hosted on API Gateway. Amazon Cognito also offers data synchronization across various mobile devices, which helps you to build “offline” features in your mobile application. Amazon Cognito Sync resolves conflicts and intermittent network connectivity, enabling you to focus on delivering great app experiences instead of creating and managing a user data sync solution.

You can also use CloudFront and Lambda@Edge, as detailed in the first use case of this post, to cache content at edge locations and provide some light processing closer to your end users.

Lambda acts as a middle tier, processing the CRUD operations on the incoming data and storing it in DynamoDB, which is again exposed to caregivers through another set of Lambda functions and API Gateway. Caregivers can access the information through a browser-based interface, with Lambda processing the middle-tier application logic. They can view the historical data, compare it with fresh data coming in, and make corrections. Caregivers can also react to incoming data and issue alerts, which are delivered securely to the smart device through Amazon SNS.

Also, by using DynamoDB Streams and its integration with Lambda, you can implement Lambda functions that react to data modifications in DynamoDB tables (and hence, incoming device data). This gives you a way to codify common reactions to incoming data, in near-real time.
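
A function that reacts to table modifications might start by flattening the stream records, as in the following Python sketch. It assumes the stream is configured to include new item images; the helper name `changed_items` and the attribute names used in any downstream logic are hypothetical.

```python
def changed_items(event):
    """Return the new image of every inserted or modified item as a flat dict."""
    items = []
    for record in event["Records"]:
        if record["eventName"] not in ("INSERT", "MODIFY"):
            continue  # ignore REMOVE events
        image = record["dynamodb"].get("NewImage", {})
        # Stream images use typed values such as {"S": "text"} or {"N": "42"};
        # this keeps the inner value as-is (numbers stay strings).
        items.append({name: next(iter(value.values())) for name, value in image.items()})
    return items

def handler(event, context):
    items = changed_items(event)
    # A real function would apply its reaction logic (alerts, updates) here.
    return {"changed": len(items)}
```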

Lambda ecosystem

As I discussed in the above use cases, Lambda is a powerful, event-driven, stateless, on-demand compute platform offering scalability, agility, security, and reliability, along with a fine-grained cost structure.

For some organizations, migrating from a traditional programming model to a microservices-driven model can be a steep curve. Also, to build and maintain complex applications using Lambda, you need a vast array of tools, all the way from local debugging support to complex application performance monitoring tools. The following list of tools and services can assist you in building world-class applications with minimal effort:

  • AWS X-Ray is a distributed tracing system that allows developers to analyze and debug distributed applications in production, such as those built using a microservices (Lambda) architecture. AWS X-Ray was recently added to the AWS BAA, opening the doors for processing PHI workloads.
  • AWS Step Functions helps build HIPAA-compliant complex workflows using Lambda. It provides a way to coordinate the components of distributed applications and Lambda functions using visual workflows.
  • AWS SAM provides a fast and easy way of deploying serverless applications. You can write simple templates to describe your functions and their event sources (API Gateway, S3, Kinesis, and so on). AWS recently relaunched the AWS SAM CLI, which allows you to create a local testing environment that simulates the AWS runtime environment for Lambda. It allows faster, iterative development of your Lambda functions by eliminating the need to redeploy your application package to the Lambda runtime.

For more details, see the Serverless Application Developer Tooling webpage.

Conclusion

There are numerous other health care and life science use cases that customers are implementing, using Lambda with other AWS services. AWS is committed to easing the effort of implementing health care solutions in the cloud. Making Lambda HIPAA-eligible is just another milestone in the journey. For more examples of use cases, see Serverless. For the latest list of HIPAA-eligible services, see HIPAA Eligible Services Reference.

Monitoring your Amazon SNS message filtering activity with Amazon CloudWatch

Post Syndicated from Rachel Richardson original https://aws.amazon.com/blogs/compute/monitoring-your-amazon-sns-message-filtering-activity-with-amazon-cloudwatch/

This post is courtesy of Otavio Ferreira, Manager, Amazon SNS, AWS Messaging.

Amazon SNS message filtering provides a set of string and numeric matching operators that allow each subscription to receive only the messages of interest. Hence, SNS message filtering can simplify your pub/sub messaging architecture by offloading the message filtering logic from your subscriber systems, as well as the message routing logic from your publisher systems.

After you set the subscription attribute that defines a filter policy, the subscribing endpoint receives only the messages that carry attributes matching this filter policy. Other messages published to the topic are filtered out for this subscription. The native integration between SNS and Amazon CloudWatch provides visibility into the number of messages delivered, as well as the number of messages filtered out.

CloudWatch metrics are captured automatically for you. To get started with SNS message filtering, see Filtering Messages with Amazon SNS.

Message Filtering Metrics

The following six CloudWatch metrics are relevant to understanding your SNS message filtering activity:

  • NumberOfMessagesPublished – Inbound traffic to SNS. This metric tracks all the messages that have been published to the topic.
  • NumberOfNotificationsDelivered – Outbound traffic from SNS. This metric tracks all the messages that have been successfully delivered to endpoints subscribed to the topic. A delivery takes place either when the incoming message attributes match a subscription filter policy, or when the subscription has no filter policy at all, which results in a catch-all behavior.
  • NumberOfNotificationsFilteredOut – This metric tracks all the messages that were filtered out because they carried attributes that didn’t match the subscription filter policy.
  • NumberOfNotificationsFilteredOut-NoMessageAttributes – This metric tracks all the messages that were filtered out because they didn’t carry any attributes at all and, consequently, didn’t match the subscription filter policy.
  • NumberOfNotificationsFilteredOut-InvalidAttributes – This metric keeps track of messages that were filtered out because they carried invalid or malformed attributes and, thus, didn’t match the subscription filter policy.
  • NumberOfNotificationsFailed – This last metric tracks all the messages that failed to be delivered to subscribing endpoints, regardless of whether a filter policy had been set for the endpoint. This metric is emitted after the message delivery retry policy is exhausted, and SNS stops attempting to deliver the message. At that moment, the subscribing endpoint is likely no longer reachable. For example, the subscribing SQS queue or Lambda function has been deleted by its owner. You may want to closely monitor this metric to address message delivery issues quickly.
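
To illustrate how these metrics relate to one another, the following Python sketch classifies a message for a single subscription. It is a deliberate simplification, not the SNS implementation: it models exact string matching only, ignores invalid attributes and other matching operators, and assumes that every delivery attempt succeeds. The function names are hypothetical.

```python
from collections import Counter

def matches(filter_policy, attributes):
    """Every policy key must be present with one of its allowed values."""
    return all(attributes.get(name) in allowed
               for name, allowed in filter_policy.items())

def metric_for(filter_policy, attributes):
    """Name the metric a message would count toward for one subscription."""
    if not filter_policy:
        # No filter policy: catch-all behavior.
        return "NumberOfNotificationsDelivered"
    if not attributes:
        return "NumberOfNotificationsFilteredOut-NoMessageAttributes"
    if matches(filter_policy, attributes):
        return "NumberOfNotificationsDelivered"
    return "NumberOfNotificationsFilteredOut"

policy = {"buyer-class": ["vip"]}
published = [{"buyer-class": "vip"}, {"buyer-class": "regular"}, {}]
print(Counter(metric_for(policy, attrs) for attrs in published))
```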

Message filtering graphs

Through the AWS Management Console, you can compose graphs to display your SNS message filtering activity. The graph shows the number of messages published, delivered, and filtered out within the timeframe you specify (1h, 3h, 12h, 1d, 3d, 1w, or custom).

SNS message filtering for CloudWatch Metrics

To compose an SNS message filtering graph with CloudWatch:

  1. Open the CloudWatch console.
  2. Choose Metrics, SNS, All Metrics, and Topic Metrics.
  3. Select all metrics to add to the graph, such as:
    • NumberOfMessagesPublished
    • NumberOfNotificationsDelivered
    • NumberOfNotificationsFilteredOut
  4. Choose Graphed metrics.
  5. In the Statistic column, switch from Average to Sum.
  6. Title your graph with a descriptive name, such as “SNS Message Filtering”.
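
The same graph can also be described as a CloudWatch dashboard widget. The following Python sketch builds a dashboard body that mirrors the console steps above. The topic name, Region, widget size, and five-minute period are assumptions for illustration. The resulting JSON string is the kind of document the CloudWatch PutDashboard API accepts as a dashboard body.

```python
import json

def filtering_widget(topic_name, region):
    """Build a metric widget that graphs SNS message filtering activity."""
    metric_names = [
        "NumberOfMessagesPublished",
        "NumberOfNotificationsDelivered",
        "NumberOfNotificationsFilteredOut",
    ]
    return {
        "type": "metric",
        "x": 0, "y": 0, "width": 12, "height": 6,  # assumed layout
        "properties": {
            "title": "SNS Message Filtering",
            "view": "timeSeries",
            "region": region,
            "stat": "Sum",   # matches step 5: Sum instead of Average
            "period": 300,   # assumed five-minute granularity
            "metrics": [["AWS/SNS", name, "TopicName", topic_name]
                        for name in metric_names],
        },
    }

# "Car-Sales" and "us-east-1" are placeholder values.
dashboard_body = json.dumps({"widgets": [filtering_widget("Car-Sales", "us-east-1")]})
print(dashboard_body)
```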

After you have your graph set up, you may want to copy the graph link for bookmarking, emailing, or sharing with co-workers. You may also want to add your graph to a CloudWatch dashboard for easy access in the future. Both actions are available to you on the Actions menu, which is found above the graph.

Summary

SNS message filtering defines how SNS topics behave in terms of message delivery. By using CloudWatch metrics, you gain visibility into the number of messages published, delivered, and filtered out. This enables you to validate the operation of filter policies and more easily troubleshoot during development phases.

SNS message filtering can be implemented easily with existing AWS SDKs by applying message and subscription attributes across all SNS supported protocols (Amazon SQS, AWS Lambda, HTTP, SMS, email, and mobile push). CloudWatch metrics for SNS message filtering are available now, in all AWS Regions.

For information about pricing, see the CloudWatch pricing page.

For more information, see:

The End of Google Cloud Messaging, and What it Means for Your Apps

Post Syndicated from Zach Barbitta original https://aws.amazon.com/blogs/messaging-and-targeting/the-end-of-google-cloud-messaging-and-what-it-means-for-your-apps/

On April 10, 2018, Google announced the deprecation of its Google Cloud Messaging (GCM) platform. Specifically, the GCM server and client APIs are deprecated and will be removed as soon as April 11, 2019. What does this mean for you and your applications that use Amazon Simple Notification Service (Amazon SNS) or Amazon Pinpoint?

First, nothing will break now or after April 11, 2019. GCM device tokens are completely interchangeable with the newer Firebase Cloud Messaging (FCM) device tokens. If you have existing GCM tokens, you’ll still be able to use them to send notifications. This statement is also true for GCM tokens that you generate in the future.

On the back end, we’ve already migrated Amazon SNS and Amazon Pinpoint to the server endpoint for FCM (https://fcm.googleapis.com/fcm/send). As a developer, you don’t need to make any changes as a result of this deprecation.

We created the following mini-FAQ to address some of the questions you may have as a developer who uses Amazon SNS or Amazon Pinpoint.

If I migrate to FCM from GCM, can I still use Amazon Pinpoint and Amazon SNS?

Yes. Your ability to connect to your applications and send messages through both Amazon SNS and Amazon Pinpoint doesn’t change. We’ll update the documentation for Amazon SNS and Amazon Pinpoint soon to reflect these changes.

If I don’t migrate to FCM from GCM, can I still use Amazon Pinpoint and Amazon SNS?

Yes. If you do nothing, your existing credentials and GCM tokens will still be valid. All applications that you previously set up to use Amazon Pinpoint or Amazon SNS will continue to work normally. When you call the API for Amazon Pinpoint or Amazon SNS, we initiate a request to the FCM server endpoint directly.

What are the differences between Amazon SNS and Amazon Pinpoint?

Amazon SNS makes it easy for developers to set up, operate, and send notifications at scale, affordably and with a high degree of flexibility. Amazon Pinpoint has many of the same messaging capabilities as Amazon SNS, with the same levels of scalability and flexibility.

The main difference between the two services is that Amazon Pinpoint provides both transactional and targeted messaging capabilities. By using Amazon Pinpoint, marketers and developers can not only send transactional messages to their customers, but can also segment their audiences, create campaigns, and analyze both application and message metrics.

How do I migrate from GCM to FCM?

For more information about migrating from GCM to FCM, see Migrate a GCM Client App for Android to Firebase Cloud Messaging on the Google Developers site.

If you have any questions, please post them in the comments section, or in the Amazon Pinpoint or Amazon SNS forums.

Securing messages published to Amazon SNS with AWS PrivateLink

Post Syndicated from Otavio Ferreira original https://aws.amazon.com/blogs/security/securing-messages-published-to-amazon-sns-with-aws-privatelink/

Amazon Simple Notification Service (SNS) now supports VPC Endpoints (VPCE) via AWS PrivateLink. You can use VPC Endpoints to privately publish messages to SNS topics, from an Amazon Virtual Private Cloud (VPC), without traversing the public internet. When you use AWS PrivateLink, you don’t need to set up an Internet Gateway (IGW), Network Address Translation (NAT) device, or Virtual Private Network (VPN) connection. You don’t need to use public IP addresses, either.

VPC Endpoints doesn’t require code changes and can bring additional security to Pub/Sub Messaging use cases that rely on SNS. VPC Endpoints helps promote data privacy and is aligned with assurance programs, including the Health Insurance Portability and Accountability Act (HIPAA), FedRAMP, and others discussed below.

VPC Endpoints for SNS in action

Here’s how VPC Endpoints for SNS works. The following example is based on a banking system that processes mortgage applications. This banking system, which has been deployed to a VPC, publishes each mortgage application to an SNS topic. The SNS topic then fans out the mortgage application message to two subscribing AWS Lambda functions:

  • Save-Mortgage-Application stores the application in an Amazon DynamoDB table. As the mortgage application contains personally identifiable information (PII), the message must not traverse the public internet.
  • Save-Credit-Report checks the applicant’s credit history against an external Credit Reporting Agency (CRA), then stores the final credit report in an Amazon S3 bucket.

The following diagram depicts the underlying architecture for this banking system:
 
Diagram depicting the architecture for the example banking system
 
To protect applicants’ data, the financial institution responsible for developing this banking system needed a mechanism to prevent PII data from traversing the internet when publishing mortgage applications from their VPC to the SNS topic. Therefore, they created a VPC endpoint to enable their publisher Amazon EC2 instance to privately connect to the SNS API. As shown in the diagram, when the VPC endpoint is created, an Elastic Network Interface (ENI) is automatically placed in the same VPC subnet as the publisher EC2 instance. This ENI exposes a private IP address that is used as the entry point for traffic destined to SNS. This ensures that traffic between the VPC and SNS doesn’t leave the Amazon network.

Set up VPC Endpoints for SNS

The process for creating a VPC endpoint to privately connect to SNS doesn’t require code changes: access the VPC Management Console, navigate to the Endpoints section, and create a new Endpoint. Three attributes are required:

  • The SNS service name.
  • The VPC and Availability Zones (AZs) from which you’ll publish your messages.
  • The Security Group (SG) to be associated with the endpoint network interface. The Security Group controls the traffic to the endpoint network interface from resources in your VPC. If you don’t specify a Security Group, the default Security Group for your VPC will be associated.
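
For teams that prefer templates over the console, the three attributes above map onto an `AWS::EC2::VPCEndpoint` resource in AWS CloudFormation. The following Python sketch generates such a resource. The VPC, subnet, and security group IDs are placeholders, the Availability Zones are expressed through the chosen subnets, and enabling private DNS is an assumption made here so that the regular SNS API address resolves to the endpoint.

```python
import json

def sns_vpc_endpoint(region, vpc_id, subnet_ids, security_group_ids):
    """Build an interface VPC endpoint resource for the SNS API."""
    return {
        "Type": "AWS::EC2::VPCEndpoint",
        "Properties": {
            "VpcEndpointType": "Interface",
            "ServiceName": f"com.amazonaws.{region}.sns",  # the SNS service name
            "VpcId": vpc_id,
            "SubnetIds": subnet_ids,                   # one subnet per AZ
            "SecurityGroupIds": security_group_ids,
            "PrivateDnsEnabled": True,                 # assumed setting
        },
    }

# All IDs below are placeholders, not real resources.
endpoint = sns_vpc_endpoint(
    "us-east-1", "vpc-00000000", ["subnet-00000000"], ["sg-00000000"]
)
print(json.dumps({"Resources": {"SnsEndpoint": endpoint}}, indent=3))
```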

Help ensure your security and compliance

SNS can support messaging use cases in regulated market segments, such as healthcare provider systems subject to the Health Insurance Portability and Accountability Act (HIPAA) and financial systems subject to the Payment Card Industry Data Security Standard (PCI DSS), and is also in-scope with the following Assurance Programs:

The SNS API is served through HTTP Secure (HTTPS), and encrypts all messages in transit with Transport Layer Security (TLS) certificates issued by Amazon Trust Services (ATS). The certificates verify the identity of the SNS API server when encrypted connections are established. The certificates help establish proof that your SNS API client (SDK, CLI) is communicating securely with the SNS API server. A Certificate Authority (CA) issues the certificate to a specific domain. Hence, when a domain presents a certificate that’s issued by a trusted CA, the SNS API client knows it’s safe to make the connection.

Summary

VPC Endpoints can increase the security of your pub/sub messaging use cases by allowing you to publish messages to SNS topics, from instances in your VPC, without traversing the internet. Setting up VPC Endpoints for SNS doesn’t require any code changes because the SNS API address remains the same.

VPC Endpoints for SNS is now available in all AWS Regions where AWS PrivateLink is available. For information on pricing and regional availability, visit the VPC pricing page.
For more information and on-boarding, see Publishing to Amazon SNS Topics from Amazon Virtual Private Cloud in the SNS documentation.

If you have comments about this post, submit them in the Comments section below. If you have questions about anything in this post, start a new thread on the Amazon SNS forum or contact AWS Support.

Want more AWS Security news? Follow us on Twitter.