5. Error Handling

In every distributed system, the robustness of your application depends on its ability to recover gracefully from unexpected events. The AMPS client provides the building blocks necessary to ensure your application can recover from the kinds of errors and special events that may occur when using AMPS.

Exceptions

Generally speaking, when an error occurs that prohibits an operation from succeeding, AMPS will throw an exception. AMPS exceptions universally derive from AMPS::AMPSException, so by catching AMPSException, you will be sure to catch anything AMPS throws. For example:

...
void ReadAndEvaluate(Client& client)
{
    /* read a new payload from the user */
    string payload;
    getline(cin, payload);

    /* write a new message to AMPS */
    if (!payload.empty()) {
        try
        {
            client.publish("UserMessage",
            string("{ \"message\" : \"data\" }");
        }
        catch (const AMPSException& exception)
        {
            cerr << "An AMPS exception occurred: "<< exception.toString() << endl;
        }
    }
}

In this example, if an error occurs the program writes the error to stderr, and the publish() command fails. However, client is still usable for continued publishing and subscribing. When the error occurs, the exception is written to the console, converting the exception to a string via the toString() method.

AMPS exception types vary based on the nature of the error that occurs. In your program, if you would like to handle certain kinds of errors differently than others, you can catch the appropriate subclass of AMPSException to detect those specific errors and do something different.

string CreateNewSubscription(Client& client)
{
    string id;
    string topicName;

    while (id.empty()) {
        topicName = AskUserForTopicName();

        try
        {
            /* If an error occurs when setting up the subscription whether or not to try again
             * based on the subclass of AMPSException that is thrown. If a
             * BadRegexTopicException, this exception is thrown during subscription to indicate
             * that a bad regular expression was supplied, so we would like to give the user a
             * chance to correct.
             */
            id = client.subscribe(bind(HandleMessage,
            placeholders::_1),
            topicName, 5000);
        }
        catch(const BadRegexTopicException& ex)
        {
            /* This line indicates that the program catches the BadRegexTopicException
             * exception and displays a specific error to the user indicating the topic name or
             * expression was invalid. By not returning from the function in this catch block,
             * the while loop runs again and the user is asked for another topic name.
             * we’ll ask the user for another topic
             */
            DisplayError("Error: bad topic name or regular " +
                         "expression ’" + topicName +"’. " +
                         "The error was: " + ex.toString());
        }

        /* If an AMPS exception of a type other than BadRegexTopicException is thrown by
         * AMPS, it is caught here. In that case, the program emits a different error
         * message to the user.
         */
        catch(const AMPSException& ex)
        {
            DisplayError("Error: error setting up subscription " +
                         "to topic " + topicName +
                         ". The error was: " + ex.toString());

        /* At this point the code stops attempting to subscribe to the client by the return
         * NULL statement.
         */
            return NULL; // give up
        }
    }
    return id;
}

Example 5.1: Catching AMPSException Subclasses

Exception Types

Each method in AMPS documents the kinds of exceptions that it can throw. For reference, Table 1.A: contains a list of all of the exception types you may encounter while using AMPS, when they occur, and what they mean.

Exception Handling and Asynchronous Message Processing

When using asynchronous message processing, exceptions thrown form the message handler are silently absorbed by the AMPS C++ client by default. The AMPS C++ client allows you to register an exception listener to detect and respond to these exceptions. When an exception listener is registered, AMPS will call the exception listener with the exception. See Chapter 5 for details.

Disconnect Handling

Every distributed system will experience occasional disconnections between one or more nodes. The reliability of the overall system depends on an application’s ability to efficiently detect and recover from these disconnections. Using the AMPS C/C++ client’s disconnect handling, you can build powerful applications that are resilient in the face of connection failures and spurious disconnects.

The HAClient class, included with the AMPS C++ client, contains a disconnect handler and other features for building highly-available applications. The HAClient includes features for managing a list of failover servers, resuming subscriptions, republishing in-flight messages, and other functionality that is commonly needed for high availability. 60East recommends using the HAClient for automatic reconnection wherever possible, as the HAClient disconnect handler has been carefully crafted to handle a wide variety of edge cases and potential failures. This section covers the use of a custom disconnect handler in the event that the behavior of the HAClient does not suit the needs of your application.

Custom disconnect handling gives you the ultimate in control and flexibility regarding how to respond to disconnects. Your application gets to specify exactly what happens when a disconnect occurs by supplying a function to client.setDisconnectHandler(), which is invoked whenever a disconnect occurs.

Example 5.2 shows the basics:

class MyApp
{
    string _uri;
    Client _client;

public:
    MyApp(const string& uri) : _uri(uri), _client("myapp")
    {
        _uri = uri;

        /* setDisconnectHandler() method is called to supply a function for use when AMPS
         * detects a disconnect. At any time, this function may be called by AMPS to
         * indicate that the client has disconnected from the server, and to allow your
         * application to choose what to do about it. The application continues on to
         * connect and subscribe to the orders topic.
         */
        _client.setDisconnectHandler(AttemptReconnection, (void*) this);
        _client.connect(uri);
        _client.logon();
        _client.execute_async(Command("subscribe")
                                .setTopic("orders"),
                              bind(&MyApp::ShowMessage,
                                    this,
                                    placeholders::_1));
    }
    void ShowMessage(const Message& m)
    {
        /* display order data to the user */
        ...
    }
    /* Our disconnect handler’s implementation begins here. In this example, we simply
     * try to reconnect to the original server. A more robust reconnect would have
     * logic to limit either the total number of connects, frequency of connects or
     * both. Errors are likely to occur here, therefore we must have disconnected for a
     * reason, but Client takes care of catching errors from our disconnect handler. If
     * an error occurs in our attempt to reconnect and an exception is thrown by
     * connect(), then Client will catch it and absorb it, passing it to the
     * ExceptionListener if registered. If the client is not connected by the time the
     * disconnect handler returns, AMPS throws DisconnectedException.
     */
    void AttemptReconnection(Client& client, void* userdata)
    {
        MyApp* app = (MyApp*) userdata;
        /* simple: just try to reconnect once. */
        client.connect(app->_uri);
        client.logon();
    }
}

Example 5.2: Supplying a disconnect handler

By creating a more advanced disconnect handler, you can implement logic to make your application even more robust. For example, imagine you have a group of AMPS servers configured for high availability—you could implement fail-over by simply trying the next server in the list until one is found. Example 5.3 shows a brief example.

class MyApp
{

    /* Here our application is configured with a vector of AMPS server URIs to choose
     * from, instead of a single URI. These will be used in the ConnectToNextUri()
     * method as explained below.
     */
    vector<string>& _uris;
    int _currentUri;
    Client _client;

public:
    MyApp(vector<string>& uris) : _uris(uris), _currentUri(0), _client("MyApp")
    {
        /* ConnectToNextUri() is invoked by our disconnect handler TestDisconnectHandler in
         * the AMPS Client when a disconnect occurs. Since our client is currently
         * disconnected, we manually invoke our disconnect handler to initiate the first
         * connection.
         */
        _client.setDisconnectHandler(&ConnectToNextUri, this);
        ConnectToNextUri(this);
    }
    static void ConnectToNextUri(Client client, void* me)
    {
        MyApp* app = (MyApp*)me;

        /* During a disconnect the AMPS Client invokes ConnectToNextUri(), which loops
         * around our array of URIs attempting to connect to each one until successful. In
         * the invoke() method it attempts to connect to the current URI, and if it is
         * successful, returns immediately. If the connection attempt fails, the exception
         * handler for AMPSException is invoked. In the exception handler, we advance to
         * the next URI, display a warning message, and continue around the loop. This
         * simplistic handler never gives up, but in a typical implementation, you would
         * likely stop attempting to reconnect at some point.
         */
        while(true) {
            try
            {
                client.connect(app->_uris[app->_currentUri]);
                /* At this point the client registers a subscription to the server we have
                 * connected to. It is important to note that, once a new server is connected, it
                 * is the responsibility of the application to re-establish any subscriptions
                 * placed previously. This behavior provides an important benefit to your
                 * application: one reason for disconnect is due to a client’s inability to keep up
                 * with the rate of message flow. In a more advanced disconnect handler, you could
                 * choose to not re-establish subscriptions that are the cause of your
                 * application’s demise.
                 */
                client.subscribe(...);
                return;
            }
            catch(AMPSException& e)
            {
                app->_currentUri = (app->_currentUri + 1) % app->_uris.size();
            }
        }
    }
}

Example 5.3: Simple client failover implementation

Using a Heartbeat to Detect Disconnection

The AMPS client includes a heartbeat feature to help applications detect disconnection from the server within a predictiable amount of time. Without using a heartbeat, an application must rely on the operating system to notify the application when a disconnect occurs. For applications that are simply receiving messages, it can be impossible to tell whether a socket is disconnected or whether there are simply no incoming messages for the client.

When you set a heartbeat, the AMPS client sends a heartbeat message to the AMPS server at a regular interval, and waits a specified amount of time for the response. If the operating system reports an error on send, or if the server does not respond within the specified amount of time, the AMPS client considers the server to be disconnected.

The AMPS client processes heartbeat messages on the client receive thread, which is the thread used for asynchronous message processing. If your application uses asynchronous message processing and occupies the thread for longer than the heartbeat interval, the client may fail to respond to heartbeat messages in a timely manner and may be disconnected by the server.

Unexpected Messages

The AMPS C++ client handles most incoming messages and takes appropriate action. Some messages are unexpected or occur only in very rare circumstances. The AMPS C++ client provides a way for clients to process these messages. Rather than providing handlers for all of these unusual events, AMPS provides a single handler function for messages that can’t be handled during normal processing.

Your application registers this handler by setting the UnhandledMessageHandler for the client. This handler is called when the client receives a message that can’t be processed by any other handler. This is a rare event, and typically indicates an unexpected condition.

For example, if a client publishes a message that AMPS cannot parse, AMPS returns a failure acknowledgment. This is an unexpected event, so AMPS does not include an explicit handler for this event, and failure acknowledgments are received in the method registered as the UnhandledMessageHandler.

Your application is responsible for taking any corrective action needed. For example, if a message publication fails, your application can decide to republish the message, publish a compensating message, log the error, stop publication altogether, or any other action that is appropriate.

Unhandled Exceptions

In the AMPS C++ client, exceptions can occur that are not thrown to the main thread of the application. For example, when an exception is thrown from a message handler running on a background thread, AMPS does not automatically propagate that exception to the main thread.

Instead, AMPS provides the exception to an unhandled exception handler if one is specified on the client. The unhandled exception handler receives a reference to the exception object, and takes whatever action is necessary. Typically, this involves logging the exception or setting an error flag that the main thread can act on. Notice that AMPS C++ client only catches exceptions that derive from std::exception. If your message handler contains code that can throw exceptions that do not derive from std::exception, 60East recommends catching these exceptions and throwing an equivalent exception that derives from std::exception.

For example, the unhandled exception handler below takes a std::ostream, and logs information from each exception to that std::ostream.

class ExceptionLogger : public AMPS::ExceptionListener
{
private:
    std::ostream& os_;

public:

    ExceptionLogger() : os_(std::cout) {}
    ExceptionLogger(std::ostream& os) : os_(os) {}

    virtual void exceptionThrown(const std::exception& e) const
    {
       os_ << e.what() << std::endl;
    }
}

Detecting Write Failures

The publish methods in the C++ client deliver the message to be published to AMPS and then return immediately, without waiting for AMPS to return an acknowledgment. Likewise, the sowDelete methods request deletion of SOW messages, and return before AMPS processes the message and performs the deletion. This approach provides high performance for operations that are unlikely to fail in production. However, this means that the methods return before AMPS has processed the command, without the ability to return an error in the event that the command fails.

The AMPS C++ client provides a FailedWriteHandler that is called when the client receives an acknowledgment that indicates a failure to persist data within AMPS. To use this functionality, you implement the FailedWriteHandler interface, construct an instance of your new class, and register that instance with the setFailedWriteHandler() function on the client. When an acknowledgment returns that indicates a failed write, AMPS calls the registered handler method with information from the acknowledgment message, supplemented with information from the client publish store if one is available. Your client can log this information, present an error to the user, or take whatever action is appropriate for the failure.

When no FailedWriteHandler is registered, acknowledgments that indicate errors in persisting data are treated as unexpected messages and routed to the LastChanceMessageHandler. In this case, AMPS provides only the acknowledgment message and does not provide the additional information from the client publish store.

Monitoring Connection State

The AMPS client interface provides the ability to set one or more connection state listeners. A connection state listener is a callback that is invoked when the AMPS client detects a change to the connection state.

The AMPS client provides the following state values for a connection state listener:

State Indicates
Connected

The client has established a connection to AMPS. If you are using a Client, this is delivered when connect() is successful.

If you are using an HAClient, this state indicates that the connect part of the connect and logon process has completed. An HAClient using the default disconnect handler will attempt to log on immediately after delivering this state.

Most applications that use Client will attempt to log on immediately after the call to connect() returns.

An application should not submit commands to AMPS from the connection state listener while the client is in this state unless the application knows that the state has been delivered from a Client and that the Client does not call logon().

LoggedOn

The client has successfully logged on to AMPS. If you are using a Client, this is delivered when logon() is successful.

If you are using an HAClient, this state indicates that the logon part of the connect and logon process has completed.

This state is delivered after the client is logged on, but before recovery of client state is complete. Recovery will continue after delivering this state: the application should not submit commands to AMPS from the connection state listener while the client is in this state if further recovery will take place.

HeartbeatInitiated

The client has successfully started heartbeat monitoring with AMPS. This state is delivered if the application has enabled heartbeating on the client.

This state is delivered before recovery of the client state is complete. Recovery may continue after this state is delivered. The application should not submit commands to AMPS from the connection state listener until the client is completely recovered.

PublishReplayed

Delivered when a client has completed replay of the publish store when recovering after connecting to AMPS.

This state is delivered when the client has a PublishStore configured.

If the client has a subscription manager set, (which is the default for an HAClient), the application should not submit commands from from the connection state listener until the Resubscribed state is received.

Resubscribed

Delivered when a client has re-entered subscriptions when recovering after connecting to AMPS.

This state is delivered when the client has a subscription manager set (which is the default for an HAClient). This is the final recovery step. An application can submit commands to AMPS from the connection state listener after receiving this state.

Disconnected The client is not connected. For an HAClient, this means that the client will attempt to reconnect to AMPS. For a Client, this means that the client will invoke the disconnect handler, if one is specified.
Shutdown The client is shut down. For an HAClient, this means that the client will no longer attempt to reconnect to AMPS. This state is delivered when close() is called on the client or when a server chooser tells the HAClient to stop reconnecting to AMPS.

Table 5.1: ConnectionStateListener values

The enumeration provided for the connection state listener also includes a value of UNKNOWN, for use as a default or to represent additional states in a custom Client implementation. The 60East implementations of the client do not deliver this state.

The following table shows examples of the set of states that will be delivered during connection, in order, depending on what features of the client are set. Notice that, for an instance of the Client class, this table assumes that the application calls both connect() and logon(). For an HAClient, this table assumes that the HAClient is using the default DisconnectHandler for the HAClient.

Configuration States

subscription manager

publish store

Connected

LoggedOn

PublishReplayed

Resubscribed

subscription manager

publish store

heartbeat set

Connected

LoggedOn

HeartbeatInitiated

PublishReplayed

Resubscribed

subscription manager

Connected

LoggedOn

Resubscribed

subscription manager

heartbeat set

Connected

LoggedOn

HeartbeatInitiated

Resubscribed

(default Client
configuration)

Connected

LoggedOn

Table 5.2: Sequence of states for connection