AMPS C/C++ Client Class Reference
AMPS C/C++ Client Version 5.3.4.1
ampsplusplus.hpp
Go to the documentation of this file.
1 //
3 // Copyright (c) 2010-2023 60East Technologies Inc., All Rights Reserved.
4 //
5 // This computer software is owned by 60East Technologies Inc. and is
6 // protected by U.S. copyright laws and other laws and by international
7 // treaties. This computer software is furnished by 60East Technologies
8 // Inc. pursuant to a written license agreement and may be used, copied,
9 // transmitted, and stored only in accordance with the terms of such
10 // license agreement and with the inclusion of the above copyright notice.
11 // This computer software or any other copies thereof may not be provided
12 // or otherwise made available to any other person.
13 //
14 // U.S. Government Restricted Rights. This computer software: (a) was
15 // developed at private expense and is in all respects the proprietary
16 // information of 60East Technologies Inc.; (b) was not developed with
17 // government funds; (c) is a trade secret of 60East Technologies Inc.
18 // for all purposes of the Freedom of Information Act; and (d) is a
19 // commercial item and thus, pursuant to Section 12.212 of the Federal
20 // Acquisition Regulations (FAR) and DFAR Supplement Section 227.7202,
21 // Government's use, duplication or disclosure of the computer software
22 // is subject to the restrictions set forth by 60East Technologies Inc..
23 //
25 #ifndef _AMPSPLUSPLUS_H_
26 #define _AMPSPLUSPLUS_H_
27 #include "amps/amps.h"
28 #include "amps/ampsver.h"
29 #include <string>
30 #include <map>
31 #include <sstream>
32 #include <iostream>
33 #include <memory>
34 #include <stdexcept>
35 #include <limits.h>
36 #include <list>
37 #include <memory>
38 #include <set>
39 #include <deque>
40 #include <vector>
41 #include <assert.h>
42 #ifndef _WIN32
43  #include <inttypes.h>
44 #endif
45 #if defined(sun)
46  #include <sys/atomic.h>
47 #endif
48 #include "amps/BookmarkStore.hpp"
49 #include "amps/MessageRouter.hpp"
50 #include "amps/util.hpp"
51 #include "amps/ampscrc.hpp"
52 #if __cplusplus >= 201100L || _MSC_VER >= 1900
53 #include <atomic>
54 #endif
55 
56 #ifndef AMPS_TESTING_SLOW_MESSAGE_STREAM
57  #define AMPS_TESTING_SLOW_MESSAGE_STREAM
58 #endif
59 
64 
65 
72 
83 
84 // For StoreBuffer implementations
85 #define AMPS_MEMORYBUFFER_DEFAULT_BUFFERS 10
86 #define AMPS_MEMORYBUFFER_DEFAULT_LENGTH 40960
87 #define AMPS_SUBSCRIPTION_MANAGER_DEFAULT_TIMEOUT 0
88 #define AMPS_HACLIENT_TIMEOUT_DEFAULT 10000
89 #define AMPS_HACLIENT_RECONNECT_DEFAULT 200
90 #define AMPS_DEFAULT_COMMAND_TIMEOUT 5000
91 #define AMPS_DEFAULT_TOP_N -1
92 #define AMPS_DEFAULT_BATCH_SIZE 10
93 #define AMPS_NUMBER_BUFFER_LEN 20
94 #define AMPS_DEFAULT_QUEUE_ACK_TIMEOUT 1000
95 
96 #if defined(_M_X64) || defined(__x86_64) || defined(_WIN64)
97  #define AMPS_X64 1
98 #endif
99 
100 #ifdef _WIN32
101  static __declspec ( thread ) AMPS::Message* publishStoreMessage = 0;
102 #else
103  static __thread AMPS::Message* publishStoreMessage = 0;
104 #endif
105 
106 namespace AMPS
107 {
108 
109  typedef std::map<std::string, std::string> ConnectionInfo;
110 
111  class PerThreadMessageTracker
112  {
113  std::vector<AMPS::Message*> _messages;
114  public:
115  PerThreadMessageTracker() {}
116  ~PerThreadMessageTracker()
117  {
118  for (size_t i = 0; i < _messages.size(); ++i)
119  {
120  delete _messages[i];
121  }
122  }
123  void addMessage(AMPS::Message* message)
124  {
125  _messages.push_back(message);
126  }
127  static void addMessageToCleanupList(AMPS::Message* message)
128  {
129  static AMPS::Mutex _lock;
130  AMPS::Lock<Mutex> l(_lock);
131  _addMessageToCleanupList(message);
132  }
133  static void _addMessageToCleanupList(AMPS::Message* message)
134  {
135  static PerThreadMessageTracker tracker;
136  tracker.addMessage(message);
137  }
138  };
139 
140  template<class Type>
141  inline std::string asString(Type x_)
142  {
143  std::ostringstream os;
144  os << x_;
145  return os.str();
146  }
147 
148  inline
149  size_t convertToCharArray(char* buf_, amps_uint64_t seqNo_)
150  {
151  size_t pos = AMPS_NUMBER_BUFFER_LEN;
152  for (int i = 0; i < AMPS_NUMBER_BUFFER_LEN; ++i)
153  {
154  if (seqNo_ > 0)
155  {
156  buf_[--pos] = (char)(seqNo_ % 10 + '0');
157  seqNo_ /= 10;
158  }
159  }
160  return pos;
161  }
162 
163 #ifdef _WIN32
164  inline
165  size_t convertToCharArray(char* buf_, unsigned long seqNo_)
166  {
167  size_t pos = AMPS_NUMBER_BUFFER_LEN;
168  for (int i = 0; i < AMPS_NUMBER_BUFFER_LEN; ++i)
169  {
170  if (seqNo_ > 0)
171  {
172  buf_[--pos] = (char)(seqNo_ % 10 + '0');
173  seqNo_ /= 10;
174  }
175  }
176  return pos;
177  }
178 #endif
179 
183  class Reason
184  {
185  public:
186  static const char* duplicate()
187  {
188  return "duplicate";
189  }
190  static const char* badFilter()
191  {
192  return "bad filter";
193  }
194  static const char* badRegexTopic()
195  {
196  return "bad regex topic";
197  }
198  static const char* subscriptionAlreadyExists()
199  {
200  return "subscription already exists";
201  }
202  static const char* nameInUse()
203  {
204  return "name in use";
205  }
206  static const char* authFailure()
207  {
208  return "auth failure";
209  }
210  static const char* notEntitled()
211  {
212  return "not entitled";
213  }
214  static const char* authDisabled()
215  {
216  return "authentication disabled";
217  }
218  static const char* subidInUse()
219  {
220  return "subid in use";
221  }
222  static const char* noTopic()
223  {
224  return "no topic";
225  }
226  };
227 
237  {
238  public:
239  virtual ~ExceptionListener() {;}
240  virtual void exceptionThrown(const std::exception&) const {;}
241  };
242 
244 
245 
246 #define AMPS_CALL_EXCEPTION_WRAPPER(x) \
247  try\
248  {\
249  x;\
250  }\
251  catch (std::exception& ex_)\
252  {\
253  try\
254  {\
255  _exceptionListener->exceptionThrown(ex_);\
256  }\
257  catch(...)\
258  {\
259  ;\
260  }\
261  }
262  /*
263  * Note : we don't attempt to trap non std::exception exceptions
264  * here because doing so interferes with pthread_exit on some OSes.
265  catch (...)\
266  {\
267  try\
268  {\
269  _exceptionListener->exceptionThrown(AMPS::AMPSException(\
270  "An unhandled exception of unknown type was thrown by "\
271  "the registered handler.", AMPS_E_USAGE));\
272  }\
273  catch(...)\
274  {\
275  ;\
276  }\
277  }
278  */
279 #ifdef _WIN32
280 #define AMPS_CALL_EXCEPTION_WRAPPER_2(me,x) \
281  try\
282  {\
283  while(me->_connected)\
284  {\
285  try\
286  {\
287  x;\
288  break;\
289  }\
290  catch(MessageStreamFullException&)\
291  {\
292  me->checkAndSendHeartbeat(false);\
293  }\
294  }\
295  }\
296  catch (std::exception& ex_)\
297  {\
298  try\
299  {\
300  me->_exceptionListener->exceptionThrown(ex_);\
301  }\
302  catch(...)\
303  {\
304  ;\
305  }\
306  }
307  /*
308  * Note : we don't attempt to trap non std::exception exceptions
309  * here because doing so interferes with pthread_exit on some OSes.
310  catch (...)\
311  {\
312  try\
313  {\
314  me->_exceptionListener->exceptionThrown(AMPS::AMPSException(\
315  "An unhandled exception of unknown type was thrown by "\
316  "the registered handler.", AMPS_E_USAGE));\
317  }\
318  catch(...)\
319  {\
320  ;\
321  }\
322  }*/
323 
324 #define AMPS_CALL_EXCEPTION_WRAPPER_STREAM_FULL_2(me, x)\
325  while(me->_connected)\
326  {\
327  try\
328  {\
329  x;\
330  break;\
331  }\
332  catch(MessageStreamFullException&)\
333  {\
334  me->checkAndSendHeartbeat(false);\
335  }\
336  }
337 #else
338 #define AMPS_CALL_EXCEPTION_WRAPPER_2(me,x) \
339  try\
340  {\
341  while(me->_connected)\
342  {\
343  try\
344  {\
345  x;\
346  break;\
347  }\
348  catch(MessageStreamFullException& ex_)\
349  {\
350  me->checkAndSendHeartbeat(false);\
351  }\
352  }\
353  }\
354  catch (std::exception& ex_)\
355  {\
356  try\
357  {\
358  me->_exceptionListener->exceptionThrown(ex_);\
359  }\
360  catch(...)\
361  {\
362  ;\
363  }\
364  }
365  /*
366  * Note : we don't attempt to trap non std::exception exceptions
367  * here because doing so interferes with pthread_exit on some OSes.
368  catch (...)\
369  {\
370  try\
371  {\
372  me->_exceptionListener->exceptionThrown(AMPS::AMPSException(\
373  "An unhandled exception of unknown type was thrown by "\
374  "the registered handler.", AMPS_E_USAGE));\
375  }\
376  catch(...)\
377  {\
378  ;\
379  }\
380  }*/
381 
382 #define AMPS_CALL_EXCEPTION_WRAPPER_STREAM_FULL_2(me, x)\
383  while(me->_connected)\
384  {\
385  try\
386  {\
387  x;\
388  break;\
389  }\
390  catch(MessageStreamFullException& ex_)\
391  {\
392  me->checkAndSendHeartbeat(false);\
393  }\
394  }
395 #endif
396 
397 #define AMPS_UNHANDLED_EXCEPTION(ex) \
398  try\
399  {\
400  _exceptionListener->exceptionThrown(ex);\
401  }\
402  catch(...)\
403  {;}
404 
405 #define AMPS_UNHANDLED_EXCEPTION_2(me,ex) \
406  try\
407  {\
408  me->_exceptionListener->exceptionThrown(ex);\
409  }\
410  catch(...)\
411  {;}
412 
413 
414  class Client;
415 
440 
441  class Command
442  {
443  Message _message;
444  unsigned _timeout;
445  unsigned _batchSize;
446  unsigned _flags;
447  static const unsigned Subscribe = 1;
448  static const unsigned SOW = 2;
449  static const unsigned NeedsSequenceNumber = 4;
450  static const unsigned ProcessedAck = 8;
451  static const unsigned StatsAck = 16;
452  void init(Message::Command::Type command_)
453  {
454  _timeout = 0;
455  _batchSize = 0;
456  _flags = 0;
457  _message.reset();
458  _message.setCommandEnum(command_);
459  _setIds();
460  }
461  void init(const std::string& command_)
462  {
463  _timeout = 0;
464  _batchSize = 0;
465  _flags = 0;
466  _message.reset();
467  _message.setCommand(command_);
468  _setIds();
469  }
470  void init(const char* command_, size_t commandLen_)
471  {
472  _timeout = 0;
473  _batchSize = 0;
474  _flags = 0;
475  _message.reset();
476  _message.setCommand(command_, commandLen_);
477  _setIds();
478  }
479  void _setIds(void)
480  {
481  Message::Command::Type command = _message.getCommandEnum();
482  if (!(command & Message::Command::NoDataCommands))
483  {
484  _message.newCommandId();
485  if (command == Message::Command::Subscribe ||
486  command == Message::Command::SOWAndSubscribe ||
487  command == Message::Command::DeltaSubscribe ||
488  command == Message::Command::SOWAndDeltaSubscribe)
489  {
490  _message.setSubscriptionId(_message.getCommandId());
491  _flags |= Subscribe;
492  }
493  if (command == Message::Command::SOW
494  || command == Message::Command::SOWAndSubscribe
495  || command == Message::Command::SOWAndDeltaSubscribe)
496  {
497  _message.setQueryID(_message.getCommandId());
498  if (_batchSize == 0)
499  {
500  setBatchSize(AMPS_DEFAULT_BATCH_SIZE);
501  }
502  if (command == Message::Command::SOW)
503  {
504  _flags |= SOW;
505  }
506  }
507  _flags |= ProcessedAck;
508  }
509  else if (command == Message::Command::SOWDelete)
510  {
511  _message.newCommandId();
512  _flags |= ProcessedAck;
513  _flags |= NeedsSequenceNumber;
514  }
515  else if (command == Message::Command::Publish
516  || command == Message::Command::DeltaPublish)
517  {
518  _flags |= NeedsSequenceNumber;
519  }
520  else if (command == Message::Command::StopTimer)
521  {
522  _message.newCommandId();
523  }
524  }
525  public:
529  Command(const std::string& command_)
530  {
531  init(command_);
532  }
537  Command(const char* command_, size_t commandLen_)
538  {
539  init(command_, commandLen_);
540  }
544  Command(Message::Command::Type command_)
545  {
546  init(command_);
547  }
548 
552  Command& reset(const std::string& command_)
553  {
554  init(command_);
555  return *this;
556  }
561  Command& reset(const char* command_, size_t commandLen_)
562  {
563  init(command_, commandLen_);
564  return *this;
565  }
569  Command& reset(Message::Command::Type command_)
570  {
571  init(command_);
572  return *this;
573  }
581  Command& setSowKey(const std::string& sowKey_)
582  {
583  _message.setSowKey(sowKey_);
584  return *this;
585  }
594  Command& setSowKey(const char* sowKey_, size_t sowKeyLen_)
595  {
596  _message.setSowKey(sowKey_, sowKeyLen_);
597  return *this;
598  }
611  Command& setSowKeys(const std::string& sowKeys_)
612  {
613  _message.setSowKeys(sowKeys_);
614  return *this;
615  }
629  Command& setSowKeys(const char* sowKeys_, size_t sowKeysLen_)
630  {
631  _message.setSowKeys(sowKeys_, sowKeysLen_);
632  return *this;
633  }
635  Command& setCommandId(const std::string& cmdId_)
636  {
637  _message.setCommandId(cmdId_);
638  return *this;
639  }
642  Command& setCommandId(const char* cmdId_, size_t cmdIdLen_)
643  {
644  _message.setCommandId(cmdId_, cmdIdLen_);
645  return *this;
646  }
648  Command& setTopic(const std::string& topic_)
649  {
650  _message.setTopic(topic_);
651  return *this;
652  }
655  Command& setTopic(const char* topic_, size_t topicLen_)
656  {
657  _message.setTopic(topic_, topicLen_);
658  return *this;
659  }
661  Command& setFilter(const std::string& filter_)
662  {
663  _message.setFilter(filter_);
664  return *this;
665  }
668  Command& setFilter(const char* filter_, size_t filterLen_)
669  {
670  _message.setFilter(filter_, filterLen_);
671  return *this;
672  }
674  Command& setOrderBy(const std::string& orderBy_)
675  {
676  _message.setOrderBy(orderBy_);
677  return *this;
678  }
681  Command& setOrderBy(const char* orderBy_, size_t orderByLen_)
682  {
683  _message.setOrderBy(orderBy_, orderByLen_);
684  return *this;
685  }
687  Command& setSubId(const std::string& subId_)
688  {
689  _message.setSubscriptionId(subId_);
690  return *this;
691  }
694  Command& setSubId(const char* subId_, size_t subIdLen_)
695  {
696  _message.setSubscriptionId(subId_, subIdLen_);
697  return *this;
698  }
700  Command& setQueryId(const std::string& queryId_)
701  {
702  _message.setQueryId(queryId_);
703  return *this;
704  }
707  Command& setQueryId(const char* queryId_, size_t queryIdLen_)
708  {
709  _message.setQueryId(queryId_, queryIdLen_);
710  return *this;
711  }
717  Command& setBookmark(const std::string& bookmark_)
718  {
719  _message.setBookmark(bookmark_);
720  return *this;
721  }
728  Command& setBookmark(const char* bookmark_, size_t bookmarkLen_)
729  {
730  _message.setBookmark(bookmark_, bookmarkLen_);
731  return *this;
732  }
739  Command& setCorrelationId(const std::string& correlationId_)
740  {
741  _message.setCorrelationId(correlationId_);
742  return *this;
743  }
751  Command& setCorrelationId(const char* correlationId_, size_t correlationIdLen_)
752  {
753  _message.setCorrelationId(correlationId_, correlationIdLen_);
754  return *this;
755  }
758  Command& setOptions(const std::string& options_)
759  {
760  _message.setOptions(options_);
761  return *this;
762  }
766  Command& setOptions(const char* options_, size_t optionsLen_)
767  {
768  _message.setOptions(options_, optionsLen_);
769  return *this;
770  }
772  Command& setSequence(const std::string& seq_)
773  {
774  _message.setSequence(seq_);
775  return *this;
776  }
779  Command& setSequence(const char* seq_, size_t seqLen_)
780  {
781  _message.setSequence(seq_, seqLen_);
782  return *this;
783  }
785  Command& setSequence(const amps_uint64_t seq_)
786  {
787  std::ostringstream os;
788  os << seq_;
789  _message.setSequence(os.str());
790  return *this;
791  }
792  amps_uint64_t getSequence() const
793  {
794  return amps_message_get_field_uint64(_message.getMessage(), AMPS_Sequence);
795  }
798  Command& setData(const std::string& data_)
799  {
800  _message.setData(data_);
801  return *this;
802  }
806  Command& setData(const char* data_, size_t dataLen_)
807  {
808  _message.setData(data_, dataLen_);
809  return *this;
810  }
820  Command& setTimeout(unsigned timeout_)
821  {
822  _timeout = timeout_;
823  return *this;
824  }
826  Command& setTopN(unsigned topN_)
827  {
828  _message.setTopNRecordsReturned(topN_);
829  return *this;
830  }
835  Command& setBatchSize(unsigned batchSize_)
836  {
837  _message.setBatchSize(batchSize_);
838  _batchSize = batchSize_;
839  return *this;
840  }
851  Command& setExpiration(unsigned expiration_)
852  {
853  _message.setExpiration(expiration_);
854  return *this;
855  }
857  Command& addAckType(const std::string& ackType_)
858  {
859  _message.setAckType(_message.getAckType() + "," + ackType_);
860  if (ackType_ == "processed")
861  {
862  _flags |= ProcessedAck;
863  }
864  else if (ackType_ == "stats")
865  {
866  _flags |= StatsAck;
867  }
868  return *this;
869  }
871  Command& setAckType(const std::string& ackType_)
872  {
873  _message.setAckType(ackType_);
874  if (ackType_.find("processed") != std::string::npos)
875  {
876  _flags |= ProcessedAck;
877  }
878  else
879  {
880  _flags &= ~ProcessedAck;
881  }
882  if (ackType_.find("stats") != std::string::npos)
883  {
884  _flags |= StatsAck;
885  }
886  else
887  {
888  _flags &= ~StatsAck;
889  }
890  return *this;
891  }
893  Command& setAckType(unsigned ackType_)
894  {
895  _message.setAckTypeEnum(ackType_);
896  if (ackType_ & Message::AckType::Processed)
897  {
898  _flags |= ProcessedAck;
899  }
900  else
901  {
902  _flags &= ~ProcessedAck;
903  }
904  if (ackType_ & Message::AckType::Stats)
905  {
906  _flags |= StatsAck;
907  }
908  else
909  {
910  _flags &= ~StatsAck;
911  }
912  return *this;
913  }
915  std::string getAckType() const
916  {
917  return (std::string)(_message.getAckType());
918  }
920  unsigned getAckTypeEnum() const
921  {
922  return _message.getAckTypeEnum();
923  }
924 
925  Message& getMessage(void)
926  {
927  return _message;
928  }
929  unsigned getTimeout(void) const
930  {
931  return _timeout;
932  }
933  unsigned getBatchSize(void) const
934  {
935  return _batchSize;
936  }
937  bool isSubscribe(void) const
938  {
939  return _flags & Subscribe;
940  }
941  bool isSow(void) const
942  {
943  return (_flags & SOW) != 0;
944  }
945  bool hasProcessedAck(void) const
946  {
947  return (_flags & ProcessedAck) != 0;
948  }
949  bool hasStatsAck(void) const
950  {
951  return (_flags & StatsAck) != 0;
952  }
953  bool needsSequenceNumber(void) const
954  {
955  return (_flags & NeedsSequenceNumber) != 0;
956  }
957  };
958 
961  typedef void(*DisconnectHandlerFunc)(Client&, void* userData);
962 
963  class Message;
965 
969  {
970  public:
971  virtual ~Authenticator() {;}
972 
978  virtual std::string authenticate(const std::string& userName_, const std::string& password_) = 0;
986  virtual std::string retry(const std::string& userName_, const std::string& password_) = 0;
993  virtual void completed(const std::string& userName_, const std::string& password_, const std::string& reason_) = 0;
994  };
995 
1000  {
1001  public:
1002  virtual ~DefaultAuthenticator() {;}
1005  std::string authenticate(const std::string& /*userName_*/, const std::string& password_)
1006  {
1007  return password_;
1008  }
1009 
1012  std::string retry(const std::string& /*userName_*/, const std::string& /*password_*/)
1013  {
1014  throw AuthenticationException("retry not implemented by DefaultAuthenticator.");
1015  }
1016 
1017  void completed(const std::string& /*userName_*/, const std::string& /* password_ */, const std::string& /* reason */) {;}
1018 
1023  {
1024  static DefaultAuthenticator d;
1025  return d;
1026  }
1027  };
1028 
1032  {
1033  public:
1034 
1038  virtual void execute(Message& message_) = 0;
1039 
1040  virtual ~StoreReplayer() {;}
1041  };
1042 
1043  class Store;
1044 
1053  typedef bool (*PublishStoreResizeHandler)(Store store_,
1054  size_t size_,
1055  void* userData_);
1056 
1059  class StoreImpl : public RefBody
1060  {
1061  public:
1067  StoreImpl(bool errorOnPublishGap_ = false)
1068  : _resizeHandler(NULL)
1069  , _resizeHandlerData(NULL)
1070  , _errorOnPublishGap(errorOnPublishGap_)
1071  {;}
1072 
1077  virtual amps_uint64_t store(const Message& message_) = 0;
1078 
1085  virtual void discardUpTo(amps_uint64_t index_) = 0;
1086 
1091  virtual void replay(StoreReplayer& replayer_) = 0;
1092 
1100  virtual bool replaySingle(StoreReplayer& replayer_, amps_uint64_t index_) = 0;
1101 
1106  virtual size_t unpersistedCount() const = 0;
1107 
1108  virtual ~StoreImpl() {;}
1109 
1118  virtual void flush(long timeout_) = 0;
1119 
1122  static inline size_t getUnsetPosition()
1123  {
1124  return AMPS_UNSET_INDEX;
1125  }
1126 
1129  static inline amps_uint64_t getUnsetSequence()
1130  {
1131  return AMPS_UNSET_SEQUENCE;
1132  }
1133 
1137  virtual amps_uint64_t getLowestUnpersisted() const = 0;
1138 
1142  virtual amps_uint64_t getLastPersisted() = 0;
1143 
1153  inline virtual void setResizeHandler(PublishStoreResizeHandler handler_,
1154  void* userData_)
1155  {
1156  _resizeHandler = handler_;
1157  _resizeHandlerData = userData_;
1158  }
1159 
1160  inline virtual PublishStoreResizeHandler getResizeHandler() const
1161  {
1162  return _resizeHandler;
1163  }
1164 
1165  bool callResizeHandler(size_t newSize_);
1166 
1167  inline virtual void setErrorOnPublishGap(bool errorOnPublishGap_)
1168  {
1169  _errorOnPublishGap = errorOnPublishGap_;
1170  }
1171 
1172  inline virtual bool getErrorOnPublishGap() const
1173  {
1174  return _errorOnPublishGap;
1175  }
1176 
1177  private:
1178  PublishStoreResizeHandler _resizeHandler;
1179  void* _resizeHandlerData;
1180  bool _errorOnPublishGap;
1181  };
1182 
1185  class Store
1186  {
1187  RefHandle<StoreImpl> _body;
1188  public:
1189  Store() {;}
1190  Store(StoreImpl* body_) : _body(body_) {;}
1191  Store(const Store& rhs) : _body(rhs._body) {;}
1192  Store& operator=(const Store& rhs)
1193  {
1194  _body = rhs._body;
1195  return *this;
1196  }
1197 
1201  amps_uint64_t store(const Message& message_)
1202  {
1203  return _body.get().store(message_);
1204  }
1205 
1212  void discardUpTo(amps_uint64_t index_)
1213  {
1214  _body.get().discardUpTo(index_);
1215  }
1216 
1221  void replay(StoreReplayer& replayer_)
1222  {
1223  _body.get().replay(replayer_);
1224  }
1225 
1233  bool replaySingle(StoreReplayer& replayer_, amps_uint64_t index_)
1234  {
1235  return _body.get().replaySingle(replayer_, index_);
1236  }
1237 
1242  size_t unpersistedCount() const
1243  {
1244  return _body.get().unpersistedCount();
1245  }
1246 
1250  bool isValid() const
1251  {
1252  return _body.isValid();
1253  }
1254 
1263  void flush(long timeout_ = 0)
1264  {
1265  return _body.get().flush(timeout_);
1266  }
1267 
1271  amps_uint64_t getLowestUnpersisted()
1272  {
1273  return _body.get().getLowestUnpersisted();
1274  }
1275 
1279  amps_uint64_t getLastPersisted()
1280  {
1281  return _body.get().getLastPersisted();
1282  }
1283 
1293  void setResizeHandler(PublishStoreResizeHandler handler_,
1294  void* userData_)
1295  {
1296  _body.get().setResizeHandler(handler_, userData_);
1297  }
1298 
1299  PublishStoreResizeHandler getResizeHandler() const
1300  {
1301  return _body.get().getResizeHandler();
1302  }
1303 
1308  inline void setErrorOnPublishGap(bool errorOnPublishGap_)
1309  {
1310  _body.get().setErrorOnPublishGap(errorOnPublishGap_);
1311  }
1312 
1317  inline bool getErrorOnPublishGap() const
1318  {
1319  return _body.get().getErrorOnPublishGap();
1320  }
1321 
1325  StoreImpl* get()
1326  {
1327  if (_body.isValid())
1328  {
1329  return &_body.get();
1330  }
1331  else
1332  {
1333  return NULL;
1334  }
1335  }
1336 
1337  };
1338 
1344  {
1345  public:
1346  virtual ~FailedWriteHandler() {;}
1353  virtual void failedWrite(const Message& message_,
1354  const char* reason_, size_t reasonLength_) = 0;
1355  };
1356 
1357 
1358  inline bool StoreImpl::callResizeHandler(size_t newSize_)
1359  {
1360  if (_resizeHandler)
1361  {
1362  return _resizeHandler(Store(this), newSize_, _resizeHandlerData);
1363  }
1364  return true;
1365  }
1366 
1373  inline bool DangerousFlushPublishStoreResizeHandler(Store store_, size_t /*size_*/,
1374  void* data_)
1375  {
1376  long* timeoutp = (long*)data_;
1377  size_t count = store_.unpersistedCount();
1378  if (count == 0)
1379  {
1380  return false;
1381  }
1382  try
1383  {
1384  store_.flush(*timeoutp);
1385  }
1386 #ifdef _WIN32
1387  catch (const TimedOutException&)
1388 #else
1389  catch (const TimedOutException& e)
1390 #endif
1391  {
1392  return true;
1393  }
1394  return (count == store_.unpersistedCount());
1395  }
1396 
1402 {
1403 public:
1415  virtual bool failure(const Message& message_, const MessageHandler& handler_,
1416  unsigned requestedAckTypes_,
1417  const AMPSException& exception_) = 0;
1418 };
1419 
1424  {
1425  public:
1426  virtual ~SubscriptionManager() {;}
1434  virtual void subscribe(MessageHandler messageHandler_, const Message& message_,
1435  unsigned requestedAckTypes_) = 0;
1439  virtual void unsubscribe(const Message::Field& subId_) = 0;
1442  virtual void clear() = 0;
1446  virtual void resubscribe(Client& client_) = 0;
1451  virtual void setFailedResubscribeHandler(std::shared_ptr<FailedResubscribeHandler> handler_)
1452  {
1453  _failedResubscribeHandler = handler_;
1454  }
1455  protected:
1456  std::shared_ptr<FailedResubscribeHandler> _failedResubscribeHandler;
1457  };
1458 
1462 
1464  {
1465  public:
1467  typedef enum { Disconnected = 0,
1468  Shutdown = 1,
1469  Connected = 2,
1470  LoggedOn = 4,
1471  PublishReplayed = 8,
1472  HeartbeatInitiated = 16,
1473  Resubscribed = 32,
1474  UNKNOWN = 16384
1475  } State;
1476 
1486  virtual void connectionStateChanged(State newState_) = 0;
1487  virtual ~ConnectionStateListener() {;};
1488  };
1489 
1490 
1491  class MessageStreamImpl;
1492  class MessageStream;
1493 
1494  typedef void(*DeferredExecutionFunc)(void*);
1495 
1496  class ClientImpl : public RefBody // -V553
1497  {
1498  // Class to wrap turning of Nagle for things like flush and logon
1499  class NoDelay
1500  {
1501  private:
1502  AMPS_SOCKET _socket;
1503  int _noDelay;
1504  char* _valuePtr;
1505 #ifdef _WIN32
1506  int _valueLen;
1507 #else
1508  socklen_t _valueLen;
1509 #endif
1510  public:
1511  NoDelay(amps_handle client_)
1512  : _socket(AMPS_INVALID_SOCKET), _noDelay(0), _valueLen(sizeof(int))
1513  {
1514  _valuePtr = (char*)&_noDelay;
1515  _socket = amps_client_get_socket(client_);
1516  if (_socket != AMPS_INVALID_SOCKET)
1517  {
1518  getsockopt(_socket, IPPROTO_TCP, TCP_NODELAY, _valuePtr, &_valueLen);
1519  if (!_noDelay)
1520  {
1521  _noDelay = 1;
1522  setsockopt(_socket, IPPROTO_TCP, TCP_NODELAY, _valuePtr, _valueLen);
1523  }
1524  else
1525  {
1526  _socket = AMPS_INVALID_SOCKET;
1527  }
1528  }
1529  }
1530 
1531  ~NoDelay()
1532  {
1533  if (_socket != AMPS_INVALID_SOCKET)
1534  {
1535  _noDelay = 0;
1536  setsockopt(_socket, IPPROTO_TCP, TCP_NODELAY, _valuePtr, _valueLen);
1537  }
1538  }
1539  };
1540 
1541  friend class Client;
1542  protected:
1543  amps_handle _client;
1544  DisconnectHandler _disconnectHandler;
1545  enum GlobalCommandTypeHandlers : size_t
1546  {
1547  Publish = 0,
1548  SOW = 1,
1549  GroupBegin = 2,
1550  GroupEnd = 3,
1551  Heartbeat = 4,
1552  OOF = 5,
1553  Ack = 6,
1554  LastChance = 7,
1555  DuplicateMessage = 8,
1556  COUNT = 9
1557  };
1558  std::vector<MessageHandler> _globalCommandTypeHandlers;
1559  Message _message, _readMessage, _publishMessage, _deltaMessage, _beatMessage;
1560  MessageRouter _routes;
1561  MessageRouter::RouteCache _routeCache;
1562  mutable Mutex _lock;
1563  std::string _name, _nameHash, _lastUri, _logonCorrelationData;
1564  amps_uint64_t _nameHashValue;
1565  BookmarkStore _bookmarkStore;
1566  Store _publishStore;
1567  bool _isRetryOnDisconnect;
1568  amps_unique_ptr<FailedWriteHandler> _failedWriteHandler;
1569 #if __cplusplus >= 201100L || _MSC_VER >= 1900
1570  std::atomic<amps_uint64_t> _lastSentHaSequenceNumber;
1571 #else
1572  volatile amps_uint64_t _lastSentHaSequenceNumber;
1573 #endif
1574  AMPS_ATOMIC_TYPE_8 _logonInProgress;
1575  AMPS_ATOMIC_TYPE_8 _badTimeToHASubscribe;
1576  VersionInfo _serverVersion;
1577  Timer _heartbeatTimer;
1578  amps_unique_ptr<MessageStream> _pEmptyMessageStream;
1579 
1580  // queue data
1581  int _queueAckTimeout;
1582  bool _isAutoAckEnabled;
1583  unsigned _ackBatchSize;
1584  unsigned _queuedAckCount;
1585  unsigned _defaultMaxDepth;
1586  struct QueueBookmarks
1587  {
1588  QueueBookmarks(const std::string& topic_)
1589  : _topic(topic_)
1590  , _oldestTime(0)
1591  , _bookmarkCount(0)
1592  {;}
1593  std::string _topic;
1594  std::string _data;
1595  amps_uint64_t _oldestTime;
1596  unsigned _bookmarkCount;
1597  };
1598  typedef amps_uint64_t topic_hash;
1599  typedef std::map<topic_hash, QueueBookmarks> TopicHashMap;
1600  TopicHashMap _topicHashMap;
1601 
1602  class ClientStoreReplayer : public StoreReplayer
1603  {
1604  ClientImpl* _client;
1605  public:
1606  unsigned _version;
1607  amps_result _res;
1608 
1609  ClientStoreReplayer()
1610  : _client(NULL), _version(0), _res(AMPS_E_OK)
1611  {}
1612 
1613  ClientStoreReplayer(ClientImpl* client_)
1614  : _client(client_), _version(0), _res(AMPS_E_OK)
1615  {}
1616 
1617  void setClient(ClientImpl* client_)
1618  {
1619  _client = client_;
1620  }
1621 
1622  void execute(Message& message_)
1623  {
1624  if (!_client)
1625  {
1626  throw CommandException("Can't replay without a client.");
1627  }
1628  amps_uint64_t index = amps_message_get_field_uint64(message_.getMessage(),
1629  AMPS_Sequence);
1630  if (index > _client->_lastSentHaSequenceNumber)
1631  {
1632  _client->_lastSentHaSequenceNumber = index;
1633  }
1634 
1635  _res = AMPS_E_OK;
1636  // Don't replay a queue cancel message after a reconnect.
1637  // Currently, the only messages that will have anything in options
1638  // are cancel messages.
1639  if (!message_.getCommand().empty() &&
1640  (!_client->_logonInProgress ||
1641  message_.getOptions().len() < 6))
1642  {
1643  _res = amps_client_send_with_version(_client->_client,
1644  message_.getMessage(),
1645  &_version);
1646  if (_res != AMPS_E_OK)
1647  {
1648  throw DisconnectedException("AMPS Server disconnected during replay");
1649  }
1650  }
1651  }
1652 
1653  };
1654  ClientStoreReplayer _replayer;
1655 
1656  class FailedWriteStoreReplayer : public StoreReplayer
1657  {
1658  ClientImpl* _parent;
1659  const char* _reason;
1660  size_t _reasonLength;
1661  size_t _replayCount;
1662  public:
1663  FailedWriteStoreReplayer(ClientImpl* parent, const char* reason_, size_t reasonLength_)
1664  : _parent(parent),
1665  _reason(reason_),
1666  _reasonLength(reasonLength_),
1667  _replayCount(0)
1668  {;}
1669  void execute(Message& message_)
1670  {
1671  if (_parent->_failedWriteHandler)
1672  {
1673  ++_replayCount;
1674  _parent->_failedWriteHandler->failedWrite(message_,
1675  _reason, _reasonLength);
1676  }
1677  }
1678  size_t replayCount(void) const
1679  {
1680  return _replayCount;
1681  }
1682  };
1683 
1684  struct AckResponseImpl : public RefBody
1685  {
1686  std::string username, password, reason, status, bookmark, options;
1687  amps_uint64_t sequenceNo;
1688  amps_uint64_t nameHashValue;
1689  VersionInfo serverVersion;
1690 #if __cplusplus >= 201100L || _MSC_VER >= 1900
1691  std::atomic<bool> responded;
1692  std::atomic<bool> abandoned;
1693 #else
1694  volatile bool responded;
1695  volatile bool abandoned;
1696 #endif
1697  unsigned connectionVersion;
1698  AckResponseImpl() :
1699  RefBody(),
1700  sequenceNo((amps_uint64_t)0),
1701  serverVersion(),
1702  responded(false),
1703  abandoned(false),
1704  connectionVersion(0)
1705  {
1706  }
1707  };
1708 
1709  class AckResponse
1710  {
1711  RefHandle<AckResponseImpl> _body;
1712  public:
1713  AckResponse() : _body(NULL) {;}
1714  AckResponse(const AckResponse& rhs) : _body(rhs._body) {;}
1715  static AckResponse create()
1716  {
1717  AckResponse r;
1718  r._body = new AckResponseImpl();
1719  return r;
1720  }
1721 
1722  const std::string& username()
1723  {
1724  return _body.get().username;
1725  }
1726  void setUsername(const char* data_, size_t len_)
1727  {
1728  if (data_)
1729  {
1730  _body.get().username.assign(data_, len_);
1731  }
1732  else
1733  {
1734  _body.get().username.clear();
1735  }
1736  }
1737  const std::string& password()
1738  {
1739  return _body.get().password;
1740  }
1741  void setPassword(const char* data_, size_t len_)
1742  {
1743  if (data_)
1744  {
1745  _body.get().password.assign(data_, len_);
1746  }
1747  else
1748  {
1749  _body.get().password.clear();
1750  }
1751  }
1752  const std::string& reason()
1753  {
1754  return _body.get().reason;
1755  }
1756  void setReason(const char* data_, size_t len_)
1757  {
1758  if (data_)
1759  {
1760  _body.get().reason.assign(data_, len_);
1761  }
1762  else
1763  {
1764  _body.get().reason.clear();
1765  }
1766  }
1767  const std::string& status()
1768  {
1769  return _body.get().status;
1770  }
1771  void setStatus(const char* data_, size_t len_)
1772  {
1773  if (data_)
1774  {
1775  _body.get().status.assign(data_, len_);
1776  }
1777  else
1778  {
1779  _body.get().status.clear();
1780  }
1781  }
1782  const std::string& bookmark()
1783  {
1784  return _body.get().bookmark;
1785  }
1786  void setBookmark(const Field& bookmark_)
1787  {
1788  if (!bookmark_.empty())
1789  {
1790  _body.get().bookmark.assign(bookmark_.data(), bookmark_.len());
1791  Field::parseBookmark(bookmark_, _body.get().nameHashValue,
1792  _body.get().sequenceNo);
1793  }
1794  else
1795  {
1796  _body.get().bookmark.clear();
1797  _body.get().sequenceNo = (amps_uint64_t)0;
1798  _body.get().nameHashValue = (amps_uint64_t)0;
1799  }
1800  }
1801  amps_uint64_t sequenceNo() const
1802  {
1803  return _body.get().sequenceNo;
1804  }
1805  amps_uint64_t nameHashValue() const
1806  {
1807  return _body.get().nameHashValue;
1808  }
1809  void setSequenceNo(const char* data_, size_t len_)
1810  {
1811  amps_uint64_t result = (amps_uint64_t)0;
1812  if (data_)
1813  {
1814  for (size_t i = 0; i < len_; ++i)
1815  {
1816  result *= (amps_uint64_t)10;
1817  result += (amps_uint64_t)(data_[i] - '0');
1818  }
1819  }
1820  _body.get().sequenceNo = result;
1821  }
1822  VersionInfo serverVersion() const
1823  {
1824  return _body.get().serverVersion;
1825  }
1826  void setServerVersion(const char* data_, size_t len_)
1827  {
1828  if (data_)
1829  {
1830  _body.get().serverVersion.setVersion(std::string(data_, len_));
1831  }
1832  }
1833  bool responded()
1834  {
1835  return _body.get().responded;
1836  }
1837  void setResponded()
1838  {
1839  _body.get().responded = true;
1840  }
1841  bool abandoned()
1842  {
1843  return _body.get().abandoned;
1844  }
1845  void setAbandoned()
1846  {
1847  if (_body.isValid())
1848  {
1849  _body.get().abandoned = true;
1850  }
1851  }
1852 
1853  void setConnectionVersion(unsigned connectionVersion)
1854  {
1855  _body.get().connectionVersion = connectionVersion;
1856  }
1857 
1858  unsigned getConnectionVersion()
1859  {
1860  return _body.get().connectionVersion;
1861  }
1862  void setOptions(const char* data_, size_t len_)
1863  {
1864  if (data_)
1865  {
1866  _body.get().options.assign(data_, len_);
1867  }
1868  else
1869  {
1870  _body.get().options.clear();
1871  }
1872  }
1873 
1874  const std::string& options()
1875  {
1876  return _body.get().options;
1877  }
1878 
1879  AckResponse& operator=(const AckResponse& rhs)
1880  {
1881  _body = rhs._body;
1882  return *this;
1883  }
1884  };
1885 
1886 
1887  typedef std::map<std::string, AckResponse> AckMap;
1888  AckMap _ackMap;
1889  Mutex _ackMapLock;
1890  DefaultExceptionListener _defaultExceptionListener;
1891  protected:
1892 
1893  struct DeferredExecutionRequest
1894  {
1895  DeferredExecutionRequest(DeferredExecutionFunc func_,
1896  void* userData_)
1897  : _func(func_),
1898  _userData(userData_)
1899  {;}
1900 
1901  DeferredExecutionFunc _func;
1902  void* _userData;
1903  };
1904  const ExceptionListener* _exceptionListener;
1905  std::shared_ptr<const ExceptionListener> _pExceptionListener;
1906  amps_unique_ptr<SubscriptionManager> _subscriptionManager;
1907  volatile bool _connected;
1908  std::string _username;
1909  typedef std::set<ConnectionStateListener*> ConnectionStateListeners;
1910  ConnectionStateListeners _connectionStateListeners;
1911  typedef std::vector<DeferredExecutionRequest> DeferredExecutionList;
1912  Mutex _deferredExecutionLock;
1913  DeferredExecutionList _deferredExecutionList;
1914  unsigned _heartbeatInterval;
1915  unsigned _readTimeout;
1916 
1917  void broadcastConnectionStateChanged(ConnectionStateListener::State newState_)
1918  {
1919  // If we disconnected before we got to notification, don't notify.
1920  // This should only be able to happen for Resubscribed, since the lock
1921  // is released to let the subscription manager run resubscribe so a
1922  // disconnect could be called before the change is broadcast.
1923  if (!_connected && newState_ > ConnectionStateListener::Connected)
1924  {
1925  return;
1926  }
1927  for (ConnectionStateListeners::iterator it = _connectionStateListeners.begin(); it != _connectionStateListeners.end(); ++it)
1928  {
1929  AMPS_CALL_EXCEPTION_WRAPPER(
1930  (*it)->connectionStateChanged(newState_));
1931  }
1932  }
1933  unsigned processedAck(Message& message);
1934  unsigned persistedAck(Message& meesage);
1935  void lastChance(Message& message);
1936  void checkAndSendHeartbeat(bool force = false);
1937  virtual ConnectionInfo getConnectionInfo() const;
1938  static amps_result
1939  ClientImplMessageHandler(amps_handle message, void* userData);
1940  static void
1941  ClientImplPreDisconnectHandler(amps_handle client, unsigned failedConnectionVersion, void* userData);
1942  static amps_result
1943  ClientImplDisconnectHandler(amps_handle client, void* userData);
1944 
1945  void unsubscribeInternal(const std::string& id)
1946  {
1947  if (id.empty())
1948  {
1949  return;
1950  }
1951  // remove the handler first to avoid any more message delivery
1952  Message::Field subId;
1953  subId.assign(id.data(), id.length());
1954  _routes.removeRoute(subId);
1955  // Lock is already acquired
1956  if (_subscriptionManager)
1957  {
1958  // Have to unlock before calling into sub manager to avoid deadlock
1959  Unlock<Mutex> unlock(_lock);
1960  _subscriptionManager->unsubscribe(subId);
1961  }
1962  _message.reset();
1963  _message.setCommandEnum(Message::Command::Unsubscribe);
1964  _message.newCommandId();
1965  _message.setSubscriptionId(id);
1966  _sendWithoutRetry(_message);
1967  deferredExecution(&amps_noOpFn, NULL);
1968  }
1969 
1970  AckResponse syncAckProcessing(long timeout_, Message& message_,
1971  bool isHASubscribe_)
1972  {
1973  return syncAckProcessing(timeout_, message_,
1974  (amps_uint64_t)0, isHASubscribe_);
1975  }
1976 
1977  AckResponse syncAckProcessing(long timeout_, Message& message_,
1978  amps_uint64_t haSeq = (amps_uint64_t)0,
1979  bool isHASubscribe_ = false)
1980  {
1981  // inv: we already have _lock locked up.
1982  AckResponse ack = AckResponse::create();
1983  if (1)
1984  {
1985  Lock<Mutex> guard(_ackMapLock);
1986  _ackMap[message_.getCommandId()] = ack;
1987  }
1988  ack.setConnectionVersion((unsigned)_send(message_, haSeq, isHASubscribe_));
1989  if (ack.getConnectionVersion() == 0)
1990  {
1991  // Send failed
1992  throw DisconnectedException("Connection closed while waiting for response.");
1993  }
1994  bool timedOut = false;
1995  AMPS_START_TIMER(timeout_)
1996  while (!timedOut && !ack.responded() && !ack.abandoned() && _connected)
1997  {
1998  if (timeout_)
1999  {
2000  timedOut = !_lock.wait(timeout_);
2001  // May have woken up early, check real time
2002  if (timedOut)
2003  {
2004  AMPS_RESET_TIMER(timedOut, timeout_);
2005  }
2006  }
2007  else
2008  {
2009  // Using a timeout version to ensure python can interrupt
2010  _lock.wait(1000);
2011  Unlock<Mutex> unlck(_lock);
2012  amps_invoke_waiting_function();
2013  }
2014  }
2015  if (ack.responded())
2016  {
2017  if (ack.status() != "failure")
2018  {
2019  if (message_.getCommand() == "logon")
2020  {
2021  amps_uint64_t ackSequence = ack.sequenceNo();
2022  if (_lastSentHaSequenceNumber < ackSequence)
2023  {
2024  _lastSentHaSequenceNumber = ackSequence;
2025  }
2026  if (_publishStore.isValid())
2027  {
2028  // If this throws, logon will fail and eitehr be
2029  // handled in HAClient/ServerChooser or by the caller
2030  // of logon.
2031  _publishStore.discardUpTo(ackSequence);
2032  if (_lastSentHaSequenceNumber < _publishStore.getLastPersisted())
2033  {
2034  _lastSentHaSequenceNumber = _publishStore.getLastPersisted();
2035  }
2036  }
2037  _nameHash = ack.bookmark().substr(0, ack.bookmark().find('|'));
2038  _nameHashValue = ack.nameHashValue();
2039  _serverVersion = ack.serverVersion();
2040  if (_bookmarkStore.isValid())
2041  {
2042  _bookmarkStore.setServerVersion(_serverVersion);
2043  }
2044  }
2045  if (_ackBatchSize)
2046  {
2047  const std::string& options = ack.options();
2048  size_t index = options.find_first_of("max_backlog=");
2049  if (index != std::string::npos)
2050  {
2051  unsigned data = 0;
2052  const char* c = options.c_str() + index + 12;
2053  while (*c && *c != ',')
2054  {
2055  data = (data * 10) + (unsigned)(*c++ -48);
2056  }
2057  if (_ackBatchSize > data)
2058  {
2059  _ackBatchSize = data;
2060  }
2061  }
2062  }
2063  return ack;
2064  }
2065  const size_t NotEntitled = 12;
2066  std::string ackReason = ack.reason();
2067  if (ackReason.length() == 0)
2068  {
2069  return ack; // none
2070  }
2071  if (ackReason.length() == NotEntitled &&
2072  ackReason[0] == 'n' &&
2073  message_.getUserId().len() == 0)
2074  {
2075  message_.assignUserId(_username);
2076  }
2077  message_.throwFor(_client, ackReason);
2078  }
2079  else // !ack.responded()
2080  {
2081  if (!ack.abandoned())
2082  {
2083  throw TimedOutException("timed out waiting for operation.");
2084  }
2085  else
2086  {
2087  throw DisconnectedException("Connection closed while waiting for response.");
2088  }
2089  }
2090  return ack;
2091  }
2092 
2093  void _cleanup(void)
2094  {
2095  if (!_client)
2096  {
2097  return;
2098  }
2099  amps_client_set_predisconnect_handler(_client, NULL, 0L);
2100  amps_client_set_disconnect_handler(_client, NULL, 0L);
2101  AMPS_CALL_EXCEPTION_WRAPPER(ClientImpl::disconnect());
2102  _pEmptyMessageStream.reset(NULL);
2103  amps_client_destroy(_client);
2104  _client = NULL;
2105  }
2106 
2107  public:
2108 
2109  ClientImpl(const std::string& clientName)
2110  : _client(NULL), _name(clientName)
2111  , _isRetryOnDisconnect(true)
2112  , _lastSentHaSequenceNumber((amps_uint64_t)0), _logonInProgress(0)
2113  , _badTimeToHASubscribe(0), _serverVersion()
2114  , _queueAckTimeout(AMPS_DEFAULT_QUEUE_ACK_TIMEOUT)
2115  , _isAutoAckEnabled(false)
2116  , _ackBatchSize(0)
2117  , _queuedAckCount(0)
2118  , _defaultMaxDepth(0)
2119  , _connected(false)
2120  , _heartbeatInterval(0)
2121  , _readTimeout(0)
2122  {
2123  _replayer.setClient(this);
2124  _client = amps_client_create(clientName.c_str());
2126  (amps_handler)ClientImpl::ClientImplMessageHandler,
2127  this);
2129  (amps_predisconnect_handler)ClientImpl::ClientImplPreDisconnectHandler,
2130  this);
2132  (amps_handler)ClientImpl::ClientImplDisconnectHandler,
2133  this);
2134  _exceptionListener = &_defaultExceptionListener;
2135  for (size_t i = 0; i < GlobalCommandTypeHandlers::COUNT; ++i)
2136  {
2137 #ifdef AMPS_USE_EMPLACE
2138  _globalCommandTypeHandlers.emplace_back(MessageHandler());
2139 #else
2140  _globalCommandTypeHandlers.push_back(MessageHandler());
2141 #endif
2142  }
2143  }
2144 
2145  virtual ~ClientImpl()
2146  {
2147  _cleanup();
2148  }
2149 
2150  const std::string& getName() const
2151  {
2152  return _name;
2153  }
2154 
2155  const std::string& getNameHash() const
2156  {
2157  return _nameHash;
2158  }
2159 
2160  const amps_uint64_t getNameHashValue() const
2161  {
2162  return _nameHashValue;
2163  }
2164 
2165  void setName(const std::string& name)
2166  {
2167  // This operation will fail if the client's
2168  // name is already set.
2169  amps_result result = amps_client_set_name(_client, name.c_str());
2170  if (result != AMPS_E_OK)
2171  {
2172  AMPSException::throwFor(_client, result);
2173  }
2174  _name = name;
2175  }
2176 
2177  const std::string& getLogonCorrelationData() const
2178  {
2179  return _logonCorrelationData;
2180  }
2181 
2182  void setLogonCorrelationData(const std::string& logonCorrelationData_)
2183  {
2184  _logonCorrelationData = logonCorrelationData_;
2185  }
2186 
2187  size_t getServerVersion() const
2188  {
2189  return _serverVersion.getOldStyleVersion();
2190  }
2191 
2192  VersionInfo getServerVersionInfo() const
2193  {
2194  return _serverVersion;
2195  }
2196 
2197  const std::string& getURI() const
2198  {
2199  return _lastUri;
2200  }
2201 
2202  virtual void connect(const std::string& uri)
2203  {
2204  Lock<Mutex> l(_lock);
2205  _connect(uri);
2206  }
2207 
2208  virtual void _connect(const std::string& uri)
2209  {
2210  _lastUri = uri;
2211  amps_result result = amps_client_connect(_client, uri.c_str());
2212  if (result != AMPS_E_OK)
2213  {
2214  AMPSException::throwFor(_client, result);
2215  }
2216  _message.reset();
2217  _deltaMessage.setCommandEnum(Message::Command::DeltaPublish);
2218  _publishMessage.setCommandEnum(Message::Command::Publish);
2219  _beatMessage.setCommandEnum(Message::Command::Heartbeat);
2220  _beatMessage.setOptions("beat");
2221  _readMessage.setClientImpl(this);
2222  if (_queueAckTimeout)
2223  {
2224  result = amps_client_set_idle_time(_client, _queueAckTimeout);
2225  if (result != AMPS_E_OK)
2226  {
2227  AMPSException::throwFor(_client, result);
2228  }
2229  }
2230  _connected = true;
2231  broadcastConnectionStateChanged(ConnectionStateListener::Connected);
2232  }
2233 
2234  void setDisconnected()
2235  {
2236  {
2237  Lock<Mutex> l(_lock);
2238  if (_connected)
2239  {
2240  AMPS_CALL_EXCEPTION_WRAPPER(broadcastConnectionStateChanged(ConnectionStateListener::Disconnected));
2241  }
2242  _connected = false;
2243  _heartbeatTimer.setTimeout(0.0);
2244  }
2245  clearAcks(INT_MAX);
2246  amps_client_disconnect(_client);
2247  _routes.clear();
2248  }
2249 
2250  virtual void disconnect()
2251  {
2252  AMPS_CALL_EXCEPTION_WRAPPER(flushAcks());
2253  setDisconnected();
2254  AMPS_CALL_EXCEPTION_WRAPPER(processDeferredExecutions());
2255  Lock<Mutex> l(_lock);
2256  broadcastConnectionStateChanged(ConnectionStateListener::Shutdown);
2257  }
2258 
2259  void clearAcks(unsigned failedVersion)
2260  {
2261  // Have to lock to prevent race conditions
2262  Lock<Mutex> guard(_ackMapLock);
2263  {
2264  // Go ahead and signal any waiters if they are around...
2265  std::vector<std::string> worklist;
2266  for (AckMap::iterator i = _ackMap.begin(), e = _ackMap.end(); i != e; ++i)
2267  {
2268  if (i->second.getConnectionVersion() <= failedVersion)
2269  {
2270  i->second.setAbandoned();
2271  worklist.push_back(i->first);
2272  }
2273  }
2274 
2275  for (std::vector<std::string>::iterator j = worklist.begin(), e = worklist.end(); j != e; ++j)
2276  {
2277  _ackMap.erase(*j);
2278  }
2279  }
2280 
2281  _lock.signalAll();
2282  }
2283 
2284  int send(const Message& message)
2285  {
2286  Lock<Mutex> l(_lock);
2287  return _send(message);
2288  }
2289 
2290  void sendWithoutRetry(const Message& message_)
2291  {
2292  Lock<Mutex> l(_lock);
2293  // If we got here while logon was in progress, then we tried to send
2294  // while we were disconnected so throw DisconnectedException
2295  if (_logonInProgress)
2296  {
2297  throw DisconnectedException("The client has been disconnected.");
2298  }
2299  _sendWithoutRetry(message_);
2300  }
2301 
2302  void _sendWithoutRetry(const Message& message_)
2303  {
2304  amps_result result = amps_client_send(_client, message_.getMessage());
2305  if (result != AMPS_E_OK)
2306  {
2307  AMPSException::throwFor(_client, result);
2308  }
2309  }
2310 
2311  int _send(const Message& message, amps_uint64_t haSeq = (amps_uint64_t)0,
2312  bool isHASubscribe_ = false)
2313  {
2314  // Lock is already acquired
2315  amps_result result = AMPS_E_RETRY;
2316 
2317  // Create a local reference to this message, as we'll need to hold on
2318  // to a reference to it in case reconnect occurs.
2319  Message localMessage = message;
2320  unsigned version = 0;
2321 
2322  while (result == AMPS_E_RETRY)
2323  {
2324  if (haSeq && _logonInProgress)
2325  {
2326  // If retrySend is disabled, do not wait for the reconnect
2327  // to finish, just throw.
2328  if (!_isRetryOnDisconnect)
2329  {
2330  AMPSException::throwFor(_client, AMPS_E_RETRY);
2331  }
2332  if (!_lock.wait(1000))
2333  {
2334  amps_invoke_waiting_function();
2335  }
2336  }
2337  else
2338  {
2339  if ((haSeq && haSeq <= _lastSentHaSequenceNumber) ||
2340  (isHASubscribe_ && _badTimeToHASubscribe))
2341  {
2342  return (int)version;
2343  }
2344  // It's possible to get here out of order, but this way we'll
2345  // always send in order.
2346  if (haSeq > _lastSentHaSequenceNumber)
2347  {
2348  while (haSeq > _lastSentHaSequenceNumber + 1)
2349  {
2350  try
2351  {
2352  // Replayer updates _lastSentHaSsequenceNumber
2353  if (!_publishStore.replaySingle(_replayer,
2354  _lastSentHaSequenceNumber + 1))
2355  {
2356  //++_lastSentHaSequenceNumber;
2357  continue;
2358  }
2359  result = AMPS_E_OK;
2360  version = _replayer._version;
2361  }
2362 #ifdef _WIN32
2363  catch (const DisconnectedException&)
2364 #else
2365  catch (const DisconnectedException& e)
2366 #endif
2367  {
2368  result = _replayer._res;
2369  break;
2370  }
2371  }
2372  result = amps_client_send_with_version(_client,
2373  localMessage.getMessage(),
2374  &version);
2375  ++_lastSentHaSequenceNumber;
2376  }
2377  else
2378  {
2379  if (_logonInProgress && localMessage.getCommand().data()[0] != 'l')
2380  {
2381  while (_logonInProgress)
2382  {
2383  if (!_lock.wait(1000))
2384  {
2385  amps_invoke_waiting_function();
2386  }
2387  }
2388  }
2389  result = amps_client_send_with_version(_client,
2390  localMessage.getMessage(),
2391  &version);
2392  }
2393  if (result != AMPS_E_OK)
2394  {
2395  if (!isHASubscribe_ && !haSeq &&
2396  localMessage.getMessage() == message.getMessage())
2397  {
2398  localMessage = message.deepCopy();
2399  }
2400  if (_isRetryOnDisconnect)
2401  {
2402  Unlock<Mutex> u(_lock);
2403  result = amps_client_attempt_reconnect(_client, version);
2404  // If this is an HA publish or subscribe command, it was
2405  // stored first and will have already been replayed by the
2406  // store or sub manager after reconnect, so just return.
2407  if ((isHASubscribe_ || haSeq) &&
2408  result == AMPS_E_RETRY)
2409  {
2410  return (int)version;
2411  }
2412  }
2413  else
2414  {
2415  // retrySend is disabled so throw the error
2416  // from the send as an exception, do not retry.
2417  AMPSException::throwFor(_client, result);
2418  }
2419  }
2420  }
2421  if (result == AMPS_E_RETRY)
2422  {
2423  amps_invoke_waiting_function();
2424  }
2425  }
2426 
2427  if (result != AMPS_E_OK)
2428  {
2429  AMPSException::throwFor(_client, result);
2430  }
2431  return (int)version;
2432  }
2433 
2434  void addMessageHandler(const Field& commandId_,
2435  const AMPS::MessageHandler& messageHandler_,
2436  unsigned requestedAcks_, bool isSubscribe_)
2437  {
2438  Lock<Mutex> lock(_lock);
2439  _routes.addRoute(commandId_, messageHandler_, requestedAcks_,
2440  0, isSubscribe_);
2441  }
2442 
2443  bool removeMessageHandler(const Field& commandId_)
2444  {
2445  Lock<Mutex> lock(_lock);
2446  return _routes.removeRoute(commandId_);
2447  }
2448 
2449  std::string send(const MessageHandler& messageHandler_, Message& message_, int timeout_ = 0)
2450  {
2451  Field id = message_.getCommandId();
2452  Field subId = message_.getSubscriptionId();
2453  Field qid = message_.getQueryId();
2454  bool isSubscribe = false;
2455  bool isSubscribeOnly = false;
2456  bool replace = false;
2457  unsigned requestedAcks = message_.getAckTypeEnum();
2458  unsigned systemAddedAcks = Message::AckType::None;
2459 
2460  switch (message_.getCommandEnum())
2461  {
2462  case Message::Command::Subscribe:
2463  case Message::Command::DeltaSubscribe:
2464  replace = message_.getOptions().operator std::string().find(AMPS_OPTIONS_REPLACE, 0, strlen(AMPS_OPTIONS_REPLACE) - 1) != std::string::npos;
2465  isSubscribeOnly = true;
2466  // fall through
2467  case Message::Command::SOWAndSubscribe:
2468  case Message::Command::SOWAndDeltaSubscribe:
2469  if (id.empty())
2470  {
2471  id = message_.newCommandId().getCommandId();
2472  }
2473  else
2474  {
2475  while (!replace && id != subId && _routes.hasRoute(id))
2476  {
2477  id = message_.newCommandId().getCommandId();
2478  }
2479  }
2480  if (subId.empty())
2481  {
2482  message_.setSubscriptionId(id);
2483  subId = id;
2484  }
2485  isSubscribe = true;
2486  if (!message_.getBookmark().empty() && _bookmarkStore.isValid())
2487  {
2488  systemAddedAcks |= Message::AckType::Persisted;
2489  }
2490  // fall through
2491  case Message::Command::SOW:
2492  if (id.empty())
2493  {
2494  id = message_.newCommandId().getCommandId();
2495  }
2496  else
2497  {
2498  while (!replace && id != subId && _routes.hasRoute(id))
2499  {
2500  message_.newCommandId();
2501  if (qid == id)
2502  {
2503  qid = message_.getCommandId();
2504  message_.setQueryId(qid);
2505  }
2506  id = message_.getCommandId();
2507  }
2508  }
2509  if (!isSubscribeOnly)
2510  {
2511  if (qid.empty())
2512  {
2513  message_.setQueryID(id);
2514  qid = id;
2515  }
2516  else
2517  {
2518  while (!replace && qid != subId && qid != id
2519  && _routes.hasRoute(qid))
2520  {
2521  qid = message_.newQueryId().getQueryId();
2522  }
2523  }
2524  }
2525  systemAddedAcks |= Message::AckType::Processed;
2526  // for SOW only, we get a completed ack so we know when to remove the handler.
2527  if (!isSubscribeOnly)
2528  {
2529  systemAddedAcks |= Message::AckType::Completed;
2530  }
2531  message_.setAckTypeEnum(requestedAcks | systemAddedAcks);
2532  {
2533  int routesAdded = 0;
2534  Lock<Mutex> l(_lock);
2535  if (!subId.empty() && messageHandler_.isValid())
2536  {
2537  if (!_routes.hasRoute(subId))
2538  {
2539  ++routesAdded;
2540  }
2541  // This can replace a non-subscribe with a matching id
2542  // with a subscription but not another subscription.
2543  _routes.addRoute(subId, messageHandler_, requestedAcks,
2544  systemAddedAcks, isSubscribe);
2545  }
2546  if (!isSubscribeOnly && !qid.empty()
2547  && messageHandler_.isValid() && qid != subId)
2548  {
2549  if (routesAdded == 0)
2550  {
2551  _routes.addRoute(qid, messageHandler_,
2552  requestedAcks, systemAddedAcks, false);
2553  }
2554  else
2555  {
2556  void* data = NULL;
2557  {
2558  Unlock<Mutex> u(_lock);
2559  data = amps_invoke_copy_route_function(
2560  messageHandler_.userData());
2561  }
2562  if (!data)
2563  {
2564  _routes.addRoute(qid, messageHandler_, requestedAcks,
2565  systemAddedAcks, false);
2566  }
2567  else
2568  {
2569  _routes.addRoute(qid,
2570  MessageHandler(messageHandler_.function(),
2571  data),
2572  requestedAcks, systemAddedAcks, false);
2573  }
2574  }
2575  ++routesAdded;
2576  }
2577  if (!id.empty() && messageHandler_.isValid()
2578  && requestedAcks & ~Message::AckType::Persisted
2579  && id != subId && id != qid)
2580  {
2581  if (routesAdded == 0)
2582  {
2583  _routes.addRoute(id, messageHandler_, requestedAcks,
2584  systemAddedAcks, false);
2585  }
2586  else
2587  {
2588  void* data = NULL;
2589  {
2590  Unlock<Mutex> u(_lock);
2591  data = amps_invoke_copy_route_function(
2592  messageHandler_.userData());
2593  }
2594  if (!data)
2595  {
2596  _routes.addRoute(id, messageHandler_, requestedAcks,
2597  systemAddedAcks, false);
2598  }
2599  else
2600  {
2601  _routes.addRoute(id,
2602  MessageHandler(messageHandler_.function(),
2603  data),
2604  requestedAcks,
2605  systemAddedAcks, false);
2606  }
2607  }
2608  ++routesAdded;
2609  }
2610  try
2611  {
2612  // We aren't adding to subscription manager, so this isn't
2613  // an HA subscribe.
2614  syncAckProcessing(timeout_, message_, 0, false);
2615  message_.setAckTypeEnum(requestedAcks);
2616  }
2617  catch (...)
2618  {
2619  _routes.removeRoute(message_.getQueryID());
2620  _routes.removeRoute(message_.getSubscriptionId());
2621  _routes.removeRoute(id);
2622  message_.setAckTypeEnum(requestedAcks);
2623  throw;
2624  }
2625  }
2626  break;
2627  // These are valid commands that are used as-is
2628  case Message::Command::Unsubscribe:
2629  case Message::Command::Heartbeat:
2630  case Message::Command::Logon:
2631  case Message::Command::StartTimer:
2632  case Message::Command::StopTimer:
2633  case Message::Command::SOWDelete:
2634  {
2635  Lock<Mutex> l(_lock);
2636  // if an ack is requested, it'll need a command ID.
2637  if (message_.getAckTypeEnum() != Message::AckType::None)
2638  {
2639  if (id.empty())
2640  {
2641  message_.newCommandId();
2642  id = message_.getCommandId();
2643  }
2644  if (messageHandler_.isValid())
2645  {
2646  _routes.addRoute(id, messageHandler_, requestedAcks,
2647  Message::AckType::None, false);
2648  }
2649  }
2650  _send(message_);
2651  }
2652  break;
2653  case Message::Command::DeltaPublish:
2654  case Message::Command::Publish:
2655  {
2656  bool useSync = message_.getFilter().len() > 0;
2657  Lock<Mutex> l(_lock);
2658  // if an ack is requested, it'll need a command ID.
2659  unsigned ackType = message_.getAckTypeEnum();
2660  if (ackType != Message::AckType::None
2661  || useSync)
2662  {
2663  if (id.empty())
2664  {
2665  message_.newCommandId();
2666  id = message_.getCommandId();
2667  }
2668  if (messageHandler_.isValid())
2669  {
2670  _routes.addRoute(id, messageHandler_, requestedAcks,
2671  Message::AckType::None, false);
2672  }
2673  }
2674  if (useSync)
2675  {
2676  message_.setAckTypeEnum(ackType | Message::AckType::Processed);
2677  syncAckProcessing(timeout_, message_, 0, false);
2678  }
2679  else
2680  {
2681  _send(message_);
2682  }
2683  }
2684  break;
2685  // These are things that shouldn't be sent (not meaningful)
2686  case Message::Command::GroupBegin:
2687  case Message::Command::GroupEnd:
2688  case Message::Command::OOF:
2689  case Message::Command::Ack:
2690  case Message::Command::Unknown:
2691  default:
2692  throw CommandException("Command type " + message_.getCommand() + " can not be sent directly to AMPS");
2693  }
2694  message_.setAckTypeEnum(requestedAcks);
2695  return id;
2696  }
2697 
2698  void setDisconnectHandler(const DisconnectHandler& disconnectHandler)
2699  {
2700  Lock<Mutex> l(_lock);
2701  _disconnectHandler = disconnectHandler;
2702  }
2703 
2704  void setGlobalCommandTypeMessageHandler(const std::string& command_, const MessageHandler& handler_)
2705  {
2706  switch (command_[0])
2707  {
2708 #if 0 // Not currently implemented to avoid an extra branch in delivery
2709  case 'p':
2710  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::Publish] = handler_;
2711  break;
2712  case 's':
2713  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::SOW] = handler_;
2714  break;
2715 #endif
2716  case 'h':
2717  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::Heartbeat] = handler_;
2718  break;
2719 #if 0 // Not currently implemented to avoid an extra branch in delivery
2720  case 'g':
2721  if (command_[6] == 'b')
2722  {
2723  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::GroupBegin] = handler_;
2724  }
2725  else if (command_[6] == 'e')
2726  {
2727  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::GroupEnd] = handler_;
2728  }
2729  else
2730  {
2731  std::ostringstream os;
2732  os << "Invalid command '" << command_ << "' passed to setGlobalCommandTypeHandler";
2733  throw CommandException(os.str());
2734  }
2735  break;
2736  case 'o':
2737  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::OOF] = handler_;
2738  break;
2739 #endif
2740  case 'a':
2741  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::Ack] = handler_;
2742  break;
2743  case 'l':
2744  case 'L':
2745  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::LastChance] = handler_;
2746  break;
2747  case 'd':
2748  case 'D':
2749  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::DuplicateMessage] = handler_;
2750  break;
2751  default:
2752  std::ostringstream os;
2753  os << "Invalid command '" << command_ << "' passed to setGlobalCommandTypeHandler";
2754  throw CommandException(os.str());
2755  break;
2756  }
2757  }
2758 
2759  void setGlobalCommandTypeMessageHandler(const Message::Command::Type command_, const MessageHandler& handler_)
2760  {
2761  switch (command_)
2762  {
2763 #if 0 // Not currently implemented to avoid an extra branch in delivery
2764  case Message::Command::Publish:
2765  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::Publish] = handler_;
2766  break;
2767  case Message::Command::SOW:
2768  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::SOW] = handler_;
2769  break;
2770 #endif
2771  case Message::Command::Heartbeat:
2772  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::Heartbeat] = handler_;
2773  break;
2774 #if 0 // Not currently implemented to avoid an extra branch in delivery
2775  case Message::Command::GroupBegin:
2776  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::GroupBegin] = handler_;
2777  break;
2778  case Message::Command::GroupEnd:
2779  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::GroupEnd] = handler_;
2780  break;
2781  case Message::Command::OOF:
2782  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::OOF] = handler_;
2783  break;
2784 #endif
2785  case Message::Command::Ack:
2786  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::Ack] = handler_;
2787  break;
2788  default:
2789  unsigned bits = 0;
2790  unsigned command = command_;
2791  while (command > 0)
2792  {
2793  ++bits;
2794  command >>= 1;
2795  }
2796  char errBuf[128];
2797  AMPS_snprintf(errBuf, sizeof(errBuf),
2798  "Invalid command '%.*s' passed to setGlobalCommandTypeHandler",
2799  CommandConstants<0>::Lengths[bits],
2800  CommandConstants<0>::Values[bits]);
2801  throw CommandException(errBuf);
2802  break;
2803  }
2804  }
2805 
2806  void setGlobalCommandTypeMessageHandler(const GlobalCommandTypeHandlers handlerType_, const MessageHandler& handler_)
2807  {
2808  _globalCommandTypeHandlers[handlerType_] = handler_;
2809  }
2810 
2811  void setFailedWriteHandler(FailedWriteHandler* handler_)
2812  {
2813  Lock<Mutex> l(_lock);
2814  _failedWriteHandler.reset(handler_);
2815  }
2816 
2817  void setPublishStore(const Store& publishStore_)
2818  {
2819  Lock<Mutex> l(_lock);
2820  if (_connected)
2821  {
2822  throw AlreadyConnectedException("Setting a publish store on a connected client is undefined behavior");
2823  }
2824  _publishStore = publishStore_;
2825  }
2826 
2827  void setBookmarkStore(const BookmarkStore& bookmarkStore_)
2828  {
2829  Lock<Mutex> l(_lock);
2830  if (_connected)
2831  {
2832  throw AlreadyConnectedException("Setting a bookmark store on a connected client is undefined behavior");
2833  }
2834  _bookmarkStore = bookmarkStore_;
2835  }
2836 
2837  void setSubscriptionManager(SubscriptionManager* subscriptionManager_)
2838  {
2839  Lock<Mutex> l(_lock);
2840  _subscriptionManager.reset(subscriptionManager_);
2841  }
2842 
2843  SubscriptionManager* getSubscriptionManager() const
2844  {
2845  return const_cast<SubscriptionManager*>(_subscriptionManager.get());
2846  }
2847 
2848  DisconnectHandler getDisconnectHandler() const
2849  {
2850  return _disconnectHandler;
2851  }
2852 
2853  MessageHandler getDuplicateMessageHandler() const
2854  {
2855  return _globalCommandTypeHandlers[GlobalCommandTypeHandlers::DuplicateMessage];
2856  }
2857 
2858  FailedWriteHandler* getFailedWriteHandler() const
2859  {
2860  return const_cast<FailedWriteHandler*>(_failedWriteHandler.get());
2861  }
2862 
2863  Store getPublishStore() const
2864  {
2865  return _publishStore;
2866  }
2867 
2868  BookmarkStore getBookmarkStore() const
2869  {
2870  return _bookmarkStore;
2871  }
2872 
2873  amps_uint64_t publish(const char* topic_, size_t topicLen_, const char* data_, size_t dataLen_)
2874  {
2875  if (!_publishStore.isValid())
2876  {
2877  Lock<Mutex> l(_lock);
2878  _publishMessage.assignTopic(topic_, topicLen_);
2879  _publishMessage.assignData(data_, dataLen_);
2880  _send(_publishMessage);
2881  return 0;
2882  }
2883  else
2884  {
2885  if (!publishStoreMessage)
2886  {
2887  publishStoreMessage = new Message();
2888  PerThreadMessageTracker::addMessageToCleanupList(publishStoreMessage);
2889  }
2890  publishStoreMessage->reset();
2891  publishStoreMessage->setCommandEnum(Message::Command::Publish);
2892  return _publish(topic_, topicLen_, data_, dataLen_);
2893  }
2894  }
2895 
2896  amps_uint64_t publish(const char* topic_, size_t topicLen_, const char* data_,
2897  size_t dataLen_, unsigned long expiration_)
2898  {
2899  if (!_publishStore.isValid())
2900  {
2901  Lock<Mutex> l(_lock);
2902  _publishMessage.assignTopic(topic_, topicLen_);
2903  _publishMessage.assignData(data_, dataLen_);
2904  char exprBuf[AMPS_NUMBER_BUFFER_LEN];
2905  size_t pos = convertToCharArray(exprBuf, expiration_);
2906  _publishMessage.assignExpiration(exprBuf + pos, AMPS_NUMBER_BUFFER_LEN - pos);
2907  _send(_publishMessage);
2908  _publishMessage.assignExpiration(NULL, 0);
2909  return 0;
2910  }
2911  else
2912  {
2913  if (!publishStoreMessage)
2914  {
2915  publishStoreMessage = new Message();
2916  PerThreadMessageTracker::addMessageToCleanupList(publishStoreMessage);
2917  }
2918  publishStoreMessage->reset();
2919  char exprBuf[AMPS_NUMBER_BUFFER_LEN];
2920  size_t exprPos = convertToCharArray(exprBuf, expiration_);
2921  publishStoreMessage->setCommandEnum(Message::Command::Publish)
2922  .assignExpiration(exprBuf + exprPos,
2923  AMPS_NUMBER_BUFFER_LEN - exprPos);
2924  return _publish(topic_, topicLen_, data_, dataLen_);
2925  }
2926  }
2927 
2928  class FlushAckHandler : ConnectionStateListener
2929  {
2930  private:
2931  ClientImpl* _pClient;
2932  Field _cmdId;
2933 #if __cplusplus >= 201100L || _MSC_VER >= 1900
2934  std::atomic<bool> _acked;
2935  std::atomic<bool> _disconnected;
2936 #else
2937  volatile bool _acked;
2938  volatile bool _disconnected;
2939 #endif
2940  public:
2941  FlushAckHandler(ClientImpl* pClient_)
2942  : _pClient(pClient_), _cmdId(), _acked(false), _disconnected(false)
2943  {
2944  pClient_->addConnectionStateListener(this);
2945  }
2946  ~FlushAckHandler()
2947  {
2948  _pClient->removeConnectionStateListener(this);
2949  _pClient->removeMessageHandler(_cmdId);
2950  _cmdId.clear();
2951  }
2952  void setCommandId(const Field& cmdId_)
2953  {
2954  _cmdId.deepCopy(cmdId_);
2955  }
2956  void invoke(const Message&)
2957  {
2958  _acked = true;
2959  }
2960  void connectionStateChanged(State state_)
2961  {
2962  if (state_ <= Shutdown)
2963  {
2964  _disconnected = true;
2965  }
2966  }
2967  bool acked()
2968  {
2969  return _acked;
2970  }
2971  bool done()
2972  {
2973  return _acked || _disconnected;
2974  }
2975  };
2976 
2977  void publishFlush(long timeout_, unsigned ackType_)
2978  {
2979  static const char* processed = "processed";
2980  static const size_t processedLen = strlen(processed);
2981  static const char* persisted = "persisted";
2982  static const size_t persistedLen = strlen(persisted);
2983  static const char* flush = "flush";
2984  static const size_t flushLen = strlen(flush);
2985  static VersionInfo minPersisted("5.3.3.0");
2986  static VersionInfo minFlush("4");
2987  if (ackType_ != Message::AckType::Processed
2988  && ackType_ != Message::AckType::Persisted)
2989  {
2990  throw CommandException("Flush can only be used with processed or persisted acks.");
2991  }
2992  FlushAckHandler flushHandler(this);
2993  if (_serverVersion >= minFlush)
2994  {
2995  Lock<Mutex> l(_lock);
2996  if (!_connected)
2997  {
2998  throw DisconnectedException("Not connected trying to flush");
2999  }
3000  _message.reset();
3001  _message.newCommandId();
3002  _message.assignCommand(flush, flushLen);
3003  if (_serverVersion < minPersisted
3004  || ackType_ == Message::AckType::Processed)
3005  {
3006  _message.assignAckType(processed, processedLen);
3007  }
3008  else
3009  {
3010  _message.assignAckType(persisted, persistedLen);
3011  }
3012  flushHandler.setCommandId(_message.getCommandId());
3013  addMessageHandler(_message.getCommandId(),
3014  std::bind(&FlushAckHandler::invoke,
3015  std::ref(flushHandler),
3016  std::placeholders::_1),
3017  ackType_, false);
3018  NoDelay noDelay(_client);
3019  if (_send(_message) == -1)
3020  {
3021  throw DisconnectedException("Disconnected trying to flush");
3022  }
3023  }
3024  if (_publishStore.isValid())
3025  {
3026  try
3027  {
3028  _publishStore.flush(timeout_);
3029  }
3030  catch (const AMPSException& ex)
3031  {
3032  AMPS_UNHANDLED_EXCEPTION(ex);
3033  throw;
3034  }
3035  }
3036  else if (_serverVersion < minFlush)
3037  {
3038  if (timeout_ > 0)
3039  {
3040  AMPS_USLEEP(timeout_ * 1000);
3041  }
3042  else
3043  {
3044  AMPS_USLEEP(1000 * 1000);
3045  }
3046  return;
3047  }
3048  if (timeout_)
3049  {
3050  Timer timer((double)timeout_);
3051  timer.start();
3052  while (!timer.check() && !flushHandler.done())
3053  {
3054  AMPS_USLEEP(10000);
3055  amps_invoke_waiting_function();
3056  }
3057  }
3058  else
3059  {
3060  while (!flushHandler.done())
3061  {
3062  AMPS_USLEEP(10000);
3063  amps_invoke_waiting_function();
3064  }
3065  }
3066  // No response or disconnect in timeout interval
3067  if (!flushHandler.done())
3068  {
3069  throw TimedOutException("Timed out waiting for flush");
3070  }
3071  // We got disconnected and there is no publish store
3072  if (!flushHandler.acked() && !_publishStore.isValid())
3073  {
3074  throw DisconnectedException("Disconnected waiting for flush");
3075  }
3076  }
3077 
3078  amps_uint64_t deltaPublish(const char* topic_, size_t topicLength_,
3079  const char* data_, size_t dataLength_)
3080  {
3081  if (!_publishStore.isValid())
3082  {
3083  Lock<Mutex> l(_lock);
3084  _deltaMessage.assignTopic(topic_, topicLength_);
3085  _deltaMessage.assignData(data_, dataLength_);
3086  _send(_deltaMessage);
3087  return 0;
3088  }
3089  else
3090  {
3091  if (!publishStoreMessage)
3092  {
3093  publishStoreMessage = new Message();
3094  PerThreadMessageTracker::addMessageToCleanupList(publishStoreMessage);
3095  }
3096  publishStoreMessage->reset();
3097  publishStoreMessage->setCommandEnum(Message::Command::DeltaPublish);
3098  return _publish(topic_, topicLength_, data_, dataLength_);
3099  }
3100  }
3101 
3102  amps_uint64_t deltaPublish(const char* topic_, size_t topicLength_,
3103  const char* data_, size_t dataLength_,
3104  unsigned long expiration_)
3105  {
3106  if (!_publishStore.isValid())
3107  {
3108  Lock<Mutex> l(_lock);
3109  _deltaMessage.assignTopic(topic_, topicLength_);
3110  _deltaMessage.assignData(data_, dataLength_);
3111  char exprBuf[AMPS_NUMBER_BUFFER_LEN];
3112  size_t pos = convertToCharArray(exprBuf, expiration_);
3113  _deltaMessage.assignExpiration(exprBuf + pos, AMPS_NUMBER_BUFFER_LEN - pos);
3114  _send(_deltaMessage);
3115  _deltaMessage.assignExpiration(NULL, 0);
3116  return 0;
3117  }
3118  else
3119  {
3120  if (!publishStoreMessage)
3121  {
3122  publishStoreMessage = new Message();
3123  PerThreadMessageTracker::addMessageToCleanupList(publishStoreMessage);
3124  }
3125  publishStoreMessage->reset();
3126  char exprBuf[AMPS_NUMBER_BUFFER_LEN];
3127  size_t exprPos = convertToCharArray(exprBuf, expiration_);
3128  publishStoreMessage->setCommandEnum(Message::Command::DeltaPublish)
3129  .assignExpiration(exprBuf + exprPos,
3130  AMPS_NUMBER_BUFFER_LEN - exprPos);
3131  return _publish(topic_, topicLength_, data_, dataLength_);
3132  }
3133  }
3134 
3135  amps_uint64_t _publish(const char* topic_, size_t topicLength_,
3136  const char* data_, size_t dataLength_)
3137  {
3138  publishStoreMessage->assignTopic(topic_, topicLength_)
3139  .setAckTypeEnum(Message::AckType::Persisted)
3140  .assignData(data_, dataLength_);
3141  amps_uint64_t haSequenceNumber = _publishStore.store(*publishStoreMessage);
3142  char buf[AMPS_NUMBER_BUFFER_LEN];
3143  size_t pos = convertToCharArray(buf, haSequenceNumber);
3144  publishStoreMessage->assignSequence(buf + pos, AMPS_NUMBER_BUFFER_LEN - pos);
3145  {
3146  Lock<Mutex> l(_lock);
3147  _send(*publishStoreMessage, haSequenceNumber);
3148  }
3149  return haSequenceNumber;
3150  }
3151 
3152  virtual std::string logon(long timeout_, Authenticator& authenticator_,
3153  const char* options_ = NULL)
3154  {
3155  Lock<Mutex> l(_lock);
3156  return _logon(timeout_, authenticator_, options_);
3157  }
3158 
3159  virtual std::string _logon(long timeout_, Authenticator& authenticator_,
3160  const char* options_ = NULL)
3161  {
3162  _message.reset();
3163  _message.newCommandId();
3164  std::string newCommandId = _message.getCommandId();
3165  _message.setCommandEnum(Message::Command::Logon);
3166  _message.setClientName(_name);
3167 #ifdef AMPS_CLIENT_VERSION_WITH_LANGUAGE
3168  _message.assignVersion(AMPS_CLIENT_VERSION_WITH_LANGUAGE,
3169  strlen(AMPS_CLIENT_VERSION_WITH_LANGUAGE));
3170 #endif
3171  URI uri(_lastUri);
3172  if (uri.user().size())
3173  {
3174  _message.setUserId(uri.user());
3175  }
3176  if (uri.password().size())
3177  {
3178  _message.setPassword(uri.password());
3179  }
3180  if (uri.protocol() == "amps" && uri.messageType().size())
3181  {
3182  _message.setMessageType(uri.messageType());
3183  }
3184  if (uri.isTrue("pretty"))
3185  {
3186  _message.setOptions("pretty");
3187  }
3188 
3189  _message.setPassword(authenticator_.authenticate(_message.getUserId(), _message.getPassword()));
3190  if (!_logonCorrelationData.empty())
3191  {
3192  _message.assignCorrelationId(_logonCorrelationData);
3193  }
3194  if (options_)
3195  {
3196  _message.setOptions(options_);
3197  }
3198  _username = _message.getUserId();
3199  try
3200  {
3201  AtomicFlagFlip pubFlip(&_logonInProgress);
3202  NoDelay noDelay(_client);
3203  while (true)
3204  {
3205  _message.setAckTypeEnum(Message::AckType::Processed);
3206  AckResponse ack = syncAckProcessing(timeout_, _message);
3207  if (ack.status() == "retry")
3208  {
3209  _message.setPassword(authenticator_.retry(ack.username(), ack.password()));
3210  _username = ack.username();
3211  _message.setUserId(_username);
3212  }
3213  else
3214  {
3215  authenticator_.completed(ack.username(), ack.password(), ack.reason());
3216  break;
3217  }
3218  }
3219  broadcastConnectionStateChanged(ConnectionStateListener::LoggedOn);
3220 
3221  // Now re-send the heartbeat command if configured
3222  _sendHeartbeat();
3223  // Signal any threads waiting for _logonInProgress
3224  _lock.signalAll();
3225  }
3226  catch (const AMPSException& ex)
3227  {
3228  _lock.signalAll();
3229  AMPS_UNHANDLED_EXCEPTION(ex);
3230  throw;
3231  }
3232  catch (...)
3233  {
3234  _lock.signalAll();
3235  throw;
3236  }
3237 
3238  if (_publishStore.isValid())
3239  {
3240  try
3241  {
3242  _publishStore.replay(_replayer);
3243  broadcastConnectionStateChanged(ConnectionStateListener::PublishReplayed);
3244  }
3245  catch (const PublishStoreGapException& ex)
3246  {
3247  _lock.signalAll();
3248  AMPS_UNHANDLED_EXCEPTION(ex);
3249  throw;
3250  }
3251  catch (const StoreException& ex)
3252  {
3253  _lock.signalAll();
3254  std::ostringstream os;
3255  os << "A local store exception occurred while logging on."
3256  << ex.toString();
3257  throw ConnectionException(os.str());
3258  }
3259  catch (const AMPSException& ex)
3260  {
3261  _lock.signalAll();
3262  AMPS_UNHANDLED_EXCEPTION(ex);
3263  throw;
3264  }
3265  catch (const std::exception& ex)
3266  {
3267  _lock.signalAll();
3268  AMPS_UNHANDLED_EXCEPTION(ex);
3269  throw;
3270  }
3271  catch (...)
3272  {
3273  _lock.signalAll();
3274  throw;
3275  }
3276  }
3277  _lock.signalAll();
3278  return newCommandId;
3279  }
3280 
3281  std::string subscribe(const MessageHandler& messageHandler_,
3282  const std::string& topic_,
3283  long timeout_,
3284  const std::string& filter_,
3285  const std::string& bookmark_,
3286  const std::string& options_,
3287  const std::string& subId_,
3288  bool isHASubscribe_ = true)
3289  {
3290  isHASubscribe_ &= (bool)_subscriptionManager;
3291  Lock<Mutex> l(_lock);
3292  _message.reset();
3293  _message.setCommandEnum(Message::Command::Subscribe);
3294  _message.newCommandId();
3295  std::string subId(subId_);
3296  if (subId.empty())
3297  {
3298  if (options_.find(AMPS_OPTIONS_REPLACE, 0, strlen(AMPS_OPTIONS_REPLACE) - 1) != std::string::npos)
3299  {
3300  throw ConnectionException("Cannot issue a replacement subscription; a valid subscription id is required.");
3301  }
3302 
3303  subId = _message.getCommandId();
3304  }
3305  _message.setSubscriptionId(subId);
3306  // we need to deep copy this before sending the message; while we are
3307  // waiting for a response, the fields in _message may get blown away for
3308  // other operations.
3309  AMPS::Message::Field subIdField(subId);
3310  unsigned ackTypes = Message::AckType::Processed;
3311 
3312  if (!bookmark_.empty() && _bookmarkStore.isValid())
3313  {
3314  ackTypes |= Message::AckType::Persisted;
3315  }
3316  _message.setTopic(topic_);
3317 
3318  if (filter_.length())
3319  {
3320  _message.setFilter(filter_);
3321  }
3322  if (bookmark_.length())
3323  {
3324  if (bookmark_ == AMPS_BOOKMARK_RECENT)
3325  {
3326  Message::Field mostRecent = _bookmarkStore.getMostRecent(subIdField);
3327  _message.setBookmark(mostRecent);
3328  }
3329  else
3330  {
3331  _message.setBookmark(bookmark_);
3332  if (_bookmarkStore.isValid())
3333  {
3334  if (bookmark_ != AMPS_BOOKMARK_NOW &&
3335  bookmark_ != AMPS_BOOKMARK_EPOCH)
3336  {
3337  _bookmarkStore.log(_message);
3338  _bookmarkStore.discard(_message);
3339  _bookmarkStore.persisted(subIdField, _message.getBookmark());
3340  }
3341  }
3342  }
3343  }
3344  if (options_.length())
3345  {
3346  _message.setOptions(options_);
3347  }
3348 
3349  Message message = _message;
3350  if (isHASubscribe_)
3351  {
3352  message = _message.deepCopy();
3353  Unlock<Mutex> u(_lock);
3354  _subscriptionManager->subscribe(messageHandler_, message,
3355  Message::AckType::None);
3356  if (_badTimeToHASubscribe)
3357  {
3358  return subId;
3359  }
3360  }
3361  if (!_routes.hasRoute(_message.getSubscriptionId()))
3362  {
3363  _routes.addRoute(_message.getSubscriptionId(), messageHandler_,
3364  Message::AckType::None, ackTypes, true);
3365  }
3366  message.setAckTypeEnum(ackTypes);
3367  if (!options_.empty())
3368  {
3369  message.setOptions(options_);
3370  }
3371  try
3372  {
3373  syncAckProcessing(timeout_, message, isHASubscribe_);
3374  }
3375  catch (const DisconnectedException&)
3376  {
3377  if (!isHASubscribe_)
3378  {
3379  _routes.removeRoute(subIdField);
3380  throw;
3381  }
3382  else
3383  {
3384  AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(subIdField));
3385  throw;
3386  }
3387  }
3388  catch (const TimedOutException&)
3389  {
3390  AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(subIdField));
3391  throw;
3392  }
3393  catch (...)
3394  {
3395  if (isHASubscribe_)
3396  {
3397  // Have to unlock before calling into sub manager to avoid deadlock
3398  Unlock<Mutex> unlock(_lock);
3399  _subscriptionManager->unsubscribe(subIdField);
3400  }
3401  _routes.removeRoute(subIdField);
3402  throw;
3403  }
3404 
3405  return subId;
3406  }
3407  std::string deltaSubscribe(const MessageHandler& messageHandler_,
3408  const std::string& topic_,
3409  long timeout_,
3410  const std::string& filter_,
3411  const std::string& bookmark_,
3412  const std::string& options_,
3413  const std::string& subId_ = "",
3414  bool isHASubscribe_ = true)
3415  {
3416  isHASubscribe_ &= (bool)_subscriptionManager;
3417  Lock<Mutex> l(_lock);
3418  _message.reset();
3419  _message.setCommandEnum(Message::Command::DeltaSubscribe);
3420  _message.newCommandId();
3421  std::string subId(subId_);
3422  if (subId.empty())
3423  {
3424  subId = _message.getCommandId();
3425  }
3426  _message.setSubscriptionId(subId);
3427  // we need to deep copy this before sending the message; while we are
3428  // waiting for a response, the fields in _message may get blown away for
3429  // other operations.
3430  AMPS::Message::Field subIdField(subId);
3431  unsigned ackTypes = Message::AckType::Processed;
3432 
3433  if (!bookmark_.empty() && _bookmarkStore.isValid())
3434  {
3435  ackTypes |= Message::AckType::Persisted;
3436  }
3437  _message.setTopic(topic_);
3438  if (filter_.length())
3439  {
3440  _message.setFilter(filter_);
3441  }
3442  if (bookmark_.length())
3443  {
3444  if (bookmark_ == AMPS_BOOKMARK_RECENT)
3445  {
3446  Message::Field mostRecent = _bookmarkStore.getMostRecent(subIdField);
3447  _message.setBookmark(mostRecent);
3448  }
3449  else
3450  {
3451  _message.setBookmark(bookmark_);
3452  if (_bookmarkStore.isValid())
3453  {
3454  if (bookmark_ != AMPS_BOOKMARK_NOW &&
3455  bookmark_ != AMPS_BOOKMARK_EPOCH)
3456  {
3457  _bookmarkStore.log(_message);
3458  _bookmarkStore.discard(_message);
3459  _bookmarkStore.persisted(subIdField, _message.getBookmark());
3460  }
3461  }
3462  }
3463  }
3464  if (options_.length())
3465  {
3466  _message.setOptions(options_);
3467  }
3468  Message message = _message;
3469  if (isHASubscribe_)
3470  {
3471  message = _message.deepCopy();
3472  Unlock<Mutex> u(_lock);
3473  _subscriptionManager->subscribe(messageHandler_, message,
3474  Message::AckType::None);
3475  if (_badTimeToHASubscribe)
3476  {
3477  return subId;
3478  }
3479  }
3480  if (!_routes.hasRoute(_message.getSubscriptionId()))
3481  {
3482  _routes.addRoute(_message.getSubscriptionId(), messageHandler_,
3483  Message::AckType::None, ackTypes, true);
3484  }
3485  message.setAckTypeEnum(ackTypes);
3486  if (!options_.empty())
3487  {
3488  message.setOptions(options_);
3489  }
3490  try
3491  {
3492  syncAckProcessing(timeout_, message, isHASubscribe_);
3493  }
3494  catch (const DisconnectedException&)
3495  {
3496  if (!isHASubscribe_)
3497  {
3498  _routes.removeRoute(subIdField);
3499  throw;
3500  }
3501  }
3502  catch (const TimedOutException&)
3503  {
3504  AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(subIdField));
3505  throw;
3506  }
3507  catch (...)
3508  {
3509  if (isHASubscribe_)
3510  {
3511  // Have to unlock before calling into sub manager to avoid deadlock
3512  Unlock<Mutex> unlock(_lock);
3513  _subscriptionManager->unsubscribe(subIdField);
3514  }
3515  _routes.removeRoute(subIdField);
3516  throw;
3517  }
3518  return subId;
3519  }
3520 
3521  void unsubscribe(const std::string& id)
3522  {
3523  Lock<Mutex> l(_lock);
3524  unsubscribeInternal(id);
3525  }
3526 
3527  void unsubscribe(void)
3528  {
3529  if (_subscriptionManager)
3530  {
3531  _subscriptionManager->clear();
3532  }
3533  {
3534  _routes.unsubscribeAll();
3535  Lock<Mutex> l(_lock);
3536  _message.reset();
3537  _message.setCommandEnum(Message::Command::Unsubscribe);
3538  _message.newCommandId();
3539  _message.setSubscriptionId("all");
3540  _sendWithoutRetry(_message);
3541  }
3542  deferredExecution(&amps_noOpFn, NULL);
3543  }
3544 
3545  std::string sow(const MessageHandler& messageHandler_,
3546  const std::string& topic_,
3547  const std::string& filter_ = "",
3548  const std::string& orderBy_ = "",
3549  const std::string& bookmark_ = "",
3550  int batchSize_ = AMPS_DEFAULT_BATCH_SIZE,
3551  int topN_ = AMPS_DEFAULT_TOP_N,
3552  const std::string& options_ = "",
3553  long timeout_ = AMPS_DEFAULT_COMMAND_TIMEOUT)
3554  {
3555  Lock<Mutex> l(_lock);
3556  _message.reset();
3557  _message.setCommandEnum(Message::Command::SOW);
3558  _message.newCommandId();
3559  // need to keep our own copy of the command ID.
3560  std::string commandId = _message.getCommandId();
3561  _message.setQueryID(_message.getCommandId());
3562  unsigned ackTypes = Message::AckType::Processed | Message::AckType::Completed;
3563  _message.setAckTypeEnum(ackTypes);
3564  _message.setTopic(topic_);
3565  if (filter_.length())
3566  {
3567  _message.setFilter(filter_);
3568  }
3569  if (orderBy_.length())
3570  {
3571  _message.setOrderBy(orderBy_);
3572  }
3573  if (bookmark_.length())
3574  {
3575  _message.setBookmark(bookmark_);
3576  }
3577  _message.setBatchSize(AMPS::asString(batchSize_));
3578  if (topN_ != AMPS_DEFAULT_TOP_N)
3579  {
3580  _message.setTopNRecordsReturned(AMPS::asString(topN_));
3581  }
3582  if (options_.length())
3583  {
3584  _message.setOptions(options_);
3585  }
3586 
3587  _routes.addRoute(_message.getQueryID(), messageHandler_,
3588  Message::AckType::None, ackTypes, false);
3589 
3590  try
3591  {
3592  syncAckProcessing(timeout_, _message);
3593  }
3594  catch (...)
3595  {
3596  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId));
3597  throw;
3598  }
3599 
3600  return commandId;
3601  }
3602 
3603  std::string sow(const MessageHandler& messageHandler_,
3604  const std::string& topic_,
3605  long timeout_,
3606  const std::string& filter_ = "",
3607  int batchSize_ = AMPS_DEFAULT_BATCH_SIZE,
3608  int topN_ = AMPS_DEFAULT_TOP_N)
3609  {
3610  std::string notSet;
3611  return sow(messageHandler_,
3612  topic_,
3613  filter_,
3614  notSet, // orderBy
3615  notSet, // bookmark
3616  batchSize_,
3617  topN_,
3618  notSet,
3619  timeout_);
3620  }
3621 
3622  std::string sowAndSubscribe(const MessageHandler& messageHandler_,
3623  const std::string& topic_,
3624  const std::string& filter_ = "",
3625  const std::string& orderBy_ = "",
3626  const std::string& bookmark_ = "",
3627  int batchSize_ = AMPS_DEFAULT_BATCH_SIZE,
3628  int topN_ = AMPS_DEFAULT_TOP_N,
3629  const std::string& options_ = "",
3630  long timeout_ = AMPS_DEFAULT_COMMAND_TIMEOUT,
3631  bool isHASubscribe_ = true)
3632  {
3633  isHASubscribe_ &= (bool)_subscriptionManager;
3634  unsigned ackTypes = Message::AckType::Processed;
3635  Lock<Mutex> l(_lock);
3636  _message.reset();
3637  _message.setCommandEnum(Message::Command::SOWAndSubscribe);
3638  _message.newCommandId();
3639  Field cid = _message.getCommandId();
3640  std::string subId = cid;
3641  _message.setQueryID(cid).setSubscriptionId(cid).setTopic(topic_);
3642  if (filter_.length())
3643  {
3644  _message.setFilter(filter_);
3645  }
3646  if (orderBy_.length())
3647  {
3648  _message.setOrderBy(orderBy_);
3649  }
3650  if (bookmark_.length())
3651  {
3652  _message.setBookmark(bookmark_);
3653  Message::Field bookmark = _message.getBookmark();
3654  if (_bookmarkStore.isValid())
3655  {
3656  ackTypes |= Message::AckType::Persisted;
3657  if (bookmark == AMPS_BOOKMARK_RECENT)
3658  {
3659  _message.setBookmark(_bookmarkStore.getMostRecent(_message.getSubscriptionId()));
3660  }
3661  else if (bookmark != AMPS_BOOKMARK_NOW &&
3662  bookmark != AMPS_BOOKMARK_EPOCH)
3663  {
3664  _bookmarkStore.log(_message);
3665  if (!BookmarkRange::isRange(bookmark))
3666  {
3667  _bookmarkStore.discard(_message);
3668  _bookmarkStore.persisted(_message.getSubscriptionId(),
3669  bookmark);
3670  }
3671  }
3672  }
3673  else if (bookmark == AMPS_BOOKMARK_RECENT)
3674  {
3675  _message.setBookmark(AMPS_BOOKMARK_EPOCH);
3676  }
3677  }
3678  _message.setBatchSize(AMPS::asString(batchSize_));
3679  if (topN_ != AMPS_DEFAULT_TOP_N)
3680  {
3681  _message.setTopNRecordsReturned(AMPS::asString(topN_));
3682  }
3683  if (options_.length())
3684  {
3685  _message.setOptions(options_);
3686  }
3687 
3688  Message message = _message;
3689  if (isHASubscribe_)
3690  {
3691  message = _message.deepCopy();
3692  Unlock<Mutex> u(_lock);
3693  _subscriptionManager->subscribe(messageHandler_, message,
3694  Message::AckType::None);
3695  if (_badTimeToHASubscribe)
3696  {
3697  return subId;
3698  }
3699  }
3700  _routes.addRoute(cid, messageHandler_,
3701  Message::AckType::None, ackTypes, true);
3702  message.setAckTypeEnum(ackTypes);
3703  if (!options_.empty())
3704  {
3705  message.setOptions(options_);
3706  }
3707  try
3708  {
3709  syncAckProcessing(timeout_, message, isHASubscribe_);
3710  }
3711  catch (const DisconnectedException&)
3712  {
3713  if (!isHASubscribe_)
3714  {
3715  _routes.removeRoute(subId);
3716  throw;
3717  }
3718  }
3719  catch (const TimedOutException&)
3720  {
3721  AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(subId));
3722  throw;
3723  }
3724  catch (...)
3725  {
3726  if (isHASubscribe_)
3727  {
3728  // Have to unlock before calling into sub manager to avoid deadlock
3729  Unlock<Mutex> unlock(_lock);
3730  _subscriptionManager->unsubscribe(cid);
3731  }
3732  _routes.removeRoute(subId);
3733  throw;
3734  }
3735  return subId;
3736  }
3737 
3738  std::string sowAndSubscribe(const MessageHandler& messageHandler_,
3739  const std::string& topic_,
3740  long timeout_,
3741  const std::string& filter_ = "",
3742  int batchSize_ = AMPS_DEFAULT_BATCH_SIZE,
3743  bool oofEnabled_ = false,
3744  int topN_ = AMPS_DEFAULT_TOP_N,
3745  bool isHASubscribe_ = true)
3746  {
3747  std::string notSet;
3748  return sowAndSubscribe(messageHandler_,
3749  topic_,
3750  filter_,
3751  notSet, // orderBy
3752  notSet, // bookmark
3753  batchSize_,
3754  topN_,
3755  (oofEnabled_ ? "oof" : ""),
3756  timeout_,
3757  isHASubscribe_);
3758  }
3759 
3760  std::string sowAndDeltaSubscribe(const MessageHandler& messageHandler_,
3761  const std::string& topic_,
3762  const std::string& filter_ = "",
3763  const std::string& orderBy_ = "",
3764  int batchSize_ = AMPS_DEFAULT_BATCH_SIZE,
3765  int topN_ = AMPS_DEFAULT_TOP_N,
3766  const std::string& options_ = "",
3767  long timeout_ = AMPS_DEFAULT_COMMAND_TIMEOUT,
3768  bool isHASubscribe_ = true)
3769  {
3770  isHASubscribe_ &= (bool)_subscriptionManager;
3771  Lock<Mutex> l(_lock);
3772  _message.reset();
3773  _message.setCommandEnum(Message::Command::SOWAndDeltaSubscribe);
3774  _message.newCommandId();
3775  _message.setQueryID(_message.getCommandId());
3776  _message.setSubscriptionId(_message.getCommandId());
3777  std::string subId = _message.getSubscriptionId();
3778  _message.setTopic(topic_);
3779  if (filter_.length())
3780  {
3781  _message.setFilter(filter_);
3782  }
3783  if (orderBy_.length())
3784  {
3785  _message.setOrderBy(orderBy_);
3786  }
3787  _message.setBatchSize(AMPS::asString(batchSize_));
3788  if (topN_ != AMPS_DEFAULT_TOP_N)
3789  {
3790  _message.setTopNRecordsReturned(AMPS::asString(topN_));
3791  }
3792  if (options_.length())
3793  {
3794  _message.setOptions(options_);
3795  }
3796  Message message = _message;
3797  if (isHASubscribe_)
3798  {
3799  message = _message.deepCopy();
3800  Unlock<Mutex> u(_lock);
3801  _subscriptionManager->subscribe(messageHandler_, message,
3802  Message::AckType::None);
3803  if (_badTimeToHASubscribe)
3804  {
3805  return subId;
3806  }
3807  }
3808  _routes.addRoute(message.getQueryID(), messageHandler_,
3809  Message::AckType::None, Message::AckType::Processed, true);
3810  message.setAckTypeEnum(Message::AckType::Processed);
3811  if (!options_.empty())
3812  {
3813  message.setOptions(options_);
3814  }
3815  try
3816  {
3817  syncAckProcessing(timeout_, message, isHASubscribe_);
3818  }
3819  catch (const DisconnectedException&)
3820  {
3821  if (!isHASubscribe_)
3822  {
3823  _routes.removeRoute(subId);
3824  throw;
3825  }
3826  }
3827  catch (const TimedOutException&)
3828  {
3829  AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(subId));
3830  throw;
3831  }
3832  catch (...)
3833  {
3834  if (isHASubscribe_)
3835  {
3836  // Have to unlock before calling into sub manager to avoid deadlock
3837  Unlock<Mutex> unlock(_lock);
3838  _subscriptionManager->unsubscribe(Field(subId));
3839  }
3840  _routes.removeRoute(subId);
3841  throw;
3842  }
3843  return subId;
3844  }
3845 
3846  std::string sowAndDeltaSubscribe(const MessageHandler& messageHandler_,
3847  const std::string& topic_,
3848  long timeout_,
3849  const std::string& filter_ = "",
3850  int batchSize_ = AMPS_DEFAULT_BATCH_SIZE,
3851  bool oofEnabled_ = false,
3852  bool sendEmpties_ = false,
3853  int topN_ = AMPS_DEFAULT_TOP_N,
3854  bool isHASubscribe_ = true)
3855  {
3856  std::string notSet;
3857  Message::Options options;
3858  if (oofEnabled_)
3859  {
3860  options.setOOF();
3861  }
3862  if (sendEmpties_ == false)
3863  {
3864  options.setNoEmpties();
3865  }
3866  return sowAndDeltaSubscribe(messageHandler_,
3867  topic_,
3868  filter_,
3869  notSet, // orderBy
3870  batchSize_,
3871  topN_,
3872  options,
3873  timeout_,
3874  isHASubscribe_);
3875  }
3876 
3877  std::string sowDelete(const MessageHandler& messageHandler_,
3878  const std::string& topic_,
3879  const std::string& filter_,
3880  long timeout_,
3881  Message::Field commandId_ = Message::Field())
3882  {
3883  if (_publishStore.isValid())
3884  {
3885  unsigned ackType = Message::AckType::Processed |
3886  Message::AckType::Stats |
3887  Message::AckType::Persisted;
3888  if (!publishStoreMessage)
3889  {
3890  publishStoreMessage = new Message();
3891  PerThreadMessageTracker::addMessageToCleanupList(publishStoreMessage);
3892  }
3893  publishStoreMessage->reset();
3894  if (commandId_.empty())
3895  {
3896  publishStoreMessage->newCommandId();
3897  commandId_ = publishStoreMessage->getCommandId();
3898  }
3899  else
3900  {
3901  publishStoreMessage->setCommandId(commandId_.data(), commandId_.len());
3902  }
3903  publishStoreMessage->setCommandEnum(Message::Command::SOWDelete)
3904  .assignSubscriptionId(commandId_.data(), commandId_.len())
3905  .assignQueryID(commandId_.data(), commandId_.len())
3906  .setAckTypeEnum(ackType)
3907  .assignTopic(topic_.c_str(), topic_.length())
3908  .assignFilter(filter_.c_str(), filter_.length());
3909  amps_uint64_t haSequenceNumber = _publishStore.store(*publishStoreMessage);
3910  char buf[AMPS_NUMBER_BUFFER_LEN];
3911  size_t pos = convertToCharArray(buf, haSequenceNumber);
3912  publishStoreMessage->assignSequence(buf + pos, AMPS_NUMBER_BUFFER_LEN - pos);
3913  {
3914  try
3915  {
3916  Lock<Mutex> l(_lock);
3917  _routes.addRoute(commandId_, messageHandler_,
3918  Message::AckType::Stats,
3919  Message::AckType::Processed | Message::AckType::Persisted,
3920  false);
3921  syncAckProcessing(timeout_, *publishStoreMessage,
3922  haSequenceNumber);
3923  }
3924  catch (const DisconnectedException&)
3925  {
3926  // -V565
3927  // Pass - it will get replayed upon reconnect
3928  }
3929  catch (...)
3930  {
3931  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId_));
3932  throw;
3933  }
3934  }
3935  return (std::string)commandId_;
3936  }
3937  else
3938  {
3939  Lock<Mutex> l(_lock);
3940  _message.reset();
3941  if (commandId_.empty())
3942  {
3943  _message.newCommandId();
3944  commandId_ = _message.getCommandId();
3945  }
3946  else
3947  {
3948  _message.setCommandId(commandId_.data(), commandId_.len());
3949  }
3950  _message.setCommandEnum(Message::Command::SOWDelete)
3951  .assignSubscriptionId(commandId_.data(), commandId_.len())
3952  .assignQueryID(commandId_.data(), commandId_.len())
3953  .setAckTypeEnum(Message::AckType::Processed |
3954  Message::AckType::Stats)
3955  .assignTopic(topic_.c_str(), topic_.length())
3956  .assignFilter(filter_.c_str(), filter_.length());
3957  _routes.addRoute(commandId_, messageHandler_,
3958  Message::AckType::Stats,
3959  Message::AckType::Processed,
3960  false);
3961  try
3962  {
3963  syncAckProcessing(timeout_, _message);
3964  }
3965  catch (...)
3966  {
3967  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId_));
3968  throw;
3969  }
3970  return (std::string)commandId_;
3971  }
3972  }
3973 
3974  std::string sowDeleteByData(const MessageHandler& messageHandler_,
3975  const std::string& topic_,
3976  const std::string& data_,
3977  long timeout_,
3978  Message::Field commandId_ = Message::Field())
3979  {
3980  if (_publishStore.isValid())
3981  {
3982  unsigned ackType = Message::AckType::Processed |
3983  Message::AckType::Stats |
3984  Message::AckType::Persisted;
3985  if (!publishStoreMessage)
3986  {
3987  publishStoreMessage = new Message();
3988  PerThreadMessageTracker::addMessageToCleanupList(publishStoreMessage);
3989  }
3990  publishStoreMessage->reset();
3991  if (commandId_.empty())
3992  {
3993  publishStoreMessage->newCommandId();
3994  commandId_ = publishStoreMessage->getCommandId();
3995  }
3996  else
3997  {
3998  publishStoreMessage->setCommandId(commandId_.data(), commandId_.len());
3999  }
4000  publishStoreMessage->setCommandEnum(Message::Command::SOWDelete)
4001  .assignSubscriptionId(commandId_.data(), commandId_.len())
4002  .assignQueryID(commandId_.data(), commandId_.len())
4003  .setAckTypeEnum(ackType)
4004  .assignTopic(topic_.c_str(), topic_.length())
4005  .assignData(data_.c_str(), data_.length());
4006  amps_uint64_t haSequenceNumber = _publishStore.store(*publishStoreMessage);
4007  char buf[AMPS_NUMBER_BUFFER_LEN];
4008  size_t pos = convertToCharArray(buf, haSequenceNumber);
4009  publishStoreMessage->assignSequence(buf + pos, AMPS_NUMBER_BUFFER_LEN - pos);
4010  {
4011  try
4012  {
4013  Lock<Mutex> l(_lock);
4014  _routes.addRoute(commandId_, messageHandler_,
4015  Message::AckType::Stats,
4016  Message::AckType::Processed | Message::AckType::Persisted,
4017  false);
4018  syncAckProcessing(timeout_, *publishStoreMessage,
4019  haSequenceNumber);
4020  }
4021  catch (const DisconnectedException&)
4022  {
4023  // -V565
4024  // Pass - it will get replayed upon reconnect
4025  }
4026  catch (...)
4027  {
4028  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId_));
4029  throw;
4030  }
4031  }
4032  return (std::string)commandId_;
4033  }
4034  else
4035  {
4036  Lock<Mutex> l(_lock);
4037  _message.reset();
4038  if (commandId_.empty())
4039  {
4040  _message.newCommandId();
4041  commandId_ = _message.getCommandId();
4042  }
4043  else
4044  {
4045  _message.setCommandId(commandId_.data(), commandId_.len());
4046  }
4047  _message.setCommandEnum(Message::Command::SOWDelete)
4048  .assignSubscriptionId(commandId_.data(), commandId_.len())
4049  .assignQueryID(commandId_.data(), commandId_.len())
4050  .setAckTypeEnum(Message::AckType::Processed |
4051  Message::AckType::Stats)
4052  .assignTopic(topic_.c_str(), topic_.length())
4053  .assignData(data_.c_str(), data_.length());
4054  _routes.addRoute(commandId_, messageHandler_,
4055  Message::AckType::Stats,
4056  Message::AckType::Processed,
4057  false);
4058  try
4059  {
4060  syncAckProcessing(timeout_, _message);
4061  }
4062  catch (...)
4063  {
4064  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId_));
4065  throw;
4066  }
4067  return (std::string)commandId_;
4068  }
4069  }
4070 
4071  std::string sowDeleteByKeys(const MessageHandler& messageHandler_,
4072  const std::string& topic_,
4073  const std::string& keys_,
4074  long timeout_,
4075  Message::Field commandId_ = Message::Field())
4076  {
4077  if (_publishStore.isValid())
4078  {
4079  unsigned ackType = Message::AckType::Processed |
4080  Message::AckType::Stats |
4081  Message::AckType::Persisted;
4082  if (!publishStoreMessage)
4083  {
4084  publishStoreMessage = new Message();
4085  PerThreadMessageTracker::addMessageToCleanupList(publishStoreMessage);
4086  }
4087  publishStoreMessage->reset();
4088  if (commandId_.empty())
4089  {
4090  publishStoreMessage->newCommandId();
4091  commandId_ = publishStoreMessage->getCommandId();
4092  }
4093  else
4094  {
4095  publishStoreMessage->setCommandId(commandId_.data(), commandId_.len());
4096  }
4097  publishStoreMessage->setCommandEnum(Message::Command::SOWDelete)
4098  .assignSubscriptionId(commandId_.data(), commandId_.len())
4099  .assignQueryID(commandId_.data(), commandId_.len())
4100  .setAckTypeEnum(ackType)
4101  .assignTopic(topic_.c_str(), topic_.length())
4102  .assignSowKeys(keys_.c_str(), keys_.length());
4103  amps_uint64_t haSequenceNumber = _publishStore.store(*publishStoreMessage);
4104  char buf[AMPS_NUMBER_BUFFER_LEN];
4105  size_t pos = convertToCharArray(buf, haSequenceNumber);
4106  publishStoreMessage->assignSequence(buf + pos, AMPS_NUMBER_BUFFER_LEN - pos);
4107  {
4108  try
4109  {
4110  Lock<Mutex> l(_lock);
4111  _routes.addRoute(commandId_, messageHandler_,
4112  Message::AckType::Stats,
4113  Message::AckType::Processed | Message::AckType::Persisted,
4114  false);
4115  syncAckProcessing(timeout_, *publishStoreMessage,
4116  haSequenceNumber);
4117  }
4118  catch (const DisconnectedException&)
4119  {
4120  // -V565
4121  // Pass - it will get replayed upon reconnect
4122  }
4123  catch (...)
4124  {
4125  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId_));
4126  throw;
4127  }
4128  }
4129  return (std::string)commandId_;
4130  }
4131  else
4132  {
4133  Lock<Mutex> l(_lock);
4134  _message.reset();
4135  if (commandId_.empty())
4136  {
4137  _message.newCommandId();
4138  commandId_ = _message.getCommandId();
4139  }
4140  else
4141  {
4142  _message.setCommandId(commandId_.data(), commandId_.len());
4143  }
4144  _message.setCommandEnum(Message::Command::SOWDelete)
4145  .assignSubscriptionId(commandId_.data(), commandId_.len())
4146  .assignQueryID(commandId_.data(), commandId_.len())
4147  .setAckTypeEnum(Message::AckType::Processed |
4148  Message::AckType::Stats)
4149  .assignTopic(topic_.c_str(), topic_.length())
4150  .assignSowKeys(keys_.c_str(), keys_.length());
4151  _routes.addRoute(commandId_, messageHandler_,
4152  Message::AckType::Stats,
4153  Message::AckType::Processed,
4154  false);
4155  try
4156  {
4157  syncAckProcessing(timeout_, _message);
4158  }
4159  catch (...)
4160  {
4161  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId_));
4162  throw;
4163  }
4164  return (std::string)commandId_;
4165  }
4166  }
4167 
4168  void startTimer(void)
4169  {
4170  if (_serverVersion >= "5.3.2.0")
4171  {
4172  throw CommandException("The start_timer command is deprecated.");
4173  }
4174  Lock<Mutex> l(_lock);
4175  _message.reset();
4176  _message.setCommandEnum(Message::Command::StartTimer);
4177 
4178  _send(_message);
4179  }
4180 
4181  std::string stopTimer(MessageHandler messageHandler_)
4182  {
4183  if (_serverVersion >= "5.3.2.0")
4184  {
4185  throw CommandException("The stop_timer command is deprecated.");
4186  }
4187  return executeAsync(Command("stop_timer").addAckType("completed"), messageHandler_);
4188  }
4189 
4190  amps_handle getHandle(void)
4191  {
4192  return _client;
4193  }
4194 
4202  void setExceptionListener(const std::shared_ptr<const ExceptionListener>& pListener_)
4203  {
4204  _pExceptionListener = pListener_;
4205  _exceptionListener = _pExceptionListener.get();
4206  }
4207 
4208  void setExceptionListener(const ExceptionListener& listener_)
4209  {
4210  _exceptionListener = &listener_;
4211  }
4212 
4213  const ExceptionListener& getExceptionListener(void) const
4214  {
4215  return *_exceptionListener;
4216  }
4217 
4218  void setHeartbeat(unsigned heartbeatInterval_, unsigned readTimeout_)
4219  {
4220  if (readTimeout_ < heartbeatInterval_)
4221  {
4222  throw UsageException("The socket read timeout must be >= the heartbeat interval.");
4223  }
4224  Lock<Mutex> l(_lock);
4225  if (_heartbeatInterval != heartbeatInterval_ ||
4226  _readTimeout != readTimeout_)
4227  {
4228  _heartbeatInterval = heartbeatInterval_;
4229  _readTimeout = readTimeout_;
4230  _sendHeartbeat();
4231  }
4232  }
4233 
4234  void _sendHeartbeat(void)
4235  {
4236  if (_connected && _heartbeatInterval != 0)
4237  {
4238  std::ostringstream options;
4239  options << "start," << _heartbeatInterval;
4240  _beatMessage.setOptions(options.str());
4241 
4242  _heartbeatTimer.setTimeout(_heartbeatInterval * 1000.0);
4243  _heartbeatTimer.start();
4244  try
4245  {
4246  _sendWithoutRetry(_beatMessage);
4247  broadcastConnectionStateChanged(ConnectionStateListener::HeartbeatInitiated);
4248  }
4249  catch (ConnectionException& ex_)
4250  {
4251  // If we are disconnected when we attempt to send, that's OK;
4252  // we'll send this message after we re-connect (if we do).
4253  AMPS_UNHANDLED_EXCEPTION(ex_);
4254  }
4255  _beatMessage.setOptions("beat");
4256  }
4257  amps_result result = AMPS_E_OK;
4258  if (_readTimeout && _connected)
4259  {
4260  result = amps_client_set_read_timeout(_client, (int)_readTimeout);
4261  }
4262  if (result != AMPS_E_OK && result != AMPS_E_DISCONNECTED)
4263  {
4264  AMPSException::throwFor(_client, result);
4265  }
4266  }
4267 
4268  void addConnectionStateListener(ConnectionStateListener* listener_)
4269  {
4270  Lock<Mutex> lock(_lock);
4271  _connectionStateListeners.insert(listener_);
4272  }
4273 
4274  void removeConnectionStateListener(ConnectionStateListener* listener_)
4275  {
4276  Lock<Mutex> lock(_lock);
4277  _connectionStateListeners.erase(listener_);
4278  }
4279 
4280  void clearConnectionStateListeners()
4281  {
4282  Lock<Mutex> lock(_lock);
4283  _connectionStateListeners.clear();
4284  }
4285 
4286  void _registerHandler(Command& command_, Message::Field& cid_,
4287  MessageHandler& handler_, unsigned requestedAcks_,
4288  unsigned systemAddedAcks_, bool isSubscribe_)
4289  {
4290  Message message = command_.getMessage();
4291  Message::Command::Type commandType = message.getCommandEnum();
4292  Message::Field subid = message.getSubscriptionId();
4293  Message::Field qid = message.getQueryID();
4294  // If we have an id, we're good, even if it's an existing route
4295  bool added = qid.len() || subid.len() || cid_.len();
4296  bool cidIsQid = cid_ == qid;
4297  bool cidUnique = !cidIsQid && cid_.len() > 0 && cid_ != subid;
4298  int addedCount = 0;
4299  if (subid.len() > 0)
4300  {
4301  // This can replace a non-subscribe with a matching id
4302  // with a subscription but not another subscription.
4303  addedCount += _routes.addRoute(subid, handler_, requestedAcks_,
4304  systemAddedAcks_, isSubscribe_);
4305  if (!cidUnique
4306  && (commandType == Message::Command::Subscribe
4307  || commandType == Message::Command::DeltaSubscribe))
4308  {
4309  // We don't need to do anything else
4310  cid_ = subid;
4311  return;
4312  }
4313  }
4314  if (qid.len() > 0 && qid != subid
4315  && (commandType == Message::Command::SOW
4316  || commandType == Message::Command::SOWDelete
4317  || commandType == Message::Command::SOWAndSubscribe
4318  || commandType == Message::Command::SOWAndDeltaSubscribe))
4319  {
4320  while (_routes.hasRoute(qid))
4321  {
4322  message.newQueryId();
4323  if (cidIsQid)
4324  {
4325  cid_ = message.getQueryId();
4326  }
4327  qid = message.getQueryId();
4328  }
4329  if (addedCount == 0)
4330  {
4331  _routes.addRoute(qid, handler_, requestedAcks_,
4332  systemAddedAcks_, isSubscribe_);
4333  }
4334  else
4335  {
4336  void* data = NULL;
4337  {
4338  Unlock<Mutex> u(_lock);
4339  data = amps_invoke_copy_route_function(handler_.userData());
4340  }
4341  if (!data)
4342  {
4343  _routes.addRoute(qid, handler_, requestedAcks_,
4344  systemAddedAcks_, false);
4345  }
4346  else
4347  {
4348  _routes.addRoute(qid,
4349  MessageHandler(handler_.function(),
4350  data),
4351  requestedAcks_,
4352  systemAddedAcks_, false);
4353  }
4354  }
4355  ++addedCount;
4356  }
4357  if (cidUnique && requestedAcks_ & ~Message::AckType::Persisted)
4358  {
4359  while (_routes.hasRoute(cid_))
4360  {
4361  cid_ = message.newCommandId().getCommandId();
4362  }
4363  if (addedCount == 0)
4364  {
4365  _routes.addRoute(cid_, handler_, requestedAcks_,
4366  systemAddedAcks_, false);
4367  }
4368  else
4369  {
4370  void* data = NULL;
4371  {
4372  Unlock<Mutex> u(_lock);
4373  data = amps_invoke_copy_route_function(handler_.userData());
4374  }
4375  if (!data)
4376  {
4377  _routes.addRoute(cid_, handler_, requestedAcks_,
4378  systemAddedAcks_, false);
4379  }
4380  else
4381  {
4382  _routes.addRoute(cid_,
4383  MessageHandler(handler_.function(),
4384  data),
4385  requestedAcks_,
4386  systemAddedAcks_, false);
4387  }
4388  }
4389  }
4390  else if ((commandType == Message::Command::Publish ||
4391  commandType == Message::Command::DeltaPublish)
4392  && requestedAcks_ & ~Message::AckType::Persisted)
4393  {
4394  cid_ = command_.getMessage().newCommandId().getCommandId();
4395  _routes.addRoute(cid_, handler_, requestedAcks_,
4396  systemAddedAcks_, false);
4397  added = true;
4398  }
4399  if (!added)
4400  {
4401  throw UsageException("To use a messagehandler, you must also supply a command or subscription ID.");
4402  }
4403  }
4404 
4405  std::string executeAsyncNoLock(Command& command_, MessageHandler& handler_,
4406  bool isHASubscribe_ = true)
4407  {
4408  isHASubscribe_ &= (bool)_subscriptionManager;
4409  Message& message = command_.getMessage();
4410  unsigned systemAddedAcks = (handler_.isValid() || command_.hasProcessedAck()) ?
4411  Message::AckType::Processed : Message::AckType::None;
4412  unsigned requestedAcks = message.getAckTypeEnum();
4413  bool isPublishStore = _publishStore.isValid() && command_.needsSequenceNumber();
4414  Message::Command::Type commandType = message.getCommandEnum();
4415  if (commandType == Message::Command::SOW
4416  || commandType == Message::Command::SOWAndSubscribe
4417  || commandType == Message::Command::SOWAndDeltaSubscribe
4418  || commandType == Message::Command::StopTimer)
4419  {
4420  systemAddedAcks |= Message::AckType::Completed;
4421  }
4422  Message::Field cid = message.getCommandId();
4423  if (handler_.isValid() && cid.empty())
4424  {
4425  cid = message.newCommandId().getCommandId();
4426  }
4427  if (message.getBookmark().len() > 0)
4428  {
4429  if (command_.isSubscribe())
4430  {
4431  Message::Field bookmark = message.getBookmark();
4432  if (_bookmarkStore.isValid())
4433  {
4434  systemAddedAcks |= Message::AckType::Persisted;
4435  if (bookmark == AMPS_BOOKMARK_RECENT)
4436  {
4437  message.setBookmark(_bookmarkStore.getMostRecent(message.getSubscriptionId()));
4438  }
4439  else if (bookmark != AMPS_BOOKMARK_NOW &&
4440  bookmark != AMPS_BOOKMARK_EPOCH)
4441  {
4442  _bookmarkStore.log(message);
4443  if (!BookmarkRange::isRange(bookmark))
4444  {
4445  _bookmarkStore.discard(message);
4446  _bookmarkStore.persisted(message.getSubscriptionId(),
4447  bookmark);
4448  }
4449  }
4450  }
4451  else if (bookmark == AMPS_BOOKMARK_RECENT)
4452  {
4454  }
4455  }
4456  }
4457  if (isPublishStore)
4458  {
4459  systemAddedAcks |= Message::AckType::Persisted;
4460  }
4461  bool isSubscribe = command_.isSubscribe();
4462  if (handler_.isValid() && !isSubscribe)
4463  {
4464  _registerHandler(command_, cid, handler_,
4465  requestedAcks, systemAddedAcks, isSubscribe);
4466  }
4467  bool useSyncSend = cid.len() > 0 && command_.hasProcessedAck();
4468  if (isPublishStore)
4469  {
4470  amps_uint64_t haSequenceNumber = (amps_uint64_t)0;
4471  message.setAckTypeEnum(requestedAcks | systemAddedAcks);
4472  {
4473  Unlock<Mutex> u(_lock);
4474  haSequenceNumber = _publishStore.store(message);
4475  }
4476  message.setSequence(haSequenceNumber);
4477  try
4478  {
4479  if (useSyncSend)
4480  {
4481  syncAckProcessing((long)command_.getTimeout(), message,
4482  haSequenceNumber);
4483  }
4484  else
4485  {
4486  _send(message, haSequenceNumber);
4487  }
4488  }
4489  catch (const DisconnectedException&)
4490  {
4491  // -V565
4492  // Pass - message will get replayed when reconnected
4493  }
4494  catch (...)
4495  {
4496  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(cid));
4497  throw;
4498  }
4499  }
4500  else
4501  {
4502  if (isSubscribe)
4503  {
4504  const Message::Field& subId = message.getSubscriptionId();
4505  if (isHASubscribe_)
4506  {
4507  Unlock<Mutex> u(_lock);
4508  _subscriptionManager->subscribe(handler_,
4509  message.deepCopy(),
4510  requestedAcks);
4511  if (_badTimeToHASubscribe)
4512  {
4513  message.setAckTypeEnum(requestedAcks);
4514  return std::string(subId.data(), subId.len());
4515  }
4516  }
4517  if (handler_.isValid())
4518  {
4519  _registerHandler(command_, cid, handler_,
4520  requestedAcks, systemAddedAcks, isSubscribe);
4521  }
4522  message.setAckTypeEnum(requestedAcks | systemAddedAcks);
4523  try
4524  {
4525  if (useSyncSend)
4526  {
4527  syncAckProcessing((long)command_.getTimeout(), message,
4528  isHASubscribe_);
4529  }
4530  else
4531  {
4532  _send(message);
4533  }
4534  }
4535  catch (const DisconnectedException&)
4536  {
4537  if (!isHASubscribe_)
4538  {
4539  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(cid));
4540  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(subId));
4541  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(message.getQueryId()));
4542  message.setAckTypeEnum(requestedAcks);
4543  throw;
4544  }
4545  }
4546  catch (const TimedOutException&)
4547  {
4548  AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(cid));
4549  AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(subId));
4550  AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(message.getQueryId()));
4551  throw;
4552  }
4553  catch (...)
4554  {
4555  if (isHASubscribe_)
4556  {
4557  // Have to unlock before calling into sub manager to avoid deadlock
4558  Unlock<Mutex> unlock(_lock);
4559  _subscriptionManager->unsubscribe(subId);
4560  }
4561  if (message.getQueryID().len() > 0)
4562  {
4563  _routes.removeRoute(message.getQueryID());
4564  }
4565  _routes.removeRoute(cid);
4566  _routes.removeRoute(subId);
4567  throw;
4568  }
4569  if (subId.len() > 0)
4570  {
4571  message.setAckTypeEnum(requestedAcks);
4572  return std::string(subId.data(), subId.len());
4573  }
4574  }
4575  else
4576  {
4577  message.setAckTypeEnum(requestedAcks | systemAddedAcks);
4578  try
4579  {
4580  if (useSyncSend)
4581  {
4582  syncAckProcessing((long)(command_.getTimeout()), message);
4583  }
4584  else
4585  {
4586  _send(message);
4587  }
4588  }
4589  catch (const DisconnectedException&)
4590  {
4591  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(cid));
4592  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(message.getQueryId()));
4593  message.setAckTypeEnum(requestedAcks);
4594  throw;
4595  }
4596  catch (...)
4597  {
4598  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(cid));
4599  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(message.getQueryId()));
4600  message.setAckTypeEnum(requestedAcks);
4601  throw;
4602  }
4603  }
4604  }
4605  message.setAckTypeEnum(requestedAcks);
4606  return cid;
4607  }
4608 
4609  MessageStream getEmptyMessageStream(void);
4610 
4611  std::string executeAsync(Command& command_, MessageHandler& handler_,
4612  bool isHASubscribe_ = true)
4613  {
4614  Lock<Mutex> lock(_lock);
4615  return executeAsyncNoLock(command_, handler_, isHASubscribe_);
4616  }
4617 
4618  // Queue Methods //
4619  void setAutoAck(bool isAutoAckEnabled_)
4620  {
4621  _isAutoAckEnabled = isAutoAckEnabled_;
4622  }
4623  bool getAutoAck(void) const
4624  {
4625  return _isAutoAckEnabled;
4626  }
4627  void setAckBatchSize(const unsigned batchSize_)
4628  {
4629  _ackBatchSize = batchSize_;
4630  if (!_queueAckTimeout)
4631  {
4632  _queueAckTimeout = AMPS_DEFAULT_QUEUE_ACK_TIMEOUT;
4633  amps_client_set_idle_time(_client, _queueAckTimeout);
4634  }
4635  }
4636  unsigned getAckBatchSize(void) const
4637  {
4638  return _ackBatchSize;
4639  }
4640  int getAckTimeout(void) const
4641  {
4642  return _queueAckTimeout;
4643  }
4644  void setAckTimeout(const int ackTimeout_)
4645  {
4646  amps_client_set_idle_time(_client, ackTimeout_);
4647  _queueAckTimeout = ackTimeout_;
4648  }
4649  size_t _ack(QueueBookmarks& queueBookmarks_)
4650  {
4651  if (queueBookmarks_._bookmarkCount)
4652  {
4653  if (!publishStoreMessage)
4654  {
4655  publishStoreMessage = new Message();
4656  PerThreadMessageTracker::addMessageToCleanupList(publishStoreMessage);
4657  }
4658  publishStoreMessage->reset();
4659  publishStoreMessage->setCommandEnum(Message::Command::SOWDelete)
4660  .setTopic(queueBookmarks_._topic)
4661  .setBookmark(queueBookmarks_._data)
4662  .setCommandId("AMPS-queue-ack");
4663  amps_uint64_t haSequenceNumber = 0;
4664  if (_publishStore.isValid())
4665  {
4666  haSequenceNumber = _publishStore.store(*publishStoreMessage);
4667  publishStoreMessage->setAckType("persisted")
4668  .setSequence(haSequenceNumber);
4669  queueBookmarks_._data.erase();
4670  queueBookmarks_._bookmarkCount = 0;
4671  }
4672  _send(*publishStoreMessage, haSequenceNumber);
4673  if (!_publishStore.isValid())
4674  {
4675  queueBookmarks_._data.erase();
4676  queueBookmarks_._bookmarkCount = 0;
4677  }
4678  return 1;
4679  }
4680  return 0;
4681  }
4682  void ack(const Field& topic_, const Field& bookmark_, const char* options_ = NULL)
4683  {
4684  if (_isAutoAckEnabled)
4685  {
4686  return;
4687  }
4688  _ack(topic_, bookmark_, options_);
4689  }
4690  void _ack(const Field& topic_, const Field& bookmark_, const char* options_ = NULL)
4691  {
4692  if (bookmark_.len() == 0)
4693  {
4694  return;
4695  }
4696  Lock<Mutex> lock(_lock);
4697  if (_ackBatchSize < 2 || options_ != NULL)
4698  {
4699  if (!publishStoreMessage)
4700  {
4701  publishStoreMessage = new Message();
4702  PerThreadMessageTracker::addMessageToCleanupList(publishStoreMessage);
4703  }
4704  publishStoreMessage->reset();
4705  publishStoreMessage->setCommandEnum(Message::Command::SOWDelete)
4706  .setCommandId("AMPS-queue-ack")
4707  .setTopic(topic_).setBookmark(bookmark_);
4708  if (options_)
4709  {
4710  publishStoreMessage->setOptions(options_);
4711  }
4712  amps_uint64_t haSequenceNumber = 0;
4713  if (_publishStore.isValid())
4714  {
4715  haSequenceNumber = _publishStore.store(*publishStoreMessage);
4716  publishStoreMessage->setAckType("persisted")
4717  .setSequence(haSequenceNumber);
4718  }
4719  _send(*publishStoreMessage, haSequenceNumber);
4720  return;
4721  }
4722  // have we acked anything for this hash
4723  topic_hash hash = CRC<0>::crcNoSSE(topic_.data(), topic_.len());
4724  TopicHashMap::iterator it = _topicHashMap.find(hash);
4725  if (it == _topicHashMap.end())
4726  {
4727  // add a new one to the map
4728 #ifdef AMPS_USE_EMPLACE
4729  it = _topicHashMap.emplace(TopicHashMap::value_type(hash, QueueBookmarks(topic_))).first;
4730 #else
4731  it = _topicHashMap.insert(TopicHashMap::value_type(hash, QueueBookmarks(topic_))).first;
4732 #endif
4733  }
4734  QueueBookmarks& queueBookmarks = it->second;
4735  if (queueBookmarks._data.length())
4736  {
4737  queueBookmarks._data.append(",");
4738  }
4739  else
4740  {
4741  queueBookmarks._oldestTime = amps_now();
4742  }
4743  queueBookmarks._data.append(bookmark_);
4744  if (++queueBookmarks._bookmarkCount >= _ackBatchSize)
4745  {
4746  _ack(queueBookmarks);
4747  }
4748  }
4749  void flushAcks(void)
4750  {
4751  size_t sendCount = 0;
4752  if (!_connected)
4753  {
4754  return;
4755  }
4756  else
4757  {
4758  Lock<Mutex> lock(_lock);
4759  typedef TopicHashMap::iterator iterator;
4760  for (iterator it = _topicHashMap.begin(), end = _topicHashMap.end(); it != end; ++it)
4761  {
4762  QueueBookmarks& queueBookmarks = it->second;
4763  sendCount += _ack(queueBookmarks);
4764  }
4765  }
4766  if (sendCount && _connected)
4767  {
4768  publishFlush(0, Message::AckType::Processed);
4769  }
4770  }
4771  // called when there's idle time, to see if we need to flush out any "acks"
4772  void checkQueueAcks(void)
4773  {
4774  if (!_topicHashMap.size())
4775  {
4776  return;
4777  }
4778  Lock<Mutex> lock(_lock);
4779  try
4780  {
4781  amps_uint64_t threshold = amps_now()
4782  - (amps_uint64_t)_queueAckTimeout;
4783  typedef TopicHashMap::iterator iterator;
4784  for (iterator it = _topicHashMap.begin(), end = _topicHashMap.end(); it != end; ++it)
4785  {
4786  QueueBookmarks& queueBookmarks = it->second;
4787  if (queueBookmarks._bookmarkCount && queueBookmarks._oldestTime < threshold)
4788  {
4789  _ack(queueBookmarks);
4790  }
4791  }
4792  }
4793  catch (std::exception& ex)
4794  {
4795  AMPS_UNHANDLED_EXCEPTION(ex);
4796  }
4797  }
4798 
4799  void deferredExecution(DeferredExecutionFunc func_, void* userData_)
4800  {
4801  Lock<Mutex> lock(_deferredExecutionLock);
4802 #ifdef AMPS_USE_EMPLACE
4803  _deferredExecutionList.emplace_back(
4804  DeferredExecutionRequest(func_, userData_));
4805 #else
4806  _deferredExecutionList.push_back(
4807  DeferredExecutionRequest(func_, userData_));
4808 #endif
4809  }
4810 
4811  inline void processDeferredExecutions(void)
4812  {
4813  if (_deferredExecutionList.size())
4814  {
4815  Lock<Mutex> lock(_deferredExecutionLock);
4816  DeferredExecutionList::iterator it = _deferredExecutionList.begin();
4817  DeferredExecutionList::iterator end = _deferredExecutionList.end();
4818  for (; it != end; ++it)
4819  {
4820  try
4821  {
4822  it->_func(it->_userData);
4823  }
4824  catch (...)
4825  {
4826  // -V565
4827  // Intentionally ignore errors
4828  }
4829  }
4830  _deferredExecutionList.clear();
4831  _routes.invalidateCache();
4832  _routeCache.invalidateCache();
4833  }
4834  }
4835 
4836  bool getRetryOnDisconnect(void) const
4837  {
4838  return _isRetryOnDisconnect;
4839  }
4840 
4841  void setRetryOnDisconnect(bool isRetryOnDisconnect_)
4842  {
4843  _isRetryOnDisconnect = isRetryOnDisconnect_;
4844  }
4845 
4846  void setDefaultMaxDepth(unsigned maxDepth_)
4847  {
4848  _defaultMaxDepth = maxDepth_;
4849  }
4850 
4851  unsigned getDefaultMaxDepth(void) const
4852  {
4853  return _defaultMaxDepth;
4854  }
4855 
4856  void setTransportFilterFunction(amps_transport_filter_function filter_,
4857  void* userData_)
4858  {
4859  amps_client_set_transport_filter_function(_client, filter_, userData_);
4860  }
4861 
4862  void setThreadCreatedCallback(amps_thread_created_callback callback_,
4863  void* userData_)
4864  {
4865  amps_client_set_thread_created_callback(_client, callback_, userData_);
4866  }
4867  }; // class ClientImpl
4942 
4944  {
4945  RefHandle<MessageStreamImpl> _body;
4946  public:
4951  class iterator
4952  {
4953  MessageStream* _pStream;
4954  Message _current;
4955  inline void advance(void);
4956 
4957  public:
4958  iterator() // end
4959  : _pStream(NULL)
4960  {;}
4961  iterator(MessageStream* pStream_)
4962  : _pStream(pStream_)
4963  {
4964  advance();
4965  }
4966 
4967  bool operator==(const iterator& rhs) const
4968  {
4969  return _pStream == rhs._pStream;
4970  }
4971  bool operator!=(const iterator& rhs) const
4972  {
4973  return _pStream != rhs._pStream;
4974  }
4975  void operator++(void)
4976  {
4977  advance();
4978  }
4979  Message operator*(void)
4980  {
4981  return _current;
4982  }
4983  Message* operator->(void)
4984  {
4985  return &_current;
4986  }
4987  };
4989  bool isValid() const
4990  {
4991  return _body.isValid();
4992  }
4993 
4997  {
4998  if (!_body.isValid())
4999  {
5000  throw UsageException("This MessageStream is not valid and cannot be iterated.");
5001  }
5002  return iterator(this);
5003  }
5006  // For non-SOW queries, the end is never reached.
5008  {
5009  return iterator();
5010  }
5011  inline MessageStream(void);
5012 
5018  MessageStream timeout(unsigned timeout_);
5019 
5023  MessageStream conflate(void);
5029  MessageStream maxDepth(unsigned maxDepth_);
5032  unsigned getMaxDepth(void) const;
5035  unsigned getDepth(void) const;
5036 
5037  private:
5038  inline MessageStream(const Client& client_);
5039  inline void setSOWOnly(const std::string& commandId_,
5040  const std::string& queryId_ = "");
5041  inline void setSubscription(const std::string& subId_,
5042  const std::string& commandId_ = "",
5043  const std::string& queryId_ = "");
5044  inline void setStatsOnly(const std::string& commandId_,
5045  const std::string& queryId_ = "");
5046  inline void setAcksOnly(const std::string& commandId_, unsigned acks_);
5047 
5048  inline operator MessageHandler(void);
5049 
5050  inline static MessageStream fromExistingHandler(const MessageHandler& handler);
5051 
5052  friend class Client;
5053 
5054  };
5055 
5075  class Client // -V553
5076  {
5077  protected:
5078  BorrowRefHandle<ClientImpl> _body;
5079  public:
5080  static const int DEFAULT_COMMAND_TIMEOUT = AMPS_DEFAULT_COMMAND_TIMEOUT;
5081  static const int DEFAULT_BATCH_SIZE = AMPS_DEFAULT_BATCH_SIZE;
5082  static const int DEFAULT_TOP_N = AMPS_DEFAULT_TOP_N;
5083 
5092  Client(const std::string& clientName = "")
5093  : _body(new ClientImpl(clientName), true)
5094  {;}
5095 
5096  Client(ClientImpl* existingClient)
5097  : _body(existingClient, true)
5098  {;}
5099 
5100  Client(ClientImpl* existingClient, bool isRef)
5101  : _body(existingClient, isRef)
5102  {;}
5103 
5104  Client(const Client& rhs) : _body(rhs._body) {;}
5105  virtual ~Client(void) {;}
5106 
5107  Client& operator=(const Client& rhs)
5108  {
5109  _body = rhs._body;
5110  return *this;
5111  }
5112 
5113  bool isValid()
5114  {
5115  return _body.isValid();
5116  }
5117 
5130  void setName(const std::string& name)
5131  {
5132  _body.get().setName(name);
5133  }
5134 
5137  const std::string& getName() const
5138  {
5139  return _body.get().getName();
5140  }
5141 
5145  const std::string& getNameHash() const
5146  {
5147  return _body.get().getNameHash();
5148  }
5149 
5153  const amps_uint64_t getNameHashValue() const
5154  {
5155  return _body.get().getNameHashValue();
5156  }
5157 
5164  void setLogonCorrelationData(const std::string& logonCorrelationData_)
5165  {
5166  _body.get().setLogonCorrelationData(logonCorrelationData_);
5167  }
5168 
5171  const std::string& getLogonCorrelationData() const
5172  {
5173  return _body.get().getLogonCorrelationData();
5174  }
5175 
5184  size_t getServerVersion() const
5185  {
5186  return _body.get().getServerVersion();
5187  }
5188 
5195  VersionInfo getServerVersionInfo() const
5196  {
5197  return _body.get().getServerVersionInfo();
5198  }
5199 
5209  static size_t convertVersionToNumber(const std::string& version_)
5210  {
5211  return AMPS::convertVersionToNumber(version_.c_str(), version_.length());
5212  }
5213 
5224  static size_t convertVersionToNumber(const char* data_, size_t len_)
5225  {
5226  return AMPS::convertVersionToNumber(data_, len_);
5227  }
5228 
5231  const std::string& getURI() const
5232  {
5233  return _body.get().getURI();
5234  }
5235 
5242 
5244 
5255  void connect(const std::string& uri)
5256  {
5257  _body.get().connect(uri);
5258  }
5259 
5262  void disconnect()
5263  {
5264  _body.get().disconnect();
5265  }
5266 
5280  void send(const Message& message)
5281  {
5282  _body.get().send(message);
5283  }
5284 
5293  void addMessageHandler(const Field& commandId_,
5294  const AMPS::MessageHandler& messageHandler_,
5295  unsigned requestedAcks_, bool isSubscribe_)
5296  {
5297  _body.get().addMessageHandler(commandId_, messageHandler_,
5298  requestedAcks_, isSubscribe_);
5299  }
5300 
5304  bool removeMessageHandler(const Field& commandId_)
5305  {
5306  return _body.get().removeMessageHandler(commandId_);
5307  }
5308 
5332  std::string send(const MessageHandler& messageHandler, Message& message, int timeout = 0)
5333  {
5334  return _body.get().send(messageHandler, message, timeout);
5335  }
5336 
5346  void setDisconnectHandler(const DisconnectHandler& disconnectHandler)
5347  {
5348  _body.get().setDisconnectHandler(disconnectHandler);
5349  }
5350 
5354  DisconnectHandler getDisconnectHandler(void) const
5355  {
5356  return _body.get().getDisconnectHandler();
5357  }
5358 
5363  virtual ConnectionInfo getConnectionInfo() const
5364  {
5365  return _body.get().getConnectionInfo();
5366  }
5367 
5376  void setBookmarkStore(const BookmarkStore& bookmarkStore_)
5377  {
5378  _body.get().setBookmarkStore(bookmarkStore_);
5379  }
5380 
5385  {
5386  return _body.get().getBookmarkStore();
5387  }
5388 
5393  {
5394  return _body.get().getSubscriptionManager();
5395  }
5396 
5404  void setSubscriptionManager(SubscriptionManager* subscriptionManager_)
5405  {
5406  _body.get().setSubscriptionManager(subscriptionManager_);
5407  }
5408 
5428  void setPublishStore(const Store& publishStore_)
5429  {
5430  _body.get().setPublishStore(publishStore_);
5431  }
5432 
5437  {
5438  return _body.get().getPublishStore();
5439  }
5440 
5444  void setDuplicateMessageHandler(const MessageHandler& duplicateMessageHandler_)
5445  {
5446  _body.get().setGlobalCommandTypeMessageHandler(ClientImpl::GlobalCommandTypeHandlers::DuplicateMessage,
5447  duplicateMessageHandler_);
5448  }
5449 
5460  {
5461  return _body.get().getDuplicateMessageHandler();
5462  }
5463 
5474  {
5475  _body.get().setFailedWriteHandler(handler_);
5476  }
5477 
5482  {
5483  return _body.get().getFailedWriteHandler();
5484  }
5485 
5486 
5504  amps_uint64_t publish(const std::string& topic_, const std::string& data_)
5505  {
5506  return _body.get().publish(topic_.c_str(), topic_.length(),
5507  data_.c_str(), data_.length());
5508  }
5509 
5529  amps_uint64_t publish(const char* topic_, size_t topicLength_,
5530  const char* data_, size_t dataLength_)
5531  {
5532  return _body.get().publish(topic_, topicLength_, data_, dataLength_);
5533  }
5534 
5553  amps_uint64_t publish(const std::string& topic_, const std::string& data_,
5554  unsigned long expiration_)
5555  {
5556  return _body.get().publish(topic_.c_str(), topic_.length(),
5557  data_.c_str(), data_.length(), expiration_);
5558  }
5559 
5580  amps_uint64_t publish(const char* topic_, size_t topicLength_,
5581  const char* data_, size_t dataLength_,
5582  unsigned long expiration_)
5583  {
5584  return _body.get().publish(topic_, topicLength_,
5585  data_, dataLength_, expiration_);
5586  }
5587 
5626  void publishFlush(long timeout_ = 0, unsigned ackType_ = Message::AckType::Processed)
5627  {
5628  _body.get().publishFlush(timeout_, ackType_);
5629  }
5630 
5631 
5647  amps_uint64_t deltaPublish(const std::string& topic_, const std::string& data_)
5648  {
5649  return _body.get().deltaPublish(topic_.c_str(), topic_.length(),
5650  data_.c_str(), data_.length());
5651  }
5652 
5670  amps_uint64_t deltaPublish(const char* topic_, size_t topicLength_,
5671  const char* data_, size_t dataLength_)
5672  {
5673  return _body.get().deltaPublish(topic_, topicLength_,
5674  data_, dataLength_);
5675  }
5676 
5693  amps_uint64_t deltaPublish(const std::string& topic_, const std::string& data_,
5694  unsigned long expiration_)
5695  {
5696  return _body.get().deltaPublish(topic_.c_str(), topic_.length(),
5697  data_.c_str(), data_.length(),
5698  expiration_);
5699  }
5700 
5719  amps_uint64_t deltaPublish(const char* topic_, size_t topicLength_,
5720  const char* data_, size_t dataLength_,
5721  unsigned long expiration_)
5722  {
5723  return _body.get().deltaPublish(topic_, topicLength_,
5724  data_, dataLength_, expiration_);
5725  }
5726 
5742  std::string logon(int timeout_ = 0,
5743  Authenticator& authenticator_ = DefaultAuthenticator::instance(),
5744  const char* options_ = NULL)
5745  {
5746  return _body.get().logon(timeout_, authenticator_, options_);
5747  }
5761  std::string logon(const char* options_, int timeout_ = 0)
5762  {
5763  return _body.get().logon(timeout_, DefaultAuthenticator::instance(),
5764  options_);
5765  }
5766 
5780  std::string logon(const std::string& options_, int timeout_ = 0)
5781  {
5782  return _body.get().logon(timeout_, DefaultAuthenticator::instance(),
5783  options_.c_str());
5784  }
5785 
5805  std::string subscribe(const MessageHandler& messageHandler_,
5806  const std::string& topic_,
5807  long timeout_ = 0,
5808  const std::string& filter_ = "",
5809  const std::string& options_ = "",
5810  const std::string& subId_ = "")
5811  {
5812  return _body.get().subscribe(messageHandler_, topic_, timeout_,
5813  filter_, "", options_, subId_);
5814  }
5815 
5831  MessageStream subscribe(const std::string& topic_,
5832  long timeout_ = 0, const std::string& filter_ = "",
5833  const std::string& options_ = "",
5834  const std::string& subId_ = "")
5835  {
5836  MessageStream result(*this);
5837  if (_body.get().getDefaultMaxDepth())
5838  {
5839  result.maxDepth(_body.get().getDefaultMaxDepth());
5840  }
5841  result.setSubscription(_body.get().subscribe(
5842  result.operator MessageHandler(),
5843  topic_, timeout_, filter_, "",
5844  options_, subId_, false));
5845  return result;
5846  }
5847 
5863  MessageStream subscribe(const char* topic_,
5864  long timeout_ = 0, const std::string& filter_ = "",
5865  const std::string& options_ = "",
5866  const std::string& subId_ = "")
5867  {
5868  MessageStream result(*this);
5869  if (_body.get().getDefaultMaxDepth())
5870  {
5871  result.maxDepth(_body.get().getDefaultMaxDepth());
5872  }
5873  result.setSubscription(_body.get().subscribe(
5874  result.operator MessageHandler(),
5875  topic_, timeout_, filter_, "",
5876  options_, subId_, false));
5877  return result;
5878  }
5879 
5892  std::string deltaSubscribe(const MessageHandler& messageHandler_,
5893  const std::string& topic_,
5894  long timeout_,
5895  const std::string& filter_ = "",
5896  const std::string& options_ = "",
5897  const std::string& subId_ = "")
5898  {
5899  return _body.get().deltaSubscribe(messageHandler_, topic_, timeout_,
5900  filter_, "", options_, subId_);
5901  }
5910  MessageStream deltaSubscribe(const std::string& topic_,
5911  long timeout_, const std::string& filter_ = "",
5912  const std::string& options_ = "",
5913  const std::string& subId_ = "")
5914  {
5915  MessageStream result(*this);
5916  if (_body.get().getDefaultMaxDepth())
5917  {
5918  result.maxDepth(_body.get().getDefaultMaxDepth());
5919  }
5920  result.setSubscription(_body.get().deltaSubscribe(
5921  result.operator MessageHandler(),
5922  topic_, timeout_, filter_, "",
5923  options_, subId_, false));
5924  return result;
5925  }
5926 
5928  MessageStream deltaSubscribe(const char* topic_,
5929  long timeout_, const std::string& filter_ = "",
5930  const std::string& options_ = "",
5931  const std::string& subId_ = "")
5932  {
5933  MessageStream result(*this);
5934  if (_body.get().getDefaultMaxDepth())
5935  {
5936  result.maxDepth(_body.get().getDefaultMaxDepth());
5937  }
5938  result.setSubscription(_body.get().deltaSubscribe(
5939  result.operator MessageHandler(),
5940  topic_, timeout_, filter_, "",
5941  options_, subId_, false));
5942  return result;
5943  }
5944 
5970  std::string bookmarkSubscribe(const MessageHandler& messageHandler_,
5971  const std::string& topic_,
5972  long timeout_,
5973  const std::string& bookmark_,
5974  const std::string& filter_ = "",
5975  const std::string& options_ = "",
5976  const std::string& subId_ = "")
5977  {
5978  return _body.get().subscribe(messageHandler_, topic_, timeout_,
5979  filter_, bookmark_, options_, subId_);
5980  }
5998  MessageStream bookmarkSubscribe(const std::string& topic_,
5999  long timeout_,
6000  const std::string& bookmark_,
6001  const std::string& filter_ = "",
6002  const std::string& options_ = "",
6003  const std::string& subId_ = "")
6004  {
6005  MessageStream result(*this);
6006  if (_body.get().getDefaultMaxDepth())
6007  {
6008  result.maxDepth(_body.get().getDefaultMaxDepth());
6009  }
6010  result.setSubscription(_body.get().subscribe(
6011  result.operator MessageHandler(),
6012  topic_, timeout_, filter_,
6013  bookmark_, options_,
6014  subId_, false));
6015  return result;
6016  }
6017 
6019  MessageStream bookmarkSubscribe(const char* topic_,
6020  long timeout_,
6021  const std::string& bookmark_,
6022  const std::string& filter_ = "",
6023  const std::string& options_ = "",
6024  const std::string& subId_ = "")
6025  {
6026  MessageStream result(*this);
6027  if (_body.get().getDefaultMaxDepth())
6028  {
6029  result.maxDepth(_body.get().getDefaultMaxDepth());
6030  }
6031  result.setSubscription(_body.get().subscribe(
6032  result.operator MessageHandler(),
6033  topic_, timeout_, filter_,
6034  bookmark_, options_,
6035  subId_, false));
6036  return result;
6037  }
6038 
6047  void unsubscribe(const std::string& commandId)
6048  {
6049  return _body.get().unsubscribe(commandId);
6050  }
6051 
6060  {
6061  return _body.get().unsubscribe();
6062  }
6063 
6064 
6094  std::string sow(const MessageHandler& messageHandler_,
6095  const std::string& topic_,
6096  const std::string& filter_ = "",
6097  const std::string& orderBy_ = "",
6098  const std::string& bookmark_ = "",
6099  int batchSize_ = DEFAULT_BATCH_SIZE,
6100  int topN_ = DEFAULT_TOP_N,
6101  const std::string& options_ = "",
6102  long timeout_ = DEFAULT_COMMAND_TIMEOUT)
6103  {
6104  return _body.get().sow(messageHandler_, topic_, filter_, orderBy_,
6105  bookmark_, batchSize_, topN_, options_,
6106  timeout_);
6107  }
6132  MessageStream sow(const std::string& topic_,
6133  const std::string& filter_ = "",
6134  const std::string& orderBy_ = "",
6135  const std::string& bookmark_ = "",
6136  int batchSize_ = DEFAULT_BATCH_SIZE,
6137  int topN_ = DEFAULT_TOP_N,
6138  const std::string& options_ = "",
6139  long timeout_ = DEFAULT_COMMAND_TIMEOUT)
6140  {
6141  MessageStream result(*this);
6142  if (_body.get().getDefaultMaxDepth())
6143  {
6144  result.maxDepth(_body.get().getDefaultMaxDepth());
6145  }
6146  result.setSOWOnly(_body.get().sow(result.operator MessageHandler(),
6147  topic_, filter_, orderBy_, bookmark_,
6148  batchSize_, topN_, options_, timeout_));
6149  return result;
6150  }
6151 
6153  MessageStream sow(const char* topic_,
6154  const std::string& filter_ = "",
6155  const std::string& orderBy_ = "",
6156  const std::string& bookmark_ = "",
6157  int batchSize_ = DEFAULT_BATCH_SIZE,
6158  int topN_ = DEFAULT_TOP_N,
6159  const std::string& options_ = "",
6160  long timeout_ = DEFAULT_COMMAND_TIMEOUT)
6161  {
6162  MessageStream result(*this);
6163  if (_body.get().getDefaultMaxDepth())
6164  {
6165  result.maxDepth(_body.get().getDefaultMaxDepth());
6166  }
6167  result.setSOWOnly(_body.get().sow(result.operator MessageHandler(),
6168  topic_, filter_, orderBy_, bookmark_,
6169  batchSize_, topN_, options_, timeout_));
6170  return result;
6171  }
6194  std::string sow(const MessageHandler& messageHandler_,
6195  const std::string& topic_,
6196  long timeout_,
6197  const std::string& filter_ = "",
6198  int batchSize_ = DEFAULT_BATCH_SIZE,
6199  int topN_ = DEFAULT_TOP_N)
6200  {
6201  return _body.get().sow(messageHandler_, topic_, timeout_, filter_,
6202  batchSize_, topN_);
6203  }
6226  std::string sowAndSubscribe(const MessageHandler& messageHandler_,
6227  const std::string& topic_,
6228  long timeout_,
6229  const std::string& filter_ = "",
6230  int batchSize_ = DEFAULT_BATCH_SIZE,
6231  bool oofEnabled_ = false,
6232  int topN_ = DEFAULT_TOP_N)
6233  {
6234  return _body.get().sowAndSubscribe(messageHandler_, topic_, timeout_,
6235  filter_, batchSize_, oofEnabled_,
6236  topN_);
6237  }
6238 
6258  MessageStream sowAndSubscribe(const std::string& topic_,
6259  long timeout_,
6260  const std::string& filter_ = "",
6261  int batchSize_ = DEFAULT_BATCH_SIZE,
6262  bool oofEnabled_ = false,
6263  int topN_ = DEFAULT_TOP_N)
6264  {
6265  MessageStream result(*this);
6266  if (_body.get().getDefaultMaxDepth())
6267  {
6268  result.maxDepth(_body.get().getDefaultMaxDepth());
6269  }
6270  result.setSubscription(_body.get().sowAndSubscribe(
6271  result.operator MessageHandler(),
6272  topic_, timeout_, filter_,
6273  batchSize_, oofEnabled_,
6274  topN_, false));
6275  return result;
6276  }
6296  MessageStream sowAndSubscribe(const char* topic_,
6297  long timeout_,
6298  const std::string& filter_ = "",
6299  int batchSize_ = DEFAULT_BATCH_SIZE,
6300  bool oofEnabled_ = false,
6301  int topN_ = DEFAULT_TOP_N)
6302  {
6303  MessageStream result(*this);
6304  if (_body.get().getDefaultMaxDepth())
6305  {
6306  result.maxDepth(_body.get().getDefaultMaxDepth());
6307  }
6308  result.setSubscription(_body.get().sowAndSubscribe(
6309  result.operator MessageHandler(),
6310  topic_, timeout_, filter_,
6311  batchSize_, oofEnabled_,
6312  topN_, false));
6313  return result;
6314  }
6315 
6316 
6344  std::string sowAndSubscribe(const MessageHandler& messageHandler_,
6345  const std::string& topic_,
6346  const std::string& filter_ = "",
6347  const std::string& orderBy_ = "",
6348  const std::string& bookmark_ = "",
6349  int batchSize_ = DEFAULT_BATCH_SIZE,
6350  int topN_ = DEFAULT_TOP_N,
6351  const std::string& options_ = "",
6352  long timeout_ = DEFAULT_COMMAND_TIMEOUT)
6353  {
6354  return _body.get().sowAndSubscribe(messageHandler_, topic_, filter_,
6355  orderBy_, bookmark_, batchSize_,
6356  topN_, options_, timeout_);
6357  }
6358 
6383  MessageStream sowAndSubscribe(const std::string& topic_,
6384  const std::string& filter_ = "",
6385  const std::string& orderBy_ = "",
6386  const std::string& bookmark_ = "",
6387  int batchSize_ = DEFAULT_BATCH_SIZE,
6388  int topN_ = DEFAULT_TOP_N,
6389  const std::string& options_ = "",
6390  long timeout_ = DEFAULT_COMMAND_TIMEOUT)
6391  {
6392  MessageStream result(*this);
6393  if (_body.get().getDefaultMaxDepth())
6394  {
6395  result.maxDepth(_body.get().getDefaultMaxDepth());
6396  }
6397  result.setSubscription(_body.get().sowAndSubscribe(
6398  result.operator MessageHandler(),
6399  topic_, filter_, orderBy_,
6400  bookmark_, batchSize_, topN_,
6401  options_, timeout_, false));
6402  return result;
6403  }
6404 
6406  MessageStream sowAndSubscribe(const char* topic_,
6407  const std::string& filter_ = "",
6408  const std::string& orderBy_ = "",
6409  const std::string& bookmark_ = "",
6410  int batchSize_ = DEFAULT_BATCH_SIZE,
6411  int topN_ = DEFAULT_TOP_N,
6412  const std::string& options_ = "",
6413  long timeout_ = DEFAULT_COMMAND_TIMEOUT)
6414  {
6415  MessageStream result(*this);
6416  if (_body.get().getDefaultMaxDepth())
6417  {
6418  result.maxDepth(_body.get().getDefaultMaxDepth());
6419  }
6420  result.setSubscription(_body.get().sowAndSubscribe(
6421  result.operator MessageHandler(),
6422  topic_, filter_, orderBy_,
6423  bookmark_, batchSize_, topN_,
6424  options_, timeout_, false));
6425  return result;
6426  }
6427 
6452  std::string sowAndDeltaSubscribe(const MessageHandler& messageHandler_,
6453  const std::string& topic_,
6454  const std::string& filter_ = "",
6455  const std::string& orderBy_ = "",
6456  int batchSize_ = DEFAULT_BATCH_SIZE,
6457  int topN_ = DEFAULT_TOP_N,
6458  const std::string& options_ = "",
6459  long timeout_ = DEFAULT_COMMAND_TIMEOUT)
6460  {
6461  return _body.get().sowAndDeltaSubscribe(messageHandler_, topic_,
6462  filter_, orderBy_, batchSize_,
6463  topN_, options_, timeout_);
6464  }
6485  MessageStream sowAndDeltaSubscribe(const std::string& topic_,
6486  const std::string& filter_ = "",
6487  const std::string& orderBy_ = "",
6488  int batchSize_ = DEFAULT_BATCH_SIZE,
6489  int topN_ = DEFAULT_TOP_N,
6490  const std::string& options_ = "",
6491  long timeout_ = DEFAULT_COMMAND_TIMEOUT)
6492  {
6493  MessageStream result(*this);
6494  if (_body.get().getDefaultMaxDepth())
6495  {
6496  result.maxDepth(_body.get().getDefaultMaxDepth());
6497  }
6498  result.setSubscription(_body.get().sowAndDeltaSubscribe(
6499  result.operator MessageHandler(),
6500  topic_, filter_, orderBy_,
6501  batchSize_, topN_, options_,
6502  timeout_, false));
6503  return result;
6504  }
6505 
6508  const std::string& filter_ = "",
6509  const std::string& orderBy_ = "",
6510  int batchSize_ = DEFAULT_BATCH_SIZE,
6511  int topN_ = DEFAULT_TOP_N,
6512  const std::string& options_ = "",
6513  long timeout_ = DEFAULT_COMMAND_TIMEOUT)
6514  {
6515  MessageStream result(*this);
6516  if (_body.get().getDefaultMaxDepth())
6517  {
6518  result.maxDepth(_body.get().getDefaultMaxDepth());
6519  }
6520  result.setSubscription(_body.get().sowAndDeltaSubscribe(
6521  result.operator MessageHandler(),
6522  topic_, filter_, orderBy_,
6523  batchSize_, topN_, options_,
6524  timeout_, false));
6525  return result;
6526  }
6527 
6552  std::string sowAndDeltaSubscribe(const MessageHandler& messageHandler_,
6553  const std::string& topic_,
6554  long timeout_,
6555  const std::string& filter_ = "",
6556  int batchSize_ = DEFAULT_BATCH_SIZE,
6557  bool oofEnabled_ = false,
6558  bool sendEmpties_ = false,
6559  int topN_ = DEFAULT_TOP_N)
6560  {
6561  return _body.get().sowAndDeltaSubscribe(messageHandler_, topic_,
6562  timeout_, filter_, batchSize_,
6563  oofEnabled_, sendEmpties_,
6564  topN_);
6565  }
6566 
6588  MessageStream sowAndDeltaSubscribe(const std::string& topic_,
6589  long timeout_,
6590  const std::string& filter_ = "",
6591  int batchSize_ = DEFAULT_BATCH_SIZE,
6592  bool oofEnabled_ = false,
6593  bool sendEmpties_ = false,
6594  int topN_ = DEFAULT_TOP_N)
6595  {
6596  MessageStream result(*this);
6597  if (_body.get().getDefaultMaxDepth())
6598  {
6599  result.maxDepth(_body.get().getDefaultMaxDepth());
6600  }
6601  result.setSubscription(_body.get().sowAndDeltaSubscribe(
6602  result.operator MessageHandler(),
6603  topic_, timeout_, filter_,
6604  batchSize_, oofEnabled_,
6605  sendEmpties_, topN_, false));
6606  return result;
6607  }
6630  long timeout_,
6631  const std::string& filter_ = "",
6632  int batchSize_ = DEFAULT_BATCH_SIZE,
6633  bool oofEnabled_ = false,
6634  bool sendEmpties_ = false,
6635  int topN_ = DEFAULT_TOP_N)
6636  {
6637  MessageStream result(*this);
6638  if (_body.get().getDefaultMaxDepth())
6639  {
6640  result.maxDepth(_body.get().getDefaultMaxDepth());
6641  }
6642  result.setSubscription(_body.get().sowAndDeltaSubscribe(
6643  result.operator MessageHandler(),
6644  topic_, timeout_, filter_,
6645  batchSize_, oofEnabled_,
6646  sendEmpties_, topN_, false));
6647  return result;
6648  }
6668  std::string sowDelete(const MessageHandler& messageHandler,
6669  const std::string& topic,
6670  const std::string& filter,
6671  long timeout)
6672  {
6673  return _body.get().sowDelete(messageHandler, topic, filter, timeout);
6674  }
6691  Message sowDelete(const std::string& topic, const std::string& filter,
6692  long timeout = 0)
6693  {
6694  MessageStream stream(*this);
6695  char buf[Message::IdentifierLength + 1];
6696  buf[Message::IdentifierLength] = 0;
6697  AMPS_snprintf(buf, Message::IdentifierLength + 1, "%lx", MessageImpl::newId());
6698  Field cid(buf);
6699  try
6700  {
6701  stream.setStatsOnly(cid);
6702  _body.get().sowDelete(stream.operator MessageHandler(), topic, filter, timeout, cid);
6703  return *(stream.begin());
6704  }
6705  catch (const DisconnectedException&)
6706  {
6707  removeMessageHandler(cid);
6708  throw;
6709  }
6710  }
6711 
6716  void startTimer()
6717  {
6718  _body.get().startTimer();
6719  }
6720 
6727  std::string stopTimer(const MessageHandler& messageHandler)
6728  {
6729  return _body.get().stopTimer(messageHandler);
6730  }
6731 
6753  std::string sowDeleteByKeys(const MessageHandler& messageHandler_,
6754  const std::string& topic_,
6755  const std::string& keys_,
6756  long timeout_ = 0)
6757  {
6758  return _body.get().sowDeleteByKeys(messageHandler_, topic_, keys_, timeout_);
6759  }
6780  Message sowDeleteByKeys(const std::string& topic_, const std::string& keys_,
6781  long timeout_ = 0)
6782  {
6783  MessageStream stream(*this);
6784  char buf[Message::IdentifierLength + 1];
6785  buf[Message::IdentifierLength] = 0;
6786  AMPS_snprintf(buf, Message::IdentifierLength + 1, "%lx", MessageImpl::newId());
6787  Field cid(buf);
6788  try
6789  {
6790  stream.setStatsOnly(cid);
6791  _body.get().sowDeleteByKeys(stream.operator MessageHandler(), topic_, keys_, timeout_, cid);
6792  return *(stream.begin());
6793  }
6794  catch (const DisconnectedException&)
6795  {
6796  removeMessageHandler(cid);
6797  throw;
6798  }
6799  }
6800 
6815  std::string sowDeleteByData(const MessageHandler& messageHandler_,
6816  const std::string& topic_, const std::string& data_,
6817  long timeout_ = 0)
6818  {
6819  return _body.get().sowDeleteByData(messageHandler_, topic_, data_, timeout_);
6820  }
6821 
6836  Message sowDeleteByData(const std::string& topic_, const std::string& data_,
6837  long timeout_ = 0)
6838  {
6839  MessageStream stream(*this);
6840  char buf[Message::IdentifierLength + 1];
6841  buf[Message::IdentifierLength] = 0;
6842  AMPS_snprintf(buf, Message::IdentifierLength + 1, "%lx", MessageImpl::newId());
6843  Field cid(buf);
6844  try
6845  {
6846  stream.setStatsOnly(cid);
6847  _body.get().sowDeleteByData(stream.operator MessageHandler(), topic_, data_, timeout_, cid);
6848  return *(stream.begin());
6849  }
6850  catch (const DisconnectedException&)
6851  {
6852  removeMessageHandler(cid);
6853  throw;
6854  }
6855  }
6856 
6861  {
6862  return _body.get().getHandle();
6863  }
6864 
6873  void setExceptionListener(const std::shared_ptr<const ExceptionListener>& pListener_)
6874  {
6875  _body.get().setExceptionListener(pListener_);
6876  }
6877 
6887  {
6888  _body.get().setExceptionListener(listener_);
6889  }
6890 
6894  {
6895  return _body.get().getExceptionListener();
6896  }
6897 
6905  // type of message) from the server for the specified interval (plus a grace period),
6919  void setHeartbeat(unsigned heartbeatTime_, unsigned readTimeout_)
6920  {
6921  _body.get().setHeartbeat(heartbeatTime_, readTimeout_);
6922  }
6923 
6931  // type of message) from the server for the specified interval (plus a grace period),
6943  void setHeartbeat(unsigned heartbeatTime_)
6944  {
6945  _body.get().setHeartbeat(heartbeatTime_, 2 * heartbeatTime_);
6946  }
6947 
6950  {
6951  setLastChanceMessageHandler(messageHandler);
6952  }
6953 
6957  {
6958  _body.get().setGlobalCommandTypeMessageHandler(ClientImpl::GlobalCommandTypeHandlers::LastChance,
6959  messageHandler);
6960  }
6961 
6982  void setGlobalCommandTypeMessageHandler(const std::string& command_, const MessageHandler& handler_)
6983  {
6984  _body.get().setGlobalCommandTypeMessageHandler(command_, handler_);
6985  }
6986 
7007  void setGlobalCommandTypeMessageHandler(const Message::Command::Type command_, const MessageHandler& handler_)
7008  {
7009  _body.get().setGlobalCommandTypeMessageHandler(command_, handler_);
7010  }
7011 
7017  static const char* BOOKMARK_NOW()
7018  {
7019  return AMPS_BOOKMARK_NOW;
7020  }
7026  static const char* NOW()
7027  {
7028  return AMPS_BOOKMARK_NOW;
7029  }
7030 
7036  static const char* BOOKMARK_EPOCH()
7037  {
7038  return AMPS_BOOKMARK_EPOCH;
7039  }
7040 
7046  static const char* EPOCH()
7047  {
7048  return AMPS_BOOKMARK_EPOCH;
7049  }
7050 
7057  static const char* BOOKMARK_MOST_RECENT()
7058  {
7059  return AMPS_BOOKMARK_RECENT;
7060  }
7061 
7068  static const char* MOST_RECENT()
7069  {
7070  return AMPS_BOOKMARK_RECENT;
7071  }
7072 
7079  static const char* BOOKMARK_RECENT()
7080  {
7081  return AMPS_BOOKMARK_RECENT;
7082  }
7083 
7084 
7091  {
7092  _body.get().addConnectionStateListener(listener);
7093  }
7094 
7099  {
7100  _body.get().removeConnectionStateListener(listener);
7101  }
7102 
7106  {
7107  _body.get().clearConnectionStateListeners();
7108  }
7109 
7135  std::string executeAsync(Command& command_, MessageHandler handler_)
7136  {
7137  return _body.get().executeAsync(command_, handler_);
7138  }
7139 
7169  std::string executeAsyncNoResubscribe(Command& command_,
7170  MessageHandler handler_)
7171  {
7172  std::string id;
7173  try
7174  {
7175  if (command_.isSubscribe())
7176  {
7177  Message& message = command_.getMessage();
7178  Field subId = message.getSubscriptionId();
7179  bool useExistingHandler = !subId.empty() && !message.getOptions().empty() && message.getOptions().contains("replace", 7);
7180  if (useExistingHandler)
7181  {
7182  MessageHandler existingHandler;
7183  if (_body.get()._routes.getRoute(subId, existingHandler))
7184  {
7185  // we found an existing handler.
7186  _body.get().executeAsync(command_, existingHandler, false);
7187  return id; // empty string indicates existing
7188  }
7189  }
7190  }
7191  id = _body.get().executeAsync(command_, handler_, false);
7192  }
7193  catch (const DisconnectedException&)
7194  {
7195  removeMessageHandler(command_.getMessage().getCommandId());
7196  if (command_.isSubscribe())
7197  {
7198  removeMessageHandler(command_.getMessage().getSubscriptionId());
7199  }
7200  if (command_.isSow())
7201  {
7202  removeMessageHandler(command_.getMessage().getQueryID());
7203  }
7204  throw;
7205  }
7206  return id;
7207  }
7208 
7221  MessageStream execute(Command& command_);
7222 
7231  void ack(Field& topic_, Field& bookmark_, const char* options_ = NULL)
7232  {
7233  _body.get().ack(topic_, bookmark_, options_);
7234  }
7235 
7243  void ack(Message& message_, const char* options_ = NULL)
7244  {
7245  _body.get().ack(message_.getTopic(), message_.getBookmark(), options_);
7246  }
7255  void ack(const std::string& topic_, const std::string& bookmark_,
7256  const char* options_ = NULL)
7257  {
7258  _body.get().ack(Field(topic_.data(), topic_.length()), Field(bookmark_.data(), bookmark_.length()), options_);
7259  }
7260 
7266  void ackDeferredAutoAck(Field& topic_, Field& bookmark_, const char* options_ = NULL)
7267  {
7268  _body.get()._ack(topic_, bookmark_, options_);
7269  }
7279  void flushAcks(void)
7280  {
7281  _body.get().flushAcks();
7282  }
7283 
7288  bool getAutoAck(void) const
7289  {
7290  return _body.get().getAutoAck();
7291  }
7298  void setAutoAck(bool isAutoAckEnabled_)
7299  {
7300  _body.get().setAutoAck(isAutoAckEnabled_);
7301  }
7306  unsigned getAckBatchSize(void) const
7307  {
7308  return _body.get().getAckBatchSize();
7309  }
7316  void setAckBatchSize(const unsigned ackBatchSize_)
7317  {
7318  _body.get().setAckBatchSize(ackBatchSize_);
7319  }
7320 
7327  int getAckTimeout(void) const
7328  {
7329  return _body.get().getAckTimeout();
7330  }
7339  void setAckTimeout(const int ackTimeout_)
7340  {
7341  if (!ackTimeout_ && _body.get().getAckBatchSize() > 1)
7342  {
7343  throw UsageException("Ack timeout must be > 0 when ack batch size > 1");
7344  }
7345  _body.get().setAckTimeout(ackTimeout_);
7346  }
7347 
7348 
7357  void setRetryOnDisconnect(bool isRetryOnDisconnect_)
7358  {
7359  _body.get().setRetryOnDisconnect(isRetryOnDisconnect_);
7360  }
7361 
7366  bool getRetryOnDisconnect(void) const
7367  {
7368  return _body.get().getRetryOnDisconnect();
7369  }
7370 
7375  void setDefaultMaxDepth(unsigned maxDepth_)
7376  {
7377  _body.get().setDefaultMaxDepth(maxDepth_);
7378  }
7379 
7384  unsigned getDefaultMaxDepth(void) const
7385  {
7386  return _body.get().getDefaultMaxDepth();
7387  }
7388 
7396  void* userData_)
7397  {
7398  return _body.get().setTransportFilterFunction(filter_, userData_);
7399  }
7400 
7410  void* userData_)
7411  {
7412  return _body.get().setThreadCreatedCallback(callback_, userData_);
7413  }
7414 
7420  void deferredExecution(DeferredExecutionFunc func_, void* userData_)
7421  {
7422  _body.get().deferredExecution(func_, userData_);
7423  }
7427  };
7428 
7429  inline void
7430  ClientImpl::lastChance(AMPS::Message& message)
7431  {
7432  AMPS_CALL_EXCEPTION_WRAPPER(_globalCommandTypeHandlers[GlobalCommandTypeHandlers::LastChance].invoke(message));
7433  }
7434 
7435  inline unsigned
7436  ClientImpl::persistedAck(AMPS::Message& message)
7437  {
7438  unsigned deliveries = 0;
7439  try
7440  {
7441  /*
7442  * Best Practice: If you don't care about the dupe acks that
7443  * occur during failover or rapid disconnect/reconnect, then just
7444  * ignore them. We could discard each duplicate from the
7445  * persisted store, but the storage costs of doing 1 record
7446  * discards is heavy. In most scenarios we'll just quickly blow
7447  * through the duplicates and get back to processing the
7448  * non-dupes.
7449  */
7450  const char* data = NULL;
7451  size_t len = 0;
7452  const char* status = NULL;
7453  size_t statusLen = 0;
7454  amps_handle messageHandle = message.getMessage();
7455  const size_t NotEntitled = 12, Duplicate = 9, Failure = 7;
7456  amps_message_get_field_value(messageHandle, AMPS_Reason, &data, &len);
7457  amps_message_get_field_value(messageHandle, AMPS_Status, &status, &statusLen);
7458  if (len == NotEntitled || len == Duplicate ||
7459  (statusLen == Failure && status[0] == 'f'))
7460  {
7461  if (_failedWriteHandler)
7462  {
7463  if (_publishStore.isValid())
7464  {
7465  amps_uint64_t sequence =
7466  amps_message_get_field_uint64(messageHandle, AMPS_Sequence);
7467  FailedWriteStoreReplayer replayer(this, data, len);
7468  AMPS_CALL_EXCEPTION_WRAPPER(_publishStore.replaySingle(
7469  replayer, sequence));
7470  }
7471  else // Call the handler with what little we have
7472  {
7473  static Message emptyMessage;
7474  emptyMessage.setSequence(message.getSequence());
7475  AMPS_CALL_EXCEPTION_WRAPPER(
7476  _failedWriteHandler->failedWrite(emptyMessage,
7477  data, len));
7478  }
7479  ++deliveries;
7480  }
7481  }
7482  if (_publishStore.isValid())
7483  {
7484  // Ack for publisher will have sequence while
7485  // ack for bookmark subscribe won't
7486  amps_uint64_t seq = amps_message_get_field_uint64(messageHandle,
7487  AMPS_Sequence);
7488  if (seq > 0)
7489  {
7490  ++deliveries;
7491  AMPS_CALL_EXCEPTION_WRAPPER(_publishStore.discardUpTo(seq));
7492  }
7493  }
7494 
7495  if (!deliveries && _bookmarkStore.isValid())
7496  {
7497  amps_message_get_field_value(messageHandle, AMPS_SubscriptionId,
7498  &data, &len);
7499  if (len > 0)
7500  {
7501  Message::Field subId(data, len);
7502  const char* bookmarkData = NULL;
7503  size_t bookmarkLen = 0;
7504  amps_message_get_field_value(messageHandle,
7505  AMPS_Bookmark,
7506  &bookmarkData,
7507  &bookmarkLen);
7508  // Everything is there and not unsubscribed AC-912
7509  if (bookmarkLen > 0 && _routes.hasRoute(subId))
7510  {
7511  ++deliveries;
7512  _bookmarkStore.persisted(subId, Message::Field(bookmarkData, bookmarkLen));
7513  }
7514  }
7515  }
7516  }
7517  catch (std::exception& ex)
7518  {
7519  AMPS_UNHANDLED_EXCEPTION(ex);
7520  }
7521  return deliveries;
7522  }
7523 
7524  inline unsigned
7525  ClientImpl::processedAck(Message& message)
7526  {
7527  unsigned deliveries = 0;
7528  AckResponse ack;
7529  const char* data = NULL;
7530  size_t len = 0;
7531  amps_handle messageHandle = message.getMessage();
7532  amps_message_get_field_value(messageHandle, AMPS_CommandId, &data, &len);
7533  Lock<Mutex> l(_lock);
7534  if (data && len)
7535  {
7536  Lock<Mutex> guard(_ackMapLock);
7537  AckMap::iterator i = _ackMap.find(std::string(data, len));
7538  if (i != _ackMap.end())
7539  {
7540  ++deliveries;
7541  ack = i->second;
7542  _ackMap.erase(i);
7543  }
7544  }
7545  if (deliveries)
7546  {
7547  amps_message_get_field_value(messageHandle, AMPS_Status, &data, &len);
7548  ack.setStatus(data, len);
7549  amps_message_get_field_value(messageHandle, AMPS_Reason, &data, &len);
7550  ack.setReason(data, len);
7551  amps_message_get_field_value(messageHandle, AMPS_UserId, &data, &len);
7552  ack.setUsername(data, len);
7553  amps_message_get_field_value(messageHandle, AMPS_Password, &data, &len);
7554  ack.setPassword(data, len);
7555  amps_message_get_field_value(messageHandle, AMPS_Version, &data, &len);
7556  ack.setServerVersion(data, len);
7557  amps_message_get_field_value(messageHandle, AMPS_Options, &data, &len);
7558  ack.setOptions(data, len);
7559  // This sets bookmark, nameHashValue, and sequenceNo
7560  ack.setBookmark(message.getBookmark());
7561  ack.setResponded();
7562  _lock.signalAll();
7563  }
7564  return deliveries;
7565  }
7566 
7567  inline void
7568  ClientImpl::checkAndSendHeartbeat(bool force)
7569  {
7570  if (force || _heartbeatTimer.check())
7571  {
7572  _heartbeatTimer.start();
7573  try
7574  {
7575  sendWithoutRetry(_beatMessage);
7576  }
7577  catch (const AMPSException&)
7578  {
7579  ;
7580  }
7581  }
7582  }
7583 
7584  inline ConnectionInfo ClientImpl::getConnectionInfo() const
7585  {
7586  ConnectionInfo info;
7587  std::ostringstream writer;
7588 
7589  info["client.uri"] = _lastUri;
7590  info["client.name"] = _name;
7591  info["client.username"] = _username;
7592  if (_publishStore.isValid())
7593  {
7594  writer << _publishStore.unpersistedCount();
7595  info["publishStore.unpersistedCount"] = writer.str();
7596  writer.clear();
7597  writer.str("");
7598  }
7599 
7600  return info;
7601  }
7602 
7603  inline amps_result
7604  ClientImpl::ClientImplMessageHandler(amps_handle messageHandle_, void* userData_)
7605  {
7606  const unsigned SOWMask = Message::Command::SOW | Message::Command::GroupBegin | Message::Command::GroupEnd;
7607  const unsigned PublishMask = Message::Command::OOF | Message::Command::Publish | Message::Command::DeltaPublish;
7608  ClientImpl* me = (ClientImpl*) userData_;
7609  AMPS_CALL_EXCEPTION_WRAPPER_2(me, me->processDeferredExecutions());
7610  if (!messageHandle_)
7611  {
7612  if (me->_queueAckTimeout)
7613  {
7614  me->checkQueueAcks();
7615  }
7616  return AMPS_E_OK;
7617  }
7618 
7619  me->_readMessage.replace(messageHandle_);
7620  Message& message = me->_readMessage;
7621  Message::Command::Type commandType = message.getCommandEnum();
7622  if (commandType & SOWMask)
7623  {
7624 #if 0 // Not currently implemented, to avoid an extra branch in delivery
7625  // A small cheat here to get the right handler, using knowledge of the
7626  // Command values of SOW (8), GroupBegin (8192), and GroupEnd (16384)
7627  // and their GlobalCommandTypeHandlers values 1, 2, 3.
7628  AMPS_CALL_EXCEPTION_WRAPPER_2(me,
7629  me->_globalCommandTypeHandlers[1 + (commandType / 8192)].invoke(message));
7630 #endif
7631  AMPS_CALL_EXCEPTION_WRAPPER_2(me, me->_routes.deliverData(message,
7632  message.getQueryID()));
7633  }
7634  else if (commandType & PublishMask)
7635  {
7636 #if 0 // Not currently implemented, to avoid an extra branch in delivery
7637  AMPS_CALL_EXCEPTION_WRAPPER_2(me,
7638  me->_globalCommandTypeHandlers[(commandType == Message::Command::Publish ?
7639  GlobalCommandTypeHandlers::Publish :
7640  GlobalCommandTypeHandlers::OOF)].invoke(message));
7641 #endif
7642  const char* subIds = NULL;
7643  size_t subIdsLen = 0;
7644  // Publish command, send to subscriptions
7645  amps_message_get_field_value(messageHandle_, AMPS_SubscriptionIds,
7646  &subIds, &subIdsLen);
7647  size_t subIdCount = me->_routes.parseRoutes(AMPS::Field(subIds, subIdsLen), me->_routeCache);
7648  for (size_t i = 0; i < subIdCount; ++i)
7649  {
7650  MessageRouter::RouteCache::value_type& lookupResult = me->_routeCache[i];
7651  MessageHandler& handler = lookupResult.handler;
7652  if (handler.isValid())
7653  {
7654  amps_message_set_field_value(messageHandle_,
7655  AMPS_SubscriptionId,
7656  subIds + lookupResult.idOffset,
7657  lookupResult.idLength);
7658  Message::Field bookmark = message.getBookmark();
7659  bool isMessageQueue = message.getLeasePeriod().len() != 0;
7660  bool isAutoAck = me->_isAutoAckEnabled;
7661 
7662  if (!isMessageQueue && !bookmark.empty() &&
7663  me->_bookmarkStore.isValid())
7664  {
7665  if (me->_bookmarkStore.isDiscarded(me->_readMessage))
7666  {
7667  //Call duplicate message handler in handlers map
7668  if (me->_globalCommandTypeHandlers[GlobalCommandTypeHandlers::DuplicateMessage].isValid())
7669  {
7670  AMPS_CALL_EXCEPTION_WRAPPER_2(me, me->_globalCommandTypeHandlers[GlobalCommandTypeHandlers::DuplicateMessage].invoke(message));
7671  }
7672  }
7673  else
7674  {
7675  me->_bookmarkStore.log(me->_readMessage);
7676  AMPS_CALL_EXCEPTION_WRAPPER_2(me,
7677  handler.invoke(message));
7678  }
7679  }
7680  else
7681  {
7682  if (isMessageQueue && isAutoAck)
7683  {
7684  try
7685  {
7686  AMPS_CALL_EXCEPTION_WRAPPER_STREAM_FULL_2(me, handler.invoke(message));
7687  if (!message.getIgnoreAutoAck())
7688  {
7689  AMPS_CALL_EXCEPTION_WRAPPER_2(me,
7690  me->_ack(message.getTopic(), message.getBookmark()));
7691  }
7692  }
7693  catch (std::exception& ex)
7694  {
7695  if (!message.getIgnoreAutoAck())
7696  {
7697  AMPS_CALL_EXCEPTION_WRAPPER_2(me,
7698  me->_ack(message.getTopic(), message.getBookmark(), "cancel"));
7699  }
7700  AMPS_UNHANDLED_EXCEPTION_2(me, ex);
7701  }
7702  }
7703  else
7704  {
7705  AMPS_CALL_EXCEPTION_WRAPPER_2(me,
7706  handler.invoke(message));
7707  }
7708  }
7709  }
7710  else
7711  {
7712  me->lastChance(message);
7713  }
7714  } // for (subidsEnd)
7715  }
7716  else if (commandType == Message::Command::Ack)
7717  {
7718  AMPS_CALL_EXCEPTION_WRAPPER_2(me,
7719  me->_globalCommandTypeHandlers[GlobalCommandTypeHandlers::Ack].invoke(message));
7720  unsigned ackType = message.getAckTypeEnum();
7721  unsigned deliveries = 0U;
7722  switch (ackType)
7723  {
7724  case Message::AckType::Persisted:
7725  deliveries += me->persistedAck(message);
7726  break;
7727  case Message::AckType::Processed: // processed
7728  deliveries += me->processedAck(message);
7729  break;
7730  }
7731  AMPS_CALL_EXCEPTION_WRAPPER_2(me, deliveries += me->_routes.deliverAck(message, ackType));
7732  if (deliveries == 0)
7733  {
7734  me->lastChance(message);
7735  }
7736  }
7737  else if (commandType == Message::Command::Heartbeat)
7738  {
7739  AMPS_CALL_EXCEPTION_WRAPPER_2(me,
7740  me->_globalCommandTypeHandlers[GlobalCommandTypeHandlers::Heartbeat].invoke(message));
7741  if (me->_heartbeatTimer.getTimeout() != 0.0) // -V550
7742  {
7743  me->checkAndSendHeartbeat(true);
7744  }
7745  else
7746  {
7747  me->lastChance(message);
7748  }
7749  return AMPS_E_OK;
7750  }
7751  else if (!message.getCommandId().empty())
7752  {
7753  unsigned deliveries = 0U;
7754  try
7755  {
7756  while (me->_connected) // Keep sending heartbeats when stream is full
7757  {
7758  try
7759  {
7760  deliveries = me->_routes.deliverData(message, message.getCommandId());
7761  break;
7762  }
7763 #ifdef _WIN32
7764  catch (MessageStreamFullException&)
7765 #else
7766  catch (MessageStreamFullException& ex_)
7767 #endif
7768  {
7769  me->checkAndSendHeartbeat(false);
7770  }
7771  }
7772  }
7773  catch (std::exception& ex_)
7774  {
7775  try
7776  {
7777  me->_exceptionListener->exceptionThrown(ex_);
7778  }
7779  catch (...)
7780  {
7781  ;
7782  }
7783  }
7784  if (deliveries == 0)
7785  {
7786  me->lastChance(message);
7787  }
7788  }
7789  me->checkAndSendHeartbeat();
7790  return AMPS_E_OK;
7791  }
7792 
7793  inline void
7794  ClientImpl::ClientImplPreDisconnectHandler(amps_handle /*client*/, unsigned failedConnectionVersion, void* userData)
7795  {
7796  ClientImpl* me = (ClientImpl*) userData;
7797  //Client wrapper(me);
7798  // Go ahead and signal any waiters if they are around...
7799  me->clearAcks(failedConnectionVersion);
7800  }
7801 
7802  inline amps_result
7803  ClientImpl::ClientImplDisconnectHandler(amps_handle /*client*/, void* userData)
7804  {
7805  ClientImpl* me = (ClientImpl*) userData;
7806  Lock<Mutex> l(me->_lock);
7807  Client wrapper(me, false);
7808  if (me->_connected)
7809  {
7810  me->broadcastConnectionStateChanged(ConnectionStateListener::Disconnected);
7811  }
7812  while (true)
7813  {
7814  AtomicFlagFlip subFlip(&me->_badTimeToHASubscribe);
7815  try
7816  {
7817  me->_connected = false;
7818  {
7819  // Have to release the lock here or receive thread can't
7820  // invoke the message handler.
7821  Unlock<Mutex> unlock(me->_lock);
7822  me->_disconnectHandler.invoke(wrapper);
7823  }
7824  }
7825  catch (const std::exception& ex)
7826  {
7827  AMPS_UNHANDLED_EXCEPTION_2(me, ex);
7828  }
7829  me->_lock.signalAll();
7830 
7831  if (!me->_connected)
7832  {
7833  me->broadcastConnectionStateChanged(ConnectionStateListener::Shutdown);
7834  AMPS_UNHANDLED_EXCEPTION_2(me, DisconnectedException("Reconnect failed."));
7835  return AMPS_E_DISCONNECTED;
7836  }
7837  try
7838  {
7839  // Resubscribe
7840  if (me->_subscriptionManager)
7841  {
7842  {
7843  // Have to release the lock here or receive thread can't
7844  // invoke the message handler.
7845  Unlock<Mutex> unlock(me->_lock);
7846  me->_subscriptionManager->resubscribe(wrapper);
7847  }
7848  me->broadcastConnectionStateChanged(ConnectionStateListener::Resubscribed);
7849  }
7850  return AMPS_E_OK;
7851  }
7852  catch (const AMPSException& subEx)
7853  {
7854  AMPS_UNHANDLED_EXCEPTION_2(me, subEx);
7855  }
7856  catch (const std::exception& subEx)
7857  {
7858  AMPS_UNHANDLED_EXCEPTION_2(me, subEx);
7859  return AMPS_E_RETRY;
7860  }
7861  catch (...)
7862  {
7863  return AMPS_E_RETRY;
7864  }
7865  }
7866  return AMPS_E_RETRY;
7867  }
7868 
7869  class FIX
7870  {
7871  const char* _data;
7872  size_t _len;
7873  char _fieldSep;
7874  public:
7875  class iterator
7876  {
7877  const char* _data;
7878  size_t _len;
7879  size_t _pos;
7880  char _fieldSep;
7881  iterator(const char* data_, size_t len_, size_t pos_, char fieldSep_)
7882  : _data(data_), _len(len_), _pos(pos_), _fieldSep(fieldSep_)
7883  {
7884  while (_pos != _len && _data[_pos] == _fieldSep)
7885  {
7886  ++_pos;
7887  }
7888  }
7889  public:
7890  typedef void* difference_type;
7891  typedef std::forward_iterator_tag iterator_category;
7892  typedef std::pair<Message::Field, Message::Field> value_type;
7893  typedef value_type* pointer;
7894  typedef value_type& reference;
7895  bool operator==(const iterator& rhs) const
7896  {
7897  return _pos == rhs._pos;
7898  }
7899  bool operator!=(const iterator& rhs) const
7900  {
7901  return _pos != rhs._pos;
7902  }
7903  iterator& operator++()
7904  {
7905  // Skip through the data
7906  while (_pos != _len && _data[_pos] != _fieldSep)
7907  {
7908  ++_pos;
7909  }
7910  // Skip through any field separators
7911  while (_pos != _len && _data[_pos] == _fieldSep)
7912  {
7913  ++_pos;
7914  }
7915  return *this;
7916  }
7917 
7918  value_type operator*() const
7919  {
7920  value_type result;
7921  size_t i = _pos, keyLength = 0, valueStart = 0, valueLength = 0;
7922  for (; i < _len && _data[i] != '='; ++i)
7923  {
7924  ++keyLength;
7925  }
7926 
7927  result.first.assign(_data + _pos, keyLength);
7928 
7929  if (i < _len && _data[i] == '=')
7930  {
7931  ++i;
7932  valueStart = i;
7933  for (; i < _len && _data[i] != _fieldSep; ++i)
7934  {
7935  valueLength++;
7936  }
7937  }
7938  result.second.assign(_data + valueStart, valueLength);
7939  return result;
7940  }
7941 
7942  friend class FIX;
7943  };
7944  class reverse_iterator
7945  {
7946  const char* _data;
7947  size_t _len;
7948  const char* _pos;
7949  char _fieldSep;
7950  public:
7951  typedef std::pair<Message::Field, Message::Field> value_type;
7952  reverse_iterator(const char* data, size_t len, const char* pos, char fieldsep)
7953  : _data(data), _len(len), _pos(pos), _fieldSep(fieldsep)
7954  {
7955  if (_pos)
7956  {
7957  // skip past meaningless trailing fieldseps
7958  while (_pos >= _data && *_pos == _fieldSep)
7959  {
7960  --_pos;
7961  }
7962  while (_pos > _data && *_pos != _fieldSep)
7963  {
7964  --_pos;
7965  }
7966  // if we stopped before the 0th character, it's because
7967  // it's a field sep. advance one to point to the first character
7968  // of a key.
7969  if (_pos > _data || (_pos == _data && *_pos == _fieldSep))
7970  {
7971  ++_pos;
7972  }
7973  if (_pos < _data)
7974  {
7975  _pos = 0;
7976  }
7977  }
7978  }
7979  bool operator==(const reverse_iterator& rhs) const
7980  {
7981  return _pos == rhs._pos;
7982  }
7983  bool operator!=(const reverse_iterator& rhs) const
7984  {
7985  return _pos != rhs._pos;
7986  }
7987  reverse_iterator& operator++()
7988  {
7989  if (_pos == _data)
7990  {
7991  _pos = 0;
7992  }
7993  else
7994  {
7995  // back up 1 to a field separator
7996  --_pos;
7997  // keep backing up through field separators
7998  while (_pos >= _data && *_pos == _fieldSep)
7999  {
8000  --_pos;
8001  }
8002  // now back up to the beginning of this field
8003  while (_pos > _data && *_pos != _fieldSep)
8004  {
8005  --_pos;
8006  }
8007  if (_pos > _data || (_pos == _data && *_pos == _fieldSep))
8008  {
8009  ++_pos;
8010  }
8011  if (_pos < _data)
8012  {
8013  _pos = 0;
8014  }
8015  }
8016  return *this;
8017  }
8018  value_type operator*() const
8019  {
8020  value_type result;
8021  size_t keyLength = 0, valueStart = 0, valueLength = 0;
8022  size_t i = (size_t)(_pos - _data);
8023  for (; i < _len && _data[i] != '='; ++i)
8024  {
8025  ++keyLength;
8026  }
8027  result.first.assign(_pos, keyLength);
8028  if (i < _len && _data[i] == '=')
8029  {
8030  ++i;
8031  valueStart = i;
8032  for (; i < _len && _data[i] != _fieldSep; ++i)
8033  {
8034  valueLength++;
8035  }
8036  }
8037  result.second.assign(_data + valueStart, valueLength);
8038  return result;
8039  }
8040  };
8041  FIX(const Message::Field& data, char fieldSeparator = 1)
8042  : _data(data.data()), _len(data.len()),
8043  _fieldSep(fieldSeparator)
8044  {
8045  }
8046 
8047  FIX(const char* data, size_t len, char fieldSeparator = 1)
8048  : _data(data), _len(len), _fieldSep(fieldSeparator)
8049  {
8050  }
8051 
8052  iterator begin() const
8053  {
8054  return iterator(_data, _len, 0, _fieldSep);
8055  }
8056  iterator end() const
8057  {
8058  return iterator(_data, _len, _len, _fieldSep);
8059  }
8060 
8061 
8062  reverse_iterator rbegin() const
8063  {
8064  return reverse_iterator(_data, _len, _data + (_len - 1), _fieldSep);
8065  }
8066 
8067  reverse_iterator rend() const
8068  {
8069  return reverse_iterator(_data, _len, 0, _fieldSep);
8070  }
8071  };
8072 
8073 
8086 
8087  template <class T>
8089  {
8090  std::stringstream _data;
8091  char _fs;
8092  public:
8098  _FIXBuilder(char fieldSep_ = (char)1) : _fs(fieldSep_) {;}
8099 
8107  void append(const T& tag, const char* value, size_t offset, size_t length)
8108  {
8109  _data << tag << '=';
8110  _data.write(value + offset, (std::streamsize)length);
8111  _data << _fs;
8112  }
8118  void append(const T& tag, const std::string& value)
8119  {
8120  _data << tag << '=' << value << _fs;
8121  }
8122 
8125  std::string getString() const
8126  {
8127  return _data.str();
8128  }
8129  operator std::string() const
8130  {
8131  return _data.str();
8132  }
8133 
8135  void reset()
8136  {
8137  _data.str(std::string());
8138  }
8139  };
8140 
8144 
8146 
8150 
8152 
8153 
8161 
8163  {
8164  char _fs;
8165  public:
8170  FIXShredder(char fieldSep_ = (char)1) : _fs(fieldSep_) {;}
8171 
8174  typedef std::map<Message::Field, Message::Field> map_type;
8175 
8181  map_type toMap(const Message::Field& data)
8182  {
8183  FIX fix(data, _fs);
8184  map_type retval;
8185  for (FIX::iterator a = fix.begin(); a != fix.end(); ++a)
8186  {
8187  retval.insert(*a);
8188  }
8189 
8190  return retval;
8191  }
8192  };
8193 
8194 #define AMPS_MESSAGE_STREAM_CACHE_MAX 128
8195  class MessageStreamImpl : public AMPS::RefBody, AMPS::ConnectionStateListener
8196  {
8197  Mutex _lock;
8198  std::deque<Message> _q;
8199  std::deque<Message> _cache;
8200  std::string _commandId;
8201  std::string _subId;
8202  std::string _queryId;
8203  Client _client;
8204  unsigned _timeout;
8205  unsigned _maxDepth;
8206  unsigned _requestedAcks;
8207  size_t _cacheMax;
8208  Message::Field _previousTopic;
8209  Message::Field _previousBookmark;
8210  typedef enum : unsigned int { Unset = 0x0, Running = 0x10, Subscribe = 0x11, SOWOnly = 0x12, AcksOnly = 0x13, Conflate = 0x14, Closed = 0x1, Disconnected = 0x2 } State;
8211 #if __cplusplus >= 201100L || _MSC_VER >= 1900
8212  std::atomic<State> _state;
8213 #else
8214  volatile State _state;
8215 #endif
8216  typedef std::map<std::string, Message*> SOWKeyMap;
8217  SOWKeyMap _sowKeyMap;
8218  public:
8219  MessageStreamImpl(const Client& client_)
8220  : _client(client_),
8221  _timeout(0),
8222  _maxDepth((unsigned)~0),
8223  _requestedAcks(0),
8224  _cacheMax(AMPS_MESSAGE_STREAM_CACHE_MAX),
8225  _state(Unset)
8226  {
8227  if (_client.isValid())
8228  {
8229  _client.addConnectionStateListener(this);
8230  }
8231  }
8232 
8233  MessageStreamImpl(ClientImpl* client_)
8234  : _client(client_),
8235  _timeout(0),
8236  _maxDepth((unsigned)~0),
8237  _requestedAcks(0),
8238  _state(Unset)
8239  {
8240  if (_client.isValid())
8241  {
8242  _client.addConnectionStateListener(this);
8243  }
8244  }
8245 
8246  ~MessageStreamImpl()
8247  {
8248  }
8249 
8250  virtual void destroy()
8251  {
8252  try
8253  {
8254  close();
8255  }
8256  catch (std::exception& e)
8257  {
8258  try
8259  {
8260  if (_client.isValid())
8261  {
8262  _client.getExceptionListener().exceptionThrown(e);
8263  }
8264  }
8265  catch (...) {/*Ignore exception listener exceptions*/} // -V565
8266  }
8267  if (_client.isValid())
8268  {
8269  _client.removeConnectionStateListener(this);
8270  Client c = _client;
8271  _client = Client((ClientImpl*)NULL);
8272  c.deferredExecution(MessageStreamImpl::destroyer, this);
8273  }
8274  else
8275  {
8276  delete this;
8277  }
8278  }
8279 
8280  static void destroyer(void* vpMessageStreamImpl_)
8281  {
8282  delete ((MessageStreamImpl*)vpMessageStreamImpl_);
8283  }
8284 
8285  void setSubscription(const std::string& subId_,
8286  const std::string& commandId_ = "",
8287  const std::string& queryId_ = "")
8288  {
8289  Lock<Mutex> lock(_lock);
8290  _subId = subId_;
8291  if (!commandId_.empty() && commandId_ != subId_)
8292  {
8293  _commandId = commandId_;
8294  }
8295  if (!queryId_.empty() && queryId_ != subId_ && queryId_ != commandId_)
8296  {
8297  _queryId = queryId_;
8298  }
8299  // It's possible to disconnect between creation/registration and here.
8300  if (Disconnected == _state)
8301  {
8302  return;
8303  }
8304  assert(Unset == _state);
8305  _state = Subscribe;
8306  }
8307 
8308  void setSOWOnly(const std::string& commandId_,
8309  const std::string& queryId_ = "")
8310  {
8311  Lock<Mutex> lock(_lock);
8312  _commandId = commandId_;
8313  if (!queryId_.empty() && queryId_ != commandId_)
8314  {
8315  _queryId = queryId_;
8316  }
8317  // It's possible to disconnect between creation/registration and here.
8318  if (Disconnected == _state)
8319  {
8320  return;
8321  }
8322  assert(Unset == _state);
8323  _state = SOWOnly;
8324  }
8325 
8326  void setStatsOnly(const std::string& commandId_,
8327  const std::string& queryId_ = "")
8328  {
8329  Lock<Mutex> lock(_lock);
8330  _commandId = commandId_;
8331  if (!queryId_.empty() && queryId_ != commandId_)
8332  {
8333  _queryId = queryId_;
8334  }
8335  // It's possible to disconnect between creation/registration and here.
8336  if (Disconnected == _state)
8337  {
8338  return;
8339  }
8340  assert(Unset == _state);
8341  _state = AcksOnly;
8342  _requestedAcks = Message::AckType::Stats;
8343  }
8344 
8345  void setAcksOnly(const std::string& commandId_, unsigned acks_)
8346  {
8347  Lock<Mutex> lock(_lock);
8348  _commandId = commandId_;
8349  // It's possible to disconnect between creation/registration and here.
8350  if (Disconnected == _state)
8351  {
8352  return;
8353  }
8354  assert(Unset == _state);
8355  _state = AcksOnly;
8356  _requestedAcks = acks_;
8357  }
8358 
8359  void connectionStateChanged(ConnectionStateListener::State state_)
8360  {
8361  Lock<Mutex> lock(_lock);
8362  if (state_ == AMPS::ConnectionStateListener::Disconnected)
8363  {
8364  _state = Disconnected;
8365  close();
8366  }
8367  _lock.signalAll();
8368  }
8369 
8370  void timeout(unsigned timeout_)
8371  {
8372  _timeout = timeout_;
8373  }
8374  void conflate(void)
8375  {
8376  if (_state == Subscribe)
8377  {
8378  _state = Conflate;
8379  }
8380  }
8381  void maxDepth(unsigned maxDepth_)
8382  {
8383  if (maxDepth_)
8384  {
8385  _maxDepth = maxDepth_;
8386  }
8387  else
8388  {
8389  _maxDepth = (unsigned)~0;
8390  }
8391  }
8392  unsigned getMaxDepth(void) const
8393  {
8394  return _maxDepth;
8395  }
8396  unsigned getDepth(void) const
8397  {
8398  return (unsigned)(_q.size());
8399  }
8400 
8401  bool next(Message& current_)
8402  {
8403  Lock<Mutex> lock(_lock);
8404  if (!_previousTopic.empty() && !_previousBookmark.empty())
8405  {
8406  try
8407  {
8408  if (_client.isValid())
8409  {
8410  _client.ackDeferredAutoAck(_previousTopic, _previousBookmark);
8411  }
8412  }
8413 #ifdef _WIN32
8414  catch (AMPSException&)
8415 #else
8416  catch (AMPSException& e)
8417 #endif
8418  {
8419  current_.invalidate();
8420  _previousTopic.clear();
8421  _previousBookmark.clear();
8422  return false;
8423  }
8424  _previousTopic.clear();
8425  _previousBookmark.clear();
8426  }
8427  double minWaitTime = (double)((_timeout && _timeout > 1000)
8428  ? _timeout : 1000);
8429  Timer timer(minWaitTime);
8430  timer.start();
8431  while (_q.empty() && _state & Running)
8432  {
8433  // Using timeout so python can interrupt
8434  _lock.wait((long)minWaitTime);
8435  {
8436  Unlock<Mutex> unlck(_lock);
8437  amps_invoke_waiting_function();
8438  }
8439  if (_timeout)
8440  {
8441  // In case we woke up early, see how much longer to wait
8442  if (timer.checkAndGetRemaining(&minWaitTime))
8443  {
8444  break;
8445  }
8446  }
8447  }
8448  if (current_.isValid() && _cache.size() < _cacheMax)
8449  {
8450  current_.reset();
8451  _cache.push_back(current_);
8452  }
8453  if (!_q.empty())
8454  {
8455  current_ = _q.front();
8456  if (_q.size() == _maxDepth)
8457  {
8458  _lock.signalAll();
8459  }
8460  _q.pop_front();
8461  if (_state == Conflate)
8462  {
8463  std::string sowKey = current_.getSowKey();
8464  if (sowKey.length())
8465  {
8466  _sowKeyMap.erase(sowKey);
8467  }
8468  }
8469  else if (_state == AcksOnly)
8470  {
8471  _requestedAcks &= ~(current_.getAckTypeEnum());
8472  }
8473  if ((_state == AcksOnly && _requestedAcks == 0) ||
8474  (_state == SOWOnly && current_.getCommand() == "group_end"))
8475  {
8476  _state = Closed;
8477  }
8478  else if (current_.getCommandEnum() == Message::Command::Publish &&
8479  _client.isValid() && _client.getAutoAck() &&
8480  !current_.getLeasePeriod().empty() &&
8481  !current_.getBookmark().empty())
8482  {
8483  _previousTopic = current_.getTopic().deepCopy();
8484  _previousBookmark = current_.getBookmark().deepCopy();
8485  }
8486  return true;
8487  }
8488  if (_state == Disconnected)
8489  {
8490  throw DisconnectedException("Connection closed.");
8491  }
8492  current_.invalidate();
8493  if (_state == Closed)
8494  {
8495  return false;
8496  }
8497  return _timeout != 0;
8498  }
8499  void close(void)
8500  {
8501  if (_client.isValid())
8502  {
8503  if (_state == SOWOnly || _state == Subscribe) //not delete
8504  {
8505  if (!_commandId.empty())
8506  {
8507  _client.unsubscribe(_commandId);
8508  }
8509  if (!_subId.empty())
8510  {
8511  _client.unsubscribe(_subId);
8512  }
8513  if (!_queryId.empty())
8514  {
8515  _client.unsubscribe(_queryId);
8516  }
8517  }
8518  else
8519  {
8520  if (!_commandId.empty())
8521  {
8522  _client.removeMessageHandler(_commandId);
8523  }
8524  if (!_subId.empty())
8525  {
8526  _client.removeMessageHandler(_subId);
8527  }
8528  if (!_queryId.empty())
8529  {
8530  _client.removeMessageHandler(_queryId);
8531  }
8532  }
8533  }
8534  if (_state == SOWOnly || _state == Subscribe || _state == Unset)
8535  {
8536  _state = Closed;
8537  }
8538  }
8539  static void _messageHandler(const Message& message_, MessageStreamImpl* this_)
8540  {
8541  Lock<Mutex> lock(this_->_lock);
8542  if (this_->_state != Conflate)
8543  {
8544  AMPS_TESTING_SLOW_MESSAGE_STREAM
8545  if (this_->_q.size() >= this_->_maxDepth)
8546  {
8547  // We throw here so that heartbeats can be sent. The exception
8548  // will be handled internally only, and the same Message will
8549  // come back to try again. Make sure to signal.
8550  this_->_lock.signalAll();
8551  throw MessageStreamFullException("Stream is currently full.");
8552  }
8553  if (!this_->_cache.empty())
8554  {
8555  this_->_cache.front().deepCopy(message_);
8556  this_->_q.push_back(this_->_cache.front());
8557  this_->_cache.pop_front();
8558  }
8559  else
8560  {
8561 #ifdef AMPS_USE_EMPLACE
8562  this_->_q.emplace_back(message_.deepCopy());
8563 #else
8564  this_->_q.push_back(message_.deepCopy());
8565 #endif
8566  }
8567  if (message_.getCommandEnum() == Message::Command::Publish &&
8568  this_->_client.isValid() && this_->_client.getAutoAck() &&
8569  !message_.getLeasePeriod().empty() &&
8570  !message_.getBookmark().empty())
8571  {
8572  message_.setIgnoreAutoAck();
8573  }
8574  }
8575  else
8576  {
8577  std::string sowKey = message_.getSowKey();
8578  if (sowKey.length())
8579  {
8580  SOWKeyMap::iterator it = this_->_sowKeyMap.find(sowKey);
8581  if (it != this_->_sowKeyMap.end())
8582  {
8583  it->second->deepCopy(message_);
8584  }
8585  else
8586  {
8587  if (this_->_q.size() >= this_->_maxDepth)
8588  {
8589  // We throw here so that heartbeats can be sent. The
8590  // exception will be handled internally only, and the
8591  // same Message will come back to try again. Make sure
8592  // to signal.
8593  this_->_lock.signalAll();
8594  throw MessageStreamFullException("Stream is currently full.");
8595  }
8596  if (!this_->_cache.empty())
8597  {
8598  this_->_cache.front().deepCopy(message_);
8599  this_->_q.push_back(this_->_cache.front());
8600  this_->_cache.pop_front();
8601  }
8602  else
8603  {
8604 #ifdef AMPS_USE_EMPLACE
8605  this_->_q.emplace_back(message_.deepCopy());
8606 #else
8607  this_->_q.push_back(message_.deepCopy());
8608 #endif
8609  }
8610  this_->_sowKeyMap[sowKey] = &(this_->_q.back());
8611  }
8612  }
8613  else
8614  {
8615  if (this_->_q.size() >= this_->_maxDepth)
8616  {
8617  // We throw here so that heartbeats can be sent. The exception
8618  // will be handled internally only, and the same Message will
8619  // come back to try again. Make sure to signal.
8620  this_->_lock.signalAll();
8621  throw MessageStreamFullException("Stream is currently full.");
8622  }
8623  if (!this_->_cache.empty())
8624  {
8625  this_->_cache.front().deepCopy(message_);
8626  this_->_q.push_back(this_->_cache.front());
8627  this_->_cache.pop_front();
8628  }
8629  else
8630  {
8631 #ifdef AMPS_USE_EMPLACE
8632  this_->_q.emplace_back(message_.deepCopy());
8633 #else
8634  this_->_q.push_back(message_.deepCopy());
8635 #endif
8636  }
8637  if (message_.getCommandEnum() == Message::Command::Publish &&
8638  this_->_client.isValid() && this_->_client.getAutoAck() &&
8639  !message_.getLeasePeriod().empty() &&
8640  !message_.getBookmark().empty())
8641  {
8642  message_.setIgnoreAutoAck();
8643  }
8644  }
8645  }
8646  this_->_lock.signalAll();
8647  }
8648  };
8649  inline MessageStream::MessageStream(void)
8650  {
8651  }
8652  inline MessageStream::MessageStream(const Client& client_)
8653  : _body(new MessageStreamImpl(client_))
8654  {
8655  }
8656  inline void MessageStream::iterator::advance(void)
8657  {
8658  _pStream = _pStream->_body->next(_current) ? _pStream : NULL;
8659  }
8660  inline MessageStream::operator MessageHandler(void)
8661  {
8662  return MessageHandler((void(*)(const Message&, void*))MessageStreamImpl::_messageHandler, &_body.get());
8663  }
8664  inline MessageStream MessageStream::fromExistingHandler(const MessageHandler& handler_)
8665  {
8666  MessageStream result;
8667  if (handler_._func == (MessageHandler::FunctionType)MessageStreamImpl::_messageHandler)
8668  {
8669  result._body = (MessageStreamImpl*)(handler_._userData);
8670  }
8671  return result;
8672  }
8673 
8674  inline void MessageStream::setSOWOnly(const std::string& commandId_,
8675  const std::string& queryId_)
8676  {
8677  _body->setSOWOnly(commandId_, queryId_);
8678  }
8679  inline void MessageStream::setSubscription(const std::string& subId_,
8680  const std::string& commandId_,
8681  const std::string& queryId_)
8682  {
8683  _body->setSubscription(subId_, commandId_, queryId_);
8684  }
8685  inline void MessageStream::setStatsOnly(const std::string& commandId_,
8686  const std::string& queryId_)
8687  {
8688  _body->setStatsOnly(commandId_, queryId_);
8689  }
8690  inline void MessageStream::setAcksOnly(const std::string& commandId_,
8691  unsigned acks_)
8692  {
8693  _body->setAcksOnly(commandId_, acks_);
8694  }
8695  inline MessageStream MessageStream::timeout(unsigned timeout_)
8696  {
8697  _body->timeout(timeout_);
8698  return *this;
8699  }
8701  {
8702  _body->conflate();
8703  return *this;
8704  }
8705  inline MessageStream MessageStream::maxDepth(unsigned maxDepth_)
8706  {
8707  _body->maxDepth(maxDepth_);
8708  return *this;
8709  }
8710  inline unsigned MessageStream::getMaxDepth(void) const
8711  {
8712  return _body->getMaxDepth();
8713  }
8714  inline unsigned MessageStream::getDepth(void) const
8715  {
8716  return _body->getDepth();
8717  }
8718 
8719  inline MessageStream ClientImpl::getEmptyMessageStream(void)
8720  {
8721  return *(_pEmptyMessageStream.get());
8722  }
8723 
8725  {
8726  // If the command is sow and has a sub_id, OR
8727  // if the command has a replace option, return the existing
8728  // messagestream, don't create a new one.
8729  ClientImpl& body = _body.get();
8730  Message& message = command_.getMessage();
8731  Field subId = message.getSubscriptionId();
8732  unsigned ackTypes = message.getAckTypeEnum();
8733  bool useExistingHandler = !subId.empty() && ((!message.getOptions().empty() && message.getOptions().contains("replace", 7)) || message.getCommandEnum() == Message::Command::SOW);
8734  if (useExistingHandler)
8735  {
8736  // Try to find the existing message handler.
8737  if (!subId.empty())
8738  {
8739  MessageHandler existingHandler;
8740  if (body._routes.getRoute(subId, existingHandler))
8741  {
8742  // we found an existing handler. It might not be a message stream, but that's okay.
8743  body.executeAsync(command_, existingHandler, false);
8744  return MessageStream::fromExistingHandler(existingHandler);
8745  }
8746  }
8747  // fall through; we'll a new handler altogether.
8748  }
8749  // Make sure something will be returned to the stream or use the empty one
8750  // Check that: it's a command that doesn't normally return data, and there
8751  // are no acks requested for the cmd id
8752  Message::Command::Type command = message.getCommandEnum();
8753  if ((command & Message::Command::NoDataCommands)
8754  && (ackTypes == Message::AckType::Persisted
8755  || ackTypes == Message::AckType::None))
8756  {
8757  executeAsync(command_, MessageHandler());
8758  if (!body._pEmptyMessageStream)
8759  {
8760  body._pEmptyMessageStream.reset(new MessageStream((ClientImpl*)0));
8761  body._pEmptyMessageStream.get()->_body->close();
8762  }
8763  return body.getEmptyMessageStream();
8764  }
8765  MessageStream stream(*this);
8766  if (body.getDefaultMaxDepth())
8767  {
8768  stream.maxDepth(body.getDefaultMaxDepth());
8769  }
8770  MessageHandler handler = stream.operator MessageHandler();
8771  std::string commandID = body.executeAsync(command_, handler, false);
8772  if (command_.hasStatsAck())
8773  {
8774  stream.setStatsOnly(commandID, command_.getMessage().getQueryId());
8775  }
8776  else if (command_.isSow())
8777  {
8778  stream.setSOWOnly(commandID, command_.getMessage().getQueryId());
8779  }
8780  else if (command_.isSubscribe())
8781  {
8782  stream.setSubscription(commandID,
8783  command_.getMessage().getCommandId(),
8784  command_.getMessage().getQueryId());
8785  }
8786  else
8787  {
8788  // Persisted acks for writes don't come back with command id
8789  if (command == Message::Command::Publish ||
8790  command == Message::Command::DeltaPublish ||
8791  command == Message::Command::SOWDelete)
8792  {
8793  stream.setAcksOnly(commandID,
8794  ackTypes & (unsigned)~Message::AckType::Persisted);
8795  }
8796  else
8797  {
8798  stream.setAcksOnly(commandID, ackTypes);
8799  }
8800  }
8801  return stream;
8802  }
8803 
8804 // This is here because it uses api from Client.
8805  inline void Message::ack(const char* options_) const
8806  {
8807  ClientImpl* pClient = _body.get().clientImpl();
8808  Message::Field bookmark = getBookmark();
8809  if (pClient && bookmark.len() &&
8810  !pClient->getAutoAck())
8811  //(!pClient->getAutoAck() || getIgnoreAutoAck()))
8812  {
8813  pClient->ack(getTopic(), bookmark, options_);
8814  }
8815  }
8816 }// end namespace AMPS
8817 #endif
Command & setBookmark(const std::string &bookmark_)
Set the bookmark to be used this command.
Definition: ampsplusplus.hpp:717
Command & setFilter(const char *filter_, size_t filterLen_)
Definition: ampsplusplus.hpp:668
Class to hold string versions of failure reasons.
Definition: ampsplusplus.hpp:183
Message & setData(const std::string &v_)
Sets the data portion of self.
Definition: Message.hpp:1476
Core type and function declarations for the AMPS C client.
Client(const std::string &clientName="")
Constructs a new client with a given client name.
Definition: ampsplusplus.hpp:5092
Field getUserId() const
Retrieves the value of the UserId header of the Message as a new Field.
Definition: Message.hpp:1453
std::string sowDeleteByKeys(const MessageHandler &messageHandler_, const std::string &topic_, const std::string &keys_, long timeout_=0)
Deletes messages that match SOW keys from a topic&#39;s SOW cache.
Definition: ampsplusplus.hpp:6753
std::string stopTimer(const MessageHandler &messageHandler)
Definition: ampsplusplus.hpp:6727
static const unsigned int IdentifierLength
The length of identifiers used for unique identification of commands and subscriptions.
Definition: Message.hpp:542
std::string getAckType() const
Definition: ampsplusplus.hpp:915
AMPSDLL void amps_client_set_disconnect_handler(amps_handle client, amps_handler disconnectHandler, void *userData)
Sets the disconnect handler function to be called when a disconnect occurs.
bool removeMessageHandler(const Field &commandId_)
Removes a MessageHandler for a given ComandId from self.
Definition: ampsplusplus.hpp:5304
Message & assignTopic(const std::string &v)
Sets the value of the Topic header for this Message.
Definition: Message.hpp:1449
Provides a convenient way of building messages in FIX format, typically referenced using the typedefs...
Definition: ampsplusplus.hpp:8088
void startTimer()
Definition: ampsplusplus.hpp:6716
MessageStream sowAndSubscribe(const std::string &topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, bool oofEnabled_=false, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic and initiates a new subscription on it.
Definition: ampsplusplus.hpp:6258
Abstract base class for storing published messages for an HA publisher client.
Definition: ampsplusplus.hpp:1059
MessageStream conflate(void)
Sets self to conflation mode, where a new update for a matching sow key will replace the previous one...
Definition: ampsplusplus.hpp:8700
Field getSequence() const
Retrieves the value of the Sequence header of the Message as a new Field.
Definition: Message.hpp:1422
std::string send(const MessageHandler &messageHandler, Message &message, int timeout=0)
Sends a Message to the connected AMPS server, performing only minimal validation and bypassing client...
Definition: ampsplusplus.hpp:5332
AMPSDLL amps_result amps_client_set_name(amps_handle handle, const amps_char *clientName)
Sets the name on an amps client object.
Message & setCorrelationId(const std::string &v)
Sets the value of the CorrelationId header for this Message.
Definition: Message.hpp:1304
Message & setQueryID(const std::string &v)
Sets the value of the QueryID header for this Message.
Definition: Message.hpp:1417
Command(const std::string &command_)
Creates an object to represent the given AMPS command, such as "sow" or "subscribe".
Definition: ampsplusplus.hpp:529
Command::Type getCommandEnum() const
Decode self&#39;s "command" field and return one of the values from Command.
Definition: Message.hpp:1228
Command & setBookmark(const char *bookmark_, size_t bookmarkLen_)
Set the bookmark to be used this command.
Definition: ampsplusplus.hpp:728
void setGlobalCommandTypeMessageHandler(const Message::Command::Type command_, const MessageHandler &handler_)
Sets a handler for all messages of a particular type: currently supported types are heartbeat message...
Definition: ampsplusplus.hpp:7007
Command & setAckType(unsigned ackType_)
Definition: ampsplusplus.hpp:893
void setServerVersion(size_t version_)
Internally used to set the server version so the store knows how to deal with persisted acks and call...
Definition: BookmarkStore.hpp:429
void setDefaultMaxDepth(unsigned maxDepth_)
Sets a default max depth on all subsequently created MessageStream objects.
Definition: ampsplusplus.hpp:7375
const amps_uint64_t getNameHashValue() const
Returns the numeric name hash of this client as generated by the server and returned when the client ...
Definition: ampsplusplus.hpp:5153
Field getSubscriptionId() const
Retrieves the value of the SubscriptionId header of the Message as a new Field.
Definition: Message.hpp:1427
AMPSDLL amps_result amps_client_connect(amps_handle handle, const amps_char *uri)
Connects to the AMPS server specified in uri.
static size_t convertVersionToNumber(const std::string &version_)
Converts a string version, such as "3.8.1.5" into the same numeric form used internally and returned ...
Definition: ampsplusplus.hpp:5209
void unsubscribe(const std::string &commandId)
Unsubscribe from a topic.
Definition: ampsplusplus.hpp:6047
Command & setOptions(const std::string &options_)
Sets the options string for this command: see Message.Options for a helper class for constructing the...
Definition: ampsplusplus.hpp:758
Message & setOrderBy(const std::string &v)
Sets the value of the OrderBy header for this Message.
Definition: Message.hpp:1415
Abstract base class for replaying a publish message.
Definition: ampsplusplus.hpp:1031
Command & setCommandId(const char *cmdId_, size_t cmdIdLen_)
Definition: ampsplusplus.hpp:642
void persisted(const Message::Field &subId_, const Message::Field &bookmark_)
Called internally to indicate messages up to and including bookmark are replicated to all replication...
Definition: BookmarkStore.hpp:405
unsigned getDefaultMaxDepth(void) const
Returns the default max depth for returned MessageStream objects.
Definition: ampsplusplus.hpp:7384
void ack(Message &message_, const char *options_=NULL)
Acknowledge a message queue message by supplying the message directly: this adds the ack to the curre...
Definition: ampsplusplus.hpp:7243
MessageStream bookmarkSubscribe(const std::string &topic_, long timeout_, const std::string &bookmark_, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Subscribe to a topic using a bookmark.
Definition: ampsplusplus.hpp:5998
AMPSDLL void amps_client_disconnect(amps_handle handle)
Disconnects from the AMPS server, if connected.
AMPSDLL AMPS_SOCKET amps_client_get_socket(amps_handle client)
Returns the socket from the underlying transport in client, or NULL if no transport is associated wit...
void discard(const Message::Field &subId_, size_t bookmarkSeqNo_)
Log a discard-bookmark entry to the persistent log based on a bookmark sequence number.
Definition: BookmarkStore.hpp:283
void setPublishStore(const Store &publishStore_)
Set the publish store to be used by the client.
Definition: ampsplusplus.hpp:5428
void setErrorOnPublishGap(bool errorOnPublishGap_)
Called to enable or disable throwing PublishStoreGapException.
Definition: ampsplusplus.hpp:1308
amps_uint64_t deltaPublish(const std::string &topic_, const std::string &data_, unsigned long expiration_)
Publish the changed fields of a message to an AMPS topic.
Definition: ampsplusplus.hpp:5693
static size_t convertVersionToNumber(const char *data_, size_t len_)
Converts a string version, such as "3.8.1.5" into the same numeric form used internally and returned ...
Definition: ampsplusplus.hpp:5224
Message & setFilter(const std::string &v)
Sets the value of the Filter header for this Message.
Definition: Message.hpp:1306
Command & setSubId(const char *subId_, size_t subIdLen_)
Definition: ampsplusplus.hpp:694
void setNoEmpties(void)
Set the option to not send empty messages on a delta subscription.
Definition: Message.hpp:841
void setDuplicateMessageHandler(const MessageHandler &duplicateMessageHandler_)
Sets a callback function that is invoked when a duplicate message is detected.
Definition: ampsplusplus.hpp:5444
Message deepCopy(void) const
Returns a deep copy of self.
Definition: Message.hpp:579
void removeConnectionStateListener(ConnectionStateListener *listener)
Attempts to remove listener from self&#39;s set of ConnectionStateListeners.
Definition: ampsplusplus.hpp:7098
void setOOF(void)
Set the option to receive out of focus (OOF) messages on a subscription, where applicable.
Definition: Message.hpp:824
int getAckTimeout(void) const
Returns the current value of the message queue ack timeout setting – that is, the amount of time aft...
Definition: ampsplusplus.hpp:7327
VersionInfo getServerVersionInfo() const
Returns the server version retrieved during logon.
Definition: ampsplusplus.hpp:5195
Message sowDeleteByKeys(const std::string &topic_, const std::string &keys_, long timeout_=0)
Deletes messages that match SOW keys from a topic&#39;s SOW cache.
Definition: ampsplusplus.hpp:6780
AMPSDLL amps_result amps_client_set_transport_filter_function(amps_handle client, amps_transport_filter_function filter, void *userData)
Sets a user-supplied callback function for filtering data before it is sent and after it is received...
Command(const char *command_, size_t commandLen_)
Creates an object to represent the given AMPS command, such as "sow" or "subscribe".
Definition: ampsplusplus.hpp:537
AMPSDLL amps_uint64_t amps_message_get_field_uint64(amps_handle message, FieldId field)
Gets the unsigned 64-bit int value of a header field in an AMPS message.
AMPSDLL amps_result amps_client_set_idle_time(amps_handle client, int idleTime)
Sets an idle-time (milliseconds).
Message encapsulates a single message sent to or received from an AMPS server, and provides methods f...
Definition: Message.hpp:531
Message & setAckTypeEnum(unsigned ackType_)
Encode self&#39;s "ack type" field from a bitmask of values from AckType.
Definition: Message.hpp:1183
amps_uint64_t getLastPersisted()
Get the last persisted message sequence in the store.
Definition: ampsplusplus.hpp:1279
std::string getString() const
Returns the current contents of this builder as a string.
Definition: ampsplusplus.hpp:8125
void setTransportFilterFunction(amps_transport_filter_function filter_, void *userData_)
Sets a filter function on the transport that is called with all raw data sent or received.
Definition: ampsplusplus.hpp:7395
amps_uint64_t publish(const char *topic_, size_t topicLength_, const char *data_, size_t dataLength_, unsigned long expiration_)
Publish a message to an AMPS topic, returning the sequence number assigned by the publish store if on...
Definition: ampsplusplus.hpp:5580
iterator begin(void)
Returns an iterator representing the beginning of the topic or subscription.
Definition: ampsplusplus.hpp:4996
Field getLeasePeriod() const
Retrieves the value of the LeasePeriod header of the Message as a new Field.
Definition: Message.hpp:1309
Command & setSowKeys(const char *sowKeys_, size_t sowKeysLen_)
Sets the SowKeys for the command.
Definition: ampsplusplus.hpp:629
static const char * NOW()
Convenience method for returning the special value to start a subscription at the end of the transact...
Definition: ampsplusplus.hpp:7026
Command & setAckType(const std::string &ackType_)
Definition: ampsplusplus.hpp:871
static const char * BOOKMARK_EPOCH()
Convenience method for returning the special value to start a subscription at the beginning of the tr...
Definition: ampsplusplus.hpp:7036
void send(const Message &message)
Sends a Message to the connected AMPS server, performing only minimal validation and bypassing client...
Definition: ampsplusplus.hpp:5280
Message & setSowKey(const std::string &v)
Sets the value of the SowKey header for this Message.
Definition: Message.hpp:1424
AMPSDLL void amps_message_set_field_value(amps_handle message, FieldId field, const amps_char *value, size_t length)
Sets the value of a header field in an AMPS message.
std::string bookmarkSubscribe(const MessageHandler &messageHandler_, const std::string &topic_, long timeout_, const std::string &bookmark_, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Subscribe to a topic using a bookmark.
Definition: ampsplusplus.hpp:5970
Field getFilter() const
Retrieves the value of the Filter header of the Message as a new Field.
Definition: Message.hpp:1306
static size_t getUnsetPosition()
Method to return the value used to represent not found or unset.
Definition: ampsplusplus.hpp:1122
void setRetryOnDisconnect(bool isRetryOnDisconnect_)
Enables or disables automatic retry of a command to AMPS after a reconnect.
Definition: ampsplusplus.hpp:7357
unsigned getMaxDepth(void) const
Gets the maximum number of messages that can be held in the underlying queue.
Definition: ampsplusplus.hpp:8710
Message & assignUserId(const std::string &v)
Sets the value of the UserId header for this Message.
Definition: Message.hpp:1453
void connect(const std::string &uri)
Connect to an AMPS server.
Definition: ampsplusplus.hpp:5255
void clear()
Deletes the data associated with this Field, should only be used on Fields that were created as deepC...
Definition: Field.hpp:247
Abstract base class to manage all subscriptions placed on a client so that they can be re-established...
Definition: ampsplusplus.hpp:1423
std::string sowAndDeltaSubscribe(const MessageHandler &messageHandler_, const std::string &topic_, const std::string &filter_="", const std::string &orderBy_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic and initiates a new delta subscription on it.
Definition: ampsplusplus.hpp:6452
Success.
Definition: amps.h:205
Message & setCommandEnum(Command::Type command_)
Set self&#39;s "command" field from one of the values in Command.
Definition: Message.hpp:1288
Field getOptions() const
Retrieves the value of the Options header of the Message as a new Field.
Definition: Message.hpp:1316
std::string authenticate(const std::string &, const std::string &password_)
A simple implementation that returns an unmodified password.
Definition: ampsplusplus.hpp:1005
amps_uint64_t deltaPublish(const char *topic_, size_t topicLength_, const char *data_, size_t dataLength_)
Publish the changed fields of a message to an AMPS topic.
Definition: ampsplusplus.hpp:5670
FIXShredder(char fieldSep_=(char) 1)
Construct an instance of FIXShredder using the specified value as the delimiter between fields...
Definition: ampsplusplus.hpp:8170
std::string sow(const MessageHandler &messageHandler_, const std::string &topic_, const std::string &filter_="", const std::string &orderBy_="", const std::string &bookmark_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query a State-of-the-World topic.
Definition: ampsplusplus.hpp:6094
void * amps_handle
Opaque handle type used to refer to objects in the AMPS api.
Definition: amps.h:195
Class for constructing the options string to pass to AMPS in a Message.
Definition: Message.hpp:601
void addMessageHandler(const Field &commandId_, const AMPS::MessageHandler &messageHandler_, unsigned requestedAcks_, bool isSubscribe_)
Adds a MessageHandler to be invoked for Messages with the given CommandId as their command id...
Definition: ampsplusplus.hpp:5293
const char * data() const
Returns the (non-null-terminated) data underlying this field.
Definition: Field.hpp:260
bool isValid() const
Returns true if self is a valid stream that may be iterated.
Definition: ampsplusplus.hpp:4989
amps_result
Return values from amps_xxx functions.
Definition: amps.h:200
FailedWriteHandler * getFailedWriteHandler()
Get the handler that is invoked to report on failed writes.
Definition: ampsplusplus.hpp:5481
Field getAckType() const
Retrieves the value of the AckType header of the Message as a new Field.
Definition: Message.hpp:1130
AMPSDLL amps_handle amps_client_create(const amps_char *clientName)
Functions for creation of an AMPS client.
Field getCommand() const
Retrieves the value of the Command header of the Message as a new Field.
Definition: Message.hpp:1195
MessageStream execute(Command &command_)
Execute the provided command and return messages received in response in a MessageStream.
Definition: ampsplusplus.hpp:8724
void(* amps_transport_filter_function)(const unsigned char *, size_t, short, void *)
Prototype for a user-supplied callback function for filtering data before it is sent and after it is ...
Definition: amps.h:626
Message & setTopic(const std::string &v)
Sets the value of the Topic header for this Message.
Definition: Message.hpp:1449
Store getPublishStore()
Get the publish store used by the client.
Definition: ampsplusplus.hpp:5436
void setThreadCreatedCallback(amps_thread_created_callback callback_, void *userData_)
Sets a callback function on the transport that is called when a new thread is created to receive data...
Definition: ampsplusplus.hpp:7409
unsigned getAckTypeEnum() const
Definition: ampsplusplus.hpp:920
size_t getServerVersion() const
Returns the server version retrieved during logon.
Definition: ampsplusplus.hpp:5184
State
Constants for the state of the connection.
Definition: ampsplusplus.hpp:1467
Command & setData(const std::string &data_)
Sets the data for this command from an existing string.
Definition: ampsplusplus.hpp:798
std::string logon(int timeout_=0, Authenticator &authenticator_=DefaultAuthenticator::instance(), const char *options_=NULL)
Logon to the server, providing the client name, credentials (if available), and client information (s...
Definition: ampsplusplus.hpp:5742
void discardUpTo(amps_uint64_t index_)
Called by Client to indicate that all messages up to and including.
Definition: ampsplusplus.hpp:1212
Command & setFilter(const std::string &filter_)
Definition: ampsplusplus.hpp:661
const std::string & getNameHash() const
Returns the name hash string of this client as generated by the server and returned when the client l...
Definition: ampsplusplus.hpp:5145
StoreImpl(bool errorOnPublishGap_=false)
Default constructor.
Definition: ampsplusplus.hpp:1067
MessageStream sowAndSubscribe(const char *topic_, const std::string &filter_="", const std::string &orderBy_="", const std::string &bookmark_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic and initiates a new subscription on it.
Definition: ampsplusplus.hpp:6406
Client represents a connection to an AMPS server, but does not provide failover or reconnection behav...
Definition: ampsplusplus.hpp:5075
void setGlobalCommandTypeMessageHandler(const std::string &command_, const MessageHandler &handler_)
Sets a handler for all messages of a particular type, or for messages that would be delivered to a pa...
Definition: ampsplusplus.hpp:6982
Command & setSubId(const std::string &subId_)
Definition: ampsplusplus.hpp:687
static const char * BOOKMARK_NOW()
Convenience method for returning the special value to start a subscription at the end of the transact...
Definition: ampsplusplus.hpp:7017
Message & newCommandId()
Creates and sets a new sequential value for the CommandId header for this Message.
Definition: Message.hpp:1302
void addConnectionStateListener(ConnectionStateListener *listener)
Adds a ConnectionStateListener to self&#39;s set of listeners.
Definition: ampsplusplus.hpp:7090
Command & reset(const std::string &command_)
Resets the fields of self, and sets the command to command_.
Definition: ampsplusplus.hpp:552
Command & setTimeout(unsigned timeout_)
Sets the client-side timeout for this command.
Definition: ampsplusplus.hpp:820
void clearConnectionStateListeners()
Clear all listeners from self&#39;s set of ConnectionStateListeners.
Definition: ampsplusplus.hpp:7105
Message & setSequence(const std::string &v)
Sets the value of the Sequence header for this Message.
Definition: Message.hpp:1422
amps_uint64_t publish(const char *topic_, size_t topicLength_, const char *data_, size_t dataLength_)
Publish a message to an AMPS topic, returning the sequence number assigned by the publish store if on...
Definition: ampsplusplus.hpp:5529
Command & setCommandId(const std::string &cmdId_)
Definition: ampsplusplus.hpp:635
Message & setUserId(const std::string &v)
Sets the value of the UserId header for this Message.
Definition: Message.hpp:1453
AMPSDLL amps_result amps_client_send_with_version(amps_handle client, amps_handle message, unsigned *version_out)
Sends a message to the AMPS server.
Field getTopic() const
Retrieves the value of the Topic header of the Message as a new Field.
Definition: Message.hpp:1449
AMPSDLL void amps_client_set_message_handler(amps_handle client, amps_handler messageHandler, void *userData)
Sets the message handler function for this client.
SubscriptionManager * getSubscriptionManager()
Get the subscription manager being used by the client.
Definition: ampsplusplus.hpp:5392
std::string subscribe(const MessageHandler &messageHandler_, const std::string &topic_, long timeout_=0, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Subscribe to a topic.
Definition: ampsplusplus.hpp:5805
Command(Message::Command::Type command_)
Creates an object to represent the given AMPS command, such as "sow" or "subscribe".
Definition: ampsplusplus.hpp:544
Message & setMessageType(const std::string &v)
Sets the value of the MessageType header for this Message.
Definition: Message.hpp:1312
Field getCommandId() const
Retrieves the value of the CommandId header of the Message as a new Field.
Definition: Message.hpp:1302
const ExceptionListener & getExceptionListener(void) const
Returns the exception listener set on this Client.
Definition: ampsplusplus.hpp:6893
Field getQueryId() const
Retrieves the value of the QueryID header of the Message as a new Field.
Definition: Message.hpp:1417
static Authenticator & instance()
Static function to return a static instance used when no Authenticator is supplied to a Client...
Definition: ampsplusplus.hpp:1022
Abstract base class for connection state listeners.
Definition: ampsplusplus.hpp:1463
Message & setSubscriptionId(const std::string &v)
Sets the value of the SubscriptionId header for this Message.
Definition: Message.hpp:1427
std::string sowDeleteByData(const MessageHandler &messageHandler_, const std::string &topic_, const std::string &data_, long timeout_=0)
Deletes the message whose keys match the message data provided.
Definition: ampsplusplus.hpp:6815
Command & setOptions(const char *options_, size_t optionsLen_)
Sets the options string for this command: see Message.Options for a helper class for constructing the...
Definition: ampsplusplus.hpp:766
Message & newQueryId()
Creates and sets a new sequential value for the QueryID header for this Message.
Definition: Message.hpp:1417
#define AMPS_BOOKMARK_RECENT
Start the subscription at the first undiscarded message in the bookmark store, or at the end of the b...
Definition: BookmarkStore.hpp:47
Message & assignExpiration(const std::string &v)
Sets the value of the Expiration header for this Message.
Definition: Message.hpp:1305
bool empty() const
Returns &#39;true&#39; if empty, &#39;false&#39; otherwise.
Definition: Field.hpp:128
bool replaySingle(StoreReplayer &replayer_, amps_uint64_t index_)
Called by Client to get a single message replayed by the store onto the StoreReplayer.
Definition: ampsplusplus.hpp:1233
void completed(const std::string &, const std::string &, const std::string &)
Called by Client once a logon completes successfully.
Definition: ampsplusplus.hpp:1017
MessageStream deltaSubscribe(const std::string &topic_, long timeout_, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Delta Subscribe to a topic.
Definition: ampsplusplus.hpp:5910
static const char * EPOCH()
Convenience method for returning the special value to start a subscription at the beginning of the tr...
Definition: ampsplusplus.hpp:7046
Interface for BookmarkStoreImpl classes.
Definition: BookmarkStore.hpp:228
Command & reset(const char *command_, size_t commandLen_)
Resets the fields of self, and sets the command to command_.
Definition: ampsplusplus.hpp:561
Command & setSequence(const char *seq_, size_t seqLen_)
Definition: ampsplusplus.hpp:779
std::string logon(const char *options_, int timeout_=0)
Logon to the server, providing the client name, credentials (if available) client information (such a...
Definition: ampsplusplus.hpp:5761
size_t len() const
Returns the length of the data underlying this field.
Definition: Field.hpp:267
virtual void completed(const std::string &userName_, const std::string &password_, const std::string &reason_)=0
Called by Client once a logon completes successfully.
Message & setClientName(const std::string &v)
Sets the value of the ClientName header for this Message.
Definition: Message.hpp:1303
Field getSowKey() const
Retrieves the value of the SowKey header of the Message as a new Field.
Definition: Message.hpp:1424
#define AMPS_BOOKMARK_EPOCH
Start the subscription at the beginning of the journal.
Definition: BookmarkStore.hpp:51
A default implementation of Authenticator that only uses an unchanged password and does not implement...
Definition: ampsplusplus.hpp:999
void setLogonCorrelationData(const std::string &logonCorrelationData_)
Sets the logon correlation data for the client.
Definition: ampsplusplus.hpp:5164
Message & setCommand(const std::string &v)
Sets the value of the Command header for this Message.
Definition: Message.hpp:1195
_FIXBuilder(char fieldSep_=(char) 1)
Construct an instance of _FIXBuilder, using the specified separator between fields.
Definition: ampsplusplus.hpp:8098
void ack(const std::string &topic_, const std::string &bookmark_, const char *options_=NULL)
Acknowledge a message queue message by supplying a topic and bookmark string: this adds the ack to th...
Definition: ampsplusplus.hpp:7255
std::string retry(const std::string &, const std::string &)
Throws an AuthenticationException because retry is not implemented.
Definition: ampsplusplus.hpp:1012
Message & assignSubscriptionId(const std::string &v)
Sets the value of the SubscriptionId header for this Message.
Definition: Message.hpp:1427
bool(* PublishStoreResizeHandler)(Store store_, size_t size_, void *userData_)
Function type for PublishStore resize events The store_ param is store which is resizing.
Definition: ampsplusplus.hpp:1053
AMPSDLL void amps_message_get_field_value(amps_handle message, FieldId field, const amps_char **value_ptr, size_t *length_ptr)
Retrieves the value of a header field in an AMPS message.
MessageStream sowAndSubscribe(const char *topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, bool oofEnabled_=false, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic and initiates a new subscription on it.
Definition: ampsplusplus.hpp:6296
virtual void setResizeHandler(PublishStoreResizeHandler handler_, void *userData_)
Set a handler to be called if the Store needs to resize in order to keep storing messages.
Definition: ampsplusplus.hpp:1153
void append(const T &tag, const char *value, size_t offset, size_t length)
Write a field with the provided tag and value to the message being constructed.
Definition: ampsplusplus.hpp:8107
Message & assignCommand(const std::string &v)
Sets the value of the Command header for this Message.
Definition: Message.hpp:1195
#define AMPS_BOOKMARK_NOW
Start the subscription at the point in time when AMPS processes the subscription. ...
Definition: BookmarkStore.hpp:55
void setResizeHandler(PublishStoreResizeHandler handler_, void *userData_)
Set a handler to be called if the Store needs to resize in order to keep storing messages.
Definition: ampsplusplus.hpp:1293
virtual std::string retry(const std::string &userName_, const std::string &password_)=0
Called by Client when a logon ack is received with a status of retry.
BookmarkStore getBookmarkStore()
Get the bookmark store being used by the client.
Definition: ampsplusplus.hpp:5384
bool isValid() const
Method to return if there is an underlying implementation for the Store.
Definition: ampsplusplus.hpp:1250
void setAckBatchSize(const unsigned ackBatchSize_)
Sets the queue ack batch size setting.
Definition: ampsplusplus.hpp:7316
Field getPassword() const
Retrieves the value of the Password header of the Message as a new Field.
Definition: Message.hpp:1416
Message & setTopNRecordsReturned(const std::string &v)
Sets the value of the TopNRecordsReturned header for this Message.
Definition: Message.hpp:1451
Class to handle when a client receives a duplicate publish message, or not entitled message...
Definition: ampsplusplus.hpp:1343
Message sowDelete(const std::string &topic, const std::string &filter, long timeout=0)
Deletes one or more messages from a topic&#39;s SOW cache.
Definition: ampsplusplus.hpp:6691
virtual void setFailedResubscribeHandler(std::shared_ptr< FailedResubscribeHandler > handler_)
Set a handler to deal with failing subscriptions after a failover event.
Definition: ampsplusplus.hpp:1451
Message & setSowKeys(const std::string &v)
Sets the value of the SowKeys header for this Message.
Definition: Message.hpp:1425
void setBookmarkStore(const BookmarkStore &bookmarkStore_)
Set the bookmark store to be used by the client.
Definition: ampsplusplus.hpp:5376
This class multiplexes messages from AMPS to multiple subscribers and uses the stream of acks from AM...
Definition: MessageRouter.hpp:149
Message & setExpiration(const std::string &v)
Sets the value of the Expiration header for this Message.
Definition: Message.hpp:1305
virtual std::string authenticate(const std::string &userName_, const std::string &password_)=0
Called by Client just before the logon command is sent.
Command & setOrderBy(const std::string &orderBy_)
Definition: ampsplusplus.hpp:674
void setExceptionListener(const ExceptionListener &listener_)
Definition: ampsplusplus.hpp:6886
Command & reset(Message::Command::Type command_)
Resets the fields of self, and sets the command to command_.
Definition: ampsplusplus.hpp:569
void setSubscriptionManager(SubscriptionManager *subscriptionManager_)
Set the subscription manager to be used by the client.
Definition: ampsplusplus.hpp:5404
void setUnhandledMessageHandler(const AMPS::MessageHandler &messageHandler)
Definition: ampsplusplus.hpp:6949
void append(const T &tag, const std::string &value)
Write a field with the provided tag and value to the message being constructed.
Definition: ampsplusplus.hpp:8118
unsigned getAckBatchSize(void) const
Returns the value of the queue ack batch size setting.
Definition: ampsplusplus.hpp:7306
amps_uint64_t deltaPublish(const std::string &topic_, const std::string &data_)
Publish the changed fields of a message to an AMPS topic.
Definition: ampsplusplus.hpp:5647
MessageStream sowAndSubscribe(const std::string &topic_, const std::string &filter_="", const std::string &orderBy_="", const std::string &bookmark_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic and initiates a new subscription on it.
Definition: ampsplusplus.hpp:6383
Command & setExpiration(unsigned expiration_)
Set the expiration time for a publish command.
Definition: ampsplusplus.hpp:851
AMPSDLL void amps_client_set_predisconnect_handler(amps_handle client, amps_predisconnect_handler predisconnectHandler, void *userData)
Sets the predisconnect handler function to be called when a disconnect occurs.
Represents an iterator over messages in an AMPS topic.
Definition: ampsplusplus.hpp:4951
size_t log(Message &message_)
Log a bookmark to the persistent log.
Definition: BookmarkStore.hpp:268
MessageStream sowAndDeltaSubscribe(const char *topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, bool oofEnabled_=false, bool sendEmpties_=false, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic and initiates a new delta subscription on it.
Definition: ampsplusplus.hpp:6629
size_t unpersistedCount() const
Method to return how many messages are in the store that have not been discarded, indicating that the...
Definition: ampsplusplus.hpp:1242
Command & setData(const char *data_, size_t dataLen_)
Sets the data for this command.
Definition: ampsplusplus.hpp:806
Message & assignAckType(const std::string &v)
Sets the value of the AckType header for this Message.
Definition: Message.hpp:1130
void publishFlush(long timeout_=0, unsigned ackType_=Message::AckType::Processed)
Ensure that AMPS messages are sent and have been processed by the AMPS server.
Definition: ampsplusplus.hpp:5626
std::string executeAsync(Command &command_, MessageHandler handler_)
Execute the provided command and, once AMPS acknowledges the command, process messages in response to...
Definition: ampsplusplus.hpp:7135
Command & setSowKey(const char *sowKey_, size_t sowKeyLen_)
Sets the SowKey field of the command, typically used for a publish command to a topic in the state of...
Definition: ampsplusplus.hpp:594
MessageStream sowAndDeltaSubscribe(const std::string &topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, bool oofEnabled_=false, bool sendEmpties_=false, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic and initiates a new delta subscription on it.
Definition: ampsplusplus.hpp:6588
std::string logon(const std::string &options_, int timeout_=0)
Logon to the server, providing the client name, credentials (if available) client information (such a...
Definition: ampsplusplus.hpp:5780
void unsubscribe()
Unsubscribe from all topics.
Definition: ampsplusplus.hpp:6059
void setAutoAck(bool isAutoAckEnabled_)
Sets the queue auto-ack setting on this client.
Definition: ampsplusplus.hpp:7298
MessageStream deltaSubscribe(const char *topic_, long timeout_, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Delta Subscribe to a topic.
Definition: ampsplusplus.hpp:5928
Message & assignCorrelationId(const std::string &v)
Sets the value of the CorrelationId header for this Message.
Definition: Message.hpp:1304
amps_uint64_t store(const Message &message_)
Called by Client to store a message being published.
Definition: ampsplusplus.hpp:1201
std::string sowAndDeltaSubscribe(const MessageHandler &messageHandler_, const std::string &topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, bool oofEnabled_=false, bool sendEmpties_=false, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic and initiates a new delta subscription on it.
Definition: ampsplusplus.hpp:6552
Command & setBatchSize(unsigned batchSize_)
Sets the batch size for this command, which controls how many records are sent together in the result...
Definition: ampsplusplus.hpp:835
void setExceptionListener(const std::shared_ptr< const ExceptionListener > &pListener_)
Sets the exception listener for exceptions that are not thrown back to the user (for example...
Definition: ampsplusplus.hpp:6873
StoreImpl * get()
Used to get a pointer to the implementation.
Definition: ampsplusplus.hpp:1325
MessageHandler getDuplicateMessageHandler(void)
Returns the callback function that is invoked when a duplicate message is detected.
Definition: ampsplusplus.hpp:5459
Command & setQueryId(const char *queryId_, size_t queryIdLen_)
Definition: ampsplusplus.hpp:707
DisconnectHandler getDisconnectHandler(void) const
Returns the callback function that is invoked when a disconnect occurs.
Definition: ampsplusplus.hpp:5354
Class for parsing a FIX format message into a std::map of keys and values, where the keys and values ...
Definition: ampsplusplus.hpp:8162
Field represents the value of a single field in a Message.
Definition: Field.hpp:86
Message & setAckType(const std::string &v)
Sets the value of the AckType header for this Message.
Definition: Message.hpp:1130
virtual ConnectionInfo getConnectionInfo() const
Get the connection information for the current connection.
Definition: ampsplusplus.hpp:5363
Message & setOptions(const std::string &v)
Sets the value of the Options header for this Message.
Definition: Message.hpp:1344
void ack(Field &topic_, Field &bookmark_, const char *options_=NULL)
Acknowledge a message queue message by supplying a topic and bookmark: this adds the ack to the curre...
Definition: ampsplusplus.hpp:7231
Abstract base class where you can implement handling of exceptions that occur when a SubscriptionMana...
Definition: ampsplusplus.hpp:1401
AMPSDLL amps_result amps_client_send(amps_handle client, amps_handle message)
Sends a message to the AMPS server.
std::map< Message::Field, Message::Field > map_type
Convenience defintion for the std::map specialization used for this class.
Definition: ampsplusplus.hpp:8174
Command & setTopic(const char *topic_, size_t topicLen_)
Definition: ampsplusplus.hpp:655
void setDisconnectHandler(const DisconnectHandler &disconnectHandler)
Sets the function to be called when the client is unintentionally disconnected.
Definition: ampsplusplus.hpp:5346
Handle class for StoreImpl classes that track publish messages.
Definition: ampsplusplus.hpp:1185
map_type toMap(const Message::Field &data)
Returns the key/value pairs within the message, represented as AMPS::Field objects that contain point...
Definition: ampsplusplus.hpp:8181
Exception listener for unhandled exceptions.
Definition: ampsplusplus.hpp:236
MessageStream sowAndDeltaSubscribe(const std::string &topic_, const std::string &filter_="", const std::string &orderBy_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic and initiates a new delta subscription on it.
Definition: ampsplusplus.hpp:6485
void setHeartbeat(unsigned heartbeatTime_)
Requests heartbeating with the AMPS server.
Definition: ampsplusplus.hpp:6943
MessageStream subscribe(const char *topic_, long timeout_=0, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Subscribe to a topic.
Definition: ampsplusplus.hpp:5863
An iterable object representing the results of an AMPS subscription and/or query. ...
Definition: ampsplusplus.hpp:4943
bool getErrorOnPublishGap() const
Called to check if the Store will throw PublishStoreGapException.
Definition: ampsplusplus.hpp:1317
Command & setSequence(const std::string &seq_)
Definition: ampsplusplus.hpp:772
static const char * MOST_RECENT()
Convenience method for returning the special value to start a subscription at a recovery point based ...
Definition: ampsplusplus.hpp:7068
Command & setQueryId(const std::string &queryId_)
Definition: ampsplusplus.hpp:700
AMPSDLL amps_result amps_client_set_thread_created_callback(amps_handle client, amps_thread_created_callback callback, void *userData)
Sets a user-supplied callback function to allow thread attributes to set when a new thread is created...
Message & setCommandId(const std::string &v)
Sets the value of the CommandId header for this Message.
Definition: Message.hpp:1302
Command & setSequence(const amps_uint64_t seq_)
Definition: ampsplusplus.hpp:785
static const char * BOOKMARK_RECENT()
Convenience method for returning the special value to start a subscription at a recovery point based ...
Definition: ampsplusplus.hpp:7079
Message & setQueryId(const std::string &v)
Sets the value of the QueryID header for this Message.
Definition: Message.hpp:1417
amps_result(* amps_thread_created_callback)(AMPS_THREAD_T, void *)
Prototype for a user-supplied callback function to allow thread attributes to be set when a new threa...
Definition: amps.h:652
amps_uint64_t getLowestUnpersisted()
Get the oldest unpersisted message sequence in the store.
Definition: ampsplusplus.hpp:1271
Command & setTopic(const std::string &topic_)
Definition: ampsplusplus.hpp:648
MessageStream maxDepth(unsigned maxDepth_)
Sets the maximum number of messages that can be held in the underlying queue.
Definition: ampsplusplus.hpp:8705
Message & assignVersion(const std::string &v)
Sets the value of the Version header for this Message.
Definition: Message.hpp:1452
void disconnect()
Disconnect from an AMPS server.
Definition: ampsplusplus.hpp:5262
MessageStream sow(const std::string &topic_, const std::string &filter_="", const std::string &orderBy_="", const std::string &bookmark_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic.
Definition: ampsplusplus.hpp:6132
unsigned getDepth(void) const
Gets the current number of messages held in the underlying queue.
Definition: ampsplusplus.hpp:8714
Field getQueryID() const
Retrieves the value of the QueryID header of the Message as a new Field.
Definition: Message.hpp:1417
The operation has not succeeded, but ought to be retried.
Definition: amps.h:229
const std::string & getLogonCorrelationData() const
Returns the currently set logon correlation data for the client.
Definition: ampsplusplus.hpp:5171
void setHeartbeat(unsigned heartbeatTime_, unsigned readTimeout_)
Requests heartbeating with the AMPS server.
Definition: ampsplusplus.hpp:6919
Command & setTopN(unsigned topN_)
Definition: ampsplusplus.hpp:826
AMPSDLL amps_result amps_client_set_read_timeout(amps_handle client, int readTimeout)
Sets a read timeout (seconds), in which if no message is received, the connection is presumed dead...
const std::string & getURI() const
Returns the last URI this client is connected to.
Definition: ampsplusplus.hpp:5231
Command & addAckType(const std::string &ackType_)
Definition: ampsplusplus.hpp:857
void replay(StoreReplayer &replayer_)
Called by Client to get all stored and non-discarded messages replayed by the store onto the StoreRep...
Definition: ampsplusplus.hpp:1221
static const char * BOOKMARK_MOST_RECENT()
Convenience method for returning the special value to start a subscription at a recovery point based ...
Definition: ampsplusplus.hpp:7057
Message & setBookmark(const std::string &v)
Sets the value of the Bookmark header for this Message.
Definition: Message.hpp:1194
void deepCopy(const Field &orig_)
Makes self a deep copy of the original field.
Definition: Field.hpp:218
Command & setOrderBy(const char *orderBy_, size_t orderByLen_)
Definition: ampsplusplus.hpp:681
unsigned getAckTypeEnum() const
Decode self&#39;s "ack type" field and return the corresponding bitmask of values from AckType...
Definition: Message.hpp:1160
BookmarkStoreImpl * get()
Used to get a pointer to the implementation.
Definition: BookmarkStore.hpp:465
amps_handle getHandle()
Returns the underlying amps_handle for this client, to be used with amps_client_* functions from the ...
Definition: ampsplusplus.hpp:6860
AMPSDLL amps_result amps_client_attempt_reconnect(amps_handle client, unsigned version)
Manually invokes the user-supplied disconnect handler for this client.
AMPSDLL void amps_client_destroy(amps_handle handle)
Disconnects and destroys an AMPS client object.
void reset()
Clear all data from the builder.
Definition: ampsplusplus.hpp:8135
void setFailedWriteHandler(FailedWriteHandler *handler_)
Set the handler that is invoked to report when a publish fails, for example if the publisher is not e...
Definition: ampsplusplus.hpp:5473
MessageStream sowAndDeltaSubscribe(const char *topic_, const std::string &filter_="", const std::string &orderBy_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic and initiates a new delta subscription on it.
Definition: ampsplusplus.hpp:6507
void setAckTimeout(const int ackTimeout_)
Sets the message queue ack timeout value.
Definition: ampsplusplus.hpp:7339
Command & setSowKeys(const std::string &sowKeys_)
Sets the SowKeys for the command.
Definition: ampsplusplus.hpp:611
Message & setPassword(const std::string &v)
Sets the value of the Password header for this Message.
Definition: Message.hpp:1416
bool getAutoAck(void) const
Returns the value of the queue auto-ack setting.
Definition: ampsplusplus.hpp:7288
amps_uint64_t publish(const std::string &topic_, const std::string &data_)
Publish a message to an AMPS topic, returning the sequence number assigned by the publish store if on...
Definition: ampsplusplus.hpp:5504
Definition: ampsplusplus.hpp:106
void setName(const std::string &name)
Sets the name of this client, assuming no name was provided previously.
Definition: ampsplusplus.hpp:5130
The interface for handling authentication with the AMPS server.
Definition: ampsplusplus.hpp:968
void flush(long timeout_=0)
Method to wait for the Store to discard everything that has been stored up to the point in time when ...
Definition: ampsplusplus.hpp:1263
Field getBookmark() const
Retrieves the value of the Bookmark header of the Message as a new Field.
Definition: Message.hpp:1194
bool DangerousFlushPublishStoreResizeHandler(Store store_, size_t, void *data_)
PublishStoreResizeHandler that will block up to the timeout specified in user data milliseconds tryin...
Definition: ampsplusplus.hpp:1373
static amps_uint64_t getUnsetSequence()
Method to return the value used to represent no such sequence.
Definition: ampsplusplus.hpp:1129
Command & setCorrelationId(const char *correlationId_, size_t correlationIdLen_)
Set the correlation ID for this command.
Definition: ampsplusplus.hpp:751
MessageStream sow(const char *topic_, const std::string &filter_="", const std::string &orderBy_="", const std::string &bookmark_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic.
Definition: ampsplusplus.hpp:6153
amps_uint64_t publish(const std::string &topic_, const std::string &data_, unsigned long expiration_)
Publish a message to an AMPS topic, returning the sequence number assigned by the publish store (if a...
Definition: ampsplusplus.hpp:5553
void setLastChanceMessageHandler(const AMPS::MessageHandler &messageHandler)
Sets the message handler called when no other handler matches.
Definition: ampsplusplus.hpp:6956
Message::Field getMostRecent(const Message::Field &subId_)
Returns the most recent bookmark from the log that ought to be used for (re-)subscriptions.
Definition: BookmarkStore.hpp:310
amps_uint64_t deltaPublish(const char *topic_, size_t topicLength_, const char *data_, size_t dataLength_, unsigned long expiration_)
Publish the changed fields of a message to an AMPS topic.
Definition: ampsplusplus.hpp:5719
The client and server are disconnected.
Definition: amps.h:233
Message sowDeleteByData(const std::string &topic_, const std::string &data_, long timeout_=0)
Deletes the message whose keys match the message data provided.
Definition: ampsplusplus.hpp:6836
std::string deltaSubscribe(const MessageHandler &messageHandler_, const std::string &topic_, long timeout_, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Delta Subscribe to a topic.
Definition: ampsplusplus.hpp:5892
MessageStream timeout(unsigned timeout_)
Sets the maximum time to wait for the next message in milliseconds; if no message is available within...
Definition: ampsplusplus.hpp:8695
std::string sow(const MessageHandler &messageHandler_, const std::string &topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic.
Definition: ampsplusplus.hpp:6194
const std::string & getName() const
Returns the name of this client passed in the constructor.
Definition: ampsplusplus.hpp:5137
Command is an encapsulation of a single AMPS command sent by the client.
Definition: ampsplusplus.hpp:441
std::string sowAndSubscribe(const MessageHandler &messageHandler_, const std::string &topic_, const std::string &filter_="", const std::string &orderBy_="", const std::string &bookmark_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic and initiates a new subscription on it.
Definition: ampsplusplus.hpp:6344
Message & setBatchSize(const std::string &v)
Sets the value of the BatchSize header for this Message.
Definition: Message.hpp:1193
MessageStream subscribe(const std::string &topic_, long timeout_=0, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Subscribe to a topic.
Definition: ampsplusplus.hpp:5831
MessageStream bookmarkSubscribe(const char *topic_, long timeout_, const std::string &bookmark_, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Subscribe to a topic using a bookmark.
Definition: ampsplusplus.hpp:6019
Command & setSowKey(const std::string &sowKey_)
Sets the SowKey field of the command, typically used for a publish command to a topic in the state of...
Definition: ampsplusplus.hpp:581
std::string executeAsyncNoResubscribe(Command &command_, MessageHandler handler_)
Execute the provided command and, once AMPS acknowledges the command, process messages in response to...
Definition: ampsplusplus.hpp:7169
Command & setCorrelationId(const std::string &correlationId_)
Set the correlation ID for this command.
Definition: ampsplusplus.hpp:739
std::string sowDelete(const MessageHandler &messageHandler, const std::string &topic, const std::string &filter, long timeout)
Deletes one or more messages from a topic&#39;s SOW cache.
Definition: ampsplusplus.hpp:6668
iterator end(void)
Returns an iterator representing the end of the topic or subscription.
Definition: ampsplusplus.hpp:5007
bool getRetryOnDisconnect(void) const
Returns true if automatic retry of a command to AMPS after a reconnect is enabled.
Definition: ampsplusplus.hpp:7366
void flushAcks(void)
Sends any queued message queue ack messages to the server immediately.
Definition: ampsplusplus.hpp:7279
std::string sowAndSubscribe(const MessageHandler &messageHandler_, const std::string &topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, bool oofEnabled_=false, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic and initiates a new subscription on it.
Definition: ampsplusplus.hpp:6226