25 #ifndef _AMPSPLUSPLUS_H_ 26 #define _AMPSPLUSPLUS_H_ 28 #include "amps/ampsver.h" 46 #include <sys/atomic.h> 48 #include "amps/BookmarkStore.hpp" 49 #include "amps/MessageRouter.hpp" 50 #include "amps/util.hpp" 51 #include "amps/ampscrc.hpp" 52 #if __cplusplus >= 201100L || _MSC_VER >= 1900 56 #ifndef AMPS_TESTING_SLOW_MESSAGE_STREAM 57 #define AMPS_TESTING_SLOW_MESSAGE_STREAM 85 #define AMPS_MEMORYBUFFER_DEFAULT_BUFFERS 10 86 #define AMPS_MEMORYBUFFER_DEFAULT_LENGTH 40960 87 #define AMPS_SUBSCRIPTION_MANAGER_DEFAULT_TIMEOUT 0 88 #define AMPS_HACLIENT_TIMEOUT_DEFAULT 10000 89 #define AMPS_HACLIENT_RECONNECT_DEFAULT 200 90 #define AMPS_DEFAULT_COMMAND_TIMEOUT 5000 91 #define AMPS_DEFAULT_TOP_N -1 92 #define AMPS_DEFAULT_BATCH_SIZE 10 93 #define AMPS_NUMBER_BUFFER_LEN 20 94 #define AMPS_DEFAULT_QUEUE_ACK_TIMEOUT 1000 96 #if defined(_M_X64) || defined(__x86_64) || defined(_WIN64) 105 typedef std::map<std::string, std::string> ConnectionInfo;
108 inline std::string asString(Type x_)
110 std::ostringstream os;
// Renders seqNo_ as decimal digits into buf_, filling from the END of the
// buffer backwards: pos starts at AMPS_NUMBER_BUFFER_LEN and is decremented
// before each store, so buf_ must have room for at least
// AMPS_NUMBER_BUFFER_LEN characters.
// NOTE(review): the statements that reduce seqNo_ and produce the return value
// are not visible in this excerpt. Callers in this file use the result as the
// offset of the first digit (buf + pos, AMPS_NUMBER_BUFFER_LEN - pos) -- confirm.
116 size_t convertToCharArray(
char* buf_, amps_uint64_t seqNo_)
118 size_t pos = AMPS_NUMBER_BUFFER_LEN;
119 for (
int i = 0; i < AMPS_NUMBER_BUFFER_LEN; ++i)
// Store the lowest remaining decimal digit as its ASCII character.
123 buf_[--pos] = (char)(seqNo_ % 10 +
'0');
// Overload of convertToCharArray for unsigned long values; the visible
// statements match the amps_uint64_t overload: digits are written right to
// left starting at index AMPS_NUMBER_BUFFER_LEN - 1, so buf_ needs at least
// AMPS_NUMBER_BUFFER_LEN characters.
// NOTE(review): the lines that reduce seqNo_ and return are not visible here;
// presumably the return value is the offset of the first digit -- confirm.
132 size_t convertToCharArray(
char* buf_,
unsigned long seqNo_)
134 size_t pos = AMPS_NUMBER_BUFFER_LEN;
135 for (
int i = 0; i < AMPS_NUMBER_BUFFER_LEN; ++i)
// Store the lowest remaining decimal digit as its ASCII character.
139 buf_[--pos] = (char)(seqNo_ % 10 +
'0');
// Static accessors that each return a fixed string literal.
// NOTE(review): the enclosing type is not visible in this excerpt; the names
// and texts look like server-reported failure reasons -- confirm.
// The bodies of duplicate(), badFilter() and noTopic() are not visible here.
153 static const char* duplicate()
157 static const char* badFilter()
161 static const char* badRegexTopic()
163 return "bad regex topic";
165 static const char* subscriptionAlreadyExists()
167 return "subscription already exists";
169 static const char* nameInUse()
171 return "name in use";
173 static const char* authFailure()
175 return "auth failure";
177 static const char* notEntitled()
179 return "not entitled";
181 static const char* authDisabled()
// Note: the text is "authentication disabled", not a spelling of the
// function name like the neighbouring accessors.
183 return "authentication disabled";
185 static const char* subidInUse()
187 return "subid in use";
189 static const char* noTopic()
// Default listener hook: accepts a std::exception and does nothing (empty
// body). Being virtual, it can be overridden. The AMPS_CALL_EXCEPTION_WRAPPER
// family of macros in this file passes caught std::exception objects to
// exceptionThrown() on the active listener.
207 virtual void exceptionThrown(
const std::exception&)
const {;}
213 #define AMPS_CALL_EXCEPTION_WRAPPER(x) \ 218 catch (std::exception& ex_)\ 222 _exceptionListener->exceptionThrown(ex_);\ 246 #define AMPS_CALL_EXCEPTION_WRAPPER_2(me,x) \ 249 while(me->_connected)\ 256 catch(MessageStreamFullException&)\ 260 me->checkAndSendHeartbeat(false);\ 262 catch (std::exception& ex_)\ 266 me->_exceptionListener->exceptionThrown(ex_);\ 277 catch (std::exception& ex_)\ 281 me->_exceptionListener->exceptionThrown(ex_);\ 305 #define AMPS_CALL_EXCEPTION_WRAPPER_STREAM_FULL_2(me, x)\ 306 while(me->_connected)\ 313 catch(MessageStreamFullException&)\ 317 me->checkAndSendHeartbeat(false);\ 319 catch (std::exception& ex_)\ 323 me->_exceptionListener->exceptionThrown(ex_);\ 334 #define AMPS_CALL_EXCEPTION_WRAPPER_2(me,x) \ 337 while(me->_connected)\ 344 catch(MessageStreamFullException& ex_)\ 348 me->checkAndSendHeartbeat(false);\ 350 catch (std::exception& ex_)\ 354 me->_exceptionListener->exceptionThrown(ex_);\ 365 catch (std::exception& ex_)\ 369 me->_exceptionListener->exceptionThrown(ex_);\ 393 #define AMPS_CALL_EXCEPTION_WRAPPER_STREAM_FULL_2(me, x)\ 394 while(me->_connected)\ 401 catch(MessageStreamFullException& ex_)\ 405 me->checkAndSendHeartbeat(false);\ 407 catch (std::exception& ex_)\ 411 me->_exceptionListener->exceptionThrown(ex_);\ 423 #define AMPS_UNHANDLED_EXCEPTION(ex) \ 426 _exceptionListener->exceptionThrown(ex);\ 433 #define AMPS_UNHANDLED_EXCEPTION_2(me,ex) \ 436 me->_exceptionListener->exceptionThrown(ex);\ 477 static const unsigned Subscribe = 1;
// Bit values combined into _flags with |= and tested with &. Each constant is
// a distinct power of two, so the flags can be set and queried independently.
478 static const unsigned SOW = 2;
479 static const unsigned NeedsSequenceNumber = 4;
480 static const unsigned ProcessedAck = 8;
481 static const unsigned StatsAck = 16;
// Three init overloads: by command enum value, by command name string, and by
// character buffer plus length. Only part of the flag-selection logic is
// visible in this excerpt; several statements between the tests below were
// dropped, so the exact nesting should be confirmed against the full file.
482 void init(Message::Command::Type command_)
491 void init(
const std::string& command_)
500 void init(
const char* command_,
size_t commandLen_)
// Taken for commands that are NOT in the NoDataCommands mask (body not visible).
512 if (!(command & Message::Command::NoDataCommands))
// Any of the four subscribe-style commands (body not visible).
515 if (command == Message::Command::Subscribe ||
516 command == Message::Command::SOWAndSubscribe ||
517 command == Message::Command::DeltaSubscribe ||
518 command == Message::Command::SOWAndDeltaSubscribe)
// Commands that include a SOW query receive the default batch size.
523 if (command == Message::Command::SOW
524 || command == Message::Command::SOWAndSubscribe
525 || command == Message::Command::SOWAndDeltaSubscribe)
530 setBatchSize(AMPS_DEFAULT_BATCH_SIZE);
// NOTE(review): a plain SOW query appears to request a processed ack by
// default -- the lines between the test and the flag update are not visible.
532 if (command == Message::Command::SOW)
537 _flags |= ProcessedAck;
// SOW delete appears to raise both the processed-ack and sequence-number flags.
539 else if (command == Message::Command::SOWDelete)
542 _flags |= ProcessedAck;
543 _flags |= NeedsSequenceNumber;
// Publish-style commands are marked as needing a sequence number.
545 else if (command == Message::Command::Publish
546 || command == Message::Command::DeltaPublish)
548 _flags |= NeedsSequenceNumber;
// StopTimer handling follows (body not visible).
550 else if (command == Message::Command::StopTimer)
567 Command(
const char* command_,
size_t commandLen_)
569 init(command_, commandLen_);
593 init(command_, commandLen_);
687 _message.
setTopic(topic_, topicLen_);
817 std::ostringstream os;
822 amps_uint64_t getSequence()
const 838 _message.
setData(data_, dataLen_);
868 _batchSize = batchSize_;
890 if (ackType_ ==
"processed")
892 _flags |= ProcessedAck;
894 else if (ackType_ ==
"stats")
904 if (ackType_.find(
"processed") != std::string::npos)
906 _flags |= ProcessedAck;
910 _flags &= ~ProcessedAck;
912 if (ackType_.find(
"stats") != std::string::npos)
926 if (ackType_ & Message::AckType::Processed)
928 _flags |= ProcessedAck;
932 _flags &= ~ProcessedAck;
934 if (ackType_ & Message::AckType::Stats)
// Read-only accessors. getTimeout() and getBatchSize() bodies are not visible
// in this excerpt. Each boolean predicate below tests one bit of _flags.
959 unsigned getTimeout(
void)
const 963 unsigned getBatchSize(
void)
const 967 bool isSubscribe(
void)
// NOTE(review): relies on the implicit unsigned-to-bool conversion, whereas the
// sibling predicates compare against 0 explicitly. The result is the same, but
// the explicit form avoids narrowing warnings on some compilers.
const 969 return _flags & Subscribe;
971 bool isSow(
void)
const 973 return (_flags & SOW) != 0;
975 bool hasProcessedAck(
void)
const 977 return (_flags & ProcessedAck) != 0;
979 bool hasStatsAck(
void)
const 981 return (_flags & StatsAck) != 0;
983 bool needsSequenceNumber(
void)
const 985 return (_flags & NeedsSequenceNumber) != 0;
991 typedef void(*DisconnectHandlerFunc)(
Client&,
void* userData);
// Pure virtual authentication interface (the enclosing class declaration is not
// visible in this excerpt). Implementers must provide all three members.
//
// authenticate: receives a user name and password and returns a string.
// NOTE(review): presumably the returned string is the credential actually sent
// at logon -- confirm against the caller.
1008 virtual std::string authenticate(
const std::string& userName_,
const std::string& password_) = 0;
// retry: same shape as authenticate. NOTE(review): presumably invoked when the
// server asks for another attempt -- confirm.
1016 virtual std::string retry(
const std::string& userName_,
const std::string& password_) = 0;
// completed: notification carrying user name, password and a reason string;
// returns nothing.
1023 virtual void completed(
const std::string& userName_,
const std::string& password_,
const std::string& reason_) = 0;
// Default implementations of the three authenticator members (enclosing class
// declaration not visible; the exception text names it DefaultAuthenticator).
//
// authenticate: ignores the user name (parameter left unnamed). The body is not
// visible here. NOTE(review): presumably returns password_ unchanged -- confirm.
1035 std::string
authenticate(
const std::string& ,
const std::string& password_)
// retry: not supported by the default implementation; always throws
// AuthenticationException.
1042 std::string
retry(
const std::string& ,
const std::string& )
1044 throw AuthenticationException(
"retry not implemented by DefaultAuthenticator.");
// completed: intentionally a no-op; all arguments are ignored.
1047 void completed(
const std::string& ,
const std::string& ,
const std::string& ) {;}
1068 virtual void execute(
Message& message_) = 0;
1083 typedef bool (*PublishStoreResizeHandler)(
Store store_,
1098 : _resizeHandler(NULL)
1099 , _resizeHandlerData(NULL)
1100 , _errorOnPublishGap(errorOnPublishGap_)
1107 virtual amps_uint64_t store(
const Message& message_) = 0;
1115 virtual void discardUpTo(amps_uint64_t index_) = 0;
1130 virtual bool replaySingle(
StoreReplayer& replayer_, amps_uint64_t index_) = 0;
1136 virtual size_t unpersistedCount()
const = 0;
1148 virtual void flush(
long timeout_) = 0;
1154 return AMPS_UNSET_INDEX;
1161 return AMPS_UNSET_SEQUENCE;
1167 virtual amps_uint64_t getLowestUnpersisted()
const = 0;
1172 virtual amps_uint64_t getLastPersisted() = 0;
1186 _resizeHandler = handler_;
1187 _resizeHandlerData = userData_;
1192 return _resizeHandler;
1195 bool callResizeHandler(
size_t newSize_);
1197 inline virtual void setErrorOnPublishGap(
bool errorOnPublishGap_)
1199 _errorOnPublishGap = errorOnPublishGap_;
1202 inline virtual bool getErrorOnPublishGap()
const 1204 return _errorOnPublishGap;
1209 void* _resizeHandlerData;
1210 bool _errorOnPublishGap;
1217 RefHandle<StoreImpl> _body;
1221 Store(
const Store& rhs) : _body(rhs._body) {;}
1233 return _body.get().store(message_);
1244 _body.get().discardUpTo(index_);
1253 _body.get().replay(replayer_);
1265 return _body.get().replaySingle(replayer_, index_);
1274 return _body.get().unpersistedCount();
1282 return _body.isValid();
1295 return _body.get().flush(timeout_);
1303 return _body.get().getLowestUnpersisted();
1311 return _body.get().getLastPersisted();
1326 _body.get().setResizeHandler(handler_, userData_);
1331 return _body.get().getResizeHandler();
1340 _body.get().setErrorOnPublishGap(errorOnPublishGap_);
1349 return _body.get().getErrorOnPublishGap();
1357 if (_body.isValid())
1359 return &_body.get();
1383 virtual void failedWrite(
const Message& message_,
1384 const char* reason_,
size_t reasonLength_) = 0;
// Invokes the registered resize handler with a Store handle wrapping this
// implementation, the requested new size, and the user data pointer that was
// stored together with the handler. Returns whatever the handler returns.
// NOTE(review): lines between the signature and the return are not visible in
// this excerpt. _resizeHandler is initialised to NULL in the constructor, so a
// null check is presumably among the missing lines -- confirm.
1388 inline bool StoreImpl::callResizeHandler(
size_t newSize_)
1392 return _resizeHandler(
Store(
this), newSize_, _resizeHandlerData);
1406 long* timeoutp = (
long*)data_;
1414 store_.
flush(*timeoutp);
1417 catch (
const TimedOutException&)
1419 catch (
const TimedOutException& e)
1446 unsigned requestedAckTypes_,
1447 const AMPSException& exception_) = 0;
1465 unsigned requestedAckTypes_) = 0;
1472 virtual void clear() = 0;
1476 virtual void resubscribe(Client& client_) = 0;
1483 _failedResubscribeHandler = handler_;
1486 std::shared_ptr<FailedResubscribeHandler> _failedResubscribeHandler;
1497 typedef enum { Disconnected = 0,
1501 PublishReplayed = 8,
1502 HeartbeatInitiated = 16,
1516 virtual void connectionStateChanged(
State newState_) = 0;
1521 class MessageStreamImpl;
1524 typedef void(*DeferredExecutionFunc)(
void*);
1526 class ClientImpl :
public RefBody
1532 AMPS_SOCKET _socket;
1538 socklen_t _valueLen;
1542 : _socket(AMPS_INVALID_SOCKET), _noDelay(0), _valueLen(
sizeof(
int))
1544 _valuePtr = (
char*)&_noDelay;
1546 if (_socket != AMPS_INVALID_SOCKET)
1548 getsockopt(_socket, IPPROTO_TCP, TCP_NODELAY, _valuePtr, &_valueLen);
1552 setsockopt(_socket, IPPROTO_TCP, TCP_NODELAY, _valuePtr, _valueLen);
1556 _socket = AMPS_INVALID_SOCKET;
1563 if (_socket != AMPS_INVALID_SOCKET)
1566 setsockopt(_socket, IPPROTO_TCP, TCP_NODELAY, _valuePtr, _valueLen);
1571 friend class Client;
1574 DisconnectHandler _disconnectHandler;
1575 enum GlobalCommandTypeHandlers :
size_t 1585 DuplicateMessage = 8,
1588 std::vector<MessageHandler> _globalCommandTypeHandlers;
1589 Message _message, _readMessage, _publishMessage, _deltaMessage, _beatMessage;
1591 MessageRouter::RouteCache _routeCache;
1592 mutable Mutex _lock;
1593 std::string _name, _nameHash, _lastUri, _logonCorrelationData;
1594 amps_uint64_t _nameHashValue;
1596 Store _publishStore;
1597 bool _isRetryOnDisconnect;
1598 amps_unique_ptr<FailedWriteHandler> _failedWriteHandler;
1599 #if __cplusplus >= 201100L || _MSC_VER >= 1900 1600 std::atomic<amps_uint64_t> _lastSentHaSequenceNumber;
1602 volatile amps_uint64_t _lastSentHaSequenceNumber;
1604 AMPS_ATOMIC_TYPE_8 _logonInProgress;
1605 AMPS_ATOMIC_TYPE_8 _badTimeToHASubscribe;
1606 VersionInfo _serverVersion;
1607 Timer _heartbeatTimer;
1608 amps_unique_ptr<MessageStream> _pEmptyMessageStream;
1611 int _queueAckTimeout;
1612 bool _isAutoAckEnabled;
1613 unsigned _ackBatchSize;
1614 unsigned _queuedAckCount;
1615 unsigned _defaultMaxDepth;
1616 struct QueueBookmarks
1618 QueueBookmarks(
const std::string& topic_)
1625 amps_uint64_t _oldestTime;
1626 unsigned _bookmarkCount;
1628 typedef amps_uint64_t topic_hash;
1629 typedef std::map<topic_hash, QueueBookmarks> TopicHashMap;
1630 TopicHashMap _topicHashMap;
1634 ClientImpl* _client;
1639 ClientStoreReplayer()
1640 : _client(NULL), _version(0), _res(
AMPS_E_OK)
1643 ClientStoreReplayer(ClientImpl* client_)
1644 : _client(client_), _version(0), _res(
AMPS_E_OK)
1647 void setClient(ClientImpl* client_)
1652 void execute(
Message& message_)
1656 throw CommandException(
"Can't replay without a client.");
1660 if (index > _client->_lastSentHaSequenceNumber)
1662 _client->_lastSentHaSequenceNumber = index;
1670 (!_client->_logonInProgress ||
1674 message_.getMessage(),
1678 throw DisconnectedException(
"AMPS Server disconnected during replay");
1684 ClientStoreReplayer _replayer;
1688 ClientImpl* _parent;
1689 const char* _reason;
1690 size_t _reasonLength;
1691 size_t _replayCount;
1693 FailedWriteStoreReplayer(ClientImpl* parent,
const char* reason_,
size_t reasonLength_)
1696 _reasonLength(reasonLength_),
1699 void execute(
Message& message_)
1701 if (_parent->_failedWriteHandler)
1704 _parent->_failedWriteHandler->failedWrite(message_,
1705 _reason, _reasonLength);
1708 size_t replayCount(
void)
const 1710 return _replayCount;
1714 struct AckResponseImpl :
public RefBody
1716 std::string username, password, reason, status, bookmark, options;
1717 amps_uint64_t sequenceNo;
1718 amps_uint64_t nameHashValue;
1719 VersionInfo serverVersion;
1720 #if __cplusplus >= 201100L || _MSC_VER >= 1900 1721 std::atomic<bool> responded;
1722 std::atomic<bool> abandoned;
1724 volatile bool responded;
1725 volatile bool abandoned;
1727 unsigned connectionVersion;
1730 username(), password(), reason(), status(), bookmark(), options(),
1731 sequenceNo((amps_uint64_t)0),
1735 connectionVersion(UINT_MAX)
1742 RefHandle<AckResponseImpl> _body;
1744 AckResponse() : _body(NULL) {;}
1745 AckResponse(
const AckResponse& rhs) : _body(rhs._body) {;}
1746 static AckResponse create()
1749 r._body =
new AckResponseImpl();
1753 const std::string& username()
1755 return _body.get().username;
1757 void setUsername(
const char* data_,
size_t len_)
1761 _body.get().username.assign(data_, len_);
1765 _body.get().username.clear();
1768 const std::string& password()
1770 return _body.get().password;
1772 void setPassword(
const char* data_,
size_t len_)
1776 _body.get().password.assign(data_, len_);
1780 _body.get().password.clear();
1783 const std::string& reason()
1785 return _body.get().reason;
1787 void setReason(
const char* data_,
size_t len_)
1791 _body.get().reason.assign(data_, len_);
1795 _body.get().reason.clear();
1798 const std::string& status()
1800 return _body.get().status;
1802 void setStatus(
const char* data_,
size_t len_)
1806 _body.get().status.assign(data_, len_);
1810 _body.get().status.clear();
1813 const std::string& bookmark()
1815 return _body.get().bookmark;
// Records the bookmark carried by an ack.
// Non-empty bookmark: copies its bytes into the stored bookmark string and lets
// Field::parseBookmark fill in nameHashValue and sequenceNo from it.
// Otherwise (the else line itself is not visible in this excerpt): clears the
// stored bookmark and resets both numeric fields to 0.
1817 void setBookmark(
const Field& bookmark_)
1819 if (!bookmark_.
empty())
1821 _body.get().bookmark.assign(bookmark_.
data(), bookmark_.
len());
1822 Field::parseBookmark(bookmark_, _body.get().nameHashValue,
1823 _body.get().sequenceNo);
1827 _body.get().bookmark.clear();
1828 _body.get().sequenceNo = (amps_uint64_t)0;
1829 _body.get().nameHashValue = (amps_uint64_t)0;
1832 amps_uint64_t sequenceNo()
const 1834 return _body.get().sequenceNo;
1836 amps_uint64_t nameHashValue()
const 1838 return _body.get().nameHashValue;
// Parses len_ characters at data_ as an unsigned decimal number and stores the
// result as the ack's sequence number. An input of length 0 stores 0.
// NOTE(review): no digit validation or overflow check is visible in this
// excerpt. A non-digit character would contribute (char - '0') to the sum
// rather than being rejected -- confirm the input is always numeric.
1840 void setSequenceNo(
const char* data_,
size_t len_)
1842 amps_uint64_t result = (amps_uint64_t)0;
1845 for (
size_t i = 0; i < len_; ++i)
// Shift the accumulated value one decimal place, then add the next digit.
1847 result *= (amps_uint64_t)10;
1848 result += (amps_uint64_t)(data_[i] -
'0');
1851 _body.get().sequenceNo = result;
1853 VersionInfo serverVersion()
const 1855 return _body.get().serverVersion;
1857 void setServerVersion(
const char* data_,
size_t len_)
1861 _body.get().serverVersion.setVersion(std::string(data_, len_));
1866 return _body.get().responded;
1870 _body.get().responded =
true;
1874 return _body.get().abandoned;
1878 if (_body.isValid())
1880 _body.get().abandoned =
true;
1884 void setConnectionVersion(
unsigned connectionVersion)
1886 _body.get().connectionVersion = connectionVersion;
1889 unsigned getConnectionVersion()
1891 return _body.get().connectionVersion;
1893 void setOptions(
const char* data_,
size_t len_)
1897 _body.get().options.assign(data_, len_);
1901 _body.get().options.clear();
1905 const std::string& options()
1907 return _body.get().options;
1910 AckResponse& operator=(
const AckResponse& rhs)
1918 typedef std::map<std::string, AckResponse> AckMap;
1921 DefaultExceptionListener _defaultExceptionListener;
1924 struct DeferredExecutionRequest
1926 DeferredExecutionRequest(DeferredExecutionFunc func_,
1929 _userData(userData_)
1932 DeferredExecutionFunc _func;
1936 std::shared_ptr<const ExceptionListener> _pExceptionListener;
1937 amps_unique_ptr<SubscriptionManager> _subscriptionManager;
1938 volatile bool _connected;
1939 std::string _username;
1940 typedef std::set<ConnectionStateListener*> ConnectionStateListeners;
1941 ConnectionStateListeners _connectionStateListeners;
1942 typedef std::vector<DeferredExecutionRequest> DeferredExecutionList;
1943 Mutex _deferredExecutionLock;
1944 DeferredExecutionList _deferredExecutionList;
1945 unsigned _heartbeatInterval;
1946 unsigned _readTimeout;
1954 if (!_connected && newState_ > ConnectionStateListener::Connected)
1958 for (ConnectionStateListeners::iterator it = _connectionStateListeners.begin(); it != _connectionStateListeners.end(); ++it)
1960 AMPS_CALL_EXCEPTION_WRAPPER(
1961 (*it)->connectionStateChanged(newState_));
1964 unsigned processedAck(
Message& message);
1965 unsigned persistedAck(
Message& meesage);
1966 void lastChance(
Message& message);
1967 void checkAndSendHeartbeat(
bool force =
false);
1968 virtual ConnectionInfo getConnectionInfo()
const;
1970 ClientImplMessageHandler(
amps_handle message,
void* userData);
1972 ClientImplPreDisconnectHandler(
amps_handle client,
unsigned failedConnectionVersion,
void* userData);
1974 ClientImplDisconnectHandler(
amps_handle client,
void* userData);
// Removes the route for the given subscription id, informs the subscription
// manager (if one is set), then sends _message without retry and queues a
// no-op deferred execution.
// NOTE(review): several lines are not visible in this excerpt, including the
// declaration of subId and whatever populates _message -- confirm.
1976 void unsubscribeInternal(
const std::string&
id)
1984 subId.assign(
id.data(),
id.length());
1985 _routes.removeRoute(subId);
1987 if (_subscriptionManager)
// NOTE(review): Unlock<Mutex> presumably releases _lock for the duration of the
// call into the subscription manager, which implies the caller holds _lock on
// entry -- confirm.
1990 Unlock<Mutex> unlock(_lock);
1991 _subscriptionManager->unsubscribe(subId);
1997 _sendWithoutRetry(_message);
1998 deferredExecution(&s_noOpFn, NULL);
// Convenience overload: forwards to the four-argument syncAckProcessing with a
// sequence number of 0 and the given isHASubscribe_ flag.
2001 AckResponse syncAckProcessing(
long timeout_,
Message& message_,
2002 bool isHASubscribe_)
2004 return syncAckProcessing(timeout_, message_,
2005 (amps_uint64_t)0, isHASubscribe_);
// Sends message_ and waits until its ack is marked responded or abandoned, or
// until the wait times out. On a non-"failure" response it updates client state
// from the ack (sequence number, name hash, server version, ack batch size).
// Throws:
//   DisconnectedException - _send reported connection version 0, or
//                           (apparently) the ack was abandoned while waiting.
//   TimedOutException     - (apparently) the wait ended without a response and
//                           the ack was not abandoned.
//   whatever message_.throwFor raises for a failure reason.
// NOTE(review): many lines of this function are not visible in this excerpt
// (scopes, else branches, the return statement), so the exact nesting of the
// statements below should be confirmed against the full file.
2008 AckResponse syncAckProcessing(
long timeout_,
Message& message_,
2009 amps_uint64_t haSeq = (amps_uint64_t)0,
2010 bool isHASubscribe_ =
false)
2013 AckResponse ack = AckResponse::create();
// The ack map is guarded by its own lock (the guarded statements are not visible).
2016 Lock<Mutex> guard(_ackMapLock);
// _send returns the connection version the message went out on; 0 is treated
// as "not sent because the connection is closed".
2019 ack.setConnectionVersion((
unsigned)_send(message_, haSeq, isHASubscribe_));
2020 if (ack.getConnectionVersion() == 0)
2023 throw DisconnectedException(
"Connection closed while waiting for response.");
2025 bool timedOut =
false;
2026 AMPS_START_TIMER(timeout_)
// Wait loop: ends on timeout, on a response, or when the ack is abandoned.
2027 while (!timedOut && !ack.responded() && !ack.abandoned())
2031 timedOut = !_lock.wait(timeout_);
2035 AMPS_RESET_TIMER(timedOut, timeout_);
// The user-supplied waiting function is invoked with _lock released.
2042 Unlock<Mutex> unlck(_lock);
2043 amps_invoke_waiting_function();
2046 if (ack.responded())
2048 if (ack.status() !=
"failure")
// Never move the last-sent HA sequence number backwards.
2052 amps_uint64_t ackSequence = ack.sequenceNo();
2053 if (_lastSentHaSequenceNumber < ackSequence)
2055 _lastSentHaSequenceNumber = ackSequence;
// The name hash is the part of the ack's bookmark before the first '|'.
2068 _nameHash = ack.bookmark().substr(0, ack.bookmark().find(
'|'));
2069 _nameHashValue = ack.nameHashValue();
2070 _serverVersion = ack.serverVersion();
2071 if (_bookmarkStore.isValid())
2078 const std::string& options = ack.options();
// NOTE(review): std::string::find_first_of returns the position of the first
// character that matches ANY character of the argument, not the position of the
// substring "max_backlog=". For an options string that merely contains one of
// those letters, index + 12 below can point at unrelated text or past the end
// of the buffer. std::string::find looks like what was intended -- confirm.
2079 size_t index = options.find_first_of(
"max_backlog=");
2080 if (index != std::string::npos)
// 12 is the length of "max_backlog="; c is meant to start at the numeric value.
2083 const char* c = options.c_str() + index + 12;
// Accumulate decimal digits up to the next ',' or the end of the string.
// 48 is the ASCII code of '0'. No check that *c is actually a digit is visible.
2084 while (*c && *c !=
',')
2086 data = (data * 10) + (
unsigned)(*c++ -48);
// The ack batch size is only ever lowered to the parsed value, never raised.
2088 if (_ackBatchSize > data)
2090 _ackBatchSize = data;
// Failure path. NotEntitled (12) equals the length of the text "not entitled";
// the length-and-first-character test below appears to be a cheap match for
// that reason (the rest of the condition is not visible).
2096 const size_t NotEntitled = 12;
2097 std::string ackReason = ack.reason();
2098 if (ackReason.length() == 0)
2102 if (ackReason.length() == NotEntitled &&
2103 ackReason[0] ==
'n' &&
2108 message_.throwFor(_client, ackReason);
// No response arrived: distinguish a timeout from an abandoned ack.
2112 if (!ack.abandoned())
2114 throw TimedOutException(
"timed out waiting for operation.");
2118 throw DisconnectedException(
"Connection closed while waiting for response.");
2132 AMPS_CALL_EXCEPTION_WRAPPER(ClientImpl::disconnect());
2133 _pEmptyMessageStream.reset(NULL);
2140 ClientImpl(
const std::string& clientName)
2141 : _client(NULL), _name(clientName)
2142 , _isRetryOnDisconnect(
true)
2143 , _lastSentHaSequenceNumber((amps_uint64_t)0), _logonInProgress(0)
2144 , _badTimeToHASubscribe(0), _serverVersion()
2145 , _queueAckTimeout(AMPS_DEFAULT_QUEUE_ACK_TIMEOUT)
2146 , _isAutoAckEnabled(
false)
2148 , _queuedAckCount(0)
2149 , _defaultMaxDepth(0)
2151 , _heartbeatInterval(0)
2154 _replayer.setClient(
this);
2157 (amps_handler)ClientImpl::ClientImplMessageHandler,
2160 (amps_predisconnect_handler)ClientImpl::ClientImplPreDisconnectHandler,
2163 (amps_handler)ClientImpl::ClientImplDisconnectHandler,
2165 _exceptionListener = &_defaultExceptionListener;
2166 for (
size_t i = 0; i < GlobalCommandTypeHandlers::COUNT; ++i)
2168 #ifdef AMPS_USE_EMPLACE 2176 virtual ~ClientImpl()
// Const accessors for the client name, its hash string, and the numeric hash
// value. The bodies of getName() and getNameHash() are not visible in this
// excerpt.
2181 const std::string& getName()
const 2186 const std::string& getNameHash()
// NOTE(review): the leading const on a by-value return type has no effect for
// callers and some compilers warn about it (e.g. -Wignored-qualifiers).
const 2191 const amps_uint64_t getNameHashValue()
const 2193 return _nameHashValue;
2196 void setName(
const std::string& name)
2203 AMPSException::throwFor(_client, result);
2208 const std::string& getLogonCorrelationData()
const 2210 return _logonCorrelationData;
2213 void setLogonCorrelationData(
const std::string& logonCorrelationData_)
2215 _logonCorrelationData = logonCorrelationData_;
2218 size_t getServerVersion()
const 2220 return _serverVersion.getOldStyleVersion();
2223 VersionInfo getServerVersionInfo()
const 2225 return _serverVersion;
2228 const std::string& getURI()
const 2233 virtual void connect(
const std::string& uri)
2235 Lock<Mutex> l(_lock);
2239 virtual void _connect(
const std::string& uri)
2245 AMPSException::throwFor(_client, result);
2252 _readMessage.setClientImpl(
this);
2253 if (_queueAckTimeout)
2258 AMPSException::throwFor(_client, result);
2262 broadcastConnectionStateChanged(ConnectionStateListener::Connected);
2265 void setDisconnected()
2268 Lock<Mutex> l(_lock);
2271 AMPS_CALL_EXCEPTION_WRAPPER(broadcastConnectionStateChanged(ConnectionStateListener::Disconnected));
2274 _heartbeatTimer.setTimeout(0.0);
2277 clearAcks(UINT_MAX-1);
2283 virtual void disconnect()
2285 AMPS_CALL_EXCEPTION_WRAPPER(flushAcks());
2288 clearAcks(UINT_MAX);
2289 AMPS_CALL_EXCEPTION_WRAPPER(processDeferredExecutions());
2290 Lock<Mutex> l(_lock);
2291 broadcastConnectionStateChanged(ConnectionStateListener::Shutdown);
// Marks every pending ack whose connection version is <= failedVersion as
// abandoned and collects its key in a work list, all under _ackMapLock.
// In this file, setDisconnected() passes UINT_MAX-1 and disconnect() passes
// UINT_MAX. New AckResponseImpl objects start with connectionVersion UINT_MAX,
// so UINT_MAX-1 leaves acks that still have that initial value untouched while
// UINT_MAX covers everything.
2294 void clearAcks(
unsigned failedVersion)
2297 Lock<Mutex> guard(_ackMapLock);
// Keys are gathered first instead of modifying _ackMap while iterating it.
2300 std::vector<std::string> worklist;
2301 for (AckMap::iterator i = _ackMap.begin(), e = _ackMap.end(); i != e; ++i)
2303 if (i->second.getConnectionVersion() <= failedVersion)
2305 i->second.setAbandoned();
2306 worklist.push_back(i->first);
// NOTE(review): the body of this second loop is not visible in this excerpt;
// presumably it removes the collected keys from _ackMap -- confirm.
2310 for (std::vector<std::string>::iterator j = worklist.begin(), e = worklist.end(); j != e; ++j)
// Locked entry point for sending: acquires _lock, then delegates to _send with
// its default arguments (sequence number 0, not an HA subscribe). Returns the
// int produced by _send, which syncAckProcessing in this file stores as the
// ack's connection version.
2319 int send(
const Message& message)
2321 Lock<Mutex> l(_lock);
2322 return _send(message);
// Locked entry point for a single send attempt: acquires _lock, refuses to send
// while a logon is in progress, otherwise delegates to _sendWithoutRetry.
// Throws DisconnectedException when _logonInProgress is set.
// NOTE(review): lines between the lock and the test are not visible in this
// excerpt; presumably a logon in progress means the connection was lost and is
// being re-established, hence the "disconnected" message -- confirm.
2325 void sendWithoutRetry(
const Message& message_)
2327 Lock<Mutex> l(_lock);
2330 if (_logonInProgress)
2332 throw DisconnectedException(
"The client has been disconnected.");
2334 _sendWithoutRetry(message_);
2337 void _sendWithoutRetry(
const Message& message_)
2342 AMPSException::throwFor(_client, result);
2346 int _send(
const Message& message, amps_uint64_t haSeq = (amps_uint64_t)0,
2347 bool isHASubscribe_ =
false)
2354 Message localMessage = message;
2355 unsigned version = 0;
2359 if (haSeq && _logonInProgress)
2363 if (!_isRetryOnDisconnect)
2367 if (!_lock.wait(1000))
2369 amps_invoke_waiting_function();
2374 if ((haSeq && haSeq <= _lastSentHaSequenceNumber) ||
2375 (isHASubscribe_ && _badTimeToHASubscribe))
2377 return (
int)version;
2381 if (haSeq > _lastSentHaSequenceNumber)
2383 while (haSeq > _lastSentHaSequenceNumber + 1)
2389 _lastSentHaSequenceNumber + 1))
2395 version = _replayer._version;
2398 catch (
const DisconnectedException&)
2400 catch (
const DisconnectedException& e)
2403 result = _replayer._res;
2408 localMessage.getMessage(),
2410 ++_lastSentHaSequenceNumber;
2414 if (_logonInProgress && localMessage.
getCommand().
data()[0] !=
'l')
2416 while (_logonInProgress)
2418 if (!_lock.wait(1000))
2420 amps_invoke_waiting_function();
2425 localMessage.getMessage(),
2430 if (!isHASubscribe_ && !haSeq &&
2431 localMessage.getMessage() == message.getMessage())
2435 if (_isRetryOnDisconnect)
2437 Unlock<Mutex> u(_lock);
2442 if ((isHASubscribe_ || haSeq) &&
2445 return (
int)version;
2452 AMPSException::throwFor(_client, result);
2458 amps_invoke_waiting_function();
2464 AMPSException::throwFor(_client, result);
2466 return (
int)version;
2469 void addMessageHandler(
const Field& commandId_,
2471 unsigned requestedAcks_, Message::Command::Type commandType_)
2473 Lock<Mutex> lock(_lock);
2474 _routes.addRoute(commandId_, messageHandler_, requestedAcks_,
2478 bool removeMessageHandler(
const Field& commandId_)
2480 Lock<Mutex> lock(_lock);
2481 return _routes.removeRoute(commandId_);
2489 bool isSubscribeOnly =
false;
2490 bool replace =
false;
2492 unsigned systemAddedAcks = Message::AckType::None;
2495 switch (commandType)
2497 case Message::Command::Subscribe:
2498 case Message::Command::DeltaSubscribe:
2499 replace = message_.
getOptions().operator std::string().find(AMPS_OPTIONS_REPLACE, 0, strlen(AMPS_OPTIONS_REPLACE) - 1) != std::string::npos;
2500 isSubscribeOnly =
true;
2502 case Message::Command::SOWAndSubscribe:
2503 case Message::Command::SOWAndDeltaSubscribe:
2510 while (!replace &&
id != subId && _routes.hasRoute(
id))
2522 systemAddedAcks |= Message::AckType::Persisted;
2525 case Message::Command::SOW:
2532 while (!replace &&
id != subId && _routes.hasRoute(
id))
2543 if (!isSubscribeOnly)
2552 while (!replace && qid != subId && qid !=
id 2553 && _routes.hasRoute(qid))
2559 systemAddedAcks |= Message::AckType::Processed;
2562 int routesAdded = 0;
2563 Lock<Mutex> l(_lock);
2564 if (!subId.
empty() && messageHandler_.isValid())
2566 if (!_routes.hasRoute(subId))
2572 _routes.addRoute(subId, messageHandler_, requestedAcks,
2573 systemAddedAcks, commandType);
2575 if (!isSubscribeOnly && !qid.
empty()
2576 && messageHandler_.isValid() && qid != subId)
2578 if (routesAdded == 0)
2580 _routes.addRoute(qid, messageHandler_,
2581 requestedAcks, systemAddedAcks, commandType);
2587 Unlock<Mutex> u(_lock);
2588 data = amps_invoke_copy_route_function(
2589 messageHandler_.userData());
2593 _routes.addRoute(qid, messageHandler_, requestedAcks,
2594 systemAddedAcks, commandType);
2598 _routes.addRoute(qid,
2601 requestedAcks, systemAddedAcks, commandType);
2606 if (!
id.empty() && messageHandler_.isValid()
2607 && requestedAcks & ~
Message::AckType::Persisted
2608 &&
id != subId &&
id != qid)
2610 if (routesAdded == 0)
2612 _routes.addRoute(
id, messageHandler_, requestedAcks,
2613 systemAddedAcks, commandType);
2619 Unlock<Mutex> u(_lock);
2620 data = amps_invoke_copy_route_function(
2621 messageHandler_.userData());
2625 _routes.addRoute(
id, messageHandler_, requestedAcks,
2626 systemAddedAcks, commandType);
2630 _routes.addRoute(
id,
2634 systemAddedAcks, commandType);
2643 syncAckProcessing(timeout_, message_, 0,
false);
2650 _routes.removeRoute(
id);
2657 case Message::Command::Unsubscribe:
2658 case Message::Command::Heartbeat:
2659 case Message::Command::Logon:
2660 case Message::Command::StartTimer:
2661 case Message::Command::StopTimer:
2662 case Message::Command::SOWDelete:
2664 Lock<Mutex> l(_lock);
2673 if (messageHandler_.isValid())
2675 _routes.addRoute(
id, messageHandler_, requestedAcks,
2676 Message::AckType::None, commandType);
2682 case Message::Command::DeltaPublish:
2683 case Message::Command::Publish:
2686 Lock<Mutex> l(_lock);
2689 if (ackType != Message::AckType::None
2697 if (messageHandler_.isValid())
2699 _routes.addRoute(
id, messageHandler_, requestedAcks,
2700 Message::AckType::None, commandType);
2706 syncAckProcessing(timeout_, message_, 0,
false);
2715 case Message::Command::GroupBegin:
2716 case Message::Command::GroupEnd:
2717 case Message::Command::OOF:
2718 case Message::Command::Ack:
2719 case Message::Command::Unknown:
2721 throw CommandException(
"Command type " + message_.
getCommand() +
" can not be sent directly to AMPS");
2727 void setDisconnectHandler(
const DisconnectHandler& disconnectHandler)
2729 Lock<Mutex> l(_lock);
2730 _disconnectHandler = disconnectHandler;
2733 void setGlobalCommandTypeMessageHandler(
const std::string& command_,
const MessageHandler& handler_)
2735 switch (command_[0])
2737 #if 0 // Not currently implemented to avoid an extra branch in delivery 2739 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::Publish] = handler_;
2742 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::SOW] = handler_;
2746 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::Heartbeat] = handler_;
2748 #if 0 // Not currently implemented to avoid an extra branch in delivery 2750 if (command_[6] ==
'b')
2752 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::GroupBegin] = handler_;
2754 else if (command_[6] ==
'e')
2756 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::GroupEnd] = handler_;
2760 std::ostringstream os;
2761 os <<
"Invalid command '" << command_ <<
"' passed to setGlobalCommandTypeHandler";
2762 throw CommandException(os.str());
2766 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::OOF] = handler_;
2770 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::Ack] = handler_;
2774 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::LastChance] = handler_;
2778 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::DuplicateMessage] = handler_;
2781 std::ostringstream os;
2782 os <<
"Invalid command '" << command_ <<
"' passed to setGlobalCommandTypeHandler";
2783 throw CommandException(os.str());
2788 void setGlobalCommandTypeMessageHandler(
const Message::Command::Type command_,
const MessageHandler& handler_)
2792 #if 0 // Not currently implemented to avoid an extra branch in delivery 2793 case Message::Command::Publish:
2794 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::Publish] = handler_;
2796 case Message::Command::SOW:
2797 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::SOW] = handler_;
2800 case Message::Command::Heartbeat:
2801 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::Heartbeat] = handler_;
2803 #if 0 // Not currently implemented to avoid an extra branch in delivery 2804 case Message::Command::GroupBegin:
2805 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::GroupBegin] = handler_;
2807 case Message::Command::GroupEnd:
2808 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::GroupEnd] = handler_;
2810 case Message::Command::OOF:
2811 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::OOF] = handler_;
2814 case Message::Command::Ack:
2815 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::Ack] = handler_;
2819 unsigned command = command_;
2826 AMPS_snprintf(errBuf,
sizeof(errBuf),
2827 "Invalid command '%.*s' passed to setGlobalCommandTypeHandler",
2828 CommandConstants<0>::Lengths[bits],
2829 CommandConstants<0>::Values[bits]);
2830 throw CommandException(errBuf);
// Installs handler_ directly into the slot selected by the
// GlobalCommandTypeHandlers enumerator, replacing whatever was there.
// NOTE(review): no range check on handlerType_ and no lock acquisition is
// visible in this excerpt, unlike setDisconnectHandler which takes _lock --
// confirm whether callers are expected to serialise access.
2835 void setGlobalCommandTypeMessageHandler(
const GlobalCommandTypeHandlers handlerType_,
const MessageHandler& handler_)
2837 _globalCommandTypeHandlers[handlerType_] = handler_;
2842 Lock<Mutex> l(_lock);
2843 _failedWriteHandler.reset(handler_);
2846 void setPublishStore(
const Store& publishStore_)
2848 Lock<Mutex> l(_lock);
2851 throw AlreadyConnectedException(
"Setting a publish store on a connected client is undefined behavior");
2853 _publishStore = publishStore_;
2858 Lock<Mutex> l(_lock);
2861 throw AlreadyConnectedException(
"Setting a bookmark store on a connected client is undefined behavior");
2863 _bookmarkStore = bookmarkStore_;
2868 Lock<Mutex> l(_lock);
2869 _subscriptionManager.reset(subscriptionManager_);
2877 DisconnectHandler getDisconnectHandler()
const 2879 return _disconnectHandler;
2884 return _globalCommandTypeHandlers[GlobalCommandTypeHandlers::DuplicateMessage];
2892 Store getPublishStore()
const 2894 return _publishStore;
2899 return _bookmarkStore;
// Publishes data_ (dataLen_ bytes) to the topic topic_ (topicLen_ bytes).
// Two paths are visible: one sends the reusable _publishMessage under _lock,
// the other resets publishStoreMessage and returns the result of _publish.
// NOTE(review): the condition selecting between the paths, the topic
// assignment, and the first path's return value are not visible in this
// excerpt; presumably the choice depends on whether a publish store is set --
// confirm.
2902 amps_uint64_t publish(
const char* topic_,
size_t topicLen_,
const char* data_,
size_t dataLen_)
2906 Lock<Mutex> l(_lock);
2908 _publishMessage.assignData(data_, dataLen_);
2909 _send(_publishMessage);
2914 publishStoreMessage.reset();
2916 return _publish(topic_, topicLen_, data_, dataLen_);
// Publishes data_ to topic_ with an expiration value attached to the message.
// The expiration is rendered as decimal text with convertToCharArray, which
// fills the buffer from the end; the digits therefore start at
// exprBuf + pos and span AMPS_NUMBER_BUFFER_LEN - pos characters.
// NOTE(review): as with the overload without expiration, the condition that
// selects between the two visible paths is not shown in this excerpt --
// presumably it depends on whether a publish store is set; confirm.
2920 amps_uint64_t publish(
const char* topic_,
size_t topicLen_,
const char* data_,
2921 size_t dataLen_,
unsigned long expiration_)
// Path 1: send the reusable _publishMessage directly while holding _lock.
2925 Lock<Mutex> l(_lock);
2927 _publishMessage.assignData(data_, dataLen_);
2928 char exprBuf[AMPS_NUMBER_BUFFER_LEN];
2929 size_t pos = convertToCharArray(exprBuf, expiration_);
2930 _publishMessage.
assignExpiration(exprBuf + pos, AMPS_NUMBER_BUFFER_LEN - pos);
2931 _send(_publishMessage);
// Path 2: start from a reset publishStoreMessage and go through _publish.
2937 publishStoreMessage.reset();
2938 char exprBuf[AMPS_NUMBER_BUFFER_LEN];
2939 size_t exprPos = convertToCharArray(exprBuf, expiration_);
2942 AMPS_NUMBER_BUFFER_LEN - exprPos);
2943 return _publish(topic_, topicLen_, data_, dataLen_);
// Interior of the flush-ack helper used by publishFlush(). The class header
// line is not part of this listing; the name comes from the constructor below.
// It tracks two flags: whether the ack arrived and whether the connection
// went away, so that a waiter can stop in either case.
2950 ClientImpl* _pClient;
// With C++11 (or MSVC 2015+) the flags are std::atomic<bool>; otherwise the
// fallback declarations below use volatile bool.
2952 #if __cplusplus >= 201100L || _MSC_VER >= 1900 2953 std::atomic<bool> _acked;
2954 std::atomic<bool> _disconnected;
2956 volatile bool _acked;
2957 volatile bool _disconnected;
/// Starts with both flags false and registers this object with pClient_ as a
/// connection state listener.
2960 FlushAckHandler(ClientImpl* pClient_)
2961 : _pClient(pClient_), _cmdId(), _acked(
false), _disconnected(
false)
2963 pClient_->addConnectionStateListener(
this);
// Cleanup (signature line missing, presumably the destructor): deregisters
// the listener and removes the message handler keyed by _cmdId.
2967 _pClient->removeConnectionStateListener(
this);
2968 _pClient->removeMessageHandler(_cmdId);
/// Records the command id; its body is not part of this listing.
2971 void setCommandId(
const Field& cmdId_)
/// Listener callback: any state at or below Shutdown marks the handler as
/// disconnected.
2979 void connectionStateChanged(
State state_)
2981 if (state_ <= Shutdown)
2983 _disconnected =
true;
// Completion test (signature line missing): true once acked or disconnected.
2992 return _acked || _disconnected;
/// Waits until previously published messages have been acknowledged.
/// Visible steps: validate ackType_, create a FlushAckHandler, send a flush
/// request to servers at or above minFlush, flush the publish store, then wait
/// on the handler with or without a Timer.
/// \param timeout_ Passed to _publishStore.flush(), to AMPS_USLEEP (times
/// 1000) and to the Timer; units are not stated here -- presumably
/// milliseconds (TODO confirm).
/// \param ackType_ Must be Message::AckType::Processed or Persisted.
/// \throw CommandException if ackType_ is neither processed nor persisted.
/// \throw DisconnectedException on the conditions marked below.
/// \throw TimedOutException if the handler is still not done after waiting.
2996 void publishFlush(
long timeout_,
unsigned ackType_)
2998 static const char* processed =
"processed";
2999 static const size_t processedLen = strlen(processed);
3000 static const char* persisted =
"persisted";
3001 static const size_t persistedLen = strlen(persisted);
3002 static const char* flush =
"flush";
3003 static const size_t flushLen = strlen(flush);
// Version thresholds compared against _serverVersion below.
3004 static VersionInfo minPersisted(
"5.3.3.0");
3005 static VersionInfo minFlush(
"4");
3006 if (ackType_ != Message::AckType::Processed
3007 && ackType_ != Message::AckType::Persisted)
3009 throw CommandException(
"Flush can only be used with processed or persisted acks.");
// The handler registers itself as a connection state listener on construction.
3011 FlushAckHandler flushHandler(
this);
3012 if (_serverVersion >= minFlush)
3014 Lock<Mutex> l(_lock);
// Guard condition is on a line missing from this listing.
3017 throw DisconnectedException(
"Not connected trying to flush");
// Servers older than minPersisted, or an explicit processed request, take
// this branch; its body is not part of this listing.
3022 if (_serverVersion < minPersisted
3023 || ackType_ == Message::AckType::Processed)
// Bind the handler's invoke() as the callback for the flush ack.
3033 std::bind(&FlushAckHandler::invoke,
3034 std::ref(flushHandler),
3035 std::placeholders::_1),
3037 NoDelay noDelay(_client);
3038 if (_send(_message) == -1)
3040 throw DisconnectedException(
"Disconnected trying to flush");
3047 _publishStore.
flush(timeout_);
// AMPS exceptions from the store flush go to AMPS_UNHANDLED_EXCEPTION.
3049 catch (
const AMPSException& ex)
3051 AMPS_UNHANDLED_EXCEPTION(ex);
// Servers older than minFlush: only sleeps are visible on this path, either
// timeout_ * 1000 or a fixed 1000 * 1000 (selecting condition not shown).
3055 else if (_serverVersion < minFlush)
3059 AMPS_USLEEP(timeout_ * 1000);
3063 AMPS_USLEEP(1000 * 1000);
// Bounded wait: loop until the timer fires or the handler is done.
3069 Timer timer((
double)timeout_);
3071 while (!timer.check() && !flushHandler.done())
3074 amps_invoke_waiting_function();
// Unbounded wait: loop until the handler is done.
3079 while (!flushHandler.done())
3082 amps_invoke_waiting_function();
3086 if (!flushHandler.done())
3088 throw TimedOutException(
"Timed out waiting for flush");
// Done without an ack means a disconnect; that is an error only when there is
// no valid publish store.
3091 if (!flushHandler.acked() && !_publishStore.
isValid())
3093 throw DisconnectedException(
"Disconnected waiting for flush");
/// Delta-publishes data_ to topic_.
/// Two paths are visible: one fills the reusable _deltaMessage and sends it
/// under _lock; the other resets publishStoreMessage, marks it as a
/// DeltaPublish command and delegates to _publish(). The branch condition is
/// not part of this listing (TODO confirm).
/// \param topic_ Pointer to the topic bytes.
/// \param topicLength_ Number of bytes at topic_.
/// \param data_ Pointer to the payload bytes.
/// \param dataLength_ Number of bytes at data_.
/// \return On the delegating path, the value returned by _publish().
3097 amps_uint64_t deltaPublish(
const char* topic_,
size_t topicLength_,
3098 const char* data_,
size_t dataLength_)
3102 Lock<Mutex> l(_lock);
3104 _deltaMessage.assignData(data_, dataLength_);
3105 _send(_deltaMessage);
// Store-backed path: _publish() itself does not set the command type, so it
// is set here.
3110 publishStoreMessage.reset();
3111 publishStoreMessage.
setCommandEnum(Message::Command::DeltaPublish);
3112 return _publish(topic_, topicLength_, data_, dataLength_);
/// Delta-publishes data_ to topic_ with an expiration value attached.
/// Same two-path shape as the four-argument deltaPublish(); the branch
/// condition is not part of this listing (TODO confirm).
/// \param topic_ Pointer to the topic bytes.
/// \param topicLength_ Number of bytes at topic_.
/// \param data_ Pointer to the payload bytes.
/// \param dataLength_ Number of bytes at data_.
/// \param expiration_ Expiration value, sent as decimal text.
/// \return On the delegating path, the value returned by _publish().
3116 amps_uint64_t deltaPublish(
const char* topic_,
size_t topicLength_,
3117 const char* data_,
size_t dataLength_,
3118 unsigned long expiration_)
3122 Lock<Mutex> l(_lock);
3124 _deltaMessage.assignData(data_, dataLength_);
// Digits are written from the end of exprBuf; pos is used as the offset of
// the first digit.
3125 char exprBuf[AMPS_NUMBER_BUFFER_LEN];
3126 size_t pos = convertToCharArray(exprBuf, expiration_);
3127 _deltaMessage.
assignExpiration(exprBuf + pos, AMPS_NUMBER_BUFFER_LEN - pos);
3128 _send(_deltaMessage);
// Store-backed path: set the command type and expiration on
// publishStoreMessage, then delegate.
3134 publishStoreMessage.reset();
3135 char exprBuf[AMPS_NUMBER_BUFFER_LEN];
3136 size_t exprPos = convertToCharArray(exprBuf, expiration_);
3137 publishStoreMessage.
setCommandEnum(Message::Command::DeltaPublish)
3139 AMPS_NUMBER_BUFFER_LEN - exprPos);
3140 return _publish(topic_, topicLength_, data_, dataLength_);
/// Shared store-backed publish step used by publish() and deltaPublish().
/// Fills publishStoreMessage with topic and data, hands it to
/// _publishStore.store(), stamps the returned sequence number on the message
/// as decimal text, and sends it under _lock.
/// \param topic_ Pointer to the topic bytes.
/// \param topicLength_ Number of bytes at topic_.
/// \param data_ Pointer to the payload bytes.
/// \param dataLength_ Number of bytes at data_.
/// \return The sequence number returned by _publishStore.store().
3144 amps_uint64_t _publish(
const char* topic_,
size_t topicLength_,
3145 const char* data_,
size_t dataLength_)
3147 publishStoreMessage.
assignTopic(topic_, topicLength_)
3149 .assignData(data_, dataLength_);
// The store is called before _lock is taken below.
3150 amps_uint64_t haSequenceNumber = _publishStore.
store(publishStoreMessage);
3151 char buf[AMPS_NUMBER_BUFFER_LEN];
3152 size_t pos = convertToCharArray(buf, haSequenceNumber);
3153 publishStoreMessage.
assignSequence(buf + pos, AMPS_NUMBER_BUFFER_LEN - pos);
3155 Lock<Mutex> l(_lock);
3156 _send(publishStoreMessage, haSequenceNumber);
3158 return haSequenceNumber;
/// Public logon entry point: takes _lock and delegates to _logon().
/// \param timeout_ Forwarded to _logon().
/// \param authenticator_ Forwarded to _logon().
/// \param options_ Optional logon options, forwarded to _logon(); defaults to
/// NULL.
/// \return The string returned by _logon().
3161 virtual std::string logon(
long timeout_,
Authenticator& authenticator_,
3162 const char* options_ = NULL)
3164 Lock<Mutex> l(_lock);
3165 return _logon(timeout_, authenticator_, options_);
/// Performs the logon exchange; logon() calls this with _lock held.
/// Visible steps: build the logon message from the URI and settings, send it
/// through syncAckProcessing(), handle a "retry" status via the authenticator,
/// announce LoggedOn, replay the publish store and announce PublishReplayed.
/// \param timeout_ Passed to syncAckProcessing().
/// \param authenticator_ Supplies retry() and receives completed().
/// \param options_ Optional logon options; use is on lines not shown here.
/// \return newCommandId, which is assigned on a line not shown here.
/// \throw ConnectionException when a StoreException is caught (see below).
3168 virtual std::string _logon(
long timeout_,
Authenticator& authenticator_,
3169 const char* options_ = NULL)
3176 #ifdef AMPS_CLIENT_VERSION_WITH_LANGUAGE 3178 strlen(AMPS_CLIENT_VERSION_WITH_LANGUAGE));
// Optional message fields, each set only when the URI or client supplies it
// (the setter lines themselves are missing from this listing).
3181 if (uri.user().size())
3185 if (uri.password().size())
3189 if (uri.protocol() ==
"amps" && uri.messageType().size())
3193 if (uri.isTrue(
"pretty"))
3199 if (!_logonCorrelationData.empty())
// Scoped helpers bound to _logonInProgress and _client for the exchange.
3210 AtomicFlagFlip pubFlip(&_logonInProgress);
3211 NoDelay noDelay(_client);
3215 AckResponse ack = syncAckProcessing(timeout_, _message);
// On "retry" the authenticator supplies a new password and the username from
// the ack is adopted.
3216 if (ack.status() ==
"retry")
3218 _message.
setPassword(authenticator_.
retry(ack.username(), ack.password()));
3219 _username = ack.username();
3224 authenticator_.
completed(ack.username(), ack.password(), ack.reason());
3228 broadcastConnectionStateChanged(ConnectionStateListener::LoggedOn);
3235 catch (
const AMPSException& ex)
3238 AMPS_UNHANDLED_EXCEPTION(ex);
// Replay stored publishes, then tell listeners.
3251 _publishStore.
replay(_replayer);
3252 broadcastConnectionStateChanged(ConnectionStateListener::PublishReplayed);
3254 catch (
const PublishStoreGapException& ex)
3257 AMPS_UNHANDLED_EXCEPTION(ex);
// Store failures are rethrown as ConnectionException with a composed message.
3260 catch (
const StoreException& ex)
3263 std::ostringstream os;
3264 os <<
"A local store exception occurred while logging on." 3266 throw ConnectionException(os.str());
3268 catch (
const AMPSException& ex)
3271 AMPS_UNHANDLED_EXCEPTION(ex);
3274 catch (
const std::exception& ex)
3277 AMPS_UNHANDLED_EXCEPTION(ex);
3287 return newCommandId;
3291 const std::string& topic_,
3293 const std::string& filter_,
3294 const std::string& bookmark_,
3295 const std::string& options_,
3296 const std::string& subId_,
3297 bool isHASubscribe_ =
true)
3299 isHASubscribe_ &= (bool)_subscriptionManager;
3300 Lock<Mutex> l(_lock);
3304 std::string subId(subId_);
3307 if (options_.find(AMPS_OPTIONS_REPLACE, 0, strlen(AMPS_OPTIONS_REPLACE) - 1) != std::string::npos)
3309 throw ConnectionException(
"Cannot issue a replacement subscription; a valid subscription id is required.");
3319 unsigned ackTypes = Message::AckType::Processed;
3321 if (!bookmark_.empty() && _bookmarkStore.isValid())
3323 ackTypes |= Message::AckType::Persisted;
3327 if (filter_.length())
3331 if (bookmark_.length())
3341 if (_bookmarkStore.isValid())
3346 _bookmarkStore.
log(_message);
3347 _bookmarkStore.
discard(_message);
3353 if (options_.length())
3362 Unlock<Mutex> u(_lock);
3363 _subscriptionManager->subscribe(messageHandler_, message,
3364 Message::AckType::None);
3365 if (_badTimeToHASubscribe)
3376 if (!options_.empty())
3382 syncAckProcessing(timeout_, message, isHASubscribe_);
3384 catch (
const DisconnectedException&)
3386 if (!isHASubscribe_)
3388 _routes.removeRoute(subIdField);
3393 AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(subIdField));
3397 catch (
const TimedOutException&)
3399 AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(subIdField));
3407 Unlock<Mutex> unlock(_lock);
3408 _subscriptionManager->unsubscribe(subIdField);
3410 _routes.removeRoute(subIdField);
/// Places a delta subscription on topic_ and routes messages to
/// messageHandler_.
/// Visible behaviour: HA handling is only kept when a subscription manager is
/// set; a Persisted ack is added when a bookmark is given and the bookmark
/// store is valid; the request goes through syncAckProcessing(); routes and
/// subscriptions are undone on the failure paths below.
/// \param messageHandler_ Handler given to the subscription manager.
/// \param topic_ Topic to subscribe to.
/// \param filter_ Optional filter; used only when non-empty.
/// \param bookmark_ Optional bookmark; used only when non-empty.
/// \param options_ Optional options; used only when non-empty.
/// \param subId_ Optional subscription id, copied into a local; defaults to "".
/// \param isHASubscribe_ Request HA handling; defaults to true.
// NOTE(review): the signature line carrying the timeout_ parameter and the
// return statement are missing from this listing.
3416 std::string deltaSubscribe(
const MessageHandler& messageHandler_,
3417 const std::string& topic_,
3419 const std::string& filter_,
3420 const std::string& bookmark_,
3421 const std::string& options_,
3422 const std::string& subId_ =
"",
3423 bool isHASubscribe_ =
true)
// Without a subscription manager the call is never treated as HA.
3425 isHASubscribe_ &= (bool)_subscriptionManager;
3426 Lock<Mutex> l(_lock);
3430 std::string subId(subId_);
3440 unsigned ackTypes = Message::AckType::Processed;
3442 if (!bookmark_.empty() && _bookmarkStore.isValid())
3444 ackTypes |= Message::AckType::Persisted;
3447 if (filter_.length())
3451 if (bookmark_.length())
3461 if (_bookmarkStore.isValid())
// The message is logged and immediately discarded in the bookmark store.
3466 _bookmarkStore.
log(_message);
3467 _bookmarkStore.
discard(_message);
3473 if (options_.length())
// _lock is released while the subscription manager is called.
3481 Unlock<Mutex> u(_lock);
3482 _subscriptionManager->subscribe(messageHandler_, message,
3483 Message::AckType::None);
3484 if (_badTimeToHASubscribe)
3495 if (!options_.empty())
3501 syncAckProcessing(timeout_, message, isHASubscribe_);
// Disconnect: for non-HA subscriptions the route is removed.
3503 catch (
const DisconnectedException&)
3505 if (!isHASubscribe_)
3507 _routes.removeRoute(subIdField);
// Timeout: the subscription is withdrawn through unsubscribeInternal().
3511 catch (
const TimedOutException&)
3513 AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(subIdField));
// Further cleanup (enclosing handler not shown): drop the subscription from
// the manager with _lock released, then remove the route.
3521 Unlock<Mutex> unlock(_lock);
3522 _subscriptionManager->unsubscribe(subIdField);
3524 _routes.removeRoute(subIdField);
/// Removes a single subscription: takes _lock and calls
/// unsubscribeInternal().
/// \param id Subscription id forwarded to unsubscribeInternal().
3530 void unsubscribe(
const std::string&
id)
3532 Lock<Mutex> l(_lock);
3533 unsubscribeInternal(
id);
/// Removes all subscriptions.
/// Clears the subscription manager when one is set, drops all routes via
/// _routes.unsubscribeAll(), then takes _lock and sends _message with
/// _sendWithoutRetry(). The lines that build _message are missing from this
/// listing.
3536 void unsubscribe(
void)
3538 if (_subscriptionManager)
3540 _subscriptionManager->clear();
3543 _routes.unsubscribeAll();
3544 Lock<Mutex> l(_lock);
3549 _sendWithoutRetry(_message);
// Queues a no-op deferred execution; presumably so that the deferred-execution
// pass runs and invalidates the route caches (TODO confirm).
3551 deferredExecution(&s_noOpFn, NULL);
3555 const std::string& topic_,
3556 const std::string& filter_ =
"",
3557 const std::string& orderBy_ =
"",
3558 const std::string& bookmark_ =
"",
3559 int batchSize_ = AMPS_DEFAULT_BATCH_SIZE,
3560 int topN_ = AMPS_DEFAULT_TOP_N,
3561 const std::string& options_ =
"",
3562 long timeout_ = AMPS_DEFAULT_COMMAND_TIMEOUT)
3564 Lock<Mutex> l(_lock);
3571 unsigned ackTypes = Message::AckType::Processed | Message::AckType::Completed;
3574 if (filter_.length())
3578 if (orderBy_.length())
3582 if (bookmark_.length())
3587 if (topN_ != AMPS_DEFAULT_TOP_N)
3591 if (options_.length())
3596 _routes.addRoute(_message.
getQueryID(), messageHandler_,
3601 syncAckProcessing(timeout_, _message);
3605 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId));
3613 const std::string& topic_,
3615 const std::string& filter_ =
"",
3616 int batchSize_ = AMPS_DEFAULT_BATCH_SIZE,
3617 int topN_ = AMPS_DEFAULT_TOP_N)
3620 return sow(messageHandler_,
/// Queries the SOW for topic_ and subscribes to subsequent updates.
/// Visible behaviour: HA handling is only kept when a subscription manager is
/// set; a Persisted ack is added when a bookmark is given and the bookmark
/// store is valid; the request goes through syncAckProcessing(); routes and
/// subscriptions are undone on the failure paths below.
/// \param messageHandler_ Handler registered with the subscription manager
/// and with _routes.
/// \param topic_ Topic to query and subscribe to.
/// \param filter_ Optional filter; used only when non-empty.
/// \param orderBy_ Optional ordering; used only when non-empty.
/// \param bookmark_ Optional bookmark; used only when non-empty.
/// \param batchSize_ Defaults to AMPS_DEFAULT_BATCH_SIZE.
/// \param topN_ Applied only when it differs from AMPS_DEFAULT_TOP_N.
/// \param options_ Optional options; used only when non-empty.
/// \param timeout_ Passed to syncAckProcessing(); defaults to
/// AMPS_DEFAULT_COMMAND_TIMEOUT.
/// \param isHASubscribe_ Request HA handling; defaults to true.
// NOTE(review): the return statement is missing from this listing.
3631 std::string sowAndSubscribe(
const MessageHandler& messageHandler_,
3632 const std::string& topic_,
3633 const std::string& filter_ =
"",
3634 const std::string& orderBy_ =
"",
3635 const std::string& bookmark_ =
"",
3636 int batchSize_ = AMPS_DEFAULT_BATCH_SIZE,
3637 int topN_ = AMPS_DEFAULT_TOP_N,
3638 const std::string& options_ =
"",
3639 long timeout_ = AMPS_DEFAULT_COMMAND_TIMEOUT,
3640 bool isHASubscribe_ =
true)
// Without a subscription manager the call is never treated as HA.
3642 isHASubscribe_ &= (bool)_subscriptionManager;
3643 unsigned ackTypes = Message::AckType::Processed;
3644 Lock<Mutex> l(_lock);
// The subscription id starts out equal to the command id.
3649 std::string subId = cid;
3651 if (filter_.length())
3655 if (orderBy_.length())
3659 if (bookmark_.length())
3663 if (_bookmarkStore.isValid())
3665 ackTypes |= Message::AckType::Persisted;
// The message is logged in the bookmark store; it is discarded again unless
// the bookmark is a range.
3673 _bookmarkStore.
log(_message);
3674 if (!BookmarkRange::isRange(bookmark))
3676 _bookmarkStore.
discard(_message);
3688 if (topN_ != AMPS_DEFAULT_TOP_N)
3692 if (options_.length())
// _lock is released while the subscription manager is called.
3701 Unlock<Mutex> u(_lock);
3702 _subscriptionManager->subscribe(messageHandler_, message,
3703 Message::AckType::None);
3704 if (_badTimeToHASubscribe)
3709 _routes.addRoute(cid, messageHandler_,
3712 if (!options_.empty())
3718 syncAckProcessing(timeout_, message, isHASubscribe_);
// Disconnect: for non-HA subscriptions the route is removed.
3720 catch (
const DisconnectedException&)
3722 if (!isHASubscribe_)
3724 _routes.removeRoute(subId);
// Timeout: the subscription is withdrawn through unsubscribeInternal().
3728 catch (
const TimedOutException&)
3730 AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(subId));
// Further cleanup (enclosing handler not shown): drop the subscription from
// the manager with _lock released, then remove the route.
3738 Unlock<Mutex> unlock(_lock);
3739 _subscriptionManager->unsubscribe(cid);
3741 _routes.removeRoute(subId);
/// Convenience overload that forwards to the full sowAndSubscribe().
/// The argument list of the forwarded call is mostly missing from this
/// listing; what is visible is that oofEnabled_ becomes the option string
/// "oof" (or "" when false).
/// \param messageHandler_ Forwarded as the handler.
/// \param topic_ Topic to query and subscribe to.
/// \param filter_ Optional filter; defaults to "".
/// \param batchSize_ Defaults to AMPS_DEFAULT_BATCH_SIZE.
/// \param oofEnabled_ Selects the "oof" option; defaults to false.
/// \param topN_ Defaults to AMPS_DEFAULT_TOP_N.
/// \param isHASubscribe_ Defaults to true.
/// \return The string returned by the forwarded call.
3747 std::string sowAndSubscribe(
const MessageHandler& messageHandler_,
3748 const std::string& topic_,
3750 const std::string& filter_ =
"",
3751 int batchSize_ = AMPS_DEFAULT_BATCH_SIZE,
3752 bool oofEnabled_ =
false,
3753 int topN_ = AMPS_DEFAULT_TOP_N,
3754 bool isHASubscribe_ =
true)
3757 return sowAndSubscribe(messageHandler_,
3764 (oofEnabled_ ?
"oof" :
""),
/// Queries the SOW for topic_ and places a delta subscription for updates.
/// Visible behaviour: HA handling is only kept when a subscription manager is
/// set; the handler is registered with the subscription manager and under the
/// message's query id; the request goes through syncAckProcessing(); routes
/// and subscriptions are undone on the failure paths below.
/// \param messageHandler_ Handler registered with the subscription manager
/// and with _routes.
/// \param topic_ Topic to query and subscribe to.
/// \param filter_ Optional filter; used only when non-empty.
/// \param orderBy_ Optional ordering; used only when non-empty.
/// \param batchSize_ Defaults to AMPS_DEFAULT_BATCH_SIZE.
/// \param topN_ Applied only when it differs from AMPS_DEFAULT_TOP_N.
/// \param options_ Optional options; used only when non-empty.
/// \param timeout_ Passed to syncAckProcessing(); defaults to
/// AMPS_DEFAULT_COMMAND_TIMEOUT.
/// \param isHASubscribe_ Request HA handling; defaults to true.
// NOTE(review): the declaration of subId and the return statement are missing
// from this listing.
3769 std::string sowAndDeltaSubscribe(
const MessageHandler& messageHandler_,
3770 const std::string& topic_,
3771 const std::string& filter_ =
"",
3772 const std::string& orderBy_ =
"",
3773 int batchSize_ = AMPS_DEFAULT_BATCH_SIZE,
3774 int topN_ = AMPS_DEFAULT_TOP_N,
3775 const std::string& options_ =
"",
3776 long timeout_ = AMPS_DEFAULT_COMMAND_TIMEOUT,
3777 bool isHASubscribe_ =
true)
// Without a subscription manager the call is never treated as HA.
3779 isHASubscribe_ &= (bool)_subscriptionManager;
3780 Lock<Mutex> l(_lock);
3788 if (filter_.length())
3792 if (orderBy_.length())
3797 if (topN_ != AMPS_DEFAULT_TOP_N)
3801 if (options_.length())
// _lock is released while the subscription manager is called.
3809 Unlock<Mutex> u(_lock);
3810 _subscriptionManager->subscribe(messageHandler_, message,
3811 Message::AckType::None);
3812 if (_badTimeToHASubscribe)
// Route keyed by query id: no requested acks, Processed as the system ack.
3817 _routes.addRoute(message.
getQueryID(), messageHandler_,
3818 Message::AckType::None, Message::AckType::Processed, message.
getCommandEnum());
3820 if (!options_.empty())
3826 syncAckProcessing(timeout_, message, isHASubscribe_);
// Disconnect: for non-HA subscriptions the route is removed.
3828 catch (
const DisconnectedException&)
3830 if (!isHASubscribe_)
3832 _routes.removeRoute(subId);
// Timeout: the subscription is withdrawn through unsubscribeInternal().
3836 catch (
const TimedOutException&)
3838 AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(subId));
// Further cleanup (enclosing handler not shown): drop the subscription from
// the manager with _lock released, then remove the route.
3846 Unlock<Mutex> unlock(_lock);
3847 _subscriptionManager->unsubscribe(
Field(subId));
3849 _routes.removeRoute(subId);
/// Convenience overload that forwards to the full sowAndDeltaSubscribe().
/// The code that builds the options string is missing from this listing; only
/// the test on sendEmpties_ and the start of the forwarded call are visible.
/// \param messageHandler_ Forwarded as the handler.
/// \param topic_ Topic to query and subscribe to.
/// \param filter_ Optional filter; defaults to "".
/// \param batchSize_ Defaults to AMPS_DEFAULT_BATCH_SIZE.
/// \param oofEnabled_ Defaults to false; its use is on lines not shown here.
/// \param sendEmpties_ Defaults to false; the false case takes the branch
/// below, whose body is not shown.
/// \param topN_ Defaults to AMPS_DEFAULT_TOP_N.
/// \param isHASubscribe_ Defaults to true.
/// \return The string returned by the forwarded call.
3855 std::string sowAndDeltaSubscribe(
const MessageHandler& messageHandler_,
3856 const std::string& topic_,
3858 const std::string& filter_ =
"",
3859 int batchSize_ = AMPS_DEFAULT_BATCH_SIZE,
3860 bool oofEnabled_ =
false,
3861 bool sendEmpties_ =
false,
3862 int topN_ = AMPS_DEFAULT_TOP_N,
3863 bool isHASubscribe_ =
true)
3871 if (sendEmpties_ ==
false)
3875 return sowAndDeltaSubscribe(messageHandler_,
3887 const std::string& topic_,
3888 const std::string& filter_,
3894 unsigned ackType = Message::AckType::Processed |
3895 Message::AckType::Stats |
3896 Message::AckType::Persisted;
3897 publishStoreMessage.reset();
3898 if (commandId_.
empty())
3909 .assignQueryID(commandId_.
data(), commandId_.
len())
3910 .setAckTypeEnum(ackType)
3912 .assignFilter(filter_.c_str(), filter_.length());
3913 amps_uint64_t haSequenceNumber = _publishStore.
store(publishStoreMessage);
3914 char buf[AMPS_NUMBER_BUFFER_LEN];
3915 size_t pos = convertToCharArray(buf, haSequenceNumber);
3916 publishStoreMessage.
assignSequence(buf + pos, AMPS_NUMBER_BUFFER_LEN - pos);
3920 Lock<Mutex> l(_lock);
3921 _routes.addRoute(commandId_, messageHandler_,
3922 Message::AckType::Stats,
3923 Message::AckType::Processed | Message::AckType::Persisted,
3925 syncAckProcessing(timeout_, publishStoreMessage,
3928 catch (
const DisconnectedException&)
3935 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId_));
3939 return (std::string)commandId_;
3943 Lock<Mutex> l(_lock);
3945 if (commandId_.
empty())
3956 .assignQueryID(commandId_.
data(), commandId_.
len())
3957 .setAckTypeEnum(Message::AckType::Processed |
3958 Message::AckType::Stats)
3960 .assignFilter(filter_.c_str(), filter_.length());
3961 _routes.addRoute(commandId_, messageHandler_,
3962 Message::AckType::Stats,
3963 Message::AckType::Processed,
3967 syncAckProcessing(timeout_, _message);
3971 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId_));
3974 return (std::string)commandId_;
/// Deletes the SOW record that matches the supplied message data.
/// Two paths are visible. The first builds publishStoreMessage, stores it in
/// _publishStore, stamps the sequence number and sends it under _lock with
/// Processed | Stats | Persisted acks. The second takes _lock first and
/// builds _message with Processed | Stats acks. The condition choosing
/// between them is not part of this listing -- presumably whether a publish
/// store is set (TODO confirm).
/// \param messageHandler_ Handler registered in _routes for commandId_.
/// \param topic_ Topic to delete from.
/// \param data_ Message data identifying the record.
/// \return commandId_ converted to std::string.
// NOTE(review): the remaining parameters (commandId_ and timeout_ are used
// below) are on signature lines missing from this listing.
3978 std::string sowDeleteByData(
const MessageHandler& messageHandler_,
3979 const std::string& topic_,
3980 const std::string& data_,
3986 unsigned ackType = Message::AckType::Processed |
3987 Message::AckType::Stats |
3988 Message::AckType::Persisted;
3989 publishStoreMessage.reset();
// An empty commandId_ takes this branch; its body is not shown.
3990 if (commandId_.
empty())
4001 .assignQueryID(commandId_.
data(), commandId_.
len())
4002 .setAckTypeEnum(ackType)
4004 .assignData(data_.c_str(), data_.length());
// Store first, then write the returned sequence number onto the message.
4005 amps_uint64_t haSequenceNumber = _publishStore.
store(publishStoreMessage);
4006 char buf[AMPS_NUMBER_BUFFER_LEN];
4007 size_t pos = convertToCharArray(buf, haSequenceNumber);
4008 publishStoreMessage.
assignSequence(buf + pos, AMPS_NUMBER_BUFFER_LEN - pos);
4012 Lock<Mutex> l(_lock);
// Stats is the requested ack; Processed and Persisted are system-added.
4013 _routes.addRoute(commandId_, messageHandler_,
4014 Message::AckType::Stats,
4015 Message::AckType::Processed | Message::AckType::Persisted,
4017 syncAckProcessing(timeout_, publishStoreMessage,
4020 catch (
const DisconnectedException&)
4027 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId_));
4031 return (std::string)commandId_;
// Second path: the shared _message is used, so _lock is taken up front.
4035 Lock<Mutex> l(_lock);
4037 if (commandId_.
empty())
4048 .assignQueryID(commandId_.
data(), commandId_.
len())
4049 .setAckTypeEnum(Message::AckType::Processed |
4050 Message::AckType::Stats)
4052 .assignData(data_.c_str(), data_.length());
4053 _routes.addRoute(commandId_, messageHandler_,
4054 Message::AckType::Stats,
4055 Message::AckType::Processed,
4059 syncAckProcessing(timeout_, _message);
// Failure cleanup (enclosing handler not shown): remove the route.
4063 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId_));
4066 return (std::string)commandId_;
/// Deletes SOW records identified by the supplied SOW keys.
/// Two paths are visible. The first builds publishStoreMessage, stores it in
/// _publishStore, stamps the sequence number and sends it under _lock with
/// Processed | Stats | Persisted acks. The second takes _lock first and
/// builds _message with Processed | Stats acks. The condition choosing
/// between them is not part of this listing -- presumably whether a publish
/// store is set (TODO confirm).
/// \param messageHandler_ Handler registered in _routes for commandId_.
/// \param topic_ Topic to delete from.
/// \param keys_ SOW keys, passed to assignSowKeys() unchanged.
/// \return commandId_ converted to std::string.
// NOTE(review): the remaining parameters (commandId_ and timeout_ are used
// below) are on signature lines missing from this listing.
4070 std::string sowDeleteByKeys(
const MessageHandler& messageHandler_,
4071 const std::string& topic_,
4072 const std::string& keys_,
4078 unsigned ackType = Message::AckType::Processed |
4079 Message::AckType::Stats |
4080 Message::AckType::Persisted;
4081 publishStoreMessage.reset();
// An empty commandId_ takes this branch; its body is not shown.
4082 if (commandId_.
empty())
4093 .assignQueryID(commandId_.
data(), commandId_.
len())
4094 .setAckTypeEnum(ackType)
4096 .assignSowKeys(keys_.c_str(), keys_.length());
// Store first, then write the returned sequence number onto the message.
4097 amps_uint64_t haSequenceNumber = _publishStore.
store(publishStoreMessage);
4098 char buf[AMPS_NUMBER_BUFFER_LEN];
4099 size_t pos = convertToCharArray(buf, haSequenceNumber);
4100 publishStoreMessage.
assignSequence(buf + pos, AMPS_NUMBER_BUFFER_LEN - pos);
4104 Lock<Mutex> l(_lock);
// Stats is the requested ack; Processed and Persisted are system-added.
4105 _routes.addRoute(commandId_, messageHandler_,
4106 Message::AckType::Stats,
4107 Message::AckType::Processed | Message::AckType::Persisted,
4109 syncAckProcessing(timeout_, publishStoreMessage,
4112 catch (
const DisconnectedException&)
4119 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId_));
4123 return (std::string)commandId_;
// Second path: the shared _message is used, so _lock is taken up front.
4127 Lock<Mutex> l(_lock);
4129 if (commandId_.
empty())
4140 .assignQueryID(commandId_.
data(), commandId_.
len())
4141 .setAckTypeEnum(Message::AckType::Processed |
4142 Message::AckType::Stats)
4144 .assignSowKeys(keys_.c_str(), keys_.length());
4145 _routes.addRoute(commandId_, messageHandler_,
4146 Message::AckType::Stats,
4147 Message::AckType::Processed,
4151 syncAckProcessing(timeout_, _message);
// Failure cleanup (enclosing handler not shown): remove the route.
4155 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId_));
4158 return (std::string)commandId_;
/// Sends the start_timer command to servers that still accept it.
/// \throw CommandException when _serverVersion is "5.3.2.0" or later, where
/// the command is reported as deprecated.
// The lines that build and send the command after taking _lock are missing
// from this listing.
4162 void startTimer(
void)
4164 if (_serverVersion >=
"5.3.2.0")
4166 throw CommandException(
"The start_timer command is deprecated.");
4168 Lock<Mutex> l(_lock);
4177 if (_serverVersion >=
"5.3.2.0")
4179 throw CommandException(
"The stop_timer command is deprecated.");
4181 return executeAsync(
Command(
"stop_timer").addAckType(
"completed"), messageHandler_);
/// Installs a shared exception listener.
/// The shared_ptr is kept in _pExceptionListener, and the raw pointer it
/// holds is cached in _exceptionListener.
/// \param pListener_ Shared pointer to the listener to install.
// NOTE(review): a null pListener_ would leave _exceptionListener null; a
// nearby accessor dereferences it -- confirm callers never pass null.
4196 void setExceptionListener(
const std::shared_ptr<const ExceptionListener>& pListener_)
4198 _pExceptionListener = pListener_;
4199 _exceptionListener = _pExceptionListener.get();
4204 _exceptionListener = &listener_;
4209 return *_exceptionListener;
/// Configures heartbeating and the socket read timeout.
/// \param heartbeatInterval_ Heartbeat interval; _sendHeartbeat() multiplies
/// it by 1000 for its timer, so the unit is presumably seconds (TODO confirm).
/// \param readTimeout_ Socket read timeout; must be at least
/// heartbeatInterval_.
/// \throw UsageException if readTimeout_ is smaller than heartbeatInterval_.
4212 void setHeartbeat(
unsigned heartbeatInterval_,
unsigned readTimeout_)
// Validation happens before _lock is taken.
4214 if (readTimeout_ < heartbeatInterval_)
4216 throw UsageException(
"The socket read timeout must be >= the heartbeat interval.");
4218 Lock<Mutex> l(_lock);
// The members are only written when at least one value differs; any
// follow-up action inside this branch is on lines not shown here.
4219 if (_heartbeatInterval != heartbeatInterval_ ||
4220 _readTimeout != readTimeout_)
4222 _heartbeatInterval = heartbeatInterval_;
4223 _readTimeout = readTimeout_;
/// Starts heartbeating with the server and applies the read timeout.
/// Does the heartbeat part only when connected and the interval is non-zero.
// No lock is taken in the visible lines; presumably the caller holds _lock
// (TODO confirm).
4228 void _sendHeartbeat(
void)
4230 if (_connected && _heartbeatInterval != 0)
// Options text is "start," followed by the interval.
4232 std::ostringstream options;
4233 options <<
"start," << _heartbeatInterval;
// The local heartbeat timer is set to the interval times 1000 and started.
4236 _heartbeatTimer.setTimeout(_heartbeatInterval * 1000.0);
4237 _heartbeatTimer.start();
4240 _sendWithoutRetry(_beatMessage);
4241 broadcastConnectionStateChanged(ConnectionStateListener::HeartbeatInitiated);
// Connection failures go to AMPS_UNHANDLED_EXCEPTION.
4243 catch (ConnectionException& ex_)
4247 AMPS_UNHANDLED_EXCEPTION(ex_);
// The calls that produce `result` are on lines missing from this listing.
4252 if (_readTimeout && _connected)
4257 AMPSException::throwFor(_client, result);
// With no queue ack timeout set, a value of interval * 1000 is passed to a
// call whose name is not shown.
4259 if (!_queueAckTimeout)
4262 (
int)(_heartbeatInterval * 1000));
4265 AMPSException::throwFor(_client, result);
4273 Lock<Mutex> lock(_lock);
4274 _connectionStateListeners.insert(listener_);
4279 Lock<Mutex> lock(_lock);
4280 _connectionStateListeners.erase(listener_);
/// Removes every registered connection state listener while holding _lock.
4283 void clearConnectionStateListeners()
4285 Lock<Mutex> lock(_lock);
4286 _connectionStateListeners.clear();
4291 unsigned systemAddedAcks_, Message::Command::Type commandType_)
4293 Message message = command_.getMessage();
4298 bool added = qid.
len() || subid.
len() || cid_.
len();
4299 bool cidIsQid = cid_ == qid;
4300 bool cidUnique = !cidIsQid && cid_.
len() > 0 && cid_ != subid;
4302 if (subid.
len() > 0)
4306 addedCount += _routes.addRoute(subid, handler_, requestedAcks_,
4307 systemAddedAcks_, commandType_);
4309 && (commandType == Message::Command::Subscribe
4310 || commandType == Message::Command::DeltaSubscribe))
4317 if (qid.
len() > 0 && qid != subid
4318 && (commandType == Message::Command::SOW
4319 || commandType == Message::Command::SOWDelete
4320 || commandType == Message::Command::SOWAndSubscribe
4321 || commandType == Message::Command::SOWAndDeltaSubscribe))
4323 while (_routes.hasRoute(qid))
4332 if (addedCount == 0)
4334 _routes.addRoute(qid, handler_, requestedAcks_,
4335 systemAddedAcks_, commandType_);
4341 Unlock<Mutex> u(_lock);
4342 data = amps_invoke_copy_route_function(handler_.userData());
4346 _routes.addRoute(qid, handler_, requestedAcks_,
4347 systemAddedAcks_, commandType_);
4351 _routes.addRoute(qid,
4355 systemAddedAcks_, commandType_);
4360 if (cidUnique && requestedAcks_ & ~Message::AckType::Persisted)
4362 while (_routes.hasRoute(cid_))
4366 if (addedCount == 0)
4368 _routes.addRoute(cid_, handler_, requestedAcks_,
4369 systemAddedAcks_, commandType_);
4375 Unlock<Mutex> u(_lock);
4376 data = amps_invoke_copy_route_function(handler_.userData());
4380 _routes.addRoute(cid_, handler_, requestedAcks_,
4381 systemAddedAcks_, commandType_);
4385 _routes.addRoute(cid_,
4389 systemAddedAcks_, commandType_);
4393 else if ((commandType == Message::Command::Publish ||
4394 commandType == Message::Command::DeltaPublish)
4395 && requestedAcks_ & ~
Message::AckType::Persisted)
4398 _routes.addRoute(cid_, handler_, requestedAcks_,
4399 systemAddedAcks_, commandType_);
4404 throw UsageException(
"To use a messagehandler, you must also supply a command or subscription ID.");
4409 bool isHASubscribe_ =
true)
4411 isHASubscribe_ &= (bool)_subscriptionManager;
4412 Message& message = command_.getMessage();
4413 unsigned systemAddedAcks = (handler_.isValid() || command_.hasProcessedAck()) ?
4414 Message::AckType::Processed : Message::AckType::None;
4416 bool isPublishStore = _publishStore.
isValid() && command_.needsSequenceNumber();
4418 if (commandType == Message::Command::StopTimer)
4420 systemAddedAcks |= Message::AckType::Completed;
4423 if (handler_.isValid() && cid.
empty())
4429 if (command_.isSubscribe())
4432 if (_bookmarkStore.isValid())
4434 systemAddedAcks |= Message::AckType::Persisted;
4442 _bookmarkStore.
log(message);
4443 if (!BookmarkRange::isRange(bookmark))
4445 _bookmarkStore.
discard(message);
4459 systemAddedAcks |= Message::AckType::Persisted;
4461 bool isSubscribe = command_.isSubscribe();
4462 if (handler_.isValid() && !isSubscribe)
4464 _registerHandler(command_, cid, handler_,
4465 requestedAcks, systemAddedAcks, commandType);
4469 bool useSyncSend = cid.
len() > 0 && command_.hasProcessedAck();
4470 amps_uint64_t haSequenceNumber = (amps_uint64_t)0;
4473 Unlock<Mutex> u(_lock);
4474 haSequenceNumber = _publishStore.
store(message);
4481 syncAckProcessing((
long)command_.getTimeout(), message,
4486 _send(message, haSequenceNumber);
4489 catch (
const DisconnectedException&)
4495 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(cid));
4506 Unlock<Mutex> u(_lock);
4507 _subscriptionManager->subscribe(handler_,
4510 if (_badTimeToHASubscribe)
4513 return std::string(subId.
data(), subId.
len());
4516 if (handler_.isValid())
4518 _registerHandler(command_, cid, handler_,
4519 requestedAcks, systemAddedAcks, commandType);
4524 syncAckProcessing((
long)command_.getTimeout(), message,
4527 catch (
const DisconnectedException&)
4529 if (!isHASubscribe_)
4531 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(cid));
4532 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(subId));
4533 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(message.
getQueryId()));
4538 catch (
const TimedOutException&)
4540 AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(cid));
4541 AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(subId));
4542 AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(message.
getQueryId()));
4551 Unlock<Mutex> unlock(_lock);
4552 _subscriptionManager->unsubscribe(subId);
4558 _routes.removeRoute(cid);
4559 _routes.removeRoute(subId);
4563 if (subId.
len() > 0)
4566 return std::string(subId.
data(), subId.
len());
4572 bool useSyncSend = commandType & ~
Message::Command::NoDataCommands
4573 || (cid.
len() > 0 && command_.hasProcessedAck());
4579 syncAckProcessing((
long)(command_.getTimeout()), message);
4586 catch (
const TimedOutException&)
4588 AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(cid));
4589 AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(message.
getQueryId()));
4593 catch (
const DisconnectedException&)
4595 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(cid));
4596 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(message.
getQueryId()));
4602 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(cid));
4603 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(message.
getQueryId()));
4616 bool isHASubscribe_ =
true)
4618 Lock<Mutex> lock(_lock);
4619 return executeAsyncNoLock(command_, handler_, isHASubscribe_);
/// Stores the auto-ack flag.
/// \param isAutoAckEnabled_ New value for _isAutoAckEnabled.
4623 void setAutoAck(
bool isAutoAckEnabled_)
4625 _isAutoAckEnabled = isAutoAckEnabled_;
/// Returns the current auto-ack flag (_isAutoAckEnabled).
4627 bool getAutoAck(
void)
const 4629 return _isAutoAckEnabled;
/// Sets how many queue acks are batched before sending.
/// If no ack timeout is configured (value 0), one is set to
/// AMPS_DEFAULT_QUEUE_ACK_TIMEOUT.
/// \param batchSize_ New value for _ackBatchSize.
4631 void setAckBatchSize(
const unsigned batchSize_)
4633 _ackBatchSize = batchSize_;
4634 if (!_queueAckTimeout)
4636 _queueAckTimeout = AMPS_DEFAULT_QUEUE_ACK_TIMEOUT;
/// Returns the current ack batch size (_ackBatchSize).
4640 unsigned getAckBatchSize(
void)
const 4642 return _ackBatchSize;
/// Returns the current queue ack timeout (_queueAckTimeout).
4644 int getAckTimeout(
void)
const 4646 return _queueAckTimeout;
/// Stores the queue ack timeout.
/// \param ackTimeout_ New value for _queueAckTimeout.
// NOTE(review): the embedded numbering skips lines before the assignment, so
// validation may exist in the full source but is not visible here.
4648 void setAckTimeout(
const int ackTimeout_)
4651 _queueAckTimeout = ackTimeout_;
/// Sends the batched acks held in queueBookmarks_ and empties the batch.
/// Does the visible work only when the batch has at least one bookmark.
/// \param queueBookmarks_ Per-topic batch; its _data is erased and its
/// _bookmarkCount is reset to 0.
/// \return A size_t whose return statements are missing from this listing;
/// flushAcks() sums it as a send count.
// No lock is taken here; the visible callers hold _lock.
4653 size_t _ack(QueueBookmarks& queueBookmarks_)
4655 if (queueBookmarks_._bookmarkCount)
4657 publishStoreMessage.reset();
// Stays 0 unless the store call below runs; its guard is not shown.
4662 amps_uint64_t haSequenceNumber = 0;
4665 haSequenceNumber = _publishStore.
store(publishStoreMessage);
// The batch is cleared before the send.
4668 queueBookmarks_._data.erase();
4669 queueBookmarks_._bookmarkCount = 0;
4671 _send(publishStoreMessage, haSequenceNumber);
// A second clearing sequence; its enclosing construct is not shown.
4674 queueBookmarks_._data.erase();
4675 queueBookmarks_._bookmarkCount = 0;
/// Public entry point for acknowledging a queue message.
/// \param topic_ Topic of the message being acknowledged.
/// \param bookmark_ Bookmark of the message being acknowledged.
/// \param options_ Optional ack options; defaults to NULL.
4681 void ack(
const Field& topic_,
const Field& bookmark_,
const char* options_ = NULL)
// Body of this branch is not shown; presumably it returns early because
// auto-ack handles the message (TODO confirm).
4683 if (_isAutoAckEnabled)
4687 _ack(topic_, bookmark_, options_);
/// Acknowledges one queue message, immediately or through a per-topic batch.
/// With a batch size below 2, or when options_ is given, the ack is built in
/// publishStoreMessage and sent at once. Otherwise the bookmark is appended
/// to the topic's QueueBookmarks entry, which is sent once it holds
/// _ackBatchSize bookmarks.
/// \param topic_ Topic of the message; its bytes are hashed with
/// CRC<0>::crcNoSSE to key the batch map.
/// \param bookmark_ Bookmark to acknowledge; zero length takes the first
/// branch, whose body is not shown.
/// \param options_ Optional ack options; non-NULL forces the immediate path.
4689 void _ack(
const Field& topic_,
const Field& bookmark_,
const char* options_ = NULL)
4691 if (bookmark_.
len() == 0)
4695 Lock<Mutex> lock(_lock);
// Immediate path.
4696 if (_ackBatchSize < 2 || options_ != NULL)
4698 publishStoreMessage.reset();
4706 amps_uint64_t haSequenceNumber = 0;
4709 haSequenceNumber = _publishStore.
store(publishStoreMessage);
4713 _send(publishStoreMessage, haSequenceNumber);
// Batched path: find or create the batch for this topic hash.
4717 topic_hash hash = CRC<0>::crcNoSSE(topic_.
data(), topic_.
len());
4718 TopicHashMap::iterator it = _topicHashMap.find(hash);
4719 if (it == _topicHashMap.end())
4722 #ifdef AMPS_USE_EMPLACE 4723 it = _topicHashMap.emplace(TopicHashMap::value_type(hash, QueueBookmarks(topic_))).first;
4725 it = _topicHashMap.insert(TopicHashMap::value_type(hash, QueueBookmarks(topic_))).first;
4728 QueueBookmarks& queueBookmarks = it->second;
// Bookmarks are comma separated: a non-empty batch gets a "," first. The
// _oldestTime stamp is presumably the other branch of this test, i.e. set
// when the batch was empty (the else line is not shown -- TODO confirm).
4729 if (queueBookmarks._data.length())
4731 queueBookmarks._data.append(
",");
4735 queueBookmarks._oldestTime = amps_now();
4737 queueBookmarks._data.append(bookmark_);
// Send the batch once it reaches the configured size.
4738 if (++queueBookmarks._bookmarkCount >= _ackBatchSize)
4740 _ack(queueBookmarks);
/// Sends every pending ack batch, then flushes the publish path.
/// Walks _topicHashMap under _lock, calls _ack() on each batch and sums the
/// results. If the sum is non-zero and the client is connected,
/// publishFlush(0, Processed) is called.
4743 void flushAcks(
void)
4745 size_t sendCount = 0;
4752 Lock<Mutex> lock(_lock);
4753 typedef TopicHashMap::iterator iterator;
4754 for (iterator it = _topicHashMap.begin(), end = _topicHashMap.end(); it != end; ++it)
4756 QueueBookmarks& queueBookmarks = it->second;
4757 sendCount += _ack(queueBookmarks);
// publishFlush() takes _lock itself, so the Lock above is presumably scoped
// to end before this point (its braces are not in this listing -- TODO
// confirm).
4760 if (sendCount && _connected)
4762 publishFlush(0, Message::AckType::Processed);
/// Sends ack batches whose oldest bookmark has waited past the timeout.
/// The threshold is amps_now() minus _queueAckTimeout; a batch is sent when
/// it is non-empty and its _oldestTime is earlier than the threshold.
/// std::exception errors are handed to AMPS_UNHANDLED_EXCEPTION.
4766 void checkQueueAcks(
void)
// NOTE(review): this emptiness test reads _topicHashMap before _lock is
// taken. The body of the branch is not shown; presumably an early return.
4768 if (!_topicHashMap.size())
4772 Lock<Mutex> lock(_lock);
4775 amps_uint64_t threshold = amps_now()
4776 - (amps_uint64_t)_queueAckTimeout;
4777 typedef TopicHashMap::iterator iterator;
4778 for (iterator it = _topicHashMap.begin(), end = _topicHashMap.end(); it != end; ++it)
4780 QueueBookmarks& queueBookmarks = it->second;
4781 if (queueBookmarks._bookmarkCount && queueBookmarks._oldestTime < threshold)
4783 _ack(queueBookmarks);
4787 catch (std::exception& ex)
4789 AMPS_UNHANDLED_EXCEPTION(ex);
/// Queues a function to be run later by processDeferredExecutions().
/// The request is appended to _deferredExecutionList under
/// _deferredExecutionLock, using emplace_back when AMPS_USE_EMPLACE is
/// defined and push_back otherwise.
/// \param func_ Function to call later.
/// \param userData_ Opaque pointer passed to func_ when it runs.
4793 void deferredExecution(DeferredExecutionFunc func_,
void* userData_)
4795 Lock<Mutex> lock(_deferredExecutionLock);
4796 #ifdef AMPS_USE_EMPLACE 4797 _deferredExecutionList.emplace_back(
4798 DeferredExecutionRequest(func_, userData_));
4800 _deferredExecutionList.push_back(
4801 DeferredExecutionRequest(func_, userData_));
/// Runs every queued deferred execution, then clears the queue.
/// Each request's _func is called with its _userData while
/// _deferredExecutionLock is held; afterwards the list is cleared and both
/// route caches are invalidated.
// NOTE(review): the size() test reads the list before the lock is taken.
// Since the lock is held while the functions run, a queued function that
// calls deferredExecution() would re-enter the same lock -- confirm whether
// Mutex is recursive.
4805 inline void processDeferredExecutions(
void)
4807 if (_deferredExecutionList.size())
4809 Lock<Mutex> lock(_deferredExecutionLock);
4810 DeferredExecutionList::iterator it = _deferredExecutionList.begin();
4811 DeferredExecutionList::iterator end = _deferredExecutionList.end();
4812 for (; it != end; ++it)
4816 it->_func(it->_userData);
4824 _deferredExecutionList.clear();
4825 _routes.invalidateCache();
4826 _routeCache.invalidateCache();
/// Returns the current retry-on-disconnect flag (_isRetryOnDisconnect).
4830 bool getRetryOnDisconnect(
void)
const 4832 return _isRetryOnDisconnect;
/// Stores the retry-on-disconnect flag.
/// \param isRetryOnDisconnect_ New value for _isRetryOnDisconnect.
4835 void setRetryOnDisconnect(
bool isRetryOnDisconnect_)
4837 _isRetryOnDisconnect = isRetryOnDisconnect_;
/// Stores the default maximum depth.
/// \param maxDepth_ New value for _defaultMaxDepth.
4840 void setDefaultMaxDepth(
unsigned maxDepth_)
4842 _defaultMaxDepth = maxDepth_;
/// Returns the current default maximum depth (_defaultMaxDepth).
4845 unsigned getDefaultMaxDepth(
void)
const 4847 return _defaultMaxDepth;
4939 RefHandle<MessageStreamImpl> _body;
4949 inline void advance(
void);
4956 : _pStream(pStream_)
/// Two iterators are equal when they hold the same _pStream pointer.
/// \param rhs Iterator to compare with.
4961 bool operator==(
const iterator& rhs)
const 4963 return _pStream == rhs._pStream;
/// Two iterators differ when their _pStream pointers differ.
/// \param rhs Iterator to compare with.
4965 bool operator!=(
const iterator& rhs)
const 4967 return _pStream != rhs._pStream;
4969 void operator++(
void)
4985 return _body.isValid();
4992 if (!_body.isValid())
4994 throw UsageException(
"This MessageStream is not valid and cannot be iterated.");
5026 unsigned getMaxDepth(
void)
const;
5029 unsigned getDepth(
void)
const;
5033 inline void setSOWOnly(
const std::string& commandId_,
5034 const std::string& queryId_ =
"");
5035 inline void setSubscription(
const std::string& subId_,
5036 const std::string& commandId_ =
"",
5037 const std::string& queryId_ =
"");
5038 inline void setStatsOnly(
const std::string& commandId_,
5039 const std::string& queryId_ =
"");
5040 inline void setAcksOnly(
const std::string& commandId_,
unsigned acks_);
5046 friend class Client;
5072 BorrowRefHandle<ClientImpl> _body;
5074 static const int DEFAULT_COMMAND_TIMEOUT = AMPS_DEFAULT_COMMAND_TIMEOUT;
5075 static const int DEFAULT_BATCH_SIZE = AMPS_DEFAULT_BATCH_SIZE;
5076 static const int DEFAULT_TOP_N = AMPS_DEFAULT_TOP_N;
5087 : _body(new ClientImpl(clientName), true)
/// Wraps an existing ClientImpl, passing true as the second argument of
/// _body's constructor.
/// \param existingClient Implementation object to wrap.
// The two-argument overload names that second argument isRef; the handle's
// exact ownership semantics are defined by BorrowRefHandle, not shown here.
5090 Client(ClientImpl* existingClient)
5091 : _body(existingClient,
true)
/// Wraps an existing ClientImpl with an explicit isRef flag.
/// \param existingClient Implementation object to wrap.
/// \param isRef Forwarded unchanged as the second argument of _body's
/// constructor; its meaning is defined by BorrowRefHandle, not shown here.
5094 Client(ClientImpl* existingClient,
bool isRef)
5095 : _body(existingClient, isRef)
/// Copy constructor: copies rhs's _body handle.
/// \param rhs Client to copy from.
5098 Client(
const Client& rhs) : _body(rhs._body) {;}
// Virtual destructor with an empty body; cleanup is left to the members'
// own destructors.
5099 virtual ~Client(
void) {;}
5101 Client& operator=(
const Client& rhs)
5109 return _body.isValid();
5126 _body.get().setName(name);
5133 return _body.get().getName();
5141 return _body.get().getNameHash();
5149 return _body.get().getNameHashValue();
5160 _body.get().setLogonCorrelationData(logonCorrelationData_);
5167 return _body.get().getLogonCorrelationData();
5180 return _body.get().getServerVersion();
5191 return _body.get().getServerVersionInfo();
5205 return AMPS::convertVersionToNumber(version_.c_str(), version_.length());
5220 return AMPS::convertVersionToNumber(data_, len_);
5227 return _body.get().getURI();
5251 _body.get().connect(uri);
5258 _body.get().disconnect();
5276 _body.get().send(message);
5289 unsigned requestedAcks_,
bool isSubscribe_)
5291 Message::Command::Type commandType = isSubscribe_ ? Message::Command::Subscribe : Message::Command::SOW;
5292 _body.get().addMessageHandler(commandId_, messageHandler_,
5293 requestedAcks_, commandType);
5306 unsigned requestedAcks_, Message::Command::Type commandType_)
5308 _body.get().addMessageHandler(commandId_, messageHandler_,
5309 requestedAcks_, commandType_);
5317 return _body.get().removeMessageHandler(commandId_);
5345 return _body.get().send(messageHandler, message, timeout);
5359 _body.get().setDisconnectHandler(disconnectHandler);
5367 return _body.get().getDisconnectHandler();
5376 return _body.get().getConnectionInfo();
5389 _body.get().setBookmarkStore(bookmarkStore_);
5397 return _body.
get().getBookmarkStore();
5405 return _body.get().getSubscriptionManager();
5417 _body.get().setSubscriptionManager(subscriptionManager_);
5441 _body.get().setPublishStore(publishStore_);
5449 return _body.
get().getPublishStore();
5457 _body.get().setGlobalCommandTypeMessageHandler(ClientImpl::GlobalCommandTypeHandlers::DuplicateMessage,
5458 duplicateMessageHandler_);
5472 return _body.get().getDuplicateMessageHandler();
5486 _body.get().setFailedWriteHandler(handler_);
5494 return _body.get().getFailedWriteHandler();
// Publishes data_ to topic_.
// Both strings are handed to the implementation as (pointer, length) pairs,
// and the implementation's amps_uint64_t result is returned unchanged.
// NOTE(review): the return value is presumably the publish sequence number --
// confirm against ClientImpl::publish.
5515 amps_uint64_t
publish(
const std::string& topic_,
const std::string& data_)
5517 return _body.get().publish(topic_.c_str(), topic_.length(),
5518 data_.c_str(), data_.length());
// Publishes a raw buffer to a topic given as a raw buffer.
// The four arguments are forwarded unchanged to the implementation's publish
// and its amps_uint64_t result is returned. Lengths are explicit, so neither
// buffer is required by this wrapper to be NUL-terminated.
5540 amps_uint64_t
publish(
const char* topic_,
size_t topicLength_,
5541 const char* data_,
size_t dataLength_)
5543 return _body.get().publish(topic_, topicLength_, data_, dataLength_);
// Publishes data_ to topic_ with an expiration value.
// The strings are passed as (pointer, length) pairs and expiration_ is
// forwarded unchanged; the implementation's result is returned.
// NOTE(review): the unit of expiration_ is not visible here -- confirm.
5564 amps_uint64_t
publish(
const std::string& topic_,
const std::string& data_,
5565 unsigned long expiration_)
5567 return _body.get().publish(topic_.c_str(), topic_.length(),
5568 data_.c_str(), data_.length(), expiration_);
// Raw-buffer publish with an expiration value.
// All five arguments are forwarded unchanged to the implementation's publish
// and its amps_uint64_t result is returned.
// NOTE(review): the unit of expiration_ is not visible here -- confirm.
5591 amps_uint64_t
publish(
const char* topic_,
size_t topicLength_,
5592 const char* data_,
size_t dataLength_,
5593 unsigned long expiration_)
5595 return _body.get().publish(topic_, topicLength_,
5596 data_, dataLength_, expiration_);
// Forwards a publish-flush request to the implementation.
// timeout_ defaults to 0 and ackType_ defaults to Message::AckType::Processed;
// both are passed through unchanged.
// NOTE(review): a timeout_ of 0 presumably means "wait without limit" -- confirm.
5637 void publishFlush(
long timeout_ = 0,
unsigned ackType_ = Message::AckType::Processed)
5639 _body.get().publishFlush(timeout_, ackType_);
// Delta-publishes data_ to topic_.
// Both strings are handed to the implementation's deltaPublish as
// (pointer, length) pairs and its amps_uint64_t result is returned unchanged.
5658 amps_uint64_t
deltaPublish(
const std::string& topic_,
const std::string& data_)
5660 return _body.get().deltaPublish(topic_.c_str(), topic_.length(),
5661 data_.c_str(), data_.length());
5682 const char* data_,
size_t dataLength_)
5684 return _body.get().deltaPublish(topic_, topicLength_,
5685 data_, dataLength_);
5704 amps_uint64_t
deltaPublish(
const std::string& topic_,
const std::string& data_,
5705 unsigned long expiration_)
5707 return _body.get().deltaPublish(topic_.c_str(), topic_.length(),
5708 data_.c_str(), data_.length(),
5731 const char* data_,
size_t dataLength_,
5732 unsigned long expiration_)
5734 return _body.get().deltaPublish(topic_, topicLength_,
5735 data_, dataLength_, expiration_);
5755 const char* options_ = NULL)
5757 return _body.get().logon(timeout_, authenticator_, options_);
5772 std::string
logon(
const char* options_,
int timeout_ = 0)
5791 std::string
logon(
const std::string& options_,
int timeout_ = 0)
5817 const std::string& topic_,
5819 const std::string& filter_ =
"",
5820 const std::string& options_ =
"",
5821 const std::string& subId_ =
"")
5823 return _body.get().subscribe(messageHandler_, topic_, timeout_,
5824 filter_,
"", options_, subId_);
5843 long timeout_ = 0,
const std::string& filter_ =
"",
5844 const std::string& options_ =
"",
5845 const std::string& subId_ =
"")
5848 if (_body.get().getDefaultMaxDepth())
5850 result.
maxDepth(_body.get().getDefaultMaxDepth());
5852 result.setSubscription(_body.get().subscribe(
5854 topic_, timeout_, filter_,
"",
5855 options_, subId_,
false));
5875 long timeout_ = 0,
const std::string& filter_ =
"",
5876 const std::string& options_ =
"",
5877 const std::string& subId_ =
"")
5880 if (_body.get().getDefaultMaxDepth())
5882 result.
maxDepth(_body.get().getDefaultMaxDepth());
5884 result.setSubscription(_body.get().subscribe(
5886 topic_, timeout_, filter_,
"",
5887 options_, subId_,
false));
5904 const std::string& topic_,
5906 const std::string& filter_ =
"",
5907 const std::string& options_ =
"",
5908 const std::string& subId_ =
"")
5910 return _body.get().deltaSubscribe(messageHandler_, topic_, timeout_,
5911 filter_,
"", options_, subId_);
5922 long timeout_,
const std::string& filter_ =
"",
5923 const std::string& options_ =
"",
5924 const std::string& subId_ =
"")
5927 if (_body.get().getDefaultMaxDepth())
5929 result.
maxDepth(_body.get().getDefaultMaxDepth());
5931 result.setSubscription(_body.get().deltaSubscribe(
5933 topic_, timeout_, filter_,
"",
5934 options_, subId_,
false));
5940 long timeout_,
const std::string& filter_ =
"",
5941 const std::string& options_ =
"",
5942 const std::string& subId_ =
"")
5945 if (_body.get().getDefaultMaxDepth())
5947 result.
maxDepth(_body.get().getDefaultMaxDepth());
5949 result.setSubscription(_body.get().deltaSubscribe(
5951 topic_, timeout_, filter_,
"",
5952 options_, subId_,
false));
5982 const std::string& topic_,
5984 const std::string& bookmark_,
5985 const std::string& filter_ =
"",
5986 const std::string& options_ =
"",
5987 const std::string& subId_ =
"")
5989 return _body.get().subscribe(messageHandler_, topic_, timeout_,
5990 filter_, bookmark_, options_, subId_);
6011 const std::string& bookmark_,
6012 const std::string& filter_ =
"",
6013 const std::string& options_ =
"",
6014 const std::string& subId_ =
"")
6017 if (_body.get().getDefaultMaxDepth())
6019 result.
maxDepth(_body.get().getDefaultMaxDepth());
6021 result.setSubscription(_body.get().subscribe(
6023 topic_, timeout_, filter_,
6024 bookmark_, options_,
6032 const std::string& bookmark_,
6033 const std::string& filter_ =
"",
6034 const std::string& options_ =
"",
6035 const std::string& subId_ =
"")
6038 if (_body.get().getDefaultMaxDepth())
6040 result.
maxDepth(_body.get().getDefaultMaxDepth());
6042 result.setSubscription(_body.get().subscribe(
6044 topic_, timeout_, filter_,
6045 bookmark_, options_,
6060 return _body.get().unsubscribe(commandId);
6072 return _body.get().unsubscribe();
6106 const std::string& topic_,
6107 const std::string& filter_ =
"",
6108 const std::string& orderBy_ =
"",
6109 const std::string& bookmark_ =
"",
6110 int batchSize_ = DEFAULT_BATCH_SIZE,
6111 int topN_ = DEFAULT_TOP_N,
6112 const std::string& options_ =
"",
6113 long timeout_ = DEFAULT_COMMAND_TIMEOUT)
6115 return _body.get().sow(messageHandler_, topic_, filter_, orderBy_,
6116 bookmark_, batchSize_, topN_, options_,
6144 const std::string& filter_ =
"",
6145 const std::string& orderBy_ =
"",
6146 const std::string& bookmark_ =
"",
6147 int batchSize_ = DEFAULT_BATCH_SIZE,
6148 int topN_ = DEFAULT_TOP_N,
6149 const std::string& options_ =
"",
6150 long timeout_ = DEFAULT_COMMAND_TIMEOUT)
6153 if (_body.get().getDefaultMaxDepth())
6155 result.
maxDepth(_body.get().getDefaultMaxDepth());
6157 result.setSOWOnly(_body.get().sow(result.operator
MessageHandler(),
6158 topic_, filter_, orderBy_, bookmark_,
6159 batchSize_, topN_, options_, timeout_));
6165 const std::string& filter_ =
"",
6166 const std::string& orderBy_ =
"",
6167 const std::string& bookmark_ =
"",
6168 int batchSize_ = DEFAULT_BATCH_SIZE,
6169 int topN_ = DEFAULT_TOP_N,
6170 const std::string& options_ =
"",
6171 long timeout_ = DEFAULT_COMMAND_TIMEOUT)
6174 if (_body.get().getDefaultMaxDepth())
6176 result.
maxDepth(_body.get().getDefaultMaxDepth());
6178 result.setSOWOnly(_body.get().sow(result.operator
MessageHandler(),
6179 topic_, filter_, orderBy_, bookmark_,
6180 batchSize_, topN_, options_, timeout_));
6206 const std::string& topic_,
6208 const std::string& filter_ =
"",
6209 int batchSize_ = DEFAULT_BATCH_SIZE,
6210 int topN_ = DEFAULT_TOP_N)
6212 return _body.get().sow(messageHandler_, topic_, timeout_, filter_,
6238 const std::string& topic_,
6240 const std::string& filter_ =
"",
6241 int batchSize_ = DEFAULT_BATCH_SIZE,
6242 bool oofEnabled_ =
false,
6243 int topN_ = DEFAULT_TOP_N)
6245 return _body.get().sowAndSubscribe(messageHandler_, topic_, timeout_,
6246 filter_, batchSize_, oofEnabled_,
6271 const std::string& filter_ =
"",
6272 int batchSize_ = DEFAULT_BATCH_SIZE,
6273 bool oofEnabled_ =
false,
6274 int topN_ = DEFAULT_TOP_N)
6277 if (_body.get().getDefaultMaxDepth())
6279 result.
maxDepth(_body.get().getDefaultMaxDepth());
6281 result.setSubscription(_body.get().sowAndSubscribe(
6283 topic_, timeout_, filter_,
6284 batchSize_, oofEnabled_,
6309 const std::string& filter_ =
"",
6310 int batchSize_ = DEFAULT_BATCH_SIZE,
6311 bool oofEnabled_ =
false,
6312 int topN_ = DEFAULT_TOP_N)
6315 if (_body.get().getDefaultMaxDepth())
6317 result.
maxDepth(_body.get().getDefaultMaxDepth());
6319 result.setSubscription(_body.get().sowAndSubscribe(
6321 topic_, timeout_, filter_,
6322 batchSize_, oofEnabled_,
6356 const std::string& topic_,
6357 const std::string& filter_ =
"",
6358 const std::string& orderBy_ =
"",
6359 const std::string& bookmark_ =
"",
6360 int batchSize_ = DEFAULT_BATCH_SIZE,
6361 int topN_ = DEFAULT_TOP_N,
6362 const std::string& options_ =
"",
6363 long timeout_ = DEFAULT_COMMAND_TIMEOUT)
6365 return _body.get().sowAndSubscribe(messageHandler_, topic_, filter_,
6366 orderBy_, bookmark_, batchSize_,
6367 topN_, options_, timeout_);
6395 const std::string& filter_ =
"",
6396 const std::string& orderBy_ =
"",
6397 const std::string& bookmark_ =
"",
6398 int batchSize_ = DEFAULT_BATCH_SIZE,
6399 int topN_ = DEFAULT_TOP_N,
6400 const std::string& options_ =
"",
6401 long timeout_ = DEFAULT_COMMAND_TIMEOUT)
6404 if (_body.get().getDefaultMaxDepth())
6406 result.
maxDepth(_body.get().getDefaultMaxDepth());
6408 result.setSubscription(_body.get().sowAndSubscribe(
6410 topic_, filter_, orderBy_,
6411 bookmark_, batchSize_, topN_,
6412 options_, timeout_,
false));
6418 const std::string& filter_ =
"",
6419 const std::string& orderBy_ =
"",
6420 const std::string& bookmark_ =
"",
6421 int batchSize_ = DEFAULT_BATCH_SIZE,
6422 int topN_ = DEFAULT_TOP_N,
6423 const std::string& options_ =
"",
6424 long timeout_ = DEFAULT_COMMAND_TIMEOUT)
6427 if (_body.get().getDefaultMaxDepth())
6429 result.
maxDepth(_body.get().getDefaultMaxDepth());
6431 result.setSubscription(_body.get().sowAndSubscribe(
6433 topic_, filter_, orderBy_,
6434 bookmark_, batchSize_, topN_,
6435 options_, timeout_,
false));
6464 const std::string& topic_,
6465 const std::string& filter_ =
"",
6466 const std::string& orderBy_ =
"",
6467 int batchSize_ = DEFAULT_BATCH_SIZE,
6468 int topN_ = DEFAULT_TOP_N,
6469 const std::string& options_ =
"",
6470 long timeout_ = DEFAULT_COMMAND_TIMEOUT)
6472 return _body.get().sowAndDeltaSubscribe(messageHandler_, topic_,
6473 filter_, orderBy_, batchSize_,
6474 topN_, options_, timeout_);
6497 const std::string& filter_ =
"",
6498 const std::string& orderBy_ =
"",
6499 int batchSize_ = DEFAULT_BATCH_SIZE,
6500 int topN_ = DEFAULT_TOP_N,
6501 const std::string& options_ =
"",
6502 long timeout_ = DEFAULT_COMMAND_TIMEOUT)
6505 if (_body.get().getDefaultMaxDepth())
6507 result.
maxDepth(_body.get().getDefaultMaxDepth());
6509 result.setSubscription(_body.get().sowAndDeltaSubscribe(
6511 topic_, filter_, orderBy_,
6512 batchSize_, topN_, options_,
6519 const std::string& filter_ =
"",
6520 const std::string& orderBy_ =
"",
6521 int batchSize_ = DEFAULT_BATCH_SIZE,
6522 int topN_ = DEFAULT_TOP_N,
6523 const std::string& options_ =
"",
6524 long timeout_ = DEFAULT_COMMAND_TIMEOUT)
6527 if (_body.get().getDefaultMaxDepth())
6529 result.
maxDepth(_body.get().getDefaultMaxDepth());
6531 result.setSubscription(_body.get().sowAndDeltaSubscribe(
6533 topic_, filter_, orderBy_,
6534 batchSize_, topN_, options_,
6564 const std::string& topic_,
6566 const std::string& filter_ =
"",
6567 int batchSize_ = DEFAULT_BATCH_SIZE,
6568 bool oofEnabled_ =
false,
6569 bool sendEmpties_ =
false,
6570 int topN_ = DEFAULT_TOP_N)
6572 return _body.get().sowAndDeltaSubscribe(messageHandler_, topic_,
6573 timeout_, filter_, batchSize_,
6574 oofEnabled_, sendEmpties_,
6601 const std::string& filter_ =
"",
6602 int batchSize_ = DEFAULT_BATCH_SIZE,
6603 bool oofEnabled_ =
false,
6604 bool sendEmpties_ =
false,
6605 int topN_ = DEFAULT_TOP_N)
6608 if (_body.get().getDefaultMaxDepth())
6610 result.
maxDepth(_body.get().getDefaultMaxDepth());
6612 result.setSubscription(_body.get().sowAndDeltaSubscribe(
6614 topic_, timeout_, filter_,
6615 batchSize_, oofEnabled_,
6616 sendEmpties_, topN_,
false));
6642 const std::string& filter_ =
"",
6643 int batchSize_ = DEFAULT_BATCH_SIZE,
6644 bool oofEnabled_ =
false,
6645 bool sendEmpties_ =
false,
6646 int topN_ = DEFAULT_TOP_N)
6649 if (_body.get().getDefaultMaxDepth())
6651 result.
maxDepth(_body.get().getDefaultMaxDepth());
6653 result.setSubscription(_body.get().sowAndDeltaSubscribe(
6655 topic_, timeout_, filter_,
6656 batchSize_, oofEnabled_,
6657 sendEmpties_, topN_,
false));
6680 const std::string& topic,
6681 const std::string& filter,
6684 return _body.get().sowDelete(messageHandler, topic, filter, timeout);
6712 stream.setStatsOnly(cid);
6713 _body.get().sowDelete(stream.operator
MessageHandler(), topic, filter, timeout, cid);
6714 return *(stream.
begin());
6716 catch (
const DisconnectedException&)
6718 removeMessageHandler(cid);
6729 _body.get().startTimer();
6740 return _body.get().stopTimer(messageHandler);
6765 const std::string& topic_,
6766 const std::string& keys_,
6769 return _body.get().sowDeleteByKeys(messageHandler_, topic_, keys_, timeout_);
6801 stream.setStatsOnly(cid);
6802 _body.get().sowDeleteByKeys(stream.operator
MessageHandler(), topic_, keys_, timeout_, cid);
6803 return *(stream.
begin());
6805 catch (
const DisconnectedException&)
6807 removeMessageHandler(cid);
6827 const std::string& topic_,
const std::string& data_,
6830 return _body.get().sowDeleteByData(messageHandler_, topic_, data_, timeout_);
6857 stream.setStatsOnly(cid);
6858 _body.get().sowDeleteByData(stream.operator
MessageHandler(), topic_, data_, timeout_, cid);
6859 return *(stream.
begin());
6861 catch (
const DisconnectedException&)
6863 removeMessageHandler(cid);
6873 return _body.get().getHandle();
6886 _body.get().setExceptionListener(pListener_);
6899 _body.get().setExceptionListener(listener_);
6906 return _body.get().getExceptionListener();
6932 _body.get().setHeartbeat(heartbeatTime_, readTimeout_);
6956 _body.get().setHeartbeat(heartbeatTime_, 2 * heartbeatTime_);
6962 setLastChanceMessageHandler(messageHandler);
6969 _body.get().setGlobalCommandTypeMessageHandler(ClientImpl::GlobalCommandTypeHandlers::LastChance,
6995 _body.get().setGlobalCommandTypeMessageHandler(command_, handler_);
7020 _body.get().setGlobalCommandTypeMessageHandler(command_, handler_);
7103 _body.get().addConnectionStateListener(listener);
7111 _body.get().removeConnectionStateListener(listener);
7118 _body.get().clearConnectionStateListeners();
7148 return _body.get().executeAsync(command_, handler_);
7186 if (command_.isSubscribe())
7188 Message& message = command_.getMessage();
7191 if (useExistingHandler)
7194 if (_body.get()._routes.getRoute(subId, existingHandler))
7197 _body.get().executeAsync(command_, existingHandler,
false);
7202 id = _body.get().executeAsync(command_, handler_,
false);
7204 catch (
const DisconnectedException&)
7206 removeMessageHandler(command_.getMessage().
getCommandId());
7207 if (command_.isSubscribe())
7211 if (command_.isSow())
7213 removeMessageHandler(command_.getMessage().
getQueryID());
7244 _body.get().ack(topic_, bookmark_, options_);
// String convenience overload of ack.
// topic_ and bookmark_ are each wrapped in a Field built from the string's
// data pointer and length, then forwarded with options_ (default NULL) to the
// implementation's ack.
7266 void ack(
const std::string& topic_,
const std::string& bookmark_,
7267 const char* options_ = NULL)
7269 _body.get().ack(
Field(topic_.data(), topic_.length()),
Field(bookmark_.data(), bookmark_.length()), options_);
// Forwards topic_, bookmark_ and options_ (default NULL) straight to the
// implementation's _ack member.
// In this file it is called from the message stream code with the previously
// delivered message's topic and bookmark.
7277 void ackDeferredAutoAck(
Field& topic_,
Field& bookmark_,
const char* options_ = NULL)
7279 _body.get()._ack(topic_, bookmark_, options_);
7292 _body.get().flushAcks();
7301 return _body.get().getAutoAck();
7311 _body.get().setAutoAck(isAutoAckEnabled_);
7319 return _body.get().getAckBatchSize();
7329 _body.get().setAckBatchSize(ackBatchSize_);
7340 return _body.get().getAckTimeout();
7352 if (!ackTimeout_ && _body.get().getAckBatchSize() > 1)
7354 throw UsageException(
"Ack timeout must be > 0 when ack batch size > 1");
7356 _body.get().setAckTimeout(ackTimeout_);
7370 _body.get().setRetryOnDisconnect(isRetryOnDisconnect_);
7379 return _body.get().getRetryOnDisconnect();
7388 _body.get().setDefaultMaxDepth(maxDepth_);
7397 return _body.get().getDefaultMaxDepth();
7409 return _body.get().setTransportFilterFunction(filter_, userData_);
7423 return _body.get().setThreadCreatedCallback(callback_, userData_);
// Hands func_ and its opaque userData_ to the implementation's
// deferredExecution.
// NOTE(review): presumably func_(userData_) is invoked later, when the
// implementation processes its deferred-execution list -- confirm.
7431 void deferredExecution(DeferredExecutionFunc func_,
void* userData_)
7433 _body.get().deferredExecution(func_, userData_);
7443 AMPS_CALL_EXCEPTION_WRAPPER(_globalCommandTypeHandlers[GlobalCommandTypeHandlers::LastChance].invoke(message));
7449 unsigned deliveries = 0;
7461 const char* data = NULL;
7463 const char* status = NULL;
7464 size_t statusLen = 0;
7466 const size_t NotEntitled = 12, Duplicate = 9, Failure = 7;
7469 if (len == NotEntitled || len == Duplicate ||
7470 (statusLen == Failure && status[0] ==
'f'))
7472 if (_failedWriteHandler)
7474 if (_publishStore.isValid())
7476 amps_uint64_t sequence =
7478 FailedWriteStoreReplayer replayer(
this, data, len);
7479 AMPS_CALL_EXCEPTION_WRAPPER(_publishStore.replaySingle(
7480 replayer, sequence));
7486 AMPS_CALL_EXCEPTION_WRAPPER(
7487 _failedWriteHandler->failedWrite(emptyMessage,
7493 if (_publishStore.isValid())
7502 AMPS_CALL_EXCEPTION_WRAPPER(_publishStore.discardUpTo(seq));
7506 if (!deliveries && _bookmarkStore.isValid())
7513 const char* bookmarkData = NULL;
7514 size_t bookmarkLen = 0;
7520 if (bookmarkLen > 0 && _routes.hasRoute(subId))
7523 _bookmarkStore.persisted(subId,
Message::Field(bookmarkData, bookmarkLen));
7528 catch (std::exception& ex)
7530 AMPS_UNHANDLED_EXCEPTION(ex);
7536 ClientImpl::processedAck(
Message& message)
7538 unsigned deliveries = 0;
7540 const char* data = NULL;
7544 Lock<Mutex> l(_lock);
7547 Lock<Mutex> guard(_ackMapLock);
7548 AckMap::iterator i = _ackMap.find(std::string(data, len));
7549 if (i != _ackMap.end())
7559 ack.setStatus(data, len);
7561 ack.setReason(data, len);
7563 ack.setUsername(data, len);
7565 ack.setPassword(data, len);
7567 ack.setServerVersion(data, len);
7569 ack.setOptions(data, len);
// Sends a heartbeat message when one is due.
// A heartbeat is sent when force is true or when _heartbeatTimer.check()
// reports true; in that case the timer is restarted before the send.
7579 ClientImpl::checkAndSendHeartbeat(
bool force)
7581 if (force || _heartbeatTimer.check())
7583 _heartbeatTimer.start();
// The beat is sent with sendWithoutRetry; an AMPSException from the send is
// caught below (the handler body is not visible in this listing).
7586 sendWithoutRetry(_beatMessage);
7588 catch (
const AMPSException&)
// Builds a ConnectionInfo (a string-to-string map) describing this client.
// Always sets "client.uri", "client.name" and "client.username"; when a
// publish store is set, also adds "publishStore.unpersistedCount" formatted
// as text.
7595 inline ConnectionInfo ClientImpl::getConnectionInfo()
const 7597 ConnectionInfo info;
// The stream is used only to format the numeric unpersisted count below.
7598 std::ostringstream writer;
7600 info[
"client.uri"] = _lastUri;
7601 info[
"client.name"] = _name;
7602 info[
"client.username"] = _username;
7603 if (_publishStore.isValid())
7605 writer << _publishStore.unpersistedCount();
7606 info[
"publishStore.unpersistedCount"] = writer.str();
// Message callback for the client: receives each incoming message handle
// together with the ClientImpl passed as userData_, and dispatches it by
// command type.
//   messageHandle_ : handle of the received message; a null handle is treated
//                    as an idle tick (no message to process).
//   userData_      : the owning ClientImpl, passed as an opaque pointer.
// Dispatch, as far as this listing shows:
//   SOW / group_begin / group_end -> routed with _routes.deliverData
//   publish / delta publish / OOF -> routed per subscription id, with bookmark
//                                    store logging and optional auto-ack
//   ack                           -> global Ack handler, persisted/processed
//                                    ack bookkeeping, then _routes.deliverAck
//   heartbeat                     -> global Heartbeat handler, then a forced
//                                    heartbeat reply if heartbeats are enabled
// Messages that no handler accepted go to lastChance().
// NOTE(review): several lines (braces, returns, #else/#endif, some statements)
// are missing from this listing; the comments below only describe what is visible.
7615 ClientImpl::ClientImplMessageHandler(
amps_handle messageHandle_,
void* userData_)
// Bit masks grouping the command types that share a dispatch branch.
7617 const unsigned SOWMask = Message::Command::SOW | Message::Command::GroupBegin | Message::Command::GroupEnd;
7618 const unsigned PublishMask = Message::Command::OOF | Message::Command::Publish | Message::Command::DeltaPublish;
7619 ClientImpl* me = (ClientImpl*) userData_;
// Deferred work is run on every callback, before the null-handle check.
7620 AMPS_CALL_EXCEPTION_WRAPPER_2(me, me->processDeferredExecutions());
// No message: only do housekeeping (queue ack flush if a queue ack timeout is
// set, and a heartbeat check).
7621 if (!messageHandle_)
7623 if (me->_queueAckTimeout)
7625 me->checkQueueAcks();
7627 me->checkAndSendHeartbeat();
// A real message: point the reusable _readMessage at the new handle.
7631 me->_readMessage.replace(messageHandle_);
7632 Message& message = me->_readMessage;
7634 if (commandType & SOWMask)
7636 #if 0 // Not currently implemented, to avoid an extra branch in delivery 7640 AMPS_CALL_EXCEPTION_WRAPPER_2(me,
7641 me->_globalCommandTypeHandlers[1 + (commandType / 8192)].invoke(message));
7643 AMPS_CALL_EXCEPTION_WRAPPER_2(me, me->_routes.deliverData(message,
7646 else if (commandType & PublishMask)
7648 #if 0 // Not currently implemented, to avoid an extra branch in delivery 7649 AMPS_CALL_EXCEPTION_WRAPPER_2(me,
7650 me->_globalCommandTypeHandlers[(commandType == Message::Command::Publish ?
7651 GlobalCommandTypeHandlers::Publish :
7652 GlobalCommandTypeHandlers::OOF)].invoke(message));
// A published message can match several subscriptions: the sub id list is
// parsed into _routeCache and each resolved route is visited in turn.
7654 const char* subIds = NULL;
7655 size_t subIdsLen = 0;
7658 &subIds, &subIdsLen);
7659 size_t subIdCount = me->_routes.parseRoutes(
AMPS::Field(subIds, subIdsLen), me->_routeCache);
7660 for (
size_t i = 0; i < subIdCount; ++i)
7662 MessageRouter::RouteCache::value_type& lookupResult = me->_routeCache[i];
7664 if (handler.isValid())
// The single sub id for this route is addressed by offset/length inside the
// original sub id list.
7667 AMPS_SubscriptionId,
7668 subIds + lookupResult.idOffset,
7669 lookupResult.idLength);
7672 bool isAutoAck = me->_isAutoAckEnabled;
// Bookmark-store path: taken for non-queue messages that carry a bookmark
// when a bookmark store is set.
7674 if (!isMessageQueue && !bookmark.
empty() &&
7675 me->_bookmarkStore.isValid())
// Messages the store reports as discarded go to the DuplicateMessage global
// handler when one is set.
7677 if (me->_bookmarkStore.isDiscarded(me->_readMessage))
7680 if (me->_globalCommandTypeHandlers[GlobalCommandTypeHandlers::DuplicateMessage].isValid())
7682 AMPS_CALL_EXCEPTION_WRAPPER_2(me, me->_globalCommandTypeHandlers[GlobalCommandTypeHandlers::DuplicateMessage].invoke(message));
// Otherwise the message is logged in the store before the handler runs.
7687 me->_bookmarkStore.log(me->_readMessage);
7688 AMPS_CALL_EXCEPTION_WRAPPER_2(me,
7689 handler.invoke(message));
// Queue message with auto-ack enabled: run the handler, then issue the ack
// call unless the message is flagged to ignore auto-ack (the ack call itself
// is cut off in this listing). The catch clause below repeats the same
// flag check and ack call after a handler exception.
7694 if (isMessageQueue && isAutoAck)
7698 AMPS_CALL_EXCEPTION_WRAPPER_STREAM_FULL_2(me, handler.invoke(message));
7699 if (!message.getIgnoreAutoAck())
7701 AMPS_CALL_EXCEPTION_WRAPPER_2(me,
7705 catch (std::exception& ex)
7707 if (!message.getIgnoreAutoAck())
7709 AMPS_CALL_EXCEPTION_WRAPPER_2(me,
7712 AMPS_UNHANDLED_EXCEPTION_2(me, ex);
7717 AMPS_CALL_EXCEPTION_WRAPPER_2(me,
7718 handler.invoke(message));
7724 me->lastChance(message);
7728 else if (commandType == Message::Command::Ack)
7730 AMPS_CALL_EXCEPTION_WRAPPER_2(me,
7731 me->_globalCommandTypeHandlers[GlobalCommandTypeHandlers::Ack].invoke(message));
// deliveries counts how many consumers accepted this ack; zero means it
// falls through to lastChance().
7733 unsigned deliveries = 0U;
7736 case Message::AckType::Persisted:
7737 deliveries += me->persistedAck(message);
7739 case Message::AckType::Processed:
7740 deliveries += me->processedAck(message);
7743 AMPS_CALL_EXCEPTION_WRAPPER_2(me, deliveries += me->_routes.deliverAck(message, ackType));
7744 if (deliveries == 0)
7746 me->lastChance(message);
7749 else if (commandType == Message::Command::Heartbeat)
7751 AMPS_CALL_EXCEPTION_WRAPPER_2(me,
7752 me->_globalCommandTypeHandlers[GlobalCommandTypeHandlers::Heartbeat].invoke(message));
// A non-zero heartbeat timeout means heartbeats are configured, so one is
// forced out in reply.
7753 if (me->_heartbeatTimer.getTimeout() != 0.0)
7755 me->checkAndSendHeartbeat(
true);
7759 me->lastChance(message);
// NOTE(review): the condition guarding the branch below is not visible in
// this listing. It delivers by command id and keeps retrying while the
// client is connected, sending heartbeats between attempts when the
// destination stream is full.
7765 unsigned deliveries = 0U;
7768 while (me->_connected)
7772 deliveries = me->_routes.deliverData(message, message.
getCommandId());
// NOTE(review): the paired catch clauses below (with and without a named
// exception) look like preprocessor alternatives whose #if lines were
// dropped -- confirm against the full file.
7776 catch (MessageStreamFullException&)
7778 catch (MessageStreamFullException& ex_)
7783 me->checkAndSendHeartbeat(
false);
7786 catch (std::exception&)
7788 catch (std::exception& ex_)
7796 catch (std::exception& ex_)
7800 me->_exceptionListener->exceptionThrown(ex_);
7807 if (deliveries == 0)
7809 me->lastChance(message);
// Every processed message ends with a heartbeat check.
7812 me->checkAndSendHeartbeat();
// Pre-disconnect callback: recovers the ClientImpl from userData and asks it
// to clear acks for the connection version that failed.
// The unnamed amps_handle parameter is not used.
7817 ClientImpl::ClientImplPreDisconnectHandler(
amps_handle ,
unsigned failedConnectionVersion,
void* userData)
7819 ClientImpl* me = (ClientImpl*) userData;
7822 me->clearAcks(failedConnectionVersion);
// Disconnect callback for the client.
//   (unnamed) amps_handle : not used.
//   userData              : the owning ClientImpl, passed as an opaque pointer.
// Sequence visible in this listing:
//   1. take the client lock and tell listeners the connection is Disconnected;
//   2. mark the client not connected, wake waiters, and run the user's
//      disconnect handler with the lock released;
//   3. if the client is still not connected afterwards, report either
//      "Reconnect in progress." or, after broadcasting Shutdown,
//      "Reconnect failed.";
//   4. otherwise, if a subscription manager is set, resubscribe with the lock
//      released and broadcast Resubscribed.
// NOTE(review): braces, loop/try headers and return statements are missing
// from this listing, so the exact nesting is not visible here.
7826 ClientImpl::ClientImplDisconnectHandler(
amps_handle ,
void* userData)
7828 ClientImpl* me = (ClientImpl*) userData;
7829 Lock<Mutex> l(me->_lock);
// Client handle around this ClientImpl, built with isRef = false; it is what
// the disconnect handler and the subscription manager receive.
7830 Client wrapper(me,
false);
7833 me->broadcastConnectionStateChanged(ConnectionStateListener::Disconnected);
// NOTE(review): AtomicFlagFlip presumably sets _badTimeToHASubscribe for the
// lifetime of subFlip and restores it on destruction -- confirm.
7837 AtomicFlagFlip subFlip(&me->_badTimeToHASubscribe);
7838 bool retryInProgress =
false;
7841 me->_connected =
false;
// Wake any threads waiting on the lock so they can observe the disconnect.
7842 me->_lock.signalAll();
// The user's disconnect handler runs with the client lock released.
7845 Unlock<Mutex> unlock(me->_lock);
7846 me->_disconnectHandler.invoke(wrapper);
// NOTE(review): the two RetryOperationException catch clauses (unnamed and
// named) look like preprocessor alternatives whose #if lines were dropped.
7849 catch (
const RetryOperationException&)
7851 catch (
const RetryOperationException& ex)
7854 retryInProgress =
true;
7856 catch (
const std::exception& ex)
7858 AMPS_UNHANDLED_EXCEPTION_2(me, ex);
7860 me->_lock.signalAll();
// Still disconnected after the handler ran: report why.
7862 if (!me->_connected)
7864 if (retryInProgress)
7866 AMPS_UNHANDLED_EXCEPTION_2(me, RetryOperationException(
"Reconnect in progress."));
7870 me->broadcastConnectionStateChanged(ConnectionStateListener::Shutdown);
7871 AMPS_UNHANDLED_EXCEPTION_2(me, DisconnectedException(
"Reconnect failed."));
// Resubscription, when a subscription manager is set; it also runs with the
// client lock released.
7878 if (me->_subscriptionManager)
7883 Unlock<Mutex> unlock(me->_lock);
7884 me->_subscriptionManager->resubscribe(wrapper);
7886 me->broadcastConnectionStateChanged(ConnectionStateListener::Resubscribed);
// Resubscribe failures are passed to the unhandled-exception macro.
7890 catch (
const AMPSException& subEx)
7892 AMPS_UNHANDLED_EXCEPTION_2(me, subEx);
7894 catch (
const std::exception& subEx)
7896 AMPS_UNHANDLED_EXCEPTION_2(me, subEx);
// Constructs a forward iterator over the buffer [data_, data_ + len_),
// starting at offset pos_ and using fieldSep_ as the field separator.
// The loop condition below holds while the current position is inside the
// buffer and sits on a separator character.
// NOTE(review): the loop body is not visible here; presumably it advances
// _pos so the iterator starts on the first non-separator character.
7919 iterator(
const char* data_,
size_t len_,
size_t pos_,
char fieldSep_)
7920 : _data(data_), _len(len_), _pos(pos_), _fieldSep(fieldSep_)
7922 while (_pos != _len && _data[_pos] == _fieldSep)
// Member typedefs describing this iterator: a forward iterator whose value
// is a (key, value) pair of Message::Field.
// NOTE(review): difference_type is declared as void*; the standard iterator
// requirements expect a signed integer type such as std::ptrdiff_t -- confirm
// whether anything relies on the current definition.
7928 typedef void* difference_type;
7929 typedef std::forward_iterator_tag iterator_category;
7930 typedef std::pair<Message::Field, Message::Field> value_type;
7931 typedef value_type* pointer;
7932 typedef value_type& reference;
// Equality: compares only the current offset (_pos); the buffer pointer,
// length and separator are not compared.
7933 bool operator==(
const iterator& rhs)
const 7935 return _pos == rhs._pos;
// Inequality: true when the two iterators are at different offsets (_pos).
7937 bool operator!=(
const iterator& rhs)
const 7939 return _pos != rhs._pos;
// Pre-increment: moves to the start of the next field.
// The first loop condition holds while the position is inside the buffer and
// not on a separator; the second holds while it is on a separator.
// NOTE(review): the loop bodies and the return are not visible here;
// presumably each loop advances _pos and *this is returned.
7941 iterator& operator++()
7944 while (_pos != _len && _data[_pos] != _fieldSep)
7949 while (_pos != _len && _data[_pos] == _fieldSep)
// Dereference: splits the field at the current position into a
// (key, value) pair of Message::Field.
// The key starts at _pos and runs up to the first '='; the value, when an
// '=' was found, runs up to the next separator or the end of the buffer.
// NOTE(review): the statements that update keyLength, valueStart and
// valueLength are not visible in this listing.
7956 value_type operator*()
const 7959 size_t i = _pos, keyLength = 0, valueStart = 0, valueLength = 0;
// Scan for the '=' that ends the key.
7960 for (; i < _len && _data[i] !=
'='; ++i)
7965 result.first.assign(_data + _pos, keyLength);
// A value is parsed only when the scan stopped on '=' rather than at the
// end of the buffer.
7967 if (i < _len && _data[i] ==
'=')
7971 for (; i < _len && _data[i] != _fieldSep; ++i)
7976 result.second.assign(_data + valueStart, valueLength);
7982 class reverse_iterator
7989 typedef std::pair<Message::Field, Message::Field> value_type;
// Constructs a reverse iterator over the buffer [data, data + len), given a
// starting character pointer pos and the field separator.
// The visible conditions first test for separators at or before pos, then
// test for non-separator characters back toward the start of the buffer.
// The final test is true when pos stopped after the start of the buffer, or
// exactly on a separator at the start.
// NOTE(review): the loop bodies and the branches of the final if are not
// visible here; presumably they step _pos backward and then settle it on the
// first character of the last field (or on the end marker) -- confirm.
7990 reverse_iterator(
const char* data,
size_t len,
const char* pos,
char fieldsep)
7991 : _data(data), _len(len), _pos(pos), _fieldSep(fieldsep)
7996 while (_pos >= _data && *_pos == _fieldSep)
8000 while (_pos > _data && *_pos != _fieldSep)
8007 if (_pos > _data || (_pos == _data && *_pos == _fieldSep))
// Equality: compares only the current character pointer (_pos).
8017 bool operator==(
const reverse_iterator& rhs)
const 8019 return _pos == rhs._pos;
// Inequality: true when the two reverse iterators hold different _pos pointers.
8021 bool operator!=(
const reverse_iterator& rhs)
const 8023 return _pos != rhs._pos;
// Pre-increment: moves to the previous field in the buffer.
// The visible conditions are the same three tests the constructor uses:
// separators at or before _pos, then non-separator characters back toward
// the start, then whether the scan stopped inside the buffer.
// NOTE(review): statements between the signature and the first loop, the
// loop bodies and the return are not visible in this listing.
8025 reverse_iterator& operator++()
8036 while (_pos >= _data && *_pos == _fieldSep)
8041 while (_pos > _data && *_pos != _fieldSep)
8045 if (_pos > _data || (_pos == _data && *_pos == _fieldSep))
// Dereference: splits the field that starts at _pos into a (key, value)
// pair of Message::Field, scanning forward through the buffer.
// NOTE(review): the statements that update keyLength, valueStart and
// valueLength are not visible in this listing.
8056 value_type operator*()
const 8059 size_t keyLength = 0, valueStart = 0, valueLength = 0;
// _pos is a pointer here, so it is converted to an offset for the bounds checks.
8060 size_t i = (size_t)(_pos - _data);
// Scan for the '=' that ends the key.
8061 for (; i < _len && _data[i] !=
'='; ++i)
8065 result.first.assign(_pos, keyLength);
// A value is parsed only when the scan stopped on '='.
8066 if (i < _len && _data[i] ==
'=')
8070 for (; i < _len && _data[i] != _fieldSep; ++i)
8075 result.second.assign(_data + valueStart, valueLength);
8080 : _data(data.
data()), _len(data.
len()),
8081 _fieldSep(fieldSeparator)
// Constructs a FIX view over the buffer [data, data + len).
// The constructor stores the pointer rather than copying the data, so the
// buffer must stay valid while this object and its iterators are in use.
// fieldSeparator defaults to character code 1 (SOH, the usual FIX delimiter).
8085 FIX(
const char* data,
size_t len,
char fieldSeparator = 1)
8086 : _data(data), _len(len), _fieldSep(fieldSeparator)
// Returns a forward iterator constructed at offset 0 of the buffer.
8090 iterator begin()
const 8092 return iterator(_data, _len, 0, _fieldSep);
// Returns the past-the-end forward iterator: one positioned at offset _len.
8094 iterator end()
const 8096 return iterator(_data, _len, _len, _fieldSep);
// Returns a reverse iterator constructed at the last character of the buffer.
// NOTE(review): when _len is 0, (_len - 1) wraps for an unsigned length and
// the pointer passed is outside the buffer -- confirm the reverse_iterator
// constructor tolerates that.
8100 reverse_iterator rbegin()
const 8102 return reverse_iterator(_data, _len, _data + (_len - 1), _fieldSep);
// Returns the reverse end iterator: one whose position pointer is null (0).
8105 reverse_iterator rend()
const 8107 return reverse_iterator(_data, _len, 0, _fieldSep);
8128 std::stringstream _data;
// Appends one field built from a slice of a character buffer.
// Writes tag, then '=', then exactly `length` bytes starting at
// value + offset; stream write() is used, so embedded NUL bytes are kept.
// NOTE(review): the statement that writes the field separator and the
// return are not visible in this listing.
8145 void append(
const T& tag,
const char* value,
size_t offset,
size_t length)
8147 _data << tag <<
'=';
8148 _data.write(value + offset, (std::streamsize)length);
// Appends one "tag=value" field followed by the field separator (_fs) to the
// internal stream.
8156 void append(
const T& tag,
const std::string& value)
8158 _data << tag <<
'=' << value << _fs;
8167 operator std::string()
const 8175 _data.str(std::string());
8212 typedef std::map<Message::Field, Message::Field>
map_type;
8223 for (FIX::iterator a = fix.begin(); a != fix.end(); ++a)
8232 #define AMPS_MESSAGE_STREAM_CACHE_MAX 128 8236 std::deque<Message> _q;
8237 std::deque<Message> _cache;
8238 std::string _commandId;
8240 std::string _queryId;
8244 unsigned _requestedAcks;
8248 typedef enum :
unsigned int { Unset = 0x0, Running = 0x10, Subscribe = 0x11, SOWOnly = 0x12, AcksOnly = 0x13, Conflate = 0x14, Closed = 0x1, Disconnected = 0x2 } State;
8249 #if __cplusplus >= 201100L || _MSC_VER >= 1900 8250 std::atomic<State> _state;
8252 volatile State _state;
8254 typedef std::map<std::string, Message*> SOWKeyMap;
8255 SOWKeyMap _sowKeyMap;
8257 MessageStreamImpl(
const Client& client_)
8260 _maxDepth((
unsigned)~0),
8262 _cacheMax(AMPS_MESSAGE_STREAM_CACHE_MAX),
8265 if (_client.isValid())
8271 MessageStreamImpl(ClientImpl* client_)
8274 _maxDepth((
unsigned)~0),
8278 if (_client.isValid())
8284 ~MessageStreamImpl()
8288 virtual void destroy()
8294 catch (std::exception& e)
8298 if (_client.isValid())
8305 if (_client.isValid())
8309 _client = Client((ClientImpl*)NULL);
8310 c.deferredExecution(MessageStreamImpl::destroyer,
this);
// Static callback that deletes the MessageStreamImpl passed as an opaque
// pointer. destroy() hands this function and `this` to
// Client::deferredExecution.
8318 static void destroyer(
void* vpMessageStreamImpl_)
8320 delete ((MessageStreamImpl*)vpMessageStreamImpl_);
// Records the ids of the subscription this stream serves, under _lock.
//   subId_     : subscription id.
//   commandId_ : stored only when non-empty and different from subId_.
//   queryId_   : stored only when non-empty and different from both subId_
//                and commandId_.
// NOTE(review): the line that stores subId_ and the state assignments are
// not visible in this listing; presumably the stream moves to the Subscribe
// state here.
8323 void setSubscription(
const std::string& subId_,
8324 const std::string& commandId_ =
"",
8325 const std::string& queryId_ =
"")
8327 Lock<Mutex> lock(_lock);
8329 if (!commandId_.empty() && commandId_ != subId_)
8331 _commandId = commandId_;
8333 if (!queryId_.empty() && queryId_ != subId_ && queryId_ != commandId_)
8335 _queryId = queryId_;
// A stream already marked Disconnected is handled separately (the action is
// not visible here); the assert below checks for the Unset state.
8338 if (Disconnected == _state)
8342 assert(Unset == _state);
// Records the ids for a SOW-only query, under _lock.
//   commandId_ : always stored.
//   queryId_   : stored only when non-empty and different from commandId_.
// NOTE(review): the state assignments are not visible in this listing;
// presumably the stream moves to the SOWOnly state here.
8346 void setSOWOnly(
const std::string& commandId_,
8347 const std::string& queryId_ =
"")
8349 Lock<Mutex> lock(_lock);
8350 _commandId = commandId_;
8351 if (!queryId_.empty() && queryId_ != commandId_)
8353 _queryId = queryId_;
// A stream already marked Disconnected is handled separately (the action is
// not visible here); the assert below checks for the Unset state.
8356 if (Disconnected == _state)
8360 assert(Unset == _state);
// Records the ids for a command whose only expected result is a stats ack,
// under _lock, and sets the requested acks to Message::AckType::Stats.
//   commandId_ : always stored.
//   queryId_   : stored only when non-empty and different from commandId_.
// NOTE(review): the state assignments are not visible in this listing;
// presumably the stream moves to the AcksOnly state here.
8364 void setStatsOnly(
const std::string& commandId_,
8365 const std::string& queryId_ =
"")
8367 Lock<Mutex> lock(_lock);
8368 _commandId = commandId_;
8369 if (!queryId_.empty() && queryId_ != commandId_)
8371 _queryId = queryId_;
// A stream already marked Disconnected is handled separately (the action is
// not visible here); the assert below checks for the Unset state.
8374 if (Disconnected == _state)
8378 assert(Unset == _state);
8380 _requestedAcks = Message::AckType::Stats;
// Records the command id and the set of acks this stream should wait for,
// under _lock.
//   commandId_ : always stored.
//   acks_      : stored as _requestedAcks.
// NOTE(review): the state assignments are not visible in this listing;
// presumably the stream moves to the AcksOnly state here.
8383 void setAcksOnly(
const std::string& commandId_,
unsigned acks_)
8385 Lock<Mutex> lock(_lock);
8386 _commandId = commandId_;
// A stream already marked Disconnected is handled separately (the action is
// not visible here); the assert below checks for the Unset state.
8388 if (Disconnected == _state)
8392 assert(Unset == _state);
8394 _requestedAcks = acks_;
8399 Lock<Mutex> lock(_lock);
8400 if (state_ == AMPS::ConnectionStateListener::Disconnected)
8402 _state = Disconnected;
8405 else if (state_ == AMPS::ConnectionStateListener::Connected
8406 && _commandId.empty()
8408 && _queryId.empty())
// Stores timeout_ as the stream's _timeout.
// NOTE(review): the action taken when the state is Subscribe is not visible
// in this listing, and neither is the unit of timeout_.
8416 void timeout(
unsigned timeout_)
8418 _timeout = timeout_;
8422 if (_state == Subscribe)
// Sets the stream's maximum queue depth.
// Two assignments are visible: either maxDepth_ itself, or (unsigned)~0,
// the largest unsigned value.
// NOTE(review): the condition choosing between them is not visible here;
// presumably a zero maxDepth_ selects the largest value (no practical limit).
8427 void maxDepth(
unsigned maxDepth_)
8431 _maxDepth = maxDepth_;
8435 _maxDepth = (unsigned)~0;
// Two accessors that this listing has run together.
// getMaxDepth: its body is not visible here; presumably it returns _maxDepth.
// getDepth: returns the number of messages currently in the queue _q,
// converted to unsigned.
8438 unsigned getMaxDepth(
void)
const 8442 unsigned getDepth(
void)
const 8444 return (
unsigned)(_q.size());
// Fragment of the routine that delivers the next queued message into
// current_ (MessageStream::iterator::advance calls next(_current) on the
// stream body). The signature line, all braces, and several statements are
// absent from this listing, so only the visible steps are described.
8449 Lock<Mutex> lock(_lock);
// If a topic and bookmark were remembered from an earlier call, hand them to
// the client's deferred auto-ack when the client handle is valid.
8450 if (!_previousTopic.
empty() && !_previousBookmark.
empty())
8454 if (_client.isValid())
8456 _client.ackDeferredAutoAck(_previousTopic, _previousBookmark);
// Two AMPSException handlers appear here; NOTE(review): they are probably
// alternatives selected by a preprocessor condition that is not shown.
8460 catch (AMPSException&)
8462 catch (AMPSException& e)
// Handler path: invalidate the output message and forget the remembered ack.
8465 current_.invalidate();
8466 _previousTopic.
clear();
8467 _previousBookmark.
clear();
// Normal path: forget the remembered ack as well.
8470 _previousTopic.
clear();
8471 _previousBookmark.
clear();
// Wait in slices of at most 1000 (or the timeout, when it is non-zero and smaller).
8474 long minWaitTime = (_timeout && _timeout < 1000) ? _timeout : 1000;
8475 Timer timer((
double)_timeout);
// Block while the queue is empty and the state has the Running bit set.
8477 while (_q.empty() && _state & Running)
8480 _lock.wait(minWaitTime);
// Release the lock while invoking the waiting function.
8482 Unlock<Mutex> unlck(_lock);
8483 amps_invoke_waiting_function();
// Consult the timer and refresh the remaining wait; the branch body is not shown.
8488 if (timer.checkAndGetRemaining(&minWaitTime))
// Never wait longer than 1000 in a single slice.
8494 minWaitTime = (minWaitTime < 1000) ? minWaitTime : 1000;
// Keep the previously delivered message in _cache, bounded by _cacheMax;
// _messageHandler later takes entries from _cache.
8497 if (current_.isValid() && _cache.size() < _cacheMax)
8500 _cache.push_back(current_);
// Deliver the message at the head of the queue.
8504 current_ = _q.front();
// Queue is exactly at its depth limit (the action taken is not shown).
8505 if (_q.size() == _maxDepth)
// In Conflate state, drop the SOW-key index entry for the delivered message.
8510 if (_state == Conflate)
8512 std::string sowKey = current_.
getSowKey();
8513 if (sowKey.length())
8515 _sowKeyMap.erase(sowKey);
8518 else if (_state == AcksOnly)
// Condition: all requested acks consumed in AcksOnly state, or a "group_end"
// message received in SOWOnly state (the action taken is not shown).
8522 if ((_state == AcksOnly && _requestedAcks == 0) ||
8523 (_state == SOWOnly && current_.
getCommand() ==
"group_end"))
8527 else if (current_.isValid()
// A disconnected stream reports the failure to the caller by throwing.
8538 if (_state == Disconnected)
8540 throw DisconnectedException(
"Connection closed.");
8542 current_.invalidate();
8543 if (_state == Closed)
// Final visible return: true exactly when a non-zero timeout is configured.
8547 return _timeout != 0;
// Fragment of a routine whose signature line is absent from this listing
// (possibly close(), which Client::execute invokes on a stream body -
// TODO confirm). When the client handle is valid it branches on the stream
// state and on which of _commandId, _subId and _queryId are non-empty.
// NOTE(review): the action taken in each branch is not present in this
// listing, so it is deliberately not described here.
8551 if (_client.isValid())
8553 if (_state == SOWOnly || _state == Subscribe)
8555 if (!_commandId.empty())
8559 if (!_subId.empty())
8563 if (!_queryId.empty())
// Same three id checks again, for the states not matched above.
8570 if (!_commandId.empty())
8574 if (!_subId.empty())
8578 if (!_queryId.empty())
8584 if (_state == SOWOnly || _state == Subscribe || _state == Unset)
// Static message callback for a stream: enqueues a deep copy of message_
// onto this_->_q and signals waiters on this_->_lock.
// Throws MessageStreamFullException when the queue has reached _maxDepth.
// NOTE(review): braces, else lines and some condition lines are absent from
// this listing; branch boundaries below are inferred from the visible code.
8589 static void _messageHandler(
const Message& message_, MessageStreamImpl* this_)
8591 Lock<Mutex> lock(this_->_lock);
// Path for every state other than Conflate.
8592 if (this_->_state != Conflate)
8594 AMPS_TESTING_SLOW_MESSAGE_STREAM
// Queue full: wake waiters, then report the condition by throwing.
8595 if (this_->_q.size() >= this_->_maxDepth)
8600 this_->_lock.signalAll();
8601 throw MessageStreamFullException(
"Stream is currently full.");
// Prefer reusing a Message held in _cache: copy into it and move it to the queue.
8603 if (!this_->_cache.empty())
8605 this_->_cache.front().deepCopy(message_);
8606 this_->_q.push_back(this_->_cache.front());
8607 this_->_cache.pop_front();
// Otherwise enqueue a fresh deep copy (emplace_back when AMPS_USE_EMPLACE is defined).
8611 #ifdef AMPS_USE_EMPLACE 8612 this_->_q.emplace_back(message_.
deepCopy());
8614 this_->_q.push_back(message_.
deepCopy());
// With a valid client that has auto-ack enabled (remaining conditions are
// not shown), call setIgnoreAutoAck on the incoming message.
8618 this_->_client.isValid() && this_->_client.getAutoAck() &&
8622 message_.setIgnoreAutoAck();
// From here on the code uses _sowKeyMap; this looks like the Conflate path.
8627 std::string sowKey = message_.
getSowKey();
8628 if (sowKey.length())
// A message with this SOW key is already queued: overwrite it in place.
8630 SOWKeyMap::iterator it = this_->_sowKeyMap.find(sowKey);
8631 if (it != this_->_sowKeyMap.end())
8633 it->second->deepCopy(message_);
// Queue full: wake waiters, then report the condition by throwing.
8637 if (this_->_q.size() >= this_->_maxDepth)
8643 this_->_lock.signalAll();
8644 throw MessageStreamFullException(
"Stream is currently full.");
8646 if (!this_->_cache.empty())
8648 this_->_cache.front().deepCopy(message_);
8649 this_->_q.push_back(this_->_cache.front());
8650 this_->_cache.pop_front();
8654 #ifdef AMPS_USE_EMPLACE 8655 this_->_q.emplace_back(message_.
deepCopy());
8657 this_->_q.push_back(message_.
deepCopy());
// Index the newly queued message by its SOW key so later updates can replace it.
8660 this_->_sowKeyMap[sowKey] = &(this_->_q.back());
// Third enqueue sequence (no _sowKeyMap update): same full-queue check and reuse logic.
8665 if (this_->_q.size() >= this_->_maxDepth)
8670 this_->_lock.signalAll();
8671 throw MessageStreamFullException(
"Stream is currently full.");
8673 if (!this_->_cache.empty())
8675 this_->_cache.front().deepCopy(message_);
8676 this_->_q.push_back(this_->_cache.front());
8677 this_->_cache.pop_front();
8681 #ifdef AMPS_USE_EMPLACE 8682 this_->_q.emplace_back(message_.
deepCopy());
8684 this_->_q.push_back(message_.
deepCopy());
8688 this_->_client.isValid() && this_->_client.getAutoAck() &&
8692 message_.setIgnoreAutoAck();
// Wake any thread waiting on the stream's lock.
8696 this_->_lock.signalAll();
// Default constructor of MessageStream.
// NOTE(review): the initializer list and body are absent from this listing.
8699 inline MessageStream::MessageStream(
void)
// Constructs a MessageStream for client_: allocates a new MessageStreamImpl
// for that client and stores it in _body.
8702 inline MessageStream::MessageStream(
const Client& client_)
8703 : _body(
new MessageStreamImpl(client_))
// Moves the iterator forward by asking the stream body for the next
// message, which is written into _current. When next() returns false the
// iterator drops its stream pointer (_pStream becomes NULL).
8706 inline void MessageStream::iterator::advance(
void)
8708 _pStream = _pStream->_body->next(_current) ? _pStream : NULL;
// Fragment (signature line absent from this listing): builds a
// MessageHandler whose function is MessageStreamImpl::_messageHandler,
// cast to the (const Message&, void*) form, with this stream's body as
// the user data pointer.
8712 return MessageHandler((
void(*)(
const Message&,
void*))MessageStreamImpl::_messageHandler, &_body.get());
// Fragment (signature line absent; Client::execute calls
// MessageStream::fromExistingHandler with a handler): when handler_ wraps
// MessageStreamImpl::_messageHandler, its user data is taken to be the
// MessageStreamImpl and becomes the body of the result.
8717 if (handler_._func == (MessageHandler::FunctionType)MessageStreamImpl::_messageHandler)
8719 result._body = (MessageStreamImpl*)(handler_._userData);
// Forwards to the implementation object's setSOWOnly with the same arguments.
8724 inline void MessageStream::setSOWOnly(
const std::string& commandId_,
8725 const std::string& queryId_)
8727 _body->setSOWOnly(commandId_, queryId_);
// Forwards to the implementation object's setSubscription with the same arguments.
8729 inline void MessageStream::setSubscription(
const std::string& subId_,
8730 const std::string& commandId_,
8731 const std::string& queryId_)
8733 _body->setSubscription(subId_, commandId_, queryId_);
// Forwards to the implementation object's setStatsOnly with the same arguments.
8735 inline void MessageStream::setStatsOnly(
const std::string& commandId_,
8736 const std::string& queryId_)
8738 _body->setStatsOnly(commandId_, queryId_);
// Forwards to the implementation object's setAcksOnly.
// NOTE(review): the line declaring the acks_ parameter is absent from this
// listing.
8740 inline void MessageStream::setAcksOnly(
const std::string& commandId_,
8743 _body->setAcksOnly(commandId_, acks_);
// Return statements of two forwarding accessors whose signature lines are
// absent from this listing: each returns the corresponding value from the
// implementation object (_body).
8762 return _body->getMaxDepth();
8766 return _body->getDepth();
// Returns a copy of the MessageStream held in _pEmptyMessageStream.
// The pointer is dereferenced without a null check here; Client::execute
// creates the stream before calling this function.
8769 inline MessageStream ClientImpl::getEmptyMessageStream(
void)
8771 return *(_pEmptyMessageStream.get());
// Fragment of the routine that executes command_ and returns a
// MessageStream for its results. The signature line, braces, several
// declarations (subId, existingHandler, command, ackTypes, stream, handler)
// and some argument lines are absent from this listing.
8779 ClientImpl& body = _body.get();
8780 Message& message = command_.getMessage();
// Reuse path: when a route is already registered for subId, run the command
// against that handler and return the stream that handler belongs to.
8784 if (useExistingHandler)
8790 if (body._routes.getRoute(subId, existingHandler))
8793 body.executeAsync(command_, existingHandler,
false);
8794 return MessageStream::fromExistingHandler(existingHandler);
// Commands in the NoDataCommands set that request only a Persisted ack or no
// ack at all: return the shared empty stream, created and closed on first use.
8803 if ((command & Message::Command::NoDataCommands)
8804 && (ackTypes == Message::AckType::Persisted
8805 || ackTypes == Message::AckType::None))
8808 if (!body._pEmptyMessageStream)
8810 body._pEmptyMessageStream.reset(
new MessageStream((ClientImpl*)0));
8811 body._pEmptyMessageStream.get()->_body->close();
8813 return body.getEmptyMessageStream();
// Apply the client's default max depth to the new stream when one is set.
8816 if (body.getDefaultMaxDepth())
8818 stream.
maxDepth(body.getDefaultMaxDepth());
8821 std::string commandID = body.executeAsync(command_, handler,
false);
// Configure the stream according to the kind of command that was executed.
8822 if (command_.hasStatsAck())
8824 stream.setStatsOnly(commandID, command_.getMessage().
getQueryId());
8826 else if (command_.isSow())
8830 stream.setAcksOnly(commandID,
8835 stream.setSOWOnly(commandID, command_.getMessage().
getQueryId());
8838 else if (command_.isSubscribe())
8840 stream.setSubscription(commandID,
// For publish, delta publish and SOW delete, the Persisted bit is masked out
// of the ack types handed to the stream.
8847 if (command == Message::Command::Publish ||
8848 command == Message::Command::DeltaPublish ||
8849 command == Message::Command::SOWDelete)
8851 stream.setAcksOnly(commandID,
8852 ackTypes & (
unsigned)~Message::AckType::Persisted);
// All other commands: the stream is given the requested ack types unchanged.
8856 stream.setAcksOnly(commandID, ackTypes);
// Acknowledges this message through the client it is associated with,
// passing options_ along. The ack is sent only when a client is attached,
// the bookmark has a non-zero length, and the client's auto-ack is off.
// NOTE(review): the line declaring `bookmark` is absent from this listing.
8863 inline void Message::ack(
const char* options_)
const 8865 ClientImpl* pClient = _body.get().clientImpl();
8867 if (pClient && bookmark.
len() &&
8868 !pClient->getAutoAck())
8871 pClient->ack(getTopic(), bookmark, options_);
Command & setBookmark(const std::string &bookmark_)
Set the bookmark to be used by this command.
Definition: ampsplusplus.hpp:747
Command & setFilter(const char *filter_, size_t filterLen_)
Definition: ampsplusplus.hpp:698
Class to hold string versions of failure reasons.
Definition: ampsplusplus.hpp:150
Message & setData(const std::string &v_)
Sets the data portion of self.
Definition: Message.hpp:1476
Core type and function declarations for the AMPS C client.
Client(const std::string &clientName="")
Constructs a new client with a given client name.
Definition: ampsplusplus.hpp:5086
Field getUserId() const
Retrieves the value of the UserId header of the Message as a new Field.
Definition: Message.hpp:1453
std::string sowDeleteByKeys(const MessageHandler &messageHandler_, const std::string &topic_, const std::string &keys_, long timeout_=0)
Deletes messages that match SOW keys from a topic's SOW cache.
Definition: ampsplusplus.hpp:6764
std::string stopTimer(const MessageHandler &messageHandler)
Definition: ampsplusplus.hpp:6738
static const unsigned int IdentifierLength
The length of identifiers used for unique identification of commands and subscriptions.
Definition: Message.hpp:542
std::string getAckType() const
Definition: ampsplusplus.hpp:945
AMPSDLL void amps_client_set_disconnect_handler(amps_handle client, amps_handler disconnectHandler, void *userData)
Sets the disconnect handler function to be called when a disconnect occurs.
bool removeMessageHandler(const Field &commandId_)
Removes a MessageHandler for a given CommandId from self.
Definition: ampsplusplus.hpp:5315
Message & assignTopic(const std::string &v)
Sets the value of the Topic header for this Message.
Definition: Message.hpp:1449
Provides a convenient way of building messages in FIX format, typically referenced using the typedefs...
Definition: ampsplusplus.hpp:8126
void startTimer()
Definition: ampsplusplus.hpp:6727
MessageStream sowAndSubscribe(const std::string &topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, bool oofEnabled_=false, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic and initiates a new subscription on it.
Definition: ampsplusplus.hpp:6269
Abstract base class for storing published messages for an HA publisher client.
Definition: ampsplusplus.hpp:1089
MessageStream conflate(void)
Sets self to conflation mode, where a new update for a matching sow key will replace the previous one...
Definition: ampsplusplus.hpp:8750
Field getSequence() const
Retrieves the value of the Sequence header of the Message as a new Field.
Definition: Message.hpp:1422
std::string send(const MessageHandler &messageHandler, Message &message, int timeout=0)
Sends a Message to the connected AMPS server, performing only minimal validation and bypassing client...
Definition: ampsplusplus.hpp:5343
AMPSDLL amps_result amps_client_set_name(amps_handle handle, const amps_char *clientName)
Sets the name on an amps client object.
Message & setCorrelationId(const std::string &v)
Sets the value of the CorrelationId header for this Message.
Definition: Message.hpp:1304
Message & setQueryID(const std::string &v)
Sets the value of the QueryID header for this Message.
Definition: Message.hpp:1417
Command(const std::string &command_)
Creates an object to represent the given AMPS command, such as "sow" or "subscribe".
Definition: ampsplusplus.hpp:559
Command::Type getCommandEnum() const
Decode self's "command" field and return one of the values from Command.
Definition: Message.hpp:1228
Command & setBookmark(const char *bookmark_, size_t bookmarkLen_)
Set the bookmark to be used by this command.
Definition: ampsplusplus.hpp:758
void setGlobalCommandTypeMessageHandler(const Message::Command::Type command_, const MessageHandler &handler_)
Sets a handler for all messages of a particular type: currently supported types are heartbeat message...
Definition: ampsplusplus.hpp:7018
Command & setAckType(unsigned ackType_)
Definition: ampsplusplus.hpp:923
void setServerVersion(size_t version_)
Internally used to set the server version so the store knows how to deal with persisted acks and call...
Definition: BookmarkStore.hpp:429
void setDefaultMaxDepth(unsigned maxDepth_)
Sets a default max depth on all subsequently created MessageStream objects.
Definition: ampsplusplus.hpp:7386
const amps_uint64_t getNameHashValue() const
Returns the numeric name hash of this client as generated by the server and returned when the client ...
Definition: ampsplusplus.hpp:5147
Field getSubscriptionId() const
Retrieves the value of the SubscriptionId header of the Message as a new Field.
Definition: Message.hpp:1427
AMPSDLL amps_result amps_client_connect(amps_handle handle, const amps_char *uri)
Connects to the AMPS server specified in uri.
static size_t convertVersionToNumber(const std::string &version_)
Converts a string version, such as "3.8.1.5" into the same numeric form used internally and returned ...
Definition: ampsplusplus.hpp:5203
void unsubscribe(const std::string &commandId)
Unsubscribe from a topic.
Definition: ampsplusplus.hpp:6058
Command & setOptions(const std::string &options_)
Sets the options string for this command: see Message.Options for a helper class for constructing the...
Definition: ampsplusplus.hpp:788
Message & setOrderBy(const std::string &v)
Sets the value of the OrderBy header for this Message.
Definition: Message.hpp:1415
Abstract base class for replaying a publish message.
Definition: ampsplusplus.hpp:1061
Command & setCommandId(const char *cmdId_, size_t cmdIdLen_)
Definition: ampsplusplus.hpp:672
void persisted(const Message::Field &subId_, const Message::Field &bookmark_)
Called internally to indicate messages up to and including bookmark are replicated to all replication...
Definition: BookmarkStore.hpp:405
unsigned getDefaultMaxDepth(void) const
Returns the default max depth for returned MessageStream objects.
Definition: ampsplusplus.hpp:7395
void ack(Message &message_, const char *options_=NULL)
Acknowledge a message queue message by supplying the message directly: this adds the ack to the curre...
Definition: ampsplusplus.hpp:7254
MessageStream bookmarkSubscribe(const std::string &topic_, long timeout_, const std::string &bookmark_, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Subscribe to a topic using a bookmark.
Definition: ampsplusplus.hpp:6009
AMPSDLL void amps_client_disconnect(amps_handle handle)
Disconnects from the AMPS server, if connected.
AMPSDLL AMPS_SOCKET amps_client_get_socket(amps_handle client)
Returns the socket from the underlying transport in client, or NULL if no transport is associated wit...
void discard(const Message::Field &subId_, size_t bookmarkSeqNo_)
Log a discard-bookmark entry to the persistent log based on a bookmark sequence number.
Definition: BookmarkStore.hpp:283
void setPublishStore(const Store &publishStore_)
Set the publish store to be used by the client.
Definition: ampsplusplus.hpp:5439
void setErrorOnPublishGap(bool errorOnPublishGap_)
Called to enable or disable throwing PublishStoreGapException.
Definition: ampsplusplus.hpp:1338
amps_uint64_t deltaPublish(const std::string &topic_, const std::string &data_, unsigned long expiration_)
Publish the changed fields of a message to an AMPS topic.
Definition: ampsplusplus.hpp:5704
static size_t convertVersionToNumber(const char *data_, size_t len_)
Converts a string version, such as "3.8.1.5" into the same numeric form used internally and returned ...
Definition: ampsplusplus.hpp:5218
Message & setFilter(const std::string &v)
Sets the value of the Filter header for this Message.
Definition: Message.hpp:1306
Command & setSubId(const char *subId_, size_t subIdLen_)
Definition: ampsplusplus.hpp:724
void setNoEmpties(void)
Set the option to not send empty messages on a delta subscription.
Definition: Message.hpp:841
void setDuplicateMessageHandler(const MessageHandler &duplicateMessageHandler_)
Sets a callback function that is invoked when a duplicate message is detected.
Definition: ampsplusplus.hpp:5455
Message deepCopy(void) const
Returns a deep copy of self.
Definition: Message.hpp:579
void removeConnectionStateListener(ConnectionStateListener *listener)
Attempts to remove listener from self's set of ConnectionStateListeners.
Definition: ampsplusplus.hpp:7109
void setOOF(void)
Set the option to receive out of focus (OOF) messages on a subscription, where applicable.
Definition: Message.hpp:824
int getAckTimeout(void) const
Returns the current value of the message queue ack timeout setting – that is, the amount of time aft...
Definition: ampsplusplus.hpp:7338
VersionInfo getServerVersionInfo() const
Returns the server version retrieved during logon.
Definition: ampsplusplus.hpp:5189
Message sowDeleteByKeys(const std::string &topic_, const std::string &keys_, long timeout_=0)
Deletes messages that match SOW keys from a topic's SOW cache.
Definition: ampsplusplus.hpp:6791
AMPSDLL amps_result amps_client_set_transport_filter_function(amps_handle client, amps_transport_filter_function filter, void *userData)
Sets a user-supplied callback function for filtering data before it is sent and after it is received...
Command(const char *command_, size_t commandLen_)
Creates an object to represent the given AMPS command, such as "sow" or "subscribe".
Definition: ampsplusplus.hpp:567
AMPSDLL amps_uint64_t amps_message_get_field_uint64(amps_handle message, FieldId field)
Gets the unsigned 64-bit int value of a header field in an AMPS message.
AMPSDLL amps_result amps_client_set_idle_time(amps_handle client, int idleTime)
Sets an idle-time (milliseconds).
Message encapsulates a single message sent to or received from an AMPS server, and provides methods f...
Definition: Message.hpp:531
Message & setAckTypeEnum(unsigned ackType_)
Encode self's "ack type" field from a bitmask of values from AckType.
Definition: Message.hpp:1183
amps_uint64_t getLastPersisted()
Get the last persisted message sequence in the store.
Definition: ampsplusplus.hpp:1309
std::string getString() const
Returns the current contents of this builder as a string.
Definition: ampsplusplus.hpp:8163
void setTransportFilterFunction(amps_transport_filter_function filter_, void *userData_)
Sets a filter function on the transport that is called with all raw data sent or received.
Definition: ampsplusplus.hpp:7406
amps_uint64_t publish(const char *topic_, size_t topicLength_, const char *data_, size_t dataLength_, unsigned long expiration_)
Publish a message to an AMPS topic, returning the sequence number assigned by the publish store if on...
Definition: ampsplusplus.hpp:5591
iterator begin(void)
Returns an iterator representing the beginning of the topic or subscription.
Definition: ampsplusplus.hpp:4990
Field getLeasePeriod() const
Retrieves the value of the LeasePeriod header of the Message as a new Field.
Definition: Message.hpp:1309
Command & setSowKeys(const char *sowKeys_, size_t sowKeysLen_)
Sets the SowKeys for the command.
Definition: ampsplusplus.hpp:659
static const char * NOW()
Convenience method for returning the special value to start a subscription at the end of the transact...
Definition: ampsplusplus.hpp:7037
Command & setAckType(const std::string &ackType_)
Definition: ampsplusplus.hpp:901
static const char * BOOKMARK_EPOCH()
Convenience method for returning the special value to start a subscription at the beginning of the tr...
Definition: ampsplusplus.hpp:7047
void send(const Message &message)
Sends a Message to the connected AMPS server, performing only minimal validation and bypassing client...
Definition: ampsplusplus.hpp:5274
Message & setSowKey(const std::string &v)
Sets the value of the SowKey header for this Message.
Definition: Message.hpp:1424
AMPSDLL void amps_message_set_field_value(amps_handle message, FieldId field, const amps_char *value, size_t length)
Sets the value of a header field in an AMPS message.
std::string bookmarkSubscribe(const MessageHandler &messageHandler_, const std::string &topic_, long timeout_, const std::string &bookmark_, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Subscribe to a topic using a bookmark.
Definition: ampsplusplus.hpp:5981
Field getFilter() const
Retrieves the value of the Filter header of the Message as a new Field.
Definition: Message.hpp:1306
static size_t getUnsetPosition()
Method to return the value used to represent not found or unset.
Definition: ampsplusplus.hpp:1152
void setRetryOnDisconnect(bool isRetryOnDisconnect_)
Enables or disables automatic retry of a command to AMPS after a reconnect.
Definition: ampsplusplus.hpp:7368
unsigned getMaxDepth(void) const
Gets the maximum number of messages that can be held in the underlying queue.
Definition: ampsplusplus.hpp:8760
Message & assignUserId(const std::string &v)
Sets the value of the UserId header for this Message.
Definition: Message.hpp:1453
void connect(const std::string &uri)
Connect to an AMPS server.
Definition: ampsplusplus.hpp:5249
void clear()
Deletes the data associated with this Field, should only be used on Fields that were created as deepC...
Definition: Field.hpp:247
void addMessageHandler(const Field &commandId_, const AMPS::MessageHandler &messageHandler_, unsigned requestedAcks_, Message::Command::Type commandType_)
Adds a MessageHandler to be invoked for Messages with the given CommandId as their command id...
Definition: ampsplusplus.hpp:5304
Abstract base class to manage all subscriptions placed on a client so that they can be re-established...
Definition: ampsplusplus.hpp:1453
std::string sowAndDeltaSubscribe(const MessageHandler &messageHandler_, const std::string &topic_, const std::string &filter_="", const std::string &orderBy_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic and initiates a new delta subscription on it.
Definition: ampsplusplus.hpp:6463
Success.
Definition: amps.h:221
Message & setCommandEnum(Command::Type command_)
Set self's "command" field from one of the values in Command.
Definition: Message.hpp:1288
Field getOptions() const
Retrieves the value of the Options header of the Message as a new Field.
Definition: Message.hpp:1316
std::string authenticate(const std::string &, const std::string &password_)
A simple implementation that returns an unmodified password.
Definition: ampsplusplus.hpp:1035
amps_uint64_t deltaPublish(const char *topic_, size_t topicLength_, const char *data_, size_t dataLength_)
Publish the changed fields of a message to an AMPS topic.
Definition: ampsplusplus.hpp:5681
FIXShredder(char fieldSep_=(char) 1)
Construct an instance of FIXShredder using the specified value as the delimiter between fields...
Definition: ampsplusplus.hpp:8208
std::string sow(const MessageHandler &messageHandler_, const std::string &topic_, const std::string &filter_="", const std::string &orderBy_="", const std::string &bookmark_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query a State-of-the-World topic.
Definition: ampsplusplus.hpp:6105
void * amps_handle
Opaque handle type used to refer to objects in the AMPS api.
Definition: amps.h:211
Class for constructing the options string to pass to AMPS in a Message.
Definition: Message.hpp:601
void addMessageHandler(const Field &commandId_, const AMPS::MessageHandler &messageHandler_, unsigned requestedAcks_, bool isSubscribe_)
Adds a MessageHandler to be invoked for Messages with the given CommandId as their command id...
Definition: ampsplusplus.hpp:5287
const char * data() const
Returns the (non-null-terminated) data underlying this field.
Definition: Field.hpp:260
bool isValid() const
Returns true if self is a valid stream that may be iterated.
Definition: ampsplusplus.hpp:4983
amps_result
Return values from amps_xxx functions.
Definition: amps.h:216
FailedWriteHandler * getFailedWriteHandler()
Get the handler that is invoked to report on failed writes.
Definition: ampsplusplus.hpp:5492
Field getAckType() const
Retrieves the value of the AckType header of the Message as a new Field.
Definition: Message.hpp:1130
AMPSDLL amps_handle amps_client_create(const amps_char *clientName)
Functions for creation of an AMPS client.
Field getCommand() const
Retrieves the value of the Command header of the Message as a new Field.
Definition: Message.hpp:1195
MessageStream execute(Command &command_)
Execute the provided command and return messages received in response in a MessageStream.
Definition: ampsplusplus.hpp:8774
void(* amps_transport_filter_function)(const unsigned char *, size_t, short, void *)
Prototype for a user-supplied callback function for filtering data before it is sent and after it is ...
Definition: amps.h:642
Message & setTopic(const std::string &v)
Sets the value of the Topic header for this Message.
Definition: Message.hpp:1449
Store getPublishStore()
Get the publish store used by the client.
Definition: ampsplusplus.hpp:5447
void setThreadCreatedCallback(amps_thread_created_callback callback_, void *userData_)
Sets a callback function on the transport that is called when a new thread is created to receive data...
Definition: ampsplusplus.hpp:7420
unsigned getAckTypeEnum() const
Definition: ampsplusplus.hpp:950
size_t getServerVersion() const
Returns the server version retrieved during logon.
Definition: ampsplusplus.hpp:5178
State
Constants for the state of the connection.
Definition: ampsplusplus.hpp:1497
Command & setData(const std::string &data_)
Sets the data for this command from an existing string.
Definition: ampsplusplus.hpp:828
std::string logon(int timeout_=0, Authenticator &authenticator_=DefaultAuthenticator::instance(), const char *options_=NULL)
Logon to the server, providing the client name, credentials (if available), and client information (s...
Definition: ampsplusplus.hpp:5753
void discardUpTo(amps_uint64_t index_)
Called by Client to indicate that all messages up to and including index_ can be discarded.
Definition: ampsplusplus.hpp:1242
Command & setFilter(const std::string &filter_)
Definition: ampsplusplus.hpp:691
const std::string & getNameHash() const
Returns the name hash string of this client as generated by the server and returned when the client l...
Definition: ampsplusplus.hpp:5139
StoreImpl(bool errorOnPublishGap_=false)
Default constructor.
Definition: ampsplusplus.hpp:1097
MessageStream sowAndSubscribe(const char *topic_, const std::string &filter_="", const std::string &orderBy_="", const std::string &bookmark_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic and initiates a new subscription on it.
Definition: ampsplusplus.hpp:6417
Client represents a connection to an AMPS server, but does not provide failover or reconnection behav...
Definition: ampsplusplus.hpp:5069
void setGlobalCommandTypeMessageHandler(const std::string &command_, const MessageHandler &handler_)
Sets a handler for all messages of a particular type, or for messages that would be delivered to a pa...
Definition: ampsplusplus.hpp:6993
Command & setSubId(const std::string &subId_)
Definition: ampsplusplus.hpp:717
static const char * BOOKMARK_NOW()
Convenience method for returning the special value to start a subscription at the end of the transact...
Definition: ampsplusplus.hpp:7028
Message & newCommandId()
Creates and sets a new sequential value for the CommandId header for this Message.
Definition: Message.hpp:1302
void addConnectionStateListener(ConnectionStateListener *listener)
Adds a ConnectionStateListener to self's set of listeners.
Definition: ampsplusplus.hpp:7101
Command & reset(const std::string &command_)
Resets the fields of self, and sets the command to command_.
Definition: ampsplusplus.hpp:582
Command & setTimeout(unsigned timeout_)
Sets the client-side timeout for this command.
Definition: ampsplusplus.hpp:850
void clearConnectionStateListeners()
Clear all listeners from self's set of ConnectionStateListeners.
Definition: ampsplusplus.hpp:7116
Message & setSequence(const std::string &v)
Sets the value of the Sequence header for this Message.
Definition: Message.hpp:1422
amps_uint64_t publish(const char *topic_, size_t topicLength_, const char *data_, size_t dataLength_)
Publish a message to an AMPS topic, returning the sequence number assigned by the publish store if on...
Definition: ampsplusplus.hpp:5540
Command & setCommandId(const std::string &cmdId_)
Definition: ampsplusplus.hpp:665
Message & setUserId(const std::string &v)
Sets the value of the UserId header for this Message.
Definition: Message.hpp:1453
AMPSDLL amps_result amps_client_send_with_version(amps_handle client, amps_handle message, unsigned *version_out)
Sends a message to the AMPS server.
Field getTopic() const
Retrieves the value of the Topic header of the Message as a new Field.
Definition: Message.hpp:1449
AMPSDLL void amps_client_set_message_handler(amps_handle client, amps_handler messageHandler, void *userData)
Sets the message handler function for this client.
SubscriptionManager * getSubscriptionManager()
Get the subscription manager being used by the client.
Definition: ampsplusplus.hpp:5403
std::string subscribe(const MessageHandler &messageHandler_, const std::string &topic_, long timeout_=0, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Subscribe to a topic.
Definition: ampsplusplus.hpp:5816
Command(Message::Command::Type command_)
Creates an object to represent the given AMPS command, such as "sow" or "subscribe".
Definition: ampsplusplus.hpp:574
Message & setMessageType(const std::string &v)
Sets the value of the MessageType header for this Message.
Definition: Message.hpp:1312
Field getCommandId() const
Retrieves the value of the CommandId header of the Message as a new Field.
Definition: Message.hpp:1302
const ExceptionListener & getExceptionListener(void) const
Returns the exception listener set on this Client.
Definition: ampsplusplus.hpp:6904
Field getQueryId() const
Retrieves the value of the QueryID header of the Message as a new Field.
Definition: Message.hpp:1417
static Authenticator & instance()
Static function to return a static instance used when no Authenticator is supplied to a Client...
Definition: ampsplusplus.hpp:1052
Abstract base class for connection state listeners.
Definition: ampsplusplus.hpp:1493
Message & setSubscriptionId(const std::string &v)
Sets the value of the SubscriptionId header for this Message.
Definition: Message.hpp:1427
std::string sowDeleteByData(const MessageHandler &messageHandler_, const std::string &topic_, const std::string &data_, long timeout_=0)
Deletes the message whose keys match the message data provided.
Definition: ampsplusplus.hpp:6826
Command & setOptions(const char *options_, size_t optionsLen_)
Sets the options string for this command: see Message.Options for a helper class for constructing the...
Definition: ampsplusplus.hpp:796
Message & newQueryId()
Creates and sets a new sequential value for the QueryID header for this Message.
Definition: Message.hpp:1417
#define AMPS_BOOKMARK_RECENT
Start the subscription at the first undiscarded message in the bookmark store, or at the end of the b...
Definition: BookmarkStore.hpp:47
Message & assignExpiration(const std::string &v)
Sets the value of the Expiration header for this Message.
Definition: Message.hpp:1305
bool empty() const
Returns 'true' if empty, 'false' otherwise.
Definition: Field.hpp:128
bool replaySingle(StoreReplayer &replayer_, amps_uint64_t index_)
Called by Client to get a single message replayed by the store onto the StoreReplayer.
Definition: ampsplusplus.hpp:1263
void completed(const std::string &, const std::string &, const std::string &)
Called by Client once a logon completes successfully.
Definition: ampsplusplus.hpp:1047
MessageStream deltaSubscribe(const std::string &topic_, long timeout_, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Delta Subscribe to a topic.
Definition: ampsplusplus.hpp:5921
static const char * EPOCH()
Convenience method for returning the special value to start a subscription at the beginning of the tr...
Definition: ampsplusplus.hpp:7057
Interface for BookmarkStoreImpl classes.
Definition: BookmarkStore.hpp:228
Command & reset(const char *command_, size_t commandLen_)
Resets the fields of self, and sets the command to command_.
Definition: ampsplusplus.hpp:591
Command & setSequence(const char *seq_, size_t seqLen_)
Definition: ampsplusplus.hpp:809
std::string logon(const char *options_, int timeout_=0)
Logon to the server, providing the client name, credentials (if available) client information (such a...
Definition: ampsplusplus.hpp:5772
size_t len() const
Returns the length of the data underlying this field.
Definition: Field.hpp:267
virtual void completed(const std::string &userName_, const std::string &password_, const std::string &reason_)=0
Called by Client once a logon completes successfully.
Message & setClientName(const std::string &v)
Sets the value of the ClientName header for this Message.
Definition: Message.hpp:1303
Field getSowKey() const
Retrieves the value of the SowKey header of the Message as a new Field.
Definition: Message.hpp:1424
#define AMPS_BOOKMARK_EPOCH
Start the subscription at the beginning of the journal.
Definition: BookmarkStore.hpp:51
A default implementation of Authenticator that only uses an unchanged password and does not implement...
Definition: ampsplusplus.hpp:1029
void setLogonCorrelationData(const std::string &logonCorrelationData_)
Sets the logon correlation data for the client.
Definition: ampsplusplus.hpp:5158
Message & setCommand(const std::string &v)
Sets the value of the Command header for this Message.
Definition: Message.hpp:1195
_FIXBuilder(char fieldSep_=(char) 1)
Construct an instance of _FIXBuilder, using the specified separator between fields.
Definition: ampsplusplus.hpp:8136
void ack(const std::string &topic_, const std::string &bookmark_, const char *options_=NULL)
Acknowledge a message queue message by supplying a topic and bookmark string: this adds the ack to th...
Definition: ampsplusplus.hpp:7266
std::string retry(const std::string &, const std::string &)
Throws an AuthenticationException because retry is not implemented.
Definition: ampsplusplus.hpp:1042
Message & assignSubscriptionId(const std::string &v)
Sets the value of the SubscriptionId header for this Message.
Definition: Message.hpp:1427
bool(* PublishStoreResizeHandler)(Store store_, size_t size_, void *userData_)
Function type for PublishStore resize events. The store_ param is the store which is resizing.
Definition: ampsplusplus.hpp:1083
AMPSDLL void amps_message_get_field_value(amps_handle message, FieldId field, const amps_char **value_ptr, size_t *length_ptr)
Retrieves the value of a header field in an AMPS message.
MessageStream sowAndSubscribe(const char *topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, bool oofEnabled_=false, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic and initiates a new subscription on it.
Definition: ampsplusplus.hpp:6307
virtual void setResizeHandler(PublishStoreResizeHandler handler_, void *userData_)
Set a handler to be called if the Store needs to resize in order to keep storing messages.
Definition: ampsplusplus.hpp:1183
void append(const T &tag, const char *value, size_t offset, size_t length)
Write a field with the provided tag and value to the message being constructed.
Definition: ampsplusplus.hpp:8145
Message & assignCommand(const std::string &v)
Sets the value of the Command header for this Message.
Definition: Message.hpp:1195
#define AMPS_BOOKMARK_NOW
Start the subscription at the point in time when AMPS processes the subscription. ...
Definition: BookmarkStore.hpp:55
void setResizeHandler(PublishStoreResizeHandler handler_, void *userData_)
Set a handler to be called if the Store needs to resize in order to keep storing messages.
Definition: ampsplusplus.hpp:1323
virtual std::string retry(const std::string &userName_, const std::string &password_)=0
Called by Client when a logon ack is received with a status of retry.
BookmarkStore getBookmarkStore()
Get the bookmark store being used by the client.
Definition: ampsplusplus.hpp:5395
bool isValid() const
Method to return if there is an underlying implementation for the Store.
Definition: ampsplusplus.hpp:1280
void setAckBatchSize(const unsigned ackBatchSize_)
Sets the queue ack batch size setting.
Definition: ampsplusplus.hpp:7327
Field getPassword() const
Retrieves the value of the Password header of the Message as a new Field.
Definition: Message.hpp:1416
Message & setTopNRecordsReturned(const std::string &v)
Sets the value of the TopNRecordsReturned header for this Message.
Definition: Message.hpp:1451
Class to handle when a client receives a duplicate publish message, or not entitled message...
Definition: ampsplusplus.hpp:1373
Message sowDelete(const std::string &topic, const std::string &filter, long timeout=0)
Deletes one or more messages from a topic's SOW cache.
Definition: ampsplusplus.hpp:6702
virtual void setFailedResubscribeHandler(std::shared_ptr< FailedResubscribeHandler > handler_)
Set a handler to deal with failing subscriptions after a failover event.
Definition: ampsplusplus.hpp:1481
Message & setSowKeys(const std::string &v)
Sets the value of the SowKeys header for this Message.
Definition: Message.hpp:1425
void setBookmarkStore(const BookmarkStore &bookmarkStore_)
Set the bookmark store to be used by the client.
Definition: ampsplusplus.hpp:5387
This class multiplexes messages from AMPS to multiple subscribers and uses the stream of acks from AM...
Definition: MessageRouter.hpp:149
Message & setExpiration(const std::string &v)
Sets the value of the Expiration header for this Message.
Definition: Message.hpp:1305
virtual std::string authenticate(const std::string &userName_, const std::string &password_)=0
Called by Client just before the logon command is sent.
Command & setOrderBy(const std::string &orderBy_)
Definition: ampsplusplus.hpp:704
void setExceptionListener(const ExceptionListener &listener_)
Definition: ampsplusplus.hpp:6897
Command & reset(Message::Command::Type command_)
Resets the fields of self, and sets the command to command_.
Definition: ampsplusplus.hpp:599
void setSubscriptionManager(SubscriptionManager *subscriptionManager_)
Set the subscription manager to be used by the client.
Definition: ampsplusplus.hpp:5415
void setUnhandledMessageHandler(const AMPS::MessageHandler &messageHandler)
Definition: ampsplusplus.hpp:6960
void append(const T &tag, const std::string &value)
Write a field with the provided tag and value to the message being constructed.
Definition: ampsplusplus.hpp:8156
unsigned getAckBatchSize(void) const
Returns the value of the queue ack batch size setting.
Definition: ampsplusplus.hpp:7317
amps_uint64_t deltaPublish(const std::string &topic_, const std::string &data_)
Publish the changed fields of a message to an AMPS topic.
Definition: ampsplusplus.hpp:5658
MessageStream sowAndSubscribe(const std::string &topic_, const std::string &filter_="", const std::string &orderBy_="", const std::string &bookmark_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic and initiates a new subscription on it.
Definition: ampsplusplus.hpp:6394
Command & setExpiration(unsigned expiration_)
Set the expiration time for a publish command.
Definition: ampsplusplus.hpp:881
AMPSDLL void amps_client_set_predisconnect_handler(amps_handle client, amps_predisconnect_handler predisconnectHandler, void *userData)
Sets the predisconnect handler function to be called when a disconnect occurs.
Represents an iterator over messages in an AMPS topic.
Definition: ampsplusplus.hpp:4945
size_t log(Message &message_)
Log a bookmark to the persistent log.
Definition: BookmarkStore.hpp:268
MessageStream sowAndDeltaSubscribe(const char *topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, bool oofEnabled_=false, bool sendEmpties_=false, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic and initiates a new delta subscription on it.
Definition: ampsplusplus.hpp:6640
size_t unpersistedCount() const
Method to return how many messages are in the store that have not been discarded, indicating that the...
Definition: ampsplusplus.hpp:1272
Command & setData(const char *data_, size_t dataLen_)
Sets the data for this command.
Definition: ampsplusplus.hpp:836
Message & assignAckType(const std::string &v)
Sets the value of the AckType header for this Message.
Definition: Message.hpp:1130
void publishFlush(long timeout_=0, unsigned ackType_=Message::AckType::Processed)
Ensure that AMPS messages are sent and have been processed by the AMPS server.
Definition: ampsplusplus.hpp:5637
std::string executeAsync(Command &command_, MessageHandler handler_)
Execute the provided command and, once AMPS acknowledges the command, process messages in response to...
Definition: ampsplusplus.hpp:7146
Command & setSowKey(const char *sowKey_, size_t sowKeyLen_)
Sets the SowKey field of the command, typically used for a publish command to a topic in the state of...
Definition: ampsplusplus.hpp:624
MessageStream sowAndDeltaSubscribe(const std::string &topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, bool oofEnabled_=false, bool sendEmpties_=false, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic and initiates a new delta subscription on it.
Definition: ampsplusplus.hpp:6599
std::string logon(const std::string &options_, int timeout_=0)
Logon to the server, providing the client name, credentials (if available), client information (such a...
Definition: ampsplusplus.hpp:5791
void unsubscribe()
Unsubscribe from all topics.
Definition: ampsplusplus.hpp:6070
void setAutoAck(bool isAutoAckEnabled_)
Sets the queue auto-ack setting on this client.
Definition: ampsplusplus.hpp:7309
MessageStream deltaSubscribe(const char *topic_, long timeout_, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Delta Subscribe to a topic.
Definition: ampsplusplus.hpp:5939
Message & assignCorrelationId(const std::string &v)
Sets the value of the CorrelationId header for this Message.
Definition: Message.hpp:1304
amps_uint64_t store(const Message &message_)
Called by Client to store a message being published.
Definition: ampsplusplus.hpp:1231
std::string sowAndDeltaSubscribe(const MessageHandler &messageHandler_, const std::string &topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, bool oofEnabled_=false, bool sendEmpties_=false, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic and initiates a new delta subscription on it.
Definition: ampsplusplus.hpp:6563
Command & setBatchSize(unsigned batchSize_)
Sets the batch size for this command, which controls how many records are sent together in the result...
Definition: ampsplusplus.hpp:865
void setExceptionListener(const std::shared_ptr< const ExceptionListener > &pListener_)
Sets the exception listener for exceptions that are not thrown back to the user (for example...
Definition: ampsplusplus.hpp:6884
StoreImpl * get()
Used to get a pointer to the implementation.
Definition: ampsplusplus.hpp:1355
MessageHandler getDuplicateMessageHandler(void)
Returns the callback function that is invoked when a duplicate message is detected.
Definition: ampsplusplus.hpp:5470
Command & setQueryId(const char *queryId_, size_t queryIdLen_)
Definition: ampsplusplus.hpp:737
DisconnectHandler getDisconnectHandler(void) const
Returns the callback function that is invoked when a disconnect occurs.
Definition: ampsplusplus.hpp:5365
Class for parsing a FIX format message into a std::map of keys and values, where the keys and values ...
Definition: ampsplusplus.hpp:8200
Field represents the value of a single field in a Message.
Definition: Field.hpp:86
Message & setAckType(const std::string &v)
Sets the value of the AckType header for this Message.
Definition: Message.hpp:1130
virtual ConnectionInfo getConnectionInfo() const
Get the connection information for the current connection.
Definition: ampsplusplus.hpp:5374
Message & setOptions(const std::string &v)
Sets the value of the Options header for this Message.
Definition: Message.hpp:1344
void ack(Field &topic_, Field &bookmark_, const char *options_=NULL)
Acknowledge a message queue message by supplying a topic and bookmark: this adds the ack to the curre...
Definition: ampsplusplus.hpp:7242
Abstract base class where you can implement handling of exceptions that occur when a SubscriptionMana...
Definition: ampsplusplus.hpp:1431
AMPSDLL amps_result amps_client_send(amps_handle client, amps_handle message)
Sends a message to the AMPS server.
std::map< Message::Field, Message::Field > map_type
Convenience definition for the std::map specialization used for this class.
Definition: ampsplusplus.hpp:8212
Command & setTopic(const char *topic_, size_t topicLen_)
Definition: ampsplusplus.hpp:685
void setDisconnectHandler(const DisconnectHandler &disconnectHandler)
Sets the function to be called when the client is unintentionally disconnected.
Definition: ampsplusplus.hpp:5357
Handle class for StoreImpl classes that track publish messages.
Definition: ampsplusplus.hpp:1215
map_type toMap(const Message::Field &data)
Returns the key/value pairs within the message, represented as AMPS::Field objects that contain point...
Definition: ampsplusplus.hpp:8219
Exception listener for unhandled exceptions.
Definition: ampsplusplus.hpp:203
MessageStream sowAndDeltaSubscribe(const std::string &topic_, const std::string &filter_="", const std::string &orderBy_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic and initiates a new delta subscription on it.
Definition: ampsplusplus.hpp:6496
void setHeartbeat(unsigned heartbeatTime_)
Requests heartbeating with the AMPS server.
Definition: ampsplusplus.hpp:6954
MessageStream subscribe(const char *topic_, long timeout_=0, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Subscribe to a topic.
Definition: ampsplusplus.hpp:5874
An iterable object representing the results of an AMPS subscription and/or query. ...
Definition: ampsplusplus.hpp:4937
bool getErrorOnPublishGap() const
Called to check if the Store will throw PublishStoreGapException.
Definition: ampsplusplus.hpp:1347
Command & setSequence(const std::string &seq_)
Definition: ampsplusplus.hpp:802
static const char * MOST_RECENT()
Convenience method for returning the special value to start a subscription at a recovery point based ...
Definition: ampsplusplus.hpp:7079
Command & setQueryId(const std::string &queryId_)
Definition: ampsplusplus.hpp:730
AMPSDLL amps_result amps_client_set_thread_created_callback(amps_handle client, amps_thread_created_callback callback, void *userData)
Sets a user-supplied callback function to allow thread attributes to be set when a new thread is created...
Message & setCommandId(const std::string &v)
Sets the value of the CommandId header for this Message.
Definition: Message.hpp:1302
Command & setSequence(const amps_uint64_t seq_)
Definition: ampsplusplus.hpp:815
static const char * BOOKMARK_RECENT()
Convenience method for returning the special value to start a subscription at a recovery point based ...
Definition: ampsplusplus.hpp:7090
Message & setQueryId(const std::string &v)
Sets the value of the QueryID header for this Message.
Definition: Message.hpp:1417
amps_result(* amps_thread_created_callback)(AMPS_THREAD_T, void *)
Prototype for a user-supplied callback function to allow thread attributes to be set when a new threa...
Definition: amps.h:668
amps_uint64_t getLowestUnpersisted()
Get the oldest unpersisted message sequence in the store.
Definition: ampsplusplus.hpp:1301
Command & setTopic(const std::string &topic_)
Definition: ampsplusplus.hpp:678
MessageStream maxDepth(unsigned maxDepth_)
Sets the maximum number of messages that can be held in the underlying queue.
Definition: ampsplusplus.hpp:8755
Message & assignVersion(const std::string &v)
Sets the value of the Version header for this Message.
Definition: Message.hpp:1452
void disconnect()
Disconnect from an AMPS server.
Definition: ampsplusplus.hpp:5256
MessageStream sow(const std::string &topic_, const std::string &filter_="", const std::string &orderBy_="", const std::string &bookmark_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic.
Definition: ampsplusplus.hpp:6143
unsigned getDepth(void) const
Gets the current number of messages held in the underlying queue.
Definition: ampsplusplus.hpp:8764
Field getQueryID() const
Retrieves the value of the QueryID header of the Message as a new Field.
Definition: Message.hpp:1417
The operation has not succeeded, but ought to be retried.
Definition: amps.h:245
const std::string & getLogonCorrelationData() const
Returns the currently set logon correlation data for the client.
Definition: ampsplusplus.hpp:5165
void setHeartbeat(unsigned heartbeatTime_, unsigned readTimeout_)
Requests heartbeating with the AMPS server.
Definition: ampsplusplus.hpp:6930
Command & setTopN(unsigned topN_)
Definition: ampsplusplus.hpp:856
AMPSDLL amps_result amps_client_set_read_timeout(amps_handle client, int readTimeout)
Sets a read timeout (seconds), in which if no message is received, the connection is presumed dead...
const std::string & getURI() const
Returns the last URI this client is connected to.
Definition: ampsplusplus.hpp:5225
Command & addAckType(const std::string &ackType_)
Definition: ampsplusplus.hpp:887
void replay(StoreReplayer &replayer_)
Called by Client to get all stored and non-discarded messages replayed by the store onto the StoreRep...
Definition: ampsplusplus.hpp:1251
static const char * BOOKMARK_MOST_RECENT()
Convenience method for returning the special value to start a subscription at a recovery point based ...
Definition: ampsplusplus.hpp:7068
Message & setBookmark(const std::string &v)
Sets the value of the Bookmark header for this Message.
Definition: Message.hpp:1194
void deepCopy(const Field &orig_)
Makes self a deep copy of the original field.
Definition: Field.hpp:218
Command & setOrderBy(const char *orderBy_, size_t orderByLen_)
Definition: ampsplusplus.hpp:711
unsigned getAckTypeEnum() const
Decode self's "ack type" field and return the corresponding bitmask of values from AckType...
Definition: Message.hpp:1160
BookmarkStoreImpl * get()
Used to get a pointer to the implementation.
Definition: BookmarkStore.hpp:465
amps_handle getHandle()
Returns the underlying amps_handle for this client, to be used with amps_client_* functions from the ...
Definition: ampsplusplus.hpp:6871
AMPSDLL amps_result amps_client_attempt_reconnect(amps_handle client, unsigned version)
Manually invokes the user-supplied disconnect handler for this client.
AMPSDLL void amps_client_destroy(amps_handle handle)
Disconnects and destroys an AMPS client object.
void reset()
Clear all data from the builder.
Definition: ampsplusplus.hpp:8173
void setFailedWriteHandler(FailedWriteHandler *handler_)
Set the handler that is invoked to report when a publish fails, for example if the publisher is not e...
Definition: ampsplusplus.hpp:5484
MessageStream sowAndDeltaSubscribe(const char *topic_, const std::string &filter_="", const std::string &orderBy_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic and initiates a new delta subscription on it.
Definition: ampsplusplus.hpp:6518
void setAckTimeout(const int ackTimeout_)
Sets the message queue ack timeout value.
Definition: ampsplusplus.hpp:7350
Command & setSowKeys(const std::string &sowKeys_)
Sets the SowKeys for the command.
Definition: ampsplusplus.hpp:641
Message & setPassword(const std::string &v)
Sets the value of the Password header for this Message.
Definition: Message.hpp:1416
bool getAutoAck(void) const
Returns the value of the queue auto-ack setting.
Definition: ampsplusplus.hpp:7299
amps_uint64_t publish(const std::string &topic_, const std::string &data_)
Publish a message to an AMPS topic, returning the sequence number assigned by the publish store if on...
Definition: ampsplusplus.hpp:5515
Definition: ampsplusplus.hpp:102
void setName(const std::string &name)
Sets the name of this client, assuming no name was provided previously.
Definition: ampsplusplus.hpp:5124
The interface for handling authentication with the AMPS server.
Definition: ampsplusplus.hpp:998
void flush(long timeout_=0)
Method to wait for the Store to discard everything that has been stored up to the point in time when ...
Definition: ampsplusplus.hpp:1293
Field getBookmark() const
Retrieves the value of the Bookmark header of the Message as a new Field.
Definition: Message.hpp:1194
bool DangerousFlushPublishStoreResizeHandler(Store store_, size_t, void *data_)
PublishStoreResizeHandler that will block up to the timeout specified in user data milliseconds tryin...
Definition: ampsplusplus.hpp:1403
static amps_uint64_t getUnsetSequence()
Method to return the value used to represent no such sequence.
Definition: ampsplusplus.hpp:1159
Command & setCorrelationId(const char *correlationId_, size_t correlationIdLen_)
Set the correlation ID for this command.
Definition: ampsplusplus.hpp:781
MessageStream sow(const char *topic_, const std::string &filter_="", const std::string &orderBy_="", const std::string &bookmark_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic.
Definition: ampsplusplus.hpp:6164
amps_uint64_t publish(const std::string &topic_, const std::string &data_, unsigned long expiration_)
Publish a message to an AMPS topic, returning the sequence number assigned by the publish store (if a...
Definition: ampsplusplus.hpp:5564
void setLastChanceMessageHandler(const AMPS::MessageHandler &messageHandler)
Sets the message handler called when no other handler matches.
Definition: ampsplusplus.hpp:6967
Message::Field getMostRecent(const Message::Field &subId_)
Returns the most recent bookmark from the log that ought to be used for (re-)subscriptions.
Definition: BookmarkStore.hpp:310
amps_uint64_t deltaPublish(const char *topic_, size_t topicLength_, const char *data_, size_t dataLength_, unsigned long expiration_)
Publish the changed fields of a message to an AMPS topic.
Definition: ampsplusplus.hpp:5730
The client and server are disconnected.
Definition: amps.h:249
Message sowDeleteByData(const std::string &topic_, const std::string &data_, long timeout_=0)
Deletes the message whose keys match the message data provided.
Definition: ampsplusplus.hpp:6847
std::string deltaSubscribe(const MessageHandler &messageHandler_, const std::string &topic_, long timeout_, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Delta Subscribe to a topic.
Definition: ampsplusplus.hpp:5903
MessageStream timeout(unsigned timeout_)
Sets the maximum time to wait for the next message in milliseconds; if no message is available within...
Definition: ampsplusplus.hpp:8745
std::string sow(const MessageHandler &messageHandler_, const std::string &topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic.
Definition: ampsplusplus.hpp:6205
const std::string & getName() const
Returns the name of this client passed in the constructor.
Definition: ampsplusplus.hpp:5131
Command is an encapsulation of a single AMPS command sent by the client.
Definition: ampsplusplus.hpp:471
std::string sowAndSubscribe(const MessageHandler &messageHandler_, const std::string &topic_, const std::string &filter_="", const std::string &orderBy_="", const std::string &bookmark_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic and initiates a new subscription on it.
Definition: ampsplusplus.hpp:6355
Message & setBatchSize(const std::string &v)
Sets the value of the BatchSize header for this Message.
Definition: Message.hpp:1193
MessageStream subscribe(const std::string &topic_, long timeout_=0, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Subscribe to a topic.
Definition: ampsplusplus.hpp:5842
MessageStream bookmarkSubscribe(const char *topic_, long timeout_, const std::string &bookmark_, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Subscribe to a topic using a bookmark.
Definition: ampsplusplus.hpp:6030
Command & setSowKey(const std::string &sowKey_)
Sets the SowKey field of the command, typically used for a publish command to a topic in the state of...
Definition: ampsplusplus.hpp:611
Message & assignSequence(const std::string &v)
Sets the value of the Sequence header for this Message.
Definition: Message.hpp:1422
std::string executeAsyncNoResubscribe(Command &command_, MessageHandler handler_)
Execute the provided command and, once AMPS acknowledges the command, process messages in response to...
Definition: ampsplusplus.hpp:7180
Command & setCorrelationId(const std::string &correlationId_)
Set the correlation ID for this command.
Definition: ampsplusplus.hpp:769
std::string sowDelete(const MessageHandler &messageHandler, const std::string &topic, const std::string &filter, long timeout)
Deletes one or more messages from a topic's SOW cache.
Definition: ampsplusplus.hpp:6679
iterator end(void)
Returns an iterator representing the end of the topic or subscription.
Definition: ampsplusplus.hpp:5001
bool getRetryOnDisconnect(void) const
Returns true if automatic retry of a command to AMPS after a reconnect is enabled.
Definition: ampsplusplus.hpp:7377
void flushAcks(void)
Sends any queued message queue ack messages to the server immediately.
Definition: ampsplusplus.hpp:7290
std::string sowAndSubscribe(const MessageHandler &messageHandler_, const std::string &topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, bool oofEnabled_=false, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic and initiates a new subscription on it.
Definition: ampsplusplus.hpp:6237