8. AMPS Embedded Client

The AMPS external API includes an embedded client for working with AMPS. The embedded client provides basic functions and is similar to the C client API. It is intended for situations where a small amount of application logic needs to run in the AMPS process. In general, application functionality is better implemented in an external process, and 60East recommends using the embedded client only in cases where an external process cannot be used. 60East also recommends limiting the amount of processing done within the server: unlike an external process, an embedded client uses the same processing and memory resources as the AMPS server, consuming resources that the server could otherwise use.

Header Files

The definitions for the embedded client API are included in the utility API header file:

sdk/include/amps_api.h

The header file contains API documentation for each of the functions. Short descriptions of the most commonly used functions appear below, with an overview discussion of how to use the functions.

Embedded Client

To use the embedded client, you:

  1. Create a client handle. This is equivalent to creating a client, connecting the client to AMPS, and logging on to AMPS from the client libraries.
  2. Construct a command. This is equivalent to constructing a command in the client libraries. As with the client libraries, you construct the command and then set the arguments for the command.
  3. Execute the command. This is equivalent to executing the command in the client libraries.

AMPS embedded client handles and command handles do not store thread-specific state, so they can be used from any thread. However, the handles do not provide internal synchronization. If a client handle or command handle will be used from multiple concurrent threads, your module must provide the appropriate synchronization.

Embedded clients rely on the AMPS server infrastructure having been initialized. Creating an embedded client during module initialization is not guaranteed to succeed, and may cause the server to exit.

When your module is finished with the embedded client, typically when the module context is destroyed, you must release the client.

The AMPS Server SDK includes a sample action that uses the embedded client to receive and process messages.

AMPS provides the following functions for working with embedded clients.

Function Description
amps_client_create Initializes the provided amps_client_object handle. Notice that, unlike the client-side interface, this method requires the message_handler that will be used for subscriptions and SOW queries for this client.

amps_client_destroy Releases the provided amps_client_object and frees the associated resources. You call this method when the module is done with the client, typically at context destruction.
amps_client_get_sequence_number Returns the current sequence number for the provided amps_client_object.

Table 8.1: Client Functions

Since the client is embedded in the AMPS server, there are features of the AMPS client libraries that do not apply. For example, since the embedded client publishes messages to AMPS directly, the client does not provide a publish store for store-and-forward functionality.

Embedded Commands

AMPS provides the following functions for working with embedded commands. As with the embedded client, the embedded interface is similar to the interface provided in the AMPS client libraries. One important difference with embedded commands is that an embedded command references a client object, and is used only with that client.

Function Description
amps_command_create Initializes the provided amps_command_object handle to be a command for the provided amps_client_object of the specified command type.
amps_command_destroy Releases the provided amps_command_object and frees the associated resources. You call this method when the module is done with the command, typically at context destruction.
amps_command_execute Executes the provided amps_command_object. Notice that this method does not require an amps_client_object. The client used when constructing the command is the client used by this method.
amps_command_clear Clears the arguments for the provided amps_command_object. This method resets the command object to the state it was in immediately after the call to amps_command_create.
amps_command_set_data Sets the data (the message payload) for this command. To clear the data, pass a NULL pointer and a length of zero.
amps_command_set_topic Set the topic for this command. To clear the topic, pass a NULL pointer and a length of zero.
amps_command_set_filter Set the filter for this command. To clear the filter, pass a NULL pointer and a length of zero.
amps_command_set_correlation_id Set the correlation ID for this command. To clear the correlation ID, pass a NULL pointer and a length of zero.
amps_command_set_options Set the options for this command. To clear the options, pass a NULL pointer and a length of zero.
amps_command_set_ack_type Set the ack type for this command. To clear the ack type, pass a NULL pointer and a length of zero.
amps_command_set_auth_id Set the authenticated user ID for this command. To clear the authenticated ID, pass a NULL pointer and a length of zero.

Table 8.2: Command Functions

Embedded Message Handlers

When receiving messages from AMPS, your module must provide a message handler function with the following signature:

int message_handler(amps_message_object message, void* user_data);

The user_data passed to the handler is the data provided when the subscription was registered, and is typically the module context or some other object associated with the state of the subscription.

Embedded message handlers are called by the AMPS message processing thread. Your module must be able to quickly process the message: time spent in the message handler may reduce throughput or increase latency for the AMPS instance. If extensive processing is required, 60East recommends copying the message and dispatching the copied message to a dedicated processing thread managed by the module.
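The copy-and-dispatch approach described above can be sketched as follows. This is a minimal illustration rather than code from the AMPS SDK: message_queue, queued_message, and the three functions are hypothetical names, and the sketch assumes the handler has already obtained the payload as a pointer and a length. The handler side copies the bytes and returns promptly; a worker thread owned by the module would call message_queue_try_pop and do the expensive work.

```c
#include <assert.h>
#include <pthread.h>
#include <stdlib.h>
#include <string.h>

#define QUEUE_CAPACITY 64

/* Hypothetical module-owned copy of a message payload. */
typedef struct {
    char*  data;
    size_t length;
} queued_message;

/* Hypothetical fixed-size ring buffer shared by handler and worker. */
typedef struct {
    pthread_mutex_t lock;
    queued_message  items[QUEUE_CAPACITY];
    size_t          head;   /* index of the next item to dequeue */
    size_t          count;  /* number of items currently queued  */
} message_queue;

void message_queue_init(message_queue* q)
{
    pthread_mutex_init(&q->lock, NULL);
    q->head = 0;
    q->count = 0;
}

/* Called from the message handler: copies the payload and returns
 * quickly. Returns 1 on success, 0 if the queue is full or the
 * allocation fails. */
int message_queue_push_copy(message_queue* q, const char* data, size_t length)
{
    assert(q != NULL && (data != NULL || length == 0));
    char* copy = malloc(length ? length : 1);
    if (copy == NULL) return 0;
    if (length) memcpy(copy, data, length);

    int pushed = 0;
    pthread_mutex_lock(&q->lock);
    if (q->count < QUEUE_CAPACITY) {
        size_t tail = (q->head + q->count) % QUEUE_CAPACITY;
        q->items[tail].data = copy;
        q->items[tail].length = length;
        q->count++;
        pushed = 1;
    }
    pthread_mutex_unlock(&q->lock);

    if (!pushed) free(copy);
    return pushed;
}

/* Called from the worker thread: takes ownership of the oldest copy.
 * Returns 1 and fills *out if an item was available, 0 otherwise.
 * The caller must free(out->data). */
int message_queue_try_pop(message_queue* q, queued_message* out)
{
    int popped = 0;
    pthread_mutex_lock(&q->lock);
    if (q->count > 0) {
        *out = q->items[q->head];
        q->head = (q->head + 1) % QUEUE_CAPACITY;
        q->count--;
        popped = 1;
    }
    pthread_mutex_unlock(&q->lock);
    return popped;
}
```

A bounded queue is used here so that a slow worker cannot consume unbounded memory inside the server process; what to do when the queue is full (drop, count, or log) is a decision for the module.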

An embedded message handler must be re-entrant, and may be called simultaneously from multiple threads. The message passed to the handler is guaranteed to be safe to read within the call to the message handler, but is not guaranteed to have a lifetime beyond the message handler. If you need to store data from the message for use later, you must copy the data. The message itself and the data that the message points to must be treated as immutable. Your message handler must not modify the data provided to it: for efficiency, AMPS provides a pointer to the internal data it holds for the message. Modifying this data would produce unpredictable results, which could include corrupt messages or crashes in AMPS.

The user_data passed to the handler is not guaranteed to be in use by only one thread at a time. If the user_data can be modified during use, you will need to include synchronization on the user_data provided to a subscription.
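One way to provide that synchronization is to keep a mutex inside the object passed as user_data. The sketch below is illustrative only: module_context and its helper functions are hypothetical, the amps_message_object typedef is a stand-in so that the example compiles without the SDK header, and the return value of 0 is an assumption, since this section does not describe what the handler's return value means.

```c
#include <assert.h>
#include <pthread.h>
#include <stddef.h>

/* Stand-in for the real type declared in amps_api.h, so that this
 * sketch compiles on its own. In a module, include the SDK header
 * instead of declaring this. */
typedef void* amps_message_object;

/* Hypothetical module context passed as user_data. */
typedef struct {
    pthread_mutex_t lock;
    unsigned long   messages_seen;
} module_context;

void module_context_init(module_context* ctx)
{
    pthread_mutex_init(&ctx->lock, NULL);
    ctx->messages_seen = 0;
}

/* Uses the handler signature given in this section. */
int message_handler(amps_message_object message, void* user_data)
{
    (void)message;  /* the message is not inspected in this sketch */
    module_context* ctx = (module_context*)user_data;
    assert(ctx != NULL);

    /* Hold the lock only for the shared-state update. */
    pthread_mutex_lock(&ctx->lock);
    ctx->messages_seen++;
    pthread_mutex_unlock(&ctx->lock);

    return 0;  /* assumption: the meaning of the return value is not shown here */
}

unsigned long module_context_messages_seen(module_context* ctx)
{
    pthread_mutex_lock(&ctx->lock);
    unsigned long n = ctx->messages_seen;
    pthread_mutex_unlock(&ctx->lock);
    return n;
}
```

Keeping the critical section this short matters because the handler runs on AMPS message processing threads; any time spent waiting on the lock is time taken from the server.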

Likewise, a message_handler should not rely on thread-local storage for storing state that must be preserved across calls to the handler, since AMPS will call the message handler from a variety of threads. Thread-local storage may be a good option for resources that are effectively stateless but should not be shared across threads (for example, parser handles).
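As an illustration of a resource that suits thread-local storage, the sketch below keeps one reusable scratch buffer per thread. It assumes a C11 compiler with _Thread_local support; scratch_space and scratch_for_this_thread are hypothetical names, and the same pattern could hold a per-thread parser handle instead of a buffer.

```c
#include <assert.h>
#include <stdlib.h>

/* Hypothetical per-thread resource; in a module this might wrap a
 * parser handle instead of a plain buffer. */
typedef struct {
    char*  buffer;
    size_t capacity;
} scratch_space;

/* One instance per thread (C11 _Thread_local). */
static _Thread_local scratch_space tls_scratch = { NULL, 0 };

/* Returns this thread's scratch buffer, growing it if needed.
 * Returns NULL if allocation fails. The contents are NOT state that
 * carries over between handler calls; only the allocation is reused. */
char* scratch_for_this_thread(size_t needed)
{
    assert(needed > 0);
    if (tls_scratch.capacity < needed) {
        char* grown = realloc(tls_scratch.buffer, needed);
        if (grown == NULL) return NULL;
        tls_scratch.buffer = grown;
        tls_scratch.capacity = needed;
    }
    return tls_scratch.buffer;
}
```

Each thread that calls the handler gets its own buffer without locking. This sketch does not free the buffer when a thread exits; a module that needs that cleanup would have to arrange it separately.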

Embedded Messages

AMPS provides the following functions for working with embedded messages. For a description of the fields and how they are used, see the AMPS Command Reference. The values returned by the message are valid for the lifetime of the callback to which AMPS provides the message. After the callback returns, AMPS may reuse the memory that holds the values. If you need a value to be available after the callback returns, you must copy the value.

Function Description
amps_message_get_command_type Retrieve the command type for this message.
amps_message_get_topic Retrieve the topic for this message.
amps_message_get_is_replication Retrieve a value indicating whether the message was received via replication.
amps_message_get_is_local Retrieve a value indicating whether the message was received via a publish directly to this instance (rather than received via replication).
amps_message_get_sow_key Retrieve the SowKey for this message, if one is present.
amps_message_get_correlation_id Retrieve the CorrelationId for this message, if one is present.
amps_message_get_bookmark Retrieve the bookmark for this message, if one is present.
amps_message_get_status Retrieve the status field for this message, if one is present.
amps_message_get_data Retrieve the data field for this message, if one is present.
amps_message_get_previous_data Retrieve the previous data for this message, if present.
amps_message_get_ack_type Retrieve the acknowledgment type for this message, if present.
amps_message_get_client_name Retrieve the client name for this message, if present.
amps_message_get_publisher_name Retrieve the publisher name for this message, if present.

Table 8.3: Embedded Message Functions

AMPS includes an embedded API for parsing messages and retrieving specified values. These functions rely on the message type parsing functions: they are only available for message types that provide parsing. These functions return the parsed values provided by the message type. As mentioned earlier, data returned from the message must be treated as immutable by your module. Modifying data in a message handler will produce unpredictable results.

As with the client handles and command handles, the embedded message parser does not store thread-specific data, so it can be used from any thread. However, the parser contains internal state and does not provide implicit synchronization. If a parser will be used from multiple concurrent threads, your module must provide the appropriate synchronization, or provide a distinct parser handle for each thread.

The embedded parser returns values as expression values. Your code treats these values as opaque pointers, and uses functions from the AMPS utility API to extract information from these expression values. When values are returned by the parser, they are pointers into the internal state of the parser and the parsed message. These values will be invalid if the parser parses another message, or if the message data goes out of scope. See Working with Expressions for more information on expression values.

Function Description
amps_message_parser_create Initializes the provided amps_message_parser handle with a message parser.
amps_message_parser_destroy Releases the provided amps_message_parser and deallocates the underlying resources.
amps_message_parser_register_filter Registers a filter with the parser. To determine whether a message matches a filter, you first register the filter, then parse the message, then retrieve the match results. You can register any number of filters with the parser.
amps_message_parser_register_xpath Registers an XPath identifier with the parser. To retrieve a specific value from a message, you first register an XPath identifier, then parse the message, then extract the value. You can register any number of XPath identifiers with the parser.
amps_message_parser_extract Extract the values from the parsed message and populate a provided list of values for the registered XPath identifiers. The parser fills in the values in the order in which the identifiers were registered.
amps_message_parser_evaluate_filter Return the results of evaluating the registered filters against the message, populating a provided array of int. The parser fills in the results in the order in which the filters were registered.
amps_message_parser_parse Parse the message, using the registered XPath identifiers to locate elements within the message and evaluating the registered filters against the message.

Table 8.4: Message Parsing Functions

For example, the snippet below shows how to create, use, and destroy an embedded message parser:

// Assume we have messageData extracted from a
// received message.

// Retrieve the hash for the message type the
// parser will parse.
amps_hash_value msgTypeHash = NULL;
amps_hash_message_type(&msgTypeHash, "json", strlen("json"));

// Initialize the parser
// Notice that the msgTypeHash declares this as
// a parser for JSON.

amps_message_parser parser = NULL;
amps_message_parser_create(&parser, msgTypeHash);

// Register a filter
amps_message_parser_register_filter(parser,
             "/processThis = 'true'", 21);

// Register the paths of interest
amps_message_parser_register_xpath(parser, "/id", 3);
amps_message_parser_register_xpath(parser, "/greeting", 9);

// Parse the message. The parser stores the parsed state
// of the message.
amps_message_parser_parse(parser, messageData.data, messageData.length);

// Determine whether the filter matches
int filters[1];
amps_message_parser_evaluate_filter(parser, filters);

// filters has the result of whether the filter matches.
// If that result is false, return.
if( filters[0] == 0 )
{
    amps_message_parser_destroy(parser);
    return;
}

// Retrieve the values
amps_expression_value values[2];
amps_message_parser_extract(parser, values);

// Work with the values
assert(amps_expression_value_as_long(values[0]) == 42);

const char *greeting = NULL;
size_t      greetingLength = 0;

amps_expression_value_as_string(values[1], &greeting, &greetingLength);

// Use the greeting: the buffer it points to is valid only
// until the parser parses another message or the message
// data goes out of scope. Copy the greeting if it's
// necessary to save it.

// Destroy the parser
amps_message_parser_destroy(parser);
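
If the greeting needs to outlive the parser and the message, it has to be copied before either goes away. The helper below is a minimal sketch of such a copy, assuming the value is available as a pointer and a length (as with greeting and greetingLength in the snippet above). copy_value is a hypothetical helper written for this example and is not part of the AMPS API.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical helper: returns a NUL-terminated heap copy of a
 * pointer-and-length value, or NULL if allocation fails. The caller
 * owns the result and must free() it. */
char* copy_value(const char* data, size_t length)
{
    assert(data != NULL || length == 0);
    char* copy = malloc(length + 1);
    if (copy == NULL) return NULL;
    if (length > 0) memcpy(copy, data, length);
    copy[length] = '\0';
    return copy;
}
```

In the snippet above, a call such as copy_value(greeting, greetingLength) would need to be made before amps_message_parser_destroy, and the module would free the copy when it is no longer needed.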