9. High Availability¶
The AMPS C++ Client provides an easy way to create highly-available
applications using AMPS, via the HAClient
class. HAClient
derives from Client
and offers the same methods, but also adds
protection against network, server, and client outages.
Using HAClient
allows applications to automatically:
- Recover from temporary disconnects between client and server.
- Failover from one server to another when a server becomes unavailable.
Because the HAClient
automatically manages failover and
reconnection, 60East recommends using the HAClient
for applications
that need to:
- Automatically reconnect and resume work in the case of disconnection.
- Ensure no messages are lost or duplicated after a reconnect or failover.
- Persist messages and the current state of a bookmark subscription on disk for protection against client failure.
You can choose how your application uses HAClient
features. For
example, you might need automatic reconnection, but have no need to
resume subscriptions or republish messages. The high availability
behavior in HAClient
is provided by implementations of defined
interfaces. You can combine different implementations provided by 60East
to meet your needs, and implement those interfaces to provide your own
policies.
Some of these features require specific configuration settings on your AMPS instance(s). This chapter mentions these features and describes how to use them from the AMPS C++ client. You can find full documentation for these settings and server features in the User Guide.
Overview of HAClient¶
HAClient
derives from Client
and offers the same methods for
sending commands to AMPS and receiving messages from AMPS.
The HAClient
differs from the Client
in two ways:
HAClient
automatically installs a disconnect handler that reconnects to AMPS and resumes active (asynchronous) subscriptions. The disconnect handler optionally replayspublish
andsow_delete
messages that have not been acknowledged by AMPS, using aPublishStore
. The disconnect handler can optionally resume replays from the transaction log at a point that guarantees no messages are skipped and no duplicates are delivered to the application, using aBookmarkStore
.- The
HAClient
includes the infrastructure needed for client failover, including a list of connection strings and their associated authentication mechanisms (provided by theServerChooser
), and options for controlling backoff behavior for reconnects (provided by theDelayStrategy
). As a result, theHAClient
provides aconnectAndLogon()
function for establishing a connection to AMPS, rather than treating these as independent steps that an application must manage itself.
If your application needs to automatically reconnect to AMPS,
60East recommends using the HAClient
and the automatically
provided disconnect handler rather than using a Client
or replacing the HAClient
default disconnect handler.
Reconnection with HAClient¶
The most important difference between Client
and HAClient
is
that HAClient
automatically provides a reconnect handler.
This description provides a high-level framework for understanding the
components involved in failover with the HAClient
. The components
are described in more detail in the following sections.
The HAClient
reconnect handler performs the following steps when
reconnecting:
Calls the
ServerChooser
to determine the next URI to connect to and the authenticator to use for that connection.If the connection fails, calls
get_error
on theServerChooser
to get a description of the failure, sends an exception to the exception listener, and stops the reconnection process.Calls the
DelayStrategy
to determine how long to wait before attempting to reconnect, and waits for that period of time.Connects to the AMPS server. If the connection fails, calls
report_failure
on theServerChooser
and begins the process again.Logs on to the AMPS server. If the connection fails, calls
report_failure
on theServerChooser
and begins the process again.Calls
report_success
on the ServerChooser.Receives the bookmark for the last message that the server has persisted. Discards any older messages from the
PublishStore
.Republishes any messages in the
PublishStore
that have not been persisted by the server.Re-establishes subscriptions using the
SubscriptionManager
for the client. For bookmark subscriptions, the reconnect handler uses theBookmarkStore
for the client to determine the most recent bookmark, and resubscribes with that bookmark. For subscriptions that do not use a bookmark, theSubscriptionManager
simply re-enters the subscription, meaning that it is entered at the point at which theHAClient
reconnects.
The ServerChooser
, DelayStrategy
, PublishStore
,
SubscriptionManager
, and BookmarkStore
are all extension points
for the HAClient
. You can adapt the failover and recovery behavior
by setting a different object for the behavior you want to customize on
the HAClient
or by providing your own implementation.
For example, the convenience methods in the previous section customize
the behavior of the PublishStore
and BookmarkStore
by providing
either memory-backed or file-backed stores.
Choosing Store Durability¶
If your application needs reliable publish to AMPS, install a
PublishStore
in the HAClient
. If your application needs
to resume replays from the transaction log, install a BookmarkStore
in the HAClient
.
These stores provide the following capabilities:
- A bookmark store tracks received messages, and is used to resume subscriptions that replay from the transaction log.
- A publish store tracks published messages, and is used to ensure that messages are persisted in AMPS.
The AMPS C++ client provides a memory-backed version of each store and a
file-backed version of each store. An HAClient
can use either a
memory backed store or a file backed store for protection. Each method
provides resilience to different failures:
Memory-backed stores provide recovery after disconnection from AMPS by storing messages and bookmarks in your process’ address space. This is the highest performance option for working with AMPS in a highly available manner. The trade-off with this method is there is no protection from a crash or failure of your client application. If your application is terminated prematurely or, if the application terminates at the same time as an AMPS instance failure or network outage, then messages may be lost or duplicated. The state of bookmark replays will be lost when the application shuts down. Messages in the publish store when the application shuts down will not be maintained through a restart, so the application will not be able to attempt any necessary redelivery when the application restarts.
File-backed stores provide recovery after client failure or shutdown and disconnection from AMPS by storing messages and bookmarks on disk. To use this protection method, the
createFileBacked
convenience method requests additional arguments for the two files that will be used for both bookmark storage and message storage. If these files exist and are non-empty (as they would be after a client application is restarted), theHAClient
loads their contents and ensures synchronization with the AMPS server once connected. The performance of this option depends heavily on the speed of the device on which these files are placed. When the files do not exist (as they would the first time a client starts on a given system), theHAClient
creates and initializes the files. In this case the client does not have a point at which to resume the subscription or messages to republish.A store file should only be used by one instance of a client at a time.
When using file backed bookmark stores, 60East recommends periodically removing unneeded entries by calling the
prune()
method. The precise strategy that your application uses to callprune()
depends on the nature of the application.Most applications call
prune()
when the application exits.There are two basic strategies that applications follow while the application runs:
- Install a resize handler and call
prune()
after a specified number of resize operations, or when the store reaches a specific size. - Call
prune()
after a specific number of messages are processed (for example, every 10,000 messages received or every 1,000 updates completed).
Regardless of the strategy, it is best to call
prune()
when the application is otherwise idle, since theprune()
call rewrites the log file.- Install a resize handler and call
The store interface is public, and an application can create and provide
a custom store as necessary. While clients provide convenience methods
for creating file-backed and memory-backed HAClient
objects with the
appropriate stores, you can also create and set the stores in your
application code. The AMPS C++ client also includes default stores,
which implement the appropriate interface, but do not actually persist
messages.
Starting in 5.3.2.0, the AMPS client contains a recovery point adapter interface to make it easy to add a custom persistence layer to a bookmark store. The distribution includes a recovery point adapter that can store bookmark recovery information in an AMPS state of the world topic.
The HAClient
provides convenience methods for creating clients and
setting stores. You can also construct an HAClient
and set whichever
store implementations you choose.
In this example, we create several clients. The first client uses memory
stores for both bookmarks and publishes. The second client uses files
for both bookmarks and publishes. The third client uses a file for
bookmarks. The third client does not set a store for publishes, which
means that AMPS provides the default store (and no outgoing messages are
stored). The final client does not specify any stores, and so has no
persistence for published messages or bookmark subscriptions, but can
take advantage of the automatic failover and reconnection in the
HAClient
.
/* Memory publish store, memory bookmark store */
HAClient memoryClient = HAClient::createMemoryBacked("lessImportantMessages");
/* File-backed publish store, file-backed bookmark store */
HAClient diskClient = HAClient::createFileBacked("moreImportantMessages",
"/mnt/fastDisk/moreImportantMessages.outgoing",
"/mnt/fastDisk/moreImportantMessages.incoming");
/* Default publish store, file-backed bookmark store */
HAClient subscriberClient("subscriber");
subscriberClient.setBookmarkStore(
new LoggedBookmarkStore("my_app.bookmark"));
/* Default publish store, default bookmark store
Failover behavior and resubscription only */
HAClient streamReader("streamReader");
Example 9.1: HAClient creation examples
Tip
While this chapter presents the built-in file and
memory-based stores, the AMPS C/C++ Client provides open
interfaces that allow development of custom persistent
message stores. To fully control recovery behavior,
you can implement the Store
and BookmarkStore
interfaces
in your code, and then pass instances of those to setPublishStore()
or setBookmarkStore()
methods in your Client
. You can
also implement the RecoveryPointAdapter
interface to easily add a
custom storage mechanism to one of the 60East-provided
bookmark store implementations.
Instructions on developing a custom store are beyond the scope of this document; please refer to the AMPS Client HA Whitepaper for more information.
Using the SOW Recovery Point Adapter¶
The AMPS client also includes the ability to use a SOW topic to store bookmark state for a bookmark store. This can be a useful option in a situation where an application needs a persistent bookmark store, but does not have the ability to store a file on the filesystem, or where an application has a bookmark file, but wants to have the ability to resume the subscription if the file is lost or damaged, or if the application is started on a system that does not have access to the file.
To use the SOW topic recovery point adapter, you create a bookmark store of
the type you would like to use for the Client
, passing an
adapter when you construct the store. You then set this bookmark store as
the store for the Client
to use. The constructor for the
SOW recovery adapter allows you to customize the topic name and
field names used to store the recovery point information in AMPS.
As with the RecoveryPointAdapter
interface
in general, it is possible to customize the behavior of the SOW recovery
point adapater by overriding the provided methods.
This section describes how to use the adapter with the default settings. Should you need to change the behavior of the class, you would adjust the guidance in this section accordingly. (For example, if you override methods to produce a message with a different set of keys or a different message format, you would update the topic definition accordingly).
AMPS Topic Configuration¶
To store recovery point state in AMPS, the AMPS instance
that will store the recovery point state must define a SOW/Topic
to hold the recovery point data.
By default, the adapter uses a topic named /ADMIN/bookmark_store
of
json
message type, with the /clientName
and /subId
fields
as keys, similar to the following definition.
<Topic>
<Name>/ADMIN/bookmark_store</Name>
<MessageType>json</MessageType>
<Key>/clientName</Key>
<Key>/subId</Key>
<!-- Storage/persistence configuration here.
In most cases, this topic should be
persisted to a file, but that is not
a requirement. -->
</Topic>
You must include this defintion, or an equivalent definition, in the configuration file for the AMPS instance that will host the recovery point.
If you define a topic with a different configuration (for example, different key names, a different topic name, or a different message type), you must ensure that the adapter that you create uses the same parameters as those configured on the server.
Constructing a Client For the Adapter¶
The AMPS SOW Recovery Point Adapter requires a Client
or HAClient
connected to the instance that contains the SOW topic. The Adapter will
use this client to recover bookmark state and store bookmarks in AMPS.
Notice that this client must not be client that the Adapter is
keeping state for. This must be a completely separate client instance,
otherwise the client may deadlock while updating the store.
The client must be connected and logged in to the instance that contains the State of the World topic, using the message type defined for the topic.
Capacity Planning and Store Sizing¶
When an application uses a file-backed store, it is important to make sure that there is enough space available on the file system to be able to manage the store.
For logged bookmark stores, an application needs to keep a bookmark record for
each message received, each message discarded, and the persisted
acknowledgments delivered by the server approximately once a second.
Each bookmark entry consumes roughly 70 bytes of storage plus the length
of the subscription ID for the subscription receiving the message. The logged
bookmark store retains entries until an application explicitly calls
prune()
. The capacity needed for a logged bookmark store will
depend on the strategy that the application uses for pruning the file.
For a file-backed publish store, the application needs to be able to
store published messages until the AMPS server that the publisher is
connected to acknowledges those messages as persisted. The volume of
messages that needs to be stored depends on the failover policy for
the server – that is, the maximum amount of time that the server will
allow a downstream instance to fail to acknowledge a message before
the server downgrades that connection to async
acknowledgment.
By default, AMPS does not downgrade connections: this policy must
be set explicitly using the AMPS actions. As an example, if the
server is configured to downgrade connections that are more than
120 seconds behind, then – for disaster recovery – the application
must be have the capacity to store 120 seconds of published messages
at peak publishing load. However, unlike the logged bookmark store, a
file-backed publish store removes messages from the store and reuses
the space once AMPS has acknowledged the message.
Connections and the ServerChooser¶
Unlike Client
, the HAClient
attempts to keep itself connected to
an AMPS instance at all times, by automatically reconnecting or failing
over when it detects that the client is disconnected. When you are using
the Client
directly, your disconnect handler usually takes care of
reconnection. HAClient
, on the other hand, provides a disconnect
handler that automatically reconnects to the current server or to the
next available server.
To inform the HAClient
of the addresses of the AMPS instances in
your system, you pass a ServerChooser
instance to the HAClient
.
ServerChooser
acts as a smart enumerator over the servers available:
HAClient
calls ServerChooser
methods to inquire about what
server should be connected, and calls methods to indicate whether a
given server succeeded or failed.
The AMPS C/C++ Client provides a simple implementation of ServerChooser
, called
DefaultServerChooser
, that provides very simple logic for
reconnecting. This server chooser is most suitable for basic testing, or
in cases where an application should simply rotate through a list of
servers. For most applications, you implement the ServerChooser
interface yourself for more advanced logic, such as choosing a backup
server based on your network topology, or limiting the number of times
your application should try to reconnect to a given address.
In either case, you must provide a ServerChooser
to HAClient
and
then call connectAndLogon()
to create the first connection.
HAClient myClient = HAClient::createMemoryBacked(
"myClient");
/* primary.amps.xyz.com is the primary AMPS instance, and
* secondary.amps.xyz.com is the secondary
*/
ServerChooser chooser(new DefaultServerChooser());
chooser.add("tcp://primary.amps.xyz.com:12345/fix");
chooser.add("tcp://secondary.amps.xyz.com:12345/fix");
myClient.setServerChooser(chooser);
myClient.connectAndLogon();
...
myClient.disconnect();
Example 9.2: HAClient login
Similar to Client
, HAClient
remains connected to the server
until disconnect()
is called. Unlike Client
, HAClient
provides a built-in disconnect handler that automatically attempts to
reconnect to your server if it detects a disconnect, and, if that server
cannot be connected, fails over to the next server provided by the
ServerChooser
. In this example, the call to connectAndLogon()
attempts to connect and log in to primary.amps.xyz.com
, and returns
if that is successful. If it cannot connect, it tries
secondary.amps.xyz.com
, and continues trying servers from the
ServerChooser
until a connection is established. Likewise, if it
detects a disconnection while the client is in use, HAClient
attempts to reconnect to the server it was most recently connected with,
and, if that is not possible, it moves on to the next server provided by
the ServerChooser
.
Setting a Reconnect Delay and Timeout¶
You can control the amount of time between reconnection attempts and set a total
amount of time for the HAClient
to attempt to reconnect.
The AMPS C++ Client includes an interface for managing this behavior
called the ReconnectDelayStrategy
.
Two implementations of this interface are provided with the client:
FixedDelayStrategy
provides the same delay each time theHAClient
tries to reconnect.ExponentialDelayStrategy
provides an exponential backoff until a connection attempt succeeds.
To use either of these classes, you simply create an instance with
appropriate parameters, and install that instance as the delay strategy
for the HAClient
. For example, the following code sets up a
reconnect delay that starts at 200ms and increases the delay by 1.5
times after each failure. The strategy allows a maximum delay between
connection attempts of 5 seconds, and will not retry longer than 60
seconds.
HAClient theClient = HAClient::createMemoryBacked("demo");
theClient.setReconnectDelayStrategy(new ExponentialDelayStrategy(200, 5000, 1.5, 6000,0));
Implementing a Server Chooser¶
As described above, you provide the HAClient
with connection strings to one or more AMPS servers using a
ServerChooser
. The purpose of a ServerChooser
is to provide
information to the HAClient
. A ServerChooser
does not manage the
reconnection process, and should not call methods on the HAClient
.
A ServerChooser
has two required responsibilities to the
HAClient
:
Tells the
HAClient
the connection string for the server to connect to. If there are no servers, or theServerChooser
wants the connection to fail, theServerChooser
returns an empty string.To provide this information, the
ServerChooser
implements thegetCurrentURI()
method.Provides an
Authenticator
for the current connection string. This is especially important for installations where different servers require different credentials or authentication tokens must be reset after each connection attempt.To provide the authenticator, the
ServerChooser
implements thegetCurrentAuthenticator()
method.
The HAClient
calls the getCurrentURI()
and
getCurrentAuthenticator()
methods each time it needs to make a
connection.
Each time a connection succeeds, the HAClient
calls the
reportSuccess()
method of the ServerChooser
. Each time a
connection fails, the HAClient
calls the reportFailure()
method
of the ServerChooser
. The HAClient
does not require the
ServerChooser
to take any particular action when it calls these
methods. These methods are provided for the HAClient
to do internal
maintenance, logging, or record keeping. For example, an HAClient
might keep a list of available URIs with a current failure count, and
skip over URIs that have failed more than 5 consecutive times until all
URIs in the list have failed more than 5 consecutive times.
When the ServerChooser
returns an empty string from
getCurrentURI()
, indicating that no servers are available for
connection, the HAClient
calls getError()
method on the
ServerChooser
and includes the string returned by getError()
in
the generated exception.
Heartbeats and Failure Detection¶
Use of the HAClient
allows your application to quickly recover from
detected connection failures. By default, connection failure detection
occurs when AMPS receives an operating system error on the connection.
This system may result in unpredictable delays in detecting a connection
failure on the client, particularly when failures in network routing
hardware occur, and the client primarily acts as a subscriber.
The heartbeat feature of the AMPS client allows connection failure to be
detected quickly. Heartbeats ensure that regular messages are sent
between the AMPS client and server on a predictable schedule. The AMPS
client and server both assume disconnection has occurred if these
regular heartbeats cease, ensuring disconnection is detected in a timely
manner. To utilize heartbeat, call the setHeartbeat
method on
Client
or HAClient
:
HAClient client = HAClient::createMemoryBacked(
"importantStuff");
...
client.setHeartbeat(3);
client.connectAndLogon();
...
setHeartbeat
takes one parameter: the heartbeat interval. The
heartbeat interval specifies the periodicity of heartbeat messages sent
by the server: the value 3
indicates messages are sent on a
three-second interval. If the client receives no messages in a six
second window (two heartbeat intervals), the connection is assumed to be
dead, and the HAClient
attempts reconnection. An additional variant
of setHeartbeat
allows the idle period to be set to a value other
than two heartbeat intervals. (The server, however, will always consider a
connection to be closed after two heartbeat intervals without any traffic.)
Notice that, for HAClient
, setHeartbeat
must be called before
the client is connected. For Client
, setHeartbeat
must be called
after the client is connected.
Caution
Heartbeats are serviced on the receive thread created by the AMPS client. Your application must not block the receive thread for longer than the heartbeat interval, or the application is subject to being disconnected.
Considerations for Publishers¶
Publishing with an HAClient
is nearly identical to regular
publishing; you simply call the publish()
method with your message’s
topic and data. The AMPS client sends the message to AMPS, and then
returns from the publish()
call. For maximum performance, the client
does not wait for the AMPS server to acknowledge that the message has
been received.
When an HAClient
uses a publish store (other than the
DefaultPublishStore
), the publish store retains a copy of each
outgoing message and requests that AMPS acknowledge that the message has
been persisted. The AMPS server acknowledges messages back to the
publisher. Acknowledgments can be delivered for multiple messages at
periodic intervals (for topics recorded in the transaction log) or after
each message (for topics that are not recorded in the transaction log).
When an acknowledgment for a message is received, the HAClient removes
that message from the bookmark store. When a connection to a server is
made, the HAClient
automatically determines which messages from the
publish store (if any) the server has not processed, and replays those
messages to the server once the connection is established.
For reliable publishers, the application must choose how best to handle
application shutdown. For example, it is possible for the network to
fail immediately after the publisher sends the message, while the
message is still in transit. In this case, the publisher has sent the
message, but the server has not processed it and acknowledged it. During
normal operation, the HAClient
will automatically connect and retry
the message. On shutdown, however, the application must decide whether
to wait for messages to be acknowledged, or whether to exit.
Publish store implementations provide an unpersistedCount()
method
that reports the number of messages that have not yet been acknowledged
by the AMPS server. When the unpersistedCount()
reaches 0
, there
are no unpersisted messages in the local publish store.
For the highest level of safety, an application can wait until the
unpersistedCount()
reaches 0
, which indicates that all of the
messages have been persisted to the instance that the application is
connected to, and the synchronous replication destinations configured
for that instance. When a synchronous replication destination goes
offline, this approach will cause the publisher to wait to exit until
the destination comes back online or until the destination is downgraded
to asynchronous replication.
For applications that are shut down periodically for short periods of
time (for example, applications that are only offline during a weekly
maintenance window), another approach is to use the publishFlush()
method to ensure that messages are delivered to AMPS, and then rely on
the connection logic to replay messages as necessary when the
application restarts.
For example, the following code flushes messages to AMPS, then warns if not all messages have been acknowledged:
HAClient pub = HAClient.createMemoryBacked("importantStuff");
...
pub.connectAndLogon();
std::string topic = "loggedTopic";
std:string data = ...;
for (size_t i = 0; i < MESSAGE_COUNT; i++)
{
pub.publish(topic, data);
}
/* We think we are done, but the server may not
* have received or acknowledged the messages yet.
* Wait for the server to have received all messages.
* The program could also specify a timeout in this
* command to avoid blocking forever if the
* network is down or all servers are offline.
*/
pub.publishFlush();
/* Print warning to the console if messages have
* been published but not yet acknowledged as persisted.
*/
if (pub.getPublishStore().unpersistedCount() > 0)
{
printf("all messages have been published,"
" but not all have been persisted.");
}
pub.disconnect();
Example 9.3: HA Publisher
In this example, the client sends each message immediately when
publish()
is called. If AMPS becomes unavailable between the final
publish()
and the disconnect()
, or one of the servers that the
AMPS instance replicates to is offline, the client may not have received
a persisted acknowledgment for all of the published messages. For
example, if a message has not yet been persisted by all of the servers
in the replication fabric that are connected with synchronous
replication, AMPS will not have acknowledged the message.
This code first flushes messages to the server to ensure that all messages have been delivered to AMPS.
The code next checks to see if all of the messages in the publish store
have been acknowledged as persisted by AMPS. If the messages have not
been acknowledged, they will remain in the publish store file and will
be published to AMPS, if necessary, the next time the application
connects. An application may choose to loop until unpersistedCount()
returns 0
, or (as we do in this case) simply warn that AMPS has not
confirmed that the messages are fully persisted. The behavior you choose
in your application should be consistent with the high-availability
guarantees your application needs to provide.
Caution
AMPS uses the name of the HAClient
to determine the
origin of messages. For the AMPS server to correctly
identify duplicate messages, each instance of an
application that publishes messages must use a distinct
name. That name must be consistent across different runs
of the application.
If your application crashes or is terminated, some published messages
may not have been persisted in the AMPS server. If you use the
file-based store (in other words, the store created by using
HAClient.createFileBacked()
), then the HAClient
will recover the
messages, and once logged on, correlate the message store to what the
AMPS server has received, re-publishing any missing messages. This
occurs automatically when HAClient
connects, without any explicit
consideration in your code, other than ensuring that the same file name
is passed to createFileBacked()
if recovery is desired.
Caution
AMPS provides persisted acknowledgment messages for topics that do not have a transaction log enabled; however, the level of durability provided for topics with no transaction log is minimal. Learn more about transaction logs in the User Guide.
Considerations for Subscribers¶
HAClient
provides two important features for applications that
subscribe to one or more topics: re-subscription, and a bookmark store
to track the correct point at which to resume a bookmark subscription.
Resubscription With Asynchronous Message Processing¶
Any asynchronous subscription placed using an HAClient
is
automatically reinstated after a disconnect or a failover. These
subscriptions are placed in an in-memory SubscriptionManager
, which
is created automatically when the HAClient
is instantiated. Most
applications will use this built-in subscription manager, but for
applications that create a varying number of subscriptions, you may wish
to implement SubscriptionManager
to store subscriptions in a more
durable place. Note that these subscriptions contain no message data,
but rather simply contain the parameters of the subscription itself (for
instance, the command, topic, message handler, options, and filter).
When a re-subscription occurs, the AMPS C++ Client re-executes the
command as originally submitted, including the original topic, options,
and so on. AMPS sends the subscriber any messages for the specified
topic (or topic expression) that are published after the subscription is
placed. For a sow_and_subscribe
command, this means that the client
reissues the full command, including the SOW query as well as the
subscription.
Resubscription With Synchronous Message Processing¶
The HAClient
(starting with the AMPS C++ Client version 4.3.1.1)
does not track synchronous message processing subscriptions in the
SubscriptionManager
. The reason for this is to preserve conventional
iterator behavior. That is, once the MessageStream
indicates that
there are no more elements to iterate (for example, because the
connection has closed), the MessageStream
will not suddenly produce
more elements.
To resubscribe when the HAClient
fails over, you can simply reissue
the subscription. For example, the snippet below re-issues a
subscribe
command when the message stream ends:
bool still_need_to_process = true;
while (still_need_to_process)
{
try
{
for ( auto message : client.subscribe("messages"))
{
/* process messages here */
/* check condition on still_need_to_process */
if (!still_need_to_process) break;
}
/* end of stream: for a subscribe this means
* that the connection is likely closed, or
* the program broke out of the loop
*/
}
catch(...) /* for production, you would catch specific errors */
{
/* log error as appropriate */
}
}
Bookmark Stores¶
In cases where it is critical not to miss a single message, it is important to be able to resume a subscription at the exact point that a failure occurred. In this case, simply recreating a subscription isn’t sufficient. Even though the subscription is recreated, the subscriber may have been disconnected at precisely the wrong time, and will not see the message.
To ensure delivery of every message from a topic or set of topics, the
AMPS HAClient
includes a BookmarkStore
that, combined with the
bookmark subscription and transaction log functionality in the AMPS
server, ensures that clients receive any messages that might have been
missed. The client stores the bookmark associated with each message
received, and tracks whether the application has processed that message;
if a disconnect occurs, the client uses the BookmarkStore to determine
the correct resubscription point, and sends that bookmark to AMPS when
it re-subscribes. AMPS then replays messages from its transaction log
from the point after the specified bookmark, thus ensuring the client is
completely up-to-date.
HAClient
helps you to take advantage of this bookmark mechanism
through the BookmarkStore
interface and bookmarkSubscribe()
method on Client
. When you create subscriptions with
bookmarkSubscribe()
, whenever a disconnection or failover occurs,
your application automatically resubscribes to the message after the
last message it processed. HAClients
created by
createFileBacked()
additionally store these bookmarks on disk, so
that the application can restart with the appropriate message if the
client application fails and restarts.
To take advantage of bookmark subscriptions, do the following:
- Ensure the topic(s) to be subscribed are included in a transaction log. See the User Guide for information on how to specify the contents of a transaction log.
- Use
bookmarkSubscribe()
instead ofsubscribe()
when creating a subscription, and decide how the application will manage subscription identifiers (SubIds). - Use the
BookmarkStore.discard()
method in message handlers to indicate when a message has been fully processed by the application, that is, when the application does not need to receive the message again if the application fails over.
The following example creates a bookmark subscription against a transaction-logged topic, and fully processes each message as soon as it is delivered:
HAClient client = HAClient::createFileBacked("theClient",
"/logs/theClient.publishLog",
"/logs/theClient.subscribeLog");
namespace MyMessageHandler
{
public void invoke(const Message& message, void* data)
{
...
client.getBookmarkStore().discard(message);
...
}
}
std::string commandID = client.execute_async(Command("subscribe")
.setTopic("myTopic")
.setSubscriptionId("MySubId")
.setBookmark(AMPS::Client::BOOKMARK_RECENT()),
AMPS::MessageHandler(MyMessageHandler::invoke,(void*)(&client)));
Example 9.4: HAClient subscription
In this example, the client is a file-backed client, meaning that
arriving bookmarks will be stored in a file
(theClient.subscribeLog
). Storing these bookmarks in a file allows
the application to restart the subscription from the last message
processed, in the event of either server or client failure.
Tip
For optimum performance, it is critical to discard every
message once its processing is complete. If a message is
never discarded, it remains in the bookmark store. During
re-subscription, HAClient
always restarts the
bookmark subscription with the oldest undiscarded
message, and then filters out any more recent messages
that have been discarded. If an old message remains in
the store, but is no longer important for the
application’s functioning, the client and the AMPS server
will incur unnecessary network, disk, and CPU activity.
In Example 9.4
all parameters after the bookmark are optional. However, all options before — and including the
bookmark — are required when creating a bookmarkSubscribe()
.
The last parameter, subId
, specifies an identifier to be used for
this subscription. Passing NULL
causes HAClient
to generate one
and return it, like most other Client
functions. However, if you
wish to resume a subscription from a previous point after the
application has terminated and restarted, the application must pass the
same subscription ID as during its previous run. Passing a different
subscription ID bypasses any recovery mechanisms, creating an entirely
new subscription. When you use an existing subscription ID, the
HAClient
locates the last-used bookmark for that subscription in the
local store, and attempts to re-subscribe from that point.
The subId
is also required to be unique when used within a single
client, but can be the same in different clients. Internally, AMPS
tracks subscriptions in each client, thus each identifier for each
subscription within a client must be unique. The same subId
can be
reused across unique clients simultaneously without causing problems.
Client::BOOKMARK_NOW()
specifies that the subscription should begin from the moment the server receives the subscription request. This results in the same messages being delivered as if you had invokedsubscribe()
instead, except that the messages will be accompanied by bookmarks. This is also the behavior that results if you supply an invalid bookmark.Client::BOOKMARK_EPOCH()
specifies that the subscription should begin from the beginning of the AMPS transaction log (that is, the first entry in the oldest journal file for the transaction log).Client::BOOKMARK_RECENT()
specifies that the subscription should begin from the last-used message in the associatedBookmarkStore
, or, if this subscription has not been seen before, to begin withEPOCH
. This is the most common value for this parameter, and is the value used in the preceding example. By usingBOOKMARK_RECENT
, the application automatically resumes from wherever the subscription left off, taking into account any messages that have already been processed and discarded.
When the HAClient
re-subscribes after a disconnection and
reconnection, it always uses BOOKMARK_RECENT
, ensuring that the
continued subscription always begins from the last message discarded
before the disconnect, so that no messages are missed.
Conclusion¶
With only a few changes, most AMPS applications can take advantage of
the HAClient
and associated classes to become more highly-available
and resilient. Using the PublishStore
, publishers can ensure that
every message published has actually been persisted by AMPS. Using
BookmarkStore
, subscribers can make sure that there are no gaps or
duplicates in the messages received. HAClient
makes both kinds of
applications more resilient to network and server outages and temporary
issues, and, by using the file-based HAClient
, clients can recover
their state after an unexpected termination or crash. Though
HAClient
provides useful defaults for the Store
,
BookmarkStore
, SubscriptionManager
, and ServerChooser
, you
can customize any or all of these to the specific needs of your
application and architecture.