5. Error Handling¶
In every distributed system, the robustness of your application depends on its ability to recover gracefully from unexpected events. The AMPS client provides the building blocks necessary to ensure your application can recover from the kinds of errors and special events that may occur when using AMPS.
Exceptions¶
Generally speaking, when an error occurs that prohibits an operation
from succeeding, AMPS will throw an exception. AMPS exceptions
universally derive from AMPS.AMPSException
, so by catching
AMPSException
, you will be sure to catch anything AMPS throws, for
example:
def read_and_evaluate(client):
# read a new payload from the user
payload = input("Please enter a message")
# write a new message to AMPS
if payload:
try:
client.publish(
"UserMessage",
"{ \"message\" : \"%s\" }" % payload
)
except AMPS.AMPSException as e:
sys.stderr.write("An AMPS exception" + "occurred: %s" % str(e))
Example 5.1: Catching an AMPS Exception
In this example, if an error occurs the program writes the error to
stderr
, and the publish()
command fails. However, client
is
still usable for continued publishing and subscribing. When the error
occurs, the exception is written to the console. As with most Python
exceptions, str()
will convert the exception into a string that
includes a descriptive message.
AMPS exception types vary based on the nature of the error that occurs.
In your program, if you would like to handle certain kinds of errors
differently than others, you can handle the appropriate subclass of
AMPSException
to detect those specific errors and do something
different.
def create_new_subscription(client):
messageStream = None
topicName = None
while messageStream is None:
# attempts to retrieve a topic name (or regular expression) from the user.
topicName = input("Please enter a topic name")
try:
# If an error occurs when setting up the subscription, the program decides whether
# or not to try again based on the subclass of AMPSException that is thrown. In
# this case, if the exception is a BadRegexTopicError, the exception indicates
# that the user provided a bad regular expression. We would like to give the user
# a chance to correct, so we ask the user for a new topic name.
messageStream = client.subscribe(
topicName,
None
)
# This line indicates that the program catches the BadRegexTopicError exception
# and displays a specific error to the user indicating the topic name or
# expression was invalid. By not returning from the function in this except block,
# the while loop runs again and the user is asked for another topic name.
except BadRegexTopicError as e:
print(
"Error: bad topic name or regular expression " +
topicName +
". The exception was " +
str(e) +
"."
)
# we'll ask the user for another topic
# If an AMPS exception of a type other than BadRegexTopicError is thrown by AMPS,
# it is caught here. In that case, the program emits a different error message to
# the user.
except AMPSException as e:
print (
"Error: error setting up subscription to topic" +
topicName +
". The exception was " +
str(e) +
"."
)
# At this point the code stops attempting to subscribe to the client by the return
# None statement.
return None
return messageStream
Example 5.2: Handling AMPSException Subclasses
Exception Types¶
Each method in AMPS documents the kinds of exceptions that are thrown from it. For reference, table 1A contains a list of all of the exception types you may encounter while using AMPS, when they occur, and what they mean.
Exception Handling and Asynchronous Message Processing¶
When using asynchronous message processing, exceptions thrown from the message handler are silently absorbed by the AMPS Python client by default. The AMPS Python client allows you to register an exception listener to detect and respond to these exceptions. When an exception listener is registered, AMPS will call the exception listener with the exception. See Example 5.6
for details.
Controlling Blocking with Command Timeout¶
The named convenience methods and the Command
class provide a
timeout
setting that specifies how long the command should wait
to receive a processed
acknowledgment from AMPS. This can be helpful
in cases where it is important for the caller to limit the amount of time
to block waiting for AMPS to acknowledge the command. If the AMPS client
does not receive the processed acknowledgment within the specified
time, the client sends an unsubscribe
command to the server to
cancel the command and throws an exception.
Acknowledgments from AMPS are processed by the client receive thread on the same socket as data from AMPS. This means that any other data previously returned (such as the results of a large query) must be consumed before the acknowledgment can be processed. An application that submits a set of SOW queries in rapid succession should set a timeout that takes into account the amount of time required to process the results of the previous query.
Disconnect Handling¶
Every distributed system will experience occasional disconnections between one or more nodes. The reliability of the overall system depends on an application’s ability to efficiently detect and recover from these disconnections. Using the AMPS Python client’s disconnect handling, you can build powerful applications that are resilient in the face of connection failures and spurious disconnects. For additional reliability, you can also use the high availability client (discussed in the following sections), which provides both disconnect handling and features to help ensure that messages are reliably delivered.
The HAClient
class, included with the AMPS Python client, contains a
disconnect handler and other features for building highly-available
applications. The HAClient
includes features for managing a list of
failover servers, resuming subscriptions, republishing in-flight
messages, and other functionality that is commonly needed for high
availability. This section covers the use of a custom disconnect handler
in the event that the behavior of the HAClient
does not suit the
needs of your application.
AMPS disconnect handling gives you the ultimate in control and
flexibility regarding how to respond to disconnects. Your application
gets to specify exactly what happens when an exception occurs by
supplying a function to Client.set_disconnect_handler()
which is
invoked whenever a disconnect occurs.
Example 5.3 shows the basics:
class MyApp:
def __init__(self, _uri):
self.uri = _uri
self.client = None
self.client = AMPS.Client(...)
# set_disconnect_handler() method is called to supply a function for use when AMPS
# detects a disconnect. At any time, this function may be called by AMPS to
# indicate that the client has disconnected from the server, and to allow your
# application to choose what to do about it. The application continues on to
# connect and subscribe to the orders topic.
self.client.set_disconnect_handler(self.attempt_reconnection)
self.client.connect(self.uri)
self.client.logon()
# display order data to the user
def showMessage(self,m):
# Our disconnect handler’s implementation begins here. In this example, we simply
# try to reconnect to the original server after a 5 second pause. Errors are
# likely to occur here, therefore we must have disconnected for a reason, but
# Client takes care of catching errors from our disconnect handler. If an error
# occurs in our attempt to reconnect and an exception is thrown by connect(),
# Client will catch it and absorb it, passing it to the ExceptionListener if
# registered. If the client is not connected by the time the disconnect handler
# returns, AMPS throws DisconnectedException.
def attempt_reconnection(self, client):
# simple: just sleep and reconnect
time.sleep(5)
self.client.connect(self.uri)
self.client.logon()
Example 5.3: Supplying a Disconnect Handler
By creating a more advanced disconnect handler, you can implement logic
to make your application even more robust. For example, imagine you have
a group of AMPS servers configured for high availability – you could
implement fail-over by simply trying the next server in the list until
one is found (a simplified version of the behavior provided by
the DefaultServerChooser
with the HAClient
).
Example 5.4 shows a brief example.
class MyApp:
def __init__(self, uris):
self._currentUri = 0
# Here our application is configured with an array of AMPS server URIs to choose
# from, instead of a single URI. Our client is constructed and configured with.
# These will be used in the connect_to_next_uri() method as explained below.
self.uris = uris
self.client = Client(...)
self.client.set_disconnect_handler(self.connect_to_next_uri)
# connect_to_next_uri() is invoked by our disconnect handler. Since our client is
# currently disconnected, we manually invoke our disconnect handler to initiate
# the first connection.
self.connect_to_next_uri(self.client)
def connect_to_next_uri(self,client):
# In our disconnect handler, we invoke connect_to_next_uri(), which loops around
# our array of URIs attempting to connect to each one. In the invoke() method it
# attempts to connect to the current URI, and if it is successful, returns
# immediately. If the connection attempt fails, the exception handler for
# AMPSException is invoked. In the exception handler, we advance to the next URI,
# display a warning message, and continue around the loop. This simplistic handler
# never gives up, but in a typical implementation, you would likely stop
# attempting to reconnect at some point.
while true:
try:
client.connect(self.uris[self._currentUri])
client.logon()
# At this point the client registers a subscription to the server we have
# connected to. It is important to note that, once a new server is connected, it
# the responsibility of the application to re-establish any subscriptions placed
# previously. This behavior provides an important benefit to your application: one
# reason for disconnect is due to a client’s inability to keep up with the rate of
# message flow. In a more advanced disconnect handler, you could choose to not re-
# establish subscriptions that are the cause of your application's demise.
client.subscribe(
on_message_handler,
"orders",
None,
timeout=5000
)
return
except AMPSException as e:
self._currentUri = (self._currentUri + 1) % len(self.uris)
print "failed: %s. Failing over to %s" % (str(e),
self.uris[self._currentUri])
Example 5.4: Simple Client Failover Implementation
Using a Heartbeat to Detect Disconnection¶
The AMPS client includes a heartbeat feature to help applications detect disconnection from the server within a predictable amount of time. Without using a heartbeat, an application must rely on the operating system to notify the application when a disconnect occurs. For applications that are simply receiving messages, it can be impossible to tell whether a socket is disconnected or whether there are simply no incoming messages for the client.
When you set a heartbeat, the AMPS client sends a heartbeat message to the AMPS server at a regular interval, and waits a specified amount of time for the response. If the operating system reports an error on send, or if the server does not respond within the specified amount of time, the AMPS client considers the server to be disconnected.
The AMPS client processes heartbeat messages on the client receive thread, which is the thread used for asynchronous message processing. If your application uses asynchronous message processing and occupies the thread for longer than the heartbeat interval, the client may fail to respond to heartbeat messages in a timely manner and may be disconnected by the server.
Unexpected Messages¶
The AMPS Python client handles most incoming messages and takes appropriate action. Some messages are unexpected or occur only in very rare circumstances. The AMPS Python client provides a way for clients to process these messages. Rather than providing handlers for all of these unusual events, AMPS provides a single handler function for messages that can’t be handled during normal processing.
Your application registers this handler by setting the
last_chance_message_handler
for the client. This handler is called
when the client receives a message that can’t be processed by any other
handler. This is a rare event, and typically indicates an unexpected
condition.
For example, if a client publishes a message that AMPS cannot parse,
AMPS returns a failure acknowledgment. This is an unexpected event, so
AMPS does not include an explicit handler for this event, and failure
acknowledgments are received in the method registered as the
last_chance_message_handler
.
Your application is responsible for taking any corrective action needed. For example, if a message publication fails, your application can decide to republish the message, publish a compensating message, log the error, stop publication altogether, or any other action that is appropriate.
Unhandled Exceptions¶
When using the asynchronous interface, exceptions can occur that are not thrown to the user. For example, when an exception occurs in the process of reading subscription data from the AMPS server, the exception occurs on a thread inside of the AMPS Python Client. Consider the following example using the asynchronous interface:
class MyApp:
def on_message_handler(self,message):
print message.get_data()
def wait_to_be_poked(self,client):
client.subscribe(
self.on_message_handler,
"pokes",
"/Pokee LIKE %s" % getpass.getuser(),
timeout=5000)
f = input("Press a key to exit")
Example 5.5: Where do Exceptions go?
In this example, we set up a subscription to wait for messages on the pokes topic, whose Pokee tag begins with our user name. When messages arrive, we print a message out to the console, but otherwise our application waits for a key to be pressed.
Inside of the AMPS client, the client creates a new thread of execution that reads data from the server, and invokes message handlers and disconnect handlers when those events occur. When exceptions occur inside this thread, however, there is no caller for them to be thrown to, and by default they are ignored.
In applications that use the asynchronous interface, and where it is
important to deal with every issue that occurs in using AMPS, you can
set an ExceptionHandler
via Client.set_exception_listener()
that
receives these otherwise-unhandled exceptions. Making the modifications
shown in
Example 5.6
to our previous example will allow those exceptions to be caught and handled.
In this case we are simply printing those caught exceptions out to the
console.
Tip
In some cases, the AMPS Python client may wrap exceptions of unknown type into a
AMPSException
. Your exception listener should always include a except block
for AMPSException
.
class MyApp:
def on_exception(self, e):
print "Exception occurred: %s" % str(e)
def on_message_handler(self,message):
print message.get_data()
def wait_to_be_poked(self, client):
client.set_exception_listener(self.on_exception)
# Use the advanced interface to be able to
# accept input while processing messages.
client.subscribe(
self.on_message_handler,
"pokes",
"/Pokee LIKE %s" % getpass.getuser(),
timeout=5000)
f = input("Press a key to exit")
Example 5.6: Exception Listener
In this example we have added a call to
client.set_exception_listener()
, registering a simple function that
writes the text of the exception out to the console. If exceptions are
thrown in the message handler, those exceptions are written to the
console.
AMPS records the stack trace and provides the stack trace to the exception handler, if the provided method includes a parameter for the stack trace. The sample below demonstrates one way to do this. (For sample purposes, the message handler always throws an exception.)
import AMPS
import time
import traceback
def handler(message):
print message
raise RuntimeError, "in my handler"
def exception_listener(exception, tb):
print "EXCEPTION RECEIVED", exception
if tb is not None:
traceback.print_tb(tb)
client = AMPS.Client("client")
client.set_exception_listener(exception_listener)
client.connect("tcp://localhost:9007/amps")
client.logon()
client.subscribe(handler,"topic")
client.publish("topic","data")
time.sleep(1)
client.close()
Example 5.7: AMPS stack trace
Detecting Write Failures¶
The publish
methods in the Python client deliver the
message to be published to AMPS then return immediately, without waiting
for AMPS to return an acknowledgment. Likewise, the sow_delete
methods request deletion of SOW messages, and return before AMPS
processes the message and performs the deletion. This approach provides
high performance for operations that are unlikely to fail in production.
However, this means that the methods return before AMPS has processed
the command, without the ability to return an error in the event the
command fails.
The AMPS Python client provides a failed_write_handler
that is
called when the client receives an acknowledgment that indicates a
failure to persist data within AMPS. As with the
last_chance_message_handler
described in
Unexpected Messages Section,
your application registers a handler for this function. When an acknowledgment returns that
indicates a failed write, AMPS calls the registered handler method with
information from the acknowledgment message, supplemented with
information from the client publish store if one is available. Your
client can log this information, present an error to the user, or take
whatever action is appropriate for the failure.
If your application needs to know whether publishes succeeded and are durably persisted, the following approach is recommended:
- Set a
PublishStore
on the client. This will ensure that messages are retransmitted if the client becomes disconnected before the message is acknowledged and requestpersisted
acknowledgments for messages. - Install a
failed_write_handler
. In the event that AMPS reports an error for a given message, that event will be reported to thefailed_write_handler
. - Call
publish_flush()
and verify that all messages are persisted before the application exits.
When no failed_write_handler
is registered, acknowledgments that
indicate errors in persisting data are treated as unexpected messages
and routed to the last_chance_message_handler
. In this case, AMPS
provides only the acknowledgment message and does not provide the
additional information from the client publish store.
Monitoring Connection State¶
The AMPS client interface provides the ability to set one or more connection state listeners. A connection state listener is a callback that is invoked when the AMPS client detects a change to the connection state.
The AMPS client provides the following state values for a connection state listener:
State | Indicates |
---|---|
Connected | The client has established a connection to
AMPS. If you are using a If you are using an Most applications that use An application should not submit commands to
AMPS from the connection state listener
while the client is in this state unless
the application knows that the state has been
delivered from a |
LoggedOn | The client has successfully logged on to
AMPS. If you are using a If you are using an This state is delivered after the client is logged on, but before recovery of client state is complete. Recovery will continue after delivering this state: the application should not submit commands to AMPS from the connection state listener while the client is in this state if further recovery will take place. |
HeartbeatInitiated | The client has successfully started heartbeat monitoring with AMPS. This state is delivered if the application has enabled heartbeating on the client. This state is delivered before recovery of the client state is complete. Recovery may continue after this state is delivered. The application should not submit commands to AMPS from the connection state listener until the client is completely recovered. |
PublishReplayed | Delivered when a client has completed replay of the publish store when recovering after connecting to AMPS. This state is delivered when the client has a PublishStore configured. If the client has a subscription manager set,
(which is the default for an |
Resubscribed | Delivered when a client has re-entered subscriptions when recovering after connecting to AMPS. This state is delivered when the client has a
subscription manager set (which is the default
for an |
Disconnected | The client is not connected. For an
HAClient , this means that the client will
attempt to reconnect to AMPS. For a Client ,
this means that the client will invoke the
disconnect handler, if one is specified. |
Shutdown | The client is shut down. For an HAClient ,
this means that the client will no longer
attempt to reconnect to AMPS. This state is
delivered when close() is called on the
client or when a server chooser tells the
HAClient to stop reconnecting to AMPS. |
Table 5.1: ConnectionStateListener values
The enumeration provided for the connection state listener also includes
a value of UNKNOWN
, for use as a default or to represent additional
states in a custom Client
implementation. The 60East implementations
of the client do not deliver this state.
The following table shows examples of the set of states that will be delivered
during connection, in order, depending on what features
of the client are set. Notice that, for an instance of the Client
class,
this table assumes that the application calls both connect()
and
logon()
. For an HAClient
, this table assumes that the HAClient
is
using the default DisconnectHandler
for the HAClient
.
Configuration | States |
---|---|
subscription manager publish store |
Connected LoggedOn PublishReplayed Resubscribed |
subscription manager publish store heartbeat set |
Connected LoggedOn HeartbeatInitiated PublishReplayed Resubscribed |
subscription manager | Connected LoggedOn Resubscribed |
subscription manager heartbeat set |
Connected LoggedOn HeartbeatInitiated Resubscribed |
|
Connected LoggedOn |
Table 5.2: Sequence of states for connection