25 #ifndef _AMPSPLUSPLUS_H_
26 #define _AMPSPLUSPLUS_H_
28 #include "amps/ampsver.h"
46 #include <sys/atomic.h>
48 #include "amps/BookmarkStore.hpp"
49 #include "amps/MessageRouter.hpp"
50 #include "amps/util.hpp"
51 #include "amps/ampscrc.hpp"
52 #if __cplusplus >= 201100L || _MSC_VER >= 1900
56 #ifndef AMPS_TESTING_SLOW_MESSAGE_STREAM
57 #define AMPS_TESTING_SLOW_MESSAGE_STREAM
85 #define AMPS_MEMORYBUFFER_DEFAULT_BUFFERS 10
86 #define AMPS_MEMORYBUFFER_DEFAULT_LENGTH 40960
87 #define AMPS_SUBSCRIPTION_MANAGER_DEFAULT_TIMEOUT 0
88 #define AMPS_HACLIENT_TIMEOUT_DEFAULT 10000
89 #define AMPS_HACLIENT_RECONNECT_DEFAULT 200
90 #define AMPS_DEFAULT_COMMAND_TIMEOUT 5000
91 #define AMPS_DEFAULT_TOP_N -1
92 #define AMPS_DEFAULT_BATCH_SIZE 10
93 #define AMPS_NUMBER_BUFFER_LEN 20
94 #define AMPS_DEFAULT_QUEUE_ACK_TIMEOUT 1000
96 #if defined(_M_X64) || defined(__x86_64) || defined(_WIN64)
101 static __declspec ( thread )
AMPS::Message* publishStoreMessage = 0;
109 typedef std::map<std::string, std::string> ConnectionInfo;
111 class PerThreadMessageTracker
113 std::vector<AMPS::Message*> _messages;
115 PerThreadMessageTracker() {}
116 ~PerThreadMessageTracker()
118 for (
size_t i = 0; i < _messages.size(); ++i)
125 _messages.push_back(message);
129 static AMPS::Mutex _lock;
130 AMPS::Lock<Mutex> l(_lock);
131 _addMessageToCleanupList(message);
135 static PerThreadMessageTracker tracker;
136 tracker.addMessage(message);
141 inline std::string asString(Type x_)
143 std::ostringstream os;
149 size_t convertToCharArray(
char* buf_, amps_uint64_t seqNo_)
151 size_t pos = AMPS_NUMBER_BUFFER_LEN;
152 for (
int i = 0; i < AMPS_NUMBER_BUFFER_LEN; ++i)
156 buf_[--pos] = (char)(seqNo_ % 10 +
'0');
165 size_t convertToCharArray(
char* buf_,
unsigned long seqNo_)
167 size_t pos = AMPS_NUMBER_BUFFER_LEN;
168 for (
int i = 0; i < AMPS_NUMBER_BUFFER_LEN; ++i)
172 buf_[--pos] = (char)(seqNo_ % 10 +
'0');
186 static const char* duplicate()
190 static const char* badFilter()
194 static const char* badRegexTopic()
196 return "bad regex topic";
198 static const char* subscriptionAlreadyExists()
200 return "subscription already exists";
202 static const char* nameInUse()
204 return "name in use";
206 static const char* authFailure()
208 return "auth failure";
210 static const char* notEntitled()
212 return "not entitled";
214 static const char* authDisabled()
216 return "authentication disabled";
218 static const char* subidInUse()
220 return "subid in use";
222 static const char* noTopic()
240 virtual void exceptionThrown(
const std::exception&)
const {;}
246 #define AMPS_CALL_EXCEPTION_WRAPPER(x) \
251 catch (std::exception& ex_)\
255 _exceptionListener->exceptionThrown(ex_);\
280 #define AMPS_CALL_EXCEPTION_WRAPPER_2(me,x) \
283 while(me->_connected)\
290 catch(MessageStreamFullException&)\
292 me->checkAndSendHeartbeat(false);\
296 catch (std::exception& ex_)\
300 me->_exceptionListener->exceptionThrown(ex_);\
324 #define AMPS_CALL_EXCEPTION_WRAPPER_STREAM_FULL_2(me, x)\
325 while(me->_connected)\
332 catch(MessageStreamFullException&)\
334 me->checkAndSendHeartbeat(false);\
338 #define AMPS_CALL_EXCEPTION_WRAPPER_2(me,x) \
341 while(me->_connected)\
348 catch(MessageStreamFullException& ex_)\
350 me->checkAndSendHeartbeat(false);\
354 catch (std::exception& ex_)\
358 me->_exceptionListener->exceptionThrown(ex_);\
382 #define AMPS_CALL_EXCEPTION_WRAPPER_STREAM_FULL_2(me, x)\
383 while(me->_connected)\
390 catch(MessageStreamFullException& ex_)\
392 me->checkAndSendHeartbeat(false);\
397 #define AMPS_UNHANDLED_EXCEPTION(ex) \
400 _exceptionListener->exceptionThrown(ex);\
405 #define AMPS_UNHANDLED_EXCEPTION_2(me,ex) \
408 me->_exceptionListener->exceptionThrown(ex);\
447 static const unsigned Subscribe = 1;
448 static const unsigned SOW = 2;
449 static const unsigned NeedsSequenceNumber = 4;
450 static const unsigned ProcessedAck = 8;
451 static const unsigned StatsAck = 16;
452 void init(Message::Command::Type command_)
461 void init(
const std::string& command_)
470 void init(
const char* command_,
size_t commandLen_)
482 if (!(command & Message::Command::NoDataCommands))
485 if (command == Message::Command::Subscribe ||
486 command == Message::Command::SOWAndSubscribe ||
487 command == Message::Command::DeltaSubscribe ||
488 command == Message::Command::SOWAndDeltaSubscribe)
493 if (command == Message::Command::SOW
494 || command == Message::Command::SOWAndSubscribe
495 || command == Message::Command::SOWAndDeltaSubscribe)
502 if (command == Message::Command::SOW)
507 _flags |= ProcessedAck;
509 else if (command == Message::Command::SOWDelete)
512 _flags |= ProcessedAck;
513 _flags |= NeedsSequenceNumber;
515 else if (command == Message::Command::Publish
516 || command == Message::Command::DeltaPublish)
518 _flags |= NeedsSequenceNumber;
520 else if (command == Message::Command::StopTimer)
537 Command(
const char* command_,
size_t commandLen_)
539 init(command_, commandLen_);
563 init(command_, commandLen_);
657 _message.
setTopic(topic_, topicLen_);
787 std::ostringstream os;
792 amps_uint64_t getSequence()
const
808 _message.
setData(data_, dataLen_);
838 _batchSize = batchSize_;
860 if (ackType_ ==
"processed")
862 _flags |= ProcessedAck;
864 else if (ackType_ ==
"stats")
874 if (ackType_.find(
"processed") != std::string::npos)
876 _flags |= ProcessedAck;
880 _flags &= ~ProcessedAck;
882 if (ackType_.find(
"stats") != std::string::npos)
896 if (ackType_ & Message::AckType::Processed)
898 _flags |= ProcessedAck;
902 _flags &= ~ProcessedAck;
904 if (ackType_ & Message::AckType::Stats)
929 unsigned getTimeout(
void)
const
933 unsigned getBatchSize(
void)
const
937 bool isSubscribe(
void)
const
939 return _flags & Subscribe;
941 bool isSow(
void)
const
943 return (_flags & SOW) != 0;
945 bool hasProcessedAck(
void)
const
947 return (_flags & ProcessedAck) != 0;
949 bool hasStatsAck(
void)
const
951 return (_flags & StatsAck) != 0;
953 bool needsSequenceNumber(
void)
const
955 return (_flags & NeedsSequenceNumber) != 0;
961 typedef void(*DisconnectHandlerFunc)(
Client&,
void* userData);
978 virtual std::string
authenticate(
const std::string& userName_,
const std::string& password_) = 0;
986 virtual std::string
retry(
const std::string& userName_,
const std::string& password_) = 0;
993 virtual void completed(
const std::string& userName_,
const std::string& password_,
const std::string& reason_) = 0;
1005 std::string
authenticate(
const std::string& ,
const std::string& password_)
1012 std::string
retry(
const std::string& ,
const std::string& )
1014 throw AuthenticationException(
"retry not implemented by DefaultAuthenticator.");
1017 void completed(
const std::string& ,
const std::string& ,
const std::string& ) {;}
1053 typedef bool (*PublishStoreResizeHandler)(
Store store_,
1068 : _resizeHandler(NULL)
1069 , _resizeHandlerData(NULL)
1070 , _errorOnPublishGap(errorOnPublishGap_)
1124 return AMPS_UNSET_INDEX;
1131 return AMPS_UNSET_SEQUENCE;
1156 _resizeHandler = handler_;
1157 _resizeHandlerData = userData_;
1162 return _resizeHandler;
1165 bool callResizeHandler(
size_t newSize_);
1167 inline virtual void setErrorOnPublishGap(
bool errorOnPublishGap_)
1169 _errorOnPublishGap = errorOnPublishGap_;
1172 inline virtual bool getErrorOnPublishGap()
const
1174 return _errorOnPublishGap;
1179 void* _resizeHandlerData;
1180 bool _errorOnPublishGap;
1187 RefHandle<StoreImpl> _body;
1191 Store(
const Store& rhs) : _body(rhs._body) {;}
1203 return _body.get().store(message_);
1214 _body.get().discardUpTo(index_);
1223 _body.get().replay(replayer_);
1235 return _body.get().replaySingle(replayer_, index_);
1244 return _body.get().unpersistedCount();
1252 return _body.isValid();
1265 return _body.get().flush(timeout_);
1273 return _body.get().getLowestUnpersisted();
1281 return _body.get().getLastPersisted();
1296 _body.get().setResizeHandler(handler_, userData_);
1301 return _body.get().getResizeHandler();
1310 _body.get().setErrorOnPublishGap(errorOnPublishGap_);
1319 return _body.get().getErrorOnPublishGap();
1327 if (_body.isValid())
1329 return &_body.get();
1354 const char* reason_,
size_t reasonLength_) = 0;
1358 inline bool StoreImpl::callResizeHandler(
size_t newSize_)
1362 return _resizeHandler(
Store(
this), newSize_, _resizeHandlerData);
1376 long* timeoutp = (
long*)data_;
1384 store_.
flush(*timeoutp);
1387 catch (
const TimedOutException&)
1389 catch (
const TimedOutException& e)
1416 unsigned requestedAckTypes_,
1417 const AMPSException& exception_) = 0;
1435 unsigned requestedAckTypes_) = 0;
1453 _failedResubscribeHandler = handler_;
1456 std::shared_ptr<FailedResubscribeHandler> _failedResubscribeHandler;
1467 typedef enum { Disconnected = 0,
1471 PublishReplayed = 8,
1472 HeartbeatInitiated = 16,
1491 class MessageStreamImpl;
1492 class MessageStream;
1494 typedef void(*DeferredExecutionFunc)(
void*);
1496 class ClientImpl :
public RefBody
1502 AMPS_SOCKET _socket;
1508 socklen_t _valueLen;
1512 : _socket(AMPS_INVALID_SOCKET), _noDelay(0), _valueLen(sizeof(int))
1514 _valuePtr = (
char*)&_noDelay;
1516 if (_socket != AMPS_INVALID_SOCKET)
1518 getsockopt(_socket, IPPROTO_TCP, TCP_NODELAY, _valuePtr, &_valueLen);
1522 setsockopt(_socket, IPPROTO_TCP, TCP_NODELAY, _valuePtr, _valueLen);
1526 _socket = AMPS_INVALID_SOCKET;
1533 if (_socket != AMPS_INVALID_SOCKET)
1536 setsockopt(_socket, IPPROTO_TCP, TCP_NODELAY, _valuePtr, _valueLen);
1541 friend class Client;
1544 DisconnectHandler _disconnectHandler;
1545 enum GlobalCommandTypeHandlers :
size_t
1555 DuplicateMessage = 8,
1558 std::vector<MessageHandler> _globalCommandTypeHandlers;
1559 Message _message, _readMessage, _publishMessage, _deltaMessage, _beatMessage;
1560 MessageRouter _routes;
1561 MessageRouter::RouteCache _routeCache;
1562 mutable Mutex _lock;
1563 std::string _name, _nameHash, _lastUri, _logonCorrelationData;
1564 amps_uint64_t _nameHashValue;
1565 BookmarkStore _bookmarkStore;
1566 Store _publishStore;
1567 bool _isRetryOnDisconnect;
1568 amps_unique_ptr<FailedWriteHandler> _failedWriteHandler;
1569 #if __cplusplus >= 201100L || _MSC_VER >= 1900
1570 std::atomic<amps_uint64_t> _lastSentHaSequenceNumber;
1572 volatile amps_uint64_t _lastSentHaSequenceNumber;
1574 AMPS_ATOMIC_TYPE_8 _badTimeToHAPublish;
1575 AMPS_ATOMIC_TYPE_8 _badTimeToHASubscribe;
1576 VersionInfo _serverVersion;
1577 Timer _heartbeatTimer;
1578 amps_unique_ptr<MessageStream> _pEmptyMessageStream;
1581 int _queueAckTimeout;
1582 bool _isAutoAckEnabled;
1583 unsigned _ackBatchSize;
1584 unsigned _queuedAckCount;
1585 unsigned _defaultMaxDepth;
1586 struct QueueBookmarks
1588 QueueBookmarks(
const std::string& topic_)
1595 amps_uint64_t _oldestTime;
1596 unsigned _bookmarkCount;
1598 typedef amps_uint64_t topic_hash;
1599 typedef std::map<topic_hash, QueueBookmarks> TopicHashMap;
1600 TopicHashMap _topicHashMap;
1602 class ClientStoreReplayer :
public StoreReplayer
1604 ClientImpl* _client;
1609 ClientStoreReplayer()
1610 : _client(NULL), _version(0), _res(
AMPS_E_OK)
1613 ClientStoreReplayer(ClientImpl* client_)
1614 : _client(client_), _version(0), _res(
AMPS_E_OK)
1617 void setClient(ClientImpl* client_)
1622 void execute(Message& message_)
1626 throw CommandException(
"Can't replay without a client.");
1630 if (index > _client->_lastSentHaSequenceNumber)
1632 _client->_lastSentHaSequenceNumber = index;
1639 if (!message_.getCommand().empty() &&
1640 (!_client->_badTimeToHAPublish ||
1641 message_.getOptions().len() < 6))
1644 message_.getMessage(),
1648 throw DisconnectedException(
"AMPS Server disconnected during replay");
1654 ClientStoreReplayer _replayer;
1656 class FailedWriteStoreReplayer :
public StoreReplayer
1658 ClientImpl* _parent;
1659 const char* _reason;
1660 size_t _reasonLength;
1661 size_t _replayCount;
1663 FailedWriteStoreReplayer(ClientImpl* parent,
const char* reason_,
size_t reasonLength_)
1666 _reasonLength(reasonLength_),
1669 void execute(Message& message_)
1671 if (_parent->_failedWriteHandler)
1674 _parent->_failedWriteHandler->failedWrite(message_,
1675 _reason, _reasonLength);
1678 size_t replayCount(
void)
const
1680 return _replayCount;
1684 struct AckResponseImpl :
public RefBody
1686 std::string username, password, reason, status, bookmark, options;
1687 amps_uint64_t sequenceNo;
1688 amps_uint64_t nameHashValue;
1689 VersionInfo serverVersion;
1690 #if __cplusplus >= 201100L || _MSC_VER >= 1900
1691 std::atomic<bool> responded;
1692 std::atomic<bool> abandoned;
1694 volatile bool responded;
1695 volatile bool abandoned;
1697 unsigned connectionVersion;
1700 sequenceNo((amps_uint64_t)0),
1704 connectionVersion(0)
1711 RefHandle<AckResponseImpl> _body;
1713 AckResponse() : _body(NULL) {;}
1714 AckResponse(
const AckResponse& rhs) : _body(rhs._body) {;}
1715 static AckResponse create()
1718 r._body =
new AckResponseImpl();
1722 const std::string& username()
1724 return _body.get().username;
1726 void setUsername(
const char* data_,
size_t len_)
1730 _body.get().username.assign(data_, len_);
1734 _body.get().username.clear();
1737 const std::string& password()
1739 return _body.get().password;
1741 void setPassword(
const char* data_,
size_t len_)
1745 _body.get().password.assign(data_, len_);
1749 _body.get().password.clear();
1752 const std::string& reason()
1754 return _body.get().reason;
1756 void setReason(
const char* data_,
size_t len_)
1760 _body.get().reason.assign(data_, len_);
1764 _body.get().reason.clear();
1767 const std::string& status()
1769 return _body.get().status;
1771 void setStatus(
const char* data_,
size_t len_)
1775 _body.get().status.assign(data_, len_);
1779 _body.get().status.clear();
1782 const std::string& bookmark()
1784 return _body.get().bookmark;
1786 void setBookmark(
const Field& bookmark_)
1788 if (!bookmark_.empty())
1790 _body.get().bookmark.assign(bookmark_.data(), bookmark_.len());
1791 Field::parseBookmark(bookmark_, _body.get().nameHashValue,
1792 _body.get().sequenceNo);
1796 _body.get().bookmark.clear();
1797 _body.get().sequenceNo = (amps_uint64_t)0;
1798 _body.get().nameHashValue = (amps_uint64_t)0;
1801 amps_uint64_t sequenceNo()
const
1803 return _body.get().sequenceNo;
1805 amps_uint64_t nameHashValue()
const
1807 return _body.get().nameHashValue;
1809 void setSequenceNo(
const char* data_,
size_t len_)
1811 amps_uint64_t result = (amps_uint64_t)0;
1814 for (
size_t i = 0; i < len_; ++i)
1816 result *= (amps_uint64_t)10;
1817 result += (amps_uint64_t)(data_[i] -
'0');
1820 _body.get().sequenceNo = result;
1822 VersionInfo serverVersion()
const
1824 return _body.get().serverVersion;
1826 void setServerVersion(
const char* data_,
size_t len_)
1830 _body.get().serverVersion.setVersion(std::string(data_, len_));
1835 return _body.get().responded;
1839 _body.get().responded =
true;
1843 return _body.get().abandoned;
1847 if (_body.isValid())
1849 _body.get().abandoned =
true;
1853 void setConnectionVersion(
unsigned connectionVersion)
1855 _body.get().connectionVersion = connectionVersion;
1858 unsigned getConnectionVersion()
1860 return _body.get().connectionVersion;
1862 void setOptions(
const char* data_,
size_t len_)
1866 _body.get().options.assign(data_, len_);
1870 _body.get().options.clear();
1874 const std::string& options()
1876 return _body.get().options;
1879 AckResponse& operator=(
const AckResponse& rhs)
1887 typedef std::map<std::string, AckResponse> AckMap;
1890 DefaultExceptionListener _defaultExceptionListener;
1893 struct DeferredExecutionRequest
1895 DeferredExecutionRequest(DeferredExecutionFunc func_,
1898 _userData(userData_)
1901 DeferredExecutionFunc _func;
1904 const ExceptionListener* _exceptionListener;
1905 std::shared_ptr<const ExceptionListener> _pExceptionListener;
1906 amps_unique_ptr<SubscriptionManager> _subscriptionManager;
1907 volatile bool _connected;
1908 std::string _username;
1909 typedef std::set<ConnectionStateListener*> ConnectionStateListeners;
1910 ConnectionStateListeners _connectionStateListeners;
1911 typedef std::vector<DeferredExecutionRequest> DeferredExecutionList;
1912 Mutex _deferredExecutionLock;
1913 DeferredExecutionList _deferredExecutionList;
1914 unsigned _heartbeatInterval;
1915 unsigned _readTimeout;
1923 if (!_connected && newState_ > ConnectionStateListener::Connected)
1927 for (ConnectionStateListeners::iterator it = _connectionStateListeners.begin(); it != _connectionStateListeners.end(); ++it)
1929 AMPS_CALL_EXCEPTION_WRAPPER(
1930 (*it)->connectionStateChanged(newState_));
1933 unsigned processedAck(Message& message);
1934 unsigned persistedAck(Message& meesage);
1935 void lastChance(Message& message);
1936 void checkAndSendHeartbeat(
bool force =
false);
1937 virtual ConnectionInfo getConnectionInfo()
const;
1939 ClientImplMessageHandler(
amps_handle message,
void* userData);
1941 ClientImplPreDisconnectHandler(
amps_handle client,
unsigned failedConnectionVersion,
void* userData);
1943 ClientImplDisconnectHandler(
amps_handle client,
void* userData);
1945 void unsubscribeInternal(
const std::string&
id)
1952 Message::Field subId;
1953 subId.assign(
id.data(),
id.length());
1954 _routes.removeRoute(subId);
1956 if (_subscriptionManager)
1959 Unlock<Mutex> unlock(_lock);
1960 _subscriptionManager->unsubscribe(subId);
1963 _message.setCommandEnum(Message::Command::Unsubscribe);
1964 _message.newCommandId();
1965 _message.setSubscriptionId(
id);
1966 _sendWithoutRetry(_message);
1967 deferredExecution(&s_noOpFn, NULL);
1970 AckResponse syncAckProcessing(
long timeout_, Message& message_,
1971 bool isHASubscribe_)
1973 return syncAckProcessing(timeout_, message_,
1974 (amps_uint64_t)0, isHASubscribe_);
1977 AckResponse syncAckProcessing(
long timeout_, Message& message_,
1978 amps_uint64_t haSeq = (amps_uint64_t)0,
1979 bool isHASubscribe_ =
false)
1982 AckResponse ack = AckResponse::create();
1985 Lock<Mutex> guard(_ackMapLock);
1986 _ackMap[message_.getCommandId()] = ack;
1988 ack.setConnectionVersion((
unsigned)_send(message_, haSeq, isHASubscribe_));
1989 if (ack.getConnectionVersion() == 0)
1992 throw DisconnectedException(
"Connection closed while waiting for response.");
1994 bool timedOut =
false;
1995 AMPS_START_TIMER(timeout_)
1996 while (!timedOut && !ack.responded() && !ack.abandoned() && _connected)
2000 timedOut = !_lock.wait(timeout_);
2004 AMPS_RESET_TIMER(timedOut, timeout_);
2011 Unlock<Mutex> unlck(_lock);
2012 amps_invoke_waiting_function();
2015 if (ack.responded())
2017 if (ack.status() !=
"failure")
2019 if (message_.getCommand() ==
"logon")
2021 amps_uint64_t ackSequence = ack.sequenceNo();
2022 if (_lastSentHaSequenceNumber < ackSequence)
2024 _lastSentHaSequenceNumber = ackSequence;
2026 if (_publishStore.isValid())
2031 _publishStore.discardUpTo(ackSequence);
2032 if (_lastSentHaSequenceNumber < _publishStore.getLastPersisted())
2034 _lastSentHaSequenceNumber = _publishStore.getLastPersisted();
2037 _nameHash = ack.bookmark().substr(0, ack.bookmark().find(
'|'));
2038 _nameHashValue = ack.nameHashValue();
2039 _serverVersion = ack.serverVersion();
2040 if (_bookmarkStore.isValid())
2042 _bookmarkStore.setServerVersion(_serverVersion);
2047 const std::string& options = ack.options();
2048 size_t index = options.find_first_of(
"max_backlog=");
2049 if (index != std::string::npos)
2052 const char* c = options.c_str() + index + 12;
2053 while (*c && *c !=
',')
2055 data = (data * 10) + (
unsigned)(*c++ -48);
2057 if (_ackBatchSize > data)
2059 _ackBatchSize = data;
2065 const size_t NotEntitled = 12;
2066 std::string ackReason = ack.reason();
2067 if (ackReason.length() == 0)
2071 if (ackReason.length() == NotEntitled &&
2072 ackReason[0] ==
'n' &&
2073 message_.getUserId().len() == 0)
2075 message_.assignUserId(_username);
2077 message_.throwFor(_client, ackReason);
2081 if (!ack.abandoned())
2083 throw TimedOutException(
"timed out waiting for operation.");
2087 throw DisconnectedException(
"Connection closed while waiting for response.");
2101 AMPS_CALL_EXCEPTION_WRAPPER(ClientImpl::disconnect());
2102 _pEmptyMessageStream.reset(NULL);
2109 ClientImpl(
const std::string& clientName)
2110 : _client(NULL), _name(clientName)
2111 , _isRetryOnDisconnect(true)
2112 , _lastSentHaSequenceNumber((amps_uint64_t)0), _badTimeToHAPublish(0)
2113 , _badTimeToHASubscribe(0), _serverVersion()
2114 , _queueAckTimeout(AMPS_DEFAULT_QUEUE_ACK_TIMEOUT)
2115 , _isAutoAckEnabled(false)
2117 , _queuedAckCount(0)
2118 , _defaultMaxDepth(0)
2120 , _heartbeatInterval(0)
2123 _replayer.setClient(
this);
2126 (amps_handler)ClientImpl::ClientImplMessageHandler,
2129 (amps_predisconnect_handler)ClientImpl::ClientImplPreDisconnectHandler,
2132 (amps_handler)ClientImpl::ClientImplDisconnectHandler,
2134 _exceptionListener = &_defaultExceptionListener;
2135 for (
size_t i = 0; i < GlobalCommandTypeHandlers::COUNT; ++i)
2137 #ifdef AMPS_USE_EMPLACE
2138 _globalCommandTypeHandlers.emplace_back(MessageHandler());
2140 _globalCommandTypeHandlers.push_back(MessageHandler());
2145 virtual ~ClientImpl()
2150 const std::string& getName()
const
2155 const std::string& getNameHash()
const
2160 const amps_uint64_t getNameHashValue()
const
2162 return _nameHashValue;
2165 void setName(
const std::string& name)
2172 AMPSException::throwFor(_client, result);
2177 const std::string& getLogonCorrelationData()
const
2179 return _logonCorrelationData;
2182 void setLogonCorrelationData(
const std::string& logonCorrelationData_)
2184 _logonCorrelationData = logonCorrelationData_;
2187 size_t getServerVersion()
const
2189 return _serverVersion.getOldStyleVersion();
2192 VersionInfo getServerVersionInfo()
const
2194 return _serverVersion;
2197 const std::string& getURI()
const
2202 virtual void connect(
const std::string& uri)
2204 Lock<Mutex> l(_lock);
2208 virtual void _connect(
const std::string& uri)
2214 AMPSException::throwFor(_client, result);
2217 _deltaMessage.setCommandEnum(Message::Command::DeltaPublish);
2218 _publishMessage.setCommandEnum(Message::Command::Publish);
2219 _beatMessage.setCommandEnum(Message::Command::Heartbeat);
2220 _beatMessage.setOptions(
"beat");
2221 _readMessage.setClientImpl(
this);
2222 if (_queueAckTimeout)
2227 AMPSException::throwFor(_client, result);
2231 broadcastConnectionStateChanged(ConnectionStateListener::Connected);
2234 void setDisconnected()
2237 Lock<Mutex> l(_lock);
2240 AMPS_CALL_EXCEPTION_WRAPPER(broadcastConnectionStateChanged(ConnectionStateListener::Disconnected));
2243 _heartbeatTimer.setTimeout(0.0);
2250 virtual void disconnect()
2252 AMPS_CALL_EXCEPTION_WRAPPER(flushAcks());
2254 AMPS_CALL_EXCEPTION_WRAPPER(processDeferredExecutions());
2255 Lock<Mutex> l(_lock);
2256 broadcastConnectionStateChanged(ConnectionStateListener::Shutdown);
2259 void clearAcks(
unsigned failedVersion)
2262 Lock<Mutex> guard(_ackMapLock);
2265 std::vector<std::string> worklist;
2266 for (AckMap::iterator i = _ackMap.begin(), e = _ackMap.end(); i != e; ++i)
2268 if (i->second.getConnectionVersion() <= failedVersion)
2270 i->second.setAbandoned();
2271 worklist.push_back(i->first);
2275 for (std::vector<std::string>::iterator j = worklist.begin(), e = worklist.end(); j != e; ++j)
2284 int send(
const Message& message)
2286 Lock<Mutex> l(_lock);
2287 return _send(message);
2290 void sendWithoutRetry(
const Message& message_)
2292 Lock<Mutex> l(_lock);
2293 _sendWithoutRetry(message_);
2296 void _sendWithoutRetry(
const Message& message_)
2301 AMPSException::throwFor(_client, result);
2305 int _send(
const Message& message, amps_uint64_t haSeq = (amps_uint64_t)0,
2306 bool isHASubscribe_ =
false)
2313 Message localMessage = message;
2314 unsigned version = 0;
2318 if (haSeq != (amps_uint64_t)0 && _badTimeToHAPublish > 0)
2322 if (!_isRetryOnDisconnect)
2326 if (!_lock.wait(1000))
2328 amps_invoke_waiting_function();
2333 if ((haSeq && haSeq <= _lastSentHaSequenceNumber) ||
2334 (isHASubscribe_ && _badTimeToHASubscribe != 0))
2336 return (
int)version;
2340 if (haSeq > _lastSentHaSequenceNumber)
2342 while (haSeq > _lastSentHaSequenceNumber + 1)
2347 if (!_publishStore.replaySingle(_replayer,
2348 _lastSentHaSequenceNumber + 1))
2354 version = _replayer._version;
2357 catch (
const DisconnectedException&)
2359 catch (
const DisconnectedException& e)
2362 result = _replayer._res;
2367 localMessage.getMessage(),
2369 ++_lastSentHaSequenceNumber;
2373 localMessage.getMessage(),
2377 if (!isHASubscribe_ && !haSeq &&
2378 localMessage.getMessage() == message.getMessage())
2380 localMessage = message.deepCopy();
2382 if (_isRetryOnDisconnect)
2384 Unlock<Mutex> u(_lock);
2389 if ((isHASubscribe_ || haSeq) &&
2392 return (
int)version;
2399 AMPSException::throwFor(_client, result);
2405 amps_invoke_waiting_function();
2411 AMPSException::throwFor(_client, result);
2413 return (
int)version;
2416 void addMessageHandler(
const Field& commandId_,
2418 unsigned requestedAcks_,
bool isSubscribe_)
2420 Lock<Mutex> lock(_lock);
2421 _routes.addRoute(commandId_, messageHandler_, requestedAcks_,
2425 bool removeMessageHandler(
const Field& commandId_)
2427 Lock<Mutex> lock(_lock);
2428 return _routes.removeRoute(commandId_);
2431 std::string send(
const MessageHandler& messageHandler_, Message& message_,
int timeout_ = 0)
2433 Field
id = message_.getCommandId();
2434 Field subId = message_.getSubscriptionId();
2435 Field qid = message_.getQueryId();
2436 bool isSubscribe =
false;
2437 bool isSubscribeOnly =
false;
2438 bool replace =
false;
2439 unsigned requestedAcks = message_.getAckTypeEnum();
2440 unsigned systemAddedAcks = Message::AckType::None;
2442 switch (message_.getCommandEnum())
2444 case Message::Command::Subscribe:
2445 case Message::Command::DeltaSubscribe:
2446 replace = message_.getOptions().operator std::string().find(AMPS_OPTIONS_REPLACE, 0, strlen(AMPS_OPTIONS_REPLACE) - 1) != std::string::npos;
2447 isSubscribeOnly =
true;
2449 case Message::Command::SOWAndSubscribe:
2450 case Message::Command::SOWAndDeltaSubscribe:
2453 id = message_.newCommandId().getCommandId();
2457 while (!replace &&
id != subId && _routes.hasRoute(
id))
2459 id = message_.newCommandId().getCommandId();
2464 message_.setSubscriptionId(
id);
2468 if (!message_.getBookmark().empty() && _bookmarkStore.isValid())
2470 systemAddedAcks |= Message::AckType::Persisted;
2473 case Message::Command::SOW:
2476 id = message_.newCommandId().getCommandId();
2480 while (!replace &&
id != subId && _routes.hasRoute(
id))
2482 message_.newCommandId();
2485 qid = message_.getCommandId();
2486 message_.setQueryId(qid);
2488 id = message_.getCommandId();
2491 if (!isSubscribeOnly)
2495 message_.setQueryID(
id);
2500 while (!replace && qid != subId && qid !=
id
2501 && _routes.hasRoute(qid))
2503 qid = message_.newQueryId().getQueryId();
2507 systemAddedAcks |= Message::AckType::Processed;
2509 if (!isSubscribeOnly)
2511 systemAddedAcks |= Message::AckType::Completed;
2513 message_.setAckTypeEnum(requestedAcks | systemAddedAcks);
2515 int routesAdded = 0;
2516 Lock<Mutex> l(_lock);
2517 if (!subId.empty() && messageHandler_.isValid())
2519 if (!_routes.hasRoute(subId))
2525 _routes.addRoute(subId, messageHandler_, requestedAcks,
2526 systemAddedAcks, isSubscribe);
2528 if (!isSubscribeOnly && !qid.empty()
2529 && messageHandler_.isValid() && qid != subId)
2531 if (routesAdded == 0)
2533 _routes.addRoute(qid, messageHandler_,
2534 requestedAcks, systemAddedAcks,
false);
2540 Unlock<Mutex> u(_lock);
2541 data = amps_invoke_copy_route_function(
2542 messageHandler_.userData());
2546 _routes.addRoute(qid, messageHandler_, requestedAcks,
2547 systemAddedAcks,
false);
2551 _routes.addRoute(qid,
2552 MessageHandler(messageHandler_.function(),
2554 requestedAcks, systemAddedAcks,
false);
2559 if (!
id.empty() && messageHandler_.isValid()
2560 && requestedAcks & ~Message::AckType::Persisted
2561 &&
id != subId &&
id != qid)
2563 if (routesAdded == 0)
2565 _routes.addRoute(
id, messageHandler_, requestedAcks,
2566 systemAddedAcks,
false);
2572 Unlock<Mutex> u(_lock);
2573 data = amps_invoke_copy_route_function(
2574 messageHandler_.userData());
2578 _routes.addRoute(
id, messageHandler_, requestedAcks,
2579 systemAddedAcks,
false);
2583 _routes.addRoute(
id,
2584 MessageHandler(messageHandler_.function(),
2587 systemAddedAcks,
false);
2596 syncAckProcessing(timeout_, message_, 0,
false);
2597 message_.setAckTypeEnum(requestedAcks);
2601 _routes.removeRoute(message_.getQueryID());
2602 _routes.removeRoute(message_.getSubscriptionId());
2603 _routes.removeRoute(
id);
2604 message_.setAckTypeEnum(requestedAcks);
2610 case Message::Command::Unsubscribe:
2611 case Message::Command::Heartbeat:
2612 case Message::Command::Logon:
2613 case Message::Command::StartTimer:
2614 case Message::Command::StopTimer:
2615 case Message::Command::SOWDelete:
2617 Lock<Mutex> l(_lock);
2619 if (message_.getAckTypeEnum() != Message::AckType::None)
2623 message_.newCommandId();
2624 id = message_.getCommandId();
2626 if (messageHandler_.isValid())
2628 _routes.addRoute(
id, messageHandler_, requestedAcks,
2629 Message::AckType::None,
false);
2635 case Message::Command::DeltaPublish:
2636 case Message::Command::Publish:
2638 bool useSync = message_.getFilter().len() > 0;
2639 Lock<Mutex> l(_lock);
2641 unsigned ackType = message_.getAckTypeEnum();
2642 if (ackType != Message::AckType::None
2647 message_.newCommandId();
2648 id = message_.getCommandId();
2650 if (messageHandler_.isValid())
2652 _routes.addRoute(
id, messageHandler_, requestedAcks,
2653 Message::AckType::None,
false);
2658 message_.setAckTypeEnum(ackType | Message::AckType::Processed);
2659 syncAckProcessing(timeout_, message_, 0,
false);
2668 case Message::Command::GroupBegin:
2669 case Message::Command::GroupEnd:
2670 case Message::Command::OOF:
2671 case Message::Command::Ack:
2672 case Message::Command::Unknown:
2674 throw CommandException(
"Command type " + message_.getCommand() +
" can not be sent directly to AMPS");
2676 message_.setAckTypeEnum(requestedAcks);
2680 void setDisconnectHandler(
const DisconnectHandler& disconnectHandler)
2682 Lock<Mutex> l(_lock);
2683 _disconnectHandler = disconnectHandler;
2686 void setGlobalCommandTypeMessageHandler(
const std::string& command_,
const MessageHandler& handler_)
2688 switch (command_[0])
2692 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::Publish] = handler_;
2695 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::SOW] = handler_;
2699 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::Heartbeat] = handler_;
2703 if (command_[6] ==
'b')
2705 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::GroupBegin] = handler_;
2707 else if (command_[6] ==
'e')
2709 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::GroupEnd] = handler_;
2713 std::ostringstream os;
2714 os <<
"Invalid command '" << command_ <<
"' passed to setGlobalCommandTypeHandler";
2715 throw CommandException(os.str());
2719 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::OOF] = handler_;
2723 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::Ack] = handler_;
2727 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::LastChance] = handler_;
2731 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::DuplicateMessage] = handler_;
2734 std::ostringstream os;
2735 os <<
"Invalid command '" << command_ <<
"' passed to setGlobalCommandTypeHandler";
2736 throw CommandException(os.str());
2741 void setGlobalCommandTypeMessageHandler(
const Message::Command::Type command_,
const MessageHandler& handler_)
2746 case Message::Command::Publish:
2747 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::Publish] = handler_;
2749 case Message::Command::SOW:
2750 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::SOW] = handler_;
2753 case Message::Command::Heartbeat:
2754 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::Heartbeat] = handler_;
2757 case Message::Command::GroupBegin:
2758 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::GroupBegin] = handler_;
2760 case Message::Command::GroupEnd:
2761 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::GroupEnd] = handler_;
2763 case Message::Command::OOF:
2764 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::OOF] = handler_;
2767 case Message::Command::Ack:
2768 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::Ack] = handler_;
2772 unsigned command = command_;
2779 AMPS_snprintf(errBuf,
sizeof(errBuf),
2780 "Invalid command '%.*s' passed to setGlobalCommandTypeHandler",
2781 CommandConstants<0>::Lengths[bits],
2782 CommandConstants<0>::Values[bits]);
2783 throw CommandException(errBuf);
2788 void setGlobalCommandTypeMessageHandler(
const GlobalCommandTypeHandlers handlerType_,
const MessageHandler& handler_)
2790 _globalCommandTypeHandlers[handlerType_] = handler_;
2793 void setFailedWriteHandler(FailedWriteHandler* handler_)
2795 Lock<Mutex> l(_lock);
2796 _failedWriteHandler.reset(handler_);
2799 void setPublishStore(
const Store& publishStore_)
2801 Lock<Mutex> l(_lock);
2804 throw AlreadyConnectedException(
"Setting a publish store on a connected client is undefined behavior");
2806 _publishStore = publishStore_;
2809 void setBookmarkStore(
const BookmarkStore& bookmarkStore_)
2811 Lock<Mutex> l(_lock);
2814 throw AlreadyConnectedException(
"Setting a bookmark store on a connected client is undefined behavior");
2816 _bookmarkStore = bookmarkStore_;
2819 void setSubscriptionManager(SubscriptionManager* subscriptionManager_)
2821 Lock<Mutex> l(_lock);
2822 _subscriptionManager.reset(subscriptionManager_);
2825 SubscriptionManager* getSubscriptionManager()
const
2827 return const_cast<SubscriptionManager*
>(_subscriptionManager.get());
2830 DisconnectHandler getDisconnectHandler()
const
2832 return _disconnectHandler;
2835 MessageHandler getDuplicateMessageHandler()
const
2837 return _globalCommandTypeHandlers[GlobalCommandTypeHandlers::DuplicateMessage];
2840 FailedWriteHandler* getFailedWriteHandler()
const
2842 return const_cast<FailedWriteHandler*
>(_failedWriteHandler.get());
2845 Store getPublishStore()
const
2847 return _publishStore;
2850 BookmarkStore getBookmarkStore()
const
2852 return _bookmarkStore;
2855 amps_uint64_t publish(
const char* topic_,
size_t topicLen_,
const char* data_,
size_t dataLen_)
2857 if (!_publishStore.isValid())
2859 Lock<Mutex> l(_lock);
2860 _publishMessage.assignTopic(topic_, topicLen_);
2861 _publishMessage.assignData(data_, dataLen_);
2862 _send(_publishMessage);
2867 if (!publishStoreMessage)
2869 publishStoreMessage =
new Message();
2870 PerThreadMessageTracker::addMessageToCleanupList(publishStoreMessage);
2872 publishStoreMessage->reset();
2874 return _publish(topic_, topicLen_, data_, dataLen_);
2878 amps_uint64_t publish(
const char* topic_,
size_t topicLen_,
const char* data_,
2879 size_t dataLen_,
unsigned long expiration_)
2881 if (!_publishStore.isValid())
2883 Lock<Mutex> l(_lock);
2884 _publishMessage.assignTopic(topic_, topicLen_);
2885 _publishMessage.assignData(data_, dataLen_);
2886 char exprBuf[AMPS_NUMBER_BUFFER_LEN];
2887 size_t pos = convertToCharArray(exprBuf, expiration_);
2888 _publishMessage.assignExpiration(exprBuf + pos, AMPS_NUMBER_BUFFER_LEN - pos);
2889 _send(_publishMessage);
2890 _publishMessage.assignExpiration(NULL, 0);
2895 if (!publishStoreMessage)
2897 publishStoreMessage =
new Message();
2898 PerThreadMessageTracker::addMessageToCleanupList(publishStoreMessage);
2900 publishStoreMessage->reset();
2901 char exprBuf[AMPS_NUMBER_BUFFER_LEN];
2902 size_t exprPos = convertToCharArray(exprBuf, expiration_);
2905 AMPS_NUMBER_BUFFER_LEN - exprPos);
2906 return _publish(topic_, topicLen_, data_, dataLen_);
2910 class FlushAckHandler : ConnectionStateListener
2913 ClientImpl* _pClient;
2915 #if __cplusplus >= 201100L || _MSC_VER >= 1900
2916 std::atomic<bool> _acked;
2917 std::atomic<bool> _disconnected;
2919 volatile bool _acked;
2920 volatile bool _disconnected;
2923 FlushAckHandler(ClientImpl* pClient_)
2924 : _pClient(pClient_), _cmdId(), _acked(false), _disconnected(false)
2926 pClient_->addConnectionStateListener(
this);
2930 _pClient->removeConnectionStateListener(
this);
2931 _pClient->removeMessageHandler(_cmdId);
2934 void setCommandId(
const Field& cmdId_)
2936 _cmdId.deepCopy(cmdId_);
2938 void invoke(
const Message&)
2942 void connectionStateChanged(State state_)
2944 if (state_ <= Shutdown)
2946 _disconnected =
true;
2955 return _acked || _disconnected;
2959 void publishFlush(
long timeout_,
unsigned ackType_)
2961 static const char* processed =
"processed";
2962 static const size_t processedLen = strlen(processed);
2963 static const char* persisted =
"persisted";
2964 static const size_t persistedLen = strlen(persisted);
2965 static const char* flush =
"flush";
2966 static const size_t flushLen = strlen(flush);
2967 static VersionInfo minPersisted(
"5.3.3.0");
2968 static VersionInfo minFlush(
"4");
2969 if (ackType_ != Message::AckType::Processed
2970 && ackType_ != Message::AckType::Persisted)
2972 throw CommandException(
"Flush can only be used with processed or persisted acks.");
2974 FlushAckHandler flushHandler(
this);
2975 if (_serverVersion >= minFlush)
2977 Lock<Mutex> l(_lock);
2980 throw DisconnectedException(
"Not connected trying to flush");
2983 _message.newCommandId();
2984 _message.assignCommand(flush, flushLen);
2985 if (_serverVersion < minPersisted
2986 || ackType_ == Message::AckType::Processed)
2988 _message.assignAckType(processed, processedLen);
2992 _message.assignAckType(persisted, persistedLen);
2994 flushHandler.setCommandId(_message.getCommandId());
2995 addMessageHandler(_message.getCommandId(),
2996 std::bind(&FlushAckHandler::invoke,
2997 std::ref(flushHandler),
2998 std::placeholders::_1),
3000 NoDelay noDelay(_client);
3001 if (_send(_message) == -1)
3003 throw DisconnectedException(
"Disconnected trying to flush");
3006 if (_publishStore.isValid())
3010 _publishStore.flush(timeout_);
3012 catch (
const AMPSException& ex)
3014 AMPS_UNHANDLED_EXCEPTION(ex);
3018 else if (_serverVersion < minFlush)
3022 AMPS_USLEEP(timeout_ * 1000);
3026 AMPS_USLEEP(1000 * 1000);
3032 Timer timer((
double)timeout_);
3034 while (!timer.check() && !flushHandler.done())
3037 amps_invoke_waiting_function();
3042 while (!flushHandler.done())
3045 amps_invoke_waiting_function();
3049 if (!flushHandler.done())
3051 throw TimedOutException(
"Timed out waiting for flush");
3054 if (!flushHandler.acked() && !_publishStore.isValid())
3056 throw DisconnectedException(
"Disconnected waiting for flush");
3060 amps_uint64_t deltaPublish(
const char* topic_,
size_t topicLength_,
3061 const char* data_,
size_t dataLength_)
3063 if (!_publishStore.isValid())
3065 Lock<Mutex> l(_lock);
3066 _deltaMessage.assignTopic(topic_, topicLength_);
3067 _deltaMessage.assignData(data_, dataLength_);
3068 _send(_deltaMessage);
3073 if (!publishStoreMessage)
3075 publishStoreMessage =
new Message();
3076 PerThreadMessageTracker::addMessageToCleanupList(publishStoreMessage);
3078 publishStoreMessage->reset();
3079 publishStoreMessage->
setCommandEnum(Message::Command::DeltaPublish);
3080 return _publish(topic_, topicLength_, data_, dataLength_);
3084 amps_uint64_t deltaPublish(
const char* topic_,
size_t topicLength_,
3085 const char* data_,
size_t dataLength_,
3086 unsigned long expiration_)
3088 if (!_publishStore.isValid())
3090 Lock<Mutex> l(_lock);
3091 _deltaMessage.assignTopic(topic_, topicLength_);
3092 _deltaMessage.assignData(data_, dataLength_);
3093 char exprBuf[AMPS_NUMBER_BUFFER_LEN];
3094 size_t pos = convertToCharArray(exprBuf, expiration_);
3095 _deltaMessage.assignExpiration(exprBuf + pos, AMPS_NUMBER_BUFFER_LEN - pos);
3096 _send(_deltaMessage);
3097 _deltaMessage.assignExpiration(NULL, 0);
3102 if (!publishStoreMessage)
3104 publishStoreMessage =
new Message();
3105 PerThreadMessageTracker::addMessageToCleanupList(publishStoreMessage);
3107 publishStoreMessage->reset();
3108 char exprBuf[AMPS_NUMBER_BUFFER_LEN];
3109 size_t exprPos = convertToCharArray(exprBuf, expiration_);
3110 publishStoreMessage->
setCommandEnum(Message::Command::DeltaPublish)
3112 AMPS_NUMBER_BUFFER_LEN - exprPos);
3113 return _publish(topic_, topicLength_, data_, dataLength_);
3117 amps_uint64_t _publish(
const char* topic_,
size_t topicLength_,
3118 const char* data_,
size_t dataLength_)
3120 publishStoreMessage->
assignTopic(topic_, topicLength_)
3122 .assignData(data_, dataLength_);
3123 amps_uint64_t haSequenceNumber = _publishStore.store(*publishStoreMessage);
3124 char buf[AMPS_NUMBER_BUFFER_LEN];
3125 size_t pos = convertToCharArray(buf, haSequenceNumber);
3126 publishStoreMessage->
assignSequence(buf + pos, AMPS_NUMBER_BUFFER_LEN - pos);
3128 Lock<Mutex> l(_lock);
3129 _send(*publishStoreMessage, haSequenceNumber);
3131 return haSequenceNumber;
3134 virtual std::string logon(
long timeout_, Authenticator& authenticator_,
3135 const char* options_ = NULL)
3137 Lock<Mutex> l(_lock);
3138 return _logon(timeout_, authenticator_, options_);
3141 virtual std::string _logon(
long timeout_, Authenticator& authenticator_,
3142 const char* options_ = NULL)
3144 AtomicFlagFlip pubFlip(&_badTimeToHAPublish);
3146 _message.setCommandEnum(Message::Command::Logon);
3147 _message.newCommandId();
3148 std::string newCommandId = _message.getCommandId();
3149 _message.setClientName(_name);
3150 #ifdef AMPS_CLIENT_VERSION_WITH_LANGUAGE
3151 _message.assignVersion(AMPS_CLIENT_VERSION_WITH_LANGUAGE,
3152 strlen(AMPS_CLIENT_VERSION_WITH_LANGUAGE));
3155 if (uri.user().size())
3157 _message.setUserId(uri.user());
3159 if (uri.password().size())
3161 _message.setPassword(uri.password());
3163 if (uri.protocol() ==
"amps" && uri.messageType().size())
3165 _message.setMessageType(uri.messageType());
3167 if (uri.isTrue(
"pretty"))
3169 _message.setOptions(
"pretty");
3172 _message.setPassword(authenticator_.authenticate(_message.getUserId(), _message.getPassword()));
3173 if (!_logonCorrelationData.empty())
3175 _message.assignCorrelationId(_logonCorrelationData);
3179 _message.setOptions(options_);
3181 _username = _message.getUserId();
3184 NoDelay noDelay(_client);
3187 _message.setAckTypeEnum(Message::AckType::Processed);
3188 AckResponse ack = syncAckProcessing(timeout_, _message);
3189 if (ack.status() ==
"retry")
3191 _message.setPassword(authenticator_.retry(ack.username(), ack.password()));
3192 _username = ack.username();
3193 _message.setUserId(_username);
3197 authenticator_.completed(ack.username(), ack.password(), ack.reason());
3201 broadcastConnectionStateChanged(ConnectionStateListener::LoggedOn);
3206 catch (
const AMPSException& ex)
3209 AMPS_UNHANDLED_EXCEPTION(ex);
3218 if (_publishStore.isValid())
3222 _publishStore.replay(_replayer);
3223 broadcastConnectionStateChanged(ConnectionStateListener::PublishReplayed);
3225 catch (
const PublishStoreGapException& ex)
3228 AMPS_UNHANDLED_EXCEPTION(ex);
3231 catch (
const StoreException& ex)
3234 std::ostringstream os;
3235 os <<
"A local store exception occurred while logging on."
3237 throw ConnectionException(os.str());
3239 catch (
const AMPSException& ex)
3242 AMPS_UNHANDLED_EXCEPTION(ex);
3245 catch (
const std::exception& ex)
3248 AMPS_UNHANDLED_EXCEPTION(ex);
3258 return newCommandId;
3261 std::string subscribe(
const MessageHandler& messageHandler_,
3262 const std::string& topic_,
3264 const std::string& filter_,
3265 const std::string& bookmark_,
3266 const std::string& options_,
3267 const std::string& subId_,
3268 bool isHASubscribe_ =
true)
3270 isHASubscribe_ &= (bool)_subscriptionManager;
3271 Lock<Mutex> l(_lock);
3273 _message.setCommandEnum(Message::Command::Subscribe);
3274 _message.newCommandId();
3275 std::string subId(subId_);
3278 if (options_.find(AMPS_OPTIONS_REPLACE, 0, strlen(AMPS_OPTIONS_REPLACE) - 1) != std::string::npos)
3280 throw ConnectionException(
"Cannot issue a replacement subscription; a valid subscription id is required.");
3283 subId = _message.getCommandId();
3285 _message.setSubscriptionId(subId);
3290 unsigned ackTypes = Message::AckType::Processed;
3292 if (!bookmark_.empty() && _bookmarkStore.isValid())
3294 ackTypes |= Message::AckType::Persisted;
3296 _message.setTopic(topic_);
3298 if (filter_.length())
3300 _message.setFilter(filter_);
3302 if (bookmark_.length())
3306 Message::Field mostRecent = _bookmarkStore.getMostRecent(subIdField);
3307 _message.setBookmark(mostRecent);
3311 _message.setBookmark(bookmark_);
3312 if (_bookmarkStore.isValid())
3317 _bookmarkStore.log(_message);
3318 _bookmarkStore.discard(_message);
3319 _bookmarkStore.persisted(subIdField, _message.getBookmark());
3324 if (options_.length())
3326 _message.setOptions(options_);
3329 Message message = _message;
3332 message = _message.deepCopy();
3333 Unlock<Mutex> u(_lock);
3334 _subscriptionManager->subscribe(messageHandler_, message,
3335 Message::AckType::None);
3336 if (_badTimeToHASubscribe)
3341 if (!_routes.hasRoute(_message.getSubscriptionId()))
3343 _routes.addRoute(_message.getSubscriptionId(), messageHandler_,
3344 Message::AckType::None, ackTypes,
true);
3346 message.setAckTypeEnum(ackTypes);
3347 if (!options_.empty())
3349 message.setOptions(options_);
3353 syncAckProcessing(timeout_, message, isHASubscribe_);
3355 catch (
const DisconnectedException&)
3357 if (!isHASubscribe_)
3359 _routes.removeRoute(subIdField);
3364 AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(subIdField));
3368 catch (
const TimedOutException&)
3370 AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(subIdField));
3378 Unlock<Mutex> unlock(_lock);
3379 _subscriptionManager->unsubscribe(subIdField);
3381 _routes.removeRoute(subIdField);
3387 std::string deltaSubscribe(
const MessageHandler& messageHandler_,
3388 const std::string& topic_,
3390 const std::string& filter_,
3391 const std::string& bookmark_,
3392 const std::string& options_,
3393 const std::string& subId_ =
"",
3394 bool isHASubscribe_ =
true)
3396 isHASubscribe_ &= (bool)_subscriptionManager;
3397 Lock<Mutex> l(_lock);
3399 _message.setCommandEnum(Message::Command::DeltaSubscribe);
3400 _message.newCommandId();
3401 std::string subId(subId_);
3404 subId = _message.getCommandId();
3406 _message.setSubscriptionId(subId);
3411 unsigned ackTypes = Message::AckType::Processed;
3413 if (!bookmark_.empty() && _bookmarkStore.isValid())
3415 ackTypes |= Message::AckType::Persisted;
3417 _message.setTopic(topic_);
3418 if (filter_.length())
3420 _message.setFilter(filter_);
3422 if (bookmark_.length())
3426 Message::Field mostRecent = _bookmarkStore.getMostRecent(subIdField);
3427 _message.setBookmark(mostRecent);
3431 _message.setBookmark(bookmark_);
3432 if (_bookmarkStore.isValid())
3437 _bookmarkStore.log(_message);
3438 _bookmarkStore.discard(_message);
3439 _bookmarkStore.persisted(subIdField, _message.getBookmark());
3444 if (options_.length())
3446 _message.setOptions(options_);
3448 Message message = _message;
3451 message = _message.deepCopy();
3452 Unlock<Mutex> u(_lock);
3453 _subscriptionManager->subscribe(messageHandler_, message,
3454 Message::AckType::None);
3455 if (_badTimeToHASubscribe)
3460 if (!_routes.hasRoute(_message.getSubscriptionId()))
3462 _routes.addRoute(_message.getSubscriptionId(), messageHandler_,
3463 Message::AckType::None, ackTypes,
true);
3465 message.setAckTypeEnum(ackTypes);
3466 if (!options_.empty())
3468 message.setOptions(options_);
3472 syncAckProcessing(timeout_, message, isHASubscribe_);
3474 catch (
const DisconnectedException&)
3476 if (!isHASubscribe_)
3478 _routes.removeRoute(subIdField);
3482 catch (
const TimedOutException&)
3484 AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(subIdField));
3492 Unlock<Mutex> unlock(_lock);
3493 _subscriptionManager->unsubscribe(subIdField);
3495 _routes.removeRoute(subIdField);
3501 void unsubscribe(
const std::string&
id)
3503 Lock<Mutex> l(_lock);
3504 unsubscribeInternal(
id);
3507 void unsubscribe(
void)
3509 if (_subscriptionManager)
3511 _subscriptionManager->clear();
3514 _routes.unsubscribeAll();
3515 Lock<Mutex> l(_lock);
3517 _message.setCommandEnum(Message::Command::Unsubscribe);
3518 _message.newCommandId();
3519 _message.setSubscriptionId(
"all");
3520 _sendWithoutRetry(_message);
3522 deferredExecution(&s_noOpFn, NULL);
3525 std::string sow(
const MessageHandler& messageHandler_,
3526 const std::string& topic_,
3527 const std::string& filter_ =
"",
3528 const std::string& orderBy_ =
"",
3529 const std::string& bookmark_ =
"",
3530 int batchSize_ = AMPS_DEFAULT_BATCH_SIZE,
3531 int topN_ = AMPS_DEFAULT_TOP_N,
3532 const std::string& options_ =
"",
3533 long timeout_ = AMPS_DEFAULT_COMMAND_TIMEOUT)
3535 Lock<Mutex> l(_lock);
3537 _message.setCommandEnum(Message::Command::SOW);
3538 _message.newCommandId();
3540 std::string commandId = _message.getCommandId();
3541 _message.setQueryID(_message.getCommandId());
3542 unsigned ackTypes = Message::AckType::Processed | Message::AckType::Completed;
3543 _message.setAckTypeEnum(ackTypes);
3544 _message.setTopic(topic_);
3545 if (filter_.length())
3547 _message.setFilter(filter_);
3549 if (orderBy_.length())
3551 _message.setOrderBy(orderBy_);
3553 if (bookmark_.length())
3555 _message.setBookmark(bookmark_);
3557 _message.setBatchSize(AMPS::asString(batchSize_));
3558 if (topN_ != AMPS_DEFAULT_TOP_N)
3560 _message.setTopNRecordsReturned(AMPS::asString(topN_));
3562 if (options_.length())
3564 _message.setOptions(options_);
3567 _routes.addRoute(_message.getQueryID(), messageHandler_,
3568 Message::AckType::None, ackTypes,
false);
3572 syncAckProcessing(timeout_, _message);
3576 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId));
3583 std::string sow(
const MessageHandler& messageHandler_,
3584 const std::string& topic_,
3586 const std::string& filter_ =
"",
3587 int batchSize_ = AMPS_DEFAULT_BATCH_SIZE,
3588 int topN_ = AMPS_DEFAULT_TOP_N)
3591 return sow(messageHandler_,
3602 std::string sowAndSubscribe(
const MessageHandler& messageHandler_,
3603 const std::string& topic_,
3604 const std::string& filter_ =
"",
3605 const std::string& orderBy_ =
"",
3606 const std::string& bookmark_ =
"",
3607 int batchSize_ = AMPS_DEFAULT_BATCH_SIZE,
3608 int topN_ = AMPS_DEFAULT_TOP_N,
3609 const std::string& options_ =
"",
3610 long timeout_ = AMPS_DEFAULT_COMMAND_TIMEOUT,
3611 bool isHASubscribe_ =
true)
3613 isHASubscribe_ &= (bool)_subscriptionManager;
3614 unsigned ackTypes = Message::AckType::Processed;
3615 Lock<Mutex> l(_lock);
3617 _message.setCommandEnum(Message::Command::SOWAndSubscribe);
3618 _message.newCommandId();
3619 Field cid = _message.getCommandId();
3620 std::string subId = cid;
3621 _message.setQueryID(cid).setSubscriptionId(cid).setTopic(topic_);
3622 if (filter_.length())
3624 _message.setFilter(filter_);
3626 if (orderBy_.length())
3628 _message.setOrderBy(orderBy_);
3630 if (bookmark_.length())
3632 _message.setBookmark(bookmark_);
3633 Message::Field bookmark = _message.getBookmark();
3634 if (_bookmarkStore.isValid())
3636 ackTypes |= Message::AckType::Persisted;
3639 _message.setBookmark(_bookmarkStore.getMostRecent(_message.getSubscriptionId()));
3644 _bookmarkStore.log(_message);
3645 if (!BookmarkRange::isRange(bookmark))
3647 _bookmarkStore.discard(_message);
3648 _bookmarkStore.persisted(_message.getSubscriptionId(),
3658 _message.setBatchSize(AMPS::asString(batchSize_));
3659 if (topN_ != AMPS_DEFAULT_TOP_N)
3661 _message.setTopNRecordsReturned(AMPS::asString(topN_));
3663 if (options_.length())
3665 _message.setOptions(options_);
3668 Message message = _message;
3671 message = _message.deepCopy();
3672 Unlock<Mutex> u(_lock);
3673 _subscriptionManager->subscribe(messageHandler_, message,
3674 Message::AckType::None);
3675 if (_badTimeToHASubscribe)
3680 _routes.addRoute(cid, messageHandler_,
3681 Message::AckType::None, ackTypes,
true);
3682 message.setAckTypeEnum(ackTypes);
3683 if (!options_.empty())
3685 message.setOptions(options_);
3689 syncAckProcessing(timeout_, message, isHASubscribe_);
3691 catch (
const DisconnectedException&)
3693 if (!isHASubscribe_)
3695 _routes.removeRoute(subId);
3699 catch (
const TimedOutException&)
3701 AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(subId));
3709 Unlock<Mutex> unlock(_lock);
3710 _subscriptionManager->unsubscribe(cid);
3712 _routes.removeRoute(subId);
3718 std::string sowAndSubscribe(
const MessageHandler& messageHandler_,
3719 const std::string& topic_,
3721 const std::string& filter_ =
"",
3722 int batchSize_ = AMPS_DEFAULT_BATCH_SIZE,
3723 bool oofEnabled_ =
false,
3724 int topN_ = AMPS_DEFAULT_TOP_N,
3725 bool isHASubscribe_ =
true)
3728 return sowAndSubscribe(messageHandler_,
3735 (oofEnabled_ ?
"oof" :
""),
3740 std::string sowAndDeltaSubscribe(
const MessageHandler& messageHandler_,
3741 const std::string& topic_,
3742 const std::string& filter_ =
"",
3743 const std::string& orderBy_ =
"",
3744 int batchSize_ = AMPS_DEFAULT_BATCH_SIZE,
3745 int topN_ = AMPS_DEFAULT_TOP_N,
3746 const std::string& options_ =
"",
3747 long timeout_ = AMPS_DEFAULT_COMMAND_TIMEOUT,
3748 bool isHASubscribe_ =
true)
3750 isHASubscribe_ &= (bool)_subscriptionManager;
3751 Lock<Mutex> l(_lock);
3753 _message.setCommandEnum(Message::Command::SOWAndDeltaSubscribe);
3754 _message.newCommandId();
3755 _message.setQueryID(_message.getCommandId());
3756 _message.setSubscriptionId(_message.getCommandId());
3757 std::string subId = _message.getSubscriptionId();
3758 _message.setTopic(topic_);
3759 if (filter_.length())
3761 _message.setFilter(filter_);
3763 if (orderBy_.length())
3765 _message.setOrderBy(orderBy_);
3767 _message.setBatchSize(AMPS::asString(batchSize_));
3768 if (topN_ != AMPS_DEFAULT_TOP_N)
3770 _message.setTopNRecordsReturned(AMPS::asString(topN_));
3772 if (options_.length())
3774 _message.setOptions(options_);
3776 Message message = _message;
3779 message = _message.deepCopy();
3780 Unlock<Mutex> u(_lock);
3781 _subscriptionManager->subscribe(messageHandler_, message,
3782 Message::AckType::None);
3783 if (_badTimeToHASubscribe)
3788 _routes.addRoute(message.getQueryID(), messageHandler_,
3789 Message::AckType::None, Message::AckType::Processed,
true);
3790 message.setAckTypeEnum(Message::AckType::Processed);
3791 if (!options_.empty())
3793 message.setOptions(options_);
3797 syncAckProcessing(timeout_, message, isHASubscribe_);
3799 catch (
const DisconnectedException&)
3801 if (!isHASubscribe_)
3803 _routes.removeRoute(subId);
3807 catch (
const TimedOutException&)
3809 AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(subId));
3817 Unlock<Mutex> unlock(_lock);
3818 _subscriptionManager->unsubscribe(Field(subId));
3820 _routes.removeRoute(subId);
3826 std::string sowAndDeltaSubscribe(
const MessageHandler& messageHandler_,
3827 const std::string& topic_,
3829 const std::string& filter_ =
"",
3830 int batchSize_ = AMPS_DEFAULT_BATCH_SIZE,
3831 bool oofEnabled_ =
false,
3832 bool sendEmpties_ =
false,
3833 int topN_ = AMPS_DEFAULT_TOP_N,
3834 bool isHASubscribe_ =
true)
3837 Message::Options options;
3842 if (sendEmpties_ ==
false)
3844 options.setNoEmpties();
3846 return sowAndDeltaSubscribe(messageHandler_,
3857 std::string sowDelete(
const MessageHandler& messageHandler_,
3858 const std::string& topic_,
3859 const std::string& filter_,
3861 Message::Field commandId_ = Message::Field())
3863 if (_publishStore.isValid())
3865 unsigned ackType = Message::AckType::Processed |
3866 Message::AckType::Stats |
3867 Message::AckType::Persisted;
3868 if (!publishStoreMessage)
3870 publishStoreMessage =
new Message();
3871 PerThreadMessageTracker::addMessageToCleanupList(publishStoreMessage);
3873 publishStoreMessage->reset();
3874 if (commandId_.empty())
3881 publishStoreMessage->
setCommandId(commandId_.data(), commandId_.len());
3889 amps_uint64_t haSequenceNumber = _publishStore.store(*publishStoreMessage);
3890 char buf[AMPS_NUMBER_BUFFER_LEN];
3891 size_t pos = convertToCharArray(buf, haSequenceNumber);
3892 publishStoreMessage->
assignSequence(buf + pos, AMPS_NUMBER_BUFFER_LEN - pos);
3896 Lock<Mutex> l(_lock);
3897 _routes.addRoute(commandId_, messageHandler_,
3898 Message::AckType::Stats,
3899 Message::AckType::Processed | Message::AckType::Persisted,
3901 syncAckProcessing(timeout_, *publishStoreMessage,
3904 catch (
const DisconnectedException&)
3911 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId_));
3915 return (std::string)commandId_;
3919 Lock<Mutex> l(_lock);
3921 if (commandId_.empty())
3923 _message.newCommandId();
3924 commandId_ = _message.getCommandId();
3928 _message.setCommandId(commandId_.data(), commandId_.len());
3930 _message.setCommandEnum(Message::Command::SOWDelete)
3931 .assignSubscriptionId(commandId_.data(), commandId_.len())
3932 .assignQueryID(commandId_.data(), commandId_.len())
3933 .setAckTypeEnum(Message::AckType::Processed |
3934 Message::AckType::Stats)
3935 .assignTopic(topic_.c_str(), topic_.length())
3936 .assignFilter(filter_.c_str(), filter_.length());
3937 _routes.addRoute(commandId_, messageHandler_,
3938 Message::AckType::Stats,
3939 Message::AckType::Processed,
3943 syncAckProcessing(timeout_, _message);
3947 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId_));
3950 return (std::string)commandId_;
3954 std::string sowDeleteByData(
const MessageHandler& messageHandler_,
3955 const std::string& topic_,
3956 const std::string& data_,
3958 Message::Field commandId_ = Message::Field())
3960 if (_publishStore.isValid())
3962 unsigned ackType = Message::AckType::Processed |
3963 Message::AckType::Stats |
3964 Message::AckType::Persisted;
3965 if (!publishStoreMessage)
3967 publishStoreMessage =
new Message();
3968 PerThreadMessageTracker::addMessageToCleanupList(publishStoreMessage);
3970 publishStoreMessage->reset();
3971 if (commandId_.empty())
3978 publishStoreMessage->
setCommandId(commandId_.data(), commandId_.len());
3985 .assignData(data_.c_str(), data_.length());
3986 amps_uint64_t haSequenceNumber = _publishStore.store(*publishStoreMessage);
3987 char buf[AMPS_NUMBER_BUFFER_LEN];
3988 size_t pos = convertToCharArray(buf, haSequenceNumber);
3989 publishStoreMessage->
assignSequence(buf + pos, AMPS_NUMBER_BUFFER_LEN - pos);
3993 Lock<Mutex> l(_lock);
3994 _routes.addRoute(commandId_, messageHandler_,
3995 Message::AckType::Stats,
3996 Message::AckType::Processed | Message::AckType::Persisted,
3998 syncAckProcessing(timeout_, *publishStoreMessage,
4001 catch (
const DisconnectedException&)
4008 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId_));
4012 return (std::string)commandId_;
4016 Lock<Mutex> l(_lock);
4018 if (commandId_.empty())
4020 _message.newCommandId();
4021 commandId_ = _message.getCommandId();
4025 _message.setCommandId(commandId_.data(), commandId_.len());
4027 _message.setCommandEnum(Message::Command::SOWDelete)
4028 .assignSubscriptionId(commandId_.data(), commandId_.len())
4029 .assignQueryID(commandId_.data(), commandId_.len())
4030 .setAckTypeEnum(Message::AckType::Processed |
4031 Message::AckType::Stats)
4032 .assignTopic(topic_.c_str(), topic_.length())
4033 .assignData(data_.c_str(), data_.length());
4034 _routes.addRoute(commandId_, messageHandler_,
4035 Message::AckType::Stats,
4036 Message::AckType::Processed,
4040 syncAckProcessing(timeout_, _message);
4044 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId_));
4047 return (std::string)commandId_;
4051 std::string sowDeleteByKeys(
const MessageHandler& messageHandler_,
4052 const std::string& topic_,
4053 const std::string& keys_,
4055 Message::Field commandId_ = Message::Field())
4057 if (_publishStore.isValid())
4059 unsigned ackType = Message::AckType::Processed |
4060 Message::AckType::Stats |
4061 Message::AckType::Persisted;
4062 if (!publishStoreMessage)
4064 publishStoreMessage =
new Message();
4065 PerThreadMessageTracker::addMessageToCleanupList(publishStoreMessage);
4067 publishStoreMessage->reset();
4068 if (commandId_.empty())
4075 publishStoreMessage->
setCommandId(commandId_.data(), commandId_.len());
4083 amps_uint64_t haSequenceNumber = _publishStore.store(*publishStoreMessage);
4084 char buf[AMPS_NUMBER_BUFFER_LEN];
4085 size_t pos = convertToCharArray(buf, haSequenceNumber);
4086 publishStoreMessage->
assignSequence(buf + pos, AMPS_NUMBER_BUFFER_LEN - pos);
4090 Lock<Mutex> l(_lock);
4091 _routes.addRoute(commandId_, messageHandler_,
4092 Message::AckType::Stats,
4093 Message::AckType::Processed | Message::AckType::Persisted,
4095 syncAckProcessing(timeout_, *publishStoreMessage,
4098 catch (
const DisconnectedException&)
4105 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId_));
4109 return (std::string)commandId_;
4113 Lock<Mutex> l(_lock);
4115 if (commandId_.empty())
4117 _message.newCommandId();
4118 commandId_ = _message.getCommandId();
4122 _message.setCommandId(commandId_.data(), commandId_.len());
4124 _message.setCommandEnum(Message::Command::SOWDelete)
4125 .assignSubscriptionId(commandId_.data(), commandId_.len())
4126 .assignQueryID(commandId_.data(), commandId_.len())
4127 .setAckTypeEnum(Message::AckType::Processed |
4128 Message::AckType::Stats)
4129 .assignTopic(topic_.c_str(), topic_.length())
4130 .assignSowKeys(keys_.c_str(), keys_.length());
4131 _routes.addRoute(commandId_, messageHandler_,
4132 Message::AckType::Stats,
4133 Message::AckType::Processed,
4137 syncAckProcessing(timeout_, _message);
4141 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId_));
4144 return (std::string)commandId_;
4148 void startTimer(
void)
4150 if (_serverVersion >=
"5.3.2.0")
4152 throw CommandException(
"The start_timer command is deprecated.");
4154 Lock<Mutex> l(_lock);
4156 _message.setCommandEnum(Message::Command::StartTimer);
4161 std::string stopTimer(MessageHandler messageHandler_)
4163 if (_serverVersion >=
"5.3.2.0")
4165 throw CommandException(
"The stop_timer command is deprecated.");
4167 return executeAsync(Command(
"stop_timer").addAckType(
"completed"), messageHandler_);
4182 void setExceptionListener(
const std::shared_ptr<const ExceptionListener>& pListener_)
4184 _pExceptionListener = pListener_;
4185 _exceptionListener = _pExceptionListener.get();
4188 void setExceptionListener(
const ExceptionListener& listener_)
4190 _exceptionListener = &listener_;
4193 const ExceptionListener& getExceptionListener(
void)
const
4195 return *_exceptionListener;
4198 void setHeartbeat(
unsigned heartbeatInterval_,
unsigned readTimeout_)
4200 if (readTimeout_ < heartbeatInterval_)
4202 throw UsageException(
"The socket read timeout must be >= the heartbeat interval.");
4204 Lock<Mutex> l(_lock);
4205 if (_heartbeatInterval != heartbeatInterval_ ||
4206 _readTimeout != readTimeout_)
4208 _heartbeatInterval = heartbeatInterval_;
4209 _readTimeout = readTimeout_;
4214 void _sendHeartbeat(
void)
4216 if (_connected && _heartbeatInterval != 0)
4218 std::ostringstream options;
4219 options <<
"start," << _heartbeatInterval;
4220 _beatMessage.setOptions(options.str());
4222 _heartbeatTimer.setTimeout(_heartbeatInterval * 1000.0);
4223 _heartbeatTimer.start();
4226 _sendWithoutRetry(_beatMessage);
4227 broadcastConnectionStateChanged(ConnectionStateListener::HeartbeatInitiated);
4229 catch (ConnectionException& ex_)
4233 AMPS_UNHANDLED_EXCEPTION(ex_);
4235 _beatMessage.setOptions(
"beat");
4238 if (_readTimeout && _connected)
4244 AMPSException::throwFor(_client, result);
4248 void addConnectionStateListener(ConnectionStateListener* listener_)
4250 Lock<Mutex> lock(_lock);
4251 _connectionStateListeners.insert(listener_);
4254 void removeConnectionStateListener(ConnectionStateListener* listener_)
4256 Lock<Mutex> lock(_lock);
4257 _connectionStateListeners.erase(listener_);
4260 void clearConnectionStateListeners()
4262 Lock<Mutex> lock(_lock);
4263 _connectionStateListeners.clear();
4266 void _registerHandler(Command& command_, Message::Field& cid_,
4267 MessageHandler& handler_,
unsigned requestedAcks_,
4268 unsigned systemAddedAcks_,
bool isSubscribe_)
4270 Message message = command_.getMessage();
4271 Message::Command::Type commandType = message.getCommandEnum();
4272 Message::Field subid = message.getSubscriptionId();
4273 Message::Field qid = message.getQueryID();
4275 bool added = qid.len() || subid.len() || cid_.len();
4276 bool cidIsQid = cid_ == qid;
4277 bool cidUnique = !cidIsQid && cid_.len() > 0 && cid_ != subid;
4279 if (subid.len() > 0)
4283 addedCount += _routes.addRoute(subid, handler_, requestedAcks_,
4284 systemAddedAcks_, isSubscribe_);
4286 && (commandType == Message::Command::Subscribe
4287 || commandType == Message::Command::DeltaSubscribe))
4294 if (qid.len() > 0 && qid != subid
4295 && (commandType == Message::Command::SOW
4296 || commandType == Message::Command::SOWDelete
4297 || commandType == Message::Command::SOWAndSubscribe
4298 || commandType == Message::Command::SOWAndDeltaSubscribe))
4300 while (_routes.hasRoute(qid))
4302 message.newQueryId();
4305 cid_ = message.getQueryId();
4307 qid = message.getQueryId();
4309 if (addedCount == 0)
4311 _routes.addRoute(qid, handler_, requestedAcks_,
4312 systemAddedAcks_, isSubscribe_);
4318 Unlock<Mutex> u(_lock);
4319 data = amps_invoke_copy_route_function(handler_.userData());
4323 _routes.addRoute(qid, handler_, requestedAcks_,
4324 systemAddedAcks_,
false);
4328 _routes.addRoute(qid,
4329 MessageHandler(handler_.function(),
4332 systemAddedAcks_,
false);
4337 if (cidUnique && requestedAcks_ & ~Message::AckType::Persisted)
4339 while (_routes.hasRoute(cid_))
4341 cid_ = message.newCommandId().getCommandId();
4343 if (addedCount == 0)
4345 _routes.addRoute(cid_, handler_, requestedAcks_,
4346 systemAddedAcks_,
false);
4352 Unlock<Mutex> u(_lock);
4353 data = amps_invoke_copy_route_function(handler_.userData());
4357 _routes.addRoute(cid_, handler_, requestedAcks_,
4358 systemAddedAcks_,
false);
4362 _routes.addRoute(cid_,
4363 MessageHandler(handler_.function(),
4366 systemAddedAcks_,
false);
4370 else if ((commandType == Message::Command::Publish ||
4371 commandType == Message::Command::DeltaPublish)
4372 && requestedAcks_ & ~Message::AckType::Persisted)
4374 cid_ = command_.getMessage().newCommandId().getCommandId();
4375 _routes.addRoute(cid_, handler_, requestedAcks_,
4376 systemAddedAcks_,
false);
4381 throw UsageException(
"To use a messagehandler, you must also supply a command or subscription ID.");
4385 std::string executeAsyncNoLock(Command& command_, MessageHandler& handler_,
4386 bool isHASubscribe_ =
true)
4388 isHASubscribe_ &= (bool)_subscriptionManager;
4389 Message& message = command_.getMessage();
4390 unsigned systemAddedAcks = (handler_.isValid() || command_.hasProcessedAck()) ?
4391 Message::AckType::Processed : Message::AckType::None;
4392 unsigned requestedAcks = message.getAckTypeEnum();
4393 bool isPublishStore = _publishStore.isValid() && command_.needsSequenceNumber();
4394 Message::Command::Type commandType = message.getCommandEnum();
4395 if (commandType == Message::Command::SOW
4396 || commandType == Message::Command::SOWAndSubscribe
4397 || commandType == Message::Command::SOWAndDeltaSubscribe
4398 || commandType == Message::Command::StopTimer)
4400 systemAddedAcks |= Message::AckType::Completed;
4402 Message::Field cid = message.getCommandId();
4403 if (handler_.isValid() && cid.empty())
4405 cid = message.newCommandId().getCommandId();
4407 if (message.getBookmark().len() > 0)
4409 if (command_.isSubscribe())
4411 Message::Field bookmark = message.getBookmark();
4412 if (_bookmarkStore.isValid())
4414 systemAddedAcks |= Message::AckType::Persisted;
4417 message.setBookmark(_bookmarkStore.getMostRecent(message.getSubscriptionId()));
4422 _bookmarkStore.log(message);
4423 if (!BookmarkRange::isRange(bookmark))
4425 _bookmarkStore.discard(message);
4426 _bookmarkStore.persisted(message.getSubscriptionId(),
4439 systemAddedAcks |= Message::AckType::Persisted;
4441 bool isSubscribe = command_.isSubscribe();
4442 if (handler_.isValid() && !isSubscribe)
4444 _registerHandler(command_, cid, handler_,
4445 requestedAcks, systemAddedAcks, isSubscribe);
4447 bool useSyncSend = cid.len() > 0 && command_.hasProcessedAck();
4450 amps_uint64_t haSequenceNumber = (amps_uint64_t)0;
4451 message.setAckTypeEnum(requestedAcks | systemAddedAcks);
4453 Unlock<Mutex> u(_lock);
4454 haSequenceNumber = _publishStore.store(message);
4456 message.setSequence(haSequenceNumber);
4461 syncAckProcessing((
long)command_.getTimeout(), message,
4466 _send(message, haSequenceNumber);
4469 catch (
const DisconnectedException&)
4476 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(cid));
4484 const Message::Field& subId = message.getSubscriptionId();
4487 Unlock<Mutex> u(_lock);
4488 _subscriptionManager->subscribe(handler_,
4491 if (_badTimeToHASubscribe)
4493 message.setAckTypeEnum(requestedAcks);
4494 return std::string(subId.data(), subId.len());
4497 if (handler_.isValid())
4499 _registerHandler(command_, cid, handler_,
4500 requestedAcks, systemAddedAcks, isSubscribe);
4502 message.setAckTypeEnum(requestedAcks | systemAddedAcks);
4507 syncAckProcessing((
long)command_.getTimeout(), message,
4515 catch (
const DisconnectedException&)
4517 if (!isHASubscribe_)
4519 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(cid));
4520 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(subId));
4521 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(message.getQueryId()));
4522 message.setAckTypeEnum(requestedAcks);
4526 catch (
const TimedOutException&)
4528 AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(cid));
4529 AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(subId));
4530 AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(message.getQueryId()));
4538 Unlock<Mutex> unlock(_lock);
4539 _subscriptionManager->unsubscribe(subId);
4541 if (message.getQueryID().len() > 0)
4543 _routes.removeRoute(message.getQueryID());
4545 _routes.removeRoute(cid);
4546 _routes.removeRoute(subId);
4549 if (subId.len() > 0)
4551 message.setAckTypeEnum(requestedAcks);
4552 return std::string(subId.data(), subId.len());
4557 message.setAckTypeEnum(requestedAcks | systemAddedAcks);
4562 syncAckProcessing((
long)(command_.getTimeout()), message);
4569 catch (
const DisconnectedException&)
4571 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(cid));
4572 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(message.getQueryId()));
4573 message.setAckTypeEnum(requestedAcks);
4578 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(cid));
4579 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(message.getQueryId()));
4580 message.setAckTypeEnum(requestedAcks);
4585 message.setAckTypeEnum(requestedAcks);
4589 MessageStream getEmptyMessageStream(
void);
4591 std::string executeAsync(Command& command_, MessageHandler& handler_,
4592 bool isHASubscribe_ =
true)
4594 Lock<Mutex> lock(_lock);
4595 return executeAsyncNoLock(command_, handler_, isHASubscribe_);
4599 void setAutoAck(
bool isAutoAckEnabled_)
4601 _isAutoAckEnabled = isAutoAckEnabled_;
4603 bool getAutoAck(
void)
const
4605 return _isAutoAckEnabled;
4607 void setAckBatchSize(
const unsigned batchSize_)
4609 _ackBatchSize = batchSize_;
4610 if (!_queueAckTimeout)
4612 _queueAckTimeout = AMPS_DEFAULT_QUEUE_ACK_TIMEOUT;
4616 unsigned getAckBatchSize(
void)
const
4618 return _ackBatchSize;
4620 int getAckTimeout(
void)
const
4622 return _queueAckTimeout;
4624 void setAckTimeout(
const int ackTimeout_)
4627 _queueAckTimeout = ackTimeout_;
4629 size_t _ack(QueueBookmarks& queueBookmarks_)
4631 if (queueBookmarks_._bookmarkCount)
4633 if (!publishStoreMessage)
4635 publishStoreMessage =
new Message();
4636 PerThreadMessageTracker::addMessageToCleanupList(publishStoreMessage);
4638 publishStoreMessage->reset();
4643 amps_uint64_t haSequenceNumber = 0;
4644 if (_publishStore.isValid())
4646 haSequenceNumber = _publishStore.store(*publishStoreMessage);
4649 queueBookmarks_._data.erase();
4650 queueBookmarks_._bookmarkCount = 0;
4652 _send(*publishStoreMessage, haSequenceNumber);
4653 if (!_publishStore.isValid())
4655 queueBookmarks_._data.erase();
4656 queueBookmarks_._bookmarkCount = 0;
4662 void ack(
const Field& topic_,
const Field& bookmark_,
const char* options_ = NULL)
4664 if (_isAutoAckEnabled)
4668 _ack(topic_, bookmark_, options_);
4670 void _ack(
const Field& topic_,
const Field& bookmark_,
const char* options_ = NULL)
4672 if (bookmark_.len() == 0)
4676 Lock<Mutex> lock(_lock);
4677 if (_ackBatchSize < 2 || options_ != NULL)
4679 if (!publishStoreMessage)
4681 publishStoreMessage =
new Message();
4682 PerThreadMessageTracker::addMessageToCleanupList(publishStoreMessage);
4684 publishStoreMessage->reset();
4692 amps_uint64_t haSequenceNumber = 0;
4693 if (_publishStore.isValid())
4695 haSequenceNumber = _publishStore.store(*publishStoreMessage);
4699 _send(*publishStoreMessage, haSequenceNumber);
4703 topic_hash hash = CRC<0>::crcNoSSE(topic_.data(), topic_.len());
4704 TopicHashMap::iterator it = _topicHashMap.find(hash);
4705 if (it == _topicHashMap.end())
4708 #ifdef AMPS_USE_EMPLACE
4709 it = _topicHashMap.emplace(TopicHashMap::value_type(hash, QueueBookmarks(topic_))).first;
4711 it = _topicHashMap.insert(TopicHashMap::value_type(hash, QueueBookmarks(topic_))).first;
4714 QueueBookmarks& queueBookmarks = it->second;
4715 if (queueBookmarks._data.length())
4717 queueBookmarks._data.append(
",");
4721 queueBookmarks._oldestTime = amps_now();
4723 queueBookmarks._data.append(bookmark_);
4724 if (++queueBookmarks._bookmarkCount >= _ackBatchSize)
4726 _ack(queueBookmarks);
4729 void flushAcks(
void)
4731 size_t sendCount = 0;
4738 Lock<Mutex> lock(_lock);
4739 typedef TopicHashMap::iterator iterator;
4740 for (iterator it = _topicHashMap.begin(), end = _topicHashMap.end(); it != end; ++it)
4742 QueueBookmarks& queueBookmarks = it->second;
4743 sendCount += _ack(queueBookmarks);
4746 if (sendCount && _connected)
4748 publishFlush(0, Message::AckType::Processed);
4752 void checkQueueAcks(
void)
4754 if (!_topicHashMap.size())
4758 Lock<Mutex> lock(_lock);
4761 amps_uint64_t threshold = amps_now()
4762 - (amps_uint64_t)_queueAckTimeout;
4763 typedef TopicHashMap::iterator iterator;
4764 for (iterator it = _topicHashMap.begin(), end = _topicHashMap.end(); it != end; ++it)
4766 QueueBookmarks& queueBookmarks = it->second;
4767 if (queueBookmarks._bookmarkCount && queueBookmarks._oldestTime < threshold)
4769 _ack(queueBookmarks);
4773 catch (std::exception& ex)
4775 AMPS_UNHANDLED_EXCEPTION(ex);
4779 void deferredExecution(DeferredExecutionFunc func_,
void* userData_)
4781 Lock<Mutex> lock(_deferredExecutionLock);
4782 #ifdef AMPS_USE_EMPLACE
4783 _deferredExecutionList.emplace_back(
4784 DeferredExecutionRequest(func_, userData_));
4786 _deferredExecutionList.push_back(
4787 DeferredExecutionRequest(func_, userData_));
4791 inline void processDeferredExecutions(
void)
4793 if (_deferredExecutionList.size())
4795 Lock<Mutex> lock(_deferredExecutionLock);
4796 DeferredExecutionList::iterator it = _deferredExecutionList.begin();
4797 DeferredExecutionList::iterator end = _deferredExecutionList.end();
4798 for (; it != end; ++it)
4802 it->_func(it->_userData);
4810 _deferredExecutionList.clear();
4811 _routes.invalidateCache();
4812 _routeCache.invalidateCache();
4816 bool getRetryOnDisconnect(
void)
const
4818 return _isRetryOnDisconnect;
4821 void setRetryOnDisconnect(
bool isRetryOnDisconnect_)
4823 _isRetryOnDisconnect = isRetryOnDisconnect_;
4826 void setDefaultMaxDepth(
unsigned maxDepth_)
4828 _defaultMaxDepth = maxDepth_;
4831 unsigned getDefaultMaxDepth(
void)
const
4833 return _defaultMaxDepth;
4925 RefHandle<MessageStreamImpl> _body;
4935 inline void advance(
void);
4942 : _pStream(pStream_)
4947 bool operator==(
const iterator& rhs)
const
4949 return _pStream == rhs._pStream;
4951 bool operator!=(
const iterator& rhs)
const
4953 return _pStream != rhs._pStream;
4955 void operator++(
void)
4971 return _body.isValid();
4978 if (!_body.isValid())
4980 throw UsageException(
"This MessageStream is not valid and cannot be iterated.");
5019 inline void setSOWOnly(
const std::string& commandId_,
5020 const std::string& queryId_ =
"");
5021 inline void setSubscription(
const std::string& subId_,
5022 const std::string& commandId_ =
"",
5023 const std::string& queryId_ =
"");
5024 inline void setStatsOnly(
const std::string& commandId_,
5025 const std::string& queryId_ =
"");
5026 inline void setAcksOnly(
const std::string& commandId_,
unsigned acks_);
5058 BorrowRefHandle<ClientImpl> _body;
5060 static const int DEFAULT_COMMAND_TIMEOUT = AMPS_DEFAULT_COMMAND_TIMEOUT;
5061 static const int DEFAULT_BATCH_SIZE = AMPS_DEFAULT_BATCH_SIZE;
5062 static const int DEFAULT_TOP_N = AMPS_DEFAULT_TOP_N;
5073 : _body(new ClientImpl(clientName), true)
5076 Client(ClientImpl* existingClient)
5077 : _body(existingClient, true)
5080 Client(ClientImpl* existingClient,
bool isRef)
5081 : _body(existingClient, isRef)
5085 virtual ~
Client(
void) {;}
5095 return _body.isValid();
5112 _body.get().setName(name);
5119 return _body.get().getName();
5127 return _body.get().getNameHash();
5135 return _body.get().getNameHashValue();
5146 _body.get().setLogonCorrelationData(logonCorrelationData_);
5153 return _body.get().getLogonCorrelationData();
5166 return _body.get().getServerVersion();
5177 return _body.get().getServerVersionInfo();
5191 return AMPS::convertVersionToNumber(version_.c_str(), version_.length());
5206 return AMPS::convertVersionToNumber(data_, len_);
5213 return _body.get().getURI();
5237 _body.get().connect(uri);
5244 _body.get().disconnect();
5262 _body.get().send(message);
5275 unsigned requestedAcks_,
bool isSubscribe_)
5277 _body.get().addMessageHandler(commandId_, messageHandler_,
5278 requestedAcks_, isSubscribe_);
5286 return _body.get().removeMessageHandler(commandId_);
5314 return _body.get().send(messageHandler, message, timeout);
5328 _body.get().setDisconnectHandler(disconnectHandler);
5336 return _body.get().getDisconnectHandler();
5345 return _body.get().getConnectionInfo();
5358 _body.get().setBookmarkStore(bookmarkStore_);
5366 return _body.
get().getBookmarkStore();
5374 return _body.get().getSubscriptionManager();
5386 _body.get().setSubscriptionManager(subscriptionManager_);
5410 _body.get().setPublishStore(publishStore_);
5418 return _body.
get().getPublishStore();
5426 _body.get().setGlobalCommandTypeMessageHandler(ClientImpl::GlobalCommandTypeHandlers::DuplicateMessage,
5427 duplicateMessageHandler_);
5441 return _body.get().getDuplicateMessageHandler();
5455 _body.get().setFailedWriteHandler(handler_);
5463 return _body.get().getFailedWriteHandler();
5484 amps_uint64_t
publish(
const std::string& topic_,
const std::string& data_)
5486 return _body.get().publish(topic_.c_str(), topic_.length(),
5487 data_.c_str(), data_.length());
5509 amps_uint64_t
publish(
const char* topic_,
size_t topicLength_,
5510 const char* data_,
size_t dataLength_)
5512 return _body.get().publish(topic_, topicLength_, data_, dataLength_);
5533 amps_uint64_t
publish(
const std::string& topic_,
const std::string& data_,
5534 unsigned long expiration_)
5536 return _body.get().publish(topic_.c_str(), topic_.length(),
5537 data_.c_str(), data_.length(), expiration_);
5560 amps_uint64_t
publish(
const char* topic_,
size_t topicLength_,
5561 const char* data_,
size_t dataLength_,
5562 unsigned long expiration_)
5564 return _body.get().publish(topic_, topicLength_,
5565 data_, dataLength_, expiration_);
5606 void publishFlush(
long timeout_ = 0,
unsigned ackType_ = Message::AckType::Processed)
5608 _body.get().publishFlush(timeout_, ackType_);
5627 amps_uint64_t
deltaPublish(
const std::string& topic_,
const std::string& data_)
5629 return _body.get().deltaPublish(topic_.c_str(), topic_.length(),
5630 data_.c_str(), data_.length());
5651 const char* data_,
size_t dataLength_)
5653 return _body.get().deltaPublish(topic_, topicLength_,
5654 data_, dataLength_);
5673 amps_uint64_t
deltaPublish(
const std::string& topic_,
const std::string& data_,
5674 unsigned long expiration_)
5676 return _body.get().deltaPublish(topic_.c_str(), topic_.length(),
5677 data_.c_str(), data_.length(),
5700 const char* data_,
size_t dataLength_,
5701 unsigned long expiration_)
5703 return _body.get().deltaPublish(topic_, topicLength_,
5704 data_, dataLength_, expiration_);
5724 const char* options_ = NULL)
5726 return _body.get().logon(timeout_, authenticator_, options_);
5741 std::string
logon(
const char* options_,
int timeout_ = 0)
5760 std::string
logon(
const std::string& options_,
int timeout_ = 0)
5786 const std::string& topic_,
5788 const std::string& filter_ =
"",
5789 const std::string& options_ =
"",
5790 const std::string& subId_ =
"")
5792 return _body.get().subscribe(messageHandler_, topic_, timeout_,
5793 filter_,
"", options_, subId_);
5812 long timeout_ = 0,
const std::string& filter_ =
"",
5813 const std::string& options_ =
"",
5814 const std::string& subId_ =
"")
5817 if (_body.get().getDefaultMaxDepth())
5819 result.
maxDepth(_body.get().getDefaultMaxDepth());
5821 result.setSubscription(_body.get().subscribe(
5823 topic_, timeout_, filter_,
"",
5824 options_, subId_,
false));
5844 long timeout_ = 0,
const std::string& filter_ =
"",
5845 const std::string& options_ =
"",
5846 const std::string& subId_ =
"")
5849 if (_body.get().getDefaultMaxDepth())
5851 result.
maxDepth(_body.get().getDefaultMaxDepth());
5853 result.setSubscription(_body.get().subscribe(
5855 topic_, timeout_, filter_,
"",
5856 options_, subId_,
false));
5873 const std::string& topic_,
5875 const std::string& filter_ =
"",
5876 const std::string& options_ =
"",
5877 const std::string& subId_ =
"")
5879 return _body.get().deltaSubscribe(messageHandler_, topic_, timeout_,
5880 filter_,
"", options_, subId_);
5891 long timeout_,
const std::string& filter_ =
"",
5892 const std::string& options_ =
"",
5893 const std::string& subId_ =
"")
5896 if (_body.get().getDefaultMaxDepth())
5898 result.
maxDepth(_body.get().getDefaultMaxDepth());
5900 result.setSubscription(_body.get().deltaSubscribe(
5902 topic_, timeout_, filter_,
"",
5903 options_, subId_,
false));
5909 long timeout_,
const std::string& filter_ =
"",
5910 const std::string& options_ =
"",
5911 const std::string& subId_ =
"")
5914 if (_body.get().getDefaultMaxDepth())
5916 result.
maxDepth(_body.get().getDefaultMaxDepth());
5918 result.setSubscription(_body.get().deltaSubscribe(
5920 topic_, timeout_, filter_,
"",
5921 options_, subId_,
false));
5951 const std::string& topic_,
5953 const std::string& bookmark_,
5954 const std::string& filter_ =
"",
5955 const std::string& options_ =
"",
5956 const std::string& subId_ =
"")
5958 return _body.get().subscribe(messageHandler_, topic_, timeout_,
5959 filter_, bookmark_, options_, subId_);
5980 const std::string& bookmark_,
5981 const std::string& filter_ =
"",
5982 const std::string& options_ =
"",
5983 const std::string& subId_ =
"")
5986 if (_body.get().getDefaultMaxDepth())
5988 result.
maxDepth(_body.get().getDefaultMaxDepth());
5990 result.setSubscription(_body.get().subscribe(
5992 topic_, timeout_, filter_,
5993 bookmark_, options_,
6001 const std::string& bookmark_,
6002 const std::string& filter_ =
"",
6003 const std::string& options_ =
"",
6004 const std::string& subId_ =
"")
6007 if (_body.get().getDefaultMaxDepth())
6009 result.
maxDepth(_body.get().getDefaultMaxDepth());
6011 result.setSubscription(_body.get().subscribe(
6013 topic_, timeout_, filter_,
6014 bookmark_, options_,
6029 return _body.get().unsubscribe(commandId);
6041 return _body.get().unsubscribe();
6075 const std::string& topic_,
6076 const std::string& filter_ =
"",
6077 const std::string& orderBy_ =
"",
6078 const std::string& bookmark_ =
"",
6079 int batchSize_ = DEFAULT_BATCH_SIZE,
6080 int topN_ = DEFAULT_TOP_N,
6081 const std::string& options_ =
"",
6082 long timeout_ = DEFAULT_COMMAND_TIMEOUT)
6084 return _body.get().sow(messageHandler_, topic_, filter_, orderBy_,
6085 bookmark_, batchSize_, topN_, options_,
6113 const std::string& filter_ =
"",
6114 const std::string& orderBy_ =
"",
6115 const std::string& bookmark_ =
"",
6116 int batchSize_ = DEFAULT_BATCH_SIZE,
6117 int topN_ = DEFAULT_TOP_N,
6118 const std::string& options_ =
"",
6119 long timeout_ = DEFAULT_COMMAND_TIMEOUT)
6122 if (_body.get().getDefaultMaxDepth())
6124 result.
maxDepth(_body.get().getDefaultMaxDepth());
6126 result.setSOWOnly(
sow(result.operator
MessageHandler(), topic_, filter_, orderBy_, bookmark_, batchSize_, topN_, options_, timeout_));
6132 const std::string& filter_ =
"",
6133 const std::string& orderBy_ =
"",
6134 const std::string& bookmark_ =
"",
6135 int batchSize_ = DEFAULT_BATCH_SIZE,
6136 int topN_ = DEFAULT_TOP_N,
6137 const std::string& options_ =
"",
6138 long timeout_ = DEFAULT_COMMAND_TIMEOUT)
6141 if (_body.get().getDefaultMaxDepth())
6143 result.
maxDepth(_body.get().getDefaultMaxDepth());
6145 result.setSOWOnly(
sow(result.operator
MessageHandler(), topic_, filter_, orderBy_, bookmark_, batchSize_, topN_, options_, timeout_));
6171 const std::string& topic_,
6173 const std::string& filter_ =
"",
6174 int batchSize_ = DEFAULT_BATCH_SIZE,
6175 int topN_ = DEFAULT_TOP_N)
6177 return _body.get().sow(messageHandler_, topic_, timeout_, filter_,
6203 const std::string& topic_,
6205 const std::string& filter_ =
"",
6206 int batchSize_ = DEFAULT_BATCH_SIZE,
6207 bool oofEnabled_ =
false,
6208 int topN_ = DEFAULT_TOP_N)
6210 return _body.get().sowAndSubscribe(messageHandler_, topic_, timeout_,
6211 filter_, batchSize_, oofEnabled_,
6236 const std::string& filter_ =
"",
6237 int batchSize_ = DEFAULT_BATCH_SIZE,
6238 bool oofEnabled_ =
false,
6239 int topN_ = DEFAULT_TOP_N)
6242 if (_body.get().getDefaultMaxDepth())
6244 result.
maxDepth(_body.get().getDefaultMaxDepth());
6246 result.setSubscription(_body.get().sowAndSubscribe(
6248 topic_, timeout_, filter_,
6249 batchSize_, oofEnabled_,
6274 const std::string& filter_ =
"",
6275 int batchSize_ = DEFAULT_BATCH_SIZE,
6276 bool oofEnabled_ =
false,
6277 int topN_ = DEFAULT_TOP_N)
6280 if (_body.get().getDefaultMaxDepth())
6282 result.
maxDepth(_body.get().getDefaultMaxDepth());
6284 result.setSubscription(_body.get().sowAndSubscribe(
6286 topic_, timeout_, filter_,
6287 batchSize_, oofEnabled_,
6321 const std::string& topic_,
6322 const std::string& filter_ =
"",
6323 const std::string& orderBy_ =
"",
6324 const std::string& bookmark_ =
"",
6325 int batchSize_ = DEFAULT_BATCH_SIZE,
6326 int topN_ = DEFAULT_TOP_N,
6327 const std::string& options_ =
"",
6328 long timeout_ = DEFAULT_COMMAND_TIMEOUT)
6330 return _body.get().sowAndSubscribe(messageHandler_, topic_, filter_,
6331 orderBy_, bookmark_, batchSize_,
6332 topN_, options_, timeout_);
6360 const std::string& filter_ =
"",
6361 const std::string& orderBy_ =
"",
6362 const std::string& bookmark_ =
"",
6363 int batchSize_ = DEFAULT_BATCH_SIZE,
6364 int topN_ = DEFAULT_TOP_N,
6365 const std::string& options_ =
"",
6366 long timeout_ = DEFAULT_COMMAND_TIMEOUT)
6369 if (_body.get().getDefaultMaxDepth())
6371 result.
maxDepth(_body.get().getDefaultMaxDepth());
6373 result.setSubscription(_body.get().sowAndSubscribe(
6375 topic_, filter_, orderBy_,
6376 bookmark_, batchSize_, topN_,
6377 options_, timeout_,
false));
6383 const std::string& filter_ =
"",
6384 const std::string& orderBy_ =
"",
6385 const std::string& bookmark_ =
"",
6386 int batchSize_ = DEFAULT_BATCH_SIZE,
6387 int topN_ = DEFAULT_TOP_N,
6388 const std::string& options_ =
"",
6389 long timeout_ = DEFAULT_COMMAND_TIMEOUT)
6392 if (_body.get().getDefaultMaxDepth())
6394 result.
maxDepth(_body.get().getDefaultMaxDepth());
6396 result.setSubscription(_body.get().sowAndSubscribe(
6398 topic_, filter_, orderBy_,
6399 bookmark_, batchSize_, topN_,
6400 options_, timeout_,
false));
6429 const std::string& topic_,
6430 const std::string& filter_ =
"",
6431 const std::string& orderBy_ =
"",
6432 int batchSize_ = DEFAULT_BATCH_SIZE,
6433 int topN_ = DEFAULT_TOP_N,
6434 const std::string& options_ =
"",
6435 long timeout_ = DEFAULT_COMMAND_TIMEOUT)
6437 return _body.get().sowAndDeltaSubscribe(messageHandler_, topic_,
6438 filter_, orderBy_, batchSize_,
6439 topN_, options_, timeout_);
6462 const std::string& filter_ =
"",
6463 const std::string& orderBy_ =
"",
6464 int batchSize_ = DEFAULT_BATCH_SIZE,
6465 int topN_ = DEFAULT_TOP_N,
6466 const std::string& options_ =
"",
6467 long timeout_ = DEFAULT_COMMAND_TIMEOUT)
6470 if (_body.get().getDefaultMaxDepth())
6472 result.
maxDepth(_body.get().getDefaultMaxDepth());
6475 result.setSubscription(_body.get().sowAndDeltaSubscribe(
6477 topic_, filter_, orderBy_,
6478 batchSize_, topN_, options_,
6485 const std::string& filter_ =
"",
6486 const std::string& orderBy_ =
"",
6487 int batchSize_ = DEFAULT_BATCH_SIZE,
6488 int topN_ = DEFAULT_TOP_N,
6489 const std::string& options_ =
"",
6490 long timeout_ = DEFAULT_COMMAND_TIMEOUT)
6493 if (_body.get().getDefaultMaxDepth())
6495 result.
maxDepth(_body.get().getDefaultMaxDepth());
6497 result.setSubscription(_body.get().sowAndDeltaSubscribe(
6499 topic_, filter_, orderBy_,
6500 batchSize_, topN_, options_,
6530 const std::string& topic_,
6532 const std::string& filter_ =
"",
6533 int batchSize_ = DEFAULT_BATCH_SIZE,
6534 bool oofEnabled_ =
false,
6535 bool sendEmpties_ =
false,
6536 int topN_ = DEFAULT_TOP_N)
6538 return _body.get().sowAndDeltaSubscribe(messageHandler_, topic_,
6539 timeout_, filter_, batchSize_,
6540 oofEnabled_, sendEmpties_,
6567 const std::string& filter_ =
"",
6568 int batchSize_ = DEFAULT_BATCH_SIZE,
6569 bool oofEnabled_ =
false,
6570 bool sendEmpties_ =
false,
6571 int topN_ = DEFAULT_TOP_N)
6574 if (_body.get().getDefaultMaxDepth())
6576 result.
maxDepth(_body.get().getDefaultMaxDepth());
6578 result.setSubscription(_body.get().sowAndDeltaSubscribe(
6580 topic_, timeout_, filter_,
6581 batchSize_, oofEnabled_,
6582 sendEmpties_, topN_,
false));
6608 const std::string& filter_ =
"",
6609 int batchSize_ = DEFAULT_BATCH_SIZE,
6610 bool oofEnabled_ =
false,
6611 bool sendEmpties_ =
false,
6612 int topN_ = DEFAULT_TOP_N)
6615 if (_body.get().getDefaultMaxDepth())
6617 result.
maxDepth(_body.get().getDefaultMaxDepth());
6619 result.setSubscription(_body.get().sowAndDeltaSubscribe(
6621 topic_, timeout_, filter_,
6622 batchSize_, oofEnabled_,
6623 sendEmpties_, topN_,
false));
6646 const std::string& topic,
6647 const std::string& filter,
6650 return _body.get().sowDelete(messageHandler, topic, filter, timeout);
6678 stream.setStatsOnly(cid);
6679 _body.get().sowDelete(stream.operator
MessageHandler(), topic, filter, timeout, cid);
6680 return *(stream.
begin());
6682 catch (
const DisconnectedException&)
6695 _body.get().startTimer();
6706 return _body.get().stopTimer(messageHandler);
6731 const std::string& topic_,
6732 const std::string& keys_,
6735 return _body.get().sowDeleteByKeys(messageHandler_, topic_, keys_, timeout_);
6767 stream.setStatsOnly(cid);
6768 _body.get().sowDeleteByKeys(stream.operator
MessageHandler(), topic_, keys_, timeout_, cid);
6769 return *(stream.
begin());
6771 catch (
const DisconnectedException&)
6793 const std::string& topic_,
const std::string& data_,
6796 return _body.get().sowDeleteByData(messageHandler_, topic_, data_, timeout_);
6823 stream.setStatsOnly(cid);
6824 _body.get().sowDeleteByData(stream.operator
MessageHandler(), topic_, data_, timeout_, cid);
6825 return *(stream.
begin());
6827 catch (
const DisconnectedException&)
6839 return _body.get().getHandle();
6852 _body.get().setExceptionListener(pListener_);
6865 _body.get().setExceptionListener(listener_);
6872 return _body.get().getExceptionListener();
6898 _body.get().setHeartbeat(heartbeatTime_, readTimeout_);
6922 _body.get().setHeartbeat(heartbeatTime_, 2 * heartbeatTime_);
6935 _body.get().setGlobalCommandTypeMessageHandler(ClientImpl::GlobalCommandTypeHandlers::LastChance,
6961 _body.get().setGlobalCommandTypeMessageHandler(command_, handler_);
6986 _body.get().setGlobalCommandTypeMessageHandler(command_, handler_);
7069 _body.get().addConnectionStateListener(listener);
7077 _body.get().removeConnectionStateListener(listener);
7084 _body.get().clearConnectionStateListeners();
7114 return _body.get().executeAsync(command_, handler_);
7152 if (command_.isSubscribe())
7154 Message& message = command_.getMessage();
7157 if (useExistingHandler)
7160 if (_body.get()._routes.getRoute(subId, existingHandler))
7163 _body.get().executeAsync(command_, existingHandler,
false);
7168 id = _body.get().executeAsync(command_, handler_,
false);
7170 catch (
const DisconnectedException&)
7173 if (command_.isSubscribe())
7177 if (command_.isSow())
7210 _body.get().ack(topic_, bookmark_, options_);
7232 void ack(
const std::string& topic_,
const std::string& bookmark_,
7233 const char* options_ = NULL)
7235 _body.get().ack(
Field(topic_.data(), topic_.length()),
Field(bookmark_.data(), bookmark_.length()), options_);
7243 void ackDeferredAutoAck(
Field& topic_,
Field& bookmark_,
const char* options_ = NULL)
7245 _body.get()._ack(topic_, bookmark_, options_);
7258 _body.get().flushAcks();
7267 return _body.get().getAutoAck();
7277 _body.get().setAutoAck(isAutoAckEnabled_);
7285 return _body.get().getAckBatchSize();
7295 _body.get().setAckBatchSize(ackBatchSize_);
7306 return _body.get().getAckTimeout();
7318 if (!ackTimeout_ && _body.get().getAckBatchSize() > 1)
7320 throw UsageException(
"Ack timeout must be > 0 when ack batch size > 1");
7322 _body.get().setAckTimeout(ackTimeout_);
7336 _body.get().setRetryOnDisconnect(isRetryOnDisconnect_);
7345 return _body.get().getRetryOnDisconnect();
7354 _body.get().setDefaultMaxDepth(maxDepth_);
7363 return _body.get().getDefaultMaxDepth();
7375 return _body.get().setTransportFilterFunction(filter_, userData_);
7389 return _body.get().setThreadCreatedCallback(callback_, userData_);
7397 void deferredExecution(DeferredExecutionFunc func_,
void* userData_)
7399 _body.get().deferredExecution(func_, userData_);
7409 AMPS_CALL_EXCEPTION_WRAPPER(_globalCommandTypeHandlers[GlobalCommandTypeHandlers::LastChance].invoke(message));
7415 unsigned deliveries = 0;
7427 const char* data = NULL;
7429 const char* status = NULL;
7430 size_t statusLen = 0;
7432 const size_t NotEntitled = 12, Duplicate = 9, Failure = 7;
7435 if (len == NotEntitled || len == Duplicate ||
7436 (statusLen == Failure && status[0] ==
'f'))
7438 if (_failedWriteHandler)
7440 if (_publishStore.isValid())
7442 amps_uint64_t sequence =
7444 FailedWriteStoreReplayer replayer(
this, data, len);
7445 AMPS_CALL_EXCEPTION_WRAPPER(_publishStore.replaySingle(
7446 replayer, sequence));
7450 static Message emptyMessage;
7452 AMPS_CALL_EXCEPTION_WRAPPER(
7453 _failedWriteHandler->failedWrite(emptyMessage,
7459 if (_publishStore.isValid())
7468 AMPS_CALL_EXCEPTION_WRAPPER(_publishStore.discardUpTo(seq));
7472 if (!deliveries && _bookmarkStore.isValid())
7478 Message::Field subId(data, len);
7479 const char* bookmarkData = NULL;
7480 size_t bookmarkLen = 0;
7486 if (bookmarkLen > 0 && _routes.hasRoute(subId))
7489 _bookmarkStore.persisted(subId, Message::Field(bookmarkData, bookmarkLen));
7494 catch (std::exception& ex)
7496 AMPS_UNHANDLED_EXCEPTION(ex);
7502 ClientImpl::processedAck(Message& message)
7504 unsigned deliveries = 0;
7506 const char* data = NULL;
7510 Lock<Mutex> l(_lock);
7513 Lock<Mutex> guard(_ackMapLock);
7514 AckMap::iterator i = _ackMap.find(std::string(data, len));
7515 if (i != _ackMap.end())
7525 ack.setStatus(data, len);
7527 ack.setReason(data, len);
7529 ack.setUsername(data, len);
7531 ack.setPassword(data, len);
7533 ack.setServerVersion(data, len);
7535 ack.setOptions(data, len);
7537 ack.setBookmark(message.getBookmark());
7545 ClientImpl::checkAndSendHeartbeat(
bool force)
7547 if (force || _heartbeatTimer.check())
7549 _heartbeatTimer.start();
7552 sendWithoutRetry(_beatMessage);
7554 catch (
const AMPSException&)
7561 inline ConnectionInfo ClientImpl::getConnectionInfo()
const
7563 ConnectionInfo info;
7564 std::ostringstream writer;
7566 info[
"client.uri"] = _lastUri;
7567 info[
"client.name"] = _name;
7568 info[
"client.username"] = _username;
7569 if (_publishStore.isValid())
7571 writer << _publishStore.unpersistedCount();
7572 info[
"publishStore.unpersistedCount"] = writer.str();
7581 ClientImpl::ClientImplMessageHandler(
amps_handle messageHandle_,
void* userData_)
7583 const unsigned SOWMask = Message::Command::SOW | Message::Command::GroupBegin | Message::Command::GroupEnd;
7584 const unsigned PublishMask = Message::Command::OOF | Message::Command::Publish | Message::Command::DeltaPublish;
7585 ClientImpl* me = (ClientImpl*) userData_;
7586 AMPS_CALL_EXCEPTION_WRAPPER_2(me, me->processDeferredExecutions());
7587 if (!messageHandle_)
7589 if (me->_queueAckTimeout)
7591 me->checkQueueAcks();
7596 me->_readMessage.replace(messageHandle_);
7597 Message& message = me->_readMessage;
7598 Message::Command::Type commandType = message.getCommandEnum();
7599 if (commandType & SOWMask)
7605 AMPS_CALL_EXCEPTION_WRAPPER_2(me,
7606 me->_globalCommandTypeHandlers[1 + (commandType / 8192)].invoke(message));
7608 AMPS_CALL_EXCEPTION_WRAPPER_2(me, me->_routes.deliverData(message,
7609 message.getQueryID()));
7611 else if (commandType & PublishMask)
7614 AMPS_CALL_EXCEPTION_WRAPPER_2(me,
7615 me->_globalCommandTypeHandlers[(commandType == Message::Command::Publish ?
7616 GlobalCommandTypeHandlers::Publish :
7617 GlobalCommandTypeHandlers::OOF)].invoke(message));
7619 const char* subIds = NULL;
7620 size_t subIdsLen = 0;
7623 &subIds, &subIdsLen);
7624 size_t subIdCount = me->_routes.parseRoutes(
AMPS::Field(subIds, subIdsLen), me->_routeCache);
7625 for (
size_t i = 0; i < subIdCount; ++i)
7627 MessageRouter::RouteCache::value_type& lookupResult = me->_routeCache[i];
7628 MessageHandler& handler = lookupResult.handler;
7629 if (handler.isValid())
7632 AMPS_SubscriptionId,
7633 subIds + lookupResult.idOffset,
7634 lookupResult.idLength);
7635 Message::Field bookmark = message.getBookmark();
7636 bool isMessageQueue = message.getLeasePeriod().len() != 0;
7637 bool isAutoAck = me->_isAutoAckEnabled;
7639 if (!isMessageQueue && !bookmark.empty() &&
7640 me->_bookmarkStore.isValid())
7642 if (me->_bookmarkStore.isDiscarded(me->_readMessage))
7645 if (me->_globalCommandTypeHandlers[GlobalCommandTypeHandlers::DuplicateMessage].isValid())
7647 AMPS_CALL_EXCEPTION_WRAPPER_2(me, me->_globalCommandTypeHandlers[GlobalCommandTypeHandlers::DuplicateMessage].invoke(message));
7652 me->_bookmarkStore.log(me->_readMessage);
7653 AMPS_CALL_EXCEPTION_WRAPPER_2(me,
7654 handler.invoke(message));
7659 if (isMessageQueue && isAutoAck)
7663 AMPS_CALL_EXCEPTION_WRAPPER_STREAM_FULL_2(me, handler.invoke(message));
7664 if (!message.getIgnoreAutoAck())
7666 AMPS_CALL_EXCEPTION_WRAPPER_2(me,
7667 me->_ack(message.getTopic(), message.getBookmark()));
7670 catch (std::exception& ex)
7672 if (!message.getIgnoreAutoAck())
7674 AMPS_CALL_EXCEPTION_WRAPPER_2(me,
7675 me->_ack(message.getTopic(), message.getBookmark(),
"cancel"));
7677 AMPS_UNHANDLED_EXCEPTION_2(me, ex);
7682 AMPS_CALL_EXCEPTION_WRAPPER_2(me,
7683 handler.invoke(message));
7689 me->lastChance(message);
7693 else if (commandType == Message::Command::Ack)
7695 AMPS_CALL_EXCEPTION_WRAPPER_2(me,
7696 me->_globalCommandTypeHandlers[GlobalCommandTypeHandlers::Ack].invoke(message));
7697 unsigned ackType = message.getAckTypeEnum();
7698 unsigned deliveries = 0U;
7701 case Message::AckType::Persisted:
7702 deliveries += me->persistedAck(message);
7704 case Message::AckType::Processed:
7705 deliveries += me->processedAck(message);
7708 AMPS_CALL_EXCEPTION_WRAPPER_2(me, deliveries += me->_routes.deliverAck(message, ackType));
7709 if (deliveries == 0)
7711 me->lastChance(message);
7714 else if (commandType == Message::Command::Heartbeat)
7716 AMPS_CALL_EXCEPTION_WRAPPER_2(me,
7717 me->_globalCommandTypeHandlers[GlobalCommandTypeHandlers::Heartbeat].invoke(message));
7718 if (me->_heartbeatTimer.getTimeout() != 0.0)
7720 me->checkAndSendHeartbeat(
true);
7724 me->lastChance(message);
7728 else if (!message.getCommandId().empty())
7730 unsigned deliveries = 0U;
7733 while (me->_connected)
7737 deliveries = me->_routes.deliverData(message, message.getCommandId());
7741 catch (MessageStreamFullException&)
7743 catch (MessageStreamFullException& ex_)
7746 me->checkAndSendHeartbeat(
false);
7750 catch (std::exception& ex_)
7754 me->_exceptionListener->exceptionThrown(ex_);
7761 if (deliveries == 0)
7763 me->lastChance(message);
7766 me->checkAndSendHeartbeat();
7771 ClientImpl::ClientImplPreDisconnectHandler(
amps_handle ,
unsigned failedConnectionVersion,
void* userData)
7773 ClientImpl* me = (ClientImpl*) userData;
7776 me->clearAcks(failedConnectionVersion);
7780 ClientImpl::ClientImplDisconnectHandler(
amps_handle ,
void* userData)
7782 ClientImpl* me = (ClientImpl*) userData;
7783 Lock<Mutex> l(me->_lock);
7784 Client wrapper(me,
false);
7787 me->broadcastConnectionStateChanged(ConnectionStateListener::Disconnected);
7791 AtomicFlagFlip subFlip(&me->_badTimeToHASubscribe);
7794 AtomicFlagFlip pubFlip(&me->_badTimeToHAPublish);
7795 me->_connected =
false;
7799 Unlock<Mutex> unlock(me->_lock);
7800 me->_disconnectHandler.invoke(wrapper);
7803 catch (
const std::exception& ex)
7805 AMPS_UNHANDLED_EXCEPTION_2(me, ex);
7807 me->_lock.signalAll();
7809 if (!me->_connected)
7811 me->broadcastConnectionStateChanged(ConnectionStateListener::Shutdown);
7812 AMPS_UNHANDLED_EXCEPTION_2(me, DisconnectedException(
"Reconnect failed."));
7818 if (me->_subscriptionManager)
7823 Unlock<Mutex> unlock(me->_lock);
7824 me->_subscriptionManager->resubscribe(wrapper);
7826 me->broadcastConnectionStateChanged(ConnectionStateListener::Resubscribed);
7830 catch (
const AMPSException& subEx)
7832 AMPS_UNHANDLED_EXCEPTION_2(me, subEx);
7834 catch (
const std::exception& subEx)
7836 AMPS_UNHANDLED_EXCEPTION_2(me, subEx);
7859 iterator(
const char* data_,
size_t len_,
size_t pos_,
char fieldSep_)
7860 : _data(data_), _len(len_), _pos(pos_), _fieldSep(fieldSep_)
7862 while (_pos != _len && _data[_pos] == _fieldSep)
7868 typedef void* difference_type;
7869 typedef std::forward_iterator_tag iterator_category;
7870 typedef std::pair<Message::Field, Message::Field> value_type;
7871 typedef value_type* pointer;
7872 typedef value_type& reference;
7873 bool operator==(
const iterator& rhs)
const
7875 return _pos == rhs._pos;
7877 bool operator!=(
const iterator& rhs)
const
7879 return _pos != rhs._pos;
7881 iterator& operator++()
7884 while (_pos != _len && _data[_pos] != _fieldSep)
7889 while (_pos != _len && _data[_pos] == _fieldSep)
7896 value_type operator*()
const
7899 size_t i = _pos, keyLength = 0, valueStart = 0, valueLength = 0;
7900 for (; i < _len && _data[i] !=
'='; ++i)
7905 result.first.assign(_data + _pos, keyLength);
7907 if (i < _len && _data[i] ==
'=')
7911 for (; i < _len && _data[i] != _fieldSep; ++i)
7916 result.second.assign(_data + valueStart, valueLength);
7922 class reverse_iterator
7929 typedef std::pair<Message::Field, Message::Field> value_type;
7930 reverse_iterator(
const char* data,
size_t len,
const char* pos,
char fieldsep)
7931 : _data(data), _len(len), _pos(pos), _fieldSep(fieldsep)
7936 while (_pos >= _data && *_pos == _fieldSep)
7940 while (_pos > _data && *_pos != _fieldSep)
7947 if (_pos > _data || (_pos == _data && *_pos == _fieldSep))
7957 bool operator==(
const reverse_iterator& rhs)
const
7959 return _pos == rhs._pos;
7961 bool operator!=(
const reverse_iterator& rhs)
const
7963 return _pos != rhs._pos;
7965 reverse_iterator& operator++()
7976 while (_pos >= _data && *_pos == _fieldSep)
7981 while (_pos > _data && *_pos != _fieldSep)
7985 if (_pos > _data || (_pos == _data && *_pos == _fieldSep))
7996 value_type operator*()
const
7999 size_t keyLength = 0, valueStart = 0, valueLength = 0;
8000 size_t i = (size_t)(_pos - _data);
8001 for (; i < _len && _data[i] !=
'='; ++i)
8005 result.first.assign(_pos, keyLength);
8006 if (i < _len && _data[i] ==
'=')
8010 for (; i < _len && _data[i] != _fieldSep; ++i)
8015 result.second.assign(_data + valueStart, valueLength);
8019 FIX(
const Message::Field& data,
char fieldSeparator = 1)
8020 : _data(data.data()), _len(data.len()),
8021 _fieldSep(fieldSeparator)
8025 FIX(
const char* data,
size_t len,
char fieldSeparator = 1)
8026 : _data(data), _len(len), _fieldSep(fieldSeparator)
8030 iterator begin()
const
8032 return iterator(_data, _len, 0, _fieldSep);
8034 iterator end()
const
8036 return iterator(_data, _len, _len, _fieldSep);
8040 reverse_iterator rbegin()
const
8042 return reverse_iterator(_data, _len, _data + (_len - 1), _fieldSep);
8045 reverse_iterator rend()
const
8047 return reverse_iterator(_data, _len, 0, _fieldSep);
8068 std::stringstream _data;
8085 void append(
const T& tag,
const char* value,
size_t offset,
size_t length)
8087 _data << tag <<
'=';
8088 _data.write(value + offset, (std::streamsize)length);
8096 void append(
const T& tag,
const std::string& value)
8098 _data << tag <<
'=' << value << _fs;
8107 operator std::string()
const
8115 _data.str(std::string());
8152 typedef std::map<Message::Field, Message::Field>
map_type;
8163 for (FIX::iterator a = fix.begin(); a != fix.end(); ++a)
8172 #define AMPS_MESSAGE_STREAM_CACHE_MAX 128
8176 std::deque<Message> _q;
8177 std::deque<Message> _cache;
8178 std::string _commandId;
8180 std::string _queryId;
8184 unsigned _requestedAcks;
8186 Message::Field _previousTopic;
8187 Message::Field _previousBookmark;
8188 typedef enum :
unsigned int { Unset = 0x0, Running = 0x10, Subscribe = 0x11, SOWOnly = 0x12, AcksOnly = 0x13, Conflate = 0x14, Closed = 0x1, Disconnected = 0x2 } State;
8189 #if __cplusplus >= 201100L || _MSC_VER >= 1900
8190 std::atomic<State> _state;
8192 volatile State _state;
8194 typedef std::map<std::string, Message*> SOWKeyMap;
8195 SOWKeyMap _sowKeyMap;
8197 MessageStreamImpl(
const Client& client_)
8200 _maxDepth((unsigned)~0),
8202 _cacheMax(AMPS_MESSAGE_STREAM_CACHE_MAX),
8205 if (_client.isValid())
8207 _client.addConnectionStateListener(
this);
8211 MessageStreamImpl(ClientImpl* client_)
8214 _maxDepth((unsigned)~0),
8218 if (_client.isValid())
8220 _client.addConnectionStateListener(
this);
8224 ~MessageStreamImpl()
8228 virtual void destroy()
8234 catch (std::exception& e)
8238 if (_client.isValid())
8240 _client.getExceptionListener().exceptionThrown(e);
8245 if (_client.isValid())
8247 _client.removeConnectionStateListener(
this);
8249 _client = Client((ClientImpl*)NULL);
8250 c.deferredExecution(MessageStreamImpl::destroyer,
this);
8258 static void destroyer(
void* vpMessageStreamImpl_)
8260 delete ((MessageStreamImpl*)vpMessageStreamImpl_);
8263 void setSubscription(
const std::string& subId_,
8264 const std::string& commandId_ =
"",
8265 const std::string& queryId_ =
"")
8267 Lock<Mutex> lock(_lock);
8269 if (!commandId_.empty() && commandId_ != subId_)
8271 _commandId = commandId_;
8273 if (!queryId_.empty() && queryId_ != subId_ && queryId_ != commandId_)
8275 _queryId = queryId_;
8278 if (Disconnected == _state)
8282 assert(Unset == _state);
8286 void setSOWOnly(
const std::string& commandId_,
8287 const std::string& queryId_ =
"")
8289 Lock<Mutex> lock(_lock);
8290 _commandId = commandId_;
8291 if (!queryId_.empty() && queryId_ != commandId_)
8293 _queryId = queryId_;
8296 if (Disconnected == _state)
8300 assert(Unset == _state);
8304 void setStatsOnly(
const std::string& commandId_,
8305 const std::string& queryId_ =
"")
8307 Lock<Mutex> lock(_lock);
8308 _commandId = commandId_;
8309 if (!queryId_.empty() && queryId_ != commandId_)
8311 _queryId = queryId_;
8314 if (Disconnected == _state)
8318 assert(Unset == _state);
8320 _requestedAcks = Message::AckType::Stats;
8323 void setAcksOnly(
const std::string& commandId_,
unsigned acks_)
8325 Lock<Mutex> lock(_lock);
8326 _commandId = commandId_;
8328 if (Disconnected == _state)
8332 assert(Unset == _state);
8334 _requestedAcks = acks_;
8339 Lock<Mutex> lock(_lock);
8340 if (state_ == AMPS::ConnectionStateListener::Disconnected)
8342 _state = Disconnected;
8348 void timeout(
unsigned timeout_)
8350 _timeout = timeout_;
8354 if (_state == Subscribe)
8359 void maxDepth(
unsigned maxDepth_)
8363 _maxDepth = maxDepth_;
8367 _maxDepth = (unsigned)~0;
8370 unsigned getMaxDepth(
void)
const
8374 unsigned getDepth(
void)
const
8376 return (
unsigned)(_q.size());
8379 bool next(Message& current_)
8381 Lock<Mutex> lock(_lock);
8382 if (!_previousTopic.empty() && !_previousBookmark.empty())
8386 if (_client.isValid())
8388 _client.ackDeferredAutoAck(_previousTopic, _previousBookmark);
8392 catch (AMPSException&)
8394 catch (AMPSException& e)
8397 current_.invalidate();
8398 _previousTopic.clear();
8399 _previousBookmark.clear();
8402 _previousTopic.clear();
8403 _previousBookmark.clear();
8405 double minWaitTime = (double)((_timeout && _timeout > 1000)
8407 Timer timer(minWaitTime);
8409 while (_q.empty() && _state & Running)
8412 _lock.wait((
long)minWaitTime);
8414 Unlock<Mutex> unlck(_lock);
8415 amps_invoke_waiting_function();
8420 if (timer.checkAndGetRemaining(&minWaitTime))
8426 if (current_.isValid() && _cache.size() < _cacheMax)
8429 _cache.push_back(current_);
8433 current_ = _q.front();
8434 if (_q.size() == _maxDepth)
8439 if (_state == Conflate)
8441 std::string sowKey = current_.getSowKey();
8442 if (sowKey.length())
8444 _sowKeyMap.erase(sowKey);
8447 else if (_state == AcksOnly)
8449 _requestedAcks &= ~(current_.getAckTypeEnum());
8451 if ((_state == AcksOnly && _requestedAcks == 0) ||
8452 (_state == SOWOnly && current_.getCommand() ==
"group_end"))
8456 else if (current_.getCommandEnum() == Message::Command::Publish &&
8457 _client.isValid() && _client.getAutoAck() &&
8458 !current_.getLeasePeriod().empty() &&
8459 !current_.getBookmark().empty())
8461 _previousTopic = current_.getTopic().deepCopy();
8462 _previousBookmark = current_.getBookmark().deepCopy();
8466 if (_state == Disconnected)
8468 throw DisconnectedException(
"Connection closed.");
8470 current_.invalidate();
8471 if (_state == Closed)
8475 return _timeout != 0;
8479 if (_client.isValid())
8481 if (_state == SOWOnly || _state == Subscribe)
8483 if (!_commandId.empty())
8485 _client.unsubscribe(_commandId);
8487 if (!_subId.empty())
8489 _client.unsubscribe(_subId);
8491 if (!_queryId.empty())
8493 _client.unsubscribe(_queryId);
8498 if (!_commandId.empty())
8500 _client.removeMessageHandler(_commandId);
8502 if (!_subId.empty())
8504 _client.removeMessageHandler(_subId);
8506 if (!_queryId.empty())
8508 _client.removeMessageHandler(_queryId);
8512 if (_state == SOWOnly || _state == Subscribe || _state == Unset)
8517 static void _messageHandler(
const Message& message_, MessageStreamImpl* this_)
8519 Lock<Mutex> lock(this_->_lock);
8520 if (this_->_state != Conflate)
8522 AMPS_TESTING_SLOW_MESSAGE_STREAM
8523 if (this_->_q.size() >= this_->_maxDepth)
8528 this_->_lock.signalAll();
8529 throw MessageStreamFullException(
"Stream is currently full.");
8531 if (!this_->_cache.empty())
8533 this_->_cache.front().deepCopy(message_);
8534 this_->_q.push_back(this_->_cache.front());
8535 this_->_cache.pop_front();
8539 #ifdef AMPS_USE_EMPLACE
8540 this_->_q.emplace_back(message_.deepCopy());
8542 this_->_q.push_back(message_.deepCopy());
8545 if (message_.getCommandEnum() == Message::Command::Publish &&
8546 this_->_client.isValid() && this_->_client.getAutoAck() &&
8547 !message_.getLeasePeriod().empty() &&
8548 !message_.getBookmark().empty())
8550 message_.setIgnoreAutoAck();
8555 std::string sowKey = message_.getSowKey();
8556 if (sowKey.length())
8558 SOWKeyMap::iterator it = this_->_sowKeyMap.find(sowKey);
8559 if (it != this_->_sowKeyMap.end())
8561 it->second->deepCopy(message_);
8565 if (this_->_q.size() >= this_->_maxDepth)
8571 this_->_lock.signalAll();
8572 throw MessageStreamFullException(
"Stream is currently full.");
8574 if (!this_->_cache.empty())
8576 this_->_cache.front().deepCopy(message_);
8577 this_->_q.push_back(this_->_cache.front());
8578 this_->_cache.pop_front();
8582 #ifdef AMPS_USE_EMPLACE
8583 this_->_q.emplace_back(message_.deepCopy());
8585 this_->_q.push_back(message_.deepCopy());
8588 this_->_sowKeyMap[sowKey] = &(this_->_q.back());
8593 if (this_->_q.size() >= this_->_maxDepth)
8598 this_->_lock.signalAll();
8599 throw MessageStreamFullException(
"Stream is currently full.");
8601 if (!this_->_cache.empty())
8603 this_->_cache.front().deepCopy(message_);
8604 this_->_q.push_back(this_->_cache.front());
8605 this_->_cache.pop_front();
8609 #ifdef AMPS_USE_EMPLACE
8610 this_->_q.emplace_back(message_.deepCopy());
8612 this_->_q.push_back(message_.deepCopy());
8615 if (message_.getCommandEnum() == Message::Command::Publish &&
8616 this_->_client.isValid() && this_->_client.getAutoAck() &&
8617 !message_.getLeasePeriod().empty() &&
8618 !message_.getBookmark().empty())
8620 message_.setIgnoreAutoAck();
8624 this_->_lock.signalAll();
8627 inline MessageStream::MessageStream(
void)
8630 inline MessageStream::MessageStream(
const Client& client_)
8631 : _body(new MessageStreamImpl(client_))
8634 inline void MessageStream::iterator::advance(
void)
8636 _pStream = _pStream->_body->next(_current) ? _pStream : NULL;
8638 inline MessageStream::operator MessageHandler(
void)
8640 return MessageHandler((
void(*)(
const Message&,
void*))MessageStreamImpl::_messageHandler, &_body.get());
8642 inline MessageStream MessageStream::fromExistingHandler(
const MessageHandler& handler_)
8644 MessageStream result;
8645 if (handler_._func == (MessageHandler::FunctionType)MessageStreamImpl::_messageHandler)
8647 result._body = (MessageStreamImpl*)(handler_._userData);
8652 inline void MessageStream::setSOWOnly(
const std::string& commandId_,
8653 const std::string& queryId_)
8655 _body->setSOWOnly(commandId_, queryId_);
8657 inline void MessageStream::setSubscription(
const std::string& subId_,
8658 const std::string& commandId_,
8659 const std::string& queryId_)
8661 _body->setSubscription(subId_, commandId_, queryId_);
8663 inline void MessageStream::setStatsOnly(
const std::string& commandId_,
8664 const std::string& queryId_)
8666 _body->setStatsOnly(commandId_, queryId_);
8668 inline void MessageStream::setAcksOnly(
const std::string& commandId_,
8671 _body->setAcksOnly(commandId_, acks_);
8690 return _body->getMaxDepth();
8694 return _body->getDepth();
8697 inline MessageStream ClientImpl::getEmptyMessageStream(
void)
8699 return *(_pEmptyMessageStream.get());
8707 ClientImpl& body = _body.get();
8708 Message& message = command_.getMessage();
8712 if (useExistingHandler)
8718 if (body._routes.getRoute(subId, existingHandler))
8721 body.executeAsync(command_, existingHandler,
false);
8722 return MessageStream::fromExistingHandler(existingHandler);
8731 if ((command & Message::Command::NoDataCommands)
8732 && (ackTypes == Message::AckType::Persisted
8733 || ackTypes == Message::AckType::None))
8736 if (!body._pEmptyMessageStream)
8738 body._pEmptyMessageStream.reset(
new MessageStream((ClientImpl*)0));
8739 body._pEmptyMessageStream.get()->_body->close();
8741 return body.getEmptyMessageStream();
8744 if (body.getDefaultMaxDepth())
8746 stream.
maxDepth(body.getDefaultMaxDepth());
8749 std::string commandID = body.executeAsync(command_, handler,
false);
8750 if (command_.hasStatsAck())
8752 stream.setStatsOnly(commandID, command_.getMessage().
getQueryId());
8754 else if (command_.isSow())
8756 stream.setSOWOnly(commandID, command_.getMessage().
getQueryId());
8758 else if (command_.isSubscribe())
8760 stream.setSubscription(commandID,
8767 if (command == Message::Command::Publish ||
8768 command == Message::Command::DeltaPublish ||
8769 command == Message::Command::SOWDelete)
8771 stream.setAcksOnly(commandID,
8772 ackTypes & (
unsigned)~Message::AckType::Persisted);
8776 stream.setAcksOnly(commandID, ackTypes);
8783 inline void Message::ack(
const char* options_)
const
8785 ClientImpl* pClient = _body.get().clientImpl();
8787 if (pClient && bookmark.
len() &&
8788 !pClient->getAutoAck())
8791 pClient->ack(getTopic(), bookmark, options_);
Core type and function declarations for the AMPS C client.
AMPSDLL void amps_client_disconnect(amps_handle handle)
Disconnects from the AMPS server, if connected.
AMPSDLL void amps_message_get_field_value(amps_handle message, FieldId field, const amps_char **value_ptr, size_t *length_ptr)
Retrieves the value of a header field in an AMPS message.
AMPSDLL void amps_client_destroy(amps_handle handle)
Disconnects and destroys an AMPS client object.
AMPSDLL amps_result amps_client_connect(amps_handle handle, const amps_char *uri)
Connects to the AMPS server specified in uri.
AMPSDLL amps_result amps_client_send(amps_handle client, amps_handle message)
Sends a message to the AMPS server.
AMPSDLL amps_uint64_t amps_message_get_field_uint64(amps_handle message, FieldId field)
Gets the unsigned 64-bit int value of a header field in an AMPS message.
AMPSDLL amps_result amps_client_set_name(amps_handle handle, const amps_char *clientName)
Sets the name on an amps client object.
amps_result
Return values from amps_xxx functions.
Definition: amps.h:201
@ AMPS_E_RETRY
The operation has not succeeded, but ought to be retried.
Definition: amps.h:229
@ AMPS_E_OK
Success.
Definition: amps.h:205
@ AMPS_E_DISCONNECTED
The client and server are disconnected.
Definition: amps.h:233
AMPSDLL amps_result amps_client_attempt_reconnect(amps_handle client, unsigned version)
Manually invokes the user-supplied disconnect handler for this client.
AMPSDLL amps_result amps_client_set_read_timeout(amps_handle client, int readTimeout)
Sets a read timeout (seconds), in which if no message is received, the connection is presumed dead.
amps_result(* amps_thread_created_callback)(AMPS_THREAD_T, void *)
Prototype for a user-supplied callback function to allow thread attributes to be set when a new threa...
Definition: amps.h:652
AMPSDLL amps_result amps_client_set_transport_filter_function(amps_handle client, amps_transport_filter_function filter, void *userData)
Sets a user-supplied callback function for filtering data before it is sent and after it is received.
AMPSDLL amps_result amps_client_set_thread_created_callback(amps_handle client, amps_thread_created_callback callback, void *userData)
Sets a user-supplied callback function to allow thread attributes to set when a new thread is created...
AMPSDLL void amps_message_set_field_value(amps_handle message, FieldId field, const amps_char *value, size_t length)
Sets the value of a header field in an AMPS message.
AMPSDLL void amps_client_set_predisconnect_handler(amps_handle client, amps_predisconnect_handler predisconnectHandler, void *userData)
Sets the predisconnect handler function to be called when a disconnect occurs.
AMPSDLL amps_result amps_client_send_with_version(amps_handle client, amps_handle message, unsigned *version_out)
Sends a message to the AMPS server.
AMPSDLL void amps_client_set_message_handler(amps_handle client, amps_handler messageHandler, void *userData)
Sets the message handler function for this client.
AMPSDLL AMPS_SOCKET amps_client_get_socket(amps_handle client)
Returns the socket from the underlying transport in client, or NULL if no transport is associated wit...
void * amps_handle
Opaque handle type used to refer to objects in the AMPS api.
Definition: amps.h:195
void(* amps_transport_filter_function)(const unsigned char *, size_t, short, void *)
Prototype for a user-supplied callback function for filtering data before it is sent and after it is ...
Definition: amps.h:626
AMPSDLL amps_result amps_client_set_idle_time(amps_handle client, int idleTime)
Sets an idle-time (milliseconds).
AMPSDLL amps_handle amps_client_create(const amps_char *clientName)
Functions for creation of an AMPS client.
AMPSDLL void amps_client_set_disconnect_handler(amps_handle client, amps_handler disconnectHandler, void *userData)
Sets the disconnect handler function to be called when a disconnect occurs.
bool DangerousFlushPublishStoreResizeHandler(Store store_, size_t, void *data_)
PublishStoreResizeHandler that will block up to the timeout specified in user data milliseconds tryin...
Definition: ampsplusplus.hpp:1373
bool(* PublishStoreResizeHandler)(Store store_, size_t size_, void *userData_)
Function type for PublishStore resize events The store_ param is store which is resizing.
Definition: ampsplusplus.hpp:1053
The interface for handling authentication with the AMPS server.
Definition: ampsplusplus.hpp:969
virtual std::string authenticate(const std::string &userName_, const std::string &password_)=0
Called by Client just before the logon command is sent.
virtual void completed(const std::string &userName_, const std::string &password_, const std::string &reason_)=0
Called by Client once a logon completes successfully.
virtual std::string retry(const std::string &userName_, const std::string &password_)=0
Called by Client when a logon ack is received with a status of retry.
Interface for BookmarkStoreImpl classes.
Definition: BookmarkStore.hpp:229
Client represents a connection to an AMPS server, but does not provide failover or reconnection behav...
Definition: ampsplusplus.hpp:5056
MessageStream sow(const char *topic_, const std::string &filter_="", const std::string &orderBy_="", const std::string &bookmark_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic.
Definition: ampsplusplus.hpp:6131
amps_uint64_t publish(const char *topic_, size_t topicLength_, const char *data_, size_t dataLength_)
Publish a message to an AMPS topic, returning the sequence number assigned by the publish store if on...
Definition: ampsplusplus.hpp:5509
void setExceptionListener(const std::shared_ptr< const ExceptionListener > &pListener_)
Sets the exception listener for exceptions that are not thrown back to the user (for example,...
Definition: ampsplusplus.hpp:6850
std::string logon(const char *options_, int timeout_=0)
Logon to the server, providing the client name, credentials (if available) client information (such a...
Definition: ampsplusplus.hpp:5741
void flushAcks(void)
Sends any queued message queue ack messages to the server immediately.
Definition: ampsplusplus.hpp:7256
void startTimer()
Definition: ampsplusplus.hpp:6693
MessageStream subscribe(const std::string &topic_, long timeout_=0, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Subscribe to a topic.
Definition: ampsplusplus.hpp:5811
void setBookmarkStore(const BookmarkStore &bookmarkStore_)
Set the bookmark store to be used by the client.
Definition: ampsplusplus.hpp:5356
const std::string & getURI() const
Returns the last URI this client is connected to.
Definition: ampsplusplus.hpp:5211
void removeConnectionStateListener(ConnectionStateListener *listener)
Attempts to remove listener from self's set of ConnectionStateListeners.
Definition: ampsplusplus.hpp:7075
amps_uint64_t publish(const std::string &topic_, const std::string &data_, unsigned long expiration_)
Publish a message to an AMPS topic, returning the sequence number assigned by the publish store (if a...
Definition: ampsplusplus.hpp:5533
static const char * BOOKMARK_EPOCH()
Convenience method for returning the special value to start a subscription at the beginning of the tr...
Definition: ampsplusplus.hpp:7013
std::string deltaSubscribe(const MessageHandler &messageHandler_, const std::string &topic_, long timeout_, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Delta Subscribe to a topic.
Definition: ampsplusplus.hpp:5872
SubscriptionManager * getSubscriptionManager()
Get the subscription manager being used by the client.
Definition: ampsplusplus.hpp:5372
std::string bookmarkSubscribe(const MessageHandler &messageHandler_, const std::string &topic_, long timeout_, const std::string &bookmark_, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Subscribe to a topic using a bookmark.
Definition: ampsplusplus.hpp:5950
void unsubscribe()
Unsubscribe from all topics.
Definition: ampsplusplus.hpp:6039
std::string send(const MessageHandler &messageHandler, Message &message, int timeout=0)
Sends a Message to the connected AMPS server, performing only minimal validation and bypassing client...
Definition: ampsplusplus.hpp:5312
void setHeartbeat(unsigned heartbeatTime_)
Requests heartbeating with the AMPS server.
Definition: ampsplusplus.hpp:6920
void setLastChanceMessageHandler(const AMPS::MessageHandler &messageHandler)
Sets the message handler called when no other handler matches.
Definition: ampsplusplus.hpp:6933
static const char * BOOKMARK_MOST_RECENT()
Convenience method for returning the special value to start a subscription at a recovery point based ...
Definition: ampsplusplus.hpp:7034
amps_handle getHandle()
Returns the underlying amps_handle for this client, to be used with amps_client_* functions from the ...
Definition: ampsplusplus.hpp:6837
void setFailedWriteHandler(FailedWriteHandler *handler_)
Set the handler that is invoked to report when a publish fails, for example if the publisher is not e...
Definition: ampsplusplus.hpp:5453
void setName(const std::string &name)
Sets the name of this client, assuming no name was provided previously.
Definition: ampsplusplus.hpp:5110
amps_uint64_t deltaPublish(const std::string &topic_, const std::string &data_)
Publish the changed fields of a message to an AMPS topic.
Definition: ampsplusplus.hpp:5627
FailedWriteHandler * getFailedWriteHandler()
Get the handler that is invoked to report on failed writes.
Definition: ampsplusplus.hpp:5461
void setDefaultMaxDepth(unsigned maxDepth_)
Sets a default max depth on all subsequently created MessageStream objects.
Definition: ampsplusplus.hpp:7352
amps_uint64_t publish(const std::string &topic_, const std::string &data_)
Publish a message to an AMPS topic, returning the sequence number assigned by the publish store if on...
Definition: ampsplusplus.hpp:5484
MessageStream sowAndDeltaSubscribe(const char *topic_, const std::string &filter_="", const std::string &orderBy_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic and initiates a new delta subscription on it.
Definition: ampsplusplus.hpp:6484
void setAckBatchSize(const unsigned ackBatchSize_)
Sets the queue ack batch size setting.
Definition: ampsplusplus.hpp:7293
void disconnect()
Disconnect from an AMPS server.
Definition: ampsplusplus.hpp:5242
static const char * NOW()
Convenience method for returning the special value to start a subscription at the end of the transact...
Definition: ampsplusplus.hpp:7003
amps_uint64_t deltaPublish(const char *topic_, size_t topicLength_, const char *data_, size_t dataLength_)
Publish the changed fields of a message to an AMPS topic.
Definition: ampsplusplus.hpp:5650
static size_t convertVersionToNumber(const char *data_, size_t len_)
Converts a string version, such as "3.8.1.5" into the same numeric form used internally and returned ...
Definition: ampsplusplus.hpp:5204
Client(const std::string &clientName="")
Constructs a new client with a given client name.
Definition: ampsplusplus.hpp:5072
const ExceptionListener & getExceptionListener(void) const
Returns the exception listener set on this Client.
Definition: ampsplusplus.hpp:6870
void unsubscribe(const std::string &commandId)
Unsubscribe from a topic.
Definition: ampsplusplus.hpp:6027
void setExceptionListener(const ExceptionListener &listener_)
Definition: ampsplusplus.hpp:6863
DisconnectHandler getDisconnectHandler(void) const
Returns the callback function that is invoked when a disconnect occurs.
Definition: ampsplusplus.hpp:5334
void setDisconnectHandler(const DisconnectHandler &disconnectHandler)
Sets the function to be called when the client is unintentionally disconnected.
Definition: ampsplusplus.hpp:5326
int getAckTimeout(void) const
Returns the current value of the message queue ack timeout setting – that is, the amount of time afte...
Definition: ampsplusplus.hpp:7304
MessageStream bookmarkSubscribe(const std::string &topic_, long timeout_, const std::string &bookmark_, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Subscribe to a topic using a bookmark.
Definition: ampsplusplus.hpp:5978
bool getRetryOnDisconnect(void) const
Returns true if automatic retry of a command to AMPS after a reconnect is enabled.
Definition: ampsplusplus.hpp:7343
BookmarkStore getBookmarkStore()
Get the bookmark store being used by the client.
Definition: ampsplusplus.hpp:5364
MessageStream subscribe(const char *topic_, long timeout_=0, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Subscribe to a topic.
Definition: ampsplusplus.hpp:5843
const std::string & getLogonCorrelationData() const
Returns the currently set logon correlation data for the client.
Definition: ampsplusplus.hpp:5151
std::string logon(int timeout_=0, Authenticator &authenticator_=DefaultAuthenticator::instance(), const char *options_=NULL)
Logon to the server, providing the client name, credentials (if available), and client information (s...
Definition: ampsplusplus.hpp:5722
const amps_uint64_t getNameHashValue() const
Returns the numeric name hash of this client as generated by the server and returned when the client ...
Definition: ampsplusplus.hpp:5133
void setGlobalCommandTypeMessageHandler(const Message::Command::Type command_, const MessageHandler &handler_)
Sets a handler for all messages of a particular type: currently supported types are heartbeat message...
Definition: ampsplusplus.hpp:6984
const std::string & getName() const
Returns the name of this client passed in the constructor.
Definition: ampsplusplus.hpp:5117
bool getAutoAck(void) const
Returns the value of the queue auto-ack setting.
Definition: ampsplusplus.hpp:7265
unsigned getDefaultMaxDepth(void) const
Returns the default max depth for returned MessageStream objects.
Definition: ampsplusplus.hpp:7361
void setSubscriptionManager(SubscriptionManager *subscriptionManager_)
Set the subscription manager to be used by the client.
Definition: ampsplusplus.hpp:5384
void ack(Field &topic_, Field &bookmark_, const char *options_=NULL)
Acknowledge a message queue message by supplying a topic and bookmark: this adds the ack to the curre...
Definition: ampsplusplus.hpp:7208
void clearConnectionStateListeners()
Clear all listeners from self's set of ConnectionStateListeners.
Definition: ampsplusplus.hpp:7082
static const char * BOOKMARK_NOW()
Convenience method for returning the special value to start a subscription at the end of the transact...
Definition: ampsplusplus.hpp:6994
MessageStream bookmarkSubscribe(const char *topic_, long timeout_, const std::string &bookmark_, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Subscribe to a topic using a bookmark.
Definition: ampsplusplus.hpp:5999
std::string executeAsyncNoResubscribe(Command &command_, MessageHandler handler_)
Execute the provided command and, once AMPS acknowledges the command, process messages in response to...
Definition: ampsplusplus.hpp:7146
size_t getServerVersion() const
Returns the server version retrieved during logon.
Definition: ampsplusplus.hpp:5164
static const char * EPOCH()
Convenience method for returning the special value to start a subscription at the beginning of the tr...
Definition: ampsplusplus.hpp:7023
void send(const Message &message)
Sends a Message to the connected AMPS server, performing only minimal validation and bypassing client...
Definition: ampsplusplus.hpp:5260
void publishFlush(long timeout_=0, unsigned ackType_=Message::AckType::Processed)
Ensure that AMPS messages are sent and have been processed by the AMPS server.
Definition: ampsplusplus.hpp:5606
amps_uint64_t deltaPublish(const char *topic_, size_t topicLength_, const char *data_, size_t dataLength_, unsigned long expiration_)
Publish the changed fields of a message to an AMPS topic.
Definition: ampsplusplus.hpp:5699
void ack(const std::string &topic_, const std::string &bookmark_, const char *options_=NULL)
Acknowledge a message queue message by supplying a topic and bookmark string: this adds the ack to th...
Definition: ampsplusplus.hpp:7232
MessageStream deltaSubscribe(const char *topic_, long timeout_, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Delta Subscribe to a topic.
Definition: ampsplusplus.hpp:5908
void setUnhandledMessageHandler(const AMPS::MessageHandler &messageHandler)
Definition: ampsplusplus.hpp:6926
void addMessageHandler(const Field &commandId_, const AMPS::MessageHandler &messageHandler_, unsigned requestedAcks_, bool isSubscribe_)
Adds a MessageHandler to be invoked for Messages with the given CommandId as their command id,...
Definition: ampsplusplus.hpp:5273
amps_uint64_t deltaPublish(const std::string &topic_, const std::string &data_, unsigned long expiration_)
Publish the changed fields of a message to an AMPS topic.
Definition: ampsplusplus.hpp:5673
std::string executeAsync(Command &command_, MessageHandler handler_)
Execute the provided command and, once AMPS acknowledges the command, process messages in response to...
Definition: ampsplusplus.hpp:7112
unsigned getAckBatchSize(void) const
Returns the value of the queue ack batch size setting.
Definition: ampsplusplus.hpp:7283
void setAutoAck(bool isAutoAckEnabled_)
Sets the queue auto-ack setting on this client.
Definition: ampsplusplus.hpp:7275
MessageStream execute(Command &command_)
Execute the provided command and return messages received in response in a MessageStream.
Definition: ampsplusplus.hpp:8702
void connect(const std::string &uri)
Connect to an AMPS server.
Definition: ampsplusplus.hpp:5235
const std::string & getNameHash() const
Returns the name hash string of this client as generated by the server and returned when the client l...
Definition: ampsplusplus.hpp:5125
static const char * MOST_RECENT()
Convenience method for returning the special value to start a subscription at a recovery point based ...
Definition: ampsplusplus.hpp:7045
bool removeMessageHandler(const Field &commandId_)
Removes a MessageHandler for a given ComandId from self.
Definition: ampsplusplus.hpp:5284
void setRetryOnDisconnect(bool isRetryOnDisconnect_)
Enables or disables automatic retry of a command to AMPS after a reconnect.
Definition: ampsplusplus.hpp:7334
void setGlobalCommandTypeMessageHandler(const std::string &command_, const MessageHandler &handler_)
Sets a handler for all messages of a particular type, or for messages that would be delivered to a pa...
Definition: ampsplusplus.hpp:6959
void setTransportFilterFunction(amps_transport_filter_function filter_, void *userData_)
Sets a filter function on the transport that is called with all raw data sent or received.
Definition: ampsplusplus.hpp:7372
VersionInfo getServerVersionInfo() const
Returns the server version retrieved during logon.
Definition: ampsplusplus.hpp:5175
void setPublishStore(const Store &publishStore_)
Set the publish store to be used by the client.
Definition: ampsplusplus.hpp:5408
void setHeartbeat(unsigned heartbeatTime_, unsigned readTimeout_)
Requests heartbeating with the AMPS server.
Definition: ampsplusplus.hpp:6896
MessageStream deltaSubscribe(const std::string &topic_, long timeout_, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Delta Subscribe to a topic.
Definition: ampsplusplus.hpp:5890
void setLogonCorrelationData(const std::string &logonCorrelationData_)
Sets the logon correlation data for the client.
Definition: ampsplusplus.hpp:5144
void addConnectionStateListener(ConnectionStateListener *listener)
Adds a ConnectionStateListener to self's set of listeners.
Definition: ampsplusplus.hpp:7067
void setThreadCreatedCallback(amps_thread_created_callback callback_, void *userData_)
Sets a callback function on the transport that is called when a new thread is created to receive data...
Definition: ampsplusplus.hpp:7386
std::string subscribe(const MessageHandler &messageHandler_, const std::string &topic_, long timeout_=0, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Subscribe to a topic.
Definition: ampsplusplus.hpp:5785
void setAckTimeout(const int ackTimeout_)
Sets the message queue ack timeout value.
Definition: ampsplusplus.hpp:7316
MessageHandler getDuplicateMessageHandler(void)
Returns the callback function that is invoked when a duplicate message is detected.
Definition: ampsplusplus.hpp:5439
virtual ConnectionInfo getConnectionInfo() const
Get the connection information for the current connection.
Definition: ampsplusplus.hpp:5343
void setDuplicateMessageHandler(const MessageHandler &duplicateMessageHandler_)
Sets a callback function that is invoked when a duplicate message is detected.
Definition: ampsplusplus.hpp:5424
MessageStream sowAndSubscribe(const char *topic_, const std::string &filter_="", const std::string &orderBy_="", const std::string &bookmark_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic and initiates a new subscription on it.
Definition: ampsplusplus.hpp:6382
static const char * BOOKMARK_RECENT()
Convenience method for returning the special value to start a subscription at a recovery point based ...
Definition: ampsplusplus.hpp:7056
std::string stopTimer(const MessageHandler &messageHandler)
Definition: ampsplusplus.hpp:6704
void ack(Message &message_, const char *options_=NULL)
Acknowledge a message queue message by supplying the message directly: this adds the ack to the curre...
Definition: ampsplusplus.hpp:7220
static size_t convertVersionToNumber(const std::string &version_)
Converts a string version, such as "3.8.1.5" into the same numeric form used internally and returned ...
Definition: ampsplusplus.hpp:5189
amps_uint64_t publish(const char *topic_, size_t topicLength_, const char *data_, size_t dataLength_, unsigned long expiration_)
Publish a message to an AMPS topic, returning the sequence number assigned by the publish store if on...
Definition: ampsplusplus.hpp:5560
Store getPublishStore()
Get the publish store used by the client.
Definition: ampsplusplus.hpp:5416
std::string logon(const std::string &options_, int timeout_=0)
Logon to the server, providing the client name, credentials (if available) client information (such a...
Definition: ampsplusplus.hpp:5760
Command is an encapsulation of a single AMPS command sent by the client.
Definition: ampsplusplus.hpp:442
Command & setData(const char *data_, size_t dataLen_)
Sets the data for this command.
Definition: ampsplusplus.hpp:806
Command & setTopN(unsigned topN_)
Definition: ampsplusplus.hpp:826
Command & setSequence(const char *seq_, size_t seqLen_)
Definition: ampsplusplus.hpp:779
Command & setTopic(const std::string &topic_)
Definition: ampsplusplus.hpp:648
Command & setAckType(unsigned ackType_)
Definition: ampsplusplus.hpp:893
Command & setOrderBy(const std::string &orderBy_)
Definition: ampsplusplus.hpp:674
Command & setOptions(const char *options_, size_t optionsLen_)
Sets the options string for this command: see Message.Options for a helper class for constructing the...
Definition: ampsplusplus.hpp:766
Command & setSowKey(const char *sowKey_, size_t sowKeyLen_)
Sets the SowKey field of the command, typically used for a publish command to a topic in the state of...
Definition: ampsplusplus.hpp:594
Command & setAckType(const std::string &ackType_)
Definition: ampsplusplus.hpp:871
Command & setTimeout(unsigned timeout_)
Sets the client-side timeout for this command.
Definition: ampsplusplus.hpp:820
Command & setBookmark(const char *bookmark_, size_t bookmarkLen_)
Set the bookmark to be used this command.
Definition: ampsplusplus.hpp:728
Command & setQueryId(const char *queryId_, size_t queryIdLen_)
Definition: ampsplusplus.hpp:707
std::string getAckType() const
Definition: ampsplusplus.hpp:915
Command & reset(Message::Command::Type command_)
Resets the fields of self, and sets the command to command_.
Definition: ampsplusplus.hpp:569
Command & setBatchSize(unsigned batchSize_)
Sets the batch size for this command, which controls how many records are sent together in the result...
Definition: ampsplusplus.hpp:835
Command & setSubId(const char *subId_, size_t subIdLen_)
Definition: ampsplusplus.hpp:694
Command & setSequence(const amps_uint64_t seq_)
Definition: ampsplusplus.hpp:785
Command & setOptions(const std::string &options_)
Sets the options string for this command: see Message.Options for a helper class for constructing the...
Definition: ampsplusplus.hpp:758
Command & setSowKeys(const char *sowKeys_, size_t sowKeysLen_)
Sets the SowKeys for the command.
Definition: ampsplusplus.hpp:629
Command & setSubId(const std::string &subId_)
Definition: ampsplusplus.hpp:687
Command & setCommandId(const std::string &cmdId_)
Definition: ampsplusplus.hpp:635
Command & setSequence(const std::string &seq_)
Definition: ampsplusplus.hpp:772
Command & setCommandId(const char *cmdId_, size_t cmdIdLen_)
Definition: ampsplusplus.hpp:642
Command(Message::Command::Type command_)
Creates an object to represent the given AMPS command, such as "sow" or "subscribe".
Definition: ampsplusplus.hpp:544
Command & setOrderBy(const char *orderBy_, size_t orderByLen_)
Definition: ampsplusplus.hpp:681
Command & setData(const std::string &data_)
Sets the data for this command from an existing string.
Definition: ampsplusplus.hpp:798
Command & setQueryId(const std::string &queryId_)
Definition: ampsplusplus.hpp:700
Command(const std::string &command_)
Creates an object to represent the given AMPS command, such as "sow" or "subscribe".
Definition: ampsplusplus.hpp:529
Command & setTopic(const char *topic_, size_t topicLen_)
Definition: ampsplusplus.hpp:655
Command & setFilter(const std::string &filter_)
Definition: ampsplusplus.hpp:661
Command & setCorrelationId(const std::string &correlationId_)
Set the correlation ID for this command.
Definition: ampsplusplus.hpp:739
Command & setFilter(const char *filter_, size_t filterLen_)
Definition: ampsplusplus.hpp:668
Command(const char *command_, size_t commandLen_)
Creates an object to represent the given AMPS command, such as "sow" or "subscribe".
Definition: ampsplusplus.hpp:537
Command & setCorrelationId(const char *correlationId_, size_t correlationIdLen_)
Set the correlation ID for this command.
Definition: ampsplusplus.hpp:751
unsigned getAckTypeEnum() const
Definition: ampsplusplus.hpp:920
Command & setSowKey(const std::string &sowKey_)
Sets the SowKey field of the command, typically used for a publish command to a topic in the state of...
Definition: ampsplusplus.hpp:581
Command & setSowKeys(const std::string &sowKeys_)
Sets the SowKeys for the command.
Definition: ampsplusplus.hpp:611
Command & addAckType(const std::string &ackType_)
Definition: ampsplusplus.hpp:857
Command & reset(const char *command_, size_t commandLen_)
Resets the fields of self, and sets the command to command_.
Definition: ampsplusplus.hpp:561
Command & setExpiration(unsigned expiration_)
Set the expiration time for a publish command.
Definition: ampsplusplus.hpp:851
Command & setBookmark(const std::string &bookmark_)
Set the bookmark to be used this command.
Definition: ampsplusplus.hpp:717
Command & reset(const std::string &command_)
Resets the fields of self, and sets the command to command_.
Definition: ampsplusplus.hpp:552
Abstract base class for connection state listeners.
Definition: ampsplusplus.hpp:1464
State
Constants for the state of the connection.
Definition: ampsplusplus.hpp:1467
virtual void connectionStateChanged(State newState_)=0
Pure virtual method for receiving the change in connection state.
A default implementation of Authenticator that only uses an unchanged password and does not implement...
Definition: ampsplusplus.hpp:1000
std::string authenticate(const std::string &, const std::string &password_)
A simple implementation that returns an unmodified password.
Definition: ampsplusplus.hpp:1005
static Authenticator & instance()
Static function to return a static instance used when no Authenticator is supplied to a Client.
Definition: ampsplusplus.hpp:1022
std::string retry(const std::string &, const std::string &)
Throws an AuthenticationException because retry is not implemented.
Definition: ampsplusplus.hpp:1012
void completed(const std::string &, const std::string &, const std::string &)
Called by Client once a logon completes successfully.
Definition: ampsplusplus.hpp:1017
Exception listener for unhandled exceptions.
Definition: ampsplusplus.hpp:237
Class for parsing a FIX format message into a std::map of keys and values, where the keys and values ...
Definition: ampsplusplus.hpp:8141
std::map< Message::Field, Message::Field > map_type
Convenience defintion for the std::map specialization used for this class.
Definition: ampsplusplus.hpp:8152
map_type toMap(const Message::Field &data)
Returns the key/value pairs within the message, represented as AMPS::Field objects that contain point...
Definition: ampsplusplus.hpp:8159
FIXShredder(char fieldSep_=(char) 1)
Construct an instance of FIXShredder using the specified value as the delimiter between fields.
Definition: ampsplusplus.hpp:8148
Abstract base class where you can implement handling of exceptions that occur when a SubscriptionMana...
Definition: ampsplusplus.hpp:1402
virtual bool failure(const Message &message_, const MessageHandler &handler_, unsigned requestedAckTypes_, const AMPSException &exception_)=0
Implement this function to return true if the subscription should be removed from the SubscriptionMan...
Class to handle when a client receives a duplicate publish message, or not entitled message.
Definition: ampsplusplus.hpp:1344
virtual void failedWrite(const Message &message_, const char *reason_, size_t reasonLength_)=0
Called when the server indicates a message could not be written.
Field represents the value of a single field in a Message.
Definition: Field.hpp:87
bool empty() const
Returns 'true' if empty, 'false' otherwise.
Definition: Field.hpp:128
size_t len() const
Returns the length of the data underlying this field.
Definition: Field.hpp:267
Represents an iterator over messages in an AMPS topic.
Definition: ampsplusplus.hpp:4932
An iterable object representing the results of an AMPS subscription and/or query.
Definition: ampsplusplus.hpp:4924
iterator begin(void)
Returns an iterator representing the beginning of the topic or subscription.
Definition: ampsplusplus.hpp:4976
bool isValid() const
Returns true if self is a valid stream that may be iterated.
Definition: ampsplusplus.hpp:4969
MessageStream conflate(void)
Sets self to conflation mode, where a new update for a matching sow key will replace the previous one...
Definition: ampsplusplus.hpp:8678
MessageStream maxDepth(unsigned maxDepth_)
Sets the maximum number of messages that can be held in the underlying queue.
Definition: ampsplusplus.hpp:8683
MessageStream timeout(unsigned timeout_)
Sets the maximum time to wait for the next message in milliseconds; if no message is available within...
Definition: ampsplusplus.hpp:8673
unsigned getDepth(void) const
Gets the current number of messages held in the underlying queue.
Definition: ampsplusplus.hpp:8692
unsigned getMaxDepth(void) const
Gets the maximum number of messages that can be held in the underlying queue.
Definition: ampsplusplus.hpp:8688
iterator end(void)
Returns an iterator representing the end of the topic or subscription.
Definition: ampsplusplus.hpp:4987
Message encapsulates a single message sent to or received from an AMPS server, and provides methods f...
Definition: Message.hpp:532
Message & assignExpiration(const std::string &v)
Sets the value of the Expiration header for this Message.
Definition: Message.hpp:1305
Field getBookmark() const
Retrieves the value of the Bookmark header of the Message as a new Field.
Definition: Message.hpp:1194
Message & setQueryID(const std::string &v)
Sets the value of the QueryID header for this Message.
Definition: Message.hpp:1417
Field getCommandId() const
Retrieves the value of the CommandId header of the Message as a new Field.
Definition: Message.hpp:1302
Message & setExpiration(const std::string &v)
Sets the value of the Expiration header for this Message.
Definition: Message.hpp:1305
static const unsigned int IdentifierLength
The length of identifiers used for unique identification of commands and subscriptions.
Definition: Message.hpp:542
Message & assignSubscriptionId(const std::string &v)
Sets the value of the SubscriptionId header for this Message.
Definition: Message.hpp:1427
Message & newCommandId()
Creates and sets a new sequential value for the CommandId header for this Message.
Definition: Message.hpp:1302
Field getSequence() const
Retrieves the value of the Sequence header of the Message as a new Field.
Definition: Message.hpp:1422
Message & setBookmark(const std::string &v)
Sets the value of the Bookmark header for this Message.
Definition: Message.hpp:1194
Field getSubscriptionId() const
Retrieves the value of the SubscriptionId header of the Message as a new Field.
Definition: Message.hpp:1427
Message & assignFilter(const std::string &v)
Sets the value of the Filter header for this Message.
Definition: Message.hpp:1306
Message & setSequence(const std::string &v)
Sets the value of the Sequence header for this Message.
Definition: Message.hpp:1422
Message & setOrderBy(const std::string &v)
Sets the value of the OrderBy header for this Message.
Definition: Message.hpp:1415
Message & setCommandId(const std::string &v)
Sets the value of the CommandId header for this Message.
Definition: Message.hpp:1302
Message & setAckTypeEnum(unsigned ackType_)
Encode self's "ack type" field from a bitmask of values from AckType.
Definition: Message.hpp:1183
unsigned getAckTypeEnum() const
Decode self's "ack type" field and return the corresponding bitmask of values from AckType.
Definition: Message.hpp:1160
Message & setQueryId(const std::string &v)
Sets the value of the QueryID header for this Message.
Definition: Message.hpp:1417
Message & setTopNRecordsReturned(const std::string &v)
Sets the value of the TopNRecordsReturned header for this Message.
Definition: Message.hpp:1451
Message & assignSowKeys(const std::string &v)
Sets the value of the SowKeys header for this Message.
Definition: Message.hpp:1425
Message & setAckType(const std::string &v)
Sets the value of the AckType header for this Message.
Definition: Message.hpp:1130
Message & setSowKey(const std::string &v)
Sets the value of the SowKey header for this Message.
Definition: Message.hpp:1424
Message & setOptions(const std::string &v)
Sets the value of the Options header for this Message.
Definition: Message.hpp:1344
Command::Type getCommandEnum() const
Decode self's "command" field and return one of the values from Command.
Definition: Message.hpp:1228
Message & assignQueryID(const std::string &v)
Sets the value of the QueryID header for this Message.
Definition: Message.hpp:1417
Message & assignTopic(const std::string &v)
Sets the value of the Topic header for this Message.
Definition: Message.hpp:1449
Message & setTopic(const std::string &v)
Sets the value of the Topic header for this Message.
Definition: Message.hpp:1449
Message & setCommandEnum(Command::Type command_)
Set self's "command" field from one of the values in Command.
Definition: Message.hpp:1288
Message & setSubscriptionId(const std::string &v)
Sets the value of the SubscriptionId header for this Message.
Definition: Message.hpp:1427
Message & setFilter(const std::string &v)
Sets the value of the Filter header for this Message.
Definition: Message.hpp:1306
Field getAckType() const
Retrieves the value of the AckType header of the Message as a new Field.
Definition: Message.hpp:1130
Field getQueryId() const
Retrieves the value of the QueryID header of the Message as a new Field.
Definition: Message.hpp:1417
Message & assignSequence(const std::string &v)
Sets the value of the Sequence header for this Message.
Definition: Message.hpp:1422
Field getTopic() const
Retrieves the value of the Topic header of the Message as a new Field.
Definition: Message.hpp:1449
Field getQueryID() const
Retrieves the value of the QueryID header of the Message as a new Field.
Definition: Message.hpp:1417
Message & setSowKeys(const std::string &v)
Sets the value of the SowKeys header for this Message.
Definition: Message.hpp:1425
Message & setBatchSize(const std::string &v)
Sets the value of the BatchSize header for this Message.
Definition: Message.hpp:1193
Message & setCommand(const std::string &v)
Sets the value of the Command header for this Message.
Definition: Message.hpp:1195
Field getOptions() const
Retrieves the value of the Options header of the Message as a new Field.
Definition: Message.hpp:1316
Message & setCorrelationId(const std::string &v)
Sets the value of the CorrelationId header for this Message.
Definition: Message.hpp:1304
Message & setData(const std::string &v_)
Sets the data portion of self.
Definition: Message.hpp:1476
Class to hold string versions of failure reasons.
Definition: ampsplusplus.hpp:184
Abstract base class for storing published messages for an HA publisher client.
Definition: ampsplusplus.hpp:1060
virtual amps_uint64_t getLowestUnpersisted() const =0
Get the oldest unpersisted message sequence in the store.
virtual size_t unpersistedCount() const =0
Method to return how many messages are in the store that have not been discarded, indicating that the...
static amps_uint64_t getUnsetSequence()
Method to return the value used to represent no such sequence.
Definition: ampsplusplus.hpp:1129
virtual amps_uint64_t getLastPersisted()=0
Get the last persisted sequence number.
static size_t getUnsetPosition()
Method to return the value used to represent not found or unset.
Definition: ampsplusplus.hpp:1122
virtual void discardUpTo(amps_uint64_t index_)=0
Called by Client to indicate that all messages up to and including.
virtual void flush(long timeout_)=0
Method to wait for the Store to discard everything that has been stored up to the point in time when ...
virtual amps_uint64_t store(const Message &message_)=0
Called by Client to store a message being published.
virtual void replay(StoreReplayer &replayer_)=0
Called by Client to get all stored and non-discarded messages replayed by the store onto the StoreRep...
virtual bool replaySingle(StoreReplayer &replayer_, amps_uint64_t index_)=0
Called by Client to get a single message replayed by the store onto the StoreReplayer.
StoreImpl(bool errorOnPublishGap_=false)
Default constructor.
Definition: ampsplusplus.hpp:1067
virtual void setResizeHandler(PublishStoreResizeHandler handler_, void *userData_)
Set a handler to be called if the Store needs to resize in order to keep storing messages.
Definition: ampsplusplus.hpp:1153
Abstract base class for replaying a publish message.
Definition: ampsplusplus.hpp:1032
virtual void execute(Message &message_)=0
Called by implementations of Store to replay a message from the store.
Handle class for StoreImpl classes that track publish messages.
Definition: ampsplusplus.hpp:1186
void replay(StoreReplayer &replayer_)
Called by Client to get all stored and non-discarded messages replayed by the store onto the StoreRep...
Definition: ampsplusplus.hpp:1221
StoreImpl * get()
Used to get a pointer to the implementation.
Definition: ampsplusplus.hpp:1325
bool getErrorOnPublishGap() const
Called to check if the Store will throw PublishStoreGapException.
Definition: ampsplusplus.hpp:1317
size_t unpersistedCount() const
Method to return how many messages are in the store that have not been discarded, indicating that the...
Definition: ampsplusplus.hpp:1242
bool isValid() const
Method to return if there is an underlying implementation for the Store.
Definition: ampsplusplus.hpp:1250
amps_uint64_t getLastPersisted()
Get the last persisted message sequence in the store.
Definition: ampsplusplus.hpp:1279
void discardUpTo(amps_uint64_t index_)
Called by Client to indicate that all messages up to and including.
Definition: ampsplusplus.hpp:1212
amps_uint64_t getLowestUnpersisted()
Get the oldest unpersisted message sequence in the store.
Definition: ampsplusplus.hpp:1271
void flush(long timeout_=0)
Method to wait for the Store to discard everything that has been stored up to the point in time when ...
Definition: ampsplusplus.hpp:1263
void setResizeHandler(PublishStoreResizeHandler handler_, void *userData_)
Set a handler to be called if the Store needs to resize in order to keep storing messages.
Definition: ampsplusplus.hpp:1293
bool replaySingle(StoreReplayer &replayer_, amps_uint64_t index_)
Called by Client to get a single message replayed by the store onto the StoreReplayer.
Definition: ampsplusplus.hpp:1233
void setErrorOnPublishGap(bool errorOnPublishGap_)
Called to enable or disable throwing PublishStoreGapException.
Definition: ampsplusplus.hpp:1308
amps_uint64_t store(const Message &message_)
Called by Client to store a message being published.
Definition: ampsplusplus.hpp:1201
Abstract base class to manage all subscriptions placed on a client so that they can be re-established...
Definition: ampsplusplus.hpp:1424
virtual void resubscribe(Client &client_)=0
Called by Client to get all subscriptions placed again.
virtual void subscribe(MessageHandler messageHandler_, const Message &message_, unsigned requestedAckTypes_)=0
Called by Client when a subscription is placed.
virtual void setFailedResubscribeHandler(std::shared_ptr< FailedResubscribeHandler > handler_)
Set a handler to deal with failing subscriptions after a failover event.
Definition: ampsplusplus.hpp:1451
virtual void unsubscribe(const Message::Field &subId_)=0
Called by Client when a subscription is unsubscribed.
virtual void clear()=0
Clear subscriptions and reset to the initial state.
Provides a convenient way of building messages in FIX format, typically referenced using the typedefs...
Definition: ampsplusplus.hpp:8067
_FIXBuilder(char fieldSep_=(char) 1)
Construct an instance of _FIXBuilder, using the specified separator between fields.
Definition: ampsplusplus.hpp:8076
std::string getString() const
Returns the current contents of this builder as a string.
Definition: ampsplusplus.hpp:8103
void append(const T &tag, const std::string &value)
Write a field with the provided tag and value to the message being constructed.
Definition: ampsplusplus.hpp:8096
void reset()
Clear all data from the builder.
Definition: ampsplusplus.hpp:8113
void append(const T &tag, const char *value, size_t offset, size_t length)
Write a field with the provided tag and value to the message being constructed.
Definition: ampsplusplus.hpp:8085
std::string sowDelete(const MessageHandler &messageHandler, const std::string &topic, const std::string &filter, long timeout)
Deletes one or more messages from a topic's SOW cache.
Definition: ampsplusplus.hpp:6645
Message sowDeleteByData(const std::string &topic_, const std::string &data_, long timeout_=0)
Deletes the message whose keys match the message data provided.
Definition: ampsplusplus.hpp:6813
std::string sowAndSubscribe(const MessageHandler &messageHandler_, const std::string &topic_, const std::string &filter_="", const std::string &orderBy_="", const std::string &bookmark_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic and initiates a new subscription on it.
Definition: ampsplusplus.hpp:6320
std::string sow(const MessageHandler &messageHandler_, const std::string &topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic.
Definition: ampsplusplus.hpp:6170
std::string sowAndDeltaSubscribe(const MessageHandler &messageHandler_, const std::string &topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, bool oofEnabled_=false, bool sendEmpties_=false, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic and initiates a new delta subscription on it.
Definition: ampsplusplus.hpp:6529
std::string sowDeleteByData(const MessageHandler &messageHandler_, const std::string &topic_, const std::string &data_, long timeout_=0)
Deletes the message whose keys match the message data provided.
Definition: ampsplusplus.hpp:6792
std::string sowAndSubscribe(const MessageHandler &messageHandler_, const std::string &topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, bool oofEnabled_=false, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic and initiates a new subscription on it.
Definition: ampsplusplus.hpp:6202
std::string sowDeleteByKeys(const MessageHandler &messageHandler_, const std::string &topic_, const std::string &keys_, long timeout_=0)
Deletes messages that match SOW keys from a topic's SOW cache.
Definition: ampsplusplus.hpp:6730
std::string sowAndDeltaSubscribe(const MessageHandler &messageHandler_, const std::string &topic_, const std::string &filter_="", const std::string &orderBy_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic and initiates a new delta subscription on it.
Definition: ampsplusplus.hpp:6428
std::string sow(const MessageHandler &messageHandler_, const std::string &topic_, const std::string &filter_="", const std::string &orderBy_="", const std::string &bookmark_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query a State-of-the-World topic.
Definition: ampsplusplus.hpp:6074
#define AMPS_BOOKMARK_NOW
Start the subscription at the point in time when AMPS processes the subscription.
Definition: BookmarkStore.hpp:55
#define AMPS_BOOKMARK_EPOCH
Start the subscription at the beginning of the journal.
Definition: BookmarkStore.hpp:51
#define AMPS_BOOKMARK_RECENT
Start the subscription at the first undiscarded message in the bookmark store, or at the end of the b...
Definition: BookmarkStore.hpp:47
BookmarkStoreImpl * get()
Used to get a pointer to the implementation.
Definition: BookmarkStore.hpp:465
MessageStream sowAndDeltaSubscribe(const char *topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, bool oofEnabled_=false, bool sendEmpties_=false, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic and initiates a new delta subscription on it.
Definition: ampsplusplus.hpp:6606
MessageStream sowAndDeltaSubscribe(const std::string &topic_, const std::string &filter_="", const std::string &orderBy_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic and initiates a new delta subscription on it.
Definition: ampsplusplus.hpp:6461
Message sowDeleteByKeys(const std::string &topic_, const std::string &keys_, long timeout_=0)
Deletes messages that match SOW keys from a topic's SOW cache.
Definition: ampsplusplus.hpp:6757
Message sowDelete(const std::string &topic, const std::string &filter, long timeout=0)
Deletes one or more messages from a topic's SOW cache.
Definition: ampsplusplus.hpp:6668
MessageStream sow(const std::string &topic_, const std::string &filter_="", const std::string &orderBy_="", const std::string &bookmark_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic.
Definition: ampsplusplus.hpp:6112
MessageStream sowAndSubscribe(const std::string &topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, bool oofEnabled_=false, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic and initiates a new subscription on it.
Definition: ampsplusplus.hpp:6234
MessageStream sowAndSubscribe(const char *topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, bool oofEnabled_=false, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic and initiates a new subscription on it.
Definition: ampsplusplus.hpp:6272
MessageStream sowAndDeltaSubscribe(const std::string &topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, bool oofEnabled_=false, bool sendEmpties_=false, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic and initiates a new delta subscription on it.
Definition: ampsplusplus.hpp:6565
MessageStream sowAndSubscribe(const std::string &topic_, const std::string &filter_="", const std::string &orderBy_="", const std::string &bookmark_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic and initiates a new subscription on it.
Definition: ampsplusplus.hpp:6359