AMPS C/C++ Client Class Reference
AMPS C/C++ Client Version 5.3.4.3
ampsplusplus.hpp
Go to the documentation of this file.
1 //
3 // Copyright (c) 2010-2024 60East Technologies Inc., All Rights Reserved.
4 //
5 // This computer software is owned by 60East Technologies Inc. and is
6 // protected by U.S. copyright laws and other laws and by international
7 // treaties. This computer software is furnished by 60East Technologies
8 // Inc. pursuant to a written license agreement and may be used, copied,
9 // transmitted, and stored only in accordance with the terms of such
10 // license agreement and with the inclusion of the above copyright notice.
11 // This computer software or any other copies thereof may not be provided
12 // or otherwise made available to any other person.
13 //
14 // U.S. Government Restricted Rights. This computer software: (a) was
15 // developed at private expense and is in all respects the proprietary
16 // information of 60East Technologies Inc.; (b) was not developed with
17 // government funds; (c) is a trade secret of 60East Technologies Inc.
18 // for all purposes of the Freedom of Information Act; and (d) is a
19 // commercial item and thus, pursuant to Section 12.212 of the Federal
20 // Acquisition Regulations (FAR) and DFAR Supplement Section 227.7202,
21 // Government's use, duplication or disclosure of the computer software
22 // is subject to the restrictions set forth by 60East Technologies Inc..
23 //
25 #ifndef _AMPSPLUSPLUS_H_
26 #define _AMPSPLUSPLUS_H_
27 #include "amps/amps.h"
28 #include "amps/ampsver.h"
29 #include <string>
30 #include <map>
31 #include <sstream>
32 #include <iostream>
33 #include <memory>
34 #include <stdexcept>
35 #include <limits.h>
36 #include <list>
37 #include <memory>
38 #include <set>
39 #include <deque>
40 #include <vector>
41 #include <assert.h>
42 #ifndef _WIN32
43  #include <inttypes.h>
44 #endif
45 #if defined(sun)
46  #include <sys/atomic.h>
47 #endif
48 #include "amps/BookmarkStore.hpp"
49 #include "amps/MessageRouter.hpp"
50 #include "amps/util.hpp"
51 #include "amps/ampscrc.hpp"
52 #if __cplusplus >= 201100L || _MSC_VER >= 1900
53  #include <atomic>
54 #endif
55 
// Guarantees that AMPS_TESTING_SLOW_MESSAGE_STREAM is always defined: when the
// build does not supply its own definition, the macro expands to nothing.
// NOTE(review): presumably a test-only hook expanded inside the message
// stream code; its expansion site is not visible here - confirm.
56 #ifndef AMPS_TESTING_SLOW_MESSAGE_STREAM
57  #define AMPS_TESTING_SLOW_MESSAGE_STREAM
58 #endif
59 
64 
65 
72 
83 
// Library-wide default values.
// NOTE(review): the units of the *_TIMEOUT / *_RECONNECT values and the
// consumers of most of these macros are not visible in this part of the
// file - confirm before relying on them.
84 // For StoreBuffer implementations
85 #define AMPS_MEMORYBUFFER_DEFAULT_BUFFERS 10
86 #define AMPS_MEMORYBUFFER_DEFAULT_LENGTH 40960
87 #define AMPS_SUBSCRIPTION_MANAGER_DEFAULT_TIMEOUT 0
88 #define AMPS_HACLIENT_TIMEOUT_DEFAULT 10000
89 #define AMPS_HACLIENT_RECONNECT_DEFAULT 200
90 #define AMPS_DEFAULT_COMMAND_TIMEOUT 5000
91 #define AMPS_DEFAULT_TOP_N -1
// Batch size that Command::_setIds applies to SOW-style commands that have
// no batch size set.
92 #define AMPS_DEFAULT_BATCH_SIZE 10
// Size of the character buffer that convertToCharArray fills from the end.
93 #define AMPS_NUMBER_BUFFER_LEN 20
94 #define AMPS_DEFAULT_QUEUE_ACK_TIMEOUT 1000
95 
// Defines AMPS_X64 when any of the compiler-provided 64-bit x86 / 64-bit
// Windows macros tested below is present.
96 #if defined(_M_X64) || defined(__x86_64) || defined(_WIN64)
97  #define AMPS_X64 1
98 #endif
99 
// A Message object with thread storage duration. Because it is declared
// `static` in a header, every translation unit that includes this file gets
// its own per-thread instance.
// NOTE(review): the name suggests a scratch message for publish stores; the
// code that uses it is not visible here - confirm.
100 static thread_local AMPS::Message publishStoreMessage;
101 
102 namespace AMPS
103 {
104 
/// Ordered map from string keys to string values.
/// NOTE(review): by its name this carries connection details; the producers
/// of the map are not visible in this part of the file.
using ConnectionInfo = std::map<std::string, std::string>;
106 
/// Converts a value to text by streaming it with operator<< into a
/// std::ostringstream.
/// @param x_ The value to format; it must be streamable to std::ostream.
/// @return The characters produced by streaming x_.
template<class Type>
inline std::string asString(Type x_)
{
  std::ostringstream formatted;
  formatted << x_;
  return formatted.str();
}
114 
115  inline
116  size_t convertToCharArray(char* buf_, amps_uint64_t seqNo_)
117  {
118  size_t pos = AMPS_NUMBER_BUFFER_LEN;
119  for (int i = 0; i < AMPS_NUMBER_BUFFER_LEN; ++i)
120  {
121  if (seqNo_ > 0)
122  {
123  buf_[--pos] = (char)(seqNo_ % 10 + '0');
124  seqNo_ /= 10;
125  }
126  }
127  return pos;
128  }
129 
#ifdef _WIN32
/// Windows-only overload of convertToCharArray taking unsigned long.
/// Writes the decimal digits of seqNo_ into the tail of buf_, so that the
/// last digit lands at index AMPS_NUMBER_BUFFER_LEN - 1. No terminating NUL
/// is written, and a value of 0 writes nothing.
/// @return Index in buf_ of the first digit written, or
///         AMPS_NUMBER_BUFFER_LEN when seqNo_ is 0.
inline
size_t convertToCharArray(char* buf_, unsigned long seqNo_)
{
  size_t start = AMPS_NUMBER_BUFFER_LEN;
  // Fill from the back, least-significant digit first.
  while (seqNo_ > 0 && start > 0)
  {
    buf_[--start] = (char)(seqNo_ % 10 + '0');
    seqNo_ /= 10;
  }
  return start;
}
#endif
146 
/// Collection of fixed reason strings, each exposed through a static
/// accessor that returns a string literal.
/// NOTE(review): presumably these match the reason text the server places
/// in acknowledgment messages - confirm against the protocol reference.
class Reason
{
public:
  static const char* duplicate()                 { return "duplicate"; }
  static const char* badFilter()                 { return "bad filter"; }
  static const char* badRegexTopic()             { return "bad regex topic"; }
  static const char* subscriptionAlreadyExists() { return "subscription already exists"; }
  static const char* nameInUse()                 { return "name in use"; }
  static const char* authFailure()               { return "auth failure"; }
  static const char* notEntitled()               { return "not entitled"; }
  static const char* authDisabled()              { return "authentication disabled"; }
  static const char* subidInUse()                { return "subid in use"; }
  static const char* noTopic()                   { return "no topic"; }
};
194 
/// Receiver for exceptions that the library catches on the caller's behalf;
/// the exception-wrapper macros in this file call exceptionThrown() on it.
/// The default implementation ignores the exception, so subclasses override
/// exceptionThrown() to log or react.
class ExceptionListener
{
public:
  virtual ~ExceptionListener() {}
  /// Called with the exception that was caught. Does nothing by default.
  virtual void exceptionThrown(const std::exception&) const {}
};
209 
211 
212 
/*
 * AMPS_CALL_EXCEPTION_WRAPPER(x)
 * Executes statement x. A std::exception escaping x is passed to the
 * _exceptionListener that is in scope at the expansion site; anything the
 * listener itself throws is swallowed.
 *
 * Note : we don't attempt to trap non std::exception exceptions
 * here because doing so interferes with pthread_exit on some OSes.
 * The handler that is deliberately left out would have been:
 *
 *   catch (...)
 *   {
 *     try
 *     {
 *       _exceptionListener->exceptionThrown(AMPS::AMPSException(
 *         "An unhandled exception of unknown type was thrown by "
 *         "the registered handler.", AMPS_E_USAGE));
 *     }
 *     catch(...)
 *     {
 *       ;
 *     }
 *   }
 */
#define AMPS_CALL_EXCEPTION_WRAPPER(x) \
  try\
  {\
    x;\
  }\
  catch (const std::exception& ex_)\
  {\
    try\
    {\
      _exceptionListener->exceptionThrown(ex_);\
    }\
    catch(...)\
    {\
      ;\
    }\
  }
/*
 * AMPS_CALL_EXCEPTION_WRAPPER_2(me, x)
 * While me->_connected is true, executes statement x and stops after the
 * first attempt that does not throw MessageStreamFullException. Each
 * MessageStreamFullException triggers me->checkAndSendHeartbeat(false)
 * before the next attempt; if that call throws a std::exception, the
 * exception is reported to me->_exceptionListener and the loop ends.
 * Any other std::exception escaping x is reported to
 * me->_exceptionListener. Exceptions thrown by the listener are swallowed.
 *
 * The catch parameter for MessageStreamFullException is left unnamed so a
 * single definition serves every platform without unused-variable or
 * shadowing warnings.
 *
 * Note : we don't attempt to trap non std::exception exceptions
 * here because doing so interferes with pthread_exit on some OSes.
 * The handler that is deliberately left out would have been:
 *
 *   catch (...)
 *   {
 *     try
 *     {
 *       me->_exceptionListener->exceptionThrown(AMPS::AMPSException(
 *         "An unhandled exception of unknown type was thrown by "
 *         "the registered handler.", AMPS_E_USAGE));
 *     }
 *     catch(...)
 *     {
 *       ;
 *     }
 *   }
 */
#define AMPS_CALL_EXCEPTION_WRAPPER_2(me,x) \
  try\
  {\
    while(me->_connected)\
    {\
      try\
      {\
        x;\
        break;\
      }\
      catch(const MessageStreamFullException&)\
      {\
        try\
        {\
          me->checkAndSendHeartbeat(false);\
        }\
        catch (const std::exception& ex_)\
        {\
          try\
          {\
            me->_exceptionListener->exceptionThrown(ex_);\
          }\
          catch(...)\
          {\
            ;\
          }\
          break;\
        }\
      }\
    }\
  }\
  catch (const std::exception& ex_)\
  {\
    try\
    {\
      me->_exceptionListener->exceptionThrown(ex_);\
    }\
    catch(...)\
    {\
      ;\
    }\
  }

/*
 * AMPS_CALL_EXCEPTION_WRAPPER_STREAM_FULL_2(me, x)
 * Same retry loop as AMPS_CALL_EXCEPTION_WRAPPER_2, but without the outer
 * handler: exceptions from x other than MessageStreamFullException
 * propagate to the caller.
 */
#define AMPS_CALL_EXCEPTION_WRAPPER_STREAM_FULL_2(me, x)\
  while(me->_connected)\
  {\
    try\
    {\
      x;\
      break;\
    }\
    catch(const MessageStreamFullException&)\
    {\
      try\
      {\
        me->checkAndSendHeartbeat(false);\
      }\
      catch (const std::exception& ex_)\
      {\
        try\
        {\
          me->_exceptionListener->exceptionThrown(ex_);\
        }\
        catch(...)\
        {\
          ;\
        }\
        break;\
      }\
    }\
  }
423 
/*
 * AMPS_UNHANDLED_EXCEPTION(ex)
 * Hands ex to the _exceptionListener in scope at the expansion site and
 * swallows anything the listener throws.
 */
#define AMPS_UNHANDLED_EXCEPTION(ex) \
  try { _exceptionListener->exceptionThrown(ex); } \
  catch (...) { ; }

/*
 * AMPS_UNHANDLED_EXCEPTION_2(me, ex)
 * Same as AMPS_UNHANDLED_EXCEPTION, but reaches the listener through
 * me->_exceptionListener.
 */
#define AMPS_UNHANDLED_EXCEPTION_2(me,ex) \
  try { me->_exceptionListener->exceptionThrown(ex); } \
  catch (...) { ; }
439 
440 
441  class Client;
442 
467 
468  class Command
469  {
470  Message _message;
471  unsigned _timeout;
472  unsigned _batchSize;
473  unsigned _flags;
474  static const unsigned Subscribe = 1;
475  static const unsigned SOW = 2;
476  static const unsigned NeedsSequenceNumber = 4;
477  static const unsigned ProcessedAck = 8;
478  static const unsigned StatsAck = 16;
479  void init(Message::Command::Type command_)
480  {
481  _timeout = 0;
482  _batchSize = 0;
483  _flags = 0;
484  _message.reset();
485  _message.setCommandEnum(command_);
486  _setIds();
487  }
488  void init(const std::string& command_)
489  {
490  _timeout = 0;
491  _batchSize = 0;
492  _flags = 0;
493  _message.reset();
494  _message.setCommand(command_);
495  _setIds();
496  }
497  void init(const char* command_, size_t commandLen_)
498  {
499  _timeout = 0;
500  _batchSize = 0;
501  _flags = 0;
502  _message.reset();
503  _message.setCommand(command_, commandLen_);
504  _setIds();
505  }
506  void _setIds(void)
507  {
508  Message::Command::Type command = _message.getCommandEnum();
509  if (!(command & Message::Command::NoDataCommands))
510  {
511  _message.newCommandId();
512  if (command == Message::Command::Subscribe ||
513  command == Message::Command::SOWAndSubscribe ||
514  command == Message::Command::DeltaSubscribe ||
515  command == Message::Command::SOWAndDeltaSubscribe)
516  {
517  _message.setSubscriptionId(_message.getCommandId());
518  _flags |= Subscribe;
519  }
520  if (command == Message::Command::SOW
521  || command == Message::Command::SOWAndSubscribe
522  || command == Message::Command::SOWAndDeltaSubscribe)
523  {
524  _message.setQueryID(_message.getCommandId());
525  if (_batchSize == 0)
526  {
527  setBatchSize(AMPS_DEFAULT_BATCH_SIZE);
528  }
529  if (command == Message::Command::SOW)
530  {
531  _flags |= SOW;
532  }
533  }
534  _flags |= ProcessedAck;
535  }
536  else if (command == Message::Command::SOWDelete)
537  {
538  _message.newCommandId();
539  _flags |= ProcessedAck;
540  _flags |= NeedsSequenceNumber;
541  }
542  else if (command == Message::Command::Publish
543  || command == Message::Command::DeltaPublish)
544  {
545  _flags |= NeedsSequenceNumber;
546  }
547  else if (command == Message::Command::StopTimer)
548  {
549  _message.newCommandId();
550  }
551  }
552  public:
556  Command(const std::string& command_)
557  {
558  init(command_);
559  }
564  Command(const char* command_, size_t commandLen_)
565  {
566  init(command_, commandLen_);
567  }
571  Command(Message::Command::Type command_)
572  {
573  init(command_);
574  }
575 
579  Command& reset(const std::string& command_)
580  {
581  init(command_);
582  return *this;
583  }
588  Command& reset(const char* command_, size_t commandLen_)
589  {
590  init(command_, commandLen_);
591  return *this;
592  }
596  Command& reset(Message::Command::Type command_)
597  {
598  init(command_);
599  return *this;
600  }
608  Command& setSowKey(const std::string& sowKey_)
609  {
610  _message.setSowKey(sowKey_);
611  return *this;
612  }
621  Command& setSowKey(const char* sowKey_, size_t sowKeyLen_)
622  {
623  _message.setSowKey(sowKey_, sowKeyLen_);
624  return *this;
625  }
638  Command& setSowKeys(const std::string& sowKeys_)
639  {
640  _message.setSowKeys(sowKeys_);
641  return *this;
642  }
656  Command& setSowKeys(const char* sowKeys_, size_t sowKeysLen_)
657  {
658  _message.setSowKeys(sowKeys_, sowKeysLen_);
659  return *this;
660  }
662  Command& setCommandId(const std::string& cmdId_)
663  {
664  _message.setCommandId(cmdId_);
665  return *this;
666  }
669  Command& setCommandId(const char* cmdId_, size_t cmdIdLen_)
670  {
671  _message.setCommandId(cmdId_, cmdIdLen_);
672  return *this;
673  }
675  Command& setTopic(const std::string& topic_)
676  {
677  _message.setTopic(topic_);
678  return *this;
679  }
682  Command& setTopic(const char* topic_, size_t topicLen_)
683  {
684  _message.setTopic(topic_, topicLen_);
685  return *this;
686  }
688  Command& setFilter(const std::string& filter_)
689  {
690  _message.setFilter(filter_);
691  return *this;
692  }
695  Command& setFilter(const char* filter_, size_t filterLen_)
696  {
697  _message.setFilter(filter_, filterLen_);
698  return *this;
699  }
701  Command& setOrderBy(const std::string& orderBy_)
702  {
703  _message.setOrderBy(orderBy_);
704  return *this;
705  }
708  Command& setOrderBy(const char* orderBy_, size_t orderByLen_)
709  {
710  _message.setOrderBy(orderBy_, orderByLen_);
711  return *this;
712  }
714  Command& setSubId(const std::string& subId_)
715  {
716  _message.setSubscriptionId(subId_);
717  return *this;
718  }
721  Command& setSubId(const char* subId_, size_t subIdLen_)
722  {
723  _message.setSubscriptionId(subId_, subIdLen_);
724  return *this;
725  }
727  Command& setQueryId(const std::string& queryId_)
728  {
729  _message.setQueryId(queryId_);
730  return *this;
731  }
734  Command& setQueryId(const char* queryId_, size_t queryIdLen_)
735  {
736  _message.setQueryId(queryId_, queryIdLen_);
737  return *this;
738  }
744  Command& setBookmark(const std::string& bookmark_)
745  {
746  _message.setBookmark(bookmark_);
747  return *this;
748  }
755  Command& setBookmark(const char* bookmark_, size_t bookmarkLen_)
756  {
757  _message.setBookmark(bookmark_, bookmarkLen_);
758  return *this;
759  }
766  Command& setCorrelationId(const std::string& correlationId_)
767  {
768  _message.setCorrelationId(correlationId_);
769  return *this;
770  }
778  Command& setCorrelationId(const char* correlationId_, size_t correlationIdLen_)
779  {
780  _message.setCorrelationId(correlationId_, correlationIdLen_);
781  return *this;
782  }
785  Command& setOptions(const std::string& options_)
786  {
787  _message.setOptions(options_);
788  return *this;
789  }
793  Command& setOptions(const char* options_, size_t optionsLen_)
794  {
795  _message.setOptions(options_, optionsLen_);
796  return *this;
797  }
799  Command& setSequence(const std::string& seq_)
800  {
801  _message.setSequence(seq_);
802  return *this;
803  }
806  Command& setSequence(const char* seq_, size_t seqLen_)
807  {
808  _message.setSequence(seq_, seqLen_);
809  return *this;
810  }
812  Command& setSequence(const amps_uint64_t seq_)
813  {
814  std::ostringstream os;
815  os << seq_;
816  _message.setSequence(os.str());
817  return *this;
818  }
819  amps_uint64_t getSequence() const
820  {
821  return amps_message_get_field_uint64(_message.getMessage(), AMPS_Sequence);
822  }
825  Command& setData(const std::string& data_)
826  {
827  _message.setData(data_);
828  return *this;
829  }
833  Command& setData(const char* data_, size_t dataLen_)
834  {
835  _message.setData(data_, dataLen_);
836  return *this;
837  }
847  Command& setTimeout(unsigned timeout_)
848  {
849  _timeout = timeout_;
850  return *this;
851  }
853  Command& setTopN(unsigned topN_)
854  {
855  _message.setTopNRecordsReturned(topN_);
856  return *this;
857  }
862  Command& setBatchSize(unsigned batchSize_)
863  {
864  _message.setBatchSize(batchSize_);
865  _batchSize = batchSize_;
866  return *this;
867  }
878  Command& setExpiration(unsigned expiration_)
879  {
880  _message.setExpiration(expiration_);
881  return *this;
882  }
884  Command& addAckType(const std::string& ackType_)
885  {
886  _message.setAckType(_message.getAckType() + "," + ackType_);
887  if (ackType_ == "processed")
888  {
889  _flags |= ProcessedAck;
890  }
891  else if (ackType_ == "stats")
892  {
893  _flags |= StatsAck;
894  }
895  return *this;
896  }
898  Command& setAckType(const std::string& ackType_)
899  {
900  _message.setAckType(ackType_);
901  if (ackType_.find("processed") != std::string::npos)
902  {
903  _flags |= ProcessedAck;
904  }
905  else
906  {
907  _flags &= ~ProcessedAck;
908  }
909  if (ackType_.find("stats") != std::string::npos)
910  {
911  _flags |= StatsAck;
912  }
913  else
914  {
915  _flags &= ~StatsAck;
916  }
917  return *this;
918  }
920  Command& setAckType(unsigned ackType_)
921  {
922  _message.setAckTypeEnum(ackType_);
923  if (ackType_ & Message::AckType::Processed)
924  {
925  _flags |= ProcessedAck;
926  }
927  else
928  {
929  _flags &= ~ProcessedAck;
930  }
931  if (ackType_ & Message::AckType::Stats)
932  {
933  _flags |= StatsAck;
934  }
935  else
936  {
937  _flags &= ~StatsAck;
938  }
939  return *this;
940  }
942  std::string getAckType() const
943  {
944  return (std::string)(_message.getAckType());
945  }
947  unsigned getAckTypeEnum() const
948  {
949  return _message.getAckTypeEnum();
950  }
951 
952  Message& getMessage(void)
953  {
954  return _message;
955  }
956  unsigned getTimeout(void) const
957  {
958  return _timeout;
959  }
960  unsigned getBatchSize(void) const
961  {
962  return _batchSize;
963  }
964  bool isSubscribe(void) const
965  {
966  return _flags & Subscribe;
967  }
968  bool isSow(void) const
969  {
970  return (_flags & SOW) != 0;
971  }
972  bool hasProcessedAck(void) const
973  {
974  return (_flags & ProcessedAck) != 0;
975  }
976  bool hasStatsAck(void) const
977  {
978  return (_flags & StatsAck) != 0;
979  }
980  bool needsSequenceNumber(void) const
981  {
982  return (_flags & NeedsSequenceNumber) != 0;
983  }
984  };
985 
988  typedef void(*DisconnectHandlerFunc)(Client&, void* userData);
989 
990  class Message;
992 
/// Abstract interface with three authentication hooks; every hook receives
/// the user name and password as strings.
/// NOTE(review): when each hook is invoked is decided by code not visible
/// in this part of the file.
class Authenticator
{
public:
  virtual ~Authenticator() {}

  /// Returns a string computed from the given user name and password.
  /// NOTE(review): presumably the value to send as the password - confirm.
  virtual std::string authenticate(const std::string& userName_, const std::string& password_) = 0;
  /// Returns a string computed from the given user name and password.
  /// NOTE(review): presumably called when the server asks for a retry.
  virtual std::string retry(const std::string& userName_, const std::string& password_) = 0;
  /// Receives the user name, password and a reason string.
  /// NOTE(review): presumably called once authentication has finished.
  virtual void completed(const std::string& userName_, const std::string& password_, const std::string& reason_) = 0;
};
1022 
1027  {
1028  public:
1029  virtual ~DefaultAuthenticator() {;}
1032  std::string authenticate(const std::string& /*userName_*/, const std::string& password_)
1033  {
1034  return password_;
1035  }
1036 
1039  std::string retry(const std::string& /*userName_*/, const std::string& /*password_*/)
1040  {
1041  throw AuthenticationException("retry not implemented by DefaultAuthenticator.");
1042  }
1043 
1044  void completed(const std::string& /*userName_*/, const std::string& /* password_ */, const std::string& /* reason */) {;}
1045 
1050  {
1051  static DefaultAuthenticator d;
1052  return d;
1053  }
1054  };
1055 
1059  {
1060  public:
1061 
1065  virtual void execute(Message& message_) = 0;
1066 
1067  virtual ~StoreReplayer() {;}
1068  };
1069 
1070  class Store;
1071 
1080  typedef bool (*PublishStoreResizeHandler)(Store store_,
1081  size_t size_,
1082  void* userData_);
1083 
1086  class StoreImpl : public RefBody
1087  {
1088  public:
1094  StoreImpl(bool errorOnPublishGap_ = false)
1095  : _resizeHandler(NULL)
1096  , _resizeHandlerData(NULL)
1097  , _errorOnPublishGap(errorOnPublishGap_)
1098  {;}
1099 
1104  virtual amps_uint64_t store(const Message& message_) = 0;
1105 
1112  virtual void discardUpTo(amps_uint64_t index_) = 0;
1113 
1118  virtual void replay(StoreReplayer& replayer_) = 0;
1119 
1127  virtual bool replaySingle(StoreReplayer& replayer_, amps_uint64_t index_) = 0;
1128 
1133  virtual size_t unpersistedCount() const = 0;
1134 
1135  virtual ~StoreImpl() {;}
1136 
1145  virtual void flush(long timeout_) = 0;
1146 
1149  static inline size_t getUnsetPosition()
1150  {
1151  return AMPS_UNSET_INDEX;
1152  }
1153 
1156  static inline amps_uint64_t getUnsetSequence()
1157  {
1158  return AMPS_UNSET_SEQUENCE;
1159  }
1160 
1164  virtual amps_uint64_t getLowestUnpersisted() const = 0;
1165 
1169  virtual amps_uint64_t getLastPersisted() = 0;
1170 
1180  inline virtual void setResizeHandler(PublishStoreResizeHandler handler_,
1181  void* userData_)
1182  {
1183  _resizeHandler = handler_;
1184  _resizeHandlerData = userData_;
1185  }
1186 
1187  inline virtual PublishStoreResizeHandler getResizeHandler() const
1188  {
1189  return _resizeHandler;
1190  }
1191 
1192  bool callResizeHandler(size_t newSize_);
1193 
1194  inline virtual void setErrorOnPublishGap(bool errorOnPublishGap_)
1195  {
1196  _errorOnPublishGap = errorOnPublishGap_;
1197  }
1198 
1199  inline virtual bool getErrorOnPublishGap() const
1200  {
1201  return _errorOnPublishGap;
1202  }
1203 
1204  private:
1205  PublishStoreResizeHandler _resizeHandler;
1206  void* _resizeHandlerData;
1207  bool _errorOnPublishGap;
1208  };
1209 
1212  class Store
1213  {
1214  RefHandle<StoreImpl> _body;
1215  public:
1216  Store() {;}
1217  Store(StoreImpl* body_) : _body(body_) {;}
1218  Store(const Store& rhs) : _body(rhs._body) {;}
1219  Store& operator=(const Store& rhs)
1220  {
1221  _body = rhs._body;
1222  return *this;
1223  }
1224 
1228  amps_uint64_t store(const Message& message_)
1229  {
1230  return _body.get().store(message_);
1231  }
1232 
1239  void discardUpTo(amps_uint64_t index_)
1240  {
1241  _body.get().discardUpTo(index_);
1242  }
1243 
1248  void replay(StoreReplayer& replayer_)
1249  {
1250  _body.get().replay(replayer_);
1251  }
1252 
1260  bool replaySingle(StoreReplayer& replayer_, amps_uint64_t index_)
1261  {
1262  return _body.get().replaySingle(replayer_, index_);
1263  }
1264 
1269  size_t unpersistedCount() const
1270  {
1271  return _body.get().unpersistedCount();
1272  }
1273 
1277  bool isValid() const
1278  {
1279  return _body.isValid();
1280  }
1281 
1290  void flush(long timeout_ = 0)
1291  {
1292  return _body.get().flush(timeout_);
1293  }
1294 
1298  amps_uint64_t getLowestUnpersisted()
1299  {
1300  return _body.get().getLowestUnpersisted();
1301  }
1302 
1306  amps_uint64_t getLastPersisted()
1307  {
1308  return _body.get().getLastPersisted();
1309  }
1310 
1320  void setResizeHandler(PublishStoreResizeHandler handler_,
1321  void* userData_)
1322  {
1323  _body.get().setResizeHandler(handler_, userData_);
1324  }
1325 
1326  PublishStoreResizeHandler getResizeHandler() const
1327  {
1328  return _body.get().getResizeHandler();
1329  }
1330 
1335  inline void setErrorOnPublishGap(bool errorOnPublishGap_)
1336  {
1337  _body.get().setErrorOnPublishGap(errorOnPublishGap_);
1338  }
1339 
1344  inline bool getErrorOnPublishGap() const
1345  {
1346  return _body.get().getErrorOnPublishGap();
1347  }
1348 
1352  StoreImpl* get()
1353  {
1354  if (_body.isValid())
1355  {
1356  return &_body.get();
1357  }
1358  else
1359  {
1360  return NULL;
1361  }
1362  }
1363 
1364  };
1365 
1371  {
1372  public:
1373  virtual ~FailedWriteHandler() {;}
1380  virtual void failedWrite(const Message& message_,
1381  const char* reason_, size_t reasonLength_) = 0;
1382  };
1383 
1384 
1385  inline bool StoreImpl::callResizeHandler(size_t newSize_)
1386  {
1387  if (_resizeHandler)
1388  {
1389  return _resizeHandler(Store(this), newSize_, _resizeHandlerData);
1390  }
1391  return true;
1392  }
1393 
1400  inline bool DangerousFlushPublishStoreResizeHandler(Store store_, size_t /*size_*/,
1401  void* data_)
1402  {
1403  long* timeoutp = (long*)data_;
1404  size_t count = store_.unpersistedCount();
1405  if (count == 0)
1406  {
1407  return false;
1408  }
1409  try
1410  {
1411  store_.flush(*timeoutp);
1412  }
1413 #ifdef _WIN32
1414  catch (const TimedOutException&)
1415 #else
1416  catch (const TimedOutException& e)
1417 #endif
1418  {
1419  return true;
1420  }
1421  return (count == store_.unpersistedCount());
1422  }
1423 
1429  {
1430  public:
1442  virtual bool failure(const Message& message_, const MessageHandler& handler_,
1443  unsigned requestedAckTypes_,
1444  const AMPSException& exception_) = 0;
1445  };
1446 
1451  {
1452  public:
1453  virtual ~SubscriptionManager() {;}
1461  virtual void subscribe(MessageHandler messageHandler_, const Message& message_,
1462  unsigned requestedAckTypes_) = 0;
1466  virtual void unsubscribe(const Message::Field& subId_) = 0;
1469  virtual void clear() = 0;
1473  virtual void resubscribe(Client& client_) = 0;
1478  virtual void setFailedResubscribeHandler(std::shared_ptr<FailedResubscribeHandler> handler_)
1479  {
1480  _failedResubscribeHandler = handler_;
1481  }
1482  protected:
1483  std::shared_ptr<FailedResubscribeHandler> _failedResubscribeHandler;
1484  };
1485 
1489 
/// Abstract listener that is told when the connection state changes.
class ConnectionStateListener
{
public:
  /// Connection states. The values other than Disconnected and Shutdown
  /// are distinct powers of two.
  typedef enum { Disconnected = 0,
                 Shutdown = 1,
                 Connected = 2,
                 LoggedOn = 4,
                 PublishReplayed = 8,
                 HeartbeatInitiated = 16,
                 Resubscribed = 32,
                 UNKNOWN = 16384
               } State;

  /// Receives the new state.
  virtual void connectionStateChanged(State newState_) = 0;
  virtual ~ConnectionStateListener() {}
};
1516 
1517 
1518  class MessageStreamImpl;
1519  class MessageStream;
1520 
/// Signature of a deferred-execution callback: a plain function taking one
/// opaque pointer and returning nothing.
using DeferredExecutionFunc = void (*)(void*);
1522 
1523  class ClientImpl : public RefBody // -V553
1524  {
1525  // Class to wrap turning of Nagle for things like flush and logon
1526  class NoDelay
1527  {
1528  private:
1529  AMPS_SOCKET _socket;
1530  int _noDelay;
1531  char* _valuePtr;
1532 #ifdef _WIN32
1533  int _valueLen;
1534 #else
1535  socklen_t _valueLen;
1536 #endif
1537  public:
1538  NoDelay(amps_handle client_)
1539  : _socket(AMPS_INVALID_SOCKET), _noDelay(0), _valueLen(sizeof(int))
1540  {
1541  _valuePtr = (char*)&_noDelay;
1542  _socket = amps_client_get_socket(client_);
1543  if (_socket != AMPS_INVALID_SOCKET)
1544  {
1545  getsockopt(_socket, IPPROTO_TCP, TCP_NODELAY, _valuePtr, &_valueLen);
1546  if (!_noDelay)
1547  {
1548  _noDelay = 1;
1549  setsockopt(_socket, IPPROTO_TCP, TCP_NODELAY, _valuePtr, _valueLen);
1550  }
1551  else
1552  {
1553  _socket = AMPS_INVALID_SOCKET;
1554  }
1555  }
1556  }
1557 
1558  ~NoDelay()
1559  {
1560  if (_socket != AMPS_INVALID_SOCKET)
1561  {
1562  _noDelay = 0;
1563  setsockopt(_socket, IPPROTO_TCP, TCP_NODELAY, _valuePtr, _valueLen);
1564  }
1565  }
1566  };
1567 
1568  friend class Client;
1569  protected:
1570  amps_handle _client;
1571  DisconnectHandler _disconnectHandler;
1572  enum GlobalCommandTypeHandlers : size_t
1573  {
1574  Publish = 0,
1575  SOW = 1,
1576  GroupBegin = 2,
1577  GroupEnd = 3,
1578  Heartbeat = 4,
1579  OOF = 5,
1580  Ack = 6,
1581  LastChance = 7,
1582  DuplicateMessage = 8,
1583  COUNT = 9
1584  };
1585  std::vector<MessageHandler> _globalCommandTypeHandlers;
1586  Message _message, _readMessage, _publishMessage, _deltaMessage, _beatMessage;
1587  MessageRouter _routes;
1588  MessageRouter::RouteCache _routeCache;
1589  mutable Mutex _lock;
1590  std::string _name, _nameHash, _lastUri, _logonCorrelationData;
1591  amps_uint64_t _nameHashValue;
1592  BookmarkStore _bookmarkStore;
1593  Store _publishStore;
1594  bool _isRetryOnDisconnect;
1595  amps_unique_ptr<FailedWriteHandler> _failedWriteHandler;
1596 #if __cplusplus >= 201100L || _MSC_VER >= 1900
1597  std::atomic<amps_uint64_t> _lastSentHaSequenceNumber;
1598 #else
1599  volatile amps_uint64_t _lastSentHaSequenceNumber;
1600 #endif
1601  AMPS_ATOMIC_TYPE_8 _logonInProgress;
1602  AMPS_ATOMIC_TYPE_8 _badTimeToHASubscribe;
1603  VersionInfo _serverVersion;
1604  Timer _heartbeatTimer;
1605  amps_unique_ptr<MessageStream> _pEmptyMessageStream;
1606 
1607  // queue data
1608  int _queueAckTimeout;
1609  bool _isAutoAckEnabled;
1610  unsigned _ackBatchSize;
1611  unsigned _queuedAckCount;
1612  unsigned _defaultMaxDepth;
1613  struct QueueBookmarks
1614  {
1615  QueueBookmarks(const std::string& topic_)
1616  : _topic(topic_)
1617  , _oldestTime(0)
1618  , _bookmarkCount(0)
1619  {;}
1620  std::string _topic;
1621  std::string _data;
1622  amps_uint64_t _oldestTime;
1623  unsigned _bookmarkCount;
1624  };
1625  typedef amps_uint64_t topic_hash;
1626  typedef std::map<topic_hash, QueueBookmarks> TopicHashMap;
1627  TopicHashMap _topicHashMap;
1628 
1629  class ClientStoreReplayer : public StoreReplayer
1630  {
1631  ClientImpl* _client;
1632  public:
1633  unsigned _version;
1634  amps_result _res;
1635 
1636  ClientStoreReplayer()
1637  : _client(NULL), _version(0), _res(AMPS_E_OK)
1638  {}
1639 
1640  ClientStoreReplayer(ClientImpl* client_)
1641  : _client(client_), _version(0), _res(AMPS_E_OK)
1642  {}
1643 
1644  void setClient(ClientImpl* client_)
1645  {
1646  _client = client_;
1647  }
1648 
1649  void execute(Message& message_)
1650  {
1651  if (!_client)
1652  {
1653  throw CommandException("Can't replay without a client.");
1654  }
1655  amps_uint64_t index = amps_message_get_field_uint64(message_.getMessage(),
1656  AMPS_Sequence);
1657  if (index > _client->_lastSentHaSequenceNumber)
1658  {
1659  _client->_lastSentHaSequenceNumber = index;
1660  }
1661 
1662  _res = AMPS_E_OK;
1663  // Don't replay a queue cancel message after a reconnect.
1664  // Currently, the only messages that will have anything in options
1665  // are cancel messages.
1666  if (!message_.getCommand().empty() &&
1667  (!_client->_logonInProgress ||
1668  message_.getOptions().len() < 6))
1669  {
1670  _res = amps_client_send_with_version(_client->_client,
1671  message_.getMessage(),
1672  &_version);
1673  if (_res != AMPS_E_OK)
1674  {
1675  throw DisconnectedException("AMPS Server disconnected during replay");
1676  }
1677  }
1678  }
1679 
1680  };
1681  ClientStoreReplayer _replayer;
1682 
1683  class FailedWriteStoreReplayer : public StoreReplayer
1684  {
1685  ClientImpl* _parent;
1686  const char* _reason;
1687  size_t _reasonLength;
1688  size_t _replayCount;
1689  public:
1690  FailedWriteStoreReplayer(ClientImpl* parent, const char* reason_, size_t reasonLength_)
1691  : _parent(parent),
1692  _reason(reason_),
1693  _reasonLength(reasonLength_),
1694  _replayCount(0)
1695  {;}
1696  void execute(Message& message_)
1697  {
1698  if (_parent->_failedWriteHandler)
1699  {
1700  ++_replayCount;
1701  _parent->_failedWriteHandler->failedWrite(message_,
1702  _reason, _reasonLength);
1703  }
1704  }
1705  size_t replayCount(void) const
1706  {
1707  return _replayCount;
1708  }
1709  };
1710 
1711  struct AckResponseImpl : public RefBody
1712  {
1713  std::string username, password, reason, status, bookmark, options;
1714  amps_uint64_t sequenceNo;
1715  amps_uint64_t nameHashValue;
1716  VersionInfo serverVersion;
1717 #if __cplusplus >= 201100L || _MSC_VER >= 1900
1718  std::atomic<bool> responded;
1719  std::atomic<bool> abandoned;
1720 #else
1721  volatile bool responded;
1722  volatile bool abandoned;
1723 #endif
1724  unsigned connectionVersion;
1725  AckResponseImpl() :
1726  RefBody(),
1727  username(), password(), reason(), status(), bookmark(), options(),
1728  sequenceNo((amps_uint64_t)0),
1729  serverVersion(),
1730  responded(false),
1731  abandoned(false),
1732  connectionVersion(0)
1733  {
1734  }
1735  };
1736 
1737  class AckResponse
1738  {
1739  RefHandle<AckResponseImpl> _body;
1740  public:
1741  AckResponse() : _body(NULL) {;}
1742  AckResponse(const AckResponse& rhs) : _body(rhs._body) {;}
1743  static AckResponse create()
1744  {
1745  AckResponse r;
1746  r._body = new AckResponseImpl();
1747  return r;
1748  }
1749 
1750  const std::string& username()
1751  {
1752  return _body.get().username;
1753  }
1754  void setUsername(const char* data_, size_t len_)
1755  {
1756  if (data_)
1757  {
1758  _body.get().username.assign(data_, len_);
1759  }
1760  else
1761  {
1762  _body.get().username.clear();
1763  }
1764  }
1765  const std::string& password()
1766  {
1767  return _body.get().password;
1768  }
1769  void setPassword(const char* data_, size_t len_)
1770  {
1771  if (data_)
1772  {
1773  _body.get().password.assign(data_, len_);
1774  }
1775  else
1776  {
1777  _body.get().password.clear();
1778  }
1779  }
1780  const std::string& reason()
1781  {
1782  return _body.get().reason;
1783  }
1784  void setReason(const char* data_, size_t len_)
1785  {
1786  if (data_)
1787  {
1788  _body.get().reason.assign(data_, len_);
1789  }
1790  else
1791  {
1792  _body.get().reason.clear();
1793  }
1794  }
1795  const std::string& status()
1796  {
1797  return _body.get().status;
1798  }
1799  void setStatus(const char* data_, size_t len_)
1800  {
1801  if (data_)
1802  {
1803  _body.get().status.assign(data_, len_);
1804  }
1805  else
1806  {
1807  _body.get().status.clear();
1808  }
1809  }
1810  const std::string& bookmark()
1811  {
1812  return _body.get().bookmark;
1813  }
1814  void setBookmark(const Field& bookmark_)
1815  {
1816  if (!bookmark_.empty())
1817  {
1818  _body.get().bookmark.assign(bookmark_.data(), bookmark_.len());
1819  Field::parseBookmark(bookmark_, _body.get().nameHashValue,
1820  _body.get().sequenceNo);
1821  }
1822  else
1823  {
1824  _body.get().bookmark.clear();
1825  _body.get().sequenceNo = (amps_uint64_t)0;
1826  _body.get().nameHashValue = (amps_uint64_t)0;
1827  }
1828  }
1829  amps_uint64_t sequenceNo() const
1830  {
1831  return _body.get().sequenceNo;
1832  }
1833  amps_uint64_t nameHashValue() const
1834  {
1835  return _body.get().nameHashValue;
1836  }
1837  void setSequenceNo(const char* data_, size_t len_)
1838  {
1839  amps_uint64_t result = (amps_uint64_t)0;
1840  if (data_)
1841  {
1842  for (size_t i = 0; i < len_; ++i)
1843  {
1844  result *= (amps_uint64_t)10;
1845  result += (amps_uint64_t)(data_[i] - '0');
1846  }
1847  }
1848  _body.get().sequenceNo = result;
1849  }
1850  VersionInfo serverVersion() const
1851  {
1852  return _body.get().serverVersion;
1853  }
1854  void setServerVersion(const char* data_, size_t len_)
1855  {
1856  if (data_)
1857  {
1858  _body.get().serverVersion.setVersion(std::string(data_, len_));
1859  }
1860  }
1861  bool responded()
1862  {
1863  return _body.get().responded;
1864  }
1865  void setResponded()
1866  {
1867  _body.get().responded = true;
1868  }
1869  bool abandoned()
1870  {
1871  return _body.get().abandoned;
1872  }
1873  void setAbandoned()
1874  {
1875  if (_body.isValid())
1876  {
1877  _body.get().abandoned = true;
1878  }
1879  }
1880 
1881  void setConnectionVersion(unsigned connectionVersion)
1882  {
1883  _body.get().connectionVersion = connectionVersion;
1884  }
1885 
1886  unsigned getConnectionVersion()
1887  {
1888  return _body.get().connectionVersion;
1889  }
1890  void setOptions(const char* data_, size_t len_)
1891  {
1892  if (data_)
1893  {
1894  _body.get().options.assign(data_, len_);
1895  }
1896  else
1897  {
1898  _body.get().options.clear();
1899  }
1900  }
1901 
1902  const std::string& options()
1903  {
1904  return _body.get().options;
1905  }
1906 
1907  AckResponse& operator=(const AckResponse& rhs)
1908  {
1909  _body = rhs._body;
1910  return *this;
1911  }
1912  };
1913 
1914 
 // Responses that syncAckProcessing() is waiting on, keyed by the command id
 // of the message that was sent.
1915  typedef std::map<std::string, AckResponse> AckMap;
1916  AckMap _ackMap;
 // Held while _ackMap is modified (see syncAckProcessing() and clearAcks()).
1917  Mutex _ackMapLock;
 // The constructor points _exceptionListener at this instance.
1918  DefaultExceptionListener _defaultExceptionListener;
1919  protected:
1920 
1921  struct DeferredExecutionRequest
1922  {
1923  DeferredExecutionRequest(DeferredExecutionFunc func_,
1924  void* userData_)
1925  : _func(func_),
1926  _userData(userData_)
1927  {;}
1928 
1929  DeferredExecutionFunc _func;
1930  void* _userData;
1931  };
 // Listener currently in use; the constructor points it at
 // _defaultExceptionListener.
1932  const ExceptionListener* _exceptionListener;
1933  std::shared_ptr<const ExceptionListener> _pExceptionListener;
 // Optional; unsubscribeInternal() notifies it when it is set.
1934  amps_unique_ptr<SubscriptionManager> _subscriptionManager;
 // Set to true at the end of _connect() and to false in setDisconnected().
1935  volatile bool _connected;
1936  std::string _username;
 // Listeners notified by broadcastConnectionStateChanged().
1937  typedef std::set<ConnectionStateListener*> ConnectionStateListeners;
1938  ConnectionStateListeners _connectionStateListeners;
1939  typedef std::vector<DeferredExecutionRequest> DeferredExecutionList;
1940  Mutex _deferredExecutionLock;
1941  DeferredExecutionList _deferredExecutionList;
1942  unsigned _heartbeatInterval;
1943  unsigned _readTimeout;
1944 
1945  void broadcastConnectionStateChanged(ConnectionStateListener::State newState_)
1946  {
1947  // If we disconnected before we got to notification, don't notify.
1948  // This should only be able to happen for Resubscribed, since the lock
1949  // is released to let the subscription manager run resubscribe so a
1950  // disconnect could be called before the change is broadcast.
1951  if (!_connected && newState_ > ConnectionStateListener::Connected)
1952  {
1953  return;
1954  }
1955  for (ConnectionStateListeners::iterator it = _connectionStateListeners.begin(); it != _connectionStateListeners.end(); ++it)
1956  {
1957  AMPS_CALL_EXCEPTION_WRAPPER(
1958  (*it)->connectionStateChanged(newState_));
1959  }
1960  }
1961  unsigned processedAck(Message& message);
1962  unsigned persistedAck(Message& meesage);
1963  void lastChance(Message& message);
1964  void checkAndSendHeartbeat(bool force = false);
1965  virtual ConnectionInfo getConnectionInfo() const;
1966  static amps_result
1967  ClientImplMessageHandler(amps_handle message, void* userData);
1968  static void
1969  ClientImplPreDisconnectHandler(amps_handle client, unsigned failedConnectionVersion, void* userData);
1970  static amps_result
1971  ClientImplDisconnectHandler(amps_handle client, void* userData);
1972 
1973  void unsubscribeInternal(const std::string& id)
1974  {
1975  if (id.empty())
1976  {
1977  return;
1978  }
1979  // remove the handler first to avoid any more message delivery
1980  Message::Field subId;
1981  subId.assign(id.data(), id.length());
1982  _routes.removeRoute(subId);
1983  // Lock is already acquired
1984  if (_subscriptionManager)
1985  {
1986  // Have to unlock before calling into sub manager to avoid deadlock
1987  Unlock<Mutex> unlock(_lock);
1988  _subscriptionManager->unsubscribe(subId);
1989  }
1990  _message.reset();
1991  _message.setCommandEnum(Message::Command::Unsubscribe);
1992  _message.newCommandId();
1993  _message.setSubscriptionId(id);
1994  _sendWithoutRetry(_message);
1995  deferredExecution(&amps_noOpFn, NULL);
1996  }
1997 
1998  AckResponse syncAckProcessing(long timeout_, Message& message_,
1999  bool isHASubscribe_)
2000  {
2001  return syncAckProcessing(timeout_, message_,
2002  (amps_uint64_t)0, isHASubscribe_);
2003  }
2004 
2005  AckResponse syncAckProcessing(long timeout_, Message& message_,
2006  amps_uint64_t haSeq = (amps_uint64_t)0,
2007  bool isHASubscribe_ = false)
2008  {
2009  // inv: we already have _lock locked up.
2010  AckResponse ack = AckResponse::create();
2011  if (1)
2012  {
2013  Lock<Mutex> guard(_ackMapLock);
2014  _ackMap[message_.getCommandId()] = ack;
2015  }
2016  ack.setConnectionVersion((unsigned)_send(message_, haSeq, isHASubscribe_));
2017  if (ack.getConnectionVersion() == 0)
2018  {
2019  // Send failed
2020  throw DisconnectedException("Connection closed while waiting for response.");
2021  }
2022  bool timedOut = false;
2023  AMPS_START_TIMER(timeout_)
2024  while (!timedOut && !ack.responded() && !ack.abandoned() && _connected)
2025  {
2026  if (timeout_)
2027  {
2028  timedOut = !_lock.wait(timeout_);
2029  // May have woken up early, check real time
2030  if (timedOut)
2031  {
2032  AMPS_RESET_TIMER(timedOut, timeout_);
2033  }
2034  }
2035  else
2036  {
2037  // Using a timeout version to ensure python can interrupt
2038  _lock.wait(1000);
2039  Unlock<Mutex> unlck(_lock);
2040  amps_invoke_waiting_function();
2041  }
2042  }
2043  if (ack.responded())
2044  {
2045  if (ack.status() != "failure")
2046  {
2047  if (message_.getCommand() == "logon")
2048  {
2049  amps_uint64_t ackSequence = ack.sequenceNo();
2050  if (_lastSentHaSequenceNumber < ackSequence)
2051  {
2052  _lastSentHaSequenceNumber = ackSequence;
2053  }
2054  if (_publishStore.isValid())
2055  {
2056  // If this throws, logon will fail and eitehr be
2057  // handled in HAClient/ServerChooser or by the caller
2058  // of logon.
2059  _publishStore.discardUpTo(ackSequence);
2060  if (_lastSentHaSequenceNumber < _publishStore.getLastPersisted())
2061  {
2062  _lastSentHaSequenceNumber = _publishStore.getLastPersisted();
2063  }
2064  }
2065  _nameHash = ack.bookmark().substr(0, ack.bookmark().find('|'));
2066  _nameHashValue = ack.nameHashValue();
2067  _serverVersion = ack.serverVersion();
2068  if (_bookmarkStore.isValid())
2069  {
2070  _bookmarkStore.setServerVersion(_serverVersion);
2071  }
2072  }
2073  if (_ackBatchSize)
2074  {
2075  const std::string& options = ack.options();
2076  size_t index = options.find_first_of("max_backlog=");
2077  if (index != std::string::npos)
2078  {
2079  unsigned data = 0;
2080  const char* c = options.c_str() + index + 12;
2081  while (*c && *c != ',')
2082  {
2083  data = (data * 10) + (unsigned)(*c++ -48);
2084  }
2085  if (_ackBatchSize > data)
2086  {
2087  _ackBatchSize = data;
2088  }
2089  }
2090  }
2091  return ack;
2092  }
2093  const size_t NotEntitled = 12;
2094  std::string ackReason = ack.reason();
2095  if (ackReason.length() == 0)
2096  {
2097  return ack; // none
2098  }
2099  if (ackReason.length() == NotEntitled &&
2100  ackReason[0] == 'n' &&
2101  message_.getUserId().len() == 0)
2102  {
2103  message_.assignUserId(_username);
2104  }
2105  message_.throwFor(_client, ackReason);
2106  }
2107  else // !ack.responded()
2108  {
2109  if (!ack.abandoned())
2110  {
2111  throw TimedOutException("timed out waiting for operation.");
2112  }
2113  else
2114  {
2115  throw DisconnectedException("Connection closed while waiting for response.");
2116  }
2117  }
2118  return ack;
2119  }
2120 
2121  void _cleanup(void)
2122  {
2123  if (!_client)
2124  {
2125  return;
2126  }
2127  amps_client_set_predisconnect_handler(_client, NULL, 0L);
2128  amps_client_set_disconnect_handler(_client, NULL, 0L);
2129  AMPS_CALL_EXCEPTION_WRAPPER(ClientImpl::disconnect());
2130  _pEmptyMessageStream.reset(NULL);
2131  amps_client_destroy(_client);
2132  _client = NULL;
2133  }
2134 
2135  public:
2136 
2137  ClientImpl(const std::string& clientName)
2138  : _client(NULL), _name(clientName)
2139  , _isRetryOnDisconnect(true)
2140  , _lastSentHaSequenceNumber((amps_uint64_t)0), _logonInProgress(0)
2141  , _badTimeToHASubscribe(0), _serverVersion()
2142  , _queueAckTimeout(AMPS_DEFAULT_QUEUE_ACK_TIMEOUT)
2143  , _isAutoAckEnabled(false)
2144  , _ackBatchSize(0)
2145  , _queuedAckCount(0)
2146  , _defaultMaxDepth(0)
2147  , _connected(false)
2148  , _heartbeatInterval(0)
2149  , _readTimeout(0)
2150  {
2151  _replayer.setClient(this);
2152  _client = amps_client_create(clientName.c_str());
2154  (amps_handler)ClientImpl::ClientImplMessageHandler,
2155  this);
2157  (amps_predisconnect_handler)ClientImpl::ClientImplPreDisconnectHandler,
2158  this);
2160  (amps_handler)ClientImpl::ClientImplDisconnectHandler,
2161  this);
2162  _exceptionListener = &_defaultExceptionListener;
2163  for (size_t i = 0; i < GlobalCommandTypeHandlers::COUNT; ++i)
2164  {
2165 #ifdef AMPS_USE_EMPLACE
2166  _globalCommandTypeHandlers.emplace_back(MessageHandler());
2167 #else
2168  _globalCommandTypeHandlers.push_back(MessageHandler());
2169 #endif
2170  }
2171  }
2172 
2173  virtual ~ClientImpl()
2174  {
2175  _cleanup();
2176  }
2177 
2178  const std::string& getName() const
2179  {
2180  return _name;
2181  }
2182 
2183  const std::string& getNameHash() const
2184  {
2185  return _nameHash;
2186  }
2187 
2188  const amps_uint64_t getNameHashValue() const
2189  {
2190  return _nameHashValue;
2191  }
2192 
2193  void setName(const std::string& name)
2194  {
2195  // This operation will fail if the client's
2196  // name is already set.
2197  amps_result result = amps_client_set_name(_client, name.c_str());
2198  if (result != AMPS_E_OK)
2199  {
2200  AMPSException::throwFor(_client, result);
2201  }
2202  _name = name;
2203  }
2204 
2205  const std::string& getLogonCorrelationData() const
2206  {
2207  return _logonCorrelationData;
2208  }
2209 
2210  void setLogonCorrelationData(const std::string& logonCorrelationData_)
2211  {
2212  _logonCorrelationData = logonCorrelationData_;
2213  }
2214 
2215  size_t getServerVersion() const
2216  {
2217  return _serverVersion.getOldStyleVersion();
2218  }
2219 
2220  VersionInfo getServerVersionInfo() const
2221  {
2222  return _serverVersion;
2223  }
2224 
2225  const std::string& getURI() const
2226  {
2227  return _lastUri;
2228  }
2229 
2230  virtual void connect(const std::string& uri)
2231  {
2232  Lock<Mutex> l(_lock);
2233  _connect(uri);
2234  }
2235 
2236  virtual void _connect(const std::string& uri)
2237  {
2238  _lastUri = uri;
2239  amps_result result = amps_client_connect(_client, uri.c_str());
2240  if (result != AMPS_E_OK)
2241  {
2242  AMPSException::throwFor(_client, result);
2243  }
2244  _message.reset();
2245  _deltaMessage.setCommandEnum(Message::Command::DeltaPublish);
2246  _publishMessage.setCommandEnum(Message::Command::Publish);
2247  _beatMessage.setCommandEnum(Message::Command::Heartbeat);
2248  _beatMessage.setOptions("beat");
2249  _readMessage.setClientImpl(this);
2250  if (_queueAckTimeout)
2251  {
2252  result = amps_client_set_idle_time(_client, _queueAckTimeout);
2253  if (result != AMPS_E_OK)
2254  {
2255  AMPSException::throwFor(_client, result);
2256  }
2257  }
2258  _connected = true;
2259  broadcastConnectionStateChanged(ConnectionStateListener::Connected);
2260  }
2261 
2262  void setDisconnected()
2263  {
2264  {
2265  Lock<Mutex> l(_lock);
2266  if (_connected)
2267  {
2268  AMPS_CALL_EXCEPTION_WRAPPER(broadcastConnectionStateChanged(ConnectionStateListener::Disconnected));
2269  }
2270  _connected = false;
2271  _heartbeatTimer.setTimeout(0.0);
2272  }
2273  clearAcks(INT_MAX);
2274  amps_client_disconnect(_client);
2275  _routes.clear();
2276  }
2277 
2278  virtual void disconnect()
2279  {
2280  AMPS_CALL_EXCEPTION_WRAPPER(flushAcks());
2281  setDisconnected();
2282  AMPS_CALL_EXCEPTION_WRAPPER(processDeferredExecutions());
2283  Lock<Mutex> l(_lock);
2284  broadcastConnectionStateChanged(ConnectionStateListener::Shutdown);
2285  }
2286 
2287  void clearAcks(unsigned failedVersion)
2288  {
2289  // Have to lock to prevent race conditions
2290  Lock<Mutex> guard(_ackMapLock);
2291  {
2292  // Go ahead and signal any waiters if they are around...
2293  std::vector<std::string> worklist;
2294  for (AckMap::iterator i = _ackMap.begin(), e = _ackMap.end(); i != e; ++i)
2295  {
2296  if (i->second.getConnectionVersion() <= failedVersion)
2297  {
2298  i->second.setAbandoned();
2299  worklist.push_back(i->first);
2300  }
2301  }
2302 
2303  for (std::vector<std::string>::iterator j = worklist.begin(), e = worklist.end(); j != e; ++j)
2304  {
2305  _ackMap.erase(*j);
2306  }
2307  }
2308 
2309  _lock.signalAll();
2310  }
2311 
2312  int send(const Message& message)
2313  {
2314  Lock<Mutex> l(_lock);
2315  return _send(message);
2316  }
2317 
2318  void sendWithoutRetry(const Message& message_)
2319  {
2320  Lock<Mutex> l(_lock);
2321  // If we got here while logon was in progress, then we tried to send
2322  // while we were disconnected so throw DisconnectedException
2323  if (_logonInProgress)
2324  {
2325  throw DisconnectedException("The client has been disconnected.");
2326  }
2327  _sendWithoutRetry(message_);
2328  }
2329 
2330  void _sendWithoutRetry(const Message& message_)
2331  {
2332  amps_result result = amps_client_send(_client, message_.getMessage());
2333  if (result != AMPS_E_OK)
2334  {
2335  AMPSException::throwFor(_client, result);
2336  }
2337  }
2338 
 // Sends `message`, retrying while the result is AMPS_E_RETRY.
 // The caller must already hold _lock.
 //
 // haSeq          Non-zero for a publish that carries an HA sequence number;
 //                such sends are skipped when haSeq is not beyond
 //                _lastSentHaSequenceNumber, and any gap below haSeq is first
 //                replayed from _publishStore so messages go out in order.
 // isHASubscribe_ When true, the send is skipped while _badTimeToHASubscribe
 //                is set.
 //
 // Returns the local `version` value cast to int: it starts at 0 and is
 // filled in by amps_client_send_with_version() (or taken from the replayer).
 // Throws through AMPSException::throwFor() when the final result is not
 // AMPS_E_OK, or on the first failure when retry-on-disconnect is disabled.
2339  int _send(const Message& message, amps_uint64_t haSeq = (amps_uint64_t)0,
2340  bool isHASubscribe_ = false)
2341  {
2342  // Lock is already acquired
2343  amps_result result = AMPS_E_RETRY;
2344 
2345  // Create a local reference to this message, as we'll need to hold on
2346  // to a reference to it in case reconnect occurs.
2347  Message localMessage = message;
2348  unsigned version = 0;
2349 
2350  while (result == AMPS_E_RETRY)
2351  {
 // An HA publish arriving during logon waits on _lock in 1000-unit slices
 // (or throws when retry is disabled) and then re-evaluates the loop.
2352  if (haSeq && _logonInProgress)
2353  {
2354  // If retrySend is disabled, do not wait for the reconnect
2355  // to finish, just throw.
2356  if (!_isRetryOnDisconnect)
2357  {
2358  AMPSException::throwFor(_client, AMPS_E_RETRY);
2359  }
2360  if (!_lock.wait(1000))
2361  {
2362  amps_invoke_waiting_function();
2363  }
2364  }
2365  else
2366  {
 // Nothing to do: this sequence number was already sent, or HA subscribes
 // are currently being held off.
2367  if ((haSeq && haSeq <= _lastSentHaSequenceNumber) ||
2368  (isHASubscribe_ && _badTimeToHASubscribe))
2369  {
2370  return (int)version;
2371  }
2372  // It's possible to get here out of order, but this way we'll
2373  // always send in order.
2374  if (haSeq > _lastSentHaSequenceNumber)
2375  {
2376  while (haSeq > _lastSentHaSequenceNumber + 1)
2377  {
2378  try
2379  {
2380  // Replayer updates _lastSentHaSsequenceNumber
2381  if (!_publishStore.replaySingle(_replayer,
2382  _lastSentHaSequenceNumber + 1))
2383  {
2384  //++_lastSentHaSequenceNumber;
2385  continue;
2386  }
2387  result = AMPS_E_OK;
2388  version = _replayer._version;
2389  }
 // The two catch clauses differ only in naming the (unused) exception object.
2390 #ifdef _WIN32
2391  catch (const DisconnectedException&)
2392 #else
2393  catch (const DisconnectedException& e)
2394 #endif
2395  {
2396  result = _replayer._res;
2397  break;
2398  }
2399  }
2400  result = amps_client_send_with_version(_client,
2401  localMessage.getMessage(),
2402  &version);
2403  ++_lastSentHaSequenceNumber;
2404  }
2405  else
2406  {
 // Commands whose name does not start with 'l' wait here until the
 // in-progress logon has finished.
2407  if (_logonInProgress && localMessage.getCommand().data()[0] != 'l')
2408  {
2409  while (_logonInProgress)
2410  {
2411  if (!_lock.wait(1000))
2412  {
2413  amps_invoke_waiting_function();
2414  }
2415  }
2416  }
2417  result = amps_client_send_with_version(_client,
2418  localMessage.getMessage(),
2419  &version);
2420  }
2421  if (result != AMPS_E_OK)
2422  {
 // Before reconnecting, switch from the caller's message handle to a deep
 // copy (only for non-HA sends that still refer to the caller's message).
2423  if (!isHASubscribe_ && !haSeq &&
2424  localMessage.getMessage() == message.getMessage())
2425  {
2426  localMessage = message.deepCopy();
2427  }
2428  if (_isRetryOnDisconnect)
2429  {
 // _lock is released while the reconnect is attempted.
2430  Unlock<Mutex> u(_lock);
2431  result = amps_client_attempt_reconnect(_client, version);
2432  // If this is an HA publish or subscribe command, it was
2433  // stored first and will have already been replayed by the
2434  // store or sub manager after reconnect, so just return.
2435  if ((isHASubscribe_ || haSeq) &&
2436  result == AMPS_E_RETRY)
2437  {
2438  return (int)version;
2439  }
2440  }
2441  else
2442  {
2443  // retrySend is disabled so throw the error
2444  // from the send as an exception, do not retry.
2445  AMPSException::throwFor(_client, result);
2446  }
2447  }
2448  }
2449  if (result == AMPS_E_RETRY)
2450  {
2451  amps_invoke_waiting_function();
2452  }
2453  }
2454 
2455  if (result != AMPS_E_OK)
2456  {
2457  AMPSException::throwFor(_client, result);
2458  }
2459  return (int)version;
2460  }
2461 
2462  void addMessageHandler(const Field& commandId_,
2463  const AMPS::MessageHandler& messageHandler_,
2464  unsigned requestedAcks_, Message::Command::Type commandType_)
2465  {
2466  Lock<Mutex> lock(_lock);
2467  _routes.addRoute(commandId_, messageHandler_, requestedAcks_,
2468  0, commandType_);
2469  }
2470 
2471  bool removeMessageHandler(const Field& commandId_)
2472  {
2473  Lock<Mutex> lock(_lock);
2474  return _routes.removeRoute(commandId_);
2475  }
2476 
 // Sends message_ to the server, registering messageHandler_ under the ids
 // its responses are routed by, and returns the command id that was used.
 //
 // Behaviour depends on the command type (see the switch):
 //  - Subscribe, DeltaSubscribe, SOWAndSubscribe, SOWAndDeltaSubscribe, SOW:
 //    missing command / subscription / query ids are generated, a Processed
 //    ack is added (plus Persisted for a bookmark subscription when a
 //    bookmark store is set), routes are installed, and the call blocks in
 //    syncAckProcessing(). If that throws, the routes are removed, the
 //    caller's ack types are restored and the exception is rethrown.
 //  - Unsubscribe, Heartbeat, Logon, StartTimer, StopTimer, SOWDelete:
 //    sent through _send(); a route is added only when an ack was requested
 //    and the handler is valid.
 //  - Publish, DeltaPublish: as the previous group, except that a message
 //    with a non-empty filter is sent through syncAckProcessing() with a
 //    Processed ack added.
 //  - GroupBegin, GroupEnd, OOF, Ack, Unknown and anything else: throws
 //    CommandException.
 //
 // timeout_ is forwarded to syncAckProcessing(). On the normal return path
 // message_ has the caller's originally requested ack types set again.
2477  std::string send(const MessageHandler& messageHandler_, Message& message_, int timeout_ = 0)
2478  {
2479  Field id = message_.getCommandId();
2480  Field subId = message_.getSubscriptionId();
2481  Field qid = message_.getQueryId();
2482  bool isSubscribeOnly = false;
2483  bool replace = false;
2484  unsigned requestedAcks = message_.getAckTypeEnum();
2485  unsigned systemAddedAcks = Message::AckType::None;
2486  Message::Command::Type commandType = message_.getCommandEnum();
2487 
2488  switch (commandType)
2489  {
2490  case Message::Command::Subscribe:
2491  case Message::Command::DeltaSubscribe:
 // Only the first strlen(AMPS_OPTIONS_REPLACE) - 1 characters of the
 // constant are searched for in the options.
2492  replace = message_.getOptions().operator std::string().find(AMPS_OPTIONS_REPLACE, 0, strlen(AMPS_OPTIONS_REPLACE) - 1) != std::string::npos;
2493  isSubscribeOnly = true;
2494  // fall through
2495  case Message::Command::SOWAndSubscribe:
2496  case Message::Command::SOWAndDeltaSubscribe:
2497  if (id.empty())
2498  {
2499  id = message_.newCommandId().getCommandId();
2500  }
2501  else
2502  {
 // Keep generating command ids until one has no existing route.
2503  while (!replace && id != subId && _routes.hasRoute(id))
2504  {
2505  id = message_.newCommandId().getCommandId();
2506  }
2507  }
2508  if (subId.empty())
2509  {
2510  message_.setSubscriptionId(id);
2511  subId = id;
2512  }
2513  if (!message_.getBookmark().empty() && _bookmarkStore.isValid())
2514  {
2515  systemAddedAcks |= Message::AckType::Persisted;
2516  }
2517  // fall through
2518  case Message::Command::SOW:
2519  if (id.empty())
2520  {
2521  id = message_.newCommandId().getCommandId();
2522  }
2523  else
2524  {
2525  while (!replace && id != subId && _routes.hasRoute(id))
2526  {
2527  message_.newCommandId();
 // A query id that mirrored the old command id follows the new one.
2528  if (qid == id)
2529  {
2530  qid = message_.getCommandId();
2531  message_.setQueryId(qid);
2532  }
2533  id = message_.getCommandId();
2534  }
2535  }
2536  if (!isSubscribeOnly)
2537  {
2538  if (qid.empty())
2539  {
2540  message_.setQueryID(id);
2541  qid = id;
2542  }
2543  else
2544  {
2545  while (!replace && qid != subId && qid != id
2546  && _routes.hasRoute(qid))
2547  {
2548  qid = message_.newQueryId().getQueryId();
2549  }
2550  }
2551  }
2552  systemAddedAcks |= Message::AckType::Processed;
2553  message_.setAckTypeEnum(requestedAcks | systemAddedAcks);
2554  {
 // routesAdded counts the routes registered so far; once it is non-zero,
 // later routes try to get their own copy of the handler's user data.
2555  int routesAdded = 0;
2556  Lock<Mutex> l(_lock);
2557  if (!subId.empty() && messageHandler_.isValid())
2558  {
2559  if (!_routes.hasRoute(subId))
2560  {
2561  ++routesAdded;
2562  }
2563  // This can replace a non-subscribe with a matching id
2564  // with a subscription but not another subscription.
2565  _routes.addRoute(subId, messageHandler_, requestedAcks,
2566  systemAddedAcks, commandType);
2567  }
2568  if (!isSubscribeOnly && !qid.empty()
2569  && messageHandler_.isValid() && qid != subId)
2570  {
2571  if (routesAdded == 0)
2572  {
2573  _routes.addRoute(qid, messageHandler_,
2574  requestedAcks, systemAddedAcks, commandType);
2575  }
2576  else
2577  {
2578  void* data = NULL;
2579  {
 // _lock is released while the copy-route function runs.
2580  Unlock<Mutex> u(_lock);
2581  data = amps_invoke_copy_route_function(
2582  messageHandler_.userData());
2583  }
2584  if (!data)
2585  {
2586  _routes.addRoute(qid, messageHandler_, requestedAcks,
2587  systemAddedAcks, commandType);
2588  }
2589  else
2590  {
2591  _routes.addRoute(qid,
2592  MessageHandler(messageHandler_.function(),
2593  data),
2594  requestedAcks, systemAddedAcks, commandType);
2595  }
2596  }
2597  ++routesAdded;
2598  }
 // A route under the command id is added only when the caller asked for
 // some ack type other than Persisted.
2599  if (!id.empty() && messageHandler_.isValid()
2600  && requestedAcks & ~Message::AckType::Persisted
2601  && id != subId && id != qid)
2602  {
2603  if (routesAdded == 0)
2604  {
2605  _routes.addRoute(id, messageHandler_, requestedAcks,
2606  systemAddedAcks, commandType);
2607  }
2608  else
2609  {
2610  void* data = NULL;
2611  {
2612  Unlock<Mutex> u(_lock);
2613  data = amps_invoke_copy_route_function(
2614  messageHandler_.userData());
2615  }
2616  if (!data)
2617  {
2618  _routes.addRoute(id, messageHandler_, requestedAcks,
2619  systemAddedAcks, commandType);
2620  }
2621  else
2622  {
2623  _routes.addRoute(id,
2624  MessageHandler(messageHandler_.function(),
2625  data),
2626  requestedAcks,
2627  systemAddedAcks, commandType);
2628  }
2629  }
2630  ++routesAdded;
2631  }
2632  try
2633  {
2634  // We aren't adding to subscription manager, so this isn't
2635  // an HA subscribe.
2636  syncAckProcessing(timeout_, message_, 0, false);
2637  message_.setAckTypeEnum(requestedAcks);
2638  }
2639  catch (...)
2640  {
 // Undo the route registrations and restore the caller's ack types before
 // rethrowing.
2641  _routes.removeRoute(message_.getQueryID());
2642  _routes.removeRoute(message_.getSubscriptionId());
2643  _routes.removeRoute(id);
2644  message_.setAckTypeEnum(requestedAcks);
2645  throw;
2646  }
2647  }
2648  break;
2649  // These are valid commands that are used as-is
2650  case Message::Command::Unsubscribe:
2651  case Message::Command::Heartbeat:
2652  case Message::Command::Logon:
2653  case Message::Command::StartTimer:
2654  case Message::Command::StopTimer:
2655  case Message::Command::SOWDelete:
2656  {
2657  Lock<Mutex> l(_lock);
2658  // if an ack is requested, it'll need a command ID.
2659  if (message_.getAckTypeEnum() != Message::AckType::None)
2660  {
2661  if (id.empty())
2662  {
2663  message_.newCommandId();
2664  id = message_.getCommandId();
2665  }
2666  if (messageHandler_.isValid())
2667  {
2668  _routes.addRoute(id, messageHandler_, requestedAcks,
2669  Message::AckType::None, commandType);
2670  }
2671  }
2672  _send(message_);
2673  }
2674  break;
2675  case Message::Command::DeltaPublish:
2676  case Message::Command::Publish:
2677  {
 // A publish that carries a filter is sent synchronously.
2678  bool useSync = message_.getFilter().len() > 0;
2679  Lock<Mutex> l(_lock);
2680  // if an ack is requested, it'll need a command ID.
2681  unsigned ackType = message_.getAckTypeEnum();
2682  if (ackType != Message::AckType::None
2683  || useSync)
2684  {
2685  if (id.empty())
2686  {
2687  message_.newCommandId();
2688  id = message_.getCommandId();
2689  }
2690  if (messageHandler_.isValid())
2691  {
2692  _routes.addRoute(id, messageHandler_, requestedAcks,
2693  Message::AckType::None, commandType);
2694  }
2695  }
2696  if (useSync)
2697  {
2698  message_.setAckTypeEnum(ackType | Message::AckType::Processed);
2699  syncAckProcessing(timeout_, message_, 0, false);
2700  }
2701  else
2702  {
2703  _send(message_);
2704  }
2705  }
2706  break;
2707  // These are things that shouldn't be sent (not meaningful)
2708  case Message::Command::GroupBegin:
2709  case Message::Command::GroupEnd:
2710  case Message::Command::OOF:
2711  case Message::Command::Ack:
2712  case Message::Command::Unknown:
2713  default:
2714  throw CommandException("Command type " + message_.getCommand() + " can not be sent directly to AMPS");
2715  }
2716  message_.setAckTypeEnum(requestedAcks);
2717  return id;
2718  }
2719 
2720  void setDisconnectHandler(const DisconnectHandler& disconnectHandler)
2721  {
2722  Lock<Mutex> l(_lock);
2723  _disconnectHandler = disconnectHandler;
2724  }
2725 
2726  void setGlobalCommandTypeMessageHandler(const std::string& command_, const MessageHandler& handler_)
2727  {
2728  switch (command_[0])
2729  {
2730 #if 0 // Not currently implemented to avoid an extra branch in delivery
2731  case 'p':
2732  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::Publish] = handler_;
2733  break;
2734  case 's':
2735  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::SOW] = handler_;
2736  break;
2737 #endif
2738  case 'h':
2739  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::Heartbeat] = handler_;
2740  break;
2741 #if 0 // Not currently implemented to avoid an extra branch in delivery
2742  case 'g':
2743  if (command_[6] == 'b')
2744  {
2745  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::GroupBegin] = handler_;
2746  }
2747  else if (command_[6] == 'e')
2748  {
2749  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::GroupEnd] = handler_;
2750  }
2751  else
2752  {
2753  std::ostringstream os;
2754  os << "Invalid command '" << command_ << "' passed to setGlobalCommandTypeHandler";
2755  throw CommandException(os.str());
2756  }
2757  break;
2758  case 'o':
2759  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::OOF] = handler_;
2760  break;
2761 #endif
2762  case 'a':
2763  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::Ack] = handler_;
2764  break;
2765  case 'l':
2766  case 'L':
2767  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::LastChance] = handler_;
2768  break;
2769  case 'd':
2770  case 'D':
2771  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::DuplicateMessage] = handler_;
2772  break;
2773  default:
2774  std::ostringstream os;
2775  os << "Invalid command '" << command_ << "' passed to setGlobalCommandTypeHandler";
2776  throw CommandException(os.str());
2777  break;
2778  }
2779  }
2780 
2781  void setGlobalCommandTypeMessageHandler(const Message::Command::Type command_, const MessageHandler& handler_)
2782  {
2783  switch (command_)
2784  {
2785 #if 0 // Not currently implemented to avoid an extra branch in delivery
2786  case Message::Command::Publish:
2787  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::Publish] = handler_;
2788  break;
2789  case Message::Command::SOW:
2790  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::SOW] = handler_;
2791  break;
2792 #endif
2793  case Message::Command::Heartbeat:
2794  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::Heartbeat] = handler_;
2795  break;
2796 #if 0 // Not currently implemented to avoid an extra branch in delivery
2797  case Message::Command::GroupBegin:
2798  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::GroupBegin] = handler_;
2799  break;
2800  case Message::Command::GroupEnd:
2801  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::GroupEnd] = handler_;
2802  break;
2803  case Message::Command::OOF:
2804  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::OOF] = handler_;
2805  break;
2806 #endif
2807  case Message::Command::Ack:
2808  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::Ack] = handler_;
2809  break;
2810  default:
2811  unsigned bits = 0;
2812  unsigned command = command_;
2813  while (command > 0)
2814  {
2815  ++bits;
2816  command >>= 1;
2817  }
2818  char errBuf[128];
2819  AMPS_snprintf(errBuf, sizeof(errBuf),
2820  "Invalid command '%.*s' passed to setGlobalCommandTypeHandler",
2821  CommandConstants<0>::Lengths[bits],
2822  CommandConstants<0>::Values[bits]);
2823  throw CommandException(errBuf);
2824  break;
2825  }
2826  }
2827 
2828  void setGlobalCommandTypeMessageHandler(const GlobalCommandTypeHandlers handlerType_, const MessageHandler& handler_)
2829  {
2830  _globalCommandTypeHandlers[handlerType_] = handler_;
2831  }
2832 
2833  void setFailedWriteHandler(FailedWriteHandler* handler_)
2834  {
2835  Lock<Mutex> l(_lock);
2836  _failedWriteHandler.reset(handler_);
2837  }
2838 
2839  void setPublishStore(const Store& publishStore_)
2840  {
2841  Lock<Mutex> l(_lock);
2842  if (_connected)
2843  {
2844  throw AlreadyConnectedException("Setting a publish store on a connected client is undefined behavior");
2845  }
2846  _publishStore = publishStore_;
2847  }
2848 
2849  void setBookmarkStore(const BookmarkStore& bookmarkStore_)
2850  {
2851  Lock<Mutex> l(_lock);
2852  if (_connected)
2853  {
2854  throw AlreadyConnectedException("Setting a bookmark store on a connected client is undefined behavior");
2855  }
2856  _bookmarkStore = bookmarkStore_;
2857  }
2858 
2859  void setSubscriptionManager(SubscriptionManager* subscriptionManager_)
2860  {
2861  Lock<Mutex> l(_lock);
2862  _subscriptionManager.reset(subscriptionManager_);
2863  }
2864 
2865  SubscriptionManager* getSubscriptionManager() const
2866  {
2867  return const_cast<SubscriptionManager*>(_subscriptionManager.get());
2868  }
2869 
2870  DisconnectHandler getDisconnectHandler() const
2871  {
2872  return _disconnectHandler;
2873  }
2874 
2875  MessageHandler getDuplicateMessageHandler() const
2876  {
2877  return _globalCommandTypeHandlers[GlobalCommandTypeHandlers::DuplicateMessage];
2878  }
2879 
2880  FailedWriteHandler* getFailedWriteHandler() const
2881  {
2882  return const_cast<FailedWriteHandler*>(_failedWriteHandler.get());
2883  }
2884 
2885  Store getPublishStore() const
2886  {
2887  return _publishStore;
2888  }
2889 
2890  BookmarkStore getBookmarkStore() const
2891  {
2892  return _bookmarkStore;
2893  }
2894 
2895  amps_uint64_t publish(const char* topic_, size_t topicLen_, const char* data_, size_t dataLen_)
2896  {
2897  if (!_publishStore.isValid())
2898  {
2899  Lock<Mutex> l(_lock);
2900  _publishMessage.assignTopic(topic_, topicLen_);
2901  _publishMessage.assignData(data_, dataLen_);
2902  _send(_publishMessage);
2903  return 0;
2904  }
2905  else
2906  {
2907  publishStoreMessage.reset();
2908  publishStoreMessage.setCommandEnum(Message::Command::Publish);
2909  return _publish(topic_, topicLen_, data_, dataLen_);
2910  }
2911  }
2912 
2913  amps_uint64_t publish(const char* topic_, size_t topicLen_, const char* data_,
2914  size_t dataLen_, unsigned long expiration_)
2915  {
2916  if (!_publishStore.isValid())
2917  {
2918  Lock<Mutex> l(_lock);
2919  _publishMessage.assignTopic(topic_, topicLen_);
2920  _publishMessage.assignData(data_, dataLen_);
2921  char exprBuf[AMPS_NUMBER_BUFFER_LEN];
2922  size_t pos = convertToCharArray(exprBuf, expiration_);
2923  _publishMessage.assignExpiration(exprBuf + pos, AMPS_NUMBER_BUFFER_LEN - pos);
2924  _send(_publishMessage);
2925  _publishMessage.assignExpiration(NULL, 0);
2926  return 0;
2927  }
2928  else
2929  {
2930  publishStoreMessage.reset();
2931  char exprBuf[AMPS_NUMBER_BUFFER_LEN];
2932  size_t exprPos = convertToCharArray(exprBuf, expiration_);
2933  publishStoreMessage.setCommandEnum(Message::Command::Publish)
2934  .assignExpiration(exprBuf + exprPos,
2935  AMPS_NUMBER_BUFFER_LEN - exprPos);
2936  return _publish(topic_, topicLen_, data_, dataLen_);
2937  }
2938  }
2939 
2940  class FlushAckHandler : ConnectionStateListener
2941  {
2942  private:
2943  ClientImpl* _pClient;
2944  Field _cmdId;
2945 #if __cplusplus >= 201100L || _MSC_VER >= 1900
2946  std::atomic<bool> _acked;
2947  std::atomic<bool> _disconnected;
2948 #else
2949  volatile bool _acked;
2950  volatile bool _disconnected;
2951 #endif
2952  public:
2953  FlushAckHandler(ClientImpl* pClient_)
2954  : _pClient(pClient_), _cmdId(), _acked(false), _disconnected(false)
2955  {
2956  pClient_->addConnectionStateListener(this);
2957  }
2958  ~FlushAckHandler()
2959  {
2960  _pClient->removeConnectionStateListener(this);
2961  _pClient->removeMessageHandler(_cmdId);
2962  _cmdId.clear();
2963  }
2964  void setCommandId(const Field& cmdId_)
2965  {
2966  _cmdId.deepCopy(cmdId_);
2967  }
2968  void invoke(const Message&)
2969  {
2970  _acked = true;
2971  }
2972  void connectionStateChanged(State state_)
2973  {
2974  if (state_ <= Shutdown)
2975  {
2976  _disconnected = true;
2977  }
2978  }
2979  bool acked()
2980  {
2981  return _acked;
2982  }
2983  bool done()
2984  {
2985  return _acked || _disconnected;
2986  }
2987  };
2988 
2989  void publishFlush(long timeout_, unsigned ackType_)
2990  {
2991  static const char* processed = "processed";
2992  static const size_t processedLen = strlen(processed);
2993  static const char* persisted = "persisted";
2994  static const size_t persistedLen = strlen(persisted);
2995  static const char* flush = "flush";
2996  static const size_t flushLen = strlen(flush);
2997  static VersionInfo minPersisted("5.3.3.0");
2998  static VersionInfo minFlush("4");
2999  if (ackType_ != Message::AckType::Processed
3000  && ackType_ != Message::AckType::Persisted)
3001  {
3002  throw CommandException("Flush can only be used with processed or persisted acks.");
3003  }
3004  FlushAckHandler flushHandler(this);
3005  if (_serverVersion >= minFlush)
3006  {
3007  Lock<Mutex> l(_lock);
3008  if (!_connected)
3009  {
3010  throw DisconnectedException("Not connected trying to flush");
3011  }
3012  _message.reset();
3013  _message.newCommandId();
3014  _message.assignCommand(flush, flushLen);
3015  if (_serverVersion < minPersisted
3016  || ackType_ == Message::AckType::Processed)
3017  {
3018  _message.assignAckType(processed, processedLen);
3019  }
3020  else
3021  {
3022  _message.assignAckType(persisted, persistedLen);
3023  }
3024  flushHandler.setCommandId(_message.getCommandId());
3025  addMessageHandler(_message.getCommandId(),
3026  std::bind(&FlushAckHandler::invoke,
3027  std::ref(flushHandler),
3028  std::placeholders::_1),
3029  ackType_, _message.getCommandEnum());
3030  NoDelay noDelay(_client);
3031  if (_send(_message) == -1)
3032  {
3033  throw DisconnectedException("Disconnected trying to flush");
3034  }
3035  }
3036  if (_publishStore.isValid())
3037  {
3038  try
3039  {
3040  _publishStore.flush(timeout_);
3041  }
3042  catch (const AMPSException& ex)
3043  {
3044  AMPS_UNHANDLED_EXCEPTION(ex);
3045  throw;
3046  }
3047  }
3048  else if (_serverVersion < minFlush)
3049  {
3050  if (timeout_ > 0)
3051  {
3052  AMPS_USLEEP(timeout_ * 1000);
3053  }
3054  else
3055  {
3056  AMPS_USLEEP(1000 * 1000);
3057  }
3058  return;
3059  }
3060  if (timeout_)
3061  {
3062  Timer timer((double)timeout_);
3063  timer.start();
3064  while (!timer.check() && !flushHandler.done())
3065  {
3066  AMPS_USLEEP(10000);
3067  amps_invoke_waiting_function();
3068  }
3069  }
3070  else
3071  {
3072  while (!flushHandler.done())
3073  {
3074  AMPS_USLEEP(10000);
3075  amps_invoke_waiting_function();
3076  }
3077  }
3078  // No response or disconnect in timeout interval
3079  if (!flushHandler.done())
3080  {
3081  throw TimedOutException("Timed out waiting for flush");
3082  }
3083  // We got disconnected and there is no publish store
3084  if (!flushHandler.acked() && !_publishStore.isValid())
3085  {
3086  throw DisconnectedException("Disconnected waiting for flush");
3087  }
3088  }
3089 
3090  amps_uint64_t deltaPublish(const char* topic_, size_t topicLength_,
3091  const char* data_, size_t dataLength_)
3092  {
3093  if (!_publishStore.isValid())
3094  {
3095  Lock<Mutex> l(_lock);
3096  _deltaMessage.assignTopic(topic_, topicLength_);
3097  _deltaMessage.assignData(data_, dataLength_);
3098  _send(_deltaMessage);
3099  return 0;
3100  }
3101  else
3102  {
3103  publishStoreMessage.reset();
3104  publishStoreMessage.setCommandEnum(Message::Command::DeltaPublish);
3105  return _publish(topic_, topicLength_, data_, dataLength_);
3106  }
3107  }
3108 
3109  amps_uint64_t deltaPublish(const char* topic_, size_t topicLength_,
3110  const char* data_, size_t dataLength_,
3111  unsigned long expiration_)
3112  {
3113  if (!_publishStore.isValid())
3114  {
3115  Lock<Mutex> l(_lock);
3116  _deltaMessage.assignTopic(topic_, topicLength_);
3117  _deltaMessage.assignData(data_, dataLength_);
3118  char exprBuf[AMPS_NUMBER_BUFFER_LEN];
3119  size_t pos = convertToCharArray(exprBuf, expiration_);
3120  _deltaMessage.assignExpiration(exprBuf + pos, AMPS_NUMBER_BUFFER_LEN - pos);
3121  _send(_deltaMessage);
3122  _deltaMessage.assignExpiration(NULL, 0);
3123  return 0;
3124  }
3125  else
3126  {
3127  publishStoreMessage.reset();
3128  char exprBuf[AMPS_NUMBER_BUFFER_LEN];
3129  size_t exprPos = convertToCharArray(exprBuf, expiration_);
3130  publishStoreMessage.setCommandEnum(Message::Command::DeltaPublish)
3131  .assignExpiration(exprBuf + exprPos,
3132  AMPS_NUMBER_BUFFER_LEN - exprPos);
3133  return _publish(topic_, topicLength_, data_, dataLength_);
3134  }
3135  }
3136 
3137  amps_uint64_t _publish(const char* topic_, size_t topicLength_,
3138  const char* data_, size_t dataLength_)
3139  {
3140  publishStoreMessage.assignTopic(topic_, topicLength_)
3141  .setAckTypeEnum(Message::AckType::Persisted)
3142  .assignData(data_, dataLength_);
3143  amps_uint64_t haSequenceNumber = _publishStore.store(publishStoreMessage);
3144  char buf[AMPS_NUMBER_BUFFER_LEN];
3145  size_t pos = convertToCharArray(buf, haSequenceNumber);
3146  publishStoreMessage.assignSequence(buf + pos, AMPS_NUMBER_BUFFER_LEN - pos);
3147  {
3148  Lock<Mutex> l(_lock);
3149  _send(publishStoreMessage, haSequenceNumber);
3150  }
3151  return haSequenceNumber;
3152  }
3153 
3154  virtual std::string logon(long timeout_, Authenticator& authenticator_,
3155  const char* options_ = NULL)
3156  {
3157  Lock<Mutex> l(_lock);
3158  return _logon(timeout_, authenticator_, options_);
3159  }
3160 
3161  virtual std::string _logon(long timeout_, Authenticator& authenticator_,
3162  const char* options_ = NULL)
3163  {
3164  _message.reset();
3165  _message.newCommandId();
3166  std::string newCommandId = _message.getCommandId();
3167  _message.setCommandEnum(Message::Command::Logon);
3168  _message.setClientName(_name);
3169 #ifdef AMPS_CLIENT_VERSION_WITH_LANGUAGE
3170  _message.assignVersion(AMPS_CLIENT_VERSION_WITH_LANGUAGE,
3171  strlen(AMPS_CLIENT_VERSION_WITH_LANGUAGE));
3172 #endif
3173  URI uri(_lastUri);
3174  if (uri.user().size())
3175  {
3176  _message.setUserId(uri.user());
3177  }
3178  if (uri.password().size())
3179  {
3180  _message.setPassword(uri.password());
3181  }
3182  if (uri.protocol() == "amps" && uri.messageType().size())
3183  {
3184  _message.setMessageType(uri.messageType());
3185  }
3186  if (uri.isTrue("pretty"))
3187  {
3188  _message.setOptions("pretty");
3189  }
3190 
3191  _message.setPassword(authenticator_.authenticate(_message.getUserId(), _message.getPassword()));
3192  if (!_logonCorrelationData.empty())
3193  {
3194  _message.assignCorrelationId(_logonCorrelationData);
3195  }
3196  if (options_)
3197  {
3198  _message.setOptions(options_);
3199  }
3200  _username = _message.getUserId();
3201  try
3202  {
3203  AtomicFlagFlip pubFlip(&_logonInProgress);
3204  NoDelay noDelay(_client);
3205  while (true)
3206  {
3207  _message.setAckTypeEnum(Message::AckType::Processed);
3208  AckResponse ack = syncAckProcessing(timeout_, _message);
3209  if (ack.status() == "retry")
3210  {
3211  _message.setPassword(authenticator_.retry(ack.username(), ack.password()));
3212  _username = ack.username();
3213  _message.setUserId(_username);
3214  }
3215  else
3216  {
3217  authenticator_.completed(ack.username(), ack.password(), ack.reason());
3218  break;
3219  }
3220  }
3221  broadcastConnectionStateChanged(ConnectionStateListener::LoggedOn);
3222 
3223  // Now re-send the heartbeat command if configured
3224  _sendHeartbeat();
3225  // Signal any threads waiting for _logonInProgress
3226  _lock.signalAll();
3227  }
3228  catch (const AMPSException& ex)
3229  {
3230  _lock.signalAll();
3231  AMPS_UNHANDLED_EXCEPTION(ex);
3232  throw;
3233  }
3234  catch (...)
3235  {
3236  _lock.signalAll();
3237  throw;
3238  }
3239 
3240  if (_publishStore.isValid())
3241  {
3242  try
3243  {
3244  _publishStore.replay(_replayer);
3245  broadcastConnectionStateChanged(ConnectionStateListener::PublishReplayed);
3246  }
3247  catch (const PublishStoreGapException& ex)
3248  {
3249  _lock.signalAll();
3250  AMPS_UNHANDLED_EXCEPTION(ex);
3251  throw;
3252  }
3253  catch (const StoreException& ex)
3254  {
3255  _lock.signalAll();
3256  std::ostringstream os;
3257  os << "A local store exception occurred while logging on."
3258  << ex.toString();
3259  throw ConnectionException(os.str());
3260  }
3261  catch (const AMPSException& ex)
3262  {
3263  _lock.signalAll();
3264  AMPS_UNHANDLED_EXCEPTION(ex);
3265  throw;
3266  }
3267  catch (const std::exception& ex)
3268  {
3269  _lock.signalAll();
3270  AMPS_UNHANDLED_EXCEPTION(ex);
3271  throw;
3272  }
3273  catch (...)
3274  {
3275  _lock.signalAll();
3276  throw;
3277  }
3278  }
3279  _lock.signalAll();
3280  return newCommandId;
3281  }
3282 
3283  std::string subscribe(const MessageHandler& messageHandler_,
3284  const std::string& topic_,
3285  long timeout_,
3286  const std::string& filter_,
3287  const std::string& bookmark_,
3288  const std::string& options_,
3289  const std::string& subId_,
3290  bool isHASubscribe_ = true)
3291  {
3292  isHASubscribe_ &= (bool)_subscriptionManager;
3293  Lock<Mutex> l(_lock);
3294  _message.reset();
3295  _message.setCommandEnum(Message::Command::Subscribe);
3296  _message.newCommandId();
3297  std::string subId(subId_);
3298  if (subId.empty())
3299  {
3300  if (options_.find(AMPS_OPTIONS_REPLACE, 0, strlen(AMPS_OPTIONS_REPLACE) - 1) != std::string::npos)
3301  {
3302  throw ConnectionException("Cannot issue a replacement subscription; a valid subscription id is required.");
3303  }
3304 
3305  subId = _message.getCommandId();
3306  }
3307  _message.setSubscriptionId(subId);
3308  // we need to deep copy this before sending the message; while we are
3309  // waiting for a response, the fields in _message may get blown away for
3310  // other operations.
3311  AMPS::Message::Field subIdField(subId);
3312  unsigned ackTypes = Message::AckType::Processed;
3313 
3314  if (!bookmark_.empty() && _bookmarkStore.isValid())
3315  {
3316  ackTypes |= Message::AckType::Persisted;
3317  }
3318  _message.setTopic(topic_);
3319 
3320  if (filter_.length())
3321  {
3322  _message.setFilter(filter_);
3323  }
3324  if (bookmark_.length())
3325  {
3326  if (bookmark_ == AMPS_BOOKMARK_RECENT)
3327  {
3328  Message::Field mostRecent = _bookmarkStore.getMostRecent(subIdField);
3329  _message.setBookmark(mostRecent);
3330  }
3331  else
3332  {
3333  _message.setBookmark(bookmark_);
3334  if (_bookmarkStore.isValid())
3335  {
3336  if (bookmark_ != AMPS_BOOKMARK_NOW &&
3337  bookmark_ != AMPS_BOOKMARK_EPOCH)
3338  {
3339  _bookmarkStore.log(_message);
3340  _bookmarkStore.discard(_message);
3341  _bookmarkStore.persisted(subIdField, _message.getBookmark());
3342  }
3343  }
3344  }
3345  }
3346  if (options_.length())
3347  {
3348  _message.setOptions(options_);
3349  }
3350 
3351  Message message = _message;
3352  if (isHASubscribe_)
3353  {
3354  message = _message.deepCopy();
3355  Unlock<Mutex> u(_lock);
3356  _subscriptionManager->subscribe(messageHandler_, message,
3357  Message::AckType::None);
3358  if (_badTimeToHASubscribe)
3359  {
3360  return subId;
3361  }
3362  }
3363  if (!_routes.hasRoute(_message.getSubscriptionId()))
3364  {
3365  _routes.addRoute(_message.getSubscriptionId(), messageHandler_,
3366  Message::AckType::None, ackTypes, _message.getCommandEnum());
3367  }
3368  message.setAckTypeEnum(ackTypes);
3369  if (!options_.empty())
3370  {
3371  message.setOptions(options_);
3372  }
3373  try
3374  {
3375  syncAckProcessing(timeout_, message, isHASubscribe_);
3376  }
3377  catch (const DisconnectedException&)
3378  {
3379  if (!isHASubscribe_)
3380  {
3381  _routes.removeRoute(subIdField);
3382  throw;
3383  }
3384  else
3385  {
3386  AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(subIdField));
3387  throw;
3388  }
3389  }
3390  catch (const TimedOutException&)
3391  {
3392  AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(subIdField));
3393  throw;
3394  }
3395  catch (...)
3396  {
3397  if (isHASubscribe_)
3398  {
3399  // Have to unlock before calling into sub manager to avoid deadlock
3400  Unlock<Mutex> unlock(_lock);
3401  _subscriptionManager->unsubscribe(subIdField);
3402  }
3403  _routes.removeRoute(subIdField);
3404  throw;
3405  }
3406 
3407  return subId;
3408  }
3409  std::string deltaSubscribe(const MessageHandler& messageHandler_,
3410  const std::string& topic_,
3411  long timeout_,
3412  const std::string& filter_,
3413  const std::string& bookmark_,
3414  const std::string& options_,
3415  const std::string& subId_ = "",
3416  bool isHASubscribe_ = true)
3417  {
3418  isHASubscribe_ &= (bool)_subscriptionManager;
3419  Lock<Mutex> l(_lock);
3420  _message.reset();
3421  _message.setCommandEnum(Message::Command::DeltaSubscribe);
3422  _message.newCommandId();
3423  std::string subId(subId_);
3424  if (subId.empty())
3425  {
3426  subId = _message.getCommandId();
3427  }
3428  _message.setSubscriptionId(subId);
3429  // we need to deep copy this before sending the message; while we are
3430  // waiting for a response, the fields in _message may get blown away for
3431  // other operations.
3432  AMPS::Message::Field subIdField(subId);
3433  unsigned ackTypes = Message::AckType::Processed;
3434 
3435  if (!bookmark_.empty() && _bookmarkStore.isValid())
3436  {
3437  ackTypes |= Message::AckType::Persisted;
3438  }
3439  _message.setTopic(topic_);
3440  if (filter_.length())
3441  {
3442  _message.setFilter(filter_);
3443  }
3444  if (bookmark_.length())
3445  {
3446  if (bookmark_ == AMPS_BOOKMARK_RECENT)
3447  {
3448  Message::Field mostRecent = _bookmarkStore.getMostRecent(subIdField);
3449  _message.setBookmark(mostRecent);
3450  }
3451  else
3452  {
3453  _message.setBookmark(bookmark_);
3454  if (_bookmarkStore.isValid())
3455  {
3456  if (bookmark_ != AMPS_BOOKMARK_NOW &&
3457  bookmark_ != AMPS_BOOKMARK_EPOCH)
3458  {
3459  _bookmarkStore.log(_message);
3460  _bookmarkStore.discard(_message);
3461  _bookmarkStore.persisted(subIdField, _message.getBookmark());
3462  }
3463  }
3464  }
3465  }
3466  if (options_.length())
3467  {
3468  _message.setOptions(options_);
3469  }
3470  Message message = _message;
3471  if (isHASubscribe_)
3472  {
3473  message = _message.deepCopy();
3474  Unlock<Mutex> u(_lock);
3475  _subscriptionManager->subscribe(messageHandler_, message,
3476  Message::AckType::None);
3477  if (_badTimeToHASubscribe)
3478  {
3479  return subId;
3480  }
3481  }
3482  if (!_routes.hasRoute(_message.getSubscriptionId()))
3483  {
3484  _routes.addRoute(_message.getSubscriptionId(), messageHandler_,
3485  Message::AckType::None, ackTypes, _message.getCommandEnum());
3486  }
3487  message.setAckTypeEnum(ackTypes);
3488  if (!options_.empty())
3489  {
3490  message.setOptions(options_);
3491  }
3492  try
3493  {
3494  syncAckProcessing(timeout_, message, isHASubscribe_);
3495  }
3496  catch (const DisconnectedException&)
3497  {
3498  if (!isHASubscribe_)
3499  {
3500  _routes.removeRoute(subIdField);
3501  throw;
3502  }
3503  }
3504  catch (const TimedOutException&)
3505  {
3506  AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(subIdField));
3507  throw;
3508  }
3509  catch (...)
3510  {
3511  if (isHASubscribe_)
3512  {
3513  // Have to unlock before calling into sub manager to avoid deadlock
3514  Unlock<Mutex> unlock(_lock);
3515  _subscriptionManager->unsubscribe(subIdField);
3516  }
3517  _routes.removeRoute(subIdField);
3518  throw;
3519  }
3520  return subId;
3521  }
3522 
3523  void unsubscribe(const std::string& id)
3524  {
3525  Lock<Mutex> l(_lock);
3526  unsubscribeInternal(id);
3527  }
3528 
3529  void unsubscribe(void)
3530  {
3531  if (_subscriptionManager)
3532  {
3533  _subscriptionManager->clear();
3534  }
3535  {
3536  _routes.unsubscribeAll();
3537  Lock<Mutex> l(_lock);
3538  _message.reset();
3539  _message.setCommandEnum(Message::Command::Unsubscribe);
3540  _message.newCommandId();
3541  _message.setSubscriptionId("all");
3542  _sendWithoutRetry(_message);
3543  }
3544  deferredExecution(&amps_noOpFn, NULL);
3545  }
3546 
3547  std::string sow(const MessageHandler& messageHandler_,
3548  const std::string& topic_,
3549  const std::string& filter_ = "",
3550  const std::string& orderBy_ = "",
3551  const std::string& bookmark_ = "",
3552  int batchSize_ = AMPS_DEFAULT_BATCH_SIZE,
3553  int topN_ = AMPS_DEFAULT_TOP_N,
3554  const std::string& options_ = "",
3555  long timeout_ = AMPS_DEFAULT_COMMAND_TIMEOUT)
3556  {
3557  Lock<Mutex> l(_lock);
3558  _message.reset();
3559  _message.setCommandEnum(Message::Command::SOW);
3560  _message.newCommandId();
3561  // need to keep our own copy of the command ID.
3562  std::string commandId = _message.getCommandId();
3563  _message.setQueryID(_message.getCommandId());
3564  unsigned ackTypes = Message::AckType::Processed | Message::AckType::Completed;
3565  _message.setAckTypeEnum(ackTypes);
3566  _message.setTopic(topic_);
3567  if (filter_.length())
3568  {
3569  _message.setFilter(filter_);
3570  }
3571  if (orderBy_.length())
3572  {
3573  _message.setOrderBy(orderBy_);
3574  }
3575  if (bookmark_.length())
3576  {
3577  _message.setBookmark(bookmark_);
3578  }
3579  _message.setBatchSize(AMPS::asString(batchSize_));
3580  if (topN_ != AMPS_DEFAULT_TOP_N)
3581  {
3582  _message.setTopNRecordsReturned(AMPS::asString(topN_));
3583  }
3584  if (options_.length())
3585  {
3586  _message.setOptions(options_);
3587  }
3588 
3589  _routes.addRoute(_message.getQueryID(), messageHandler_,
3590  Message::AckType::None, ackTypes, _message.getCommandEnum());
3591 
3592  try
3593  {
3594  syncAckProcessing(timeout_, _message);
3595  }
3596  catch (...)
3597  {
3598  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId));
3599  throw;
3600  }
3601 
3602  return commandId;
3603  }
3604 
3605  std::string sow(const MessageHandler& messageHandler_,
3606  const std::string& topic_,
3607  long timeout_,
3608  const std::string& filter_ = "",
3609  int batchSize_ = AMPS_DEFAULT_BATCH_SIZE,
3610  int topN_ = AMPS_DEFAULT_TOP_N)
3611  {
3612  std::string notSet;
3613  return sow(messageHandler_,
3614  topic_,
3615  filter_,
3616  notSet, // orderBy
3617  notSet, // bookmark
3618  batchSize_,
3619  topN_,
3620  notSet,
3621  timeout_);
3622  }
3623 
3624  std::string sowAndSubscribe(const MessageHandler& messageHandler_,
3625  const std::string& topic_,
3626  const std::string& filter_ = "",
3627  const std::string& orderBy_ = "",
3628  const std::string& bookmark_ = "",
3629  int batchSize_ = AMPS_DEFAULT_BATCH_SIZE,
3630  int topN_ = AMPS_DEFAULT_TOP_N,
3631  const std::string& options_ = "",
3632  long timeout_ = AMPS_DEFAULT_COMMAND_TIMEOUT,
3633  bool isHASubscribe_ = true)
3634  {
3635  isHASubscribe_ &= (bool)_subscriptionManager;
3636  unsigned ackTypes = Message::AckType::Processed;
3637  Lock<Mutex> l(_lock);
3638  _message.reset();
3639  _message.setCommandEnum(Message::Command::SOWAndSubscribe);
3640  _message.newCommandId();
3641  Field cid = _message.getCommandId();
3642  std::string subId = cid;
3643  _message.setQueryID(cid).setSubscriptionId(cid).setTopic(topic_);
3644  if (filter_.length())
3645  {
3646  _message.setFilter(filter_);
3647  }
3648  if (orderBy_.length())
3649  {
3650  _message.setOrderBy(orderBy_);
3651  }
3652  if (bookmark_.length())
3653  {
3654  _message.setBookmark(bookmark_);
3655  Message::Field bookmark = _message.getBookmark();
3656  if (_bookmarkStore.isValid())
3657  {
3658  ackTypes |= Message::AckType::Persisted;
3659  if (bookmark == AMPS_BOOKMARK_RECENT)
3660  {
3661  _message.setBookmark(_bookmarkStore.getMostRecent(_message.getSubscriptionId()));
3662  }
3663  else if (bookmark != AMPS_BOOKMARK_NOW &&
3664  bookmark != AMPS_BOOKMARK_EPOCH)
3665  {
3666  _bookmarkStore.log(_message);
3667  if (!BookmarkRange::isRange(bookmark))
3668  {
3669  _bookmarkStore.discard(_message);
3670  _bookmarkStore.persisted(_message.getSubscriptionId(),
3671  bookmark);
3672  }
3673  }
3674  }
3675  else if (bookmark == AMPS_BOOKMARK_RECENT)
3676  {
3677  _message.setBookmark(AMPS_BOOKMARK_EPOCH);
3678  }
3679  }
3680  _message.setBatchSize(AMPS::asString(batchSize_));
3681  if (topN_ != AMPS_DEFAULT_TOP_N)
3682  {
3683  _message.setTopNRecordsReturned(AMPS::asString(topN_));
3684  }
3685  if (options_.length())
3686  {
3687  _message.setOptions(options_);
3688  }
3689 
3690  Message message = _message;
3691  if (isHASubscribe_)
3692  {
3693  message = _message.deepCopy();
3694  Unlock<Mutex> u(_lock);
3695  _subscriptionManager->subscribe(messageHandler_, message,
3696  Message::AckType::None);
3697  if (_badTimeToHASubscribe)
3698  {
3699  return subId;
3700  }
3701  }
3702  _routes.addRoute(cid, messageHandler_,
3703  Message::AckType::None, ackTypes, _message.getCommandEnum());
3704  message.setAckTypeEnum(ackTypes);
3705  if (!options_.empty())
3706  {
3707  message.setOptions(options_);
3708  }
3709  try
3710  {
3711  syncAckProcessing(timeout_, message, isHASubscribe_);
3712  }
3713  catch (const DisconnectedException&)
3714  {
3715  if (!isHASubscribe_)
3716  {
3717  _routes.removeRoute(subId);
3718  throw;
3719  }
3720  }
3721  catch (const TimedOutException&)
3722  {
3723  AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(subId));
3724  throw;
3725  }
3726  catch (...)
3727  {
3728  if (isHASubscribe_)
3729  {
3730  // Have to unlock before calling into sub manager to avoid deadlock
3731  Unlock<Mutex> unlock(_lock);
3732  _subscriptionManager->unsubscribe(cid);
3733  }
3734  _routes.removeRoute(subId);
3735  throw;
3736  }
3737  return subId;
3738  }
3739 
3740  std::string sowAndSubscribe(const MessageHandler& messageHandler_,
3741  const std::string& topic_,
3742  long timeout_,
3743  const std::string& filter_ = "",
3744  int batchSize_ = AMPS_DEFAULT_BATCH_SIZE,
3745  bool oofEnabled_ = false,
3746  int topN_ = AMPS_DEFAULT_TOP_N,
3747  bool isHASubscribe_ = true)
3748  {
3749  std::string notSet;
3750  return sowAndSubscribe(messageHandler_,
3751  topic_,
3752  filter_,
3753  notSet, // orderBy
3754  notSet, // bookmark
3755  batchSize_,
3756  topN_,
3757  (oofEnabled_ ? "oof" : ""),
3758  timeout_,
3759  isHASubscribe_);
3760  }
3761 
3762  std::string sowAndDeltaSubscribe(const MessageHandler& messageHandler_,
3763  const std::string& topic_,
3764  const std::string& filter_ = "",
3765  const std::string& orderBy_ = "",
3766  int batchSize_ = AMPS_DEFAULT_BATCH_SIZE,
3767  int topN_ = AMPS_DEFAULT_TOP_N,
3768  const std::string& options_ = "",
3769  long timeout_ = AMPS_DEFAULT_COMMAND_TIMEOUT,
3770  bool isHASubscribe_ = true)
3771  {
3772  isHASubscribe_ &= (bool)_subscriptionManager;
3773  Lock<Mutex> l(_lock);
3774  _message.reset();
3775  _message.setCommandEnum(Message::Command::SOWAndDeltaSubscribe);
3776  _message.newCommandId();
3777  _message.setQueryID(_message.getCommandId());
3778  _message.setSubscriptionId(_message.getCommandId());
3779  std::string subId = _message.getSubscriptionId();
3780  _message.setTopic(topic_);
3781  if (filter_.length())
3782  {
3783  _message.setFilter(filter_);
3784  }
3785  if (orderBy_.length())
3786  {
3787  _message.setOrderBy(orderBy_);
3788  }
3789  _message.setBatchSize(AMPS::asString(batchSize_));
3790  if (topN_ != AMPS_DEFAULT_TOP_N)
3791  {
3792  _message.setTopNRecordsReturned(AMPS::asString(topN_));
3793  }
3794  if (options_.length())
3795  {
3796  _message.setOptions(options_);
3797  }
3798  Message message = _message;
3799  if (isHASubscribe_)
3800  {
3801  message = _message.deepCopy();
3802  Unlock<Mutex> u(_lock);
3803  _subscriptionManager->subscribe(messageHandler_, message,
3804  Message::AckType::None);
3805  if (_badTimeToHASubscribe)
3806  {
3807  return subId;
3808  }
3809  }
3810  _routes.addRoute(message.getQueryID(), messageHandler_,
3811  Message::AckType::None, Message::AckType::Processed, message.getCommandEnum());
3812  message.setAckTypeEnum(Message::AckType::Processed);
3813  if (!options_.empty())
3814  {
3815  message.setOptions(options_);
3816  }
3817  try
3818  {
3819  syncAckProcessing(timeout_, message, isHASubscribe_);
3820  }
3821  catch (const DisconnectedException&)
3822  {
3823  if (!isHASubscribe_)
3824  {
3825  _routes.removeRoute(subId);
3826  throw;
3827  }
3828  }
3829  catch (const TimedOutException&)
3830  {
3831  AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(subId));
3832  throw;
3833  }
3834  catch (...)
3835  {
3836  if (isHASubscribe_)
3837  {
3838  // Have to unlock before calling into sub manager to avoid deadlock
3839  Unlock<Mutex> unlock(_lock);
3840  _subscriptionManager->unsubscribe(Field(subId));
3841  }
3842  _routes.removeRoute(subId);
3843  throw;
3844  }
3845  return subId;
3846  }
3847 
3848  std::string sowAndDeltaSubscribe(const MessageHandler& messageHandler_,
3849  const std::string& topic_,
3850  long timeout_,
3851  const std::string& filter_ = "",
3852  int batchSize_ = AMPS_DEFAULT_BATCH_SIZE,
3853  bool oofEnabled_ = false,
3854  bool sendEmpties_ = false,
3855  int topN_ = AMPS_DEFAULT_TOP_N,
3856  bool isHASubscribe_ = true)
3857  {
3858  std::string notSet;
3859  Message::Options options;
3860  if (oofEnabled_)
3861  {
3862  options.setOOF();
3863  }
3864  if (sendEmpties_ == false)
3865  {
3866  options.setNoEmpties();
3867  }
3868  return sowAndDeltaSubscribe(messageHandler_,
3869  topic_,
3870  filter_,
3871  notSet, // orderBy
3872  batchSize_,
3873  topN_,
3874  options,
3875  timeout_,
3876  isHASubscribe_);
3877  }
3878 
3879  std::string sowDelete(const MessageHandler& messageHandler_,
3880  const std::string& topic_,
3881  const std::string& filter_,
3882  long timeout_,
3883  Message::Field commandId_ = Message::Field())
3884  {
3885  if (_publishStore.isValid())
3886  {
3887  unsigned ackType = Message::AckType::Processed |
3888  Message::AckType::Stats |
3889  Message::AckType::Persisted;
3890  publishStoreMessage.reset();
3891  if (commandId_.empty())
3892  {
3893  publishStoreMessage.newCommandId();
3894  commandId_ = publishStoreMessage.getCommandId();
3895  }
3896  else
3897  {
3898  publishStoreMessage.setCommandId(commandId_.data(), commandId_.len());
3899  }
3900  publishStoreMessage.setCommandEnum(Message::Command::SOWDelete)
3901  .assignSubscriptionId(commandId_.data(), commandId_.len())
3902  .assignQueryID(commandId_.data(), commandId_.len())
3903  .setAckTypeEnum(ackType)
3904  .assignTopic(topic_.c_str(), topic_.length())
3905  .assignFilter(filter_.c_str(), filter_.length());
3906  amps_uint64_t haSequenceNumber = _publishStore.store(publishStoreMessage);
3907  char buf[AMPS_NUMBER_BUFFER_LEN];
3908  size_t pos = convertToCharArray(buf, haSequenceNumber);
3909  publishStoreMessage.assignSequence(buf + pos, AMPS_NUMBER_BUFFER_LEN - pos);
3910  {
3911  try
3912  {
3913  Lock<Mutex> l(_lock);
3914  _routes.addRoute(commandId_, messageHandler_,
3915  Message::AckType::Stats,
3916  Message::AckType::Processed | Message::AckType::Persisted,
3917  publishStoreMessage.getCommandEnum());
3918  syncAckProcessing(timeout_, publishStoreMessage,
3919  haSequenceNumber);
3920  }
3921  catch (const DisconnectedException&)
3922  {
3923  // -V565
3924  // Pass - it will get replayed upon reconnect
3925  }
3926  catch (...)
3927  {
3928  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId_));
3929  throw;
3930  }
3931  }
3932  return (std::string)commandId_;
3933  }
3934  else
3935  {
3936  Lock<Mutex> l(_lock);
3937  _message.reset();
3938  if (commandId_.empty())
3939  {
3940  _message.newCommandId();
3941  commandId_ = _message.getCommandId();
3942  }
3943  else
3944  {
3945  _message.setCommandId(commandId_.data(), commandId_.len());
3946  }
3947  _message.setCommandEnum(Message::Command::SOWDelete)
3948  .assignSubscriptionId(commandId_.data(), commandId_.len())
3949  .assignQueryID(commandId_.data(), commandId_.len())
3950  .setAckTypeEnum(Message::AckType::Processed |
3951  Message::AckType::Stats)
3952  .assignTopic(topic_.c_str(), topic_.length())
3953  .assignFilter(filter_.c_str(), filter_.length());
3954  _routes.addRoute(commandId_, messageHandler_,
3955  Message::AckType::Stats,
3956  Message::AckType::Processed,
3957  _message.getCommandEnum());
3958  try
3959  {
3960  syncAckProcessing(timeout_, _message);
3961  }
3962  catch (...)
3963  {
3964  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId_));
3965  throw;
3966  }
3967  return (std::string)commandId_;
3968  }
3969  }
3970 
/// Sends a SOWDelete command for topic_ carrying data_ as the message data,
/// registers messageHandler_ for the responses, and waits for the
/// acknowledgment through syncAckProcessing().
/// \param messageHandler_ Handler added to _routes under the command id.
/// \param topic_ Topic assigned to the outgoing message.
/// \param data_ Data assigned to the outgoing message.
/// \param timeout_ Timeout value handed to syncAckProcessing().
/// \param commandId_ Command id to use; when empty a new one is generated.
///        The same value is also used as the subscription id and query id.
/// \return The command id used for the request, converted to std::string.
3971  std::string sowDeleteByData(const MessageHandler& messageHandler_,
3972  const std::string& topic_,
3973  const std::string& data_,
3974  long timeout_,
3975  Message::Field commandId_ = Message::Field())
3976  {
      // With a valid publish store the command is built in publishStoreMessage,
      // stored first, and additionally requests a Persisted ack.
3977  if (_publishStore.isValid())
3978  {
3979  unsigned ackType = Message::AckType::Processed |
3980  Message::AckType::Stats |
3981  Message::AckType::Persisted;
3982  publishStoreMessage.reset();
3983  if (commandId_.empty())
3984  {
3985  publishStoreMessage.newCommandId();
3986  commandId_ = publishStoreMessage.getCommandId();
3987  }
3988  else
3989  {
3990  publishStoreMessage.setCommandId(commandId_.data(), commandId_.len());
3991  }
3992  publishStoreMessage.setCommandEnum(Message::Command::SOWDelete)
3993  .assignSubscriptionId(commandId_.data(), commandId_.len())
3994  .assignQueryID(commandId_.data(), commandId_.len())
3995  .setAckTypeEnum(ackType)
3996  .assignTopic(topic_.c_str(), topic_.length())
3997  .assignData(data_.c_str(), data_.length());
3998  amps_uint64_t haSequenceNumber = _publishStore.store(publishStoreMessage);
      // The sequence number is rendered as text into buf; the span handed to
      // the message is buf[pos, AMPS_NUMBER_BUFFER_LEN).
      // NOTE(review): presumably convertToCharArray fills buf from the end
      // and returns the start offset - confirm.
3999  char buf[AMPS_NUMBER_BUFFER_LEN];
4000  size_t pos = convertToCharArray(buf, haSequenceNumber);
4001  publishStoreMessage.assignSequence(buf + pos, AMPS_NUMBER_BUFFER_LEN - pos);
4002  {
4003  try
4004  {
      // _lock is taken only around route registration and the send/wait.
4005  Lock<Mutex> l(_lock);
4006  _routes.addRoute(commandId_, messageHandler_,
4007  Message::AckType::Stats,
4008  Message::AckType::Processed | Message::AckType::Persisted,
4009  publishStoreMessage.getCommandEnum());
4010  syncAckProcessing(timeout_, publishStoreMessage,
4011  haSequenceNumber);
4012  }
4013  catch (const DisconnectedException&)
4014  {
4015  // -V565
4016  // Pass - it will get replayed upon reconnect
4017  }
4018  catch (...)
4019  {
      // Any other failure: drop the route that was just added, then rethrow.
4020  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId_));
4021  throw;
4022  }
4023  }
4024  return (std::string)commandId_;
4025  }
4026  else
4027  {
      // No publish store: _lock is held for the whole exchange and the
      // command is built in _message with Processed and Stats acks only.
4028  Lock<Mutex> l(_lock);
4029  _message.reset();
4030  if (commandId_.empty())
4031  {
4032  _message.newCommandId();
4033  commandId_ = _message.getCommandId();
4034  }
4035  else
4036  {
4037  _message.setCommandId(commandId_.data(), commandId_.len());
4038  }
4039  _message.setCommandEnum(Message::Command::SOWDelete)
4040  .assignSubscriptionId(commandId_.data(), commandId_.len())
4041  .assignQueryID(commandId_.data(), commandId_.len())
4042  .setAckTypeEnum(Message::AckType::Processed |
4043  Message::AckType::Stats)
4044  .assignTopic(topic_.c_str(), topic_.length())
4045  .assignData(data_.c_str(), data_.length());
4046  _routes.addRoute(commandId_, messageHandler_,
4047  Message::AckType::Stats,
4048  Message::AckType::Processed,
4049  _message.getCommandEnum());
4050  try
4051  {
4052  syncAckProcessing(timeout_, _message);
4053  }
4054  catch (...)
4055  {
      // On any failure (including disconnect) remove the route and rethrow.
4056  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId_));
4057  throw;
4058  }
4059  return (std::string)commandId_;
4060  }
4061  }
4062 
/// Sends a SOWDelete command for topic_ carrying keys_ as the SOW keys,
/// registers messageHandler_ for the responses, and waits for the
/// acknowledgment through syncAckProcessing().
/// \param messageHandler_ Handler added to _routes under the command id.
/// \param topic_ Topic assigned to the outgoing message.
/// \param keys_ SOW keys assigned to the outgoing message.
/// \param timeout_ Timeout value handed to syncAckProcessing().
/// \param commandId_ Command id to use; when empty a new one is generated.
///        The same value is also used as the subscription id and query id.
/// \return The command id used for the request, converted to std::string.
4063  std::string sowDeleteByKeys(const MessageHandler& messageHandler_,
4064  const std::string& topic_,
4065  const std::string& keys_,
4066  long timeout_,
4067  Message::Field commandId_ = Message::Field())
4068  {
      // With a valid publish store the command is built in publishStoreMessage,
      // stored first, and additionally requests a Persisted ack.
4069  if (_publishStore.isValid())
4070  {
4071  unsigned ackType = Message::AckType::Processed |
4072  Message::AckType::Stats |
4073  Message::AckType::Persisted;
4074  publishStoreMessage.reset();
4075  if (commandId_.empty())
4076  {
4077  publishStoreMessage.newCommandId();
4078  commandId_ = publishStoreMessage.getCommandId();
4079  }
4080  else
4081  {
4082  publishStoreMessage.setCommandId(commandId_.data(), commandId_.len());
4083  }
4084  publishStoreMessage.setCommandEnum(Message::Command::SOWDelete)
4085  .assignSubscriptionId(commandId_.data(), commandId_.len())
4086  .assignQueryID(commandId_.data(), commandId_.len())
4087  .setAckTypeEnum(ackType)
4088  .assignTopic(topic_.c_str(), topic_.length())
4089  .assignSowKeys(keys_.c_str(), keys_.length());
4090  amps_uint64_t haSequenceNumber = _publishStore.store(publishStoreMessage);
      // The sequence number is rendered as text into buf; the span handed to
      // the message is buf[pos, AMPS_NUMBER_BUFFER_LEN).
      // NOTE(review): presumably convertToCharArray fills buf from the end
      // and returns the start offset - confirm.
4091  char buf[AMPS_NUMBER_BUFFER_LEN];
4092  size_t pos = convertToCharArray(buf, haSequenceNumber);
4093  publishStoreMessage.assignSequence(buf + pos, AMPS_NUMBER_BUFFER_LEN - pos);
4094  {
4095  try
4096  {
      // _lock is taken only around route registration and the send/wait.
4097  Lock<Mutex> l(_lock);
4098  _routes.addRoute(commandId_, messageHandler_,
4099  Message::AckType::Stats,
4100  Message::AckType::Processed | Message::AckType::Persisted,
4101  publishStoreMessage.getCommandEnum());
4102  syncAckProcessing(timeout_, publishStoreMessage,
4103  haSequenceNumber);
4104  }
4105  catch (const DisconnectedException&)
4106  {
4107  // -V565
4108  // Pass - it will get replayed upon reconnect
4109  }
4110  catch (...)
4111  {
      // Any other failure: drop the route that was just added, then rethrow.
4112  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId_));
4113  throw;
4114  }
4115  }
4116  return (std::string)commandId_;
4117  }
4118  else
4119  {
      // No publish store: _lock is held for the whole exchange and the
      // command is built in _message with Processed and Stats acks only.
4120  Lock<Mutex> l(_lock);
4121  _message.reset();
4122  if (commandId_.empty())
4123  {
4124  _message.newCommandId();
4125  commandId_ = _message.getCommandId();
4126  }
4127  else
4128  {
4129  _message.setCommandId(commandId_.data(), commandId_.len());
4130  }
4131  _message.setCommandEnum(Message::Command::SOWDelete)
4132  .assignSubscriptionId(commandId_.data(), commandId_.len())
4133  .assignQueryID(commandId_.data(), commandId_.len())
4134  .setAckTypeEnum(Message::AckType::Processed |
4135  Message::AckType::Stats)
4136  .assignTopic(topic_.c_str(), topic_.length())
4137  .assignSowKeys(keys_.c_str(), keys_.length());
4138  _routes.addRoute(commandId_, messageHandler_,
4139  Message::AckType::Stats,
4140  Message::AckType::Processed,
4141  _message.getCommandEnum());
4142  try
4143  {
4144  syncAckProcessing(timeout_, _message);
4145  }
4146  catch (...)
4147  {
      // On any failure (including disconnect) remove the route and rethrow.
4148  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId_));
4149  throw;
4150  }
4151  return (std::string)commandId_;
4152  }
4153  }
4154 
4155  void startTimer(void)
4156  {
4157  if (_serverVersion >= "5.3.2.0")
4158  {
4159  throw CommandException("The start_timer command is deprecated.");
4160  }
4161  Lock<Mutex> l(_lock);
4162  _message.reset();
4163  _message.setCommandEnum(Message::Command::StartTimer);
4164 
4165  _send(_message);
4166  }
4167 
4168  std::string stopTimer(MessageHandler messageHandler_)
4169  {
4170  if (_serverVersion >= "5.3.2.0")
4171  {
4172  throw CommandException("The stop_timer command is deprecated.");
4173  }
4174  return executeAsync(Command("stop_timer").addAckType("completed"), messageHandler_);
4175  }
4176 
/// Returns the amps_handle stored in _client.
4177  amps_handle getHandle(void)
4178  {
4179  return _client;
4180  }
4181 
/// Stores a shared pointer to the exception listener and points the raw
/// _exceptionListener member at the same object.
/// \param pListener_ Shared pointer copied into _pExceptionListener.
4189  void setExceptionListener(const std::shared_ptr<const ExceptionListener>& pListener_)
4190  {
4191  _pExceptionListener = pListener_;
4192  _exceptionListener = _pExceptionListener.get();
4193  }
4194 
/// Points _exceptionListener at listener_ without taking ownership; only
/// the address is stored, so listener_ must remain alive while it is in use.
/// \param listener_ Listener whose address is stored.
4195  void setExceptionListener(const ExceptionListener& listener_)
4196  {
4197  _exceptionListener = &listener_;
4198  }
4199 
/// Returns a reference to the listener that _exceptionListener points to.
/// The pointer is dereferenced without a null check.
4200  const ExceptionListener& getExceptionListener(void) const
4201  {
4202  return *_exceptionListener;
4203  }
4204 
4205  void setHeartbeat(unsigned heartbeatInterval_, unsigned readTimeout_)
4206  {
4207  if (readTimeout_ < heartbeatInterval_)
4208  {
4209  throw UsageException("The socket read timeout must be >= the heartbeat interval.");
4210  }
4211  Lock<Mutex> l(_lock);
4212  if (_heartbeatInterval != heartbeatInterval_ ||
4213  _readTimeout != readTimeout_)
4214  {
4215  _heartbeatInterval = heartbeatInterval_;
4216  _readTimeout = readTimeout_;
4217  _sendHeartbeat();
4218  }
4219  }
4220 
/// Starts heartbeating with the current settings when connected, then
/// applies the read timeout (and, unless a queue ack timeout is set, the
/// idle time) to the underlying client handle.
/// \throw Whatever AMPSException::throwFor() raises when setting the read
///        timeout or idle time fails with a result other than AMPS_E_OK or
///        AMPS_E_DISCONNECTED.
4221  void _sendHeartbeat(void)
4222  {
4223  if (_connected && _heartbeatInterval != 0)
4224  {
      // The initial message carries the options "start,<interval>".
4225  std::ostringstream options;
4226  options << "start," << _heartbeatInterval;
4227  _beatMessage.setOptions(options.str());
4228 
4229  _heartbeatTimer.setTimeout(_heartbeatInterval * 1000.0);
4230  _heartbeatTimer.start();
4231  try
4232  {
4233  _sendWithoutRetry(_beatMessage);
4234  broadcastConnectionStateChanged(ConnectionStateListener::HeartbeatInitiated);
4235  }
4236  catch (ConnectionException& ex_)
4237  {
4238  // If we are disconnected when we attempt to send, that's OK;
4239  // we'll send this message after we re-connect (if we do).
4240  AMPS_UNHANDLED_EXCEPTION(ex_);
4241  }
      // Later uses of _beatMessage carry the plain "beat" option.
4242  _beatMessage.setOptions("beat");
4243  }
4244  amps_result result = AMPS_E_OK;
4245  if (_readTimeout && _connected)
4246  {
4247  result = amps_client_set_read_timeout(_client, (int)_readTimeout);
4248  if (result != AMPS_E_OK && result != AMPS_E_DISCONNECTED)
4249  {
4250  AMPSException::throwFor(_client, result);
4251  }
      // The idle time is only driven by the heartbeat interval when no
      // queue ack timeout has been configured.
4252  if (!_queueAckTimeout)
4253  {
4254  result = amps_client_set_idle_time(_client,
4255  (int)(_heartbeatInterval * 1000));
4256  if (result != AMPS_E_OK && result != AMPS_E_DISCONNECTED)
4257  {
4258  AMPSException::throwFor(_client, result);
4259  }
4260  }
4261  }
4262  }
4263 
/// Inserts listener_ into _connectionStateListeners while holding _lock.
/// Only the pointer is stored.
4264  void addConnectionStateListener(ConnectionStateListener* listener_)
4265  {
4266  Lock<Mutex> lock(_lock);
4267  _connectionStateListeners.insert(listener_);
4268  }
4269 
/// Erases listener_ from _connectionStateListeners while holding _lock.
4270  void removeConnectionStateListener(ConnectionStateListener* listener_)
4271  {
4272  Lock<Mutex> lock(_lock);
4273  _connectionStateListeners.erase(listener_);
4274  }
4275 
/// Removes every entry from _connectionStateListeners while holding _lock.
4276  void clearConnectionStateListeners()
4277  {
4278  Lock<Mutex> lock(_lock);
4279  _connectionStateListeners.clear();
4280  }
4281 
/// Adds routes in _routes so that responses to command_ reach handler_.
/// Routes may be added under the message's subscription id, its query id,
/// and/or the command id, depending on the command type and on which ids
/// are present and distinct.
/// \param command_ Command whose message supplies the ids and command type.
/// \param cid_ In/out command id. May be overwritten with the subscription
///        id, a newly generated query id, or a newly generated command id.
/// \param handler_ Handler to register.
/// \param requestedAcks_ Ack types requested by the caller.
/// \param systemAddedAcks_ Ack types added on the caller's behalf.
/// \param commandType_ Command type recorded with each added route.
/// \throw UsageException when the message has no query id, subscription id
///        or command id and no id was generated for a publish.
/// NOTE(review): the Unlock guards below imply the caller holds _lock -
/// confirm against callers.
4282  void _registerHandler(Command& command_, Message::Field& cid_,
4283  MessageHandler& handler_, unsigned requestedAcks_,
4284  unsigned systemAddedAcks_, Message::Command::Type commandType_)
4285  {
4286  Message message = command_.getMessage();
4287  Message::Command::Type commandType = message.getCommandEnum();
4288  Message::Field subid = message.getSubscriptionId();
4289  Message::Field qid = message.getQueryID();
4290  // If we have an id, we're good, even if it's an existing route
4291  bool added = qid.len() || subid.len() || cid_.len();
4292  bool cidIsQid = cid_ == qid;
      // cidUnique: the command id is non-empty and differs from both the
      // query id and the subscription id, so it needs its own route.
4293  bool cidUnique = !cidIsQid && cid_.len() > 0 && cid_ != subid;
4294  int addedCount = 0;
4295  if (subid.len() > 0)
4296  {
4297  // This can replace a non-subscribe with a matching id
4298  // with a subscription but not another subscription.
4299  addedCount += _routes.addRoute(subid, handler_, requestedAcks_,
4300  systemAddedAcks_, commandType_);
4301  if (!cidUnique
4302  && (commandType == Message::Command::Subscribe
4303  || commandType == Message::Command::DeltaSubscribe))
4304  {
4305  // We don't need to do anything else
4306  cid_ = subid;
4307  return;
4308  }
4309  }
4310  if (qid.len() > 0 && qid != subid
4311  && (commandType == Message::Command::SOW
4312  || commandType == Message::Command::SOWDelete
4313  || commandType == Message::Command::SOWAndSubscribe
4314  || commandType == Message::Command::SOWAndDeltaSubscribe))
4315  {
      // Generate fresh query ids until one is not already routed; keep
      // cid_ in step when it was the same value as the query id.
4316  while (_routes.hasRoute(qid))
4317  {
4318  message.newQueryId();
4319  if (cidIsQid)
4320  {
4321  cid_ = message.getQueryId();
4322  }
4323  qid = message.getQueryId();
4324  }
4325  if (addedCount == 0)
4326  {
4327  _routes.addRoute(qid, handler_, requestedAcks_,
4328  systemAddedAcks_, commandType_);
4329  }
4330  else
4331  {
      // A route already exists for this handler: ask the copy-route
      // function for a copy of the user data (inside an Unlock guard on
      // _lock). If it yields nothing, the same handler is reused.
4332  void* data = NULL;
4333  {
4334  Unlock<Mutex> u(_lock);
4335  data = amps_invoke_copy_route_function(handler_.userData());
4336  }
4337  if (!data)
4338  {
4339  _routes.addRoute(qid, handler_, requestedAcks_,
4340  systemAddedAcks_, commandType_);
4341  }
4342  else
4343  {
4344  _routes.addRoute(qid,
4345  MessageHandler(handler_.function(),
4346  data),
4347  requestedAcks_,
4348  systemAddedAcks_, commandType_);
4349  }
4350  }
4351  ++addedCount;
4352  }
      // A distinct command id gets its own route only when acks other than
      // Persisted were requested.
4353  if (cidUnique && requestedAcks_ & ~Message::AckType::Persisted)
4354  {
4355  while (_routes.hasRoute(cid_))
4356  {
4357  cid_ = message.newCommandId().getCommandId();
4358  }
4359  if (addedCount == 0)
4360  {
4361  _routes.addRoute(cid_, handler_, requestedAcks_,
4362  systemAddedAcks_, commandType_);
4363  }
4364  else
4365  {
      // Same user-data copy step as for the query id route above.
4366  void* data = NULL;
4367  {
4368  Unlock<Mutex> u(_lock);
4369  data = amps_invoke_copy_route_function(handler_.userData());
4370  }
4371  if (!data)
4372  {
4373  _routes.addRoute(cid_, handler_, requestedAcks_,
4374  systemAddedAcks_, commandType_);
4375  }
4376  else
4377  {
4378  _routes.addRoute(cid_,
4379  MessageHandler(handler_.function(),
4380  data),
4381  requestedAcks_,
4382  systemAddedAcks_, commandType_);
4383  }
4384  }
4385  }
      // Publishes that request acks other than Persisted always get a newly
      // generated command id on the command's own message and a route.
4386  else if ((commandType == Message::Command::Publish ||
4387  commandType == Message::Command::DeltaPublish)
4388  && requestedAcks_ & ~Message::AckType::Persisted)
4389  {
4390  cid_ = command_.getMessage().newCommandId().getCommandId();
4391  _routes.addRoute(cid_, handler_, requestedAcks_,
4392  systemAddedAcks_, commandType_);
4393  added = true;
4394  }
4395  if (!added)
4396  {
4397  throw UsageException("To use a messagehandler, you must also supply a command or subscription ID.");
4398  }
4399  }
4400 
/// Sends command_ and, when handler_ is valid, registers it for the
/// responses. This function does not acquire _lock itself; executeAsync()
/// takes _lock and then calls it.
/// \param command_ Command to send; its message's ack type is temporarily
///        widened with system-added acks and set back to the requested acks
///        before returning or rethrowing.
/// \param handler_ Handler for responses; may be invalid.
/// \param isHASubscribe_ Whether to involve _subscriptionManager for
///        subscribe commands; forced to false when no manager is set.
/// \return For subscribe commands with a subscription id, that id;
///         otherwise the command id (which may be empty).
/// \throw Rethrows exceptions from sending after removing the routes that
///        were registered; DisconnectedException is swallowed on the publish
///        store path and for HA subscriptions.
4401  std::string executeAsyncNoLock(Command& command_, MessageHandler& handler_,
4402  bool isHASubscribe_ = true)
4403  {
4404  isHASubscribe_ &= (bool)_subscriptionManager;
4405  Message& message = command_.getMessage();
      // A Processed ack is added whenever there is a handler or the command
      // already asks for one.
4406  unsigned systemAddedAcks = (handler_.isValid() || command_.hasProcessedAck()) ?
4407  Message::AckType::Processed : Message::AckType::None;
4408  unsigned requestedAcks = message.getAckTypeEnum();
4409  bool isPublishStore = _publishStore.isValid() && command_.needsSequenceNumber();
4410  Message::Command::Type commandType = message.getCommandEnum();
4411  if (commandType == Message::Command::StopTimer)
4412  {
4413  systemAddedAcks |= Message::AckType::Completed;
4414  }
4415  Message::Field cid = message.getCommandId();
4416  if (handler_.isValid() && cid.empty())
4417  {
4418  cid = message.newCommandId().getCommandId();
4419  }
4420  if (message.getBookmark().len() > 0)
4421  {
4422  if (command_.isSubscribe())
4423  {
4424  Message::Field bookmark = message.getBookmark();
4425  if (_bookmarkStore.isValid())
4426  {
      // Bookmark subscriptions with a bookmark store also request Persisted.
4427  systemAddedAcks |= Message::AckType::Persisted;
4428  if (bookmark == AMPS_BOOKMARK_RECENT)
4429  {
4430  message.setBookmark(_bookmarkStore.getMostRecent(message.getSubscriptionId()));
4431  }
4432  else if (bookmark != AMPS_BOOKMARK_NOW &&
4433  bookmark != AMPS_BOOKMARK_EPOCH)
4434  {
4435  _bookmarkStore.log(message);
4436  if (!BookmarkRange::isRange(bookmark))
4437  {
4438  _bookmarkStore.discard(message);
4439  _bookmarkStore.persisted(message.getSubscriptionId(),
4440  bookmark);
4441  }
4442  }
4443  }
4444  else if (bookmark == AMPS_BOOKMARK_RECENT)
4445  {
      // NOTE(review): this branch is empty here; the listing skips from
      // line 4445 to 4447, so a statement may be missing - confirm against
      // the original header.
4447  }
4448  }
4449  }
4450  if (isPublishStore)
4451  {
4452  systemAddedAcks |= Message::AckType::Persisted;
4453  }
4454  bool isSubscribe = command_.isSubscribe();
      // Non-subscribe commands register their handler up front; subscribe
      // commands register later, after the subscription manager step.
4455  if (handler_.isValid() && !isSubscribe)
4456  {
4457  _registerHandler(command_, cid, handler_,
4458  requestedAcks, systemAddedAcks, commandType);
4459  }
4460  if (isPublishStore)
4461  {
4462  bool useSyncSend = cid.len() > 0 && command_.hasProcessedAck();
4463  amps_uint64_t haSequenceNumber = (amps_uint64_t)0;
4464  message.setAckTypeEnum(requestedAcks | systemAddedAcks);
4465  {
      // The store call is made inside an Unlock guard on _lock.
4466  Unlock<Mutex> u(_lock);
4467  haSequenceNumber = _publishStore.store(message);
4468  }
4469  message.setSequence(haSequenceNumber);
4470  try
4471  {
4472  if (useSyncSend)
4473  {
4474  syncAckProcessing((long)command_.getTimeout(), message,
4475  haSequenceNumber);
4476  }
4477  else
4478  {
4479  _send(message, haSequenceNumber);
4480  }
4481  }
4482  catch (const DisconnectedException&)
4483  {
4484  // -V565
4485  // Pass - message will get replayed when reconnected
4486  }
4487  catch (...)
4488  {
4489  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(cid));
4490  throw;
4491  }
4492  }
4493  else
4494  {
4495  if (isSubscribe)
4496  {
4497  const Message::Field& subId = message.getSubscriptionId();
4498  if (isHASubscribe_)
4499  {
      // Hand a deep copy of the message to the subscription manager inside
      // an Unlock guard on _lock. When _badTimeToHASubscribe is set, return
      // the subscription id without sending.
4500  Unlock<Mutex> u(_lock);
4501  _subscriptionManager->subscribe(handler_,
4502  message.deepCopy(),
4503  requestedAcks);
4504  if (_badTimeToHASubscribe)
4505  {
4506  message.setAckTypeEnum(requestedAcks);
4507  return std::string(subId.data(), subId.len());
4508  }
4509  }
4510  if (handler_.isValid())
4511  {
4512  _registerHandler(command_, cid, handler_,
4513  requestedAcks, systemAddedAcks, commandType);
4514  }
4515  message.setAckTypeEnum(requestedAcks | systemAddedAcks);
4516  try
4517  {
4518  syncAckProcessing((long)command_.getTimeout(), message,
4519  isHASubscribe_);
4520  }
4521  catch (const DisconnectedException&)
4522  {
      // For HA subscriptions a disconnect is swallowed here; otherwise the
      // routes are removed and the exception propagates.
4523  if (!isHASubscribe_)
4524  {
4525  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(cid));
4526  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(subId));
4527  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(message.getQueryId()));
4528  message.setAckTypeEnum(requestedAcks);
4529  throw;
4530  }
4531  }
4532  catch (const TimedOutException&)
4533  {
      // On timeout, unsubscribe under each id that may have been used.
4534  AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(cid));
4535  AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(subId));
4536  AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(message.getQueryId()));
4537  message.setAckTypeEnum(requestedAcks);
4538  throw;
4539  }
4540  catch (...)
4541  {
4542  if (isHASubscribe_)
4543  {
4544  // Have to unlock before calling into sub manager to avoid deadlock
4545  Unlock<Mutex> unlock(_lock);
4546  _subscriptionManager->unsubscribe(subId);
4547  }
4548  if (message.getQueryID().len() > 0)
4549  {
4550  _routes.removeRoute(message.getQueryID());
4551  }
4552  _routes.removeRoute(cid);
4553  _routes.removeRoute(subId);
4554  message.setAckTypeEnum(requestedAcks);
4555  throw;
4556  }
4557  if (subId.len() > 0)
4558  {
4559  message.setAckTypeEnum(requestedAcks);
4560  return std::string(subId.data(), subId.len());
4561  }
4562  }
4563  else
4564  {
4565  // SOW, Flush, etc. should always be sync. Publish/delete may not be.
4566  bool useSyncSend = commandType & ~Message::Command::NoDataCommands
4567  || (cid.len() > 0 && command_.hasProcessedAck());
4568  message.setAckTypeEnum(requestedAcks | systemAddedAcks);
4569  try
4570  {
4571  if (useSyncSend)
4572  {
4573  syncAckProcessing((long)(command_.getTimeout()), message);
4574  }
4575  else
4576  {
4577  _send(message);
4578  }
4579  }
4580  catch (const TimedOutException&)
4581  {
4582  AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(cid));
4583  AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(message.getQueryId()));
4584  message.setAckTypeEnum(requestedAcks);
4585  throw;
4586  }
4587  catch (const DisconnectedException&)
4588  {
4589  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(cid));
4590  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(message.getQueryId()));
4591  message.setAckTypeEnum(requestedAcks);
4592  throw;
4593  }
4594  catch (...)
4595  {
4596  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(cid));
4597  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(message.getQueryId()));
4598  message.setAckTypeEnum(requestedAcks);
4599  throw;
4600  }
4601  }
4602  }
      // Set the message's ack type back to what the caller requested.
4603  message.setAckTypeEnum(requestedAcks);
4604  return cid;
4605  }
4606 
4607  MessageStream getEmptyMessageStream(void);
4608 
/// Acquires _lock and forwards to executeAsyncNoLock() with the same
/// arguments, returning its result.
4609  std::string executeAsync(Command& command_, MessageHandler& handler_,
4610  bool isHASubscribe_ = true)
4611  {
4612  Lock<Mutex> lock(_lock);
4613  return executeAsyncNoLock(command_, handler_, isHASubscribe_);
4614  }
4615 
4616  // Queue Methods //
/// Stores isAutoAckEnabled_ in _isAutoAckEnabled. While the flag is true,
/// ack() returns without sending anything.
4617  void setAutoAck(bool isAutoAckEnabled_)
4618  {
4619  _isAutoAckEnabled = isAutoAckEnabled_;
4620  }
/// Returns the current value of _isAutoAckEnabled.
4621  bool getAutoAck(void) const
4622  {
4623  return _isAutoAckEnabled;
4624  }
4625  void setAckBatchSize(const unsigned batchSize_)
4626  {
4627  _ackBatchSize = batchSize_;
4628  if (!_queueAckTimeout)
4629  {
4630  _queueAckTimeout = AMPS_DEFAULT_QUEUE_ACK_TIMEOUT;
4631  amps_client_set_idle_time(_client, _queueAckTimeout);
4632  }
4633  }
/// Returns the current value of _ackBatchSize.
4634  unsigned getAckBatchSize(void) const
4635  {
4636  return _ackBatchSize;
4637  }
/// Returns the current value of _queueAckTimeout.
4638  int getAckTimeout(void) const
4639  {
4640  return _queueAckTimeout;
4641  }
/// Applies ackTimeout_ to the client handle as its idle time and records it
/// in _queueAckTimeout. The result of amps_client_set_idle_time() is not
/// checked.
4642  void setAckTimeout(const int ackTimeout_)
4643  {
4644  amps_client_set_idle_time(_client, ackTimeout_);
4645  _queueAckTimeout = ackTimeout_;
4646  }
4647  size_t _ack(QueueBookmarks& queueBookmarks_)
4648  {
4649  if (queueBookmarks_._bookmarkCount)
4650  {
4651  publishStoreMessage.reset();
4652  publishStoreMessage.setCommandEnum(Message::Command::SOWDelete)
4653  .setTopic(queueBookmarks_._topic)
4654  .setBookmark(queueBookmarks_._data)
4655  .setCommandId("AMPS-queue-ack");
4656  amps_uint64_t haSequenceNumber = 0;
4657  if (_publishStore.isValid())
4658  {
4659  haSequenceNumber = _publishStore.store(publishStoreMessage);
4660  publishStoreMessage.setAckType("persisted")
4661  .setSequence(haSequenceNumber);
4662  queueBookmarks_._data.erase();
4663  queueBookmarks_._bookmarkCount = 0;
4664  }
4665  _send(publishStoreMessage, haSequenceNumber);
4666  if (!_publishStore.isValid())
4667  {
4668  queueBookmarks_._data.erase();
4669  queueBookmarks_._bookmarkCount = 0;
4670  }
4671  return 1;
4672  }
4673  return 0;
4674  }
4675  void ack(const Field& topic_, const Field& bookmark_, const char* options_ = NULL)
4676  {
4677  if (_isAutoAckEnabled)
4678  {
4679  return;
4680  }
4681  _ack(topic_, bookmark_, options_);
4682  }
/// Acknowledges one queue message, either immediately or by adding it to a
/// per-topic batch. Does nothing when bookmark_ is empty.
/// \param topic_ Topic of the message being acknowledged.
/// \param bookmark_ Bookmark of the message being acknowledged.
/// \param options_ Optional options; when non-NULL the ack is sent
///        immediately with these options instead of being batched.
4683  void _ack(const Field& topic_, const Field& bookmark_, const char* options_ = NULL)
4684  {
4685  if (bookmark_.len() == 0)
4686  {
4687  return;
4688  }
4689  Lock<Mutex> lock(_lock);
      // No batching (batch size below 2) or explicit options: send a single
      // SOWDelete right away.
4690  if (_ackBatchSize < 2 || options_ != NULL)
4691  {
4692  publishStoreMessage.reset();
4693  publishStoreMessage.setCommandEnum(Message::Command::SOWDelete)
4694  .setCommandId("AMPS-queue-ack")
4695  .setTopic(topic_).setBookmark(bookmark_);
4696  if (options_)
4697  {
4698  publishStoreMessage.setOptions(options_);
4699  }
4700  amps_uint64_t haSequenceNumber = 0;
4701  if (_publishStore.isValid())
4702  {
4703  haSequenceNumber = _publishStore.store(publishStoreMessage);
4704  publishStoreMessage.setAckType("persisted")
4705  .setSequence(haSequenceNumber);
4706  }
4707  _send(publishStoreMessage, haSequenceNumber);
4708  return;
4709  }
4710  // have we acked anything for this hash
4711  topic_hash hash = CRC<0>::crcNoSSE(topic_.data(), topic_.len());
4712  TopicHashMap::iterator it = _topicHashMap.find(hash);
4713  if (it == _topicHashMap.end())
4714  {
4715  // add a new one to the map
4716 #ifdef AMPS_USE_EMPLACE
4717  it = _topicHashMap.emplace(TopicHashMap::value_type(hash, QueueBookmarks(topic_))).first;
4718 #else
4719  it = _topicHashMap.insert(TopicHashMap::value_type(hash, QueueBookmarks(topic_))).first;
4720 #endif
4721  }
4722  QueueBookmarks& queueBookmarks = it->second;
      // Bookmarks are accumulated as a comma-separated list; the time of the
      // first entry of a batch is recorded in _oldestTime.
4723  if (queueBookmarks._data.length())
4724  {
4725  queueBookmarks._data.append(",");
4726  }
4727  else
4728  {
4729  queueBookmarks._oldestTime = amps_now();
4730  }
4731  queueBookmarks._data.append(bookmark_);
      // Send the batch once it has reached the configured size.
4732  if (++queueBookmarks._bookmarkCount >= _ackBatchSize)
4733  {
4734  _ack(queueBookmarks);
4735  }
4736  }
4737  void flushAcks(void)
4738  {
4739  size_t sendCount = 0;
4740  if (!_connected)
4741  {
4742  return;
4743  }
4744  else
4745  {
4746  Lock<Mutex> lock(_lock);
4747  typedef TopicHashMap::iterator iterator;
4748  for (iterator it = _topicHashMap.begin(), end = _topicHashMap.end(); it != end; ++it)
4749  {
4750  QueueBookmarks& queueBookmarks = it->second;
4751  sendCount += _ack(queueBookmarks);
4752  }
4753  }
4754  if (sendCount && _connected)
4755  {
4756  publishFlush(0, Message::AckType::Processed);
4757  }
4758  }
4759  // called when there's idle time, to see if we need to flush out any "acks"
4760  void checkQueueAcks(void)
4761  {
4762  if (!_topicHashMap.size())
4763  {
4764  return;
4765  }
4766  Lock<Mutex> lock(_lock);
4767  try
4768  {
4769  amps_uint64_t threshold = amps_now()
4770  - (amps_uint64_t)_queueAckTimeout;
4771  typedef TopicHashMap::iterator iterator;
4772  for (iterator it = _topicHashMap.begin(), end = _topicHashMap.end(); it != end; ++it)
4773  {
4774  QueueBookmarks& queueBookmarks = it->second;
4775  if (queueBookmarks._bookmarkCount && queueBookmarks._oldestTime < threshold)
4776  {
4777  _ack(queueBookmarks);
4778  }
4779  }
4780  }
4781  catch (std::exception& ex)
4782  {
4783  AMPS_UNHANDLED_EXCEPTION(ex);
4784  }
4785  }
4786 
/// Queues a function and its user data for later invocation by
/// processDeferredExecutions(). The list is modified while holding
/// _deferredExecutionLock.
/// \param func_ Function to call later.
/// \param userData_ Pointer passed to func_ when it is called.
4787  void deferredExecution(DeferredExecutionFunc func_, void* userData_)
4788  {
4789  Lock<Mutex> lock(_deferredExecutionLock);
4790 #ifdef AMPS_USE_EMPLACE
4791  _deferredExecutionList.emplace_back(
4792  DeferredExecutionRequest(func_, userData_));
4793 #else
4794  _deferredExecutionList.push_back(
4795  DeferredExecutionRequest(func_, userData_));
4796 #endif
4797  }
4798 
4799  inline void processDeferredExecutions(void)
4800  {
4801  if (_deferredExecutionList.size())
4802  {
4803  Lock<Mutex> lock(_deferredExecutionLock);
4804  DeferredExecutionList::iterator it = _deferredExecutionList.begin();
4805  DeferredExecutionList::iterator end = _deferredExecutionList.end();
4806  for (; it != end; ++it)
4807  {
4808  try
4809  {
4810  it->_func(it->_userData);
4811  }
4812  catch (...)
4813  {
4814  // -V565
4815  // Intentionally ignore errors
4816  }
4817  }
4818  _deferredExecutionList.clear();
4819  _routes.invalidateCache();
4820  _routeCache.invalidateCache();
4821  }
4822  }
4823 
/// Returns the current value of _isRetryOnDisconnect.
4824  bool getRetryOnDisconnect(void) const
4825  {
4826  return _isRetryOnDisconnect;
4827  }
4828 
/// Stores isRetryOnDisconnect_ in _isRetryOnDisconnect.
4829  void setRetryOnDisconnect(bool isRetryOnDisconnect_)
4830  {
4831  _isRetryOnDisconnect = isRetryOnDisconnect_;
4832  }
4833 
/// Stores maxDepth_ in _defaultMaxDepth.
4834  void setDefaultMaxDepth(unsigned maxDepth_)
4835  {
4836  _defaultMaxDepth = maxDepth_;
4837  }
4838 
/// Returns the current value of _defaultMaxDepth.
4839  unsigned getDefaultMaxDepth(void) const
4840  {
4841  return _defaultMaxDepth;
4842  }
4843 
/// Passes filter_ and userData_ to amps_client_set_transport_filter_function()
/// for the underlying client handle.
4844  void setTransportFilterFunction(amps_transport_filter_function filter_,
4845  void* userData_)
4846  {
4847  amps_client_set_transport_filter_function(_client, filter_, userData_);
4848  }
4849 
/// Passes callback_ and userData_ to amps_client_set_thread_created_callback()
/// for the underlying client handle.
4850  void setThreadCreatedCallback(amps_thread_created_callback callback_,
4851  void* userData_)
4852  {
4853  amps_client_set_thread_created_callback(_client, callback_, userData_);
4854  }
4855  }; // class ClientImpl
4930 
4932  {
4933  RefHandle<MessageStreamImpl> _body;
4934  public:
4939  class iterator
4940  {
4941  MessageStream* _pStream;
4942  Message _current;
4943  inline void advance(void);
4944 
4945  public:
4946  iterator() // end
4947  : _pStream(NULL)
4948  {;}
4949  iterator(MessageStream* pStream_)
4950  : _pStream(pStream_)
4951  {
4952  advance();
4953  }
4954 
4955  bool operator==(const iterator& rhs) const
4956  {
4957  return _pStream == rhs._pStream;
4958  }
4959  bool operator!=(const iterator& rhs) const
4960  {
4961  return _pStream != rhs._pStream;
4962  }
4963  void operator++(void)
4964  {
4965  advance();
4966  }
4967  Message operator*(void)
4968  {
4969  return _current;
4970  }
4971  Message* operator->(void)
4972  {
4973  return &_current;
4974  }
4975  };
/// Returns the result of isValid() on the _body handle.
4977  bool isValid() const
4978  {
4979  return _body.isValid();
4980  }
4981 
4985  {
4986  if (!_body.isValid())
4987  {
4988  throw UsageException("This MessageStream is not valid and cannot be iterated.");
4989  }
4990  return iterator(this);
4991  }
4994  // For non-SOW queries, the end is never reached.
4996  {
4997  return iterator();
4998  }
4999  inline MessageStream(void);
5000 
5006  MessageStream timeout(unsigned timeout_);
5007 
5011  MessageStream conflate(void);
5017  MessageStream maxDepth(unsigned maxDepth_);
5020  unsigned getMaxDepth(void) const;
5023  unsigned getDepth(void) const;
5024 
5025  private:
5026  inline MessageStream(const Client& client_);
5027  inline void setSOWOnly(const std::string& commandId_,
5028  const std::string& queryId_ = "");
5029  inline void setSubscription(const std::string& subId_,
5030  const std::string& commandId_ = "",
5031  const std::string& queryId_ = "");
5032  inline void setStatsOnly(const std::string& commandId_,
5033  const std::string& queryId_ = "");
5034  inline void setAcksOnly(const std::string& commandId_, unsigned acks_);
5035 
5036  inline operator MessageHandler(void);
5037 
5038  inline static MessageStream fromExistingHandler(const MessageHandler& handler);
5039 
5040  friend class Client;
5041 
5042  };
5043 
5063  class Client // -V553
5064  {
5065  protected:
5066  BorrowRefHandle<ClientImpl> _body;
5067  public:
5068  static const int DEFAULT_COMMAND_TIMEOUT = AMPS_DEFAULT_COMMAND_TIMEOUT;
5069  static const int DEFAULT_BATCH_SIZE = AMPS_DEFAULT_BATCH_SIZE;
5070  static const int DEFAULT_TOP_N = AMPS_DEFAULT_TOP_N;
5071 
/// Constructs a Client whose _body refers to a newly allocated ClientImpl
/// created with clientName (empty by default).
/// NOTE(review): the `true` passed to the handle presumably marks the
/// implementation as reference-counted - confirm against BorrowRefHandle.
5080  Client(const std::string& clientName = "")
5081  : _body(new ClientImpl(clientName), true)
5082  {;}
5083 
/// Constructs a Client whose _body refers to existingClient, passing `true`
/// as the handle's second argument.
5084  Client(ClientImpl* existingClient)
5085  : _body(existingClient, true)
5086  {;}
5087 
/// Constructs a Client whose _body refers to existingClient, passing isRef
/// through as the handle's second argument.
5088  Client(ClientImpl* existingClient, bool isRef)
5089  : _body(existingClient, isRef)
5090  {;}
5091 
/// Copy constructor: copies the _body handle from rhs.
5092  Client(const Client& rhs) : _body(rhs._body) {;}
/// Virtual destructor with an empty body.
5093  virtual ~Client(void) {;}
5094 
/// Copy assignment: assigns the _body handle from rhs and returns *this.
5095  Client& operator=(const Client& rhs)
5096  {
5097  _body = rhs._body;
5098  return *this;
5099  }
5100 
/// Returns the result of isValid() on the _body handle.
5101  bool isValid()
5102  {
5103  return _body.isValid();
5104  }
5105 
/// Forwards name to setName() on the implementation object held in _body.
5118  void setName(const std::string& name)
5119  {
5120  _body.get().setName(name);
5121  }
5122 
/// Returns the reference produced by getName() on the implementation object.
5125  const std::string& getName() const
5126  {
5127  return _body.get().getName();
5128  }
5129 
/// Returns the reference produced by getNameHash() on the implementation
/// object.
5133  const std::string& getNameHash() const
5134  {
5135  return _body.get().getNameHash();
5136  }
5137 
5141  const amps_uint64_t getNameHashValue() const
5142  {
5143  return _body.get().getNameHashValue();
5144  }
5145 
/// Forwards logonCorrelationData_ to setLogonCorrelationData() on the
/// implementation object.
5152  void setLogonCorrelationData(const std::string& logonCorrelationData_)
5153  {
5154  _body.get().setLogonCorrelationData(logonCorrelationData_);
5155  }
5156 
/// Returns the reference produced by getLogonCorrelationData() on the
/// implementation object.
5159  const std::string& getLogonCorrelationData() const
5160  {
5161  return _body.get().getLogonCorrelationData();
5162  }
5163 
/// Returns the value produced by getServerVersion() on the implementation
/// object.
5172  size_t getServerVersion() const
5173  {
5174  return _body.get().getServerVersion();
5175  }
5176 
/// Returns the VersionInfo produced by getServerVersionInfo() on the
/// implementation object.
5183  VersionInfo getServerVersionInfo() const
5184  {
5185  return _body.get().getServerVersionInfo();
5186  }
5187 
/// Converts a version string to a number by forwarding its characters and
/// length to AMPS::convertVersionToNumber().
5197  static size_t convertVersionToNumber(const std::string& version_)
5198  {
5199  return AMPS::convertVersionToNumber(version_.c_str(), version_.length());
5200  }
5201 
/// Converts the len_ characters at data_ to a number by forwarding to
/// AMPS::convertVersionToNumber().
5212  static size_t convertVersionToNumber(const char* data_, size_t len_)
5213  {
5214  return AMPS::convertVersionToNumber(data_, len_);
5215  }
5216 
/// Returns the reference produced by getURI() on the implementation object.
5219  const std::string& getURI() const
5220  {
5221  return _body.get().getURI();
5222  }
5223 
5230 
5232 
/// Forwards uri to connect() on the implementation object.
5243  void connect(const std::string& uri)
5244  {
5245  _body.get().connect(uri);
5246  }
5247 
/// Calls disconnect() on the implementation object.
5250  void disconnect()
5251  {
5252  _body.get().disconnect();
5253  }
5254 
/// Forwards message to send() on the implementation object.
5268  void send(const Message& message)
5269  {
5270  _body.get().send(message);
5271  }
5272 
5281  void addMessageHandler(const Field& commandId_,
5282  const AMPS::MessageHandler& messageHandler_,
5283  unsigned requestedAcks_, bool isSubscribe_)
5284  {
5285  Message::Command::Type commandType = isSubscribe_ ? Message::Command::Subscribe : Message::Command::SOW;
5286  _body.get().addMessageHandler(commandId_, messageHandler_,
5287  requestedAcks_, commandType);
5288  }
5289 
/// Forwards all four arguments unchanged to addMessageHandler() on the
/// implementation object.
5298  void addMessageHandler(const Field& commandId_,
5299  const AMPS::MessageHandler& messageHandler_,
5300  unsigned requestedAcks_, Message::Command::Type commandType_)
5301  {
5302  _body.get().addMessageHandler(commandId_, messageHandler_,
5303  requestedAcks_, commandType_);
5304  }
5305 
/// Forwards commandId_ to removeMessageHandler() on the implementation
/// object and returns its result.
5309  bool removeMessageHandler(const Field& commandId_)
5310  {
5311  return _body.get().removeMessageHandler(commandId_);
5312  }
5313 
/// Forwards the handler, message and timeout (0 by default) to the
/// three-argument send() on the implementation object and returns its result.
5337  std::string send(const MessageHandler& messageHandler, Message& message, int timeout = 0)
5338  {
5339  return _body.get().send(messageHandler, message, timeout);
5340  }
5341 
/// Forwards disconnectHandler to setDisconnectHandler() on the
/// implementation object.
5351  void setDisconnectHandler(const DisconnectHandler& disconnectHandler)
5352  {
5353  _body.get().setDisconnectHandler(disconnectHandler);
5354  }
5355 
/// Returns the DisconnectHandler produced by getDisconnectHandler() on the
/// implementation object.
5359  DisconnectHandler getDisconnectHandler(void) const
5360  {
5361  return _body.get().getDisconnectHandler();
5362  }
5363 
/// Returns the ConnectionInfo produced by getConnectionInfo() on the
/// implementation object. Declared virtual, so subclasses may override it.
5368  virtual ConnectionInfo getConnectionInfo() const
5369  {
5370  return _body.get().getConnectionInfo();
5371  }
5372 
/// Forwards bookmarkStore_ to setBookmarkStore() on the implementation
/// object.
5381  void setBookmarkStore(const BookmarkStore& bookmarkStore_)
5382  {
5383  _body.get().setBookmarkStore(bookmarkStore_);
5384  }
5385 
5390  {
5391  return _body.get().getBookmarkStore();
5392  }
5393 
5398  {
5399  return _body.get().getSubscriptionManager();
5400  }
5401 
/// Forwards the subscriptionManager_ pointer to setSubscriptionManager() on
/// the implementation object.
5409  void setSubscriptionManager(SubscriptionManager* subscriptionManager_)
5410  {
5411  _body.get().setSubscriptionManager(subscriptionManager_);
5412  }
5413 
/// Forwards publishStore_ to setPublishStore() on the implementation object.
5433  void setPublishStore(const Store& publishStore_)
5434  {
5435  _body.get().setPublishStore(publishStore_);
5436  }
5437 
5442  {
5443  return _body.get().getPublishStore();
5444  }
5445 
/// Installs duplicateMessageHandler_ as the implementation object's global
/// command-type handler for the DuplicateMessage slot.
5449  void setDuplicateMessageHandler(const MessageHandler& duplicateMessageHandler_)
5450  {
5451  _body.get().setGlobalCommandTypeMessageHandler(ClientImpl::GlobalCommandTypeHandlers::DuplicateMessage,
5452  duplicateMessageHandler_);
5453  }
5454 
// NOTE(review): the declaration line for this body is absent from this listing
// (presumably getDuplicateMessageHandler -- confirm against the header).
// The body returns whatever getDuplicateMessageHandler() on the implementation object returns.
5465  {
5466  return _body.get().getDuplicateMessageHandler();
5467  }
5468 
// NOTE(review): the declaration line for this body is absent from this listing
// (presumably setFailedWriteHandler, taking a parameter named handler_ -- confirm).
// The body forwards handler_ to setFailedWriteHandler() on the implementation object.
5479  {
5480  _body.get().setFailedWriteHandler(handler_);
5481  }
5482 
// NOTE(review): the declaration line for this body is absent from this listing
// (presumably getFailedWriteHandler -- confirm against the header).
// The body returns whatever getFailedWriteHandler() on the implementation object returns.
5487  {
5488  return _body.get().getFailedWriteHandler();
5489  }
5490 
5491 
5509  amps_uint64_t publish(const std::string& topic_, const std::string& data_)
5510  {
5511  return _body.get().publish(topic_.c_str(), topic_.length(),
5512  data_.c_str(), data_.length());
5513  }
5514 
/// Forwards the topic buffer/length and data buffer/length, unchanged, to publish()
/// on the implementation object and returns its amps_uint64_t result.
5534  amps_uint64_t publish(const char* topic_, size_t topicLength_,
5535  const char* data_, size_t dataLength_)
5536  {
5537  return _body.get().publish(topic_, topicLength_, data_, dataLength_);
5538  }
5539 
5558  amps_uint64_t publish(const std::string& topic_, const std::string& data_,
5559  unsigned long expiration_)
5560  {
5561  return _body.get().publish(topic_.c_str(), topic_.length(),
5562  data_.c_str(), data_.length(), expiration_);
5563  }
5564 
/// Forwards the topic buffer/length, data buffer/length and expiration_, unchanged,
/// to publish() on the implementation object and returns its amps_uint64_t result.
5585  amps_uint64_t publish(const char* topic_, size_t topicLength_,
5586  const char* data_, size_t dataLength_,
5587  unsigned long expiration_)
5588  {
5589  return _body.get().publish(topic_, topicLength_,
5590  data_, dataLength_, expiration_);
5591  }
5592 
/// Forwards timeout_ (default 0) and ackType_ (default Message::AckType::Processed)
/// to publishFlush() on the implementation object.
5631  void publishFlush(long timeout_ = 0, unsigned ackType_ = Message::AckType::Processed)
5632  {
5633  _body.get().publishFlush(timeout_, ackType_);
5634  }
5635 
5636 
5652  amps_uint64_t deltaPublish(const std::string& topic_, const std::string& data_)
5653  {
5654  return _body.get().deltaPublish(topic_.c_str(), topic_.length(),
5655  data_.c_str(), data_.length());
5656  }
5657 
/// Forwards the topic buffer/length and data buffer/length, unchanged, to
/// deltaPublish() on the implementation object and returns its amps_uint64_t result.
5675  amps_uint64_t deltaPublish(const char* topic_, size_t topicLength_,
5676  const char* data_, size_t dataLength_)
5677  {
5678  return _body.get().deltaPublish(topic_, topicLength_,
5679  data_, dataLength_);
5680  }
5681 
5698  amps_uint64_t deltaPublish(const std::string& topic_, const std::string& data_,
5699  unsigned long expiration_)
5700  {
5701  return _body.get().deltaPublish(topic_.c_str(), topic_.length(),
5702  data_.c_str(), data_.length(),
5703  expiration_);
5704  }
5705 
/// Forwards the topic buffer/length, data buffer/length and expiration_, unchanged,
/// to deltaPublish() on the implementation object and returns its amps_uint64_t result.
5724  amps_uint64_t deltaPublish(const char* topic_, size_t topicLength_,
5725  const char* data_, size_t dataLength_,
5726  unsigned long expiration_)
5727  {
5728  return _body.get().deltaPublish(topic_, topicLength_,
5729  data_, dataLength_, expiration_);
5730  }
5731 
/// Forwards timeout_ (default 0), authenticator_ (default
/// DefaultAuthenticator::instance()) and options_ (default NULL) to logon() on the
/// implementation object and returns the std::string that call produces.
5747  std::string logon(int timeout_ = 0,
5748  Authenticator& authenticator_ = DefaultAuthenticator::instance(),
5749  const char* options_ = NULL)
5750  {
5751  return _body.get().logon(timeout_, authenticator_, options_);
5752  }
/// Options-first overload: calls logon() on the implementation object with timeout_,
/// DefaultAuthenticator::instance() and options_, and returns its std::string result.
5766  std::string logon(const char* options_, int timeout_ = 0)
5767  {
5768  return _body.get().logon(timeout_, DefaultAuthenticator::instance(),
5769  options_);
5770  }
5771 
/// std::string-options overload: calls logon() on the implementation object with
/// timeout_, DefaultAuthenticator::instance() and options_.c_str(), and returns its
/// std::string result.
5785  std::string logon(const std::string& options_, int timeout_ = 0)
5786  {
5787  return _body.get().logon(timeout_, DefaultAuthenticator::instance(),
5788  options_.c_str());
5789  }
5790 
/// Handler-based subscribe: forwards the arguments to subscribe() on the
/// implementation object and returns the std::string that call produces.
/// An empty string literal is passed in the argument slot between filter_ and options_.
5810  std::string subscribe(const MessageHandler& messageHandler_,
5811  const std::string& topic_,
5812  long timeout_ = 0,
5813  const std::string& filter_ = "",
5814  const std::string& options_ = "",
5815  const std::string& subId_ = "")
5816  {
5817  return _body.get().subscribe(messageHandler_, topic_, timeout_,
5818  filter_, "", options_, subId_);
5819  }
5820 
5836  MessageStream subscribe(const std::string& topic_,
5837  long timeout_ = 0, const std::string& filter_ = "",
5838  const std::string& options_ = "",
5839  const std::string& subId_ = "")
5840  {
5841  MessageStream result(*this);
5842  if (_body.get().getDefaultMaxDepth())
5843  {
5844  result.maxDepth(_body.get().getDefaultMaxDepth());
5845  }
5846  result.setSubscription(_body.get().subscribe(
5847  result.operator MessageHandler(),
5848  topic_, timeout_, filter_, "",
5849  options_, subId_, false));
5850  return result;
5851  }
5852 
5868  MessageStream subscribe(const char* topic_,
5869  long timeout_ = 0, const std::string& filter_ = "",
5870  const std::string& options_ = "",
5871  const std::string& subId_ = "")
5872  {
5873  MessageStream result(*this);
5874  if (_body.get().getDefaultMaxDepth())
5875  {
5876  result.maxDepth(_body.get().getDefaultMaxDepth());
5877  }
5878  result.setSubscription(_body.get().subscribe(
5879  result.operator MessageHandler(),
5880  topic_, timeout_, filter_, "",
5881  options_, subId_, false));
5882  return result;
5883  }
5884 
/// Handler-based delta subscribe: forwards the arguments to deltaSubscribe() on the
/// implementation object and returns the std::string that call produces.
/// An empty string literal is passed in the argument slot between filter_ and options_.
5897  std::string deltaSubscribe(const MessageHandler& messageHandler_,
5898  const std::string& topic_,
5899  long timeout_,
5900  const std::string& filter_ = "",
5901  const std::string& options_ = "",
5902  const std::string& subId_ = "")
5903  {
5904  return _body.get().deltaSubscribe(messageHandler_, topic_, timeout_,
5905  filter_, "", options_, subId_);
5906  }
5915  MessageStream deltaSubscribe(const std::string& topic_,
5916  long timeout_, const std::string& filter_ = "",
5917  const std::string& options_ = "",
5918  const std::string& subId_ = "")
5919  {
5920  MessageStream result(*this);
5921  if (_body.get().getDefaultMaxDepth())
5922  {
5923  result.maxDepth(_body.get().getDefaultMaxDepth());
5924  }
5925  result.setSubscription(_body.get().deltaSubscribe(
5926  result.operator MessageHandler(),
5927  topic_, timeout_, filter_, "",
5928  options_, subId_, false));
5929  return result;
5930  }
5931 
5933  MessageStream deltaSubscribe(const char* topic_,
5934  long timeout_, const std::string& filter_ = "",
5935  const std::string& options_ = "",
5936  const std::string& subId_ = "")
5937  {
5938  MessageStream result(*this);
5939  if (_body.get().getDefaultMaxDepth())
5940  {
5941  result.maxDepth(_body.get().getDefaultMaxDepth());
5942  }
5943  result.setSubscription(_body.get().deltaSubscribe(
5944  result.operator MessageHandler(),
5945  topic_, timeout_, filter_, "",
5946  options_, subId_, false));
5947  return result;
5948  }
5949 
/// Handler-based bookmark subscribe: calls subscribe() on the implementation object
/// with bookmark_ in the argument slot between filter_ and options_, and returns the
/// std::string that call produces.
5975  std::string bookmarkSubscribe(const MessageHandler& messageHandler_,
5976  const std::string& topic_,
5977  long timeout_,
5978  const std::string& bookmark_,
5979  const std::string& filter_ = "",
5980  const std::string& options_ = "",
5981  const std::string& subId_ = "")
5982  {
5983  return _body.get().subscribe(messageHandler_, topic_, timeout_,
5984  filter_, bookmark_, options_, subId_);
5985  }
6003  MessageStream bookmarkSubscribe(const std::string& topic_,
6004  long timeout_,
6005  const std::string& bookmark_,
6006  const std::string& filter_ = "",
6007  const std::string& options_ = "",
6008  const std::string& subId_ = "")
6009  {
6010  MessageStream result(*this);
6011  if (_body.get().getDefaultMaxDepth())
6012  {
6013  result.maxDepth(_body.get().getDefaultMaxDepth());
6014  }
6015  result.setSubscription(_body.get().subscribe(
6016  result.operator MessageHandler(),
6017  topic_, timeout_, filter_,
6018  bookmark_, options_,
6019  subId_, false));
6020  return result;
6021  }
6022 
6024  MessageStream bookmarkSubscribe(const char* topic_,
6025  long timeout_,
6026  const std::string& bookmark_,
6027  const std::string& filter_ = "",
6028  const std::string& options_ = "",
6029  const std::string& subId_ = "")
6030  {
6031  MessageStream result(*this);
6032  if (_body.get().getDefaultMaxDepth())
6033  {
6034  result.maxDepth(_body.get().getDefaultMaxDepth());
6035  }
6036  result.setSubscription(_body.get().subscribe(
6037  result.operator MessageHandler(),
6038  topic_, timeout_, filter_,
6039  bookmark_, options_,
6040  subId_, false));
6041  return result;
6042  }
6043 
/// Forwards commandId to unsubscribe() on the implementation object.
/// The "return" of a void expression is legal C++ and yields nothing to the caller.
6052  void unsubscribe(const std::string& commandId)
6053  {
6054  return _body.get().unsubscribe(commandId);
6055  }
6056 
// NOTE(review): the declaration line for this body is absent from this listing
// (presumably the no-argument unsubscribe -- confirm against the header).
// The body calls unsubscribe() with no arguments on the implementation object.
6065  {
6066  return _body.get().unsubscribe();
6067  }
6068 
6069 
/// Handler-based sow(): forwards every argument, in declaration order, to sow() on
/// the implementation object and returns the std::string that call produces.
/// Defaults: empty filter/orderBy/bookmark/options, DEFAULT_BATCH_SIZE,
/// DEFAULT_TOP_N and DEFAULT_COMMAND_TIMEOUT.
6099  std::string sow(const MessageHandler& messageHandler_,
6100  const std::string& topic_,
6101  const std::string& filter_ = "",
6102  const std::string& orderBy_ = "",
6103  const std::string& bookmark_ = "",
6104  int batchSize_ = DEFAULT_BATCH_SIZE,
6105  int topN_ = DEFAULT_TOP_N,
6106  const std::string& options_ = "",
6107  long timeout_ = DEFAULT_COMMAND_TIMEOUT)
6108  {
6109  return _body.get().sow(messageHandler_, topic_, filter_, orderBy_,
6110  bookmark_, batchSize_, topN_, options_,
6111  timeout_);
6112  }
6137  MessageStream sow(const std::string& topic_,
6138  const std::string& filter_ = "",
6139  const std::string& orderBy_ = "",
6140  const std::string& bookmark_ = "",
6141  int batchSize_ = DEFAULT_BATCH_SIZE,
6142  int topN_ = DEFAULT_TOP_N,
6143  const std::string& options_ = "",
6144  long timeout_ = DEFAULT_COMMAND_TIMEOUT)
6145  {
6146  MessageStream result(*this);
6147  if (_body.get().getDefaultMaxDepth())
6148  {
6149  result.maxDepth(_body.get().getDefaultMaxDepth());
6150  }
6151  result.setSOWOnly(_body.get().sow(result.operator MessageHandler(),
6152  topic_, filter_, orderBy_, bookmark_,
6153  batchSize_, topN_, options_, timeout_));
6154  return result;
6155  }
6156 
6158  MessageStream sow(const char* topic_,
6159  const std::string& filter_ = "",
6160  const std::string& orderBy_ = "",
6161  const std::string& bookmark_ = "",
6162  int batchSize_ = DEFAULT_BATCH_SIZE,
6163  int topN_ = DEFAULT_TOP_N,
6164  const std::string& options_ = "",
6165  long timeout_ = DEFAULT_COMMAND_TIMEOUT)
6166  {
6167  MessageStream result(*this);
6168  if (_body.get().getDefaultMaxDepth())
6169  {
6170  result.maxDepth(_body.get().getDefaultMaxDepth());
6171  }
6172  result.setSOWOnly(_body.get().sow(result.operator MessageHandler(),
6173  topic_, filter_, orderBy_, bookmark_,
6174  batchSize_, topN_, options_, timeout_));
6175  return result;
6176  }
/// Timeout-first sow() overload: forwards messageHandler_, topic_, timeout_, filter_,
/// batchSize_ and topN_ to the matching sow() on the implementation object and
/// returns the std::string that call produces.
6199  std::string sow(const MessageHandler& messageHandler_,
6200  const std::string& topic_,
6201  long timeout_,
6202  const std::string& filter_ = "",
6203  int batchSize_ = DEFAULT_BATCH_SIZE,
6204  int topN_ = DEFAULT_TOP_N)
6205  {
6206  return _body.get().sow(messageHandler_, topic_, timeout_, filter_,
6207  batchSize_, topN_);
6208  }
/// Timeout-first sowAndSubscribe() overload: forwards every argument, in declaration
/// order, to sowAndSubscribe() on the implementation object and returns the
/// std::string that call produces.
6231  std::string sowAndSubscribe(const MessageHandler& messageHandler_,
6232  const std::string& topic_,
6233  long timeout_,
6234  const std::string& filter_ = "",
6235  int batchSize_ = DEFAULT_BATCH_SIZE,
6236  bool oofEnabled_ = false,
6237  int topN_ = DEFAULT_TOP_N)
6238  {
6239  return _body.get().sowAndSubscribe(messageHandler_, topic_, timeout_,
6240  filter_, batchSize_, oofEnabled_,
6241  topN_);
6242  }
6243 
6263  MessageStream sowAndSubscribe(const std::string& topic_,
6264  long timeout_,
6265  const std::string& filter_ = "",
6266  int batchSize_ = DEFAULT_BATCH_SIZE,
6267  bool oofEnabled_ = false,
6268  int topN_ = DEFAULT_TOP_N)
6269  {
6270  MessageStream result(*this);
6271  if (_body.get().getDefaultMaxDepth())
6272  {
6273  result.maxDepth(_body.get().getDefaultMaxDepth());
6274  }
6275  result.setSubscription(_body.get().sowAndSubscribe(
6276  result.operator MessageHandler(),
6277  topic_, timeout_, filter_,
6278  batchSize_, oofEnabled_,
6279  topN_, false));
6280  return result;
6281  }
6301  MessageStream sowAndSubscribe(const char* topic_,
6302  long timeout_,
6303  const std::string& filter_ = "",
6304  int batchSize_ = DEFAULT_BATCH_SIZE,
6305  bool oofEnabled_ = false,
6306  int topN_ = DEFAULT_TOP_N)
6307  {
6308  MessageStream result(*this);
6309  if (_body.get().getDefaultMaxDepth())
6310  {
6311  result.maxDepth(_body.get().getDefaultMaxDepth());
6312  }
6313  result.setSubscription(_body.get().sowAndSubscribe(
6314  result.operator MessageHandler(),
6315  topic_, timeout_, filter_,
6316  batchSize_, oofEnabled_,
6317  topN_, false));
6318  return result;
6319  }
6320 
6321 
/// Handler-based sowAndSubscribe(): forwards every argument, in declaration order,
/// to sowAndSubscribe() on the implementation object and returns the std::string
/// that call produces.
6349  std::string sowAndSubscribe(const MessageHandler& messageHandler_,
6350  const std::string& topic_,
6351  const std::string& filter_ = "",
6352  const std::string& orderBy_ = "",
6353  const std::string& bookmark_ = "",
6354  int batchSize_ = DEFAULT_BATCH_SIZE,
6355  int topN_ = DEFAULT_TOP_N,
6356  const std::string& options_ = "",
6357  long timeout_ = DEFAULT_COMMAND_TIMEOUT)
6358  {
6359  return _body.get().sowAndSubscribe(messageHandler_, topic_, filter_,
6360  orderBy_, bookmark_, batchSize_,
6361  topN_, options_, timeout_);
6362  }
6363 
6388  MessageStream sowAndSubscribe(const std::string& topic_,
6389  const std::string& filter_ = "",
6390  const std::string& orderBy_ = "",
6391  const std::string& bookmark_ = "",
6392  int batchSize_ = DEFAULT_BATCH_SIZE,
6393  int topN_ = DEFAULT_TOP_N,
6394  const std::string& options_ = "",
6395  long timeout_ = DEFAULT_COMMAND_TIMEOUT)
6396  {
6397  MessageStream result(*this);
6398  if (_body.get().getDefaultMaxDepth())
6399  {
6400  result.maxDepth(_body.get().getDefaultMaxDepth());
6401  }
6402  result.setSubscription(_body.get().sowAndSubscribe(
6403  result.operator MessageHandler(),
6404  topic_, filter_, orderBy_,
6405  bookmark_, batchSize_, topN_,
6406  options_, timeout_, false));
6407  return result;
6408  }
6409 
6411  MessageStream sowAndSubscribe(const char* topic_,
6412  const std::string& filter_ = "",
6413  const std::string& orderBy_ = "",
6414  const std::string& bookmark_ = "",
6415  int batchSize_ = DEFAULT_BATCH_SIZE,
6416  int topN_ = DEFAULT_TOP_N,
6417  const std::string& options_ = "",
6418  long timeout_ = DEFAULT_COMMAND_TIMEOUT)
6419  {
6420  MessageStream result(*this);
6421  if (_body.get().getDefaultMaxDepth())
6422  {
6423  result.maxDepth(_body.get().getDefaultMaxDepth());
6424  }
6425  result.setSubscription(_body.get().sowAndSubscribe(
6426  result.operator MessageHandler(),
6427  topic_, filter_, orderBy_,
6428  bookmark_, batchSize_, topN_,
6429  options_, timeout_, false));
6430  return result;
6431  }
6432 
/// Handler-based sowAndDeltaSubscribe(): forwards every argument, in declaration
/// order, to sowAndDeltaSubscribe() on the implementation object and returns the
/// std::string that call produces.
6457  std::string sowAndDeltaSubscribe(const MessageHandler& messageHandler_,
6458  const std::string& topic_,
6459  const std::string& filter_ = "",
6460  const std::string& orderBy_ = "",
6461  int batchSize_ = DEFAULT_BATCH_SIZE,
6462  int topN_ = DEFAULT_TOP_N,
6463  const std::string& options_ = "",
6464  long timeout_ = DEFAULT_COMMAND_TIMEOUT)
6465  {
6466  return _body.get().sowAndDeltaSubscribe(messageHandler_, topic_,
6467  filter_, orderBy_, batchSize_,
6468  topN_, options_, timeout_);
6469  }
6490  MessageStream sowAndDeltaSubscribe(const std::string& topic_,
6491  const std::string& filter_ = "",
6492  const std::string& orderBy_ = "",
6493  int batchSize_ = DEFAULT_BATCH_SIZE,
6494  int topN_ = DEFAULT_TOP_N,
6495  const std::string& options_ = "",
6496  long timeout_ = DEFAULT_COMMAND_TIMEOUT)
6497  {
6498  MessageStream result(*this);
6499  if (_body.get().getDefaultMaxDepth())
6500  {
6501  result.maxDepth(_body.get().getDefaultMaxDepth());
6502  }
6503  result.setSubscription(_body.get().sowAndDeltaSubscribe(
6504  result.operator MessageHandler(),
6505  topic_, filter_, orderBy_,
6506  batchSize_, topN_, options_,
6507  timeout_, false));
6508  return result;
6509  }
6510 
// NOTE(review): the first line(s) of this declaration (return type, name and the
// topic_ parameter) are absent from this listing; only the trailing parameters remain.
// The body builds a MessageStream on this client, applies the implementation's
// default max depth when it is non-zero, calls sowAndDeltaSubscribe() on the
// implementation with the stream's MessageHandler conversion as the handler (trailing
// argument false), records the result with setSubscription() and returns the stream.
6513  const std::string& filter_ = "",
6514  const std::string& orderBy_ = "",
6515  int batchSize_ = DEFAULT_BATCH_SIZE,
6516  int topN_ = DEFAULT_TOP_N,
6517  const std::string& options_ = "",
6518  long timeout_ = DEFAULT_COMMAND_TIMEOUT)
6519  {
6520  MessageStream result(*this);
6521  if (_body.get().getDefaultMaxDepth())
6522  {
6523  result.maxDepth(_body.get().getDefaultMaxDepth());
6524  }
6525  result.setSubscription(_body.get().sowAndDeltaSubscribe(
6526  result.operator MessageHandler(),
6527  topic_, filter_, orderBy_,
6528  batchSize_, topN_, options_,
6529  timeout_, false));
6530  return result;
6531  }
6532 
/// Timeout-first sowAndDeltaSubscribe() overload: forwards every argument, in
/// declaration order, to sowAndDeltaSubscribe() on the implementation object and
/// returns the std::string that call produces.
6557  std::string sowAndDeltaSubscribe(const MessageHandler& messageHandler_,
6558  const std::string& topic_,
6559  long timeout_,
6560  const std::string& filter_ = "",
6561  int batchSize_ = DEFAULT_BATCH_SIZE,
6562  bool oofEnabled_ = false,
6563  bool sendEmpties_ = false,
6564  int topN_ = DEFAULT_TOP_N)
6565  {
6566  return _body.get().sowAndDeltaSubscribe(messageHandler_, topic_,
6567  timeout_, filter_, batchSize_,
6568  oofEnabled_, sendEmpties_,
6569  topN_);
6570  }
6571 
6593  MessageStream sowAndDeltaSubscribe(const std::string& topic_,
6594  long timeout_,
6595  const std::string& filter_ = "",
6596  int batchSize_ = DEFAULT_BATCH_SIZE,
6597  bool oofEnabled_ = false,
6598  bool sendEmpties_ = false,
6599  int topN_ = DEFAULT_TOP_N)
6600  {
6601  MessageStream result(*this);
6602  if (_body.get().getDefaultMaxDepth())
6603  {
6604  result.maxDepth(_body.get().getDefaultMaxDepth());
6605  }
6606  result.setSubscription(_body.get().sowAndDeltaSubscribe(
6607  result.operator MessageHandler(),
6608  topic_, timeout_, filter_,
6609  batchSize_, oofEnabled_,
6610  sendEmpties_, topN_, false));
6611  return result;
6612  }
// NOTE(review): the first line(s) of this declaration (return type, name and the
// topic_ parameter) are absent from this listing; only the trailing parameters remain.
// The body builds a MessageStream on this client, applies the implementation's
// default max depth when it is non-zero, calls sowAndDeltaSubscribe() on the
// implementation with the stream's MessageHandler conversion as the handler (trailing
// argument false), records the result with setSubscription() and returns the stream.
6635  long timeout_,
6636  const std::string& filter_ = "",
6637  int batchSize_ = DEFAULT_BATCH_SIZE,
6638  bool oofEnabled_ = false,
6639  bool sendEmpties_ = false,
6640  int topN_ = DEFAULT_TOP_N)
6641  {
6642  MessageStream result(*this);
6643  if (_body.get().getDefaultMaxDepth())
6644  {
6645  result.maxDepth(_body.get().getDefaultMaxDepth());
6646  }
6647  result.setSubscription(_body.get().sowAndDeltaSubscribe(
6648  result.operator MessageHandler(),
6649  topic_, timeout_, filter_,
6650  batchSize_, oofEnabled_,
6651  sendEmpties_, topN_, false));
6652  return result;
6653  }
/// Handler-based sowDelete(): forwards messageHandler, topic, filter and timeout to
/// sowDelete() on the implementation object and returns the std::string that call
/// produces.
6673  std::string sowDelete(const MessageHandler& messageHandler,
6674  const std::string& topic,
6675  const std::string& filter,
6676  long timeout)
6677  {
6678  return _body.get().sowDelete(messageHandler, topic, filter, timeout);
6679  }
6696  Message sowDelete(const std::string& topic, const std::string& filter,
6697  long timeout = 0)
6698  {
6699  MessageStream stream(*this);
6700  char buf[Message::IdentifierLength + 1];
6701  buf[Message::IdentifierLength] = 0;
6702  AMPS_snprintf(buf, Message::IdentifierLength + 1, "%lx", MessageImpl::newId());
6703  Field cid(buf);
6704  try
6705  {
6706  stream.setStatsOnly(cid);
6707  _body.get().sowDelete(stream.operator MessageHandler(), topic, filter, timeout, cid);
6708  return *(stream.begin());
6709  }
6710  catch (const DisconnectedException&)
6711  {
6712  removeMessageHandler(cid);
6713  throw;
6714  }
6715  }
6716 
/// Calls startTimer() on the implementation object.
6721  void startTimer()
6722  {
6723  _body.get().startTimer();
6724  }
6725 
/// Forwards messageHandler to stopTimer() on the implementation object and returns
/// the std::string that call produces.
6732  std::string stopTimer(const MessageHandler& messageHandler)
6733  {
6734  return _body.get().stopTimer(messageHandler);
6735  }
6736 
/// Handler-based sowDeleteByKeys(): forwards messageHandler_, topic_, keys_ and
/// timeout_ (default 0) to sowDeleteByKeys() on the implementation object and returns
/// the std::string that call produces.
6758  std::string sowDeleteByKeys(const MessageHandler& messageHandler_,
6759  const std::string& topic_,
6760  const std::string& keys_,
6761  long timeout_ = 0)
6762  {
6763  return _body.get().sowDeleteByKeys(messageHandler_, topic_, keys_, timeout_);
6764  }
6785  Message sowDeleteByKeys(const std::string& topic_, const std::string& keys_,
6786  long timeout_ = 0)
6787  {
6788  MessageStream stream(*this);
6789  char buf[Message::IdentifierLength + 1];
6790  buf[Message::IdentifierLength] = 0;
6791  AMPS_snprintf(buf, Message::IdentifierLength + 1, "%lx", MessageImpl::newId());
6792  Field cid(buf);
6793  try
6794  {
6795  stream.setStatsOnly(cid);
6796  _body.get().sowDeleteByKeys(stream.operator MessageHandler(), topic_, keys_, timeout_, cid);
6797  return *(stream.begin());
6798  }
6799  catch (const DisconnectedException&)
6800  {
6801  removeMessageHandler(cid);
6802  throw;
6803  }
6804  }
6805 
/// Handler-based sowDeleteByData(): forwards messageHandler_, topic_, data_ and
/// timeout_ (default 0) to sowDeleteByData() on the implementation object and returns
/// the std::string that call produces.
6820  std::string sowDeleteByData(const MessageHandler& messageHandler_,
6821  const std::string& topic_, const std::string& data_,
6822  long timeout_ = 0)
6823  {
6824  return _body.get().sowDeleteByData(messageHandler_, topic_, data_, timeout_);
6825  }
6826 
6841  Message sowDeleteByData(const std::string& topic_, const std::string& data_,
6842  long timeout_ = 0)
6843  {
6844  MessageStream stream(*this);
6845  char buf[Message::IdentifierLength + 1];
6846  buf[Message::IdentifierLength] = 0;
6847  AMPS_snprintf(buf, Message::IdentifierLength + 1, "%lx", MessageImpl::newId());
6848  Field cid(buf);
6849  try
6850  {
6851  stream.setStatsOnly(cid);
6852  _body.get().sowDeleteByData(stream.operator MessageHandler(), topic_, data_, timeout_, cid);
6853  return *(stream.begin());
6854  }
6855  catch (const DisconnectedException&)
6856  {
6857  removeMessageHandler(cid);
6858  throw;
6859  }
6860  }
6861 
// NOTE(review): the declaration line for this body is absent from this listing
// (presumably getHandle -- confirm against the header).
// The body returns whatever getHandle() on the implementation object returns.
6866  {
6867  return _body.get().getHandle();
6868  }
6869 
/// shared_ptr overload: forwards pListener_ to setExceptionListener() on the
/// implementation object.
6878  void setExceptionListener(const std::shared_ptr<const ExceptionListener>& pListener_)
6879  {
6880  _body.get().setExceptionListener(pListener_);
6881  }
6882 
// NOTE(review): the declaration line for this body is absent from this listing
// (presumably a setExceptionListener overload taking listener_ -- confirm).
// The body forwards listener_ to setExceptionListener() on the implementation object.
6892  {
6893  _body.get().setExceptionListener(listener_);
6894  }
6895 
// NOTE(review): the declaration line for this body is absent from this listing
// (presumably getExceptionListener -- confirm against the header).
// The body returns whatever getExceptionListener() on the implementation object returns.
6899  {
6900  return _body.get().getExceptionListener();
6901  }
6902 
6910  // Heartbeating: the connection is considered lost if no message is received (any type of message) from the server for the specified interval (plus a grace period).
/// Forwards heartbeatTime_ and readTimeout_, unchanged, to setHeartbeat() on the
/// implementation object.
6924  void setHeartbeat(unsigned heartbeatTime_, unsigned readTimeout_)
6925  {
6926  _body.get().setHeartbeat(heartbeatTime_, readTimeout_);
6927  }
6928 
6936  // Heartbeating: the connection is considered lost if no message is received (any type of message) from the server for the specified interval (plus a grace period).
6948  void setHeartbeat(unsigned heartbeatTime_)
6949  {
6950  _body.get().setHeartbeat(heartbeatTime_, 2 * heartbeatTime_);
6951  }
6952 
// NOTE(review): the declaration line for this body is absent from this listing;
// the function name cannot be confirmed from here.
// The body delegates to setLastChanceMessageHandler() with the same messageHandler.
6955  {
6956  setLastChanceMessageHandler(messageHandler);
6957  }
6958 
// NOTE(review): the declaration line for this body is absent from this listing
// (presumably setLastChanceMessageHandler -- confirm against the header).
// The body installs messageHandler by calling setGlobalCommandTypeMessageHandler()
// on the implementation object with the
// ClientImpl::GlobalCommandTypeHandlers::LastChance selector.
6962  {
6963  _body.get().setGlobalCommandTypeMessageHandler(ClientImpl::GlobalCommandTypeHandlers::LastChance,
6964  messageHandler);
6965  }
6966 
/// String-keyed overload: forwards command_ and handler_ to
/// setGlobalCommandTypeMessageHandler() on the implementation object.
6987  void setGlobalCommandTypeMessageHandler(const std::string& command_, const MessageHandler& handler_)
6988  {
6989  _body.get().setGlobalCommandTypeMessageHandler(command_, handler_);
6990  }
6991 
/// Message::Command::Type-keyed overload: forwards command_ and handler_ to
/// setGlobalCommandTypeMessageHandler() on the implementation object.
7012  void setGlobalCommandTypeMessageHandler(const Message::Command::Type command_, const MessageHandler& handler_)
7013  {
7014  _body.get().setGlobalCommandTypeMessageHandler(command_, handler_);
7015  }
7016 
/// Returns the AMPS_BOOKMARK_NOW constant.
7022  static const char* BOOKMARK_NOW()
7023  {
7024  return AMPS_BOOKMARK_NOW;
7025  }
/// Returns the AMPS_BOOKMARK_NOW constant (same value as BOOKMARK_NOW()).
7031  static const char* NOW()
7032  {
7033  return AMPS_BOOKMARK_NOW;
7034  }
7035 
/// Returns the AMPS_BOOKMARK_EPOCH constant.
7041  static const char* BOOKMARK_EPOCH()
7042  {
7043  return AMPS_BOOKMARK_EPOCH;
7044  }
7045 
/// Returns the AMPS_BOOKMARK_EPOCH constant (same value as BOOKMARK_EPOCH()).
7051  static const char* EPOCH()
7052  {
7053  return AMPS_BOOKMARK_EPOCH;
7054  }
7055 
/// Returns the AMPS_BOOKMARK_RECENT constant.
7062  static const char* BOOKMARK_MOST_RECENT()
7063  {
7064  return AMPS_BOOKMARK_RECENT;
7065  }
7066 
/// Returns the AMPS_BOOKMARK_RECENT constant (same value as BOOKMARK_MOST_RECENT()).
7073  static const char* MOST_RECENT()
7074  {
7075  return AMPS_BOOKMARK_RECENT;
7076  }
7077 
/// Returns the AMPS_BOOKMARK_RECENT constant (same value as BOOKMARK_MOST_RECENT()).
7084  static const char* BOOKMARK_RECENT()
7085  {
7086  return AMPS_BOOKMARK_RECENT;
7087  }
7088 
7089 
// NOTE(review): the declaration line for this body is absent from this listing
// (presumably addConnectionStateListener, taking listener -- confirm).
// The body forwards listener to addConnectionStateListener() on the implementation object.
7096  {
7097  _body.get().addConnectionStateListener(listener);
7098  }
7099 
// NOTE(review): the declaration line for this body is absent from this listing
// (presumably removeConnectionStateListener, taking listener -- confirm).
// The body forwards listener to removeConnectionStateListener() on the implementation object.
7104  {
7105  _body.get().removeConnectionStateListener(listener);
7106  }
7107 
// NOTE(review): the declaration line for this body is absent from this listing
// (presumably clearConnectionStateListeners -- confirm against the header).
// The body calls clearConnectionStateListeners() on the implementation object.
7111  {
7112  _body.get().clearConnectionStateListeners();
7113  }
7114 
/// Forwards command_ and handler_ to executeAsync() on the implementation object and
/// returns the std::string that call produces. handler_ is taken by value.
7140  std::string executeAsync(Command& command_, MessageHandler handler_)
7141  {
7142  return _body.get().executeAsync(command_, handler_);
7143  }
7144 
/// Executes command_ through the implementation's three-argument executeAsync(),
/// always passing false as the third argument.
///
/// For a subscribe command that carries a non-empty subscription id and whose options
/// contain "replace", the handler already routed for that subscription id (if any) is
/// reused in place of handler_, and an empty string is returned.
///
/// \param command_ The command to execute; its Message is inspected and may be sent.
/// \param handler_ Handler used unless an existing route is reused.
/// \return The id returned by executeAsync(), or an empty string when an existing
/// handler was reused.
/// \throw DisconnectedException Rethrown after removing the handlers registered under
/// the command id and, for subscribe / sow commands, the subscription id / query id.
7174  std::string executeAsyncNoResubscribe(Command& command_,
7175  MessageHandler handler_)
7176  {
7177  std::string id;
7178  try
7179  {
7180  if (command_.isSubscribe())
7181  {
7182  Message& message = command_.getMessage();
7183  Field subId = message.getSubscriptionId();
// The options are searched only when both the sub id and the options are non-empty
// (short-circuit evaluation); 7 is the length of "replace".
7184  bool useExistingHandler = !subId.empty() && !message.getOptions().empty() && message.getOptions().contains("replace", 7);
7185  if (useExistingHandler)
7186  {
7187  MessageHandler existingHandler;
// Reads the implementation's route table directly to find the current handler.
7188  if (_body.get()._routes.getRoute(subId, existingHandler))
7189  {
7190  // we found an existing handler.
7191  _body.get().executeAsync(command_, existingHandler, false);
7192  return id; // empty string indicates existing
7193  }
7194  }
7195  }
// No reusable route: execute with the caller's handler and keep the returned id.
7196  id = _body.get().executeAsync(command_, handler_, false);
7197  }
7198  catch (const DisconnectedException&)
7199  {
// Remove any handler registered for this command before propagating the exception.
7200  removeMessageHandler(command_.getMessage().getCommandId());
7201  if (command_.isSubscribe())
7202  {
7203  removeMessageHandler(command_.getMessage().getSubscriptionId());
7204  }
7205  if (command_.isSow())
7206  {
7207  removeMessageHandler(command_.getMessage().getQueryID());
7208  }
7209  throw;
7210  }
7211  return id;
7212  }
7213 
/// Declaration only: takes a Command by reference and returns a MessageStream.
/// The definition is not part of this section of the file.
7226  MessageStream execute(Command& command_);
7227 
/// Forwards topic_, bookmark_ and options_ (default NULL) to ack() on the
/// implementation object.
7236  void ack(Field& topic_, Field& bookmark_, const char* options_ = NULL)
7237  {
7238  _body.get().ack(topic_, bookmark_, options_);
7239  }
7240 
/// Message overload: calls ack() on the implementation object with the message's
/// getTopic() and getBookmark() values plus options_ (default NULL).
7248  void ack(Message& message_, const char* options_ = NULL)
7249  {
7250  _body.get().ack(message_.getTopic(), message_.getBookmark(), options_);
7251  }
/// std::string overload: wraps the data()/length() of topic_ and bookmark_ in
/// temporary Field objects and passes them, with options_ (default NULL), to ack()
/// on the implementation object.
7260  void ack(const std::string& topic_, const std::string& bookmark_,
7261  const char* options_ = NULL)
7262  {
7263  _body.get().ack(Field(topic_.data(), topic_.length()), Field(bookmark_.data(), bookmark_.length()), options_);
7264  }
7265 
/// Forwards topic_, bookmark_ and options_ (default NULL) to the implementation's
/// _ack() member (note: _ack(), not ack()).
7271  void ackDeferredAutoAck(Field& topic_, Field& bookmark_, const char* options_ = NULL)
7272  {
7273  _body.get()._ack(topic_, bookmark_, options_);
7274  }
/// Calls flushAcks() on the implementation object.
7284  void flushAcks(void)
7285  {
7286  _body.get().flushAcks();
7287  }
7288 
/// Returns the bool reported by getAutoAck() on the implementation object.
7293  bool getAutoAck(void) const
7294  {
7295  return _body.get().getAutoAck();
7296  }
/// Forwards isAutoAckEnabled_ to setAutoAck() on the implementation object.
7303  void setAutoAck(bool isAutoAckEnabled_)
7304  {
7305  _body.get().setAutoAck(isAutoAckEnabled_);
7306  }
/// Returns the unsigned value reported by getAckBatchSize() on the implementation object.
7311  unsigned getAckBatchSize(void) const
7312  {
7313  return _body.get().getAckBatchSize();
7314  }
/// Forwards ackBatchSize_ to setAckBatchSize() on the implementation object.
7321  void setAckBatchSize(const unsigned ackBatchSize_)
7322  {
7323  _body.get().setAckBatchSize(ackBatchSize_);
7324  }
7325 
/// Returns the int value reported by getAckTimeout() on the implementation object.
7332  int getAckTimeout(void) const
7333  {
7334  return _body.get().getAckTimeout();
7335  }
7344  void setAckTimeout(const int ackTimeout_)
7345  {
7346  if (!ackTimeout_ && _body.get().getAckBatchSize() > 1)
7347  {
7348  throw UsageException("Ack timeout must be > 0 when ack batch size > 1");
7349  }
7350  _body.get().setAckTimeout(ackTimeout_);
7351  }
7352 
7353 
/// Forwards isRetryOnDisconnect_ to setRetryOnDisconnect() on the implementation object.
7362  void setRetryOnDisconnect(bool isRetryOnDisconnect_)
7363  {
7364  _body.get().setRetryOnDisconnect(isRetryOnDisconnect_);
7365  }
7366 
/// Returns the bool reported by getRetryOnDisconnect() on the implementation object.
7371  bool getRetryOnDisconnect(void) const
7372  {
7373  return _body.get().getRetryOnDisconnect();
7374  }
7375 
/// Forwards maxDepth_ to setDefaultMaxDepth() on the implementation object.
/// The MessageStream-returning methods of this class apply this value to new streams
/// only when it is non-zero.
7380  void setDefaultMaxDepth(unsigned maxDepth_)
7381  {
7382  _body.get().setDefaultMaxDepth(maxDepth_);
7383  }
7384 
/// Returns the unsigned value reported by getDefaultMaxDepth() on the implementation
/// object.
7389  unsigned getDefaultMaxDepth(void) const
7390  {
7391  return _body.get().getDefaultMaxDepth();
7392  }
7393 
// NOTE(review): the first line of this declaration (return type, name and the filter_
// parameter) is absent from this listing; presumably setTransportFilterFunction.
// The body forwards filter_ and userData_ to setTransportFilterFunction() on the
// implementation object and returns that call's result.
7401  void* userData_)
7402  {
7403  return _body.get().setTransportFilterFunction(filter_, userData_);
7404  }
7405 
// NOTE(review): the first line of this declaration (return type, name and the
// callback_ parameter) is absent from this listing; presumably setThreadCreatedCallback.
// The body forwards callback_ and userData_ to setThreadCreatedCallback() on the
// implementation object and returns that call's result.
7415  void* userData_)
7416  {
7417  return _body.get().setThreadCreatedCallback(callback_, userData_);
7418  }
7419 
/// Forwards func_ and userData_ to deferredExecution() on the implementation object.
7425  void deferredExecution(DeferredExecutionFunc func_, void* userData_)
7426  {
7427  _body.get().deferredExecution(func_, userData_);
7428  }
7432  };
7433 
/// Invokes the handler stored at GlobalCommandTypeHandlers::LastChance in
/// _globalCommandTypeHandlers with message, wrapped in AMPS_CALL_EXCEPTION_WRAPPER.
// NOTE(review): the wrapper macro is defined elsewhere; presumably it routes
// exceptions thrown by the handler to the exception listener -- confirm.
7434  inline void
7435  ClientImpl::lastChance(AMPS::Message& message)
7436  {
7437  AMPS_CALL_EXCEPTION_WRAPPER(_globalCommandTypeHandlers[GlobalCommandTypeHandlers::LastChance].invoke(message));
7438  }
7439 
/// Processes an ack message against the failed-write handler, the publish store and
/// the bookmark store.
///
/// Three steps, in order:
///  1. If the Reason/Status fields match the failure test below and a failed-write
///     handler is set, report the failed write (replayed from the publish store when
///     one is valid, otherwise using a mostly empty Message).
///  2. If the publish store is valid and the message has a non-zero sequence,
///     discard the store up to that sequence.
///  3. If nothing was delivered above and the bookmark store is valid, mark the
///     message's bookmark as persisted for its subscription id, provided that
///     subscription still has a route.
///
/// \param message The ack message to process.
/// \return The number of deliveries counted across the steps above.
/// std::exception thrown inside is passed to AMPS_UNHANDLED_EXCEPTION, not rethrown.
7440  inline unsigned
7441  ClientImpl::persistedAck(AMPS::Message& message)
7442  {
7443  unsigned deliveries = 0;
7444  try
7445  {
7446  /*
7447  * Best Practice: If you don't care about the dupe acks that
7448  * occur during failover or rapid disconnect/reconnect, then just
7449  * ignore them. We could discard each duplicate from the
7450  * persisted store, but the storage costs of doing 1 record
7451  * discards is heavy. In most scenarios we'll just quickly blow
7452  * through the duplicates and get back to processing the
7453  * non-dupes.
7454  */
7455  const char* data = NULL;
7456  size_t len = 0;
7457  const char* status = NULL;
7458  size_t statusLen = 0;
7459  amps_handle messageHandle = message.getMessage();
// These are string LENGTHS used as a cheap classifier: presumably the lengths of the
// reason texts "not entitled" (12) and "duplicate" (9) and the status "failure" (7).
7460  const size_t NotEntitled = 12, Duplicate = 9, Failure = 7;
7461  amps_message_get_field_value(messageHandle, AMPS_Reason, &data, &len);
7462  amps_message_get_field_value(messageHandle, AMPS_Status, &status, &statusLen);
// status[0] is read only when statusLen == 7, so the buffer is known to be non-empty.
7463  if (len == NotEntitled || len == Duplicate ||
7464  (statusLen == Failure && status[0] == 'f'))
7465  {
7466  if (_failedWriteHandler)
7467  {
7468  if (_publishStore.isValid())
7469  {
7470  amps_uint64_t sequence =
7471  amps_message_get_field_uint64(messageHandle, AMPS_Sequence);
// data/len still hold the Reason field here; they are handed to the replayer.
7472  FailedWriteStoreReplayer replayer(this, data, len);
7473  AMPS_CALL_EXCEPTION_WRAPPER(_publishStore.replaySingle(
7474  replayer, sequence));
7475  }
7476  else // Call the handler with what little we have
7477  {
// NOTE(review): function-local static, so one Message object is shared by every call
// to this function -- confirm this is never reached from two threads at once.
7478  static Message emptyMessage;
7479  emptyMessage.setSequence(message.getSequence());
7480  AMPS_CALL_EXCEPTION_WRAPPER(
7481  _failedWriteHandler->failedWrite(emptyMessage,
7482  data, len));
7483  }
7484  ++deliveries;
7485  }
7486  }
7487  if (_publishStore.isValid())
7488  {
7489  // Ack for publisher will have sequence while
7490  // ack for bookmark subscribe won't
7491  amps_uint64_t seq = amps_message_get_field_uint64(messageHandle,
7492  AMPS_Sequence);
7493  if (seq > 0)
7494  {
7495  ++deliveries;
7496  AMPS_CALL_EXCEPTION_WRAPPER(_publishStore.discardUpTo(seq));
7497  }
7498  }
7499 
// Bookmark handling runs only when neither step above counted a delivery.
7500  if (!deliveries && _bookmarkStore.isValid())
7501  {
// data/len are reused here for the subscription id.
7502  amps_message_get_field_value(messageHandle, AMPS_SubscriptionId,
7503  &data, &len);
7504  if (len > 0)
7505  {
7506  Message::Field subId(data, len);
7507  const char* bookmarkData = NULL;
7508  size_t bookmarkLen = 0;
7509  amps_message_get_field_value(messageHandle,
7510  AMPS_Bookmark,
7511  &bookmarkData,
7512  &bookmarkLen);
7513  // Everything is there and not unsubscribed AC-912
7514  if (bookmarkLen > 0 && _routes.hasRoute(subId))
7515  {
7516  ++deliveries;
7517  _bookmarkStore.persisted(subId, Message::Field(bookmarkData, bookmarkLen));
7518  }
7519  }
7520  }
7521  }
7522  catch (std::exception& ex)
7523  {
7524  AMPS_UNHANDLED_EXCEPTION(ex);
7525  }
7526  return deliveries;
7527  }
7528 
/// Matches an ack message to a pending entry in _ackMap by command id and, when one
/// is found, fills in the entry's AckResponse and wakes waiters on _lock.
///
/// \param message The ack message to process.
/// \return 1 if a pending entry for the message's command id was found (and removed
/// from _ackMap), otherwise 0.
7529  inline unsigned
7530  ClientImpl::processedAck(Message& message)
7531  {
7532  unsigned deliveries = 0;
7533  AckResponse ack;
7534  const char* data = NULL;
7535  size_t len = 0;
7536  amps_handle messageHandle = message.getMessage();
7537  amps_message_get_field_value(messageHandle, AMPS_CommandId, &data, &len);
// _lock is held for the rest of the function; _ackMapLock is taken inside it, only
// for the map lookup/erase below.
7538  Lock<Mutex> l(_lock);
7539  if (data && len)
7540  {
7541  Lock<Mutex> guard(_ackMapLock);
7542  AckMap::iterator i = _ackMap.find(std::string(data, len));
7543  if (i != _ackMap.end())
7544  {
7545  ++deliveries;
// Keep a copy of the mapped AckResponse before the map entry is erased.
7546  ack = i->second;
7547  _ackMap.erase(i);
7548  }
7549  }
7550  if (deliveries)
7551  {
// Copy each field of interest from the message into the response; data/len are
// reused for every field.
7552  amps_message_get_field_value(messageHandle, AMPS_Status, &data, &len);
7553  ack.setStatus(data, len);
7554  amps_message_get_field_value(messageHandle, AMPS_Reason, &data, &len);
7555  ack.setReason(data, len);
7556  amps_message_get_field_value(messageHandle, AMPS_UserId, &data, &len);
7557  ack.setUsername(data, len);
7558  amps_message_get_field_value(messageHandle, AMPS_Password, &data, &len);
7559  ack.setPassword(data, len);
7560  amps_message_get_field_value(messageHandle, AMPS_Version, &data, &len);
7561  ack.setServerVersion(data, len);
7562  amps_message_get_field_value(messageHandle, AMPS_Options, &data, &len);
7563  ack.setOptions(data, len);
7564  // This sets bookmark, nameHashValue, and sequenceNo
7565  ack.setBookmark(message.getBookmark());
7566  ack.setResponded();
// Wake every thread waiting on _lock now that the response is marked responded.
7567  _lock.signalAll();
7568  }
7569  return deliveries;
7570  }
7571 
7572  inline void
7573  ClientImpl::checkAndSendHeartbeat(bool force)
7574  {
7575  if (force || _heartbeatTimer.check())
7576  {
7577  _heartbeatTimer.start();
7578  try
7579  {
7580  sendWithoutRetry(_beatMessage);
7581  }
7582  catch (const AMPSException&)
7583  {
7584  ;
7585  }
7586  }
7587  }
7588 
7589  inline ConnectionInfo ClientImpl::getConnectionInfo() const
7590  {
7591  ConnectionInfo info;
7592  std::ostringstream writer;
7593 
7594  info["client.uri"] = _lastUri;
7595  info["client.name"] = _name;
7596  info["client.username"] = _username;
7597  if (_publishStore.isValid())
7598  {
7599  writer << _publishStore.unpersistedCount();
7600  info["publishStore.unpersistedCount"] = writer.str();
7601  writer.clear();
7602  writer.str("");
7603  }
7604 
7605  return info;
7606  }
7607 
7608  inline amps_result
7609  ClientImpl::ClientImplMessageHandler(amps_handle messageHandle_, void* userData_)
7610  {
7611  const unsigned SOWMask = Message::Command::SOW | Message::Command::GroupBegin | Message::Command::GroupEnd;
7612  const unsigned PublishMask = Message::Command::OOF | Message::Command::Publish | Message::Command::DeltaPublish;
7613  ClientImpl* me = (ClientImpl*) userData_;
7614  AMPS_CALL_EXCEPTION_WRAPPER_2(me, me->processDeferredExecutions());
7615  if (!messageHandle_)
7616  {
7617  if (me->_queueAckTimeout)
7618  {
7619  me->checkQueueAcks();
7620  }
7621  me->checkAndSendHeartbeat();
7622  return AMPS_E_OK;
7623  }
7624 
7625  me->_readMessage.replace(messageHandle_);
7626  Message& message = me->_readMessage;
7627  Message::Command::Type commandType = message.getCommandEnum();
7628  if (commandType & SOWMask)
7629  {
7630 #if 0 // Not currently implemented, to avoid an extra branch in delivery
7631  // A small cheat here to get the right handler, using knowledge of the
7632  // Command values of SOW (8), GroupBegin (8192), and GroupEnd (16384)
7633  // and their GlobalCommandTypeHandlers values 1, 2, 3.
7634  AMPS_CALL_EXCEPTION_WRAPPER_2(me,
7635  me->_globalCommandTypeHandlers[1 + (commandType / 8192)].invoke(message));
7636 #endif
7637  AMPS_CALL_EXCEPTION_WRAPPER_2(me, me->_routes.deliverData(message,
7638  message.getQueryID()));
7639  }
7640  else if (commandType & PublishMask)
7641  {
7642 #if 0 // Not currently implemented, to avoid an extra branch in delivery
7643  AMPS_CALL_EXCEPTION_WRAPPER_2(me,
7644  me->_globalCommandTypeHandlers[(commandType == Message::Command::Publish ?
7645  GlobalCommandTypeHandlers::Publish :
7646  GlobalCommandTypeHandlers::OOF)].invoke(message));
7647 #endif
7648  const char* subIds = NULL;
7649  size_t subIdsLen = 0;
7650  // Publish command, send to subscriptions
7651  amps_message_get_field_value(messageHandle_, AMPS_SubscriptionIds,
7652  &subIds, &subIdsLen);
7653  size_t subIdCount = me->_routes.parseRoutes(AMPS::Field(subIds, subIdsLen), me->_routeCache);
7654  for (size_t i = 0; i < subIdCount; ++i)
7655  {
7656  MessageRouter::RouteCache::value_type& lookupResult = me->_routeCache[i];
7657  MessageHandler& handler = lookupResult.handler;
7658  if (handler.isValid())
7659  {
7660  amps_message_set_field_value(messageHandle_,
7661  AMPS_SubscriptionId,
7662  subIds + lookupResult.idOffset,
7663  lookupResult.idLength);
7664  Message::Field bookmark = message.getBookmark();
7665  bool isMessageQueue = message.getLeasePeriod().len() != 0;
7666  bool isAutoAck = me->_isAutoAckEnabled;
7667 
7668  if (!isMessageQueue && !bookmark.empty() &&
7669  me->_bookmarkStore.isValid())
7670  {
7671  if (me->_bookmarkStore.isDiscarded(me->_readMessage))
7672  {
7673  //Call duplicate message handler in handlers map
7674  if (me->_globalCommandTypeHandlers[GlobalCommandTypeHandlers::DuplicateMessage].isValid())
7675  {
7676  AMPS_CALL_EXCEPTION_WRAPPER_2(me, me->_globalCommandTypeHandlers[GlobalCommandTypeHandlers::DuplicateMessage].invoke(message));
7677  }
7678  }
7679  else
7680  {
7681  me->_bookmarkStore.log(me->_readMessage);
7682  AMPS_CALL_EXCEPTION_WRAPPER_2(me,
7683  handler.invoke(message));
7684  }
7685  }
7686  else
7687  {
7688  if (isMessageQueue && isAutoAck)
7689  {
7690  try
7691  {
7692  AMPS_CALL_EXCEPTION_WRAPPER_STREAM_FULL_2(me, handler.invoke(message));
7693  if (!message.getIgnoreAutoAck())
7694  {
7695  AMPS_CALL_EXCEPTION_WRAPPER_2(me,
7696  me->_ack(message.getTopic(), message.getBookmark()));
7697  }
7698  }
7699  catch (std::exception& ex)
7700  {
7701  if (!message.getIgnoreAutoAck())
7702  {
7703  AMPS_CALL_EXCEPTION_WRAPPER_2(me,
7704  me->_ack(message.getTopic(), message.getBookmark(), "cancel"));
7705  }
7706  AMPS_UNHANDLED_EXCEPTION_2(me, ex);
7707  }
7708  }
7709  else
7710  {
7711  AMPS_CALL_EXCEPTION_WRAPPER_2(me,
7712  handler.invoke(message));
7713  }
7714  }
7715  }
7716  else
7717  {
7718  me->lastChance(message);
7719  }
7720  } // for (subidsEnd)
7721  }
7722  else if (commandType == Message::Command::Ack)
7723  {
7724  AMPS_CALL_EXCEPTION_WRAPPER_2(me,
7725  me->_globalCommandTypeHandlers[GlobalCommandTypeHandlers::Ack].invoke(message));
7726  unsigned ackType = message.getAckTypeEnum();
7727  unsigned deliveries = 0U;
7728  switch (ackType)
7729  {
7730  case Message::AckType::Persisted:
7731  deliveries += me->persistedAck(message);
7732  break;
7733  case Message::AckType::Processed: // processed
7734  deliveries += me->processedAck(message);
7735  break;
7736  }
7737  AMPS_CALL_EXCEPTION_WRAPPER_2(me, deliveries += me->_routes.deliverAck(message, ackType));
7738  if (deliveries == 0)
7739  {
7740  me->lastChance(message);
7741  }
7742  }
7743  else if (commandType == Message::Command::Heartbeat)
7744  {
7745  AMPS_CALL_EXCEPTION_WRAPPER_2(me,
7746  me->_globalCommandTypeHandlers[GlobalCommandTypeHandlers::Heartbeat].invoke(message));
7747  if (me->_heartbeatTimer.getTimeout() != 0.0) // -V550
7748  {
7749  me->checkAndSendHeartbeat(true);
7750  }
7751  else
7752  {
7753  me->lastChance(message);
7754  }
7755  return AMPS_E_OK;
7756  }
7757  else if (!message.getCommandId().empty())
7758  {
7759  unsigned deliveries = 0U;
7760  try
7761  {
7762  while (me->_connected) // Keep sending heartbeats when stream is full
7763  {
7764  try
7765  {
7766  deliveries = me->_routes.deliverData(message, message.getCommandId());
7767  break;
7768  }
7769 #ifdef _WIN32
7770  catch (MessageStreamFullException&)
7771 #else
7772  catch (MessageStreamFullException& ex_)
7773 #endif
7774  {
7775  try
7776  {
7777  me->checkAndSendHeartbeat(false);
7778  }
7779 #ifdef _WIN32
7780  catch (std::exception&)
7781 #else
7782  catch (std::exception& ex_)
7783 #endif
7784  {
7785  ;
7786  }
7787  }
7788  }
7789  }
7790  catch (std::exception& ex_)
7791  {
7792  try
7793  {
7794  me->_exceptionListener->exceptionThrown(ex_);
7795  }
7796  catch (...)
7797  {
7798  ;
7799  }
7800  }
7801  if (deliveries == 0)
7802  {
7803  me->lastChance(message);
7804  }
7805  }
7806  me->checkAndSendHeartbeat();
7807  return AMPS_E_OK;
7808  }
7809 
7810  inline void
7811  ClientImpl::ClientImplPreDisconnectHandler(amps_handle /*client*/, unsigned failedConnectionVersion, void* userData)
7812  {
7813  ClientImpl* me = (ClientImpl*) userData;
7814  //Client wrapper(me);
7815  // Go ahead and signal any waiters if they are around...
7816  me->clearAcks(failedConnectionVersion);
7817  }
7818 
7819  inline amps_result
7820  ClientImpl::ClientImplDisconnectHandler(amps_handle /*client*/, void* userData)
7821  {
7822  ClientImpl* me = (ClientImpl*) userData;
7823  Lock<Mutex> l(me->_lock);
7824  Client wrapper(me, false);
7825  if (me->_connected)
7826  {
7827  me->broadcastConnectionStateChanged(ConnectionStateListener::Disconnected);
7828  }
7829  while (true)
7830  {
7831  AtomicFlagFlip subFlip(&me->_badTimeToHASubscribe);
7832  try
7833  {
7834  me->_connected = false;
7835  {
7836  // Have to release the lock here or receive thread can't
7837  // invoke the message handler.
7838  Unlock<Mutex> unlock(me->_lock);
7839  me->_disconnectHandler.invoke(wrapper);
7840  }
7841  }
7842  catch (const std::exception& ex)
7843  {
7844  AMPS_UNHANDLED_EXCEPTION_2(me, ex);
7845  }
7846  me->_lock.signalAll();
7847 
7848  if (!me->_connected)
7849  {
7850  me->broadcastConnectionStateChanged(ConnectionStateListener::Shutdown);
7851  AMPS_UNHANDLED_EXCEPTION_2(me, DisconnectedException("Reconnect failed."));
7852  return AMPS_E_DISCONNECTED;
7853  }
7854  try
7855  {
7856  // Resubscribe
7857  if (me->_subscriptionManager)
7858  {
7859  {
7860  // Have to release the lock here or receive thread can't
7861  // invoke the message handler.
7862  Unlock<Mutex> unlock(me->_lock);
7863  me->_subscriptionManager->resubscribe(wrapper);
7864  }
7865  me->broadcastConnectionStateChanged(ConnectionStateListener::Resubscribed);
7866  }
7867  return AMPS_E_OK;
7868  }
7869  catch (const AMPSException& subEx)
7870  {
7871  AMPS_UNHANDLED_EXCEPTION_2(me, subEx);
7872  }
7873  catch (const std::exception& subEx)
7874  {
7875  AMPS_UNHANDLED_EXCEPTION_2(me, subEx);
7876  return AMPS_E_RETRY;
7877  }
7878  catch (...)
7879  {
7880  return AMPS_E_RETRY;
7881  }
7882  }
7883  return AMPS_E_RETRY;
7884  }
7885 
7886  class FIX
7887  {
7888  const char* _data;
7889  size_t _len;
7890  char _fieldSep;
7891  public:
7892  class iterator
7893  {
7894  const char* _data;
7895  size_t _len;
7896  size_t _pos;
7897  char _fieldSep;
7898  iterator(const char* data_, size_t len_, size_t pos_, char fieldSep_)
7899  : _data(data_), _len(len_), _pos(pos_), _fieldSep(fieldSep_)
7900  {
7901  while (_pos != _len && _data[_pos] == _fieldSep)
7902  {
7903  ++_pos;
7904  }
7905  }
7906  public:
7907  typedef void* difference_type;
7908  typedef std::forward_iterator_tag iterator_category;
7909  typedef std::pair<Message::Field, Message::Field> value_type;
7910  typedef value_type* pointer;
7911  typedef value_type& reference;
7912  bool operator==(const iterator& rhs) const
7913  {
7914  return _pos == rhs._pos;
7915  }
7916  bool operator!=(const iterator& rhs) const
7917  {
7918  return _pos != rhs._pos;
7919  }
7920  iterator& operator++()
7921  {
7922  // Skip through the data
7923  while (_pos != _len && _data[_pos] != _fieldSep)
7924  {
7925  ++_pos;
7926  }
7927  // Skip through any field separators
7928  while (_pos != _len && _data[_pos] == _fieldSep)
7929  {
7930  ++_pos;
7931  }
7932  return *this;
7933  }
7934 
7935  value_type operator*() const
7936  {
7937  value_type result;
7938  size_t i = _pos, keyLength = 0, valueStart = 0, valueLength = 0;
7939  for (; i < _len && _data[i] != '='; ++i)
7940  {
7941  ++keyLength;
7942  }
7943 
7944  result.first.assign(_data + _pos, keyLength);
7945 
7946  if (i < _len && _data[i] == '=')
7947  {
7948  ++i;
7949  valueStart = i;
7950  for (; i < _len && _data[i] != _fieldSep; ++i)
7951  {
7952  valueLength++;
7953  }
7954  }
7955  result.second.assign(_data + valueStart, valueLength);
7956  return result;
7957  }
7958 
7959  friend class FIX;
7960  };
7961  class reverse_iterator
7962  {
7963  const char* _data;
7964  size_t _len;
7965  const char* _pos;
7966  char _fieldSep;
7967  public:
7968  typedef std::pair<Message::Field, Message::Field> value_type;
7969  reverse_iterator(const char* data, size_t len, const char* pos, char fieldsep)
7970  : _data(data), _len(len), _pos(pos), _fieldSep(fieldsep)
7971  {
7972  if (_pos)
7973  {
7974  // skip past meaningless trailing fieldseps
7975  while (_pos >= _data && *_pos == _fieldSep)
7976  {
7977  --_pos;
7978  }
7979  while (_pos > _data && *_pos != _fieldSep)
7980  {
7981  --_pos;
7982  }
7983  // if we stopped before the 0th character, it's because
7984  // it's a field sep. advance one to point to the first character
7985  // of a key.
7986  if (_pos > _data || (_pos == _data && *_pos == _fieldSep))
7987  {
7988  ++_pos;
7989  }
7990  if (_pos < _data)
7991  {
7992  _pos = 0;
7993  }
7994  }
7995  }
7996  bool operator==(const reverse_iterator& rhs) const
7997  {
7998  return _pos == rhs._pos;
7999  }
8000  bool operator!=(const reverse_iterator& rhs) const
8001  {
8002  return _pos != rhs._pos;
8003  }
8004  reverse_iterator& operator++()
8005  {
8006  if (_pos == _data)
8007  {
8008  _pos = 0;
8009  }
8010  else
8011  {
8012  // back up 1 to a field separator
8013  --_pos;
8014  // keep backing up through field separators
8015  while (_pos >= _data && *_pos == _fieldSep)
8016  {
8017  --_pos;
8018  }
8019  // now back up to the beginning of this field
8020  while (_pos > _data && *_pos != _fieldSep)
8021  {
8022  --_pos;
8023  }
8024  if (_pos > _data || (_pos == _data && *_pos == _fieldSep))
8025  {
8026  ++_pos;
8027  }
8028  if (_pos < _data)
8029  {
8030  _pos = 0;
8031  }
8032  }
8033  return *this;
8034  }
8035  value_type operator*() const
8036  {
8037  value_type result;
8038  size_t keyLength = 0, valueStart = 0, valueLength = 0;
8039  size_t i = (size_t)(_pos - _data);
8040  for (; i < _len && _data[i] != '='; ++i)
8041  {
8042  ++keyLength;
8043  }
8044  result.first.assign(_pos, keyLength);
8045  if (i < _len && _data[i] == '=')
8046  {
8047  ++i;
8048  valueStart = i;
8049  for (; i < _len && _data[i] != _fieldSep; ++i)
8050  {
8051  valueLength++;
8052  }
8053  }
8054  result.second.assign(_data + valueStart, valueLength);
8055  return result;
8056  }
8057  };
8058  FIX(const Message::Field& data, char fieldSeparator = 1)
8059  : _data(data.data()), _len(data.len()),
8060  _fieldSep(fieldSeparator)
8061  {
8062  }
8063 
8064  FIX(const char* data, size_t len, char fieldSeparator = 1)
8065  : _data(data), _len(len), _fieldSep(fieldSeparator)
8066  {
8067  }
8068 
8069  iterator begin() const
8070  {
8071  return iterator(_data, _len, 0, _fieldSep);
8072  }
8073  iterator end() const
8074  {
8075  return iterator(_data, _len, _len, _fieldSep);
8076  }
8077 
8078 
8079  reverse_iterator rbegin() const
8080  {
8081  return reverse_iterator(_data, _len, _data + (_len - 1), _fieldSep);
8082  }
8083 
8084  reverse_iterator rend() const
8085  {
8086  return reverse_iterator(_data, _len, 0, _fieldSep);
8087  }
8088  };
8089 
8090 
8103 
// Accumulates `tag=value` pairs into a single string, writing the field
// separator after every pair. T is the tag type and must be streamable to a
// std::ostream.
template <class T>
class _FIXBuilder
{
  std::stringstream _data;
  char _fs;
public:
  // fieldSep_: the character written after each field (default 0x01).
  _FIXBuilder(char fieldSep_ = (char)1) : _fs(fieldSep_) {;}

  // Appends `tag=` followed by `length` bytes of `value` starting at
  // `offset`, then the field separator.
  void append(const T& tag, const char* value, size_t offset, size_t length)
  {
    _data << tag << '=';
    _data.write(value + offset, (std::streamsize)length);
    _data << _fs;
  }
  // Appends `tag=value` followed by the field separator.
  void append(const T& tag, const std::string& value)
  {
    _data << tag << '=' << value << _fs;
  }

  // Returns a copy of everything appended since construction or reset().
  std::string getString() const
  {
    return _data.str();
  }
  operator std::string() const
  {
    return _data.str();
  }

  // Discards the accumulated text so the builder can be reused.
  void reset()
  {
    _data.str(std::string());
    // Also drop any error flags, so that a stream that failed earlier
    // accepts appends again.
    _data.clear();
  }
};
8157 
8161 
8163 
8167 
8169 
8170 
8178 
8180  {
8181  char _fs;
8182  public:
8187  FIXShredder(char fieldSep_ = (char)1) : _fs(fieldSep_) {;}
8188 
8191  typedef std::map<Message::Field, Message::Field> map_type;
8192 
8198  map_type toMap(const Message::Field& data)
8199  {
8200  FIX fix(data, _fs);
8201  map_type retval;
8202  for (FIX::iterator a = fix.begin(); a != fix.end(); ++a)
8203  {
8204  retval.insert(*a);
8205  }
8206 
8207  return retval;
8208  }
8209  };
8210 
8211 #define AMPS_MESSAGE_STREAM_CACHE_MAX 128
8212  class MessageStreamImpl : public AMPS::RefBody, AMPS::ConnectionStateListener
8213  {
8214  Mutex _lock;
8215  std::deque<Message> _q;
8216  std::deque<Message> _cache;
8217  std::string _commandId;
8218  std::string _subId;
8219  std::string _queryId;
8220  Client _client;
8221  unsigned _timeout;
8222  unsigned _maxDepth;
8223  unsigned _requestedAcks;
8224  size_t _cacheMax;
8225  Message::Field _previousTopic;
8226  Message::Field _previousBookmark;
8227  typedef enum : unsigned int { Unset = 0x0, Running = 0x10, Subscribe = 0x11, SOWOnly = 0x12, AcksOnly = 0x13, Conflate = 0x14, Closed = 0x1, Disconnected = 0x2 } State;
8228 #if __cplusplus >= 201100L || _MSC_VER >= 1900
8229  std::atomic<State> _state;
8230 #else
8231  volatile State _state;
8232 #endif
8233  typedef std::map<std::string, Message*> SOWKeyMap;
8234  SOWKeyMap _sowKeyMap;
8235  public:
8236  MessageStreamImpl(const Client& client_)
8237  : _client(client_),
8238  _timeout(0),
8239  _maxDepth((unsigned)~0),
8240  _requestedAcks(0),
8241  _cacheMax(AMPS_MESSAGE_STREAM_CACHE_MAX),
8242  _state(Unset)
8243  {
8244  if (_client.isValid())
8245  {
8246  _client.addConnectionStateListener(this);
8247  }
8248  }
8249 
8250  MessageStreamImpl(ClientImpl* client_)
8251  : _client(client_),
8252  _timeout(0),
8253  _maxDepth((unsigned)~0),
8254  _requestedAcks(0),
8255  _state(Unset)
8256  {
8257  if (_client.isValid())
8258  {
8259  _client.addConnectionStateListener(this);
8260  }
8261  }
8262 
8263  ~MessageStreamImpl()
8264  {
8265  }
8266 
8267  virtual void destroy()
8268  {
8269  try
8270  {
8271  close();
8272  }
8273  catch (std::exception& e)
8274  {
8275  try
8276  {
8277  if (_client.isValid())
8278  {
8279  _client.getExceptionListener().exceptionThrown(e);
8280  }
8281  }
8282  catch (...) {/*Ignore exception listener exceptions*/} // -V565
8283  }
8284  if (_client.isValid())
8285  {
8286  _client.removeConnectionStateListener(this);
8287  Client c = _client;
8288  _client = Client((ClientImpl*)NULL);
8289  c.deferredExecution(MessageStreamImpl::destroyer, this);
8290  }
8291  else
8292  {
8293  delete this;
8294  }
8295  }
8296 
8297  static void destroyer(void* vpMessageStreamImpl_)
8298  {
8299  delete ((MessageStreamImpl*)vpMessageStreamImpl_);
8300  }
8301 
8302  void setSubscription(const std::string& subId_,
8303  const std::string& commandId_ = "",
8304  const std::string& queryId_ = "")
8305  {
8306  Lock<Mutex> lock(_lock);
8307  _subId = subId_;
8308  if (!commandId_.empty() && commandId_ != subId_)
8309  {
8310  _commandId = commandId_;
8311  }
8312  if (!queryId_.empty() && queryId_ != subId_ && queryId_ != commandId_)
8313  {
8314  _queryId = queryId_;
8315  }
8316  // It's possible to disconnect between creation/registration and here.
8317  if (Disconnected == _state)
8318  {
8319  return;
8320  }
8321  assert(Unset == _state);
8322  _state = Subscribe;
8323  }
8324 
8325  void setSOWOnly(const std::string& commandId_,
8326  const std::string& queryId_ = "")
8327  {
8328  Lock<Mutex> lock(_lock);
8329  _commandId = commandId_;
8330  if (!queryId_.empty() && queryId_ != commandId_)
8331  {
8332  _queryId = queryId_;
8333  }
8334  // It's possible to disconnect between creation/registration and here.
8335  if (Disconnected == _state)
8336  {
8337  return;
8338  }
8339  assert(Unset == _state);
8340  _state = SOWOnly;
8341  }
8342 
8343  void setStatsOnly(const std::string& commandId_,
8344  const std::string& queryId_ = "")
8345  {
8346  Lock<Mutex> lock(_lock);
8347  _commandId = commandId_;
8348  if (!queryId_.empty() && queryId_ != commandId_)
8349  {
8350  _queryId = queryId_;
8351  }
8352  // It's possible to disconnect between creation/registration and here.
8353  if (Disconnected == _state)
8354  {
8355  return;
8356  }
8357  assert(Unset == _state);
8358  _state = AcksOnly;
8359  _requestedAcks = Message::AckType::Stats;
8360  }
8361 
8362  void setAcksOnly(const std::string& commandId_, unsigned acks_)
8363  {
8364  Lock<Mutex> lock(_lock);
8365  _commandId = commandId_;
8366  // It's possible to disconnect between creation/registration and here.
8367  if (Disconnected == _state)
8368  {
8369  return;
8370  }
8371  assert(Unset == _state);
8372  _state = AcksOnly;
8373  _requestedAcks = acks_;
8374  }
8375 
8376  void connectionStateChanged(ConnectionStateListener::State state_)
8377  {
8378  Lock<Mutex> lock(_lock);
8379  if (state_ == AMPS::ConnectionStateListener::Disconnected)
8380  {
8381  _state = Disconnected;
8382  close();
8383  }
8384  _lock.signalAll();
8385  }
8386 
8387  void timeout(unsigned timeout_)
8388  {
8389  _timeout = timeout_;
8390  }
8391  void conflate(void)
8392  {
8393  if (_state == Subscribe)
8394  {
8395  _state = Conflate;
8396  }
8397  }
8398  void maxDepth(unsigned maxDepth_)
8399  {
8400  if (maxDepth_)
8401  {
8402  _maxDepth = maxDepth_;
8403  }
8404  else
8405  {
8406  _maxDepth = (unsigned)~0;
8407  }
8408  }
8409  unsigned getMaxDepth(void) const
8410  {
8411  return _maxDepth;
8412  }
8413  unsigned getDepth(void) const
8414  {
8415  return (unsigned)(_q.size());
8416  }
8417 
8418  bool next(Message& current_)
8419  {
8420  Lock<Mutex> lock(_lock);
8421  if (!_previousTopic.empty() && !_previousBookmark.empty())
8422  {
8423  try
8424  {
8425  if (_client.isValid())
8426  {
8427  _client.ackDeferredAutoAck(_previousTopic, _previousBookmark);
8428  }
8429  }
8430 #ifdef _WIN32
8431  catch (AMPSException&)
8432 #else
8433  catch (AMPSException& e)
8434 #endif
8435  {
8436  current_.invalidate();
8437  _previousTopic.clear();
8438  _previousBookmark.clear();
8439  return false;
8440  }
8441  _previousTopic.clear();
8442  _previousBookmark.clear();
8443  }
8444  // Don't wait to wait more than 1s at a time
8445  long minWaitTime = (_timeout && _timeout < 1000) ? _timeout : 1000;
8446  Timer timer((double)_timeout);
8447  timer.start();
8448  while (_q.empty() && _state & Running)
8449  {
8450  // Using timeout so python can interrupt
8451  _lock.wait(minWaitTime);
8452  {
8453  Unlock<Mutex> unlck(_lock);
8454  amps_invoke_waiting_function();
8455  }
8456  if (_timeout)
8457  {
8458  // In case we woke up early, see how much longer to wait
8459  if (timer.checkAndGetRemaining(&minWaitTime))
8460  {
8461  // No time left
8462  break;
8463  }
8464  // Adjust next wait time
8465  minWaitTime = (minWaitTime < 1000) ? minWaitTime : 1000;
8466  }
8467  }
8468  if (current_.isValid() && _cache.size() < _cacheMax)
8469  {
8470  current_.reset();
8471  _cache.push_back(current_);
8472  }
8473  if (!_q.empty())
8474  {
8475  current_ = _q.front();
8476  if (_q.size() == _maxDepth)
8477  {
8478  _lock.signalAll();
8479  }
8480  _q.pop_front();
8481  if (_state == Conflate)
8482  {
8483  std::string sowKey = current_.getSowKey();
8484  if (sowKey.length())
8485  {
8486  _sowKeyMap.erase(sowKey);
8487  }
8488  }
8489  else if (_state == AcksOnly)
8490  {
8491  _requestedAcks &= ~(current_.getAckTypeEnum());
8492  }
8493  if ((_state == AcksOnly && _requestedAcks == 0) ||
8494  (_state == SOWOnly && current_.getCommand() == "group_end"))
8495  {
8496  _state = Closed;
8497  }
8498  else if (current_.isValid()
8499  && current_.getCommandEnum() == Message::Command::Publish
8500  && _client.isValid() && _client.getAutoAck()
8501  && !current_.getLeasePeriod().empty()
8502  && !current_.getBookmark().empty())
8503  {
8504  _previousTopic = current_.getTopic().deepCopy();
8505  _previousBookmark = current_.getBookmark().deepCopy();
8506  }
8507  return true;
8508  }
8509  if (_state == Disconnected)
8510  {
8511  throw DisconnectedException("Connection closed.");
8512  }
8513  current_.invalidate();
8514  if (_state == Closed)
8515  {
8516  return false;
8517  }
8518  return _timeout != 0;
8519  }
8520  void close(void)
8521  {
8522  if (_client.isValid())
8523  {
8524  if (_state == SOWOnly || _state == Subscribe) //not delete
8525  {
8526  if (!_commandId.empty())
8527  {
8528  _client.unsubscribe(_commandId);
8529  }
8530  if (!_subId.empty())
8531  {
8532  _client.unsubscribe(_subId);
8533  }
8534  if (!_queryId.empty())
8535  {
8536  _client.unsubscribe(_queryId);
8537  }
8538  }
8539  else
8540  {
8541  if (!_commandId.empty())
8542  {
8543  _client.removeMessageHandler(_commandId);
8544  }
8545  if (!_subId.empty())
8546  {
8547  _client.removeMessageHandler(_subId);
8548  }
8549  if (!_queryId.empty())
8550  {
8551  _client.removeMessageHandler(_queryId);
8552  }
8553  }
8554  }
8555  if (_state == SOWOnly || _state == Subscribe || _state == Unset)
8556  {
8557  _state = Closed;
8558  }
8559  }
8560  static void _messageHandler(const Message& message_, MessageStreamImpl* this_)
8561  {
8562  Lock<Mutex> lock(this_->_lock);
8563  if (this_->_state != Conflate)
8564  {
8565  AMPS_TESTING_SLOW_MESSAGE_STREAM
8566  if (this_->_q.size() >= this_->_maxDepth)
8567  {
8568  // We throw here so that heartbeats can be sent. The exception
8569  // will be handled internally only, and the same Message will
8570  // come back to try again. Make sure to signal.
8571  this_->_lock.signalAll();
8572  throw MessageStreamFullException("Stream is currently full.");
8573  }
8574  if (!this_->_cache.empty())
8575  {
8576  this_->_cache.front().deepCopy(message_);
8577  this_->_q.push_back(this_->_cache.front());
8578  this_->_cache.pop_front();
8579  }
8580  else
8581  {
8582 #ifdef AMPS_USE_EMPLACE
8583  this_->_q.emplace_back(message_.deepCopy());
8584 #else
8585  this_->_q.push_back(message_.deepCopy());
8586 #endif
8587  }
8588  if (message_.getCommandEnum() == Message::Command::Publish &&
8589  this_->_client.isValid() && this_->_client.getAutoAck() &&
8590  !message_.getLeasePeriod().empty() &&
8591  !message_.getBookmark().empty())
8592  {
8593  message_.setIgnoreAutoAck();
8594  }
8595  }
8596  else
8597  {
8598  std::string sowKey = message_.getSowKey();
8599  if (sowKey.length())
8600  {
8601  SOWKeyMap::iterator it = this_->_sowKeyMap.find(sowKey);
8602  if (it != this_->_sowKeyMap.end())
8603  {
8604  it->second->deepCopy(message_);
8605  }
8606  else
8607  {
8608  if (this_->_q.size() >= this_->_maxDepth)
8609  {
8610  // We throw here so that heartbeats can be sent. The
8611  // exception will be handled internally only, and the
8612  // same Message will come back to try again. Make sure
8613  // to signal.
8614  this_->_lock.signalAll();
8615  throw MessageStreamFullException("Stream is currently full.");
8616  }
8617  if (!this_->_cache.empty())
8618  {
8619  this_->_cache.front().deepCopy(message_);
8620  this_->_q.push_back(this_->_cache.front());
8621  this_->_cache.pop_front();
8622  }
8623  else
8624  {
8625 #ifdef AMPS_USE_EMPLACE
8626  this_->_q.emplace_back(message_.deepCopy());
8627 #else
8628  this_->_q.push_back(message_.deepCopy());
8629 #endif
8630  }
8631  this_->_sowKeyMap[sowKey] = &(this_->_q.back());
8632  }
8633  }
8634  else
8635  {
8636  if (this_->_q.size() >= this_->_maxDepth)
8637  {
8638  // We throw here so that heartbeats can be sent. The exception
8639  // will be handled internally only, and the same Message will
8640  // come back to try again. Make sure to signal.
8641  this_->_lock.signalAll();
8642  throw MessageStreamFullException("Stream is currently full.");
8643  }
8644  if (!this_->_cache.empty())
8645  {
8646  this_->_cache.front().deepCopy(message_);
8647  this_->_q.push_back(this_->_cache.front());
8648  this_->_cache.pop_front();
8649  }
8650  else
8651  {
8652 #ifdef AMPS_USE_EMPLACE
8653  this_->_q.emplace_back(message_.deepCopy());
8654 #else
8655  this_->_q.push_back(message_.deepCopy());
8656 #endif
8657  }
8658  if (message_.getCommandEnum() == Message::Command::Publish &&
8659  this_->_client.isValid() && this_->_client.getAutoAck() &&
8660  !message_.getLeasePeriod().empty() &&
8661  !message_.getBookmark().empty())
8662  {
8663  message_.setIgnoreAutoAck();
8664  }
8665  }
8666  }
8667  this_->_lock.signalAll();
8668  }
8669  };
8670  inline MessageStream::MessageStream(void)
8671  {
8672  }
8673  inline MessageStream::MessageStream(const Client& client_)
8674  : _body(new MessageStreamImpl(client_))
8675  {
8676  }
8677  inline void MessageStream::iterator::advance(void)
8678  {
8679  _pStream = _pStream->_body->next(_current) ? _pStream : NULL;
8680  }
8681  inline MessageStream::operator MessageHandler(void)
8682  {
8683  return MessageHandler((void(*)(const Message&, void*))MessageStreamImpl::_messageHandler, &_body.get());
8684  }
8685  inline MessageStream MessageStream::fromExistingHandler(const MessageHandler& handler_)
8686  {
8687  MessageStream result;
8688  if (handler_._func == (MessageHandler::FunctionType)MessageStreamImpl::_messageHandler)
8689  {
8690  result._body = (MessageStreamImpl*)(handler_._userData);
8691  }
8692  return result;
8693  }
8694 
8695  inline void MessageStream::setSOWOnly(const std::string& commandId_,
8696  const std::string& queryId_)
8697  {
8698  _body->setSOWOnly(commandId_, queryId_);
8699  }
8700  inline void MessageStream::setSubscription(const std::string& subId_,
8701  const std::string& commandId_,
8702  const std::string& queryId_)
8703  {
8704  _body->setSubscription(subId_, commandId_, queryId_);
8705  }
8706  inline void MessageStream::setStatsOnly(const std::string& commandId_,
8707  const std::string& queryId_)
8708  {
8709  _body->setStatsOnly(commandId_, queryId_);
8710  }
8711  inline void MessageStream::setAcksOnly(const std::string& commandId_,
8712  unsigned acks_)
8713  {
8714  _body->setAcksOnly(commandId_, acks_);
8715  }
8716  inline MessageStream MessageStream::timeout(unsigned timeout_)
8717  {
8718  _body->timeout(timeout_);
8719  return *this;
8720  }
8722  {
8723  _body->conflate();
8724  return *this;
8725  }
8726  inline MessageStream MessageStream::maxDepth(unsigned maxDepth_)
8727  {
8728  _body->maxDepth(maxDepth_);
8729  return *this;
8730  }
8731  inline unsigned MessageStream::getMaxDepth(void) const
8732  {
8733  return _body->getMaxDepth();
8734  }
8735  inline unsigned MessageStream::getDepth(void) const
8736  {
8737  return _body->getDepth();
8738  }
8739 
8740  inline MessageStream ClientImpl::getEmptyMessageStream(void)
8741  {
8742  return *(_pEmptyMessageStream.get());
8743  }
8744 
8746  {
8747  // If the command is sow and has a sub_id, OR
8748  // if the command has a replace option, return the existing
8749  // messagestream, don't create a new one.
8750  ClientImpl& body = _body.get();
8751  Message& message = command_.getMessage();
8752  Field subId = message.getSubscriptionId();
8753  unsigned ackTypes = message.getAckTypeEnum();
8754  bool useExistingHandler = !subId.empty() && ((!message.getOptions().empty() && message.getOptions().contains("replace", 7)) || message.getCommandEnum() == Message::Command::SOW);
8755  if (useExistingHandler)
8756  {
8757  // Try to find the existing message handler.
8758  if (!subId.empty())
8759  {
8760  MessageHandler existingHandler;
8761  if (body._routes.getRoute(subId, existingHandler))
8762  {
8763  // we found an existing handler. It might not be a message stream, but that's okay.
8764  body.executeAsync(command_, existingHandler, false);
8765  return MessageStream::fromExistingHandler(existingHandler);
8766  }
8767  }
8768  // fall through; we'll a new handler altogether.
8769  }
8770  // Make sure something will be returned to the stream or use the empty one
8771  // Check that: it's a command that doesn't normally return data, and there
8772  // are no acks requested for the cmd id
8773  Message::Command::Type command = message.getCommandEnum();
8774  if ((command & Message::Command::NoDataCommands)
8775  && (ackTypes == Message::AckType::Persisted
8776  || ackTypes == Message::AckType::None))
8777  {
8778  executeAsync(command_, MessageHandler());
8779  if (!body._pEmptyMessageStream)
8780  {
8781  body._pEmptyMessageStream.reset(new MessageStream((ClientImpl*)0));
8782  body._pEmptyMessageStream.get()->_body->close();
8783  }
8784  return body.getEmptyMessageStream();
8785  }
8786  MessageStream stream(*this);
8787  if (body.getDefaultMaxDepth())
8788  {
8789  stream.maxDepth(body.getDefaultMaxDepth());
8790  }
8791  MessageHandler handler = stream.operator MessageHandler();
8792  std::string commandID = body.executeAsync(command_, handler, false);
8793  if (command_.hasStatsAck())
8794  {
8795  stream.setStatsOnly(commandID, command_.getMessage().getQueryId());
8796  }
8797  else if (command_.isSow())
8798  {
8799  if (command_.getAckTypeEnum() & Message::AckType::Completed)
8800  {
8801  stream.setAcksOnly(commandID,
8802  ackTypes);
8803  }
8804  else
8805  {
8806  stream.setSOWOnly(commandID, command_.getMessage().getQueryId());
8807  }
8808  }
8809  else if (command_.isSubscribe())
8810  {
8811  stream.setSubscription(commandID,
8812  command_.getMessage().getCommandId(),
8813  command_.getMessage().getQueryId());
8814  }
8815  else
8816  {
8817  // Persisted acks for writes don't come back with command id
8818  if (command == Message::Command::Publish ||
8819  command == Message::Command::DeltaPublish ||
8820  command == Message::Command::SOWDelete)
8821  {
8822  stream.setAcksOnly(commandID,
8823  ackTypes & (unsigned)~Message::AckType::Persisted);
8824  }
8825  else
8826  {
8827  stream.setAcksOnly(commandID, ackTypes);
8828  }
8829  }
8830  return stream;
8831  }
8832 
8833 // This is here because it uses api from Client.
8834  inline void Message::ack(const char* options_) const
8835  {
8836  ClientImpl* pClient = _body.get().clientImpl();
8837  Message::Field bookmark = getBookmark();
8838  if (pClient && bookmark.len() &&
8839  !pClient->getAutoAck())
8840  //(!pClient->getAutoAck() || getIgnoreAutoAck()))
8841  {
8842  pClient->ack(getTopic(), bookmark, options_);
8843  }
8844  }
8845 }// end namespace AMPS
8846 #endif
Command & setBookmark(const std::string &bookmark_)
Set the bookmark to be used for this command.
Definition: ampsplusplus.hpp:744
Command & setFilter(const char *filter_, size_t filterLen_)
Definition: ampsplusplus.hpp:695
Class to hold string versions of failure reasons.
Definition: ampsplusplus.hpp:150
Message & setData(const std::string &v_)
Sets the data portion of self.
Definition: Message.hpp:1476
Core type and function declarations for the AMPS C client.
Client(const std::string &clientName="")
Constructs a new client with a given client name.
Definition: ampsplusplus.hpp:5080
Field getUserId() const
Retrieves the value of the UserId header of the Message as a new Field.
Definition: Message.hpp:1453
std::string sowDeleteByKeys(const MessageHandler &messageHandler_, const std::string &topic_, const std::string &keys_, long timeout_=0)
Deletes messages that match SOW keys from a topic's SOW cache.
Definition: ampsplusplus.hpp:6758
std::string stopTimer(const MessageHandler &messageHandler)
Definition: ampsplusplus.hpp:6732
static const unsigned int IdentifierLength
The length of identifiers used for unique identification of commands and subscriptions.
Definition: Message.hpp:542
std::string getAckType() const
Definition: ampsplusplus.hpp:942
AMPSDLL void amps_client_set_disconnect_handler(amps_handle client, amps_handler disconnectHandler, void *userData)
Sets the disconnect handler function to be called when a disconnect occurs.
bool removeMessageHandler(const Field &commandId_)
Removes a MessageHandler for a given CommandId from self.
Definition: ampsplusplus.hpp:5309
Message & assignTopic(const std::string &v)
Sets the value of the Topic header for this Message.
Definition: Message.hpp:1449
Provides a convenient way of building messages in FIX format, typically referenced using the typedefs...
Definition: ampsplusplus.hpp:8105
void startTimer()
Definition: ampsplusplus.hpp:6721
MessageStream sowAndSubscribe(const std::string &topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, bool oofEnabled_=false, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic and initiates a new subscription on it.
Definition: ampsplusplus.hpp:6263
Abstract base class for storing published messages for an HA publisher client.
Definition: ampsplusplus.hpp:1086
MessageStream conflate(void)
Sets self to conflation mode, where a new update for a matching sow key will replace the previous one...
Definition: ampsplusplus.hpp:8721
Field getSequence() const
Retrieves the value of the Sequence header of the Message as a new Field.
Definition: Message.hpp:1422
std::string send(const MessageHandler &messageHandler, Message &message, int timeout=0)
Sends a Message to the connected AMPS server, performing only minimal validation and bypassing client...
Definition: ampsplusplus.hpp:5337
AMPSDLL amps_result amps_client_set_name(amps_handle handle, const amps_char *clientName)
Sets the name on an amps client object.
Message & setCorrelationId(const std::string &v)
Sets the value of the CorrelationId header for this Message.
Definition: Message.hpp:1304
Message & setQueryID(const std::string &v)
Sets the value of the QueryID header for this Message.
Definition: Message.hpp:1417
Command(const std::string &command_)
Creates an object to represent the given AMPS command, such as "sow" or "subscribe".
Definition: ampsplusplus.hpp:556
Command::Type getCommandEnum() const
Decode self's "command" field and return one of the values from Command.
Definition: Message.hpp:1228
Command & setBookmark(const char *bookmark_, size_t bookmarkLen_)
Set the bookmark to be used by this command.
Definition: ampsplusplus.hpp:755
void setGlobalCommandTypeMessageHandler(const Message::Command::Type command_, const MessageHandler &handler_)
Sets a handler for all messages of a particular type: currently supported types are heartbeat message...
Definition: ampsplusplus.hpp:7012
Command & setAckType(unsigned ackType_)
Definition: ampsplusplus.hpp:920
void setServerVersion(size_t version_)
Internally used to set the server version so the store knows how to deal with persisted acks and call...
Definition: BookmarkStore.hpp:429
void setDefaultMaxDepth(unsigned maxDepth_)
Sets a default max depth on all subsequently created MessageStream objects.
Definition: ampsplusplus.hpp:7380
const amps_uint64_t getNameHashValue() const
Returns the numeric name hash of this client as generated by the server and returned when the client ...
Definition: ampsplusplus.hpp:5141
Field getSubscriptionId() const
Retrieves the value of the SubscriptionId header of the Message as a new Field.
Definition: Message.hpp:1427
AMPSDLL amps_result amps_client_connect(amps_handle handle, const amps_char *uri)
Connects to the AMPS server specified in uri.
static size_t convertVersionToNumber(const std::string &version_)
Converts a string version, such as "3.8.1.5" into the same numeric form used internally and returned ...
Definition: ampsplusplus.hpp:5197
void unsubscribe(const std::string &commandId)
Unsubscribe from a topic.
Definition: ampsplusplus.hpp:6052
Command & setOptions(const std::string &options_)
Sets the options string for this command: see Message.Options for a helper class for constructing the...
Definition: ampsplusplus.hpp:785
Message & setOrderBy(const std::string &v)
Sets the value of the OrderBy header for this Message.
Definition: Message.hpp:1415
Abstract base class for replaying a publish message.
Definition: ampsplusplus.hpp:1058
Command & setCommandId(const char *cmdId_, size_t cmdIdLen_)
Definition: ampsplusplus.hpp:669
void persisted(const Message::Field &subId_, const Message::Field &bookmark_)
Called internally to indicate messages up to and including bookmark are replicated to all replication...
Definition: BookmarkStore.hpp:405
unsigned getDefaultMaxDepth(void) const
Returns the default max depth for returned MessageStream objects.
Definition: ampsplusplus.hpp:7389
void ack(Message &message_, const char *options_=NULL)
Acknowledge a message queue message by supplying the message directly: this adds the ack to the curre...
Definition: ampsplusplus.hpp:7248
MessageStream bookmarkSubscribe(const std::string &topic_, long timeout_, const std::string &bookmark_, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Subscribe to a topic using a bookmark.
Definition: ampsplusplus.hpp:6003
AMPSDLL void amps_client_disconnect(amps_handle handle)
Disconnects from the AMPS server, if connected.
AMPSDLL AMPS_SOCKET amps_client_get_socket(amps_handle client)
Returns the socket from the underlying transport in client, or NULL if no transport is associated wit...
void discard(const Message::Field &subId_, size_t bookmarkSeqNo_)
Log a discard-bookmark entry to the persistent log based on a bookmark sequence number.
Definition: BookmarkStore.hpp:283
void setPublishStore(const Store &publishStore_)
Set the publish store to be used by the client.
Definition: ampsplusplus.hpp:5433
void setErrorOnPublishGap(bool errorOnPublishGap_)
Called to enable or disable throwing PublishStoreGapException.
Definition: ampsplusplus.hpp:1335
amps_uint64_t deltaPublish(const std::string &topic_, const std::string &data_, unsigned long expiration_)
Publish the changed fields of a message to an AMPS topic.
Definition: ampsplusplus.hpp:5698
static size_t convertVersionToNumber(const char *data_, size_t len_)
Converts a string version, such as "3.8.1.5" into the same numeric form used internally and returned ...
Definition: ampsplusplus.hpp:5212
Message & setFilter(const std::string &v)
Sets the value of the Filter header for this Message.
Definition: Message.hpp:1306
Command & setSubId(const char *subId_, size_t subIdLen_)
Definition: ampsplusplus.hpp:721
void setNoEmpties(void)
Set the option to not send empty messages on a delta subscription.
Definition: Message.hpp:841
void setDuplicateMessageHandler(const MessageHandler &duplicateMessageHandler_)
Sets a callback function that is invoked when a duplicate message is detected.
Definition: ampsplusplus.hpp:5449
Message deepCopy(void) const
Returns a deep copy of self.
Definition: Message.hpp:579
void removeConnectionStateListener(ConnectionStateListener *listener)
Attempts to remove listener from self's set of ConnectionStateListeners.
Definition: ampsplusplus.hpp:7103
void setOOF(void)
Set the option to receive out of focus (OOF) messages on a subscription, where applicable.
Definition: Message.hpp:824
int getAckTimeout(void) const
Returns the current value of the message queue ack timeout setting – that is, the amount of time aft...
Definition: ampsplusplus.hpp:7332
VersionInfo getServerVersionInfo() const
Returns the server version retrieved during logon.
Definition: ampsplusplus.hpp:5183
Message sowDeleteByKeys(const std::string &topic_, const std::string &keys_, long timeout_=0)
Deletes messages that match SOW keys from a topic's SOW cache.
Definition: ampsplusplus.hpp:6785
AMPSDLL amps_result amps_client_set_transport_filter_function(amps_handle client, amps_transport_filter_function filter, void *userData)
Sets a user-supplied callback function for filtering data before it is sent and after it is received...
Command(const char *command_, size_t commandLen_)
Creates an object to represent the given AMPS command, such as "sow" or "subscribe".
Definition: ampsplusplus.hpp:564
AMPSDLL amps_uint64_t amps_message_get_field_uint64(amps_handle message, FieldId field)
Gets the unsigned 64-bit int value of a header field in an AMPS message.
AMPSDLL amps_result amps_client_set_idle_time(amps_handle client, int idleTime)
Sets an idle-time (milliseconds).
Message encapsulates a single message sent to or received from an AMPS server, and provides methods f...
Definition: Message.hpp:531
Message & setAckTypeEnum(unsigned ackType_)
Encode self's "ack type" field from a bitmask of values from AckType.
Definition: Message.hpp:1183
amps_uint64_t getLastPersisted()
Get the last persisted message sequence in the store.
Definition: ampsplusplus.hpp:1306
std::string getString() const
Returns the current contents of this builder as a string.
Definition: ampsplusplus.hpp:8142
void setTransportFilterFunction(amps_transport_filter_function filter_, void *userData_)
Sets a filter function on the transport that is called with all raw data sent or received.
Definition: ampsplusplus.hpp:7400
amps_uint64_t publish(const char *topic_, size_t topicLength_, const char *data_, size_t dataLength_, unsigned long expiration_)
Publish a message to an AMPS topic, returning the sequence number assigned by the publish store if on...
Definition: ampsplusplus.hpp:5585
iterator begin(void)
Returns an iterator representing the beginning of the topic or subscription.
Definition: ampsplusplus.hpp:4984
Field getLeasePeriod() const
Retrieves the value of the LeasePeriod header of the Message as a new Field.
Definition: Message.hpp:1309
Command & setSowKeys(const char *sowKeys_, size_t sowKeysLen_)
Sets the SowKeys for the command.
Definition: ampsplusplus.hpp:656
static const char * NOW()
Convenience method for returning the special value to start a subscription at the end of the transact...
Definition: ampsplusplus.hpp:7031
Command & setAckType(const std::string &ackType_)
Definition: ampsplusplus.hpp:898
static const char * BOOKMARK_EPOCH()
Convenience method for returning the special value to start a subscription at the beginning of the tr...
Definition: ampsplusplus.hpp:7041
void send(const Message &message)
Sends a Message to the connected AMPS server, performing only minimal validation and bypassing client...
Definition: ampsplusplus.hpp:5268
Message & setSowKey(const std::string &v)
Sets the value of the SowKey header for this Message.
Definition: Message.hpp:1424
AMPSDLL void amps_message_set_field_value(amps_handle message, FieldId field, const amps_char *value, size_t length)
Sets the value of a header field in an AMPS message.
std::string bookmarkSubscribe(const MessageHandler &messageHandler_, const std::string &topic_, long timeout_, const std::string &bookmark_, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Subscribe to a topic using a bookmark.
Definition: ampsplusplus.hpp:5975
Field getFilter() const
Retrieves the value of the Filter header of the Message as a new Field.
Definition: Message.hpp:1306
static size_t getUnsetPosition()
Method to return the value used to represent not found or unset.
Definition: ampsplusplus.hpp:1149
void setRetryOnDisconnect(bool isRetryOnDisconnect_)
Enables or disables automatic retry of a command to AMPS after a reconnect.
Definition: ampsplusplus.hpp:7362
unsigned getMaxDepth(void) const
Gets the maximum number of messages that can be held in the underlying queue.
Definition: ampsplusplus.hpp:8731
Message & assignUserId(const std::string &v)
Sets the value of the UserId header for this Message.
Definition: Message.hpp:1453
void connect(const std::string &uri)
Connect to an AMPS server.
Definition: ampsplusplus.hpp:5243
void clear()
Deletes the data associated with this Field, should only be used on Fields that were created as deepC...
Definition: Field.hpp:247
void addMessageHandler(const Field &commandId_, const AMPS::MessageHandler &messageHandler_, unsigned requestedAcks_, Message::Command::Type commandType_)
Adds a MessageHandler to be invoked for Messages with the given CommandId as their command id...
Definition: ampsplusplus.hpp:5298
Abstract base class to manage all subscriptions placed on a client so that they can be re-established...
Definition: ampsplusplus.hpp:1450
std::string sowAndDeltaSubscribe(const MessageHandler &messageHandler_, const std::string &topic_, const std::string &filter_="", const std::string &orderBy_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic and initiates a new delta subscription on it.
Definition: ampsplusplus.hpp:6457
Success.
Definition: amps.h:221
Message & setCommandEnum(Command::Type command_)
Set self's "command" field from one of the values in Command.
Definition: Message.hpp:1288
Field getOptions() const
Retrieves the value of the Options header of the Message as a new Field.
Definition: Message.hpp:1316
std::string authenticate(const std::string &, const std::string &password_)
A simple implementation that returns an unmodified password.
Definition: ampsplusplus.hpp:1032
amps_uint64_t deltaPublish(const char *topic_, size_t topicLength_, const char *data_, size_t dataLength_)
Publish the changed fields of a message to an AMPS topic.
Definition: ampsplusplus.hpp:5675
FIXShredder(char fieldSep_=(char) 1)
Construct an instance of FIXShredder using the specified value as the delimiter between fields...
Definition: ampsplusplus.hpp:8187
std::string sow(const MessageHandler &messageHandler_, const std::string &topic_, const std::string &filter_="", const std::string &orderBy_="", const std::string &bookmark_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query a State-of-the-World topic.
Definition: ampsplusplus.hpp:6099
void * amps_handle
Opaque handle type used to refer to objects in the AMPS api.
Definition: amps.h:211
Class for constructing the options string to pass to AMPS in a Message.
Definition: Message.hpp:601
void addMessageHandler(const Field &commandId_, const AMPS::MessageHandler &messageHandler_, unsigned requestedAcks_, bool isSubscribe_)
Adds a MessageHandler to be invoked for Messages with the given CommandId as their command id...
Definition: ampsplusplus.hpp:5281
const char * data() const
Returns the (non-null-terminated) data underlying this field.
Definition: Field.hpp:260
bool isValid() const
Returns true if self is a valid stream that may be iterated.
Definition: ampsplusplus.hpp:4977
amps_result
Return values from amps_xxx functions.
Definition: amps.h:216
FailedWriteHandler * getFailedWriteHandler()
Get the handler that is invoked to report on failed writes.
Definition: ampsplusplus.hpp:5486
Field getAckType() const
Retrieves the value of the AckType header of the Message as a new Field.
Definition: Message.hpp:1130
AMPSDLL amps_handle amps_client_create(const amps_char *clientName)
Functions for creation of an AMPS client.
Field getCommand() const
Retrieves the value of the Command header of the Message as a new Field.
Definition: Message.hpp:1195
MessageStream execute(Command &command_)
Execute the provided command and return messages received in response in a MessageStream.
Definition: ampsplusplus.hpp:8745
void(* amps_transport_filter_function)(const unsigned char *, size_t, short, void *)
Prototype for a user-supplied callback function for filtering data before it is sent and after it is ...
Definition: amps.h:642
Message & setTopic(const std::string &v)
Sets the value of the Topic header for this Message.
Definition: Message.hpp:1449
Store getPublishStore()
Get the publish store used by the client.
Definition: ampsplusplus.hpp:5441
void setThreadCreatedCallback(amps_thread_created_callback callback_, void *userData_)
Sets a callback function on the transport that is called when a new thread is created to receive data...
Definition: ampsplusplus.hpp:7414
unsigned getAckTypeEnum() const
Definition: ampsplusplus.hpp:947
size_t getServerVersion() const
Returns the server version retrieved during logon.
Definition: ampsplusplus.hpp:5172
State
Constants for the state of the connection.
Definition: ampsplusplus.hpp:1494
Command & setData(const std::string &data_)
Sets the data for this command from an existing string.
Definition: ampsplusplus.hpp:825
std::string logon(int timeout_=0, Authenticator &authenticator_=DefaultAuthenticator::instance(), const char *options_=NULL)
Logon to the server, providing the client name, credentials (if available), and client information (s...
Definition: ampsplusplus.hpp:5747
void discardUpTo(amps_uint64_t index_)
Called by Client to indicate that all messages up to and including index_ can be discarded from the store.
Definition: ampsplusplus.hpp:1239
Command & setFilter(const std::string &filter_)
Definition: ampsplusplus.hpp:688
const std::string & getNameHash() const
Returns the name hash string of this client as generated by the server and returned when the client l...
Definition: ampsplusplus.hpp:5133
StoreImpl(bool errorOnPublishGap_=false)
Default constructor.
Definition: ampsplusplus.hpp:1094
MessageStream sowAndSubscribe(const char *topic_, const std::string &filter_="", const std::string &orderBy_="", const std::string &bookmark_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic and initiates a new subscription on it.
Definition: ampsplusplus.hpp:6411
Client represents a connection to an AMPS server, but does not provide failover or reconnection behav...
Definition: ampsplusplus.hpp:5063
void setGlobalCommandTypeMessageHandler(const std::string &command_, const MessageHandler &handler_)
Sets a handler for all messages of a particular type, or for messages that would be delivered to a pa...
Definition: ampsplusplus.hpp:6987
Command & setSubId(const std::string &subId_)
Definition: ampsplusplus.hpp:714
static const char * BOOKMARK_NOW()
Convenience method for returning the special value to start a subscription at the end of the transact...
Definition: ampsplusplus.hpp:7022
Message & newCommandId()
Creates and sets a new sequential value for the CommandId header for this Message.
Definition: Message.hpp:1302
void addConnectionStateListener(ConnectionStateListener *listener)
Adds a ConnectionStateListener to self's set of listeners.
Definition: ampsplusplus.hpp:7095
Command & reset(const std::string &command_)
Resets the fields of self, and sets the command to command_.
Definition: ampsplusplus.hpp:579
Command & setTimeout(unsigned timeout_)
Sets the client-side timeout for this command.
Definition: ampsplusplus.hpp:847
void clearConnectionStateListeners()
Clear all listeners from self's set of ConnectionStateListeners.
Definition: ampsplusplus.hpp:7110
Message & setSequence(const std::string &v)
Sets the value of the Sequence header for this Message.
Definition: Message.hpp:1422
amps_uint64_t publish(const char *topic_, size_t topicLength_, const char *data_, size_t dataLength_)
Publish a message to an AMPS topic, returning the sequence number assigned by the publish store if on...
Definition: ampsplusplus.hpp:5534
Command & setCommandId(const std::string &cmdId_)
Definition: ampsplusplus.hpp:662
Message & setUserId(const std::string &v)
Sets the value of the UserId header for this Message.
Definition: Message.hpp:1453
AMPSDLL amps_result amps_client_send_with_version(amps_handle client, amps_handle message, unsigned *version_out)
Sends a message to the AMPS server.
Field getTopic() const
Retrieves the value of the Topic header of the Message as a new Field.
Definition: Message.hpp:1449
AMPSDLL void amps_client_set_message_handler(amps_handle client, amps_handler messageHandler, void *userData)
Sets the message handler function for this client.
SubscriptionManager * getSubscriptionManager()
Get the subscription manager being used by the client.
Definition: ampsplusplus.hpp:5397
std::string subscribe(const MessageHandler &messageHandler_, const std::string &topic_, long timeout_=0, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Subscribe to a topic.
Definition: ampsplusplus.hpp:5810
Command(Message::Command::Type command_)
Creates an object to represent the given AMPS command, such as "sow" or "subscribe".
Definition: ampsplusplus.hpp:571
Message & setMessageType(const std::string &v)
Sets the value of the MessageType header for this Message.
Definition: Message.hpp:1312
Field getCommandId() const
Retrieves the value of the CommandId header of the Message as a new Field.
Definition: Message.hpp:1302
const ExceptionListener & getExceptionListener(void) const
Returns the exception listener set on this Client.
Definition: ampsplusplus.hpp:6898
Field getQueryId() const
Retrieves the value of the QueryID header of the Message as a new Field.
Definition: Message.hpp:1417
static Authenticator & instance()
Static function to return a static instance used when no Authenticator is supplied to a Client...
Definition: ampsplusplus.hpp:1049
Abstract base class for connection state listeners.
Definition: ampsplusplus.hpp:1490
Message & setSubscriptionId(const std::string &v)
Sets the value of the SubscriptionId header for this Message.
Definition: Message.hpp:1427
std::string sowDeleteByData(const MessageHandler &messageHandler_, const std::string &topic_, const std::string &data_, long timeout_=0)
Deletes the message whose keys match the message data provided.
Definition: ampsplusplus.hpp:6820
Command & setOptions(const char *options_, size_t optionsLen_)
Sets the options string for this command: see Message.Options for a helper class for constructing the...
Definition: ampsplusplus.hpp:793
Message & newQueryId()
Creates and sets a new sequential value for the QueryID header for this Message.
Definition: Message.hpp:1417
#define AMPS_BOOKMARK_RECENT
Start the subscription at the first undiscarded message in the bookmark store, or at the end of the b...
Definition: BookmarkStore.hpp:47
Message & assignExpiration(const std::string &v)
Sets the value of the Expiration header for this Message.
Definition: Message.hpp:1305
bool empty() const
Returns 'true' if empty, 'false' otherwise.
Definition: Field.hpp:128
bool replaySingle(StoreReplayer &replayer_, amps_uint64_t index_)
Called by Client to get a single message replayed by the store onto the StoreReplayer.
Definition: ampsplusplus.hpp:1260
void completed(const std::string &, const std::string &, const std::string &)
Called by Client once a logon completes successfully.
Definition: ampsplusplus.hpp:1044
MessageStream deltaSubscribe(const std::string &topic_, long timeout_, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Delta Subscribe to a topic.
Definition: ampsplusplus.hpp:5915
static const char * EPOCH()
Convenience method for returning the special value to start a subscription at the beginning of the tr...
Definition: ampsplusplus.hpp:7051
Interface for BookmarkStoreImpl classes.
Definition: BookmarkStore.hpp:228
Command & reset(const char *command_, size_t commandLen_)
Resets the fields of self, and sets the command to command_.
Definition: ampsplusplus.hpp:588
Command & setSequence(const char *seq_, size_t seqLen_)
Definition: ampsplusplus.hpp:806
std::string logon(const char *options_, int timeout_=0)
Logon to the server, providing the client name, credentials (if available), client information (such a...
Definition: ampsplusplus.hpp:5766
size_t len() const
Returns the length of the data underlying this field.
Definition: Field.hpp:267
virtual void completed(const std::string &userName_, const std::string &password_, const std::string &reason_)=0
Called by Client once a logon completes successfully.
Message & setClientName(const std::string &v)
Sets the value of the ClientName header for this Message.
Definition: Message.hpp:1303
Field getSowKey() const
Retrieves the value of the SowKey header of the Message as a new Field.
Definition: Message.hpp:1424
#define AMPS_BOOKMARK_EPOCH
Start the subscription at the beginning of the journal.
Definition: BookmarkStore.hpp:51
A default implementation of Authenticator that only uses an unchanged password and does not implement...
Definition: ampsplusplus.hpp:1026
void setLogonCorrelationData(const std::string &logonCorrelationData_)
Sets the logon correlation data for the client.
Definition: ampsplusplus.hpp:5152
Message & setCommand(const std::string &v)
Sets the value of the Command header for this Message.
Definition: Message.hpp:1195
_FIXBuilder(char fieldSep_=(char) 1)
Construct an instance of _FIXBuilder, using the specified separator between fields.
Definition: ampsplusplus.hpp:8115
void ack(const std::string &topic_, const std::string &bookmark_, const char *options_=NULL)
Acknowledge a message queue message by supplying a topic and bookmark string: this adds the ack to th...
Definition: ampsplusplus.hpp:7260
std::string retry(const std::string &, const std::string &)
Throws an AuthenticationException because retry is not implemented.
Definition: ampsplusplus.hpp:1039
Message & assignSubscriptionId(const std::string &v)
Sets the value of the SubscriptionId header for this Message.
Definition: Message.hpp:1427
bool(* PublishStoreResizeHandler)(Store store_, size_t size_, void *userData_)
Function type for PublishStore resize events. The store_ param is the store that is resizing.
Definition: ampsplusplus.hpp:1080
AMPSDLL void amps_message_get_field_value(amps_handle message, FieldId field, const amps_char **value_ptr, size_t *length_ptr)
Retrieves the value of a header field in an AMPS message.
MessageStream sowAndSubscribe(const char *topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, bool oofEnabled_=false, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic and initiates a new subscription on it.
Definition: ampsplusplus.hpp:6301
virtual void setResizeHandler(PublishStoreResizeHandler handler_, void *userData_)
Set a handler to be called if the Store needs to resize in order to keep storing messages.
Definition: ampsplusplus.hpp:1180
void append(const T &tag, const char *value, size_t offset, size_t length)
Write a field with the provided tag and value to the message being constructed.
Definition: ampsplusplus.hpp:8124
Message & assignCommand(const std::string &v)
Sets the value of the Command header for this Message.
Definition: Message.hpp:1195
#define AMPS_BOOKMARK_NOW
Start the subscription at the point in time when AMPS processes the subscription. ...
Definition: BookmarkStore.hpp:55
void setResizeHandler(PublishStoreResizeHandler handler_, void *userData_)
Set a handler to be called if the Store needs to resize in order to keep storing messages.
Definition: ampsplusplus.hpp:1320
virtual std::string retry(const std::string &userName_, const std::string &password_)=0
Called by Client when a logon ack is received with a status of retry.
BookmarkStore getBookmarkStore()
Get the bookmark store being used by the client.
Definition: ampsplusplus.hpp:5389
bool isValid() const
Method to return if there is an underlying implementation for the Store.
Definition: ampsplusplus.hpp:1277
void setAckBatchSize(const unsigned ackBatchSize_)
Sets the queue ack batch size setting.
Definition: ampsplusplus.hpp:7321
Field getPassword() const
Retrieves the value of the Password header of the Message as a new Field.
Definition: Message.hpp:1416
Message & setTopNRecordsReturned(const std::string &v)
Sets the value of the TopNRecordsReturned header for this Message.
Definition: Message.hpp:1451
Class to handle when a client receives a duplicate publish message, or not entitled message...
Definition: ampsplusplus.hpp:1370
Message sowDelete(const std::string &topic, const std::string &filter, long timeout=0)
Deletes one or more messages from a topic's SOW cache.
Definition: ampsplusplus.hpp:6696
virtual void setFailedResubscribeHandler(std::shared_ptr< FailedResubscribeHandler > handler_)
Set a handler to deal with failing subscriptions after a failover event.
Definition: ampsplusplus.hpp:1478
Message & setSowKeys(const std::string &v)
Sets the value of the SowKeys header for this Message.
Definition: Message.hpp:1425
void setBookmarkStore(const BookmarkStore &bookmarkStore_)
Set the bookmark store to be used by the client.
Definition: ampsplusplus.hpp:5381
This class multiplexes messages from AMPS to multiple subscribers and uses the stream of acks from AM...
Definition: MessageRouter.hpp:149
Message & setExpiration(const std::string &v)
Sets the value of the Expiration header for this Message.
Definition: Message.hpp:1305
virtual std::string authenticate(const std::string &userName_, const std::string &password_)=0
Called by Client just before the logon command is sent.
Command & setOrderBy(const std::string &orderBy_)
Definition: ampsplusplus.hpp:701
void setExceptionListener(const ExceptionListener &listener_)
Definition: ampsplusplus.hpp:6891
Command & reset(Message::Command::Type command_)
Resets the fields of self, and sets the command to command_.
Definition: ampsplusplus.hpp:596
void setSubscriptionManager(SubscriptionManager *subscriptionManager_)
Set the subscription manager to be used by the client.
Definition: ampsplusplus.hpp:5409
void setUnhandledMessageHandler(const AMPS::MessageHandler &messageHandler)
Definition: ampsplusplus.hpp:6954
void append(const T &tag, const std::string &value)
Write a field with the provided tag and value to the message being constructed.
Definition: ampsplusplus.hpp:8135
unsigned getAckBatchSize(void) const
Returns the value of the queue ack batch size setting.
Definition: ampsplusplus.hpp:7311
amps_uint64_t deltaPublish(const std::string &topic_, const std::string &data_)
Publish the changed fields of a message to an AMPS topic.
Definition: ampsplusplus.hpp:5652
MessageStream sowAndSubscribe(const std::string &topic_, const std::string &filter_="", const std::string &orderBy_="", const std::string &bookmark_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic and initiates a new subscription on it.
Definition: ampsplusplus.hpp:6388
Command & setExpiration(unsigned expiration_)
Set the expiration time for a publish command.
Definition: ampsplusplus.hpp:878
AMPSDLL void amps_client_set_predisconnect_handler(amps_handle client, amps_predisconnect_handler predisconnectHandler, void *userData)
Sets the predisconnect handler function to be called when a disconnect occurs.
Represents an iterator over messages in an AMPS topic.
Definition: ampsplusplus.hpp:4939
size_t log(Message &message_)
Log a bookmark to the persistent log.
Definition: BookmarkStore.hpp:268
MessageStream sowAndDeltaSubscribe(const char *topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, bool oofEnabled_=false, bool sendEmpties_=false, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic and initiates a new delta subscription on it.
Definition: ampsplusplus.hpp:6634
size_t unpersistedCount() const
Method to return how many messages are in the store that have not been discarded, indicating that the...
Definition: ampsplusplus.hpp:1269
Command & setData(const char *data_, size_t dataLen_)
Sets the data for this command.
Definition: ampsplusplus.hpp:833
Message & assignAckType(const std::string &v)
Sets the value of the AckType header for this Message.
Definition: Message.hpp:1130
void publishFlush(long timeout_=0, unsigned ackType_=Message::AckType::Processed)
Ensure that AMPS messages are sent and have been processed by the AMPS server.
Definition: ampsplusplus.hpp:5631
std::string executeAsync(Command &command_, MessageHandler handler_)
Execute the provided command and, once AMPS acknowledges the command, process messages in response to...
Definition: ampsplusplus.hpp:7140
Command & setSowKey(const char *sowKey_, size_t sowKeyLen_)
Sets the SowKey field of the command, typically used for a publish command to a topic in the state of...
Definition: ampsplusplus.hpp:621
MessageStream sowAndDeltaSubscribe(const std::string &topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, bool oofEnabled_=false, bool sendEmpties_=false, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic and initiate a new delta subscription on it.
Definition: ampsplusplus.hpp:6593
std::string logon(const std::string &options_, int timeout_=0)
Logon to the server, providing the client name, credentials (if available), client information (such a...
Definition: ampsplusplus.hpp:5785
void unsubscribe()
Unsubscribe from all topics.
Definition: ampsplusplus.hpp:6064
void setAutoAck(bool isAutoAckEnabled_)
Sets the queue auto-ack setting on this client.
Definition: ampsplusplus.hpp:7303
MessageStream deltaSubscribe(const char *topic_, long timeout_, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Delta Subscribe to a topic.
Definition: ampsplusplus.hpp:5933
Message & assignCorrelationId(const std::string &v)
Sets the value of the CorrelationId header for this Message.
Definition: Message.hpp:1304
amps_uint64_t store(const Message &message_)
Called by Client to store a message being published.
Definition: ampsplusplus.hpp:1228
std::string sowAndDeltaSubscribe(const MessageHandler &messageHandler_, const std::string &topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, bool oofEnabled_=false, bool sendEmpties_=false, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic and initiate a new delta subscription on it.
Definition: ampsplusplus.hpp:6557
Command & setBatchSize(unsigned batchSize_)
Sets the batch size for this command, which controls how many records are sent together in the result...
Definition: ampsplusplus.hpp:862
void setExceptionListener(const std::shared_ptr< const ExceptionListener > &pListener_)
Sets the exception listener for exceptions that are not thrown back to the user (for example...
Definition: ampsplusplus.hpp:6878
StoreImpl * get()
Used to get a pointer to the implementation.
Definition: ampsplusplus.hpp:1352
MessageHandler getDuplicateMessageHandler(void)
Returns the callback function that is invoked when a duplicate message is detected.
Definition: ampsplusplus.hpp:5464
Command & setQueryId(const char *queryId_, size_t queryIdLen_)
Definition: ampsplusplus.hpp:734
DisconnectHandler getDisconnectHandler(void) const
Returns the callback function that is invoked when a disconnect occurs.
Definition: ampsplusplus.hpp:5359
Class for parsing a FIX format message into a std::map of keys and values, where the keys and values ...
Definition: ampsplusplus.hpp:8179
Field represents the value of a single field in a Message.
Definition: Field.hpp:86
Message & setAckType(const std::string &v)
Sets the value of the AckType header for this Message.
Definition: Message.hpp:1130
virtual ConnectionInfo getConnectionInfo() const
Get the connection information for the current connection.
Definition: ampsplusplus.hpp:5368
Message & setOptions(const std::string &v)
Sets the value of the Options header for this Message.
Definition: Message.hpp:1344
void ack(Field &topic_, Field &bookmark_, const char *options_=NULL)
Acknowledge a message queue message by supplying a topic and bookmark: this adds the ack to the curre...
Definition: ampsplusplus.hpp:7236
Abstract base class where you can implement handling of exceptions that occur when a SubscriptionMana...
Definition: ampsplusplus.hpp:1428
AMPSDLL amps_result amps_client_send(amps_handle client, amps_handle message)
Sends a message to the AMPS server.
std::map< Message::Field, Message::Field > map_type
Convenience definition for the std::map specialization used for this class.
Definition: ampsplusplus.hpp:8191
Command & setTopic(const char *topic_, size_t topicLen_)
Definition: ampsplusplus.hpp:682
void setDisconnectHandler(const DisconnectHandler &disconnectHandler)
Sets the function to be called when the client is unintentionally disconnected.
Definition: ampsplusplus.hpp:5351
Handle class for StoreImpl classes that track publish messages.
Definition: ampsplusplus.hpp:1212
map_type toMap(const Message::Field &data)
Returns the key/value pairs within the message, represented as AMPS::Field objects that contain point...
Definition: ampsplusplus.hpp:8198
Exception listener for unhandled exceptions.
Definition: ampsplusplus.hpp:203
MessageStream sowAndDeltaSubscribe(const std::string &topic_, const std::string &filter_="", const std::string &orderBy_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic and initiate a new delta subscription on it.
Definition: ampsplusplus.hpp:6490
void setHeartbeat(unsigned heartbeatTime_)
Requests heartbeating with the AMPS server.
Definition: ampsplusplus.hpp:6948
MessageStream subscribe(const char *topic_, long timeout_=0, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Subscribe to a topic.
Definition: ampsplusplus.hpp:5868
An iterable object representing the results of an AMPS subscription and/or query. ...
Definition: ampsplusplus.hpp:4931
bool getErrorOnPublishGap() const
Called to check if the Store will throw PublishStoreGapException.
Definition: ampsplusplus.hpp:1344
Command & setSequence(const std::string &seq_)
Definition: ampsplusplus.hpp:799
static const char * MOST_RECENT()
Convenience method for returning the special value to start a subscription at a recovery point based ...
Definition: ampsplusplus.hpp:7073
Command & setQueryId(const std::string &queryId_)
Definition: ampsplusplus.hpp:727
AMPSDLL amps_result amps_client_set_thread_created_callback(amps_handle client, amps_thread_created_callback callback, void *userData)
Sets a user-supplied callback function to allow thread attributes to be set when a new thread is created...
Message & setCommandId(const std::string &v)
Sets the value of the CommandId header for this Message.
Definition: Message.hpp:1302
Command & setSequence(const amps_uint64_t seq_)
Definition: ampsplusplus.hpp:812
static const char * BOOKMARK_RECENT()
Convenience method for returning the special value to start a subscription at a recovery point based ...
Definition: ampsplusplus.hpp:7084
Message & setQueryId(const std::string &v)
Sets the value of the QueryID header for this Message.
Definition: Message.hpp:1417
amps_result(* amps_thread_created_callback)(AMPS_THREAD_T, void *)
Prototype for a user-supplied callback function to allow thread attributes to be set when a new threa...
Definition: amps.h:668
amps_uint64_t getLowestUnpersisted()
Get the oldest unpersisted message sequence in the store.
Definition: ampsplusplus.hpp:1298
Command & setTopic(const std::string &topic_)
Definition: ampsplusplus.hpp:675
MessageStream maxDepth(unsigned maxDepth_)
Sets the maximum number of messages that can be held in the underlying queue.
Definition: ampsplusplus.hpp:8726
Message & assignVersion(const std::string &v)
Sets the value of the Version header for this Message.
Definition: Message.hpp:1452
void disconnect()
Disconnect from an AMPS server.
Definition: ampsplusplus.hpp:5250
MessageStream sow(const std::string &topic_, const std::string &filter_="", const std::string &orderBy_="", const std::string &bookmark_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic.
Definition: ampsplusplus.hpp:6137
unsigned getDepth(void) const
Gets the current number of messages held in the underlying queue.
Definition: ampsplusplus.hpp:8735
Field getQueryID() const
Retrieves the value of the QueryID header of the Message as a new Field.
Definition: Message.hpp:1417
The operation has not succeeded, but ought to be retried.
Definition: amps.h:245
const std::string & getLogonCorrelationData() const
Returns the currently set logon correlation data for the client.
Definition: ampsplusplus.hpp:5159
void setHeartbeat(unsigned heartbeatTime_, unsigned readTimeout_)
Requests heartbeating with the AMPS server.
Definition: ampsplusplus.hpp:6924
Command & setTopN(unsigned topN_)
Definition: ampsplusplus.hpp:853
AMPSDLL amps_result amps_client_set_read_timeout(amps_handle client, int readTimeout)
Sets a read timeout (seconds), in which if no message is received, the connection is presumed dead...
const std::string & getURI() const
Returns the last URI this client is connected to.
Definition: ampsplusplus.hpp:5219
Command & addAckType(const std::string &ackType_)
Definition: ampsplusplus.hpp:884
void replay(StoreReplayer &replayer_)
Called by Client to get all stored and non-discarded messages replayed by the store onto the StoreRep...
Definition: ampsplusplus.hpp:1248
static const char * BOOKMARK_MOST_RECENT()
Convenience method for returning the special value to start a subscription at a recovery point based ...
Definition: ampsplusplus.hpp:7062
Message & setBookmark(const std::string &v)
Sets the value of the Bookmark header for this Message.
Definition: Message.hpp:1194
void deepCopy(const Field &orig_)
Makes self a deep copy of the original field.
Definition: Field.hpp:218
Command & setOrderBy(const char *orderBy_, size_t orderByLen_)
Definition: ampsplusplus.hpp:708
unsigned getAckTypeEnum() const
Decode self's "ack type" field and return the corresponding bitmask of values from AckType...
Definition: Message.hpp:1160
BookmarkStoreImpl * get()
Used to get a pointer to the implementation.
Definition: BookmarkStore.hpp:465
amps_handle getHandle()
Returns the underlying amps_handle for this client, to be used with amps_client_* functions from the ...
Definition: ampsplusplus.hpp:6865
AMPSDLL amps_result amps_client_attempt_reconnect(amps_handle client, unsigned version)
Manually invokes the user-supplied disconnect handler for this client.
AMPSDLL void amps_client_destroy(amps_handle handle)
Disconnects and destroys an AMPS client object.
void reset()
Clear all data from the builder.
Definition: ampsplusplus.hpp:8152
void setFailedWriteHandler(FailedWriteHandler *handler_)
Set the handler that is invoked to report when a publish fails, for example if the publisher is not e...
Definition: ampsplusplus.hpp:5478
MessageStream sowAndDeltaSubscribe(const char *topic_, const std::string &filter_="", const std::string &orderBy_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic and initiate a new delta subscription on it.
Definition: ampsplusplus.hpp:6512
void setAckTimeout(const int ackTimeout_)
Sets the message queue ack timeout value.
Definition: ampsplusplus.hpp:7344
Command & setSowKeys(const std::string &sowKeys_)
Sets the SowKeys for the command.
Definition: ampsplusplus.hpp:638
Message & setPassword(const std::string &v)
Sets the value of the Password header for this Message.
Definition: Message.hpp:1416
bool getAutoAck(void) const
Returns the value of the queue auto-ack setting.
Definition: ampsplusplus.hpp:7293
amps_uint64_t publish(const std::string &topic_, const std::string &data_)
Publish a message to an AMPS topic, returning the sequence number assigned by the publish store if on...
Definition: ampsplusplus.hpp:5509
Definition: ampsplusplus.hpp:102
void setName(const std::string &name)
Sets the name of this client, assuming no name was provided previously.
Definition: ampsplusplus.hpp:5118
The interface for handling authentication with the AMPS server.
Definition: ampsplusplus.hpp:995
void flush(long timeout_=0)
Method to wait for the Store to discard everything that has been stored up to the point in time when ...
Definition: ampsplusplus.hpp:1290
Field getBookmark() const
Retrieves the value of the Bookmark header of the Message as a new Field.
Definition: Message.hpp:1194
bool DangerousFlushPublishStoreResizeHandler(Store store_, size_t, void *data_)
PublishStoreResizeHandler that will block up to the timeout specified in user data milliseconds tryin...
Definition: ampsplusplus.hpp:1400
static amps_uint64_t getUnsetSequence()
Method to return the value used to represent no such sequence.
Definition: ampsplusplus.hpp:1156
Command & setCorrelationId(const char *correlationId_, size_t correlationIdLen_)
Set the correlation ID for this command.
Definition: ampsplusplus.hpp:778
MessageStream sow(const char *topic_, const std::string &filter_="", const std::string &orderBy_="", const std::string &bookmark_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic.
Definition: ampsplusplus.hpp:6158
amps_uint64_t publish(const std::string &topic_, const std::string &data_, unsigned long expiration_)
Publish a message to an AMPS topic, returning the sequence number assigned by the publish store (if a...
Definition: ampsplusplus.hpp:5558
void setLastChanceMessageHandler(const AMPS::MessageHandler &messageHandler)
Sets the message handler called when no other handler matches.
Definition: ampsplusplus.hpp:6961
Message::Field getMostRecent(const Message::Field &subId_)
Returns the most recent bookmark from the log that ought to be used for (re-)subscriptions.
Definition: BookmarkStore.hpp:310
amps_uint64_t deltaPublish(const char *topic_, size_t topicLength_, const char *data_, size_t dataLength_, unsigned long expiration_)
Publish the changed fields of a message to an AMPS topic.
Definition: ampsplusplus.hpp:5724
The client and server are disconnected.
Definition: amps.h:249
Message sowDeleteByData(const std::string &topic_, const std::string &data_, long timeout_=0)
Deletes the message whose keys match the message data provided.
Definition: ampsplusplus.hpp:6841
std::string deltaSubscribe(const MessageHandler &messageHandler_, const std::string &topic_, long timeout_, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Delta Subscribe to a topic.
Definition: ampsplusplus.hpp:5897
MessageStream timeout(unsigned timeout_)
Sets the maximum time to wait for the next message in milliseconds; if no message is available within...
Definition: ampsplusplus.hpp:8716
std::string sow(const MessageHandler &messageHandler_, const std::string &topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic.
Definition: ampsplusplus.hpp:6199
const std::string & getName() const
Returns the name of this client passed in the constructor.
Definition: ampsplusplus.hpp:5125
Command is an encapsulation of a single AMPS command sent by the client.
Definition: ampsplusplus.hpp:468
std::string sowAndSubscribe(const MessageHandler &messageHandler_, const std::string &topic_, const std::string &filter_="", const std::string &orderBy_="", const std::string &bookmark_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic and initiate a new subscription on it.
Definition: ampsplusplus.hpp:6349
Message & setBatchSize(const std::string &v)
Sets the value of the BatchSize header for this Message.
Definition: Message.hpp:1193
MessageStream subscribe(const std::string &topic_, long timeout_=0, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Subscribe to a topic.
Definition: ampsplusplus.hpp:5836
MessageStream bookmarkSubscribe(const char *topic_, long timeout_, const std::string &bookmark_, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Subscribe to a topic using a bookmark.
Definition: ampsplusplus.hpp:6024
Command & setSowKey(const std::string &sowKey_)
Sets the SowKey field of the command, typically used for a publish command to a topic in the state of...
Definition: ampsplusplus.hpp:608
Message & assignSequence(const std::string &v)
Sets the value of the Sequence header for this Message.
Definition: Message.hpp:1422
std::string executeAsyncNoResubscribe(Command &command_, MessageHandler handler_)
Execute the provided command and, once AMPS acknowledges the command, process messages in response to...
Definition: ampsplusplus.hpp:7174
Command & setCorrelationId(const std::string &correlationId_)
Set the correlation ID for this command.
Definition: ampsplusplus.hpp:766
std::string sowDelete(const MessageHandler &messageHandler, const std::string &topic, const std::string &filter, long timeout)
Deletes one or more messages from a topic's SOW cache.
Definition: ampsplusplus.hpp:6673
iterator end(void)
Returns an iterator representing the end of the topic or subscription.
Definition: ampsplusplus.hpp:4995
bool getRetryOnDisconnect(void) const
Returns true if automatic retry of a command to AMPS after a reconnect is enabled.
Definition: ampsplusplus.hpp:7371
void flushAcks(void)
Sends any queued message queue ack messages to the server immediately.
Definition: ampsplusplus.hpp:7284
std::string sowAndSubscribe(const MessageHandler &messageHandler_, const std::string &topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, bool oofEnabled_=false, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic and initiate a new subscription on it.
Definition: ampsplusplus.hpp:6231