10. Transport Filter Modules

What’s A Transport Filter Module?

A transport filter module provides the ability to process and rewrite incoming commands, including sow commands, subscribe commands, publish commands, and logon commands.

Transport filters can be useful when a system needs to rewrite commands before they are processed by AMPS. For example, a system may be in the process of renaming a topic, and may want any publish or subscribe to the old topic name to be interpreted as though the new topic was specified. Likewise, a system may want to enforce constraints on valid messages that cannot be expressed with a entitlement publish filter.

When a transport filter is specified for a transport, AMPS calls the transport filter after the message is received and the header is parsed, but before any other action is taken on the message. A transport filter can change the data provided for a command and can change or add many of the headers on a command before AMPS acts on the command. A transport filter can also direct AMPS to stop processing the command and return a failure acknowledgment to the client that submitted the command.

A transport filter cannot change the header format, the credentials for the connection, or the message type of the command.

AMPS processes the modified command. For example, if the transport filter changes the topic of a subscribe command, AMPS enters the subscription using the modified topic rather than the original topic. Likewise, if a transport filter changes the data provided with a publish command, the modified data is provided to subscribers, stored in the transaction log and the State-of-the-World, used to calculate differences for a delta subscribe, and so on.

When to Implement a Transport Filter Module

Implement a transport filter module when:

  • Your system needs to manipulate commands at the point at which AMPS receives the command
  • The changes you need to make to the command aren’t provided by another mechanism (for example, preprocessing and enrichment)
  • The changes you need to make are not more easily made by the publisher or subscriber

For these scenarios, transport filters may be the best option. 60East recommends carefully considering whether a more straightforward solution exists before implementing a transport filter.

For example, your system may require that the correlation id provided with a publish command conform to a specific format. In this case, a transport filter could validate that format and direct AMPS to reject publish commands that do not have that format.

Transport Filter Context

AMPS creates a transport filter context for each Transport the module is used for. When initializing the transport, AMPS calls the amps_transport_filter_create_context function to create the context. This context will be provided to calls to the module for that transport.

Your module implements the amps_transport_filter_create_context function to create a transport filter context. The function has this signature:

amps_transport_filter_context amps_transport_filter_create_context(amps_module_options options);

The transport filter context is a pointer to data that your module defines. The AMPS server does not process or interpret this data. The server receives the pointer from your module when the module is initialized, and then provides the pointer back to your module for each incoming command to be filtered.

If your module is unable to successfully create the context, you return a NULL context and use the amps_logger provided to your module to log information about the problem.

Notice that you cast the pointer to amps_transport_filter_context before you return it, and cast the pointer back to the type that your application uses after AMPS returns the pointer to you.

AMPS passes configuration options to modules as a pointer to an array of amps_option structs. AMPS indicates the end of the array with an option that has a NULL key. Your module processes options until it reaches an option with a NULL key. If no options are specified in the configuration file, the first amps_option in the array will have a NULL key.

The following snippet shows one way to process options:

for (; options->key != NULL; ++options) {
    // process the key/value pair here

}

When AMPS exits, AMPS calls amps_transport_filter_destroy_context with each of the transport filter contexts created. This gives your module an opportunity to do any resource management, cleanup or logging necessary before AMPS shuts down. The function has this signature:

int amps_transport_filter_destroy_context(amps_transport_filter_context context);

Implementing a Transport Filter

Transport filter modules implement a single function for processing incoming commands:

Function  
amps_transport_filter_execute_incoming Filter an incoming command. Called after AMPS successfully parses the message headers, but before any other processing takes place.

Table 10.1: Transport filter function

To implement a transport filter module, your module implements this method.

When AMPS runs the action, the AMPS server calls the following method:

int amps_transport_filter_execute_incoming(amps_transport_filter_context* context,
                                           amps_message_object message);

The transport filter context provided is the pointer returned by the call to amps_transport_filter_create_context() for the transport for which the filter is being run.

Following is a simple, minimal implementation of amps_transport_filter_execute_incoming:

extern "C"
int amps_transport_filter_execute_incoming(amps_transport_filter_context* context,
                                           amps_message_object message)
{

    // Constant message for output.
    static const char* messageText = "{\"message\":\"I am Groot!\"}";
    static const size_t messageLength = strlen(messageText);

    // Information about incoming message, to be used to determine
    // if the filter should alter the message.
    const char* messageType = amps_get_message_type();
    struct amps_cstring data;

    // Ignore filter if message type isn't json.
    if (strncmp(messageType, "json", 4) !=  0)
    {
        return AMPS_SUCCESS;
    }

    // Ignore filter if there is no data in the command,
    // otherwise Groot-ify the message.
    amps_message_get_data(message, &data);
    if (data.length > 0)
    {
      amps_message_set_data(message,messageText,messageLength);
    }

    return AMPS_SUCCESS;
}

This simple implementation takes any incoming command that contains data and converts the data to the string {"message":"I am Groot!"}.

For your module, you would substitute code that implements whatever filtering you need to perform. If the filter succeeds and the command should be accepted, return AMPS_SUCCESS. If the filter fails, or the filter does not want AMPS to process the command, use the logger provided in amps_module_init to log information about the failure and return AMPS_FAILURE.

Determining the Message Type And Connection Information

Within a transport filter, AMPS provides a function to determine the message type of the connection that submitted the command. For transport filters that manipulate message data, this is useful for determining the output format that AMPS expects.

This function has the following signature:

const char* amps_get_message_type(void);

This function returns a pointer to a NULL-terminated string that contains the message type name for the client connection that submitted the command.

Notice that AMPS does not parse or validate the message data before providing the message to the content filter, so AMPS cannot guarantee that a given command contains valid data for the message type.

Transport filters also have access to other connection information, including the following functions:

Function  
amps_get_client_correlation_id Returns the correlation ID for the submitted command.
amps_get_client_name Returns the client name for the connection on which the command arrived.
amps_get_connection_name Returns the connection name for the connection on which the command arrived.
amps_get_message_type Returns the message type of the connection on which the command arrived.
amps_get_remote_address Returns the remote address for the connection on which the command arrived.

Table 10.2: Transport filter context functions

Working With The Message

The AMPS server SDK provides a set of functions for retrieving and setting headers on the message.

The following headers are available through transport filters:

  • CommandType
  • Topic
  • SowKey
  • CorrelationId
  • Bookmark
  • Filter
  • Options

The Data part of a message is also available to a transport filter.

Notice that a transport filter cannot change the message type of a connection, change the client name, or change the credentials of the connection. Further, the transport filter is not allowed to change fields that the client relies on for correct behavior, such as the subscription ID, the query ID, or the sequence number for a publish, delta_publish, or sow_delete command.

Refusing a Command

To refuse a command provided to a transport filter, return AMPS_FAILURE from the amps_transport_filter_execute_incoming function. AMPS will stop processing the message, and if the client has requested an acknowledgment, AMPS will return a failure acknowledgment indicating that the transport filter has refused the message.

To refuse a command and indicate that AMPS should disconnect the client, return AMPS_FATAL from the amps_transport_filter_execute_incoming function. Providing this return value immediately closes the connection and discards any messages currently en route to the client.

Client Interaction and Transport Filters

Other than the case where a transport filter refuses a command, transport filters are not visible to client applications. Use caution when changing command parameters outside of the control of the client.

For example, while it is possible to use a transport filter to change a subscribe command to a sow_and_subscribe command, the fact that the command has been altered before AMPS processes it will not be obvious to the application that submitted the command. In this case, a subscriber may not be prepared for the sow, group_begin and group_end messages returned by a sow_and_subscribe, since none of these messages are returned by a subscribe. The application may ignore these messages, missing data, or may even interpret the results as a critical error and exit.

Likewise, it is possible to use a transport filter to add the oof option to commands like sow_and_subscribe. As with the previous example, though, there is no way for the transport filter to guarantee that the application is expecting oof messages and will process them correctly.

In general, 60East recommends against using transport filters to correct or modify client behavior.

Event Logging and Transport Filters

When trace logging is active, AMPS typically logs the original message headers and data rather than the message as modified by the transport filter. If your filter modifies the command, and it is important to capture the modifications in the event log, the filter itself should log the changes made.