25 #ifndef _AMPSPLUSPLUS_H_ 26 #define _AMPSPLUSPLUS_H_ 28 #include "amps/ampsver.h" 46 #include <sys/atomic.h> 48 #include "amps/BookmarkStore.hpp" 49 #include "amps/MessageRouter.hpp" 50 #include "amps/util.hpp" 51 #include "amps/ampscrc.hpp" 52 #if __cplusplus >= 201100L || _MSC_VER >= 1900 56 #ifndef AMPS_TESTING_SLOW_MESSAGE_STREAM 57 #define AMPS_TESTING_SLOW_MESSAGE_STREAM 85 #define AMPS_MEMORYBUFFER_DEFAULT_BUFFERS 10 86 #define AMPS_MEMORYBUFFER_DEFAULT_LENGTH 40960 87 #define AMPS_SUBSCRIPTION_MANAGER_DEFAULT_TIMEOUT 0 88 #define AMPS_HACLIENT_TIMEOUT_DEFAULT 10000 89 #define AMPS_HACLIENT_RECONNECT_DEFAULT 200 90 #define AMPS_DEFAULT_COMMAND_TIMEOUT 5000 91 #define AMPS_DEFAULT_TOP_N -1 92 #define AMPS_DEFAULT_BATCH_SIZE 10 93 #define AMPS_NUMBER_BUFFER_LEN 20 94 #define AMPS_DEFAULT_QUEUE_ACK_TIMEOUT 1000 96 #if defined(_M_X64) || defined(__x86_64) || defined(_WIN64) 105 typedef std::map<std::string, std::string> ConnectionInfo;
108 inline std::string asString(Type x_)
110 std::ostringstream os;
// Writes decimal digits of seqNo_ into buf_ as ASCII characters, working
// backwards from the end of the buffer: pos starts at AMPS_NUMBER_BUFFER_LEN
// and is pre-decremented before each store, so buf_ must have room for
// AMPS_NUMBER_BUFFER_LEN chars. No terminator is written by the visible code.
// NOTE(review): the statements that advance seqNo_, end the loop early and
// return are not visible in this excerpt. Callers in this file use the result
// as the offset of the first digit (buf + pos, AMPS_NUMBER_BUFFER_LEN - pos)
// -- confirm against the full definition.
116 size_t convertToCharArray(
char* buf_, amps_uint64_t seqNo_)
// One past the last slot; digits are stored right-to-left.
118 size_t pos = AMPS_NUMBER_BUFFER_LEN;
// Bounded by the buffer length, so at most that many digits are produced.
119 for (
int i = 0; i < AMPS_NUMBER_BUFFER_LEN; ++i)
// Lowest decimal digit of seqNo_, converted to its character code.
123 buf_[--pos] = (char)(seqNo_ % 10 +
'0');
// Overload of convertToCharArray taking an unsigned long value. The visible
// statements are the same as in the amps_uint64_t overload: digits are
// written as ASCII into buf_ from index AMPS_NUMBER_BUFFER_LEN - 1 downwards.
// NOTE(review): the lines that advance seqNo_ and return are not visible in
// this excerpt; presumably the offset of the first digit is returned -- confirm.
132 size_t convertToCharArray(
char* buf_,
unsigned long seqNo_)
// One past the last slot; pre-decremented before every store.
134 size_t pos = AMPS_NUMBER_BUFFER_LEN;
// Loop count is capped at the buffer length.
135 for (
int i = 0; i < AMPS_NUMBER_BUFFER_LEN; ++i)
// Lowest decimal digit of seqNo_ as a character.
139 buf_[--pos] = (char)(seqNo_ % 10 +
'0');
// Static accessors for fixed reason strings. Every body visible here returns
// a string literal, so the returned pointers refer to static storage and must
// not be freed or modified by the caller.
// NOTE(review): the bodies of duplicate(), badFilter() and noTopic() are not
// visible in this excerpt; they presumably follow the same pattern.
153 static const char* duplicate()
157 static const char* badFilter()
161 static const char* badRegexTopic()
163 return "bad regex topic";
165 static const char* subscriptionAlreadyExists()
167 return "subscription already exists";
169 static const char* nameInUse()
171 return "name in use";
173 static const char* authFailure()
175 return "auth failure";
177 static const char* notEntitled()
179 return "not entitled";
181 static const char* authDisabled()
183 return "authentication disabled";
185 static const char* subidInUse()
187 return "subid in use";
189 static const char* noTopic()
// Callback that receives an exception by const reference. This implementation
// has an empty body ({;}), so the exception is ignored; the method is virtual
// and const, so a derived listener can override it to report the error.
207 virtual void exceptionThrown(
const std::exception&)
const {;}
213 #define AMPS_CALL_EXCEPTION_WRAPPER(x) \ 218 catch (std::exception& ex_)\ 222 _exceptionListener->exceptionThrown(ex_);\ 247 #define AMPS_CALL_EXCEPTION_WRAPPER_2(me,x) \ 250 while(me->_connected)\ 257 catch(MessageStreamFullException&)\ 261 me->checkAndSendHeartbeat(false);\ 263 catch (std::exception& ex_)\ 267 me->_exceptionListener->exceptionThrown(ex_);\ 278 catch (std::exception& ex_)\ 282 me->_exceptionListener->exceptionThrown(ex_);\ 306 #define AMPS_CALL_EXCEPTION_WRAPPER_STREAM_FULL_2(me, x)\ 307 while(me->_connected)\ 314 catch(MessageStreamFullException&)\ 318 me->checkAndSendHeartbeat(false);\ 320 catch (std::exception& ex_)\ 324 me->_exceptionListener->exceptionThrown(ex_);\ 335 #define AMPS_CALL_EXCEPTION_WRAPPER_2(me,x) \ 338 while(me->_connected)\ 345 catch(MessageStreamFullException& ex_)\ 349 me->checkAndSendHeartbeat(false);\ 351 catch (std::exception& ex_)\ 355 me->_exceptionListener->exceptionThrown(ex_);\ 366 catch (std::exception& ex_)\ 370 me->_exceptionListener->exceptionThrown(ex_);\ 394 #define AMPS_CALL_EXCEPTION_WRAPPER_STREAM_FULL_2(me, x)\ 395 while(me->_connected)\ 402 catch(MessageStreamFullException& ex_)\ 406 me->checkAndSendHeartbeat(false);\ 408 catch (std::exception& ex_)\ 412 me->_exceptionListener->exceptionThrown(ex_);\ 424 #define AMPS_UNHANDLED_EXCEPTION(ex) \ 427 _exceptionListener->exceptionThrown(ex);\ 432 #define AMPS_UNHANDLED_EXCEPTION_2(me,ex) \ 435 me->_exceptionListener->exceptionThrown(ex);\ 474 static const unsigned Subscribe = 1;
475 static const unsigned SOW = 2;
476 static const unsigned NeedsSequenceNumber = 4;
477 static const unsigned ProcessedAck = 8;
478 static const unsigned StatsAck = 16;
// Three init overloads that accept the command as an enum value, as a
// std::string, or as a pointer/length pair. Their bodies are not visible in
// this excerpt.
479 void init(Message::Command::Type command_)
488 void init(
const std::string& command_)
497 void init(
const char* command_,
size_t commandLen_)
// NOTE(review): the logic below reads a local named `command` (no trailing
// underscore), unlike the `command_` parameters above; the signature of the
// function that owns it is not visible here -- confirm before relying on it.
// Branch taken for commands that are not in the NoDataCommands mask.
509 if (!(command & Message::Command::NoDataCommands))
// Subscribe-family commands (plain, delta, and the SOW-and-subscribe forms).
512 if (command == Message::Command::Subscribe ||
513 command == Message::Command::SOWAndSubscribe ||
514 command == Message::Command::DeltaSubscribe ||
515 command == Message::Command::SOWAndDeltaSubscribe)
// SOW-family commands; the default batch size is applied after this test.
520 if (command == Message::Command::SOW
521 || command == Message::Command::SOWAndSubscribe
522 || command == Message::Command::SOWAndDeltaSubscribe)
527 setBatchSize(AMPS_DEFAULT_BATCH_SIZE);
// Per-command flag bits. ProcessedAck and NeedsSequenceNumber are OR-ed into
// _flags; which lines sit inside which branch is inferred from ordering only.
529 if (command == Message::Command::SOW)
534 _flags |= ProcessedAck;
536 else if (command == Message::Command::SOWDelete)
539 _flags |= ProcessedAck;
540 _flags |= NeedsSequenceNumber;
542 else if (command == Message::Command::Publish
543 || command == Message::Command::DeltaPublish)
545 _flags |= NeedsSequenceNumber;
// NOTE(review): the StopTimer branch body is not visible in this excerpt.
547 else if (command == Message::Command::StopTimer)
564 Command(
const char* command_,
size_t commandLen_)
566 init(command_, commandLen_);
590 init(command_, commandLen_);
684 _message.
setTopic(topic_, topicLen_);
814 std::ostringstream os;
819 amps_uint64_t getSequence()
const 835 _message.
setData(data_, dataLen_);
865 _batchSize = batchSize_;
887 if (ackType_ ==
"processed")
889 _flags |= ProcessedAck;
891 else if (ackType_ ==
"stats")
901 if (ackType_.find(
"processed") != std::string::npos)
903 _flags |= ProcessedAck;
907 _flags &= ~ProcessedAck;
909 if (ackType_.find(
"stats") != std::string::npos)
923 if (ackType_ & Message::AckType::Processed)
925 _flags |= ProcessedAck;
929 _flags &= ~ProcessedAck;
931 if (ackType_ & Message::AckType::Stats)
// Read-only accessors for the command's settings and its _flags bitmask.
// Each flag test masks _flags with one of the bit constants (Subscribe, SOW,
// ProcessedAck, StatsAck, NeedsSequenceNumber) and compares the result with
// zero, so the bool result never depends on an implicit integer-to-bool
// conversion.
// NOTE(review): the bodies of getTimeout() and getBatchSize() are not visible
// in this excerpt.
956 unsigned getTimeout(
void)
const 960 unsigned getBatchSize(
void)
const 964 bool isSubscribe(
void)
// Explicit comparison, consistent with the sibling accessors below.
const 966 return (_flags & Subscribe) != 0;
968 bool isSow(
void)
const 970 return (_flags & SOW) != 0;
972 bool hasProcessedAck(
void)
const 974 return (_flags & ProcessedAck) != 0;
976 bool hasStatsAck(
void)
const 978 return (_flags & StatsAck) != 0;
980 bool needsSequenceNumber(
void)
const 982 return (_flags & NeedsSequenceNumber) != 0;
988 typedef void(*DisconnectHandlerFunc)(
Client&,
void* userData);
// Pure virtual hook taking a user name and password and returning a string.
// NOTE(review): presumably the returned string is the credential to send at
// logon -- confirm against the caller, which is not visible here.
1005 virtual std::string authenticate(
const std::string& userName_,
const std::string& password_) = 0;
// Pure virtual hook with the same shape as authenticate().
// NOTE(review): presumably called when the server asks for another attempt -- confirm.
1013 virtual std::string retry(
const std::string& userName_,
const std::string& password_) = 0;
// Pure virtual notification carrying the user name, password and a reason
// string; returns nothing.
1020 virtual void completed(
const std::string& userName_,
const std::string& password_,
const std::string& reason_) = 0;
// authenticate: the user name parameter is unnamed (unused); only password_
// is named. NOTE(review): the body is not visible in this excerpt --
// presumably it returns password_ unchanged; confirm.
1032 std::string
authenticate(
const std::string& ,
const std::string& password_)
// retry: both parameters are unused; the visible body throws
// AuthenticationException because retry is not supported here.
1039 std::string
retry(
const std::string& ,
const std::string& )
1041 throw AuthenticationException(
"retry not implemented by DefaultAuthenticator.");
// completed: intentionally empty ({;}); all three arguments are ignored.
1044 void completed(
const std::string& ,
const std::string& ,
const std::string& ) {;}
1065 virtual void execute(
Message& message_) = 0;
1080 typedef bool (*PublishStoreResizeHandler)(
Store store_,
1095 : _resizeHandler(NULL)
1096 , _resizeHandlerData(NULL)
1097 , _errorOnPublishGap(errorOnPublishGap_)
1104 virtual amps_uint64_t store(
const Message& message_) = 0;
1112 virtual void discardUpTo(amps_uint64_t index_) = 0;
1127 virtual bool replaySingle(
StoreReplayer& replayer_, amps_uint64_t index_) = 0;
1133 virtual size_t unpersistedCount()
const = 0;
1145 virtual void flush(
long timeout_) = 0;
1151 return AMPS_UNSET_INDEX;
1158 return AMPS_UNSET_SEQUENCE;
1164 virtual amps_uint64_t getLowestUnpersisted()
const = 0;
1169 virtual amps_uint64_t getLastPersisted() = 0;
1183 _resizeHandler = handler_;
1184 _resizeHandlerData = userData_;
1189 return _resizeHandler;
1192 bool callResizeHandler(
size_t newSize_);
// Stores the given value in the _errorOnPublishGap member. Virtual, so an
// implementation may override it.
1194 inline virtual void setErrorOnPublishGap(
bool errorOnPublishGap_)
1196 _errorOnPublishGap = errorOnPublishGap_;
// Returns the current value of the _errorOnPublishGap member.
1199 inline virtual bool getErrorOnPublishGap()
const 1201 return _errorOnPublishGap;
1206 void* _resizeHandlerData;
1207 bool _errorOnPublishGap;
1214 RefHandle<StoreImpl> _body;
1218 Store(
const Store& rhs) : _body(rhs._body) {;}
1230 return _body.get().store(message_);
1241 _body.get().discardUpTo(index_);
1250 _body.get().replay(replayer_);
1262 return _body.get().replaySingle(replayer_, index_);
1271 return _body.get().unpersistedCount();
1279 return _body.isValid();
1292 return _body.get().flush(timeout_);
1300 return _body.get().getLowestUnpersisted();
1308 return _body.get().getLastPersisted();
1323 _body.get().setResizeHandler(handler_, userData_);
1328 return _body.get().getResizeHandler();
1337 _body.get().setErrorOnPublishGap(errorOnPublishGap_);
1346 return _body.get().getErrorOnPublishGap();
1354 if (_body.isValid())
1356 return &_body.get();
1380 virtual void failedWrite(
const Message& message_,
1381 const char* reason_,
size_t reasonLength_) = 0;
// Invokes the registered resize handler with a Store handle constructed from
// this implementation, the requested size, and the user data pointer that was
// stored with the handler; the handler's bool result is returned.
// NOTE(review): lines between the signature and this return are missing from
// the excerpt -- presumably they guard against a NULL _resizeHandler; confirm.
1385 inline bool StoreImpl::callResizeHandler(
size_t newSize_)
1389 return _resizeHandler(
Store(
this), newSize_, _resizeHandlerData);
1403 long* timeoutp = (
long*)data_;
1411 store_.
flush(*timeoutp);
1414 catch (
const TimedOutException&)
1416 catch (
const TimedOutException& e)
1443 unsigned requestedAckTypes_,
1444 const AMPSException& exception_) = 0;
1462 unsigned requestedAckTypes_) = 0;
1469 virtual void clear() = 0;
1473 virtual void resubscribe(Client& client_) = 0;
1480 _failedResubscribeHandler = handler_;
1483 std::shared_ptr<FailedResubscribeHandler> _failedResubscribeHandler;
1494 typedef enum { Disconnected = 0,
1498 PublishReplayed = 8,
1499 HeartbeatInitiated = 16,
1513 virtual void connectionStateChanged(
State newState_) = 0;
1518 class MessageStreamImpl;
1521 typedef void(*DeferredExecutionFunc)(
void*);
1523 class ClientImpl :
public RefBody
1529 AMPS_SOCKET _socket;
1535 socklen_t _valueLen;
1539 : _socket(AMPS_INVALID_SOCKET), _noDelay(0), _valueLen(
sizeof(
int))
1541 _valuePtr = (
char*)&_noDelay;
1543 if (_socket != AMPS_INVALID_SOCKET)
1545 getsockopt(_socket, IPPROTO_TCP, TCP_NODELAY, _valuePtr, &_valueLen);
1549 setsockopt(_socket, IPPROTO_TCP, TCP_NODELAY, _valuePtr, _valueLen);
1553 _socket = AMPS_INVALID_SOCKET;
1560 if (_socket != AMPS_INVALID_SOCKET)
1563 setsockopt(_socket, IPPROTO_TCP, TCP_NODELAY, _valuePtr, _valueLen);
1568 friend class Client;
1571 DisconnectHandler _disconnectHandler;
1572 enum GlobalCommandTypeHandlers :
size_t 1582 DuplicateMessage = 8,
1585 std::vector<MessageHandler> _globalCommandTypeHandlers;
1586 Message _message, _readMessage, _publishMessage, _deltaMessage, _beatMessage;
1588 MessageRouter::RouteCache _routeCache;
1589 mutable Mutex _lock;
1590 std::string _name, _nameHash, _lastUri, _logonCorrelationData;
1591 amps_uint64_t _nameHashValue;
1593 Store _publishStore;
1594 bool _isRetryOnDisconnect;
1595 amps_unique_ptr<FailedWriteHandler> _failedWriteHandler;
1596 #if __cplusplus >= 201100L || _MSC_VER >= 1900 1597 std::atomic<amps_uint64_t> _lastSentHaSequenceNumber;
1599 volatile amps_uint64_t _lastSentHaSequenceNumber;
1601 AMPS_ATOMIC_TYPE_8 _logonInProgress;
1602 AMPS_ATOMIC_TYPE_8 _badTimeToHASubscribe;
1603 VersionInfo _serverVersion;
1604 Timer _heartbeatTimer;
1605 amps_unique_ptr<MessageStream> _pEmptyMessageStream;
1608 int _queueAckTimeout;
1609 bool _isAutoAckEnabled;
1610 unsigned _ackBatchSize;
1611 unsigned _queuedAckCount;
1612 unsigned _defaultMaxDepth;
1613 struct QueueBookmarks
1615 QueueBookmarks(
const std::string& topic_)
1622 amps_uint64_t _oldestTime;
1623 unsigned _bookmarkCount;
1625 typedef amps_uint64_t topic_hash;
1626 typedef std::map<topic_hash, QueueBookmarks> TopicHashMap;
1627 TopicHashMap _topicHashMap;
1631 ClientImpl* _client;
1636 ClientStoreReplayer()
1637 : _client(NULL), _version(0), _res(
AMPS_E_OK)
1640 ClientStoreReplayer(ClientImpl* client_)
1641 : _client(client_), _version(0), _res(
AMPS_E_OK)
1644 void setClient(ClientImpl* client_)
1649 void execute(
Message& message_)
1653 throw CommandException(
"Can't replay without a client.");
1657 if (index > _client->_lastSentHaSequenceNumber)
1659 _client->_lastSentHaSequenceNumber = index;
1667 (!_client->_logonInProgress ||
1671 message_.getMessage(),
1675 throw DisconnectedException(
"AMPS Server disconnected during replay");
1681 ClientStoreReplayer _replayer;
1685 ClientImpl* _parent;
1686 const char* _reason;
1687 size_t _reasonLength;
1688 size_t _replayCount;
1690 FailedWriteStoreReplayer(ClientImpl* parent,
const char* reason_,
size_t reasonLength_)
1693 _reasonLength(reasonLength_),
1696 void execute(
Message& message_)
1698 if (_parent->_failedWriteHandler)
1701 _parent->_failedWriteHandler->failedWrite(message_,
1702 _reason, _reasonLength);
1705 size_t replayCount(
void)
const 1707 return _replayCount;
1711 struct AckResponseImpl :
public RefBody
1713 std::string username, password, reason, status, bookmark, options;
1714 amps_uint64_t sequenceNo;
1715 amps_uint64_t nameHashValue;
1716 VersionInfo serverVersion;
1717 #if __cplusplus >= 201100L || _MSC_VER >= 1900 1718 std::atomic<bool> responded;
1719 std::atomic<bool> abandoned;
1721 volatile bool responded;
1722 volatile bool abandoned;
1724 unsigned connectionVersion;
1727 sequenceNo((amps_uint64_t)0),
1731 connectionVersion(0)
1738 RefHandle<AckResponseImpl> _body;
1740 AckResponse() : _body(NULL) {;}
1741 AckResponse(
const AckResponse& rhs) : _body(rhs._body) {;}
1742 static AckResponse create()
1745 r._body =
new AckResponseImpl();
1749 const std::string& username()
1751 return _body.get().username;
1753 void setUsername(
const char* data_,
size_t len_)
1757 _body.get().username.assign(data_, len_);
1761 _body.get().username.clear();
1764 const std::string& password()
1766 return _body.get().password;
1768 void setPassword(
const char* data_,
size_t len_)
1772 _body.get().password.assign(data_, len_);
1776 _body.get().password.clear();
1779 const std::string& reason()
1781 return _body.get().reason;
1783 void setReason(
const char* data_,
size_t len_)
1787 _body.get().reason.assign(data_, len_);
1791 _body.get().reason.clear();
1794 const std::string& status()
1796 return _body.get().status;
1798 void setStatus(
const char* data_,
size_t len_)
1802 _body.get().status.assign(data_, len_);
1806 _body.get().status.clear();
1809 const std::string& bookmark()
1811 return _body.get().bookmark;
// Records the bookmark field on the shared ack body. For a non-empty field
// the raw bytes are copied into `bookmark` and Field::parseBookmark fills
// nameHashValue and sequenceNo from it; the remaining statements clear the
// bookmark and zero both numbers (presumably the else branch for an empty
// field -- the branch structure itself is not visible in this excerpt).
1813 void setBookmark(
const Field& bookmark_)
1815 if (!bookmark_.
empty())
1817 _body.get().bookmark.assign(bookmark_.
data(), bookmark_.
len());
1818 Field::parseBookmark(bookmark_, _body.get().nameHashValue,
1819 _body.get().sequenceNo);
// Reset path: no bookmark, so derived values are zeroed as well.
1823 _body.get().bookmark.clear();
1824 _body.get().sequenceNo = (amps_uint64_t)0;
1825 _body.get().nameHashValue = (amps_uint64_t)0;
1828 amps_uint64_t sequenceNo()
const 1830 return _body.get().sequenceNo;
1832 amps_uint64_t nameHashValue()
const 1834 return _body.get().nameHashValue;
// Parses len_ characters at data_ as an unsigned decimal number and stores
// the result in the ack body's sequenceNo.
// NOTE(review): the visible loop subtracts '0' from every character without
// checking that it is a digit, and does not detect overflow; lines missing
// from this excerpt may add a guard (e.g. for a null data_) -- confirm.
1836 void setSequenceNo(
const char* data_,
size_t len_)
1838 amps_uint64_t result = (amps_uint64_t)0;
1841 for (
size_t i = 0; i < len_; ++i)
// Shift the accumulated value one decimal place, then add the next digit.
1843 result *= (amps_uint64_t)10;
1844 result += (amps_uint64_t)(data_[i] -
'0');
1847 _body.get().sequenceNo = result;
1849 VersionInfo serverVersion()
const 1851 return _body.get().serverVersion;
1853 void setServerVersion(
const char* data_,
size_t len_)
1857 _body.get().serverVersion.setVersion(std::string(data_, len_));
1862 return _body.get().responded;
1866 _body.get().responded =
true;
1870 return _body.get().abandoned;
1874 if (_body.isValid())
1876 _body.get().abandoned =
true;
1880 void setConnectionVersion(
unsigned connectionVersion)
1882 _body.get().connectionVersion = connectionVersion;
1885 unsigned getConnectionVersion()
1887 return _body.get().connectionVersion;
1889 void setOptions(
const char* data_,
size_t len_)
1893 _body.get().options.assign(data_, len_);
1897 _body.get().options.clear();
1901 const std::string& options()
1903 return _body.get().options;
1906 AckResponse& operator=(
const AckResponse& rhs)
1914 typedef std::map<std::string, AckResponse> AckMap;
1917 DefaultExceptionListener _defaultExceptionListener;
1920 struct DeferredExecutionRequest
1922 DeferredExecutionRequest(DeferredExecutionFunc func_,
1925 _userData(userData_)
1928 DeferredExecutionFunc _func;
1932 std::shared_ptr<const ExceptionListener> _pExceptionListener;
1933 amps_unique_ptr<SubscriptionManager> _subscriptionManager;
1934 volatile bool _connected;
1935 std::string _username;
1936 typedef std::set<ConnectionStateListener*> ConnectionStateListeners;
1937 ConnectionStateListeners _connectionStateListeners;
1938 typedef std::vector<DeferredExecutionRequest> DeferredExecutionList;
1939 Mutex _deferredExecutionLock;
1940 DeferredExecutionList _deferredExecutionList;
1941 unsigned _heartbeatInterval;
1942 unsigned _readTimeout;
1950 if (!_connected && newState_ > ConnectionStateListener::Connected)
1954 for (ConnectionStateListeners::iterator it = _connectionStateListeners.begin(); it != _connectionStateListeners.end(); ++it)
1956 AMPS_CALL_EXCEPTION_WRAPPER(
1957 (*it)->connectionStateChanged(newState_));
1960 unsigned processedAck(
Message& message);
1961 unsigned persistedAck(
Message& meesage);
1962 void lastChance(
Message& message);
1963 void checkAndSendHeartbeat(
bool force =
false);
1964 virtual ConnectionInfo getConnectionInfo()
const;
1966 ClientImplMessageHandler(
amps_handle message,
void* userData);
1968 ClientImplPreDisconnectHandler(
amps_handle client,
unsigned failedConnectionVersion,
void* userData);
1970 ClientImplDisconnectHandler(
amps_handle client,
void* userData);
// Removes the subscription identified by `id`: the id is copied into subId,
// its route is removed from _routes, the subscription manager (when one is
// set) is told to unsubscribe, and _message is then sent without retry.
// NOTE(review): the declaration of subId and the code that fills _message are
// not visible in this excerpt.
1972 void unsubscribeInternal(
const std::string&
id)
1980 subId.assign(
id.data(),
id.length());
1981 _routes.removeRoute(subId);
1983 if (_subscriptionManager)
// An Unlock<Mutex> is constructed on _lock before calling into the manager --
// presumably releasing _lock for the duration of the call; confirm.
1986 Unlock<Mutex> unlock(_lock);
1987 _subscriptionManager->unsubscribe(subId);
1993 _sendWithoutRetry(_message);
// Queues a no-op deferred execution after the send.
// NOTE(review): the purpose of the no-op is not evident from this excerpt.
1994 deferredExecution(&s_noOpFn, NULL);
// Convenience overload: forwards to the four-argument syncAckProcessing with
// an HA sequence number of 0, passing timeout_, message_ and isHASubscribe_
// through unchanged, and returns its AckResponse.
1997 AckResponse syncAckProcessing(
long timeout_,
Message& message_,
1998 bool isHASubscribe_)
2000 return syncAckProcessing(timeout_, message_,
2001 (amps_uint64_t)0, isHASubscribe_);
// Sends message_ and waits for the matching ack. The wait loop runs until the
// ack has responded, the ack has been abandoned, the wait on _lock times out,
// or _connected becomes false.
//   timeout_        value passed to _lock.wait and to the timer macros
//   message_        message to send; also used to raise the failure exception
//   haSeq           HA sequence number forwarded to _send (0 when not used)
//   isHASubscribe_  forwarded to _send
// On a responded ack whose status is not "failure", client state is updated
// from the ack (last sent HA sequence, name hash, server version, ack batch
// size). A "failure" status leads to message_.throwFor with the ack's reason.
// Without a response, TimedOutException or DisconnectedException is thrown
// depending on whether the ack was abandoned.
// NOTE(review): several lines of the original body are missing from this
// excerpt, so the exact nesting of the statements below is inferred.
2004 AckResponse syncAckProcessing(
long timeout_,
Message& message_,
2005 amps_uint64_t haSeq = (amps_uint64_t)0,
2006 bool isHASubscribe_ =
false)
2009 AckResponse ack = AckResponse::create();
2012 Lock<Mutex> guard(_ackMapLock);
// The value returned by _send is recorded as the ack's connection version;
// a version of 0 is checked right before the disconnect throw below.
2015 ack.setConnectionVersion((
unsigned)_send(message_, haSeq, isHASubscribe_));
2016 if (ack.getConnectionVersion() == 0)
2019 throw DisconnectedException(
"Connection closed while waiting for response.");
2021 bool timedOut =
false;
2022 AMPS_START_TIMER(timeout_)
2023 while (!timedOut && !ack.responded() && !ack.abandoned() && _connected)
2027 timedOut = !_lock.wait(timeout_);
2031 AMPS_RESET_TIMER(timedOut, timeout_);
// The waiting function is invoked after an Unlock<Mutex> is built on _lock.
2038 Unlock<Mutex> unlck(_lock);
2039 amps_invoke_waiting_function();
2042 if (ack.responded())
2044 if (ack.status() !=
"failure")
// Never move the last-sent HA sequence number backwards.
2048 amps_uint64_t ackSequence = ack.sequenceNo();
2049 if (_lastSentHaSequenceNumber < ackSequence)
2051 _lastSentHaSequenceNumber = ackSequence;
// The name hash is the part of the bookmark before the first '|'.
2064 _nameHash = ack.bookmark().substr(0, ack.bookmark().find(
'|'));
2065 _nameHashValue = ack.nameHashValue();
2066 _serverVersion = ack.serverVersion();
2067 if (_bookmarkStore.isValid())
2074 const std::string& options = ack.options();
// Locate the "max_backlog=" key as a substring. std::string::find is needed:
// find_first_of would return the position of the first character that is ANY
// of the characters in the key, so the 12-character skip below could land on
// unrelated text or beyond the end of the options buffer.
2075 size_t index = options.find(
"max_backlog=");
2076 if (index != std::string::npos)
// 12 is the length of "max_backlog="; c points at the first value character.
2079 const char* c = options.c_str() + index + 12;
// Accumulate decimal digits up to the next ',' or the end of the string
// (48 is the character code of '0').
2080 while (*c && *c !=
',')
2082 data = (data * 10) + (
unsigned)(*c++ -48);
// Only ever lower the ack batch size to the parsed value.
2084 if (_ackBatchSize > data)
2086 _ackBatchSize = data;
// Failure handling: 12 is the length of the "not entitled" reason string.
2092 const size_t NotEntitled = 12;
2093 std::string ackReason = ack.reason();
2094 if (ackReason.length() == 0)
2098 if (ackReason.length() == NotEntitled &&
2099 ackReason[0] ==
'n' &&
2104 message_.throwFor(_client, ackReason);
// No response: distinguish a timeout from an abandoned (disconnected) ack.
2108 if (!ack.abandoned())
2110 throw TimedOutException(
"timed out waiting for operation.");
2114 throw DisconnectedException(
"Connection closed while waiting for response.");
2128 AMPS_CALL_EXCEPTION_WRAPPER(ClientImpl::disconnect());
2129 _pEmptyMessageStream.reset(NULL);
2136 ClientImpl(
const std::string& clientName)
2137 : _client(NULL), _name(clientName)
2138 , _isRetryOnDisconnect(
true)
2139 , _lastSentHaSequenceNumber((amps_uint64_t)0), _logonInProgress(0)
2140 , _badTimeToHASubscribe(0), _serverVersion()
2141 , _queueAckTimeout(AMPS_DEFAULT_QUEUE_ACK_TIMEOUT)
2142 , _isAutoAckEnabled(
false)
2144 , _queuedAckCount(0)
2145 , _defaultMaxDepth(0)
2147 , _heartbeatInterval(0)
2150 _replayer.setClient(
this);
2153 (amps_handler)ClientImpl::ClientImplMessageHandler,
2156 (amps_predisconnect_handler)ClientImpl::ClientImplPreDisconnectHandler,
2159 (amps_handler)ClientImpl::ClientImplDisconnectHandler,
2161 _exceptionListener = &_defaultExceptionListener;
2162 for (
size_t i = 0; i < GlobalCommandTypeHandlers::COUNT; ++i)
2164 #ifdef AMPS_USE_EMPLACE 2172 virtual ~ClientImpl()
2177 const std::string& getName()
const 2182 const std::string& getNameHash()
const 2187 const amps_uint64_t getNameHashValue()
const 2189 return _nameHashValue;
2192 void setName(
const std::string& name)
2199 AMPSException::throwFor(_client, result);
2204 const std::string& getLogonCorrelationData()
const 2206 return _logonCorrelationData;
2209 void setLogonCorrelationData(
const std::string& logonCorrelationData_)
2211 _logonCorrelationData = logonCorrelationData_;
2214 size_t getServerVersion()
const 2216 return _serverVersion.getOldStyleVersion();
2219 VersionInfo getServerVersionInfo()
const 2221 return _serverVersion;
2224 const std::string& getURI()
const 2229 virtual void connect(
const std::string& uri)
2231 Lock<Mutex> l(_lock);
2235 virtual void _connect(
const std::string& uri)
2241 AMPSException::throwFor(_client, result);
2248 _readMessage.setClientImpl(
this);
2249 if (_queueAckTimeout)
2254 AMPSException::throwFor(_client, result);
2258 broadcastConnectionStateChanged(ConnectionStateListener::Connected);
2261 void setDisconnected()
2264 Lock<Mutex> l(_lock);
2267 AMPS_CALL_EXCEPTION_WRAPPER(broadcastConnectionStateChanged(ConnectionStateListener::Disconnected));
2270 _heartbeatTimer.setTimeout(0.0);
2277 virtual void disconnect()
2279 AMPS_CALL_EXCEPTION_WRAPPER(flushAcks());
2281 AMPS_CALL_EXCEPTION_WRAPPER(processDeferredExecutions());
2282 Lock<Mutex> l(_lock);
2283 broadcastConnectionStateChanged(ConnectionStateListener::Shutdown);
// Marks as abandoned every pending ack whose connection version is less than
// or equal to failedVersion, collecting the keys of those acks in a worklist.
// _ackMapLock is locked via Lock<Mutex> before _ackMap is walked.
// NOTE(review): the body of the second loop over the worklist is not visible
// in this excerpt -- presumably it erases those keys from _ackMap; confirm.
2286 void clearAcks(
unsigned failedVersion)
2289 Lock<Mutex> guard(_ackMapLock);
2292 std::vector<std::string> worklist;
2293 for (AckMap::iterator i = _ackMap.begin(), e = _ackMap.end(); i != e; ++i)
2295 if (i->second.getConnectionVersion() <= failedVersion)
2297 i->second.setAbandoned();
2298 worklist.push_back(i->first);
// Second pass works from the collected keys rather than the live iterator.
2302 for (std::vector<std::string>::iterator j = worklist.begin(), e = worklist.end(); j != e; ++j)
// send: constructs a Lock<Mutex> on _lock and returns the result of
// _send(message).
2311 int send(
const Message& message)
2313 Lock<Mutex> l(_lock);
2314 return _send(message);
// sendWithoutRetry: constructs a Lock<Mutex> on _lock, then delegates to
// _sendWithoutRetry. A DisconnectedException is thrown in connection with the
// _logonInProgress check (the lines between the test and the throw are
// missing from this excerpt, so the exact condition should be confirmed).
2317 void sendWithoutRetry(
const Message& message_)
2319 Lock<Mutex> l(_lock);
2322 if (_logonInProgress)
2324 throw DisconnectedException(
"The client has been disconnected.");
2326 _sendWithoutRetry(message_);
// _sendWithoutRetry: takes no lock in the visible code; a failure `result` is
// turned into an exception via AMPSException::throwFor.
// NOTE(review): the statement that produces `result` is not visible here.
2329 void _sendWithoutRetry(
const Message& message_)
2334 AMPSException::throwFor(_client, result);
2338 int _send(
const Message& message, amps_uint64_t haSeq = (amps_uint64_t)0,
2339 bool isHASubscribe_ =
false)
2346 Message localMessage = message;
2347 unsigned version = 0;
2351 if (haSeq && _logonInProgress)
2355 if (!_isRetryOnDisconnect)
2359 if (!_lock.wait(1000))
2361 amps_invoke_waiting_function();
2366 if ((haSeq && haSeq <= _lastSentHaSequenceNumber) ||
2367 (isHASubscribe_ && _badTimeToHASubscribe))
2369 return (
int)version;
2373 if (haSeq > _lastSentHaSequenceNumber)
2375 while (haSeq > _lastSentHaSequenceNumber + 1)
2381 _lastSentHaSequenceNumber + 1))
2387 version = _replayer._version;
2390 catch (
const DisconnectedException&)
2392 catch (
const DisconnectedException& e)
2395 result = _replayer._res;
2400 localMessage.getMessage(),
2402 ++_lastSentHaSequenceNumber;
2406 if (_logonInProgress && localMessage.
getCommand().
data()[0] !=
'l')
2408 while (_logonInProgress)
2410 if (!_lock.wait(1000))
2412 amps_invoke_waiting_function();
2417 localMessage.getMessage(),
2422 if (!isHASubscribe_ && !haSeq &&
2423 localMessage.getMessage() == message.getMessage())
2427 if (_isRetryOnDisconnect)
2429 Unlock<Mutex> u(_lock);
2434 if ((isHASubscribe_ || haSeq) &&
2437 return (
int)version;
2444 AMPSException::throwFor(_client, result);
2450 amps_invoke_waiting_function();
2456 AMPSException::throwFor(_client, result);
2458 return (
int)version;
2461 void addMessageHandler(
const Field& commandId_,
2463 unsigned requestedAcks_, Message::Command::Type commandType_)
2465 Lock<Mutex> lock(_lock);
2466 _routes.addRoute(commandId_, messageHandler_, requestedAcks_,
2470 bool removeMessageHandler(
const Field& commandId_)
2472 Lock<Mutex> lock(_lock);
2473 return _routes.removeRoute(commandId_);
2481 bool isSubscribeOnly =
false;
2482 bool replace =
false;
2484 unsigned systemAddedAcks = Message::AckType::None;
2487 switch (commandType)
2489 case Message::Command::Subscribe:
2490 case Message::Command::DeltaSubscribe:
2491 replace = message_.
getOptions().operator std::string().find(AMPS_OPTIONS_REPLACE, 0, strlen(AMPS_OPTIONS_REPLACE) - 1) != std::string::npos;
2492 isSubscribeOnly =
true;
2494 case Message::Command::SOWAndSubscribe:
2495 case Message::Command::SOWAndDeltaSubscribe:
2502 while (!replace &&
id != subId && _routes.hasRoute(
id))
2514 systemAddedAcks |= Message::AckType::Persisted;
2517 case Message::Command::SOW:
2524 while (!replace &&
id != subId && _routes.hasRoute(
id))
2535 if (!isSubscribeOnly)
2544 while (!replace && qid != subId && qid !=
id 2545 && _routes.hasRoute(qid))
2551 systemAddedAcks |= Message::AckType::Processed;
2554 int routesAdded = 0;
2555 Lock<Mutex> l(_lock);
2556 if (!subId.
empty() && messageHandler_.isValid())
2558 if (!_routes.hasRoute(subId))
2564 _routes.addRoute(subId, messageHandler_, requestedAcks,
2565 systemAddedAcks, commandType);
2567 if (!isSubscribeOnly && !qid.
empty()
2568 && messageHandler_.isValid() && qid != subId)
2570 if (routesAdded == 0)
2572 _routes.addRoute(qid, messageHandler_,
2573 requestedAcks, systemAddedAcks, commandType);
2579 Unlock<Mutex> u(_lock);
2580 data = amps_invoke_copy_route_function(
2581 messageHandler_.userData());
2585 _routes.addRoute(qid, messageHandler_, requestedAcks,
2586 systemAddedAcks, commandType);
2590 _routes.addRoute(qid,
2593 requestedAcks, systemAddedAcks, commandType);
2598 if (!
id.empty() && messageHandler_.isValid()
2599 && requestedAcks & ~
Message::AckType::Persisted
2600 &&
id != subId &&
id != qid)
2602 if (routesAdded == 0)
2604 _routes.addRoute(
id, messageHandler_, requestedAcks,
2605 systemAddedAcks, commandType);
2611 Unlock<Mutex> u(_lock);
2612 data = amps_invoke_copy_route_function(
2613 messageHandler_.userData());
2617 _routes.addRoute(
id, messageHandler_, requestedAcks,
2618 systemAddedAcks, commandType);
2622 _routes.addRoute(
id,
2626 systemAddedAcks, commandType);
2635 syncAckProcessing(timeout_, message_, 0,
false);
2642 _routes.removeRoute(
id);
2649 case Message::Command::Unsubscribe:
2650 case Message::Command::Heartbeat:
2651 case Message::Command::Logon:
2652 case Message::Command::StartTimer:
2653 case Message::Command::StopTimer:
2654 case Message::Command::SOWDelete:
2656 Lock<Mutex> l(_lock);
2665 if (messageHandler_.isValid())
2667 _routes.addRoute(
id, messageHandler_, requestedAcks,
2668 Message::AckType::None, commandType);
2674 case Message::Command::DeltaPublish:
2675 case Message::Command::Publish:
2678 Lock<Mutex> l(_lock);
2681 if (ackType != Message::AckType::None
2689 if (messageHandler_.isValid())
2691 _routes.addRoute(
id, messageHandler_, requestedAcks,
2692 Message::AckType::None, commandType);
2698 syncAckProcessing(timeout_, message_, 0,
false);
2707 case Message::Command::GroupBegin:
2708 case Message::Command::GroupEnd:
2709 case Message::Command::OOF:
2710 case Message::Command::Ack:
2711 case Message::Command::Unknown:
2713 throw CommandException(
"Command type " + message_.
getCommand() +
" can not be sent directly to AMPS");
2719 void setDisconnectHandler(
const DisconnectHandler& disconnectHandler)
2721 Lock<Mutex> l(_lock);
2722 _disconnectHandler = disconnectHandler;
2725 void setGlobalCommandTypeMessageHandler(
const std::string& command_,
const MessageHandler& handler_)
2727 switch (command_[0])
2729 #if 0 // Not currently implemented to avoid an extra branch in delivery 2731 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::Publish] = handler_;
2734 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::SOW] = handler_;
2738 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::Heartbeat] = handler_;
2740 #if 0 // Not currently implemented to avoid an extra branch in delivery 2742 if (command_[6] ==
'b')
2744 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::GroupBegin] = handler_;
2746 else if (command_[6] ==
'e')
2748 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::GroupEnd] = handler_;
2752 std::ostringstream os;
2753 os <<
"Invalid command '" << command_ <<
"' passed to setGlobalCommandTypeHandler";
2754 throw CommandException(os.str());
2758 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::OOF] = handler_;
2762 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::Ack] = handler_;
2766 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::LastChance] = handler_;
2770 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::DuplicateMessage] = handler_;
2773 std::ostringstream os;
2774 os <<
"Invalid command '" << command_ <<
"' passed to setGlobalCommandTypeHandler";
2775 throw CommandException(os.str());
2780 void setGlobalCommandTypeMessageHandler(
const Message::Command::Type command_,
const MessageHandler& handler_)
2784 #if 0 // Not currently implemented to avoid an extra branch in delivery 2785 case Message::Command::Publish:
2786 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::Publish] = handler_;
2788 case Message::Command::SOW:
2789 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::SOW] = handler_;
2792 case Message::Command::Heartbeat:
2793 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::Heartbeat] = handler_;
2795 #if 0 // Not currently implemented to avoid an extra branch in delivery 2796 case Message::Command::GroupBegin:
2797 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::GroupBegin] = handler_;
2799 case Message::Command::GroupEnd:
2800 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::GroupEnd] = handler_;
2802 case Message::Command::OOF:
2803 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::OOF] = handler_;
2806 case Message::Command::Ack:
2807 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::Ack] = handler_;
2811 unsigned command = command_;
2818 AMPS_snprintf(errBuf,
sizeof(errBuf),
2819 "Invalid command '%.*s' passed to setGlobalCommandTypeHandler",
2820 CommandConstants<0>::Lengths[bits],
2821 CommandConstants<0>::Values[bits]);
2822 throw CommandException(errBuf);
// Installs handler_ in the slot of _globalCommandTypeHandlers selected by
// handlerType_.
// NOTE(review): the vector is indexed with operator[] and no bounds check or
// lock is visible in this excerpt -- the slot must already exist; confirm
// that the vector is sized to GlobalCommandTypeHandlers::COUNT beforehand.
2827 void setGlobalCommandTypeMessageHandler(
const GlobalCommandTypeHandlers handlerType_,
const MessageHandler& handler_)
2829 _globalCommandTypeHandlers[handlerType_] = handler_;
2834 Lock<Mutex> l(_lock);
2835 _failedWriteHandler.reset(handler_);
// Replaces the client's publish store with publishStore_ after constructing
// a Lock<Mutex> on _lock. AlreadyConnectedException is thrown to reject the
// change on a connected client.
// NOTE(review): the condition guarding the throw is missing from this
// excerpt -- presumably a check of the connected state; confirm.
2838 void setPublishStore(
const Store& publishStore_)
2840 Lock<Mutex> l(_lock);
2843 throw AlreadyConnectedException(
"Setting a publish store on a connected client is undefined behavior");
2845 _publishStore = publishStore_;
2850 Lock<Mutex> l(_lock);
2853 throw AlreadyConnectedException(
"Setting a bookmark store on a connected client is undefined behavior");
2855 _bookmarkStore = bookmarkStore_;
2860 Lock<Mutex> l(_lock);
2861 _subscriptionManager.reset(subscriptionManager_);
2869 DisconnectHandler getDisconnectHandler()
const 2871 return _disconnectHandler;
2876 return _globalCommandTypeHandlers[GlobalCommandTypeHandlers::DuplicateMessage];
2884 Store getPublishStore()
const 2886 return _publishStore;
2891 return _bookmarkStore;
2894 amps_uint64_t publish(
const char* topic_,
size_t topicLen_,
const char* data_,
size_t dataLen_)
2898 Lock<Mutex> l(_lock);
2900 _publishMessage.assignData(data_, dataLen_);
2901 _send(_publishMessage);
2906 publishStoreMessage.reset();
2908 return _publish(topic_, topicLen_, data_, dataLen_);
// Publishes data_ to topic_ with an expiration value attached to the message.
// As in the overload without expiration, two paths are visible: a direct send
// of _publishMessage under _lock, and a path that resets publishStoreMessage
// and delegates to _publish().
// NOTE(review): the branch condition and several statements (source lines
// 2914-2916, 2918, 2924-2928, 2932-2933) are not shown in this listing.
2912 amps_uint64_t publish(
const char* topic_,
size_t topicLen_,
const char* data_,
2913 size_t dataLen_,
unsigned long expiration_)
2917 Lock<Mutex> l(_lock);
2919 _publishMessage.assignData(data_, dataLen_);
// expiration_ is rendered as decimal text; the digits occupy
// exprBuf[pos, AMPS_NUMBER_BUFFER_LEN), which is exactly the span handed
// to assignExpiration below.
2920 char exprBuf[AMPS_NUMBER_BUFFER_LEN];
2921 size_t pos = convertToCharArray(exprBuf, expiration_);
2922 _publishMessage.
assignExpiration(exprBuf + pos, AMPS_NUMBER_BUFFER_LEN - pos);
2923 _send(_publishMessage);
// Second path: same text conversion, applied to publishStoreMessage.
2929 publishStoreMessage.reset();
2930 char exprBuf[AMPS_NUMBER_BUFFER_LEN];
2931 size_t exprPos = convertToCharArray(exprBuf, expiration_);
// Tail of a call whose opening line is not shown in this listing.
2934 AMPS_NUMBER_BUFFER_LEN - exprPos);
2935 return _publish(topic_, topicLen_, data_, dataLen_);
// Interior of FlushAckHandler, the helper publishFlush() uses to learn that a
// flush has been acknowledged or that the connection went away.
// NOTE(review): the class header, the _cmdId declaration and several method
// headers/bodies are not shown in this listing.
// The client this handler registers itself with.
2942 ClientImpl* _pClient;
// The two status flags are std::atomic<bool> when the compiler check below
// passes and plain volatile bool otherwise.
2944 #if __cplusplus >= 201100L || _MSC_VER >= 1900 2945 std::atomic<bool> _acked;
2946 std::atomic<bool> _disconnected;
2948 volatile bool _acked;
2949 volatile bool _disconnected;
// Constructor: both flags start false and the handler registers itself as a
// connection state listener on pClient_.
2952 FlushAckHandler(ClientImpl* pClient_)
2953 : _pClient(pClient_), _cmdId(), _acked(
false), _disconnected(
false)
2955 pClient_->addConnectionStateListener(
this);
// Teardown (header not shown, presumably the destructor): unregisters the
// listener and removes the message handler registered under _cmdId.
2959 _pClient->removeConnectionStateListener(
this);
2960 _pClient->removeMessageHandler(_cmdId);
// Body not shown in this listing.
2963 void setCommandId(
const Field& cmdId_)
// Connection state callback: any state at or below Shutdown marks this
// handler as disconnected.
2971 void connectionStateChanged(
State state_)
2973 if (state_ <= Shutdown)
2975 _disconnected =
true;
// Completion test: finished once either flag is set (header not shown;
// publishFlush() calls this as done()).
2984 return _acked || _disconnected;
// Waits until previously published messages have been flushed.
// timeout_  : wait limit; it is passed to _publishStore.flush(), used to build
//             a Timer, and (multiplied by 1000) passed to AMPS_USLEEP.
//             NOTE(review): units look like milliseconds -- confirm.
// ackType_  : must be Message::AckType::Processed or ::Persisted, otherwise
//             CommandException is thrown.
// Throws DisconnectedException or TimedOutException on the visible failure
// paths below.
// NOTE(review): many lines of this function (branch conditions, message
// setup, try blocks) are missing from this listing; comments describe only
// what is visible.
2988 void publishFlush(
long timeout_,
unsigned ackType_)
2990 static const char* processed =
"processed";
2991 static const size_t processedLen = strlen(processed);
2992 static const char* persisted =
"persisted";
2993 static const size_t persistedLen = strlen(persisted);
2994 static const char* flush =
"flush";
2995 static const size_t flushLen = strlen(flush);
// Server version thresholds compared against _serverVersion below.
2996 static VersionInfo minPersisted(
"5.3.3.0");
2997 static VersionInfo minFlush(
"4");
2998 if (ackType_ != Message::AckType::Processed
2999 && ackType_ != Message::AckType::Persisted)
3001 throw CommandException(
"Flush can only be used with processed or persisted acks.");
// The handler registers itself as a connection state listener on this
// client in its constructor.
3003 FlushAckHandler flushHandler(
this);
// Servers at or above minFlush: build and send a flush command under _lock.
3004 if (_serverVersion >= minFlush)
3006 Lock<Mutex> l(_lock);
3009 throw DisconnectedException(
"Not connected trying to flush");
// A processed ack is used when the server predates minPersisted or when the
// caller asked for processed (the statements in each arm are not shown).
3014 if (_serverVersion < minPersisted
3015 || ackType_ == Message::AckType::Processed)
// flushHandler.invoke is bound as the callback; std::ref avoids copying the
// handler, so the callback acts on the local object above.
3025 std::bind(&FlushAckHandler::invoke,
3026 std::ref(flushHandler),
3027 std::placeholders::_1),
3029 NoDelay noDelay(_client);
3030 if (_send(_message) == -1)
3032 throw DisconnectedException(
"Disconnected trying to flush");
3039 _publishStore.
flush(timeout_);
3041 catch (
const AMPSException& ex)
3043 AMPS_UNHANDLED_EXCEPTION(ex);
// Servers older than minFlush: the visible statements only sleep.
3047 else if (_serverVersion < minFlush)
3051 AMPS_USLEEP(timeout_ * 1000);
3055 AMPS_USLEEP(1000 * 1000);
// Wait for the handler to finish, either bounded by a timer or unbounded.
// NOTE(review): the condition choosing between the two loops is not shown.
3061 Timer timer((
double)timeout_);
3063 while (!timer.check() && !flushHandler.done())
3066 amps_invoke_waiting_function();
3071 while (!flushHandler.done())
3074 amps_invoke_waiting_function();
// Still not done after waiting: report a timeout.
3078 if (!flushHandler.done())
3080 throw TimedOutException(
"Timed out waiting for flush");
// Done without an ack and with no valid publish store: report a disconnect.
3083 if (!flushHandler.acked() && !_publishStore.
isValid())
3085 throw DisconnectedException(
"Disconnected waiting for flush");
// Delta-publishes dataLength_ bytes at data_ to the topic topic_/topicLength_.
// Two paths are visible: under _lock the data is assigned to the reusable
// _deltaMessage and sent directly; otherwise publishStoreMessage is reset,
// marked as a DeltaPublish command and handed to _publish(), whose return
// value is returned.
// NOTE(review): the branch condition, topic assignment and the first path's
// return statement are not shown in this listing.
3089 amps_uint64_t deltaPublish(
const char* topic_,
size_t topicLength_,
3090 const char* data_,
size_t dataLength_)
3094 Lock<Mutex> l(_lock);
3096 _deltaMessage.assignData(data_, dataLength_);
3097 _send(_deltaMessage);
// Second path: _publish() sends publishStoreMessage, so the command type is
// set on it here.
3102 publishStoreMessage.reset();
3103 publishStoreMessage.
setCommandEnum(Message::Command::DeltaPublish);
3104 return _publish(topic_, topicLength_, data_, dataLength_);
// Delta-publishes data_ to topic_ with an expiration value attached.
// Mirrors the overload without expiration: a direct send of _deltaMessage
// under _lock, or a path that prepares publishStoreMessage as a DeltaPublish
// and delegates to _publish().
// NOTE(review): the branch condition and several statements are not shown in
// this listing.
3108 amps_uint64_t deltaPublish(
const char* topic_,
size_t topicLength_,
3109 const char* data_,
size_t dataLength_,
3110 unsigned long expiration_)
3114 Lock<Mutex> l(_lock);
3116 _deltaMessage.assignData(data_, dataLength_);
// expiration_ is rendered as decimal text; the digits occupy
// exprBuf[pos, AMPS_NUMBER_BUFFER_LEN), the span passed to assignExpiration.
3117 char exprBuf[AMPS_NUMBER_BUFFER_LEN];
3118 size_t pos = convertToCharArray(exprBuf, expiration_);
3119 _deltaMessage.
assignExpiration(exprBuf + pos, AMPS_NUMBER_BUFFER_LEN - pos);
3120 _send(_deltaMessage);
// Second path: same conversion, applied to publishStoreMessage.
3126 publishStoreMessage.reset();
3127 char exprBuf[AMPS_NUMBER_BUFFER_LEN];
3128 size_t exprPos = convertToCharArray(exprBuf, expiration_);
// Start of a chained call; its middle line (source line 3130) is not shown.
3129 publishStoreMessage.
setCommandEnum(Message::Command::DeltaPublish)
3131 AMPS_NUMBER_BUFFER_LEN - exprPos);
3132 return _publish(topic_, topicLength_, data_, dataLength_);
// Shared publish path used by the publish()/deltaPublish() overloads: fills
// publishStoreMessage with the topic and data, stores it in _publishStore,
// stamps the resulting sequence number on the message, sends it under _lock
// and returns the sequence number.
// Callers visible in this file reset publishStoreMessage (and set the command
// type where needed) before calling.
3136 amps_uint64_t _publish(
const char* topic_,
size_t topicLength_,
3137 const char* data_,
size_t dataLength_)
// Chained assignment; one link of the chain (source line 3140) is not shown.
3139 publishStoreMessage.
assignTopic(topic_, topicLength_)
3141 .assignData(data_, dataLength_);
// The store returns the sequence number assigned to this message.
3142 amps_uint64_t haSequenceNumber = _publishStore.
store(publishStoreMessage);
// Render the sequence number as decimal text; the digits occupy
// buf[pos, AMPS_NUMBER_BUFFER_LEN).
3143 char buf[AMPS_NUMBER_BUFFER_LEN];
3144 size_t pos = convertToCharArray(buf, haSequenceNumber);
3145 publishStoreMessage.
assignSequence(buf + pos, AMPS_NUMBER_BUFFER_LEN - pos);
// The lock is taken only for the send; storing happened above without it.
3147 Lock<Mutex> l(_lock);
3148 _send(publishStoreMessage, haSequenceNumber);
3150 return haSequenceNumber;
3153 virtual std::string logon(
long timeout_,
Authenticator& authenticator_,
3154 const char* options_ = NULL)
3156 Lock<Mutex> l(_lock);
3157 return _logon(timeout_, authenticator_, options_);
// Performs the logon exchange; logon() calls this while holding _lock.
// Visible steps: optional logon fields are filled in from the connection URI
// and _logonCorrelationData, the logon message is sent and its ack awaited,
// "retry" acks go back through the authenticator, listeners are told about
// LoggedOn, the publish store is replayed, and newCommandId is returned.
// NOTE(review): the message setup, the retry loop structure, try blocks and
// most branch bodies are missing from this listing.
3160 virtual std::string _logon(
long timeout_,
Authenticator& authenticator_,
3161 const char* options_ = NULL)
3168 #ifdef AMPS_CLIENT_VERSION_WITH_LANGUAGE 3170 strlen(AMPS_CLIENT_VERSION_WITH_LANGUAGE));
// Each test below guards a statement that is not shown in this listing.
3173 if (uri.user().size())
3177 if (uri.password().size())
3181 if (uri.protocol() ==
"amps" && uri.messageType().size())
3185 if (uri.isTrue(
"pretty"))
3191 if (!_logonCorrelationData.empty())
// Scoped helpers whose lifetimes cover the exchange below.
3202 AtomicFlagFlip pubFlip(&_logonInProgress);
3203 NoDelay noDelay(_client);
3207 AckResponse ack = syncAckProcessing(timeout_, _message);
// On "retry" the authenticator supplies a new password, which is placed on
// the logon message.
3208 if (ack.status() ==
"retry")
3210 _message.
setPassword(authenticator_.
retry(ack.username(), ack.password()));
3211 _username = ack.username();
// Report the final ack details to the authenticator.
3216 authenticator_.
completed(ack.username(), ack.password(), ack.reason());
3220 broadcastConnectionStateChanged(ConnectionStateListener::LoggedOn);
3227 catch (
const AMPSException& ex)
3230 AMPS_UNHANDLED_EXCEPTION(ex);
// Replay stored publishes through _replayer, then notify listeners.
3243 _publishStore.
replay(_replayer);
3244 broadcastConnectionStateChanged(ConnectionStateListener::PublishReplayed);
3246 catch (
const PublishStoreGapException& ex)
3249 AMPS_UNHANDLED_EXCEPTION(ex);
// Store failures during logon are turned into a ConnectionException.
3252 catch (
const StoreException& ex)
3255 std::ostringstream os;
3256 os <<
"A local store exception occurred while logging on." 3258 throw ConnectionException(os.str());
3260 catch (
const AMPSException& ex)
3263 AMPS_UNHANDLED_EXCEPTION(ex);
3266 catch (
const std::exception& ex)
3269 AMPS_UNHANDLED_EXCEPTION(ex);
3279 return newCommandId;
// Places a subscription and waits for the server's acknowledgement.
// NOTE(review): the first line of the signature (name, return type, handler
// parameter) and the timeout parameter line are not shown in this listing;
// the body parallels deltaSubscribe() and is presumably subscribe() -- confirm.
// Many statements (message setup, try blocks, branch bodies) are missing.
3283 const std::string& topic_,
3285 const std::string& filter_,
3286 const std::string& bookmark_,
3287 const std::string& options_,
3288 const std::string& subId_,
3289 bool isHASubscribe_ =
true)
// HA handling only applies when a subscription manager is installed.
3291 isHASubscribe_ &= (bool)_subscriptionManager;
3292 Lock<Mutex> l(_lock);
3296 std::string subId(subId_);
// Looks for AMPS_OPTIONS_REPLACE in options_, comparing all but the last
// character of that constant (the count argument is strlen - 1).
3299 if (options_.find(AMPS_OPTIONS_REPLACE, 0, strlen(AMPS_OPTIONS_REPLACE) - 1) != std::string::npos)
3301 throw ConnectionException(
"Cannot issue a replacement subscription; a valid subscription id is required.");
// A processed ack is always requested; persisted is added for bookmark
// subscriptions when a bookmark store is present.
3311 unsigned ackTypes = Message::AckType::Processed;
3313 if (!bookmark_.empty() && _bookmarkStore.isValid())
3315 ackTypes |= Message::AckType::Persisted;
// Optional fields are set on the message only when non-empty (the setter
// lines are not shown).
3319 if (filter_.length())
3323 if (bookmark_.length())
3333 if (_bookmarkStore.isValid())
// The message is logged to the bookmark store and immediately discarded.
// NOTE(review): presumably to register the subscription with the store
// without recording a real message -- confirm.
3338 _bookmarkStore.
log(_message);
3339 _bookmarkStore.
discard(_message);
3345 if (options_.length())
// _lock is released while the subscription manager records the subscription.
3354 Unlock<Mutex> u(_lock);
3355 _subscriptionManager->subscribe(messageHandler_, message,
3356 Message::AckType::None);
3357 if (_badTimeToHASubscribe)
3368 if (!options_.empty())
3374 syncAckProcessing(timeout_, message, isHASubscribe_);
// Failure handling: routes and subscriptions registered above are removed.
3376 catch (
const DisconnectedException&)
3378 if (!isHASubscribe_)
3380 _routes.removeRoute(subIdField);
3385 AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(subIdField));
3389 catch (
const TimedOutException&)
3391 AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(subIdField));
// Remaining cleanup path (its enclosing handler is not shown): drop the
// subscription from the manager, with _lock released, and from the routes.
3399 Unlock<Mutex> unlock(_lock);
3400 _subscriptionManager->unsubscribe(subIdField);
3402 _routes.removeRoute(subIdField);
// Places a delta subscription and waits for the server's acknowledgement.
// The flow parallels the subscribe block that precedes this one in the file.
// NOTE(review): the timeout parameter line, message setup, try blocks and
// most branch bodies are missing from this listing.
3408 std::string deltaSubscribe(
const MessageHandler& messageHandler_,
3409 const std::string& topic_,
3411 const std::string& filter_,
3412 const std::string& bookmark_,
3413 const std::string& options_,
3414 const std::string& subId_ =
"",
3415 bool isHASubscribe_ =
true)
// HA handling only applies when a subscription manager is installed.
3417 isHASubscribe_ &= (bool)_subscriptionManager;
3418 Lock<Mutex> l(_lock);
3422 std::string subId(subId_);
// A processed ack is always requested; persisted is added for bookmark
// subscriptions when a bookmark store is present.
3432 unsigned ackTypes = Message::AckType::Processed;
3434 if (!bookmark_.empty() && _bookmarkStore.isValid())
3436 ackTypes |= Message::AckType::Persisted;
// Optional fields are set only when non-empty (setter lines not shown).
3439 if (filter_.length())
3443 if (bookmark_.length())
3453 if (_bookmarkStore.isValid())
// The message is logged to the bookmark store and immediately discarded.
3458 _bookmarkStore.
log(_message);
3459 _bookmarkStore.
discard(_message);
3465 if (options_.length())
// _lock is released while the subscription manager records the subscription.
3473 Unlock<Mutex> u(_lock);
3474 _subscriptionManager->subscribe(messageHandler_, message,
3475 Message::AckType::None);
3476 if (_badTimeToHASubscribe)
3487 if (!options_.empty())
3493 syncAckProcessing(timeout_, message, isHASubscribe_);
// Failure handling: routes and subscriptions registered above are removed.
3495 catch (
const DisconnectedException&)
3497 if (!isHASubscribe_)
3499 _routes.removeRoute(subIdField);
3503 catch (
const TimedOutException&)
3505 AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(subIdField));
// Remaining cleanup path (its enclosing handler is not shown).
3513 Unlock<Mutex> unlock(_lock);
3514 _subscriptionManager->unsubscribe(subIdField);
3516 _routes.removeRoute(subIdField);
3522 void unsubscribe(
const std::string&
id)
3524 Lock<Mutex> l(_lock);
3525 unsubscribeInternal(
id);
// Removes every subscription: clears the subscription manager when one is
// installed, drops all subscription routes, then, under _lock, sends
// _message without retry.
// NOTE(review): the statements that prepare _message (source lines
// 3537-3540) are not shown in this listing.
3528 void unsubscribe(
void)
3530 if (_subscriptionManager)
3532 _subscriptionManager->clear();
3535 _routes.unsubscribeAll();
3536 Lock<Mutex> l(_lock);
3541 _sendWithoutRetry(_message);
// Queues a no-op deferred execution. NOTE(review): presumably so that the
// next processDeferredExecutions() pass invalidates the route caches -- confirm.
3543 deferredExecution(&s_noOpFn, NULL);
// Issues a query and waits for the server's acknowledgement.
// NOTE(review): the first signature line (name, return type, handler
// parameter) is not shown in this listing; the overload that follows calls
// sow(messageHandler_, ...), so this is presumably sow() -- confirm.
// Message setup, try/catch structure and the return statement are missing.
3547 const std::string& topic_,
3548 const std::string& filter_ =
"",
3549 const std::string& orderBy_ =
"",
3550 const std::string& bookmark_ =
"",
3551 int batchSize_ = AMPS_DEFAULT_BATCH_SIZE,
3552 int topN_ = AMPS_DEFAULT_TOP_N,
3553 const std::string& options_ =
"",
3554 long timeout_ = AMPS_DEFAULT_COMMAND_TIMEOUT)
3556 Lock<Mutex> l(_lock);
// Both processed and completed acks are requested.
3563 unsigned ackTypes = Message::AckType::Processed | Message::AckType::Completed;
// Optional fields are set only when supplied (setter lines not shown);
// topN_ is considered supplied when it differs from AMPS_DEFAULT_TOP_N.
3566 if (filter_.length())
3570 if (orderBy_.length())
3574 if (bookmark_.length())
3579 if (topN_ != AMPS_DEFAULT_TOP_N)
3583 if (options_.length())
// The handler is routed under the message's query id before sending.
3588 _routes.addRoute(_message.
getQueryID(), messageHandler_,
3593 syncAckProcessing(timeout_, _message);
// Cleanup on a failure path (its enclosing handler is not shown).
3597 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId));
// Convenience overload that forwards to the sow() overload taking more
// parameters.
// NOTE(review): the first signature line, one parameter line (source line
// 3606) and the remaining forwarded arguments are not shown in this listing.
3605 const std::string& topic_,
3607 const std::string& filter_ =
"",
3608 int batchSize_ = AMPS_DEFAULT_BATCH_SIZE,
3609 int topN_ = AMPS_DEFAULT_TOP_N)
3612 return sow(messageHandler_,
// Issues a combined query-and-subscribe command and waits for the server's
// acknowledgement.
// NOTE(review): message setup, try blocks, branch bodies and the return
// statement are missing from this listing.
3623 std::string sowAndSubscribe(
const MessageHandler& messageHandler_,
3624 const std::string& topic_,
3625 const std::string& filter_ =
"",
3626 const std::string& orderBy_ =
"",
3627 const std::string& bookmark_ =
"",
3628 int batchSize_ = AMPS_DEFAULT_BATCH_SIZE,
3629 int topN_ = AMPS_DEFAULT_TOP_N,
3630 const std::string& options_ =
"",
3631 long timeout_ = AMPS_DEFAULT_COMMAND_TIMEOUT,
3632 bool isHASubscribe_ =
true)
// HA handling only applies when a subscription manager is installed.
3634 isHASubscribe_ &= (bool)_subscriptionManager;
3635 unsigned ackTypes = Message::AckType::Processed;
3636 Lock<Mutex> l(_lock);
// The subscription id is taken from cid (whose creation is not shown).
3641 std::string subId = cid;
// Optional fields are set only when supplied (setter lines not shown).
3643 if (filter_.length())
3647 if (orderBy_.length())
3651 if (bookmark_.length())
// With a bookmark store present, a persisted ack is requested as well.
3655 if (_bookmarkStore.isValid())
3657 ackTypes |= Message::AckType::Persisted;
// The message is logged to the bookmark store; it is discarded again only
// when the bookmark is not a range.
3665 _bookmarkStore.
log(_message);
3666 if (!BookmarkRange::isRange(bookmark))
3668 _bookmarkStore.
discard(_message);
3680 if (topN_ != AMPS_DEFAULT_TOP_N)
3684 if (options_.length())
// _lock is released while the subscription manager records the subscription.
3693 Unlock<Mutex> u(_lock);
3694 _subscriptionManager->subscribe(messageHandler_, message,
3695 Message::AckType::None);
3696 if (_badTimeToHASubscribe)
3701 _routes.addRoute(cid, messageHandler_,
3704 if (!options_.empty())
3710 syncAckProcessing(timeout_, message, isHASubscribe_);
// Failure handling: routes and subscriptions registered above are removed.
3712 catch (
const DisconnectedException&)
3714 if (!isHASubscribe_)
3716 _routes.removeRoute(subId);
3720 catch (
const TimedOutException&)
3722 AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(subId));
// Remaining cleanup path (its enclosing handler is not shown).
3730 Unlock<Mutex> unlock(_lock);
3731 _subscriptionManager->unsubscribe(cid);
3733 _routes.removeRoute(subId);
// Convenience overload that forwards to the sowAndSubscribe() overload taking
// more parameters; oofEnabled_ is translated into the options text "oof"
// (enabled) or "" (disabled).
// NOTE(review): one parameter line (source line 3741) and most of the
// forwarded arguments are not shown in this listing.
3739 std::string sowAndSubscribe(
const MessageHandler& messageHandler_,
3740 const std::string& topic_,
3742 const std::string& filter_ =
"",
3743 int batchSize_ = AMPS_DEFAULT_BATCH_SIZE,
3744 bool oofEnabled_ =
false,
3745 int topN_ = AMPS_DEFAULT_TOP_N,
3746 bool isHASubscribe_ =
true)
3749 return sowAndSubscribe(messageHandler_,
3756 (oofEnabled_ ?
"oof" :
""),
// Issues a combined query-and-delta-subscribe command and waits for the
// server's acknowledgement.
// NOTE(review): message setup, the subId declaration, try blocks, branch
// bodies and the return statement are missing from this listing.
3761 std::string sowAndDeltaSubscribe(
const MessageHandler& messageHandler_,
3762 const std::string& topic_,
3763 const std::string& filter_ =
"",
3764 const std::string& orderBy_ =
"",
3765 int batchSize_ = AMPS_DEFAULT_BATCH_SIZE,
3766 int topN_ = AMPS_DEFAULT_TOP_N,
3767 const std::string& options_ =
"",
3768 long timeout_ = AMPS_DEFAULT_COMMAND_TIMEOUT,
3769 bool isHASubscribe_ =
true)
// HA handling only applies when a subscription manager is installed.
3771 isHASubscribe_ &= (bool)_subscriptionManager;
3772 Lock<Mutex> l(_lock);
// Optional fields are set only when supplied (setter lines not shown).
3780 if (filter_.length())
3784 if (orderBy_.length())
3789 if (topN_ != AMPS_DEFAULT_TOP_N)
3793 if (options_.length())
// _lock is released while the subscription manager records the subscription.
3801 Unlock<Mutex> u(_lock);
3802 _subscriptionManager->subscribe(messageHandler_, message,
3803 Message::AckType::None);
3804 if (_badTimeToHASubscribe)
// The handler is routed under the message's query id.
3809 _routes.addRoute(message.
getQueryID(), messageHandler_,
3810 Message::AckType::None, Message::AckType::Processed, message.
getCommandEnum());
3812 if (!options_.empty())
3818 syncAckProcessing(timeout_, message, isHASubscribe_);
// Failure handling: routes and subscriptions registered above are removed.
3820 catch (
const DisconnectedException&)
3822 if (!isHASubscribe_)
3824 _routes.removeRoute(subId);
3828 catch (
const TimedOutException&)
3830 AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(subId));
// Remaining cleanup path (its enclosing handler is not shown).
3838 Unlock<Mutex> unlock(_lock);
3839 _subscriptionManager->unsubscribe(
Field(subId));
3841 _routes.removeRoute(subId);
// Convenience overload that forwards to the sowAndDeltaSubscribe() overload
// taking more parameters.
// NOTE(review): one parameter line (source line 3849), the construction of
// the options from oofEnabled_/sendEmpties_ and the forwarded arguments are
// not shown in this listing.
3847 std::string sowAndDeltaSubscribe(
const MessageHandler& messageHandler_,
3848 const std::string& topic_,
3850 const std::string& filter_ =
"",
3851 int batchSize_ = AMPS_DEFAULT_BATCH_SIZE,
3852 bool oofEnabled_ =
false,
3853 bool sendEmpties_ =
false,
3854 int topN_ = AMPS_DEFAULT_TOP_N,
3855 bool isHASubscribe_ =
true)
// Guards a statement that is not shown in this listing.
3863 if (sendEmpties_ ==
false)
3867 return sowAndDeltaSubscribe(messageHandler_,
// Deletes records selected by filter_ and returns the command id as a string.
// NOTE(review): the first signature line is not shown in this listing; the
// body parallels sowDeleteByData()/sowDeleteByKeys() and is presumably
// sowDelete() -- confirm.
// Two paths are visible: one that records the command in _publishStore
// (requesting processed, stats and persisted acks) and one that sends
// _message directly (requesting processed and stats acks). The condition
// choosing between them and much of the message setup are not shown.
3879 const std::string& topic_,
3880 const std::string& filter_,
// Publish-store path.
3886 unsigned ackType = Message::AckType::Processed |
3887 Message::AckType::Stats |
3888 Message::AckType::Persisted;
3889 publishStoreMessage.reset();
// Guards a statement that is not shown (presumably generating a command id).
3890 if (commandId_.
empty())
3901 .assignQueryID(commandId_.
data(), commandId_.
len())
3902 .setAckTypeEnum(ackType)
3904 .assignFilter(filter_.c_str(), filter_.length());
// Store the command, then stamp its sequence number on the message as
// decimal text occupying buf[pos, AMPS_NUMBER_BUFFER_LEN).
3905 amps_uint64_t haSequenceNumber = _publishStore.
store(publishStoreMessage);
3906 char buf[AMPS_NUMBER_BUFFER_LEN];
3907 size_t pos = convertToCharArray(buf, haSequenceNumber);
3908 publishStoreMessage.
assignSequence(buf + pos, AMPS_NUMBER_BUFFER_LEN - pos);
3912 Lock<Mutex> l(_lock);
3913 _routes.addRoute(commandId_, messageHandler_,
3914 Message::AckType::Stats,
3915 Message::AckType::Processed | Message::AckType::Persisted,
3917 syncAckProcessing(timeout_, publishStoreMessage,
3920 catch (
const DisconnectedException&)
// Cleanup on a failure path (its enclosing handler is not shown).
3927 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId_));
3931 return (std::string)commandId_;
// Direct path: everything happens under _lock using _message.
3935 Lock<Mutex> l(_lock);
3937 if (commandId_.
empty())
3948 .assignQueryID(commandId_.
data(), commandId_.
len())
3949 .setAckTypeEnum(Message::AckType::Processed |
3950 Message::AckType::Stats)
3952 .assignFilter(filter_.c_str(), filter_.length());
3953 _routes.addRoute(commandId_, messageHandler_,
3954 Message::AckType::Stats,
3955 Message::AckType::Processed,
3959 syncAckProcessing(timeout_, _message);
// Cleanup on a failure path (its enclosing handler is not shown).
3963 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId_));
3966 return (std::string)commandId_;
// Deletes the record matching the message body data_ and returns the command
// id as a string.
// Two paths are visible: one that records the command in _publishStore
// (requesting processed, stats and persisted acks) and one that sends
// _message directly (requesting processed and stats acks).
// NOTE(review): the remaining parameters, the condition choosing between the
// paths and much of the message setup are not shown in this listing.
3970 std::string sowDeleteByData(
const MessageHandler& messageHandler_,
3971 const std::string& topic_,
3972 const std::string& data_,
// Publish-store path.
3978 unsigned ackType = Message::AckType::Processed |
3979 Message::AckType::Stats |
3980 Message::AckType::Persisted;
3981 publishStoreMessage.reset();
// Guards a statement that is not shown (presumably generating a command id).
3982 if (commandId_.
empty())
3993 .assignQueryID(commandId_.
data(), commandId_.
len())
3994 .setAckTypeEnum(ackType)
3996 .assignData(data_.c_str(), data_.length());
// Store the command, then stamp its sequence number on the message as
// decimal text occupying buf[pos, AMPS_NUMBER_BUFFER_LEN).
3997 amps_uint64_t haSequenceNumber = _publishStore.
store(publishStoreMessage);
3998 char buf[AMPS_NUMBER_BUFFER_LEN];
3999 size_t pos = convertToCharArray(buf, haSequenceNumber);
4000 publishStoreMessage.
assignSequence(buf + pos, AMPS_NUMBER_BUFFER_LEN - pos);
4004 Lock<Mutex> l(_lock);
4005 _routes.addRoute(commandId_, messageHandler_,
4006 Message::AckType::Stats,
4007 Message::AckType::Processed | Message::AckType::Persisted,
4009 syncAckProcessing(timeout_, publishStoreMessage,
4012 catch (
const DisconnectedException&)
// Cleanup on a failure path (its enclosing handler is not shown).
4019 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId_));
4023 return (std::string)commandId_;
// Direct path: everything happens under _lock using _message.
4027 Lock<Mutex> l(_lock);
4029 if (commandId_.
empty())
4040 .assignQueryID(commandId_.
data(), commandId_.
len())
4041 .setAckTypeEnum(Message::AckType::Processed |
4042 Message::AckType::Stats)
4044 .assignData(data_.c_str(), data_.length());
4045 _routes.addRoute(commandId_, messageHandler_,
4046 Message::AckType::Stats,
4047 Message::AckType::Processed,
4051 syncAckProcessing(timeout_, _message);
// Cleanup on a failure path (its enclosing handler is not shown).
4055 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId_));
4058 return (std::string)commandId_;
// Deletes the records identified by keys_ and returns the command id as a
// string.
// Two paths are visible: one that records the command in _publishStore
// (requesting processed, stats and persisted acks) and one that sends
// _message directly (requesting processed and stats acks).
// NOTE(review): the remaining parameters, the condition choosing between the
// paths and much of the message setup are not shown in this listing.
4062 std::string sowDeleteByKeys(
const MessageHandler& messageHandler_,
4063 const std::string& topic_,
4064 const std::string& keys_,
// Publish-store path.
4070 unsigned ackType = Message::AckType::Processed |
4071 Message::AckType::Stats |
4072 Message::AckType::Persisted;
4073 publishStoreMessage.reset();
// Guards a statement that is not shown (presumably generating a command id).
4074 if (commandId_.
empty())
4085 .assignQueryID(commandId_.
data(), commandId_.
len())
4086 .setAckTypeEnum(ackType)
4088 .assignSowKeys(keys_.c_str(), keys_.length());
// Store the command, then stamp its sequence number on the message as
// decimal text occupying buf[pos, AMPS_NUMBER_BUFFER_LEN).
4089 amps_uint64_t haSequenceNumber = _publishStore.
store(publishStoreMessage);
4090 char buf[AMPS_NUMBER_BUFFER_LEN];
4091 size_t pos = convertToCharArray(buf, haSequenceNumber);
4092 publishStoreMessage.
assignSequence(buf + pos, AMPS_NUMBER_BUFFER_LEN - pos);
4096 Lock<Mutex> l(_lock);
4097 _routes.addRoute(commandId_, messageHandler_,
4098 Message::AckType::Stats,
4099 Message::AckType::Processed | Message::AckType::Persisted,
4101 syncAckProcessing(timeout_, publishStoreMessage,
4104 catch (
const DisconnectedException&)
// Cleanup on a failure path (its enclosing handler is not shown).
4111 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId_));
4115 return (std::string)commandId_;
// Direct path: everything happens under _lock using _message.
4119 Lock<Mutex> l(_lock);
4121 if (commandId_.
empty())
4132 .assignQueryID(commandId_.
data(), commandId_.
len())
4133 .setAckTypeEnum(Message::AckType::Processed |
4134 Message::AckType::Stats)
4136 .assignSowKeys(keys_.c_str(), keys_.length());
4137 _routes.addRoute(commandId_, messageHandler_,
4138 Message::AckType::Stats,
4139 Message::AckType::Processed,
4143 syncAckProcessing(timeout_, _message);
// Cleanup on a failure path (its enclosing handler is not shown).
4147 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId_));
4150 return (std::string)commandId_;
// Sends the start_timer command to the server.
// Throws CommandException when the connected server's version is 5.3.2.0 or
// later, where the command is reported as deprecated.
// NOTE(review): the statements that build and send the command after the
// lock is taken are not shown in this listing.
4154 void startTimer(
void)
4156 if (_serverVersion >=
"5.3.2.0")
4158 throw CommandException(
"The start_timer command is deprecated.");
4160 Lock<Mutex> l(_lock);
4169 if (_serverVersion >=
"5.3.2.0")
4171 throw CommandException(
"The stop_timer command is deprecated.");
4173 return executeAsync(
Command(
"stop_timer").addAckType(
"completed"), messageHandler_);
4188 void setExceptionListener(
const std::shared_ptr<const ExceptionListener>& pListener_)
4190 _pExceptionListener = pListener_;
4191 _exceptionListener = _pExceptionListener.get();
4196 _exceptionListener = &listener_;
4201 return *_exceptionListener;
// Configures heartbeating.
// heartbeatInterval_ : the heartbeat interval to store.
// readTimeout_       : the socket read timeout to store; must not be smaller
//                      than heartbeatInterval_, otherwise UsageException is
//                      thrown.
// The new values are stored, under _lock, only when at least one of them
// differs from the current setting.
// NOTE(review): any statement following the two assignments (source line
// 4216) is not shown in this listing.
4204 void setHeartbeat(
unsigned heartbeatInterval_,
unsigned readTimeout_)
4206 if (readTimeout_ < heartbeatInterval_)
4208 throw UsageException(
"The socket read timeout must be >= the heartbeat interval.");
4210 Lock<Mutex> l(_lock);
4211 if (_heartbeatInterval != heartbeatInterval_ ||
4212 _readTimeout != readTimeout_)
4214 _heartbeatInterval = heartbeatInterval_;
4215 _readTimeout = readTimeout_;
// Starts heartbeating with the server and applies related client settings.
// When connected with a non-zero heartbeat interval: builds the options text
// "start,<interval>", arms _heartbeatTimer, sends _beatMessage without retry
// and notifies listeners with HeartbeatInitiated.
// NOTE(review): the beat message setup, try block and the calls whose
// results are checked at the end are not shown in this listing.
4220 void _sendHeartbeat(
void)
4222 if (_connected && _heartbeatInterval != 0)
4224 std::ostringstream options;
4225 options <<
"start," << _heartbeatInterval;
// The timer timeout is the heartbeat interval multiplied by 1000.
4228 _heartbeatTimer.setTimeout(_heartbeatInterval * 1000.0);
4229 _heartbeatTimer.start();
4232 _sendWithoutRetry(_beatMessage);
4233 broadcastConnectionStateChanged(ConnectionStateListener::HeartbeatInitiated);
// Connection failures while sending are handed to the unhandled-exception
// macro instead of propagating from this point.
4235 catch (ConnectionException& ex_)
4239 AMPS_UNHANDLED_EXCEPTION(ex_);
// Read-timeout handling; the call producing `result` is not shown.
4244 if (_readTimeout && _connected)
4249 AMPSException::throwFor(_client, result);
// Only when no queue ack timeout is configured: a call (its opening line is
// not shown) receives the heartbeat interval multiplied by 1000.
4251 if (!_queueAckTimeout)
4254 (
int)(_heartbeatInterval * 1000));
4257 AMPSException::throwFor(_client, result);
4265 Lock<Mutex> lock(_lock);
4266 _connectionStateListeners.insert(listener_);
4271 Lock<Mutex> lock(_lock);
4272 _connectionStateListeners.erase(listener_);
4275 void clearConnectionStateListeners()
4277 Lock<Mutex> lock(_lock);
4278 _connectionStateListeners.clear();
// Registers handler_ in _routes under the ids carried by a command (sub id,
// query id and/or command id), depending on the command type.
// NOTE(review): the first signature line is not shown in this listing; the
// parameter names used in the body (command_, cid_, handler_, requestedAcks_)
// match the _registerHandler(...) calls made elsewhere in this file.
// Declarations of qid, subid, commandType, addedCount and data, plus many
// branch lines, are missing.
4283 unsigned systemAddedAcks_, Message::Command::Type commandType_)
4285 Message message = command_.getMessage();
// added   : at least one id is available to route on.
// cidUnique: the command id is non-empty and differs from both other ids.
4290 bool added = qid.
len() || subid.
len() || cid_.
len();
4291 bool cidIsQid = cid_ == qid;
4292 bool cidUnique = !cidIsQid && cid_.
len() > 0 && cid_ != subid;
// Route on the sub id when present.
4294 if (subid.
len() > 0)
4298 addedCount += _routes.addRoute(subid, handler_, requestedAcks_,
4299 systemAddedAcks_, commandType_);
4301 && (commandType == Message::Command::Subscribe
4302 || commandType == Message::Command::DeltaSubscribe))
// Route on the query id for query-style commands when it differs from the
// sub id.
4309 if (qid.
len() > 0 && qid != subid
4310 && (commandType == Message::Command::SOW
4311 || commandType == Message::Command::SOWDelete
4312 || commandType == Message::Command::SOWAndSubscribe
4313 || commandType == Message::Command::SOWAndDeltaSubscribe))
4315 while (_routes.hasRoute(qid))
// The first registration uses handler_ as given; later ones go through the
// copy-route function, invoked with _lock released.
4324 if (addedCount == 0)
4326 _routes.addRoute(qid, handler_, requestedAcks_,
4327 systemAddedAcks_, commandType_);
4333 Unlock<Mutex> u(_lock);
4334 data = amps_invoke_copy_route_function(handler_.userData());
4338 _routes.addRoute(qid, handler_, requestedAcks_,
4339 systemAddedAcks_, commandType_);
4343 _routes.addRoute(qid,
4347 systemAddedAcks_, commandType_);
// Route on the command id when it is unique and acks other than persisted
// were requested.
4352 if (cidUnique && requestedAcks_ & ~Message::AckType::Persisted)
4354 while (_routes.hasRoute(cid_))
4358 if (addedCount == 0)
4360 _routes.addRoute(cid_, handler_, requestedAcks_,
4361 systemAddedAcks_, commandType_);
4367 Unlock<Mutex> u(_lock);
4368 data = amps_invoke_copy_route_function(handler_.userData());
4372 _routes.addRoute(cid_, handler_, requestedAcks_,
4373 systemAddedAcks_, commandType_);
4377 _routes.addRoute(cid_,
4381 systemAddedAcks_, commandType_);
// Publishes that request acks other than persisted are routed on the
// command id.
4385 else if ((commandType == Message::Command::Publish ||
4386 commandType == Message::Command::DeltaPublish)
4387 && requestedAcks_ & ~
Message::AckType::Persisted)
4390 _routes.addRoute(cid_, handler_, requestedAcks_,
4391 systemAddedAcks_, commandType_);
// NOTE(review): the guard for this throw is not shown; the message indicates
// it fires when no id was available to route the handler on.
4396 throw UsageException(
"To use a messagehandler, you must also supply a command or subscription ID.");
// Executes command_ with handler_ receiving the results; the caller is
// expected to hold _lock already (executeAsync() takes the lock and then
// calls executeAsyncNoLock(command_, handler_, isHASubscribe_)).
// NOTE(review): the first signature line is not shown in this listing, and
// the declarations of commandType, cid, subId, requestedAcks and bookmark,
// the try blocks and many branch lines are missing. Comments describe only
// what is visible.
4401 bool isHASubscribe_ =
true)
// HA handling only applies when a subscription manager is installed.
4403 isHASubscribe_ &= (bool)_subscriptionManager;
4404 Message& message = command_.getMessage();
// A processed ack is added by the client when a handler is supplied or the
// command already asks for one.
4405 unsigned systemAddedAcks = (handler_.isValid() || command_.hasProcessedAck()) ?
4406 Message::AckType::Processed : Message::AckType::None;
4408 bool isPublishStore = _publishStore.
isValid() && command_.needsSequenceNumber();
// These command types additionally get a completed ack.
4410 if (commandType == Message::Command::SOWAndSubscribe
4411 || commandType == Message::Command::SOWAndDeltaSubscribe
4412 || commandType == Message::Command::StopTimer)
4414 systemAddedAcks |= Message::AckType::Completed;
4417 if (handler_.isValid() && cid.
empty())
4423 if (command_.isSubscribe())
// With a bookmark store present, a persisted ack is added and the message is
// logged to the store; it is discarded again unless the bookmark is a range.
4426 if (_bookmarkStore.isValid())
4428 systemAddedAcks |= Message::AckType::Persisted;
4436 _bookmarkStore.
log(message);
4437 if (!BookmarkRange::isRange(bookmark))
4439 _bookmarkStore.
discard(message);
4453 systemAddedAcks |= Message::AckType::Persisted;
// Non-subscribe commands with a handler are registered before sending.
4455 bool isSubscribe = command_.isSubscribe();
4456 if (handler_.isValid() && !isSubscribe)
4458 _registerHandler(command_, cid, handler_,
4459 requestedAcks, systemAddedAcks, commandType);
// A synchronous send is used when there is a command id and a processed ack
// is expected.
4461 bool useSyncSend = cid.
len() > 0 && command_.hasProcessedAck();
4464 amps_uint64_t haSequenceNumber = (amps_uint64_t)0;
// The publish store is written with _lock released.
4467 Unlock<Mutex> u(_lock);
4468 haSequenceNumber = _publishStore.
store(message);
4475 syncAckProcessing((
long)command_.getTimeout(), message,
4480 _send(message, haSequenceNumber);
4483 catch (
const DisconnectedException&)
4490 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(cid));
// Subscription path: the subscription manager is updated with _lock released.
4501 Unlock<Mutex> u(_lock);
4502 _subscriptionManager->subscribe(handler_,
4505 if (_badTimeToHASubscribe)
4508 return std::string(subId.
data(), subId.
len());
4511 if (handler_.isValid())
4513 _registerHandler(command_, cid, handler_,
4514 requestedAcks, systemAddedAcks, commandType);
4521 syncAckProcessing((
long)command_.getTimeout(), message,
// Failure handling for subscriptions: remove whatever was registered under
// the command id, sub id and query id.
4529 catch (
const DisconnectedException&)
4531 if (!isHASubscribe_)
4533 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(cid));
4534 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(subId));
4535 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(message.
getQueryId()));
4540 catch (
const TimedOutException&)
4542 AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(cid));
4543 AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(subId));
4544 AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(message.
getQueryId()));
4553 Unlock<Mutex> unlock(_lock);
4554 _subscriptionManager->unsubscribe(subId);
4560 _routes.removeRoute(cid);
4561 _routes.removeRoute(subId);
// A non-empty sub id is returned as the result.
4565 if (subId.
len() > 0)
4568 return std::string(subId.
data(), subId.
len());
// Remaining commands: synchronous send with the same style of cleanup on
// failure.
4578 syncAckProcessing((
long)(command_.getTimeout()), message);
4585 catch (
const TimedOutException&)
4587 AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(cid));
4588 AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(message.
getQueryId()));
4592 catch (
const DisconnectedException&)
4594 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(cid));
4595 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(message.
getQueryId()));
4601 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(cid));
4602 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(message.
getQueryId()));
// Locking wrapper: takes _lock and forwards command_, handler_ and
// isHASubscribe_ to executeAsyncNoLock(), returning its result.
// NOTE(review): the first signature line is not shown in this listing; the
// name executeAsync is inferred from the NoLock counterpart -- confirm.
4615 bool isHASubscribe_ =
true)
4617 Lock<Mutex> lock(_lock);
4618 return executeAsyncNoLock(command_, handler_, isHASubscribe_);
4622 void setAutoAck(
bool isAutoAckEnabled_)
4624 _isAutoAckEnabled = isAutoAckEnabled_;
4626 bool getAutoAck(
void)
const 4628 return _isAutoAckEnabled;
// Sets the ack batch size (the count compared against a topic's pending
// bookmarks in _ack()).
// If no ack timeout has been configured yet, the timeout is set to
// AMPS_DEFAULT_QUEUE_ACK_TIMEOUT so batched acks have a time limit.
// NOTE(review): any statement following the timeout assignment (source line
// 4636) is not shown in this listing.
4630 void setAckBatchSize(
const unsigned batchSize_)
4632 _ackBatchSize = batchSize_;
4633 if (!_queueAckTimeout)
4635 _queueAckTimeout = AMPS_DEFAULT_QUEUE_ACK_TIMEOUT;
4639 unsigned getAckBatchSize(
void)
const 4641 return _ackBatchSize;
4643 int getAckTimeout(
void)
const 4645 return _queueAckTimeout;
// Sets the queue ack timeout that checkQueueAcks() uses to decide when a
// pending batch of acks is old enough to send.
// NOTE(review): a statement preceding the assignment (source line 4649) is
// not shown in this listing.
4647 void setAckTimeout(
const int ackTimeout_)
4650 _queueAckTimeout = ackTimeout_;
// Sends the batched acks accumulated in queueBookmarks_, if there are any,
// and empties the batch.
// NOTE(review): the message setup, the condition guarding the publish-store
// write, the branch structure around the two identical clearing sequences
// and the return statements are not shown in this listing.
4652 size_t _ack(QueueBookmarks& queueBookmarks_)
// Nothing is done for an empty batch.
4654 if (queueBookmarks_._bookmarkCount)
4656 publishStoreMessage.reset();
4661 amps_uint64_t haSequenceNumber = 0;
4664 haSequenceNumber = _publishStore.
store(publishStoreMessage);
// The batch is emptied before the send on this path.
4667 queueBookmarks_._data.erase();
4668 queueBookmarks_._bookmarkCount = 0;
4670 _send(publishStoreMessage, haSequenceNumber);
4673 queueBookmarks_._data.erase();
4674 queueBookmarks_._bookmarkCount = 0;
// Acknowledges the message identified by topic_ and bookmark_ by delegating
// to _ack().
// NOTE(review): the statement guarded by the auto-ack test (source line
// 4684) is not shown in this listing; presumably it returns early so manual
// acks are ignored while auto-ack is on -- confirm.
4680 void ack(
const Field& topic_,
const Field& bookmark_,
const char* options_ = NULL)
4682 if (_isAutoAckEnabled)
4686 _ack(topic_, bookmark_, options_);
// Acknowledges one message, either immediately or by adding it to a
// per-topic batch.
// Immediate: when the batch size is below 2 or options_ is supplied, an ack
// message is built and sent right away.
// Batched: otherwise the bookmark is appended (comma separated) to the
// QueueBookmarks entry keyed by the topic's CRC hash, and the batch is sent
// once its count reaches _ackBatchSize.
// NOTE(review): the message setup, early returns and some branch lines are
// not shown in this listing.
4688 void _ack(
const Field& topic_,
const Field& bookmark_,
const char* options_ = NULL)
// Guards a statement that is not shown (presumably an early return for an
// empty bookmark).
4690 if (bookmark_.
len() == 0)
4694 Lock<Mutex> lock(_lock);
4695 if (_ackBatchSize < 2 || options_ != NULL)
4697 publishStoreMessage.reset();
4705 amps_uint64_t haSequenceNumber = 0;
4708 haSequenceNumber = _publishStore.
store(publishStoreMessage);
4712 _send(publishStoreMessage, haSequenceNumber);
// Batched path: find or create the entry for this topic.
4716 topic_hash hash = CRC<0>::crcNoSSE(topic_.
data(), topic_.
len());
4717 TopicHashMap::iterator it = _topicHashMap.find(hash);
4718 if (it == _topicHashMap.end())
4721 #ifdef AMPS_USE_EMPLACE 4722 it = _topicHashMap.emplace(TopicHashMap::value_type(hash, QueueBookmarks(topic_))).first;
4724 it = _topicHashMap.insert(TopicHashMap::value_type(hash, QueueBookmarks(topic_))).first;
4727 QueueBookmarks& queueBookmarks = it->second;
// Bookmarks already pending: separate the new one with a comma. The
// timestamp assignment below belongs to another branch whose header is not
// shown (presumably the first bookmark of a batch).
4728 if (queueBookmarks._data.length())
4730 queueBookmarks._data.append(
",");
4734 queueBookmarks._oldestTime = amps_now();
4736 queueBookmarks._data.append(bookmark_);
// Send the batch as soon as it is full.
4737 if (++queueBookmarks._bookmarkCount >= _ackBatchSize)
4739 _ack(queueBookmarks);
// Sends every pending ack batch now, regardless of size or age.
// Under _lock, _ack() is called on each topic's batch and its return values
// are summed; if anything was sent and the client is connected, a flush
// with a processed ack and timeout 0 follows.
// NOTE(review): source lines 4745-4750 and the scope of the lock are not
// shown in this listing.
4742 void flushAcks(
void)
4744 size_t sendCount = 0;
4751 Lock<Mutex> lock(_lock);
4752 typedef TopicHashMap::iterator iterator;
4753 for (iterator it = _topicHashMap.begin(), end = _topicHashMap.end(); it != end; ++it)
4755 QueueBookmarks& queueBookmarks = it->second;
4756 sendCount += _ack(queueBookmarks);
4759 if (sendCount && _connected)
4761 publishFlush(0, Message::AckType::Processed);
// Sends any ack batch whose oldest entry is older than the queue ack timeout.
// Exceptions derived from std::exception are passed to the
// unhandled-exception macro.
// NOTE(review): the statement guarded by the emptiness test (presumably an
// early return) and the try block opening are not shown in this listing.
4765 void checkQueueAcks(
void)
// This emptiness test runs before _lock is taken.
4767 if (!_topicHashMap.size())
4771 Lock<Mutex> lock(_lock);
// Batches whose _oldestTime is earlier than (now - timeout) are due.
4774 amps_uint64_t threshold = amps_now()
4775 - (amps_uint64_t)_queueAckTimeout;
4776 typedef TopicHashMap::iterator iterator;
4777 for (iterator it = _topicHashMap.begin(), end = _topicHashMap.end(); it != end; ++it)
4779 QueueBookmarks& queueBookmarks = it->second;
4780 if (queueBookmarks._bookmarkCount && queueBookmarks._oldestTime < threshold)
4782 _ack(queueBookmarks);
4786 catch (std::exception& ex)
4788 AMPS_UNHANDLED_EXCEPTION(ex);
// Queues func_ together with userData_ on _deferredExecutionList, under
// _deferredExecutionLock; processDeferredExecutions() later invokes each
// queued function with its user data.
// The request is appended with emplace_back when AMPS_USE_EMPLACE is defined
// and with push_back otherwise (the #else/#endif lines are not shown in this
// listing).
4792 void deferredExecution(DeferredExecutionFunc func_,
void* userData_)
4794 Lock<Mutex> lock(_deferredExecutionLock);
4795 #ifdef AMPS_USE_EMPLACE 4796 _deferredExecutionList.emplace_back(
4797 DeferredExecutionRequest(func_, userData_));
4799 _deferredExecutionList.push_back(
4800 DeferredExecutionRequest(func_, userData_));
// Runs every queued deferred execution request, then empties the queue and
// invalidates the route caches.
// NOTE(review): the try/catch structure around the call (source lines
// 4812-4822) is not shown in this listing.
4804 inline void processDeferredExecutions(
void)
// The size test runs before _deferredExecutionLock is taken.
4806 if (_deferredExecutionList.size())
4808 Lock<Mutex> lock(_deferredExecutionLock);
4809 DeferredExecutionList::iterator it = _deferredExecutionList.begin();
4810 DeferredExecutionList::iterator end = _deferredExecutionList.end();
4811 for (; it != end; ++it)
// Each request is invoked with the user data it was queued with.
4815 it->_func(it->_userData);
4823 _deferredExecutionList.clear();
4824 _routes.invalidateCache();
4825 _routeCache.invalidateCache();
4829 bool getRetryOnDisconnect(
void)
const 4831 return _isRetryOnDisconnect;
4834 void setRetryOnDisconnect(
bool isRetryOnDisconnect_)
4836 _isRetryOnDisconnect = isRetryOnDisconnect_;
4839 void setDefaultMaxDepth(
unsigned maxDepth_)
4841 _defaultMaxDepth = maxDepth_;
4844 unsigned getDefaultMaxDepth(
void)
const 4846 return _defaultMaxDepth;
4938 RefHandle<MessageStreamImpl> _body;
4948 inline void advance(
void);
4955 : _pStream(pStream_)
4960 bool operator==(
const iterator& rhs)
const 4962 return _pStream == rhs._pStream;
4964 bool operator!=(
const iterator& rhs)
const 4966 return _pStream != rhs._pStream;
4968 void operator++(
void)
4984 return _body.isValid();
4991 if (!_body.isValid())
4993 throw UsageException(
"This MessageStream is not valid and cannot be iterated.");
// Member function declarations only; their definitions are not part of this
// listing, so the notes below are limited to what the signatures show.
// NOTE(review): behavior implied by the names should be confirmed against
// the definitions.
// Depth accessors: both are const and return unsigned.
5025 unsigned getMaxDepth(
void)
const;
5028 unsigned getDepth(
void)
const;
// Mode setters: each takes a command id; the query id (and, for
// setSubscription, the command id) defaults to an empty string.
5032 inline void setSOWOnly(
const std::string& commandId_,
5033 const std::string& queryId_ =
"");
5034 inline void setSubscription(
const std::string& subId_,
5035 const std::string& commandId_ =
"",
5036 const std::string& queryId_ =
"");
5037 inline void setStatsOnly(
const std::string& commandId_,
5038 const std::string& queryId_ =
"");
// Takes the command id together with an unsigned acks_ value.
5039 inline void setAcksOnly(
const std::string& commandId_,
unsigned acks_);
5045 friend class Client;
5071 BorrowRefHandle<ClientImpl> _body;
// Class-scope aliases for the AMPS_DEFAULT_* macros, so the defaults are
// available as named constants as well as preprocessor symbols.
5073 static const int DEFAULT_COMMAND_TIMEOUT = AMPS_DEFAULT_COMMAND_TIMEOUT;
5074 static const int DEFAULT_BATCH_SIZE = AMPS_DEFAULT_BATCH_SIZE;
5075 static const int DEFAULT_TOP_N = AMPS_DEFAULT_TOP_N;
5086 : _body(new ClientImpl(clientName), true)
5089 Client(ClientImpl* existingClient)
5090 : _body(existingClient,
true)
5093 Client(ClientImpl* existingClient,
bool isRef)
5094 : _body(existingClient, isRef)
5097 Client(
const Client& rhs) : _body(rhs._body) {;}
5098 virtual ~Client(
void) {;}
5100 Client& operator=(
const Client& rhs)
5108 return _body.isValid();
5125 _body.get().setName(name);
5132 return _body.get().getName();
5140 return _body.get().getNameHash();
5148 return _body.get().getNameHashValue();
5159 _body.get().setLogonCorrelationData(logonCorrelationData_);
5166 return _body.get().getLogonCorrelationData();
5179 return _body.get().getServerVersion();
5190 return _body.get().getServerVersionInfo();
5204 return AMPS::convertVersionToNumber(version_.c_str(), version_.length());
5219 return AMPS::convertVersionToNumber(data_, len_);
5226 return _body.get().getURI();
5250 _body.get().connect(uri);
5257 _body.get().disconnect();
5275 _body.get().send(message);
5288 unsigned requestedAcks_,
bool isSubscribe_)
5290 Message::Command::Type commandType = isSubscribe_ ? Message::Command::Subscribe : Message::Command::SOW;
5291 _body.get().addMessageHandler(commandId_, messageHandler_,
5292 requestedAcks_, commandType);
5305 unsigned requestedAcks_, Message::Command::Type commandType_)
5307 _body.get().addMessageHandler(commandId_, messageHandler_,
5308 requestedAcks_, commandType_);
5316 return _body.get().removeMessageHandler(commandId_);
5344 return _body.get().send(messageHandler, message, timeout);
5358 _body.get().setDisconnectHandler(disconnectHandler);
5366 return _body.get().getDisconnectHandler();
5375 return _body.get().getConnectionInfo();
5388 _body.get().setBookmarkStore(bookmarkStore_);
5396 return _body.
get().getBookmarkStore();
5404 return _body.get().getSubscriptionManager();
5416 _body.get().setSubscriptionManager(subscriptionManager_);
5440 _body.get().setPublishStore(publishStore_);
5448 return _body.
get().getPublishStore();
5456 _body.get().setGlobalCommandTypeMessageHandler(ClientImpl::GlobalCommandTypeHandlers::DuplicateMessage,
5457 duplicateMessageHandler_);
5471 return _body.get().getDuplicateMessageHandler();
5485 _body.get().setFailedWriteHandler(handler_);
5493 return _body.get().getFailedWriteHandler();
5514 amps_uint64_t
publish(
const std::string& topic_,
const std::string& data_)
5516 return _body.get().publish(topic_.c_str(), topic_.length(),
5517 data_.c_str(), data_.length());
5539 amps_uint64_t
publish(
const char* topic_,
size_t topicLength_,
5540 const char* data_,
size_t dataLength_)
5542 return _body.get().publish(topic_, topicLength_, data_, dataLength_);
5563 amps_uint64_t
publish(
const std::string& topic_,
const std::string& data_,
5564 unsigned long expiration_)
5566 return _body.get().publish(topic_.c_str(), topic_.length(),
5567 data_.c_str(), data_.length(), expiration_);
5590 amps_uint64_t
publish(
const char* topic_,
size_t topicLength_,
5591 const char* data_,
size_t dataLength_,
5592 unsigned long expiration_)
5594 return _body.get().publish(topic_, topicLength_,
5595 data_, dataLength_, expiration_);
5636 void publishFlush(
long timeout_ = 0,
unsigned ackType_ = Message::AckType::Processed)
5638 _body.get().publishFlush(timeout_, ackType_);
5657 amps_uint64_t
deltaPublish(
const std::string& topic_,
const std::string& data_)
5659 return _body.get().deltaPublish(topic_.c_str(), topic_.length(),
5660 data_.c_str(), data_.length());
5681 const char* data_,
size_t dataLength_)
5683 return _body.get().deltaPublish(topic_, topicLength_,
5684 data_, dataLength_);
5703 amps_uint64_t
deltaPublish(
const std::string& topic_,
const std::string& data_,
5704 unsigned long expiration_)
5706 return _body.get().deltaPublish(topic_.c_str(), topic_.length(),
5707 data_.c_str(), data_.length(),
5730 const char* data_,
size_t dataLength_,
5731 unsigned long expiration_)
5733 return _body.get().deltaPublish(topic_, topicLength_,
5734 data_, dataLength_, expiration_);
5754 const char* options_ = NULL)
5756 return _body.get().logon(timeout_, authenticator_, options_);
5771 std::string
logon(
const char* options_,
int timeout_ = 0)
5790 std::string
logon(
const std::string& options_,
int timeout_ = 0)
5816 const std::string& topic_,
5818 const std::string& filter_ =
"",
5819 const std::string& options_ =
"",
5820 const std::string& subId_ =
"")
5822 return _body.get().subscribe(messageHandler_, topic_, timeout_,
5823 filter_,
"", options_, subId_);
5842 long timeout_ = 0,
const std::string& filter_ =
"",
5843 const std::string& options_ =
"",
5844 const std::string& subId_ =
"")
5847 if (_body.get().getDefaultMaxDepth())
5849 result.
maxDepth(_body.get().getDefaultMaxDepth());
5851 result.setSubscription(_body.get().subscribe(
5853 topic_, timeout_, filter_,
"",
5854 options_, subId_,
false));
5874 long timeout_ = 0,
const std::string& filter_ =
"",
5875 const std::string& options_ =
"",
5876 const std::string& subId_ =
"")
5879 if (_body.get().getDefaultMaxDepth())
5881 result.
maxDepth(_body.get().getDefaultMaxDepth());
5883 result.setSubscription(_body.get().subscribe(
5885 topic_, timeout_, filter_,
"",
5886 options_, subId_,
false));
5903 const std::string& topic_,
5905 const std::string& filter_ =
"",
5906 const std::string& options_ =
"",
5907 const std::string& subId_ =
"")
5909 return _body.get().deltaSubscribe(messageHandler_, topic_, timeout_,
5910 filter_,
"", options_, subId_);
5921 long timeout_,
const std::string& filter_ =
"",
5922 const std::string& options_ =
"",
5923 const std::string& subId_ =
"")
5926 if (_body.get().getDefaultMaxDepth())
5928 result.
maxDepth(_body.get().getDefaultMaxDepth());
5930 result.setSubscription(_body.get().deltaSubscribe(
5932 topic_, timeout_, filter_,
"",
5933 options_, subId_,
false));
5939 long timeout_,
const std::string& filter_ =
"",
5940 const std::string& options_ =
"",
5941 const std::string& subId_ =
"")
5944 if (_body.get().getDefaultMaxDepth())
5946 result.
maxDepth(_body.get().getDefaultMaxDepth());
5948 result.setSubscription(_body.get().deltaSubscribe(
5950 topic_, timeout_, filter_,
"",
5951 options_, subId_,
false));
5981 const std::string& topic_,
5983 const std::string& bookmark_,
5984 const std::string& filter_ =
"",
5985 const std::string& options_ =
"",
5986 const std::string& subId_ =
"")
5988 return _body.get().subscribe(messageHandler_, topic_, timeout_,
5989 filter_, bookmark_, options_, subId_);
6010 const std::string& bookmark_,
6011 const std::string& filter_ =
"",
6012 const std::string& options_ =
"",
6013 const std::string& subId_ =
"")
6016 if (_body.get().getDefaultMaxDepth())
6018 result.
maxDepth(_body.get().getDefaultMaxDepth());
6020 result.setSubscription(_body.get().subscribe(
6022 topic_, timeout_, filter_,
6023 bookmark_, options_,
6031 const std::string& bookmark_,
6032 const std::string& filter_ =
"",
6033 const std::string& options_ =
"",
6034 const std::string& subId_ =
"")
6037 if (_body.get().getDefaultMaxDepth())
6039 result.
maxDepth(_body.get().getDefaultMaxDepth());
6041 result.setSubscription(_body.get().subscribe(
6043 topic_, timeout_, filter_,
6044 bookmark_, options_,
6059 return _body.get().unsubscribe(commandId);
6071 return _body.get().unsubscribe();
6105 const std::string& topic_,
6106 const std::string& filter_ =
"",
6107 const std::string& orderBy_ =
"",
6108 const std::string& bookmark_ =
"",
6109 int batchSize_ = DEFAULT_BATCH_SIZE,
6110 int topN_ = DEFAULT_TOP_N,
6111 const std::string& options_ =
"",
6112 long timeout_ = DEFAULT_COMMAND_TIMEOUT)
6114 return _body.get().sow(messageHandler_, topic_, filter_, orderBy_,
6115 bookmark_, batchSize_, topN_, options_,
6143 const std::string& filter_ =
"",
6144 const std::string& orderBy_ =
"",
6145 const std::string& bookmark_ =
"",
6146 int batchSize_ = DEFAULT_BATCH_SIZE,
6147 int topN_ = DEFAULT_TOP_N,
6148 const std::string& options_ =
"",
6149 long timeout_ = DEFAULT_COMMAND_TIMEOUT)
6152 if (_body.get().getDefaultMaxDepth())
6154 result.
maxDepth(_body.get().getDefaultMaxDepth());
6156 result.setSOWOnly(_body.get().sow(result.operator
MessageHandler(),
6157 topic_, filter_, orderBy_, bookmark_,
6158 batchSize_, topN_, options_, timeout_));
6164 const std::string& filter_ =
"",
6165 const std::string& orderBy_ =
"",
6166 const std::string& bookmark_ =
"",
6167 int batchSize_ = DEFAULT_BATCH_SIZE,
6168 int topN_ = DEFAULT_TOP_N,
6169 const std::string& options_ =
"",
6170 long timeout_ = DEFAULT_COMMAND_TIMEOUT)
6173 if (_body.get().getDefaultMaxDepth())
6175 result.
maxDepth(_body.get().getDefaultMaxDepth());
6177 result.setSOWOnly(_body.get().sow(result.operator
MessageHandler(),
6178 topic_, filter_, orderBy_, bookmark_,
6179 batchSize_, topN_, options_, timeout_));
6205 const std::string& topic_,
6207 const std::string& filter_ =
"",
6208 int batchSize_ = DEFAULT_BATCH_SIZE,
6209 int topN_ = DEFAULT_TOP_N)
6211 return _body.get().sow(messageHandler_, topic_, timeout_, filter_,
6237 const std::string& topic_,
6239 const std::string& filter_ =
"",
6240 int batchSize_ = DEFAULT_BATCH_SIZE,
6241 bool oofEnabled_ =
false,
6242 int topN_ = DEFAULT_TOP_N)
6244 return _body.get().sowAndSubscribe(messageHandler_, topic_, timeout_,
6245 filter_, batchSize_, oofEnabled_,
6270 const std::string& filter_ =
"",
6271 int batchSize_ = DEFAULT_BATCH_SIZE,
6272 bool oofEnabled_ =
false,
6273 int topN_ = DEFAULT_TOP_N)
6276 if (_body.get().getDefaultMaxDepth())
6278 result.
maxDepth(_body.get().getDefaultMaxDepth());
6280 result.setSubscription(_body.get().sowAndSubscribe(
6282 topic_, timeout_, filter_,
6283 batchSize_, oofEnabled_,
6308 const std::string& filter_ =
"",
6309 int batchSize_ = DEFAULT_BATCH_SIZE,
6310 bool oofEnabled_ =
false,
6311 int topN_ = DEFAULT_TOP_N)
6314 if (_body.get().getDefaultMaxDepth())
6316 result.
maxDepth(_body.get().getDefaultMaxDepth());
6318 result.setSubscription(_body.get().sowAndSubscribe(
6320 topic_, timeout_, filter_,
6321 batchSize_, oofEnabled_,
6355 const std::string& topic_,
6356 const std::string& filter_ =
"",
6357 const std::string& orderBy_ =
"",
6358 const std::string& bookmark_ =
"",
6359 int batchSize_ = DEFAULT_BATCH_SIZE,
6360 int topN_ = DEFAULT_TOP_N,
6361 const std::string& options_ =
"",
6362 long timeout_ = DEFAULT_COMMAND_TIMEOUT)
6364 return _body.get().sowAndSubscribe(messageHandler_, topic_, filter_,
6365 orderBy_, bookmark_, batchSize_,
6366 topN_, options_, timeout_);
6394 const std::string& filter_ =
"",
6395 const std::string& orderBy_ =
"",
6396 const std::string& bookmark_ =
"",
6397 int batchSize_ = DEFAULT_BATCH_SIZE,
6398 int topN_ = DEFAULT_TOP_N,
6399 const std::string& options_ =
"",
6400 long timeout_ = DEFAULT_COMMAND_TIMEOUT)
6403 if (_body.get().getDefaultMaxDepth())
6405 result.
maxDepth(_body.get().getDefaultMaxDepth());
6407 result.setSubscription(_body.get().sowAndSubscribe(
6409 topic_, filter_, orderBy_,
6410 bookmark_, batchSize_, topN_,
6411 options_, timeout_,
false));
6417 const std::string& filter_ =
"",
6418 const std::string& orderBy_ =
"",
6419 const std::string& bookmark_ =
"",
6420 int batchSize_ = DEFAULT_BATCH_SIZE,
6421 int topN_ = DEFAULT_TOP_N,
6422 const std::string& options_ =
"",
6423 long timeout_ = DEFAULT_COMMAND_TIMEOUT)
6426 if (_body.get().getDefaultMaxDepth())
6428 result.
maxDepth(_body.get().getDefaultMaxDepth());
6430 result.setSubscription(_body.get().sowAndSubscribe(
6432 topic_, filter_, orderBy_,
6433 bookmark_, batchSize_, topN_,
6434 options_, timeout_,
false));
6463 const std::string& topic_,
6464 const std::string& filter_ =
"",
6465 const std::string& orderBy_ =
"",
6466 int batchSize_ = DEFAULT_BATCH_SIZE,
6467 int topN_ = DEFAULT_TOP_N,
6468 const std::string& options_ =
"",
6469 long timeout_ = DEFAULT_COMMAND_TIMEOUT)
6471 return _body.get().sowAndDeltaSubscribe(messageHandler_, topic_,
6472 filter_, orderBy_, batchSize_,
6473 topN_, options_, timeout_);
6496 const std::string& filter_ =
"",
6497 const std::string& orderBy_ =
"",
6498 int batchSize_ = DEFAULT_BATCH_SIZE,
6499 int topN_ = DEFAULT_TOP_N,
6500 const std::string& options_ =
"",
6501 long timeout_ = DEFAULT_COMMAND_TIMEOUT)
6504 if (_body.get().getDefaultMaxDepth())
6506 result.
maxDepth(_body.get().getDefaultMaxDepth());
6508 result.setSubscription(_body.get().sowAndDeltaSubscribe(
6510 topic_, filter_, orderBy_,
6511 batchSize_, topN_, options_,
6518 const std::string& filter_ =
"",
6519 const std::string& orderBy_ =
"",
6520 int batchSize_ = DEFAULT_BATCH_SIZE,
6521 int topN_ = DEFAULT_TOP_N,
6522 const std::string& options_ =
"",
6523 long timeout_ = DEFAULT_COMMAND_TIMEOUT)
6526 if (_body.get().getDefaultMaxDepth())
6528 result.
maxDepth(_body.get().getDefaultMaxDepth());
6530 result.setSubscription(_body.get().sowAndDeltaSubscribe(
6532 topic_, filter_, orderBy_,
6533 batchSize_, topN_, options_,
6563 const std::string& topic_,
6565 const std::string& filter_ =
"",
6566 int batchSize_ = DEFAULT_BATCH_SIZE,
6567 bool oofEnabled_ =
false,
6568 bool sendEmpties_ =
false,
6569 int topN_ = DEFAULT_TOP_N)
6571 return _body.get().sowAndDeltaSubscribe(messageHandler_, topic_,
6572 timeout_, filter_, batchSize_,
6573 oofEnabled_, sendEmpties_,
6600 const std::string& filter_ =
"",
6601 int batchSize_ = DEFAULT_BATCH_SIZE,
6602 bool oofEnabled_ =
false,
6603 bool sendEmpties_ =
false,
6604 int topN_ = DEFAULT_TOP_N)
6607 if (_body.get().getDefaultMaxDepth())
6609 result.
maxDepth(_body.get().getDefaultMaxDepth());
6611 result.setSubscription(_body.get().sowAndDeltaSubscribe(
6613 topic_, timeout_, filter_,
6614 batchSize_, oofEnabled_,
6615 sendEmpties_, topN_,
false));
6641 const std::string& filter_ =
"",
6642 int batchSize_ = DEFAULT_BATCH_SIZE,
6643 bool oofEnabled_ =
false,
6644 bool sendEmpties_ =
false,
6645 int topN_ = DEFAULT_TOP_N)
6648 if (_body.get().getDefaultMaxDepth())
6650 result.
maxDepth(_body.get().getDefaultMaxDepth());
6652 result.setSubscription(_body.get().sowAndDeltaSubscribe(
6654 topic_, timeout_, filter_,
6655 batchSize_, oofEnabled_,
6656 sendEmpties_, topN_,
false));
6679 const std::string& topic,
6680 const std::string& filter,
6683 return _body.get().sowDelete(messageHandler, topic, filter, timeout);
6711 stream.setStatsOnly(cid);
6712 _body.get().sowDelete(stream.operator
MessageHandler(), topic, filter, timeout, cid);
6713 return *(stream.
begin());
6715 catch (
const DisconnectedException&)
6717 removeMessageHandler(cid);
6728 _body.get().startTimer();
6739 return _body.get().stopTimer(messageHandler);
6764 const std::string& topic_,
6765 const std::string& keys_,
6768 return _body.get().sowDeleteByKeys(messageHandler_, topic_, keys_, timeout_);
6800 stream.setStatsOnly(cid);
6801 _body.get().sowDeleteByKeys(stream.operator
MessageHandler(), topic_, keys_, timeout_, cid);
6802 return *(stream.
begin());
6804 catch (
const DisconnectedException&)
6806 removeMessageHandler(cid);
6826 const std::string& topic_,
const std::string& data_,
6829 return _body.get().sowDeleteByData(messageHandler_, topic_, data_, timeout_);
6856 stream.setStatsOnly(cid);
6857 _body.get().sowDeleteByData(stream.operator
MessageHandler(), topic_, data_, timeout_, cid);
6858 return *(stream.
begin());
6860 catch (
const DisconnectedException&)
6862 removeMessageHandler(cid);
6872 return _body.get().getHandle();
6885 _body.get().setExceptionListener(pListener_);
6898 _body.get().setExceptionListener(listener_);
6905 return _body.get().getExceptionListener();
6931 _body.get().setHeartbeat(heartbeatTime_, readTimeout_);
6955 _body.get().setHeartbeat(heartbeatTime_, 2 * heartbeatTime_);
6961 setLastChanceMessageHandler(messageHandler);
6968 _body.get().setGlobalCommandTypeMessageHandler(ClientImpl::GlobalCommandTypeHandlers::LastChance,
6994 _body.get().setGlobalCommandTypeMessageHandler(command_, handler_);
7019 _body.get().setGlobalCommandTypeMessageHandler(command_, handler_);
7102 _body.get().addConnectionStateListener(listener);
7110 _body.get().removeConnectionStateListener(listener);
7117 _body.get().clearConnectionStateListeners();
7147 return _body.get().executeAsync(command_, handler_);
7185 if (command_.isSubscribe())
7187 Message& message = command_.getMessage();
7190 if (useExistingHandler)
7193 if (_body.get()._routes.getRoute(subId, existingHandler))
7196 _body.get().executeAsync(command_, existingHandler,
false);
7201 id = _body.get().executeAsync(command_, handler_,
false);
7203 catch (
const DisconnectedException&)
7205 removeMessageHandler(command_.getMessage().
getCommandId());
7206 if (command_.isSubscribe())
7210 if (command_.isSow())
7212 removeMessageHandler(command_.getMessage().
getQueryID());
7243 _body.get().ack(topic_, bookmark_, options_);
7265 void ack(
const std::string& topic_,
const std::string& bookmark_,
7266 const char* options_ = NULL)
7268 _body.get().ack(
Field(topic_.data(), topic_.length()),
Field(bookmark_.data(), bookmark_.length()), options_);
7276 void ackDeferredAutoAck(
Field& topic_,
Field& bookmark_,
const char* options_ = NULL)
7278 _body.get()._ack(topic_, bookmark_, options_);
7291 _body.get().flushAcks();
7300 return _body.get().getAutoAck();
7310 _body.get().setAutoAck(isAutoAckEnabled_);
7318 return _body.get().getAckBatchSize();
7328 _body.get().setAckBatchSize(ackBatchSize_);
7339 return _body.get().getAckTimeout();
7351 if (!ackTimeout_ && _body.get().getAckBatchSize() > 1)
7353 throw UsageException(
"Ack timeout must be > 0 when ack batch size > 1");
7355 _body.get().setAckTimeout(ackTimeout_);
7369 _body.get().setRetryOnDisconnect(isRetryOnDisconnect_);
7378 return _body.get().getRetryOnDisconnect();
7387 _body.get().setDefaultMaxDepth(maxDepth_);
7396 return _body.get().getDefaultMaxDepth();
7408 return _body.get().setTransportFilterFunction(filter_, userData_);
7422 return _body.get().setThreadCreatedCallback(callback_, userData_);
7430 void deferredExecution(DeferredExecutionFunc func_,
void* userData_)
7432 _body.get().deferredExecution(func_, userData_);
7442 AMPS_CALL_EXCEPTION_WRAPPER(_globalCommandTypeHandlers[GlobalCommandTypeHandlers::LastChance].invoke(message));
7448 unsigned deliveries = 0;
7460 const char* data = NULL;
7462 const char* status = NULL;
7463 size_t statusLen = 0;
7465 const size_t NotEntitled = 12, Duplicate = 9, Failure = 7;
7468 if (len == NotEntitled || len == Duplicate ||
7469 (statusLen == Failure && status[0] ==
'f'))
7471 if (_failedWriteHandler)
7473 if (_publishStore.isValid())
7475 amps_uint64_t sequence =
7477 FailedWriteStoreReplayer replayer(
this, data, len);
7478 AMPS_CALL_EXCEPTION_WRAPPER(_publishStore.replaySingle(
7479 replayer, sequence));
7485 AMPS_CALL_EXCEPTION_WRAPPER(
7486 _failedWriteHandler->failedWrite(emptyMessage,
7492 if (_publishStore.isValid())
7501 AMPS_CALL_EXCEPTION_WRAPPER(_publishStore.discardUpTo(seq));
7505 if (!deliveries && _bookmarkStore.isValid())
7512 const char* bookmarkData = NULL;
7513 size_t bookmarkLen = 0;
7519 if (bookmarkLen > 0 && _routes.hasRoute(subId))
7522 _bookmarkStore.persisted(subId,
Message::Field(bookmarkData, bookmarkLen));
7527 catch (std::exception& ex)
7529 AMPS_UNHANDLED_EXCEPTION(ex);
7535 ClientImpl::processedAck(
Message& message)
7537 unsigned deliveries = 0;
7539 const char* data = NULL;
7543 Lock<Mutex> l(_lock);
7546 Lock<Mutex> guard(_ackMapLock);
7547 AckMap::iterator i = _ackMap.find(std::string(data, len));
7548 if (i != _ackMap.end())
7558 ack.setStatus(data, len);
7560 ack.setReason(data, len);
7562 ack.setUsername(data, len);
7564 ack.setPassword(data, len);
7566 ack.setServerVersion(data, len);
7568 ack.setOptions(data, len);
7578 ClientImpl::checkAndSendHeartbeat(
bool force)
7580 if (force || _heartbeatTimer.check())
7582 _heartbeatTimer.start();
7585 sendWithoutRetry(_beatMessage);
7587 catch (
const AMPSException&)
// Builds a ConnectionInfo (a std::map<std::string, std::string>) describing
// this client: its last URI, name and username, plus the publish store's
// unpersisted count when a publish store is set.
// NOTE(review): the rest of this definition is not visible in this excerpt.
7594 inline ConnectionInfo ClientImpl::getConnectionInfo()
const 7596 ConnectionInfo info;
// Scratch stream used below to turn the unpersisted count into text.
7597 std::ostringstream writer;
// Identity of the connection as recorded on this object.
7599 info[
"client.uri"] = _lastUri;
7600 info[
"client.name"] = _name;
7601 info[
"client.username"] = _username;
// The unpersisted count is only reported when a publish store is valid.
7602 if (_publishStore.isValid())
7604 writer << _publishStore.unpersistedCount();
7605 info[
"publishStore.unpersistedCount"] = writer.str();
7614 ClientImpl::ClientImplMessageHandler(
amps_handle messageHandle_,
void* userData_)
7616 const unsigned SOWMask = Message::Command::SOW | Message::Command::GroupBegin | Message::Command::GroupEnd;
7617 const unsigned PublishMask = Message::Command::OOF | Message::Command::Publish | Message::Command::DeltaPublish;
7618 ClientImpl* me = (ClientImpl*) userData_;
7619 AMPS_CALL_EXCEPTION_WRAPPER_2(me, me->processDeferredExecutions());
7620 if (!messageHandle_)
7622 if (me->_queueAckTimeout)
7624 me->checkQueueAcks();
7626 me->checkAndSendHeartbeat();
7630 me->_readMessage.replace(messageHandle_);
7631 Message& message = me->_readMessage;
7633 if (commandType & SOWMask)
7635 #if 0 // Not currently implemented, to avoid an extra branch in delivery 7639 AMPS_CALL_EXCEPTION_WRAPPER_2(me,
7640 me->_globalCommandTypeHandlers[1 + (commandType / 8192)].invoke(message));
7642 AMPS_CALL_EXCEPTION_WRAPPER_2(me, me->_routes.deliverData(message,
7645 else if (commandType & PublishMask)
7647 #if 0 // Not currently implemented, to avoid an extra branch in delivery 7648 AMPS_CALL_EXCEPTION_WRAPPER_2(me,
7649 me->_globalCommandTypeHandlers[(commandType == Message::Command::Publish ?
7650 GlobalCommandTypeHandlers::Publish :
7651 GlobalCommandTypeHandlers::OOF)].invoke(message));
7653 const char* subIds = NULL;
7654 size_t subIdsLen = 0;
7657 &subIds, &subIdsLen);
7658 size_t subIdCount = me->_routes.parseRoutes(
AMPS::Field(subIds, subIdsLen), me->_routeCache);
7659 for (
size_t i = 0; i < subIdCount; ++i)
7661 MessageRouter::RouteCache::value_type& lookupResult = me->_routeCache[i];
7663 if (handler.isValid())
7666 AMPS_SubscriptionId,
7667 subIds + lookupResult.idOffset,
7668 lookupResult.idLength);
7671 bool isAutoAck = me->_isAutoAckEnabled;
7673 if (!isMessageQueue && !bookmark.
empty() &&
7674 me->_bookmarkStore.isValid())
7676 if (me->_bookmarkStore.isDiscarded(me->_readMessage))
7679 if (me->_globalCommandTypeHandlers[GlobalCommandTypeHandlers::DuplicateMessage].isValid())
7681 AMPS_CALL_EXCEPTION_WRAPPER_2(me, me->_globalCommandTypeHandlers[GlobalCommandTypeHandlers::DuplicateMessage].invoke(message));
7686 me->_bookmarkStore.log(me->_readMessage);
7687 AMPS_CALL_EXCEPTION_WRAPPER_2(me,
7688 handler.invoke(message));
7693 if (isMessageQueue && isAutoAck)
7697 AMPS_CALL_EXCEPTION_WRAPPER_STREAM_FULL_2(me, handler.invoke(message));
7698 if (!message.getIgnoreAutoAck())
7700 AMPS_CALL_EXCEPTION_WRAPPER_2(me,
7704 catch (std::exception& ex)
7706 if (!message.getIgnoreAutoAck())
7708 AMPS_CALL_EXCEPTION_WRAPPER_2(me,
7711 AMPS_UNHANDLED_EXCEPTION_2(me, ex);
7716 AMPS_CALL_EXCEPTION_WRAPPER_2(me,
7717 handler.invoke(message));
7723 me->lastChance(message);
7727 else if (commandType == Message::Command::Ack)
7729 AMPS_CALL_EXCEPTION_WRAPPER_2(me,
7730 me->_globalCommandTypeHandlers[GlobalCommandTypeHandlers::Ack].invoke(message));
7732 unsigned deliveries = 0U;
7735 case Message::AckType::Persisted:
7736 deliveries += me->persistedAck(message);
7738 case Message::AckType::Processed:
7739 deliveries += me->processedAck(message);
7742 AMPS_CALL_EXCEPTION_WRAPPER_2(me, deliveries += me->_routes.deliverAck(message, ackType));
7743 if (deliveries == 0)
7745 me->lastChance(message);
7748 else if (commandType == Message::Command::Heartbeat)
7750 AMPS_CALL_EXCEPTION_WRAPPER_2(me,
7751 me->_globalCommandTypeHandlers[GlobalCommandTypeHandlers::Heartbeat].invoke(message));
7752 if (me->_heartbeatTimer.getTimeout() != 0.0)
7754 me->checkAndSendHeartbeat(
true);
7758 me->lastChance(message);
7764 unsigned deliveries = 0U;
7767 while (me->_connected)
7771 deliveries = me->_routes.deliverData(message, message.
getCommandId());
7775 catch (MessageStreamFullException&)
7777 catch (MessageStreamFullException& ex_)
7782 me->checkAndSendHeartbeat(
false);
7785 catch (std::exception&)
7787 catch (std::exception& ex_)
7795 catch (std::exception& ex_)
7799 me->_exceptionListener->exceptionThrown(ex_);
7806 if (deliveries == 0)
7808 me->lastChance(message);
7811 me->checkAndSendHeartbeat();
// Static callback taking a handle (unused, unnamed), the version of the
// connection that failed, and an opaque user-data pointer.
// NOTE(review): presumably invoked by the transport before the disconnect
// handler runs - confirm against the registration site, which is not visible
// in this excerpt.
7816 ClientImpl::ClientImplPreDisconnectHandler(
amps_handle ,
unsigned failedConnectionVersion,
void* userData)
// userData is the ClientImpl this callback is acting for.
7818 ClientImpl* me = (ClientImpl*) userData;
// Clear acks for the connection version that failed.
7821 me->clearAcks(failedConnectionVersion);
// Static callback taking a handle (unused, unnamed) and an opaque user-data
// pointer that is the ClientImpl to act on. In the visible code it: announces
// the Disconnected state, marks the client as not connected, invokes the
// registered disconnect handler, and then either announces Shutdown (still
// not connected) or asks the subscription manager to resubscribe.
// NOTE(review): brace/try lines are missing from this excerpt, so the exact
// nesting of the statements below cannot be confirmed here.
7825 ClientImpl::ClientImplDisconnectHandler(
amps_handle ,
void* userData)
7827 ClientImpl* me = (ClientImpl*) userData;
// Hold the client's lock for the duration of this handler.
7828 Lock<Mutex> l(me->_lock);
// Client facade over this impl, constructed with isRef == false; it is the
// argument later given to the disconnect handler and subscription manager.
7829 Client wrapper(me,
false);
7832 me->broadcastConnectionStateChanged(ConnectionStateListener::Disconnected);
// NOTE(review): presumably flips _badTimeToHASubscribe for the lifetime of
// subFlip - confirm against AtomicFlagFlip.
7836 AtomicFlagFlip subFlip(&me->_badTimeToHASubscribe);
7839 me->_connected =
false;
// NOTE(review): Unlock presumably releases _lock while the user-supplied
// disconnect handler runs - confirm against Unlock<>.
7843 Unlock<Mutex> unlock(me->_lock);
7844 me->_disconnectHandler.invoke(wrapper);
// Exceptions from the disconnect handler are reported through the unhandled
// exception macro rather than propagated.
7847 catch (
const std::exception& ex)
7849 AMPS_UNHANDLED_EXCEPTION_2(me, ex);
// Wake every waiter on the client's lock.
7851 me->_lock.signalAll();
// Still not connected after the handler ran: announce Shutdown and report a
// DisconnectedException.
7853 if (!me->_connected)
7855 me->broadcastConnectionStateChanged(ConnectionStateListener::Shutdown);
7856 AMPS_UNHANDLED_EXCEPTION_2(me, DisconnectedException(
"Reconnect failed."));
// With a subscription manager set, ask it to resubscribe through the wrapper
// and then announce the Resubscribed state.
7862 if (me->_subscriptionManager)
7867 Unlock<Mutex> unlock(me->_lock);
7868 me->_subscriptionManager->resubscribe(wrapper);
7870 me->broadcastConnectionStateChanged(ConnectionStateListener::Resubscribed);
// Resubscribe failures are likewise reported, not propagated.
7874 catch (
const AMPSException& subEx)
7876 AMPS_UNHANDLED_EXCEPTION_2(me, subEx);
7878 catch (
const std::exception& subEx)
7880 AMPS_UNHANDLED_EXCEPTION_2(me, subEx);
7903 iterator(
const char* data_,
size_t len_,
size_t pos_,
char fieldSep_)
7904 : _data(data_), _len(len_), _pos(pos_), _fieldSep(fieldSep_)
7906 while (_pos != _len && _data[_pos] == _fieldSep)
7912 typedef void* difference_type;
7913 typedef std::forward_iterator_tag iterator_category;
7914 typedef std::pair<Message::Field, Message::Field> value_type;
7915 typedef value_type* pointer;
7916 typedef value_type& reference;
7917 bool operator==(
const iterator& rhs)
const 7919 return _pos == rhs._pos;
7921 bool operator!=(
const iterator& rhs)
const 7923 return _pos != rhs._pos;
7925 iterator& operator++()
7928 while (_pos != _len && _data[_pos] != _fieldSep)
7933 while (_pos != _len && _data[_pos] == _fieldSep)
7940 value_type operator*()
const 7943 size_t i = _pos, keyLength = 0, valueStart = 0, valueLength = 0;
7944 for (; i < _len && _data[i] !=
'='; ++i)
7949 result.first.assign(_data + _pos, keyLength);
7951 if (i < _len && _data[i] ==
'=')
7955 for (; i < _len && _data[i] != _fieldSep; ++i)
7960 result.second.assign(_data + valueStart, valueLength);
7966 class reverse_iterator
7973 typedef std::pair<Message::Field, Message::Field> value_type;
7974 reverse_iterator(
const char* data,
size_t len,
const char* pos,
char fieldsep)
7975 : _data(data), _len(len), _pos(pos), _fieldSep(fieldsep)
7980 while (_pos >= _data && *_pos == _fieldSep)
7984 while (_pos > _data && *_pos != _fieldSep)
7991 if (_pos > _data || (_pos == _data && *_pos == _fieldSep))
8001 bool operator==(
const reverse_iterator& rhs)
const 8003 return _pos == rhs._pos;
8005 bool operator!=(
const reverse_iterator& rhs)
const 8007 return _pos != rhs._pos;
8009 reverse_iterator& operator++()
8020 while (_pos >= _data && *_pos == _fieldSep)
8025 while (_pos > _data && *_pos != _fieldSep)
8029 if (_pos > _data || (_pos == _data && *_pos == _fieldSep))
8040 value_type operator*()
const 8043 size_t keyLength = 0, valueStart = 0, valueLength = 0;
8044 size_t i = (size_t)(_pos - _data);
8045 for (; i < _len && _data[i] !=
'='; ++i)
8049 result.first.assign(_pos, keyLength);
8050 if (i < _len && _data[i] ==
'=')
8054 for (; i < _len && _data[i] != _fieldSep; ++i)
8059 result.second.assign(_data + valueStart, valueLength);
8064 : _data(data.
data()), _len(data.
len()),
8065 _fieldSep(fieldSeparator)
8069 FIX(
const char* data,
size_t len,
char fieldSeparator = 1)
8070 : _data(data), _len(len), _fieldSep(fieldSeparator)
8074 iterator begin()
const 8076 return iterator(_data, _len, 0, _fieldSep);
8078 iterator end()
const 8080 return iterator(_data, _len, _len, _fieldSep);
8084 reverse_iterator rbegin()
const 8086 return reverse_iterator(_data, _len, _data + (_len - 1), _fieldSep);
8089 reverse_iterator rend()
const 8091 return reverse_iterator(_data, _len, 0, _fieldSep);
8112 std::stringstream _data;
8129 void append(
const T& tag,
const char* value,
size_t offset,
size_t length)
8131 _data << tag <<
'=';
8132 _data.write(value + offset, (std::streamsize)length);
8140 void append(
const T& tag,
const std::string& value)
8142 _data << tag <<
'=' << value << _fs;
8151 operator std::string()
const 8159 _data.str(std::string());
8196 typedef std::map<Message::Field, Message::Field>
map_type;
8207 for (FIX::iterator a = fix.begin(); a != fix.end(); ++a)
8216 #define AMPS_MESSAGE_STREAM_CACHE_MAX 128 8220 std::deque<Message> _q;
8221 std::deque<Message> _cache;
8222 std::string _commandId;
8224 std::string _queryId;
8228 unsigned _requestedAcks;
8232 typedef enum :
unsigned int { Unset = 0x0, Running = 0x10, Subscribe = 0x11, SOWOnly = 0x12, AcksOnly = 0x13, Conflate = 0x14, Closed = 0x1, Disconnected = 0x2 } State;
8233 #if __cplusplus >= 201100L || _MSC_VER >= 1900 8234 std::atomic<State> _state;
8236 volatile State _state;
8238 typedef std::map<std::string, Message*> SOWKeyMap;
8239 SOWKeyMap _sowKeyMap;
8241 MessageStreamImpl(
const Client& client_)
8244 _maxDepth((
unsigned)~0),
8246 _cacheMax(AMPS_MESSAGE_STREAM_CACHE_MAX),
8249 if (_client.isValid())
8255 MessageStreamImpl(ClientImpl* client_)
8258 _maxDepth((
unsigned)~0),
8262 if (_client.isValid())
8268 ~MessageStreamImpl()
8272 virtual void destroy()
8278 catch (std::exception& e)
8282 if (_client.isValid())
8289 if (_client.isValid())
8293 _client = Client((ClientImpl*)NULL);
8294 c.deferredExecution(MessageStreamImpl::destroyer,
this);
8302 static void destroyer(
void* vpMessageStreamImpl_)
8304 delete ((MessageStreamImpl*)vpMessageStreamImpl_);
8307 void setSubscription(
const std::string& subId_,
8308 const std::string& commandId_ =
"",
8309 const std::string& queryId_ =
"")
8311 Lock<Mutex> lock(_lock);
8313 if (!commandId_.empty() && commandId_ != subId_)
8315 _commandId = commandId_;
8317 if (!queryId_.empty() && queryId_ != subId_ && queryId_ != commandId_)
8319 _queryId = queryId_;
8322 if (Disconnected == _state)
8326 assert(Unset == _state);
8330 void setSOWOnly(
const std::string& commandId_,
8331 const std::string& queryId_ =
"")
8333 Lock<Mutex> lock(_lock);
8334 _commandId = commandId_;
8335 if (!queryId_.empty() && queryId_ != commandId_)
8337 _queryId = queryId_;
8340 if (Disconnected == _state)
8344 assert(Unset == _state);
8348 void setStatsOnly(
const std::string& commandId_,
8349 const std::string& queryId_ =
"")
8351 Lock<Mutex> lock(_lock);
8352 _commandId = commandId_;
8353 if (!queryId_.empty() && queryId_ != commandId_)
8355 _queryId = queryId_;
8358 if (Disconnected == _state)
8362 assert(Unset == _state);
8364 _requestedAcks = Message::AckType::Stats;
8367 void setAcksOnly(
const std::string& commandId_,
unsigned acks_)
8369 Lock<Mutex> lock(_lock);
8370 _commandId = commandId_;
8372 if (Disconnected == _state)
8376 assert(Unset == _state);
8378 _requestedAcks = acks_;
8383 Lock<Mutex> lock(_lock);
8384 if (state_ == AMPS::ConnectionStateListener::Disconnected)
8386 _state = Disconnected;
8392 void timeout(
unsigned timeout_)
8394 _timeout = timeout_;
8398 if (_state == Subscribe)
8403 void maxDepth(
unsigned maxDepth_)
8407 _maxDepth = maxDepth_;
8411 _maxDepth = (unsigned)~0;
// Const accessors. getDepth() returns the current size of _q cast to unsigned.
// The body of getMaxDepth() is not visible in this listing.
8414 unsigned getMaxDepth(
void)
const 8418 unsigned getDepth(
void)
const 8420 return (
unsigned)(_q.size());
// NOTE(review): the function header is not visible; presumably this is the body
// of next(current_), which iterator::advance() calls -- confirm.
// All work happens with _lock held, except where Unlock is used below.
//
// Step 1: when a previous topic and bookmark are recorded and the client is
// valid, ask the client to ack them via ackDeferredAutoAck(). AMPSException is
// caught; on the visible paths current_ is invalidated and the recorded
// topic/bookmark are cleared.
8425 Lock<Mutex> lock(_lock);
8426 if (!_previousTopic.
empty() && !_previousBookmark.
empty())
8430 if (_client.isValid())
8432 _client.ackDeferredAutoAck(_previousTopic, _previousBookmark);
8436 catch (AMPSException&)
8438 catch (AMPSException& e)
8441 current_.invalidate();
8442 _previousTopic.
clear();
8443 _previousBookmark.
clear();
8446 _previousTopic.
clear();
8447 _previousBookmark.
clear();
// Step 2: wait for a message. Each wait slice is capped at 1000 (same unit as
// _timeout; the unit is not shown here). The loop runs while the queue is empty
// and _state has a Running bit set. The lock is released around
// amps_invoke_waiting_function(), and the timer supplies the remaining time.
8450 long minWaitTime = (_timeout && _timeout < 1000) ? _timeout : 1000;
8451 Timer timer((
double)_timeout);
8453 while (_q.empty() && _state & Running)
8456 _lock.wait(minWaitTime);
8458 Unlock<Mutex> unlck(_lock);
8459 amps_invoke_waiting_function();
8464 if (timer.checkAndGetRemaining(&minWaitTime))
8470 minWaitTime = (minWaitTime < 1000) ? minWaitTime : 1000;
// Step 3: keep the outgoing current_ in _cache for reuse while the cache has
// room, then take the front of the queue as the new current_.
8473 if (current_.isValid() && _cache.size() < _cacheMax)
8476 _cache.push_back(current_);
8480 current_ = _q.front();
// The action taken when the queue was exactly at _maxDepth is not visible.
8481 if (_q.size() == _maxDepth)
// In Conflate state the message's SOW key is removed from _sowKeyMap.
8486 if (_state == Conflate)
8488 std::string sowKey = current_.
getSowKey();
8489 if (sowKey.length())
8491 _sowKeyMap.erase(sowKey);
8494 else if (_state == AcksOnly)
8498 if ((_state == AcksOnly && _requestedAcks == 0) ||
8499 (_state == SOWOnly && current_.
getCommand() ==
"group_end"))
8503 else if (current_.
getCommandEnum() == Message::Command::Publish &&
// Step 4: no message was taken. A Disconnected stream throws; otherwise
// current_ is invalidated and the visible return value is true only when a
// non-zero timeout is configured.
8513 if (_state == Disconnected)
8515 throw DisconnectedException(
"Connection closed.");
8517 current_.invalidate();
8518 if (_state == Closed)
8522 return _timeout != 0;
// NOTE(review): the function header and every branch body are missing from this
// listing; presumably this is close(), releasing whatever is registered under
// the command, subscription and query ids -- confirm against the full source.
// Visible structure: with a valid client, one path is taken for the SOWOnly and
// Subscribe states and another otherwise; each path checks _commandId, _subId
// and _queryId for non-empty values. A final check covers the SOWOnly,
// Subscribe and Unset states.
8526 if (_client.isValid())
8528 if (_state == SOWOnly || _state == Subscribe)
8530 if (!_commandId.empty())
8534 if (!_subId.empty())
8538 if (!_queryId.empty())
8545 if (!_commandId.empty())
8549 if (!_subId.empty())
8553 if (!_queryId.empty())
8559 if (_state == SOWOnly || _state == Subscribe || _state == Unset)
// Static handler that enqueues an incoming message on the stream this_.
// It takes this_->_lock first. The same enqueue sequence appears three times:
//   1. state is not Conflate;
//   2. Conflate, and the message has a SOW key;
//   3. Conflate, and the message has no SOW key.
// Enqueue sequence: when the queue has reached _maxDepth, wake all waiters and
// throw MessageStreamFullException("Stream is currently full."). Otherwise
// reuse a Message from _cache when one is available (deep-copying message_ into
// it), or push a fresh deep copy.
// NOTE(review): the three copies are a candidate for a shared helper; not
// attempted here because the surrounding lines are missing.
8564 static void _messageHandler(
const Message& message_, MessageStreamImpl* this_)
8566 Lock<Mutex> lock(this_->_lock);
// Path 1: not conflating.
8567 if (this_->_state != Conflate)
8569 AMPS_TESTING_SLOW_MESSAGE_STREAM
8570 if (this_->_q.size() >= this_->_maxDepth)
8575 this_->_lock.signalAll();
8576 throw MessageStreamFullException(
"Stream is currently full.");
8578 if (!this_->_cache.empty())
8580 this_->_cache.front().deepCopy(message_);
8581 this_->_q.push_back(this_->_cache.front());
8582 this_->_cache.pop_front();
8586 #ifdef AMPS_USE_EMPLACE 8587 this_->_q.emplace_back(message_.
deepCopy());
8589 this_->_q.push_back(message_.
deepCopy());
8593 this_->_client.isValid() && this_->_client.getAutoAck() &&
8597 message_.setIgnoreAutoAck();
// Path 2: conflating with a SOW key. When the key is already in _sowKeyMap the
// queued message it points to is overwritten in place; otherwise the message is
// enqueued and the map records the address of the new last element.
// NOTE(review): storing &_q.back() relies on _q keeping element addresses
// stable across later insertions and removals -- confirm _q's container type.
8602 std::string sowKey = message_.
getSowKey();
8603 if (sowKey.length())
8605 SOWKeyMap::iterator it = this_->_sowKeyMap.find(sowKey);
8606 if (it != this_->_sowKeyMap.end())
8608 it->second->deepCopy(message_);
8612 if (this_->_q.size() >= this_->_maxDepth)
8618 this_->_lock.signalAll();
8619 throw MessageStreamFullException(
"Stream is currently full.");
8621 if (!this_->_cache.empty())
8623 this_->_cache.front().deepCopy(message_);
8624 this_->_q.push_back(this_->_cache.front());
8625 this_->_cache.pop_front();
8629 #ifdef AMPS_USE_EMPLACE 8630 this_->_q.emplace_back(message_.
deepCopy());
8632 this_->_q.push_back(message_.
deepCopy());
8635 this_->_sowKeyMap[sowKey] = &(this_->_q.back());
// Path 3: conflating without a SOW key; plain enqueue as in path 1.
8640 if (this_->_q.size() >= this_->_maxDepth)
8645 this_->_lock.signalAll();
8646 throw MessageStreamFullException(
"Stream is currently full.");
8648 if (!this_->_cache.empty())
8650 this_->_cache.front().deepCopy(message_);
8651 this_->_q.push_back(this_->_cache.front());
8652 this_->_cache.pop_front();
8656 #ifdef AMPS_USE_EMPLACE 8657 this_->_q.emplace_back(message_.
deepCopy());
8659 this_->_q.push_back(message_.
deepCopy());
8663 this_->_client.isValid() && this_->_client.getAutoAck() &&
8667 message_.setIgnoreAutoAck();
// Wake every thread waiting on the stream's lock.
8671 this_->_lock.signalAll();
// Constructors. The client_ overload creates a new MessageStreamImpl bound to
// that client and holds it in _body. The body of the no-argument form is not
// visible in this listing.
8674 inline MessageStream::MessageStream(
void)
8677 inline MessageStream::MessageStream(
const Client& client_)
8678 : _body(
new MessageStreamImpl(client_))
// Moves the iterator forward by asking the stream implementation for the next
// message into _current. When next() returns false, _pStream is set to NULL.
8681 inline void MessageStream::iterator::advance(
void)
8683 _pStream = _pStream->_body->next(_current) ? _pStream : NULL;
// NOTE(review): the function header is not visible; presumably a conversion
// from MessageStream to MessageHandler -- confirm.
// Builds a MessageHandler whose function is MessageStreamImpl::_messageHandler
// (cast to the untyped-callback signature) and whose user data is the address
// of this stream's implementation object.
8687 return MessageHandler((
void(*)(
const Message&,
void*))MessageStreamImpl::_messageHandler, &_body.get());
// NOTE(review): the function header is not visible; presumably the body of
// MessageStream::fromExistingHandler(handler_) -- confirm.
// When handler_ wraps MessageStreamImpl::_messageHandler, its user data is
// taken to be the MessageStreamImpl and becomes the result's _body.
8692 if (handler_._func == (MessageHandler::FunctionType)MessageStreamImpl::_messageHandler)
8694 result._body = (MessageStreamImpl*)(handler_._userData);
// Forwards commandId_ and queryId_ unchanged to the implementation's setSOWOnly().
8699 inline void MessageStream::setSOWOnly(
const std::string& commandId_,
8700 const std::string& queryId_)
8702 _body->setSOWOnly(commandId_, queryId_);
// Forwards subId_, commandId_ and queryId_ unchanged to the implementation's
// setSubscription().
8704 inline void MessageStream::setSubscription(
const std::string& subId_,
8705 const std::string& commandId_,
8706 const std::string& queryId_)
8708 _body->setSubscription(subId_, commandId_, queryId_);
// Forwards commandId_ and queryId_ unchanged to the implementation's setStatsOnly().
8710 inline void MessageStream::setStatsOnly(
const std::string& commandId_,
8711 const std::string& queryId_)
8713 _body->setStatsOnly(commandId_, queryId_);
// Forwards commandId_ and acks_ unchanged to the implementation's setAcksOnly().
// The declaration of the acks_ parameter is missing from this listing.
8715 inline void MessageStream::setAcksOnly(
const std::string& commandId_,
8718 _body->setAcksOnly(commandId_, acks_);
// Bodies of two forwarding accessors whose headers are missing from this
// listing: each returns the implementation's getMaxDepth() or getDepth().
8737 return _body->getMaxDepth();
8741 return _body->getDepth();
// Returns, by value, the MessageStream held in _pEmptyMessageStream.
// NOTE(review): this dereferences the holder without a null check; the visible
// caller creates the stream before calling -- confirm for any other callers.
8744 inline MessageStream ClientImpl::getEmptyMessageStream(
void)
8746 return *(_pEmptyMessageStream.get());
// NOTE(review): the function header and several declarations (useExistingHandler,
// subId, existingHandler, command, ackTypes, stream, handler) are not visible;
// presumably this is the body of Client::execute(Command&) -- confirm.
8754 ClientImpl& body = _body.get();
8755 Message& message = command_.getMessage();
// Reuse path: when an existing handler is wanted and a route is already
// registered for subId, run the command against that handler and return a
// stream built from it.
8759 if (useExistingHandler)
8765 if (body._routes.getRoute(subId, existingHandler))
8768 body.executeAsync(command_, existingHandler,
false);
8769 return MessageStream::fromExistingHandler(existingHandler);
// Commands in the NoDataCommands mask whose ack types are exactly Persisted or
// None return the client's empty stream, which is created on first use and
// closed immediately.
8778 if ((command & Message::Command::NoDataCommands)
8779 && (ackTypes == Message::AckType::Persisted
8780 || ackTypes == Message::AckType::None))
8783 if (!body._pEmptyMessageStream)
8785 body._pEmptyMessageStream.reset(
new MessageStream((ClientImpl*)0));
8786 body._pEmptyMessageStream.get()->_body->close();
8788 return body.getEmptyMessageStream();
// Apply the client's default max depth when it is non-zero.
8791 if (body.getDefaultMaxDepth())
8793 stream.
maxDepth(body.getDefaultMaxDepth());
// Send the command, then configure the stream to match the kind of command.
8796 std::string commandID = body.executeAsync(command_, handler,
false);
8797 if (command_.hasStatsAck())
8799 stream.setStatsOnly(commandID, command_.getMessage().
getQueryId());
8801 else if (command_.isSow())
8805 stream.setAcksOnly(commandID,
8810 stream.setSOWOnly(commandID, command_.getMessage().
getQueryId());
8813 else if (command_.isSubscribe())
8815 stream.setSubscription(commandID,
// For publish, delta publish and SOW delete the Persisted bit is masked out of
// the acks given to the stream (the reason is not visible in this fragment).
// The remaining visible call passes ackTypes through unchanged.
8822 if (command == Message::Command::Publish ||
8823 command == Message::Command::DeltaPublish ||
8824 command == Message::Command::SOWDelete)
8826 stream.setAcksOnly(commandID,
8827 ackTypes & (
unsigned)~Message::AckType::Persisted);
8831 stream.setAcksOnly(commandID, ackTypes);
// Acknowledges this message through the client that owns it.
//   options_ - passed through unchanged to ClientImpl::ack().
// The ack is sent only when the message has a client, `bookmark` is non-empty,
// and the client's auto-ack setting is off.
// NOTE(review): the declaration of `bookmark` is missing from this listing;
// presumably it holds this message's bookmark field -- confirm.
8838 inline void Message::ack(
const char* options_)
const 8840 ClientImpl* pClient = _body.get().clientImpl();
8842 if (pClient && bookmark.
len() &&
8843 !pClient->getAutoAck())
8846 pClient->ack(getTopic(), bookmark, options_);
Command & setBookmark(const std::string &bookmark_)
Set the bookmark to be used by this command.
Definition: ampsplusplus.hpp:744
Command & setFilter(const char *filter_, size_t filterLen_)
Definition: ampsplusplus.hpp:695
Class to hold string versions of failure reasons.
Definition: ampsplusplus.hpp:150
Message & setData(const std::string &v_)
Sets the data portion of self.
Definition: Message.hpp:1476
Core type and function declarations for the AMPS C client.
Client(const std::string &clientName="")
Constructs a new client with a given client name.
Definition: ampsplusplus.hpp:5085
Field getUserId() const
Retrieves the value of the UserId header of the Message as a new Field.
Definition: Message.hpp:1453
std::string sowDeleteByKeys(const MessageHandler &messageHandler_, const std::string &topic_, const std::string &keys_, long timeout_=0)
Deletes messages that match SOW keys from a topic's SOW cache.
Definition: ampsplusplus.hpp:6763
std::string stopTimer(const MessageHandler &messageHandler)
Definition: ampsplusplus.hpp:6737
static const unsigned int IdentifierLength
The length of identifiers used for unique identification of commands and subscriptions.
Definition: Message.hpp:542
std::string getAckType() const
Definition: ampsplusplus.hpp:942
AMPSDLL void amps_client_set_disconnect_handler(amps_handle client, amps_handler disconnectHandler, void *userData)
Sets the disconnect handler function to be called when a disconnect occurs.
bool removeMessageHandler(const Field &commandId_)
Removes a MessageHandler for a given CommandId from self.
Definition: ampsplusplus.hpp:5314
Message & assignTopic(const std::string &v)
Sets the value of the Topic header for this Message.
Definition: Message.hpp:1449
Provides a convenient way of building messages in FIX format, typically referenced using the typedefs...
Definition: ampsplusplus.hpp:8110
void startTimer()
Definition: ampsplusplus.hpp:6726
MessageStream sowAndSubscribe(const std::string &topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, bool oofEnabled_=false, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic and initiates a new subscription on it.
Definition: ampsplusplus.hpp:6268
Abstract base class for storing published messages for an HA publisher client.
Definition: ampsplusplus.hpp:1086
MessageStream conflate(void)
Sets self to conflation mode, where a new update for a matching sow key will replace the previous one...
Definition: ampsplusplus.hpp:8725
Field getSequence() const
Retrieves the value of the Sequence header of the Message as a new Field.
Definition: Message.hpp:1422
std::string send(const MessageHandler &messageHandler, Message &message, int timeout=0)
Sends a Message to the connected AMPS server, performing only minimal validation and bypassing client...
Definition: ampsplusplus.hpp:5342
AMPSDLL amps_result amps_client_set_name(amps_handle handle, const amps_char *clientName)
Sets the name on an amps client object.
Message & setCorrelationId(const std::string &v)
Sets the value of the CorrelationId header for this Message.
Definition: Message.hpp:1304
Message & setQueryID(const std::string &v)
Sets the value of the QueryID header for this Message.
Definition: Message.hpp:1417
Command(const std::string &command_)
Creates an object to represent the given AMPS command, such as "sow" or "subscribe".
Definition: ampsplusplus.hpp:556
Command::Type getCommandEnum() const
Decode self's "command" field and return one of the values from Command.
Definition: Message.hpp:1228
Command & setBookmark(const char *bookmark_, size_t bookmarkLen_)
Set the bookmark to be used by this command.
Definition: ampsplusplus.hpp:755
void setGlobalCommandTypeMessageHandler(const Message::Command::Type command_, const MessageHandler &handler_)
Sets a handler for all messages of a particular type: currently supported types are heartbeat message...
Definition: ampsplusplus.hpp:7017
Command & setAckType(unsigned ackType_)
Definition: ampsplusplus.hpp:920
void setServerVersion(size_t version_)
Internally used to set the server version so the store knows how to deal with persisted acks and call...
Definition: BookmarkStore.hpp:429
void setDefaultMaxDepth(unsigned maxDepth_)
Sets a default max depth on all subsequently created MessageStream objects.
Definition: ampsplusplus.hpp:7385
const amps_uint64_t getNameHashValue() const
Returns the numeric name hash of this client as generated by the server and returned when the client ...
Definition: ampsplusplus.hpp:5146
Field getSubscriptionId() const
Retrieves the value of the SubscriptionId header of the Message as a new Field.
Definition: Message.hpp:1427
AMPSDLL amps_result amps_client_connect(amps_handle handle, const amps_char *uri)
Connects to the AMPS server specified in uri.
static size_t convertVersionToNumber(const std::string &version_)
Converts a string version, such as "3.8.1.5" into the same numeric form used internally and returned ...
Definition: ampsplusplus.hpp:5202
void unsubscribe(const std::string &commandId)
Unsubscribe from a topic.
Definition: ampsplusplus.hpp:6057
Command & setOptions(const std::string &options_)
Sets the options string for this command: see Message.Options for a helper class for constructing the...
Definition: ampsplusplus.hpp:785
Message & setOrderBy(const std::string &v)
Sets the value of the OrderBy header for this Message.
Definition: Message.hpp:1415
Abstract base class for replaying a publish message.
Definition: ampsplusplus.hpp:1058
Command & setCommandId(const char *cmdId_, size_t cmdIdLen_)
Definition: ampsplusplus.hpp:669
void persisted(const Message::Field &subId_, const Message::Field &bookmark_)
Called internally to indicate messages up to and including bookmark are replicated to all replication...
Definition: BookmarkStore.hpp:405
unsigned getDefaultMaxDepth(void) const
Returns the default max depth for returned MessageStream objects.
Definition: ampsplusplus.hpp:7394
void ack(Message &message_, const char *options_=NULL)
Acknowledge a message queue message by supplying the message directly: this adds the ack to the curre...
Definition: ampsplusplus.hpp:7253
MessageStream bookmarkSubscribe(const std::string &topic_, long timeout_, const std::string &bookmark_, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Subscribe to a topic using a bookmark.
Definition: ampsplusplus.hpp:6008
AMPSDLL void amps_client_disconnect(amps_handle handle)
Disconnects from the AMPS server, if connected.
AMPSDLL AMPS_SOCKET amps_client_get_socket(amps_handle client)
Returns the socket from the underlying transport in client, or NULL if no transport is associated wit...
void discard(const Message::Field &subId_, size_t bookmarkSeqNo_)
Log a discard-bookmark entry to the persistent log based on a bookmark sequence number.
Definition: BookmarkStore.hpp:283
void setPublishStore(const Store &publishStore_)
Set the publish store to be used by the client.
Definition: ampsplusplus.hpp:5438
void setErrorOnPublishGap(bool errorOnPublishGap_)
Called to enable or disable throwing PublishStoreGapException.
Definition: ampsplusplus.hpp:1335
amps_uint64_t deltaPublish(const std::string &topic_, const std::string &data_, unsigned long expiration_)
Publish the changed fields of a message to an AMPS topic.
Definition: ampsplusplus.hpp:5703
static size_t convertVersionToNumber(const char *data_, size_t len_)
Converts a string version, such as "3.8.1.5" into the same numeric form used internally and returned ...
Definition: ampsplusplus.hpp:5217
Message & setFilter(const std::string &v)
Sets the value of the Filter header for this Message.
Definition: Message.hpp:1306
Command & setSubId(const char *subId_, size_t subIdLen_)
Definition: ampsplusplus.hpp:721
void setNoEmpties(void)
Set the option to not send empty messages on a delta subscription.
Definition: Message.hpp:841
void setDuplicateMessageHandler(const MessageHandler &duplicateMessageHandler_)
Sets a callback function that is invoked when a duplicate message is detected.
Definition: ampsplusplus.hpp:5454
Message deepCopy(void) const
Returns a deep copy of self.
Definition: Message.hpp:579
void removeConnectionStateListener(ConnectionStateListener *listener)
Attempts to remove listener from self's set of ConnectionStateListeners.
Definition: ampsplusplus.hpp:7108
void setOOF(void)
Set the option to receive out of focus (OOF) messages on a subscription, where applicable.
Definition: Message.hpp:824
int getAckTimeout(void) const
Returns the current value of the message queue ack timeout setting – that is, the amount of time aft...
Definition: ampsplusplus.hpp:7337
VersionInfo getServerVersionInfo() const
Returns the server version retrieved during logon.
Definition: ampsplusplus.hpp:5188
Message sowDeleteByKeys(const std::string &topic_, const std::string &keys_, long timeout_=0)
Deletes messages that match SOW keys from a topic's SOW cache.
Definition: ampsplusplus.hpp:6790
AMPSDLL amps_result amps_client_set_transport_filter_function(amps_handle client, amps_transport_filter_function filter, void *userData)
Sets a user-supplied callback function for filtering data before it is sent and after it is received...
Command(const char *command_, size_t commandLen_)
Creates an object to represent the given AMPS command, such as "sow" or "subscribe".
Definition: ampsplusplus.hpp:564
AMPSDLL amps_uint64_t amps_message_get_field_uint64(amps_handle message, FieldId field)
Gets the unsigned 64-bit int value of a header field in an AMPS message.
AMPSDLL amps_result amps_client_set_idle_time(amps_handle client, int idleTime)
Sets an idle-time (milliseconds).
Message encapsulates a single message sent to or received from an AMPS server, and provides methods f...
Definition: Message.hpp:531
Message & setAckTypeEnum(unsigned ackType_)
Encode self's "ack type" field from a bitmask of values from AckType.
Definition: Message.hpp:1183
amps_uint64_t getLastPersisted()
Get the last persisted message sequence in the store.
Definition: ampsplusplus.hpp:1306
std::string getString() const
Returns the current contents of this builder as a string.
Definition: ampsplusplus.hpp:8147
void setTransportFilterFunction(amps_transport_filter_function filter_, void *userData_)
Sets a filter function on the transport that is called with all raw data sent or received.
Definition: ampsplusplus.hpp:7405
amps_uint64_t publish(const char *topic_, size_t topicLength_, const char *data_, size_t dataLength_, unsigned long expiration_)
Publish a message to an AMPS topic, returning the sequence number assigned by the publish store if on...
Definition: ampsplusplus.hpp:5590
iterator begin(void)
Returns an iterator representing the beginning of the topic or subscription.
Definition: ampsplusplus.hpp:4989
Field getLeasePeriod() const
Retrieves the value of the LeasePeriod header of the Message as a new Field.
Definition: Message.hpp:1309
Command & setSowKeys(const char *sowKeys_, size_t sowKeysLen_)
Sets the SowKeys for the command.
Definition: ampsplusplus.hpp:656
static const char * NOW()
Convenience method for returning the special value to start a subscription at the end of the transact...
Definition: ampsplusplus.hpp:7036
Command & setAckType(const std::string &ackType_)
Definition: ampsplusplus.hpp:898
static const char * BOOKMARK_EPOCH()
Convenience method for returning the special value to start a subscription at the beginning of the tr...
Definition: ampsplusplus.hpp:7046
void send(const Message &message)
Sends a Message to the connected AMPS server, performing only minimal validation and bypassing client...
Definition: ampsplusplus.hpp:5273
Message & setSowKey(const std::string &v)
Sets the value of the SowKey header for this Message.
Definition: Message.hpp:1424
AMPSDLL void amps_message_set_field_value(amps_handle message, FieldId field, const amps_char *value, size_t length)
Sets the value of a header field in an AMPS message.
std::string bookmarkSubscribe(const MessageHandler &messageHandler_, const std::string &topic_, long timeout_, const std::string &bookmark_, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Subscribe to a topic using a bookmark.
Definition: ampsplusplus.hpp:5980
Field getFilter() const
Retrieves the value of the Filter header of the Message as a new Field.
Definition: Message.hpp:1306
static size_t getUnsetPosition()
Method to return the value used to represent not found or unset.
Definition: ampsplusplus.hpp:1149
void setRetryOnDisconnect(bool isRetryOnDisconnect_)
Enables or disables automatic retry of a command to AMPS after a reconnect.
Definition: ampsplusplus.hpp:7367
unsigned getMaxDepth(void) const
Gets the maximum number of messages that can be held in the underlying queue.
Definition: ampsplusplus.hpp:8735
Message & assignUserId(const std::string &v)
Sets the value of the UserId header for this Message.
Definition: Message.hpp:1453
void connect(const std::string &uri)
Connect to an AMPS server.
Definition: ampsplusplus.hpp:5248
void clear()
Deletes the data associated with this Field, should only be used on Fields that were created as deepC...
Definition: Field.hpp:247
void addMessageHandler(const Field &commandId_, const AMPS::MessageHandler &messageHandler_, unsigned requestedAcks_, Message::Command::Type commandType_)
Adds a MessageHandler to be invoked for Messages with the given CommandId as their command id...
Definition: ampsplusplus.hpp:5303
Abstract base class to manage all subscriptions placed on a client so that they can be re-established...
Definition: ampsplusplus.hpp:1450
std::string sowAndDeltaSubscribe(const MessageHandler &messageHandler_, const std::string &topic_, const std::string &filter_="", const std::string &orderBy_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic and initiates a new delta subscription on it.
Definition: ampsplusplus.hpp:6462
Success.
Definition: amps.h:209
Message & setCommandEnum(Command::Type command_)
Set self's "command" field from one of the values in Command.
Definition: Message.hpp:1288
Field getOptions() const
Retrieves the value of the Options header of the Message as a new Field.
Definition: Message.hpp:1316
std::string authenticate(const std::string &, const std::string &password_)
A simple implementation that returns an unmodified password.
Definition: ampsplusplus.hpp:1032
amps_uint64_t deltaPublish(const char *topic_, size_t topicLength_, const char *data_, size_t dataLength_)
Publish the changed fields of a message to an AMPS topic.
Definition: ampsplusplus.hpp:5680
FIXShredder(char fieldSep_=(char) 1)
Construct an instance of FIXShredder using the specified value as the delimiter between fields...
Definition: ampsplusplus.hpp:8192
std::string sow(const MessageHandler &messageHandler_, const std::string &topic_, const std::string &filter_="", const std::string &orderBy_="", const std::string &bookmark_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query a State-of-the-World topic.
Definition: ampsplusplus.hpp:6104
void * amps_handle
Opaque handle type used to refer to objects in the AMPS api.
Definition: amps.h:199
Class for constructing the options string to pass to AMPS in a Message.
Definition: Message.hpp:601
void addMessageHandler(const Field &commandId_, const AMPS::MessageHandler &messageHandler_, unsigned requestedAcks_, bool isSubscribe_)
Adds a MessageHandler to be invoked for Messages with the given CommandId as their command id...
Definition: ampsplusplus.hpp:5286
const char * data() const
Returns the (non-null-terminated) data underlying this field.
Definition: Field.hpp:260
bool isValid() const
Returns true if self is a valid stream that may be iterated.
Definition: ampsplusplus.hpp:4982
amps_result
Return values from amps_xxx functions.
Definition: amps.h:204
FailedWriteHandler * getFailedWriteHandler()
Get the handler that is invoked to report on failed writes.
Definition: ampsplusplus.hpp:5491
Field getAckType() const
Retrieves the value of the AckType header of the Message as a new Field.
Definition: Message.hpp:1130
AMPSDLL amps_handle amps_client_create(const amps_char *clientName)
Functions for creation of an AMPS client.
Field getCommand() const
Retrieves the value of the Command header of the Message as a new Field.
Definition: Message.hpp:1195
MessageStream execute(Command &command_)
Execute the provided command and return messages received in response in a MessageStream.
Definition: ampsplusplus.hpp:8749
void(* amps_transport_filter_function)(const unsigned char *, size_t, short, void *)
Prototype for a user-supplied callback function for filtering data before it is sent and after it is ...
Definition: amps.h:630
Message & setTopic(const std::string &v)
Sets the value of the Topic header for this Message.
Definition: Message.hpp:1449
Store getPublishStore()
Get the publish store used by the client.
Definition: ampsplusplus.hpp:5446
void setThreadCreatedCallback(amps_thread_created_callback callback_, void *userData_)
Sets a callback function on the transport that is called when a new thread is created to receive data...
Definition: ampsplusplus.hpp:7419
unsigned getAckTypeEnum() const
Definition: ampsplusplus.hpp:947
size_t getServerVersion() const
Returns the server version retrieved during logon.
Definition: ampsplusplus.hpp:5177
State
Constants for the state of the connection.
Definition: ampsplusplus.hpp:1494
Command & setData(const std::string &data_)
Sets the data for this command from an existing string.
Definition: ampsplusplus.hpp:825
std::string logon(int timeout_=0, Authenticator &authenticator_=DefaultAuthenticator::instance(), const char *options_=NULL)
Logon to the server, providing the client name, credentials (if available), and client information (s...
Definition: ampsplusplus.hpp:5752
void discardUpTo(amps_uint64_t index_)
Called by Client to indicate that all messages up to and including index_ can be discarded.
Definition: ampsplusplus.hpp:1239
Command & setFilter(const std::string &filter_)
Definition: ampsplusplus.hpp:688
const std::string & getNameHash() const
Returns the name hash string of this client as generated by the server and returned when the client l...
Definition: ampsplusplus.hpp:5138
StoreImpl(bool errorOnPublishGap_=false)
Default constructor.
Definition: ampsplusplus.hpp:1094
MessageStream sowAndSubscribe(const char *topic_, const std::string &filter_="", const std::string &orderBy_="", const std::string &bookmark_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic and initiates a new subscription on it.
Definition: ampsplusplus.hpp:6416
Client represents a connection to an AMPS server, but does not provide failover or reconnection behav...
Definition: ampsplusplus.hpp:5068
void setGlobalCommandTypeMessageHandler(const std::string &command_, const MessageHandler &handler_)
Sets a handler for all messages of a particular type, or for messages that would be delivered to a pa...
Definition: ampsplusplus.hpp:6992
Command & setSubId(const std::string &subId_)
Definition: ampsplusplus.hpp:714
static const char * BOOKMARK_NOW()
Convenience method for returning the special value to start a subscription at the end of the transact...
Definition: ampsplusplus.hpp:7027
Message & newCommandId()
Creates and sets a new sequential value for the CommandId header for this Message.
Definition: Message.hpp:1302
void addConnectionStateListener(ConnectionStateListener *listener)
Adds a ConnectionStateListener to self's set of listeners.
Definition: ampsplusplus.hpp:7100
Command & reset(const std::string &command_)
Resets the fields of self, and sets the command to command_.
Definition: ampsplusplus.hpp:579
Command & setTimeout(unsigned timeout_)
Sets the client-side timeout for this command.
Definition: ampsplusplus.hpp:847
void clearConnectionStateListeners()
Clear all listeners from self's set of ConnectionStateListeners.
Definition: ampsplusplus.hpp:7115
Message & setSequence(const std::string &v)
Sets the value of the Sequence header for this Message.
Definition: Message.hpp:1422
amps_uint64_t publish(const char *topic_, size_t topicLength_, const char *data_, size_t dataLength_)
Publish a message to an AMPS topic, returning the sequence number assigned by the publish store if on...
Definition: ampsplusplus.hpp:5539
Command & setCommandId(const std::string &cmdId_)
Definition: ampsplusplus.hpp:662
Message & setUserId(const std::string &v)
Sets the value of the UserId header for this Message.
Definition: Message.hpp:1453
AMPSDLL amps_result amps_client_send_with_version(amps_handle client, amps_handle message, unsigned *version_out)
Sends a message to the AMPS server.
Field getTopic() const
Retrieves the value of the Topic header of the Message as a new Field.
Definition: Message.hpp:1449
AMPSDLL void amps_client_set_message_handler(amps_handle client, amps_handler messageHandler, void *userData)
Sets the message handler function for this client.
SubscriptionManager * getSubscriptionManager()
Get the subscription manager being used by the client.
Definition: ampsplusplus.hpp:5402
std::string subscribe(const MessageHandler &messageHandler_, const std::string &topic_, long timeout_=0, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Subscribe to a topic.
Definition: ampsplusplus.hpp:5815
Command(Message::Command::Type command_)
Creates an object to represent the given AMPS command, such as "sow" or "subscribe".
Definition: ampsplusplus.hpp:571
Message & setMessageType(const std::string &v)
Sets the value of the MessageType header for this Message.
Definition: Message.hpp:1312
Field getCommandId() const
Retrieves the value of the CommandId header of the Message as a new Field.
Definition: Message.hpp:1302
const ExceptionListener & getExceptionListener(void) const
Returns the exception listener set on this Client.
Definition: ampsplusplus.hpp:6903
Field getQueryId() const
Retrieves the value of the QueryID header of the Message as a new Field.
Definition: Message.hpp:1417
static Authenticator & instance()
Static function to return a static instance used when no Authenticator is supplied to a Client...
Definition: ampsplusplus.hpp:1049
Abstract base class for connection state listeners.
Definition: ampsplusplus.hpp:1490
Message & setSubscriptionId(const std::string &v)
Sets the value of the SubscriptionId header for this Message.
Definition: Message.hpp:1427
std::string sowDeleteByData(const MessageHandler &messageHandler_, const std::string &topic_, const std::string &data_, long timeout_=0)
Deletes the message whose keys match the message data provided.
Definition: ampsplusplus.hpp:6825
Command & setOptions(const char *options_, size_t optionsLen_)
Sets the options string for this command: see Message.Options for a helper class for constructing the...
Definition: ampsplusplus.hpp:793
Message & newQueryId()
Creates and sets a new sequential value for the QueryID header for this Message.
Definition: Message.hpp:1417
#define AMPS_BOOKMARK_RECENT
Start the subscription at the first undiscarded message in the bookmark store, or at the end of the b...
Definition: BookmarkStore.hpp:47
Message & assignExpiration(const std::string &v)
Sets the value of the Expiration header for this Message.
Definition: Message.hpp:1305
bool empty() const
Returns 'true' if empty, 'false' otherwise.
Definition: Field.hpp:128
bool replaySingle(StoreReplayer &replayer_, amps_uint64_t index_)
Called by Client to get a single message replayed by the store onto the StoreReplayer.
Definition: ampsplusplus.hpp:1260
void completed(const std::string &, const std::string &, const std::string &)
Called by Client once a logon completes successfully.
Definition: ampsplusplus.hpp:1044
MessageStream deltaSubscribe(const std::string &topic_, long timeout_, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Delta Subscribe to a topic.
Definition: ampsplusplus.hpp:5920
static const char * EPOCH()
Convenience method for returning the special value to start a subscription at the beginning of the tr...
Definition: ampsplusplus.hpp:7056
Interface for BookmarkStoreImpl classes.
Definition: BookmarkStore.hpp:228
Command & reset(const char *command_, size_t commandLen_)
Resets the fields of self, and sets the command to command_.
Definition: ampsplusplus.hpp:588
Command & setSequence(const char *seq_, size_t seqLen_)
Definition: ampsplusplus.hpp:806
std::string logon(const char *options_, int timeout_=0)
Logon to the server, providing the client name, credentials (if available) client information (such a...
Definition: ampsplusplus.hpp:5771
size_t len() const
Returns the length of the data underlying this field.
Definition: Field.hpp:267
virtual void completed(const std::string &userName_, const std::string &password_, const std::string &reason_)=0
Called by Client once a logon completes successfully.
Message & setClientName(const std::string &v)
Sets the value of the ClientName header for this Message.
Definition: Message.hpp:1303
Field getSowKey() const
Retrieves the value of the SowKey header of the Message as a new Field.
Definition: Message.hpp:1424
#define AMPS_BOOKMARK_EPOCH
Start the subscription at the beginning of the journal.
Definition: BookmarkStore.hpp:51
A default implementation of Authenticator that only uses an unchanged password and does not implement...
Definition: ampsplusplus.hpp:1026
void setLogonCorrelationData(const std::string &logonCorrelationData_)
Sets the logon correlation data for the client.
Definition: ampsplusplus.hpp:5157
Message & setCommand(const std::string &v)
Sets the value of the Command header for this Message.
Definition: Message.hpp:1195
_FIXBuilder(char fieldSep_=(char) 1)
Construct an instance of _FIXBuilder, using the specified separator between fields.
Definition: ampsplusplus.hpp:8120
void ack(const std::string &topic_, const std::string &bookmark_, const char *options_=NULL)
Acknowledge a message queue message by supplying a topic and bookmark string: this adds the ack to th...
Definition: ampsplusplus.hpp:7265
std::string retry(const std::string &, const std::string &)
Throws an AuthenticationException because retry is not implemented.
Definition: ampsplusplus.hpp:1039
Message & assignSubscriptionId(const std::string &v)
Sets the value of the SubscriptionId header for this Message.
Definition: Message.hpp:1427
bool(* PublishStoreResizeHandler)(Store store_, size_t size_, void *userData_)
Function type for PublishStore resize events. The store_ param is the store which is resizing.
Definition: ampsplusplus.hpp:1080
AMPSDLL void amps_message_get_field_value(amps_handle message, FieldId field, const amps_char **value_ptr, size_t *length_ptr)
Retrieves the value of a header field in an AMPS message.
MessageStream sowAndSubscribe(const char *topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, bool oofEnabled_=false, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic and initiates a new subscription on it.
Definition: ampsplusplus.hpp:6306
virtual void setResizeHandler(PublishStoreResizeHandler handler_, void *userData_)
Set a handler to be called if the Store needs to resize in order to keep storing messages.
Definition: ampsplusplus.hpp:1180
void append(const T &tag, const char *value, size_t offset, size_t length)
Write a field with the provided tag and value to the message being constructed.
Definition: ampsplusplus.hpp:8129
Message & assignCommand(const std::string &v)
Sets the value of the Command header for this Message.
Definition: Message.hpp:1195
#define AMPS_BOOKMARK_NOW
Start the subscription at the point in time when AMPS processes the subscription. ...
Definition: BookmarkStore.hpp:55
void setResizeHandler(PublishStoreResizeHandler handler_, void *userData_)
Set a handler to be called if the Store needs to resize in order to keep storing messages.
Definition: ampsplusplus.hpp:1320
virtual std::string retry(const std::string &userName_, const std::string &password_)=0
Called by Client when a logon ack is received with a status of retry.
BookmarkStore getBookmarkStore()
Get the bookmark store being used by the client.
Definition: ampsplusplus.hpp:5394
bool isValid() const
Method to return if there is an underlying implementation for the Store.
Definition: ampsplusplus.hpp:1277
void setAckBatchSize(const unsigned ackBatchSize_)
Sets the queue ack batch size setting.
Definition: ampsplusplus.hpp:7326
Field getPassword() const
Retrieves the value of the Password header of the Message as a new Field.
Definition: Message.hpp:1416
Message & setTopNRecordsReturned(const std::string &v)
Sets the value of the TopNRecordsReturned header for this Message.
Definition: Message.hpp:1451
Class to handle when a client receives a duplicate publish message, or not entitled message...
Definition: ampsplusplus.hpp:1370
Message sowDelete(const std::string &topic, const std::string &filter, long timeout=0)
Deletes one or more messages from a topic's SOW cache.
Definition: ampsplusplus.hpp:6701
virtual void setFailedResubscribeHandler(std::shared_ptr< FailedResubscribeHandler > handler_)
Set a handler to deal with failing subscriptions after a failover event.
Definition: ampsplusplus.hpp:1478
Message & setSowKeys(const std::string &v)
Sets the value of the SowKeys header for this Message.
Definition: Message.hpp:1425
void setBookmarkStore(const BookmarkStore &bookmarkStore_)
Set the bookmark store to be used by the client.
Definition: ampsplusplus.hpp:5386
This class multiplexes messages from AMPS to multiple subscribers and uses the stream of acks from AM...
Definition: MessageRouter.hpp:149
Message & setExpiration(const std::string &v)
Sets the value of the Expiration header for this Message.
Definition: Message.hpp:1305
virtual std::string authenticate(const std::string &userName_, const std::string &password_)=0
Called by Client just before the logon command is sent.
Command & setOrderBy(const std::string &orderBy_)
Definition: ampsplusplus.hpp:701
void setExceptionListener(const ExceptionListener &listener_)
Definition: ampsplusplus.hpp:6896
Command & reset(Message::Command::Type command_)
Resets the fields of self, and sets the command to command_.
Definition: ampsplusplus.hpp:596
void setSubscriptionManager(SubscriptionManager *subscriptionManager_)
Set the subscription manager to be used by the client.
Definition: ampsplusplus.hpp:5414
void setUnhandledMessageHandler(const AMPS::MessageHandler &messageHandler)
Definition: ampsplusplus.hpp:6959
void append(const T &tag, const std::string &value)
Write a field with the provided tag and value to the message being constructed.
Definition: ampsplusplus.hpp:8140
unsigned getAckBatchSize(void) const
Returns the value of the queue ack batch size setting.
Definition: ampsplusplus.hpp:7316
amps_uint64_t deltaPublish(const std::string &topic_, const std::string &data_)
Publish the changed fields of a message to an AMPS topic.
Definition: ampsplusplus.hpp:5657
MessageStream sowAndSubscribe(const std::string &topic_, const std::string &filter_="", const std::string &orderBy_="", const std::string &bookmark_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic and initiates a new subscription on it.
Definition: ampsplusplus.hpp:6393
Command & setExpiration(unsigned expiration_)
Set the expiration time for a publish command.
Definition: ampsplusplus.hpp:878
AMPSDLL void amps_client_set_predisconnect_handler(amps_handle client, amps_predisconnect_handler predisconnectHandler, void *userData)
Sets the predisconnect handler function to be called when a disconnect occurs.
Represents an iterator over messages in an AMPS topic.
Definition: ampsplusplus.hpp:4944
size_t log(Message &message_)
Log a bookmark to the persistent log.
Definition: BookmarkStore.hpp:268
MessageStream sowAndDeltaSubscribe(const char *topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, bool oofEnabled_=false, bool sendEmpties_=false, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic and initiates a new delta subscription on it.
Definition: ampsplusplus.hpp:6639
size_t unpersistedCount() const
Method to return how many messages are in the store that have not been discarded, indicating that the...
Definition: ampsplusplus.hpp:1269
Command & setData(const char *data_, size_t dataLen_)
Sets the data for this command.
Definition: ampsplusplus.hpp:833
Message & assignAckType(const std::string &v)
Sets the value of the AckType header for this Message.
Definition: Message.hpp:1130
void publishFlush(long timeout_=0, unsigned ackType_=Message::AckType::Processed)
Ensure that AMPS messages are sent and have been processed by the AMPS server.
Definition: ampsplusplus.hpp:5636
std::string executeAsync(Command &command_, MessageHandler handler_)
Execute the provided command and, once AMPS acknowledges the command, process messages in response to...
Definition: ampsplusplus.hpp:7145
Command & setSowKey(const char *sowKey_, size_t sowKeyLen_)
Sets the SowKey field of the command, typically used for a publish command to a topic in the state of...
Definition: ampsplusplus.hpp:621
MessageStream sowAndDeltaSubscribe(const std::string &topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, bool oofEnabled_=false, bool sendEmpties_=false, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic and initiates a new delta subscription on it.
Definition: ampsplusplus.hpp:6598
std::string logon(const std::string &options_, int timeout_=0)
Logon to the server, providing the client name, credentials (if available), client information (such a...
Definition: ampsplusplus.hpp:5790
void unsubscribe()
Unsubscribe from all topics.
Definition: ampsplusplus.hpp:6069
void setAutoAck(bool isAutoAckEnabled_)
Sets the queue auto-ack setting on this client.
Definition: ampsplusplus.hpp:7308
MessageStream deltaSubscribe(const char *topic_, long timeout_, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Delta Subscribe to a topic.
Definition: ampsplusplus.hpp:5938
Message & assignCorrelationId(const std::string &v)
Sets the value of the CorrelationId header for this Message.
Definition: Message.hpp:1304
amps_uint64_t store(const Message &message_)
Called by Client to store a message being published.
Definition: ampsplusplus.hpp:1228
std::string sowAndDeltaSubscribe(const MessageHandler &messageHandler_, const std::string &topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, bool oofEnabled_=false, bool sendEmpties_=false, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic and initiates a new delta subscription on it.
Definition: ampsplusplus.hpp:6562
Command & setBatchSize(unsigned batchSize_)
Sets the batch size for this command, which controls how many records are sent together in the result...
Definition: ampsplusplus.hpp:862
void setExceptionListener(const std::shared_ptr< const ExceptionListener > &pListener_)
Sets the exception listener for exceptions that are not thrown back to the user (for example...
Definition: ampsplusplus.hpp:6883
StoreImpl * get()
Used to get a pointer to the implementation.
Definition: ampsplusplus.hpp:1352
MessageHandler getDuplicateMessageHandler(void)
Returns the callback function that is invoked when a duplicate message is detected.
Definition: ampsplusplus.hpp:5469
Command & setQueryId(const char *queryId_, size_t queryIdLen_)
Definition: ampsplusplus.hpp:734
DisconnectHandler getDisconnectHandler(void) const
Returns the callback function that is invoked when a disconnect occurs.
Definition: ampsplusplus.hpp:5364
Class for parsing a FIX format message into a std::map of keys and values, where the keys and values ...
Definition: ampsplusplus.hpp:8184
Field represents the value of a single field in a Message.
Definition: Field.hpp:86
Message & setAckType(const std::string &v)
Sets the value of the AckType header for this Message.
Definition: Message.hpp:1130
virtual ConnectionInfo getConnectionInfo() const
Get the connection information for the current connection.
Definition: ampsplusplus.hpp:5373
Message & setOptions(const std::string &v)
Sets the value of the Options header for this Message.
Definition: Message.hpp:1344
void ack(Field &topic_, Field &bookmark_, const char *options_=NULL)
Acknowledge a message queue message by supplying a topic and bookmark: this adds the ack to the curre...
Definition: ampsplusplus.hpp:7241
Abstract base class where you can implement handling of exceptions that occur when a SubscriptionMana...
Definition: ampsplusplus.hpp:1428
AMPSDLL amps_result amps_client_send(amps_handle client, amps_handle message)
Sends a message to the AMPS server.
std::map< Message::Field, Message::Field > map_type
Convenience definition for the std::map specialization used for this class.
Definition: ampsplusplus.hpp:8196
Command & setTopic(const char *topic_, size_t topicLen_)
Definition: ampsplusplus.hpp:682
void setDisconnectHandler(const DisconnectHandler &disconnectHandler)
Sets the function to be called when the client is unintentionally disconnected.
Definition: ampsplusplus.hpp:5356
Handle class for StoreImpl classes that track publish messages.
Definition: ampsplusplus.hpp:1212
map_type toMap(const Message::Field &data)
Returns the key/value pairs within the message, represented as AMPS::Field objects that contain point...
Definition: ampsplusplus.hpp:8203
Exception listener for unhandled exceptions.
Definition: ampsplusplus.hpp:203
MessageStream sowAndDeltaSubscribe(const std::string &topic_, const std::string &filter_="", const std::string &orderBy_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic and initiates a new delta subscription on it.
Definition: ampsplusplus.hpp:6495
void setHeartbeat(unsigned heartbeatTime_)
Requests heartbeating with the AMPS server.
Definition: ampsplusplus.hpp:6953
MessageStream subscribe(const char *topic_, long timeout_=0, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Subscribe to a topic.
Definition: ampsplusplus.hpp:5873
An iterable object representing the results of an AMPS subscription and/or query. ...
Definition: ampsplusplus.hpp:4936
bool getErrorOnPublishGap() const
Called to check if the Store will throw PublishStoreGapException.
Definition: ampsplusplus.hpp:1344
Command & setSequence(const std::string &seq_)
Definition: ampsplusplus.hpp:799
static const char * MOST_RECENT()
Convenience method for returning the special value to start a subscription at a recovery point based ...
Definition: ampsplusplus.hpp:7078
Command & setQueryId(const std::string &queryId_)
Definition: ampsplusplus.hpp:727
AMPSDLL amps_result amps_client_set_thread_created_callback(amps_handle client, amps_thread_created_callback callback, void *userData)
Sets a user-supplied callback function to allow thread attributes to be set when a new thread is created...
Message & setCommandId(const std::string &v)
Sets the value of the CommandId header for this Message.
Definition: Message.hpp:1302
Command & setSequence(const amps_uint64_t seq_)
Definition: ampsplusplus.hpp:812
static const char * BOOKMARK_RECENT()
Convenience method for returning the special value to start a subscription at a recovery point based ...
Definition: ampsplusplus.hpp:7089
Message & setQueryId(const std::string &v)
Sets the value of the QueryID header for this Message.
Definition: Message.hpp:1417
amps_result(* amps_thread_created_callback)(AMPS_THREAD_T, void *)
Prototype for a user-supplied callback function to allow thread attributes to be set when a new threa...
Definition: amps.h:656
amps_uint64_t getLowestUnpersisted()
Get the oldest unpersisted message sequence in the store.
Definition: ampsplusplus.hpp:1298
Command & setTopic(const std::string &topic_)
Definition: ampsplusplus.hpp:675
MessageStream maxDepth(unsigned maxDepth_)
Sets the maximum number of messages that can be held in the underlying queue.
Definition: ampsplusplus.hpp:8730
Message & assignVersion(const std::string &v)
Sets the value of the Version header for this Message.
Definition: Message.hpp:1452
void disconnect()
Disconnect from an AMPS server.
Definition: ampsplusplus.hpp:5255
MessageStream sow(const std::string &topic_, const std::string &filter_="", const std::string &orderBy_="", const std::string &bookmark_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic.
Definition: ampsplusplus.hpp:6142
unsigned getDepth(void) const
Gets the current number of messages held in the underlying queue.
Definition: ampsplusplus.hpp:8739
Field getQueryID() const
Retrieves the value of the QueryID header of the Message as a new Field.
Definition: Message.hpp:1417
The operation has not succeeded, but ought to be retried.
Definition: amps.h:233
const std::string & getLogonCorrelationData() const
Returns the currently set logon correlation data for the client.
Definition: ampsplusplus.hpp:5164
void setHeartbeat(unsigned heartbeatTime_, unsigned readTimeout_)
Requests heartbeating with the AMPS server.
Definition: ampsplusplus.hpp:6929
Command & setTopN(unsigned topN_)
Definition: ampsplusplus.hpp:853
AMPSDLL amps_result amps_client_set_read_timeout(amps_handle client, int readTimeout)
Sets a read timeout (seconds); if no message is received within that time, the connection is presumed dead...
const std::string & getURI() const
Returns the last URI this client is connected to.
Definition: ampsplusplus.hpp:5224
Command & addAckType(const std::string &ackType_)
Definition: ampsplusplus.hpp:884
void replay(StoreReplayer &replayer_)
Called by Client to get all stored and non-discarded messages replayed by the store onto the StoreRep...
Definition: ampsplusplus.hpp:1248
static const char * BOOKMARK_MOST_RECENT()
Convenience method for returning the special value to start a subscription at a recovery point based ...
Definition: ampsplusplus.hpp:7067
Message & setBookmark(const std::string &v)
Sets the value of the Bookmark header for this Message.
Definition: Message.hpp:1194
void deepCopy(const Field &orig_)
Makes self a deep copy of the original field.
Definition: Field.hpp:218
Command & setOrderBy(const char *orderBy_, size_t orderByLen_)
Definition: ampsplusplus.hpp:708
unsigned getAckTypeEnum() const
Decode self's "ack type" field and return the corresponding bitmask of values from AckType...
Definition: Message.hpp:1160
BookmarkStoreImpl * get()
Used to get a pointer to the implementation.
Definition: BookmarkStore.hpp:465
amps_handle getHandle()
Returns the underlying amps_handle for this client, to be used with amps_client_* functions from the ...
Definition: ampsplusplus.hpp:6870
AMPSDLL amps_result amps_client_attempt_reconnect(amps_handle client, unsigned version)
Manually invokes the user-supplied disconnect handler for this client.
AMPSDLL void amps_client_destroy(amps_handle handle)
Disconnects and destroys an AMPS client object.
void reset()
Clear all data from the builder.
Definition: ampsplusplus.hpp:8157
void setFailedWriteHandler(FailedWriteHandler *handler_)
Set the handler that is invoked to report when a publish fails, for example if the publisher is not e...
Definition: ampsplusplus.hpp:5483
MessageStream sowAndDeltaSubscribe(const char *topic_, const std::string &filter_="", const std::string &orderBy_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic and initiates a new delta subscription on it.
Definition: ampsplusplus.hpp:6517
void setAckTimeout(const int ackTimeout_)
Sets the message queue ack timeout value.
Definition: ampsplusplus.hpp:7349
Command & setSowKeys(const std::string &sowKeys_)
Sets the SowKeys for the command.
Definition: ampsplusplus.hpp:638
Message & setPassword(const std::string &v)
Sets the value of the Password header for this Message.
Definition: Message.hpp:1416
bool getAutoAck(void) const
Returns the value of the queue auto-ack setting.
Definition: ampsplusplus.hpp:7298
amps_uint64_t publish(const std::string &topic_, const std::string &data_)
Publish a message to an AMPS topic, returning the sequence number assigned by the publish store if on...
Definition: ampsplusplus.hpp:5514
Definition: ampsplusplus.hpp:102
void setName(const std::string &name)
Sets the name of this client, assuming no name was provided previously.
Definition: ampsplusplus.hpp:5123
The interface for handling authentication with the AMPS server.
Definition: ampsplusplus.hpp:995
void flush(long timeout_=0)
Method to wait for the Store to discard everything that has been stored up to the point in time when ...
Definition: ampsplusplus.hpp:1290
Field getBookmark() const
Retrieves the value of the Bookmark header of the Message as a new Field.
Definition: Message.hpp:1194
bool DangerousFlushPublishStoreResizeHandler(Store store_, size_t, void *data_)
PublishStoreResizeHandler that will block up to the timeout specified in user data milliseconds tryin...
Definition: ampsplusplus.hpp:1400
static amps_uint64_t getUnsetSequence()
Method to return the value used to represent no such sequence.
Definition: ampsplusplus.hpp:1156
Command & setCorrelationId(const char *correlationId_, size_t correlationIdLen_)
Set the correlation ID for this command.
Definition: ampsplusplus.hpp:778
MessageStream sow(const char *topic_, const std::string &filter_="", const std::string &orderBy_="", const std::string &bookmark_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic.
Definition: ampsplusplus.hpp:6163
amps_uint64_t publish(const std::string &topic_, const std::string &data_, unsigned long expiration_)
Publish a message to an AMPS topic, returning the sequence number assigned by the publish store (if a...
Definition: ampsplusplus.hpp:5563
void setLastChanceMessageHandler(const AMPS::MessageHandler &messageHandler)
Sets the message handler called when no other handler matches.
Definition: ampsplusplus.hpp:6966
Message::Field getMostRecent(const Message::Field &subId_)
Returns the most recent bookmark from the log that ought to be used for (re-)subscriptions.
Definition: BookmarkStore.hpp:310
amps_uint64_t deltaPublish(const char *topic_, size_t topicLength_, const char *data_, size_t dataLength_, unsigned long expiration_)
Publish the changed fields of a message to an AMPS topic.
Definition: ampsplusplus.hpp:5729
The client and server are disconnected.
Definition: amps.h:237
Message sowDeleteByData(const std::string &topic_, const std::string &data_, long timeout_=0)
Deletes the message whose keys match the message data provided.
Definition: ampsplusplus.hpp:6846
std::string deltaSubscribe(const MessageHandler &messageHandler_, const std::string &topic_, long timeout_, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Delta Subscribe to a topic.
Definition: ampsplusplus.hpp:5902
MessageStream timeout(unsigned timeout_)
Sets the maximum time to wait for the next message in milliseconds; if no message is available within...
Definition: ampsplusplus.hpp:8720
std::string sow(const MessageHandler &messageHandler_, const std::string &topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic.
Definition: ampsplusplus.hpp:6204
const std::string & getName() const
Returns the name of this client passed in the constructor.
Definition: ampsplusplus.hpp:5130
Command is an encapsulation of a single AMPS command sent by the client.
Definition: ampsplusplus.hpp:468
std::string sowAndSubscribe(const MessageHandler &messageHandler_, const std::string &topic_, const std::string &filter_="", const std::string &orderBy_="", const std::string &bookmark_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic and initiates a new subscription on it.
Definition: ampsplusplus.hpp:6354
Message & setBatchSize(const std::string &v)
Sets the value of the BatchSize header for this Message.
Definition: Message.hpp:1193
MessageStream subscribe(const std::string &topic_, long timeout_=0, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Subscribe to a topic.
Definition: ampsplusplus.hpp:5841
MessageStream bookmarkSubscribe(const char *topic_, long timeout_, const std::string &bookmark_, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Subscribe to a topic using a bookmark.
Definition: ampsplusplus.hpp:6029
Command & setSowKey(const std::string &sowKey_)
Sets the SowKey field of the command, typically used for a publish command to a topic in the state of...
Definition: ampsplusplus.hpp:608
Message & assignSequence(const std::string &v)
Sets the value of the Sequence header for this Message.
Definition: Message.hpp:1422
std::string executeAsyncNoResubscribe(Command &command_, MessageHandler handler_)
Execute the provided command and, once AMPS acknowledges the command, process messages in response to...
Definition: ampsplusplus.hpp:7179
Command & setCorrelationId(const std::string &correlationId_)
Set the correlation ID for this command.
Definition: ampsplusplus.hpp:766
std::string sowDelete(const MessageHandler &messageHandler, const std::string &topic, const std::string &filter, long timeout)
Deletes one or more messages from a topic's SOW cache.
Definition: ampsplusplus.hpp:6678
iterator end(void)
Returns an iterator representing the end of the topic or subscription.
Definition: ampsplusplus.hpp:5000
bool getRetryOnDisconnect(void) const
Returns true if automatic retry of a command to AMPS after a reconnect is enabled.
Definition: ampsplusplus.hpp:7376
void flushAcks(void)
Sends any queued message queue ack messages to the server immediately.
Definition: ampsplusplus.hpp:7289
std::string sowAndSubscribe(const MessageHandler &messageHandler_, const std::string &topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, bool oofEnabled_=false, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic and initiates a new subscription on it.
Definition: ampsplusplus.hpp:6236