25 #ifndef _AMPSPLUSPLUS_H_
26 #define _AMPSPLUSPLUS_H_
28 #include "amps/ampsver.h"
46 #include <sys/atomic.h>
48 #include "amps/BookmarkStore.hpp"
49 #include "amps/MessageRouter.hpp"
50 #include "amps/util.hpp"
51 #include "amps/ampscrc.hpp"
52 #if __cplusplus >= 201100L || _MSC_VER >= 1900
56 #ifndef AMPS_TESTING_SLOW_MESSAGE_STREAM
57 #define AMPS_TESTING_SLOW_MESSAGE_STREAM
85 #define AMPS_MEMORYBUFFER_DEFAULT_BUFFERS 10
86 #define AMPS_MEMORYBUFFER_DEFAULT_LENGTH 40960
87 #define AMPS_SUBSCRIPTION_MANAGER_DEFAULT_TIMEOUT 0
88 #define AMPS_HACLIENT_TIMEOUT_DEFAULT 10000
89 #define AMPS_HACLIENT_RECONNECT_DEFAULT 200
90 #define AMPS_DEFAULT_COMMAND_TIMEOUT 5000
91 #define AMPS_DEFAULT_TOP_N -1
92 #define AMPS_DEFAULT_BATCH_SIZE 10
93 #define AMPS_NUMBER_BUFFER_LEN 20
94 #define AMPS_DEFAULT_QUEUE_ACK_TIMEOUT 1000
96 #if defined(_M_X64) || defined(__x86_64) || defined(_WIN64)
105 typedef std::map<std::string, std::string> ConnectionInfo;
108 inline std::string asString(Type x_)
110 std::ostringstream os;
116 size_t convertToCharArray(
char* buf_, amps_uint64_t seqNo_)
118 size_t pos = AMPS_NUMBER_BUFFER_LEN;
119 for (
int i = 0; i < AMPS_NUMBER_BUFFER_LEN; ++i)
123 buf_[--pos] = (char)(seqNo_ % 10 +
'0');
132 size_t convertToCharArray(
char* buf_,
unsigned long seqNo_)
134 size_t pos = AMPS_NUMBER_BUFFER_LEN;
135 for (
int i = 0; i < AMPS_NUMBER_BUFFER_LEN; ++i)
139 buf_[--pos] = (char)(seqNo_ % 10 +
'0');
153 static const char* duplicate()
157 static const char* badFilter()
161 static const char* badRegexTopic()
163 return "bad regex topic";
165 static const char* subscriptionAlreadyExists()
167 return "subscription already exists";
169 static const char* nameInUse()
171 return "name in use";
173 static const char* authFailure()
175 return "auth failure";
177 static const char* notEntitled()
179 return "not entitled";
181 static const char* authDisabled()
183 return "authentication disabled";
185 static const char* subidInUse()
187 return "subid in use";
189 static const char* noTopic()
207 virtual void exceptionThrown(
const std::exception&)
const {;}
213 #define AMPS_CALL_EXCEPTION_WRAPPER(x) \
218 catch (std::exception& ex_)\
222 _exceptionListener->exceptionThrown(ex_);\
247 #define AMPS_CALL_EXCEPTION_WRAPPER_2(me,x) \
250 while(me->_connected)\
257 catch(MessageStreamFullException&)\
261 me->checkAndSendHeartbeat(false);\
263 catch (std::exception& ex_)\
267 me->_exceptionListener->exceptionThrown(ex_);\
278 catch (std::exception& ex_)\
282 me->_exceptionListener->exceptionThrown(ex_);\
306 #define AMPS_CALL_EXCEPTION_WRAPPER_STREAM_FULL_2(me, x)\
307 while(me->_connected)\
314 catch(MessageStreamFullException&)\
318 me->checkAndSendHeartbeat(false);\
320 catch (std::exception& ex_)\
324 me->_exceptionListener->exceptionThrown(ex_);\
335 #define AMPS_CALL_EXCEPTION_WRAPPER_2(me,x) \
338 while(me->_connected)\
345 catch(MessageStreamFullException& ex_)\
349 me->checkAndSendHeartbeat(false);\
351 catch (std::exception& ex_)\
355 me->_exceptionListener->exceptionThrown(ex_);\
366 catch (std::exception& ex_)\
370 me->_exceptionListener->exceptionThrown(ex_);\
394 #define AMPS_CALL_EXCEPTION_WRAPPER_STREAM_FULL_2(me, x)\
395 while(me->_connected)\
402 catch(MessageStreamFullException& ex_)\
406 me->checkAndSendHeartbeat(false);\
408 catch (std::exception& ex_)\
412 me->_exceptionListener->exceptionThrown(ex_);\
424 #define AMPS_UNHANDLED_EXCEPTION(ex) \
427 _exceptionListener->exceptionThrown(ex);\
432 #define AMPS_UNHANDLED_EXCEPTION_2(me,ex) \
435 me->_exceptionListener->exceptionThrown(ex);\
474 static const unsigned Subscribe = 1;
475 static const unsigned SOW = 2;
476 static const unsigned NeedsSequenceNumber = 4;
477 static const unsigned ProcessedAck = 8;
478 static const unsigned StatsAck = 16;
479 void init(Message::Command::Type command_)
488 void init(
const std::string& command_)
497 void init(
const char* command_,
size_t commandLen_)
509 if (!(command & Message::Command::NoDataCommands))
512 if (command == Message::Command::Subscribe ||
513 command == Message::Command::SOWAndSubscribe ||
514 command == Message::Command::DeltaSubscribe ||
515 command == Message::Command::SOWAndDeltaSubscribe)
520 if (command == Message::Command::SOW
521 || command == Message::Command::SOWAndSubscribe
522 || command == Message::Command::SOWAndDeltaSubscribe)
529 if (command == Message::Command::SOW)
534 _flags |= ProcessedAck;
536 else if (command == Message::Command::SOWDelete)
539 _flags |= ProcessedAck;
540 _flags |= NeedsSequenceNumber;
542 else if (command == Message::Command::Publish
543 || command == Message::Command::DeltaPublish)
545 _flags |= NeedsSequenceNumber;
547 else if (command == Message::Command::StopTimer)
564 Command(
const char* command_,
size_t commandLen_)
566 init(command_, commandLen_);
590 init(command_, commandLen_);
684 _message.
setTopic(topic_, topicLen_);
814 std::ostringstream os;
819 amps_uint64_t getSequence()
const
835 _message.
setData(data_, dataLen_);
865 _batchSize = batchSize_;
887 if (ackType_ ==
"processed")
889 _flags |= ProcessedAck;
891 else if (ackType_ ==
"stats")
901 if (ackType_.find(
"processed") != std::string::npos)
903 _flags |= ProcessedAck;
907 _flags &= ~ProcessedAck;
909 if (ackType_.find(
"stats") != std::string::npos)
923 if (ackType_ & Message::AckType::Processed)
925 _flags |= ProcessedAck;
929 _flags &= ~ProcessedAck;
931 if (ackType_ & Message::AckType::Stats)
956 unsigned getTimeout(
void)
const
960 unsigned getBatchSize(
void)
const
964 bool isSubscribe(
void)
const
966 return _flags & Subscribe;
968 bool isSow(
void)
const
970 return (_flags & SOW) != 0;
972 bool hasProcessedAck(
void)
const
974 return (_flags & ProcessedAck) != 0;
976 bool hasStatsAck(
void)
const
978 return (_flags & StatsAck) != 0;
980 bool needsSequenceNumber(
void)
const
982 return (_flags & NeedsSequenceNumber) != 0;
988 typedef void(*DisconnectHandlerFunc)(
Client&,
void* userData);
1005 virtual std::string
authenticate(
const std::string& userName_,
const std::string& password_) = 0;
1013 virtual std::string
retry(
const std::string& userName_,
const std::string& password_) = 0;
1020 virtual void completed(
const std::string& userName_,
const std::string& password_,
const std::string& reason_) = 0;
1032 std::string
authenticate(
const std::string& ,
const std::string& password_)
1039 std::string
retry(
const std::string& ,
const std::string& )
1041 throw AuthenticationException(
"retry not implemented by DefaultAuthenticator.");
1044 void completed(
const std::string& ,
const std::string& ,
const std::string& ) {;}
1080 typedef bool (*PublishStoreResizeHandler)(
Store store_,
1095 : _resizeHandler(NULL)
1096 , _resizeHandlerData(NULL)
1097 , _errorOnPublishGap(errorOnPublishGap_)
1151 return AMPS_UNSET_INDEX;
1158 return AMPS_UNSET_SEQUENCE;
1183 _resizeHandler = handler_;
1184 _resizeHandlerData = userData_;
1189 return _resizeHandler;
1192 bool callResizeHandler(
size_t newSize_);
1194 inline virtual void setErrorOnPublishGap(
bool errorOnPublishGap_)
1196 _errorOnPublishGap = errorOnPublishGap_;
1199 inline virtual bool getErrorOnPublishGap()
const
1201 return _errorOnPublishGap;
1206 void* _resizeHandlerData;
1207 bool _errorOnPublishGap;
1214 RefHandle<StoreImpl> _body;
1218 Store(
const Store& rhs) : _body(rhs._body) {;}
1230 return _body.get().store(message_);
1241 _body.get().discardUpTo(index_);
1250 _body.get().replay(replayer_);
1262 return _body.get().replaySingle(replayer_, index_);
1271 return _body.get().unpersistedCount();
1279 return _body.isValid();
1292 return _body.get().flush(timeout_);
1300 return _body.get().getLowestUnpersisted();
1308 return _body.get().getLastPersisted();
1323 _body.get().setResizeHandler(handler_, userData_);
1328 return _body.get().getResizeHandler();
1337 _body.get().setErrorOnPublishGap(errorOnPublishGap_);
1346 return _body.get().getErrorOnPublishGap();
1354 if (_body.isValid())
1356 return &_body.get();
1381 const char* reason_,
size_t reasonLength_) = 0;
1385 inline bool StoreImpl::callResizeHandler(
size_t newSize_)
1389 return _resizeHandler(
Store(
this), newSize_, _resizeHandlerData);
1403 long* timeoutp = (
long*)data_;
1411 store_.
flush(*timeoutp);
1414 catch (
const TimedOutException&)
1416 catch (
const TimedOutException& e)
1443 unsigned requestedAckTypes_,
1444 const AMPSException& exception_) = 0;
1462 unsigned requestedAckTypes_) = 0;
1480 _failedResubscribeHandler = handler_;
1483 std::shared_ptr<FailedResubscribeHandler> _failedResubscribeHandler;
1494 typedef enum { Disconnected = 0,
1498 PublishReplayed = 8,
1499 HeartbeatInitiated = 16,
1518 class MessageStreamImpl;
1519 class MessageStream;
1521 typedef void(*DeferredExecutionFunc)(
void*);
1523 class ClientImpl :
public RefBody
1529 AMPS_SOCKET _socket;
1535 socklen_t _valueLen;
1539 : _socket(AMPS_INVALID_SOCKET), _noDelay(0), _valueLen(sizeof(int))
1541 _valuePtr = (
char*)&_noDelay;
1543 if (_socket != AMPS_INVALID_SOCKET)
1545 getsockopt(_socket, IPPROTO_TCP, TCP_NODELAY, _valuePtr, &_valueLen);
1549 setsockopt(_socket, IPPROTO_TCP, TCP_NODELAY, _valuePtr, _valueLen);
1553 _socket = AMPS_INVALID_SOCKET;
1560 if (_socket != AMPS_INVALID_SOCKET)
1563 setsockopt(_socket, IPPROTO_TCP, TCP_NODELAY, _valuePtr, _valueLen);
1568 friend class Client;
1571 DisconnectHandler _disconnectHandler;
1572 enum GlobalCommandTypeHandlers :
size_t
1582 DuplicateMessage = 8,
1585 std::vector<MessageHandler> _globalCommandTypeHandlers;
1586 Message _message, _readMessage, _publishMessage, _deltaMessage, _beatMessage;
1587 MessageRouter _routes;
1588 MessageRouter::RouteCache _routeCache;
1589 mutable Mutex _lock;
1590 std::string _name, _nameHash, _lastUri, _logonCorrelationData;
1591 amps_uint64_t _nameHashValue;
1592 BookmarkStore _bookmarkStore;
1593 Store _publishStore;
1594 bool _isRetryOnDisconnect;
1595 amps_unique_ptr<FailedWriteHandler> _failedWriteHandler;
1596 #if __cplusplus >= 201100L || _MSC_VER >= 1900
1597 std::atomic<amps_uint64_t> _lastSentHaSequenceNumber;
1599 volatile amps_uint64_t _lastSentHaSequenceNumber;
1601 AMPS_ATOMIC_TYPE_8 _logonInProgress;
1602 AMPS_ATOMIC_TYPE_8 _badTimeToHASubscribe;
1603 VersionInfo _serverVersion;
1604 Timer _heartbeatTimer;
1605 amps_unique_ptr<MessageStream> _pEmptyMessageStream;
1608 int _queueAckTimeout;
1609 bool _isAutoAckEnabled;
1610 unsigned _ackBatchSize;
1611 unsigned _queuedAckCount;
1612 unsigned _defaultMaxDepth;
1613 struct QueueBookmarks
1615 QueueBookmarks(
const std::string& topic_)
1622 amps_uint64_t _oldestTime;
1623 unsigned _bookmarkCount;
1625 typedef amps_uint64_t topic_hash;
1626 typedef std::map<topic_hash, QueueBookmarks> TopicHashMap;
1627 TopicHashMap _topicHashMap;
1629 class ClientStoreReplayer :
public StoreReplayer
1631 ClientImpl* _client;
1636 ClientStoreReplayer()
1637 : _client(NULL), _version(0), _res(
AMPS_E_OK)
1640 ClientStoreReplayer(ClientImpl* client_)
1641 : _client(client_), _version(0), _res(
AMPS_E_OK)
1644 void setClient(ClientImpl* client_)
1649 void execute(Message& message_)
1653 throw CommandException(
"Can't replay without a client.");
1657 if (index > _client->_lastSentHaSequenceNumber)
1659 _client->_lastSentHaSequenceNumber = index;
1666 if (!message_.getCommand().empty() &&
1667 (!_client->_logonInProgress ||
1668 message_.getOptions().len() < 6))
1671 message_.getMessage(),
1675 throw DisconnectedException(
"AMPS Server disconnected during replay");
1681 ClientStoreReplayer _replayer;
1683 class FailedWriteStoreReplayer :
public StoreReplayer
1685 ClientImpl* _parent;
1686 const char* _reason;
1687 size_t _reasonLength;
1688 size_t _replayCount;
1690 FailedWriteStoreReplayer(ClientImpl* parent,
const char* reason_,
size_t reasonLength_)
1693 _reasonLength(reasonLength_),
1696 void execute(Message& message_)
1698 if (_parent->_failedWriteHandler)
1701 _parent->_failedWriteHandler->failedWrite(message_,
1702 _reason, _reasonLength);
1705 size_t replayCount(
void)
const
1707 return _replayCount;
1711 struct AckResponseImpl :
public RefBody
1713 std::string username, password, reason, status, bookmark, options;
1714 amps_uint64_t sequenceNo;
1715 amps_uint64_t nameHashValue;
1716 VersionInfo serverVersion;
1717 #if __cplusplus >= 201100L || _MSC_VER >= 1900
1718 std::atomic<bool> responded;
1719 std::atomic<bool> abandoned;
1721 volatile bool responded;
1722 volatile bool abandoned;
1724 unsigned connectionVersion;
1727 username(), password(), reason(), status(), bookmark(), options(),
1728 sequenceNo((amps_uint64_t)0),
1732 connectionVersion(0)
1739 RefHandle<AckResponseImpl> _body;
1741 AckResponse() : _body(NULL) {;}
1742 AckResponse(
const AckResponse& rhs) : _body(rhs._body) {;}
1743 static AckResponse create()
1746 r._body =
new AckResponseImpl();
1750 const std::string& username()
1752 return _body.get().username;
1754 void setUsername(
const char* data_,
size_t len_)
1758 _body.get().username.assign(data_, len_);
1762 _body.get().username.clear();
1765 const std::string& password()
1767 return _body.get().password;
1769 void setPassword(
const char* data_,
size_t len_)
1773 _body.get().password.assign(data_, len_);
1777 _body.get().password.clear();
1780 const std::string& reason()
1782 return _body.get().reason;
1784 void setReason(
const char* data_,
size_t len_)
1788 _body.get().reason.assign(data_, len_);
1792 _body.get().reason.clear();
1795 const std::string& status()
1797 return _body.get().status;
1799 void setStatus(
const char* data_,
size_t len_)
1803 _body.get().status.assign(data_, len_);
1807 _body.get().status.clear();
1810 const std::string& bookmark()
1812 return _body.get().bookmark;
1814 void setBookmark(
const Field& bookmark_)
1816 if (!bookmark_.empty())
1818 _body.get().bookmark.assign(bookmark_.data(), bookmark_.len());
1819 Field::parseBookmark(bookmark_, _body.get().nameHashValue,
1820 _body.get().sequenceNo);
1824 _body.get().bookmark.clear();
1825 _body.get().sequenceNo = (amps_uint64_t)0;
1826 _body.get().nameHashValue = (amps_uint64_t)0;
1829 amps_uint64_t sequenceNo()
const
1831 return _body.get().sequenceNo;
1833 amps_uint64_t nameHashValue()
const
1835 return _body.get().nameHashValue;
1837 void setSequenceNo(
const char* data_,
size_t len_)
1839 amps_uint64_t result = (amps_uint64_t)0;
1842 for (
size_t i = 0; i < len_; ++i)
1844 result *= (amps_uint64_t)10;
1845 result += (amps_uint64_t)(data_[i] -
'0');
1848 _body.get().sequenceNo = result;
1850 VersionInfo serverVersion()
const
1852 return _body.get().serverVersion;
1854 void setServerVersion(
const char* data_,
size_t len_)
1858 _body.get().serverVersion.setVersion(std::string(data_, len_));
1863 return _body.get().responded;
1867 _body.get().responded =
true;
1871 return _body.get().abandoned;
1875 if (_body.isValid())
1877 _body.get().abandoned =
true;
1881 void setConnectionVersion(
unsigned connectionVersion)
1883 _body.get().connectionVersion = connectionVersion;
1886 unsigned getConnectionVersion()
1888 return _body.get().connectionVersion;
1890 void setOptions(
const char* data_,
size_t len_)
1894 _body.get().options.assign(data_, len_);
1898 _body.get().options.clear();
1902 const std::string& options()
1904 return _body.get().options;
1907 AckResponse& operator=(
const AckResponse& rhs)
1915 typedef std::map<std::string, AckResponse> AckMap;
1918 DefaultExceptionListener _defaultExceptionListener;
1921 struct DeferredExecutionRequest
1923 DeferredExecutionRequest(DeferredExecutionFunc func_,
1926 _userData(userData_)
1929 DeferredExecutionFunc _func;
1932 const ExceptionListener* _exceptionListener;
1933 std::shared_ptr<const ExceptionListener> _pExceptionListener;
1934 amps_unique_ptr<SubscriptionManager> _subscriptionManager;
1935 volatile bool _connected;
1936 std::string _username;
1937 typedef std::set<ConnectionStateListener*> ConnectionStateListeners;
1938 ConnectionStateListeners _connectionStateListeners;
1939 typedef std::vector<DeferredExecutionRequest> DeferredExecutionList;
1940 Mutex _deferredExecutionLock;
1941 DeferredExecutionList _deferredExecutionList;
1942 unsigned _heartbeatInterval;
1943 unsigned _readTimeout;
1951 if (!_connected && newState_ > ConnectionStateListener::Connected)
1955 for (ConnectionStateListeners::iterator it = _connectionStateListeners.begin(); it != _connectionStateListeners.end(); ++it)
1957 AMPS_CALL_EXCEPTION_WRAPPER(
1958 (*it)->connectionStateChanged(newState_));
1961 unsigned processedAck(Message& message);
1962 unsigned persistedAck(Message& meesage);
1963 void lastChance(Message& message);
1964 void checkAndSendHeartbeat(
bool force =
false);
1965 virtual ConnectionInfo getConnectionInfo()
const;
1967 ClientImplMessageHandler(
amps_handle message,
void* userData);
1969 ClientImplPreDisconnectHandler(
amps_handle client,
unsigned failedConnectionVersion,
void* userData);
1971 ClientImplDisconnectHandler(
amps_handle client,
void* userData);
1973 void unsubscribeInternal(
const std::string&
id)
1980 Message::Field subId;
1981 subId.assign(
id.data(),
id.length());
1982 _routes.removeRoute(subId);
1984 if (_subscriptionManager)
1987 Unlock<Mutex> unlock(_lock);
1988 _subscriptionManager->unsubscribe(subId);
1991 _message.setCommandEnum(Message::Command::Unsubscribe);
1992 _message.newCommandId();
1993 _message.setSubscriptionId(
id);
1994 _sendWithoutRetry(_message);
1995 deferredExecution(&s_noOpFn, NULL);
1998 AckResponse syncAckProcessing(
long timeout_, Message& message_,
1999 bool isHASubscribe_)
2001 return syncAckProcessing(timeout_, message_,
2002 (amps_uint64_t)0, isHASubscribe_);
2005 AckResponse syncAckProcessing(
long timeout_, Message& message_,
2006 amps_uint64_t haSeq = (amps_uint64_t)0,
2007 bool isHASubscribe_ =
false)
2010 AckResponse ack = AckResponse::create();
2013 Lock<Mutex> guard(_ackMapLock);
2014 _ackMap[message_.getCommandId()] = ack;
2016 ack.setConnectionVersion((
unsigned)_send(message_, haSeq, isHASubscribe_));
2017 if (ack.getConnectionVersion() == 0)
2020 throw DisconnectedException(
"Connection closed while waiting for response.");
2022 bool timedOut =
false;
2023 AMPS_START_TIMER(timeout_)
2024 while (!timedOut && !ack.responded() && !ack.abandoned() && _connected)
2028 timedOut = !_lock.wait(timeout_);
2032 AMPS_RESET_TIMER(timedOut, timeout_);
2039 Unlock<Mutex> unlck(_lock);
2040 amps_invoke_waiting_function();
2043 if (ack.responded())
2045 if (ack.status() !=
"failure")
2047 if (message_.getCommand() ==
"logon")
2049 amps_uint64_t ackSequence = ack.sequenceNo();
2050 if (_lastSentHaSequenceNumber < ackSequence)
2052 _lastSentHaSequenceNumber = ackSequence;
2054 if (_publishStore.isValid())
2059 _publishStore.discardUpTo(ackSequence);
2060 if (_lastSentHaSequenceNumber < _publishStore.getLastPersisted())
2062 _lastSentHaSequenceNumber = _publishStore.getLastPersisted();
2065 _nameHash = ack.bookmark().substr(0, ack.bookmark().find(
'|'));
2066 _nameHashValue = ack.nameHashValue();
2067 _serverVersion = ack.serverVersion();
2068 if (_bookmarkStore.isValid())
2070 _bookmarkStore.setServerVersion(_serverVersion);
2075 const std::string& options = ack.options();
2076 size_t index = options.find_first_of(
"max_backlog=");
2077 if (index != std::string::npos)
2080 const char* c = options.c_str() + index + 12;
2081 while (*c && *c !=
',')
2083 data = (data * 10) + (
unsigned)(*c++ -48);
2085 if (_ackBatchSize > data)
2087 _ackBatchSize = data;
2093 const size_t NotEntitled = 12;
2094 std::string ackReason = ack.reason();
2095 if (ackReason.length() == 0)
2099 if (ackReason.length() == NotEntitled &&
2100 ackReason[0] ==
'n' &&
2101 message_.getUserId().len() == 0)
2103 message_.assignUserId(_username);
2105 message_.throwFor(_client, ackReason);
2109 if (!ack.abandoned())
2111 throw TimedOutException(
"timed out waiting for operation.");
2115 throw DisconnectedException(
"Connection closed while waiting for response.");
2129 AMPS_CALL_EXCEPTION_WRAPPER(ClientImpl::disconnect());
2130 _pEmptyMessageStream.reset(NULL);
2137 ClientImpl(
const std::string& clientName)
2138 : _client(NULL), _name(clientName)
2139 , _isRetryOnDisconnect(true)
2140 , _lastSentHaSequenceNumber((amps_uint64_t)0), _logonInProgress(0)
2141 , _badTimeToHASubscribe(0), _serverVersion()
2142 , _queueAckTimeout(AMPS_DEFAULT_QUEUE_ACK_TIMEOUT)
2143 , _isAutoAckEnabled(false)
2145 , _queuedAckCount(0)
2146 , _defaultMaxDepth(0)
2148 , _heartbeatInterval(0)
2151 _replayer.setClient(
this);
2154 (amps_handler)ClientImpl::ClientImplMessageHandler,
2157 (amps_predisconnect_handler)ClientImpl::ClientImplPreDisconnectHandler,
2160 (amps_handler)ClientImpl::ClientImplDisconnectHandler,
2162 _exceptionListener = &_defaultExceptionListener;
2163 for (
size_t i = 0; i < GlobalCommandTypeHandlers::COUNT; ++i)
2165 #ifdef AMPS_USE_EMPLACE
2166 _globalCommandTypeHandlers.emplace_back(MessageHandler());
2168 _globalCommandTypeHandlers.push_back(MessageHandler());
2173 virtual ~ClientImpl()
2178 const std::string& getName()
const
2183 const std::string& getNameHash()
const
2188 const amps_uint64_t getNameHashValue()
const
2190 return _nameHashValue;
2193 void setName(
const std::string& name)
2200 AMPSException::throwFor(_client, result);
2205 const std::string& getLogonCorrelationData()
const
2207 return _logonCorrelationData;
2210 void setLogonCorrelationData(
const std::string& logonCorrelationData_)
2212 _logonCorrelationData = logonCorrelationData_;
2215 size_t getServerVersion()
const
2217 return _serverVersion.getOldStyleVersion();
2220 VersionInfo getServerVersionInfo()
const
2222 return _serverVersion;
2225 const std::string& getURI()
const
2230 virtual void connect(
const std::string& uri)
2232 Lock<Mutex> l(_lock);
2236 virtual void _connect(
const std::string& uri)
2242 AMPSException::throwFor(_client, result);
2245 _deltaMessage.setCommandEnum(Message::Command::DeltaPublish);
2246 _publishMessage.setCommandEnum(Message::Command::Publish);
2247 _beatMessage.setCommandEnum(Message::Command::Heartbeat);
2248 _beatMessage.setOptions(
"beat");
2249 _readMessage.setClientImpl(
this);
2250 if (_queueAckTimeout)
2255 AMPSException::throwFor(_client, result);
2259 broadcastConnectionStateChanged(ConnectionStateListener::Connected);
2262 void setDisconnected()
2265 Lock<Mutex> l(_lock);
2268 AMPS_CALL_EXCEPTION_WRAPPER(broadcastConnectionStateChanged(ConnectionStateListener::Disconnected));
2271 _heartbeatTimer.setTimeout(0.0);
2278 virtual void disconnect()
2280 AMPS_CALL_EXCEPTION_WRAPPER(flushAcks());
2282 AMPS_CALL_EXCEPTION_WRAPPER(processDeferredExecutions());
2283 Lock<Mutex> l(_lock);
2284 broadcastConnectionStateChanged(ConnectionStateListener::Shutdown);
2287 void clearAcks(
unsigned failedVersion)
2290 Lock<Mutex> guard(_ackMapLock);
2293 std::vector<std::string> worklist;
2294 for (AckMap::iterator i = _ackMap.begin(), e = _ackMap.end(); i != e; ++i)
2296 if (i->second.getConnectionVersion() <= failedVersion)
2298 i->second.setAbandoned();
2299 worklist.push_back(i->first);
2303 for (std::vector<std::string>::iterator j = worklist.begin(), e = worklist.end(); j != e; ++j)
2312 int send(
const Message& message)
2314 Lock<Mutex> l(_lock);
2315 return _send(message);
2318 void sendWithoutRetry(
const Message& message_)
2320 Lock<Mutex> l(_lock);
2323 if (_logonInProgress)
2325 throw DisconnectedException(
"The client has been disconnected.");
2327 _sendWithoutRetry(message_);
2330 void _sendWithoutRetry(
const Message& message_)
2335 AMPSException::throwFor(_client, result);
2339 int _send(
const Message& message, amps_uint64_t haSeq = (amps_uint64_t)0,
2340 bool isHASubscribe_ =
false)
2347 Message localMessage = message;
2348 unsigned version = 0;
2352 if (haSeq && _logonInProgress)
2356 if (!_isRetryOnDisconnect)
2360 if (!_lock.wait(1000))
2362 amps_invoke_waiting_function();
2367 if ((haSeq && haSeq <= _lastSentHaSequenceNumber) ||
2368 (isHASubscribe_ && _badTimeToHASubscribe))
2370 return (
int)version;
2374 if (haSeq > _lastSentHaSequenceNumber)
2376 while (haSeq > _lastSentHaSequenceNumber + 1)
2381 if (!_publishStore.replaySingle(_replayer,
2382 _lastSentHaSequenceNumber + 1))
2388 version = _replayer._version;
2391 catch (
const DisconnectedException&)
2393 catch (
const DisconnectedException& e)
2396 result = _replayer._res;
2401 localMessage.getMessage(),
2403 ++_lastSentHaSequenceNumber;
2407 if (_logonInProgress && localMessage.getCommand().data()[0] !=
'l')
2409 while (_logonInProgress)
2411 if (!_lock.wait(1000))
2413 amps_invoke_waiting_function();
2418 localMessage.getMessage(),
2423 if (!isHASubscribe_ && !haSeq &&
2424 localMessage.getMessage() == message.getMessage())
2426 localMessage = message.deepCopy();
2428 if (_isRetryOnDisconnect)
2430 Unlock<Mutex> u(_lock);
2435 if ((isHASubscribe_ || haSeq) &&
2438 return (
int)version;
2445 AMPSException::throwFor(_client, result);
2451 amps_invoke_waiting_function();
2457 AMPSException::throwFor(_client, result);
2459 return (
int)version;
2462 void addMessageHandler(
const Field& commandId_,
2464 unsigned requestedAcks_, Message::Command::Type commandType_)
2466 Lock<Mutex> lock(_lock);
2467 _routes.addRoute(commandId_, messageHandler_, requestedAcks_,
2471 bool removeMessageHandler(
const Field& commandId_)
2473 Lock<Mutex> lock(_lock);
2474 return _routes.removeRoute(commandId_);
2477 std::string send(
const MessageHandler& messageHandler_, Message& message_,
int timeout_ = 0)
2479 Field
id = message_.getCommandId();
2480 Field subId = message_.getSubscriptionId();
2481 Field qid = message_.getQueryId();
2482 bool isSubscribeOnly =
false;
2483 bool replace =
false;
2484 unsigned requestedAcks = message_.getAckTypeEnum();
2485 unsigned systemAddedAcks = Message::AckType::None;
2486 Message::Command::Type commandType = message_.getCommandEnum();
2488 switch (commandType)
2490 case Message::Command::Subscribe:
2491 case Message::Command::DeltaSubscribe:
2492 replace = message_.getOptions().operator std::string().find(AMPS_OPTIONS_REPLACE, 0, strlen(AMPS_OPTIONS_REPLACE) - 1) != std::string::npos;
2493 isSubscribeOnly =
true;
2495 case Message::Command::SOWAndSubscribe:
2496 case Message::Command::SOWAndDeltaSubscribe:
2499 id = message_.newCommandId().getCommandId();
2503 while (!replace &&
id != subId && _routes.hasRoute(
id))
2505 id = message_.newCommandId().getCommandId();
2510 message_.setSubscriptionId(
id);
2513 if (!message_.getBookmark().empty() && _bookmarkStore.isValid())
2515 systemAddedAcks |= Message::AckType::Persisted;
2518 case Message::Command::SOW:
2521 id = message_.newCommandId().getCommandId();
2525 while (!replace &&
id != subId && _routes.hasRoute(
id))
2527 message_.newCommandId();
2530 qid = message_.getCommandId();
2531 message_.setQueryId(qid);
2533 id = message_.getCommandId();
2536 if (!isSubscribeOnly)
2540 message_.setQueryID(
id);
2545 while (!replace && qid != subId && qid !=
id
2546 && _routes.hasRoute(qid))
2548 qid = message_.newQueryId().getQueryId();
2552 systemAddedAcks |= Message::AckType::Processed;
2553 message_.setAckTypeEnum(requestedAcks | systemAddedAcks);
2555 int routesAdded = 0;
2556 Lock<Mutex> l(_lock);
2557 if (!subId.empty() && messageHandler_.isValid())
2559 if (!_routes.hasRoute(subId))
2565 _routes.addRoute(subId, messageHandler_, requestedAcks,
2566 systemAddedAcks, commandType);
2568 if (!isSubscribeOnly && !qid.empty()
2569 && messageHandler_.isValid() && qid != subId)
2571 if (routesAdded == 0)
2573 _routes.addRoute(qid, messageHandler_,
2574 requestedAcks, systemAddedAcks, commandType);
2580 Unlock<Mutex> u(_lock);
2581 data = amps_invoke_copy_route_function(
2582 messageHandler_.userData());
2586 _routes.addRoute(qid, messageHandler_, requestedAcks,
2587 systemAddedAcks, commandType);
2591 _routes.addRoute(qid,
2592 MessageHandler(messageHandler_.function(),
2594 requestedAcks, systemAddedAcks, commandType);
2599 if (!
id.empty() && messageHandler_.isValid()
2600 && requestedAcks & ~Message::AckType::Persisted
2601 &&
id != subId &&
id != qid)
2603 if (routesAdded == 0)
2605 _routes.addRoute(
id, messageHandler_, requestedAcks,
2606 systemAddedAcks, commandType);
2612 Unlock<Mutex> u(_lock);
2613 data = amps_invoke_copy_route_function(
2614 messageHandler_.userData());
2618 _routes.addRoute(
id, messageHandler_, requestedAcks,
2619 systemAddedAcks, commandType);
2623 _routes.addRoute(
id,
2624 MessageHandler(messageHandler_.function(),
2627 systemAddedAcks, commandType);
2636 syncAckProcessing(timeout_, message_, 0,
false);
2637 message_.setAckTypeEnum(requestedAcks);
2641 _routes.removeRoute(message_.getQueryID());
2642 _routes.removeRoute(message_.getSubscriptionId());
2643 _routes.removeRoute(
id);
2644 message_.setAckTypeEnum(requestedAcks);
2650 case Message::Command::Unsubscribe:
2651 case Message::Command::Heartbeat:
2652 case Message::Command::Logon:
2653 case Message::Command::StartTimer:
2654 case Message::Command::StopTimer:
2655 case Message::Command::SOWDelete:
2657 Lock<Mutex> l(_lock);
2659 if (message_.getAckTypeEnum() != Message::AckType::None)
2663 message_.newCommandId();
2664 id = message_.getCommandId();
2666 if (messageHandler_.isValid())
2668 _routes.addRoute(
id, messageHandler_, requestedAcks,
2669 Message::AckType::None, commandType);
2675 case Message::Command::DeltaPublish:
2676 case Message::Command::Publish:
2678 bool useSync = message_.getFilter().len() > 0;
2679 Lock<Mutex> l(_lock);
2681 unsigned ackType = message_.getAckTypeEnum();
2682 if (ackType != Message::AckType::None
2687 message_.newCommandId();
2688 id = message_.getCommandId();
2690 if (messageHandler_.isValid())
2692 _routes.addRoute(
id, messageHandler_, requestedAcks,
2693 Message::AckType::None, commandType);
2698 message_.setAckTypeEnum(ackType | Message::AckType::Processed);
2699 syncAckProcessing(timeout_, message_, 0,
false);
2708 case Message::Command::GroupBegin:
2709 case Message::Command::GroupEnd:
2710 case Message::Command::OOF:
2711 case Message::Command::Ack:
2712 case Message::Command::Unknown:
2714 throw CommandException(
"Command type " + message_.getCommand() +
" can not be sent directly to AMPS");
2716 message_.setAckTypeEnum(requestedAcks);
2720 void setDisconnectHandler(
const DisconnectHandler& disconnectHandler)
2722 Lock<Mutex> l(_lock);
2723 _disconnectHandler = disconnectHandler;
2726 void setGlobalCommandTypeMessageHandler(
const std::string& command_,
const MessageHandler& handler_)
2728 switch (command_[0])
2732 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::Publish] = handler_;
2735 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::SOW] = handler_;
2739 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::Heartbeat] = handler_;
2743 if (command_[6] ==
'b')
2745 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::GroupBegin] = handler_;
2747 else if (command_[6] ==
'e')
2749 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::GroupEnd] = handler_;
2753 std::ostringstream os;
2754 os <<
"Invalid command '" << command_ <<
"' passed to setGlobalCommandTypeHandler";
2755 throw CommandException(os.str());
2759 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::OOF] = handler_;
2763 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::Ack] = handler_;
2767 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::LastChance] = handler_;
2771 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::DuplicateMessage] = handler_;
2774 std::ostringstream os;
2775 os <<
"Invalid command '" << command_ <<
"' passed to setGlobalCommandTypeHandler";
2776 throw CommandException(os.str());
2781 void setGlobalCommandTypeMessageHandler(
const Message::Command::Type command_,
const MessageHandler& handler_)
2786 case Message::Command::Publish:
2787 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::Publish] = handler_;
2789 case Message::Command::SOW:
2790 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::SOW] = handler_;
2793 case Message::Command::Heartbeat:
2794 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::Heartbeat] = handler_;
2797 case Message::Command::GroupBegin:
2798 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::GroupBegin] = handler_;
2800 case Message::Command::GroupEnd:
2801 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::GroupEnd] = handler_;
2803 case Message::Command::OOF:
2804 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::OOF] = handler_;
2807 case Message::Command::Ack:
2808 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::Ack] = handler_;
2812 unsigned command = command_;
2819 AMPS_snprintf(errBuf,
sizeof(errBuf),
2820 "Invalid command '%.*s' passed to setGlobalCommandTypeHandler",
2821 CommandConstants<0>::Lengths[bits],
2822 CommandConstants<0>::Values[bits]);
2823 throw CommandException(errBuf);
2828 void setGlobalCommandTypeMessageHandler(
const GlobalCommandTypeHandlers handlerType_,
const MessageHandler& handler_)
2830 _globalCommandTypeHandlers[handlerType_] = handler_;
2833 void setFailedWriteHandler(FailedWriteHandler* handler_)
2835 Lock<Mutex> l(_lock);
2836 _failedWriteHandler.reset(handler_);
2839 void setPublishStore(
const Store& publishStore_)
2841 Lock<Mutex> l(_lock);
2844 throw AlreadyConnectedException(
"Setting a publish store on a connected client is undefined behavior");
2846 _publishStore = publishStore_;
2849 void setBookmarkStore(
const BookmarkStore& bookmarkStore_)
2851 Lock<Mutex> l(_lock);
2854 throw AlreadyConnectedException(
"Setting a bookmark store on a connected client is undefined behavior");
2856 _bookmarkStore = bookmarkStore_;
2859 void setSubscriptionManager(SubscriptionManager* subscriptionManager_)
2861 Lock<Mutex> l(_lock);
2862 _subscriptionManager.reset(subscriptionManager_);
2865 SubscriptionManager* getSubscriptionManager()
const
2867 return const_cast<SubscriptionManager*
>(_subscriptionManager.get());
2870 DisconnectHandler getDisconnectHandler()
const
2872 return _disconnectHandler;
2875 MessageHandler getDuplicateMessageHandler()
const
2877 return _globalCommandTypeHandlers[GlobalCommandTypeHandlers::DuplicateMessage];
2880 FailedWriteHandler* getFailedWriteHandler()
const
2882 return const_cast<FailedWriteHandler*
>(_failedWriteHandler.get());
2885 Store getPublishStore()
const
2887 return _publishStore;
2890 BookmarkStore getBookmarkStore()
const
2892 return _bookmarkStore;
2895 amps_uint64_t publish(
const char* topic_,
size_t topicLen_,
const char* data_,
size_t dataLen_)
2897 if (!_publishStore.isValid())
2899 Lock<Mutex> l(_lock);
2900 _publishMessage.assignTopic(topic_, topicLen_);
2901 _publishMessage.assignData(data_, dataLen_);
2902 _send(_publishMessage);
2907 publishStoreMessage.reset();
2909 return _publish(topic_, topicLen_, data_, dataLen_);
2913 amps_uint64_t publish(
const char* topic_,
size_t topicLen_,
const char* data_,
2914 size_t dataLen_,
unsigned long expiration_)
2916 if (!_publishStore.isValid())
2918 Lock<Mutex> l(_lock);
2919 _publishMessage.assignTopic(topic_, topicLen_);
2920 _publishMessage.assignData(data_, dataLen_);
2921 char exprBuf[AMPS_NUMBER_BUFFER_LEN];
2922 size_t pos = convertToCharArray(exprBuf, expiration_);
2923 _publishMessage.assignExpiration(exprBuf + pos, AMPS_NUMBER_BUFFER_LEN - pos);
2924 _send(_publishMessage);
2925 _publishMessage.assignExpiration(NULL, 0);
2930 publishStoreMessage.reset();
2931 char exprBuf[AMPS_NUMBER_BUFFER_LEN];
2932 size_t exprPos = convertToCharArray(exprBuf, expiration_);
2935 AMPS_NUMBER_BUFFER_LEN - exprPos);
2936 return _publish(topic_, topicLen_, data_, dataLen_);
2940 class FlushAckHandler : ConnectionStateListener
2943 ClientImpl* _pClient;
2945 #if __cplusplus >= 201100L || _MSC_VER >= 1900
2946 std::atomic<bool> _acked;
2947 std::atomic<bool> _disconnected;
2949 volatile bool _acked;
2950 volatile bool _disconnected;
2953 FlushAckHandler(ClientImpl* pClient_)
2954 : _pClient(pClient_), _cmdId(), _acked(false), _disconnected(false)
2956 pClient_->addConnectionStateListener(
this);
2960 _pClient->removeConnectionStateListener(
this);
2961 _pClient->removeMessageHandler(_cmdId);
2964 void setCommandId(
const Field& cmdId_)
2966 _cmdId.deepCopy(cmdId_);
2968 void invoke(
const Message&)
2972 void connectionStateChanged(State state_)
2974 if (state_ <= Shutdown)
2976 _disconnected =
true;
2985 return _acked || _disconnected;
2989 void publishFlush(
long timeout_,
unsigned ackType_)
2991 static const char* processed =
"processed";
2992 static const size_t processedLen = strlen(processed);
2993 static const char* persisted =
"persisted";
2994 static const size_t persistedLen = strlen(persisted);
2995 static const char* flush =
"flush";
2996 static const size_t flushLen = strlen(flush);
2997 static VersionInfo minPersisted(
"5.3.3.0");
2998 static VersionInfo minFlush(
"4");
2999 if (ackType_ != Message::AckType::Processed
3000 && ackType_ != Message::AckType::Persisted)
3002 throw CommandException(
"Flush can only be used with processed or persisted acks.");
3004 FlushAckHandler flushHandler(
this);
3005 if (_serverVersion >= minFlush)
3007 Lock<Mutex> l(_lock);
3010 throw DisconnectedException(
"Not connected trying to flush");
3013 _message.newCommandId();
3014 _message.assignCommand(flush, flushLen);
3015 if (_serverVersion < minPersisted
3016 || ackType_ == Message::AckType::Processed)
3018 _message.assignAckType(processed, processedLen);
3022 _message.assignAckType(persisted, persistedLen);
3024 flushHandler.setCommandId(_message.getCommandId());
3025 addMessageHandler(_message.getCommandId(),
3026 std::bind(&FlushAckHandler::invoke,
3027 std::ref(flushHandler),
3028 std::placeholders::_1),
3029 ackType_, _message.getCommandEnum());
3030 NoDelay noDelay(_client);
3031 if (_send(_message) == -1)
3033 throw DisconnectedException(
"Disconnected trying to flush");
3036 if (_publishStore.isValid())
3040 _publishStore.flush(timeout_);
3042 catch (
const AMPSException& ex)
3044 AMPS_UNHANDLED_EXCEPTION(ex);
3048 else if (_serverVersion < minFlush)
3052 AMPS_USLEEP(timeout_ * 1000);
3056 AMPS_USLEEP(1000 * 1000);
3062 Timer timer((
double)timeout_);
3064 while (!timer.check() && !flushHandler.done())
3067 amps_invoke_waiting_function();
3072 while (!flushHandler.done())
3075 amps_invoke_waiting_function();
3079 if (!flushHandler.done())
3081 throw TimedOutException(
"Timed out waiting for flush");
3084 if (!flushHandler.acked() && !_publishStore.isValid())
3086 throw DisconnectedException(
"Disconnected waiting for flush");
3090 amps_uint64_t deltaPublish(
const char* topic_,
size_t topicLength_,
3091 const char* data_,
size_t dataLength_)
3093 if (!_publishStore.isValid())
3095 Lock<Mutex> l(_lock);
3096 _deltaMessage.assignTopic(topic_, topicLength_);
3097 _deltaMessage.assignData(data_, dataLength_);
3098 _send(_deltaMessage);
3103 publishStoreMessage.reset();
3104 publishStoreMessage.
setCommandEnum(Message::Command::DeltaPublish);
3105 return _publish(topic_, topicLength_, data_, dataLength_);
3109 amps_uint64_t deltaPublish(
const char* topic_,
size_t topicLength_,
3110 const char* data_,
size_t dataLength_,
3111 unsigned long expiration_)
3113 if (!_publishStore.isValid())
3115 Lock<Mutex> l(_lock);
3116 _deltaMessage.assignTopic(topic_, topicLength_);
3117 _deltaMessage.assignData(data_, dataLength_);
3118 char exprBuf[AMPS_NUMBER_BUFFER_LEN];
3119 size_t pos = convertToCharArray(exprBuf, expiration_);
3120 _deltaMessage.assignExpiration(exprBuf + pos, AMPS_NUMBER_BUFFER_LEN - pos);
3121 _send(_deltaMessage);
3122 _deltaMessage.assignExpiration(NULL, 0);
3127 publishStoreMessage.reset();
3128 char exprBuf[AMPS_NUMBER_BUFFER_LEN];
3129 size_t exprPos = convertToCharArray(exprBuf, expiration_);
3130 publishStoreMessage.
setCommandEnum(Message::Command::DeltaPublish)
3132 AMPS_NUMBER_BUFFER_LEN - exprPos);
3133 return _publish(topic_, topicLength_, data_, dataLength_);
3137 amps_uint64_t _publish(
const char* topic_,
size_t topicLength_,
3138 const char* data_,
size_t dataLength_)
3140 publishStoreMessage.
assignTopic(topic_, topicLength_)
3142 .assignData(data_, dataLength_);
3143 amps_uint64_t haSequenceNumber = _publishStore.store(publishStoreMessage);
3144 char buf[AMPS_NUMBER_BUFFER_LEN];
3145 size_t pos = convertToCharArray(buf, haSequenceNumber);
3146 publishStoreMessage.
assignSequence(buf + pos, AMPS_NUMBER_BUFFER_LEN - pos);
3148 Lock<Mutex> l(_lock);
3149 _send(publishStoreMessage, haSequenceNumber);
3151 return haSequenceNumber;
3154 virtual std::string logon(
long timeout_, Authenticator& authenticator_,
3155 const char* options_ = NULL)
3157 Lock<Mutex> l(_lock);
3158 return _logon(timeout_, authenticator_, options_);
3161 virtual std::string _logon(
long timeout_, Authenticator& authenticator_,
3162 const char* options_ = NULL)
3165 _message.newCommandId();
3166 std::string newCommandId = _message.getCommandId();
3167 _message.setCommandEnum(Message::Command::Logon);
3168 _message.setClientName(_name);
3169 #ifdef AMPS_CLIENT_VERSION_WITH_LANGUAGE
3170 _message.assignVersion(AMPS_CLIENT_VERSION_WITH_LANGUAGE,
3171 strlen(AMPS_CLIENT_VERSION_WITH_LANGUAGE));
3174 if (uri.user().size())
3176 _message.setUserId(uri.user());
3178 if (uri.password().size())
3180 _message.setPassword(uri.password());
3182 if (uri.protocol() ==
"amps" && uri.messageType().size())
3184 _message.setMessageType(uri.messageType());
3186 if (uri.isTrue(
"pretty"))
3188 _message.setOptions(
"pretty");
3191 _message.setPassword(authenticator_.authenticate(_message.getUserId(), _message.getPassword()));
3192 if (!_logonCorrelationData.empty())
3194 _message.assignCorrelationId(_logonCorrelationData);
3198 _message.setOptions(options_);
3200 _username = _message.getUserId();
3203 AtomicFlagFlip pubFlip(&_logonInProgress);
3204 NoDelay noDelay(_client);
3207 _message.setAckTypeEnum(Message::AckType::Processed);
3208 AckResponse ack = syncAckProcessing(timeout_, _message);
3209 if (ack.status() ==
"retry")
3211 _message.setPassword(authenticator_.retry(ack.username(), ack.password()));
3212 _username = ack.username();
3213 _message.setUserId(_username);
3217 authenticator_.completed(ack.username(), ack.password(), ack.reason());
3221 broadcastConnectionStateChanged(ConnectionStateListener::LoggedOn);
3228 catch (
const AMPSException& ex)
3231 AMPS_UNHANDLED_EXCEPTION(ex);
3240 if (_publishStore.isValid())
3244 _publishStore.replay(_replayer);
3245 broadcastConnectionStateChanged(ConnectionStateListener::PublishReplayed);
3247 catch (
const PublishStoreGapException& ex)
3250 AMPS_UNHANDLED_EXCEPTION(ex);
3253 catch (
const StoreException& ex)
3256 std::ostringstream os;
3257 os <<
"A local store exception occurred while logging on."
3259 throw ConnectionException(os.str());
3261 catch (
const AMPSException& ex)
3264 AMPS_UNHANDLED_EXCEPTION(ex);
3267 catch (
const std::exception& ex)
3270 AMPS_UNHANDLED_EXCEPTION(ex);
3280 return newCommandId;
3283 std::string subscribe(
const MessageHandler& messageHandler_,
3284 const std::string& topic_,
3286 const std::string& filter_,
3287 const std::string& bookmark_,
3288 const std::string& options_,
3289 const std::string& subId_,
3290 bool isHASubscribe_ =
true)
3292 isHASubscribe_ &= (bool)_subscriptionManager;
3293 Lock<Mutex> l(_lock);
3295 _message.setCommandEnum(Message::Command::Subscribe);
3296 _message.newCommandId();
3297 std::string subId(subId_);
3300 if (options_.find(AMPS_OPTIONS_REPLACE, 0, strlen(AMPS_OPTIONS_REPLACE) - 1) != std::string::npos)
3302 throw ConnectionException(
"Cannot issue a replacement subscription; a valid subscription id is required.");
3305 subId = _message.getCommandId();
3307 _message.setSubscriptionId(subId);
3312 unsigned ackTypes = Message::AckType::Processed;
3314 if (!bookmark_.empty() && _bookmarkStore.isValid())
3316 ackTypes |= Message::AckType::Persisted;
3318 _message.setTopic(topic_);
3320 if (filter_.length())
3322 _message.setFilter(filter_);
3324 if (bookmark_.length())
3328 Message::Field mostRecent = _bookmarkStore.getMostRecent(subIdField);
3329 _message.setBookmark(mostRecent);
3333 _message.setBookmark(bookmark_);
3334 if (_bookmarkStore.isValid())
3339 _bookmarkStore.log(_message);
3340 _bookmarkStore.discard(_message);
3341 _bookmarkStore.persisted(subIdField, _message.getBookmark());
3346 if (options_.length())
3348 _message.setOptions(options_);
3351 Message message = _message;
3354 message = _message.deepCopy();
3355 Unlock<Mutex> u(_lock);
3356 _subscriptionManager->subscribe(messageHandler_, message,
3357 Message::AckType::None);
3358 if (_badTimeToHASubscribe)
3363 if (!_routes.hasRoute(_message.getSubscriptionId()))
3365 _routes.addRoute(_message.getSubscriptionId(), messageHandler_,
3366 Message::AckType::None, ackTypes, _message.getCommandEnum());
3368 message.setAckTypeEnum(ackTypes);
3369 if (!options_.empty())
3371 message.setOptions(options_);
3375 syncAckProcessing(timeout_, message, isHASubscribe_);
3377 catch (
const DisconnectedException&)
3379 if (!isHASubscribe_)
3381 _routes.removeRoute(subIdField);
3386 AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(subIdField));
3390 catch (
const TimedOutException&)
3392 AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(subIdField));
3400 Unlock<Mutex> unlock(_lock);
3401 _subscriptionManager->unsubscribe(subIdField);
3403 _routes.removeRoute(subIdField);
3409 std::string deltaSubscribe(
const MessageHandler& messageHandler_,
3410 const std::string& topic_,
3412 const std::string& filter_,
3413 const std::string& bookmark_,
3414 const std::string& options_,
3415 const std::string& subId_ =
"",
3416 bool isHASubscribe_ =
true)
3418 isHASubscribe_ &= (bool)_subscriptionManager;
3419 Lock<Mutex> l(_lock);
3421 _message.setCommandEnum(Message::Command::DeltaSubscribe);
3422 _message.newCommandId();
3423 std::string subId(subId_);
3426 subId = _message.getCommandId();
3428 _message.setSubscriptionId(subId);
3433 unsigned ackTypes = Message::AckType::Processed;
3435 if (!bookmark_.empty() && _bookmarkStore.isValid())
3437 ackTypes |= Message::AckType::Persisted;
3439 _message.setTopic(topic_);
3440 if (filter_.length())
3442 _message.setFilter(filter_);
3444 if (bookmark_.length())
3448 Message::Field mostRecent = _bookmarkStore.getMostRecent(subIdField);
3449 _message.setBookmark(mostRecent);
3453 _message.setBookmark(bookmark_);
3454 if (_bookmarkStore.isValid())
3459 _bookmarkStore.log(_message);
3460 _bookmarkStore.discard(_message);
3461 _bookmarkStore.persisted(subIdField, _message.getBookmark());
3466 if (options_.length())
3468 _message.setOptions(options_);
3470 Message message = _message;
3473 message = _message.deepCopy();
3474 Unlock<Mutex> u(_lock);
3475 _subscriptionManager->subscribe(messageHandler_, message,
3476 Message::AckType::None);
3477 if (_badTimeToHASubscribe)
3482 if (!_routes.hasRoute(_message.getSubscriptionId()))
3484 _routes.addRoute(_message.getSubscriptionId(), messageHandler_,
3485 Message::AckType::None, ackTypes, _message.getCommandEnum());
3487 message.setAckTypeEnum(ackTypes);
3488 if (!options_.empty())
3490 message.setOptions(options_);
3494 syncAckProcessing(timeout_, message, isHASubscribe_);
3496 catch (
const DisconnectedException&)
3498 if (!isHASubscribe_)
3500 _routes.removeRoute(subIdField);
3504 catch (
const TimedOutException&)
3506 AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(subIdField));
3514 Unlock<Mutex> unlock(_lock);
3515 _subscriptionManager->unsubscribe(subIdField);
3517 _routes.removeRoute(subIdField);
3523 void unsubscribe(
const std::string&
id)
3525 Lock<Mutex> l(_lock);
3526 unsubscribeInternal(
id);
3529 void unsubscribe(
void)
3531 if (_subscriptionManager)
3533 _subscriptionManager->clear();
3536 _routes.unsubscribeAll();
3537 Lock<Mutex> l(_lock);
3539 _message.setCommandEnum(Message::Command::Unsubscribe);
3540 _message.newCommandId();
3541 _message.setSubscriptionId(
"all");
3542 _sendWithoutRetry(_message);
3544 deferredExecution(&s_noOpFn, NULL);
3547 std::string sow(
const MessageHandler& messageHandler_,
3548 const std::string& topic_,
3549 const std::string& filter_ =
"",
3550 const std::string& orderBy_ =
"",
3551 const std::string& bookmark_ =
"",
3552 int batchSize_ = AMPS_DEFAULT_BATCH_SIZE,
3553 int topN_ = AMPS_DEFAULT_TOP_N,
3554 const std::string& options_ =
"",
3555 long timeout_ = AMPS_DEFAULT_COMMAND_TIMEOUT)
3557 Lock<Mutex> l(_lock);
3559 _message.setCommandEnum(Message::Command::SOW);
3560 _message.newCommandId();
3562 std::string commandId = _message.getCommandId();
3563 _message.setQueryID(_message.getCommandId());
3564 unsigned ackTypes = Message::AckType::Processed | Message::AckType::Completed;
3565 _message.setAckTypeEnum(ackTypes);
3566 _message.setTopic(topic_);
3567 if (filter_.length())
3569 _message.setFilter(filter_);
3571 if (orderBy_.length())
3573 _message.setOrderBy(orderBy_);
3575 if (bookmark_.length())
3577 _message.setBookmark(bookmark_);
3579 _message.setBatchSize(AMPS::asString(batchSize_));
3580 if (topN_ != AMPS_DEFAULT_TOP_N)
3582 _message.setTopNRecordsReturned(AMPS::asString(topN_));
3584 if (options_.length())
3586 _message.setOptions(options_);
3589 _routes.addRoute(_message.getQueryID(), messageHandler_,
3590 Message::AckType::None, ackTypes, _message.getCommandEnum());
3594 syncAckProcessing(timeout_, _message);
3598 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId));
3605 std::string sow(
const MessageHandler& messageHandler_,
3606 const std::string& topic_,
3608 const std::string& filter_ =
"",
3609 int batchSize_ = AMPS_DEFAULT_BATCH_SIZE,
3610 int topN_ = AMPS_DEFAULT_TOP_N)
3613 return sow(messageHandler_,
3624 std::string sowAndSubscribe(
const MessageHandler& messageHandler_,
3625 const std::string& topic_,
3626 const std::string& filter_ =
"",
3627 const std::string& orderBy_ =
"",
3628 const std::string& bookmark_ =
"",
3629 int batchSize_ = AMPS_DEFAULT_BATCH_SIZE,
3630 int topN_ = AMPS_DEFAULT_TOP_N,
3631 const std::string& options_ =
"",
3632 long timeout_ = AMPS_DEFAULT_COMMAND_TIMEOUT,
3633 bool isHASubscribe_ =
true)
3635 isHASubscribe_ &= (bool)_subscriptionManager;
3636 unsigned ackTypes = Message::AckType::Processed;
3637 Lock<Mutex> l(_lock);
3639 _message.setCommandEnum(Message::Command::SOWAndSubscribe);
3640 _message.newCommandId();
3641 Field cid = _message.getCommandId();
3642 std::string subId = cid;
3643 _message.setQueryID(cid).setSubscriptionId(cid).setTopic(topic_);
3644 if (filter_.length())
3646 _message.setFilter(filter_);
3648 if (orderBy_.length())
3650 _message.setOrderBy(orderBy_);
3652 if (bookmark_.length())
3654 _message.setBookmark(bookmark_);
3655 Message::Field bookmark = _message.getBookmark();
3656 if (_bookmarkStore.isValid())
3658 ackTypes |= Message::AckType::Persisted;
3661 _message.setBookmark(_bookmarkStore.getMostRecent(_message.getSubscriptionId()));
3666 _bookmarkStore.log(_message);
3667 if (!BookmarkRange::isRange(bookmark))
3669 _bookmarkStore.discard(_message);
3670 _bookmarkStore.persisted(_message.getSubscriptionId(),
3680 _message.setBatchSize(AMPS::asString(batchSize_));
3681 if (topN_ != AMPS_DEFAULT_TOP_N)
3683 _message.setTopNRecordsReturned(AMPS::asString(topN_));
3685 if (options_.length())
3687 _message.setOptions(options_);
3690 Message message = _message;
3693 message = _message.deepCopy();
3694 Unlock<Mutex> u(_lock);
3695 _subscriptionManager->subscribe(messageHandler_, message,
3696 Message::AckType::None);
3697 if (_badTimeToHASubscribe)
3702 _routes.addRoute(cid, messageHandler_,
3703 Message::AckType::None, ackTypes, _message.getCommandEnum());
3704 message.setAckTypeEnum(ackTypes);
3705 if (!options_.empty())
3707 message.setOptions(options_);
3711 syncAckProcessing(timeout_, message, isHASubscribe_);
3713 catch (
const DisconnectedException&)
3715 if (!isHASubscribe_)
3717 _routes.removeRoute(subId);
3721 catch (
const TimedOutException&)
3723 AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(subId));
3731 Unlock<Mutex> unlock(_lock);
3732 _subscriptionManager->unsubscribe(cid);
3734 _routes.removeRoute(subId);
3740 std::string sowAndSubscribe(
const MessageHandler& messageHandler_,
3741 const std::string& topic_,
3743 const std::string& filter_ =
"",
3744 int batchSize_ = AMPS_DEFAULT_BATCH_SIZE,
3745 bool oofEnabled_ =
false,
3746 int topN_ = AMPS_DEFAULT_TOP_N,
3747 bool isHASubscribe_ =
true)
3750 return sowAndSubscribe(messageHandler_,
3757 (oofEnabled_ ?
"oof" :
""),
3762 std::string sowAndDeltaSubscribe(
const MessageHandler& messageHandler_,
3763 const std::string& topic_,
3764 const std::string& filter_ =
"",
3765 const std::string& orderBy_ =
"",
3766 int batchSize_ = AMPS_DEFAULT_BATCH_SIZE,
3767 int topN_ = AMPS_DEFAULT_TOP_N,
3768 const std::string& options_ =
"",
3769 long timeout_ = AMPS_DEFAULT_COMMAND_TIMEOUT,
3770 bool isHASubscribe_ =
true)
3772 isHASubscribe_ &= (bool)_subscriptionManager;
3773 Lock<Mutex> l(_lock);
3775 _message.setCommandEnum(Message::Command::SOWAndDeltaSubscribe);
3776 _message.newCommandId();
3777 _message.setQueryID(_message.getCommandId());
3778 _message.setSubscriptionId(_message.getCommandId());
3779 std::string subId = _message.getSubscriptionId();
3780 _message.setTopic(topic_);
3781 if (filter_.length())
3783 _message.setFilter(filter_);
3785 if (orderBy_.length())
3787 _message.setOrderBy(orderBy_);
3789 _message.setBatchSize(AMPS::asString(batchSize_));
3790 if (topN_ != AMPS_DEFAULT_TOP_N)
3792 _message.setTopNRecordsReturned(AMPS::asString(topN_));
3794 if (options_.length())
3796 _message.setOptions(options_);
3798 Message message = _message;
3801 message = _message.deepCopy();
3802 Unlock<Mutex> u(_lock);
3803 _subscriptionManager->subscribe(messageHandler_, message,
3804 Message::AckType::None);
3805 if (_badTimeToHASubscribe)
3810 _routes.addRoute(message.getQueryID(), messageHandler_,
3811 Message::AckType::None, Message::AckType::Processed, message.getCommandEnum());
3812 message.setAckTypeEnum(Message::AckType::Processed);
3813 if (!options_.empty())
3815 message.setOptions(options_);
3819 syncAckProcessing(timeout_, message, isHASubscribe_);
3821 catch (
const DisconnectedException&)
3823 if (!isHASubscribe_)
3825 _routes.removeRoute(subId);
3829 catch (
const TimedOutException&)
3831 AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(subId));
3839 Unlock<Mutex> unlock(_lock);
3840 _subscriptionManager->unsubscribe(Field(subId));
3842 _routes.removeRoute(subId);
3848 std::string sowAndDeltaSubscribe(
const MessageHandler& messageHandler_,
3849 const std::string& topic_,
3851 const std::string& filter_ =
"",
3852 int batchSize_ = AMPS_DEFAULT_BATCH_SIZE,
3853 bool oofEnabled_ =
false,
3854 bool sendEmpties_ =
false,
3855 int topN_ = AMPS_DEFAULT_TOP_N,
3856 bool isHASubscribe_ =
true)
3859 Message::Options options;
3864 if (sendEmpties_ ==
false)
3866 options.setNoEmpties();
3868 return sowAndDeltaSubscribe(messageHandler_,
3879 std::string sowDelete(
const MessageHandler& messageHandler_,
3880 const std::string& topic_,
3881 const std::string& filter_,
3883 Message::Field commandId_ = Message::Field())
3885 if (_publishStore.isValid())
3887 unsigned ackType = Message::AckType::Processed |
3888 Message::AckType::Stats |
3889 Message::AckType::Persisted;
3890 publishStoreMessage.reset();
3891 if (commandId_.empty())
3898 publishStoreMessage.
setCommandId(commandId_.data(), commandId_.len());
3906 amps_uint64_t haSequenceNumber = _publishStore.store(publishStoreMessage);
3907 char buf[AMPS_NUMBER_BUFFER_LEN];
3908 size_t pos = convertToCharArray(buf, haSequenceNumber);
3909 publishStoreMessage.
assignSequence(buf + pos, AMPS_NUMBER_BUFFER_LEN - pos);
3913 Lock<Mutex> l(_lock);
3914 _routes.addRoute(commandId_, messageHandler_,
3915 Message::AckType::Stats,
3916 Message::AckType::Processed | Message::AckType::Persisted,
3918 syncAckProcessing(timeout_, publishStoreMessage,
3921 catch (
const DisconnectedException&)
3928 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId_));
3932 return (std::string)commandId_;
3936 Lock<Mutex> l(_lock);
3938 if (commandId_.empty())
3940 _message.newCommandId();
3941 commandId_ = _message.getCommandId();
3945 _message.setCommandId(commandId_.data(), commandId_.len());
3947 _message.setCommandEnum(Message::Command::SOWDelete)
3948 .assignSubscriptionId(commandId_.data(), commandId_.len())
3949 .assignQueryID(commandId_.data(), commandId_.len())
3950 .setAckTypeEnum(Message::AckType::Processed |
3951 Message::AckType::Stats)
3952 .assignTopic(topic_.c_str(), topic_.length())
3953 .assignFilter(filter_.c_str(), filter_.length());
3954 _routes.addRoute(commandId_, messageHandler_,
3955 Message::AckType::Stats,
3956 Message::AckType::Processed,
3957 _message.getCommandEnum());
3960 syncAckProcessing(timeout_, _message);
3964 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId_));
3967 return (std::string)commandId_;
3971 std::string sowDeleteByData(
const MessageHandler& messageHandler_,
3972 const std::string& topic_,
3973 const std::string& data_,
3975 Message::Field commandId_ = Message::Field())
3977 if (_publishStore.isValid())
3979 unsigned ackType = Message::AckType::Processed |
3980 Message::AckType::Stats |
3981 Message::AckType::Persisted;
3982 publishStoreMessage.reset();
3983 if (commandId_.empty())
3990 publishStoreMessage.
setCommandId(commandId_.data(), commandId_.len());
3997 .assignData(data_.c_str(), data_.length());
3998 amps_uint64_t haSequenceNumber = _publishStore.store(publishStoreMessage);
3999 char buf[AMPS_NUMBER_BUFFER_LEN];
4000 size_t pos = convertToCharArray(buf, haSequenceNumber);
4001 publishStoreMessage.
assignSequence(buf + pos, AMPS_NUMBER_BUFFER_LEN - pos);
4005 Lock<Mutex> l(_lock);
4006 _routes.addRoute(commandId_, messageHandler_,
4007 Message::AckType::Stats,
4008 Message::AckType::Processed | Message::AckType::Persisted,
4010 syncAckProcessing(timeout_, publishStoreMessage,
4013 catch (
const DisconnectedException&)
4020 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId_));
4024 return (std::string)commandId_;
4028 Lock<Mutex> l(_lock);
4030 if (commandId_.empty())
4032 _message.newCommandId();
4033 commandId_ = _message.getCommandId();
4037 _message.setCommandId(commandId_.data(), commandId_.len());
4039 _message.setCommandEnum(Message::Command::SOWDelete)
4040 .assignSubscriptionId(commandId_.data(), commandId_.len())
4041 .assignQueryID(commandId_.data(), commandId_.len())
4042 .setAckTypeEnum(Message::AckType::Processed |
4043 Message::AckType::Stats)
4044 .assignTopic(topic_.c_str(), topic_.length())
4045 .assignData(data_.c_str(), data_.length());
4046 _routes.addRoute(commandId_, messageHandler_,
4047 Message::AckType::Stats,
4048 Message::AckType::Processed,
4049 _message.getCommandEnum());
4052 syncAckProcessing(timeout_, _message);
4056 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId_));
4059 return (std::string)commandId_;
4063 std::string sowDeleteByKeys(
const MessageHandler& messageHandler_,
4064 const std::string& topic_,
4065 const std::string& keys_,
4067 Message::Field commandId_ = Message::Field())
4069 if (_publishStore.isValid())
4071 unsigned ackType = Message::AckType::Processed |
4072 Message::AckType::Stats |
4073 Message::AckType::Persisted;
4074 publishStoreMessage.reset();
4075 if (commandId_.empty())
4082 publishStoreMessage.
setCommandId(commandId_.data(), commandId_.len());
4090 amps_uint64_t haSequenceNumber = _publishStore.store(publishStoreMessage);
4091 char buf[AMPS_NUMBER_BUFFER_LEN];
4092 size_t pos = convertToCharArray(buf, haSequenceNumber);
4093 publishStoreMessage.
assignSequence(buf + pos, AMPS_NUMBER_BUFFER_LEN - pos);
4097 Lock<Mutex> l(_lock);
4098 _routes.addRoute(commandId_, messageHandler_,
4099 Message::AckType::Stats,
4100 Message::AckType::Processed | Message::AckType::Persisted,
4102 syncAckProcessing(timeout_, publishStoreMessage,
4105 catch (
const DisconnectedException&)
4112 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId_));
4116 return (std::string)commandId_;
4120 Lock<Mutex> l(_lock);
4122 if (commandId_.empty())
4124 _message.newCommandId();
4125 commandId_ = _message.getCommandId();
4129 _message.setCommandId(commandId_.data(), commandId_.len());
4131 _message.setCommandEnum(Message::Command::SOWDelete)
4132 .assignSubscriptionId(commandId_.data(), commandId_.len())
4133 .assignQueryID(commandId_.data(), commandId_.len())
4134 .setAckTypeEnum(Message::AckType::Processed |
4135 Message::AckType::Stats)
4136 .assignTopic(topic_.c_str(), topic_.length())
4137 .assignSowKeys(keys_.c_str(), keys_.length());
4138 _routes.addRoute(commandId_, messageHandler_,
4139 Message::AckType::Stats,
4140 Message::AckType::Processed,
4141 _message.getCommandEnum());
4144 syncAckProcessing(timeout_, _message);
4148 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId_));
4151 return (std::string)commandId_;
4155 void startTimer(
void)
4157 if (_serverVersion >=
"5.3.2.0")
4159 throw CommandException(
"The start_timer command is deprecated.");
4161 Lock<Mutex> l(_lock);
4163 _message.setCommandEnum(Message::Command::StartTimer);
4168 std::string stopTimer(MessageHandler messageHandler_)
4170 if (_serverVersion >=
"5.3.2.0")
4172 throw CommandException(
"The stop_timer command is deprecated.");
4174 return executeAsync(Command(
"stop_timer").addAckType(
"completed"), messageHandler_);
4189 void setExceptionListener(
const std::shared_ptr<const ExceptionListener>& pListener_)
4191 _pExceptionListener = pListener_;
4192 _exceptionListener = _pExceptionListener.get();
4195 void setExceptionListener(
const ExceptionListener& listener_)
4197 _exceptionListener = &listener_;
4200 const ExceptionListener& getExceptionListener(
void)
const
4202 return *_exceptionListener;
4205 void setHeartbeat(
unsigned heartbeatInterval_,
unsigned readTimeout_)
4207 if (readTimeout_ < heartbeatInterval_)
4209 throw UsageException(
"The socket read timeout must be >= the heartbeat interval.");
4211 Lock<Mutex> l(_lock);
4212 if (_heartbeatInterval != heartbeatInterval_ ||
4213 _readTimeout != readTimeout_)
4215 _heartbeatInterval = heartbeatInterval_;
4216 _readTimeout = readTimeout_;
4221 void _sendHeartbeat(
void)
4223 if (_connected && _heartbeatInterval != 0)
4225 std::ostringstream options;
4226 options <<
"start," << _heartbeatInterval;
4227 _beatMessage.setOptions(options.str());
4229 _heartbeatTimer.setTimeout(_heartbeatInterval * 1000.0);
4230 _heartbeatTimer.start();
4233 _sendWithoutRetry(_beatMessage);
4234 broadcastConnectionStateChanged(ConnectionStateListener::HeartbeatInitiated);
4236 catch (ConnectionException& ex_)
4240 AMPS_UNHANDLED_EXCEPTION(ex_);
4242 _beatMessage.setOptions(
"beat");
4245 if (_readTimeout && _connected)
4250 AMPSException::throwFor(_client, result);
4252 if (!_queueAckTimeout)
4255 (
int)(_heartbeatInterval * 1000));
4258 AMPSException::throwFor(_client, result);
4264 void addConnectionStateListener(ConnectionStateListener* listener_)
4266 Lock<Mutex> lock(_lock);
4267 _connectionStateListeners.insert(listener_);
4270 void removeConnectionStateListener(ConnectionStateListener* listener_)
4272 Lock<Mutex> lock(_lock);
4273 _connectionStateListeners.erase(listener_);
4276 void clearConnectionStateListeners()
4278 Lock<Mutex> lock(_lock);
4279 _connectionStateListeners.clear();
4282 void _registerHandler(Command& command_, Message::Field& cid_,
4283 MessageHandler& handler_,
unsigned requestedAcks_,
4284 unsigned systemAddedAcks_, Message::Command::Type commandType_)
4286 Message message = command_.getMessage();
4287 Message::Command::Type commandType = message.getCommandEnum();
4288 Message::Field subid = message.getSubscriptionId();
4289 Message::Field qid = message.getQueryID();
4291 bool added = qid.len() || subid.len() || cid_.len();
4292 bool cidIsQid = cid_ == qid;
4293 bool cidUnique = !cidIsQid && cid_.len() > 0 && cid_ != subid;
4295 if (subid.len() > 0)
4299 addedCount += _routes.addRoute(subid, handler_, requestedAcks_,
4300 systemAddedAcks_, commandType_);
4302 && (commandType == Message::Command::Subscribe
4303 || commandType == Message::Command::DeltaSubscribe))
4310 if (qid.len() > 0 && qid != subid
4311 && (commandType == Message::Command::SOW
4312 || commandType == Message::Command::SOWDelete
4313 || commandType == Message::Command::SOWAndSubscribe
4314 || commandType == Message::Command::SOWAndDeltaSubscribe))
4316 while (_routes.hasRoute(qid))
4318 message.newQueryId();
4321 cid_ = message.getQueryId();
4323 qid = message.getQueryId();
4325 if (addedCount == 0)
4327 _routes.addRoute(qid, handler_, requestedAcks_,
4328 systemAddedAcks_, commandType_);
4334 Unlock<Mutex> u(_lock);
4335 data = amps_invoke_copy_route_function(handler_.userData());
4339 _routes.addRoute(qid, handler_, requestedAcks_,
4340 systemAddedAcks_, commandType_);
4344 _routes.addRoute(qid,
4345 MessageHandler(handler_.function(),
4348 systemAddedAcks_, commandType_);
4353 if (cidUnique && requestedAcks_ & ~Message::AckType::Persisted)
4355 while (_routes.hasRoute(cid_))
4357 cid_ = message.newCommandId().getCommandId();
4359 if (addedCount == 0)
4361 _routes.addRoute(cid_, handler_, requestedAcks_,
4362 systemAddedAcks_, commandType_);
4368 Unlock<Mutex> u(_lock);
4369 data = amps_invoke_copy_route_function(handler_.userData());
4373 _routes.addRoute(cid_, handler_, requestedAcks_,
4374 systemAddedAcks_, commandType_);
4378 _routes.addRoute(cid_,
4379 MessageHandler(handler_.function(),
4382 systemAddedAcks_, commandType_);
4386 else if ((commandType == Message::Command::Publish ||
4387 commandType == Message::Command::DeltaPublish)
4388 && requestedAcks_ & ~Message::AckType::Persisted)
4390 cid_ = command_.getMessage().newCommandId().getCommandId();
4391 _routes.addRoute(cid_, handler_, requestedAcks_,
4392 systemAddedAcks_, commandType_);
4397 throw UsageException(
"To use a messagehandler, you must also supply a command or subscription ID.");
4401 std::string executeAsyncNoLock(Command& command_, MessageHandler& handler_,
4402 bool isHASubscribe_ =
true)
4404 isHASubscribe_ &= (bool)_subscriptionManager;
4405 Message& message = command_.getMessage();
4406 unsigned systemAddedAcks = (handler_.isValid() || command_.hasProcessedAck()) ?
4407 Message::AckType::Processed : Message::AckType::None;
4408 unsigned requestedAcks = message.getAckTypeEnum();
4409 bool isPublishStore = _publishStore.isValid() && command_.needsSequenceNumber();
4410 Message::Command::Type commandType = message.getCommandEnum();
4411 if (commandType == Message::Command::StopTimer)
4413 systemAddedAcks |= Message::AckType::Completed;
4415 Message::Field cid = message.getCommandId();
4416 if (handler_.isValid() && cid.empty())
4418 cid = message.newCommandId().getCommandId();
4420 if (message.getBookmark().len() > 0)
4422 if (command_.isSubscribe())
4424 Message::Field bookmark = message.getBookmark();
4425 if (_bookmarkStore.isValid())
4427 systemAddedAcks |= Message::AckType::Persisted;
4430 message.setBookmark(_bookmarkStore.getMostRecent(message.getSubscriptionId()));
4435 _bookmarkStore.log(message);
4436 if (!BookmarkRange::isRange(bookmark))
4438 _bookmarkStore.discard(message);
4439 _bookmarkStore.persisted(message.getSubscriptionId(),
4452 systemAddedAcks |= Message::AckType::Persisted;
4454 bool isSubscribe = command_.isSubscribe();
4455 if (handler_.isValid() && !isSubscribe)
4457 _registerHandler(command_, cid, handler_,
4458 requestedAcks, systemAddedAcks, commandType);
4462 bool useSyncSend = cid.len() > 0 && command_.hasProcessedAck();
4463 amps_uint64_t haSequenceNumber = (amps_uint64_t)0;
4464 message.setAckTypeEnum(requestedAcks | systemAddedAcks);
4466 Unlock<Mutex> u(_lock);
4467 haSequenceNumber = _publishStore.store(message);
4469 message.setSequence(haSequenceNumber);
4474 syncAckProcessing((
long)command_.getTimeout(), message,
4479 _send(message, haSequenceNumber);
4482 catch (
const DisconnectedException&)
4489 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(cid));
4497 const Message::Field& subId = message.getSubscriptionId();
4500 Unlock<Mutex> u(_lock);
4501 _subscriptionManager->subscribe(handler_,
4504 if (_badTimeToHASubscribe)
4506 message.setAckTypeEnum(requestedAcks);
4507 return std::string(subId.data(), subId.len());
4510 if (handler_.isValid())
4512 _registerHandler(command_, cid, handler_,
4513 requestedAcks, systemAddedAcks, commandType);
4515 message.setAckTypeEnum(requestedAcks | systemAddedAcks);
4518 syncAckProcessing((
long)command_.getTimeout(), message,
4521 catch (
const DisconnectedException&)
4523 if (!isHASubscribe_)
4525 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(cid));
4526 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(subId));
4527 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(message.getQueryId()));
4528 message.setAckTypeEnum(requestedAcks);
4532 catch (
const TimedOutException&)
4534 AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(cid));
4535 AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(subId));
4536 AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(message.getQueryId()));
4537 message.setAckTypeEnum(requestedAcks);
4545 Unlock<Mutex> unlock(_lock);
4546 _subscriptionManager->unsubscribe(subId);
4548 if (message.getQueryID().len() > 0)
4550 _routes.removeRoute(message.getQueryID());
4552 _routes.removeRoute(cid);
4553 _routes.removeRoute(subId);
4554 message.setAckTypeEnum(requestedAcks);
4557 if (subId.len() > 0)
4559 message.setAckTypeEnum(requestedAcks);
4560 return std::string(subId.data(), subId.len());
4566 bool useSyncSend = commandType & ~Message::Command::NoDataCommands
4567 || (cid.len() > 0 && command_.hasProcessedAck());
4568 message.setAckTypeEnum(requestedAcks | systemAddedAcks);
4573 syncAckProcessing((
long)(command_.getTimeout()), message);
4580 catch (
const TimedOutException&)
4582 AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(cid));
4583 AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(message.getQueryId()));
4584 message.setAckTypeEnum(requestedAcks);
4587 catch (
const DisconnectedException&)
4589 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(cid));
4590 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(message.getQueryId()));
4591 message.setAckTypeEnum(requestedAcks);
4596 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(cid));
4597 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(message.getQueryId()));
4598 message.setAckTypeEnum(requestedAcks);
4603 message.setAckTypeEnum(requestedAcks);
4607 MessageStream getEmptyMessageStream(
void);
4609 std::string executeAsync(Command& command_, MessageHandler& handler_,
4610 bool isHASubscribe_ =
true)
4612 Lock<Mutex> lock(_lock);
4613 return executeAsyncNoLock(command_, handler_, isHASubscribe_);
4617 void setAutoAck(
bool isAutoAckEnabled_)
4619 _isAutoAckEnabled = isAutoAckEnabled_;
4621 bool getAutoAck(
void)
const
4623 return _isAutoAckEnabled;
4625 void setAckBatchSize(
const unsigned batchSize_)
4627 _ackBatchSize = batchSize_;
4628 if (!_queueAckTimeout)
4630 _queueAckTimeout = AMPS_DEFAULT_QUEUE_ACK_TIMEOUT;
4634 unsigned getAckBatchSize(
void)
const
4636 return _ackBatchSize;
4638 int getAckTimeout(
void)
const
4640 return _queueAckTimeout;
4642 void setAckTimeout(
const int ackTimeout_)
4645 _queueAckTimeout = ackTimeout_;
4647 size_t _ack(QueueBookmarks& queueBookmarks_)
4649 if (queueBookmarks_._bookmarkCount)
4651 publishStoreMessage.reset();
4656 amps_uint64_t haSequenceNumber = 0;
4657 if (_publishStore.isValid())
4659 haSequenceNumber = _publishStore.store(publishStoreMessage);
4662 queueBookmarks_._data.erase();
4663 queueBookmarks_._bookmarkCount = 0;
4665 _send(publishStoreMessage, haSequenceNumber);
4666 if (!_publishStore.isValid())
4668 queueBookmarks_._data.erase();
4669 queueBookmarks_._bookmarkCount = 0;
4675 void ack(
const Field& topic_,
const Field& bookmark_,
const char* options_ = NULL)
4677 if (_isAutoAckEnabled)
4681 _ack(topic_, bookmark_, options_);
4683 void _ack(
const Field& topic_,
const Field& bookmark_,
const char* options_ = NULL)
4685 if (bookmark_.len() == 0)
4689 Lock<Mutex> lock(_lock);
4690 if (_ackBatchSize < 2 || options_ != NULL)
4692 publishStoreMessage.reset();
4700 amps_uint64_t haSequenceNumber = 0;
4701 if (_publishStore.isValid())
4703 haSequenceNumber = _publishStore.store(publishStoreMessage);
4707 _send(publishStoreMessage, haSequenceNumber);
4711 topic_hash hash = CRC<0>::crcNoSSE(topic_.data(), topic_.len());
4712 TopicHashMap::iterator it = _topicHashMap.find(hash);
4713 if (it == _topicHashMap.end())
4716 #ifdef AMPS_USE_EMPLACE
4717 it = _topicHashMap.emplace(TopicHashMap::value_type(hash, QueueBookmarks(topic_))).first;
4719 it = _topicHashMap.insert(TopicHashMap::value_type(hash, QueueBookmarks(topic_))).first;
4722 QueueBookmarks& queueBookmarks = it->second;
4723 if (queueBookmarks._data.length())
4725 queueBookmarks._data.append(
",");
4729 queueBookmarks._oldestTime = amps_now();
4731 queueBookmarks._data.append(bookmark_);
4732 if (++queueBookmarks._bookmarkCount >= _ackBatchSize)
4734 _ack(queueBookmarks);
4737 void flushAcks(
void)
4739 size_t sendCount = 0;
4746 Lock<Mutex> lock(_lock);
4747 typedef TopicHashMap::iterator iterator;
4748 for (iterator it = _topicHashMap.begin(), end = _topicHashMap.end(); it != end; ++it)
4750 QueueBookmarks& queueBookmarks = it->second;
4751 sendCount += _ack(queueBookmarks);
4754 if (sendCount && _connected)
4756 publishFlush(0, Message::AckType::Processed);
4760 void checkQueueAcks(
void)
4762 if (!_topicHashMap.size())
4766 Lock<Mutex> lock(_lock);
4769 amps_uint64_t threshold = amps_now()
4770 - (amps_uint64_t)_queueAckTimeout;
4771 typedef TopicHashMap::iterator iterator;
4772 for (iterator it = _topicHashMap.begin(), end = _topicHashMap.end(); it != end; ++it)
4774 QueueBookmarks& queueBookmarks = it->second;
4775 if (queueBookmarks._bookmarkCount && queueBookmarks._oldestTime < threshold)
4777 _ack(queueBookmarks);
4781 catch (std::exception& ex)
4783 AMPS_UNHANDLED_EXCEPTION(ex);
4787 void deferredExecution(DeferredExecutionFunc func_,
void* userData_)
4789 Lock<Mutex> lock(_deferredExecutionLock);
4790 #ifdef AMPS_USE_EMPLACE
4791 _deferredExecutionList.emplace_back(
4792 DeferredExecutionRequest(func_, userData_));
4794 _deferredExecutionList.push_back(
4795 DeferredExecutionRequest(func_, userData_));
4799 inline void processDeferredExecutions(
void)
4801 if (_deferredExecutionList.size())
4803 Lock<Mutex> lock(_deferredExecutionLock);
4804 DeferredExecutionList::iterator it = _deferredExecutionList.begin();
4805 DeferredExecutionList::iterator end = _deferredExecutionList.end();
4806 for (; it != end; ++it)
4810 it->_func(it->_userData);
4818 _deferredExecutionList.clear();
4819 _routes.invalidateCache();
4820 _routeCache.invalidateCache();
4824 bool getRetryOnDisconnect(
void)
const
4826 return _isRetryOnDisconnect;
4829 void setRetryOnDisconnect(
bool isRetryOnDisconnect_)
4831 _isRetryOnDisconnect = isRetryOnDisconnect_;
4834 void setDefaultMaxDepth(
unsigned maxDepth_)
4836 _defaultMaxDepth = maxDepth_;
4839 unsigned getDefaultMaxDepth(
void)
const
4841 return _defaultMaxDepth;
4933 RefHandle<MessageStreamImpl> _body;
4943 inline void advance(
void);
4950 : _pStream(pStream_)
4955 bool operator==(
const iterator& rhs)
const
4957 return _pStream == rhs._pStream;
4959 bool operator!=(
const iterator& rhs)
const
4961 return _pStream != rhs._pStream;
4963 void operator++(
void)
4979 return _body.isValid();
4986 if (!_body.isValid())
4988 throw UsageException(
"This MessageStream is not valid and cannot be iterated.");
5027 inline void setSOWOnly(
const std::string& commandId_,
5028 const std::string& queryId_ =
"");
5029 inline void setSubscription(
const std::string& subId_,
5030 const std::string& commandId_ =
"",
5031 const std::string& queryId_ =
"");
5032 inline void setStatsOnly(
const std::string& commandId_,
5033 const std::string& queryId_ =
"");
5034 inline void setAcksOnly(
const std::string& commandId_,
unsigned acks_);
5066 BorrowRefHandle<ClientImpl> _body;
5068 static const int DEFAULT_COMMAND_TIMEOUT = AMPS_DEFAULT_COMMAND_TIMEOUT;
5069 static const int DEFAULT_BATCH_SIZE = AMPS_DEFAULT_BATCH_SIZE;
5070 static const int DEFAULT_TOP_N = AMPS_DEFAULT_TOP_N;
5081 : _body(new ClientImpl(clientName), true)
5084 Client(ClientImpl* existingClient)
5085 : _body(existingClient, true)
5088 Client(ClientImpl* existingClient,
bool isRef)
5089 : _body(existingClient, isRef)
5093 virtual ~
Client(
void) {;}
5103 return _body.isValid();
5120 _body.get().setName(name);
5127 return _body.get().getName();
5135 return _body.get().getNameHash();
5143 return _body.get().getNameHashValue();
5154 _body.get().setLogonCorrelationData(logonCorrelationData_);
5161 return _body.get().getLogonCorrelationData();
5174 return _body.get().getServerVersion();
5185 return _body.get().getServerVersionInfo();
5199 return AMPS::convertVersionToNumber(version_.c_str(), version_.length());
5214 return AMPS::convertVersionToNumber(data_, len_);
5221 return _body.get().getURI();
5245 _body.get().connect(uri);
5252 _body.get().disconnect();
5270 _body.get().send(message);
5283 unsigned requestedAcks_,
bool isSubscribe_)
5285 Message::Command::Type commandType = isSubscribe_ ? Message::Command::Subscribe : Message::Command::SOW;
5286 _body.get().addMessageHandler(commandId_, messageHandler_,
5287 requestedAcks_, commandType);
5300 unsigned requestedAcks_, Message::Command::Type commandType_)
5302 _body.get().addMessageHandler(commandId_, messageHandler_,
5303 requestedAcks_, commandType_);
5311 return _body.get().removeMessageHandler(commandId_);
5339 return _body.get().send(messageHandler, message, timeout);
5353 _body.get().setDisconnectHandler(disconnectHandler);
5361 return _body.get().getDisconnectHandler();
5370 return _body.get().getConnectionInfo();
5383 _body.get().setBookmarkStore(bookmarkStore_);
5391 return _body.
get().getBookmarkStore();
5399 return _body.get().getSubscriptionManager();
5411 _body.get().setSubscriptionManager(subscriptionManager_);
5435 _body.get().setPublishStore(publishStore_);
5443 return _body.
get().getPublishStore();
5451 _body.get().setGlobalCommandTypeMessageHandler(ClientImpl::GlobalCommandTypeHandlers::DuplicateMessage,
5452 duplicateMessageHandler_);
5466 return _body.get().getDuplicateMessageHandler();
5480 _body.get().setFailedWriteHandler(handler_);
5488 return _body.get().getFailedWriteHandler();
5509 amps_uint64_t
publish(
const std::string& topic_,
const std::string& data_)
5511 return _body.get().publish(topic_.c_str(), topic_.length(),
5512 data_.c_str(), data_.length());
5534 amps_uint64_t
publish(
const char* topic_,
size_t topicLength_,
5535 const char* data_,
size_t dataLength_)
5537 return _body.get().publish(topic_, topicLength_, data_, dataLength_);
5558 amps_uint64_t
publish(
const std::string& topic_,
const std::string& data_,
5559 unsigned long expiration_)
5561 return _body.get().publish(topic_.c_str(), topic_.length(),
5562 data_.c_str(), data_.length(), expiration_);
5585 amps_uint64_t
publish(
const char* topic_,
size_t topicLength_,
5586 const char* data_,
size_t dataLength_,
5587 unsigned long expiration_)
5589 return _body.get().publish(topic_, topicLength_,
5590 data_, dataLength_, expiration_);
5631 void publishFlush(
long timeout_ = 0,
unsigned ackType_ = Message::AckType::Processed)
5633 _body.get().publishFlush(timeout_, ackType_);
5652 amps_uint64_t
deltaPublish(
const std::string& topic_,
const std::string& data_)
5654 return _body.get().deltaPublish(topic_.c_str(), topic_.length(),
5655 data_.c_str(), data_.length());
5676 const char* data_,
size_t dataLength_)
5678 return _body.get().deltaPublish(topic_, topicLength_,
5679 data_, dataLength_);
5698 amps_uint64_t
deltaPublish(
const std::string& topic_,
const std::string& data_,
5699 unsigned long expiration_)
5701 return _body.get().deltaPublish(topic_.c_str(), topic_.length(),
5702 data_.c_str(), data_.length(),
5725 const char* data_,
size_t dataLength_,
5726 unsigned long expiration_)
5728 return _body.get().deltaPublish(topic_, topicLength_,
5729 data_, dataLength_, expiration_);
5749 const char* options_ = NULL)
5751 return _body.get().logon(timeout_, authenticator_, options_);
5766 std::string
logon(
const char* options_,
int timeout_ = 0)
5785 std::string
logon(
const std::string& options_,
int timeout_ = 0)
5811 const std::string& topic_,
5813 const std::string& filter_ =
"",
5814 const std::string& options_ =
"",
5815 const std::string& subId_ =
"")
5817 return _body.get().subscribe(messageHandler_, topic_, timeout_,
5818 filter_,
"", options_, subId_);
5837 long timeout_ = 0,
const std::string& filter_ =
"",
5838 const std::string& options_ =
"",
5839 const std::string& subId_ =
"")
5842 if (_body.get().getDefaultMaxDepth())
5844 result.
maxDepth(_body.get().getDefaultMaxDepth());
5846 result.setSubscription(_body.get().subscribe(
5848 topic_, timeout_, filter_,
"",
5849 options_, subId_,
false));
5869 long timeout_ = 0,
const std::string& filter_ =
"",
5870 const std::string& options_ =
"",
5871 const std::string& subId_ =
"")
5874 if (_body.get().getDefaultMaxDepth())
5876 result.
maxDepth(_body.get().getDefaultMaxDepth());
5878 result.setSubscription(_body.get().subscribe(
5880 topic_, timeout_, filter_,
"",
5881 options_, subId_,
false));
5898 const std::string& topic_,
5900 const std::string& filter_ =
"",
5901 const std::string& options_ =
"",
5902 const std::string& subId_ =
"")
5904 return _body.get().deltaSubscribe(messageHandler_, topic_, timeout_,
5905 filter_,
"", options_, subId_);
5916 long timeout_,
const std::string& filter_ =
"",
5917 const std::string& options_ =
"",
5918 const std::string& subId_ =
"")
5921 if (_body.get().getDefaultMaxDepth())
5923 result.
maxDepth(_body.get().getDefaultMaxDepth());
5925 result.setSubscription(_body.get().deltaSubscribe(
5927 topic_, timeout_, filter_,
"",
5928 options_, subId_,
false));
5934 long timeout_,
const std::string& filter_ =
"",
5935 const std::string& options_ =
"",
5936 const std::string& subId_ =
"")
5939 if (_body.get().getDefaultMaxDepth())
5941 result.
maxDepth(_body.get().getDefaultMaxDepth());
5943 result.setSubscription(_body.get().deltaSubscribe(
5945 topic_, timeout_, filter_,
"",
5946 options_, subId_,
false));
5976 const std::string& topic_,
5978 const std::string& bookmark_,
5979 const std::string& filter_ =
"",
5980 const std::string& options_ =
"",
5981 const std::string& subId_ =
"")
5983 return _body.get().subscribe(messageHandler_, topic_, timeout_,
5984 filter_, bookmark_, options_, subId_);
6005 const std::string& bookmark_,
6006 const std::string& filter_ =
"",
6007 const std::string& options_ =
"",
6008 const std::string& subId_ =
"")
6011 if (_body.get().getDefaultMaxDepth())
6013 result.
maxDepth(_body.get().getDefaultMaxDepth());
6015 result.setSubscription(_body.get().subscribe(
6017 topic_, timeout_, filter_,
6018 bookmark_, options_,
6026 const std::string& bookmark_,
6027 const std::string& filter_ =
"",
6028 const std::string& options_ =
"",
6029 const std::string& subId_ =
"")
6032 if (_body.get().getDefaultMaxDepth())
6034 result.
maxDepth(_body.get().getDefaultMaxDepth());
6036 result.setSubscription(_body.get().subscribe(
6038 topic_, timeout_, filter_,
6039 bookmark_, options_,
6054 return _body.get().unsubscribe(commandId);
6066 return _body.get().unsubscribe();
6100 const std::string& topic_,
6101 const std::string& filter_ =
"",
6102 const std::string& orderBy_ =
"",
6103 const std::string& bookmark_ =
"",
6104 int batchSize_ = DEFAULT_BATCH_SIZE,
6105 int topN_ = DEFAULT_TOP_N,
6106 const std::string& options_ =
"",
6107 long timeout_ = DEFAULT_COMMAND_TIMEOUT)
6109 return _body.get().sow(messageHandler_, topic_, filter_, orderBy_,
6110 bookmark_, batchSize_, topN_, options_,
6138 const std::string& filter_ =
"",
6139 const std::string& orderBy_ =
"",
6140 const std::string& bookmark_ =
"",
6141 int batchSize_ = DEFAULT_BATCH_SIZE,
6142 int topN_ = DEFAULT_TOP_N,
6143 const std::string& options_ =
"",
6144 long timeout_ = DEFAULT_COMMAND_TIMEOUT)
6147 if (_body.get().getDefaultMaxDepth())
6149 result.
maxDepth(_body.get().getDefaultMaxDepth());
6151 result.setSOWOnly(_body.get().sow(result.operator
MessageHandler(),
6152 topic_, filter_, orderBy_, bookmark_,
6153 batchSize_, topN_, options_, timeout_));
6159 const std::string& filter_ =
"",
6160 const std::string& orderBy_ =
"",
6161 const std::string& bookmark_ =
"",
6162 int batchSize_ = DEFAULT_BATCH_SIZE,
6163 int topN_ = DEFAULT_TOP_N,
6164 const std::string& options_ =
"",
6165 long timeout_ = DEFAULT_COMMAND_TIMEOUT)
6168 if (_body.get().getDefaultMaxDepth())
6170 result.
maxDepth(_body.get().getDefaultMaxDepth());
6172 result.setSOWOnly(_body.get().sow(result.operator
MessageHandler(),
6173 topic_, filter_, orderBy_, bookmark_,
6174 batchSize_, topN_, options_, timeout_));
6200 const std::string& topic_,
6202 const std::string& filter_ =
"",
6203 int batchSize_ = DEFAULT_BATCH_SIZE,
6204 int topN_ = DEFAULT_TOP_N)
6206 return _body.get().sow(messageHandler_, topic_, timeout_, filter_,
6232 const std::string& topic_,
6234 const std::string& filter_ =
"",
6235 int batchSize_ = DEFAULT_BATCH_SIZE,
6236 bool oofEnabled_ =
false,
6237 int topN_ = DEFAULT_TOP_N)
6239 return _body.get().sowAndSubscribe(messageHandler_, topic_, timeout_,
6240 filter_, batchSize_, oofEnabled_,
6265 const std::string& filter_ =
"",
6266 int batchSize_ = DEFAULT_BATCH_SIZE,
6267 bool oofEnabled_ =
false,
6268 int topN_ = DEFAULT_TOP_N)
6271 if (_body.get().getDefaultMaxDepth())
6273 result.
maxDepth(_body.get().getDefaultMaxDepth());
6275 result.setSubscription(_body.get().sowAndSubscribe(
6277 topic_, timeout_, filter_,
6278 batchSize_, oofEnabled_,
6303 const std::string& filter_ =
"",
6304 int batchSize_ = DEFAULT_BATCH_SIZE,
6305 bool oofEnabled_ =
false,
6306 int topN_ = DEFAULT_TOP_N)
6309 if (_body.get().getDefaultMaxDepth())
6311 result.
maxDepth(_body.get().getDefaultMaxDepth());
6313 result.setSubscription(_body.get().sowAndSubscribe(
6315 topic_, timeout_, filter_,
6316 batchSize_, oofEnabled_,
6350 const std::string& topic_,
6351 const std::string& filter_ =
"",
6352 const std::string& orderBy_ =
"",
6353 const std::string& bookmark_ =
"",
6354 int batchSize_ = DEFAULT_BATCH_SIZE,
6355 int topN_ = DEFAULT_TOP_N,
6356 const std::string& options_ =
"",
6357 long timeout_ = DEFAULT_COMMAND_TIMEOUT)
6359 return _body.get().sowAndSubscribe(messageHandler_, topic_, filter_,
6360 orderBy_, bookmark_, batchSize_,
6361 topN_, options_, timeout_);
6389 const std::string& filter_ =
"",
6390 const std::string& orderBy_ =
"",
6391 const std::string& bookmark_ =
"",
6392 int batchSize_ = DEFAULT_BATCH_SIZE,
6393 int topN_ = DEFAULT_TOP_N,
6394 const std::string& options_ =
"",
6395 long timeout_ = DEFAULT_COMMAND_TIMEOUT)
6398 if (_body.get().getDefaultMaxDepth())
6400 result.
maxDepth(_body.get().getDefaultMaxDepth());
6402 result.setSubscription(_body.get().sowAndSubscribe(
6404 topic_, filter_, orderBy_,
6405 bookmark_, batchSize_, topN_,
6406 options_, timeout_,
false));
6412 const std::string& filter_ =
"",
6413 const std::string& orderBy_ =
"",
6414 const std::string& bookmark_ =
"",
6415 int batchSize_ = DEFAULT_BATCH_SIZE,
6416 int topN_ = DEFAULT_TOP_N,
6417 const std::string& options_ =
"",
6418 long timeout_ = DEFAULT_COMMAND_TIMEOUT)
6421 if (_body.get().getDefaultMaxDepth())
6423 result.
maxDepth(_body.get().getDefaultMaxDepth());
6425 result.setSubscription(_body.get().sowAndSubscribe(
6427 topic_, filter_, orderBy_,
6428 bookmark_, batchSize_, topN_,
6429 options_, timeout_,
false));
6458 const std::string& topic_,
6459 const std::string& filter_ =
"",
6460 const std::string& orderBy_ =
"",
6461 int batchSize_ = DEFAULT_BATCH_SIZE,
6462 int topN_ = DEFAULT_TOP_N,
6463 const std::string& options_ =
"",
6464 long timeout_ = DEFAULT_COMMAND_TIMEOUT)
6466 return _body.get().sowAndDeltaSubscribe(messageHandler_, topic_,
6467 filter_, orderBy_, batchSize_,
6468 topN_, options_, timeout_);
6491 const std::string& filter_ =
"",
6492 const std::string& orderBy_ =
"",
6493 int batchSize_ = DEFAULT_BATCH_SIZE,
6494 int topN_ = DEFAULT_TOP_N,
6495 const std::string& options_ =
"",
6496 long timeout_ = DEFAULT_COMMAND_TIMEOUT)
6499 if (_body.get().getDefaultMaxDepth())
6501 result.
maxDepth(_body.get().getDefaultMaxDepth());
6503 result.setSubscription(_body.get().sowAndDeltaSubscribe(
6505 topic_, filter_, orderBy_,
6506 batchSize_, topN_, options_,
6513 const std::string& filter_ =
"",
6514 const std::string& orderBy_ =
"",
6515 int batchSize_ = DEFAULT_BATCH_SIZE,
6516 int topN_ = DEFAULT_TOP_N,
6517 const std::string& options_ =
"",
6518 long timeout_ = DEFAULT_COMMAND_TIMEOUT)
6521 if (_body.get().getDefaultMaxDepth())
6523 result.
maxDepth(_body.get().getDefaultMaxDepth());
6525 result.setSubscription(_body.get().sowAndDeltaSubscribe(
6527 topic_, filter_, orderBy_,
6528 batchSize_, topN_, options_,
6558 const std::string& topic_,
6560 const std::string& filter_ =
"",
6561 int batchSize_ = DEFAULT_BATCH_SIZE,
6562 bool oofEnabled_ =
false,
6563 bool sendEmpties_ =
false,
6564 int topN_ = DEFAULT_TOP_N)
6566 return _body.get().sowAndDeltaSubscribe(messageHandler_, topic_,
6567 timeout_, filter_, batchSize_,
6568 oofEnabled_, sendEmpties_,
6595 const std::string& filter_ =
"",
6596 int batchSize_ = DEFAULT_BATCH_SIZE,
6597 bool oofEnabled_ =
false,
6598 bool sendEmpties_ =
false,
6599 int topN_ = DEFAULT_TOP_N)
6602 if (_body.get().getDefaultMaxDepth())
6604 result.
maxDepth(_body.get().getDefaultMaxDepth());
6606 result.setSubscription(_body.get().sowAndDeltaSubscribe(
6608 topic_, timeout_, filter_,
6609 batchSize_, oofEnabled_,
6610 sendEmpties_, topN_,
false));
6636 const std::string& filter_ =
"",
6637 int batchSize_ = DEFAULT_BATCH_SIZE,
6638 bool oofEnabled_ =
false,
6639 bool sendEmpties_ =
false,
6640 int topN_ = DEFAULT_TOP_N)
6643 if (_body.get().getDefaultMaxDepth())
6645 result.
maxDepth(_body.get().getDefaultMaxDepth());
6647 result.setSubscription(_body.get().sowAndDeltaSubscribe(
6649 topic_, timeout_, filter_,
6650 batchSize_, oofEnabled_,
6651 sendEmpties_, topN_,
false));
6674 const std::string& topic,
6675 const std::string& filter,
6678 return _body.get().sowDelete(messageHandler, topic, filter, timeout);
6706 stream.setStatsOnly(cid);
6707 _body.get().sowDelete(stream.operator
MessageHandler(), topic, filter, timeout, cid);
6708 return *(stream.
begin());
6710 catch (
const DisconnectedException&)
6723 _body.get().startTimer();
6734 return _body.get().stopTimer(messageHandler);
6759 const std::string& topic_,
6760 const std::string& keys_,
6763 return _body.get().sowDeleteByKeys(messageHandler_, topic_, keys_, timeout_);
6795 stream.setStatsOnly(cid);
6796 _body.get().sowDeleteByKeys(stream.operator
MessageHandler(), topic_, keys_, timeout_, cid);
6797 return *(stream.
begin());
6799 catch (
const DisconnectedException&)
6821 const std::string& topic_,
const std::string& data_,
6824 return _body.get().sowDeleteByData(messageHandler_, topic_, data_, timeout_);
6851 stream.setStatsOnly(cid);
6852 _body.get().sowDeleteByData(stream.operator
MessageHandler(), topic_, data_, timeout_, cid);
6853 return *(stream.
begin());
6855 catch (
const DisconnectedException&)
6867 return _body.get().getHandle();
6880 _body.get().setExceptionListener(pListener_);
6893 _body.get().setExceptionListener(listener_);
6900 return _body.get().getExceptionListener();
6926 _body.get().setHeartbeat(heartbeatTime_, readTimeout_);
6950 _body.get().setHeartbeat(heartbeatTime_, 2 * heartbeatTime_);
6963 _body.get().setGlobalCommandTypeMessageHandler(ClientImpl::GlobalCommandTypeHandlers::LastChance,
6989 _body.get().setGlobalCommandTypeMessageHandler(command_, handler_);
7014 _body.get().setGlobalCommandTypeMessageHandler(command_, handler_);
7097 _body.get().addConnectionStateListener(listener);
7105 _body.get().removeConnectionStateListener(listener);
7112 _body.get().clearConnectionStateListeners();
7142 return _body.get().executeAsync(command_, handler_);
7180 if (command_.isSubscribe())
7182 Message& message = command_.getMessage();
7185 if (useExistingHandler)
7188 if (_body.get()._routes.getRoute(subId, existingHandler))
7191 _body.get().executeAsync(command_, existingHandler,
false);
7196 id = _body.get().executeAsync(command_, handler_,
false);
7198 catch (
const DisconnectedException&)
7201 if (command_.isSubscribe())
7205 if (command_.isSow())
7238 _body.get().ack(topic_, bookmark_, options_);
7260 void ack(
const std::string& topic_,
const std::string& bookmark_,
7261 const char* options_ = NULL)
7263 _body.get().ack(
Field(topic_.data(), topic_.length()),
Field(bookmark_.data(), bookmark_.length()), options_);
7271 void ackDeferredAutoAck(
Field& topic_,
Field& bookmark_,
const char* options_ = NULL)
7273 _body.get()._ack(topic_, bookmark_, options_);
7286 _body.get().flushAcks();
7295 return _body.get().getAutoAck();
7305 _body.get().setAutoAck(isAutoAckEnabled_);
7313 return _body.get().getAckBatchSize();
7323 _body.get().setAckBatchSize(ackBatchSize_);
7334 return _body.get().getAckTimeout();
7346 if (!ackTimeout_ && _body.get().getAckBatchSize() > 1)
7348 throw UsageException(
"Ack timeout must be > 0 when ack batch size > 1");
7350 _body.get().setAckTimeout(ackTimeout_);
7364 _body.get().setRetryOnDisconnect(isRetryOnDisconnect_);
7373 return _body.get().getRetryOnDisconnect();
7382 _body.get().setDefaultMaxDepth(maxDepth_);
7391 return _body.get().getDefaultMaxDepth();
7403 return _body.get().setTransportFilterFunction(filter_, userData_);
7417 return _body.get().setThreadCreatedCallback(callback_, userData_);
7425 void deferredExecution(DeferredExecutionFunc func_,
void* userData_)
7427 _body.get().deferredExecution(func_, userData_);
7437 AMPS_CALL_EXCEPTION_WRAPPER(_globalCommandTypeHandlers[GlobalCommandTypeHandlers::LastChance].invoke(message));
7443 unsigned deliveries = 0;
7455 const char* data = NULL;
7457 const char* status = NULL;
7458 size_t statusLen = 0;
7460 const size_t NotEntitled = 12, Duplicate = 9, Failure = 7;
7463 if (len == NotEntitled || len == Duplicate ||
7464 (statusLen == Failure && status[0] ==
'f'))
7466 if (_failedWriteHandler)
7468 if (_publishStore.isValid())
7470 amps_uint64_t sequence =
7472 FailedWriteStoreReplayer replayer(
this, data, len);
7473 AMPS_CALL_EXCEPTION_WRAPPER(_publishStore.replaySingle(
7474 replayer, sequence));
7478 static Message emptyMessage;
7480 AMPS_CALL_EXCEPTION_WRAPPER(
7481 _failedWriteHandler->failedWrite(emptyMessage,
7487 if (_publishStore.isValid())
7496 AMPS_CALL_EXCEPTION_WRAPPER(_publishStore.discardUpTo(seq));
7500 if (!deliveries && _bookmarkStore.isValid())
7506 Message::Field subId(data, len);
7507 const char* bookmarkData = NULL;
7508 size_t bookmarkLen = 0;
7514 if (bookmarkLen > 0 && _routes.hasRoute(subId))
7517 _bookmarkStore.persisted(subId, Message::Field(bookmarkData, bookmarkLen));
7522 catch (std::exception& ex)
7524 AMPS_UNHANDLED_EXCEPTION(ex);
7530 ClientImpl::processedAck(Message& message)
7532 unsigned deliveries = 0;
7534 const char* data = NULL;
7538 Lock<Mutex> l(_lock);
7541 Lock<Mutex> guard(_ackMapLock);
7542 AckMap::iterator i = _ackMap.find(std::string(data, len));
7543 if (i != _ackMap.end())
7553 ack.setStatus(data, len);
7555 ack.setReason(data, len);
7557 ack.setUsername(data, len);
7559 ack.setPassword(data, len);
7561 ack.setServerVersion(data, len);
7563 ack.setOptions(data, len);
7565 ack.setBookmark(message.getBookmark());
7573 ClientImpl::checkAndSendHeartbeat(
bool force)
7575 if (force || _heartbeatTimer.check())
7577 _heartbeatTimer.start();
7580 sendWithoutRetry(_beatMessage);
7582 catch (
const AMPSException&)
7589 inline ConnectionInfo ClientImpl::getConnectionInfo()
const
7591 ConnectionInfo info;
7592 std::ostringstream writer;
7594 info[
"client.uri"] = _lastUri;
7595 info[
"client.name"] = _name;
7596 info[
"client.username"] = _username;
7597 if (_publishStore.isValid())
7599 writer << _publishStore.unpersistedCount();
7600 info[
"publishStore.unpersistedCount"] = writer.str();
7609 ClientImpl::ClientImplMessageHandler(
amps_handle messageHandle_,
void* userData_)
7611 const unsigned SOWMask = Message::Command::SOW | Message::Command::GroupBegin | Message::Command::GroupEnd;
7612 const unsigned PublishMask = Message::Command::OOF | Message::Command::Publish | Message::Command::DeltaPublish;
7613 ClientImpl* me = (ClientImpl*) userData_;
7614 AMPS_CALL_EXCEPTION_WRAPPER_2(me, me->processDeferredExecutions());
7615 if (!messageHandle_)
7617 if (me->_queueAckTimeout)
7619 me->checkQueueAcks();
7621 me->checkAndSendHeartbeat();
7625 me->_readMessage.replace(messageHandle_);
7626 Message& message = me->_readMessage;
7627 Message::Command::Type commandType = message.getCommandEnum();
7628 if (commandType & SOWMask)
7634 AMPS_CALL_EXCEPTION_WRAPPER_2(me,
7635 me->_globalCommandTypeHandlers[1 + (commandType / 8192)].invoke(message));
7637 AMPS_CALL_EXCEPTION_WRAPPER_2(me, me->_routes.deliverData(message,
7638 message.getQueryID()));
7640 else if (commandType & PublishMask)
7643 AMPS_CALL_EXCEPTION_WRAPPER_2(me,
7644 me->_globalCommandTypeHandlers[(commandType == Message::Command::Publish ?
7645 GlobalCommandTypeHandlers::Publish :
7646 GlobalCommandTypeHandlers::OOF)].invoke(message));
7648 const char* subIds = NULL;
7649 size_t subIdsLen = 0;
7652 &subIds, &subIdsLen);
7653 size_t subIdCount = me->_routes.parseRoutes(
AMPS::Field(subIds, subIdsLen), me->_routeCache);
7654 for (
size_t i = 0; i < subIdCount; ++i)
7656 MessageRouter::RouteCache::value_type& lookupResult = me->_routeCache[i];
7657 MessageHandler& handler = lookupResult.handler;
7658 if (handler.isValid())
7661 AMPS_SubscriptionId,
7662 subIds + lookupResult.idOffset,
7663 lookupResult.idLength);
7664 Message::Field bookmark = message.getBookmark();
7665 bool isMessageQueue = message.getLeasePeriod().len() != 0;
7666 bool isAutoAck = me->_isAutoAckEnabled;
7668 if (!isMessageQueue && !bookmark.empty() &&
7669 me->_bookmarkStore.isValid())
7671 if (me->_bookmarkStore.isDiscarded(me->_readMessage))
7674 if (me->_globalCommandTypeHandlers[GlobalCommandTypeHandlers::DuplicateMessage].isValid())
7676 AMPS_CALL_EXCEPTION_WRAPPER_2(me, me->_globalCommandTypeHandlers[GlobalCommandTypeHandlers::DuplicateMessage].invoke(message));
7681 me->_bookmarkStore.log(me->_readMessage);
7682 AMPS_CALL_EXCEPTION_WRAPPER_2(me,
7683 handler.invoke(message));
7688 if (isMessageQueue && isAutoAck)
7692 AMPS_CALL_EXCEPTION_WRAPPER_STREAM_FULL_2(me, handler.invoke(message));
7693 if (!message.getIgnoreAutoAck())
7695 AMPS_CALL_EXCEPTION_WRAPPER_2(me,
7696 me->_ack(message.getTopic(), message.getBookmark()));
7699 catch (std::exception& ex)
7701 if (!message.getIgnoreAutoAck())
7703 AMPS_CALL_EXCEPTION_WRAPPER_2(me,
7704 me->_ack(message.getTopic(), message.getBookmark(),
"cancel"));
7706 AMPS_UNHANDLED_EXCEPTION_2(me, ex);
7711 AMPS_CALL_EXCEPTION_WRAPPER_2(me,
7712 handler.invoke(message));
7718 me->lastChance(message);
7722 else if (commandType == Message::Command::Ack)
7724 AMPS_CALL_EXCEPTION_WRAPPER_2(me,
7725 me->_globalCommandTypeHandlers[GlobalCommandTypeHandlers::Ack].invoke(message));
7726 unsigned ackType = message.getAckTypeEnum();
7727 unsigned deliveries = 0U;
7730 case Message::AckType::Persisted:
7731 deliveries += me->persistedAck(message);
7733 case Message::AckType::Processed:
7734 deliveries += me->processedAck(message);
7737 AMPS_CALL_EXCEPTION_WRAPPER_2(me, deliveries += me->_routes.deliverAck(message, ackType));
7738 if (deliveries == 0)
7740 me->lastChance(message);
7743 else if (commandType == Message::Command::Heartbeat)
7745 AMPS_CALL_EXCEPTION_WRAPPER_2(me,
7746 me->_globalCommandTypeHandlers[GlobalCommandTypeHandlers::Heartbeat].invoke(message));
7747 if (me->_heartbeatTimer.getTimeout() != 0.0)
7749 me->checkAndSendHeartbeat(
true);
7753 me->lastChance(message);
7757 else if (!message.getCommandId().empty())
7759 unsigned deliveries = 0U;
7762 while (me->_connected)
7766 deliveries = me->_routes.deliverData(message, message.getCommandId());
7770 catch (MessageStreamFullException&)
7772 catch (MessageStreamFullException& ex_)
7777 me->checkAndSendHeartbeat(
false);
7780 catch (std::exception&)
7782 catch (std::exception& ex_)
7790 catch (std::exception& ex_)
7794 me->_exceptionListener->exceptionThrown(ex_);
7801 if (deliveries == 0)
7803 me->lastChance(message);
7806 me->checkAndSendHeartbeat();
7811 ClientImpl::ClientImplPreDisconnectHandler(
amps_handle ,
unsigned failedConnectionVersion,
void* userData)
7813 ClientImpl* me = (ClientImpl*) userData;
7816 me->clearAcks(failedConnectionVersion);
7820 ClientImpl::ClientImplDisconnectHandler(
amps_handle ,
void* userData)
7822 ClientImpl* me = (ClientImpl*) userData;
7823 Lock<Mutex> l(me->_lock);
7824 Client wrapper(me,
false);
7827 me->broadcastConnectionStateChanged(ConnectionStateListener::Disconnected);
7831 AtomicFlagFlip subFlip(&me->_badTimeToHASubscribe);
7834 me->_connected =
false;
7838 Unlock<Mutex> unlock(me->_lock);
7839 me->_disconnectHandler.invoke(wrapper);
7842 catch (
const std::exception& ex)
7844 AMPS_UNHANDLED_EXCEPTION_2(me, ex);
7846 me->_lock.signalAll();
7848 if (!me->_connected)
7850 me->broadcastConnectionStateChanged(ConnectionStateListener::Shutdown);
7851 AMPS_UNHANDLED_EXCEPTION_2(me, DisconnectedException(
"Reconnect failed."));
7857 if (me->_subscriptionManager)
7862 Unlock<Mutex> unlock(me->_lock);
7863 me->_subscriptionManager->resubscribe(wrapper);
7865 me->broadcastConnectionStateChanged(ConnectionStateListener::Resubscribed);
7869 catch (
const AMPSException& subEx)
7871 AMPS_UNHANDLED_EXCEPTION_2(me, subEx);
7873 catch (
const std::exception& subEx)
7875 AMPS_UNHANDLED_EXCEPTION_2(me, subEx);
7898 iterator(
const char* data_,
size_t len_,
size_t pos_,
char fieldSep_)
7899 : _data(data_), _len(len_), _pos(pos_), _fieldSep(fieldSep_)
7901 while (_pos != _len && _data[_pos] == _fieldSep)
7907 typedef void* difference_type;
7908 typedef std::forward_iterator_tag iterator_category;
7909 typedef std::pair<Message::Field, Message::Field> value_type;
7910 typedef value_type* pointer;
7911 typedef value_type& reference;
7912 bool operator==(
const iterator& rhs)
const
7914 return _pos == rhs._pos;
7916 bool operator!=(
const iterator& rhs)
const
7918 return _pos != rhs._pos;
7920 iterator& operator++()
7923 while (_pos != _len && _data[_pos] != _fieldSep)
7928 while (_pos != _len && _data[_pos] == _fieldSep)
7935 value_type operator*()
const
7938 size_t i = _pos, keyLength = 0, valueStart = 0, valueLength = 0;
7939 for (; i < _len && _data[i] !=
'='; ++i)
7944 result.first.assign(_data + _pos, keyLength);
7946 if (i < _len && _data[i] ==
'=')
7950 for (; i < _len && _data[i] != _fieldSep; ++i)
7955 result.second.assign(_data + valueStart, valueLength);
7961 class reverse_iterator
7968 typedef std::pair<Message::Field, Message::Field> value_type;
7969 reverse_iterator(
const char* data,
size_t len,
const char* pos,
char fieldsep)
7970 : _data(data), _len(len), _pos(pos), _fieldSep(fieldsep)
7975 while (_pos >= _data && *_pos == _fieldSep)
7979 while (_pos > _data && *_pos != _fieldSep)
7986 if (_pos > _data || (_pos == _data && *_pos == _fieldSep))
7996 bool operator==(
const reverse_iterator& rhs)
const
7998 return _pos == rhs._pos;
8000 bool operator!=(
const reverse_iterator& rhs)
const
8002 return _pos != rhs._pos;
8004 reverse_iterator& operator++()
8015 while (_pos >= _data && *_pos == _fieldSep)
8020 while (_pos > _data && *_pos != _fieldSep)
8024 if (_pos > _data || (_pos == _data && *_pos == _fieldSep))
8035 value_type operator*()
const
8038 size_t keyLength = 0, valueStart = 0, valueLength = 0;
8039 size_t i = (size_t)(_pos - _data);
8040 for (; i < _len && _data[i] !=
'='; ++i)
8044 result.first.assign(_pos, keyLength);
8045 if (i < _len && _data[i] ==
'=')
8049 for (; i < _len && _data[i] != _fieldSep; ++i)
8054 result.second.assign(_data + valueStart, valueLength);
8058 FIX(
const Message::Field& data,
char fieldSeparator = 1)
8059 : _data(data.data()), _len(data.len()),
8060 _fieldSep(fieldSeparator)
8064 FIX(
const char* data,
size_t len,
char fieldSeparator = 1)
8065 : _data(data), _len(len), _fieldSep(fieldSeparator)
8069 iterator begin()
const
8071 return iterator(_data, _len, 0, _fieldSep);
8073 iterator end()
const
8075 return iterator(_data, _len, _len, _fieldSep);
8079 reverse_iterator rbegin()
const
8081 return reverse_iterator(_data, _len, _data + (_len - 1), _fieldSep);
8084 reverse_iterator rend()
const
8086 return reverse_iterator(_data, _len, 0, _fieldSep);
8107 std::stringstream _data;
8124 void append(
const T& tag,
const char* value,
size_t offset,
size_t length)
8126 _data << tag <<
'=';
8127 _data.write(value + offset, (std::streamsize)length);
8135 void append(
const T& tag,
const std::string& value)
8137 _data << tag <<
'=' << value << _fs;
8146 operator std::string()
const
8154 _data.str(std::string());
8191 typedef std::map<Message::Field, Message::Field>
map_type;
8202 for (FIX::iterator a = fix.begin(); a != fix.end(); ++a)
8211 #define AMPS_MESSAGE_STREAM_CACHE_MAX 128
8215 std::deque<Message> _q;
8216 std::deque<Message> _cache;
8217 std::string _commandId;
8219 std::string _queryId;
8223 unsigned _requestedAcks;
8225 Message::Field _previousTopic;
8226 Message::Field _previousBookmark;
8227 typedef enum :
unsigned int { Unset = 0x0, Running = 0x10, Subscribe = 0x11, SOWOnly = 0x12, AcksOnly = 0x13, Conflate = 0x14, Closed = 0x1, Disconnected = 0x2 } State;
8228 #if __cplusplus >= 201100L || _MSC_VER >= 1900
8229 std::atomic<State> _state;
8231 volatile State _state;
8233 typedef std::map<std::string, Message*> SOWKeyMap;
8234 SOWKeyMap _sowKeyMap;
8236 MessageStreamImpl(
const Client& client_)
8239 _maxDepth((unsigned)~0),
8241 _cacheMax(AMPS_MESSAGE_STREAM_CACHE_MAX),
8244 if (_client.isValid())
8246 _client.addConnectionStateListener(
this);
8250 MessageStreamImpl(ClientImpl* client_)
8253 _maxDepth((unsigned)~0),
8257 if (_client.isValid())
8259 _client.addConnectionStateListener(
this);
8263 ~MessageStreamImpl()
8267 virtual void destroy()
8273 catch (std::exception& e)
8277 if (_client.isValid())
8279 _client.getExceptionListener().exceptionThrown(e);
8284 if (_client.isValid())
8286 _client.removeConnectionStateListener(
this);
8288 _client = Client((ClientImpl*)NULL);
8289 c.deferredExecution(MessageStreamImpl::destroyer,
this);
8297 static void destroyer(
void* vpMessageStreamImpl_)
8299 delete ((MessageStreamImpl*)vpMessageStreamImpl_);
8302 void setSubscription(
const std::string& subId_,
8303 const std::string& commandId_ =
"",
8304 const std::string& queryId_ =
"")
8306 Lock<Mutex> lock(_lock);
8308 if (!commandId_.empty() && commandId_ != subId_)
8310 _commandId = commandId_;
8312 if (!queryId_.empty() && queryId_ != subId_ && queryId_ != commandId_)
8314 _queryId = queryId_;
8317 if (Disconnected == _state)
8321 assert(Unset == _state);
8325 void setSOWOnly(
const std::string& commandId_,
8326 const std::string& queryId_ =
"")
8328 Lock<Mutex> lock(_lock);
8329 _commandId = commandId_;
8330 if (!queryId_.empty() && queryId_ != commandId_)
8332 _queryId = queryId_;
8335 if (Disconnected == _state)
8339 assert(Unset == _state);
8343 void setStatsOnly(
const std::string& commandId_,
8344 const std::string& queryId_ =
"")
8346 Lock<Mutex> lock(_lock);
8347 _commandId = commandId_;
8348 if (!queryId_.empty() && queryId_ != commandId_)
8350 _queryId = queryId_;
8353 if (Disconnected == _state)
8357 assert(Unset == _state);
8359 _requestedAcks = Message::AckType::Stats;
8362 void setAcksOnly(
const std::string& commandId_,
unsigned acks_)
8364 Lock<Mutex> lock(_lock);
8365 _commandId = commandId_;
8367 if (Disconnected == _state)
8371 assert(Unset == _state);
8373 _requestedAcks = acks_;
8378 Lock<Mutex> lock(_lock);
8379 if (state_ == AMPS::ConnectionStateListener::Disconnected)
8381 _state = Disconnected;
8387 void timeout(
unsigned timeout_)
8389 _timeout = timeout_;
8393 if (_state == Subscribe)
8398 void maxDepth(
unsigned maxDepth_)
8402 _maxDepth = maxDepth_;
8406 _maxDepth = (unsigned)~0;
8409 unsigned getMaxDepth(
void)
const
8413 unsigned getDepth(
void)
const
8415 return (
unsigned)(_q.size());
8418 bool next(Message& current_)
8420 Lock<Mutex> lock(_lock);
8421 if (!_previousTopic.empty() && !_previousBookmark.empty())
8425 if (_client.isValid())
8427 _client.ackDeferredAutoAck(_previousTopic, _previousBookmark);
8431 catch (AMPSException&)
8433 catch (AMPSException& e)
8436 current_.invalidate();
8437 _previousTopic.clear();
8438 _previousBookmark.clear();
8441 _previousTopic.clear();
8442 _previousBookmark.clear();
8445 long minWaitTime = (_timeout && _timeout < 1000) ? _timeout : 1000;
8446 Timer timer((
double)_timeout);
8448 while (_q.empty() && _state & Running)
8451 _lock.wait(minWaitTime);
8453 Unlock<Mutex> unlck(_lock);
8454 amps_invoke_waiting_function();
8459 if (timer.checkAndGetRemaining(&minWaitTime))
8465 minWaitTime = (minWaitTime < 1000) ? minWaitTime : 1000;
8468 if (current_.isValid() && _cache.size() < _cacheMax)
8471 _cache.push_back(current_);
8475 current_ = _q.front();
8476 if (_q.size() == _maxDepth)
8481 if (_state == Conflate)
8483 std::string sowKey = current_.getSowKey();
8484 if (sowKey.length())
8486 _sowKeyMap.erase(sowKey);
8489 else if (_state == AcksOnly)
8491 _requestedAcks &= ~(current_.getAckTypeEnum());
8493 if ((_state == AcksOnly && _requestedAcks == 0) ||
8494 (_state == SOWOnly && current_.getCommand() ==
"group_end"))
8498 else if (current_.isValid()
8499 && current_.getCommandEnum() == Message::Command::Publish
8500 && _client.isValid() && _client.getAutoAck()
8501 && !current_.getLeasePeriod().empty()
8502 && !current_.getBookmark().empty())
8504 _previousTopic = current_.getTopic().deepCopy();
8505 _previousBookmark = current_.getBookmark().deepCopy();
8509 if (_state == Disconnected)
8511 throw DisconnectedException(
"Connection closed.");
8513 current_.invalidate();
8514 if (_state == Closed)
8518 return _timeout != 0;
8522 if (_client.isValid())
8524 if (_state == SOWOnly || _state == Subscribe)
8526 if (!_commandId.empty())
8528 _client.unsubscribe(_commandId);
8530 if (!_subId.empty())
8532 _client.unsubscribe(_subId);
8534 if (!_queryId.empty())
8536 _client.unsubscribe(_queryId);
8541 if (!_commandId.empty())
8543 _client.removeMessageHandler(_commandId);
8545 if (!_subId.empty())
8547 _client.removeMessageHandler(_subId);
8549 if (!_queryId.empty())
8551 _client.removeMessageHandler(_queryId);
8555 if (_state == SOWOnly || _state == Subscribe || _state == Unset)
8560 static void _messageHandler(
const Message& message_, MessageStreamImpl* this_)
8562 Lock<Mutex> lock(this_->_lock);
8563 if (this_->_state != Conflate)
8565 AMPS_TESTING_SLOW_MESSAGE_STREAM
8566 if (this_->_q.size() >= this_->_maxDepth)
8571 this_->_lock.signalAll();
8572 throw MessageStreamFullException(
"Stream is currently full.");
8574 if (!this_->_cache.empty())
8576 this_->_cache.front().deepCopy(message_);
8577 this_->_q.push_back(this_->_cache.front());
8578 this_->_cache.pop_front();
8582 #ifdef AMPS_USE_EMPLACE
8583 this_->_q.emplace_back(message_.deepCopy());
8585 this_->_q.push_back(message_.deepCopy());
8588 if (message_.getCommandEnum() == Message::Command::Publish &&
8589 this_->_client.isValid() && this_->_client.getAutoAck() &&
8590 !message_.getLeasePeriod().empty() &&
8591 !message_.getBookmark().empty())
8593 message_.setIgnoreAutoAck();
8598 std::string sowKey = message_.getSowKey();
8599 if (sowKey.length())
8601 SOWKeyMap::iterator it = this_->_sowKeyMap.find(sowKey);
8602 if (it != this_->_sowKeyMap.end())
8604 it->second->deepCopy(message_);
8608 if (this_->_q.size() >= this_->_maxDepth)
8614 this_->_lock.signalAll();
8615 throw MessageStreamFullException(
"Stream is currently full.");
8617 if (!this_->_cache.empty())
8619 this_->_cache.front().deepCopy(message_);
8620 this_->_q.push_back(this_->_cache.front());
8621 this_->_cache.pop_front();
8625 #ifdef AMPS_USE_EMPLACE
8626 this_->_q.emplace_back(message_.deepCopy());
8628 this_->_q.push_back(message_.deepCopy());
8631 this_->_sowKeyMap[sowKey] = &(this_->_q.back());
8636 if (this_->_q.size() >= this_->_maxDepth)
8641 this_->_lock.signalAll();
8642 throw MessageStreamFullException(
"Stream is currently full.");
8644 if (!this_->_cache.empty())
8646 this_->_cache.front().deepCopy(message_);
8647 this_->_q.push_back(this_->_cache.front());
8648 this_->_cache.pop_front();
8652 #ifdef AMPS_USE_EMPLACE
8653 this_->_q.emplace_back(message_.deepCopy());
8655 this_->_q.push_back(message_.deepCopy());
8658 if (message_.getCommandEnum() == Message::Command::Publish &&
8659 this_->_client.isValid() && this_->_client.getAutoAck() &&
8660 !message_.getLeasePeriod().empty() &&
8661 !message_.getBookmark().empty())
8663 message_.setIgnoreAutoAck();
8667 this_->_lock.signalAll();
8670 inline MessageStream::MessageStream(
void)
8673 inline MessageStream::MessageStream(
const Client& client_)
8674 : _body(new MessageStreamImpl(client_))
8677 inline void MessageStream::iterator::advance(
void)
8679 _pStream = _pStream->_body->next(_current) ? _pStream : NULL;
8681 inline MessageStream::operator MessageHandler(
void)
8683 return MessageHandler((
void(*)(
const Message&,
void*))MessageStreamImpl::_messageHandler, &_body.get());
8685 inline MessageStream MessageStream::fromExistingHandler(
const MessageHandler& handler_)
8687 MessageStream result;
8688 if (handler_._func == (MessageHandler::FunctionType)MessageStreamImpl::_messageHandler)
8690 result._body = (MessageStreamImpl*)(handler_._userData);
8695 inline void MessageStream::setSOWOnly(
const std::string& commandId_,
8696 const std::string& queryId_)
8698 _body->setSOWOnly(commandId_, queryId_);
8700 inline void MessageStream::setSubscription(
const std::string& subId_,
8701 const std::string& commandId_,
8702 const std::string& queryId_)
8704 _body->setSubscription(subId_, commandId_, queryId_);
8706 inline void MessageStream::setStatsOnly(
const std::string& commandId_,
8707 const std::string& queryId_)
8709 _body->setStatsOnly(commandId_, queryId_);
8711 inline void MessageStream::setAcksOnly(
const std::string& commandId_,
8714 _body->setAcksOnly(commandId_, acks_);
8733 return _body->getMaxDepth();
8737 return _body->getDepth();
8740 inline MessageStream ClientImpl::getEmptyMessageStream(
void)
8742 return *(_pEmptyMessageStream.get());
8750 ClientImpl& body = _body.get();
8751 Message& message = command_.getMessage();
8755 if (useExistingHandler)
8761 if (body._routes.getRoute(subId, existingHandler))
8764 body.executeAsync(command_, existingHandler,
false);
8765 return MessageStream::fromExistingHandler(existingHandler);
8774 if ((command & Message::Command::NoDataCommands)
8775 && (ackTypes == Message::AckType::Persisted
8776 || ackTypes == Message::AckType::None))
8779 if (!body._pEmptyMessageStream)
8781 body._pEmptyMessageStream.reset(
new MessageStream((ClientImpl*)0));
8782 body._pEmptyMessageStream.get()->_body->close();
8784 return body.getEmptyMessageStream();
8787 if (body.getDefaultMaxDepth())
8789 stream.
maxDepth(body.getDefaultMaxDepth());
8792 std::string commandID = body.executeAsync(command_, handler,
false);
8793 if (command_.hasStatsAck())
8795 stream.setStatsOnly(commandID, command_.getMessage().
getQueryId());
8797 else if (command_.isSow())
8801 stream.setAcksOnly(commandID,
8806 stream.setSOWOnly(commandID, command_.getMessage().
getQueryId());
8809 else if (command_.isSubscribe())
8811 stream.setSubscription(commandID,
8818 if (command == Message::Command::Publish ||
8819 command == Message::Command::DeltaPublish ||
8820 command == Message::Command::SOWDelete)
8822 stream.setAcksOnly(commandID,
8823 ackTypes & (
unsigned)~Message::AckType::Persisted);
8827 stream.setAcksOnly(commandID, ackTypes);
8834 inline void Message::ack(
const char* options_)
const
8836 ClientImpl* pClient = _body.get().clientImpl();
8838 if (pClient && bookmark.
len() &&
8839 !pClient->getAutoAck())
8842 pClient->ack(getTopic(), bookmark, options_);
Core type and function declarations for the AMPS C client.
AMPSDLL void amps_client_disconnect(amps_handle handle)
Disconnects from the AMPS server, if connected.
AMPSDLL void amps_message_get_field_value(amps_handle message, FieldId field, const amps_char **value_ptr, size_t *length_ptr)
Retrieves the value of a header field in an AMPS message.
AMPSDLL void amps_client_destroy(amps_handle handle)
Disconnects and destroys an AMPS client object.
AMPSDLL amps_result amps_client_connect(amps_handle handle, const amps_char *uri)
Connects to the AMPS server specified in uri.
AMPSDLL amps_result amps_client_send(amps_handle client, amps_handle message)
Sends a message to the AMPS server.
AMPSDLL amps_uint64_t amps_message_get_field_uint64(amps_handle message, FieldId field)
Gets the unsigned 64-bit int value of a header field in an AMPS message.
AMPSDLL amps_result amps_client_set_name(amps_handle handle, const amps_char *clientName)
Sets the name on an amps client object.
amps_result
Return values from amps_xxx functions.
Definition: amps.h:217
@ AMPS_E_RETRY
The operation has not succeeded, but ought to be retried.
Definition: amps.h:245
@ AMPS_E_OK
Success.
Definition: amps.h:221
@ AMPS_E_DISCONNECTED
The client and server are disconnected.
Definition: amps.h:249
AMPSDLL amps_result amps_client_attempt_reconnect(amps_handle client, unsigned version)
Manually invokes the user-supplied disconnect handler for this client.
AMPSDLL amps_result amps_client_set_read_timeout(amps_handle client, int readTimeout)
Sets a read timeout (seconds), in which if no message is received, the connection is presumed dead.
amps_result(* amps_thread_created_callback)(AMPS_THREAD_T, void *)
Prototype for a user-supplied callback function to allow thread attributes to be set when a new threa...
Definition: amps.h:668
AMPSDLL amps_result amps_client_set_transport_filter_function(amps_handle client, amps_transport_filter_function filter, void *userData)
Sets a user-supplied callback function for filtering data before it is sent and after it is received.
AMPSDLL amps_result amps_client_set_thread_created_callback(amps_handle client, amps_thread_created_callback callback, void *userData)
Sets a user-supplied callback function to allow thread attributes to set when a new thread is created...
AMPSDLL void amps_message_set_field_value(amps_handle message, FieldId field, const amps_char *value, size_t length)
Sets the value of a header field in an AMPS message.
AMPSDLL void amps_client_set_predisconnect_handler(amps_handle client, amps_predisconnect_handler predisconnectHandler, void *userData)
Sets the predisconnect handler function to be called when a disconnect occurs.
AMPSDLL amps_result amps_client_send_with_version(amps_handle client, amps_handle message, unsigned *version_out)
Sends a message to the AMPS server.
AMPSDLL void amps_client_set_message_handler(amps_handle client, amps_handler messageHandler, void *userData)
Sets the message handler function for this client.
AMPSDLL AMPS_SOCKET amps_client_get_socket(amps_handle client)
Returns the socket from the underlying transport in client, or NULL if no transport is associated wit...
void * amps_handle
Opaque handle type used to refer to objects in the AMPS api.
Definition: amps.h:211
void(* amps_transport_filter_function)(const unsigned char *, size_t, short, void *)
Prototype for a user-supplied callback function for filtering data before it is sent and after it is ...
Definition: amps.h:642
AMPSDLL amps_result amps_client_set_idle_time(amps_handle client, int idleTime)
Sets an idle-time (milliseconds).
AMPSDLL amps_handle amps_client_create(const amps_char *clientName)
Functions for creation of an AMPS client.
AMPSDLL void amps_client_set_disconnect_handler(amps_handle client, amps_handler disconnectHandler, void *userData)
Sets the disconnect handler function to be called when a disconnect occurs.
bool DangerousFlushPublishStoreResizeHandler(Store store_, size_t, void *data_)
PublishStoreResizeHandler that will block up to the timeout specified in user data milliseconds tryin...
Definition: ampsplusplus.hpp:1400
bool(* PublishStoreResizeHandler)(Store store_, size_t size_, void *userData_)
Function type for PublishStore resize events The store_ param is store which is resizing.
Definition: ampsplusplus.hpp:1080
The interface for handling authentication with the AMPS server.
Definition: ampsplusplus.hpp:996
virtual std::string authenticate(const std::string &userName_, const std::string &password_)=0
Called by Client just before the logon command is sent.
virtual void completed(const std::string &userName_, const std::string &password_, const std::string &reason_)=0
Called by Client once a logon completes successfully.
virtual std::string retry(const std::string &userName_, const std::string &password_)=0
Called by Client when a logon ack is received with a status of retry.
Interface for BookmarkStoreImpl classes.
Definition: BookmarkStore.hpp:229
Client represents a connection to an AMPS server, but does not provide failover or reconnection behav...
Definition: ampsplusplus.hpp:5064
MessageStream sow(const char *topic_, const std::string &filter_="", const std::string &orderBy_="", const std::string &bookmark_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic.
Definition: ampsplusplus.hpp:6158
amps_uint64_t publish(const char *topic_, size_t topicLength_, const char *data_, size_t dataLength_)
Publish a message to an AMPS topic, returning the sequence number assigned by the publish store if on...
Definition: ampsplusplus.hpp:5534
void setExceptionListener(const std::shared_ptr< const ExceptionListener > &pListener_)
Sets the exception listener for exceptions that are not thrown back to the user (for example,...
Definition: ampsplusplus.hpp:6878
std::string logon(const char *options_, int timeout_=0)
Logon to the server, providing the client name, credentials (if available) client information (such a...
Definition: ampsplusplus.hpp:5766
void flushAcks(void)
Sends any queued message queue ack messages to the server immediately.
Definition: ampsplusplus.hpp:7284
void startTimer()
Definition: ampsplusplus.hpp:6721
MessageStream subscribe(const std::string &topic_, long timeout_=0, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Subscribe to a topic.
Definition: ampsplusplus.hpp:5836
void setBookmarkStore(const BookmarkStore &bookmarkStore_)
Set the bookmark store to be used by the client.
Definition: ampsplusplus.hpp:5381
const std::string & getURI() const
Returns the last URI this client is connected to.
Definition: ampsplusplus.hpp:5219
void removeConnectionStateListener(ConnectionStateListener *listener)
Attempts to remove listener from self's set of ConnectionStateListeners.
Definition: ampsplusplus.hpp:7103
amps_uint64_t publish(const std::string &topic_, const std::string &data_, unsigned long expiration_)
Publish a message to an AMPS topic, returning the sequence number assigned by the publish store (if a...
Definition: ampsplusplus.hpp:5558
static const char * BOOKMARK_EPOCH()
Convenience method for returning the special value to start a subscription at the beginning of the tr...
Definition: ampsplusplus.hpp:7041
std::string deltaSubscribe(const MessageHandler &messageHandler_, const std::string &topic_, long timeout_, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Delta Subscribe to a topic.
Definition: ampsplusplus.hpp:5897
SubscriptionManager * getSubscriptionManager()
Get the subscription manager being used by the client.
Definition: ampsplusplus.hpp:5397
std::string bookmarkSubscribe(const MessageHandler &messageHandler_, const std::string &topic_, long timeout_, const std::string &bookmark_, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Subscribe to a topic using a bookmark.
Definition: ampsplusplus.hpp:5975
void unsubscribe()
Unsubscribe from all topics.
Definition: ampsplusplus.hpp:6064
std::string send(const MessageHandler &messageHandler, Message &message, int timeout=0)
Sends a Message to the connected AMPS server, performing only minimal validation and bypassing client...
Definition: ampsplusplus.hpp:5337
void setHeartbeat(unsigned heartbeatTime_)
Requests heartbeating with the AMPS server.
Definition: ampsplusplus.hpp:6948
void setLastChanceMessageHandler(const AMPS::MessageHandler &messageHandler)
Sets the message handler called when no other handler matches.
Definition: ampsplusplus.hpp:6961
static const char * BOOKMARK_MOST_RECENT()
Convenience method for returning the special value to start a subscription at a recovery point based ...
Definition: ampsplusplus.hpp:7062
amps_handle getHandle()
Returns the underlying amps_handle for this client, to be used with amps_client_* functions from the ...
Definition: ampsplusplus.hpp:6865
void setFailedWriteHandler(FailedWriteHandler *handler_)
Set the handler that is invoked to report when a publish fails, for example if the publisher is not e...
Definition: ampsplusplus.hpp:5478
void setName(const std::string &name)
Sets the name of this client, assuming no name was provided previously.
Definition: ampsplusplus.hpp:5118
amps_uint64_t deltaPublish(const std::string &topic_, const std::string &data_)
Publish the changed fields of a message to an AMPS topic.
Definition: ampsplusplus.hpp:5652
FailedWriteHandler * getFailedWriteHandler()
Get the handler that is invoked to report on failed writes.
Definition: ampsplusplus.hpp:5486
void setDefaultMaxDepth(unsigned maxDepth_)
Sets a default max depth on all subsequently created MessageStream objects.
Definition: ampsplusplus.hpp:7380
amps_uint64_t publish(const std::string &topic_, const std::string &data_)
Publish a message to an AMPS topic, returning the sequence number assigned by the publish store if on...
Definition: ampsplusplus.hpp:5509
MessageStream sowAndDeltaSubscribe(const char *topic_, const std::string &filter_="", const std::string &orderBy_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic and initiates a new delta subscription on it.
Definition: ampsplusplus.hpp:6512
void setAckBatchSize(const unsigned ackBatchSize_)
Sets the queue ack batch size setting.
Definition: ampsplusplus.hpp:7321
void disconnect()
Disconnect from an AMPS server.
Definition: ampsplusplus.hpp:5250
static const char * NOW()
Convenience method for returning the special value to start a subscription at the end of the transact...
Definition: ampsplusplus.hpp:7031
amps_uint64_t deltaPublish(const char *topic_, size_t topicLength_, const char *data_, size_t dataLength_)
Publish the changed fields of a message to an AMPS topic.
Definition: ampsplusplus.hpp:5675
static size_t convertVersionToNumber(const char *data_, size_t len_)
Converts a string version, such as "3.8.1.5" into the same numeric form used internally and returned ...
Definition: ampsplusplus.hpp:5212
Client(const std::string &clientName="")
Constructs a new client with a given client name.
Definition: ampsplusplus.hpp:5080
const ExceptionListener & getExceptionListener(void) const
Returns the exception listener set on this Client.
Definition: ampsplusplus.hpp:6898
void unsubscribe(const std::string &commandId)
Unsubscribe from a topic.
Definition: ampsplusplus.hpp:6052
void setExceptionListener(const ExceptionListener &listener_)
Definition: ampsplusplus.hpp:6891
DisconnectHandler getDisconnectHandler(void) const
Returns the callback function that is invoked when a disconnect occurs.
Definition: ampsplusplus.hpp:5359
void setDisconnectHandler(const DisconnectHandler &disconnectHandler)
Sets the function to be called when the client is unintentionally disconnected.
Definition: ampsplusplus.hpp:5351
int getAckTimeout(void) const
Returns the current value of the message queue ack timeout setting – that is, the amount of time afte...
Definition: ampsplusplus.hpp:7332
MessageStream bookmarkSubscribe(const std::string &topic_, long timeout_, const std::string &bookmark_, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Subscribe to a topic using a bookmark.
Definition: ampsplusplus.hpp:6003
bool getRetryOnDisconnect(void) const
Returns true if automatic retry of a command to AMPS after a reconnect is enabled.
Definition: ampsplusplus.hpp:7371
BookmarkStore getBookmarkStore()
Get the bookmark store being used by the client.
Definition: ampsplusplus.hpp:5389
MessageStream subscribe(const char *topic_, long timeout_=0, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Subscribe to a topic.
Definition: ampsplusplus.hpp:5868
const std::string & getLogonCorrelationData() const
Returns the currently set logon correlation data for the client.
Definition: ampsplusplus.hpp:5159
std::string logon(int timeout_=0, Authenticator &authenticator_=DefaultAuthenticator::instance(), const char *options_=NULL)
Logon to the server, providing the client name, credentials (if available), and client information (s...
Definition: ampsplusplus.hpp:5747
const amps_uint64_t getNameHashValue() const
Returns the numeric name hash of this client as generated by the server and returned when the client ...
Definition: ampsplusplus.hpp:5141
void setGlobalCommandTypeMessageHandler(const Message::Command::Type command_, const MessageHandler &handler_)
Sets a handler for all messages of a particular type: currently supported types are heartbeat message...
Definition: ampsplusplus.hpp:7012
const std::string & getName() const
Returns the name of this client passed in the constructor.
Definition: ampsplusplus.hpp:5125
bool getAutoAck(void) const
Returns the value of the queue auto-ack setting.
Definition: ampsplusplus.hpp:7293
unsigned getDefaultMaxDepth(void) const
Returns the default max depth for returned MessageStream objects.
Definition: ampsplusplus.hpp:7389
void setSubscriptionManager(SubscriptionManager *subscriptionManager_)
Set the subscription manager to be used by the client.
Definition: ampsplusplus.hpp:5409
void ack(Field &topic_, Field &bookmark_, const char *options_=NULL)
Acknowledge a message queue message by supplying a topic and bookmark: this adds the ack to the curre...
Definition: ampsplusplus.hpp:7236
void clearConnectionStateListeners()
Clear all listeners from self's set of ConnectionStateListeners.
Definition: ampsplusplus.hpp:7110
static const char * BOOKMARK_NOW()
Convenience method for returning the special value to start a subscription at the end of the transact...
Definition: ampsplusplus.hpp:7022
MessageStream bookmarkSubscribe(const char *topic_, long timeout_, const std::string &bookmark_, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Subscribe to a topic using a bookmark.
Definition: ampsplusplus.hpp:6024
std::string executeAsyncNoResubscribe(Command &command_, MessageHandler handler_)
Execute the provided command and, once AMPS acknowledges the command, process messages in response to...
Definition: ampsplusplus.hpp:7174
size_t getServerVersion() const
Returns the server version retrieved during logon.
Definition: ampsplusplus.hpp:5172
static const char * EPOCH()
Convenience method for returning the special value to start a subscription at the beginning of the tr...
Definition: ampsplusplus.hpp:7051
void send(const Message &message)
Sends a Message to the connected AMPS server, performing only minimal validation and bypassing client...
Definition: ampsplusplus.hpp:5268
void publishFlush(long timeout_=0, unsigned ackType_=Message::AckType::Processed)
Ensure that AMPS messages are sent and have been processed by the AMPS server.
Definition: ampsplusplus.hpp:5631
amps_uint64_t deltaPublish(const char *topic_, size_t topicLength_, const char *data_, size_t dataLength_, unsigned long expiration_)
Publish the changed fields of a message to an AMPS topic.
Definition: ampsplusplus.hpp:5724
void ack(const std::string &topic_, const std::string &bookmark_, const char *options_=NULL)
Acknowledge a message queue message by supplying a topic and bookmark string: this adds the ack to th...
Definition: ampsplusplus.hpp:7260
MessageStream deltaSubscribe(const char *topic_, long timeout_, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Delta Subscribe to a topic.
Definition: ampsplusplus.hpp:5933
void setUnhandledMessageHandler(const AMPS::MessageHandler &messageHandler)
Definition: ampsplusplus.hpp:6954
void addMessageHandler(const Field &commandId_, const AMPS::MessageHandler &messageHandler_, unsigned requestedAcks_, bool isSubscribe_)
Adds a MessageHandler to be invoked for Messages with the given CommandId as their command id,...
Definition: ampsplusplus.hpp:5281
amps_uint64_t deltaPublish(const std::string &topic_, const std::string &data_, unsigned long expiration_)
Publish the changed fields of a message to an AMPS topic.
Definition: ampsplusplus.hpp:5698
void addMessageHandler(const Field &commandId_, const AMPS::MessageHandler &messageHandler_, unsigned requestedAcks_, Message::Command::Type commandType_)
Adds a MessageHandler to be invoked for Messages with the given CommandId as their command id,...
Definition: ampsplusplus.hpp:5298
std::string executeAsync(Command &command_, MessageHandler handler_)
Execute the provided command and, once AMPS acknowledges the command, process messages in response to...
Definition: ampsplusplus.hpp:7140
unsigned getAckBatchSize(void) const
Returns the value of the queue ack batch size setting.
Definition: ampsplusplus.hpp:7311
void setAutoAck(bool isAutoAckEnabled_)
Sets the queue auto-ack setting on this client.
Definition: ampsplusplus.hpp:7303
MessageStream execute(Command &command_)
Execute the provided command and return messages received in response in a MessageStream.
Definition: ampsplusplus.hpp:8745
void connect(const std::string &uri)
Connect to an AMPS server.
Definition: ampsplusplus.hpp:5243
const std::string & getNameHash() const
Returns the name hash string of this client as generated by the server and returned when the client l...
Definition: ampsplusplus.hpp:5133
static const char * MOST_RECENT()
Convenience method for returning the special value to start a subscription at a recovery point based ...
Definition: ampsplusplus.hpp:7073
bool removeMessageHandler(const Field &commandId_)
Removes a MessageHandler for a given ComandId from self.
Definition: ampsplusplus.hpp:5309
void setRetryOnDisconnect(bool isRetryOnDisconnect_)
Enables or disables automatic retry of a command to AMPS after a reconnect.
Definition: ampsplusplus.hpp:7362
void setGlobalCommandTypeMessageHandler(const std::string &command_, const MessageHandler &handler_)
Sets a handler for all messages of a particular type, or for messages that would be delivered to a pa...
Definition: ampsplusplus.hpp:6987
void setTransportFilterFunction(amps_transport_filter_function filter_, void *userData_)
Sets a filter function on the transport that is called with all raw data sent or received.
Definition: ampsplusplus.hpp:7400
VersionInfo getServerVersionInfo() const
Returns the server version retrieved during logon.
Definition: ampsplusplus.hpp:5183
void setPublishStore(const Store &publishStore_)
Set the publish store to be used by the client.
Definition: ampsplusplus.hpp:5433
void setHeartbeat(unsigned heartbeatTime_, unsigned readTimeout_)
Requests heartbeating with the AMPS server.
Definition: ampsplusplus.hpp:6924
MessageStream deltaSubscribe(const std::string &topic_, long timeout_, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Delta Subscribe to a topic.
Definition: ampsplusplus.hpp:5915
void setLogonCorrelationData(const std::string &logonCorrelationData_)
Sets the logon correlation data for the client.
Definition: ampsplusplus.hpp:5152
void addConnectionStateListener(ConnectionStateListener *listener)
Adds a ConnectionStateListener to self's set of listeners.
Definition: ampsplusplus.hpp:7095
void setThreadCreatedCallback(amps_thread_created_callback callback_, void *userData_)
Sets a callback function on the transport that is called when a new thread is created to receive data...
Definition: ampsplusplus.hpp:7414
std::string subscribe(const MessageHandler &messageHandler_, const std::string &topic_, long timeout_=0, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Subscribe to a topic.
Definition: ampsplusplus.hpp:5810
void setAckTimeout(const int ackTimeout_)
Sets the message queue ack timeout value.
Definition: ampsplusplus.hpp:7344
MessageHandler getDuplicateMessageHandler(void)
Returns the callback function that is invoked when a duplicate message is detected.
Definition: ampsplusplus.hpp:5464
virtual ConnectionInfo getConnectionInfo() const
Get the connection information for the current connection.
Definition: ampsplusplus.hpp:5368
void setDuplicateMessageHandler(const MessageHandler &duplicateMessageHandler_)
Sets a callback function that is invoked when a duplicate message is detected.
Definition: ampsplusplus.hpp:5449
MessageStream sowAndSubscribe(const char *topic_, const std::string &filter_="", const std::string &orderBy_="", const std::string &bookmark_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic and initiates a new subscription on it.
Definition: ampsplusplus.hpp:6411
static const char * BOOKMARK_RECENT()
Convenience method for returning the special value to start a subscription at a recovery point based ...
Definition: ampsplusplus.hpp:7084
std::string stopTimer(const MessageHandler &messageHandler)
Definition: ampsplusplus.hpp:6732
void ack(Message &message_, const char *options_=NULL)
Acknowledge a message queue message by supplying the message directly: this adds the ack to the curre...
Definition: ampsplusplus.hpp:7248
static size_t convertVersionToNumber(const std::string &version_)
Converts a string version, such as "3.8.1.5" into the same numeric form used internally and returned ...
Definition: ampsplusplus.hpp:5197
amps_uint64_t publish(const char *topic_, size_t topicLength_, const char *data_, size_t dataLength_, unsigned long expiration_)
Publish a message to an AMPS topic, returning the sequence number assigned by the publish store if on...
Definition: ampsplusplus.hpp:5585
Store getPublishStore()
Get the publish store used by the client.
Definition: ampsplusplus.hpp:5441
std::string logon(const std::string &options_, int timeout_=0)
Logon to the server, providing the client name, credentials (if available) client information (such a...
Definition: ampsplusplus.hpp:5785
Command is an encapsulation of a single AMPS command sent by the client.
Definition: ampsplusplus.hpp:469
Command & setData(const char *data_, size_t dataLen_)
Sets the data for this command.
Definition: ampsplusplus.hpp:833
Command & setTopN(unsigned topN_)
Definition: ampsplusplus.hpp:853
Command & setSequence(const char *seq_, size_t seqLen_)
Definition: ampsplusplus.hpp:806
Command & setTopic(const std::string &topic_)
Definition: ampsplusplus.hpp:675
Command & setAckType(unsigned ackType_)
Definition: ampsplusplus.hpp:920
Command & setOrderBy(const std::string &orderBy_)
Definition: ampsplusplus.hpp:701
Command & setOptions(const char *options_, size_t optionsLen_)
Sets the options string for this command: see Message.Options for a helper class for constructing the...
Definition: ampsplusplus.hpp:793
Command & setSowKey(const char *sowKey_, size_t sowKeyLen_)
Sets the SowKey field of the command, typically used for a publish command to a topic in the state of...
Definition: ampsplusplus.hpp:621
Command & setAckType(const std::string &ackType_)
Definition: ampsplusplus.hpp:898
Command & setTimeout(unsigned timeout_)
Sets the client-side timeout for this command.
Definition: ampsplusplus.hpp:847
Command & setBookmark(const char *bookmark_, size_t bookmarkLen_)
Set the bookmark to be used this command.
Definition: ampsplusplus.hpp:755
Command & setQueryId(const char *queryId_, size_t queryIdLen_)
Definition: ampsplusplus.hpp:734
std::string getAckType() const
Definition: ampsplusplus.hpp:942
Command & reset(Message::Command::Type command_)
Resets the fields of self, and sets the command to command_.
Definition: ampsplusplus.hpp:596
Command & setBatchSize(unsigned batchSize_)
Sets the batch size for this command, which controls how many records are sent together in the result...
Definition: ampsplusplus.hpp:862
Command & setSubId(const char *subId_, size_t subIdLen_)
Definition: ampsplusplus.hpp:721
Command & setSequence(const amps_uint64_t seq_)
Definition: ampsplusplus.hpp:812
Command & setOptions(const std::string &options_)
Sets the options string for this command: see Message.Options for a helper class for constructing the...
Definition: ampsplusplus.hpp:785
Command & setSowKeys(const char *sowKeys_, size_t sowKeysLen_)
Sets the SowKeys for the command.
Definition: ampsplusplus.hpp:656
Command & setSubId(const std::string &subId_)
Definition: ampsplusplus.hpp:714
Command & setCommandId(const std::string &cmdId_)
Definition: ampsplusplus.hpp:662
Command & setSequence(const std::string &seq_)
Definition: ampsplusplus.hpp:799
Command & setCommandId(const char *cmdId_, size_t cmdIdLen_)
Definition: ampsplusplus.hpp:669
Command(Message::Command::Type command_)
Creates an object to represent the given AMPS command, such as "sow" or "subscribe".
Definition: ampsplusplus.hpp:571
Command & setOrderBy(const char *orderBy_, size_t orderByLen_)
Definition: ampsplusplus.hpp:708
Command & setData(const std::string &data_)
Sets the data for this command from an existing string.
Definition: ampsplusplus.hpp:825
Command & setQueryId(const std::string &queryId_)
Definition: ampsplusplus.hpp:727
Command(const std::string &command_)
Creates an object to represent the given AMPS command, such as "sow" or "subscribe".
Definition: ampsplusplus.hpp:556
Command & setTopic(const char *topic_, size_t topicLen_)
Definition: ampsplusplus.hpp:682
Command & setFilter(const std::string &filter_)
Definition: ampsplusplus.hpp:688
Command & setCorrelationId(const std::string &correlationId_)
Set the correlation ID for this command.
Definition: ampsplusplus.hpp:766
Command & setFilter(const char *filter_, size_t filterLen_)
Definition: ampsplusplus.hpp:695
Command(const char *command_, size_t commandLen_)
Creates an object to represent the given AMPS command, such as "sow" or "subscribe".
Definition: ampsplusplus.hpp:564
Command & setCorrelationId(const char *correlationId_, size_t correlationIdLen_)
Set the correlation ID for this command.
Definition: ampsplusplus.hpp:778
unsigned getAckTypeEnum() const
Definition: ampsplusplus.hpp:947
Command & setSowKey(const std::string &sowKey_)
Sets the SowKey field of the command, typically used for a publish command to a topic in the state of...
Definition: ampsplusplus.hpp:608
Command & setSowKeys(const std::string &sowKeys_)
Sets the SowKeys for the command.
Definition: ampsplusplus.hpp:638
Command & addAckType(const std::string &ackType_)
Definition: ampsplusplus.hpp:884
Command & reset(const char *command_, size_t commandLen_)
Resets the fields of self, and sets the command to command_.
Definition: ampsplusplus.hpp:588
Command & setExpiration(unsigned expiration_)
Set the expiration time for a publish command.
Definition: ampsplusplus.hpp:878
Command & setBookmark(const std::string &bookmark_)
Set the bookmark to be used this command.
Definition: ampsplusplus.hpp:744
Command & reset(const std::string &command_)
Resets the fields of self, and sets the command to command_.
Definition: ampsplusplus.hpp:579
Abstract base class for connection state listeners.
Definition: ampsplusplus.hpp:1491
State
Constants for the state of the connection.
Definition: ampsplusplus.hpp:1494
virtual void connectionStateChanged(State newState_)=0
Pure virtual method for receiving the change in connection state.
A default implementation of Authenticator that only uses an unchanged password and does not implement...
Definition: ampsplusplus.hpp:1027
std::string authenticate(const std::string &, const std::string &password_)
A simple implementation that returns an unmodified password.
Definition: ampsplusplus.hpp:1032
static Authenticator & instance()
Static function to return a static instance used when no Authenticator is supplied to a Client.
Definition: ampsplusplus.hpp:1049
std::string retry(const std::string &, const std::string &)
Throws an AuthenticationException because retry is not implemented.
Definition: ampsplusplus.hpp:1039
void completed(const std::string &, const std::string &, const std::string &)
Called by Client once a logon completes successfully.
Definition: ampsplusplus.hpp:1044
Exception listener for unhandled exceptions.
Definition: ampsplusplus.hpp:204
Class for parsing a FIX format message into a std::map of keys and values, where the keys and values ...
Definition: ampsplusplus.hpp:8180
std::map< Message::Field, Message::Field > map_type
Convenience defintion for the std::map specialization used for this class.
Definition: ampsplusplus.hpp:8191
map_type toMap(const Message::Field &data)
Returns the key/value pairs within the message, represented as AMPS::Field objects that contain point...
Definition: ampsplusplus.hpp:8198
FIXShredder(char fieldSep_=(char) 1)
Construct an instance of FIXShredder using the specified value as the delimiter between fields.
Definition: ampsplusplus.hpp:8187
Abstract base class where you can implement handling of exceptions that occur when a SubscriptionMana...
Definition: ampsplusplus.hpp:1429
virtual bool failure(const Message &message_, const MessageHandler &handler_, unsigned requestedAckTypes_, const AMPSException &exception_)=0
Implement this function to return true if the subscription should be removed from the SubscriptionMan...
Class to handle when a client receives a duplicate publish message, or not entitled message.
Definition: ampsplusplus.hpp:1371
virtual void failedWrite(const Message &message_, const char *reason_, size_t reasonLength_)=0
Called when the server indicates a message could not be written.
Field represents the value of a single field in a Message.
Definition: Field.hpp:87
bool empty() const
Returns 'true' if empty, 'false' otherwise.
Definition: Field.hpp:128
size_t len() const
Returns the length of the data underlying this field.
Definition: Field.hpp:267
Represents an iterator over messages in an AMPS topic.
Definition: ampsplusplus.hpp:4940
An iterable object representing the results of an AMPS subscription and/or query.
Definition: ampsplusplus.hpp:4932
iterator begin(void)
Returns an iterator representing the beginning of the topic or subscription.
Definition: ampsplusplus.hpp:4984
bool isValid() const
Returns true if self is a valid stream that may be iterated.
Definition: ampsplusplus.hpp:4977
MessageStream conflate(void)
Sets self to conflation mode, where a new update for a matching sow key will replace the previous one...
Definition: ampsplusplus.hpp:8721
MessageStream maxDepth(unsigned maxDepth_)
Sets the maximum number of messages that can be held in the underlying queue.
Definition: ampsplusplus.hpp:8726
MessageStream timeout(unsigned timeout_)
Sets the maximum time to wait for the next message in milliseconds; if no message is available within...
Definition: ampsplusplus.hpp:8716
unsigned getDepth(void) const
Gets the current number of messages held in the underlying queue.
Definition: ampsplusplus.hpp:8735
unsigned getMaxDepth(void) const
Gets the maximum number of messages that can be held in the underlying queue.
Definition: ampsplusplus.hpp:8731
iterator end(void)
Returns an iterator representing the end of the topic or subscription.
Definition: ampsplusplus.hpp:4995
Message encapsulates a single message sent to or received from an AMPS server, and provides methods f...
Definition: Message.hpp:532
Message & assignExpiration(const std::string &v)
Sets the value of the Expiration header for this Message.
Definition: Message.hpp:1305
Field getBookmark() const
Retrieves the value of the Bookmark header of the Message as a new Field.
Definition: Message.hpp:1194
Message & setQueryID(const std::string &v)
Sets the value of the QueryID header for this Message.
Definition: Message.hpp:1417
Field getCommandId() const
Retrieves the value of the CommandId header of the Message as a new Field.
Definition: Message.hpp:1302
Message & setExpiration(const std::string &v)
Sets the value of the Expiration header for this Message.
Definition: Message.hpp:1305
static const unsigned int IdentifierLength
The length of identifiers used for unique identification of commands and subscriptions.
Definition: Message.hpp:542
Message & assignSubscriptionId(const std::string &v)
Sets the value of the SubscriptionId header for this Message.
Definition: Message.hpp:1427
Message & newCommandId()
Creates and sets a new sequential value for the CommandId header for this Message.
Definition: Message.hpp:1302
Field getSequence() const
Retrieves the value of the Sequence header of the Message as a new Field.
Definition: Message.hpp:1422
Message & setBookmark(const std::string &v)
Sets the value of the Bookmark header for this Message.
Definition: Message.hpp:1194
Field getSubscriptionId() const
Retrieves the value of the SubscriptionId header of the Message as a new Field.
Definition: Message.hpp:1427
Message & assignFilter(const std::string &v)
Sets the value of the Filter header for this Message.
Definition: Message.hpp:1306
Message & setSequence(const std::string &v)
Sets the value of the Sequence header for this Message.
Definition: Message.hpp:1422
Message & setOrderBy(const std::string &v)
Sets the value of the OrderBy header for this Message.
Definition: Message.hpp:1415
Message & setCommandId(const std::string &v)
Sets the value of the CommandId header for this Message.
Definition: Message.hpp:1302
Message & setAckTypeEnum(unsigned ackType_)
Encode self's "ack type" field from a bitmask of values from AckType.
Definition: Message.hpp:1183
unsigned getAckTypeEnum() const
Decode self's "ack type" field and return the corresponding bitmask of values from AckType.
Definition: Message.hpp:1160
Message & setQueryId(const std::string &v)
Sets the value of the QueryID header for this Message.
Definition: Message.hpp:1417
Message & setTopNRecordsReturned(const std::string &v)
Sets the value of the TopNRecordsReturned header for this Message.
Definition: Message.hpp:1451
Message & assignSowKeys(const std::string &v)
Sets the value of the SowKeys header for this Message.
Definition: Message.hpp:1425
Message & setAckType(const std::string &v)
Sets the value of the AckType header for this Message.
Definition: Message.hpp:1130
Message & setSowKey(const std::string &v)
Sets the value of the SowKey header for this Message.
Definition: Message.hpp:1424
Message & setOptions(const std::string &v)
Sets the value of the Options header for this Message.
Definition: Message.hpp:1344
Command::Type getCommandEnum() const
Decode self's "command" field and return one of the values from Command.
Definition: Message.hpp:1228
Message & assignQueryID(const std::string &v)
Sets the value of the QueryID header for this Message.
Definition: Message.hpp:1417
Message & assignTopic(const std::string &v)
Sets the value of the Topic header for this Message.
Definition: Message.hpp:1449
Message & setTopic(const std::string &v)
Sets the value of the Topic header for this Message.
Definition: Message.hpp:1449
Message & setCommandEnum(Command::Type command_)
Set self's "command" field from one of the values in Command.
Definition: Message.hpp:1288
Message & setSubscriptionId(const std::string &v)
Sets the value of the SubscriptionId header for this Message.
Definition: Message.hpp:1427
Message & setFilter(const std::string &v)
Sets the value of the Filter header for this Message.
Definition: Message.hpp:1306
Field getAckType() const
Retrieves the value of the AckType header of the Message as a new Field.
Definition: Message.hpp:1130
Field getQueryId() const
Retrieves the value of the QueryID header of the Message as a new Field.
Definition: Message.hpp:1417
Message & assignSequence(const std::string &v)
Sets the value of the Sequence header for this Message.
Definition: Message.hpp:1422
Field getTopic() const
Retrieves the value of the Topic header of the Message as a new Field.
Definition: Message.hpp:1449
Field getQueryID() const
Retrieves the value of the QueryID header of the Message as a new Field.
Definition: Message.hpp:1417
Message & setSowKeys(const std::string &v)
Sets the value of the SowKeys header for this Message.
Definition: Message.hpp:1425
Message & setBatchSize(const std::string &v)
Sets the value of the BatchSize header for this Message.
Definition: Message.hpp:1193
Message & setCommand(const std::string &v)
Sets the value of the Command header for this Message.
Definition: Message.hpp:1195
Field getOptions() const
Retrieves the value of the Options header of the Message as a new Field.
Definition: Message.hpp:1316
Message & setCorrelationId(const std::string &v)
Sets the value of the CorrelationId header for this Message.
Definition: Message.hpp:1304
Message & setData(const std::string &v_)
Sets the data portion of self.
Definition: Message.hpp:1476
Class to hold string versions of failure reasons.
Definition: ampsplusplus.hpp:151
Abstract base class for storing published messages for an HA publisher client.
Definition: ampsplusplus.hpp:1087
virtual amps_uint64_t getLowestUnpersisted() const =0
Get the oldest unpersisted message sequence in the store.
virtual size_t unpersistedCount() const =0
Method to return how many messages are in the store that have not been discarded, indicating that the...
static amps_uint64_t getUnsetSequence()
Method to return the value used to represent no such sequence.
Definition: ampsplusplus.hpp:1156
virtual amps_uint64_t getLastPersisted()=0
Get the last persisted sequence number.
static size_t getUnsetPosition()
Method to return the value used to represent not found or unset.
Definition: ampsplusplus.hpp:1149
virtual void discardUpTo(amps_uint64_t index_)=0
Called by Client to indicate that all messages up to and including.
virtual void flush(long timeout_)=0
Method to wait for the Store to discard everything that has been stored up to the point in time when ...
virtual amps_uint64_t store(const Message &message_)=0
Called by Client to store a message being published.
virtual void replay(StoreReplayer &replayer_)=0
Called by Client to get all stored and non-discarded messages replayed by the store onto the StoreRep...
virtual bool replaySingle(StoreReplayer &replayer_, amps_uint64_t index_)=0
Called by Client to get a single message replayed by the store onto the StoreReplayer.
StoreImpl(bool errorOnPublishGap_=false)
Default constructor.
Definition: ampsplusplus.hpp:1094
virtual void setResizeHandler(PublishStoreResizeHandler handler_, void *userData_)
Set a handler to be called if the Store needs to resize in order to keep storing messages.
Definition: ampsplusplus.hpp:1180
Abstract base class for replaying a publish message.
Definition: ampsplusplus.hpp:1059
virtual void execute(Message &message_)=0
Called by implementations of Store to replay a message from the store.
Handle class for StoreImpl classes that track publish messages.
Definition: ampsplusplus.hpp:1213
void replay(StoreReplayer &replayer_)
Called by Client to get all stored and non-discarded messages replayed by the store onto the StoreRep...
Definition: ampsplusplus.hpp:1248
StoreImpl * get()
Used to get a pointer to the implementation.
Definition: ampsplusplus.hpp:1352
bool getErrorOnPublishGap() const
Called to check if the Store will throw PublishStoreGapException.
Definition: ampsplusplus.hpp:1344
size_t unpersistedCount() const
Method to return how many messages are in the store that have not been discarded, indicating that the...
Definition: ampsplusplus.hpp:1269
bool isValid() const
Method to return if there is an underlying implementation for the Store.
Definition: ampsplusplus.hpp:1277
amps_uint64_t getLastPersisted()
Get the last persisted message sequence in the store.
Definition: ampsplusplus.hpp:1306
void discardUpTo(amps_uint64_t index_)
Called by Client to indicate that all messages up to and including.
Definition: ampsplusplus.hpp:1239
amps_uint64_t getLowestUnpersisted()
Get the oldest unpersisted message sequence in the store.
Definition: ampsplusplus.hpp:1298
void flush(long timeout_=0)
Method to wait for the Store to discard everything that has been stored up to the point in time when ...
Definition: ampsplusplus.hpp:1290
void setResizeHandler(PublishStoreResizeHandler handler_, void *userData_)
Set a handler to be called if the Store needs to resize in order to keep storing messages.
Definition: ampsplusplus.hpp:1320
bool replaySingle(StoreReplayer &replayer_, amps_uint64_t index_)
Called by Client to get a single message replayed by the store onto the StoreReplayer.
Definition: ampsplusplus.hpp:1260
void setErrorOnPublishGap(bool errorOnPublishGap_)
Called to enable or disable throwing PublishStoreGapException.
Definition: ampsplusplus.hpp:1335
amps_uint64_t store(const Message &message_)
Called by Client to store a message being published.
Definition: ampsplusplus.hpp:1228
Abstract base class to manage all subscriptions placed on a client so that they can be re-established...
Definition: ampsplusplus.hpp:1451
virtual void resubscribe(Client &client_)=0
Called by Client to get all subscriptions placed again.
virtual void subscribe(MessageHandler messageHandler_, const Message &message_, unsigned requestedAckTypes_)=0
Called by Client when a subscription is placed.
virtual void setFailedResubscribeHandler(std::shared_ptr< FailedResubscribeHandler > handler_)
Set a handler to deal with failing subscriptions after a failover event.
Definition: ampsplusplus.hpp:1478
virtual void unsubscribe(const Message::Field &subId_)=0
Called by Client when a subscription is unsubscribed.
virtual void clear()=0
Clear subscriptions and reset to the initial state.
Provides a convenient way of building messages in FIX format, typically referenced using the typedefs...
Definition: ampsplusplus.hpp:8106
_FIXBuilder(char fieldSep_=(char) 1)
Construct an instance of _FIXBuilder, using the specified separator between fields.
Definition: ampsplusplus.hpp:8115
std::string getString() const
Returns the current contents of this builder as a string.
Definition: ampsplusplus.hpp:8142
void append(const T &tag, const std::string &value)
Write a field with the provided tag and value to the message being constructed.
Definition: ampsplusplus.hpp:8135
void reset()
Clear all data from the builder.
Definition: ampsplusplus.hpp:8152
void append(const T &tag, const char *value, size_t offset, size_t length)
Write a field with the provided tag and value to the message being constructed.
Definition: ampsplusplus.hpp:8124
std::string sowDelete(const MessageHandler &messageHandler, const std::string &topic, const std::string &filter, long timeout)
Deletes one or more messages from a topic's SOW cache.
Definition: ampsplusplus.hpp:6673
Message sowDeleteByData(const std::string &topic_, const std::string &data_, long timeout_=0)
Deletes the message whose keys match the message data provided.
Definition: ampsplusplus.hpp:6841
std::string sowAndSubscribe(const MessageHandler &messageHandler_, const std::string &topic_, const std::string &filter_="", const std::string &orderBy_="", const std::string &bookmark_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic and initiates a new subscription on it.
Definition: ampsplusplus.hpp:6349
std::string sow(const MessageHandler &messageHandler_, const std::string &topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic.
Definition: ampsplusplus.hpp:6199
std::string sowAndDeltaSubscribe(const MessageHandler &messageHandler_, const std::string &topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, bool oofEnabled_=false, bool sendEmpties_=false, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic and initiates a new delta subscription on it.
Definition: ampsplusplus.hpp:6557
std::string sowDeleteByData(const MessageHandler &messageHandler_, const std::string &topic_, const std::string &data_, long timeout_=0)
Deletes the message whose keys match the message data provided.
Definition: ampsplusplus.hpp:6820
std::string sowAndSubscribe(const MessageHandler &messageHandler_, const std::string &topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, bool oofEnabled_=false, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic and initiates a new subscription on it.
Definition: ampsplusplus.hpp:6231
std::string sowDeleteByKeys(const MessageHandler &messageHandler_, const std::string &topic_, const std::string &keys_, long timeout_=0)
Deletes messages that match SOW keys from a topic's SOW cache.
Definition: ampsplusplus.hpp:6758
std::string sowAndDeltaSubscribe(const MessageHandler &messageHandler_, const std::string &topic_, const std::string &filter_="", const std::string &orderBy_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic and initiates a new delta subscription on it.
Definition: ampsplusplus.hpp:6457
std::string sow(const MessageHandler &messageHandler_, const std::string &topic_, const std::string &filter_="", const std::string &orderBy_="", const std::string &bookmark_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query a State-of-the-World topic.
Definition: ampsplusplus.hpp:6099
#define AMPS_BOOKMARK_NOW
Start the subscription at the point in time when AMPS processes the subscription.
Definition: BookmarkStore.hpp:55
#define AMPS_BOOKMARK_EPOCH
Start the subscription at the beginning of the journal.
Definition: BookmarkStore.hpp:51
#define AMPS_BOOKMARK_RECENT
Start the subscription at the first undiscarded message in the bookmark store, or at the end of the b...
Definition: BookmarkStore.hpp:47
BookmarkStoreImpl * get()
Used to get a pointer to the implementation.
Definition: BookmarkStore.hpp:465
MessageStream sowAndDeltaSubscribe(const char *topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, bool oofEnabled_=false, bool sendEmpties_=false, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic and initiates a new delta subscription on it.
Definition: ampsplusplus.hpp:6634
MessageStream sowAndDeltaSubscribe(const std::string &topic_, const std::string &filter_="", const std::string &orderBy_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic and initiates a new delta subscription on it.
Definition: ampsplusplus.hpp:6490
Message sowDeleteByKeys(const std::string &topic_, const std::string &keys_, long timeout_=0)
Deletes messages that match SOW keys from a topic's SOW cache.
Definition: ampsplusplus.hpp:6785
Message sowDelete(const std::string &topic, const std::string &filter, long timeout=0)
Deletes one or more messages from a topic's SOW cache.
Definition: ampsplusplus.hpp:6696
MessageStream sow(const std::string &topic_, const std::string &filter_="", const std::string &orderBy_="", const std::string &bookmark_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic.
Definition: ampsplusplus.hpp:6137
MessageStream sowAndSubscribe(const std::string &topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, bool oofEnabled_=false, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic and initiates a new subscription on it.
Definition: ampsplusplus.hpp:6263
MessageStream sowAndSubscribe(const char *topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, bool oofEnabled_=false, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic and initiates a new subscription on it.
Definition: ampsplusplus.hpp:6301
MessageStream sowAndDeltaSubscribe(const std::string &topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, bool oofEnabled_=false, bool sendEmpties_=false, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic and initiates a new delta subscription on it.
Definition: ampsplusplus.hpp:6593
MessageStream sowAndSubscribe(const std::string &topic_, const std::string &filter_="", const std::string &orderBy_="", const std::string &bookmark_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic and initiates a new subscription on it.
Definition: ampsplusplus.hpp:6388