9. High Availability

The AMPS Python Client provides an easy way to create highly available applications using AMPS, via the HAClient class. HAClient derives from Client and offers the same methods, but also adds protection against network, server, and client outages.

Using HAClient allows applications to automatically:

  • Recover from temporary disconnects between client and server.
  • Fail over from one server to another when a server becomes unavailable.

Because the HAClient automatically manages failover and reconnection, 60East recommends using the HAClient for applications that need to:

  • Automatically reconnect and resume work in the case of disconnection.
  • Ensure no messages are lost or duplicated after a reconnect or failover.
  • Persist messages and bookmarks on disk for protection against client failure.

You can choose how your application uses HAClient features. For example, you might need automatic reconnection, but have no need to resume subscriptions or republish messages. The high availability behavior in HAClient is provided by implementations of defined interfaces. You can combine different implementations provided by 60East to meet your needs, and implement those interfaces to provide your own policies.

Some of these features require specific configuration settings on your AMPS instance(s). This chapter mentions these features and describes how to use them from the AMPS Python client. You can find full documentation for these settings and server features in the User Guide.

Overview of HAClient

HAClient derives from Client and offers the same methods for sending commands to AMPS and receiving messages from AMPS.

The HAClient differs from the Client in two ways:

  • HAClient automatically installs a disconnect handler that reconnects to AMPS and resumes active (asynchronous) subscriptions. The disconnect handler optionally replays publish and sow_delete messages that have not been acknowledged by AMPS, using a PublishStore. The disconnect handler can optionally resume replays from the transaction log at a point that guarantees no messages are skipped and no duplicates are delivered to the application, using a BookmarkStore.
  • The HAClient includes the infrastructure needed for client failover, including a list of connection strings and their associated authentication mechanisms (provided by the ServerChooser), and options for controlling backoff behavior for reconnects (provided by the DelayStrategy). As a result, the HAClient provides a connect_and_logon() function for establishing a connection to AMPS, rather than treating these as independent steps that an application must manage itself.

If your application needs to automatically reconnect to AMPS, 60East recommends using the HAClient and the automatically provided disconnect handler rather than using a Client or replacing the HAClient default disconnect handler.

Reconnection with HAClient

The most important difference between Client and HAClient is that HAClient automatically provides a reconnect handler.

This description provides a high-level framework for understanding the components involved in failover with the HAClient. The components are described in more detail in the following sections.

The HAClient reconnect handler performs the following steps when reconnecting:

  • Calls the ServerChooser to determine the next URI to connect to and the authenticator to use for that connection.

    If the ServerChooser does not return a URI (that is, it returns an empty string), calls get_error on the ServerChooser to get a description of the failure, sends an exception to the exception listener, and stops the reconnection process.

  • Calls the DelayStrategy to determine how long to wait before attempting to reconnect, and waits for that period of time.

  • Connects to the AMPS server. If the connection fails, calls report_failure on the ServerChooser and begins the process again.

  • Logs on to the AMPS server. If logon fails, calls report_failure on the ServerChooser and begins the process again.

  • Calls report_success on the ServerChooser.

  • Receives the bookmark for the last message that the server has persisted. Discards any older messages from the PublishStore.

  • Republishes any messages in the PublishStore that have not been persisted by the server.

  • Re-establishes subscriptions using the SubscriptionManager for the client. For bookmark subscriptions, the reconnect handler uses the BookmarkStore for the client to determine the most recent bookmark, and resubscribes with that bookmark. For subscriptions that do not use a bookmark, the SubscriptionManager simply re-enters the subscription, meaning that it is entered at the point at which the HAClient reconnects.
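The connection portion of these steps (from calling the ServerChooser through report_success) can be modeled in a few lines of plain Python. This is a simplified sketch, not the client's implementation: the chooser, delay strategy, connect, and logon objects are stand-ins passed in by the caller, NoServersAvailable is a hypothetical exception, and the call to reset() after a success is an assumption. Store recovery and resubscription are not modeled.

```python
class NoServersAvailable(Exception):
    """Hypothetical stand-in for the exception sent to the exception listener."""


def reconnect(chooser, delay, connect, logon, sleep):
    """Simplified model of the HAClient connect/logon loop; returns the URI used."""
    while True:
        # Ask the ServerChooser for the next URI and its authenticator.
        uri = chooser.get_current_uri()
        if not uri:
            # No server available: describe the failure and stop reconnecting.
            raise NoServersAvailable(chooser.get_error())
        authenticator = chooser.get_current_authenticator()

        # Wait for the period chosen by the DelayStrategy.
        sleep(delay.get_connect_wait_duration(uri))

        try:
            connect(uri)          # connect to the AMPS server
            logon(authenticator)  # then log on
        except ConnectionError as exc:
            # Either step failed: report it and begin the process again.
            chooser.report_failure(exc)
            continue

        chooser.report_success()
        delay.reset()  # assumption: the backoff restarts after a success
        return uri
```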

The ServerChooser, DelayStrategy, PublishStore, SubscriptionManager, and BookmarkStore are all extension points for the HAClient. You can adapt the failover and recovery behavior by setting a different object for the behavior you want to customize on the HAClient or by providing your own implementation.

For example, the HAClient convenience methods for creating clients customize the behavior of the PublishStore and BookmarkStore by providing either memory-backed or file-backed stores.

The reconnection process runs on the thread that discovers the disconnection. This means that, in the event that an application thread discovers the disconnection as a result of a call to the Python AMPS client, that call may not return until a connection is re-established (or until the server chooser indicates failure, in which case the application will receive an exception).

The Python client includes a retry_on_disconnect setting that controls this retry behavior when the client is disconnected. When set to True (the default), any call to the Client that results in a command being sent to AMPS may block until a connection is re-established. When set to False, the HAClient will retry the connection a single time and throw an exception if the connection cannot be re-established.

Regardless of the retry_on_disconnect setting, a call to publish will result in the message being stored in the PublishStore for the client if one is set.

Choosing Store Durability

If your application needs reliable publish to AMPS, install a PublishStore in the HAClient. If your application needs to resume replays from the transaction log, install a BookmarkStore in the HAClient.

These stores provide the following capabilities:

  • A bookmark store tracks received messages, and is used to resume subscriptions that replay from the transaction log.
  • A publish store tracks published messages, and is used to ensure that messages are persisted in AMPS.

The AMPS client provides a memory-backed version of each store and a file-backed version of each store. An HAClient can use either a memory-backed store or a file-backed store for protection. Each method provides resilience to different failures:

  • Memory-backed stores provide recovery from disconnection from AMPS by storing messages and bookmarks in your process's address space. This is the highest-performance option for working with AMPS in a highly available manner. The trade-off with this method is that there is no protection from a crash or failure of your client application. If your application is terminated prematurely, or if the application terminates at the same time as an AMPS instance failure or network outage, then messages may be lost or duplicated. The state of bookmark replays will be lost when the application shuts down. Messages in the publish store when the application shuts down will not be maintained through a restart, so the application will not be able to attempt any necessary redelivery when the application restarts.

  • File-backed stores provide recovery after client failure and disconnection from AMPS by storing messages and bookmarks on disk. To use this protection method, you provide the names of two files when you create the HAClient: one for message storage and one for bookmark storage. If these files exist and are non-empty (as they would be after a client application is restarted), the HAClient loads their contents and ensures synchronization with the AMPS server once connected. The performance of this option depends heavily on the speed of the device on which these files are placed. When the files do not exist (as they would the first time a client starts on a given system), the HAClient creates and initializes the files, and in this case the client does not have a point at which to resume the subscription or messages to republish.

    A store file should only be used by one instance of a client at a time.

    When using file-backed stores, 60East recommends periodically removing unneeded entries by calling the prune() method. The precise strategy that your application uses to call prune() depends on the nature of the application. Most applications call prune() when the application exits.

    There are two basic strategies that applications follow while the application runs:

    • Install a resize handler and call prune() after a specified number of resize operations, or when the store reaches a specific size.
    • Call prune() after a specific number of messages are processed (for example, every 10,000 messages received or every 1,000 updates completed).

    Regardless of the strategy, it is best to call prune() when the application is idle, since the prune() call rewrites the log file.
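A minimal sketch of the counter-based strategy, assuming the application holds a reference to the store object whose prune() method it wants to call. PruneScheduler is a hypothetical helper: it only counts messages and leaves the actual prune() call to a point the application chooses, so that pruning can happen while the application is idle.

```python
class PruneScheduler:
    """Hypothetical helper: mark a prune as due after every `every` messages."""

    def __init__(self, prune, every=10_000):
        self._prune = prune   # for example, the store's prune method
        self._every = every
        self._count = 0

    def note_message(self):
        # Call once per processed message; cheap, never prunes inline.
        self._count += 1

    @property
    def due(self):
        return self._count >= self._every

    def prune_if_due(self):
        # Call from an idle point; prunes only if the threshold was reached.
        if self.due:
            self._prune()
            self._count = 0
            return True
        return False
```

For example, an application might call note_message() in its message handler and prune_if_due() from an idle loop, passing store.prune as the callable.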

The store interface is public, and an application can create and provide a custom store as necessary. While clients provide convenience methods for creating file-backed and memory-backed HAClient objects with the appropriate stores, you can also create and set the stores in your application code. For the AMPS Python Client, stores are implemented in C++. You can implement stores using C++, and use the technique described in Chapter 12, Using the C++ Client, to set the store on the client.

Starting in 5.3.2.0, the underlying AMPS client contains a recovery point adapter interface to make it easier to add a custom persistence layer to a bookmark store. The distribution includes a recovery point adapter that can store bookmark recovery information in an AMPS State of the World (SOW) topic.

The HAClient provides convenience methods for creating clients and setting stores. You can also construct an HAClient and set the store implementations you choose.

In this example, we create several clients. The first client uses memory stores for both bookmarks and publishes. The second client uses files for both bookmarks and publishes. The third client uses a file for bookmarks but does not set a store for publishes, which means that the client uses a no-op publish store (no outgoing messages are stored). The final client does not specify any stores, and so has no persistence for published messages or bookmark subscriptions, but can take advantage of the automatic failover and reconnection in the HAClient.

import AMPS

# Memory publish store, memory bookmark store
memoryClient = AMPS.HAClient("lessImportantMessages")

# File-backed publish store, file-backed bookmark store
diskClient = AMPS.HAClient("moreImportantMessages",
          "/mnt/fastDisk/moreImportantMessages.outgoing",
          "/mnt/fastDisk/moreImportantMessages.incoming")

# No-op publish store, file-backed bookmark store
subscriberClient = AMPS.HAClient("subscriber", no_store=True)
subscriberClient.set_bookmark_store(
    AMPS.MMapBookmarkStore("/mnt/fastDisk/bookmark.store"))

# No-op publish store, no-op bookmark store
# Failover behavior only.
streamReader = AMPS.HAClient("streamReader", no_store=True)

Example 9.1: HAClient creation example

Using the SOW Recovery Point Adapter

The AMPS client also includes the ability to use a SOW topic to store bookmark state for a bookmark store. This can be a useful option in a situation where an application needs a persistent bookmark store, but does not have the ability to store a file on the filesystem, or where an application has a bookmark file, but wants to have the ability to resume the subscription if the file is lost or damaged, or if the application is started on a system that does not have access to the file.

To use the SOW topic recovery point adapter, you create a bookmark store of the type you would like to use for the Client, passing an adapter when you construct the store. You then set this bookmark store as the store for the Client to use. The constructor for the SOW recovery point adapter allows you to customize the topic name and field names used to store the recovery point information in AMPS. As with the RecoveryPointAdapter interface in general, it is possible to customize the behavior of the SOW recovery point adapter by overriding the provided methods.

This section describes how to use the adapter with the default settings. Should you need to change the behavior of the class, you would adjust the guidance in this section accordingly. (For example, if you override methods to produce a message with a different set of keys or a different message format, you would update the topic definition accordingly).

AMPS Topic Configuration

To store recovery point state in AMPS, the AMPS instance that will store the recovery point state must define a SOW/Topic to hold the recovery point data.

By default, the adapter uses a topic named /ADMIN/bookmark_store of json message type, with the /clientName and /subId fields as keys, similar to the following definition.

<Topic>
   <Name>/ADMIN/bookmark_store</Name>
   <MessageType>json</MessageType>
   <Key>/clientName</Key>
   <Key>/subId</Key>
   <!-- Storage/persistence configuration here.
        In most cases, this topic should be
        persisted to a file, but that is not
        a requirement.  -->
</Topic>

You must include this definition, or an equivalent definition, in the configuration file for the AMPS instance that will host the recovery point.

If you define a topic with a different configuration (for example, different key names, a different topic name, or a different message type), you must ensure that the adapter that you create uses the same parameters as those configured on the server.

Constructing a Client For the Adapter

The AMPS SOW Recovery Point Adapter requires a Client or HAClient connected to the instance that contains the SOW topic. The Adapter will use this client to recover bookmark state and store bookmarks in AMPS. Notice that this client must not be the client that the Adapter is keeping state for. This must be a completely separate client instance; otherwise, the client may deadlock while updating the store.

The client must be connected and logged in to the instance that contains the State of the World topic, using the message type defined for the topic.

Capacity Planning and Store Sizing

When an application uses a file-backed store, it is important to make sure that there is enough space available on the file system to be able to manage the store.

For logged bookmark stores, an application needs to keep a bookmark record for each message received, each message discarded, and the persisted acknowledgments delivered by the server approximately once a second. Each bookmark entry consumes roughly 70 bytes of storage plus the length of the subscription ID for the subscription receiving the message. The logged bookmark store retains entries until an application explicitly calls prune(). The capacity needed for a logged bookmark store will depend on the strategy that the application uses for pruning the file.
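As a rough worked example of this sizing rule, the following sketch estimates how much a logged bookmark store grows between prunes. The assumptions are labeled in the code: each received message produces one entry when it is logged and one when it is discarded, one persisted acknowledgment arrives per second, and the subscription ID is ASCII. The 70-byte figure is the approximate value given above.

```python
def logged_bookmark_store_bytes(messages_per_second, sub_id,
                                seconds_between_prunes, bytes_per_entry=70):
    """Rough estimate of logged bookmark store growth between prunes.

    Assumptions: every received message is also discarded (two entries per
    message), plus about one persisted acknowledgment per second, and the
    subscription ID is ASCII, so len() equals its size in bytes.
    """
    entries_per_second = 2 * messages_per_second + 1
    entry_size = bytes_per_entry + len(sub_id)
    return entries_per_second * entry_size * seconds_between_prunes
```

At 10,000 messages per second on subscription ID "sub-1", pruned once an hour, this estimate comes to about 5.4 GB between prunes.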

For a file-backed publish store, the application needs to be able to store published messages until the AMPS server that the publisher is connected to acknowledges those messages as persisted. The volume of messages that needs to be stored depends on the failover policy for the server: that is, the maximum amount of time that the server will allow a downstream instance to fail to acknowledge a message before the server downgrades that connection to async acknowledgment. By default, AMPS does not downgrade connections; this policy must be set explicitly using the AMPS actions. As an example, if the server is configured to downgrade connections that are more than 120 seconds behind, then, for disaster recovery, the application must have the capacity to store 120 seconds of published messages at peak publishing load. However, unlike the logged bookmark store, a file-backed publish store removes messages from the store and reuses the space once AMPS has acknowledged the message.
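The same kind of estimate for a publish store multiplies the downgrade window by the peak publish rate and the stored size of each message. The average message size and the per_message_overhead parameter are assumptions for illustration; the size of the store's own bookkeeping is not documented here, so it defaults to zero and should be measured for your client version.

```python
def publish_store_bytes(downgrade_seconds, peak_messages_per_second,
                        avg_message_bytes, per_message_overhead=0):
    """Estimate publish store space needed to hold one downgrade window.

    per_message_overhead is a placeholder for store bookkeeping; measure it.
    """
    per_message = avg_message_bytes + per_message_overhead
    return downgrade_seconds * peak_messages_per_second * per_message
```

With the 120-second window from the text, a hypothetical peak of 5,000 messages per second, and 1 KiB messages, the store needs about 614 MB (roughly 586 MiB).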

Connections and the ServerChooser

Unlike Client, the HAClient attempts to keep itself connected to an AMPS instance at all times, by automatically reconnecting or failing over when it detects that the client is disconnected. When you are using the Client directly, your disconnect handler usually takes care of reconnection. HAClient, on the other hand, provides a disconnect handler that automatically reconnects to the current server or to the next available server.

To inform the HAClient of the addresses of the AMPS instances in your system, you pass a ServerChooser instance to the HAClient. ServerChooser acts as a smart enumerator over the servers available: HAClient calls ServerChooser methods to inquire about what server should be connected, and calls methods to indicate whether a given server succeeded or failed.

The AMPS Python Client provides a simple implementation of ServerChooser, called DefaultServerChooser, that provides basic logic for reconnecting. This server chooser is most suitable for basic testing, or in cases where an application should simply rotate through a list of servers. For most applications, you implement the ServerChooser interface yourself for more advanced logic, such as choosing a backup server based on your network topology, or limiting the number of times your application should try to reconnect to a given address.

To connect to AMPS, you provide a ServerChooser to HAClient and then call connect_and_logon() to create the first connection:

import AMPS

memoryClient = AMPS.HAClient("myClient")

# primary.amps.xyz.com is the primary AMPS instance, and
# secondary.amps.xyz.com is the secondary
chooser = AMPS.DefaultServerChooser()
chooser.add("tcp://primary.amps.xyz.com:12345/fix")
chooser.add("tcp://secondary.amps.xyz.com:12345/fix")
memoryClient.set_server_chooser(chooser)
memoryClient.connect_and_logon()
...
memoryClient.disconnect()

Example 9.2: Multiple HAClient creation example

Similar to Client, HAClient remains connected to the server until disconnect() is called. Unlike Client, HAClient automatically attempts to reconnect to your server if it detects a disconnect and, if that server cannot be connected, fails over to the next server provided by the ServerChooser. In this example, the call to connect_and_logon() attempts to connect and log on to primary.amps.xyz.com, and returns if that is successful. If it cannot connect, it tries secondary.amps.xyz.com, and continues trying servers from the ServerChooser until a connection is established. Likewise, if it detects a disconnection while the client is in use, then HAClient attempts to reconnect to the server with which it was most recently connected; if that is not possible, then it moves on to the next server provided by the ServerChooser.

The default ServerChooser simply provides the next URL in the sequence. This strategy works for many applications. If you need a different strategy, you can implement your own logic for failover by creating a class derived from ServerChooser.

Setting a Reconnect Delay and Timeout

You can control the amount of time between reconnection attempts and set a total amount of time for the HAClient to attempt to reconnect.

The AMPS Python client includes a method for setting a delay strategy on a client, set_reconnect_delay_strategy. This method accepts an instance of any type that provides the methods get_connect_wait_duration and reset, as described in the API documentation.
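Because any object with get_connect_wait_duration and reset can be installed, a custom strategy can be a small class. The following is a hypothetical linear backoff, sketched under two assumptions to verify against the API documentation: that get_connect_wait_duration receives the URI being connected to, and that it returns a wait in milliseconds.

```python
class LinearDelayStrategy:
    """Hypothetical custom delay strategy: each wait grows by a fixed step.

    Assumed interface: get_connect_wait_duration(uri) returns milliseconds,
    and reset() is called once a connection succeeds.
    """

    def __init__(self, initial_ms=500, step_ms=500, maximum_ms=5000):
        self._initial = initial_ms
        self._step = step_ms
        self._maximum = maximum_ms
        self._attempts = 0

    def get_connect_wait_duration(self, uri):
        # First attempt waits initial_ms; each later attempt adds step_ms,
        # never exceeding maximum_ms.
        wait = min(self._initial + self._step * self._attempts, self._maximum)
        self._attempts += 1
        return wait

    def reset(self):
        # Start the sequence over after a successful connection.
        self._attempts = 0
```

It would be installed the same way as the built-in strategies, for example client.set_reconnect_delay_strategy(LinearDelayStrategy()).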

While you can easily implement your own delay strategy, the client also provides two delay strategies:

  • FixedDelayStrategy provides the same delay each time the HAClient tries to reconnect.
  • ExponentialDelayStrategy provides an exponential backoff until a connection attempt succeeds.

To use either of these classes, you simply create an instance, set the appropriate parameters, and install that instance as the delay strategy for the HAClient. For example, the following code sets up a reconnect delay that starts at 200ms and increases the delay by 1.5 times after each failure. The strategy allows a maximum delay between connection attempts of 5 seconds, and will not retry longer than 60 seconds.

theClient = AMPS.HAClient("myClient")

theClient.set_reconnect_delay_strategy(      \
   AMPS.ExponentialDelayStrategy(            \
                   initial_delay=200,        \
                   maximum_delay=5000,       \
                   backoff_exponent=1.5,     \
                   maximum_retry_time=60000) \
                 )
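To see what these parameters imply, the following sketch lists the waits an exponential backoff with this configuration would produce. It is an illustrative model, not ExponentialDelayStrategy itself: it assumes each wait is the previous wait multiplied by backoff_exponent and capped at maximum_delay, and it counts only waiting time toward maximum_retry_time, ignoring the time spent in connection attempts.

```python
def exponential_schedule(initial_delay, maximum_delay, backoff_exponent,
                         maximum_retry_time):
    """Model of exponential backoff waits (ms) until the retry budget is spent."""
    if initial_delay <= 0 or maximum_delay <= 0 or backoff_exponent < 1:
        raise ValueError("delays must be positive and the exponent >= 1")
    delays = []
    total = 0
    delay = min(initial_delay, maximum_delay)
    # Stop before a wait would push total waiting past maximum_retry_time.
    while total + delay <= maximum_retry_time:
        delays.append(delay)
        total += delay
        # Grow the next wait, but never beyond maximum_delay.
        delay = min(delay * backoff_exponent, maximum_delay)
    return delays
```

With the values above, the model yields 18 waits: eight increasing waits from 200 ms to about 3.4 seconds, then ten 5-second waits, for a total of just under 60 seconds. Because real connection attempts also take time, the client may make fewer attempts than this.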

Implementing a Server Chooser

As described above, you provide the HAClient with connection strings to one or more AMPS servers using a ServerChooser. The purpose of a ServerChooser is to provide information to the HAClient. A ServerChooser does not manage the reconnection process, and should not call methods on the HAClient.

A ServerChooser has two required responsibilities to the HAClient:

  • Tells the HAClient the connection string for the server to connect to. If there are no servers, or the ServerChooser wants the connection to fail, the ServerChooser returns an empty string.

    To provide this information, the ServerChooser implements the get_current_uri() method.

  • Provides an Authenticator for the current connection string. This is especially important for installations where different servers require different credentials or authentication tokens must be reset after each connection attempt.

    To provide the authenticator, the ServerChooser implements the get_current_authenticator() method.

The HAClient calls the get_current_uri() and get_current_authenticator() methods each time it needs to make a connection.

Each time a connection succeeds, the HAClient calls the report_success() method of the ServerChooser. Each time a connection fails, the HAClient calls the report_failure() method of the ServerChooser. The HAClient does not require the ServerChooser to take any particular action when it calls these methods. These methods are provided for the ServerChooser to do internal maintenance, logging, or record keeping. For example, a ServerChooser might keep a list of available URIs with a current failure count, and skip over URIs that have failed more than 5 consecutive times until all URIs in the list have failed more than 5 consecutive times.

When the ServerChooser returns an empty string from get_current_uri(), indicating that no servers are available for connection, the HAClient calls the get_error() method on the ServerChooser, if one is provided, and includes the string returned by get_error() in the generated exception.
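The failure-counting policy described above can be sketched as follows. FailureCountingChooser is a hypothetical, standalone class so that its logic can be exercised without a server; in an application it would derive from ServerChooser, as described earlier in this chapter. The arguments accepted by report_success() and report_failure() are assumptions (check the API documentation for what the client passes), and the authenticator is a placeholder that you would replace with one suited to your installation. When every URI has exceeded the failure limit, this sketch returns an empty string so that the HAClient stops and calls get_error().

```python
class FailureCountingChooser:
    """Hypothetical ServerChooser-style class (standalone sketch).

    Skips any URI that has failed more than `max_failures` consecutive
    times; once every URI is over the limit, returns an empty string.
    """

    def __init__(self, uris, authenticator=None, max_failures=5):
        self._uris = list(uris)
        self._failures = [0] * len(self._uris)
        self._authenticator = authenticator  # placeholder: supply a real one
        self._max_failures = max_failures
        self._index = 0

    def _usable(self, i):
        return self._failures[i] <= self._max_failures

    def get_current_uri(self):
        if not any(self._usable(i) for i in range(len(self._uris))):
            return ""  # no servers left; the HAClient will call get_error()
        # Skip URIs that are over the consecutive-failure limit.
        while not self._usable(self._index):
            self._index = (self._index + 1) % len(self._uris)
        return self._uris[self._index]

    def get_current_authenticator(self):
        return self._authenticator

    def report_success(self, info=None):
        # A successful connection clears the consecutive-failure count.
        self._failures[self._index] = 0

    def report_failure(self, exception=None, info=None):
        # Count the failure and move on to the next URI in the list.
        self._failures[self._index] += 1
        self._index = (self._index + 1) % len(self._uris)

    def get_error(self):
        return "all servers failed more than %d consecutive times: %s" % (
            self._max_failures, ", ".join(self._uris))
```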

Heartbeats and Failure Detection

Use of the HAClient allows your application to quickly recover from detected connection failures. By default, connection failure detection occurs when the AMPS client receives an operating system error on the connection. This approach may result in unpredictable delays in detecting a connection failure on the client, particularly when failures in network routing hardware occur, and the client primarily acts as a subscriber.

The heartbeat feature of the AMPS client allows connection failure to be detected quickly. Heartbeats ensure that regular messages are sent between the AMPS client and server on a predictable schedule. The AMPS client and server both assume disconnection has occurred if these regular heartbeats cease, ensuring disconnection is detected in a timely manner. To use heartbeating, call the set_heartbeat method on Client or HAClient:

memoryClient = AMPS.HAClient("importantStuff")
...
memoryClient.set_heartbeat(3)
memoryClient.connect_and_logon()
...

Example 9.3: Heartbeat example

Method set_heartbeat takes one parameter: the heartbeat interval. The heartbeat interval specifies the periodicity of heartbeat messages sent by the server: the value 3 indicates messages are sent on a three-second interval. If the client receives no messages in a six-second window (two heartbeat intervals), the connection is assumed to be dead, and the HAClient attempts reconnection. An additional variant of set_heartbeat allows the idle period to be set to a value other than two heartbeat intervals. (The server, however, will always consider a connection to be closed after two heartbeat intervals without any traffic.)
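The detection rule in this paragraph can be modeled as a small class. This is a toy illustration, not the client's heartbeat code: HeartbeatMonitor is hypothetical, any received message counts as traffic, and the timeout defaults to two heartbeat intervals, with an optional override standing in for the set_heartbeat variant that changes the idle period.

```python
import time


class HeartbeatMonitor:
    """Toy model: the connection is dead after `timeout` seconds of silence."""

    def __init__(self, interval, timeout=None, clock=time.monotonic):
        self.interval = interval
        # Default idle period is two heartbeat intervals.
        self.timeout = timeout if timeout is not None else 2 * interval
        self._clock = clock
        self._last_received = clock()

    def on_traffic(self):
        # Any message from the server (heartbeat or data) resets the window.
        self._last_received = self._clock()

    def is_dead(self):
        return self._clock() - self._last_received > self.timeout
```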

Notice that, for HAClient, set_heartbeat must be called before the client is connected. For Client, set_heartbeat must be called after the client is connected.

Caution

Heartbeats are serviced on the receive thread created by the AMPS client. Your application must not block the receive thread for longer than the heartbeat interval, or the application is subject to being disconnected.
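One common way to keep the receive thread free is to have the message handler only enqueue work for a separate worker thread. The following is a minimal sketch using the standard library queue and threading modules; make_offloading_handler and its process callback are hypothetical names. The queue is unbounded so the handler never blocks, which means memory grows if processing falls behind. If the client reuses message objects after the handler returns, enqueue a copy of the data you need rather than the message itself; check the API documentation for the client's rules.

```python
import queue
import threading


def make_offloading_handler(process):
    """Return (handler, stop): handler enqueues, a worker thread calls process."""
    work = queue.Queue()  # unbounded, so put() never blocks the caller
    sentinel = object()

    def worker():
        while True:
            item = work.get()
            if item is sentinel:
                break
            process(item)  # slow work happens here, off the receive thread

    thread = threading.Thread(target=worker, daemon=True)
    thread.start()

    def handler(message):
        # Runs on the receive thread: only hand the message off.
        work.put(message)

    def stop():
        # Let queued items finish, then end the worker thread.
        work.put(sentinel)
        thread.join()

    return handler, stop
```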

Considerations for Publishers

Publishing with an HAClient is nearly identical to regular publishing; you simply call the publish() method with your message’s topic and data. The AMPS client sends the message to AMPS, and then returns from the publish() call. For maximum performance, the client does not wait for the AMPS server to acknowledge that the message has been received.

When an HAClient sets a publish store, the publish store retains a copy of each outgoing message and requests that AMPS acknowledge that the message has been persisted. The AMPS server acknowledges messages back to the publisher. Acknowledgments can be delivered for multiple messages at periodic intervals (for topics recorded in the transaction log) or after each message (for topics that are not recorded in the transaction log). When an acknowledgment for a message is received, the HAClient removes that message from the publish store. When a connection to a server is made, the HAClient automatically determines which messages from the publish store (if any) the server has not processed, and replays those messages to the server once the connection is established.

For reliable publishers, the application must choose how best to handle application shutdown. For example, it is possible for the network to fail immediately after the publisher sends the message, while the message is still in transit. In this case, the publisher has sent the message, but the server has not processed it and acknowledged it. During normal operation, the HAClient will automatically connect and retry the message. On shutdown, however, the application must decide whether to wait for messages to be acknowledged, or whether to exit.

Publish store implementations provide an unpersisted_count() method that reports the number of messages that have not yet been acknowledged by the AMPS server. When the unpersisted_count() reaches 0, there are no unpersisted messages in the local publish store.

For the highest level of safety, an application can wait until the unpersisted_count() reaches 0, which indicates that all of the messages have been persisted to the instance that the application is connected to, and the synchronous replication destinations configured for that instance. When a synchronous replication destination goes offline, this approach will cause the publisher to wait to exit until the destination comes back online or until the destination is downgraded to asynchronous replication.

For applications that are shut down periodically for short periods of time (for example, applications that are only offline during a weekly maintenance window), another approach is to use the publish_flush() method to ensure that messages are delivered to AMPS, and then rely on the connection logic to replay messages as necessary when the application restarts.

For example, the following code flushes messages to AMPS, then warns if not all messages have been acknowledged:

import AMPS

client = AMPS.HAClient("ha-publisher",
         "/mnt/fastDisk/moreImportantMessages.outgoing",
         "/mnt/fastDisk/moreImportantMessages.incoming")
...
client.connect_and_logon()

# Publish messages
...

# We think we are done, but the server may not
# have received or acknowledged all messages yet.

# Wait for the server to have received all messages.
# The program could also specify a timeout in this
# command to avoid blocking forever if the network
# is down or all servers are offline.

client.publish_flush()

# Print a warning to the console if messages have
# been published but not yet acknowledged as
# persisted.

if client.get_unpersisted_count() > 0:
    print("all messages have been published, "
          "but not all have been persisted")

client.disconnect()

Example 9.4: HAPublisher

In this example, the client sends each message immediately when publish() is called. If AMPS becomes unavailable between the final publish() and the disconnect(), or one of the servers that the AMPS instance replicates to is offline, the client may not have received a persisted acknowledgment for all of the published messages. For example, if a message has not yet been persisted by all of the servers in the replication fabric that are connected with synchronous replication, AMPS will not have acknowledged the message.

Before shutting down the client, the code does two things. First, it flushes messages to the server to ensure that all messages have been delivered to AMPS.

The code next checks to see if all of the messages in the publish store have been acknowledged as persisted by AMPS. If the messages have not been acknowledged, they will remain in the publish store file and will be published to AMPS, if necessary, the next time the application connects. An application may choose to loop until get_unpersisted_count() returns 0, or (as we do in this case) simply warn that AMPS has not confirmed that the messages are fully persisted. The behavior you choose in your application should be consistent with the high-availability guarantees your application needs to provide.
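An application that chooses to wait rather than warn might poll with a deadline, as in this sketch. wait_until_persisted is a hypothetical helper that takes the count function as a parameter, so it can be used with the client.get_unpersisted_count method from the example above or with a publish store's unpersisted_count(); the timeout and polling interval are illustrative values.

```python
import time


def wait_until_persisted(unpersisted_count, timeout_s, poll_interval_s=0.1,
                         clock=time.monotonic, sleep=time.sleep):
    """Poll until unpersisted_count() is 0 or timeout_s elapses.

    Returns True if every message was acknowledged, False on timeout.
    """
    deadline = clock() + timeout_s
    while True:
        if unpersisted_count() == 0:
            return True
        if clock() >= deadline:
            return False
        sleep(poll_interval_s)
```

For example, calling wait_until_persisted(client.get_unpersisted_count, 30) before client.disconnect() waits up to 30 seconds and returns False if messages remain unacknowledged, in which case they stay in the publish store file for the next run.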

Caution

AMPS uses the name of the HAClient to determine the origin of messages. For the AMPS server to correctly identify duplicate messages, each instance of an application that publishes messages must use a distinct name. That name must be consistent across different runs of the application.

If your application crashes or is terminated, some published messages may not have been persisted in the AMPS server. If you use the file-based store—in other words, if you provide file names for persistent storage when you create the HAClient—the HAClient will recover the messages, and once logged on, will correlate the message store to what the AMPS server has received, re-publishing any missing messages. This occurs automatically when HAClient connects, without any explicit consideration in your code, other than ensuring that the same file name is used to create the HAClient if recovery is desired.

Warning

AMPS provides persisted acknowledgment messages for topics that do not have a transaction log enabled. However, the level of durability provided for topics with no transaction log is minimal. Learn more about transaction logs in the User Guide.

Considerations for Subscribers

HAClient provides two important features for applications that subscribe to one or more topics: re-subscription, and a bookmark store to track the correct point at which to resume a bookmark subscription.

Resubscription With Asynchronous Message Processing

Any asynchronous subscription placed using an HAClient is automatically reinstated after a disconnect or a failover. These subscriptions are placed in an in-memory SubscriptionManager, which is created automatically when the HAClient is instantiated. Most applications will use this built-in subscription manager, but for applications that create a varying number of subscriptions, you may wish to implement SubscriptionManager to store subscriptions in a more durable place. Note that these subscriptions contain no message data, but rather simply contain the parameters of the subscription itself (for instance, the command, topic, message handler, options, and filter).

When a re-subscription occurs, the AMPS Python Client re-executes the command as originally submitted, including the original topic, options, and so on. AMPS sends the subscriber any messages for the specified topic (or topic expression) that are published after the subscription is placed. For a sow_and_subscribe command, this means that the client reissues the full command, including the SOW query as well as the subscription.

Resubscription With Synchronous Message Processing

The HAClient (starting with the AMPS Python Client version 4.3.1.1) does not track synchronous message processing subscriptions in the SubscriptionManager. The reason for this is to preserve the iterator semantics. That is, once the MessageStream indicates that there are no more elements in the stream, it does not suddenly produce more elements.
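The iterator rule being preserved is the ordinary Python one: once an iterator is exhausted, it stays exhausted. A minimal illustration, using a plain generator as a hypothetical stand-in for a MessageStream:

```python
def message_stream(messages):
    """Hypothetical stand-in for a MessageStream: a generator over messages."""
    for m in messages:
        yield m


stream = message_stream(["m1", "m2"])
received = list(stream)         # consumes the stream: ["m1", "m2"]
# An exhausted iterator never yields again, even if more data arrives later,
# so the only way to receive further messages is to create a new stream.
after_end = next(stream, None)  # None
```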

To resubscribe when the HAClient fails over, you can simply reissue the subscription. For example, the snippet below re-issues the subscribe command when the message stream ends:

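import AMPS

# Assumes client is a connected AMPS.HAClient.
still_need_to_process = True

while still_need_to_process:
    try:
        # Exiting the for loop means the stream has ended. For a
        # subscribe, this most likely means that the client has
        # disconnected, so the while loop reissues the subscription.
        for message in client.subscribe("messages"):
            # Process the message here, and update
            # still_need_to_process as appropriate.
            if not still_need_to_process:
                break
    except AMPS.DisconnectedException:
        # The client is reconnecting or failing over;
        # loop around and resubscribe.
        pass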

Example 9.5: Resubscription
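Because this pattern needs a live AMPS server to exercise, the offline simulation below replays the same loop against a fake client. FakeClient and the local DisconnectedException class are hypothetical stand-ins for HAClient and AMPS.DisconnectedException; the fake subscription delivers one batch of messages and then raises a disconnect if more batches remain.

```python
class DisconnectedException(Exception):
    """Stand-in for AMPS.DisconnectedException in this offline simulation."""


class FakeClient:
    """Hypothetical client whose subscription breaks after each batch."""

    def __init__(self, batches):
        self._batches = list(batches)  # each subscription delivers one batch
        self.subscribe_calls = 0

    def subscribe(self, topic):
        self.subscribe_calls += 1
        batch = self._batches.pop(0)
        for message in batch:
            yield message
        if self._batches:
            # Simulate a failover in the middle of the subscription.
            raise DisconnectedException("connection lost")


client = FakeClient([["a", "b"], ["c", "d"]])
processed = []
still_need_to_process = True

while still_need_to_process:
    try:
        for message in client.subscribe("messages"):
            processed.append(message)
            if message == "d":  # hypothetical stop condition
                still_need_to_process = False
                break
    except DisconnectedException:
        pass  # loop around and reissue the subscription
```

After the run, processed holds all four messages and subscribe() was called twice: once initially and once after the simulated disconnect.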

Bookmark Stores

In cases where it is critical not to miss a single message, an application must be able to resume a subscription at the exact point where a failure occurred. Simply recreating the subscription is not sufficient: AMPS sends a new subscription only the messages published after it is placed, so any message published while the subscriber was disconnected is never delivered.

To ensure delivery of every message from a topic or set of topics, the AMPS HAClient includes a BookmarkStore that, combined with the bookmark subscription and transaction log functionality in the AMPS server, ensures that clients receive any messages that might have been missed. The client stores the bookmark associated with each message received, and tracks whether the application has processed that message; if a disconnect occurs, the client uses the BookmarkStore to determine the correct resubscription point, and sends that bookmark to AMPS when it re-subscribes. AMPS then replays messages from its transaction log from the point after the specified bookmark, thus ensuring the client is completely up-to-date.
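The core bookkeeping can be sketched with a toy model: record bookmarks in arrival order, mark them as discarded when processing finishes, and resume after the latest bookmark for which every earlier message has also been discarded. ToyBookmarkStore and its methods are hypothetical names for illustration only, not the AMPS BookmarkStore API.

```python
class ToyBookmarkStore:
    """Illustrative model of bookmark tracking (not the AMPS BookmarkStore API)."""

    def __init__(self):
        self._order = []         # bookmarks in the order messages arrived
        self._discarded = set()  # bookmarks the application finished processing

    def log(self, bookmark):
        self._order.append(bookmark)

    def discard(self, bookmark):
        self._discarded.add(bookmark)

    def resume_point(self):
        """Latest bookmark such that it and every earlier message were processed.

        Resubscribing after this bookmark skips nothing unprocessed. Returns
        None when no prefix is fully processed (replay from the start).
        """
        point = None
        for bookmark in self._order:
            if bookmark not in self._discarded:
                break
            point = bookmark
        return point


store = ToyBookmarkStore()
for bm in ["b1", "b2", "b3", "b4"]:
    store.log(bm)
for bm in ["b1", "b2", "b4"]:
    store.discard(bm)

point = store.resume_point()  # "b2": b3 is still unprocessed
```

Resuming after b2 means b3 is redelivered; b4 is also replayed by the server but, having been discarded, can be filtered out by the client.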

HAClient helps you take advantage of this bookmark mechanism through the BookmarkStore interface and the bookmark_subscribe() method on Client. When you create subscriptions with bookmark_subscribe(), your application automatically resubscribes after any disconnection or failover, starting with the message after the last message it processed. A file-backed HAClient (one created with file names for its stores) additionally stores these bookmarks on disk, so that the application can resume with the appropriate message if the client application fails and restarts.

To take advantage of bookmark subscriptions, do the following:

  • Ensure the topic(s) to be subscribed are included in a transaction log. See the User Guide for information on how to specify the contents of a transaction log.
  • Use bookmark_subscribe() instead of subscribe() when creating a subscription, and decide how the application will manage subscription identifiers (SubIds).
  • Use the discard() method in message handlers to indicate when a message has been fully processed by the application, that is, when the application does not need to receive the message again if the application fails over.

The following example creates a bookmark subscription against a transaction-logged topic, and fully processes each message as soon as it is delivered:

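import AMPS


class MessagePrinter:
    """Message handler that prints each message and then discards it."""

    def __init__(self, client):
        self._client = client

    def __call__(self, message):
        print(message.get_data())
        # Mark the message as fully processed so that it is not
        # redelivered after a failover or restart.
        self._client.discard(message)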


...

client = AMPS.HAClient(
    "aClient",
    "/logs/aClient.publishLog",
    "/logs/aClient.subscribeLog")

# Create ServerChooser, populate chooser, connect client
...

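# Place a bookmark subscription that resumes from the most recent
# point recorded in the bookmark store for "MySubID".
client.execute_async(
    AMPS.Command("subscribe")
        .set_topic("myTopic")
        .set_bookmark(AMPS.Client.Bookmarks.MOST_RECENT)
        .set_sub_id("MySubID"),
    MessagePrinter(client))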

Example 9.6: HAClient subscription

In this example, the client is a file-backed client, meaning that arriving bookmarks will be stored in a file (aClient.subscribeLog). Storing these bookmarks in a file allows the application to restart the subscription from the last message processed, in the event of either server or client failure.

Tip

For optimum performance, it is critical to discard every message once its processing is complete. If a message is never discarded, it remains in the bookmark store. During re-subscription, HAClient always restarts the bookmark subscription with the oldest undiscarded message, and then filters out any more recent messages that have been discarded. If an old message remains in the store, but is no longer important for the application’s functioning, then the client and the AMPS server will incur unnecessary network, disk, and CPU activity.
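The cost described in this tip can be quantified with a small model. The function below is a hypothetical sketch of the replay-then-filter behavior, not AMPS internals: the server replays everything from the oldest undiscarded message onward, and the client drops the messages that were already discarded.

```python
def replay_after_reconnect(transaction_log, discarded, oldest_undiscarded):
    """Model the replay described in the tip (illustrative only).

    Returns (messages sent over the network, messages delivered to the app).
    """
    start = transaction_log.index(oldest_undiscarded)
    sent = transaction_log[start:]
    delivered = [m for m in sent if m not in discarded]
    return sent, delivered


log = [f"m{i}" for i in range(1, 1001)]
# The application processed everything but forgot to discard m1.
discarded = set(log) - {"m1"}
sent, delivered = replay_after_reconnect(log, discarded, "m1")
# 1000 messages cross the network so that a single message (m1) is delivered.
```

In this model, one forgotten discard() at the start of the log forces the whole log to be replayed; discarding m1 as well would reduce the replay to nothing.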

The sub_id (set with set_sub_id() in the preceding example) specifies an identifier to be used for this subscription. If you do not provide one, HAClient generates one and returns it, like most other Client functions. However, to resume a subscription from a previous point after the application has terminated and restarted, the application must use the same subscription ID as during its previous run. Using a different subscription ID bypasses any recovery mechanisms and creates an entirely new subscription. When you use an existing subscription ID, the HAClient locates the last-used bookmark for that subscription in the local store, and attempts to re-subscribe from that point.
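Why the subscription ID must be stable becomes clear if the on-disk bookmark store is modeled as a map keyed by that ID. FileBookmarkIndex is a hypothetical, stdlib-only sketch, not the AMPS file format or API; the "EPOCH" fallback for an unknown ID mirrors the MOST_RECENT behavior described in this chapter.

```python
import json
import os
import tempfile


class FileBookmarkIndex:
    """Toy on-disk map of subscription id -> last processed bookmark (hypothetical)."""

    def __init__(self, path):
        self.path = path
        self._last = {}
        if os.path.exists(path):
            with open(path) as f:
                self._last = json.load(f)

    def record(self, sub_id, bookmark):
        self._last[sub_id] = bookmark
        with open(self.path, "w") as f:
            json.dump(self._last, f)

    def starting_bookmark(self, sub_id):
        # A known sub_id resumes from its stored bookmark; an unknown one has
        # no history, so this model starts it from the beginning ("EPOCH").
        return self._last.get(sub_id, "EPOCH")


path = os.path.join(tempfile.mkdtemp(), "aClient.subscribeLog")
first_run = FileBookmarkIndex(path)
first_run.record("MySubID", "bm-42")

# After a restart, a new process opens the same file.
second_run = FileBookmarkIndex(path)
resumed = second_run.starting_bookmark("MySubID")   # "bm-42"
fresh = second_run.starting_bookmark("OtherSubID")  # "EPOCH"
```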

The bookmark set on the subscription (with set_bookmark() in the preceding example) determines where the subscription begins. The client provides the following special values:

  • Client.Bookmarks.NOW specifies that the subscription should begin from the moment the server receives the subscription request. This delivers the same messages as if you had invoked subscribe() instead, except that the messages are accompanied by bookmarks. This is also the behavior that results if you supply an invalid bookmark.
  • Client.Bookmarks.EPOCH specifies that the subscription should begin from the beginning of the AMPS transaction log (that is, the first entry in the oldest journal file for the transaction log).
  • Client.Bookmarks.MOST_RECENT specifies that the subscription should resume from the point recorded for this subscription in the associated BookmarkStore. If the store has no record of this subscription, the subscription begins from EPOCH instead. This is the most common value for this parameter, and it is the value used in the preceding example. By using MOST_RECENT, the application automatically resumes from wherever the subscription left off, taking into account any messages that have already been processed and discarded.
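The three special values can be summarized as a function from a bookmark to the first position in the transaction log that the subscription receives. This is a hypothetical model: the local Bookmarks enum stands in for Client.Bookmarks, messages double as their own bookmarks, and last_processed represents what the BookmarkStore holds for the subscription.

```python
from enum import Enum


class Bookmarks(Enum):
    """Stand-ins for the Client.Bookmarks values in this offline model."""
    NOW = "now"
    EPOCH = "epoch"
    MOST_RECENT = "most_recent"


def start_index(bookmark, transaction_log, last_processed=None):
    """Index of the first logged message a new bookmark subscription receives.

    transaction_log: messages currently in the log, oldest first.
    last_processed: last processed message for this sub_id, or None if the
    store has never seen the subscription.
    """
    if bookmark is Bookmarks.EPOCH:
        return 0
    if bookmark is Bookmarks.MOST_RECENT:
        if last_processed is None:
            return 0  # never seen before: behave like EPOCH
        return transaction_log.index(last_processed) + 1
    if bookmark in transaction_log:
        # A specific bookmark: resume with the message after it.
        return transaction_log.index(bookmark) + 1
    # NOW, or a bookmark not found in the log (treated as invalid): only
    # messages published after the subscription arrives.
    return len(transaction_log)


log = ["m1", "m2", "m3"]
```

For example, with this three-message log, EPOCH starts at index 0, NOW at index 3 (nothing from the log is replayed), and MOST_RECENT with last_processed="m2" starts at index 2, the message m3.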

When the HAClient re-subscribes after a disconnection and reconnection, it always uses MOST_RECENT, so the continued subscription begins from the last message processed before the disconnect and no messages are missed.

Conclusion

With only a few changes, most AMPS applications can take advantage of the HAClient and associated classes to become more highly available and resilient. Using the PublishStore, publishers can ensure that every message published has actually been persisted by AMPS. Using a BookmarkStore, subscribers can make sure that there are no gaps or duplicates in the messages received. HAClient makes both kinds of applications more resilient to network and server outages and temporary issues, and, by using the file-backed HAClient, clients can recover their state after an unexpected termination or crash. Though HAClient provides useful defaults for the Store, BookmarkStore, SubscriptionManager, and ServerChooser, you can customize any or all of these to the specific needs of your application and architecture.