6. State of the World

AMPS State of the World (SOW) allows you to automatically keep and query the latest information about a topic on the AMPS server, without building a separate database. Using SOW lets you build impressively high-performance applications that provide rich experiences to users. The AMPS Python client lets you query SOW topics and subscribe to changes with ease.

Performing SOW Queries

To begin, we will look at a simple example of issuing a SOW query.

from AMPS import Message

def execute_sow_query(client):
    # Client.sow() returns a MessageStream that yields the query results.
    for message in client.sow("orders", "/symbol='ROL'"):
        command = message.get_command()
        if command == Message.Command.GroupBegin:
            print("--- Begin SOW Results ---")
        elif command == Message.Command.SOW:
            print(message.get_data())
        elif command == Message.Command.GroupEnd:
            print("--- End SOW Results ---")

Example 6.1: Basic SOW query

In Example 6.1, the execute_sow_query() function invokes Client.sow() to initiate a SOW query on the orders topic for all entries that have a symbol of 'ROL'.

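As the query executes, messages containing the data of matching entries have a command of sow (Message.Command.SOW), so as those arrive, we write them to the console. The results are bracketed by a group_begin message before the first record and a group_end message after the last.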
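The shape of a SOW result stream can be sketched without a server. The following minimal sketch uses a hypothetical `FakeMessage` tuple in place of an AMPS `Message` and assumes the command names `group_begin`, `sow`, and `group_end`; it collects the payload of every record and stops once the result group ends.

```python
from collections import namedtuple

# Hypothetical stand-in for an AMPS Message: command name and payload only.
FakeMessage = namedtuple("FakeMessage", ["command", "data"])

def collect_sow_results(messages):
    """Return the payloads of the sow records in one SOW query result."""
    records = []
    for message in messages:
        if message.command == "sow":
            records.append(message.data)
        elif message.command == "group_end":
            break  # the result set is complete
        # group_begin carries no record data, so it is skipped
    return records

# Usage with a simulated result stream.
results = collect_sow_results([
    FakeMessage("group_begin", None),
    FakeMessage("sow", "order-1"),
    FakeMessage("sow", "order-2"),
    FakeMessage("group_end", None),
])
```

Iterating until group_end mirrors the structure of Example 6.1, where the same three command types are checked in turn.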

As with subscribe, the sow command also provides an asynchronous mode, where you provide a message handler.

from AMPS import Command, Message

def on_message_handler(message):
    command = message.get_command()
    if command == Message.Command.GroupBegin:
        print("--- Begin SOW Results ---")
    elif command == Message.Command.SOW:
        print(message.get_data())
    elif command == Message.Command.GroupEnd:
        print("--- End SOW Results ---")

def execute_sow_query(client):
    # execute_async() does not wait for the query results; they are
    # delivered to on_message_handler as they arrive.
    return client.execute_async(
        Command("sow")
            .set_topic("orders")
            .set_filter("/symbol='ROL'"),
        on_message_handler)

Example 6.2: Asynchronous SOW query

In Example 6.2, the execute_sow_query() function invokes Client.execute_async() to initiate a SOW query on the orders topic for all entries that have a symbol of 'ROL'.

As the query executes, the on_message_handler() function is invoked for each message the query produces, including one message for each matching entry in the topic. Messages containing the data of matching entries have a command of sow, so as those arrive, we write them to the console.
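Because the handler runs asynchronously, the caller often needs to know when the query is complete. A minimal sketch, assuming the same command names and a hypothetical `FakeMessage` stand-in, is a callable handler object that stores each `sow` record and sets a `threading.Event` when `group_end` arrives. `SowCollector` is illustrative and not part of the AMPS client; any callable that accepts a message could in principle serve as the handler.

```python
import threading
from collections import namedtuple

# Hypothetical stand-in for an AMPS Message: command name and payload only.
FakeMessage = namedtuple("FakeMessage", ["command", "data"])

class SowCollector(object):
    """Handler that gathers sow records and signals when the result set ends."""

    def __init__(self):
        self.records = []
        self.done = threading.Event()

    def __call__(self, message):
        # Called once per message, on whatever thread delivers messages.
        if message.command == "sow":
            self.records.append(message.data)
        elif message.command == "group_end":
            self.done.set()  # wake any thread blocked in wait()

    def wait(self, timeout=None):
        """Block until group_end arrives; returns False if the timeout expires first."""
        return self.done.wait(timeout)

# Usage: a separate thread plays the role of the client's receive thread.
collector = SowCollector()
feed = [FakeMessage("group_begin", None),
        FakeMessage("sow", '{"symbol":"ROL","qty":100}'),
        FakeMessage("group_end", None)]
receiver = threading.Thread(target=lambda: [collector(m) for m in feed])
receiver.start()
finished = collector.wait(timeout=5)
receiver.join()
```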

SOW and Subscribe

Imagine an application that displays real-time information about the position and status of a fleet of delivery vans. When the application starts, it should display the current location of each of the vans along with their current status. As vans move around the city and post other status updates, the application should keep its display up to date. Vans upload information to the system by posting messages to a van_location topic, configured with a key of van_id on the AMPS server.

In this application, it is important not only to stay up to date on the latest information about each van, but also to ensure that all of the active vans are displayed as soon as the application starts. Combining a SOW query with a subscription to the topic is exactly what is needed, and the Client.sow_and_subscribe() method does that. As with the other methods for receiving messages, the AMPS Python client provides a basic, synchronous form of sow_and_subscribe that provides you with a MessageStream to iterate over, and an asynchronous form that requires a message handler.

First, let’s look at an example that uses the basic form of sow_and_subscribe:

from AMPS import Command, Message

def report_van_position(client):
    # sow_and_subscribe begins receiving information about all of the active
    # delivery vans in the system. Vans currently in the SOW are returned as
    # Message objects whose get_command() returns sow. New messages that arrive
    # later are returned as Message objects whose get_command() returns publish.
    #
    # The oof option causes AMPS to send Out-of-Focus ("OOF") messages for the
    # topic. An OOF message is sent when an entry that was sent to us in the
    # past no longer matches our query. This happens when an entry is removed
    # from the SOW cache via a sow_delete operation, when the entry expires (as
    # specified by the expiration time on the message or by the configuration
    # of that topic on the AMPS server), or when the entry no longer matches
    # the content filter. In our case, if a van's status changes to something
    # other than ACTIVE, it no longer matches the filter and goes out of focus,
    # and a Message is delivered with its command set to oof.
    command = (Command("sow_and_subscribe")
               .set_topic("van_location")
               .set_filter("/status = 'ACTIVE'")
               .set_batch_size(100)
               .set_options("oof"))

    for message in client.execute(command):
        if message.get_command() in (Message.Command.SOW,
                                     Message.Command.Publish):
            # The application does not need to distinguish between the
            # original set of vans and later updates: add_or_update_van()
            # (application-defined) adds the van to the display or moves it
            # to its new position.
            add_or_update_van(message)
        elif message.get_command() == Message.Command.OOF:
            # remove_van() (application-defined) drops vans from the display
            # as they become inactive, expire, or are deleted.
            remove_van(message)

Example 6.3: Using sow_and_subscribe
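The example leaves add_or_update_van() and remove_van() to the application. A minimal in-memory sketch follows. It assumes each message payload is JSON containing the van_id key (including the payload of OOF messages) and uses a hypothetical `FakeMessage` stand-in; `VanDisplay` is illustrative only.

```python
import json
from collections import namedtuple

# Hypothetical stand-in for an AMPS Message; data is a JSON string.
FakeMessage = namedtuple("FakeMessage", ["command", "data"])

class VanDisplay(object):
    """In-memory stand-in for the application's display, keyed by van_id."""

    def __init__(self):
        self.vans = {}

    def add_or_update_van(self, message):
        record = json.loads(message.data)
        # Initial SOW records and later publishes are handled identically:
        # the newest record for a van_id replaces any earlier one.
        self.vans[record["van_id"]] = record

    def remove_van(self, message):
        record = json.loads(message.data)
        # pop() with a default ignores vans that were never displayed.
        self.vans.pop(record["van_id"], None)

    def on_message(self, message):
        if message.command in ("sow", "publish"):
            self.add_or_update_van(message)
        elif message.command == "oof":
            self.remove_van(message)

# Usage: two vans from the SOW, one position update, one van going out of focus.
display = VanDisplay()
for m in [FakeMessage("sow", json.dumps({"van_id": 1, "status": "ACTIVE", "pos": "A"})),
          FakeMessage("sow", json.dumps({"van_id": 2, "status": "ACTIVE", "pos": "B"})),
          FakeMessage("publish", json.dumps({"van_id": 1, "status": "ACTIVE", "pos": "C"})),
          FakeMessage("oof", json.dumps({"van_id": 2, "status": "INACTIVE", "pos": "B"}))]:
    display.on_message(m)
```

Keying the display by van_id matches the topic's configured key, so each van appears at most once no matter how many updates arrive.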

Now we will look at an example that uses the asynchronous form of sow_and_subscribe:

from AMPS import Command, Message

def update_van_position(message):
    # Invoked on the client's background receive thread for each message.
    if message.get_command() in (Message.Command.SOW,
                                 Message.Command.Publish):
        add_or_update_van(message)   # application-defined
    elif message.get_command() == Message.Command.OOF:
        remove_van(message)          # application-defined


def subscribe_to_van_location(client):
    return client.execute_async(
        Command("sow_and_subscribe")
            .set_topic("van_location")
            .set_filter("/status = 'ACTIVE'")
            .set_batch_size(100)
            .set_options("oof"),
        update_van_position)

Example 6.4: Using sow_and_subscribe with a message handler

Notice that the two forms have the same result. However, they differ in where processing happens. The asynchronous form in Example 6.4 calls the message handler on the client's background receive thread, which blocks the client from receiving further messages until the handler returns. The synchronous form in Example 6.3 processes messages on the thread that iterates the MessageStream, so the background thread can continue to receive messages while processing occurs. In both cases, the calls to add_or_update_van and remove_van receive the same data.
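When processing is expensive but you prefer the handler style, one common approach is to keep the handler short and hand the work to an application thread. The sketch below is a generic producer/consumer pattern built on the standard queue and threading modules; make_handoff_handler and worker_loop are hypothetical names, not AMPS APIs. It assumes messages can be kept after the handler returns; if the client reuses message objects, enqueue the values you need (for example, the results of get_command() and get_data()) instead.

```python
import threading

try:
    import queue             # Python 3
except ImportError:
    import Queue as queue    # Python 2

def make_handoff_handler(work_queue):
    """Return a message handler that only enqueues, keeping the receive thread free."""
    def handler(message):
        work_queue.put(message)
    return handler

def worker_loop(process, work_queue):
    """Process queued messages on an application thread until a None sentinel arrives."""
    while True:
        message = work_queue.get()
        if message is None:
            break
        process(message)

# Usage with plain strings standing in for messages.
work_queue = queue.Queue()
processed = []
worker = threading.Thread(target=worker_loop, args=(processed.append, work_queue))
worker.start()

handler = make_handoff_handler(work_queue)
for message in ["van-1", "van-2", "van-3"]:
    handler(message)       # in a real client, the receive thread would call this
work_queue.put(None)       # tell the worker to stop
worker.join()
```

The queue preserves arrival order, so the worker sees messages in the same order the handler received them.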

Setting Batch Size

The AMPS clients include a batch size parameter that specifies how many messages the AMPS server will return to the client in a single batch when returning the results of a SOW query. The 60East clients set a batch size of 10 by default. This batch size works well for common message sizes and network configurations.

Adjusting the batch size may produce better network utilization and better overall performance for the application. The larger the batch size, the more messages AMPS will send to the network layer at a time. This can result in fewer packets being sent, and therefore less overhead in the network layer. The effect on performance is generally most noticeable for small messages, where setting a larger batch size will allow several messages to fit into a single packet. For larger messages, a larger batch size may still improve performance, but the improvement is less noticeable.

In general, 60East recommends setting a batch size that is large enough to produce few partially-filled packets. Bear in mind that AMPS holds the messages in memory while batching them, and the client must also hold the messages in memory while receiving the messages. Using batch sizes that require large amounts of memory for these operations can reduce overall application performance, even if network utilization is good.

For smaller message sizes, 60East recommends using the default batch size, and experimenting with tuning the batch size if performance improvements are necessary. For relatively large messages (especially messages with sizes over 1MB), 60East recommends explicitly setting a batch size of 1 as an initial value, and increasing the batch size only if performance testing with a larger batch size shows improved network utilization or faster overall performance.
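The starting-point guidance above can be written down as a small helper, together with the rough memory cost of one batch. This is a sketch: initial_batch_size and batch_memory_bytes are hypothetical helpers, and treating 1MB as 1,048,576 bytes is an assumption.

```python
DEFAULT_BATCH_SIZE = 10              # the 60East client default described above
LARGE_MESSAGE_BYTES = 1024 * 1024    # assumed meaning of "1MB"

def initial_batch_size(avg_message_bytes):
    """Starting batch size per the guidance above; tune from here with testing."""
    if avg_message_bytes > LARGE_MESSAGE_BYTES:
        return 1
    return DEFAULT_BATCH_SIZE

def batch_memory_bytes(batch_size, avg_message_bytes):
    """Rough bytes held for one batch (by the server, and again by the client)."""
    return batch_size * avg_message_bytes
```

For 512-byte messages, the default of 10 holds about 5,120 bytes per batch; for 5MB messages, even a batch of 10 would hold about 50MB on each side, which is why the guidance starts at 1. The chosen value can be passed to Command.set_batch_size(), as in the sow_and_subscribe examples above.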

Client-Side Conflation

In many cases, applications that use SOW topics only need the current value of a message at the time the message is processed, rather than processing each change that led to the current value. On the server side, AMPS provides conflated topics to meet this need. Conflated topics are described in more detail in the AMPS User Guide, and require no special handling on the client side.

In some cases, though, it’s important to conflate messages on the client side. This can be particularly useful for applications that do expensive processing on each message, applications that are more efficient when processing batches of messages, or for situations where you cannot provide an appropriate conflation interval for the server to use.

A MessageStream has the ability to conflate messages received for a subscription to a SOW topic, view, or conflated topic. When conflation is enabled, for each message received, the client checks to see whether it has already received an unprocessed message with the same SowKey. If so, the client replaces the unprocessed message with the new message. The application never receives the message that has been replaced.

To enable client-side conflation, you call conflate() on the MessageStream, and then use the MessageStream as usual:

# Query and subscribe
results = client.sow_and_subscribe("orders", "/symbol = 'ROL'")

# Turn on conflation
results.conflate()

# Process the results
for message in results:
    # Process each message here; this example prints the payload.
    print(message.get_data())

When a MessageStream is used for a subscription that does not include SowKeys (such as a subscription to a topic that does not have a SOW), the MessageStream will allow you to turn on conflation, but no conflation will occur.
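The conflation rule can be modeled with an ordered dictionary keyed by SowKey. This is a sketch of the behavior described above, not the client's implementation: ConflatingBuffer is hypothetical, it assumes a replaced message keeps its original place in line, and it passes messages without a SowKey through unconflated.

```python
from collections import OrderedDict

class ConflatingBuffer(object):
    """Model of client-side conflation: at most one pending message per SowKey."""

    def __init__(self):
        self._pending = OrderedDict()  # SowKey -> newest unprocessed message
        self._unkeyed = 0              # gives each key-less message its own slot

    def add(self, sow_key, message):
        if sow_key is None:
            # No SowKey (for example, a topic without a SOW): nothing to conflate.
            self._unkeyed += 1
            self._pending[("unkeyed", self._unkeyed)] = message
        else:
            # Replaces any unprocessed message with the same SowKey;
            # the replaced message is never delivered.
            self._pending[sow_key] = message

    def drain(self):
        """Return the pending messages in arrival order and clear the buffer."""
        messages = list(self._pending.values())
        self._pending.clear()
        return messages

# Usage: "k1" is updated twice before the application gets to it.
buffer = ConflatingBuffer()
buffer.add("k1", "v1")
buffer.add("k2", "w1")
buffer.add("k1", "v2")
buffer.add(None, "n1")
buffer.add(None, "n2")
delivered = buffer.drain()
```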

When using client-side conflation with delta subscriptions, bear in mind that client-side conflation replaces the whole message, and does not attempt to merge deltas. This means that updates can be lost when messages are replaced. For some applications (for example, a ticker application that simply sends delta updates that replace the current price), this causes no problems. For other applications (for example, when several processors may be updating different fields of a message simultaneously), using conflation with deltas could result in lost data, and server-side conflation is a safer alternative.
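A short worked example shows how whole-message conflation can lose a delta. The sketch assumes deltas are plain dictionaries containing only the changed fields, and that the application merges them into its own copy of the record; apply_delta is a hypothetical helper.

```python
def apply_delta(record, delta):
    """Merge a delta (changed fields only) into a copy of a full record."""
    merged = dict(record)
    merged.update(delta)
    return merged

current = {"id": 7, "price": 10.0, "qty": 100}

# Two deltas for the same SowKey arrive before the application processes either.
delta_a = {"id": 7, "price": 10.5}   # written by one processor
delta_b = {"id": 7, "qty": 250}      # written by another processor

# Without conflation, both deltas are applied in order.
unconflated = apply_delta(apply_delta(current, delta_a), delta_b)

# With whole-message conflation, delta_b replaces delta_a, so only delta_b is applied.
conflated = apply_delta(current, delta_b)
```

The price change in delta_a never reaches the application in the conflated case, which is the data loss described above.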

Managing SOW Contents

AMPS allows applications to manage the contents of the SOW by explicitly deleting messages that are no longer relevant. For example, if a particular delivery van is retired from service, the application can delete that van's record from the SOW.

The client provides the following methods for deleting records from the SOW:

  • sow_delete accepts a topic and filter, and deletes all messages that match the filter from the topic specified
  • sow_delete_by_keys accepts a set of SOW keys as a comma-delimited string and deletes messages for those keys, regardless of the contents of the messages. SOW keys are provided in the header of a SOW message, and are the internal identifier AMPS uses for that SOW message.
  • sow_delete_by_data accepts a topic and message, and deletes the SOW record that would be updated by that message

The most efficient way to remove messages from the SOW is to use sow_delete_by_keys or sow_delete_by_data, since those options allow AMPS to exactly target the message or messages to be removed. Many applications use sow_delete, since this is the most flexible method for removing items from the SOW when the application does not have information on the exact messages to be removed.

In all cases, AMPS sends an OOF message to all subscribers who have received updates for the messages removed, as described earlier in this chapter.
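The difference between deleting by filter and deleting by key can be seen in a toy model of a SOW. SowModel is hypothetical and is not the AMPS server: it ignores keys that are not present (an assumption), it omits sow_delete_by_data, which would require computing the SowKey from a message, and it uses a Python predicate in place of an AMPS filter. Each delete returns the number of records removed, analogous to the count reported by get_matches() on the acknowledgment, and logs the keys for which subscribers would receive an OOF.

```python
class SowModel(object):
    """Toy in-memory model of a SOW topic keyed by SowKey."""

    def __init__(self, records=None):
        self.records = dict(records or {})  # SowKey -> record dict
        self.oof_log = []                   # SowKeys that would trigger an OOF

    def _remove(self, keys):
        for key in keys:
            del self.records[key]
            self.oof_log.append(key)
        return len(keys)  # analogous to the match count in the acknowledgment

    def delete_by_filter(self, predicate):
        # Like sow_delete: every record must be tested against the filter.
        return self._remove([k for k, r in list(self.records.items()) if predicate(r)])

    def delete_by_keys(self, keys_csv):
        # Like sow_delete_by_keys: a comma-delimited string of SowKeys,
        # removed without inspecting record contents.
        keys = [k.strip() for k in keys_csv.split(",") if k.strip()]
        return self._remove([k for k in keys if k in self.records])

# Usage, mirroring the "/id IN (42, 64, 37)" filter used in this section.
sow = SowModel({"101": {"id": 42}, "102": {"id": 64}, "103": {"id": 5}})
deleted_by_filter = sow.delete_by_filter(lambda r: r["id"] in (42, 64, 37))
deleted_by_keys = sow.delete_by_keys("103, 999")
```

The filter form has to examine every record, while the key form goes straight to the targeted entries, which is why the key-based methods are the more efficient choice when the keys are known.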

The simple form of the sow_delete command returns a Message. This Message is an acknowledgment that contains information on the delete command. For example, the following snippet simply prints informational text with the number of messages deleted:

msg = client.sow_delete(
    "sow_topic",
    "/id IN (42, 64, 37)"
)

print("Got an %s message containing %s: deleted %s SOW entries" %
      (msg.get_command(), msg.get_ack_type(), msg.get_matches()))

The sow_delete command also provides an asynchronous version that requires a message handler. This message handler is designed to receive sow_delete response messages from AMPS:

from AMPS import Command

def delete_handler(m):
    print("Got an %s message containing %s: deleted %s SOW entries" %
          (m.get_command(), m.get_ack_type(), m.get_matches()))

# Issue the delete; acknowledgments are delivered to delete_handler.
client.execute_async(
    Command("sow_delete")
        .set_topic("sow_topic")
        .set_filter("/id IN (42, 64, 37)"),
    delete_handler)

Acknowledging messages from a queue uses a form of the sow_delete command that is only supported for queues. Acknowledgment is discussed in the chapter on queues.