
Simple Two-way Messaging using the Amazon SQS Temporary Queue Client

Post Syndicated from Rachel Richardson original https://aws.amazon.com/blogs/compute/simple-two-way-messaging-using-the-amazon-sqs-temporary-queue-client/

This post is contributed by Robin Salkeld, Sr. Software Development Engineer

Amazon SQS is a fully managed message queuing service that makes it easy to decouple and scale microservices, distributed systems, and serverless applications. Asynchronous workflows have always been the primary use case for SQS. Using queues ensures one component can keep running smoothly without losing data when another component is unavailable or slow.

We were surprised, then, to discover that many customers use SQS in synchronous workflows. For example, many applications use queues to communicate between frontends and backends when processing a login request from a user.

Why would anyone use SQS for this? The service stores messages for up to 14 days with high durability, but messages in a synchronous workflow often must be processed within a few minutes, or even seconds. Why not just set up an HTTPS endpoint?

The more we talked to customers, the more we understood. Here’s what we learned:

  • Creating a queue is often easier and faster than creating an HTTPS endpoint and the infrastructure necessary to ensure the endpoint’s scalability.
  • Queues are safe by default because they are locked down to the AWS account that created them. In addition, any DDoS attempt on your service is absorbed by SQS instead of loading down your own servers.
  • There is generally no need to create firewall rules for the communication between microservices if they use queues.
  • Although SQS provides durable storage (which isn’t necessary for short-lived messages), it is still a cost-effective solution for this use case. This is especially true when you consider all the messaging broker maintenance that is done for you.

However, setting up efficient two-way communication through one-way queues requires some non-trivial client-side code. In our previous two-part post series on implementing enterprise integration patterns with AWS messaging services, Point-to-point channels and Publish-subscribe channels, we discussed the Request-Response Messaging Pattern. In this pattern, each requester creates a temporary destination to receive each response message.

The simplest approach is to create a new queue for each response, but this is like building a road just so a single car can drive on it before tearing it down. Technically, this can work (and SQS can create and delete queues quickly), but we can definitely make it faster and cheaper.

To better support short-lived, lightweight messaging destinations, we are pleased to present the Amazon SQS Temporary Queue Client. This client makes it easy to create and delete many temporary messaging destinations without inflating your AWS bill.

Virtual queues

The key concept behind the client is the virtual queue. Virtual queues let you multiplex many low-traffic queues onto a single SQS queue. Creating a virtual queue only instantiates a local buffer to hold messages for consumers as they arrive; there is no API call to SQS and no costs associated with creating a virtual queue.

The Temporary Queue Client includes the AmazonSQSVirtualQueuesClient class for creating and managing virtual queues. This class implements the AmazonSQS interface and adds support for attributes related to virtual queues. You can create a virtual queue using this client by calling the CreateQueue API action and including the HostQueueURL queue attribute. This attribute specifies the existing SQS queue on which to host the virtual queue. The queue URL for a virtual queue is in the form <host queue URL>#<virtual queue name>. For example:

https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue#MyVirtualQueueName
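
To make the URL format concrete, here is a minimal Python sketch that splits a virtual queue URL into its host queue URL and virtual queue name. The helper name `split_virtual_queue_url` and the `QueueAddress` type are hypothetical and are not part of the Temporary Queue Client; the sketch only assumes the `<host queue URL>#<virtual queue name>` form described above.

```python
from typing import NamedTuple, Optional


class QueueAddress(NamedTuple):
    host_queue_url: str
    virtual_queue_name: Optional[str]


def split_virtual_queue_url(queue_url: str) -> QueueAddress:
    """Split '<host queue URL>#<virtual queue name>' into its two parts.

    Treating a URL without '#' as a plain host queue is an assumption
    made for this sketch.
    """
    host, separator, name = queue_url.partition("#")
    return QueueAddress(host, name if separator else None)


address = split_virtual_queue_url(
    "https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue#MyVirtualQueueName"
)
print(address.host_queue_url)
print(address.virtual_queue_name)
```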

When you call the SendMessage or SendMessageBatch API actions on AmazonSQSVirtualQueuesClient with a virtual queue URL, the client first extracts the virtual queue name. It then attaches this name as an additional message attribute to each message, and sends the messages to the host queue. When you call the ReceiveMessage API action on a virtual queue, the calling thread waits for messages to appear in the in-memory buffer for the virtual queue. Meanwhile, a background thread polls the host queue and dispatches messages to these buffers according to the additional message attribute.
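
The send and receive flow described above can be sketched with in-memory queues. This is an illustration of the multiplexing idea only, not the client's actual implementation: the class name, the attribute key `VirtualQueueName`, and the explicit `poll_host_queue()` call (which stands in for the background polling thread) are all assumptions made for the example.

```python
import queue
from collections import defaultdict

# Hypothetical attribute key; the real client's key may differ.
VIRTUAL_QUEUE_ATTR = "VirtualQueueName"


class VirtualQueueMultiplexer:
    """In-memory stand-in for one host queue shared by many virtual queues."""

    def __init__(self):
        self.host_queue = queue.Queue()          # stands in for the SQS host queue
        self.buffers = defaultdict(queue.Queue)  # one local buffer per virtual queue

    def send_message(self, virtual_queue_url, body):
        # Extract the virtual queue name and attach it as a message attribute.
        _, _, name = virtual_queue_url.partition("#")
        self.host_queue.put({"Body": body, "Attributes": {VIRTUAL_QUEUE_ATTR: name}})

    def poll_host_queue(self, max_messages=10):
        # One poll of the host queue can carry messages for several virtual queues.
        for _ in range(max_messages):
            try:
                message = self.host_queue.get_nowait()
            except queue.Empty:
                break
            self.buffers[message["Attributes"][VIRTUAL_QUEUE_ATTR]].put(message)

    def receive_message(self, virtual_queue_url, timeout=1.0):
        # The caller waits on the local buffer only, never on the host queue.
        _, _, name = virtual_queue_url.partition("#")
        return self.buffers[name].get(timeout=timeout)["Body"]


mux = VirtualQueueMultiplexer()
host = "https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue"
mux.send_message(host + "#A", "hello A")
mux.send_message(host + "#B", "hello B")
mux.poll_host_queue()
reply_a = mux.receive_message(host + "#A")
reply_b = mux.receive_message(host + "#B")
```

Both messages travel through the same host queue, yet each consumer only sees the messages addressed to its own virtual queue.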

This mechanism is similar to how the AmazonSQSBufferedAsyncClient prefetches messages, and the benefits are similar. A single call to SQS can provide messages for up to 10 virtual queues, reducing the API calls that you pay for by up to a factor of ten. Deleting a virtual queue simply removes the client-side resources used to implement it, again without making API calls to SQS.

The diagram below illustrates the end-to-end process for sending messages through virtual queues:

Sending messages through virtual queues

Virtual queues are similar to virtual machines. Just as virtual machines divide the resources of one physical machine while providing the same experience, virtual queues divide the resources of a single SQS queue into smaller logical queues. This is ideal for temporary queues, since they frequently only receive a handful of messages in their lifetime. Virtual queues are currently implemented entirely within the Temporary Queue Client, but additional support and optimizations might be added to SQS itself in the future.

In most cases, you don’t have to manage virtual queues yourself. The library also includes the AmazonSQSTemporaryQueuesClient class. This class automatically creates virtual queues when the CreateQueue API action is called and creates host queues on demand for all queues with the same queue attributes. To optimize existing application code that creates and deletes queues, you can use this class as a drop-in replacement implementation of the AmazonSQS interface.

The client also includes the AmazonSQSRequester and AmazonSQSResponder interfaces, which enable two-way communication through SQS queues. The following is an example of an RPC implementation for a web application’s login process.

/**
 * This class handles a user's login request on the client side.
 */
public class LoginClient {

    // The SQS queue to send the requests to.
    private final String requestQueueUrl;

    // The AmazonSQSRequester creates a temporary queue for each response.
    private final AmazonSQSRequester sqsRequester = AmazonSQSRequesterClientBuilder.defaultClient();

    public LoginClient(String requestQueueUrl) {
        this.requestQueueUrl = requestQueueUrl;
    }

    /**
     * Send a login request to the server.
     */
    public String login(String body) throws TimeoutException {
        SendMessageRequest request = new SendMessageRequest()
                .withMessageBody(body)
                .withQueueUrl(requestQueueUrl);

        // This:
        //  - creates a temporary queue
        //  - attaches its URL as an attribute on the message
        //  - sends the message
        //  - receives the response from the temporary queue
        //  - deletes the temporary queue
        //  - returns the response
        //
        // If something goes wrong and the server's response never shows up, this method throws a
        // TimeoutException.
        Message response = sqsRequester.sendMessageAndGetResponse(request, 20, TimeUnit.SECONDS);
        
        return response.getBody();
    }
}

/**
 * This class processes users' login requests on the server side.
 */
public class LoginServer {

    // The SQS queue to poll for login requests.
    // Assume that on construction a thread is created to poll this queue and call
    // handleLoginRequest() below for each message.
    private final String requestQueueUrl;

    // The AmazonSQSResponder sends responses to the correct response destination.
    private final AmazonSQSResponder sqsResponder = AmazonSQSResponderClientBuilder.defaultClient();

    public LoginServer(String requestQueueUrl) {
        this.requestQueueUrl = requestQueueUrl;
    }

    /**
     * Handle a login request sent from the client above.
     */
    public void handleLoginRequest(Message message) {
        // Assume doLogin does the actual work, and returns a serialized result
        String response = doLogin(message.getBody());

        // This extracts the URL of the temporary queue from the message attribute and sends the
        // response to that queue.
        sqsResponder.sendResponseMessage(MessageContent.fromMessage(message), new MessageContent(response));  
    }
}

Cleaning up

As with any other AWS SDK client, your code should call the shutdown() method when the temporary queue client is no longer needed. The AmazonSQSRequester interface also provides a shutdown() method, which shuts down its internal temporary queue client. This ensures that the in-memory resources needed for any virtual queues are reclaimed, and that the host queue that the client automatically created is also deleted automatically.

However, in the world of distributed systems things are a little more complex. Processes can run out of memory and crash, and hosts can reboot suddenly and unexpectedly. There are even cases where you don’t have the opportunity to run custom code on shutdown.

The Temporary Queue Client addresses this issue as well. For each host queue with recent API calls, the client periodically uses the TagQueue API action to attach a fresh tag value that indicates the queue is still being used. The tagging process serves as a heartbeat to keep the queue alive. At a configurable interval (by default, every 5 minutes), a background thread uses the ListQueues API action to obtain the URLs of all queues with the configured prefix. Then, it deletes each queue that has not been tagged recently. The mechanism is similar to how the Amazon DynamoDB Lock Client expires stale lock leases.
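
The heartbeat-and-sweep idea can be sketched in a few lines of Python. This is a simplified model, not the client's code: the tag key `LastHeartbeatSeconds`, the 5-minute retention value, and the dictionary that stands in for queue tags are assumptions made for illustration.

```python
import time

# Hypothetical tag key and retention value, chosen for this sketch.
HEARTBEAT_TAG = "LastHeartbeatSeconds"
IDLE_RETENTION_SECONDS = 5 * 60


def heartbeat(queue_tags, now=None):
    """Refresh the heartbeat tag, as a TagQueue call would."""
    timestamp = now if now is not None else time.time()
    queue_tags[HEARTBEAT_TAG] = str(int(timestamp))


def find_idle_queues(all_queue_tags, now, retention=IDLE_RETENTION_SECONDS):
    """Return URLs of queues whose last heartbeat is older than the retention period."""
    idle = []
    for url, tags in all_queue_tags.items():
        last_heartbeat = int(tags.get(HEARTBEAT_TAG, 0))
        if now - last_heartbeat > retention:
            idle.append(url)
    return sorted(idle)


queues = {
    "https://example.invalid/q/active": {},
    "https://example.invalid/q/stale": {},
}
heartbeat(queues["https://example.invalid/q/active"], now=1000)
heartbeat(queues["https://example.invalid/q/stale"], now=100)

# Sweep one minute after the active queue's last heartbeat.
idle = find_idle_queues(queues, now=1060)
```

Only the queue whose heartbeat has lapsed for longer than the retention period is reported as idle and would be deleted by the sweep.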

If you use the AmazonSQSTemporaryQueuesClient directly, you can customize how long queues must be idle before they are deleted by configuring the IdleQueueRetentionPeriodSeconds queue attribute. The client supports setting this attribute on both host queues and virtual queues. For virtual queues, setting this attribute ensures that the in-memory resources do not become a memory leak in the presence of application bugs.

Any API call to a queue marks it as non-idle, including ReceiveMessage calls that don’t return any messages. The only reason to increase the retention period attribute is to give the client more time when it can’t send heartbeats—for example, due to garbage collection pauses or networking issues.

But what if you want to use this client in a fleet of a thousand EC2 instances? Won’t every client spend a lot of time checking every queue for idleness? Doesn’t that imply duplicate work that increases as the fleet is scaled up?

We thought of this too. The Temporary Queue Client creates a shared queue for all clients using the same queue prefix, and uses this queue as a work queue for the distributed task. Instead of every client calling the ListQueues API action every five minutes, a new seed message (which triggers the sweeping process) is sent to this queue every five minutes.

When one of the clients receives this message, it calls the ListQueues API action and sends each queue URL in the result as another kind of message to the same shared work queue. The work of actually checking each queue for idleness is distributed roughly evenly to the active clients, ensuring scalability. There is even a mechanism that works around the fact that the ListQueues API action currently returns no more than 1,000 queue URLs at a time.
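
Here is a rough simulation of how a shared work queue spreads the sweep across clients. It is a sketch under stated assumptions: the message shapes, the `SEED` marker, and the round-robin loop (which stands in for whichever client happens to receive the next message) are hypothetical and do not reflect the client's internal message format.

```python
import queue
from collections import Counter

SEED = "sweep"  # hypothetical marker for the seed message


def run_sweep(client_names, queue_urls):
    """Simulate clients sharing one work queue; return checks done per client."""
    work = queue.Queue()
    work.put((SEED, None))
    checks = Counter()
    turn = 0
    while not work.empty():
        # Round-robin stands in for whichever client receives the next message.
        client = client_names[turn % len(client_names)]
        turn += 1
        kind, url = work.get()
        if kind == SEED:
            # The client that gets the seed lists the queues and fans out the work.
            for listed_url in queue_urls:
                work.put(("check", listed_url))
        else:
            # A real client would inspect the queue's heartbeat tag here.
            checks[client] += 1
    return checks


checks = run_sweep(
    ["client-1", "client-2", "client-3"],
    [f"queue-{i}" for i in range(9)],
)
```

In this run, one client handles the seed message and the nine idleness checks are split three apiece, regardless of which client did the listing.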

Summary

We are excited about how the new Amazon SQS Temporary Queue Client makes more messaging patterns easier and cheaper for you. Download the code from GitHub, have a look at Temporary Queues in the Amazon SQS Developer Guide, try out the client, and let us know what you think!

Configuring user creation workflows with AWS Step Functions and AWS Managed Microsoft AD logs

Post Syndicated from Rachel Richardson original https://aws.amazon.com/blogs/compute/configuring-user-creation-workflows-with-aws-step-functions-and-aws-managed-microsoft-ad-logs/

This post is contributed by Taka Matsumoto, Cloud Support Engineer

AWS Directory Service lets you run Microsoft Active Directory as a managed service. Directory Service for Microsoft Active Directory, also referred to as AWS Managed Microsoft AD, is powered by Microsoft Windows Server 2012 R2. It manages users and makes it easy to integrate with compatible AWS services and other applications. Using the log forwarding feature, you can stay aware of all security events in Amazon CloudWatch Logs. This helps monitor events like the addition of a new user.

When new users are created in your AWS Managed Microsoft AD, you might go through the initial setup workflow manually. However, AWS Step Functions can coordinate new user creation activities into serverless workflows that automate the process. With Step Functions, AWS Lambda can also be used to run code for the automation workflows without provisioning or managing servers.

In this post, I show how to create and trigger a new user creation workflow in Step Functions. This workflow creates a WorkSpace in Amazon WorkSpaces and a user in Amazon Connect using AWS Managed Microsoft AD, Step Functions, Lambda, and Amazon CloudWatch Logs.

Overview

The following diagram shows the solution graphically.

Configuring user creation workflows with AWS Step Functions and AWS Managed Microsoft AD logs

Walkthrough

Using the following procedures, create an automated user creation workflow with AWS Managed Microsoft AD. The solution requires the creation of new resources in CloudWatch, Lambda, and Step Functions, and a new user in Amazon WorkSpaces and Amazon Connect. Here’s the list of steps:

  1. Enable log forwarding.
  2. Create the Lambda functions.
  3. Set up log streaming.
  4. Create a state machine in Step Functions.
  5. Test the solution.

Requirements

To follow along, you need the following resources:

  • AWS Managed Microsoft AD
    • Must be registered with Amazon WorkSpaces
    • Must be registered with Amazon Connect

In this example, you use an Amazon Connect instance with SAML 2.0-based authentication as identity management. For more information, see Configure SAML for Identity Management in Amazon Connect.

Enable log forwarding

Enable log forwarding for your AWS Managed Microsoft AD. Use /aws/directoryservice/<directory id> for the CloudWatch log group name. You use this log group name when setting up log streaming in Step 3.

Create Lambda functions

Create two Lambda functions. The first starts a Step Functions execution with CloudWatch Logs. The second performs a user registration process with Amazon WorkSpaces and Amazon Connect within a Step Functions execution.

Create the first function with the following settings:

  • Name: DS-Log-Stream-Function
  • Runtime: Python 3.7
  • Memory: 128 MB
  • Timeout: 3 seconds
  • Environment variables:
    • Key: stateMachineArn
    • Value: arn:aws:states:<Region>:<AccountId>:stateMachine:NewUserWorkFlow
  • IAM role with the following permissions:
    • AWSLambdaBasicExecutionRole
    • The following permissions policy
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": "states:StartExecution",
            "Resource": "*"
        }
    ]
}

Use the following function code:

import base64
import boto3
import gzip
import json
import re
import os
def lambda_handler(event, context):
    logEvents = DecodeCWPayload(event)
    print('Event payload:', logEvents)
    returnResultDict = []
    
    # Because there can be more than one message pushed in a single payload, use a for loop to start a workflow for every user
    for logevent in logEvents:
        logMessage = logevent['message']
        # Raw string avoids invalid escape sequences in the pattern
        upnMessage = re.search(r"(<Data Name='UserPrincipalName'>)(.*?)(</Data>)", logMessage)
        if upnMessage is not None:
            upn = upnMessage.group(2).lower()
            userNameAndDomain = upn.split('@')
            userName = userNameAndDomain[0]
            domainName = userNameAndDomain[1]
            sfnInputDict = {'Username': userName, 'UPN': upn, 'DomainName': domainName}
            sfnResponse = StartSFNExecution(json.dumps(sfnInputDict))
            print('Username:',upn)
            print('Execution ARN:', sfnResponse['executionArn'])
            print('Execution start time:', sfnResponse['startDate'])
            returnResultDict.append({'Username': upn, 'ExecutionArn': sfnResponse['executionArn'], 'Time': str(sfnResponse['startDate'])})

    returnObject = {'Result':returnResultDict}
    return {
        'statusCode': 200,
        'body': json.dumps(returnObject)
    }

# Helper function to decode the payload
def DecodeCWPayload(payload):
    # CloudWatch Log Stream event 
    cloudWatchLog = payload['awslogs']['data']
    # Base 64 decode the log 
    base64DecodedValue = base64.b64decode(cloudWatchLog)
    # Uncompress the gzipped decoded value
    gunzipValue = gzip.decompress(base64DecodedValue)
    dictPayload = json.loads(gunzipValue)
    decodedLogEvents = dictPayload['logEvents']
    return decodedLogEvents

# Step Functions state machine execution function
def StartSFNExecution(sfnInput):
    sfnClient = boto3.client('stepfunctions')
    try:
        response = sfnClient.start_execution(
            stateMachineArn=os.environ['stateMachineArn'],
            input=sfnInput
        )
        return response
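    except Exception as e:
        # Log and re-raise so the handler never treats the exception as a response
        print('Failed to start Step Functions execution:', e)
        raise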
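
To try the decoding logic locally before wiring up the subscription filter, you can build a test event in the same shape. The sketch below is a simplified stand-in: real CloudWatch Logs deliveries carry additional fields alongside `logEvents`, and the sample message and the helper names `encode_cw_payload` and `decode_cw_payload` are made up for illustration.

```python
import base64
import gzip
import json
import re


def encode_cw_payload(log_events):
    """Build an event shaped like a CloudWatch Logs subscription delivery."""
    data = json.dumps({"logEvents": log_events}).encode("utf-8")
    return {"awslogs": {"data": base64.b64encode(gzip.compress(data)).decode("ascii")}}


def decode_cw_payload(event):
    """Reverse the encoding: base64-decode, gunzip, then parse the JSON."""
    raw = gzip.decompress(base64.b64decode(event["awslogs"]["data"]))
    return json.loads(raw)["logEvents"]


# Hypothetical, heavily shortened security log message.
sample_message = (
    "<EventID>4720</EventID>"
    "<Data Name='UserPrincipalName'>Jane.Doe@example.com</Data>"
)
event = encode_cw_payload([{"id": "1", "timestamp": 0, "message": sample_message}])
decoded = decode_cw_payload(event)

# Extract the user principal name the same way the handler does.
match = re.search(r"<Data Name='UserPrincipalName'>(.*?)</Data>", decoded[0]["message"])
upn = match.group(1).lower()
user_name, domain_name = upn.split("@")
```

Passing `event` to the handler in a local test exercises the decoding and parsing paths; the Step Functions call itself still needs valid credentials and a real state machine ARN.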

For the other function used to perform a user creation task, use the following settings:

  • Name: SFN-New-User-Flow
  • Runtime: Python 3.7
  • Memory: 128 MB
  • Timeout: 3 seconds
  • Environment variables:
    • Key: nameDelimiter
    • Value: . [period]

This delimiter is used to split the username into a first name and last name, as Amazon Connect instances with SAML-based authentication require both a first name and last name for users. For more information, see CreateUser API and UserIdentity Info.

  • Key: bundleId
  • Value: <WorkSpaces bundle ID>

Run the following AWS CLI command to return Amazon-owned WorkSpaces bundles. Use one of the bundle IDs for the key-value pair.

aws workspaces describe-workspace-bundles --owner AMAZON

  • Key: directoryId
  • Value: <WorkSpaces directory ID>

Run the following AWS CLI command to return Amazon WorkSpaces directories. Use your directory ID for the key-value pair.

aws workspaces describe-workspace-directories

  • Key: instanceId
  • Value: <Amazon Connect instance ID>

Find the instance ID in the Amazon Connect console; it is the last segment of the instance ARN.

  • Key: routingProfile
  • Value: <Amazon Connect routing profile>

Run the following AWS CLI command to list routing profiles with their IDs. For this walkthrough, use the ID for the basic routing profile.

aws connect list-routing-profiles --instance-id <instance id>

  • Key: securityProfile
  • Value: <Amazon Connect security profile>

Run the following AWS CLI command to list security profiles with their IDs. For this walkthrough, use the ID for an agent security profile.

aws connect list-security-profiles --instance-id <instance id>

  • IAM role permissions:
    • AWSLambdaBasicExecutionRole

The following permissions policy:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "connect:CreateUser",
                "workspaces:CreateWorkspaces"
            ],
            "Resource": "*"
        }
    ]
}

Use the following function code:

import json
import os
import boto3

def lambda_handler(event, context):
    userName = event['input']['User']
    nameDelimiter = os.environ['nameDelimiter']
    if nameDelimiter in userName:
        firstName = userName.split(nameDelimiter)[0]
        lastName = userName.split(nameDelimiter)[1]
    else:
        firstName = userName
        lastName = userName
    domainName = event['input']['Domain']
    upn = event['input']['UPN']
    serviceName = event['input']['Service']
    if serviceName == 'WorkSpaces':
        # Setting WorkSpaces variables
        workspacesDirectoryId = os.environ['directoryId']
        workspacesUsername = upn
        workspacesBundleId = os.environ['bundleId']
        createNewWorkSpace = create_new_workspace(
            directoryId=workspacesDirectoryId,
            username=workspacesUsername,
            bundleId=workspacesBundleId
        )
        return createNewWorkSpace
    elif serviceName == 'Connect':
        createConnectUser = create_connect_user(
            connectUsername=upn,
            connectFirstName=firstName,
            connectLastName=lastName,
            securityProfile=os.environ['securityProfile'], 
            routingProfile=os.environ['routingProfile'], 
            instanceId=os.environ['instanceId']
        )
        return createConnectUser
    else:
        print(serviceName, 'is not recognized...')
        print('Available service names are WorkSpaces and Connect')
        unknownServiceException = {
            'statusCode': 500,
            'body': json.dumps(f'Service name, {serviceName}, is not recognized')}
        raise Exception(unknownServiceException)

class FailedWorkSpaceCreationException(Exception):
    pass

class WorkSpaceResourceExists(Exception):
    pass

def create_new_workspace(directoryId, username, bundleId):
    workspacesClient = boto3.client('workspaces')
    response = workspacesClient.create_workspaces(
        Workspaces=[{
                'DirectoryId': directoryId,
                'UserName': username,
                'BundleId': bundleId,
                'WorkspaceProperties': {
                    'RunningMode': 'AUTO_STOP',
                    'RunningModeAutoStopTimeoutInMinutes': 60,
                    'RootVolumeSizeGib': 80,
                    'UserVolumeSizeGib': 100,
                    'ComputeTypeName': 'VALUE'
                    }}]
                    )
    print('create_workspaces response:',response)
    for pendingRequest in response['PendingRequests']:
        if pendingRequest['UserName'] == username:
            workspacesResultObject = {'UserName':username, 'ServiceName':'WorkSpaces', 'Status': 'Success'}
            return {
                'statusCode': 200,
                'body': json.dumps(workspacesResultObject)
                }
    for failedRequest in response['FailedRequests']:
        if failedRequest['WorkspaceRequest']['UserName'] == username:
            errorCode = failedRequest['ErrorCode']
            errorMessage = failedRequest['ErrorMessage']
            errorResponse = {'Error Code': errorCode, 'Error Message': errorMessage}
            if errorCode == "ResourceExists.WorkSpace": 
                raise WorkSpaceResourceExists(str(errorResponse))
            else:
                raise FailedWorkSpaceCreationException(str(errorResponse))
                
def create_connect_user(connectUsername, connectFirstName,connectLastName,securityProfile,routingProfile,instanceId):
    connectClient = boto3.client('connect')
    response = connectClient.create_user(
                    Username=connectUsername,
                    IdentityInfo={
                        'FirstName': connectFirstName,
                        'LastName': connectLastName
                        },
                    PhoneConfig={
                        'PhoneType': 'SOFT_PHONE',
                        'AutoAccept': False,
                        },
                    SecurityProfileIds=[
                        securityProfile,
                        ],
                    RoutingProfileId=routingProfile,
                    InstanceId = instanceId
                    )
    connectSuccessResultObject = {'UserName':connectUsername,'ServiceName':'Connect','FirstName': connectFirstName, 'LastName': connectLastName,'Status': 'Success'}
    return {
        'statusCode': 200,
        'body': json.dumps(connectSuccessResultObject)
        }

Set up log streaming

Create a new CloudWatch Logs subscription filter that sends log data to the Lambda function DS-Log-Stream-Function created in Step 2.

  1. In the CloudWatch console, choose Logs, Log Groups, and select the log group, /aws/directoryservice/<directory id>, for the directory set up in Step 1.
  2. Choose Actions, Stream to AWS Lambda.
  3. Choose Destination, and select the Lambda function DS-Log-Stream-Function.
  4. For Log format, choose Other and enter "<EventID>4720</EventID>" as the filter pattern (include the double quotes).
  5. Choose Start streaming.

If there is an existing subscription filter for the log group, run the following AWS CLI command to create a subscription filter for the Lambda function, DS-Log-Stream-Function.

aws logs put-subscription-filter \
--log-group-name /aws/directoryservice/<directoryid> \
--filter-name NewUser \
--filter-pattern "<EventID>4720</EventID>" \
--destination-arn arn:aws:lambda:<Region>:<ACCOUNT_NUMBER>:function:DS-Log-Stream-Function

For more information, see Using CloudWatch Logs Subscription Filters.

Create a state machine in Step Functions

The next step is to create a state machine in Step Functions. This state machine runs the Lambda function, SFN-New-User-Flow, to create a user in Amazon WorkSpaces and Amazon Connect.

Define the state machine, using the following settings:

  • Name: NewUserWorkFlow
  • State machine definition: Copy the following state machine definition:
{
    "Comment": "An example state machine for a new user creation workflow",
    "StartAt": "Parallel",
    "States": {
        "Parallel": {
            "Type": "Parallel",
            "End": true,
            "Branches": [
                {
                    "StartAt": "CreateWorkSpace",
                    "States": {
                        "CreateWorkSpace": {
                            "Type": "Task",
                            "Parameters": {
                                "input": {
                                    "User.$": "$.Username",
                                    "UPN.$": "$.UPN",
                                    "Domain.$": "$.DomainName",
                                    "Service": "WorkSpaces"
                                }
                            },
                            "Resource": "arn:aws:lambda:{region}:{account id}:function:SFN-New-User-Flow",
                            "Retry": [
                                {
                                    "ErrorEquals": [
                                        "WorkSpaceResourceExists"
                                    ],
                                    "IntervalSeconds": 1,
                                    "MaxAttempts": 0,
                                    "BackoffRate": 1
                                },
                                {
                                    "ErrorEquals": [
                                        "States.ALL"
                                    ],
                                    "IntervalSeconds": 10,
                                    "MaxAttempts": 2,
                                    "BackoffRate": 2
                                }
                            ],
                            "Catch": [
                                {
                                    "ErrorEquals": [
                                        "WorkSpaceResourceExists"
                                    ],
                                    "ResultPath": "$.workspacesResult",
                                    "Next": "WorkSpacesPassState"
                                },
                                {
                                    "ErrorEquals": [
                                        "States.ALL"
                                    ],
                                    "ResultPath": "$.workspacesResult",
                                    "Next": "WorkSpacesPassState"
                                }
                            ],
                            "End": true
                        },
                        "WorkSpacesPassState": {
                            "Type": "Pass",
                            "Parameters": {
                                "Result.$": "$.workspacesResult"
                            },
                            "End": true
                        }
                    }
                },
                {
                    "StartAt": "CreateConnectUser",
                    "States": {
                        "CreateConnectUser": {
                            "Type": "Task",
                            "Parameters": {
                                "input": {
                                    "User.$": "$.Username",
                                    "UPN.$": "$.UPN",
                                    "Domain.$": "$.DomainName",
                                    "Service": "Connect"
                                }
                            },
                            "Resource": "arn:aws:lambda:{region}:{account id}:function:SFN-New-User-Flow",
                            "Retry": [
                                {
                                    "ErrorEquals": [
                                        "DuplicateResourceException"
                                    ],
                                    "IntervalSeconds": 1,
                                    "MaxAttempts": 0,
                                    "BackoffRate": 1
                                },
                                {
                                    "ErrorEquals": [
                                        "States.ALL"
                                    ],
                                    "IntervalSeconds": 10,
                                    "MaxAttempts": 2,
                                    "BackoffRate": 2
                                }
                            ],
                            "Catch": [
                                {
                                    "ErrorEquals": [
                                        "DuplicateResourceException"
                                    ],
                                    "ResultPath": "$.connectResult",
                                    "Next": "ConnectPassState"
                                },
                                {
                                    "ErrorEquals": [
                                        "States.ALL"
                                    ],
                                    "ResultPath": "$.connectResult",
                                    "Next": "ConnectPassState"
                                }
                            ],
                            "End": true,
                            "ResultPath": "$.connectResult"
                        },
                        "ConnectPassState": {
                            "Type": "Pass",
                            "Parameters": {
                                "Result.$": "$.connectResult"
                            },
                            "End": true
                        }
                    }
                }
            ]
        }
    }
}

After entering the name and state machine definition, choose Next.

Configure the settings by choosing Create an IAM role for me. This creates an IAM role for the state machine to run the Lambda function SFN-New-User-Flow.

Here’s the list of states in the NewUserWorkFlow state machine definition:

  • Start—When the state machine starts, it creates a parallel state to start both the CreateWorkSpace and CreateConnectUser states.
  • CreateWorkSpace—This task state runs the SFN-New-User-Flow Lambda function to create a new WorkSpace for the user. If this is successful, it goes to the End state.
  • WorkSpacesPassState—This pass state returns the result from the CreateWorkSpace state.
  • CreateConnectUser—This task state runs the SFN-New-User-Flow Lambda function to create a user in Amazon Connect. If this is successful, it goes to the End state.
  • ConnectPassState—This pass state returns the result from the CreateConnectUser state.
  • End

The following diagram shows how these states relate to each other.

Step Functions State Machine

Test the solution

It’s time to test the solution. Create a user in AWS Managed Microsoft AD. The new user should have the following attributes:

This starts a new state machine execution in Step Functions. Here’s the flow:

  1. When there is a user creation event (Event ID: 4720) in the AWS Managed Microsoft AD security log, CloudWatch invokes the Lambda function, DS-Log-Stream-Function, to start a new state machine execution in Step Functions.
  2. To create a new WorkSpace and create a user in the Amazon Connect instance, the state machine execution runs tasks to invoke the other Lambda function, SFN-New-User-Flow.

Conclusion

This solution automates the initial user registration workflow. Step Functions provides the flexibility to customize the workflow to meet your needs. This walkthrough included Amazon WorkSpaces and Amazon Connect; both services are used to register the new user. For organizations that create a number of new users on a regular basis, this new user automation workflow can save time when configuring resources for a new user.

The event source of the automation workflow can be any event that triggers the new user workflow, so the event source isn’t limited to CloudWatch Logs. Also, the integrated service used for new user registration can be any AWS service that offers an API and works with AWS Managed Microsoft AD. Other programmatically accessible services within or outside AWS can also fill that role.

In this post, I showed you how serverless workflows can streamline and coordinate user creation activities. Step Functions provides this functionality, with the help of Lambda, Amazon WorkSpaces, AWS Managed Microsoft AD, and Amazon Connect. Together, these services offer increased power and functionality when managing users, monitoring security, and integrating with compatible AWS services.

Getting started with serverless

Post Syndicated from Rachel Richardson original https://aws.amazon.com/blogs/compute/getting-started-with-serverless/

This post is contributed by Maureen Lonergan, Director, AWS Training and Certification

We consistently hear from customers that they’re interested in building serverless applications to take advantage of the increased agility and decreased total cost of ownership (TCO) that serverless delivers. But we also know that serverless may be intimidating for those who are more accustomed to using instances or containers for compute.

Since we launched AWS Lambda in 2014, our serverless portfolio has expanded beyond event-driven computing. We now have serverless databases, integration, and orchestration tools. This enables you to build end-to-end serverless applications—but it also means that you must learn how to build using a new serverless operational model.

For this reason, AWS Training and Certification is pleased to offer a new course through Coursera entitled AWS Fundamentals: Building Serverless Applications.

This scenario-based course, developed by the experts at AWS, will:

  • Introduce the AWS serverless framework and architecture in the context of a real business problem.
  • Provide the foundational knowledge to become more proficient in choosing and creating serverless solutions using AWS.
  • Provide demonstrations of the AWS services needed for deploying serverless solutions.
  • Help you develop skills in building and deploying serverless solutions using real-world examples of a serverless website and chatbot.

The syllabus allocates more than nine hours of video content and reading material over four weekly lessons. Each lesson has an estimated 2–3 hours per week of study time (though you can set your own pace and deadlines), with suggested exercises in the AWS Management Console. There is an end-of-course assessment that covers all the learning objectives and content.

The course is on-demand and 100% digital; you can even audit it for free. A completion certificate and access to the graded assessments are available for $49.

What can you expect?

In this course you will learn to use the AWS Serverless portfolio to create a chatbot that answers the question, “Can I let my cat outside?” You will build an application using every one of the concepts and services discussed in the class, including:

At the end of the class, you can audibly interact with the application to ask that essential question, “Can my cat go out in Denver?” (See the conversation in the following screenshot.)

Serverless Coursera training app

Across the four weeks of the course, you learn:

  1. What serverless computing is and how to create a chatbot with Amazon Lex using an S3 bucket to host a web application.
  2. How to build a highly scalable API with API Gateway and use Amazon CloudFront as a content delivery network (CDN) for your site and API.
  3. How to use Lambda to build serverless functions that write data to DynamoDB.
  4. How to apply lessons from the previous weeks to extend and add functionality to the chatbot.

Serverless Coursera training

AWS Fundamentals: Building Serverless Applications is now available. This course complements other standalone digital courses by AWS Training and Certification. They include the highly recommended Introduction to Serverless Development, as well as the following:

Enriching Event-Driven Architectures with AWS Event Fork Pipelines

Post Syndicated from Rachel Richardson original https://aws.amazon.com/blogs/compute/enriching-event-driven-architectures-with-aws-event-fork-pipelines/

This post is courtesy of Otavio Ferreira, Mgr, Amazon SNS, and James Hood, Sr. Software Dev Engineer

Many customers are choosing to build event-driven applications in which subscriber services automatically perform work in response to events triggered by publisher services. This architectural pattern can make services more reusable, interoperable, and scalable.

Customers often fork event processing into pipelines that address common event handling requirements, such as event storage, backup, search, analytics, or replay. To help you build event-driven applications even faster, AWS introduces Event Fork Pipelines, a collection of open-source event handling pipelines that you can subscribe to Amazon SNS topics in your AWS account.

Event Fork Pipelines is a suite of open-source nested applications, based on the AWS Serverless Application Model (AWS SAM). You can deploy it directly from the AWS Serverless Application Repository into your AWS account.

Event Fork Pipelines is built on top of serverless services, including Amazon SNS, Amazon SQS, and AWS Lambda. These services provide serverless building blocks that help you build fully managed, highly available, and scalable event-driven platforms. Lambda enables you to build event-driven microservices as serverless functions. SNS and SQS provide serverless topics and queues for integrating these microservices and other distributed systems in your architecture. These building blocks are at the core of modern application development best practices.

Surfacing the event fork pattern

At AWS, we’ve worked closely with customers across market segments and geographies on event-driven architectures. For example:

  • Financial platforms that handle events related to bank transactions and stock ticks
  • Retail platforms that trigger checkout and fulfillment events

At scale, event-driven architectures often require a set of supporting services to address common requirements such as system auditability, data discoverability, compliance, business insights, and disaster recovery. Translated to AWS, customers often connect event-driven applications to services such as Amazon S3 for event storage and backup, and to Amazon Elasticsearch Service for event search and analytics. Also, customers often implement an event replay mechanism to recover from failure modes in their applications.

AWS created Event Fork Pipelines to encapsulate these common requirements, reducing the amount of effort required for you to connect your event-driven architectures to these supporting AWS services.

AWS then started sharing this pattern more broadly, so more customers could benefit. At the 2018 AWS re:Invent conference in Las Vegas, Amazon CTO Werner Vogels announced the launch of nested applications in his keynote. Werner shared the Event Fork Pipelines pattern with the audience as an example of common application logic that had been encapsulated as a set of nested applications.

The following reference architecture diagram shows an application supplemented by three nested applications:

Each pipeline is subscribed to the same SNS topic, and can process events in parallel as these events are published to the topic. Each pipeline is independent and can set its own subscription filter policy. That way, it processes only the subset of events that it’s interested in, rather than all events published to the topic.

Amazon SNS Fork pipelines reference architecture

Figure 1 – Reference architecture using Event Fork Pipelines

The three event fork pipelines are placed alongside your regular event processing pipelines, which are potentially already subscribed to your SNS topic. Therefore, you don’t have to change any portion of your current message publisher to take advantage of Event Fork Pipelines in your existing workloads. The following sections describe these pipelines and how to deploy them in your system architecture.

Understanding the catalog of event fork pipelines

In the abstract, Event Fork Pipelines is a serverless design pattern. Concretely, Event Fork Pipelines is also a suite of nested serverless applications, based on AWS SAM. You deploy the nested applications directly from the AWS Serverless Application Repository to your AWS account, to enrich your event-driven platforms. You can deploy them individually in your architecture, as needed.

Here’s more information about each nested application in the Event Fork Pipelines suite.

Event Storage & Backup pipeline

Figure 2 – Event Fork Pipeline for Event Storage & Backup

The preceding diagram shows the Event Storage & Backup pipeline. You can subscribe this pipeline to your SNS topic to automatically back up the events flowing through your system. This pipeline is composed of the following resources:

  • An SQS queue that buffers the events delivered by the SNS topic
  • A Lambda function that automatically polls for these events in the queue and pushes them into an Amazon Kinesis Data Firehose delivery stream
  • An S3 bucket that durably backs up the events loaded by the stream

You can configure this pipeline to fine-tune the behavior of your delivery stream. For example, you can configure your pipeline so that the underlying delivery stream buffers, transforms, and compresses your events before loading them into the bucket. As events are loaded, you can use Amazon Athena to query the bucket using standard SQL queries. Also, you can configure the pipeline to either reuse an existing S3 bucket or create a new one for you.

Event Search & Analytics pipeline

Figure 3 – Event Fork Pipeline for Event Search & Analytics

The preceding diagram shows the Event Search & Analytics pipeline. You can subscribe this pipeline to your SNS topic to index in a search domain the events flowing through your system, and then run analytics on them. This pipeline is composed of the following resources:

  • An SQS queue that buffers the events delivered by the SNS topic
  • A Lambda function that polls events from the queue and pushes them into a Data Firehose delivery stream
  • An Amazon ES domain that indexes the events loaded by the delivery stream
  • An S3 bucket that stores the dead-letter events that couldn’t be indexed in the search domain

You can configure this pipeline to fine-tune your delivery stream in terms of event buffering, transformation and compression. You can also decide whether the pipeline should reuse an existing Amazon ES domain in your AWS account or create a new one for you. As events are indexed in the search domain, you can use Kibana to run analytics on your events and update visual dashboards in real time.

Event Replay pipeline

Figure 4 – Event Fork Pipeline for Event Replay

The preceding diagram shows the Event Replay pipeline. You can subscribe this pipeline to your SNS topic to record the events that have been processed by your system for up to 14 days. You can then reprocess them in case your platform is recovering from a failure or a disaster. This pipeline is composed of the following resources:

  • An SQS queue that buffers the events delivered by the SNS topic
  • A Lambda function that polls events from the queue and redrives them into your regular event processing pipeline, which is also subscribed to your topic

By default, the replay function is disabled, which means it isn’t redriving your events. If the events need to be reprocessed, your operators must enable the replay function.

Applying event fork pipelines in a use case

This is how everything comes together. The following scenario describes an event-driven, serverless ecommerce application that uses the Event Fork Pipelines pattern. This example ecommerce application is available in AWS Serverless Application Repository. You can deploy it to your AWS account using the Lambda console, test it, and look at its source code in GitHub.

Figure 5 – Example ecommerce application using Event Fork Pipelines

The ecommerce application takes orders from buyers through a RESTful API hosted by Amazon API Gateway and backed by a Lambda function named CheckoutFunction. This function publishes all orders received to an SNS topic named CheckoutEventsTopic, which in turn fans out the orders to four different pipelines. The first pipeline is the regular checkout-processing pipeline designed and implemented by you as the ecommerce application owner. This pipeline has the following resources:

  • An SQS queue named CheckoutQueue that buffers all orders received
  • A Lambda function named CheckoutFunction that polls the queue to process these orders
  • An Amazon DynamoDB table named CheckoutTable that securely saves all orders as they’re placed

The components of the system described thus far handle what you might think of as the core business logic. But in addition, you should address the set of elements necessary for making the system resilient, compliant, and searchable:

  • Backing up all orders securely. Compressed backups must be encrypted at rest, with sensitive payment details removed for security and compliance purposes.
  • Searching and running analytics on orders, if the amount is $100 or more. Analytics are needed for key ecommerce metrics, such as average ticket size, average shipping time, most popular products, and preferred payment options.
  • Replaying recent orders. If the fulfillment process is disrupted at any point, you should be able to replay orders from up to the past two weeks. This is a key requirement that guarantees the continuity of the ecommerce business.

Rather than implementing all the event processing logic yourself, you can choose to subscribe Event Fork Pipelines to your existing SNS topic CheckoutEventsTopic. The pipelines are configured as follows:

  • The Event Storage & Backup pipeline is configured to transform data as follows:
    • Remove credit card details
    • Buffer data for 60 seconds
    • Compress data using GZIP
    • Encrypt data using the default customer master key (CMK) for S3

This CMK is managed by AWS and powered by AWS Key Management Service (AWS KMS). For more information, see Choosing Amazon S3 for Your Destination, Data Transformation, and Configuration Settings in the Amazon Kinesis Data Firehose Developer Guide.

  • The Event Search & Analytics pipeline is configured with:
    • An index retry duration of 30 seconds
    • A bucket for storing orders that failed to be indexed in the search domain
    • A filter policy to restrict the set of orders that are indexed

For more information, see Choosing Amazon ES for Your Destination, in the Amazon Kinesis Data Firehose Developer Guide.

  • The Event Replay pipeline is configured with the SQS queue name that is part of the regular checkout processing pipeline. For more information, see Queue Name and URL in the Amazon SQS Developer Guide.

The filter policy, shown in JSON format, is set in the configuration for the Event Search & Analytics pipeline. This filter policy matches only incoming orders in which the total amount is $100 or more. For more information, see Message Filtering in the Amazon SNS Developer Guide.


{
    "amount": [
        { "numeric": [ ">=", 100 ] }
    ]
}
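To make the effect of this policy concrete, here is a minimal Java sketch of the matching rule it expresses. It is an illustration only: the class and method names are hypothetical, it does not call Amazon SNS, and it assumes the publisher sets amount as a numeric message attribute on each order so that the policy has something to match against.

```java
import java.util.List;
import java.util.stream.Collectors;

// Illustrative sketch only: mimics the rule { "numeric": [ ">=", 100 ] }.
// NumericFilterSketch is a hypothetical name, not part of any AWS SDK.
class NumericFilterSketch {

    // Returns true when the order amount satisfies: amount >= threshold.
    static boolean matches(double amount, double threshold) {
        return amount >= threshold;
    }

    // Keeps only the order amounts that the policy would let through.
    static List<Double> filter(List<Double> amounts, double threshold) {
        return amounts.stream()
                .filter(amount -> matches(amount, threshold))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Double> orderAmounts = List.of(25.0, 100.0, 249.99, 99.99);
        System.out.println(filter(orderAmounts, 100)); // prints [100.0, 249.99]
    }
}
```

In this sketch, an order of exactly $100 is delivered to the pipeline, while an order of $99.99 is not.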

By using the Event Fork Pipelines pattern, you avoid the development overhead associated with coding undifferentiated logic for handling events.

Event Fork Pipelines can be deployed directly from AWS Serverless Application Repository into your AWS account.

Deploying event fork pipelines

Event Fork Pipelines is available as a set of public apps in the AWS Serverless Application Repository (to find the apps, select the ‘Show apps that create custom IAM roles or resource policies’ check box under the search bar). It can be deployed and tested manually via the Lambda console. In a production scenario, we recommend embedding fork pipelines within the AWS SAM template of your overall application. The nested applications feature enables you to do this by adding an AWS::Serverless::Application resource to your AWS SAM template. The resource references the ApplicationId and SemanticVersion values of the application to nest.

For example, you can include the Event Storage & Backup pipeline as a nested application by adding the following YAML snippet to the Resources section of your AWS SAM template:


Backup:
  Type: AWS::Serverless::Application
  Properties:
    Location:
      ApplicationId: arn:aws:serverlessrepo:us-east-1:012345678901:applications/fork-event-storage-backup-pipeline
      SemanticVersion: 1.0.0
    Parameters:
      # SNS topic ARN whose messages should be backed up to the S3 bucket.
      TopicArn: !Ref MySNSTopic

When specifying parameter values, you can use AWS CloudFormation intrinsic functions to reference other resources in your template. In the preceding example, the TopicArn parameter is filled in by referencing an AWS::SNS::Topic called MySNSTopic, defined elsewhere in the AWS SAM template. For more information, see Intrinsic Function Reference in the AWS CloudFormation User Guide.

To copy the YAML required for nesting, in the Lambda console page for an AWS Serverless Application Repository application, choose Copy as SAM Resource.

Authoring new event fork pipelines

We invite you to fork the Event Fork Pipelines repository in GitHub and submit pull requests to contribute new pipelines. In addition to event storage and backup, event search and analytics, and event replay, what other common event handling requirements have you seen?

We look forward to seeing what you’ll come up with for extending the Event Fork Pipelines suite.

Summary

Event Fork Pipelines is a serverless design pattern and a suite of open-source nested serverless applications, based on AWS SAM. You can deploy it directly from AWS Serverless Application Repository to enrich your event-driven system architecture. Event Fork Pipelines lets you store, back up, replay, search, and run analytics on the events flowing through your system. There’s no need to write code, manually stitch resources together, or set up infrastructure.

You can deploy Event Fork Pipelines in any AWS Region that supports the underlying AWS services used in the pipelines. There are no additional costs associated with Event Fork Pipelines itself, and you pay only for using the AWS resources inside each nested application.

Get started today by deploying the example ecommerce application or searching for Event Fork Pipelines in AWS Serverless Application Repository.

Implementing enterprise integration patterns with AWS messaging services: point-to-point channels

Post Syndicated from Rachel Richardson original https://aws.amazon.com/blogs/compute/implementing-enterprise-integration-patterns-with-aws-messaging-services-point-to-point-channels/

This post is courtesy of Christian Mueller, Sr. Solutions Architect, AWS and Dirk Fröhner, Sr. Solutions Architect, AWS

At AWS, we see our customers increasingly moving toward managed services to reduce the time and money that they spend managing infrastructure. This also applies to the messaging domain, where AWS provides a collection of managed services.

Asynchronous messaging is a fundamental approach for integrating independent systems or building up a set of loosely coupled systems that can scale and evolve independently and flexibly. The well-known collection of enterprise integration patterns (EIPs) provides a “technology-independent vocabulary” to “design and document integration solutions.” This post is the first of two that describe how you can implement the core EIPs using AWS messaging services. Let’s first look at the relevant AWS messaging services.

When organizations migrate their traditional messaging and existing applications to the cloud gradually, they usually want to do it without rewriting their code. Amazon MQ is a managed message broker service for Apache ActiveMQ that makes it easy to set up and operate message brokers in the cloud. It supports industry-standard APIs and protocols such as JMS, AMQP, and MQTT, so you can switch from any standards-based message broker to Amazon MQ without rewriting the messaging code in your applications. Amazon MQ is recommended if you’re using messaging with existing applications and want to move your messaging to the cloud without rewriting existing code.

However, if you build new applications for the cloud, we recommend that you consider using cloud-native messaging services such as Amazon SQS and Amazon SNS. These serverless, fully managed message queue and topic services scale to meet your demands and provide simple, easy-to-use APIs. You can use Amazon SQS and Amazon SNS to decouple and scale microservices, distributed systems, and serverless applications and improve overall reliability.

This blog looks at the first part of some fundamental integration patterns. We describe the patterns and apply them to these AWS messaging services. This will help you apply the right pattern to your use case and architect for scale in a secure and cost-efficient manner. For all variants, we employ both traditional and cloud-native messaging services: Amazon MQ for the former and Amazon SQS and Amazon SNS for the latter.

Integration Patterns

Let’s start with some fundamental integration patterns.

Message exchange patterns

First, we inspect the two major message exchange patterns: one-way and request-response.

One-way messaging

With one-way messaging, a message producer (sender) sends out a message to a messaging channel and doesn’t expect or want a response from whatever process (receiver) consumes the message. Examples of one-way messaging include a data transfer and a notification about an event that happened.

Request-response messaging

With request-response messaging, a message producer (requester) sends out a message: for example, a command to instruct the responder to execute something. The requester expects a response from each message consumer (responder) who received that message, likely to know what the result of all executions was. To know where to send the response message to, the request message contains a return address that the responder uses. To make sure that the requester can assign an incoming response to a request, the requester adds a correlation identifier to the request, which the responders echo in their responses.
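The roles of the return address and the correlation identifier can be sketched in plain Java before bringing in a real broker. The following is a minimal in-memory illustration, not an AWS or JMS implementation: the queues are simulated with BlockingQueue, and all names (RequestResponseSketch, roundTrip, the queue names) are hypothetical.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// In-memory sketch of request-response messaging; no broker is involved.
class RequestResponseSketch {

    // A message carries a body, a correlation ID, and (for requests) a return address.
    static final class Message {
        final String body;
        final String correlationId;
        final String replyTo;

        Message(String body, String correlationId, String replyTo) {
            this.body = body;
            this.correlationId = correlationId;
            this.replyTo = replyTo;
        }
    }

    // Named in-memory queues stand in for the messaging channels.
    private static final Map<String, BlockingQueue<Message>> QUEUES = new ConcurrentHashMap<>();

    static BlockingQueue<Message> queue(String name) {
        return QUEUES.computeIfAbsent(name, n -> new LinkedBlockingQueue<>());
    }

    // Responder: consumes one request, echoes its correlation ID,
    // and sends the response to the return address named in the request.
    static void respondOnce(String requestQueue) {
        try {
            Message request = queue(requestQueue).take();
            queue(request.replyTo).put(
                    new Message("Result of " + request.body, request.correlationId, null));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Requester: sends a request, then waits for a response with the same correlation ID.
    static String request(String requestQueue, String responseQueue, String body) {
        String correlationId = UUID.randomUUID().toString();
        try {
            queue(requestQueue).put(new Message(body, correlationId, responseQueue));
            Message response = queue(responseQueue).poll(5, TimeUnit.SECONDS);
            if (response == null || !correlationId.equals(response.correlationId)) {
                throw new IllegalStateException("No matching response received");
            }
            return response.body;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for a response", e);
        }
    }

    // Runs one responder thread and one request against the simulated queues.
    static String roundTrip(String body) {
        Thread responder = new Thread(() -> respondOnce("requests"));
        responder.start();
        return request("requests", "responses", body);
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("command-1")); // prints "Result of command-1"
    }
}
```

The responder never hard-codes where to reply; it reads the return address from the request. The requester, in turn, accepts a response only if it carries the correlation ID that the requester generated.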

Messaging channels: point-to-point

Next, we look at the point-to-point messaging channel, one of the most important patterns for messaging channels. We cover publish-subscribe channels in our second post.

A point-to-point channel is usually implemented by message queues. Message queues operate so that any given message is only consumed by one receiver, although multiple receivers can be connected to the queue. The queue ensures once-only consumption. Messages are usually buffered in queues so that they’re available for consumption for a certain amount of time, even if no receiver is currently connected.

Point-to-point channels are often used for loosely coupled message transmission, though there are two other common uses. First, they can support horizontal scaling of message processing on the receiver side. Depending on the message load in the channel, the number of receiver processes can be elastically adjusted to cope with the load as needed. The queue acts as a buffering load balancer. Second, they can flatten peak loads of messages and prevent your receivers from being flooded when you can’t scale out fast enough or you don’t want additional scaling.
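The following plain-Java sketch illustrates the idea of several receivers sharing one point-to-point channel. It is a simplified in-memory model, not a messaging service: the queue is a LinkedBlockingQueue, and the class and method names are hypothetical. Note that real services can have weaker guarantees than this model; for example, Amazon SQS standard queues offer at-least-once delivery, so receivers should tolerate an occasional duplicate.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// In-memory sketch: several receivers compete for messages on one queue.
class CompetingConsumersSketch {

    // Buffers messageCount messages, drains them with receiverCount receivers,
    // and returns every message that was consumed.
    static List<String> run(int messageCount, int receiverCount) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        for (int i = 1; i <= messageCount; i++) {
            queue.add("Message " + i); // messages are buffered before any receiver connects
        }

        List<String> consumed = Collections.synchronizedList(new ArrayList<>());
        List<Thread> receivers = new ArrayList<>();
        for (int r = 0; r < receiverCount; r++) {
            Thread receiver = new Thread(() -> {
                String message;
                // poll() removes the head atomically, so no two receivers get the same message.
                while ((message = queue.poll()) != null) {
                    consumed.add(message);
                }
            });
            receivers.add(receiver);
            receiver.start();
        }

        for (Thread receiver : receivers) {
            try {
                receiver.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return consumed;
    }

    public static void main(String[] args) {
        List<String> consumed = run(1000, 4);
        int distinct = new HashSet<>(consumed).size();
        System.out.println(consumed.size() + " consumed, " + distinct + " distinct");
        // prints "1000 consumed, 1000 distinct"
    }
}
```

Changing receiverCount changes how the work is spread across receivers but not the result, which is the property that makes horizontal scaling on the receiver side possible.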

Integration scenarios

In this section, we apply these fundamental patterns to AWS messaging services. The code examples are written in Java, but only by author preference. You can implement the same integration scenarios in C++, .NET, Node.js, Python, Ruby, Go, and other programming languages for which AWS provides an SDK and an Apache ActiveMQ client library is available.

Point-to-point channels: one-way messaging

The diagrams in the following subsections show the principle of one-way messaging for point-to-point channels, using Amazon MQ queues and Amazon SQS queues. The sender produces a message and sends it into a queue, and the receiver consumes the message from the queue for processing. For traditional messaging (that is, Amazon MQ), the senders and consumers can use protocols such as JMS or AMQP. For cloud-native messaging, they can use the Amazon SQS API.

Traditional messaging

To follow this example, open the Amazon MQ console and create a broker. The following diagram shows the components described above for the traditional messaging scenario: a sender sends messages into an Amazon MQ queue, and a receiver consumes messages from that queue.

Point to point traditional messaging

In the following code example, the sender and receiver use the Apache ActiveMQ client library and the standard Java Message Service (JMS) API to send and receive messages to and from an Amazon MQ queue. You can run the code on any AWS compute service, in your on-premises data center, or on your personal computer. For simplicity, the code launches the sender and receiver in the same Java virtual machine (JVM).

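import java.util.UUID;

import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQSslConnectionFactory;

// Runs a sender and a receiver in the same JVM against one Amazon MQ queue.
public class PointToPointOneWayTraditional {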

    public static void main(String... args) throws Exception {
        ActiveMQSslConnectionFactory connFact = new ActiveMQSslConnectionFactory("failover:(ssl://<broker-1>.amazonaws.com:61617,ssl://<broker-2>.amazonaws.com:61617)");
        connFact.setConnectResponseTimeout(10000);
        Connection conn = connFact.createConnection("user", "password");
        conn.setClientID("PointToPointOneWayTraditional");
        conn.start();

        new Thread(new Receiver(conn.createSession(false, Session.CLIENT_ACKNOWLEDGE), "Queue.PointToPoint.OneWay.Traditional")).start();
        new Thread(new Sender(conn.createSession(false, Session.CLIENT_ACKNOWLEDGE), "Queue.PointToPoint.OneWay.Traditional")).start();
    }

    public static class Sender implements Runnable {

        private Session session;
        private String destination;

        public Sender(Session session, String destination) {
            this.session = session;
            this.destination = destination;
        }

        public void run() {
            try {
                MessageProducer messageProducer = session.createProducer(session.createQueue(destination));
                long counter = 0;

                while (true) {
                    TextMessage message = session.createTextMessage("Message " + ++counter);
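                    // Note: per the JMS specification, the provider assigns its own JMSMessageID on send,
                    // so the value set here is replaced before the receiver sees the message.
                    message.setJMSMessageID(UUID.randomUUID().toString());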
                    messageProducer.send(message);
                }
            } catch (JMSException e) {
                throw new RuntimeException(e);
            }
        }
    }

    public static class Receiver implements Runnable, MessageListener {

        private Session session;
        private String destination;

        public Receiver(Session session, String destination) {
            this.session = session;
            this.destination = destination;
        }

        public void run() {
            try {
                MessageConsumer consumer = session.createConsumer(session.createQueue(destination));
                consumer.setMessageListener(this);
            } catch (JMSException e) {
                throw new RuntimeException(e);
            }
        }

        public void onMessage(Message message) {
            try {
                System.out.println(String.format("received message '%s' with message id '%s'", ((TextMessage) message).getText(), message.getJMSMessageID()));
                message.acknowledge();
            } catch (JMSException e) {
                throw new RuntimeException(e);
            }
        }
    }
}

Cloud-native messaging

To follow this example, open the Amazon SQS console and create a standard SQS queue, using the queue name P2POneWayCloudNative. The following diagram shows the components described above for the cloud-native messaging scenario: a sender sends messages into an Amazon SQS queue, and a receiver consumes messages from that queue.

Point to point cloud-native messaging

In the following sample code, the sender uses the AWS SDK for Java to send messages to an Amazon SQS queue in an endless loop. You can run the code on any AWS compute service, in your on-premises data center, or on your personal computer.

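import java.util.UUID;

import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
import com.amazonaws.services.sqs.model.MessageAttributeValue;
import com.amazonaws.services.sqs.model.SendMessageRequest;

// Sends numbered messages to an SQS queue, each tagged with a MessageID attribute.
public class PointToPointOneWayCloudNative {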

    public static void main(String... args) throws Exception {
        final AmazonSQS sqs = AmazonSQSClientBuilder.standard().build();

        new Thread(new Sender(sqs, "https://sqs.<region>.amazonaws.com/<account-number>/P2POneWayCloudNative")).start();
    }

    public static class Sender implements Runnable {

        private AmazonSQS sqs;
        private String destination;

        public Sender(AmazonSQS sqs, String destination) {
            this.sqs = sqs;
            this.destination = destination;
        }

        public void run() {
            long counter = 0;

            while (true) {
                sqs.sendMessage(
                    new SendMessageRequest()
                        .withQueueUrl(destination)
                        .withMessageBody("Message " + ++counter)
                        .addMessageAttributesEntry("MessageID", new MessageAttributeValue().withDataType("String").withStringValue(UUID.randomUUID().toString())));
            }
        }
    }
}

We implement the receiver below in a serverless manner as an AWS Lambda function, using Amazon SQS as the event source. The name of the SQS queue is configured outside the function’s code, which is why it doesn’t appear in this code example.

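import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.SQSEvent;

// Lambda handler that logs every SQS message in the batch it is invoked with.
public class Receiver implements RequestHandler<SQSEvent, Void> {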

    @Override
    public Void handleRequest(SQSEvent request, Context context) {
        for (SQSEvent.SQSMessage message: request.getRecords()) {
            System.out.println(String.format("received message '%s' with message id '%s'", message.getBody(), message.getMessageAttributes().get("MessageID").getStringValue()));
        }

        return null;
    }
}

If this approach is new to you, you can find more details in AWS Lambda Adds Amazon Simple Queue Service to Supported Event Sources. Using Lambda comes with a number of benefits. For example, you don’t have to manage the compute environment for the receiver, and you can use an event (or push) model instead of having to poll for new messages.

Point-to-point channels: request-response messaging

In addition to the one-way scenario, we can add a return channel. We now refer to the involved processes as the requester and the responder. The requester sends a message into the request queue, and the responder sends the response into the response queue. The requester enriches the message with a return address (the name of the response queue) so that the responder knows where to send the response. The requester also sends a correlation ID, which the responder copies into the response message so that the requester can match an incoming response to its request.
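Before turning to real brokers, the roles of the return address and the correlation ID can be sketched with plain in-memory queues. This is only an illustration under our own assumptions: the class RequestResponseSketch, the Envelope type, and the queue names are hypothetical, and the responder runs inline instead of in a separate process.

```java
import java.util.Map;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

// In-memory illustration only: the named queues stand in for a real broker.
class RequestResponseSketch {

    // A message body plus the two pieces of metadata the pattern relies on.
    static final class Envelope {
        final String body;
        final String correlationId;
        final String replyTo; // name of the response queue; null on responses

        Envelope(String body, String correlationId, String replyTo) {
            this.body = body;
            this.correlationId = correlationId;
            this.replyTo = replyTo;
        }
    }

    static final Map<String, Queue<Envelope>> QUEUES = new ConcurrentHashMap<>();

    static Queue<Envelope> queue(String name) {
        return QUEUES.computeIfAbsent(name, n -> new ConcurrentLinkedQueue<>());
    }

    // Responder: takes one request, copies its correlation ID into the
    // response, and sends the response to the return address.
    static void respondOnce(String requestQueue) {
        Envelope request = queue(requestQueue).poll();
        if (request == null) {
            return; // nothing to answer
        }
        queue(request.replyTo).add(
            new Envelope("Reply to " + request.body, request.correlationId, null));
    }

    // Requester: sends a request, then checks that the response carries
    // the correlation ID it generated.
    static String request(String requestQueue, String responseQueue, String body) {
        String correlationId = UUID.randomUUID().toString();
        queue(requestQueue).add(new Envelope(body, correlationId, responseQueue));

        respondOnce(requestQueue); // a real responder would run elsewhere

        Envelope response = queue(responseQueue).poll();
        if (response == null || !correlationId.equals(response.correlationId)) {
            throw new IllegalStateException("no matching response received");
        }
        return response.body;
    }

    public static void main(String[] args) {
        // prints "Reply to Message 1"
        System.out.println(request("requests", "responses", "Message 1"));
    }
}
```

The responder never needs to know the requester in advance: everything it needs to route the answer travels with the request itself.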

Traditional messaging

In this example, we reuse the Amazon MQ broker that we set up earlier. The following diagram shows the components of the traditional messaging scenario, which uses one Amazon MQ queue for the request messages and another for the response messages.

Point to point request response traditional messaging

Using Amazon MQ, we don’t have to create queues explicitly because they’re implicitly created as needed when we start sending messages to them. This example is similar to the point-to-point one-way traditional example.

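import java.util.UUID;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TemporaryQueue;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQSslConnectionFactory;

public class PointToPointRequestResponseTraditional {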

    public static void main(String... args) throws Exception {
        ActiveMQSslConnectionFactory connFact = new ActiveMQSslConnectionFactory("failover:(ssl://<broker-1>.amazonaws.com:61617,ssl://<broker-2>.amazonaws.com:61617)");
        connFact.setConnectResponseTimeout(10000);
        Connection conn = connFact.createConnection("user", "password");
        conn.setClientID("PointToPointRequestResponseTraditional");
        conn.start();

        new Thread(new Responder(conn.createSession(false, Session.CLIENT_ACKNOWLEDGE), "Queue.PointToPoint.RequestResponse.Traditional")).start();
        new Thread(new Requester(conn.createSession(false, Session.CLIENT_ACKNOWLEDGE), "Queue.PointToPoint.RequestResponse.Traditional")).start();
    }

    public static class Requester implements Runnable {

        private Session session;
        private String destination;

        public Requester(Session session, String destination) {
            this.session = session;
            this.destination = destination;
        }

        public void run() {
            MessageProducer messageProducer = null;
            try {
                messageProducer = session.createProducer(session.createQueue(destination));
                long counter = 0;

                while (true) {
                    TemporaryQueue replyTo = session.createTemporaryQueue();
                    String correlationId = UUID.randomUUID().toString();
                    TextMessage message = session.createTextMessage("Message " + ++counter);
                    message.setJMSMessageID(UUID.randomUUID().toString());
                    message.setJMSCorrelationID(correlationId);
                    message.setJMSReplyTo(replyTo);
                    messageProducer.send(message);

                    MessageConsumer consumer = session.createConsumer(replyTo, "JMSCorrelationID='" + correlationId + "'");
                    try {
                        Message receivedMessage = consumer.receive(5000);
                        // receive() returns null if no response arrives within the timeout
                        if (receivedMessage != null) {
                            System.out.println(String.format("received message '%s' with message id '%s'", ((TextMessage) receivedMessage).getText(), receivedMessage.getJMSMessageID()));
                            receivedMessage.acknowledge();
                        }
                    } finally {
                        if (consumer != null) {
                            consumer.close();
                        }
                    }
                }
            } catch (JMSException e) {
                throw new RuntimeException(e);
            }
        }
    }

    public static class Responder implements Runnable, MessageListener {

        private Session session;
        private String destination;

        public Responder(Session session, String destination) {
            this.session = session;
            this.destination = destination;
        }

        public void run() {
            try {
                MessageConsumer consumer = session.createConsumer(session.createQueue(destination));
                consumer.setMessageListener(this);
            } catch (JMSException e) {
                throw new RuntimeException(e);
            }
        }

        public void onMessage(Message message) {
            try {
                String correlationId = message.getJMSCorrelationID();
                Destination replyTo = message.getJMSReplyTo();

                TextMessage responseMessage = session.createTextMessage(((TextMessage) message).getText() + " with CorrelationID " + correlationId);
                responseMessage.setJMSMessageID(UUID.randomUUID().toString());
                responseMessage.setJMSCorrelationID(correlationId);

                MessageProducer messageProducer = session.createProducer(replyTo);
                try {
                    messageProducer.send(responseMessage);

                    message.acknowledge();
                } finally {
                    if (messageProducer != null) {
                        messageProducer.close();
                    }
                }
            } catch (JMSException e) {
                throw new RuntimeException(e);
            }
        }
    }
}

Cloud-native messaging

Open the Amazon SQS console and create two standard SQS queues named P2PReqRespCloudNative and P2PReqRespCloudNative-Resp. The following diagram shows the components of the cloud-native scenario, which uses one Amazon SQS queue for the request messages and another for the response messages.

Point to point request response cloud native messaging

The following example requester is almost identical to the point-to-point one-way cloud-native example sender. It also provides a reply-to address and a correlation ID.

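import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
import com.amazonaws.services.sqs.model.DeleteMessageRequest;
import com.amazonaws.services.sqs.model.Message;
import com.amazonaws.services.sqs.model.MessageAttributeValue;
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
import com.amazonaws.services.sqs.model.ReceiveMessageResult;
import com.amazonaws.services.sqs.model.SendMessageRequest;

public class PointToPointRequestResponseCloudNative {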

    public static void main(String... args) throws Exception {
        final AmazonSQS sqs = AmazonSQSClientBuilder.standard().build();

        new Thread(new Requester(sqs, "https://sqs.<region>.amazonaws.com/<account-number>/P2PReqRespCloudNative", "https://sqs.<region>.amazonaws.com/<account-number>/P2PReqRespCloudNative-Resp")).start();
    }

    public static class Requester implements Runnable {

        private AmazonSQS sqs;
        private String destination;
        private String replyDestination;
        private Map<String, SendMessageRequest> inflightMessages = new ConcurrentHashMap<>();

        public Requester(AmazonSQS sqs, String destination, String replyDestination) {
            this.sqs = sqs;
            this.destination = destination;
            this.replyDestination = replyDestination;
        }

        public void run() {
            long counter = 0;

            while (true) {
                String correlationId = UUID.randomUUID().toString();
                SendMessageRequest request = new SendMessageRequest()
                    .withQueueUrl(destination)
                    .withMessageBody("Message " + ++counter)
                    .addMessageAttributesEntry("CorrelationID", new MessageAttributeValue().withDataType("String").withStringValue(correlationId))
                    .addMessageAttributesEntry("ReplyTo", new MessageAttributeValue().withDataType("String").withStringValue(replyDestination));
                sqs.sendMessage(request);

                inflightMessages.put(correlationId, request);

                ReceiveMessageResult receiveMessageResult = sqs.receiveMessage(
                    new ReceiveMessageRequest()
                        .withQueueUrl(replyDestination)
                        .withMessageAttributeNames("CorrelationID")
                        .withMaxNumberOfMessages(5)
                        .withWaitTimeSeconds(2));

                for (Message receivedMessage : receiveMessageResult.getMessages()) {
                    System.out.println(String.format("received message '%s' with message id '%s'", receivedMessage.getBody(), receivedMessage.getMessageId()));

                    String receivedCorrelationId = receivedMessage.getMessageAttributes().get("CorrelationID").getStringValue();
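                    SendMessageRequest originalRequest = inflightMessages.remove(receivedCorrelationId);
                    // Standard queues deliver at least once, so a duplicate response may find no in-flight entry
                    if (originalRequest != null) {
                        System.out.println(String.format("Corresponding request message '%s'", originalRequest.getMessageBody()));
                    }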

                    sqs.deleteMessage(
                        new DeleteMessageRequest()
                            .withQueueUrl(replyDestination)
                            .withReceiptHandle(receivedMessage.getReceiptHandle()));
                }
            }
        }
    }
}

The following example responder is almost identical to the point-to-point one-way cloud-native example receiver. It also creates a message and sends it back to the reply-to address provided in the received message.

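import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.SQSEvent;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
import com.amazonaws.services.sqs.model.MessageAttributeValue;
import com.amazonaws.services.sqs.model.SendMessageRequest;

public class Responder implements RequestHandler<SQSEvent, Void> {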

    private final AmazonSQS sqs = AmazonSQSClientBuilder.standard().build();

    @Override
    public Void handleRequest(SQSEvent request, Context context) {
        for (SQSEvent.SQSMessage message: request.getRecords()) {
            System.out.println(String.format("received message '%s' with message id '%s'", message.getBody(), message.getMessageId()));
            String correlationId = message.getMessageAttributes().get("CorrelationID").getStringValue();
            String replyTo = message.getMessageAttributes().get("ReplyTo").getStringValue();

            System.out.println(String.format("sending message with correlation id '%s' to '%s'", correlationId, replyTo));
            sqs.sendMessage(
                new SendMessageRequest()
                    .withQueueUrl(replyTo)
                    .withMessageBody(message.getBody() + " with CorrelationID " + correlationId)
                    .addMessageAttributesEntry("CorrelationID", new MessageAttributeValue().withDataType("String").withStringValue(correlationId)));
        }

        return null;
    }
}

Go build!

We look forward to hearing about what you build and will continue innovating our services on your behalf.

What’s next?

We have introduced the first fundamental EIPs and shown how you can apply them to the AWS messaging services. If you are keen to dive deeper, continue reading with the second part of this series, where we will cover publish-subscribe messaging.

Read Part 2: Publish-Subscribe Messaging

Implementing enterprise integration patterns with AWS messaging services: publish-subscribe channels

Post Syndicated from Rachel Richardson original https://aws.amazon.com/blogs/compute/implementing-enterprise-integration-patterns-with-aws-messaging-services-publish-subscribe-channels/

This post is courtesy of Christian Mueller, Sr. Solutions Architect, AWS and Dirk Fröhner, Sr. Solutions Architect, AWS

In this blog, we look at the second part of some fundamental enterprise integration patterns and how you can implement them with AWS messaging services. If you missed the first part, we encourage you to start there.

Read Part 1: Point-to-Point Messaging

Integration patterns

Messaging channels: publish-subscribe

As mentioned in the first blog, we continue with the second major messaging channel pattern: publish-subscribe.

A publish-subscribe channel is usually implemented using message topics. In this model, any message published to a topic is immediately received by all of the subscribers of the topic (unless you have applied the message filter pattern). However, if there is no subscriber, messages are usually discarded. The durable subscriber pattern describes an exception where messages are kept for a while in case the subscriber is offline. Publish-subscribe is used when multiple parties are interested in certain messages. Sometimes, this pattern is also referred to as fan-out.
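The fan-out behavior described above can be sketched in a few lines of plain Java. This is a toy model under our own assumptions, not how Amazon MQ or Amazon SNS is implemented: TopicSketch is a hypothetical class, subscribers are simple callbacks, and there is no durability.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Toy in-memory topic: every subscriber receives its own copy of each message.
class TopicSketch {

    private final List<Consumer<String>> subscribers = new CopyOnWriteArrayList<>();

    void subscribe(Consumer<String> subscriber) {
        subscribers.add(subscriber);
    }

    // Delivers the message to all current subscribers and returns the number
    // of deliveries. With no subscribers, the message is simply discarded.
    int publish(String message) {
        for (Consumer<String> subscriber : subscribers) {
            subscriber.accept(message);
        }
        return subscribers.size();
    }

    public static void main(String[] args) {
        TopicSketch topic = new TopicSketch();
        System.out.println(topic.publish("Message 1")); // prints "0": no subscriber yet

        List<String> first = new ArrayList<>();
        List<String> second = new ArrayList<>();
        topic.subscribe(first::add);
        topic.subscribe(second::add);

        topic.publish("Message 2");
        System.out.println(first + " " + second); // prints "[Message 2] [Message 2]"
    }
}
```

Note that "Message 1" is lost because nobody was subscribed when it was published. A durable subscription exists precisely to close that gap for subscribers that are temporarily offline.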

Let’s apply this pattern to the different AWS messaging services and get our hands dirty. To follow our examples, sign in to your AWS account (or create an account as described in How do I create and activate a new Amazon Web Services account?).

Integration scenarios

Publish-subscribe channels: one-way messaging

Publish-subscribe one-way patterns are often involved in notification style use cases, where the publisher sends out an event and doesn’t care who is interested in this event. For example, Amazon CloudWatch Events publishes state changes in the environment, and you can subscribe and act accordingly.

The diagrams in the following subsections show the principles of one-way messaging for publish-subscribe channels, using both Amazon MQ and Amazon SNS topics. A publisher produces a message and sends it into a topic, and subscribers consume the message from the topic for processing.

For traditional messaging, senders and consumers can use API protocols such as JMS or AMQP. For cloud-native messaging, they can use the Amazon SNS API.

Traditional messaging

In this example, we reuse the Amazon MQ broker we set up in part one of this blog. As we can see in the following diagram, messages are published into an Amazon MQ topic, and multiple subscribers can consume messages from it.

Publish Subscribe One Way Traditional Messaging

This example is similar to the point-to-point one-way traditional example using the Apache ActiveMQ client library, but we use topics instead of queues, as shown in the following code.

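import java.util.UUID;

import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQSslConnectionFactory;

public class PublishSubscribeOneWayTraditional {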

    public static void main(String... args) throws Exception {
        ActiveMQSslConnectionFactory connFact = new ActiveMQSslConnectionFactory("failover:(ssl://<broker-1>.amazonaws.com:61617,ssl://<broker-2>.amazonaws.com:61617)");
        connFact.setConnectResponseTimeout(10000);
        Connection conn = connFact.createConnection("user", "password");
        conn.setClientID("PubSubOneWayTraditional");
        conn.start();

        new Thread(new Subscriber(conn.createSession(false, Session.CLIENT_ACKNOWLEDGE), "Topic.PubSub.OneWay.Traditional")).start();
        new Thread(new Publisher(conn.createSession(false, Session.CLIENT_ACKNOWLEDGE), "Topic.PubSub.OneWay.Traditional")).start();
    }

    public static class Publisher implements Runnable {

        private Session session;
        private String destination;

        public Publisher(Session session, String destination) {
            this.session = session;
            this.destination = destination;
        }

        public void run() {
            try {
                MessageProducer messageProducer = session.createProducer(session.createTopic(destination));
                long counter = 0;

                while (true) {
                    TextMessage message = session.createTextMessage("Message " + ++counter);
                    message.setJMSMessageID(UUID.randomUUID().toString());
                    messageProducer.send(message);
                }
            } catch (JMSException e) {
                throw new RuntimeException(e);
            }
        }
    }

    public static class Subscriber implements Runnable, MessageListener {

        private Session session;
        private String destination;

        public Subscriber(Session session, String destination) {
            this.session = session;
            this.destination = destination;
        }

        public void run() {
            try {
                MessageConsumer consumer = session.createDurableSubscriber(session.createTopic(destination), "subscriber-1");
                consumer.setMessageListener(this);
            } catch (JMSException e) {
                throw new RuntimeException(e);
            }
        }

        public void onMessage(Message message) {
            try {
                System.out.println(String.format("received message '%s' with message id '%s'", ((TextMessage) message).getText(), message.getJMSMessageID()));
                message.acknowledge();
            } catch (JMSException e) {
                throw new RuntimeException(e);
            }
        }
    }
}

Cloud-native messaging

To follow a similar example using Amazon SNS, open the Amazon SNS console and create an Amazon SNS topic named PubSubOneWayCloudNative. The following diagram shows a publisher sending messages into an Amazon SNS topic, from which the subscribers of that topic consume them.

Publish Subscribe One Way Cloud Native Messaging

We use the AWS SDK for Java to send messages to our Amazon SNS topic in an endless loop. You can run the following code on any AWS compute service, in your on-premises data center, or on your personal computer.

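import java.util.UUID;

import com.amazonaws.services.sns.AmazonSNS;
import com.amazonaws.services.sns.AmazonSNSClientBuilder;
import com.amazonaws.services.sns.model.MessageAttributeValue;
import com.amazonaws.services.sns.model.PublishRequest;

public class PublishSubscribeOneWayCloudNative {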

    public static void main(String... args) throws Exception {
        final AmazonSNS sns = AmazonSNSClientBuilder.standard().build();

        new Thread(new Publisher(sns, "arn:aws:sns:<region>:<account-number>:PubSubOneWayCloudNative")).start();
    }

    public static class Publisher implements Runnable {

        private AmazonSNS sns;
        private String destination;

        public Publisher(AmazonSNS sns, String destination) {
            this.sns = sns;
            this.destination = destination;
        }

        public void run() {
            long counter = 0;

            while (true) {
                sns.publish(
                    new PublishRequest()
                        .withTargetArn(destination)
                        .withSubject("PubSubOneWayCloudNative sample")
                        .withMessage("Message " + ++counter)
                        .addMessageAttributesEntry("MessageID", new MessageAttributeValue().withDataType("String").withStringValue(UUID.randomUUID().toString())));
            }
        }
    }
}

The subscriber is implemented as an AWS Lambda function, using Amazon SNS as the event source. For more information on how to set this up, see Using Amazon SNS for System-to-System Messaging with a Lambda Function as a Subscriber.

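import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.SNSEvent;
import com.amazonaws.services.lambda.runtime.events.SNSEvent.SNS;

public class Subscriber implements RequestHandler<SNSEvent, Void> {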

    @Override
    public Void handleRequest(SNSEvent request, Context context) {
        for (SNSEvent.SNSRecord record: request.getRecords()) {
            SNS sns = record.getSNS();

            System.out.println(String.format("received message '%s' with message id '%s'", sns.getMessage(), sns.getMessageAttributes().get("MessageID").getValue()));
        }

        return null;
    }
}

Publish-subscribe channels: request-response messaging

Publish-subscribe request-response patterns are beneficial in use cases where it’s important to communicate with multiple services that do their work in parallel, but all their responses need to be aggregated afterward. One example is an order service, which needs to enrich the order message with data from multiple backend services.

The diagrams in the following subsections show the principles of request-response messaging for publish-subscribe channels, using both Amazon MQ and Amazon SNS topics. A publisher produces a message and sends it into a topic, and subscribers consume the message from the topic for processing.

Although we use a publish-subscribe channel for the request messages, we would usually use a point-to-point channel for the response messages. This assumes that the requester application or at least a dedicated application is the one entity that works on processing all the responses.
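To make the aggregation step more concrete, here is a minimal sketch of how the entity that processes the responses might collect them per correlation ID. It rests on our own assumptions: ResponseAggregatorSketch is a hypothetical helper, the number of expected responders is known up front, and the class is not thread-safe and has no timeout handling.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper that groups responses by correlation ID.
// Not thread-safe; a real implementation would also expire stale requests.
class ResponseAggregatorSketch {

    private final int expectedResponses;
    private final Map<String, List<String>> pending = new HashMap<>();

    ResponseAggregatorSketch(int expectedResponses) {
        this.expectedResponses = expectedResponses;
    }

    // Call this when a request is published.
    void register(String correlationId) {
        pending.put(correlationId, new ArrayList<>());
    }

    // Returns all collected responses once the last expected one arrives,
    // and null while responses are still outstanding or the ID is unknown.
    List<String> onResponse(String correlationId, String body) {
        List<String> collected = pending.get(correlationId);
        if (collected == null) {
            return null; // unknown or already completed request
        }
        collected.add(body);
        if (collected.size() < expectedResponses) {
            return null; // still waiting for other responders
        }
        pending.remove(correlationId);
        return collected;
    }

    public static void main(String[] args) {
        ResponseAggregatorSketch aggregator = new ResponseAggregatorSketch(2);
        aggregator.register("order-42");
        System.out.println(aggregator.onResponse("order-42", "price data")); // prints "null"
        System.out.println(aggregator.onResponse("order-42", "stock data")); // prints "[price data, stock data]"
    }
}
```

Because the entry is removed once it is complete, late or duplicate responses for the same correlation ID are ignored instead of causing an error.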

Traditional messaging

As we can see in the following diagram, an Amazon MQ topic is used to send out all the request messages, while all the response messages are sent into an Amazon MQ queue.

Publish Subscribe Request Response Traditional Messaging

In our code sample below, we use two responders.

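import java.util.UUID;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TemporaryQueue;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQSslConnectionFactory;

public class PublishSubscribeRequestResponseTraditional {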

    public static void main(String... args) throws Exception {
        ActiveMQSslConnectionFactory connFact = new ActiveMQSslConnectionFactory("failover:(ssl://<broker-1>.amazonaws.com:61617,ssl://<broker-2>.amazonaws.com:61617)");
        connFact.setConnectResponseTimeout(10000);
        Connection conn = connFact.createConnection("user", "password");
        conn.setClientID("PubSubReqRespTraditional");
        conn.start();

        new Thread(new Responder(conn.createSession(false, Session.CLIENT_ACKNOWLEDGE), "Topic.PubSub.ReqResp.Traditional", "subscriber-1")).start();
        new Thread(new Responder(conn.createSession(false, Session.CLIENT_ACKNOWLEDGE), "Topic.PubSub.ReqResp.Traditional", "subscriber-2")).start();
        new Thread(new Requester(conn.createSession(false, Session.CLIENT_ACKNOWLEDGE), "Topic.PubSub.ReqResp.Traditional")).start();
    }

    public static class Requester implements Runnable {

        private Session session;
        private String destination;

        public Requester(Session session, String destination) {
            this.session = session;
            this.destination = destination;
        }

        public void run() {
            MessageProducer messageProducer = null;
            try {
                messageProducer = session.createProducer(session.createTopic(destination));
                long counter = 0;

                while (true) {
                    TemporaryQueue replyTo = session.createTemporaryQueue();
                    String correlationId = UUID.randomUUID().toString();
                    TextMessage message = session.createTextMessage("Message " + ++counter);
                    message.setJMSMessageID(UUID.randomUUID().toString());
                    message.setJMSCorrelationID(correlationId);
                    message.setJMSReplyTo(replyTo);
                    messageProducer.send(message);

                    MessageConsumer consumer = session.createConsumer(replyTo, "JMSCorrelationID='" + correlationId + "'");
                    try {
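                        Message receivedMessage1 = consumer.receive(5000);
                        Message receivedMessage2 = consumer.receive(5000);
                        // receive() returns null if a response does not arrive within the timeout
                        if (receivedMessage1 != null && receivedMessage2 != null) {
                            System.out.println(String.format("received 2 messages '%s' and '%s'", ((TextMessage) receivedMessage1).getText(), ((TextMessage) receivedMessage2).getText()));
                            // In CLIENT_ACKNOWLEDGE mode, this acknowledges all messages the session has consumed so far
                            receivedMessage2.acknowledge();
                        }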
                    } finally {
                        if (consumer != null) {
                            consumer.close();
                        }
                    }
                }
            } catch (JMSException e) {
                throw new RuntimeException(e);
            }
        }
    }

    public static class Responder implements Runnable, MessageListener {

        private Session session;
        private String destination;
        private String name;

        public Responder(Session session, String destination, String name) {
            this.session = session;
            this.destination = destination;
            this.name = name;
        }

        public void run() {
            try {
                MessageConsumer consumer = session.createDurableSubscriber(session.createTopic(destination), name);
                consumer.setMessageListener(this);
            } catch (JMSException e) {
                throw new RuntimeException(e);
            }
        }

        public void onMessage(Message message) {
            try {
                String correlationId = message.getJMSCorrelationID();
                Destination replyTo = message.getJMSReplyTo();

                TextMessage responseMessage = session.createTextMessage(((TextMessage) message).getText() + " from responder " + name);
                responseMessage.setJMSMessageID(UUID.randomUUID().toString());
                responseMessage.setJMSCorrelationID(correlationId);

                MessageProducer messageProducer = session.createProducer(replyTo);
                try {
                    messageProducer.send(responseMessage);

                    message.acknowledge();
                } finally {
                    if (messageProducer != null) {
                        messageProducer.close();
                    }
                }
            } catch (JMSException e) {
                throw new RuntimeException(e);
            }
        }
    }
}

Cloud-native messaging

To implement a similar pattern with Amazon SNS, open the Amazon SNS console and create a new SNS topic named PubSubReqRespCloudNative. Then open the Amazon SQS console and create a standard SQS queue named PubSubReqRespCloudNative-Resp. The following diagram illustrates that we now use an Amazon SNS topic for request messages and an Amazon SQS queue for response messages.

Publish Subscribe Request Response Cloud Native Messaging

This example requester is almost identical to the publish-subscribe one-way cloud-native example publisher. The requester also specifies a reply-to address and a correlation ID as message attributes. This way, responders know where to send their responses, and the receiver of the responses can match each one to its original request.

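import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

import com.amazonaws.services.sns.AmazonSNS;
import com.amazonaws.services.sns.AmazonSNSClientBuilder;
import com.amazonaws.services.sns.model.MessageAttributeValue;
import com.amazonaws.services.sns.model.PublishRequest;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
import com.amazonaws.services.sqs.model.DeleteMessageRequest;
import com.amazonaws.services.sqs.model.Message;
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
import com.amazonaws.services.sqs.model.ReceiveMessageResult;

public class PublishSubscribeReqRespCloudNative {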

    public static void main(String... args) throws Exception {
        final AmazonSNS sns = AmazonSNSClientBuilder.standard().build();
        final AmazonSQS sqs = AmazonSQSClientBuilder.standard().build();

        new Thread(new Requester(sns, sqs, "arn:aws:sns:<region>:<account-number>:PubSubReqRespCloudNative", "https://sqs.<region>.amazonaws.com/<account-number>/PubSubReqRespCloudNative-Resp")).start();
    }

    public static class Requester implements Runnable {

        private AmazonSNS sns;
        private AmazonSQS sqs;
        private String destination;
        private String replyDestination;
        private Map<String, PublishRequest> inflightMessages = new ConcurrentHashMap<>();

        public Requester(AmazonSNS sns, AmazonSQS sqs, String destination, String replyDestination) {
            this.sns = sns;
            this.sqs = sqs;
            this.destination = destination;
            this.replyDestination = replyDestination;
        }

        public void run() {
            long counter = 0;

            while (true) {
                String correlationId = UUID.randomUUID().toString();
                PublishRequest request = new PublishRequest()
                    .withTopicArn(destination)
                    .withMessage("Message " + ++counter)
                    .addMessageAttributesEntry("CorrelationID", new MessageAttributeValue().withDataType("String").withStringValue(correlationId))
                    .addMessageAttributesEntry("ReplyTo", new MessageAttributeValue().withDataType("String").withStringValue(replyDestination));
                sns.publish(request);

                inflightMessages.put(correlationId, request);

                ReceiveMessageResult receiveMessageResult = sqs.receiveMessage(
                    new ReceiveMessageRequest()
                        .withQueueUrl(replyDestination)
                        .withMessageAttributeNames("CorrelationID")
                        .withMaxNumberOfMessages(5)
                        .withWaitTimeSeconds(2));

                for (Message receivedMessage : receiveMessageResult.getMessages()) {
                    System.out.println(String.format("received message '%s' with message id '%s'", receivedMessage.getBody(), receivedMessage.getMessageId()));

                    String receivedCorrelationId = receivedMessage.getMessageAttributes().get("CorrelationID").getStringValue();
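                    PublishRequest originalRequest = inflightMessages.remove(receivedCorrelationId);
                    // With several responders, only the first response finds the in-flight entry; later ones return null
                    if (originalRequest != null) {
                        System.out.println(String.format("Corresponding request message '%s'", originalRequest.getMessage()));
                    }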

                    sqs.deleteMessage(
                        new DeleteMessageRequest()
                            .withQueueUrl(replyDestination)
                            .withReceiptHandle(receivedMessage.getReceiptHandle()));
                }
            }
        }
    }
}

This example responder is almost identical to the publish-subscribe one-way cloud-native example subscriber. It also creates a message, enriches it with the correlation ID, and sends it back to the reply-to address provided in the received message.

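import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.SNSEvent;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
import com.amazonaws.services.sqs.model.MessageAttributeValue;
import com.amazonaws.services.sqs.model.SendMessageRequest;

public class Responder implements RequestHandler<SNSEvent, Void> {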

    private final AmazonSQS sqs = AmazonSQSClientBuilder.standard().build();

    @Override
    public Void handleRequest(SNSEvent request, Context context) {
        for (SNSEvent.SNSRecord record: request.getRecords()) {
            System.out.println(String.format("received record '%s' with message id '%s'", record.getSNS().getMessage(), record.getSNS().getMessageId()));
            String correlationId = record.getSNS().getMessageAttributes().get("CorrelationID").getValue();
            String replyTo = record.getSNS().getMessageAttributes().get("ReplyTo").getValue();

            System.out.println(String.format("sending message with correlation id '%s' to '%s'", correlationId, replyTo));
            sqs.sendMessage(
                new SendMessageRequest()
                    .withQueueUrl(replyTo)
                    .withMessageBody(record.getSNS().getMessage() + " with CorrelationID " + correlationId)
                    .addMessageAttributesEntry("CorrelationID", new MessageAttributeValue().withDataType("String").withStringValue(correlationId)));
        }

        return null;
    }
}

Go Build!

We look forward to hearing about what you build and will continue innovating our services on your behalf.

Managing Amazon SNS Subscription Attributes with AWS CloudFormation

Post Syndicated from Rachel Richardson original https://aws.amazon.com/blogs/compute/managing-amazon-sns-subscription-attributes-with-aws-cloudformation/

This post is courtesy of Otavio Ferreira, Manager, Amazon SNS, AWS Messaging.

Amazon SNS is a fully managed pub/sub messaging and event-driven computing service that can decouple distributed systems and microservices. By default, when your publisher system posts a message to an Amazon SNS topic, all systems subscribed to the topic receive a copy of the message. By using Amazon SNS subscription attributes, you can customize this default behavior and make Amazon SNS fit your use cases even more naturally. The available set of Amazon SNS subscription attributes includes FilterPolicy, DeliveryPolicy, and RawMessageDelivery.

You can manually manage your Amazon SNS subscription attributes via the AWS Management Console or programmatically via AWS Development Tools (SDK and AWS CLI). Now you can automate their provisioning via AWS CloudFormation templates as well. AWS CloudFormation lets you use a simple text file to model and provision all the Amazon SNS resources for your messaging use cases, across AWS Regions and accounts, in an automated and secure manner.

The following sections describe how you can simultaneously create Amazon SNS subscriptions and set their attributes via AWS CloudFormation templates.

Setting the FilterPolicy attribute

The FilterPolicy attribute is valid in the context of message filtering, regardless of the delivery protocol, and defines which type of message the subscriber expects to receive from the topic. Hence, by applying the FilterPolicy attribute, you can offload the message-filtering logic from subscribers and the message-routing logic from publishers.

To set the FilterPolicy attribute in your AWS CloudFormation template, use the syntax in the following JSON snippet. This snippet creates an Amazon SNS subscription whose endpoint is an AWS Lambda function. Simultaneously, this code also sets a subscription filter policy that matches messages carrying an attribute whose key is “pet” and value is either “dog” or “cat.”

{
   "Resources": {
      "mySubscription": {
         "Type" : "AWS::SNS::Subscription",
         "Properties" : {
            "Protocol": "lambda",
            "Endpoint": "arn:aws:lambda:us-east-1:000000000000:function:SavePet",
            "TopicArn": "arn:aws:sns:us-east-1:000000000000:PetTopic",
            "FilterPolicy": {
               "pet": ["dog", "cat"]
            }
         }
      }
   }
}
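
To make the matching rule concrete, here is a minimal Java sketch of how a policy like the one above could be evaluated against a message's attributes. It is a simplified model, not the Amazon SNS implementation: it covers only exact string matching, and the class and method names (FilterPolicySketch, matches) are hypothetical.

```java
import java.util.List;
import java.util.Map;

// Simplified, illustrative model of exact-match filter policy evaluation.
// Assumption: only exact string matching is modeled; other policy operators are ignored.
final class FilterPolicySketch {

    // A message matches when every key in the policy is present among the
    // message attributes and its value is one of the values the policy allows.
    static boolean matches(Map<String, List<String>> policy, Map<String, String> attributes) {
        for (Map.Entry<String, List<String>> rule : policy.entrySet()) {
            String actual = attributes.get(rule.getKey());
            if (actual == null || !rule.getValue().contains(actual)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        Map<String, List<String>> policy = Map.of("pet", List.of("dog", "cat"));
        System.out.println(matches(policy, Map.of("pet", "dog")));    // true
        System.out.println(matches(policy, Map.of("pet", "parrot"))); // false
        System.out.println(matches(policy, Map.of("color", "red")));  // false
    }
}
```

With this rule, a message carrying the attribute pet=dog reaches the SavePet function, while a message without a "pet" attribute is filtered out.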

Setting the DeliveryPolicy attribute

The DeliveryPolicy attribute is valid in the context of message delivery to HTTP endpoints and defines a delivery-retry policy. By applying the DeliveryPolicy attribute, you can control the maximum number of retries the subscriber expects, the time delay between each retry, and the backoff function. You should fine-tune these values based on the traffic volume your subscribing HTTP server can handle.

To set the DeliveryPolicy attribute in your AWS CloudFormation template, use the syntax in the following JSON snippet. This snippet creates an Amazon SNS subscription whose endpoint is an HTTP address. The code also sets a delivery policy capped at 10 retries for this subscription, with a linear backoff function.

{
   "Resources": {
      "mySubscription": {
         "Type" : "AWS::SNS::Subscription",
         "Properties" : {
            "Protocol": "https",
            "Endpoint": "https://api.myendpoint.ca/pets",
            "TopicArn": "arn:aws:sns:us-east-1:000000000000:PetTopic",
            "DeliveryPolicy": {
               "healthyRetryPolicy": {
                  "numRetries": 10,
                  "minDelayTarget": 10,
                  "maxDelayTarget": 30,
                  "numMinDelayRetries": 3,
                  "numMaxDelayRetries": 7,
                  "numNoDelayRetries": 0,
                  "backoffFunction": "linear"
               }
            }
         }
      }
   }
}
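
To see what these values mean in practice, the following Java sketch expands a healthyRetryPolicy into the list of delays between retries. It assumes the retries run in four phases (no-delay, minimum-delay, backoff, maximum-delay) and that the backoff phase receives whatever retries remain. The even interpolation used for the linear backoff phase is an assumption made for illustration, and the class name RetryScheduleSketch is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative expansion of a delivery retry policy into per-retry delays (seconds).
final class RetryScheduleSketch {

    static List<Integer> delaysSeconds(int numRetries, int numNoDelayRetries, int numMinDelayRetries,
                                       int numMaxDelayRetries, int minDelayTarget, int maxDelayTarget) {
        // Retries not assigned to another phase fall into the backoff phase.
        int backoffRetries = numRetries - numNoDelayRetries - numMinDelayRetries - numMaxDelayRetries;
        if (backoffRetries < 0) {
            throw new IllegalArgumentException("Phase retry counts exceed numRetries");
        }
        List<Integer> delays = new ArrayList<>();
        for (int i = 0; i < numNoDelayRetries; i++) {
            delays.add(0);
        }
        for (int i = 0; i < numMinDelayRetries; i++) {
            delays.add(minDelayTarget);
        }
        for (int i = 1; i <= backoffRetries; i++) {
            // Assumption: linear backoff steps evenly from the minimum toward the maximum delay.
            delays.add(minDelayTarget + (maxDelayTarget - minDelayTarget) * i / (backoffRetries + 1));
        }
        for (int i = 0; i < numMaxDelayRetries; i++) {
            delays.add(maxDelayTarget);
        }
        return delays;
    }

    public static void main(String[] args) {
        // Values from the snippet above: 10 retries, 3 at the minimum delay, 7 at the maximum delay.
        System.out.println(delaysSeconds(10, 0, 3, 7, 10, 30));
        // prints [10, 10, 10, 30, 30, 30, 30, 30, 30, 30]
    }
}
```

Under these assumptions, the policy in the snippet leaves no retries for the backoff phase: three retries are spaced 10 seconds apart, followed by seven retries spaced 30 seconds apart.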

Setting the RawMessageDelivery attribute

The RawMessageDelivery attribute is valid in the context of message delivery to Amazon SQS queues and HTTP endpoints. This Boolean attribute eliminates the need for the subscriber to process the JSON formatting that is created by default to decorate all published messages with Amazon SNS metadata. When you set RawMessageDelivery to true, you get two outcomes. First, your message is delivered as is, with no metadata added. Second, your message attributes propagate from Amazon SNS to Amazon SQS, when the subscribing endpoint is an Amazon SQS queue.

To set the RawMessageDelivery attribute in your AWS CloudFormation template, use the syntax in the following JSON snippet. This snippet creates an Amazon SNS subscription whose endpoint is an Amazon SQS queue. This code also enables raw message delivery for the subscription, which prevents Amazon SNS metadata from being added to the message payload.

{
   "Resources": {
      "mySubscription": {
         "Type" : "AWS::SNS::Subscription",
         "Properties" : {
            "Protocol": "sqs",
            "Endpoint": "arn:aws:sqs:us-east-1:000000000000:PetQueue",
            "TopicArn": "arn:aws:sns:us-east-1:000000000000:PetTopic",
            "RawMessageDelivery": true
         }
      }
   }
}

Applying subscription attributes in a use case

Here’s how everything comes together. The following example is based on a car dealer company, which operates with the following distributed systems hosted on Amazon EC2 instances:

  • Car-Dealer-System – Front-office system that takes orders placed by car buyers
  • ERP-System – Enterprise resource planning, the back-office system that handles finance, accounting, human resources, and related business activities
  • CRM-System – Customer relationship management, the back-office system responsible for storing car buyers’ profile information and running sales workflows
  • SCM-System – Supply chain management, the back-office system that handles inventory tracking and demand forecast and planning

 

Whenever an order is placed in the car dealer system, this event is broadcast to all back-office systems interested in this type of event. As shown in the preceding diagram, the company applied AWS Messaging services to decouple their distributed systems, making their architecture more scalable and maintainable. The queues and topic used are the following:

  • Car-Sales – Amazon SNS topic that receives messages from the car dealer system. All orders placed by car buyers are published to this topic, then delivered to subscribers (two Amazon SQS queues and one HTTP endpoint).
  • ERP-Integration – Amazon SQS queue that feeds the ERP system with orders published by the car dealer system. The ERP pulls messages from this queue to track revenue and trigger related bookkeeping processes.
  • CRM-Integration – Amazon SQS queue that feeds the CRM system with orders published by the car dealer system. The CRM pulls messages from this queue to track car buyers’ interests and update sales workflows.

The company created the following three Amazon SNS subscriptions:

  • The first subscription refers to the ERP-Integration queue. This subscription has the RawMessageDelivery attribute set to true. Hence, no metadata is added to the message payload, and message attributes are propagated from Amazon SNS to Amazon SQS.
  • The second subscription refers to the CRM-Integration queue. Like the first subscription, this one also has the RawMessageDelivery attribute set to true. Additionally, it has the FilterPolicy attribute set to {“buyer-class”: [“vip”]}. This policy defines that only orders placed by VIP buyers are managed in the CRM system, and orders from other buyers are filtered out.
  • The third subscription points to the HTTP endpoint that serves the SCM-System. Unlike ERP and CRM, the SCM system provides its own HTTP API. Therefore, its HTTP endpoint was subscribed to the topic directly without a queue in between. This subscription has a DeliveryPolicy that caps the number of retries at 20, with an exponential backoff function.

The company didn’t want to create all these resources manually, though. They wanted to turn this infrastructure into versionable code and to be able to quickly spin it up and tear it down in an automated manner. Therefore, they created an AWS CloudFormation template to manage these AWS messaging resources: Amazon SNS topic, Amazon SNS subscriptions, Amazon SNS subscription attributes, and Amazon SQS queues.

Executing the AWS CloudFormation template

Now you’re ready to execute this AWS CloudFormation template yourself. To bootstrap this architecture in your AWS account:

    1. Download the sample AWS CloudFormation template from the repository.
    2. Go to the AWS CloudFormation console.
    3. Choose Create Stack.
    4. For Select Template, choose to upload a template to Amazon S3, and choose Browse.
    5. Select the template you downloaded and choose Next.
    6. For Specify Details:
      • Enter the following stack name: Car-Dealer-Stack.
      • Enter the HTTP endpoint to be subscribed to your topic. If you don’t have an HTTP endpoint, create a temporary one.
      • Choose Next.
    7. For Options, choose Next.
    8. For Review, choose Create.
    9. Wait until your stack creation process is complete.

Now that all the infrastructure is in place, verify the Amazon SNS subscription attributes set by the AWS CloudFormation template as follows:

  1. Go to the Amazon SNS console.
  2. Choose Topics and then select the ARN associated with Car-Sales.
  3. Verify the first subscription:
    • Select the subscription related to ERP-Integration (Amazon SQS protocol).
    • Choose Other subscription actions and then choose Edit subscription attributes.
    • Note that raw message delivery is enabled, and choose Cancel to go back.
  4. Verify the second subscription:
    • Select the subscription related to CRM-Integration (Amazon SQS protocol).
    • Choose Other subscription actions and then choose Edit subscription attributes.
    • Note that raw message delivery is enabled and then choose Cancel to go back.
    • Choose Other subscription actions and then choose Edit subscription filter policy.
    • Note that the filter policy is set, and then choose Cancel to go back.
  5. Confirm the third subscription.
  6. Verify the third subscription:
    • Select the subscription related to SCM-System (HTTP protocol).
    • Choose Other subscription actions and then choose Edit subscription delivery policy.
    • Choose Advanced view.
    • Note that an exponential delivery retry policy is set, and then choose Cancel to go back.

Now that you have verified all subscription attributes, you can delete your AWS CloudFormation stack as follows:

  1. Go to the AWS CloudFormation console.
  2. In the list of stacks, select Car-Dealer-Stack.
  3. Choose Actions, choose Delete Stack, and then choose Yes Delete.
  4. Wait for the stack deletion process to complete.

That’s it! At this point, you have deleted all Amazon SNS and Amazon SQS resources created in this exercise from your AWS account.

Summary

AWS CloudFormation templates enable the simultaneous creation of Amazon SNS subscriptions and their attributes (such as FilterPolicy, DeliveryPolicy, and RawMessageDelivery) in an automated and secure manner. AWS CloudFormation support for Amazon SNS subscription attributes is available now in all AWS Regions.

For information about pricing, see AWS CloudFormation Pricing. For more information on setting up Amazon SNS resources via AWS CloudFormation templates, see:

Migrating from RabbitMQ to Amazon MQ

Post Syndicated from Rachel Richardson original https://aws.amazon.com/blogs/compute/migrating-from-rabbitmq-to-amazon-mq/

This post is courtesy of Sam Dengler, AWS Solutions Architect.

Message brokers can be used to solve a number of needs in enterprise architectures, including managing workload queues and broadcasting messages to a number of subscribers. Some AWS customers are using RabbitMQ today and would like to migrate to a managed service to reduce the overhead of operating their own message broker.

Amazon MQ is a managed message broker service for Apache ActiveMQ that makes it easier to operate and scale message brokers in the cloud. Amazon MQ provides compatibility with your existing workloads that use standard protocols such as OpenWire, AMQP, MQTT, and STOMP (all enabled with SSL). Amazon MQ automatically provisions infrastructure configured as a single-instance broker or as an active/standby broker for high availability.

In this post, I describe how to launch a new Amazon MQ instance. I review example Java code to migrate from a RabbitMQ message broker to Amazon MQ using clients for ActiveMQ, Apache Qpid JMS, and Spring JmsTemplates. I also review best practices for Amazon MQ and changes from RabbitMQ to Amazon MQ to support Publish/Subscribe message patterns.

Getting started with Amazon MQ

To start, open the Amazon MQ console. Enter a broker name and choose Next step.

Launch a new Amazon MQ instance, choosing the mq.t2.micro instance type and Single-instance broker deployment mode, creating a user name and password, and choosing Create broker.

After several minutes, your instance changes status from Creation in progress to Running. You can visit the Details page of your broker to retrieve connection information, including a link to the ActiveMQ web console, where you can monitor the status of your instance, its queues, and so on. In the following code examples, you use the OpenWire and AMQP endpoints.

To be able to access your broker, you must configure one of your security groups to allow inbound traffic. For more information, see the link to Detailed instructions in the blue box in the Connections section.

Now that your Amazon MQ broker is running, let’s look at some code!

Dependencies

The following code examples have dependencies across a range of libraries in order to demonstrate RabbitMQ, ActiveMQ, Qpid, Spring JMS templates, and connection pooling. I’ve listed all the dependencies in a single Maven pom.xml:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>MyGroup</groupId>
    <artifactId>MyArtifact</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <dependencies>
    
        <!-- RabbitMQ -->
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.1.2</version>
        </dependency>

        <!-- Apache Connection Pooling -->
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-pool2</artifactId>
            <version>2.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-pool</artifactId>
            <version>5.15.0</version>
        </dependency>
        
        <!-- Apache ActiveMQ -->
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-client</artifactId>
            <version>5.15.0</version>
        </dependency>
        
        <!-- Apache QPid -->
        <dependency>
            <groupId>org.apache.qpid</groupId>
            <artifactId>qpid-client</artifactId>
            <version>6.3.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.qpid</groupId>
            <artifactId>qpid-jms-client</artifactId>
            <version>0.29.0</version>
        </dependency>
                
        <!-- Spring JmsTemplate -->
         <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-jms</artifactId>
            <version>5.0.3.RELEASE</version>
        </dependency>
        
        <!-- Logging -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.25</version>
        </dependency>

    </dependencies>
</project>

RabbitMQ

Here’s an example using RabbitMQ to send and receive a message via queue. The installation and configuration of RabbitMQ is out of scope for this post. For instructions for downloading and installing RabbitMQ, see Downloading and Installing RabbitMQ.

RabbitMQ uses the AMQP 0-9-1 protocol by default, with support for AMQP 1.0 via a plugin. The RabbitMQ examples in this post use the AMQP 0-9-1 protocol.

RabbitMQ queue example

To start, here’s some sample code to send and receive a message in RabbitMQ using a queue.

import java.io.IOException;
import java.net.URISyntaxException;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.GetResponse;

public class RabbitMQExample {

    private static final boolean ACKNOWLEDGE_MODE = true;

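    // The Endpoint, Username, Password, and Queue should be externalized and
    // configured through environment variables or dependency injection.
    // The environment variable names used here are illustrative.
    private static final String ENDPOINT = System.getenv("RABBITMQ_ENDPOINT");
    private static final String USERNAME = System.getenv("RABBITMQ_USERNAME");
    private static final String PASSWORD = System.getenv("RABBITMQ_PASSWORD");
    private static final String QUEUE = "MyQueue";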

    public static void main(String[] args) throws KeyManagementException, NoSuchAlgorithmException, URISyntaxException, IOException, TimeoutException {
        // Create a connection factory.
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setUri(ENDPOINT);

        // Specify the username and password.
        connectionFactory.setUsername(USERNAME);
        connectionFactory.setPassword(PASSWORD);

        // Establish a connection for the producer.
        Connection producerConnection = connectionFactory.newConnection();

        // Create a channel for the producer.
        Channel producerChannel = producerConnection.createChannel();

        // Create a queue named "MyQueue".
        producerChannel.queueDeclare(QUEUE, false, false, false, null);

        // Create a message.
        String text = "Hello from RabbitMQ!";

        // Send the message.
        producerChannel.basicPublish("", QUEUE, null, text.getBytes());
        System.out.println("Message sent: " + text);

        // Clean up the producer.
        producerChannel.close();
        producerConnection.close();

        // Establish a connection for the consumer.
        Connection consumerConnection = connectionFactory.newConnection();

        // Create a channel for the consumer.
        Channel consumerChannel = consumerConnection.createChannel();

        // Create a queue named "MyQueue".
        consumerChannel.queueDeclare(QUEUE, false, false, false, null);

        // Receive the message.
        GetResponse response = consumerChannel.basicGet(QUEUE, ACKNOWLEDGE_MODE);
        String message = new String(response.getBody(), "UTF-8");
        System.out.println("Message received: " + message);

        // Clean up the consumer.
        consumerChannel.close();
        consumerConnection.close();
    }
}

In this example, you need to specify the ENDPOINT, USERNAME, and PASSWORD for your RabbitMQ message broker using environment variables or dependency injection.
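
As one possible way to externalize these settings, the following Java sketch reads them from environment variables and fails fast when one is missing. The class name BrokerConfigSketch, the require helper, and the variable names RABBITMQ_ENDPOINT, RABBITMQ_USERNAME, and RABBITMQ_PASSWORD are all hypothetical choices for illustration.

```java
import java.util.Map;

// Illustrative helper for loading broker settings from the environment.
final class BrokerConfigSketch {

    // Returns the named setting, or throws if it is missing or empty.
    static String require(Map<String, String> env, String name) {
        String value = env.get(name);
        if (value == null || value.isEmpty()) {
            throw new IllegalStateException("Missing required setting: " + name);
        }
        return value;
    }

    public static void main(String[] args) {
        Map<String, String> env = System.getenv();
        try {
            // Hypothetical variable names; use whatever your deployment defines.
            String endpoint = require(env, "RABBITMQ_ENDPOINT");
            String username = require(env, "RABBITMQ_USERNAME");
            require(env, "RABBITMQ_PASSWORD"); // validated, but never printed
            System.out.println("Connecting to " + endpoint + " as " + username);
        } catch (IllegalStateException e) {
            System.err.println(e.getMessage());
        }
    }
}
```

Passing the environment in as a Map keeps the helper easy to exercise in a unit test without touching the real process environment.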

This example uses the RabbitMQ client library to establish connectivity to the message broker and a channel for communication. In RabbitMQ, messages are sent over the channel to a named queue, which stores messages in a buffer, and from which consumers can receive and process messages. In this example, you publish a message using the Channel.basicPublish method, using the default exchange, identified by an empty string (“”).

To receive and process the messages in the queue, create a second connection, channel, and queue. Queue declaration is an idempotent operation, so there is no harm in declaring it twice. In this example, you receive the message using the Channel.basicGet method, automatically acknowledging message receipt to the broker.

This example demonstrates the basics of sending and receiving a message of one type. However, what if you wanted to publish messages of different types such that various consumers could subscribe only to pertinent message types (that is, pub/sub)? Here’s a RabbitMQ example using topic exchanges to route messages to different queues.

RabbitMQ topic example

This example is similar to the one earlier. To enable topic publishing, specify two additional properties: EXCHANGE and ROUTING_KEY. RabbitMQ uses the exchange and routing key properties to route messages. Look at how these properties change the code to publish a message.

import java.io.IOException;
import java.net.URISyntaxException;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.GetResponse;

public class RabbitMQExample {

    private static final boolean ACKNOWLEDGE_MODE = true;

    // The Endpoint, Username, Password, Queue, Exchange, and Routing Key should
    // be externalized and configured through environment variables or
    // dependency injection. The environment variable names used here are
    // illustrative.
    private static final String ENDPOINT = System.getenv("RABBITMQ_ENDPOINT"); // for example, "amqp://localhost:5672"
    private static final String USERNAME = System.getenv("RABBITMQ_USERNAME");
    private static final String PASSWORD = System.getenv("RABBITMQ_PASSWORD");
    private static final String QUEUE = "MyQueue";
    private static final String EXCHANGE = "MyExchange";
    private static final String ROUTING_KEY = "MyRoutingKey";
    
    public static void main(String[] args) throws KeyManagementException, NoSuchAlgorithmException, URISyntaxException, IOException, TimeoutException {
        // Create a connection factory.
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setUri(ENDPOINT);

        // Specify the username and password.
        connectionFactory.setUsername(USERNAME);
        connectionFactory.setPassword(PASSWORD);

        // Establish a connection for the producer.
        Connection producerConnection = connectionFactory.newConnection();

        // Create a channel for the producer.
        Channel producerChannel = producerConnection.createChannel();

        // Create a queue named "MyQueue".
        producerChannel.queueDeclare(QUEUE, false, false, false, null);

        // Create an exchange named "MyExchange".
        producerChannel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.TOPIC);

        // Bind "MyQueue" to "MyExchange", using routing key "MyRoutingKey".
        producerChannel.queueBind(QUEUE, EXCHANGE, ROUTING_KEY);

        // Create a message.
        String text = "Hello from RabbitMQ!";

        // Send the message.
        producerChannel.basicPublish(EXCHANGE, ROUTING_KEY, null, text.getBytes());
        System.out.println("Message sent: " + text);

        // Clean up the producer.
        producerChannel.close();
        producerConnection.close();
        
        ...


As before, you establish a connection to the RabbitMQ message broker, a channel for communication, and a queue to buffer messages for consumption. In addition to these components, you declare an explicit exchange of type BuiltinExchangeType.TOPIC and bind the queue to the exchange using the ROUTING_KEY that filters messages to send to the queue.

Again, publish a message using the Channel.basicPublish method. This time, instead of publishing the message to a queue, specify the EXCHANGE and ROUTING_KEY values for the message. RabbitMQ uses these properties to route the message to the appropriate queue, from which a consumer receives the message using the same code from the first example.
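
The example above binds and publishes with the same literal key, but a RabbitMQ topic exchange also accepts wildcards in the binding key: "*" stands for exactly one dot-separated word and "#" stands for zero or more words. The following Java sketch models that matching rule outside the broker. It is an illustration of the rule only, not RabbitMQ's own code, and the class name TopicMatchSketch and the sample keys are hypothetical.

```java
// Illustrative model of topic-exchange matching between a binding key and a routing key.
final class TopicMatchSketch {

    static boolean matches(String bindingKey, String routingKey) {
        return matches(bindingKey.split("\\."), 0, routingKey.split("\\."), 0);
    }

    private static boolean matches(String[] pattern, int p, String[] words, int w) {
        if (p == pattern.length) {
            // The pattern is exhausted: match only if every word was consumed too.
            return w == words.length;
        }
        if (pattern[p].equals("#")) {
            // "#" may absorb zero or more words; try every possible split.
            for (int next = w; next <= words.length; next++) {
                if (matches(pattern, p + 1, words, next)) {
                    return true;
                }
            }
            return false;
        }
        if (w == words.length) {
            return false;
        }
        // "*" matches exactly one word; anything else must match literally.
        if (pattern[p].equals("*") || pattern[p].equals(words[w])) {
            return matches(pattern, p + 1, words, w + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(matches("MyRoutingKey", "MyRoutingKey")); // true
        System.out.println(matches("orders.*", "orders.eu"));        // true
        System.out.println(matches("orders.*", "orders.eu.vip"));    // false
        System.out.println(matches("orders.#", "orders.eu.vip"));    // true
    }
}
```

A queue bound with "orders.#" would therefore receive every message whose routing key starts with "orders", while a queue bound with "orders.*" would receive only keys with exactly one more word.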

JMS API

Now that you’ve seen examples for queue and topic publishing in RabbitMQ, look at code changes to support Amazon MQ, starting with the ActiveMQ client. But first, a quick review of the Java Message Service (JMS) API.

The remainder of the examples in this post use the JMS API, which abstracts messaging methods from underlying protocol and client implementations. The JMS API programming model uses a combination of connection factories, connections, sessions, destinations, message producers, and message consumers to send and receive messages. The following image (from The Java EE 6 Tutorial) shows the relationship between these components:

ActiveMQ OpenWire connectivity to Amazon MQ

Here’s how JMS is used with ActiveMQ to send and receive messages on a queue.

The ActiveMQ client uses the OpenWire protocol, which Amazon MQ supports. The OpenWire endpoint appears in your Amazon MQ broker’s endpoint list. The security group for the Amazon MQ broker must allow inbound traffic on the OpenWire endpoint port, 61617.

ActiveMQ queue example

Next, here’s an example to send and receive messages to Amazon MQ using the ActiveMQ client. This example should look familiar, as it follows the same flow to send and receive messages via a queue. I’ve included the example in full and then highlighted the differences to consider when migrating from RabbitMQ.

import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.jms.pool.PooledConnectionFactory;

public class ActiveMQClientExample {

    private static final int DELIVERY_MODE = DeliveryMode.NON_PERSISTENT;
    private static final int ACKNOWLEDGE_MODE = Session.AUTO_ACKNOWLEDGE;

    // The Endpoint, Username, Password, and Queue should be externalized and
    // configured through environment variables or dependency injection.
    // The environment variable names used here are illustrative.
    private static final String ENDPOINT = System.getenv("AMAZON_MQ_ENDPOINT"); // for example, "ssl://x-xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx-x.mq.us-east-1.amazonaws.com:61617"
    private static final String USERNAME = System.getenv("AMAZON_MQ_USERNAME");
    private static final String PASSWORD = System.getenv("AMAZON_MQ_PASSWORD");
    private static final String QUEUE = "MyQueue";
    
    public static void main(String[] args) throws JMSException {
        // Create a connection factory.
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ENDPOINT);

        // Specify the username and password.
        connectionFactory.setUserName(USERNAME);
        connectionFactory.setPassword(PASSWORD);

        // Create a pooled connection factory.
        PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory();
        pooledConnectionFactory.setConnectionFactory(connectionFactory);
        pooledConnectionFactory.setMaxConnections(10);

        // Establish a connection for the producer.
        Connection producerConnection = pooledConnectionFactory.createConnection();
        producerConnection.start();

        // Create a session.
        Session producerSession = producerConnection.createSession(false, ACKNOWLEDGE_MODE);

        // Create a queue named "MyQueue".
        Destination producerDestination = producerSession.createQueue(QUEUE);

        // Create a producer from the session to the queue.
        MessageProducer producer = producerSession.createProducer(producerDestination);
        producer.setDeliveryMode(DELIVERY_MODE);

        // Create a message.
        String text = "Hello from Amazon MQ!";
        TextMessage producerMessage = producerSession.createTextMessage(text);

        // Send the message.
        producer.send(producerMessage);
        System.out.println("Message sent.");

        // Clean up the producer.
        producer.close();
        producerSession.close();
        producerConnection.close();

        // Establish a connection for the consumer.
        // Note: Consumers should not use PooledConnectionFactory.
        Connection consumerConnection = connectionFactory.createConnection();
        consumerConnection.start();

        // Create a session.
        Session consumerSession = consumerConnection.createSession(false, ACKNOWLEDGE_MODE);

        // Create a queue named "MyQueue".
        Destination consumerDestination = consumerSession.createQueue(QUEUE);

        // Create a message consumer from the session to the queue.
        MessageConsumer consumer = consumerSession.createConsumer(consumerDestination);

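        // Wait up to one second for a message to arrive.
        Message consumerMessage = consumer.receive(1000);

        // receive() returns null if the timeout expires before a message arrives.
        if (consumerMessage instanceof TextMessage) {
            TextMessage consumerTextMessage = (TextMessage) consumerMessage;
            System.out.println("Message received: " + consumerTextMessage.getText());
        } else {
            System.out.println("No text message received within the timeout.");
        }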

        // Clean up the consumer.
        consumer.close();
        consumerSession.close();
        consumerConnection.close();
        pooledConnectionFactory.stop();
    }
}

In this example, you use the ActiveMQ client to establish connectivity to Amazon MQ using the OpenWire protocol, with the ActiveMQConnectionFactory class specifying the endpoint and user credentials. For this example, use the master user name and password chosen when creating the Amazon MQ broker earlier. However, it’s a best practice to create additional Amazon MQ users for brokers in non-sandbox environments.

You could use the ActiveMQConnectionFactory to establish connectivity to the Amazon MQ broker. However, it is a best practice in Amazon MQ to group multiple producer requests using the ActiveMQ PooledConnectionFactory to wrap the ActiveMQConnectionFactory.

Using the PooledConnectionFactory, you can create a connection to Amazon MQ and establish a session to send a message. Like the RabbitMQ queue example, create a message queue destination using the Session.createQueue method, and a message producer to send the message to the queue.

For the consumer, use the ActiveMQConnectionFactory, NOT the PooledConnectionFactory, to create a connection, session, queue destination, and message consumer to receive the message because pooling of consumers is not considered a best practice. For more information, see the ActiveMQ Spring Support page.

ActiveMQ virtual destinations on Amazon MQ

Here’s how topic publishing differs from RabbitMQ to Amazon MQ.

Recall that in the RabbitMQ topic example, you bound a queue to an exchange using a routing key. The routing key on each published message then determined which queues received it.

You run into a problem if you try to implement topic subscription using the message consumer in the preceding ActiveMQ queue example. The following is an excerpt from Virtual Destinations, which provides more detail on this subject:

A JMS durable subscriber MessageConsumer is created with a unique JMS clientID and durable subscriber name. To be JMS-compliant, only one JMS connection can be active at any point in time for one JMS clientID, and only one consumer can be active for a clientID and subscriber name. That is, only one thread can be actively consuming from a given logical topic subscriber.

To solve this, ActiveMQ supports the concept of a virtual destination, which provides a logical topic subscription access to a physical queue for consumption without breaking JMS compliance. To do so, ActiveMQ uses a simple convention for specifying the topic and queue names to configure message routing.

  • Topic names must use the “VirtualTopic.” prefix, followed by the topic name. For example, VirtualTopic.MyTopic.
  • Consumer names must use the “Consumer.” prefix, followed by the consumer name, followed by the topic name. For example, Consumer.MyConsumer.VirtualTopic.MyTopic.
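
The naming convention above is simple string composition, which the following Java sketch makes explicit. The class and method names (VirtualTopicNames, topicName, consumerQueueName) are hypothetical helpers introduced for illustration, and the empty-name check is an added assumption rather than an ActiveMQ rule.

```java
// Illustrative helpers for composing ActiveMQ virtual destination names.
final class VirtualTopicNames {

    static final String TOPIC_PREFIX = "VirtualTopic.";
    static final String CONSUMER_PREFIX = "Consumer.";

    // Name that producers publish to, for example "VirtualTopic.MyTopic".
    static String topicName(String topic) {
        requireNonEmpty(topic, "topic");
        return TOPIC_PREFIX + topic;
    }

    // Name of the queue a given consumer reads from, for example
    // "Consumer.MyConsumer.VirtualTopic.MyTopic".
    static String consumerQueueName(String consumer, String topic) {
        requireNonEmpty(consumer, "consumer");
        return CONSUMER_PREFIX + consumer + "." + topicName(topic);
    }

    private static void requireNonEmpty(String value, String label) {
        if (value == null || value.isEmpty()) {
            throw new IllegalArgumentException(label + " must not be empty");
        }
    }

    public static void main(String[] args) {
        System.out.println(topicName("MyTopic"));                       // VirtualTopic.MyTopic
        System.out.println(consumerQueueName("MyConsumer", "MyTopic")); // Consumer.MyConsumer.VirtualTopic.MyTopic
    }
}
```

Producers publish to the topic name, and each logical consumer reads from its own queue name, so several threads can share one consumer queue.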

ActiveMQ topic example

Next, here’s an example for the ActiveMQ client that demonstrates publishing messages to topics. This example is similar to the ActiveMQ queue example. In this one, create a topic destination instead of a queue destination.

import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.jms.pool.PooledConnectionFactory;

public class ActiveMQClientExample {

    private static final int DELIVERY_MODE = DeliveryMode.NON_PERSISTENT;
    private static final int ACKNOWLEDGE_MODE = Session.AUTO_ACKNOWLEDGE;

    // The Endpoint, Username, Password, Producer Topic, and Consumer Topic
    // should be externalized and configured through environment variables or
    // dependency injection. The environment variable names used here are
    // illustrative.
    private static final String ENDPOINT = System.getenv("AMAZON_MQ_ENDPOINT"); // for example, "ssl://x-xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx-x.mq.us-east-1.amazonaws.com:61617"
    private static final String USERNAME = System.getenv("AMAZON_MQ_USERNAME");
    private static final String PASSWORD = System.getenv("AMAZON_MQ_PASSWORD");
    private static final String PRODUCER_TOPIC = "VirtualTopic.MyTopic";
    private static final String CONSUMER1_TOPIC = "Consumer.Consumer1." + PRODUCER_TOPIC;
    
    public static void main(String[] args) throws JMSException {
        // Create a connection factory.
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ENDPOINT);

        // Specify the username and password.
        connectionFactory.setUserName(USERNAME);
        connectionFactory.setPassword(PASSWORD);

        // Create a pooled connection factory.
        PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory();
        pooledConnectionFactory.setConnectionFactory(connectionFactory);
        pooledConnectionFactory.setMaxConnections(10);

        // Establish a connection for the producer.
        Connection producerConnection = pooledConnectionFactory.createConnection();
        producerConnection.start();

        // Create a session.
        Session producerSession = producerConnection.createSession(false, ACKNOWLEDGE_MODE);

        // Create a topic named "VirtualTopic.MyTopic".
        Destination producerDestination = producerSession.createTopic(PRODUCER_TOPIC);

        // Create a producer from the session to the topic.
        MessageProducer producer = producerSession.createProducer(producerDestination);
        producer.setDeliveryMode(DELIVERY_MODE);

        // Create a message.
        String text = "Hello from Amazon MQ!";
        TextMessage producerMessage = producerSession.createTextMessage(text);

        // Send the message.
        producer.send(producerMessage);
        System.out.println("Message sent.");

        // Clean up the producer.
        producer.close();
        producerSession.close();
        producerConnection.close();

        // Establish a connection for the consumer.
        // Note: Consumers should not use PooledConnectionFactory.
        Connection consumerConnection = connectionFactory.createConnection();
        consumerConnection.start();

        // Create a session.
        Session consumerSession = consumerConnection.createSession(false, ACKNOWLEDGE_MODE);

        // Create a queue called "Consumer.Consumer1.VirtualTopic.MyTopic".
        Destination consumerDestination = consumerSession.createQueue(CONSUMER1_TOPIC);

        // Create a message consumer from the session to the queue.
        MessageConsumer consumer = consumerSession.createConsumer(consumerDestination);

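        // Wait up to one second for a message.
        // Note: ActiveMQ copies a virtual topic message only to consumer queues
        // that already exist, so on a first run this receive might time out.
        Message consumerMessage = consumer.receive(1000);

        // Print the message, or report the timeout (receive returns null).
        if (consumerMessage instanceof TextMessage) {
            System.out.println("Message received: " + ((TextMessage) consumerMessage).getText());
        } else {
            System.out.println("No text message received within the timeout.");
        }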

        // Clean up the consumer.
        consumer.close();
        consumerSession.close();
        consumerConnection.close();
        pooledConnectionFactory.stop();
    }
}

In this example, the message producer uses the Session.createTopic method with the topic name, VirtualTopic.MyTopic, as the publishing destination. The message consumer code does not change, but the queue destination uses the virtual destination convention, Consumer.Consumer1.VirtualTopic.MyTopic. ActiveMQ uses these names for the topic and queue to route messages accordingly.

AMQP connectivity to Amazon MQ

Now that you’ve explored some examples using an ActiveMQ client, look at examples using the Qpid JMS client to connect to the Amazon MQ broker over the AMQP 1.0 protocol and see how they differ.

The Qpid client uses the Advanced Message Queuing Protocol (AMQP) 1.0, which Amazon MQ supports. The AMQP 1.0 endpoint appears in your Amazon MQ broker’s endpoint list. It uses port 5671, which must be opened in the security group associated with the Amazon MQ broker.

The AMQP endpoint specifies a transport, amqp+ssl. For encrypted connections, Qpid expects the protocol name to be amqps instead of amqp+ssl; the rest of the connection address remains the same.
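
The scheme substitution described above can be sketched as a one-method helper. The class name QpidEndpoints, the method toQpidUri, and the host broker.example.com are assumptions made for illustration only.

```java
// Hypothetical helper: rewrites the scheme of an Amazon MQ AMQP endpoint for Qpid.
final class QpidEndpoints {

    private static final String AMAZON_MQ_SCHEME = "amqp+ssl://";
    private static final String QPID_SCHEME = "amqps://";

    private QpidEndpoints() {
    }

    // Replaces a leading "amqp+ssl://" with "amqps://"; any other input is returned unchanged.
    static String toQpidUri(String endpoint) {
        if (endpoint.startsWith(AMAZON_MQ_SCHEME)) {
            return QPID_SCHEME + endpoint.substring(AMAZON_MQ_SCHEME.length());
        }
        return endpoint;
    }

    public static void main(String[] args) {
        // Placeholder host name; substitute your broker's AMQP endpoint.
        System.out.println(toQpidUri("amqp+ssl://broker.example.com:5671"));
    }
}
```

Only the scheme changes; the host and port are passed through untouched.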

Qpid JMS queue example

Next, here’s an example to send and receive messages to Amazon MQ using the Qpid client. The Qpid JMS client is built using Apache Qpid Proton, an AMQP messaging toolkit.

import java.util.Hashtable;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.Context;
import javax.naming.NamingException;

import org.apache.activemq.jms.pool.PooledConnectionFactory;

public class QpidClientExample {

    private static final int DELIVERY_MODE = DeliveryMode.NON_PERSISTENT;
    private static final int ACKNOWLEDGE_MODE = Session.AUTO_ACKNOWLEDGE;

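    // The Endpoint, Username, Password, and Queue should be externalized and
    // configured through environment variables or dependency injection.
    // The environment variable names used here are placeholders.
    private static final String ENDPOINT = System.getenv("AMAZON_MQ_ENDPOINT"); // "amqps://x-xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx-x.mq.us-east-1.amazonaws.com:5671"
    private static final String USERNAME = System.getenv("AMAZON_MQ_USERNAME");
    private static final String PASSWORD = System.getenv("AMAZON_MQ_PASSWORD");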
    private static final String QUEUE = "MyQueue";

    public static void main(String[] args) throws JMSException, NamingException {
        // Use JNDI to specify the AMQP endpoint
        Hashtable<Object, Object> env = new Hashtable<Object, Object>();
        env.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
        env.put("connectionfactory.factoryLookup", ENDPOINT);
        javax.naming.Context context = new javax.naming.InitialContext(env);

        // Create a connection factory.
        ConnectionFactory connectionFactory = (ConnectionFactory) context.lookup("factoryLookup");

        // Create a pooled connection factory.
        PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory();
        pooledConnectionFactory.setConnectionFactory(connectionFactory);
        pooledConnectionFactory.setMaxConnections(10);

        // Establish a connection for the producer.
        Connection producerConnection = pooledConnectionFactory.createConnection(USERNAME, PASSWORD);
        producerConnection.start();

        // Create a session.
        Session producerSession = producerConnection.createSession(false, ACKNOWLEDGE_MODE);

        // Create a queue named "MyQueue".
        Destination producerDestination = producerSession.createQueue(QUEUE);

        // Create a producer from the session to the queue.
        MessageProducer producer = producerSession.createProducer(producerDestination);
        producer.setDeliveryMode(DELIVERY_MODE);

        // Create a message.
        String text = "Hello from Qpid Amazon MQ!";
        TextMessage producerMessage = producerSession.createTextMessage(text);

        // Send the message.
        producer.send(producerMessage);
        System.out.println("Message sent.");

        // Clean up the producer.
        producer.close();
        producerSession.close();
        producerConnection.close();

        // Establish a connection for the consumer.
        // Note: Consumers should not use PooledConnectionFactory.
        Connection consumerConnection = connectionFactory.createConnection(USERNAME, PASSWORD);
        consumerConnection.start();

        // Create a session.
        Session consumerSession = consumerConnection.createSession(false, ACKNOWLEDGE_MODE);

        // Create a queue named "MyQueue".
        Destination consumerDestination = consumerSession.createQueue(QUEUE);

        // Create a message consumer from the session to the queue.
        MessageConsumer consumer = consumerSession.createConsumer(consumerDestination);

        // Wait up to one second for a message.
        Message consumerMessage = consumer.receive(1000);

        // Print the message, or report the timeout (receive returns null).
        if (consumerMessage instanceof TextMessage) {
            System.out.println("Message received: " + ((TextMessage) consumerMessage).getText());
        } else {
            System.out.println("No text message received within the timeout.");
        }

        // Clean up the consumer.
        consumer.close();
        consumerSession.close();
        consumerConnection.close();
        pooledConnectionFactory.stop();
    }
}

The Qpid queue example is similar to the ActiveMQ queue example. They both use the JMS API model to send and receive messages, but they differ in how the ConnectionFactory and AMQP endpoint are specified. According to the Qpid client configuration documentation, the ConnectionFactory is specified using a JNDI InitialContext to look up JMS objects. The JNDI configuration is commonly specified in a file named jndi.properties on the Java classpath. This example configures it programmatically using a Hashtable for simplicity.
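
For comparison, here is a sketch of the same two JNDI settings written as jndi.properties entries and parsed with java.util.Properties. The class name JndiPropertiesSketch and the host broker.example.com are placeholders; the sketch only builds the environment table and does not create an InitialContext, so it runs without the Qpid client on the classpath.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Hashtable;
import java.util.Properties;

import javax.naming.Context;

// Hypothetical sketch: the JNDI settings from the example, written as jndi.properties entries.
final class JndiPropertiesSketch {

    // Placeholder endpoint; substitute your broker's AMQP endpoint.
    static final String JNDI_PROPERTIES =
            "java.naming.factory.initial = org.apache.qpid.jms.jndi.JmsInitialContextFactory\n"
            + "connectionfactory.factoryLookup = amqps://broker.example.com:5671\n";

    private JndiPropertiesSketch() {
    }

    // Parses properties-file text into the kind of environment table passed to InitialContext.
    static Hashtable<Object, Object> toEnvironment(String propertiesText) {
        Properties properties = new Properties();
        try {
            properties.load(new StringReader(propertiesText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return new Hashtable<Object, Object>(properties);
    }

    public static void main(String[] args) {
        Hashtable<Object, Object> env = toEnvironment(JNDI_PROPERTIES);
        System.out.println(env.get(Context.INITIAL_CONTEXT_FACTORY));
        System.out.println(env.get("connectionfactory.factoryLookup"));
    }
}
```

The key java.naming.factory.initial is the value of the Context.INITIAL_CONTEXT_FACTORY constant, so the resulting table carries the same entries as the Hashtable built in the example.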

NOTE: Although the Qpid client and Qpid JMS client are used to establish connectivity to Amazon MQ using the AMQP 1.0 protocol, the producer should still use the ActiveMQ PooledConnectionFactory to wrap the Qpid ConnectionFactory. This can be confusing because the Qpid client provides a PooledConnectionFactory that should NOT be used for AMQP 1.0.

The Qpid topic example is identical to the earlier ActiveMQ topic example with the same substitution, which establishes the ConnectionFactory to the AMQP 1.0 endpoint via JNDI.

Spring JMS template queue example

Next, here are examples using the Spring JmsTemplate to send and receive messages.

This example establishes connectivity to Amazon MQ using the same protocol and client library as the ActiveMQ queue example. As in that example, the security group for the Amazon MQ broker must allow traffic on the ActiveMQ OpenWire protocol endpoint port, 61617.

The Spring JmsTemplate provides a higher-level abstraction on top of JMS. Code using the JmsTemplate class only needs to implement handlers to process messages, while the management of connections, sessions, message producers, and message consumers is delegated to Spring. Look at the following code:

import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.jms.pool.PooledConnectionFactory;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;

public class ActiveMQSpringExample {

    private static final int DELIVERY_MODE = DeliveryMode.NON_PERSISTENT;
    private static final int ACKNOWLEDGE_MODE = Session.AUTO_ACKNOWLEDGE;

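    // The Endpoint, Username, Password, and Queue should be externalized and
    // configured through environment variables or dependency injection.
    // The environment variable names used here are placeholders.
    private static final String ENDPOINT = System.getenv("AMAZON_MQ_ENDPOINT"); // "ssl://x-xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx-x.mq.us-east-1.amazonaws.com:61617"
    private static final String USERNAME = System.getenv("AMAZON_MQ_USERNAME");
    private static final String PASSWORD = System.getenv("AMAZON_MQ_PASSWORD");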
    private static final String QUEUE = "MyQueue";

    public static void main(String[] args) throws JMSException {
        // Create a connection factory.
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ENDPOINT);

        // Specify the username and password.
        connectionFactory.setUserName(USERNAME);
        connectionFactory.setPassword(PASSWORD);

        // Create a pooled connection factory.
        PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory();
        pooledConnectionFactory.setConnectionFactory(connectionFactory);
        pooledConnectionFactory.setMaxConnections(10);

        // Create a JmsTemplate for the producer.
        JmsTemplate producerJmsTemplate = new JmsTemplate();
        producerJmsTemplate.setConnectionFactory(pooledConnectionFactory);
        producerJmsTemplate.setDefaultDestination(new ActiveMQQueue(QUEUE));
        producerJmsTemplate.setSessionAcknowledgeMode(ACKNOWLEDGE_MODE);
        producerJmsTemplate.setDeliveryMode(DELIVERY_MODE);
        
        // Create a message creator.
        MessageCreator messageCreator = new MessageCreator() {
            public Message createMessage(Session session) throws JMSException {
                return session.createTextMessage("Hello from Spring Amazon MQ!");
            }
        };

        // Send the message.
        producerJmsTemplate.send(messageCreator);
        System.out.println("Message sent.");

        // Clean up the producer.
        // producer JmsTemplate will close underlying sessions and connections.

        // Create a JmsTemplate for the consumer.
        // Note: Consumers should not use PooledConnectionFactory.
        JmsTemplate consumerJmsTemplate = new JmsTemplate();
        consumerJmsTemplate.setConnectionFactory(connectionFactory);
        consumerJmsTemplate.setDefaultDestination(new ActiveMQQueue(QUEUE));
        consumerJmsTemplate.setSessionAcknowledgeMode(ACKNOWLEDGE_MODE);
        consumerJmsTemplate.setReceiveTimeout(1000);
        
        // Wait up to the configured receive timeout for a message.
        Message consumerMessage = consumerJmsTemplate.receive();

        // Print the message, or report the timeout (receive returns null).
        if (consumerMessage instanceof TextMessage) {
            System.out.println("Message received: " + ((TextMessage) consumerMessage).getText());
        } else {
            System.out.println("No text message received within the timeout.");
        }

        // Clean up the consumer.
        // consumer JmsTemplate will close underlying sessions and connections.
        pooledConnectionFactory.stop();
    }
}

Although Spring manages connections, sessions, and message producers, pooling producer connections is still a best practice. The ActiveMQ PooledConnectionFactory class is used in this example. However, the Spring CachingConnectionFactory object is another option.

Following the PooledConnectionFactory creation, a JmsTemplate is created for the producer and an ActiveMQQueue is created as the message destination. To send a message with the JmsTemplate, a MessageCreator callback is defined that creates a text message from the Session that the JmsTemplate passes in.

A second JmsTemplate with an ActiveMQQueue is created for the consumer. In this example, a single message is received synchronously; however, asynchronous message reception is a popular alternative when using message-driven POJOs.

Unlike the ActiveMQ examples, the Spring JMS template example does not require the explicit cleanup of the connection, session, message producer, or message consumer resources, as that is managed by Spring. Make sure to call the PooledConnectionFactory.stop method to cleanly exit the main method.

Finally, here’s an example using a Spring JmsTemplate for topic publishing.

Spring JmsTemplate topic example

This example combines the Spring JmsTemplate queue example with the virtual destinations approach from the ActiveMQ topic example. Look at the following code.

import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.jms.pool.PooledConnectionFactory;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;

public class ActiveMQSpringExample {

    private static final int DELIVERY_MODE = DeliveryMode.NON_PERSISTENT;
    private static final int ACKNOWLEDGE_MODE = Session.AUTO_ACKNOWLEDGE;

    // The Endpoint, Username, Password, Producer Topic, and Consumer Topic
    // should be externalized and configured through environment variables or
    // dependency injection. The environment variable names used here are placeholders.
    private static final String ENDPOINT = System.getenv("AMAZON_MQ_ENDPOINT"); // "ssl://x-xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx-x.mq.us-east-1.amazonaws.com:61617"
    private static final String USERNAME = System.getenv("AMAZON_MQ_USERNAME");
    private static final String PASSWORD = System.getenv("AMAZON_MQ_PASSWORD");
    private static final String PRODUCER_TOPIC = "VirtualTopic.MyTopic";
    private static final String CONSUMER1_TOPIC = "Consumer.Consumer1." + PRODUCER_TOPIC;
    
    public static void main(String[] args) throws JMSException {
        // Create a connection factory.
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ENDPOINT);

        // Specify the username and password.
        connectionFactory.setUserName(USERNAME);
        connectionFactory.setPassword(PASSWORD);

        // Create a pooled connection factory.
        PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory();
        pooledConnectionFactory.setConnectionFactory(connectionFactory);
        pooledConnectionFactory.setMaxConnections(10);

        // Create a JmsTemplate for the producer.
        JmsTemplate producerJmsTemplate = new JmsTemplate();
        producerJmsTemplate.setConnectionFactory(pooledConnectionFactory);
        producerJmsTemplate.setDefaultDestination(new ActiveMQTopic(PRODUCER_TOPIC));
        producerJmsTemplate.setSessionAcknowledgeMode(ACKNOWLEDGE_MODE);
        producerJmsTemplate.setDeliveryMode(DELIVERY_MODE);

        // Create a message creator.
        MessageCreator messageCreator = new MessageCreator() {
            public Message createMessage(Session session) throws JMSException {
                return session.createTextMessage("Hello from Spring Amazon MQ!");
            }
        };

        // Send the message.
        producerJmsTemplate.send(messageCreator);
        System.out.println("Message sent.");

        // Clean up the producer.
        // producer JmsTemplate will close underlying sessions and connections.

        // Create a JmsTemplate for the consumer.
        // Note: Consumers should not use PooledConnectionFactory.
        JmsTemplate consumerJmsTemplate = new JmsTemplate();
        consumerJmsTemplate.setConnectionFactory(connectionFactory);
        consumerJmsTemplate.setDefaultDestination(new ActiveMQQueue(CONSUMER1_TOPIC));
        consumerJmsTemplate.setSessionAcknowledgeMode(ACKNOWLEDGE_MODE);
        consumerJmsTemplate.setReceiveTimeout(1000);
        
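        // Wait up to the configured receive timeout for a message.
        // Note: ActiveMQ copies a virtual topic message only to consumer queues
        // that already exist, so on a first run this receive might time out.
        Message consumerMessage = consumerJmsTemplate.receive();

        // Print the message, or report the timeout (receive returns null).
        if (consumerMessage instanceof TextMessage) {
            System.out.println("Message received: " + ((TextMessage) consumerMessage).getText());
        } else {
            System.out.println("No text message received within the timeout.");
        }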

        // Clean up the consumer.
        // consumer JmsTemplate will close underlying sessions and connections.
        pooledConnectionFactory.stop();
    }
}

In this example, follow the ActiveMQ virtual destination naming convention for topics and queues:
  • When creating the producer JMS template, specify an ActiveMQTopic as the destination using the name VirtualTopic.MyTopic.
  • When creating the consumer JMS template, specify an ActiveMQQueue as the destination using the name Consumer.Consumer1.VirtualTopic.MyTopic.

ActiveMQ automatically handles routing messages from topic to queue.

Conclusion

In this post, I reviewed how to get started with an Amazon MQ broker and walked you through several code examples that explored the differences between RabbitMQ and Apache ActiveMQ client integrations. If you are considering migrating to Amazon MQ, these examples should help you understand the changes that might be required.

If you’re thinking about integrating your existing apps with new serverless apps, see the related post, Invoking AWS Lambda from Amazon MQ.

To learn more, see the Amazon MQ website and Developer Guide. You can try Amazon MQ for free with the AWS Free Tier, which includes up to 750 hours of a single-instance mq.t2.micro broker and up to 1 GB of storage per month for one year for new AWS accounts.

Monitoring your Amazon SNS message filtering activity with Amazon CloudWatch

Post Syndicated from Rachel Richardson original https://aws.amazon.com/blogs/compute/monitoring-your-amazon-sns-message-filtering-activity-with-amazon-cloudwatch/

This post is courtesy of Otavio Ferreira, Manager, Amazon SNS, AWS Messaging.

Amazon SNS message filtering provides a set of string and numeric matching operators that allow each subscription to receive only the messages of interest. Hence, SNS message filtering can simplify your pub/sub messaging architecture by offloading the message filtering logic from your subscriber systems, as well as the message routing logic from your publisher systems.

After you set the subscription attribute that defines a filter policy, the subscribing endpoint receives only the messages that carry attributes matching this filter policy. Other messages published to the topic are filtered out for this subscription. The native integration between SNS and Amazon CloudWatch provides visibility into the number of messages delivered, as well as the number of messages filtered out.

CloudWatch metrics are captured automatically for you. To get started with SNS message filtering, see Filtering Messages with Amazon SNS.

Message Filtering Metrics

The following six CloudWatch metrics are relevant to understanding your SNS message filtering activity:

  • NumberOfMessagesPublished – Inbound traffic to SNS. This metric tracks all the messages that have been published to the topic.
  • NumberOfNotificationsDelivered – Outbound traffic from SNS. This metric tracks all the messages that have been successfully delivered to endpoints subscribed to the topic. A delivery takes place either when the incoming message attributes match a subscription filter policy, or when the subscription has no filter policy at all, which results in a catch-all behavior.
  • NumberOfNotificationsFilteredOut – This metric tracks all the messages that were filtered out because they carried attributes that didn’t match the subscription filter policy.
  • NumberOfNotificationsFilteredOut-NoMessageAttributes – This metric tracks all the messages that were filtered out because they didn’t carry any attributes at all and, consequently, didn’t match the subscription filter policy.
  • NumberOfNotificationsFilteredOut-InvalidAttributes – This metric keeps track of messages that were filtered out because they carried invalid or malformed attributes and, thus, didn’t match the subscription filter policy.
  • NumberOfNotificationsFailed – This last metric tracks all the messages that failed to be delivered to subscribing endpoints, regardless of whether a filter policy had been set for the endpoint. This metric is emitted after the message delivery retry policy is exhausted, and SNS stops attempting to deliver the message. At that moment, the subscribing endpoint is likely no longer reachable. For example, the subscribing SQS queue or Lambda function has been deleted by its owner. You may want to closely monitor this metric to address message delivery issues quickly.
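
To make the relationship between these metrics concrete, the following toy model classifies a single delivery attempt for one subscription. It is a sketch under stated assumptions, not how SNS is implemented: the class FilterMetricSketch, its Outcome enum, and the attribute names are hypothetical, only exact string matching is modeled, and the invalid-attribute and failed-delivery cases are left out.

```java
import java.util.Map;
import java.util.Set;

// Toy model (not the SNS implementation) of how one delivery attempt maps to a metric.
final class FilterMetricSketch {

    enum Outcome { DELIVERED, FILTERED_OUT, FILTERED_OUT_NO_MESSAGE_ATTRIBUTES }

    private FilterMetricSketch() {
    }

    // policy: attribute name -> accepted string values; null or empty means no filter policy.
    // attributes: the message's attribute name -> string value.
    static Outcome classify(Map<String, Set<String>> policy, Map<String, String> attributes) {
        if (policy == null || policy.isEmpty()) {
            return Outcome.DELIVERED; // No filter policy: catch-all behavior.
        }
        if (attributes == null || attributes.isEmpty()) {
            return Outcome.FILTERED_OUT_NO_MESSAGE_ATTRIBUTES;
        }
        // Every attribute named in the policy must be present with an accepted value.
        for (Map.Entry<String, Set<String>> rule : policy.entrySet()) {
            String value = attributes.get(rule.getKey());
            if (value == null || !rule.getValue().contains(value)) {
                return Outcome.FILTERED_OUT;
            }
        }
        return Outcome.DELIVERED;
    }

    public static void main(String[] args) {
        Map<String, Set<String>> policy = Map.of("event_type", Set.of("order_placed"));
        System.out.println(classify(policy, Map.of("event_type", "order_placed")));
        System.out.println(classify(policy, Map.of("event_type", "order_cancelled")));
        System.out.println(classify(policy, Map.of()));
        System.out.println(classify(null, Map.of()));
    }
}
```

In this model, a message published once to a topic with several subscriptions produces one outcome per subscription, which is why the delivered and filtered-out counts are tracked separately from the published count.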

Message filtering graphs

Through the AWS Management Console, you can compose graphs to display your SNS message filtering activity. The graph shows the number of messages published, delivered, and filtered out within the timeframe you specify (1h, 3h, 12h, 1d, 3d, 1w, or custom).

SNS message filtering for CloudWatch Metrics

To compose an SNS message filtering graph with CloudWatch:

  1. Open the CloudWatch console.
  2. Choose Metrics, SNS, All Metrics, and Topic Metrics.
  3. Select all metrics to add to the graph, such as:
    • NumberOfMessagesPublished
    • NumberOfNotificationsDelivered
    • NumberOfNotificationsFilteredOut
  4. Choose Graphed metrics.
  5. In the Statistic column, switch from Average to Sum.
  6. Title your graph with a descriptive name, such as “SNS Message Filtering”.

After you have your graph set up, you may want to copy the graph link for bookmarking, emailing, or sharing with co-workers. You may also want to add your graph to a CloudWatch dashboard for easy access in the future. Both actions are available to you on the Actions menu, which is found above the graph.

Summary

SNS message filtering defines how SNS topics behave in terms of message delivery. By using CloudWatch metrics, you gain visibility into the number of messages published, delivered, and filtered out. This enables you to validate the operation of filter policies and more easily troubleshoot during development phases.

SNS message filtering can be implemented easily with existing AWS SDKs by applying message and subscription attributes across all SNS supported protocols (Amazon SQS, AWS Lambda, HTTP, SMS, email, and mobile push). CloudWatch metrics for SNS message filtering are available now, in all AWS Regions.

For information about pricing, see the CloudWatch pricing page.

Measuring the throughput for Amazon MQ using the JMS Benchmark

Post Syndicated from Rachel Richardson original https://aws.amazon.com/blogs/compute/measuring-the-throughput-for-amazon-mq-using-the-jms-benchmark/

This post is courtesy of Alan Protasio, Software Development Engineer, Amazon Web Services

Just like compute and storage, messaging is a fundamental building block of enterprise applications. Message brokers (aka “message-oriented middleware”) enable different software systems, often written in different languages, on different platforms, running in different locations, to communicate and exchange information. Mission-critical applications, such as CRM and ERP, rely on message brokers to work.

A common performance consideration for customers deploying a message broker in a production environment is the throughput of the system, measured as messages per second. This is important to know so that application environments (hosts, threads, memory, etc.) can be configured correctly.

In this post, we demonstrate how to measure the throughput for Amazon MQ, a new managed message broker service for ActiveMQ, using JMS Benchmark. It should take 15–20 minutes to set up the environment and an hour to run the benchmark. We also provide some tips on how to configure Amazon MQ for optimal throughput.

Benchmarking throughput for Amazon MQ

ActiveMQ can be used for a number of use cases, ranging from simple fire-and-forget tasks (that is, asynchronous processing) and low-latency request-reply patterns to buffering requests before they are persisted to a database.

The throughput of Amazon MQ is largely dependent on the use case. For example, if you have non-critical workloads such as gathering click events for a non-business-critical portal, you can use ActiveMQ in a non-persistent mode and get extremely high throughput with Amazon MQ.

On the flip side, if you have a critical workload where durability is extremely important (meaning that you can’t lose a message), then you are bound by the I/O capacity of your underlying persistence store. We recommend using mq.m4.large for the best results. The mq.t2.micro instance type is intended for product evaluation. Performance is limited, due to the lower memory and burstable CPU performance.

Tip: To improve your throughput with Amazon MQ, make sure that you have consumers processing messages as fast as (or faster than) your producers are pushing messages.
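
The reasoning behind this tip is simple arithmetic: whenever producers outpace consumers, the difference accumulates as a backlog on the broker. The sketch below illustrates this with made-up rates; the class BacklogSketch, its method, and the numbers are hypothetical and are not measured Amazon MQ figures.

```java
// Back-of-the-envelope sketch: how a queue backlog grows when consumers lag producers.
final class BacklogSketch {

    private BacklogSketch() {
    }

    // Returns the number of unconsumed messages after the given number of seconds,
    // assuming constant rates and an initially empty queue.
    static long backlogAfter(long producedPerSecond, long consumedPerSecond, long seconds) {
        long netPerSecond = producedPerSecond - consumedPerSecond;
        return Math.max(0L, netPerSecond) * seconds;
    }

    public static void main(String[] args) {
        // Illustrative rates only.
        System.out.println(backlogAfter(1_000, 800, 60));   // 12000
        System.out.println(backlogAfter(1_000, 1_200, 60)); // 0
    }
}
```

With producers at 1,000 messages per second and consumers at 800, the backlog reaches 12,000 messages after one minute; once consumers keep pace, it stays at zero.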

Because it’s impossible to talk about how the broker (ActiveMQ) behaves for each and every use case, we walk through how to set up your own benchmark for Amazon MQ using our favorite open-source benchmarking tool: JMS Benchmark. We are fans of the JMS Benchmark suite because it’s easy to set up and deploy, and comes with a built-in visualizer of the results.

Non-Persistent Scenarios – Queue latency as you scale producer throughput

JMS Benchmark nonpersistent scenarios

Getting started

At the time of publication, you can create an mq.m4.large single-instance broker for testing for $0.30 per hour (US pricing).

This walkthrough covers the following tasks:

  1. Create and configure the broker.
  2. Create an EC2 instance to run your benchmark.
  3. Configure the security groups.
  4. Run the benchmark.

Step 1 – Create and configure the broker
Create and configure the broker using Tutorial: Creating and Configuring an Amazon MQ Broker.

Step 2 – Create an EC2 instance to run your benchmark
Launch the EC2 instance using Step 1: Launch an Instance. We recommend choosing the m5.large instance type.

Step 3 – Configure the security groups
Make sure that all the security groups are correctly configured to let the traffic flow between the EC2 instance and your broker.

  1. Sign in to the Amazon MQ console.
  2. From the broker list, choose the name of your broker (for example, MyBroker).
  3. In the Details section, under Security and network, choose the name of your security group or choose the expand icon.
  4. From the security group list, choose your security group.
  5. At the bottom of the page, choose Inbound, Edit.
  6. In the Edit inbound rules dialog box, add a rule to allow traffic between your instance and the broker:
    • Choose Add Rule.
    • For Type, choose Custom TCP.
    • For Port Range, type the ActiveMQ SSL port (61617).
    • For Source, leave Custom selected and then type the security group of your EC2 instance.
    • Choose Save.

Your broker can now accept the connection from your EC2 instance.

Step 4 – Run the benchmark
Connect to your EC2 instance using SSH and run the following commands:

$ cd ~
$ curl -L https://github.com/alanprot/jms-benchmark/archive/master.zip -o master.zip
$ unzip master.zip
$ cd jms-benchmark-master
$ chmod a+x bin/*
$ env \
  SERVER_SETUP=false \
  SERVER_ADDRESS={activemq-endpoint} \
  ACTIVEMQ_TRANSPORT=ssl \
  ACTIVEMQ_PORT=61617 \
  ACTIVEMQ_USERNAME={activemq-user} \
  ACTIVEMQ_PASSWORD={activemq-password} \
  ./bin/benchmark-activemq

After the benchmark finishes, you can find the results in the ~/reports directory. As you may notice, the performance of ActiveMQ varies based on the number of consumers, producers, destinations, and message size.

Amazon MQ architecture

To better understand the results of the benchmark, it also helps to know how Amazon MQ is architected.

Amazon MQ is architected to be highly available (HA) and durable. For HA, we recommend using the multi-AZ option. After a message is sent to Amazon MQ in persistent mode, the message is written to the highly durable message store that replicates the data across multiple nodes in multiple Availability Zones. Because of this replication, for some use cases you may see a reduction in throughput as you migrate to Amazon MQ. Customers have told us they appreciate the benefits of message replication as it helps protect durability even in the face of the loss of an Availability Zone.

Conclusion

We hope this gives you an idea of how Amazon MQ performs. We encourage you to run tests to simulate your own use cases.

To learn more, see the Amazon MQ website. You can try Amazon MQ for free with the AWS Free Tier, which includes up to 750 hours of a single-instance mq.t2.micro broker and up to 1 GB of storage per month for one year.