An iterable object representing the results of an AMPS subscription and/or query.
More...
An iterable object representing the results of an AMPS subscription and/or query.
Objects of MesageStream type are returned by the MessageStream-returning overloads of methods on class AMPS::Client, such as AMPS::Client::sow() and AMPS::Client::subscribe() . Use this object via the begin() and end() iterators returned, for example:
MessageStream myTopic = client.sow("orders");
for(auto i = myTopic.begin(), e = myTopic.end(); i!=e; ++i)
{
Message m = *i;
...
}
For MessageStream objects returned by calls to methods other than sow(), note that the "end" is never reached, unless the server is disconnected. In this case, you may choose to stop processing messages by simply exiting the loop. For example, assuming that the isDone
flag will be set when the application has processed the last message in a job:
MessageStream myTopic = client.subscribe("aTopic");
for(auto i = myTopic.begin();; ++i)
{
Message m = *i;
...
if(imDone) break;
}
By default, a MessageStream for a subscription will block until a message is available or the connection is closed. You can also configure a MessageStream to periodically produce an invalid (empty) message by setting the timeout on the message stream. This can be useful for indicating that the process is still working (for example, pinging a thread monitor) or reporting an error if a subscription pauses for an unexpectedly long period of time.
For example, the following code configures the message stream to produce an invalid message if no data arrives for this subscription within 1 second (1000 milliseconds). Even though there is no work for the application to do, this gives the application the opportunity to ping a thread monitor. Further, the application keeps track of how long it has been since the last data message arrived. If data flow pauses for more than 4 minutes (240 seconds) continuously, the application reports an error.
{
MessageStream ms = client.subscribe("aTopic");
ms.timeout(1000);
int timeout_counter = 0;
for (auto m : ms)
{
if (m.isValid() == false)
{
if(timeout_counter++ > 240) { }
continue;
}
timeout_counter = 0;
... process message here, pinging thread monitor as appropriate ...
}
}