25 #ifndef _AMPSPLUSPLUS_H_ 26 #define _AMPSPLUSPLUS_H_ 28 #include "amps/ampsver.h" 47 #include <sys/atomic.h> 49 #include "amps/BookmarkStore.hpp" 50 #include "amps/MessageRouter.hpp" 51 #include "amps/util.hpp" 52 #include "amps/ampscrc.hpp" 53 #if __cplusplus >= 201100L || _MSC_VER >= 1900 57 #ifndef AMPS_TESTING_SLOW_MESSAGE_STREAM 58 #define AMPS_TESTING_SLOW_MESSAGE_STREAM 86 #define AMPS_MEMORYBUFFER_DEFAULT_BUFFERS 10 87 #define AMPS_MEMORYBUFFER_DEFAULT_LENGTH 40960 88 #define AMPS_SUBSCRIPTION_MANAGER_DEFAULT_TIMEOUT 0 89 #define AMPS_HACLIENT_TIMEOUT_DEFAULT 10000 90 #define AMPS_HACLIENT_RECONNECT_DEFAULT 200 91 #define AMPS_DEFAULT_COMMAND_TIMEOUT 5000 92 #define AMPS_DEFAULT_TOP_N -1 93 #define AMPS_DEFAULT_BATCH_SIZE 10 94 #define AMPS_NUMBER_BUFFER_LEN 20 95 #define AMPS_DEFAULT_QUEUE_ACK_TIMEOUT 1000 97 #if defined(_M_X64) || defined(__x86_64) || defined(_WIN64) 106 typedef std::map<std::string, std::string> ConnectionInfo;
109 inline std::string asString(Type x_)
111 std::ostringstream os;
117 size_t convertToCharArray(
char* buf_, amps_uint64_t seqNo_)
119 size_t pos = AMPS_NUMBER_BUFFER_LEN;
120 for (
int i = 0; i < AMPS_NUMBER_BUFFER_LEN; ++i)
124 buf_[--pos] = (char)(seqNo_ % 10 +
'0');
133 size_t convertToCharArray(
char* buf_,
unsigned long seqNo_)
135 size_t pos = AMPS_NUMBER_BUFFER_LEN;
136 for (
int i = 0; i < AMPS_NUMBER_BUFFER_LEN; ++i)
140 buf_[--pos] = (char)(seqNo_ % 10 +
'0');
154 static const char* duplicate()
158 static const char* badFilter()
162 static const char* badRegexTopic()
164 return "bad regex topic";
166 static const char* subscriptionAlreadyExists()
168 return "subscription already exists";
170 static const char* nameInUse()
172 return "name in use";
174 static const char* authFailure()
176 return "auth failure";
178 static const char* notEntitled()
180 return "not entitled";
182 static const char* authDisabled()
184 return "authentication disabled";
186 static const char* subidInUse()
188 return "subid in use";
190 static const char* noTopic()
208 virtual void exceptionThrown(
const std::exception&)
const {;}
214 #define AMPS_CALL_EXCEPTION_WRAPPER(x) \ 219 catch (std::exception& ex_)\ 223 _exceptionListener->exceptionThrown(ex_);\ 247 #define AMPS_CALL_EXCEPTION_WRAPPER_2(me,x) \ 250 while(me->_connected)\ 257 catch(MessageStreamFullException&)\ 261 me->checkAndSendHeartbeat(false);\ 263 catch (std::exception& ex_)\ 267 me->_exceptionListener->exceptionThrown(ex_);\ 278 catch (std::exception& ex_)\ 282 me->_exceptionListener->exceptionThrown(ex_);\ 306 #define AMPS_CALL_EXCEPTION_WRAPPER_STREAM_FULL_2(me, x)\ 307 while(me->_connected)\ 314 catch(MessageStreamFullException&)\ 318 me->checkAndSendHeartbeat(false);\ 320 catch (std::exception& ex_)\ 324 me->_exceptionListener->exceptionThrown(ex_);\ 335 #define AMPS_CALL_EXCEPTION_WRAPPER_2(me,x) \ 338 while(me->_connected)\ 345 catch(MessageStreamFullException& ex_)\ 349 me->checkAndSendHeartbeat(false);\ 351 catch (std::exception& ex_)\ 355 me->_exceptionListener->exceptionThrown(ex_);\ 366 catch (std::exception& ex_)\ 370 me->_exceptionListener->exceptionThrown(ex_);\ 394 #define AMPS_CALL_EXCEPTION_WRAPPER_STREAM_FULL_2(me, x)\ 395 while(me->_connected)\ 402 catch(MessageStreamFullException& ex_)\ 406 me->checkAndSendHeartbeat(false);\ 408 catch (std::exception& ex_)\ 412 me->_exceptionListener->exceptionThrown(ex_);\ 424 #define AMPS_UNHANDLED_EXCEPTION(ex) \ 427 _exceptionListener->exceptionThrown(ex);\ 434 #define AMPS_UNHANDLED_EXCEPTION_2(me,ex) \ 437 me->_exceptionListener->exceptionThrown(ex);\ 478 static const unsigned Subscribe = 1;
479 static const unsigned SOW = 2;
480 static const unsigned NeedsSequenceNumber = 4;
481 static const unsigned ProcessedAck = 8;
482 static const unsigned StatsAck = 16;
483 void init(Message::Command::Type command_)
492 void init(
const std::string& command_)
501 void init(
const char* command_,
size_t commandLen_)
513 if (!(command & Message::Command::NoDataCommands))
516 if (command == Message::Command::Subscribe ||
517 command == Message::Command::SOWAndSubscribe ||
518 command == Message::Command::DeltaSubscribe ||
519 command == Message::Command::SOWAndDeltaSubscribe)
524 if (command == Message::Command::SOW
525 || command == Message::Command::SOWAndSubscribe
526 || command == Message::Command::SOWAndDeltaSubscribe)
531 setBatchSize(AMPS_DEFAULT_BATCH_SIZE);
533 if (command == Message::Command::SOW)
538 _flags |= ProcessedAck;
540 else if (command == Message::Command::SOWDelete)
543 _flags |= ProcessedAck;
544 _flags |= NeedsSequenceNumber;
546 else if (command == Message::Command::Publish
547 || command == Message::Command::DeltaPublish)
549 _flags |= NeedsSequenceNumber;
551 else if (command == Message::Command::StopTimer)
568 Command(
const char* command_,
size_t commandLen_)
570 init(command_, commandLen_);
594 init(command_, commandLen_);
688 _message.
setTopic(topic_, topicLen_);
818 std::ostringstream os;
823 amps_uint64_t getSequence()
const 839 _message.
setData(data_, dataLen_);
859 if (topN_ != (
unsigned)AMPS_DEFAULT_TOP_N)
876 _batchSize = batchSize_;
898 if (ackType_ ==
"processed")
900 _flags |= ProcessedAck;
902 else if (ackType_ ==
"stats")
912 if (ackType_.find(
"processed") != std::string::npos)
914 _flags |= ProcessedAck;
918 _flags &= ~ProcessedAck;
920 if (ackType_.find(
"stats") != std::string::npos)
934 if (ackType_ & Message::AckType::Processed)
936 _flags |= ProcessedAck;
940 _flags &= ~ProcessedAck;
942 if (ackType_ & Message::AckType::Stats)
967 unsigned getTimeout(
void)
const 971 unsigned getBatchSize(
void)
const 975 bool isSubscribe(
void)
const 977 return _flags & Subscribe;
979 bool isSow(
void)
const 981 return (_flags & SOW) != 0;
983 bool hasProcessedAck(
void)
const 985 return (_flags & ProcessedAck) != 0;
987 bool hasStatsAck(
void)
const 989 return (_flags & StatsAck) != 0;
991 bool needsSequenceNumber(
void)
const 993 return (_flags & NeedsSequenceNumber) != 0;
999 typedef void(*DisconnectHandlerFunc)(
Client&,
void* userData);
1016 virtual std::string authenticate(
const std::string& userName_,
const std::string& password_) = 0;
1024 virtual std::string retry(
const std::string& userName_,
const std::string& password_) = 0;
1031 virtual void completed(
const std::string& userName_,
const std::string& password_,
const std::string& reason_) = 0;
1043 std::string
authenticate(
const std::string& ,
const std::string& password_)
1050 std::string
retry(
const std::string& ,
const std::string& )
1052 throw AuthenticationException(
"retry not implemented by DefaultAuthenticator.");
1055 void completed(
const std::string& ,
const std::string& ,
const std::string& ) {;}
1076 virtual void execute(
Message& message_) = 0;
1091 typedef bool (*PublishStoreResizeHandler)(
Store store_,
1106 : _resizeHandler(NULL)
1107 , _resizeHandlerData(NULL)
1108 , _errorOnPublishGap(errorOnPublishGap_)
1115 virtual amps_uint64_t store(
const Message& message_) = 0;
1123 virtual void discardUpTo(amps_uint64_t index_) = 0;
1138 virtual bool replaySingle(
StoreReplayer& replayer_, amps_uint64_t index_) = 0;
1144 virtual size_t unpersistedCount()
const = 0;
1156 virtual void flush(
long timeout_) = 0;
1162 return AMPS_UNSET_INDEX;
1169 return AMPS_UNSET_SEQUENCE;
1175 virtual amps_uint64_t getLowestUnpersisted()
const = 0;
1180 virtual amps_uint64_t getLastPersisted() = 0;
1194 _resizeHandler = handler_;
1195 _resizeHandlerData = userData_;
1200 return _resizeHandler;
1203 bool callResizeHandler(
size_t newSize_);
1205 inline virtual void setErrorOnPublishGap(
bool errorOnPublishGap_)
1207 _errorOnPublishGap = errorOnPublishGap_;
1210 inline virtual bool getErrorOnPublishGap()
const 1212 return _errorOnPublishGap;
1217 void* _resizeHandlerData;
1218 bool _errorOnPublishGap;
1225 RefHandle<StoreImpl> _body;
1229 Store(
const Store& rhs) : _body(rhs._body) {;}
1241 return _body.get().store(message_);
1252 _body.get().discardUpTo(index_);
1261 _body.get().replay(replayer_);
1273 return _body.get().replaySingle(replayer_, index_);
1282 return _body.get().unpersistedCount();
1290 return _body.isValid();
1303 return _body.get().flush(timeout_);
1311 return _body.get().getLowestUnpersisted();
1319 return _body.get().getLastPersisted();
1334 _body.get().setResizeHandler(handler_, userData_);
1339 return _body.get().getResizeHandler();
1348 _body.get().setErrorOnPublishGap(errorOnPublishGap_);
1357 return _body.get().getErrorOnPublishGap();
1365 if (_body.isValid())
1367 return &_body.get();
1391 virtual void failedWrite(
const Message& message_,
1392 const char* reason_,
size_t reasonLength_) = 0;
1396 inline bool StoreImpl::callResizeHandler(
size_t newSize_)
1400 return _resizeHandler(
Store(
this), newSize_, _resizeHandlerData);
1414 long* timeoutp = (
long*)data_;
1422 store_.
flush(*timeoutp);
1425 catch (
const TimedOutException&)
1427 catch (
const TimedOutException& e)
1454 unsigned requestedAckTypes_,
1455 const AMPSException& exception_) = 0;
1473 unsigned requestedAckTypes_) = 0;
1480 virtual void clear() = 0;
1484 virtual void resubscribe(Client& client_) = 0;
1491 _failedResubscribeHandler = handler_;
1494 std::shared_ptr<FailedResubscribeHandler> _failedResubscribeHandler;
1505 typedef enum { Disconnected = 0,
1509 PublishReplayed = 8,
1510 HeartbeatInitiated = 16,
1524 virtual void connectionStateChanged(
State newState_) = 0;
1529 class MessageStreamImpl;
1532 typedef void(*DeferredExecutionFunc)(
void*);
1534 class ClientImpl :
public RefBody
1540 AMPS_SOCKET _socket;
1546 socklen_t _valueLen;
1550 : _socket(AMPS_INVALID_SOCKET), _noDelay(0), _valueLen(
sizeof(
int))
1552 _valuePtr = (
char*)&_noDelay;
1554 if (_socket != AMPS_INVALID_SOCKET)
1556 getsockopt(_socket, IPPROTO_TCP, TCP_NODELAY, _valuePtr, &_valueLen);
1560 setsockopt(_socket, IPPROTO_TCP, TCP_NODELAY, _valuePtr, _valueLen);
1564 _socket = AMPS_INVALID_SOCKET;
1571 if (_socket != AMPS_INVALID_SOCKET)
1574 setsockopt(_socket, IPPROTO_TCP, TCP_NODELAY, _valuePtr, _valueLen);
1579 friend class Client;
1582 DisconnectHandler _disconnectHandler;
1583 enum GlobalCommandTypeHandlers :
size_t 1593 DuplicateMessage = 8,
1596 std::vector<MessageHandler> _globalCommandTypeHandlers;
1597 Message _message, _readMessage, _publishMessage, _deltaMessage, _beatMessage;
1599 MessageRouter::RouteCache _routeCache;
1600 mutable Mutex _lock;
1601 std::string _name, _nameHash, _lastUri, _logonCorrelationData, _preflightMessage;
1602 std::vector<std::string> _httpPreflightHeaders;
1603 amps_uint64_t _nameHashValue;
1605 Store _publishStore;
1606 bool _isRetryOnDisconnect;
1607 amps_unique_ptr<FailedWriteHandler> _failedWriteHandler;
1608 #if __cplusplus >= 201100L || _MSC_VER >= 1900 1609 std::atomic<amps_uint64_t> _lastSentHaSequenceNumber;
1611 volatile amps_uint64_t _lastSentHaSequenceNumber;
1613 AMPS_ATOMIC_TYPE_8 _logonInProgress;
1614 AMPS_ATOMIC_TYPE_8 _badTimeToHASubscribe;
1615 VersionInfo _serverVersion;
1616 Timer _heartbeatTimer;
1617 amps_unique_ptr<MessageStream> _pEmptyMessageStream;
1620 int _queueAckTimeout;
1621 bool _isAutoAckEnabled;
1622 unsigned _ackBatchSize;
1623 unsigned _queuedAckCount;
1624 unsigned _defaultMaxDepth;
1625 struct QueueBookmarks
1627 QueueBookmarks(
const std::string& topic_)
1634 amps_uint64_t _oldestTime;
1635 unsigned _bookmarkCount;
1637 typedef amps_uint64_t topic_hash;
1638 typedef std::map<topic_hash, QueueBookmarks> TopicHashMap;
1639 TopicHashMap _topicHashMap;
1643 ClientImpl* _client;
1648 ClientStoreReplayer()
1649 : _client(NULL), _version(0), _res(
AMPS_E_OK)
1652 ClientStoreReplayer(ClientImpl* client_)
1653 : _client(client_), _version(0), _res(
AMPS_E_OK)
1656 void setClient(ClientImpl* client_)
1661 void execute(
Message& message_)
1665 throw CommandException(
"Can't replay without a client.");
1669 if (index > _client->_lastSentHaSequenceNumber)
1671 _client->_lastSentHaSequenceNumber = index;
1679 (!_client->_logonInProgress ||
1683 message_.getMessage(),
1687 throw DisconnectedException(
"AMPS Server disconnected during replay");
1693 ClientStoreReplayer _replayer;
1697 ClientImpl* _parent;
1698 const char* _reason;
1699 size_t _reasonLength;
1700 size_t _replayCount;
1702 FailedWriteStoreReplayer(ClientImpl* parent,
const char* reason_,
size_t reasonLength_)
1705 _reasonLength(reasonLength_),
1708 void execute(
Message& message_)
1710 if (_parent->_failedWriteHandler)
1713 _parent->_failedWriteHandler->failedWrite(message_,
1714 _reason, _reasonLength);
1717 size_t replayCount(
void)
const 1719 return _replayCount;
1723 struct AckResponseImpl :
public RefBody
1725 std::string username, password, reason, status, bookmark, options;
1726 amps_uint64_t sequenceNo;
1727 amps_uint64_t nameHashValue;
1728 VersionInfo serverVersion;
1729 #if __cplusplus >= 201100L || _MSC_VER >= 1900 1730 std::atomic<bool> responded;
1731 std::atomic<bool> abandoned;
1733 volatile bool responded;
1734 volatile bool abandoned;
1736 unsigned connectionVersion;
1739 username(), password(), reason(), status(), bookmark(), options(),
1740 sequenceNo((amps_uint64_t)0),
1744 connectionVersion(UINT_MAX)
1751 RefHandle<AckResponseImpl> _body;
1753 AckResponse() : _body(NULL) {;}
1754 AckResponse(
const AckResponse& rhs) : _body(rhs._body) {;}
1755 static AckResponse create()
1758 r._body =
new AckResponseImpl();
1762 const std::string& username()
1764 return _body.get().username;
1766 void setUsername(
const char* data_,
size_t len_)
1770 _body.get().username.assign(data_, len_);
1774 _body.get().username.clear();
1777 const std::string& password()
1779 return _body.get().password;
1781 void setPassword(
const char* data_,
size_t len_)
1785 _body.get().password.assign(data_, len_);
1789 _body.get().password.clear();
1792 const std::string& reason()
1794 return _body.get().reason;
1796 void setReason(
const char* data_,
size_t len_)
1800 _body.get().reason.assign(data_, len_);
1804 _body.get().reason.clear();
1807 const std::string& status()
1809 return _body.get().status;
1811 void setStatus(
const char* data_,
size_t len_)
1815 _body.get().status.assign(data_, len_);
1819 _body.get().status.clear();
1822 const std::string& bookmark()
1824 return _body.get().bookmark;
1826 void setBookmark(
const Field& bookmark_)
1828 if (!bookmark_.
empty())
1830 _body.get().bookmark.assign(bookmark_.
data(), bookmark_.
len());
1831 Field::parseBookmark(bookmark_, _body.get().nameHashValue,
1832 _body.get().sequenceNo);
1836 _body.get().bookmark.clear();
1837 _body.get().sequenceNo = (amps_uint64_t)0;
1838 _body.get().nameHashValue = (amps_uint64_t)0;
1841 amps_uint64_t sequenceNo()
const 1843 return _body.get().sequenceNo;
1845 amps_uint64_t nameHashValue()
const 1847 return _body.get().nameHashValue;
1849 void setSequenceNo(
const char* data_,
size_t len_)
1851 amps_uint64_t result = (amps_uint64_t)0;
1854 for (
size_t i = 0; i < len_; ++i)
1856 result *= (amps_uint64_t)10;
1857 result += (amps_uint64_t)(data_[i] -
'0');
1860 _body.get().sequenceNo = result;
1862 VersionInfo serverVersion()
const 1864 return _body.get().serverVersion;
1866 void setServerVersion(
const char* data_,
size_t len_)
1870 _body.get().serverVersion.setVersion(std::string(data_, len_));
1875 return _body.get().responded;
1879 _body.get().responded =
true;
1883 return _body.get().abandoned;
1887 if (_body.isValid())
1889 _body.get().abandoned =
true;
1893 void setConnectionVersion(
unsigned connectionVersion)
1895 _body.get().connectionVersion = connectionVersion;
1898 unsigned getConnectionVersion()
1900 return _body.get().connectionVersion;
1902 void setOptions(
const char* data_,
size_t len_)
1906 _body.get().options.assign(data_, len_);
1910 _body.get().options.clear();
1914 const std::string& options()
1916 return _body.get().options;
1919 AckResponse& operator=(
const AckResponse& rhs)
1927 typedef std::map<std::string, AckResponse> AckMap;
1930 DefaultExceptionListener _defaultExceptionListener;
1933 struct DeferredExecutionRequest
1935 DeferredExecutionRequest(DeferredExecutionFunc func_,
1938 _userData(userData_)
1941 DeferredExecutionFunc _func;
1945 std::shared_ptr<const ExceptionListener> _pExceptionListener;
1946 amps_unique_ptr<SubscriptionManager> _subscriptionManager;
1947 volatile bool _connected;
1948 std::string _username;
1949 typedef std::set<ConnectionStateListener*> ConnectionStateListeners;
1950 ConnectionStateListeners _connectionStateListeners;
1951 typedef std::vector<DeferredExecutionRequest> DeferredExecutionList;
1952 Mutex _deferredExecutionLock;
1953 DeferredExecutionList _deferredExecutionList;
1954 unsigned _heartbeatInterval;
1955 unsigned _readTimeout;
1963 if (!_connected && newState_ > ConnectionStateListener::Connected)
1967 for (ConnectionStateListeners::iterator it = _connectionStateListeners.begin(); it != _connectionStateListeners.end(); ++it)
1969 AMPS_CALL_EXCEPTION_WRAPPER(
1970 (*it)->connectionStateChanged(newState_));
1973 unsigned processedAck(
Message& message);
1974 unsigned persistedAck(
Message& meesage);
1975 void lastChance(
Message& message);
1976 void checkAndSendHeartbeat(
bool force =
false);
1977 virtual ConnectionInfo getConnectionInfo()
const;
1979 ClientImplMessageHandler(
amps_handle message,
void* userData);
1981 ClientImplPreDisconnectHandler(
amps_handle client,
unsigned failedConnectionVersion,
void* userData);
1983 ClientImplDisconnectHandler(
amps_handle client,
void* userData);
1985 ClientImplGetHttpPreflightMessage(
void* userData);
1987 void unsubscribeInternal(
const std::string&
id)
1995 subId.assign(
id.data(),
id.length());
1996 _routes.removeRoute(subId);
1998 if (_subscriptionManager)
2001 Unlock<Mutex> unlock(_lock);
2002 _subscriptionManager->unsubscribe(subId);
2008 _sendWithoutRetry(_message);
2009 deferredExecution(&s_noOpFn, NULL);
2012 AckResponse syncAckProcessing(
long timeout_,
Message& message_,
2013 bool isHASubscribe_)
2015 return syncAckProcessing(timeout_, message_,
2016 (amps_uint64_t)0, isHASubscribe_);
2019 AckResponse syncAckProcessing(
long timeout_,
Message& message_,
2020 amps_uint64_t haSeq = (amps_uint64_t)0,
2021 bool isHASubscribe_ =
false)
2024 AckResponse ack = AckResponse::create();
2027 Lock<Mutex> guard(_ackMapLock);
2030 ack.setConnectionVersion((
unsigned)_send(message_, haSeq, isHASubscribe_));
2031 if (ack.getConnectionVersion() == 0)
2034 throw DisconnectedException(
"Connection closed while waiting for response.");
2036 bool timedOut =
false;
2037 AMPS_START_TIMER(timeout_)
2038 while (!timedOut && !ack.responded() && !ack.abandoned())
2042 timedOut = !_lock.wait(timeout_);
2046 AMPS_RESET_TIMER(timedOut, timeout_);
2053 Unlock<Mutex> unlck(_lock);
2054 amps_invoke_waiting_function();
2057 if (ack.responded())
2059 if (ack.status() !=
"failure")
2063 amps_uint64_t ackSequence = ack.sequenceNo();
2064 if (_lastSentHaSequenceNumber < ackSequence)
2066 _lastSentHaSequenceNumber = ackSequence;
2079 _nameHash = ack.bookmark().substr(0, ack.bookmark().find(
'|'));
2080 _nameHashValue = ack.nameHashValue();
2081 _serverVersion = ack.serverVersion();
2082 if (_bookmarkStore.isValid())
2089 const std::string& options = ack.options();
2090 size_t index = options.find_first_of(
"max_backlog=");
2091 if (index != std::string::npos)
2094 const char* c = options.c_str() + index + 12;
2095 while (*c && *c !=
',')
2097 data = (data * 10) + (
unsigned)(*c++ -48);
2099 if (_ackBatchSize > data)
2101 _ackBatchSize = data;
2107 const size_t NotEntitled = 12;
2108 std::string ackReason = ack.reason();
2109 if (ackReason.length() == 0)
2113 if (ackReason.length() == NotEntitled &&
2114 ackReason[0] ==
'n' &&
2119 message_.throwFor(_client, ackReason);
2123 if (!ack.abandoned())
2125 throw TimedOutException(
"timed out waiting for operation.");
2129 throw DisconnectedException(
"Connection closed while waiting for response.");
2143 AMPS_CALL_EXCEPTION_WRAPPER(ClientImpl::disconnect());
2144 _pEmptyMessageStream.reset(NULL);
2151 ClientImpl(
const std::string& clientName)
2152 : _client(NULL), _name(clientName)
2153 , _isRetryOnDisconnect(
true)
2154 , _lastSentHaSequenceNumber((amps_uint64_t)0), _logonInProgress(0)
2155 , _badTimeToHASubscribe(0), _serverVersion()
2156 , _queueAckTimeout(AMPS_DEFAULT_QUEUE_ACK_TIMEOUT)
2157 , _isAutoAckEnabled(
false)
2159 , _queuedAckCount(0)
2160 , _defaultMaxDepth(0)
2162 , _heartbeatInterval(0)
2165 _replayer.setClient(
this);
2168 (amps_handler)ClientImpl::ClientImplMessageHandler,
2171 (amps_predisconnect_handler)ClientImpl::ClientImplPreDisconnectHandler,
2174 (amps_handler)ClientImpl::ClientImplDisconnectHandler,
2177 ClientImpl::ClientImplGetHttpPreflightMessage,
2179 _exceptionListener = &_defaultExceptionListener;
2180 for (
size_t i = 0; i < GlobalCommandTypeHandlers::COUNT; ++i)
2182 #ifdef AMPS_USE_EMPLACE 2190 virtual ~ClientImpl()
2195 const std::string& getName()
const 2200 const std::string& getNameHash()
const 2205 const amps_uint64_t getNameHashValue()
const 2207 return _nameHashValue;
2210 void setName(
const std::string& name)
2217 AMPSException::throwFor(_client, result);
2222 const std::string& getLogonCorrelationData()
const 2224 return _logonCorrelationData;
2227 void setLogonCorrelationData(
const std::string& logonCorrelationData_)
2229 _logonCorrelationData = logonCorrelationData_;
2232 size_t getServerVersion()
const 2234 return _serverVersion.getOldStyleVersion();
2237 VersionInfo getServerVersionInfo()
const 2239 return _serverVersion;
2242 const std::string& getURI()
const 2247 virtual void connect(
const std::string& uri)
2249 Lock<Mutex> l(_lock);
2253 virtual void _connect(
const std::string& uri)
2259 AMPSException::throwFor(_client, result);
2266 _readMessage.setClientImpl(
this);
2267 if (_queueAckTimeout)
2272 AMPSException::throwFor(_client, result);
2276 broadcastConnectionStateChanged(ConnectionStateListener::Connected);
2279 void addHttpPreflightHeader(
const std::string& header_)
2281 _httpPreflightHeaders.push_back(header_);
2284 void addHttpPreflightHeader(
const std::string& key_,
const std::string& value_)
2286 _httpPreflightHeaders.push_back(key_ + std::string(
": ") + value_);
2289 void clearHttpPreflightHeaders()
2291 _httpPreflightHeaders.clear();
2295 void setHttpPreflightHeaders(
const T& headers_)
2297 _httpPreflightHeaders.clear();
2298 for (
typename T::const_iterator i = headers_.begin(); i != headers_.end(); ++i)
2300 _httpPreflightHeaders.push_back(*i);
2304 void setDisconnected()
2307 Lock<Mutex> l(_lock);
2310 AMPS_CALL_EXCEPTION_WRAPPER(broadcastConnectionStateChanged(ConnectionStateListener::Disconnected));
2313 _heartbeatTimer.setTimeout(0.0);
2316 clearAcks(UINT_MAX-1);
2322 virtual void disconnect()
2324 AMPS_CALL_EXCEPTION_WRAPPER(flushAcks());
2327 clearAcks(UINT_MAX);
2328 AMPS_CALL_EXCEPTION_WRAPPER(processDeferredExecutions());
2329 Lock<Mutex> l(_lock);
2330 broadcastConnectionStateChanged(ConnectionStateListener::Shutdown);
2333 void clearAcks(
unsigned failedVersion)
2336 Lock<Mutex> guard(_ackMapLock);
2339 std::vector<std::string> worklist;
2340 for (AckMap::iterator i = _ackMap.begin(), e = _ackMap.end(); i != e; ++i)
2342 if (i->second.getConnectionVersion() <= failedVersion)
2344 i->second.setAbandoned();
2345 worklist.push_back(i->first);
2349 for (std::vector<std::string>::iterator j = worklist.begin(), e = worklist.end(); j != e; ++j)
2358 int send(
const Message& message)
2360 Lock<Mutex> l(_lock);
2361 return _send(message);
2364 void sendWithoutRetry(
const Message& message_)
2366 Lock<Mutex> l(_lock);
2369 if (_logonInProgress)
2371 throw DisconnectedException(
"The client has been disconnected.");
2373 _sendWithoutRetry(message_);
2376 void _sendWithoutRetry(
const Message& message_)
2381 AMPSException::throwFor(_client, result);
2385 int _send(
const Message& message, amps_uint64_t haSeq = (amps_uint64_t)0,
2386 bool isHASubscribe_ =
false)
2393 Message localMessage = message;
2394 unsigned version = 0;
2398 if (haSeq && _logonInProgress)
2402 if (!_isRetryOnDisconnect)
2406 if (!_lock.wait(1000))
2408 amps_invoke_waiting_function();
2413 if ((haSeq && haSeq <= _lastSentHaSequenceNumber) ||
2414 (isHASubscribe_ && _badTimeToHASubscribe))
2416 return (
int)version;
2420 if (haSeq > _lastSentHaSequenceNumber)
2422 while (haSeq > _lastSentHaSequenceNumber + 1)
2428 _lastSentHaSequenceNumber + 1))
2434 version = _replayer._version;
2437 catch (
const DisconnectedException&)
2439 catch (
const DisconnectedException& e)
2442 result = _replayer._res;
2447 localMessage.getMessage(),
2449 ++_lastSentHaSequenceNumber;
2453 if (_logonInProgress && localMessage.
getCommand().
data()[0] !=
'l')
2455 while (_logonInProgress)
2457 if (!_lock.wait(1000))
2459 amps_invoke_waiting_function();
2464 localMessage.getMessage(),
2469 if (!isHASubscribe_ && !haSeq &&
2470 localMessage.getMessage() == message.getMessage())
2474 if (_isRetryOnDisconnect)
2476 Unlock<Mutex> u(_lock);
2481 if ((isHASubscribe_ || haSeq) &&
2484 return (
int)version;
2491 AMPSException::throwFor(_client, result);
2497 amps_invoke_waiting_function();
2503 AMPSException::throwFor(_client, result);
2505 return (
int)version;
2508 void addMessageHandler(
const Field& commandId_,
2510 unsigned requestedAcks_, Message::Command::Type commandType_)
2512 Lock<Mutex> lock(_lock);
2513 _routes.addRoute(commandId_, messageHandler_, requestedAcks_,
2517 bool removeMessageHandler(
const Field& commandId_)
2519 Lock<Mutex> lock(_lock);
2520 return _routes.removeRoute(commandId_);
2528 bool isSubscribeOnly =
false;
2529 bool replace =
false;
2531 unsigned systemAddedAcks = Message::AckType::None;
2534 switch (commandType)
2536 case Message::Command::Subscribe:
2537 case Message::Command::DeltaSubscribe:
2538 replace = message_.
getOptions().operator std::string().find(AMPS_OPTIONS_REPLACE, 0, strlen(AMPS_OPTIONS_REPLACE) - 1) != std::string::npos;
2539 isSubscribeOnly =
true;
2541 case Message::Command::SOWAndSubscribe:
2542 case Message::Command::SOWAndDeltaSubscribe:
2549 while (!replace &&
id != subId && _routes.hasRoute(
id))
2561 systemAddedAcks |= Message::AckType::Persisted;
2564 case Message::Command::SOW:
2571 while (!replace &&
id != subId && _routes.hasRoute(
id))
2582 if (!isSubscribeOnly)
2591 while (!replace && qid != subId && qid !=
id 2592 && _routes.hasRoute(qid))
2598 systemAddedAcks |= Message::AckType::Processed;
2601 int routesAdded = 0;
2602 Lock<Mutex> l(_lock);
2603 if (!subId.
empty() && messageHandler_.isValid())
2605 if (!_routes.hasRoute(subId))
2611 _routes.addRoute(subId, messageHandler_, requestedAcks,
2612 systemAddedAcks, commandType);
2614 if (!isSubscribeOnly && !qid.
empty()
2615 && messageHandler_.isValid() && qid != subId)
2617 if (routesAdded == 0)
2619 _routes.addRoute(qid, messageHandler_,
2620 requestedAcks, systemAddedAcks, commandType);
2626 Unlock<Mutex> u(_lock);
2627 data = amps_invoke_copy_route_function(
2628 messageHandler_.userData());
2632 _routes.addRoute(qid, messageHandler_, requestedAcks,
2633 systemAddedAcks, commandType);
2637 _routes.addRoute(qid,
2640 requestedAcks, systemAddedAcks, commandType);
2645 if (!
id.empty() && messageHandler_.isValid()
2646 && requestedAcks & ~
Message::AckType::Persisted
2647 &&
id != subId &&
id != qid)
2649 if (routesAdded == 0)
2651 _routes.addRoute(
id, messageHandler_, requestedAcks,
2652 systemAddedAcks, commandType);
2658 Unlock<Mutex> u(_lock);
2659 data = amps_invoke_copy_route_function(
2660 messageHandler_.userData());
2664 _routes.addRoute(
id, messageHandler_, requestedAcks,
2665 systemAddedAcks, commandType);
2669 _routes.addRoute(
id,
2673 systemAddedAcks, commandType);
2682 syncAckProcessing(timeout_, message_, 0,
false);
2689 _routes.removeRoute(
id);
2696 case Message::Command::Unsubscribe:
2697 case Message::Command::Heartbeat:
2698 case Message::Command::Logon:
2699 case Message::Command::StartTimer:
2700 case Message::Command::StopTimer:
2701 case Message::Command::SOWDelete:
2703 Lock<Mutex> l(_lock);
2712 if (messageHandler_.isValid())
2714 _routes.addRoute(
id, messageHandler_, requestedAcks,
2715 Message::AckType::None, commandType);
2721 case Message::Command::DeltaPublish:
2722 case Message::Command::Publish:
2725 Lock<Mutex> l(_lock);
2728 if (ackType != Message::AckType::None
2736 if (messageHandler_.isValid())
2738 _routes.addRoute(
id, messageHandler_, requestedAcks,
2739 Message::AckType::None, commandType);
2745 syncAckProcessing(timeout_, message_, 0,
false);
2754 case Message::Command::GroupBegin:
2755 case Message::Command::GroupEnd:
2756 case Message::Command::OOF:
2757 case Message::Command::Ack:
2758 case Message::Command::Unknown:
2760 throw CommandException(
"Command type " + message_.
getCommand() +
" can not be sent directly to AMPS");
2766 void setDisconnectHandler(
const DisconnectHandler& disconnectHandler)
2768 Lock<Mutex> l(_lock);
2769 _disconnectHandler = disconnectHandler;
2772 void setGlobalCommandTypeMessageHandler(
const std::string& command_,
const MessageHandler& handler_)
2774 switch (command_[0])
2776 #if 0 // Not currently implemented to avoid an extra branch in delivery 2778 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::Publish] = handler_;
2781 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::SOW] = handler_;
2785 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::Heartbeat] = handler_;
2787 #if 0 // Not currently implemented to avoid an extra branch in delivery 2789 if (command_[6] ==
'b')
2791 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::GroupBegin] = handler_;
2793 else if (command_[6] ==
'e')
2795 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::GroupEnd] = handler_;
2799 std::ostringstream os;
2800 os <<
"Invalid command '" << command_ <<
"' passed to setGlobalCommandTypeHandler";
2801 throw CommandException(os.str());
2805 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::OOF] = handler_;
2809 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::Ack] = handler_;
2813 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::LastChance] = handler_;
2817 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::DuplicateMessage] = handler_;
2820 std::ostringstream os;
2821 os <<
"Invalid command '" << command_ <<
"' passed to setGlobalCommandTypeHandler";
2822 throw CommandException(os.str());
2827 void setGlobalCommandTypeMessageHandler(
const Message::Command::Type command_,
const MessageHandler& handler_)
2831 #if 0 // Not currently implemented to avoid an extra branch in delivery 2832 case Message::Command::Publish:
2833 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::Publish] = handler_;
2835 case Message::Command::SOW:
2836 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::SOW] = handler_;
2839 case Message::Command::Heartbeat:
2840 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::Heartbeat] = handler_;
2842 #if 0 // Not currently implemented to avoid an extra branch in delivery 2843 case Message::Command::GroupBegin:
2844 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::GroupBegin] = handler_;
2846 case Message::Command::GroupEnd:
2847 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::GroupEnd] = handler_;
2849 case Message::Command::OOF:
2850 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::OOF] = handler_;
2853 case Message::Command::Ack:
2854 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::Ack] = handler_;
2858 unsigned command = command_;
2865 AMPS_snprintf(errBuf,
sizeof(errBuf),
2866 "Invalid command '%.*s' passed to setGlobalCommandTypeHandler",
2867 CommandConstants<0>::Lengths[bits],
2868 CommandConstants<0>::Values[bits]);
2869 throw CommandException(errBuf);
2874 void setGlobalCommandTypeMessageHandler(
const GlobalCommandTypeHandlers handlerType_,
const MessageHandler& handler_)
2876 _globalCommandTypeHandlers[handlerType_] = handler_;
2881 Lock<Mutex> l(_lock);
2882 _failedWriteHandler.reset(handler_);
2885 void setPublishStore(
const Store& publishStore_)
2887 Lock<Mutex> l(_lock);
2890 throw AlreadyConnectedException(
"Setting a publish store on a connected client is undefined behavior");
2892 _publishStore = publishStore_;
2897 Lock<Mutex> l(_lock);
2900 throw AlreadyConnectedException(
"Setting a bookmark store on a connected client is undefined behavior");
2902 _bookmarkStore = bookmarkStore_;
2907 Lock<Mutex> l(_lock);
2908 _subscriptionManager.reset(subscriptionManager_);
2916 DisconnectHandler getDisconnectHandler()
const 2918 return _disconnectHandler;
2923 return _globalCommandTypeHandlers[GlobalCommandTypeHandlers::DuplicateMessage];
2931 Store getPublishStore()
const 2933 return _publishStore;
2938 return _bookmarkStore;
2941 amps_uint64_t publish(
const char* topic_,
size_t topicLen_,
const char* data_,
size_t dataLen_)
2945 Lock<Mutex> l(_lock);
2947 _publishMessage.assignData(data_, dataLen_);
2948 _send(_publishMessage);
2953 publishStoreMessage.reset();
2955 return _publish(topic_, topicLen_, data_, dataLen_);
2959 amps_uint64_t publish(
const char* topic_,
size_t topicLen_,
const char* data_,
2960 size_t dataLen_,
unsigned long expiration_)
2964 Lock<Mutex> l(_lock);
2966 _publishMessage.assignData(data_, dataLen_);
2967 char exprBuf[AMPS_NUMBER_BUFFER_LEN];
2968 size_t pos = convertToCharArray(exprBuf, expiration_);
2969 _publishMessage.
assignExpiration(exprBuf + pos, AMPS_NUMBER_BUFFER_LEN - pos);
2970 _send(_publishMessage);
2976 publishStoreMessage.reset();
2977 char exprBuf[AMPS_NUMBER_BUFFER_LEN];
2978 size_t exprPos = convertToCharArray(exprBuf, expiration_);
2981 AMPS_NUMBER_BUFFER_LEN - exprPos);
2982 return _publish(topic_, topicLen_, data_, dataLen_);
2989 ClientImpl* _pClient;
2991 #if __cplusplus >= 201100L || _MSC_VER >= 1900 2992 std::atomic<bool> _acked;
2993 std::atomic<bool> _disconnected;
2995 volatile bool _acked;
2996 volatile bool _disconnected;
2999 FlushAckHandler(ClientImpl* pClient_)
3000 : _pClient(pClient_), _cmdId(), _acked(
false), _disconnected(
false)
3002 pClient_->addConnectionStateListener(
this);
3006 _pClient->removeConnectionStateListener(
this);
3007 _pClient->removeMessageHandler(_cmdId);
3010 void setCommandId(
const Field& cmdId_)
3018 void connectionStateChanged(
State state_)
3020 if (state_ <= Shutdown)
3022 _disconnected =
true;
3031 return _acked || _disconnected;
3035 void publishFlush(
long timeout_,
unsigned ackType_)
3037 static const char* processed =
"processed";
3038 static const size_t processedLen = strlen(processed);
3039 static const char* persisted =
"persisted";
3040 static const size_t persistedLen = strlen(persisted);
3041 static const char* flush =
"flush";
3042 static const size_t flushLen = strlen(flush);
3043 static VersionInfo minPersisted(
"5.3.3.0");
3044 static VersionInfo minFlush(
"4");
3045 if (ackType_ != Message::AckType::Processed
3046 && ackType_ != Message::AckType::Persisted)
3048 throw CommandException(
"Flush can only be used with processed or persisted acks.");
3050 FlushAckHandler flushHandler(
this);
3051 if (_serverVersion >= minFlush)
3053 Lock<Mutex> l(_lock);
3056 throw DisconnectedException(
"Not connected trying to flush");
3061 if (_serverVersion < minPersisted
3062 || ackType_ == Message::AckType::Processed)
3072 std::bind(&FlushAckHandler::invoke,
3073 std::ref(flushHandler),
3074 std::placeholders::_1),
3076 NoDelay noDelay(_client);
3077 if (_send(_message) == -1)
3079 throw DisconnectedException(
"Disconnected trying to flush");
3086 _publishStore.
flush(timeout_);
3088 catch (
const AMPSException& ex)
3090 AMPS_UNHANDLED_EXCEPTION(ex);
3094 else if (_serverVersion < minFlush)
3098 AMPS_USLEEP(timeout_ * 1000);
3102 AMPS_USLEEP(1000 * 1000);
3108 Timer timer((
double)timeout_);
3110 while (!timer.check() && !flushHandler.done())
3113 amps_invoke_waiting_function();
3118 while (!flushHandler.done())
3121 amps_invoke_waiting_function();
3125 if (!flushHandler.done())
3127 throw TimedOutException(
"Timed out waiting for flush");
3130 if (!flushHandler.acked() && !_publishStore.
isValid())
3132 throw DisconnectedException(
"Disconnected waiting for flush");
3136 amps_uint64_t deltaPublish(
const char* topic_,
size_t topicLength_,
3137 const char* data_,
size_t dataLength_)
3141 Lock<Mutex> l(_lock);
3143 _deltaMessage.assignData(data_, dataLength_);
3144 _send(_deltaMessage);
3149 publishStoreMessage.reset();
3150 publishStoreMessage.
setCommandEnum(Message::Command::DeltaPublish);
3151 return _publish(topic_, topicLength_, data_, dataLength_);
3155 amps_uint64_t deltaPublish(
const char* topic_,
size_t topicLength_,
3156 const char* data_,
size_t dataLength_,
3157 unsigned long expiration_)
3161 Lock<Mutex> l(_lock);
3163 _deltaMessage.assignData(data_, dataLength_);
3164 char exprBuf[AMPS_NUMBER_BUFFER_LEN];
3165 size_t pos = convertToCharArray(exprBuf, expiration_);
3166 _deltaMessage.
assignExpiration(exprBuf + pos, AMPS_NUMBER_BUFFER_LEN - pos);
3167 _send(_deltaMessage);
3173 publishStoreMessage.reset();
3174 char exprBuf[AMPS_NUMBER_BUFFER_LEN];
3175 size_t exprPos = convertToCharArray(exprBuf, expiration_);
3176 publishStoreMessage.
setCommandEnum(Message::Command::DeltaPublish)
3178 AMPS_NUMBER_BUFFER_LEN - exprPos);
3179 return _publish(topic_, topicLength_, data_, dataLength_);
3183 amps_uint64_t _publish(
const char* topic_,
size_t topicLength_,
3184 const char* data_,
size_t dataLength_)
3186 publishStoreMessage.
assignTopic(topic_, topicLength_)
3188 .assignData(data_, dataLength_);
3189 amps_uint64_t haSequenceNumber = _publishStore.
store(publishStoreMessage);
3190 char buf[AMPS_NUMBER_BUFFER_LEN];
3191 size_t pos = convertToCharArray(buf, haSequenceNumber);
3192 publishStoreMessage.
assignSequence(buf + pos, AMPS_NUMBER_BUFFER_LEN - pos);
3194 Lock<Mutex> l(_lock);
3195 _send(publishStoreMessage, haSequenceNumber);
3197 return haSequenceNumber;
3200 virtual std::string logon(
long timeout_,
Authenticator& authenticator_,
3201 const char* options_ = NULL)
3203 Lock<Mutex> l(_lock);
3204 return _logon(timeout_, authenticator_, options_);
3207 virtual std::string _logon(
long timeout_,
Authenticator& authenticator_,
3208 const char* options_ = NULL)
3215 #ifdef AMPS_CLIENT_VERSION_WITH_LANGUAGE 3217 strlen(AMPS_CLIENT_VERSION_WITH_LANGUAGE));
3220 if (uri.user().size())
3224 if (uri.password().size())
3228 if (uri.protocol() ==
"amps" && uri.messageType().size())
3232 if (uri.isTrue(
"pretty"))
3238 if (!_logonCorrelationData.empty())
3249 AtomicFlagFlip pubFlip(&_logonInProgress);
3250 NoDelay noDelay(_client);
3254 AckResponse ack = syncAckProcessing(timeout_, _message);
3255 if (ack.status() ==
"retry")
3257 _message.
setPassword(authenticator_.
retry(ack.username(), ack.password()));
3258 _username = ack.username();
3263 authenticator_.
completed(ack.username(), ack.password(), ack.reason());
3267 broadcastConnectionStateChanged(ConnectionStateListener::LoggedOn);
3274 catch (
const AMPSException& ex)
3277 Unlock<Mutex> u(_lock);
3281 AMPS_UNHANDLED_EXCEPTION(ex);
3287 Unlock<Mutex> u(_lock);
3298 _publishStore.
replay(_replayer);
3299 broadcastConnectionStateChanged(ConnectionStateListener::PublishReplayed);
3301 catch (
const PublishStoreGapException& ex)
3304 Unlock<Mutex> u(_lock);
3308 AMPS_UNHANDLED_EXCEPTION(ex);
3311 catch (
const StoreException& ex)
3314 Unlock<Mutex> u(_lock);
3318 std::ostringstream os;
3319 os <<
"A local store exception occurred while logging on." 3321 throw ConnectionException(os.str());
3323 catch (
const AMPSException& ex)
3326 Unlock<Mutex> u(_lock);
3330 AMPS_UNHANDLED_EXCEPTION(ex);
3333 catch (
const std::exception& ex)
3336 Unlock<Mutex> u(_lock);
3340 AMPS_UNHANDLED_EXCEPTION(ex);
3346 Unlock<Mutex> u(_lock);
3354 return newCommandId;
3358 const std::string& topic_,
3360 const std::string& filter_,
3361 const std::string& bookmark_,
3362 const std::string& options_,
3363 const std::string& subId_,
3364 bool isHASubscribe_ =
true)
3366 isHASubscribe_ &= (bool)_subscriptionManager;
3367 Lock<Mutex> l(_lock);
3371 std::string subId(subId_);
3374 if (options_.find(AMPS_OPTIONS_REPLACE, 0, strlen(AMPS_OPTIONS_REPLACE) - 1) != std::string::npos)
3376 throw ConnectionException(
"Cannot issue a replacement subscription; a valid subscription id is required.");
3386 unsigned ackTypes = Message::AckType::Processed;
3388 if (!bookmark_.empty() && _bookmarkStore.isValid())
3390 ackTypes |= Message::AckType::Persisted;
3394 if (filter_.length())
3398 if (bookmark_.length())
3409 if (_bookmarkStore.isValid())
3414 _bookmarkStore.
log(_message);
3415 _bookmarkStore.
discard(_message);
3421 if (options_.length())
3430 Unlock<Mutex> u(_lock);
3431 _subscriptionManager->subscribe(messageHandler_, message,
3432 Message::AckType::None);
3433 if (_badTimeToHASubscribe)
3444 if (!options_.empty())
3450 syncAckProcessing(timeout_, message, isHASubscribe_);
3452 catch (
const DisconnectedException&)
3454 if (!isHASubscribe_)
3456 _routes.removeRoute(subIdField);
3461 AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(subIdField));
3465 catch (
const TimedOutException&)
3467 AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(subIdField));
3475 Unlock<Mutex> unlock(_lock);
3476 _subscriptionManager->unsubscribe(subIdField);
3478 _routes.removeRoute(subIdField);
3484 std::string deltaSubscribe(
const MessageHandler& messageHandler_,
3485 const std::string& topic_,
3487 const std::string& filter_,
3488 const std::string& bookmark_,
3489 const std::string& options_,
3490 const std::string& subId_ =
"",
3491 bool isHASubscribe_ =
true)
3493 isHASubscribe_ &= (bool)_subscriptionManager;
3494 Lock<Mutex> l(_lock);
3498 std::string subId(subId_);
3508 unsigned ackTypes = Message::AckType::Processed;
3510 if (!bookmark_.empty() && _bookmarkStore.isValid())
3512 ackTypes |= Message::AckType::Persisted;
3515 if (filter_.length())
3519 if (bookmark_.length())
3530 if (_bookmarkStore.isValid())
3535 _bookmarkStore.
log(_message);
3536 _bookmarkStore.
discard(_message);
3542 if (options_.length())
3550 Unlock<Mutex> u(_lock);
3551 _subscriptionManager->subscribe(messageHandler_, message,
3552 Message::AckType::None);
3553 if (_badTimeToHASubscribe)
3564 if (!options_.empty())
3570 syncAckProcessing(timeout_, message, isHASubscribe_);
3572 catch (
const DisconnectedException&)
3574 if (!isHASubscribe_)
3576 _routes.removeRoute(subIdField);
3580 catch (
const TimedOutException&)
3582 AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(subIdField));
3590 Unlock<Mutex> unlock(_lock);
3591 _subscriptionManager->unsubscribe(subIdField);
3593 _routes.removeRoute(subIdField);
3599 void unsubscribe(
const std::string&
id)
3601 Lock<Mutex> l(_lock);
3602 unsubscribeInternal(
id);
3605 void unsubscribe(
void)
3607 if (_subscriptionManager)
3609 _subscriptionManager->clear();
3612 _routes.unsubscribeAll();
3613 Lock<Mutex> l(_lock);
3618 _sendWithoutRetry(_message);
3620 deferredExecution(&s_noOpFn, NULL);
3624 const std::string& topic_,
3625 const std::string& filter_ =
"",
3626 const std::string& orderBy_ =
"",
3627 const std::string& bookmark_ =
"",
3628 int batchSize_ = AMPS_DEFAULT_BATCH_SIZE,
3629 int topN_ = AMPS_DEFAULT_TOP_N,
3630 const std::string& options_ =
"",
3631 long timeout_ = AMPS_DEFAULT_COMMAND_TIMEOUT)
3633 Lock<Mutex> l(_lock);
3640 unsigned ackTypes = Message::AckType::Processed | Message::AckType::Completed;
3643 if (filter_.length())
3647 if (orderBy_.length())
3651 if (bookmark_.length())
3656 if (topN_ != AMPS_DEFAULT_TOP_N)
3660 if (options_.length())
3665 _routes.addRoute(_message.
getQueryID(), messageHandler_,
3670 syncAckProcessing(timeout_, _message);
3674 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId));
3682 const std::string& topic_,
3684 const std::string& filter_ =
"",
3685 int batchSize_ = AMPS_DEFAULT_BATCH_SIZE,
3686 int topN_ = AMPS_DEFAULT_TOP_N)
3689 return sow(messageHandler_,
3700 std::string sowAndSubscribe(
const MessageHandler& messageHandler_,
3701 const std::string& topic_,
3702 const std::string& filter_ =
"",
3703 const std::string& orderBy_ =
"",
3704 const std::string& bookmark_ =
"",
3705 int batchSize_ = AMPS_DEFAULT_BATCH_SIZE,
3706 int topN_ = AMPS_DEFAULT_TOP_N,
3707 const std::string& options_ =
"",
3708 long timeout_ = AMPS_DEFAULT_COMMAND_TIMEOUT,
3709 bool isHASubscribe_ =
true)
3711 isHASubscribe_ &= (bool)_subscriptionManager;
3712 unsigned ackTypes = Message::AckType::Processed;
3713 Lock<Mutex> l(_lock);
3718 std::string subId = cid;
3720 if (filter_.length())
3724 if (orderBy_.length())
3728 if (bookmark_.length())
3732 if (_bookmarkStore.isValid())
3734 ackTypes |= Message::AckType::Persisted;
3742 _bookmarkStore.
log(_message);
3743 if (!BookmarkRange::isRange(bookmark))
3745 _bookmarkStore.
discard(_message);
3757 if (topN_ != AMPS_DEFAULT_TOP_N)
3761 if (options_.length())
3770 Unlock<Mutex> u(_lock);
3771 _subscriptionManager->subscribe(messageHandler_, message,
3772 Message::AckType::None);
3773 if (_badTimeToHASubscribe)
3778 _routes.addRoute(cid, messageHandler_,
3781 if (!options_.empty())
3787 syncAckProcessing(timeout_, message, isHASubscribe_);
3789 catch (
const DisconnectedException&)
3791 if (!isHASubscribe_)
3793 _routes.removeRoute(subId);
3797 catch (
const TimedOutException&)
3799 AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(subId));
3807 Unlock<Mutex> unlock(_lock);
3808 _subscriptionManager->unsubscribe(cid);
3810 _routes.removeRoute(subId);
3816 std::string sowAndSubscribe(
const MessageHandler& messageHandler_,
3817 const std::string& topic_,
3819 const std::string& filter_ =
"",
3820 int batchSize_ = AMPS_DEFAULT_BATCH_SIZE,
3821 bool oofEnabled_ =
false,
3822 int topN_ = AMPS_DEFAULT_TOP_N,
3823 bool isHASubscribe_ =
true)
3826 return sowAndSubscribe(messageHandler_,
3833 (oofEnabled_ ?
"oof" :
""),
3838 std::string sowAndDeltaSubscribe(
const MessageHandler& messageHandler_,
3839 const std::string& topic_,
3840 const std::string& filter_ =
"",
3841 const std::string& orderBy_ =
"",
3842 int batchSize_ = AMPS_DEFAULT_BATCH_SIZE,
3843 int topN_ = AMPS_DEFAULT_TOP_N,
3844 const std::string& options_ =
"",
3845 long timeout_ = AMPS_DEFAULT_COMMAND_TIMEOUT,
3846 bool isHASubscribe_ =
true)
3848 isHASubscribe_ &= (bool)_subscriptionManager;
3849 Lock<Mutex> l(_lock);
3857 if (filter_.length())
3861 if (orderBy_.length())
3866 if (topN_ != AMPS_DEFAULT_TOP_N)
3870 if (options_.length())
3878 Unlock<Mutex> u(_lock);
3879 _subscriptionManager->subscribe(messageHandler_, message,
3880 Message::AckType::None);
3881 if (_badTimeToHASubscribe)
3886 _routes.addRoute(message.
getQueryID(), messageHandler_,
3887 Message::AckType::None, Message::AckType::Processed, message.
getCommandEnum());
3889 if (!options_.empty())
3895 syncAckProcessing(timeout_, message, isHASubscribe_);
3897 catch (
const DisconnectedException&)
3899 if (!isHASubscribe_)
3901 _routes.removeRoute(subId);
3905 catch (
const TimedOutException&)
3907 AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(subId));
3915 Unlock<Mutex> unlock(_lock);
3916 _subscriptionManager->unsubscribe(
Field(subId));
3918 _routes.removeRoute(subId);
3924 std::string sowAndDeltaSubscribe(
const MessageHandler& messageHandler_,
3925 const std::string& topic_,
3927 const std::string& filter_ =
"",
3928 int batchSize_ = AMPS_DEFAULT_BATCH_SIZE,
3929 bool oofEnabled_ =
false,
3930 bool sendEmpties_ =
false,
3931 int topN_ = AMPS_DEFAULT_TOP_N,
3932 bool isHASubscribe_ =
true)
3940 if (sendEmpties_ ==
false)
3944 return sowAndDeltaSubscribe(messageHandler_,
3956 const std::string& topic_,
3957 const std::string& filter_,
3963 unsigned ackType = Message::AckType::Processed |
3964 Message::AckType::Stats |
3965 Message::AckType::Persisted;
3966 publishStoreMessage.reset();
3967 if (commandId_.
empty())
3978 .assignQueryID(commandId_.
data(), commandId_.
len())
3979 .setAckTypeEnum(ackType)
3981 .assignFilter(filter_.c_str(), filter_.length());
3982 amps_uint64_t haSequenceNumber = _publishStore.
store(publishStoreMessage);
3983 char buf[AMPS_NUMBER_BUFFER_LEN];
3984 size_t pos = convertToCharArray(buf, haSequenceNumber);
3985 publishStoreMessage.
assignSequence(buf + pos, AMPS_NUMBER_BUFFER_LEN - pos);
3989 Lock<Mutex> l(_lock);
3990 _routes.addRoute(commandId_, messageHandler_,
3991 Message::AckType::Stats,
3992 Message::AckType::Processed | Message::AckType::Persisted,
3994 syncAckProcessing(timeout_, publishStoreMessage,
3997 catch (
const DisconnectedException&)
4004 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId_));
4008 return (std::string)commandId_;
4012 Lock<Mutex> l(_lock);
4014 if (commandId_.
empty())
4025 .assignQueryID(commandId_.
data(), commandId_.
len())
4026 .setAckTypeEnum(Message::AckType::Processed |
4027 Message::AckType::Stats)
4029 .assignFilter(filter_.c_str(), filter_.length());
4030 _routes.addRoute(commandId_, messageHandler_,
4031 Message::AckType::Stats,
4032 Message::AckType::Processed,
4036 syncAckProcessing(timeout_, _message);
4040 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId_));
4043 return (std::string)commandId_;
4047 std::string sowDeleteByData(
const MessageHandler& messageHandler_,
4048 const std::string& topic_,
4049 const std::string& data_,
4055 unsigned ackType = Message::AckType::Processed |
4056 Message::AckType::Stats |
4057 Message::AckType::Persisted;
4058 publishStoreMessage.reset();
4059 if (commandId_.
empty())
4070 .assignQueryID(commandId_.
data(), commandId_.
len())
4071 .setAckTypeEnum(ackType)
4073 .assignData(data_.c_str(), data_.length());
4074 amps_uint64_t haSequenceNumber = _publishStore.
store(publishStoreMessage);
4075 char buf[AMPS_NUMBER_BUFFER_LEN];
4076 size_t pos = convertToCharArray(buf, haSequenceNumber);
4077 publishStoreMessage.
assignSequence(buf + pos, AMPS_NUMBER_BUFFER_LEN - pos);
4081 Lock<Mutex> l(_lock);
4082 _routes.addRoute(commandId_, messageHandler_,
4083 Message::AckType::Stats,
4084 Message::AckType::Processed | Message::AckType::Persisted,
4086 syncAckProcessing(timeout_, publishStoreMessage,
4089 catch (
const DisconnectedException&)
4096 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId_));
4100 return (std::string)commandId_;
4104 Lock<Mutex> l(_lock);
4106 if (commandId_.
empty())
4117 .assignQueryID(commandId_.
data(), commandId_.
len())
4118 .setAckTypeEnum(Message::AckType::Processed |
4119 Message::AckType::Stats)
4121 .assignData(data_.c_str(), data_.length());
4122 _routes.addRoute(commandId_, messageHandler_,
4123 Message::AckType::Stats,
4124 Message::AckType::Processed,
4128 syncAckProcessing(timeout_, _message);
4132 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId_));
4135 return (std::string)commandId_;
4139 std::string sowDeleteByKeys(
const MessageHandler& messageHandler_,
4140 const std::string& topic_,
4141 const std::string& keys_,
4147 unsigned ackType = Message::AckType::Processed |
4148 Message::AckType::Stats |
4149 Message::AckType::Persisted;
4150 publishStoreMessage.reset();
4151 if (commandId_.
empty())
4162 .assignQueryID(commandId_.
data(), commandId_.
len())
4163 .setAckTypeEnum(ackType)
4165 .assignSowKeys(keys_.c_str(), keys_.length());
4166 amps_uint64_t haSequenceNumber = _publishStore.
store(publishStoreMessage);
4167 char buf[AMPS_NUMBER_BUFFER_LEN];
4168 size_t pos = convertToCharArray(buf, haSequenceNumber);
4169 publishStoreMessage.
assignSequence(buf + pos, AMPS_NUMBER_BUFFER_LEN - pos);
4173 Lock<Mutex> l(_lock);
4174 _routes.addRoute(commandId_, messageHandler_,
4175 Message::AckType::Stats,
4176 Message::AckType::Processed | Message::AckType::Persisted,
4178 syncAckProcessing(timeout_, publishStoreMessage,
4181 catch (
const DisconnectedException&)
4188 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId_));
4192 return (std::string)commandId_;
4196 Lock<Mutex> l(_lock);
4198 if (commandId_.
empty())
4209 .assignQueryID(commandId_.
data(), commandId_.
len())
4210 .setAckTypeEnum(Message::AckType::Processed |
4211 Message::AckType::Stats)
4213 .assignSowKeys(keys_.c_str(), keys_.length());
4214 _routes.addRoute(commandId_, messageHandler_,
4215 Message::AckType::Stats,
4216 Message::AckType::Processed,
4220 syncAckProcessing(timeout_, _message);
4224 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId_));
4227 return (std::string)commandId_;
4231 void startTimer(
void)
4233 if (_serverVersion >=
"5.3.2.0")
4235 throw CommandException(
"The start_timer command is deprecated.");
4237 Lock<Mutex> l(_lock);
4246 if (_serverVersion >=
"5.3.2.0")
4248 throw CommandException(
"The stop_timer command is deprecated.");
4250 return executeAsync(
Command(
"stop_timer").addAckType(
"completed"), messageHandler_);
4265 void setExceptionListener(
const std::shared_ptr<const ExceptionListener>& pListener_)
4267 _pExceptionListener = pListener_;
4268 _exceptionListener = _pExceptionListener.get();
4273 _exceptionListener = &listener_;
4278 return *_exceptionListener;
4281 void setHeartbeat(
unsigned heartbeatInterval_,
unsigned readTimeout_)
4283 if (readTimeout_ < heartbeatInterval_)
4285 throw UsageException(
"The socket read timeout must be >= the heartbeat interval.");
4287 Lock<Mutex> l(_lock);
4288 if (_heartbeatInterval != heartbeatInterval_ ||
4289 _readTimeout != readTimeout_)
4291 _heartbeatInterval = heartbeatInterval_;
4292 _readTimeout = readTimeout_;
4297 void _sendHeartbeat(
void)
4299 if (_connected && _heartbeatInterval != 0)
4301 std::ostringstream options;
4302 options <<
"start," << _heartbeatInterval;
4305 _heartbeatTimer.setTimeout(_heartbeatInterval * 1000.0);
4306 _heartbeatTimer.start();
4309 _sendWithoutRetry(_beatMessage);
4310 broadcastConnectionStateChanged(ConnectionStateListener::HeartbeatInitiated);
4312 catch (ConnectionException& ex_)
4316 AMPS_UNHANDLED_EXCEPTION(ex_);
4321 if (_readTimeout && _connected)
4326 AMPSException::throwFor(_client, result);
4328 if (!_queueAckTimeout)
4331 (
int)(_heartbeatInterval * 1000));
4334 AMPSException::throwFor(_client, result);
4342 Lock<Mutex> lock(_lock);
4343 _connectionStateListeners.insert(listener_);
4348 Lock<Mutex> lock(_lock);
4349 _connectionStateListeners.erase(listener_);
4352 void clearConnectionStateListeners()
4354 Lock<Mutex> lock(_lock);
4355 _connectionStateListeners.clear();
4360 unsigned systemAddedAcks_, Message::Command::Type commandType_)
4362 Message message = command_.getMessage();
4367 bool added = qid.
len() || subid.
len() || cid_.
len();
4368 bool cidIsQid = cid_ == qid;
4369 bool cidUnique = !cidIsQid && cid_.
len() > 0 && cid_ != subid;
4371 if (subid.
len() > 0)
4375 addedCount += _routes.addRoute(subid, handler_, requestedAcks_,
4376 systemAddedAcks_, commandType_);
4378 && (commandType == Message::Command::Subscribe
4379 || commandType == Message::Command::DeltaSubscribe))
4386 if (qid.
len() > 0 && qid != subid
4387 && (commandType == Message::Command::SOW
4388 || commandType == Message::Command::SOWDelete
4389 || commandType == Message::Command::SOWAndSubscribe
4390 || commandType == Message::Command::SOWAndDeltaSubscribe))
4392 while (_routes.hasRoute(qid))
4401 if (addedCount == 0)
4403 _routes.addRoute(qid, handler_, requestedAcks_,
4404 systemAddedAcks_, commandType_);
4410 Unlock<Mutex> u(_lock);
4411 data = amps_invoke_copy_route_function(handler_.userData());
4415 _routes.addRoute(qid, handler_, requestedAcks_,
4416 systemAddedAcks_, commandType_);
4420 _routes.addRoute(qid,
4424 systemAddedAcks_, commandType_);
4429 if (cidUnique && requestedAcks_ & ~Message::AckType::Persisted)
4431 while (_routes.hasRoute(cid_))
4435 if (addedCount == 0)
4437 _routes.addRoute(cid_, handler_, requestedAcks_,
4438 systemAddedAcks_, commandType_);
4444 Unlock<Mutex> u(_lock);
4445 data = amps_invoke_copy_route_function(handler_.userData());
4449 _routes.addRoute(cid_, handler_, requestedAcks_,
4450 systemAddedAcks_, commandType_);
4454 _routes.addRoute(cid_,
4458 systemAddedAcks_, commandType_);
4462 else if ((commandType == Message::Command::Publish ||
4463 commandType == Message::Command::DeltaPublish)
4464 && requestedAcks_ & ~
Message::AckType::Persisted)
4467 _routes.addRoute(cid_, handler_, requestedAcks_,
4468 systemAddedAcks_, commandType_);
4473 throw UsageException(
"To use a messagehandler, you must also supply a command or subscription ID.");
4478 bool isHASubscribe_ =
true)
4480 isHASubscribe_ &= (bool)_subscriptionManager;
4481 Message& message = command_.getMessage();
4482 unsigned systemAddedAcks = (handler_.isValid() || command_.hasProcessedAck()) ?
4483 Message::AckType::Processed : Message::AckType::None;
4485 bool isPublishStore = _publishStore.
isValid() && command_.needsSequenceNumber();
4487 if (commandType == Message::Command::StopTimer)
4489 systemAddedAcks |= Message::AckType::Completed;
4491 else if (commandType == Message::Command::Unsubscribe)
4497 _routes.unsubscribeAll();
4498 if (_subscriptionManager)
4500 Unlock<Mutex> unlock(_lock);
4501 _subscriptionManager->clear();
4506 _routes.removeRoute(subId);
4508 if (_subscriptionManager)
4511 Unlock<Mutex> unlock(_lock);
4512 _subscriptionManager->unsubscribe(subId);
4516 deferredExecution(&s_noOpFn, NULL);
4519 if (handler_.isValid() && cid.
empty())
4525 if (command_.isSubscribe())
4528 if (_bookmarkStore.isValid())
4530 systemAddedAcks |= Message::AckType::Persisted;
4538 _bookmarkStore.
log(message);
4539 if (!BookmarkRange::isRange(bookmark))
4541 _bookmarkStore.
discard(message);
4555 systemAddedAcks |= Message::AckType::Persisted;
4557 bool isSubscribe = command_.isSubscribe();
4558 if (handler_.isValid() && !isSubscribe)
4560 _registerHandler(command_, cid, handler_,
4561 requestedAcks, systemAddedAcks, commandType);
4565 bool useSyncSend = cid.
len() > 0 && command_.hasProcessedAck();
4566 amps_uint64_t haSequenceNumber = (amps_uint64_t)0;
4569 Unlock<Mutex> u(_lock);
4570 haSequenceNumber = _publishStore.
store(message);
4577 syncAckProcessing((
long)command_.getTimeout(), message,
4582 _send(message, haSequenceNumber);
4585 catch (
const DisconnectedException&)
4591 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(cid));
4602 Unlock<Mutex> u(_lock);
4603 _subscriptionManager->subscribe(handler_,
4606 if (_badTimeToHASubscribe)
4609 return std::string(subId.
data(), subId.
len());
4612 if (handler_.isValid())
4614 _registerHandler(command_, cid, handler_,
4615 requestedAcks, systemAddedAcks, commandType);
4620 syncAckProcessing((
long)command_.getTimeout(), message,
4623 catch (
const DisconnectedException&)
4625 if (!isHASubscribe_)
4627 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(cid));
4628 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(subId));
4629 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(message.
getQueryId()));
4634 catch (
const TimedOutException&)
4636 AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(cid));
4637 AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(subId));
4638 AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(message.
getQueryId()));
4647 Unlock<Mutex> unlock(_lock);
4648 _subscriptionManager->unsubscribe(subId);
4654 _routes.removeRoute(cid);
4655 _routes.removeRoute(subId);
4659 if (subId.
len() > 0)
4662 return std::string(subId.
data(), subId.
len());
4668 bool useSyncSend = commandType & ~
Message::Command::NoDataCommands
4669 || (cid.
len() > 0 && command_.hasProcessedAck());
4675 syncAckProcessing((
long)(command_.getTimeout()), message);
4682 catch (
const TimedOutException&)
4684 AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(cid));
4685 AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(message.
getQueryId()));
4689 catch (
const DisconnectedException&)
4691 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(cid));
4692 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(message.
getQueryId()));
4698 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(cid));
4699 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(message.
getQueryId()));
4712 bool isHASubscribe_ =
true)
4714 Lock<Mutex> lock(_lock);
4715 return executeAsyncNoLock(command_, handler_, isHASubscribe_);
4719 void setAutoAck(
bool isAutoAckEnabled_)
4721 _isAutoAckEnabled = isAutoAckEnabled_;
4723 bool getAutoAck(
void)
const 4725 return _isAutoAckEnabled;
4727 void setAckBatchSize(
const unsigned batchSize_)
4729 _ackBatchSize = batchSize_;
4730 if (!_queueAckTimeout)
4732 _queueAckTimeout = AMPS_DEFAULT_QUEUE_ACK_TIMEOUT;
4736 unsigned getAckBatchSize(
void)
const 4738 return _ackBatchSize;
4740 int getAckTimeout(
void)
const 4742 return _queueAckTimeout;
4744 void setAckTimeout(
const int ackTimeout_)
4747 _queueAckTimeout = ackTimeout_;
4749 size_t _ack(QueueBookmarks& queueBookmarks_)
4751 if (queueBookmarks_._bookmarkCount)
4753 publishStoreMessage.reset();
4758 amps_uint64_t haSequenceNumber = 0;
4761 haSequenceNumber = _publishStore.
store(publishStoreMessage);
4764 queueBookmarks_._data.erase();
4765 queueBookmarks_._bookmarkCount = 0;
4767 _send(publishStoreMessage, haSequenceNumber);
4770 queueBookmarks_._data.erase();
4771 queueBookmarks_._bookmarkCount = 0;
4777 void ack(
const Field& topic_,
const Field& bookmark_,
const char* options_ = NULL)
4779 if (_isAutoAckEnabled)
4783 _ack(topic_, bookmark_, options_);
4785 void _ack(
const Field& topic_,
const Field& bookmark_,
const char* options_ = NULL)
4787 if (bookmark_.
len() == 0)
4791 Lock<Mutex> lock(_lock);
4792 if (_ackBatchSize < 2 || options_ != NULL)
4794 publishStoreMessage.reset();
4802 amps_uint64_t haSequenceNumber = 0;
4805 haSequenceNumber = _publishStore.
store(publishStoreMessage);
4809 _send(publishStoreMessage, haSequenceNumber);
4813 topic_hash hash = CRC<0>::crcNoSSE(topic_.
data(), topic_.
len());
4814 TopicHashMap::iterator it = _topicHashMap.find(hash);
4815 if (it == _topicHashMap.end())
4818 #ifdef AMPS_USE_EMPLACE 4819 it = _topicHashMap.emplace(TopicHashMap::value_type(hash, QueueBookmarks(topic_))).first;
4821 it = _topicHashMap.insert(TopicHashMap::value_type(hash, QueueBookmarks(topic_))).first;
4824 QueueBookmarks& queueBookmarks = it->second;
4825 if (queueBookmarks._data.length())
4827 queueBookmarks._data.append(
",");
4831 queueBookmarks._oldestTime = amps_now();
4833 queueBookmarks._data.append(bookmark_);
4834 if (++queueBookmarks._bookmarkCount >= _ackBatchSize)
4836 _ack(queueBookmarks);
4839 void flushAcks(
void)
4841 size_t sendCount = 0;
4848 Lock<Mutex> lock(_lock);
4849 typedef TopicHashMap::iterator iterator;
4850 for (iterator it = _topicHashMap.begin(), end = _topicHashMap.end(); it != end; ++it)
4852 QueueBookmarks& queueBookmarks = it->second;
4853 sendCount += _ack(queueBookmarks);
4856 if (sendCount && _connected)
4858 publishFlush(0, Message::AckType::Processed);
4862 void checkQueueAcks(
void)
4864 if (!_topicHashMap.size())
4868 Lock<Mutex> lock(_lock);
4871 amps_uint64_t threshold = amps_now()
4872 - (amps_uint64_t)_queueAckTimeout;
4873 typedef TopicHashMap::iterator iterator;
4874 for (iterator it = _topicHashMap.begin(), end = _topicHashMap.end(); it != end; ++it)
4876 QueueBookmarks& queueBookmarks = it->second;
4877 if (queueBookmarks._bookmarkCount && queueBookmarks._oldestTime < threshold)
4879 _ack(queueBookmarks);
4883 catch (std::exception& ex)
4885 AMPS_UNHANDLED_EXCEPTION(ex);
4889 void deferredExecution(DeferredExecutionFunc func_,
void* userData_)
4891 Lock<Mutex> lock(_deferredExecutionLock);
4892 #ifdef AMPS_USE_EMPLACE 4893 _deferredExecutionList.emplace_back(
4894 DeferredExecutionRequest(func_, userData_));
4896 _deferredExecutionList.push_back(
4897 DeferredExecutionRequest(func_, userData_));
4901 inline void processDeferredExecutions(
void)
4903 if (_deferredExecutionList.size())
4905 Lock<Mutex> lock(_deferredExecutionLock);
4906 DeferredExecutionList::iterator it = _deferredExecutionList.begin();
4907 DeferredExecutionList::iterator end = _deferredExecutionList.end();
4908 for (; it != end; ++it)
4912 it->_func(it->_userData);
4920 _deferredExecutionList.clear();
4921 _routes.invalidateCache();
4922 _routeCache.invalidateCache();
4926 bool getRetryOnDisconnect(
void)
const 4928 return _isRetryOnDisconnect;
4931 void setRetryOnDisconnect(
bool isRetryOnDisconnect_)
4933 _isRetryOnDisconnect = isRetryOnDisconnect_;
4936 void setDefaultMaxDepth(
unsigned maxDepth_)
4938 _defaultMaxDepth = maxDepth_;
4941 unsigned getDefaultMaxDepth(
void)
const 4943 return _defaultMaxDepth;
5035 RefHandle<MessageStreamImpl> _body;
5045 inline void advance(
void);
5052 : _pStream(pStream_)
5057 bool operator==(
const iterator& rhs)
const 5059 return _pStream == rhs._pStream;
5061 bool operator!=(
const iterator& rhs)
const 5063 return _pStream != rhs._pStream;
5065 void operator++(
void)
5081 return _body.isValid();
5088 if (!_body.isValid())
5090 throw UsageException(
"This MessageStream is not valid and cannot be iterated.");
5122 unsigned getMaxDepth(
void)
const;
5125 unsigned getDepth(
void)
const;
5130 inline void setSOWOnly(
const std::string& commandId_,
5131 const std::string& queryId_ =
"");
5132 inline void setSubscription(
const std::string& subId_,
5133 const std::string& commandId_ =
"",
5134 const std::string& queryId_ =
"");
5135 inline void setStatsOnly(
const std::string& commandId_,
5136 const std::string& queryId_ =
"");
5137 inline void setAcksOnly(
const std::string& commandId_,
unsigned acks_);
5143 friend class Client;
5144 friend class ClientImpl;
5170 BorrowRefHandle<ClientImpl> _body;
5172 static const int DEFAULT_COMMAND_TIMEOUT = AMPS_DEFAULT_COMMAND_TIMEOUT;
5173 static const int DEFAULT_BATCH_SIZE = AMPS_DEFAULT_BATCH_SIZE;
5174 static const int DEFAULT_TOP_N = AMPS_DEFAULT_TOP_N;
5185 : _body(new ClientImpl(clientName), true)
5188 Client(ClientImpl* existingClient)
5189 : _body(existingClient,
true)
5192 Client(ClientImpl* existingClient,
bool isRef)
5193 : _body(existingClient, isRef)
5196 Client(
const Client& rhs) : _body(rhs._body) {;}
5197 virtual ~Client(
void) {;}
5199 Client& operator=(
const Client& rhs)
5207 return _body.isValid();
5224 _body.get().setName(name);
5231 return _body.get().getName();
5239 return _body.get().getNameHash();
5247 return _body.get().getNameHashValue();
5258 _body.get().setLogonCorrelationData(logonCorrelationData_);
5265 return _body.get().getLogonCorrelationData();
5273 _body.get().addHttpPreflightHeader(header_);
5282 _body.get().addHttpPreflightHeader(key_, value_);
5288 _body.get().clearHttpPreflightHeaders();
5297 _body.get().setHttpPreflightHeaders(headers_);
5310 return _body.get().getServerVersion();
5321 return _body.get().getServerVersionInfo();
5335 return AMPS::convertVersionToNumber(version_.c_str(), version_.length());
5350 return AMPS::convertVersionToNumber(data_, len_);
5357 return _body.get().getURI();
5381 _body.get().connect(uri);
5388 _body.get().disconnect();
5406 _body.get().send(message);
5419 unsigned requestedAcks_,
bool isSubscribe_)
5421 Message::Command::Type commandType = isSubscribe_ ? Message::Command::Subscribe : Message::Command::SOW;
5422 _body.get().addMessageHandler(commandId_, messageHandler_,
5423 requestedAcks_, commandType);
5436 unsigned requestedAcks_, Message::Command::Type commandType_)
5438 _body.get().addMessageHandler(commandId_, messageHandler_,
5439 requestedAcks_, commandType_);
5447 return _body.get().removeMessageHandler(commandId_);
5475 return _body.get().send(messageHandler, message, timeout);
5491 #if defined(_WIN32) || __cplusplus >= 201402L 5492 [[deprecated(
"Use HAClient for automatic reconnection and a ConnectionStateListener to monitor connection state.")]]
5496 _body.get().setDisconnectHandler(disconnectHandler);
5505 #if defined(_WIN32) || __cplusplus >= 201402L 5506 [[deprecated(
"Use HAClient for automatic reconnection and a ConnectionStateListener to monitor connection state.")]]
5510 return _body.get().getDisconnectHandler();
5519 return _body.get().getConnectionInfo();
5532 _body.get().setBookmarkStore(bookmarkStore_);
5540 return _body.
get().getBookmarkStore();
5548 return _body.get().getSubscriptionManager();
5560 _body.get().setSubscriptionManager(subscriptionManager_);
5584 _body.get().setPublishStore(publishStore_);
5592 return _body.
get().getPublishStore();
5600 _body.get().setGlobalCommandTypeMessageHandler(ClientImpl::GlobalCommandTypeHandlers::DuplicateMessage,
5601 duplicateMessageHandler_);
5615 return _body.get().getDuplicateMessageHandler();
5629 _body.get().setFailedWriteHandler(handler_);
5637 return _body.get().getFailedWriteHandler();
5658 amps_uint64_t
publish(
const std::string& topic_,
const std::string& data_)
5660 return _body.get().publish(topic_.c_str(), topic_.length(),
5661 data_.c_str(), data_.length());
5683 amps_uint64_t
publish(
const char* topic_,
size_t topicLength_,
5684 const char* data_,
size_t dataLength_)
5686 return _body.get().publish(topic_, topicLength_, data_, dataLength_);
5707 amps_uint64_t
publish(
const std::string& topic_,
const std::string& data_,
5708 unsigned long expiration_)
5710 return _body.get().publish(topic_.c_str(), topic_.length(),
5711 data_.c_str(), data_.length(), expiration_);
5734 amps_uint64_t
publish(
const char* topic_,
size_t topicLength_,
5735 const char* data_,
size_t dataLength_,
5736 unsigned long expiration_)
5738 return _body.get().publish(topic_, topicLength_,
5739 data_, dataLength_, expiration_);
5780 void publishFlush(
long timeout_ = 0,
unsigned ackType_ = Message::AckType::Processed)
5782 _body.get().publishFlush(timeout_, ackType_);
5801 amps_uint64_t
deltaPublish(
const std::string& topic_,
const std::string& data_)
5803 return _body.get().deltaPublish(topic_.c_str(), topic_.length(),
5804 data_.c_str(), data_.length());
5825 const char* data_,
size_t dataLength_)
5827 return _body.get().deltaPublish(topic_, topicLength_,
5828 data_, dataLength_);
5847 amps_uint64_t
deltaPublish(
const std::string& topic_,
const std::string& data_,
5848 unsigned long expiration_)
5850 return _body.get().deltaPublish(topic_.c_str(), topic_.length(),
5851 data_.c_str(), data_.length(),
5874 const char* data_,
size_t dataLength_,
5875 unsigned long expiration_)
5877 return _body.get().deltaPublish(topic_, topicLength_,
5878 data_, dataLength_, expiration_);
5898 const char* options_ = NULL)
5900 return _body.get().logon(timeout_, authenticator_, options_);
5915 std::string
logon(
const char* options_,
int timeout_ = 0)
5934 std::string
logon(
const std::string& options_,
int timeout_ = 0)
5960 const std::string& topic_,
5962 const std::string& filter_ =
"",
5963 const std::string& options_ =
"",
5964 const std::string& subId_ =
"")
5966 return _body.get().subscribe(messageHandler_, topic_, timeout_,
5967 filter_,
"", options_, subId_);
5986 long timeout_ = 0,
const std::string& filter_ =
"",
5987 const std::string& options_ =
"",
5988 const std::string& subId_ =
"")
5991 if (_body.get().getDefaultMaxDepth())
5993 result.
maxDepth(_body.get().getDefaultMaxDepth());
5995 result.setSubscription(_body.get().subscribe(
5997 topic_, timeout_, filter_,
"",
5998 options_, subId_,
false));
6018 long timeout_ = 0,
const std::string& filter_ =
"",
6019 const std::string& options_ =
"",
6020 const std::string& subId_ =
"")
6023 if (_body.get().getDefaultMaxDepth())
6025 result.
maxDepth(_body.get().getDefaultMaxDepth());
6027 result.setSubscription(_body.get().subscribe(
6029 topic_, timeout_, filter_,
"",
6030 options_, subId_,
false));
6047 const std::string& topic_,
6049 const std::string& filter_ =
"",
6050 const std::string& options_ =
"",
6051 const std::string& subId_ =
"")
6053 return _body.get().deltaSubscribe(messageHandler_, topic_, timeout_,
6054 filter_,
"", options_, subId_);
6065 long timeout_,
const std::string& filter_ =
"",
6066 const std::string& options_ =
"",
6067 const std::string& subId_ =
"")
6070 if (_body.get().getDefaultMaxDepth())
6072 result.
maxDepth(_body.get().getDefaultMaxDepth());
6074 result.setSubscription(_body.get().deltaSubscribe(
6076 topic_, timeout_, filter_,
"",
6077 options_, subId_,
false));
6083 long timeout_,
const std::string& filter_ =
"",
6084 const std::string& options_ =
"",
6085 const std::string& subId_ =
"")
6088 if (_body.get().getDefaultMaxDepth())
6090 result.
maxDepth(_body.get().getDefaultMaxDepth());
6092 result.setSubscription(_body.get().deltaSubscribe(
6094 topic_, timeout_, filter_,
"",
6095 options_, subId_,
false));
6125 const std::string& topic_,
6127 const std::string& bookmark_,
6128 const std::string& filter_ =
"",
6129 const std::string& options_ =
"",
6130 const std::string& subId_ =
"")
6132 return _body.get().subscribe(messageHandler_, topic_, timeout_,
6133 filter_, bookmark_, options_, subId_);
6154 const std::string& bookmark_,
6155 const std::string& filter_ =
"",
6156 const std::string& options_ =
"",
6157 const std::string& subId_ =
"")
6160 if (_body.get().getDefaultMaxDepth())
6162 result.
maxDepth(_body.get().getDefaultMaxDepth());
6164 result.setSubscription(_body.get().subscribe(
6166 topic_, timeout_, filter_,
6167 bookmark_, options_,
6175 const std::string& bookmark_,
6176 const std::string& filter_ =
"",
6177 const std::string& options_ =
"",
6178 const std::string& subId_ =
"")
6181 if (_body.get().getDefaultMaxDepth())
6183 result.
maxDepth(_body.get().getDefaultMaxDepth());
6185 result.setSubscription(_body.get().subscribe(
6187 topic_, timeout_, filter_,
6188 bookmark_, options_,
6203 return _body.get().unsubscribe(commandId);
6215 return _body.get().unsubscribe();
6249 const std::string& topic_,
6250 const std::string& filter_ =
"",
6251 const std::string& orderBy_ =
"",
6252 const std::string& bookmark_ =
"",
6253 int batchSize_ = DEFAULT_BATCH_SIZE,
6254 int topN_ = DEFAULT_TOP_N,
6255 const std::string& options_ =
"",
6256 long timeout_ = DEFAULT_COMMAND_TIMEOUT)
6258 return _body.get().sow(messageHandler_, topic_, filter_, orderBy_,
6259 bookmark_, batchSize_, topN_, options_,
6287 const std::string& filter_ =
"",
6288 const std::string& orderBy_ =
"",
6289 const std::string& bookmark_ =
"",
6290 int batchSize_ = DEFAULT_BATCH_SIZE,
6291 int topN_ = DEFAULT_TOP_N,
6292 const std::string& options_ =
"",
6293 long timeout_ = DEFAULT_COMMAND_TIMEOUT)
6296 if (_body.get().getDefaultMaxDepth())
6298 result.
maxDepth(_body.get().getDefaultMaxDepth());
6300 result.setSOWOnly(_body.get().sow(result.operator
MessageHandler(),
6301 topic_, filter_, orderBy_, bookmark_,
6302 batchSize_, topN_, options_, timeout_));
6308 const std::string& filter_ =
"",
6309 const std::string& orderBy_ =
"",
6310 const std::string& bookmark_ =
"",
6311 int batchSize_ = DEFAULT_BATCH_SIZE,
6312 int topN_ = DEFAULT_TOP_N,
6313 const std::string& options_ =
"",
6314 long timeout_ = DEFAULT_COMMAND_TIMEOUT)
6317 if (_body.get().getDefaultMaxDepth())
6319 result.
maxDepth(_body.get().getDefaultMaxDepth());
6321 result.setSOWOnly(_body.get().sow(result.operator
MessageHandler(),
6322 topic_, filter_, orderBy_, bookmark_,
6323 batchSize_, topN_, options_, timeout_));
6349 const std::string& topic_,
6351 const std::string& filter_ =
"",
6352 int batchSize_ = DEFAULT_BATCH_SIZE,
6353 int topN_ = DEFAULT_TOP_N)
6355 return _body.get().sow(messageHandler_, topic_, timeout_, filter_,
6381 const std::string& topic_,
6383 const std::string& filter_ =
"",
6384 int batchSize_ = DEFAULT_BATCH_SIZE,
6385 bool oofEnabled_ =
false,
6386 int topN_ = DEFAULT_TOP_N)
6388 return _body.get().sowAndSubscribe(messageHandler_, topic_, timeout_,
6389 filter_, batchSize_, oofEnabled_,
6414 const std::string& filter_ =
"",
6415 int batchSize_ = DEFAULT_BATCH_SIZE,
6416 bool oofEnabled_ =
false,
6417 int topN_ = DEFAULT_TOP_N)
6420 if (_body.get().getDefaultMaxDepth())
6422 result.
maxDepth(_body.get().getDefaultMaxDepth());
6424 result.setSubscription(_body.get().sowAndSubscribe(
6426 topic_, timeout_, filter_,
6427 batchSize_, oofEnabled_,
6452 const std::string& filter_ =
"",
6453 int batchSize_ = DEFAULT_BATCH_SIZE,
6454 bool oofEnabled_ =
false,
6455 int topN_ = DEFAULT_TOP_N)
6458 if (_body.get().getDefaultMaxDepth())
6460 result.
maxDepth(_body.get().getDefaultMaxDepth());
6462 result.setSubscription(_body.get().sowAndSubscribe(
6464 topic_, timeout_, filter_,
6465 batchSize_, oofEnabled_,
6499 const std::string& topic_,
6500 const std::string& filter_ =
"",
6501 const std::string& orderBy_ =
"",
6502 const std::string& bookmark_ =
"",
6503 int batchSize_ = DEFAULT_BATCH_SIZE,
6504 int topN_ = DEFAULT_TOP_N,
6505 const std::string& options_ =
"",
6506 long timeout_ = DEFAULT_COMMAND_TIMEOUT)
6508 return _body.get().sowAndSubscribe(messageHandler_, topic_, filter_,
6509 orderBy_, bookmark_, batchSize_,
6510 topN_, options_, timeout_);
6538 const std::string& filter_ =
"",
6539 const std::string& orderBy_ =
"",
6540 const std::string& bookmark_ =
"",
6541 int batchSize_ = DEFAULT_BATCH_SIZE,
6542 int topN_ = DEFAULT_TOP_N,
6543 const std::string& options_ =
"",
6544 long timeout_ = DEFAULT_COMMAND_TIMEOUT)
6547 if (_body.get().getDefaultMaxDepth())
6549 result.
maxDepth(_body.get().getDefaultMaxDepth());
6551 result.setSubscription(_body.get().sowAndSubscribe(
6553 topic_, filter_, orderBy_,
6554 bookmark_, batchSize_, topN_,
6555 options_, timeout_,
false));
6561 const std::string& filter_ =
"",
6562 const std::string& orderBy_ =
"",
6563 const std::string& bookmark_ =
"",
6564 int batchSize_ = DEFAULT_BATCH_SIZE,
6565 int topN_ = DEFAULT_TOP_N,
6566 const std::string& options_ =
"",
6567 long timeout_ = DEFAULT_COMMAND_TIMEOUT)
6570 if (_body.get().getDefaultMaxDepth())
6572 result.
maxDepth(_body.get().getDefaultMaxDepth());
6574 result.setSubscription(_body.get().sowAndSubscribe(
6576 topic_, filter_, orderBy_,
6577 bookmark_, batchSize_, topN_,
6578 options_, timeout_,
false));
6607 const std::string& topic_,
6608 const std::string& filter_ =
"",
6609 const std::string& orderBy_ =
"",
6610 int batchSize_ = DEFAULT_BATCH_SIZE,
6611 int topN_ = DEFAULT_TOP_N,
6612 const std::string& options_ =
"",
6613 long timeout_ = DEFAULT_COMMAND_TIMEOUT)
6615 return _body.get().sowAndDeltaSubscribe(messageHandler_, topic_,
6616 filter_, orderBy_, batchSize_,
6617 topN_, options_, timeout_);
6640 const std::string& filter_ =
"",
6641 const std::string& orderBy_ =
"",
6642 int batchSize_ = DEFAULT_BATCH_SIZE,
6643 int topN_ = DEFAULT_TOP_N,
6644 const std::string& options_ =
"",
6645 long timeout_ = DEFAULT_COMMAND_TIMEOUT)
6648 if (_body.get().getDefaultMaxDepth())
6650 result.
maxDepth(_body.get().getDefaultMaxDepth());
6652 result.setSubscription(_body.get().sowAndDeltaSubscribe(
6654 topic_, filter_, orderBy_,
6655 batchSize_, topN_, options_,
6662 const std::string& filter_ =
"",
6663 const std::string& orderBy_ =
"",
6664 int batchSize_ = DEFAULT_BATCH_SIZE,
6665 int topN_ = DEFAULT_TOP_N,
6666 const std::string& options_ =
"",
6667 long timeout_ = DEFAULT_COMMAND_TIMEOUT)
6670 if (_body.get().getDefaultMaxDepth())
6672 result.
maxDepth(_body.get().getDefaultMaxDepth());
6674 result.setSubscription(_body.get().sowAndDeltaSubscribe(
6676 topic_, filter_, orderBy_,
6677 batchSize_, topN_, options_,
6707 const std::string& topic_,
6709 const std::string& filter_ =
"",
6710 int batchSize_ = DEFAULT_BATCH_SIZE,
6711 bool oofEnabled_ =
false,
6712 bool sendEmpties_ =
false,
6713 int topN_ = DEFAULT_TOP_N)
6715 return _body.get().sowAndDeltaSubscribe(messageHandler_, topic_,
6716 timeout_, filter_, batchSize_,
6717 oofEnabled_, sendEmpties_,
6744 const std::string& filter_ =
"",
6745 int batchSize_ = DEFAULT_BATCH_SIZE,
6746 bool oofEnabled_ =
false,
6747 bool sendEmpties_ =
false,
6748 int topN_ = DEFAULT_TOP_N)
6751 if (_body.get().getDefaultMaxDepth())
6753 result.
maxDepth(_body.get().getDefaultMaxDepth());
6755 result.setSubscription(_body.get().sowAndDeltaSubscribe(
6757 topic_, timeout_, filter_,
6758 batchSize_, oofEnabled_,
6759 sendEmpties_, topN_,
false));
6785 const std::string& filter_ =
"",
6786 int batchSize_ = DEFAULT_BATCH_SIZE,
6787 bool oofEnabled_ =
false,
6788 bool sendEmpties_ =
false,
6789 int topN_ = DEFAULT_TOP_N)
6792 if (_body.get().getDefaultMaxDepth())
6794 result.
maxDepth(_body.get().getDefaultMaxDepth());
6796 result.setSubscription(_body.get().sowAndDeltaSubscribe(
6798 topic_, timeout_, filter_,
6799 batchSize_, oofEnabled_,
6800 sendEmpties_, topN_,
false));
6823 const std::string& topic,
6824 const std::string& filter,
6827 return _body.get().sowDelete(messageHandler, topic, filter, timeout);
6849 stream.
timeout((
unsigned int)timeout_);
6856 stream.setStatsOnly(cid);
6857 _body.get().sowDelete(stream.operator
MessageHandler(), topic_, filter_, timeout_, cid);
6858 return *(stream.
begin());
6860 catch (
const DisconnectedException&)
6862 removeMessageHandler(cid);
6865 catch (
const TimedOutException&)
6867 removeMessageHandler(cid);
6878 _body.get().startTimer();
6889 return _body.get().stopTimer(messageHandler);
6914 const std::string& topic_,
6915 const std::string& keys_,
6918 return _body.get().sowDeleteByKeys(messageHandler_, topic_, keys_, timeout_);
6944 stream.
timeout((
unsigned int)timeout_);
6951 stream.setStatsOnly(cid);
6952 _body.get().sowDeleteByKeys(stream.operator
MessageHandler(), topic_, keys_, timeout_, cid);
6953 return *(stream.
begin());
6955 catch (
const DisconnectedException&)
6957 removeMessageHandler(cid);
6960 catch (
const TimedOutException&)
6962 removeMessageHandler(cid);
6982 const std::string& topic_,
const std::string& data_,
6985 return _body.get().sowDeleteByData(messageHandler_, topic_, data_, timeout_);
7006 stream.
timeout((
unsigned int)timeout_);
7013 stream.setStatsOnly(cid);
7014 _body.get().sowDeleteByData(stream.operator
MessageHandler(), topic_, data_, timeout_, cid);
7015 return *(stream.
begin());
7017 catch (
const DisconnectedException&)
7019 removeMessageHandler(cid);
7022 catch (
const TimedOutException&)
7024 removeMessageHandler(cid);
7034 return _body.get().getHandle();
7047 _body.get().setExceptionListener(pListener_);
7058 #if defined(_WIN32) || __cplusplus >= 201402L 7059 [[deprecated(
"Use setExceptionListener(std::shared_ptr<const ExceptionListener>&)")]]
7063 _body.get().setExceptionListener(listener_);
7070 return _body.get().getExceptionListener();
7096 _body.get().setHeartbeat(heartbeatTime_, readTimeout_);
7120 _body.get().setHeartbeat(heartbeatTime_, 2 * heartbeatTime_);
7124 #if defined(_WIN32) || __cplusplus >= 201402L 7125 [[deprecated(
"Use setLastChanceMessageHandler.")]]
7129 setLastChanceMessageHandler(messageHandler);
7136 _body.get().setGlobalCommandTypeMessageHandler(ClientImpl::GlobalCommandTypeHandlers::LastChance,
7162 _body.get().setGlobalCommandTypeMessageHandler(command_, handler_);
7187 _body.get().setGlobalCommandTypeMessageHandler(command_, handler_);
7270 _body.get().addConnectionStateListener(listener);
7278 _body.get().removeConnectionStateListener(listener);
7285 _body.get().clearConnectionStateListeners();
7315 return _body.get().executeAsync(command_, handler_);
7353 if (command_.isSubscribe())
7355 Message& message = command_.getMessage();
7358 if (useExistingHandler)
7361 if (_body.get()._routes.getRoute(subId, existingHandler))
7364 _body.get().executeAsync(command_, existingHandler,
false);
7369 id = _body.get().executeAsync(command_, handler_,
false);
7371 catch (
const DisconnectedException&)
7373 removeMessageHandler(command_.getMessage().
getCommandId());
7374 if (command_.isSubscribe())
7378 if (command_.isSow())
7380 removeMessageHandler(command_.getMessage().
getQueryID());
7411 _body.get().ack(topic_, bookmark_, options_);
7433 void ack(
const std::string& topic_,
const std::string& bookmark_,
7434 const char* options_ = NULL)
7436 _body.get().ack(
Field(topic_.data(), topic_.length()),
Field(bookmark_.data(), bookmark_.length()), options_);
7444 void ackDeferredAutoAck(
Field& topic_,
Field& bookmark_,
const char* options_ = NULL)
7446 _body.get()._ack(topic_, bookmark_, options_);
7459 _body.get().flushAcks();
7468 return _body.get().getAutoAck();
7478 _body.get().setAutoAck(isAutoAckEnabled_);
7486 return _body.get().getAckBatchSize();
7496 _body.get().setAckBatchSize(ackBatchSize_);
7507 return _body.get().getAckTimeout();
7519 if (!ackTimeout_ && _body.get().getAckBatchSize() > 1)
7521 throw UsageException(
"Ack timeout must be > 0 when ack batch size > 1");
7523 _body.get().setAckTimeout(ackTimeout_);
7537 _body.get().setRetryOnDisconnect(isRetryOnDisconnect_);
7546 return _body.get().getRetryOnDisconnect();
7555 _body.get().setDefaultMaxDepth(maxDepth_);
7564 return _body.get().getDefaultMaxDepth();
7576 return _body.get().setTransportFilterFunction(filter_, userData_);
7590 return _body.get().setThreadCreatedCallback(callback_, userData_);
7598 void deferredExecution(DeferredExecutionFunc func_,
void* userData_)
7600 _body.get().deferredExecution(func_, userData_);
7610 AMPS_CALL_EXCEPTION_WRAPPER(_globalCommandTypeHandlers[GlobalCommandTypeHandlers::LastChance].invoke(message));
7616 unsigned deliveries = 0;
7628 const char* data = NULL;
7630 const char* status = NULL;
7631 size_t statusLen = 0;
7633 const size_t NotEntitled = 12, Duplicate = 9, Failure = 7;
7636 if (len == NotEntitled || len == Duplicate ||
7637 (statusLen == Failure && status[0] ==
'f'))
7639 if (_failedWriteHandler)
7641 if (_publishStore.isValid())
7643 amps_uint64_t sequence =
7645 FailedWriteStoreReplayer replayer(
this, data, len);
7646 AMPS_CALL_EXCEPTION_WRAPPER(_publishStore.replaySingle(
7647 replayer, sequence));
7653 AMPS_CALL_EXCEPTION_WRAPPER(
7654 _failedWriteHandler->failedWrite(emptyMessage,
7660 if (_publishStore.isValid())
7669 AMPS_CALL_EXCEPTION_WRAPPER(_publishStore.discardUpTo(seq));
7673 if (!deliveries && _bookmarkStore.isValid())
7680 const char* bookmarkData = NULL;
7681 size_t bookmarkLen = 0;
7687 if (bookmarkLen > 0 && _routes.hasRoute(subId))
7690 _bookmarkStore.persisted(subId,
Message::Field(bookmarkData, bookmarkLen));
7695 catch (std::exception& ex)
7697 AMPS_UNHANDLED_EXCEPTION(ex);
7703 ClientImpl::processedAck(
Message& message)
7705 unsigned deliveries = 0;
7707 const char* data = NULL;
7711 Lock<Mutex> l(_lock);
7714 Lock<Mutex> guard(_ackMapLock);
7715 AckMap::iterator i = _ackMap.find(std::string(data, len));
7716 if (i != _ackMap.end())
7726 ack.setStatus(data, len);
7728 ack.setReason(data, len);
7730 ack.setUsername(data, len);
7732 ack.setPassword(data, len);
7734 ack.setServerVersion(data, len);
7736 ack.setOptions(data, len);
7746 ClientImpl::checkAndSendHeartbeat(
bool force)
7748 if (force || _heartbeatTimer.check())
7750 _heartbeatTimer.start();
7753 sendWithoutRetry(_beatMessage);
7755 catch (
const AMPSException&)
7762 inline ConnectionInfo ClientImpl::getConnectionInfo()
const 7764 ConnectionInfo info;
7765 std::ostringstream writer;
7767 info[
"client.uri"] = _lastUri;
7768 info[
"client.name"] = _name;
7769 info[
"client.username"] = _username;
7770 if (_publishStore.isValid())
7772 writer << _publishStore.unpersistedCount();
7773 info[
"publishStore.unpersistedCount"] = writer.str();
7782 ClientImpl::ClientImplMessageHandler(
amps_handle messageHandle_,
void* userData_)
7784 const unsigned SOWMask = Message::Command::SOW | Message::Command::GroupBegin | Message::Command::GroupEnd;
7785 const unsigned PublishMask = Message::Command::OOF | Message::Command::Publish | Message::Command::DeltaPublish;
7786 ClientImpl* me = (ClientImpl*) userData_;
7787 AMPS_CALL_EXCEPTION_WRAPPER_2(me, me->processDeferredExecutions());
7788 if (!messageHandle_)
7790 if (me->_queueAckTimeout)
7792 me->checkQueueAcks();
7794 me->checkAndSendHeartbeat();
7798 me->_readMessage.replace(messageHandle_);
7799 Message& message = me->_readMessage;
7801 if (commandType & SOWMask)
7803 #if 0 // Not currently implemented, to avoid an extra branch in delivery 7807 AMPS_CALL_EXCEPTION_WRAPPER_2(me,
7808 me->_globalCommandTypeHandlers[1 + (commandType / 8192)].invoke(message));
7810 AMPS_CALL_EXCEPTION_WRAPPER_2(me, me->_routes.deliverData(message,
7813 else if (commandType & PublishMask)
7815 #if 0 // Not currently implemented, to avoid an extra branch in delivery 7816 AMPS_CALL_EXCEPTION_WRAPPER_2(me,
7817 me->_globalCommandTypeHandlers[(commandType == Message::Command::Publish ?
7818 GlobalCommandTypeHandlers::Publish :
7819 GlobalCommandTypeHandlers::OOF)].invoke(message));
7821 const char* subIds = NULL;
7822 size_t subIdsLen = 0;
7825 &subIds, &subIdsLen);
7826 size_t subIdCount = me->_routes.parseRoutes(
AMPS::Field(subIds, subIdsLen), me->_routeCache);
7827 for (
size_t i = 0; i < subIdCount; ++i)
7829 MessageRouter::RouteCache::value_type& lookupResult = me->_routeCache[i];
7831 if (handler.isValid())
7834 AMPS_SubscriptionId,
7835 subIds + lookupResult.idOffset,
7836 lookupResult.idLength);
7839 bool isAutoAck = me->_isAutoAckEnabled;
7841 if (!isMessageQueue && !bookmark.
empty() &&
7842 me->_bookmarkStore.isValid())
7844 if (me->_bookmarkStore.isDiscarded(me->_readMessage))
7847 if (me->_globalCommandTypeHandlers[GlobalCommandTypeHandlers::DuplicateMessage].isValid())
7849 AMPS_CALL_EXCEPTION_WRAPPER_2(me, me->_globalCommandTypeHandlers[GlobalCommandTypeHandlers::DuplicateMessage].invoke(message));
7854 me->_bookmarkStore.log(me->_readMessage);
7855 AMPS_CALL_EXCEPTION_WRAPPER_2(me,
7856 handler.invoke(message));
7861 if (isMessageQueue && isAutoAck)
7865 AMPS_CALL_EXCEPTION_WRAPPER_STREAM_FULL_2(me, handler.invoke(message));
7866 if (!message.getIgnoreAutoAck())
7868 AMPS_CALL_EXCEPTION_WRAPPER_2(me,
7872 catch (std::exception& ex)
7874 if (!message.getIgnoreAutoAck())
7876 AMPS_CALL_EXCEPTION_WRAPPER_2(me,
7879 AMPS_UNHANDLED_EXCEPTION_2(me, ex);
7884 AMPS_CALL_EXCEPTION_WRAPPER_2(me,
7885 handler.invoke(message));
7891 me->lastChance(message);
7895 else if (commandType == Message::Command::Ack)
7897 AMPS_CALL_EXCEPTION_WRAPPER_2(me,
7898 me->_globalCommandTypeHandlers[GlobalCommandTypeHandlers::Ack].invoke(message));
7900 unsigned deliveries = 0U;
7903 case Message::AckType::Persisted:
7904 deliveries += me->persistedAck(message);
7906 case Message::AckType::Processed:
7907 deliveries += me->processedAck(message);
7910 AMPS_CALL_EXCEPTION_WRAPPER_2(me, deliveries += me->_routes.deliverAck(message, ackType));
7911 if (deliveries == 0)
7913 me->lastChance(message);
7916 else if (commandType == Message::Command::Heartbeat)
7918 AMPS_CALL_EXCEPTION_WRAPPER_2(me,
7919 me->_globalCommandTypeHandlers[GlobalCommandTypeHandlers::Heartbeat].invoke(message));
7920 if (me->_heartbeatTimer.getTimeout() != 0.0)
7922 me->checkAndSendHeartbeat(
true);
7926 me->lastChance(message);
7932 unsigned deliveries = 0U;
7935 while (me->_connected)
7939 deliveries = me->_routes.deliverData(message, message.
getCommandId());
7943 catch (MessageStreamFullException&)
7945 catch (MessageStreamFullException& ex_)
7950 me->checkAndSendHeartbeat(
false);
7953 catch (std::exception&)
7955 catch (std::exception& ex_)
7963 catch (std::exception& ex_)
7967 me->_exceptionListener->exceptionThrown(ex_);
7974 if (deliveries == 0)
7976 me->lastChance(message);
7979 me->checkAndSendHeartbeat();
7984 ClientImpl::ClientImplPreDisconnectHandler(
amps_handle ,
unsigned failedConnectionVersion,
void* userData)
7986 ClientImpl* me = (ClientImpl*) userData;
7989 me->clearAcks(failedConnectionVersion);
7993 ClientImpl::ClientImplDisconnectHandler(
amps_handle ,
void* userData)
7995 ClientImpl* me = (ClientImpl*) userData;
7996 Lock<Mutex> l(me->_lock);
7997 Client wrapper(me,
false);
8000 me->broadcastConnectionStateChanged(ConnectionStateListener::Disconnected);
8004 AtomicFlagFlip subFlip(&me->_badTimeToHASubscribe);
8005 bool retryInProgress =
false;
8008 me->_connected =
false;
8009 me->_lock.signalAll();
8012 Unlock<Mutex> unlock(me->_lock);
8013 me->_disconnectHandler.invoke(wrapper);
8016 catch (
const RetryOperationException&)
8018 catch (
const RetryOperationException& ex)
8021 retryInProgress =
true;
8023 catch (
const std::exception& ex)
8025 AMPS_UNHANDLED_EXCEPTION_2(me, ex);
8027 me->_lock.signalAll();
8029 if (!me->_connected)
8031 if (retryInProgress)
8033 AMPS_UNHANDLED_EXCEPTION_2(me, RetryOperationException(
"Reconnect in progress."));
8037 me->broadcastConnectionStateChanged(ConnectionStateListener::Shutdown);
8038 AMPS_UNHANDLED_EXCEPTION_2(me, DisconnectedException(
"Reconnect failed."));
8045 if (me->_subscriptionManager)
8050 Unlock<Mutex> unlock(me->_lock);
8051 me->_subscriptionManager->resubscribe(wrapper);
8053 me->broadcastConnectionStateChanged(ConnectionStateListener::Resubscribed);
8057 catch (
const AMPSException& subEx)
8059 AMPS_UNHANDLED_EXCEPTION_2(me, subEx);
8061 catch (
const std::exception& subEx)
8063 AMPS_UNHANDLED_EXCEPTION_2(me, subEx);
8075 ClientImpl::ClientImplGetHttpPreflightMessage(
void* userData_)
8077 ClientImpl* me = (ClientImpl*)userData_;
8078 std::ostringstream os;
8081 size_t firstColon = me->_lastUri.find(
':');
8083 size_t pathEnd = me->_lastUri.find(
'?');
8085 size_t lastColon = me->_lastUri.rfind(
':', pathEnd);
8087 size_t at = me->_lastUri.rfind(
'@', lastColon);
8089 size_t hostStart = at == std::string::npos ? firstColon + 3 : at + 1;
8090 size_t hostLen = lastColon - hostStart;
8092 size_t pathStart = me->_lastUri.find(
'/', lastColon);
8093 size_t pathLen = pathEnd;
8094 if (pathEnd != std::string::npos)
8096 pathLen = pathEnd - pathStart;
8098 os <<
"GET " << me->_lastUri.substr(pathStart, pathLen)
8099 <<
" HTTP/1.1\r\nHost: " << me->_lastUri.substr(hostStart, hostLen)
8100 <<
"\r\nConnection: upgrade\r\nUpgrade: " 8101 << me->_lastUri.substr(0, firstColon) <<
"\r\n";
8102 for (
auto header : me->_httpPreflightHeaders)
8104 os << header <<
"\r\n";
8107 me->_preflightMessage = os.str();
8108 return me->_preflightMessage.c_str();
8123 iterator(
const char* data_,
size_t len_,
size_t pos_,
char fieldSep_)
8124 : _data(data_), _len(len_), _pos(pos_), _fieldSep(fieldSep_)
8126 while (_pos != _len && _data[_pos] == _fieldSep)
8132 typedef void* difference_type;
8133 typedef std::forward_iterator_tag iterator_category;
8134 typedef std::pair<Message::Field, Message::Field> value_type;
8135 typedef value_type* pointer;
8136 typedef value_type& reference;
8137 bool operator==(
const iterator& rhs)
const 8139 return _pos == rhs._pos;
8141 bool operator!=(
const iterator& rhs)
const 8143 return _pos != rhs._pos;
8145 iterator& operator++()
8148 while (_pos != _len && _data[_pos] != _fieldSep)
8153 while (_pos != _len && _data[_pos] == _fieldSep)
8160 value_type operator*()
const 8163 size_t i = _pos, keyLength = 0, valueStart = 0, valueLength = 0;
8164 for (; i < _len && _data[i] !=
'='; ++i)
8169 result.first.assign(_data + _pos, keyLength);
8171 if (i < _len && _data[i] ==
'=')
8175 for (; i < _len && _data[i] != _fieldSep; ++i)
8180 result.second.assign(_data + valueStart, valueLength);
8186 class reverse_iterator
8193 typedef std::pair<Message::Field, Message::Field> value_type;
8194 reverse_iterator(
const char* data,
size_t len,
const char* pos,
char fieldsep)
8195 : _data(data), _len(len), _pos(pos), _fieldSep(fieldsep)
8200 while (_pos >= _data && *_pos == _fieldSep)
8204 while (_pos > _data && *_pos != _fieldSep)
8211 if (_pos > _data || (_pos == _data && *_pos == _fieldSep))
8221 bool operator==(
const reverse_iterator& rhs)
const 8223 return _pos == rhs._pos;
8225 bool operator!=(
const reverse_iterator& rhs)
const 8227 return _pos != rhs._pos;
8229 reverse_iterator& operator++()
8240 while (_pos >= _data && *_pos == _fieldSep)
8245 while (_pos > _data && *_pos != _fieldSep)
8249 if (_pos > _data || (_pos == _data && *_pos == _fieldSep))
8260 value_type operator*()
const 8263 size_t keyLength = 0, valueStart = 0, valueLength = 0;
8264 size_t i = (size_t)(_pos - _data);
8265 for (; i < _len && _data[i] !=
'='; ++i)
8269 result.first.assign(_pos, keyLength);
8270 if (i < _len && _data[i] ==
'=')
8274 for (; i < _len && _data[i] != _fieldSep; ++i)
8279 result.second.assign(_data + valueStart, valueLength);
8284 : _data(data.
data()), _len(data.
len()),
8285 _fieldSep(fieldSeparator)
8289 FIX(
const char* data,
size_t len,
char fieldSeparator = 1)
8290 : _data(data), _len(len), _fieldSep(fieldSeparator)
8294 iterator begin()
const 8296 return iterator(_data, _len, 0, _fieldSep);
8298 iterator end()
const 8300 return iterator(_data, _len, _len, _fieldSep);
8304 reverse_iterator rbegin()
const 8306 return reverse_iterator(_data, _len, _data + (_len - 1), _fieldSep);
8309 reverse_iterator rend()
const 8311 return reverse_iterator(_data, _len, 0, _fieldSep);
8332 std::stringstream _data;
8349 void append(
const T& tag,
const char* value,
size_t offset,
size_t length)
8351 _data << tag <<
'=';
8352 _data.write(value + offset, (std::streamsize)length);
8360 void append(
const T& tag,
const std::string& value)
8362 _data << tag <<
'=' << value << _fs;
8371 operator std::string()
const 8379 _data.str(std::string());
8416 typedef std::map<Message::Field, Message::Field>
map_type;
8427 for (FIX::iterator a = fix.begin(); a != fix.end(); ++a)
8436 #define AMPS_MESSAGE_STREAM_CACHE_MAX 128 8440 std::deque<Message> _q;
8441 std::deque<Message> _cache;
8442 std::string _commandId;
8444 std::string _queryId;
8448 unsigned _requestedAcks;
8452 typedef enum :
unsigned int { Unset = 0x0, Running = 0x10, Subscribe = 0x11, SOWOnly = 0x12, AcksOnly = 0x13, Conflate = 0x14, Closed = 0x1, Disconnected = 0x2 } State;
8453 #if __cplusplus >= 201100L || _MSC_VER >= 1900 8454 std::atomic<State> _state;
8456 volatile State _state;
8458 typedef std::map<std::string, Message*> SOWKeyMap;
8459 SOWKeyMap _sowKeyMap;
8461 MessageStreamImpl(
const Client& client_)
8464 _maxDepth((
unsigned)~0),
8466 _cacheMax(AMPS_MESSAGE_STREAM_CACHE_MAX),
8469 if (_client.isValid())
8475 MessageStreamImpl(ClientImpl* client_)
8478 _maxDepth((
unsigned)~0),
8482 if (_client.isValid())
8488 ~MessageStreamImpl()
8492 virtual void destroy()
8498 catch (std::exception& e)
8502 if (_client.isValid())
8509 if (_client.isValid())
8513 _client = Client((ClientImpl*)NULL);
8514 c.deferredExecution(MessageStreamImpl::destroyer,
this);
8522 static void destroyer(
void* vpMessageStreamImpl_)
8524 delete ((MessageStreamImpl*)vpMessageStreamImpl_);
8527 void setSubscription(
const std::string& subId_,
8528 const std::string& commandId_ =
"",
8529 const std::string& queryId_ =
"")
8531 Lock<Mutex> lock(_lock);
8533 if (!commandId_.empty() && commandId_ != subId_)
8535 _commandId = commandId_;
8537 if (!queryId_.empty() && queryId_ != subId_ && queryId_ != commandId_)
8539 _queryId = queryId_;
8542 if (Disconnected == _state)
8546 assert(Unset == _state);
8550 void setSOWOnly(
const std::string& commandId_,
8551 const std::string& queryId_ =
"")
8553 Lock<Mutex> lock(_lock);
8554 _commandId = commandId_;
8555 if (!queryId_.empty() && queryId_ != commandId_)
8557 _queryId = queryId_;
8560 if (Disconnected == _state)
8564 assert(Unset == _state);
8568 void setStatsOnly(
const std::string& commandId_,
8569 const std::string& queryId_ =
"")
8571 Lock<Mutex> lock(_lock);
8572 _commandId = commandId_;
8573 if (!queryId_.empty() && queryId_ != commandId_)
8575 _queryId = queryId_;
8578 if (Disconnected == _state)
8582 assert(Unset == _state);
8584 _requestedAcks = Message::AckType::Stats;
8587 void setAcksOnly(
const std::string& commandId_,
unsigned acks_)
8589 Lock<Mutex> lock(_lock);
8590 _commandId = commandId_;
8592 if (Disconnected == _state)
8596 assert(Unset == _state);
8598 _requestedAcks = acks_;
8603 Lock<Mutex> lock(_lock);
8604 if (state_ == AMPS::ConnectionStateListener::Disconnected)
8606 _state = Disconnected;
8609 else if (state_ == AMPS::ConnectionStateListener::Connected
8610 && _commandId.empty()
8612 && _queryId.empty())
8620 void timeout(
unsigned timeout_)
8622 _timeout = timeout_;
8626 if (_state == Subscribe)
8631 void maxDepth(
unsigned maxDepth_)
8635 _maxDepth = maxDepth_;
8639 _maxDepth = (unsigned)~0;
8642 unsigned getMaxDepth(
void)
const 8646 unsigned getDepth(
void)
const 8648 return (
unsigned)(_q.size());
8653 Lock<Mutex> lock(_lock);
8654 if (!_previousTopic.
empty() && !_previousBookmark.
empty())
8658 if (_client.isValid())
8660 _client.ackDeferredAutoAck(_previousTopic, _previousBookmark);
8664 catch (AMPSException&)
8666 catch (AMPSException& e)
8669 current_.invalidate();
8670 _previousTopic.
clear();
8671 _previousBookmark.
clear();
8674 _previousTopic.
clear();
8675 _previousBookmark.
clear();
8678 long minWaitTime = (_timeout && _timeout < 1000) ? _timeout : 1000;
8679 Timer timer((
double)_timeout);
8681 while (_q.empty() && _state & Running)
8684 _lock.wait(minWaitTime);
8686 Unlock<Mutex> unlck(_lock);
8687 amps_invoke_waiting_function();
8692 if (timer.checkAndGetRemaining(&minWaitTime))
8698 minWaitTime = (minWaitTime < 1000) ? minWaitTime : 1000;
8701 if (current_.isValid() && _cache.size() < _cacheMax)
8704 _cache.push_back(current_);
8708 current_ = _q.front();
8709 if (_q.size() == _maxDepth)
8714 if (_state == Conflate)
8716 std::string sowKey = current_.
getSowKey();
8717 if (sowKey.length())
8719 _sowKeyMap.erase(sowKey);
8722 else if (_state == AcksOnly)
8726 if ((_state == AcksOnly && _requestedAcks == 0) ||
8727 (_state == SOWOnly && current_.
getCommand() ==
"group_end"))
8731 else if (current_.isValid()
8742 if (_state == Disconnected)
8744 throw DisconnectedException(
"Connection closed.");
8746 current_.invalidate();
8747 if (_state == Closed)
8751 return _timeout != 0;
8755 if (_client.isValid())
8757 if (_state == SOWOnly || _state == Subscribe)
8759 if (!_commandId.empty())
8763 if (!_subId.empty())
8767 if (!_queryId.empty())
8774 if (!_commandId.empty())
8778 if (!_subId.empty())
8782 if (!_queryId.empty())
8788 if (_state == SOWOnly || _state == Subscribe || _state == Unset)
8793 static void _messageHandler(
const Message& message_, MessageStreamImpl* this_)
8795 Lock<Mutex> lock(this_->_lock);
8796 if (this_->_state != Conflate)
8798 AMPS_TESTING_SLOW_MESSAGE_STREAM
8799 if (this_->_q.size() >= this_->_maxDepth)
8804 this_->_lock.signalAll();
8805 throw MessageStreamFullException(
"Stream is currently full.");
8807 if (!this_->_cache.empty())
8809 this_->_cache.front().deepCopy(message_);
8810 this_->_q.push_back(this_->_cache.front());
8811 this_->_cache.pop_front();
8815 #ifdef AMPS_USE_EMPLACE 8816 this_->_q.emplace_back(message_.
deepCopy());
8818 this_->_q.push_back(message_.
deepCopy());
8822 this_->_client.isValid() && this_->_client.getAutoAck() &&
8826 message_.setIgnoreAutoAck();
8831 std::string sowKey = message_.
getSowKey();
8832 if (sowKey.length())
8834 SOWKeyMap::iterator it = this_->_sowKeyMap.find(sowKey);
8835 if (it != this_->_sowKeyMap.end())
8837 it->second->deepCopy(message_);
8841 if (this_->_q.size() >= this_->_maxDepth)
8847 this_->_lock.signalAll();
8848 throw MessageStreamFullException(
"Stream is currently full.");
8850 if (!this_->_cache.empty())
8852 this_->_cache.front().deepCopy(message_);
8853 this_->_q.push_back(this_->_cache.front());
8854 this_->_cache.pop_front();
8858 #ifdef AMPS_USE_EMPLACE 8859 this_->_q.emplace_back(message_.
deepCopy());
8861 this_->_q.push_back(message_.
deepCopy());
8864 this_->_sowKeyMap[sowKey] = &(this_->_q.back());
8869 if (this_->_q.size() >= this_->_maxDepth)
8874 this_->_lock.signalAll();
8875 throw MessageStreamFullException(
"Stream is currently full.");
8877 if (!this_->_cache.empty())
8879 this_->_cache.front().deepCopy(message_);
8880 this_->_q.push_back(this_->_cache.front());
8881 this_->_cache.pop_front();
8885 #ifdef AMPS_USE_EMPLACE 8886 this_->_q.emplace_back(message_.
deepCopy());
8888 this_->_q.push_back(message_.
deepCopy());
8892 this_->_client.isValid() && this_->_client.getAutoAck() &&
8896 message_.setIgnoreAutoAck();
8900 this_->_lock.signalAll();
8903 inline MessageStream::MessageStream(
void)
8906 inline MessageStream::MessageStream(
const Client& client_)
8907 : _body(
new MessageStreamImpl(client_))
8910 inline MessageStream::MessageStream(RefHandle<MessageStreamImpl> body_)
8914 inline void MessageStream::iterator::advance(
void)
8916 _pStream = _pStream->_body->next(_current) ? _pStream : NULL;
8920 return MessageHandler((
void(*)(
const Message&,
void*))MessageStreamImpl::_messageHandler, &_body.get());
8925 if (handler_._func == (MessageHandler::FunctionType)MessageStreamImpl::_messageHandler)
8927 result._body = (MessageStreamImpl*)(handler_._userData);
8932 inline void MessageStream::setSOWOnly(
const std::string& commandId_,
8933 const std::string& queryId_)
8935 _body->setSOWOnly(commandId_, queryId_);
8937 inline void MessageStream::setSubscription(
const std::string& subId_,
8938 const std::string& commandId_,
8939 const std::string& queryId_)
8941 _body->setSubscription(subId_, commandId_, queryId_);
8943 inline void MessageStream::setStatsOnly(
const std::string& commandId_,
8944 const std::string& queryId_)
8946 _body->setStatsOnly(commandId_, queryId_);
8948 inline void MessageStream::setAcksOnly(
const std::string& commandId_,
8951 _body->setAcksOnly(commandId_, acks_);
8970 return _body->getMaxDepth();
8974 return _body->getDepth();
8977 inline MessageStream ClientImpl::getEmptyMessageStream(
void)
8987 ClientImpl& body = _body.get();
8988 Message& message = command_.getMessage();
8992 if (useExistingHandler)
8998 if (body._routes.getRoute(subId, existingHandler))
9001 body.executeAsync(command_, existingHandler,
false);
9002 return MessageStream::fromExistingHandler(existingHandler);
9011 if ((command & Message::Command::NoDataCommands)
9012 && (ackTypes == Message::AckType::Persisted
9013 || ackTypes == Message::AckType::None))
9016 if (!body._pEmptyMessageStream)
9018 body._pEmptyMessageStream.reset(
new MessageStream((ClientImpl*)0));
9019 body._pEmptyMessageStream.get()->_body->close();
9021 return body.getEmptyMessageStream();
9024 if (body.getDefaultMaxDepth())
9026 stream.
maxDepth(body.getDefaultMaxDepth());
9029 std::string commandID = body.executeAsync(command_, handler,
false);
9030 if (command_.hasStatsAck())
9032 stream.setStatsOnly(commandID, command_.getMessage().
getQueryId());
9034 else if (command_.isSow())
9038 stream.setAcksOnly(commandID,
9043 stream.setSOWOnly(commandID, command_.getMessage().
getQueryId());
9046 else if (command_.isSubscribe())
9048 stream.setSubscription(commandID,
9055 if (command == Message::Command::Publish ||
9056 command == Message::Command::DeltaPublish ||
9057 command == Message::Command::SOWDelete)
9059 stream.setAcksOnly(commandID,
9060 ackTypes & (
unsigned)~Message::AckType::Persisted);
9064 stream.setAcksOnly(commandID, ackTypes);
9071 inline void Message::ack(
const char* options_)
const 9073 ClientImpl* pClient = _body.get().clientImpl();
9075 if (pClient && bookmark.
len() &&
9076 !pClient->getAutoAck())
9079 pClient->ack(getTopic(), bookmark, options_);
Command & setBookmark(const std::string &bookmark_)
Set the bookmark to be used this command.
Definition: ampsplusplus.hpp:748
Command & setFilter(const char *filter_, size_t filterLen_)
Definition: ampsplusplus.hpp:699
Class to hold string versions of failure reasons.
Definition: ampsplusplus.hpp:151
Message & setData(const std::string &v_)
Sets the data portion of self.
Definition: Message.hpp:1538
Core type and function declarations for the AMPS C client.
Client(const std::string &clientName="")
Constructs a new client with a given client name.
Definition: ampsplusplus.hpp:5184
AMPSDLL amps_result amps_client_set_http_preflight_callback(amps_handle client, amps_http_preflight_callback callback, void *userData)
Sets a user-supplied callback function for when a connection is established and the provided uri incl...
Field getUserId() const
Retrieves the value of the UserId header of the Message as a Field which references the underlying bu...
Definition: Message.hpp:1515
std::string sowDeleteByKeys(const MessageHandler &messageHandler_, const std::string &topic_, const std::string &keys_, long timeout_=0)
Deletes messages that match SOW keys from a topic's SOW cache.
Definition: ampsplusplus.hpp:6913
std::string stopTimer(const MessageHandler &messageHandler)
Definition: ampsplusplus.hpp:6887
static const unsigned int IdentifierLength
The length of identifiers used for unique identification of commands and subscriptions.
Definition: Message.hpp:550
std::string getAckType() const
Definition: ampsplusplus.hpp:953
AMPSDLL void amps_client_set_disconnect_handler(amps_handle client, amps_handler disconnectHandler, void *userData)
Sets the disconnect handler function to be called when a disconnect occurs.
Message & assignOwnershipBookmark(const Field &f)
Assigns the value of the Bookmark header for this Message without copying and makes this Message resp...
Definition: Message.hpp:1256
bool removeMessageHandler(const Field &commandId_)
Removes a MessageHandler for a given ComandId from self.
Definition: ampsplusplus.hpp:5445
Message & assignTopic(const std::string &v)
Assigns the value of the Topic header for this Message without copying.
Definition: Message.hpp:1511
Provides a convenient way of building messages in FIX format, typically referenced using the typedefs...
Definition: ampsplusplus.hpp:8330
void startTimer()
Definition: ampsplusplus.hpp:6876
MessageStream sowAndSubscribe(const std::string &topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, bool oofEnabled_=false, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic and initiates a new subscription on it.
Definition: ampsplusplus.hpp:6412
Abstract base class for storing published messages for an HA publisher client.
Definition: ampsplusplus.hpp:1097
MessageStream conflate(void)
Sets self to conflation mode, where a new update for a matching sow key will replace the previous one...
Definition: ampsplusplus.hpp:8958
Field getSequence() const
Retrieves the value of the Sequence header of the Message as a Field which references the underlying ...
Definition: Message.hpp:1484
std::string send(const MessageHandler &messageHandler, Message &message, int timeout=0)
Sends a Message to the connected AMPS server, performing only minimal validation and bypassing client...
Definition: ampsplusplus.hpp:5473
AMPSDLL amps_result amps_client_set_name(amps_handle handle, const amps_char *clientName)
Sets the name on an amps client object.
Message & setCorrelationId(const std::string &v)
Sets the value of the CorrelationId header for this Message.
Definition: Message.hpp:1366
Message & setQueryID(const std::string &v)
Sets the value of the QueryID header for this Message.
Definition: Message.hpp:1479
Command(const std::string &command_)
Creates an object to represent the given AMPS command, such as "sow" or "subscribe".
Definition: ampsplusplus.hpp:560
Command::Type getCommandEnum() const
Decode self's "command" field and return one of the values from Command.
Definition: Message.hpp:1290
Command & setBookmark(const char *bookmark_, size_t bookmarkLen_)
Set the bookmark to be used this command.
Definition: ampsplusplus.hpp:759
void setGlobalCommandTypeMessageHandler(const Message::Command::Type command_, const MessageHandler &handler_)
Sets a handler for all messages of a particular type: currently supported types are heartbeat message...
Definition: ampsplusplus.hpp:7185
Command & setAckType(unsigned ackType_)
Definition: ampsplusplus.hpp:931
void setServerVersion(size_t version_)
Internally used to set the server version so the store knows how to deal with persisted acks and call...
Definition: BookmarkStore.hpp:429
void setDefaultMaxDepth(unsigned maxDepth_)
Sets a default max depth on all subsequently created MessageStream objects.
Definition: ampsplusplus.hpp:7553
const amps_uint64_t getNameHashValue() const
Returns the numeric name hash of this client as generated by the server and returned when the client ...
Definition: ampsplusplus.hpp:5245
Field getSubscriptionId() const
Retrieves the value of the SubscriptionId header of the Message as a Field which references the under...
Definition: Message.hpp:1489
AMPSDLL amps_result amps_client_connect(amps_handle handle, const amps_char *uri)
Connects to the AMPS server specified in uri.
static size_t convertVersionToNumber(const std::string &version_)
Converts a string version, such as "3.8.1.5" into the same numeric form used internally and returned ...
Definition: ampsplusplus.hpp:5333
void unsubscribe(const std::string &commandId)
Unsubscribe from a topic.
Definition: ampsplusplus.hpp:6201
Command & setOptions(const std::string &options_)
Sets the options string for this command: see Message.Options for a helper class for constructing the...
Definition: ampsplusplus.hpp:789
Message & setOrderBy(const std::string &v)
Sets the value of the OrderBy header for this Message.
Definition: Message.hpp:1477
Abstract base class for replaying a publish message.
Definition: ampsplusplus.hpp:1069
Command & setCommandId(const char *cmdId_, size_t cmdIdLen_)
Definition: ampsplusplus.hpp:673
void persisted(const Message::Field &subId_, const Message::Field &bookmark_)
Called internally to indicate messages up to and including bookmark are replicated to all replication...
Definition: BookmarkStore.hpp:405
unsigned getDefaultMaxDepth(void) const
Returns the default max depth for returned MessageStream objects.
Definition: ampsplusplus.hpp:7562
void ack(Message &message_, const char *options_=NULL)
Acknowledge a message queue message by supplying the message directly: this adds the ack to the curre...
Definition: ampsplusplus.hpp:7421
MessageStream bookmarkSubscribe(const std::string &topic_, long timeout_, const std::string &bookmark_, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Subscribe to a topic using a bookmark.
Definition: ampsplusplus.hpp:6152
AMPSDLL void amps_client_disconnect(amps_handle handle)
Disconnects from the AMPS server, if connected.
AMPSDLL AMPS_SOCKET amps_client_get_socket(amps_handle client)
Returns the socket from the underlying transport in client, or NULL if no transport is associated wit...
void discard(const Message::Field &subId_, size_t bookmarkSeqNo_)
Log a discard-bookmark entry to the persistent log based on a bookmark sequence number.
Definition: BookmarkStore.hpp:283
void setPublishStore(const Store &publishStore_)
Set the publish store to be used by the client.
Definition: ampsplusplus.hpp:5582
void setErrorOnPublishGap(bool errorOnPublishGap_)
Called to enable or disable throwing PublishStoreGapException.
Definition: ampsplusplus.hpp:1346
amps_uint64_t deltaPublish(const std::string &topic_, const std::string &data_, unsigned long expiration_)
Publish the changed fields of a message to an AMPS topic.
Definition: ampsplusplus.hpp:5847
static size_t convertVersionToNumber(const char *data_, size_t len_)
Converts a string version, such as "3.8.1.5" into the same numeric form used internally and returned ...
Definition: ampsplusplus.hpp:5348
Message & setFilter(const std::string &v)
Sets the value of the Filter header for this Message.
Definition: Message.hpp:1368
Command & setSubId(const char *subId_, size_t subIdLen_)
Definition: ampsplusplus.hpp:725
void setNoEmpties(void)
Set the option to not send empty messages on a delta subscription.
Definition: Message.hpp:870
void setDuplicateMessageHandler(const MessageHandler &duplicateMessageHandler_)
Sets a callback function that is invoked when a duplicate message is detected.
Definition: ampsplusplus.hpp:5598
Message deepCopy(void) const
Returns a deep copy of self.
Definition: Message.hpp:587
void removeConnectionStateListener(ConnectionStateListener *listener)
Attempts to remove listener from self's set of ConnectionStateListeners.
Definition: ampsplusplus.hpp:7276
void setOOF(void)
Set the option to receive out of focus (OOF) messages on a subscription, where applicable.
Definition: Message.hpp:853
int getAckTimeout(void) const
Returns the current value of the message queue ack timeout setting – that is, the amount of time aft...
Definition: ampsplusplus.hpp:7505
VersionInfo getServerVersionInfo() const
Returns the server version retrieved during logon.
Definition: ampsplusplus.hpp:5319
Message sowDeleteByKeys(const std::string &topic_, const std::string &keys_, long timeout_=0)
Deletes messages that match SOW keys from a topic's SOW cache.
Definition: ampsplusplus.hpp:6940
AMPSDLL amps_result amps_client_set_transport_filter_function(amps_handle client, amps_transport_filter_function filter, void *userData)
Sets a user-supplied callback function for filtering data before it is sent and after it is received...
Command(const char *command_, size_t commandLen_)
Creates an object to represent the given AMPS command, such as "sow" or "subscribe".
Definition: ampsplusplus.hpp:568
AMPSDLL amps_uint64_t amps_message_get_field_uint64(amps_handle message, FieldId field)
Gets the unsigned 64-bit int value of a header field in an AMPS message.
AMPSDLL amps_result amps_client_set_idle_time(amps_handle client, int idleTime)
Sets an idle-time (milliseconds).
Message encapsulates a single message sent to or received from an AMPS server, and provides methods f...
Definition: Message.hpp:539
Message & setAckTypeEnum(unsigned ackType_)
Encode self's "ack type" field from a bitmask of values from AckType.
Definition: Message.hpp:1245
amps_uint64_t getLastPersisted()
Get the last persisted message sequence in the store.
Definition: ampsplusplus.hpp:1317
std::string getString() const
Returns the current contents of this builder as a string.
Definition: ampsplusplus.hpp:8367
void setTransportFilterFunction(amps_transport_filter_function filter_, void *userData_)
Sets a filter function on the transport that is called with all raw data sent or received.
Definition: ampsplusplus.hpp:7573
amps_uint64_t publish(const char *topic_, size_t topicLength_, const char *data_, size_t dataLength_, unsigned long expiration_)
Publish a message to an AMPS topic, returning the sequence number assigned by the publish store if on...
Definition: ampsplusplus.hpp:5734
iterator begin(void)
Returns an iterator representing the beginning of the topic or subscription.
Definition: ampsplusplus.hpp:5086
void addHttpPreflightHeader(const std::string &key_, const std::string &value_)
Adds a given key/value pair as an HTTP header line as "key: value" to the end of the headers that wil...
Definition: ampsplusplus.hpp:5280
Field getLeasePeriod() const
Retrieves the value of the LeasePeriod header of the Message as a Field which references the underlyi...
Definition: Message.hpp:1371
Command & setSowKeys(const char *sowKeys_, size_t sowKeysLen_)
Sets the SowKeys for the command.
Definition: ampsplusplus.hpp:660
static const char * NOW()
Convenience method for returning the special value to start a subscription at the end of the transact...
Definition: ampsplusplus.hpp:7204
Command & setAckType(const std::string &ackType_)
Definition: ampsplusplus.hpp:909
static const char * BOOKMARK_EPOCH()
Convenience method for returning the special value to start a subscription at the beginning of the tr...
Definition: ampsplusplus.hpp:7214
void send(const Message &message)
Sends a Message to the connected AMPS server, performing only minimal validation and bypassing client...
Definition: ampsplusplus.hpp:5404
Message & setSowKey(const std::string &v)
Sets the value of the SowKey header for this Message.
Definition: Message.hpp:1486
AMPSDLL void amps_message_set_field_value(amps_handle message, FieldId field, const amps_char *value, size_t length)
Sets the value of a header field in an AMPS message.
std::string bookmarkSubscribe(const MessageHandler &messageHandler_, const std::string &topic_, long timeout_, const std::string &bookmark_, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Subscribe to a topic using a bookmark.
Definition: ampsplusplus.hpp:6124
Field getFilter() const
Retrieves the value of the Filter header of the Message as a Field which references the underlying bu...
Definition: Message.hpp:1368
static size_t getUnsetPosition()
Method to return the value used to represent not found or unset.
Definition: ampsplusplus.hpp:1160
void setRetryOnDisconnect(bool isRetryOnDisconnect_)
Enables or disables automatic retry of a command to AMPS after a reconnect.
Definition: ampsplusplus.hpp:7535
unsigned getMaxDepth(void) const
Gets the maximum number of messages that can be held in the underlying queue.
Definition: ampsplusplus.hpp:8968
Message & assignUserId(const std::string &v)
Assigns the value of the UserId header for this Message without copying.
Definition: Message.hpp:1515
void connect(const std::string &uri)
Connect to an AMPS server.
Definition: ampsplusplus.hpp:5379
void clear()
Deletes the data associated with this Field, should only be used on Fields that were created as deepC...
Definition: Field.hpp:260
void addMessageHandler(const Field &commandId_, const AMPS::MessageHandler &messageHandler_, unsigned requestedAcks_, Message::Command::Type commandType_)
Adds a MessageHandler to be invoked for Messages with the given CommandId as their command id...
Definition: ampsplusplus.hpp:5434
Abstract base class to manage all subscriptions placed on a client so that they can be re-established...
Definition: ampsplusplus.hpp:1461
std::string sowAndDeltaSubscribe(const MessageHandler &messageHandler_, const std::string &topic_, const std::string &filter_="", const std::string &orderBy_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic and initiates a new delta subscription on it.
Definition: ampsplusplus.hpp:6606
Success.
Definition: amps.h:221
Message & setCommandEnum(Command::Type command_)
Set self's "command" field from one of the values in Command.
Definition: Message.hpp:1350
Field getOptions() const
Retrieves the value of the Options header of the Message as a Field which references the underlying b...
Definition: Message.hpp:1378
std::string authenticate(const std::string &, const std::string &password_)
A simple implementation that returns an unmodified password.
Definition: ampsplusplus.hpp:1043
amps_uint64_t deltaPublish(const char *topic_, size_t topicLength_, const char *data_, size_t dataLength_)
Publish the changed fields of a message to an AMPS topic.
Definition: ampsplusplus.hpp:5824
FIXShredder(char fieldSep_=(char) 1)
Construct an instance of FIXShredder using the specified value as the delimiter between fields...
Definition: ampsplusplus.hpp:8412
std::string sow(const MessageHandler &messageHandler_, const std::string &topic_, const std::string &filter_="", const std::string &orderBy_="", const std::string &bookmark_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query a State-of-the-World topic.
Definition: ampsplusplus.hpp:6248
void * amps_handle
Opaque handle type used to refer to objects in the AMPS api.
Definition: amps.h:211
Class for constructing the options string to pass to AMPS in a Message.
Definition: Message.hpp:609
void addMessageHandler(const Field &commandId_, const AMPS::MessageHandler &messageHandler_, unsigned requestedAcks_, bool isSubscribe_)
Adds a MessageHandler to be invoked for Messages with the given CommandId as their command id...
Definition: ampsplusplus.hpp:5417
const char * data() const
Returns the (non-null-terminated) data underlying this field.
Definition: Field.hpp:273
bool isValid() const
Returns true if self is a valid stream that may be iterated.
Definition: ampsplusplus.hpp:5079
amps_result
Return values from amps_xxx functions.
Definition: amps.h:216
FailedWriteHandler * getFailedWriteHandler()
Get the handler that is invoked to report on failed writes.
Definition: ampsplusplus.hpp:5635
Field getAckType() const
Retrieves the value of the AckType header of the Message as a Field which references the underlying b...
Definition: Message.hpp:1192
AMPSDLL amps_handle amps_client_create(const amps_char *clientName)
Functions for creation of an AMPS client.
Field getCommand() const
Retrieves the value of the Command header of the Message as a Field which references the underlying b...
Definition: Message.hpp:1257
MessageStream execute(Command &command_)
Execute the provided command and return messages received in response in a MessageStream.
Definition: ampsplusplus.hpp:8982
void(* amps_transport_filter_function)(const unsigned char *, size_t, short, void *)
Prototype for a user-supplied callback function for filtering data before it is sent and after it is ...
Definition: amps.h:656
Message & setTopic(const std::string &v)
Sets the value of the Topic header for this Message.
Definition: Message.hpp:1511
Store getPublishStore()
Get the publish store used by the client.
Definition: ampsplusplus.hpp:5590
void setThreadCreatedCallback(amps_thread_created_callback callback_, void *userData_)
Sets a callback function on the transport that is called when a new thread is created to receive data...
Definition: ampsplusplus.hpp:7587
unsigned getAckTypeEnum() const
Definition: ampsplusplus.hpp:958
size_t getServerVersion() const
Returns the server version retrieved during logon.
Definition: ampsplusplus.hpp:5308
State
Constants for the state of the connection.
Definition: ampsplusplus.hpp:1505
Command & setData(const std::string &data_)
Sets the data for this command from an existing string.
Definition: ampsplusplus.hpp:829
std::string logon(int timeout_=0, Authenticator &authenticator_=DefaultAuthenticator::instance(), const char *options_=NULL)
Logon to the server, providing the client name, credentials (if available), and client information (s...
Definition: ampsplusplus.hpp:5896
void discardUpTo(amps_uint64_t index_)
Called by Client to indicate that all messages up to and including.
Definition: ampsplusplus.hpp:1250
Command & setFilter(const std::string &filter_)
Definition: ampsplusplus.hpp:692
const std::string & getNameHash() const
Returns the name hash string of this client as generated by the server and returned when the client l...
Definition: ampsplusplus.hpp:5237
StoreImpl(bool errorOnPublishGap_=false)
Default constructor.
Definition: ampsplusplus.hpp:1105
MessageStream sowAndSubscribe(const char *topic_, const std::string &filter_="", const std::string &orderBy_="", const std::string &bookmark_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic and initiates a new subscription on it.
Definition: ampsplusplus.hpp:6560
Client represents a connection to an AMPS server, but does not provide failover or reconnection behav...
Definition: ampsplusplus.hpp:5167
void setGlobalCommandTypeMessageHandler(const std::string &command_, const MessageHandler &handler_)
Sets a handler for all messages of a particular type, or for messages that would be delivered to a pa...
Definition: ampsplusplus.hpp:7160
Command & setSubId(const std::string &subId_)
Definition: ampsplusplus.hpp:718
static const char * BOOKMARK_NOW()
Convenience method for returning the special value to start a subscription at the end of the transact...
Definition: ampsplusplus.hpp:7195
Message & newCommandId()
Creates and sets a new sequential value for the CommandId header for this Message.
Definition: Message.hpp:1364
void addConnectionStateListener(ConnectionStateListener *listener)
Adds a ConnectionStateListener to self's set of listeners.
Definition: ampsplusplus.hpp:7268
Command & reset(const std::string &command_)
Resets the fields of self, and sets the command to command_.
Definition: ampsplusplus.hpp:583
void clearHttpPreflightHeaders()
Clears all previously set HTTP header lines.
Definition: ampsplusplus.hpp:5286
Command & setTimeout(unsigned timeout_)
Sets the client-side timeout for this command.
Definition: ampsplusplus.hpp:851
void clearConnectionStateListeners()
Clear all listeners from self's set of ConnectionStateListeners.
Definition: ampsplusplus.hpp:7283
Message & setSequence(const std::string &v)
Sets the value of the Sequence header for this Message.
Definition: Message.hpp:1484
amps_uint64_t publish(const char *topic_, size_t topicLength_, const char *data_, size_t dataLength_)
Publish a message to an AMPS topic, returning the sequence number assigned by the publish store if on...
Definition: ampsplusplus.hpp:5683
Command & setCommandId(const std::string &cmdId_)
Definition: ampsplusplus.hpp:666
Message & setUserId(const std::string &v)
Sets the value of the UserId header for this Message.
Definition: Message.hpp:1515
AMPSDLL amps_result amps_client_send_with_version(amps_handle client, amps_handle message, unsigned *version_out)
Sends a message to the AMPS server.
Field getTopic() const
Retrieves the value of the Topic header of the Message as a Field which references the underlying buf...
Definition: Message.hpp:1511
AMPSDLL void amps_client_set_message_handler(amps_handle client, amps_handler messageHandler, void *userData)
Sets the message handler function for this client.
SubscriptionManager * getSubscriptionManager()
Get the subscription manager being used by the client.
Definition: ampsplusplus.hpp:5546
std::string subscribe(const MessageHandler &messageHandler_, const std::string &topic_, long timeout_=0, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Subscribe to a topic.
Definition: ampsplusplus.hpp:5959
Command(Message::Command::Type command_)
Creates an object to represent the given AMPS command, such as "sow" or "subscribe".
Definition: ampsplusplus.hpp:575
Message & setMessageType(const std::string &v)
Sets the value of the MessageType header for this Message.
Definition: Message.hpp:1374
Field getCommandId() const
Retrieves the value of the CommandId header of the Message as a Field which references the underlying...
Definition: Message.hpp:1364
const ExceptionListener & getExceptionListener(void) const
Returns the exception listener set on this Client.
Definition: ampsplusplus.hpp:7068
Field getQueryId() const
Retrieves the value of the QueryID header of the Message as a Field which references the underlying b...
Definition: Message.hpp:1479
static Authenticator & instance()
Static function to return a static instance used when no Authenticator is supplied to a Client...
Definition: ampsplusplus.hpp:1060
Abstract base class for connection state listeners.
Definition: ampsplusplus.hpp:1501
Message & setSubscriptionId(const std::string &v)
Sets the value of the SubscriptionId header for this Message.
Definition: Message.hpp:1489
virtual void setDisconnectHandler(const DisconnectHandler &disconnectHandler)
Definition: ampsplusplus.hpp:5494
std::string sowDeleteByData(const MessageHandler &messageHandler_, const std::string &topic_, const std::string &data_, long timeout_=0)
Deletes the message whose keys match the message data provided.
Definition: ampsplusplus.hpp:6981
Command & setOptions(const char *options_, size_t optionsLen_)
Sets the options string for this command: see Message.Options for a helper class for constructing the...
Definition: ampsplusplus.hpp:797
Message & newQueryId()
Creates and sets a new sequential value for the QueryID header for this Message.
Definition: Message.hpp:1479
#define AMPS_BOOKMARK_RECENT
Start the subscription at the first undiscarded message in the bookmark store, or at the end of the b...
Definition: BookmarkStore.hpp:47
Message & assignExpiration(const std::string &v)
Assigns the value of the Expiration header for this Message without copying.
Definition: Message.hpp:1367
bool empty() const
Returns 'true' if empty, 'false' otherwise.
Definition: Field.hpp:128
bool replaySingle(StoreReplayer &replayer_, amps_uint64_t index_)
Called by Client to get a single message replayed by the store onto the StoreReplayer.
Definition: ampsplusplus.hpp:1271
void completed(const std::string &, const std::string &, const std::string &)
Called by Client once a logon completes successfully.
Definition: ampsplusplus.hpp:1055
MessageStream deltaSubscribe(const std::string &topic_, long timeout_, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Delta Subscribe to a topic.
Definition: ampsplusplus.hpp:6064
static const char * EPOCH()
Convenience method for returning the special value to start a subscription at the beginning of the tr...
Definition: ampsplusplus.hpp:7224
Interface for BookmarkStoreImpl classes.
Definition: BookmarkStore.hpp:228
Command & reset(const char *command_, size_t commandLen_)
Resets the fields of self, and sets the command to command_.
Definition: ampsplusplus.hpp:592
Command & setSequence(const char *seq_, size_t seqLen_)
Definition: ampsplusplus.hpp:810
std::string logon(const char *options_, int timeout_=0)
Logon to the server, providing the client name, credentials (if available) client information (such a...
Definition: ampsplusplus.hpp:5915
size_t len() const
Returns the length of the data underlying this field.
Definition: Field.hpp:280
virtual void completed(const std::string &userName_, const std::string &password_, const std::string &reason_)=0
Called by Client once a logon completes successfully.
Message & setClientName(const std::string &v)
Sets the value of the ClientName header for this Message.
Definition: Message.hpp:1365
Field getSowKey() const
Retrieves the value of the SowKey header of the Message as a Field which references the underlying bu...
Definition: Message.hpp:1486
#define AMPS_BOOKMARK_EPOCH
Start the subscription at the beginning of the journal.
Definition: BookmarkStore.hpp:51
A default implementation of Authenticator that only uses an unchanged password and does not implement...
Definition: ampsplusplus.hpp:1037
void setLogonCorrelationData(const std::string &logonCorrelationData_)
Sets the logon correlation data for the client.
Definition: ampsplusplus.hpp:5256
Message & setCommand(const std::string &v)
Sets the value of the Command header for this Message.
Definition: Message.hpp:1257
_FIXBuilder(char fieldSep_=(char) 1)
Construct an instance of _FIXBuilder, using the specified separator between fields.
Definition: ampsplusplus.hpp:8340
void addHttpPreflightHeader(const std::string &header_)
Adds a given HTTP header line to the end of the headers that will be sent for the HTTP GET Upgrade re...
Definition: ampsplusplus.hpp:5271
void ack(const std::string &topic_, const std::string &bookmark_, const char *options_=NULL)
Acknowledge a message queue message by supplying a topic and bookmark string: this adds the ack to th...
Definition: ampsplusplus.hpp:7433
std::string retry(const std::string &, const std::string &)
Throws an AuthenticationException because retry is not implemented.
Definition: ampsplusplus.hpp:1050
Message & assignSubscriptionId(const std::string &v)
Assigns the value of the SubscriptionId header for this Message without copying.
Definition: Message.hpp:1489
bool(* PublishStoreResizeHandler)(Store store_, size_t size_, void *userData_)
Function type for PublishStore resize events The store_ param is store which is resizing.
Definition: ampsplusplus.hpp:1091
AMPSDLL void amps_message_get_field_value(amps_handle message, FieldId field, const amps_char **value_ptr, size_t *length_ptr)
Retrieves the value of a header field in an AMPS message.
MessageStream sowAndSubscribe(const char *topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, bool oofEnabled_=false, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic and initiates a new subscription on it.
Definition: ampsplusplus.hpp:6450
virtual void setResizeHandler(PublishStoreResizeHandler handler_, void *userData_)
Set a handler to be called if the Store needs to resize in order to keep storing messages.
Definition: ampsplusplus.hpp:1191
void append(const T &tag, const char *value, size_t offset, size_t length)
Write a field with the provided tag and value to the message being constructed.
Definition: ampsplusplus.hpp:8349
Message & assignCommand(const std::string &v)
Assigns the value of the Command header for this Message without copying.
Definition: Message.hpp:1257
#define AMPS_BOOKMARK_NOW
Start the subscription at the point in time when AMPS processes the subscription. ...
Definition: BookmarkStore.hpp:55
void setResizeHandler(PublishStoreResizeHandler handler_, void *userData_)
Set a handler to be called if the Store needs to resize in order to keep storing messages.
Definition: ampsplusplus.hpp:1331
virtual std::string retry(const std::string &userName_, const std::string &password_)=0
Called by Client when a logon ack is received with a status of retry.
BookmarkStore getBookmarkStore()
Get the bookmark store being used by the client.
Definition: ampsplusplus.hpp:5538
void setHttpPreflightHeaders(const T &headers_)
Sets the given HTTP header lines to be sent for the HTTP GET Upgrade request.
Definition: ampsplusplus.hpp:5295
bool isValid() const
Method to return if there is an underlying implementation for the Store.
Definition: ampsplusplus.hpp:1288
void setAckBatchSize(const unsigned ackBatchSize_)
Sets the queue ack batch size setting.
Definition: ampsplusplus.hpp:7494
Field getPassword() const
Retrieves the value of the Password header of the Message as a Field which references the underlying ...
Definition: Message.hpp:1478
Message & setTopNRecordsReturned(const std::string &v)
Sets the value of the TopNRecordsReturned header for this Message.
Definition: Message.hpp:1513
Class to handle when a client receives a duplicate publish message, or not entitled message...
Definition: ampsplusplus.hpp:1381
virtual void setFailedResubscribeHandler(std::shared_ptr< FailedResubscribeHandler > handler_)
Set a handler to deal with failing subscriptions after a failover event.
Definition: ampsplusplus.hpp:1489
Message & setSowKeys(const std::string &v)
Sets the value of the SowKeys header for this Message.
Definition: Message.hpp:1487
void setBookmarkStore(const BookmarkStore &bookmarkStore_)
Set the bookmark store to be used by the client.
Definition: ampsplusplus.hpp:5530
This class multiplexes messages from AMPS to multiple subscribers and uses the stream of acks from AM...
Definition: MessageRouter.hpp:149
Message & setExpiration(const std::string &v)
Sets the value of the Expiration header for this Message.
Definition: Message.hpp:1367
virtual std::string authenticate(const std::string &userName_, const std::string &password_)=0
Called by Client just before the logon command is sent.
Command & setOrderBy(const std::string &orderBy_)
Definition: ampsplusplus.hpp:705
void setExceptionListener(const ExceptionListener &listener_)
Definition: ampsplusplus.hpp:7061
Command & reset(Message::Command::Type command_)
Resets the fields of self, and sets the command to command_.
Definition: ampsplusplus.hpp:600
void setSubscriptionManager(SubscriptionManager *subscriptionManager_)
Set the subscription manager to be used by the client.
Definition: ampsplusplus.hpp:5558
void setUnhandledMessageHandler(const AMPS::MessageHandler &messageHandler)
Definition: ampsplusplus.hpp:7127
void append(const T &tag, const std::string &value)
Write a field with the provided tag and value to the message being constructed.
Definition: ampsplusplus.hpp:8360
unsigned getAckBatchSize(void) const
Returns the value of the queue ack batch size setting.
Definition: ampsplusplus.hpp:7484
amps_uint64_t deltaPublish(const std::string &topic_, const std::string &data_)
Publish the changed fields of a message to an AMPS topic.
Definition: ampsplusplus.hpp:5801
MessageStream sowAndSubscribe(const std::string &topic_, const std::string &filter_="", const std::string &orderBy_="", const std::string &bookmark_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic and initiates a new subscription on it.
Definition: ampsplusplus.hpp:6537
Command & setExpiration(unsigned expiration_)
Set the expiration time for a publish command.
Definition: ampsplusplus.hpp:889
AMPSDLL void amps_client_set_predisconnect_handler(amps_handle client, amps_predisconnect_handler predisconnectHandler, void *userData)
Sets the predisconnect handler function to be called when a disconnect occurs.
Represents an iterator over messages in an AMPS topic.
Definition: ampsplusplus.hpp:5041
size_t log(Message &message_)
Log a bookmark to the persistent log.
Definition: BookmarkStore.hpp:268
MessageStream sowAndDeltaSubscribe(const char *topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, bool oofEnabled_=false, bool sendEmpties_=false, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic and initiates a new delta subscription on it.
Definition: ampsplusplus.hpp:6783
size_t unpersistedCount() const
Method to return how many messages are in the store that have not been discarded, indicating that the...
Definition: ampsplusplus.hpp:1280
Command & setData(const char *data_, size_t dataLen_)
Sets the data for this command.
Definition: ampsplusplus.hpp:837
Message & assignAckType(const std::string &v)
Assigns the value of the AckType header for this Message without copying.
Definition: Message.hpp:1192
void publishFlush(long timeout_=0, unsigned ackType_=Message::AckType::Processed)
Ensure that AMPS messages are sent and have been processed by the AMPS server.
Definition: ampsplusplus.hpp:5780
std::string executeAsync(Command &command_, MessageHandler handler_)
Execute the provided command and, once AMPS acknowledges the command, process messages in response to...
Definition: ampsplusplus.hpp:7313
Command & setSowKey(const char *sowKey_, size_t sowKeyLen_)
Sets the SowKey field of the command, typically used for a publish command to a topic in the state of...
Definition: ampsplusplus.hpp:625
MessageStream sowAndDeltaSubscribe(const std::string &topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, bool oofEnabled_=false, bool sendEmpties_=false, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic and initiates a new delta subscription on it.
Definition: ampsplusplus.hpp:6742
std::string logon(const std::string &options_, int timeout_=0)
Logon to the server, providing the client name, credentials (if available) client information (such a...
Definition: ampsplusplus.hpp:5934
void unsubscribe()
Unsubscribe from all topics.
Definition: ampsplusplus.hpp:6213
void setAutoAck(bool isAutoAckEnabled_)
Sets the queue auto-ack setting on this client.
Definition: ampsplusplus.hpp:7476
MessageStream deltaSubscribe(const char *topic_, long timeout_, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Delta Subscribe to a topic.
Definition: ampsplusplus.hpp:6082
Message & assignCorrelationId(const std::string &v)
Assigns the value of the CorrelationId header for this Message without copying.
Definition: Message.hpp:1366
amps_uint64_t store(const Message &message_)
Called by Client to store a message being published.
Definition: ampsplusplus.hpp:1239
std::string sowAndDeltaSubscribe(const MessageHandler &messageHandler_, const std::string &topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, bool oofEnabled_=false, bool sendEmpties_=false, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic and initiates a new delta subscription on it.
Definition: ampsplusplus.hpp:6706
Command & setBatchSize(unsigned batchSize_)
Sets the batch size for this command, which controls how many records are sent together in the result...
Definition: ampsplusplus.hpp:873
void setExceptionListener(const std::shared_ptr< const ExceptionListener > &pListener_)
Sets the exception listener for exceptions that are not thrown back to the user (for example...
Definition: ampsplusplus.hpp:7045
StoreImpl * get()
Used to get a pointer to the implementation.
Definition: ampsplusplus.hpp:1363
MessageHandler getDuplicateMessageHandler(void)
Returns the callback function that is invoked when a duplicate message is detected.
Definition: ampsplusplus.hpp:5613
Command & setQueryId(const char *queryId_, size_t queryIdLen_)
Definition: ampsplusplus.hpp:738
DisconnectHandler getDisconnectHandler(void) const
Definition: ampsplusplus.hpp:5508
Class for parsing a FIX format message into a std::map of keys and values, where the keys and values ...
Definition: ampsplusplus.hpp:8404
Field represents the value of a single field in a Message.
Definition: Field.hpp:86
Message & setAckType(const std::string &v)
Sets the value of the AckType header for this Message.
Definition: Message.hpp:1192
virtual ConnectionInfo getConnectionInfo() const
Get the connection information for the current connection.
Definition: ampsplusplus.hpp:5517
Message & setOptions(const std::string &v)
Sets the value of the Options header for this Message.
Definition: Message.hpp:1406
void ack(Field &topic_, Field &bookmark_, const char *options_=NULL)
Acknowledge a message queue message by supplying a topic and bookmark: this adds the ack to the curre...
Definition: ampsplusplus.hpp:7409
Abstract base class where you can implement handling of exceptions that occur when a SubscriptionMana...
Definition: ampsplusplus.hpp:1439
AMPSDLL amps_result amps_client_send(amps_handle client, amps_handle message)
Sends a message to the AMPS server.
std::map< Message::Field, Message::Field > map_type
Convenience defintion for the std::map specialization used for this class.
Definition: ampsplusplus.hpp:8416
Command & setTopic(const char *topic_, size_t topicLen_)
Definition: ampsplusplus.hpp:686
Handle class for StoreImpl classes that track publish messages.
Definition: ampsplusplus.hpp:1223
map_type toMap(const Message::Field &data)
Returns the key/value pairs within the message, represented as AMPS::Field objects that contain point...
Definition: ampsplusplus.hpp:8423
Exception listener for unhandled exceptions.
Definition: ampsplusplus.hpp:204
MessageStream sowAndDeltaSubscribe(const std::string &topic_, const std::string &filter_="", const std::string &orderBy_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic and initiates a new delta subscription on it.
Definition: ampsplusplus.hpp:6639
void setHeartbeat(unsigned heartbeatTime_)
Requests heartbeating with the AMPS server.
Definition: ampsplusplus.hpp:7118
MessageStream subscribe(const char *topic_, long timeout_=0, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Subscribe to a topic.
Definition: ampsplusplus.hpp:6017
An iterable object representing the results of an AMPS subscription and/or query. ...
Definition: ampsplusplus.hpp:5033
bool getErrorOnPublishGap() const
Called to check if the Store will throw PublishStoreGapException.
Definition: ampsplusplus.hpp:1355
Command & setSequence(const std::string &seq_)
Definition: ampsplusplus.hpp:803
static const char * MOST_RECENT()
Convenience method for returning the special value to start a subscription at a recovery point based ...
Definition: ampsplusplus.hpp:7246
Command & setQueryId(const std::string &queryId_)
Definition: ampsplusplus.hpp:731
AMPSDLL amps_result amps_client_set_thread_created_callback(amps_handle client, amps_thread_created_callback callback, void *userData)
Sets a user-supplied callback function to allow thread attributes to set when a new thread is created...
Message sowDelete(const std::string &topic_, const std::string &filter_, long timeout_=0)
Deletes one or more messages from a topic's SOW cache.
Definition: ampsplusplus.hpp:6845
Message & setCommandId(const std::string &v)
Sets the value of the CommandId header for this Message.
Definition: Message.hpp:1364
Command & setSequence(const amps_uint64_t seq_)
Definition: ampsplusplus.hpp:816
static const char * BOOKMARK_RECENT()
Convenience method for returning the special value to start a subscription at a recovery point based ...
Definition: ampsplusplus.hpp:7257
Message & setQueryId(const std::string &v)
Sets the value of the QueryID header for this Message.
Definition: Message.hpp:1479
amps_result(* amps_thread_created_callback)(AMPS_THREAD_T, void *)
Prototype for a user-supplied callback function to allow thread attributes to be set when a new threa...
Definition: amps.h:682
amps_uint64_t getLowestUnpersisted()
Get the oldest unpersisted message sequence in the store.
Definition: ampsplusplus.hpp:1309
Command & setTopic(const std::string &topic_)
Definition: ampsplusplus.hpp:679
MessageStream maxDepth(unsigned maxDepth_)
Sets the maximum number of messages that can be held in the underlying queue.
Definition: ampsplusplus.hpp:8963
Message & assignVersion(const std::string &v)
Assigns the value of the Version header for this Message without copying.
Definition: Message.hpp:1514
void disconnect()
Disconnect from an AMPS server.
Definition: ampsplusplus.hpp:5386
MessageStream sow(const std::string &topic_, const std::string &filter_="", const std::string &orderBy_="", const std::string &bookmark_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic.
Definition: ampsplusplus.hpp:6286
unsigned getDepth(void) const
Gets the current number of messages held in the underlying queue.
Definition: ampsplusplus.hpp:8972
Field getQueryID() const
Retrieves the value of the QueryID header of the Message as a Field which references the underlying b...
Definition: Message.hpp:1479
The operation has not succeeded, but ought to be retried.
Definition: amps.h:245
const std::string & getLogonCorrelationData() const
Returns the currently set logon correlation data for the client.
Definition: ampsplusplus.hpp:5263
void setHeartbeat(unsigned heartbeatTime_, unsigned readTimeout_)
Requests heartbeating with the AMPS server.
Definition: ampsplusplus.hpp:7094
Command & setTopN(unsigned topN_)
Definition: ampsplusplus.hpp:857
AMPSDLL amps_result amps_client_set_read_timeout(amps_handle client, int readTimeout)
Sets a read timeout (seconds), in which if no message is received, the connection is presumed dead...
const std::string & getURI() const
Returns the last URI this client is connected to.
Definition: ampsplusplus.hpp:5355
Command & addAckType(const std::string &ackType_)
Definition: ampsplusplus.hpp:895
void replay(StoreReplayer &replayer_)
Called by Client to get all stored and non-discarded messages replayed by the store onto the StoreRep...
Definition: ampsplusplus.hpp:1259
static const char * BOOKMARK_MOST_RECENT()
Convenience method for returning the special value to start a subscription at a recovery point based ...
Definition: ampsplusplus.hpp:7235
Message & setBookmark(const std::string &v)
Sets the value of the Bookmark header for this Message.
Definition: Message.hpp:1256
void deepCopy(const Field &orig_)
Makes self a deep copy of the original field.
Definition: Field.hpp:218
Command & setOrderBy(const char *orderBy_, size_t orderByLen_)
Definition: ampsplusplus.hpp:712
unsigned getAckTypeEnum() const
Decode self's "ack type" field and return the corresponding bitmask of values from AckType...
Definition: Message.hpp:1222
BookmarkStoreImpl * get()
Used to get a pointer to the implementation.
Definition: BookmarkStore.hpp:465
amps_handle getHandle()
Returns the underlying amps_handle for this client, to be used with amps_client_* functions from the ...
Definition: ampsplusplus.hpp:7032
AMPSDLL amps_result amps_client_attempt_reconnect(amps_handle client, unsigned version)
Manually invokes the user-supplied disconnect handler for this client.
AMPSDLL void amps_client_destroy(amps_handle handle)
Disconnects and destroys an AMPS client object.
void reset()
Clear all data from the builder.
Definition: ampsplusplus.hpp:8377
void setFailedWriteHandler(FailedWriteHandler *handler_)
Set the handler that is invoked to report when a publish fails, for example if the publisher is not e...
Definition: ampsplusplus.hpp:5627
MessageStream sowAndDeltaSubscribe(const char *topic_, const std::string &filter_="", const std::string &orderBy_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic and initiates a new delta subscription on it.
Definition: ampsplusplus.hpp:6661
void setAckTimeout(const int ackTimeout_)
Sets the message queue ack timeout value.
Definition: ampsplusplus.hpp:7517
Command & setSowKeys(const std::string &sowKeys_)
Sets the SowKeys for the command.
Definition: ampsplusplus.hpp:642
Message & setPassword(const std::string &v)
Sets the value of the Password header for this Message.
Definition: Message.hpp:1478
bool getAutoAck(void) const
Returns the value of the queue auto-ack setting.
Definition: ampsplusplus.hpp:7466
amps_uint64_t publish(const std::string &topic_, const std::string &data_)
Publish a message to an AMPS topic, returning the sequence number assigned by the publish store if on...
Definition: ampsplusplus.hpp:5658
Definition: ampsplusplus.hpp:103
void setName(const std::string &name)
Sets the name of this client, assuming no name was provided previously.
Definition: ampsplusplus.hpp:5222
The interface for handling authentication with the AMPS server.
Definition: ampsplusplus.hpp:1006
void flush(long timeout_=0)
Method to wait for the Store to discard everything that has been stored up to the point in time when ...
Definition: ampsplusplus.hpp:1301
Field getBookmark() const
Retrieves the value of the Bookmark header of the Message as a Field which references the underlying ...
Definition: Message.hpp:1256
bool DangerousFlushPublishStoreResizeHandler(Store store_, size_t, void *data_)
PublishStoreResizeHandler that will block up to the timeout specified in user data milliseconds tryin...
Definition: ampsplusplus.hpp:1411
static amps_uint64_t getUnsetSequence()
Method to return the value used to represent no such sequence.
Definition: ampsplusplus.hpp:1167
Command & setCorrelationId(const char *correlationId_, size_t correlationIdLen_)
Set the correlation ID for this command.
Definition: ampsplusplus.hpp:782
MessageStream sow(const char *topic_, const std::string &filter_="", const std::string &orderBy_="", const std::string &bookmark_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic.
Definition: ampsplusplus.hpp:6307
amps_uint64_t publish(const std::string &topic_, const std::string &data_, unsigned long expiration_)
Publish a message to an AMPS topic, returning the sequence number assigned by the publish store (if a...
Definition: ampsplusplus.hpp:5707
void setLastChanceMessageHandler(const AMPS::MessageHandler &messageHandler)
Sets the message handler called when no other handler matches.
Definition: ampsplusplus.hpp:7134
Message::Field getMostRecent(const Message::Field &subId_)
Returns the most recent bookmark from the log that ought to be used for (re-)subscriptions.
Definition: BookmarkStore.hpp:310
amps_uint64_t deltaPublish(const char *topic_, size_t topicLength_, const char *data_, size_t dataLength_, unsigned long expiration_)
Publish the changed fields of a message to an AMPS topic.
Definition: ampsplusplus.hpp:5873
The client and server are disconnected.
Definition: amps.h:249
Message sowDeleteByData(const std::string &topic_, const std::string &data_, long timeout_=0)
Deletes the message whose keys match the message data provided.
Definition: ampsplusplus.hpp:7002
std::string deltaSubscribe(const MessageHandler &messageHandler_, const std::string &topic_, long timeout_, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Delta Subscribe to a topic.
Definition: ampsplusplus.hpp:6046
MessageStream timeout(unsigned timeout_)
Sets the maximum time to wait for the next message in milliseconds; if no message is available within...
Definition: ampsplusplus.hpp:8953
std::string sow(const MessageHandler &messageHandler_, const std::string &topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic.
Definition: ampsplusplus.hpp:6348
const std::string & getName() const
Returns the name of this client passed in the constructor.
Definition: ampsplusplus.hpp:5229
Command is an encapsulation of a single AMPS command sent by the client.
Definition: ampsplusplus.hpp:472
std::string sowAndSubscribe(const MessageHandler &messageHandler_, const std::string &topic_, const std::string &filter_="", const std::string &orderBy_="", const std::string &bookmark_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic and initiates a new subscription on it.
Definition: ampsplusplus.hpp:6498
Message & setBatchSize(const std::string &v)
Sets the value of the BatchSize header for this Message.
Definition: Message.hpp:1255
MessageStream subscribe(const std::string &topic_, long timeout_=0, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Subscribe to a topic.
Definition: ampsplusplus.hpp:5985
MessageStream bookmarkSubscribe(const char *topic_, long timeout_, const std::string &bookmark_, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Subscribe to a topic using a bookmark.
Definition: ampsplusplus.hpp:6173
Command & setSowKey(const std::string &sowKey_)
Sets the SowKey field of the command, typically used for a publish command to a topic in the state of...
Definition: ampsplusplus.hpp:612
Message & assignSequence(const std::string &v)
Assigns the value of the Sequence header for this Message without copying.
Definition: Message.hpp:1484
std::string executeAsyncNoResubscribe(Command &command_, MessageHandler handler_)
Execute the provided command and, once AMPS acknowledges the command, process messages in response to...
Definition: ampsplusplus.hpp:7347
Command & setCorrelationId(const std::string &correlationId_)
Set the correlation ID for this command.
Definition: ampsplusplus.hpp:770
std::string sowDelete(const MessageHandler &messageHandler, const std::string &topic, const std::string &filter, long timeout)
Deletes one or more messages from a topic's SOW cache.
Definition: ampsplusplus.hpp:6822
iterator end(void)
Returns an iterator representing the end of the topic or subscription.
Definition: ampsplusplus.hpp:5097
bool getRetryOnDisconnect(void) const
Returns true if automatic retry of a command to AMPS after a reconnect is enabled.
Definition: ampsplusplus.hpp:7544
void flushAcks(void)
Sends any queued message queue ack messages to the server immediately.
Definition: ampsplusplus.hpp:7457
std::string sowAndSubscribe(const MessageHandler &messageHandler_, const std::string &topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, bool oofEnabled_=false, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic and initiates a new subscription on it.
Definition: ampsplusplus.hpp:6380