6. State of the World¶
The AMPS State of the World (SOW) allows you to automatically keep and query the latest information about a topic on the AMPS server, without building a separate database. Using SOW lets you build impressively high-performance applications that provide rich experiences to users. The AMPS C++ client lets you query SOW topics and subscribe to changes with ease. AMPS SOW topics can be used as a current value cache to provide the most recently published value for each record, as a key/value object store, as the source for an aggregate or conflated topic, or all of the above uses. For more information on State of the World topics, see the AMPS User Guide.
Performing SOW Queries¶
To begin, we will look at a simple example of issuing a SOW query.
for (auto message : ampsClient.sow("orders" ,"/symbol == 'ROL'")) {
if (message.getCommand() == "group_begin" ) {
std::cout << "Receiving messages from the SOW." << std::endl ;
}
else if (message.getCommand() == "group_end") {
std::cout << "Done receiving messages from SOW." << std::endl;
}
else {
std::cout << "Received message: " << message.getData () << std::endl;
}
}
Example 6.1: Basic SOW query
In listing
Example 6.1
the program invokes ampsClient.sow()
to initiate a SOW query on the orders
topic,
for all entries that have a symbol of ’ROL’. The SOW query is requested
with a batch size of 100, meaning that AMPS will attempt to send 100
messages at a time as results are returned.
As the query executes, each matching entry in the topic at the time of
the query is returned. Messages containing the data of matching entries
have a Command
of value sow
, so as those arrive, we write them
to the console. AMPS sends a “group_begin” message before the first SOW
result, and a “group_end” message after the last SOW result.
When the SOW query is complete, the MessageStream
completes
iteration and the loop completes. There’s no need to explicitly break
out of the loop.
As with subscribe, the sow function also provides an asynchronous version. In this case, you provide a message handler that will be called on a background thread:
void HandleSOW(const Message& message)
{
if (message.getCommand() == "sow") {
cout << message.getData() << endl;
}
}
void ExecuteSOWQuery(Client client)
{
Command command("sow");
command.setTopic("orders")
.setFilter("/symbol='ROL'")
.setBatchSize(100);
client.execute_async(Command("sow")
.setTopic("orders")
.setFilter("/symbol = 'ROL'")
.setBatchSize(100),
bind(HandleSOW, placeholders::_1));
}
Example 6.2: Asynchronous SOW
In the listing for
Example 6.2,
the ExecuteSOWQuery()
function invokes client.sow()
to initiate a SOW
query on the orders topic, for all entries that have a symbol of
ROL
. The SOW query is requested with a batch size of 100, meaning
that AMPS will attempt to send 100 messages at a time as results are
returned.
As the query executes, the HandleSOW()
method is invoked for each
matching entry in the topic. Messages containing the data of matching
entries have a Command
of sow
, so as those arrive, we write them
to the console.
Samples of Querying a Topic in the SOW¶
The C++ client includes the following samples that demonstrate how to query a topic in the State-of-the-World.
Sample Name | Demonstrates |
---|---|
amps_publish_sow.cpp |
Publishing messages to a State-of-the-World topic. |
amps_query_sow.cpp |
Querying messages from a State-of-the-World topic. |
SOW and Subscribe¶
Imagine an application that displays real-time information about the
position and status of a fleet of delivery vans. When the application
starts, it should display the current location of each of the vans along
with their current status. As vans move around the city and post other
status updates, the application should keep its display up to date. Vans
upload information to the system by posting message to a van
location
topic, configured with a key of van_id
on the AMPS
server.
In this application, it is important to not only stay up-to-date on the
latest information about each van, but to ensure all of the active vans
are displayed as soon as the application starts. Combining a SOW with a
subscription to the topic is exactly what is needed, and that is
accomplished by the Client.sowAndSubscribe()
method. Now we will
look at an example:
/* processSOWMessage
*
* Processes a message during SOW query. Returns
* false if the SOW query is complete, true
* if there is no more SOW processing.
*/
bool processSOWMessage(const AMPS::Message& message)
{
if (message.getCommand() == "group_begin") {
std::cout << "Receiving messages from the SOW." << std::endl;
}
else if (message.getCommand() == "group_end") {
std::cout << "Done receiving messages from SOW." << std::endl;
return true;
}
else {
std::cout << "SOW message: " << message.getData() << std::endl;
addVan(message);
}
return false;
}
/* processSubscriptionMessage
*
* Process messages received on a subscription, after the SOW
* query is complete.
*/
void processSubscribeMessage(const AMPS::Message& message)
{
if (message.getCommand() == "oof") {
std::cout << "OOF : " << message.getReason()
<< " message to remove : "
<< message.getData() << std::endl;
removeVan(message);
}
else {
std::cout << "New or updated message: " << message.getData() << std::endl;
addOrUpdateVan(message);
}
}
...
void doSowAndSubscribe(AMPS::Client& ampsClient)
{
bool sowDone = false;
std::cerr << "about to subscribe..." << std::endl;
/* we issue a sowAndSubscribe() to begin receiving information about all of the
* open orders in the system for the symbol ROL. These orders are now are returned
* as Messages whose Command returns SOW.
*
* notice here that we specified true for the oofEnabled parameter.
* Setting this parameter to true causes us to receive Out-of-Focus("OOF")
* messages for the topic. OOF messages are sent when an entry that was sent
* to us in the past no longer matches our query. This happens when an entry
* is removed from the SOW cache via a sowDelete() operation,
* when the entry expires (as specified by the expiration time on the message
* or by the configuration of that topic on the AMPS server),
* or when the entry no longer matches the content filter
* specified. In our case, when an order is processed or canceled
* (or if the symbol changes), a Message is sent with Command set to OOF.
* The content of that message is the message sent previously.
* We use OOF messages to remove orders from our display as they are
* completed or canceled.
*/
for (auto message : ampsClient.execute(Command("sow_and_subscribe")
.setTopic("van_location")
.setFilter("/status = 'ACTIVE'")
.setBatchSize(100)
.setOptions("oof"))) {
if (sowDone == false)
{
sowDone = processSOWMessage(message);
}
else
{
processSubscribeMessage(message);
}
}
}
Example 6.3: Using sowAndSubscribe
Now we will look at an example that uses the asynchronous form of sowAndSubscribe:
// handleMessage
//
// Handles messages for both SOW query and subscription.
void processSOWMessage(const AMPS::Message& message)
{
if (message.getCommand() == "group_begin")
{
std::cout << "Receiving messages from the SOW." << std::endl;
return;
}
else if (message.getCommand() == "group_end")
{
std::cout << "Done receiving messages from SOW." << std::endl;
return true;
}
else if (message.getCommand() == "oof")
{
std::cout << "OOF : " << message.getReason()
<< " message to remove : "
<< message.getData() << std::endl;
removeVan(message);
}
else
{
std::cout << "New or updated message: " << message.getData() << std::endl;
addOrUpdateVan(message);
}
}
...
std::string trackVanPositions(AMPS::Client& ampsClient)
{
std::cerr << "about to subscribe..." << std::endl;
return ampsClient.execute_async(
Command("sow_and_subscribe")
.setTopic("van_location")
.setFilter("/status = 'ACTIVE'")
.setBatchSize(100)
.setOptions("oof"),
bind(processSOWMessage(placeholders::_1));
}
Example 6.4: Asynchronous SOW and Subscribe
In Example 6.4,
the trackVanPositions
function invokes sowAndSubscribe
to begin
tracking vans, and returns the subscription ID. The application can
later use this to unsubscribe.
The two forms have the same result. However, one form performs processing on a background thread, and blocks the client from receiving messages while that processing happens, while the other form processes messages on the calling thread and allows the background thread to continue to receive messages while processing occurs. In both cases, the application receives and processes the same messages.
Samples of SOW and Subscribe¶
The C++ client includes the following samples that demonstrate how to query a topic in the State-of-the-World and enter a subscription to receive updates to the topic.
Sample Name | Demonstrates |
---|---|
amps_publish_sow.cpp |
Publishing messages to a State-of-the-World topic. |
amps_sow_and_subscribe.cpp |
Querying messages from a State-of-the-World topic and entering a subscription to receive updates. |
Setting Batch Size¶
The AMPS clients include a batch size parameter that specifies how many messages the AMPS server will return to the client in a single batch when returning the results of a SOW query. The 60East clients set a batch size of 10 by default. This batch size works well for common message sizes and network configurations.
Adjusting the batch size may produce better network utilization and produce better performance overall for the application. The larger the batch size, the more messages AMPS will send to the network layer at a time. This can result in fewer packets being sent, and therefore less overhead in the network layer. The effect on performance is generally most noticeable for small messages, where setting a larger batch size will allow several messages to fit into a single packet. For larger messages, a batch size may still improve performance, but the improvement is less noticeable.
In general, 60East recommends setting a batch size that is large enough to produce few partially-filled packets. Bear in mind that AMPS holds the messages in memory while batching them, and the client must also hold the messages in memory while receiving the messages. Using batch sizes that require large amounts of memory for these operations can reduce overall application performance, even if network utilization is good.
For smaller message sizes, 60East recommends using the default batch size, and experimenting with tuning the batch size if performance improvements are necessary. For relatively large messages (especially messages with sizes over 1MB), 60East recommends explicitly setting a batch size of 1 as an initial value, and increasing the batch size only if performance testing with a larger batch size shows improved network utilization or faster overall performance.
Client-Side Conflation¶
In many cases, applications that use SOW topics only need the current value of a message at the time the message is processed, rather than processing each change that lead to the current value. On the server side, AMPS provides conflated topics to meet this need. Conflated topics are described in more detail in the AMPS User Guide, and require no special handling on the client side.
In some cases, though, it’s important to conflate messages on the client side. This can be particularly useful for applications that do expensive processing on each message, applications that are more efficient when processing batches of messages, or for situations where you cannot provide an appropriate conflation interval for the server to use.
A MessageStream
has the ability to conflate messages received for a
subscription to a SOW topic, view, or conflated topic. When conflation
is enabled, for each message received, the client checks to see whether
it has already received an unprocessed message with the same SowKey
.
If so, the client replaces the unprocessed message with the new message.
The application never receives the message that has been replaced.
To enable client-side conflation, you call conflate()
on the
MessageStream
, and then use the MessageStream
as usual:
/* Query and subscribe */
MessageStream results = ampsClient.sowAndSubscribe("orders", "/symbol == 'ROL'");
/* Turn on conflation */
results.conflate();
/* Process the results */
for (auto message : results)
{
// Process message here
}
Notice that if the MessageStream
is used for a subscription that
does not include SowKeys
(such as a subscription to a topic that
does not have a SOW), no conflation will occur.
When using client-side conflation with delta subscriptions, bear in mind that client-side conflation replaces the whole message, and does not attempt to merge deltas. This means that updates can be lost when messages are replaced. For some applications (for example, a ticker application that simply sends delta updates that replace the current price), this causes no problems. For other applications (for example, when several processors may be updating different fields of a message simultaneously), using conflation with deltas could result in lost data, and server-side conflation is a safer alternative.
Managing SOW Contents¶
AMPS allows applications to manage the contents of the SOW by explicitly deleting messages that are no longer relevant. For example, if a particular delivery van is retired from service, the application can remove the record for the van by deleting the record for the van.
The client provides the following functions for deleting records from the SOW.
sowDelete
accepts a filter, and deletes all messages that match the filtersowDeleteByKeys
accepts a set of SOW keys as a comma-delimited string and deletes messages for those keys, regardless of the contents of the messages. SOW keys are provided in the header of a SOW message, and is the internal identifier AMPS uses for that SOW messagesowDeleteByData
accepts a topic and message, and deletes the SOW record that would be updated by that message
The most efficient way to remove messages from the SOW is to use
sowDeleteByKeys
or sowDeleteByData
, since those options
allow AMPS to exactly target the message or messages to be removed.
Many applications use sowDelete
, since this is the most
flexible method for removing items from the SOW when the application
does not have information on the exact messages to be removed.
In either case, AMPS sends an OOF message to all subscribers who have received updates for the messages removed, as described in the previous section.
The simple form of the sowDelete command returns a MessageStream that receives the response. This response is an acknowledgment message that contains information on the delete command. For example, the following snippet simply prints informational text with the number of messages deleted:
for (auto msg : client.sowDelete("sow_topic", "/id in (42, 64, 37)"))
{
std::cout << "Got a " << msg.getCommand()
<< " message containing " << msg.getAckType()
<< ": deleted " << msg.getMatches() << " entries."
<< std::endl;
}
The sowDelete
command can also be sent asynchronously, in a version
that requires a message handler. The message handler is written to
receive sow_delete
response messages from AMPS:.
void HandleSOWDelete(const Message& message)
{
std::cout << "Got a " << msg.getCommand()
<< " message containing " << msg.getAckType()
<< ": deleted " << msg.getMatches() << " entries."
<< std::endl;
}
....
client.execute_async(Command("sow_delete")
.setTopic("sow_topic")
.setFilter("/id in (42, 64, 37)"),
bind(HandleSOWDelete, placeholders::_1));
Acknowledging messages from a queue uses a form of the sow_delete
command that is only supported for queues. Acknowledgment is discussed
in the chapter on queues.