Tag Archives: messaging

Designing durable serverless apps with DLQs for Amazon SNS, Amazon SQS, AWS Lambda

Post Syndicated from Rachel Richardson original https://aws.amazon.com/blogs/compute/designing-durable-serverless-apps-with-dlqs-for-amazon-sns-amazon-sqs-aws-lambda/

This post is courtesy of Otavio Ferreira, Sr Manager, SNS.

In a postal system, a dead-letter office is a facility for processing undeliverable mail. In pub/sub messaging, a dead-letter queue (DLQ) is a queue to which messages published to a topic can be sent, in case those messages cannot be delivered to a subscribed endpoint.

Amazon SNS supports DLQs, making your applications more resilient and durable when message deliveries fail.

Understanding message delivery failures and retries

The delivery of a message fails when it’s not possible for Amazon SNS to access the subscribed endpoint. There are two reasons why this might happen:

  • Client errors, where the client is SNS (the message sender).
  • Server errors, where the server is the system that hosts the subscription endpoint (the message receiver), such as Amazon SQS or AWS Lambda.

Client errors

Client errors happen when SNS has stale subscription metadata. One common cause of client errors is when you (the endpoint owner) delete the endpoint. For example, you might delete the SQS queue that is subscribed to your SNS topic, without also deleting the SNS subscription corresponding to the queue. Another common cause is when you change the resource policy attached to your endpoint in a way that prevents SNS from delivering messages to that endpoint.

These errors are considered client errors because the client has attempted the delivery of a message to a destination that, from the client’s perspective, is no longer accessible. SNS does not retry the delivery of messages that failed as the result of client errors.

Server errors

Server errors happen when the system that powers the subscribed endpoint is unavailable, or when it returns an exception response indicating that it failed to process a valid request from SNS.

When server errors occur, SNS retries the failed deliveries according to a backoff function, which can be either linear or exponential. When a server error occurs for an AWS managed endpoint, backed by either SQS or Lambda, SNS retries the delivery up to 100,015 times, over 23 days.

Server errors can also happen with customer managed endpoints, namely HTTP, SMS, email, and mobile push endpoints. SNS also retries the delivery for these types of endpoints. HTTP endpoints support customer-defined retry policies, while SNS sets an internal delivery retry policy for SMS, email, and mobile push endpoints to 50 times, over 6 hours.
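As a rough illustration of how linear and exponential backoff differ, the following Python sketch computes a schedule of delays that grows from a minimum to a maximum delay. The function name, its parameters, and the formulas are assumptions made for illustration only. They are not the internal SNS implementation, which may space its retries differently.

```python
def backoff_delays(min_delay, max_delay, num_retries, function="linear"):
    """Return illustrative delays (in seconds), one per retry attempt.

    Hypothetical helper: delays grow from min_delay to max_delay, either
    by a constant step (linear) or by a constant ratio (exponential).
    """
    if min_delay <= 0 or max_delay < min_delay:
        raise ValueError("require 0 < min_delay <= max_delay")
    if num_retries < 1:
        return []
    if num_retries == 1:
        return [float(min_delay)]

    delays = []
    for attempt in range(num_retries):
        # 0.0 on the first retry, 1.0 on the last retry
        fraction = attempt / (num_retries - 1)
        if function == "linear":
            delay = min_delay + (max_delay - min_delay) * fraction
        elif function == "exponential":
            delay = min_delay * (max_delay / min_delay) ** fraction
        else:
            raise ValueError(f"unknown backoff function: {function}")
        delays.append(delay)
    return delays
```

With the same minimum and maximum, the exponential schedule keeps the early retries closer together and pushes the long waits toward the end, while the linear schedule spaces them evenly.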

Delivery retries

SNS may receive a client error, or continue to receive a server error for a message beyond the number of retries defined by the corresponding retry policy. In that event, SNS discards the message. Attaching a DLQ to your SNS subscription enables you to keep this message, regardless of the type of error, either client or server. DLQs give you more control over messages that cannot be delivered.

For more information on the delivery retry policy for each delivery protocol supported by SNS, see Amazon SNS Message Delivery Retry.
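The rules in this section can be summarized as a small decision function. This is only a sketch of the behavior described above, with hypothetical names and return values; it does not reflect how SNS is implemented.

```python
def final_disposition(error_type, retries_exhausted, has_dlq):
    """Summarize what happens to a message after a failed delivery.

    error_type: "client" or "server".
    retries_exhausted: True once the retry policy allows no more attempts.
    has_dlq: True if the subscription has a DLQ attached.
    """
    if error_type not in ("client", "server"):
        raise ValueError(f"unknown error type: {error_type}")
    # Server errors are retried until the retry policy is exhausted.
    if error_type == "server" and not retries_exhausted:
        return "retry"
    # Client errors are never retried; exhausted server errors end here too.
    return "send_to_dlq" if has_dlq else "discard"
```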

Using DLQs for AWS services

SNS, SQS, and Lambda support DLQs, addressing different failure modes. All DLQs are regular queues powered by SQS.

In SNS, DLQs store the messages that failed to be delivered to subscribed endpoints. For more information, see Amazon SNS Dead-Letter Queues.

In SQS, DLQs store the messages that failed to be processed by your consumer application. This failure mode can happen when producers and consumers fail to interpret aspects of the protocol that they use to communicate. In that case, the consumer receives the message from the queue, but fails to process it, as the message doesn’t have the structure or content that the consumer expects. The consumer can’t delete the message from the queue either. After exhausting the receive count in the redrive policy, SQS can sideline the message to the DLQ. For more information, see Amazon SQS Dead-Letter Queues.
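As a sketch of the SQS side, the redrive policy of a source queue is a JSON document that names the DLQ and a maximum receive count. The Python below builds such a policy and mimics the rule that a message is sidelined once its receive count exceeds that maximum. The queue ARN and the value 5 are placeholders chosen for illustration, and the helper names are hypothetical.

```python
import json


def build_sqs_redrive_policy(dlq_arn, max_receive_count):
    """Return the JSON string for an SQS queue's RedrivePolicy attribute."""
    if max_receive_count < 1:
        raise ValueError("max_receive_count must be at least 1")
    return json.dumps({
        "deadLetterTargetArn": dlq_arn,
        "maxReceiveCount": max_receive_count,
    })


def should_move_to_dlq(receive_count, max_receive_count):
    """Illustrative check: sideline once receives exceed the maximum."""
    return receive_count > max_receive_count


# Placeholder ARN and receive count, for illustration only.
policy = build_sqs_redrive_policy(
    "arn:aws:sqs:us-east-1:123456789012:my-queue-dlq", 5
)
```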

In Lambda, DLQs store the messages that resulted in failed asynchronous executions of your Lambda function. An execution can result in an error for several reasons. Your code might raise an exception, time out, or run out of memory. The runtime executing your code might encounter an error and stop. Your function might hit its concurrency limit and be throttled. Regardless of the error type, when the error occurs, your code might have run completely, partially, or not at all. By default, Lambda retries an asynchronous execution twice. After exhausting the retries, Lambda can sideline the message to the DLQ. For more information, see AWS Lambda Dead-Letter Queues.

When you have a fan-out architecture, with SQS queues and Lambda functions subscribed to an SNS topic, we recommend that you set DLQs to your SNS subscriptions, and to your destination queues and functions as well. This approach gives your application resilience against message delivery failures, message processing failures, and function execution failures too.

Applying DLQs in a use case

Here’s how everything comes together. The following diagram shows a serverless backend architecture that supports a car rental application. This is a durable serverless architecture based on DLQs for SNS, SQS, and Lambda.

Figure: Architecture diagram of the DLQ use case for SNS, SQS, and Lambda.

When a customer places an order to rent a car, the application sends that request to an API, which is powered by Amazon API Gateway. The REST API is backed by an SNS topic named Rental-Orders, and deployed onto an Amazon VPC subnet. The topic then fans out that order to the following two subscribed endpoints, for parallel processing:

  • An SQS queue, named Rental-Fulfilment, which feeds the integration with an internal fulfilment system hosted on Amazon EC2.
  • A Lambda function, named Rental-Billing, which processes and loads the customer order into a third-party billing system, also hosted on Amazon EC2.

To increase the durability of this serverless backend API, the following DLQs have been set up:

  • Two SNS DLQs, namely Rental-Fulfilment-Fanout-DLQ and Rental-Billing-Fanout-DLQ, which store the order in case either the subscribed SQS queue or Lambda function ever becomes unreachable.
  • An SQS DLQ, named Rental-Fulfilment-DLQ, which stores the order when the fulfilment system fails to process the order.
  • A Lambda DLQ, named Rental-Billing-DLQ, which stores the order when the function fails to process and load the order into the billing system.

When the DLQ captures the message, you can inspect the message for troubleshooting purposes. After you address the error at hand, you can poll the DLQ to retry the processing of the message.

Setting up DLQs for subscriptions, queues, and functions can be done using the AWS Management Console, SDK, CLI, API, or AWS CloudFormation. You can use the SDK, CLI, and API for polling the DLQs as well.

Configuring DLQs for subscriptions

You can attach a DLQ to an SNS subscription by setting the subscription’s RedrivePolicy parameter. The policy is a JSON object that refers to the DLQ ARN. The ARN must point to an SQS queue in the same AWS account as that of the SNS subscription. Also, both the DLQ and the subscription must be in the same AWS Region.

Here’s how you can configure one of the SNS DLQs applied in the car rental application example, presented earlier.

The following JSON object is a CloudFormation template that subscribes the SQS queue Rental-Fulfilment to the SNS topic Rental-Orders. The template also sets a RedrivePolicy that targets Rental-Fulfilment-Fanout-DLQ as a DLQ.

Lastly, the template sets a FilterPolicy value. It makes SNS deliver a message to the subscribed queue only if the published message carries an attribute named order-status with value set to either confirmed or canceled. As Amazon SNS Message Filtering happens before message delivery, messages that are filtered out aren’t sent to that subscription’s DLQ.

Internally, the CloudFormation template uses the SNS Subscribe API action for deploying the subscription and setting both policies, all part of the same API request.

{  
   "Resources": {
      "mySubscription": {
         "Type" : "AWS::SNS::Subscription",
         "Properties" : {
            "Protocol": "sqs",
            "Endpoint": "arn:aws:sqs:us-east-1:123456789012:Rental-Fulfilment",
            "TopicArn": "arn:aws:sns:us-east-1:123456789012:Rental-Orders",
            "RedrivePolicy": {
               "deadLetterTargetArn": 
                  "arn:aws:sqs:us-east-1:123456789012:Rental-Fulfilment-Fanout-DLQ"
            },
            "FilterPolicy": { 
               "order-status": [ "confirmed", "canceled" ]
            }
         }
      }
   }
}
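To make the effect of the FilterPolicy in the template above more concrete, here is a minimal Python sketch of exact-match filtering. It assumes message attributes are given as a plain dictionary of string values, which is a simplification, and it covers only the exact string matching used in this example. The function name is hypothetical, and the real SNS filtering feature supports more operators than this.

```python
def matches_filter_policy(filter_policy, message_attributes):
    """Return True if every policy key has an allowed value in the message.

    filter_policy: dict mapping attribute name to a list of allowed strings.
    message_attributes: dict mapping attribute name to a string value.
    """
    for name, allowed_values in filter_policy.items():
        # A missing attribute yields None, which never matches.
        if message_attributes.get(name) not in allowed_values:
            return False
    return True


policy = {"order-status": ["confirmed", "canceled"]}
```

A message with order-status set to confirmed would be delivered to the queue. A message with any other status, or without the attribute, would be filtered out and would not reach the subscription's DLQ either.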

If the SNS topic and subscription are already deployed, you can use the SNS SetSubscriptionAttributes API action to set the RedrivePolicy, as shown by the following code examples, based on the AWS CLI and the AWS SDK for Java.

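$ aws sns set-subscription-attributes \
   --region us-east-1 \
   --subscription-arn arn:aws:sns:us-east-1:123456789012:Rental-Orders:44019880-ffa0-4067-9cb4-b974443bcck2 \
   --attribute-name RedrivePolicy \
   --attribute-value '{"deadLetterTargetArn":"arn:aws:sqs:us-east-1:123456789012:Rental-Fulfilment-Fanout-DLQ"}'

// AWS SDK for Java 1.x
import com.amazonaws.services.sns.AmazonSNS;
import com.amazonaws.services.sns.AmazonSNSClientBuilder;
import com.amazonaws.services.sns.model.SetSubscriptionAttributesRequest;

AmazonSNS sns = AmazonSNSClientBuilder.defaultClient();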

String subscriptionArn = "arn:aws:sns:us-east-1:123456789012:Rental-Orders:44019880-ffa0-4067-9cb4-b974443bcck2";

String redrivePolicy = "{\"deadLetterTargetArn\":\"arn:aws:sqs:us-east-1:123456789012:Rental-Fulfilment-Fanout-DLQ\"}";

SetSubscriptionAttributesRequest request = new SetSubscriptionAttributesRequest(
  subscriptionArn, 
  "RedrivePolicy", 
  redrivePolicy
);

sns.setSubscriptionAttributes(request);
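Because the DLQ must be in the same account and Region as the subscription, it can help to check the ARNs before setting the policy. The following Python sketch builds the RedrivePolicy string and performs that check. The helper names are hypothetical, and the ARN parsing is a simplification that assumes the standard colon-separated ARN layout.

```python
import json


def parse_arn(arn):
    """Split an ARN into its main components (simplified parser)."""
    parts = arn.split(":", 5)
    if len(parts) != 6 or parts[0] != "arn":
        raise ValueError(f"not a valid ARN: {arn}")
    return {
        "service": parts[2],
        "region": parts[3],
        "account": parts[4],
        "resource": parts[5],
    }


def build_redrive_policy(subscription_arn, dlq_arn):
    """Return the RedrivePolicy JSON string for an SNS subscription."""
    sub = parse_arn(subscription_arn)
    dlq = parse_arn(dlq_arn)
    if sub["service"] != "sns" or dlq["service"] != "sqs":
        raise ValueError("expected an SNS subscription ARN and an SQS queue ARN")
    if sub["account"] != dlq["account"]:
        raise ValueError("DLQ must be in the same AWS account as the subscription")
    if sub["region"] != dlq["region"]:
        raise ValueError("DLQ must be in the same AWS Region as the subscription")
    return json.dumps({"deadLetterTargetArn": dlq_arn})
```

The returned string is the same value that the CLI and Java examples pass as the RedrivePolicy attribute value.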

Monitoring DLQs

You can use Amazon CloudWatch metrics and alarms to monitor the DLQs associated with your SNS subscriptions. In the car rental example, you can monitor the DLQs to be notified when the API fails to distribute any car rental order to the fulfilment or billing systems.

As regular SQS queues, the DLQs in SNS emit a number of metrics to CloudWatch, in 5-minute data points, such as NumberOfMessagesSent, NumberOfMessagesReceived and NumberOfMessagesDeleted. You can use these SQS metrics to be notified upon activity in your DLQs in SNS, so you may trigger a message recovery protocol.

You might have a case where you expect the DLQ to be always empty. In that case, create a CloudWatch alarm on NumberOfMessagesSent, set the alarm threshold to zero, and provide a separate SNS topic to be notified when the alarm goes off. The SNS topic, in turn, can deliver your alarm notification to any endpoint type that you choose, such as email address, phone number, or mobile pager app.
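As a sketch of such an alarm, the following Python function assembles parameters in the shape used by the CloudWatch PutMetricAlarm API action. The alarm name, the notification topic ARN, the Sum statistic, and the single 5-minute evaluation period are assumptions chosen for illustration; adjust them to your own monitoring needs.

```python
def dlq_alarm_parameters(queue_name, notification_topic_arn):
    """Build illustrative alarm parameters for a DLQ expected to stay empty."""
    return {
        "AlarmName": f"{queue_name}-not-empty",  # hypothetical naming scheme
        "Namespace": "AWS/SQS",
        "MetricName": "NumberOfMessagesSent",
        "Dimensions": [{"Name": "QueueName", "Value": queue_name}],
        "Statistic": "Sum",
        "Period": 300,  # 5-minute data points
        "EvaluationPeriods": 1,
        "Threshold": 0,
        "ComparisonOperator": "GreaterThanThreshold",
        "AlarmActions": [notification_topic_arn],
    }


# The topic ARN below is a placeholder, not part of the car rental example.
params = dlq_alarm_parameters(
    "Rental-Fulfilment-Fanout-DLQ",
    "arn:aws:sns:us-east-1:123456789012:Ops-Alerts",
)
```

A dictionary like this could be passed as keyword arguments to an SDK call that creates the alarm, so that any message arriving in the DLQ triggers a notification.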

Additionally, SNS itself provides its own set of metrics that are relevant to DLQs. Specifically, SNS metrics include the following:

  • NumberOfNotificationsRedrivenToDlq – Used when sending the message to the DLQ succeeds.
  • NumberOfNotificationsFailedToRedriveToDlq – Used when sending the message to the DLQ fails. This can happen because the DLQ either doesn’t exist anymore or doesn’t have the required access permissions to allow SNS to send messages to it. For more information about setting up the required access policy, see Giving Permissions for Amazon SNS to Send Messages to Amazon SQS.

Debugging with DLQs

Use CloudWatch Logs to see the exceptions that caused your SNS deliveries to fail and your messages to be sidelined to DLQs. In the car rental example, you can inspect the rental orders in the DLQs, as well as the logs associated with these queues. Then you can understand why those orders failed to be fanned out to the fulfilment or billing systems.

SNS can log both successful and failed deliveries in CloudWatch. You can enable Amazon SNS Delivery Status Logging by setting three SNS topic attributes, which are delivery protocol-specific. As an example, for SNS deliveries to SQS queues, you must set the following topic attributes: SQSSuccessFeedbackRoleArn, SQSFailureFeedbackRoleArn, and SQSSuccessFeedbackSampleRate.
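The three topic attributes for SQS deliveries can be collected as follows. This Python sketch only assembles and validates the values; the function name and the IAM role ARN are hypothetical, and it assumes the sample rate is a whole percentage between 0 and 100.

```python
def sqs_delivery_status_attributes(success_role_arn, failure_role_arn, sample_rate):
    """Return the topic attributes that enable delivery status logging for SQS."""
    if not isinstance(sample_rate, int) or not 0 <= sample_rate <= 100:
        raise ValueError("sample_rate must be an integer from 0 to 100")
    return {
        "SQSSuccessFeedbackRoleArn": success_role_arn,
        "SQSFailureFeedbackRoleArn": failure_role_arn,
        # Topic attribute values are strings.
        "SQSSuccessFeedbackSampleRate": str(sample_rate),
    }


# Placeholder role ARN, for illustration only.
role_arn = "arn:aws:iam::123456789012:role/SNSDeliveryStatusLogging"
attributes = sqs_delivery_status_attributes(role_arn, role_arn, 100)
```

Each name and value pair would then be applied to the topic one attribute at a time, for example with the SNS SetTopicAttributes API action.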

The following JSON object represents a successful SNS delivery in a CloudWatch Logs entry. The status code logged is 200 (SUCCESS). The attribute RedrivePolicy shows that the SNS subscription in question had its DLQ set.

{
  "notification": {
    "messageMD5Sum": "7bb3327ac55e49485bad42e159ca4d4b",
    "messageId": "e8c2bb09-235c-5f5d-b583-efd8df0f7d74",
    "topicArn": "arn:aws:sns:us-east-1:123456789012:Rental-Orders",
    "timestamp": "2019-10-04 05:13:55.876"
  },
  "delivery": {
    "deliveryId": "6adf232e-fb12-5062-a564-27ff3741051f",
    "redrivePolicy": "{\"deadLetterTargetArn\": \"arn:aws:sqs:us-east-1:123456789012:Rental-Fulfilment-Fanout-DLQ\"}",
    "destination": "arn:aws:sqs:us-east-1:123456789012:Rental-Fulfilment",
    "providerResponse": "{\"sqsRequestId\":\"b2608a46-ccc4-51cc-003d-de972097debc\",\"sqsMessageId\":\"05fecd22-60a1-4d7d-bb79-026d49700b5a\"}",
    "dwellTimeMs": 58,
    "attempts": 1,
    "statusCode": 200
  },
  "status": "SUCCESS"
}

The following JSON object represents a failed SNS delivery in CloudWatch Logs. In the following code example, the subscribed queue doesn’t exist. As a client error, the status code logged is 400 (FAILURE). Again, the RedrivePolicy attribute refers to a DLQ.

{
  "notification": {
    "messageMD5Sum": "81c395cbd350da6bedfe3b24db9517b0",
    "messageId": "9959db9d-25c8-57a6-9439-8e5be8f71a1f",
    "topicArn": "arn:aws:sns:us-east-1:123456789012:Rental-Orders",
    "timestamp": "2019-10-04 05:16:51.116"
  },
  "delivery": {
    "deliveryId": "be743821-4c2c-5acc-a586-6cf0807f6fb1",
    "redrivePolicy": "{\"deadLetterTargetArn\": \"arn:aws:sqs:us-east-1:123456789012:Rental-Fulfilment-Fanout-DLQ\"}",
    "destination": "arn:aws:sqs:us-east-1:123456789012:Rental-Fulfilment",
    "providerResponse": "{\"ErrorCode\":\"AWS.SimpleQueueService.NonExistentQueue\", \"ErrorMessage\":\"The specified queue does not exist or you do not have access to it.\",\"sqsRequestId\":\"Unrecoverable\"}",
    "dwellTimeMs": 53,
    "attempts": 1,
    "statusCode": 400
  },
  "status": "FAILURE"
}

When the message delivery fails and there is a DLQ attached to the subscription, the message is sent to the DLQ and an additional entry is logged in CloudWatch. This new entry is specific to the delivery to the DLQ and refers to the DLQ ARN as the destination, as shown in the following JSON object.

{
  "notification": {
    "messageMD5Sum": "81c395cbd350da6bedfe3b24db9517b0",
    "messageId": "8959db9d-25c8-57a6-9439-8e5be8f71a1f",
    "topicArn": "arn:aws:sns:us-east-1:123456789012:Rental-Orders",
    "timestamp": "2019-10-04 05:16:52.876"
  },
  "delivery": {
    "deliveryId": "a877c79f-a3ee-5105-9bbd-92596eae0232",
    "destination":"arn:aws:sqs:us-east-1:123456789012:Rental-Fulfilment-Fanout-DLQ",
    "providerResponse": "{\"sqsRequestId\":\"8cef1af5-e86a-519e-ad36-4f33252aa5ec\",\"sqsMessageId\":\"2b742c5c-0750-4ec5-a717-b95897adda8e\"}",
    "dwellTimeMs": 51,
    "attempts": 1,
    "statusCode": 200
  },
  "status": "SUCCESS"
}

By analyzing Amazon CloudWatch Logs entries, you can understand why an SNS message was moved to a DLQ, and then take the required set of steps to recover the message. When you enable delivery status logging in SNS, you can configure the sample rate at which deliveries are logged, from 0% to 100%.
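Log entries like the ones above can be summarized with a few lines of Python. The sketch below assumes the entry layout shown in this post, where redrivePolicy and providerResponse are JSON strings nested inside the entry. Treating 4xx status codes as client errors follows the example above; treating every other failure as a server error is an assumption, and the function name is hypothetical.

```python
import json


def summarize_delivery(log_entry_json):
    """Extract the outcome and DLQ target from a delivery status log entry."""
    entry = json.loads(log_entry_json)
    delivery = entry["delivery"]
    status_code = delivery["statusCode"]

    # redrivePolicy is absent in entries for deliveries to the DLQ itself.
    redrive_policy = delivery.get("redrivePolicy")
    dlq_arn = None
    if redrive_policy:
        dlq_arn = json.loads(redrive_policy)["deadLetterTargetArn"]

    if entry["status"] == "SUCCESS":
        outcome = "delivered"
    elif 400 <= status_code < 500:
        outcome = "client_error"
    else:
        outcome = "server_error"  # assumption: any other failure

    return {
        "messageId": entry["notification"]["messageId"],
        "destination": delivery["destination"],
        "outcome": outcome,
        "dlqArn": dlq_arn,
        "providerResponse": json.loads(delivery["providerResponse"]),
    }
```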

Encrypting DLQs

When your SNS subscription targets an SQS encrypted queue, you probably want your DLQ to be an SQS encrypted queue as well. This configuration keeps the way your messages are encrypted at rest consistent.

To follow this security recommendation, give the CMK you used to encrypt your DLQ a key policy that grants the SNS service principal access to AWS KMS API actions. For example, see the following sample key policy:

{
    "Sid": "GrantSnsAccessToKms",
    "Effect": "Allow",
    "Principal": { "Service": "sns.amazonaws.com" },
    "Action": [ "kms:Decrypt", "kms:GenerateDataKey*" ],
    "Resource": "*"
}

If you have an SNS encrypted topic, but a subscription in this topic points to a DLQ that isn’t an SQS encrypted queue, then messages sidelined to the DLQ aren’t encrypted at rest.

For more information, see Enabling Server-Side Encryption (SSE) for an Amazon SNS Topic with an Amazon SQS Encrypted Queue Subscribed.

Summary

DLQs for SNS, SQS, and Lambda increase the resiliency and durability of your applications. These DLQs address different failure modes, and can be used together.

  • SNS DLQs store messages that failed to be delivered to subscribed endpoints.
  • SQS DLQs store messages that the consumer system failed to process.
  • Lambda DLQs store the messages that resulted in failed asynchronous executions of your functions.

Setting up DLQs for subscriptions, queues, and functions can be done using the AWS Management Console, SDK, CLI, API, or CloudFormation. DLQs are available in all AWS Regions. Start today by running the tutorials:

Visit the AWS Digital User Engagement team at AWS re:Invent 2019

Post Syndicated from Brent Meyer original https://aws.amazon.com/blogs/messaging-and-targeting/visit-the-aws-digital-user-engagement-team-at-aws-reinvent-2019/

AWS re:Invent 2019 is less than 50 days away, and that means it’s time to start planning your agenda. The Digital User Engagement team is hosting several builders sessions, chalk talks, and workshops this year. Come join us and learn more about using Amazon Pinpoint and Amazon SES to engage with and delight your customers.

Visit our booth

You’ll find our booth in the Expo Hall in the Venetian. Stop by to meet the team, see a demo, and pick up some swag!

Leadership session

EUC206: How AWS is defining the future of engagement and messaging

  • What: Simon Poile, the General Manager of the AWS Digital User Engagement team, talks about how AWS is building on Amazon’s customer-centric culture of innovation to help you better engage your customers. You’ll also hear from AWS customer Coinbase, which uses Amazon Pinpoint to delight its customers while growing its business.
  • When: Wednesday, Dec 4, 10:45 AM – 11:45 AM
  • Where: MGM, Level 3, Chairman’s Ballroom 368

Sessions

EUC207: Build high-volume email applications with Amazon SES

  • What: Companies in many industries use AWS to send millions of emails every day, including Amazon.com. In this session, learn how to build applications using the highly scalable, highly reliable, and multi-tenant-capable email infrastructure of Amazon Simple Email Service (Amazon SES). You also learn how to monitor delivery rates and other important metrics, and how to use this data to improve deliverability. Members of the Amazon.com team discuss the architecture of their multi-tenant email-sending platform, the historical challenges they faced, and the ways Amazon Pinpoint and Amazon SES helped them meet their goals around Prime Day, Cyber Monday, and other retail events.
  • When: Monday, Dec 2, 1:00 PM – 2:00 PM
  • Where: MGM, Level 1, Grand Ballroom 119

Chalk talks

EUC328: Engage with your customers using SMS text messages

  • What: Text messages form a vital part of customer-engagement strategy for organizations around the world. In this workshop, learn how to use Amazon Pinpoint to send promotional, transactional, and two-way SMS messages. You also see demonstrations of how other AWS customers use SMS messaging to engage with their customers.
  • When: Wednesday, Dec 4, 2:30 PM – 3:30 PM
  • Where: Bellagio, Bellagio Ballroom 5

EUC336: Surprise and delight customers with location-based notifications

  • What: In this chalk talk, learn how to use AWS Amplify, AWS AppSync, and Amazon Pinpoint to geo-target customers. We teach you how to build and configure geofences to trigger location-based mobile-app notifications. We also walk you through the published solution and provide dedicated time for Q&A with an AWS solutions architect.
  • When: Thursday, Dec 5, 2:30 PM – 3:30 PM
  • Where: Aria, Plaza Level East, Orovada 3

AIM346-R and AIM346-R1: Personalized user engagement with machine learning

  • What: In this chalk talk, we discuss how to use Amazon Personalize and Amazon Pinpoint to provide a personalized, omni-channel experience starting in your mobile application. We discuss best practices for real-time updates, personalized notifications (push), and messaging (email and text) that drives user engagement and product discovery. We also demonstrate how other mobile services can be used to facilitate rapid prototyping.
  • Sessions:
    Session # | When | Where
    AIM346-R | Monday, Dec 2, 11:30 AM – 12:30 PM | Bellagio, Bellagio Ballroom 7
    AIM346-R1 | Tuesday, Dec 3, 4:00 PM – 5:00 PM | Aria, Plaza Level East, Orovada 3

ENT315: Improve message deliverability to ensure customer reach

  • What: Do you have outbound and inbound email requirements? Is email a critical workload for your enterprise? Several factors determine whether your email messages reach your recipients. In this chalk talk, learn how to safely migrate your outbound and inbound email volumes over to AWS and Amazon Simple Email Service (Amazon SES). Learn how to onboard, safely ramp up, and ensure that business continues without disruption. Also learn best practices for delivering email messages into your customers’ inboxes rather than their spam folders, and receive guidance on scaling and improving the deliverability of your email campaigns.
  • When: Thursday, Dec 5, 11:30 AM – 12:30 PM
  • Where: Aria, Plaza Level East, Orovada 3

Builder’s sessions

EUC308-R and EUC308-R1: Build and deploy your own two-way text chatbot

  • What: In this builders session, you build an AI-powered chatbot that your customers can engage with by sending SMS messages. Your chatbot can help customers quickly ask questions, get answers, book appointments, check order status, and much more.
  • Sessions:
    Session # | When | Where
    EUC308-R | Tuesday, Dec 3, 1:00 PM – 2:00 PM | Mirage, Grand Ballroom B – Table 9
    EUC308-R1 | Wednesday, Dec 4, 4:00 PM – 5:00 PM | Aria, Level 1 West, Bristlecone 2 – Table 2

EUC309-R and EUC309-R1: Build your own omnichannel e-commerce experience

  • What: In this hands-on session, you learn how to integrate AWS Amplify and Amazon Pinpoint to create a retail website. You use the event data that’s generated by customers’ activities on your site to send custom-tailored emails and push notifications, creating a curated, omnichannel experience. This session is intended for builders who want to expand the user-engagement capabilities of their sites and apps.
  • Sessions:
    Session # | When | Where
    EUC309-R | Monday, Dec 2, 12:15 PM – 1:15 PM | Aria, Level 1 West, Bristlecone 4 – Table 8
    EUC309-R1 | Tuesday, Dec 3, 11:30 AM – 12:30 PM | Aria, Level 1 West, Bristlecone 2 – Table 1

EUC322: Improve customer engagement by predicting user behavior

  • What: In this hands-on session, you learn how to use Amazon SageMaker and Amazon Pinpoint to create customer engagement scenarios powered by machine learning. You use cross-channel customer-activity and demographic data to train your own behavioral models. After you use your model to categorize your customers, you use Amazon Pinpoint to send engagement campaigns that are optimized to reengage users. This session is intended for builders, marketers, or data scientists who want to improve user engagement using machine learning.
  • When: Monday, Dec 2, 10:45 AM – 11:45 AM
  • Where: Aria, Level 1 West, Bristlecone 4 – Table 2

Sending Push Notifications to iOS 13 Devices with Amazon SNS

Post Syndicated from Brent Meyer original https://aws.amazon.com/blogs/messaging-and-targeting/sending-push-notifications-to-ios-13-devices-with-amazon-sns/

Note: This post was written by Alan Chuang, a Senior Product Manager on the AWS Messaging team.


On September 19, 2019, Apple released iOS 13. This update introduced changes to the Apple Push Notification service (APNs) that can impact your existing workloads. Amazon SNS has made some changes that ensure uninterrupted delivery of push notifications to iOS 13 devices.

iOS 13 introduced a new and required header called apns-push-type. The value of this header informs APNs of the contents of your notification’s payload so that APNs can respond to the message appropriately. The possible values for this header are:

  • alert
  • background
  • voip
  • complication
  • fileprovider
  • mdm

Apple’s documentation indicates that the value of this header “must accurately reflect the contents of your notification’s payload. If there is a mismatch, or if the header is missing on required systems, APNs may return an error, delay the delivery of the notification, or drop it altogether.”

We’ve made some changes to the Amazon SNS API that make it easier for developers to handle this breaking change. When you send push notifications of the alert or background type, Amazon SNS automatically sets the apns-push-type header to the appropriate value for your message. For more information about creating an alert type and a background type notification, see Generating a Remote Notification and Pushing Background Updates to Your App on the Apple Developer website.

Because you might not have enough time to react to this breaking change, Amazon SNS provides two options:

  • If you want to set the apns-push-type header value yourself, or the contents of your notification’s payload require the header value to be set to voip, complication, fileprovider, or mdm, Amazon SNS lets you set the header value as a message attribute using the Amazon SNS Publish API action. For more information, see Specifying Custom APNs Header Values and Reserved Message Attributes for Mobile Push Notifications in the Amazon SNS Developer Guide.
  • If you send push notifications as alert or background type, and if the contents of your notification’s payload follow the format described in the Apple Developer documentation, then Amazon SNS automatically sets the correct header value. To send a background notification, the format requires the aps dictionary to have only the content-available field set to 1. For more information about creating an alert type and a background type notification, see Generating a Remote Notification and Pushing Background Updates to Your App on the Apple Developer website.
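The two options above can be approximated with a short Python sketch. It returns an explicitly requested push type when one is given, and otherwise guesses between background and alert from the shape of the aps dictionary. The function name and the inference rule are assumptions based on the description in this post; they are not the logic that Amazon SNS actually runs.

```python
ALLOWED_PUSH_TYPES = {
    "alert", "background", "voip", "complication", "fileprovider", "mdm",
}


def infer_apns_push_type(payload, explicit_type=None):
    """Pick an apns-push-type value for an APNs payload (illustrative only)."""
    if explicit_type is not None:
        if explicit_type not in ALLOWED_PUSH_TYPES:
            raise ValueError(f"unsupported apns-push-type: {explicit_type}")
        return explicit_type

    aps = payload.get("aps", {})
    # Background notifications carry only content-available set to 1.
    if aps == {"content-available": 1}:
        return "background"
    return "alert"
```

For example, a payload whose aps dictionary contains an alert field is classified as alert, while voip, complication, fileprovider, and mdm always have to be requested explicitly.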

We hope these changes make it easier for you to send messages to your customers who use iOS 13. If you have questions about these changes, please open a ticket with our Customer Support team through the AWS Management Console.

Predictive User Engagement using Amazon Pinpoint and Amazon Personalize

Post Syndicated from Brent Meyer original https://aws.amazon.com/blogs/messaging-and-targeting/predictive-user-engagement-using-amazon-pinpoint-and-amazon-personalize/

Note: This post was written by John Burry, a Solution Architect on the AWS Customer Engagement team.


Predictive User Engagement (PUE) refers to the integration of machine learning (ML) and customer engagement services. By implementing a PUE solution, you can combine ML-based predictions and recommendations with real-time notifications and analytics, all based on your customers’ behaviors.

This blog post shows you how to set up a PUE solution by using Amazon Pinpoint and Amazon Personalize. Best of all, you can implement this solution even if you don’t have any prior machine learning experience. By completing the steps in this post, you’ll be able to build your own model in Personalize, integrate it with Pinpoint, and start sending personalized campaigns.

Prerequisites

Before you complete the steps in this post, you need to set up the following:

  • Create an admin user in AWS Identity and Access Management (IAM). For more information, see Creating Your First IAM Admin User and Group in the IAM User Guide. You need to specify the credentials of this user when you set up the AWS Command Line Interface.
  • Install Python 3 and the pip package manager. Python 3 is installed by default on recent versions of Linux and macOS. If it isn’t already installed on your computer, you can download an installer from the Python website.
  • Use pip to install the following modules:
    • awscli
    • boto3
    • jupyter
    • matplotlib
    • scikit-learn
    • sagemaker

    For more information about installing modules, see Installing Python Modules in the Python 3.X Documentation.

  • Configure the AWS Command Line Interface (AWS CLI). During the configuration process, you have to specify a default AWS Region. This solution uses Amazon SageMaker to build a model, so the Region that you specify has to be one that supports Amazon SageMaker. For a complete list of Regions where SageMaker is supported, see AWS Service Endpoints in the AWS General Reference. For more information about setting up the AWS CLI, see Configuring the AWS CLI in the AWS Command Line Interface User Guide.
  • Install Git. Git is installed by default on most versions of Linux and macOS. If Git isn’t already installed on your computer, you can download an installer from the Git website.

Step 1: Create an Amazon Pinpoint Project

In this section, you create and configure a project in Amazon Pinpoint. This project contains all of the customers that we will target, as well as the recommendation data that’s associated with each one. Later, we’ll use this data to create segments and campaigns.

To set up the Amazon Pinpoint project

  1. Sign in to the Amazon Pinpoint console at http://console.aws.amazon.com/pinpoint/.
  2. On the All projects page, choose Create a project. Enter a name for the project, and then choose Create.
  3. On the Configure features page, under SMS and voice, choose Configure.
  4. Under General settings, select Enable the SMS channel for this project, and then choose Save changes.
  5. In the navigation pane, under Settings, choose General settings. In the Project details section, copy the value under Project ID. You’ll need this value later.

Step 2: Create an Endpoint

In Amazon Pinpoint, an endpoint represents a specific method of contacting a customer, such as their email address (for email messages) or their phone number (for SMS messages). Endpoints can also contain custom attributes, and you can associate multiple endpoints with a single user. In this example, we use these attributes to store the recommendation data that we receive from Amazon Personalize.

In this section, we create a new endpoint and user by using the AWS CLI. We’ll use this endpoint to test the SMS channel, and to test the recommendations that we receive from Personalize.

To create an endpoint by using the AWS CLI

  1. At the command line, enter the following command:
    aws pinpoint update-endpoint --application-id <project-id> \
    --endpoint-id 12456 --endpoint-request "Address='<mobile-number>', \
    ChannelType='SMS',User={UserAttributes={recommended_items=['none']},UserId='12456'}"

    In the preceding example, replace <project-id> with the Amazon Pinpoint project ID that you copied in Step 1. Replace <mobile-number> with your phone number, formatted in E.164 format (for example, +12065550142).

Note that this endpoint contains hard-coded UserId and EndpointId values of 12456. These IDs match an ID that we’ll create later when we generate the Personalize data set.
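If you prefer to prepare the endpoint definition in code, the following Python sketch builds the same request as a dictionary and checks that the phone number looks like an E.164 number first. The function name is hypothetical, and the regular expression is a simple format check (a plus sign followed by up to 15 digits), not a guarantee that the number exists.

```python
import re

# Simplified E.164 check: "+", a non-zero digit, then up to 14 more digits.
E164_PATTERN = re.compile(r"\+[1-9][0-9]{1,14}")


def build_sms_endpoint_request(mobile_number, user_id):
    """Return an endpoint request matching the CLI example above."""
    if not E164_PATTERN.fullmatch(mobile_number):
        raise ValueError(f"not an E.164 phone number: {mobile_number}")
    return {
        "Address": mobile_number,
        "ChannelType": "SMS",
        "User": {
            "UserAttributes": {"recommended_items": ["none"]},
            "UserId": user_id,
        },
    }


request = build_sms_endpoint_request("+12065550142", "12456")
```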

Step 3: Create a Segment and Campaign in Amazon Pinpoint

Now that we have an endpoint, we need to add it to a segment so that we can use it within a campaign. By sending a campaign, we can verify that our Pinpoint project is configured correctly, and that we created the endpoint correctly.

To create the segment and campaign

  1. Open the Pinpoint console at http://console.aws.amazon.com/pinpoint, and then choose the project that you created in Step 1.
  2. In the navigation pane, choose Segments, and then choose Create a segment.
  3. Name the segment “No recommendations”. Under Segment group 1, on the Add a filter menu, choose Filter by user.
  4. On the Choose a user attribute menu, choose recommended_items. Set the value of the filter to “none”.
  5. Confirm that the Segment estimate section shows that there is one eligible endpoint, and then choose Create segment.
  6. In the navigation pane, choose Campaigns, and then choose Create a campaign.
  7. Name the campaign “SMS to users with no recommendations”. Under Choose a channel for this campaign, choose SMS, and then choose Next.
  8. On the Choose a segment page, choose the “No recommendations” segment that you just created, and then choose Next.
  9. In the message editor, type a test message, and then choose Next.
  10. On the Choose when to send the campaign page, keep all of the default values, and then choose Next.
  11. On the Review and launch page, choose Launch campaign. Within a few seconds, you receive a text message at the phone number that you specified when you created the endpoint.

Step 4: Load sample data into Amazon Personalize

At this point, we’ve finished setting up Amazon Pinpoint. Now we can start loading data into Amazon Personalize.

To load the data into Amazon Personalize

  1. At the command line, enter the following command to clone the sample data and Jupyter Notebooks to your computer:
    git clone https://github.com/markproy/personalize-car-search.git

  2. At the command line, change into the directory that contains the data that you just cloned. Enter the following command:
    jupyter notebook

    A new window opens in your web browser.

  3. In your web browser, open the first notebook (01_generate_data.ipynb). On the Cell menu, choose Run all. Wait for the commands to finish running.
  4. Open the second notebook (02_make_dataset_group.ipynb). In the first step, replace the value of the account_id variable with the ID of your AWS account. Then, on the Cell menu, choose Run all. This step takes several minutes to complete. Make sure that all of the commands have run successfully before you proceed to the next step.
  5. Open the third notebook (03_make_campaigns.ipynb). In the first step, replace the value of the account_id variable with the ID of your AWS account. Then, on the Cell menu, choose Run all. This step takes several minutes to complete. Make sure that all of the commands have run successfully before you proceed to the next step.
  6. Open the fourth notebook (04_use_the_campaign.ipynb). In the first step, replace the value of the account_id variable with the ID of your AWS account. Then, on the Cell menu, choose Run all. This step takes several minutes to complete.
  7. After the fourth notebook is finished running, choose Quit to terminate the Jupyter Notebook. You don’t need to run the fifth notebook for this example.
  8. Open the Amazon Personalize console at http://console.aws.amazon.com/personalize. Verify that Amazon Personalize contains one dataset group named car-dg.
  9. In the navigation pane, choose Campaigns. Verify that it contains all of the following campaigns, and that the status for each campaign is Active:
    • car-popularity-count
    • car-personalized-ranking
    • car-hrnn-metadata
    • car-sims
    • car-hrnn
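
If you prefer to check the campaign status from code instead of the console, something like the following sketch could work. It assumes the campaign names listed above and the summary fields (name, status) returned by the Personalize list_campaigns operation; the helper names are hypothetical.

```python
EXPECTED_CAMPAIGNS = {
    "car-popularity-count",
    "car-personalized-ranking",
    "car-hrnn-metadata",
    "car-sims",
    "car-hrnn",
}


def missing_or_inactive(campaign_summaries, expected=EXPECTED_CAMPAIGNS):
    """Return the expected campaign names that are absent or not ACTIVE."""
    active = {
        c["name"] for c in campaign_summaries if c.get("status") == "ACTIVE"
    }
    return sorted(set(expected) - active)


def fetch_campaign_summaries():
    """List campaigns in the current Region (first page of results only)."""
    import boto3  # lazy import: the check above needs no AWS access

    personalize = boto3.client("personalize")
    return personalize.list_campaigns()["campaigns"]
```

An empty list from missing_or_inactive(fetch_campaign_summaries()) would indicate that all five campaigns are ready.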

Step 5: Create the Lambda function

We’ve loaded the data into Amazon Personalize, and now we need to create a Lambda function to update the endpoint attributes in Pinpoint with the recommendations provided by Personalize.

The version of the AWS SDK for Python that’s included with Lambda doesn’t include the libraries for Amazon Personalize. For this reason, you need to download these libraries to your computer, put them in a .zip file, and upload the entire package to Lambda.

To create the Lambda function

  1. In a text editor, create a new file. Paste the following code.
    # Copyright 2010-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
    #
    # This file is licensed under the Apache License, Version 2.0 (the "License").
    # You may not use this file except in compliance with the License. A copy of the
    # License is located at
    #
    # http://aws.amazon.com/apache2.0/
    #
    # This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
    # OF ANY KIND, either express or implied. See the License for the specific
    # language governing permissions and limitations under the License.
    
    AWS_REGION = "<region>"
    PROJECT_ID = "<project-id>"
    CAMPAIGN_ARN = "<car-hrnn-campaign-arn>"
    USER_ID = "12456"
    endpoint_id = USER_ID
    
    from datetime import datetime
    import json
    import boto3
    import logging
    from botocore.exceptions import ClientError
    
    DATE = datetime.now()
    
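    # Create the clients in the Region set in AWS_REGION above.
    personalize           = boto3.client('personalize', region_name=AWS_REGION)
    personalize_runtime   = boto3.client('personalize-runtime', region_name=AWS_REGION)
    personalize_events    = boto3.client('personalize-events', region_name=AWS_REGION)
    pinpoint              = boto3.client('pinpoint', region_name=AWS_REGION)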
    
    def lambda_handler(event, context):
        itemList = get_recommended_items(USER_ID,CAMPAIGN_ARN)
        response = update_pinpoint_endpoint(PROJECT_ID,endpoint_id,itemList)
    
        return {
            'statusCode': 200,
            'body': json.dumps('Lambda execution completed.')
        }
    
    def get_recommended_items(user_id, campaign_arn):
        response = personalize_runtime.get_recommendations(campaignArn=campaign_arn, 
                                                           userId=str(user_id), 
                                                           numResults=10)
        itemList = response['itemList']
        return itemList
    
    def update_pinpoint_endpoint(project_id,endpoint_id,itemList):
        itemlistStr = []
        
        for item in itemList:
            itemlistStr.append(item['itemId'])
    
        pinpoint.update_endpoint(
        ApplicationId=project_id,
        EndpointId=endpoint_id,
        EndpointRequest={
                            'User': {
                                'UserAttributes': {
                                    'recommended_items': 
                                        itemlistStr
                                }
                            }
                        }
        )    
    
        return
    

    In the preceding code, make the following changes:

    • Replace <region> with the name of the AWS Region that you want to use, such as us-east-1.
    • Replace <project-id> with the ID of the Amazon Pinpoint project that you created earlier.
    • Replace <car-hrnn-campaign-arn> with the Amazon Resource Name (ARN) of the car-hrnn campaign in Amazon Personalize. You can find this value in the Amazon Personalize console.
  2. Save the file as pue-get-recs.py.
  3. Create and activate a virtual environment. In the virtual environment, use pip to download the latest versions of the boto3 and botocore libraries. For complete procedures, see Updating a Function with Additional Dependencies With a Virtual Environment in the AWS Lambda Developer Guide. Also, add the pue-get-recs.py file to the .zip file that contains the libraries.
  4. Open the IAM console at http://console.aws.amazon.com/iam. Create a new role. Attach the following policy to the role:
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": [
                    "logs:CreateLogStream",
                    "logs:DescribeLogGroups",
                    "logs:CreateLogGroup",
                    "logs:PutLogEvents",
                    "personalize:GetRecommendations",
                    "mobiletargeting:GetUserEndpoints",
                    "mobiletargeting:GetApp",
                    "mobiletargeting:UpdateEndpointsBatch",
                    "mobiletargeting:GetApps",
                    "mobiletargeting:GetEndpoint",
                    "mobiletargeting:GetApplicationSettings",
                    "mobiletargeting:UpdateEndpoint"
                ],
                "Resource": "*"
            }
        ]
    }
    
  5. Open the Lambda console at http://console.aws.amazon.com/lambda, and then choose Create function.
  6. Create a new Lambda function from scratch. Choose the Python 3.7 runtime. Under Permissions, choose Use an existing role, and then choose the IAM role that you just created. When you finish, choose Create function.
  7. Upload the .zip file that contains the Lambda function and the boto3 and botocore libraries.
  8. Under Function code, change the Handler value to pue-get-recs.lambda_handler. Save your changes.

When you finish creating the function, you can test it to make sure it was set up correctly.

To test the Lambda function

  1. On the Select a test event menu, choose Configure test events. On the Configure test events window, specify an Event name, and then choose Create.
  2. Choose the Test button to execute the function.
  3. If the function executes successfully, open the Amazon Pinpoint console at http://console.aws.amazon.com/pinpoint.
  4. In the navigation pane, choose Segments, and then choose the “No recommendations” segment that you created earlier. Verify that the number under total endpoints is 0. This is the expected value; the segment is filtered to only include endpoints with no recommendation attributes, but when you ran the Lambda function, it added recommendations to the test endpoint.
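
The function's logic can also be exercised locally, without calling AWS, by passing stub clients into it. The following is a sketch under that assumption: refresh_recommendations restates the flow of lambda_handler with the clients as parameters, and FakeRuntime and FakePinpoint are hypothetical stand-ins for the boto3 clients.

```python
def extract_item_ids(item_list):
    """Flatten the Personalize itemList into a plain list of item IDs."""
    return [item["itemId"] for item in item_list]


def refresh_recommendations(runtime, pinpoint, project_id, user_id, campaign_arn):
    """Same flow as lambda_handler, with the clients passed in as arguments."""
    response = runtime.get_recommendations(
        campaignArn=campaign_arn, userId=str(user_id), numResults=10
    )
    item_ids = extract_item_ids(response["itemList"])
    pinpoint.update_endpoint(
        ApplicationId=project_id,
        EndpointId=user_id,
        EndpointRequest={
            "User": {"UserAttributes": {"recommended_items": item_ids}}
        },
    )
    return item_ids


class FakeRuntime:
    """Stub that returns a canned recommendation list."""

    def get_recommendations(self, **kwargs):
        return {"itemList": [{"itemId": "26304"}, {"itemId": "100"}]}


class FakePinpoint:
    """Stub that records the arguments of the last update_endpoint call."""

    def __init__(self):
        self.last_call = None

    def update_endpoint(self, **kwargs):
        self.last_call = kwargs
```

Passing the clients in as arguments keeps the handler logic testable on a laptop; in Lambda you would pass the real boto3 clients instead of the stubs.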

Step 6: Create segments and campaigns based on recommended items

In this section, we’ll create a targeted segment based on the recommendation data provided by our Personalize dataset. We’ll then use that segment to create a campaign.

To create a segment and campaign based on personalized recommendations

  1. Open the Amazon Pinpoint console at http://console.aws.amazon.com/pinpoint. On the All projects page, choose the project that you created earlier.
  2. In the navigation pane, choose Segments, and then choose Create a segment. Name the new segment “Recommendations for product 26304”.
  3. Under Segment group 1, on the Add a filter menu, choose Filter by user. On the Choose a user attribute menu, choose recommended_items. Set the value of the filter to “26304”. Confirm that the Segment estimate section shows that there is one eligible endpoint, and then choose Create segment.
  4. In the navigation pane, choose Campaigns, and then choose Create a campaign.
  5. Name the campaign “SMS to users with recommendations for product 26304”. Under Choose a channel for this campaign, choose SMS, and then choose Next.
  6. On the Choose a segment page, choose the “Recommendations for product 26304” segment that you just created, and then choose Next.
  7. In the message editor, type a test message, and then choose Next.
  8. On the Choose when to send the campaign page, keep all of the default values, and then choose Next.
  9. On the Review and launch page, choose Launch campaign. Within a few seconds, you receive a text message at the phone number that you specified when you created the endpoint.

Next steps

Your PUE solution is now ready to use. From here, there are several ways that you can make the solution your own:

  • Expand your usage: If you plan to continue sending SMS messages, you should request a spending limit increase.
  • Extend to additional channels: This post showed the process of setting up an SMS campaign. You can add more endpoints—for the email or push notification channels, for example—and associate them with your users. You can then create new segments and new campaigns in those channels.
  • Build your own model: This post used a sample data set, but Amazon Personalize makes it easy to provide your own data. To start building a model with Personalize, you have to provide a data set that contains information about your users, items, and interactions. To learn more, see Getting Started in the Amazon Personalize Developer Guide.
  • Optimize your model: You can enrich your model by sending your mobile, web, and campaign engagement data to Amazon Personalize. In Pinpoint, you can use event streaming to move data directly to S3, and then use that data to retrain your Personalize model. To learn more about streaming events, see Streaming App and Campaign Events in the Amazon Pinpoint User Guide.
  • Update your recommendations on a regular basis: Use the create-campaign API to create a new recurring campaign. Rather than sending messages, include the hook property with a reference to the ARN of the pue-get-recs function. By completing this step, you can configure Pinpoint to retrieve the most up-to-date recommendation data each time the campaign recurs. For more information about using Lambda to modify segments, see Customizing Segments with AWS Lambda in the Amazon Pinpoint Developer Guide.

Forward Incoming Email to an External Destination

Post Syndicated from Brent Meyer original https://aws.amazon.com/blogs/messaging-and-targeting/forward-incoming-email-to-an-external-destination/

Note: This post was written by Vesselin Tzvetkov, an AWS Senior Security Architect, and by Rostislav Markov, an AWS Senior Engagement Manager.


Amazon SES has included support for incoming email for several years now. However, some customers have told us that they need a solution for forwarding inbound emails to domains that aren’t managed by Amazon SES. In this post, you’ll learn how to use Amazon SES, Amazon S3, and AWS Lambda to create a solution that forwards incoming email to an email address that’s managed outside of Amazon SES.

Architecture

This solution uses several AWS services to forward incoming emails to a single, external email address. The following diagram shows the flow of information in this solution.

The following actions occur in this solution:

  1. A new email is sent from an external sender to your domain. Amazon SES handles the incoming email for your domain.
  2. An Amazon SES receipt rule saves the incoming message in an S3 bucket.
  3. An Amazon SES receipt rule triggers the execution of a Lambda function.
  4. The Lambda function retrieves the message content from S3, and then creates a new message and sends it to Amazon SES.
  5. Amazon SES sends the message to the destination server.

Limitations

This solution works in all AWS Regions where Amazon SES is currently available. For a complete list of supported Regions, see AWS Service Endpoints in the AWS General Reference.

Prerequisites

In order to complete this procedure, you need to have a domain that receives incoming email. If you don’t already have a domain, you can purchase one through Amazon Route 53. For more information, see Registering a New Domain in the Amazon Route 53 Developer Guide. You can also purchase a domain through one of several third-party vendors.

Procedures

Step 1: Set up Your Domain

  1. In Amazon SES, verify the domain that you want to use to receive incoming email. For more information, see Verifying Domains in the Amazon SES Developer Guide.
  2. Add the following MX record to the DNS configuration for your domain:
    10 inbound-smtp.<regionInboundUrl>.amazonaws.com

    Replace <regionInboundUrl> with the URL of the email receiving endpoint for the AWS Region that you use Amazon SES in. For a complete list of URLs, see AWS Service Endpoints – Amazon SES in the AWS General Reference.

  3. If your account is still in the Amazon SES sandbox, submit a request to have it removed. For more information, see Moving Out of the Sandbox in the Amazon SES Developer Guide.
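
As a quick sanity check on the MX record, the following sketch builds the expected value and compares it with the answers from a DNS lookup. It assumes that the placeholder in the record is the Region code (for example, us-east-1) and that you obtain the MX answers yourself, for instance from dig or nslookup; both helper names are hypothetical.

```python
def expected_mx_record(region, priority=10):
    """Return the MX record value for the SES receiving endpoint in a Region."""
    return f"{priority} inbound-smtp.{region}.amazonaws.com"


def mx_is_configured(mx_answers, region):
    """Check 'priority host' strings from a DNS lookup for the SES endpoint."""
    expected_host = f"inbound-smtp.{region}.amazonaws.com"
    for answer in mx_answers:
        parts = answer.split()
        # The host is the last field; DNS tools often append a trailing dot.
        if parts and parts[-1].rstrip(".").lower() == expected_host:
            return True
    return False
```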

Step 2: Configure Your S3 Bucket

  1. In Amazon S3, create a new bucket. For more information, see Create a Bucket in the Amazon S3 Getting Started Guide.
  2. Apply the following policy to the bucket:
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "AllowSESPuts",
                "Effect": "Allow",
                "Principal": {
                    "Service": "ses.amazonaws.com"
                },
                "Action": "s3:PutObject",
                "Resource": "arn:aws:s3:::<bucketName>/*",
                "Condition": {
                    "StringEquals": {
                        "aws:Referer": "<awsAccountId>"
                    }
                }
            }
        ]
    }

  3. In the policy, make the following changes:
    • Replace <bucketName> with the name of your S3 bucket.
    • Replace <awsAccountId> with your AWS account ID.

    For more information, see Using Bucket Policies and User Policies in the Amazon S3 Developer Guide.
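
If you create buckets from a script, the policy above can be generated rather than edited by hand. This is a minimal sketch: build_ses_bucket_policy and apply_bucket_policy are hypothetical helper names, and the policy body is the one shown in this step.

```python
import json


def build_ses_bucket_policy(bucket_name, aws_account_id):
    """Return the bucket policy from this step with the placeholders filled in."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "AllowSESPuts",
                "Effect": "Allow",
                "Principal": {"Service": "ses.amazonaws.com"},
                "Action": "s3:PutObject",
                "Resource": f"arn:aws:s3:::{bucket_name}/*",
                "Condition": {
                    "StringEquals": {"aws:Referer": aws_account_id}
                },
            }
        ],
    }


def apply_bucket_policy(bucket_name, aws_account_id):
    """Attach the generated policy to the bucket."""
    import boto3  # lazy import: building the policy needs no AWS access

    s3 = boto3.client("s3")
    policy = build_ses_bucket_policy(bucket_name, aws_account_id)
    s3.put_bucket_policy(Bucket=bucket_name, Policy=json.dumps(policy))
```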

Step 3: Create an IAM Policy and Role

  1. Create a new IAM Policy with the following permissions:
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "VisualEditor0",
                "Effect": "Allow",
                "Action": [
                    "logs:CreateLogStream",
                    "logs:CreateLogGroup",
                    "logs:PutLogEvents"
                ],
                "Resource": "*"
            },
            {
                "Sid": "VisualEditor1",
                "Effect": "Allow",
                "Action": [
                    "s3:GetObject",
                    "ses:SendRawEmail"
                ],
                "Resource": [
                    "arn:aws:s3:::<bucketName>/*",
                    "arn:aws:ses:<region>:<awsAccountId>:identity/*"
                ]
            }
        ]
    }

    In the preceding policy, make the following changes:

    • Replace <bucketName> with the name of the S3 bucket that you created earlier.
    • Replace <region> with the name of the AWS Region that you created the bucket in.
    • Replace <awsAccountId> with your AWS account ID. For more information, see Create a Customer Managed Policy in the IAM User Guide.
  2. Create a new IAM role. Attach the policy that you just created to the new role. For more information, see Creating Roles in the IAM User Guide.
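
When you create the role in the console for Lambda, the trust relationship is set up for you. If you script the role instead, you need to supply it yourself. The following sketch assumes the standard trust policy that lets the Lambda service assume the role; the helper names and the role_name and policy_arn parameters are hypothetical.

```python
import json


def build_lambda_trust_policy():
    """Trust policy that allows the Lambda service to assume the role."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {"Service": "lambda.amazonaws.com"},
                "Action": "sts:AssumeRole",
            }
        ],
    }


def create_execution_role(role_name, policy_arn):
    """Create the role and attach the customer managed policy from step 1."""
    import boto3  # lazy import: building the trust policy needs no AWS access

    iam = boto3.client("iam")
    role = iam.create_role(
        RoleName=role_name,
        AssumeRolePolicyDocument=json.dumps(build_lambda_trust_policy()),
    )
    iam.attach_role_policy(RoleName=role_name, PolicyArn=policy_arn)
    return role["Role"]["Arn"]
```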

Step 4: Create the Lambda Function

  1. In the Lambda console, create a new Python 3.7 function from scratch. For the execution role, choose the IAM role that you created earlier.
  2. In the code editor, paste the following code.
    # Copyright 2010-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
    #
    # This file is licensed under the Apache License, Version 2.0 (the "License").
    # You may not use this file except in compliance with the License. A copy of the
    # License is located at
    #
    # http://aws.amazon.com/apache2.0/
    #
    # This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
    # OF ANY KIND, either express or implied. See the License for the specific
    # language governing permissions and limitations under the License.
    
    import os
    import boto3
    import email
    import re
    from botocore.exceptions import ClientError
    from email.mime.multipart import MIMEMultipart
    from email.mime.text import MIMEText
    from email.mime.application import MIMEApplication
    
    region = os.environ['Region']
    
    def get_message_from_s3(message_id):
    
        incoming_email_bucket = os.environ['MailS3Bucket']
        incoming_email_prefix = os.environ['MailS3Prefix']
    
        if incoming_email_prefix:
            object_path = (incoming_email_prefix + "/" + message_id)
        else:
            object_path = message_id
    
        object_http_path = (f"http://s3.console.aws.amazon.com/s3/object/{incoming_email_bucket}/{object_path}?region={region}")
    
        # Create a new S3 client.
        client_s3 = boto3.client("s3")
    
        # Get the email object from the S3 bucket.
        object_s3 = client_s3.get_object(Bucket=incoming_email_bucket,
            Key=object_path)
        # Read the content of the message.
        file = object_s3['Body'].read()
    
        file_dict = {
            "file": file,
            "path": object_http_path
        }
    
        return file_dict
    
    def create_message(file_dict):
    
        sender = os.environ['MailSender']
        recipient = os.environ['MailRecipient']
    
        separator = ";"
    
        # Parse the email body.
        mailobject = email.message_from_string(file_dict['file'].decode('utf-8'))
    
        # Create a new subject line.
        subject_original = mailobject['Subject']
        subject = "FW: " + subject_original
    
        # The body text of the email.
        body_text = ("The attached message was received from "
                  + separator.join(mailobject.get_all('From'))
                  + ". This message is archived at " + file_dict['path'])
    
        # The file name to use for the attached message. Uses regex to remove all
        # non-alphanumeric characters, and appends a file extension.
        filename = re.sub('[^0-9a-zA-Z]+', '_', subject_original) + ".eml"
    
        # Create a MIME container.
        msg = MIMEMultipart()
        # Create a MIME text part.
        text_part = MIMEText(body_text, _subtype="html")
        # Attach the text part to the MIME message.
        msg.attach(text_part)
    
        # Add subject, from and to lines.
        msg['Subject'] = subject
        msg['From'] = sender
        msg['To'] = recipient
    
        # Create a new MIME object.
        att = MIMEApplication(file_dict["file"], filename)
        att.add_header("Content-Disposition", 'attachment', filename=filename)
    
        # Attach the file object to the message.
        msg.attach(att)
    
        message = {
            "Source": sender,
            "Destinations": recipient,
            "Data": msg.as_string()
        }
    
        return message
    
    def send_email(message):
        aws_region = os.environ['Region']
    
        # Create a new SES client in the configured Region.
        client_ses = boto3.client('ses', region_name=aws_region)
    
        # Send the email.
        try:
            #Provide the contents of the email.
            response = client_ses.send_raw_email(
                Source=message['Source'],
                Destinations=[
                    message['Destinations']
                ],
                RawMessage={
                    'Data':message['Data']
                }
            )
    
        # Display an error if something goes wrong.
        except ClientError as e:
            output = e.response['Error']['Message']
        else:
            output = "Email sent! Message ID: " + response['MessageId']
    
        return output
    
    def lambda_handler(event, context):
        # Get the unique ID of the message. This corresponds to the name of the file
        # in S3.
        message_id = event['Records'][0]['ses']['mail']['messageId']
        print(f"Received message ID {message_id}")
    
        # Retrieve the file from the S3 bucket.
        file_dict = get_message_from_s3(message_id)
    
        # Create the message.
        message = create_message(file_dict)
    
        # Send the email and print the result.
        result = send_email(message)
        print(result)
  3. Create the following environment variables for the Lambda function:

    Key             Value
    MailS3Bucket    The name of the S3 bucket that you created earlier.
    MailS3Prefix    The path of the folder in the S3 bucket where you will store incoming email.
    MailSender      The email address that the forwarded message will be sent from. This address has to be verified.
    MailRecipient   The address that you want to forward the message to.
    Region          The name of the AWS Region that you want to use to send the email.
  4. Under Basic settings, set the Timeout value to 30 seconds.
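
A missing environment variable otherwise only surfaces as a KeyError when the first email arrives. One way to fail earlier is to validate the settings in one place, as in this sketch. It assumes the five variable names from the table above and treats MailS3Prefix as optional, which is a choice made for this example; load_config is a hypothetical helper.

```python
import os

REQUIRED_KEYS = ("MailS3Bucket", "MailSender", "MailRecipient", "Region")


def load_config(env=None):
    """Read the forwarder settings and report every missing key at once."""
    env = os.environ if env is None else env
    missing = [key for key in REQUIRED_KEYS if not env.get(key)]
    if missing:
        raise ValueError("Missing environment variables: " + ", ".join(missing))
    config = {key: env[key] for key in REQUIRED_KEYS}
    # Strip slashes so the prefix can be joined to the message ID with "/".
    config["MailS3Prefix"] = env.get("MailS3Prefix", "").strip("/")
    return config
```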

(Optional) Step 5: Create an Amazon SNS Topic

You can optionally create an Amazon SNS topic. This step is helpful for troubleshooting purposes, or if you just want to receive additional notifications when you receive a message.

  1. Create a new Amazon SNS topic. For more information, see Creating a Topic in the Amazon SNS Developer Guide.
  2. Subscribe an endpoint, such as an email address, to the topic. For more information, see Subscribing an Endpoint to a Topic in the Amazon SNS Developer Guide.

Step 6: Create a Receipt Rule Set

  1. In the Amazon SES console, create a new Receipt Rule Set. For more information, see Creating a Receipt Rule Set in the Amazon SES Developer Guide.
  2. In the Receipt Rule Set that you just created, add a Receipt Rule. In the Receipt Rule, add an S3 Action. Set up the S3 Action to send your email to the S3 bucket that you created earlier.
  3. Add a Lambda action to the Receipt Rule. Configure the Receipt Rule to invoke the Lambda function that you created earlier.

For more information, see Setting Up a Receipt Rule in the Amazon SES Developer Guide.
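
The receipt rule can also be created through the SES API. The sketch below assumes the boto3 create_receipt_rule operation with an S3 action followed by a Lambda action. The helper names are hypothetical, and the trailing slash added to the prefix is an assumption made so that object keys match the prefix + "/" + message ID path that the Lambda function reads.

```python
def build_receipt_rule(rule_name, recipients, bucket_name, prefix, function_arn):
    """Build a rule that stores the message in S3 and then invokes Lambda."""
    s3_action = {"BucketName": bucket_name}
    if prefix:
        # Assumption: SES prepends the prefix verbatim, so add the slash here.
        s3_action["ObjectKeyPrefix"] = prefix.rstrip("/") + "/"
    return {
        "Name": rule_name,
        "Enabled": True,
        "Recipients": list(recipients),
        "Actions": [
            {"S3Action": s3_action},
            {
                "LambdaAction": {
                    "FunctionArn": function_arn,
                    "InvocationType": "Event",
                }
            },
        ],
    }


def create_rule(rule_set_name, rule):
    """Add the rule to an existing receipt rule set."""
    import boto3  # lazy import: building the rule needs no AWS access

    ses = boto3.client("ses")
    return ses.create_receipt_rule(RuleSetName=rule_set_name, Rule=rule)
```

The S3 action is listed first so that the message is already in the bucket when the function runs. Note that when you add the Lambda action outside the console, SES also needs permission to invoke the function.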

Step 7: Test the Function

  • Send an email to an address that corresponds with an address in the Receipt Rule you created earlier. Make sure that the email arrives in the correct S3 bucket. In a minute or two, the email arrives in the inbox that you specified in the MailRecipient variable of the Lambda function.
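
You can also test the function from the Lambda console without sending an email, by using a test event that contains only the fields the handler reads. The sketch below assumes a message that already exists in the bucket; build_sample_ses_event and object_key_for are hypothetical helpers, and the event is trimmed to the single messageId field used by lambda_handler rather than being a full SES notification.

```python
import json


def build_sample_ses_event(message_id):
    """Minimal test event with the one field that lambda_handler reads."""
    return {
        "Records": [
            {
                "eventSource": "aws:ses",
                "ses": {"mail": {"messageId": message_id}},
            }
        ]
    }


def extract_message_id(event):
    """Same lookup that lambda_handler performs on the incoming event."""
    return event["Records"][0]["ses"]["mail"]["messageId"]


def object_key_for(message_id, prefix=""):
    """Mirror of the S3 key logic in get_message_from_s3."""
    return f"{prefix}/{message_id}" if prefix else message_id


if __name__ == "__main__":
    # Print JSON that can be pasted into the Configure test events window.
    print(json.dumps(build_sample_ses_event("example-message-id"), indent=2))
```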

Troubleshooting

If you send a test message, but it is never forwarded to your destination email address, do the following:

  • Make sure that the Amazon SES Receipt Rule is active.
  • Make sure that the email address that you specified in the MailRecipient variable of the Lambda function is correct.
  • Subscribe an email address or phone number to the SNS topic. Send another test email to your domain. Make sure that SNS sends a Received notification to your subscribed email address or phone number.
  • Check the CloudWatch Log for your Lambda function to see if any errors occurred.

If you send a test email to your receiving domain, but you receive a bounce notification, do the following:

  • Make sure that the verification process for your domain completed successfully.
  • Make sure that the MX record for your domain specifies the correct Amazon SES receiving endpoint.
  • Make sure that you’re sending to an address that is handled by the receipt rule.

Costs of using this solution

The cost of implementing this solution is minimal. If you receive 10,000 emails per month, and each email is 2KB in size, you pay $1.00 for your use of Amazon SES. For more information, see Amazon SES Pricing.
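
The arithmetic behind that figure can be sketched as follows. The rate of $0.10 per 1,000 received emails is an assumption inferred from the $1.00 figure above, not a quoted price; check Amazon SES Pricing for current rates. The function name is hypothetical.

```python
def ses_receiving_cost(email_count, price_per_thousand=0.10):
    """Estimate the monthly SES charge for received email, in US dollars."""
    return round(email_count / 1000 * price_per_thousand, 2)


# 10,000 emails / 1,000 = 10 billing units; 10 x $0.10 = $1.00
monthly_cost = ses_receiving_cost(10_000)
```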

You also pay a small charge to store incoming emails in Amazon S3. The charge for storing 1,000 emails that are each 2KB in size is less than one cent. Your use of Amazon S3 might qualify for the AWS Free Usage Tier. For more information, see Amazon S3 Pricing.

Finally, you pay for your use of AWS Lambda. With Lambda, you pay for the number of requests you make, for the amount of compute time that you use, and for the amount of memory that you use. If you use Lambda to forward 1,000 emails that are each 2KB in size, you pay no more than a few cents. Your use of AWS Lambda might qualify for the AWS Free Usage Tier. For more information, see AWS Lambda Pricing.

Note: These cost estimates don’t include the cost of purchasing a domain, since many users already have their own domains. If you do need to buy one, the domain is likely to be the most expensive part of implementing this solution.

Conclusion

This solution makes it possible to forward incoming email from one of your Amazon SES verified domains to an email address that isn’t necessarily verified. It’s also useful if you have multiple AWS accounts, and you want incoming messages to be sent from each of those accounts to a single destination. We hope you’ve found this tutorial to be helpful!

Sending Push Notifications to iOS 13 and watchOS 6 Devices

Post Syndicated from Brent Meyer original https://aws.amazon.com/blogs/messaging-and-targeting/sending-push-notifications-to-ios-13-and-watchos-6-devices/

Last week, we made some changes to the way that Amazon Pinpoint sends Apple Push Notification service (APNs) push notifications.

In June, Apple announced that push notifications sent to iOS 13 and watchOS 6 devices would require the new apns-push-type header. APNs uses this header to determine whether a notification should be shown on the display of the recipient’s device, or if it should be sent to the background.

When you use Amazon Pinpoint to send an APNs message, you can choose whether you want to send the message as a standard message, or as a silent notification. Amazon Pinpoint uses your selection to determine which value to apply to the apns-push-type header: if you send the message as a standard message, Amazon Pinpoint automatically sets the value of the apns-push-type header to alert; if you send the message as a silent notification, it sets the apns-push-type header to background. Amazon Pinpoint applies these settings automatically, so you don’t have to do any additional work in order to send messages to recipients with iOS 13 and watchOS 6 devices.

One last thing to keep in mind: if you specify the raw content of an APNs push notification, the message payload has to include the content-available key. The value of the content-available key has to be an integer, and can only be 0 or 1. If you’re sending an alert, set the value of content-available to 0. If you’re sending a background notification, set the value of content-available to 1. Additionally, background notification payloads can’t include the alert, badge, or sound keys.
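
The rules for raw content can be sketched as a small payload builder. This is an illustration of the constraints described above, not Amazon Pinpoint's own implementation; build_apns_payload is a hypothetical helper, and the title and body fields inside alert follow the standard APNs payload layout.

```python
import json


def build_apns_payload(background, title=None, body=None):
    """Return raw APNs content that follows the content-available rules."""
    if background:
        if title or body:
            # Background notifications can't carry alert, badge, or sound keys.
            raise ValueError("background notifications can't include an alert")
        aps = {"content-available": 1}
    else:
        aps = {
            "alert": {"title": title, "body": body},
            "content-available": 0,
        }
    return json.dumps({"aps": aps})
```

For example, build_apns_payload(False, "Sale", "Ends today") produces an alert payload, and build_apns_payload(True) produces a background payload.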

To learn more about sending APNs notifications, see Generating a Remote Notification and Pushing Background Updates to Your App on the Apple Developer website.

Creating static custom domain endpoints with Amazon MQ to simplify broker modification and scaling

Post Syndicated from Rachel Richardson original https://aws.amazon.com/blogs/compute/creating-static-custom-domain-endpoints-with-amazon-mq/

This post is courtesy of Wallace Printz, Senior Solutions Architect, AWS, and Christian Mueller, Senior Solutions Architect, AWS.

Many cloud-native application architectures take advantage of the point-to-point and publish-subscribe (“pub-sub”) model of message-based communication between application components. This architecture is generally more resilient to failure because of the loose coupling and because message processing failures can be retried. It’s also more efficient because individual application components can independently scale up or down to maintain message-processing SLAs, compared to monolithic application architectures.

Synchronous (REST-based) systems, by contrast, are tightly coupled. A problem in a synchronous downstream dependency has an immediate impact on the upstream callers, and retries from upstream callers can all too easily fan out and amplify problems.

Amazon SQS and Amazon SNS are fully managed messaging services, but they are not the right tool for every job. For applications requiring messaging protocols including JMS, NMS, AMQP, STOMP, MQTT, and WebSocket, Amazon provides Amazon MQ. Amazon MQ is a managed message broker service for Apache ActiveMQ that makes it easy to set up and operate message brokers in the cloud.

Amazon MQ provides two managed broker deployment connection options: public brokers and private brokers. Public brokers receive internet-accessible IP addresses, while private brokers receive only private IP addresses from the corresponding CIDR range in their VPC subnet.

In some cases, for security purposes, you may prefer to place brokers in a private subnet. You can still allow access to the brokers through a persistent public endpoint, such as a subdomain of your corporate domain like mq.example.com.

In this post, we explain how to provision private Amazon MQ brokers behind a secure public load balancer endpoint using an example subdomain.

Architecture overview

There are several reasons one might want to deploy this architecture beyond the security aspects.

First, human-readable URLs are easier for people to parse when reviewing operations and troubleshooting, such as deploying updates to mq-dev.example.com before mq-prod.example.com.

Second, maintaining static URLs for your brokers helps reduce the necessity of modifying client code when performing maintenance on the brokers.

Third, this pattern allows you to vertically scale your brokers without changing the client code or even notifying the clients that changes have been made.

Finally, the same architecture described here works for a network of brokers configuration as well, whereby you could horizontally scale your brokers without impacting the client code.

Prerequisites

This blog post assumes some familiarity with AWS networking fundamentals, such as VPCs, subnets, load balancers, and Amazon Route 53.

When you are finished, the architecture should be set up as shown in the following diagram. For ease of visualization, we demonstrate with a pair of brokers using the active-standby option.

Solution Overview

Amazon MQ solution overview

The client to broker traffic flow is as follows.

  • First, the client service tries to connect with a failover URL to the domain endpoint set up in Route 53. If a client loses the connection, using the failover URL allows the client to automatically try to reconnect to the broker.
  • The client looks up the domain name from Route 53, and Route 53 returns the IP address of the Network Load Balancer.
  • The client creates a secure socket layer (SSL) connection to the Network Load Balancer with an SSL certificate provided from AWS Certificate Manager (ACM). The Network Load Balancer selects from the healthy brokers in its target group and creates a separate SSL connection between the Network Load Balancer and the broker. This provides secure, end-to-end SSL encrypted messaging between client and brokers.

In this diagram, the healthy broker connection is shown in the solid line. The standby broker, which does not reply to connection requests and is therefore marked as unhealthy in the target group, is shown in the dashed line.

Solution walkthrough

To build this architecture, build the network segmentation first, then the Amazon MQ brokers, and finally the network routing.

Setup

First, you need the following resources:

  • A VPC
  • One private subnet per Availability Zone
  • One public subnet for your bastion host (if desired)

This demonstration VPC uses the 20.0.0.0/16 CIDR range.

Additionally, you must create a custom security group for your brokers. Set up this security group to allow traffic from your Network Load Balancer and, if using a network of brokers, among the brokers as well.

This VPC is not being used for any other workloads. This demonstration allows all incoming traffic originating within the VPC, including the Network Load Balancer, through to the brokers on the following ports:

  • OpenWire communication port of 61617
  • Apache ActiveMQ console port of 8162

If you are using a different protocol, adjust the port numbers accordingly.
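
The two ingress rules above could be expressed in code as follows. This is a minimal sketch, assuming the demonstration CIDR range; `build_broker_ingress_rules` and `BROKER_PORTS` are hypothetical names, and the resulting list is shaped for the `IpPermissions` parameter of the boto3 EC2 `authorize_security_group_ingress` call.

```python
# Hypothetical helper: builds ingress rules for the broker security group.
BROKER_PORTS = {
    "OpenWire": 61617,
    "ActiveMQ console": 8162,
}

def build_broker_ingress_rules(vpc_cidr, ports=BROKER_PORTS):
    """Return one TCP ingress rule per broker port, limited to the VPC CIDR."""
    return [
        {
            "IpProtocol": "tcp",
            "FromPort": port,
            "ToPort": port,
            "IpRanges": [{"CidrIp": vpc_cidr, "Description": name}],
        }
        for name, port in ports.items()
    ]

rules = build_broker_ingress_rules("20.0.0.0/16")
for rule in rules:
    print(rule["FromPort"], rule["IpRanges"][0]["CidrIp"])
```

For a different protocol, only the entries in `BROKER_PORTS` would need to change.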

Create an Amazon MQ security group

Building the Amazon MQ brokers

Now that you have the network segmentation set up, build the Amazon MQ brokers. As mentioned previously, this demonstration uses the active-standby pair of private brokers option.

Configure the broker settings by selecting a broker name, instance type, ActiveMQ console user, and password first.

Configure Amazon MQ broker settings

In the Additional Settings area, place the brokers in your previously selected VPC and the associated private subnets.

Configure Amazon MQ additional settings

Finally, select the existing Security Group previously discussed, and make sure that the Public Accessibility option is set to No.

Set Amazon MQ security group settings

That’s it for the brokers. When they finish provisioning, the Amazon MQ dashboard should look like the one shown in the following screenshot. Note the IP addresses of the brokers and the ActiveMQ web console URLs for later.

Amazon MQ dashboard

Configuring a Load Balancer Target Group

The next step in the build process is to configure the load balancer’s target group. This demonstration uses the private IP addresses of the brokers as targets for the Network Load Balancer.

Create and name a target group, select the IP option under Target type, and make sure to select TLS under Protocol and 61617 under Port, as well as the VPC in which your brokers reside. It is important to configure the health check settings so traffic is only routed to active brokers by selecting the TCP protocol and overriding the health check port to 8162, the Apache ActiveMQ console port.

Do not use the OpenWire port as the target group health check port. Because the Network Load Balancer may not be able to recognize the host as healthy on that port, it is better to use the ActiveMQ web console port.
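
The target group settings described above can be summarized as a set of parameters. The following is a sketch rather than a complete deployment script: `build_target_group_params`, the target group name, and the VPC ID are hypothetical, and the dictionary keys follow the boto3 `elbv2` `create_target_group` call.

```python
def build_target_group_params(name, vpc_id, broker_port=61617, health_check_port=8162):
    """Collect the target group settings used in this walkthrough."""
    return {
        "Name": name,
        "Protocol": "TLS",             # TLS between the load balancer and the brokers
        "Port": broker_port,           # OpenWire port
        "VpcId": vpc_id,
        "TargetType": "ip",            # brokers are registered by private IP address
        "HealthCheckProtocol": "TCP",
        # The API expects the health check port as a string.
        "HealthCheckPort": str(health_check_port),
    }

params = build_target_group_params("mq-brokers", "vpc-0123456789abcdef0")
print(params)

# With boto3 installed and credentials configured, this could be passed as:
#   boto3.client("elbv2").create_target_group(**params)
```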

Next, add the brokers’ IP addresses as targets. You can find the broker IP addresses in the Amazon MQ console page after they complete provisioning. Make sure to add both the active and the standby broker to the target group so that when reboots occur, the Network Load Balancer routes traffic to whichever broker is active.

You may be pursuing a more dynamic environment for scaling brokers up and down to handle the demands of a variable message load. In that case, as you scale to add more brokers, make sure that you also add them to the target group.

AWS Lambda would be a great way to programmatically handle adding or removing the broker’s IP addresses to this target group automatically.
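
As a rough illustration of that idea, the following sketch compares the IP addresses currently registered in the target group with the current broker IP addresses, and registers or deregisters the difference. The function names are hypothetical, and how the broker IP addresses are discovered is left out; the `describe_target_health`, `register_targets`, and `deregister_targets` calls belong to the boto3 `elbv2` client.

```python
def plan_target_changes(registered_ips, broker_ips, port=61617):
    """Work out which targets to add and which to remove."""
    registered = set(registered_ips)
    desired = set(broker_ips)
    to_register = [{"Id": ip, "Port": port} for ip in sorted(desired - registered)]
    to_deregister = [{"Id": ip, "Port": port} for ip in sorted(registered - desired)]
    return to_register, to_deregister

def sync_target_group(target_group_arn, broker_ips, port=61617):
    """Apply the planned changes to the target group (requires AWS credentials)."""
    import boto3  # imported here so that the planning function has no AWS dependency

    elbv2 = boto3.client("elbv2")
    health = elbv2.describe_target_health(TargetGroupArn=target_group_arn)
    registered = [d["Target"]["Id"] for d in health["TargetHealthDescriptions"]]
    to_register, to_deregister = plan_target_changes(registered, broker_ips, port)
    if to_register:
        elbv2.register_targets(TargetGroupArn=target_group_arn, Targets=to_register)
    if to_deregister:
        elbv2.deregister_targets(TargetGroupArn=target_group_arn, Targets=to_deregister)
    return to_register, to_deregister

# Example with made-up addresses: one broker was replaced.
added, removed = plan_target_changes(
    ["20.0.1.10", "20.0.2.10"], ["20.0.2.10", "20.0.3.10"]
)
print(added, removed)
```

Keeping the planning step separate from the AWS calls makes the logic easy to test without touching a real target group.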

Creating a Network Load Balancer

Next, create a Network Load Balancer. This demo uses an internet-facing load balancer with TLS listeners on port 61617, and routes traffic to brokers’ VPC and private subnets.

Configure a network load balancer

Clients must securely connect to the Network Load Balancer, so this demo uses an ACM certificate for the subdomain registered in Route 53, such as mq.example.com. For simplicity, ACM certificate provisioning is not shown. For more information, see Request a Public Certificate.

Make sure that the ACM certificate is provisioned in the same Region as your Network Load Balancer, or the certificate is not displayed in the selection menu.

Next, select the target group that you just created, and select TLS for the connection between the Network Load Balancer and the brokers. Similarly, select the health checks on TCP port 8162.

If all went well, you see the list of brokers’ IP addresses listed as targets. From here, review your settings and confirm you’d like to deploy the Network Load Balancer.

Configuring Route 53

The last step in this build is to configure Route 53 to serve traffic at the subdomain of your choice to your Network Load Balancer.

Go to your Route 53 Hosted Zone, and create a new subdomain record set, such as mq.example.com, that matches the ACM certificate that you previously created. In the Type field, select A – IPv4 address, then select Yes for Alias. This allows you to select the Network Load Balancer as the alias target. Select the Network Load Balancer that you just created from the Alias Target menu and save the record set.
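
The same alias record could also be created programmatically. The sketch below only builds the change batch; `build_alias_change_batch` and the example values are hypothetical. Note that the `HostedZoneId` inside `AliasTarget` is the load balancer's own canonical hosted zone ID, not the ID of your hosted zone.

```python
def build_alias_change_batch(record_name, nlb_dns_name, nlb_hosted_zone_id):
    """Build a Route 53 change batch that aliases a subdomain to a load balancer."""
    return {
        "Comment": "Alias record for the Amazon MQ Network Load Balancer",
        "Changes": [
            {
                "Action": "UPSERT",
                "ResourceRecordSet": {
                    "Name": record_name,
                    "Type": "A",
                    "AliasTarget": {
                        "HostedZoneId": nlb_hosted_zone_id,
                        "DNSName": nlb_dns_name,
                        "EvaluateTargetHealth": False,
                    },
                },
            }
        ],
    }

batch = build_alias_change_batch(
    "mq.example.com",
    "my-nlb-0123456789abcdef.elb.us-west-2.amazonaws.com",  # made-up DNS name
    "Z0EXAMPLE",                                            # made-up zone ID
)
print(batch["Changes"][0]["ResourceRecordSet"]["Name"])

# With boto3 installed and credentials configured, this could be applied as:
#   boto3.client("route53").change_resource_record_sets(
#       HostedZoneId="<your-hosted-zone-id>", ChangeBatch=batch)
```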

Testing broker connectivity

And that’s it!

There’s an important advantage to this architecture. When you create Amazon MQ active-standby brokers, the Amazon MQ service provides two endpoints. Only one broker host is active at a time, and when configuration changes or other reboot events occur, the standby broker becomes active and the active broker goes to standby. When there is an option to connect to multiple brokers, the typical connection string is similar to the following:

"failover:(ssl://b-ce452fbe-2581-4003-8ce2-4185b1377b43-1.mq.us-west-2.amazonaws.com:61617,ssl://b-ce452fbe-2581-4003-8ce2-4185b1377b43-2.mq.us-west-2.amazonaws.com:61617)"

In this architecture, you use only a single connection URL, but you still want to use the failover protocol to force re-connection if the connection is dropped for any reason.

For ease of use, this solution relies on the Amazon MQ workshop client application code from re:Invent 2018. To test this solution, set the connection URL to the following:

"failover:(ssl://mq.example.com:61617)"

Run the producer and consumer clients in separate terminal windows.

The messages are sent and received successfully across the internet, while the brokers are hidden behind the Network Load Balancer.
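
If the clients cannot connect, a quick check of the endpoint can help narrow down the cause before looking at the client code. The following is a minimal sketch using only the Python standard library; both function names are hypothetical, and the check only confirms that a TLS handshake with a trusted certificate succeeds, not that the broker accepts OpenWire traffic.

```python
import socket
import ssl

def build_failover_url(host, port=61617):
    """Build a single-endpoint failover URL like the one used in this post."""
    return "failover:(ssl://{}:{})".format(host, port)

def can_open_tls_connection(host, port=61617, timeout=5.0):
    """Return True if a TLS handshake with host:port succeeds."""
    context = ssl.create_default_context()
    try:
        with socket.create_connection((host, port), timeout=timeout) as sock:
            with context.wrap_socket(sock, server_hostname=host):
                return True
    except OSError:
        # Covers DNS failures, refused connections, timeouts, and TLS errors.
        return False

print(build_failover_url("mq.example.com"))
# To check your own endpoint:
#   can_open_tls_connection("mq.example.com")
```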

Logging into the broker’s ActiveMQ console

But what if we want to log in to the broker’s ActiveMQ web console?

Due to the security group rules, only traffic originating from inside the VPC is allowed to reach the brokers. That leaves three options:

  • Use a VPN connection from the corporate network to the VPC. Most customers likely use this option, but for rapid testing, there is a simple and cost-effective method.
  • Connect to the brokers’ web console through a Route 53 subdomain, which requires creating a separate port 8162 Listener on the existing Network Load Balancer and creating a separate TLS target group on port 8162 for the brokers.
  • Use a bastion host to proxy traffic to the web console.

To use a bastion host, create a small Linux EC2 instance in your public subnet, and make sure that:

  • The EC2 instance has a public IP address.
  • You have access to the SSH key pair.
  • It is placed in a security group that allows SSH port 22 traffic from your location.

For simplicity, this step is not shown, but this demonstration uses a t3.micro Amazon Linux 2 host with all default options as the bastion.

Creating a forwarding tunnel

Next, create a forwarding tunnel through an SSH connection to the bastion host. The following example command, run in a terminal window, keeps a persistent SSH connection open and forwards port 8162 through the bastion host at its public IP address (54.244.188.53 in this demonstration):

ssh -D 8162 -C -q -N -i <my-key-pair-name>.pem ec2-user@<ec2-ip-address>

You can also configure a browser to tunnel traffic through your proxy.

We have chosen to demonstrate in Firefox. Configure the network settings to use a manual proxy on localhost on the Apache ActiveMQ console port of 8162.  This can be done by opening the Firefox Connection Settings.  In the Configure Proxy Access to the Internet section, select Manual proxy configuration, then set the SOCKS Host to localhost and Port to 8162, leaving other fields empty.

Finally, use the Apache ActiveMQ console URL provided in the Amazon MQ web console details page to connect to the broker through the proxy.

ActiveMQ screenshot

Conclusion

Congratulations! You’ve successfully built a highly available Amazon MQ broker pair in a private subnet. You’ve layered your security defense by putting the brokers behind a highly scalable Network Load Balancer, and you’ve configured routing from a single custom subdomain URL to multiple brokers with health check built in.

To learn more about Amazon MQ and scalable broker communication patterns, we highly recommend the following resources:

Keep on building!

Creating custom Pinpoint dashboards using Amazon QuickSight, part 3

Post Syndicated from Brent Meyer original https://aws.amazon.com/blogs/messaging-and-targeting/creating-custom-pinpoint-dashboards-using-amazon-quicksight-part-3/

Note: This post was written by Manan Nayar and Aprajita Arora, Software Development Engineers on the AWS Digital User Engagement team.


This is the third and final post in our series about creating custom visualizations of your Amazon Pinpoint metrics using Amazon QuickSight.

In our first post, we used the Metrics APIs to retrieve specific Key Performance Indicators (KPIs), and then created visualizations using QuickSight. In the second post, we used the event stream feature in Amazon Pinpoint to enable more in-depth analyses.

The examples in the first two posts used Amazon S3 to store the metrics that we retrieved from Amazon Pinpoint. This post takes a different approach, using Amazon Redshift to store the data. By using Redshift to store this data, you gain the ability to create visualizations on large data sets. This example is useful in situations where you have a large volume of event data, and situations where you need to store your data for long periods of time.

Step 1: Provision the storage

The first step in setting up this solution is to create the destinations where you’ll store the Amazon Pinpoint event data. Since we’ll be storing the data in Amazon Redshift, we need to create a new Redshift cluster. We’ll also create an S3 bucket, which will house the original event data that’s streamed from Amazon Pinpoint.

To create the Redshift cluster and the S3 bucket

  1. Create a new Redshift cluster. To learn more, see the Amazon Redshift Getting Started Guide.
  2. Create a new table in the Redshift cluster that contains the appropriate columns. Use the following query to create the table:
    create table if not exists pinpoint_events_table(
      rowid varchar(255) not null,
      project_key varchar(100) not null,
      event_type varchar(100) not null,
      event_timestamp timestamp not null,
      campaign_id varchar(100),
      campaign_activity_id varchar(100),
      treatment_id varchar(100),
      PRIMARY KEY (rowid)
    );
  3. Create a new Amazon S3 bucket. For complete procedures, see Create a Bucket in the Amazon S3 Getting Started Guide.

Step 2: Set up the event stream

This example uses the event stream feature of Amazon Pinpoint to send event data to S3. Later, we’ll create a Lambda function that sends the event data to your Redshift cluster when new event data is added to the S3 bucket. This method lets us store the original event data in S3, and transfer a subset of that data to Redshift for analysis.

To set up the event stream

  1. Sign in to the Amazon Pinpoint console at http://console.aws.amazon.com/pinpoint. In the list of projects, choose the project that you want to enable event streaming for.
  2. Under Settings, choose Event stream.
  3. Choose Edit, and then configure the event stream to use Amazon Kinesis Data Firehose. If you don’t already have a Kinesis Data Firehose stream, follow the link to create one in the Kinesis console. Configure the stream to send data to an S3 bucket. For more information about creating streams, see Creating an Amazon Kinesis Data Firehose Delivery Stream.
  4. Under IAM role, choose Automatically create a role. Choose Save.

Step 3: Create the Lambda function

In this section, you create a Lambda function that processes the raw event stream data, and then writes it to a table in your Redshift cluster.
To create the Lambda function:

  1. Download the psycopg2 binary from https://github.com/jkehler/awslambda-psycopg2. This Python library lets you interact with PostgreSQL databases, such as Amazon Redshift. It contains certain libraries that aren’t included in Lambda.
    • Note: This GitHub repository is not an official AWS-managed repository.
  2. Within the awslambda-psycopg2-master folder, you’ll find a folder called psycopg2-37. Rename the folder to psycopg2 (you may need to delete the existing folder with that name), and then compress the entire folder to a .zip file.
  3. Create a new Lambda function from scratch, using the Python 3.7 runtime.
  4. Upload the psycopg2.zip file that you created in step 2 to Lambda.
  5. In Lambda, create a new file called lambda_function.py. Paste the following code into the file:
    import datetime
    import json
    import re
    import uuid
    import os
    import boto3
    import psycopg2
    from psycopg2 import Error
    
    cluster_redshift = "<clustername>"
    dbname_redshift = "<dbname>"
    user_redshift = "<username>"
    password_redshift = "<password>"
    endpoint_redshift = "<endpoint>"
    port_redshift = "5439"
    table_redshift = "pinpoint_events_table"
    
    # Get the file that contains the event data from the appropriate S3 bucket.
    def get_file_from_s3(bucket, key):
        s3 = boto3.client('s3')
        obj = s3.get_object(Bucket=bucket, Key=key)
        text = obj["Body"].read().decode()
    
        return text
    
    # If the object that we retrieve contains newline-delineated JSON, split it into
    # multiple objects.
    def clean_and_split(json_raw):
        # Raw strings keep the regex escapes from being treated as string escapes.
        json_delimited = re.sub(r'}\s{', '}---X-DELIMITER---{', json_raw)
        json_clean = re.sub(r'\s+', '', json_delimited)
        data = json_clean.split("---X-DELIMITER---")
    
        return data
    
    # Set all of the variables that we'll use to create the new row in Redshift.
    def set_variables(in_json):
    
        for line in in_json:
            content = json.loads(line)
            app_id = content['application']['app_id']
            event_type = content['event_type']
            event_timestamp = datetime.datetime.fromtimestamp(content['event_timestamp'] / 1e3).strftime('%Y-%m-%d %H:%M:%S')
    
            if (content['attributes'].get('campaign_id') is None):
                campaign_id = ""
            else:
                campaign_id = content['attributes']['campaign_id']
    
            if (content['attributes'].get('campaign_activity_id') is None):
                campaign_activity_id = ""
            else:
                campaign_activity_id = content['attributes']['campaign_activity_id']
    
            if (content['attributes'].get('treatment_id') is None):
                treatment_id = ""
            else:
                treatment_id = content['attributes']['treatment_id']
    
            write_to_redshift(app_id, event_type, event_timestamp, campaign_id, campaign_activity_id, treatment_id)
                
    # Write the event stream data to the Redshift table.
    def write_to_redshift(app_id, event_type, event_timestamp, campaign_id, campaign_activity_id, treatment_id):
        row_id = str(uuid.uuid4())

        # Pass the values as query parameters instead of concatenating them into
        # the SQL string, so that the driver escapes them. The table name is a
        # trusted constant defined at the top of this file.
        query = ("INSERT INTO " + table_redshift + " (rowid, project_key, event_type, "
                + "event_timestamp, campaign_id, campaign_activity_id, treatment_id) "
                + "VALUES (%s, %s, %s, %s, %s, %s, %s);")
        values = (row_id, app_id, event_type, event_timestamp,
                  campaign_id, campaign_activity_id, treatment_id)

        # Initialize these so that the finally block is safe if connect() fails.
        conn = None
        cur = None

        try:
            conn = psycopg2.connect(user = user_redshift,
                                    password = password_redshift,
                                    host = endpoint_redshift,
                                    port = port_redshift,
                                    database = dbname_redshift)

            cur = conn.cursor()
            cur.execute(query, values)
            conn.commit()
            print("Updated table.")

        except (Exception, psycopg2.DatabaseError) as error:
            print("Database error: ", error)
        finally:
            if cur:
                cur.close()
            if conn:
                conn.close()
                print("Connection closed.")
    
    # Handle the event notification that we receive when a new item is sent to the 
    # S3 bucket.
    def lambda_handler(event,context):
        print("Received event: \n" + str(event))
    
        bucket = event['Records'][0]['s3']['bucket']['name']
        key = event['Records'][0]['s3']['object']['key']
        data = get_file_from_s3(bucket, key)
    
        in_json = clean_and_split(data)
    
        set_variables(in_json)

    In the preceding code, make the following changes:

    • Replace <clustername> with the name of the cluster.
    • Replace <dbname> with the name of the database.
    • Replace <username> with the user name that you specified when you created the Redshift cluster.
    • Replace <password> with the password that you specified when you created the Redshift cluster.
    • Replace <endpoint> with the endpoint address of the Redshift cluster.
  6. In IAM, update the execution role that’s associated with the Lambda function to include the GetObject permission for the S3 bucket that contains the event data. For more information, see Editing IAM Policies in the AWS IAM User Guide.
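
The `clean_and_split` function in the preceding code separates the JSON objects with a regular expression and removes all whitespace, including spaces inside string values. If you need to preserve those values, one alternative (not the approach used in this post) is to parse the objects one at a time with `json.JSONDecoder.raw_decode` from the standard library. The following is a minimal sketch; `split_json_objects` is a hypothetical name, and unlike `clean_and_split`, it returns parsed dictionaries rather than strings.

```python
import json

def split_json_objects(raw):
    """Parse a string that contains several JSON objects placed back to back."""
    decoder = json.JSONDecoder()
    objects = []
    pos = 0
    length = len(raw)
    while pos < length:
        # Skip any whitespace or newlines between objects.
        while pos < length and raw[pos].isspace():
            pos += 1
        if pos >= length:
            break
        # raw_decode returns the parsed object and the index where it ended.
        obj, pos = decoder.raw_decode(raw, pos)
        objects.append(obj)
    return objects

sample = '{"event_type": "_email.send"}\n{"attributes": {"note": "a b"}}{"n": 3}'
events = split_json_objects(sample)
print(len(events))
```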

Step 4: Set up notifications on the S3 bucket

Now that we’ve created the Lambda function, we’ll set up a notification on the S3 bucket. In this case, the notification will refer to the Lambda function that we created in the previous section. Every time a new file is added to the bucket, the notification will cause the Lambda function to run.

To create the event notification

  1. In S3, create a new bucket notification. The notification should be triggered when PUT events occur, and should trigger the Lambda function that you created in the previous section. For more information about creating notifications, see Configuring Amazon S3 Event Notifications in the Amazon S3 Developer Guide.
  2. Test the event notification by sending a test campaign. If you send an email campaign, your Redshift database should contain events such as _campaign.send, _email.send, _email.delivered, and others. You can check the contents of the Redshift table by running the following query in the Query Editor in the Redshift console:
    select * from pinpoint_events_table;
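
When testing the notification, keep in mind that a single S3 event can contain more than one record, and that object keys in S3 event notifications are URL-encoded. The following sketch shows one way to read every record and decode the keys; `extract_s3_objects` and the sample event values are hypothetical.

```python
from urllib.parse import unquote_plus

def extract_s3_objects(event):
    """Return a (bucket, key) pair for every record in an S3 event notification."""
    objects = []
    for record in event.get("Records", []):
        bucket = record["s3"]["bucket"]["name"]
        # Keys arrive URL-encoded, for example with '+' in place of spaces.
        key = unquote_plus(record["s3"]["object"]["key"])
        objects.append((bucket, key))
    return objects

sample_event = {
    "Records": [
        {"s3": {"bucket": {"name": "my-event-bucket"},
                "object": {"key": "2019/08/21/events+file%3D1.json"}}}
    ]
}
found = extract_s3_objects(sample_event)
print(found)
```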

Step 5: Add the data set in Amazon QuickSight

If your Lambda function is sending event data to Redshift as expected, you can use your Redshift database to create a new data set in Amazon QuickSight. QuickSight includes an automatic database discovery feature that helps you add your Redshift database as a data set with only a few clicks. For more information, see Creating a Data Set from a Database in the Amazon QuickSight User Guide.

Step 6: Create your visualizations

Now that QuickSight is retrieving information from your Redshift database, you can use that data to create visualizations. To learn more about creating visualizations in QuickSight, see Creating an Analysis in the Amazon QuickSight User Guide.

This brings us to the end of our series. While these posts focused on using Amazon QuickSight to visualize your analytics data, you can also use these same techniques to create visualizations using 3rd party applications. We hope you enjoyed this series, and we can’t wait to see what you build using these examples!

Creating custom Pinpoint dashboards using Amazon QuickSight, part 2

Post Syndicated from Brent Meyer original https://aws.amazon.com/blogs/messaging-and-targeting/creating-custom-pinpoint-dashboards-using-amazon-quicksight-part-2/

Note: This post was written by Manan Nayar and Aprajita Arora, Software Development Engineers on the AWS Digital User Engagement team.


In our previous post, we discussed the process of visualizing specific, pre-aggregated Amazon Pinpoint metrics—such as delivery rate or open rate—using the Amazon Pinpoint Metrics APIs. In that example, we showed how to create a Lambda function that retrieves your metrics, and then make those metrics available for creating visualizations in Amazon QuickSight.

This post shows a different approach to exporting data from Amazon Pinpoint and using it to build visualizations. Rather than retrieve specific metrics, you can use the event stream feature in Amazon Pinpoint to export raw event data. You can use this data in Amazon QuickSight to create in-depth analyses of your data, as opposed to visualizing pre-calculated metrics. As an added benefit, when you use this solution, you don’t have to modify any code, and the underlying data is updated every few minutes.

Step 1: Configure the event stream in Amazon Pinpoint

The Amazon Pinpoint event stream includes information about campaign events (such as _campaign.send) and application events (such as _session.start). It also includes response information related to all of the emails and SMS messages that you send from your Amazon Pinpoint project, regardless of whether they were sent from campaigns or on a transactional basis. When you enable event streams, Amazon Pinpoint automatically sends this data to your S3 bucket (via Amazon Kinesis Data Firehose) every few minutes.

To set up the event stream

  1. Sign in to the Amazon Pinpoint console at http://console.aws.amazon.com/pinpoint. In the list of projects, choose the project that you want to enable event streaming for.
  2. Under Settings, choose Event stream.
  3. Choose Edit, and then configure the event stream to use Amazon Kinesis Data Firehose. If you don’t already have a Kinesis Data Firehose stream, follow the link to create one in the Kinesis console. Configure the stream to send data to an S3 bucket. For more information about creating streams, see Creating an Amazon Kinesis Data Firehose Delivery Stream.
  4. Under IAM role, choose Automatically create a role. Choose Save.

Step 2: Add a data set in Amazon QuickSight

Now that you’ve started streaming your Amazon Pinpoint data to S3, you can set Amazon QuickSight to look for data in the S3 bucket. You connect QuickSight to sources of data by creating data sets.

To create a data set

    1. In a text editor, create a new file. Paste the following code:
      {
          "fileLocations": [
              {
                  "URIPrefixes": [ 
                      "s3://<bucketName>/"          
                  ]
              }
          ],
          "globalUploadSettings": {
              "format": "JSON"
          }
      }

      In the preceding code, replace <bucketName> with the name of the S3 bucket that you’re using to store the event stream data. Save the file as manifest.json.

    2. Sign in to the QuickSight console at https://quicksight.aws.amazon.com.
    3. Create a new S3 data set. When prompted, choose the manifest file that you created in step 1. For more information about creating S3 data sets, see Creating a Data Set Using Amazon S3 Files in the Amazon QuickSight User Guide.
    4. Create a new analysis. From here, you can start creating visualizations of your data. To learn more, see Creating an Analysis in the Amazon QuickSight User Guide.
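
If you manage several buckets or prefixes, you might prefer to generate the manifest file rather than edit it by hand. The following sketch produces the same structure as the manifest in step 1; `build_manifest` and the bucket name are hypothetical.

```python
import json

def build_manifest(bucket_name, prefix=""):
    """Build a QuickSight S3 manifest that points at a bucket (and optional prefix)."""
    return {
        "fileLocations": [
            {"URIPrefixes": ["s3://{}/{}".format(bucket_name, prefix)]}
        ],
        "globalUploadSettings": {"format": "JSON"},
    }

manifest = build_manifest("my-event-bucket")
print(json.dumps(manifest, indent=4))

# To save the file for upload to QuickSight:
#   with open("manifest.json", "w") as f:
#       json.dump(manifest, f, indent=4)
```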

Step 3: Set the refresh rate for the data set

You can configure your data sets in Amazon QuickSight to automatically refresh on a certain schedule. In this section, you configure the data set to refresh every day, one minute before midnight.

To set the refresh schedule

  1. Go to the QuickSight start page at https://quicksight.aws.amazon.com/sn/start. Choose Manage data.
  2. Choose the data set that you created in the previous section.
  3. Choose Schedule refresh. Follow the prompts to set up a daily refresh schedule.

Step 4: Create your visualizations

From this point, you can start creating visualizations of your data. To learn more about creating visualizations, see Creating an Analysis in the Amazon QuickSight User Guide.

Creating custom Pinpoint dashboards using Amazon QuickSight, part 1

Post Syndicated from Brent Meyer original https://aws.amazon.com/blogs/messaging-and-targeting/creating-custom-pinpoint-dashboards-using-amazon-quicksight-part-1/

Note: This post was written by Manan Nayar and Aprajita Arora, Software Development Engineers on the AWS Digital User Engagement team.


Amazon Pinpoint helps you create customer-centric engagement experiences across the mobile, web, and other messaging channels. It also provides a variety of Key Performance Indicators (KPIs) that you can use to track the performance of your messaging programs.

You can access these KPIs through the console, or by using the Amazon Pinpoint API. In some cases, you might want to create custom dashboards that aren’t included by default, or even combine these metrics with other data. Over the next few days, we’ll discuss several different methods that you can use to create your own custom dashboards.

In this post, you’ll learn how to use the Amazon Pinpoint API to retrieve metrics, and then display them in visualizations that you create in Amazon QuickSight. This option is ideal for creating custom dashboards that highlight a specific set of metrics, or for embedding these metrics in your existing application or website.

In the next post (which we’ll post on Monday, August 19), you’ll learn how to export raw event data to an S3 bucket, and use that data to create dashboards by using QuickSight’s Super-fast, Parallel, In-memory Calculation Engine (SPICE). This option enables you to perform in-depth analyses and quickly update visualizations. It’s also cost-effective, because all of the event data is stored in an S3 bucket.

The final post (which we’ll post on Wednesday, August 21) will also discuss the process of creating visualizations from event stream data. However, in this solution, the data will be sent from Amazon Kinesis to a Redshift cluster. This option is ideal if you need to process very large volumes of event data.

Creating a QuickSight dashboard that uses specific metrics

You can use the Amazon Pinpoint API to programmatically access many of the metrics that are shown on the Analytics pages of the Amazon Pinpoint console. You can learn more about using the API to obtain specific KPIs in our recent blog post, Tracking Campaign Performance Using the Metrics APIs.

The following sections show you how to parse and store those results in Amazon S3, and then create custom dashboards by using Amazon QuickSight. The steps below are meant to provide general guidance, rather than specific procedures. If you’ve used other AWS services in the past, most of the concepts here will be familiar. If not, don’t worry—we’ve included links to the documentation to make things easier.

Step 1: Package the Dependencies

Lambda currently uses a version of the AWS SDK that is a few versions behind the current version. However, the ability to retrieve Pinpoint metrics programmatically is a relatively new feature. For this reason, you have to download the latest version of the SDK libraries to your computer, create a .zip archive, and then upload that archive to Lambda.

To package the dependencies

    1. Paste the following code into a text editor:
      from datetime import datetime
      import boto3
      import json
      
      AWS_REGION = "<us-east-1>"
      PROJECT_ID = "<projectId>"
      BUCKET_NAME = "<bucketName>"
      BUCKET_PREFIX = "quicksight-data"
      DATE = datetime.now()
      
      # Get today's values for the specified KPI.
      def get_kpi(kpi_name):
      
          client = boto3.client('pinpoint',region_name=AWS_REGION)
      
          response = client.get_application_date_range_kpi(
              ApplicationId=PROJECT_ID,
              EndTime=DATE.strftime("%Y-%m-%d"),
              KpiName=kpi_name,
              StartTime=DATE.strftime("%Y-%m-%d")
          )
          rows = response['ApplicationDateRangeKpiResponse']['KpiResult']['Rows'][0]['Values']
      
          # Create a JSON object that contains the values we'll use to build QuickSight visualizations.
          data = construct_json_object(rows[0]['Key'], rows[0]['Value'])
      
          # Send the data to the S3 bucket.
          write_results_to_s3(kpi_name, json.dumps(data).encode('UTF-8'))
      
      # Create the JSON object that we'll send to S3.
      def construct_json_object(kpi_name, value):
          data = {
              "applicationId": PROJECT_ID,
              "kpiName": kpi_name,
              "date": str(DATE),
              "value": value
          }
      
          return data
      
      # Send the data to the designated S3 bucket.
      def write_results_to_s3(kpi_name, data):
          # Create a file path with folders for year, month, date, and hour.
          path = (
              BUCKET_PREFIX + "/"
              + DATE.strftime("%Y") + "/"
              + DATE.strftime("%m") + "/"
              + DATE.strftime("%d") + "/"
              + DATE.strftime("%H") + "/"
              + kpi_name
          )
      
          client = boto3.client('s3')
      
          # Send the data to the S3 bucket.
          response = client.put_object(
              Bucket=BUCKET_NAME,
              Key=path,
              Body=bytes(data)
          )
      
      def lambda_handler(event, context):
          get_kpi('email-open-rate')
          get_kpi('successful-delivery-rate')
          get_kpi('unique-deliveries')

      In the preceding code, make the following changes:

      • Replace <us-east-1> with the name of the AWS Region that you use Amazon Pinpoint in.
      • Replace <projectId> with the ID of the Amazon Pinpoint project that the metrics are associated with.
      • Replace <bucketName> with the name of the Amazon S3 bucket that you want to use to store the data. For more information about creating S3 buckets, see Create a Bucket in the Amazon S3 Getting Started Guide.
      • Optionally, modify the lambda_handler function so that it calls the get_kpi function for the specific metrics that you want to retrieve.

      When you finish, save the file as retrieve_pinpoint_kpis.py.

  1. Use pip to download the latest versions of the boto3 and botocore libraries. Add these libraries to a .zip file. Also add retrieve_pinpoint_kpis.py to the .zip file. You can learn more about all of these tasks in Updating a Function with Additional Dependencies With a Virtual Environment in the AWS Lambda Developer Guide.
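To make the folder layout created by write_results_to_s3 easier to picture, here is a minimal sketch that rebuilds only the object key. It assumes BUCKET_PREFIX is set to quicksight-data (matching the prefix used in the QuickSight manifest later in this post) and that DATE is a datetime value; build_s3_key is a hypothetical helper name, not part of the original script.

```python
from datetime import datetime

# Assumed value for illustration only; the real constant is defined
# at the top of retrieve_pinpoint_kpis.py.
BUCKET_PREFIX = "quicksight-data"


def build_s3_key(kpi_name: str, date: datetime, prefix: str = BUCKET_PREFIX) -> str:
    """Return the key <prefix>/<year>/<month>/<day>/<hour>/<kpi_name>."""
    return "/".join([
        prefix,
        date.strftime("%Y"),
        date.strftime("%m"),
        date.strftime("%d"),
        date.strftime("%H"),
        kpi_name,
    ])
```

Because the key includes the hour, a function that runs once per day writes one object per KPI under that day's folder.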

Step 2: Set up the Lambda function

In this section, you upload the package that you created in the previous section to Lambda.

To set up the Lambda function

  1. In the Lambda console, create a new function from scratch. Choose the Python 3.7 runtime.
  2. Choose a Lambda execution role that contains the following permissions:
    • Allows the action mobiletargeting:GetApplicationDateRangeKpi for the resource arn:aws:mobiletargeting:<awsRegion>:<yourAwsAccountId>:apps/*/kpis/*/*, where <awsRegion> is the Region where you use Amazon Pinpoint, and <yourAwsAccountId> is your AWS account number.
    • Allows the action s3:PutObject for the resource arn:aws:s3:::<my_bucket>/*, where <my_bucket> is the name of the S3 bucket where you want to store the metrics.
  3. Upload the .zip file that you created in the previous section.
  4. Change the Handler value to retrieve_pinpoint_kpis.lambda_handler.
  5. Save your changes.
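As a rough illustration of the two permissions listed in step 2, the following sketch assembles them into an IAM policy document. The function name and the sample Region, account ID, and bucket name are placeholders chosen for this example; only the two actions and resource patterns come from the steps above.

```python
import json


def build_execution_policy(region: str, account_id: str, bucket: str) -> dict:
    """Build a policy document with the two permissions described in step 2."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                # Lets the function read KPI values from Amazon Pinpoint.
                "Effect": "Allow",
                "Action": "mobiletargeting:GetApplicationDateRangeKpi",
                "Resource": f"arn:aws:mobiletargeting:{region}:{account_id}:apps/*/kpis/*/*",
            },
            {
                # Lets the function write result objects to the bucket.
                "Effect": "Allow",
                "Action": "s3:PutObject",
                "Resource": f"arn:aws:s3:::{bucket}/*",
            },
        ],
    }


# Placeholder values; substitute your own Region, account ID, and bucket.
policy_json = json.dumps(
    build_execution_policy("us-east-1", "123456789012", "my_bucket"), indent=2
)
```

You could paste the resulting JSON into the policy editor of the role that you attach to the function.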

Step 3: Schedule the execution of the function

At this point, the Lambda function is ready to run. The next step is to set up the trigger that will cause it to run. In this case, since we’re retrieving an entire day’s worth of data, we’ll set up a scheduled trigger that runs every day at 11:59 PM.

To set up the trigger

  1. In the Lambda console, in the Designer section, choose Add trigger.
  2. Create a new CloudWatch Events rule that uses the Schedule expression rule type.
  3. For the schedule expression, enter cron(59 23 ? * * *).
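The schedule expression in step 3 has six space-separated fields: minutes, hours, day of month, month, day of week, and year. CloudWatch Events evaluates these expressions in UTC, so the rule fires at 11:59 PM UTC. The short sketch below only labels the fields to make the expression easier to read; parse_schedule_expression is a hypothetical helper, not an AWS API.

```python
import re

# Field order used by CloudWatch Events cron expressions.
FIELD_NAMES = ("minutes", "hours", "day_of_month", "month", "day_of_week", "year")


def parse_schedule_expression(expression: str) -> dict:
    """Label each field of a cron(...) schedule expression."""
    match = re.fullmatch(r"cron\((.+)\)", expression.strip())
    if match is None:
        raise ValueError(f"not a cron schedule expression: {expression!r}")
    fields = match.group(1).split()
    if len(fields) != len(FIELD_NAMES):
        raise ValueError(f"expected {len(FIELD_NAMES)} fields, got {len(fields)}")
    return dict(zip(FIELD_NAMES, fields))


schedule = parse_schedule_expression("cron(59 23 ? * * *)")
```

The question mark in the day-of-month field means "no specific value", which is required there because the day-of-week field is already set to a wildcard.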

Step 4: Create QuickSight Analyses

Once the data is populated in S3, you can start creating analyses in Amazon QuickSight. The process of creating new analyses involves a couple of tasks: creating a new data set, and creating your visualizations.

To create analyses in QuickSight
1.    In a text editor, create a new file. Paste the following code:

{
    "fileLocations": [
        {
            "URIPrefixes": [ 
                "s3://<bucketName>/quicksight-data/"          
            ]
        }
    ],
    "globalUploadSettings": {
        "format": "JSON"
    }
}

In the preceding code, replace <bucketName> with the name of the S3 bucket that you’re using to store the metrics data. Save the file as manifest.json.
2.    Sign in to the QuickSight console at https://quicksight.aws.amazon.com.
3.    Create a new S3 data set. When prompted, choose the manifest file that you created in step 1. For more information about creating S3 data sets, see Creating a Data Set Using Amazon S3 Files in the Amazon QuickSight User Guide.
4.    Create a new analysis. From here, you can start creating visualizations of your data. To learn more, see Creating an Analysis in the Amazon QuickSight User Guide.

Simple Two-way Messaging using the Amazon SQS Temporary Queue Client

Post Syndicated from Rachel Richardson original https://aws.amazon.com/blogs/compute/simple-two-way-messaging-using-the-amazon-sqs-temporary-queue-client/

This post is contributed by Robin Salkeld, Sr. Software Development Engineer

Amazon SQS is a fully managed message queuing service that makes it easy to decouple and scale microservices, distributed systems, and serverless applications. Asynchronous workflows have always been the primary use case for SQS. Using queues ensures one component can keep running smoothly without losing data when another component is unavailable or slow.

We were surprised, then, to discover that many customers use SQS in synchronous workflows. For example, many applications use queues to communicate between frontends and backends when processing a login request from a user.

Why would anyone use SQS for this? The service stores messages for up to 14 days with high durability, but messages in a synchronous workflow often must be processed within a few minutes, or even seconds. Why not just set up an HTTPS endpoint?

The more we talked to customers, the more we understood. Here’s what we learned:

  • Creating a queue is often easier and faster than creating an HTTPS endpoint and the infrastructure necessary to ensure the endpoint’s scalability.
  • Queues are safe by default because they are locked down to the AWS account that created them. In addition, any DDoS attempt on your service is absorbed by SQS instead of loading down your own servers.
  • There is generally no need to create firewall rules for the communication between microservices if they use queues.
  • Although SQS provides durable storage (which isn’t necessary for short-lived messages), it is still a cost-effective solution for this use case. This is especially true when you consider all the messaging broker maintenance that is done for you.

However, setting up efficient two-way communication through one-way queues requires some non-trivial client-side code. In our previous two-part post series on implementing enterprise integration patterns with AWS messaging services, Point-to-point channels and Publish-subscribe channels, we discussed the Request-Response Messaging Pattern. In this pattern, each requester creates a temporary destination to receive each response message.

The simplest approach is to create a new queue for each response, but this is like building a road just so a single car can drive on it before tearing it down. Technically, this can work (and SQS can create and delete queues quickly), but we can definitely make it faster and cheaper.

To better support short-lived, lightweight messaging destinations, we are pleased to present the Amazon SQS Temporary Queue Client. This client makes it easy to create and delete many temporary messaging destinations without inflating your AWS bill.

Virtual queues

The key concept behind the client is the virtual queue. Virtual queues let you multiplex many low-traffic queues onto a single SQS queue. Creating a virtual queue only instantiates a local buffer to hold messages for consumers as they arrive; there is no API call to SQS and no costs associated with creating a virtual queue.

The Temporary Queue Client includes the AmazonSQSVirtualQueuesClient class for creating and managing virtual queues. This class implements the AmazonSQS interface and adds support for attributes related to virtual queues. You can create a virtual queue using this client by calling the CreateQueue API action and including the HostQueueURL queue attribute. This attribute specifies the existing SQS queue on which to host the virtual queue. The queue URL for a virtual queue is in the form <host queue URL>#<virtual queue name>. For example:

https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue#MyVirtualQueueName

When you call the SendMessage or SendMessageBatch API actions on AmazonSQSVirtualQueuesClient with a virtual queue URL, the client first extracts the virtual queue name. It then attaches this name as an additional message attribute to each message, and sends the messages to the host queue. When you call the ReceiveMessage API action on a virtual queue, the calling thread waits for messages to appear in the in-memory buffer for the virtual queue. Meanwhile, a background thread polls the host queue and dispatches messages to these buffers according to the additional message attribute.
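The send and receive flow described above can be sketched with an in-memory simulation. This is not the Temporary Queue Client's implementation: the class, the method names, and the VirtualQueueName attribute are all assumptions made for illustration, a deque stands in for the host SQS queue, and receive_message returns None instead of blocking the calling thread.

```python
from collections import defaultdict, deque

# Hypothetical attribute name, chosen for this sketch only.
VIRTUAL_QUEUE_ATTRIBUTE = "VirtualQueueName"


def split_virtual_queue_url(url: str) -> tuple:
    """Split '<host queue URL>#<virtual queue name>' into its two parts."""
    host_url, _, virtual_name = url.partition("#")
    if not virtual_name:
        raise ValueError(f"not a virtual queue URL: {url!r}")
    return host_url, virtual_name


class VirtualQueueMultiplexer:
    """One deque plays the host queue; each virtual queue gets a local buffer."""

    def __init__(self):
        self.host_queue = deque()
        self.buffers = defaultdict(deque)

    def send_message(self, virtual_queue_url: str, body: str) -> None:
        # Tag the message with the virtual queue name, then send it to the host queue.
        _, virtual_name = split_virtual_queue_url(virtual_queue_url)
        self.host_queue.append(
            {"Body": body, "Attributes": {VIRTUAL_QUEUE_ATTRIBUTE: virtual_name}}
        )

    def poll_host_queue(self, max_messages: int = 10) -> int:
        # Plays the background thread: one poll can feed several virtual queues.
        dispatched = 0
        while self.host_queue and dispatched < max_messages:
            message = self.host_queue.popleft()
            name = message["Attributes"][VIRTUAL_QUEUE_ATTRIBUTE]
            self.buffers[name].append(message["Body"])
            dispatched += 1
        return dispatched

    def receive_message(self, virtual_queue_url: str):
        # Reads only from the local buffer; no call to the host queue is made.
        _, virtual_name = split_virtual_queue_url(virtual_queue_url)
        buffer = self.buffers[virtual_name]
        return buffer.popleft() if buffer else None
```

In this sketch, two messages addressed to different virtual queues are both retrieved by a single poll_host_queue call, which is the source of the savings in API calls.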

This mechanism is similar to how the AmazonSQSBufferedAsyncClient prefetches messages, and the benefits are similar. A single call to SQS can provide messages for up to 10 virtual queues, reducing the API calls that you pay for by up to a factor of ten. Deleting a virtual queue simply removes the client-side resources used to implement it, again without making API calls to SQS.

The diagram below illustrates the end-to-end process for sending messages through virtual queues:

Sending messages through virtual queues

Virtual queues are similar to virtual machines. Just as a virtual machine provides the same experience as a physical machine, a virtual queue divides the resources of a single SQS queue into smaller logical queues. This is ideal for temporary queues, since they frequently only receive a handful of messages in their lifetime. Virtual queues are currently implemented entirely within the Temporary Queue Client, but additional support and optimizations might be added to SQS itself in the future.

In most cases, you don’t have to manage virtual queues yourself. The library also includes the AmazonSQSTemporaryQueuesClient class. This class automatically creates virtual queues when the CreateQueue API action is called and creates host queues on demand for all queues with the same queue attributes. To optimize existing application code that creates and deletes queues, you can use this class as a drop-in replacement implementation of the AmazonSQS interface.

The client also includes the AmazonSQSRequester and AmazonSQSResponder interfaces, which enable two-way communication through SQS queues. The following is an example of an RPC implementation for a web application’s login process.

/**
 * This class handles a user's login request on the client side.
 */
public class LoginClient {

    // The SQS queue to send the requests to.
    private final String requestQueueUrl;

    // The AmazonSQSRequester creates a temporary queue for each response.
    private final AmazonSQSRequester sqsRequester = AmazonSQSRequesterClientBuilder.defaultClient();

    public LoginClient(String requestQueueUrl) {
        this.requestQueueUrl = requestQueueUrl;
    }

    /**
     * Send a login request to the server.
     */
    public String login(String body) throws TimeoutException {
        SendMessageRequest request = new SendMessageRequest()
                .withMessageBody(body)
                .withQueueUrl(requestQueueUrl);

        // This:
        //  - creates a temporary queue
        //  - attaches its URL as an attribute on the message
        //  - sends the message
        //  - receives the response from the temporary queue
        //  - deletes the temporary queue
        //  - returns the response
        //
        // If something goes wrong and the server's response never shows up, this method throws a
        // TimeoutException.
        Message response = sqsRequester.sendMessageAndGetResponse(request, 20, TimeUnit.SECONDS);
        
        return response.getBody();
    }
}

/**
 * This class processes users' login requests on the server side.
 */
public class LoginServer {

    // The SQS queue to poll for login requests.
    // Assume that on construction a thread is created to poll this queue and call
    // handleLoginRequest() below for each message.
    private final String requestQueueUrl;

    // The AmazonSQSResponder sends responses to the correct response destination.
    private final AmazonSQSResponder sqsResponder = AmazonSQSResponderClientBuilder.defaultClient();

    public LoginServer(String requestQueueUrl) {
        this.requestQueueUrl = requestQueueUrl;
    }

    /**
     * Handle a login request sent from the client above.
     */
    public void handleLoginRequest(Message message) {
        // Assume doLogin does the actual work, and returns a serialized result
        String response = doLogin(message.getBody());

        // This extracts the URL of the temporary queue from the message attribute and sends the
        // response to that queue.
        sqsResponder.sendResponseMessage(MessageContent.fromMessage(message), new MessageContent(response));  
    }
}

Cleaning up

As with any other AWS SDK client, your code should call the shutdown() method when the temporary queue client is no longer needed. The AmazonSQSRequester interface also provides a shutdown() method, which shuts down its internal temporary queue client. This ensures that the in-memory resources needed for any virtual queues are reclaimed, and that the host queue that the client automatically created is also deleted automatically.

However, in the world of distributed systems things are a little more complex. Processes can run out of memory and crash, and hosts can reboot suddenly and unexpectedly. There are even cases where you don’t have the opportunity to run custom code on shutdown.

The Temporary Queue Client addresses this issue as well. For each host queue with recent API calls, the client periodically uses the TagQueue API action to attach a fresh tag value that indicates the queue is still being used. The tagging process serves as a heartbeat to keep the queue alive. At a configurable interval (by default, every 5 minutes), a background thread uses the ListQueues API action to obtain the URLs of all queues with the configured prefix. Then, it deletes each queue that has not been tagged recently. The mechanism is similar to how the Amazon DynamoDB Lock Client expires stale lock leases.
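The heartbeat-and-sweep idea can be sketched in a few lines. The following is an in-memory stand-in, not the client's code: a dictionary replaces the tags stored on each queue, QueueRegistry and its methods are hypothetical names, and the 300-second retention value is an assumption chosen for the example.

```python
import time

# Assumed idle threshold for this sketch, in seconds.
IDLE_RETENTION_SECONDS = 300


class QueueRegistry:
    """In-memory stand-in for the heartbeat tags stored on each host queue."""

    def __init__(self):
        self.last_heartbeat = {}  # queue URL -> timestamp of the latest heartbeat

    def heartbeat(self, queue_url, now=None):
        # Equivalent in spirit to attaching a fresh tag value to the queue.
        self.last_heartbeat[queue_url] = time.time() if now is None else now

    def sweep(self, prefix, now=None, retention=IDLE_RETENTION_SECONDS):
        # List queues with the prefix and delete those without a recent heartbeat.
        now = time.time() if now is None else now
        deleted = []
        for url, stamp in list(self.last_heartbeat.items()):
            name = url.rsplit("/", 1)[-1]
            if name.startswith(prefix) and now - stamp > retention:
                del self.last_heartbeat[url]
                deleted.append(url)
        return deleted
```

A queue whose owner crashed simply stops receiving heartbeats, so a later sweep removes it without any shutdown code having run.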

If you use the AmazonSQSTemporaryQueuesClient directly, you can customize how long queues must be idle before they are deleted by configuring the IdleQueueRetentionPeriodSeconds queue attribute. The client supports setting this attribute on both host queues and virtual queues. For virtual queues, setting this attribute ensures that the in-memory resources do not become a memory leak in the presence of application bugs.

Any API call to a queue marks it as non-idle, including ReceiveMessage calls that don’t return any messages. The only reason to increase the retention period attribute is to give the client more time when it can’t send heartbeats—for example, due to garbage collection pauses or networking issues.

But what if you want to use this client in a fleet of a thousand EC2 instances? Won’t every client spend a lot of time checking every queue for idleness? Doesn’t that imply duplicate work that increases as the fleet is scaled up?

We thought of this too. The Temporary Queue Client creates a shared queue for all clients using the same queue prefix, and uses this queue as a work queue for the distributed task. Instead of every client calling the ListQueues API action every five minutes, a new seed message (which triggers the sweeping process) is sent to this queue every five minutes.

When one of the clients receives this message, it calls the ListQueues API action and sends each queue URL in the result as another kind of message to the same shared work queue. The work of actually checking each queue for idleness is distributed roughly evenly to the active clients, ensuring scalability. There is even a mechanism that works around the fact that the ListQueues API action currently returns no more than 1,000 queue URLs at a time.
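To show how a single seed message fans out into per-queue checks, here is a small simulation. It is a sketch under simplifying assumptions: the shared work queue is a local deque, round-robin assignment stands in for clients competing to receive messages, and run_sweep, is_idle, and the message tuples are hypothetical names rather than the client's actual message format.

```python
from collections import deque


def run_sweep(all_queue_urls, is_idle, worker_names):
    """Simulate one sweep: a seed message fans out into one check task per queue."""
    work_queue = deque([("seed", None)])
    deleted = []
    checks_per_worker = {name: 0 for name in worker_names}
    turn = 0
    while work_queue:
        # Round-robin stands in for whichever client happens to receive the message.
        worker = worker_names[turn % len(worker_names)]
        turn += 1
        kind, queue_url = work_queue.popleft()
        if kind == "seed":
            # The receiving client lists the queues and enqueues one task per URL.
            work_queue.extend(("check", url) for url in all_queue_urls)
        else:
            checks_per_worker[worker] += 1
            if is_idle(queue_url):
                deleted.append(queue_url)
    return deleted, checks_per_worker
```

Only one client lists the queues per sweep, and the per-queue checks are spread across whichever clients are active, so adding clients does not multiply the work.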

Summary

We are excited about how the new Amazon SQS Temporary Queue Client makes more messaging patterns easier and cheaper for you. Download the code from GitHub, have a look at Temporary Queues in the Amazon SQS Developer Guide, try out the client, and let us know what you think!

Amazon Managed Streaming for Apache Kafka (MSK) – Now Generally Available

Post Syndicated from Danilo Poccia original https://aws.amazon.com/blogs/aws/amazon-managed-streaming-for-apache-kafka-msk-now-generally-available/

I am always amazed at how our customers are using streaming data. For example, Thomson Reuters, one of the world’s most trusted news organizations for businesses and professionals, built a solution to capture, analyze, and visualize analytics data to help product teams continuously improve the user experience. Supercell, the social game company providing games such as Hay Day, Clash of Clans, and Boom Beach, is delivering in-game data in real-time, handling 45 billion events per day.

Since we launched Amazon Kinesis at re:Invent 2013, we have continually expanded the ways in which customers work with streaming data on AWS. Some of the available tools are:

  • Kinesis Data Streams, to capture, store, and process data streams with your own applications.
  • Kinesis Data Firehose, to transform and collect data into destinations such as Amazon S3, Amazon Elasticsearch Service, and Amazon Redshift.
  • Kinesis Data Analytics, to continuously analyze data using SQL or Java (via Apache Flink applications), for example to detect anomalies or for time series aggregation.
  • Kinesis Video Streams, to simplify processing of media streams.

At re:Invent 2018, we introduced in open preview Amazon Managed Streaming for Apache Kafka (MSK), a fully managed service that makes it easy to build and run applications that use Apache Kafka to process streaming data.

I am excited to announce that Amazon MSK is generally available today!

How it works

Apache Kafka (Kafka) is an open-source platform that enables customers to capture streaming data like click stream events, transactions, IoT events, application and machine logs, and have applications that perform real-time analytics, run continuous transformations, and distribute this data to data lakes and databases in real time. You can use Kafka as a streaming data store to decouple applications producing streaming data (producers) from those consuming streaming data (consumers).

While Kafka is a popular enterprise data streaming and messaging framework, it can be difficult to set up, scale, and manage in production. Amazon MSK takes care of these management tasks and makes it easy to set up, configure, and run Kafka, along with Apache ZooKeeper, in an environment following best practices for high availability and security.

Your MSK clusters always run within an Amazon VPC managed by the MSK service. Your MSK resources are made available to your own VPC, subnet, and security group through elastic network interfaces (ENIs) which will appear in your account, as described in the following architectural diagram:

Customers can create a cluster in minutes, use AWS Identity and Access Management (IAM) to control cluster actions, authorize clients using TLS private certificate authorities fully managed by AWS Certificate Manager (ACM), encrypt data in-transit using TLS, and encrypt data at rest using AWS Key Management Service (KMS) encryption keys.

Amazon MSK continuously monitors server health and automatically replaces servers when they fail, automates server patching, and operates highly available ZooKeeper nodes as a part of the service at no additional cost. Key Kafka performance metrics are published in the console and in Amazon CloudWatch. Amazon MSK is fully compatible with Kafka versions 1.1.1 and 2.1.0, so that you can continue to run your applications, use Kafka’s admin tools, and use Kafka-compatible tools and frameworks without having to change your code.

Based on customer feedback during the open preview, Amazon MSK added many features, such as:

  • Encryption in-transit via TLS between clients and brokers, and between brokers
  • Mutual TLS authentication using ACM private certificate authorities
  • Support for Kafka version 2.1.0
  • 99.9% availability SLA
  • HIPAA eligible
  • Cluster-wide storage scale up
  • Integration with AWS CloudTrail for MSK API logging
  • Cluster tagging and tag-based IAM policy application
  • Defining custom, cluster-wide configurations for topics and brokers

AWS CloudFormation support is coming in the next few weeks.

Creating a cluster

Let’s create a cluster using the AWS Management Console. I give the cluster a name, then select the Kafka version and the VPC I want to use the cluster from.

I then choose the Availability Zones (AZs) and the corresponding subnets to use in the VPC. In the next step, I select how many Kafka brokers to deploy in each AZ. More brokers allow you to scale the throughput of a cluster by allocating partitions to different brokers.

I can add tags to search and filter my resources, apply IAM policies to the Amazon MSK API, and track my costs. For storage, I leave the default storage volume size per broker.

I choose to use encryption within the cluster and to allow both TLS and plaintext traffic between clients and brokers. For data at rest, I use the AWS-managed customer master key (CMK), but you can select a CMK in your account, using KMS, to have further control. You can use private TLS certificates to authenticate the identity of clients that connect to your cluster. This feature uses private certificate authorities (CAs) from ACM. For now, I leave this option unchecked.

In the advanced settings, I leave the default values. For example, I could have chosen a different instance type for my brokers here. Some of these settings can be updated using the AWS CLI.

I create the cluster and monitor the status from the cluster summary, including the Amazon Resource Name (ARN) that I can use when interacting via CLI or SDKs.

When the status is active, the client information section provides specific details to connect to the cluster, such as:

  • The bootstrap servers I can use with Kafka tools to connect to the cluster.
  • The ZooKeeper connect list of hosts and ports.

I can get similar information using the AWS CLI:

  • aws kafka list-clusters to see the ARNs of your clusters in a specific region
  • aws kafka get-bootstrap-brokers --cluster-arn <ClusterArn> to get the Kafka bootstrap servers
  • aws kafka describe-cluster --cluster-arn <ClusterArn> to see more details on the cluster, including the Zookeeper connect string
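If you prefer the AWS SDK for Python, the same three calls are available on the boto3 kafka client as list_clusters, get_bootstrap_brokers, and describe_cluster. The sketch below wraps them in a hypothetical helper named describe_msk_connection_info. It takes the client as a parameter, reads only the first page of list_clusters results, and assumes your credentials and Region are already configured.

```python
def describe_msk_connection_info(kafka_client) -> list:
    """Collect the bootstrap brokers and ZooKeeper connect string for each cluster.

    kafka_client is expected to behave like boto3.client("kafka").
    Pagination (NextToken) is not handled in this sketch.
    """
    results = []
    for cluster in kafka_client.list_clusters()["ClusterInfoList"]:
        arn = cluster["ClusterArn"]
        brokers = kafka_client.get_bootstrap_brokers(ClusterArn=arn)
        details = kafka_client.describe_cluster(ClusterArn=arn)["ClusterInfo"]
        results.append({
            "ClusterArn": arn,
            "BootstrapBrokerString": brokers.get("BootstrapBrokerString"),
            "ZookeeperConnectString": details.get("ZookeeperConnectString"),
        })
    return results
```

With boto3 installed, you would call it as describe_msk_connection_info(boto3.client("kafka")). Passing the client in also makes the helper easy to exercise with a stub.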

Quick demo of using Kafka

To start using Kafka, I create two EC2 instances in the same VPC: one will be a producer and the other a consumer. To set them up as client machines, I download and extract the Kafka tools from the Apache website or any mirror. Kafka requires Java 8 to run, so I install Amazon Corretto 8.

On the producer instance, in the Kafka directory, I create a topic to send data from the producer to the consumer:

bin/kafka-topics.sh --create --zookeeper <ZookeeperConnectString> \
--replication-factor 3 --partitions 1 --topic MyTopic

Then I start a console-based producer:

bin/kafka-console-producer.sh --broker-list <BootstrapBrokerString> \
--topic MyTopic

On the consumer instance, in the Kafka directory, I start a console-based consumer:

bin/kafka-console-consumer.sh --bootstrap-server <BootstrapBrokerString> \
--topic MyTopic --from-beginning

Here’s a recording of a quick demo where I create the topic and then send messages from a producer (top terminal) to a consumer of that topic (bottom terminal):

Pricing and availability

Pricing is per Kafka broker-hour and per provisioned storage-hour. There is no cost for the ZooKeeper nodes used by your clusters. AWS data transfer rates apply for data transfer in and out of MSK. You will not be charged for data transfer within the cluster in a region, including data transfer between brokers and data transfer between brokers and ZooKeeper nodes.

You can migrate your existing Kafka cluster to MSK using tools like MirrorMaker (which comes with open source Kafka) to replicate data from your clusters into an MSK cluster.

Upstream compatibility is a core tenet of Amazon MSK. Our code changes to the Kafka platform are released back to open source.

Amazon MSK is available in US East (N. Virginia), US East (Ohio), US West (Oregon), Asia Pacific (Tokyo), Asia Pacific (Singapore), Asia Pacific (Sydney), EU (Frankfurt), EU (Ireland), EU (Paris), and EU (London).

I look forward to seeing how you are going to use Amazon MSK to simplify building and migrating streaming applications to the cloud!

New Regions, New Features, and a New Web Site

Post Syndicated from Brent Meyer original https://aws.amazon.com/blogs/messaging-and-targeting/new-regions-new-features-and-a-new-web-site/

It’s a busy time here on the Digital User Engagement Team at AWS!

Last week, we made Amazon Pinpoint available in the Asia Pacific (Mumbai) and Asia Pacific (Sydney) AWS Regions. This is great news for new Pinpoint customers in these areas of the globe who were previously concerned with issues related to latency and data residency. Existing Amazon Pinpoint customers can also use these new Regions to increase availability and create geographical redundancy.

On Tuesday of this week, we also launched two exciting improvements to the Amazon Pinpoint console. The first improvement is a tool that you can use to import customer segments in just a few clicks. Previously, if you wanted to import customer data into Pinpoint, you had to save the data in a CSV or JSON file, upload it to an S3 bucket, create a segment in Pinpoint, and enter the full path to the S3 bucket. Now, you can drag and drop files right into the segment importer. To learn more, see the Pinpoint User Guide.

The other new feature that we released this week is an improved email editor. Our previous email editor only allowed you to include a limited set of HTML tags in your emails. With our new editor, however, you can include any HTML tags that you want. The new editor also includes a helpful side-by-side view that renders your message in real-time, as shown in the following image.

Users who don’t want to work with HTML code can also use the Design view to create and modify emails in an intuitive, WYSIWYG interface. For more information, see the Pinpoint User Guide.

Finally, we launched a new website for Amazon Pinpoint at https://aws.amazon.com/pinpoint. On our new site, you can learn more about the capabilities of Amazon Pinpoint. You’ll find in-depth information about all of the features, channels, and use cases that Amazon Pinpoint supports.

Every day, we’re amazed by the things that our customers do with Amazon Pinpoint. We hope these changes help you do even more incredible things!

Learn About Amazon Pinpoint at Upcoming Events Around the World

Post Syndicated from Hannah Nilsson original https://aws.amazon.com/blogs/messaging-and-targeting/learn-about-amazon-pinpoint-at-upcoming-events/

Connect with the AWS Customer Engagement team at events around the world to learn how our technology can help you better engage with your customers. Get demos on recent feature releases, discover how you can use Pinpoint for your specific use case, and attend informative sessions to hear how companies around the world are using AWS Customer Engagement solutions to deliver better experiences for their customers. Plus, read below to find out how Amazon Pinpoint and Amazon SES both enable you to create innovative email experiences with the recent AMP Project launch.

AWS Customer Engagement in the news: Amazon SES and Amazon Pinpoint help build the future of email with AMP

The AMP Project’s mission is to enable more user-first experiences on the web, including web-based technology like email. On March 26, the AMP Project announced that they are bringing AMP technology to email in order to give users an interactive, real-time experience that also keeps inboxes safe.

Amazon Pinpoint and Amazon SES both provide out-of-the-box support for AMP for email with no additional configuration. This allows you to easily create experiences for your customers such as submitting RSVPs to events, filling out questionnaires, browsing catalogs, or responding to comments right within the email.

Read the AMP announcement for more information about these new capabilities. To learn how to use the AMP format with Amazon SES, visit the SES Developer Guide. To learn how to use the AMP format with Amazon Pinpoint, read this Amazon Pinpoint API Reference. View these instructions for more information on how to add AMP to an existing email.

Amazon Pinpoint has been busy building. You can now:

  • Learn how to set up an email preference management web page that enables customers to manage their email subscription preferences. Read now.
  • Learn how to set up a web form that collects information from new customers, and then sends them an SMS message to confirm that they want to receive content from you. Read now.
  • Use Amazon Pinpoint in the US West (Oregon), EU (Frankfurt), and EU (Ireland) regions in addition to the US East (Virginia) region. Learn more.
  • Deliver voice messages to your users with Amazon Pinpoint Voice. Learn more.
  • Set up campaigns that auto-send messages to your customers when they take specific actions. Learn more.
  • Detect and understand issues impacting your email deliverability with the Amazon Pinpoint Deliverability Dashboard. Learn more. 

Meet an Amazon Pinpoint expert at these upcoming events. We will teach you how to take advantage of recent updates so that you can create better engagement experiences for your customers. Plus, we can give you an inside look at what’s on our roadmap, and we’ll be giving out custom Pinpoint swag!

AWS Summit, Singapore 

April 10, 2019
Singapore Expo Convention & Exhibition Centre
Amazon Pinpoint will host an informative session about our Customer Engagement solutions at the AWS Singapore Summit. In this session, we will describe how AWS enables companies to better understand and engage their customers with personalized, timely, and relevant communications on multiple channels. You will also learn how Disney Streaming Services is using Amazon Pinpoint to engage their users.
Register for the Summit here.

“Mobile Days” at the AWS San Francisco Loft   

April 24, 2019 
AWS San Francisco Loft
Join us for an engaging day of discussion and education. Amazon Pinpoint experts will host the following sessions:

  • 2:30pm – 3:30pm: How Do You Measure Customer Success? Featuring Amazon Pinpoint. 
  • 3:30pm – 4:30pm: Using ML to Create Enhance Your Marketing. Featuring Amazon Pinpoint and Amazon Personalize. 

Space for this event is limited, so please reserve your seat here.

AWS Summit, Sydney

May 1-2, 2019
International Convention Centre (ICC), Darling Harbour, Sydney
Don’t miss the customer engagement session on April 30th. This session, part of Amazon’s Innovation Day event, features a keynote address by Neil Lindsay, Vice President of Global Marketing at Amazon. The session explores how AWS technologies power organizations that deliver customer-centric innovations. Learn about how Australia’s largest brands and digital agencies use AWS technologies to engage customers, build new business models, and transform customer experiences.
Register for the Summit here

AWS Summit, Mumbai

May 15, 2019
Bombay Exhibition Center, Mumbai
The Amazon Pinpoint team will be at the “Ask an Expert” booth. Stop by to meet the team, ask questions, and pick up Amazon Pinpoint swag!
Register for the summit here

Enriching Event-Driven Architectures with AWS Event Fork Pipelines

Post Syndicated from Rachel Richardson original https://aws.amazon.com/blogs/compute/enriching-event-driven-architectures-with-aws-event-fork-pipelines/

This post is courtesy of Otavio Ferreira, Mgr, Amazon SNS, and James Hood, Sr. Software Dev Engineer

Many customers are choosing to build event-driven applications in which subscriber services automatically perform work in response to events triggered by publisher services. This architectural pattern can make services more reusable, interoperable, and scalable.

Customers often fork event processing into pipelines that address common event handling requirements, such as event storage, backup, search, analytics, or replay. To help you build event-driven applications even faster, AWS introduces Event Fork Pipelines, a collection of open-source event handling pipelines that you can subscribe to Amazon SNS topics in your AWS account.

Event Fork Pipelines is a suite of open-source nested applications, based on the AWS Serverless Application Model (AWS SAM). You can deploy it directly from the AWS Serverless Application Repository into your AWS account.

Event Fork Pipelines is built on top of serverless services, including Amazon SNS, Amazon SQS, and AWS Lambda. These services provide serverless building blocks that help you build fully managed, highly available, and scalable event-driven platforms. Lambda enables you to build event-driven microservices as serverless functions. SNS and SQS provide serverless topics and queues for integrating these microservices and other distributed systems in your architecture. These building blocks are at the core of the modern application development best practices.

Surfacing the event fork pattern

At AWS, we’ve worked closely with customers across market segments and geographies on event-driven architectures. For example:

  • Financial platforms that handle events related to bank transactions and stock ticks
  • Retail platforms that trigger checkout and fulfillment events

At scale, event-driven architectures often require a set of supporting services to address common requirements such as system auditability, data discoverability, compliance, business insights, and disaster recovery. Translated to AWS, customers often connect event-driven applications to services such as Amazon S3 for event storage and backup, and to Amazon Elasticsearch Service for event search and analytics. Also, customers often implement an event replay mechanism to recover from failure modes in their applications.

AWS created Event Fork Pipelines to encapsulate these common requirements, reducing the amount of effort required for you to connect your event-driven architectures to these supporting AWS services.

AWS then started sharing this pattern more broadly, so more customers could benefit. At the 2018 AWS re:Invent conference in Las Vegas, Amazon CTO Werner Vogels announced the launch of nested applications in his keynote. Werner shared the Event Fork Pipelines pattern with the audience as an example of common application logic that had been encapsulated as a set of nested applications.

The following reference architecture diagram shows an application supplemented by three nested applications:

Each pipeline is subscribed to the same SNS topic, and can process events in parallel as these events are published to the topic. Each pipeline is independent and can set its own subscription filter policy. That way, it processes only the subset of events that it’s interested in, rather than all events published to the topic.

Amazon SNS Fork pipelines reference architecture

Figure 1 – Reference architecture using Event Fork Pipelines

The three event fork pipelines are placed alongside your regular event processing pipelines, which are potentially already subscribed to your SNS topic. Therefore, you don’t have to change any portion of your current message publisher to take advantage of Event Fork Pipelines in your existing workloads. The following sections describe these pipelines and how to deploy them in your system architecture.

Understanding the catalog of event fork pipelines

In the abstract, Event Fork Pipelines is a serverless design pattern. Concretely, Event Fork Pipelines is also a suite of nested serverless applications, based on AWS SAM. You deploy the nested applications directly from the AWS Serverless Application Repository to your AWS account, to enrich your event-driven platforms. You can deploy them individually in your architecture, as needed.

Here’s more information about each nested application in the Event Fork Pipelines suite.

Event Storage & Backup pipeline

Event Fork Pipeline for Event Storage & Backup

Figure 2 – Event Fork Pipeline for Event Storage & Backup

The preceding diagram shows the Event Storage & Backup pipeline. You can subscribe this pipeline to your SNS topic to automatically back up the events flowing through your system. This pipeline is composed of the following resources:

  • An SQS queue that buffers the events delivered by the SNS topic
  • A Lambda function that automatically polls for these events in the queue and pushes them into an Amazon Kinesis Data Firehose delivery stream
  • An S3 bucket that durably backs up the events loaded by the stream

You can configure this pipeline to fine-tune the behavior of your delivery stream. For example, you can configure your pipeline so that the underlying delivery stream buffers, transforms, and compresses your events before loading them into the bucket. As events are loaded, you can use Amazon Athena to query the bucket using standard SQL queries. Also, you can configure the pipeline to either reuse an existing S3 bucket or create a new one for you.

Event Search & Analytics pipeline

Event Fork Pipeline for Event Search & Analytics

Figure 3 – Event Fork Pipeline for Event Search & Analytics

The preceding diagram shows the Event Search & Analytics pipeline. You can subscribe this pipeline to your SNS topic to index in a search domain the events flowing through your system, and then run analytics on them. This pipeline is composed of the following resources:

  • An SQS queue that buffers the events delivered by the SNS topic
  • A Lambda function that polls events from the queue and pushes them into a Data Firehose delivery stream
  • An Amazon ES domain that indexes the events loaded by the delivery stream
  • An S3 bucket that stores the dead-letter events that couldn’t be indexed in the search domain

You can configure this pipeline to fine-tune your delivery stream in terms of event buffering, transformation and compression. You can also decide whether the pipeline should reuse an existing Amazon ES domain in your AWS account or create a new one for you. As events are indexed in the search domain, you can use Kibana to run analytics on your events and update visual dashboards in real time.

Event Replay pipeline

Event Fork Pipeline for Event Replay

Figure 4 – Event Fork Pipeline for Event Replay

The preceding diagram shows the Event Replay pipeline. You can subscribe this pipeline to your SNS topic to record the events that have been processed by your system for up to 14 days. You can then reprocess them in case your platform is recovering from a failure or a disaster. This pipeline is composed of the following resources:

  • An SQS queue that buffers the events delivered by the SNS topic
  • A Lambda function that polls events from the queue and redrives them into your regular event processing pipeline, which is also subscribed to your topic

By default, the replay function is disabled, which means it isn’t redriving your events. If the events need to be reprocessed, your operators must enable the replay function.
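As a rough illustration of the redrive step, the following Python sketch forwards every event in an SQS batch to the regular processing queue. It is not the code shipped with the pipeline: the `send_message` callable is a hypothetical stand-in for a call to the destination queue, and the sketch assumes that the replay function is enabled or disabled outside the handler, so a disabled function is never invoked and the recorded events stay in the replay queue.

```python
def replay_handler(event, send_message):
    """Redrive each event in an SQS batch into the regular pipeline.

    `event` follows the shape Lambda uses for SQS batches
    ({"Records": [{"body": ...}, ...]}). `send_message` is a hypothetical
    callable that delivers one message body to the destination queue.
    Returns the number of events redriven.
    """
    redriven = 0
    for record in event.get("Records", []):
        # The body is forwarded unchanged so the regular pipeline sees
        # the same payload it received the first time.
        send_message(record["body"])
        redriven += 1
    return redriven
```

Forwarding the body unchanged keeps the replayed events indistinguishable from the originals, which means the regular pipeline should tolerate seeing the same order more than once.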

Applying event fork pipelines in a use case

This is how everything comes together. The following scenario describes an event-driven, serverless ecommerce application that uses the Event Fork Pipelines pattern. This example ecommerce application is available in AWS Serverless Application Repository. You can deploy it to your AWS account using the Lambda console, test it, and look at its source code in GitHub.

Example ecommerce application using Event Fork Pipelines

Figure 5 – Example e-commerce application using Event Fork Pipelines

The ecommerce application takes orders from buyers through a RESTful API hosted by Amazon API Gateway and backed by a Lambda function named CheckoutFunction. This function publishes all orders received to an SNS topic named CheckoutEventsTopic, which in turn fans out the orders to four different pipelines. The first pipeline is the regular checkout-processing pipeline designed and implemented by you as the ecommerce application owner. This pipeline has the following resources:

  • An SQS queue named CheckoutQueue that buffers all orders received
  • A Lambda function named CheckoutFunction that polls the queue to process these orders
  • An Amazon DynamoDB table named CheckoutTable that securely saves all orders as they’re placed

The components of the system described thus far handle what you might think of as the core business logic. But in addition, you should address the set of elements necessary for making the system resilient, compliant, and searchable:

  • Backing up all orders securely. Compressed backups must be encrypted at rest, with sensitive payment details removed for security and compliance purposes.
  • Searching and running analytics on orders, if the amount is $100 or more. Analytics are needed for key ecommerce metrics, such as average ticket size, average shipping time, most popular products, and preferred payment options.
  • Replaying recent orders. If the fulfillment process is disrupted at any point, you should be able to replay the most recent orders from up to two weeks. This is a key requirement that guarantees the continuity of the ecommerce business.

Rather than implementing all the event processing logic yourself, you can choose to subscribe Event Fork Pipelines to your existing SNS topic CheckoutEventsTopic. The pipelines are configured as follows:

  • The Event Storage & Backup pipeline is configured to transform data as follows:
    • Remove credit card details
    • Buffer data for 60 seconds
    • Compress data using GZIP
    • Encrypt data using the default customer master key (CMK) for S3

This CMK is managed by AWS and powered by AWS Key Management Service (AWS KMS). For more information, see Choosing Amazon S3 for Your Destination, Data Transformation, and Configuration Settings in the Amazon Kinesis Data Firehose Developer Guide.

  • The Event Search & Analytics pipeline is configured with:
    • An index retry duration of 30 seconds
    • A bucket for storing orders that failed to be indexed in the search domain
    • A filter policy to restrict the set of orders that are indexed

For more information, see Choosing Amazon ES for Your Destination, in the Amazon Kinesis Data Firehose Developer Guide.

  • The Event Replay pipeline is configured with the SQS queue name that is part of the regular checkout processing pipeline. For more information, see Queue Name and URL in the Amazon SQS Developer Guide.
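The "remove credit card details" step in the backup pipeline can be pictured as a Kinesis Data Firehose data transformation function. The Python sketch below is only an illustration: the field names in `SENSITIVE_FIELDS` are hypothetical, because the post does not describe the order schema, and each record is assumed to carry a single JSON object.

```python
import base64
import json

# Hypothetical field names; the order schema is not given in the post.
SENSITIVE_FIELDS = ("creditCardNumber", "creditCardCvv")


def scrub(order):
    """Return a copy of the order without the sensitive fields."""
    return {key: value for key, value in order.items()
            if key not in SENSITIVE_FIELDS}


def handler(event, context):
    """Firehose transformation: decode, scrub, and re-encode each record."""
    output = []
    for record in event["records"]:
        order = json.loads(base64.b64decode(record["data"]))
        cleaned = json.dumps(scrub(order)) + "\n"  # one JSON object per line
        output.append({
            "recordId": record["recordId"],  # must echo the incoming ID
            "result": "Ok",
            "data": base64.b64encode(cleaned.encode("utf-8")).decode("utf-8"),
        })
    return {"records": output}
```

Writing one JSON object per line keeps the files in the bucket easy to query later.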

The filter policy, shown in JSON format, is set in the configuration for the Event Search & Analytics pipeline. This filter policy matches only incoming orders in which the total amount is $100 or more. For more information, see Message Filtering in the Amazon SNS Developer Guide.


{
    "amount": [
        { "numeric": [ ">=", 100 ] }
    ]
}
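To make the matching rule concrete, here is a small Python sketch that evaluates a policy like this one against a message's attributes. It is a simplified local model for reasoning about the policy, not the SNS implementation. It covers only exact-value and numeric conditions, and treats any other operator as a non-match.

```python
import operator

_OPS = {
    "=": operator.eq,
    ">": operator.gt,
    ">=": operator.ge,
    "<": operator.lt,
    "<=": operator.le,
}


def _numeric_matches(conditions, value):
    """Check a flat list of operator/operand pairs, e.g. [">=", 100]."""
    pairs = zip(conditions[0::2], conditions[1::2])
    return all(_OPS[op](value, operand) for op, operand in pairs)


def matches(policy, attributes):
    """Return True if the attributes satisfy every key in the policy.

    Keys are combined with AND; the rules listed under one key with OR.
    """
    for key, rules in policy.items():
        if key not in attributes:
            return False
        value = attributes[key]
        satisfied = False
        for rule in rules:
            if isinstance(rule, dict) and "numeric" in rule:
                if isinstance(value, (int, float)) and _numeric_matches(
                        rule["numeric"], value):
                    satisfied = True
            elif rule == value:
                satisfied = True
        if not satisfied:
            return False
    return True


policy = {"amount": [{"numeric": [">=", 100]}]}
```

With this policy, an order whose `amount` attribute is 100 or 250 is delivered to the Event Search & Analytics pipeline, while an order of 99.99, or a message without an `amount` attribute, is not.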

By using the Event Fork Pipelines pattern, you avoid the development overhead associated with coding undifferentiated logic for handling events.

Event Fork Pipelines can be deployed directly from AWS Serverless Application Repository into your AWS account.

Deploying event fork pipelines

Event Fork Pipelines is available as a set of public apps in the AWS Serverless Application Repository (to find the apps, select the ‘Show apps that create custom IAM roles or resource policies’ check box under the search bar). It can be deployed and tested manually via the Lambda console. In a production scenario, we recommend embedding fork pipelines within the AWS SAM template of your overall application. The nested applications feature enables you to do this by adding an AWS::Serverless::Application resource to your AWS SAM template. The resource references the ApplicationId and SemanticVersion values of the application to nest.

For example, you can include the Event Storage & Backup pipeline as a nested application by adding the following YAML snippet to the Resources section of your AWS SAM template:


Backup:
  Type: AWS::Serverless::Application
  Properties:
    Location:
      ApplicationId: arn:aws:serverlessrepo:us-east-1:012345678901:applications/fork-event-storage-backup-pipeline
      SemanticVersion: 1.0.0
    Parameters:
      # SNS topic ARN whose messages should be backed up to the S3 bucket.
      TopicArn: !Ref MySNSTopic

When specifying parameter values, you can use AWS CloudFormation intrinsic functions to reference other resources in your template. In the preceding example, the TopicArn parameter is filled in by referencing an AWS::SNS::Topic called MySNSTopic, defined elsewhere in the AWS SAM template. For more information, see Intrinsic Function Reference in the AWS CloudFormation User Guide.

To copy the YAML required for nesting, in the Lambda console page for an AWS Serverless Application Repository application, choose Copy as SAM Resource.

Authoring new event fork pipelines

We invite you to fork the Event Fork Pipelines repository in GitHub and submit pull requests to contribute new pipelines. In addition to event storage and backup, event search and analytics, and event replay, what other common event handling requirements have you seen?

We look forward to seeing what you’ll come up with for extending the Event Fork Pipelines suite.

Summary

Event Fork Pipelines is a serverless design pattern and a suite of open-source nested serverless applications, based on AWS SAM. You can deploy it directly from AWS Serverless Application Repository to enrich your event-driven system architecture. Event Fork Pipelines lets you store, back up, replay, search, and run analytics on the events flowing through your system. There’s no need to write code, manually stitch resources together, or set up infrastructure.

You can deploy Event Fork Pipelines in any AWS Region that supports the underlying AWS services used in the pipelines. There are no additional costs associated with Event Fork Pipelines itself, and you pay only for using the AWS resources inside each nested application.

Get started today by deploying the example ecommerce application or searching for Event Fork Pipelines in AWS Serverless Application Repository.

Two-Way SMS with Amazon Pinpoint

Post Syndicated from Hannah Nilsson original https://aws.amazon.com/blogs/messaging-and-targeting/two-way-sms-with-amazon-pinpoint/


Learn to implement two-way SMS messaging for a simple approach that results in higher levels of customer engagement

SMS, or text messaging, is the simplest way to reach your users outside of normal customer-facing web or mobile applications. Compared to other communication channels, such as email and push notifications, text messaging results in higher engagement.

SMS messaging is extremely convenient — users don’t have to authenticate, download your app, or go to your website. They simply receive your message on their device. When it comes to customer acquisition and retention, it doesn’t get any easier than this.

In this article posted on A Cloud Guru, Dennis Hills explains what two-way SMS is and how you can quickly and easily start sending personalized, timely, and relevant text messages to your customers with Amazon Pinpoint. He then shows how you can implement a practical solution for setting up an SMS long code so you can start sending and receiving text messages.

Read the article now, and be sure to let us know in the comments what types of advanced topics for SMS messaging you’d like to see us or Dennis write about in the future.

Optimizing a Lift-and-Shift for Security

Post Syndicated from Jonathan Shapiro-Ward original https://aws.amazon.com/blogs/architecture/optimizing-a-lift-and-shift-for-security/

This is the third and final blog within a three-part series that examines how to optimize lift-and-shift workloads. A lift-and-shift is a common approach for migrating to AWS, whereby you move a workload from on-prem with little or no modification. This third blog examines how lift-and-shift workloads can benefit from an improved security posture with no modification to the application codebase. (Read about optimizing a lift-and-shift for performance and for cost effectiveness.)

Moving to AWS can help to strengthen your security posture by eliminating many of the risks present in on-premises deployments. It is still essential to consider how to best use AWS security controls and mechanisms to ensure the security of your workload. Security can often be a significant concern in lift-and-shift workloads, especially for legacy workloads where modern encryption and security features may not be present. By making use of AWS security features, you can significantly improve the security posture of a lift-and-shift workload, even if it lacks native support for modern security best practices.

Adding TLS with Application Load Balancers

Legacy applications are often the subject of a lift-and-shift. Such migrations can help reduce risks by moving away from out-of-date hardware, but security risks are often harder to manage. Many legacy applications leverage HTTP or other plaintext protocols that are vulnerable to all manner of attacks. Often, modifying a legacy application’s codebase to implement TLS is untenable, necessitating other options.

One comparatively simple approach is to leverage an Application Load Balancer or a Classic Load Balancer to provide SSL offloading. In this scenario, the load balancer is exposed to users, while the application servers that only support plaintext protocols reside within a subnet that can only be accessed by the load balancer. The load balancer decrypts all traffic destined for the application instances and forwards the plaintext traffic to them. This allows you to encrypt traffic between the client and the load balancer, leaving only internal communication between the load balancer and the application in plaintext. Often this approach is sufficient to meet security requirements. In more stringent scenarios, however, it is never acceptable for traffic to be transmitted in plaintext, even within a secured subnet. In such scenarios, a sidecar can be used to prevent plaintext traffic from ever traversing the network.

Improving Security and Configuration Management with Sidecars

One approach to providing encryption to legacy applications is to leverage what’s often termed the “sidecar pattern.” The sidecar pattern entails a second process acting as a proxy to the legacy application. The legacy application only exposes its services via the local loopback adapter and is thus accessible only to the sidecar. In turn, the sidecar acts as an encrypted proxy, exposing the legacy application’s API to external consumers via TLS. As unencrypted traffic between the sidecar and the legacy application traverses the loopback adapter, it never traverses the network. This approach can help add encryption (or stronger encryption) to legacy applications when it’s not feasible to modify the original codebase. A common approach to implementing sidecars is through container groups, such as a pod in Amazon EKS or a task in Amazon ECS.

Implementing the Sidecar Pattern With Containers

Figure 1: Implementing the Sidecar Pattern With Containers

Another use of the sidecar pattern is to help legacy applications leverage modern cloud services. A common example of this is using a sidecar to manage files pertaining to the legacy application. This could entail a number of options including:

  • Having the sidecar dynamically modify the configuration for a legacy application based upon some external factor, such as the output of a Lambda function, an SNS event, or a DynamoDB write.
  • Having the sidecar write application state to a cache or database. Often applications will write state to the local disk. This can be problematic for autoscaling or disaster recovery, where having the state easily accessible to other instances is advantageous. To facilitate this, the sidecar can write state to Amazon S3, Amazon DynamoDB, Amazon ElastiCache, or Amazon RDS.
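As an illustration of the first option, the following Python sketch shows how a sidecar might apply a configuration change to a legacy application that reads an INI-style file. The file layout, section, and key names are assumptions made for this example. How the sidecar receives the update (for instance, from an SNS notification) and how the legacy application reloads its configuration are outside the sketch.

```python
import configparser
import os
import tempfile


def apply_config_update(path, section, updates):
    """Merge `updates` into one section of an INI file, atomically.

    The new content is written to a temporary file in the same directory
    and then swapped into place, so the legacy application never reads a
    half-written configuration file.
    """
    parser = configparser.ConfigParser()
    parser.read(path)  # a missing file is treated as empty
    if not parser.has_section(section):
        parser.add_section(section)
    for key, value in updates.items():
        parser.set(section, key, str(value))

    directory = os.path.dirname(os.path.abspath(path))
    fd, tmp_path = tempfile.mkstemp(dir=directory)
    with os.fdopen(fd, "w") as handle:
        parser.write(handle)
    os.replace(tmp_path, path)  # atomic rename on the same filesystem
```

Because the sidecar only touches the configuration file, the legacy application itself stays unmodified.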

A sidecar requires custom development, but it doesn’t require any modification of the lift-and-shifted application. A sidecar treats the application as a black box and interacts with it via its API, configuration file, or other standard mechanism.

Automating Security

A lift-and-shift can achieve a significantly stronger security posture by incorporating elements of DevSecOps. DevSecOps is a philosophy that argues that everyone is responsible for security and advocates automating all parts of the security process. AWS has a number of services that can help implement a DevSecOps strategy. These services include:

  • Amazon GuardDuty: a continuous monitoring system that analyzes AWS CloudTrail events, Amazon VPC Flow Logs, and DNS logs. GuardDuty can detect threats and trigger an automated response.
  • AWS Shield: a managed DDoS protection service
  • AWS WAF: a managed Web Application Firewall
  • AWS Config: a service for assessing, tracking, and auditing changes to AWS configuration

These services can help detect security problems and implement a response in real time, achieving a significantly stronger posture than traditional security strategies. You can build a DevSecOps strategy around a lift-and-shift workload using these services, without having to modify the lift-and-shift application.

Conclusion

There are many opportunities for taking advantage of AWS services and features to improve a lift-and-shift workload. Without any alteration to the application you can strengthen your security posture by utilizing AWS security services and by making small environmental and architectural changes that can help alleviate the challenges of legacy workloads.

About the author

Dr. Jonathan Shapiro-Ward is an AWS Solutions Architect based in Toronto. He helps customers across Canada to transform their businesses and build industry leading cloud solutions. He has a background in distributed systems and big data and holds a PhD from the University of St Andrews.

Optimizing a Lift-and-Shift for Cost Effectiveness and Ease of Management

Post Syndicated from Jonathan Shapiro-Ward original https://aws.amazon.com/blogs/architecture/optimizing-a-lift-and-shift-for-cost/

Lift-and-shift is the process of migrating a workload from on premise to AWS with little or no modification. A lift-and-shift is a common route for enterprises to move to the cloud, and can be a transitionary state to a more cloud native approach. This is the second blog post in a three-part series which investigates how to optimize a lift-and-shift workload. The first post is about performance.

A key concern that many customers have with a lift-and-shift is cost. If you move an application as is from on-prem to AWS, is there any possibility for meaningful cost savings? By employing AWS services, in lieu of self-managed EC2 instances, and by leveraging cloud capability such as auto scaling, there is potential for significant cost savings. In this blog post, we will discuss a number of AWS services and solutions that you can leverage with minimal or no change to your application codebase in order to significantly reduce management costs and overall Total Cost of Ownership (TCO).

Automate

Even if you can’t modify your application, you can change the way you deploy it. Adopting an infrastructure-as-code approach can vastly improve the ease of management of your application, thereby reducing cost. By templating your application through AWS CloudFormation, AWS OpsWorks, or open-source tools, you can make deploying and managing your workloads a simple and repeatable process.

As part of the lift-and-shift process, rationalizing the workload into a set of templates means less time is spent deploying and modifying the workload in the future. It enables the easy creation of dev/test environments, facilitates blue-green testing, opens up options for DR, and gives the option to roll back in the event of error. Automation is the single step that is most conducive to improving ease of management.

Reserved Instances and Spot Instances

A first consideration around cost should be the purchasing model for any EC2 instances. Reserved Instances (RIs) represent a 1-year or 3-year commitment to EC2 instances and can enable up to 75% cost reduction (over on demand) for steady state EC2 workloads. They are ideal for 24/7 workloads that must be continually in operation. An application requires no modification to make use of RIs.
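To see when a reservation pays off, it helps to work out the break-even point. The Python sketch below uses a deliberately simplified model, which is an assumption of this example rather than AWS pricing logic: a reservation is paid for every hour of the term at the discounted rate, on demand is paid only for the hours actually used, and payment options and other pricing details are ignored.

```python
def reserved_break_even_utilization(discount):
    """Fraction of the term an instance must run for an RI to break even.

    With a discount d, the RI costs (1 - d) * rate for every hour of the
    term, while on demand costs rate * hours_used. The two are equal when
    hours_used / hours_in_term == 1 - d.
    """
    if not 0 <= discount < 1:
        raise ValueError("discount must be in the range [0, 1)")
    return 1 - discount


def cheaper_option(hours_used, hours_in_term, discount):
    """Return which purchasing model costs less under the simple model."""
    utilization = hours_used / hours_in_term
    if utilization > reserved_break_even_utilization(discount):
        return "reserved"
    return "on-demand"
```

At the maximum 75% discount quoted above, the break-even point is 25% of the hours in the term. At smaller discounts the break-even point rises, so a workload that runs only part of the day may be cheaper on demand.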

An alternative purchasing model is EC2 spot. Spot instances offer unused capacity available at a significant discount – up to 90%. Spot instances receive a two-minute warning when the capacity is required back by EC2 and can be suspended and resumed. Workloads which are architected for batch runs – such as analytics and big data workloads – often require little or no modification to make use of spot instances. Other burstable workloads such as web apps may require some modification around how they are deployed.

A final alternative is on-demand. For workloads that are not running in perpetuity, on-demand is ideal. Workloads can be deployed, used for as long as required, and then terminated. By leveraging some simple automation (such as AWS Lambda and CloudWatch alarms), you can schedule workloads to start and stop at the open and close of business (or at other meaningful intervals). This typically requires no modification to the application itself. For workloads that are not 24/7 steady state, this can provide greater cost effectiveness compared to RIs and more certainty and ease of use when compared to spot.
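The start-and-stop automation can be very small. The following Python sketch shows one possible Lambda handler, assuming it is invoked on a schedule with a constant input such as {"action": "stop", "instance_ids": [...]}. That event shape is a convention invented for this example, and the scheduling mechanism itself is not shown. The EC2 client is injectable so that the logic can be exercised without AWS access.

```python
def handler(event, context, ec2=None):
    """Start or stop the EC2 instances named in the event.

    `event["action"]` is "start" or "stop" and `event["instance_ids"]` is
    a list of instance IDs; both keys are assumptions of this sketch.
    """
    if ec2 is None:
        # Imported lazily so the sketch can be tested with a stub client.
        import boto3
        ec2 = boto3.client("ec2")

    action = event["action"]
    instance_ids = event["instance_ids"]
    if action == "start":
        ec2.start_instances(InstanceIds=instance_ids)
    elif action == "stop":
        ec2.stop_instances(InstanceIds=instance_ids)
    else:
        raise ValueError(f"unsupported action: {action!r}")
    return {"action": action, "instance_ids": instance_ids}
```

One schedule at the open of business would send the "start" input and another at the close of business would send "stop", leaving the application itself untouched.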

Amazon FSx for Windows File Server

Amazon FSx for Windows File Server provides a fully managed Windows filesystem that has full compatibility with SMB and DFS and full AD integration. Amazon FSx is an ideal choice for lift-and-shift architectures as it requires no modification to the application codebase in order to enable compatibility. Windows based applications can continue to leverage standard, Windows-native protocols to access storage with Amazon FSx. It enables users to avoid having to deploy and manage their own fileservers – eliminating the need for patching, automating, and managing EC2 instances. Moreover, it’s easy to scale and minimize costs, since Amazon FSx offers a pay-as-you-go pricing model.

Amazon EFS

Amazon Elastic File System (EFS) provides high performance, highly available multi-attach storage via NFS. EFS offers a drop-in replacement for existing NFS deployments. This is ideal for a range of Linux and Unix use cases as well as cross-platform solutions such as Enterprise Java applications. EFS eliminates the need to manage NFS infrastructure and simplifies storage concerns. Moreover, EFS provides high availability out of the box, which helps to reduce single points of failure and avoids the need to manually configure storage replication. Much like Amazon FSx, EFS enables customers to realize cost improvements by moving to a pay-as-you-go pricing model and requires no modification of the application.

Amazon MQ

Amazon MQ is a managed message broker service that provides compatibility with JMS, AMQP, MQTT, OpenWire, and STOMP. These are amongst the most extensively used middleware and messaging protocols and are a key foundation of enterprise applications. Rather than having to manually maintain a message broker, Amazon MQ provides a performant, highly available managed message broker service that is compatible with existing applications.

Applications that leverage a standard messaging protocol can use Amazon MQ without any code modification. In most cases, all you need to do is update the application’s MQ endpoint in its configuration. Subsequently, the Amazon MQ service handles the heavy lifting of operating a message broker, configuring HA, fault detection, failure recovery, software updates, and so forth. This offers a simple option for reducing management overhead and improving the reliability of a lift-and-shift architecture. What’s more, applications can migrate to Amazon MQ without the need for any downtime, making this an easy and effective way to improve a lift-and-shift.

You can also use Amazon MQ to integrate legacy applications with modern serverless applications. Lambda functions can subscribe to MQ topics and trigger serverless workflows, enabling compatibility between legacy and new workloads.

Integrating Lift-and-Shift Workloads with Lambda via Amazon MQ

Figure 1: Integrating Lift-and-Shift Workloads with Lambda via Amazon MQ

Amazon Managed Streaming for Apache Kafka

Lift-and-shift workloads that include a streaming data component are often built around Apache Kafka. There is a certain amount of complexity involved in operating a Kafka cluster, which incurs management and operational expense. Amazon Kinesis is a managed alternative to Apache Kafka, but it is not a drop-in replacement. At re:Invent 2018, we announced the launch of Amazon Managed Streaming for Apache Kafka (MSK) in public preview. MSK provides a managed Kafka deployment with pay-as-you-go pricing and acts as a drop-in replacement in existing Kafka workloads. MSK can help reduce management costs and improve cost efficiency and is ideal for lift-and-shift workloads.

Leveraging S3 for Static Web Hosting

A significant portion of any web application is static content. This includes videos, images, text, and other content that changes seldom, if ever. In many lift-and-shifted applications, web servers are migrated to EC2 instances and host all content – static and dynamic. Hosting static content from an EC2 instance incurs a number of costs including the instance, EBS volumes, and likely, a load balancer. By moving static content to S3, you can significantly reduce the amount of compute required to host your web applications. In many cases, this change is non-disruptive and can be done at the DNS or CDN layer, requiring no change to your application.

Reducing Web Hosting Costs with S3 Static Web Hosting

Figure 2: Reducing Web Hosting Costs with S3 Static Web Hosting

Conclusion

There are numerous opportunities for reducing the cost of a lift-and-shift. Without any modification to the application, lift-and-shift workloads can benefit from cloud-native features. By using AWS services and features, you can significantly reduce the undifferentiated heavy lifting inherent in on-prem workloads and reduce resources and management overheads.

About the author

Dr. Jonathan Shapiro-Ward is an AWS Solutions Architect based in Toronto. He helps customers across Canada to transform their businesses and build industry leading cloud solutions. He has a background in distributed systems and big data and holds a PhD from the University of St Andrews.

Implementing enterprise integration patterns with AWS messaging services: point-to-point channels

Post Syndicated from Rachel Richardson original https://aws.amazon.com/blogs/compute/implementing-enterprise-integration-patterns-with-aws-messaging-services-point-to-point-channels/

This post is courtesy of Christian Mueller, Sr. Solutions Architect, AWS and Dirk Fröhner, Sr. Solutions Architect, AWS

At AWS, we see our customers increasingly moving toward managed services to reduce the time and money that they spend managing infrastructure. This also applies to the messaging domain, where AWS provides a collection of managed services.

Asynchronous messaging is a fundamental approach for integrating independent systems or building up a set of loosely coupled systems that can scale and evolve independently and flexibly. The well-known collection of enterprise integration patterns (EIPs) provides a “technology-independent vocabulary” to “design and document integration solutions.” This blog is the first of two that describe how you can implement the core EIPs using AWS messaging services. Let’s first look at the relevant AWS messaging services.

When organizations migrate their traditional messaging and existing applications to the cloud gradually, they usually want to do it without rewriting their code. Amazon MQ is a managed message broker service for Apache ActiveMQ that makes it easy to set up and operate message brokers in the cloud. It supports industry-standard APIs and protocols such as JMS, AMQP, and MQTT, so you can switch from any standards-based message broker to Amazon MQ without rewriting the messaging code in your applications. Amazon MQ is recommended if you’re using messaging with existing applications and want to move your messaging to the cloud without rewriting existing code.

However, if you build new applications for the cloud, we recommend that you consider using cloud-native messaging services such as Amazon SQS and Amazon SNS. These serverless, fully managed message queue and topic services scale to meet your demands and provide simple, easy-to-use APIs. You can use Amazon SQS and Amazon SNS to decouple and scale microservices, distributed systems, and serverless applications and improve overall reliability.

This blog looks at the first part of some fundamental integration patterns. We describe the patterns and apply them to these AWS messaging services. This will help you apply the right pattern to your use case and architect for scale in a secure and cost-efficient manner. For all variants, we employ both traditional and cloud-native messaging services: Amazon MQ for the former and Amazon SQS and Amazon SNS for the latter.

Integration Patterns

Let’s start with some fundamental integration patterns.

Message exchange patterns

First, we inspect the two major message exchange patterns: one-way and request-response.

One-way messaging

Applying one-way messaging, a message producer (sender) sends out a message to a messaging channel and doesn’t expect or want a response from whatever process (receiver) consumed the message. Examples of one-way messaging include a data transfer and a notification about an event that happened.

Request-response messaging

With request-response messaging, a message producer (requester) sends out a message: for example, a command to instruct the responder to execute something. The requester expects a response from each message consumer (responder) who received that message, likely to know what the result of all executions was. To know where to send the response message to, the request message contains a return address that the responder uses. To make sure that the requester can assign an incoming response to a request, the requester adds a correlation identifier to the request, which the responders echo in their responses.
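To make the return address and the correlation identifier more concrete, here is a minimal in-memory sketch. It doesn't use any AWS service or messaging library: the `Envelope` class, the `respondOnce` and `exchange` methods, and the plain `java.util.Queue` instances standing in for channels are all assumptions made for illustration only.

```java
import java.util.ArrayDeque;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Queue;
import java.util.UUID;

final class CorrelationSketch {

    // Hypothetical envelope: a body plus the two headers described above.
    static final class Envelope {
        final String body;
        final String correlationId;
        final Queue<Envelope> replyTo; // return address; null on responses

        Envelope(String body, String correlationId, Queue<Envelope> replyTo) {
            this.body = body;
            this.correlationId = correlationId;
            this.replyTo = replyTo;
        }
    }

    // Responder: consume one request, echo its correlation ID, reply to the return address.
    static void respondOnce(Queue<Envelope> requests) {
        Envelope request = requests.poll();
        if (request == null) {
            return; // nothing to respond to
        }
        request.replyTo.add(new Envelope("Re: " + request.body, request.correlationId, null));
    }

    // Requester: send two requests, then map each response back to its request.
    static Map<String, String> exchange() {
        Queue<Envelope> requests = new ArrayDeque<>();
        Queue<Envelope> replies = new ArrayDeque<>();
        Map<String, String> inflight = new LinkedHashMap<>(); // correlation ID -> request body

        for (String body : new String[] {"order-1", "order-2"}) {
            String correlationId = UUID.randomUUID().toString();
            inflight.put(correlationId, body);
            requests.add(new Envelope(body, correlationId, replies));
        }

        respondOnce(requests);
        respondOnce(requests);

        Map<String, String> matched = new LinkedHashMap<>(); // request body -> response body
        Envelope response;
        while ((response = replies.poll()) != null) {
            String requestBody = inflight.remove(response.correlationId);
            if (requestBody != null) { // ignore responses we have no record of
                matched.put(requestBody, response.body);
            }
        }
        return matched;
    }

    public static void main(String[] args) {
        System.out.println(exchange());
    }
}
```

The requester never inspects the response body to find out which request it belongs to. It relies only on the echoed correlation ID, which is what lets it keep several requests in flight at once.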

Messaging channels: point-to-point

Next, we look at the point-to-point messaging channel, one of the most important patterns for messaging channels. We will continue our consideration with publish-subscribe in our second post.

A point-to-point channel is usually implemented by message queues. Message queues operate so that any given message is only consumed by one receiver, although multiple receivers can be connected to the queue. The queue ensures once-only consumption. Messages are usually buffered in queues so that they’re available for consumption for a certain amount of time, even if no receiver is currently connected.

Point-to-point channels are often used for loosely coupled message transmission, though there are two other common uses. First, they can support horizontal scaling of message processing on the receiver side. Depending on the message load in the channel, the number of receiver processes can be elastically adjusted to cope with the load as needed. The queue acts as a buffering load balancer. Second, they can flatten peak loads of messages and prevent your receivers from being flooded when you can’t scale out fast enough or you don’t want additional scaling.
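The following sketch illustrates the idea of several receivers sharing one queue, with each message going to exactly one of them. It is an in-memory approximation that assumes a `ConcurrentLinkedQueue` as the channel and a thread pool as the receivers; the class and method names are hypothetical and unrelated to Amazon MQ or Amazon SQS.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class CompetingConsumersSketch {

    // Drains messageCount messages with receiverCount receivers and
    // returns one "receiver <- message" entry per consumed message.
    static List<String> run(int messageCount, int receiverCount) {
        Queue<String> queue = new ConcurrentLinkedQueue<>();
        for (int i = 1; i <= messageCount; i++) {
            queue.add("Message " + i);
        }

        List<String> consumed = Collections.synchronizedList(new ArrayList<>());
        ExecutorService receivers = Executors.newFixedThreadPool(receiverCount);
        for (int r = 1; r <= receiverCount; r++) {
            final String name = "receiver-" + r;
            receivers.submit(() -> {
                String message;
                // poll() hands each message to exactly one caller, so receivers never share one.
                while ((message = queue.poll()) != null) {
                    consumed.add(name + " <- " + message);
                }
            });
        }

        receivers.shutdown();
        try {
            if (!receivers.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("receivers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return consumed;
    }

    public static void main(String[] args) {
        for (String entry : run(10, 3)) {
            System.out.println(entry);
        }
    }
}
```

Which receiver handles which message varies from run to run, but every message appears exactly once in the result. Adding receivers changes only how the work is divided, not what the sender has to do.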

Integration scenarios

In this section, we apply these fundamental patterns to AWS messaging services. The code examples are written in Java, but only by author preference. You can implement the same integration scenarios in C++, .NET, Node.js, Python, Ruby, Go, and other programming languages for which AWS provides an SDK and an Apache ActiveMQ client library is available.

Point-to-point channels: one-way messaging

The diagrams in the following subsections show the principle of one-way messaging for point-to-point channels, using Amazon MQ queues and Amazon SQS queues. The sender produces a message and sends it into a queue, and the receiver consumes the message from the queue for processing. For traditional messaging (that is, Amazon MQ), the senders and consumers can use protocols such as JMS or AMQP. For cloud-native messaging, they can use the Amazon SQS API.

Traditional messaging

To follow this example, open the Amazon MQ console and create a broker. The following diagram shows the components described above for the traditional messaging scenario: a sender sends messages into an Amazon MQ queue, and a receiver consumes messages from that queue.

Point to point traditional messaging

In the following code example, the sender and receiver use the Apache ActiveMQ client library and the standard Java Message Service (JMS) API to send and receive messages to and from an Amazon MQ queue. You can run the code on any Amazon compute service, in your on-premises data center, or on your personal computer. For simplicity, the code launches the sender and receiver in the same Java virtual machine (JVM).

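import java.util.UUID;

import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQSslConnectionFactory;

public class PointToPointOneWayTraditional {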

    public static void main(String... args) throws Exception {
        ActiveMQSslConnectionFactory connFact = new ActiveMQSslConnectionFactory("failover:(ssl://<broker-1>.amazonaws.com:61617,ssl://<broker-2>.amazonaws.com:61617)");
        connFact.setConnectResponseTimeout(10000);
        Connection conn = connFact.createConnection("user", "password");
        conn.setClientID("PointToPointOneWayTraditional");
        conn.start();

        new Thread(new Receiver(conn.createSession(false, Session.CLIENT_ACKNOWLEDGE), "Queue.PointToPoint.OneWay.Traditional")).start();
        new Thread(new Sender(conn.createSession(false, Session.CLIENT_ACKNOWLEDGE), "Queue.PointToPoint.OneWay.Traditional")).start();
    }

    public static class Sender implements Runnable {

        private Session session;
        private String destination;

        public Sender(Session session, String destination) {
            this.session = session;
            this.destination = destination;
        }

        public void run() {
            try {
                MessageProducer messageProducer = session.createProducer(session.createQueue(destination));
                long counter = 0;

                while (true) {
                    TextMessage message = session.createTextMessage("Message " + ++counter);
                    message.setJMSMessageID(UUID.randomUUID().toString());
                    messageProducer.send(message);
                }
            } catch (JMSException e) {
                throw new RuntimeException(e);
            }
        }
    }

    public static class Receiver implements Runnable, MessageListener {

        private Session session;
        private String destination;

        public Receiver(Session session, String destination) {
            this.session = session;
            this.destination = destination;
        }

        public void run() {
            try {
                MessageConsumer consumer = session.createConsumer(session.createQueue(destination));
                consumer.setMessageListener(this);
            } catch (JMSException e) {
                throw new RuntimeException(e);
            }
        }

        public void onMessage(Message message) {
            try {
                System.out.println(String.format("received message '%s' with message id '%s'", ((TextMessage) message).getText(), message.getJMSMessageID()));
                message.acknowledge();
            } catch (JMSException e) {
                throw new RuntimeException(e);
            }
        }
    }
}

Cloud-native messaging

To follow this example, open the Amazon SQS console and create a standard SQS queue, using the queue name P2POneWayCloudNative. The following diagram shows the components described above for the cloud-native messaging scenario: a sender sends messages into an Amazon SQS queue, and a receiver consumes messages from that queue.

Point to point cloud-native messaging

In the sample code below, the example sender uses the AWS SDK for Java to send messages to an Amazon SQS queue, running in an endless loop. You can run the code on any Amazon compute service, in your on-premises data center, or on your personal computer.

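import java.util.UUID;

import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
import com.amazonaws.services.sqs.model.MessageAttributeValue;
import com.amazonaws.services.sqs.model.SendMessageRequest;

public class PointToPointOneWayCloudNative {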

    public static void main(String... args) throws Exception {
        final AmazonSQS sqs = AmazonSQSClientBuilder.standard().build();

        new Thread(new Sender(sqs, "https://sqs.<region>.amazonaws.com/<account-number>/P2POneWayCloudNative")).start();
    }

    public static class Sender implements Runnable {

        private AmazonSQS sqs;
        private String destination;

        public Sender(AmazonSQS sqs, String destination) {
            this.sqs = sqs;
            this.destination = destination;
        }

        public void run() {
            long counter = 0;

            while (true) {
                sqs.sendMessage(
                    new SendMessageRequest()
                        .withQueueUrl(destination)
                        .withMessageBody("Message " + ++counter)
                        .addMessageAttributesEntry("MessageID", new MessageAttributeValue().withDataType("String").withStringValue(UUID.randomUUID().toString())));
            }
        }
    }
}

We implement the receiver below in a serverless manner as an AWS Lambda function, using Amazon SQS as the event source. The name of the SQS queue is configured outside the function’s code, which is why it doesn’t appear in this code example.

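import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.SQSEvent;

public class Receiver implements RequestHandler<SQSEvent, Void> {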

    @Override
    public Void handleRequest(SQSEvent request, Context context) {
        for (SQSEvent.SQSMessage message: request.getRecords()) {
            System.out.println(String.format("received message '%s' with message id '%s'", message.getBody(), message.getMessageAttributes().get("MessageID").getStringValue()));
        }

        return null;
    }
}

If this approach is new to you, you can find more details in AWS Lambda Adds Amazon Simple Queue Service to Supported Event Sources. Using Lambda comes with a number of benefits. For example, you don’t have to manage the compute environment for the receiver, and you can use an event (or push) model instead of having to poll for new messages.

Point-to-point channels: request-response messaging

In addition to the one-way scenario, we now have a return channel. We therefore call the involved processes requester and responder rather than sender and receiver. The requester sends a message into the request queue, and the responder sends the response into the response queue. Remember that the requester enriches the message with a return address (the name of the response queue) so that the responder knows where to send the response. The requester also sends a correlation ID that the responder copies into the response message so that the requester can match the incoming response with a request.

Traditional messaging

In this example, we reuse the Amazon MQ broker that we set up earlier. The following diagram shows the components described above for the traditional messaging scenario, using one Amazon MQ queue for the request messages and another for the response messages.

Point to point request response traditional messaging

Using Amazon MQ, we don’t have to create queues explicitly because they’re implicitly created as needed when we start sending messages to them. This example is similar to the point-to-point one-way traditional example.

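import java.util.UUID;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TemporaryQueue;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQSslConnectionFactory;

public class PointToPointRequestResponseTraditional {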

    public static void main(String... args) throws Exception {
        ActiveMQSslConnectionFactory connFact = new ActiveMQSslConnectionFactory("failover:(ssl://<broker-1>.amazonaws.com:61617,ssl://<broker-2>.amazonaws.com:61617)");
        connFact.setConnectResponseTimeout(10000);
        Connection conn = connFact.createConnection("user", "password");
        conn.setClientID("PointToPointRequestResponseTraditional");
        conn.start();

        new Thread(new Responder(conn.createSession(false, Session.CLIENT_ACKNOWLEDGE), "Queue.PointToPoint.RequestResponse.Traditional")).start();
        new Thread(new Requester(conn.createSession(false, Session.CLIENT_ACKNOWLEDGE), "Queue.PointToPoint.RequestResponse.Traditional")).start();
    }

    public static class Requester implements Runnable {

        private Session session;
        private String destination;

        public Requester(Session session, String destination) {
            this.session = session;
            this.destination = destination;
        }

        public void run() {
            MessageProducer messageProducer = null;
            try {
                messageProducer = session.createProducer(session.createQueue(destination));
                long counter = 0;

                while (true) {
                    TemporaryQueue replyTo = session.createTemporaryQueue();
                    String correlationId = UUID.randomUUID().toString();
                    TextMessage message = session.createTextMessage("Message " + ++counter);
                    message.setJMSMessageID(UUID.randomUUID().toString());
                    message.setJMSCorrelationID(correlationId);
                    message.setJMSReplyTo(replyTo);
                    messageProducer.send(message);

                    MessageConsumer consumer = session.createConsumer(replyTo, "JMSCorrelationID='" + correlationId + "'");
                    try {
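                        // receive() returns null if no response arrives within the timeout
                        Message receivedMessage = consumer.receive(5000);
                        if (receivedMessage != null) {
                            System.out.println(String.format("received message '%s' with message id '%s'", ((TextMessage) receivedMessage).getText(), receivedMessage.getJMSMessageID()));
                            receivedMessage.acknowledge();
                        } else {
                            System.out.println(String.format("no response received for correlation id '%s'", correlationId));
                        }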
                    } finally {
                        if (consumer != null) {
                            consumer.close();
                        }
                    }
                }
            } catch (JMSException e) {
                throw new RuntimeException(e);
            }
        }
    }

    public static class Responder implements Runnable, MessageListener {

        private Session session;
        private String destination;

        public Responder(Session session, String destination) {
            this.session = session;
            this.destination = destination;
        }

        public void run() {
            try {
                MessageConsumer consumer = session.createConsumer(session.createQueue(destination));
                consumer.setMessageListener(this);
            } catch (JMSException e) {
                throw new RuntimeException(e);
            }
        }

        public void onMessage(Message message) {
            try {
                String correlationId = message.getJMSCorrelationID();
                Destination replyTo = message.getJMSReplyTo();

                TextMessage responseMessage = session.createTextMessage(((TextMessage) message).getText() + " with CorrelationID " + correlationId);
                responseMessage.setJMSMessageID(UUID.randomUUID().toString());
                responseMessage.setJMSCorrelationID(correlationId);

                MessageProducer messageProducer = session.createProducer(replyTo);
                try {
                    messageProducer.send(responseMessage);

                    message.acknowledge();
                } finally {
                    if (messageProducer != null) {
                        messageProducer.close();
                    }
                }
            } catch (JMSException e) {
                throw new RuntimeException(e);
            }
        }
    }
}

Cloud-native messaging

Open the Amazon SQS console and create two standard SQS queues using the queue names P2PReqRespCloudNative and P2PReqRespCloudNative-Resp. The following diagram shows the components described above for the cloud-native scenario, using one Amazon SQS queue for the request messages and another for the response messages.

Point to point request response cloud native messaging

The following example requester is almost identical to the point-to-point one-way cloud-native example sender. It also provides a reply-to address and a correlation ID.

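import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
import com.amazonaws.services.sqs.model.DeleteMessageRequest;
import com.amazonaws.services.sqs.model.Message;
import com.amazonaws.services.sqs.model.MessageAttributeValue;
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
import com.amazonaws.services.sqs.model.ReceiveMessageResult;
import com.amazonaws.services.sqs.model.SendMessageRequest;

public class PointToPointRequestResponseCloudNative {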

    public static void main(String... args) throws Exception {
        final AmazonSQS sqs = AmazonSQSClientBuilder.standard().build();

        new Thread(new Requester(sqs, "https://sqs.<region>.amazonaws.com/<account-number>/P2PReqRespCloudNative", "https://sqs.<region>.amazonaws.com/<account-number>/P2PReqRespCloudNative-Resp")).start();
    }

    public static class Requester implements Runnable {

        private AmazonSQS sqs;
        private String destination;
        private String replyDestination;
        private Map<String, SendMessageRequest> inflightMessages = new ConcurrentHashMap<>();

        public Requester(AmazonSQS sqs, String destination, String replyDestination) {
            this.sqs = sqs;
            this.destination = destination;
            this.replyDestination = replyDestination;
        }

        public void run() {
            long counter = 0;

            while (true) {
                String correlationId = UUID.randomUUID().toString();
                SendMessageRequest request = new SendMessageRequest()
                    .withQueueUrl(destination)
                    .withMessageBody("Message " + ++counter)
                    .addMessageAttributesEntry("CorrelationID", new MessageAttributeValue().withDataType("String").withStringValue(correlationId))
                    .addMessageAttributesEntry("ReplyTo", new MessageAttributeValue().withDataType("String").withStringValue(replyDestination));
                sqs.sendMessage(request);

                inflightMessages.put(correlationId, request);

                ReceiveMessageResult receiveMessageResult = sqs.receiveMessage(
                    new ReceiveMessageRequest()
                        .withQueueUrl(replyDestination)
                        .withMessageAttributeNames("CorrelationID")
                        .withMaxNumberOfMessages(5)
                        .withWaitTimeSeconds(2));

                for (Message receivedMessage : receiveMessageResult.getMessages()) {
                    System.out.println(String.format("received message '%s' with message id '%s'", receivedMessage.getBody(), receivedMessage.getMessageId()));

                    String receivedCorrelationId = receivedMessage.getMessageAttributes().get("CorrelationID").getStringValue();
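                    SendMessageRequest originalRequest = inflightMessages.remove(receivedCorrelationId);
                    // Standard queues deliver at least once, so a duplicate response may find no matching request.
                    if (originalRequest != null) {
                        System.out.println(String.format("Corresponding request message '%s'", originalRequest.getMessageBody()));
                    }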

                    sqs.deleteMessage(
                        new DeleteMessageRequest()
                            .withQueueUrl(replyDestination)
                            .withReceiptHandle(receivedMessage.getReceiptHandle()));
                }
            }
        }
    }
}

The following example responder is almost identical to the point-to-point one-way cloud-native example receiver. It also creates a message and sends it back to the reply-to address provided in the received message.

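import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.SQSEvent;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
import com.amazonaws.services.sqs.model.MessageAttributeValue;
import com.amazonaws.services.sqs.model.SendMessageRequest;

public class Responder implements RequestHandler<SQSEvent, Void> {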

    private final AmazonSQS sqs = AmazonSQSClientBuilder.standard().build();

    @Override
    public Void handleRequest(SQSEvent request, Context context) {
        for (SQSEvent.SQSMessage message: request.getRecords()) {
            System.out.println(String.format("received message '%s' with message id '%s'", message.getBody(), message.getMessageId()));
            String correlationId = message.getMessageAttributes().get("CorrelationID").getStringValue();
            String replyTo = message.getMessageAttributes().get("ReplyTo").getStringValue();

            System.out.println(String.format("sending message with correlation id '%s' to '%s'", correlationId, replyTo));
            sqs.sendMessage(
                new SendMessageRequest()
                    .withQueueUrl(replyTo)
                    .withMessageBody(message.getBody() + " with CorrelationID " + correlationId)
                    .addMessageAttributesEntry("CorrelationID", new MessageAttributeValue().withDataType("String").withStringValue(correlationId)));
        }

        return null;
    }
}

Go build!

We look forward to hearing about what you build and will continue innovating our services on your behalf.

What’s next?

We have introduced the first fundamental EIPs and shown how you can apply them to the AWS messaging services. If you are keen to dive deeper, continue reading with the second part of this series, where we will cover publish-subscribe messaging.

Read Part 2: Publish-Subscribe Messaging

Implementing enterprise integration patterns with AWS messaging services: publish-subscribe channels

Post Syndicated from Rachel Richardson original https://aws.amazon.com/blogs/compute/implementing-enterprise-integration-patterns-with-aws-messaging-services-publish-subscribe-channels/

This post is courtesy of Christian Mueller, Sr. Solutions Architect, AWS and Dirk Fröhner, Sr. Solutions Architect, AWS

In this blog, we look at the second part of some fundamental enterprise integration patterns and how you can implement them with AWS messaging services. If you missed the first part, we encourage you to start there.

Read Part 1: Point-to-Point Messaging

Integration patterns

Messaging channels: publish-subscribe

As mentioned in the first blog, we continue with the second major messaging channel pattern: publish-subscribe.

A publish-subscribe channel is usually implemented using message topics. In this model, any message published to a topic is immediately received by all of the subscribers of the topic (unless you have applied the message filter pattern). However, if there is no subscriber, messages are usually discarded. The durable subscriber pattern describes an exception where messages are kept for a while in case the subscriber is offline. Publish-subscribe is used when multiple parties are interested in certain messages. Sometimes, this pattern is also referred to as fan-out.
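As a rough illustration of these semantics, the sketch below models a topic in memory. The `Topic` class and its `subscribe`, `publish`, and `inboxOf` methods are hypothetical names chosen for this example, not part of Amazon SNS or Amazon MQ. It shows only two behaviors from the paragraph above: every current subscriber receives a copy, and a message published while nobody is subscribed is dropped.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class FanOutSketch {

    // Hypothetical in-memory topic: each subscriber gets its own inbox.
    static final class Topic {
        private final Map<String, List<String>> inboxes = new LinkedHashMap<>();

        void subscribe(String subscriberName) {
            inboxes.putIfAbsent(subscriberName, new ArrayList<>());
        }

        // Copies the message to every current subscriber.
        // Returns the number of deliveries; 0 means the message was discarded.
        int publish(String message) {
            for (List<String> inbox : inboxes.values()) {
                inbox.add(message);
            }
            return inboxes.size();
        }

        List<String> inboxOf(String subscriberName) {
            List<String> inbox = inboxes.get(subscriberName);
            return inbox == null ? Collections.<String>emptyList() : inbox;
        }
    }

    public static void main(String[] args) {
        Topic topic = new Topic();
        System.out.println(topic.publish("Message 1")); // no subscribers yet, so nothing is delivered

        topic.subscribe("billing");
        topic.subscribe("shipping");
        System.out.println(topic.publish("Message 2")); // delivered once per subscriber

        System.out.println(topic.inboxOf("billing"));
        System.out.println(topic.inboxOf("shipping"));
    }
}
```

Neither subscriber ever sees "Message 1", because it was published before they subscribed. A durable subscription, as used in the JMS examples in this post, is the usual way to avoid losing messages while a known subscriber is offline.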

Let’s apply this pattern to the different AWS messaging services and get our hands dirty. To follow our examples, sign in to your AWS account (or create an account as described in How do I create and activate a new Amazon Web Services account?).

Integration scenarios

Publish-subscribe channels: one-way messaging

Publish-subscribe one-way patterns are often involved in notification-style use cases, where the publisher sends out an event and doesn’t care who is interested in this event. For example, Amazon CloudWatch Events publishes state changes in the environment, and you can subscribe and act accordingly.

The diagrams in the following subsections show the principles of one-way messaging for publish-subscribe channels, using both Amazon MQ and Amazon SNS topics. A publisher produces a message and sends it into a topic, and subscribers consume the message from the topic for processing.

For traditional messaging, senders and consumers can use APIs and protocols such as JMS or AMQP. For cloud-native messaging, they can use the Amazon SNS API.

Traditional messaging

In this example, we reuse the Amazon MQ broker we set up in part one of this blog. As we can see in the following diagram, messages are published into an Amazon MQ topic, and multiple subscribers can consume messages from it.

Publish Subscribe One Way Traditional Messaging

This example is similar to the point-to-point one-way traditional example using the Apache ActiveMQ client library, but we use topics instead of queues, as shown in the following code.

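import java.util.UUID;

import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQSslConnectionFactory;

public class PublishSubscribeOneWayTraditional {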

    public static void main(String... args) throws Exception {
        ActiveMQSslConnectionFactory connFact = new ActiveMQSslConnectionFactory("failover:(ssl://<broker-1>.amazonaws.com:61617,ssl://<broker-2>.amazonaws.com:61617)");
        connFact.setConnectResponseTimeout(10000);
        Connection conn = connFact.createConnection("user", "password");
        conn.setClientID("PubSubOneWayTraditional");
        conn.start();

        new Thread(new Subscriber(conn.createSession(false, Session.CLIENT_ACKNOWLEDGE), "Topic.PubSub.OneWay.Traditional")).start();
        new Thread(new Publisher(conn.createSession(false, Session.CLIENT_ACKNOWLEDGE), "Topic.PubSub.OneWay.Traditional")).start();
    }

    public static class Publisher implements Runnable {

        private Session session;
        private String destination;

        public Publisher(Session session, String destination) {
            this.session = session;
            this.destination = destination;
        }

        public void run() {
            try {
                MessageProducer messageProducer = session.createProducer(session.createTopic(destination));
                long counter = 0;

                while (true) {
                    TextMessage message = session.createTextMessage("Message " + ++counter);
                    message.setJMSMessageID(UUID.randomUUID().toString());
                    messageProducer.send(message);
                }
            } catch (JMSException e) {
                throw new RuntimeException(e);
            }
        }
    }

    public static class Subscriber implements Runnable, MessageListener {

        private Session session;
        private String destination;

        public Subscriber(Session session, String destination) {
            this.session = session;
            this.destination = destination;
        }

        public void run() {
            try {
                MessageConsumer consumer = session.createDurableSubscriber(session.createTopic(destination), "subscriber-1");
                consumer.setMessageListener(this);
            } catch (JMSException e) {
                throw new RuntimeException(e);
            }
        }

        public void onMessage(Message message) {
            try {
                System.out.println(String.format("received message '%s' with message id '%s'", ((TextMessage) message).getText(), message.getJMSMessageID()));
                message.acknowledge();
            } catch (JMSException e) {
                throw new RuntimeException(e);
            }
        }
    }
}

Cloud-native messaging

To follow a similar example using Amazon SNS, open the Amazon SNS console and create an Amazon SNS topic named PubSubOneWayCloudNative. The following diagram shows a publisher sending messages into an Amazon SNS topic, from which the subscribers of this topic consume them.

Publish Subscribe One Way Cloud Native Messaging

We use the AWS SDK for Java to send messages to our Amazon SNS topic, running in an endless loop. You can run the following code on any Amazon compute service, in your on-premises data center, or on your personal computer.

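import java.util.UUID;

import com.amazonaws.services.sns.AmazonSNS;
import com.amazonaws.services.sns.AmazonSNSClientBuilder;
import com.amazonaws.services.sns.model.MessageAttributeValue;
import com.amazonaws.services.sns.model.PublishRequest;

public class PublishSubscribeOneWayCloudNative {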

    public static void main(String... args) throws Exception {
        final AmazonSNS sns = AmazonSNSClientBuilder.standard().build();

        new Thread(new Publisher(sns, "arn:aws:sns:<region>:<account-number>:PubSubOneWayCloudNative")).start();
    }

    public static class Publisher implements Runnable {

        private AmazonSNS sns;
        private String destination;

        public Publisher(AmazonSNS sns, String destination) {
            this.sns = sns;
            this.destination = destination;
        }

        public void run() {
            long counter = 0;

            while (true) {
                sns.publish(
                    new PublishRequest()
                        .withTargetArn(destination)
                        .withSubject("PubSubOneWayCloudNative sample")
                        .withMessage("Message " + ++counter)
                        .addMessageAttributesEntry("MessageID", new MessageAttributeValue().withDataType("String").withStringValue(UUID.randomUUID().toString())));
            }
        }
    }
}

The subscriber is implemented as an AWS Lambda function, using Amazon SNS as the event source. For more information on how to set this up, see Using Amazon SNS for System-to-System Messaging with a Lambda Function as a Subscriber.

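import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.SNSEvent;
import com.amazonaws.services.lambda.runtime.events.SNSEvent.SNS;

public class Subscriber implements RequestHandler<SNSEvent, Void> {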

    @Override
    public Void handleRequest(SNSEvent request, Context context) {
        for (SNSEvent.SNSRecord record: request.getRecords()) {
            SNS sns = record.getSNS();

            System.out.println(String.format("received message '%s' with message id '%s'", sns.getMessage(), sns.getMessageAttributes().get("MessageID").getValue()));
        }

        return null;
    }
}

Publish-subscribe channels: request-response messaging

Publish-subscribe request-response patterns are beneficial in use cases where it’s important to communicate with multiple services that do their work in parallel, but all their responses need to be aggregated afterward. One example is an order service, which needs to enrich the order message with data from multiple backend services.

The diagrams in the following subsections show the principles of request-response messaging for publish-subscribe channels, using both Amazon MQ and Amazon SNS topics. A publisher produces a message and sends it into a topic, and subscribers consume the message from the topic for processing.

Although we use a publish-subscribe channel for the request messages, we would usually use a point-to-point channel for the response messages. This assumes that the requester application or at least a dedicated application is the one entity that works on processing all the responses.
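The sketch below approximates this combination in memory: one request is fanned out to every responder, all replies land on a single response queue, and the requester aggregates only the replies that carry its correlation ID. The class, method, and responder names are hypothetical, and the stale reply is inserted artificially to show why the correlation check matters.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;
import java.util.UUID;

final class ScatterGatherSketch {

    // Hypothetical reply: a body plus the echoed correlation ID.
    static final class Reply {
        final String correlationId;
        final String body;

        Reply(String correlationId, String body) {
            this.correlationId = correlationId;
            this.body = body;
        }
    }

    // Publishes one request to every responder and aggregates the replies
    // that arrive on a single point-to-point response queue.
    static List<String> requestAndAggregate(String requestBody, List<String> responderNames) {
        String correlationId = UUID.randomUUID().toString();
        Queue<Reply> responseQueue = new ArrayDeque<>();

        // A leftover reply to some earlier request, already waiting in the queue.
        responseQueue.add(new Reply("stale-correlation-id", "late reply to an older request"));

        // Fan-out: every responder sees the same request and replies to the same queue.
        for (String responder : responderNames) {
            responseQueue.add(new Reply(correlationId, requestBody + " from responder " + responder));
        }

        // Aggregate: keep only replies whose correlation ID matches this request.
        List<String> aggregated = new ArrayList<>();
        Reply reply;
        while (aggregated.size() < responderNames.size() && (reply = responseQueue.poll()) != null) {
            if (correlationId.equals(reply.correlationId)) {
                aggregated.add(reply.body);
            }
        }
        return aggregated;
    }

    public static void main(String[] args) {
        System.out.println(requestAndAggregate("Order 42", Arrays.asList("inventory", "pricing")));
    }
}
```

In a real system the requester would also need a timeout, because it can't know in advance whether every responder will answer. The receive timeouts in the examples in this post play that role.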

Traditional messaging

As we can see in the following diagram, an Amazon MQ topic is used to send out all the request messages, while all the response messages are sent into an Amazon MQ queue.

Publish Subscribe Request Response Traditional Messaging

In our code sample below, we use two responders.

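import java.util.UUID;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TemporaryQueue;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQSslConnectionFactory;

public class PublishSubscribeRequestResponseTraditional {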

    public static void main(String... args) throws Exception {
        ActiveMQSslConnectionFactory connFact = new ActiveMQSslConnectionFactory("failover:(ssl://<broker-1>.amazonaws.com:61617,ssl://<broker-2>.amazonaws.com:61617)");
        connFact.setConnectResponseTimeout(10000);
        Connection conn = connFact.createConnection("user", "password");
        conn.setClientID("PubSubReqRespTraditional");
        conn.start();

        new Thread(new Responder(conn.createSession(false, Session.CLIENT_ACKNOWLEDGE), "Topic.PubSub.ReqResp.Traditional", "subscriber-1")).start();
        new Thread(new Responder(conn.createSession(false, Session.CLIENT_ACKNOWLEDGE), "Topic.PubSub.ReqResp.Traditional", "subscriber-2")).start();
        new Thread(new Requester(conn.createSession(false, Session.CLIENT_ACKNOWLEDGE), "Topic.PubSub.ReqResp.Traditional")).start();
    }

    public static class Requester implements Runnable {

        private Session session;
        private String destination;

        public Requester(Session session, String destination) {
            this.session = session;
            this.destination = destination;
        }

        public void run() {
            MessageProducer messageProducer = null;
            try {
                messageProducer = session.createProducer(session.createTopic(destination));
                long counter = 0;

                while (true) {
                    TemporaryQueue replyTo = session.createTemporaryQueue();
                    String correlationId = UUID.randomUUID().toString();
                    TextMessage message = session.createTextMessage("Message " + ++counter);
                    message.setJMSMessageID(UUID.randomUUID().toString());
                    message.setJMSCorrelationID(correlationId);
                    message.setJMSReplyTo(replyTo);
                    messageProducer.send(message);

                    MessageConsumer consumer = session.createConsumer(replyTo, "JMSCorrelationID='" + correlationId + "'");
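                    try {
                        // receive() returns null if no response arrives within the timeout
                        Message receivedMessage1 = consumer.receive(5000);
                        Message receivedMessage2 = consumer.receive(5000);
                        if (receivedMessage1 == null || receivedMessage2 == null) {
                            System.out.println(String.format("timed out waiting for 2 responses to correlation id '%s'", correlationId));
                        } else {
                            System.out.println(String.format("received 2 messages '%s' and '%s'", ((TextMessage) receivedMessage1).getText(), ((TextMessage) receivedMessage2).getText()));
                            // with CLIENT_ACKNOWLEDGE, this acknowledges all messages consumed by the session so far
                            receivedMessage2.acknowledge();
                        }
                    } finally {
                        consumer.close();
                        // delete the per-request temporary queue so that queues do not accumulate on the broker
                        replyTo.delete();
                    }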
                }
            } catch (JMSException e) {
                throw new RuntimeException(e);
            }
        }
    }

    public static class Responder implements Runnable, MessageListener {

        private Session session;
        private String destination;
        private String name;

        public Responder(Session session, String destination, String name) {
            this.session = session;
            this.destination = destination;
            this.name = name;
        }

        public void run() {
            try {
                MessageConsumer consumer = session.createDurableSubscriber(session.createTopic(destination), name);
                consumer.setMessageListener(this);
            } catch (JMSException e) {
                throw new RuntimeException(e);
            }
        }

        public void onMessage(Message message) {
            try {
                String correlationId = message.getJMSCorrelationID();
                Destination replyTo = message.getJMSReplyTo();

                TextMessage responseMessage = session.createTextMessage(((TextMessage) message).getText() + " from responder " + name);
                responseMessage.setJMSMessageID(UUID.randomUUID().toString());
                responseMessage.setJMSCorrelationID(correlationId);

                MessageProducer messageProducer = session.createProducer(replyTo);
                try {
                    messageProducer.send(responseMessage);

                    message.acknowledge();
                } finally {
                    if (messageProducer != null) {
                        messageProducer.close();
                    }
                }
            } catch (JMSException e) {
                throw new RuntimeException(e);
            }
        }
    }
}
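
The requester above calls consumer.receive(5000) once per expected responder, and in JMS that call returns null when the timeout elapses. As a minimal sketch of the same gathering step, the following collects up to a given number of replies against one overall deadline and returns whatever arrived in time. It is not part of the original sample: the class name ReplyCollector and its collect method are hypothetical, and a java.util.concurrent.BlockingQueue stands in for the JMS consumer so that the sketch runs without a broker.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical helper for illustration only; the BlockingQueue is a stand-in
// for a JMS MessageConsumer reading from the reply-to queue.
final class ReplyCollector {

    // Collects up to expectedReplies items, sharing one overall timeout across
    // all of them. Returns the replies that arrived before the deadline.
    static <T> List<T> collect(BlockingQueue<T> source, int expectedReplies, long timeoutMillis) {
        List<T> replies = new ArrayList<>();
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        try {
            while (replies.size() < expectedReplies) {
                // Never pass a negative wait; a zero wait still returns queued items.
                long remaining = Math.max(0L, deadline - System.nanoTime());
                T reply = source.poll(remaining, TimeUnit.NANOSECONDS);
                if (reply == null) {
                    break; // deadline reached before the next reply arrived
                }
                replies.add(reply);
            }
        } catch (InterruptedException e) {
            // Preserve the interrupt for the caller and return what we have.
            Thread.currentThread().interrupt();
        }
        return replies;
    }

    public static void main(String[] args) {
        BlockingQueue<String> replyQueue = new LinkedBlockingQueue<>();
        // Only one of the two responders has answered in this run.
        replyQueue.add("Message 1 from responder subscriber-1");

        List<String> replies = collect(replyQueue, 2, 200);
        System.out.println("received " + replies.size() + " of 2 expected replies");
    }
}
```

Sharing one deadline bounds the total wait per request, whereas separate per-reply timeouts add up. The caller decides whether a partial result is acceptable or whether the request should be retried.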

Cloud-native messaging

To implement a similar pattern with Amazon SNS, open the Amazon SNS console and create a new SNS topic named PubSubReqRespCloudNative. Then open the Amazon SQS console and create a standard SQS queue named PubSubReqRespCloudNative-Resp. The following diagram illustrates that we now use an Amazon SNS topic for request messages and an Amazon SQS queue for response messages.

Figure: Publish-subscribe request-response pattern with cloud-native messaging

This example requester is almost identical to the sender in the publish-subscribe one-way cloud-native example. In addition, the requester specifies a reply-to address and a correlation ID as message attributes. This way, responders know where to send their responses, and the requester can match each response to its original request.

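import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

import com.amazonaws.services.sns.AmazonSNS;
import com.amazonaws.services.sns.AmazonSNSClientBuilder;
import com.amazonaws.services.sns.model.MessageAttributeValue;
import com.amazonaws.services.sns.model.PublishRequest;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
import com.amazonaws.services.sqs.model.DeleteMessageRequest;
import com.amazonaws.services.sqs.model.Message;
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
import com.amazonaws.services.sqs.model.ReceiveMessageResult;

public class PublishSubscribeReqRespCloudNative {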

    public static void main(String... args) throws Exception {
        final AmazonSNS sns = AmazonSNSClientBuilder.standard().build();
        final AmazonSQS sqs = AmazonSQSClientBuilder.standard().build();

        new Thread(new Requester(sns, sqs, "arn:aws:sns:<region>:<account-number>:PubSubReqRespCloudNative", "https://sqs.<region>.amazonaws.com/<account-number>/PubSubReqRespCloudNative-Resp")).start();
    }

    public static class Requester implements Runnable {

        private AmazonSNS sns;
        private AmazonSQS sqs;
        private String destination;
        private String replyDestination;
        private Map<String, PublishRequest> inflightMessages = new ConcurrentHashMap<>();

        public Requester(AmazonSNS sns, AmazonSQS sqs, String destination, String replyDestination) {
            this.sns = sns;
            this.sqs = sqs;
            this.destination = destination;
            this.replyDestination = replyDestination;
        }

        public void run() {
            long counter = 0;

            while (true) {
                String correlationId = UUID.randomUUID().toString();
                PublishRequest request = new PublishRequest()
                    .withTopicArn(destination)
                    .withMessage("Message " + ++counter)
                    .addMessageAttributesEntry("CorrelationID", new MessageAttributeValue().withDataType("String").withStringValue(correlationId))
                    .addMessageAttributesEntry("ReplyTo", new MessageAttributeValue().withDataType("String").withStringValue(replyDestination));
                sns.publish(request);

                inflightMessages.put(correlationId, request);

                ReceiveMessageResult receiveMessageResult = sqs.receiveMessage(
                    new ReceiveMessageRequest()
                        .withQueueUrl(replyDestination)
                        .withMessageAttributeNames("CorrelationID")
                        .withMaxNumberOfMessages(5)
                        .withWaitTimeSeconds(2));

                for (Message receivedMessage : receiveMessageResult.getMessages()) {
                    System.out.println(String.format("received message '%s' with message id '%s'", receivedMessage.getBody(), receivedMessage.getMessageId()));

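                    // the attribute is missing if the responder did not set it
                    String receivedCorrelationId = receivedMessage.getMessageAttributes().containsKey("CorrelationID")
                        ? receivedMessage.getMessageAttributes().get("CorrelationID").getStringValue()
                        : null;
                    // remove() finds nothing for unknown or already-matched correlation IDs,
                    // for example a second response to the same request
                    PublishRequest originalRequest = receivedCorrelationId == null ? null : inflightMessages.remove(receivedCorrelationId);
                    if (originalRequest != null) {
                        System.out.println(String.format("Corresponding request message '%s'", originalRequest.getMessage()));
                    } else {
                        System.out.println(String.format("No inflight request found for correlation id '%s'", receivedCorrelationId));
                    }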

                    sqs.deleteMessage(
                        new DeleteMessageRequest()
                            .withQueueUrl(replyDestination)
                            .withReceiptHandle(receivedMessage.getReceiptHandle()));
                }
            }
        }
    }
}
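
If more than one responder subscribes to the topic, each request can produce several responses that carry the same correlation ID. The following is a minimal sketch of how the requester's bookkeeping could be separated from the SNS and SQS calls and made tolerant of repeated or unknown correlation IDs. It is not part of the original sample: the class name InflightRequests, its methods, and the idea of a fixed expected response count are assumptions introduced for illustration, and the sketch uses only the Java standard library.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper (not part of the AWS SDK): tracks published requests by
// correlation ID until the expected number of responses has arrived.
final class InflightRequests {

    // One entry per published request that is still waiting for responses.
    private static final class Entry {
        final String requestBody;
        int remainingResponses;

        Entry(String requestBody, int remainingResponses) {
            this.requestBody = requestBody;
            this.remainingResponses = remainingResponses;
        }
    }

    private final Map<String, Entry> inflight = new ConcurrentHashMap<>();
    private final int expectedResponses;

    InflightRequests(int expectedResponses) {
        if (expectedResponses < 1) {
            throw new IllegalArgumentException("expectedResponses must be at least 1");
        }
        this.expectedResponses = expectedResponses;
    }

    // Call right after publishing a request.
    void register(String correlationId, String requestBody) {
        inflight.put(correlationId, new Entry(requestBody, expectedResponses));
    }

    // Call for every received response. Returns the original request body, or
    // an empty Optional if the correlation ID is missing, unknown, or complete.
    Optional<String> match(String correlationId) {
        if (correlationId == null) {
            return Optional.empty(); // ConcurrentHashMap rejects null keys
        }
        String[] body = new String[1];
        // computeIfPresent runs atomically per key; returning null removes the entry.
        inflight.computeIfPresent(correlationId, (id, entry) -> {
            body[0] = entry.requestBody;
            entry.remainingResponses--;
            return entry.remainingResponses > 0 ? entry : null;
        });
        return Optional.ofNullable(body[0]);
    }

    // Number of requests still waiting for at least one response.
    int pending() {
        return inflight.size();
    }

    public static void main(String[] args) {
        InflightRequests requests = new InflightRequests(2);
        requests.register("id-1", "Message 1");

        System.out.println(requests.match("id-1").orElse("unknown")); // first responder
        System.out.println(requests.match("id-1").orElse("unknown")); // second responder
        System.out.println(requests.match("id-1").orElse("unknown")); // unexpected extra response
        System.out.println("pending requests: " + requests.pending());
    }
}
```

In the requester loop, register would be called after sns.publish and match for each message returned by sqs.receiveMessage. Entries for requests that never receive all their responses stay in the map, so a production version would also need an expiry policy.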

This example responder is almost identical to the receiver in the publish-subscribe one-way cloud-native example. In addition, it creates a response message, sets the correlation ID as a message attribute, and sends the message to the reply-to address provided in the received message.

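import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.SNSEvent;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
import com.amazonaws.services.sqs.model.MessageAttributeValue;
import com.amazonaws.services.sqs.model.SendMessageRequest;

public class Responder implements RequestHandler<SNSEvent, Void> {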

    private final AmazonSQS sqs = AmazonSQSClientBuilder.standard().build();

    @Override
    public Void handleRequest(SNSEvent request, Context context) {
        for (SNSEvent.SNSRecord record: request.getRecords()) {
            System.out.println(String.format("received record '%s' with message id '%s'", record.getSNS().getMessage(), record.getSNS().getMessageId()));
            String correlationId = record.getSNS().getMessageAttributes().get("CorrelationID").getValue();
            String replyTo = record.getSNS().getMessageAttributes().get("ReplyTo").getValue();

            System.out.println(String.format("sending message with correlation id '%s' to '%s'", correlationId, replyTo));
            sqs.sendMessage(
                new SendMessageRequest()
                    .withQueueUrl(replyTo)
                    .withMessageBody(record.getSNS().getMessage() + " with CorrelationID " + correlationId)
                    .addMessageAttributesEntry("CorrelationID", new MessageAttributeValue().withDataType("String").withStringValue(correlationId)));
        }

        return null;
    }
}

Go Build!

We look forward to hearing about what you build and will continue innovating our services on your behalf.

Additional Resources