Tag Archives: Amazon MQ

Migrating from IBM MQ to Amazon MQ using a phased approach

Post Syndicated from Rachel Richardson original https://aws.amazon.com/blogs/compute/migrating-from-ibm-mq-to-amazon-mq-using-a-phased-approach/

This post is contributed by Mithun Mallick, Solutions Architect and Christian Mueller, Solutions Architect

Message-oriented middleware (MOM), or message brokers, are the backbone that integrates business-critical applications in many industries. MOMs are used to integrate systems like inventory management, payment systems, and CRM systems. They are also used to orchestrate order-processing workflows across multiple systems, or to integrate modern web applications with legacy backend applications.

Some of the most commonly used MOMs in the market are IBM MQ, TIBCO EMS, RabbitMQ, and Apache ActiveMQ. These systems are often costly to maintain and can carry high licensing costs.

Our customers often tell us they want to migrate their applications to the cloud but are hindered by the heavy lift involved in migrating their MOM systems. They are concerned that migrating their MOM is difficult, and that they must migrate all interconnected systems in one step.

In this post, we show how to build a bridge solution from on-premises IBM MQ to Amazon MQ, to migrate your applications in a step-by-step manner. Amazon MQ is a managed message broker service from AWS that makes it easy to set up and operate message brokers in the cloud.

We explain the steps to move the producers (senders) and consumers (receivers) in phases from your on-premises environment to the cloud. This process uses Amazon MQ as the message broker, and decommissions IBM MQ once all producers and consumers have been successfully migrated.

You can migrate applications without disrupting existing business systems, and take advantage of the agility, flexibility, and reliability of the cloud. You can use the same pattern with another MOM system that supports JMS/NMS, AMQP, or MQTT, such as TIBCO EMS or RabbitMQ.

In this post, we show how the phased migration approach helps you mitigate the risk involved in a ‘big bang’ cutover. The goal of the solution is to enable an incremental migration of your on-premises applications and MOM to the AWS Cloud, replacing your message broker with Amazon MQ. The solution is shown in the diagram below:

Migrating from IBM MQ to Amazon MQ

You can follow the step-by-step instructions in the README file of the accompanying GitHub repository for your own migration.

Initial state and prerequisites

In the initial state, you are running producers and consumers connected to an on-premises IBM MQ broker.

On-premises IBM MQ broker.

The first step is to build a bridge from IBM MQ to Amazon MQ. Because IBM MQ is running on-premises, the bridge solution must establish connectivity with the AWS Cloud, either by exchanging messages over the internet through an AWS VPN tunnel or by using AWS Direct Connect.

To ensure confidentiality and security for your message exchange, we recommend using AWS VPN. If you have low-latency requirements for forwarding your messages, AWS Direct Connect is the recommended option.

To complete the hands-on portion of this blog, you must have access to an IBM MQ broker. If you don’t have access, you can provision an IBM MQ broker, running on a Docker container in AWS Fargate.

Once connectivity is established, set up an Amazon MQ broker from the AWS Management Console or by using an AWS CloudFormation template. A template is provided as part of this post.

Step – 1

Begin by deploying some consumer applications on AWS. These consumers can be new applications or additional instances of applications already running on-premises. You configure these consumer applications to receive messages from Amazon MQ. At this stage, message producers are still on-premises sending messages to the IBM MQ broker.

Next, bridge from IBM MQ to Amazon MQ using a proxy pattern. The proxy pattern is technology-agnostic, and you implement the pattern using Apache Camel to build a JMS bridge. Apache Camel is an open source integration framework for implementing Enterprise Integration Patterns. Apache Camel includes JMS components that easily connect with IBM MQ and Amazon MQ.

In this step, you build an Apache Camel route to consume messages from IBM MQ, and forward to Amazon MQ. Here is an example from the camel-context.xml file, which defines the configuration:

<route id="ibmMQ-to-amazonMQ">
<description>Camel Route from IBM MQ to Amazon MQ</description>
<from uri="ibmMQ:queue:DEV.QUEUE.2?concurrentConsumers=5"/>
<inOnly uri="amazonMQ:queue:DEV.QUEUE.2?preserveMessageQos=true"/>
</route>

This Apache Camel route defines how messages from the producer applications connected to IBM MQ move to Amazon MQ. In this example, there is one sample route, but you may have many routes in your production use case.
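
The route refers to two Camel JMS components named ibmMQ and amazonMQ. As a hedged illustration of how these components might be wired up, here is a minimal sketch using Camel’s Java DSL; the broker endpoints, queue manager name, and credentials are placeholders, and the configuration in the accompanying repository may differ:

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.camel.CamelContext;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.jms.JmsComponent;
import org.apache.camel.impl.DefaultCamelContext;
import com.ibm.mq.jms.MQQueueConnectionFactory;

public class JmsBridge {
    public static void main(String[] args) throws Exception {
        CamelContext context = new DefaultCamelContext();

        // IBM MQ side (assumes the IBM MQ JMS client jar is on the classpath)
        MQQueueConnectionFactory ibm = new MQQueueConnectionFactory();
        ibm.setHostName("onprem-mq.example.com"); // placeholder host
        ibm.setPort(1414);
        ibm.setQueueManager("QM1");               // placeholder queue manager
        ibm.setTransportType(1);                  // 1 = client transport (WMQ_CM_CLIENT)
        context.addComponent("ibmMQ", JmsComponent.jmsComponentAutoAcknowledge(ibm));

        // Amazon MQ side (ActiveMQ OpenWire over SSL; endpoint is a placeholder)
        ActiveMQConnectionFactory amq = new ActiveMQConnectionFactory(
                "failover:(ssl://b-1234-1.mq.us-east-1.amazonaws.com:61617)");
        amq.setUserName("mqUser");     // placeholder credentials
        amq.setPassword("mqPassword");
        context.addComponent("amazonMQ", JmsComponent.jmsComponentAutoAcknowledge(amq));

        // The same route as the XML above, expressed in the Java DSL
        context.addRoutes(new RouteBuilder() {
            @Override
            public void configure() {
                from("ibmMQ:queue:DEV.QUEUE.2?concurrentConsumers=5")
                        .to("amazonMQ:queue:DEV.QUEUE.2?preserveMessageQos=true");
            }
        });

        context.start();
        Thread.currentThread().join(); // keep the bridge running
    }
}

Adding further routes is then a matter of repeating the from/to pair for each queue you want to bridge.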

Apache Camel is deployed as a Docker container running on AWS Fargate, a serverless compute engine for Amazon Elastic Container Service (ECS). ECS is a container orchestration service that manages the deployment of the containers and runs them in a highly scalable manner.

AWS Fargate eliminates the heavy lifting of scaling the underlying virtual machines for your ECS cluster. By defining the desired count of AWS Fargate tasks, you introduce self-healing capabilities to the JMS bridge: the service tracks the number of healthy tasks and creates new tasks automatically if an old one is no longer available.

Now the JMS bridge and the on-premises consumers are listening on the same queue, waiting for messages. Messages sent to IBM MQ are consumed by the on-premises consumers as well as by the JMS bridge, which forwards them to Amazon MQ to be consumed by the applications already migrated to the cloud. You can now validate that messages are processed by applications on AWS and on-premises.

Now phase 1 of the migration is validated. You can continue to move more consumers as you get more comfortable with the availability and scalability of the bridge solution. The goal of this phase is to reach a state where messages are still produced on-premises, with some consumers running on AWS.

Migrate IBM MQ to Amazon MQ Step 1

Step – 2

Several consumer applications are now migrated to AWS. The goal now is to move the producers to AWS. Start the migration by running a few producer applications on AWS and connect them to Amazon MQ. The Apache Camel bridge is also updated to facilitate bidirectional flow of messages.

The following configuration code shows the route, which moves messages from Amazon MQ to IBM MQ:

<route id="amazonMQ-to-ibmMQ">
<description>Camel Route from Amazon MQ to IBM MQ</description>
<from uri="amazonMQ:queue:DEV.QUEUE.2?concurrentConsumers=5"/>
<inOnly uri="ibmMQ:queue:DEV.QUEUE.2?preserveMessageQos=true"/>
</route>

At this point, messages originating from the producers on AWS have consumers both on-premises and on AWS. You can validate the processing of messages on AWS as well as on-premises. This state is shown in the following diagram:

Migrate IBM MQ to Amazon MQ Step 2

Move more producer applications to AWS as you validate test results and become comfortable with the bridge solution. Step 2 of the migration is validated once you have confirmed the results of your testing on AWS.

Step – 3

To make this JMS bridge resilient, it must scale in and out automatically, based on your current load. You can configure this by using Amazon CloudWatch metrics and CloudWatch alarms. These alarms can trigger scaling activities that scale in or out by a fixed number of tasks or by a percentage.

You can also scale out your AWS resources based on the utilization of your on-premises broker by defining custom CloudWatch metrics, for example, by running a cron script on the on-premises broker machine that periodically reports metrics such as queue depth.
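
The following is a minimal sketch of how such a script could publish queue depth as a custom metric with the AWS SDK for Java v2; the namespace, metric name, and dimension are assumptions, and reading the actual depth (for example, by parsing runmqsc output) is omitted:

import software.amazon.awssdk.services.cloudwatch.CloudWatchClient;
import software.amazon.awssdk.services.cloudwatch.model.Dimension;
import software.amazon.awssdk.services.cloudwatch.model.MetricDatum;
import software.amazon.awssdk.services.cloudwatch.model.PutMetricDataRequest;
import software.amazon.awssdk.services.cloudwatch.model.StandardUnit;

public class QueueDepthReporter {
    public static void main(String[] args) {
        // Queue depth passed in by the surrounding cron script
        double queueDepth = Double.parseDouble(args[0]);

        try (CloudWatchClient cloudWatch = CloudWatchClient.create()) {
            MetricDatum datum = MetricDatum.builder()
                    .metricName("QueueDepth")
                    .unit(StandardUnit.COUNT)
                    .value(queueDepth)
                    .dimensions(Dimension.builder().name("Queue").value("DEV.QUEUE.2").build())
                    .build();

            cloudWatch.putMetricData(PutMetricDataRequest.builder()
                    .namespace("OnPremises/IBMMQ") // placeholder namespace
                    .metricData(datum)
                    .build());
        }
    }
}

A CloudWatch alarm on this metric can then trigger the ECS scaling activities described above.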

Migrate IBM MQ to Amazon MQ Step 3

At this point, the solution demonstrates the advantages that the cloud offers for running services efficiently with high availability and resiliency. The automatic scaling capabilities of Amazon ECS launch additional Apache Camel containers as load increases and the queues fill up, and scale the bridge in again as load decreases.

You have now established a stable and scalable bridge solution. The next step is to move all the remaining producer and consumer applications to the AWS Cloud. If you have applications that cannot move to the cloud, such as mainframe applications, these can remain connected through your on-premises IBM MQ broker. All other applications can be migrated.

Step – 4

All producers and consumer applications have now been moved to AWS. All the messages that are sent to Amazon MQ broker are processed directly by the consumers running on AWS. The Apache Camel route to move messages from Amazon MQ to IBM MQ and vice versa is disabled.

Migrate IBM MQ to Amazon MQ Step 4

Step – 5

The final goal is to move all applications from on-premises to the AWS Cloud. Once all applications are migrated, you can decommission the Apache Camel bridge solution. All the resources deployed in the Apache Camel bridge solution are deleted, along with the automatic scaling and Amazon CloudWatch alarm configurations.

All producers and consumers are now migrated to running on AWS with Amazon MQ as their message broker.

Migrate IBM MQ to Amazon MQ Step 5

Conclusion

In this blog, we showed how to migrate on-premises applications that are dependent on commercial message brokers to the AWS Cloud. The approach relies on a bridge solution, which is based on the proxy pattern, and is technology-independent.

The bridge provides a low-risk migration of the applications in phases, so that you can validate each step and avoid any disruption to your business.

For more information on migrating to Amazon MQ and using Apache Camel, please see Migrating from RabbitMQ to Amazon MQ and Integrating Amazon MQ with other AWS services via Apache Camel.

Understanding asynchronous messaging for microservices

Post Syndicated from Rachel Richardson original https://aws.amazon.com/blogs/compute/understanding-asynchronous-messaging-for-microservices/

This post is courtesy of Dirk Fröhner

One of the implications of applying the microservices architectural style is that much communication between components happens over the network. After all, your microservices landscape is a distributed system. To achieve the promises of microservices, such as being able to individually scale, operate, and evolve each service, this communication must happen in a loosely coupled and reliable manner.

A common way to loosely couple services is to expose an API following the REST architectural style. REST APIs are based on the architecture of the web and provide loose coupling between communicating parties. REST APIs offer a great way to decouple interfaces from concrete implementations, and to advise clients about what they can do next, by the use of links and link relations.

While REST APIs are common and useful in microservices design, they tend to be designed for synchronous communication, where a response is required. A request coming from an end-user client can trigger a complex communication path within your services landscape, which can effectively add coupling between the services at runtime. After all, this is why mitigation patterns like the circuit breaker exist in the first place. REST APIs can also add operational heavy lifting to your infrastructure, which we discuss further below.

Asynchronous messaging

If loose coupling is important, especially in a system that requires high resilience and has unpredictable scale, another option is asynchronous messaging.

Asynchronous messaging is a fundamental approach for integrating independent systems, or building up a set of loosely coupled systems that can operate, scale, and evolve independently and flexibly. As our colleague Tim Bray said, “If your application is cloud-native, or large-scale, or distributed, and doesn’t include a messaging component, that’s probably a bug.” In this blog post, we will outline some fundamental benefits of asynchronous messaging for the communications between microservices.

For a refresher on the fundamental messaging patterns and their implementations with Amazon SQS, Amazon SNS, and Amazon MQ, please read our previous blog posts.

For a summary of the semantics of queues and topics:

  • A queue is like a buffer. You can put messages into a queue, and you can retrieve messages from a queue. Message queues operate so that any given message is only consumed by one receiver, although multiple receivers can be connected to the queue.
  • A topic is like a broadcasting station. You can publish messages to a topic, and anyone interested in these messages can subscribe to the topic. In this model, any message published to a topic is immediately received by all of the subscribers of the topic (unless you have applied the message filter pattern). A short sketch after this list illustrates both semantics.
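
To make these semantics concrete, here is a minimal, hypothetical sketch using the AWS SDK for Java v2; the queue URL and topic ARN are placeholders:

import software.amazon.awssdk.services.sns.SnsClient;
import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest;

public class QueueVsTopic {
    public static void main(String[] args) {
        String queueUrl = "https://sqs.us-east-1.amazonaws.com/123456789012/orders"; // placeholder
        String topicArn = "arn:aws:sns:us-east-1:123456789012:order-events";         // placeholder

        try (SqsClient sqs = SqsClient.create(); SnsClient sns = SnsClient.create()) {
            // Queue: each message is consumed by exactly one of the connected receivers.
            sqs.sendMessage(b -> b.queueUrl(queueUrl).messageBody("order-4711"));
            sqs.receiveMessage(ReceiveMessageRequest.builder()
                            .queueUrl(queueUrl)
                            .maxNumberOfMessages(1)
                            .build())
                    .messages()
                    .forEach(m -> System.out.println("Consumed: " + m.body()));

            // Topic: every subscriber (queue, Lambda function, HTTP endpoint, ...) receives it.
            sns.publish(b -> b.topicArn(topicArn).message("order-4711 created"));
        }
    }
}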

Use-case

Consider a typical scenario illustrated in the diagram below. An end-user client (EUC) addresses an API resource of one of our services, through Amazon API Gateway in this example. From there, the request can potentially follow a path across the microservices landscape to get completely processed.

To provide the final result, there will be potentially cascading subsequent requests sent between other microservices. This example illustrates the complexity involved in processing a single end user request.


Diagram 1: End-User Client accessing a service using an API

End-user clients (EUCs) often communicate with services via REST APIs in a synchronous manner. However, the communication can also be designed using an asynchronous approach. For instance, if an EUC submits a request that takes some time to process, the respective API resource can respond with HTTP status 202 Accepted, and a link to a resource that provides the current processing status. Downstream, the communication between the service that receives that request, and other services that are involved in processing the request, can happen asynchronously using messaging services.
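
As a sketch of this technique, the following hypothetical handler, built on the JDK’s own HttpServer for brevity, accepts a request, immediately returns HTTP 202 with a link to a status resource, and completes the work in the background; in a real system, the background step would typically publish to a queue rather than spawn a thread:

import com.sun.net.httpserver.HttpServer;
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

public class AsyncApi {
    static final Map<String, String> status = new ConcurrentHashMap<>();

    public static void main(String[] args) throws Exception {
        HttpServer server = HttpServer.create(new InetSocketAddress(8080), 0);
        server.createContext("/orders", exchange -> {
            String id = UUID.randomUUID().toString();
            status.put(id, "PROCESSING");
            // Stand-in for asynchronous processing, e.g., sending to a queue.
            new Thread(() -> status.put(id, "DONE")).start();

            byte[] body = ("{\"statusUrl\": \"/orders/status/" + id + "\"}").getBytes();
            exchange.getResponseHeaders().add("Content-Type", "application/json");
            exchange.sendResponseHeaders(202, body.length); // 202 Accepted
            exchange.getResponseBody().write(body);
            exchange.close();
        });
        server.start();
    }
}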

There are situations where a communications model using asynchronous messaging can make your life easier than using REST APIs.

Infrastructure complexity

Start by looking at the infrastructure complexity of your services’ backends. Depending on your implementation paradigm, you have to include different components in your infrastructure that you don’t have to deal with when using messaging.

Imagine your services each expose a REST API. Typically, this means you add a load balancer in front of your compute layer, and your backend implementation includes an HTTP server. It is usually a good idea to decouple your services’ APIs from their concrete implementations, so you could also consider adding Amazon API Gateway in front of your load balancer.

For a serverless approach, you don’t need to worry about load balancing and scaling out infrastructure. Amazon API Gateway with AWS Lambda integration provides a fully managed solution for removing complexities around infrastructure management.

Using Amazon SQS as a cloud-native messaging service for queues, you don’t employ any of the above mentioned components. As described in a prior post, an SQS queue can act as a load balancer in itself. The consumers, or target services, don’t need an HTTP server, but simply ask a queue for available messages. If you use AWS Lambda for your consumers, this process is even simpler, as the Lambda functions are automatically invoked when messages appear in an SQS queue. See Using AWS Lambda with Amazon SQS to learn more.
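
To illustrate how lightweight such a consumer can be, here is a hypothetical polling worker using the AWS SDK for Java v2; the queue URL is a placeholder:

import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.model.Message;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest;

public class QueueWorker {
    public static void main(String[] args) {
        String queueUrl = "https://sqs.us-east-1.amazonaws.com/123456789012/work"; // placeholder
        try (SqsClient sqs = SqsClient.create()) {
            while (true) {
                // Long polling (waitTimeSeconds) avoids tight empty-receive loops.
                for (Message m : sqs.receiveMessage(ReceiveMessageRequest.builder()
                                .queueUrl(queueUrl)
                                .waitTimeSeconds(20)
                                .maxNumberOfMessages(10)
                                .build())
                        .messages()) {
                    process(m);
                    // Deleting the message is how a consumer acknowledges it in SQS.
                    sqs.deleteMessage(b -> b.queueUrl(queueUrl).receiptHandle(m.receiptHandle()));
                }
            }
        }
    }

    static void process(Message m) {
        System.out.println(m.body());
    }
}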

The same applies to serverless architectures implementing a publish/subscribe pattern. Lambda function executions can be directly triggered by SNS messages. Without AWS Lambda, you need load balancers and web servers in your backend implementations to receive SNS notifications, because those are delivered via webhooks to your services. SNS also provides the fan-out functionality that you would otherwise have to build using an intermediary component implementing a recipient list of subscribers.

Reliability, resilience

For synchronous systems, if a service crashes while it processes the payload of an API request, the information is lost. A good way to prevent this on a microservice is to explicitly persist an incoming request immediately after receiving it. Then process and reprocess, until the request is finally marked as resolved.

This approach requires additional work, and it requires the microservice not to crash while persisting an incoming API request. The microservice sending a request must also resend it if the target service doesn’t acknowledge receipt, for example, if it doesn’t respond with a successful HTTP status code or the connection drops.

When sending messages to a queue, this additional work is addressed by the messaging infrastructure. A message remains in the queue until a consumer explicitly acknowledges that processing is finished; for Amazon SQS, this acknowledgment is the deletion of the message. Messages can be retained in an SQS queue for a maximum of 14 days.

Scale out latency

Under increased load, your services must scale out to process the requests. You must then consider scale-out latency, which may be managed for you with serverless implementations. It takes a few moments from when an Auto Scaling group triggers the launch of additional instances until they are ready to operate, and launching new container tasks also takes time. When your scaling threshold is not optimal and the scaling event occurs late, your available resources may be unable to serve all incoming requests. These requests may be lost or answered with HTTP status code 5xx.

Using message queues that buffer messages during a scaling event helps prevent this. Even in use cases where the EUC is waiting for an immediate response, this is the more reliable architecture: if your infrastructure needs time to scale out and you are not able to process all requests in time, the requests are persisted.

When messaging is your only choice

What happens when your services must respond to peak loads at scale?

For many applications, the scale-out latency, including load balancer pre-warming, will eventually become too large to handle steeply ascending loads fast enough. With a serverless architecture, exposing your Lambda functions with API Gateway can handle steeply ascending loads. But you must still consider downstream systems, which may be easily overwhelmed.

In these scenarios, where rapid scaling without overwhelming downstream systems is important, messaging may be your best choice. Message queues help protect your downstream services by buffering incoming payloads for consumption at the pace of the consuming service. This helps not only for the communications between microservices, but also when peak loads flood your client-facing API. Often, the most important goal is to accept an incoming request, while the actual processing of that request can happen later. You decouple these steps from each other by using queues.

Serverless messaging systems like Amazon SQS and Amazon SNS can respond quickly to support high scale, and they are often the best solution when scale is unpredictable. While the instance-based messaging system, Amazon MQ, provides compatibility with open standards, it requires manual scaling for large workloads, unlike serverless messaging services.

Conclusion

We hope you got some inspiration to also employ asynchronous messaging for your microservices communications architecture. The next blog in this series provides concrete examples of these patterns. For more information, see the following resources:

  1. AWS whitepaper: Implementing Microservices on AWS
  2. AWS blog: Implementing enterprise integration patterns with AWS messaging services: point-to-point channels
  3. AWS blog: Implementing enterprise integration patterns with AWS messaging services: publish-subscribe channels
  4. AWS blog: Building Scalable Applications and Microservices: Adding Messaging to Your Toolbox

Read the next blog in the series, Application Integration Patterns for Microservices: Fan-out Strategies.

Creating static custom domain endpoints with Amazon MQ to simplify broker modification and scaling

Post Syndicated from Rachel Richardson original https://aws.amazon.com/blogs/compute/creating-static-custom-domain-endpoints-with-amazon-mq/

This post is courtesy of Wallace Printz, Senior Solutions Architect, AWS, and Christian Mueller, Senior Solutions Architect, AWS.

Many cloud-native application architectures take advantage of the point-to-point and publish-subscribe (“pub-sub”) model of message-based communication between application components. This architecture is generally more resilient to failure because of the loose coupling and because message processing failures can be retried. It’s also more efficient because individual application components can independently scale up or down to maintain message-processing SLAs, compared to monolithic application architectures. Synchronous (REST-based) systems are tightly coupled. A problem in a synchronous downstream dependency has an immediate impact on the upstream callers.

Retries from upstream callers can all too easily fan out and amplify problems. Amazon SQS and Amazon SNS are fully managed messaging services, but they are not necessarily the right tool for every job. For applications requiring messaging protocols such as JMS, NMS, AMQP, STOMP, MQTT, and WebSocket, AWS provides Amazon MQ. Amazon MQ is a managed message broker service for Apache ActiveMQ that makes it easy to set up and operate message brokers in the cloud.

Amazon MQ provides two managed broker deployment connection options: public brokers and private brokers. Public brokers receive internet-accessible IP addresses, while private brokers receive only private IP addresses from the corresponding CIDR range in their VPC subnet.

In some cases, for security purposes, you may prefer to place brokers in a private subnet. You can still allow access to the brokers through a persistent public endpoint, such as a subdomain of your corporate domain, like mq.example.com.

In this post, we explain how to provision private Amazon MQ brokers behind a secure public load balancer endpoint using an example subdomain.

Architecture overview

There are several reasons one might want to deploy this architecture beyond the security aspects.

First, human-readable URLs are easier for people to parse when reviewing operations and troubleshooting, such as deploying updates to mq-dev.example.com before mq-prod.example.com.

Second, maintaining static URLs for your brokers helps reduce the necessity of modifying client code when performing maintenance on the brokers.

Third, this pattern allows you to vertically scale your brokers without changing the client code or even notifying the clients that changes have been made.

Finally, the same architecture described here works for a network of brokers configuration as well, whereby you could horizontally scale your brokers without impacting the client code.

Prerequisites

This blog post assumes some familiarity with AWS networking fundamentals, such as VPCs, subnets, load balancers, and Amazon Route 53.

When you are finished, the architecture should be set up as shown in the following diagram. For ease of visualization, we demonstrate with a pair of brokers using the active-standby option.

Solution Overview

Amazon MQ solution overview

The client to broker traffic flow is as follows.

  • First, the client service tries to connect with a failover URL to the domain endpoint set up in Route 53. If a client loses the connection, the failover URL allows it to automatically try to reconnect to the broker.
  • The client looks up the domain name from Route 53, and Route 53 returns the IP address of the Network Load Balancer.
  • The client creates a secure socket layer (SSL) connection to the Network Load Balancer with an SSL certificate provided from AWS Certificate Manager (ACM). The Network Load Balancer selects from the healthy brokers in its target group and creates a separate SSL connection between the Network Load Balancer and the broker. This provides secure, end-to-end SSL encrypted messaging between client and brokers.

In this diagram, the healthy broker connection is shown in the solid line. The standby broker, which does not reply to connection requests and is therefore marked as unhealthy in the target group, is shown in the dashed line.

Solution walkthrough

To build this architecture, build the network segmentation first, then the Amazon MQ brokers, and finally the network routing.

Setup

First, you need the following resources:

  • A VPC
  • One private subnet per Availability Zone
  • One public subnet for your bastion host (if desired)

This demonstration VPC uses the 20.0.0.0/16 CIDR range.

Additionally, you must create a custom security group for your brokers. Set up this security group to allow traffic from your Network Load Balancer and, if using a network of brokers, among the brokers as well.

This VPC is not being used for any other workloads. This demonstration allows all incoming traffic originating within the VPC, including the Network Load Balancer, through to the brokers on the following ports:

  • OpenWire communication port of 61617
  • Apache ActiveMQ console port of 8162

If you are using a different protocol, adjust the port numbers accordingly.

Create an Amazon MQ security group

Building the Amazon MQ brokers

Now that you have the network segmentation set up, build the Amazon MQ brokers. As mentioned previously, this demonstration uses the active-standby pair of private brokers option.

Configure the broker settings by selecting a broker name, instance type, ActiveMQ console user, and password first.

Configure Amazon MQ broker settings

In the Additional Settings area, place the brokers in your previously selected VPC and the associated private subnets.

Configure Amazon MQ additional settings

Finally, select the existing Security Group previously discussed, and make sure that the Public Accessibility option is set to No.

Set Amazon MQ security group settings

That’s it for the brokers. When it is done provisioning, the Amazon MQ dashboard should look like the one shown in the following screenshot. Note the IP addresses of the brokers and the ActiveMQ web console URLs for later.

Amazon MQ dashboard

Configuring a Load Balancer Target Group

The next step in the build process is to configure the load balancer’s target group. This demonstration uses the private IP addresses of the brokers as targets for the Network Load Balancer.

Create and name a target group, select the IP option under Target type, select TLS under Protocol and 61617 under Port, and choose the VPC in which your brokers reside. It is important to configure the health check settings so that traffic is only routed to active brokers: select the TCP protocol and override the health check port to 8162, the Apache ActiveMQ console port.

Do not use the OpenWire port as the target group health check port. Because the Network Load Balancer may not be able to recognize the host as healthy on that port, it is better to use the ActiveMQ web console port.

Next, add the brokers’ IP addresses as targets. You can find the broker IP addresses in the Amazon MQ console page after they complete provisioning. Make sure to add both the active and the standby broker to the target group so that when reboots occur, the Network Load Balancer routes traffic to whichever broker is active.

You may be pursuing a more dynamic environment for scaling brokers up and down to handle the demands of a variable message load. In that case, as you scale to add more brokers, make sure that you also add them to the target group.

AWS Lambda is a great way to handle adding or removing the brokers’ IP addresses in this target group programmatically.
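
As a sketch of that idea, the following hypothetical Lambda handler registers a list of broker IP addresses with the target group using the AWS SDK for Java v2; the environment variable name, port, and event shape are assumptions:

import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import software.amazon.awssdk.services.elasticloadbalancingv2.ElasticLoadBalancingV2Client;
import software.amazon.awssdk.services.elasticloadbalancingv2.model.RegisterTargetsRequest;
import software.amazon.awssdk.services.elasticloadbalancingv2.model.TargetDescription;
import java.util.List;

public class RegisterBrokerTargets implements RequestHandler<List<String>, String> {
    private static final String TARGET_GROUP_ARN = System.getenv("TARGET_GROUP_ARN"); // placeholder

    @Override
    public String handleRequest(List<String> brokerIps, Context context) {
        try (ElasticLoadBalancingV2Client elb = ElasticLoadBalancingV2Client.create()) {
            TargetDescription[] targets = brokerIps.stream()
                    .map(ip -> TargetDescription.builder().id(ip).port(61617).build())
                    .toArray(TargetDescription[]::new);

            elb.registerTargets(RegisterTargetsRequest.builder()
                    .targetGroupArn(TARGET_GROUP_ARN)
                    .targets(targets)
                    .build());
        }
        return "Registered " + brokerIps.size() + " targets";
    }
}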

Creating a Network Load Balancer

Next, create a Network Load Balancer. This demo uses an internet-facing load balancer with TLS listeners on port 61617, and routes traffic to brokers’ VPC and private subnets.

Configure a network load balancer

Clients must securely connect to the Network Load Balancer, so this demo uses an ACM certificate for the subdomain registered in Route 53, such as mq.example.com. For simplicity, ACM certificate provisioning is not shown. For more information, see Request a Public Certificate.

Make sure that the ACM certificate is provisioned in the same Region as your Network Load Balancer, or the certificate is not displayed in the selection menu.

Next, select the target group that you just created, and select TLS for the connection between the Network Load Balancer and the brokers. Similarly, select the health checks on TCP port 8162.

If all went well, you see the list of brokers’ IP addresses listed as targets. From here, review your settings and confirm you’d like to deploy the Network Load Balancer.

Configuring Route 53

The last step in this build is to configure Route 53 to serve traffic at the subdomain of your choice to your Network Load Balancer.

Go to your Route 53 Hosted Zone, and create a new subdomain record set, such as mq.example.com, that matches the ACM certificate that you previously created. In the Type field, select A – IPv4 address, then select Yes for Alias. This allows you to select the Network Load Balancer as the alias target. Select the Network Load Balancer that you just created from the Alias Target menu and save the record set.

Testing broker connectivity

And that’s it!

There’s an important advantage to this architecture. When you create Amazon MQ active-standby brokers, the Amazon MQ service provides two endpoints. Only one broker host is active at a time; when configuration changes or other reboot events occur, the standby broker becomes active and the active broker goes to standby. The typical connection string, when there is an option to connect to multiple brokers, is similar to the following:

"failover:(ssl://b-ce452fbe-2581-4003-8ce2-4185b1377b43-1.mq.us-west-2.amazonaws.com:61617,ssl://b-ce452fbe-2581-4003-8ce2-4185b1377b43-2.mq.us-west-2.amazonaws.com:61617)"

In this architecture, you use only a single connection URL, but you still want to use the failover protocol to force re-connection if the connection is dropped for any reason.
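
For illustration, a minimal, hypothetical JMS client using this single-URL failover connection might look like the following; the credentials are placeholders:

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import org.apache.activemq.ActiveMQConnectionFactory;

public class DomainEndpointClient {
    public static void main(String[] args) throws Exception {
        // A single URL behind the failover transport still retries and
        // reconnects automatically if the connection drops.
        ConnectionFactory factory = new ActiveMQConnectionFactory(
                "failover:(ssl://mq.example.com:61617)");
        Connection connection = factory.createConnection("mqUser", "mqPassword"); // placeholders
        connection.start();
        // ... create sessions, producers, and consumers as usual ...
        connection.close();
    }
}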

For ease of use, this solution relies on the Amazon MQ workshop client application code from re:Invent 2018. To test this solution, set the connection URL to the following:

"failover:(ssl://mq.example.com:61617

Run the producer and consumer clients in separate terminal windows.

The messages are sent and received successfully across the internet, while the brokers are hidden behind the Network Load Balancer.

Logging into the broker’s ActiveMQ console

But what if we want to log in to the broker’s ActiveMQ web console?

Due to the security group rules, only traffic originating from inside the VPC is allowed to reach the brokers. There are three options:

  • Use a VPN connection from the corporate network to the VPC. Most customers likely use this option, but for rapid testing there is a simpler and more cost-effective method: the bastion host described below.
  • Connect to the brokers’ web console through a Route 53 subdomain, which requires creating a separate port 8162 Listener on the existing Network Load Balancer and creating a separate TLS target group on port 8162 for the brokers.
  • Use a bastion host to proxy traffic to the web console.

To use a bastion host, create a small Linux EC2 instance in your public subnet, and make sure that:

  • The EC2 instance has a public IP address.
  • You have access to the SSH key pair.
  • It is placed in a security group that allows SSH port 22 traffic from your location.

For simplicity, this step is not shown, but this demonstration uses a t3.micro Amazon Linux 2 host with all default options as the bastion.

Creating a forwarding tunnel

Next, create a forwarding tunnel through an SSH connection to the bastion host. Below is an example command for the terminal window. It keeps a persistent SSH connection that forwards port 8162 through the bastion host at its public IP address.

For example, the command could be:

ssh -D 8162 -C -q -N -i <my-key-pair-name>.pem ec2-user@<ec2-ip-address>

You can also configure a browser to tunnel traffic through your proxy.

We have chosen to demonstrate in Firefox. Configure the network settings to use a manual proxy on localhost with the Apache ActiveMQ console port of 8162. Open the Firefox Connection Settings, and in the Configure Proxy Access to the Internet section, select Manual proxy configuration. Then set the SOCKS Host to localhost and the Port to 8162, leaving the other fields empty.

Finally, use the Apache ActiveMQ console URL provided in the Amazon MQ web console details page to connect to the broker through the proxy.

ActiveMQ screenshot

Conclusion

Congratulations! You’ve successfully built a highly available Amazon MQ broker pair in a private subnet. You’ve layered your security defense by putting the brokers behind a highly scalable Network Load Balancer, and you’ve configured routing from a single custom subdomain URL to multiple brokers with health check built in.

To learn more about Amazon MQ and scalable broker communication patterns, we highly recommend the Amazon MQ documentation and the Amazon MQ workshop referenced above.

Keep on building!

Optimizing a Lift-and-Shift for Security

Post Syndicated from Jonathan Shapiro-Ward original https://aws.amazon.com/blogs/architecture/optimizing-a-lift-and-shift-for-security/

This is the third and final blog within a three-part series that examines how to optimize lift-and-shift workloads. A lift-and-shift is a common approach for migrating to AWS, whereby you move a workload from on-prem with little or no modification. This third blog examines how lift-and-shift workloads can benefit from an improved security posture with no modification to the application codebase. (Read about optimizing a lift-and-shift for performance and for cost effectiveness.)

Moving to AWS can help to strengthen your security posture by eliminating many of the risks present in on-premises deployments. It is still essential to consider how to best use AWS security controls and mechanisms to ensure the security of your workload. Security can often be a significant concern in lift-and-shift workloads, especially for legacy workloads where modern encryption and security features may not be present. By making use of AWS security features, you can significantly improve the security posture of a lift-and-shift workload, even if it lacks native support for modern security best practices.

Adding TLS with Application Load Balancers

Legacy applications are often the subject of a lift-and-shift. Such migrations can help reduce risks by moving away from out-of-date hardware, but security risks are often harder to manage. Many legacy applications use HTTP or other plaintext protocols that are vulnerable to all manner of attacks. Often, modifying a legacy application’s codebase to implement TLS is untenable, necessitating other options.

One comparatively simple approach is to leverage an Application Load Balancer or a Classic Load Balancer to provide SSL offloading. In this scenario, the load balancer is exposed to users, while the application servers that only support plaintext protocols reside within a subnet that can only be accessed by the load balancer. The load balancer decrypts all traffic destined for the application instances and forwards the plaintext traffic to them. This allows you to use encryption on traffic between the client and the load balancer, leaving only internal communication between the load balancer and the application in plaintext. Often this approach is sufficient to meet security requirements. However, in more stringent scenarios it is never acceptable for traffic to be transmitted in plaintext, even within a secured subnet. In this scenario, a sidecar can be used to ensure plaintext traffic never traverses the network.

Improving Security and Configuration Management with Sidecars

One approach to providing encryption to legacy applications is to leverage what’s often termed the “sidecar pattern.” The sidecar pattern entails a second process acting as a proxy to the legacy application. The legacy application only exposes its services via the local loopback adapter and is thus accessible only to the sidecar. In turn, the sidecar acts as an encrypted proxy, exposing the legacy application’s API to external consumers via TLS. As unencrypted traffic between the sidecar and the legacy application traverses the loopback adapter, it never traverses the network. This approach can help add encryption (or stronger encryption) to legacy applications when it’s not feasible to modify the original codebase. A common approach to implementing sidecars is through container groups, such as a pod in Amazon EKS or a task in Amazon ECS.


Figure 1: Implementing the Sidecar Pattern With Containers
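
To make the pattern concrete, below is a minimal, hypothetical sketch of a TLS-terminating sidecar in plain Java. The ports (8443 for TLS, 8080 for the legacy application on the loopback adapter) and the keystore configuration are assumptions; in practice, you would more likely run a hardened proxy such as NGINX, HAProxy, or Envoy as the sidecar container:

import javax.net.ssl.SSLServerSocketFactory;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;

public class TlsSidecar {
    public static void main(String[] args) throws Exception {
        // TLS listener; assumes a keystore supplied via the standard JSSE
        // system properties (javax.net.ssl.keyStore / keyStorePassword).
        ServerSocket tlsListener = SSLServerSocketFactory.getDefault().createServerSocket(8443);
        while (true) {
            Socket client = tlsListener.accept();
            new Thread(() -> forward(client)).start();
        }
    }

    private static void forward(Socket client) {
        // The legacy app listens in plaintext on loopback only, so
        // unencrypted traffic never leaves the host.
        try (client; Socket app = new Socket("127.0.0.1", 8080)) {
            Thread upstream = new Thread(() -> pipe(client, app));
            upstream.start();
            pipe(app, client);
            upstream.join();
        } catch (Exception ignored) {
        }
    }

    private static void pipe(Socket from, Socket to) {
        try {
            from.getInputStream().transferTo(to.getOutputStream());
        } catch (IOException ignored) {
        }
    }
}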

Another use of the sidecar pattern is to help legacy applications leverage modern cloud services. A common example of this is using a sidecar to manage files pertaining to the legacy application. This could entail a number of options including:

  • Having the sidecar dynamically modify the configuration of a legacy application based upon some external factor, such as the output of a Lambda function, an SNS event, or a DynamoDB write.
  • Having the sidecar write application state to a cache or database. Applications often write state to the local disk, which can be problematic for auto scaling or disaster recovery, where having the state easily accessible to other instances is advantageous. To facilitate this, the sidecar can write state to Amazon S3, Amazon DynamoDB, Amazon ElastiCache, or Amazon RDS.

A sidecar requires custom development, but it doesn’t require any modification of the lifted-and-shifted application. A sidecar treats the application as a black box and interacts with it via its API, configuration file, or other standard mechanism.

Automating Security

A lift-and-shift can achieve a significantly stronger security posture by incorporating elements of DevSecOps. DevSecOps is a philosophy holding that everyone is responsible for security, and it advocates automating all parts of the security process. AWS has a number of services that can help implement a DevSecOps strategy. These services include:

  • Amazon GuardDuty: a continuous monitoring service that analyzes AWS CloudTrail events, Amazon VPC Flow Logs, and DNS logs. GuardDuty can detect threats and trigger an automated response.
  • AWS Shield: a managed DDoS protection service
  • AWS WAF: a managed Web Application Firewall
  • AWS Config: a service for assessing, tracking, and auditing changes to AWS configuration

These services can help detect security problems and implement a response in real time, achieving a significantly stronger posture than traditional security strategies. You can build a DevSecOps strategy around a lift-and-shift workload using these services, without having to modify the lift-and-shift application.

Conclusion

There are many opportunities for taking advantage of AWS services and features to improve a lift-and-shift workload. Without any alteration to the application you can strengthen your security posture by utilizing AWS security services and by making small environmental and architectural changes that can help alleviate the challenges of legacy workloads.

About the author

Dr. Jonathan Shapiro-Ward is an AWS Solutions Architect based in Toronto. He helps customers across Canada to transform their businesses and build industry leading cloud solutions. He has a background in distributed systems and big data and holds a PhD from the University of St Andrews.

Optimizing a Lift-and-Shift for Cost Effectiveness and Ease of Management

Post Syndicated from Jonathan Shapiro-Ward original https://aws.amazon.com/blogs/architecture/optimizing-a-lift-and-shift-for-cost/

Lift-and-shift is the process of migrating a workload from on premises to AWS with little or no modification. A lift-and-shift is a common route for enterprises to move to the cloud, and can be a transitionary state to a more cloud-native approach. This is the second blog post in a three-part series that investigates how to optimize a lift-and-shift workload. The first post is about performance.

A key concern that many customers have with a lift-and-shift is cost. If you move an application as is from on-premises to AWS, is there any possibility for meaningful cost savings? By employing AWS services in lieu of self-managed EC2 instances, and by leveraging cloud capabilities such as auto scaling, there is potential for significant cost savings. In this blog post, we discuss a number of AWS services and solutions that you can leverage with minimal or no change to your application codebase in order to significantly reduce management costs and overall Total Cost of Ownership (TCO).

Automate

Even if you can’t modify your application, you can change the way you deploy it. Adopting an infrastructure-as-code approach can vastly improve the ease of management of your application, thereby reducing cost. By templating your application through AWS CloudFormation, AWS OpsWorks, or open source tools, you can make deploying and managing your workloads a simple and repeatable process.

As part of the lift-and-shift process, rationalizing the workload into a set of templates means less time is spent deploying and modifying the workload in the future. It enables the easy creation of dev/test environments, facilitates blue-green testing, opens up options for DR, and gives you the option to roll back in the event of an error. Automation is the single step most conducive to improving ease of management.

Reserved Instances and Spot Instances

A first initial consideration around cost should be the purchasing model for any EC2 instances. Reserved Instances (RIs) represent a 1-year or 3-year commitment to EC2 instances and can enable up to 75% cost reduction (over on demand) for steady state EC2 workloads. They are ideal for 24/7 workloads that must be continually in operation. An application requires no modification to make use of RIs.

An alternative purchasing model is EC2 spot. Spot instances offer unused capacity available at a significant discount – up to 90%. Spot instances receive a two-minute warning when the capacity is required back by EC2 and can be suspended and resumed. Workloads which are architected for batch runs – such as analytics and big data workloads – often require little or no modification to make use of spot instances. Other burstable workloads such as web apps may require some modification around how they are deployed.

A final alternative is on-demand. For workloads that are not running in perpetuity, on-demand is ideal. Workloads can be deployed, used for as long as required, and then terminated. By leveraging some simple automation (such as AWS Lambda and CloudWatch alarms), you can schedule workloads to start and stop at the open and close of business (or at other meaningful intervals). This typically requires no modification to the application itself. For workloads that are not 24/7 steady state, this can provide greater cost effectiveness compared to RIs and more certainty and ease of use when compared to spot.
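
As a sketch of this kind of automation, the following hypothetical Lambda function, invoked for example by a scheduled CloudWatch Events rule at close of business, stops all running instances carrying an assumed Schedule=office-hours tag; a mirror function would start them again in the morning:

import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import software.amazon.awssdk.services.ec2.Ec2Client;
import software.amazon.awssdk.services.ec2.model.DescribeInstancesRequest;
import software.amazon.awssdk.services.ec2.model.Filter;
import java.util.List;
import java.util.stream.Collectors;

public class OfficeHoursStopper implements RequestHandler<Object, String> {
    @Override
    public String handleRequest(Object event, Context context) {
        try (Ec2Client ec2 = Ec2Client.create()) {
            // Find running instances with the assumed Schedule=office-hours tag.
            List<String> ids = ec2.describeInstances(DescribeInstancesRequest.builder()
                            .filters(Filter.builder().name("tag:Schedule").values("office-hours").build(),
                                     Filter.builder().name("instance-state-name").values("running").build())
                            .build())
                    .reservations().stream()
                    .flatMap(r -> r.instances().stream())
                    .map(i -> i.instanceId())
                    .collect(Collectors.toList());

            if (!ids.isEmpty()) {
                ec2.stopInstances(b -> b.instanceIds(ids));
            }
            return "Stopped " + ids.size() + " instances";
        }
    }
}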

Amazon FSx for Windows File Server

Amazon FSx for Windows File Server provides a fully managed Windows filesystem that has full compatibility with SMB and DFS and full AD integration. Amazon FSx is an ideal choice for lift-and-shift architectures as it requires no modification to the application codebase in order to enable compatibility. Windows based applications can continue to leverage standard, Windows-native protocols to access storage with Amazon FSx. It enables users to avoid having to deploy and manage their own fileservers – eliminating the need for patching, automating, and managing EC2 instances. Moreover, it’s easy to scale and minimize costs, since Amazon FSx offers a pay-as-you-go pricing model.

Amazon EFS

Amazon Elastic File System (EFS) provides high-performance, highly available multi-attach storage via NFS. EFS offers a drop-in replacement for existing NFS deployments. This is ideal for a range of Linux and Unix use cases, as well as cross-platform solutions such as enterprise Java applications. EFS eliminates the need to manage NFS infrastructure and simplifies storage concerns. Moreover, EFS provides high availability out of the box, which helps to reduce single points of failure and avoids the need to manually configure storage replication. Much like Amazon FSx, EFS enables customers to realize cost improvements by moving to a pay-as-you-go pricing model, and it requires no modification of the application.

Amazon MQ

Amazon MQ is a managed message broker service that provides compatibility with JMS, AMQP, MQTT, OpenWire, and STOMP. These are amongst the most extensively used middleware and messaging protocols and are a key foundation of enterprise applications. Rather than having to manually maintain a message broker, Amazon MQ provides a performant, highly available managed message broker service that is compatible with existing applications.

Applications that use a standard messaging protocol can typically adopt Amazon MQ without any modification. In most cases, all you need to do is update the application’s MQ endpoint in its configuration. Subsequently, the Amazon MQ service handles the heavy lifting of operating a message broker, configuring HA, fault detection, failure recovery, software updates, and so forth. This offers a simple option for reducing management overhead and improving the reliability of a lift-and-shift architecture. What’s more, applications can migrate to Amazon MQ without the need for any downtime, making this an easy and effective way to improve a lift-and-shift.
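
For example, if the broker URL is already externalized in the application’s configuration, the migration becomes a pure configuration change. The following minimal sketch assumes a hypothetical BROKER_URL environment variable:

import javax.jms.ConnectionFactory;
import org.apache.activemq.ActiveMQConnectionFactory;

public class BrokerConfig {
    public static ConnectionFactory fromEnvironment() {
        // Before migration:  e.g., tcp://onprem-broker.example.com:61616
        // After migration:   e.g., failover:(ssl://b-1234-1.mq.us-east-1.amazonaws.com:61617)
        String brokerUrl = System.getenv("BROKER_URL"); // assumed variable name
        return new ActiveMQConnectionFactory(brokerUrl);
    }
}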

You can also use Amazon MQ to integrate legacy applications with modern serverless applications. Lambda functions can subscribe to MQ topics and trigger serverless workflows, enabling compatibility between legacy and new workloads.


Figure 1: Integrating Lift-and-Shift Workloads with Lambda via Amazon MQ

Amazon Managed Streaming for Apache Kafka

Lift-and-shift workloads that include a streaming data component are often built around Apache Kafka. There is a certain amount of complexity involved in operating a Kafka cluster, which incurs management and operational expense. Amazon Kinesis is a managed alternative to Apache Kafka, but it is not a drop-in replacement. At re:Invent 2018, we announced the launch of Amazon Managed Streaming for Apache Kafka (Amazon MSK) in public preview. MSK provides a managed Kafka deployment with pay-as-you-go pricing and acts as a drop-in replacement in existing Kafka workloads. MSK can help reduce management costs and improve cost efficiency, and it is ideal for lift-and-shift workloads.

Leveraging S3 for Static Web Hosting

A significant portion of any web application is static content. This includes videos, images, text, and other content that changes seldom, if ever. In many lifted-and-shifted applications, web servers are migrated to EC2 instances and host all content, static and dynamic. Hosting static content from an EC2 instance incurs a number of costs, including the instance, EBS volumes, and likely a load balancer. By moving static content to S3, you can significantly reduce the amount of compute required to host your web applications. In many cases, this change is non-disruptive and can be done at the DNS or CDN layer, requiring no change to your application.


Figure 2: Reducing Web Hosting Costs with S3 Static Web Hosting

Conclusion

There are numerous opportunities for reducing the cost of a lift-and-shift. Without any modification to the application, lift-and-shift workloads can benefit from cloud-native features. By using AWS services and features, you can significantly reduce the undifferentiated heavy lifting inherent in on-prem workloads and reduce resources and management overheads.

About the author

Dr. Jonathan Shapiro-Ward is an AWS Solutions Architect based in Toronto. He helps customers across Canada to transform their businesses and build industry leading cloud solutions. He has a background in distributed systems and big data and holds a PhD from the University of St Andrews.

Optimizing a Lift-and-Shift for Performance

Post Syndicated from Jonathan Shapiro-Ward original https://aws.amazon.com/blogs/architecture/optimizing-a-lift-and-shift-for-performance/

Many organizations begin their cloud journey with a lift-and-shift of applications from on-premises to AWS. This approach involves migrating software deployments with little, or no, modification. A lift-and-shift avoids a potentially expensive application rewrite but can result in a less optimal workload than a cloud-native solution. For many organizations, a lift-and-shift is a transitional stage to an eventual cloud-native solution, but there are some applications that can’t feasibly be made cloud-native, such as legacy systems or proprietary third-party solutions. There are still clear benefits of moving these workloads to AWS, but how can they be best optimized?

In this blog series, we’ll look at different approaches for optimizing a black-box lift-and-shift. We’ll consider how we can significantly improve a lift-and-shift application across three perspectives: performance, cost, and security. We’ll show that without modifying the application we can integrate services and features that will make a lift-and-shift workload cheaper, faster, more secure, and more reliable. In this first blog, we’ll investigate how a lift-and-shift workload can achieve improved performance by leveraging AWS features and services.

Performance gains are often a motivating factor behind a cloud migration. On-premises systems may suffer from performance bottlenecks owing to legacy infrastructure or capacity issues. When performing a lift-and-shift, how can you improve performance? Cloud computing is famous for enabling horizontally scalable architectures, but many legacy applications don’t support this mode of operation. Traditional business applications are often architected around a fixed number of servers and are unable to take advantage of horizontal scalability. Even if a lift-and-shift can’t make use of Auto Scaling groups and horizontal scalability, you can achieve significant performance gains by moving to AWS.

Scaling Up

The easiest path to scaling compute is vertical scaling. AWS provides the widest selection of virtual machine types and the largest machine sizes. Instances range from the small, burstable T3 series all the way to the memory-optimized X1 series. By leveraging the appropriate instance type, lift-and-shifts can benefit from significant performance gains. Depending on your workload, you can also swap out the instances used to power your workload to better meet demand. For example, on days when you anticipate high load, you could move to more powerful instances. This can easily be automated via a Lambda function.
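
As an illustration, a hypothetical routine to resize an instance with the AWS SDK for Java v2, which a Lambda function could invoke, might look like the following; the instance must be stopped before its type can be changed, and the instance ID and target type are placeholders:

import software.amazon.awssdk.services.ec2.Ec2Client;
import software.amazon.awssdk.services.ec2.model.AttributeValue;
import software.amazon.awssdk.services.ec2.model.ModifyInstanceAttributeRequest;

public class InstanceResizer {
    public static void resize(String instanceId, String targetType) {
        try (Ec2Client ec2 = Ec2Client.create()) {
            // Stop the instance and wait until it has fully stopped.
            ec2.stopInstances(b -> b.instanceIds(instanceId));
            ec2.waiter().waitUntilInstanceStopped(b -> b.instanceIds(instanceId));

            // Change the instance type, e.g., to "x1e.2xlarge" ahead of a high-load day.
            ec2.modifyInstanceAttribute(ModifyInstanceAttributeRequest.builder()
                    .instanceId(instanceId)
                    .instanceType(AttributeValue.builder().value(targetType).build())
                    .build());

            ec2.startInstances(b -> b.instanceIds(instanceId));
        }
    }
}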

The X1 family of instances offers considerable CPU, memory, storage, and network performance and can be used to accelerate applications that are designed to maximize single-machine performance. The x1e.32xlarge instance, for example, offers 128 vCPUs, 4 TB RAM, and 14,000 Mbps EBS bandwidth. This instance is ideal for high-performance in-memory workloads such as real-time financial risk processing or SAP HANA.

Through selecting the appropriate instance type and scaling that instance up and down to meet demand, you can achieve superior performance and cost effectiveness compared to running a single static instance. This affords lift-and-shift workloads far greater efficiency than their on-premises counterparts.

Placement Groups and C5n Instances

EC2 Placement groups determine how you deploy instances to underlying hardware. One can either choose to cluster instances into a low latency group within a single AZ or spread instances across distinct underlying hardware. Both types of placement groups are useful for optimizing lift-and-shifts.

The spread placement group is valuable in applications that rely on a small number of critical instances. If you can’t modify your application to leverage auto scaling, liveness probes, or failover, then spread placement groups can help reduce the risk of simultaneous failure while improving the overall reliability of the application.

Cluster placement groups help improve network QoS between instances. When used in conjunction with enhanced networking, cluster placement groups help to ensure low latency, high throughput, and high network packets per second. This is beneficial for chatty applications and any application that leveraged physical co-location for performance on-prem.

There is no additional charge for using placement groups.

You can extend this approach further with C5n instances. These instances offer 100 Gbps networking and can be used in a placement group for the most demanding network-intensive workloads. Using both placement groups and C5n instances requires no modification to your application, only to how it is deployed, making this a strong solution for providing network performance to lift-and-shift workloads.

Leverage Tiered Storage to Optimize for Price and Performance

AWS offers a range of storage options, each with its own performance characteristics and price point. Through leveraging a combination of storage types, lift-and-shifts can meet their performance and availability requirements in a price-effective manner. The storage options include the following:

Amazon EBS is the most common storage service involved with lift-and-shifts. EBS provides block storage that can be attached to EC2 instances and formatted with a typical file system such as NTFS or ext4. There are several different EBS types, ranging from inexpensive magnetic storage to highly performant provisioned IOPS SSDs. There are also storage-optimized instances that offer high-performance EBS access and NVMe storage. By utilizing the appropriate type of EBS volume and instance, you can achieve a good compromise between performance and price. RAID offers a further option to optimize EBS: EBS volumes are replicated within their Availability Zone by default, providing redundancy at no additional cost, and an EC2 instance can additionally apply RAID levels across multiple volumes. For instance, you can apply RAID 0 over a number of EBS volumes in order to improve storage performance.

In addition to EBS, EC2 instances can utilize the EC2 instance store. The instance store provides ephemeral, directly attached storage to EC2 instances. The instance store is included with the EC2 instance and provides a facility to store non-persistent data, making it ideal for temporary files that an application produces and that require performant storage. Both EBS and the instance store are exposed to the EC2 instance as block-level devices, and the OS can use its native management tools to format and mount these volumes as it would a traditional disk, requiring no significant departure from the on-premises configuration. Several instance types, including the C5d and P3dn, are equipped with local NVMe storage that can support extremely IO-intensive workloads.

Not all workloads require high performance storage. In many cases finding a compromise between price and performance is top priority. Amazon S3 provides highly durable, object storage at a significantly lower price point than block storage. S3 is ideal for a large number of use cases including content distribution, data ingestion, analytics, and backup. S3, however, is accessible via a RESTful API and does not provide conventional file system semantics as per EBS. This may make S3 less viable for applications that you can’t easily modify, but there are still options for using S3 in such a scenario.

An option for leveraging S3 is AWS Storage Gateway. Storage Gateway is a virtual appliance that can run on-premises or on EC2. The Storage Gateway appliance can operate in three configurations: file gateway, volume gateway, and tape gateway. File gateway provides an NFS interface, volume gateway provides an iSCSI interface, and tape gateway provides an iSCSI virtual tape library interface. This allows files, volumes, and tapes to be exposed to an application host through conventional protocols, with the Storage Gateway appliance persisting data to S3. This allows an application to remain agnostic to S3 while leveraging typical enterprise storage protocols.

Figure 1: Using S3 Storage via Storage Gateway

Conclusion

A lift-and-shift can achieve significant performance gains on AWS by making use of a range of instance types, storage services, and other features. Even without any modification to the application, lift-and-shift workloads can benefit from cutting-edge compute, network, and I/O capabilities and realize meaningful performance improvements.

About the author

Dr. Jonathan Shapiro-Ward is an AWS Solutions Architect based in Toronto. He helps customers across Canada to transform their businesses and build industry leading cloud solutions. He has a background in distributed systems and big data and holds a PhD from the University of St Andrews.

Implementing enterprise integration patterns with AWS messaging services: point-to-point channels

Post Syndicated from Rachel Richardson original https://aws.amazon.com/blogs/compute/implementing-enterprise-integration-patterns-with-aws-messaging-services-point-to-point-channels/

This post is courtesy of Christian Mueller, Sr. Solutions Architect, AWS and Dirk Fröhner, Sr. Solutions Architect, AWS

At AWS, we see our customers increasingly moving toward managed services to reduce the time and money that they spend managing infrastructure. This also applies to the messaging domain, where AWS provides a collection of managed services.

Asynchronous messaging is a fundamental approach for integrating independent systems or building up a set of loosely coupled systems that can scale and evolve independently and flexibly. The well-known collection of enterprise integration patterns (EIPs) provides a “technology-independent vocabulary” to “design and document integration solutions.” This blog is the first of two that describes how you can implement the core EIPs using AWS messaging services. Let’s first look at the relevant AWS messaging services.

When organizations migrate their traditional messaging and existing applications to the cloud gradually, they usually want to do it without rewriting their code. Amazon MQ is a managed message broker service for Apache ActiveMQ that makes it easy to set up and operate message brokers in the cloud. It supports industry-standard APIs and protocols such as JMS, AMQP, and MQTT, so you can switch from any standards-based message broker to Amazon MQ without rewriting the messaging code in your applications. Amazon MQ is recommended if you’re using messaging with existing applications and want to move your messaging to the cloud without rewriting existing code.

However, if you build new applications for the cloud, we recommend that you consider using cloud-native messaging services such as Amazon SQS and Amazon SNS. These serverless, fully managed message queue and topic services scale to meet your demands and provide simple, easy-to-use APIs. You can use Amazon SQS and Amazon SNS to decouple and scale microservices, distributed systems, and serverless applications and improve overall reliability.

This blog looks at the first part of some fundamental integration patterns. We describe the patterns and apply them to these AWS messaging services. This will help you apply the right pattern to your use case and architect for scale in a secure and cost-efficient manner. For all variants, we employ both traditional and cloud-native messaging services: Amazon MQ for the former and Amazon SQS and Amazon SNS for the latter.

Integration Patterns

Let’s start with some fundamental integration patterns.

Message exchange patterns

First, we inspect the two major message exchange patterns: one-way and request-response.

One-way messaging

With one-way messaging, a message producer (sender) sends a message to a messaging channel and doesn’t expect or want a response from whatever process (receiver) consumes the message. Examples of one-way messaging include transferring data and sending a notification about an event that happened.

Request-response messaging

With request-response messaging, a message producer (requester) sends out a message: for example, a command to instruct the responder to execute something. The requester expects a response from each message consumer (responder) that received the message, typically to learn the result of each execution. So that the responder knows where to send the response, the request message contains a return address. To make sure that the requester can assign an incoming response to a request, the requester adds a correlation identifier to the request, which the responders echo in their responses.

Messaging channels: point-to-point

Next, we look at the point-to-point messaging channel, one of the most important patterns for messaging channels. We continue with publish-subscribe in our second post.

A point-to-point channel is usually implemented by message queues. Message queues operate so that any given message is only consumed by one receiver, although multiple receivers can be connected to the queue. The queue ensures once-only consumption. Messages are usually buffered in queues so that they’re available for consumption for a certain amount of time, even if no receiver is currently connected.

Point-to-point channels are often used for loosely coupled message transmission, though there are two other common uses. First, it can support horizontal scaling of message processing on the receiver side. Depending on the message load in the channel, the number of receiver processes can be elastically adjusted to cope with the load as needed. The queue acts as a buffering load balancer. Second, it can flatten peak loads of messages and prevent your receivers from being flooded when you can’t scale out fast enough or you don’t want additional scaling.

Integration scenarios

In this section, we apply these fundamental patterns to AWS messaging services. The code examples are written in Java, purely by author preference. You can implement the same integration scenarios in C++, .NET, Node.js, Python, Ruby, Go, or any other programming language for which AWS provides an SDK and for which an Apache ActiveMQ client library is available.

Point-to-point channels: one-way messaging

The diagrams in the following subsections show the principle of one-way messaging for point-to-point channels, using Amazon MQ queues and Amazon SQS queues. The sender produces a message and sends it into a queue, and the receiver consumes the message from the queue for processing. For traditional messaging (that is, Amazon MQ), the senders and consumers can use protocols such as JMS or AMQP. For cloud-native messaging, they can use the Amazon SQS API.

Traditional messaging

To follow this example, open the Amazon MQ console and create a broker. The following diagram shows the components for the traditional messaging scenario: a sender sends messages into an Amazon MQ queue, and a receiver consumes messages from that queue.

Point to point traditional messaging

In the following code example, the sender and receiver use the Apache ActiveMQ client library and the standard Java Message Service (JMS) API to send and receive messages through an Amazon MQ queue. You can run the code on any AWS compute service, in your on-premises data center, or on your personal computer. For simplicity, the code launches the sender and receiver in the same Java virtual machine (JVM).

import java.util.UUID;

import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQSslConnectionFactory;

public class PointToPointOneWayTraditional {

    public static void main(String... args) throws Exception {
        ActiveMQSslConnectionFactory connFact = new ActiveMQSslConnectionFactory("failover:(ssl://<broker-1>.amazonaws.com:61617,ssl://<broker-2>.amazonaws.com:61617)");
        connFact.setConnectResponseTimeout(10000);
        Connection conn = connFact.createConnection("user", "password");
        conn.setClientID("PointToPointOneWayTraditional");
        conn.start();

        new Thread(new Receiver(conn.createSession(false, Session.CLIENT_ACKNOWLEDGE), "Queue.PointToPoint.OneWay.Traditional")).start();
        new Thread(new Sender(conn.createSession(false, Session.CLIENT_ACKNOWLEDGE), "Queue.PointToPoint.OneWay.Traditional")).start();
    }

    public static class Sender implements Runnable {

        private Session session;
        private String destination;

        public Sender(Session session, String destination) {
            this.session = session;
            this.destination = destination;
        }

        public void run() {
            try {
                MessageProducer messageProducer = session.createProducer(session.createQueue(destination));
                long counter = 0;

                while (true) {
                    TextMessage message = session.createTextMessage("Message " + ++counter);
                    message.setJMSMessageID(UUID.randomUUID().toString());
                    messageProducer.send(message);
                }
            } catch (JMSException e) {
                throw new RuntimeException(e);
            }
        }
    }

    public static class Receiver implements Runnable, MessageListener {

        private Session session;
        private String destination;

        public Receiver(Session session, String destination) {
            this.session = session;
            this.destination = destination;
        }

        public void run() {
            try {
                MessageConsumer consumer = session.createConsumer(session.createQueue(destination));
                consumer.setMessageListener(this);
            } catch (JMSException e) {
                throw new RuntimeException(e);
            }
        }

        public void onMessage(Message message) {
            try {
                System.out.println(String.format("received message '%s' with message id '%s'", ((TextMessage) message).getText(), message.getJMSMessageID()));
                message.acknowledge();
            } catch (JMSException e) {
                throw new RuntimeException(e);
            }
        }
    }
}

Cloud-native messaging

To follow this example, open the Amazon SQS console and create a standard SQS queue named P2POneWayCloudNative. The following diagram shows the components for the cloud-native messaging scenario: a sender sends messages into an Amazon SQS queue, and a receiver consumes messages from that queue.

Point to point cloud-native messaging

In the sample code below, the example sender uses the AWS SDK for Java to send messages to an Amazon SQS queue in an endless loop. You can run the code on any AWS compute service, in your on-premises data center, or on your personal computer.

import java.util.UUID;

import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
import com.amazonaws.services.sqs.model.MessageAttributeValue;
import com.amazonaws.services.sqs.model.SendMessageRequest;

public class PointToPointOneWayCloudNative {

    public static void main(String... args) throws Exception {
        final AmazonSQS sqs = AmazonSQSClientBuilder.standard().build();

        new Thread(new Sender(sqs, "https://sqs.<region>.amazonaws.com/<account-number>/P2POneWayCloudNative")).start();
    }

    public static class Sender implements Runnable {

        private AmazonSQS sqs;
        private String destination;

        public Sender(AmazonSQS sqs, String destination) {
            this.sqs = sqs;
            this.destination = destination;
        }

        public void run() {
            long counter = 0;

            while (true) {
                sqs.sendMessage(
                    new SendMessageRequest()
                        .withQueueUrl(destination)
                        .withMessageBody("Message " + ++counter)
                        .addMessageAttributesEntry("MessageID", new MessageAttributeValue().withDataType("String").withStringValue(UUID.randomUUID().toString())));
            }
        }
    }
}

We implement the receiver below in a serverless manner as an AWS Lambda function, using Amazon SQS as the event source. The name of the SQS queue is configured outside the function’s code, which is why it doesn’t appear in this code example.

import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.SQSEvent;

public class Receiver implements RequestHandler<SQSEvent, Void> {

    @Override
    public Void handleRequest(SQSEvent request, Context context) {
        for (SQSEvent.SQSMessage message: request.getRecords()) {
            System.out.println(String.format("received message '%s' with message id '%s'", message.getBody(), message.getMessageAttributes().get("MessageID").getStringValue()));
        }

        return null;
    }
}

If this approach is new to you, you can find more details in AWS Lambda Adds Amazon Simple Queue Service to Supported Event Sources. Using Lambda comes with a number of benefits. For example, you don’t have to manage the compute environment for the receiver, and you can use an event (or push) model instead of having to poll for new messages.
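If you prefer to create the event source mapping programmatically instead of through the console, a sketch along the following lines should work with the AWS SDK for Java. The queue ARN and function name are placeholders, and the function’s execution role must also be allowed to read from the queue.

import com.amazonaws.services.lambda.AWSLambda;
import com.amazonaws.services.lambda.AWSLambdaClientBuilder;
import com.amazonaws.services.lambda.model.CreateEventSourceMappingRequest;

public class EventSourceSetup {

    public static void main(String[] args) {
        AWSLambda lambda = AWSLambdaClientBuilder.standard().build();

        // Map the SQS queue to the Lambda function; Lambda polls the queue and
        // invokes the function with batches of up to 10 messages.
        lambda.createEventSourceMapping(new CreateEventSourceMappingRequest()
            .withEventSourceArn("arn:aws:sqs:<region>:<account-number>:P2POneWayCloudNative")
            .withFunctionName("Receiver") // placeholder function name
            .withBatchSize(10));
    }
}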

Point-to-point channels: request-response messaging

In addition to the one-way scenario, there is an option with a return channel, and we now call the involved processes requester and responder rather than sender and receiver. The requester sends a message into the request queue, and the responder sends the response into the response queue. Remember that the requester enriches the message with a return address (the name of the response queue) so that the responder knows where to send the response. The requester also adds a correlation ID that the responder copies into the response message, so that the requester can match the incoming response with a request.

Traditional messaging

In this example, we reuse the Amazon MQ broker that we set up earlier. The following diagram shows the components for the traditional messaging scenario, using one Amazon MQ queue for the request messages and another for the response messages.

Point to point request response traditional messaging

Using Amazon MQ, we don’t have to create queues explicitly because they’re implicitly created as needed when we start sending messages to them. This example is similar to the point-to-point one-way traditional example.

public class PointToPointRequestResponseTraditional {

    public static void main(String... args) throws Exception {
        ActiveMQSslConnectionFactory connFact = new ActiveMQSslConnectionFactory("failover:(ssl://<broker-1>.amazonaws.com:61617,ssl://<broker-2>.amazonaws.com:61617)");
        connFact.setConnectResponseTimeout(10000);
        Connection conn = connFact.createConnection("user", "password");
        conn.setClientID("PointToPointRequestResponseTraditional");
        conn.start();

        new Thread(new Responder(conn.createSession(false, Session.CLIENT_ACKNOWLEDGE), "Queue.PointToPoint.RequestResponse.Traditional")).start();
        new Thread(new Requester(conn.createSession(false, Session.CLIENT_ACKNOWLEDGE), "Queue.PointToPoint.RequestResponse.Traditional")).start();
    }

    public static class Requester implements Runnable {

        private Session session;
        private String destination;

        public Requester(Session session, String destination) {
            this.session = session;
            this.destination = destination;
        }

        public void run() {
            MessageProducer messageProducer = null;
            try {
                messageProducer = session.createProducer(session.createQueue(destination));
                long counter = 0;

                while (true) {
                    TemporaryQueue replyTo = session.createTemporaryQueue();
                    String correlationId = UUID.randomUUID().toString();
                    TextMessage message = session.createTextMessage("Message " + ++counter);
                    message.setJMSMessageID(UUID.randomUUID().toString());
                    message.setJMSCorrelationID(correlationId);
                    message.setJMSReplyTo(replyTo);
                    messageProducer.send(message);

                    MessageConsumer consumer = session.createConsumer(replyTo, "JMSCorrelationID='" + correlationId + "'");
                    try {
                        Message receivedMessage = consumer.receive(5000);
                        if (receivedMessage != null) { // receive() returns null on timeout
                            System.out.println(String.format("received message '%s' with message id '%s'", ((TextMessage) receivedMessage).getText(), receivedMessage.getJMSMessageID()));
                            receivedMessage.acknowledge();
                        }
                    } finally {
                        if (consumer != null) {
                            consumer.close();
                        }
                    }
                }
            } catch (JMSException e) {
                throw new RuntimeException(e);
            }
        }
    }

    public static class Responder implements Runnable, MessageListener {

        private Session session;
        private String destination;

        public Responder(Session session, String destination) {
            this.session = session;
            this.destination = destination;
        }

        public void run() {
            try {
                MessageConsumer consumer = session.createConsumer(session.createQueue(destination));
                consumer.setMessageListener(this);
            } catch (JMSException e) {
                throw new RuntimeException(e);
            }
        }

        public void onMessage(Message message) {
            try {
                String correlationId = message.getJMSCorrelationID();
                Destination replyTo = message.getJMSReplyTo();

                TextMessage responseMessage = session.createTextMessage(((TextMessage) message).getText() + " with CorrelationID " + correlationId);
                responseMessage.setJMSMessageID(UUID.randomUUID().toString());
                responseMessage.setJMSCorrelationID(correlationId);

                MessageProducer messageProducer = session.createProducer(replyTo);
                try {
                    messageProducer.send(responseMessage);

                    message.acknowledge();
                } finally {
                    if (messageProducer != null) {
                        messageProducer.close();
                    }
                }
            } catch (JMSException e) {
                throw new RuntimeException(e);
            }
        }
    }
}

Cloud-native messaging

Open the Amazon SQS console and create two standard SQS queues named P2PReqRespCloudNative and P2PReqRespCloudNative-Resp. The following diagram shows the components for the cloud-native scenario, using one Amazon SQS queue for the request messages and another for the response messages.

Point to point request response cloud native messaging

The following example requester is almost identical to the point-to-point one-way cloud-native example sender. It also provides a reply-to address and a correlation ID.

public class PointToPointRequestResponseCloudNative {

    public static void main(String... args) throws Exception {
        final AmazonSQS sqs = AmazonSQSClientBuilder.standard().build();

        new Thread(new Requester(sqs, "https://sqs.<region>.amazonaws.com/<account-number>/P2PReqRespCloudNative", "https://sqs.<region>.amazonaws.com/<account-number>/P2PReqRespCloudNative-Resp")).start();
    }

    public static class Requester implements Runnable {

        private AmazonSQS sqs;
        private String destination;
        private String replyDestination;
        private Map<String, SendMessageRequest> inflightMessages = new ConcurrentHashMap<>();

        public Requester(AmazonSQS sqs, String destination, String replyDestination) {
            this.sqs = sqs;
            this.destination = destination;
            this.replyDestination = replyDestination;
        }

        public void run() {
            long counter = 0;

            while (true) {
                String correlationId = UUID.randomUUID().toString();
                SendMessageRequest request = new SendMessageRequest()
                    .withQueueUrl(destination)
                    .withMessageBody("Message " + ++counter)
                    .addMessageAttributesEntry("CorrelationID", new MessageAttributeValue().withDataType("String").withStringValue(correlationId))
                    .addMessageAttributesEntry("ReplyTo", new MessageAttributeValue().withDataType("String").withStringValue(replyDestination));
                sqs.sendMessage(request);

                inflightMessages.put(correlationId, request);

                ReceiveMessageResult receiveMessageResult = sqs.receiveMessage(
                    new ReceiveMessageRequest()
                        .withQueueUrl(replyDestination)
                        .withMessageAttributeNames("CorrelationID")
                        .withMaxNumberOfMessages(5)
                        .withWaitTimeSeconds(2));

                for (Message receivedMessage : receiveMessageResult.getMessages()) {
                    System.out.println(String.format("received message '%s' with message id '%s'", receivedMessage.getBody(), receivedMessage.getMessageId()));

                    String receivedCorrelationId = receivedMessage.getMessageAttributes().get("CorrelationID").getStringValue();
                    SendMessageRequest originalRequest = inflightMessages.remove(receivedCorrelationId);
                    System.out.println(String.format("Corresponding request message '%s'", originalRequest.getMessageBody()));

                    sqs.deleteMessage(
                        new DeleteMessageRequest()
                            .withQueueUrl(replyDestination)
                            .withReceiptHandle(receivedMessage.getReceiptHandle()));
                }
            }
        }
    }
}

The following example responder is almost identical to the point-to-point one-way cloud-native example receiver. It also creates a message and sends it back to the reply-to address provided in the received message.

public class Responder implements RequestHandler<SQSEvent, Void> {

    private final AmazonSQS sqs = AmazonSQSClientBuilder.standard().build();

    @Override
    public Void handleRequest(SQSEvent request, Context context) {
        for (SQSEvent.SQSMessage message: request.getRecords()) {
            System.out.println(String.format("received message '%s' with message id '%s'", message.getBody(), message.getMessageId()));
            String correlationId = message.getMessageAttributes().get("CorrelationID").getStringValue();
            String replyTo = message.getMessageAttributes().get("ReplyTo").getStringValue();

            System.out.println(String.format("sending message with correlation id '%s' to '%s'", correlationId, replyTo));
            sqs.sendMessage(
                new SendMessageRequest()
                    .withQueueUrl(replyTo)
                    .withMessageBody(message.getBody() + " with CorrelationID " + correlationId)
                    .addMessageAttributesEntry("CorrelationID", new MessageAttributeValue().withDataType("String").withStringValue(correlationId)));
        }

        return null;
    }
}

Go build!

We look forward to hearing about what you build and will continue innovating our services on your behalf.

What’s next?

We have introduced the first fundamental EIPs and shown how you can apply them to the AWS messaging services. If you are keen to dive deeper, continue reading with the second part of this series, where we will cover publish-subscribe messaging.

Read Part 2: Publish-Subscribe Messaging

Implementing enterprise integration patterns with AWS messaging services: publish-subscribe channels

Post Syndicated from Rachel Richardson original https://aws.amazon.com/blogs/compute/implementing-enterprise-integration-patterns-with-aws-messaging-services-publish-subscribe-channels/

This post is courtesy of Christian Mueller, Sr. Solutions Architect, AWS and Dirk Fröhner, Sr. Solutions Architect, AWS

In this blog, we look at the second part of some fundamental enterprise integration patterns and how you can implement them with AWS messaging services. If you missed the first part, we encourage you to start there.

Read Part 1: Point-to-Point Messaging

Integration patterns

Messaging channels: publish-subscribe

As mentioned in the first blog, we continue with the second major messaging channel pattern: publish-subscribe.

A publish-subscribe channel is usually implemented using message topics. In this model, any message published to a topic is immediately received by all of the subscribers of the topic (unless you have applied the message filter pattern). However, if there is no subscriber, messages are usually discarded. The durable subscriber pattern describes an exception where messages are kept for a while in case the subscriber is offline. Publish-subscribe is used when multiple parties are interested in certain messages. Sometimes, this pattern is also referred to as fan-out.

Let’s apply this pattern to the different AWS messaging services and get our hands dirty. To follow our examples, sign in to your AWS account (or create an account as described in How do I create and activate a new Amazon Web Services account?).

Integration scenarios

Publish-subscribe channels: one-way messaging

Publish-subscribe one-way patterns are often involved in notification style use cases, where the publisher sends out an event and doesn’t care who is interested in this event. For example, Amazon CloudWatch Events publishes state changes in the environment, and you can subscribe and act accordingly.

The diagrams in the following subsections show the principles of one-way messaging for publish-subscribe channels, using both Amazon MQ and Amazon SNS topics. A publisher produces a message and sends it into a topic, and subscribers consume the message from the topic for processing.

For traditional messaging, senders and consumers can use APIs and protocols such as JMS or AMQP. For cloud-native messaging, they can use the Amazon SNS API.

Traditional messaging

In this example, we reuse the Amazon MQ broker we set up in part one of this blog. As shown in the following diagram, messages are published into an Amazon MQ topic, and multiple subscribers can consume messages from it.

Publish Subscribe One Way Traditional Messaging

This example is similar to the point-to-point one-way traditional example using the Apache ActiveMQ client library, but we use topics instead of queues, as shown in the following code.

public class PublishSubscribeOneWayTraditional {

    public static void main(String... args) throws Exception {
        ActiveMQSslConnectionFactory connFact = new ActiveMQSslConnectionFactory("failover:(ssl://<broker-1>.amazonaws.com:61617,ssl://<broker-2>.amazonaws.com:61617)");
        connFact.setConnectResponseTimeout(10000);
        Connection conn = connFact.createConnection("user", "password");
        conn.setClientID("PubSubOneWayTraditional");
        conn.start();

        new Thread(new Subscriber(conn.createSession(false, Session.CLIENT_ACKNOWLEDGE), "Topic.PubSub.OneWay.Traditional")).start();
        new Thread(new Publisher(conn.createSession(false, Session.CLIENT_ACKNOWLEDGE), "Topic.PubSub.OneWay.Traditional")).start();
    }

    public static class Publisher implements Runnable {

        private Session session;
        private String destination;

        public Publisher(Session session, String destination) {
            this.session = session;
            this.destination = destination;
        }

        public void run() {
            try {
                MessageProducer messageProducer = session.createProducer(session.createTopic(destination));
                long counter = 0;

                while (true) {
                    TextMessage message = session.createTextMessage("Message " + ++counter);
                    message.setJMSMessageID(UUID.randomUUID().toString());
                    messageProducer.send(message);
                }
            } catch (JMSException e) {
                throw new RuntimeException(e);
            }
        }
    }

    public static class Subscriber implements Runnable, MessageListener {

        private Session session;
        private String destination;

        public Subscriber(Session session, String destination) {
            this.session = session;
            this.destination = destination;
        }

        public void run() {
            try {
                MessageConsumer consumer = session.createDurableSubscriber(session.createTopic(destination), "subscriber-1");
                consumer.setMessageListener(this);
            } catch (JMSException e) {
                throw new RuntimeException(e);
            }
        }

        public void onMessage(Message message) {
            try {
                System.out.println(String.format("received message '%s' with message id '%s'", ((TextMessage) message).getText(), message.getJMSMessageID()));
                message.acknowledge();
            } catch (JMSException e) {
                throw new RuntimeException(e);
            }
        }
    }
}

Cloud-native messaging

To follow a similar example using Amazon SNS, open the Amazon SNS console and create an Amazon SNS topic named PubSubOneWayCloudNative. The following diagram illustrates a publisher sending messages into an Amazon SNS topic; the subscribers of this topic consume those messages.

Publish Subscribe One Way Cloud Native Messaging

We use the AWS SDK for Java to send messages to our Amazon SNS topic in an endless loop. You can run the following code on any AWS compute service, in your on-premises data center, or on your personal computer.

public class PublishSubscribeOneWayCloudNative {

    public static void main(String... args) throws Exception {
        final AmazonSNS sns = AmazonSNSClientBuilder.standard().build();

        new Thread(new Publisher(sns, "arn:aws:sns:<region>:<account-number>:PubSubOneWayCloudNative")).start();
    }

    public static class Publisher implements Runnable {

        private AmazonSNS sns;
        private String destination;

        public Publisher(AmazonSNS sns, String destination) {
            this.sns = sns;
            this.destination = destination;
        }

        public void run() {
            long counter = 0;

            while (true) {
                sns.publish(
                    new PublishRequest()
                        .withTargetArn(destination)
                        .withSubject("PubSubOneWayCloudNative sample")
                        .withMessage("Message " + ++counter)
                        .addMessageAttributesEntry("MessageID", new MessageAttributeValue().withDataType("String").withStringValue(UUID.randomUUID().toString())));
            }
        }
    }
}

The subscriber is implemented as an AWS Lambda function, using Amazon SNS as the event source. For more information on how to set this up, see Using Amazon SNS for System-to-System Messaging with a Lambda Function as a Subscriber.

public class Subscriber implements RequestHandler<SNSEvent, Void> {

    @Override
    public Void handleRequest(SNSEvent request, Context context) {
        for (SNSEvent.SNSRecord record: request.getRecords()) {
            SNSEvent.SNS sns = record.getSNS();

            System.out.println(String.format("received message '%s' with message id '%s'", sns.getMessage(), sns.getMessageAttributes().get("MessageID").getValue()));
        }

        return null;
    }
}
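For completeness, here is a sketch of how the subscription mentioned above could be created with the AWS SDK for Java instead of in the console. The topic and function ARNs are placeholders; besides the subscription itself, Amazon SNS needs permission to invoke the function.

import com.amazonaws.services.lambda.AWSLambda;
import com.amazonaws.services.lambda.AWSLambdaClientBuilder;
import com.amazonaws.services.lambda.model.AddPermissionRequest;
import com.amazonaws.services.sns.AmazonSNS;
import com.amazonaws.services.sns.AmazonSNSClientBuilder;

public class SnsSubscriptionSetup {

    public static void main(String[] args) {
        String topicArn = "arn:aws:sns:<region>:<account-number>:PubSubOneWayCloudNative";
        String functionArn = "arn:aws:lambda:<region>:<account-number>:function:Subscriber"; // placeholder

        // Subscribe the Lambda function to the SNS topic.
        AmazonSNS sns = AmazonSNSClientBuilder.standard().build();
        sns.subscribe(topicArn, "lambda", functionArn);

        // Grant SNS permission to invoke the function.
        AWSLambda lambda = AWSLambdaClientBuilder.standard().build();
        lambda.addPermission(new AddPermissionRequest()
            .withFunctionName(functionArn)
            .withStatementId("sns-invoke")
            .withAction("lambda:InvokeFunction")
            .withPrincipal("sns.amazonaws.com")
            .withSourceArn(topicArn));
    }
}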

Publish-subscribe channels: request-response messaging

Publish-subscribe request-response patterns are beneficial in use cases where it’s important to communicate with multiple services that do their work in parallel, but all their responses need to be aggregated afterward. One example is an order service, which needs to enrich the order message with data from multiple backend services.

The diagrams in the following subsections show the principles of request-response messaging for publish-subscribe channels, using both Amazon MQ and Amazon SNS topics. A publisher produces a message and sends it into a topic, and subscribers consume the message from the topic for processing.

Although we use a publish-subscribe channel for the request messages, we would usually use a point-to-point channel for the response messages. This assumes that the requester application or at least a dedicated application is the one entity that works on processing all the responses.

Traditional messaging

As we can see in the following diagram, an Amazon MQ topic is used to send out all the request messages, while all the response messages are sent into an Amazon MQ queue.

Publish Subscribe Request Response Traditional Messaging

In our code sample below, we use two responders.

public class PublishSubscribeRequestResponseTraditional {

    public static void main(String... args) throws Exception {
        ActiveMQSslConnectionFactory connFact = new ActiveMQSslConnectionFactory("failover:(ssl://<broker-1>.amazonaws.com:61617,ssl://<broker-2>.amazonaws.com:61617)");
        connFact.setConnectResponseTimeout(10000);
        Connection conn = connFact.createConnection("user", "password");
        conn.setClientID("PubSubReqRespTraditional");
        conn.start();

        new Thread(new Responder(conn.createSession(false, Session.CLIENT_ACKNOWLEDGE), "Topic.PubSub.ReqResp.Traditional", "subscriber-1")).start();
        new Thread(new Responder(conn.createSession(false, Session.CLIENT_ACKNOWLEDGE), "Topic.PubSub.ReqResp.Traditional", "subscriber-2")).start();
        new Thread(new Requester(conn.createSession(false, Session.CLIENT_ACKNOWLEDGE), "Topic.PubSub.ReqResp.Traditional")).start();
    }

    public static class Requester implements Runnable {

        private Session session;
        private String destination;

        public Requester(Session session, String destination) {
            this.session = session;
            this.destination = destination;
        }

        public void run() {
            MessageProducer messageProducer = null;
            try {
                messageProducer = session.createProducer(session.createTopic(destination));
                long counter = 0;

                while (true) {
                    TemporaryQueue replyTo = session.createTemporaryQueue();
                    String correlationId = UUID.randomUUID().toString();
                    TextMessage message = session.createTextMessage("Message " + ++counter);
                    message.setJMSMessageID(UUID.randomUUID().toString());
                    message.setJMSCorrelationID(correlationId);
                    message.setJMSReplyTo(replyTo);
                    messageProducer.send(message);

                    MessageConsumer consumer = session.createConsumer(replyTo, "JMSCorrelationID='" + correlationId + "'");
                    try {
                        Message receivedMessage1 = consumer.receive(5000);
                        Message receivedMessage2 = consumer.receive(5000);
                        if (receivedMessage1 != null && receivedMessage2 != null) { // receive() returns null on timeout
                            System.out.println(String.format("received 2 messages '%s' and '%s'", ((TextMessage) receivedMessage1).getText(), ((TextMessage) receivedMessage2).getText()));
                            // With CLIENT_ACKNOWLEDGE, acknowledging the second message also
                            // acknowledges all earlier messages consumed in this session.
                            receivedMessage2.acknowledge();
                        }
                    } finally {
                        if (consumer != null) {
                            consumer.close();
                        }
                    }
                }
            } catch (JMSException e) {
                throw new RuntimeException(e);
            }
        }
    }

    public static class Responder implements Runnable, MessageListener {

        private Session session;
        private String destination;
        private String name;

        public Responder(Session session, String destination, String name) {
            this.session = session;
            this.destination = destination;
            this.name = name;
        }

        public void run() {
            try {
                MessageConsumer consumer = session.createDurableSubscriber(session.createTopic(destination), name);
                consumer.setMessageListener(this);
            } catch (JMSException e) {
                throw new RuntimeException(e);
            }
        }

        public void onMessage(Message message) {
            try {
                String correlationId = message.getJMSCorrelationID();
                Destination replyTo = message.getJMSReplyTo();

                TextMessage responseMessage = session.createTextMessage(((TextMessage) message).getText() + " from responder " + name);
                responseMessage.setJMSMessageID(UUID.randomUUID().toString());
                responseMessage.setJMSCorrelationID(correlationId);

                MessageProducer messageProducer = session.createProducer(replyTo);
                try {
                    messageProducer.send(responseMessage);

                    message.acknowledge();
                } finally {
                    if (messageProducer != null) {
                        messageProducer.close();
                    }
                }
            } catch (JMSException e) {
                throw new RuntimeException(e);
            }
        }
    }
}

Cloud-native messaging

To implement a similar pattern with Amazon SNS, open the Amazon SNS console and create a new SNS topic named PubSubReqRespCloudNative. Then open the Amazon SQS console and create a standard SQS queue named PubSubReqRespCloudNative-Resp. The following diagram illustrates that we now use an Amazon SNS topic for request messages and an Amazon SQS queue for response messages.

Publish Subscribe Request Response Cloud Native Messaging

This example requester is almost identical to the publish-subscribe one-way cloud-native example sender. The requester also specifies a reply-to address and a correlation ID as message attributes. This way, responders know where to send the responses to, and the receiver of the responses can assign them accordingly.

public class PublishSubscribeReqRespCloudNative {

    public static void main(String... args) throws Exception {
        final AmazonSNS sns = AmazonSNSClientBuilder.standard().build();
        final AmazonSQS sqs = AmazonSQSClientBuilder.standard().build();

        new Thread(new Requester(sns, sqs, "arn:aws:sns:<region>:<account-number>:PubSubReqRespCloudNative", "https://sqs.<region>.amazonaws.com/<account-number>/PubSubReqRespCloudNative-Resp")).start();
    }

    public static class Requester implements Runnable {

        private AmazonSNS sns;
        private AmazonSQS sqs;
        private String destination;
        private String replyDestination;
        private Map<String, PublishRequest> inflightMessages = new ConcurrentHashMap<>();

        public Requester(AmazonSNS sns, AmazonSQS sqs, String destination, String replyDestination) {
            this.sns = sns;
            this.sqs = sqs;
            this.destination = destination;
            this.replyDestination = replyDestination;
        }

        public void run() {
            long counter = 0;

            while (true) {
                String correlationId = UUID.randomUUID().toString();
                PublishRequest request = new PublishRequest()
                    .withTopicArn(destination)
                    .withMessage("Message " + ++counter)
                    .addMessageAttributesEntry("CorrelationID", new MessageAttributeValue().withDataType("String").withStringValue(correlationId))
                    .addMessageAttributesEntry("ReplyTo", new MessageAttributeValue().withDataType("String").withStringValue(replyDestination));
                sns.publish(request);

                inflightMessages.put(correlationId, request);

                ReceiveMessageResult receiveMessageResult = sqs.receiveMessage(
                    new ReceiveMessageRequest()
                        .withQueueUrl(replyDestination)
                        .withMessageAttributeNames("CorrelationID")
                        .withMaxNumberOfMessages(5)
                        .withWaitTimeSeconds(2));

                for (Message receivedMessage : receiveMessageResult.getMessages()) {
                    System.out.println(String.format("received message '%s' with message id '%s'", receivedMessage.getBody(), receivedMessage.getMessageId()));

                    String receivedCorrelationId = receivedMessage.getMessageAttributes().get("CorrelationID").getStringValue();
                    PublishRequest originalRequest = inflightMessages.remove(receivedCorrelationId);
                    System.out.println(String.format("Corresponding request message '%s'", originalRequest.getMessage()));

                    sqs.deleteMessage(
                        new DeleteMessageRequest()
                            .withQueueUrl(replyDestination)
                            .withReceiptHandle(receivedMessage.getReceiptHandle()));
                }
            }
        }
    }
}

This example responder is almost identical to the publish-subscribe one-way cloud-native example receiver. It also creates a message, enriches it with the correlation ID, and sends it back to the reply-to address provided in the received message.

public class Responder implements RequestHandler<SNSEvent, Void> {

    private final AmazonSQS sqs = AmazonSQSClientBuilder.standard().build();

    @Override
    public Void handleRequest(SNSEvent request, Context context) {
        for (SNSEvent.SNSRecord record: request.getRecords()) {
            System.out.println(String.format("received record '%s' with message id '%s'", record.getSNS().getMessage(), record.getSNS().getMessageId()));
            String correlationId = record.getSNS().getMessageAttributes().get("CorrelationID").getValue();
            String replyTo = record.getSNS().getMessageAttributes().get("ReplyTo").getValue();

            System.out.println(String.format("sending message with correlation id '%s' to '%s'", correlationId, replyTo));
            sqs.sendMessage(
                new SendMessageRequest()
                    .withQueueUrl(replyTo)
                    .withMessageBody(record.getSNS().getMessage() + " with CorrelationID " + correlationId)
                    .addMessageAttributesEntry("CorrelationID", new MessageAttributeValue().withDataType("String").withStringValue(correlationId)));
        }

        return null;
    }
}

Go Build!

We look forward to hearing about what you build and will continue innovating our services on your behalf.

Migrating from RabbitMQ to Amazon MQ

Post Syndicated from Rachel Richardson original https://aws.amazon.com/blogs/compute/migrating-from-rabbitmq-to-amazon-mq/

This post is courtesy of Sam Dengler, AWS Solutions Architect.

Message brokers can be used to solve a number of needs in enterprise architectures, including managing workload queues and broadcasting messages to a number of subscribers. Some AWS customers are using RabbitMQ today and would like to migrate to a managed service to reduce the overhead of operating their own message broker.

Amazon MQ is a managed message broker service for Apache ActiveMQ that makes it easier to operate and scale message brokers in the cloud. Amazon MQ provides compatibility with your existing workloads that use standard protocols such as OpenWire, AMQP, MQTT, and Stomp (all enabled with SSL). Amazon MQ automatically provisions infrastructure configured as a single-instance broker or as an active/standby broker for high availability.

In this post, I describe how to launch a new Amazon MQ instance. I review example Java code for migrating from a RabbitMQ to an Amazon MQ message broker using clients for ActiveMQ, Apache Qpid JMS, and Spring JmsTemplates. I also review best practices for Amazon MQ and the changes needed when moving from RabbitMQ to Amazon MQ to support publish/subscribe message patterns.

Getting started with Amazon MQ

To start, open the Amazon MQ console. Enter a broker name and choose Next step.

Launch a new Amazon MQ instance, choosing the mq.t2.micro instance type and Single-instance broker deployment mode, creating a user name and password, and choosing Create broker.

After several minutes, your instance changes status from Creation in progress to Running.  You can visit the Details page of your broker to retrieve connection information, including a link to the ActiveMQ web console where you can monitor the status of your instance queues, etc. In the following code examples, you use the OpenWire and AMQP endpoints.

To be able to access your broker, you must configure one of your security groups to allow inbound traffic. For more information, see the link to Detailed instructions in the blue box in the Connections section.

Now that your Amazon MQ broker is running, let’s look at some code!

Dependencies

The following code examples have dependencies across a range of libraries in order to demonstrate RabbitMQ, ActiveMQ, Qpid, Spring JMS templates, and connection pooling. I’ve listed all the dependencies in a single Maven pom.xml:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>MyGroup</groupId>
    <artifactId>MyArtifact</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <dependencies>
    
        <!-- RabbitMQ -->
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.1.2</version>
        </dependency>

        <!-- Apache Connection Pooling -->
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-pool2</artifactId>
            <version>2.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-pool</artifactId>
            <version>5.15.0</version>
        </dependency>
        
        <!-- Apache ActiveMQ -->
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-client</artifactId>
            <version>5.15.0</version>
        </dependency>
        
        <!-- Apache QPid -->
        <dependency>
            <groupId>org.apache.qpid</groupId>
            <artifactId>qpid-client</artifactId>
            <version>6.3.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.qpid</groupId>
            <artifactId>qpid-jms-client</artifactId>
            <version>0.29.0</version>
        </dependency>
                
        <!-- Spring JmsTemplate -->
         <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-jms</artifactId>
            <version>5.0.3.RELEASE</version>
        </dependency>
        
        <!-- Logging -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.25</version>
        </dependency>

    </dependencies>
</project>

RabbitMQ

Here’s an example using RabbitMQ to send and receive a message via queue. The installation and configuration of RabbitMQ is out of scope for this post. For instructions for downloading and installing RabbitMQ, see Downloading and Installing RabbitMQ.

RabbitMQ uses the AMQP 0-9-1 protocol by default, with support for AMQP 1.0 via a plugin. The RabbitMQ examples in this post use the AMQP 0-9-1 protocol.

RabbitMQ queue example

To start, here’s some sample code to send and receive a message in RabbitMQ using a queue.

import java.io.IOException;
import java.net.URISyntaxException;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.GetResponse;

public class RabbitMQExample {

    private static final boolean ACKNOWLEDGE_MODE = true;

    // The Endpoint, Username, Password, and Queue should be externalized and
    // configured through environment variables or dependency injection.
    private static final String ENDPOINT = System.getenv("RABBITMQ_ENDPOINT"); // illustrative variable name
    private static final String USERNAME = System.getenv("RABBITMQ_USERNAME");
    private static final String PASSWORD = System.getenv("RABBITMQ_PASSWORD");
    private static final String QUEUE = "MyQueue";

    public static void main(String[] args) throws KeyManagementException, NoSuchAlgorithmException, URISyntaxException, IOException, TimeoutException {
        // Create a connection factory.
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setUri(ENDPOINT);

        // Specify the username and password.
        connectionFactory.setUsername(USERNAME);
        connectionFactory.setPassword(PASSWORD);

        // Establish a connection for the producer.
        Connection producerConnection = connectionFactory.newConnection();

        // Create a channel for the producer.
        Channel producerChannel = producerConnection.createChannel();

        // Create a queue named "MyQueue".
        producerChannel.queueDeclare(QUEUE, false, false, false, null);

        // Create a message.
        String text = "Hello from RabbitMQ!";

        // Send the message.
        producerChannel.basicPublish("", QUEUE, null, text.getBytes());
        System.out.println("Message sent: " + text);

        // Clean up the producer.
        producerChannel.close();
        producerConnection.close();

        // Establish a connection for the consumer.
        Connection consumerConnection = connectionFactory.newConnection();

        // Create a channel for the consumer.
        Channel consumerChannel = consumerConnection.createChannel();

        // Create a queue named "MyQueue".
        consumerChannel.queueDeclare(QUEUE, false, false, false, null);

        // Receive the message.
        GetResponse response = consumerChannel.basicGet(QUEUE, ACKNOWLEDGE_MODE);
        String message = new String(response.getBody(), "UTF-8");
        System.out.println("Message received: " + message);

        // Clean up the consumer.
        consumerChannel.close();
        consumerConnection.close();
    }
}

In this example, you need to specify the ENDPOINT, USERNAME, and PASSWORD for your RabbitMQ message broker using environment variables or dependency injection.

This example uses the RabbitMQ client library to establish connectivity to the message broker and a channel for communication. In RabbitMQ, messages are sent over the channel to a named queue, which stores messages in a buffer, and from which consumers can receive and process messages. In this example, you publish a message using the Channel.basicPublish method, using the default exchange, identified by an empty string (“”).

To receive and process the messages in the queue, create a second connection, channel, and queue. Queue declaration is an idempotent operation, so there is no harm in declaring it twice. In this example, you receive the message using the Channel.basicGet method, automatically acknowledging message receipt to the broker.

This example demonstrates the basics of sending and receiving a message of one type. However, what if you wanted to publish messages of different types such that various consumers could subscribe only to pertinent message types (that is, pub/sub)?  Here’s a RabbitMQ example using topic exchanges to route messages to different queues.

RabbitMQ topic example

This example is similar to the one earlier. To enable topic publishing, specify two additional properties: EXCHANGE and ROUTING_KEY. RabbitMQ uses the exchange and routing key properties to route messages. Look at how these properties change the code that publishes a message.

import java.io.IOException;
import java.net.URISyntaxException;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.GetResponse;

public class RabbitMQExample {

    private static final boolean ACKNOWLEDGE_MODE = true;

    // The Endpoint, Username, Password, Queue, Exchange, and Routing Key should
    // be externalized and configured through environment variables or
    // dependency injection.
    private static final String ENDPOINT = System.getenv("RABBITMQ_ENDPOINT"); // illustrative variable name, e.g. "amqp://localhost:5672"
    private static final String USERNAME = System.getenv("RABBITMQ_USERNAME");
    private static final String PASSWORD = System.getenv("RABBITMQ_PASSWORD");
    private static final String QUEUE = "MyQueue";
    private static final String EXCHANGE = "MyExchange";
    private static final String ROUTING_KEY = "MyRoutingKey";
    
    public static void main(String[] args) throws KeyManagementException, NoSuchAlgorithmException, URISyntaxException, IOException, TimeoutException {
        // Create a connection factory.
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setUri(ENDPOINT);

        // Specify the username and password.
        connectionFactory.setUsername(USERNAME);
        connectionFactory.setPassword(PASSWORD);

        // Establish a connection for the producer.
        Connection producerConnection = connectionFactory.newConnection();

        // Create a channel for the producer.
        Channel producerChannel = producerConnection.createChannel();

        // Create a queue named "MyQueue".
        producerChannel.queueDeclare(QUEUE, false, false, false, null);

        // Create an exchange named "MyExchange".
        producerChannel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.TOPIC);

        // Bind "MyQueue" to "MyExchange", using routing key "MyRoutingKey".
        producerChannel.queueBind(QUEUE, EXCHANGE, ROUTING_KEY);

        // Create a message.
        String text = "Hello from RabbitMQ!";

        // Send the message.
        producerChannel.basicPublish(EXCHANGE, ROUTING_KEY, null, text.getBytes());
        System.out.println("Message sent: " + text);

        // Clean up the producer.
        producerChannel.close();
        producerConnection.close();
        
        ...


As before, you establish a connection to the RabbitMQ message broker, a channel for communication, and a queue to buffer messages for consumption. In addition to these components, you declare an explicit exchange of type BuiltinExchangeType.TOPIC and bind the queue to the exchange using the ROUTING_KEY that filters messages to send to the queue.

Again, publish a message using the Channel.basicPublish method. This time, instead of publishing the message to a queue, specify the EXCHANGE and ROUTING_KEY values for the message. RabbitMQ uses these properties to route the message to the appropriate queue, from which a consumer receives the message using the same code from the first example.

JMS API

Now that you’ve seen examples for queue and topic publishing in RabbitMQ, look at the code changes needed to support Amazon MQ, starting with the ActiveMQ client. But first, here’s a quick review of the Java Message Service (JMS) API.

The remainder of the examples in this post use the JMS API, which abstracts messaging methods from the underlying protocol and client implementations. The JMS API programming model uses a combination of connection factories, connections, sessions, destinations, message producers, and message consumers to send and receive messages. The Java EE 6 Tutorial illustrates the relationship between these components.
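Because that image isn’t reproduced here, the following minimal sketch summarizes the same relationships in code: a connection factory creates connections, a connection creates sessions, and a session creates the destination, producer, and consumer.

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;

public class JmsObjectModel {

    public static void walkThrough(ConnectionFactory connectionFactory) throws JMSException {
        Connection connection = connectionFactory.createConnection();                 // factory creates connection
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // connection creates session
        Destination destination = session.createQueue("MyQueue");                    // session creates destination
        MessageProducer producer = session.createProducer(destination);              // session creates producer
        MessageConsumer consumer = session.createConsumer(destination);              // session creates consumer
    }
}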

ActiveMQ OpenWire connectivity to Amazon MQ

Here’s how JMS is used with ActiveMQ to send and receive messages on a queue.

The ActiveMQ client uses the OpenWire protocol, which is supported by Amazon MQ. The OpenWire endpoint can be found in your Amazon MQ broker’s endpoint list. It requires that the security group for the Amazon MQ broker allow inbound traffic on the ActiveMQ OpenWire endpoint port, 61617.

ActiveMQ queue example

Next, here’s an example to send and receive messages to Amazon MQ using the ActiveMQ client. This example should look familiar, as it follows the same flow to send and receive messages via a queue. I’ve included the example in full and then highlighted the differences to consider when migrating from RabbitMQ.

import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.jms.pool.PooledConnectionFactory;

public class ActiveMQClientExample {

    private static final int DELIVERY_MODE = DeliveryMode.NON_PERSISTENT;
    private static final int ACKNOWLEDGE_MODE = Session.AUTO_ACKNOWLEDGE;

    // The Endpoint, Username, Password, and Queue should be externalized and
    // configured through environment variables or dependency injection.
    // e.g. "ssl://x-xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx-x.mq.us-east-1.amazonaws.com:61617"
    private static final String ENDPOINT = System.getenv("MQ_ENDPOINT");
    private static final String USERNAME = System.getenv("MQ_USERNAME");
    private static final String PASSWORD = System.getenv("MQ_PASSWORD");
    private static final String QUEUE = "MyQueue";
    
    public static void main(String[] args) throws JMSException {
        // Create a connection factory.
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ENDPOINT);

        // Specify the username and password.
        connectionFactory.setUserName(USERNAME);
        connectionFactory.setPassword(PASSWORD);

        // Create a pooled connection factory.
        PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory();
        pooledConnectionFactory.setConnectionFactory(connectionFactory);
        pooledConnectionFactory.setMaxConnections(10);

        // Establish a connection for the producer.
        Connection producerConnection = pooledConnectionFactory.createConnection();
        producerConnection.start();

        // Create a session.
        Session producerSession = producerConnection.createSession(false, ACKNOWLEDGE_MODE);

        // Create a queue named "MyQueue".
        Destination producerDestination = producerSession.createQueue(QUEUE);

        // Create a producer from the session to the queue.
        MessageProducer producer = producerSession.createProducer(producerDestination);
        producer.setDeliveryMode(DELIVERY_MODE);

        // Create a message.
        String text = "Hello from Amazon MQ!";
        TextMessage producerMessage = producerSession.createTextMessage(text);

        // Send the message.
        producer.send(producerMessage);
        System.out.println("Message sent.");

        // Clean up the producer.
        producer.close();
        producerSession.close();
        producerConnection.close();

        // Establish a connection for the consumer.
        // Note: Consumers should not use PooledConnectionFactory.
        Connection consumerConnection = connectionFactory.createConnection();
        consumerConnection.start();

        // Create a session.
        Session consumerSession = consumerConnection.createSession(false, ACKNOWLEDGE_MODE);

        // Create a queue named "MyQueue".
        Destination consumerDestination = consumerSession.createQueue(QUEUE);

        // Create a message consumer from the session to the queue.
        MessageConsumer consumer = consumerSession.createConsumer(consumerDestination);

        // Begin to wait for messages.
        Message consumerMessage = consumer.receive(1000);

        // Receive the message when it arrives.
        TextMessage consumerTextMessage = (TextMessage) consumerMessage;
        System.out.println("Message received: " + consumerTextMessage.getText());

        // Clean up the consumer.
        consumer.close();
        consumerSession.close();
        consumerConnection.close();
        pooledConnectionFactory.stop();
    }
}

In this example, you use the ActiveMQ client to establish connectivity to Amazon MQ using the OpenWire protocol, with the ActiveMQConnectionFactory class specifying the endpoint and user credentials. For this example, use the master user name and password chosen when creating the Amazon MQ broker earlier. However, it’s a best practice to create additional Amazon MQ users for brokers in non-sandbox environments.

You could use the ActiveMQConnectionFactory to establish connectivity to the Amazon MQ broker. However, it is a best practice in Amazon MQ to group multiple producer requests using the ActiveMQ PooledConnectionFactory to wrap the ActiveMQConnectionFactory.

Using the PooledConnectionFactory, you can create a connection to Amazon MQ and establish a session to send a message. Like the RabbitMQ queue example, create a message queue destination using the Session.createQueue method, and a message producer to send the message to the queue.

For the consumer, use the ActiveMQConnectionFactory, NOT the PooledConnectionFactory, to create a connection, session, queue destination, and message consumer to receive the message, because pooling consumers is not considered a best practice. For more information, see the ActiveMQ Spring Support page.

ActiveMQ virtual destinations on Amazon MQ

Here’s how topic publishing differs from RabbitMQ to Amazon MQ.

If you remember from the RabbitMQ topic example, you bound a queue to an exchange using a routing key to control queue destinations when sending messages using a key attribute.

You run into a problem if you try to implement topic subscription using the message consumer in the preceding ActiveMQ queue example. The following is an excerpt from Virtual Destinations, which provides more detail on this subject:

A JMS durable subscriber MessageConsumer is created with a unique JMS clientID and durable subscriber name. To be JMS-compliant, only one JMS connection can be active at any point in time for one JMS clientID, and only one consumer can be active for a clientID and subscriber name. That is, only one thread can be actively consuming from a given logical topic subscriber.

To solve this, ActiveMQ supports the concept of a virtual destination, which provides a logical topic subscription access to a physical queue for consumption without breaking JMS compliance. To do so, ActiveMQ uses a simple convention for specifying the topic and queue names to configure message routing.

  • Topic names must use the “VirtualTopic.” prefix, followed by the topic name. For example, VirtualTopic.MyTopic.
  • Consumer names must use the “Consumer.” prefix, followed by the consumer name, followed by the topic name. For example, Consumer.MyConsumer.VirtualTopic.MyTopic.

ActiveMQ topic example

Next, here’s an example for the ActiveMQ client that demonstrates publishing messages to topics. This example is similar to the ActiveMQ queue example, except that you create a topic destination instead of a queue destination.

import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.jms.pool.PooledConnectionFactory;

public class ActiveMQClientExample {

    private static final int DELIVERY_MODE = DeliveryMode.NON_PERSISTENT;
    private static final int ACKNOWLEDGE_MODE = Session.AUTO_ACKNOWLEDGE;

    // The Endpoint, Username, Password, Producer Topic, and Consumer Topic
    // should be externalized and configured through environment variables or
    // dependency injection.
    // e.g. "ssl://x-xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx-x.mq.us-east-1.amazonaws.com:61617"
    private static final String ENDPOINT = System.getenv("MQ_ENDPOINT");
    private static final String USERNAME = System.getenv("MQ_USERNAME");
    private static final String PASSWORD = System.getenv("MQ_PASSWORD");
    private static final String PRODUCER_TOPIC = "VirtualTopic.MyTopic";
    private static final String CONSUMER1_TOPIC = "Consumer.Consumer1." + PRODUCER_TOPIC;
    
    public static void main(String[] args) throws JMSException {
        // Create a connection factory.
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ENDPOINT);

        // Specify the username and password.
        connectionFactory.setUserName(USERNAME);
        connectionFactory.setPassword(PASSWORD);

        // Create a pooled connection factory.
        PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory();
        pooledConnectionFactory.setConnectionFactory(connectionFactory);
        pooledConnectionFactory.setMaxConnections(10);

        // Establish a connection for the producer.
        Connection producerConnection = pooledConnectionFactory.createConnection();
        producerConnection.start();

        // Create a session.
        Session producerSession = producerConnection.createSession(false, ACKNOWLEDGE_MODE);

        // Create a topic named "VirtualTopic.MyTopic".
        Destination producerDestination = producerSession.createTopic(PRODUCER_TOPIC);

        // Create a producer from the session to the topic.
        MessageProducer producer = producerSession.createProducer(producerDestination);
        producer.setDeliveryMode(DELIVERY_MODE);

        // Create a message.
        String text = "Hello from Amazon MQ!";
        TextMessage producerMessage = producerSession.createTextMessage(text);

        // Send the message.
        producer.send(producerMessage);
        System.out.println("Message sent.");

        // Clean up the producer.
        producer.close();
        producerSession.close();
        producerConnection.close();

        // Establish a connection for the consumer.
        // Note: Consumers should not use PooledConnectionFactory.
        Connection consumerConnection = connectionFactory.createConnection();
        consumerConnection.start();

        // Create a session.
        Session consumerSession = consumerConnection.createSession(false, ACKNOWLEDGE_MODE);

        // Create a queue called "Consumer.Consumer1.VirtualTopic.MyTopic".
        Destination consumerDestination = consumerSession.createQueue(CONSUMER1_TOPIC);

        // Create a message consumer from the session to the queue.
        MessageConsumer consumer = consumerSession.createConsumer(consumerDestination);

        // Begin to wait for messages.
        Message consumerMessage = consumer.receive(1000);

        // Receive the message when it arrives.
        TextMessage consumerTextMessage = (TextMessage) consumerMessage;
        System.out.println("Message received: " + consumerTextMessage.getText());

        // Clean up the consumer.
        consumer.close();
        consumerSession.close();
        consumerConnection.close();
        pooledConnectionFactory.stop();
    }
}

In this example, the message producer uses the Session.createTopic method with the topic name, VirtualTopic.MyTopic, as the publishing destination. The message consumer code does not change, but the queue destination uses the virtual destination convention, Consumer.Consumer1.VirtualTopic.MyTopic. ActiveMQ uses these names for the topic and queue to route messages accordingly.

AMQP connectivity to Amazon MQ

Now that you’ve explored some examples using an ActiveMQ client, look at examples using the Qpid JMS client to connect to the Amazon MQ broker over the AMQP 1.0 protocol and see how they differ.

The Qpid client uses the Advanced Message Queuing Protocol (AMQP) 1.0 protocol, supported by Amazon MQ. The AMQP 1.0 endpoint can be found in your Amazon MQ broker’s endpoint list in the console. It uses port 5671, which must be opened in the security group associated with the Amazon MQ broker.

The AMQP endpoint specifies a transport, amqp+ssl. For encrypted connections, Qpid expects the protocol name to be amqps instead of amqp+ssl; the rest of the connection address remains the same.
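
For example, the same broker endpoint in both forms (the broker ID is a placeholder):

amqp+ssl://x-xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx-x.mq.us-east-1.amazonaws.com:5671   (as shown in the console)
amqps://x-xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx-x.mq.us-east-1.amazonaws.com:5671      (as passed to Qpid)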

Qpid JMS queue example

Next, here’s an example to send and receive messages to Amazon MQ using the Qpid client. The Qpid JMS client is built using Apache Qpid Proton, an AMQP messaging toolkit.

import java.util.Hashtable;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.Context;
import javax.naming.NamingException;

import org.apache.activemq.jms.pool.PooledConnectionFactory;

public class QpidClientExample {

    private static final int DELIVERY_MODE = DeliveryMode.NON_PERSISTENT;
    private static final int ACKNOWLEDGE_MODE = Session.AUTO_ACKNOWLEDGE;

    // The Endpoint, Username, Password, and Queue should be externalized and
    // configured through environment variables or dependency injection.
    // e.g. "amqps://x-xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx-x.mq.us-east-1.amazonaws.com:5671"
    private static final String ENDPOINT = System.getenv("MQ_ENDPOINT");
    private static final String USERNAME = System.getenv("MQ_USERNAME");
    private static final String PASSWORD = System.getenv("MQ_PASSWORD");
    private static final String QUEUE = "MyQueue";

    public static void main(String[] args) throws JMSException, NamingException {
        // Use JNDI to specify the AMQP endpoint
        Hashtable<Object, Object> env = new Hashtable<Object, Object>();
        env.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
        env.put("connectionfactory.factoryLookup", ENDPOINT);
        javax.naming.Context context = new javax.naming.InitialContext(env);

        // Create a connection factory.
        ConnectionFactory connectionFactory = (ConnectionFactory) context.lookup("factoryLookup");

        // Create a pooled connection factory.
        PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory();
        pooledConnectionFactory.setConnectionFactory(connectionFactory);
        pooledConnectionFactory.setMaxConnections(10);

        // Establish a connection for the producer.
        Connection producerConnection = pooledConnectionFactory.createConnection(USERNAME, PASSWORD);
        producerConnection.start();

        // Create a session.
        Session producerSession = producerConnection.createSession(false, ACKNOWLEDGE_MODE);

        // Create a queue named "MyQueue".
        Destination producerDestination = producerSession.createQueue(QUEUE);

        // Create a producer from the session to the queue.
        MessageProducer producer = producerSession.createProducer(producerDestination);
        producer.setDeliveryMode(DELIVERY_MODE);

        // Create a message.
        String text = "Hello from Qpid Amazon MQ!";
        TextMessage producerMessage = producerSession.createTextMessage(text);

        // Send the message.
        producer.send(producerMessage);
        System.out.println("Message sent.");

        // Clean up the producer.
        producer.close();
        producerSession.close();
        producerConnection.close();

        // Establish a connection for the consumer.
        // Note: Consumers should not use PooledConnectionFactory.
        Connection consumerConnection = connectionFactory.createConnection(USERNAME, PASSWORD);
        consumerConnection.start();

        // Create a session.
        Session consumerSession = consumerConnection.createSession(false, ACKNOWLEDGE_MODE);

        // Create a queue named "MyQueue".
        Destination consumerDestination = consumerSession.createQueue(QUEUE);

        // Create a message consumer from the session to the queue.
        MessageConsumer consumer = consumerSession.createConsumer(consumerDestination);

        // Begin to wait for messages.
        Message consumerMessage = consumer.receive(1000);

        // Receive the message when it arrives.
        TextMessage consumerTextMessage = (TextMessage) consumerMessage;
        System.out.println("Message received: " + consumerTextMessage.getText());

        // Clean up the consumer.
        consumer.close();
        consumerSession.close();
        consumerConnection.close();
        pooledConnectionFactory.stop();
    }
}

The Qpid queue example is similar to the ActiveMQ queue example. Both use the JMS API model to send and receive messages, but they differ in how the ConnectionFactory and the AMQP endpoint are specified. According to the Qpid client configuration documentation, the ConnectionFactory is specified using a JNDI InitialContext to look up JMS objects. The JNDI configuration is commonly specified in a file named jndi.properties on the Java classpath. In this example, it is done programmatically using a Hashtable for simplicity.
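
For reference, the equivalent file-based configuration might look like the following jndi.properties sketch (the endpoint is a placeholder; the optional queue.MyQueue entry registers the destination under a JNDI name and is not used by the programmatic example above):

java.naming.factory.initial = org.apache.qpid.jms.jndi.JmsInitialContextFactory
connectionfactory.factoryLookup = amqps://x-xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx-x.mq.us-east-1.amazonaws.com:5671
queue.MyQueue = MyQueue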

NOTE: Although the Qpid JMS client is used to establish connectivity to Amazon MQ using the AMQP 1.0 protocol, the producer should still use the ActiveMQ PooledConnectionFactory to wrap the Qpid ConnectionFactory. This can be confusing because the Qpid client also provides a PooledConnectionFactory, which should NOT be used for AMQP 1.0.

The Qpid topic example is identical to the earlier ActiveMQ topic example with the same substitution, which establishes the ConnectionFactory to the AMQP 1.0 endpoint via JNDI.
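
A hedged sketch of that substitution follows; only the factory lookup changes, and the virtual destination names are exactly those used in the ActiveMQ topic example.

import java.util.Hashtable;

import javax.jms.ConnectionFactory;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;

public class QpidTopicFactorySketch {

    // Look up the ConnectionFactory for the amqps endpoint via JNDI.
    static ConnectionFactory lookup(String endpoint) throws NamingException {
        Hashtable<Object, Object> env = new Hashtable<>();
        env.put(Context.INITIAL_CONTEXT_FACTORY,
                "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
        env.put("connectionfactory.factoryLookup", endpoint); // amqps://...:5671
        return (ConnectionFactory) new InitialContext(env).lookup("factoryLookup");
        // The producer then publishes to "VirtualTopic.MyTopic" and the consumer
        // reads from the queue "Consumer.Consumer1.VirtualTopic.MyTopic" as before.
    }
}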

Spring JMS template queue example

Finally, here are examples using the Spring JmsTemplate to send and receive messages.

This example establishes connectivity to Amazon MQ using the same protocol and client library as the ActiveMQ queue example. As in that example, the security group for the Amazon MQ broker must be open for the ActiveMQ OpenWire protocol endpoint port, 61617.

The Spring JmsTemplate provides a higher-level abstraction on top of JMS. Code using the JmsTemplate class only needs to implement handlers to process messages, while the management of connections, sessions, message producers, and message consumers is delegated to Spring. Look at the following code:

import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.jms.pool.PooledConnectionFactory;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;

public class ActiveMQSpringExample {

    private static final int DELIVERY_MODE = DeliveryMode.NON_PERSISTENT;
    private static final int ACKNOWLEDGE_MODE = Session.AUTO_ACKNOWLEDGE;

    // The Endpoint, Username, Password, and Queue should be externalized and
    // configured through environment variables or dependency injection.
    // e.g. "ssl://x-xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx-x.mq.us-east-1.amazonaws.com:61617"
    private static final String ENDPOINT = System.getenv("MQ_ENDPOINT");
    private static final String USERNAME = System.getenv("MQ_USERNAME");
    private static final String PASSWORD = System.getenv("MQ_PASSWORD");
    private static final String QUEUE = "MyQueue";

    public static void main(String[] args) throws JMSException {
        // Create a connection factory.
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ENDPOINT);

        // Specify the username and password.
        connectionFactory.setUserName(USERNAME);
        connectionFactory.setPassword(PASSWORD);

        // Create a pooled connection factory.
        PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory();
        pooledConnectionFactory.setConnectionFactory(connectionFactory);
        pooledConnectionFactory.setMaxConnections(10);

        // Create a JmsTemplate for the producer.
        JmsTemplate producerJmsTemplate = new JmsTemplate();
        producerJmsTemplate.setConnectionFactory(pooledConnectionFactory);
        producerJmsTemplate.setDefaultDestination(new ActiveMQQueue(QUEUE));
        producerJmsTemplate.setSessionAcknowledgeMode(ACKNOWLEDGE_MODE);
        producerJmsTemplate.setDeliveryMode(DELIVERY_MODE);
        
        // Create a message creator.
        MessageCreator messageCreator = new MessageCreator() {
            public Message createMessage(Session session) throws JMSException {
                return session.createTextMessage("Hello from Spring Amazon MQ!");
            }
        };

        // Send the message.
        producerJmsTemplate.send(messageCreator);
        System.out.println("Message sent.");

        // Clean up the producer.
        // producer JmsTemplate will close underlying sessions and connections.

        // Create a JmsTemplate for the consumer.
        // Note: Consumers should not use PooledConnectionFactory.
        JmsTemplate consumerJmsTemplate = new JmsTemplate();
        consumerJmsTemplate.setConnectionFactory(connectionFactory);
        consumerJmsTemplate.setDefaultDestination(new ActiveMQQueue(QUEUE));
        consumerJmsTemplate.setSessionAcknowledgeMode(ACKNOWLEDGE_MODE);
        consumerJmsTemplate.setReceiveTimeout(1000);
        
        // Begin to wait for messages.
        Message consumerMessage = consumerJmsTemplate.receive();

        // Receive the message when it arrives.
        TextMessage consumerTextMessage = (TextMessage) consumerMessage;
        System.out.println("Message received: " + consumerTextMessage.getText());

        // Clean up the consumer.
        // consumer JmsTemplate will close underlying sessions and connections.
        pooledConnectionFactory.stop();
    }
}

Although Spring manages connections, sessions, and message producers, the grouping of producer connections is still a best practice. The ActiveMQ PooledConnectionFactory class is used in this example. However, the Spring CachingConnectionFactory object is another option.

Following the PooledConnectionFactory creation, a JmsTemplate is created for the producer and an ActiveMQQueue is created as the message destination. To use JmsTemplate to send a message, a MessageCreator callback is defined that generates a text message via the JmsTemplate.

A second JmsTemplate with an ActiveMQQueue is created for the consumer. In this example, a single message is received synchronously, however, asynchronous message reception is a popular alternative when using message-driven POJOs.
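
A hedged sketch of the asynchronous alternative follows, using Spring’s DefaultMessageListenerContainer; the queue name and wiring are illustrative, and this is not part of the original example.

import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

import org.springframework.jms.listener.DefaultMessageListenerContainer;

public class AsyncConsumerSketch {

    // Wire a listener container to the plain (non-pooled) connection factory.
    static DefaultMessageListenerContainer listen(ConnectionFactory connectionFactory) {
        DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setDestinationName("MyQueue");
        container.setMessageListener((MessageListener) message -> {
            try {
                System.out.println("Message received: " + ((TextMessage) message).getText());
            } catch (JMSException e) {
                throw new RuntimeException(e);
            }
        });
        container.afterPropertiesSet(); // initializes the container
        container.start();              // begins asynchronous consumption
        return container;
    }
}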

Unlike the ActiveMQ examples, the Spring JMS template example does not require the explicit cleanup of the connection, session, message producer, or message consumer resources, as that is managed by Spring. Make sure to call the PooledConnectionFactory.stop method to cleanly exit the main method.

Finally, here’s an example using a Spring JmsTemplate for topic publishing.

Spring JmsTemplate topic example

This example combines the Spring JmsTemplate queue example with the virtual destinations approach from the ActiveMQ topic example. Look at the following code.

import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.jms.pool.PooledConnectionFactory;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;

public class ActiveMQSpringExample {

    private static final int DELIVERY_MODE = DeliveryMode.NON_PERSISTENT;
    private static final int ACKNOWLEDGE_MODE = Session.AUTO_ACKNOWLEDGE;

    // The Endpoint, Username, Password, Producer Topic, and Consumer Topic
    // should be externalized and configured through environment variables or
    // dependency injection.
    // e.g. "ssl://x-xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx-x.mq.us-east-1.amazonaws.com:61617"
    private static final String ENDPOINT = System.getenv("MQ_ENDPOINT");
    private static final String USERNAME = System.getenv("MQ_USERNAME");
    private static final String PASSWORD = System.getenv("MQ_PASSWORD");
    private static final String PRODUCER_TOPIC = "VirtualTopic.MyTopic";
    private static final String CONSUMER1_TOPIC = "Consumer.Consumer1." + PRODUCER_TOPIC;
    
    public static void main(String[] args) throws JMSException {
        // Create a connection factory.
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ENDPOINT);

        // Specify the username and password.
        connectionFactory.setUserName(USERNAME);
        connectionFactory.setPassword(PASSWORD);

        // Create a pooled connection factory.
        PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory();
        pooledConnectionFactory.setConnectionFactory(connectionFactory);
        pooledConnectionFactory.setMaxConnections(10);

        // Create a JmsTemplate for the producer.
        JmsTemplate producerJmsTemplate = new JmsTemplate();
        producerJmsTemplate.setConnectionFactory(pooledConnectionFactory);
        producerJmsTemplate.setDefaultDestination(new ActiveMQTopic(PRODUCER_TOPIC));
        producerJmsTemplate.setSessionAcknowledgeMode(ACKNOWLEDGE_MODE);
        producerJmsTemplate.setDeliveryMode(DELIVERY_MODE);

        // Create a message creator.
        MessageCreator messageCreator = new MessageCreator() {
            public Message createMessage(Session session) throws JMSException {
                return session.createTextMessage("Hello from Spring Amazon MQ!");
            }
        };

        // Send the message.
        producerJmsTemplate.send(messageCreator);
        System.out.println("Message sent.");

        // Clean up the producer.
        // producer JmsTemplate will close underlying sessions and connections.

        // Create a JmsTemplate for the consumer.
        // Note: Consumers should not use PooledConnectionFactory.
        JmsTemplate consumerJmsTemplate = new JmsTemplate();
        consumerJmsTemplate.setConnectionFactory(connectionFactory);
        consumerJmsTemplate.setDefaultDestination(new ActiveMQQueue(CONSUMER1_TOPIC));
        consumerJmsTemplate.setSessionAcknowledgeMode(ACKNOWLEDGE_MODE);
        consumerJmsTemplate.setReceiveTimeout(1000);
        
        // Begin to wait for messages.
        Message consumerMessage = consumerJmsTemplate.receive();

        // Receive the message when it arrives.
        TextMessage consumerTextMessage = (TextMessage) consumerMessage;
        System.out.println("Message received: " + consumerTextMessage.getText());

        // Clean up the consumer.
        // consumer JmsTemplate will close underlying sessions and connections.
        pooledConnectionFactory.stop();
    }
}

In this example, follow the ActiveMQ virtual destination naming convention for topics and queues:
  • When creating the producer JMS template, specify an ActiveMQTopic as the destination using the name VirtualTopic.MyTopic.
  • When creating the consumer JMS template, specify an ActiveMQQueue as the destination using the name Consumer.Consumer1.VirtualTopic.MyTopic.

ActiveMQ automatically handles routing messages from the topic to the queue. Additional subscribers can each declare their own consumer queue (for example, Consumer.Consumer2.VirtualTopic.MyTopic), and each queue receives its own copy of every message published to the topic.

Conclusion

In this post, I reviewed how to get started with an Amazon MQ broker and walked you through several code examples that explored the differences between RabbitMQ and Apache ActiveMQ client integrations. If you are considering migrating to Amazon MQ, these examples should help you understand the changes that might be required.

If you’re thinking about integrating your existing apps with new serverless apps, see the related post, Invoking AWS Lambda from Amazon MQ.

To learn more, see the Amazon MQ website and Developer Guide. You can try Amazon MQ for free with the AWS Free Tier, which includes up to 750 hours of a single-instance mq.t2.micro broker and up to 1 GB of storage per month for one year for new AWS accounts.

Measuring the throughput for Amazon MQ using the JMS Benchmark

Post Syndicated from Rachel Richardson original https://aws.amazon.com/blogs/compute/measuring-the-throughput-for-amazon-mq-using-the-jms-benchmark/

This post is courtesy of Alan Protasio, Software Development Engineer, Amazon Web Services

Just like compute and storage, messaging is a fundamental building block of enterprise applications. Message brokers (aka “message-oriented middleware”) enable different software systems, often written in different languages, on different platforms, running in different locations, to communicate and exchange information. Mission-critical applications, such as CRM and ERP, rely on message brokers to work.

A common performance consideration for customers deploying a message broker in a production environment is the throughput of the system, measured as messages per second. This is important to know so that application environments (hosts, threads, memory, etc.) can be configured correctly.

In this post, we demonstrate how to measure the throughput for Amazon MQ, a new managed message broker service for ActiveMQ, using JMS Benchmark. It should take between 15–20 minutes to set up the environment and an hour to run the benchmark. We also provide some tips on how to configure Amazon MQ for optimal throughput.

Benchmarking throughput for Amazon MQ

ActiveMQ can be used for a number of use cases. These use cases range from simple fire-and-forget tasks (that is, asynchronous processing) and low-latency request-reply patterns to buffering requests before they are persisted to a database.

The throughput of Amazon MQ is largely dependent on the use case. For example, if you have non-critical workloads such as gathering click events for a non-business-critical portal, you can use ActiveMQ in a non-persistent mode and get extremely high throughput with Amazon MQ.

On the flip side, if you have a critical workload where durability is extremely important (meaning that you can’t lose a message), then you are bound by the I/O capacity of your underlying persistence store. We recommend using mq.m4.large for the best results. The mq.t2.micro instance type is intended for product evaluation. Performance is limited, due to the lower memory and burstable CPU performance.

Tip: To improve your throughput with Amazon MQ, make sure that you have consumers processing messages as fast as (or faster than) your producers are pushing messages.

Because it’s impossible to talk about how the broker (ActiveMQ) behaves for each and every use case, we walk through how to set up your own benchmark for Amazon MQ using our favorite open-source benchmarking tool: JMS Benchmark. We are fans of the JMS Benchmark suite because it’s easy to set up and deploy, and comes with a built-in visualizer of the results.

Non-Persistent Scenarios – Queue latency as you scale producer throughput

Getting started

At the time of publication, you can create an mq.m4.large single-instance broker for testing for $0.30 per hour (US pricing).

This walkthrough covers the following tasks:

  1. Create and configure the broker.
  2. Create an EC2 instance to run your benchmark.
  3. Configure the security groups.
  4. Run the benchmark.

Step 1 – Create and configure the broker
Create and configure the broker using Tutorial: Creating and Configuring an Amazon MQ Broker.

Step 2 – Create an EC2 instance to run your benchmark
Launch the EC2 instance using Step 1: Launch an Instance. We recommend choosing the m5.large instance type.

Step 3 – Configure the security groups
Make sure that all the security groups are correctly configured to let the traffic flow between the EC2 instance and your broker.

  1. Sign in to the Amazon MQ console.
  2. From the broker list, choose the name of your broker (for example, MyBroker).
  3. In the Details section, under Security and network, choose the name of your security group.
  4. From the security group list, choose your security group.
  5. At the bottom of the page, choose Inbound, Edit.
  6. In the Edit inbound rules dialog box, add a rule to allow traffic between your instance and the broker:
    • Choose Add Rule.
    • For Type, choose Custom TCP.
    • For Port Range, type the ActiveMQ SSL port (61617).
    • For Source, leave Custom selected and then type the security group of your EC2 instance.
    • Choose Save.

Your broker can now accept the connection from your EC2 instance.

Step 4 – Run the benchmark
Connect to your EC2 instance using SSH and run the following commands:

$ cd ~
$ curl -L https://github.com/alanprot/jms-benchmark/archive/master.zip -o master.zip
$ unzip master.zip
$ cd jms-benchmark-master
$ chmod a+x bin/*
$ env \
  SERVER_SETUP=false \
  SERVER_ADDRESS={activemq-endpoint} \
  ACTIVEMQ_TRANSPORT=ssl \
  ACTIVEMQ_PORT=61617 \
  ACTIVEMQ_USERNAME={activemq-user} \
  ACTIVEMQ_PASSWORD={activemq-password} \
  ./bin/benchmark-activemq

After the benchmark finishes, you can find the results in the ~/reports directory. As you may notice, the performance of ActiveMQ varies based on the number of consumers, producers, destinations, and message size.

Amazon MQ architecture

The last thing that’s important to know, so that you can better interpret the results of the benchmark, is how Amazon MQ is architected.

Amazon MQ is architected to be highly available (HA) and durable. For HA, we recommend using the multi-AZ option. After a message is sent to Amazon MQ in persistent mode, the message is written to the highly durable message store that replicates the data across multiple nodes in multiple Availability Zones. Because of this replication, for some use cases you may see a reduction in throughput as you migrate to Amazon MQ. Customers have told us they appreciate the benefits of message replication as it helps protect durability even in the face of the loss of an Availability Zone.

Conclusion

We hope this gives you an idea of how Amazon MQ performs. We encourage you to run tests to simulate your own use cases.

To learn more, see the Amazon MQ website. You can try Amazon MQ for free with the AWS Free Tier, which includes up to 750 hours of a single-instance mq.t2.micro broker and up to 1 GB of storage per month for one year.

Running ActiveMQ in a Hybrid Cloud Environment with Amazon MQ

Post Syndicated from Tara Van Unen original https://aws.amazon.com/blogs/compute/running-activemq-in-a-hybrid-cloud-environment-with-amazon-mq/

This post courtesy of Greg Share, AWS Solutions Architect

Many organizations, particularly enterprises, rely on message brokers to connect and coordinate different systems. Message brokers enable distributed applications to communicate with one another, serving as the technological backbone for their IT environment, and ultimately their business services. Applications depend on messaging to work.

In many cases, those organizations have started to build new or “lift and shift” applications to AWS. In some cases, there are applications, such as mainframe systems, too costly to migrate. In these scenarios, those on-premises applications still need to interact with cloud-based components.

Amazon MQ is a managed message broker service for ActiveMQ that enables organizations to send messages between applications in the cloud and on-premises to enable hybrid environments and application modernization. For example, you can invoke AWS Lambda from queues and topics managed by Amazon MQ brokers to integrate legacy systems with serverless architectures. ActiveMQ is an open-source message broker written in Java that is packaged with clients in multiple languages, the Java Message Service (JMS) client being one example.

This post shows how you can use Amazon MQ to integrate on-premises and cloud environments using the network of brokers feature of ActiveMQ. It provides configuration parameters for a one-way duplex connection for the flow of messages from an on-premises ActiveMQ message broker to Amazon MQ.

ActiveMQ and the network of brokers

First, look at queues within ActiveMQ and then at the network of brokers as a mechanism to distribute messages.

The network of brokers behaves differently from models such as physical networks. The key consideration is that the production (sending) of a message is disconnected from the consumption of that message. Think of the delivery of a parcel: the parcel is sent by the supplier (producer) to the end customer (consumer). The path it took to get there is of little concern to the customer, as long as the package arrives.

The same logic can be applied to the network of brokers. Here’s how you build the flow from a simple message to a queue and build toward a network of brokers. Before you look at setting up a hybrid connection, I discuss how a broker processes messages in a simple scenario.

When a message is sent from a producer to a queue on a broker, the following steps occur:

  1. A message is sent to a queue from the producer.
  2. The broker persists this in its store or journal.
  3. At this point, an acknowledgement (ACK) is sent to the producer from the broker.

When a consumer looks to consume the message from that same queue, the following steps occur:

  1. The message listener (consumer) calls the broker, which creates a subscription to the queue.
  2. Messages are fetched from the message store and sent to the consumer.
  3. The consumer acknowledges that the message has been received before processing it.
  4. Upon receiving the ACK, the broker sets the message as having been consumed. By default, this deletes it from the queue.
    • You can set the consumer to ACK after processing by setting up transaction management or handle it manually using Session.CLIENT_ACKNOWLEDGE, as sketched after this list.
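
A hedged sketch of the manual option, using the JMS API; the queue name and receive timeout are illustrative.

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;

public class ClientAckSketch {

    static void consumeWithManualAck(ConnectionFactory factory) throws JMSException {
        Connection connection = factory.createConnection();
        connection.start();

        // CLIENT_ACKNOWLEDGE defers the ACK until the application sends it.
        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
        MessageConsumer consumer = session.createConsumer(session.createQueue("TestingQ"));

        Message message = consumer.receive(1000);
        if (message != null) {
            // ... process the message ...
            message.acknowledge(); // ACK only after successful processing
        }
        connection.close();
    }
}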

Static propagation

I now introduce the concept of static propagation with the network of brokers as the mechanism for message transfer from on-premises brokers to Amazon MQ. Static propagation refers to message propagation that occurs in the absence of subscription information. In this case, the objective is to transfer messages arriving at your selected on-premises broker to the Amazon MQ broker for consumption within the cloud environment.

After you configure static propagation with a network of brokers, the following occurs:

  1. The on-premises broker receives a message from a producer for a specific queue.
  2. The on-premises broker sends (statically propagates) the message to the Amazon MQ broker.
  3. The Amazon MQ broker sends an acknowledgement to the on-premises broker, which marks the message as having been consumed.
  4. Amazon MQ holds the message in its queue ready for consumption.
  5. A consumer connects to the Amazon MQ broker, subscribes to the queue in which the message resides, and receives the message.
  6. The Amazon MQ broker marks the message as having been consumed.

Getting started

The first step is creating an Amazon MQ broker.

  1. Sign in to the Amazon MQ console and launch a new Amazon MQ broker.
  2. Name your broker and choose Next step.
  3. For Broker instance type, choose your instance size:
    mq.t2.micro
    mq.m4.large
  4. For Deployment mode, enter one of the following:
    Single-instance broker for development and test implementations (recommended)
    Active/standby broker for high availability in production environments
  5. Scroll down and enter your user name and password.
  6. Expand Advanced Settings.
  7. For VPC, Subnet, and Security Group, pick the values for the resources in which your broker will reside.
  8. For Public Accessibility, choose Yes, as connectivity is internet-based. Another option would be to use private connectivity between your on-premises network and the VPC, an example being an AWS Direct Connect or VPN connection. In that case, you could set Public Accessibility to No.
  9. For Maintenance, leave the default value, No preference.
  10. Choose Create Broker. Wait several minutes for the broker to be created.

After creation is complete, you see your broker listed.

For connectivity to work, you must configure the security group where Amazon MQ resides. For this post, I focus on the OpenWire protocol.

For OpenWire connectivity, allow port 61617 access for Amazon MQ from your on-premises ActiveMQ broker source IP address. For alternate protocols, see the Amazon MQ broker configuration information for the ports required:

OpenWire – ssl://xxxxxxx.xxx.com:61617
AMQP – amqp+ssl://xxxxxxx.xxx.com:5671
STOMP – stomp+ssl://xxxxxxx.xxx.com:61614
MQTT – mqtt+ssl://xxxxxxx.xxx.com:8883
WSS – wss://xxxxxxx.xxx.com:61619

Configuring the network of brokers

Configuring the network of brokers with static propagation occurs on the on-premises broker by applying changes to the following file:
<activemq install directory>/conf/activemq.xml

Network connector

This is the first configuration item required to enable a network of brokers. It is only required on the on-premises broker, which initiates and creates the connection with Amazon MQ. This connection, after it’s established, enables the flow of messages in either direction between the on-premises broker and Amazon MQ. The focus of this post is the uni-directional flow of messages from the on-premises broker to Amazon MQ.

The default activemq.xml file does not include the network connector configuration. Add this with the networkConnector element. In this scenario, edit the on-premises broker activemq.xml file to include the following information between <systemUsage> and <transportConnectors>:

<networkConnectors>
    <networkConnector
        name="Q:source broker name->target broker name"
        duplex="false"
        uri="static:(ssl://aws-mq-endpoint:61617)"
        userName="username"
        password="password"
        networkTTL="2"
        dynamicOnly="false">
        <staticallyIncludedDestinations>
            <queue physicalName="queuename"/>
        </staticallyIncludedDestinations>
        <excludedDestinations>
            <queue physicalName=">"/>
        </excludedDestinations>
    </networkConnector>
</networkConnectors>

The following components are the most important elements when configuring your on-premises broker.

  • name – Name of the network bridge. In this case, it specifies two things:
    • That this connection relates to an ActiveMQ queue (Q) as opposed to a topic (T), for reference purposes.
    • The source broker and target broker.
  • duplex – Setting this to false ensures that messages traverse unidirectionally from the on-premises broker to Amazon MQ.
  • uri – Specifies the remote endpoint to which to connect for message transfer. In this case, it is an OpenWire endpoint on your Amazon MQ broker. This information can be obtained from the Amazon MQ console or via the API.
  • username and password – The same user name and password configured when creating the Amazon MQ broker, and used to access the Amazon MQ ActiveMQ console.
  • networkTTL – Number of brokers in the network through which messages and subscriptions can pass. Leave this setting at the current value, if it is already included in your broker connection.
  • staticallyIncludedDestinations > queue physicalName – The destination ActiveMQ queue for which messages are destined. This is the queue that is propagated from the on-premises broker to the Amazon MQ broker for message consumption.

After the network connector is configured, you must restart the ActiveMQ service on the on-premises broker for the changes to be applied.

Verify the configuration

There are a number of places within the ActiveMQ console of your on-premises and Amazon MQ brokers to browse to verify that the configuration is correct and the connection has been established.

On-premises broker

Launch the ActiveMQ console of your on-premises broker and navigate to Network. You should see an active network bridge similar to the following:

This identifies that the connection between your on-premises broker and your Amazon MQ broker is up and running.

Now navigate to Connections and scroll to the bottom of the page. Under the Network Connectors subsection, you should see a connector labeled with the name value that you provided within the activemq.xml configuration file. You should see an entry similar to:

Amazon MQ broker

Launch the ActiveMQ console of your Amazon MQ broker and navigate to Connections. Scroll to the Connections openwire subsection and you should see a connection that references the name value that you provided within the activemq.xml configuration file. You should see an entry similar to:

If you configured the uri for AMQP, STOMP, MQTT, or WSS as opposed to OpenWire, you would see this connection under the corresponding section of the Connections page.

Testing your message flow

The setup described outlines a way for messages produced on premises to be propagated to the cloud for consumption in the cloud. This section provides steps on verifying the message flow.

Verify that the queue has been created

After you specify this queue name as staticallyIncludedDestinations > queue physicalName: and your ActiveMQ service starts, you see the following on your on-premises ActiveMQ console Queues page.

As you can see, no messages have been sent but you have one consumer listed. If you then choose Active Consumers under the Views column, you see Active Consumers for TestingQ.

This is telling you that your Amazon MQ broker is a consumer of your on-premises broker for the testing queue.

Produce and send a message to the on-premises broker

Now, produce a message on an on-premises producer and send it to your on-premises broker to a queue named TestingQ. If you navigate back to the queues page of your on-premises ActiveMQ console, you see that the messages enqueued and messages dequeued column count for your TestingQ queue have changed:

What this means is that the message originating from the on-premises producer has traversed the on-premises broker and propagated immediately to the Amazon MQ broker. At this point, the message is no longer available for consumption from the on-premises broker.

If you access the ActiveMQ console of your Amazon MQ broker and navigate to the Queues page, you see the following for the TestingQ queue:

This means that the message originally sent to your on-premises broker has traversed the unidirectional network bridge of the network of brokers, and is ready to be consumed from your Amazon MQ broker. The indicator is the Number of Pending Messages column.

Consume the message from an Amazon MQ broker

Connect to the Amazon MQ TestingQ queue from a consumer within the AWS Cloud environment for message consumption. Log on to the ActiveMQ console of your Amazon MQ broker and navigate to the Queue page:

As you can see, the Number of Pending Messages column figure has changed to 0 as that message has been consumed.

This diagram outlines the message lifecycle from the on-premises producer to the on-premises broker, traversing the hybrid connection between the on-premises broker and Amazon MQ, and finally consumption within the AWS Cloud.

Conclusion

This post focused on an ActiveMQ-specific scenario for transferring messages within an ActiveMQ queue from an on-premises broker to Amazon MQ.

For other on-premises brokers, such as IBM MQ, another approach would be to run an ActiveMQ broker on-premises and use JMS bridging to IBM MQ, while using the approach in this post to forward to Amazon MQ. Yet another approach would be to use Apache Camel for more sophisticated routing, as sketched below.
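
A rough illustration of the Camel option follows. This is a hedged sketch, not code from this post; the "ibmmq" and "activemq" component names are assumptions and would need to be registered elsewhere with connection factories for each broker.

import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;

public class BridgeRouteSketch {

    public static void main(String[] args) throws Exception {
        DefaultCamelContext context = new DefaultCamelContext();

        // Forward every message arriving on the on-premises queue to Amazon MQ.
        context.addRoutes(new RouteBuilder() {
            @Override
            public void configure() {
                from("ibmmq:queue:TestingQ")       // consume on-premises
                    .to("activemq:queue:TestingQ"); // forward to Amazon MQ
            }
        });

        context.start();
        Thread.sleep(60_000); // keep the route running for the demo
        context.stop();
    }
}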

I hope that you have found this example of hybrid messaging between an on-premises environment and the AWS Cloud to be useful. Many customers are already using on-premises ActiveMQ brokers, and this is a great use case to enable hybrid cloud scenarios.

To learn more, see the Amazon MQ website and Developer Guide. You can try Amazon MQ for free with the AWS Free Tier, which includes up to 750 hours of a single-instance mq.t2.micro broker and up to 1 GB of storage per month for one year.

 

Invoking AWS Lambda from Amazon MQ

Post Syndicated from Tara Van Unen original https://aws.amazon.com/blogs/compute/invoking-aws-lambda-from-amazon-mq/

Contributed by Josh Kahn, AWS Solutions Architect

Message brokers can be used to solve a number of needs in enterprise architectures, including managing workload queues and broadcasting messages to a number of subscribers. Amazon MQ is a managed message broker service for Apache ActiveMQ that makes it easy to set up and operate message brokers in the cloud.

In this post, I discuss one approach to invoking AWS Lambda from queues and topics managed by Amazon MQ brokers. This and other similar patterns can be useful in integrating legacy systems with serverless architectures. You could also integrate systems already migrated to the cloud that use common APIs such as JMS.

For example, imagine that you work for a company that produces training videos and which recently migrated its video management system to AWS. The on-premises system used to publish a message to an ActiveMQ broker when a video was ready for processing by an on-premises transcoder. However, on AWS, your company uses Amazon Elastic Transcoder. Instead of modifying the management system, Lambda polls the broker for new messages and starts a new Elastic Transcoder job. This approach avoids changes to the existing application while refactoring the workload to leverage cloud-native components.

This solution uses Amazon CloudWatch Events to trigger a Lambda function that polls the Amazon MQ broker for messages. Instead of starting an Elastic Transcoder job, the sample writes the received message to an Amazon DynamoDB table with a time stamp indicating the time received.

Getting started

To start, navigate to the Amazon MQ console. Next, launch a new Amazon MQ instance, selecting Single-instance Broker and supplying a broker name, user name, and password. Be sure to document the user name and password for later.

For the purposes of this sample, choose the default options in the Advanced settings section. Your new broker is deployed to the default VPC in the selected AWS Region with the default security group. For this post, you update the security group to allow access for your sample Lambda function. In a production scenario, I recommend deploying both the Lambda function and your Amazon MQ broker in your own VPC.

After several minutes, your instance changes status from “Creation Pending” to “Available.” You can then visit the Details page of your broker to retrieve connection information, including a link to the ActiveMQ web console where you can monitor the status of your broker, publish test messages, and so on. In this example, use the Stomp protocol to connect to your broker. Be sure to capture the broker host name, for example:

<BROKER_ID>.mq.us-east-1.amazonaws.com

You should also modify the Security Group for the broker by clicking on its Security Group ID. Click the Edit button and then click Add Rule to allow inbound traffic on port 8162 for your IP address.

Deploying and scheduling the Lambda function

To simplify the deployment of this example, I’ve provided an AWS Serverless Application Model (SAM) template that deploys the sample function and DynamoDB table, and schedules the function to be invoked every five minutes. Detailed instructions can be found, along with sample code, on GitHub in the amazonmq-invoke-aws-lambda repository. I discuss a few key aspects in this post.

First, SAM makes it easy to deploy and schedule invocation of our function:

SubscriberFunction:
  Type: AWS::Serverless::Function
  Properties:
    CodeUri: subscriber/
    Handler: index.handler
    Runtime: nodejs6.10
    Role: !GetAtt SubscriberFunctionRole.Arn
    Timeout: 15
    Environment:
      Variables:
        HOST: !Ref AmazonMQHost
        LOGIN: !Ref AmazonMQLogin
        PASSWORD: !Ref AmazonMQPassword
        QUEUE_NAME: !Ref AmazonMQQueueName
        WORKER_FUNCTION: !Ref WorkerFunction
    Events:
      Timer:
        Type: Schedule
        Properties:
          Schedule: rate(5 minutes)

WorkerFunction:
  Type: AWS::Serverless::Function
  Properties:
    CodeUri: worker/
    Handler: index.handler
    Runtime: nodejs6.10
    Role: !GetAtt WorkerFunctionRole.Arn
    Environment:
      Variables:
        TABLE_NAME: !Ref MessagesTable

In the code, you include the URI, user name, and password for your newly created Amazon MQ broker. These allow the function to poll the broker for new messages on the sample queue.

The sample Lambda function is written in Node.js, but clients exist for a number of programming languages.

// The 'stompit' STOMP client is assumed here, and 'options' holds the host,
// port, SSL, and credential settings built from the environment variables.
const stomp = require('stompit')
const AWS = require('aws-sdk')

stomp.connect(options, (error, client) => {
	if (error) { /* do something */ }

	let headers = {
		destination: '/queue/SAMPLE_QUEUE',
		ack: 'auto'
	}

	client.subscribe(headers, (error, message) => {
		if (error) { /* do something */ }

		message.readString('utf-8', (error, body) => {
			if (error) { /* do something */ }

			// Hand the message body to the worker function named in the template.
			let params = {
				FunctionName: process.env.WORKER_FUNCTION,
				Payload: JSON.stringify({
					message: body,
					timestamp: Date.now()
				})
			}

			let lambda = new AWS.Lambda()
			lambda.invoke(params, (error, data) => {
				if (error) { /* do something */ }
			})
		})
	})
})
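
As noted above, clients exist for other languages as well. A hedged Java sketch of the same subscribe-and-read step, using the StompConnection helper class that ships with ActiveMQ, might look like the following; the endpoint, port, and credentials are placeholders, and the actual sample keeps the Node.js implementation shown above.

import java.net.Socket;

import javax.net.ssl.SSLSocketFactory;

import org.apache.activemq.transport.stomp.StompConnection;
import org.apache.activemq.transport.stomp.StompFrame;

public class StompPollSketch {

    public static void main(String[] args) throws Exception {
        // stomp+ssl endpoint of the broker; 61614 is the STOMP SSL port.
        Socket socket = SSLSocketFactory.getDefault()
                .createSocket("<BROKER_ID>.mq.us-east-1.amazonaws.com", 61614);

        StompConnection connection = new StompConnection();
        connection.open(socket);
        connection.connect("username", "password");
        connection.subscribe("/queue/SAMPLE_QUEUE", "auto");

        StompFrame frame = connection.receive(10000); // wait up to 10 seconds
        System.out.println("Message received: " + frame.getBody());

        connection.disconnect();
    }
}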

Sending a sample message

For the purpose of this example, use the Amazon MQ console to send a test message. Navigate to the details page for your broker.

About midway down the page, choose ActiveMQ Web Console. Next, choose Manage ActiveMQ Broker to launch the admin console. When you are prompted for a user name and password, use the credentials created earlier.

At the top of the page, choose Send. From here, you can send a sample message from the broker to subscribers. For this example, this is how you generate traffic to test the end-to-end system. Be sure to set the Destination value to “SAMPLE_QUEUE.” The message body can contain any text. Choose Send.

You now have a Lambda function polling for messages on the broker. To verify that your function is working, you can confirm in the DynamoDB console that the message was successfully received and processed by the sample Lambda function.

First, choose Tables on the left and select the table name “amazonmq-messages” in the middle section. With the table detail in view, choose Items. If the function was successful, you’ll find a new entry similar to the following:

If there is no message in DynamoDB, check again in a few minutes or review the CloudWatch Logs group for Lambda functions that contain debug messages.

Alternative approaches

Beyond the approach described here, you may consider other approaches as well. For example, you could use an intermediary system such as Apache Flume to pass messages from the broker to Lambda or deploy Apache Camel to trigger Lambda via a POST to API Gateway. There are trade-offs to each of these approaches. My goal in using CloudWatch Events was to introduce an easily repeatable pattern familiar to many Lambda developers.

Summary

I hope that you have found this example of how to integrate AWS Lambda with Amazon MQ useful. If you have expertise or legacy systems that leverage APIs such as JMS, you may find this useful as you incorporate serverless concepts in your enterprise architectures.

To learn more, see the Amazon MQ website and Developer Guide. You can try Amazon MQ for free with the AWS Free Tier, which includes up to 750 hours of a single-instance mq.t2.micro broker and up to 1 GB of storage per month for one year.

Glenn’s Take on re:Invent 2017 Part 1

Post Syndicated from Glenn Gore original https://aws.amazon.com/blogs/architecture/glenns-take-on-reinvent-2017-part-1/

GREETINGS FROM LAS VEGAS

Glenn Gore here, Chief Architect for AWS. I’m in Las Vegas this week — with 43K others — for re:Invent 2017. We have a lot of exciting announcements this week. I’m going to post to the AWS Architecture blog each day with my take on what’s interesting about some of the announcements from a cloud architectural perspective.

Why not start at the beginning? At the Midnight Madness launch on Sunday night, we announced Amazon Sumerian, our platform for VR, AR, and mixed reality. The hype around VR/AR has existed for many years, though for me, it is a perfect example of how a working end-to-end solution often requires innovation from multiple sources. For AR/VR to be successful, we need many components to come together in a coherent manner to provide a great experience.

First, we need lightweight, high-definition goggles with motion tracking that are comfortable to wear. Second, we need to track movement of our body and hands in a 3-D space so that we can interact with virtual objects in the virtual world. Third, we need to build the virtual world itself and populate it with assets and define how the interactions will work and connect with various other systems.

There has been rapid development of the physical devices for AR/VR, ranging from iOS devices to Oculus Rift and HTC Vive, which provide excellent capabilities for the first and second components defined above. With the launch of Amazon Sumerian we are solving for the third area, which will help developers easily build their own virtual worlds and start experimenting and innovating with how to apply AR/VR in new ways.

Already, within 48 hours of Amazon Sumerian being announced, I have had multiple discussions with customers and partners around some cool use cases where VR can help in training simulations, remote-operator controls, or with new ideas around interacting with complex visual data sets, which starts bringing concepts straight out of sci-fi movies into the real (virtual) world. I am really excited to see how Sumerian will unlock the creative potential of developers and where this will lead.

Amazon MQ
I am a huge fan of distributed architectures where asynchronous messaging is the backbone of connecting the discrete components together. Amazon Simple Queue Service (Amazon SQS) is one of my favorite services due to its simplicity, scalability, performance, and the incredible flexibility of how you can use Amazon SQS in so many different ways to solve complex queuing scenarios.

While Amazon SQS is easy to use when building cloud-native applications on AWS, many of our customers running existing applications on-premises require support for different messaging protocols, such as Java Message Service (JMS), .Net Messaging Service (NMS), Advanced Message Queuing Protocol (AMQP), MQ Telemetry Transport (MQTT), Simple (or Streaming) Text Oriented Messaging Protocol (STOMP), and WebSockets. One of the most popular on-premises message brokers is Apache ActiveMQ. With the release of Amazon MQ, you can now run Apache ActiveMQ on AWS as a managed service, similar to what we did with Amazon ElastiCache back in 2012. For me, there are two compelling, major benefits that Amazon MQ provides:

  • Integrate existing applications with cloud-native applications without changing a line of application code, if you use one of the supported messaging protocols (see the short JMS sketch after this list). This removes one of the biggest blockers to integration between the old and the new.
  • Remove the complexity of configuring Multi-AZ resilient message broker services, as Amazon MQ provides out-of-the-box redundancy by always storing messages redundantly across Availability Zones. Protection is provided against failure of a broker through to complete failure of an Availability Zone.
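To make the first benefit concrete, here is a minimal JMS producer sketch. The broker endpoint, credentials, and queue name are hypothetical; the point is that the JMS code itself is exactly what you would write against a self-managed ActiveMQ broker, so only the broker URL changes:

    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.JMSException;
    import javax.jms.MessageProducer;
    import javax.jms.Queue;
    import javax.jms.Session;
    import org.apache.activemq.ActiveMQConnectionFactory;

    public class AmazonMqProducer {
        public static void main(String[] args) throws JMSException {
            // Hypothetical Amazon MQ endpoint; copy the real one from the console.
            ConnectionFactory factory = new ActiveMQConnectionFactory(
                    "ssl://b-1234abcd-1.mq.us-east-1.amazonaws.com:61617");
            Connection connection = factory.createConnection("mqUser", "mqPassword");
            connection.start();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Queue queue = session.createQueue("ORDERS");
            MessageProducer producer = session.createProducer(queue);
            producer.send(session.createTextMessage("hello from Amazon MQ"));
            connection.close();
        }
    }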

I believe that Amazon MQ is a major component in the tools required to help you migrate your existing applications to AWS. Having set up cross-data center Apache ActiveMQ clusters myself in the past, and having tested them to ensure they worked as expected during critical failure scenarios, I know that technical staff working on migrations to AWS will benefit from being able to deploy a fully redundant, managed Apache ActiveMQ cluster within minutes.

Who would have thought I would have been so excited to revisit Apache ActiveMQ in 2017 after using SQS for many, many years? Choice is a wonderful thing.

Amazon GuardDuty
Maintaining application and information security in the modern world is increasingly complex, and it constantly evolves as new threats emerge. This is due to the scale, variety, and distribution of services required in a competitive online world.

At Amazon, security is our number one priority. Thus, we are always looking at how we can increase security detection and protection while simplifying the implementation of advanced security practices for our customers. As a result, we released Amazon GuardDuty, which provides intelligent threat detection by using a combination of multiple information sources, transactional telemetry, and the application of machine learning models developed by AWS. One of the biggest benefits of Amazon GuardDuty that I appreciate is that enabling this service requires no software, agents, sensors, or network choke points, all of which can impact the performance or reliability of the service you are trying to protect. Amazon GuardDuty works by monitoring your VPC flow logs, AWS CloudTrail events, and DNS logs, as well as combing through other sources of security threat intelligence that AWS aggregates from internal and external sources.

The use of machine learning in Amazon GuardDuty allows it to identify changes in behavior that could be suspicious and require additional investigation. Amazon GuardDuty works across all of your AWS accounts, allowing for aggregated analysis and centralized management of detected threats across accounts. This is important for our larger customers, who can be running many hundreds of AWS accounts across their organization, as a single, common view of threat detection across their use of AWS is critical to ensuring they are protected.

Detection, though, is only the beginning of what Amazon GuardDuty enables. When a threat is identified, you can configure remediation scripts or trigger Lambda functions with custom responses, which lets you build automated responses to a variety of common threats. Speed of response matters when a security incident may be taking place. For example, Amazon GuardDuty may detect that an Amazon Elastic Compute Cloud (Amazon EC2) instance is potentially compromised because of traffic from a known set of malicious IP addresses. Upon detection of a compromised EC2 instance, we could apply an access control entry restricting outbound traffic for that instance, which stops loss of data until a security engineer can assess what has occurred.
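As a sketch of what such an automated response could look like, this hypothetical Lambda handler (AWS SDK for Java, v1) moves a flagged instance into a pre-created "quarantine" security group that has no outbound rules. Extracting the instance ID from the GuardDuty finding is simplified to a single field here; in practice you would parse it out of the CloudWatch Events payload:

    import com.amazonaws.services.ec2.AmazonEC2;
    import com.amazonaws.services.ec2.AmazonEC2ClientBuilder;
    import com.amazonaws.services.ec2.model.ModifyInstanceAttributeRequest;
    import com.amazonaws.services.lambda.runtime.Context;
    import com.amazonaws.services.lambda.runtime.RequestHandler;
    import java.util.Map;

    public class QuarantineInstanceHandler implements RequestHandler<Map<String, Object>, String> {
        // Hypothetical ID of a security group with no outbound rules.
        private static final String QUARANTINE_SG = "sg-0123456789abcdef0";

        @Override
        public String handleRequest(Map<String, Object> event, Context context) {
            // Assume the instance ID has already been extracted from the finding.
            String instanceId = (String) event.get("instanceId");
            AmazonEC2 ec2 = AmazonEC2ClientBuilder.defaultClient();
            // Replace all security groups on the instance with the quarantine
            // group, cutting off traffic until an engineer can investigate.
            ec2.modifyInstanceAttribute(new ModifyInstanceAttributeRequest()
                    .withInstanceId(instanceId)
                    .withGroups(QUARANTINE_SG));
            return "quarantined " + instanceId;
        }
    }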

Whether you are a customer running a single service in a single account, a global customer with hundreds of accounts and thousands of applications, or a startup with hundreds of microservices and an hourly release cycle in a DevOps world, I recommend enabling Amazon GuardDuty. We have a 30-day free trial available for all new customers of this service. Because GuardDuty monitors events, no change to your architecture within AWS is required.

Stay tuned for tomorrow’s post on AWS Media Services and Amazon Neptune.

 

[Photo: Glenn during the Tour du Mont Blanc]

Amazon MQ – Managed Message Broker Service for ActiveMQ

Post Syndicated from Jeff Barr original https://aws.amazon.com/blogs/aws/amazon-mq-managed-message-broker-service-for-activemq/

Messaging holds the parts of a distributed application together, while also adding resiliency and enabling the implementation of highly scalable architectures. For example, earlier this year, Amazon Simple Queue Service (SQS) and Amazon Simple Notification Service (SNS) supported the processing of customer orders on Prime Day, collectively processing 40 billion messages at a rate of 10 million per second, with no customer-visible issues.

SQS and SNS have been used extensively for applications that were born in the cloud. However, many of our larger customers are already making use of open-source or commercially licensed message brokers. Their applications are mission-critical, and so is the messaging that powers them. Our customers describe the setup and ongoing maintenance of their messaging infrastructure as “painful” and report that they spend at least 10 staff-hours per week on this chore.

New Amazon MQ
Today we are launching Amazon MQ – a managed message broker service for Apache ActiveMQ that lets you get started in minutes with just three clicks! As you may know, ActiveMQ is a popular open-source message broker that is fast & feature-rich. It offers queues and topics, durable and non-durable subscriptions, push-based and poll-based messaging, and filtering.

As a managed service, Amazon MQ takes care of the administration and maintenance of ActiveMQ. This includes responsibility for broker provisioning, patching, failure detection & recovery for high availability, and message durability. With Amazon MQ, you get direct access to the ActiveMQ console and industry-standard APIs and protocols for messaging, including JMS, NMS, AMQP, STOMP, MQTT, and WebSocket. This allows you to move from any message broker that uses these standards to Amazon MQ, along with the supported applications, without rewriting code.

You can create a single-instance Amazon MQ broker for development and testing, or an active/standby pair that spans AZs, with quick, automatic failover. Either way, you get data replication across AZs and a pay-as-you-go model for the broker instance and message storage.
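Clients of an active/standby pair typically connect with ActiveMQ's failover transport, which lists both endpoints and reconnects automatically to whichever broker is active. A minimal sketch, with hypothetical endpoints:

    import javax.jms.ConnectionFactory;
    import org.apache.activemq.ActiveMQConnectionFactory;

    public class FailoverConnectionFactory {
        public static ConnectionFactory create() {
            // The failover transport tries each endpoint in turn and
            // re-establishes the connection after a broker failover.
            return new ActiveMQConnectionFactory(
                    "failover:(ssl://b-1234abcd-1.mq.us-east-1.amazonaws.com:61617,"
                    + "ssl://b-1234abcd-2.mq.us-east-1.amazonaws.com:61617)");
        }
    }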

Amazon MQ is a full-fledged part of the AWS family, including the use of AWS Identity and Access Management (IAM) for authentication and authorization to use the service API. You can use Amazon CloudWatch to keep a watchful eye on metrics such as queue depth and initiate Auto Scaling of your consumer fleet as needed.
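As a sketch of how you might read the queue depth programmatically (AWS SDK for Java, v1), assuming the AWS/AmazonMQ namespace publishes a QueueSize metric dimensioned by Broker and Queue, and using hypothetical broker and queue names:

    import com.amazonaws.services.cloudwatch.AmazonCloudWatch;
    import com.amazonaws.services.cloudwatch.AmazonCloudWatchClientBuilder;
    import com.amazonaws.services.cloudwatch.model.Datapoint;
    import com.amazonaws.services.cloudwatch.model.Dimension;
    import com.amazonaws.services.cloudwatch.model.GetMetricStatisticsRequest;
    import com.amazonaws.services.cloudwatch.model.Statistic;
    import java.util.Date;

    public class QueueDepthCheck {
        public static void main(String[] args) {
            AmazonCloudWatch cw = AmazonCloudWatchClientBuilder.defaultClient();
            // Average queue depth over the last five minutes, in one datapoint.
            GetMetricStatisticsRequest request = new GetMetricStatisticsRequest()
                    .withNamespace("AWS/AmazonMQ")
                    .withMetricName("QueueSize")
                    .withDimensions(
                            new Dimension().withName("Broker").withValue("MyBroker-1"),
                            new Dimension().withName("Queue").withValue("ORDERS"))
                    .withStartTime(new Date(System.currentTimeMillis() - 5 * 60 * 1000))
                    .withEndTime(new Date())
                    .withPeriod(300)
                    .withStatistics(Statistic.Average);
            for (Datapoint dp : cw.getMetricStatistics(request).getDatapoints()) {
                System.out.println("Average queue depth: " + dp.getAverage());
            }
        }
    }

A CloudWatch alarm on the same metric could then drive an Auto Scaling policy for the consumer fleet.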

Launching an Amazon MQ Broker
To get started, I open up the Amazon MQ Console, select the desired AWS Region, enter a name for my broker, and click on Next step:

Then I choose the instance type, indicate that I want to create a standby broker, and click on Create broker (I can select a VPC and fine-tune other settings in the Advanced settings section):

My broker will be created and ready to use in 5-10 minutes:
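If you would rather script this step than click through the console, a minimal sketch with the AWS SDK for Java (v1) might look like the following; the broker name, user, and settings are hypothetical:

    import com.amazonaws.services.mq.AmazonMQ;
    import com.amazonaws.services.mq.AmazonMQClientBuilder;
    import com.amazonaws.services.mq.model.CreateBrokerRequest;
    import com.amazonaws.services.mq.model.CreateBrokerResult;
    import com.amazonaws.services.mq.model.User;

    public class CreateBrokerExample {
        public static void main(String[] args) {
            AmazonMQ mq = AmazonMQClientBuilder.defaultClient();
            // Request an active/standby pair; use SINGLE_INSTANCE for dev/test.
            CreateBrokerResult result = mq.createBroker(new CreateBrokerRequest()
                    .withBrokerName("MyBroker")
                    .withEngineType("ACTIVEMQ")
                    .withEngineVersion("5.15.0")
                    .withHostInstanceType("mq.m4.large")
                    .withDeploymentMode("ACTIVE_STANDBY_MULTI_AZ")
                    .withPubliclyAccessible(true)
                    .withUsers(new User()
                            .withUsername("mqAdmin")
                            .withPassword("use-a-strong-password-here")
                            .withConsoleAccess(true)));
            System.out.println("Broker ID: " + result.getBrokerId());
        }
    }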

The URLs and endpoints that I use to access my broker are all available at a click:

I can access the ActiveMQ Web Console at the link provided:

The broker publishes instance, topic, and queue metrics to CloudWatch. Here are the instance metrics:

Available Now
Amazon MQ is available now and you can start using it today in the US East (Northern Virginia), US East (Ohio), US West (Oregon), EU (Ireland), EU (Frankfurt), and Asia Pacific (Sydney) Regions.

The AWS Free Tier lets you use a single-AZ micro instance for up to 750 hours and store up to 1 gigabyte each month, for one year. After that, billing is based on instance-hours and message storage, plus charges for Internet data transfer if the broker is accessed from outside of AWS.

Jeff;