25 #ifndef _AMPSPLUSPLUS_H_ 26 #define _AMPSPLUSPLUS_H_ 28 #include "amps/ampsver.h" 46 #include <sys/atomic.h> 48 #include "amps/BookmarkStore.hpp" 49 #include "amps/MessageRouter.hpp" 50 #include "amps/util.hpp" 51 #include "amps/ampscrc.hpp" 52 #if __cplusplus >= 201100L || _MSC_VER >= 1900 56 #ifndef AMPS_TESTING_SLOW_MESSAGE_STREAM 57 #define AMPS_TESTING_SLOW_MESSAGE_STREAM 85 #define AMPS_MEMORYBUFFER_DEFAULT_BUFFERS 10 86 #define AMPS_MEMORYBUFFER_DEFAULT_LENGTH 40960 87 #define AMPS_SUBSCRIPTION_MANAGER_DEFAULT_TIMEOUT 0 88 #define AMPS_HACLIENT_TIMEOUT_DEFAULT 10000 89 #define AMPS_HACLIENT_RECONNECT_DEFAULT 200 90 #define AMPS_DEFAULT_COMMAND_TIMEOUT 5000 91 #define AMPS_DEFAULT_TOP_N -1 92 #define AMPS_DEFAULT_BATCH_SIZE 10 93 #define AMPS_NUMBER_BUFFER_LEN 20 94 #define AMPS_DEFAULT_QUEUE_ACK_TIMEOUT 1000 96 #if defined(_M_X64) || defined(__x86_64) || defined(_WIN64) 101 static __declspec ( thread )
AMPS::Message* publishStoreMessage = 0;
109 typedef std::map<std::string, std::string> ConnectionInfo;
// Keeps a list of AMPS::Message pointers that were registered for later cleanup.
// NOTE(review): only part of this class is visible in this excerpt (braces, access
// specifiers and some member signatures are missing); the notes below describe the
// visible statements only.
111 class PerThreadMessageTracker
// The registered message pointers.
113 std::vector<AMPS::Message*> _messages;
115 PerThreadMessageTracker() {}
// The destructor walks every entry of _messages; the loop body is not visible here
// (presumably it releases each message -- confirm).
116 ~PerThreadMessageTracker()
118 for (
size_t i = 0; i < _messages.size(); ++i)
// Appends the given pointer to _messages.
125 _messages.push_back(message);
// Function-local static mutex; it is held while _addMessageToCleanupList(message) runs.
129 static AMPS::Mutex _lock;
130 AMPS::Lock<Mutex> l(_lock);
131 _addMessageToCleanupList(message);
// Function-local static tracker instance that receives the message.
// NOTE(review): this object is declared static, not thread-local, despite the class
// name -- confirm that a single shared tracker is what is intended.
135 static PerThreadMessageTracker tracker;
136 tracker.addMessage(message);
141 inline std::string asString(Type x_)
143 std::ostringstream os;
// Writes decimal digits of seqNo_ into buf_ starting from the back of the buffer:
// pos begins at AMPS_NUMBER_BUFFER_LEN and is pre-decremented before each digit is
// stored, so buf_ must provide at least AMPS_NUMBER_BUFFER_LEN chars. No terminating
// NUL is written by the visible statements.
// NOTE(review): the statements between the visible lines (loop guard, division by 10,
// return) are missing from this excerpt. Presumably the function returns pos -- the
// publish() overload in this file reads the digits at buf + result with length
// AMPS_NUMBER_BUFFER_LEN - result.
149 size_t convertToCharArray(
char* buf_, amps_uint64_t seqNo_)
151 size_t pos = AMPS_NUMBER_BUFFER_LEN;
152 for (
int i = 0; i < AMPS_NUMBER_BUFFER_LEN; ++i)
// Least-significant remaining digit, converted to its ASCII character.
156 buf_[--pos] = (char)(seqNo_ % 10 +
'0');
// unsigned long overload of convertToCharArray: the visible statements are the same as
// in the amps_uint64_t version -- digits are stored backwards from index
// AMPS_NUMBER_BUFFER_LEN - 1, so buf_ must provide at least AMPS_NUMBER_BUFFER_LEN chars.
// NOTE(review): loop guard, division and return statement are not visible in this
// excerpt; presumably the start offset of the digits (pos) is returned -- confirm.
165 size_t convertToCharArray(
char* buf_,
unsigned long seqNo_)
167 size_t pos = AMPS_NUMBER_BUFFER_LEN;
168 for (
int i = 0; i < AMPS_NUMBER_BUFFER_LEN; ++i)
// Least-significant remaining digit, converted to its ASCII character.
172 buf_[--pos] = (char)(seqNo_ % 10 +
'0');
186 static const char* duplicate()
190 static const char* badFilter()
194 static const char* badRegexTopic()
196 return "bad regex topic";
198 static const char* subscriptionAlreadyExists()
200 return "subscription already exists";
202 static const char* nameInUse()
204 return "name in use";
206 static const char* authFailure()
208 return "auth failure";
210 static const char* notEntitled()
212 return "not entitled";
214 static const char* authDisabled()
216 return "authentication disabled";
218 static const char* subidInUse()
220 return "subid in use";
222 static const char* noTopic()
240 virtual void exceptionThrown(
const std::exception&)
const {;}
246 #define AMPS_CALL_EXCEPTION_WRAPPER(x) \ 251 catch (std::exception& ex_)\ 255 _exceptionListener->exceptionThrown(ex_);\ 280 #define AMPS_CALL_EXCEPTION_WRAPPER_2(me,x) \ 283 while(me->_connected)\ 290 catch(MessageStreamFullException&)\ 292 me->checkAndSendHeartbeat(false);\ 296 catch (std::exception& ex_)\ 300 me->_exceptionListener->exceptionThrown(ex_);\ 324 #define AMPS_CALL_EXCEPTION_WRAPPER_STREAM_FULL_2(me, x)\ 325 while(me->_connected)\ 332 catch(MessageStreamFullException&)\ 334 me->checkAndSendHeartbeat(false);\ 338 #define AMPS_CALL_EXCEPTION_WRAPPER_2(me,x) \ 341 while(me->_connected)\ 348 catch(MessageStreamFullException& ex_)\ 350 me->checkAndSendHeartbeat(false);\ 354 catch (std::exception& ex_)\ 358 me->_exceptionListener->exceptionThrown(ex_);\ 382 #define AMPS_CALL_EXCEPTION_WRAPPER_STREAM_FULL_2(me, x)\ 383 while(me->_connected)\ 390 catch(MessageStreamFullException& ex_)\ 392 me->checkAndSendHeartbeat(false);\ 397 #define AMPS_UNHANDLED_EXCEPTION(ex) \ 400 _exceptionListener->exceptionThrown(ex);\ 405 #define AMPS_UNHANDLED_EXCEPTION_2(me,ex) \ 408 me->_exceptionListener->exceptionThrown(ex);\ 447 static const unsigned Subscribe = 1;
448 static const unsigned SOW = 2;
449 static const unsigned NeedsSequenceNumber = 4;
450 static const unsigned ProcessedAck = 8;
451 static const unsigned StatsAck = 16;
452 void init(Message::Command::Type command_)
461 void init(
const std::string& command_)
470 void init(
const char* command_,
size_t commandLen_)
482 if (!(command & Message::Command::NoDataCommands))
485 if (command == Message::Command::Subscribe ||
486 command == Message::Command::SOWAndSubscribe ||
487 command == Message::Command::DeltaSubscribe ||
488 command == Message::Command::SOWAndDeltaSubscribe)
493 if (command == Message::Command::SOW
494 || command == Message::Command::SOWAndSubscribe
495 || command == Message::Command::SOWAndDeltaSubscribe)
500 setBatchSize(AMPS_DEFAULT_BATCH_SIZE);
502 if (command == Message::Command::SOW)
507 _flags |= ProcessedAck;
509 else if (command == Message::Command::SOWDelete)
512 _flags |= ProcessedAck;
513 _flags |= NeedsSequenceNumber;
515 else if (command == Message::Command::Publish
516 || command == Message::Command::DeltaPublish)
518 _flags |= NeedsSequenceNumber;
520 else if (command == Message::Command::StopTimer)
537 Command(
const char* command_,
size_t commandLen_)
539 init(command_, commandLen_);
563 init(command_, commandLen_);
657 _message.
setTopic(topic_, topicLen_);
787 std::ostringstream os;
792 amps_uint64_t getSequence()
const 808 _message.
setData(data_, dataLen_);
838 _batchSize = batchSize_;
860 if (ackType_ ==
"processed")
862 _flags |= ProcessedAck;
864 else if (ackType_ ==
"stats")
874 if (ackType_.find(
"processed") != std::string::npos)
876 _flags |= ProcessedAck;
880 _flags &= ~ProcessedAck;
882 if (ackType_.find(
"stats") != std::string::npos)
896 if (ackType_ & Message::AckType::Processed)
898 _flags |= ProcessedAck;
902 _flags &= ~ProcessedAck;
904 if (ackType_ & Message::AckType::Stats)
929 unsigned getTimeout(
void)
const 933 unsigned getBatchSize(
void)
// Reports whether the Subscribe bit is set in _flags.
// (The leading `const` on the next line closes the preceding getBatchSize() declaration.)
const 937 bool isSubscribe(
void)
// The comparison against zero is spelled out so this accessor matches isSow(),
// hasProcessedAck(), hasStatsAck() and needsSequenceNumber() instead of relying on an
// implicit unsigned-to-bool conversion. The returned value is unchanged.
const 939 return (_flags & Subscribe) != 0;
// Returns true when the SOW bit is set in _flags.
941 bool isSow(
void)
const 943 return (_flags & SOW) != 0;
// Returns true when the ProcessedAck bit is set in _flags.
945 bool hasProcessedAck(
void)
const 947 return (_flags & ProcessedAck) != 0;
// Returns true when the StatsAck bit is set in _flags.
949 bool hasStatsAck(
void)
const 951 return (_flags & StatsAck) != 0;
// Returns true when the NeedsSequenceNumber bit is set in _flags.
953 bool needsSequenceNumber(
void)
const 955 return (_flags & NeedsSequenceNumber) != 0;
961 typedef void(*DisconnectHandlerFunc)(
Client&,
void* userData);
978 virtual std::string authenticate(
const std::string& userName_,
const std::string& password_) = 0;
986 virtual std::string retry(
const std::string& userName_,
const std::string& password_) = 0;
993 virtual void completed(
const std::string& userName_,
const std::string& password_,
const std::string& reason_) = 0;
1005 std::string
authenticate(
const std::string& ,
const std::string& password_)
// Retry is not supported by this authenticator: both arguments are unnamed (ignored)
// and an AuthenticationException is always thrown.
1012 std::string
retry(
const std::string& ,
const std::string& )
1014 throw AuthenticationException(
"retry not implemented by DefaultAuthenticator.");
// Deliberate no-op: all three arguments are unnamed and the body is an empty statement.
1017 void completed(
const std::string& ,
const std::string& ,
const std::string& ) {;}
1038 virtual void execute(
Message& message_) = 0;
1053 typedef bool (*PublishStoreResizeHandler)(
Store store_,
1068 : _resizeHandler(NULL)
1069 , _resizeHandlerData(NULL)
1070 , _errorOnPublishGap(errorOnPublishGap_)
1077 virtual amps_uint64_t store(
const Message& message_) = 0;
1085 virtual void discardUpTo(amps_uint64_t index_) = 0;
1100 virtual bool replaySingle(
StoreReplayer& replayer_, amps_uint64_t index_) = 0;
1106 virtual size_t unpersistedCount()
const = 0;
1118 virtual void flush(
long timeout_) = 0;
1124 return AMPS_UNSET_INDEX;
1131 return AMPS_UNSET_SEQUENCE;
1137 virtual amps_uint64_t getLowestUnpersisted()
const = 0;
1142 virtual amps_uint64_t getLastPersisted() = 0;
1156 _resizeHandler = handler_;
1157 _resizeHandlerData = userData_;
1162 return _resizeHandler;
1165 bool callResizeHandler(
size_t newSize_);
// Stores the given flag in _errorOnPublishGap. Virtual, so implementations may override.
1167 inline virtual void setErrorOnPublishGap(
bool errorOnPublishGap_)
1169 _errorOnPublishGap = errorOnPublishGap_;
// Returns the current value of _errorOnPublishGap.
1172 inline virtual bool getErrorOnPublishGap()
const 1174 return _errorOnPublishGap;
1179 void* _resizeHandlerData;
1180 bool _errorOnPublishGap;
1187 RefHandle<StoreImpl> _body;
1191 Store(
const Store& rhs) : _body(rhs._body) {;}
1203 return _body.get().store(message_);
1214 _body.get().discardUpTo(index_);
1223 _body.get().replay(replayer_);
1235 return _body.get().replaySingle(replayer_, index_);
1244 return _body.get().unpersistedCount();
1252 return _body.isValid();
1265 return _body.get().flush(timeout_);
1273 return _body.get().getLowestUnpersisted();
1281 return _body.get().getLastPersisted();
1296 _body.get().setResizeHandler(handler_, userData_);
1301 return _body.get().getResizeHandler();
1310 _body.get().setErrorOnPublishGap(errorOnPublishGap_);
1319 return _body.get().getErrorOnPublishGap();
1327 if (_body.isValid())
1329 return &_body.get();
1353 virtual void failedWrite(
const Message& message_,
1354 const char* reason_,
size_t reasonLength_) = 0;
// Invokes the registered resize handler with a Store handle wrapping this
// implementation, the requested size, and the user data stored in _resizeHandlerData,
// and returns the handler's result.
// NOTE(review): the lines between the signature and this return are not visible in this
// excerpt (presumably a guard for an unset handler -- confirm).
1358 inline bool StoreImpl::callResizeHandler(
size_t newSize_)
1362 return _resizeHandler(
Store(
this), newSize_, _resizeHandlerData);
1376 long* timeoutp = (
long*)data_;
1384 store_.
flush(*timeoutp);
1387 catch (
const TimedOutException&)
1389 catch (
const TimedOutException& e)
1416 unsigned requestedAckTypes_,
1417 const AMPSException& exception_) = 0;
1435 unsigned requestedAckTypes_) = 0;
1442 virtual void clear() = 0;
1446 virtual void resubscribe(Client& client_) = 0;
1453 _failedResubscribeHandler = handler_;
1456 std::shared_ptr<FailedResubscribeHandler> _failedResubscribeHandler;
1467 typedef enum { Disconnected = 0,
1471 PublishReplayed = 8,
1472 HeartbeatInitiated = 16,
1486 virtual void connectionStateChanged(
State newState_) = 0;
1491 class MessageStreamImpl;
1494 typedef void(*DeferredExecutionFunc)(
void*);
1496 class ClientImpl :
public RefBody
1502 AMPS_SOCKET _socket;
1508 socklen_t _valueLen;
1512 : _socket(AMPS_INVALID_SOCKET), _noDelay(0), _valueLen(
sizeof(
int))
1514 _valuePtr = (
char*)&_noDelay;
1516 if (_socket != AMPS_INVALID_SOCKET)
1518 getsockopt(_socket, IPPROTO_TCP, TCP_NODELAY, _valuePtr, &_valueLen);
1522 setsockopt(_socket, IPPROTO_TCP, TCP_NODELAY, _valuePtr, _valueLen);
1526 _socket = AMPS_INVALID_SOCKET;
1533 if (_socket != AMPS_INVALID_SOCKET)
1536 setsockopt(_socket, IPPROTO_TCP, TCP_NODELAY, _valuePtr, _valueLen);
1541 friend class Client;
1544 DisconnectHandler _disconnectHandler;
1545 enum GlobalCommandTypeHandlers :
size_t 1555 DuplicateMessage = 8,
1558 std::vector<MessageHandler> _globalCommandTypeHandlers;
1559 Message _message, _readMessage, _publishMessage, _deltaMessage, _beatMessage;
1561 MessageRouter::RouteCache _routeCache;
1562 mutable Mutex _lock;
1563 std::string _name, _nameHash, _lastUri, _logonCorrelationData;
1564 amps_uint64_t _nameHashValue;
1566 Store _publishStore;
1567 bool _isRetryOnDisconnect;
1568 amps_unique_ptr<FailedWriteHandler> _failedWriteHandler;
1569 #if __cplusplus >= 201100L || _MSC_VER >= 1900 1570 std::atomic<amps_uint64_t> _lastSentHaSequenceNumber;
1572 volatile amps_uint64_t _lastSentHaSequenceNumber;
1574 AMPS_ATOMIC_TYPE_8 _logonInProgress;
1575 AMPS_ATOMIC_TYPE_8 _badTimeToHASubscribe;
1576 VersionInfo _serverVersion;
1577 Timer _heartbeatTimer;
1578 amps_unique_ptr<MessageStream> _pEmptyMessageStream;
1581 int _queueAckTimeout;
1582 bool _isAutoAckEnabled;
1583 unsigned _ackBatchSize;
1584 unsigned _queuedAckCount;
1585 unsigned _defaultMaxDepth;
1586 struct QueueBookmarks
1588 QueueBookmarks(
const std::string& topic_)
1595 amps_uint64_t _oldestTime;
1596 unsigned _bookmarkCount;
1598 typedef amps_uint64_t topic_hash;
1599 typedef std::map<topic_hash, QueueBookmarks> TopicHashMap;
1600 TopicHashMap _topicHashMap;
1604 ClientImpl* _client;
1609 ClientStoreReplayer()
1610 : _client(NULL), _version(0), _res(
AMPS_E_OK)
1613 ClientStoreReplayer(ClientImpl* client_)
1614 : _client(client_), _version(0), _res(
AMPS_E_OK)
1617 void setClient(ClientImpl* client_)
1622 void execute(
Message& message_)
1626 throw CommandException(
"Can't replay without a client.");
1630 if (index > _client->_lastSentHaSequenceNumber)
1632 _client->_lastSentHaSequenceNumber = index;
1640 (!_client->_logonInProgress ||
1644 message_.getMessage(),
1648 throw DisconnectedException(
"AMPS Server disconnected during replay");
1654 ClientStoreReplayer _replayer;
1658 ClientImpl* _parent;
1659 const char* _reason;
1660 size_t _reasonLength;
1661 size_t _replayCount;
1663 FailedWriteStoreReplayer(ClientImpl* parent,
const char* reason_,
size_t reasonLength_)
1666 _reasonLength(reasonLength_),
1669 void execute(
Message& message_)
1671 if (_parent->_failedWriteHandler)
1674 _parent->_failedWriteHandler->failedWrite(message_,
1675 _reason, _reasonLength);
1678 size_t replayCount(
void)
const 1680 return _replayCount;
1684 struct AckResponseImpl :
public RefBody
1686 std::string username, password, reason, status, bookmark, options;
1687 amps_uint64_t sequenceNo;
1688 amps_uint64_t nameHashValue;
1689 VersionInfo serverVersion;
1690 #if __cplusplus >= 201100L || _MSC_VER >= 1900 1691 std::atomic<bool> responded;
1692 std::atomic<bool> abandoned;
1694 volatile bool responded;
1695 volatile bool abandoned;
1697 unsigned connectionVersion;
1700 sequenceNo((amps_uint64_t)0),
1704 connectionVersion(0)
1711 RefHandle<AckResponseImpl> _body;
1713 AckResponse() : _body(NULL) {;}
1714 AckResponse(
const AckResponse& rhs) : _body(rhs._body) {;}
1715 static AckResponse create()
1718 r._body =
new AckResponseImpl();
1722 const std::string& username()
1724 return _body.get().username;
1726 void setUsername(
const char* data_,
size_t len_)
1730 _body.get().username.assign(data_, len_);
1734 _body.get().username.clear();
1737 const std::string& password()
1739 return _body.get().password;
1741 void setPassword(
const char* data_,
size_t len_)
1745 _body.get().password.assign(data_, len_);
1749 _body.get().password.clear();
1752 const std::string& reason()
1754 return _body.get().reason;
1756 void setReason(
const char* data_,
size_t len_)
1760 _body.get().reason.assign(data_, len_);
1764 _body.get().reason.clear();
1767 const std::string& status()
1769 return _body.get().status;
1771 void setStatus(
const char* data_,
size_t len_)
1775 _body.get().status.assign(data_, len_);
1779 _body.get().status.clear();
1782 const std::string& bookmark()
1784 return _body.get().bookmark;
// Records the bookmark carried by an ack.
// For a non-empty field the raw bookmark text is copied and Field::parseBookmark fills in
// nameHashValue and sequenceNo from it.
// NOTE(review): braces and the `else` keyword are not visible in this excerpt; the three
// reset statements at the end presumably form the empty-bookmark branch -- confirm.
1786 void setBookmark(
const Field& bookmark_)
1788 if (!bookmark_.
empty())
1790 _body.get().bookmark.assign(bookmark_.
data(), bookmark_.
len());
1791 Field::parseBookmark(bookmark_, _body.get().nameHashValue,
1792 _body.get().sequenceNo);
// Clears the stored bookmark and zeroes both derived values.
1796 _body.get().bookmark.clear();
1797 _body.get().sequenceNo = (amps_uint64_t)0;
1798 _body.get().nameHashValue = (amps_uint64_t)0;
1801 amps_uint64_t sequenceNo()
const 1803 return _body.get().sequenceNo;
1805 amps_uint64_t nameHashValue()
const 1807 return _body.get().nameHashValue;
// Parses len_ characters at data_ as an unsigned decimal number and stores the result
// as the ack's sequenceNo.
// NOTE(review): the visible statements do not check that each character is a digit and
// do not guard against overflow of amps_uint64_t; lines missing from this excerpt may
// add a guard (e.g. for a null data_) -- confirm that callers only pass numeric text.
1809 void setSequenceNo(
const char* data_,
size_t len_)
1811 amps_uint64_t result = (amps_uint64_t)0;
1814 for (
size_t i = 0; i < len_; ++i)
// Shift the accumulated value one decimal place, then add the next digit.
1816 result *= (amps_uint64_t)10;
1817 result += (amps_uint64_t)(data_[i] -
'0');
1820 _body.get().sequenceNo = result;
1822 VersionInfo serverVersion()
const 1824 return _body.get().serverVersion;
1826 void setServerVersion(
const char* data_,
size_t len_)
1830 _body.get().serverVersion.setVersion(std::string(data_, len_));
1835 return _body.get().responded;
1839 _body.get().responded =
true;
1843 return _body.get().abandoned;
1847 if (_body.isValid())
1849 _body.get().abandoned =
true;
1853 void setConnectionVersion(
unsigned connectionVersion)
1855 _body.get().connectionVersion = connectionVersion;
1858 unsigned getConnectionVersion()
1860 return _body.get().connectionVersion;
1862 void setOptions(
const char* data_,
size_t len_)
1866 _body.get().options.assign(data_, len_);
1870 _body.get().options.clear();
1874 const std::string& options()
1876 return _body.get().options;
1879 AckResponse& operator=(
const AckResponse& rhs)
1887 typedef std::map<std::string, AckResponse> AckMap;
1890 DefaultExceptionListener _defaultExceptionListener;
1893 struct DeferredExecutionRequest
1895 DeferredExecutionRequest(DeferredExecutionFunc func_,
1898 _userData(userData_)
1901 DeferredExecutionFunc _func;
1905 std::shared_ptr<const ExceptionListener> _pExceptionListener;
1906 amps_unique_ptr<SubscriptionManager> _subscriptionManager;
1907 volatile bool _connected;
1908 std::string _username;
1909 typedef std::set<ConnectionStateListener*> ConnectionStateListeners;
1910 ConnectionStateListeners _connectionStateListeners;
1911 typedef std::vector<DeferredExecutionRequest> DeferredExecutionList;
1912 Mutex _deferredExecutionLock;
1913 DeferredExecutionList _deferredExecutionList;
1914 unsigned _heartbeatInterval;
1915 unsigned _readTimeout;
1923 if (!_connected && newState_ > ConnectionStateListener::Connected)
1927 for (ConnectionStateListeners::iterator it = _connectionStateListeners.begin(); it != _connectionStateListeners.end(); ++it)
1929 AMPS_CALL_EXCEPTION_WRAPPER(
1930 (*it)->connectionStateChanged(newState_));
1933 unsigned processedAck(
Message& message);
1934 unsigned persistedAck(
Message& meesage);
1935 void lastChance(
Message& message);
1936 void checkAndSendHeartbeat(
bool force =
false);
1937 virtual ConnectionInfo getConnectionInfo()
const;
1939 ClientImplMessageHandler(
amps_handle message,
void* userData);
1941 ClientImplPreDisconnectHandler(
amps_handle client,
unsigned failedConnectionVersion,
void* userData);
1943 ClientImplDisconnectHandler(
amps_handle client,
void* userData);
1945 void unsubscribeInternal(
const std::string&
id)
1953 subId.assign(
id.data(),
id.length());
1954 _routes.removeRoute(subId);
1956 if (_subscriptionManager)
1959 Unlock<Mutex> unlock(_lock);
1960 _subscriptionManager->unsubscribe(subId);
1966 _sendWithoutRetry(_message);
1967 deferredExecution(&s_noOpFn, NULL);
// Convenience overload: forwards to the four-argument syncAckProcessing with an HA
// sequence number of 0 and the caller's isHASubscribe_ flag.
1970 AckResponse syncAckProcessing(
long timeout_,
Message& message_,
1971 bool isHASubscribe_)
1973 return syncAckProcessing(timeout_, message_,
1974 (amps_uint64_t)0, isHASubscribe_);
// Sends message_ and waits until its ack has responded, has been abandoned, the wait has
// timed out, or the client is no longer connected; then applies the ack's contents or
// throws.
// NOTE(review): this listing is an excerpt -- braces and several statements between the
// visible lines are missing, so the comments below describe the visible statements only.
1977 AckResponse syncAckProcessing(
long timeout_,
Message& message_,
1978 amps_uint64_t haSeq = (amps_uint64_t)0,
1979 bool isHASubscribe_ =
false)
1982 AckResponse ack = AckResponse::create();
1985 Lock<Mutex> guard(_ackMapLock);
// The value returned by _send() is recorded as the ack's connection version; a value of
// 0 appears to be treated as a disconnect (see the exception below).
1988 ack.setConnectionVersion((
unsigned)_send(message_, haSeq, isHASubscribe_));
1989 if (ack.getConnectionVersion() == 0)
1992 throw DisconnectedException(
"Connection closed while waiting for response.");
1994 bool timedOut =
false;
1995 AMPS_START_TIMER(timeout_)
// Wait loop: ends on timeout, response, abandonment, or loss of connection.
1996 while (!timedOut && !ack.responded() && !ack.abandoned() && _connected)
2000 timedOut = !_lock.wait(timeout_);
2004 AMPS_RESET_TIMER(timedOut, timeout_);
// _lock is released while the user-installed waiting function runs.
2011 Unlock<Mutex> unlck(_lock);
2012 amps_invoke_waiting_function();
2015 if (ack.responded())
2017 if (ack.status() !=
"failure")
// Advance the last-sent HA sequence number if the ack reports a higher one.
2021 amps_uint64_t ackSequence = ack.sequenceNo();
2022 if (_lastSentHaSequenceNumber < ackSequence)
2024 _lastSentHaSequenceNumber = ackSequence;
// The name hash is the part of the ack's bookmark that precedes the first '|'.
2037 _nameHash = ack.bookmark().substr(0, ack.bookmark().find(
'|'));
2038 _nameHashValue = ack.nameHashValue();
2039 _serverVersion = ack.serverVersion();
2040 if (_bookmarkStore.isValid())
2047 const std::string& options = ack.options();
// Locate the "max_backlog=" option by substring search. std::string::find is required
// here: find_first_of would return the position of the first character that belongs to
// the set {m,a,x,_,b,c,k,l,o,g,=}, so any earlier option containing one of those
// characters would make the +12 skip below land on unrelated text.
2048 size_t index = options.find(
"max_backlog=");
2049 if (index != std::string::npos)
// 12 is the length of "max_backlog="; c then points at the first character of the value.
2052 const char* c = options.c_str() + index + 12;
// Accumulates a decimal value up to the next ',' or the end of the string (48 is ASCII '0').
// NOTE(review): no digit check is visible here -- confirm the server only sends digits.
2053 while (*c && *c !=
',')
2055 data = (data * 10) + (
unsigned)(*c++ -48);
// The visible statements only ever lower _ackBatchSize to the parsed backlog.
2057 if (_ackBatchSize > data)
2059 _ackBatchSize = data;
// 12 is also the length of the reason text "not entitled".
2065 const size_t NotEntitled = 12;
2066 std::string ackReason = ack.reason();
2067 if (ackReason.length() == 0)
2071 if (ackReason.length() == NotEntitled &&
2072 ackReason[0] ==
'n' &&
2077 message_.throwFor(_client, ackReason);
// No response arrived: a non-abandoned ack is reported as a timeout, otherwise the
// connection is reported as closed.
2081 if (!ack.abandoned())
2083 throw TimedOutException(
"timed out waiting for operation.");
2087 throw DisconnectedException(
"Connection closed while waiting for response.");
2101 AMPS_CALL_EXCEPTION_WRAPPER(ClientImpl::disconnect());
2102 _pEmptyMessageStream.reset(NULL);
2109 ClientImpl(
const std::string& clientName)
2110 : _client(NULL), _name(clientName)
2111 , _isRetryOnDisconnect(
true)
2112 , _lastSentHaSequenceNumber((amps_uint64_t)0), _logonInProgress(0)
2113 , _badTimeToHASubscribe(0), _serverVersion()
2114 , _queueAckTimeout(AMPS_DEFAULT_QUEUE_ACK_TIMEOUT)
2115 , _isAutoAckEnabled(
false)
2117 , _queuedAckCount(0)
2118 , _defaultMaxDepth(0)
2120 , _heartbeatInterval(0)
2123 _replayer.setClient(
this);
2126 (amps_handler)ClientImpl::ClientImplMessageHandler,
2129 (amps_predisconnect_handler)ClientImpl::ClientImplPreDisconnectHandler,
2132 (amps_handler)ClientImpl::ClientImplDisconnectHandler,
2134 _exceptionListener = &_defaultExceptionListener;
2135 for (
size_t i = 0; i < GlobalCommandTypeHandlers::COUNT; ++i)
2137 #ifdef AMPS_USE_EMPLACE 2145 virtual ~ClientImpl()
2150 const std::string& getName()
const 2155 const std::string& getNameHash()
// Returns the numeric name hash held in _nameHashValue.
// (The leading `const` on the next line closes the preceding getNameHash() declaration.)
// The return type is plain amps_uint64_t: a top-level const on a scalar returned by
// value is ignored by the language and only produces ignored-qualifier warnings.
const 2160 amps_uint64_t getNameHashValue()
const 2162 return _nameHashValue;
2165 void setName(
const std::string& name)
2172 AMPSException::throwFor(_client, result);
2177 const std::string& getLogonCorrelationData()
const 2179 return _logonCorrelationData;
2182 void setLogonCorrelationData(
const std::string& logonCorrelationData_)
2184 _logonCorrelationData = logonCorrelationData_;
2187 size_t getServerVersion()
const 2189 return _serverVersion.getOldStyleVersion();
2192 VersionInfo getServerVersionInfo()
const 2194 return _serverVersion;
2197 const std::string& getURI()
const 2202 virtual void connect(
const std::string& uri)
2204 Lock<Mutex> l(_lock);
2208 virtual void _connect(
const std::string& uri)
2214 AMPSException::throwFor(_client, result);
2221 _readMessage.setClientImpl(
this);
2222 if (_queueAckTimeout)
2227 AMPSException::throwFor(_client, result);
2231 broadcastConnectionStateChanged(ConnectionStateListener::Connected);
2234 void setDisconnected()
2237 Lock<Mutex> l(_lock);
2240 AMPS_CALL_EXCEPTION_WRAPPER(broadcastConnectionStateChanged(ConnectionStateListener::Disconnected));
2243 _heartbeatTimer.setTimeout(0.0);
2250 virtual void disconnect()
2252 AMPS_CALL_EXCEPTION_WRAPPER(flushAcks());
2254 AMPS_CALL_EXCEPTION_WRAPPER(processDeferredExecutions());
2255 Lock<Mutex> l(_lock);
2256 broadcastConnectionStateChanged(ConnectionStateListener::Shutdown);
// Marks every pending ack whose connection version is <= failedVersion as abandoned.
// The work is done while holding _ackMapLock. Keys of the affected entries are collected
// in `worklist` first and processed in a second loop.
// NOTE(review): the body of the second loop is not visible in this excerpt (presumably it
// removes the collected keys from _ackMap -- confirm).
2259 void clearAcks(
unsigned failedVersion)
2262 Lock<Mutex> guard(_ackMapLock);
2265 std::vector<std::string> worklist;
2266 for (AckMap::iterator i = _ackMap.begin(), e = _ackMap.end(); i != e; ++i)
2268 if (i->second.getConnectionVersion() <= failedVersion)
2270 i->second.setAbandoned();
2271 worklist.push_back(i->first);
2275 for (std::vector<std::string>::iterator j = worklist.begin(), e = worklist.end(); j != e; ++j)
// Locked entry point for sending: acquires _lock and returns the result of _send(message),
// using _send's default haSeq (0) and isHASubscribe_ (false) arguments.
2284 int send(
const Message& message)
2286 Lock<Mutex> l(_lock);
2287 return _send(message);
// Locked entry point for _sendWithoutRetry: acquires _lock, then hands message_ to
// _sendWithoutRetry.
// NOTE(review): lines are missing around the _logonInProgress test in this excerpt, so it
// is not certain that the DisconnectedException below is thrown solely because a logon is
// in progress -- confirm against the full source.
2290 void sendWithoutRetry(
const Message& message_)
2292 Lock<Mutex> l(_lock);
2295 if (_logonInProgress)
2297 throw DisconnectedException(
"The client has been disconnected.");
2299 _sendWithoutRetry(message_);
2302 void _sendWithoutRetry(
const Message& message_)
2307 AMPSException::throwFor(_client, result);
2311 int _send(
const Message& message, amps_uint64_t haSeq = (amps_uint64_t)0,
2312 bool isHASubscribe_ =
false)
2319 Message localMessage = message;
2320 unsigned version = 0;
2324 if (haSeq && _logonInProgress)
2328 if (!_isRetryOnDisconnect)
2332 if (!_lock.wait(1000))
2334 amps_invoke_waiting_function();
2339 if ((haSeq && haSeq <= _lastSentHaSequenceNumber) ||
2340 (isHASubscribe_ && _badTimeToHASubscribe))
2342 return (
int)version;
2346 if (haSeq > _lastSentHaSequenceNumber)
2348 while (haSeq > _lastSentHaSequenceNumber + 1)
2354 _lastSentHaSequenceNumber + 1))
2360 version = _replayer._version;
2363 catch (
const DisconnectedException&)
2365 catch (
const DisconnectedException& e)
2368 result = _replayer._res;
2373 localMessage.getMessage(),
2375 ++_lastSentHaSequenceNumber;
2379 if (_logonInProgress && localMessage.
getCommand().
data()[0] !=
'l')
2381 while (_logonInProgress)
2383 if (!_lock.wait(1000))
2385 amps_invoke_waiting_function();
2390 localMessage.getMessage(),
2395 if (!isHASubscribe_ && !haSeq &&
2396 localMessage.getMessage() == message.getMessage())
2400 if (_isRetryOnDisconnect)
2402 Unlock<Mutex> u(_lock);
2407 if ((isHASubscribe_ || haSeq) &&
2410 return (
int)version;
2417 AMPSException::throwFor(_client, result);
2423 amps_invoke_waiting_function();
2429 AMPSException::throwFor(_client, result);
2431 return (
int)version;
2434 void addMessageHandler(
const Field& commandId_,
2436 unsigned requestedAcks_,
bool isSubscribe_)
2438 Lock<Mutex> lock(_lock);
2439 _routes.addRoute(commandId_, messageHandler_, requestedAcks_,
2443 bool removeMessageHandler(
const Field& commandId_)
2445 Lock<Mutex> lock(_lock);
2446 return _routes.removeRoute(commandId_);
2454 bool isSubscribe =
false;
2455 bool isSubscribeOnly =
false;
2456 bool replace =
false;
2458 unsigned systemAddedAcks = Message::AckType::None;
2462 case Message::Command::Subscribe:
2463 case Message::Command::DeltaSubscribe:
2464 replace = message_.
getOptions().operator std::string().find(AMPS_OPTIONS_REPLACE, 0, strlen(AMPS_OPTIONS_REPLACE) - 1) != std::string::npos;
2465 isSubscribeOnly =
true;
2467 case Message::Command::SOWAndSubscribe:
2468 case Message::Command::SOWAndDeltaSubscribe:
2475 while (!replace &&
id != subId && _routes.hasRoute(
id))
2488 systemAddedAcks |= Message::AckType::Persisted;
2491 case Message::Command::SOW:
2498 while (!replace &&
id != subId && _routes.hasRoute(
id))
2509 if (!isSubscribeOnly)
2518 while (!replace && qid != subId && qid !=
id 2519 && _routes.hasRoute(qid))
2525 systemAddedAcks |= Message::AckType::Processed;
2527 if (!isSubscribeOnly)
2529 systemAddedAcks |= Message::AckType::Completed;
2533 int routesAdded = 0;
2534 Lock<Mutex> l(_lock);
2535 if (!subId.
empty() && messageHandler_.isValid())
2537 if (!_routes.hasRoute(subId))
2543 _routes.addRoute(subId, messageHandler_, requestedAcks,
2544 systemAddedAcks, isSubscribe);
2546 if (!isSubscribeOnly && !qid.
empty()
2547 && messageHandler_.isValid() && qid != subId)
2549 if (routesAdded == 0)
2551 _routes.addRoute(qid, messageHandler_,
2552 requestedAcks, systemAddedAcks,
false);
2558 Unlock<Mutex> u(_lock);
2559 data = amps_invoke_copy_route_function(
2560 messageHandler_.userData());
2564 _routes.addRoute(qid, messageHandler_, requestedAcks,
2565 systemAddedAcks,
false);
2569 _routes.addRoute(qid,
2572 requestedAcks, systemAddedAcks,
false);
2577 if (!
id.empty() && messageHandler_.isValid()
2578 && requestedAcks & ~
Message::AckType::Persisted
2579 &&
id != subId &&
id != qid)
2581 if (routesAdded == 0)
2583 _routes.addRoute(
id, messageHandler_, requestedAcks,
2584 systemAddedAcks,
false);
2590 Unlock<Mutex> u(_lock);
2591 data = amps_invoke_copy_route_function(
2592 messageHandler_.userData());
2596 _routes.addRoute(
id, messageHandler_, requestedAcks,
2597 systemAddedAcks,
false);
2601 _routes.addRoute(
id,
2605 systemAddedAcks,
false);
2614 syncAckProcessing(timeout_, message_, 0,
false);
2621 _routes.removeRoute(
id);
2628 case Message::Command::Unsubscribe:
2629 case Message::Command::Heartbeat:
2630 case Message::Command::Logon:
2631 case Message::Command::StartTimer:
2632 case Message::Command::StopTimer:
2633 case Message::Command::SOWDelete:
2635 Lock<Mutex> l(_lock);
2644 if (messageHandler_.isValid())
2646 _routes.addRoute(
id, messageHandler_, requestedAcks,
2647 Message::AckType::None,
false);
2653 case Message::Command::DeltaPublish:
2654 case Message::Command::Publish:
2657 Lock<Mutex> l(_lock);
2660 if (ackType != Message::AckType::None
2668 if (messageHandler_.isValid())
2670 _routes.addRoute(
id, messageHandler_, requestedAcks,
2671 Message::AckType::None,
false);
2677 syncAckProcessing(timeout_, message_, 0,
false);
2686 case Message::Command::GroupBegin:
2687 case Message::Command::GroupEnd:
2688 case Message::Command::OOF:
2689 case Message::Command::Ack:
2690 case Message::Command::Unknown:
2692 throw CommandException(
"Command type " + message_.
getCommand() +
" can not be sent directly to AMPS");
2698 void setDisconnectHandler(
const DisconnectHandler& disconnectHandler)
2700 Lock<Mutex> l(_lock);
2701 _disconnectHandler = disconnectHandler;
2704 void setGlobalCommandTypeMessageHandler(
const std::string& command_,
const MessageHandler& handler_)
2706 switch (command_[0])
2708 #if 0 // Not currently implemented to avoid an extra branch in delivery 2710 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::Publish] = handler_;
2713 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::SOW] = handler_;
2717 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::Heartbeat] = handler_;
2719 #if 0 // Not currently implemented to avoid an extra branch in delivery 2721 if (command_[6] ==
'b')
2723 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::GroupBegin] = handler_;
2725 else if (command_[6] ==
'e')
2727 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::GroupEnd] = handler_;
2731 std::ostringstream os;
2732 os <<
"Invalid command '" << command_ <<
"' passed to setGlobalCommandTypeHandler";
2733 throw CommandException(os.str());
2737 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::OOF] = handler_;
2741 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::Ack] = handler_;
2745 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::LastChance] = handler_;
2749 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::DuplicateMessage] = handler_;
2752 std::ostringstream os;
2753 os <<
"Invalid command '" << command_ <<
"' passed to setGlobalCommandTypeHandler";
2754 throw CommandException(os.str());
2759 void setGlobalCommandTypeMessageHandler(
const Message::Command::Type command_,
const MessageHandler& handler_)
2763 #if 0 // Not currently implemented to avoid an extra branch in delivery 2764 case Message::Command::Publish:
2765 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::Publish] = handler_;
2767 case Message::Command::SOW:
2768 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::SOW] = handler_;
2771 case Message::Command::Heartbeat:
2772 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::Heartbeat] = handler_;
2774 #if 0 // Not currently implemented to avoid an extra branch in delivery 2775 case Message::Command::GroupBegin:
2776 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::GroupBegin] = handler_;
2778 case Message::Command::GroupEnd:
2779 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::GroupEnd] = handler_;
2781 case Message::Command::OOF:
2782 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::OOF] = handler_;
2785 case Message::Command::Ack:
2786 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::Ack] = handler_;
2790 unsigned command = command_;
2797 AMPS_snprintf(errBuf,
sizeof(errBuf),
2798 "Invalid command '%.*s' passed to setGlobalCommandTypeHandler",
2799 CommandConstants<0>::Lengths[bits],
2800 CommandConstants<0>::Values[bits]);
2801 throw CommandException(errBuf);
// Installs handler_ in the _globalCommandTypeHandlers slot selected directly by
// handlerType_.
// NOTE(review): no lock acquisition and no range check on handlerType_ are visible in
// this excerpt -- confirm that the vector is always sized to cover every enumerator and
// that callers do not race with message delivery.
2806 void setGlobalCommandTypeMessageHandler(
const GlobalCommandTypeHandlers handlerType_,
const MessageHandler& handler_)
2808 _globalCommandTypeHandlers[handlerType_] = handler_;
2813 Lock<Mutex> l(_lock);
2814 _failedWriteHandler.reset(handler_);
// Replaces the client's publish store while holding _lock.
// NOTE(review): the condition guarding the AlreadyConnectedException is not visible in
// this excerpt; judging by the message it is raised when the client is already
// connected -- confirm.
2817 void setPublishStore(
const Store& publishStore_)
2819 Lock<Mutex> l(_lock);
2822 throw AlreadyConnectedException(
"Setting a publish store on a connected client is undefined behavior");
2824 _publishStore = publishStore_;
2829 Lock<Mutex> l(_lock);
2832 throw AlreadyConnectedException(
"Setting a bookmark store on a connected client is undefined behavior");
2834 _bookmarkStore = bookmarkStore_;
2839 Lock<Mutex> l(_lock);
2840 _subscriptionManager.reset(subscriptionManager_);
2848 DisconnectHandler getDisconnectHandler()
const 2850 return _disconnectHandler;
2855 return _globalCommandTypeHandlers[GlobalCommandTypeHandlers::DuplicateMessage];
2863 Store getPublishStore()
const 2865 return _publishStore;
2870 return _bookmarkStore;
// Publishes data_ to topic_.
// Two paths are visible; the condition choosing between them is missing from this
// excerpt (presumably whether a publish store is set -- confirm):
//  - direct path: under _lock, the payload is assigned to the reusable _publishMessage,
//    which is then sent with _send;
//  - store path: the file-level publishStoreMessage pointer is allocated on first use,
//    registered with PerThreadMessageTracker for cleanup, reset, marked as a Publish
//    command, and the call is completed by _publish, whose result is returned.
2873 amps_uint64_t publish(
const char* topic_,
size_t topicLen_,
const char* data_,
size_t dataLen_)
2877 Lock<Mutex> l(_lock);
2879 _publishMessage.assignData(data_, dataLen_);
2880 _send(_publishMessage);
// Lazily create the message object used by the store path.
2885 if (!publishStoreMessage)
2887 publishStoreMessage =
new Message();
2888 PerThreadMessageTracker::addMessageToCleanupList(publishStoreMessage);
2890 publishStoreMessage->reset();
2891 publishStoreMessage->setCommandEnum(Message::Command::Publish);
2892 return _publish(topic_, topicLen_, data_, dataLen_);
// Publishes dataLen_ bytes at data_ to the topic named by topic_/topicLen_,
// attaching expiration_ to the message, and returns an amps_uint64_t.
// NOTE(review): this listing omits several original lines (including the
// branch condition that selects between the two paths below), so only the
// visible statements are described.
2896 amps_uint64_t publish(
const char* topic_,
size_t topicLen_,
const char* data_,
2897 size_t dataLen_,
unsigned long expiration_)
// First path: under the client lock, fill the reusable _publishMessage.
2901 Lock<Mutex> l(_lock);
2903 _publishMessage.assignData(data_, dataLen_);
// convertToCharArray writes the decimal digits at the tail of the buffer and
// returns the index of the first digit, hence the (buf + pos, LEN - pos) pair.
2904 char exprBuf[AMPS_NUMBER_BUFFER_LEN];
2905 size_t pos = convertToCharArray(exprBuf, expiration_);
2906 _publishMessage.
assignExpiration(exprBuf + pos, AMPS_NUMBER_BUFFER_LEN - pos);
2907 _send(_publishMessage);
// Second path: allocate publishStoreMessage on first use and register it
// with PerThreadMessageTracker's cleanup list.
2913 if (!publishStoreMessage)
2915 publishStoreMessage =
new Message();
2916 PerThreadMessageTracker::addMessageToCleanupList(publishStoreMessage);
// Reset the message, then set the command type and the expiration digits
// before handing the topic and payload to _publish.
2918 publishStoreMessage->reset();
2919 char exprBuf[AMPS_NUMBER_BUFFER_LEN];
2920 size_t exprPos = convertToCharArray(exprBuf, expiration_);
2921 publishStoreMessage->setCommandEnum(Message::Command::Publish)
2922 .assignExpiration(exprBuf + exprPos,
2923 AMPS_NUMBER_BUFFER_LEN - exprPos);
2924 return _publish(topic_, topicLen_, data_, dataLen_);
2931 ClientImpl* _pClient;
2933 #if __cplusplus >= 201100L || _MSC_VER >= 1900 2934 std::atomic<bool> _acked;
2935 std::atomic<bool> _disconnected;
2937 volatile bool _acked;
2938 volatile bool _disconnected;
2941 FlushAckHandler(ClientImpl* pClient_)
2942 : _pClient(pClient_), _cmdId(), _acked(
false), _disconnected(
false)
2944 pClient_->addConnectionStateListener(
this);
2948 _pClient->removeConnectionStateListener(
this);
2949 _pClient->removeMessageHandler(_cmdId);
2952 void setCommandId(
const Field& cmdId_)
2960 void connectionStateChanged(
State state_)
2962 if (state_ <= Shutdown)
2964 _disconnected =
true;
2973 return _acked || _disconnected;
// Waits for outstanding published messages to be acknowledged, using either
// a "flush" command sent to the server, the publish store's own flush, or a
// plain sleep, depending on what is visible below.
// timeout_ is passed to _publishStore.flush(), scaled into AMPS_USLEEP and used
// to build a Timer; ackType_ must be Processed or Persisted.
// Throws CommandException, DisconnectedException or TimedOutException as shown.
// NOTE(review): many original lines (message construction, several branch
// conditions, try/else keywords) are missing from this listing, so the
// comments below describe only the statements that are visible.
2977 void publishFlush(
long timeout_,
unsigned ackType_)
// String constants and their lengths are computed once and cached in statics.
2979 static const char* processed =
"processed";
2980 static const size_t processedLen = strlen(processed);
2981 static const char* persisted =
"persisted";
2982 static const size_t persistedLen = strlen(persisted);
2983 static const char* flush =
"flush";
2984 static const size_t flushLen = strlen(flush);
// Server version thresholds compared against _serverVersion below.
2985 static VersionInfo minPersisted(
"5.3.3.0");
2986 static VersionInfo minFlush(
"4");
// Only processed and persisted acks are accepted for a flush.
2987 if (ackType_ != Message::AckType::Processed
2988 && ackType_ != Message::AckType::Persisted)
2990 throw CommandException(
"Flush can only be used with processed or persisted acks.");
// Handler object that records the ack / disconnect outcome for this flush.
2992 FlushAckHandler flushHandler(
this);
2993 if (_serverVersion >= minFlush)
2995 Lock<Mutex> l(_lock);
2998 throw DisconnectedException(
"Not connected trying to flush");
// Servers older than minPersisted, or an explicit Processed request, take
// this branch (its body is not visible in this listing).
3003 if (_serverVersion < minPersisted
3004 || ackType_ == Message::AckType::Processed)
3014 std::bind(&FlushAckHandler::invoke,
3015 std::ref(flushHandler),
3016 std::placeholders::_1),
3018 NoDelay noDelay(_client);
// A -1 result from _send is reported as a disconnect.
3019 if (_send(_message) == -1)
3021 throw DisconnectedException(
"Disconnected trying to flush");
3028 _publishStore.
flush(timeout_);
3030 catch (
const AMPSException& ex)
3032 AMPS_UNHANDLED_EXCEPTION(ex);
// Servers older than minFlush: only a sleep is visible in this branch.
3036 else if (_serverVersion < minFlush)
3040 AMPS_USLEEP(timeout_ * 1000);
3044 AMPS_USLEEP(1000 * 1000);
// Bounded wait: loop until the timer check succeeds or the handler is done.
3050 Timer timer((
double)timeout_);
3052 while (!timer.check() && !flushHandler.done())
3055 amps_invoke_waiting_function();
// Unbounded wait: loop until the handler reports done.
3060 while (!flushHandler.done())
3063 amps_invoke_waiting_function();
// Leaving the wait without done() being true is reported as a timeout.
3067 if (!flushHandler.done())
3069 throw TimedOutException(
"Timed out waiting for flush");
// Done but not acked, with no valid publish store: report a disconnect.
3072 if (!flushHandler.acked() && !_publishStore.
isValid())
3074 throw DisconnectedException(
"Disconnected waiting for flush");
// Delta-publishes dataLength_ bytes at data_ to the topic named by
// topic_/topicLength_ and returns an amps_uint64_t.
// NOTE(review): this listing omits several original lines (including the
// branch condition that selects between the two paths below), so only the
// visible statements are described.
3078 amps_uint64_t deltaPublish(
const char* topic_,
size_t topicLength_,
3079 const char* data_,
size_t dataLength_)
// First path: under the client lock, load the payload into the reusable
// _deltaMessage member and send it directly.
3083 Lock<Mutex> l(_lock);
3085 _deltaMessage.assignData(data_, dataLength_);
3086 _send(_deltaMessage);
// Second path: allocate publishStoreMessage on first use and register it
// with PerThreadMessageTracker's cleanup list.
3091 if (!publishStoreMessage)
3093 publishStoreMessage =
new Message();
3094 PerThreadMessageTracker::addMessageToCleanupList(publishStoreMessage);
// Reset the message, mark it as a delta publish and hand the topic and
// payload to _publish, whose result is returned.
3096 publishStoreMessage->reset();
3097 publishStoreMessage->setCommandEnum(Message::Command::DeltaPublish);
3098 return _publish(topic_, topicLength_, data_, dataLength_);
// Delta-publishes dataLength_ bytes at data_ to the topic named by
// topic_/topicLength_, attaching expiration_ to the message, and returns an
// amps_uint64_t.
// NOTE(review): this listing omits several original lines (including the
// branch condition that selects between the two paths below), so only the
// visible statements are described.
3102 amps_uint64_t deltaPublish(
const char* topic_,
size_t topicLength_,
3103 const char* data_,
size_t dataLength_,
3104 unsigned long expiration_)
// First path: under the client lock, fill the reusable _deltaMessage.
3108 Lock<Mutex> l(_lock);
3110 _deltaMessage.assignData(data_, dataLength_);
// convertToCharArray writes the decimal digits at the tail of the buffer and
// returns the index of the first digit, hence the (buf + pos, LEN - pos) pair.
3111 char exprBuf[AMPS_NUMBER_BUFFER_LEN];
3112 size_t pos = convertToCharArray(exprBuf, expiration_);
3113 _deltaMessage.
assignExpiration(exprBuf + pos, AMPS_NUMBER_BUFFER_LEN - pos);
3114 _send(_deltaMessage);
// Second path: allocate publishStoreMessage on first use and register it
// with PerThreadMessageTracker's cleanup list.
3120 if (!publishStoreMessage)
3122 publishStoreMessage =
new Message();
3123 PerThreadMessageTracker::addMessageToCleanupList(publishStoreMessage);
// Reset the message, then set the command type and the expiration digits
// before handing the topic and payload to _publish.
3125 publishStoreMessage->reset();
3126 char exprBuf[AMPS_NUMBER_BUFFER_LEN];
3127 size_t exprPos = convertToCharArray(exprBuf, expiration_);
3128 publishStoreMessage->setCommandEnum(Message::Command::DeltaPublish)
3129 .assignExpiration(exprBuf + exprPos,
3130 AMPS_NUMBER_BUFFER_LEN - exprPos);
3131 return _publish(topic_, topicLength_, data_, dataLength_);
3135 amps_uint64_t _publish(
const char* topic_,
size_t topicLength_,
3136 const char* data_,
size_t dataLength_)
3138 publishStoreMessage->assignTopic(topic_, topicLength_)
3139 .setAckTypeEnum(Message::AckType::Persisted)
3140 .assignData(data_, dataLength_);
3141 amps_uint64_t haSequenceNumber = _publishStore.
store(*publishStoreMessage);
3142 char buf[AMPS_NUMBER_BUFFER_LEN];
3143 size_t pos = convertToCharArray(buf, haSequenceNumber);
3144 publishStoreMessage->assignSequence(buf + pos, AMPS_NUMBER_BUFFER_LEN - pos);
3146 Lock<Mutex> l(_lock);
3147 _send(*publishStoreMessage, haSequenceNumber);
3149 return haSequenceNumber;
3152 virtual std::string logon(
long timeout_,
Authenticator& authenticator_,
3153 const char* options_ = NULL)
3155 Lock<Mutex> l(_lock);
3156 return _logon(timeout_, authenticator_, options_);
3159 virtual std::string _logon(
long timeout_,
Authenticator& authenticator_,
3160 const char* options_ = NULL)
3167 #ifdef AMPS_CLIENT_VERSION_WITH_LANGUAGE 3169 strlen(AMPS_CLIENT_VERSION_WITH_LANGUAGE));
3172 if (uri.user().size())
3176 if (uri.password().size())
3180 if (uri.protocol() ==
"amps" && uri.messageType().size())
3184 if (uri.isTrue(
"pretty"))
3190 if (!_logonCorrelationData.empty())
3201 AtomicFlagFlip pubFlip(&_logonInProgress);
3202 NoDelay noDelay(_client);
3206 AckResponse ack = syncAckProcessing(timeout_, _message);
3207 if (ack.status() ==
"retry")
3209 _message.
setPassword(authenticator_.
retry(ack.username(), ack.password()));
3210 _username = ack.username();
3215 authenticator_.
completed(ack.username(), ack.password(), ack.reason());
3219 broadcastConnectionStateChanged(ConnectionStateListener::LoggedOn);
3226 catch (
const AMPSException& ex)
3229 AMPS_UNHANDLED_EXCEPTION(ex);
3242 _publishStore.
replay(_replayer);
3243 broadcastConnectionStateChanged(ConnectionStateListener::PublishReplayed);
3245 catch (
const PublishStoreGapException& ex)
3248 AMPS_UNHANDLED_EXCEPTION(ex);
3251 catch (
const StoreException& ex)
3254 std::ostringstream os;
3255 os <<
"A local store exception occurred while logging on." 3257 throw ConnectionException(os.str());
3259 catch (
const AMPSException& ex)
3262 AMPS_UNHANDLED_EXCEPTION(ex);
3265 catch (
const std::exception& ex)
3268 AMPS_UNHANDLED_EXCEPTION(ex);
3278 return newCommandId;
3282 const std::string& topic_,
3284 const std::string& filter_,
3285 const std::string& bookmark_,
3286 const std::string& options_,
3287 const std::string& subId_,
3288 bool isHASubscribe_ =
true)
3290 isHASubscribe_ &= (bool)_subscriptionManager;
3291 Lock<Mutex> l(_lock);
3295 std::string subId(subId_);
3298 if (options_.find(AMPS_OPTIONS_REPLACE, 0, strlen(AMPS_OPTIONS_REPLACE) - 1) != std::string::npos)
3300 throw ConnectionException(
"Cannot issue a replacement subscription; a valid subscription id is required.");
3310 unsigned ackTypes = Message::AckType::Processed;
3312 if (!bookmark_.empty() && _bookmarkStore.isValid())
3314 ackTypes |= Message::AckType::Persisted;
3318 if (filter_.length())
3322 if (bookmark_.length())
3332 if (_bookmarkStore.isValid())
3337 _bookmarkStore.
log(_message);
3338 _bookmarkStore.
discard(_message);
3344 if (options_.length())
3353 Unlock<Mutex> u(_lock);
3354 _subscriptionManager->subscribe(messageHandler_, message,
3355 Message::AckType::None);
3356 if (_badTimeToHASubscribe)
3364 Message::AckType::None, ackTypes,
true);
3367 if (!options_.empty())
3373 syncAckProcessing(timeout_, message, isHASubscribe_);
3375 catch (
const DisconnectedException&)
3377 if (!isHASubscribe_)
3379 _routes.removeRoute(subIdField);
3384 AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(subIdField));
3388 catch (
const TimedOutException&)
3390 AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(subIdField));
3398 Unlock<Mutex> unlock(_lock);
3399 _subscriptionManager->unsubscribe(subIdField);
3401 _routes.removeRoute(subIdField);
3407 std::string deltaSubscribe(
const MessageHandler& messageHandler_,
3408 const std::string& topic_,
3410 const std::string& filter_,
3411 const std::string& bookmark_,
3412 const std::string& options_,
3413 const std::string& subId_ =
"",
3414 bool isHASubscribe_ =
true)
3416 isHASubscribe_ &= (bool)_subscriptionManager;
3417 Lock<Mutex> l(_lock);
3421 std::string subId(subId_);
3431 unsigned ackTypes = Message::AckType::Processed;
3433 if (!bookmark_.empty() && _bookmarkStore.isValid())
3435 ackTypes |= Message::AckType::Persisted;
3438 if (filter_.length())
3442 if (bookmark_.length())
3452 if (_bookmarkStore.isValid())
3457 _bookmarkStore.
log(_message);
3458 _bookmarkStore.
discard(_message);
3464 if (options_.length())
3472 Unlock<Mutex> u(_lock);
3473 _subscriptionManager->subscribe(messageHandler_, message,
3474 Message::AckType::None);
3475 if (_badTimeToHASubscribe)
3483 Message::AckType::None, ackTypes,
true);
3486 if (!options_.empty())
3492 syncAckProcessing(timeout_, message, isHASubscribe_);
3494 catch (
const DisconnectedException&)
3496 if (!isHASubscribe_)
3498 _routes.removeRoute(subIdField);
3502 catch (
const TimedOutException&)
3504 AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(subIdField));
3512 Unlock<Mutex> unlock(_lock);
3513 _subscriptionManager->unsubscribe(subIdField);
3515 _routes.removeRoute(subIdField);
3521 void unsubscribe(
const std::string&
id)
3523 Lock<Mutex> l(_lock);
3524 unsubscribeInternal(
id);
3527 void unsubscribe(
void)
3529 if (_subscriptionManager)
3531 _subscriptionManager->clear();
3534 _routes.unsubscribeAll();
3535 Lock<Mutex> l(_lock);
3540 _sendWithoutRetry(_message);
3542 deferredExecution(&s_noOpFn, NULL);
3546 const std::string& topic_,
3547 const std::string& filter_ =
"",
3548 const std::string& orderBy_ =
"",
3549 const std::string& bookmark_ =
"",
3550 int batchSize_ = AMPS_DEFAULT_BATCH_SIZE,
3551 int topN_ = AMPS_DEFAULT_TOP_N,
3552 const std::string& options_ =
"",
3553 long timeout_ = AMPS_DEFAULT_COMMAND_TIMEOUT)
3555 Lock<Mutex> l(_lock);
3562 unsigned ackTypes = Message::AckType::Processed | Message::AckType::Completed;
3565 if (filter_.length())
3569 if (orderBy_.length())
3573 if (bookmark_.length())
3578 if (topN_ != AMPS_DEFAULT_TOP_N)
3582 if (options_.length())
3587 _routes.addRoute(_message.
getQueryID(), messageHandler_,
3588 Message::AckType::None, ackTypes,
false);
3592 syncAckProcessing(timeout_, _message);
3596 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId));
3604 const std::string& topic_,
3606 const std::string& filter_ =
"",
3607 int batchSize_ = AMPS_DEFAULT_BATCH_SIZE,
3608 int topN_ = AMPS_DEFAULT_TOP_N)
3611 return sow(messageHandler_,
3622 std::string sowAndSubscribe(
const MessageHandler& messageHandler_,
3623 const std::string& topic_,
3624 const std::string& filter_ =
"",
3625 const std::string& orderBy_ =
"",
3626 const std::string& bookmark_ =
"",
3627 int batchSize_ = AMPS_DEFAULT_BATCH_SIZE,
3628 int topN_ = AMPS_DEFAULT_TOP_N,
3629 const std::string& options_ =
"",
3630 long timeout_ = AMPS_DEFAULT_COMMAND_TIMEOUT,
3631 bool isHASubscribe_ =
true)
3633 isHASubscribe_ &= (bool)_subscriptionManager;
3634 unsigned ackTypes = Message::AckType::Processed;
3635 Lock<Mutex> l(_lock);
3640 std::string subId = cid;
3642 if (filter_.length())
3646 if (orderBy_.length())
3650 if (bookmark_.length())
3654 if (_bookmarkStore.isValid())
3656 ackTypes |= Message::AckType::Persisted;
3664 _bookmarkStore.
log(_message);
3665 if (!BookmarkRange::isRange(bookmark))
3667 _bookmarkStore.
discard(_message);
3679 if (topN_ != AMPS_DEFAULT_TOP_N)
3683 if (options_.length())
3692 Unlock<Mutex> u(_lock);
3693 _subscriptionManager->subscribe(messageHandler_, message,
3694 Message::AckType::None);
3695 if (_badTimeToHASubscribe)
3700 _routes.addRoute(cid, messageHandler_,
3701 Message::AckType::None, ackTypes,
true);
3703 if (!options_.empty())
3709 syncAckProcessing(timeout_, message, isHASubscribe_);
3711 catch (
const DisconnectedException&)
3713 if (!isHASubscribe_)
3715 _routes.removeRoute(subId);
3719 catch (
const TimedOutException&)
3721 AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(subId));
3729 Unlock<Mutex> unlock(_lock);
3730 _subscriptionManager->unsubscribe(cid);
3732 _routes.removeRoute(subId);
3738 std::string sowAndSubscribe(
const MessageHandler& messageHandler_,
3739 const std::string& topic_,
3741 const std::string& filter_ =
"",
3742 int batchSize_ = AMPS_DEFAULT_BATCH_SIZE,
3743 bool oofEnabled_ =
false,
3744 int topN_ = AMPS_DEFAULT_TOP_N,
3745 bool isHASubscribe_ =
true)
3748 return sowAndSubscribe(messageHandler_,
3755 (oofEnabled_ ?
"oof" :
""),
3760 std::string sowAndDeltaSubscribe(
const MessageHandler& messageHandler_,
3761 const std::string& topic_,
3762 const std::string& filter_ =
"",
3763 const std::string& orderBy_ =
"",
3764 int batchSize_ = AMPS_DEFAULT_BATCH_SIZE,
3765 int topN_ = AMPS_DEFAULT_TOP_N,
3766 const std::string& options_ =
"",
3767 long timeout_ = AMPS_DEFAULT_COMMAND_TIMEOUT,
3768 bool isHASubscribe_ =
true)
3770 isHASubscribe_ &= (bool)_subscriptionManager;
3771 Lock<Mutex> l(_lock);
3779 if (filter_.length())
3783 if (orderBy_.length())
3788 if (topN_ != AMPS_DEFAULT_TOP_N)
3792 if (options_.length())
3800 Unlock<Mutex> u(_lock);
3801 _subscriptionManager->subscribe(messageHandler_, message,
3802 Message::AckType::None);
3803 if (_badTimeToHASubscribe)
3808 _routes.addRoute(message.
getQueryID(), messageHandler_,
3809 Message::AckType::None, Message::AckType::Processed,
true);
3811 if (!options_.empty())
3817 syncAckProcessing(timeout_, message, isHASubscribe_);
3819 catch (
const DisconnectedException&)
3821 if (!isHASubscribe_)
3823 _routes.removeRoute(subId);
3827 catch (
const TimedOutException&)
3829 AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(subId));
3837 Unlock<Mutex> unlock(_lock);
3838 _subscriptionManager->unsubscribe(
Field(subId));
3840 _routes.removeRoute(subId);
3846 std::string sowAndDeltaSubscribe(
const MessageHandler& messageHandler_,
3847 const std::string& topic_,
3849 const std::string& filter_ =
"",
3850 int batchSize_ = AMPS_DEFAULT_BATCH_SIZE,
3851 bool oofEnabled_ =
false,
3852 bool sendEmpties_ =
false,
3853 int topN_ = AMPS_DEFAULT_TOP_N,
3854 bool isHASubscribe_ =
true)
3862 if (sendEmpties_ ==
false)
3866 return sowAndDeltaSubscribe(messageHandler_,
3878 const std::string& topic_,
3879 const std::string& filter_,
3885 unsigned ackType = Message::AckType::Processed |
3886 Message::AckType::Stats |
3887 Message::AckType::Persisted;
3888 if (!publishStoreMessage)
3890 publishStoreMessage =
new Message();
3891 PerThreadMessageTracker::addMessageToCleanupList(publishStoreMessage);
3893 publishStoreMessage->reset();
3894 if (commandId_.
empty())
3896 publishStoreMessage->newCommandId();
3897 commandId_ = publishStoreMessage->getCommandId();
3901 publishStoreMessage->setCommandId(commandId_.
data(), commandId_.
len());
3903 publishStoreMessage->setCommandEnum(Message::Command::SOWDelete)
3904 .assignSubscriptionId(commandId_.
data(), commandId_.
len())
3905 .assignQueryID(commandId_.
data(), commandId_.
len())
3906 .setAckTypeEnum(ackType)
3907 .assignTopic(topic_.c_str(), topic_.length())
3908 .assignFilter(filter_.c_str(), filter_.length());
3909 amps_uint64_t haSequenceNumber = _publishStore.
store(*publishStoreMessage);
3910 char buf[AMPS_NUMBER_BUFFER_LEN];
3911 size_t pos = convertToCharArray(buf, haSequenceNumber);
3912 publishStoreMessage->assignSequence(buf + pos, AMPS_NUMBER_BUFFER_LEN - pos);
3916 Lock<Mutex> l(_lock);
3917 _routes.addRoute(commandId_, messageHandler_,
3918 Message::AckType::Stats,
3919 Message::AckType::Processed | Message::AckType::Persisted,
3921 syncAckProcessing(timeout_, *publishStoreMessage,
3924 catch (
const DisconnectedException&)
3931 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId_));
3935 return (std::string)commandId_;
3939 Lock<Mutex> l(_lock);
3941 if (commandId_.
empty())
3952 .assignQueryID(commandId_.
data(), commandId_.
len())
3953 .setAckTypeEnum(Message::AckType::Processed |
3954 Message::AckType::Stats)
3956 .assignFilter(filter_.c_str(), filter_.length());
3957 _routes.addRoute(commandId_, messageHandler_,
3958 Message::AckType::Stats,
3959 Message::AckType::Processed,
3963 syncAckProcessing(timeout_, _message);
3967 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId_));
3970 return (std::string)commandId_;
3974 std::string sowDeleteByData(
const MessageHandler& messageHandler_,
3975 const std::string& topic_,
3976 const std::string& data_,
3982 unsigned ackType = Message::AckType::Processed |
3983 Message::AckType::Stats |
3984 Message::AckType::Persisted;
3985 if (!publishStoreMessage)
3987 publishStoreMessage =
new Message();
3988 PerThreadMessageTracker::addMessageToCleanupList(publishStoreMessage);
3990 publishStoreMessage->reset();
3991 if (commandId_.
empty())
3993 publishStoreMessage->newCommandId();
3994 commandId_ = publishStoreMessage->getCommandId();
3998 publishStoreMessage->setCommandId(commandId_.
data(), commandId_.
len());
4000 publishStoreMessage->setCommandEnum(Message::Command::SOWDelete)
4001 .assignSubscriptionId(commandId_.
data(), commandId_.
len())
4002 .assignQueryID(commandId_.
data(), commandId_.
len())
4003 .setAckTypeEnum(ackType)
4004 .assignTopic(topic_.c_str(), topic_.length())
4005 .assignData(data_.c_str(), data_.length());
4006 amps_uint64_t haSequenceNumber = _publishStore.
store(*publishStoreMessage);
4007 char buf[AMPS_NUMBER_BUFFER_LEN];
4008 size_t pos = convertToCharArray(buf, haSequenceNumber);
4009 publishStoreMessage->assignSequence(buf + pos, AMPS_NUMBER_BUFFER_LEN - pos);
4013 Lock<Mutex> l(_lock);
4014 _routes.addRoute(commandId_, messageHandler_,
4015 Message::AckType::Stats,
4016 Message::AckType::Processed | Message::AckType::Persisted,
4018 syncAckProcessing(timeout_, *publishStoreMessage,
4021 catch (
const DisconnectedException&)
4028 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId_));
4032 return (std::string)commandId_;
4036 Lock<Mutex> l(_lock);
4038 if (commandId_.
empty())
4049 .assignQueryID(commandId_.
data(), commandId_.
len())
4050 .setAckTypeEnum(Message::AckType::Processed |
4051 Message::AckType::Stats)
4053 .assignData(data_.c_str(), data_.length());
4054 _routes.addRoute(commandId_, messageHandler_,
4055 Message::AckType::Stats,
4056 Message::AckType::Processed,
4060 syncAckProcessing(timeout_, _message);
4064 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId_));
4067 return (std::string)commandId_;
4071 std::string sowDeleteByKeys(
const MessageHandler& messageHandler_,
4072 const std::string& topic_,
4073 const std::string& keys_,
4079 unsigned ackType = Message::AckType::Processed |
4080 Message::AckType::Stats |
4081 Message::AckType::Persisted;
4082 if (!publishStoreMessage)
4084 publishStoreMessage =
new Message();
4085 PerThreadMessageTracker::addMessageToCleanupList(publishStoreMessage);
4087 publishStoreMessage->reset();
4088 if (commandId_.
empty())
4090 publishStoreMessage->newCommandId();
4091 commandId_ = publishStoreMessage->getCommandId();
4095 publishStoreMessage->setCommandId(commandId_.
data(), commandId_.
len());
4097 publishStoreMessage->setCommandEnum(Message::Command::SOWDelete)
4098 .assignSubscriptionId(commandId_.
data(), commandId_.
len())
4099 .assignQueryID(commandId_.
data(), commandId_.
len())
4100 .setAckTypeEnum(ackType)
4101 .assignTopic(topic_.c_str(), topic_.length())
4102 .assignSowKeys(keys_.c_str(), keys_.length());
4103 amps_uint64_t haSequenceNumber = _publishStore.
store(*publishStoreMessage);
4104 char buf[AMPS_NUMBER_BUFFER_LEN];
4105 size_t pos = convertToCharArray(buf, haSequenceNumber);
4106 publishStoreMessage->assignSequence(buf + pos, AMPS_NUMBER_BUFFER_LEN - pos);
4110 Lock<Mutex> l(_lock);
4111 _routes.addRoute(commandId_, messageHandler_,
4112 Message::AckType::Stats,
4113 Message::AckType::Processed | Message::AckType::Persisted,
4115 syncAckProcessing(timeout_, *publishStoreMessage,
4118 catch (
const DisconnectedException&)
4125 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId_));
4129 return (std::string)commandId_;
4133 Lock<Mutex> l(_lock);
4135 if (commandId_.
empty())
4146 .assignQueryID(commandId_.
data(), commandId_.
len())
4147 .setAckTypeEnum(Message::AckType::Processed |
4148 Message::AckType::Stats)
4150 .assignSowKeys(keys_.c_str(), keys_.length());
4151 _routes.addRoute(commandId_, messageHandler_,
4152 Message::AckType::Stats,
4153 Message::AckType::Processed,
4157 syncAckProcessing(timeout_, _message);
4161 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId_));
4164 return (std::string)commandId_;
4168 void startTimer(
void)
4170 if (_serverVersion >=
"5.3.2.0")
4172 throw CommandException(
"The start_timer command is deprecated.");
4174 Lock<Mutex> l(_lock);
4183 if (_serverVersion >=
"5.3.2.0")
4185 throw CommandException(
"The stop_timer command is deprecated.");
4187 return executeAsync(
Command(
"stop_timer").addAckType(
"completed"), messageHandler_);
4202 void setExceptionListener(
const std::shared_ptr<const ExceptionListener>& pListener_)
4204 _pExceptionListener = pListener_;
4205 _exceptionListener = _pExceptionListener.get();
4210 _exceptionListener = &listener_;
4215 return *_exceptionListener;
// Updates the heartbeat interval and the socket read timeout.
// Throws UsageException when readTimeout_ is smaller than heartbeatInterval_.
// NOTE(review): the end of this function is not visible in this listing;
// whatever follows the two assignments is not described here.
4218 void setHeartbeat(
unsigned heartbeatInterval_,
unsigned readTimeout_)
// Validate the arguments before taking the lock.
4220 if (readTimeout_ < heartbeatInterval_)
4222 throw UsageException(
"The socket read timeout must be >= the heartbeat interval.");
4224 Lock<Mutex> l(_lock);
// Only store the new values when at least one of them actually changed.
4225 if (_heartbeatInterval != heartbeatInterval_ ||
4226 _readTimeout != readTimeout_)
4228 _heartbeatInterval = heartbeatInterval_;
4229 _readTimeout = readTimeout_;
4234 void _sendHeartbeat(
void)
4236 if (_connected && _heartbeatInterval != 0)
4238 std::ostringstream options;
4239 options <<
"start," << _heartbeatInterval;
4242 _heartbeatTimer.setTimeout(_heartbeatInterval * 1000.0);
4243 _heartbeatTimer.start();
4246 _sendWithoutRetry(_beatMessage);
4247 broadcastConnectionStateChanged(ConnectionStateListener::HeartbeatInitiated);
4249 catch (ConnectionException& ex_)
4253 AMPS_UNHANDLED_EXCEPTION(ex_);
4258 if (_readTimeout && _connected)
4264 AMPSException::throwFor(_client, result);
4270 Lock<Mutex> lock(_lock);
4271 _connectionStateListeners.insert(listener_);
4276 Lock<Mutex> lock(_lock);
4277 _connectionStateListeners.erase(listener_);
4280 void clearConnectionStateListeners()
4282 Lock<Mutex> lock(_lock);
4283 _connectionStateListeners.clear();
4288 unsigned systemAddedAcks_,
bool isSubscribe_)
4290 Message message = command_.getMessage();
4295 bool added = qid.
len() || subid.
len() || cid_.
len();
4296 bool cidIsQid = cid_ == qid;
4297 bool cidUnique = !cidIsQid && cid_.
len() > 0 && cid_ != subid;
4299 if (subid.
len() > 0)
4303 addedCount += _routes.addRoute(subid, handler_, requestedAcks_,
4304 systemAddedAcks_, isSubscribe_);
4306 && (commandType == Message::Command::Subscribe
4307 || commandType == Message::Command::DeltaSubscribe))
4314 if (qid.
len() > 0 && qid != subid
4315 && (commandType == Message::Command::SOW
4316 || commandType == Message::Command::SOWDelete
4317 || commandType == Message::Command::SOWAndSubscribe
4318 || commandType == Message::Command::SOWAndDeltaSubscribe))
4320 while (_routes.hasRoute(qid))
4329 if (addedCount == 0)
4331 _routes.addRoute(qid, handler_, requestedAcks_,
4332 systemAddedAcks_, isSubscribe_);
4338 Unlock<Mutex> u(_lock);
4339 data = amps_invoke_copy_route_function(handler_.userData());
4343 _routes.addRoute(qid, handler_, requestedAcks_,
4344 systemAddedAcks_,
false);
4348 _routes.addRoute(qid,
4352 systemAddedAcks_,
false);
4357 if (cidUnique && requestedAcks_ & ~Message::AckType::Persisted)
4359 while (_routes.hasRoute(cid_))
4363 if (addedCount == 0)
4365 _routes.addRoute(cid_, handler_, requestedAcks_,
4366 systemAddedAcks_,
false);
4372 Unlock<Mutex> u(_lock);
4373 data = amps_invoke_copy_route_function(handler_.userData());
4377 _routes.addRoute(cid_, handler_, requestedAcks_,
4378 systemAddedAcks_,
false);
4382 _routes.addRoute(cid_,
4386 systemAddedAcks_,
false);
4390 else if ((commandType == Message::Command::Publish ||
4391 commandType == Message::Command::DeltaPublish)
4392 && requestedAcks_ & ~
Message::AckType::Persisted)
4395 _routes.addRoute(cid_, handler_, requestedAcks_,
4396 systemAddedAcks_,
false);
4401 throw UsageException(
"To use a messagehandler, you must also supply a command or subscription ID.");
4406 bool isHASubscribe_ =
true)
4408 isHASubscribe_ &= (bool)_subscriptionManager;
4409 Message& message = command_.getMessage();
4410 unsigned systemAddedAcks = (handler_.isValid() || command_.hasProcessedAck()) ?
4411 Message::AckType::Processed : Message::AckType::None;
4413 bool isPublishStore = _publishStore.
isValid() && command_.needsSequenceNumber();
4415 if (commandType == Message::Command::SOW
4416 || commandType == Message::Command::SOWAndSubscribe
4417 || commandType == Message::Command::SOWAndDeltaSubscribe
4418 || commandType == Message::Command::StopTimer)
4420 systemAddedAcks |= Message::AckType::Completed;
4423 if (handler_.isValid() && cid.
empty())
4429 if (command_.isSubscribe())
4432 if (_bookmarkStore.isValid())
4434 systemAddedAcks |= Message::AckType::Persisted;
4442 _bookmarkStore.
log(message);
4443 if (!BookmarkRange::isRange(bookmark))
4445 _bookmarkStore.
discard(message);
4459 systemAddedAcks |= Message::AckType::Persisted;
4461 bool isSubscribe = command_.isSubscribe();
4462 if (handler_.isValid() && !isSubscribe)
4464 _registerHandler(command_, cid, handler_,
4465 requestedAcks, systemAddedAcks, isSubscribe);
4467 bool useSyncSend = cid.
len() > 0 && command_.hasProcessedAck();
4470 amps_uint64_t haSequenceNumber = (amps_uint64_t)0;
4473 Unlock<Mutex> u(_lock);
4474 haSequenceNumber = _publishStore.
store(message);
4481 syncAckProcessing((
long)command_.getTimeout(), message,
4486 _send(message, haSequenceNumber);
4489 catch (
const DisconnectedException&)
4496 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(cid));
4507 Unlock<Mutex> u(_lock);
4508 _subscriptionManager->subscribe(handler_,
4511 if (_badTimeToHASubscribe)
4514 return std::string(subId.
data(), subId.
len());
4517 if (handler_.isValid())
4519 _registerHandler(command_, cid, handler_,
4520 requestedAcks, systemAddedAcks, isSubscribe);
4527 syncAckProcessing((
long)command_.getTimeout(), message,
4535 catch (
const DisconnectedException&)
4537 if (!isHASubscribe_)
4539 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(cid));
4540 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(subId));
4541 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(message.
getQueryId()));
4546 catch (
const TimedOutException&)
4548 AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(cid));
4549 AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(subId));
4550 AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(message.
getQueryId()));
4558 Unlock<Mutex> unlock(_lock);
4559 _subscriptionManager->unsubscribe(subId);
4565 _routes.removeRoute(cid);
4566 _routes.removeRoute(subId);
4569 if (subId.
len() > 0)
4572 return std::string(subId.
data(), subId.
len());
4582 syncAckProcessing((
long)(command_.getTimeout()), message);
4589 catch (
const DisconnectedException&)
4591 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(cid));
4592 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(message.
getQueryId()));
4598 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(cid));
4599 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(message.
getQueryId()));
4612 bool isHASubscribe_ =
true)
4614 Lock<Mutex> lock(_lock);
4615 return executeAsyncNoLock(command_, handler_, isHASubscribe_);
4619 void setAutoAck(
bool isAutoAckEnabled_)
4621 _isAutoAckEnabled = isAutoAckEnabled_;
4623 bool getAutoAck(
void)
const 4625 return _isAutoAckEnabled;
4627 void setAckBatchSize(
const unsigned batchSize_)
4629 _ackBatchSize = batchSize_;
4630 if (!_queueAckTimeout)
4632 _queueAckTimeout = AMPS_DEFAULT_QUEUE_ACK_TIMEOUT;
4636 unsigned getAckBatchSize(
void)
const 4638 return _ackBatchSize;
4640 int getAckTimeout(
void)
const 4642 return _queueAckTimeout;
4644 void setAckTimeout(
const int ackTimeout_)
4647 _queueAckTimeout = ackTimeout_;
4649 size_t _ack(QueueBookmarks& queueBookmarks_)
4651 if (queueBookmarks_._bookmarkCount)
4653 if (!publishStoreMessage)
4655 publishStoreMessage =
new Message();
4656 PerThreadMessageTracker::addMessageToCleanupList(publishStoreMessage);
4658 publishStoreMessage->reset();
4659 publishStoreMessage->setCommandEnum(Message::Command::SOWDelete)
4660 .setTopic(queueBookmarks_._topic)
4661 .setBookmark(queueBookmarks_._data)
4662 .setCommandId(
"AMPS-queue-ack");
4663 amps_uint64_t haSequenceNumber = 0;
4666 haSequenceNumber = _publishStore.
store(*publishStoreMessage);
4667 publishStoreMessage->setAckType(
"persisted")
4668 .setSequence(haSequenceNumber);
4669 queueBookmarks_._data.erase();
4670 queueBookmarks_._bookmarkCount = 0;
4672 _send(*publishStoreMessage, haSequenceNumber);
4675 queueBookmarks_._data.erase();
4676 queueBookmarks_._bookmarkCount = 0;
// Public entry point for acknowledging a message identified by topic_ and
// bookmark_; options_ is optional and defaults to NULL.
// NOTE(review): the body of the auto-ack branch is not visible in this
// listing; presumably it returns early so that explicit acks are ignored
// while auto-ack is enabled -- confirm against the full source.
4682 void ack(
const Field& topic_,
const Field& bookmark_,
const char* options_ = NULL)
4684 if (_isAutoAckEnabled)
// Forward all three arguments to the internal _ack overload.
4688 _ack(topic_, bookmark_, options_);
4690 void _ack(
const Field& topic_,
const Field& bookmark_,
const char* options_ = NULL)
4692 if (bookmark_.
len() == 0)
4696 Lock<Mutex> lock(_lock);
4697 if (_ackBatchSize < 2 || options_ != NULL)
4699 if (!publishStoreMessage)
4701 publishStoreMessage =
new Message();
4702 PerThreadMessageTracker::addMessageToCleanupList(publishStoreMessage);
4704 publishStoreMessage->reset();
4705 publishStoreMessage->setCommandEnum(Message::Command::SOWDelete)
4706 .setCommandId(
"AMPS-queue-ack")
4707 .setTopic(topic_).setBookmark(bookmark_);
4710 publishStoreMessage->setOptions(options_);
4712 amps_uint64_t haSequenceNumber = 0;
4715 haSequenceNumber = _publishStore.
store(*publishStoreMessage);
4716 publishStoreMessage->setAckType(
"persisted")
4717 .setSequence(haSequenceNumber);
4719 _send(*publishStoreMessage, haSequenceNumber);
4723 topic_hash hash = CRC<0>::crcNoSSE(topic_.
data(), topic_.
len());
4724 TopicHashMap::iterator it = _topicHashMap.find(hash);
4725 if (it == _topicHashMap.end())
4728 #ifdef AMPS_USE_EMPLACE 4729 it = _topicHashMap.emplace(TopicHashMap::value_type(hash, QueueBookmarks(topic_))).first;
4731 it = _topicHashMap.insert(TopicHashMap::value_type(hash, QueueBookmarks(topic_))).first;
4734 QueueBookmarks& queueBookmarks = it->second;
4735 if (queueBookmarks._data.length())
4737 queueBookmarks._data.append(
",");
4741 queueBookmarks._oldestTime = amps_now();
4743 queueBookmarks._data.append(bookmark_);
4744 if (++queueBookmarks._bookmarkCount >= _ackBatchSize)
4746 _ack(queueBookmarks);
4749 void flushAcks(
void)
4751 size_t sendCount = 0;
4758 Lock<Mutex> lock(_lock);
4759 typedef TopicHashMap::iterator iterator;
4760 for (iterator it = _topicHashMap.begin(), end = _topicHashMap.end(); it != end; ++it)
4762 QueueBookmarks& queueBookmarks = it->second;
4763 sendCount += _ack(queueBookmarks);
4766 if (sendCount && _connected)
4768 publishFlush(0, Message::AckType::Processed);
4772 void checkQueueAcks(
void)
4774 if (!_topicHashMap.size())
4778 Lock<Mutex> lock(_lock);
4781 amps_uint64_t threshold = amps_now()
4782 - (amps_uint64_t)_queueAckTimeout;
4783 typedef TopicHashMap::iterator iterator;
4784 for (iterator it = _topicHashMap.begin(), end = _topicHashMap.end(); it != end; ++it)
4786 QueueBookmarks& queueBookmarks = it->second;
4787 if (queueBookmarks._bookmarkCount && queueBookmarks._oldestTime < threshold)
4789 _ack(queueBookmarks);
4793 catch (std::exception& ex)
4795 AMPS_UNHANDLED_EXCEPTION(ex);
4799 void deferredExecution(DeferredExecutionFunc func_,
void* userData_)
4801 Lock<Mutex> lock(_deferredExecutionLock);
4802 #ifdef AMPS_USE_EMPLACE 4803 _deferredExecutionList.emplace_back(
4804 DeferredExecutionRequest(func_, userData_));
4806 _deferredExecutionList.push_back(
4807 DeferredExecutionRequest(func_, userData_));
4811 inline void processDeferredExecutions(
void)
4813 if (_deferredExecutionList.size())
4815 Lock<Mutex> lock(_deferredExecutionLock);
4816 DeferredExecutionList::iterator it = _deferredExecutionList.begin();
4817 DeferredExecutionList::iterator end = _deferredExecutionList.end();
4818 for (; it != end; ++it)
4822 it->_func(it->_userData);
4830 _deferredExecutionList.clear();
4831 _routes.invalidateCache();
4832 _routeCache.invalidateCache();
4836 bool getRetryOnDisconnect(
void)
const 4838 return _isRetryOnDisconnect;
4841 void setRetryOnDisconnect(
bool isRetryOnDisconnect_)
4843 _isRetryOnDisconnect = isRetryOnDisconnect_;
4846 void setDefaultMaxDepth(
unsigned maxDepth_)
4848 _defaultMaxDepth = maxDepth_;
4851 unsigned getDefaultMaxDepth(
void)
const 4853 return _defaultMaxDepth;
4945 RefHandle<MessageStreamImpl> _body;
4955 inline void advance(
void);
4962 : _pStream(pStream_)
4967 bool operator==(
const iterator& rhs)
const 4969 return _pStream == rhs._pStream;
4971 bool operator!=(
const iterator& rhs)
const 4973 return _pStream != rhs._pStream;
4975 void operator++(
void)
4991 return _body.isValid();
4998 if (!_body.isValid())
5000 throw UsageException(
"This MessageStream is not valid and cannot be iterated.");
5032 unsigned getMaxDepth(
void)
const;
5035 unsigned getDepth(
void)
const;
5039 inline void setSOWOnly(
const std::string& commandId_,
5040 const std::string& queryId_ =
"");
5041 inline void setSubscription(
const std::string& subId_,
5042 const std::string& commandId_ =
"",
5043 const std::string& queryId_ =
"");
5044 inline void setStatsOnly(
const std::string& commandId_,
5045 const std::string& queryId_ =
"");
5046 inline void setAcksOnly(
const std::string& commandId_,
unsigned acks_);
5052 friend class Client;
5078 BorrowRefHandle<ClientImpl> _body;
5080 static const int DEFAULT_COMMAND_TIMEOUT = AMPS_DEFAULT_COMMAND_TIMEOUT;
5081 static const int DEFAULT_BATCH_SIZE = AMPS_DEFAULT_BATCH_SIZE;
5082 static const int DEFAULT_TOP_N = AMPS_DEFAULT_TOP_N;
5093 : _body(new ClientImpl(clientName), true)
5096 Client(ClientImpl* existingClient)
5097 : _body(existingClient,
true)
5100 Client(ClientImpl* existingClient,
bool isRef)
5101 : _body(existingClient, isRef)
5104 Client(
const Client& rhs) : _body(rhs._body) {;}
5105 virtual ~Client(
void) {;}
5107 Client& operator=(
const Client& rhs)
5115 return _body.isValid();
5132 _body.get().setName(name);
5139 return _body.get().getName();
5147 return _body.get().getNameHash();
5155 return _body.get().getNameHashValue();
5166 _body.get().setLogonCorrelationData(logonCorrelationData_);
5173 return _body.get().getLogonCorrelationData();
5186 return _body.get().getServerVersion();
5197 return _body.get().getServerVersionInfo();
5211 return AMPS::convertVersionToNumber(version_.c_str(), version_.length());
5226 return AMPS::convertVersionToNumber(data_, len_);
5233 return _body.get().getURI();
5257 _body.get().connect(uri);
5264 _body.get().disconnect();
5282 _body.get().send(message);
5295 unsigned requestedAcks_,
bool isSubscribe_)
5297 _body.get().addMessageHandler(commandId_, messageHandler_,
5298 requestedAcks_, isSubscribe_);
5306 return _body.get().removeMessageHandler(commandId_);
5334 return _body.get().send(messageHandler, message, timeout);
5348 _body.get().setDisconnectHandler(disconnectHandler);
5356 return _body.get().getDisconnectHandler();
5365 return _body.get().getConnectionInfo();
5378 _body.get().setBookmarkStore(bookmarkStore_);
5386 return _body.
get().getBookmarkStore();
5394 return _body.get().getSubscriptionManager();
5406 _body.get().setSubscriptionManager(subscriptionManager_);
5430 _body.get().setPublishStore(publishStore_);
5438 return _body.
get().getPublishStore();
5446 _body.get().setGlobalCommandTypeMessageHandler(ClientImpl::GlobalCommandTypeHandlers::DuplicateMessage,
5447 duplicateMessageHandler_);
5461 return _body.get().getDuplicateMessageHandler();
5475 _body.get().setFailedWriteHandler(handler_);
5483 return _body.get().getFailedWriteHandler();
5504 amps_uint64_t
publish(
const std::string& topic_,
const std::string& data_)
5506 return _body.get().publish(topic_.c_str(), topic_.length(),
5507 data_.c_str(), data_.length());
5529 amps_uint64_t
publish(
const char* topic_,
size_t topicLength_,
5530 const char* data_,
size_t dataLength_)
5532 return _body.get().publish(topic_, topicLength_, data_, dataLength_);
5553 amps_uint64_t
publish(
const std::string& topic_,
const std::string& data_,
5554 unsigned long expiration_)
5556 return _body.get().publish(topic_.c_str(), topic_.length(),
5557 data_.c_str(), data_.length(), expiration_);
5580 amps_uint64_t
publish(
const char* topic_,
size_t topicLength_,
5581 const char* data_,
size_t dataLength_,
5582 unsigned long expiration_)
5584 return _body.get().publish(topic_, topicLength_,
5585 data_, dataLength_, expiration_);
5626 void publishFlush(
long timeout_ = 0,
unsigned ackType_ = Message::AckType::Processed)
5628 _body.get().publishFlush(timeout_, ackType_);
5647 amps_uint64_t
deltaPublish(
const std::string& topic_,
const std::string& data_)
5649 return _body.get().deltaPublish(topic_.c_str(), topic_.length(),
5650 data_.c_str(), data_.length());
5671 const char* data_,
size_t dataLength_)
5673 return _body.get().deltaPublish(topic_, topicLength_,
5674 data_, dataLength_);
// Delta-publishes data_ to topic_ with an expiration: both strings are
// forwarded to the implementation's deltaPublish() as pointer/length pairs.
// NOTE(review): the final argument line of the call is missing from this
// excerpt; presumably it passes expiration_ - confirm against the full source.
5693 amps_uint64_t
deltaPublish(
const std::string& topic_,
const std::string& data_,
5694 unsigned long expiration_)
5696 return _body.get().deltaPublish(topic_.c_str(), topic_.length(),
5697 data_.c_str(), data_.length(),
5720 const char* data_,
size_t dataLength_,
5721 unsigned long expiration_)
5723 return _body.get().deltaPublish(topic_, topicLength_,
5724 data_, dataLength_, expiration_);
5744 const char* options_ = NULL)
5746 return _body.get().logon(timeout_, authenticator_, options_);
5761 std::string
logon(
const char* options_,
int timeout_ = 0)
5780 std::string
logon(
const std::string& options_,
int timeout_ = 0)
5806 const std::string& topic_,
5808 const std::string& filter_ =
"",
5809 const std::string& options_ =
"",
5810 const std::string& subId_ =
"")
5812 return _body.get().subscribe(messageHandler_, topic_, timeout_,
5813 filter_,
"", options_, subId_);
5832 long timeout_ = 0,
const std::string& filter_ =
"",
5833 const std::string& options_ =
"",
5834 const std::string& subId_ =
"")
5837 if (_body.get().getDefaultMaxDepth())
5839 result.
maxDepth(_body.get().getDefaultMaxDepth());
5841 result.setSubscription(_body.get().subscribe(
5843 topic_, timeout_, filter_,
"",
5844 options_, subId_,
false));
5864 long timeout_ = 0,
const std::string& filter_ =
"",
5865 const std::string& options_ =
"",
5866 const std::string& subId_ =
"")
5869 if (_body.get().getDefaultMaxDepth())
5871 result.
maxDepth(_body.get().getDefaultMaxDepth());
5873 result.setSubscription(_body.get().subscribe(
5875 topic_, timeout_, filter_,
"",
5876 options_, subId_,
false));
5893 const std::string& topic_,
5895 const std::string& filter_ =
"",
5896 const std::string& options_ =
"",
5897 const std::string& subId_ =
"")
5899 return _body.get().deltaSubscribe(messageHandler_, topic_, timeout_,
5900 filter_,
"", options_, subId_);
5911 long timeout_,
const std::string& filter_ =
"",
5912 const std::string& options_ =
"",
5913 const std::string& subId_ =
"")
5916 if (_body.get().getDefaultMaxDepth())
5918 result.
maxDepth(_body.get().getDefaultMaxDepth());
5920 result.setSubscription(_body.get().deltaSubscribe(
5922 topic_, timeout_, filter_,
"",
5923 options_, subId_,
false));
5929 long timeout_,
const std::string& filter_ =
"",
5930 const std::string& options_ =
"",
5931 const std::string& subId_ =
"")
5934 if (_body.get().getDefaultMaxDepth())
5936 result.
maxDepth(_body.get().getDefaultMaxDepth());
5938 result.setSubscription(_body.get().deltaSubscribe(
5940 topic_, timeout_, filter_,
"",
5941 options_, subId_,
false));
5971 const std::string& topic_,
5973 const std::string& bookmark_,
5974 const std::string& filter_ =
"",
5975 const std::string& options_ =
"",
5976 const std::string& subId_ =
"")
5978 return _body.get().subscribe(messageHandler_, topic_, timeout_,
5979 filter_, bookmark_, options_, subId_);
6000 const std::string& bookmark_,
6001 const std::string& filter_ =
"",
6002 const std::string& options_ =
"",
6003 const std::string& subId_ =
"")
6006 if (_body.get().getDefaultMaxDepth())
6008 result.
maxDepth(_body.get().getDefaultMaxDepth());
6010 result.setSubscription(_body.get().subscribe(
6012 topic_, timeout_, filter_,
6013 bookmark_, options_,
6021 const std::string& bookmark_,
6022 const std::string& filter_ =
"",
6023 const std::string& options_ =
"",
6024 const std::string& subId_ =
"")
6027 if (_body.get().getDefaultMaxDepth())
6029 result.
maxDepth(_body.get().getDefaultMaxDepth());
6031 result.setSubscription(_body.get().subscribe(
6033 topic_, timeout_, filter_,
6034 bookmark_, options_,
6049 return _body.get().unsubscribe(commandId);
6061 return _body.get().unsubscribe();
6095 const std::string& topic_,
6096 const std::string& filter_ =
"",
6097 const std::string& orderBy_ =
"",
6098 const std::string& bookmark_ =
"",
6099 int batchSize_ = DEFAULT_BATCH_SIZE,
6100 int topN_ = DEFAULT_TOP_N,
6101 const std::string& options_ =
"",
6102 long timeout_ = DEFAULT_COMMAND_TIMEOUT)
6104 return _body.get().sow(messageHandler_, topic_, filter_, orderBy_,
6105 bookmark_, batchSize_, topN_, options_,
6133 const std::string& filter_ =
"",
6134 const std::string& orderBy_ =
"",
6135 const std::string& bookmark_ =
"",
6136 int batchSize_ = DEFAULT_BATCH_SIZE,
6137 int topN_ = DEFAULT_TOP_N,
6138 const std::string& options_ =
"",
6139 long timeout_ = DEFAULT_COMMAND_TIMEOUT)
6142 if (_body.get().getDefaultMaxDepth())
6144 result.
maxDepth(_body.get().getDefaultMaxDepth());
6146 result.setSOWOnly(_body.get().sow(result.operator
MessageHandler(),
6147 topic_, filter_, orderBy_, bookmark_,
6148 batchSize_, topN_, options_, timeout_));
6154 const std::string& filter_ =
"",
6155 const std::string& orderBy_ =
"",
6156 const std::string& bookmark_ =
"",
6157 int batchSize_ = DEFAULT_BATCH_SIZE,
6158 int topN_ = DEFAULT_TOP_N,
6159 const std::string& options_ =
"",
6160 long timeout_ = DEFAULT_COMMAND_TIMEOUT)
6163 if (_body.get().getDefaultMaxDepth())
6165 result.
maxDepth(_body.get().getDefaultMaxDepth());
6167 result.setSOWOnly(_body.get().sow(result.operator
MessageHandler(),
6168 topic_, filter_, orderBy_, bookmark_,
6169 batchSize_, topN_, options_, timeout_));
6195 const std::string& topic_,
6197 const std::string& filter_ =
"",
6198 int batchSize_ = DEFAULT_BATCH_SIZE,
6199 int topN_ = DEFAULT_TOP_N)
6201 return _body.get().sow(messageHandler_, topic_, timeout_, filter_,
6227 const std::string& topic_,
6229 const std::string& filter_ =
"",
6230 int batchSize_ = DEFAULT_BATCH_SIZE,
6231 bool oofEnabled_ =
false,
6232 int topN_ = DEFAULT_TOP_N)
6234 return _body.get().sowAndSubscribe(messageHandler_, topic_, timeout_,
6235 filter_, batchSize_, oofEnabled_,
6260 const std::string& filter_ =
"",
6261 int batchSize_ = DEFAULT_BATCH_SIZE,
6262 bool oofEnabled_ =
false,
6263 int topN_ = DEFAULT_TOP_N)
6266 if (_body.get().getDefaultMaxDepth())
6268 result.
maxDepth(_body.get().getDefaultMaxDepth());
6270 result.setSubscription(_body.get().sowAndSubscribe(
6272 topic_, timeout_, filter_,
6273 batchSize_, oofEnabled_,
6298 const std::string& filter_ =
"",
6299 int batchSize_ = DEFAULT_BATCH_SIZE,
6300 bool oofEnabled_ =
false,
6301 int topN_ = DEFAULT_TOP_N)
6304 if (_body.get().getDefaultMaxDepth())
6306 result.
maxDepth(_body.get().getDefaultMaxDepth());
6308 result.setSubscription(_body.get().sowAndSubscribe(
6310 topic_, timeout_, filter_,
6311 batchSize_, oofEnabled_,
6345 const std::string& topic_,
6346 const std::string& filter_ =
"",
6347 const std::string& orderBy_ =
"",
6348 const std::string& bookmark_ =
"",
6349 int batchSize_ = DEFAULT_BATCH_SIZE,
6350 int topN_ = DEFAULT_TOP_N,
6351 const std::string& options_ =
"",
6352 long timeout_ = DEFAULT_COMMAND_TIMEOUT)
6354 return _body.get().sowAndSubscribe(messageHandler_, topic_, filter_,
6355 orderBy_, bookmark_, batchSize_,
6356 topN_, options_, timeout_);
6384 const std::string& filter_ =
"",
6385 const std::string& orderBy_ =
"",
6386 const std::string& bookmark_ =
"",
6387 int batchSize_ = DEFAULT_BATCH_SIZE,
6388 int topN_ = DEFAULT_TOP_N,
6389 const std::string& options_ =
"",
6390 long timeout_ = DEFAULT_COMMAND_TIMEOUT)
6393 if (_body.get().getDefaultMaxDepth())
6395 result.
maxDepth(_body.get().getDefaultMaxDepth());
6397 result.setSubscription(_body.get().sowAndSubscribe(
6399 topic_, filter_, orderBy_,
6400 bookmark_, batchSize_, topN_,
6401 options_, timeout_,
false));
6407 const std::string& filter_ =
"",
6408 const std::string& orderBy_ =
"",
6409 const std::string& bookmark_ =
"",
6410 int batchSize_ = DEFAULT_BATCH_SIZE,
6411 int topN_ = DEFAULT_TOP_N,
6412 const std::string& options_ =
"",
6413 long timeout_ = DEFAULT_COMMAND_TIMEOUT)
6416 if (_body.get().getDefaultMaxDepth())
6418 result.
maxDepth(_body.get().getDefaultMaxDepth());
6420 result.setSubscription(_body.get().sowAndSubscribe(
6422 topic_, filter_, orderBy_,
6423 bookmark_, batchSize_, topN_,
6424 options_, timeout_,
false));
6453 const std::string& topic_,
6454 const std::string& filter_ =
"",
6455 const std::string& orderBy_ =
"",
6456 int batchSize_ = DEFAULT_BATCH_SIZE,
6457 int topN_ = DEFAULT_TOP_N,
6458 const std::string& options_ =
"",
6459 long timeout_ = DEFAULT_COMMAND_TIMEOUT)
6461 return _body.get().sowAndDeltaSubscribe(messageHandler_, topic_,
6462 filter_, orderBy_, batchSize_,
6463 topN_, options_, timeout_);
6486 const std::string& filter_ =
"",
6487 const std::string& orderBy_ =
"",
6488 int batchSize_ = DEFAULT_BATCH_SIZE,
6489 int topN_ = DEFAULT_TOP_N,
6490 const std::string& options_ =
"",
6491 long timeout_ = DEFAULT_COMMAND_TIMEOUT)
6494 if (_body.get().getDefaultMaxDepth())
6496 result.
maxDepth(_body.get().getDefaultMaxDepth());
6498 result.setSubscription(_body.get().sowAndDeltaSubscribe(
6500 topic_, filter_, orderBy_,
6501 batchSize_, topN_, options_,
6508 const std::string& filter_ =
"",
6509 const std::string& orderBy_ =
"",
6510 int batchSize_ = DEFAULT_BATCH_SIZE,
6511 int topN_ = DEFAULT_TOP_N,
6512 const std::string& options_ =
"",
6513 long timeout_ = DEFAULT_COMMAND_TIMEOUT)
6516 if (_body.get().getDefaultMaxDepth())
6518 result.
maxDepth(_body.get().getDefaultMaxDepth());
6520 result.setSubscription(_body.get().sowAndDeltaSubscribe(
6522 topic_, filter_, orderBy_,
6523 batchSize_, topN_, options_,
6553 const std::string& topic_,
6555 const std::string& filter_ =
"",
6556 int batchSize_ = DEFAULT_BATCH_SIZE,
6557 bool oofEnabled_ =
false,
6558 bool sendEmpties_ =
false,
6559 int topN_ = DEFAULT_TOP_N)
6561 return _body.get().sowAndDeltaSubscribe(messageHandler_, topic_,
6562 timeout_, filter_, batchSize_,
6563 oofEnabled_, sendEmpties_,
6590 const std::string& filter_ =
"",
6591 int batchSize_ = DEFAULT_BATCH_SIZE,
6592 bool oofEnabled_ =
false,
6593 bool sendEmpties_ =
false,
6594 int topN_ = DEFAULT_TOP_N)
6597 if (_body.get().getDefaultMaxDepth())
6599 result.
maxDepth(_body.get().getDefaultMaxDepth());
6601 result.setSubscription(_body.get().sowAndDeltaSubscribe(
6603 topic_, timeout_, filter_,
6604 batchSize_, oofEnabled_,
6605 sendEmpties_, topN_,
false));
6631 const std::string& filter_ =
"",
6632 int batchSize_ = DEFAULT_BATCH_SIZE,
6633 bool oofEnabled_ =
false,
6634 bool sendEmpties_ =
false,
6635 int topN_ = DEFAULT_TOP_N)
6638 if (_body.get().getDefaultMaxDepth())
6640 result.
maxDepth(_body.get().getDefaultMaxDepth());
6642 result.setSubscription(_body.get().sowAndDeltaSubscribe(
6644 topic_, timeout_, filter_,
6645 batchSize_, oofEnabled_,
6646 sendEmpties_, topN_,
false));
6669 const std::string& topic,
6670 const std::string& filter,
6673 return _body.get().sowDelete(messageHandler, topic, filter, timeout);
6701 stream.setStatsOnly(cid);
6702 _body.get().sowDelete(stream.operator
MessageHandler(), topic, filter, timeout, cid);
6703 return *(stream.
begin());
6705 catch (
const DisconnectedException&)
6707 removeMessageHandler(cid);
6718 _body.get().startTimer();
6729 return _body.get().stopTimer(messageHandler);
6754 const std::string& topic_,
6755 const std::string& keys_,
6758 return _body.get().sowDeleteByKeys(messageHandler_, topic_, keys_, timeout_);
6790 stream.setStatsOnly(cid);
6791 _body.get().sowDeleteByKeys(stream.operator
MessageHandler(), topic_, keys_, timeout_, cid);
6792 return *(stream.
begin());
6794 catch (
const DisconnectedException&)
6796 removeMessageHandler(cid);
6816 const std::string& topic_,
const std::string& data_,
6819 return _body.get().sowDeleteByData(messageHandler_, topic_, data_, timeout_);
6846 stream.setStatsOnly(cid);
6847 _body.get().sowDeleteByData(stream.operator
MessageHandler(), topic_, data_, timeout_, cid);
6848 return *(stream.
begin());
6850 catch (
const DisconnectedException&)
6852 removeMessageHandler(cid);
6862 return _body.get().getHandle();
6875 _body.get().setExceptionListener(pListener_);
6888 _body.get().setExceptionListener(listener_);
6895 return _body.get().getExceptionListener();
6921 _body.get().setHeartbeat(heartbeatTime_, readTimeout_);
6945 _body.get().setHeartbeat(heartbeatTime_, 2 * heartbeatTime_);
6951 setLastChanceMessageHandler(messageHandler);
6958 _body.get().setGlobalCommandTypeMessageHandler(ClientImpl::GlobalCommandTypeHandlers::LastChance,
6984 _body.get().setGlobalCommandTypeMessageHandler(command_, handler_);
7009 _body.get().setGlobalCommandTypeMessageHandler(command_, handler_);
7092 _body.get().addConnectionStateListener(listener);
7100 _body.get().removeConnectionStateListener(listener);
7107 _body.get().clearConnectionStateListeners();
7137 return _body.get().executeAsync(command_, handler_);
7175 if (command_.isSubscribe())
7177 Message& message = command_.getMessage();
7180 if (useExistingHandler)
7183 if (_body.get()._routes.getRoute(subId, existingHandler))
7186 _body.get().executeAsync(command_, existingHandler,
false);
7191 id = _body.get().executeAsync(command_, handler_,
false);
7193 catch (
const DisconnectedException&)
7195 removeMessageHandler(command_.getMessage().
getCommandId());
7196 if (command_.isSubscribe())
7200 if (command_.isSow())
7202 removeMessageHandler(command_.getMessage().
getQueryID());
7233 _body.get().ack(topic_, bookmark_, options_);
7255 void ack(
const std::string& topic_,
const std::string& bookmark_,
7256 const char* options_ = NULL)
7258 _body.get().ack(
Field(topic_.data(), topic_.length()),
Field(bookmark_.data(), bookmark_.length()), options_);
7266 void ackDeferredAutoAck(
Field& topic_,
Field& bookmark_,
const char* options_ = NULL)
7268 _body.get()._ack(topic_, bookmark_, options_);
7281 _body.get().flushAcks();
7290 return _body.get().getAutoAck();
7300 _body.get().setAutoAck(isAutoAckEnabled_);
7308 return _body.get().getAckBatchSize();
7318 _body.get().setAckBatchSize(ackBatchSize_);
7329 return _body.get().getAckTimeout();
7341 if (!ackTimeout_ && _body.get().getAckBatchSize() > 1)
7343 throw UsageException(
"Ack timeout must be > 0 when ack batch size > 1");
7345 _body.get().setAckTimeout(ackTimeout_);
7359 _body.get().setRetryOnDisconnect(isRetryOnDisconnect_);
7368 return _body.get().getRetryOnDisconnect();
7377 _body.get().setDefaultMaxDepth(maxDepth_);
7386 return _body.get().getDefaultMaxDepth();
7398 return _body.get().setTransportFilterFunction(filter_, userData_);
7412 return _body.get().setThreadCreatedCallback(callback_, userData_);
7420 void deferredExecution(DeferredExecutionFunc func_,
void* userData_)
7422 _body.get().deferredExecution(func_, userData_);
7432 AMPS_CALL_EXCEPTION_WRAPPER(_globalCommandTypeHandlers[GlobalCommandTypeHandlers::LastChance].invoke(message));
7438 unsigned deliveries = 0;
7450 const char* data = NULL;
7452 const char* status = NULL;
7453 size_t statusLen = 0;
7455 const size_t NotEntitled = 12, Duplicate = 9, Failure = 7;
7458 if (len == NotEntitled || len == Duplicate ||
7459 (statusLen == Failure && status[0] ==
'f'))
7461 if (_failedWriteHandler)
7463 if (_publishStore.isValid())
7465 amps_uint64_t sequence =
7467 FailedWriteStoreReplayer replayer(
this, data, len);
7468 AMPS_CALL_EXCEPTION_WRAPPER(_publishStore.replaySingle(
7469 replayer, sequence));
7475 AMPS_CALL_EXCEPTION_WRAPPER(
7476 _failedWriteHandler->failedWrite(emptyMessage,
7482 if (_publishStore.isValid())
7491 AMPS_CALL_EXCEPTION_WRAPPER(_publishStore.discardUpTo(seq));
7495 if (!deliveries && _bookmarkStore.isValid())
7502 const char* bookmarkData = NULL;
7503 size_t bookmarkLen = 0;
7509 if (bookmarkLen > 0 && _routes.hasRoute(subId))
7512 _bookmarkStore.persisted(subId,
Message::Field(bookmarkData, bookmarkLen));
7517 catch (std::exception& ex)
7519 AMPS_UNHANDLED_EXCEPTION(ex);
7525 ClientImpl::processedAck(
Message& message)
7527 unsigned deliveries = 0;
7529 const char* data = NULL;
7533 Lock<Mutex> l(_lock);
7536 Lock<Mutex> guard(_ackMapLock);
7537 AckMap::iterator i = _ackMap.find(std::string(data, len));
7538 if (i != _ackMap.end())
7548 ack.setStatus(data, len);
7550 ack.setReason(data, len);
7552 ack.setUsername(data, len);
7554 ack.setPassword(data, len);
7556 ack.setServerVersion(data, len);
7558 ack.setOptions(data, len);
7568 ClientImpl::checkAndSendHeartbeat(
bool force)
7570 if (force || _heartbeatTimer.check())
7572 _heartbeatTimer.start();
7575 sendWithoutRetry(_beatMessage);
7577 catch (
const AMPSException&)
// Builds a ConnectionInfo map describing this client.
// Always fills "client.uri", "client.name" and "client.username" from
// _lastUri, _name and _username.
// NOTE(review): the end of the function is not visible in this excerpt.
7584 inline ConnectionInfo ClientImpl::getConnectionInfo()
const 7586 ConnectionInfo info;
7587 std::ostringstream writer;
7589 info[
"client.uri"] = _lastUri;
7590 info[
"client.name"] = _name;
7591 info[
"client.username"] = _username;
// When a publish store is set, its unpersisted count is formatted through
// the string stream and stored under "publishStore.unpersistedCount".
7592 if (_publishStore.isValid())
7594 writer << _publishStore.unpersistedCount();
7595 info[
"publishStore.unpersistedCount"] = writer.str();
7604 ClientImpl::ClientImplMessageHandler(
amps_handle messageHandle_,
void* userData_)
7606 const unsigned SOWMask = Message::Command::SOW | Message::Command::GroupBegin | Message::Command::GroupEnd;
7607 const unsigned PublishMask = Message::Command::OOF | Message::Command::Publish | Message::Command::DeltaPublish;
7608 ClientImpl* me = (ClientImpl*) userData_;
7609 AMPS_CALL_EXCEPTION_WRAPPER_2(me, me->processDeferredExecutions());
7610 if (!messageHandle_)
7612 if (me->_queueAckTimeout)
7614 me->checkQueueAcks();
7619 me->_readMessage.replace(messageHandle_);
7620 Message& message = me->_readMessage;
7622 if (commandType & SOWMask)
7624 #if 0 // Not currently implemented, to avoid an extra branch in delivery 7628 AMPS_CALL_EXCEPTION_WRAPPER_2(me,
7629 me->_globalCommandTypeHandlers[1 + (commandType / 8192)].invoke(message));
7631 AMPS_CALL_EXCEPTION_WRAPPER_2(me, me->_routes.deliverData(message,
7634 else if (commandType & PublishMask)
7636 #if 0 // Not currently implemented, to avoid an extra branch in delivery 7637 AMPS_CALL_EXCEPTION_WRAPPER_2(me,
7638 me->_globalCommandTypeHandlers[(commandType == Message::Command::Publish ?
7639 GlobalCommandTypeHandlers::Publish :
7640 GlobalCommandTypeHandlers::OOF)].invoke(message));
7642 const char* subIds = NULL;
7643 size_t subIdsLen = 0;
7646 &subIds, &subIdsLen);
7647 size_t subIdCount = me->_routes.parseRoutes(
AMPS::Field(subIds, subIdsLen), me->_routeCache);
7648 for (
size_t i = 0; i < subIdCount; ++i)
7650 MessageRouter::RouteCache::value_type& lookupResult = me->_routeCache[i];
7652 if (handler.isValid())
7655 AMPS_SubscriptionId,
7656 subIds + lookupResult.idOffset,
7657 lookupResult.idLength);
7660 bool isAutoAck = me->_isAutoAckEnabled;
7662 if (!isMessageQueue && !bookmark.
empty() &&
7663 me->_bookmarkStore.isValid())
7665 if (me->_bookmarkStore.isDiscarded(me->_readMessage))
7668 if (me->_globalCommandTypeHandlers[GlobalCommandTypeHandlers::DuplicateMessage].isValid())
7670 AMPS_CALL_EXCEPTION_WRAPPER_2(me, me->_globalCommandTypeHandlers[GlobalCommandTypeHandlers::DuplicateMessage].invoke(message));
7675 me->_bookmarkStore.log(me->_readMessage);
7676 AMPS_CALL_EXCEPTION_WRAPPER_2(me,
7677 handler.invoke(message));
7682 if (isMessageQueue && isAutoAck)
7686 AMPS_CALL_EXCEPTION_WRAPPER_STREAM_FULL_2(me, handler.invoke(message));
7687 if (!message.getIgnoreAutoAck())
7689 AMPS_CALL_EXCEPTION_WRAPPER_2(me,
7693 catch (std::exception& ex)
7695 if (!message.getIgnoreAutoAck())
7697 AMPS_CALL_EXCEPTION_WRAPPER_2(me,
7700 AMPS_UNHANDLED_EXCEPTION_2(me, ex);
7705 AMPS_CALL_EXCEPTION_WRAPPER_2(me,
7706 handler.invoke(message));
7712 me->lastChance(message);
7716 else if (commandType == Message::Command::Ack)
7718 AMPS_CALL_EXCEPTION_WRAPPER_2(me,
7719 me->_globalCommandTypeHandlers[GlobalCommandTypeHandlers::Ack].invoke(message));
7721 unsigned deliveries = 0U;
7724 case Message::AckType::Persisted:
7725 deliveries += me->persistedAck(message);
7727 case Message::AckType::Processed:
7728 deliveries += me->processedAck(message);
7731 AMPS_CALL_EXCEPTION_WRAPPER_2(me, deliveries += me->_routes.deliverAck(message, ackType));
7732 if (deliveries == 0)
7734 me->lastChance(message);
7737 else if (commandType == Message::Command::Heartbeat)
7739 AMPS_CALL_EXCEPTION_WRAPPER_2(me,
7740 me->_globalCommandTypeHandlers[GlobalCommandTypeHandlers::Heartbeat].invoke(message));
7741 if (me->_heartbeatTimer.getTimeout() != 0.0)
7743 me->checkAndSendHeartbeat(
true);
7747 me->lastChance(message);
7753 unsigned deliveries = 0U;
7756 while (me->_connected)
7760 deliveries = me->_routes.deliverData(message, message.
getCommandId());
7764 catch (MessageStreamFullException&)
7766 catch (MessageStreamFullException& ex_)
7769 me->checkAndSendHeartbeat(
false);
7773 catch (std::exception& ex_)
7777 me->_exceptionListener->exceptionThrown(ex_);
7784 if (deliveries == 0)
7786 me->lastChance(message);
7789 me->checkAndSendHeartbeat();
// Pre-disconnect callback. userData is cast back to the ClientImpl it was
// registered with, and clearAcks() is called on it with the version of the
// connection that failed. The handle argument is unnamed and unused.
// NOTE(review): the return type line and two interior lines are not visible
// in this excerpt.
7794 ClientImpl::ClientImplPreDisconnectHandler(
amps_handle ,
unsigned failedConnectionVersion,
void* userData)
7796 ClientImpl* me = (ClientImpl*) userData;
7799 me->clearAcks(failedConnectionVersion);
// Disconnect callback. userData is cast back to the owning ClientImpl, whose
// _lock is taken for the duration of the handler except where explicitly
// released below.
// NOTE(review): the return type line, the loop/try structure and several
// other lines are not visible in this excerpt; the comments below describe
// only the statements that are.
7803 ClientImpl::ClientImplDisconnectHandler(
amps_handle ,
void* userData)
7805 ClientImpl* me = (ClientImpl*) userData;
7806 Lock<Mutex> l(me->_lock);
// Client handle around the implementation, built with isRef == false; it is
// what the disconnect handler and the subscription manager receive.
7807 Client wrapper(me,
false);
// Listeners are told about the disconnect before any reconnect is attempted.
7810 me->broadcastConnectionStateChanged(ConnectionStateListener::Disconnected);
7814 AtomicFlagFlip subFlip(&me->_badTimeToHASubscribe);
7817 me->_connected =
false;
// The user's disconnect handler runs with _lock released.
7821 Unlock<Mutex> unlock(me->_lock);
7822 me->_disconnectHandler.invoke(wrapper);
7825 catch (
const std::exception& ex)
7827 AMPS_UNHANDLED_EXCEPTION_2(me, ex);
// Wake every thread waiting on _lock.
7829 me->_lock.signalAll();
// Still disconnected after the handler ran: broadcast Shutdown and report
// the failure through the unhandled-exception path.
7831 if (!me->_connected)
7833 me->broadcastConnectionStateChanged(ConnectionStateListener::Shutdown);
7834 AMPS_UNHANDLED_EXCEPTION_2(me, DisconnectedException(
"Reconnect failed."));
// With a subscription manager set, subscriptions are re-entered with _lock
// released, and Resubscribed is broadcast afterwards.
7840 if (me->_subscriptionManager)
7845 Unlock<Mutex> unlock(me->_lock);
7846 me->_subscriptionManager->resubscribe(wrapper);
7848 me->broadcastConnectionStateChanged(ConnectionStateListener::Resubscribed);
// Failures are reported through the unhandled-exception path, not rethrown.
7852 catch (
const AMPSException& subEx)
7854 AMPS_UNHANDLED_EXCEPTION_2(me, subEx);
7856 catch (
const std::exception& subEx)
7858 AMPS_UNHANDLED_EXCEPTION_2(me, subEx);
7881 iterator(
const char* data_,
size_t len_,
size_t pos_,
char fieldSep_)
7882 : _data(data_), _len(len_), _pos(pos_), _fieldSep(fieldSep_)
7884 while (_pos != _len && _data[_pos] == _fieldSep)
7890 typedef void* difference_type;
7891 typedef std::forward_iterator_tag iterator_category;
7892 typedef std::pair<Message::Field, Message::Field> value_type;
7893 typedef value_type* pointer;
7894 typedef value_type& reference;
7895 bool operator==(
const iterator& rhs)
const 7897 return _pos == rhs._pos;
7899 bool operator!=(
const iterator& rhs)
const 7901 return _pos != rhs._pos;
7903 iterator& operator++()
7906 while (_pos != _len && _data[_pos] != _fieldSep)
7911 while (_pos != _len && _data[_pos] == _fieldSep)
7918 value_type operator*()
const 7921 size_t i = _pos, keyLength = 0, valueStart = 0, valueLength = 0;
7922 for (; i < _len && _data[i] !=
'='; ++i)
7927 result.first.assign(_data + _pos, keyLength);
7929 if (i < _len && _data[i] ==
'=')
7933 for (; i < _len && _data[i] != _fieldSep; ++i)
7938 result.second.assign(_data + valueStart, valueLength);
7944 class reverse_iterator
7951 typedef std::pair<Message::Field, Message::Field> value_type;
7952 reverse_iterator(
const char* data,
size_t len,
const char* pos,
char fieldsep)
7953 : _data(data), _len(len), _pos(pos), _fieldSep(fieldsep)
7958 while (_pos >= _data && *_pos == _fieldSep)
7962 while (_pos > _data && *_pos != _fieldSep)
7969 if (_pos > _data || (_pos == _data && *_pos == _fieldSep))
7979 bool operator==(
const reverse_iterator& rhs)
const 7981 return _pos == rhs._pos;
7983 bool operator!=(
const reverse_iterator& rhs)
const 7985 return _pos != rhs._pos;
7987 reverse_iterator& operator++()
7998 while (_pos >= _data && *_pos == _fieldSep)
8003 while (_pos > _data && *_pos != _fieldSep)
8007 if (_pos > _data || (_pos == _data && *_pos == _fieldSep))
8018 value_type operator*()
const 8021 size_t keyLength = 0, valueStart = 0, valueLength = 0;
8022 size_t i = (size_t)(_pos - _data);
8023 for (; i < _len && _data[i] !=
'='; ++i)
8027 result.first.assign(_pos, keyLength);
8028 if (i < _len && _data[i] ==
'=')
8032 for (; i < _len && _data[i] != _fieldSep; ++i)
8037 result.second.assign(_data + valueStart, valueLength);
8042 : _data(data.
data()), _len(data.
len()),
8043 _fieldSep(fieldSeparator)
8047 FIX(
const char* data,
size_t len,
char fieldSeparator = 1)
8048 : _data(data), _len(len), _fieldSep(fieldSeparator)
8052 iterator begin()
const 8054 return iterator(_data, _len, 0, _fieldSep);
8056 iterator end()
const 8058 return iterator(_data, _len, _len, _fieldSep);
8062 reverse_iterator rbegin()
const 8064 return reverse_iterator(_data, _len, _data + (_len - 1), _fieldSep);
8067 reverse_iterator rend()
const 8069 return reverse_iterator(_data, _len, 0, _fieldSep);
8090 std::stringstream _data;
// Appends one tag/value pair to the _data stream, taking the value from a
// slice of a character buffer: writes tag, then '=', then `length` bytes of
// `value` starting at `offset`.
// NOTE(review): the template header and any statements after the write()
// call are not visible in this excerpt.
8107 void append(
const T& tag,
const char* value,
size_t offset,
size_t length)
8109 _data << tag <<
'=';
// write() is used instead of operator<< so exactly `length` bytes are copied.
8110 _data.write(value + offset, (std::streamsize)length);
// Appends one tag/value pair to the _data stream: tag, '=', the string
// value, then the separator _fs.
// NOTE(review): the template header for T is not visible in this excerpt.
8118 void append(
const T& tag,
const std::string& value)
8120 _data << tag <<
'=' << value << _fs;
8129 operator std::string()
const 8137 _data.str(std::string());
8174 typedef std::map<Message::Field, Message::Field>
map_type;
8185 for (FIX::iterator a = fix.begin(); a != fix.end(); ++a)
8194 #define AMPS_MESSAGE_STREAM_CACHE_MAX 128 8198 std::deque<Message> _q;
8199 std::deque<Message> _cache;
8200 std::string _commandId;
8202 std::string _queryId;
8206 unsigned _requestedAcks;
8210 typedef enum :
unsigned int { Unset = 0x0, Running = 0x10, Subscribe = 0x11, SOWOnly = 0x12, AcksOnly = 0x13, Conflate = 0x14, Closed = 0x1, Disconnected = 0x2 } State;
8211 #if __cplusplus >= 201100L || _MSC_VER >= 1900 8212 std::atomic<State> _state;
8214 volatile State _state;
8216 typedef std::map<std::string, Message*> SOWKeyMap;
8217 SOWKeyMap _sowKeyMap;
8219 MessageStreamImpl(
const Client& client_)
8222 _maxDepth((
unsigned)~0),
8224 _cacheMax(AMPS_MESSAGE_STREAM_CACHE_MAX),
8227 if (_client.isValid())
8233 MessageStreamImpl(ClientImpl* client_)
8236 _maxDepth((
unsigned)~0),
8240 if (_client.isValid())
8246 ~MessageStreamImpl()
8250 virtual void destroy()
8256 catch (std::exception& e)
8260 if (_client.isValid())
8267 if (_client.isValid())
8271 _client = Client((ClientImpl*)NULL);
8272 c.deferredExecution(MessageStreamImpl::destroyer,
this);
8280 static void destroyer(
void* vpMessageStreamImpl_)
8282 delete ((MessageStreamImpl*)vpMessageStreamImpl_);
// Records the identifiers of the subscription this stream serves, under _lock.
// NOTE(review): the statements that store subId_ and update _state are not
// visible in this excerpt.
8285 void setSubscription(
const std::string& subId_,
8286 const std::string& commandId_ =
"",
8287 const std::string& queryId_ =
"")
8289 Lock<Mutex> lock(_lock);
// The command id is kept only when it is non-empty and differs from the
// subscription id.
8291 if (!commandId_.empty() && commandId_ != subId_)
8293 _commandId = commandId_;
// The query id is kept only when it is non-empty and differs from both the
// subscription id and the command id.
8295 if (!queryId_.empty() && queryId_ != subId_ && queryId_ != commandId_)
8297 _queryId = queryId_;
// A stream already marked Disconnected is handled separately; otherwise the
// stream is expected to still be in the Unset state.
8300 if (Disconnected == _state)
8304 assert(Unset == _state);
// Records the identifiers of the SOW query this stream serves, under _lock.
// NOTE(review): the statements that update _state are not visible in this
// excerpt.
8308 void setSOWOnly(
const std::string& commandId_,
8309 const std::string& queryId_ =
"")
8311 Lock<Mutex> lock(_lock);
8312 _commandId = commandId_;
// The query id is kept only when it is non-empty and differs from the
// command id.
8313 if (!queryId_.empty() && queryId_ != commandId_)
8315 _queryId = queryId_;
// A stream already marked Disconnected is handled separately; otherwise the
// stream is expected to still be in the Unset state.
8318 if (Disconnected == _state)
8322 assert(Unset == _state);
// Records the identifiers of a command for which this stream requests the
// Stats ack type, under _lock.
// NOTE(review): the statements that update _state are not visible in this
// excerpt.
8326 void setStatsOnly(
const std::string& commandId_,
8327 const std::string& queryId_ =
"")
8329 Lock<Mutex> lock(_lock);
8330 _commandId = commandId_;
// The query id is kept only when it is non-empty and differs from the
// command id.
8331 if (!queryId_.empty() && queryId_ != commandId_)
8333 _queryId = queryId_;
// A stream already marked Disconnected is handled separately; otherwise the
// stream is expected to still be in the Unset state.
8336 if (Disconnected == _state)
8340 assert(Unset == _state);
8342 _requestedAcks = Message::AckType::Stats;
// Records the command id and the ack types requested for this stream, under
// _lock.
// NOTE(review): the statements that update _state are not visible in this
// excerpt.
8345 void setAcksOnly(
const std::string& commandId_,
unsigned acks_)
8347 Lock<Mutex> lock(_lock);
8348 _commandId = commandId_;
// A stream already marked Disconnected is handled separately; otherwise the
// stream is expected to still be in the Unset state.
8350 if (Disconnected == _state)
8354 assert(Unset == _state);
8356 _requestedAcks = acks_;
8361 Lock<Mutex> lock(_lock);
8362 if (state_ == AMPS::ConnectionStateListener::Disconnected)
8364 _state = Disconnected;
8370 void timeout(
unsigned timeout_)
8372 _timeout = timeout_;
8376 if (_state == Subscribe)
8381 void maxDepth(
unsigned maxDepth_)
8385 _maxDepth = maxDepth_;
8389 _maxDepth = (unsigned)~0;
8392 unsigned getMaxDepth(
void)
const 8396 unsigned getDepth(
void)
const 8398 return (
unsigned)(_q.size());
8403 Lock<Mutex> lock(_lock);
8404 if (!_previousTopic.
empty() && !_previousBookmark.
empty())
8408 if (_client.isValid())
8410 _client.ackDeferredAutoAck(_previousTopic, _previousBookmark);
8414 catch (AMPSException&)
8416 catch (AMPSException& e)
8419 current_.invalidate();
8420 _previousTopic.
clear();
8421 _previousBookmark.
clear();
8424 _previousTopic.
clear();
8425 _previousBookmark.
clear();
8427 double minWaitTime = (double)((_timeout && _timeout > 1000)
8429 Timer timer(minWaitTime);
8431 while (_q.empty() && _state & Running)
8434 _lock.wait((
long)minWaitTime);
8436 Unlock<Mutex> unlck(_lock);
8437 amps_invoke_waiting_function();
8442 if (timer.checkAndGetRemaining(&minWaitTime))
8448 if (current_.isValid() && _cache.size() < _cacheMax)
8451 _cache.push_back(current_);
8455 current_ = _q.front();
8456 if (_q.size() == _maxDepth)
8461 if (_state == Conflate)
8463 std::string sowKey = current_.
getSowKey();
8464 if (sowKey.length())
8466 _sowKeyMap.erase(sowKey);
8469 else if (_state == AcksOnly)
8473 if ((_state == AcksOnly && _requestedAcks == 0) ||
8474 (_state == SOWOnly && current_.
getCommand() ==
"group_end"))
8478 else if (current_.
getCommandEnum() == Message::Command::Publish &&
8488 if (_state == Disconnected)
8490 throw DisconnectedException(
"Connection closed.");
8492 current_.invalidate();
8493 if (_state == Closed)
8497 return _timeout != 0;
8501 if (_client.isValid())
8503 if (_state == SOWOnly || _state == Subscribe)
8505 if (!_commandId.empty())
8509 if (!_subId.empty())
8513 if (!_queryId.empty())
8520 if (!_commandId.empty())
8524 if (!_subId.empty())
8528 if (!_queryId.empty())
8534 if (_state == SOWOnly || _state == Subscribe || _state == Unset)
8539 static void _messageHandler(
const Message& message_, MessageStreamImpl* this_)
8541 Lock<Mutex> lock(this_->_lock);
8542 if (this_->_state != Conflate)
8544 AMPS_TESTING_SLOW_MESSAGE_STREAM
8545 if (this_->_q.size() >= this_->_maxDepth)
8550 this_->_lock.signalAll();
8551 throw MessageStreamFullException(
"Stream is currently full.");
8553 if (!this_->_cache.empty())
8555 this_->_cache.front().deepCopy(message_);
8556 this_->_q.push_back(this_->_cache.front());
8557 this_->_cache.pop_front();
8561 #ifdef AMPS_USE_EMPLACE 8562 this_->_q.emplace_back(message_.
deepCopy());
8564 this_->_q.push_back(message_.
deepCopy());
8568 this_->_client.isValid() && this_->_client.getAutoAck() &&
8572 message_.setIgnoreAutoAck();
8577 std::string sowKey = message_.
getSowKey();
8578 if (sowKey.length())
8580 SOWKeyMap::iterator it = this_->_sowKeyMap.find(sowKey);
8581 if (it != this_->_sowKeyMap.end())
8583 it->second->deepCopy(message_);
8587 if (this_->_q.size() >= this_->_maxDepth)
8593 this_->_lock.signalAll();
8594 throw MessageStreamFullException(
"Stream is currently full.");
8596 if (!this_->_cache.empty())
8598 this_->_cache.front().deepCopy(message_);
8599 this_->_q.push_back(this_->_cache.front());
8600 this_->_cache.pop_front();
8604 #ifdef AMPS_USE_EMPLACE 8605 this_->_q.emplace_back(message_.
deepCopy());
8607 this_->_q.push_back(message_.
deepCopy());
8610 this_->_sowKeyMap[sowKey] = &(this_->_q.back());
8615 if (this_->_q.size() >= this_->_maxDepth)
8620 this_->_lock.signalAll();
8621 throw MessageStreamFullException(
"Stream is currently full.");
8623 if (!this_->_cache.empty())
8625 this_->_cache.front().deepCopy(message_);
8626 this_->_q.push_back(this_->_cache.front());
8627 this_->_cache.pop_front();
8631 #ifdef AMPS_USE_EMPLACE 8632 this_->_q.emplace_back(message_.
deepCopy());
8634 this_->_q.push_back(message_.
deepCopy());
8638 this_->_client.isValid() && this_->_client.getAutoAck() &&
8642 message_.setIgnoreAutoAck();
8646 this_->_lock.signalAll();
8649 inline MessageStream::MessageStream(
void)
8652 inline MessageStream::MessageStream(
const Client& client_)
8653 : _body(
new MessageStreamImpl(client_))
8656 inline void MessageStream::iterator::advance(
void)
8658 _pStream = _pStream->_body->next(_current) ? _pStream : NULL;
8662 return MessageHandler((
void(*)(
const Message&,
void*))MessageStreamImpl::_messageHandler, &_body.get());
8667 if (handler_._func == (MessageHandler::FunctionType)MessageStreamImpl::_messageHandler)
8669 result._body = (MessageStreamImpl*)(handler_._userData);
8674 inline void MessageStream::setSOWOnly(
const std::string& commandId_,
8675 const std::string& queryId_)
8677 _body->setSOWOnly(commandId_, queryId_);
8679 inline void MessageStream::setSubscription(
const std::string& subId_,
8680 const std::string& commandId_,
8681 const std::string& queryId_)
8683 _body->setSubscription(subId_, commandId_, queryId_);
8685 inline void MessageStream::setStatsOnly(
const std::string& commandId_,
8686 const std::string& queryId_)
8688 _body->setStatsOnly(commandId_, queryId_);
8690 inline void MessageStream::setAcksOnly(
const std::string& commandId_,
8693 _body->setAcksOnly(commandId_, acks_);
8712 return _body->getMaxDepth();
8716 return _body->getDepth();
8719 inline MessageStream ClientImpl::getEmptyMessageStream(
void)
8721 return *(_pEmptyMessageStream.get());
8729 ClientImpl& body = _body.get();
8730 Message& message = command_.getMessage();
8734 if (useExistingHandler)
8740 if (body._routes.getRoute(subId, existingHandler))
8743 body.executeAsync(command_, existingHandler,
false);
8744 return MessageStream::fromExistingHandler(existingHandler);
8753 if ((command & Message::Command::NoDataCommands)
8754 && (ackTypes == Message::AckType::Persisted
8755 || ackTypes == Message::AckType::None))
8758 if (!body._pEmptyMessageStream)
8760 body._pEmptyMessageStream.reset(
new MessageStream((ClientImpl*)0));
8761 body._pEmptyMessageStream.get()->_body->close();
8763 return body.getEmptyMessageStream();
8766 if (body.getDefaultMaxDepth())
8768 stream.
maxDepth(body.getDefaultMaxDepth());
8771 std::string commandID = body.executeAsync(command_, handler,
false);
8772 if (command_.hasStatsAck())
8774 stream.setStatsOnly(commandID, command_.getMessage().
getQueryId());
8776 else if (command_.isSow())
8778 stream.setSOWOnly(commandID, command_.getMessage().
getQueryId());
8780 else if (command_.isSubscribe())
8782 stream.setSubscription(commandID,
8789 if (command == Message::Command::Publish ||
8790 command == Message::Command::DeltaPublish ||
8791 command == Message::Command::SOWDelete)
8793 stream.setAcksOnly(commandID,
8794 ackTypes & (
unsigned)~Message::AckType::Persisted);
8798 stream.setAcksOnly(commandID, ackTypes);
8805 inline void Message::ack(
const char* options_)
const 8807 ClientImpl* pClient = _body.get().clientImpl();
8809 if (pClient && bookmark.
len() &&
8810 !pClient->getAutoAck())
8813 pClient->ack(getTopic(), bookmark, options_);
Command & setBookmark(const std::string &bookmark_)
Set the bookmark to be used by this command.
Definition: ampsplusplus.hpp:717
Command & setFilter(const char *filter_, size_t filterLen_)
Definition: ampsplusplus.hpp:668
Class to hold string versions of failure reasons.
Definition: ampsplusplus.hpp:183
Message & setData(const std::string &v_)
Sets the data portion of self.
Definition: Message.hpp:1476
Core type and function declarations for the AMPS C client.
Client(const std::string &clientName="")
Constructs a new client with a given client name.
Definition: ampsplusplus.hpp:5092
Field getUserId() const
Retrieves the value of the UserId header of the Message as a new Field.
Definition: Message.hpp:1453
std::string sowDeleteByKeys(const MessageHandler &messageHandler_, const std::string &topic_, const std::string &keys_, long timeout_=0)
Deletes messages that match SOW keys from a topic's SOW cache.
Definition: ampsplusplus.hpp:6753
std::string stopTimer(const MessageHandler &messageHandler)
Definition: ampsplusplus.hpp:6727
static const unsigned int IdentifierLength
The length of identifiers used for unique identification of commands and subscriptions.
Definition: Message.hpp:542
std::string getAckType() const
Definition: ampsplusplus.hpp:915
AMPSDLL void amps_client_set_disconnect_handler(amps_handle client, amps_handler disconnectHandler, void *userData)
Sets the disconnect handler function to be called when a disconnect occurs.
bool removeMessageHandler(const Field &commandId_)
Removes a MessageHandler for a given CommandId from self.
Definition: ampsplusplus.hpp:5304
Message & assignTopic(const std::string &v)
Sets the value of the Topic header for this Message.
Definition: Message.hpp:1449
Provides a convenient way of building messages in FIX format, typically referenced using the typedefs...
Definition: ampsplusplus.hpp:8088
void startTimer()
Definition: ampsplusplus.hpp:6716
MessageStream sowAndSubscribe(const std::string &topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, bool oofEnabled_=false, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic and initiates a new subscription on it.
Definition: ampsplusplus.hpp:6258
Abstract base class for storing published messages for an HA publisher client.
Definition: ampsplusplus.hpp:1059
MessageStream conflate(void)
Sets self to conflation mode, where a new update for a matching sow key will replace the previous one...
Definition: ampsplusplus.hpp:8700
Field getSequence() const
Retrieves the value of the Sequence header of the Message as a new Field.
Definition: Message.hpp:1422
std::string send(const MessageHandler &messageHandler, Message &message, int timeout=0)
Sends a Message to the connected AMPS server, performing only minimal validation and bypassing client...
Definition: ampsplusplus.hpp:5332
AMPSDLL amps_result amps_client_set_name(amps_handle handle, const amps_char *clientName)
Sets the name on an amps client object.
Message & setCorrelationId(const std::string &v)
Sets the value of the CorrelationId header for this Message.
Definition: Message.hpp:1304
Message & setQueryID(const std::string &v)
Sets the value of the QueryID header for this Message.
Definition: Message.hpp:1417
Command(const std::string &command_)
Creates an object to represent the given AMPS command, such as "sow" or "subscribe".
Definition: ampsplusplus.hpp:529
Command::Type getCommandEnum() const
Decode self's "command" field and return one of the values from Command.
Definition: Message.hpp:1228
Command & setBookmark(const char *bookmark_, size_t bookmarkLen_)
Set the bookmark to be used by this command.
Definition: ampsplusplus.hpp:728
void setGlobalCommandTypeMessageHandler(const Message::Command::Type command_, const MessageHandler &handler_)
Sets a handler for all messages of a particular type: currently supported types are heartbeat message...
Definition: ampsplusplus.hpp:7007
Command & setAckType(unsigned ackType_)
Definition: ampsplusplus.hpp:893
void setServerVersion(size_t version_)
Internally used to set the server version so the store knows how to deal with persisted acks and call...
Definition: BookmarkStore.hpp:429
void setDefaultMaxDepth(unsigned maxDepth_)
Sets a default max depth on all subsequently created MessageStream objects.
Definition: ampsplusplus.hpp:7375
const amps_uint64_t getNameHashValue() const
Returns the numeric name hash of this client as generated by the server and returned when the client ...
Definition: ampsplusplus.hpp:5153
Field getSubscriptionId() const
Retrieves the value of the SubscriptionId header of the Message as a new Field.
Definition: Message.hpp:1427
AMPSDLL amps_result amps_client_connect(amps_handle handle, const amps_char *uri)
Connects to the AMPS server specified in uri.
static size_t convertVersionToNumber(const std::string &version_)
Converts a string version, such as "3.8.1.5" into the same numeric form used internally and returned ...
Definition: ampsplusplus.hpp:5209
void unsubscribe(const std::string &commandId)
Unsubscribe from a topic.
Definition: ampsplusplus.hpp:6047
Command & setOptions(const std::string &options_)
Sets the options string for this command: see Message.Options for a helper class for constructing the...
Definition: ampsplusplus.hpp:758
Message & setOrderBy(const std::string &v)
Sets the value of the OrderBy header for this Message.
Definition: Message.hpp:1415
Abstract base class for replaying a publish message.
Definition: ampsplusplus.hpp:1031
Command & setCommandId(const char *cmdId_, size_t cmdIdLen_)
Definition: ampsplusplus.hpp:642
void persisted(const Message::Field &subId_, const Message::Field &bookmark_)
Called internally to indicate messages up to and including bookmark are replicated to all replication...
Definition: BookmarkStore.hpp:405
unsigned getDefaultMaxDepth(void) const
Returns the default max depth for returned MessageStream objects.
Definition: ampsplusplus.hpp:7384
void ack(Message &message_, const char *options_=NULL)
Acknowledge a message queue message by supplying the message directly: this adds the ack to the curre...
Definition: ampsplusplus.hpp:7243
MessageStream bookmarkSubscribe(const std::string &topic_, long timeout_, const std::string &bookmark_, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Subscribe to a topic using a bookmark.
Definition: ampsplusplus.hpp:5998
AMPSDLL void amps_client_disconnect(amps_handle handle)
Disconnects from the AMPS server, if connected.
AMPSDLL AMPS_SOCKET amps_client_get_socket(amps_handle client)
Returns the socket from the underlying transport in client, or NULL if no transport is associated wit...
void discard(const Message::Field &subId_, size_t bookmarkSeqNo_)
Log a discard-bookmark entry to the persistent log based on a bookmark sequence number.
Definition: BookmarkStore.hpp:283
void setPublishStore(const Store &publishStore_)
Set the publish store to be used by the client.
Definition: ampsplusplus.hpp:5428
void setErrorOnPublishGap(bool errorOnPublishGap_)
Called to enable or disable throwing PublishStoreGapException.
Definition: ampsplusplus.hpp:1308
amps_uint64_t deltaPublish(const std::string &topic_, const std::string &data_, unsigned long expiration_)
Publish the changed fields of a message to an AMPS topic.
Definition: ampsplusplus.hpp:5693
static size_t convertVersionToNumber(const char *data_, size_t len_)
Converts a string version, such as "3.8.1.5" into the same numeric form used internally and returned ...
Definition: ampsplusplus.hpp:5224
Message & setFilter(const std::string &v)
Sets the value of the Filter header for this Message.
Definition: Message.hpp:1306
Command & setSubId(const char *subId_, size_t subIdLen_)
Definition: ampsplusplus.hpp:694
void setNoEmpties(void)
Set the option to not send empty messages on a delta subscription.
Definition: Message.hpp:841
void setDuplicateMessageHandler(const MessageHandler &duplicateMessageHandler_)
Sets a callback function that is invoked when a duplicate message is detected.
Definition: ampsplusplus.hpp:5444
Message deepCopy(void) const
Returns a deep copy of self.
Definition: Message.hpp:579
void removeConnectionStateListener(ConnectionStateListener *listener)
Attempts to remove listener from self's set of ConnectionStateListeners.
Definition: ampsplusplus.hpp:7098
void setOOF(void)
Set the option to receive out of focus (OOF) messages on a subscription, where applicable.
Definition: Message.hpp:824
int getAckTimeout(void) const
Returns the current value of the message queue ack timeout setting – that is, the amount of time aft...
Definition: ampsplusplus.hpp:7327
VersionInfo getServerVersionInfo() const
Returns the server version retrieved during logon.
Definition: ampsplusplus.hpp:5195
Message sowDeleteByKeys(const std::string &topic_, const std::string &keys_, long timeout_=0)
Deletes messages that match SOW keys from a topic's SOW cache.
Definition: ampsplusplus.hpp:6780
AMPSDLL amps_result amps_client_set_transport_filter_function(amps_handle client, amps_transport_filter_function filter, void *userData)
Sets a user-supplied callback function for filtering data before it is sent and after it is received...
Command(const char *command_, size_t commandLen_)
Creates an object to represent the given AMPS command, such as "sow" or "subscribe".
Definition: ampsplusplus.hpp:537
AMPSDLL amps_uint64_t amps_message_get_field_uint64(amps_handle message, FieldId field)
Gets the unsigned 64-bit int value of a header field in an AMPS message.
AMPSDLL amps_result amps_client_set_idle_time(amps_handle client, int idleTime)
Sets an idle-time (milliseconds).
Message encapsulates a single message sent to or received from an AMPS server, and provides methods f...
Definition: Message.hpp:531
Message & setAckTypeEnum(unsigned ackType_)
Encode self's "ack type" field from a bitmask of values from AckType.
Definition: Message.hpp:1183
amps_uint64_t getLastPersisted()
Get the last persisted message sequence in the store.
Definition: ampsplusplus.hpp:1279
std::string getString() const
Returns the current contents of this builder as a string.
Definition: ampsplusplus.hpp:8125
void setTransportFilterFunction(amps_transport_filter_function filter_, void *userData_)
Sets a filter function on the transport that is called with all raw data sent or received.
Definition: ampsplusplus.hpp:7395
amps_uint64_t publish(const char *topic_, size_t topicLength_, const char *data_, size_t dataLength_, unsigned long expiration_)
Publish a message to an AMPS topic, returning the sequence number assigned by the publish store if on...
Definition: ampsplusplus.hpp:5580
iterator begin(void)
Returns an iterator representing the beginning of the topic or subscription.
Definition: ampsplusplus.hpp:4996
Field getLeasePeriod() const
Retrieves the value of the LeasePeriod header of the Message as a new Field.
Definition: Message.hpp:1309
Command & setSowKeys(const char *sowKeys_, size_t sowKeysLen_)
Sets the SowKeys for the command.
Definition: ampsplusplus.hpp:629
static const char * NOW()
Convenience method for returning the special value to start a subscription at the end of the transact...
Definition: ampsplusplus.hpp:7026
Command & setAckType(const std::string &ackType_)
Definition: ampsplusplus.hpp:871
static const char * BOOKMARK_EPOCH()
Convenience method for returning the special value to start a subscription at the beginning of the tr...
Definition: ampsplusplus.hpp:7036
void send(const Message &message)
Sends a Message to the connected AMPS server, performing only minimal validation and bypassing client...
Definition: ampsplusplus.hpp:5280
Message & setSowKey(const std::string &v)
Sets the value of the SowKey header for this Message.
Definition: Message.hpp:1424
AMPSDLL void amps_message_set_field_value(amps_handle message, FieldId field, const amps_char *value, size_t length)
Sets the value of a header field in an AMPS message.
std::string bookmarkSubscribe(const MessageHandler &messageHandler_, const std::string &topic_, long timeout_, const std::string &bookmark_, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Subscribe to a topic using a bookmark.
Definition: ampsplusplus.hpp:5970
Field getFilter() const
Retrieves the value of the Filter header of the Message as a new Field.
Definition: Message.hpp:1306
static size_t getUnsetPosition()
Method to return the value used to represent not found or unset.
Definition: ampsplusplus.hpp:1122
void setRetryOnDisconnect(bool isRetryOnDisconnect_)
Enables or disables automatic retry of a command to AMPS after a reconnect.
Definition: ampsplusplus.hpp:7357
unsigned getMaxDepth(void) const
Gets the maximum number of messages that can be held in the underlying queue.
Definition: ampsplusplus.hpp:8710
Message & assignUserId(const std::string &v)
Sets the value of the UserId header for this Message.
Definition: Message.hpp:1453
void connect(const std::string &uri)
Connect to an AMPS server.
Definition: ampsplusplus.hpp:5255
void clear()
Deletes the data associated with this Field, should only be used on Fields that were created as deepC...
Definition: Field.hpp:247
Abstract base class to manage all subscriptions placed on a client so that they can be re-established...
Definition: ampsplusplus.hpp:1423
std::string sowAndDeltaSubscribe(const MessageHandler &messageHandler_, const std::string &topic_, const std::string &filter_="", const std::string &orderBy_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic and initiates a new delta subscription on it.
Definition: ampsplusplus.hpp:6452
Success.
Definition: amps.h:205
Message & setCommandEnum(Command::Type command_)
Set self's "command" field from one of the values in Command.
Definition: Message.hpp:1288
Field getOptions() const
Retrieves the value of the Options header of the Message as a new Field.
Definition: Message.hpp:1316
std::string authenticate(const std::string &, const std::string &password_)
A simple implementation that returns an unmodified password.
Definition: ampsplusplus.hpp:1005
amps_uint64_t deltaPublish(const char *topic_, size_t topicLength_, const char *data_, size_t dataLength_)
Publish the changed fields of a message to an AMPS topic.
Definition: ampsplusplus.hpp:5670
FIXShredder(char fieldSep_=(char) 1)
Construct an instance of FIXShredder using the specified value as the delimiter between fields...
Definition: ampsplusplus.hpp:8170
std::string sow(const MessageHandler &messageHandler_, const std::string &topic_, const std::string &filter_="", const std::string &orderBy_="", const std::string &bookmark_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query a State-of-the-World topic.
Definition: ampsplusplus.hpp:6094
void * amps_handle
Opaque handle type used to refer to objects in the AMPS api.
Definition: amps.h:195
Class for constructing the options string to pass to AMPS in a Message.
Definition: Message.hpp:601
void addMessageHandler(const Field &commandId_, const AMPS::MessageHandler &messageHandler_, unsigned requestedAcks_, bool isSubscribe_)
Adds a MessageHandler to be invoked for Messages with the given CommandId as their command id...
Definition: ampsplusplus.hpp:5293
const char * data() const
Returns the (non-null-terminated) data underlying this field.
Definition: Field.hpp:260
bool isValid() const
Returns true if self is a valid stream that may be iterated.
Definition: ampsplusplus.hpp:4989
amps_result
Return values from amps_xxx functions.
Definition: amps.h:200
FailedWriteHandler * getFailedWriteHandler()
Get the handler that is invoked to report on failed writes.
Definition: ampsplusplus.hpp:5481
Field getAckType() const
Retrieves the value of the AckType header of the Message as a new Field.
Definition: Message.hpp:1130
AMPSDLL amps_handle amps_client_create(const amps_char *clientName)
Functions for creation of an AMPS client.
Field getCommand() const
Retrieves the value of the Command header of the Message as a new Field.
Definition: Message.hpp:1195
MessageStream execute(Command &command_)
Execute the provided command and return messages received in response in a MessageStream.
Definition: ampsplusplus.hpp:8724
void(* amps_transport_filter_function)(const unsigned char *, size_t, short, void *)
Prototype for a user-supplied callback function for filtering data before it is sent and after it is ...
Definition: amps.h:626
Message & setTopic(const std::string &v)
Sets the value of the Topic header for this Message.
Definition: Message.hpp:1449
Store getPublishStore()
Get the publish store used by the client.
Definition: ampsplusplus.hpp:5436
void setThreadCreatedCallback(amps_thread_created_callback callback_, void *userData_)
Sets a callback function on the transport that is called when a new thread is created to receive data...
Definition: ampsplusplus.hpp:7409
unsigned getAckTypeEnum() const
Definition: ampsplusplus.hpp:920
size_t getServerVersion() const
Returns the server version retrieved during logon.
Definition: ampsplusplus.hpp:5184
State
Constants for the state of the connection.
Definition: ampsplusplus.hpp:1467
Command & setData(const std::string &data_)
Sets the data for this command from an existing string.
Definition: ampsplusplus.hpp:798
std::string logon(int timeout_=0, Authenticator &authenticator_=DefaultAuthenticator::instance(), const char *options_=NULL)
Logon to the server, providing the client name, credentials (if available), and client information (s...
Definition: ampsplusplus.hpp:5742
void discardUpTo(amps_uint64_t index_)
Called by Client to indicate that all messages up to and including.
Definition: ampsplusplus.hpp:1212
Command & setFilter(const std::string &filter_)
Definition: ampsplusplus.hpp:661
const std::string & getNameHash() const
Returns the name hash string of this client as generated by the server and returned when the client l...
Definition: ampsplusplus.hpp:5145
StoreImpl(bool errorOnPublishGap_=false)
Default constructor.
Definition: ampsplusplus.hpp:1067
MessageStream sowAndSubscribe(const char *topic_, const std::string &filter_="", const std::string &orderBy_="", const std::string &bookmark_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic and initiates a new subscription on it.
Definition: ampsplusplus.hpp:6406
Client represents a connection to an AMPS server, but does not provide failover or reconnection behav...
Definition: ampsplusplus.hpp:5075
void setGlobalCommandTypeMessageHandler(const std::string &command_, const MessageHandler &handler_)
Sets a handler for all messages of a particular type, or for messages that would be delivered to a pa...
Definition: ampsplusplus.hpp:6982
Command & setSubId(const std::string &subId_)
Definition: ampsplusplus.hpp:687
static const char * BOOKMARK_NOW()
Convenience method for returning the special value to start a subscription at the end of the transact...
Definition: ampsplusplus.hpp:7017
Message & newCommandId()
Creates and sets a new sequential value for the CommandId header for this Message.
Definition: Message.hpp:1302
void addConnectionStateListener(ConnectionStateListener *listener)
Adds a ConnectionStateListener to self's set of listeners.
Definition: ampsplusplus.hpp:7090
Command & reset(const std::string &command_)
Resets the fields of self, and sets the command to command_.
Definition: ampsplusplus.hpp:552
Command & setTimeout(unsigned timeout_)
Sets the client-side timeout for this command.
Definition: ampsplusplus.hpp:820
void clearConnectionStateListeners()
Clear all listeners from self's set of ConnectionStateListeners.
Definition: ampsplusplus.hpp:7105
Message & setSequence(const std::string &v)
Sets the value of the Sequence header for this Message.
Definition: Message.hpp:1422
amps_uint64_t publish(const char *topic_, size_t topicLength_, const char *data_, size_t dataLength_)
Publish a message to an AMPS topic, returning the sequence number assigned by the publish store if on...
Definition: ampsplusplus.hpp:5529
Command & setCommandId(const std::string &cmdId_)
Definition: ampsplusplus.hpp:635
Message & setUserId(const std::string &v)
Sets the value of the UserId header for this Message.
Definition: Message.hpp:1453
AMPSDLL amps_result amps_client_send_with_version(amps_handle client, amps_handle message, unsigned *version_out)
Sends a message to the AMPS server.
Field getTopic() const
Retrieves the value of the Topic header of the Message as a new Field.
Definition: Message.hpp:1449
AMPSDLL void amps_client_set_message_handler(amps_handle client, amps_handler messageHandler, void *userData)
Sets the message handler function for this client.
SubscriptionManager * getSubscriptionManager()
Get the subscription manager being used by the client.
Definition: ampsplusplus.hpp:5392
std::string subscribe(const MessageHandler &messageHandler_, const std::string &topic_, long timeout_=0, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Subscribe to a topic.
Definition: ampsplusplus.hpp:5805
Command(Message::Command::Type command_)
Creates an object to represent the given AMPS command, such as "sow" or "subscribe".
Definition: ampsplusplus.hpp:544
Message & setMessageType(const std::string &v)
Sets the value of the MessageType header for this Message.
Definition: Message.hpp:1312
Field getCommandId() const
Retrieves the value of the CommandId header of the Message as a new Field.
Definition: Message.hpp:1302
const ExceptionListener & getExceptionListener(void) const
Returns the exception listener set on this Client.
Definition: ampsplusplus.hpp:6893
Field getQueryId() const
Retrieves the value of the QueryID header of the Message as a new Field.
Definition: Message.hpp:1417
static Authenticator & instance()
Static function to return a static instance used when no Authenticator is supplied to a Client...
Definition: ampsplusplus.hpp:1022
Abstract base class for connection state listeners.
Definition: ampsplusplus.hpp:1463
Message & setSubscriptionId(const std::string &v)
Sets the value of the SubscriptionId header for this Message.
Definition: Message.hpp:1427
std::string sowDeleteByData(const MessageHandler &messageHandler_, const std::string &topic_, const std::string &data_, long timeout_=0)
Deletes the message whose keys match the message data provided.
Definition: ampsplusplus.hpp:6815
Command & setOptions(const char *options_, size_t optionsLen_)
Sets the options string for this command: see Message.Options for a helper class for constructing the...
Definition: ampsplusplus.hpp:766
Message & newQueryId()
Creates and sets a new sequential value for the QueryID header for this Message.
Definition: Message.hpp:1417
#define AMPS_BOOKMARK_RECENT
Start the subscription at the first undiscarded message in the bookmark store, or at the end of the b...
Definition: BookmarkStore.hpp:47
Message & assignExpiration(const std::string &v)
Sets the value of the Expiration header for this Message.
Definition: Message.hpp:1305
bool empty() const
Returns 'true' if empty, 'false' otherwise.
Definition: Field.hpp:128
bool replaySingle(StoreReplayer &replayer_, amps_uint64_t index_)
Called by Client to get a single message replayed by the store onto the StoreReplayer.
Definition: ampsplusplus.hpp:1233
void completed(const std::string &, const std::string &, const std::string &)
Called by Client once a logon completes successfully.
Definition: ampsplusplus.hpp:1017
MessageStream deltaSubscribe(const std::string &topic_, long timeout_, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Delta Subscribe to a topic.
Definition: ampsplusplus.hpp:5910
static const char * EPOCH()
Convenience method for returning the special value to start a subscription at the beginning of the tr...
Definition: ampsplusplus.hpp:7046
Interface for BookmarkStoreImpl classes.
Definition: BookmarkStore.hpp:228
Command & reset(const char *command_, size_t commandLen_)
Resets the fields of self, and sets the command to command_.
Definition: ampsplusplus.hpp:561
Command & setSequence(const char *seq_, size_t seqLen_)
Definition: ampsplusplus.hpp:779
std::string logon(const char *options_, int timeout_=0)
Logon to the server, providing the client name, credentials (if available) client information (such a...
Definition: ampsplusplus.hpp:5761
size_t len() const
Returns the length of the data underlying this field.
Definition: Field.hpp:267
virtual void completed(const std::string &userName_, const std::string &password_, const std::string &reason_)=0
Called by Client once a logon completes successfully.
Message & setClientName(const std::string &v)
Sets the value of the ClientName header for this Message.
Definition: Message.hpp:1303
Field getSowKey() const
Retrieves the value of the SowKey header of the Message as a new Field.
Definition: Message.hpp:1424
#define AMPS_BOOKMARK_EPOCH
Start the subscription at the beginning of the journal.
Definition: BookmarkStore.hpp:51
A default implementation of Authenticator that only uses an unchanged password and does not implement...
Definition: ampsplusplus.hpp:999
void setLogonCorrelationData(const std::string &logonCorrelationData_)
Sets the logon correlation data for the client.
Definition: ampsplusplus.hpp:5164
Message & setCommand(const std::string &v)
Sets the value of the Command header for this Message.
Definition: Message.hpp:1195
_FIXBuilder(char fieldSep_=(char) 1)
Construct an instance of _FIXBuilder, using the specified separator between fields.
Definition: ampsplusplus.hpp:8098
void ack(const std::string &topic_, const std::string &bookmark_, const char *options_=NULL)
Acknowledge a message queue message by supplying a topic and bookmark string: this adds the ack to th...
Definition: ampsplusplus.hpp:7255
std::string retry(const std::string &, const std::string &)
Throws an AuthenticationException because retry is not implemented.
Definition: ampsplusplus.hpp:1012
Message & assignSubscriptionId(const std::string &v)
Sets the value of the SubscriptionId header for this Message.
Definition: Message.hpp:1427
bool(* PublishStoreResizeHandler)(Store store_, size_t size_, void *userData_)
Function type for PublishStore resize events. The store_ param is the store which is resizing.
Definition: ampsplusplus.hpp:1053
AMPSDLL void amps_message_get_field_value(amps_handle message, FieldId field, const amps_char **value_ptr, size_t *length_ptr)
Retrieves the value of a header field in an AMPS message.
MessageStream sowAndSubscribe(const char *topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, bool oofEnabled_=false, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic and initiates a new subscription on it.
Definition: ampsplusplus.hpp:6296
virtual void setResizeHandler(PublishStoreResizeHandler handler_, void *userData_)
Set a handler to be called if the Store needs to resize in order to keep storing messages.
Definition: ampsplusplus.hpp:1153
void append(const T &tag, const char *value, size_t offset, size_t length)
Write a field with the provided tag and value to the message being constructed.
Definition: ampsplusplus.hpp:8107
Message & assignCommand(const std::string &v)
Sets the value of the Command header for this Message.
Definition: Message.hpp:1195
#define AMPS_BOOKMARK_NOW
Start the subscription at the point in time when AMPS processes the subscription. ...
Definition: BookmarkStore.hpp:55
void setResizeHandler(PublishStoreResizeHandler handler_, void *userData_)
Set a handler to be called if the Store needs to resize in order to keep storing messages.
Definition: ampsplusplus.hpp:1293
virtual std::string retry(const std::string &userName_, const std::string &password_)=0
Called by Client when a logon ack is received with a status of retry.
BookmarkStore getBookmarkStore()
Get the bookmark store being used by the client.
Definition: ampsplusplus.hpp:5384
bool isValid() const
Method to return if there is an underlying implementation for the Store.
Definition: ampsplusplus.hpp:1250
void setAckBatchSize(const unsigned ackBatchSize_)
Sets the queue ack batch size setting.
Definition: ampsplusplus.hpp:7316
Field getPassword() const
Retrieves the value of the Password header of the Message as a new Field.
Definition: Message.hpp:1416
Message & setTopNRecordsReturned(const std::string &v)
Sets the value of the TopNRecordsReturned header for this Message.
Definition: Message.hpp:1451
Class to handle when a client receives a duplicate publish message, or not entitled message...
Definition: ampsplusplus.hpp:1343
Message sowDelete(const std::string &topic, const std::string &filter, long timeout=0)
Deletes one or more messages from a topic's SOW cache.
Definition: ampsplusplus.hpp:6691
virtual void setFailedResubscribeHandler(std::shared_ptr< FailedResubscribeHandler > handler_)
Set a handler to deal with failing subscriptions after a failover event.
Definition: ampsplusplus.hpp:1451
Message & setSowKeys(const std::string &v)
Sets the value of the SowKeys header for this Message.
Definition: Message.hpp:1425
void setBookmarkStore(const BookmarkStore &bookmarkStore_)
Set the bookmark store to be used by the client.
Definition: ampsplusplus.hpp:5376
This class multiplexes messages from AMPS to multiple subscribers and uses the stream of acks from AM...
Definition: MessageRouter.hpp:149
Message & setExpiration(const std::string &v)
Sets the value of the Expiration header for this Message.
Definition: Message.hpp:1305
virtual std::string authenticate(const std::string &userName_, const std::string &password_)=0
Called by Client just before the logon command is sent.
Command & setOrderBy(const std::string &orderBy_)
Definition: ampsplusplus.hpp:674
void setExceptionListener(const ExceptionListener &listener_)
Definition: ampsplusplus.hpp:6886
Command & reset(Message::Command::Type command_)
Resets the fields of self, and sets the command to command_.
Definition: ampsplusplus.hpp:569
void setSubscriptionManager(SubscriptionManager *subscriptionManager_)
Set the subscription manager to be used by the client.
Definition: ampsplusplus.hpp:5404
void setUnhandledMessageHandler(const AMPS::MessageHandler &messageHandler)
Definition: ampsplusplus.hpp:6949
void append(const T &tag, const std::string &value)
Write a field with the provided tag and value to the message being constructed.
Definition: ampsplusplus.hpp:8118
unsigned getAckBatchSize(void) const
Returns the value of the queue ack batch size setting.
Definition: ampsplusplus.hpp:7306
amps_uint64_t deltaPublish(const std::string &topic_, const std::string &data_)
Publish the changed fields of a message to an AMPS topic.
Definition: ampsplusplus.hpp:5647
MessageStream sowAndSubscribe(const std::string &topic_, const std::string &filter_="", const std::string &orderBy_="", const std::string &bookmark_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic and initiates a new subscription on it.
Definition: ampsplusplus.hpp:6383
Command & setExpiration(unsigned expiration_)
Set the expiration time for a publish command.
Definition: ampsplusplus.hpp:851
AMPSDLL void amps_client_set_predisconnect_handler(amps_handle client, amps_predisconnect_handler predisconnectHandler, void *userData)
Sets the predisconnect handler function to be called when a disconnect occurs.
Represents an iterator over messages in an AMPS topic.
Definition: ampsplusplus.hpp:4951
size_t log(Message &message_)
Log a bookmark to the persistent log.
Definition: BookmarkStore.hpp:268
MessageStream sowAndDeltaSubscribe(const char *topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, bool oofEnabled_=false, bool sendEmpties_=false, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic and initiates a new delta subscription on it.
Definition: ampsplusplus.hpp:6629
size_t unpersistedCount() const
Method to return how many messages are in the store that have not been discarded, indicating that the...
Definition: ampsplusplus.hpp:1242
Command & setData(const char *data_, size_t dataLen_)
Sets the data for this command.
Definition: ampsplusplus.hpp:806
Message & assignAckType(const std::string &v)
Sets the value of the AckType header for this Message.
Definition: Message.hpp:1130
void publishFlush(long timeout_=0, unsigned ackType_=Message::AckType::Processed)
Ensure that AMPS messages are sent and have been processed by the AMPS server.
Definition: ampsplusplus.hpp:5626
std::string executeAsync(Command &command_, MessageHandler handler_)
Execute the provided command and, once AMPS acknowledges the command, process messages in response to...
Definition: ampsplusplus.hpp:7135
Command & setSowKey(const char *sowKey_, size_t sowKeyLen_)
Sets the SowKey field of the command, typically used for a publish command to a topic in the state of...
Definition: ampsplusplus.hpp:594
MessageStream sowAndDeltaSubscribe(const std::string &topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, bool oofEnabled_=false, bool sendEmpties_=false, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic and initiates a new delta subscription on it.
Definition: ampsplusplus.hpp:6588
std::string logon(const std::string &options_, int timeout_=0)
Logon to the server, providing the client name, credentials (if available) client information (such a...
Definition: ampsplusplus.hpp:5780
void unsubscribe()
Unsubscribe from all topics.
Definition: ampsplusplus.hpp:6059
void setAutoAck(bool isAutoAckEnabled_)
Sets the queue auto-ack setting on this client.
Definition: ampsplusplus.hpp:7298
MessageStream deltaSubscribe(const char *topic_, long timeout_, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Delta Subscribe to a topic.
Definition: ampsplusplus.hpp:5928
Message & assignCorrelationId(const std::string &v)
Sets the value of the CorrelationId header for this Message.
Definition: Message.hpp:1304
amps_uint64_t store(const Message &message_)
Called by Client to store a message being published.
Definition: ampsplusplus.hpp:1201
std::string sowAndDeltaSubscribe(const MessageHandler &messageHandler_, const std::string &topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, bool oofEnabled_=false, bool sendEmpties_=false, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic and initiates a new delta subscription on it.
Definition: ampsplusplus.hpp:6552
Command & setBatchSize(unsigned batchSize_)
Sets the batch size for this command, which controls how many records are sent together in the result...
Definition: ampsplusplus.hpp:835
void setExceptionListener(const std::shared_ptr< const ExceptionListener > &pListener_)
Sets the exception listener for exceptions that are not thrown back to the user (for example...
Definition: ampsplusplus.hpp:6873
StoreImpl * get()
Used to get a pointer to the implementation.
Definition: ampsplusplus.hpp:1325
MessageHandler getDuplicateMessageHandler(void)
Returns the callback function that is invoked when a duplicate message is detected.
Definition: ampsplusplus.hpp:5459
Command & setQueryId(const char *queryId_, size_t queryIdLen_)
Definition: ampsplusplus.hpp:707
DisconnectHandler getDisconnectHandler(void) const
Returns the callback function that is invoked when a disconnect occurs.
Definition: ampsplusplus.hpp:5354
Class for parsing a FIX format message into a std::map of keys and values, where the keys and values ...
Definition: ampsplusplus.hpp:8162
Field represents the value of a single field in a Message.
Definition: Field.hpp:86
Message & setAckType(const std::string &v)
Sets the value of the AckType header for this Message.
Definition: Message.hpp:1130
virtual ConnectionInfo getConnectionInfo() const
Get the connection information for the current connection.
Definition: ampsplusplus.hpp:5363
Message & setOptions(const std::string &v)
Sets the value of the Options header for this Message.
Definition: Message.hpp:1344
void ack(Field &topic_, Field &bookmark_, const char *options_=NULL)
Acknowledge a message queue message by supplying a topic and bookmark: this adds the ack to the curre...
Definition: ampsplusplus.hpp:7231
Abstract base class where you can implement handling of exceptions that occur when a SubscriptionMana...
Definition: ampsplusplus.hpp:1401
AMPSDLL amps_result amps_client_send(amps_handle client, amps_handle message)
Sends a message to the AMPS server.
std::map< Message::Field, Message::Field > map_type
Convenience definition for the std::map specialization used for this class.
Definition: ampsplusplus.hpp:8174
Command & setTopic(const char *topic_, size_t topicLen_)
Definition: ampsplusplus.hpp:655
void setDisconnectHandler(const DisconnectHandler &disconnectHandler)
Sets the function to be called when the client is unintentionally disconnected.
Definition: ampsplusplus.hpp:5346
Handle class for StoreImpl classes that track publish messages.
Definition: ampsplusplus.hpp:1185
map_type toMap(const Message::Field &data)
Returns the key/value pairs within the message, represented as AMPS::Field objects that contain point...
Definition: ampsplusplus.hpp:8181
Exception listener for unhandled exceptions.
Definition: ampsplusplus.hpp:236
MessageStream sowAndDeltaSubscribe(const std::string &topic_, const std::string &filter_="", const std::string &orderBy_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic and initiates a new delta subscription on it.
Definition: ampsplusplus.hpp:6485
void setHeartbeat(unsigned heartbeatTime_)
Requests heartbeating with the AMPS server.
Definition: ampsplusplus.hpp:6943
MessageStream subscribe(const char *topic_, long timeout_=0, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Subscribe to a topic.
Definition: ampsplusplus.hpp:5863
An iterable object representing the results of an AMPS subscription and/or query. ...
Definition: ampsplusplus.hpp:4943
bool getErrorOnPublishGap() const
Called to check if the Store will throw PublishStoreGapException.
Definition: ampsplusplus.hpp:1317
Command & setSequence(const std::string &seq_)
Definition: ampsplusplus.hpp:772
static const char * MOST_RECENT()
Convenience method for returning the special value to start a subscription at a recovery point based ...
Definition: ampsplusplus.hpp:7068
Command & setQueryId(const std::string &queryId_)
Definition: ampsplusplus.hpp:700
AMPSDLL amps_result amps_client_set_thread_created_callback(amps_handle client, amps_thread_created_callback callback, void *userData)
Sets a user-supplied callback function to allow thread attributes to be set when a new thread is created...
Message & setCommandId(const std::string &v)
Sets the value of the CommandId header for this Message.
Definition: Message.hpp:1302
Command & setSequence(const amps_uint64_t seq_)
Definition: ampsplusplus.hpp:785
static const char * BOOKMARK_RECENT()
Convenience method for returning the special value to start a subscription at a recovery point based ...
Definition: ampsplusplus.hpp:7079
Message & setQueryId(const std::string &v)
Sets the value of the QueryID header for this Message.
Definition: Message.hpp:1417
amps_result(* amps_thread_created_callback)(AMPS_THREAD_T, void *)
Prototype for a user-supplied callback function to allow thread attributes to be set when a new threa...
Definition: amps.h:652
amps_uint64_t getLowestUnpersisted()
Get the oldest unpersisted message sequence in the store.
Definition: ampsplusplus.hpp:1271
Command & setTopic(const std::string &topic_)
Definition: ampsplusplus.hpp:648
MessageStream maxDepth(unsigned maxDepth_)
Sets the maximum number of messages that can be held in the underlying queue.
Definition: ampsplusplus.hpp:8705
Message & assignVersion(const std::string &v)
Sets the value of the Version header for this Message.
Definition: Message.hpp:1452
void disconnect()
Disconnect from an AMPS server.
Definition: ampsplusplus.hpp:5262
MessageStream sow(const std::string &topic_, const std::string &filter_="", const std::string &orderBy_="", const std::string &bookmark_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic.
Definition: ampsplusplus.hpp:6132
unsigned getDepth(void) const
Gets the current number of messages held in the underlying queue.
Definition: ampsplusplus.hpp:8714
Field getQueryID() const
Retrieves the value of the QueryID header of the Message as a new Field.
Definition: Message.hpp:1417
The operation has not succeeded, but ought to be retried.
Definition: amps.h:229
const std::string & getLogonCorrelationData() const
Returns the currently set logon correlation data for the client.
Definition: ampsplusplus.hpp:5171
void setHeartbeat(unsigned heartbeatTime_, unsigned readTimeout_)
Requests heartbeating with the AMPS server.
Definition: ampsplusplus.hpp:6919
Command & setTopN(unsigned topN_)
Definition: ampsplusplus.hpp:826
AMPSDLL amps_result amps_client_set_read_timeout(amps_handle client, int readTimeout)
Sets a read timeout (seconds); if no message is received within that time, the connection is presumed dead...
const std::string & getURI() const
Returns the last URI this client is connected to.
Definition: ampsplusplus.hpp:5231
Command & addAckType(const std::string &ackType_)
Definition: ampsplusplus.hpp:857
void replay(StoreReplayer &replayer_)
Called by Client to get all stored and non-discarded messages replayed by the store onto the StoreRep...
Definition: ampsplusplus.hpp:1221
static const char * BOOKMARK_MOST_RECENT()
Convenience method for returning the special value to start a subscription at a recovery point based ...
Definition: ampsplusplus.hpp:7057
Message & setBookmark(const std::string &v)
Sets the value of the Bookmark header for this Message.
Definition: Message.hpp:1194
void deepCopy(const Field &orig_)
Makes self a deep copy of the original field.
Definition: Field.hpp:218
Command & setOrderBy(const char *orderBy_, size_t orderByLen_)
Definition: ampsplusplus.hpp:681
unsigned getAckTypeEnum() const
Decode self's "ack type" field and return the corresponding bitmask of values from AckType...
Definition: Message.hpp:1160
BookmarkStoreImpl * get()
Used to get a pointer to the implementation.
Definition: BookmarkStore.hpp:465
amps_handle getHandle()
Returns the underlying amps_handle for this client, to be used with amps_client_* functions from the ...
Definition: ampsplusplus.hpp:6860
AMPSDLL amps_result amps_client_attempt_reconnect(amps_handle client, unsigned version)
Manually invokes the user-supplied disconnect handler for this client.
AMPSDLL void amps_client_destroy(amps_handle handle)
Disconnects and destroys an AMPS client object.
void reset()
Clear all data from the builder.
Definition: ampsplusplus.hpp:8135
void setFailedWriteHandler(FailedWriteHandler *handler_)
Set the handler that is invoked to report when a publish fails, for example if the publisher is not e...
Definition: ampsplusplus.hpp:5473
MessageStream sowAndDeltaSubscribe(const char *topic_, const std::string &filter_="", const std::string &orderBy_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic and initiates a new delta subscription on it.
Definition: ampsplusplus.hpp:6507
void setAckTimeout(const int ackTimeout_)
Sets the message queue ack timeout value.
Definition: ampsplusplus.hpp:7339
Command & setSowKeys(const std::string &sowKeys_)
Sets the SowKeys for the command.
Definition: ampsplusplus.hpp:611
Message & setPassword(const std::string &v)
Sets the value of the Password header for this Message.
Definition: Message.hpp:1416
bool getAutoAck(void) const
Returns the value of the queue auto-ack setting.
Definition: ampsplusplus.hpp:7288
amps_uint64_t publish(const std::string &topic_, const std::string &data_)
Publish a message to an AMPS topic, returning the sequence number assigned by the publish store if on...
Definition: ampsplusplus.hpp:5504
Definition: ampsplusplus.hpp:106
void setName(const std::string &name)
Sets the name of this client, assuming no name was provided previously.
Definition: ampsplusplus.hpp:5130
The interface for handling authentication with the AMPS server.
Definition: ampsplusplus.hpp:968
void flush(long timeout_=0)
Method to wait for the Store to discard everything that has been stored up to the point in time when ...
Definition: ampsplusplus.hpp:1263
Field getBookmark() const
Retrieves the value of the Bookmark header of the Message as a new Field.
Definition: Message.hpp:1194
bool DangerousFlushPublishStoreResizeHandler(Store store_, size_t, void *data_)
PublishStoreResizeHandler that will block up to the timeout specified in user data milliseconds tryin...
Definition: ampsplusplus.hpp:1373
static amps_uint64_t getUnsetSequence()
Method to return the value used to represent no such sequence.
Definition: ampsplusplus.hpp:1129
Command & setCorrelationId(const char *correlationId_, size_t correlationIdLen_)
Set the correlation ID for this command.
Definition: ampsplusplus.hpp:751
MessageStream sow(const char *topic_, const std::string &filter_="", const std::string &orderBy_="", const std::string &bookmark_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic.
Definition: ampsplusplus.hpp:6153
amps_uint64_t publish(const std::string &topic_, const std::string &data_, unsigned long expiration_)
Publish a message to an AMPS topic, returning the sequence number assigned by the publish store (if a...
Definition: ampsplusplus.hpp:5553
void setLastChanceMessageHandler(const AMPS::MessageHandler &messageHandler)
Sets the message handler called when no other handler matches.
Definition: ampsplusplus.hpp:6956
Message::Field getMostRecent(const Message::Field &subId_)
Returns the most recent bookmark from the log that ought to be used for (re-)subscriptions.
Definition: BookmarkStore.hpp:310
amps_uint64_t deltaPublish(const char *topic_, size_t topicLength_, const char *data_, size_t dataLength_, unsigned long expiration_)
Publish the changed fields of a message to an AMPS topic.
Definition: ampsplusplus.hpp:5719
The client and server are disconnected.
Definition: amps.h:233
Message sowDeleteByData(const std::string &topic_, const std::string &data_, long timeout_=0)
Deletes the message whose keys match the message data provided.
Definition: ampsplusplus.hpp:6836
std::string deltaSubscribe(const MessageHandler &messageHandler_, const std::string &topic_, long timeout_, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Delta Subscribe to a topic.
Definition: ampsplusplus.hpp:5892
MessageStream timeout(unsigned timeout_)
Sets the maximum time to wait for the next message in milliseconds; if no message is available within...
Definition: ampsplusplus.hpp:8695
std::string sow(const MessageHandler &messageHandler_, const std::string &topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic.
Definition: ampsplusplus.hpp:6194
const std::string & getName() const
Returns the name of this client passed in the constructor.
Definition: ampsplusplus.hpp:5137
Command is an encapsulation of a single AMPS command sent by the client.
Definition: ampsplusplus.hpp:441
std::string sowAndSubscribe(const MessageHandler &messageHandler_, const std::string &topic_, const std::string &filter_="", const std::string &orderBy_="", const std::string &bookmark_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic and initiates a new subscription on it.
Definition: ampsplusplus.hpp:6344
Message & setBatchSize(const std::string &v)
Sets the value of the BatchSize header for this Message.
Definition: Message.hpp:1193
MessageStream subscribe(const std::string &topic_, long timeout_=0, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Subscribe to a topic.
Definition: ampsplusplus.hpp:5831
MessageStream bookmarkSubscribe(const char *topic_, long timeout_, const std::string &bookmark_, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Subscribe to a topic using a bookmark.
Definition: ampsplusplus.hpp:6019
Command & setSowKey(const std::string &sowKey_)
Sets the SowKey field of the command, typically used for a publish command to a topic in the state of...
Definition: ampsplusplus.hpp:581
std::string executeAsyncNoResubscribe(Command &command_, MessageHandler handler_)
Execute the provided command and, once AMPS acknowledges the command, process messages in response to...
Definition: ampsplusplus.hpp:7169
Command & setCorrelationId(const std::string &correlationId_)
Set the correlation ID for this command.
Definition: ampsplusplus.hpp:739
std::string sowDelete(const MessageHandler &messageHandler, const std::string &topic, const std::string &filter, long timeout)
Deletes one or more messages from a topic's SOW cache.
Definition: ampsplusplus.hpp:6668
iterator end(void)
Returns an iterator representing the end of the topic or subscription.
Definition: ampsplusplus.hpp:5007
bool getRetryOnDisconnect(void) const
Returns true if automatic retry of a command to AMPS after a reconnect is enabled.
Definition: ampsplusplus.hpp:7366
void flushAcks(void)
Sends any queued message queue ack messages to the server immediately.
Definition: ampsplusplus.hpp:7279
std::string sowAndSubscribe(const MessageHandler &messageHandler_, const std::string &topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, bool oofEnabled_=false, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic and initiates a new subscription on it.
Definition: ampsplusplus.hpp:6226