Tag Archives: message queues

Creating static custom domain endpoints with Amazon MQ to simplify broker modification and scaling

Post Syndicated from Rachel Richardson original https://aws.amazon.com/blogs/compute/creating-static-custom-domain-endpoints-with-amazon-mq/

This post is courtesy of Wallace Printz, Senior Solutions Architect, AWS, and Christian Mueller, Senior Solutions Architect, AWS.

Many cloud-native application architectures take advantage of the point-to-point and publish-subscribe (“pub-sub”) model of message-based communication between application components. This architecture is generally more resilient to failure because of the loose coupling and because message processing failures can be retried. It’s also more efficient because individual application components can independently scale up or down to maintain message-processing SLAs, compared to monolithic application architectures. Synchronous (REST-based) systems are tightly coupled. A problem in a synchronous downstream dependency has an immediate impact on the upstream callers.

Retries from upstream callers can all too easily fan out and amplify problems. Amazon SQS and Amazon SNS are fully managed messaging services, but they are not the right tool for every job. For applications requiring messaging protocols including JMS, NMS, AMQP, STOMP, MQTT, and WebSocket, Amazon provides Amazon MQ. Amazon MQ is a managed message broker service for Apache ActiveMQ that makes it easy to set up and operate message brokers in the cloud.

Amazon MQ provides two managed broker deployment connection options: public brokers and private brokers. Public brokers receive internet-accessible IP addresses, while private brokers receive only private IP addresses from the corresponding CIDR range in their VPC subnet.

In some cases, for security purposes, you may prefer to place brokers in a private subnet. You can still allow access to the brokers through a persistent public endpoint, such as a subdomain of your corporate domain like mq.example.com.

In this post, we explain how to provision private Amazon MQ brokers behind a secure public load balancer endpoint using an example subdomain.

Architecture overview

There are several reasons one might want to deploy this architecture beyond the security aspects.

First, human-readable URLs are easier for people to parse when reviewing operations and troubleshooting, such as deploying updates to mq-dev.example.com before mq-prod.example.com.

Second, maintaining static URLs for your brokers helps reduce the necessity of modifying client code when performing maintenance on the brokers.

Third, this pattern allows you to vertically scale your brokers without changing the client code or even notifying the clients that changes have been made.

Finally, the same architecture described here works for a network of brokers configuration as well, whereby you could horizontally scale your brokers without impacting the client code.

Prerequisites

This blog post assumes some familiarity with AWS networking fundamentals, such as VPCs, subnets, load balancers, and Amazon Route 53.

When you are finished, the architecture should be set up as shown in the following diagram. For ease of visualization, we demonstrate with a pair of brokers using the active-standby option.

Solution Overview

Amazon MQ solution overview

The client to broker traffic flow is as follows.

  • First, the client service tries to connect with a failover URL to the domain endpoint set up in Route 53. If a client loses the connection, using the failover URL allows the client to automatically try to reconnect to the broker.
  • The client looks up the domain name from Route 53, and Route 53 returns the IP address of the Network Load Balancer.
  • The client creates a secure socket layer (SSL) connection to the Network Load Balancer with an SSL certificate provided from AWS Certificate Manager (ACM). The Network Load Balancer selects from the healthy brokers in its target group and creates a separate SSL connection between the Network Load Balancer and the broker. This provides secure, end-to-end SSL encrypted messaging between client and brokers.

In this diagram, the healthy broker connection is shown in the solid line. The standby broker, which does not reply to connection requests and is therefore marked as unhealthy in the target group, is shown in the dashed line.

Solution walkthrough

To build this architecture, build the network segmentation first, then the Amazon MQ brokers, and finally the network routing.

Setup

First, you need the following resources:

  • A VPC
  • One private subnet per Availability Zone
  • One public subnet for your bastion host (if desired)

This demonstration VPC uses the 20.0.0.0/16 CIDR range.

Additionally, you must create a custom security group for your brokers. Set up this security group to allow traffic from your Network Load Balancer and, if using a network of brokers, among the brokers as well.

This VPC is not being used for any other workloads. This demonstration allows all incoming traffic originating within the VPC, including the Network Load Balancer, through to the brokers on the following ports:

  • OpenWire communication port of 61617
  • Apache ActiveMQ console port of 8162

If you are using a different protocol, adjust the port numbers accordingly.

Create an Amazon MQ security group

Building the Amazon MQ brokers

Now that you have the network segmentation set up, build the Amazon MQ brokers. As mentioned previously, this demonstration uses the active-standby pair of private brokers option.

Configure the broker settings by selecting a broker name, instance type, ActiveMQ console user, and password first.

Configure Amazon MQ broker settings

In the Additional Settings area, place the brokers in your previously selected VPC and the associated private subnets.

Configure Amazon MQ additional settings

Finally, select the existing Security Group previously discussed, and make sure that the Public Accessibility option is set to No.

Set Amazon MQ security group settings

That’s it for the brokers. When it is done provisioning, the Amazon MQ dashboard should look like the one shown in the following screenshot. Note the IP addresses of the brokers and the ActiveMQ web console URLs for later.

Amazon MQ dashboard

Configuring a Load Balancer Target Group

The next step in the build process is to configure the load balancer’s target group. This demonstration uses the private IP addresses of the brokers as targets for the Network Load Balancer.

Create and name a target group, select the IP option under Target type, and make sure to select TLS under Protocol and 61617 under Port, as well as the VPC in which your brokers reside. It is important to configure the health check settings so traffic is only routed to active brokers by selecting the TCP protocol and overriding the health check port to 8162, the Apache ActiveMQ console port.

Do not use the OpenWire port as the target group health check port. Because the Network Load Balancer may not be able to recognize the host as healthy on that port, it is better to use the ActiveMQ web console port.
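A TCP health check of this kind comes down to asking whether the broker accepts a connection on the console port. The following is a minimal Java sketch of that idea. The class name TcpHealthCheck and its method are hypothetical helpers for illustration only; they are not part of any AWS SDK and do not describe how the Network Load Balancer is implemented.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper for illustration; not part of any AWS SDK.
final class TcpHealthCheck {

    private TcpHealthCheck() {
    }

    // Returns true if a TCP connection to host:port succeeds within timeoutMillis.
    static boolean isHealthy(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Refused, unreachable, or timed out: treat the target as unhealthy.
            return false;
        }
    }
}
```

From a host inside the VPC, calling TcpHealthCheck.isHealthy with a broker's private IP address and port 8162 should return true only for the broker that is currently active, under the behavior described above.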

Next, add the brokers’ IP addresses as targets. You can find the broker IP addresses in the Amazon MQ console page after they complete provisioning. Make sure to add both the active and the standby broker to the target group so that when reboots occur, the Network Load Balancer routes traffic to whichever broker is active.

You may be pursuing a more dynamic environment for scaling brokers up and down to handle the demands of a variable message load. In that case, as you scale to add more brokers, make sure that you also add them to the target group.

AWS Lambda would be a great way to programmatically handle adding or removing the broker’s IP addresses to this target group automatically.

Creating a Network Load Balancer

Next, create a Network Load Balancer. This demo uses an internet-facing load balancer with TLS listeners on port 61617, and routes traffic to brokers’ VPC and private subnets.

Configure a network load balancer

Clients must securely connect to the Network Load Balancer, so this demo uses an ACM certificate for the subdomain registered in Route 53, such as mq.example.com. For simplicity, ACM certificate provisioning is not shown. For more information, see Request a Public Certificate.

Make sure that the ACM certificate is provisioned in the same Region as your Network Load Balancer, or the certificate is not displayed in the selection menu.

Next, select the target group that you just created, and select TLS for the connection between the Network Load Balancer and the brokers. Similarly, select the health checks on TCP port 8162.

If all went well, the brokers’ IP addresses are listed as targets. From here, review your settings and confirm you’d like to deploy the Network Load Balancer.

Configuring Route 53

The last step in this build is to configure Route 53 to serve traffic at the subdomain of your choice to your Network Load Balancer.

Go to your Route 53 Hosted Zone, and create a new subdomain record set, such as mq.example.com, that matches the ACM certificate that you previously created. In the Type field, select A – IPv4 address, then select Yes for Alias. This allows you to select the Network Load Balancer as the alias target. Select the Network Load Balancer that you just created from the Alias Target menu and save the record set.

Testing broker connectivity

And that’s it!

There’s an important advantage to this architecture. When you create Amazon MQ active-standby brokers, the Amazon MQ service provides two endpoints. Only one broker host is active at a time, and when configuration changes or other reboot events occur, the standby broker becomes active and the active broker goes to standby. The typical connection string when there is an option to connect to multiple brokers is something similar to the following string:

"failover:(ssl://b-ce452fbe-2581-4003-8ce2-4185b1377b43-1.mq.us-west-2.amazonaws.com:61617,ssl://b-ce452fbe-2581-4003-8ce2-4185b1377b43-2.mq.us-west-2.amazonaws.com:61617)"

In this architecture, you use only a single connection URL, but you still want to use the failover protocol to force re-connection if the connection is dropped for any reason.
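The two-broker connection string and the single-endpoint one follow the same pattern and differ only in the list of hosts. As a small illustration, the Java sketch below assembles a failover URL from a list of host names. FailoverUrls is a hypothetical helper name, not part of the ActiveMQ client library.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper for illustration; not part of the ActiveMQ client library.
final class FailoverUrls {

    private FailoverUrls() {
    }

    // Builds "failover:(ssl://host1:port,ssl://host2:port,...)" from the given hosts.
    static String build(List<String> hosts, int port) {
        if (hosts.isEmpty()) {
            throw new IllegalArgumentException("At least one host is required");
        }
        return hosts.stream()
                .map(host -> "ssl://" + host + ":" + port)
                .collect(Collectors.joining(",", "failover:(", ")"));
    }
}
```

With a single custom domain, the host list has one entry, and the failover wrapper is kept only so that the client reconnects after a dropped connection.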

For ease of use, this solution relies on the Amazon MQ workshop client application code from re:Invent 2018. To test this solution, set the connection URL to the following:

"failover:(ssl://mq.example.com:61617)"

Run the producer and consumer clients in separate terminal windows.

The messages are sent and received successfully across the internet, while the brokers are hidden behind the Network Load Balancer.

Logging into the broker’s ActiveMQ console

But what if we want to log in to the broker’s ActiveMQ web console?

There are three options. Due to the security group rules, only traffic originating from inside the VPC is allowed to the brokers.

  • Use a VPN connection from the corporate network to the VPC. Most customers likely use this option, but for rapid testing, the bastion host option below is simpler and more cost-effective.
  • Connect to the brokers’ web console through a Route 53 subdomain, which requires creating a separate port 8162 Listener on the existing Network Load Balancer and creating a separate TLS target group on port 8162 for the brokers.
  • Use a bastion host to proxy traffic to the web console.

To use a bastion host, create a small Linux EC2 instance in your public subnet, and make sure that:

  • The EC2 instance has a public IP address.
  • You have access to the SSH key pair.
  • It is placed in a security group that allows SSH port 22 traffic from your location.

For simplicity, this step is not shown, but this demonstration uses a t3.micro Amazon Linux 2 host with all default options as the bastion.

Creating a forwarding tunnel

Next, create a forwarding tunnel through an SSH connection to the bastion host. The command keeps a persistent SSH connection open and starts a local SOCKS proxy on port 8162 that forwards traffic through the bastion host at the public IP address 54.244.188.53.

For example, the command could be:

ssh -D 8162 -C -q -N -i <my-key-pair-name>.pem ec2-user@<ec2-ip-address>

You can also configure a browser to tunnel traffic through your proxy.

We have chosen to demonstrate in Firefox. Configure the network settings to use a manual proxy on localhost on the Apache ActiveMQ console port of 8162.  This can be done by opening the Firefox Connection Settings.  In the Configure Proxy Access to the Internet section, select Manual proxy configuration, then set the SOCKS Host to localhost and Port to 8162, leaving other fields empty.

Finally, use the Apache ActiveMQ console URL provided in the Amazon MQ web console details page to connect to the broker through the proxy.

ActiveMQ screenshot

Conclusion

Congratulations! You’ve successfully built a highly available Amazon MQ broker pair in a private subnet. You’ve layered your security defense by putting the brokers behind a highly scalable Network Load Balancer, and you’ve configured routing from a single custom subdomain URL to multiple brokers with health check built in.

To learn more about Amazon MQ and scalable broker communication patterns, we highly recommend the following resources:

Keep on building!

Simple Two-way Messaging using the Amazon SQS Temporary Queue Client

Post Syndicated from Rachel Richardson original https://aws.amazon.com/blogs/compute/simple-two-way-messaging-using-the-amazon-sqs-temporary-queue-client/

This post is contributed by Robin Salkeld, Sr. Software Development Engineer

Amazon SQS is a fully managed message queuing service that makes it easy to decouple and scale microservices, distributed systems, and serverless applications. Asynchronous workflows have always been the primary use case for SQS. Using queues ensures one component can keep running smoothly without losing data when another component is unavailable or slow.

We were surprised, then, to discover that many customers use SQS in synchronous workflows. For example, many applications use queues to communicate between frontends and backends when processing a login request from a user.

Why would anyone use SQS for this? The service stores messages for up to 14 days with high durability, but messages in a synchronous workflow often must be processed within a few minutes, or even seconds. Why not just set up an HTTPS endpoint?

The more we talked to customers, the more we understood. Here’s what we learned:

  • Creating a queue is often easier and faster than creating an HTTPS endpoint and the infrastructure necessary to ensure the endpoint’s scalability.
  • Queues are safe by default because they are locked down to the AWS account that created them. In addition, any DDoS attempt on your service is absorbed by SQS instead of loading down your own servers.
  • There is generally no need to create firewall rules for the communication between microservices if they use queues.
  • Although SQS provides durable storage (which isn’t necessary for short-lived messages), it is still a cost-effective solution for this use case. This is especially true when you consider all the messaging broker maintenance that is done for you.

However, setting up efficient two-way communication through one-way queues requires some non-trivial client-side code. In our previous two-part post series on implementing enterprise integration patterns with AWS messaging services, Point-to-point channels and Publish-subscribe channels, we discussed the Request-Response Messaging Pattern. In this pattern, each requester creates a temporary destination to receive each response message.

The simplest approach is to create a new queue for each response, but this is like building a road just so a single car can drive on it before tearing it down. Technically, this can work (and SQS can create and delete queues quickly), but we can definitely make it faster and cheaper.

To better support short-lived, lightweight messaging destinations, we are pleased to present the Amazon SQS Temporary Queue Client. This client makes it easy to create and delete many temporary messaging destinations without inflating your AWS bill.

Virtual queues

The key concept behind the client is the virtual queue. Virtual queues let you multiplex many low-traffic queues onto a single SQS queue. Creating a virtual queue only instantiates a local buffer to hold messages for consumers as they arrive; there is no API call to SQS and no costs associated with creating a virtual queue.

The Temporary Queue Client includes the AmazonSQSVirtualQueuesClient class for creating and managing virtual queues. This class implements the AmazonSQS interface and adds support for attributes related to virtual queues. You can create a virtual queue using this client by calling the CreateQueue API action and including the HostQueueURL queue attribute. This attribute specifies the existing SQS queue on which to host the virtual queue. The queue URL for a virtual queue is in the form <host queue URL>#<virtual queue name>. For example:

https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue#MyVirtualQueueName
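To make the URL format concrete, the following Java sketch splits a virtual queue URL into its host queue URL and virtual queue name. VirtualQueueUrl is a hypothetical class written for this illustration; the post does not show how the Temporary Queue Client parses these URLs internally.

```java
// Hypothetical value class for illustration; not the client's own implementation.
final class VirtualQueueUrl {

    final String hostQueueUrl;
    final String virtualQueueName;

    private VirtualQueueUrl(String hostQueueUrl, String virtualQueueName) {
        this.hostQueueUrl = hostQueueUrl;
        this.virtualQueueName = virtualQueueName;
    }

    // Splits "<host queue URL>#<virtual queue name>" at the first '#'.
    static VirtualQueueUrl parse(String url) {
        int separator = url.indexOf('#');
        if (separator <= 0 || separator == url.length() - 1) {
            throw new IllegalArgumentException("Not a virtual queue URL: " + url);
        }
        return new VirtualQueueUrl(url.substring(0, separator), url.substring(separator + 1));
    }
}
```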

When you call the SendMessage or SendMessageBatch API actions on AmazonSQSVirtualQueuesClient with a virtual queue URL, the client first extracts the virtual queue name. It then attaches this name as an additional message attribute to each message, and sends the messages to the host queue. When you call the ReceiveMessage API action on a virtual queue, the calling thread waits for messages to appear in the in-memory buffer for the virtual queue. Meanwhile, a background thread polls the host queue and dispatches messages to these buffers according to the additional message attribute.
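The dispatch step can be pictured with a purely in-memory sketch. In the Java code below, one buffer per virtual queue stands in for the client-side state, and the dispatch method plays the role of the background thread that routes each host queue message by its virtual queue name. VirtualQueueDispatcher and its methods are assumptions made for illustration; the code makes no SQS calls and is not the client's actual implementation.

```java
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Illustrative in-memory stand-in for the multiplexing idea; no calls to SQS are made.
final class VirtualQueueDispatcher {

    // One local buffer per virtual queue name.
    private final Map<String, BlockingQueue<String>> buffers = new ConcurrentHashMap<>();

    // Creating a virtual queue only allocates a local buffer.
    void createVirtualQueue(String name) {
        buffers.putIfAbsent(name, new LinkedBlockingQueue<>());
    }

    // Deleting a virtual queue only releases the local buffer.
    void deleteVirtualQueue(String name) {
        buffers.remove(name);
    }

    // Called for each message polled from the host queue.
    // Returns false if no virtual queue with that name exists locally.
    boolean dispatch(String virtualQueueName, String body) {
        BlockingQueue<String> buffer = buffers.get(virtualQueueName);
        return buffer != null && buffer.offer(body);
    }

    // Waits up to the timeout for a message; returns null if none arrived.
    String receive(String virtualQueueName, long timeout, TimeUnit unit) {
        BlockingQueue<String> buffer = buffers.get(virtualQueueName);
        if (buffer == null) {
            throw new IllegalArgumentException("Unknown virtual queue: " + virtualQueueName);
        }
        try {
            return buffer.poll(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }
}
```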

This mechanism is similar to how the AmazonSQSBufferedAsyncClient prefetches messages, and the benefits are similar. A single call to SQS can provide messages for up to 10 virtual queues, reducing the API calls that you pay for by up to a factor of ten. Deleting a virtual queue simply removes the client-side resources used to implement it, again without making API calls to SQS.

The diagram below illustrates the end-to-end process for sending messages through virtual queues:

Sending messages through virtual queues

Virtual queues are similar to virtual machines. Just as a virtual machine provides the same experience as a physical machine, a virtual queue divides the resources of a single SQS queue into smaller logical queues. This is ideal for temporary queues, since they frequently only receive a handful of messages in their lifetime. Virtual queues are currently implemented entirely within the Temporary Queue Client, but additional support and optimizations might be added to SQS itself in the future.

In most cases, you don’t have to manage virtual queues yourself. The library also includes the AmazonSQSTemporaryQueuesClient class. This class automatically creates virtual queues when the CreateQueue API action is called and creates host queues on demand for all queues with the same queue attributes. To optimize existing application code that creates and deletes queues, you can use this class as a drop-in replacement implementation of the AmazonSQS interface.

The client also includes the AmazonSQSRequester and AmazonSQSResponder interfaces, which enable two-way communication through SQS queues. The following is an example of an RPC implementation for a web application’s login process.

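import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import com.amazonaws.services.sqs.AmazonSQSRequester;
import com.amazonaws.services.sqs.AmazonSQSRequesterClientBuilder;
import com.amazonaws.services.sqs.AmazonSQSResponder;
import com.amazonaws.services.sqs.AmazonSQSResponderClientBuilder;
import com.amazonaws.services.sqs.MessageContent;
import com.amazonaws.services.sqs.model.Message;
import com.amazonaws.services.sqs.model.SendMessageRequest;

/**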
 * This class handles a user's login request on the client side.
 */
public class LoginClient {

    // The SQS queue to send the requests to.
    private final String requestQueueUrl;

    // The AmazonSQSRequester creates a temporary queue for each response.
    private final AmazonSQSRequester sqsRequester = AmazonSQSRequesterClientBuilder.defaultClient();

    public LoginClient(String requestQueueUrl) {
        this.requestQueueUrl = requestQueueUrl;
    }

    /**
     * Send a login request to the server.
     */
    public String login(String body) throws TimeoutException {
        SendMessageRequest request = new SendMessageRequest()
                .withMessageBody(body)
                .withQueueUrl(requestQueueUrl);

        // This:
        //  - creates a temporary queue
        //  - attaches its URL as an attribute on the message
        //  - sends the message
        //  - receives the response from the temporary queue
        //  - deletes the temporary queue
        //  - returns the response
        //
        // If something goes wrong and the server's response never shows up, this method throws a
        // TimeoutException.
        Message response = sqsRequester.sendMessageAndGetResponse(request, 20, TimeUnit.SECONDS);
        
        return response.getBody();
    }
}

/**
 * This class processes users' login requests on the server side.
 */
public class LoginServer {

    // The SQS queue to poll for login requests.
    // Assume that on construction a thread is created to poll this queue and call
    // handleLoginRequest() below for each message.
    private final String requestQueueUrl;

    // The AmazonSQSResponder sends responses to the correct response destination.
    private final AmazonSQSResponder sqsResponder = AmazonSQSResponderClientBuilder.defaultClient();

    public LoginServer(String requestQueueUrl) {
        this.requestQueueUrl = requestQueueUrl;
    }

    /**
     * Handle a login request sent from the client above.
     */
    public void handleLoginRequest(Message message) {
        // Assume doLogin does the actual work, and returns a serialized result
        String response = doLogin(message.getBody());

        // This extracts the URL of the temporary queue from the message attribute and sends the
        // response to that queue.
        sqsResponder.sendResponseMessage(MessageContent.fromMessage(message), new MessageContent(response));  
    }
}

Cleaning up

As with any other AWS SDK client, your code should call the shutdown() method when the temporary queue client is no longer needed. The AmazonSQSRequester interface also provides a shutdown() method, which shuts down its internal temporary queue client. This ensures that the in-memory resources needed for any virtual queues are reclaimed, and that the host queue that the client automatically created is also deleted automatically.

However, in the world of distributed systems things are a little more complex. Processes can run out of memory and crash, and hosts can reboot suddenly and unexpectedly. There are even cases where you don’t have the opportunity to run custom code on shutdown.

The Temporary Queue Client addresses this issue as well. For each host queue with recent API calls, the client periodically uses the TagQueue API action to attach a fresh tag value that indicates the queue is still being used. The tagging process serves as a heartbeat to keep the queue alive. At a configurable interval (by default, 5 minutes), a background thread uses the ListQueues API action to obtain the URLs of all queues with the configured prefix. Then, it deletes each queue that has not been tagged recently. The mechanism is similar to how the Amazon DynamoDB Lock Client expires stale lock leases.

If you use the AmazonSQSTemporaryQueuesClient directly, you can customize how long queues must be idle before they are deleted by configuring the IdleQueueRetentionPeriodSeconds queue attribute. The client supports setting this attribute on both host queues and virtual queues. For virtual queues, setting this attribute ensures that the in-memory resources do not become a memory leak in the presence of application bugs.

Any API call to a queue marks it as non-idle, including ReceiveMessage calls that don’t return any messages. The only reason to increase the retention period attribute is to give the client more time when it can’t send heartbeats—for example, due to garbage collection pauses or networking issues.
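The idleness decision itself is a simple comparison between the last heartbeat and the retention period. The Java sketch below shows that comparison under simplifying assumptions: heartbeat timestamps are passed in as a plain map, whereas the real client stores them in queue tags, and IdleQueueSweeper is a hypothetical name used only for this illustration.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical helper for illustration; the real client reads heartbeats from queue tags.
final class IdleQueueSweeper {

    private IdleQueueSweeper() {
    }

    // Returns the URLs of queues whose last heartbeat is older than the retention period.
    static List<String> findIdleQueues(Map<String, Instant> lastHeartbeatByQueueUrl,
                                       Duration retentionPeriod,
                                       Instant now) {
        List<String> idle = new ArrayList<>();
        for (Map.Entry<String, Instant> entry : lastHeartbeatByQueueUrl.entrySet()) {
            Duration sinceHeartbeat = Duration.between(entry.getValue(), now);
            if (sinceHeartbeat.compareTo(retentionPeriod) > 0) {
                idle.add(entry.getKey());
            }
        }
        return idle;
    }
}
```

A longer retention period makes the comparison more forgiving, which is why increasing it helps when heartbeats are delayed.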

But what if you want to use this client in a fleet of a thousand EC2 instances? Won’t every client spend a lot of time checking every queue for idleness? Doesn’t that imply duplicate work that increases as the fleet is scaled up?

We thought of this too. The Temporary Queue Client creates a shared queue for all clients using the same queue prefix, and uses this queue as a work queue for the distributed task. Instead of every client calling the ListQueues API action every five minutes, a new seed message (which triggers the sweeping process) is sent to this queue every five minutes.

When one of the clients receives this message, it calls the ListQueues API action and sends each queue URL in the result as another kind of message to the same shared work queue. The work of actually checking each queue for idleness is distributed roughly evenly to the active clients, ensuring scalability. There is even a mechanism that works around the fact that the ListQueues API action currently returns no more than 1,000 queue URLs at a time.

Summary

We are excited about how the new Amazon SQS Temporary Queue Client makes more messaging patterns easier and cheaper for you. Download the code from GitHub, have a look at Temporary Queues in the Amazon SQS Developer Guide, try out the client, and let us know what you think!

Implementing enterprise integration patterns with AWS messaging services: point-to-point channels

Post Syndicated from Rachel Richardson original https://aws.amazon.com/blogs/compute/implementing-enterprise-integration-patterns-with-aws-messaging-services-point-to-point-channels/

This post is courtesy of Christian Mueller, Sr. Solutions Architect, AWS and Dirk Fröhner, Sr. Solutions Architect, AWS

At AWS, we see our customers increasingly moving toward managed services to reduce the time and money that they spend managing infrastructure. This also applies to the messaging domain, where AWS provides a collection of managed services.

Asynchronous messaging is a fundamental approach for integrating independent systems or building up a set of loosely coupled systems that can scale and evolve independently and flexibly. The well-known collection of enterprise integration patterns (EIPs) provides a “technology-independent vocabulary” to “design and document integration solutions.” This blog is the first of two that describe how you can implement the core EIPs using AWS messaging services. Let’s first look at the relevant AWS messaging services.

When organizations migrate their traditional messaging and existing applications to the cloud gradually, they usually want to do it without rewriting their code. Amazon MQ is a managed message broker service for Apache ActiveMQ that makes it easy to set up and operate message brokers in the cloud. It supports industry-standard APIs and protocols such as JMS, AMQP, and MQTT, so you can switch from any standards-based message broker to Amazon MQ without rewriting the messaging code in your applications. Amazon MQ is recommended if you’re using messaging with existing applications and want to move your messaging to the cloud without rewriting existing code.

However, if you build new applications for the cloud, we recommend that you consider using cloud-native messaging services such as Amazon SQS and Amazon SNS. These serverless, fully managed message queue and topic services scale to meet your demands and provide simple, easy-to-use APIs. You can use Amazon SQS and Amazon SNS to decouple and scale microservices, distributed systems, and serverless applications and improve overall reliability.

This blog looks at the first part of some fundamental integration patterns. We describe the patterns and apply them to these AWS messaging services. This will help you apply the right pattern to your use case and architect for scale in a secure and cost-efficient manner. For all variants, we employ both traditional and cloud-native messaging services: Amazon MQ for the former and Amazon SQS and Amazon SNS for the latter.

Integration Patterns

Let’s start with some fundamental integration patterns.

Message exchange patterns

First, we inspect the two major message exchange patterns: one-way and request-response.

One-way messaging

Applying one-way messaging, a message producer (sender) sends out a message to a messaging channel and doesn’t expect or want a response from whatever process (receiver) consumed the message. Examples of one-way messaging include a data transfer and a notification about an event that happened.

Request-response messaging

With request-response messaging, a message producer (requester) sends out a message: for example, a command to instruct the responder to execute something. The requester expects a response from each message consumer (responder) who received that message, likely to know what the result of all executions was. To know where to send the response message to, the request message contains a return address that the responder uses. To make sure that the requester can assign an incoming response to a request, the requester adds a correlation identifier to the request, which the responders echo in their responses.
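The role of the correlation identifier can be shown without any messaging service at all. In the Java sketch below, the requester registers each outstanding request under its correlation ID and completes the matching future when a response echoes that ID. CorrelationRegistry and its method names are hypothetical and chosen only for this illustration.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical requester-side bookkeeping for illustration; no messaging service is involved.
final class CorrelationRegistry {

    // Outstanding requests, keyed by the correlation ID attached to each request.
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    // Called before sending a request; the returned future completes with the response body.
    CompletableFuture<String> expectResponse(String correlationId) {
        CompletableFuture<String> future = new CompletableFuture<>();
        pending.put(correlationId, future);
        return future;
    }

    // Called for each response arriving at the return address.
    // Returns false if the echoed correlation ID matches no outstanding request.
    boolean onResponse(String correlationId, String responseBody) {
        CompletableFuture<String> future = pending.remove(correlationId);
        return future != null && future.complete(responseBody);
    }
}
```

Responses can arrive in any order; each one is matched to its request by the echoed ID rather than by arrival time.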

Messaging channels: point-to-point

Next, we look at the point-to-point messaging channel, one of the most important patterns for messaging channels. We will continue our consideration with publish-subscribe in our second post.

A point-to-point channel is usually implemented by message queues. Message queues operate so that any given message is only consumed by one receiver, although multiple receivers can be connected to the queue. The queue ensures once-only consumption. Messages are usually buffered in queues so that they’re available for consumption for a certain amount of time, even if no receiver is currently connected.

Point-to-point channels are often used for loosely coupled message transmission, though there are two other common uses. First, it can support horizontal scaling of message processing on the receiver side. Depending on the message load in the channel, the number of receiver processes can be elastically adjusted to cope with the load as needed. The queue acts as a buffering load balancer. Second, it can flatten peak loads of messages and prevent your receivers from being flooded when you can’t scale out fast enough or you don’t want additional scaling.
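The scaling behavior described here can be simulated locally. The Java sketch below uses an in-memory BlockingQueue as a stand-in for a message queue and lets several receiver threads drain it; each message is taken by exactly one receiver. CompetingReceivers is a hypothetical class for illustration, and an in-memory queue is of course a much simpler thing than Amazon MQ or Amazon SQS.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Local simulation for illustration; an in-memory queue stands in for a message queue.
final class CompetingReceivers {

    private CompetingReceivers() {
    }

    // Drains the queue with receiverCount threads and returns every consumed message.
    static List<String> drain(BlockingQueue<String> queue, int receiverCount) {
        List<String> consumed = Collections.synchronizedList(new ArrayList<>());
        ExecutorService receivers = Executors.newFixedThreadPool(receiverCount);
        for (int i = 0; i < receiverCount; i++) {
            receivers.execute(() -> {
                String message;
                // poll() hands each message to exactly one of the competing receivers.
                while ((message = queue.poll()) != null) {
                    consumed.add(message);
                }
            });
        }
        receivers.shutdown();
        try {
            receivers.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return consumed;
    }
}
```

Changing receiverCount changes how quickly the backlog is worked off, but not the set of messages that end up consumed.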

Integration scenarios

In this section, we apply these fundamental patterns to AWS messaging services. The code examples are written in Java, but only by author preference. You can implement the same integration scenarios in C++, .NET, Node.js, Python, Ruby, Go, and other programming languages for which AWS provides an SDK and an Apache ActiveMQ client library is available.

Point-to-point channels: one-way messaging

The diagrams in the following subsections show the principle of one-way messaging for point-to-point channels, using Amazon MQ queues and Amazon SQS queues. The sender produces a message and sends it into a queue, and the receiver consumes the message from the queue for processing. For traditional messaging (that is, Amazon MQ), the senders and consumers can use protocols such as JMS or AMQP. For cloud-native messaging, they can use the Amazon SQS API.

Traditional messaging

To follow this example, open the Amazon MQ console and create a broker. In the following diagram we see the above explained components for the traditional messaging scenario: A sender sends messages into an Amazon MQ queue, a receiver consumes messages from that queue.

Point to point traditional messaging

In the following code example, sender and receiver use the Apache ActiveMQ client library and the standard Java Message Service (JMS) API to send and receive messages to and from an Amazon MQ queue. You can run the code on any AWS compute service, in your on-premises data center, or on your personal computer. For simplicity, the code launches sender and receiver in the same Java virtual machine (JVM).

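import java.util.UUID;

import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQSslConnectionFactory;

public class PointToPointOneWayTraditional {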

    public static void main(String... args) throws Exception {
        ActiveMQSslConnectionFactory connFact = new ActiveMQSslConnectionFactory("failover:(ssl://<broker-1>.amazonaws.com:61617,ssl://<broker-2>.amazonaws.com:61617)");
        connFact.setConnectResponseTimeout(10000);
        Connection conn = connFact.createConnection("user", "password");
        conn.setClientID("PointToPointOneWayTraditional");
        conn.start();

        new Thread(new Receiver(conn.createSession(false, Session.CLIENT_ACKNOWLEDGE), "Queue.PointToPoint.OneWay.Traditional")).start();
        new Thread(new Sender(conn.createSession(false, Session.CLIENT_ACKNOWLEDGE), "Queue.PointToPoint.OneWay.Traditional")).start();
    }

    public static class Sender implements Runnable {

        private Session session;
        private String destination;

        public Sender(Session session, String destination) {
            this.session = session;
            this.destination = destination;
        }

        public void run() {
            try {
                MessageProducer messageProducer = session.createProducer(session.createQueue(destination));
                long counter = 0;

                while (true) {
                    TextMessage message = session.createTextMessage("Message " + ++counter);
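                    // Note: the JMS provider assigns its own JMSMessageID during send(), so the value set here is overwritten.
                    message.setJMSMessageID(UUID.randomUUID().toString());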
                    messageProducer.send(message);
                }
            } catch (JMSException e) {
                throw new RuntimeException(e);
            }
        }
    }

    public static class Receiver implements Runnable, MessageListener {

        private Session session;
        private String destination;

        public Receiver(Session session, String destination) {
            this.session = session;
            this.destination = destination;
        }

        public void run() {
            try {
                MessageConsumer consumer = session.createConsumer(session.createQueue(destination));
                consumer.setMessageListener(this);
            } catch (JMSException e) {
                throw new RuntimeException(e);
            }
        }

        public void onMessage(Message message) {
            try {
                System.out.println(String.format("received message '%s' with message id '%s'", ((TextMessage) message).getText(), message.getJMSMessageID()));
                message.acknowledge();
            } catch (JMSException e) {
                throw new RuntimeException(e);
            }
        }
    }
}

Cloud-native messaging

To follow this example, open the Amazon SQS console and create a standard SQS queue, using the queue name P2POneWayCloudNative. The following diagram shows the components described above for the cloud-native messaging scenario: a sender sends messages into an Amazon SQS queue, and a receiver consumes messages from that queue.

Point to point cloud-native messaging

In the sample code below, the example sender uses the AWS SDK for Java to send messages to an Amazon SQS queue, running in an endless loop. You can run the code on any AWS compute service, in your on-premises data center, or on your personal computer.

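import java.util.UUID;

import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
import com.amazonaws.services.sqs.model.MessageAttributeValue;
import com.amazonaws.services.sqs.model.SendMessageRequest;

public class PointToPointOneWayCloudNative {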

    public static void main(String... args) throws Exception {
        final AmazonSQS sqs = AmazonSQSClientBuilder.standard().build();

        new Thread(new Sender(sqs, "https://sqs.<region>.amazonaws.com/<account-number>/P2POneWayCloudNative")).start();
    }

    public static class Sender implements Runnable {

        private AmazonSQS sqs;
        private String destination;

        public Sender(AmazonSQS sqs, String destination) {
            this.sqs = sqs;
            this.destination = destination;
        }

        public void run() {
            long counter = 0;

            while (true) {
                sqs.sendMessage(
                    new SendMessageRequest()
                        .withQueueUrl(destination)
                        .withMessageBody("Message " + ++counter)
                        .addMessageAttributesEntry("MessageID", new MessageAttributeValue().withDataType("String").withStringValue(UUID.randomUUID().toString())));
            }
        }
    }
}

We implement the receiver below in a serverless manner as an AWS Lambda function, using Amazon SQS as the event source. The name of the SQS queue is configured outside the function’s code, which is why it doesn’t appear in this code example.

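import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.SQSEvent;

public class Receiver implements RequestHandler<SQSEvent, Void> {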

    @Override
    public Void handleRequest(SQSEvent request, Context context) {
        for (SQSEvent.SQSMessage message: request.getRecords()) {
            System.out.println(String.format("received message '%s' with message id '%s'", message.getBody(), message.getMessageAttributes().get("MessageID").getStringValue()));
        }

        return null;
    }
}

If this approach is new to you, you can find more details in AWS Lambda Adds Amazon Simple Queue Service to Supported Event Sources. Using Lambda comes with a number of benefits. For example, you don’t have to manage the compute environment for the receiver, and you can use an event (or push) model instead of having to poll for new messages.
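
The difference between the poll model and the event (push) model can be sketched in-process with the Java standard library alone. This is only an illustration of who owns the receive loop, not how Lambda or Amazon SQS work internally. The class and method names are hypothetical, and the drain-and-dispatch helper is a stand-in for the platform's event source.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

public class PushVersusPollSketch {

    /** Poll model: the receiver owns the loop and repeatedly asks the queue for messages. */
    public static List<String> pollAll(BlockingQueue<String> queue, long waitMillis) {
        List<String> received = new ArrayList<>();
        try {
            String message;
            // Stop once the queue has stayed empty for waitMillis.
            while ((message = queue.poll(waitMillis, TimeUnit.MILLISECONDS)) != null) {
                received.add(message);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received;
    }

    /** Push model: something else owns the loop and invokes the handler once per message. */
    public static void pushAll(BlockingQueue<String> queue, Consumer<String> handler) {
        // Stand-in for the event source: drain what is queued and call the handler.
        List<String> batch = new ArrayList<>();
        queue.drainTo(batch);
        batch.forEach(handler);
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(Arrays.asList("Message 1", "Message 2"));
        System.out.println("polled: " + pollAll(queue, 50));

        queue.add("Message 3");
        pushAll(queue, message -> System.out.println("handler invoked with: " + message));
    }
}
```

In the push variant, the receiver shrinks to a handler function, which is the shape of the Lambda function shown earlier.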

Point-to-point channels: request-response messaging

In addition to the one-way scenario, we have a return channel option. We would now rather call the involved processes requester and responder. The requester sends a message into the request queue, and the responder sends the response into the response queue. Remember that the requester enriches the message with a return address (the name of the response queue) so that the responder knows where to send the response. The requester also sends a correlation ID that the responder copies into the response message so that the requester can match the incoming response with a request.
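
Before looking at the broker-based versions, the roles of the return address and the correlation ID can be sketched with in-memory queues from the Java standard library. Everything here is an assumption made for illustration: the Msg type, the named-queue registry, and the five-second timeout are not part of any AWS or JMS API.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class RequestResponseSketch {

    /** Hypothetical message type: a body plus the two headers the pattern needs. */
    static final class Msg {
        final String body;
        final String correlationId;
        final String replyTo; // return address: name of the response queue (null on responses)

        Msg(String body, String correlationId, String replyTo) {
            this.body = body;
            this.correlationId = correlationId;
            this.replyTo = replyTo;
        }
    }

    // Named in-memory queues stand in for the queues a broker would host.
    private static final Map<String, BlockingQueue<Msg>> QUEUES = new ConcurrentHashMap<>();

    static BlockingQueue<Msg> queue(String name) {
        return QUEUES.computeIfAbsent(name, n -> new LinkedBlockingQueue<>());
    }

    /** Responder: handles one request and replies to the return address it carries. */
    static void respondOnce(String requestQueue) throws InterruptedException {
        Msg request = queue(requestQueue).take();
        // Copy the correlation ID so the requester can match the response.
        queue(request.replyTo).put(new Msg("Response to " + request.body, request.correlationId, null));
    }

    /** Requester: returns the body of the matching response, or null if none arrives in time. */
    static String request(String body, String requestQueue, String responseQueue, long timeoutMillis) {
        String correlationId = UUID.randomUUID().toString();
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        try {
            queue(requestQueue).put(new Msg(body, correlationId, responseQueue));
            while (true) {
                long remaining = Math.max(deadline - System.nanoTime(), 0);
                Msg response = queue(responseQueue).poll(remaining, TimeUnit.NANOSECONDS);
                if (response == null) {
                    return null; // timed out
                }
                if (correlationId.equals(response.correlationId)) {
                    return response.body;
                }
                // A response to some other request: this sketch simply skips it.
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    /** Runs one request-response round trip with a responder on its own thread. */
    public static String exchange(String body) {
        String requestQueue = "requests-" + UUID.randomUUID();
        String responseQueue = "responses-" + UUID.randomUUID();
        Thread responder = new Thread(() -> {
            try {
                respondOnce(requestQueue);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        responder.setDaemon(true);
        responder.start();
        return request(body, requestQueue, responseQueue, 5000);
    }

    public static void main(String[] args) {
        System.out.println(exchange("Message 1"));
    }
}
```

Note that the responder never hard-codes the response queue. It only follows the replyTo value in the request, which is what keeps requester and responder loosely coupled.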

Traditional messaging

In this example, we reuse the Amazon MQ broker that we set up earlier. The following diagram shows the components described above for the traditional messaging scenario, using one Amazon MQ queue for the request messages and one for the response messages.

Point to point request response traditional messaging

Using Amazon MQ, we don’t have to create queues explicitly because they’re implicitly created as needed when we start sending messages to them. This example is similar to the point-to-point one-way traditional example.

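import java.util.UUID;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TemporaryQueue;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQSslConnectionFactory;

public class PointToPointRequestResponseTraditional {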

    public static void main(String... args) throws Exception {
        ActiveMQSslConnectionFactory connFact = new ActiveMQSslConnectionFactory("failover:(ssl://<broker-1>.amazonaws.com:61617,ssl://<broker-2>.amazonaws.com:61617)");
        connFact.setConnectResponseTimeout(10000);
        Connection conn = connFact.createConnection("user", "password");
        conn.setClientID("PointToPointRequestResponseTraditional");
        conn.start();

        new Thread(new Responder(conn.createSession(false, Session.CLIENT_ACKNOWLEDGE), "Queue.PointToPoint.RequestResponse.Traditional")).start();
        new Thread(new Requester(conn.createSession(false, Session.CLIENT_ACKNOWLEDGE), "Queue.PointToPoint.RequestResponse.Traditional")).start();
    }

    public static class Requester implements Runnable {

        private Session session;
        private String destination;

        public Requester(Session session, String destination) {
            this.session = session;
            this.destination = destination;
        }

        public void run() {
            MessageProducer messageProducer = null;
            try {
                messageProducer = session.createProducer(session.createQueue(destination));
                long counter = 0;

                while (true) {
                    TemporaryQueue replyTo = session.createTemporaryQueue();
                    String correlationId = UUID.randomUUID().toString();
                    TextMessage message = session.createTextMessage("Message " + ++counter);
                    message.setJMSMessageID(UUID.randomUUID().toString());
                    message.setJMSCorrelationID(correlationId);
                    message.setJMSReplyTo(replyTo);
                    messageProducer.send(message);

                    MessageConsumer consumer = session.createConsumer(replyTo, "JMSCorrelationID='" + correlationId + "'");
                    try {
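                        // receive() returns null if no response arrives within the timeout
                        Message receivedMessage = consumer.receive(5000);
                        if (receivedMessage != null) {
                            System.out.println(String.format("received message '%s' with message id '%s'", ((TextMessage) receivedMessage).getText(), receivedMessage.getJMSMessageID()));
                            receivedMessage.acknowledge();
                        } else {
                            System.out.println("no response received within 5 seconds");
                        }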
                    } finally {
                        if (consumer != null) {
                            consumer.close();
                        }
                    }
                }
            } catch (JMSException e) {
                throw new RuntimeException(e);
            }
        }
    }

    public static class Responder implements Runnable, MessageListener {

        private Session session;
        private String destination;

        public Responder(Session session, String destination) {
            this.session = session;
            this.destination = destination;
        }

        public void run() {
            try {
                MessageConsumer consumer = session.createConsumer(session.createQueue(destination));
                consumer.setMessageListener(this);
            } catch (JMSException e) {
                throw new RuntimeException(e);
            }
        }

        public void onMessage(Message message) {
            try {
                String correlationId = message.getJMSCorrelationID();
                Destination replyTo = message.getJMSReplyTo();

                TextMessage responseMessage = session.createTextMessage(((TextMessage) message).getText() + " with CorrelationID " + correlationId);
                responseMessage.setJMSMessageID(UUID.randomUUID().toString());
                responseMessage.setJMSCorrelationID(correlationId);

                MessageProducer messageProducer = session.createProducer(replyTo);
                try {
                    messageProducer.send(responseMessage);

                    message.acknowledge();
                } finally {
                    if (messageProducer != null) {
                        messageProducer.close();
                    }
                }
            } catch (JMSException e) {
                throw new RuntimeException(e);
            }
        }
    }
}

Cloud-native messaging

Open the Amazon SQS console and create two standard SQS queues using the queue names P2PReqRespCloudNative and P2PReqRespCloudNative-Resp. The following diagram shows the components described above for the cloud-native scenario, using one Amazon SQS queue for the request messages and one for the response messages.

Point to point request response cloud native messaging

The following example requester is almost identical to the point-to-point one-way cloud-native example sender. It also provides a reply-to address and a correlation ID.

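import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
import com.amazonaws.services.sqs.model.DeleteMessageRequest;
import com.amazonaws.services.sqs.model.Message;
import com.amazonaws.services.sqs.model.MessageAttributeValue;
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
import com.amazonaws.services.sqs.model.ReceiveMessageResult;
import com.amazonaws.services.sqs.model.SendMessageRequest;

public class PointToPointRequestResponseCloudNative {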

    public static void main(String... args) throws Exception {
        final AmazonSQS sqs = AmazonSQSClientBuilder.standard().build();

        new Thread(new Requester(sqs, "https://sqs.<region>.amazonaws.com/<account-number>/P2PReqRespCloudNative", "https://sqs.<region>.amazonaws.com/<account-number>/P2PReqRespCloudNative-Resp")).start();
    }

    public static class Requester implements Runnable {

        private AmazonSQS sqs;
        private String destination;
        private String replyDestination;
        private Map<String, SendMessageRequest> inflightMessages = new ConcurrentHashMap<>();

        public Requester(AmazonSQS sqs, String destination, String replyDestination) {
            this.sqs = sqs;
            this.destination = destination;
            this.replyDestination = replyDestination;
        }

        public void run() {
            long counter = 0;

            while (true) {
                String correlationId = UUID.randomUUID().toString();
                SendMessageRequest request = new SendMessageRequest()
                    .withQueueUrl(destination)
                    .withMessageBody("Message " + ++counter)
                    .addMessageAttributesEntry("CorrelationID", new MessageAttributeValue().withDataType("String").withStringValue(correlationId))
                    .addMessageAttributesEntry("ReplyTo", new MessageAttributeValue().withDataType("String").withStringValue(replyDestination));
                sqs.sendMessage(request);

                inflightMessages.put(correlationId, request);

                ReceiveMessageResult receiveMessageResult = sqs.receiveMessage(
                    new ReceiveMessageRequest()
                        .withQueueUrl(replyDestination)
                        .withMessageAttributeNames("CorrelationID")
                        .withMaxNumberOfMessages(5)
                        .withWaitTimeSeconds(2));

                for (Message receivedMessage : receiveMessageResult.getMessages()) {
                    System.out.println(String.format("received message '%s' with message id '%s'", receivedMessage.getBody(), receivedMessage.getMessageId()));

                    String receivedCorrelationId = receivedMessage.getMessageAttributes().get("CorrelationID").getStringValue();
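                    // remove() returns null if this response was already handled (standard queues can deliver a message more than once)
                    SendMessageRequest originalRequest = inflightMessages.remove(receivedCorrelationId);
                    if (originalRequest != null) {
                        System.out.println(String.format("Corresponding request message '%s'", originalRequest.getMessageBody()));
                    }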

                    sqs.deleteMessage(
                        new DeleteMessageRequest()
                            .withQueueUrl(replyDestination)
                            .withReceiptHandle(receivedMessage.getReceiptHandle()));
                }
            }
        }
    }
}

The following example responder is almost identical to the point-to-point one-way cloud-native example receiver. It also creates a message and sends it back to the reply-to address provided in the received message.

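import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.SQSEvent;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
import com.amazonaws.services.sqs.model.MessageAttributeValue;
import com.amazonaws.services.sqs.model.SendMessageRequest;

public class Responder implements RequestHandler<SQSEvent, Void> {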

    private final AmazonSQS sqs = AmazonSQSClientBuilder.standard().build();

    @Override
    public Void handleRequest(SQSEvent request, Context context) {
        for (SQSEvent.SQSMessage message: request.getRecords()) {
            System.out.println(String.format("received message '%s' with message id '%s'", message.getBody(), message.getMessageId()));
            String correlationId = message.getMessageAttributes().get("CorrelationID").getStringValue();
            String replyTo = message.getMessageAttributes().get("ReplyTo").getStringValue();

            System.out.println(String.format("sending message with correlation id '%s' to '%s'", correlationId, replyTo));
            sqs.sendMessage(
                new SendMessageRequest()
                    .withQueueUrl(replyTo)
                    .withMessageBody(message.getBody() + " with CorrelationID " + correlationId)
                    .addMessageAttributesEntry("CorrelationID", new MessageAttributeValue().withDataType("String").withStringValue(correlationId)));
        }

        return null;
    }
}

Go build!

We look forward to hearing about what you build and will continue innovating our services on your behalf.

What’s next?

We have introduced the first fundamental EIPs and shown how you can apply them to the AWS messaging services. If you are keen to dive deeper, continue reading with the second part of this series, where we will cover publish-subscribe messaging.

Read Part 2: Publish-Subscribe Messaging

Implementing enterprise integration patterns with AWS messaging services: publish-subscribe channels

Post Syndicated from Rachel Richardson original https://aws.amazon.com/blogs/compute/implementing-enterprise-integration-patterns-with-aws-messaging-services-publish-subscribe-channels/

This post is courtesy of Christian Mueller, Sr. Solutions Architect, AWS and Dirk Fröhner, Sr. Solutions Architect, AWS

In this blog, we look at the second part of some fundamental enterprise integration patterns and how you can implement them with AWS messaging services. If you missed the first part, we encourage you to start there.

Read Part 1: Point-to-Point Messaging

Integration patterns

Messaging channels: publish-subscribe

As mentioned in the first blog, we continue with the second major messaging channel pattern: publish-subscribe.

A publish-subscribe channel is usually implemented using message topics. In this model, any message published to a topic is immediately received by all of the subscribers of the topic (unless you have applied the message filter pattern). However, if there is no subscriber, messages are usually discarded. The durable subscriber pattern describes an exception where messages are kept for a while in case the subscriber is offline. Publish-subscribe is used when multiple parties are interested in certain messages. Sometimes, this pattern is also referred to as fan-out.
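
The behavior described above (fan-out to all subscribers, discarding when nobody is subscribed, and the durable subscriber exception) can be sketched in a few lines of standard-library Java. This is an in-process analogy under our own assumptions. TopicSketch and DurableSubscription are hypothetical names, not classes from Amazon MQ, Amazon SNS, or JMS.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

public class TopicSketch {

    private final List<Consumer<String>> subscribers = new CopyOnWriteArrayList<>();

    public void subscribe(Consumer<String> subscriber) {
        subscribers.add(subscriber);
    }

    /** Hands the message to every current subscriber and returns the number of deliveries. */
    public int publish(String message) {
        int deliveries = 0;
        for (Consumer<String> subscriber : subscribers) {
            subscriber.accept(message);
            deliveries++;
        }
        return deliveries; // 0 means nobody was subscribed and the message is gone
    }

    /** Durable subscription: keeps messages while its consumer is offline. */
    public static final class DurableSubscription implements Consumer<String> {
        private final List<String> backlog = new ArrayList<>();
        private Consumer<String> consumer; // null while the subscriber is offline

        @Override
        public synchronized void accept(String message) {
            if (consumer == null) {
                backlog.add(message);
            } else {
                consumer.accept(message);
            }
        }

        public synchronized void connect(Consumer<String> consumer) {
            backlog.forEach(consumer); // replay what was missed, in order
            backlog.clear();
            this.consumer = consumer;
        }

        public synchronized void disconnect() {
            consumer = null;
        }
    }

    /** Walks through the three cases and returns a log of what happened. */
    public static List<String> demo() {
        List<String> log = new ArrayList<>();
        TopicSketch topic = new TopicSketch();

        // No subscriber yet: the message is discarded.
        log.add("deliveries for Message 1: " + topic.publish("Message 1"));

        topic.subscribe(message -> log.add("subscriber-1 got " + message));
        DurableSubscription durable = new DurableSubscription();
        topic.subscribe(durable); // registered, but its consumer is still offline

        topic.publish("Message 2"); // subscriber-1 receives it, the durable subscription stores it
        durable.connect(message -> log.add("subscriber-2 got " + message)); // backlog is replayed
        topic.publish("Message 3"); // both subscribers receive it directly
        return log;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

The sketch keeps the backlog without an expiry, whereas a real broker would typically bound how long or how many messages it retains for an offline subscriber.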

Let’s apply this pattern to the different AWS messaging services and get our hands dirty. To follow our examples, sign in to your AWS account (or create an account as described in How do I create and activate a new Amazon Web Services account?).

Integration scenarios

Publish-subscribe channels: one-way messaging

Publish-subscribe one-way patterns are often involved in notification style use cases, where the publisher sends out an event and doesn’t care who is interested in this event. For example, Amazon CloudWatch Events publishes state changes in the environment, and you can subscribe and act accordingly.

The diagrams in the following subsections show the principles of one-way messaging for publish-subscribe channels, using both Amazon MQ and Amazon SNS topics. A publisher produces a message and sends it into a topic, and subscribers consume the message from the topic for processing.

For traditional messaging, senders and consumers can use API protocols such as JMS or AMQP. For cloud-native messaging, they can use the Amazon SNS API.

Traditional messaging

In this example, we reuse the Amazon MQ broker we set up in part one of this blog. As we can see in the following diagram, messages are published into an Amazon MQ topic, and multiple subscribers can consume messages from it.

Publish Subscribe One Way Traditional Messaging

This example is similar to the point-to-point one-way traditional example using the Apache ActiveMQ client library, but we use topics instead of queues, as shown in the following code.

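import java.util.UUID;

import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQSslConnectionFactory;

public class PublishSubscribeOneWayTraditional {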

    public static void main(String... args) throws Exception {
        ActiveMQSslConnectionFactory connFact = new ActiveMQSslConnectionFactory("failover:(ssl://<broker-1>.amazonaws.com:61617,ssl://<broker-2>.amazonaws.com:61617)");
        connFact.setConnectResponseTimeout(10000);
        Connection conn = connFact.createConnection("user", "password");
        conn.setClientID("PubSubOneWayTraditional");
        conn.start();

        new Thread(new Subscriber(conn.createSession(false, Session.CLIENT_ACKNOWLEDGE), "Topic.PubSub.OneWay.Traditional")).start();
        new Thread(new Publisher(conn.createSession(false, Session.CLIENT_ACKNOWLEDGE), "Topic.PubSub.OneWay.Traditional")).start();
    }

    public static class Publisher implements Runnable {

        private Session session;
        private String destination;

        public Publisher(Session session, String destination) {
            this.session = session;
            this.destination = destination;
        }

        public void run() {
            try {
                MessageProducer messageProducer = session.createProducer(session.createTopic(destination));
                long counter = 0;

                while (true) {
                    TextMessage message = session.createTextMessage("Message " + ++counter);
                    message.setJMSMessageID(UUID.randomUUID().toString());
                    messageProducer.send(message);
                }
            } catch (JMSException e) {
                throw new RuntimeException(e);
            }
        }
    }

    public static class Subscriber implements Runnable, MessageListener {

        private Session session;
        private String destination;

        public Subscriber(Session session, String destination) {
            this.session = session;
            this.destination = destination;
        }

        public void run() {
            try {
                MessageConsumer consumer = session.createDurableSubscriber(session.createTopic(destination), "subscriber-1");
                consumer.setMessageListener(this);
            } catch (JMSException e) {
                throw new RuntimeException(e);
            }
        }

        public void onMessage(Message message) {
            try {
                System.out.println(String.format("received message '%s' with message id '%s'", ((TextMessage) message).getText(), message.getJMSMessageID()));
                message.acknowledge();
            } catch (JMSException e) {
                throw new RuntimeException(e);
            }
        }
    }
}

Cloud-native messaging

To follow a similar example using Amazon SNS, open the Amazon SNS console and create an Amazon SNS topic named PubSubOneWayCloudNative. The following diagram shows a publisher sending messages into an Amazon SNS topic, from which the subscribers of that topic consume them.

Publish Subscribe One Way Cloud Native Messaging

We use the AWS SDK for Java to send messages to our Amazon SNS topic, running in an endless loop. You can run the following code on any AWS compute service, in your on-premises data center, or on your personal computer.

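import java.util.UUID;

import com.amazonaws.services.sns.AmazonSNS;
import com.amazonaws.services.sns.AmazonSNSClientBuilder;
import com.amazonaws.services.sns.model.MessageAttributeValue;
import com.amazonaws.services.sns.model.PublishRequest;

public class PublishSubscribeOneWayCloudNative {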

    public static void main(String... args) throws Exception {
        final AmazonSNS sns = AmazonSNSClientBuilder.standard().build();

        new Thread(new Publisher(sns, "arn:aws:sns:<region>:<account-number>:PubSubOneWayCloudNative")).start();
    }

    public static class Publisher implements Runnable {

        private AmazonSNS sns;
        private String destination;

        public Publisher(AmazonSNS sns, String destination) {
            this.sns = sns;
            this.destination = destination;
        }

        public void run() {
            long counter = 0;

            while (true) {
                sns.publish(
                    new PublishRequest()
                        .withTopicArn(destination)
                        .withSubject("PubSubOneWayCloudNative sample")
                        .withMessage("Message " + ++counter)
                        .addMessageAttributesEntry("MessageID", new MessageAttributeValue().withDataType("String").withStringValue(UUID.randomUUID().toString())));
            }
        }
    }
}

The subscriber is implemented as an AWS Lambda function, using Amazon SNS as the event source. For more information on how to set this up, see Using Amazon SNS for System-to-System Messaging with a Lambda Function as a Subscriber.

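import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.SNSEvent;
import com.amazonaws.services.lambda.runtime.events.SNSEvent.SNS;

public class Subscriber implements RequestHandler<SNSEvent, Void> {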

    @Override
    public Void handleRequest(SNSEvent request, Context context) {
        for (SNSEvent.SNSRecord record: request.getRecords()) {
            SNS sns = record.getSNS();

            System.out.println(String.format("received message '%s' with message id '%s'", sns.getMessage(), sns.getMessageAttributes().get("MessageID").getValue()));
        }

        return null;
    }
}

Publish-subscribe channels: request-response messaging

Publish-subscribe request-response patterns are beneficial in use cases where it’s important to communicate with multiple services that do their work in parallel, but all their responses need to be aggregated afterward. One example is an order service, which needs to enrich the order message with data from multiple backend services.

The diagrams in the following subsections show the principles of request-response messaging for publish-subscribe channels, using both Amazon MQ and Amazon SNS topics. A publisher produces a message and sends it into a topic, and subscribers consume the message from the topic for processing.

Although we use a publish-subscribe channel for the request messages, we would usually use a point-to-point channel for the response messages. This assumes that the requester application or at least a dedicated application is the one entity that works on processing all the responses.
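
The combination of a publish-subscribe request channel and a point-to-point response channel can be sketched in-process as follows. This is a simplified illustration using only the Java standard library. The class name, the Msg type, and the timeout handling are our own assumptions, and each responder thread stands in for an independent backend service.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class ScatterGatherSketch {

    /** Hypothetical message type: a body plus a correlation ID. */
    static final class Msg {
        final String body;
        final String correlationId;

        Msg(String body, String correlationId) {
            this.body = body;
            this.correlationId = correlationId;
        }
    }

    /** Publishes one request to all responders and gathers their responses from a single queue. */
    public static List<String> requestAll(String body, List<String> responderNames, long timeoutMillis) {
        // Point-to-point return channel: the one place where all responses are aggregated.
        BlockingQueue<Msg> responseQueue = new LinkedBlockingQueue<>();
        Msg request = new Msg(body, UUID.randomUUID().toString());

        // Publish-subscribe part: every responder gets the request and works in parallel.
        for (String name : responderNames) {
            Thread responder = new Thread(() ->
                responseQueue.add(new Msg(request.body + " from responder " + name, request.correlationId)));
            responder.setDaemon(true);
            responder.start();
        }

        // Gather one response per responder, or give up at the deadline.
        List<String> responses = new ArrayList<>();
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        try {
            while (responses.size() < responderNames.size()) {
                long remaining = deadline - System.nanoTime();
                if (remaining <= 0) {
                    break;
                }
                Msg response = responseQueue.poll(remaining, TimeUnit.NANOSECONDS);
                if (response == null) {
                    break; // timed out waiting for the remaining responders
                }
                if (request.correlationId.equals(response.correlationId)) {
                    responses.add(response.body);
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return responses;
    }

    public static void main(String[] args) {
        List<String> responses = requestAll("Message 1", Arrays.asList("subscriber-1", "subscriber-2"), 5000);
        responses.forEach(System.out::println);
    }
}
```

The requester has to decide how many responses to wait for and for how long. The sketch waits for one response per known responder, but with a real topic the number of subscribers is not necessarily known to the publisher, so a timeout is usually needed as well. The order of the responses depends on thread scheduling.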

Traditional messaging

As we can see in the following diagram, an Amazon MQ topic is used to send out all the request messages, while all the response messages are sent into an Amazon MQ queue.

Publish Subscribe Request Response Traditional Messaging

In our code sample below, we use two responders.

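import java.util.UUID;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TemporaryQueue;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQSslConnectionFactory;

public class PublishSubscribeRequestResponseTraditional {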

    public static void main(String... args) throws Exception {
        ActiveMQSslConnectionFactory connFact = new ActiveMQSslConnectionFactory("failover:(ssl://<broker-1>.amazonaws.com:61617,ssl://<broker-2>.amazonaws.com:61617)");
        connFact.setConnectResponseTimeout(10000);
        Connection conn = connFact.createConnection("user", "password");
        conn.setClientID("PubSubReqRespTraditional");
        conn.start();

        new Thread(new Responder(conn.createSession(false, Session.CLIENT_ACKNOWLEDGE), "Topic.PubSub.ReqResp.Traditional", "subscriber-1")).start();
        new Thread(new Responder(conn.createSession(false, Session.CLIENT_ACKNOWLEDGE), "Topic.PubSub.ReqResp.Traditional", "subscriber-2")).start();
        new Thread(new Requester(conn.createSession(false, Session.CLIENT_ACKNOWLEDGE), "Topic.PubSub.ReqResp.Traditional")).start();
    }

    public static class Requester implements Runnable {

        private Session session;
        private String destination;

        public Requester(Session session, String destination) {
            this.session = session;
            this.destination = destination;
        }

        public void run() {
            MessageProducer messageProducer = null;
            try {
                messageProducer = session.createProducer(session.createTopic(destination));
                long counter = 0;

                while (true) {
                    TemporaryQueue replyTo = session.createTemporaryQueue();
                    String correlationId = UUID.randomUUID().toString();
                    TextMessage message = session.createTextMessage("Message " + ++counter);
                    message.setJMSMessageID(UUID.randomUUID().toString());
                    message.setJMSCorrelationID(correlationId);
                    message.setJMSReplyTo(replyTo);
                    messageProducer.send(message);

                    MessageConsumer consumer = session.createConsumer(replyTo, "JMSCorrelationID='" + correlationId + "'");
                    try {
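                        // receive() returns null if no response arrives within the timeout
                        Message receivedMessage1 = consumer.receive(5000);
                        Message receivedMessage2 = consumer.receive(5000);
                        if (receivedMessage1 != null && receivedMessage2 != null) {
                            System.out.println(String.format("received 2 messages '%s' and '%s'", ((TextMessage) receivedMessage1).getText(), ((TextMessage) receivedMessage2).getText()));
                            // With CLIENT_ACKNOWLEDGE, acknowledging one message also acknowledges all messages the session consumed before it
                            receivedMessage2.acknowledge();
                        } else {
                            System.out.println("did not receive both responses within the timeout");
                        }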
                    } finally {
                        if (consumer != null) {
                            consumer.close();
                        }
                    }
                }
            } catch (JMSException e) {
                throw new RuntimeException(e);
            }
        }
    }

    public static class Responder implements Runnable, MessageListener {

        private Session session;
        private String destination;
        private String name;

        public Responder(Session session, String destination, String name) {
            this.session = session;
            this.destination = destination;
            this.name = name;
        }

        public void run() {
            try {
                MessageConsumer consumer = session.createDurableSubscriber(session.createTopic(destination), name);
                consumer.setMessageListener(this);
            } catch (JMSException e) {
                throw new RuntimeException(e);
            }
        }

        public void onMessage(Message message) {
            try {
                String correlationId = message.getJMSCorrelationID();
                Destination replyTo = message.getJMSReplyTo();

                TextMessage responseMessage = session.createTextMessage(((TextMessage) message).getText() + " from responder " + name);
                responseMessage.setJMSMessageID(UUID.randomUUID().toString());
                responseMessage.setJMSCorrelationID(correlationId);

                MessageProducer messageProducer = session.createProducer(replyTo);
                try {
                    messageProducer.send(responseMessage);

                    message.acknowledge();
                } finally {
                    if (messageProducer != null) {
                        messageProducer.close();
                    }
                }
            } catch (JMSException e) {
                throw new RuntimeException(e);
            }
        }
    }
}

Cloud-native messaging

To implement a similar pattern with Amazon SNS, open the Amazon SNS console and create a new SNS topic named PubSubReqRespCloudNative. Then open the Amazon SQS console and create a standard SQS queue named PubSubReqRespCloudNative-Resp. The following diagram illustrates that we now use an Amazon SNS topic for request messages and an Amazon SQS queue for response messages.

Publish Subscribe Request Response Cloud Native Messaging

This example requester is almost identical to the publish-subscribe one-way cloud-native example sender. The requester also specifies a reply-to address and a correlation ID as message attributes. This way, responders know where to send the responses, and the receiver of the responses can match them to the original requests.

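import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

import com.amazonaws.services.sns.AmazonSNS;
import com.amazonaws.services.sns.AmazonSNSClientBuilder;
import com.amazonaws.services.sns.model.MessageAttributeValue;
import com.amazonaws.services.sns.model.PublishRequest;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
import com.amazonaws.services.sqs.model.DeleteMessageRequest;
import com.amazonaws.services.sqs.model.Message;
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
import com.amazonaws.services.sqs.model.ReceiveMessageResult;

public class PublishSubscribeReqRespCloudNative {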

    public static void main(String... args) throws Exception {
        final AmazonSNS sns = AmazonSNSClientBuilder.standard().build();
        final AmazonSQS sqs = AmazonSQSClientBuilder.standard().build();

        new Thread(new Requester(sns, sqs, "arn:aws:sns:<region>:<account-number>:PubSubReqRespCloudNative", "https://sqs.<region>.amazonaws.com/<account-number>/PubSubReqRespCloudNative-Resp")).start();
    }

    public static class Requester implements Runnable {

        private AmazonSNS sns;
        private AmazonSQS sqs;
        private String destination;
        private String replyDestination;
        private Map<String, PublishRequest> inflightMessages = new ConcurrentHashMap<>();

        public Requester(AmazonSNS sns, AmazonSQS sqs, String destination, String replyDestination) {
            this.sns = sns;
            this.sqs = sqs;
            this.destination = destination;
            this.replyDestination = replyDestination;
        }

        public void run() {
            long counter = 0;

            while (true) {
                String correlationId = UUID.randomUUID().toString();
                PublishRequest request = new PublishRequest()
                    .withTopicArn(destination)
                    .withMessage("Message " + ++counter)
                    .addMessageAttributesEntry("CorrelationID", new MessageAttributeValue().withDataType("String").withStringValue(correlationId))
                    .addMessageAttributesEntry("ReplyTo", new MessageAttributeValue().withDataType("String").withStringValue(replyDestination));
                sns.publish(request);

                inflightMessages.put(correlationId, request);

                ReceiveMessageResult receiveMessageResult = sqs.receiveMessage(
                    new ReceiveMessageRequest()
                        .withQueueUrl(replyDestination)
                        .withMessageAttributeNames("CorrelationID")
                        .withMaxNumberOfMessages(5)
                        .withWaitTimeSeconds(2));

                for (Message receivedMessage : receiveMessageResult.getMessages()) {
                    System.out.println(String.format("received message '%s' with message id '%s'", receivedMessage.getBody(), receivedMessage.getMessageId()));

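                    String receivedCorrelationId = receivedMessage.getMessageAttributes().get("CorrelationID").getStringValue();
                    PublishRequest originalRequest = inflightMessages.remove(receivedCorrelationId);
                    // A reply can carry a correlation ID this process is not tracking
                    // (for example, after a restart), so guard against a missing entry.
                    if (originalRequest != null) {
                        System.out.println(String.format("Corresponding request message '%s'", originalRequest.getMessage()));
                    } else {
                        System.out.println(String.format("No in-flight request for correlation id '%s'", receivedCorrelationId));
                    }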

                    sqs.deleteMessage(
                        new DeleteMessageRequest()
                            .withQueueUrl(replyDestination)
                            .withReceiptHandle(receivedMessage.getReceiptHandle()));
                }
            }
        }
    }
}

This example responder is almost identical to the publish-subscribe one-way cloud-native example receiver. It also creates a message, enriches it with the correlation ID, and sends it back to the reply-to address provided in the received message.

public class Responder implements RequestHandler<SNSEvent, Void> {

    private final AmazonSQS sqs = AmazonSQSClientBuilder.standard().build();

    @Override
    public Void handleRequest(SNSEvent request, Context context) {
        for (SNSEvent.SNSRecord record: request.getRecords()) {
            System.out.println(String.format("received record '%s' with message id '%s'", record.getSNS().getMessage(), record.getSNS().getMessageId()));
            String correlationId = record.getSNS().getMessageAttributes().get("CorrelationID").getValue();
            String replyTo = record.getSNS().getMessageAttributes().get("ReplyTo").getValue();

            System.out.println(String.format("sending message with correlation id '%s' to '%s'", correlationId, replyTo));
            sqs.sendMessage(
                new SendMessageRequest()
                    .withQueueUrl(replyTo)
                    .withMessageBody(record.getSNS().getMessage() + " with CorrelationID " + correlationId)
                    .addMessageAttributesEntry("CorrelationID", new MessageAttributeValue().withDataType("String").withStringValue(correlationId)));
        }

        return null;
    }
}
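
The correlation mechanism in the requester and responder above can be tried out without any AWS services. The following is a minimal in-memory sketch, not part of the original examples: the class CorrelationSketch, its Reply type, and its method names are hypothetical, and a plain in-process queue stands in for the SNS topic and the SQS reply queue.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical in-memory stand-in for the SNS topic and SQS reply queue.
final class CorrelationSketch {

    // A reply body together with the correlation ID attribute.
    static final class Reply {
        final String body;
        final String correlationId;

        Reply(String body, String correlationId) {
            this.body = body;
            this.correlationId = correlationId;
        }
    }

    private final Map<String, String> inflight = new ConcurrentHashMap<>();
    private final BlockingQueue<Reply> replyQueue = new LinkedBlockingQueue<>();

    // Requester: remember the request body under a fresh correlation ID.
    String send(String body) {
        String correlationId = UUID.randomUUID().toString();
        inflight.put(correlationId, body);
        return correlationId;
    }

    // Responder: copy the correlation ID from the request onto the reply.
    void respond(String requestBody, String correlationId) {
        replyQueue.add(new Reply(requestBody + " handled", correlationId));
    }

    // Requester: take the next reply and return the request it belongs to,
    // or null if there is no reply or the correlation ID is unknown.
    String matchNextReply() {
        Reply reply = replyQueue.poll();
        return reply == null ? null : inflight.remove(reply.correlationId);
    }

    int inflightCount() {
        return inflight.size();
    }

    public static void main(String[] args) {
        CorrelationSketch sketch = new CorrelationSketch();
        String first = sketch.send("Message 1");
        String second = sketch.send("Message 2");

        // Replies can arrive in any order; the ID still finds the right request.
        sketch.respond("Message 2", second);
        sketch.respond("Message 1", first);
        System.out.println("Matched request: " + sketch.matchNextReply());
        System.out.println("Matched request: " + sketch.matchNextReply());
    }
}
```

Because the lookup is keyed by correlation ID rather than by arrival order, the requester does not depend on replies coming back in the order the requests were sent.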

Go Build!

We look forward to hearing about what you build and will continue innovating our services on your behalf.

Migrating from RabbitMQ to Amazon MQ

Post Syndicated from Rachel Richardson original https://aws.amazon.com/blogs/compute/migrating-from-rabbitmq-to-amazon-mq/

This post is courtesy of Sam Dengler, AWS Solutions Architect.

Message brokers can be used to solve a number of needs in enterprise architectures, including managing workload queues and broadcasting messages to a number of subscribers. Some AWS customers are using RabbitMQ today and would like to migrate to a managed service to reduce the overhead of operating their own message broker.

Amazon MQ is a managed message broker service for Apache ActiveMQ that makes it easier to operate and scale message brokers in the cloud. Amazon MQ provides compatibility with your existing workloads that use standard protocols such as OpenWire, AMQP, MQTT, and STOMP (all enabled with SSL). Amazon MQ automatically provisions infrastructure configured as a single-instance broker or as an active/standby broker for high availability.

In this post, I describe how to launch a new Amazon MQ instance. I review example Java code to migrate from a RabbitMQ message broker to Amazon MQ using clients for ActiveMQ, Apache Qpid JMS, and Spring JmsTemplates. I also review best practices for Amazon MQ and the changes needed when moving from RabbitMQ to Amazon MQ to support publish/subscribe message patterns.

Getting started with Amazon MQ

To start, open the Amazon MQ console. Enter a broker name and choose Next step.

To launch a new Amazon MQ instance, choose the mq.t2.micro instance type and the Single-instance broker deployment mode, create a user name and password, and then choose Create broker.

After several minutes, your instance changes status from Creation in progress to Running. You can visit the Details page of your broker to retrieve connection information, including a link to the ActiveMQ web console, where you can monitor your broker’s queues, among other things. In the following code examples, you use the OpenWire and AMQP endpoints.

To be able to access your broker, you must configure one of your security groups to allow inbound traffic. For more information, see the link to Detailed instructions in the blue box in the Connections section.

Now that your Amazon MQ broker is running, let’s look at some code!

Dependencies

The following code examples have dependencies across a range of libraries in order to demonstrate RabbitMQ, ActiveMQ, Qpid, Spring JMS templates, and connection pooling. I’ve listed all the dependencies in a single Maven pom.xml:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>MyGroup</groupId>
    <artifactId>MyArtifact</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <dependencies>
    
        <!-- RabbitMQ -->
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.1.2</version>
        </dependency>

        <!-- Apache Connection Pooling -->
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-pool2</artifactId>
            <version>2.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-pool</artifactId>
            <version>5.15.0</version>
        </dependency>
        
        <!-- Apache ActiveMQ -->
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-client</artifactId>
            <version>5.15.0</version>
        </dependency>
        
        <!-- Apache QPid -->
        <dependency>
            <groupId>org.apache.qpid</groupId>
            <artifactId>qpid-client</artifactId>
            <version>6.3.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.qpid</groupId>
            <artifactId>qpid-jms-client</artifactId>
            <version>0.29.0</version>
        </dependency>
                
        <!-- Spring JmsTemplate -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-jms</artifactId>
            <version>5.0.3.RELEASE</version>
        </dependency>
        
        <!-- Logging -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.25</version>
        </dependency>

    </dependencies>
</project>

RabbitMQ

Here’s an example using RabbitMQ to send and receive a message via a queue. The installation and configuration of RabbitMQ are out of scope for this post. For instructions for downloading and installing RabbitMQ, see Downloading and Installing RabbitMQ.

RabbitMQ uses the AMQP 0-9-1 protocol by default, with support for AMQP 1.0 via a plugin. The RabbitMQ examples in this post use the AMQP 0-9-1 protocol.

RabbitMQ queue example

To start, here’s some sample code to send and receive a message in RabbitMQ using a queue.

import java.io.IOException;
import java.net.URISyntaxException;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.GetResponse;

public class RabbitMQExample {

    private static final boolean ACKNOWLEDGE_MODE = true;

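    // The Endpoint, Username, Password, and Queue should be externalized and
    // configured through environment variables or dependency injection.
    // The BROKER_* variable names below are placeholders, not a convention.
    private static final String ENDPOINT = System.getenv("BROKER_ENDPOINT"); // for example, "amqp://localhost:5672"
    private static final String USERNAME = System.getenv("BROKER_USERNAME");
    private static final String PASSWORD = System.getenv("BROKER_PASSWORD");
    private static final String QUEUE = "MyQueue";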

    public static void main(String[] args) throws KeyManagementException, NoSuchAlgorithmException, URISyntaxException, IOException, TimeoutException {
        // Create a connection factory.
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setUri(ENDPOINT);

        // Specify the username and password.
        connectionFactory.setUsername(USERNAME);
        connectionFactory.setPassword(PASSWORD);

        // Establish a connection for the producer.
        Connection producerConnection = connectionFactory.newConnection();

        // Create a channel for the producer.
        Channel producerChannel = producerConnection.createChannel();

        // Create a queue named "MyQueue".
        producerChannel.queueDeclare(QUEUE, false, false, false, null);

        // Create a message.
        String text = "Hello from RabbitMQ!";

        // Send the message, encoding it with the same charset the consumer decodes with.
        producerChannel.basicPublish("", QUEUE, null, text.getBytes("UTF-8"));
        System.out.println("Message sent: " + text);

        // Clean up the producer.
        producerChannel.close();
        producerConnection.close();

        // Establish a connection for the consumer.
        Connection consumerConnection = connectionFactory.newConnection();

        // Create a channel for the consumer.
        Channel consumerChannel = consumerConnection.createChannel();

        // Create a queue named "MyQueue".
        consumerChannel.queueDeclare(QUEUE, false, false, false, null);

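        // Receive the message. basicGet returns null when the queue is empty.
        GetResponse response = consumerChannel.basicGet(QUEUE, ACKNOWLEDGE_MODE);
        if (response == null) {
            System.out.println("No message available.");
        } else {
            String message = new String(response.getBody(), "UTF-8");
            System.out.println("Message received: " + message);
        }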

        // Clean up the consumer.
        consumerChannel.close();
        consumerConnection.close();
    }
}

In this example, you need to specify the ENDPOINT, USERNAME, and PASSWORD for your RabbitMQ message broker using environment variables or dependency injection.
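
One way to externalize these settings is to read them from the environment at startup and fail fast when one is missing. The following is a minimal sketch under stated assumptions: the BrokerConfig class and the BROKER_ENDPOINT, BROKER_USERNAME, and BROKER_PASSWORD variable names are hypothetical and are not defined by RabbitMQ or Amazon MQ.

```java
import java.util.Map;

// Hypothetical helper; the BROKER_* names are placeholders.
final class BrokerConfig {
    final String endpoint;
    final String username;
    final String password;

    private BrokerConfig(String endpoint, String username, String password) {
        this.endpoint = endpoint;
        this.username = username;
        this.password = password;
    }

    // Reads the three settings from the given map and fails fast when one is missing.
    static BrokerConfig from(Map<String, String> env) {
        return new BrokerConfig(
            require(env, "BROKER_ENDPOINT"),
            require(env, "BROKER_USERNAME"),
            require(env, "BROKER_PASSWORD"));
    }

    private static String require(Map<String, String> env, String name) {
        String value = env.get(name);
        if (value == null || value.isEmpty()) {
            throw new IllegalStateException("Missing required setting: " + name);
        }
        return value;
    }

    public static void main(String[] args) {
        try {
            BrokerConfig config = BrokerConfig.from(System.getenv());
            System.out.println("Connecting to " + config.endpoint + " as " + config.username);
        } catch (IllegalStateException e) {
            System.err.println(e.getMessage());
        }
    }
}
```

Passing the settings in as a map, rather than calling System.getenv inside the helper, keeps the lookup easy to exercise with test values.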

This example uses the RabbitMQ client library to establish connectivity to the message broker and a channel for communication. In RabbitMQ, messages are sent over the channel to a named queue, which stores messages in a buffer, and from which consumers can receive and process messages. In this example, you publish a message using the Channel.basicPublish method, using the default exchange, identified by an empty string (“”).

To receive and process the messages in the queue, create a second connection, channel, and queue. Queue declaration is an idempotent operation, so there is no harm in declaring it twice. In this example, you receive the message using the Channel.basicGet method, automatically acknowledging message receipt to the broker.

This example demonstrates the basics of sending and receiving a message of one type. However, what if you wanted to publish messages of different types such that various consumers could subscribe only to pertinent message types (that is, pub/sub)? Here’s a RabbitMQ example using topic exchanges to route messages to different queues.

RabbitMQ topic example

This example is similar to the one earlier. To enable topic publishing, specify two additional properties: EXCHANGE and ROUTING_KEY. RabbitMQ uses the exchange and routing key properties to route messages. Look at how these properties change the code to publish a message.

import java.io.IOException;
import java.net.URISyntaxException;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.GetResponse;

public class RabbitMQExample {

    private static final boolean ACKNOWLEDGE_MODE = true;

    // The Endpoint, Username, Password, Queue, Exchange, and Routing Key should
    // be externalized and configured through environment variables or
    // dependency injection.
    // The BROKER_* variable names below are placeholders, not a convention.
    private static final String ENDPOINT = System.getenv("BROKER_ENDPOINT"); // for example, "amqp://localhost:5672"
    private static final String USERNAME = System.getenv("BROKER_USERNAME");
    private static final String PASSWORD = System.getenv("BROKER_PASSWORD");
    private static final String QUEUE = "MyQueue";
    private static final String EXCHANGE = "MyExchange";
    private static final String ROUTING_KEY = "MyRoutingKey";
    
    public static void main(String[] args) throws KeyManagementException, NoSuchAlgorithmException, URISyntaxException, IOException, TimeoutException {
        // Create a connection factory.
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setUri(ENDPOINT);

        // Specify the username and password.
        connectionFactory.setUsername(USERNAME);
        connectionFactory.setPassword(PASSWORD);

        // Establish a connection for the producer.
        Connection producerConnection = connectionFactory.newConnection();

        // Create a channel for the producer.
        Channel producerChannel = producerConnection.createChannel();

        // Create a queue named "MyQueue".
        producerChannel.queueDeclare(QUEUE, false, false, false, null);

        // Create an exchange named "MyExchange".
        producerChannel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.TOPIC);

        // Bind "MyQueue" to "MyExchange", using routing key "MyRoutingKey".
        producerChannel.queueBind(QUEUE, EXCHANGE, ROUTING_KEY);

        // Create a message.
        String text = "Hello from RabbitMQ!";

        // Send the message, encoding it explicitly as UTF-8.
        producerChannel.basicPublish(EXCHANGE, ROUTING_KEY, null, text.getBytes("UTF-8"));
        System.out.println("Message sent: " + text);

        // Clean up the producer.
        producerChannel.close();
        producerConnection.close();
        
        ...


As before, you establish a connection to the RabbitMQ message broker, a channel for communication, and a queue to buffer messages for consumption. In addition to these components, you declare an explicit exchange of type BuiltinExchangeType.TOPIC and bind the queue to the exchange using the ROUTING_KEY that filters messages to send to the queue.

Again, publish a message using the Channel.basicPublish method. This time, instead of publishing the message to a queue, specify the EXCHANGE and ROUTING_KEY values for the message. RabbitMQ uses these properties to route the message to the appropriate queue, from which a consumer receives the message using the same code from the first example.
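
The example above binds the queue with an exact key, but topic exchanges also accept wildcards in the binding key: routing keys are words separated by dots, where * stands for exactly one word and # stands for zero or more words. The following is a rough sketch of that matching rule for illustration only. The TopicMatcher class and the sample keys are hypothetical, and this is not RabbitMQ’s own implementation.

```java
// Hypothetical illustration of topic-exchange key matching.
final class TopicMatcher {

    // Returns true if a message published with routingKey would be routed
    // to a queue bound with bindingKey.
    static boolean matches(String bindingKey, String routingKey) {
        return matches(bindingKey.split("\\."), 0, routingKey.split("\\."), 0);
    }

    private static boolean matches(String[] pattern, int i, String[] key, int j) {
        if (i == pattern.length) {
            // The pattern is exhausted; the key must be exhausted too.
            return j == key.length;
        }
        if (pattern[i].equals("#")) {
            // '#' can absorb zero or more words; try every possible split.
            for (int next = j; next <= key.length; next++) {
                if (matches(pattern, i + 1, key, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == key.length) {
            return false;
        }
        // '*' matches any single word; otherwise the words must be equal.
        if (pattern[i].equals("*") || pattern[i].equals(key[j])) {
            return matches(pattern, i + 1, key, j + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(matches("MyRoutingKey", "MyRoutingKey"));
        System.out.println(matches("orders.*", "orders.created"));
        System.out.println(matches("orders.#", "orders.created.eu"));
    }
}
```

With an exact binding key such as MyRoutingKey, the rule reduces to a plain string comparison, which is the case the example in this post relies on.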

JMS API

Now that you’ve seen examples for queue and topic publishing in RabbitMQ, look at code changes to support Amazon MQ, starting with the ActiveMQ client. But first, a quick review of the Java Message Service (JMS) API.

The remainder of the examples in this post use the JMS API, which abstracts messaging methods from underlying protocol and client implementations. The JMS API programming model uses a combination of connection factories, connections, sessions, destinations, message producers, and message consumers to send and receive messages. The following image (from The Java EE 6 Tutorial) shows the relationship between these components:

ActiveMQ OpenWire connectivity to Amazon MQ

Here’s how JMS is used with ActiveMQ to send and receive messages on a queue.

The ActiveMQ client uses the OpenWire protocol, which Amazon MQ supports. You can find the OpenWire endpoint in your Amazon MQ broker’s endpoint list. The security group for the Amazon MQ broker must allow inbound traffic on the OpenWire endpoint port, 61617.

ActiveMQ queue example

Next, here’s an example to send and receive messages to Amazon MQ using the ActiveMQ client. This example should look familiar, as it follows the same flow to send and receive messages via a queue. I’ve included the example in full and then highlighted the differences to consider when migrating from RabbitMQ.

import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.jms.pool.PooledConnectionFactory;

public class ActiveMQClientExample {

    private static final int DELIVERY_MODE = DeliveryMode.NON_PERSISTENT;
    private static final int ACKNOWLEDGE_MODE = Session.AUTO_ACKNOWLEDGE;

    // The Endpoint, Username, Password, and Queue should be externalized and
    // configured through environment variables or dependency injection.
    // The BROKER_* variable names below are placeholders, not a convention.
    private static final String ENDPOINT = System.getenv("BROKER_ENDPOINT"); // "ssl://x-xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx-x.mq.us-east-1.amazonaws.com:61617"
    private static final String USERNAME = System.getenv("BROKER_USERNAME");
    private static final String PASSWORD = System.getenv("BROKER_PASSWORD");
    private static final String QUEUE = "MyQueue";
    
    public static void main(String[] args) throws JMSException {
        // Create a connection factory.
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ENDPOINT);

        // Specify the username and password.
        connectionFactory.setUserName(USERNAME);
        connectionFactory.setPassword(PASSWORD);

        // Create a pooled connection factory.
        PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory();
        pooledConnectionFactory.setConnectionFactory(connectionFactory);
        pooledConnectionFactory.setMaxConnections(10);

        // Establish a connection for the producer.
        Connection producerConnection = pooledConnectionFactory.createConnection();
        producerConnection.start();

        // Create a session.
        Session producerSession = producerConnection.createSession(false, ACKNOWLEDGE_MODE);

        // Create a queue named "MyQueue".
        Destination producerDestination = producerSession.createQueue(QUEUE);

        // Create a producer from the session to the queue.
        MessageProducer producer = producerSession.createProducer(producerDestination);
        producer.setDeliveryMode(DELIVERY_MODE);

        // Create a message.
        String text = "Hello from Amazon MQ!";
        TextMessage producerMessage = producerSession.createTextMessage(text);

        // Send the message.
        producer.send(producerMessage);
        System.out.println("Message sent.");

        // Clean up the producer.
        producer.close();
        producerSession.close();
        producerConnection.close();

        // Establish a connection for the consumer.
        // Note: Consumers should not use PooledConnectionFactory.
        Connection consumerConnection = connectionFactory.createConnection();
        consumerConnection.start();

        // Create a session.
        Session consumerSession = consumerConnection.createSession(false, ACKNOWLEDGE_MODE);

        // Create a queue named "MyQueue".
        Destination consumerDestination = consumerSession.createQueue(QUEUE);

        // Create a message consumer from the session to the queue.
        MessageConsumer consumer = consumerSession.createConsumer(consumerDestination);

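        // Wait up to one second for a message; receive returns null on timeout.
        Message consumerMessage = consumer.receive(1000);

        // Print the message if one arrived.
        if (consumerMessage instanceof TextMessage) {
            TextMessage consumerTextMessage = (TextMessage) consumerMessage;
            System.out.println("Message received: " + consumerTextMessage.getText());
        } else {
            System.out.println("No text message received within the timeout.");
        }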

        // Clean up the consumer.
        consumer.close();
        consumerSession.close();
        consumerConnection.close();
        pooledConnectionFactory.stop();
    }
}

In this example, you use the ActiveMQ client to establish connectivity to Amazon MQ using the OpenWire protocol, with the ActiveMQConnectionFactory class specifying the endpoint and user credentials. For this example, use the master user name and password chosen when creating the Amazon MQ broker earlier. However, it’s a best practice to create additional Amazon MQ users for brokers in non-sandbox environments.

You could use the ActiveMQConnectionFactory to establish connectivity to the Amazon MQ broker. However, it is a best practice in Amazon MQ to group multiple producer requests using the ActiveMQ PooledConnectionFactory to wrap the ActiveMQConnectionFactory.

Using the PooledConnectionFactory, you can create a connection to Amazon MQ and establish a session to send a message. Like the RabbitMQ queue example, create a message queue destination using the Session.createQueue method, and a message producer to send the message to the queue.

For the consumer, use the ActiveMQConnectionFactory, NOT the PooledConnectionFactory, to create a connection, session, queue destination, and message consumer to receive the message because pooling of consumers is not considered a best practice. For more information, see the ActiveMQ Spring Support page.

ActiveMQ virtual destinations on Amazon MQ

Here’s how topic publishing differs from RabbitMQ to Amazon MQ.

Recall from the RabbitMQ topic example that you bound a queue to an exchange using a routing key. That binding determines which queues receive a message published with a given key.

You run into a problem if you try to implement topic subscription using the message consumer in the preceding ActiveMQ queue example. The following is an excerpt from Virtual Destinations, which provides more detail on this subject:

A JMS durable subscriber MessageConsumer is created with a unique JMS clientID and durable subscriber name. To be JMS-compliant, only one JMS connection can be active at any point in time for one JMS clientID, and only one consumer can be active for a clientID and subscriber name. That is, only one thread can be actively consuming from a given logical topic subscriber.

To solve this, ActiveMQ supports the concept of a virtual destination, which provides a logical topic subscription access to a physical queue for consumption without breaking JMS compliance. To do so, ActiveMQ uses a simple convention for specifying the topic and queue names to configure message routing.

  • Topic names must use the “VirtualTopic.” prefix, followed by the topic name. For example, VirtualTopic.MyTopic.
  • Consumer names must use the “Consumer.” prefix, followed by the consumer name, followed by the topic name. For example, Consumer.MyConsumer.VirtualTopic.MyTopic.
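
The naming convention above can be captured in a small helper so that producers and consumers build destination names the same way. This is a minimal sketch: the VirtualTopicNames class and its methods are hypothetical, and rejecting consumer names that contain a dot is an assumption made here to keep the names unambiguous, not a rule stated by ActiveMQ.

```java
// Hypothetical helper for the virtual destination naming convention.
final class VirtualTopicNames {
    static final String TOPIC_PREFIX = "VirtualTopic.";
    static final String CONSUMER_PREFIX = "Consumer.";

    // Name of the topic that producers publish to.
    static String topicName(String topic) {
        return TOPIC_PREFIX + topic;
    }

    // Name of the queue that a given consumer reads from.
    static String consumerQueueName(String consumer, String topic) {
        // Assumption for this sketch: consumer names contain no dots.
        if (consumer.isEmpty() || consumer.contains(".")) {
            throw new IllegalArgumentException("Invalid consumer name: " + consumer);
        }
        return CONSUMER_PREFIX + consumer + "." + topicName(topic);
    }

    public static void main(String[] args) {
        System.out.println(topicName("MyTopic"));
        System.out.println(consumerQueueName("MyConsumer", "MyTopic"));
    }
}
```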

ActiveMQ topic example

Next, here’s an example for the ActiveMQ client that demonstrates publishing messages to topics. This example is similar to the ActiveMQ queue example. In this one, the producer creates a topic destination instead of a queue destination.

import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.jms.pool.PooledConnectionFactory;

public class ActiveMQClientExample {

    private static final int DELIVERY_MODE = DeliveryMode.NON_PERSISTENT;
    private static final int ACKNOWLEDGE_MODE = Session.AUTO_ACKNOWLEDGE;

    // The Endpoint, Username, Password, Producer Topic, and Consumer Topic
    // should be externalized and configured through environment variables or
    // dependency injection.
    // The BROKER_* variable names below are placeholders, not a convention.
    private static final String ENDPOINT = System.getenv("BROKER_ENDPOINT"); // "ssl://x-xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx-x.mq.us-east-1.amazonaws.com:61617"
    private static final String USERNAME = System.getenv("BROKER_USERNAME");
    private static final String PASSWORD = System.getenv("BROKER_PASSWORD");
    private static final String PRODUCER_TOPIC = "VirtualTopic.MyTopic";
    private static final String CONSUMER1_TOPIC = "Consumer.Consumer1." + PRODUCER_TOPIC;
    
    public static void main(String[] args) throws JMSException {
        // Create a connection factory.
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ENDPOINT);

        // Specify the username and password.
        connectionFactory.setUserName(USERNAME);
        connectionFactory.setPassword(PASSWORD);

        // Create a pooled connection factory.
        PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory();
        pooledConnectionFactory.setConnectionFactory(connectionFactory);
        pooledConnectionFactory.setMaxConnections(10);

        // Establish a connection for the producer.
        Connection producerConnection = pooledConnectionFactory.createConnection();
        producerConnection.start();

        // Create a session.
        Session producerSession = producerConnection.createSession(false, ACKNOWLEDGE_MODE);

        // Create a topic named "VirtualTopic.MyTopic".
        Destination producerDestination = producerSession.createTopic(PRODUCER_TOPIC);

        // Create a producer from the session to the topic.
        MessageProducer producer = producerSession.createProducer(producerDestination);
        producer.setDeliveryMode(DELIVERY_MODE);

        // Create a message.
        String text = "Hello from Amazon MQ!";
        TextMessage producerMessage = producerSession.createTextMessage(text);

        // Send the message.
        producer.send(producerMessage);
        System.out.println("Message sent.");

        // Clean up the producer.
        producer.close();
        producerSession.close();
        producerConnection.close();

        // Establish a connection for the consumer.
        // Note: Consumers should not use PooledConnectionFactory.
        Connection consumerConnection = connectionFactory.createConnection();
        consumerConnection.start();

        // Create a session.
        Session consumerSession = consumerConnection.createSession(false, ACKNOWLEDGE_MODE);

        // Create a queue called "Consumer.Consumer1.VirtualTopic.MyTopic".
        Destination consumerDestination = consumerSession.createQueue(CONSUMER1_TOPIC);

        // Create a message consumer from the session to the queue.
        MessageConsumer consumer = consumerSession.createConsumer(consumerDestination);

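        // Wait up to one second for a message; receive returns null on timeout.
        // Note: if the consumer queue did not yet exist when the message was
        // published, the message may not have been copied to it.
        Message consumerMessage = consumer.receive(1000);

        // Print the message if one arrived.
        if (consumerMessage instanceof TextMessage) {
            TextMessage consumerTextMessage = (TextMessage) consumerMessage;
            System.out.println("Message received: " + consumerTextMessage.getText());
        } else {
            System.out.println("No text message received within the timeout.");
        }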

        // Clean up the consumer.
        consumer.close();
        consumerSession.close();
        consumerConnection.close();
        pooledConnectionFactory.stop();
    }
}

In this example, the message producer uses the Session.createTopic method with the topic name, VirtualTopic.MyTopic, as the publishing destination. The message consumer code does not change, but the queue destination uses the virtual destination convention, Consumer.Consumer1.VirtualTopic.MyTopic. ActiveMQ uses these names for the topic and queue to route messages accordingly.
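
To make the routing behavior concrete, here’s a toy in-memory model of it: a message published to a virtual topic is copied to every existing queue whose name follows the Consumer.<name>.<topic> convention. The VirtualTopicSketch class is hypothetical and only mimics the naming-based fan-out; it is not how ActiveMQ is implemented.

```java
import java.util.ArrayDeque;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Queue;

// Hypothetical in-memory model of virtual topic fan-out.
final class VirtualTopicSketch {
    private static final String CONSUMER_PREFIX = "Consumer.";
    private final Map<String, Queue<String>> queues = new LinkedHashMap<>();

    void createQueue(String name) {
        queues.computeIfAbsent(name, n -> new ArrayDeque<>());
    }

    // Copies the message to each matching consumer queue and returns the copy count.
    int publish(String topic, String message) {
        int copies = 0;
        for (Map.Entry<String, Queue<String>> entry : queues.entrySet()) {
            if (isConsumerQueueFor(entry.getKey(), topic)) {
                entry.getValue().add(message);
                copies++;
            }
        }
        return copies;
    }

    // True if queueName has the form Consumer.<name>.<topic> with a dot-free name.
    static boolean isConsumerQueueFor(String queueName, String topic) {
        if (!queueName.startsWith(CONSUMER_PREFIX) || !queueName.endsWith("." + topic)) {
            return false;
        }
        int start = CONSUMER_PREFIX.length();
        int end = queueName.length() - topic.length() - 1;
        if (end <= start) {
            return false;
        }
        return !queueName.substring(start, end).contains(".");
    }

    // Returns the next message on the queue, or null if there is none.
    String receive(String queueName) {
        Queue<String> queue = queues.get(queueName);
        return queue == null ? null : queue.poll();
    }

    public static void main(String[] args) {
        VirtualTopicSketch broker = new VirtualTopicSketch();
        broker.createQueue("Consumer.Consumer1.VirtualTopic.MyTopic");
        broker.createQueue("Consumer.Consumer2.VirtualTopic.MyTopic");
        int copies = broker.publish("VirtualTopic.MyTopic", "Hello from Amazon MQ!");
        System.out.println("Copies delivered: " + copies);
        System.out.println(broker.receive("Consumer.Consumer1.VirtualTopic.MyTopic"));
    }
}
```

Each consumer name gets its own queue and therefore its own copy of every message, while several threads reading from the same queue would share that queue’s messages between them.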

AMQP connectivity to Amazon MQ

Now that you’ve explored some examples using an ActiveMQ client, look at examples using the Qpid JMS client to connect to the Amazon MQ broker over the AMQP 1.0 protocol and see how they differ.

The Qpid client uses the Advanced Message Queuing Protocol (AMQP) 1.0, which Amazon MQ supports. You can find the AMQP 1.0 endpoint in your Amazon MQ broker’s endpoint list. It uses port 5671, which must be open in the security group associated with the Amazon MQ broker.

The AMQP endpoint specifies a transport, amqp+ssl. For encrypted connections, Qpid expects the protocol name to be amqps instead of amqp+ssl; the rest of the connection address remains the same.
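
If you copy the endpoint straight from the broker’s endpoint list, the scheme substitution can be done in code. The following is a small sketch; the AmqpEndpoints class is hypothetical, and broker.example.com is a placeholder host name rather than a real endpoint.

```java
// Hypothetical helper that rewrites the endpoint scheme for the Qpid client.
final class AmqpEndpoints {
    private static final String CONSOLE_PREFIX = "amqp+ssl://";
    private static final String QPID_PREFIX = "amqps://";

    // Replaces a leading amqp+ssl:// with amqps:// and leaves other input unchanged.
    static String toQpidUri(String endpoint) {
        if (endpoint.startsWith(CONSOLE_PREFIX)) {
            return QPID_PREFIX + endpoint.substring(CONSOLE_PREFIX.length());
        }
        return endpoint;
    }

    public static void main(String[] args) {
        // Placeholder host name for illustration.
        System.out.println(toQpidUri("amqp+ssl://broker.example.com:5671"));
    }
}
```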

Qpid JMS queue example

Next, here’s an example to send and receive messages to Amazon MQ using the Qpid client. The Qpid JMS client is built using Apache Qpid Proton, an AMQP messaging toolkit.

import java.util.Hashtable;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.Context;
import javax.naming.NamingException;

import org.apache.activemq.jms.pool.PooledConnectionFactory;

public class QpidClientExample {

    private static final int DELIVERY_MODE = DeliveryMode.NON_PERSISTENT;
    private static final int ACKNOWLEDGE_MODE = Session.AUTO_ACKNOWLEDGE;

    // The Endpoint, Username, Password, and Queue should be externalized and
    // configured through environment variables or dependency injection.
    // The BROKER_* variable names below are placeholders, not a convention.
    private static final String ENDPOINT = System.getenv("BROKER_ENDPOINT"); // "amqps://x-xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx-x.mq.us-east-1.amazonaws.com:5671"
    private static final String USERNAME = System.getenv("BROKER_USERNAME");
    private static final String PASSWORD = System.getenv("BROKER_PASSWORD");
    private static final String QUEUE = "MyQueue";

    public static void main(String[] args) throws JMSException, NamingException {
        // Use JNDI to specify the AMQP endpoint
        Hashtable<Object, Object> env = new Hashtable<Object, Object>();
        env.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
        env.put("connectionfactory.factoryLookup", ENDPOINT);
        javax.naming.Context context = new javax.naming.InitialContext(env);

        // Create a connection factory.
        ConnectionFactory connectionFactory = (ConnectionFactory) context.lookup("factoryLookup");

        // Create a pooled connection factory.
        PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory();
        pooledConnectionFactory.setConnectionFactory(connectionFactory);
        pooledConnectionFactory.setMaxConnections(10);

        // Establish a connection for the producer.
        Connection producerConnection = pooledConnectionFactory.createConnection(USERNAME, PASSWORD);
        producerConnection.start();

        // Create a session.
        Session producerSession = producerConnection.createSession(false, ACKNOWLEDGE_MODE);

        // Create a queue named "MyQueue".
        Destination producerDestination = producerSession.createQueue(QUEUE);

        // Create a producer from the session to the queue.
        MessageProducer producer = producerSession.createProducer(producerDestination);
        producer.setDeliveryMode(DELIVERY_MODE);

        // Create a message.
        String text = "Hello from Qpid Amazon MQ!";
        TextMessage producerMessage = producerSession.createTextMessage(text);

        // Send the message.
        producer.send(producerMessage);
        System.out.println("Message sent.");

        // Clean up the producer.
        producer.close();
        producerSession.close();
        producerConnection.close();

        // Establish a connection for the consumer.
        // Note: Consumers should not use PooledConnectionFactory.
        Connection consumerConnection = connectionFactory.createConnection(USERNAME, PASSWORD);
        consumerConnection.start();

        // Create a session.
        Session consumerSession = consumerConnection.createSession(false, ACKNOWLEDGE_MODE);

        // Create a queue named "MyQueue".
        Destination consumerDestination = consumerSession.createQueue(QUEUE);

        // Create a message consumer from the session to the queue.
        MessageConsumer consumer = consumerSession.createConsumer(consumerDestination);

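        // Wait up to one second for a message.
        Message consumerMessage = consumer.receive(1000);

        // receive() returns null if the timeout expires before a message arrives.
        if (consumerMessage instanceof TextMessage) {
            TextMessage consumerTextMessage = (TextMessage) consumerMessage;
            System.out.println("Message received: " + consumerTextMessage.getText());
        } else {
            System.out.println("No text message received within the timeout.");
        }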

        // Clean up the consumer.
        consumer.close();
        consumerSession.close();
        consumerConnection.close();
        pooledConnectionFactory.stop();
    }
}

The Qpid queue example is similar to the ActiveMQ Queue Example. They both use the JMS API model to send and receive messages, but the difference is in how the ConnectionFactory and AMQP endpoint are specified. According to the Qpid client configuration documentation, the ConnectionFactory is specified using a JNDI InitialContext to look up JMS objects. The JNDI configuration is commonly specified in a file named jndi.properties on the Java classpath. This example configures it programmatically using a Hashtable for simplicity.
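To make the two configuration styles concrete, here is a minimal sketch that builds the same JNDI environment both ways, using only the JDK. The class name, method names, and the endpoint value are hypothetical and chosen for illustration; the two keys are the ones used in the example above.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Hashtable;
import java.util.Properties;
import javax.naming.Context;

final class QpidJndiEnvironment {

    // Hypothetical placeholder; substitute the AMQP endpoint of your own broker.
    static final String EXAMPLE_ENDPOINT = "amqps://broker.example.com:5671";

    // Programmatic form, mirroring the Hashtable built in the Qpid example.
    static Hashtable<Object, Object> programmatic(String endpoint) {
        Hashtable<Object, Object> env = new Hashtable<>();
        env.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
        env.put("connectionfactory.factoryLookup", endpoint);
        return env;
    }

    // File form: the same two entries as they could appear in jndi.properties.
    static String propertiesText(String endpoint) {
        return "java.naming.factory.initial = org.apache.qpid.jms.jndi.JmsInitialContextFactory\n"
             + "connectionfactory.factoryLookup = " + endpoint + "\n";
    }

    // Parses jndi.properties-style text into a JNDI environment table.
    static Hashtable<Object, Object> fromPropertiesText(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return new Hashtable<>(props);
    }

    public static void main(String[] args) {
        Hashtable<Object, Object> fromCode = programmatic(EXAMPLE_ENDPOINT);
        Hashtable<Object, Object> fromFile = fromPropertiesText(propertiesText(EXAMPLE_ENDPOINT));
        System.out.println("Same environment: " + fromCode.equals(fromFile));
    }
}
```

Either table can be passed to new InitialContext(env), provided the Qpid JMS client is on the classpath.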

NOTE: Although the Qpid client and Qpid JMS client are used to establish connectivity to Amazon MQ using the AMQP 1.0 protocol, the producer should still use the ActiveMQ PooledConnectionFactory to wrap the Qpid ConnectionFactory. This can be confusing because Qpid client provides a PooledConnectionFactory that should NOT be used for AMQP 1.0.

The Qpid topic example is identical to the earlier ActiveMQ topic example with the same substitution, which establishes the ConnectionFactory to the AMQP 1.0 endpoint via JNDI.

Spring JMS template queue example

Finally, here are examples using the Spring JmsTemplate to send and receive messages.

This example establishes connectivity to Amazon MQ using the same protocol and client library used in the ActiveMQ queue example. It requires that the security group for the Amazon MQ broker be open for the ActiveMQ OpenWire protocol endpoint port, 61617.

The Spring JmsTemplate provides a higher-level abstraction on top of JMS. Code using the JmsTemplate class only needs to implement handlers to process messages, while the management of connections, sessions, message producers, and message consumers is delegated to Spring. Look at the following code:

import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.jms.pool.PooledConnectionFactory;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;

public class ActiveMQSpringExample {

    private static final int DELIVERY_MODE = DeliveryMode.NON_PERSISTENT;
    private static final int ACKNOWLEDGE_MODE = Session.AUTO_ACKNOWLEDGE;

    // The Endpoint, Username, Password, and Queue should be externalized and
    // configured through environment variables or dependency injection.
    // The environment variable names below are illustrative, not prescribed.
    private static final String ENDPOINT = System.getenv("MQ_ENDPOINT"); // "ssl://x-xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx-x.mq.us-east-1.amazonaws.com:61617"
    private static final String USERNAME = System.getenv("MQ_USERNAME");
    private static final String PASSWORD = System.getenv("MQ_PASSWORD");
    private static final String QUEUE = "MyQueue";

    public static void main(String[] args) throws JMSException {
        // Create a connection factory.
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ENDPOINT);

        // Specify the username and password.
        connectionFactory.setUserName(USERNAME);
        connectionFactory.setPassword(PASSWORD);

        // Create a pooled connection factory.
        PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory();
        pooledConnectionFactory.setConnectionFactory(connectionFactory);
        pooledConnectionFactory.setMaxConnections(10);

        // Create a JmsTemplate for the producer.
        JmsTemplate producerJmsTemplate = new JmsTemplate();
        producerJmsTemplate.setConnectionFactory(pooledConnectionFactory);
        producerJmsTemplate.setDefaultDestination(new ActiveMQQueue(QUEUE));
        producerJmsTemplate.setSessionAcknowledgeMode(ACKNOWLEDGE_MODE);
        producerJmsTemplate.setDeliveryMode(DELIVERY_MODE);
        
        // Create a message creator.
        MessageCreator messageCreator = new MessageCreator() {
            public Message createMessage(Session session) throws JMSException {
                return session.createTextMessage("Hello from Spring Amazon MQ!");
            }
        };

        // Send the message.
        producerJmsTemplate.send(messageCreator);
        System.out.println("Message sent.");

        // Clean up the producer.
        // producer JmsTemplate will close underlying sessions and connections.

        // Create a JmsTemplate for the consumer.
        // Note: Consumers should not use PooledConnectionFactory.
        JmsTemplate consumerJmsTemplate = new JmsTemplate();
        consumerJmsTemplate.setConnectionFactory(connectionFactory);
        consumerJmsTemplate.setDefaultDestination(new ActiveMQQueue(QUEUE));
        consumerJmsTemplate.setSessionAcknowledgeMode(ACKNOWLEDGE_MODE);
        consumerJmsTemplate.setReceiveTimeout(1000);
        
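        // Wait up to one second (the receive timeout set above) for a message.
        Message consumerMessage = consumerJmsTemplate.receive();

        // receive() returns null if the timeout expires before a message arrives.
        if (consumerMessage instanceof TextMessage) {
            TextMessage consumerTextMessage = (TextMessage) consumerMessage;
            System.out.println("Message received: " + consumerTextMessage.getText());
        } else {
            System.out.println("No text message received within the timeout.");
        }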

        // Clean up the consumer.
        // consumer JmsTemplate will close underlying sessions and connections.
        pooledConnectionFactory.stop();
    }
}

Although Spring manages connections, sessions, and message producers, pooling producer connections is still a best practice. The ActiveMQ PooledConnectionFactory class is used in this example. However, the Spring CachingConnectionFactory object is another option.

Following the PooledConnectionFactory creation, a JmsTemplate is created for the producer and an ActiveMQQueue is created as the message destination. To use JmsTemplate to send a message, a MessageCreator callback is defined that generates a text message via the JmsTemplate.

A second JmsTemplate with an ActiveMQQueue is created for the consumer. In this example, a single message is received synchronously. Asynchronous message reception is a popular alternative when using message-driven POJOs.

Unlike the ActiveMQ examples, the Spring JMS template example does not require the explicit cleanup of the connection, session, message producer, or message consumer resources, as that is managed by Spring. Make sure to call the PooledConnectionFactory.stop method to cleanly exit the main method.

Finally, here’s an example using a Spring JmsTemplate for topic publishing.

Spring JmsTemplate topic example

This example combines the Spring JmsTemplate queue example with the virtual destinations approach from the ActiveMQ topic example. Look at the following code.

import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.jms.pool.PooledConnectionFactory;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;

public class ActiveMQSpringExample {

    private static final int DELIVERY_MODE = DeliveryMode.NON_PERSISTENT;
    private static final int ACKNOWLEDGE_MODE = Session.AUTO_ACKNOWLEDGE;

    // The Endpoint, Username, Password, Producer Topic, and Consumer Topic
    // should be externalized and configured through environment variables or
    // dependency injection.
    // The environment variable names below are illustrative, not prescribed.
    private static final String ENDPOINT = System.getenv("MQ_ENDPOINT"); // "ssl://x-xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx-x.mq.us-east-1.amazonaws.com:61617"
    private static final String USERNAME = System.getenv("MQ_USERNAME");
    private static final String PASSWORD = System.getenv("MQ_PASSWORD");
    private static final String PRODUCER_TOPIC = "VirtualTopic.MyTopic";
    private static final String CONSUMER1_TOPIC = "Consumer.Consumer1." + PRODUCER_TOPIC;
    
    public static void main(String[] args) throws JMSException {
        // Create a connection factory.
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ENDPOINT);

        // Specify the username and password.
        connectionFactory.setUserName(USERNAME);
        connectionFactory.setPassword(PASSWORD);

        // Create a pooled connection factory.
        PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory();
        pooledConnectionFactory.setConnectionFactory(connectionFactory);
        pooledConnectionFactory.setMaxConnections(10);

        // Create a JmsTemplate for the producer.
        JmsTemplate producerJmsTemplate = new JmsTemplate();
        producerJmsTemplate.setConnectionFactory(pooledConnectionFactory);
        producerJmsTemplate.setDefaultDestination(new ActiveMQTopic(PRODUCER_TOPIC));
        producerJmsTemplate.setSessionAcknowledgeMode(ACKNOWLEDGE_MODE);
        producerJmsTemplate.setDeliveryMode(DELIVERY_MODE);

        // Create a message creator.
        MessageCreator messageCreator = new MessageCreator() {
            public Message createMessage(Session session) throws JMSException {
                return session.createTextMessage("Hello from Spring Amazon MQ!");
            }
        };

        // Send the message.
        producerJmsTemplate.send(messageCreator);
        System.out.println("Message sent.");

        // Clean up the producer.
        // producer JmsTemplate will close underlying sessions and connections.

        // Create a JmsTemplate for the consumer.
        // Note: Consumers should not use PooledConnectionFactory.
        JmsTemplate consumerJmsTemplate = new JmsTemplate();
        consumerJmsTemplate.setConnectionFactory(connectionFactory);
        consumerJmsTemplate.setDefaultDestination(new ActiveMQQueue(CONSUMER1_TOPIC));
        consumerJmsTemplate.setSessionAcknowledgeMode(ACKNOWLEDGE_MODE);
        consumerJmsTemplate.setReceiveTimeout(1000);
        
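        // Wait up to one second (the receive timeout set above) for a message.
        Message consumerMessage = consumerJmsTemplate.receive();

        // receive() returns null if the timeout expires before a message arrives.
        if (consumerMessage instanceof TextMessage) {
            TextMessage consumerTextMessage = (TextMessage) consumerMessage;
            System.out.println("Message received: " + consumerTextMessage.getText());
        } else {
            System.out.println("No text message received within the timeout.");
        }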

        // Clean up the consumer.
        // consumer JmsTemplate will close underlying sessions and connections.
        pooledConnectionFactory.stop();
    }
}

In this example, follow the ActiveMQ virtual destination naming convention for topics and queues:
  • When creating the producer JMS template, specify an ActiveMQTopic as the destination using the name VirtualTopic.MyTopic.
  • When creating the consumer JMS template, specify an ActiveMQQueue as the destination using the name Consumer.Consumer1.VirtualTopic.MyTopic.

ActiveMQ automatically handles routing messages from topic to queue.
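
The naming convention above can be captured in a small helper so that producer and consumer names cannot drift apart. This is only a sketch: the class and method names are hypothetical, and it assumes the broker uses the default VirtualTopic. and Consumer. prefixes shown in the example.

```java
final class VirtualTopicNames {

    // Prefix that marks a topic as a virtual topic under the default convention.
    static final String TOPIC_PREFIX = "VirtualTopic.";

    // Name the producer publishes to, for example VirtualTopic.MyTopic.
    static String producerTopic(String topic) {
        return TOPIC_PREFIX + topic;
    }

    // Name of the queue a given consumer reads from,
    // for example Consumer.Consumer1.VirtualTopic.MyTopic.
    static String consumerQueue(String consumerName, String topic) {
        return "Consumer." + consumerName + "." + producerTopic(topic);
    }

    public static void main(String[] args) {
        System.out.println(producerTopic("MyTopic"));
        System.out.println(consumerQueue("Consumer1", "MyTopic"));
    }
}
```

Each additional consumer name gets its own queue, so every named consumer receives its own copy of each published message.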

Conclusion

In this post, I reviewed how to get started with an Amazon MQ broker and walked you through several code examples that explored the differences between RabbitMQ and Apache ActiveMQ client integrations. If you are considering migrating to Amazon MQ, these examples should help you understand the changes that might be required.

If you’re thinking about integrating your existing apps with new serverless apps, see the related post, Invoking AWS Lambda from Amazon MQ.

To learn more, see the Amazon MQ website and Developer Guide. You can try Amazon MQ for free with the AWS Free Tier, which includes up to 750 hours of a single-instance mq.t2.micro broker and up to 1 GB of storage per month for one year for new AWS accounts.

Running ActiveMQ in a Hybrid Cloud Environment with Amazon MQ

Post Syndicated from Tara Van Unen original https://aws.amazon.com/blogs/compute/running-activemq-in-a-hybrid-cloud-environment-with-amazon-mq/

This post courtesy of Greg Share, AWS Solutions Architect

Many organizations, particularly enterprises, rely on message brokers to connect and coordinate different systems. Message brokers enable distributed applications to communicate with one another, serving as the technological backbone for their IT environment, and ultimately their business services. Applications depend on messaging to work.

In many cases, those organizations have started to build new or “lift and shift” applications to AWS. In some cases, there are applications, such as mainframe systems, too costly to migrate. In these scenarios, those on-premises applications still need to interact with cloud-based components.

Amazon MQ is a managed message broker service for ActiveMQ that enables organizations to send messages between applications in the cloud and on-premises to enable hybrid environments and application modernization. For example, you can invoke AWS Lambda from queues and topics managed by Amazon MQ brokers to integrate legacy systems with serverless architectures. ActiveMQ is an open-source message broker written in Java that is packaged with clients in multiple languages, the Java Message Service (JMS) client being one example.

This post shows how you can use Amazon MQ to integrate on-premises and cloud environments using the network of brokers feature of ActiveMQ. It provides configuration parameters for a one-way (non-duplex) connection that carries messages from an on-premises ActiveMQ message broker to Amazon MQ.

ActiveMQ and the network of brokers

First, look at queues within ActiveMQ and then at the network of brokers as a mechanism to distribute messages.

The network of brokers behaves differently from models such as physical networks. The key consideration is that the production (sending) of a message is disconnected from the consumption of that message. Think of the delivery of a parcel: The parcel is sent by the supplier (producer) to the end customer (consumer). The path it took to get there is of little concern to the customer, as long as they receive the package.

The same logic can be applied to the network of brokers. Here’s how you build the flow from a simple message to a queue and build toward a network of brokers. Before you look at setting up a hybrid connection, I discuss how a broker processes messages in a simple scenario.

When a message is sent from a producer to a queue on a broker, the following steps occur:

  1. A message is sent to a queue from the producer.
  2. The broker persists this in its store or journal.
  3. At this point, an acknowledgement (ACK) is sent to the producer from the broker.

When a consumer looks to consume the message from that same queue, the following steps occur:

  1. The message listener (consumer) calls the broker, which creates a subscription to the queue.
  2. Messages are fetched from the message store and sent to the consumer.
  3. The consumer acknowledges that the message has been received before processing it.
  4. Upon receiving the ACK, the broker sets the message as having been consumed. By default, this deletes it from the queue.
    • You can set the consumer to ACK after processing by setting up transaction management or handle it manually using Session.CLIENT_ACKNOWLEDGE.
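
The two sequences above can be pictured with a toy, in-memory model. It is a deliberately simplified sketch and not how ActiveMQ is implemented: the class and its methods are hypothetical, there is a single consumer, and "persisting" is just adding to a list.

```java
import java.util.ArrayDeque;
import java.util.Deque;

final class ToyBrokerQueue {

    // Stands in for the broker's message store or journal.
    private final Deque<String> store = new ArrayDeque<>();

    // The message that was delivered but not yet acknowledged, if any.
    private String inFlight;

    // Producer side: persist the message, then acknowledge the send.
    boolean send(String message) {
        store.addLast(message);
        return true; // the ACK returned to the producer
    }

    // Consumer side: hand out the oldest message but keep it in the store.
    String deliver() {
        if (inFlight == null) {
            inFlight = store.peekFirst();
        }
        return inFlight;
    }

    // Consumer ACK: only now is the message removed from the queue.
    void acknowledge() {
        if (inFlight != null) {
            store.removeFirst();
            inFlight = null;
        }
    }

    int pending() {
        return store.size();
    }
}
```

The point of the model is the ordering: a message stays in the store between deliver() and acknowledge(), which is why the timing of the consumer's ACK determines whether a message can be redelivered after a failure.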

Static propagation

I now introduce the concept of static propagation with the network of brokers as the mechanism for message transfer from on-premises brokers to Amazon MQ. Static propagation refers to message propagation that occurs in the absence of subscription information. In this case, the objective is to transfer messages arriving at your selected on-premises broker to the Amazon MQ broker for consumption within the cloud environment.

After you configure static propagation with a network of brokers, the following occurs:

  1. The on-premises broker receives a message from a producer for a specific queue.
  2. The on-premises broker sends (statically propagates) the message to the Amazon MQ broker.
  3. The Amazon MQ broker sends an acknowledgement to the on-premises broker, which marks the message as having been consumed.
  4. Amazon MQ holds the message in its queue ready for consumption.
  5. A consumer connects to Amazon MQ broker, subscribes to the queue in which the message resides, and receives the message.
  6. Amazon MQ broker marks the message as having been consumed.
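
The six steps above can be traced with a toy model of two brokers joined by a one-way bridge. This is an illustrative sketch only: ToyBroker and its methods are hypothetical, everything is in memory, and the forward-and-acknowledge exchange is collapsed into a single method call.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

final class ToyBroker {

    private final Map<String, Deque<String>> queues = new HashMap<>();
    private final Set<String> staticallyIncluded = new HashSet<>();
    private ToyBroker remote;

    // Configure a one-way bridge that statically propagates one queue.
    void bridgeTo(ToyBroker remoteBroker, String queueName) {
        this.remote = remoteBroker;
        staticallyIncluded.add(queueName);
    }

    // Steps 1-4: a message for a bridged queue is forwarded to the remote
    // broker, so it is no longer pending locally. Other queues stay local.
    void send(String queueName, String message) {
        if (remote != null && staticallyIncluded.contains(queueName)) {
            remote.send(queueName, message);
            return;
        }
        queues.computeIfAbsent(queueName, name -> new ArrayDeque<>()).addLast(message);
    }

    // Steps 5-6: a consumer takes the message, which marks it as consumed.
    String consume(String queueName) {
        Deque<String> queue = queues.get(queueName);
        return queue == null ? null : queue.pollFirst();
    }

    int pending(String queueName) {
        Deque<String> queue = queues.get(queueName);
        return queue == null ? 0 : queue.size();
    }
}
```

With an on-premises ToyBroker bridged to a cloud ToyBroker for a queue named TestingQ, a send to the on-premises broker leaves nothing pending there and one message pending on the cloud side until a consumer takes it.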

Getting started

The first step is creating an Amazon MQ broker.

  1. Sign in to the Amazon MQ console and launch a new Amazon MQ broker.
  2. Name your broker and choose Next step.
  3. For Broker instance type, choose your instance size:
    • mq.t2.micro
    • mq.m4.large
  4. For Deployment mode, choose one of the following:
    • Single-instance broker for development and test implementations (recommended)
    • Active/standby broker for high availability in production environments
  5. Scroll down and enter your user name and password.
  6. Expand Advanced Settings.
  7. For VPC, Subnet, and Security Group, pick the values for the resources in which your broker will reside.
  8. For Public Accessibility, choose Yes, as connectivity is internet-based. Another option would be to use private connectivity between your on-premises network and the VPC, an example being an AWS Direct Connect or VPN connection. In that case, you could set Public Accessibility to No.
  9. For Maintenance, leave the default value, No preference.
  10. Choose Create Broker. Wait several minutes for the broker to be created.

After creation is complete, you see your broker listed.

For connectivity to work, you must configure the security group where Amazon MQ resides. For this post, I focus on the OpenWire protocol.

For OpenWire connectivity, allow port 61617 access for Amazon MQ from your on-premises ActiveMQ broker source IP address. For alternate protocols, see the Amazon MQ broker configuration information for the ports required:

OpenWire – ssl://xxxxxxx.xxx.com:61617
AMQP – amqp+ssl://xxxxxxx.xxx.com:5671
STOMP – stomp+ssl://xxxxxxx.xxx.com:61614
MQTT – mqtt+ssl://xxxxxxx.xxx.com:8883
WSS – wss://xxxxxxx.xxx.com:61619
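
If you build these endpoint strings in code, a small lookup keeps each scheme and port together. The following is a sketch with a hypothetical enum name and a placeholder host; the schemes and ports are the ones listed above.

```java
enum BrokerProtocol {
    OPENWIRE("ssl", 61617),
    AMQP("amqp+ssl", 5671),
    STOMP("stomp+ssl", 61614),
    MQTT("mqtt+ssl", 8883),
    WSS("wss", 61619);

    private final String scheme;
    private final int port;

    BrokerProtocol(String scheme, int port) {
        this.scheme = scheme;
        this.port = port;
    }

    // The port to open in the broker's security group for this protocol.
    int port() {
        return port;
    }

    // Builds the endpoint string for a broker host name.
    String endpoint(String host) {
        return scheme + "://" + host + ":" + port;
    }

    public static void main(String[] args) {
        // broker.example.com is a placeholder host name.
        for (BrokerProtocol protocol : values()) {
            System.out.println(protocol + " " + protocol.endpoint("broker.example.com"));
        }
    }
}
```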

Configuring the network of brokers

Configuring the network of brokers with static propagation occurs on the on-premises broker by applying changes to the following file:
<activemq install directory>/conf/activemq.xml

Network connector

This is the first configuration item required to enable a network of brokers. It is only required on the on-premises broker, which initiates and creates the connection with Amazon MQ. This connection, after it’s established, enables the flow of messages in either direction between the on-premises broker and Amazon MQ. The focus of this post is the uni-directional flow of messages from the on-premises broker to Amazon MQ.

The default activemq.xml file does not include the network connector configuration. Add this with the networkConnector element. In this scenario, edit the on-premises broker activemq.xml file to include the following information between <systemUsage> and <transportConnectors>:

<networkConnectors>
    <networkConnector
        name="Q:source broker name->target broker name"
        duplex="false"
        uri="static:(ssl://aws mq endpoint:61617)"
        userName="username"
        password="password"
        networkTTL="2"
        dynamicOnly="false">
        <staticallyIncludedDestinations>
            <queue physicalName="queuename"/>
        </staticallyIncludedDestinations>
        <excludedDestinations>
            <queue physicalName=">" />
        </excludedDestinations>
    </networkConnector>
</networkConnectors>

The following attributes and elements are the most important when configuring your on-premises broker.

  • name – Name of the network bridge. In this case, it specifies two things:
    • That this connection relates to an ActiveMQ queue (Q) as opposed to a topic (T), for reference purposes.
    • The source broker and target broker.
  • duplex – Setting this to false ensures that messages traverse uni-directionally from the on-premises broker to Amazon MQ.
  • uri – Specifies the remote endpoint to which to connect for message transfer. In this case, it is an OpenWire endpoint on your Amazon MQ broker. You can obtain this information from the Amazon MQ console or via the API.
  • userName and password – The same user name and password configured when creating the Amazon MQ broker, and used to access the Amazon MQ ActiveMQ console.
  • networkTTL – Number of brokers in the network through which messages and subscriptions can pass. Leave this setting at the current value, if it is already included in your broker connection.
  • staticallyIncludedDestinations > queue physicalName – The destination ActiveMQ queue for which messages are destined. This is the queue that is propagated from the on-premises broker to the Amazon MQ broker for message consumption.
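
Because a malformed fragment only surfaces when the broker restarts, it can help to sanity-check the XML first. The following sketch uses only the JDK's XML parser; the class name and the specific checks are assumptions chosen to mirror the settings described above, not an ActiveMQ tool.

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;

final class NetworkConnectorCheck {

    // Returns a list of problems found in the XML; an empty list means the
    // fragment passed these basic checks.
    static List<String> problems(String xml) {
        List<String> problems = new ArrayList<>();
        Document doc;
        try {
            doc = DocumentBuilderFactory.newInstance().newDocumentBuilder()
                    .parse(new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)));
        } catch (Exception e) {
            problems.add("not well-formed XML: " + e.getMessage());
            return problems;
        }
        NodeList connectors = doc.getElementsByTagName("networkConnector");
        if (connectors.getLength() == 0) {
            problems.add("no networkConnector element");
            return problems;
        }
        Element connector = (Element) connectors.item(0);
        // One-way flow from the on-premises broker requires duplex="false".
        if (!"false".equals(connector.getAttribute("duplex"))) {
            problems.add("duplex is not false");
        }
        if (!connector.getAttribute("uri").startsWith("static:(")) {
            problems.add("uri is not a static: URI");
        }
        NodeList included = connector.getElementsByTagName("staticallyIncludedDestinations");
        if (included.getLength() == 0
                || ((Element) included.item(0)).getElementsByTagName("queue").getLength() == 0) {
            problems.add("no statically included queue");
        }
        return problems;
    }
}
```

Passing the contents of activemq.xml as a string returns an empty list when the connector is well-formed, one-way, and names at least one queue. An unclosed element is reported as a well-formedness problem.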

After the network connector is configured, you must restart the ActiveMQ service on the on-premises broker for the changes to be applied.

Verify the configuration

There are a number of places within the ActiveMQ console of your on-premises and Amazon MQ brokers to browse to verify that the configuration is correct and the connection has been established.

On-premises broker

Launch the ActiveMQ console of your on-premises broker and navigate to Network. You should see an active network bridge similar to the following:

This identifies that the connection between your on-premises broker and your Amazon MQ broker is up and running.

Now navigate to Connections and scroll to the bottom of the page. Under the Network Connectors subsection, you should see a connector labeled with the name value that you provided within the activemq.xml configuration file. You should see an entry similar to:

Amazon MQ broker

Launch the ActiveMQ console of your Amazon MQ broker and navigate to Connections. Scroll to the Connections openwire subsection and you should see a connection that references the name value that you provided within the activemq.xml configuration file. You should see an entry similar to:

If you configured the uri for AMQP, STOMP, MQTT, or WSS as opposed to OpenWire, you would see this connection under the corresponding section of the Connections page.

Testing your message flow

The setup described outlines a way for messages produced on premises to be propagated to the cloud for consumption in the cloud. This section provides steps on verifying the message flow.

Verify that the queue has been created

After you specify this queue name as staticallyIncludedDestinations > queue physicalName: and your ActiveMQ service starts, you see the following on your on-premises ActiveMQ console Queues page.

As you can see, no messages have been sent but you have one consumer listed. If you then choose Active Consumers under the Views column, you see Active Consumers for TestingQ.

This is telling you that your Amazon MQ broker is a consumer of your on-premises broker for the testing queue.

Produce and send a message to the on-premises broker

Now, produce a message on an on-premises producer and send it to your on-premises broker to a queue named TestingQ. If you navigate back to the queues page of your on-premises ActiveMQ console, you see that the messages enqueued and messages dequeued column count for your TestingQ queue have changed:

What this means is that the message originating from the on-premises producer has traversed the on-premises broker and propagated immediately to the Amazon MQ broker. At this point, the message is no longer available for consumption from the on-premises broker.

If you access the ActiveMQ console of your Amazon MQ broker and navigate to the Queues page, you see the following for the TestingQ queue:

This means that the message originally sent to your on-premises broker has traversed the unidirectional network bridge of the network of brokers and is ready to be consumed from your Amazon MQ broker. The indicator is the Number of Pending Messages column.

Consume the message from an Amazon MQ broker

Connect to the Amazon MQ TestingQ queue from a consumer within the AWS Cloud environment for message consumption. Log on to the ActiveMQ console of your Amazon MQ broker and navigate to the Queue page:

As you can see, the Number of Pending Messages column figure has changed to 0 as that message has been consumed.

This diagram outlines the message lifecycle from the on-premises producer to the on-premises broker, traversing the hybrid connection between the on-premises broker and Amazon MQ, and finally consumption within the AWS Cloud.

Conclusion

This post focused on an ActiveMQ-specific scenario for transferring messages within an ActiveMQ queue from an on-premises broker to Amazon MQ.

For other on-premises brokers, such as IBM MQ, another approach would be to run an on-premises ActiveMQ broker and use JMS bridging to IBM MQ, while using the approach in this post to forward to Amazon MQ. Yet another approach would be to use Apache Camel for more sophisticated routing.

I hope that you have found this example of hybrid messaging between an on-premises environment and the AWS Cloud to be useful. Many customers are already using on-premises ActiveMQ brokers, and this is a great use case to enable hybrid cloud scenarios.

To learn more, see the Amazon MQ website and Developer Guide. You can try Amazon MQ for free with the AWS Free Tier, which includes up to 750 hours of a single-instance mq.t2.micro broker and up to 1 GB of storage per month for one year.

 

Invoking AWS Lambda from Amazon MQ

Post Syndicated from Tara Van Unen original https://aws.amazon.com/blogs/compute/invoking-aws-lambda-from-amazon-mq/

Contributed by Josh Kahn, AWS Solutions Architect

Message brokers can be used to solve a number of needs in enterprise architectures, including managing workload queues and broadcasting messages to a number of subscribers. Amazon MQ is a managed message broker service for Apache ActiveMQ that makes it easy to set up and operate message brokers in the cloud.

In this post, I discuss one approach to invoking AWS Lambda from queues and topics managed by Amazon MQ brokers. This and other similar patterns can be useful in integrating legacy systems with serverless architectures. You could also integrate systems already migrated to the cloud that use common APIs such as JMS.

For example, imagine that you work for a company that produces training videos and which recently migrated its video management system to AWS. The on-premises system used to publish a message to an ActiveMQ broker when a video was ready for processing by an on-premises transcoder. However, on AWS, your company uses Amazon Elastic Transcoder. Instead of modifying the management system, Lambda polls the broker for new messages and starts a new Elastic Transcoder job. This approach avoids changes to the existing application while refactoring the workload to leverage cloud-native components.

This solution uses Amazon CloudWatch Events to trigger a Lambda function that polls the Amazon MQ broker for messages. Instead of starting an Elastic Transcoder job, the sample writes the received message to an Amazon DynamoDB table with a time stamp indicating the time received.

Getting started

To start, navigate to the Amazon MQ console. Next, launch a new Amazon MQ instance, selecting Single-instance Broker and supplying a broker name, user name, and password. Be sure to document the user name and password for later.

For the purposes of this sample, choose the default options in the Advanced settings section. Your new broker is deployed to the default VPC in the selected AWS Region with the default security group. For this post, you update the security group to allow access for your sample Lambda function. In a production scenario, I recommend deploying both the Lambda function and your Amazon MQ broker in your own VPC.

After several minutes, your instance changes status from “Creation Pending” to “Available.” You can then visit the Details page of your broker to retrieve connection information, including a link to the ActiveMQ web console where you can monitor the status of your broker, publish test messages, and so on. In this example, use the STOMP protocol to connect to your broker. Be sure to capture the broker host name, for example:

<BROKER_ID>.mq.us-east-1.amazonaws.com

You should also modify the Security Group for the broker by clicking on its Security Group ID. Click the Edit button and then click Add Rule to allow inbound traffic on port 8162 for your IP address.

Deploying and scheduling the Lambda function

To simplify the deployment of this example, I’ve provided an AWS Serverless Application Model (SAM) template that deploys the sample function and DynamoDB table, and schedules the function to be invoked every five minutes. Detailed instructions and sample code can be found on GitHub in the amazonmq-invoke-aws-lambda repository. I discuss a few key aspects in this post.

First, SAM makes it easy to deploy and schedule invocation of our function:

SubscriberFunction:
  Type: AWS::Serverless::Function
  Properties:
    CodeUri: subscriber/
    Handler: index.handler
    Runtime: nodejs6.10
    Role: !GetAtt SubscriberFunctionRole.Arn
    Timeout: 15
    Environment:
      Variables:
        HOST: !Ref AmazonMQHost
        LOGIN: !Ref AmazonMQLogin
        PASSWORD: !Ref AmazonMQPassword
        QUEUE_NAME: !Ref AmazonMQQueueName
        WORKER_FUNCTION: !Ref WorkerFunction
    Events:
      Timer:
        Type: Schedule
        Properties:
          Schedule: rate(5 minutes)

WorkerFunction:
  Type: AWS::Serverless::Function
  Properties:
    CodeUri: worker/
    Handler: index.handler
    Runtime: nodejs6.10
    Role: !GetAtt WorkerFunctionRole.Arn
    Environment:
      Variables:
        TABLE_NAME: !Ref MessagesTable

In the code, you include the URI, user name, and password for your newly created Amazon MQ broker. These allow the function to poll the broker for new messages on the sample queue.

The sample Lambda function is written in Node.js, but clients exist for a number of programming languages.

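// `stomp` is the STOMP client library and `options` holds the broker host,
// port, and credentials; both are assumed to be set up elsewhere in the function.
const AWS = require('aws-sdk')

stomp.connect(options, (error, client) => {
    if (error) { /* do something */ }

    // Subscribe to the sample queue with automatic acknowledgement.
    let headers = {
        destination: '/queue/SAMPLE_QUEUE',
        ack: 'auto'
    }

    client.subscribe(headers, (error, message) => {
        if (error) { /* do something */ }

        message.readString('utf-8', (error, body) => {
            if (error) { /* do something */ }

            // MyWorkerFunction is a placeholder for the worker function's name or ARN.
            let params = {
                FunctionName: MyWorkerFunction,
                Payload: JSON.stringify({
                    message: body,
                    timestamp: Date.now()
                })
            }

            // Hand the message to the worker function.
            let lambda = new AWS.Lambda()
            lambda.invoke(params, (error, data) => {
                if (error) { /* do something */ }
            })
        })
    })
})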

Sending a sample message

For the purpose of this example, use the Amazon MQ console to send a test message. Navigate to the details page for your broker.

About midway down the page, choose ActiveMQ Web Console. Next, choose Manage ActiveMQ Broker to launch the admin console. When you are prompted for a user name and password, use the credentials created earlier.

At the top of the page, choose Send. From here, you can send a sample message from the broker to subscribers. For this example, this is how you generate traffic to test the end-to-end system. Be sure to set the Destination value to “SAMPLE_QUEUE.” The message body can contain any text. Choose Send.

You now have a Lambda function polling for messages on the broker. To verify that your function is working, you can confirm in the DynamoDB console that the message was successfully received and processed by the sample Lambda function.

First, choose Tables on the left and select the table name “amazonmq-messages” in the middle section. With the table detail in view, choose Items. If the function was successful, you’ll find a new entry similar to the following:

If there is no message in DynamoDB, check again in a few minutes or review the CloudWatch Logs group for Lambda functions that contain debug messages.

Alternative approaches

Beyond the approach described here, you may consider other approaches as well. For example, you could use an intermediary system such as Apache Flume to pass messages from the broker to Lambda or deploy Apache Camel to trigger Lambda via a POST to API Gateway. There are trade-offs to each of these approaches. My goal in using CloudWatch Events was to introduce an easily repeatable pattern familiar to many Lambda developers.

Summary

I hope that you have found this example of how to integrate AWS Lambda with Amazon MQ useful. If you have expertise or legacy systems that leverage APIs such as JMS, you may find this useful as you incorporate serverless concepts in your enterprise architectures.

To learn more, see the Amazon MQ website and Developer Guide. You can try Amazon MQ for free with the AWS Free Tier, which includes up to 750 hours of a single-instance mq.t2.micro broker and up to 1 GB of storage per month for one year.

Cross-Account Integration with Amazon SNS

Post Syndicated from Christie Gifrin original https://aws.amazon.com/blogs/compute/cross-account-integration-with-amazon-sns/

Contributed by Zak Islam, Senior Manager, Software Development, AWS Messaging

 

Amazon Simple Notification Service (Amazon SNS) is a fully managed AWS service that makes it easy to decouple your application components and fan-out messages. SNS provides topics (similar to topics in message brokers such as RabbitMQ or ActiveMQ) that you can use to create 1:1, 1:N, or N:N producer/consumer design patterns. For more information about how to send messages from SNS to Amazon SQS, AWS Lambda, or HTTP(S) endpoints in the same account, see Sending Amazon SNS Messages to Amazon SQS Queues.

SNS can be used to send messages within a single account or to resources in different accounts to create administrative isolation. This enables administrators to grant only the minimum level of permissions required to process a workload (for example, limiting the scope of your application account to only send messages and to deny deletes). This approach is commonly known as the “principle of least privilege.” If you are interested, read more about AWS’s multi-account security strategy.

This is great from a security perspective, but why would you want to share messages between accounts? It may sound scary, but it’s a common practice to isolate application components (such as producer and consumer) to operate using different AWS accounts to lock down privileges in case credentials are exposed. In this post, I go slightly deeper and explore how to set up your SNS topic so that it can route messages to SQS queues that are owned by a separate AWS account.

Potential use cases

First, look at a common order processing design pattern:

This is a simple architecture. A web server submits an order directly to an SNS topic, which then fans out messages to two SQS queues. One SQS queue is used to track all incoming orders for audits (such as anti-entropy, comparing the data of all replicas and updating each replica to the newest version). The other is used to pass the request to the order processing systems.

Imagine now that a few years have passed, and your downstream processes no longer scale, so you are kicking around the idea of a re-architecture project. To thoroughly test your system, you need a way to replay your production messages in your development system. Sure, you can build a system to replicate and replay orders from your production environment in your development environment. Wouldn’t it be easier to subscribe your development queues to the production SNS topic so you can test your new system in real time? That’s exactly what you can do here.

Here’s another use case. As your business grows, you recognize the need for more metrics from your order processing pipeline. The analytics team at your company has built a metrics aggregation service and ingests data via a central SQS queue. Their architecture is as follows:

Again, it’s a fairly simple architecture. All data is ingested via SQS queues (master_ingest_queue, in this case). You subscribe the master_ingest_queue, running under the analytics team’s AWS account, to the topic that is in the order management team’s account.

Making it work

Now that you’ve seen a few scenarios, let’s dig into the details. There are a couple of ways to link an SQS queue to an SNS topic (subscribe a queue to a topic):

  1. The queue owner can create a subscription to the topic.
  2. The topic owner can subscribe a queue in another account to the topic.

Queue owner subscription

What happens when the queue owner subscribes to a topic? In this case, assume that the topic owner has given permission to the subscriber’s account to call the Subscribe API action using the topic ARN (Amazon Resource Name). For the examples below, also assume the following:

  •  Topic_Owner is the identifier for the account that owns the topic MainTopic
  • Queue_Owner is the identifier for the account that owns the queue subscribed to the main topic

To enable the subscriber to subscribe to a topic, the topic owner must add the sns:Subscribe action and the topic ARN to the topic policy via the AWS Management Console, as follows:

{
  "Version":"2012-10-17",
  "Id":"MyTopicSubscribePolicy",
  "Statement":[{
      "Sid":"Allow-other-account-to-subscribe-to-topic",
      "Effect":"Allow",
      "Principal":{
        "AWS":"Queue_Owner"
      },
      "Action":"sns:Subscribe",
      "Resource":"arn:aws:sns:us-east-1:Topic_Owner:MainTopic"
    }
  ]
}

After this has been set up, the subscriber (using account Queue_Owner) can call Subscribe to link the queue to the topic. After the queue has been successfully subscribed, SNS starts to publish notifications. In this case, neither the topic owner nor the subscriber has had to process any kind of confirmation message.
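
As a rough sketch of the two halves of this exchange, the following JavaScript builds the topic policy that the topic owner attaches and the parameters that the queue owner would pass to the Subscribe action. The helper names, the 12-digit account IDs, and the queue name MainQueue are placeholders introduced for illustration; they are not part of the original example.

```javascript
// Hypothetical helpers; account IDs, region, and resource names are placeholders.

// Topic policy that the topic owner attaches to the topic. It allows the
// subscriber's account to call sns:Subscribe on the topic owner's topic.
function buildSubscribePolicy(topicArn, subscriberAccountId) {
  return {
    Version: '2012-10-17',
    Id: 'MyTopicSubscribePolicy',
    Statement: [{
      Sid: 'Allow-other-account-to-subscribe-to-topic',
      Effect: 'Allow',
      Principal: { AWS: subscriberAccountId },
      Action: 'sns:Subscribe',
      Resource: topicArn
    }]
  };
}

// Parameters the queue owner passes to the SNS Subscribe action.
// For an SQS subscription, the endpoint is the queue ARN.
function buildSubscribeParams(topicArn, queueArn) {
  return { TopicArn: topicArn, Protocol: 'sqs', Endpoint: queueArn };
}

const topicArn = 'arn:aws:sns:us-east-1:111111111111:MainTopic'; // topic owner's account
const queueArn = 'arn:aws:sqs:us-east-1:222222222222:MainQueue'; // queue owner's account
const policy = buildSubscribePolicy(topicArn, '222222222222');
const params = buildSubscribeParams(topicArn, queueArn);

console.log(JSON.stringify(policy, null, 2));
console.log(JSON.stringify(params, null, 2));
```

Keeping the two objects side by side makes it easy to check that the principal in the policy is the account that owns the queue, while the resource is the topic in the topic owner's account.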

Topic owner subscription

The second way to subscribe an SQS queue to an SNS topic is to have the Topic_Owner account initiate the subscription for the queue from account Queue_Owner. In this case, SNS first sends a confirmation message to the queue. To confirm the subscription, a user who can read messages from the queue must visit the URL specified in the SubscribeURL value in the message. Until the subscription is confirmed, no notifications published to the topic are sent to the queue. To confirm a subscription, you can use the SQS console or the ReceiveMessage API action.
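
The confirmation step can be sketched as follows. The function takes the body of a message read from the queue and returns the SubscribeURL if the message is a subscription confirmation. The function name and the abbreviated sample body are assumptions for illustration; a real confirmation message carries additional fields, and the URL shown is a placeholder.

```javascript
// Hypothetical helper: returns the confirmation URL from a message body,
// or null if the message is not a subscription confirmation.
function getSubscribeUrl(messageBody) {
  let notification;
  try {
    notification = JSON.parse(messageBody);
  } catch (err) {
    return null; // not JSON, so not an SNS confirmation message
  }
  if (!notification || notification.Type !== 'SubscriptionConfirmation') {
    return null;
  }
  return notification.SubscribeURL || null;
}

// Abbreviated sample body with placeholder values.
const sampleBody = JSON.stringify({
  Type: 'SubscriptionConfirmation',
  TopicArn: 'arn:aws:sns:us-east-1:111111111111:MainTopic',
  SubscribeURL: 'https://example.com/confirm?Token=placeholder'
});

console.log(getSubscribeUrl(sampleBody));
console.log(getSubscribeUrl(JSON.stringify({ Type: 'Notification', Message: 'order' })));
```

A user with access to the queue would then visit the returned URL to complete the subscription.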

What’s next?

In this post, I covered a few simple use cases but the principles can be extended to complex systems as well. As you architect new systems and refactor existing ones, think about where you can leverage queues (SQS) and topics (SNS) to build a loosely coupled system that can be quickly and easily extended to meet your business need.

For step-by-step instructions, see Sending Amazon SNS messages to an Amazon SQS queue in a different account.

Introducing Cost Allocation Tags for Amazon SQS

Post Syndicated from Jeff Barr original https://aws.amazon.com/blogs/aws/introducing-cost-allocation-tags-for-amazon-sqs/

You have long had the ability to tag your AWS resources and to see cost breakouts on a per-tag basis. Cost allocation was launched in 2012 (see AWS Cost Allocation for Customer Bills) and we have steadily added support for additional services, most recently DynamoDB (Introducing Cost Allocation Tags for Amazon DynamoDB), Lambda (AWS Lambda Supports Tagging and Cost Allocations), and EBS (New – Cost Allocation for AWS Snapshots).

Today, we are launching tag-based cost allocation for Amazon Simple Queue Service (SQS). You can now assign tags to your queues and use them to manage your costs at any desired level: application, application stage (for a loosely coupled application that communicates via queues), project, department, or developer. After you have tagged your queues, you can use the AWS Tag Editor to search queues that have tags of interest.

Here’s how I would add three tags (app, stage, and department) to one of my queues:
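
Tags with the same three keys could also be applied programmatically. The sketch below only builds the request parameters for the SQS TagQueue action (a queue URL plus a map of tag keys to values); the helper name, the queue URL, and the tag values are placeholders for illustration.

```javascript
// Hypothetical helper: builds the parameters for the SQS TagQueue action.
function buildTagQueueParams(queueUrl, tags) {
  // Copy the tags so later changes to the caller's object do not leak in.
  return { QueueUrl: queueUrl, Tags: Object.assign({}, tags) };
}

const tagParams = buildTagQueueParams(
  'https://sqs.us-east-1.amazonaws.com/111111111111/my-queue', // placeholder URL
  { app: 'orders', stage: 'production', department: 'sales' }  // placeholder values
);

console.log(JSON.stringify(tagParams, null, 2));
```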

This feature is available now in all AWS Regions and you can start using it today! To learn more about tagging, read Tagging Your Amazon SQS Queues. To learn more about cost allocation via tags, read Using Cost Allocation Tags. To learn more about how to use message queues to build loosely coupled microservices for modern applications, read our blog post (Building Loosely Coupled, Scalable, C# Applications with Amazon SQS and Amazon SNS) and watch the recording of our recent webinar, Decouple and Scale Applications Using Amazon SQS and Amazon SNS.

If you are coming to AWS re:Invent, plan to attend session ARC 330: How the BBC Built a Massive Media Pipeline Using Microservices. In the talk you will find out how they used SNS and SQS to improve the elasticity and reliability of the BBC iPlayer architecture.

Jeff;

Dynamic Users with systemd

Post Syndicated from Lennart Poettering original http://0pointer.net/blog/dynamic-users-with-systemd.html

TL;DR: you may now configure systemd to dynamically allocate a UNIX
user ID for service processes when it starts them and release it when
it stops them. It’s pretty secure, mixes well with transient services,
socket activated services and service templating.

Today we released systemd 235. Among other improvements this greatly
extends the dynamic user logic of systemd. Dynamic users are a
powerful but little known concept, supported in its basic form since
systemd 232. With this blog story I hope to make it a bit better
known.

The UNIX user concept is the most basic and well-understood security
concept in POSIX operating systems. It is UNIX/POSIX’ primary security
concept, the one everybody can agree on, and most security concepts
that came after it (such as process capabilities, SELinux and other
MACs, user name-spaces, …) in some form or another build on it, extend
it or at least interface with it. If you build a Linux kernel with all
security features turned off, the user concept is pretty much the one
you’ll still retain.

Originally, the user concept was introduced to make multi-user systems
a reality, i.e. systems enabling multiple human users to share the
same system at the same time, cleanly separating their resources and
protecting them from each other. The majority of today’s UNIX systems
don’t really use the user concept like that anymore though. Most of
today’s systems probably have only one actual human user (or even
less!), but their user databases (/etc/passwd) list a good number
more entries than that. Today, the majority of UNIX users in most
environments are system users, i.e. users that are not the technical
representation of a human sitting in front of a PC anymore, but the
security identity a system service — an executable program — runs
as. Even though traditional, simultaneous multi-user systems slowly
became less relevant, their ground-breaking basic concept became the
cornerstone of UNIX security. The OS is nowadays partitioned into
isolated services — and each service runs as its own system user, and
thus within its own, minimal security context.

The people behind the Android OS realized the relevance of the UNIX
user concept as the primary security concept on UNIX, and took its use
even further: on Android not only system services take benefit of the
UNIX user concept, but each UI app gets its own, individual user
identity too — thus neatly separating app resources from each other,
and protecting app processes from each other, too.

Back in the more traditional Linux world things are a bit less
advanced in this area. Even though users are the quintessential UNIX
security concept, allocation and management of system users is still a
pretty limited, raw and static affair. In most cases, RPM or DEB
package installation scripts allocate a fixed number of (usually one)
system users when you install the package of a service that wants to
take benefit of the user concept, and from that point on the system
user remains allocated on the system and is never deallocated again,
even if the package is later removed again. Most Linux distributions
limit the number of system users to 1000 (which isn’t particularly a
lot). Allocating a system user is hence expensive: the number of
available users is limited, and there’s no defined way to dispose of
them after use. If you make use of system users too liberally, you are
very likely to run out of them sooner rather than later.

You may wonder why system users are generally not deallocated when the
package that registered them is uninstalled from a system (at least on
most distributions). The reason for that is one relevant property of
the user concept (you might even want to call this a design flaw):
user IDs are sticky to files (and other objects such as IPC
objects). If a service running as a specific system user creates a
file at some location, and is then terminated and its package and user
removed, then the created file still belongs to the numeric ID (“UID”)
the system user originally got assigned. When the next system user is
allocated and — due to ID recycling — happens to get assigned the same
numeric ID, then it will also gain access to the file, and that’s
generally considered a problem, given that the file belonged to a
potentially very different service once upon a time, and likely should
not be readable or changeable by anything coming after
it. Distributions hence tend to avoid UID recycling which means system
users remain registered forever on a system after they have been
allocated once.

The above is a description of the status quo ante. Let’s now focus on
what systemd’s dynamic user concept brings to the table, to improve
the situation.

Introducing Dynamic Users

With systemd dynamic users we hope to make it easier and cheaper
to allocate system users on-the-fly, thus substantially increasing the
possible uses of this core UNIX security concept.

If you write a systemd service unit file, you may enable the dynamic
user logic for it by setting the DynamicUser= option in its [Service]
section to yes. If you do, a system user is dynamically allocated the
instant the service binary is invoked, and released again when the
service terminates. The user is automatically allocated from the UID
range 61184–65519, by looking for a so far unused UID.

Now you may wonder, how does this concept deal with the sticky user
issue discussed above? In order to counter the problem, two strategies
easily come to mind:

  1. Prohibit the service from creating any files/directories or IPC objects

  2. Automatically remove the files/directories or IPC objects the
    service created when it shuts down.

In systemd we implemented both strategies, but for different parts of
the execution environment. Specifically:

  1. Setting DynamicUser=yes implies ProtectSystem=strict and
    ProtectHome=read-only. These sand-boxing options turn off write
    access to pretty much the whole OS directory tree, with a few
    relevant exceptions, such as the API file systems /proc, /sys and
    so on, as well as /tmp and /var/tmp. (BTW: setting these two
    options on your regular services that do not use DynamicUser= is a
    good idea too, as it drastically reduces the exposure of the
    system to exploited services.)

  2. Setting DynamicUser=yes implies PrivateTmp=yes. This option sets
    up /tmp and /var/tmp for the service in a way that it gets its own,
    disconnected version of these directories, that are not shared by
    other services, and whose life-cycle is bound to the service’s own
    life-cycle. Thus if the service goes down, the user is removed and
    all its temporary files and directories with it. (BTW: as above,
    consider setting this option for your regular services that do not
    use DynamicUser= too, it’s a great way to lock things down
    security-wise.)

  3. Setting DynamicUser=yes implies RemoveIPC=yes. This option ensures
    that when the service goes down all SysV and POSIX IPC objects
    (shared memory, message queues, semaphores) owned by the service’s
    user are removed. Thus, the life-cycle of the IPC objects is bound
    to the life-cycle of the dynamic user and service, too. (BTW: yes,
    here too, consider using this in your regular services, too!)

With these four settings in effect, services with dynamic users are
nicely sand-boxed. They cannot create files or directories, except in
/tmp and /var/tmp, where they will be removed automatically when
the service shuts down, as will any IPC objects created. Sticky
ownership of files/directories and IPC objects is hence dealt with
effectively.

The RuntimeDirectory= option may be used to open up the sandbox a bit
to external programs. If you set it to a directory name of your
choice, it will be
created below /run when the service is started, and removed in its
entirety when it is terminated. The ownership of the directory is
assigned to the service’s dynamic user. This way, a dynamic user
service can expose API interfaces (AF_UNIX sockets, …) to other
services at a well-defined place and again bind the life-cycle of it to
the service’s own run-time. Example: set RuntimeDirectory=foobar in
your service, and watch how a directory /run/foobar appears at the
moment you start the service, and disappears the moment you stop
it again. (BTW: Much like the other settings discussed above,
RuntimeDirectory= may be used outside of the DynamicUser= context
too, and is a nice way to run any service with a properly owned,
life-cycle-managed run-time directory.)

Persistent Data

Of course, a service running in such an environment (although already
very useful for many cases!), has a major limitation: it cannot leave
persistent data around it can reuse on a later run. As pretty much the
whole OS directory tree is read-only to it, there’s simply no place it
could put the data that survives from one service invocation to the
next.

With systemd 235 this limitation is removed: there are now three new
settings:
StateDirectory=,
LogsDirectory= and CacheDirectory=. In many ways they operate like
RuntimeDirectory=, but create sub-directories below /var/lib,
/var/log and /var/cache, respectively. There’s one major
difference beyond that however: directories created that way are
persistent, they will survive the run-time cycle of a service, and
thus may be used to store data that is supposed to stay around between
invocations of the service.

Of course, the obvious question to ask now is: how do these three
settings deal with the sticky file ownership problem?

For that we lifted a concept from container managers. Container
managers have a very similar problem: each container and the host
typically end up using a very similar set of numeric UIDs, and unless
user name-spacing is deployed this means that host users might be able
to access the data of specific containers that also have a user by the
same numeric UID assigned, even though it actually refers to a very
different identity in a different context. (Actually, it’s even worse
than just getting access, due to the existence of setuid file bits,
access might translate to privilege elevation.) The way container
managers protect the container images from the host (and from each
other to some level) is by placing the container trees below a
boundary directory, with very restrictive access modes and ownership
(0700 and root:root or so). A host user hence cannot take advantage
of the files/directories of a container user of the same UID inside of
a local container tree, simply because the boundary directory makes it
impossible to even reference files in it. After all on UNIX, in order
to get access to a specific path you need access to every single
component of it.

How is that applied to dynamic user services? Let’s say
StateDirectory=foobar is set for a service that has DynamicUser=
turned off. The instant the service is started, /var/lib/foobar is
created as state directory, owned by the service’s user and remains in
existence when the service is stopped. If the same service now is run
with DynamicUser= turned on, the implementation is slightly
altered. Instead of a directory /var/lib/foobar a symbolic link by
the same path is created (owned by root), pointing to
/var/lib/private/foobar (the latter being owned by the service’s
dynamic user). The /var/lib/private directory is created as boundary
directory: it’s owned by root:root, and has a restrictive access
mode of 0700. Both the symlink and the service’s state directory will
survive the service’s life-cycle, but the state directory will remain,
and continues to be owned by the now disposed dynamic UID — however it
is protected from other host users (and other services which might get
the same dynamic UID assigned due to UID recycling) by the boundary
directory.
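
The effect of the boundary directory can be modelled in a few lines.
The sketch below is a simplified permission check written for this
illustration, not systemd or kernel code: it ignores supplementary
groups, ACLs and capabilities, and the directory table and the UID
63122 are made-up sample values.

```javascript
// Simplified model: may `user` search (enter) the directory `dir`?
function mayEnter(dir, user) {
  if (user.uid === 0) return true;                           // root bypasses mode bits
  if (user.uid === dir.uid) return (dir.mode & 0o100) !== 0; // owner execute bit
  if (user.gid === dir.gid) return (dir.mode & 0o010) !== 0; // group execute bit
  return (dir.mode & 0o001) !== 0;                           // others execute bit
}

// A path is reachable only if every directory leading to it may be entered.
function canReach(path, user, dirs) {
  const parts = path.split('/').filter(Boolean);
  let current = '';
  for (const part of parts.slice(0, -1)) {
    current += '/' + part;
    const dir = dirs[current];
    if (!dir || !mayEnter(dir, user)) return false;
  }
  return true;
}

// Sample layout as seen from the host (values are illustrative).
const dirs = {
  '/var': { uid: 0, gid: 0, mode: 0o755 },
  '/var/lib': { uid: 0, gid: 0, mode: 0o755 },
  '/var/lib/private': { uid: 0, gid: 0, mode: 0o700 }, // boundary directory
  '/var/lib/private/foobar': { uid: 63122, gid: 63122, mode: 0o755 }
};

// A later, unrelated user that happens to get the same numeric UID.
const recycled = { uid: 63122, gid: 63122 };
const root = { uid: 0, gid: 0 };

console.log(canReach('/var/lib/private/foobar/test', recycled, dirs));
console.log(canReach('/var/lib/private/foobar/test', root, dirs));
```

Even though the recycled UID matches the owner of the state
directory, the lookup already fails at /var/lib/private, which is
exactly the protection the boundary directory is meant to provide.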

The obvious question to ask now is: but if the boundary directory
prohibits access to the directory from unprivileged processes, how can
the service itself which runs under its own dynamic UID access it
anyway? This is achieved by invoking the service process in a slightly
modified mount name-space: it will see most of the file hierarchy the
same way as everything else on the system (modulo /tmp and
/var/tmp as mentioned above), except for /var/lib/private, which
is over-mounted with a read-only tmpfs file system instance, with a
slightly more liberal access mode permitting the service read
access. Inside of this tmpfs file system instance another mount is
placed: a bind mount to the host’s real /var/lib/private/foobar
directory, onto the same name. Putting this together, this means that
superficially everything looks the same and is available at the same
place on the host and from inside the service, but two important
changes have been made: the /var/lib/private boundary directory lost
its restrictive character inside the service, and has been emptied of
the state directories of any other service, thus making the protection
complete. Note that the symlink /var/lib/foobar hides the fact that
the boundary directory is used (making it little more than an
implementation detail), as the directory is available this way under
the same name as it would be if DynamicUser= was not used. Long
story short: for the daemon and from the view from the host the
indirection through /var/lib/private is mostly transparent.

This logic of course raises another question: what happens to the
state directory if a dynamic user service is started with a state
directory configured, gets UID X assigned on this first invocation,
then terminates and is restarted and now gets UID Y assigned on the
second invocation, with X ≠ Y? On the second invocation the directory
— and all the files and directories below it — will still be owned by
the original UID X so how could the second instance running as Y
access it? Our way out is simple: systemd will recursively change the
ownership of the directory and everything contained within it to UID Y
before invoking the service’s executable.
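
To give an idea of what such a recursive ownership adjustment
involves, here is a minimal sketch using Node.js’ fs module. It is not
systemd’s implementation; the function name is made up, and changing
ownership to a different UID requires sufficient privileges.

```javascript
const fs = require('fs');
const path = require('path');

// Walk a directory tree and change the ownership of every entry that is
// not already owned by uid:gid. Returns the number of entries changed.
function chownRecursive(root, uid, gid) {
  let changed = 0;
  const stack = [root];
  while (stack.length > 0) {
    const current = stack.pop();
    const st = fs.lstatSync(current); // lstat: do not follow symlinks
    if (st.uid !== uid || st.gid !== gid) {
      fs.lchownSync(current, uid, gid);
      changed++;
    }
    if (st.isDirectory()) {
      for (const name of fs.readdirSync(current)) {
        stack.push(path.join(current, name));
      }
    }
  }
  return changed;
}
```

Calling chownRecursive() on a tree that is already owned by the
requested UID and GID changes nothing and returns 0, which is the
cheap case the allocation logic tries to hit.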

Of course, such recursive ownership changing (chown()ing) of whole
directory trees can become expensive (though according to my
experiences, IRL and for most services it’s much cheaper than you
might think), hence in order to optimize behavior in this regard, the
allocation of dynamic UIDs has been tweaked in two ways to avoid the
necessity to do this expensive operation in most cases: firstly, when
a dynamic UID is allocated for a service an allocation loop is
employed that starts out with a UID hashed from the service’s
name. This means a service by the same name is likely to always use
the same numeric UID. That means that a stable service name translates
into a stable dynamic UID, and that means recursive file ownership
adjustments can be skipped (of course, after validation). Secondly, if
the configured state directory already exists, and is owned by a
suitable currently unused dynamic UID, it’s preferably used above
everything else, thus maximizing the chance we can avoid the
chown()ing. (That all said, ultimately we have to face it, the
currently available UID space of 4K+ is very small still, and
conflicts are pretty likely sooner or later, thus a chown()ing has to
be expected every now and then when this feature is used extensively).
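
The allocation strategy described above might be sketched like this.
It is a simplified model, not systemd’s code: the FNV-1a hash and the
linear probing are stand-ins chosen for illustration (the text above
does not say which hash is used or how the loop continues after a
collision), and the inUse set stands for whatever lookup tells us that
a UID is already taken.

```javascript
const UID_MIN = 61184;
const UID_MAX = 65519;

// Illustrative 32-bit FNV-1a string hash (an assumption, see above).
function hashName(name) {
  let h = 0x811c9dc5;
  for (let i = 0; i < name.length; i++) {
    h ^= name.charCodeAt(i);
    h = Math.imul(h, 0x01000193) >>> 0; // keep the value an unsigned 32-bit integer
  }
  return h;
}

// Pick a dynamic UID for a service.
//   inUse         - Set of UIDs that are currently taken
//   stateDirOwner - optional UID owning an existing state directory
function allocateDynamicUid(serviceName, inUse, stateDirOwner) {
  // Prefer the owner of an existing state directory, if it is suitable.
  if (stateDirOwner !== undefined &&
      stateDirOwner >= UID_MIN && stateDirOwner <= UID_MAX &&
      !inUse.has(stateDirOwner)) {
    return stateDirOwner;
  }
  // Otherwise start at a UID derived from the service name and search on.
  const span = UID_MAX - UID_MIN + 1;
  const start = hashName(serviceName) % span;
  for (let i = 0; i < span; i++) {
    const candidate = UID_MIN + ((start + i) % span);
    if (!inUse.has(candidate)) return candidate;
  }
  throw new Error('dynamic UID range exhausted');
}

const first = allocateDynamicUid('dynamic-user-test', new Set());
const again = allocateDynamicUid('dynamic-user-test', new Set());
console.log(first === again); // a stable name yields a stable UID
```

Because the starting point depends only on the service name, restarts
of the same service tend to land on the same UID as long as nothing
else has taken it in the meantime.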

Note that CacheDirectory= and LogsDirectory= work very similarly to
StateDirectory=. The only difference is that they manage directories
below the /var/cache and /var/log directories, and their boundary
directories hence are /var/cache/private and /var/log/private,
respectively.

Examples

So, after all this introduction, let’s have a look at how this all can
be put together. Here’s a trivial example:

# cat > /etc/systemd/system/dynamic-user-test.service <<EOF
[Service]
ExecStart=/usr/bin/sleep 4711
DynamicUser=yes
EOF
# systemctl daemon-reload
# systemctl start dynamic-user-test
# systemctl status dynamic-user-test
● dynamic-user-test.service
   Loaded: loaded (/etc/systemd/system/dynamic-user-test.service; static; vendor preset: disabled)
   Active: active (running) since Fri 2017-10-06 13:12:25 CEST; 3s ago
 Main PID: 2967 (sleep)
    Tasks: 1 (limit: 4915)
   CGroup: /system.slice/dynamic-user-test.service
           └─2967 /usr/bin/sleep 4711

Okt 06 13:12:25 sigma systemd[1]: Started dynamic-user-test.service.
# ps -e -o pid,comm,user | grep 2967
 2967 sleep           dynamic-user-test
# id dynamic-user-test
uid=64642(dynamic-user-test) gid=64642(dynamic-user-test) groups=64642(dynamic-user-test)
# systemctl stop dynamic-user-test
# id dynamic-user-test
id: ‘dynamic-user-test’: no such user

In this example, we create a unit file with DynamicUser= turned on,
start it, check if it’s running correctly, have a look at the service
process’ user (which is named like the service; systemd does this
automatically if the service name is suitable as user name, and you
didn’t configure any user name to use explicitly), stop the service
and verify that the user ceased to exist too.

That’s already pretty cool. Let’s step it up a notch, by doing the
same in an interactive transient service (for those who don’t know
systemd well: a transient service is a service that is defined and
started dynamically at run-time, for example via the systemd-run
command from the shell. Think: run a service without having to write a
unit file first):

# systemd-run --pty --property=DynamicUser=yes --property=StateDirectory=wuff /bin/sh
Running as unit: run-u15750.service
Press ^] three times within 1s to disconnect TTY.
sh-4.4$ id
uid=63122(run-u15750) gid=63122(run-u15750) groups=63122(run-u15750) context=system_u:system_r:initrc_t:s0
sh-4.4$ ls -al /var/lib/private/
total 0
drwxr-xr-x. 3 root       root        60  6. Okt 13:21 .
drwxr-xr-x. 1 root       root       852  6. Okt 13:21 ..
drwxr-xr-x. 1 run-u15750 run-u15750   8  6. Okt 13:22 wuff
sh-4.4$ ls -ld /var/lib/wuff
lrwxrwxrwx. 1 root root 12  6. Okt 13:21 /var/lib/wuff -> private/wuff
sh-4.4$ ls -ld /var/lib/wuff/
drwxr-xr-x. 1 run-u15750 run-u15750 0  6. Okt 13:21 /var/lib/wuff/
sh-4.4$ echo hello > /var/lib/wuff/test
sh-4.4$ exit
exit
# id run-u15750
id: ‘run-u15750’: no such user
# ls -al /var/lib/private
total 0
drwx------. 1 root  root   66  6. Okt 13:21 .
drwxr-xr-x. 1 root  root  852  6. Okt 13:21 ..
drwxr-xr-x. 1 63122 63122   8  6. Okt 13:22 wuff
# ls -ld /var/lib/wuff
lrwxrwxrwx. 1 root root 12  6. Okt 13:21 /var/lib/wuff -> private/wuff
# ls -ld /var/lib/wuff/
drwxr-xr-x. 1 63122 63122 8  6. Okt 13:22 /var/lib/wuff/
# cat /var/lib/wuff/test
hello

The above invokes an interactive shell as transient service
run-u15750.service (systemd-run picked that name automatically,
since we didn’t specify anything explicitly) with a dynamic user whose
name is derived automatically from the service name. Because
StateDirectory=wuff is used, a persistent state directory for the
service is made available as /var/lib/wuff. In the interactive shell
running inside the service, the ls commands show the
/var/lib/private boundary directory and its contents, as well as the
symlink that is placed for the service. Finally, before exiting the
shell, a file is created in the state directory. Back in the original
command shell we check if the user is still allocated: it is not, of
course, since the service ceased to exist when we exited the shell and
with it the dynamic user associated with it. From the host we check
the state directory of the service, with similar commands as we did
from inside of it. We see that things are set up pretty much the same
way in both cases, except for two things: first of all the user/group
of the files is now shown as raw numeric UIDs instead of the
user/group names derived from the unit name. That’s because the user
ceased to exist at this point, and “ls” shows the raw UID for files
owned by users that don’t exist. Secondly, the access mode of the
boundary directory is different: when we look at it from outside of
the service it is not readable by anyone but root, when we looked from
inside we saw it being world-readable.

Now, let’s see how things look if we start another transient service,
reusing the state directory from the first invocation:

# systemd-run --pty --property=DynamicUser=yes --property=StateDirectory=wuff /bin/sh
Running as unit: run-u16087.service
Press ^] three times within 1s to disconnect TTY.
sh-4.4$ cat /var/lib/wuff/test
hello
sh-4.4$ ls -al /var/lib/wuff/
total 4
drwxr-xr-x. 1 run-u16087 run-u16087  8  6. Okt 13:22 .
drwxr-xr-x. 3 root       root       60  6. Okt 15:42 ..
-rw-r--r--. 1 run-u16087 run-u16087  6  6. Okt 13:22 test
sh-4.4$ id
uid=63122(run-u16087) gid=63122(run-u16087) groups=63122(run-u16087) context=system_u:system_r:initrc_t:s0
sh-4.4$ exit
exit

Here, systemd-run picked a different auto-generated unit name, but
the used dynamic UID is still the same, as it was read from the
pre-existing state directory, and was otherwise unused. As we can see
the test file we generated earlier is accessible and still contains
the data we left in there. Do note that the user name is different
this time (as it is derived from the unit name, which is different),
but the UID it is assigned to is the same one as on the first
invocation. We can thus see that the mentioned optimization of the UID
allocation logic (i.e. that we start the allocation loop from the UID
owner of any existing state directory) took effect, so that no
recursive chown()ing was required.

And that’s the end of our example, which hopefully illustrated a bit
how this concept and implementation works.

Use-cases

Now that we had a look at how to enable this logic for a unit and how
it is implemented, let’s discuss where this actually could be useful
in real life.

  • One major benefit of dynamic user IDs is that running a
    privilege-separated service leaves no artifacts in the system. A
    system user is allocated and made use of, but it is discarded
    automatically in a safe and secure way after use, in a fashion that is
    safe for later recycling. Thus, quickly invoking a short-lived service
    for processing some job can be protected properly through a user ID
    without having to pre-allocate it and without this draining the
    available UID pool any longer than necessary.

  • In many cases, starting a service no longer requires
    package-specific preparation. Or in other words, quite often
    useradd/mkdir/chown/chmod invocations in “post-inst” package
    scripts, as well as
    sysusers.d
    and
    tmpfiles.d
    drop-ins become unnecessary, as the DynamicUser= and
    StateDirectory=/CacheDirectory=/LogsDirectory= logic can do the
    necessary work automatically, on-demand and with a well-defined
    life-cycle.

  • By combining dynamic user IDs with the transient unit concept, new
    creative ways of sand-boxing are made available. For example, let’s say
    you don’t trust the correct implementation of the sort command. You
    can now lock it into a simple, robust, dynamic UID sandbox with a
    simple systemd-run and still integrate it into a shell pipeline like
    any other command. Here’s an example, showcasing a shell pipeline
    whose middle element runs under a UID that is allocated dynamically,
    on the fly, and released when the pipeline ends.

    # cat some-file.txt | systemd-run --pipe --property=DynamicUser=1 sort -u | grep -i foobar > some-other-file.txt
    
  • By combining dynamic user IDs with the systemd templating logic it
    is now possible to do much more fine-grained and fully automatic UID
    management. For example, let’s say you have a template unit file
    /etc/systemd/system/foobard@.service:

    [Service]
    ExecStart=/usr/bin/myfoobarserviced
    DynamicUser=1
    StateDirectory=foobar/%i
    

    Now, let’s say you want to start one instance of this service for
    each of your customers. All you need to do now for that is:

    # systemctl enable foobard@customerxyz.service --now
    

    And you are done. (Invoke this as many times as you like, each time
    replacing customerxyz by some customer identifier, you get the
    idea.)

  • By combining dynamic user IDs with socket activation you may easily
    implement a system where each incoming connection is served by a
    process instance running as a different, fresh, newly allocated UID
    within its own sandbox. Here’s an example waldo.socket:

    [Socket]
    ListenStream=2048
    Accept=yes
    

    With a matching waldo@.service:

    [Service]
    ExecStart=-/usr/bin/myservicebinary
    DynamicUser=yes
    

    With the two unit files above, systemd will listen on TCP/IP port
    2048, and for each incoming connection invoke a fresh instance of
    waldo@.service, each time utilizing a different, new,
    dynamically allocated UID, neatly isolated from any other
    instance.

  • Dynamic user IDs combine very well with state-less systems,
    i.e. systems that come up with an unpopulated /etc and /var. A
    service using dynamic user IDs and the StateDirectory=,
    CacheDirectory=, LogsDirectory= and RuntimeDirectory= concepts
    will implicitly allocate the users and directories it needs for
    running, right at the moment where it needs it.

Dynamic users are a very generic concept, hence a multitude of other
uses are thinkable; the list above is just supposed to trigger your
imagination.
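
To make the pipeline use-case a bit more tangible, here is a minimal sketch of how a script could wrap an arbitrary filter command in the same kind of sandbox. It only uses the systemd-run switches shown in the example above (--pipe and --property=DynamicUser=1); the helper names sandboxed_argv and run_sandboxed are made up for illustration, and actually running the command needs the same privileges as the shell example.

```python
import subprocess


def sandboxed_argv(command, extra_properties=()):
    """Build a systemd-run command line that runs `command` as a dynamic user."""
    argv = ["systemd-run", "--pipe", "--property=DynamicUser=1"]
    # Further unit properties (hypothetical extras) are passed the same way.
    argv += [f"--property={prop}" for prop in extra_properties]
    return argv + list(command)


def run_sandboxed(command, input_bytes):
    """Feed input_bytes to the sandboxed command and return what it wrote to stdout."""
    result = subprocess.run(
        sandboxed_argv(command),
        input=input_bytes,
        stdout=subprocess.PIPE,
        check=True,  # raise if systemd-run or the command fails
    )
    return result.stdout
```

Calling run_sandboxed(["sort", "-u"], data) would then correspond to the middle element of the shell pipeline above.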

What does this mean for you as a packager?

I am pretty sure that a large number of services shipped with today’s
distributions could benefit from using DynamicUser= and
StateDirectory= (and related settings). It often allows removal of
post-inst packaging scripts altogether, as well as any sysusers.d
and tmpfiles.d drop-ins by unifying the needed declarations in the
unit file itself. Hence, as a packager please consider switching your
unit files over. That said, there are a number of conditions where
DynamicUser= and StateDirectory= (and friends) cannot or should
not be used. To name a few:

  1. Services that need to write to files outside of /run/<package>,
    /var/lib/<package>, /var/cache/<package>, /var/log/<package>,
    /var/tmp, /tmp, /dev/shm are generally incompatible with this
    scheme. This rules out daemons that upgrade the system as one example,
    as that involves writing to /usr.

  2. Services that maintain a herd of processes with different user
    IDs. Some SMTP services are like this. If your service has such a
    super-server design, UID management needs to be done by the
    super-server itself, which rules out systemd doing its dynamic UID
    magic for it.

  3. Services which run as root (obviously…) or are otherwise
    privileged.

  4. Services that need to live in the same mount name-space as the host
    system (for example, because they want to establish mount points
    visible system-wide). As mentioned DynamicUser= implies
    ProtectSystem=, PrivateTmp= and related options, which all require
    the service to run in its own mount name-space.

  5. Your focus is older distributions, i.e. distributions that do not
    have systemd 232 (for DynamicUser=) or systemd 235 (for
    StateDirectory= and friends) yet.

  6. If your distribution’s packaging guides don’t allow it. Consult
    your packaging guides, and possibly start a discussion on your
    distribution’s mailing list about this.

Notes

A couple of additional, random notes about the implementation and use
of these features:

  1. Do note that allocating or deallocating a dynamic user leaves
    /etc/passwd untouched. A dynamic user is added into the user
    database through the glibc NSS module
    nss-systemd,
    and this information never hits the disk.

  2. On traditional UNIX systems it was the job of the daemon process
    itself to drop privileges, while the DynamicUser= concept is
    designed around the service manager (i.e. systemd) being responsible
    for that. That said, since v235 there’s a way to marry DynamicUser=
    and such services which want to drop privileges on their own. For
    that, turn on DynamicUser= and set
    User=
    to the user name the service wants to setuid() to. This has the
    effect that systemd will allocate the dynamic user under the specified
    name when the service is started. Then, prefix the command line you
    specify in
    ExecStart=
    with a single ! character. If you do, the user is allocated for the
    service, but the daemon binary is invoked as root instead of the
    allocated user, under the assumption that the daemon changes its UID
    on its own the right way. Note that after registration the user will
    show up instantly in the user database, and is hence resolvable like
    any other by the daemon process. Example:
    ExecStart=!/usr/bin/mydaemond

  3. You may wonder why systemd uses the UID range 61184–65519 for its
    dynamic user allocations (side note: in hexadecimal this reads as
    0xEF00–0xFFEF). That’s because distributions (specifically Fedora)
    tend to allocate regular users from below the 60000 range, and we
    don’t want to step into that. We also want to stay away from 65535 and
    a bit around it, as some of these UIDs have special meanings (65535 is
    often used as special value for “invalid” or “no” UID, as it is
    identical to the 16bit value -1; 65534 is generally mapped to the
    “nobody” user, and is where some kernel subsystems map unmappable
    UIDs). Finally, we want to stay within the 16bit range. In a user
    name-spacing world each container tends to have much less than the full
    32bit UID range available that Linux kernels theoretically
    provide. Everybody apparently can agree that a container should at
    least cover the 16bit range though — already to include a nobody
    user. (And quite frankly, I am pretty sure assigning 64K UIDs per
    container is nicely systematic, as the higher 16bit of the 32bit
    UID values this way become a container ID, while the lower 16bit
    become the logical UID within each container, if you still follow what
    I am babbling here…). And before you ask: no this range cannot be
    changed right now, it’s compiled in. We might change that eventually
    however.

  4. You might wonder what happens if you already used UIDs from the
    61184–65519 range on your system for other purposes. systemd should
    handle that mostly fine, as long as that usage is properly registered
    in the user database: when allocating a dynamic user we pick a UID,
    see if it is currently used somehow, and if yes pick a different one,
    until we find a free one. Whether a UID is used right now or not is
    checked through NSS calls. Moreover the IPC object lists are checked to
    see if there are any objects owned by the UID we are about to
    pick. This means systemd will avoid using UIDs you have assigned
    otherwise. Note however that this of course makes the pool of
    available UIDs smaller, and in the worst cases this means that
    allocating a dynamic user might fail because there simply are no
    unused UIDs in the range.

  5. If not specified otherwise the name for a dynamically allocated
    user is derived from the service name. Not everything that’s valid in
    a service name is valid in a user-name however, and in some cases a
    randomized name is used instead to deal with this. Often it makes
    sense to pick the user names to register explicitly. For that use
    User= and choose whatever you like.

  6. If you pick a user name with User= and combine it with
    DynamicUser= and the user already exists statically it will be used
    for the service and the dynamic user logic is automatically
    disabled. This permits automatic up- and downgrades between static and
    dynamic UIDs. For example, it provides a nice way to move a system
    from static to dynamic UIDs in a compatible way: as long as you select
    the same User= value before and after switching DynamicUser= on,
    the service will continue to use the statically allocated user if it
    exists, and only operates in the dynamic mode if it does not. This is
    useful for other cases as well, for example to adapt a service that
    normally would use a dynamic user to concepts that require statically
    assigned UIDs, for example to marry classic UID-based file system
    quota with such services.

  7. systemd always allocates a pair of dynamic UID and GID at the same
    time, with the same numeric ID.

  8. If the Linux kernel had a “shiftfs” or similar functionality,
    i.e. a way to mount an existing directory to a second place, but map
    the exposed UIDs/GIDs in some way configurable at mount time, this
    would be excellent for the implementation of StateDirectory= in
    conjunction with DynamicUser=. It would make the recursive
    chown()ing step unnecessary, as the host version of the state
    directory could simply be mounted into the service’s mount
    name-space, with a shift applied that maps the directory’s owner to the
    service’s UID/GID. But I don’t have high hopes in this regard, as all
    work being done in this area appears to be bound to user name-spacing
    — which is a concept not used here (and I guess one could say user
    name-spacing is probably more a source of problems than a solution to
    one, but you are welcome to disagree on that).
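
The UID arithmetic and the "pick a UID, check whether it is used, otherwise try another" loop from notes 3 and 4 can be sketched in a few lines. This is a simplified illustration, not systemd's code: the linear wrap-around scan is my assumption for how "pick a different one" might look, the IPC object check is left out, and the function names are invented. Only the range constants and the NSS lookup idea come from the text above.

```python
import pwd

DYNAMIC_UID_MIN = 0xEF00  # 61184
DYNAMIC_UID_MAX = 0xFFEF  # 65519


def uid_in_use(uid):
    """Ask the user database (through NSS) whether a UID is already registered."""
    try:
        pwd.getpwuid(uid)
        return True
    except KeyError:
        return False


def pick_dynamic_uid(start, in_use=uid_in_use):
    """Return the first free UID in the dynamic range, scanning from `start`."""
    span = DYNAMIC_UID_MAX - DYNAMIC_UID_MIN + 1
    for offset in range(span):
        # Wrap around at the end of the range so every UID is tried once.
        candidate = DYNAMIC_UID_MIN + (start - DYNAMIC_UID_MIN + offset) % span
        if not in_use(candidate):
            return candidate
    raise RuntimeError("no unused UID left in the dynamic range")


def split_container_uid(uid32):
    """Split a 32bit UID into its (container ID, logical UID) halves."""
    return uid32 >> 16, uid32 & 0xFFFF
```

Passing the owner UID of an existing state directory as start mirrors the optimization from the example earlier: if that UID is still free it is picked again, and no chown()ing is needed.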

And that’s all for now. Enjoy your dynamic users!

Building Loosely Coupled, Scalable, C# Applications with Amazon SQS and Amazon SNS

Post Syndicated from Tara Van Unen original https://aws.amazon.com/blogs/compute/building-loosely-coupled-scalable-c-applications-with-amazon-sqs-and-amazon-sns/

 
Stephen Liedig, Solutions Architect

 

One of the many challenges professional software architects and developers face is how to make cloud-native applications scalable, fault-tolerant, and highly available.

Fundamental to your project success is understanding the importance of making systems highly cohesive and loosely coupled. That means considering the multi-dimensional facets of system coupling to support the distributed nature of the applications that you are building for the cloud.

By that, I mean addressing not only the application-level coupling (managing incoming and outgoing dependencies), but also considering the impacts of platform, spatial, and temporal coupling of your systems. Platform coupling relates to the interoperability, or lack thereof, of heterogeneous systems components. Spatial coupling deals with managing components at a network topology level or protocol level. Temporal, or runtime coupling, refers to the ability of a component within your system to do any kind of meaningful work while it is performing a synchronous, blocking operation.

The AWS messaging services, Amazon SQS and Amazon SNS, help you deal with these forms of coupling by providing mechanisms for:

  • Reliable, durable, and fault-tolerant delivery of messages between application components
  • Logical decomposition of systems and increased autonomy of components
  • Creating unidirectional, non-blocking operations, temporally decoupling system components at runtime
  • Decreasing the dependencies that components have on each other through standard communication and network channels

Following on the recent topic, Building Scalable Applications and Microservices: Adding Messaging to Your Toolbox, in this post, I look at some of the ways you can introduce SQS and SNS into your architectures to decouple your components, and show how you can implement them using C#.

Walkthrough

To illustrate some of these concepts, consider a web application that processes customer orders. As good architects and developers, you have followed best practices and made your application scalable and highly available. Your solution included implementing load balancing, dynamic scaling across multiple Availability Zones, and persisting orders in a Multi-AZ Amazon RDS database instance, as in the following diagram.


In this example, the application is responsible for handling and persisting the order data, as well as dealing with increases in traffic for popular items.

One potential point of vulnerability in the order processing workflow is in saving the order in the database. The business expects that every order has been persisted into the database. However, any potential deadlock, race condition, or network issue could cause the persistence of the order to fail. Then, the order is lost with no recourse to restore the order.

With good logging capability, you may be able to identify when an error occurred and which customer’s order failed. This wouldn’t allow you to “restore” the transaction, and by that stage, your customer is no longer your customer.

As illustrated in the following diagram, introducing an SQS queue helps improve your ordering application. Using the queue isolates the processing logic into its own component and runs it in a separate process from the web application. This, in turn, allows the system to be more resilient to spikes in traffic, while allowing work to be performed only as fast as necessary in order to manage costs.


In addition, you now have a mechanism for persisting orders as messages (with the queue acting as a temporary database), and have moved the scope of your transaction with your database further down the stack. In the event of an application exception or transaction failure, this ensures that the order processing can be retried or redirected to the Amazon SQS Dead Letter Queue (DLQ), for re-processing at a later stage. (See the recent post, Using Amazon SQS Dead-Letter Queues to Control Message Failure, for more information on dead-letter queues.)

Scaling the order processing nodes

This change allows you now to scale the web application frontend independently from the processing nodes. The frontend application can continue to scale based on metrics such as CPU usage, or the number of requests hitting the load balancer. Processing nodes can scale based on the number of orders in the queue. Here is an example of scale-in and scale-out alarms that you would associate with the scaling policy.

Scale-out Alarm

aws cloudwatch put-metric-alarm --alarm-name AddCapacityToCustomerOrderQueue --metric-name ApproximateNumberOfMessagesVisible --namespace "AWS/SQS" \
  --statistic Average --period 300 --threshold 3 --comparison-operator GreaterThanOrEqualToThreshold --dimensions Name=QueueName,Value=customer-orders \
  --evaluation-periods 2 --alarm-actions <arn of the scale-out autoscaling policy>

Scale-in Alarm

aws cloudwatch put-metric-alarm --alarm-name RemoveCapacityFromCustomerOrderQueue --metric-name ApproximateNumberOfMessagesVisible --namespace "AWS/SQS" \
  --statistic Average --period 300 --threshold 1 --comparison-operator LessThanOrEqualToThreshold --dimensions Name=QueueName,Value=customer-orders \
  --evaluation-periods 2 --alarm-actions <arn of the scale-in autoscaling policy>

In the above example, use the ApproximateNumberOfMessagesVisible metric to discover the queue length and drive the scaling policy of the Auto Scaling group. Another useful metric is ApproximateAgeOfOldestMessage, when applications have time-sensitive messages and developers need to ensure that messages are processed within a specific time period.
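
To reason about when these two alarms would fire, it can help to model the evaluation in a few lines. The following Python sketch is a simplification under my own assumptions: it takes the queue-length samples grouped by 300-second period, averages each period, and requires every one of the last two periods to breach the threshold. The function name alarm_fires is hypothetical, and CloudWatch's handling of missing data is ignored.

```python
from statistics import mean


def alarm_fires(period_samples, threshold, comparison, evaluation_periods=2):
    """Return True if the average of each of the last N periods breaches the threshold."""
    recent = period_samples[-evaluation_periods:]
    if len(recent) < evaluation_periods:
        return False  # not enough data yet to evaluate the alarm
    averages = [mean(samples) for samples in recent]
    if comparison == "GreaterThanOrEqualToThreshold":
        return all(avg >= threshold for avg in averages)
    if comparison == "LessThanOrEqualToThreshold":
        return all(avg <= threshold for avg in averages)
    raise ValueError(f"unsupported comparison operator: {comparison}")
```

With the thresholds from the commands above, a queue that averages at least 3 visible messages for two consecutive periods triggers the scale-out policy, and one that averages at most 1 triggers the scale-in policy. Anything in between leaves the group unchanged.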

Scaling the order processing implementation

On top of scaling at an infrastructure level using Auto Scaling, make sure to take advantage of the processing power of your Amazon EC2 instances by using as many of the available threads as possible. There are several ways to implement this. In this post, we build a Windows service that uses the BackgroundWorker class to process the messages from the queue.

Here’s a closer look at the implementation. In the first section of the consuming application, use a loop to continually poll the queue for new messages, and construct a ReceiveMessageRequest variable.

public static void PollQueue()
{
    while (_running)
    {
        Task<ReceiveMessageResponse> receiveMessageResponse;

        // Pull messages off the queue
        using (var sqs = new AmazonSQSClient())
        {
            const int maxMessages = 10;  // 1-10

            //Receiving a message
            var receiveMessageRequest = new ReceiveMessageRequest
            {
                // Get URL from Configuration
                QueueUrl = _queueUrl, 
                // The maximum number of messages to return. 
                // Fewer messages might be returned. 
                MaxNumberOfMessages = maxMessages, 
                // A list of attributes that need to be returned with message.
                AttributeNames = new List<string> { "All" },
                // Enable long polling. 
                // Time to wait for message to arrive on queue.
                WaitTimeSeconds = 5 
            };

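            receiveMessageResponse = sqs.ReceiveMessageAsync(receiveMessageRequest);
            // Wait for the long poll to finish before the using block disposes the client.
            receiveMessageResponse.Wait();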
        }

The WaitTimeSeconds property of the ReceiveMessageRequest specifies the duration (in seconds) that the call waits for a message to arrive in the queue before returning a response to the calling application. There are a few benefits to using long polling:

  • It reduces the number of empty responses by allowing SQS to wait until a message is available in the queue before sending a response.
  • It eliminates false empty responses by querying all (rather than a limited number) of the servers.
  • It returns messages as soon as any message becomes available.

For more information, see Amazon SQS Long Polling.

After you have returned messages from the queue, you can start to process them by looping through each message in the response and invoking a new BackgroundWorker thread.

// Process messages
if (receiveMessageResponse.Result.Messages != null && receiveMessageResponse.Result.Messages.Count > 0)
{
    foreach (var message in receiveMessageResponse.Result.Messages)
    {
        Console.WriteLine("Received SQS message, starting worker thread");

        // Create background worker to process message
        BackgroundWorker worker = new BackgroundWorker();
        worker.DoWork += (obj, e) => ProcessMessage(message);
        worker.RunWorkerAsync();
    }
}
else
{
    Console.WriteLine("No messages on queue");
}

The event handler, ProcessMessage, is where you implement business logic for processing orders. It is important to have a good understanding of how long a typical transaction takes so you can set a message VisibilityTimeout that is long enough to complete your operation. If order processing takes longer than the specified timeout period, the message becomes visible on the queue again. Other nodes may then pick it up and process the same order a second time, leading to unintended consequences.
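
The effect of the visibility timeout is easier to see in a small simulation. The Python sketch below is an in-memory stand-in for a queue, not the SQS API: the class SimulatedQueue and its methods are invented for illustration, the clock is passed in explicitly, and messages are deleted by ID rather than by receipt handle.

```python
class SimulatedQueue:
    """In-memory queue that hides a received message for a fixed timeout."""

    def __init__(self, visibility_timeout):
        self.visibility_timeout = visibility_timeout
        self._messages = {}  # message ID -> (body, time at which it is visible again)

    def send(self, message_id, body):
        self._messages[message_id] = (body, 0.0)

    def receive(self, now):
        """Return the first visible message and hide it, or None if nothing is visible."""
        for message_id, (body, visible_at) in self._messages.items():
            if visible_at <= now:
                self._messages[message_id] = (body, now + self.visibility_timeout)
                return message_id, body
        return None

    def delete(self, message_id):
        """Remove a message for good, as a consumer does after successful processing."""
        self._messages.pop(message_id, None)
```

If a consumer receives an order at time 0 with a 30-second timeout and has not deleted it by time 30, a second consumer calling receive at that point gets the same order. That is exactly the duplicate-processing case described above.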

Handling Duplicate Messages

In order to manage duplicate messages, seek to make your processing application idempotent. In mathematics, idempotent describes a function that produces the same result if it is applied to itself:

f(x) = f(f(x))

No matter how many times you process the same message, the end result is the same (definition from Enterprise Integration Patterns: Designing, Building, and Deploying Messaging Solutions, Hohpe and Wolf, 2004).

There are several strategies you could apply to achieve this:

  • Create messages that have inherent idempotent characteristics. That is, they are non-transactional in nature and are unique at a specified point in time. Rather than saying “place new order for Customer A,” which adds a duplicate order to the customer, use “place order <orderid> on <timestamp> for Customer A,” which creates a single order no matter how often it is persisted.
  • Deliver your messages via an Amazon SQS FIFO queue, which provides not only message sequencing but also mechanisms for deduplication. You can deduplicate using the MessageDeduplicationId property on the SendMessage request or by enabling content-based deduplication on the queue, which generates a hash for MessageDeduplicationId, based on the content of the message, not the attributes.
var sendMessageRequest = new SendMessageRequest
{
    QueueUrl = _queueUrl,
    MessageBody = JsonConvert.SerializeObject(order),
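    // Messages that share a MessageGroupId are delivered in order.
    // A fresh GUID places every message in its own group.
    MessageGroupId = Guid.NewGuid().ToString("N"),
    // A fresh GUID never matches an earlier send, so resending the same
    // order is not deduplicated. Use a stable value, such as the order
    // identifier, when retries should be collapsed.
    MessageDeduplicationId = Guid.NewGuid().ToString("N")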
};
  • If using SQS FIFO queues is not an option, keep a log of the messages processed during a specified period of time and check it on the receiving end, as an alternative to queue-level deduplication. Verifying the existence of the message in the log before processing the message adds computational overhead to your processing. This can be minimized through low-latency persistence solutions such as Amazon DynamoDB. Bear in mind that this solution depends on the message being processed and the log entry being written together, as one successful distributed transaction.
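
The message-log strategy from the last bullet can be sketched as follows. This is an illustration under stated assumptions rather than a production design: an in-memory dictionary stands in for a store such as DynamoDB, the names content_dedup_id and ProcessedLog are hypothetical, and the SHA-256 hash of the body imitates the idea behind content-based deduplication without reproducing what SQS does internally.

```python
import hashlib


def content_dedup_id(body):
    """Derive a deduplication ID from the message body only, ignoring attributes."""
    return hashlib.sha256(body.encode("utf-8")).hexdigest()


class ProcessedLog:
    """Remembers which deduplication IDs were handled within a time window."""

    def __init__(self, window_seconds):
        self.window_seconds = window_seconds
        self._seen = {}  # deduplication ID -> time it was first processed

    def first_time(self, dedup_id, now):
        """Record dedup_id and return True unless it was already seen within the window."""
        # Forget entries that have aged out of the window.
        self._seen = {
            key: seen_at
            for key, seen_at in self._seen.items()
            if now - seen_at < self.window_seconds
        }
        if dedup_id in self._seen:
            return False
        self._seen[dedup_id] = now
        return True
```

A consumer would call first_time before running its order logic and skip the message when it returns False. Note that the check and the order processing are not atomic here, which is the distributed-transaction caveat mentioned in the bullet.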

Handling exceptions

Because SQS is a distributed system, it does not automatically delete a message after a consumer receives it. Therefore, you must explicitly delete the message from the queue after processing it, using the message ReceiptHandle property (see the following code example).

However, if at any stage you have an exception, avoid handling it as you normally would. The intention is to make sure that the message ends up back on the queue, so that you can gracefully deal with intermittent failures. Instead, log the exception to capture diagnostic information, and swallow it.

By not explicitly deleting the message from the queue, you can take advantage of the VisibilityTimeout behavior described earlier. Gracefully handle the message processing failure and make the unprocessed message available to other nodes to process.

In the event that subsequent retries fail, SQS automatically moves the message to the configured DLQ after the configured number of receives has been reached. You can further investigate why the order process failed. Most importantly, the order has not been lost, and your customer is still your customer.

private static void ProcessMessage(Message message)
{
    using (var sqs = new AmazonSQSClient())
    {
        try
        {
            Console.WriteLine("Processing message id: {0}", message.MessageId);

            // Implement messaging processing here
            // Ensure no downstream resource contention (parallel processing)
            // <your order processing logic in here…>
            Console.WriteLine("{0} Thread {1}: {2}", DateTime.Now.ToString("s"), Thread.CurrentThread.ManagedThreadId, message.MessageId);
            
            // Delete the message off the queue. 
            // Receipt handle is the identifier you must provide 
            // when deleting the message.
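            var deleteRequest = new DeleteMessageRequest(_queueUrl, message.ReceiptHandle);
            // Wait for the delete to finish so that failures reach the catch block
            // and the client is not disposed while the request is still running.
            sqs.DeleteMessageAsync(deleteRequest).Wait();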
            Console.WriteLine("Processed message id: {0}", message.MessageId);

        }
        catch (Exception ex)
        {
            // Do nothing.
            // Swallow exception, message will return to the queue when 
            // visibility timeout has been exceeded.
            Console.WriteLine("Could not process message due to error. Exception: {0}", ex.Message);
        }
    }
}
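
The interplay between swallowing exceptions, redelivery, and the dead-letter queue can be modelled without any AWS dependency. In this Python sketch, drain and reject_malformed are hypothetical names, redelivery happens immediately instead of after a visibility timeout, and a message goes to the dead-letter list once the handler has failed on it max_receive_count times. It is a rough model of the behavior described above, not of the service itself.

```python
import logging


def reject_malformed(order):
    """Example handler: fails on an order without a customer identifier."""
    if not order.get("customer"):
        raise ValueError("order has no customer identifier")


def drain(messages, handler, max_receive_count):
    """Deliver each message until the handler succeeds or the receive limit is reached."""
    processed, dead_letter = [], []
    pending = [(message, 0) for message in messages]
    while pending:
        message, receives = pending.pop(0)
        receives += 1
        try:
            handler(message)
            processed.append(message)  # success: the message is deleted
        except Exception as error:
            # Log and swallow, so the message is not deleted.
            logging.warning("could not process %r: %s", message, error)
            if receives >= max_receive_count:
                dead_letter.append(message)  # give up and set it aside
            else:
                pending.append((message, receives))  # redeliver later
    return processed, dead_letter
```

Valid orders end up in processed, while an order that keeps failing is set aside after the configured number of attempts. The order is not lost and does not block the queue.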

Using SQS to adapt to changing business requirements

One of the benefits of introducing a message queue is that you can accommodate new business requirements without dramatically affecting your application.

If, for example, the business decided that all orders placed over $5000 are to be handled as a priority, you could introduce a new “priority order” queue. The way the orders are processed does not change. The only significant change to the processing application is to ensure that messages from the “priority order” queue are processed before the “standard order” queue.

The following diagram shows how this logic could be isolated in an “order dispatcher,” whose only purpose is to route order messages to the appropriate queue based on whether the order exceeds $5000. Nothing on the web application or the processing nodes changes other than the target queue to which the order is sent. The rates at which orders are processed can be achieved by modifying the poll rates and scalability settings that I have already discussed.
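
As a rough sketch of what such a dispatcher does, consider the following Python code. The queues are plain in-memory deques rather than SQS queues, and the names dispatch, next_order, and the "total" field are assumptions made for the example. Only the $5000 threshold and the "priority before standard" rule come from the description above.

```python
from collections import deque

PRIORITY_THRESHOLD = 5000  # orders over this amount are handled as a priority


def dispatch(order, standard_queue, priority_queue):
    """Route an order to the priority or the standard queue based on its total."""
    if order["total"] > PRIORITY_THRESHOLD:
        priority_queue.append(order)
    else:
        standard_queue.append(order)


def next_order(standard_queue, priority_queue):
    """Return the next order to process, draining the priority queue first."""
    if priority_queue:
        return priority_queue.popleft()
    if standard_queue:
        return standard_queue.popleft()
    return None
```

The processing logic itself is untouched. Only the choice of target queue and the polling order change, which is why the new requirement has such a small footprint.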

Extending the design pattern with Amazon SNS

Amazon SNS supports reliable publish-subscribe (pub-sub) scenarios and push notifications to known endpoints across a wide variety of protocols. It eliminates the need to periodically check or poll for new information and updates. SNS supports:

  • Reliable storage of messages for immediate or delayed processing
  • Publish / subscribe – direct, broadcast, targeted “push” messaging
  • Multiple subscriber protocols: Amazon SQS, HTTP, HTTPS, email, SMS, mobile push, AWS Lambda

With these capabilities, you can provide parallel asynchronous processing of orders in the system and extend it to support any number of different business use cases without affecting the production environment. This is commonly referred to as a “fanout” scenario.

Rather than your web application pushing orders to a queue for processing, send a notification via SNS. The SNS messages are sent to a topic and then replicated and pushed to multiple SQS queues and Lambda functions for processing.

As the diagram above shows, you have the development team consuming “live” data as they work on the next version of the processing application, or potentially using the messages to troubleshoot issues in production.

Marketing is consuming all order information, via a Lambda function that has subscribed to the SNS topic, inserting the records into an Amazon Redshift warehouse for analysis.

All of this, of course, is happening without affecting your order processing application.
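
The fanout idea can be shown with a tiny in-memory model. The Topic class below is a hypothetical stand-in for an SNS topic, and its subscribers are ordinary Python callables playing the role of SQS queues and Lambda functions. Delivery is synchronous and in-process, which the real service is not.

```python
class Topic:
    """Minimal publish-subscribe topic that copies each message to every subscriber."""

    def __init__(self):
        self._subscribers = []

    def subscribe(self, callback):
        """Register a callable that receives a copy of every published message."""
        self._subscribers.append(callback)

    def publish(self, message):
        for deliver in self._subscribers:
            # Each subscriber gets its own copy, so one consumer cannot
            # affect what another consumer sees.
            deliver(dict(message))
```

Subscribing the order-processing queue, a development queue, and an analytics function to the same topic gives each of them every order, and adding or removing a subscriber requires no change to the publisher.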

Summary

While I haven’t dived deep into the specifics of each service, I have discussed how these services can be applied at an architectural level to build loosely coupled systems that facilitate multiple business use cases. I’ve also shown you how to use infrastructure and application-level scaling techniques, so you can get the most out of your EC2 instances.

One of the many benefits of using these managed services is how quickly and easily you can implement powerful messaging capabilities in your systems, and lower the capital and operational costs of managing your own messaging middleware.

Using Amazon SQS and Amazon SNS together can provide you with a powerful mechanism for decoupling application components. This should be part of design considerations as you architect for the cloud.

For more information, see the Amazon SQS Developer Guide and Amazon SNS Developer Guide. You’ll find tutorials on all the concepts covered in this post, and more. To get started using the AWS console or SDK of your choice, visit:

Happy messaging!

Using Amazon SQS Dead-Letter Queues to Control Message Failure

Post Syndicated from Tara Van Unen original https://aws.amazon.com/blogs/compute/using-amazon-sqs-dead-letter-queues-to-control-message-failure/


Michael G. Khmelnitsky, Senior Programmer Writer

 

Sometimes, messages can’t be processed because of a variety of possible issues, such as erroneous conditions within the producer or consumer application. For example, if a user places an order within a certain number of minutes of creating an account, the producer might pass a message with an empty string instead of a customer identifier. Occasionally, producers and consumers might fail to interpret aspects of the protocol that they use to communicate, causing message corruption or loss. Also, the consumer’s hardware errors might corrupt message payload. For these reasons, messages that can’t be processed in a timely manner are delivered to a dead-letter queue.

The recent post Building Scalable Applications and Microservices: Adding Messaging to Your Toolbox gives an overview of messaging in the microservice architecture of modern applications. This post explains how and when you should use dead-letter queues to gain better control over message handling in your applications. It also offers some resources for configuring a dead-letter queue in Amazon Simple Queue Service (SQS).

What are the benefits of dead-letter queues?

The main task of a dead-letter queue is handling message failure. A dead-letter queue lets you set aside and isolate messages that can’t be processed correctly to determine why their processing didn’t succeed. Setting up a dead-letter queue allows you to do the following:

  • Configure an alarm for any messages delivered to a dead-letter queue.
  • Examine logs for exceptions that might have caused messages to be delivered to a dead-letter queue.
  • Analyze the contents of messages delivered to a dead-letter queue to diagnose software or the producer’s or consumer’s hardware issues.
  • Determine whether you have given your consumer sufficient time to process messages.

How do high-throughput, unordered queues handle message failure?

High-throughput, unordered queues (sometimes called standard or storage queues) keep processing messages until the expiration of the retention period. This helps ensure continuous processing of messages, which minimizes the chances of your queue being blocked by messages that can’t be processed. It also ensures fast recovery for your queue.

In a system that processes thousands of messages, having a large number of messages that the consumer repeatedly fails to acknowledge and delete might increase costs and place extra load on the hardware. Instead of trying to process failing messages until they expire, it is better to move them to a dead-letter queue after a few processing attempts.

Note: This queue type often allows a high number of in-flight messages. If the majority of your messages can’t be consumed and aren’t sent to a dead-letter queue, your rate of processing valid messages can slow down. Thus, to maintain the efficiency of your queue, you must ensure that your application handles message processing correctly.

How do FIFO queues handle message failure?

FIFO (first-in-first-out) queues (sometimes called service bus queues) help ensure exactly-once processing by consuming messages in sequence from a message group. Thus, although the consumer can continue to retrieve ordered messages from another message group, the first message group remains unavailable until the message blocking the queue is processed successfully.

Note: This queue type often allows a lower number of in-flight messages. Thus, to help ensure that your FIFO queue doesn’t get blocked by a message, you must ensure that your application handles message processing correctly.
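
To illustrate why one failing message holds back its whole message group while other groups keep flowing, here is a small sketch. The fifo_message struct, the integer group IDs, and next_deliverable() are assumptions made up for this example; a real service tracks this state on the server side.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical queue entry; entries are stored oldest first. */
struct fifo_message {
        int id;
        int group;   /* message group ID */
        bool done;   /* processed successfully and deleted */
};

/*
 * Return the index of the next message a consumer may take, or -1 if there
 * is none. blocked_group is the group whose oldest message is currently
 * failing (-1 if no group is blocked). Because the scan runs oldest first
 * and skips the blocked group entirely, nothing behind the failing message
 * in that group is handed out, while other groups proceed in order.
 */
static int next_deliverable(const struct fifo_message *q, size_t n,
                            int blocked_group)
{
        assert(q != NULL || n == 0);

        for (size_t i = 0; i < n; i++) {
                if (q[i].done || q[i].group == blocked_group)
                        continue;
                return (int) i;
        }
        return -1;
}
```

For a queue holding two messages of group 0 followed by one message of group 1, blocking group 0 makes the consumer skip straight to the group 1 message.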

When should I use a dead-letter queue?

  • Do use dead-letter queues with high-throughput, unordered queues. You should always take advantage of dead-letter queues when your applications don’t depend on ordering. Dead-letter queues can help you troubleshoot incorrect message transmission operations. Note: Even when you use dead-letter queues, you should continue to monitor your queues and retry sending messages that fail for transient reasons.
  • Do use dead-letter queues to decrease the number of messages and to reduce the possibility of exposing your system to poison-pill messages (messages that can be received but can’t be processed).
  • Don’t use a dead-letter queue with high-throughput, unordered queues when you want to be able to keep retrying the transmission of a message indefinitely. For example, don’t use a dead-letter queue if your program must wait for a dependent process to become active or available.
  • Don’t use a dead-letter queue with a FIFO queue if you don’t want to break the exact order of messages or operations. For example, don’t use a dead-letter queue with instructions in an Edit Decision List (EDL) for a video editing suite, where changing the order of edits changes the context of subsequent edits.

How do I get started with dead-letter queues in Amazon SQS?

Amazon SQS is a fully managed service that offers reliable, highly scalable hosted queues for exchanging messages between applications or microservices. Amazon SQS moves data between distributed application components and helps you decouple these components. It supports both standard queues and FIFO queues. To configure a queue as a dead-letter queue, you can use the AWS Management Console or the Amazon SQS SetQueueAttributes API action.
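
As a rough sketch of what goes into that SetQueueAttributes call: the RedrivePolicy attribute of the source queue is a JSON string that names the ARN of the dead-letter queue (deadLetterTargetArn) and the number of receives allowed (maxReceiveCount). The helper below only formats that string. The name format_redrive_policy() is hypothetical, the ARN in the usage note is a placeholder, and the actual API call through an AWS SDK is left out.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/*
 * Format the JSON value of the RedrivePolicy queue attribute into buf.
 * Returns 0 on success, -1 if the ARN contains characters that would need
 * JSON escaping or if buf is too small.
 */
static int format_redrive_policy(char *buf, size_t size,
                                 const char *dlq_arn, int max_receive_count)
{
        int n;

        assert(buf != NULL && dlq_arn != NULL && max_receive_count > 0);

        /* This sketch does no escaping, so refuse quotes and backslashes. */
        if (strchr(dlq_arn, '"') != NULL || strchr(dlq_arn, '\\') != NULL)
                return -1;

        n = snprintf(buf, size,
                     "{\"deadLetterTargetArn\":\"%s\",\"maxReceiveCount\":\"%d\"}",
                     dlq_arn, max_receive_count);
        return (n < 0 || (size_t) n >= size) ? -1 : 0;
}
```

Passing the placeholder ARN arn:aws:sqs:us-east-1:123456789012:my-dlq with a count of 5 yields {"deadLetterTargetArn":"arn:aws:sqs:us-east-1:123456789012:my-dlq","maxReceiveCount":"5"}, which would then be supplied as the value of the RedrivePolicy attribute.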

To get started with dead-letter queues in Amazon SQS, see the following topics in the Amazon SQS Developer Guide:

To start working with dead-letter queues programmatically, see the following resources:

systemd Status Update

Post Syndicated from Lennart Poettering original http://0pointer.net/blog/projects/systemd-update-3.html

It has been way too long since my last status update on systemd.
Here’s another short, non-comprehensive status update on what we worked
on for systemd since then.

We have been working hard to turn systemd into the most viable set
of components to build operating systems, appliances and devices from,
and make it the best choice for servers, for desktops and for embedded
environments alike. I think we have a really convincing set of
features now, but we are actively working on making it even
better.

Here’s a list of some more and some less interesting features, in
no particular order:

  1. We added an automatic pager to systemctl (and related tools), similar
    to how git has it.
  2. systemctl learnt a new switch --failed, to show only
    failed services.
  3. You may now start services immediately, overriding all dependency
    logic by passing --ignore-dependencies to
    systemctl. This is mostly a debugging tool and nothing people
    should use in real life.
  4. Sending SIGKILL as final part of the implicit shutdown
    logic of services is now optional and may be configured with the
    SendSIGKILL= option individually for each service.
  5. We split off the Vala/Gtk tools into their own project systemd-ui.
  6. systemd-tmpfiles learnt file globbing and creating FIFO
    special files as well as character and block device nodes, and
    symlinks. It also is capable of relabelling certain directories at
    boot now (in the SELinux sense).
  7. Immediately before shutting down we will now invoke all binaries
    found in /lib/systemd/system-shutdown/, which is useful for
    debugging late shutdown.
  8. You may now globally control where STDOUT/STDERR of services goes
    (unless individual service configuration overrides it).
  9. There’s a new ConditionVirtualization= option, that makes
    systemd skip a specific service if a certain virtualization technology
    is found or not found. Similarly, we now have a new option to detect
    whether a certain security technology (such as SELinux) is available,
    called ConditionSecurity=. There’s also
    ConditionCapability= to check whether a certain process
    capability is in the capability bounding set of the system. There
    are also the new ConditionFileIsExecutable=,
    ConditionPathIsMountPoint=, ConditionPathIsReadWrite= and
    ConditionPathIsSymbolicLink= options.
  10. The file system condition directives now support globbing.
  11. Service conditions may now be “triggering” and “mandatory”, meaning that
    they can be a necessary requirement to hold for a service to start, or
    simply one trigger among many.
  12. At boot time we now print warnings if /usr is on a split-off
    partition but not already mounted by an initrd; if /etc/mtab is not
    a symlink to /proc/mounts; or if CONFIG_CGROUPS is not enabled in
    the kernel. We’ll also expose this as a tainted flag on the bus.
  13. You may now boot the same OS image on a bare metal machine and in
    Linux namespace containers and will get a clean boot in both
    cases. This is more complicated than it sounds since device management
    with udev or write access to /sys, /proc/sys or
    things like /dev/kmsg is not available in a container. This
    makes systemd a first-class choice for managing thin container
    setups. This is all tested with systemd’s own systemd-nspawn
    tool but should work fine in LXC setups, too. Basically this means
    that you do not have to adjust your OS manually to make it work in a
    container environment; it will just work out of the box. It also
    makes it easier to convert real systems into containers.
  14. We now automatically spawn gettys on HVC ttys when booting in VMs.
  15. We introduced /etc/machine-id as a generalization of
    D-Bus machine ID logic. See this blog story for more information.
    On stateless/read-only systems the machine ID is initialized
    randomly at boot. In virtualized environments it may be passed in
    from the machine manager (with qemu’s -uuid switch, or via the
    container interface).
  16. All of the systemd-specific /etc/fstab mount options are
    now in the x-systemd-xyz format.
  17. To make it easy to find non-converted services we will now
    implicitly prefix all LSB and SysV init script descriptions with the
    strings “LSB:” or “SYSV:”, respectively.
  18. We introduced /run and made it a hard dependency of
    systemd. This directory is now widely accepted and implemented on all
    relevant Linux distributions.
  19. systemctl can now execute all its operations remotely too (-H switch).
  20. We now ship systemd-nspawn,
    a really powerful tool that can be used to start containers for
    debugging, building and testing, much like chroot(1). It is useful to
    just get a shell inside a build tree, but is good enough to boot up a
    full system in it, too.
  21. If we query the user for a hard disk password at boot he may hit
    TAB to hide the asterisks we normally show for each key that is
    entered, for extra paranoia.
  22. We don’t enable udev-settle.service anymore, which is
    only required for certain legacy software that still hasn’t been
    updated to follow devices coming and going cleanly.
  23. We now include a tool that can plot boot speed graphs, similar to
    bootchartd, called systemd-analyze.
  24. At boot, we now initialize the kernel’s binfmt_misc logic with the data from /etc/binfmt.d.
  25. systemctl now recognizes if it is run in a chroot()
    environment and will work accordingly (i.e. apply changes to the tree
    it is run in, instead of talking to the actual PID 1 for this). It also has a new --root= switch to work on an OS tree from outside of it.
  26. There’s a new unit dependency type OnFailureIsolate= that
    allows entering a different target whenever a certain unit fails. For
    example, this is interesting to enter emergency mode if file system
    checks of crucial file systems failed.
  27. Socket units may now listen on Netlink sockets, special files
    from /proc and POSIX message queues, too.
  28. There’s a new IgnoreOnIsolate= flag which may be used to
    ensure certain units are left untouched by isolation requests. There’s
    a new IgnoreOnSnapshot= flag which may be used to exclude
    certain units from snapshot units when they are created.
  29. There are now small mechanism services for changing the local
    hostname and other host metadata, changing the system locale and
    console settings, and changing the system clock.
  30. We now limit the capability bounding set for a number of our
    internal services by default.
  31. Plymouth may now be disabled globally with
    plymouth.enable=0 on the kernel command line.
  32. We now disallocate VTs when a getty finished running (and
    optionally other tools run on VTs). This adds extra security since it
    clears up the scrollback buffer so that subsequent users cannot get
    access to a user’s session output.
  33. In socket units there are now options to control the
    IP_TRANSPARENT, SO_BROADCAST, SO_PASSCRED,
    SO_PASSSEC socket options.
  34. The receive and send buffers of socket units may now be set larger
    than the default system settings if needed by using
    SO_{RCV,SND}BUFFORCE.
  35. We now set the hardware timezone as one of the first things in PID
    1, in order to avoid time jumps during normal userspace operation, and
    to guarantee sensible times on all generated logs. We also no longer
    save the system clock to the RTC on shutdown, assuming that this is
    done by the clock control tool when the user modifies the time, or
    automatically by the kernel if NTP is enabled.
  36. The SELinux directory got moved from /selinux to
    /sys/fs/selinux.
  37. We added a small service systemd-logind that keeps track
    of logged in users and their sessions. It creates control groups for
    them, implements the XDG_RUNTIME_DIR specification for them,
    maintains seats and device node ACLs and implements shutdown/idle
    inhibiting for clients. It auto-spawns gettys on all local VTs when
    the user switches to them (instead of starting six of them
    unconditionally), thus reducing the resource footprint by default.
    It has a D-Bus interface as well as a simple synchronous library
    interface. This mechanism obsoletes ConsoleKit which is now
    deprecated and should no longer be used.
  38. There’s now full, automatic multi-seat support, and this is
    enabled in GNOME 3.4. Just by plugging in new seat hardware you get a
    new login screen on your seat’s screen.
  39. There is now an option ControlGroupModify= to allow
    services to change the properties of their control groups dynamically,
    and one to make control groups persistent in the tree
    (ControlGroupPersistent=) so that they can be created and
    maintained by external tools.
  40. We now jump back into the initrd in shutdown, so that it can
    detach the root file system and the storage devices backing it. This
    makes it possible (for the first time!) to reliably undo complex storage setups
    on shutdown and leave them in a clean state.
  41. systemctl now supports presets, a way for distributions and
    administrators to define their own policies on whether services should
    be enabled or disabled by default on package installation.
  42. systemctl now has high-level verbs for masking/unmasking
    units. There’s also a new command (systemctl list-unit-files)
    for determining the list of all installed unit files and whether
    they are enabled or not.
  43. We now apply sysctl variables to each new network device, as it
    appears. This makes /etc/sysctl.d compatible with hot-plug
    network devices.
  44. There’s limited profiling for SELinux start-up performance built
    into PID 1.
  45. There’s a new switch PrivateNetwork=
    to turn off any network access for a specific service.
  46. Service units may now include configuration for control group
    parameters. A few (such as MemoryLimit=) are exposed with
    high-level options, and all others are available via the generic
    ControlGroupAttribute= setting.
  47. There’s now the option to mount certain cgroup controllers
    jointly at boot. We do this now for cpu and
    cpuacct by default.
  48. We added the journal and turned it on by default.
  49. All service output is now written to the Journal by default,
    regardless of whether it is sent via syslog or simply written to
    stdout/stderr. Both message streams end up in the same location and
    are interleaved the way they should be. All log messages, even from
    the kernel and from early boot, end up in the journal. Now, no service
    output goes unnoticed, and all of it is saved and indexed at the same
    location.
  50. systemctl status will now show the last 10 log lines for
    each service, directly from the journal.
  51. We now show the progress of fsck at boot on the console,
    again. We also show the much loved colorful [ OK ] status
    messages at boot again, as known from most SysV implementations.
  52. We merged udev into systemd.
  53. We implemented and documented interfaces to container managers
    and initrds for passing execution data to systemd. We also
    implemented and documented an interface for storage daemons that are
    required to back the root file system.
  54. There are two new options in service files to propagate reload requests between several units.
  55. systemd-cgls won’t show kernel threads by default anymore, or show empty control groups.
  56. We added a new tool systemd-cgtop that shows resource
    usage of whole services in a top(1) like fashion.
  57. systemd may now supervise services in watchdog style. If enabled
    for a service the daemon has to ping PID 1 in regular intervals
    or is otherwise considered failed (which might then result in
    restarting it, or even rebooting the machine, as configured). Also,
    PID 1 is capable of pinging a hardware watchdog. Putting this
    together, the hardware watchdogs PID 1 and PID 1 then watchdogs
    specific services. This is highly useful for high-availability servers
    as well as embedded machines. Since watchdog hardware is nowadays
    built into all modern chipsets (including desktop chipsets), this
    should hopefully help to make this a more widely used
    functionality.
  58. We added support for a new kernel command line option
    systemd.setenv= to set an environment variable
    system-wide.
  59. By default services which are started by systemd will have SIGPIPE
    set to ignored. The Unix SIGPIPE logic is used to reliably implement
    shell pipelines and when left enabled in services is usually just a
    source of bugs and problems.
  60. You may now configure the rate limiting that is applied to
    restarts of specific services. Previously the rate limiting parameters
    were hard-coded (similar to SysV).
  61. There’s now support for loading the IMA integrity policy into the
    kernel early in PID 1, similar to how we already did it with the
    SELinux policy.
  62. There’s now an official API to schedule and query scheduled shutdowns.
  63. We changed the license from GPL2+ to LGPL2.1+.
  64. We made systemd-detect-virt
    an official tool in the tool set. Since we already had code to detect
    certain VM and container environments we now added an official tool
    for administrators to make use of in shell scripts and suchlike.
  65. We documented numerous interfaces systemd introduced.
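
To give an idea of what the watchdog logic from item 57 looks like from the service side, here is a minimal sketch. It assumes the notification protocol in which the service sends the string WATCHDOG=1 as a datagram to the AF_UNIX socket named in the $NOTIFY_SOCKET environment variable. The helper name notify_manager() is made up for this example, and a real service would normally use sd_notify() from sd-daemon.h instead.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <unistd.h>

/*
 * Send one state string (for example "WATCHDOG=1") as a single datagram to
 * the socket named in $NOTIFY_SOCKET. Returns 1 if the message was sent,
 * 0 if no notification socket is configured, -1 on error.
 */
static int notify_manager(const char *state)
{
        struct sockaddr_un sa;
        const char *path;
        size_t len, salen;
        ssize_t n;
        int fd;

        assert(state != NULL);

        path = getenv("NOTIFY_SOCKET");
        if (path == NULL || path[0] == '\0')
                return 0;

        len = strlen(path);
        if (len >= sizeof(sa.sun_path))
                return -1;

        memset(&sa, 0, sizeof(sa));
        sa.sun_family = AF_UNIX;
        memcpy(sa.sun_path, path, len);
        salen = offsetof(struct sockaddr_un, sun_path) + len + 1;
        if (sa.sun_path[0] == '@') {
                /* A leading '@' denotes the Linux abstract namespace. */
                sa.sun_path[0] = '\0';
                salen--;
        }

        fd = socket(AF_UNIX, SOCK_DGRAM, 0);
        if (fd < 0)
                return -1;

        n = sendto(fd, state, strlen(state), 0,
                   (const struct sockaddr *) &sa, (socklen_t) salen);
        close(fd);
        return n == (ssize_t) strlen(state) ? 1 : -1;
}
```

A supervised daemon would call notify_manager("WATCHDOG=1") from its main loop, at intervals shorter than the watchdog timeout configured for the service.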

Much of the stuff above is already available in Fedora 15 and 16,
or will be made available in the upcoming Fedora 17.

And that’s it for now. There’s a lot of other stuff in the git commits, but
most of it is smaller and I will thus spare you.

I’d like to thank everybody who contributed to systemd over the past years.

Thanks for your interest!

systemd for Developers I

Post Syndicated from Lennart Poettering original http://0pointer.net/blog/projects/socket-activation.html

systemd not only brings improvements for administrators and users, it also
brings a (small) number of new APIs with it. In this blog story (which might
become the first of a series) I hope to shed some light on one of the
most important new APIs in systemd:

Socket Activation

In the original blog story about systemd I tried to explain why
socket activation is a
wonderful technology to spawn services. Let’s reiterate the background
here a bit.

The basic idea of socket activation is not new. The inetd
superserver was a standard component of most Linux and Unix systems
since time began: instead of spawning all local Internet services
already at boot, the superserver would listen on behalf of the
services and whenever a connection would come in an instance of the
respective service would be spawned. This allowed relatively weak
machines with few resources to offer a big variety of services at the
same time. However it quickly got a reputation for being somewhat
slow: since daemons would be spawned for each incoming connection a
lot of time was spent on forking and initialization of the services
— once for each connection, instead of once for them all.

Spawning one instance per connection was how inetd was primarily
used, even though inetd actually understood another mode: on the first
incoming connection it would notice this via poll() (or
select()) and spawn a single instance for all future
connections. (This was controllable with the
wait/nowait options.) That way the first connection
would be slow to set up, but subsequent ones would be as fast as with
a standalone service. In this mode inetd would work in a true
on-demand mode: a service would be made available lazily when it was
required.

inetd’s focus was clearly on AF_INET (i.e. Internet) sockets. As
time progressed and Linux/Unix left the server niche and became
increasingly relevant on desktops, mobile and embedded environments
inetd was somehow lost in the troubles of time. Its reputation for
being slow, and the fact that Linux’ focus shifted away from only
Internet servers made a Linux machine running inetd (or one of its newer
implementations, like xinetd) the exception, not the rule.

When Apple engineers worked on optimizing the MacOS boot time they
found a new way to make use of the idea of socket activation: they
shifted the focus away from AF_INET sockets towards AF_UNIX
sockets. And they noticed that on-demand socket activation was only
part of the story: much more powerful is socket activation when used
for all local services including those which need to be started
anyway on boot. They implemented these ideas in launchd, a central building
block of modern MacOS X systems, and probably the main reason why
MacOS is so fast booting up.

But, before we continue, let’s have a closer look at what the benefits
of socket activation for non-on-demand, non-Internet services are in
detail. Consider the four services Syslog, D-Bus, Avahi and the
Bluetooth daemon. D-Bus logs to Syslog, hence on traditional Linux
systems it would get started after Syslog. Similarly, Avahi requires
Syslog and D-Bus, hence would get started after both. Finally
Bluetooth is similar to Avahi and also requires Syslog and D-Bus but
does not interface at all with Avahi. Since on a traditional
SysV-based system only one service can be in the process of getting
started at a time, the following serialization of startup would take
place: Syslog → D-Bus → Avahi → Bluetooth. (Of course, Avahi and
Bluetooth could be started in the opposite order too, but we have to
pick one here, so let’s simply go alphabetically.) To illustrate
this, here’s a plot showing the order of startup beginning with system
startup (at the top).

Parallelization plot

Certain distributions tried to improve this strictly serialized
start-up: since Avahi and Bluetooth are independent from each other,
they can be started simultaneously. The parallelization is increased,
the overall startup time slightly smaller. (This is visualized in the
middle part of the plot.)

Socket activation makes it possible to start all four services
completely simultaneously, without any kind of ordering. Since the
creation of the listening sockets is moved outside of the daemons
themselves we can start them all at the same time, and they are able
to connect to each other’s sockets right away. I.e. in a single step
the /dev/log and /run/dbus/system_bus_socket sockets
are created, and in the next step all four services are spawned
simultaneously. When D-Bus then wants to log to syslog, it just writes
its messages to /dev/log. As long as the socket buffer does
not run full it can go on immediately with what else it wants to do
for initialization. As soon as the syslog service catches up it will
process the queued messages. And if the socket buffer runs full then
the client logging will temporarily block until the socket is writable
again, and continue the moment it can write its log messages. That
means the scheduling of our services is entirely done by the kernel:
from the userspace perspective all services are run at the same time,
and when one service cannot keep up the others needing it will
temporarily block on their request but go on as soon as these
requests are dispatched. All of this is completely automatic and
invisible to userspace. Socket activation hence allows us to
drastically parallelize start-up, enabling simultaneous start-up of
services which previously were thought to strictly require
serialization. Most Linux services use sockets as communication
channel. Socket activation allows starting of clients and servers of
these channels at the same time.
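
The claim that a client can connect and write before the server is ready is easy to check with plain sockets. The following sketch creates the listening socket first (as systemd would), then lets a client connect and write while nobody has called accept() yet. The helper names listen_unix() and connect_and_write() are made up for this example.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <unistd.h>

/* Fill in an AF_UNIX address for a file system path. */
static void unix_address(struct sockaddr_un *sa, const char *path)
{
        assert(sa != NULL && path != NULL);
        assert(strlen(path) < sizeof(sa->sun_path));

        memset(sa, 0, sizeof(*sa));
        sa->sun_family = AF_UNIX;
        strcpy(sa->sun_path, path);
}

/* Create the listening socket up front, before any daemon is running. */
static int listen_unix(const char *path)
{
        struct sockaddr_un sa;
        int fd;

        unix_address(&sa, path);
        fd = socket(AF_UNIX, SOCK_STREAM, 0);
        if (fd < 0)
                return -1;

        unlink(path); /* remove a stale socket file from an earlier run */
        if (bind(fd, (struct sockaddr *) &sa, sizeof(sa)) < 0 ||
            listen(fd, SOMAXCONN) < 0) {
                close(fd);
                return -1;
        }
        return fd;
}

/*
 * Client side: connect and write msg right away. The kernel queues both the
 * connection and the data until the server gets around to accept().
 * Returns the connected fd, or -1 on error.
 */
static int connect_and_write(const char *path, const char *msg)
{
        struct sockaddr_un sa;
        size_t len = strlen(msg);
        int fd;

        unix_address(&sa, path);
        fd = socket(AF_UNIX, SOCK_STREAM, 0);
        if (fd < 0)
                return -1;

        if (connect(fd, (struct sockaddr *) &sa, sizeof(sa)) < 0 ||
            write(fd, msg, len) != (ssize_t) len) {
                close(fd);
                return -1;
        }
        return fd;
}
```

Calling listen_unix() on some path and then connect_and_write() on the same path succeeds even though the server side has not accepted anything yet; a later accept() and read() on the listening side then return the queued bytes.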

But it’s not just about parallelization. It offers a number of
other benefits:

  • We no longer need to configure dependencies explicitly. Since the
    sockets are initialized before all services they are simply available,
    and no userspace ordering of service start-up needs to take place
    anymore. Socket activation hence drastically simplifies configuration
    and development of services.
  • If a service dies its listening socket stays around, not losing a
    single message. After a restart of the crashed service it can continue
    right where it left off.
  • If a service is upgraded we can restart the service while keeping
    around its sockets, thus ensuring the service is continuously
    responsive. Not a single connection is lost during the upgrade.
  • We can even replace a service during runtime in a way that is
    invisible to the client. For example, all systems running systemd
    start up with a tiny syslog daemon at boot which passes all log
    messages written to /dev/log on to the kernel message
    buffer. That way we provide reliable userspace logging starting from
    the first instant of boot-up. Then, when the actual rsyslog daemon is
    ready to start we terminate the mini daemon and replace it with the
    real daemon. And all that while keeping around the original logging
    socket and sharing it between the two daemons and not losing a single
    message. Since rsyslog flushes the kernel log buffer to disk after
    start-up all log messages from the kernel, from early-boot and from
    runtime end up on disk.

For another explanation of this idea consult the original blog
story about systemd.

Socket activation has been available in systemd since its
inception. On Fedora 15 a number of services have been modified to
implement socket activation, including Avahi, D-Bus and rsyslog (to continue with the example above).

systemd’s socket activation is quite comprehensive. Not only classic
sockets are supported but related technologies as well:

  • AF_UNIX sockets, in the flavours SOCK_DGRAM, SOCK_STREAM and SOCK_SEQPACKET; both in the filesystem and in the abstract namespace
  • AF_INET sockets, i.e. TCP/IP and UDP/IP; both IPv4 and IPv6
  • Unix named pipes/FIFOs in the filesystem
  • AF_NETLINK sockets, to subscribe to certain kernel features. This
    is currently used by udev, but could be useful for other
    netlink-related services too, such as audit.
  • Certain special files like /proc/kmsg or device nodes like /dev/input/*.
  • POSIX Message Queues

A service capable of socket activation must be able to receive its
preinitialized sockets from systemd, instead of creating them
internally. For most services this requires (minimal)
patching. However, since systemd actually provides inetd compatibility
a service working with inetd will also work with systemd — which is
quite useful for services like sshd for example.

So much for the background of socket activation; let’s now have a
look at how to patch a service to make it socket activatable. Let’s start
with a theoretic service foobard. (In a later blog post we’ll focus on
a real-life example.)

Our little (theoretic) service includes code like the following for
creating sockets (most services include code like this in one way or
another):

/* Source Code Example #1: ORIGINAL, NOT SOCKET-ACTIVATABLE SERVICE */
...
union {
        struct sockaddr sa;
        struct sockaddr_un un;
} sa;
int fd;

fd = socket(AF_UNIX, SOCK_STREAM, 0);
if (fd < 0) {
        fprintf(stderr, "socket(): %m\n");
        exit(1);
}

memset(&sa, 0, sizeof(sa));
sa.un.sun_family = AF_UNIX;
strncpy(sa.un.sun_path, "/run/foobar.sk", sizeof(sa.un.sun_path));

if (bind(fd, &sa.sa, sizeof(sa)) < 0) {
        fprintf(stderr, "bind(): %m\n");
        exit(1);
}

if (listen(fd, SOMAXCONN) < 0) {
        fprintf(stderr, "listen(): %m\n");
        exit(1);
}
...

A socket activatable service may use the following code instead:

/* Source Code Example #2: UPDATED, SOCKET-ACTIVATABLE SERVICE */
...
#include "sd-daemon.h"
...
int fd;

if (sd_listen_fds(0) != 1) {
        fprintf(stderr, "No or too many file descriptors received.\n");
        exit(1);
}

fd = SD_LISTEN_FDS_START + 0;
...

systemd might pass you more than one socket (based on
configuration, see below). In this example we are interested in one
only. sd_listen_fds()
returns how many file descriptors are passed. We simply compare that
with 1, and fail if we got more or less. The file descriptors systemd
passes to us are inherited one after the other beginning with fd
#3. (SD_LISTEN_FDS_START is a macro defined to 3). Our code hence just
takes possession of fd #3.
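
For the curious, here is a rough sketch of the kind of check sd_listen_fds() performs. It assumes the protocol in which the number of passed descriptors arrives in the $LISTEN_FDS environment variable and the PID they are intended for in $LISTEN_PID. The function name sketch_listen_fds() and the sanity limit of 1024 are made up for this example, and the real implementation does more (for example, it sets FD_CLOEXEC on the descriptors and can unset the variables), so use the real sd-daemon.c in actual code.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>

/* Parse a decimal number with strtoul(); returns 0 on success, -1 otherwise. */
static int parse_ulong(const char *s, unsigned long *out)
{
        char *end = NULL;

        assert(s != NULL && out != NULL);

        errno = 0;
        *out = strtoul(s, &end, 10);
        return (errno != 0 || end == s || *end != '\0') ? -1 : 0;
}

/*
 * Return how many file descriptors were passed to this process (they start
 * at fd #3), 0 if none were passed or they were meant for another process,
 * -1 if the environment is malformed.
 */
static int sketch_listen_fds(void)
{
        const char *pid_str = getenv("LISTEN_PID");
        const char *fds_str = getenv("LISTEN_FDS");
        unsigned long pid, n;

        if (pid_str == NULL || fds_str == NULL)
                return 0;

        /* 1024 is an arbitrary sanity limit chosen for this sketch. */
        if (parse_ulong(pid_str, &pid) < 0 ||
            parse_ulong(fds_str, &n) < 0 || n > 1024) {
                fprintf(stderr, "Malformed LISTEN_PID or LISTEN_FDS.\n");
                return -1;
        }

        /* The variables may be inherited from a parent; ignore them then. */
        if ((pid_t) pid != getpid())
                return 0;

        return (int) n;
}
```

The PID check matters because environment variables are inherited: a child process spawned by the service would otherwise wrongly believe the sockets were passed to it.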

As you can see this code is actually much shorter than the
original. This of course comes at the price that our little service
with this change will no longer work in a non-socket-activation
environment. With minimal changes we can adapt our example to work nicely
both with and without socket activation:

/* Source Code Example #3: UPDATED, SOCKET-ACTIVATABLE SERVICE WITH COMPATIBILITY */
...
#include "sd-daemon.h"
...
int fd, n;

n = sd_listen_fds(0);
if (n > 1) {
        fprintf(stderr, "Too many file descriptors received.\n");
        exit(1);
} else if (n == 1)
        fd = SD_LISTEN_FDS_START + 0;
else {
        union {
                struct sockaddr sa;
                struct sockaddr_un un;
        } sa;

        fd = socket(AF_UNIX, SOCK_STREAM, 0);
        if (fd < 0) {
                fprintf(stderr, "socket(): %m\n");
                exit(1);
        }

        memset(&sa, 0, sizeof(sa));
        sa.un.sun_family = AF_UNIX;
        strncpy(sa.un.sun_path, "/run/foobar.sk", sizeof(sa.un.sun_path));

        if (bind(fd, &sa.sa, sizeof(sa)) < 0) {
                fprintf(stderr, "bind(): %m\n");
                exit(1);
        }

        if (listen(fd, SOMAXCONN) < 0) {
                fprintf(stderr, "listen(): %m\n");
                exit(1);
        }
}
...

With this simple change our service can now make use of socket
activation but still works unmodified in classic environments. Now,
let’s see how we can enable this service in systemd. For this we have
to write two systemd unit files: one describing the socket, the other
describing the service. First, here’s foobar.socket:

[Socket]
ListenStream=/run/foobar.sk

[Install]
WantedBy=sockets.target

And here’s the matching service file foobar.service:

[Service]
ExecStart=/usr/bin/foobard

If we place these two files in /etc/systemd/system we can
enable and start them:

# systemctl enable foobar.socket
# systemctl start foobar.socket

Now our little socket is listening, but our service is not running
yet. If we now connect to /run/foobar.sk the service will be
automatically spawned, for on-demand service start-up. With a
modification of foobar.service we can start our service
already at startup, thus using socket activation only for
parallelization purposes, not for on-demand auto-spawning anymore:

[Service]
ExecStart=/usr/bin/foobard

[Install]
WantedBy=multi-user.target

And now let’s enable this too:

# systemctl enable foobar.service
# systemctl start foobar.service

Now our little daemon will be started at boot and on-demand,
whichever comes first. It can be started fully in parallel with its
whatever comes first. It can be started fully in parallel with its
clients, and when it dies it will be automatically restarted when it
is used the next time.

A single .socket file can include multiple ListenXXX stanzas, which
is useful for services that listen on more than one socket. In this
case all configured sockets will be passed to the service in the exact
order they are configured in the socket unit file. Also,
you may configure various socket settings in the .socket
files.

In real life it’s a good idea to include description strings in
these unit files; to keep things simple we’ll leave this out of our
example. Speaking of real-life: our next installment will cover an
actual real-life example. We’ll add socket activation to the CUPS
printing server.

The sd_listen_fds() function call is defined in sd-daemon.h
and sd-daemon.c. These
two files are currently drop-in .c sources which projects should
simply copy into their source tree. Eventually we plan to turn this
into a proper shared library, however using the drop-in files allows
you to compile your project in a way that is compatible with socket
activation even without any compile time dependencies on
systemd. sd-daemon.c is liberally licensed, should compile
fine on the most exotic Unixes and the algorithms are trivial enough
to be reimplemented with very little code if the license should
nonetheless be a problem for your project. sd-daemon.c
contains a couple of other API functions besides
sd_listen_fds() that are useful when implementing socket
activation in a project. For example, there’s sd_is_socket()
which can be used to distinguish and identify particular sockets when
a service gets passed more than one.
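
To illustrate the kind of check sd_is_socket() enables, here is a simplified stand-in built only on getsockopt() and getsockname(). The name fd_is_socket() and its exact semantics are assumptions for this sketch; it is not the sd-daemon.c implementation.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <errno.h>
#include <sys/socket.h>
#include <unistd.h>

/*
 * Return 1 if fd is a socket of the given address family and type, 0 if it
 * is not, -1 on error. Pass AF_UNSPEC as family or 0 as type to skip the
 * respective check.
 */
static int fd_is_socket(int fd, int family, int type)
{
        int actual_type = 0;
        socklen_t len = sizeof(actual_type);

        assert(fd >= 0);

        /* SO_TYPE only works on sockets; ENOTSOCK means "not a socket". */
        if (getsockopt(fd, SOL_SOCKET, SO_TYPE, &actual_type, &len) < 0)
                return errno == ENOTSOCK ? 0 : -1;

        if (type != 0 && actual_type != type)
                return 0;

        if (family != AF_UNSPEC) {
                struct sockaddr_storage ss;
                socklen_t ss_len = sizeof(ss);

                if (getsockname(fd, (struct sockaddr *) &ss, &ss_len) < 0)
                        return -1;
                if (ss.ss_family != family)
                        return 0;
        }

        return 1;
}
```

A service that is handed two descriptors could, for instance, call fd_is_socket(fd, AF_UNIX, SOCK_STREAM) on each of them to find out which one is its local control socket.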

Let me point out that the interfaces used here are in no way bound
directly to systemd. They are generic enough to be implemented in
other systems as well. We deliberately designed them as simple and
minimal as possible to make it possible for others to adopt similar
schemes.

Stay tuned for the next installment. As mentioned, it will cover a
real-life example of turning an existing daemon into a
socket-activatable one: the CUPS printing service. However, I hope
this blog story might already be enough to get you started if you plan
to convert an existing service into a socket activatable one. We
invite everybody to convert upstream projects to this scheme. If you
have any questions join us on #systemd on freenode.