public class MessageStream extends java.lang.Object implements java.util.Iterator<Message>, java.lang.Iterable<Message>, MessageHandler, ConnectionStateListener, java.lang.AutoCloseable
MessageStream stream = Client.execute(new Command("sow").setTopic("/orders")).timeout(1000); for(Message message : stream) { ... }
Modifier and Type | Field and Description |
---|---|
protected Client |
_client |
protected CommandId |
_commandId |
protected Message |
_current |
protected int |
_maxDepth |
protected Field |
_previousBookmark |
protected Field |
_previousTopic |
protected java.util.concurrent.ConcurrentLinkedQueue<Message> |
_q |
protected CommandId |
_queryId |
protected int |
_requestedAcks |
protected java.util.concurrent.locks.Lock |
_sowKeyLock |
protected java.util.HashMap<java.lang.String,Message> |
_sowKeyMap |
protected int |
_state |
protected CommandId |
_subId |
protected boolean |
_timedOut |
protected int |
_timeout |
Connected, Disconnected, HeartbeatInitiated, LoggedOn, PublishReplayed, Resubscribed, Shutdown, UNKNOWN
Modifier | Constructor and Description |
---|---|
protected |
MessageStream(Client client_)
Constructs an instance of message stream with the given client.
|
Modifier and Type | Method and Description |
---|---|
void |
close()
Closes this MessageStream, unsubscribing from AMPS if applicable.
|
MessageStream |
conflate()
Causes messages to be conflated by SOW key.
|
void |
connectionStateChanged(int newState_)
Used to indicate if there was a change in state with connection.
|
int |
getDepth()
Gets the number of messages currently buffered by this message stream.
|
static MessageStream |
getEmptyMessageStream()
Method to retrieve the empty MessageStream.
|
int |
getMaxDepth()
Gets the allowed maximum depth of the internal message buffer before
this stream will push-back on reads from the server.
|
int |
getTimeout()
Gets the currently set timeout value for self (zero if none).
|
boolean |
hasNext()
Implements
Iterator.hasNext() for a message stream. |
void |
invoke(Message message)
Method for the AMPS client to call when a message is received.
|
boolean |
isConflating()
Indicates whether this message stream is conflating messages by SOW key.
|
boolean |
isConnected()
Returns true if the connection to AMPS is still active, or false if a
disconnect is detected.
|
java.util.Iterator<Message> |
iterator()
Implements
Iterable.iterator() to return this instance. |
MessageStream |
maxDepth(int maxDepth_)
Sets the maximum number of messages that will be buffered in memory
by this stream before taking measures to slow reads.
|
Message |
next()
Implements
Iterator.next() for a message stream. |
void |
remove()
Operation not supported.
|
protected void |
setAcksOnly(CommandId commandId_,
int ackTypes_)
Changes the current state of the message stream to
STATE_AcksOnly . |
protected void |
setCommandId(CommandId commandId_)
When this message stream is the result of a form of subscribe command, this
gets called with the command ID of the command.
|
protected void |
setQueryId(CommandId queryId_)
When this message stream is the result of a form of SOW command, this gets
called with the query ID of the command.
|
protected void |
setRunning()
Changes the current state of the message stream to
STATE_Reading . |
protected void |
setSOWOnly()
Changes the current state of the message stream to
STATE_SOWOnly . |
protected void |
setStatsOnly()
Changes the current state of the message stream to
STATE_StatsOnly . |
protected void |
setSubscription(CommandId subId_)
When this message stream is the result of a subscription command, this gets
called with the subscription ID.
|
MessageStream |
timeout(int timeout_)
Sets a timeout on self.
|
protected CommandId _commandId
protected CommandId _queryId
protected CommandId _subId
protected Client _client
protected java.util.concurrent.ConcurrentLinkedQueue<Message> _q
protected java.util.HashMap<java.lang.String,Message> _sowKeyMap
protected java.util.concurrent.locks.Lock _sowKeyLock
protected Message _current
protected volatile int _state
protected int _timeout
protected boolean _timedOut
protected volatile int _maxDepth
protected int _requestedAcks
protected Field _previousTopic
protected Field _previousBookmark
protected MessageStream(Client client_)
client_
- the client to use for this message streampublic static MessageStream getEmptyMessageStream()
protected void setSubscription(CommandId subId_)
subId_
- the CommandId to set as the SubscriptionId of this commandprotected void setQueryId(CommandId queryId_)
queryId_
- the CommandId to set as the QueryId of this commandprotected void setCommandId(CommandId commandId_)
commandId_
- the CommandId to set as the QueryId of this commandprotected void setSOWOnly()
STATE_SOWOnly
.protected void setStatsOnly()
STATE_StatsOnly
.protected void setAcksOnly(CommandId commandId_, int ackTypes_)
STATE_AcksOnly
.commandId_
- The commandId of the command for the acks.ackTypes_
- The types of acks that will be returned.protected void setRunning()
STATE_Reading
.public MessageStream timeout(int timeout_)
timeout_
- The timeout in milliseconds, or 0 for infinite timeout.public int getTimeout()
timeout(int)
public MessageStream conflate()
public boolean isConflating()
conflate()
public MessageStream maxDepth(int maxDepth_)
Sets the maximum number of messages that will be buffered in memory by this stream before taking measures to slow reads. When this threshold is exceeded the message stream must push back on the client's background reader thread, delaying reads from the client's socket connection until the buffer depth reaches or falls below the maxDepth. Due to this behavior, there are several caveats that must be observed when using a max depth on a message stream:
MessageHandler
on the
same client instance, since your asynchronous message handler could
be starved.Ignoring these caveats can potentially cause your AMPS client to hang because the background thread may never be able to read important internal acknowledgement control messages.
If these caveats are unsuitable for your use case, alternative solutions
include (1) using the pause/resume or rate options for a bookmark
subscription; (2) using the top_n/skip_n options to page a large
SOW query; (3) using message stream conflation with a SOW via
conflate()
; (4) use your own asynchronous MessageHandler
implementation instead of using this synchronous message stream, so that
you can process one message at a time without needing to store it; or
(5) creating an additional client instance so that its background
reader thread isn't affected by max depth push-back by this
message stream.
maxDepth_
- The maximum depth to set on the stream's internal buffer.
The default value comes from
Client.setDefaultMaxDepth(int)
,
which defaults to Integer.MAX_VALUE
.public int getMaxDepth()
maxDepth(int)
public int getDepth()
hasNext()
public void connectionStateChanged(int newState_)
connectionStateChanged
in interface ConnectionStateListener
newState_
- integer to compare to Disconnectedpublic boolean hasNext()
Iterator.hasNext()
for a message stream. //might need a
commenthasNext
in interface java.util.Iterator<Message>
public Message next()
Iterator.next()
for a message stream.next
in interface java.util.Iterator<Message>
public void invoke(Message message)
MessageHandler
The AMPS client reuses the same message object in successive calls to the message handler, resetting the contents of the object for each call. Should you need to use the data in the message outside of the call to the message handler, you must copy the message object or copy data out of the message object.
invoke
in interface MessageHandler
message
- the message received from AMPS. As mentioned above, this message object is reused by successive calls to the handler.public void remove()
Iterator.remove()
to throw
UnsupportedOperationException.remove
in interface java.util.Iterator<Message>
public java.util.Iterator<Message> iterator()
Iterable.iterator()
to return this instance.iterator
in interface java.lang.Iterable<Message>
public boolean isConnected()
Client
has
disconnected since the message stream was createdpublic void close()
close
in interface java.lang.AutoCloseable