Implementing FIFO message ordering with Amazon MQ for Apache ActiveMQ

Post Syndicated from Chris Munns original https://aws.amazon.com/blogs/compute/implementing-fifo-message-ordering-with-amazon-mq-for-apache-activemq/

This post is contributed by Ravi Itha, Sr. Big Data Consultant

Messaging plays an important role in building distributed enterprise applications. Amazon MQ is a key offering within the AWS messaging services solution stack focused on enabling messaging services for modern application architectures. Amazon MQ is a managed message broker service for Apache ActiveMQ that simplifies setting up and operating message brokers in the cloud. Amazon MQ uses open standard APIs and protocols such as JMS, NMS, AMQP, STOMP, MQTT, and WebSocket. Using standards means that, in most cases, there’s no need to rewrite any messaging code when you migrate to AWS. This allows you to focus on your business logic and application architecture.

Message ordering via Message Groups

Sometimes it’s important to guarantee the order in which messages are processed. In ActiveMQ, there is no explicit distinction between a standard queue and a FIFO queue. However, a queue can be used to route messages in FIFO order. This ordering can be achieved via two different ActiveMQ features, either by implementing an Exclusive Consumer or using using Message Groups. This blog focuses on Message Groups, an enhancement to Exclusive Consumers. Message groups provide:

  • Guaranteed ordering of the processing of related messages across a single queue
  • Load balancing of the processing of messages across multiple consumers
  • High availability with automatic failover to other consumers if a JVM goes down

This is achieved programmatically as follows:

Sample producer code snippet:

TextMessage tMsg = session.createTextMessage("SampleMessage");
tMsg.setStringProperty("JMSXGroupID", "Group-A");
producer.send(tMsg);

Sample consumer code snippet:

Message consumerMessage = consumer.receive(50);
TextMessage txtMessage = (TextMessage) message.get();
String msgBody = txtMessage.getText();
String msgGroup = txtMessage.getStringProperty("JMSXGroupID")

This sample code highlights:

  • A message group is set by the producer during message ingestion
  • A consumer determines the message group once a message is consumed

Additionally, if a queue has messages for multiple message groups then it’s possible a consumer receives messages for multiple message groups. This depends on various factors such as the number of consumers of a queue and consumer start time.

Scenarios: Multiple producers and consumers with Message Groups

A FIFO queue in ActiveMQ supports multiple ordered message groups. Due to this, it’s common that a queue is used to exchange messages between multiple producer and consumer applications. By running multiple consumers to process messages from a queue, the message broker is able to partition messages across consumers. This improves the scalability and performance of your application.

In terms of scalability, commonly asked questions center on the ideal number of consumers and how messages are distributed across all consumers. To provide more clarity in this area, we provisioned an Amazon MQ broker and ran various test scenarios.

Scenario 1: All consumers started at the same time

Test setup

  • All producers and consumers have the same start time
  • Each test uses a different combination of number of producers, message groups, and consumers
Setup for Tests 1 to 5

Setup for Tests 1 to 5

Test results

Test # # producers # message groups

# messages sent

by each producer

# consumers Total messages # messages received by consumers
C1 C2 C3 C4
1 3 3 5000 1 15000

15000

(All Groups)

NA NA NA
2 3 3 5000 2 15000

5000

(Group-C)

10000

(Group-A and Group-B)

NA NA
3 3 3 5000 3 15000

5000

(Group-A)

5000

(Group-B)

5000

(Group C)

NA
4 3 3 5000 4 15000

5000

(Group-C)

5000

(Group-B)

5000

(Group-A)

0
5 4 4 5000 3 20000

5000

(Group-A)

5000

(Group-B)

10000

(Group-C and Group-D)

NA

Test conclusions

  • Test 3 – illustrates even message distribution across consumers when a one-to-one relationship exists between message groups and number of consumers
  • Test 4 – illustrates one of the four consumers did not receive any messages. This highlights that running more consumers than the available number of messages groups does not provide additional benefits
  • Tests 1, 2, 5 – indicate that a consumer can receive messages belonging to multiple message groups. The following table provides additional granularity to messages received by consumer C2 in test #2. As you can see, these messages belong to Group-A and Group-B message groups, and FIFO ordering is maintained at a message group level
consumer_id msg_id msg_group
Consumer C2 A-1 Group-A
Consumer C2 B-1 Group-B
Consumer C2 A-2 Group-A
Consumer C2 B-2 Group-B
Consumer C2 A-3 Group-A
Consumer C2 B-3 Group-B
Consumer C2 A-4999 Group-A
Consumer C2 B-4999 Group-B
Consumer C2 A-5000 Group-A
Consumer C2 B-5000 Group-B

Scenario 2a: All consumers not started at same time

Test setup

  • Three producers and one consumer started at the same time
  • The second and third consumers started after 30 seconds and 60 seconds respectively
  • 15,000 messages sent in total across three message groups
 Setup for Test 6

Setup for Test 6

Test results

Test # # producers # message groups

# messages sent

by each producer

# consumers Total messages # messages received by consumers
C1 C2 C3
6 3 3 5000 3 15000 15000 0 0

Test conclusion

Consumer C1 received all messages, while consumers C2 and C3 both ran idle and did not receive any messages. Key takeaway here is that results can be inefficient in real-world scenarios where consumers start at different times.

The last scenario (2b) illustrates this same scenario, while optimizing message distribution so that all consumers are used.

Scenario 2b: Utilization of all consumers when not started at same time

Test setup

  • Three producers and one consumer started at the same time
  • The second and third consumers started after 30 seconds and 60 seconds respectively
  • 15,000 messages sent in total across three message groups
  • After each producer message group sends its 2501st message, their message groups are closed after which message distribution is restarted by sending the remaining messages. Closing a message group can be done as in the following code example (specifically the -1 value set for the JMSXGroupSeq property):
TextMessage tMsg = session.createTextMessage("<foo>hey</foo>");
tMsg.setStringProperty("JMSXGroupID", "Group-A");
tMsg.setIntProperty("JMSXGroupSeq", -1);
producer.send(tMsg);
Setup for Test 7

Setup for Test 7

Test results

Test # # producers # message groups

# messages sent

by each producer

# consumers

Total

messages

# messages received by consumers
C1 C2 C3
7 3 3 5001 3 15003 10003 2500 2500

Distribution of messages received by message group

Consumer Group-A Group-B Group-C Consumer-wise total
Consumer 1 2501 2501 5001 10003
Consumer 2 2500 0 0 2500
Consumer 3 0 2500 0 2500
Group total 5001 5001 5001 NA
Total messages received 15003

Test conclusions

Message distribution is optimized with the closing and reopening of a message group when all consumers are not started at the same time. This mitigation step results in all consumers receiving messages.

  • After Group-A was closed, the broker assigned subsequent Group-A messages to consumer C2
  • After Group-B was closed, the broker assigned subsequent Group-B messages to consumer C3
  • After Group-C was closed, the broker continued to send Group-C messages to consumer C1. The assignment did not change because there was no other available consumer
Test 7 – Message distribution among consumers

Test 7 – Message distribution among consumers

Scalability techniques

Now that we understand how to use Message Groups to implement FIFO use cases within Amazon MQ, let’s look at how they scale. By default, a message queue supports a maximum of 1024 message groups. This means, if you use more than 1024 message groups per queue then message ordering is lost for the oldest message group. This is further explained in the ActiveMQ Message Groups documentation. This can be problematic for complex use cases involving stock exchanges or financial trading scenarios where thousands of ordered message groups are required. In the following table, are a couple of techniques to address this issue.

Scalability techniques Details
  1. Verify that the appropriate Amazon MQ broker instance type is used
  2. Increase number of message groups per message queue
  1. Select the appropriate broker instance type according to your use case. To learn more, refer to the Amazon MQ broker instance types documentation.
  2. Default max number of message groups can be increased via a custom configuration file at the time of launching a new broker. This default can also be increased by modifying an existing broker. Refer to the next section for an example (requirement #2)
Recycle the number of message groups when they are no longer needed A message group can be closed programmatically by a producer once it’s finished sending all messages to a queue. Following is a sample code snippet:

TextMessage tMsg = session.createTextMessage("<foo>hey</foo>");
tMsg.setStringProperty("JMSXGroupID", "GroupA");
tMsg.setIntProperty("JMSXGroupSeq", -1);
producer.send(tMsg);

In the preceding scenario 2b, we used this technique to improve the message distribution across consumers.

Customize message broker configuration

In the previous section, to improve scalability we suggested increasing the number of message groups per queue by updating the broker configuration. A broker configuration is essentially an XML file that contains all ActiveMQ settings for a given message broker. Let’s look at the following broker configuration settings for the purpose of achieving a specific requirement. For your reference, we’ve placed a copy of a broker configuration file with these settings, within a GitHub repository.

# Requirement Applicable broker configuration
1 Change message group implementation from default CachedMessageGroupMap default to MessageGroupHashBucket

<!–valid values: simple, bucket, cached. default is cached–>

<!–keyword simple represents SimpleMessageGroupMap–>

<!–keyword bucket represents MessageGroupHashBucket–>

<!–keyword cached represents CachedMessageGroupMap–>

<policyEntry messageGroupMapFactoryType=“bucket” queue=“&gt;”/>

2 Increase number of message groups per queue from 1024 to 2048 and increase cache size from 64 to 128

<!–default value for bucketCount is 1024 and for cacheSize is 64–>

<policyEntry queue=“&gt;”>

<messageGroupMapFactory>

<messageGroupHashBucketFactory bucketCount=“2048” cacheSize=“128”/>

</messageGroupMapFactory>

</policyEntry>

3 Wait for three consumers or 30 seconds before broker begins sending messages <policyEntry queue=”&gt;” consumersBeforeDispatchStarts=”3″ timeBeforeDispatchStarts=”30000″/>

When must default broker configurations be updated? This would only apply in scenarios where the default settings do not meet your requirements. Additional information on how to update your broker configuration file can be found here.

Amazon MQ starter kit

Want to get up and running with Amazon MQ quickly? Start building with Amazon MQ by cloning the starter kit available on GitHub. The starter kit includes a CloudFormation template to provision a message broker, sample broker configuration file, and source code related to the consumer scenarios in this blog.

Conclusion

In this blog post, you learned how Amazon MQ simplifies the setup and operation of Apache ActiveMQ in the cloud. You also learned how the Message Groups feature can be used to implement FIFO. Lastly, you walked through real scenarios demonstrating how ActiveMQ distributes messages with queues to exchange messages between multiple producers and consumers.