7. Using Queues¶
AMPS message queues provide a high-performance way of distributing messages across a set of workers. The AMPS User Guide describes AMPS queues in detail, including the features of AMPS referred to in this chapter. This chapter does not describe message queues in detail, but instead explains how to use the AMPS C++ client with message queues.
To publish messages to an message queue, publishers simply publish to any topic that is collected by the queue. There is no difference between publishing to a queue and publishing to any other topic, and a publisher does not need to be aware that the topic will be collected into a queue.
Subscribers must be aware that they are subscribing to a queue, and acknowledge messages from the queue when the message is processed.
Backlog and Smart Pipelining¶
AMPS queues are designed for high-volume applications that need minimal latency and overhead. One of the features that helps performance is the subscription backlog feature, which allows applications to receive multiple messages at a time. The subscription backlog sets the maximum number of unacknowledged messages that AMPS will provide to the subscription.
When the subscription backlog is larger than 1
, AMPS delivers
additional messages to a subscriber before the subscriber has
acknowledged the first message received. This technique allows
subscribers to process messages as fast as possible, without ever having
to wait for messages to be delivered. The technique of providing a
consistent flow of messages to the application is called smart
pipelining.
Subscription Backlog¶
The AMPS server determines the backlog for each subscription. An
application can set the maximum backlog that it is willing to accept
with the max_backlog
option. Depending on the configuration of the
queue (or queues) specified in the subscription, AMPS may assign a
smaller backlog to the subscription. If no max_backlog
option is
specified, AMPS uses a max_backlog
of 1
for that subscription.
In general, applications that have a constant flow of messages perform
better with a max_backlog
setting higher than 1
. The reason for
this is that, with a backlog greater than 1
, the application can
always have a message waiting when the previous message is processed.
Setting the optimum max_backlog
is a matter of understanding the
messaging pattern of your application and how quickly your application
can process messages.
To request a max_backlog
for a subscription, you explicitly set the
option on the subscribe command, as shown below:
Command cmd("subscribe");
cmd.setTopic("my_queue")
.setOptions("max_backlog=10");
Acknowledging Messages¶
For each message delivered on a subscription, AMPS counts the message
against the subscription backlog until the message is explicitly
acknowledged. In addition, when a queue specifies at-least-once
delivery, AMPS retains the message in the queue until the message
expires or until the message has been explicitly acknowledged and
removed from the queue. From the point of view of the AMPS server, this
is implemented as a sow_delete
from the queue with the bookmarks of
the messages to remove. The AMPS C++ client provides several ways to
make it easier for applications to create and send the appropriate
sow_delete
.
Automatic Acknowledgment¶
The AMPS client allows you to specify that messages should be automatically acknowledged. When this mode is on, AMPS acknowledges the message automatically in the following cases:
- Asynchronous message processing interface. The message handler returns without throwing an exception.
- Synchronous message processing interface. The application requests
the next message from the
MessageStream
.
AMPS batches acknowledgments created with this method, as described in the following section.
To enable automatic acknowledgment, use the setAutoAck()
method.
client.setAutoAck(true); // enable AutoAck
Message Convenience Method¶
The AMPS C++ client provides a convenience method, ack()
, on
delivered messages. When the application is finished with the message,
the application simply calls ack()
on the message. (This, in turn,
provides the topic and bookmark to the ack()
function of the client
that received the message.)
For messages that originated from a queue with at-least-once
semantics, this adds the bookmark from the message to the batch of
messages to acknowledge. For other messages, this method has no effect.
message.ack(); // Add this message to the next
// acknowledgment batch.
Acknowledgment Batching¶
The AMPS C++ client automatically batches acknowledgments when either of the convenience methods is used. Batching acknowledgments reduces the number of round-trips to AMPS, reducing network traffic and improving overall performance. AMPS sends the batch of acknowledgments when the number of acknowledgments exceeds a specified size, or when the amount of time since the last batch was sent exceeds a specified timeout.
You can set the number of messages to batch and the maximum amount of time between batches:
client.setAckBatchSize(10); // Send batch after 10 messages
client.setAckTimeout(1000); // ... or 1 second
The AMPS C++ client is aware of the subscription backlog for a subscription. When AMPS returns the acknowledgment for a subscription that contains queues, AMPS includes information on the subscription backlog for the subscription. If the batch size is larger than the subscription backlog, the AMPS C++ client adjusts the requested batch size to match the subscription backlog.
60East recommends tuning the batch size to improve application performance.
A value of 1/3 of the smallest max_backlog
value is a good initial
starting point for testing. 60East does not recommend setting the batch size
larger than 1/2 of the max_backlog
value without testing the setting to
ensure that the application does not run out of messages to process.
Returning a Message to the Queue¶
A subscriber can also explicitly release a message back to the queue.
AMPS returns the message to the queue, and redelivers the message just
as though the lease had expired. To do this, the subscriber sends a
sow_delete
command with the bookmark of the message to release and
the cancel
option.
When using automatic acknowledgments and the asynchronous API, AMPS will cancel a message if an exception is thrown from the message handler.
To return a message to the queue, you can build a sow_delete
acknowledgment using the Command
class, or pass an option to the
ack()
method on the message.
Option | Result |
---|---|
cancel |
Returns the message to the queue. |
expire |
Immediately expire the message from the queue. |
For example, to return a message to a queue, call ack()
on the message
and pass the cancel
option.
message.ack("cancel");
Example 7.1: Simple queue cancel
Manual Acknowledgment¶
60East generally recommends that applications use an ack()
method
to acknowledge messages during normal processing. This approach works
properly from within a message handler, provides batching support as
described elsewhere in this chapter, and is generally both easier to
code and more efficient.
However, in some situations, you may need to manually acknowledge messages in the queue. This is most common when an application needs to operate on all messages with certain characteristics, rather than acknowledging individual messages. For example, an application that is doing updates to an order may want cancel an order by both publishing a cancellation and immediately expiring all other messages in the queue for that order. With manual acknowledgment, that application can use a filter to remove all previous updates for that order, then publish the cancellation.
To manually acknowledge processed messages and remove the messages from
the queue, applications use the sow_delete
command. To remove
specific messages from the queue, provide the bookmarks of those
messages. To remove messages that match a given filter, provide
the filter. Notice that AMPS only supports
using a bookmark with sow_delete
when removing messages from a
queue, not when removing records from a SOW.
For example, given a Message
object to acknowledge and a client, the
code below acknowledges the message.
void acknowledgeSingle(const Client& client, const Message& message)
{
Command acknowledge("sow_delete");
acknowledge.setTopic(message.getTopic())
.setBookmark(message.getBookmark());
client.executeAsync(acknowledge, MessageHandler());
}
Example 7.2: Simple queue acknowledgment
In listing
Example 7.2
the program creates a sow_delete
command, specifies the topic and the bookmark,
and then sends the command to the server.
While this method works, creating and sending an acknowledgment for
each individual message can be inefficient if your application is
processing a large volume of messages. Rather than acknowledging each
message individually, your application can build a comma-delimited list
of bookmarks from the processed messages and acknowledge all of the
messages at the same time. In this case, it’s important to be sure that
the number of messages you wait for is less than the maximum backlog –
the number of messages your client can have unacknowledged at a given
time. Notice that both automatic acknowledgment and the helper method
on the Message
object take the maximum backlog into account.
When constructing a command to acknowledge queue messages, AMPS allows an
application to specify a filter rather than a set of bookmarks. AMPS interprets
this as the client requesting acknowledgment of all messages that match the
filter. (This may include messages that the client has not received, subject
to the Leasing
model for the queue.)
As a more typical example of manual acknowledgment, the code below expires
all messages for a given id
that have a status other than cancel
. An
application might do this to halt processing of an order that it is about
to cancel:
void removePending(const Client& client, const std::string& orderId)
{
Command acknowledge("sow_delete");
acknowledge.setTopic(message.getTopic())
.setFilter("/id = '" + orderId +"' and /status != 'cancel'")
.setOptions("expire");
client.executeAsync(acknowledge, MessageHandler());
}
Example 7.3: Queue acknowledgment with a filter
In listing 7.3, the program specifies a topic and a filter to use to find the messages that should be removed. In this case, the program also provides the expire option to indicate that the messages have been removed from the queue rather than successfully processed (of course, whether this is the correct behavior for a canceled order depends on the expected message flow for your application).
Notice that, as described in Understanding Threading and Message Handlers, this method
of acknowledging a message should not be used from a message handler unless
the sow_delete
is sent from a different client than the client that
called the message handler. Instead, 60East recommends using the ack()
function from within a message handler.
Samples of Working With a Queue¶
The C++ client includes the following samples that demonstrate how to query a topic in the State-of-the-World.
Sample Name | Demonstrates |
---|---|
amps_publish_queue.cpp |
Publishing messages to a queue topic. |
amps_consume_queue.cpp |
Consuming messages from a queue topic. |