25 #ifndef _AMPSPLUSPLUS_H_
26 #define _AMPSPLUSPLUS_H_
28 #include "amps/ampsver.h"
47 #include <sys/atomic.h>
49 #include "amps/BookmarkStore.hpp"
50 #include "amps/MessageRouter.hpp"
51 #include "amps/util.hpp"
52 #include "amps/ampscrc.hpp"
53 #if __cplusplus >= 201100L || _MSC_VER >= 1900
57 #ifndef AMPS_TESTING_SLOW_MESSAGE_STREAM
58 #define AMPS_TESTING_SLOW_MESSAGE_STREAM
86 #define AMPS_MEMORYBUFFER_DEFAULT_BUFFERS 10
87 #define AMPS_MEMORYBUFFER_DEFAULT_LENGTH 40960
88 #define AMPS_SUBSCRIPTION_MANAGER_DEFAULT_TIMEOUT 0
89 #define AMPS_HACLIENT_TIMEOUT_DEFAULT 10000
90 #define AMPS_HACLIENT_RECONNECT_DEFAULT 200
91 #define AMPS_DEFAULT_COMMAND_TIMEOUT 5000
92 #define AMPS_DEFAULT_TOP_N -1
93 #define AMPS_DEFAULT_BATCH_SIZE 10
94 #define AMPS_NUMBER_BUFFER_LEN 20
95 #define AMPS_DEFAULT_QUEUE_ACK_TIMEOUT 1000
97 #if defined(_M_X64) || defined(__x86_64) || defined(_WIN64)
106 typedef std::map<std::string, std::string> ConnectionInfo;
109 inline std::string asString(Type x_)
111 std::ostringstream os;
117 size_t convertToCharArray(
char* buf_, amps_uint64_t seqNo_)
119 size_t pos = AMPS_NUMBER_BUFFER_LEN;
120 for (
int i = 0; i < AMPS_NUMBER_BUFFER_LEN; ++i)
124 buf_[--pos] = (char)(seqNo_ % 10 +
'0');
133 size_t convertToCharArray(
char* buf_,
unsigned long seqNo_)
135 size_t pos = AMPS_NUMBER_BUFFER_LEN;
136 for (
int i = 0; i < AMPS_NUMBER_BUFFER_LEN; ++i)
140 buf_[--pos] = (char)(seqNo_ % 10 +
'0');
154 static const char* duplicate()
158 static const char* badFilter()
/// Returns the constant C string "bad regex topic".
/// NOTE(review): presumably compared against a server ack reason — confirm against callers.
162 static const char* badRegexTopic()
164 return "bad regex topic";
/// Returns the constant C string "subscription already exists".
/// NOTE(review): presumably compared against a server ack reason — confirm against callers.
166 static const char* subscriptionAlreadyExists()
168 return "subscription already exists";
/// Returns the constant C string "name in use".
/// NOTE(review): presumably compared against a server ack reason — confirm against callers.
170 static const char* nameInUse()
172 return "name in use";
/// Returns the constant C string "auth failure".
/// NOTE(review): presumably compared against a server ack reason — confirm against callers.
174 static const char* authFailure()
176 return "auth failure";
/// Returns the constant C string "not entitled" (12 characters).
/// NOTE(review): presumably compared against a server ack reason — confirm against callers.
178 static const char* notEntitled()
180 return "not entitled";
/// Returns the constant C string "authentication disabled".
/// NOTE(review): presumably compared against a server ack reason — confirm against callers.
182 static const char* authDisabled()
184 return "authentication disabled";
/// Returns the constant C string "subid in use".
/// NOTE(review): presumably compared against a server ack reason — confirm against callers.
186 static const char* subidInUse()
188 return "subid in use";
190 static const char* noTopic()
208 virtual void exceptionThrown(
const std::exception&)
const {;}
214 #define AMPS_CALL_EXCEPTION_WRAPPER(x) \
219 catch (std::exception& ex_)\
223 _exceptionListener->exceptionThrown(ex_);\
247 #define AMPS_CALL_EXCEPTION_WRAPPER_2(me,x) \
250 while(me->_connected)\
257 catch(MessageStreamFullException&)\
261 me->checkAndSendHeartbeat(false);\
263 catch (std::exception& ex_)\
267 me->_exceptionListener->exceptionThrown(ex_);\
278 catch (std::exception& ex_)\
282 me->_exceptionListener->exceptionThrown(ex_);\
306 #define AMPS_CALL_EXCEPTION_WRAPPER_STREAM_FULL_2(me, x)\
307 while(me->_connected)\
314 catch(MessageStreamFullException&)\
318 me->checkAndSendHeartbeat(false);\
320 catch (std::exception& ex_)\
324 me->_exceptionListener->exceptionThrown(ex_);\
335 #define AMPS_CALL_EXCEPTION_WRAPPER_2(me,x) \
338 while(me->_connected)\
345 catch(MessageStreamFullException& ex_)\
349 me->checkAndSendHeartbeat(false);\
351 catch (std::exception& ex_)\
355 me->_exceptionListener->exceptionThrown(ex_);\
366 catch (std::exception& ex_)\
370 me->_exceptionListener->exceptionThrown(ex_);\
394 #define AMPS_CALL_EXCEPTION_WRAPPER_STREAM_FULL_2(me, x)\
395 while(me->_connected)\
402 catch(MessageStreamFullException& ex_)\
406 me->checkAndSendHeartbeat(false);\
408 catch (std::exception& ex_)\
412 me->_exceptionListener->exceptionThrown(ex_);\
424 #define AMPS_UNHANDLED_EXCEPTION(ex) \
427 _exceptionListener->exceptionThrown(ex);\
434 #define AMPS_UNHANDLED_EXCEPTION_2(me,ex) \
437 me->_exceptionListener->exceptionThrown(ex);\
478 static const unsigned Subscribe = 1;
479 static const unsigned SOW = 2;
480 static const unsigned NeedsSequenceNumber = 4;
481 static const unsigned ProcessedAck = 8;
482 static const unsigned StatsAck = 16;
483 void init(Message::Command::Type command_)
492 void init(
const std::string& command_)
501 void init(
const char* command_,
size_t commandLen_)
513 if (!(command & Message::Command::NoDataCommands))
516 if (command == Message::Command::Subscribe ||
517 command == Message::Command::SOWAndSubscribe ||
518 command == Message::Command::DeltaSubscribe ||
519 command == Message::Command::SOWAndDeltaSubscribe)
524 if (command == Message::Command::SOW
525 || command == Message::Command::SOWAndSubscribe
526 || command == Message::Command::SOWAndDeltaSubscribe)
533 if (command == Message::Command::SOW)
538 _flags |= ProcessedAck;
540 else if (command == Message::Command::SOWDelete)
543 _flags |= ProcessedAck;
544 _flags |= NeedsSequenceNumber;
546 else if (command == Message::Command::Publish
547 || command == Message::Command::DeltaPublish)
549 _flags |= NeedsSequenceNumber;
551 else if (command == Message::Command::StopTimer)
568 Command(
const char* command_,
size_t commandLen_)
570 init(command_, commandLen_);
594 init(command_, commandLen_);
688 _message.
setTopic(topic_, topicLen_);
818 std::ostringstream os;
823 amps_uint64_t getSequence()
const
839 _message.
setData(data_, dataLen_);
869 _batchSize = batchSize_;
891 if (ackType_ ==
"processed")
893 _flags |= ProcessedAck;
895 else if (ackType_ ==
"stats")
905 if (ackType_.find(
"processed") != std::string::npos)
907 _flags |= ProcessedAck;
911 _flags &= ~ProcessedAck;
913 if (ackType_.find(
"stats") != std::string::npos)
927 if (ackType_ & Message::AckType::Processed)
929 _flags |= ProcessedAck;
933 _flags &= ~ProcessedAck;
935 if (ackType_ & Message::AckType::Stats)
960 unsigned getTimeout(
void)
const
964 unsigned getBatchSize(
void)
const
968 bool isSubscribe(
void)
const
970 return _flags & Subscribe;
972 bool isSow(
void)
const
974 return (_flags & SOW) != 0;
976 bool hasProcessedAck(
void)
const
978 return (_flags & ProcessedAck) != 0;
980 bool hasStatsAck(
void)
const
982 return (_flags & StatsAck) != 0;
984 bool needsSequenceNumber(
void)
const
986 return (_flags & NeedsSequenceNumber) != 0;
992 typedef void(*DisconnectHandlerFunc)(
Client&,
void* userData);
/// Pure virtual hook that receives a user name and a password and returns
/// a string; every concrete implementation must override it.
/// NOTE(review): presumably the returned value is what is sent to the
/// server as the password at logon — confirm against the caller.
/// \param userName_ The user name handed to the implementation.
/// \param password_ The password handed to the implementation.
/// \return A std::string produced by the implementation.
1009 virtual std::string
authenticate(
const std::string& userName_,
const std::string& password_) = 0;
/// Pure virtual hook with the same parameter and return types as
/// authenticate(); every concrete implementation must override it.
/// NOTE(review): presumably invoked when the server asks for another
/// authentication round — confirm against the caller.
/// \param userName_ The user name handed to the implementation.
/// \param password_ The password handed to the implementation.
/// \return A std::string produced by the implementation.
1017 virtual std::string
retry(
const std::string& userName_,
const std::string& password_) = 0;
/// Pure virtual notification hook that receives a user name, a password
/// and a reason string; it returns nothing.
/// NOTE(review): presumably called once authentication has finished —
/// confirm against the caller.
/// \param userName_ The user name handed to the implementation.
/// \param password_ The password handed to the implementation.
/// \param reason_ The reason string handed to the implementation.
1024 virtual void completed(
const std::string& userName_,
const std::string& password_,
const std::string& reason_) = 0;
1036 std::string
authenticate(
const std::string& ,
const std::string& password_)
1043 std::string
retry(
const std::string& ,
const std::string& )
1045 throw AuthenticationException(
"retry not implemented by DefaultAuthenticator.");
/// Completion notification; all three arguments are ignored and nothing
/// is done.
void completed(const std::string& /*userName_*/,
               const std::string& /*password_*/,
               const std::string& /*reason_*/)
{
}
1084 typedef bool (*PublishStoreResizeHandler)(
Store store_,
1099 : _resizeHandler(NULL)
1100 , _resizeHandlerData(NULL)
1101 , _errorOnPublishGap(errorOnPublishGap_)
1155 return AMPS_UNSET_INDEX;
1162 return AMPS_UNSET_SEQUENCE;
1187 _resizeHandler = handler_;
1188 _resizeHandlerData = userData_;
1193 return _resizeHandler;
1196 bool callResizeHandler(
size_t newSize_);
1198 inline virtual void setErrorOnPublishGap(
bool errorOnPublishGap_)
1200 _errorOnPublishGap = errorOnPublishGap_;
1203 inline virtual bool getErrorOnPublishGap()
const
1205 return _errorOnPublishGap;
1210 void* _resizeHandlerData;
1211 bool _errorOnPublishGap;
1218 RefHandle<StoreImpl> _body;
1222 Store(
const Store& rhs) : _body(rhs._body) {;}
1234 return _body.get().store(message_);
1245 _body.get().discardUpTo(index_);
1254 _body.get().replay(replayer_);
1266 return _body.get().replaySingle(replayer_, index_);
1275 return _body.get().unpersistedCount();
1283 return _body.isValid();
1296 return _body.get().flush(timeout_);
1304 return _body.get().getLowestUnpersisted();
1312 return _body.get().getLastPersisted();
1327 _body.get().setResizeHandler(handler_, userData_);
1332 return _body.get().getResizeHandler();
1341 _body.get().setErrorOnPublishGap(errorOnPublishGap_);
1350 return _body.get().getErrorOnPublishGap();
1358 if (_body.isValid())
1360 return &_body.get();
1385 const char* reason_,
size_t reasonLength_) = 0;
1389 inline bool StoreImpl::callResizeHandler(
size_t newSize_)
1393 return _resizeHandler(
Store(
this), newSize_, _resizeHandlerData);
1407 long* timeoutp = (
long*)data_;
1415 store_.
flush(*timeoutp);
1418 catch (
const TimedOutException&)
1420 catch (
const TimedOutException& e)
1447 unsigned requestedAckTypes_,
1448 const AMPSException& exception_) = 0;
1466 unsigned requestedAckTypes_) = 0;
1484 _failedResubscribeHandler = handler_;
1487 std::shared_ptr<FailedResubscribeHandler> _failedResubscribeHandler;
1498 typedef enum { Disconnected = 0,
1502 PublishReplayed = 8,
1503 HeartbeatInitiated = 16,
1522 class MessageStreamImpl;
1523 class MessageStream;
1525 typedef void(*DeferredExecutionFunc)(
void*);
1527 class ClientImpl :
public RefBody
1533 AMPS_SOCKET _socket;
1539 socklen_t _valueLen;
1543 : _socket(AMPS_INVALID_SOCKET), _noDelay(0), _valueLen(sizeof(int))
1545 _valuePtr = (
char*)&_noDelay;
1547 if (_socket != AMPS_INVALID_SOCKET)
1549 getsockopt(_socket, IPPROTO_TCP, TCP_NODELAY, _valuePtr, &_valueLen);
1553 setsockopt(_socket, IPPROTO_TCP, TCP_NODELAY, _valuePtr, _valueLen);
1557 _socket = AMPS_INVALID_SOCKET;
1564 if (_socket != AMPS_INVALID_SOCKET)
1567 setsockopt(_socket, IPPROTO_TCP, TCP_NODELAY, _valuePtr, _valueLen);
1572 friend class Client;
1575 DisconnectHandler _disconnectHandler;
1576 enum GlobalCommandTypeHandlers :
size_t
1586 DuplicateMessage = 8,
1589 std::vector<MessageHandler> _globalCommandTypeHandlers;
1590 Message _message, _readMessage, _publishMessage, _deltaMessage, _beatMessage;
1591 MessageRouter _routes;
1592 MessageRouter::RouteCache _routeCache;
1593 mutable Mutex _lock;
1594 std::string _name, _nameHash, _lastUri, _logonCorrelationData, _preflightMessage;
1595 std::vector<std::string> _httpPreflightHeaders;
1596 amps_uint64_t _nameHashValue;
1597 BookmarkStore _bookmarkStore;
1598 Store _publishStore;
1599 bool _isRetryOnDisconnect;
1600 amps_unique_ptr<FailedWriteHandler> _failedWriteHandler;
1601 #if __cplusplus >= 201100L || _MSC_VER >= 1900
1602 std::atomic<amps_uint64_t> _lastSentHaSequenceNumber;
1604 volatile amps_uint64_t _lastSentHaSequenceNumber;
1606 AMPS_ATOMIC_TYPE_8 _logonInProgress;
1607 AMPS_ATOMIC_TYPE_8 _badTimeToHASubscribe;
1608 VersionInfo _serverVersion;
1609 Timer _heartbeatTimer;
1610 amps_unique_ptr<MessageStream> _pEmptyMessageStream;
1613 int _queueAckTimeout;
1614 bool _isAutoAckEnabled;
1615 unsigned _ackBatchSize;
1616 unsigned _queuedAckCount;
1617 unsigned _defaultMaxDepth;
1618 struct QueueBookmarks
1620 QueueBookmarks(
const std::string& topic_)
1627 amps_uint64_t _oldestTime;
1628 unsigned _bookmarkCount;
1630 typedef amps_uint64_t topic_hash;
1631 typedef std::map<topic_hash, QueueBookmarks> TopicHashMap;
1632 TopicHashMap _topicHashMap;
1634 class ClientStoreReplayer :
public StoreReplayer
1636 ClientImpl* _client;
1641 ClientStoreReplayer()
1642 : _client(NULL), _version(0), _res(
AMPS_E_OK)
1645 ClientStoreReplayer(ClientImpl* client_)
1646 : _client(client_), _version(0), _res(
AMPS_E_OK)
1649 void setClient(ClientImpl* client_)
1654 void execute(Message& message_)
1658 throw CommandException(
"Can't replay without a client.");
1662 if (index > _client->_lastSentHaSequenceNumber)
1664 _client->_lastSentHaSequenceNumber = index;
1671 if (!message_.getCommand().empty() &&
1672 (!_client->_logonInProgress ||
1673 message_.getOptions().len() < 6))
1676 message_.getMessage(),
1680 throw DisconnectedException(
"AMPS Server disconnected during replay");
1686 ClientStoreReplayer _replayer;
1688 class FailedWriteStoreReplayer :
public StoreReplayer
1690 ClientImpl* _parent;
1691 const char* _reason;
1692 size_t _reasonLength;
1693 size_t _replayCount;
1695 FailedWriteStoreReplayer(ClientImpl* parent,
const char* reason_,
size_t reasonLength_)
1698 _reasonLength(reasonLength_),
1701 void execute(Message& message_)
1703 if (_parent->_failedWriteHandler)
1706 _parent->_failedWriteHandler->failedWrite(message_,
1707 _reason, _reasonLength);
1710 size_t replayCount(
void)
const
1712 return _replayCount;
1716 struct AckResponseImpl :
public RefBody
1718 std::string username, password, reason, status, bookmark, options;
1719 amps_uint64_t sequenceNo;
1720 amps_uint64_t nameHashValue;
1721 VersionInfo serverVersion;
1722 #if __cplusplus >= 201100L || _MSC_VER >= 1900
1723 std::atomic<bool> responded;
1724 std::atomic<bool> abandoned;
1726 volatile bool responded;
1727 volatile bool abandoned;
1729 unsigned connectionVersion;
1732 username(), password(), reason(), status(), bookmark(), options(),
1733 sequenceNo((amps_uint64_t)0),
1737 connectionVersion(UINT_MAX)
1744 RefHandle<AckResponseImpl> _body;
1746 AckResponse() : _body(NULL) {;}
1747 AckResponse(
const AckResponse& rhs) : _body(rhs._body) {;}
1748 static AckResponse create()
1751 r._body =
new AckResponseImpl();
1755 const std::string& username()
1757 return _body.get().username;
1759 void setUsername(
const char* data_,
size_t len_)
1763 _body.get().username.assign(data_, len_);
1767 _body.get().username.clear();
1770 const std::string& password()
1772 return _body.get().password;
1774 void setPassword(
const char* data_,
size_t len_)
1778 _body.get().password.assign(data_, len_);
1782 _body.get().password.clear();
1785 const std::string& reason()
1787 return _body.get().reason;
1789 void setReason(
const char* data_,
size_t len_)
1793 _body.get().reason.assign(data_, len_);
1797 _body.get().reason.clear();
1800 const std::string& status()
1802 return _body.get().status;
1804 void setStatus(
const char* data_,
size_t len_)
1808 _body.get().status.assign(data_, len_);
1812 _body.get().status.clear();
1815 const std::string& bookmark()
1817 return _body.get().bookmark;
1819 void setBookmark(
const Field& bookmark_)
1821 if (!bookmark_.empty())
1823 _body.get().bookmark.assign(bookmark_.data(), bookmark_.len());
1824 Field::parseBookmark(bookmark_, _body.get().nameHashValue,
1825 _body.get().sequenceNo);
1829 _body.get().bookmark.clear();
1830 _body.get().sequenceNo = (amps_uint64_t)0;
1831 _body.get().nameHashValue = (amps_uint64_t)0;
1834 amps_uint64_t sequenceNo()
const
1836 return _body.get().sequenceNo;
1838 amps_uint64_t nameHashValue()
const
1840 return _body.get().nameHashValue;
1842 void setSequenceNo(
const char* data_,
size_t len_)
1844 amps_uint64_t result = (amps_uint64_t)0;
1847 for (
size_t i = 0; i < len_; ++i)
1849 result *= (amps_uint64_t)10;
1850 result += (amps_uint64_t)(data_[i] -
'0');
1853 _body.get().sequenceNo = result;
1855 VersionInfo serverVersion()
const
1857 return _body.get().serverVersion;
1859 void setServerVersion(
const char* data_,
size_t len_)
1863 _body.get().serverVersion.setVersion(std::string(data_, len_));
1868 return _body.get().responded;
1872 _body.get().responded =
true;
1876 return _body.get().abandoned;
1880 if (_body.isValid())
1882 _body.get().abandoned =
true;
1886 void setConnectionVersion(
unsigned connectionVersion)
1888 _body.get().connectionVersion = connectionVersion;
1891 unsigned getConnectionVersion()
1893 return _body.get().connectionVersion;
1895 void setOptions(
const char* data_,
size_t len_)
1899 _body.get().options.assign(data_, len_);
1903 _body.get().options.clear();
1907 const std::string& options()
1909 return _body.get().options;
1912 AckResponse& operator=(
const AckResponse& rhs)
1920 typedef std::map<std::string, AckResponse> AckMap;
1923 DefaultExceptionListener _defaultExceptionListener;
1926 struct DeferredExecutionRequest
1928 DeferredExecutionRequest(DeferredExecutionFunc func_,
1931 _userData(userData_)
1934 DeferredExecutionFunc _func;
1937 const ExceptionListener* _exceptionListener;
1938 std::shared_ptr<const ExceptionListener> _pExceptionListener;
1939 amps_unique_ptr<SubscriptionManager> _subscriptionManager;
1940 volatile bool _connected;
1941 std::string _username;
1942 typedef std::set<ConnectionStateListener*> ConnectionStateListeners;
1943 ConnectionStateListeners _connectionStateListeners;
1944 typedef std::vector<DeferredExecutionRequest> DeferredExecutionList;
1945 Mutex _deferredExecutionLock;
1946 DeferredExecutionList _deferredExecutionList;
1947 unsigned _heartbeatInterval;
1948 unsigned _readTimeout;
1956 if (!_connected && newState_ > ConnectionStateListener::Connected)
1960 for (ConnectionStateListeners::iterator it = _connectionStateListeners.begin(); it != _connectionStateListeners.end(); ++it)
1962 AMPS_CALL_EXCEPTION_WRAPPER(
1963 (*it)->connectionStateChanged(newState_));
1966 unsigned processedAck(Message& message);
1967 unsigned persistedAck(Message& meesage);
1968 void lastChance(Message& message);
1969 void checkAndSendHeartbeat(
bool force =
false);
1970 virtual ConnectionInfo getConnectionInfo()
const;
1972 ClientImplMessageHandler(
amps_handle message,
void* userData);
1974 ClientImplPreDisconnectHandler(
amps_handle client,
unsigned failedConnectionVersion,
void* userData);
1976 ClientImplDisconnectHandler(
amps_handle client,
void* userData);
1978 ClientImplGetHttpPreflightMessage(
void* userData);
1980 void unsubscribeInternal(
const std::string&
id)
1987 Message::Field subId;
1988 subId.assign(
id.data(),
id.length());
1989 _routes.removeRoute(subId);
1991 if (_subscriptionManager)
1994 Unlock<Mutex> unlock(_lock);
1995 _subscriptionManager->unsubscribe(subId);
1998 _message.setCommandEnum(Message::Command::Unsubscribe);
1999 _message.newCommandId();
2000 _message.setSubscriptionId(
id);
2001 _sendWithoutRetry(_message);
2002 deferredExecution(&s_noOpFn, NULL);
2005 AckResponse syncAckProcessing(
long timeout_, Message& message_,
2006 bool isHASubscribe_)
2008 return syncAckProcessing(timeout_, message_,
2009 (amps_uint64_t)0, isHASubscribe_);
2012 AckResponse syncAckProcessing(
long timeout_, Message& message_,
2013 amps_uint64_t haSeq = (amps_uint64_t)0,
2014 bool isHASubscribe_ =
false)
2017 AckResponse ack = AckResponse::create();
2020 Lock<Mutex> guard(_ackMapLock);
2021 _ackMap[message_.getCommandId()] = ack;
2023 ack.setConnectionVersion((
unsigned)_send(message_, haSeq, isHASubscribe_));
2024 if (ack.getConnectionVersion() == 0)
2027 throw DisconnectedException(
"Connection closed while waiting for response.");
2029 bool timedOut =
false;
2030 AMPS_START_TIMER(timeout_)
2031 while (!timedOut && !ack.responded() && !ack.abandoned())
2035 timedOut = !_lock.wait(timeout_);
2039 AMPS_RESET_TIMER(timedOut, timeout_);
2046 Unlock<Mutex> unlck(_lock);
2047 amps_invoke_waiting_function();
2050 if (ack.responded())
2052 if (ack.status() !=
"failure")
2054 if (message_.getCommand() ==
"logon")
2056 amps_uint64_t ackSequence = ack.sequenceNo();
2057 if (_lastSentHaSequenceNumber < ackSequence)
2059 _lastSentHaSequenceNumber = ackSequence;
2061 if (_publishStore.isValid())
2066 _publishStore.discardUpTo(ackSequence);
2067 if (_lastSentHaSequenceNumber < _publishStore.getLastPersisted())
2069 _lastSentHaSequenceNumber = _publishStore.getLastPersisted();
2072 _nameHash = ack.bookmark().substr(0, ack.bookmark().find(
'|'));
2073 _nameHashValue = ack.nameHashValue();
2074 _serverVersion = ack.serverVersion();
2075 if (_bookmarkStore.isValid())
2077 _bookmarkStore.setServerVersion(_serverVersion);
2082 const std::string& options = ack.options();
2083 size_t index = options.find_first_of(
"max_backlog=");
2084 if (index != std::string::npos)
2087 const char* c = options.c_str() + index + 12;
2088 while (*c && *c !=
',')
2090 data = (data * 10) + (
unsigned)(*c++ -48);
2092 if (_ackBatchSize > data)
2094 _ackBatchSize = data;
2100 const size_t NotEntitled = 12;
2101 std::string ackReason = ack.reason();
2102 if (ackReason.length() == 0)
2106 if (ackReason.length() == NotEntitled &&
2107 ackReason[0] ==
'n' &&
2108 message_.getUserId().len() == 0)
2110 message_.assignUserId(_username);
2112 message_.throwFor(_client, ackReason);
2116 if (!ack.abandoned())
2118 throw TimedOutException(
"timed out waiting for operation.");
2122 throw DisconnectedException(
"Connection closed while waiting for response.");
2136 AMPS_CALL_EXCEPTION_WRAPPER(ClientImpl::disconnect());
2137 _pEmptyMessageStream.reset(NULL);
2144 ClientImpl(
const std::string& clientName)
2145 : _client(NULL), _name(clientName)
2146 , _isRetryOnDisconnect(true)
2147 , _lastSentHaSequenceNumber((amps_uint64_t)0), _logonInProgress(0)
2148 , _badTimeToHASubscribe(0), _serverVersion()
2149 , _queueAckTimeout(AMPS_DEFAULT_QUEUE_ACK_TIMEOUT)
2150 , _isAutoAckEnabled(false)
2152 , _queuedAckCount(0)
2153 , _defaultMaxDepth(0)
2155 , _heartbeatInterval(0)
2158 _replayer.setClient(
this);
2161 (amps_handler)ClientImpl::ClientImplMessageHandler,
2164 (amps_predisconnect_handler)ClientImpl::ClientImplPreDisconnectHandler,
2167 (amps_handler)ClientImpl::ClientImplDisconnectHandler,
2170 ClientImpl::ClientImplGetHttpPreflightMessage,
2172 _exceptionListener = &_defaultExceptionListener;
2173 for (
size_t i = 0; i < GlobalCommandTypeHandlers::COUNT; ++i)
2175 #ifdef AMPS_USE_EMPLACE
2176 _globalCommandTypeHandlers.emplace_back(MessageHandler());
2178 _globalCommandTypeHandlers.push_back(MessageHandler());
2183 virtual ~ClientImpl()
2188 const std::string& getName()
const
2193 const std::string& getNameHash()
const
2198 const amps_uint64_t getNameHashValue()
const
2200 return _nameHashValue;
2203 void setName(
const std::string& name)
2210 AMPSException::throwFor(_client, result);
2215 const std::string& getLogonCorrelationData()
const
2217 return _logonCorrelationData;
2220 void setLogonCorrelationData(
const std::string& logonCorrelationData_)
2222 _logonCorrelationData = logonCorrelationData_;
2225 size_t getServerVersion()
const
2227 return _serverVersion.getOldStyleVersion();
2230 VersionInfo getServerVersionInfo()
const
2232 return _serverVersion;
2235 const std::string& getURI()
const
2240 virtual void connect(
const std::string& uri)
2242 Lock<Mutex> l(_lock);
2246 virtual void _connect(
const std::string& uri)
2252 AMPSException::throwFor(_client, result);
2255 _deltaMessage.setCommandEnum(Message::Command::DeltaPublish);
2256 _publishMessage.setCommandEnum(Message::Command::Publish);
2257 _beatMessage.setCommandEnum(Message::Command::Heartbeat);
2258 _beatMessage.setOptions(
"beat");
2259 _readMessage.setClientImpl(
this);
2260 if (_queueAckTimeout)
2265 AMPSException::throwFor(_client, result);
2269 broadcastConnectionStateChanged(ConnectionStateListener::Connected);
2272 void addHttpPreflightHeader(
const std::string& header_)
2274 _httpPreflightHeaders.push_back(header_);
2277 void addHttpPreflightHeader(
const std::string& key_,
const std::string& value_)
2279 _httpPreflightHeaders.push_back(key_ + std::string(
": ") + value_);
2282 void clearHttpPreflightHeaders()
2284 _httpPreflightHeaders.clear();
2288 void setHttpPreflightHeaders(
const T& headers_)
2290 _httpPreflightHeaders.clear();
2291 for (
typename T::const_iterator i = headers_.begin(); i != headers_.end(); ++i)
2293 _httpPreflightHeaders.push_back(*i);
2297 void setDisconnected()
2300 Lock<Mutex> l(_lock);
2303 AMPS_CALL_EXCEPTION_WRAPPER(broadcastConnectionStateChanged(ConnectionStateListener::Disconnected));
2306 _heartbeatTimer.setTimeout(0.0);
2309 clearAcks(UINT_MAX-1);
2315 virtual void disconnect()
2317 AMPS_CALL_EXCEPTION_WRAPPER(flushAcks());
2320 clearAcks(UINT_MAX);
2321 AMPS_CALL_EXCEPTION_WRAPPER(processDeferredExecutions());
2322 Lock<Mutex> l(_lock);
2323 broadcastConnectionStateChanged(ConnectionStateListener::Shutdown);
2326 void clearAcks(
unsigned failedVersion)
2329 Lock<Mutex> guard(_ackMapLock);
2332 std::vector<std::string> worklist;
2333 for (AckMap::iterator i = _ackMap.begin(), e = _ackMap.end(); i != e; ++i)
2335 if (i->second.getConnectionVersion() <= failedVersion)
2337 i->second.setAbandoned();
2338 worklist.push_back(i->first);
2342 for (std::vector<std::string>::iterator j = worklist.begin(), e = worklist.end(); j != e; ++j)
2351 int send(
const Message& message)
2353 Lock<Mutex> l(_lock);
2354 return _send(message);
2357 void sendWithoutRetry(
const Message& message_)
2359 Lock<Mutex> l(_lock);
2362 if (_logonInProgress)
2364 throw DisconnectedException(
"The client has been disconnected.");
2366 _sendWithoutRetry(message_);
2369 void _sendWithoutRetry(
const Message& message_)
2374 AMPSException::throwFor(_client, result);
2378 int _send(
const Message& message, amps_uint64_t haSeq = (amps_uint64_t)0,
2379 bool isHASubscribe_ =
false)
2386 Message localMessage = message;
2387 unsigned version = 0;
2391 if (haSeq && _logonInProgress)
2395 if (!_isRetryOnDisconnect)
2399 if (!_lock.wait(1000))
2401 amps_invoke_waiting_function();
2406 if ((haSeq && haSeq <= _lastSentHaSequenceNumber) ||
2407 (isHASubscribe_ && _badTimeToHASubscribe))
2409 return (
int)version;
2413 if (haSeq > _lastSentHaSequenceNumber)
2415 while (haSeq > _lastSentHaSequenceNumber + 1)
2420 if (!_publishStore.replaySingle(_replayer,
2421 _lastSentHaSequenceNumber + 1))
2427 version = _replayer._version;
2430 catch (
const DisconnectedException&)
2432 catch (
const DisconnectedException& e)
2435 result = _replayer._res;
2440 localMessage.getMessage(),
2442 ++_lastSentHaSequenceNumber;
2446 if (_logonInProgress && localMessage.getCommand().data()[0] !=
'l')
2448 while (_logonInProgress)
2450 if (!_lock.wait(1000))
2452 amps_invoke_waiting_function();
2457 localMessage.getMessage(),
2462 if (!isHASubscribe_ && !haSeq &&
2463 localMessage.getMessage() == message.getMessage())
2465 localMessage = message.deepCopy();
2467 if (_isRetryOnDisconnect)
2469 Unlock<Mutex> u(_lock);
2474 if ((isHASubscribe_ || haSeq) &&
2477 return (
int)version;
2484 AMPSException::throwFor(_client, result);
2490 amps_invoke_waiting_function();
2496 AMPSException::throwFor(_client, result);
2498 return (
int)version;
2501 void addMessageHandler(
const Field& commandId_,
2503 unsigned requestedAcks_, Message::Command::Type commandType_)
2505 Lock<Mutex> lock(_lock);
2506 _routes.addRoute(commandId_, messageHandler_, requestedAcks_,
2510 bool removeMessageHandler(
const Field& commandId_)
2512 Lock<Mutex> lock(_lock);
2513 return _routes.removeRoute(commandId_);
2516 std::string send(
const MessageHandler& messageHandler_, Message& message_,
int timeout_ = 0)
2518 Field
id = message_.getCommandId();
2519 Field subId = message_.getSubscriptionId();
2520 Field qid = message_.getQueryId();
2521 bool isSubscribeOnly =
false;
2522 bool replace =
false;
2523 unsigned requestedAcks = message_.getAckTypeEnum();
2524 unsigned systemAddedAcks = Message::AckType::None;
2525 Message::Command::Type commandType = message_.getCommandEnum();
2527 switch (commandType)
2529 case Message::Command::Subscribe:
2530 case Message::Command::DeltaSubscribe:
2531 replace = message_.getOptions().operator std::string().find(AMPS_OPTIONS_REPLACE, 0, strlen(AMPS_OPTIONS_REPLACE) - 1) != std::string::npos;
2532 isSubscribeOnly =
true;
2534 case Message::Command::SOWAndSubscribe:
2535 case Message::Command::SOWAndDeltaSubscribe:
2538 id = message_.newCommandId().getCommandId();
2542 while (!replace &&
id != subId && _routes.hasRoute(
id))
2544 id = message_.newCommandId().getCommandId();
2549 message_.setSubscriptionId(
id);
2552 if (!message_.getBookmark().empty() && _bookmarkStore.isValid())
2554 systemAddedAcks |= Message::AckType::Persisted;
2557 case Message::Command::SOW:
2560 id = message_.newCommandId().getCommandId();
2564 while (!replace &&
id != subId && _routes.hasRoute(
id))
2566 message_.newCommandId();
2569 qid = message_.getCommandId();
2570 message_.setQueryId(qid);
2572 id = message_.getCommandId();
2575 if (!isSubscribeOnly)
2579 message_.setQueryID(
id);
2584 while (!replace && qid != subId && qid !=
id
2585 && _routes.hasRoute(qid))
2587 qid = message_.newQueryId().getQueryId();
2591 systemAddedAcks |= Message::AckType::Processed;
2592 message_.setAckTypeEnum(requestedAcks | systemAddedAcks);
2594 int routesAdded = 0;
2595 Lock<Mutex> l(_lock);
2596 if (!subId.empty() && messageHandler_.isValid())
2598 if (!_routes.hasRoute(subId))
2604 _routes.addRoute(subId, messageHandler_, requestedAcks,
2605 systemAddedAcks, commandType);
2607 if (!isSubscribeOnly && !qid.empty()
2608 && messageHandler_.isValid() && qid != subId)
2610 if (routesAdded == 0)
2612 _routes.addRoute(qid, messageHandler_,
2613 requestedAcks, systemAddedAcks, commandType);
2619 Unlock<Mutex> u(_lock);
2620 data = amps_invoke_copy_route_function(
2621 messageHandler_.userData());
2625 _routes.addRoute(qid, messageHandler_, requestedAcks,
2626 systemAddedAcks, commandType);
2630 _routes.addRoute(qid,
2631 MessageHandler(messageHandler_.function(),
2633 requestedAcks, systemAddedAcks, commandType);
2638 if (!
id.empty() && messageHandler_.isValid()
2639 && requestedAcks & ~Message::AckType::Persisted
2640 &&
id != subId &&
id != qid)
2642 if (routesAdded == 0)
2644 _routes.addRoute(
id, messageHandler_, requestedAcks,
2645 systemAddedAcks, commandType);
2651 Unlock<Mutex> u(_lock);
2652 data = amps_invoke_copy_route_function(
2653 messageHandler_.userData());
2657 _routes.addRoute(
id, messageHandler_, requestedAcks,
2658 systemAddedAcks, commandType);
2662 _routes.addRoute(
id,
2663 MessageHandler(messageHandler_.function(),
2666 systemAddedAcks, commandType);
2675 syncAckProcessing(timeout_, message_, 0,
false);
2676 message_.setAckTypeEnum(requestedAcks);
2680 _routes.removeRoute(message_.getQueryID());
2681 _routes.removeRoute(message_.getSubscriptionId());
2682 _routes.removeRoute(
id);
2683 message_.setAckTypeEnum(requestedAcks);
2689 case Message::Command::Unsubscribe:
2690 case Message::Command::Heartbeat:
2691 case Message::Command::Logon:
2692 case Message::Command::StartTimer:
2693 case Message::Command::StopTimer:
2694 case Message::Command::SOWDelete:
2696 Lock<Mutex> l(_lock);
2698 if (message_.getAckTypeEnum() != Message::AckType::None)
2702 message_.newCommandId();
2703 id = message_.getCommandId();
2705 if (messageHandler_.isValid())
2707 _routes.addRoute(
id, messageHandler_, requestedAcks,
2708 Message::AckType::None, commandType);
2714 case Message::Command::DeltaPublish:
2715 case Message::Command::Publish:
2717 bool useSync = message_.getFilter().len() > 0;
2718 Lock<Mutex> l(_lock);
2720 unsigned ackType = message_.getAckTypeEnum();
2721 if (ackType != Message::AckType::None
2726 message_.newCommandId();
2727 id = message_.getCommandId();
2729 if (messageHandler_.isValid())
2731 _routes.addRoute(
id, messageHandler_, requestedAcks,
2732 Message::AckType::None, commandType);
2737 message_.setAckTypeEnum(ackType | Message::AckType::Processed);
2738 syncAckProcessing(timeout_, message_, 0,
false);
2747 case Message::Command::GroupBegin:
2748 case Message::Command::GroupEnd:
2749 case Message::Command::OOF:
2750 case Message::Command::Ack:
2751 case Message::Command::Unknown:
2753 throw CommandException(
"Command type " + message_.getCommand() +
" can not be sent directly to AMPS");
2755 message_.setAckTypeEnum(requestedAcks);
2759 void setDisconnectHandler(
const DisconnectHandler& disconnectHandler)
2761 Lock<Mutex> l(_lock);
2762 _disconnectHandler = disconnectHandler;
2765 void setGlobalCommandTypeMessageHandler(
const std::string& command_,
const MessageHandler& handler_)
2767 switch (command_[0])
2771 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::Publish] = handler_;
2774 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::SOW] = handler_;
2778 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::Heartbeat] = handler_;
2782 if (command_[6] ==
'b')
2784 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::GroupBegin] = handler_;
2786 else if (command_[6] ==
'e')
2788 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::GroupEnd] = handler_;
2792 std::ostringstream os;
2793 os <<
"Invalid command '" << command_ <<
"' passed to setGlobalCommandTypeHandler";
2794 throw CommandException(os.str());
2798 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::OOF] = handler_;
2802 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::Ack] = handler_;
2806 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::LastChance] = handler_;
2810 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::DuplicateMessage] = handler_;
2813 std::ostringstream os;
2814 os <<
"Invalid command '" << command_ <<
"' passed to setGlobalCommandTypeHandler";
2815 throw CommandException(os.str());
// Installs handler_ as the global handler for the command identified by the
// enum value command_. Each visible case stores handler_ in the slot of
// _globalCommandTypeHandlers that matches the command.
// The switch header and the statements between cases are not present in
// this listing.
2820 void setGlobalCommandTypeMessageHandler(
const Message::Command::Type command_,
const MessageHandler& handler_)
2825 case Message::Command::Publish:
2826 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::Publish] = handler_;
2828 case Message::Command::SOW:
2829 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::SOW] = handler_;
2832 case Message::Command::Heartbeat:
2833 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::Heartbeat] = handler_;
2836 case Message::Command::GroupBegin:
2837 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::GroupBegin] = handler_;
2839 case Message::Command::GroupEnd:
2840 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::GroupEnd] = handler_;
2842 case Message::Command::OOF:
2843 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::OOF] = handler_;
2846 case Message::Command::Ack:
2847 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::Ack] = handler_;
// Error path: builds an "Invalid command" message from the
// CommandConstants<0> name tables, indexed by `bits`, and throws
// CommandException. The declarations of errBuf and bits (and how bits is
// derived from `command`) are not present in this listing.
2851 unsigned command = command_;
2858 AMPS_snprintf(errBuf,
sizeof(errBuf),
2859 "Invalid command '%.*s' passed to setGlobalCommandTypeHandler",
2860 CommandConstants<0>::Lengths[bits],
2861 CommandConstants<0>::Values[bits]);
2862 throw CommandException(errBuf);
2867 void setGlobalCommandTypeMessageHandler(
const GlobalCommandTypeHandlers handlerType_,
const MessageHandler& handler_)
2869 _globalCommandTypeHandlers[handlerType_] = handler_;
2872 void setFailedWriteHandler(FailedWriteHandler* handler_)
2874 Lock<Mutex> l(_lock);
2875 _failedWriteHandler.reset(handler_);
// Replaces the client's publish store with publishStore_ while holding _lock.
// Throws AlreadyConnectedException on the guarded path below. The condition
// guarding the throw (listing lines 2881-2882) is not present here; the
// message indicates it applies when the client is already connected.
2878 void setPublishStore(
const Store& publishStore_)
2880 Lock<Mutex> l(_lock);
2883 throw AlreadyConnectedException(
"Setting a publish store on a connected client is undefined behavior");
2885 _publishStore = publishStore_;
// Replaces the client's bookmark store with bookmarkStore_ while holding
// _lock.
// Throws AlreadyConnectedException on the guarded path below. The condition
// guarding the throw (listing lines 2891-2892) is not present here; the
// message indicates it applies when the client is already connected.
2888 void setBookmarkStore(
const BookmarkStore& bookmarkStore_)
2890 Lock<Mutex> l(_lock);
2893 throw AlreadyConnectedException(
"Setting a bookmark store on a connected client is undefined behavior");
2895 _bookmarkStore = bookmarkStore_;
2898 void setSubscriptionManager(SubscriptionManager* subscriptionManager_)
2900 Lock<Mutex> l(_lock);
2901 _subscriptionManager.reset(subscriptionManager_);
2904 SubscriptionManager* getSubscriptionManager()
const
2906 return const_cast<SubscriptionManager*
>(_subscriptionManager.get());
2909 DisconnectHandler getDisconnectHandler()
const
2911 return _disconnectHandler;
2914 MessageHandler getDuplicateMessageHandler()
const
2916 return _globalCommandTypeHandlers[GlobalCommandTypeHandlers::DuplicateMessage];
2919 FailedWriteHandler* getFailedWriteHandler()
const
2921 return const_cast<FailedWriteHandler*
>(_failedWriteHandler.get());
2924 Store getPublishStore()
const
2926 return _publishStore;
2929 BookmarkStore getBookmarkStore()
const
2931 return _bookmarkStore;
// Publishes dataLen_ bytes at data_ to the topic given by topic_/topicLen_.
// Without a valid publish store, the topic and data are assigned to the
// shared _publishMessage and sent while _lock is held. The statements that
// end that branch (listing lines 2942-2945) are not present here.
// The remaining statements reset publishStoreMessage and return the result
// of _publish().
2934 amps_uint64_t publish(
const char* topic_,
size_t topicLen_,
const char* data_,
size_t dataLen_)
2936 if (!_publishStore.isValid())
2938 Lock<Mutex> l(_lock);
2939 _publishMessage.assignTopic(topic_, topicLen_);
2940 _publishMessage.assignData(data_, dataLen_);
2941 _send(_publishMessage);
2946 publishStoreMessage.reset();
2948 return _publish(topic_, topicLen_, data_, dataLen_);
// Publishes data to a topic with an expiration value attached.
// expiration_ is rendered as decimal text by convertToCharArray() into a
// stack buffer; the digits occupy the tail of the buffer, starting at the
// returned offset.
// Without a valid publish store, the shared _publishMessage is filled in and
// sent while _lock is held. The statements that end that branch are not
// present in this listing.
2952 amps_uint64_t publish(
const char* topic_,
size_t topicLen_,
const char* data_,
2953 size_t dataLen_,
unsigned long expiration_)
2955 if (!_publishStore.isValid())
2957 Lock<Mutex> l(_lock);
2958 _publishMessage.assignTopic(topic_, topicLen_);
2959 _publishMessage.assignData(data_, dataLen_);
2960 char exprBuf[AMPS_NUMBER_BUFFER_LEN];
2961 size_t pos = convertToCharArray(exprBuf, expiration_);
2962 _publishMessage.assignExpiration(exprBuf + pos, AMPS_NUMBER_BUFFER_LEN - pos);
2963 _send(_publishMessage);
// Clear the expiration on the shared message after sending; exprBuf is a
// local, so the message must not keep pointing at it.
2964 _publishMessage.assignExpiration(NULL, 0);
2969 publishStoreMessage.reset();
2970 char exprBuf[AMPS_NUMBER_BUFFER_LEN];
2971 size_t exprPos = convertToCharArray(exprBuf, expiration_);
// The call whose final argument appears on the next line (listing lines
// 2972-2973) is not present here; presumably it assigns the expiration to
// publishStoreMessage - confirm against the full source.
2974 AMPS_NUMBER_BUFFER_LEN - exprPos);
2975 return _publish(topic_, topicLen_, data_, dataLen_);
// Helper that records whether an ack was seen (_acked) or the connection
// went down (_disconnected). publishFlush() binds invoke() as a message
// handler and polls done()/acked() on an instance of this class.
// Several member signatures and bodies are not present in this listing.
2979 class FlushAckHandler : ConnectionStateListener
2982 ClientImpl* _pClient;
// Flags are std::atomic<bool> when C++11 (or MSVC 2015+) is available, and
// volatile bool otherwise.
2984 #if __cplusplus >= 201100L || _MSC_VER >= 1900
2985 std::atomic<bool> _acked;
2986 std::atomic<bool> _disconnected;
2988 volatile bool _acked;
2989 volatile bool _disconnected;
// Construction registers this object as a connection state listener on the
// owning client.
2992 FlushAckHandler(ClientImpl* pClient_)
2993 : _pClient(pClient_), _cmdId(), _acked(false), _disconnected(false)
2995 pClient_->addConnectionStateListener(
this);
// Cleanup: unregisters the listener and removes the message handler keyed by
// _cmdId. The enclosing signature is not present in this listing; presumably
// this is the destructor.
2999 _pClient->removeConnectionStateListener(
this);
3000 _pClient->removeMessageHandler(_cmdId);
// Keeps a deep copy of the command id so it can be used during cleanup.
3003 void setCommandId(
const Field& cmdId_)
3005 _cmdId.deepCopy(cmdId_);
// Message callback; its body is not present in this listing.
3007 void invoke(
const Message&)
// Marks the handler as disconnected for any state at or below Shutdown.
3011 void connectionStateChanged(State state_)
3013 if (state_ <= Shutdown)
3015 _disconnected =
true;
// True once either an ack arrived or the connection was lost. The enclosing
// signature is not present in this listing.
3024 return _acked || _disconnected;
// Flushes outstanding publishes: sends a "flush" command to servers at or
// above version minFlush and waits for its ack, and flushes the publish
// store when one is set.
// timeout_ is passed to _publishStore.flush(), to the Timer and (times 1000)
// to AMPS_USLEEP - presumably milliseconds, TODO confirm.
// ackType_ must be Message::AckType::Processed or Persisted.
// Throws CommandException for any other ack type, DisconnectedException when
// not connected or when the send or wait fails, and TimedOutException when
// the wait ends without the handler being done.
// Many brace, condition and try lines are not present in this listing.
3028 void publishFlush(
long timeout_,
unsigned ackType_)
3030 static const char* processed =
"processed";
3031 static const size_t processedLen = strlen(processed);
3032 static const char* persisted =
"persisted";
3033 static const size_t persistedLen = strlen(persisted);
3034 static const char* flush =
"flush";
3035 static const size_t flushLen = strlen(flush);
3036 static VersionInfo minPersisted(
"5.3.3.0");
3037 static VersionInfo minFlush(
"4");
3038 if (ackType_ != Message::AckType::Processed
3039 && ackType_ != Message::AckType::Persisted)
3041 throw CommandException(
"Flush can only be used with processed or persisted acks.");
// Registers for connection-state changes for the lifetime of this call.
3043 FlushAckHandler flushHandler(
this);
3044 if (_serverVersion >= minFlush)
3046 Lock<Mutex> l(_lock);
// The condition guarding this throw (listing lines 3047-3048) is not present
// in this listing.
3049 throw DisconnectedException(
"Not connected trying to flush");
3052 _message.newCommandId();
3053 _message.assignCommand(flush, flushLen);
// Servers older than minPersisted, or an explicit processed request, get a
// "processed" ack; otherwise "persisted" is requested.
3054 if (_serverVersion < minPersisted
3055 || ackType_ == Message::AckType::Processed)
3057 _message.assignAckType(processed, processedLen);
3061 _message.assignAckType(persisted, persistedLen);
3063 flushHandler.setCommandId(_message.getCommandId());
// Route the ack for this command id to flushHandler.invoke().
3064 addMessageHandler(_message.getCommandId(),
3065 std::bind(&FlushAckHandler::invoke,
3066 std::ref(flushHandler),
3067 std::placeholders::_1),
3068 ackType_, _message.getCommandEnum());
3069 NoDelay noDelay(_client);
3070 if (_send(_message) == -1)
3072 throw DisconnectedException(
"Disconnected trying to flush");
3075 if (_publishStore.isValid())
3079 _publishStore.flush(timeout_);
3081 catch (
const AMPSException& ex)
3083 AMPS_UNHANDLED_EXCEPTION(ex);
// Servers below minFlush cannot ack a flush, so this path sleeps instead.
// The condition choosing between the two sleeps is not present in this
// listing.
3087 else if (_serverVersion < minFlush)
3091 AMPS_USLEEP(timeout_ * 1000);
3095 AMPS_USLEEP(1000 * 1000);
// Wait for the handler to finish, invoking the registered waiting function
// between checks. One loop is bounded by the timer and the other is not; the
// condition choosing between them is not present in this listing.
3101 Timer timer((
double)timeout_);
3103 while (!timer.check() && !flushHandler.done())
3106 amps_invoke_waiting_function();
3111 while (!flushHandler.done())
3114 amps_invoke_waiting_function();
3118 if (!flushHandler.done())
3120 throw TimedOutException(
"Timed out waiting for flush");
// A disconnect without an ack is an error only when there is no publish
// store.
3123 if (!flushHandler.acked() && !_publishStore.isValid())
3125 throw DisconnectedException(
"Disconnected waiting for flush");
// Delta-publishes dataLength_ bytes at data_ to the topic given by
// topic_/topicLength_.
// Without a valid publish store, the shared _deltaMessage is filled in and
// sent while _lock is held. The statements that end that branch are not
// present in this listing.
// Otherwise publishStoreMessage is reset, marked as a DeltaPublish command,
// and the result of _publish() is returned.
3129 amps_uint64_t deltaPublish(
const char* topic_,
size_t topicLength_,
3130 const char* data_,
size_t dataLength_)
3132 if (!_publishStore.isValid())
3134 Lock<Mutex> l(_lock);
3135 _deltaMessage.assignTopic(topic_, topicLength_);
3136 _deltaMessage.assignData(data_, dataLength_);
3137 _send(_deltaMessage);
3142 publishStoreMessage.reset();
3143 publishStoreMessage.
setCommandEnum(Message::Command::DeltaPublish);
3144 return _publish(topic_, topicLength_, data_, dataLength_);
// Delta-publishes data to a topic with an expiration value attached.
// expiration_ is rendered as decimal text by convertToCharArray() into a
// stack buffer; the digits start at the returned offset.
// Without a valid publish store, the shared _deltaMessage is filled in and
// sent while _lock is held. The statements that end that branch are not
// present in this listing.
3148 amps_uint64_t deltaPublish(
const char* topic_,
size_t topicLength_,
3149 const char* data_,
size_t dataLength_,
3150 unsigned long expiration_)
3152 if (!_publishStore.isValid())
3154 Lock<Mutex> l(_lock);
3155 _deltaMessage.assignTopic(topic_, topicLength_);
3156 _deltaMessage.assignData(data_, dataLength_);
3157 char exprBuf[AMPS_NUMBER_BUFFER_LEN];
3158 size_t pos = convertToCharArray(exprBuf, expiration_);
3159 _deltaMessage.assignExpiration(exprBuf + pos, AMPS_NUMBER_BUFFER_LEN - pos);
3160 _send(_deltaMessage);
// Clear the expiration on the shared message after sending; exprBuf is a
// local, so the message must not keep pointing at it.
3161 _deltaMessage.assignExpiration(NULL, 0);
3166 publishStoreMessage.reset();
3167 char exprBuf[AMPS_NUMBER_BUFFER_LEN];
3168 size_t exprPos = convertToCharArray(exprBuf, expiration_);
// Chained call on publishStoreMessage; the link between setCommandEnum and
// the final argument below (listing line 3170) is not present here.
3169 publishStoreMessage.
setCommandEnum(Message::Command::DeltaPublish)
3171 AMPS_NUMBER_BUFFER_LEN - exprPos);
3172 return _publish(topic_, topicLength_, data_, dataLength_);
// Shared tail of the store-backed publish paths: fills in
// publishStoreMessage, records it in the publish store, stamps the sequence
// number the store returned onto the message, and sends it under _lock.
// Returns the sequence number returned by _publishStore.store().
3176 amps_uint64_t _publish(
const char* topic_,
size_t topicLength_,
3177 const char* data_,
size_t dataLength_)
// One link of this call chain (listing line 3180) is not present here.
3179 publishStoreMessage.
assignTopic(topic_, topicLength_)
3181 .assignData(data_, dataLength_);
3182 amps_uint64_t haSequenceNumber = _publishStore.store(publishStoreMessage);
// Render the sequence number as decimal text; the digits start at buf + pos.
3183 char buf[AMPS_NUMBER_BUFFER_LEN];
3184 size_t pos = convertToCharArray(buf, haSequenceNumber);
3185 publishStoreMessage.
assignSequence(buf + pos, AMPS_NUMBER_BUFFER_LEN - pos);
3187 Lock<Mutex> l(_lock);
3188 _send(publishStoreMessage, haSequenceNumber);
3190 return haSequenceNumber;
3193 virtual std::string logon(
long timeout_, Authenticator& authenticator_,
3194 const char* options_ = NULL)
3196 Lock<Mutex> l(_lock);
3197 return _logon(timeout_, authenticator_, options_);
// Builds and sends a Logon command and returns the command id generated for
// it. logon() calls this while holding _lock.
// User, password, message type and the "pretty" option are taken from `uri`
// (its declaration is not present in this listing); the password sent is
// whatever authenticator_.authenticate() returns.
// Loop, try and several condition lines are not present in this listing.
3200 virtual std::string _logon(
long timeout_, Authenticator& authenticator_,
3201 const char* options_ = NULL)
3204 _message.newCommandId();
3205 std::string newCommandId = _message.getCommandId();
3206 _message.setCommandEnum(Message::Command::Logon);
3207 _message.setClientName(_name);
3208 #ifdef AMPS_CLIENT_VERSION_WITH_LANGUAGE
3209 _message.assignVersion(AMPS_CLIENT_VERSION_WITH_LANGUAGE,
3210 strlen(AMPS_CLIENT_VERSION_WITH_LANGUAGE));
3213 if (uri.user().size())
3215 _message.setUserId(uri.user());
3217 if (uri.password().size())
3219 _message.setPassword(uri.password());
// The message type is only sent for the "amps" protocol.
3221 if (uri.protocol() ==
"amps" && uri.messageType().size())
3223 _message.setMessageType(uri.messageType());
3225 if (uri.isTrue(
"pretty"))
3227 _message.setOptions(
"pretty");
3230 _message.setPassword(authenticator_.authenticate(_message.getUserId(), _message.getPassword()));
3231 if (!_logonCorrelationData.empty())
3233 _message.assignCorrelationId(_logonCorrelationData);
// The condition guarding this call is not present in this listing.
3237 _message.setOptions(options_);
3239 _username = _message.getUserId();
3242 AtomicFlagFlip pubFlip(&_logonInProgress);
3243 NoDelay noDelay(_client);
// Request a processed ack and wait for it. A "retry" status asks the
// authenticator for new credentials; the completion call follows on the
// other path.
3246 _message.setAckTypeEnum(Message::AckType::Processed);
3247 AckResponse ack = syncAckProcessing(timeout_, _message);
3248 if (ack.status() ==
"retry")
3250 _message.setPassword(authenticator_.retry(ack.username(), ack.password()));
3251 _username = ack.username();
3252 _message.setUserId(_username);
3256 authenticator_.completed(ack.username(), ack.password(), ack.reason());
3260 broadcastConnectionStateChanged(ConnectionStateListener::LoggedOn);
3267 catch (
const AMPSException& ex)
3270 AMPS_UNHANDLED_EXCEPTION(ex);
// With a publish store set, stored messages are replayed through _replayer
// and listeners are told once that has happened.
3279 if (_publishStore.isValid())
3283 _publishStore.replay(_replayer);
3284 broadcastConnectionStateChanged(ConnectionStateListener::PublishReplayed);
3286 catch (
const PublishStoreGapException& ex)
3289 AMPS_UNHANDLED_EXCEPTION(ex);
// A StoreException here is rethrown as a ConnectionException; part of the
// message construction (listing line 3297) is not present in this listing.
3292 catch (
const StoreException& ex)
3295 std::ostringstream os;
3296 os <<
"A local store exception occurred while logging on."
3298 throw ConnectionException(os.str());
3300 catch (
const AMPSException& ex)
3303 AMPS_UNHANDLED_EXCEPTION(ex);
3306 catch (
const std::exception& ex)
3309 AMPS_UNHANDLED_EXCEPTION(ex);
3319 return newCommandId;
// Places a subscription on topic_, routing its messages to messageHandler_.
// The body uses timeout_ and subIdField, but the parameter line declaring
// timeout_ (listing line 3324) and the declaration of subIdField are not
// present in this listing. The return statement is also absent.
// isHASubscribe_ is forced to false when no subscription manager is set.
3322 std::string subscribe(
const MessageHandler& messageHandler_,
3323 const std::string& topic_,
3325 const std::string& filter_,
3326 const std::string& bookmark_,
3327 const std::string& options_,
3328 const std::string& subId_,
3329 bool isHASubscribe_ =
true)
3331 isHASubscribe_ &= (bool)_subscriptionManager;
3332 Lock<Mutex> l(_lock);
3334 _message.setCommandEnum(Message::Command::Subscribe);
3335 _message.newCommandId();
3336 std::string subId(subId_);
// A replace option needs a caller-supplied subscription id. The search uses
// all but the last character of AMPS_OPTIONS_REPLACE. The guard around this
// check is not present in this listing; presumably it runs when subId is
// empty - confirm.
3339 if (options_.find(AMPS_OPTIONS_REPLACE, 0, strlen(AMPS_OPTIONS_REPLACE) - 1) != std::string::npos)
3341 throw ConnectionException(
"Cannot issue a replacement subscription; a valid subscription id is required.");
// The new command id doubles as the subscription id.
3344 subId = _message.getCommandId();
3346 _message.setSubscriptionId(subId);
3351 unsigned ackTypes = Message::AckType::Processed;
// Bookmark subscriptions with a bookmark store also request persisted acks.
3353 if (!bookmark_.empty() && _bookmarkStore.isValid())
3355 ackTypes |= Message::AckType::Persisted;
3357 _message.setTopic(topic_);
3359 if (filter_.length())
3361 _message.setFilter(filter_);
// Either the store's most recent bookmark for this subscription or the
// caller's bookmark is used. The condition choosing between them is not
// present in this listing.
3363 if (bookmark_.length())
3367 Message::Field mostRecent = _bookmarkStore.getMostRecent(subIdField);
3368 _message.setBookmark(mostRecent);
3372 _message.setBookmark(bookmark_);
3373 if (_bookmarkStore.isValid())
3378 _bookmarkStore.log(_message);
3379 _bookmarkStore.discard(_message);
3380 _bookmarkStore.persisted(subIdField, _message.getBookmark());
3385 if (options_.length())
3387 _message.setOptions(options_);
// The subscription manager is called on a deep copy of the message with
// _lock released for the duration of the call. The condition guarding this
// path is not present in this listing.
3390 Message message = _message;
3393 message = _message.deepCopy();
3394 Unlock<Mutex> u(_lock);
3395 _subscriptionManager->subscribe(messageHandler_, message,
3396 Message::AckType::None);
3397 if (_badTimeToHASubscribe)
3402 if (!_routes.hasRoute(_message.getSubscriptionId()))
3404 _routes.addRoute(_message.getSubscriptionId(), messageHandler_,
3405 Message::AckType::None, ackTypes, _message.getCommandEnum());
3407 message.setAckTypeEnum(ackTypes);
3408 if (!options_.empty())
3410 message.setOptions(options_);
3414 syncAckProcessing(timeout_, message, isHASubscribe_);
// Failure handling: the visible cleanup removes the route and/or
// unsubscribes, then drops the subscription from the subscription manager
// with _lock released.
3416 catch (
const DisconnectedException&)
3418 if (!isHASubscribe_)
3420 _routes.removeRoute(subIdField);
3425 AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(subIdField));
3429 catch (
const TimedOutException&)
3431 AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(subIdField));
3439 Unlock<Mutex> unlock(_lock);
3440 _subscriptionManager->unsubscribe(subIdField);
3442 _routes.removeRoute(subIdField);
// Places a delta subscription on topic_, routing its messages to
// messageHandler_.
// The body uses timeout_ and subIdField, but the parameter line declaring
// timeout_ (listing line 3450) and the declaration of subIdField are not
// present in this listing. The return statement is also absent.
// isHASubscribe_ is forced to false when no subscription manager is set.
3448 std::string deltaSubscribe(
const MessageHandler& messageHandler_,
3449 const std::string& topic_,
3451 const std::string& filter_,
3452 const std::string& bookmark_,
3453 const std::string& options_,
3454 const std::string& subId_ =
"",
3455 bool isHASubscribe_ =
true)
3457 isHASubscribe_ &= (bool)_subscriptionManager;
3458 Lock<Mutex> l(_lock);
3460 _message.setCommandEnum(Message::Command::DeltaSubscribe);
3461 _message.newCommandId();
3462 std::string subId(subId_);
// The new command id doubles as the subscription id. The guard around this
// assignment is not present in this listing.
3465 subId = _message.getCommandId();
3467 _message.setSubscriptionId(subId);
3472 unsigned ackTypes = Message::AckType::Processed;
// Bookmark subscriptions with a bookmark store also request persisted acks.
3474 if (!bookmark_.empty() && _bookmarkStore.isValid())
3476 ackTypes |= Message::AckType::Persisted;
3478 _message.setTopic(topic_);
3479 if (filter_.length())
3481 _message.setFilter(filter_);
// Either the store's most recent bookmark for this subscription or the
// caller's bookmark is used. The condition choosing between them is not
// present in this listing.
3483 if (bookmark_.length())
3487 Message::Field mostRecent = _bookmarkStore.getMostRecent(subIdField);
3488 _message.setBookmark(mostRecent);
3492 _message.setBookmark(bookmark_);
3493 if (_bookmarkStore.isValid())
3498 _bookmarkStore.log(_message);
3499 _bookmarkStore.discard(_message);
3500 _bookmarkStore.persisted(subIdField, _message.getBookmark());
3505 if (options_.length())
3507 _message.setOptions(options_);
// The subscription manager is called on a deep copy of the message with
// _lock released for the duration of the call.
3509 Message message = _message;
3512 message = _message.deepCopy();
3513 Unlock<Mutex> u(_lock);
3514 _subscriptionManager->subscribe(messageHandler_, message,
3515 Message::AckType::None);
3516 if (_badTimeToHASubscribe)
3521 if (!_routes.hasRoute(_message.getSubscriptionId()))
3523 _routes.addRoute(_message.getSubscriptionId(), messageHandler_,
3524 Message::AckType::None, ackTypes, _message.getCommandEnum());
3526 message.setAckTypeEnum(ackTypes);
3527 if (!options_.empty())
3529 message.setOptions(options_);
3533 syncAckProcessing(timeout_, message, isHASubscribe_);
// Failure handling: the visible cleanup removes the route and/or
// unsubscribes, then drops the subscription from the subscription manager
// with _lock released.
3535 catch (
const DisconnectedException&)
3537 if (!isHASubscribe_)
3539 _routes.removeRoute(subIdField);
3543 catch (
const TimedOutException&)
3545 AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(subIdField));
3553 Unlock<Mutex> unlock(_lock);
3554 _subscriptionManager->unsubscribe(subIdField);
3556 _routes.removeRoute(subIdField);
3562 void unsubscribe(
const std::string&
id)
3564 Lock<Mutex> l(_lock);
3565 unsubscribeInternal(
id);
// Removes every subscription: clears the subscription manager when one is
// set, drops all subscription routes, then sends an Unsubscribe command for
// subscription id "all" under _lock without retrying.
// A statement between taking the lock and setCommandEnum (listing line 3577)
// is not present in this listing.
3568 void unsubscribe(
void)
3570 if (_subscriptionManager)
3572 _subscriptionManager->clear();
3575 _routes.unsubscribeAll();
3576 Lock<Mutex> l(_lock);
3578 _message.setCommandEnum(Message::Command::Unsubscribe);
3579 _message.newCommandId();
3580 _message.setSubscriptionId(
"all");
3581 _sendWithoutRetry(_message);
// Queues the no-op function for deferred execution.
// NOTE(review): the purpose of queuing a no-op is not visible here - confirm
// against deferredExecution().
3583 deferredExecution(&s_noOpFn, NULL);
// Runs a SOW query on topic_, routing results to messageHandler_.
// The query id is set to the newly generated command id, and both processed
// and completed acks are requested.
// filter_, orderBy_, bookmark_ and options_ are applied only when non-empty;
// topN_ is applied only when it differs from AMPS_DEFAULT_TOP_N.
// The return statement and the try/catch lines around syncAckProcessing are
// not present in this listing.
3586 std::string sow(
const MessageHandler& messageHandler_,
3587 const std::string& topic_,
3588 const std::string& filter_ =
"",
3589 const std::string& orderBy_ =
"",
3590 const std::string& bookmark_ =
"",
3591 int batchSize_ = AMPS_DEFAULT_BATCH_SIZE,
3592 int topN_ = AMPS_DEFAULT_TOP_N,
3593 const std::string& options_ =
"",
3594 long timeout_ = AMPS_DEFAULT_COMMAND_TIMEOUT)
3596 Lock<Mutex> l(_lock);
3598 _message.setCommandEnum(Message::Command::SOW);
3599 _message.newCommandId();
3601 std::string commandId = _message.getCommandId();
3602 _message.setQueryID(_message.getCommandId());
3603 unsigned ackTypes = Message::AckType::Processed | Message::AckType::Completed;
3604 _message.setAckTypeEnum(ackTypes);
3605 _message.setTopic(topic_);
3606 if (filter_.length())
3608 _message.setFilter(filter_);
3610 if (orderBy_.length())
3612 _message.setOrderBy(orderBy_);
3614 if (bookmark_.length())
3616 _message.setBookmark(bookmark_);
3618 _message.setBatchSize(AMPS::asString(batchSize_));
3619 if (topN_ != AMPS_DEFAULT_TOP_N)
3621 _message.setTopNRecordsReturned(AMPS::asString(topN_));
3623 if (options_.length())
3625 _message.setOptions(options_);
// Register the route before sending so responses have somewhere to go.
3628 _routes.addRoute(_message.getQueryID(), messageHandler_,
3629 Message::AckType::None, ackTypes, _message.getCommandEnum());
3633 syncAckProcessing(timeout_, _message);
// Cleanup path: removes the route that was just added.
3637 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId));
// Convenience overload that forwards to another sow() overload and returns
// its result.
// One parameter line (listing line 3646) and the forwarded argument list are
// not present in this listing.
3644 std::string sow(
const MessageHandler& messageHandler_,
3645 const std::string& topic_,
3647 const std::string& filter_ =
"",
3648 int batchSize_ = AMPS_DEFAULT_BATCH_SIZE,
3649 int topN_ = AMPS_DEFAULT_TOP_N)
3652 return sow(messageHandler_,
// Runs a SOW query on topic_ and then subscribes to it, routing messages to
// messageHandler_.
// The new command id is used as both the query id and the subscription id.
// isHASubscribe_ is forced to false when no subscription manager is set.
// Several condition, try and return lines are not present in this listing.
3663 std::string sowAndSubscribe(
const MessageHandler& messageHandler_,
3664 const std::string& topic_,
3665 const std::string& filter_ =
"",
3666 const std::string& orderBy_ =
"",
3667 const std::string& bookmark_ =
"",
3668 int batchSize_ = AMPS_DEFAULT_BATCH_SIZE,
3669 int topN_ = AMPS_DEFAULT_TOP_N,
3670 const std::string& options_ =
"",
3671 long timeout_ = AMPS_DEFAULT_COMMAND_TIMEOUT,
3672 bool isHASubscribe_ =
true)
3674 isHASubscribe_ &= (bool)_subscriptionManager;
3675 unsigned ackTypes = Message::AckType::Processed;
3676 Lock<Mutex> l(_lock);
3678 _message.setCommandEnum(Message::Command::SOWAndSubscribe);
3679 _message.newCommandId();
3680 Field cid = _message.getCommandId();
3681 std::string subId = cid;
3682 _message.setQueryID(cid).setSubscriptionId(cid).setTopic(topic_);
3683 if (filter_.length())
3685 _message.setFilter(filter_);
3687 if (orderBy_.length())
3689 _message.setOrderBy(orderBy_);
// With a bookmark and a valid bookmark store, persisted acks are requested
// as well. Discard and persisted bookkeeping is skipped for bookmark ranges.
3691 if (bookmark_.length())
3693 _message.setBookmark(bookmark_);
3694 Message::Field bookmark = _message.getBookmark();
3695 if (_bookmarkStore.isValid())
3697 ackTypes |= Message::AckType::Persisted;
// The condition selecting the most-recent-bookmark path is not present in
// this listing.
3700 _message.setBookmark(_bookmarkStore.getMostRecent(_message.getSubscriptionId()));
3705 _bookmarkStore.log(_message);
3706 if (!BookmarkRange::isRange(bookmark))
3708 _bookmarkStore.discard(_message);
// The remaining argument(s) of this call are not present in this listing.
3709 _bookmarkStore.persisted(_message.getSubscriptionId(),
3719 _message.setBatchSize(AMPS::asString(batchSize_));
3720 if (topN_ != AMPS_DEFAULT_TOP_N)
3722 _message.setTopNRecordsReturned(AMPS::asString(topN_));
3724 if (options_.length())
3726 _message.setOptions(options_);
// The subscription manager is called on a deep copy of the message with
// _lock released for the duration of the call.
3729 Message message = _message;
3732 message = _message.deepCopy();
3733 Unlock<Mutex> u(_lock);
3734 _subscriptionManager->subscribe(messageHandler_, message,
3735 Message::AckType::None);
3736 if (_badTimeToHASubscribe)
3741 _routes.addRoute(cid, messageHandler_,
3742 Message::AckType::None, ackTypes, _message.getCommandEnum());
3743 message.setAckTypeEnum(ackTypes);
3744 if (!options_.empty())
3746 message.setOptions(options_);
3750 syncAckProcessing(timeout_, message, isHASubscribe_);
// Failure handling: the visible cleanup removes the route and/or
// unsubscribes, then drops the subscription from the subscription manager
// with _lock released.
3752 catch (
const DisconnectedException&)
3754 if (!isHASubscribe_)
3756 _routes.removeRoute(subId);
3760 catch (
const TimedOutException&)
3762 AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(subId));
3770 Unlock<Mutex> unlock(_lock);
3771 _subscriptionManager->unsubscribe(cid);
3773 _routes.removeRoute(subId);
// Convenience overload that forwards to another sowAndSubscribe() overload
// and returns its result. oofEnabled_ is turned into the option string "oof"
// (or an empty string when false).
// One parameter line (listing line 3781) and most of the forwarded argument
// list are not present in this listing.
3779 std::string sowAndSubscribe(
const MessageHandler& messageHandler_,
3780 const std::string& topic_,
3782 const std::string& filter_ =
"",
3783 int batchSize_ = AMPS_DEFAULT_BATCH_SIZE,
3784 bool oofEnabled_ =
false,
3785 int topN_ = AMPS_DEFAULT_TOP_N,
3786 bool isHASubscribe_ =
true)
3789 return sowAndSubscribe(messageHandler_,
3796 (oofEnabled_ ?
"oof" :
""),
// Runs a SOW query on topic_ and then places a delta subscription, routing
// messages to messageHandler_.
// The new command id is used as both the query id and the subscription id,
// and only a processed ack is requested.
// isHASubscribe_ is forced to false when no subscription manager is set.
// Several condition, try and return lines are not present in this listing.
3801 std::string sowAndDeltaSubscribe(
const MessageHandler& messageHandler_,
3802 const std::string& topic_,
3803 const std::string& filter_ =
"",
3804 const std::string& orderBy_ =
"",
3805 int batchSize_ = AMPS_DEFAULT_BATCH_SIZE,
3806 int topN_ = AMPS_DEFAULT_TOP_N,
3807 const std::string& options_ =
"",
3808 long timeout_ = AMPS_DEFAULT_COMMAND_TIMEOUT,
3809 bool isHASubscribe_ =
true)
3811 isHASubscribe_ &= (bool)_subscriptionManager;
3812 Lock<Mutex> l(_lock);
3814 _message.setCommandEnum(Message::Command::SOWAndDeltaSubscribe);
3815 _message.newCommandId();
3816 _message.setQueryID(_message.getCommandId());
3817 _message.setSubscriptionId(_message.getCommandId());
3818 std::string subId = _message.getSubscriptionId();
3819 _message.setTopic(topic_);
3820 if (filter_.length())
3822 _message.setFilter(filter_);
3824 if (orderBy_.length())
3826 _message.setOrderBy(orderBy_);
3828 _message.setBatchSize(AMPS::asString(batchSize_));
3829 if (topN_ != AMPS_DEFAULT_TOP_N)
3831 _message.setTopNRecordsReturned(AMPS::asString(topN_));
3833 if (options_.length())
3835 _message.setOptions(options_);
// The subscription manager is called on a deep copy of the message with
// _lock released for the duration of the call.
3837 Message message = _message;
3840 message = _message.deepCopy();
3841 Unlock<Mutex> u(_lock);
3842 _subscriptionManager->subscribe(messageHandler_, message,
3843 Message::AckType::None);
3844 if (_badTimeToHASubscribe)
3849 _routes.addRoute(message.getQueryID(), messageHandler_,
3850 Message::AckType::None, Message::AckType::Processed, message.getCommandEnum());
3851 message.setAckTypeEnum(Message::AckType::Processed);
3852 if (!options_.empty())
3854 message.setOptions(options_);
3858 syncAckProcessing(timeout_, message, isHASubscribe_);
// Failure handling: the visible cleanup removes the route and/or
// unsubscribes, then drops the subscription from the subscription manager
// with _lock released.
3860 catch (
const DisconnectedException&)
3862 if (!isHASubscribe_)
3864 _routes.removeRoute(subId);
3868 catch (
const TimedOutException&)
3870 AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(subId));
3878 Unlock<Mutex> unlock(_lock);
3879 _subscriptionManager->unsubscribe(Field(subId));
3881 _routes.removeRoute(subId);
// Convenience overload that builds a Message::Options value from the boolean
// flags and forwards to another sowAndDeltaSubscribe() overload.
// Only the sendEmpties_ handling is visible: when it is false,
// setNoEmpties() is called on the options. How oofEnabled_ is handled is not
// present in this listing.
// One parameter line (listing line 3889) and most of the forwarded argument
// list are also absent.
3887 std::string sowAndDeltaSubscribe(
const MessageHandler& messageHandler_,
3888 const std::string& topic_,
3890 const std::string& filter_ =
"",
3891 int batchSize_ = AMPS_DEFAULT_BATCH_SIZE,
3892 bool oofEnabled_ =
false,
3893 bool sendEmpties_ =
false,
3894 int topN_ = AMPS_DEFAULT_TOP_N,
3895 bool isHASubscribe_ =
true)
3898 Message::Options options;
3903 if (sendEmpties_ ==
false)
3905 options.setNoEmpties();
3907 return sowAndDeltaSubscribe(messageHandler_,
// Deletes SOW records on topic_ that match filter_, delivering acks to
// messageHandler_, and returns the command id as a string.
// With a valid publish store, the delete is recorded in the store, stamped
// with the sequence number the store returns, and persisted is added to the
// requested ack types. Otherwise the shared _message is used with processed
// and stats acks.
// The body uses timeout_, but the parameter line declaring it (listing line
// 3921) is not present in this listing. Several statements, including most
// of the publishStoreMessage setup, are also absent.
3918 std::string sowDelete(
const MessageHandler& messageHandler_,
3919 const std::string& topic_,
3920 const std::string& filter_,
3922 Message::Field commandId_ = Message::Field())
3924 if (_publishStore.isValid())
3926 unsigned ackType = Message::AckType::Processed |
3927 Message::AckType::Stats |
3928 Message::AckType::Persisted;
3929 publishStoreMessage.reset();
// The branch taken for an empty commandId_ is not present in this listing;
// the visible call reuses the caller's id.
3930 if (commandId_.empty())
3937 publishStoreMessage.
setCommandId(commandId_.data(), commandId_.len());
3945 amps_uint64_t haSequenceNumber = _publishStore.store(publishStoreMessage);
// Render the sequence number as decimal text; the digits start at buf + pos.
3946 char buf[AMPS_NUMBER_BUFFER_LEN];
3947 size_t pos = convertToCharArray(buf, haSequenceNumber);
3948 publishStoreMessage.
assignSequence(buf + pos, AMPS_NUMBER_BUFFER_LEN - pos);
3952 Lock<Mutex> l(_lock);
// The final argument(s) of addRoute and syncAckProcessing are not present in
// this listing.
3953 _routes.addRoute(commandId_, messageHandler_,
3954 Message::AckType::Stats,
3955 Message::AckType::Processed | Message::AckType::Persisted,
3957 syncAckProcessing(timeout_, publishStoreMessage,
// The body of this handler is not present in this listing.
3960 catch (
const DisconnectedException&)
3967 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId_));
3971 return (std::string)commandId_;
// Path without a publish store.
3975 Lock<Mutex> l(_lock);
3977 if (commandId_.empty())
3979 _message.newCommandId();
3980 commandId_ = _message.getCommandId();
3984 _message.setCommandId(commandId_.data(), commandId_.len());
// The command id is also used as the subscription id and query id.
3986 _message.setCommandEnum(Message::Command::SOWDelete)
3987 .assignSubscriptionId(commandId_.data(), commandId_.len())
3988 .assignQueryID(commandId_.data(), commandId_.len())
3989 .setAckTypeEnum(Message::AckType::Processed |
3990 Message::AckType::Stats)
3991 .assignTopic(topic_.c_str(), topic_.length())
3992 .assignFilter(filter_.c_str(), filter_.length());
3993 _routes.addRoute(commandId_, messageHandler_,
3994 Message::AckType::Stats,
3995 Message::AckType::Processed,
3996 _message.getCommandEnum());
3999 syncAckProcessing(timeout_, _message);
// Cleanup path: removes the route that was just added.
4003 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId_));
4006 return (std::string)commandId_;
// Deletes the SOW record on topic_ identified by the message body data_,
// delivering acks to messageHandler_, and returns the command id as a
// string.
// With a valid publish store, the delete is recorded in the store and
// stamped with the sequence number the store returns. Otherwise the shared
// _message is used with processed and stats acks.
// The body uses timeout_, but the parameter line declaring it (listing line
// 4013) is not present in this listing. Several statements, including most
// of the publishStoreMessage setup, are also absent.
4010 std::string sowDeleteByData(
const MessageHandler& messageHandler_,
4011 const std::string& topic_,
4012 const std::string& data_,
4014 Message::Field commandId_ = Message::Field())
4016 if (_publishStore.isValid())
4018 unsigned ackType = Message::AckType::Processed |
4019 Message::AckType::Stats |
4020 Message::AckType::Persisted;
4021 publishStoreMessage.reset();
// The branch taken for an empty commandId_ is not present in this listing;
// the visible call reuses the caller's id.
4022 if (commandId_.empty())
4029 publishStoreMessage.
setCommandId(commandId_.data(), commandId_.len());
// Tail of a call chain whose earlier links are not present in this listing.
4036 .assignData(data_.c_str(), data_.length());
4037 amps_uint64_t haSequenceNumber = _publishStore.store(publishStoreMessage);
// Render the sequence number as decimal text; the digits start at buf + pos.
4038 char buf[AMPS_NUMBER_BUFFER_LEN];
4039 size_t pos = convertToCharArray(buf, haSequenceNumber);
4040 publishStoreMessage.
assignSequence(buf + pos, AMPS_NUMBER_BUFFER_LEN - pos);
4044 Lock<Mutex> l(_lock);
// The final argument(s) of addRoute and syncAckProcessing are not present in
// this listing.
4045 _routes.addRoute(commandId_, messageHandler_,
4046 Message::AckType::Stats,
4047 Message::AckType::Processed | Message::AckType::Persisted,
4049 syncAckProcessing(timeout_, publishStoreMessage,
// The body of this handler is not present in this listing.
4052 catch (
const DisconnectedException&)
4059 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId_));
4063 return (std::string)commandId_;
// Path without a publish store.
4067 Lock<Mutex> l(_lock);
4069 if (commandId_.empty())
4071 _message.newCommandId();
4072 commandId_ = _message.getCommandId();
4076 _message.setCommandId(commandId_.data(), commandId_.len());
// The command id is also used as the subscription id and query id.
4078 _message.setCommandEnum(Message::Command::SOWDelete)
4079 .assignSubscriptionId(commandId_.data(), commandId_.len())
4080 .assignQueryID(commandId_.data(), commandId_.len())
4081 .setAckTypeEnum(Message::AckType::Processed |
4082 Message::AckType::Stats)
4083 .assignTopic(topic_.c_str(), topic_.length())
4084 .assignData(data_.c_str(), data_.length());
4085 _routes.addRoute(commandId_, messageHandler_,
4086 Message::AckType::Stats,
4087 Message::AckType::Processed,
4088 _message.getCommandEnum());
4091 syncAckProcessing(timeout_, _message);
// Cleanup path: removes the route that was just added.
4095 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId_));
4098 return (std::string)commandId_;
// Deletes SOW records on topic_ identified by the SOW keys in keys_,
// delivering acks to messageHandler_, and returns the command id as a
// string.
// With a valid publish store, the delete is recorded in the store and
// stamped with the sequence number the store returns. Otherwise the shared
// _message is used with processed and stats acks.
// The body uses timeout_, but the parameter line declaring it (listing line
// 4105) is not present in this listing. Several statements, including most
// of the publishStoreMessage setup, are also absent.
4102 std::string sowDeleteByKeys(
const MessageHandler& messageHandler_,
4103 const std::string& topic_,
4104 const std::string& keys_,
4106 Message::Field commandId_ = Message::Field())
4108 if (_publishStore.isValid())
4110 unsigned ackType = Message::AckType::Processed |
4111 Message::AckType::Stats |
4112 Message::AckType::Persisted;
4113 publishStoreMessage.reset();
// The branch taken for an empty commandId_ is not present in this listing;
// the visible call reuses the caller's id.
4114 if (commandId_.empty())
4121 publishStoreMessage.
setCommandId(commandId_.data(), commandId_.len());
4129 amps_uint64_t haSequenceNumber = _publishStore.store(publishStoreMessage);
// Render the sequence number as decimal text; the digits start at buf + pos.
4130 char buf[AMPS_NUMBER_BUFFER_LEN];
4131 size_t pos = convertToCharArray(buf, haSequenceNumber);
4132 publishStoreMessage.
assignSequence(buf + pos, AMPS_NUMBER_BUFFER_LEN - pos);
4136 Lock<Mutex> l(_lock);
// The final argument(s) of addRoute and syncAckProcessing are not present in
// this listing.
4137 _routes.addRoute(commandId_, messageHandler_,
4138 Message::AckType::Stats,
4139 Message::AckType::Processed | Message::AckType::Persisted,
4141 syncAckProcessing(timeout_, publishStoreMessage,
// The body of this handler is not present in this listing.
4144 catch (
const DisconnectedException&)
4151 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId_));
4155 return (std::string)commandId_;
// Path without a publish store.
4159 Lock<Mutex> l(_lock);
4161 if (commandId_.empty())
4163 _message.newCommandId();
4164 commandId_ = _message.getCommandId();
4168 _message.setCommandId(commandId_.data(), commandId_.len());
// The command id is also used as the subscription id and query id.
4170 _message.setCommandEnum(Message::Command::SOWDelete)
4171 .assignSubscriptionId(commandId_.data(), commandId_.len())
4172 .assignQueryID(commandId_.data(), commandId_.len())
4173 .setAckTypeEnum(Message::AckType::Processed |
4174 Message::AckType::Stats)
4175 .assignTopic(topic_.c_str(), topic_.length())
4176 .assignSowKeys(keys_.c_str(), keys_.length());
4177 _routes.addRoute(commandId_, messageHandler_,
4178 Message::AckType::Stats,
4179 Message::AckType::Processed,
4180 _message.getCommandEnum());
4183 syncAckProcessing(timeout_, _message);
// Cleanup path: removes the route that was just added.
4187 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId_));
4190 return (std::string)commandId_;
// Prepares a StartTimer command on the shared _message under _lock.
// Throws CommandException when _serverVersion is at or above "5.3.2.0".
// A statement before setCommandEnum and the statements after it (including
// the send) are not present in this listing.
4194 void startTimer(
void)
4196 if (_serverVersion >=
"5.3.2.0")
4198 throw CommandException(
"The start_timer command is deprecated.");
4200 Lock<Mutex> l(_lock);
4202 _message.setCommandEnum(Message::Command::StartTimer);
4207 std::string stopTimer(MessageHandler messageHandler_)
4209 if (_serverVersion >=
"5.3.2.0")
4211 throw CommandException(
"The stop_timer command is deprecated.");
4213 return executeAsync(Command(
"stop_timer").addAckType(
"completed"), messageHandler_);
4228 void setExceptionListener(
const std::shared_ptr<const ExceptionListener>& pListener_)
4230 _pExceptionListener = pListener_;
4231 _exceptionListener = _pExceptionListener.get();
4234 void setExceptionListener(
const ExceptionListener& listener_)
4236 _exceptionListener = &listener_;
4239 const ExceptionListener& getExceptionListener(
void)
const
4241 return *_exceptionListener;
// Configures heartbeating: stores the heartbeat interval and the socket read
// timeout under _lock when either differs from the current value.
// Throws UsageException when readTimeout_ is smaller than heartbeatInterval_.
// A statement after the two assignments (listing line 4256) is not present
// in this listing.
4244 void setHeartbeat(
unsigned heartbeatInterval_,
unsigned readTimeout_)
4246 if (readTimeout_ < heartbeatInterval_)
4248 throw UsageException(
"The socket read timeout must be >= the heartbeat interval.");
4250 Lock<Mutex> l(_lock);
4251 if (_heartbeatInterval != heartbeatInterval_ ||
4252 _readTimeout != readTimeout_)
4254 _heartbeatInterval = heartbeatInterval_;
4255 _readTimeout = readTimeout_;
// Starts heartbeating on a connected client with a non-zero heartbeat
// interval: sends a heartbeat message with options "start,<interval>",
// restarts the heartbeat timer, and then switches the message's options to
// "beat".
// Several try lines and the calls that produce `result` are not present in
// this listing.
4260 void _sendHeartbeat(
void)
4262 if (_connected && _heartbeatInterval != 0)
4264 std::ostringstream options;
4265 options <<
"start," << _heartbeatInterval;
4266 _beatMessage.setOptions(options.str());
// The timer takes the interval multiplied by 1000.
4268 _heartbeatTimer.setTimeout(_heartbeatInterval * 1000.0);
4269 _heartbeatTimer.start();
4272 _sendWithoutRetry(_beatMessage);
4273 broadcastConnectionStateChanged(ConnectionStateListener::HeartbeatInitiated);
4275 catch (ConnectionException& ex_)
4279 AMPS_UNHANDLED_EXCEPTION(ex_);
4281 _beatMessage.setOptions(
"beat");
// With a read timeout set and the client connected, a call whose result is
// checked below is made; that call is not present in this listing.
4284 if (_readTimeout && _connected)
4289 AMPSException::throwFor(_client, result);
// When no queue ack timeout is set, a further call taking the heartbeat
// interval multiplied by 1000 is made; the call itself is not present in
// this listing.
4291 if (!_queueAckTimeout)
4294 (
int)(_heartbeatInterval * 1000));
4297 AMPSException::throwFor(_client, result);
4303 void addConnectionStateListener(ConnectionStateListener* listener_)
4305 Lock<Mutex> lock(_lock);
4306 _connectionStateListeners.insert(listener_);
4309 void removeConnectionStateListener(ConnectionStateListener* listener_)
4311 Lock<Mutex> lock(_lock);
4312 _connectionStateListeners.erase(listener_);
4315 void clearConnectionStateListeners()
4317 Lock<Mutex> lock(_lock);
4318 _connectionStateListeners.clear();
// Registers handler_ in _routes under the ids carried by command_'s message:
// the subscription id, the query id and/or the command id cid_.
//   command_         - command whose message supplies sub id / query id.
//   cid_             - command id; in/out, it is reassigned below when a new
//                      query id or command id has to be generated.
//   handler_         - handler to route matching messages to.
//   requestedAcks_   - ack types requested by the caller.
//   systemAddedAcks_ - ack types added internally.
//   commandType_     - command type recorded with each route.
// Throws UsageException with the message at the end of this block; the
// condition guarding that throw is not visible in this listing.
// NOTE(review): brace-only lines and several statements (for example the
// declarations of `addedCount` and `data`) are missing from this listing, so
// the exact nesting cannot be read off from here.
4321 void _registerHandler(Command& command_, Message::Field& cid_,
4322 MessageHandler& handler_,
unsigned requestedAcks_,
4323 unsigned systemAddedAcks_, Message::Command::Type commandType_)
4325 Message message = command_.getMessage();
4326 Message::Command::Type commandType = message.getCommandEnum();
4327 Message::Field subid = message.getSubscriptionId();
4328 Message::Field qid = message.getQueryID();
// `added` is true when any of the three ids is non-empty. `cidUnique` is true
// when cid_ is non-empty and differs from both the query id and the sub id.
4330 bool added = qid.len() || subid.len() || cid_.len();
4331 bool cidIsQid = cid_ == qid;
4332 bool cidUnique = !cidIsQid && cid_.len() > 0 && cid_ != subid;
4334 if (subid.len() > 0)
4338 addedCount += _routes.addRoute(subid, handler_, requestedAcks_,
4339 systemAddedAcks_, commandType_);
4341 && (commandType == Message::Command::Subscribe
4342 || commandType == Message::Command::DeltaSubscribe))
// SOW-style commands with a query id distinct from the sub id get their own
// route under the query id.
4349 if (qid.len() > 0 && qid != subid
4350 && (commandType == Message::Command::SOW
4351 || commandType == Message::Command::SOWDelete
4352 || commandType == Message::Command::SOWAndSubscribe
4353 || commandType == Message::Command::SOWAndDeltaSubscribe))
// Keep generating a fresh query id until it no longer collides with an
// existing route.
4355 while (_routes.hasRoute(qid))
4357 message.newQueryId();
4360 cid_ = message.getQueryId();
4362 qid = message.getQueryId();
4364 if (addedCount == 0)
4366 _routes.addRoute(qid, handler_, requestedAcks_,
4367 systemAddedAcks_, commandType_);
// The handler's user data is copied via amps_invoke_copy_route_function inside
// an Unlock<Mutex> scope (presumably with _lock released - TODO confirm).
4373 Unlock<Mutex> u(_lock);
4374 data = amps_invoke_copy_route_function(handler_.userData());
4378 _routes.addRoute(qid, handler_, requestedAcks_,
4379 systemAddedAcks_, commandType_);
4383 _routes.addRoute(qid,
4384 MessageHandler(handler_.function(),
4387 systemAddedAcks_, commandType_);
// `&` binds tighter than `&&`: this tests cidUnique and that some ack type
// other than Persisted was requested.
4392 if (cidUnique && requestedAcks_ & ~Message::AckType::Persisted)
// Regenerate the command id until it is not already routed.
4394 while (_routes.hasRoute(cid_))
4396 cid_ = message.newCommandId().getCommandId();
4398 if (addedCount == 0)
4400 _routes.addRoute(cid_, handler_, requestedAcks_,
4401 systemAddedAcks_, commandType_);
4407 Unlock<Mutex> u(_lock);
4408 data = amps_invoke_copy_route_function(handler_.userData());
4412 _routes.addRoute(cid_, handler_, requestedAcks_,
4413 systemAddedAcks_, commandType_);
4417 _routes.addRoute(cid_,
4418 MessageHandler(handler_.function(),
4421 systemAddedAcks_, commandType_);
// Publish / delta publish requesting acks other than Persisted: a new command
// id is generated and the handler is routed under it.
4425 else if ((commandType == Message::Command::Publish ||
4426 commandType == Message::Command::DeltaPublish)
4427 && requestedAcks_ & ~Message::AckType::Persisted)
4429 cid_ = command_.getMessage().newCommandId().getCommandId();
4430 _routes.addRoute(cid_, handler_, requestedAcks_,
4431 systemAddedAcks_, commandType_);
4436 throw UsageException(
"To use a messagehandler, you must also supply a command or subscription ID.");
// Executes command_ on behalf of a caller that already holds _lock
// (executeAsync acquires _lock and delegates here).
//   command_       - command to execute; its message is modified in place
//                    (ack types, command id, bookmark, sequence).
//   handler_       - handler for results; routes are registered when valid.
//   isHASubscribe_ - and-ed with whether a subscription manager is set.
// Returns a std::string; the return statements visible in this listing return
// the subscription id. Other return paths are not visible here.
// NOTE(review): brace-only lines, `try`/`else` keywords and several statements
// are missing from this listing, so branch nesting cannot be read off from
// here. Comments below describe only what the visible lines do.
4440 std::string executeAsyncNoLock(Command& command_, MessageHandler& handler_,
4441 bool isHASubscribe_ =
true)
4443 isHASubscribe_ &= (bool)_subscriptionManager;
4444 Message& message = command_.getMessage();
// A Processed ack is added by the system when there is a valid handler or the
// command reports hasProcessedAck().
4445 unsigned systemAddedAcks = (handler_.isValid() || command_.hasProcessedAck()) ?
4446 Message::AckType::Processed : Message::AckType::None;
4447 unsigned requestedAcks = message.getAckTypeEnum();
4448 bool isPublishStore = _publishStore.isValid() && command_.needsSequenceNumber();
4449 Message::Command::Type commandType = message.getCommandEnum();
// StopTimer additionally requests a Completed ack.
4450 if (commandType == Message::Command::StopTimer)
4452 systemAddedAcks |= Message::AckType::Completed;
// Unsubscribe: routes are dropped (all of them or just subId; the selecting
// condition is not visible) and the subscription manager is updated inside an
// Unlock<Mutex> scope.
4454 else if (commandType == Message::Command::Unsubscribe)
4457 const std::string subId = message.getSubscriptionId();
4460 _routes.unsubscribeAll();
4461 if (_subscriptionManager)
4463 Unlock<Mutex> unlock(_lock);
4464 _subscriptionManager->clear();
4469 _routes.removeRoute(subId);
4471 if (_subscriptionManager)
4474 Unlock<Mutex> unlock(_lock);
4475 _subscriptionManager->unsubscribe(subId);
// Queues a no-op deferred execution.
4479 deferredExecution(&s_noOpFn, NULL);
// A handler needs a command id to be routed; generate one if none was set.
4481 Message::Field cid = message.getCommandId();
4482 if (handler_.isValid() && cid.empty())
4484 cid = message.newCommandId().getCommandId();
// Bookmark handling for commands that carry a bookmark.
4486 if (message.getBookmark().len() > 0)
4488 if (command_.isSubscribe())
4490 Message::Field bookmark = message.getBookmark();
4491 if (_bookmarkStore.isValid())
4493 systemAddedAcks |= Message::AckType::Persisted;
4496 message.setBookmark(_bookmarkStore.getMostRecent(message.getSubscriptionId()));
4501 _bookmarkStore.log(message);
4502 if (!BookmarkRange::isRange(bookmark))
4504 _bookmarkStore.discard(message);
4505 _bookmarkStore.persisted(message.getSubscriptionId(),
4518 systemAddedAcks |= Message::AckType::Persisted;
4520 bool isSubscribe = command_.isSubscribe();
// Non-subscribe commands with a handler are routed here.
4521 if (handler_.isValid() && !isSubscribe)
4523 _registerHandler(command_, cid, handler_,
4524 requestedAcks, systemAddedAcks, commandType);
// A synchronous send is used when there is a command id and the command
// reports hasProcessedAck().
4528 bool useSyncSend = cid.len() > 0 && command_.hasProcessedAck();
4529 amps_uint64_t haSequenceNumber = (amps_uint64_t)0;
4530 message.setAckTypeEnum(requestedAcks | systemAddedAcks);
// The message is stored in the publish store inside an Unlock<Mutex> scope,
// and the returned sequence number is set on the message.
4532 Unlock<Mutex> u(_lock);
4533 haSequenceNumber = _publishStore.store(message);
4535 message.setSequence(haSequenceNumber);
4540 syncAckProcessing((
long)command_.getTimeout(), message,
4545 _send(message, haSequenceNumber);
// On disconnect the route registered under cid is removed.
4548 catch (
const DisconnectedException&)
4554 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(cid));
4562 const Message::Field& subId = message.getSubscriptionId();
4565 Unlock<Mutex> u(_lock);
4566 _subscriptionManager->subscribe(handler_,
// When _badTimeToHASubscribe is set, the caller's ack types are restored and
// the subscription id is returned without sending.
4569 if (_badTimeToHASubscribe)
4571 message.setAckTypeEnum(requestedAcks);
4572 return std::string(subId.data(), subId.len());
4575 if (handler_.isValid())
4577 _registerHandler(command_, cid, handler_,
4578 requestedAcks, systemAddedAcks, commandType);
4580 message.setAckTypeEnum(requestedAcks | systemAddedAcks);
4583 syncAckProcessing((
long)command_.getTimeout(), message,
// Disconnect while not an HA subscribe: remove the routes for cid, subId and
// the query id, and restore the caller's ack types.
4586 catch (
const DisconnectedException&)
4588 if (!isHASubscribe_)
4590 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(cid));
4591 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(subId));
4592 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(message.getQueryId()));
4593 message.setAckTypeEnum(requestedAcks);
// Timeout: unsubscribeInternal is called for each id and the ack types are
// restored.
4597 catch (
const TimedOutException&)
4599 AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(cid));
4600 AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(subId));
4601 AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(message.getQueryId()));
4602 message.setAckTypeEnum(requestedAcks);
// Remaining cleanup path (its catch clause is not visible): the subscription
// is removed from the subscription manager and from the routes.
4610 Unlock<Mutex> unlock(_lock);
4611 _subscriptionManager->unsubscribe(subId);
4613 if (message.getQueryID().len() > 0)
4615 _routes.removeRoute(message.getQueryID());
4617 _routes.removeRoute(cid);
4618 _routes.removeRoute(subId);
4619 message.setAckTypeEnum(requestedAcks);
4622 if (subId.len() > 0)
4624 message.setAckTypeEnum(requestedAcks);
4625 return std::string(subId.data(), subId.len());
// Synchronous send is used when commandType has bits outside
// Message::Command::NoDataCommands, or when there is a command id and the
// command reports hasProcessedAck().
4631 bool useSyncSend = commandType & ~Message::Command::NoDataCommands
4632 || (cid.len() > 0 && command_.hasProcessedAck());
4633 message.setAckTypeEnum(requestedAcks | systemAddedAcks);
4638 syncAckProcessing((
long)(command_.getTimeout()), message);
4645 catch (
const TimedOutException&)
4647 AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(cid));
4648 AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(message.getQueryId()));
4649 message.setAckTypeEnum(requestedAcks);
4652 catch (
const DisconnectedException&)
4654 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(cid));
4655 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(message.getQueryId()));
4656 message.setAckTypeEnum(requestedAcks);
4661 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(cid));
4662 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(message.getQueryId()));
4663 message.setAckTypeEnum(requestedAcks);
// The caller's original ack types are restored on the message.
4668 message.setAckTypeEnum(requestedAcks);
4672 MessageStream getEmptyMessageStream(
void);
// Locking entry point for command execution: acquires _lock and forwards all
// arguments unchanged to executeAsyncNoLock, returning its result.
4674 std::string executeAsync(Command& command_, MessageHandler& handler_,
4675 bool isHASubscribe_ =
true)
4677 Lock<Mutex> lock(_lock);
4678 return executeAsyncNoLock(command_, handler_, isHASubscribe_);
// Stores the auto-ack flag. The assignment is made without taking _lock.
4682 void setAutoAck(
bool isAutoAckEnabled_)
4684 _isAutoAckEnabled = isAutoAckEnabled_;
// Returns the stored auto-ack flag.
4686 bool getAutoAck(
void)
const
4688 return _isAutoAckEnabled;
// Stores the ack batch size. If no queue-ack timeout is set (it is zero),
// the timeout is set to AMPS_DEFAULT_QUEUE_ACK_TIMEOUT.
4690 void setAckBatchSize(
const unsigned batchSize_)
4692 _ackBatchSize = batchSize_;
4693 if (!_queueAckTimeout)
4695 _queueAckTimeout = AMPS_DEFAULT_QUEUE_ACK_TIMEOUT;
// Returns the stored ack batch size.
4699 unsigned getAckBatchSize(
void)
const
4701 return _ackBatchSize;
// Returns the stored queue-ack timeout.
4703 int getAckTimeout(
void)
const
4705 return _queueAckTimeout;
// Stores the queue-ack timeout. No validation is visible at this level.
4707 void setAckTimeout(
const int ackTimeout_)
4710 _queueAckTimeout = ackTimeout_;
// Sends the acks batched in queueBookmarks_, if it holds any bookmarks.
// Returns size_t; the return statements are not visible in this listing.
// NOTE(review): the lines that populate publishStoreMessage, plus all
// brace-only lines, are missing here, so the nesting is not recoverable.
4712 size_t _ack(QueueBookmarks& queueBookmarks_)
4714 if (queueBookmarks_._bookmarkCount)
4716 publishStoreMessage.reset();
4721 amps_uint64_t haSequenceNumber = 0;
// With a valid publish store, the message is stored and the batch is emptied
// before _send is called.
4722 if (_publishStore.isValid())
4724 haSequenceNumber = _publishStore.store(publishStoreMessage);
4727 queueBookmarks_._data.erase();
4728 queueBookmarks_._bookmarkCount = 0;
4730 _send(publishStoreMessage, haSequenceNumber);
// Without a publish store, the batch is emptied after _send is called.
4731 if (!_publishStore.isValid())
4733 queueBookmarks_._data.erase();
4734 queueBookmarks_._bookmarkCount = 0;
// Acknowledges the message identified by topic_ and bookmark_ by delegating
// to _ack, after first checking _isAutoAckEnabled.
//   options_ - optional option string passed through to _ack (may be NULL).
// NOTE(review): the statement guarded by the auto-ack check is not visible in
// this listing (presumably an early return - TODO confirm).
4740 void ack(
const Field& topic_,
const Field& bookmark_,
const char* options_ = NULL)
4742 if (_isAutoAckEnabled)
4746 _ack(topic_, bookmark_, options_);
// Acknowledges a single bookmark for topic_, either by sending immediately or
// by adding it to the per-topic batch in _topicHashMap.
//   topic_    - topic of the message being acknowledged.
//   bookmark_ - bookmark of the message being acknowledged.
//   options_  - optional option string; a non-NULL value selects the
//               immediate-send branch.
// NOTE(review): brace-only lines, `else`/`return` keywords, the preprocessor
// `#else`/`#endif` lines and the statements that populate publishStoreMessage
// are missing from this listing.
4748 void _ack(
const Field& topic_,
const Field& bookmark_,
const char* options_ = NULL)
// Empty bookmarks are checked before _lock is taken (the guarded statement is
// not visible here).
4750 if (bookmark_.len() == 0)
4754 Lock<Mutex> lock(_lock);
// Immediate path: batching is effectively off (batch size below 2) or options
// were supplied.
4755 if (_ackBatchSize < 2 || options_ != NULL)
4757 publishStoreMessage.reset();
4765 amps_uint64_t haSequenceNumber = 0;
4766 if (_publishStore.isValid())
4768 haSequenceNumber = _publishStore.store(publishStoreMessage);
4772 _send(publishStoreMessage, haSequenceNumber);
// Batched path: batches are keyed by a CRC hash of the topic bytes, and an
// entry is created on first use.
4776 topic_hash hash = CRC<0>::crcNoSSE(topic_.data(), topic_.len());
4777 TopicHashMap::iterator it = _topicHashMap.find(hash);
4778 if (it == _topicHashMap.end())
4781 #ifdef AMPS_USE_EMPLACE
4782 it = _topicHashMap.emplace(TopicHashMap::value_type(hash, QueueBookmarks(topic_))).first;
4784 it = _topicHashMap.insert(TopicHashMap::value_type(hash, QueueBookmarks(topic_))).first;
4787 QueueBookmarks& queueBookmarks = it->second;
// Bookmarks in a batch are comma-separated. _oldestTime is stamped with
// amps_now() (presumably only when the batch was empty - TODO confirm).
4788 if (queueBookmarks._data.length())
4790 queueBookmarks._data.append(
",");
4794 queueBookmarks._oldestTime = amps_now();
4796 queueBookmarks._data.append(bookmark_);
// Flush the batch once it reaches the configured batch size.
4797 if (++queueBookmarks._bookmarkCount >= _ackBatchSize)
4799 _ack(queueBookmarks);
// Sends every pending per-topic ack batch, then, if anything was sent and the
// client is connected, calls publishFlush(0, Message::AckType::Processed).
// NOTE(review): brace-only lines are missing, so the exact extent of the
// scope holding _lock is not visible in this listing.
4802 void flushAcks(
void)
4804 size_t sendCount = 0;
4811 Lock<Mutex> lock(_lock);
4812 typedef TopicHashMap::iterator iterator;
// sendCount accumulates the values returned by _ack for each topic's batch.
4813 for (iterator it = _topicHashMap.begin(), end = _topicHashMap.end(); it != end; ++it)
4815 QueueBookmarks& queueBookmarks = it->second;
4816 sendCount += _ack(queueBookmarks);
4819 if (sendCount && _connected)
4821 publishFlush(0, Message::AckType::Processed);
// Sends any per-topic ack batch whose oldest entry is older than the
// queue-ack timeout. A std::exception raised while doing so is passed to
// AMPS_UNHANDLED_EXCEPTION.
// NOTE(review): brace-only lines and the `try`/`return` keywords are missing
// from this listing.
4825 void checkQueueAcks(
void)
// The emptiness check is made before _lock is acquired.
4827 if (!_topicHashMap.size())
4831 Lock<Mutex> lock(_lock);
// A batch is overdue when its _oldestTime is earlier than now minus the
// timeout.
4834 amps_uint64_t threshold = amps_now()
4835 - (amps_uint64_t)_queueAckTimeout;
4836 typedef TopicHashMap::iterator iterator;
4837 for (iterator it = _topicHashMap.begin(), end = _topicHashMap.end(); it != end; ++it)
4839 QueueBookmarks& queueBookmarks = it->second;
4840 if (queueBookmarks._bookmarkCount && queueBookmarks._oldestTime < threshold)
4842 _ack(queueBookmarks);
4846 catch (std::exception& ex)
4848 AMPS_UNHANDLED_EXCEPTION(ex);
// Queues func_ with userData_ on _deferredExecutionList while holding
// _deferredExecutionLock. The function is not invoked here; queued entries
// are run by processDeferredExecutions.
// The append uses emplace_back under AMPS_USE_EMPLACE; the push_back form is
// presumably the #else branch (the #else/#endif lines are not in this listing).
4852 void deferredExecution(DeferredExecutionFunc func_,
void* userData_)
4854 Lock<Mutex> lock(_deferredExecutionLock);
4855 #ifdef AMPS_USE_EMPLACE
4856 _deferredExecutionList.emplace_back(
4857 DeferredExecutionRequest(func_, userData_));
4859 _deferredExecutionList.push_back(
4860 DeferredExecutionRequest(func_, userData_));
// Runs every queued deferred-execution request, empties the queue, and then
// invalidates the caches of _routes and _routeCache.
// NOTE(review): brace-only lines and any try/catch around the callback are
// missing from this listing.
4864 inline void processDeferredExecutions(
void)
// The size check is made before _deferredExecutionLock is acquired.
4866 if (_deferredExecutionList.size())
4868 Lock<Mutex> lock(_deferredExecutionLock);
4869 DeferredExecutionList::iterator it = _deferredExecutionList.begin();
4870 DeferredExecutionList::iterator end = _deferredExecutionList.end();
4871 for (; it != end; ++it)
// Each request's function is called with the user data stored alongside it.
4875 it->_func(it->_userData);
4883 _deferredExecutionList.clear();
4884 _routes.invalidateCache();
4885 _routeCache.invalidateCache();
// Returns the stored retry-on-disconnect flag.
4889 bool getRetryOnDisconnect(
void)
const
4891 return _isRetryOnDisconnect;
// Stores the retry-on-disconnect flag.
4894 void setRetryOnDisconnect(
bool isRetryOnDisconnect_)
4896 _isRetryOnDisconnect = isRetryOnDisconnect_;
// Stores the default maximum depth. The Client stream-returning methods apply
// it to a new MessageStream via maxDepth() when it is non-zero.
4899 void setDefaultMaxDepth(
unsigned maxDepth_)
4901 _defaultMaxDepth = maxDepth_;
// Returns the stored default maximum depth.
4904 unsigned getDefaultMaxDepth(
void)
const
4906 return _defaultMaxDepth;
4998 RefHandle<MessageStreamImpl> _body;
5008 inline void advance(
void);
5015 : _pStream(pStream_)
// Two iterators are equal when they hold the same _pStream pointer.
5020 bool operator==(
const iterator& rhs)
const
5022 return _pStream == rhs._pStream;
// Two iterators differ when their _pStream pointers differ.
5024 bool operator!=(
const iterator& rhs)
const
5026 return _pStream != rhs._pStream;
5028 void operator++(
void)
5044 return _body.isValid();
5051 if (!_body.isValid())
5053 throw UsageException(
"This MessageStream is not valid and cannot be iterated.");
5093 inline void setSOWOnly(
const std::string& commandId_,
5094 const std::string& queryId_ =
"");
5095 inline void setSubscription(
const std::string& subId_,
5096 const std::string& commandId_ =
"",
5097 const std::string& queryId_ =
"");
5098 inline void setStatsOnly(
const std::string& commandId_,
5099 const std::string& queryId_ =
"");
5100 inline void setAcksOnly(
const std::string& commandId_,
unsigned acks_);
5107 friend class ClientImpl;
5133 BorrowRefHandle<ClientImpl> _body;
5135 static const int DEFAULT_COMMAND_TIMEOUT = AMPS_DEFAULT_COMMAND_TIMEOUT;
5136 static const int DEFAULT_BATCH_SIZE = AMPS_DEFAULT_BATCH_SIZE;
5137 static const int DEFAULT_TOP_N = AMPS_DEFAULT_TOP_N;
5148 : _body(new ClientImpl(clientName), true)
5151 Client(ClientImpl* existingClient)
5152 : _body(existingClient, true)
5155 Client(ClientImpl* existingClient,
bool isRef)
5156 : _body(existingClient, isRef)
5160 virtual ~
Client(
void) {;}
5170 return _body.isValid();
5187 _body.get().setName(name);
5194 return _body.get().getName();
5202 return _body.get().getNameHash();
5210 return _body.get().getNameHashValue();
5221 _body.get().setLogonCorrelationData(logonCorrelationData_);
5228 return _body.get().getLogonCorrelationData();
5236 _body.get().addHttpPreflightHeader(header_);
5245 _body.get().addHttpPreflightHeader(key_, value_);
5251 _body.get().clearHttpPreflightHeaders();
5260 _body.get().setHttpPreflightHeaders(headers_);
5273 return _body.get().getServerVersion();
5284 return _body.get().getServerVersionInfo();
5298 return AMPS::convertVersionToNumber(version_.c_str(), version_.length());
5313 return AMPS::convertVersionToNumber(data_, len_);
5320 return _body.get().getURI();
5344 _body.get().connect(uri);
5351 _body.get().disconnect();
5369 _body.get().send(message);
5382 unsigned requestedAcks_,
bool isSubscribe_)
5384 Message::Command::Type commandType = isSubscribe_ ? Message::Command::Subscribe : Message::Command::SOW;
5385 _body.get().addMessageHandler(commandId_, messageHandler_,
5386 requestedAcks_, commandType);
5399 unsigned requestedAcks_, Message::Command::Type commandType_)
5401 _body.get().addMessageHandler(commandId_, messageHandler_,
5402 requestedAcks_, commandType_);
5410 return _body.get().removeMessageHandler(commandId_);
5438 return _body.get().send(messageHandler, message, timeout);
5452 _body.get().setDisconnectHandler(disconnectHandler);
5460 return _body.get().getDisconnectHandler();
5469 return _body.get().getConnectionInfo();
5482 _body.get().setBookmarkStore(bookmarkStore_);
5490 return _body.
get().getBookmarkStore();
5498 return _body.get().getSubscriptionManager();
5510 _body.get().setSubscriptionManager(subscriptionManager_);
5534 _body.get().setPublishStore(publishStore_);
5542 return _body.
get().getPublishStore();
5550 _body.get().setGlobalCommandTypeMessageHandler(ClientImpl::GlobalCommandTypeHandlers::DuplicateMessage,
5551 duplicateMessageHandler_);
5565 return _body.get().getDuplicateMessageHandler();
5579 _body.get().setFailedWriteHandler(handler_);
5587 return _body.get().getFailedWriteHandler();
// Publishes data_ to topic_ by forwarding the strings' buffers and lengths to
// the implementation's publish, returning its amps_uint64_t result unchanged.
5608 amps_uint64_t
publish(
const std::string& topic_,
const std::string& data_)
5610 return _body.get().publish(topic_.c_str(), topic_.length(),
5611 data_.c_str(), data_.length());
// Publishes dataLength_ bytes at data_ to the topic given by topic_ and
// topicLength_, forwarding directly to the implementation's publish and
// returning its result. Lengths are explicit; no NUL terminator is relied on
// at this level.
5633 amps_uint64_t
publish(
const char* topic_,
size_t topicLength_,
5634 const char* data_,
size_t dataLength_)
5636 return _body.get().publish(topic_, topicLength_, data_, dataLength_);
// Publishes data_ to topic_ with an expiration, forwarding the strings'
// buffers and lengths plus expiration_ to the implementation's publish and
// returning its result.
//   expiration_ - passed through unchanged (units not visible here).
5657 amps_uint64_t
publish(
const std::string& topic_,
const std::string& data_,
5658 unsigned long expiration_)
5660 return _body.get().publish(topic_.c_str(), topic_.length(),
5661 data_.c_str(), data_.length(), expiration_);
// Publishes dataLength_ bytes at data_ to the topic given by topic_ and
// topicLength_ with an expiration, forwarding directly to the
// implementation's publish and returning its result.
//   expiration_ - passed through unchanged (units not visible here).
5684 amps_uint64_t
publish(
const char* topic_,
size_t topicLength_,
5685 const char* data_,
size_t dataLength_,
5686 unsigned long expiration_)
5688 return _body.get().publish(topic_, topicLength_,
5689 data_, dataLength_, expiration_);
// Forwards to the implementation's publishFlush.
//   timeout_ - passed through unchanged; defaults to 0.
//   ackType_ - ack type passed through; defaults to Message::AckType::Processed.
5730 void publishFlush(
long timeout_ = 0,
unsigned ackType_ = Message::AckType::Processed)
5732 _body.get().publishFlush(timeout_, ackType_);
// Delta-publishes data_ to topic_ by forwarding the strings' buffers and
// lengths to the implementation's deltaPublish, returning its result.
5751 amps_uint64_t
deltaPublish(
const std::string& topic_,
const std::string& data_)
5753 return _body.get().deltaPublish(topic_.c_str(), topic_.length(),
5754 data_.c_str(), data_.length());
5775 const char* data_,
size_t dataLength_)
5777 return _body.get().deltaPublish(topic_, topicLength_,
5778 data_, dataLength_);
5797 amps_uint64_t
deltaPublish(
const std::string& topic_,
const std::string& data_,
5798 unsigned long expiration_)
5800 return _body.get().deltaPublish(topic_.c_str(), topic_.length(),
5801 data_.c_str(), data_.length(),
5824 const char* data_,
size_t dataLength_,
5825 unsigned long expiration_)
5827 return _body.get().deltaPublish(topic_, topicLength_,
5828 data_, dataLength_, expiration_);
5848 const char* options_ = NULL)
5850 return _body.get().logon(timeout_, authenticator_, options_);
5865 std::string
logon(
const char* options_,
int timeout_ = 0)
5884 std::string
logon(
const std::string& options_,
int timeout_ = 0)
5910 const std::string& topic_,
5912 const std::string& filter_ =
"",
5913 const std::string& options_ =
"",
5914 const std::string& subId_ =
"")
5916 return _body.get().subscribe(messageHandler_, topic_, timeout_,
5917 filter_,
"", options_, subId_);
5936 long timeout_ = 0,
const std::string& filter_ =
"",
5937 const std::string& options_ =
"",
5938 const std::string& subId_ =
"")
5941 if (_body.get().getDefaultMaxDepth())
5943 result.
maxDepth(_body.get().getDefaultMaxDepth());
5945 result.setSubscription(_body.get().subscribe(
5947 topic_, timeout_, filter_,
"",
5948 options_, subId_,
false));
5968 long timeout_ = 0,
const std::string& filter_ =
"",
5969 const std::string& options_ =
"",
5970 const std::string& subId_ =
"")
5973 if (_body.get().getDefaultMaxDepth())
5975 result.
maxDepth(_body.get().getDefaultMaxDepth());
5977 result.setSubscription(_body.get().subscribe(
5979 topic_, timeout_, filter_,
"",
5980 options_, subId_,
false));
5997 const std::string& topic_,
5999 const std::string& filter_ =
"",
6000 const std::string& options_ =
"",
6001 const std::string& subId_ =
"")
6003 return _body.get().deltaSubscribe(messageHandler_, topic_, timeout_,
6004 filter_,
"", options_, subId_);
6015 long timeout_,
const std::string& filter_ =
"",
6016 const std::string& options_ =
"",
6017 const std::string& subId_ =
"")
6020 if (_body.get().getDefaultMaxDepth())
6022 result.
maxDepth(_body.get().getDefaultMaxDepth());
6024 result.setSubscription(_body.get().deltaSubscribe(
6026 topic_, timeout_, filter_,
"",
6027 options_, subId_,
false));
6033 long timeout_,
const std::string& filter_ =
"",
6034 const std::string& options_ =
"",
6035 const std::string& subId_ =
"")
6038 if (_body.get().getDefaultMaxDepth())
6040 result.
maxDepth(_body.get().getDefaultMaxDepth());
6042 result.setSubscription(_body.get().deltaSubscribe(
6044 topic_, timeout_, filter_,
"",
6045 options_, subId_,
false));
6075 const std::string& topic_,
6077 const std::string& bookmark_,
6078 const std::string& filter_ =
"",
6079 const std::string& options_ =
"",
6080 const std::string& subId_ =
"")
6082 return _body.get().subscribe(messageHandler_, topic_, timeout_,
6083 filter_, bookmark_, options_, subId_);
6104 const std::string& bookmark_,
6105 const std::string& filter_ =
"",
6106 const std::string& options_ =
"",
6107 const std::string& subId_ =
"")
6110 if (_body.get().getDefaultMaxDepth())
6112 result.
maxDepth(_body.get().getDefaultMaxDepth());
6114 result.setSubscription(_body.get().subscribe(
6116 topic_, timeout_, filter_,
6117 bookmark_, options_,
6125 const std::string& bookmark_,
6126 const std::string& filter_ =
"",
6127 const std::string& options_ =
"",
6128 const std::string& subId_ =
"")
6131 if (_body.get().getDefaultMaxDepth())
6133 result.
maxDepth(_body.get().getDefaultMaxDepth());
6135 result.setSubscription(_body.get().subscribe(
6137 topic_, timeout_, filter_,
6138 bookmark_, options_,
6153 return _body.get().unsubscribe(commandId);
6165 return _body.get().unsubscribe();
6199 const std::string& topic_,
6200 const std::string& filter_ =
"",
6201 const std::string& orderBy_ =
"",
6202 const std::string& bookmark_ =
"",
6203 int batchSize_ = DEFAULT_BATCH_SIZE,
6204 int topN_ = DEFAULT_TOP_N,
6205 const std::string& options_ =
"",
6206 long timeout_ = DEFAULT_COMMAND_TIMEOUT)
6208 return _body.get().sow(messageHandler_, topic_, filter_, orderBy_,
6209 bookmark_, batchSize_, topN_, options_,
6237 const std::string& filter_ =
"",
6238 const std::string& orderBy_ =
"",
6239 const std::string& bookmark_ =
"",
6240 int batchSize_ = DEFAULT_BATCH_SIZE,
6241 int topN_ = DEFAULT_TOP_N,
6242 const std::string& options_ =
"",
6243 long timeout_ = DEFAULT_COMMAND_TIMEOUT)
6246 if (_body.get().getDefaultMaxDepth())
6248 result.
maxDepth(_body.get().getDefaultMaxDepth());
6250 result.setSOWOnly(_body.get().sow(result.operator
MessageHandler(),
6251 topic_, filter_, orderBy_, bookmark_,
6252 batchSize_, topN_, options_, timeout_));
6258 const std::string& filter_ =
"",
6259 const std::string& orderBy_ =
"",
6260 const std::string& bookmark_ =
"",
6261 int batchSize_ = DEFAULT_BATCH_SIZE,
6262 int topN_ = DEFAULT_TOP_N,
6263 const std::string& options_ =
"",
6264 long timeout_ = DEFAULT_COMMAND_TIMEOUT)
6267 if (_body.get().getDefaultMaxDepth())
6269 result.
maxDepth(_body.get().getDefaultMaxDepth());
6271 result.setSOWOnly(_body.get().sow(result.operator
MessageHandler(),
6272 topic_, filter_, orderBy_, bookmark_,
6273 batchSize_, topN_, options_, timeout_));
6299 const std::string& topic_,
6301 const std::string& filter_ =
"",
6302 int batchSize_ = DEFAULT_BATCH_SIZE,
6303 int topN_ = DEFAULT_TOP_N)
6305 return _body.get().sow(messageHandler_, topic_, timeout_, filter_,
6331 const std::string& topic_,
6333 const std::string& filter_ =
"",
6334 int batchSize_ = DEFAULT_BATCH_SIZE,
6335 bool oofEnabled_ =
false,
6336 int topN_ = DEFAULT_TOP_N)
6338 return _body.get().sowAndSubscribe(messageHandler_, topic_, timeout_,
6339 filter_, batchSize_, oofEnabled_,
6364 const std::string& filter_ =
"",
6365 int batchSize_ = DEFAULT_BATCH_SIZE,
6366 bool oofEnabled_ =
false,
6367 int topN_ = DEFAULT_TOP_N)
6370 if (_body.get().getDefaultMaxDepth())
6372 result.
maxDepth(_body.get().getDefaultMaxDepth());
6374 result.setSubscription(_body.get().sowAndSubscribe(
6376 topic_, timeout_, filter_,
6377 batchSize_, oofEnabled_,
6402 const std::string& filter_ =
"",
6403 int batchSize_ = DEFAULT_BATCH_SIZE,
6404 bool oofEnabled_ =
false,
6405 int topN_ = DEFAULT_TOP_N)
6408 if (_body.get().getDefaultMaxDepth())
6410 result.
maxDepth(_body.get().getDefaultMaxDepth());
6412 result.setSubscription(_body.get().sowAndSubscribe(
6414 topic_, timeout_, filter_,
6415 batchSize_, oofEnabled_,
6449 const std::string& topic_,
6450 const std::string& filter_ =
"",
6451 const std::string& orderBy_ =
"",
6452 const std::string& bookmark_ =
"",
6453 int batchSize_ = DEFAULT_BATCH_SIZE,
6454 int topN_ = DEFAULT_TOP_N,
6455 const std::string& options_ =
"",
6456 long timeout_ = DEFAULT_COMMAND_TIMEOUT)
6458 return _body.get().sowAndSubscribe(messageHandler_, topic_, filter_,
6459 orderBy_, bookmark_, batchSize_,
6460 topN_, options_, timeout_);
6488 const std::string& filter_ =
"",
6489 const std::string& orderBy_ =
"",
6490 const std::string& bookmark_ =
"",
6491 int batchSize_ = DEFAULT_BATCH_SIZE,
6492 int topN_ = DEFAULT_TOP_N,
6493 const std::string& options_ =
"",
6494 long timeout_ = DEFAULT_COMMAND_TIMEOUT)
6497 if (_body.get().getDefaultMaxDepth())
6499 result.
maxDepth(_body.get().getDefaultMaxDepth());
6501 result.setSubscription(_body.get().sowAndSubscribe(
6503 topic_, filter_, orderBy_,
6504 bookmark_, batchSize_, topN_,
6505 options_, timeout_,
false));
6511 const std::string& filter_ =
"",
6512 const std::string& orderBy_ =
"",
6513 const std::string& bookmark_ =
"",
6514 int batchSize_ = DEFAULT_BATCH_SIZE,
6515 int topN_ = DEFAULT_TOP_N,
6516 const std::string& options_ =
"",
6517 long timeout_ = DEFAULT_COMMAND_TIMEOUT)
6520 if (_body.get().getDefaultMaxDepth())
6522 result.
maxDepth(_body.get().getDefaultMaxDepth());
6524 result.setSubscription(_body.get().sowAndSubscribe(
6526 topic_, filter_, orderBy_,
6527 bookmark_, batchSize_, topN_,
6528 options_, timeout_,
false));
6557 const std::string& topic_,
6558 const std::string& filter_ =
"",
6559 const std::string& orderBy_ =
"",
6560 int batchSize_ = DEFAULT_BATCH_SIZE,
6561 int topN_ = DEFAULT_TOP_N,
6562 const std::string& options_ =
"",
6563 long timeout_ = DEFAULT_COMMAND_TIMEOUT)
6565 return _body.get().sowAndDeltaSubscribe(messageHandler_, topic_,
6566 filter_, orderBy_, batchSize_,
6567 topN_, options_, timeout_);
6590 const std::string& filter_ =
"",
6591 const std::string& orderBy_ =
"",
6592 int batchSize_ = DEFAULT_BATCH_SIZE,
6593 int topN_ = DEFAULT_TOP_N,
6594 const std::string& options_ =
"",
6595 long timeout_ = DEFAULT_COMMAND_TIMEOUT)
6598 if (_body.get().getDefaultMaxDepth())
6600 result.
maxDepth(_body.get().getDefaultMaxDepth());
6602 result.setSubscription(_body.get().sowAndDeltaSubscribe(
6604 topic_, filter_, orderBy_,
6605 batchSize_, topN_, options_,
6612 const std::string& filter_ =
"",
6613 const std::string& orderBy_ =
"",
6614 int batchSize_ = DEFAULT_BATCH_SIZE,
6615 int topN_ = DEFAULT_TOP_N,
6616 const std::string& options_ =
"",
6617 long timeout_ = DEFAULT_COMMAND_TIMEOUT)
6620 if (_body.get().getDefaultMaxDepth())
6622 result.
maxDepth(_body.get().getDefaultMaxDepth());
6624 result.setSubscription(_body.get().sowAndDeltaSubscribe(
6626 topic_, filter_, orderBy_,
6627 batchSize_, topN_, options_,
6657 const std::string& topic_,
6659 const std::string& filter_ =
"",
6660 int batchSize_ = DEFAULT_BATCH_SIZE,
6661 bool oofEnabled_ =
false,
6662 bool sendEmpties_ =
false,
6663 int topN_ = DEFAULT_TOP_N)
6665 return _body.get().sowAndDeltaSubscribe(messageHandler_, topic_,
6666 timeout_, filter_, batchSize_,
6667 oofEnabled_, sendEmpties_,
6694 const std::string& filter_ =
"",
6695 int batchSize_ = DEFAULT_BATCH_SIZE,
6696 bool oofEnabled_ =
false,
6697 bool sendEmpties_ =
false,
6698 int topN_ = DEFAULT_TOP_N)
6701 if (_body.get().getDefaultMaxDepth())
6703 result.
maxDepth(_body.get().getDefaultMaxDepth());
6705 result.setSubscription(_body.get().sowAndDeltaSubscribe(
6707 topic_, timeout_, filter_,
6708 batchSize_, oofEnabled_,
6709 sendEmpties_, topN_,
false));
6735 const std::string& filter_ =
"",
6736 int batchSize_ = DEFAULT_BATCH_SIZE,
6737 bool oofEnabled_ =
false,
6738 bool sendEmpties_ =
false,
6739 int topN_ = DEFAULT_TOP_N)
6742 if (_body.get().getDefaultMaxDepth())
6744 result.
maxDepth(_body.get().getDefaultMaxDepth());
6746 result.setSubscription(_body.get().sowAndDeltaSubscribe(
6748 topic_, timeout_, filter_,
6749 batchSize_, oofEnabled_,
6750 sendEmpties_, topN_,
false));
6773 const std::string& topic,
6774 const std::string& filter,
6777 return _body.get().sowDelete(messageHandler, topic, filter, timeout);
6805 stream.setStatsOnly(cid);
6806 _body.get().sowDelete(stream.operator
MessageHandler(), topic, filter, timeout, cid);
6807 return *(stream.
begin());
6809 catch (
const DisconnectedException&)
6822 _body.get().startTimer();
6833 return _body.get().stopTimer(messageHandler);
6858 const std::string& topic_,
6859 const std::string& keys_,
6862 return _body.get().sowDeleteByKeys(messageHandler_, topic_, keys_, timeout_);
6894 stream.setStatsOnly(cid);
6895 _body.get().sowDeleteByKeys(stream.operator
MessageHandler(), topic_, keys_, timeout_, cid);
6896 return *(stream.
begin());
6898 catch (
const DisconnectedException&)
6920 const std::string& topic_,
const std::string& data_,
6923 return _body.get().sowDeleteByData(messageHandler_, topic_, data_, timeout_);
6950 stream.setStatsOnly(cid);
6951 _body.get().sowDeleteByData(stream.operator
MessageHandler(), topic_, data_, timeout_, cid);
6952 return *(stream.
begin());
6954 catch (
const DisconnectedException&)
6966 return _body.get().getHandle();
6979 _body.get().setExceptionListener(pListener_);
6992 _body.get().setExceptionListener(listener_);
6999 return _body.get().getExceptionListener();
7025 _body.get().setHeartbeat(heartbeatTime_, readTimeout_);
7049 _body.get().setHeartbeat(heartbeatTime_, 2 * heartbeatTime_);
7062 _body.get().setGlobalCommandTypeMessageHandler(ClientImpl::GlobalCommandTypeHandlers::LastChance,
7088 _body.get().setGlobalCommandTypeMessageHandler(command_, handler_);
7113 _body.get().setGlobalCommandTypeMessageHandler(command_, handler_);
7196 _body.get().addConnectionStateListener(listener);
7204 _body.get().removeConnectionStateListener(listener);
7211 _body.get().clearConnectionStateListeners();
7241 return _body.get().executeAsync(command_, handler_);
7279 if (command_.isSubscribe())
7281 Message& message = command_.getMessage();
7284 if (useExistingHandler)
7287 if (_body.get()._routes.getRoute(subId, existingHandler))
7290 _body.get().executeAsync(command_, existingHandler,
false);
7295 id = _body.get().executeAsync(command_, handler_,
false);
7297 catch (
const DisconnectedException&)
7300 if (command_.isSubscribe())
7304 if (command_.isSow())
7337 _body.get().ack(topic_, bookmark_, options_);
// std::string convenience overload: wraps topic_ and bookmark_ in Field
// objects built from each string's data() and length(), then forwards to the
// implementation's ack together with options_.
7359 void ack(
const std::string& topic_,
const std::string& bookmark_,
7360 const char* options_ = NULL)
7362 _body.get().ack(
Field(topic_.data(), topic_.length()),
Field(bookmark_.data(), bookmark_.length()), options_);
// Acknowledges topic_/bookmark_ by calling the implementation's _ack
// directly. Unlike ClientImpl::ack, this path does not consult the auto-ack
// flag first.
7370 void ackDeferredAutoAck(
Field& topic_,
Field& bookmark_,
const char* options_ = NULL)
7372 _body.get()._ack(topic_, bookmark_, options_);
7385 _body.get().flushAcks();
7394 return _body.get().getAutoAck();
7404 _body.get().setAutoAck(isAutoAckEnabled_);
7412 return _body.get().getAckBatchSize();
7422 _body.get().setAckBatchSize(ackBatchSize_);
7433 return _body.get().getAckTimeout();
7445 if (!ackTimeout_ && _body.get().getAckBatchSize() > 1)
7447 throw UsageException(
"Ack timeout must be > 0 when ack batch size > 1");
7449 _body.get().setAckTimeout(ackTimeout_);
7463 _body.get().setRetryOnDisconnect(isRetryOnDisconnect_);
7472 return _body.get().getRetryOnDisconnect();
7481 _body.get().setDefaultMaxDepth(maxDepth_);
7490 return _body.get().getDefaultMaxDepth();
7502 return _body.get().setTransportFilterFunction(filter_, userData_);
7516 return _body.get().setThreadCreatedCallback(callback_, userData_);
// Forwards func_ and userData_ to the implementation's deferredExecution,
// which queues the request rather than invoking it.
7524 void deferredExecution(DeferredExecutionFunc func_,
void* userData_)
7526 _body.get().deferredExecution(func_, userData_);
7536 AMPS_CALL_EXCEPTION_WRAPPER(_globalCommandTypeHandlers[GlobalCommandTypeHandlers::LastChance].invoke(message));
7542 unsigned deliveries = 0;
7554 const char* data = NULL;
7556 const char* status = NULL;
7557 size_t statusLen = 0;
7559 const size_t NotEntitled = 12, Duplicate = 9, Failure = 7;
7562 if (len == NotEntitled || len == Duplicate ||
7563 (statusLen == Failure && status[0] ==
'f'))
7565 if (_failedWriteHandler)
7567 if (_publishStore.isValid())
7569 amps_uint64_t sequence =
7571 FailedWriteStoreReplayer replayer(
this, data, len);
7572 AMPS_CALL_EXCEPTION_WRAPPER(_publishStore.replaySingle(
7573 replayer, sequence));
7577 static Message emptyMessage;
7579 AMPS_CALL_EXCEPTION_WRAPPER(
7580 _failedWriteHandler->failedWrite(emptyMessage,
7586 if (_publishStore.isValid())
7595 AMPS_CALL_EXCEPTION_WRAPPER(_publishStore.discardUpTo(seq));
7599 if (!deliveries && _bookmarkStore.isValid())
7605 Message::Field subId(data, len);
7606 const char* bookmarkData = NULL;
7607 size_t bookmarkLen = 0;
7613 if (bookmarkLen > 0 && _routes.hasRoute(subId))
7616 _bookmarkStore.persisted(subId, Message::Field(bookmarkData, bookmarkLen));
7621 catch (std::exception& ex)
7623 AMPS_UNHANDLED_EXCEPTION(ex);
7629 ClientImpl::processedAck(Message& message)
7631 unsigned deliveries = 0;
7633 const char* data = NULL;
7637 Lock<Mutex> l(_lock);
7640 Lock<Mutex> guard(_ackMapLock);
7641 AckMap::iterator i = _ackMap.find(std::string(data, len));
7642 if (i != _ackMap.end())
7652 ack.setStatus(data, len);
7654 ack.setReason(data, len);
7656 ack.setUsername(data, len);
7658 ack.setPassword(data, len);
7660 ack.setServerVersion(data, len);
7662 ack.setOptions(data, len);
7664 ack.setBookmark(message.getBookmark());
// Sends the heartbeat message when `force` is true or the heartbeat timer's
// check() returns true. The timer is restarted before the send.
// NOTE(review): the return type line, brace-only lines, the `try` keyword and
// the body of the AMPSException handler are not visible in this listing.
7672 ClientImpl::checkAndSendHeartbeat(
bool force)
7674 if (force || _heartbeatTimer.check())
7676 _heartbeatTimer.start();
7679 sendWithoutRetry(_beatMessage);
7681 catch (
const AMPSException&)
7688 inline ConnectionInfo ClientImpl::getConnectionInfo()
const
7690 ConnectionInfo info;
7691 std::ostringstream writer;
7693 info[
"client.uri"] = _lastUri;
7694 info[
"client.name"] = _name;
7695 info[
"client.username"] = _username;
7696 if (_publishStore.isValid())
7698 writer << _publishStore.unpersistedCount();
7699 info[
"publishStore.unpersistedCount"] = writer.str();
7708 ClientImpl::ClientImplMessageHandler(
amps_handle messageHandle_,
void* userData_)
7710 const unsigned SOWMask = Message::Command::SOW | Message::Command::GroupBegin | Message::Command::GroupEnd;
7711 const unsigned PublishMask = Message::Command::OOF | Message::Command::Publish | Message::Command::DeltaPublish;
7712 ClientImpl* me = (ClientImpl*) userData_;
7713 AMPS_CALL_EXCEPTION_WRAPPER_2(me, me->processDeferredExecutions());
7714 if (!messageHandle_)
7716 if (me->_queueAckTimeout)
7718 me->checkQueueAcks();
7720 me->checkAndSendHeartbeat();
7724 me->_readMessage.replace(messageHandle_);
7725 Message& message = me->_readMessage;
7726 Message::Command::Type commandType = message.getCommandEnum();
7727 if (commandType & SOWMask)
7733 AMPS_CALL_EXCEPTION_WRAPPER_2(me,
7734 me->_globalCommandTypeHandlers[1 + (commandType / 8192)].invoke(message));
7736 AMPS_CALL_EXCEPTION_WRAPPER_2(me, me->_routes.deliverData(message,
7737 message.getQueryID()));
7739 else if (commandType & PublishMask)
7742 AMPS_CALL_EXCEPTION_WRAPPER_2(me,
7743 me->_globalCommandTypeHandlers[(commandType == Message::Command::Publish ?
7744 GlobalCommandTypeHandlers::Publish :
7745 GlobalCommandTypeHandlers::OOF)].invoke(message));
7747 const char* subIds = NULL;
7748 size_t subIdsLen = 0;
7751 &subIds, &subIdsLen);
7752 size_t subIdCount = me->_routes.parseRoutes(
AMPS::Field(subIds, subIdsLen), me->_routeCache);
7753 for (
size_t i = 0; i < subIdCount; ++i)
7755 MessageRouter::RouteCache::value_type& lookupResult = me->_routeCache[i];
7756 MessageHandler& handler = lookupResult.handler;
7757 if (handler.isValid())
7760 AMPS_SubscriptionId,
7761 subIds + lookupResult.idOffset,
7762 lookupResult.idLength);
7763 Message::Field bookmark = message.getBookmark();
7764 bool isMessageQueue = message.getLeasePeriod().len() != 0;
7765 bool isAutoAck = me->_isAutoAckEnabled;
7767 if (!isMessageQueue && !bookmark.empty() &&
7768 me->_bookmarkStore.isValid())
7770 if (me->_bookmarkStore.isDiscarded(me->_readMessage))
7773 if (me->_globalCommandTypeHandlers[GlobalCommandTypeHandlers::DuplicateMessage].isValid())
7775 AMPS_CALL_EXCEPTION_WRAPPER_2(me, me->_globalCommandTypeHandlers[GlobalCommandTypeHandlers::DuplicateMessage].invoke(message));
7780 me->_bookmarkStore.log(me->_readMessage);
7781 AMPS_CALL_EXCEPTION_WRAPPER_2(me,
7782 handler.invoke(message));
7787 if (isMessageQueue && isAutoAck)
7791 AMPS_CALL_EXCEPTION_WRAPPER_STREAM_FULL_2(me, handler.invoke(message));
7792 if (!message.getIgnoreAutoAck())
7794 AMPS_CALL_EXCEPTION_WRAPPER_2(me,
7795 me->_ack(message.getTopic(), message.getBookmark()));
7798 catch (std::exception& ex)
7800 if (!message.getIgnoreAutoAck())
7802 AMPS_CALL_EXCEPTION_WRAPPER_2(me,
7803 me->_ack(message.getTopic(), message.getBookmark(),
"cancel"));
7805 AMPS_UNHANDLED_EXCEPTION_2(me, ex);
7810 AMPS_CALL_EXCEPTION_WRAPPER_2(me,
7811 handler.invoke(message));
7817 me->lastChance(message);
7821 else if (commandType == Message::Command::Ack)
7823 AMPS_CALL_EXCEPTION_WRAPPER_2(me,
7824 me->_globalCommandTypeHandlers[GlobalCommandTypeHandlers::Ack].invoke(message));
7825 unsigned ackType = message.getAckTypeEnum();
7826 unsigned deliveries = 0U;
7829 case Message::AckType::Persisted:
7830 deliveries += me->persistedAck(message);
7832 case Message::AckType::Processed:
7833 deliveries += me->processedAck(message);
7836 AMPS_CALL_EXCEPTION_WRAPPER_2(me, deliveries += me->_routes.deliverAck(message, ackType));
7837 if (deliveries == 0)
7839 me->lastChance(message);
7842 else if (commandType == Message::Command::Heartbeat)
7844 AMPS_CALL_EXCEPTION_WRAPPER_2(me,
7845 me->_globalCommandTypeHandlers[GlobalCommandTypeHandlers::Heartbeat].invoke(message));
7846 if (me->_heartbeatTimer.getTimeout() != 0.0)
7848 me->checkAndSendHeartbeat(
true);
7852 me->lastChance(message);
7856 else if (!message.getCommandId().empty())
7858 unsigned deliveries = 0U;
7861 while (me->_connected)
7865 deliveries = me->_routes.deliverData(message, message.getCommandId());
7869 catch (MessageStreamFullException&)
7871 catch (MessageStreamFullException& ex_)
7876 me->checkAndSendHeartbeat(
false);
7879 catch (std::exception&)
7881 catch (std::exception& ex_)
7889 catch (std::exception& ex_)
7893 me->_exceptionListener->exceptionThrown(ex_);
7900 if (deliveries == 0)
7902 me->lastChance(message);
7905 me->checkAndSendHeartbeat();
7910 ClientImpl::ClientImplPreDisconnectHandler(
amps_handle ,
unsigned failedConnectionVersion,
void* userData)
7912 ClientImpl* me = (ClientImpl*) userData;
7915 me->clearAcks(failedConnectionVersion);
7919 ClientImpl::ClientImplDisconnectHandler(
amps_handle ,
void* userData)
7921 ClientImpl* me = (ClientImpl*) userData;
7922 Lock<Mutex> l(me->_lock);
7923 Client wrapper(me,
false);
7926 me->broadcastConnectionStateChanged(ConnectionStateListener::Disconnected);
7930 AtomicFlagFlip subFlip(&me->_badTimeToHASubscribe);
7931 bool retryInProgress =
false;
7934 me->_connected =
false;
7935 me->_lock.signalAll();
7938 Unlock<Mutex> unlock(me->_lock);
7939 me->_disconnectHandler.invoke(wrapper);
7942 catch (
const RetryOperationException&)
7944 catch (
const RetryOperationException& ex)
7947 retryInProgress =
true;
7949 catch (
const std::exception& ex)
7951 AMPS_UNHANDLED_EXCEPTION_2(me, ex);
7953 me->_lock.signalAll();
7955 if (!me->_connected)
7957 if (retryInProgress)
7959 AMPS_UNHANDLED_EXCEPTION_2(me, RetryOperationException(
"Reconnect in progress."));
7963 me->broadcastConnectionStateChanged(ConnectionStateListener::Shutdown);
7964 AMPS_UNHANDLED_EXCEPTION_2(me, DisconnectedException(
"Reconnect failed."));
7971 if (me->_subscriptionManager)
7976 Unlock<Mutex> unlock(me->_lock);
7977 me->_subscriptionManager->resubscribe(wrapper);
7979 me->broadcastConnectionStateChanged(ConnectionStateListener::Resubscribed);
7983 catch (
const AMPSException& subEx)
7985 AMPS_UNHANDLED_EXCEPTION_2(me, subEx);
7987 catch (
const std::exception& subEx)
7989 AMPS_UNHANDLED_EXCEPTION_2(me, subEx);
8001 ClientImpl::ClientImplGetHttpPreflightMessage(
void* userData_)
8003 ClientImpl* me = (ClientImpl*)userData_;
8004 std::ostringstream os;
8007 size_t firstColon = me->_lastUri.find(
':');
8009 size_t pathEnd = me->_lastUri.find(
'?');
8011 size_t lastColon = me->_lastUri.rfind(
':', pathEnd);
8013 size_t at = me->_lastUri.rfind(
'@', lastColon);
8015 size_t hostStart = at == std::string::npos ? firstColon + 3 : at + 1;
8016 size_t hostLen = lastColon - hostStart;
8018 size_t pathStart = me->_lastUri.find(
'/', lastColon);
8019 size_t pathLen = pathEnd;
8020 if (pathEnd != std::string::npos)
8022 pathLen = pathEnd - pathStart;
8024 os <<
"GET " << me->_lastUri.substr(pathStart, pathLen)
8025 <<
" HTTP/1.1\r\nHost: " << me->_lastUri.substr(hostStart, hostLen)
8026 <<
"\r\nConnection: upgrade\r\nUpgrade: "
8027 << me->_lastUri.substr(0, firstColon) <<
"\r\n";
8028 for (
auto header : me->_httpPreflightHeaders)
8030 os << header <<
"\r\n";
8033 me->_preflightMessage = os.str();
8034 return me->_preflightMessage.c_str();
8049 iterator(
const char* data_,
size_t len_,
size_t pos_,
char fieldSep_)
8050 : _data(data_), _len(len_), _pos(pos_), _fieldSep(fieldSep_)
8052 while (_pos != _len && _data[_pos] == _fieldSep)
8058 typedef void* difference_type;
8059 typedef std::forward_iterator_tag iterator_category;
8060 typedef std::pair<Message::Field, Message::Field> value_type;
8061 typedef value_type* pointer;
8062 typedef value_type& reference;
/// Equality compares only the current offset (_pos); the buffer pointer,
/// length and field separator of the two iterators are not compared.
8063 bool operator==(
const iterator& rhs)
const
8065 return _pos == rhs._pos;
/// Inequality compares only the current offset (_pos); the buffer pointer,
/// length and field separator of the two iterators are not compared.
8067 bool operator!=(
const iterator& rhs)
const
8069 return _pos != rhs._pos;
8071 iterator& operator++()
8074 while (_pos != _len && _data[_pos] != _fieldSep)
8079 while (_pos != _len && _data[_pos] == _fieldSep)
8086 value_type operator*()
const
8089 size_t i = _pos, keyLength = 0, valueStart = 0, valueLength = 0;
8090 for (; i < _len && _data[i] !=
'='; ++i)
8095 result.first.assign(_data + _pos, keyLength);
8097 if (i < _len && _data[i] ==
'=')
8101 for (; i < _len && _data[i] != _fieldSep; ++i)
8106 result.second.assign(_data + valueStart, valueLength);
8112 class reverse_iterator
8119 typedef std::pair<Message::Field, Message::Field> value_type;
8120 reverse_iterator(
const char* data,
size_t len,
const char* pos,
char fieldsep)
8121 : _data(data), _len(len), _pos(pos), _fieldSep(fieldsep)
8126 while (_pos >= _data && *_pos == _fieldSep)
8130 while (_pos > _data && *_pos != _fieldSep)
8137 if (_pos > _data || (_pos == _data && *_pos == _fieldSep))
/// Equality compares only the current position pointer (_pos); the buffer,
/// length and field separator of the two iterators are not compared.
8147 bool operator==(
const reverse_iterator& rhs)
const
8149 return _pos == rhs._pos;
/// Inequality compares only the current position pointer (_pos); the buffer,
/// length and field separator of the two iterators are not compared.
8151 bool operator!=(
const reverse_iterator& rhs)
const
8153 return _pos != rhs._pos;
8155 reverse_iterator& operator++()
8166 while (_pos >= _data && *_pos == _fieldSep)
8171 while (_pos > _data && *_pos != _fieldSep)
8175 if (_pos > _data || (_pos == _data && *_pos == _fieldSep))
8186 value_type operator*()
const
8189 size_t keyLength = 0, valueStart = 0, valueLength = 0;
8190 size_t i = (size_t)(_pos - _data);
8191 for (; i < _len && _data[i] !=
'='; ++i)
8195 result.first.assign(_pos, keyLength);
8196 if (i < _len && _data[i] ==
'=')
8200 for (; i < _len && _data[i] != _fieldSep; ++i)
8205 result.second.assign(_data + valueStart, valueLength);
/// Constructs a FIX view over the bytes of a Message::Field.
/// Only the field's data pointer and length are stored, so the bytes are
/// referenced rather than copied.
/// \param data The field whose data()/len() describe the bytes to parse.
/// \param fieldSeparator The byte that separates fields; defaults to the
/// byte value 1 (ASCII SOH).
8209 FIX(
const Message::Field& data,
char fieldSeparator = 1)
8210 : _data(data.data()), _len(data.len()),
8211 _fieldSep(fieldSeparator)
/// Constructs a FIX view over a raw character buffer.
/// Only the pointer and length are stored, so the bytes are referenced
/// rather than copied.
/// \param data Pointer to the first byte to parse.
/// \param len Number of bytes available at data.
/// \param fieldSeparator The byte that separates fields; defaults to the
/// byte value 1 (ASCII SOH).
8215 FIX(
const char* data,
size_t len,
char fieldSeparator = 1)
8216 : _data(data), _len(len), _fieldSep(fieldSeparator)
/// Returns a forward iterator constructed at offset 0 of the buffer, using
/// this object's field separator.
8220 iterator begin()
const
8222 return iterator(_data, _len, 0, _fieldSep);
/// Returns the past-the-end forward iterator: one constructed at offset
/// _len, which is the offset at which the iterator's scanning loops stop.
8224 iterator end()
const
8226 return iterator(_data, _len, _len, _fieldSep);
/// Returns a reverse iterator constructed with its position on the last
/// byte of the buffer (_data + _len - 1).
// NOTE(review): if _len can be 0 here, (_len - 1) would not name a byte
// inside the buffer -- confirm that callers never reverse-iterate an empty
// message, or that the reverse_iterator constructor tolerates this.
8230 reverse_iterator rbegin()
const
8232 return reverse_iterator(_data, _len, _data + (_len - 1), _fieldSep);
/// Returns the end sentinel for reverse iteration: a reverse iterator whose
/// position pointer is 0 (null). Reverse iterators compare by position
/// pointer only, so iteration is finished once an iterator's position
/// equals this null value.
8235 reverse_iterator rend()
const
8237 return reverse_iterator(_data, _len, 0, _fieldSep);
8258 std::stringstream _data;
8275 void append(
const T& tag,
const char* value,
size_t offset,
size_t length)
8277 _data << tag <<
'=';
8278 _data.write(value + offset, (std::streamsize)length);
8286 void append(
const T& tag,
const std::string& value)
8288 _data << tag <<
'=' << value << _fs;
8297 operator std::string()
const
8305 _data.str(std::string());
8342 typedef std::map<Message::Field, Message::Field>
map_type;
8353 for (FIX::iterator a = fix.begin(); a != fix.end(); ++a)
8362 #define AMPS_MESSAGE_STREAM_CACHE_MAX 128
8366 std::deque<Message> _q;
8367 std::deque<Message> _cache;
8368 std::string _commandId;
8370 std::string _queryId;
8374 unsigned _requestedAcks;
8376 Message::Field _previousTopic;
8377 Message::Field _previousBookmark;
8378 typedef enum :
unsigned int { Unset = 0x0, Running = 0x10, Subscribe = 0x11, SOWOnly = 0x12, AcksOnly = 0x13, Conflate = 0x14, Closed = 0x1, Disconnected = 0x2 } State;
8379 #if __cplusplus >= 201100L || _MSC_VER >= 1900
8380 std::atomic<State> _state;
8382 volatile State _state;
8384 typedef std::map<std::string, Message*> SOWKeyMap;
8385 SOWKeyMap _sowKeyMap;
8387 MessageStreamImpl(
const Client& client_)
8390 _maxDepth((unsigned)~0),
8392 _cacheMax(AMPS_MESSAGE_STREAM_CACHE_MAX),
8395 if (_client.isValid())
8397 _client.addConnectionStateListener(
this);
8401 MessageStreamImpl(ClientImpl* client_)
8404 _maxDepth((unsigned)~0),
8408 if (_client.isValid())
8410 _client.addConnectionStateListener(
this);
8414 ~MessageStreamImpl()
8418 virtual void destroy()
8424 catch (std::exception& e)
8428 if (_client.isValid())
8430 _client.getExceptionListener().exceptionThrown(e);
8435 if (_client.isValid())
8437 _client.removeConnectionStateListener(
this);
8439 _client = Client((ClientImpl*)NULL);
8440 c.deferredExecution(MessageStreamImpl::destroyer,
this);
/// Static callback that deletes the MessageStreamImpl it is given.
/// This function is handed, together with `this`, to the client's
/// deferredExecution() elsewhere in this class.
/// \param vpMessageStreamImpl_ Opaque pointer that is cast back to
/// MessageStreamImpl* and deleted.
8448 static void destroyer(
void* vpMessageStreamImpl_)
8450 delete ((MessageStreamImpl*)vpMessageStreamImpl_);
8453 void setSubscription(
const std::string& subId_,
8454 const std::string& commandId_ =
"",
8455 const std::string& queryId_ =
"")
8457 Lock<Mutex> lock(_lock);
8459 if (!commandId_.empty() && commandId_ != subId_)
8461 _commandId = commandId_;
8463 if (!queryId_.empty() && queryId_ != subId_ && queryId_ != commandId_)
8465 _queryId = queryId_;
8468 if (Disconnected == _state)
8472 assert(Unset == _state);
8476 void setSOWOnly(
const std::string& commandId_,
8477 const std::string& queryId_ =
"")
8479 Lock<Mutex> lock(_lock);
8480 _commandId = commandId_;
8481 if (!queryId_.empty() && queryId_ != commandId_)
8483 _queryId = queryId_;
8486 if (Disconnected == _state)
8490 assert(Unset == _state);
8494 void setStatsOnly(
const std::string& commandId_,
8495 const std::string& queryId_ =
"")
8497 Lock<Mutex> lock(_lock);
8498 _commandId = commandId_;
8499 if (!queryId_.empty() && queryId_ != commandId_)
8501 _queryId = queryId_;
8504 if (Disconnected == _state)
8508 assert(Unset == _state);
8510 _requestedAcks = Message::AckType::Stats;
8513 void setAcksOnly(
const std::string& commandId_,
unsigned acks_)
8515 Lock<Mutex> lock(_lock);
8516 _commandId = commandId_;
8518 if (Disconnected == _state)
8522 assert(Unset == _state);
8524 _requestedAcks = acks_;
8529 Lock<Mutex> lock(_lock);
8530 if (state_ == AMPS::ConnectionStateListener::Disconnected)
8532 _state = Disconnected;
8535 else if (state_ == AMPS::ConnectionStateListener::Connected
8536 && _commandId.empty()
8538 && _queryId.empty())
8546 void timeout(
unsigned timeout_)
8548 _timeout = timeout_;
8552 if (_state == Subscribe)
8557 void maxDepth(
unsigned maxDepth_)
8561 _maxDepth = maxDepth_;
8565 _maxDepth = (unsigned)~0;
8568 unsigned getMaxDepth(
void)
const
/// Returns the number of messages currently held in the queue (_q),
/// converted to unsigned.
// NOTE(review): no lock is taken in the lines visible here, whereas next()
// and _messageHandler lock _lock before touching _q -- confirm whether an
// unsynchronized read of the size is intended.
8572 unsigned getDepth(
void)
const
8574 return (
unsigned)(_q.size());
8577 bool next(Message& current_)
8579 Lock<Mutex> lock(_lock);
8580 if (!_previousTopic.empty() && !_previousBookmark.empty())
8584 if (_client.isValid())
8586 _client.ackDeferredAutoAck(_previousTopic, _previousBookmark);
8590 catch (AMPSException&)
8592 catch (AMPSException& e)
8595 current_.invalidate();
8596 _previousTopic.clear();
8597 _previousBookmark.clear();
8600 _previousTopic.clear();
8601 _previousBookmark.clear();
8604 long minWaitTime = (_timeout && _timeout < 1000) ? _timeout : 1000;
8605 Timer timer((
double)_timeout);
8607 while (_q.empty() && _state & Running)
8610 _lock.wait(minWaitTime);
8612 Unlock<Mutex> unlck(_lock);
8613 amps_invoke_waiting_function();
8618 if (timer.checkAndGetRemaining(&minWaitTime))
8624 minWaitTime = (minWaitTime < 1000) ? minWaitTime : 1000;
8627 if (current_.isValid() && _cache.size() < _cacheMax)
8630 _cache.push_back(current_);
8634 current_ = _q.front();
8635 if (_q.size() == _maxDepth)
8640 if (_state == Conflate)
8642 std::string sowKey = current_.getSowKey();
8643 if (sowKey.length())
8645 _sowKeyMap.erase(sowKey);
8648 else if (_state == AcksOnly)
8650 _requestedAcks &= ~(current_.getAckTypeEnum());
8652 if ((_state == AcksOnly && _requestedAcks == 0) ||
8653 (_state == SOWOnly && current_.getCommand() ==
"group_end"))
8657 else if (current_.isValid()
8658 && current_.getCommandEnum() == Message::Command::Publish
8659 && _client.isValid() && _client.getAutoAck()
8660 && !current_.getLeasePeriod().empty()
8661 && !current_.getBookmark().empty())
8663 _previousTopic = current_.getTopic().deepCopy();
8664 _previousBookmark = current_.getBookmark().deepCopy();
8668 if (_state == Disconnected)
8670 throw DisconnectedException(
"Connection closed.");
8672 current_.invalidate();
8673 if (_state == Closed)
8677 return _timeout != 0;
8681 if (_client.isValid())
8683 if (_state == SOWOnly || _state == Subscribe)
8685 if (!_commandId.empty())
8687 _client.unsubscribe(_commandId);
8689 if (!_subId.empty())
8691 _client.unsubscribe(_subId);
8693 if (!_queryId.empty())
8695 _client.unsubscribe(_queryId);
8700 if (!_commandId.empty())
8702 _client.removeMessageHandler(_commandId);
8704 if (!_subId.empty())
8706 _client.removeMessageHandler(_subId);
8708 if (!_queryId.empty())
8710 _client.removeMessageHandler(_queryId);
8714 if (_state == SOWOnly || _state == Subscribe || _state == Unset)
8719 static void _messageHandler(
const Message& message_, MessageStreamImpl* this_)
8721 Lock<Mutex> lock(this_->_lock);
8722 if (this_->_state != Conflate)
8724 AMPS_TESTING_SLOW_MESSAGE_STREAM
8725 if (this_->_q.size() >= this_->_maxDepth)
8730 this_->_lock.signalAll();
8731 throw MessageStreamFullException(
"Stream is currently full.");
8733 if (!this_->_cache.empty())
8735 this_->_cache.front().deepCopy(message_);
8736 this_->_q.push_back(this_->_cache.front());
8737 this_->_cache.pop_front();
8741 #ifdef AMPS_USE_EMPLACE
8742 this_->_q.emplace_back(message_.deepCopy());
8744 this_->_q.push_back(message_.deepCopy());
8747 if (message_.getCommandEnum() == Message::Command::Publish &&
8748 this_->_client.isValid() && this_->_client.getAutoAck() &&
8749 !message_.getLeasePeriod().empty() &&
8750 !message_.getBookmark().empty())
8752 message_.setIgnoreAutoAck();
8757 std::string sowKey = message_.getSowKey();
8758 if (sowKey.length())
8760 SOWKeyMap::iterator it = this_->_sowKeyMap.find(sowKey);
8761 if (it != this_->_sowKeyMap.end())
8763 it->second->deepCopy(message_);
8767 if (this_->_q.size() >= this_->_maxDepth)
8773 this_->_lock.signalAll();
8774 throw MessageStreamFullException(
"Stream is currently full.");
8776 if (!this_->_cache.empty())
8778 this_->_cache.front().deepCopy(message_);
8779 this_->_q.push_back(this_->_cache.front());
8780 this_->_cache.pop_front();
8784 #ifdef AMPS_USE_EMPLACE
8785 this_->_q.emplace_back(message_.deepCopy());
8787 this_->_q.push_back(message_.deepCopy());
8790 this_->_sowKeyMap[sowKey] = &(this_->_q.back());
8795 if (this_->_q.size() >= this_->_maxDepth)
8800 this_->_lock.signalAll();
8801 throw MessageStreamFullException(
"Stream is currently full.");
8803 if (!this_->_cache.empty())
8805 this_->_cache.front().deepCopy(message_);
8806 this_->_q.push_back(this_->_cache.front());
8807 this_->_cache.pop_front();
8811 #ifdef AMPS_USE_EMPLACE
8812 this_->_q.emplace_back(message_.deepCopy());
8814 this_->_q.push_back(message_.deepCopy());
8817 if (message_.getCommandEnum() == Message::Command::Publish &&
8818 this_->_client.isValid() && this_->_client.getAutoAck() &&
8819 !message_.getLeasePeriod().empty() &&
8820 !message_.getBookmark().empty())
8822 message_.setIgnoreAutoAck();
8826 this_->_lock.signalAll();
8829 inline MessageStream::MessageStream(
void)
8832 inline MessageStream::MessageStream(
const Client& client_)
8833 : _body(new MessageStreamImpl(client_))
8836 inline MessageStream::MessageStream(RefHandle<MessageStreamImpl> body_)
/// Moves the iterator forward by asking the stream's implementation for the
/// next message, which is written into _current. If next() returns false,
/// _pStream is set to NULL; otherwise it is left unchanged.
8840 inline void MessageStream::iterator::advance(
void)
8842 _pStream = _pStream->_body->next(_current) ? _pStream : NULL;
/// Converts this stream to a MessageHandler. The handler's function is
/// MessageStreamImpl::_messageHandler, cast to the
/// void(const Message&, void*) signature, and its user data is the address
/// of this stream's implementation object.
8844 inline MessageStream::operator MessageHandler(
void)
8846 return MessageHandler((
void(*)(
const Message&,
void*))MessageStreamImpl::_messageHandler, &_body.get());
8848 inline MessageStream MessageStream::fromExistingHandler(
const MessageHandler& handler_)
8850 MessageStream result;
8851 if (handler_._func == (MessageHandler::FunctionType)MessageStreamImpl::_messageHandler)
8853 result._body = (MessageStreamImpl*)(handler_._userData);
/// Forwards to the implementation object's setSOWOnly().
/// \param commandId_ Command id passed through to the implementation.
/// \param queryId_ Query id passed through to the implementation.
8858 inline void MessageStream::setSOWOnly(
const std::string& commandId_,
8859 const std::string& queryId_)
8861 _body->setSOWOnly(commandId_, queryId_);
/// Forwards to the implementation object's setSubscription().
/// \param subId_ Subscription id passed through to the implementation.
/// \param commandId_ Command id passed through to the implementation.
/// \param queryId_ Query id passed through to the implementation.
8863 inline void MessageStream::setSubscription(
const std::string& subId_,
8864 const std::string& commandId_,
8865 const std::string& queryId_)
8867 _body->setSubscription(subId_, commandId_, queryId_);
/// Forwards to the implementation object's setStatsOnly().
/// \param commandId_ Command id passed through to the implementation.
/// \param queryId_ Query id passed through to the implementation.
8869 inline void MessageStream::setStatsOnly(
const std::string& commandId_,
8870 const std::string& queryId_)
8872 _body->setStatsOnly(commandId_, queryId_);
8874 inline void MessageStream::setAcksOnly(
const std::string& commandId_,
8877 _body->setAcksOnly(commandId_, acks_);
8896 return _body->getMaxDepth();
8900 return _body->getDepth();
8903 inline MessageStream ClientImpl::getEmptyMessageStream(
void)
8913 ClientImpl& body = _body.get();
8914 Message& message = command_.getMessage();
8918 if (useExistingHandler)
8924 if (body._routes.getRoute(subId, existingHandler))
8927 body.executeAsync(command_, existingHandler,
false);
8928 return MessageStream::fromExistingHandler(existingHandler);
8937 if ((command & Message::Command::NoDataCommands)
8938 && (ackTypes == Message::AckType::Persisted
8939 || ackTypes == Message::AckType::None))
8942 if (!body._pEmptyMessageStream)
8944 body._pEmptyMessageStream.reset(
new MessageStream((ClientImpl*)0));
8945 body._pEmptyMessageStream.get()->_body->close();
8947 return body.getEmptyMessageStream();
8950 if (body.getDefaultMaxDepth())
8952 stream.
maxDepth(body.getDefaultMaxDepth());
8955 std::string commandID = body.executeAsync(command_, handler,
false);
8956 if (command_.hasStatsAck())
8958 stream.setStatsOnly(commandID, command_.getMessage().
getQueryId());
8960 else if (command_.isSow())
8964 stream.setAcksOnly(commandID,
8969 stream.setSOWOnly(commandID, command_.getMessage().
getQueryId());
8972 else if (command_.isSubscribe())
8974 stream.setSubscription(commandID,
8981 if (command == Message::Command::Publish ||
8982 command == Message::Command::DeltaPublish ||
8983 command == Message::Command::SOWDelete)
8985 stream.setAcksOnly(commandID,
8986 ackTypes & (
unsigned)~Message::AckType::Persisted);
8990 stream.setAcksOnly(commandID, ackTypes);
8997 inline void Message::ack(
const char* options_)
const
8999 ClientImpl* pClient = _body.get().clientImpl();
9001 if (pClient && bookmark.
len() &&
9002 !pClient->getAutoAck())
9005 pClient->ack(getTopic(), bookmark, options_);
Core type and function declarations for the AMPS C client.
AMPSDLL void amps_client_disconnect(amps_handle handle)
Disconnects from the AMPS server, if connected.
AMPSDLL void amps_message_get_field_value(amps_handle message, FieldId field, const amps_char **value_ptr, size_t *length_ptr)
Retrieves the value of a header field in an AMPS message.
AMPSDLL void amps_client_destroy(amps_handle handle)
Disconnects and destroys an AMPS client object.
AMPSDLL amps_result amps_client_connect(amps_handle handle, const amps_char *uri)
Connects to the AMPS server specified in uri.
AMPSDLL amps_result amps_client_send(amps_handle client, amps_handle message)
Sends a message to the AMPS server.
AMPSDLL amps_uint64_t amps_message_get_field_uint64(amps_handle message, FieldId field)
Gets the unsigned 64-bit int value of a header field in an AMPS message.
AMPSDLL amps_result amps_client_set_name(amps_handle handle, const amps_char *clientName)
Sets the name on an amps client object.
amps_result
Return values from amps_xxx functions.
Definition: amps.h:217
@ AMPS_E_RETRY
The operation has not succeeded, but ought to be retried.
Definition: amps.h:245
@ AMPS_E_OK
Success.
Definition: amps.h:221
@ AMPS_E_DISCONNECTED
The client and server are disconnected.
Definition: amps.h:249
AMPSDLL amps_result amps_client_attempt_reconnect(amps_handle client, unsigned version)
Manually invokes the user-supplied disconnect handler for this client.
AMPSDLL amps_result amps_client_set_read_timeout(amps_handle client, int readTimeout)
Sets a read timeout (in seconds); if no message is received within that time, the connection is presumed dead.
amps_result(* amps_thread_created_callback)(AMPS_THREAD_T, void *)
Prototype for a user-supplied callback function to allow thread attributes to be set when a new threa...
Definition: amps.h:668
AMPSDLL amps_result amps_client_set_transport_filter_function(amps_handle client, amps_transport_filter_function filter, void *userData)
Sets a user-supplied callback function for filtering data before it is sent and after it is received.
AMPSDLL amps_result amps_client_set_thread_created_callback(amps_handle client, amps_thread_created_callback callback, void *userData)
Sets a user-supplied callback function to allow thread attributes to be set when a new thread is created...
AMPSDLL void amps_message_set_field_value(amps_handle message, FieldId field, const amps_char *value, size_t length)
Sets the value of a header field in an AMPS message.
AMPSDLL void amps_client_set_predisconnect_handler(amps_handle client, amps_predisconnect_handler predisconnectHandler, void *userData)
Sets the predisconnect handler function to be called when a disconnect occurs.
AMPSDLL amps_result amps_client_send_with_version(amps_handle client, amps_handle message, unsigned *version_out)
Sends a message to the AMPS server.
AMPSDLL void amps_client_set_message_handler(amps_handle client, amps_handler messageHandler, void *userData)
Sets the message handler function for this client.
AMPSDLL AMPS_SOCKET amps_client_get_socket(amps_handle client)
Returns the socket from the underlying transport in client, or NULL if no transport is associated wit...
void * amps_handle
Opaque handle type used to refer to objects in the AMPS api.
Definition: amps.h:211
void(* amps_transport_filter_function)(const unsigned char *, size_t, short, void *)
Prototype for a user-supplied callback function for filtering data before it is sent and after it is ...
Definition: amps.h:642
AMPSDLL amps_result amps_client_set_idle_time(amps_handle client, int idleTime)
Sets an idle-time (milliseconds).
AMPSDLL amps_handle amps_client_create(const amps_char *clientName)
Functions for creation of an AMPS client.
AMPSDLL void amps_client_set_disconnect_handler(amps_handle client, amps_handler disconnectHandler, void *userData)
Sets the disconnect handler function to be called when a disconnect occurs.
AMPSDLL amps_result amps_client_set_http_preflight_callback(amps_handle client, amps_http_preflight_callback callback, void *userData)
Sets a user-supplied callback function for when a connection is established and the provided uri incl...
bool DangerousFlushPublishStoreResizeHandler(Store store_, size_t, void *data_)
PublishStoreResizeHandler that will block for up to the timeout (in milliseconds) specified in user data, tryin...
Definition: ampsplusplus.hpp:1404
bool(* PublishStoreResizeHandler)(Store store_, size_t size_, void *userData_)
Function type for PublishStore resize events. The store_ param is the store which is resizing.
Definition: ampsplusplus.hpp:1084
The interface for handling authentication with the AMPS server.
Definition: ampsplusplus.hpp:1000
virtual std::string authenticate(const std::string &userName_, const std::string &password_)=0
Called by Client just before the logon command is sent.
virtual void completed(const std::string &userName_, const std::string &password_, const std::string &reason_)=0
Called by Client once a logon completes successfully.
virtual std::string retry(const std::string &userName_, const std::string &password_)=0
Called by Client when a logon ack is received with a status of retry.
Interface for BookmarkStoreImpl classes.
Definition: BookmarkStore.hpp:229
Client represents a connection to an AMPS server, but does not provide failover or reconnection behav...
Definition: ampsplusplus.hpp:5131
MessageStream sow(const char *topic_, const std::string &filter_="", const std::string &orderBy_="", const std::string &bookmark_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic.
Definition: ampsplusplus.hpp:6257
amps_uint64_t publish(const char *topic_, size_t topicLength_, const char *data_, size_t dataLength_)
Publish a message to an AMPS topic, returning the sequence number assigned by the publish store if on...
Definition: ampsplusplus.hpp:5633
void setExceptionListener(const std::shared_ptr< const ExceptionListener > &pListener_)
Sets the exception listener for exceptions that are not thrown back to the user (for example,...
Definition: ampsplusplus.hpp:6977
std::string logon(const char *options_, int timeout_=0)
Log on to the server, providing the client name, credentials (if available), client information (such a...
Definition: ampsplusplus.hpp:5865
void flushAcks(void)
Sends any queued message queue ack messages to the server immediately.
Definition: ampsplusplus.hpp:7383
void startTimer()
Definition: ampsplusplus.hpp:6820
MessageStream subscribe(const std::string &topic_, long timeout_=0, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Subscribe to a topic.
Definition: ampsplusplus.hpp:5935
void setBookmarkStore(const BookmarkStore &bookmarkStore_)
Set the bookmark store to be used by the client.
Definition: ampsplusplus.hpp:5480
const std::string & getURI() const
Returns the last URI this client is connected to.
Definition: ampsplusplus.hpp:5318
void removeConnectionStateListener(ConnectionStateListener *listener)
Attempts to remove listener from self's set of ConnectionStateListeners.
Definition: ampsplusplus.hpp:7202
amps_uint64_t publish(const std::string &topic_, const std::string &data_, unsigned long expiration_)
Publish a message to an AMPS topic, returning the sequence number assigned by the publish store (if a...
Definition: ampsplusplus.hpp:5657
static const char * BOOKMARK_EPOCH()
Convenience method for returning the special value to start a subscription at the beginning of the tr...
Definition: ampsplusplus.hpp:7140
std::string deltaSubscribe(const MessageHandler &messageHandler_, const std::string &topic_, long timeout_, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Delta Subscribe to a topic.
Definition: ampsplusplus.hpp:5996
SubscriptionManager * getSubscriptionManager()
Get the subscription manager being used by the client.
Definition: ampsplusplus.hpp:5496
std::string bookmarkSubscribe(const MessageHandler &messageHandler_, const std::string &topic_, long timeout_, const std::string &bookmark_, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Subscribe to a topic using a bookmark.
Definition: ampsplusplus.hpp:6074
void addHttpPreflightHeader(const std::string &key_, const std::string &value_)
Adds a given key/value pair as an HTTP header line as "key: value" to the end of the headers that wil...
Definition: ampsplusplus.hpp:5243
void unsubscribe()
Unsubscribe from all topics.
Definition: ampsplusplus.hpp:6163
std::string send(const MessageHandler &messageHandler, Message &message, int timeout=0)
Sends a Message to the connected AMPS server, performing only minimal validation and bypassing client...
Definition: ampsplusplus.hpp:5436
void setHeartbeat(unsigned heartbeatTime_)
Requests heartbeating with the AMPS server.
Definition: ampsplusplus.hpp:7047
void setLastChanceMessageHandler(const AMPS::MessageHandler &messageHandler)
Sets the message handler called when no other handler matches.
Definition: ampsplusplus.hpp:7060
static const char * BOOKMARK_MOST_RECENT()
Convenience method for returning the special value to start a subscription at a recovery point based ...
Definition: ampsplusplus.hpp:7161
amps_handle getHandle()
Returns the underlying amps_handle for this client, to be used with amps_client_* functions from the ...
Definition: ampsplusplus.hpp:6964
void setFailedWriteHandler(FailedWriteHandler *handler_)
Set the handler that is invoked to report when a publish fails, for example if the publisher is not e...
Definition: ampsplusplus.hpp:5577
void setName(const std::string &name)
Sets the name of this client, assuming no name was provided previously.
Definition: ampsplusplus.hpp:5185
amps_uint64_t deltaPublish(const std::string &topic_, const std::string &data_)
Publish the changed fields of a message to an AMPS topic.
Definition: ampsplusplus.hpp:5751
FailedWriteHandler * getFailedWriteHandler()
Get the handler that is invoked to report on failed writes.
Definition: ampsplusplus.hpp:5585
void setHttpPreflightHeaders(const T &headers_)
Sets the given HTTP header lines to be sent for the HTTP GET Upgrade request.
Definition: ampsplusplus.hpp:5258
void setDefaultMaxDepth(unsigned maxDepth_)
Sets a default max depth on all subsequently created MessageStream objects.
Definition: ampsplusplus.hpp:7479
amps_uint64_t publish(const std::string &topic_, const std::string &data_)
Publish a message to an AMPS topic, returning the sequence number assigned by the publish store if on...
Definition: ampsplusplus.hpp:5608
MessageStream sowAndDeltaSubscribe(const char *topic_, const std::string &filter_="", const std::string &orderBy_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Queries the SOW cache of a topic and initiates a new delta subscription on it.
Definition: ampsplusplus.hpp:6611
void setAckBatchSize(const unsigned ackBatchSize_)
Sets the queue ack batch size setting.
Definition: ampsplusplus.hpp:7420
void disconnect()
Disconnect from an AMPS server.
Definition: ampsplusplus.hpp:5349
static const char * NOW()
Convenience method for returning the special value to start a subscription at the end of the transact...
Definition: ampsplusplus.hpp:7130
void clearHttpPreflightHeaders()
Clears all previously set HTTP header lines.
Definition: ampsplusplus.hpp:5249
amps_uint64_t deltaPublish(const char *topic_, size_t topicLength_, const char *data_, size_t dataLength_)
Publish the changed fields of a message to an AMPS topic.
Definition: ampsplusplus.hpp:5774
static size_t convertVersionToNumber(const char *data_, size_t len_)
Converts a string version, such as "3.8.1.5" into the same numeric form used internally and returned ...
Definition: ampsplusplus.hpp:5311
Client(const std::string &clientName="")
Constructs a new client with a given client name.
Definition: ampsplusplus.hpp:5147
const ExceptionListener & getExceptionListener(void) const
Returns the exception listener set on this Client.
Definition: ampsplusplus.hpp:6997
void unsubscribe(const std::string &commandId)
Unsubscribe from a topic.
Definition: ampsplusplus.hpp:6151
void setExceptionListener(const ExceptionListener &listener_)
Definition: ampsplusplus.hpp:6990
DisconnectHandler getDisconnectHandler(void) const
Returns the callback function that is invoked when a disconnect occurs.
Definition: ampsplusplus.hpp:5458
void setDisconnectHandler(const DisconnectHandler &disconnectHandler)
Sets the function to be called when the client is unintentionally disconnected.
Definition: ampsplusplus.hpp:5450
int getAckTimeout(void) const
Returns the current value of the message queue ack timeout setting – that is, the amount of time afte...
Definition: ampsplusplus.hpp:7431
MessageStream bookmarkSubscribe(const std::string &topic_, long timeout_, const std::string &bookmark_, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Subscribe to a topic using a bookmark.
Definition: ampsplusplus.hpp:6102
bool getRetryOnDisconnect(void) const
Returns true if automatic retry of a command to AMPS after a reconnect is enabled.
Definition: ampsplusplus.hpp:7470
BookmarkStore getBookmarkStore()
Get the bookmark store being used by the client.
Definition: ampsplusplus.hpp:5488
MessageStream subscribe(const char *topic_, long timeout_=0, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Subscribe to a topic.
Definition: ampsplusplus.hpp:5967
const std::string & getLogonCorrelationData() const
Returns the currently set logon correlation data for the client.
Definition: ampsplusplus.hpp:5226
std::string logon(int timeout_=0, Authenticator &authenticator_=DefaultAuthenticator::instance(), const char *options_=NULL)
Logon to the server, providing the client name, credentials (if available), and client information (s...
Definition: ampsplusplus.hpp:5846
const amps_uint64_t getNameHashValue() const
Returns the numeric name hash of this client as generated by the server and returned when the client ...
Definition: ampsplusplus.hpp:5208
void setGlobalCommandTypeMessageHandler(const Message::Command::Type command_, const MessageHandler &handler_)
Sets a handler for all messages of a particular type: currently supported types are heartbeat message...
Definition: ampsplusplus.hpp:7111
const std::string & getName() const
Returns the name of this client passed in the constructor.
Definition: ampsplusplus.hpp:5192
bool getAutoAck(void) const
Returns the value of the queue auto-ack setting.
Definition: ampsplusplus.hpp:7392
unsigned getDefaultMaxDepth(void) const
Returns the default max depth for returned MessageStream objects.
Definition: ampsplusplus.hpp:7488
void setSubscriptionManager(SubscriptionManager *subscriptionManager_)
Set the subscription manager to be used by the client.
Definition: ampsplusplus.hpp:5508
void ack(Field &topic_, Field &bookmark_, const char *options_=NULL)
Acknowledge a message queue message by supplying a topic and bookmark: this adds the ack to the curre...
Definition: ampsplusplus.hpp:7335
void clearConnectionStateListeners()
Clear all listeners from self's set of ConnectionStateListeners.
Definition: ampsplusplus.hpp:7209
static const char * BOOKMARK_NOW()
Convenience method for returning the special value to start a subscription at the end of the transact...
Definition: ampsplusplus.hpp:7121
MessageStream bookmarkSubscribe(const char *topic_, long timeout_, const std::string &bookmark_, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Subscribe to a topic using a bookmark.
Definition: ampsplusplus.hpp:6123
std::string executeAsyncNoResubscribe(Command &command_, MessageHandler handler_)
Execute the provided command and, once AMPS acknowledges the command, process messages in response to...
Definition: ampsplusplus.hpp:7273
size_t getServerVersion() const
Returns the server version retrieved during logon.
Definition: ampsplusplus.hpp:5271
static const char * EPOCH()
Convenience method for returning the special value to start a subscription at the beginning of the tr...
Definition: ampsplusplus.hpp:7150
void send(const Message &message)
Sends a Message to the connected AMPS server, performing only minimal validation and bypassing client...
Definition: ampsplusplus.hpp:5367
void publishFlush(long timeout_=0, unsigned ackType_=Message::AckType::Processed)
Ensure that AMPS messages are sent and have been processed by the AMPS server.
Definition: ampsplusplus.hpp:5730
amps_uint64_t deltaPublish(const char *topic_, size_t topicLength_, const char *data_, size_t dataLength_, unsigned long expiration_)
Publish the changed fields of a message to an AMPS topic.
Definition: ampsplusplus.hpp:5823
void ack(const std::string &topic_, const std::string &bookmark_, const char *options_=NULL)
Acknowledge a message queue message by supplying a topic and bookmark string: this adds the ack to th...
Definition: ampsplusplus.hpp:7359
MessageStream deltaSubscribe(const char *topic_, long timeout_, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Delta Subscribe to a topic.
Definition: ampsplusplus.hpp:6032
void setUnhandledMessageHandler(const AMPS::MessageHandler &messageHandler)
Definition: ampsplusplus.hpp:7053
void addMessageHandler(const Field &commandId_, const AMPS::MessageHandler &messageHandler_, unsigned requestedAcks_, bool isSubscribe_)
Adds a MessageHandler to be invoked for Messages with the given CommandId as their command id,...
Definition: ampsplusplus.hpp:5380
amps_uint64_t deltaPublish(const std::string &topic_, const std::string &data_, unsigned long expiration_)
Publish the changed fields of a message to an AMPS topic.
Definition: ampsplusplus.hpp:5797
void addMessageHandler(const Field &commandId_, const AMPS::MessageHandler &messageHandler_, unsigned requestedAcks_, Message::Command::Type commandType_)
Adds a MessageHandler to be invoked for Messages with the given CommandId as their command id,...
Definition: ampsplusplus.hpp:5397
std::string executeAsync(Command &command_, MessageHandler handler_)
Execute the provided command and, once AMPS acknowledges the command, process messages in response to...
Definition: ampsplusplus.hpp:7239
unsigned getAckBatchSize(void) const
Returns the value of the queue ack batch size setting.
Definition: ampsplusplus.hpp:7410
void setAutoAck(bool isAutoAckEnabled_)
Sets the queue auto-ack setting on this client.
Definition: ampsplusplus.hpp:7402
void addHttpPreflightHeader(const std::string &header_)
Adds a given HTTP header line to the end of the headers that will be sent for the HTTP GET Upgrade re...
Definition: ampsplusplus.hpp:5234
MessageStream execute(Command &command_)
Execute the provided command and return messages received in response in a MessageStream.
Definition: ampsplusplus.hpp:8908
void connect(const std::string &uri)
Connect to an AMPS server.
Definition: ampsplusplus.hpp:5342
const std::string & getNameHash() const
Returns the name hash string of this client as generated by the server and returned when the client l...
Definition: ampsplusplus.hpp:5200
static const char * MOST_RECENT()
Convenience method for returning the special value to start a subscription at a recovery point based ...
Definition: ampsplusplus.hpp:7172
bool removeMessageHandler(const Field &commandId_)
Removes a MessageHandler for a given CommandId from self.
Definition: ampsplusplus.hpp:5408
void setRetryOnDisconnect(bool isRetryOnDisconnect_)
Enables or disables automatic retry of a command to AMPS after a reconnect.
Definition: ampsplusplus.hpp:7461
void setGlobalCommandTypeMessageHandler(const std::string &command_, const MessageHandler &handler_)
Sets a handler for all messages of a particular type, or for messages that would be delivered to a pa...
Definition: ampsplusplus.hpp:7086
void setTransportFilterFunction(amps_transport_filter_function filter_, void *userData_)
Sets a filter function on the transport that is called with all raw data sent or received.
Definition: ampsplusplus.hpp:7499
VersionInfo getServerVersionInfo() const
Returns the server version retrieved during logon.
Definition: ampsplusplus.hpp:5282
void setPublishStore(const Store &publishStore_)
Set the publish store to be used by the client.
Definition: ampsplusplus.hpp:5532
void setHeartbeat(unsigned heartbeatTime_, unsigned readTimeout_)
Requests heartbeating with the AMPS server.
Definition: ampsplusplus.hpp:7023
MessageStream deltaSubscribe(const std::string &topic_, long timeout_, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Delta Subscribe to a topic.
Definition: ampsplusplus.hpp:6014
void setLogonCorrelationData(const std::string &logonCorrelationData_)
Sets the logon correlation data for the client.
Definition: ampsplusplus.hpp:5219
void addConnectionStateListener(ConnectionStateListener *listener)
Adds a ConnectionStateListener to self's set of listeners.
Definition: ampsplusplus.hpp:7194
void setThreadCreatedCallback(amps_thread_created_callback callback_, void *userData_)
Sets a callback function on the transport that is called when a new thread is created to receive data...
Definition: ampsplusplus.hpp:7513
std::string subscribe(const MessageHandler &messageHandler_, const std::string &topic_, long timeout_=0, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Subscribe to a topic.
Definition: ampsplusplus.hpp:5909
void setAckTimeout(const int ackTimeout_)
Sets the message queue ack timeout value.
Definition: ampsplusplus.hpp:7443
MessageHandler getDuplicateMessageHandler(void)
Returns the callback function that is invoked when a duplicate message is detected.
Definition: ampsplusplus.hpp:5563
virtual ConnectionInfo getConnectionInfo() const
Get the connection information for the current connection.
Definition: ampsplusplus.hpp:5467
void setDuplicateMessageHandler(const MessageHandler &duplicateMessageHandler_)
Sets a callback function that is invoked when a duplicate message is detected.
Definition: ampsplusplus.hpp:5548
MessageStream sowAndSubscribe(const char *topic_, const std::string &filter_="", const std::string &orderBy_="", const std::string &bookmark_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Queries the SOW cache of a topic and initiates a new subscription on it.
Definition: ampsplusplus.hpp:6510
static const char * BOOKMARK_RECENT()
Convenience method for returning the special value to start a subscription at a recovery point based ...
Definition: ampsplusplus.hpp:7183
std::string stopTimer(const MessageHandler &messageHandler)
Definition: ampsplusplus.hpp:6831
void ack(Message &message_, const char *options_=NULL)
Acknowledge a message queue message by supplying the message directly: this adds the ack to the curre...
Definition: ampsplusplus.hpp:7347
static size_t convertVersionToNumber(const std::string &version_)
Converts a string version, such as "3.8.1.5" into the same numeric form used internally and returned ...
Definition: ampsplusplus.hpp:5296
amps_uint64_t publish(const char *topic_, size_t topicLength_, const char *data_, size_t dataLength_, unsigned long expiration_)
Publish a message to an AMPS topic, returning the sequence number assigned by the publish store if on...
Definition: ampsplusplus.hpp:5684
Store getPublishStore()
Get the publish store used by the client.
Definition: ampsplusplus.hpp:5540
std::string logon(const std::string &options_, int timeout_=0)
Logon to the server, providing the client name, credentials (if available), client information (such a...
Definition: ampsplusplus.hpp:5884
Command is an encapsulation of a single AMPS command sent by the client.
Definition: ampsplusplus.hpp:473
Command & setData(const char *data_, size_t dataLen_)
Sets the data for this command.
Definition: ampsplusplus.hpp:837
Command & setTopN(unsigned topN_)
Definition: ampsplusplus.hpp:857
Command & setSequence(const char *seq_, size_t seqLen_)
Definition: ampsplusplus.hpp:810
Command & setTopic(const std::string &topic_)
Definition: ampsplusplus.hpp:679
Command & setAckType(unsigned ackType_)
Definition: ampsplusplus.hpp:924
Command & setOrderBy(const std::string &orderBy_)
Definition: ampsplusplus.hpp:705
Command & setOptions(const char *options_, size_t optionsLen_)
Sets the options string for this command: see Message.Options for a helper class for constructing the...
Definition: ampsplusplus.hpp:797
Command & setSowKey(const char *sowKey_, size_t sowKeyLen_)
Sets the SowKey field of the command, typically used for a publish command to a topic in the state of...
Definition: ampsplusplus.hpp:625
Command & setAckType(const std::string &ackType_)
Definition: ampsplusplus.hpp:902
Command & setTimeout(unsigned timeout_)
Sets the client-side timeout for this command.
Definition: ampsplusplus.hpp:851
Command & setBookmark(const char *bookmark_, size_t bookmarkLen_)
Set the bookmark to be used by this command.
Definition: ampsplusplus.hpp:759
Command & setQueryId(const char *queryId_, size_t queryIdLen_)
Definition: ampsplusplus.hpp:738
std::string getAckType() const
Definition: ampsplusplus.hpp:946
Command & reset(Message::Command::Type command_)
Resets the fields of self, and sets the command to command_.
Definition: ampsplusplus.hpp:600
Command & setBatchSize(unsigned batchSize_)
Sets the batch size for this command, which controls how many records are sent together in the result...
Definition: ampsplusplus.hpp:866
Command & setSubId(const char *subId_, size_t subIdLen_)
Definition: ampsplusplus.hpp:725
Command & setSequence(const amps_uint64_t seq_)
Definition: ampsplusplus.hpp:816
Command & setOptions(const std::string &options_)
Sets the options string for this command: see Message.Options for a helper class for constructing the...
Definition: ampsplusplus.hpp:789
Command & setSowKeys(const char *sowKeys_, size_t sowKeysLen_)
Sets the SowKeys for the command.
Definition: ampsplusplus.hpp:660
Command & setSubId(const std::string &subId_)
Definition: ampsplusplus.hpp:718
Command & setCommandId(const std::string &cmdId_)
Definition: ampsplusplus.hpp:666
Command & setSequence(const std::string &seq_)
Definition: ampsplusplus.hpp:803
Command & setCommandId(const char *cmdId_, size_t cmdIdLen_)
Definition: ampsplusplus.hpp:673
Command(Message::Command::Type command_)
Creates an object to represent the given AMPS command, such as "sow" or "subscribe".
Definition: ampsplusplus.hpp:575
Command & setOrderBy(const char *orderBy_, size_t orderByLen_)
Definition: ampsplusplus.hpp:712
Command & setData(const std::string &data_)
Sets the data for this command from an existing string.
Definition: ampsplusplus.hpp:829
Command & setQueryId(const std::string &queryId_)
Definition: ampsplusplus.hpp:731
Command(const std::string &command_)
Creates an object to represent the given AMPS command, such as "sow" or "subscribe".
Definition: ampsplusplus.hpp:560
Command & setTopic(const char *topic_, size_t topicLen_)
Definition: ampsplusplus.hpp:686
Command & setFilter(const std::string &filter_)
Definition: ampsplusplus.hpp:692
Command & setCorrelationId(const std::string &correlationId_)
Set the correlation ID for this command.
Definition: ampsplusplus.hpp:770
Command & setFilter(const char *filter_, size_t filterLen_)
Definition: ampsplusplus.hpp:699
Command(const char *command_, size_t commandLen_)
Creates an object to represent the given AMPS command, such as "sow" or "subscribe".
Definition: ampsplusplus.hpp:568
Command & setCorrelationId(const char *correlationId_, size_t correlationIdLen_)
Set the correlation ID for this command.
Definition: ampsplusplus.hpp:782
unsigned getAckTypeEnum() const
Definition: ampsplusplus.hpp:951
Command & setSowKey(const std::string &sowKey_)
Sets the SowKey field of the command, typically used for a publish command to a topic in the state of...
Definition: ampsplusplus.hpp:612
Command & setSowKeys(const std::string &sowKeys_)
Sets the SowKeys for the command.
Definition: ampsplusplus.hpp:642
Command & addAckType(const std::string &ackType_)
Definition: ampsplusplus.hpp:888
Command & reset(const char *command_, size_t commandLen_)
Resets the fields of self, and sets the command to command_.
Definition: ampsplusplus.hpp:592
Command & setExpiration(unsigned expiration_)
Set the expiration time for a publish command.
Definition: ampsplusplus.hpp:882
Command & setBookmark(const std::string &bookmark_)
Set the bookmark to be used by this command.
Definition: ampsplusplus.hpp:748
Command & reset(const std::string &command_)
Resets the fields of self, and sets the command to command_.
Definition: ampsplusplus.hpp:583
Abstract base class for connection state listeners.
Definition: ampsplusplus.hpp:1495
State
Constants for the state of the connection.
Definition: ampsplusplus.hpp:1498
virtual void connectionStateChanged(State newState_)=0
Pure virtual method for receiving the change in connection state.
A default implementation of Authenticator that only uses an unchanged password and does not implement...
Definition: ampsplusplus.hpp:1031
std::string authenticate(const std::string &, const std::string &password_)
A simple implementation that returns an unmodified password.
Definition: ampsplusplus.hpp:1036
static Authenticator & instance()
Static function to return a static instance used when no Authenticator is supplied to a Client.
Definition: ampsplusplus.hpp:1053
std::string retry(const std::string &, const std::string &)
Throws an AuthenticationException because retry is not implemented.
Definition: ampsplusplus.hpp:1043
void completed(const std::string &, const std::string &, const std::string &)
Called by Client once a logon completes successfully.
Definition: ampsplusplus.hpp:1048
Exception listener for unhandled exceptions.
Definition: ampsplusplus.hpp:205
Class for parsing a FIX format message into a std::map of keys and values, where the keys and values ...
Definition: ampsplusplus.hpp:8331
std::map< Message::Field, Message::Field > map_type
Convenience definition for the std::map specialization used for this class.
Definition: ampsplusplus.hpp:8342
map_type toMap(const Message::Field &data)
Returns the key/value pairs within the message, represented as AMPS::Field objects that contain point...
Definition: ampsplusplus.hpp:8349
FIXShredder(char fieldSep_=(char) 1)
Construct an instance of FIXShredder using the specified value as the delimiter between fields.
Definition: ampsplusplus.hpp:8338
Abstract base class where you can implement handling of exceptions that occur when a SubscriptionMana...
Definition: ampsplusplus.hpp:1433
virtual bool failure(const Message &message_, const MessageHandler &handler_, unsigned requestedAckTypes_, const AMPSException &exception_)=0
Implement this function to return true if the subscription should be removed from the SubscriptionMan...
Class to handle the case where a client receives a duplicate publish message or a "not entitled" message.
Definition: ampsplusplus.hpp:1375
virtual void failedWrite(const Message &message_, const char *reason_, size_t reasonLength_)=0
Called when the server indicates a message could not be written.
Field represents the value of a single field in a Message.
Definition: Field.hpp:87
bool empty() const
Returns 'true' if empty, 'false' otherwise.
Definition: Field.hpp:128
size_t len() const
Returns the length of the data underlying this field.
Definition: Field.hpp:267
Represents an iterator over messages in an AMPS topic.
Definition: ampsplusplus.hpp:5005
An iterable object representing the results of an AMPS subscription and/or query.
Definition: ampsplusplus.hpp:4997
iterator begin(void)
Returns an iterator representing the beginning of the topic or subscription.
Definition: ampsplusplus.hpp:5049
bool isValid() const
Returns true if self is a valid stream that may be iterated.
Definition: ampsplusplus.hpp:5042
MessageStream conflate(void)
Sets self to conflation mode, where a new update for a matching sow key will replace the previous one...
Definition: ampsplusplus.hpp:8884
MessageStream maxDepth(unsigned maxDepth_)
Sets the maximum number of messages that can be held in the underlying queue.
Definition: ampsplusplus.hpp:8889
MessageStream timeout(unsigned timeout_)
Sets the maximum time to wait for the next message in milliseconds; if no message is available within...
Definition: ampsplusplus.hpp:8879
unsigned getDepth(void) const
Gets the current number of messages held in the underlying queue.
Definition: ampsplusplus.hpp:8898
unsigned getMaxDepth(void) const
Gets the maximum number of messages that can be held in the underlying queue.
Definition: ampsplusplus.hpp:8894
iterator end(void)
Returns an iterator representing the end of the topic or subscription.
Definition: ampsplusplus.hpp:5060
Message encapsulates a single message sent to or received from an AMPS server, and provides methods f...
Definition: Message.hpp:532
Message & assignExpiration(const std::string &v)
Sets the value of the Expiration header for this Message.
Definition: Message.hpp:1305
Field getBookmark() const
Retrieves the value of the Bookmark header of the Message as a new Field.
Definition: Message.hpp:1194
Message & setQueryID(const std::string &v)
Sets the value of the QueryID header for this Message.
Definition: Message.hpp:1417
Field getCommandId() const
Retrieves the value of the CommandId header of the Message as a new Field.
Definition: Message.hpp:1302
Message & setExpiration(const std::string &v)
Sets the value of the Expiration header for this Message.
Definition: Message.hpp:1305
static const unsigned int IdentifierLength
The length of identifiers used for unique identification of commands and subscriptions.
Definition: Message.hpp:542
Message & assignSubscriptionId(const std::string &v)
Sets the value of the SubscriptionId header for this Message.
Definition: Message.hpp:1427
Message & newCommandId()
Creates and sets a new sequential value for the CommandId header for this Message.
Definition: Message.hpp:1302
Field getSequence() const
Retrieves the value of the Sequence header of the Message as a new Field.
Definition: Message.hpp:1422
Message & setBookmark(const std::string &v)
Sets the value of the Bookmark header for this Message.
Definition: Message.hpp:1194
Field getSubscriptionId() const
Retrieves the value of the SubscriptionId header of the Message as a new Field.
Definition: Message.hpp:1427
Message & assignFilter(const std::string &v)
Sets the value of the Filter header for this Message.
Definition: Message.hpp:1306
Message & setSequence(const std::string &v)
Sets the value of the Sequence header for this Message.
Definition: Message.hpp:1422
Message & setOrderBy(const std::string &v)
Sets the value of the OrderBy header for this Message.
Definition: Message.hpp:1415
Message & setCommandId(const std::string &v)
Sets the value of the CommandId header for this Message.
Definition: Message.hpp:1302
Message & setAckTypeEnum(unsigned ackType_)
Encode self's "ack type" field from a bitmask of values from AckType.
Definition: Message.hpp:1183
unsigned getAckTypeEnum() const
Decode self's "ack type" field and return the corresponding bitmask of values from AckType.
Definition: Message.hpp:1160
Message & setQueryId(const std::string &v)
Sets the value of the QueryID header for this Message.
Definition: Message.hpp:1417
Message & setTopNRecordsReturned(const std::string &v)
Sets the value of the TopNRecordsReturned header for this Message.
Definition: Message.hpp:1451
Message & assignSowKeys(const std::string &v)
Sets the value of the SowKeys header for this Message.
Definition: Message.hpp:1425
Message & setAckType(const std::string &v)
Sets the value of the AckType header for this Message.
Definition: Message.hpp:1130
Message & setSowKey(const std::string &v)
Sets the value of the SowKey header for this Message.
Definition: Message.hpp:1424
Message & setOptions(const std::string &v)
Sets the value of the Options header for this Message.
Definition: Message.hpp:1344
Command::Type getCommandEnum() const
Decode self's "command" field and return one of the values from Command.
Definition: Message.hpp:1228
Message & assignQueryID(const std::string &v)
Sets the value of the QueryID header for this Message.
Definition: Message.hpp:1417
Message & assignTopic(const std::string &v)
Sets the value of the Topic header for this Message.
Definition: Message.hpp:1449
Message & setTopic(const std::string &v)
Sets the value of the Topic header for this Message.
Definition: Message.hpp:1449
Message & setCommandEnum(Command::Type command_)
Set self's "command" field from one of the values in Command.
Definition: Message.hpp:1288
Message & setSubscriptionId(const std::string &v)
Sets the value of the SubscriptionId header for this Message.
Definition: Message.hpp:1427
Message & setFilter(const std::string &v)
Sets the value of the Filter header for this Message.
Definition: Message.hpp:1306
Field getAckType() const
Retrieves the value of the AckType header of the Message as a new Field.
Definition: Message.hpp:1130
Field getQueryId() const
Retrieves the value of the QueryID header of the Message as a new Field.
Definition: Message.hpp:1417
Message & assignSequence(const std::string &v)
Sets the value of the Sequence header for this Message.
Definition: Message.hpp:1422
Field getTopic() const
Retrieves the value of the Topic header of the Message as a new Field.
Definition: Message.hpp:1449
Field getQueryID() const
Retrieves the value of the QueryID header of the Message as a new Field.
Definition: Message.hpp:1417
Message & setSowKeys(const std::string &v)
Sets the value of the SowKeys header for this Message.
Definition: Message.hpp:1425
Message & setBatchSize(const std::string &v)
Sets the value of the BatchSize header for this Message.
Definition: Message.hpp:1193
Message & setCommand(const std::string &v)
Sets the value of the Command header for this Message.
Definition: Message.hpp:1195
Field getOptions() const
Retrieves the value of the Options header of the Message as a new Field.
Definition: Message.hpp:1316
Message & setCorrelationId(const std::string &v)
Sets the value of the CorrelationId header for this Message.
Definition: Message.hpp:1304
Message & setData(const std::string &v_)
Sets the data portion of self.
Definition: Message.hpp:1476
Class to hold string versions of failure reasons.
Definition: ampsplusplus.hpp:152
Abstract base class for storing published messages for an HA publisher client.
Definition: ampsplusplus.hpp:1091
virtual amps_uint64_t getLowestUnpersisted() const =0
Get the oldest unpersisted message sequence in the store.
virtual size_t unpersistedCount() const =0
Method to return how many messages are in the store that have not been discarded, indicating that the...
static amps_uint64_t getUnsetSequence()
Method to return the value used to represent no such sequence.
Definition: ampsplusplus.hpp:1160
virtual amps_uint64_t getLastPersisted()=0
Get the last persisted sequence number.
static size_t getUnsetPosition()
Method to return the value used to represent not found or unset.
Definition: ampsplusplus.hpp:1153
virtual void discardUpTo(amps_uint64_t index_)=0
Called by Client to indicate that all messages up to and including index_ can be discarded.
virtual void flush(long timeout_)=0
Method to wait for the Store to discard everything that has been stored up to the point in time when ...
virtual amps_uint64_t store(const Message &message_)=0
Called by Client to store a message being published.
virtual void replay(StoreReplayer &replayer_)=0
Called by Client to get all stored and non-discarded messages replayed by the store onto the StoreRep...
virtual bool replaySingle(StoreReplayer &replayer_, amps_uint64_t index_)=0
Called by Client to get a single message replayed by the store onto the StoreReplayer.
StoreImpl(bool errorOnPublishGap_=false)
Default constructor.
Definition: ampsplusplus.hpp:1098
virtual void setResizeHandler(PublishStoreResizeHandler handler_, void *userData_)
Set a handler to be called if the Store needs to resize in order to keep storing messages.
Definition: ampsplusplus.hpp:1184
Abstract base class for replaying a publish message.
Definition: ampsplusplus.hpp:1063
virtual void execute(Message &message_)=0
Called by implementations of Store to replay a message from the store.
Handle class for StoreImpl classes that track publish messages.
Definition: ampsplusplus.hpp:1217
void replay(StoreReplayer &replayer_)
Called by Client to get all stored and non-discarded messages replayed by the store onto the StoreRep...
Definition: ampsplusplus.hpp:1252
StoreImpl * get()
Used to get a pointer to the implementation.
Definition: ampsplusplus.hpp:1356
bool getErrorOnPublishGap() const
Called to check if the Store will throw PublishStoreGapException.
Definition: ampsplusplus.hpp:1348
size_t unpersistedCount() const
Method to return how many messages are in the store that have not been discarded, indicating that the...
Definition: ampsplusplus.hpp:1273
bool isValid() const
Method to return if there is an underlying implementation for the Store.
Definition: ampsplusplus.hpp:1281
amps_uint64_t getLastPersisted()
Get the last persisted message sequence in the store.
Definition: ampsplusplus.hpp:1310
void discardUpTo(amps_uint64_t index_)
Called by Client to indicate that all messages up to and including index_ can be discarded.
Definition: ampsplusplus.hpp:1243
amps_uint64_t getLowestUnpersisted()
Get the oldest unpersisted message sequence in the store.
Definition: ampsplusplus.hpp:1302
void flush(long timeout_=0)
Method to wait for the Store to discard everything that has been stored up to the point in time when ...
Definition: ampsplusplus.hpp:1294
void setResizeHandler(PublishStoreResizeHandler handler_, void *userData_)
Set a handler to be called if the Store needs to resize in order to keep storing messages.
Definition: ampsplusplus.hpp:1324
bool replaySingle(StoreReplayer &replayer_, amps_uint64_t index_)
Called by Client to get a single message replayed by the store onto the StoreReplayer.
Definition: ampsplusplus.hpp:1264
void setErrorOnPublishGap(bool errorOnPublishGap_)
Called to enable or disable throwing PublishStoreGapException.
Definition: ampsplusplus.hpp:1339
amps_uint64_t store(const Message &message_)
Called by Client to store a message being published.
Definition: ampsplusplus.hpp:1232
Abstract base class to manage all subscriptions placed on a client so that they can be re-established...
Definition: ampsplusplus.hpp:1455
virtual void resubscribe(Client &client_)=0
Called by Client to get all subscriptions placed again.
virtual void subscribe(MessageHandler messageHandler_, const Message &message_, unsigned requestedAckTypes_)=0
Called by Client when a subscription is placed.
virtual void setFailedResubscribeHandler(std::shared_ptr< FailedResubscribeHandler > handler_)
Set a handler to deal with failing subscriptions after a failover event.
Definition: ampsplusplus.hpp:1482
virtual void unsubscribe(const Message::Field &subId_)=0
Called by Client when a subscription is unsubscribed.
virtual void clear()=0
Clear subscriptions and reset to the initial state.
Provides a convenient way of building messages in FIX format, typically referenced using the typedefs...
Definition: ampsplusplus.hpp:8257
_FIXBuilder(char fieldSep_=(char) 1)
Construct an instance of _FIXBuilder, using the specified separator between fields.
Definition: ampsplusplus.hpp:8266
std::string getString() const
Returns the current contents of this builder as a string.
Definition: ampsplusplus.hpp:8293
void append(const T &tag, const std::string &value)
Write a field with the provided tag and value to the message being constructed.
Definition: ampsplusplus.hpp:8286
void reset()
Clear all data from the builder.
Definition: ampsplusplus.hpp:8303
void append(const T &tag, const char *value, size_t offset, size_t length)
Write a field with the provided tag and value to the message being constructed.
Definition: ampsplusplus.hpp:8275
std::string sowDelete(const MessageHandler &messageHandler, const std::string &topic, const std::string &filter, long timeout)
Deletes one or more messages from a topic's SOW cache.
Definition: ampsplusplus.hpp:6772
Message sowDeleteByData(const std::string &topic_, const std::string &data_, long timeout_=0)
Deletes the message whose keys match the message data provided.
Definition: ampsplusplus.hpp:6940
std::string sowAndSubscribe(const MessageHandler &messageHandler_, const std::string &topic_, const std::string &filter_="", const std::string &orderBy_="", const std::string &bookmark_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic and initiate a new subscription on it.
Definition: ampsplusplus.hpp:6448
std::string sow(const MessageHandler &messageHandler_, const std::string &topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic.
Definition: ampsplusplus.hpp:6298
std::string sowAndDeltaSubscribe(const MessageHandler &messageHandler_, const std::string &topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, bool oofEnabled_=false, bool sendEmpties_=false, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic and initiate a new delta subscription on it.
Definition: ampsplusplus.hpp:6656
std::string sowDeleteByData(const MessageHandler &messageHandler_, const std::string &topic_, const std::string &data_, long timeout_=0)
Deletes the message whose keys match the message data provided.
Definition: ampsplusplus.hpp:6919
std::string sowAndSubscribe(const MessageHandler &messageHandler_, const std::string &topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, bool oofEnabled_=false, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic and initiate a new subscription on it.
Definition: ampsplusplus.hpp:6330
std::string sowDeleteByKeys(const MessageHandler &messageHandler_, const std::string &topic_, const std::string &keys_, long timeout_=0)
Deletes messages that match SOW keys from a topic's SOW cache.
Definition: ampsplusplus.hpp:6857
std::string sowAndDeltaSubscribe(const MessageHandler &messageHandler_, const std::string &topic_, const std::string &filter_="", const std::string &orderBy_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic and initiate a new delta subscription on it.
Definition: ampsplusplus.hpp:6556
std::string sow(const MessageHandler &messageHandler_, const std::string &topic_, const std::string &filter_="", const std::string &orderBy_="", const std::string &bookmark_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query a State-of-the-World topic.
Definition: ampsplusplus.hpp:6198
#define AMPS_BOOKMARK_NOW
Start the subscription at the point in time when AMPS processes the subscription.
Definition: BookmarkStore.hpp:55
#define AMPS_BOOKMARK_EPOCH
Start the subscription at the beginning of the journal.
Definition: BookmarkStore.hpp:51
#define AMPS_BOOKMARK_RECENT
Start the subscription at the first undiscarded message in the bookmark store, or at the end of the b...
Definition: BookmarkStore.hpp:47
BookmarkStoreImpl * get()
Used to get a pointer to the implementation.
Definition: BookmarkStore.hpp:465
MessageStream sowAndDeltaSubscribe(const char *topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, bool oofEnabled_=false, bool sendEmpties_=false, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic and initiate a new delta subscription on it.
Definition: ampsplusplus.hpp:6733
MessageStream sowAndDeltaSubscribe(const std::string &topic_, const std::string &filter_="", const std::string &orderBy_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic and initiate a new delta subscription on it.
Definition: ampsplusplus.hpp:6589
Message sowDeleteByKeys(const std::string &topic_, const std::string &keys_, long timeout_=0)
Deletes messages that match SOW keys from a topic's SOW cache.
Definition: ampsplusplus.hpp:6884
Message sowDelete(const std::string &topic, const std::string &filter, long timeout=0)
Deletes one or more messages from a topic's SOW cache.
Definition: ampsplusplus.hpp:6795
MessageStream sow(const std::string &topic_, const std::string &filter_="", const std::string &orderBy_="", const std::string &bookmark_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic.
Definition: ampsplusplus.hpp:6236
MessageStream sowAndSubscribe(const std::string &topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, bool oofEnabled_=false, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic and initiate a new subscription on it.
Definition: ampsplusplus.hpp:6362
MessageStream sowAndSubscribe(const char *topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, bool oofEnabled_=false, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic and initiate a new subscription on it.
Definition: ampsplusplus.hpp:6400
MessageStream sowAndDeltaSubscribe(const std::string &topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, bool oofEnabled_=false, bool sendEmpties_=false, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic and initiate a new delta subscription on it.
Definition: ampsplusplus.hpp:6692
MessageStream sowAndSubscribe(const std::string &topic_, const std::string &filter_="", const std::string &orderBy_="", const std::string &bookmark_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic and initiate a new subscription on it.
Definition: ampsplusplus.hpp:6487