4. Subscriptions
Messages published to a topic on an AMPS server are available to other clients via a subscription. Before messages can be received, a client must subscribe to one or more topics on the AMPS server so that the server will begin sending messages to the client. The server will continue sending messages to the client until the client unsubscribes, or the client disconnects. With content filtering, the AMPS server will limit the messages sent to only those messages that match a client-supplied filter. In this chapter, you will learn how to subscribe, unsubscribe, and supply filters for messages using the AMPS Java client.
Subscribing Method
You subscribe to an AMPS topic by calling Client.subscribe(). Here is a short example showing the simplest way to subscribe to a topic (error handling and connection details are omitted for brevity):
import com.crankuptheamps.client.Client;
import com.crankuptheamps.client.Message;
import com.crankuptheamps.client.MessageStream;
import com.crankuptheamps.client.exception.AMPSException;

class MyApp {
    public static void main(String[] args) {
        // We create a Client, then connect() and logon().
        Client client = new Client("subscribe");
        try {
            client.connect("tcp://127.0.0.1:9007/amps/json");
            client.logon();
            /* Here, we subscribe to the topic messages. We do not provide a
             * filter, so the subscription receives all of the messages published
             * to the topic, regardless of content. We protect the MessageStream
             * in a try-with-resources block, so that the stream is closed when
             * control exits the block.
             */
            try (MessageStream ms = client.subscribe("messages")) {
                /* Here, we iterate over the messages returned by the message
                 * stream. When we no longer need to subscribe, we can break
                 * out of the loop. When the MessageStream is cleaned up, the
                 * client sends an unsubscribe command to AMPS and stops
                 * receiving messages.
                 */
                for (Message m : ms) {
                    /* Within the loop, we process the message. In this case, we
                     * simply print the contents of the message.
                     */
                    System.out.println(m.getData());
                }
            }
        }
        catch (AMPSException e) { System.err.println(e); }
        finally {
            client.close();
        }
    }
}
Example 4.1: Subscribing to a topic
AMPS creates a background thread that receives messages and copies them into the MessageStream that you iterate over. This means that the client application as a whole can continue to receive messages while you are doing processing work.
The simple method described above is provided for convenience. The AMPS Java client provides convenience methods for the most common forms of the commands. AMPS also provides an interface that allows you precise control over the command. Using that interface, the example above becomes:
import com.crankuptheamps.client.Client;
import com.crankuptheamps.client.Command;
import com.crankuptheamps.client.Message;
import com.crankuptheamps.client.MessageStream;
import com.crankuptheamps.client.exception.AMPSException;

class MyApp {
    public static void main(String[] args) {
        // We create a Client that is properly connected to an AMPS server.
        Client client = new Client("subscribe");
        try {
            client.connect("tcp://127.0.0.1:9007/amps");
            client.logon();
            // We create a Command object to subscribe to the messages topic.
            Command command = new Command("subscribe").setTopic("messages");
            /* Here we execute the command and subscribe to the topic messages.
             * This works exactly the same way as the command in Example 4.1. If,
             * at any time, we no longer need to subscribe, we can break out of
             * the loop. We use a try-with-resources to automatically clean up
             * the MessageStream when we leave the try block. When the
             * MessageStream is cleaned up, the client sends an unsubscribe
             * command to AMPS and stops receiving messages.
             */
            try (MessageStream ms = client.execute(command)) {
                for (Message m : ms) {
                    /* Within the loop, we process the message. In this case, we
                     * simply print the contents of the message.
                     */
                    System.out.println(m.getData());
                }
            }
        }
        // Report the failure rather than silently discarding it.
        catch (AMPSException e) { System.err.println(e); }
        finally {
            client.close();
        }
    }
}
Example 4.2: Subscribing to a topic
The Command interface allows you to precisely customize the commands you send to AMPS. For flexibility and ease of maintenance, 60East recommends using the Command interface (rather than a named method) for any command that will receive messages from AMPS. For publishing messages, there can be a slight performance advantage to using the named commands where possible.
Asynchronous Message Processing Interface
The AMPS Java client also supports an interface designed for asynchronous message processing. In this case, you add a message handler to the call to subscribe. The client returns the command ID of the subscribe command once the server has acknowledged that the command has been processed. As messages arrive, AMPS calls your message handler directly on the background thread. This can be an advantage for some applications. For example, if your application is highly multithreaded and copies message data to a work queue processed by multiple threads, there is usually a performance benefit to enqueuing work directly from the background thread. See Understanding Threading for a discussion of threading considerations, including considerations for message handlers.
As with the simple interface, the AMPS client provides both convenience interfaces and interfaces that use a Command object. The following example shows how to use the asynchronous interface.
import com.crankuptheamps.client.Client;
import com.crankuptheamps.client.Command;
import com.crankuptheamps.client.CommandId;
import com.crankuptheamps.client.exception.AMPSException;

class MyApp {
    public static void main(String[] args) {
        /* We create a Client here, then call connect() and logon() to connect to
         * AMPS.
         */
        Client client = new Client("subscribe");
        try {
            client.connect("tcp://127.0.0.1:9007/amps/json");
            client.logon();
            /* Here, we create a new MessagePrinter object. The MessagePrinter
             * class implements MessageHandler, as described below. The subscription
             * uses this object to handle all messages returned from the subscription.
             */
            MessagePrinter mp = new MessagePrinter();
            Command command = new Command("subscribe").setTopic("messages");
            /* Here, we call Client.executeAsync(), which takes the command
             * and the message handler to invoke with messages received
             * in response to the command.
             */
            CommandId subscriptionId = client.executeAsync(command, mp);
            // ... the application continues running here while mp receives messages ...
        }
        // Report the failure rather than silently discarding it.
        catch (AMPSException e) { System.err.println(e); }
        finally {
            client.close();
        }
    }
}
Example 4.3: Subscribing to a topic with asynchronous processing
import com.crankuptheamps.client.Message;
import com.crankuptheamps.client.MessageHandler;

/* An implementation of MessageHandler provides an invoke() method that receives
 * a com.crankuptheamps.client.Message object. Notice that the same instance of
 * this class is called for all messages on a given subscription, and that the
 * instance is called asynchronously from the background thread created by the
 * client. Design your message handlers so they have access to any program state
 * that they need to do their work.
 */
public class MessagePrinter implements MessageHandler {
    public void invoke(Message m) {
        System.out.println(m.getData());
    }
}
Example 4.4: Implementing MessageHandler
Caution
In the asynchronous interface, the AMPS client resets and reuses the message provided to MessageHandler functions between calls. This improves performance in the client, but it means that if your MessageHandler function needs to preserve information contained within the message, you must copy the information rather than just saving the message object. Otherwise, the AMPS client cannot guarantee the state of the object or the contents of the object when your program goes to use it.
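The effect of a reused message object can be illustrated without the AMPS client at all. The sketch below is a minimal model, not AMPS code: ReusedMessage, ReferenceKeepingHandler, and CopyingHandler are hypothetical stand-ins written for this illustration. It shows why saving the object itself loses earlier contents, while copying the needed data preserves it.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a message object that the caller resets and reuses. */
class ReusedMessage {
    private String data;

    void reset(String newData) { this.data = newData; }

    String getData() { return data; }
}

/** Keeps references to the message object itself: unsafe when the object is reused. */
class ReferenceKeepingHandler {
    final List<ReusedMessage> saved = new ArrayList<>();

    void invoke(ReusedMessage m) { saved.add(m); }
}

/** Copies the payload it needs before returning: safe when the object is reused. */
class CopyingHandler {
    final List<String> saved = new ArrayList<>();

    void invoke(ReusedMessage m) { saved.add(m.getData()); }
}

class ReuseDemo {
    public static void main(String[] args) {
        ReusedMessage shared = new ReusedMessage();
        ReferenceKeepingHandler unsafe = new ReferenceKeepingHandler();
        CopyingHandler safe = new CopyingHandler();

        // The same object is refilled for every delivery, as the caution describes.
        for (String payload : new String[] {"first", "second"}) {
            shared.reset(payload);
            unsafe.invoke(shared);
            safe.invoke(shared);
        }

        // Both saved references point at the one shared object.
        System.out.println(unsafe.saved.get(0).getData()); // prints "second"
        // The copied payload still holds the original contents.
        System.out.println(safe.saved.get(0));             // prints "first"
    }
}
```

In a real handler, the equivalent step is to extract the values you need (for example, the result of getData()) inside invoke() and store those values rather than the Message.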
Unsubscribing
The AMPS server continues a subscription until the client explicitly ends the subscription (that is, unsubscribes) or the connection to the client is closed.
With the simple (synchronous message processing) interface, AMPS automatically unsubscribes from the topic when the MessageStream is cleaned up by having the close() method called. This typically happens automatically when using try-with-resources. You can also create a finally block and call the method yourself. Whatever strategy you use, when the MessageStream closes, it automatically stops the subscription by sending an unsubscribe command to AMPS.
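The close-on-exit behavior described above is standard Java try-with-resources behavior, and it can be checked in isolation. The following sketch uses a hypothetical DemoStream class (not part of the AMPS client) that only records whether close() was called. It shows that breaking out of the loop early still closes the resource.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

/** Hypothetical stand-in for a closeable stream of messages. */
class DemoStream implements Iterable<String>, AutoCloseable {
    private final List<String> messages;
    boolean closed = false;

    DemoStream(String... messages) { this.messages = Arrays.asList(messages); }

    @Override
    public Iterator<String> iterator() { return messages.iterator(); }

    @Override
    public void close() {
        // The text above says the real MessageStream unsubscribes at this point.
        // This stand-in only records that close() ran.
        closed = true;
    }
}

class CloseDemo {
    /** Reads until the stop marker is seen, then leaves the loop early. */
    static DemoStream readUntil(String stop, String... messages) {
        DemoStream stream = new DemoStream(messages);
        try (DemoStream ms = stream) {
            for (String m : ms) {
                if (m.equals(stop)) {
                    break; // leaving the try block triggers close()
                }
            }
        }
        return stream;
    }

    public static void main(String[] args) {
        DemoStream s = readUntil("b", "a", "b", "c");
        System.out.println(s.closed); // prints "true"
    }
}
```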
In the advanced (asynchronous) interface, when the subscription is successfully made, messages will begin flowing to our MessagePrinter.invoke() function, and the Client.subscribe() call will return a CommandId that serves as the identifier for this subscription. A Client can have any number of active subscriptions, and this CommandId instance is used to refer to the particular subscription we have made here. For example, to unsubscribe, we simply pass in this identifier:
Client c = ...;
// try/catch block to manage client lifetime
// is left out
// ... subscribe using the asynchronous message
// processing interface and save the subId
CommandId subId = c.subscribe(new MyMessageHandler(), "messages");
// ... other code here ...
c.unsubscribe(subId);
In this example, we use the Client.subscribe() method with a message handler to create a subscription to the messages topic. The subscribe() method returns an identifier for the subscription created in AMPS. When our application is done listening to this topic, it unsubscribes (on the last line) by passing in the subId returned by subscribe(). AMPS deletes the subscription. After the subscription is removed, no more messages will flow into our MessageHandler.invoke() function.
When an application calls unsubscribe, the client sends an explicit unsubscribe command to AMPS. The AMPS server removes that subscription from the set of subscriptions for the client, and stops sending messages for that subscription. On the client side, the client unregisters the subscription so that the MessageStream or MessageHandler for the subscription will no longer receive messages for that subscription.
Notice that calling unsubscribe does not destroy messages that the server has already sent to the client. If there are messages on the way to the client for this subscription, the AMPS client must consume those messages. If a LastChanceMessageHandler is registered, that handler will receive the messages. Otherwise, they will be discarded since no message handler matches the subscription ID on the message.
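The routing rule in the paragraph above (deliver to the handler registered for the subscription ID, otherwise to a last-chance handler, otherwise discard) can be modeled in a few lines. This is a simplified sketch for illustration only. DemoRouter and its methods are hypothetical names and do not reflect how the AMPS client is actually implemented.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

/** Simplified model of routing messages to handlers by subscription ID. */
class DemoRouter {
    private final Map<String, Consumer<String>> handlers = new HashMap<>();
    private Consumer<String> lastChance; // optional fallback; null when not set

    void register(String subId, Consumer<String> handler) { handlers.put(subId, handler); }

    void unregister(String subId) { handlers.remove(subId); }

    void setLastChance(Consumer<String> handler) { lastChance = handler; }

    /**
     * Delivers to the handler registered for subId, otherwise to the fallback.
     * Returns false when no handler applies and the message is discarded.
     */
    boolean dispatch(String subId, String data) {
        Consumer<String> handler = handlers.get(subId);
        if (handler == null) {
            handler = lastChance;
        }
        if (handler == null) {
            return false;
        }
        handler.accept(data);
        return true;
    }
}
```

With this model, a message that arrives after unregister() is either passed to the fallback handler or dropped, which mirrors the behavior described for messages already in flight when unsubscribe is called.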
Understanding Messages
So far, we have seen that subscribing to a topic involves working with objects of type com.crankuptheamps.client.Message. A Message represents a single message to or from an AMPS server. Messages are received or sent for every client/server operation in AMPS.
Header properties
There are two parts of each message in AMPS: a set of headers that provide metadata for the message, and the data that the message contains. Every AMPS message has one or more header fields defined. The precise headers present depend on the type and context of the message. There are many possible fields in any given message, but only a few are used for any given message. For each header field, the Message class contains a distinct property that allows for retrieval and setting of that field. For example, the Message.getCommandId() function corresponds to the commandId header field, the Message.getBatchSize() function corresponds to the BatchSize header field, and so on. For more information on these header fields, consult the AMPS User Guide and AMPS Command Reference.
To work with header fields, a Message contains getXxx()/setXxx() methods corresponding to the header fields. 60East does not recommend attempting to parse header fields from the raw data of the message.
In AMPS, fields sometimes need to be set to a unique identifier value. For example, when creating a new subscription, or sending a manually constructed message, you'll need to assign a new unique identifier to fields such as CommandId and SubscriptionId. For this purpose, Message provides a newXxx() method for each such field. The method generates a new unique identifier and sets the field to that value.
getData() method
Access to the data section of a message is provided via the getData() method. The data section contains the unparsed payload of the message. The getData() method returns the data as a Java string, which is suitable for message formats that can be represented as Unicode text, such as JSON, XML, FIX, or NVFIX. For binary data, the AMPS Java client provides a getDataRaw() method to allow you to work with the underlying byte array in the message. See Working with Messages & Byte Buffers for details.
The AMPS Java client contains a collection of helper classes for working with message types that are specific to AMPS (for example, FIX, NVFIX, and AMPS composite message types). For message types that are widely used, such as JSON or XML, you can use whichever library you typically use in your environment.
Understanding Threading and Message Handlers
The first time a command causes an instance of the Client or HAClient to connect to AMPS (typically, the logon() command), the client creates a thread that runs in the background. This background thread is responsible for processing incoming messages from AMPS, which includes both messages that contain data and acknowledgments from the server.
When you call a command on the AMPS client, the command typically waits for an acknowledgment from the server and then returns. (The exception to this is publish. For performance, the publish command does not wait for an acknowledgment from the server before returning.)
In the simple case, using synchronous message processing, the client provides an internal handler function that populates the MessageStream. The client receive thread calls the internal handler function, which makes a deep copy of the incoming message and adds it to the MessageStream. The MessageStream is used on the calling thread, so operations on the MessageStream do not block the client receive thread.
When using asynchronous message processing, AMPS calls the handler function from the client receive thread. Message handlers provided for asynchronous message processing must be aware of the following considerations.
The client creates one client receive thread at a time, and the lifetime of the thread lasts for the lifetime of the connection to the AMPS server. A message handler that is only provided to a single client will only be called from a single thread at a time. If your message handler will be used by multiple clients, then multiple threads will call your message handler. In this case, you should take care to protect any state that will be shared between threads. Notice that if the client connection fails (or is closed), and the client reconnects, the client will create a different thread for the new connection.
For maximum performance, do as little work in the message handler as possible. For example, if you use the contents of the message to update an external database, a message handler that adds the relevant data to an update queue that is processed by a different thread will typically perform better than a message handler that performs the update itself.
While your message handler is running, the thread that calls your message handler is no longer receiving messages. This makes it easier to write a message handler, because you know that no other messages are arriving from the same subscription. However, this also means that you cannot use the same client that called the message handler to send commands to AMPS. Acknowledgments from AMPS cannot be processed, and your application will deadlock waiting for the acknowledgment. Instead, enqueue the command in a work queue to be processed by a separate thread, or use a different client object to submit the commands.
The AMPS client resets and reuses the Message provided to this function between calls. This improves performance in the client, but means that if your handler function needs to preserve information contained within the message, you must copy the information (either by making a copy of the entire message or copying the required fields) rather than just saving the message object. Otherwise, the AMPS client cannot guarantee the state of the object or the contents of the object when your program goes to use it. Likewise, a message handler should not modify the Message, because doing so modifies the message provided to other handlers (including handlers internal to the AMPS client).
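The considerations above (do little work in the handler, copy what you need, and leave slower work to another thread) can be sketched with standard library classes alone. The example below is a minimal model under stated assumptions. WorkQueueDemo and its methods are hypothetical names, onMessage() plays the role of a message handler running on the receive thread, and the upper-casing step stands in for slow work such as a database update.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Models a handler that enqueues copied data for a separate worker thread. */
class WorkQueueDemo {
    // Sentinel compared by identity, so no real payload can be mistaken for it.
    private static final String POISON = new String("POISON");

    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    private final List<String> processed = Collections.synchronizedList(new ArrayList<>());
    private final Thread worker = new Thread(this::drain, "demo-worker");

    void start() { worker.start(); }

    /** Stand-in for the handler body: enqueue the copied payload and return quickly. */
    void onMessage(String data) {
        queue.add(data);
    }

    /** Asks the worker to finish, waits for it, and returns what it processed. */
    List<String> stopAndGetProcessed() {
        queue.add(POISON);
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for worker", e);
        }
        return new ArrayList<>(processed);
    }

    /** Worker loop: takes items in arrival order until the sentinel appears. */
    private void drain() {
        try {
            while (true) {
                String item = queue.take();
                if (item == POISON) {
                    return;
                }
                processed.add(item.toUpperCase(Locale.ROOT)); // stand-in for slow work
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

Because the handler only enqueues a String, the simulated receive thread is never blocked by the slow step, and the worker is free to issue further commands without risking the deadlock described above.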
Advanced Messaging Support
The Client.subscribe() method provides options for subscribing to topics even when you do not know their exact names. It also accepts a filter that is applied on the server to limit the messages your application must process.
Regex Topics
Regular Expression (Regex) Topics allow a regular expression to be supplied in the place of a topic name. When you supply a regular expression, it is as if a subscription is made to every topic that matches your expression, including topics that do not yet exist at the time of creating the subscription.
To use a regular expression, simply supply the regular expression in place of the topic name in the subscribe() call. For example:
class MyApp
{
    ...
    CommandId id = client.subscribe(
        mp,          // MessageHandler implementation
        "client.*",  // Topic with a Regex
        5000);       // Timeout
    ...
}
In this example, messages on topics “client” and “client1” would both match the regular expression, and those messages would all be sent to our mp instance, which as before is an implementation of the MessageHandler interface. You can use the getTopic() method of the Message to determine the actual topic of each message sent to the MessageHandler function.
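To get a feel for which topic names an expression such as client.* covers, you can try the pattern locally before using it in a subscription. The sketch below uses java.util.regex as a stand-in. This is an assumption made for illustration: AMPS evaluates topic expressions on the server with its own regular expression engine, so details such as anchoring may differ, and the AMPS User Guide remains the reference. TopicRegexDemo is a hypothetical helper name.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

class TopicRegexDemo {
    /** Returns the topic names that the expression matches in full (java.util.regex semantics). */
    static List<String> matching(String regex, List<String> topics) {
        Pattern pattern = Pattern.compile(regex);
        List<String> result = new ArrayList<>();
        for (String topic : topics) {
            if (pattern.matcher(topic).matches()) {
                result.add(topic);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> topics = Arrays.asList("client", "client1", "orders");
        System.out.println(matching("client.*", topics)); // prints "[client, client1]"
    }
}
```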
Content Filtering
One of the most powerful features of AMPS is content filtering. With content filtering, filters based on message content are applied at the server, so that your application and the network are not burdened with messages that are of no interest to your application. For example, if your application is only displaying messages from a particular user, you can send a content filter to the server so that only messages from that particular user are sent to the client.
To apply a content filter to a subscription, simply pass it into the Client.subscribe() call:
CommandId subscriptionId = client.subscribe(
    mp,                 // MessageHandler implementation
    "messages",         // Topic
    "/sender = 'mom'",  // Content filter
    5000);              // Timeout
In this example, we have passed in a content filter /sender = 'mom'. This will cause the server to only send us messages from the messages topic that additionally have a sender field equal to mom.
For example, the AMPS server will send the following message, where /sender is mom:
{
    "sender" : "mom",
    "text" : "Happy Birthday!",
    "reminder" : "Call me Thursday!"
}
The AMPS server will not send a message with a different /sender value:
{
    "sender" : "henry dave",
    "text" : "Things do not change; we change."
}
Updating the Filter on a Subscribe
AMPS allows you to update the filter on a subscription. When you replace a filter on the subscription, AMPS immediately begins sending only messages that match the updated filter. Notice that if the subscription was entered with a command that includes a SOW query, using the replace option can re-issue the SOW query (as described in the AMPS User Guide).
To update the filter on a subscription, you create a subscribe command. You set the SubscriptionId provided on the Command to the identifier of the existing subscription, and include the replace option on the Command. When you send the Command, AMPS atomically replaces the filter and sends messages that match the updated filter from that point forward.
// Assumes client is connected and logged on to AMPS
// Enter subscription
Command subscribe_cmd = new Command("sow_and_subscribe")
    .setTopic("orders-sow")
    .setSubId("A42") // Used later for replace
    .setFilter("/details/items/description LIKE 'puppy'");
MyHandler mh = new MyHandler();
client.executeAsync(subscribe_cmd, mh);
// ...
// replace filter elsewhere in the program
Command replace_cmd = new Command("sow_and_subscribe")
    .setTopic("orders-sow")
    .setSubId("A42") // A42 is the ID of the subscription to replace
    .setFilter("/details/items/description LIKE 'kitten'")
    .setOptions("replace");
client.executeAsync(replace_cmd, mh);
Next Steps
At this point, you are able to build AMPS programs in Java that publish and subscribe to AMPS topics. For an AMPS application to be truly robust, it needs to be able to handle the errors and disconnections that occur in any distributed system. In the next chapter, we will take a closer look at error handling and recovery, and how you can use it to make your application ready for the real world.