7. Using Queues

AMPS message queues provide a high-performance way of distributing messages across a set of workers. The AMPS User Guide describes AMPS queues in detail, including the features of AMPS referred to in this chapter. This chapter does not describe message queues in detail, but instead explains how to use the AMPS Java client with message queues.

To publish messages to an message queue, publishers simply publish to any topic that is collected by the queue. There is no difference between publishing to a queue and publishing to any other topic, and a publisher does not need to be aware that the topic will be collected into a queue.

Subscribers must be aware that they are subscribing to a queue, and acknowledge messages from the queue when the message is processed.

Backlog and Smart Pipelining

AMPS queues are designed for high-volume applications that need minimal latency and overhead. One of the features that helps performance is the subscription backlog feature, which allows applications to receive multiple messages at a time. The subscription backlog sets the maximum number of unacknowledged messages that AMPS will provide to the subscription.

When the subscription backlog is larger than 1, AMPS delivers additional messages to a subscriber before the subscriber has acknowledged the first message received. This technique allows subscribers to process messages as fast as possible, without ever having to wait for messages to be delivered. The technique of providing a consistent flow of messages to the application is called smart pipelining.

Subscription Backlog

The AMPS server determines the backlog for each subscription. An application can set the maximum backlog that it is willing to accept with the max_backlog option. Depending on the configuration of the queue (or queues) specified in the subscription, AMPS may assign a smaller backlog to the subscription. If no max_backlog option is specified, AMPS uses a max_backlog of 1 for that subscription.

In general, applications that have a constant flow of messages perform better with a max_backlog setting higher than 1. The reason for this is that, with a backlog greater than 1, the application can always have a message waiting when the previous message is processed. Setting the optimum max_backlog is a matter of understanding the messaging pattern of your application and how quickly your application can process messages.

To request a max_backlog for a subscription, you explicitly set the option on the subscribe command, as shown below:

Command cmd = new Command("subscribe")
                  .setTopic("my_queue")
                  .setOptions("max_backlog=10");

Acknowledging Messages

For each message delivered on a subscription, AMPS counts the message against the subscription backlog until the message is explicitly acknowledged. In addition, when a queue specifies at-least-once delivery, AMPS retains the message in the queue until the message expires or until the message has been explicitly acknowledged and removed from the queue. From the point of view of the AMPS server, acknowledgment is implemented as a sow_delete from the queue with the bookmarks of the messages to remove. The AMPS Java client provides several ways to make it easier for applications to create and send the appropriate sow_delete.

Automatic Acknowledgment

The AMPS client allows you to specify that messages should be automatically acknowledged. When this mode is on, AMPS acknowledges the message automatically in the following cases:

  • Asynchronous message processing interface. The message handler returns without throwing an exception.
  • Synchronous message processing interface. The application requests the next message from the MessageStream.

AMPS batches acknowledgments created with this method, as described in the following section.

To enable automatic acknowledgment batching, use the setAutoAck() method.

client.setAutoAck(true);  // enable AutoAck

Message Convenience Method

The AMPS Java client provides a convenience method, ack(), on delivered messages. When the application is finished with the message, the application simply calls ack() on the message. (This, in turn, provides the topic and bookmark to the ack() method of the client that received the message.)

For messages that originated from a queue with at-least-once semantics, this adds the bookmark from the message to the batch of messages to acknowledge. For other messages, this method has no effect.

message.ack(); // Add this message to the next
               // acknowledgment batch.

Acknowledgment Batching

The AMPS Java client automatically batches acknowledgments when either of the convenience methods is used. Batching acknowledgments reduces the number of round-trips to AMPS, reducing network traffic and improving overall performance. AMPS sends the batch of acknowledgments when the number of acknowledgments exceeds a specified size, or when the amount of time since the last batch was sent exceeds a specified timeout.

You can set the number of messages to batch and the maximum amount of time between batches:

client.setAckBatchSize(10); // Send batch after 10 messages
client.setAckTimeout(1000); // ... or 1 second

The AMPS Java client is aware of the subscription backlog for a subscription. When AMPS returns the acknowledgment for a subscription that contains queues, AMPS includes information on the subscription backlog for the subscription. If the batch size is larger than the subscription backlog, the AMPS Java client adjusts the requested batch size to match the subscription backlog.

60East recommends tuning the batch size to improve application performance. A value of 1/3 of the smallest max_backlog value is a good initial starting point for testing. 60East does not recommend setting the batch size larger than 1/2 of the max_backlog value without testing the setting to ensure that the application does not run out of messages to process while the acknowledgment is being sent to AMPS.

Returning a Message to the Queue

A subscriber can also explicitly release a message back to the queue. AMPS returns the message to the queue, and redelivers the message just as though the lease had expired. To do this, the subscriber sends a sow_delete command with the bookmark of the message to release and the cancel option.

When using automatic acknowledgments and the asynchronous API, AMPS will cancel a message if an exception is thrown from the message handler.

To return a message to the queue, you can build a sow_delete acknowledgment using the Command class, or pass an option to the ack() method on the message.

Option Result
cancel Returns the message to the queue.
expire Immediately expire the message from the queue.

For example, to return a message to a queue, call ack() on the message and pass the cancel option.

message.ack("cancel");

Manual Acknowledgment

60East generally recommends that applications use an ack() method to acknowledge messages during normal processing. This approach works properly from within a message handler, provides batching support as described elsewhere in this chapter, and is generally both easier to code and more efficient.

However, in some situations, you may need to manually acknowledge messages in the queue. This is most common when an application needs to operate on all messages with certain characteristics, rather than acknowledging individual messages. For example, an application that is doing updates to an order may want cancel an order by both publishing a cancellation and immediately expiring all other messages in the queue for that order. With manual acknowledgment, that application can use a filter to remove all previous updates for that order, then publish the cancellation.

To manually acknowledge processed messages and remove the messages from the queue, applications use the sow_delete command. To remove specific messages from the queue, provide the bookmarks of those messages. To remove messages that match a given filter, provide the filter. Notice that AMPS only supports using a bookmark with sow_delete when removing messages from a queue, not when removing records from a SOW.

For example, given a Message object to acknowledge and a client, the code below acknowledges the message.

void acknowledgeSingle(Client client, Message message) throws AMPSException {
    Command acknowledge = Command("sow_delete");
    acknowledge.setTopic(message.getTopic())
               .setBookmark(message.getBookmark());
     client.executeAsync(acknowledge, null);
}

Example 7.1: Simple queue acknowledgment

In listing Example 7.1 the program creates a sow_delete command, specifies the topic and the bookmark, and then sends the command to the server.

While this method works, creating and sending an acknowledgment for each individual message can be inefficient if your application is processing a large volume of messages. Rather than acknowledging each message individually, your application can build a comma-delimited list of bookmarks from the processed messages and acknowledge all of the messages at the same time. In this case, it’s important to be sure that the number of messages you wait for is less than the maximum backlog – the number of messages your client can have unacknowledged at a given time. Notice that both automatic acknowledgment and the helper method on the Message object take the maximum backlog into account.

When constructing a command to acknowledge queue messages, AMPS allows an application to specify a filter rather than a set of bookmarks. AMPS interprets this as the client requesting acknowledgment of all messages that match the filter. (This may include messages that the client has not received, subject to the Leasing model for the queue.)

As a more typical example of manual acknowledgment, the code below expires all messages for a given id that have a status other than cancel. An application might do this to halt processing of an order that it is about to cancel:

void removePending(Client client, string orderId) throws AMPSException {
     Command acknowledge = Command("sow_delete");
     acknowledge.setTopic(message.getTopic())
                .setFilter("/id = '" + orderId + "' and /status != 'cancel'")
                .setOptions("expire");
      client.executeAsync(acknowledge, null);
 }

Example 7.2: Queue acknowledgment with a filter

In listing 7.2, the program specifies a topic and a a filter to use to find the messages that should be removed. In this case, the program also provides the expire option to indicate that the messages have been removed from the queue rather than successfully processed (of course, whether this is the correct behavior for a canceled order depends on the expected message flow for your application).

Notice that, as described in Understanding Threading and Message Handlers, this method of acknowledging a message should not be used from a message handler unless the sow_delete is sent from a different client than the client that called the message handler. Instead, 60East recommends using the ack() function from within a message handler.