4. Subscriptions¶
Messages published to a topic on an AMPS server are available to other clients via a subscription. Before messages can be received, a client must subscribe to one or more topics on the AMPS server so that the server will begin sending messages to the client. The server will continue sending messages to the client until the client unsubscribes, or the client disconnects. With content filtering, the AMPS server will limit the messages sent to only those messages that match a client-supplied filter. In this chapter, you will learn how to subscribe, unsubscribe, and supply filters for messages using the AMPS Python client.
Subscribing to a Topic¶
The AMPS client makes it simple to subscribe to a topic. You call client.subscribe() with the topic to subscribe to and the parameters for the subscription. The client submits the subscription to AMPS and returns a MessageStream that you can iterate over to receive the messages from the subscription. Below is a short example:
from AMPS import Client
# Here, we create a Client object and connect to an AMPS server.
client = Client("test")
client.connect("tcp://127.0.0.1:9007/amps/json")
client.logon()
# Here we subscribe to the topic messages. We do not provide a filter, so AMPS
# does not content-filter the topic. Although we don't use the object explicitly
# here, the subscribe method returns a MessageStream object that we iterate over.
# If, at any time, we no longer need to subscribe, we can break out of the loop.
# When there are no more active references to the MessageStream, the client sends
# an unsubscribe command to AMPS and stops receiving messages.
for message in client.subscribe("messages"):
    # Within the loop, we process the message. In this case, we simply print the
    # contents of the message.
    print(message.get_data())
Example 4.1: Basic AMPS subscription
AMPS creates a background thread that receives messages and copies them into the MessageStream that you iterate over. This means that the client application as a whole can continue to receive messages while you are doing processing work.
The simple method described above is provided for convenience: the AMPS Python client provides convenience methods for the most common forms of AMPS commands. The client also provides an interface that gives you precise control over the command. Using that interface, the example above becomes:
from AMPS import Client
from AMPS import Command
# Here, we create a Client object and connect to an AMPS server.
client = Client("test")
client.connect("tcp://127.0.0.1:9007/amps/json")
client.logon()
# Here, we create a Command object for the subscribe command, specifying the topic
# messages. We do not provide a filter, so AMPS does not content-filter the
# subscription. Although we don't use the object explicitly here, the execute
# method returns a MessageStream object that we iterate over. If, at any time, we
# no longer need to subscribe, we can break out of the loop. When we break out of
# the loop, there are no more references to the MessageStream and the AMPS client
# sends an unsubscribe message to AMPS.
for message in client.execute(Command("subscribe").set_topic("messages")):
    # Within the body of the loop, we can process the message as we need to. In this
    # case, we simply print the contents of the message.
    print(message.get_data())
Example 4.2: Subscribing to a topic using a command
The Command interface allows you to precisely customize the commands you send to AMPS. For flexibility and ease of maintenance, 60East recommends using the Command interface (rather than a named method) for any command that will receive messages from AMPS. For publishing messages, there can be a slight performance advantage to using the named commands where possible.
Asynchronous Message Processing Interface¶
The AMPS Python client also supports an interface that allows you to process messages asynchronously. In this case, you add a message handler to the method call. The client returns the command ID of the subscribe command once the server has acknowledged that the command has been processed. As messages arrive, the client calls your message handler directly on the background thread. This can be an advantage for some applications. For example, if your application is highly multithreaded and copies message data to a work queue processed by multiple threads, there may be a performance benefit to enqueuing work directly from the background thread. See Understanding Threading and Message Handlers for a discussion of threading considerations, including considerations for message handlers.
Below is a short example (error handling and connection details are omitted for brevity):
from AMPS import Client
from AMPS import Command
...
# Here, we create a Client object and connect to an AMPS server.
client = Client("exampleSubscriber")
client.connect("tcp://127.0.0.1:9007/amps/json")
client.logon()
def on_message_printer(message):
    print(message.get_data())
# Here, we create a subscription by passing a Command and a message handler
# to execute_async():
subscriptionid = client.execute_async(
    Command("subscribe").set_topic("messages"),
    on_message_printer
)
# on_message_printer is a function that acts as our message handler. When a
# message is received, this function is invoked and, in this case, prints the
# data returned by the message's get_data() method. The message is of type
# AMPS.Message.
Example 4.3: Subscribing to a topic with asynchronous processing
Using an Instance Method as a Message Handler¶
One of the more common ways of providing a message handler is as an instance method on an object that maintains message state. It’s simple to provide a handler with this capability, as shown below:
# Define a class that saves state and
# handles messages.
class StatefulHandler:
    # Initialize self with state to save
    def __init__(self, name):
        self.name_ = name
    # Use state from this instance while handling
    # the message.
    def __call__(self, message):
        print("%s got %s" % (self.name_, message.get_data()))
Example 4.4: Providing a message handler
You can then provide an instance of the handler directly wherever a message handler is required, as shown below:
client.subscribe(StatefulHandler("An instance"), "topic")
Understanding Threading and Message Handlers¶
The first time a command causes an instance of the Client or HAClient to connect to AMPS (typically, the logon() command), the client creates a thread that runs in the background. This background thread is responsible for processing incoming messages from AMPS, which includes both messages that contain data and acknowledgments from the server.
When you call a command on the AMPS client, the command typically waits for an acknowledgment from the server and then returns. (The exception to this is publish. For performance, the publish command does not wait for an acknowledgment from the server before returning.)
In the simple case, using synchronous message processing, the client provides an internal handler function that populates the MessageStream. The client receive thread calls the internal handler function, which makes a deep copy of the incoming message and adds it to the MessageStream. The MessageStream is used on the calling thread, so operations on the MessageStream do not block the client receive thread.
When using asynchronous message processing, AMPS calls the handler function from the client receive thread. Message handlers provided for asynchronous message processing must be aware of the following considerations:
- The client creates one client receive thread at a time, and the lifetime of the thread lasts for the lifetime of the connection to the AMPS server. A message handler that is only provided to a single client will only be called from a single thread at a time. If your message handler will be used by multiple clients, then multiple threads will call your message handler. In this case, you should take care to protect any state that will be shared between threads. Notice that if the client connection fails (or is closed), and the client reconnects, the client will create a different thread for the new connection.
- For maximum performance, do as little work in the message handler as possible. For example, if you use the contents of the message to update an external database, a message handler that adds the relevant data to an update queue processed by a different thread will typically perform better than a message handler that performs the update while handling the message.
- While your message handler is running, the thread that calls your message handler is no longer receiving messages. This makes it easier to write a message handler because you know that no other messages are arriving from the same subscription. However, this also means that you cannot use the same client that called the message handler to send commands to AMPS. Acknowledgments from AMPS cannot be processed and your application will deadlock waiting for the acknowledgment. Instead, enqueue the command in a work queue to be processed by a separate thread or use a different client object to submit the commands.
- The AMPS client resets and reuses the Message provided to this function between calls. This improves performance in the client, but means that if your handler function needs to preserve information contained within the message, you must copy the information (either by making a copy of the entire message or copying the required fields) rather than just saving the message object. Otherwise, the AMPS client cannot guarantee the state of the object or the contents of the object when your program goes to use it. Likewise, a message handler should not modify the Message, because doing so modifies the message provided to other handlers (including handlers internal to the AMPS client).
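The considerations above can be combined in a minimal sketch: a handler that copies only the payload out of the reused Message and passes it to a worker thread through a standard-library queue. The names QueueingHandler, worker, and FakeMessage are hypothetical. FakeMessage is only a stand-in with a get_data() method so that the sketch runs without an AMPS server; in an application, an instance of the handler would be passed wherever a message handler is required.

```python
import queue
import threading


class QueueingHandler:
    """Message handler that does minimal work on the receive thread."""

    def __init__(self):
        self.work = queue.Queue()

    def __call__(self, message):
        # Copy the payload now: the client reuses the Message object, so
        # keeping a reference to `message` itself would not be safe.
        self.work.put(message.get_data())


def worker(work, results):
    # Runs on a separate thread, so slow processing does not block the
    # thread that receives messages.
    while True:
        data = work.get()
        if data is None:  # sentinel: no more work
            break
        results.append(data.upper())


class FakeMessage:
    """Hypothetical stand-in for AMPS.Message, for demonstration only."""

    def __init__(self):
        self.data = ""

    def get_data(self):
        return self.data


handler = QueueingHandler()
results = []
thread = threading.Thread(target=worker, args=(handler.work, results))
thread.start()

reused = FakeMessage()  # one object, reset between calls
for payload in ("first", "second"):
    reused.data = payload
    handler(reused)

handler.work.put(None)
thread.join()
print(results)
```

Because the worker runs on its own thread, it is also a reasonable place to submit further commands to AMPS, which the handler itself must not do on the receive thread.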
Understanding Messages¶
So far, we have seen that subscribing to a topic involves working with objects of AMPS.Message type. A Message represents a single message to or from an AMPS server. Messages are received or sent for every client/server operation in AMPS.
Header Properties¶
There are two parts of each message in AMPS: a set of headers that provide metadata for the message, and the data that the message contains. Every AMPS message has one or more header fields defined. The precise headers present depend on the type and context of the message. There are many possible fields in any given message, but only a few are used for any given message. For each header field, the Message class contains a distinct property that allows for retrieval and setting of that field. For example, the Message.get_command_id() function corresponds to the commandId header field, the Message.get_batch_size() function corresponds to the BatchSize header field, and so on. For more information on these header fields, consult the AMPS User Guide and AMPS Command Reference.
To work with header fields, a Message contains get_xxx()/set_xxx() methods corresponding to the header fields, as well as a number of getXXX()/setXXX() methods for compatibility with previous implementations of the AMPS Python client. 60East does not recommend attempting to parse header fields from the raw data of the message.
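As a small illustration of reading header fields through the accessor methods rather than parsing raw data, the sketch below gathers a few values into a dictionary. The summarize function and the FakeMessage class are hypothetical; FakeMessage only mimics the accessor names mentioned in this chapter (get_topic(), get_command_id(), get_data()) so that the sketch runs without an AMPS server.

```python
def summarize(message):
    """Collect a few header fields and the payload size into a dict."""
    return {
        "topic": message.get_topic(),
        "command_id": message.get_command_id(),
        "data_length": len(message.get_data()),
    }


class FakeMessage:
    """Hypothetical stand-in exposing the same accessor names as AMPS.Message."""

    def get_topic(self):
        return "messages"

    def get_command_id(self):
        return "cmd-1"

    def get_data(self):
        return '{"text": "hello"}'


summary = summarize(FakeMessage())
print(summary)
```

Which headers are actually populated depends on the type and context of the message, so a real handler should expect some accessors to return empty values.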
get_data() Method¶
Access to the data section of a message is provided via the get_data() method, which returns the unparsed data of the message. Your application code parses and works with the data.
The AMPS Python client contains a collection of helper classes for working with message types that are specific to AMPS (for example, FIX, NVFIX, and AMPS composite message types). For message types that are widely used, such as JSON or XML, you can use the standard Python facilities or the library you typically use in your environment.
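For JSON payloads, for instance, the standard json module is usually sufficient. The following is a minimal sketch; the parse_letter function and the sample payload are illustrative assumptions, and in a real handler the string would come from message.get_data().

```python
import json


def parse_letter(data):
    """Parse a JSON payload and return the fields the application needs."""
    record = json.loads(data)
    # .get() tolerates messages that omit an optional field.
    return record["sender"], record.get("text", "")


# Sample payload; in a handler this string would come from message.get_data().
payload = '{"sender": "mom", "text": "Happy Birthday!"}'
sender, text = parse_letter(payload)
print("%s says: %s" % (sender, text))
```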
Unsubscribing¶
The AMPS server continues a subscription until the client explicitly ends the subscription (that is, unsubscribes) or until the connection to the client is closed.
With a MessageStream, AMPS automatically unsubscribes from the topic when there are no more references to the MessageStream. You can also call the close() method on the MessageStream object to remove the subscription.
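One way to make the explicit close() call hard to forget is to wrap the iteration in a helper. The sketch below is only an illustration: take_messages, FakeStream, and FakeMessage are hypothetical names, and the stand-in classes exist so that the sketch runs without an AMPS server. It relies only on the stream being iterable and having a close() method, as described above.

```python
from itertools import islice


def take_messages(stream, limit):
    """Return up to `limit` payloads from `stream`, then close the stream."""
    try:
        return [message.get_data() for message in islice(stream, limit)]
    finally:
        # close() removes the subscription explicitly instead of waiting for
        # the last reference to the stream to go away.
        stream.close()


class FakeMessage:
    """Hypothetical stand-in for AMPS.Message, for demonstration only."""

    def __init__(self, data):
        self.data = data

    def get_data(self):
        return self.data


class FakeStream:
    """Hypothetical stand-in for a MessageStream, for demonstration only."""

    def __init__(self, payloads):
        self.payloads = payloads
        self.closed = False

    def __iter__(self):
        for payload in self.payloads:
            yield FakeMessage(payload)

    def close(self):
        self.closed = True


stream = FakeStream(["a", "b", "c"])
taken = take_messages(stream, 2)
print(taken, stream.closed)
```

With a live subscription, the call might look like take_messages(client.subscribe("messages"), 10); note that iterating a live stream would wait for messages to arrive if fewer than the requested number have been published.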
With asynchronous message processing, when a subscription is successfully made, messages will begin flowing to the message handler function and the subscribe() or execute_async() call returns a string that serves as the identifier for this subscription. A Client can have any number of active subscriptions, and this subscription ID is how AMPS designates messages intended for this subscription. To unsubscribe, we simply call unsubscribe with the subscription identifier, as shown below:
client = Client("exampleClient")
# Register an asynchronous subscription
subId = client.execute_async(
    Command("subscribe").set_topic("messages"),
    on_message_printer
)
...
# when the program is done with the subscription, unsubscribe
client.unsubscribe(subId)
Example 4.5: Unsubscribing from a topic
In this example, we use the execute_async() method to create a subscription to the messages topic. When our application is done listening to this topic, it unsubscribes (on the last line) by passing in the subscription identifier returned by the subscribe command. After the subscription is removed, no more messages will flow into our on_message_printer function.
When an application calls unsubscribe(), the client sends an explicit unsubscribe command to AMPS. The AMPS server removes that subscription from the set of subscriptions for the client, and stops sending messages for that subscription. On the client side, the client unregisters the subscription so that the MessageStream or message handler for that subscription will no longer receive messages for that subscription.
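If a subscription should last only for the duration of a block of code, the execute_async() and unsubscribe() calls can be paired in a context manager. This is a sketch under stated assumptions rather than part of the AMPS client: the subscription helper and the FakeClient class are hypothetical, and FakeClient merely records calls so that the sketch runs without an AMPS server.

```python
from contextlib import contextmanager


@contextmanager
def subscription(client, command, handler):
    """Subscribe on entry and unsubscribe on exit, even if an error occurs."""
    sub_id = client.execute_async(command, handler)
    try:
        yield sub_id
    finally:
        client.unsubscribe(sub_id)


class FakeClient:
    """Hypothetical stand-in that records calls instead of contacting AMPS."""

    def __init__(self):
        self.calls = []

    def execute_async(self, command, handler):
        self.calls.append(("execute_async", command))
        return "sub-1"

    def unsubscribe(self, sub_id):
        self.calls.append(("unsubscribe", sub_id))


client = FakeClient()
with subscription(client, "subscribe command goes here", print) as sub_id:
    print("subscribed as", sub_id)
print(client.calls)
```

With a real client, the command argument would be a Command object such as Command("subscribe").set_topic("messages").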
Notice that calling unsubscribe does not destroy messages that the server has already sent to the client. If there are messages on the way to the client for this subscription, the AMPS client must consume those messages. If a last_chance_message_handler is registered, the handler will receive the messages. Otherwise, they will be discarded since no message handler matches the subscription ID on the message.
Advanced Messaging Support¶
AMPS has two powerful features for selecting messages: pattern matching, which allows selective subscription to topics, and content filtering, which selects messages based on their content. Pattern matching provides the ability to receive messages from multiple topics using a single pattern, while a filter ensures that you only receive messages whose content matches your filter, reducing traffic on the network.
Regex Subscriptions¶
Regular Expression (Regex) subscriptions allow a regular expression to be supplied in the place of a topic name. When you supply a regular expression, it is as if a subscription is made to every topic that matches your expression, including topics that do not yet exist at the time of creating the subscription.
To use a regular expression, simply supply the regular expression in place of the topic name in the subscribe() call. For example:
def message_handler(msg):
    topic = msg.get_topic()
subscription_id = client.subscribe(
    message_handler,
    "orders.*"
)
Example 4.6: Regex topic subscription
In this example, messages on topics orders-north-america and orders-europe would match the regular expression, and those messages would all be sent to our message_handler function. As in the example, you can use the get_topic() method to determine the actual topic of the message sent to the function.
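Since one handler receives messages from every matching topic, a handler often needs to branch or tally by the actual topic. The sketch below counts messages per topic; TopicCounter and FakeMessage are hypothetical names, and FakeMessage is a stand-in with only a get_topic() method so that the sketch runs without an AMPS server.

```python
from collections import Counter


class TopicCounter:
    """Message handler that counts messages per actual topic."""

    def __init__(self):
        self.counts = Counter()

    def __call__(self, message):
        # get_topic() reports the topic the message was published to,
        # not the regular expression used to subscribe.
        self.counts[message.get_topic()] += 1


class FakeMessage:
    """Hypothetical stand-in for AMPS.Message, for demonstration only."""

    def __init__(self, topic):
        self.topic = topic

    def get_topic(self):
        return self.topic


counter = TopicCounter()
for topic in ("orders-europe", "orders-north-america", "orders-europe"):
    counter(FakeMessage(topic))
print(dict(counter.counts))
```

An instance of such a class could be supplied as the handler in a regex subscription, in the same way as the StatefulHandler shown earlier in this chapter.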
Content Filtering¶
One of the most powerful features of AMPS is content filtering. With content filtering, filters based on message content are applied at the server so that your application and the network are not burdened with messages that are not relevant to your application. For example, if your application is only displaying messages from a particular user, you can send a content filter to the server so that only messages from that particular user are sent to the client.
To apply a content filter to a subscription, simply pass it into the client.subscribe() call or use the set_filter method to add a filter to the Command:
for message in client.subscribe(
    "letters",
    "/sender='mom'",
    timeout=5000
):
    print("Mom says: %s" % message.get_data())
Example 4.7: Using content filtering
In this example, we have passed in a content filter /sender='mom'. This will result in the server only sending us messages from the letters topic that have the sender field equal to mom in the message.
For example, the AMPS server will send the following message, where /sender is mom:
{
"sender" : "mom",
"text" : "Happy Birthday!",
"reminder" : "Call me Thursday!"
}
The AMPS server will not send a message with a different /sender value:
{
"sender" : "henry dave",
"text" : "Things do not change; we change."
}
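The same filter can be attached to a Command with set_filter rather than passed to client.subscribe(). The following is a minimal sketch of that form; the function name subscribe_to_letters_from_mom is hypothetical, and the sketch assumes a connected and logged-on client, as in the earlier examples. It uses only the Command methods that appear elsewhere in this chapter.

```python
def subscribe_to_letters_from_mom(client):
    """Subscribe to "letters" with a content filter set on the Command."""
    # Imported here so the sketch can be loaded without the client installed.
    from AMPS import Command

    command = (
        Command("subscribe")
        .set_topic("letters")
        .set_filter("/sender='mom'")
    )
    # execute() returns a MessageStream to iterate over.
    return client.execute(command)
```

Iterating over the returned stream, for example with for message in subscribe_to_letters_from_mom(client), would then yield only the messages that the server matched against the filter.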
Updating the Filter on a Subscribe¶
AMPS allows you to update the filter on a subscription. When you replace a filter on the subscription, AMPS immediately begins sending only messages that match the updated filter. Notice that if the subscription was entered with a command that includes a SOW query, using the replace option can re-issue the SOW query (as described in the AMPS User Guide).
To update the filter on a subscription, you create a subscribe command. You set the SubscriptionId provided on the Command to the identifier of the existing subscription and include the replace option on the Command. When you send the Command, AMPS atomically replaces the filter and sends messages that match the updated filter from that point forward.
import AMPS
sub_cmd = AMPS.Command("sow_and_subscribe") \
    .set_topic("orders-sow") \
    .set_sub_id("A42") \
    .set_filter("/details/items/description LIKE 'puppy'")
client.execute_async(sub_cmd, MyHandler)
# Elsewhere in the program...
# Reuse the same subscription ID and add the replace option so that AMPS
# replaces the filter on the existing subscription.
replace_cmd = AMPS.Command("sow_and_subscribe") \
    .set_topic("orders-sow") \
    .set_sub_id("A42") \
    .set_filter("/details/items/description LIKE 'kitten'") \
    .set_options("replace")
client.execute_async(replace_cmd, MyHandler)
Next Steps¶
At this point, you are able to build AMPS programs in Python that publish and subscribe to AMPS topics. For an AMPS application to be truly robust, it needs to be able to handle the errors and disconnections that occur in any distributed system. In the next chapter, we will take a closer look at error handling and recovery and how you can use it to make your application ready for the real world.