8. AMPS Embedded Client¶
The AMPS external API includes an embedded client for working with AMPS. The embedded client provides basic functions and is similar to the C client API. The embedded client can be used for situations where a small amount of application logic needs to run in the AMPS process. In general, application functionality is better suited to being implemented in an external process. 60East recommends using the embedded client in cases where an external process cannot be used. 60East recommends limiting the amount of processing done within the server, since (unlike an external process), an internal client uses the same processing and memory resources that AMPS uses, consuming resources that could be used for the AMPS server.
Header Files¶
The definitions for the embedded client API are included in the utility API header file:
sdk/include/amps_api.h
The header file contains API documentation for each of the functions. Short descriptions of the most commonly used functions appear below, with an overview discussion of how to use the functions.
Embedded client¶
To use the embedded client, you:
- Create a client handle. This is equivalent to creating a client, connecting the client to AMPS, and logging on to AMPS from the client libraries.
- Construct a command. This is equivalent to constructing a command in the client libraries. As with the client libraries, you construct the command and then set the arguments for the command.
- Execute the command. This is equivalent to executing the command in the client libraries.
AMPS embedded client handles and command handles do not store thread-specific state, so they can be used from any thread. However, the handles do not provide internal synchronization. If a client handle or command handle will be used from multiple concurrent threads, your module must provide the appropriate synchronization.
Embedded clients rely on the AMPS server infrastructure having been initialized. Creating an embedded client during module initialization is not guaranteed to succeed, and may cause the server to exit.
When your module is finished with the embedded client, typically when the module context is destroyed, you must release the client.
The AMPS Server SDK includes a sample action that uses the embedded client to receive and process messages.
AMPS provides the following functions for working with embedded clients.
Function | Description |
---|---|
amps_client_create |
Initializes the provided
Notice that, unlike the client-side
interface, this method requires the
|
amps_client_destroy |
Releases the provided
amps_client_object and frees the
associated resources. You call this
method when the module is done with the
client, typically at context
destruction. |
amps_client_get_sequence_number |
Returns the current sequence number for
the provided amps_client_object . |
Table 8.1: Client Functions
Since the client is embedded in the AMPS server, there are features of the AMPS client libraries that do not apply. For example, since the embedded client publishes messages to AMPS directly, the client does not provide a publish store for store-and-forward functionality.
Embedded Commands¶
AMPS provides the following functions for working with embedded commands. As with the embedded client, the embedded interface is similar to the interface provided in the AMPS client libraries. One important difference with embedded commands is that an embedded command references a client object, and is used only with that client.
Function | Description |
---|---|
amps_command_create |
Initializes the provided
amps_command_object handle to be a
command for the provided
amps_client_object of the specified
command type. |
amps_command_destroy |
Releases the provided
amps_command_object and frees the
associated resources. You call this
method when the module is done with the
command, typically at context
destruction. |
amps_command_execute |
Executes the provided
amps_command_object . Notice that
this method does not require an
amps_client_object . The client used
when constructing the command is the
client used by this method. |
amps_command_clear |
Clears the arguments for the provided
amps_command_object . This method
resets the command object to the state
it was in immediately after the
create command. |
amps_command_set_data |
Sets the data (the message payload) for this command. To clear the data, pass a NULL pointer and a length of zero. |
amps_command_set_topic |
Set the topic for this command. To clear the topic, pass a NULL pointer and a length of zero. |
amps_command_set_filter |
Set the filter for this command. To clear the filter, pass a NULL pointer and a length of zero. |
amps_command_set_correlation_id |
Set the correlation ID for this command. To clear the correlationID, pass a NULL pointer and a length of zero. |
amps_command_set_options |
Set the options for this command. To clear the options, pass a NULL pointer and a length of zero. |
amps_command_set_ack_type |
Set the ack type for this command. To clear the ack type, pass a NULL pointer and a length of zero. |
amps_command_set_auth_id |
Set the authenticated user ID for this command. To clear the authenticated ID, pass a NULL pointer and a length of zero. |
Table 8.2: Command Functions
Embedded Message Handlers¶
When receiving messages from AMPS, your module must provide a message handler function with the following signature:
int message_handler(amps_message_object message, void* user_data);
The user_data
returned is the data provided when the subscription
was registered, and is typically the module context or some other object
associated with the state of the subscription.
Embedded message handlers are called by the AMPS message processing thread. Your module must be able to quickly process the message: time spent in the message handler may reduce throughput or increase latency for the AMPS instance. If extensive processing is required, 60East recommends copying the message and dispatching the copied message to a dedicated processing thread managed by the module.
An embedded message handler must be re-entrant, and may be called simultaneously from multiple threads. The message returned is guaranteed to be safe to read within the call to the message handler, but is not guaranteed to have a lifetime beyond the message handler. If you need to store data from the message for use later, you must copy the data. The message itself and the data that the message points to must be treated as immutable. Your message handler must not modify the data provided to the message handler: for efficiency, the handler provides a pointer to the internal data AMPS holds for the message. Modifying this data would produce unpredictable results, which could include corrupt messages or crashes in AMPS.
The user_data
returned is not guaranteed to be in use by only one
thread at a time. If the user_data
can be modified during use, you
will need to include synchronization on the user_data
provided to a
subscription.
Likewise, a message_handler
should not rely on thread-local storage
for storing state that must be preserved across calls to the handler,
since AMPS will call the message handler from a variety of threads.
Thread-local storage may be a good option for resources that are
effectively stateless, but should not be shared across threads (for
example, parser handles).
Embedded Messages¶
AMPS provides the following functions for working with embedded messages. For a description of the fields and how they are used, see the AMPS Command Reference. The values returned by the message are valid for the lifetime of the callback to which AMPS provides the message. After the callback returns, AMPS may reuse the memory that holds the values. If you need a value to be available after the callback returns, you must copy the value.
Function | Description |
---|---|
amps_message_get_command_type |
Retrieve the command type for this message. |
amps_message_get_topic |
Retrieve the topic for this message. |
amps_message_get_is_replication |
Retrieve a value indicating whether the message was received via replication. |
amps_message_get_is_local |
Retrieve a value indicating whether the message was received via a publish directly to this instance (rather than received via replication). |
amps_message_get_sow_key |
Retrieve the SowKey for this message, if one is present. |
amps_message_get_correlation_id |
Retrieve the CorrelationId for this message, if one is present. |
amps_message_get_bookmark |
Retrieve the bookmark for this message, if one is present. |
amps_message_get_status |
Retrieve the status field for this message, if one is present. |
amps_message_get_data |
Retrieve the data field for this message, if one is present. |
amps_message_get_previous_data |
Retrieve the previous data for this message, if present. |
amps_message_get_ack_type |
Retrieve the acknowledgment type for this message, if present. |
amps_message_get_client_name |
Retrieve the client name for this message, if present. |
amps_message_get_publisher_name |
Retrieve the publisher name for this message, if present. |
Table 8.3: Embedded Message Functions
AMPS includes an embedded API for parsing messages and retrieving specified values. These functions rely on the message type parsing functions: they are only available for message types that provide parsing. These functions return the parsed values provided by the message type. As mentioned earlier, data returned from the message must be treated as immutable by your module. Modifying data in a message handler will produce unpredictable results.
As with the client handles and command handles, the embedded message parser does not store thread-specific data, so it can be used from any thread. However, the parser contains internal state and does not provide implicit synchronization. If a parser will be used from multiple concurrent threads, your module must provide the appropriate synchronization, or provide a distinct parser handle for each thread.
The embedded parser returns values as expression values. Your code treats these values as opaque pointers, and uses functions from the AMPS utility API to extract information from these expression values. When values are returned by the parser, they are pointers into the internal state of the parser and the parsed message. These values will be invalid if the parser parses another message, or if the message data goes out of scope. See Working with Expressions for more information on expression values.
Function | Description |
---|---|
amps_message_parser_create |
Initializes the provided
amps_message_parser handle with a
message parser. |
amps_message_parser_destroy |
Releases the provided
amps_message_parser and deallocates
the underlying resources. |
amps_message_parser_register_filter |
Registers a filter with the parser. To determine whether a message matches a filter, you first register the filter, then parse the message, then retrieve the match results. You can register any number of filters with the parser. |
amps_message_parser_register_xpath |
Registers an XPath identifier with the parser. To retrieve a specific value from a message, you first register an XPath identifier, then parse the message, then extract the value. You can register any number of XPath identifiers with the parser. |
amps_message_parser_extract |
Extract the values from the parsed message and populate a provided value list. and populate a list of values for the registered XPath identifiers. The parser fills in the values in the order in which the identifiers were registered. |
amps_message_parser_evaluate_filter |
Return the results of evaluating the
registered filters against the message,
populating a provided array of int .
The parser fills in the results in the
order in which the filters were
registered. |
amps_message_parser_parse |
Parse the message, using the registered XPath identifiers to locate elements within the message and evaluating the registered filters against the message. |
Table 8.4: Message Parsing Functions
For example, the snippet below shows how to create, use, and destroy an embedded message parser:
// Assume we have messageData extracted from a
// received message.
// Retrieve the hash for the message type the
// parser will parse.
amps_hash_value msgTypeHash = NULL;
amps_hash_message_type(&msgTypeHash, "json", strlen("json"));
// Initialize the parser
// Notice that the msgTypeHash declares this as
// a parser for JSON.
amps_message parser parser = NULL;
amps_message_parser_create(&parser, msgTypeHash);
// Register a filter
amps_message_parser_register_filter(parser,
"/processThis = 'true'", 21);
// Register the paths of interest
amps_message_parser_register_xpath(parser, "/id", 3);
amps_message_parser_register_xpath(parser, "/greeting", 9);
// Parse the message. The parser stores the parsed state
// of the message.
amps_message_parser_parse(parser, messageData.data, messageData.length);
// Determine whether the filter matches
int filters[1];
amps_message_parser_evaluate_filter(parser, filters);
// filters has the result of whether the filter matches.
// If that result is false, return.
if( filters[0] == 0 )
{
amps_message_parser_destroy(parser);
return;
}
// Retrieve the values
amps_expression_value values[2];
amps_message_parser_extract(parser, values);
// Work with the values
assert(amps_expression_value_as_long(values[0]) == 42);
const char *greeting = NULL;
size_t greetingLength = 0;
amps_expression_value_as_string(values[1], &greeting, &greetingLength);
// Use the greeting: the lifetime of the buffer pointed
// to is the lifetime of the message. Copy the greeting
// if it's necessary to save it.
// Destroy the parser
amps_message_parser_destroy(parser);