Reliable Publishing

A reliable publisher must ensure that it actually succeeds in publishing the messages that it intends to publish. Before talking about ways to ensure publication, we will review a few concepts that are covered in more detail in the AMPS User’s Guide.

Publishing Acknowledgment

In a non-HA publish, a publisher simply sends a publish message to an AMPS server; the AMPS server does not send an acknowledgment (“ACK”) back to the client. This mode of publishing allows for maximum performance, but with no guarantees. What happens if the AMPS server does not receive your message? What if the AMPS server goes down before it can persist the message or send it on to subscribers?

To ensure that messages are seen by AMPS and do not need to be republished, a publisher can supply a sequence number on published messages. The publisher’s sequence numbers uniquely identify each message and enable the publisher to request a persisted ACK from the server, indicating that AMPS has both received the message and persisted it in the transaction log. Sequence numbers are monotonically increasing integers, and must begin with a number greater than the last value used by the client. Publishers are responsible for determining and maintaining this sequence. If a disconnect occurs before the publisher receives the server’s acknowledgment, the publisher must re-publish all of the unacknowledged messages on the new connection.
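The numbering rule above can be sketched in a few lines of Java. `SequenceNumberer`, `resumeAfter`, and `next` are hypothetical names invented for this illustration, not part of the AMPS Client Library; the sketch assumes the publisher learns the server's last-seen sequence number when it logs on.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical helper (not part of the AMPS Client Library) that hands out
// strictly increasing sequence numbers for published messages.
class SequenceNumberer {
    // Last sequence number handed out or reported by the server.
    private final AtomicLong last = new AtomicLong(0);

    // Call after logon with the last sequence number the server reports.
    // Taking the maximum means the counter never moves backwards.
    void resumeAfter(long serverLastSeen) {
        last.accumulateAndGet(serverLastSeen, Math::max);
    }

    // Returns the sequence number to stamp on the next published message.
    long next() {
        return last.incrementAndGet();
    }
}
```

Because `resumeAfter` keeps the larger of the two values, a lower number from the server never rewinds the counter, so each new message receives a number greater than any the publisher has already used.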

If an application needs reliable publishing and publish acknowledgments, that application should add a PublishStore to the HAClient. When a PublishStore is present, it manages sequence numbers and adds acknowledgment requests to published messages.

  Once a publisher begins supplying sequence numbers on published messages, it must continue to do so for the remainder of its lifetime.

To maximize performance in this mode of publishing, AMPS does not send a separate ACK for each message published. Instead, AMPS periodically sends the publisher an ACK message containing the last sequence number that has been persisted; the client can safely discard all stored messages with sequence numbers less than or equal to this value.
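As a rough sketch of how a publisher might act on these periodic ACKs, the classes below keep unacknowledged messages in sequence order and drop everything up to the acknowledged value. `UnackedMessage`, `AckBuffer`, and `onPersistedAck` are hypothetical names used only for illustration. Since sequence numbers only increase, an `ArrayDeque` is sufficient: the messages covered by an ACK are always at the front.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical record of a message awaiting a persisted ACK.
class UnackedMessage {
    final long sequence;
    final String topic;
    final String data;

    UnackedMessage(long sequence, String topic, String data) {
        this.sequence = sequence;
        this.topic = topic;
        this.data = data;
    }
}

// Hypothetical in-memory buffer. Messages are appended in sequence order,
// so every message covered by an ACK sits at the front of the deque.
class AckBuffer {
    private final Deque<UnackedMessage> pending = new ArrayDeque<>();

    void add(UnackedMessage message) {
        pending.addLast(message);
    }

    // Frees every message whose sequence number is <= the acknowledged value.
    void onPersistedAck(long ackedSequence) {
        while (!pending.isEmpty() && pending.peekFirst().sequence <= ackedSequence) {
            pending.removeFirst();
        }
    }

    int size() {
        return pending.size();
    }
}
```

A single ACK for sequence 3 releases messages 1 through 3 at once, and receiving the same ACK again leaves the buffer unchanged.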

  During publisher recovery, AMPS will always return the last sequence number it has acknowledged in the previous connection, which is likely lower than the highest sequence number it has seen from that publisher. As a result, AMPS is likely to consider the first few replayed messages as duplicates.

In support of this mechanism, when a publisher logs on, AMPS informs it of the last sequence number it has seen from that publisher. For example, if a publisher has published messages 1, 2, 3, and 10 and then disconnected, AMPS will tell that publisher (upon a subsequent connection) that the sequence number is 10, so that the publisher can continue numbering from that value. Likewise, if upon reconnection the server informs the publisher that the most recent sequence number observed is 2, then the publisher is responsible for re-publishing 3 and 10 to the server.

The HAClient will automatically handle this when a PublishStore is present.
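Using the numbers from the example above, the replay decision at logon reduces to a range query over the stored messages. This is a minimal sketch that assumes the publisher keeps its unacknowledged messages in a sorted map keyed by sequence number; `ReplayPlanner` and `toReplay` are hypothetical names, not HAClient internals.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;

// Hypothetical logon-time replay decision (not HAClient internals).
class ReplayPlanner {
    // Returns, in ascending order, the sequence numbers of stored messages
    // that the server has not seen, i.e. those strictly greater than the
    // value the server reported at logon.
    static List<Long> toReplay(NavigableMap<Long, String> stored, long serverLastSeen) {
        return new ArrayList<>(stored.tailMap(serverLastSeen, false).keySet());
    }
}
```

With messages 1, 2, 3, and 10 stored, `toReplay(stored, 2)` yields `[3, 10]`, and `toReplay(stored, 10)` yields an empty list, matching the two cases described above.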

Recovering from Disconnect or Server Failure

As noted above, AMPS expects a highly available publisher to be able to republish any messages for which AMPS has not yet sent an ACK. To fulfill this expectation, publishers must either store or be able to re-create previously published messages. To implement such a mechanism with reasonable performance, most publishers will need to follow these recommendations:

  1. Keep an in-memory store of the contents of messages and their corresponding sequence numbers. Just before sending a message to the server, the publisher must calculate a new sequence number and store it along with the data and topic. This code path runs once per sent message, so it must be as fast as possible.
  2. When a persisted ACK arrives from the server, free up any space held in this store by messages whose sequence number is less than or equal to the one sent in the ACK. This code path also runs frequently, though typically less often than once per message (a ratio of 1:10 to 1:100 is expected, but AMPS provides no guarantees).
  3. When logging on to the server (either in the case of an initial connection or a re-connection), a publisher should use the sequence number returned by AMPS to calculate a starting point for the sequence numbers on new messages. AMPS will immediately discard messages sent with sequence numbers less than or equal to the one supplied by the server on logon, considering them to be duplicates of previously received messages.
  4. When recovering from a disconnection, if the server returns a sequence number lower than that of the most recent message you published, you must re-publish each of the messages that the server has not yet seen; otherwise, they are lost forever.
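Step 3 implies that replaying a message the server already has is harmless. The toy model below illustrates that server-side rule in isolation; it is not AMPS code, and `DuplicateFilter` is a hypothetical name. It is constructed with the sequence number reported at logon and, as an extra assumption of this sketch, raises its threshold as new messages are accepted.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of the duplicate rule in step 3 (not AMPS code). The filter starts
// from the sequence number reported at logon; as an assumption of this sketch,
// it also advances that threshold as new messages are accepted.
class DuplicateFilter {
    private long lastSeen;
    private final List<Long> accepted = new ArrayList<>();

    DuplicateFilter(long reportedAtLogon) {
        this.lastSeen = reportedAtLogon;
    }

    // Returns true if the message is accepted, false if it is discarded
    // as a duplicate of a message already received.
    boolean receive(long sequence) {
        if (sequence <= lastSeen) {
            return false;
        }
        lastSeen = sequence;
        accepted.add(sequence);
        return true;
    }

    List<Long> accepted() {
        return accepted;
    }
}
```

With a logon value of 2, a resent message 2 is rejected, 3 and 10 are accepted, and a second copy of 3 is rejected, so the accepted list is `[3, 10]`.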

If you build your publisher on the AMPS Client Library, the PublishStore implementations it provides already include all of this functionality:

import com.crankuptheamps.client.DefaultServerChooser;
import com.crankuptheamps.client.HAClient;

// Creates a memory-backed HA client that tracks
// bookmarks and subscriptions in memory.

HAClient myClient = HAClient.createMemoryBacked("myClient");


// Add your instances to the client.

DefaultServerChooser chooser = new DefaultServerChooser();
chooser.add("tcp://ubuntu.local:9007/amps/fix");
chooser.add("tcp://backupampsinstance:9007/amps/fix");

myClient.setServerChooser(chooser);
myClient.connectAndLogon();

// These messages are all stored in memory
// while we await an ACK from AMPS.

for (int i = 0; i < 100; i++)
{
  myClient.publish("someTopic",
                   String.format("12345=%d", i));
}

In this example, each of the 100 published messages is stored in memory until the AMPS client receives acknowledgment from the server that it has received and persisted the message. If a disconnect and reconnect occur, the AMPS client automatically re-publishes any messages that need to be re-published, based on the rules above. The AMPS client also calculates and manages the sequence numbers for published messages.

Safe Shutdown

When implementing a short-lived publisher, it is important not to shut down until you are sure that the AMPS server has acknowledged all of the messages you have published. If the publisher shuts down before this, it risks message loss: for example, the AMPS server might also shut down, and messages that it has not yet received or persisted may be lost for good.

Before shutting down, a well-behaved publisher examines the last ACK received from AMPS to verify that it equals the sequence number of the last published message. If it does not, the publisher should simply wait; assuming the connection does not fail, AMPS will eventually send the final ACK, and you can proceed with shutting down. If you choose to implement a timeout mechanism, ensure that after the timeout expires the publisher attempts to reconnect, checks the sequence number returned in the logon, and replays any messages that the server has not yet seen.
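One way to combine the wait with the optional timeout described above is a polling helper like the one below. `ShutdownWaiter` and `awaitAllAcked` are hypothetical names; the `LongSupplier` is assumed to report the current number of unacknowledged messages (for example, a publish store's unpersisted count). Sleeping briefly between polls avoids keeping a CPU core busy while waiting.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

// Hypothetical shutdown helper: waits until the unacknowledged count reaches
// zero or the timeout expires.
class ShutdownWaiter {
    // Returns true if every message was acknowledged, false on timeout.
    static boolean awaitAllAcked(LongSupplier unpersistedCount, long timeoutMillis)
            throws InterruptedException {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        while (unpersistedCount.getAsLong() > 0) {
            if (System.nanoTime() - deadline >= 0) {
                return false; // caller should reconnect, check the logon sequence, and replay
            }
            Thread.sleep(10); // back off briefly instead of spinning
        }
        return true;
    }
}
```

A `false` result means the timeout expired with messages still unacknowledged; following the guidance above, the publisher should then reconnect and replay rather than exit.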

To aid in this aspect of publisher implementation, the AMPS Client Library provides a facility to inquire how many unpersisted (unacknowledged) messages remain. You can choose to wait for this value to become 0 before exiting:

for(int i = 0; i < 100; i++)
{
  myClient.publish("someTopic", String.format("12345=%d", i));
}


// Just wait to hear back from the server.

while(myClient.getPublishStore().unpersistedCount() > 0)
{
  Thread.yield();
}

myClient.disconnect();

An alternate approach is to use the publishFlush method to wait for all messages to be acknowledged by the server.

for(int i = 0; i < 100; i++)
{
  myClient.publish("someTopic", String.format("12345=%d", i));
}


// Wait for all messages to be acknowledged by the server.
myClient.publishFlush();

myClient.disconnect();

Untimely Publisher Death

Your publisher may terminate before you expect it to. For example, another user might kill your process, your process might terminate with a fatal error, or the operating system might kill your process because the system runs out of resources. In such scenarios, your publisher may exit before it has received acknowledgment of all of the messages it attempted to publish. Combined with a server failure at roughly the same time, this could result in lost messages.

To guard against losing messages when a publisher goes down, a publisher must persist messages outside its process, typically to disk. If a publisher exits, a new process can start, log on, and compare the sequence number returned by the server to the messages stored on disk. The publisher replays any messages that were stored with a sequence number higher than the one returned by the AMPS server.

Publishers that write their messages to disk before publishing have greater reliability, but lower performance. Efficient use of disk resources, in terms of both overall footprint and IOPS, is critical to implementing a high-performance publish store. If you also want to protect against host or operating system failure, the publisher must flush buffers to disk as well, which lowers performance further.
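To make the disk-backed approach concrete, the sketch below appends each message to a file before it would be sent and, after a restart, returns the entries whose sequence numbers exceed the value the server reports. `FileJournal` is a hypothetical class, and its tab-separated, one-line-per-message format is an assumption of this example, not the format used by the AMPS `PublishStore`; it also assumes topics contain no tabs and data contains no newlines. The `sync` flag opens the file with `StandardOpenOption.SYNC`, trading throughput for protection against host or operating system failure.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.List;

// Hypothetical append-only journal (NOT the AMPS PublishStore format).
// Each message is one line: sequence<TAB>topic<TAB>data. Assumes topics
// contain no tabs and data contains no newlines.
class FileJournal {
    private final Path path;

    FileJournal(Path path) {
        this.path = path;
    }

    // Appends a message before it is sent. With sync=true, SYNC requires every
    // update to the file's content or metadata to reach the storage device.
    void append(long sequence, String topic, String data, boolean sync) throws IOException {
        String line = sequence + "\t" + topic + "\t" + data + "\n";
        OpenOption[] options = sync
                ? new OpenOption[] {StandardOpenOption.CREATE, StandardOpenOption.APPEND, StandardOpenOption.SYNC}
                : new OpenOption[] {StandardOpenOption.CREATE, StandardOpenOption.APPEND};
        Files.write(path, line.getBytes(StandardCharsets.UTF_8), options);
    }

    // After a restart and logon: entries the server has not seen, in file order.
    List<String[]> unseenBy(long serverLastSeen) throws IOException {
        List<String[]> replay = new ArrayList<>();
        if (!Files.exists(path)) {
            return replay;
        }
        for (String line : Files.readAllLines(path, StandardCharsets.UTF_8)) {
            String[] fields = line.split("\t", 3); // sequence, topic, data
            if (fields.length == 3 && Long.parseLong(fields[0]) > serverLastSeen) {
                replay.add(fields);
            }
        }
        return replay;
    }
}
```

For brevity, the sketch reopens the file on every append and never reclaims space when ACKs arrive. A store intended for real use would keep the file open, batch writes, and discard acknowledged entries, which is where the footprint and IOPS concerns above come into play.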

The AMPS Client Library includes a default, extensible implementation of a disk-backed publish store. When you set the PublishStore for the HAClient to an instance of this class, the client will persist the messages to disk.

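import com.crankuptheamps.client.HAClient;
import com.crankuptheamps.client.PublishStore;

// Creates an HA client whose publish store is backed
// by the file ./myClient.publish on disk.

HAClient myClient = new HAClient("myClient");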
myClient.setPublishStore(new PublishStore("./myClient.publish"));

// ... set up and install ServerChooser ...

myClient.connectAndLogon();

for (int i = 0; i < 100; i++)
{
  myClient.publish("someTopic",
                   String.format("12345=%d", i));
}

In this example, the publisher writes each of the 100 messages to the myClient.publish file on disk before sending them to the server. If the publisher process terminates before the server can acknowledge the published messages, then the data of those messages remains on disk. When the publisher restarts, constructing the PublishStore will reload the myClient.publish file. When the publisher successfully logs back on to a server, the HAClient will automatically compare the sequence number returned by AMPS as the last persisted message from that publisher to the sequence numbers recovered from the publish store file. The HAClient will automatically republish any messages that the server did not receive.