AMPS C/C++ Client Class Reference
AMPS C/C++ Client Version 5.3.4.2
ampsplusplus.hpp
Go to the documentation of this file.
1 //
3 // Copyright (c) 2010-2024 60East Technologies Inc., All Rights Reserved.
4 //
5 // This computer software is owned by 60East Technologies Inc. and is
6 // protected by U.S. copyright laws and other laws and by international
7 // treaties. This computer software is furnished by 60East Technologies
8 // Inc. pursuant to a written license agreement and may be used, copied,
9 // transmitted, and stored only in accordance with the terms of such
10 // license agreement and with the inclusion of the above copyright notice.
11 // This computer software or any other copies thereof may not be provided
12 // or otherwise made available to any other person.
13 //
14 // U.S. Government Restricted Rights. This computer software: (a) was
15 // developed at private expense and is in all respects the proprietary
16 // information of 60East Technologies Inc.; (b) was not developed with
17 // government funds; (c) is a trade secret of 60East Technologies Inc.
18 // for all purposes of the Freedom of Information Act; and (d) is a
19 // commercial item and thus, pursuant to Section 12.212 of the Federal
20 // Acquisition Regulations (FAR) and DFAR Supplement Section 227.7202,
21 // Government's use, duplication or disclosure of the computer software
22 // is subject to the restrictions set forth by 60East Technologies Inc..
23 //
25 #ifndef _AMPSPLUSPLUS_H_
26 #define _AMPSPLUSPLUS_H_
27 #include "amps/amps.h"
28 #include "amps/ampsver.h"
29 #include <string>
30 #include <map>
31 #include <sstream>
32 #include <iostream>
33 #include <memory>
34 #include <stdexcept>
35 #include <limits.h>
36 #include <list>
37 #include <memory>
38 #include <set>
39 #include <deque>
40 #include <vector>
41 #include <assert.h>
42 #ifndef _WIN32
43  #include <inttypes.h>
44 #endif
45 #if defined(sun)
46  #include <sys/atomic.h>
47 #endif
48 #include "amps/BookmarkStore.hpp"
49 #include "amps/MessageRouter.hpp"
50 #include "amps/util.hpp"
51 #include "amps/ampscrc.hpp"
52 #if __cplusplus >= 201100L || _MSC_VER >= 1900
53  #include <atomic>
54 #endif
55 
56 #ifndef AMPS_TESTING_SLOW_MESSAGE_STREAM
57  #define AMPS_TESTING_SLOW_MESSAGE_STREAM
58 #endif
59 
64 
65 
72 
83 
// Library-wide default values. The units of the timeout/reconnect values are
// not visible in this part of the file - NOTE(review): presumably
// milliseconds; confirm against the code that consumes them.
84 // For StoreBuffer implementations
85 #define AMPS_MEMORYBUFFER_DEFAULT_BUFFERS 10
86 #define AMPS_MEMORYBUFFER_DEFAULT_LENGTH 40960
87 #define AMPS_SUBSCRIPTION_MANAGER_DEFAULT_TIMEOUT 0
88 #define AMPS_HACLIENT_TIMEOUT_DEFAULT 10000
89 #define AMPS_HACLIENT_RECONNECT_DEFAULT 200
90 #define AMPS_DEFAULT_COMMAND_TIMEOUT 5000
91 #define AMPS_DEFAULT_TOP_N -1
// Batch size that Command applies to SOW-type commands when none was set.
92 #define AMPS_DEFAULT_BATCH_SIZE 10
// Size of the scratch buffer convertToCharArray() fills from the right; 20
// decimal digits are enough for any 64-bit unsigned value.
93 #define AMPS_NUMBER_BUFFER_LEN 20
94 #define AMPS_DEFAULT_QUEUE_ACK_TIMEOUT 1000
95 
96 #if defined(_M_X64) || defined(__x86_64) || defined(_WIN64)
97  #define AMPS_X64 1
98 #endif
99 
100 static thread_local AMPS::Message publishStoreMessage;
101 
102 namespace AMPS
103 {
104 
105  typedef std::map<std::string, std::string> ConnectionInfo;
106 
/// Converts any stream-insertable value to its textual form.
/// \param x_ The value to format; it is written with operator<< into a
///           std::ostringstream.
/// \return The characters the stream produced for x_.
template<class Type>
inline std::string asString(Type x_)
{
  std::ostringstream formatter;
  formatter << x_;
  std::string text = formatter.str();
  return text;
}
114 
115  inline
116  size_t convertToCharArray(char* buf_, amps_uint64_t seqNo_)
117  {
118  size_t pos = AMPS_NUMBER_BUFFER_LEN;
119  for (int i = 0; i < AMPS_NUMBER_BUFFER_LEN; ++i)
120  {
121  if (seqNo_ > 0)
122  {
123  buf_[--pos] = (char)(seqNo_ % 10 + '0');
124  seqNo_ /= 10;
125  }
126  }
127  return pos;
128  }
129 
#ifdef _WIN32
/// Windows-only overload for unsigned long values.
/// Writes the decimal digits of seqNo_ right-aligned into buf_, which must
/// hold at least AMPS_NUMBER_BUFFER_LEN characters (no NUL is written).
/// \return Index of the most significant digit; AMPS_NUMBER_BUFFER_LEN when
///         seqNo_ is 0, in which case nothing is written.
inline
size_t convertToCharArray(char* buf_, unsigned long seqNo_)
{
  size_t writeAt = AMPS_NUMBER_BUFFER_LEN;
  int remaining = AMPS_NUMBER_BUFFER_LEN;
  while (remaining > 0 && seqNo_ != 0)
  {
    --writeAt;
    buf_[writeAt] = (char)(seqNo_ % 10 + '0');
    seqNo_ /= 10;
    --remaining;
  }
  return writeAt;
}
#endif
146 
/// Accessors for fixed reason strings. Each function returns a pointer to a
/// string literal, so the result stays valid for the life of the program.
class Reason
{
public:
  /// \return "duplicate"
  static const char* duplicate()                 { return "duplicate"; }
  /// \return "bad filter"
  static const char* badFilter()                 { return "bad filter"; }
  /// \return "bad regex topic"
  static const char* badRegexTopic()             { return "bad regex topic"; }
  /// \return "subscription already exists"
  static const char* subscriptionAlreadyExists() { return "subscription already exists"; }
  /// \return "name in use"
  static const char* nameInUse()                 { return "name in use"; }
  /// \return "auth failure"
  static const char* authFailure()               { return "auth failure"; }
  /// \return "not entitled"
  static const char* notEntitled()               { return "not entitled"; }
  /// \return "authentication disabled"
  static const char* authDisabled()              { return "authentication disabled"; }
  /// \return "subid in use"
  static const char* subidInUse()                { return "subid in use"; }
  /// \return "no topic"
  static const char* noTopic()                   { return "no topic"; }
};
194 
/// Receives exceptions that the wrapper macros in this file catch. The base
/// implementation ignores the exception; derive and override
/// exceptionThrown() to log or otherwise react.
/// (The class-head line was missing from this listing; the name is taken from
/// the destructor.)
class ExceptionListener
{
public:
  virtual ~ExceptionListener() {;}
  /// Called with the caught exception. Default: do nothing.
  virtual void exceptionThrown(const std::exception&) const {;}
};
209 
211 
212 
// AMPS_CALL_EXCEPTION_WRAPPER(x): executes statement x and, if it throws a
// std::exception, passes that exception to _exceptionListener->exceptionThrown().
// Anything the listener itself throws is swallowed. The expansion refers to
// _exceptionListener unqualified, so it must be visible at the point of use.
// Exceptions not derived from std::exception propagate to the caller; the
// commented-out handler below records why.
213 #define AMPS_CALL_EXCEPTION_WRAPPER(x) \
214  try\
215  {\
216  x;\
217  }\
218  catch (std::exception& ex_)\
219  {\
220  try\
221  {\
222  _exceptionListener->exceptionThrown(ex_);\
223  }\
224  catch(...)\
225  {\
226  ;\
227  }\
228  }
229  /*
230  * Note : we don't attempt to trap non std::exception exceptions
231  * here because doing so interferes with pthread_exit on some OSes.
232  catch (...)\
233  {\
234  try\
235  {\
236  _exceptionListener->exceptionThrown(AMPS::AMPSException(\
237  "An unhandled exception of unknown type was thrown by "\
238  "the registered handler.", AMPS_E_USAGE));\
239  }\
240  catch(...)\
241  {\
242  ;\
243  }\
244  }
245  */
// The two macros below used to be defined twice (once for _WIN32, once for
// everything else). The copies differed only in whether the
// MessageStreamFullException handler named its unused parameter; the unnamed
// form is valid on every compiler, so a single definition is kept.
//
// Both macros expect `me` to be a pointer-like expression exposing
// `_connected`, `checkAndSendHeartbeat(bool)` and `_exceptionListener`.

// AMPS_CALL_EXCEPTION_WRAPPER_2(me, x)
//   Runs statement x while me->_connected is true.
//   * x completes: the loop ends.
//   * x throws MessageStreamFullException: me->checkAndSendHeartbeat(false)
//     is called and x is retried. If the heartbeat call throws a
//     std::exception, it is reported to the listener and the loop ends.
//   * x throws any other std::exception: it is reported to
//     me->_exceptionListener and the loop ends.
//   Exceptions thrown by the listener are swallowed.
//
//   Note: non-std::exception exceptions are deliberately NOT trapped with a
//   catch (...) (which would have reported an AMPSException saying "An
//   unhandled exception of unknown type was thrown by the registered
//   handler." with AMPS_E_USAGE), because doing so interferes with
//   pthread_exit on some OSes.
#define AMPS_CALL_EXCEPTION_WRAPPER_2(me,x) \
  try\
  {\
    while(me->_connected)\
    {\
      try\
      {\
        x;\
        break;\
      }\
      catch(MessageStreamFullException&)\
      {\
        try\
        {\
          me->checkAndSendHeartbeat(false);\
        }\
        catch (std::exception& ex_)\
        {\
          try\
          {\
            me->_exceptionListener->exceptionThrown(ex_);\
          }\
          catch(...)\
          {\
            ;\
          }\
          break;\
        }\
      }\
    }\
  }\
  catch (std::exception& ex_)\
  {\
    try\
    {\
      me->_exceptionListener->exceptionThrown(ex_);\
    }\
    catch(...)\
    {\
      ;\
    }\
  }

// AMPS_CALL_EXCEPTION_WRAPPER_STREAM_FULL_2(me, x)
//   Same retry loop as above, but without the outer handler: only
//   MessageStreamFullException is intercepted (heartbeat, then retry); every
//   other exception thrown by x propagates to the caller.
#define AMPS_CALL_EXCEPTION_WRAPPER_STREAM_FULL_2(me, x)\
  while(me->_connected)\
  {\
    try\
    {\
      x;\
      break;\
    }\
    catch(MessageStreamFullException&)\
    {\
      try\
      {\
        me->checkAndSendHeartbeat(false);\
      }\
      catch (std::exception& ex_)\
      {\
        try\
        {\
          me->_exceptionListener->exceptionThrown(ex_);\
        }\
        catch(...)\
        {\
          ;\
        }\
        break;\
      }\
    }\
  }
423 
// AMPS_UNHANDLED_EXCEPTION(ex): reports ex to
// _exceptionListener->exceptionThrown() and swallows anything the listener
// throws. _exceptionListener must be visible at the point of use.
424 #define AMPS_UNHANDLED_EXCEPTION(ex) \
425  try\
426  {\
427  _exceptionListener->exceptionThrown(ex);\
428  }\
429  catch(...)\
430  {;}
431 
// AMPS_UNHANDLED_EXCEPTION_2(me, ex): same as AMPS_UNHANDLED_EXCEPTION, but
// the listener is reached through me->_exceptionListener. Anything the
// listener throws is swallowed.
432 #define AMPS_UNHANDLED_EXCEPTION_2(me,ex) \
433  try\
434  {\
435  me->_exceptionListener->exceptionThrown(ex);\
436  }\
437  catch(...)\
438  {;}
439 
440 
441  class Client;
442 
467 
468  class Command
469  {
470  Message _message;
471  unsigned _timeout;
472  unsigned _batchSize;
473  unsigned _flags;
474  static const unsigned Subscribe = 1;
475  static const unsigned SOW = 2;
476  static const unsigned NeedsSequenceNumber = 4;
477  static const unsigned ProcessedAck = 8;
478  static const unsigned StatsAck = 16;
479  void init(Message::Command::Type command_)
480  {
481  _timeout = 0;
482  _batchSize = 0;
483  _flags = 0;
484  _message.reset();
485  _message.setCommandEnum(command_);
486  _setIds();
487  }
488  void init(const std::string& command_)
489  {
490  _timeout = 0;
491  _batchSize = 0;
492  _flags = 0;
493  _message.reset();
494  _message.setCommand(command_);
495  _setIds();
496  }
497  void init(const char* command_, size_t commandLen_)
498  {
499  _timeout = 0;
500  _batchSize = 0;
501  _flags = 0;
502  _message.reset();
503  _message.setCommand(command_, commandLen_);
504  _setIds();
505  }
506  void _setIds(void)
507  {
508  Message::Command::Type command = _message.getCommandEnum();
509  if (!(command & Message::Command::NoDataCommands))
510  {
511  _message.newCommandId();
512  if (command == Message::Command::Subscribe ||
513  command == Message::Command::SOWAndSubscribe ||
514  command == Message::Command::DeltaSubscribe ||
515  command == Message::Command::SOWAndDeltaSubscribe)
516  {
517  _message.setSubscriptionId(_message.getCommandId());
518  _flags |= Subscribe;
519  }
520  if (command == Message::Command::SOW
521  || command == Message::Command::SOWAndSubscribe
522  || command == Message::Command::SOWAndDeltaSubscribe)
523  {
524  _message.setQueryID(_message.getCommandId());
525  if (_batchSize == 0)
526  {
527  setBatchSize(AMPS_DEFAULT_BATCH_SIZE);
528  }
529  if (command == Message::Command::SOW)
530  {
531  _flags |= SOW;
532  }
533  }
534  _flags |= ProcessedAck;
535  }
536  else if (command == Message::Command::SOWDelete)
537  {
538  _message.newCommandId();
539  _flags |= ProcessedAck;
540  _flags |= NeedsSequenceNumber;
541  }
542  else if (command == Message::Command::Publish
543  || command == Message::Command::DeltaPublish)
544  {
545  _flags |= NeedsSequenceNumber;
546  }
547  else if (command == Message::Command::StopTimer)
548  {
549  _message.newCommandId();
550  }
551  }
552  public:
556  Command(const std::string& command_)
557  {
558  init(command_);
559  }
564  Command(const char* command_, size_t commandLen_)
565  {
566  init(command_, commandLen_);
567  }
571  Command(Message::Command::Type command_)
572  {
573  init(command_);
574  }
575 
579  Command& reset(const std::string& command_)
580  {
581  init(command_);
582  return *this;
583  }
588  Command& reset(const char* command_, size_t commandLen_)
589  {
590  init(command_, commandLen_);
591  return *this;
592  }
596  Command& reset(Message::Command::Type command_)
597  {
598  init(command_);
599  return *this;
600  }
608  Command& setSowKey(const std::string& sowKey_)
609  {
610  _message.setSowKey(sowKey_);
611  return *this;
612  }
621  Command& setSowKey(const char* sowKey_, size_t sowKeyLen_)
622  {
623  _message.setSowKey(sowKey_, sowKeyLen_);
624  return *this;
625  }
638  Command& setSowKeys(const std::string& sowKeys_)
639  {
640  _message.setSowKeys(sowKeys_);
641  return *this;
642  }
656  Command& setSowKeys(const char* sowKeys_, size_t sowKeysLen_)
657  {
658  _message.setSowKeys(sowKeys_, sowKeysLen_);
659  return *this;
660  }
662  Command& setCommandId(const std::string& cmdId_)
663  {
664  _message.setCommandId(cmdId_);
665  return *this;
666  }
669  Command& setCommandId(const char* cmdId_, size_t cmdIdLen_)
670  {
671  _message.setCommandId(cmdId_, cmdIdLen_);
672  return *this;
673  }
675  Command& setTopic(const std::string& topic_)
676  {
677  _message.setTopic(topic_);
678  return *this;
679  }
682  Command& setTopic(const char* topic_, size_t topicLen_)
683  {
684  _message.setTopic(topic_, topicLen_);
685  return *this;
686  }
688  Command& setFilter(const std::string& filter_)
689  {
690  _message.setFilter(filter_);
691  return *this;
692  }
695  Command& setFilter(const char* filter_, size_t filterLen_)
696  {
697  _message.setFilter(filter_, filterLen_);
698  return *this;
699  }
701  Command& setOrderBy(const std::string& orderBy_)
702  {
703  _message.setOrderBy(orderBy_);
704  return *this;
705  }
708  Command& setOrderBy(const char* orderBy_, size_t orderByLen_)
709  {
710  _message.setOrderBy(orderBy_, orderByLen_);
711  return *this;
712  }
714  Command& setSubId(const std::string& subId_)
715  {
716  _message.setSubscriptionId(subId_);
717  return *this;
718  }
721  Command& setSubId(const char* subId_, size_t subIdLen_)
722  {
723  _message.setSubscriptionId(subId_, subIdLen_);
724  return *this;
725  }
727  Command& setQueryId(const std::string& queryId_)
728  {
729  _message.setQueryId(queryId_);
730  return *this;
731  }
734  Command& setQueryId(const char* queryId_, size_t queryIdLen_)
735  {
736  _message.setQueryId(queryId_, queryIdLen_);
737  return *this;
738  }
744  Command& setBookmark(const std::string& bookmark_)
745  {
746  _message.setBookmark(bookmark_);
747  return *this;
748  }
755  Command& setBookmark(const char* bookmark_, size_t bookmarkLen_)
756  {
757  _message.setBookmark(bookmark_, bookmarkLen_);
758  return *this;
759  }
766  Command& setCorrelationId(const std::string& correlationId_)
767  {
768  _message.setCorrelationId(correlationId_);
769  return *this;
770  }
778  Command& setCorrelationId(const char* correlationId_, size_t correlationIdLen_)
779  {
780  _message.setCorrelationId(correlationId_, correlationIdLen_);
781  return *this;
782  }
785  Command& setOptions(const std::string& options_)
786  {
787  _message.setOptions(options_);
788  return *this;
789  }
793  Command& setOptions(const char* options_, size_t optionsLen_)
794  {
795  _message.setOptions(options_, optionsLen_);
796  return *this;
797  }
799  Command& setSequence(const std::string& seq_)
800  {
801  _message.setSequence(seq_);
802  return *this;
803  }
806  Command& setSequence(const char* seq_, size_t seqLen_)
807  {
808  _message.setSequence(seq_, seqLen_);
809  return *this;
810  }
812  Command& setSequence(const amps_uint64_t seq_)
813  {
814  std::ostringstream os;
815  os << seq_;
816  _message.setSequence(os.str());
817  return *this;
818  }
819  amps_uint64_t getSequence() const
820  {
821  return amps_message_get_field_uint64(_message.getMessage(), AMPS_Sequence);
822  }
825  Command& setData(const std::string& data_)
826  {
827  _message.setData(data_);
828  return *this;
829  }
833  Command& setData(const char* data_, size_t dataLen_)
834  {
835  _message.setData(data_, dataLen_);
836  return *this;
837  }
847  Command& setTimeout(unsigned timeout_)
848  {
849  _timeout = timeout_;
850  return *this;
851  }
853  Command& setTopN(unsigned topN_)
854  {
855  _message.setTopNRecordsReturned(topN_);
856  return *this;
857  }
862  Command& setBatchSize(unsigned batchSize_)
863  {
864  _message.setBatchSize(batchSize_);
865  _batchSize = batchSize_;
866  return *this;
867  }
878  Command& setExpiration(unsigned expiration_)
879  {
880  _message.setExpiration(expiration_);
881  return *this;
882  }
884  Command& addAckType(const std::string& ackType_)
885  {
886  _message.setAckType(_message.getAckType() + "," + ackType_);
887  if (ackType_ == "processed")
888  {
889  _flags |= ProcessedAck;
890  }
891  else if (ackType_ == "stats")
892  {
893  _flags |= StatsAck;
894  }
895  return *this;
896  }
898  Command& setAckType(const std::string& ackType_)
899  {
900  _message.setAckType(ackType_);
901  if (ackType_.find("processed") != std::string::npos)
902  {
903  _flags |= ProcessedAck;
904  }
905  else
906  {
907  _flags &= ~ProcessedAck;
908  }
909  if (ackType_.find("stats") != std::string::npos)
910  {
911  _flags |= StatsAck;
912  }
913  else
914  {
915  _flags &= ~StatsAck;
916  }
917  return *this;
918  }
920  Command& setAckType(unsigned ackType_)
921  {
922  _message.setAckTypeEnum(ackType_);
923  if (ackType_ & Message::AckType::Processed)
924  {
925  _flags |= ProcessedAck;
926  }
927  else
928  {
929  _flags &= ~ProcessedAck;
930  }
931  if (ackType_ & Message::AckType::Stats)
932  {
933  _flags |= StatsAck;
934  }
935  else
936  {
937  _flags &= ~StatsAck;
938  }
939  return *this;
940  }
942  std::string getAckType() const
943  {
944  return (std::string)(_message.getAckType());
945  }
947  unsigned getAckTypeEnum() const
948  {
949  return _message.getAckTypeEnum();
950  }
951 
952  Message& getMessage(void)
953  {
954  return _message;
955  }
956  unsigned getTimeout(void) const
957  {
958  return _timeout;
959  }
960  unsigned getBatchSize(void) const
961  {
962  return _batchSize;
963  }
964  bool isSubscribe(void) const
965  {
966  return _flags & Subscribe;
967  }
968  bool isSow(void) const
969  {
970  return (_flags & SOW) != 0;
971  }
972  bool hasProcessedAck(void) const
973  {
974  return (_flags & ProcessedAck) != 0;
975  }
976  bool hasStatsAck(void) const
977  {
978  return (_flags & StatsAck) != 0;
979  }
980  bool needsSequenceNumber(void) const
981  {
982  return (_flags & NeedsSequenceNumber) != 0;
983  }
984  };
985 
988  typedef void(*DisconnectHandlerFunc)(Client&, void* userData);
989 
990  class Message;
992 
/// Interface for objects that take part in authentication. Implementations
/// supply all three callbacks.
/// (The class-head line was missing from this listing; the name is taken from
/// the destructor.)
/// NOTE(review): exactly when each callback is invoked is decided by code
/// outside this part of the file - confirm against the client implementation.
class Authenticator
{
public:
  virtual ~Authenticator() {;}

  /// Called with the user name and password; returns a string.
  virtual std::string authenticate(const std::string& userName_, const std::string& password_) = 0;
  /// Retry callback: called with the user name and a password; returns a string.
  virtual std::string retry(const std::string& userName_, const std::string& password_) = 0;
  /// Completion callback: receives the user name, the password and a reason string.
  virtual void completed(const std::string& userName_, const std::string& password_, const std::string& reason_) = 0;
};
1022 
1027  {
1028  public:
1029  virtual ~DefaultAuthenticator() {;}
1032  std::string authenticate(const std::string& /*userName_*/, const std::string& password_)
1033  {
1034  return password_;
1035  }
1036 
1039  std::string retry(const std::string& /*userName_*/, const std::string& /*password_*/)
1040  {
1041  throw AuthenticationException("retry not implemented by DefaultAuthenticator.");
1042  }
1043 
1044  void completed(const std::string& /*userName_*/, const std::string& /* password_ */, const std::string& /* reason */) {;}
1045 
1050  {
1051  static DefaultAuthenticator d;
1052  return d;
1053  }
1054  };
1055 
1059  {
1060  public:
1061 
1065  virtual void execute(Message& message_) = 0;
1066 
1067  virtual ~StoreReplayer() {;}
1068  };
1069 
1070  class Store;
1071 
1080  typedef bool (*PublishStoreResizeHandler)(Store store_,
1081  size_t size_,
1082  void* userData_);
1083 
1086  class StoreImpl : public RefBody
1087  {
1088  public:
1094  StoreImpl(bool errorOnPublishGap_ = false)
1095  : _resizeHandler(NULL)
1096  , _resizeHandlerData(NULL)
1097  , _errorOnPublishGap(errorOnPublishGap_)
1098  {;}
1099 
1104  virtual amps_uint64_t store(const Message& message_) = 0;
1105 
1112  virtual void discardUpTo(amps_uint64_t index_) = 0;
1113 
1118  virtual void replay(StoreReplayer& replayer_) = 0;
1119 
1127  virtual bool replaySingle(StoreReplayer& replayer_, amps_uint64_t index_) = 0;
1128 
1133  virtual size_t unpersistedCount() const = 0;
1134 
1135  virtual ~StoreImpl() {;}
1136 
1145  virtual void flush(long timeout_) = 0;
1146 
1149  static inline size_t getUnsetPosition()
1150  {
1151  return AMPS_UNSET_INDEX;
1152  }
1153 
1156  static inline amps_uint64_t getUnsetSequence()
1157  {
1158  return AMPS_UNSET_SEQUENCE;
1159  }
1160 
1164  virtual amps_uint64_t getLowestUnpersisted() const = 0;
1165 
1169  virtual amps_uint64_t getLastPersisted() = 0;
1170 
1180  inline virtual void setResizeHandler(PublishStoreResizeHandler handler_,
1181  void* userData_)
1182  {
1183  _resizeHandler = handler_;
1184  _resizeHandlerData = userData_;
1185  }
1186 
1187  inline virtual PublishStoreResizeHandler getResizeHandler() const
1188  {
1189  return _resizeHandler;
1190  }
1191 
1192  bool callResizeHandler(size_t newSize_);
1193 
1194  inline virtual void setErrorOnPublishGap(bool errorOnPublishGap_)
1195  {
1196  _errorOnPublishGap = errorOnPublishGap_;
1197  }
1198 
1199  inline virtual bool getErrorOnPublishGap() const
1200  {
1201  return _errorOnPublishGap;
1202  }
1203 
1204  private:
1205  PublishStoreResizeHandler _resizeHandler;
1206  void* _resizeHandlerData;
1207  bool _errorOnPublishGap;
1208  };
1209 
1212  class Store
1213  {
1214  RefHandle<StoreImpl> _body;
1215  public:
1216  Store() {;}
1217  Store(StoreImpl* body_) : _body(body_) {;}
1218  Store(const Store& rhs) : _body(rhs._body) {;}
1219  Store& operator=(const Store& rhs)
1220  {
1221  _body = rhs._body;
1222  return *this;
1223  }
1224 
1228  amps_uint64_t store(const Message& message_)
1229  {
1230  return _body.get().store(message_);
1231  }
1232 
1239  void discardUpTo(amps_uint64_t index_)
1240  {
1241  _body.get().discardUpTo(index_);
1242  }
1243 
1248  void replay(StoreReplayer& replayer_)
1249  {
1250  _body.get().replay(replayer_);
1251  }
1252 
1260  bool replaySingle(StoreReplayer& replayer_, amps_uint64_t index_)
1261  {
1262  return _body.get().replaySingle(replayer_, index_);
1263  }
1264 
1269  size_t unpersistedCount() const
1270  {
1271  return _body.get().unpersistedCount();
1272  }
1273 
1277  bool isValid() const
1278  {
1279  return _body.isValid();
1280  }
1281 
1290  void flush(long timeout_ = 0)
1291  {
1292  return _body.get().flush(timeout_);
1293  }
1294 
1298  amps_uint64_t getLowestUnpersisted()
1299  {
1300  return _body.get().getLowestUnpersisted();
1301  }
1302 
1306  amps_uint64_t getLastPersisted()
1307  {
1308  return _body.get().getLastPersisted();
1309  }
1310 
1320  void setResizeHandler(PublishStoreResizeHandler handler_,
1321  void* userData_)
1322  {
1323  _body.get().setResizeHandler(handler_, userData_);
1324  }
1325 
1326  PublishStoreResizeHandler getResizeHandler() const
1327  {
1328  return _body.get().getResizeHandler();
1329  }
1330 
1335  inline void setErrorOnPublishGap(bool errorOnPublishGap_)
1336  {
1337  _body.get().setErrorOnPublishGap(errorOnPublishGap_);
1338  }
1339 
1344  inline bool getErrorOnPublishGap() const
1345  {
1346  return _body.get().getErrorOnPublishGap();
1347  }
1348 
1352  StoreImpl* get()
1353  {
1354  if (_body.isValid())
1355  {
1356  return &_body.get();
1357  }
1358  else
1359  {
1360  return NULL;
1361  }
1362  }
1363 
1364  };
1365 
1371  {
1372  public:
1373  virtual ~FailedWriteHandler() {;}
1380  virtual void failedWrite(const Message& message_,
1381  const char* reason_, size_t reasonLength_) = 0;
1382  };
1383 
1384 
1385  inline bool StoreImpl::callResizeHandler(size_t newSize_)
1386  {
1387  if (_resizeHandler)
1388  {
1389  return _resizeHandler(Store(this), newSize_, _resizeHandlerData);
1390  }
1391  return true;
1392  }
1393 
1400  inline bool DangerousFlushPublishStoreResizeHandler(Store store_, size_t /*size_*/,
1401  void* data_)
1402  {
1403  long* timeoutp = (long*)data_;
1404  size_t count = store_.unpersistedCount();
1405  if (count == 0)
1406  {
1407  return false;
1408  }
1409  try
1410  {
1411  store_.flush(*timeoutp);
1412  }
1413 #ifdef _WIN32
1414  catch (const TimedOutException&)
1415 #else
1416  catch (const TimedOutException& e)
1417 #endif
1418  {
1419  return true;
1420  }
1421  return (count == store_.unpersistedCount());
1422  }
1423 
1429  {
1430  public:
1442  virtual bool failure(const Message& message_, const MessageHandler& handler_,
1443  unsigned requestedAckTypes_,
1444  const AMPSException& exception_) = 0;
1445  };
1446 
1451  {
1452  public:
1453  virtual ~SubscriptionManager() {;}
1461  virtual void subscribe(MessageHandler messageHandler_, const Message& message_,
1462  unsigned requestedAckTypes_) = 0;
1466  virtual void unsubscribe(const Message::Field& subId_) = 0;
1469  virtual void clear() = 0;
1473  virtual void resubscribe(Client& client_) = 0;
1478  virtual void setFailedResubscribeHandler(std::shared_ptr<FailedResubscribeHandler> handler_)
1479  {
1480  _failedResubscribeHandler = handler_;
1481  }
1482  protected:
1483  std::shared_ptr<FailedResubscribeHandler> _failedResubscribeHandler;
1484  };
1485 
1489 
/// Interface for objects that want to be told when the connection state
/// changes.
/// (The class-head line was missing from this listing; the name is taken from
/// the destructor.)
class ConnectionStateListener
{
public:
  /// States passed to connectionStateChanged(). Apart from Disconnected (0)
  /// and Shutdown (1) the values are distinct powers of two.
  typedef enum { Disconnected = 0,
                 Shutdown = 1,
                 Connected = 2,
                 LoggedOn = 4,
                 PublishReplayed = 8,
                 HeartbeatInitiated = 16,
                 Resubscribed = 32,
                 UNKNOWN = 16384
               } State;

  /// Called with the new state.
  virtual void connectionStateChanged(State newState_) = 0;
  virtual ~ConnectionStateListener() {;}
};
1516 
1517 
1518  class MessageStreamImpl;
1519  class MessageStream;
1520 
1521  typedef void(*DeferredExecutionFunc)(void*);
1522 
1523  class ClientImpl : public RefBody // -V553
1524  {
1525  // Class to wrap turning of Nagle for things like flush and logon
1526  class NoDelay
1527  {
1528  private:
1529  AMPS_SOCKET _socket;
1530  int _noDelay;
1531  char* _valuePtr;
1532 #ifdef _WIN32
1533  int _valueLen;
1534 #else
1535  socklen_t _valueLen;
1536 #endif
1537  public:
1538  NoDelay(amps_handle client_)
1539  : _socket(AMPS_INVALID_SOCKET), _noDelay(0), _valueLen(sizeof(int))
1540  {
1541  _valuePtr = (char*)&_noDelay;
1542  _socket = amps_client_get_socket(client_);
1543  if (_socket != AMPS_INVALID_SOCKET)
1544  {
1545  getsockopt(_socket, IPPROTO_TCP, TCP_NODELAY, _valuePtr, &_valueLen);
1546  if (!_noDelay)
1547  {
1548  _noDelay = 1;
1549  setsockopt(_socket, IPPROTO_TCP, TCP_NODELAY, _valuePtr, _valueLen);
1550  }
1551  else
1552  {
1553  _socket = AMPS_INVALID_SOCKET;
1554  }
1555  }
1556  }
1557 
1558  ~NoDelay()
1559  {
1560  if (_socket != AMPS_INVALID_SOCKET)
1561  {
1562  _noDelay = 0;
1563  setsockopt(_socket, IPPROTO_TCP, TCP_NODELAY, _valuePtr, _valueLen);
1564  }
1565  }
1566  };
1567 
1568  friend class Client;
1569  protected:
1570  amps_handle _client;
1571  DisconnectHandler _disconnectHandler;
1572  enum GlobalCommandTypeHandlers : size_t
1573  {
1574  Publish = 0,
1575  SOW = 1,
1576  GroupBegin = 2,
1577  GroupEnd = 3,
1578  Heartbeat = 4,
1579  OOF = 5,
1580  Ack = 6,
1581  LastChance = 7,
1582  DuplicateMessage = 8,
1583  COUNT = 9
1584  };
1585  std::vector<MessageHandler> _globalCommandTypeHandlers;
1586  Message _message, _readMessage, _publishMessage, _deltaMessage, _beatMessage;
1587  MessageRouter _routes;
1588  MessageRouter::RouteCache _routeCache;
1589  mutable Mutex _lock;
1590  std::string _name, _nameHash, _lastUri, _logonCorrelationData;
1591  amps_uint64_t _nameHashValue;
1592  BookmarkStore _bookmarkStore;
1593  Store _publishStore;
1594  bool _isRetryOnDisconnect;
1595  amps_unique_ptr<FailedWriteHandler> _failedWriteHandler;
1596 #if __cplusplus >= 201100L || _MSC_VER >= 1900
1597  std::atomic<amps_uint64_t> _lastSentHaSequenceNumber;
1598 #else
1599  volatile amps_uint64_t _lastSentHaSequenceNumber;
1600 #endif
1601  AMPS_ATOMIC_TYPE_8 _logonInProgress;
1602  AMPS_ATOMIC_TYPE_8 _badTimeToHASubscribe;
1603  VersionInfo _serverVersion;
1604  Timer _heartbeatTimer;
1605  amps_unique_ptr<MessageStream> _pEmptyMessageStream;
1606 
1607  // queue data
1608  int _queueAckTimeout;
1609  bool _isAutoAckEnabled;
1610  unsigned _ackBatchSize;
1611  unsigned _queuedAckCount;
1612  unsigned _defaultMaxDepth;
1613  struct QueueBookmarks
1614  {
1615  QueueBookmarks(const std::string& topic_)
1616  : _topic(topic_)
1617  , _oldestTime(0)
1618  , _bookmarkCount(0)
1619  {;}
1620  std::string _topic;
1621  std::string _data;
1622  amps_uint64_t _oldestTime;
1623  unsigned _bookmarkCount;
1624  };
1625  typedef amps_uint64_t topic_hash;
1626  typedef std::map<topic_hash, QueueBookmarks> TopicHashMap;
1627  TopicHashMap _topicHashMap;
1628 
1629  class ClientStoreReplayer : public StoreReplayer
1630  {
1631  ClientImpl* _client;
1632  public:
1633  unsigned _version;
1634  amps_result _res;
1635 
1636  ClientStoreReplayer()
1637  : _client(NULL), _version(0), _res(AMPS_E_OK)
1638  {}
1639 
1640  ClientStoreReplayer(ClientImpl* client_)
1641  : _client(client_), _version(0), _res(AMPS_E_OK)
1642  {}
1643 
1644  void setClient(ClientImpl* client_)
1645  {
1646  _client = client_;
1647  }
1648 
1649  void execute(Message& message_)
1650  {
1651  if (!_client)
1652  {
1653  throw CommandException("Can't replay without a client.");
1654  }
1655  amps_uint64_t index = amps_message_get_field_uint64(message_.getMessage(),
1656  AMPS_Sequence);
1657  if (index > _client->_lastSentHaSequenceNumber)
1658  {
1659  _client->_lastSentHaSequenceNumber = index;
1660  }
1661 
1662  _res = AMPS_E_OK;
1663  // Don't replay a queue cancel message after a reconnect.
1664  // Currently, the only messages that will have anything in options
1665  // are cancel messages.
1666  if (!message_.getCommand().empty() &&
1667  (!_client->_logonInProgress ||
1668  message_.getOptions().len() < 6))
1669  {
1670  _res = amps_client_send_with_version(_client->_client,
1671  message_.getMessage(),
1672  &_version);
1673  if (_res != AMPS_E_OK)
1674  {
1675  throw DisconnectedException("AMPS Server disconnected during replay");
1676  }
1677  }
1678  }
1679 
1680  };
1681  ClientStoreReplayer _replayer;
1682 
1683  class FailedWriteStoreReplayer : public StoreReplayer
1684  {
1685  ClientImpl* _parent;
1686  const char* _reason;
1687  size_t _reasonLength;
1688  size_t _replayCount;
1689  public:
1690  FailedWriteStoreReplayer(ClientImpl* parent, const char* reason_, size_t reasonLength_)
1691  : _parent(parent),
1692  _reason(reason_),
1693  _reasonLength(reasonLength_),
1694  _replayCount(0)
1695  {;}
1696  void execute(Message& message_)
1697  {
1698  if (_parent->_failedWriteHandler)
1699  {
1700  ++_replayCount;
1701  _parent->_failedWriteHandler->failedWrite(message_,
1702  _reason, _reasonLength);
1703  }
1704  }
1705  size_t replayCount(void) const
1706  {
1707  return _replayCount;
1708  }
1709  };
1710 
1711  struct AckResponseImpl : public RefBody
1712  {
1713  std::string username, password, reason, status, bookmark, options;
1714  amps_uint64_t sequenceNo;
1715  amps_uint64_t nameHashValue;
1716  VersionInfo serverVersion;
1717 #if __cplusplus >= 201100L || _MSC_VER >= 1900
1718  std::atomic<bool> responded;
1719  std::atomic<bool> abandoned;
1720 #else
1721  volatile bool responded;
1722  volatile bool abandoned;
1723 #endif
1724  unsigned connectionVersion;
1725  AckResponseImpl() :
1726  RefBody(),
1727  sequenceNo((amps_uint64_t)0),
1728  serverVersion(),
1729  responded(false),
1730  abandoned(false),
1731  connectionVersion(0)
1732  {
1733  }
1734  };
1735 
1736  class AckResponse
1737  {
1738  RefHandle<AckResponseImpl> _body;
1739  public:
1740  AckResponse() : _body(NULL) {;}
1741  AckResponse(const AckResponse& rhs) : _body(rhs._body) {;}
1742  static AckResponse create()
1743  {
1744  AckResponse r;
1745  r._body = new AckResponseImpl();
1746  return r;
1747  }
1748 
1749  const std::string& username()
1750  {
1751  return _body.get().username;
1752  }
1753  void setUsername(const char* data_, size_t len_)
1754  {
1755  if (data_)
1756  {
1757  _body.get().username.assign(data_, len_);
1758  }
1759  else
1760  {
1761  _body.get().username.clear();
1762  }
1763  }
1764  const std::string& password()
1765  {
1766  return _body.get().password;
1767  }
1768  void setPassword(const char* data_, size_t len_)
1769  {
1770  if (data_)
1771  {
1772  _body.get().password.assign(data_, len_);
1773  }
1774  else
1775  {
1776  _body.get().password.clear();
1777  }
1778  }
1779  const std::string& reason()
1780  {
1781  return _body.get().reason;
1782  }
1783  void setReason(const char* data_, size_t len_)
1784  {
1785  if (data_)
1786  {
1787  _body.get().reason.assign(data_, len_);
1788  }
1789  else
1790  {
1791  _body.get().reason.clear();
1792  }
1793  }
1794  const std::string& status()
1795  {
1796  return _body.get().status;
1797  }
1798  void setStatus(const char* data_, size_t len_)
1799  {
1800  if (data_)
1801  {
1802  _body.get().status.assign(data_, len_);
1803  }
1804  else
1805  {
1806  _body.get().status.clear();
1807  }
1808  }
1809  const std::string& bookmark()
1810  {
1811  return _body.get().bookmark;
1812  }
1813  void setBookmark(const Field& bookmark_)
1814  {
1815  if (!bookmark_.empty())
1816  {
1817  _body.get().bookmark.assign(bookmark_.data(), bookmark_.len());
1818  Field::parseBookmark(bookmark_, _body.get().nameHashValue,
1819  _body.get().sequenceNo);
1820  }
1821  else
1822  {
1823  _body.get().bookmark.clear();
1824  _body.get().sequenceNo = (amps_uint64_t)0;
1825  _body.get().nameHashValue = (amps_uint64_t)0;
1826  }
1827  }
1828  amps_uint64_t sequenceNo() const
1829  {
1830  return _body.get().sequenceNo;
1831  }
1832  amps_uint64_t nameHashValue() const
1833  {
1834  return _body.get().nameHashValue;
1835  }
1836  void setSequenceNo(const char* data_, size_t len_)
1837  {
1838  amps_uint64_t result = (amps_uint64_t)0;
1839  if (data_)
1840  {
1841  for (size_t i = 0; i < len_; ++i)
1842  {
1843  result *= (amps_uint64_t)10;
1844  result += (amps_uint64_t)(data_[i] - '0');
1845  }
1846  }
1847  _body.get().sequenceNo = result;
1848  }
1849  VersionInfo serverVersion() const
1850  {
1851  return _body.get().serverVersion;
1852  }
1853  void setServerVersion(const char* data_, size_t len_)
1854  {
1855  if (data_)
1856  {
1857  _body.get().serverVersion.setVersion(std::string(data_, len_));
1858  }
1859  }
1860  bool responded()
1861  {
1862  return _body.get().responded;
1863  }
1864  void setResponded()
1865  {
1866  _body.get().responded = true;
1867  }
1868  bool abandoned()
1869  {
1870  return _body.get().abandoned;
1871  }
1872  void setAbandoned()
1873  {
1874  if (_body.isValid())
1875  {
1876  _body.get().abandoned = true;
1877  }
1878  }
1879 
1880  void setConnectionVersion(unsigned connectionVersion)
1881  {
1882  _body.get().connectionVersion = connectionVersion;
1883  }
1884 
1885  unsigned getConnectionVersion()
1886  {
1887  return _body.get().connectionVersion;
1888  }
1889  void setOptions(const char* data_, size_t len_)
1890  {
1891  if (data_)
1892  {
1893  _body.get().options.assign(data_, len_);
1894  }
1895  else
1896  {
1897  _body.get().options.clear();
1898  }
1899  }
1900 
1901  const std::string& options()
1902  {
1903  return _body.get().options;
1904  }
1905 
1906  AckResponse& operator=(const AckResponse& rhs)
1907  {
1908  _body = rhs._body;
1909  return *this;
1910  }
1911  };
1912 
1913 
// Outstanding synchronous requests keyed by command id. syncAckProcessing
// inserts an entry before sending and clearAcks removes abandoned entries;
// both do so while holding _ackMapLock.
1914  typedef std::map<std::string, AckResponse> AckMap;
1915  AckMap _ackMap;
1916  Mutex _ackMapLock;
// Listener the constructor installs as the initial _exceptionListener.
1917  DefaultExceptionListener _defaultExceptionListener;
1918  protected:
1919 
1920  struct DeferredExecutionRequest
1921  {
1922  DeferredExecutionRequest(DeferredExecutionFunc func_,
1923  void* userData_)
1924  : _func(func_),
1925  _userData(userData_)
1926  {;}
1927 
1928  DeferredExecutionFunc _func;
1929  void* _userData;
1930  };
// Listener currently in use; the constructor points it at
// _defaultExceptionListener.
1931  const ExceptionListener* _exceptionListener;
1932  std::shared_ptr<const ExceptionListener> _pExceptionListener;
// Replaced through setSubscriptionManager(); may be empty (it is tested
// before use in unsubscribeInternal).
1933  amps_unique_ptr<SubscriptionManager> _subscriptionManager;
// Set to true at the end of _connect() and to false in setDisconnected().
1934  volatile bool _connected;
// Assigned as the user id on a failed command in syncAckProcessing when the
// message carries none.
1935  std::string _username;
// Listeners notified by broadcastConnectionStateChanged().
1936  typedef std::set<ConnectionStateListener*> ConnectionStateListeners;
1937  ConnectionStateListeners _connectionStateListeners;
// NOTE(review): presumably _deferredExecutionLock guards
// _deferredExecutionList - the code using them is outside this section.
1938  typedef std::vector<DeferredExecutionRequest> DeferredExecutionList;
1939  Mutex _deferredExecutionLock;
1940  DeferredExecutionList _deferredExecutionList;
// Both are initialized to 0 by the constructor.
1941  unsigned _heartbeatInterval;
1942  unsigned _readTimeout;
1943 
1944  void broadcastConnectionStateChanged(ConnectionStateListener::State newState_)
1945  {
1946  // If we disconnected before we got to notification, don't notify.
1947  // This should only be able to happen for Resubscribed, since the lock
1948  // is released to let the subscription manager run resubscribe so a
1949  // disconnect could be called before the change is broadcast.
1950  if (!_connected && newState_ > ConnectionStateListener::Connected)
1951  {
1952  return;
1953  }
1954  for (ConnectionStateListeners::iterator it = _connectionStateListeners.begin(); it != _connectionStateListeners.end(); ++it)
1955  {
1956  AMPS_CALL_EXCEPTION_WRAPPER(
1957  (*it)->connectionStateChanged(newState_));
1958  }
1959  }
1960  unsigned processedAck(Message& message);
1961  unsigned persistedAck(Message& meesage);
1962  void lastChance(Message& message);
1963  void checkAndSendHeartbeat(bool force = false);
1964  virtual ConnectionInfo getConnectionInfo() const;
1965  static amps_result
1966  ClientImplMessageHandler(amps_handle message, void* userData);
1967  static void
1968  ClientImplPreDisconnectHandler(amps_handle client, unsigned failedConnectionVersion, void* userData);
1969  static amps_result
1970  ClientImplDisconnectHandler(amps_handle client, void* userData);
1971 
1972  void unsubscribeInternal(const std::string& id)
1973  {
1974  if (id.empty())
1975  {
1976  return;
1977  }
1978  // remove the handler first to avoid any more message delivery
1979  Message::Field subId;
1980  subId.assign(id.data(), id.length());
1981  _routes.removeRoute(subId);
1982  // Lock is already acquired
1983  if (_subscriptionManager)
1984  {
1985  // Have to unlock before calling into sub manager to avoid deadlock
1986  Unlock<Mutex> unlock(_lock);
1987  _subscriptionManager->unsubscribe(subId);
1988  }
1989  _message.reset();
1990  _message.setCommandEnum(Message::Command::Unsubscribe);
1991  _message.newCommandId();
1992  _message.setSubscriptionId(id);
1993  _sendWithoutRetry(_message);
1994  deferredExecution(&amps_noOpFn, NULL);
1995  }
1996 
1997  AckResponse syncAckProcessing(long timeout_, Message& message_,
1998  bool isHASubscribe_)
1999  {
2000  return syncAckProcessing(timeout_, message_,
2001  (amps_uint64_t)0, isHASubscribe_);
2002  }
2003 
2004  AckResponse syncAckProcessing(long timeout_, Message& message_,
2005  amps_uint64_t haSeq = (amps_uint64_t)0,
2006  bool isHASubscribe_ = false)
2007  {
2008  // inv: we already have _lock locked up.
2009  AckResponse ack = AckResponse::create();
2010  if (1)
2011  {
2012  Lock<Mutex> guard(_ackMapLock);
2013  _ackMap[message_.getCommandId()] = ack;
2014  }
2015  ack.setConnectionVersion((unsigned)_send(message_, haSeq, isHASubscribe_));
2016  if (ack.getConnectionVersion() == 0)
2017  {
2018  // Send failed
2019  throw DisconnectedException("Connection closed while waiting for response.");
2020  }
2021  bool timedOut = false;
2022  AMPS_START_TIMER(timeout_)
2023  while (!timedOut && !ack.responded() && !ack.abandoned() && _connected)
2024  {
2025  if (timeout_)
2026  {
2027  timedOut = !_lock.wait(timeout_);
2028  // May have woken up early, check real time
2029  if (timedOut)
2030  {
2031  AMPS_RESET_TIMER(timedOut, timeout_);
2032  }
2033  }
2034  else
2035  {
2036  // Using a timeout version to ensure python can interrupt
2037  _lock.wait(1000);
2038  Unlock<Mutex> unlck(_lock);
2039  amps_invoke_waiting_function();
2040  }
2041  }
2042  if (ack.responded())
2043  {
2044  if (ack.status() != "failure")
2045  {
2046  if (message_.getCommand() == "logon")
2047  {
2048  amps_uint64_t ackSequence = ack.sequenceNo();
2049  if (_lastSentHaSequenceNumber < ackSequence)
2050  {
2051  _lastSentHaSequenceNumber = ackSequence;
2052  }
2053  if (_publishStore.isValid())
2054  {
2055  // If this throws, logon will fail and eitehr be
2056  // handled in HAClient/ServerChooser or by the caller
2057  // of logon.
2058  _publishStore.discardUpTo(ackSequence);
2059  if (_lastSentHaSequenceNumber < _publishStore.getLastPersisted())
2060  {
2061  _lastSentHaSequenceNumber = _publishStore.getLastPersisted();
2062  }
2063  }
2064  _nameHash = ack.bookmark().substr(0, ack.bookmark().find('|'));
2065  _nameHashValue = ack.nameHashValue();
2066  _serverVersion = ack.serverVersion();
2067  if (_bookmarkStore.isValid())
2068  {
2069  _bookmarkStore.setServerVersion(_serverVersion);
2070  }
2071  }
2072  if (_ackBatchSize)
2073  {
2074  const std::string& options = ack.options();
2075  size_t index = options.find_first_of("max_backlog=");
2076  if (index != std::string::npos)
2077  {
2078  unsigned data = 0;
2079  const char* c = options.c_str() + index + 12;
2080  while (*c && *c != ',')
2081  {
2082  data = (data * 10) + (unsigned)(*c++ -48);
2083  }
2084  if (_ackBatchSize > data)
2085  {
2086  _ackBatchSize = data;
2087  }
2088  }
2089  }
2090  return ack;
2091  }
2092  const size_t NotEntitled = 12;
2093  std::string ackReason = ack.reason();
2094  if (ackReason.length() == 0)
2095  {
2096  return ack; // none
2097  }
2098  if (ackReason.length() == NotEntitled &&
2099  ackReason[0] == 'n' &&
2100  message_.getUserId().len() == 0)
2101  {
2102  message_.assignUserId(_username);
2103  }
2104  message_.throwFor(_client, ackReason);
2105  }
2106  else // !ack.responded()
2107  {
2108  if (!ack.abandoned())
2109  {
2110  throw TimedOutException("timed out waiting for operation.");
2111  }
2112  else
2113  {
2114  throw DisconnectedException("Connection closed while waiting for response.");
2115  }
2116  }
2117  return ack;
2118  }
2119 
2120  void _cleanup(void)
2121  {
2122  if (!_client)
2123  {
2124  return;
2125  }
2126  amps_client_set_predisconnect_handler(_client, NULL, 0L);
2127  amps_client_set_disconnect_handler(_client, NULL, 0L);
2128  AMPS_CALL_EXCEPTION_WRAPPER(ClientImpl::disconnect());
2129  _pEmptyMessageStream.reset(NULL);
2130  amps_client_destroy(_client);
2131  _client = NULL;
2132  }
2133 
2134  public:
2135 
2136  ClientImpl(const std::string& clientName)
2137  : _client(NULL), _name(clientName)
2138  , _isRetryOnDisconnect(true)
2139  , _lastSentHaSequenceNumber((amps_uint64_t)0), _logonInProgress(0)
2140  , _badTimeToHASubscribe(0), _serverVersion()
2141  , _queueAckTimeout(AMPS_DEFAULT_QUEUE_ACK_TIMEOUT)
2142  , _isAutoAckEnabled(false)
2143  , _ackBatchSize(0)
2144  , _queuedAckCount(0)
2145  , _defaultMaxDepth(0)
2146  , _connected(false)
2147  , _heartbeatInterval(0)
2148  , _readTimeout(0)
2149  {
2150  _replayer.setClient(this);
2151  _client = amps_client_create(clientName.c_str());
2153  (amps_handler)ClientImpl::ClientImplMessageHandler,
2154  this);
2156  (amps_predisconnect_handler)ClientImpl::ClientImplPreDisconnectHandler,
2157  this);
2159  (amps_handler)ClientImpl::ClientImplDisconnectHandler,
2160  this);
2161  _exceptionListener = &_defaultExceptionListener;
2162  for (size_t i = 0; i < GlobalCommandTypeHandlers::COUNT; ++i)
2163  {
2164 #ifdef AMPS_USE_EMPLACE
2165  _globalCommandTypeHandlers.emplace_back(MessageHandler());
2166 #else
2167  _globalCommandTypeHandlers.push_back(MessageHandler());
2168 #endif
2169  }
2170  }
2171 
2172  virtual ~ClientImpl()
2173  {
2174  _cleanup();
2175  }
2176 
2177  const std::string& getName() const
2178  {
2179  return _name;
2180  }
2181 
2182  const std::string& getNameHash() const
2183  {
2184  return _nameHash;
2185  }
2186 
2187  const amps_uint64_t getNameHashValue() const
2188  {
2189  return _nameHashValue;
2190  }
2191 
2192  void setName(const std::string& name)
2193  {
2194  // This operation will fail if the client's
2195  // name is already set.
2196  amps_result result = amps_client_set_name(_client, name.c_str());
2197  if (result != AMPS_E_OK)
2198  {
2199  AMPSException::throwFor(_client, result);
2200  }
2201  _name = name;
2202  }
2203 
2204  const std::string& getLogonCorrelationData() const
2205  {
2206  return _logonCorrelationData;
2207  }
2208 
2209  void setLogonCorrelationData(const std::string& logonCorrelationData_)
2210  {
2211  _logonCorrelationData = logonCorrelationData_;
2212  }
2213 
2214  size_t getServerVersion() const
2215  {
2216  return _serverVersion.getOldStyleVersion();
2217  }
2218 
2219  VersionInfo getServerVersionInfo() const
2220  {
2221  return _serverVersion;
2222  }
2223 
2224  const std::string& getURI() const
2225  {
2226  return _lastUri;
2227  }
2228 
2229  virtual void connect(const std::string& uri)
2230  {
2231  Lock<Mutex> l(_lock);
2232  _connect(uri);
2233  }
2234 
2235  virtual void _connect(const std::string& uri)
2236  {
2237  _lastUri = uri;
2238  amps_result result = amps_client_connect(_client, uri.c_str());
2239  if (result != AMPS_E_OK)
2240  {
2241  AMPSException::throwFor(_client, result);
2242  }
2243  _message.reset();
2244  _deltaMessage.setCommandEnum(Message::Command::DeltaPublish);
2245  _publishMessage.setCommandEnum(Message::Command::Publish);
2246  _beatMessage.setCommandEnum(Message::Command::Heartbeat);
2247  _beatMessage.setOptions("beat");
2248  _readMessage.setClientImpl(this);
2249  if (_queueAckTimeout)
2250  {
2251  result = amps_client_set_idle_time(_client, _queueAckTimeout);
2252  if (result != AMPS_E_OK)
2253  {
2254  AMPSException::throwFor(_client, result);
2255  }
2256  }
2257  _connected = true;
2258  broadcastConnectionStateChanged(ConnectionStateListener::Connected);
2259  }
2260 
2261  void setDisconnected()
2262  {
2263  {
2264  Lock<Mutex> l(_lock);
2265  if (_connected)
2266  {
2267  AMPS_CALL_EXCEPTION_WRAPPER(broadcastConnectionStateChanged(ConnectionStateListener::Disconnected));
2268  }
2269  _connected = false;
2270  _heartbeatTimer.setTimeout(0.0);
2271  }
2272  clearAcks(INT_MAX);
2273  amps_client_disconnect(_client);
2274  _routes.clear();
2275  }
2276 
2277  virtual void disconnect()
2278  {
2279  AMPS_CALL_EXCEPTION_WRAPPER(flushAcks());
2280  setDisconnected();
2281  AMPS_CALL_EXCEPTION_WRAPPER(processDeferredExecutions());
2282  Lock<Mutex> l(_lock);
2283  broadcastConnectionStateChanged(ConnectionStateListener::Shutdown);
2284  }
2285 
2286  void clearAcks(unsigned failedVersion)
2287  {
2288  // Have to lock to prevent race conditions
2289  Lock<Mutex> guard(_ackMapLock);
2290  {
2291  // Go ahead and signal any waiters if they are around...
2292  std::vector<std::string> worklist;
2293  for (AckMap::iterator i = _ackMap.begin(), e = _ackMap.end(); i != e; ++i)
2294  {
2295  if (i->second.getConnectionVersion() <= failedVersion)
2296  {
2297  i->second.setAbandoned();
2298  worklist.push_back(i->first);
2299  }
2300  }
2301 
2302  for (std::vector<std::string>::iterator j = worklist.begin(), e = worklist.end(); j != e; ++j)
2303  {
2304  _ackMap.erase(*j);
2305  }
2306  }
2307 
2308  _lock.signalAll();
2309  }
2310 
2311  int send(const Message& message)
2312  {
2313  Lock<Mutex> l(_lock);
2314  return _send(message);
2315  }
2316 
2317  void sendWithoutRetry(const Message& message_)
2318  {
2319  Lock<Mutex> l(_lock);
2320  // If we got here while logon was in progress, then we tried to send
2321  // while we were disconnected so throw DisconnectedException
2322  if (_logonInProgress)
2323  {
2324  throw DisconnectedException("The client has been disconnected.");
2325  }
2326  _sendWithoutRetry(message_);
2327  }
2328 
2329  void _sendWithoutRetry(const Message& message_)
2330  {
2331  amps_result result = amps_client_send(_client, message_.getMessage());
2332  if (result != AMPS_E_OK)
2333  {
2334  AMPSException::throwFor(_client, result);
2335  }
2336  }
2337 
2338  int _send(const Message& message, amps_uint64_t haSeq = (amps_uint64_t)0,
2339  bool isHASubscribe_ = false)
2340  {
2341  // Lock is already acquired
2342  amps_result result = AMPS_E_RETRY;
2343 
2344  // Create a local reference to this message, as we'll need to hold on
2345  // to a reference to it in case reconnect occurs.
2346  Message localMessage = message;
2347  unsigned version = 0;
2348 
2349  while (result == AMPS_E_RETRY)
2350  {
2351  if (haSeq && _logonInProgress)
2352  {
2353  // If retrySend is disabled, do not wait for the reconnect
2354  // to finish, just throw.
2355  if (!_isRetryOnDisconnect)
2356  {
2357  AMPSException::throwFor(_client, AMPS_E_RETRY);
2358  }
2359  if (!_lock.wait(1000))
2360  {
2361  amps_invoke_waiting_function();
2362  }
2363  }
2364  else
2365  {
2366  if ((haSeq && haSeq <= _lastSentHaSequenceNumber) ||
2367  (isHASubscribe_ && _badTimeToHASubscribe))
2368  {
2369  return (int)version;
2370  }
2371  // It's possible to get here out of order, but this way we'll
2372  // always send in order.
2373  if (haSeq > _lastSentHaSequenceNumber)
2374  {
2375  while (haSeq > _lastSentHaSequenceNumber + 1)
2376  {
2377  try
2378  {
2379  // Replayer updates _lastSentHaSsequenceNumber
2380  if (!_publishStore.replaySingle(_replayer,
2381  _lastSentHaSequenceNumber + 1))
2382  {
2383  //++_lastSentHaSequenceNumber;
2384  continue;
2385  }
2386  result = AMPS_E_OK;
2387  version = _replayer._version;
2388  }
2389 #ifdef _WIN32
2390  catch (const DisconnectedException&)
2391 #else
2392  catch (const DisconnectedException& e)
2393 #endif
2394  {
2395  result = _replayer._res;
2396  break;
2397  }
2398  }
2399  result = amps_client_send_with_version(_client,
2400  localMessage.getMessage(),
2401  &version);
2402  ++_lastSentHaSequenceNumber;
2403  }
2404  else
2405  {
2406  if (_logonInProgress && localMessage.getCommand().data()[0] != 'l')
2407  {
2408  while (_logonInProgress)
2409  {
2410  if (!_lock.wait(1000))
2411  {
2412  amps_invoke_waiting_function();
2413  }
2414  }
2415  }
2416  result = amps_client_send_with_version(_client,
2417  localMessage.getMessage(),
2418  &version);
2419  }
2420  if (result != AMPS_E_OK)
2421  {
2422  if (!isHASubscribe_ && !haSeq &&
2423  localMessage.getMessage() == message.getMessage())
2424  {
2425  localMessage = message.deepCopy();
2426  }
2427  if (_isRetryOnDisconnect)
2428  {
2429  Unlock<Mutex> u(_lock);
2430  result = amps_client_attempt_reconnect(_client, version);
2431  // If this is an HA publish or subscribe command, it was
2432  // stored first and will have already been replayed by the
2433  // store or sub manager after reconnect, so just return.
2434  if ((isHASubscribe_ || haSeq) &&
2435  result == AMPS_E_RETRY)
2436  {
2437  return (int)version;
2438  }
2439  }
2440  else
2441  {
2442  // retrySend is disabled so throw the error
2443  // from the send as an exception, do not retry.
2444  AMPSException::throwFor(_client, result);
2445  }
2446  }
2447  }
2448  if (result == AMPS_E_RETRY)
2449  {
2450  amps_invoke_waiting_function();
2451  }
2452  }
2453 
2454  if (result != AMPS_E_OK)
2455  {
2456  AMPSException::throwFor(_client, result);
2457  }
2458  return (int)version;
2459  }
2460 
2461  void addMessageHandler(const Field& commandId_,
2462  const AMPS::MessageHandler& messageHandler_,
2463  unsigned requestedAcks_, Message::Command::Type commandType_)
2464  {
2465  Lock<Mutex> lock(_lock);
2466  _routes.addRoute(commandId_, messageHandler_, requestedAcks_,
2467  0, commandType_);
2468  }
2469 
2470  bool removeMessageHandler(const Field& commandId_)
2471  {
2472  Lock<Mutex> lock(_lock);
2473  return _routes.removeRoute(commandId_);
2474  }
2475 
2476  std::string send(const MessageHandler& messageHandler_, Message& message_, int timeout_ = 0)
2477  {
2478  Field id = message_.getCommandId();
2479  Field subId = message_.getSubscriptionId();
2480  Field qid = message_.getQueryId();
2481  bool isSubscribeOnly = false;
2482  bool replace = false;
2483  unsigned requestedAcks = message_.getAckTypeEnum();
2484  unsigned systemAddedAcks = Message::AckType::None;
2485  Message::Command::Type commandType = message_.getCommandEnum();
2486 
2487  switch (commandType)
2488  {
2489  case Message::Command::Subscribe:
2490  case Message::Command::DeltaSubscribe:
2491  replace = message_.getOptions().operator std::string().find(AMPS_OPTIONS_REPLACE, 0, strlen(AMPS_OPTIONS_REPLACE) - 1) != std::string::npos;
2492  isSubscribeOnly = true;
2493  // fall through
2494  case Message::Command::SOWAndSubscribe:
2495  case Message::Command::SOWAndDeltaSubscribe:
2496  if (id.empty())
2497  {
2498  id = message_.newCommandId().getCommandId();
2499  }
2500  else
2501  {
2502  while (!replace && id != subId && _routes.hasRoute(id))
2503  {
2504  id = message_.newCommandId().getCommandId();
2505  }
2506  }
2507  if (subId.empty())
2508  {
2509  message_.setSubscriptionId(id);
2510  subId = id;
2511  }
2512  if (!message_.getBookmark().empty() && _bookmarkStore.isValid())
2513  {
2514  systemAddedAcks |= Message::AckType::Persisted;
2515  }
2516  // fall through
2517  case Message::Command::SOW:
2518  if (id.empty())
2519  {
2520  id = message_.newCommandId().getCommandId();
2521  }
2522  else
2523  {
2524  while (!replace && id != subId && _routes.hasRoute(id))
2525  {
2526  message_.newCommandId();
2527  if (qid == id)
2528  {
2529  qid = message_.getCommandId();
2530  message_.setQueryId(qid);
2531  }
2532  id = message_.getCommandId();
2533  }
2534  }
2535  if (!isSubscribeOnly)
2536  {
2537  if (qid.empty())
2538  {
2539  message_.setQueryID(id);
2540  qid = id;
2541  }
2542  else
2543  {
2544  while (!replace && qid != subId && qid != id
2545  && _routes.hasRoute(qid))
2546  {
2547  qid = message_.newQueryId().getQueryId();
2548  }
2549  }
2550  }
2551  systemAddedAcks |= Message::AckType::Processed;
2552  message_.setAckTypeEnum(requestedAcks | systemAddedAcks);
2553  {
2554  int routesAdded = 0;
2555  Lock<Mutex> l(_lock);
2556  if (!subId.empty() && messageHandler_.isValid())
2557  {
2558  if (!_routes.hasRoute(subId))
2559  {
2560  ++routesAdded;
2561  }
2562  // This can replace a non-subscribe with a matching id
2563  // with a subscription but not another subscription.
2564  _routes.addRoute(subId, messageHandler_, requestedAcks,
2565  systemAddedAcks, commandType);
2566  }
2567  if (!isSubscribeOnly && !qid.empty()
2568  && messageHandler_.isValid() && qid != subId)
2569  {
2570  if (routesAdded == 0)
2571  {
2572  _routes.addRoute(qid, messageHandler_,
2573  requestedAcks, systemAddedAcks, commandType);
2574  }
2575  else
2576  {
2577  void* data = NULL;
2578  {
2579  Unlock<Mutex> u(_lock);
2580  data = amps_invoke_copy_route_function(
2581  messageHandler_.userData());
2582  }
2583  if (!data)
2584  {
2585  _routes.addRoute(qid, messageHandler_, requestedAcks,
2586  systemAddedAcks, commandType);
2587  }
2588  else
2589  {
2590  _routes.addRoute(qid,
2591  MessageHandler(messageHandler_.function(),
2592  data),
2593  requestedAcks, systemAddedAcks, commandType);
2594  }
2595  }
2596  ++routesAdded;
2597  }
2598  if (!id.empty() && messageHandler_.isValid()
2599  && requestedAcks & ~Message::AckType::Persisted
2600  && id != subId && id != qid)
2601  {
2602  if (routesAdded == 0)
2603  {
2604  _routes.addRoute(id, messageHandler_, requestedAcks,
2605  systemAddedAcks, commandType);
2606  }
2607  else
2608  {
2609  void* data = NULL;
2610  {
2611  Unlock<Mutex> u(_lock);
2612  data = amps_invoke_copy_route_function(
2613  messageHandler_.userData());
2614  }
2615  if (!data)
2616  {
2617  _routes.addRoute(id, messageHandler_, requestedAcks,
2618  systemAddedAcks, commandType);
2619  }
2620  else
2621  {
2622  _routes.addRoute(id,
2623  MessageHandler(messageHandler_.function(),
2624  data),
2625  requestedAcks,
2626  systemAddedAcks, commandType);
2627  }
2628  }
2629  ++routesAdded;
2630  }
2631  try
2632  {
2633  // We aren't adding to subscription manager, so this isn't
2634  // an HA subscribe.
2635  syncAckProcessing(timeout_, message_, 0, false);
2636  message_.setAckTypeEnum(requestedAcks);
2637  }
2638  catch (...)
2639  {
2640  _routes.removeRoute(message_.getQueryID());
2641  _routes.removeRoute(message_.getSubscriptionId());
2642  _routes.removeRoute(id);
2643  message_.setAckTypeEnum(requestedAcks);
2644  throw;
2645  }
2646  }
2647  break;
2648  // These are valid commands that are used as-is
2649  case Message::Command::Unsubscribe:
2650  case Message::Command::Heartbeat:
2651  case Message::Command::Logon:
2652  case Message::Command::StartTimer:
2653  case Message::Command::StopTimer:
2654  case Message::Command::SOWDelete:
2655  {
2656  Lock<Mutex> l(_lock);
2657  // if an ack is requested, it'll need a command ID.
2658  if (message_.getAckTypeEnum() != Message::AckType::None)
2659  {
2660  if (id.empty())
2661  {
2662  message_.newCommandId();
2663  id = message_.getCommandId();
2664  }
2665  if (messageHandler_.isValid())
2666  {
2667  _routes.addRoute(id, messageHandler_, requestedAcks,
2668  Message::AckType::None, commandType);
2669  }
2670  }
2671  _send(message_);
2672  }
2673  break;
2674  case Message::Command::DeltaPublish:
2675  case Message::Command::Publish:
2676  {
2677  bool useSync = message_.getFilter().len() > 0;
2678  Lock<Mutex> l(_lock);
2679  // if an ack is requested, it'll need a command ID.
2680  unsigned ackType = message_.getAckTypeEnum();
2681  if (ackType != Message::AckType::None
2682  || useSync)
2683  {
2684  if (id.empty())
2685  {
2686  message_.newCommandId();
2687  id = message_.getCommandId();
2688  }
2689  if (messageHandler_.isValid())
2690  {
2691  _routes.addRoute(id, messageHandler_, requestedAcks,
2692  Message::AckType::None, commandType);
2693  }
2694  }
2695  if (useSync)
2696  {
2697  message_.setAckTypeEnum(ackType | Message::AckType::Processed);
2698  syncAckProcessing(timeout_, message_, 0, false);
2699  }
2700  else
2701  {
2702  _send(message_);
2703  }
2704  }
2705  break;
2706  // These are things that shouldn't be sent (not meaningful)
2707  case Message::Command::GroupBegin:
2708  case Message::Command::GroupEnd:
2709  case Message::Command::OOF:
2710  case Message::Command::Ack:
2711  case Message::Command::Unknown:
2712  default:
2713  throw CommandException("Command type " + message_.getCommand() + " can not be sent directly to AMPS");
2714  }
2715  message_.setAckTypeEnum(requestedAcks);
2716  return id;
2717  }
2718 
2719  void setDisconnectHandler(const DisconnectHandler& disconnectHandler)
2720  {
2721  Lock<Mutex> l(_lock);
2722  _disconnectHandler = disconnectHandler;
2723  }
2724 
2725  void setGlobalCommandTypeMessageHandler(const std::string& command_, const MessageHandler& handler_)
2726  {
2727  switch (command_[0])
2728  {
2729 #if 0 // Not currently implemented to avoid an extra branch in delivery
2730  case 'p':
2731  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::Publish] = handler_;
2732  break;
2733  case 's':
2734  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::SOW] = handler_;
2735  break;
2736 #endif
2737  case 'h':
2738  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::Heartbeat] = handler_;
2739  break;
2740 #if 0 // Not currently implemented to avoid an extra branch in delivery
2741  case 'g':
2742  if (command_[6] == 'b')
2743  {
2744  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::GroupBegin] = handler_;
2745  }
2746  else if (command_[6] == 'e')
2747  {
2748  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::GroupEnd] = handler_;
2749  }
2750  else
2751  {
2752  std::ostringstream os;
2753  os << "Invalid command '" << command_ << "' passed to setGlobalCommandTypeHandler";
2754  throw CommandException(os.str());
2755  }
2756  break;
2757  case 'o':
2758  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::OOF] = handler_;
2759  break;
2760 #endif
2761  case 'a':
2762  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::Ack] = handler_;
2763  break;
2764  case 'l':
2765  case 'L':
2766  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::LastChance] = handler_;
2767  break;
2768  case 'd':
2769  case 'D':
2770  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::DuplicateMessage] = handler_;
2771  break;
2772  default:
2773  std::ostringstream os;
2774  os << "Invalid command '" << command_ << "' passed to setGlobalCommandTypeHandler";
2775  throw CommandException(os.str());
2776  break;
2777  }
2778  }
2779 
2780  void setGlobalCommandTypeMessageHandler(const Message::Command::Type command_, const MessageHandler& handler_)
2781  {
2782  switch (command_)
2783  {
2784 #if 0 // Not currently implemented to avoid an extra branch in delivery
2785  case Message::Command::Publish:
2786  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::Publish] = handler_;
2787  break;
2788  case Message::Command::SOW:
2789  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::SOW] = handler_;
2790  break;
2791 #endif
2792  case Message::Command::Heartbeat:
2793  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::Heartbeat] = handler_;
2794  break;
2795 #if 0 // Not currently implemented to avoid an extra branch in delivery
2796  case Message::Command::GroupBegin:
2797  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::GroupBegin] = handler_;
2798  break;
2799  case Message::Command::GroupEnd:
2800  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::GroupEnd] = handler_;
2801  break;
2802  case Message::Command::OOF:
2803  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::OOF] = handler_;
2804  break;
2805 #endif
2806  case Message::Command::Ack:
2807  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::Ack] = handler_;
2808  break;
2809  default:
2810  unsigned bits = 0;
2811  unsigned command = command_;
2812  while (command > 0)
2813  {
2814  ++bits;
2815  command >>= 1;
2816  }
2817  char errBuf[128];
2818  AMPS_snprintf(errBuf, sizeof(errBuf),
2819  "Invalid command '%.*s' passed to setGlobalCommandTypeHandler",
2820  CommandConstants<0>::Lengths[bits],
2821  CommandConstants<0>::Values[bits]);
2822  throw CommandException(errBuf);
2823  break;
2824  }
2825  }
2826 
2827  void setGlobalCommandTypeMessageHandler(const GlobalCommandTypeHandlers handlerType_, const MessageHandler& handler_)
2828  {
2829  _globalCommandTypeHandlers[handlerType_] = handler_;
2830  }
2831 
2832  void setFailedWriteHandler(FailedWriteHandler* handler_)
2833  {
2834  Lock<Mutex> l(_lock);
2835  _failedWriteHandler.reset(handler_);
2836  }
2837 
2838  void setPublishStore(const Store& publishStore_)
2839  {
2840  Lock<Mutex> l(_lock);
2841  if (_connected)
2842  {
2843  throw AlreadyConnectedException("Setting a publish store on a connected client is undefined behavior");
2844  }
2845  _publishStore = publishStore_;
2846  }
2847 
2848  void setBookmarkStore(const BookmarkStore& bookmarkStore_)
2849  {
2850  Lock<Mutex> l(_lock);
2851  if (_connected)
2852  {
2853  throw AlreadyConnectedException("Setting a bookmark store on a connected client is undefined behavior");
2854  }
2855  _bookmarkStore = bookmarkStore_;
2856  }
2857 
2858  void setSubscriptionManager(SubscriptionManager* subscriptionManager_)
2859  {
2860  Lock<Mutex> l(_lock);
2861  _subscriptionManager.reset(subscriptionManager_);
2862  }
2863 
2864  SubscriptionManager* getSubscriptionManager() const
2865  {
2866  return const_cast<SubscriptionManager*>(_subscriptionManager.get());
2867  }
2868 
2869  DisconnectHandler getDisconnectHandler() const
2870  {
2871  return _disconnectHandler;
2872  }
2873 
2874  MessageHandler getDuplicateMessageHandler() const
2875  {
2876  return _globalCommandTypeHandlers[GlobalCommandTypeHandlers::DuplicateMessage];
2877  }
2878 
2879  FailedWriteHandler* getFailedWriteHandler() const
2880  {
2881  return const_cast<FailedWriteHandler*>(_failedWriteHandler.get());
2882  }
2883 
2884  Store getPublishStore() const
2885  {
2886  return _publishStore;
2887  }
2888 
2889  BookmarkStore getBookmarkStore() const
2890  {
2891  return _bookmarkStore;
2892  }
2893 
2894  amps_uint64_t publish(const char* topic_, size_t topicLen_, const char* data_, size_t dataLen_)
2895  {
2896  if (!_publishStore.isValid())
2897  {
2898  Lock<Mutex> l(_lock);
2899  _publishMessage.assignTopic(topic_, topicLen_);
2900  _publishMessage.assignData(data_, dataLen_);
2901  _send(_publishMessage);
2902  return 0;
2903  }
2904  else
2905  {
2906  publishStoreMessage.reset();
2907  publishStoreMessage.setCommandEnum(Message::Command::Publish);
2908  return _publish(topic_, topicLen_, data_, dataLen_);
2909  }
2910  }
2911 
2912  amps_uint64_t publish(const char* topic_, size_t topicLen_, const char* data_,
2913  size_t dataLen_, unsigned long expiration_)
2914  {
2915  if (!_publishStore.isValid())
2916  {
2917  Lock<Mutex> l(_lock);
2918  _publishMessage.assignTopic(topic_, topicLen_);
2919  _publishMessage.assignData(data_, dataLen_);
2920  char exprBuf[AMPS_NUMBER_BUFFER_LEN];
2921  size_t pos = convertToCharArray(exprBuf, expiration_);
2922  _publishMessage.assignExpiration(exprBuf + pos, AMPS_NUMBER_BUFFER_LEN - pos);
2923  _send(_publishMessage);
2924  _publishMessage.assignExpiration(NULL, 0);
2925  return 0;
2926  }
2927  else
2928  {
2929  publishStoreMessage.reset();
2930  char exprBuf[AMPS_NUMBER_BUFFER_LEN];
2931  size_t exprPos = convertToCharArray(exprBuf, expiration_);
2932  publishStoreMessage.setCommandEnum(Message::Command::Publish)
2933  .assignExpiration(exprBuf + exprPos,
2934  AMPS_NUMBER_BUFFER_LEN - exprPos);
2935  return _publish(topic_, topicLen_, data_, dataLen_);
2936  }
2937  }
2938 
2939  class FlushAckHandler : ConnectionStateListener
2940  {
2941  private:
2942  ClientImpl* _pClient;
2943  Field _cmdId;
2944 #if __cplusplus >= 201100L || _MSC_VER >= 1900
2945  std::atomic<bool> _acked;
2946  std::atomic<bool> _disconnected;
2947 #else
2948  volatile bool _acked;
2949  volatile bool _disconnected;
2950 #endif
2951  public:
2952  FlushAckHandler(ClientImpl* pClient_)
2953  : _pClient(pClient_), _cmdId(), _acked(false), _disconnected(false)
2954  {
2955  pClient_->addConnectionStateListener(this);
2956  }
2957  ~FlushAckHandler()
2958  {
2959  _pClient->removeConnectionStateListener(this);
2960  _pClient->removeMessageHandler(_cmdId);
2961  _cmdId.clear();
2962  }
2963  void setCommandId(const Field& cmdId_)
2964  {
2965  _cmdId.deepCopy(cmdId_);
2966  }
2967  void invoke(const Message&)
2968  {
2969  _acked = true;
2970  }
2971  void connectionStateChanged(State state_)
2972  {
2973  if (state_ <= Shutdown)
2974  {
2975  _disconnected = true;
2976  }
2977  }
2978  bool acked()
2979  {
2980  return _acked;
2981  }
2982  bool done()
2983  {
2984  return _acked || _disconnected;
2985  }
2986  };
2987 
2988  void publishFlush(long timeout_, unsigned ackType_)
2989  {
2990  static const char* processed = "processed";
2991  static const size_t processedLen = strlen(processed);
2992  static const char* persisted = "persisted";
2993  static const size_t persistedLen = strlen(persisted);
2994  static const char* flush = "flush";
2995  static const size_t flushLen = strlen(flush);
2996  static VersionInfo minPersisted("5.3.3.0");
2997  static VersionInfo minFlush("4");
2998  if (ackType_ != Message::AckType::Processed
2999  && ackType_ != Message::AckType::Persisted)
3000  {
3001  throw CommandException("Flush can only be used with processed or persisted acks.");
3002  }
3003  FlushAckHandler flushHandler(this);
3004  if (_serverVersion >= minFlush)
3005  {
3006  Lock<Mutex> l(_lock);
3007  if (!_connected)
3008  {
3009  throw DisconnectedException("Not connected trying to flush");
3010  }
3011  _message.reset();
3012  _message.newCommandId();
3013  _message.assignCommand(flush, flushLen);
3014  if (_serverVersion < minPersisted
3015  || ackType_ == Message::AckType::Processed)
3016  {
3017  _message.assignAckType(processed, processedLen);
3018  }
3019  else
3020  {
3021  _message.assignAckType(persisted, persistedLen);
3022  }
3023  flushHandler.setCommandId(_message.getCommandId());
3024  addMessageHandler(_message.getCommandId(),
3025  std::bind(&FlushAckHandler::invoke,
3026  std::ref(flushHandler),
3027  std::placeholders::_1),
3028  ackType_, _message.getCommandEnum());
3029  NoDelay noDelay(_client);
3030  if (_send(_message) == -1)
3031  {
3032  throw DisconnectedException("Disconnected trying to flush");
3033  }
3034  }
3035  if (_publishStore.isValid())
3036  {
3037  try
3038  {
3039  _publishStore.flush(timeout_);
3040  }
3041  catch (const AMPSException& ex)
3042  {
3043  AMPS_UNHANDLED_EXCEPTION(ex);
3044  throw;
3045  }
3046  }
3047  else if (_serverVersion < minFlush)
3048  {
3049  if (timeout_ > 0)
3050  {
3051  AMPS_USLEEP(timeout_ * 1000);
3052  }
3053  else
3054  {
3055  AMPS_USLEEP(1000 * 1000);
3056  }
3057  return;
3058  }
3059  if (timeout_)
3060  {
3061  Timer timer((double)timeout_);
3062  timer.start();
3063  while (!timer.check() && !flushHandler.done())
3064  {
3065  AMPS_USLEEP(10000);
3066  amps_invoke_waiting_function();
3067  }
3068  }
3069  else
3070  {
3071  while (!flushHandler.done())
3072  {
3073  AMPS_USLEEP(10000);
3074  amps_invoke_waiting_function();
3075  }
3076  }
3077  // No response or disconnect in timeout interval
3078  if (!flushHandler.done())
3079  {
3080  throw TimedOutException("Timed out waiting for flush");
3081  }
3082  // We got disconnected and there is no publish store
3083  if (!flushHandler.acked() && !_publishStore.isValid())
3084  {
3085  throw DisconnectedException("Disconnected waiting for flush");
3086  }
3087  }
3088 
3089  amps_uint64_t deltaPublish(const char* topic_, size_t topicLength_,
3090  const char* data_, size_t dataLength_)
3091  {
3092  if (!_publishStore.isValid())
3093  {
3094  Lock<Mutex> l(_lock);
3095  _deltaMessage.assignTopic(topic_, topicLength_);
3096  _deltaMessage.assignData(data_, dataLength_);
3097  _send(_deltaMessage);
3098  return 0;
3099  }
3100  else
3101  {
3102  publishStoreMessage.reset();
3103  publishStoreMessage.setCommandEnum(Message::Command::DeltaPublish);
3104  return _publish(topic_, topicLength_, data_, dataLength_);
3105  }
3106  }
3107 
3108  amps_uint64_t deltaPublish(const char* topic_, size_t topicLength_,
3109  const char* data_, size_t dataLength_,
3110  unsigned long expiration_)
3111  {
3112  if (!_publishStore.isValid())
3113  {
3114  Lock<Mutex> l(_lock);
3115  _deltaMessage.assignTopic(topic_, topicLength_);
3116  _deltaMessage.assignData(data_, dataLength_);
3117  char exprBuf[AMPS_NUMBER_BUFFER_LEN];
3118  size_t pos = convertToCharArray(exprBuf, expiration_);
3119  _deltaMessage.assignExpiration(exprBuf + pos, AMPS_NUMBER_BUFFER_LEN - pos);
3120  _send(_deltaMessage);
3121  _deltaMessage.assignExpiration(NULL, 0);
3122  return 0;
3123  }
3124  else
3125  {
3126  publishStoreMessage.reset();
3127  char exprBuf[AMPS_NUMBER_BUFFER_LEN];
3128  size_t exprPos = convertToCharArray(exprBuf, expiration_);
3129  publishStoreMessage.setCommandEnum(Message::Command::DeltaPublish)
3130  .assignExpiration(exprBuf + exprPos,
3131  AMPS_NUMBER_BUFFER_LEN - exprPos);
3132  return _publish(topic_, topicLength_, data_, dataLength_);
3133  }
3134  }
3135 
3136  amps_uint64_t _publish(const char* topic_, size_t topicLength_,
3137  const char* data_, size_t dataLength_)
3138  {
3139  publishStoreMessage.assignTopic(topic_, topicLength_)
3140  .setAckTypeEnum(Message::AckType::Persisted)
3141  .assignData(data_, dataLength_);
3142  amps_uint64_t haSequenceNumber = _publishStore.store(publishStoreMessage);
3143  char buf[AMPS_NUMBER_BUFFER_LEN];
3144  size_t pos = convertToCharArray(buf, haSequenceNumber);
3145  publishStoreMessage.assignSequence(buf + pos, AMPS_NUMBER_BUFFER_LEN - pos);
3146  {
3147  Lock<Mutex> l(_lock);
3148  _send(publishStoreMessage, haSequenceNumber);
3149  }
3150  return haSequenceNumber;
3151  }
3152 
3153  virtual std::string logon(long timeout_, Authenticator& authenticator_,
3154  const char* options_ = NULL)
3155  {
3156  Lock<Mutex> l(_lock);
3157  return _logon(timeout_, authenticator_, options_);
3158  }
3159 
// Builds a logon command in _message and sends it, retrying while the server
// answers with status "retry". On success it broadcasts LoggedOn, re-sends
// the heartbeat and, when a publish store is set, replays the store.
// Returns the command id generated for the logon message.
// logon() acquires _lock before calling this; this function itself takes no
// lock but signals waiters on _lock on every exit path.
// Exceptions from the ack processing or the store replay are rethrown; a
// StoreException during replay is converted to ConnectionException.
3160  virtual std::string _logon(long timeout_, Authenticator& authenticator_,
3161  const char* options_ = NULL)
3162  {
3163  _message.reset();
3164  _message.newCommandId();
// Copy the command id into a std::string now; it is the return value.
3165  std::string newCommandId = _message.getCommandId();
3166  _message.setCommandEnum(Message::Command::Logon);
3167  _message.setClientName(_name);
3168 #ifdef AMPS_CLIENT_VERSION_WITH_LANGUAGE
3169  _message.assignVersion(AMPS_CLIENT_VERSION_WITH_LANGUAGE,
3170  strlen(AMPS_CLIENT_VERSION_WITH_LANGUAGE));
3171 #endif
// Credentials, message type and the "pretty" option come from the last URI
// the client used.
3172  URI uri(_lastUri);
3173  if (uri.user().size())
3174  {
3175  _message.setUserId(uri.user());
3176  }
3177  if (uri.password().size())
3178  {
3179  _message.setPassword(uri.password());
3180  }
3181  if (uri.protocol() == "amps" && uri.messageType().size())
3182  {
3183  _message.setMessageType(uri.messageType());
3184  }
3185  if (uri.isTrue("pretty"))
3186  {
3187  _message.setOptions("pretty");
3188  }
3189 
// The authenticator sees the user id and password set so far and supplies
// the password that is actually sent.
3190  _message.setPassword(authenticator_.authenticate(_message.getUserId(), _message.getPassword()));
3191  if (!_logonCorrelationData.empty())
3192  {
3193  _message.assignCorrelationId(_logonCorrelationData);
3194  }
// NOTE(review): this setOptions call comes after the "pretty" one above, so
// a non-null options_ presumably replaces it - confirm setOptions semantics.
3195  if (options_)
3196  {
3197  _message.setOptions(options_);
3198  }
3199  _username = _message.getUserId();
3200  try
3201  {
3202  AtomicFlagFlip pubFlip(&_logonInProgress);
3203  NoDelay noDelay(_client);
// Keep sending the logon until the server stops answering "retry".
3204  while (true)
3205  {
3206  _message.setAckTypeEnum(Message::AckType::Processed);
3207  AckResponse ack = syncAckProcessing(timeout_, _message);
3208  if (ack.status() == "retry")
3209  {
3210  _message.setPassword(authenticator_.retry(ack.username(), ack.password()));
3211  _username = ack.username();
3212  _message.setUserId(_username);
3213  }
3214  else
3215  {
3216  authenticator_.completed(ack.username(), ack.password(), ack.reason());
3217  break;
3218  }
3219  }
3220  broadcastConnectionStateChanged(ConnectionStateListener::LoggedOn);
3221 
3222  // Now re-send the heartbeat command if configured
3223  _sendHeartbeat();
3224  // Signal any threads waiting for _logonInProgress
3225  _lock.signalAll();
3226  }
3227  catch (const AMPSException& ex)
3228  {
3229  _lock.signalAll();
3230  AMPS_UNHANDLED_EXCEPTION(ex);
3231  throw;
3232  }
3233  catch (...)
3234  {
3235  _lock.signalAll();
3236  throw;
3237  }
3238 
// With a publish store, replay its contents through _replayer and announce
// PublishReplayed; every failure path signals waiters before propagating.
3239  if (_publishStore.isValid())
3240  {
3241  try
3242  {
3243  _publishStore.replay(_replayer);
3244  broadcastConnectionStateChanged(ConnectionStateListener::PublishReplayed);
3245  }
3246  catch (const PublishStoreGapException& ex)
3247  {
3248  _lock.signalAll();
3249  AMPS_UNHANDLED_EXCEPTION(ex);
3250  throw;
3251  }
3252  catch (const StoreException& ex)
3253  {
// Store failures are reported to the caller as ConnectionException.
3254  _lock.signalAll();
3255  std::ostringstream os;
3256  os << "A local store exception occurred while logging on."
3257  << ex.toString();
3258  throw ConnectionException(os.str());
3259  }
3260  catch (const AMPSException& ex)
3261  {
3262  _lock.signalAll();
3263  AMPS_UNHANDLED_EXCEPTION(ex);
3264  throw;
3265  }
3266  catch (const std::exception& ex)
3267  {
3268  _lock.signalAll();
3269  AMPS_UNHANDLED_EXCEPTION(ex);
3270  throw;
3271  }
3272  catch (...)
3273  {
3274  _lock.signalAll();
3275  throw;
3276  }
3277  }
3278  _lock.signalAll();
3279  return newCommandId;
3280  }
3281 
3282  std::string subscribe(const MessageHandler& messageHandler_,
3283  const std::string& topic_,
3284  long timeout_,
3285  const std::string& filter_,
3286  const std::string& bookmark_,
3287  const std::string& options_,
3288  const std::string& subId_,
3289  bool isHASubscribe_ = true)
3290  {
3291  isHASubscribe_ &= (bool)_subscriptionManager;
3292  Lock<Mutex> l(_lock);
3293  _message.reset();
3294  _message.setCommandEnum(Message::Command::Subscribe);
3295  _message.newCommandId();
3296  std::string subId(subId_);
3297  if (subId.empty())
3298  {
3299  if (options_.find(AMPS_OPTIONS_REPLACE, 0, strlen(AMPS_OPTIONS_REPLACE) - 1) != std::string::npos)
3300  {
3301  throw ConnectionException("Cannot issue a replacement subscription; a valid subscription id is required.");
3302  }
3303 
3304  subId = _message.getCommandId();
3305  }
3306  _message.setSubscriptionId(subId);
3307  // we need to deep copy this before sending the message; while we are
3308  // waiting for a response, the fields in _message may get blown away for
3309  // other operations.
3310  AMPS::Message::Field subIdField(subId);
3311  unsigned ackTypes = Message::AckType::Processed;
3312 
3313  if (!bookmark_.empty() && _bookmarkStore.isValid())
3314  {
3315  ackTypes |= Message::AckType::Persisted;
3316  }
3317  _message.setTopic(topic_);
3318 
3319  if (filter_.length())
3320  {
3321  _message.setFilter(filter_);
3322  }
3323  if (bookmark_.length())
3324  {
3325  if (bookmark_ == AMPS_BOOKMARK_RECENT)
3326  {
3327  Message::Field mostRecent = _bookmarkStore.getMostRecent(subIdField);
3328  _message.setBookmark(mostRecent);
3329  }
3330  else
3331  {
3332  _message.setBookmark(bookmark_);
3333  if (_bookmarkStore.isValid())
3334  {
3335  if (bookmark_ != AMPS_BOOKMARK_NOW &&
3336  bookmark_ != AMPS_BOOKMARK_EPOCH)
3337  {
3338  _bookmarkStore.log(_message);
3339  _bookmarkStore.discard(_message);
3340  _bookmarkStore.persisted(subIdField, _message.getBookmark());
3341  }
3342  }
3343  }
3344  }
3345  if (options_.length())
3346  {
3347  _message.setOptions(options_);
3348  }
3349 
3350  Message message = _message;
3351  if (isHASubscribe_)
3352  {
3353  message = _message.deepCopy();
3354  Unlock<Mutex> u(_lock);
3355  _subscriptionManager->subscribe(messageHandler_, message,
3356  Message::AckType::None);
3357  if (_badTimeToHASubscribe)
3358  {
3359  return subId;
3360  }
3361  }
3362  if (!_routes.hasRoute(_message.getSubscriptionId()))
3363  {
3364  _routes.addRoute(_message.getSubscriptionId(), messageHandler_,
3365  Message::AckType::None, ackTypes, _message.getCommandEnum());
3366  }
3367  message.setAckTypeEnum(ackTypes);
3368  if (!options_.empty())
3369  {
3370  message.setOptions(options_);
3371  }
3372  try
3373  {
3374  syncAckProcessing(timeout_, message, isHASubscribe_);
3375  }
3376  catch (const DisconnectedException&)
3377  {
3378  if (!isHASubscribe_)
3379  {
3380  _routes.removeRoute(subIdField);
3381  throw;
3382  }
3383  else
3384  {
3385  AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(subIdField));
3386  throw;
3387  }
3388  }
3389  catch (const TimedOutException&)
3390  {
3391  AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(subIdField));
3392  throw;
3393  }
3394  catch (...)
3395  {
3396  if (isHASubscribe_)
3397  {
3398  // Have to unlock before calling into sub manager to avoid deadlock
3399  Unlock<Mutex> unlock(_lock);
3400  _subscriptionManager->unsubscribe(subIdField);
3401  }
3402  _routes.removeRoute(subIdField);
3403  throw;
3404  }
3405 
3406  return subId;
3407  }
3408  std::string deltaSubscribe(const MessageHandler& messageHandler_,
3409  const std::string& topic_,
3410  long timeout_,
3411  const std::string& filter_,
3412  const std::string& bookmark_,
3413  const std::string& options_,
3414  const std::string& subId_ = "",
3415  bool isHASubscribe_ = true)
3416  {
3417  isHASubscribe_ &= (bool)_subscriptionManager;
3418  Lock<Mutex> l(_lock);
3419  _message.reset();
3420  _message.setCommandEnum(Message::Command::DeltaSubscribe);
3421  _message.newCommandId();
3422  std::string subId(subId_);
3423  if (subId.empty())
3424  {
3425  subId = _message.getCommandId();
3426  }
3427  _message.setSubscriptionId(subId);
3428  // we need to deep copy this before sending the message; while we are
3429  // waiting for a response, the fields in _message may get blown away for
3430  // other operations.
3431  AMPS::Message::Field subIdField(subId);
3432  unsigned ackTypes = Message::AckType::Processed;
3433 
3434  if (!bookmark_.empty() && _bookmarkStore.isValid())
3435  {
3436  ackTypes |= Message::AckType::Persisted;
3437  }
3438  _message.setTopic(topic_);
3439  if (filter_.length())
3440  {
3441  _message.setFilter(filter_);
3442  }
3443  if (bookmark_.length())
3444  {
3445  if (bookmark_ == AMPS_BOOKMARK_RECENT)
3446  {
3447  Message::Field mostRecent = _bookmarkStore.getMostRecent(subIdField);
3448  _message.setBookmark(mostRecent);
3449  }
3450  else
3451  {
3452  _message.setBookmark(bookmark_);
3453  if (_bookmarkStore.isValid())
3454  {
3455  if (bookmark_ != AMPS_BOOKMARK_NOW &&
3456  bookmark_ != AMPS_BOOKMARK_EPOCH)
3457  {
3458  _bookmarkStore.log(_message);
3459  _bookmarkStore.discard(_message);
3460  _bookmarkStore.persisted(subIdField, _message.getBookmark());
3461  }
3462  }
3463  }
3464  }
3465  if (options_.length())
3466  {
3467  _message.setOptions(options_);
3468  }
3469  Message message = _message;
3470  if (isHASubscribe_)
3471  {
3472  message = _message.deepCopy();
3473  Unlock<Mutex> u(_lock);
3474  _subscriptionManager->subscribe(messageHandler_, message,
3475  Message::AckType::None);
3476  if (_badTimeToHASubscribe)
3477  {
3478  return subId;
3479  }
3480  }
3481  if (!_routes.hasRoute(_message.getSubscriptionId()))
3482  {
3483  _routes.addRoute(_message.getSubscriptionId(), messageHandler_,
3484  Message::AckType::None, ackTypes, _message.getCommandEnum());
3485  }
3486  message.setAckTypeEnum(ackTypes);
3487  if (!options_.empty())
3488  {
3489  message.setOptions(options_);
3490  }
3491  try
3492  {
3493  syncAckProcessing(timeout_, message, isHASubscribe_);
3494  }
3495  catch (const DisconnectedException&)
3496  {
3497  if (!isHASubscribe_)
3498  {
3499  _routes.removeRoute(subIdField);
3500  throw;
3501  }
3502  }
3503  catch (const TimedOutException&)
3504  {
3505  AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(subIdField));
3506  throw;
3507  }
3508  catch (...)
3509  {
3510  if (isHASubscribe_)
3511  {
3512  // Have to unlock before calling into sub manager to avoid deadlock
3513  Unlock<Mutex> unlock(_lock);
3514  _subscriptionManager->unsubscribe(subIdField);
3515  }
3516  _routes.removeRoute(subIdField);
3517  throw;
3518  }
3519  return subId;
3520  }
3521 
3522  void unsubscribe(const std::string& id)
3523  {
3524  Lock<Mutex> l(_lock);
3525  unsubscribeInternal(id);
3526  }
3527 
3528  void unsubscribe(void)
3529  {
3530  if (_subscriptionManager)
3531  {
3532  _subscriptionManager->clear();
3533  }
3534  {
3535  _routes.unsubscribeAll();
3536  Lock<Mutex> l(_lock);
3537  _message.reset();
3538  _message.setCommandEnum(Message::Command::Unsubscribe);
3539  _message.newCommandId();
3540  _message.setSubscriptionId("all");
3541  _sendWithoutRetry(_message);
3542  }
3543  deferredExecution(&amps_noOpFn, NULL);
3544  }
3545 
3546  std::string sow(const MessageHandler& messageHandler_,
3547  const std::string& topic_,
3548  const std::string& filter_ = "",
3549  const std::string& orderBy_ = "",
3550  const std::string& bookmark_ = "",
3551  int batchSize_ = AMPS_DEFAULT_BATCH_SIZE,
3552  int topN_ = AMPS_DEFAULT_TOP_N,
3553  const std::string& options_ = "",
3554  long timeout_ = AMPS_DEFAULT_COMMAND_TIMEOUT)
3555  {
3556  Lock<Mutex> l(_lock);
3557  _message.reset();
3558  _message.setCommandEnum(Message::Command::SOW);
3559  _message.newCommandId();
3560  // need to keep our own copy of the command ID.
3561  std::string commandId = _message.getCommandId();
3562  _message.setQueryID(_message.getCommandId());
3563  unsigned ackTypes = Message::AckType::Processed | Message::AckType::Completed;
3564  _message.setAckTypeEnum(ackTypes);
3565  _message.setTopic(topic_);
3566  if (filter_.length())
3567  {
3568  _message.setFilter(filter_);
3569  }
3570  if (orderBy_.length())
3571  {
3572  _message.setOrderBy(orderBy_);
3573  }
3574  if (bookmark_.length())
3575  {
3576  _message.setBookmark(bookmark_);
3577  }
3578  _message.setBatchSize(AMPS::asString(batchSize_));
3579  if (topN_ != AMPS_DEFAULT_TOP_N)
3580  {
3581  _message.setTopNRecordsReturned(AMPS::asString(topN_));
3582  }
3583  if (options_.length())
3584  {
3585  _message.setOptions(options_);
3586  }
3587 
3588  _routes.addRoute(_message.getQueryID(), messageHandler_,
3589  Message::AckType::None, ackTypes, _message.getCommandEnum());
3590 
3591  try
3592  {
3593  syncAckProcessing(timeout_, _message);
3594  }
3595  catch (...)
3596  {
3597  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId));
3598  throw;
3599  }
3600 
3601  return commandId;
3602  }
3603 
3604  std::string sow(const MessageHandler& messageHandler_,
3605  const std::string& topic_,
3606  long timeout_,
3607  const std::string& filter_ = "",
3608  int batchSize_ = AMPS_DEFAULT_BATCH_SIZE,
3609  int topN_ = AMPS_DEFAULT_TOP_N)
3610  {
3611  std::string notSet;
3612  return sow(messageHandler_,
3613  topic_,
3614  filter_,
3615  notSet, // orderBy
3616  notSet, // bookmark
3617  batchSize_,
3618  topN_,
3619  notSet,
3620  timeout_);
3621  }
3622 
3623  std::string sowAndSubscribe(const MessageHandler& messageHandler_,
3624  const std::string& topic_,
3625  const std::string& filter_ = "",
3626  const std::string& orderBy_ = "",
3627  const std::string& bookmark_ = "",
3628  int batchSize_ = AMPS_DEFAULT_BATCH_SIZE,
3629  int topN_ = AMPS_DEFAULT_TOP_N,
3630  const std::string& options_ = "",
3631  long timeout_ = AMPS_DEFAULT_COMMAND_TIMEOUT,
3632  bool isHASubscribe_ = true)
3633  {
3634  isHASubscribe_ &= (bool)_subscriptionManager;
3635  unsigned ackTypes = Message::AckType::Processed;
3636  Lock<Mutex> l(_lock);
3637  _message.reset();
3638  _message.setCommandEnum(Message::Command::SOWAndSubscribe);
3639  _message.newCommandId();
3640  Field cid = _message.getCommandId();
3641  std::string subId = cid;
3642  _message.setQueryID(cid).setSubscriptionId(cid).setTopic(topic_);
3643  if (filter_.length())
3644  {
3645  _message.setFilter(filter_);
3646  }
3647  if (orderBy_.length())
3648  {
3649  _message.setOrderBy(orderBy_);
3650  }
3651  if (bookmark_.length())
3652  {
3653  _message.setBookmark(bookmark_);
3654  Message::Field bookmark = _message.getBookmark();
3655  if (_bookmarkStore.isValid())
3656  {
3657  ackTypes |= Message::AckType::Persisted;
3658  if (bookmark == AMPS_BOOKMARK_RECENT)
3659  {
3660  _message.setBookmark(_bookmarkStore.getMostRecent(_message.getSubscriptionId()));
3661  }
3662  else if (bookmark != AMPS_BOOKMARK_NOW &&
3663  bookmark != AMPS_BOOKMARK_EPOCH)
3664  {
3665  _bookmarkStore.log(_message);
3666  if (!BookmarkRange::isRange(bookmark))
3667  {
3668  _bookmarkStore.discard(_message);
3669  _bookmarkStore.persisted(_message.getSubscriptionId(),
3670  bookmark);
3671  }
3672  }
3673  }
3674  else if (bookmark == AMPS_BOOKMARK_RECENT)
3675  {
3676  _message.setBookmark(AMPS_BOOKMARK_EPOCH);
3677  }
3678  }
3679  _message.setBatchSize(AMPS::asString(batchSize_));
3680  if (topN_ != AMPS_DEFAULT_TOP_N)
3681  {
3682  _message.setTopNRecordsReturned(AMPS::asString(topN_));
3683  }
3684  if (options_.length())
3685  {
3686  _message.setOptions(options_);
3687  }
3688 
3689  Message message = _message;
3690  if (isHASubscribe_)
3691  {
3692  message = _message.deepCopy();
3693  Unlock<Mutex> u(_lock);
3694  _subscriptionManager->subscribe(messageHandler_, message,
3695  Message::AckType::None);
3696  if (_badTimeToHASubscribe)
3697  {
3698  return subId;
3699  }
3700  }
3701  _routes.addRoute(cid, messageHandler_,
3702  Message::AckType::None, ackTypes, _message.getCommandEnum());
3703  message.setAckTypeEnum(ackTypes);
3704  if (!options_.empty())
3705  {
3706  message.setOptions(options_);
3707  }
3708  try
3709  {
3710  syncAckProcessing(timeout_, message, isHASubscribe_);
3711  }
3712  catch (const DisconnectedException&)
3713  {
3714  if (!isHASubscribe_)
3715  {
3716  _routes.removeRoute(subId);
3717  throw;
3718  }
3719  }
3720  catch (const TimedOutException&)
3721  {
3722  AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(subId));
3723  throw;
3724  }
3725  catch (...)
3726  {
3727  if (isHASubscribe_)
3728  {
3729  // Have to unlock before calling into sub manager to avoid deadlock
3730  Unlock<Mutex> unlock(_lock);
3731  _subscriptionManager->unsubscribe(cid);
3732  }
3733  _routes.removeRoute(subId);
3734  throw;
3735  }
3736  return subId;
3737  }
3738 
3739  std::string sowAndSubscribe(const MessageHandler& messageHandler_,
3740  const std::string& topic_,
3741  long timeout_,
3742  const std::string& filter_ = "",
3743  int batchSize_ = AMPS_DEFAULT_BATCH_SIZE,
3744  bool oofEnabled_ = false,
3745  int topN_ = AMPS_DEFAULT_TOP_N,
3746  bool isHASubscribe_ = true)
3747  {
3748  std::string notSet;
3749  return sowAndSubscribe(messageHandler_,
3750  topic_,
3751  filter_,
3752  notSet, // orderBy
3753  notSet, // bookmark
3754  batchSize_,
3755  topN_,
3756  (oofEnabled_ ? "oof" : ""),
3757  timeout_,
3758  isHASubscribe_);
3759  }
3760 
3761  std::string sowAndDeltaSubscribe(const MessageHandler& messageHandler_,
3762  const std::string& topic_,
3763  const std::string& filter_ = "",
3764  const std::string& orderBy_ = "",
3765  int batchSize_ = AMPS_DEFAULT_BATCH_SIZE,
3766  int topN_ = AMPS_DEFAULT_TOP_N,
3767  const std::string& options_ = "",
3768  long timeout_ = AMPS_DEFAULT_COMMAND_TIMEOUT,
3769  bool isHASubscribe_ = true)
3770  {
3771  isHASubscribe_ &= (bool)_subscriptionManager;
3772  Lock<Mutex> l(_lock);
3773  _message.reset();
3774  _message.setCommandEnum(Message::Command::SOWAndDeltaSubscribe);
3775  _message.newCommandId();
3776  _message.setQueryID(_message.getCommandId());
3777  _message.setSubscriptionId(_message.getCommandId());
3778  std::string subId = _message.getSubscriptionId();
3779  _message.setTopic(topic_);
3780  if (filter_.length())
3781  {
3782  _message.setFilter(filter_);
3783  }
3784  if (orderBy_.length())
3785  {
3786  _message.setOrderBy(orderBy_);
3787  }
3788  _message.setBatchSize(AMPS::asString(batchSize_));
3789  if (topN_ != AMPS_DEFAULT_TOP_N)
3790  {
3791  _message.setTopNRecordsReturned(AMPS::asString(topN_));
3792  }
3793  if (options_.length())
3794  {
3795  _message.setOptions(options_);
3796  }
3797  Message message = _message;
3798  if (isHASubscribe_)
3799  {
3800  message = _message.deepCopy();
3801  Unlock<Mutex> u(_lock);
3802  _subscriptionManager->subscribe(messageHandler_, message,
3803  Message::AckType::None);
3804  if (_badTimeToHASubscribe)
3805  {
3806  return subId;
3807  }
3808  }
3809  _routes.addRoute(message.getQueryID(), messageHandler_,
3810  Message::AckType::None, Message::AckType::Processed, message.getCommandEnum());
3811  message.setAckTypeEnum(Message::AckType::Processed);
3812  if (!options_.empty())
3813  {
3814  message.setOptions(options_);
3815  }
3816  try
3817  {
3818  syncAckProcessing(timeout_, message, isHASubscribe_);
3819  }
3820  catch (const DisconnectedException&)
3821  {
3822  if (!isHASubscribe_)
3823  {
3824  _routes.removeRoute(subId);
3825  throw;
3826  }
3827  }
3828  catch (const TimedOutException&)
3829  {
3830  AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(subId));
3831  throw;
3832  }
3833  catch (...)
3834  {
3835  if (isHASubscribe_)
3836  {
3837  // Have to unlock before calling into sub manager to avoid deadlock
3838  Unlock<Mutex> unlock(_lock);
3839  _subscriptionManager->unsubscribe(Field(subId));
3840  }
3841  _routes.removeRoute(subId);
3842  throw;
3843  }
3844  return subId;
3845  }
3846 
3847  std::string sowAndDeltaSubscribe(const MessageHandler& messageHandler_,
3848  const std::string& topic_,
3849  long timeout_,
3850  const std::string& filter_ = "",
3851  int batchSize_ = AMPS_DEFAULT_BATCH_SIZE,
3852  bool oofEnabled_ = false,
3853  bool sendEmpties_ = false,
3854  int topN_ = AMPS_DEFAULT_TOP_N,
3855  bool isHASubscribe_ = true)
3856  {
3857  std::string notSet;
3858  Message::Options options;
3859  if (oofEnabled_)
3860  {
3861  options.setOOF();
3862  }
3863  if (sendEmpties_ == false)
3864  {
3865  options.setNoEmpties();
3866  }
3867  return sowAndDeltaSubscribe(messageHandler_,
3868  topic_,
3869  filter_,
3870  notSet, // orderBy
3871  batchSize_,
3872  topN_,
3873  options,
3874  timeout_,
3875  isHASubscribe_);
3876  }
3877 
3878  std::string sowDelete(const MessageHandler& messageHandler_,
3879  const std::string& topic_,
3880  const std::string& filter_,
3881  long timeout_,
3882  Message::Field commandId_ = Message::Field())
3883  {
3884  if (_publishStore.isValid())
3885  {
3886  unsigned ackType = Message::AckType::Processed |
3887  Message::AckType::Stats |
3888  Message::AckType::Persisted;
3889  publishStoreMessage.reset();
3890  if (commandId_.empty())
3891  {
3892  publishStoreMessage.newCommandId();
3893  commandId_ = publishStoreMessage.getCommandId();
3894  }
3895  else
3896  {
3897  publishStoreMessage.setCommandId(commandId_.data(), commandId_.len());
3898  }
3899  publishStoreMessage.setCommandEnum(Message::Command::SOWDelete)
3900  .assignSubscriptionId(commandId_.data(), commandId_.len())
3901  .assignQueryID(commandId_.data(), commandId_.len())
3902  .setAckTypeEnum(ackType)
3903  .assignTopic(topic_.c_str(), topic_.length())
3904  .assignFilter(filter_.c_str(), filter_.length());
3905  amps_uint64_t haSequenceNumber = _publishStore.store(publishStoreMessage);
3906  char buf[AMPS_NUMBER_BUFFER_LEN];
3907  size_t pos = convertToCharArray(buf, haSequenceNumber);
3908  publishStoreMessage.assignSequence(buf + pos, AMPS_NUMBER_BUFFER_LEN - pos);
3909  {
3910  try
3911  {
3912  Lock<Mutex> l(_lock);
3913  _routes.addRoute(commandId_, messageHandler_,
3914  Message::AckType::Stats,
3915  Message::AckType::Processed | Message::AckType::Persisted,
3916  publishStoreMessage.getCommandEnum());
3917  syncAckProcessing(timeout_, publishStoreMessage,
3918  haSequenceNumber);
3919  }
3920  catch (const DisconnectedException&)
3921  {
3922  // -V565
3923  // Pass - it will get replayed upon reconnect
3924  }
3925  catch (...)
3926  {
3927  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId_));
3928  throw;
3929  }
3930  }
3931  return (std::string)commandId_;
3932  }
3933  else
3934  {
3935  Lock<Mutex> l(_lock);
3936  _message.reset();
3937  if (commandId_.empty())
3938  {
3939  _message.newCommandId();
3940  commandId_ = _message.getCommandId();
3941  }
3942  else
3943  {
3944  _message.setCommandId(commandId_.data(), commandId_.len());
3945  }
3946  _message.setCommandEnum(Message::Command::SOWDelete)
3947  .assignSubscriptionId(commandId_.data(), commandId_.len())
3948  .assignQueryID(commandId_.data(), commandId_.len())
3949  .setAckTypeEnum(Message::AckType::Processed |
3950  Message::AckType::Stats)
3951  .assignTopic(topic_.c_str(), topic_.length())
3952  .assignFilter(filter_.c_str(), filter_.length());
3953  _routes.addRoute(commandId_, messageHandler_,
3954  Message::AckType::Stats,
3955  Message::AckType::Processed,
3956  _message.getCommandEnum());
3957  try
3958  {
3959  syncAckProcessing(timeout_, _message);
3960  }
3961  catch (...)
3962  {
3963  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId_));
3964  throw;
3965  }
3966  return (std::string)commandId_;
3967  }
3968  }
3969 
3970  std::string sowDeleteByData(const MessageHandler& messageHandler_,
3971  const std::string& topic_,
3972  const std::string& data_,
3973  long timeout_,
3974  Message::Field commandId_ = Message::Field())
3975  {
3976  if (_publishStore.isValid())
3977  {
3978  unsigned ackType = Message::AckType::Processed |
3979  Message::AckType::Stats |
3980  Message::AckType::Persisted;
3981  publishStoreMessage.reset();
3982  if (commandId_.empty())
3983  {
3984  publishStoreMessage.newCommandId();
3985  commandId_ = publishStoreMessage.getCommandId();
3986  }
3987  else
3988  {
3989  publishStoreMessage.setCommandId(commandId_.data(), commandId_.len());
3990  }
3991  publishStoreMessage.setCommandEnum(Message::Command::SOWDelete)
3992  .assignSubscriptionId(commandId_.data(), commandId_.len())
3993  .assignQueryID(commandId_.data(), commandId_.len())
3994  .setAckTypeEnum(ackType)
3995  .assignTopic(topic_.c_str(), topic_.length())
3996  .assignData(data_.c_str(), data_.length());
3997  amps_uint64_t haSequenceNumber = _publishStore.store(publishStoreMessage);
3998  char buf[AMPS_NUMBER_BUFFER_LEN];
3999  size_t pos = convertToCharArray(buf, haSequenceNumber);
4000  publishStoreMessage.assignSequence(buf + pos, AMPS_NUMBER_BUFFER_LEN - pos);
4001  {
4002  try
4003  {
4004  Lock<Mutex> l(_lock);
4005  _routes.addRoute(commandId_, messageHandler_,
4006  Message::AckType::Stats,
4007  Message::AckType::Processed | Message::AckType::Persisted,
4008  publishStoreMessage.getCommandEnum());
4009  syncAckProcessing(timeout_, publishStoreMessage,
4010  haSequenceNumber);
4011  }
4012  catch (const DisconnectedException&)
4013  {
4014  // -V565
4015  // Pass - it will get replayed upon reconnect
4016  }
4017  catch (...)
4018  {
4019  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId_));
4020  throw;
4021  }
4022  }
4023  return (std::string)commandId_;
4024  }
4025  else
4026  {
4027  Lock<Mutex> l(_lock);
4028  _message.reset();
4029  if (commandId_.empty())
4030  {
4031  _message.newCommandId();
4032  commandId_ = _message.getCommandId();
4033  }
4034  else
4035  {
4036  _message.setCommandId(commandId_.data(), commandId_.len());
4037  }
4038  _message.setCommandEnum(Message::Command::SOWDelete)
4039  .assignSubscriptionId(commandId_.data(), commandId_.len())
4040  .assignQueryID(commandId_.data(), commandId_.len())
4041  .setAckTypeEnum(Message::AckType::Processed |
4042  Message::AckType::Stats)
4043  .assignTopic(topic_.c_str(), topic_.length())
4044  .assignData(data_.c_str(), data_.length());
4045  _routes.addRoute(commandId_, messageHandler_,
4046  Message::AckType::Stats,
4047  Message::AckType::Processed,
4048  _message.getCommandEnum());
4049  try
4050  {
4051  syncAckProcessing(timeout_, _message);
4052  }
4053  catch (...)
4054  {
4055  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId_));
4056  throw;
4057  }
4058  return (std::string)commandId_;
4059  }
4060  }
4061 
4062  std::string sowDeleteByKeys(const MessageHandler& messageHandler_,
4063  const std::string& topic_,
4064  const std::string& keys_,
4065  long timeout_,
4066  Message::Field commandId_ = Message::Field())
4067  {
4068  if (_publishStore.isValid())
4069  {
4070  unsigned ackType = Message::AckType::Processed |
4071  Message::AckType::Stats |
4072  Message::AckType::Persisted;
4073  publishStoreMessage.reset();
4074  if (commandId_.empty())
4075  {
4076  publishStoreMessage.newCommandId();
4077  commandId_ = publishStoreMessage.getCommandId();
4078  }
4079  else
4080  {
4081  publishStoreMessage.setCommandId(commandId_.data(), commandId_.len());
4082  }
4083  publishStoreMessage.setCommandEnum(Message::Command::SOWDelete)
4084  .assignSubscriptionId(commandId_.data(), commandId_.len())
4085  .assignQueryID(commandId_.data(), commandId_.len())
4086  .setAckTypeEnum(ackType)
4087  .assignTopic(topic_.c_str(), topic_.length())
4088  .assignSowKeys(keys_.c_str(), keys_.length());
4089  amps_uint64_t haSequenceNumber = _publishStore.store(publishStoreMessage);
4090  char buf[AMPS_NUMBER_BUFFER_LEN];
4091  size_t pos = convertToCharArray(buf, haSequenceNumber);
4092  publishStoreMessage.assignSequence(buf + pos, AMPS_NUMBER_BUFFER_LEN - pos);
4093  {
4094  try
4095  {
4096  Lock<Mutex> l(_lock);
4097  _routes.addRoute(commandId_, messageHandler_,
4098  Message::AckType::Stats,
4099  Message::AckType::Processed | Message::AckType::Persisted,
4100  publishStoreMessage.getCommandEnum());
4101  syncAckProcessing(timeout_, publishStoreMessage,
4102  haSequenceNumber);
4103  }
4104  catch (const DisconnectedException&)
4105  {
4106  // -V565
4107  // Pass - it will get replayed upon reconnect
4108  }
4109  catch (...)
4110  {
4111  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId_));
4112  throw;
4113  }
4114  }
4115  return (std::string)commandId_;
4116  }
4117  else
4118  {
4119  Lock<Mutex> l(_lock);
4120  _message.reset();
4121  if (commandId_.empty())
4122  {
4123  _message.newCommandId();
4124  commandId_ = _message.getCommandId();
4125  }
4126  else
4127  {
4128  _message.setCommandId(commandId_.data(), commandId_.len());
4129  }
4130  _message.setCommandEnum(Message::Command::SOWDelete)
4131  .assignSubscriptionId(commandId_.data(), commandId_.len())
4132  .assignQueryID(commandId_.data(), commandId_.len())
4133  .setAckTypeEnum(Message::AckType::Processed |
4134  Message::AckType::Stats)
4135  .assignTopic(topic_.c_str(), topic_.length())
4136  .assignSowKeys(keys_.c_str(), keys_.length());
4137  _routes.addRoute(commandId_, messageHandler_,
4138  Message::AckType::Stats,
4139  Message::AckType::Processed,
4140  _message.getCommandEnum());
4141  try
4142  {
4143  syncAckProcessing(timeout_, _message);
4144  }
4145  catch (...)
4146  {
4147  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId_));
4148  throw;
4149  }
4150  return (std::string)commandId_;
4151  }
4152  }
4153 
4154  void startTimer(void)
4155  {
4156  if (_serverVersion >= "5.3.2.0")
4157  {
4158  throw CommandException("The start_timer command is deprecated.");
4159  }
4160  Lock<Mutex> l(_lock);
4161  _message.reset();
4162  _message.setCommandEnum(Message::Command::StartTimer);
4163 
4164  _send(_message);
4165  }
4166 
4167  std::string stopTimer(MessageHandler messageHandler_)
4168  {
4169  if (_serverVersion >= "5.3.2.0")
4170  {
4171  throw CommandException("The stop_timer command is deprecated.");
4172  }
4173  return executeAsync(Command("stop_timer").addAckType("completed"), messageHandler_);
4174  }
4175 
4176  amps_handle getHandle(void)
4177  {
4178  return _client;
4179  }
4180 
4188  void setExceptionListener(const std::shared_ptr<const ExceptionListener>& pListener_)
4189  {
4190  _pExceptionListener = pListener_;
4191  _exceptionListener = _pExceptionListener.get();
4192  }
4193 
4194  void setExceptionListener(const ExceptionListener& listener_)
4195  {
4196  _exceptionListener = &listener_;
4197  }
4198 
4199  const ExceptionListener& getExceptionListener(void) const
4200  {
4201  return *_exceptionListener;
4202  }
4203 
4204  void setHeartbeat(unsigned heartbeatInterval_, unsigned readTimeout_)
4205  {
4206  if (readTimeout_ < heartbeatInterval_)
4207  {
4208  throw UsageException("The socket read timeout must be >= the heartbeat interval.");
4209  }
4210  Lock<Mutex> l(_lock);
4211  if (_heartbeatInterval != heartbeatInterval_ ||
4212  _readTimeout != readTimeout_)
4213  {
4214  _heartbeatInterval = heartbeatInterval_;
4215  _readTimeout = readTimeout_;
4216  _sendHeartbeat();
4217  }
4218  }
4219 
4220  void _sendHeartbeat(void)
4221  {
4222  if (_connected && _heartbeatInterval != 0)
4223  {
4224  std::ostringstream options;
4225  options << "start," << _heartbeatInterval;
4226  _beatMessage.setOptions(options.str());
4227 
4228  _heartbeatTimer.setTimeout(_heartbeatInterval * 1000.0);
4229  _heartbeatTimer.start();
4230  try
4231  {
4232  _sendWithoutRetry(_beatMessage);
4233  broadcastConnectionStateChanged(ConnectionStateListener::HeartbeatInitiated);
4234  }
4235  catch (ConnectionException& ex_)
4236  {
4237  // If we are disconnected when we attempt to send, that's OK;
4238  // we'll send this message after we re-connect (if we do).
4239  AMPS_UNHANDLED_EXCEPTION(ex_);
4240  }
4241  _beatMessage.setOptions("beat");
4242  }
4243  amps_result result = AMPS_E_OK;
4244  if (_readTimeout && _connected)
4245  {
4246  result = amps_client_set_read_timeout(_client, (int)_readTimeout);
4247  if (result != AMPS_E_OK && result != AMPS_E_DISCONNECTED)
4248  {
4249  AMPSException::throwFor(_client, result);
4250  }
4251  if (!_queueAckTimeout)
4252  {
4253  result = amps_client_set_idle_time(_client,
4254  (int)(_heartbeatInterval * 1000));
4255  if (result != AMPS_E_OK && result != AMPS_E_DISCONNECTED)
4256  {
4257  AMPSException::throwFor(_client, result);
4258  }
4259  }
4260  }
4261  }
4262 
4263  void addConnectionStateListener(ConnectionStateListener* listener_)
4264  {
4265  Lock<Mutex> lock(_lock);
4266  _connectionStateListeners.insert(listener_);
4267  }
4268 
4269  void removeConnectionStateListener(ConnectionStateListener* listener_)
4270  {
4271  Lock<Mutex> lock(_lock);
4272  _connectionStateListeners.erase(listener_);
4273  }
4274 
4275  void clearConnectionStateListeners()
4276  {
4277  Lock<Mutex> lock(_lock);
4278  _connectionStateListeners.clear();
4279  }
4280 
// Registers handler_ in _routes under the ids carried by command_ (the
// subscription id, the query id and/or cid_), choosing which ids to use
// from the command type read from the command's message.
//
// command_         Command whose message supplies the ids and command type.
// cid_             In/out command id; may be replaced by the subscription
//                  id, a newly generated query id, or a newly generated
//                  command id.
// handler_         Handler to register.
// requestedAcks_   Ack types passed to addRoute as the requested acks.
// systemAddedAcks_ Ack types passed to addRoute as the system-added acks.
// commandType_     Command type passed to addRoute (the branching below
//                  uses the type read from the message instead).
//
// Throws UsageException when the message has no query id, subscription id
// or command id and no id was generated for a publish.
// NOTE(review): _lock is released around amps_invoke_copy_route_function via
// Unlock<Mutex>, so this presumably expects the caller to hold _lock - confirm.
4281  void _registerHandler(Command& command_, Message::Field& cid_,
4282  MessageHandler& handler_, unsigned requestedAcks_,
4283  unsigned systemAddedAcks_, Message::Command::Type commandType_)
4284  {
4285  Message message = command_.getMessage();
4286  Message::Command::Type commandType = message.getCommandEnum();
4287  Message::Field subid = message.getSubscriptionId();
4288  Message::Field qid = message.getQueryID();
4289  // If we have an id, we're good, even if it's an existing route
4290  bool added = qid.len() || subid.len() || cid_.len();
4291  bool cidIsQid = cid_ == qid;
// cidUnique: cid_ is non-empty and differs from both the query id and the
// subscription id, so it needs a route of its own.
4292  bool cidUnique = !cidIsQid && cid_.len() > 0 && cid_ != subid;
4293  int addedCount = 0;
4294  if (subid.len() > 0)
4295  {
4296  // This can replace a non-subscribe with a matching id
4297  // with a subscription but not another subscription.
4298  addedCount += _routes.addRoute(subid, handler_, requestedAcks_,
4299  systemAddedAcks_, commandType_);
4300  if (!cidUnique
4301  && (commandType == Message::Command::Subscribe
4302  || commandType == Message::Command::DeltaSubscribe))
4303  {
4304  // We don't need to do anything else
4305  cid_ = subid;
4306  return;
4307  }
4308  }
// Query-style commands with a query id distinct from the subscription id
// get a route under the query id.
4309  if (qid.len() > 0 && qid != subid
4310  && (commandType == Message::Command::SOW
4311  || commandType == Message::Command::SOWDelete
4312  || commandType == Message::Command::SOWAndSubscribe
4313  || commandType == Message::Command::SOWAndDeltaSubscribe))
4314  {
// Generate new query ids until one is found that has no existing route.
4315  while (_routes.hasRoute(qid))
4316  {
4317  message.newQueryId();
4318  if (cidIsQid)
4319  {
4320  cid_ = message.getQueryId();
4321  }
4322  qid = message.getQueryId();
4323  }
4324  if (addedCount == 0)
4325  {
4326  _routes.addRoute(qid, handler_, requestedAcks_,
4327  systemAddedAcks_, commandType_);
4328  }
4329  else
4330  {
// A route was already added for this handler: ask for a copy of the
// handler's user data (with _lock released) and, if one is returned,
// register the additional route with that copy.
4331  void* data = NULL;
4332  {
4333  Unlock<Mutex> u(_lock);
4334  data = amps_invoke_copy_route_function(handler_.userData());
4335  }
4336  if (!data)
4337  {
4338  _routes.addRoute(qid, handler_, requestedAcks_,
4339  systemAddedAcks_, commandType_);
4340  }
4341  else
4342  {
4343  _routes.addRoute(qid,
4344  MessageHandler(handler_.function(),
4345  data),
4346  requestedAcks_,
4347  systemAddedAcks_, commandType_);
4348  }
4349  }
4350  ++addedCount;
4351  }
// A distinct command id only gets its own route when some ack other than
// Persisted was requested ('&' binds tighter than '&&' here).
4352  if (cidUnique && requestedAcks_ & ~Message::AckType::Persisted)
4353  {
4354  while (_routes.hasRoute(cid_))
4355  {
4356  cid_ = message.newCommandId().getCommandId();
4357  }
4358  if (addedCount == 0)
4359  {
4360  _routes.addRoute(cid_, handler_, requestedAcks_,
4361  systemAddedAcks_, commandType_);
4362  }
4363  else
4364  {
4365  void* data = NULL;
4366  {
4367  Unlock<Mutex> u(_lock);
4368  data = amps_invoke_copy_route_function(handler_.userData());
4369  }
4370  if (!data)
4371  {
4372  _routes.addRoute(cid_, handler_, requestedAcks_,
4373  systemAddedAcks_, commandType_);
4374  }
4375  else
4376  {
4377  _routes.addRoute(cid_,
4378  MessageHandler(handler_.function(),
4379  data),
4380  requestedAcks_,
4381  systemAddedAcks_, commandType_);
4382  }
4383  }
4384  }
// Publishes that request an ack other than Persisted always get a newly
// generated command id and a route under it.
4385  else if ((commandType == Message::Command::Publish ||
4386  commandType == Message::Command::DeltaPublish)
4387  && requestedAcks_ & ~Message::AckType::Persisted)
4388  {
4389  cid_ = command_.getMessage().newCommandId().getCommandId();
4390  _routes.addRoute(cid_, handler_, requestedAcks_,
4391  systemAddedAcks_, commandType_);
4392  added = true;
4393  }
4394  if (!added)
4395  {
4396  throw UsageException("To use a messagehandler, you must also supply a command or subscription ID.");
4397  }
4398  }
4399 
// Executes command_ and routes its responses to handler_; executeAsync()
// calls this while holding _lock.
//
// command_       Command to send; its message's ack type is modified while
//                sending and set back to the originally requested acks
//                before returning or rethrowing (the publish-store path's
//                catch-all handler rethrows without restoring it).
// handler_       Handler to register; may be invalid, in which case no
//                route is registered.
// isHASubscribe_ Whether to involve _subscriptionManager for subscribe
//                commands; forced to false when no manager is set.
//
// Returns the subscription id for subscribe commands that have one,
// otherwise the command id (possibly empty).
//
// Three send paths exist: commands that need a sequence number while a
// publish store is set, subscribe commands, and everything else. Each path
// removes the routes it registered when the send fails, except that a
// DisconnectedException is swallowed on the publish-store path and on the
// subscribe path when isHASubscribe_ is true.
4400  std::string executeAsyncNoLock(Command& command_, MessageHandler& handler_,
4401  bool isHASubscribe_ = true)
4402  {
4403  isHASubscribe_ &= (bool)_subscriptionManager;
4404  Message& message = command_.getMessage();
// A Processed ack is added by the client when there is a handler or the
// command itself asks for one.
4405  unsigned systemAddedAcks = (handler_.isValid() || command_.hasProcessedAck()) ?
4406  Message::AckType::Processed : Message::AckType::None;
4407  unsigned requestedAcks = message.getAckTypeEnum();
4408  bool isPublishStore = _publishStore.isValid() && command_.needsSequenceNumber();
4409  Message::Command::Type commandType = message.getCommandEnum();
4410  if (commandType == Message::Command::SOWAndSubscribe
4411  || commandType == Message::Command::SOWAndDeltaSubscribe
4412  || commandType == Message::Command::StopTimer)
4413  {
4414  systemAddedAcks |= Message::AckType::Completed;
4415  }
4416  Message::Field cid = message.getCommandId();
4417  if (handler_.isValid() && cid.empty())
4418  {
4419  cid = message.newCommandId().getCommandId();
4420  }
// Bookmark handling only applies to subscribe commands that carry a bookmark.
4421  if (message.getBookmark().len() > 0)
4422  {
4423  if (command_.isSubscribe())
4424  {
4425  Message::Field bookmark = message.getBookmark();
4426  if (_bookmarkStore.isValid())
4427  {
4428  systemAddedAcks |= Message::AckType::Persisted;
4429  if (bookmark == AMPS_BOOKMARK_RECENT)
4430  {
4431  message.setBookmark(_bookmarkStore.getMostRecent(message.getSubscriptionId()));
4432  }
4433  else if (bookmark != AMPS_BOOKMARK_NOW &&
4434  bookmark != AMPS_BOOKMARK_EPOCH)
4435  {
4436  _bookmarkStore.log(message);
4437  if (!BookmarkRange::isRange(bookmark))
4438  {
4439  _bookmarkStore.discard(message);
4440  _bookmarkStore.persisted(message.getSubscriptionId(),
4441  bookmark);
4442  }
4443  }
4444  }
4445  else if (bookmark == AMPS_BOOKMARK_RECENT)
4446  {
// NOTE(review): the statement inside this branch (listing line 4447) is not
// present in this listing; consult the original header for its content.
4448  }
4449  }
4450  }
4451  if (isPublishStore)
4452  {
4453  systemAddedAcks |= Message::AckType::Persisted;
4454  }
4455  bool isSubscribe = command_.isSubscribe();
// Non-subscribe commands register their handler now; subscribe commands
// register later, after the subscription manager has been told.
4456  if (handler_.isValid() && !isSubscribe)
4457  {
4458  _registerHandler(command_, cid, handler_,
4459  requestedAcks, systemAddedAcks, commandType);
4460  }
// Wait for the processed ack only when there is a command id to match it
// against and the command asks for that ack.
4461  bool useSyncSend = cid.len() > 0 && command_.hasProcessedAck();
4462  if (isPublishStore)
4463  {
4464  amps_uint64_t haSequenceNumber = (amps_uint64_t)0;
4465  message.setAckTypeEnum(requestedAcks | systemAddedAcks);
4466  {
// _lock is released while the message is written to the publish store.
4467  Unlock<Mutex> u(_lock);
4468  haSequenceNumber = _publishStore.store(message);
4469  }
4470  message.setSequence(haSequenceNumber);
4471  try
4472  {
4473  if (useSyncSend)
4474  {
4475  syncAckProcessing((long)command_.getTimeout(), message,
4476  haSequenceNumber);
4477  }
4478  else
4479  {
4480  _send(message, haSequenceNumber);
4481  }
4482  }
4483  catch (const DisconnectedException&)
4484  {
4485  // -V565
4486  // Pass - message will get replayed when reconnected
4487  }
4488  catch (...)
4489  {
4490  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(cid));
4491  throw;
4492  }
4493  }
4494  else
4495  {
4496  if (isSubscribe)
4497  {
4498  const Message::Field& subId = message.getSubscriptionId();
4499  if (isHASubscribe_)
4500  {
// The subscription manager is called with _lock released and receives a
// deep copy of the message.
4501  Unlock<Mutex> u(_lock);
4502  _subscriptionManager->subscribe(handler_,
4503  message.deepCopy(),
4504  requestedAcks);
// When _badTimeToHASubscribe is set, return without sending; the
// subscription has already been handed to the manager above.
4505  if (_badTimeToHASubscribe)
4506  {
4507  message.setAckTypeEnum(requestedAcks);
4508  return std::string(subId.data(), subId.len());
4509  }
4510  }
4511  if (handler_.isValid())
4512  {
4513  _registerHandler(command_, cid, handler_,
4514  requestedAcks, systemAddedAcks, commandType);
4515  }
4516  message.setAckTypeEnum(requestedAcks | systemAddedAcks);
4517  try
4518  {
4519  if (useSyncSend)
4520  {
4521  syncAckProcessing((long)command_.getTimeout(), message,
4522  isHASubscribe_);
4523  }
4524  else
4525  {
4526  _send(message);
4527  }
4528  }
4529  catch (const DisconnectedException&)
4530  {
// With an HA subscription the exception is swallowed here; otherwise the
// routes are removed and the exception propagates.
4531  if (!isHASubscribe_)
4532  {
4533  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(cid));
4534  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(subId));
4535  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(message.getQueryId()));
4536  message.setAckTypeEnum(requestedAcks);
4537  throw;
4538  }
4539  }
4540  catch (const TimedOutException&)
4541  {
// On timeout the ids are unsubscribed via unsubscribeInternal rather than
// only having their routes removed.
4542  AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(cid));
4543  AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(subId));
4544  AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(message.getQueryId()));
4545  message.setAckTypeEnum(requestedAcks);
4546  throw;
4547  }
4548  catch (...)
4549  {
4550  if (isHASubscribe_)
4551  {
4552  // Have to unlock before calling into sub manager to avoid deadlock
4553  Unlock<Mutex> unlock(_lock);
4554  _subscriptionManager->unsubscribe(subId);
4555  }
4556  if (message.getQueryID().len() > 0)
4557  {
4558  _routes.removeRoute(message.getQueryID());
4559  }
4560  _routes.removeRoute(cid);
4561  _routes.removeRoute(subId);
4562  message.setAckTypeEnum(requestedAcks);
4563  throw;
4564  }
4565  if (subId.len() > 0)
4566  {
4567  message.setAckTypeEnum(requestedAcks);
4568  return std::string(subId.data(), subId.len());
4569  }
4570  }
4571  else
4572  {
4573  message.setAckTypeEnum(requestedAcks | systemAddedAcks);
4574  try
4575  {
4576  if (useSyncSend)
4577  {
4578  syncAckProcessing((long)(command_.getTimeout()), message);
4579  }
4580  else
4581  {
4582  _send(message);
4583  }
4584  }
4585  catch (const TimedOutException&)
4586  {
4587  AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(cid));
4588  AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(message.getQueryId()));
4589  message.setAckTypeEnum(requestedAcks);
4590  throw;
4591  }
4592  catch (const DisconnectedException&)
4593  {
4594  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(cid));
4595  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(message.getQueryId()));
4596  message.setAckTypeEnum(requestedAcks);
4597  throw;
4598  }
4599  catch (...)
4600  {
4601  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(cid));
4602  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(message.getQueryId()));
4603  message.setAckTypeEnum(requestedAcks);
4604  throw;
4605  }
4606  }
4607  }
// Restore the caller's requested acks on the message before returning.
4608  message.setAckTypeEnum(requestedAcks);
4609  return cid;
4610  }
4611 
4612  MessageStream getEmptyMessageStream(void);
4613 
4614  std::string executeAsync(Command& command_, MessageHandler& handler_,
4615  bool isHASubscribe_ = true)
4616  {
4617  Lock<Mutex> lock(_lock);
4618  return executeAsyncNoLock(command_, handler_, isHASubscribe_);
4619  }
4620 
4621  // Queue Methods //
4622  void setAutoAck(bool isAutoAckEnabled_)
4623  {
4624  _isAutoAckEnabled = isAutoAckEnabled_;
4625  }
4626  bool getAutoAck(void) const
4627  {
4628  return _isAutoAckEnabled;
4629  }
4630  void setAckBatchSize(const unsigned batchSize_)
4631  {
4632  _ackBatchSize = batchSize_;
4633  if (!_queueAckTimeout)
4634  {
4635  _queueAckTimeout = AMPS_DEFAULT_QUEUE_ACK_TIMEOUT;
4636  amps_client_set_idle_time(_client, _queueAckTimeout);
4637  }
4638  }
4639  unsigned getAckBatchSize(void) const
4640  {
4641  return _ackBatchSize;
4642  }
4643  int getAckTimeout(void) const
4644  {
4645  return _queueAckTimeout;
4646  }
4647  void setAckTimeout(const int ackTimeout_)
4648  {
4649  amps_client_set_idle_time(_client, ackTimeout_);
4650  _queueAckTimeout = ackTimeout_;
4651  }
4652  size_t _ack(QueueBookmarks& queueBookmarks_)
4653  {
4654  if (queueBookmarks_._bookmarkCount)
4655  {
4656  publishStoreMessage.reset();
4657  publishStoreMessage.setCommandEnum(Message::Command::SOWDelete)
4658  .setTopic(queueBookmarks_._topic)
4659  .setBookmark(queueBookmarks_._data)
4660  .setCommandId("AMPS-queue-ack");
4661  amps_uint64_t haSequenceNumber = 0;
4662  if (_publishStore.isValid())
4663  {
4664  haSequenceNumber = _publishStore.store(publishStoreMessage);
4665  publishStoreMessage.setAckType("persisted")
4666  .setSequence(haSequenceNumber);
4667  queueBookmarks_._data.erase();
4668  queueBookmarks_._bookmarkCount = 0;
4669  }
4670  _send(publishStoreMessage, haSequenceNumber);
4671  if (!_publishStore.isValid())
4672  {
4673  queueBookmarks_._data.erase();
4674  queueBookmarks_._bookmarkCount = 0;
4675  }
4676  return 1;
4677  }
4678  return 0;
4679  }
4680  void ack(const Field& topic_, const Field& bookmark_, const char* options_ = NULL)
4681  {
4682  if (_isAutoAckEnabled)
4683  {
4684  return;
4685  }
4686  _ack(topic_, bookmark_, options_);
4687  }
4688  void _ack(const Field& topic_, const Field& bookmark_, const char* options_ = NULL)
4689  {
4690  if (bookmark_.len() == 0)
4691  {
4692  return;
4693  }
4694  Lock<Mutex> lock(_lock);
4695  if (_ackBatchSize < 2 || options_ != NULL)
4696  {
4697  publishStoreMessage.reset();
4698  publishStoreMessage.setCommandEnum(Message::Command::SOWDelete)
4699  .setCommandId("AMPS-queue-ack")
4700  .setTopic(topic_).setBookmark(bookmark_);
4701  if (options_)
4702  {
4703  publishStoreMessage.setOptions(options_);
4704  }
4705  amps_uint64_t haSequenceNumber = 0;
4706  if (_publishStore.isValid())
4707  {
4708  haSequenceNumber = _publishStore.store(publishStoreMessage);
4709  publishStoreMessage.setAckType("persisted")
4710  .setSequence(haSequenceNumber);
4711  }
4712  _send(publishStoreMessage, haSequenceNumber);
4713  return;
4714  }
4715  // have we acked anything for this hash
4716  topic_hash hash = CRC<0>::crcNoSSE(topic_.data(), topic_.len());
4717  TopicHashMap::iterator it = _topicHashMap.find(hash);
4718  if (it == _topicHashMap.end())
4719  {
4720  // add a new one to the map
4721 #ifdef AMPS_USE_EMPLACE
4722  it = _topicHashMap.emplace(TopicHashMap::value_type(hash, QueueBookmarks(topic_))).first;
4723 #else
4724  it = _topicHashMap.insert(TopicHashMap::value_type(hash, QueueBookmarks(topic_))).first;
4725 #endif
4726  }
4727  QueueBookmarks& queueBookmarks = it->second;
4728  if (queueBookmarks._data.length())
4729  {
4730  queueBookmarks._data.append(",");
4731  }
4732  else
4733  {
4734  queueBookmarks._oldestTime = amps_now();
4735  }
4736  queueBookmarks._data.append(bookmark_);
4737  if (++queueBookmarks._bookmarkCount >= _ackBatchSize)
4738  {
4739  _ack(queueBookmarks);
4740  }
4741  }
4742  void flushAcks(void)
4743  {
4744  size_t sendCount = 0;
4745  if (!_connected)
4746  {
4747  return;
4748  }
4749  else
4750  {
4751  Lock<Mutex> lock(_lock);
4752  typedef TopicHashMap::iterator iterator;
4753  for (iterator it = _topicHashMap.begin(), end = _topicHashMap.end(); it != end; ++it)
4754  {
4755  QueueBookmarks& queueBookmarks = it->second;
4756  sendCount += _ack(queueBookmarks);
4757  }
4758  }
4759  if (sendCount && _connected)
4760  {
4761  publishFlush(0, Message::AckType::Processed);
4762  }
4763  }
4764  // called when there's idle time, to see if we need to flush out any "acks"
4765  void checkQueueAcks(void)
4766  {
4767  if (!_topicHashMap.size())
4768  {
4769  return;
4770  }
4771  Lock<Mutex> lock(_lock);
4772  try
4773  {
4774  amps_uint64_t threshold = amps_now()
4775  - (amps_uint64_t)_queueAckTimeout;
4776  typedef TopicHashMap::iterator iterator;
4777  for (iterator it = _topicHashMap.begin(), end = _topicHashMap.end(); it != end; ++it)
4778  {
4779  QueueBookmarks& queueBookmarks = it->second;
4780  if (queueBookmarks._bookmarkCount && queueBookmarks._oldestTime < threshold)
4781  {
4782  _ack(queueBookmarks);
4783  }
4784  }
4785  }
4786  catch (std::exception& ex)
4787  {
4788  AMPS_UNHANDLED_EXCEPTION(ex);
4789  }
4790  }
4791 
4792  void deferredExecution(DeferredExecutionFunc func_, void* userData_)
4793  {
4794  Lock<Mutex> lock(_deferredExecutionLock);
4795 #ifdef AMPS_USE_EMPLACE
4796  _deferredExecutionList.emplace_back(
4797  DeferredExecutionRequest(func_, userData_));
4798 #else
4799  _deferredExecutionList.push_back(
4800  DeferredExecutionRequest(func_, userData_));
4801 #endif
4802  }
4803 
4804  inline void processDeferredExecutions(void)
4805  {
4806  if (_deferredExecutionList.size())
4807  {
4808  Lock<Mutex> lock(_deferredExecutionLock);
4809  DeferredExecutionList::iterator it = _deferredExecutionList.begin();
4810  DeferredExecutionList::iterator end = _deferredExecutionList.end();
4811  for (; it != end; ++it)
4812  {
4813  try
4814  {
4815  it->_func(it->_userData);
4816  }
4817  catch (...)
4818  {
4819  // -V565
4820  // Intentionally ignore errors
4821  }
4822  }
4823  _deferredExecutionList.clear();
4824  _routes.invalidateCache();
4825  _routeCache.invalidateCache();
4826  }
4827  }
4828 
4829  bool getRetryOnDisconnect(void) const
4830  {
4831  return _isRetryOnDisconnect;
4832  }
4833 
4834  void setRetryOnDisconnect(bool isRetryOnDisconnect_)
4835  {
4836  _isRetryOnDisconnect = isRetryOnDisconnect_;
4837  }
4838 
4839  void setDefaultMaxDepth(unsigned maxDepth_)
4840  {
4841  _defaultMaxDepth = maxDepth_;
4842  }
4843 
4844  unsigned getDefaultMaxDepth(void) const
4845  {
4846  return _defaultMaxDepth;
4847  }
4848 
4849  void setTransportFilterFunction(amps_transport_filter_function filter_,
4850  void* userData_)
4851  {
4852  amps_client_set_transport_filter_function(_client, filter_, userData_);
4853  }
4854 
4855  void setThreadCreatedCallback(amps_thread_created_callback callback_,
4856  void* userData_)
4857  {
4858  amps_client_set_thread_created_callback(_client, callback_, userData_);
4859  }
4860  }; // class ClientImpl
4935 
4937  {
4938  RefHandle<MessageStreamImpl> _body;
4939  public:
4944  class iterator
4945  {
4946  MessageStream* _pStream;
4947  Message _current;
4948  inline void advance(void);
4949 
4950  public:
4951  iterator() // end
4952  : _pStream(NULL)
4953  {;}
4954  iterator(MessageStream* pStream_)
4955  : _pStream(pStream_)
4956  {
4957  advance();
4958  }
4959 
4960  bool operator==(const iterator& rhs) const
4961  {
4962  return _pStream == rhs._pStream;
4963  }
4964  bool operator!=(const iterator& rhs) const
4965  {
4966  return _pStream != rhs._pStream;
4967  }
4968  void operator++(void)
4969  {
4970  advance();
4971  }
4972  Message operator*(void)
4973  {
4974  return _current;
4975  }
4976  Message* operator->(void)
4977  {
4978  return &_current;
4979  }
4980  };
4982  bool isValid() const
4983  {
4984  return _body.isValid();
4985  }
4986 
4990  {
4991  if (!_body.isValid())
4992  {
4993  throw UsageException("This MessageStream is not valid and cannot be iterated.");
4994  }
4995  return iterator(this);
4996  }
4999  // For non-SOW queries, the end is never reached.
5001  {
5002  return iterator();
5003  }
5004  inline MessageStream(void);
5005 
5011  MessageStream timeout(unsigned timeout_);
5012 
5016  MessageStream conflate(void);
5022  MessageStream maxDepth(unsigned maxDepth_);
5025  unsigned getMaxDepth(void) const;
5028  unsigned getDepth(void) const;
5029 
5030  private:
5031  inline MessageStream(const Client& client_);
5032  inline void setSOWOnly(const std::string& commandId_,
5033  const std::string& queryId_ = "");
5034  inline void setSubscription(const std::string& subId_,
5035  const std::string& commandId_ = "",
5036  const std::string& queryId_ = "");
5037  inline void setStatsOnly(const std::string& commandId_,
5038  const std::string& queryId_ = "");
5039  inline void setAcksOnly(const std::string& commandId_, unsigned acks_);
5040 
5041  inline operator MessageHandler(void);
5042 
5043  inline static MessageStream fromExistingHandler(const MessageHandler& handler);
5044 
5045  friend class Client;
5046 
5047  };
5048 
5068  class Client // -V553
5069  {
5070  protected:
5071  BorrowRefHandle<ClientImpl> _body;
5072  public:
5073  static const int DEFAULT_COMMAND_TIMEOUT = AMPS_DEFAULT_COMMAND_TIMEOUT;
5074  static const int DEFAULT_BATCH_SIZE = AMPS_DEFAULT_BATCH_SIZE;
5075  static const int DEFAULT_TOP_N = AMPS_DEFAULT_TOP_N;
5076 
5085  Client(const std::string& clientName = "")
5086  : _body(new ClientImpl(clientName), true)
5087  {;}
5088 
5089  Client(ClientImpl* existingClient)
5090  : _body(existingClient, true)
5091  {;}
5092 
5093  Client(ClientImpl* existingClient, bool isRef)
5094  : _body(existingClient, isRef)
5095  {;}
5096 
5097  Client(const Client& rhs) : _body(rhs._body) {;}
5098  virtual ~Client(void) {;}
5099 
5100  Client& operator=(const Client& rhs)
5101  {
5102  _body = rhs._body;
5103  return *this;
5104  }
5105 
5106  bool isValid()
5107  {
5108  return _body.isValid();
5109  }
5110 
5123  void setName(const std::string& name)
5124  {
5125  _body.get().setName(name);
5126  }
5127 
5130  const std::string& getName() const
5131  {
5132  return _body.get().getName();
5133  }
5134 
5138  const std::string& getNameHash() const
5139  {
5140  return _body.get().getNameHash();
5141  }
5142 
5146  const amps_uint64_t getNameHashValue() const
5147  {
5148  return _body.get().getNameHashValue();
5149  }
5150 
5157  void setLogonCorrelationData(const std::string& logonCorrelationData_)
5158  {
5159  _body.get().setLogonCorrelationData(logonCorrelationData_);
5160  }
5161 
5164  const std::string& getLogonCorrelationData() const
5165  {
5166  return _body.get().getLogonCorrelationData();
5167  }
5168 
5177  size_t getServerVersion() const
5178  {
5179  return _body.get().getServerVersion();
5180  }
5181 
5188  VersionInfo getServerVersionInfo() const
5189  {
5190  return _body.get().getServerVersionInfo();
5191  }
5192 
5202  static size_t convertVersionToNumber(const std::string& version_)
5203  {
5204  return AMPS::convertVersionToNumber(version_.c_str(), version_.length());
5205  }
5206 
5217  static size_t convertVersionToNumber(const char* data_, size_t len_)
5218  {
5219  return AMPS::convertVersionToNumber(data_, len_);
5220  }
5221 
5224  const std::string& getURI() const
5225  {
5226  return _body.get().getURI();
5227  }
5228 
5235 
5237 
5248  void connect(const std::string& uri)
5249  {
5250  _body.get().connect(uri);
5251  }
5252 
5255  void disconnect()
5256  {
5257  _body.get().disconnect();
5258  }
5259 
5273  void send(const Message& message)
5274  {
5275  _body.get().send(message);
5276  }
5277 
5286  void addMessageHandler(const Field& commandId_,
5287  const AMPS::MessageHandler& messageHandler_,
5288  unsigned requestedAcks_, bool isSubscribe_)
5289  {
5290  Message::Command::Type commandType = isSubscribe_ ? Message::Command::Subscribe : Message::Command::SOW;
5291  _body.get().addMessageHandler(commandId_, messageHandler_,
5292  requestedAcks_, commandType);
5293  }
5294 
5303  void addMessageHandler(const Field& commandId_,
5304  const AMPS::MessageHandler& messageHandler_,
5305  unsigned requestedAcks_, Message::Command::Type commandType_)
5306  {
5307  _body.get().addMessageHandler(commandId_, messageHandler_,
5308  requestedAcks_, commandType_);
5309  }
5310 
5314  bool removeMessageHandler(const Field& commandId_)
5315  {
5316  return _body.get().removeMessageHandler(commandId_);
5317  }
5318 
5342  std::string send(const MessageHandler& messageHandler, Message& message, int timeout = 0)
5343  {
5344  return _body.get().send(messageHandler, message, timeout);
5345  }
5346 
5356  void setDisconnectHandler(const DisconnectHandler& disconnectHandler)
5357  {
5358  _body.get().setDisconnectHandler(disconnectHandler);
5359  }
5360 
5364  DisconnectHandler getDisconnectHandler(void) const
5365  {
5366  return _body.get().getDisconnectHandler();
5367  }
5368 
5373  virtual ConnectionInfo getConnectionInfo() const
5374  {
5375  return _body.get().getConnectionInfo();
5376  }
5377 
5386  void setBookmarkStore(const BookmarkStore& bookmarkStore_)
5387  {
5388  _body.get().setBookmarkStore(bookmarkStore_);
5389  }
5390 
5395  {
5396  return _body.get().getBookmarkStore();
5397  }
5398 
5403  {
5404  return _body.get().getSubscriptionManager();
5405  }
5406 
5414  void setSubscriptionManager(SubscriptionManager* subscriptionManager_)
5415  {
5416  _body.get().setSubscriptionManager(subscriptionManager_);
5417  }
5418 
5438  void setPublishStore(const Store& publishStore_)
5439  {
5440  _body.get().setPublishStore(publishStore_);
5441  }
5442 
5447  {
5448  return _body.get().getPublishStore();
5449  }
5450 
5454  void setDuplicateMessageHandler(const MessageHandler& duplicateMessageHandler_)
5455  {
5456  _body.get().setGlobalCommandTypeMessageHandler(ClientImpl::GlobalCommandTypeHandlers::DuplicateMessage,
5457  duplicateMessageHandler_);
5458  }
5459 
5470  {
5471  return _body.get().getDuplicateMessageHandler();
5472  }
5473 
5484  {
  // Passes handler_ to the client implementation's setFailedWriteHandler().
  // NOTE(review): the parameter type and its ownership are not visible in
  // this listing; confirm against the declaration.
5485  _body.get().setFailedWriteHandler(handler_);
5486  }
5487 
5492  {
  // Returns whatever the client implementation's getFailedWriteHandler()
  // returns.
5493  return _body.get().getFailedWriteHandler();
5494  }
5495 
5496 
5514  amps_uint64_t publish(const std::string& topic_, const std::string& data_)
5515  {
5516  return _body.get().publish(topic_.c_str(), topic_.length(),
5517  data_.c_str(), data_.length());
5518  }
5519 
5539  amps_uint64_t publish(const char* topic_, size_t topicLength_,
5540  const char* data_, size_t dataLength_)
5541  {
5542  return _body.get().publish(topic_, topicLength_, data_, dataLength_);
5543  }
5544 
5563  amps_uint64_t publish(const std::string& topic_, const std::string& data_,
5564  unsigned long expiration_)
5565  {
5566  return _body.get().publish(topic_.c_str(), topic_.length(),
5567  data_.c_str(), data_.length(), expiration_);
5568  }
5569 
5590  amps_uint64_t publish(const char* topic_, size_t topicLength_,
5591  const char* data_, size_t dataLength_,
5592  unsigned long expiration_)
5593  {
5594  return _body.get().publish(topic_, topicLength_,
5595  data_, dataLength_, expiration_);
5596  }
5597 
5636  void publishFlush(long timeout_ = 0, unsigned ackType_ = Message::AckType::Processed)
5637  {
5638  _body.get().publishFlush(timeout_, ackType_);
5639  }
5640 
5641 
5657  amps_uint64_t deltaPublish(const std::string& topic_, const std::string& data_)
5658  {
5659  return _body.get().deltaPublish(topic_.c_str(), topic_.length(),
5660  data_.c_str(), data_.length());
5661  }
5662 
5680  amps_uint64_t deltaPublish(const char* topic_, size_t topicLength_,
5681  const char* data_, size_t dataLength_)
5682  {
5683  return _body.get().deltaPublish(topic_, topicLength_,
5684  data_, dataLength_);
5685  }
5686 
5703  amps_uint64_t deltaPublish(const std::string& topic_, const std::string& data_,
5704  unsigned long expiration_)
5705  {
5706  return _body.get().deltaPublish(topic_.c_str(), topic_.length(),
5707  data_.c_str(), data_.length(),
5708  expiration_);
5709  }
5710 
5729  amps_uint64_t deltaPublish(const char* topic_, size_t topicLength_,
5730  const char* data_, size_t dataLength_,
5731  unsigned long expiration_)
5732  {
5733  return _body.get().deltaPublish(topic_, topicLength_,
5734  data_, dataLength_, expiration_);
5735  }
5736 
5752  std::string logon(int timeout_ = 0,
5753  Authenticator& authenticator_ = DefaultAuthenticator::instance(),
5754  const char* options_ = NULL)
5755  {
5756  return _body.get().logon(timeout_, authenticator_, options_);
5757  }
5771  std::string logon(const char* options_, int timeout_ = 0)
5772  {
5773  return _body.get().logon(timeout_, DefaultAuthenticator::instance(),
5774  options_);
5775  }
5776 
5790  std::string logon(const std::string& options_, int timeout_ = 0)
5791  {
5792  return _body.get().logon(timeout_, DefaultAuthenticator::instance(),
5793  options_.c_str());
5794  }
5795 
5815  std::string subscribe(const MessageHandler& messageHandler_,
5816  const std::string& topic_,
5817  long timeout_ = 0,
5818  const std::string& filter_ = "",
5819  const std::string& options_ = "",
5820  const std::string& subId_ = "")
5821  {
5822  return _body.get().subscribe(messageHandler_, topic_, timeout_,
5823  filter_, "", options_, subId_);
5824  }
5825 
5841  MessageStream subscribe(const std::string& topic_,
5842  long timeout_ = 0, const std::string& filter_ = "",
5843  const std::string& options_ = "",
5844  const std::string& subId_ = "")
5845  {
5846  MessageStream result(*this);
5847  if (_body.get().getDefaultMaxDepth())
5848  {
5849  result.maxDepth(_body.get().getDefaultMaxDepth());
5850  }
5851  result.setSubscription(_body.get().subscribe(
5852  result.operator MessageHandler(),
5853  topic_, timeout_, filter_, "",
5854  options_, subId_, false));
5855  return result;
5856  }
5857 
5873  MessageStream subscribe(const char* topic_,
5874  long timeout_ = 0, const std::string& filter_ = "",
5875  const std::string& options_ = "",
5876  const std::string& subId_ = "")
5877  {
5878  MessageStream result(*this);
5879  if (_body.get().getDefaultMaxDepth())
5880  {
5881  result.maxDepth(_body.get().getDefaultMaxDepth());
5882  }
5883  result.setSubscription(_body.get().subscribe(
5884  result.operator MessageHandler(),
5885  topic_, timeout_, filter_, "",
5886  options_, subId_, false));
5887  return result;
5888  }
5889 
5902  std::string deltaSubscribe(const MessageHandler& messageHandler_,
5903  const std::string& topic_,
5904  long timeout_,
5905  const std::string& filter_ = "",
5906  const std::string& options_ = "",
5907  const std::string& subId_ = "")
5908  {
5909  return _body.get().deltaSubscribe(messageHandler_, topic_, timeout_,
5910  filter_, "", options_, subId_);
5911  }
5920  MessageStream deltaSubscribe(const std::string& topic_,
5921  long timeout_, const std::string& filter_ = "",
5922  const std::string& options_ = "",
5923  const std::string& subId_ = "")
5924  {
5925  MessageStream result(*this);
5926  if (_body.get().getDefaultMaxDepth())
5927  {
5928  result.maxDepth(_body.get().getDefaultMaxDepth());
5929  }
5930  result.setSubscription(_body.get().deltaSubscribe(
5931  result.operator MessageHandler(),
5932  topic_, timeout_, filter_, "",
5933  options_, subId_, false));
5934  return result;
5935  }
5936 
5938  MessageStream deltaSubscribe(const char* topic_,
5939  long timeout_, const std::string& filter_ = "",
5940  const std::string& options_ = "",
5941  const std::string& subId_ = "")
5942  {
5943  MessageStream result(*this);
5944  if (_body.get().getDefaultMaxDepth())
5945  {
5946  result.maxDepth(_body.get().getDefaultMaxDepth());
5947  }
5948  result.setSubscription(_body.get().deltaSubscribe(
5949  result.operator MessageHandler(),
5950  topic_, timeout_, filter_, "",
5951  options_, subId_, false));
5952  return result;
5953  }
5954 
5980  std::string bookmarkSubscribe(const MessageHandler& messageHandler_,
5981  const std::string& topic_,
5982  long timeout_,
5983  const std::string& bookmark_,
5984  const std::string& filter_ = "",
5985  const std::string& options_ = "",
5986  const std::string& subId_ = "")
5987  {
5988  return _body.get().subscribe(messageHandler_, topic_, timeout_,
5989  filter_, bookmark_, options_, subId_);
5990  }
6008  MessageStream bookmarkSubscribe(const std::string& topic_,
6009  long timeout_,
6010  const std::string& bookmark_,
6011  const std::string& filter_ = "",
6012  const std::string& options_ = "",
6013  const std::string& subId_ = "")
6014  {
6015  MessageStream result(*this);
6016  if (_body.get().getDefaultMaxDepth())
6017  {
6018  result.maxDepth(_body.get().getDefaultMaxDepth());
6019  }
6020  result.setSubscription(_body.get().subscribe(
6021  result.operator MessageHandler(),
6022  topic_, timeout_, filter_,
6023  bookmark_, options_,
6024  subId_, false));
6025  return result;
6026  }
6027 
6029  MessageStream bookmarkSubscribe(const char* topic_,
6030  long timeout_,
6031  const std::string& bookmark_,
6032  const std::string& filter_ = "",
6033  const std::string& options_ = "",
6034  const std::string& subId_ = "")
6035  {
6036  MessageStream result(*this);
6037  if (_body.get().getDefaultMaxDepth())
6038  {
6039  result.maxDepth(_body.get().getDefaultMaxDepth());
6040  }
6041  result.setSubscription(_body.get().subscribe(
6042  result.operator MessageHandler(),
6043  topic_, timeout_, filter_,
6044  bookmark_, options_,
6045  subId_, false));
6046  return result;
6047  }
6048 
6057  void unsubscribe(const std::string& commandId)
6058  {
6059  return _body.get().unsubscribe(commandId);
6060  }
6061 
6070  {
  // Calls the implementation's no-argument unsubscribe() and returns its
  // result.
6071  return _body.get().unsubscribe();
6072  }
6073 
6074 
6104  std::string sow(const MessageHandler& messageHandler_,
6105  const std::string& topic_,
6106  const std::string& filter_ = "",
6107  const std::string& orderBy_ = "",
6108  const std::string& bookmark_ = "",
6109  int batchSize_ = DEFAULT_BATCH_SIZE,
6110  int topN_ = DEFAULT_TOP_N,
6111  const std::string& options_ = "",
6112  long timeout_ = DEFAULT_COMMAND_TIMEOUT)
6113  {
6114  return _body.get().sow(messageHandler_, topic_, filter_, orderBy_,
6115  bookmark_, batchSize_, topN_, options_,
6116  timeout_);
6117  }
6142  MessageStream sow(const std::string& topic_,
6143  const std::string& filter_ = "",
6144  const std::string& orderBy_ = "",
6145  const std::string& bookmark_ = "",
6146  int batchSize_ = DEFAULT_BATCH_SIZE,
6147  int topN_ = DEFAULT_TOP_N,
6148  const std::string& options_ = "",
6149  long timeout_ = DEFAULT_COMMAND_TIMEOUT)
6150  {
6151  MessageStream result(*this);
6152  if (_body.get().getDefaultMaxDepth())
6153  {
6154  result.maxDepth(_body.get().getDefaultMaxDepth());
6155  }
6156  result.setSOWOnly(_body.get().sow(result.operator MessageHandler(),
6157  topic_, filter_, orderBy_, bookmark_,
6158  batchSize_, topN_, options_, timeout_));
6159  return result;
6160  }
6161 
6163  MessageStream sow(const char* topic_,
6164  const std::string& filter_ = "",
6165  const std::string& orderBy_ = "",
6166  const std::string& bookmark_ = "",
6167  int batchSize_ = DEFAULT_BATCH_SIZE,
6168  int topN_ = DEFAULT_TOP_N,
6169  const std::string& options_ = "",
6170  long timeout_ = DEFAULT_COMMAND_TIMEOUT)
6171  {
6172  MessageStream result(*this);
6173  if (_body.get().getDefaultMaxDepth())
6174  {
6175  result.maxDepth(_body.get().getDefaultMaxDepth());
6176  }
6177  result.setSOWOnly(_body.get().sow(result.operator MessageHandler(),
6178  topic_, filter_, orderBy_, bookmark_,
6179  batchSize_, topN_, options_, timeout_));
6180  return result;
6181  }
6204  std::string sow(const MessageHandler& messageHandler_,
6205  const std::string& topic_,
6206  long timeout_,
6207  const std::string& filter_ = "",
6208  int batchSize_ = DEFAULT_BATCH_SIZE,
6209  int topN_ = DEFAULT_TOP_N)
6210  {
6211  return _body.get().sow(messageHandler_, topic_, timeout_, filter_,
6212  batchSize_, topN_);
6213  }
6236  std::string sowAndSubscribe(const MessageHandler& messageHandler_,
6237  const std::string& topic_,
6238  long timeout_,
6239  const std::string& filter_ = "",
6240  int batchSize_ = DEFAULT_BATCH_SIZE,
6241  bool oofEnabled_ = false,
6242  int topN_ = DEFAULT_TOP_N)
6243  {
6244  return _body.get().sowAndSubscribe(messageHandler_, topic_, timeout_,
6245  filter_, batchSize_, oofEnabled_,
6246  topN_);
6247  }
6248 
6268  MessageStream sowAndSubscribe(const std::string& topic_,
6269  long timeout_,
6270  const std::string& filter_ = "",
6271  int batchSize_ = DEFAULT_BATCH_SIZE,
6272  bool oofEnabled_ = false,
6273  int topN_ = DEFAULT_TOP_N)
6274  {
6275  MessageStream result(*this);
6276  if (_body.get().getDefaultMaxDepth())
6277  {
6278  result.maxDepth(_body.get().getDefaultMaxDepth());
6279  }
6280  result.setSubscription(_body.get().sowAndSubscribe(
6281  result.operator MessageHandler(),
6282  topic_, timeout_, filter_,
6283  batchSize_, oofEnabled_,
6284  topN_, false));
6285  return result;
6286  }
6306  MessageStream sowAndSubscribe(const char* topic_,
6307  long timeout_,
6308  const std::string& filter_ = "",
6309  int batchSize_ = DEFAULT_BATCH_SIZE,
6310  bool oofEnabled_ = false,
6311  int topN_ = DEFAULT_TOP_N)
6312  {
6313  MessageStream result(*this);
6314  if (_body.get().getDefaultMaxDepth())
6315  {
6316  result.maxDepth(_body.get().getDefaultMaxDepth());
6317  }
6318  result.setSubscription(_body.get().sowAndSubscribe(
6319  result.operator MessageHandler(),
6320  topic_, timeout_, filter_,
6321  batchSize_, oofEnabled_,
6322  topN_, false));
6323  return result;
6324  }
6325 
6326 
6354  std::string sowAndSubscribe(const MessageHandler& messageHandler_,
6355  const std::string& topic_,
6356  const std::string& filter_ = "",
6357  const std::string& orderBy_ = "",
6358  const std::string& bookmark_ = "",
6359  int batchSize_ = DEFAULT_BATCH_SIZE,
6360  int topN_ = DEFAULT_TOP_N,
6361  const std::string& options_ = "",
6362  long timeout_ = DEFAULT_COMMAND_TIMEOUT)
6363  {
6364  return _body.get().sowAndSubscribe(messageHandler_, topic_, filter_,
6365  orderBy_, bookmark_, batchSize_,
6366  topN_, options_, timeout_);
6367  }
6368 
6393  MessageStream sowAndSubscribe(const std::string& topic_,
6394  const std::string& filter_ = "",
6395  const std::string& orderBy_ = "",
6396  const std::string& bookmark_ = "",
6397  int batchSize_ = DEFAULT_BATCH_SIZE,
6398  int topN_ = DEFAULT_TOP_N,
6399  const std::string& options_ = "",
6400  long timeout_ = DEFAULT_COMMAND_TIMEOUT)
6401  {
6402  MessageStream result(*this);
6403  if (_body.get().getDefaultMaxDepth())
6404  {
6405  result.maxDepth(_body.get().getDefaultMaxDepth());
6406  }
6407  result.setSubscription(_body.get().sowAndSubscribe(
6408  result.operator MessageHandler(),
6409  topic_, filter_, orderBy_,
6410  bookmark_, batchSize_, topN_,
6411  options_, timeout_, false));
6412  return result;
6413  }
6414 
6416  MessageStream sowAndSubscribe(const char* topic_,
6417  const std::string& filter_ = "",
6418  const std::string& orderBy_ = "",
6419  const std::string& bookmark_ = "",
6420  int batchSize_ = DEFAULT_BATCH_SIZE,
6421  int topN_ = DEFAULT_TOP_N,
6422  const std::string& options_ = "",
6423  long timeout_ = DEFAULT_COMMAND_TIMEOUT)
6424  {
6425  MessageStream result(*this);
6426  if (_body.get().getDefaultMaxDepth())
6427  {
6428  result.maxDepth(_body.get().getDefaultMaxDepth());
6429  }
6430  result.setSubscription(_body.get().sowAndSubscribe(
6431  result.operator MessageHandler(),
6432  topic_, filter_, orderBy_,
6433  bookmark_, batchSize_, topN_,
6434  options_, timeout_, false));
6435  return result;
6436  }
6437 
6462  std::string sowAndDeltaSubscribe(const MessageHandler& messageHandler_,
6463  const std::string& topic_,
6464  const std::string& filter_ = "",
6465  const std::string& orderBy_ = "",
6466  int batchSize_ = DEFAULT_BATCH_SIZE,
6467  int topN_ = DEFAULT_TOP_N,
6468  const std::string& options_ = "",
6469  long timeout_ = DEFAULT_COMMAND_TIMEOUT)
6470  {
6471  return _body.get().sowAndDeltaSubscribe(messageHandler_, topic_,
6472  filter_, orderBy_, batchSize_,
6473  topN_, options_, timeout_);
6474  }
6495  MessageStream sowAndDeltaSubscribe(const std::string& topic_,
6496  const std::string& filter_ = "",
6497  const std::string& orderBy_ = "",
6498  int batchSize_ = DEFAULT_BATCH_SIZE,
6499  int topN_ = DEFAULT_TOP_N,
6500  const std::string& options_ = "",
6501  long timeout_ = DEFAULT_COMMAND_TIMEOUT)
6502  {
6503  MessageStream result(*this);
6504  if (_body.get().getDefaultMaxDepth())
6505  {
6506  result.maxDepth(_body.get().getDefaultMaxDepth());
6507  }
6508  result.setSubscription(_body.get().sowAndDeltaSubscribe(
6509  result.operator MessageHandler(),
6510  topic_, filter_, orderBy_,
6511  batchSize_, topN_, options_,
6512  timeout_, false));
6513  return result;
6514  }
6515 
6518  const std::string& filter_ = "",
6519  const std::string& orderBy_ = "",
6520  int batchSize_ = DEFAULT_BATCH_SIZE,
6521  int topN_ = DEFAULT_TOP_N,
6522  const std::string& options_ = "",
6523  long timeout_ = DEFAULT_COMMAND_TIMEOUT)
6524  {
  // Builds a MessageStream on this client; it is both the handler passed to
  // the implementation and the value returned to the caller.
6525  MessageStream result(*this);
  // Apply the client's default max depth only when it is nonzero.
6526  if (_body.get().getDefaultMaxDepth())
6527  {
6528  result.maxDepth(_body.get().getDefaultMaxDepth());
6529  }
  // Issue the command with the stream as its handler and record the
  // returned id on the stream.
6530  result.setSubscription(_body.get().sowAndDeltaSubscribe(
6531  result.operator MessageHandler(),
6532  topic_, filter_, orderBy_,
6533  batchSize_, topN_, options_,
6534  timeout_, false));
6535  return result;
6536  }
6537 
6562  std::string sowAndDeltaSubscribe(const MessageHandler& messageHandler_,
6563  const std::string& topic_,
6564  long timeout_,
6565  const std::string& filter_ = "",
6566  int batchSize_ = DEFAULT_BATCH_SIZE,
6567  bool oofEnabled_ = false,
6568  bool sendEmpties_ = false,
6569  int topN_ = DEFAULT_TOP_N)
6570  {
6571  return _body.get().sowAndDeltaSubscribe(messageHandler_, topic_,
6572  timeout_, filter_, batchSize_,
6573  oofEnabled_, sendEmpties_,
6574  topN_);
6575  }
6576 
6598  MessageStream sowAndDeltaSubscribe(const std::string& topic_,
6599  long timeout_,
6600  const std::string& filter_ = "",
6601  int batchSize_ = DEFAULT_BATCH_SIZE,
6602  bool oofEnabled_ = false,
6603  bool sendEmpties_ = false,
6604  int topN_ = DEFAULT_TOP_N)
6605  {
6606  MessageStream result(*this);
6607  if (_body.get().getDefaultMaxDepth())
6608  {
6609  result.maxDepth(_body.get().getDefaultMaxDepth());
6610  }
6611  result.setSubscription(_body.get().sowAndDeltaSubscribe(
6612  result.operator MessageHandler(),
6613  topic_, timeout_, filter_,
6614  batchSize_, oofEnabled_,
6615  sendEmpties_, topN_, false));
6616  return result;
6617  }
6640  long timeout_,
6641  const std::string& filter_ = "",
6642  int batchSize_ = DEFAULT_BATCH_SIZE,
6643  bool oofEnabled_ = false,
6644  bool sendEmpties_ = false,
6645  int topN_ = DEFAULT_TOP_N)
6646  {
  // Builds a MessageStream on this client; it is both the handler passed to
  // the implementation and the value returned to the caller.
6647  MessageStream result(*this);
  // Apply the client's default max depth only when it is nonzero.
6648  if (_body.get().getDefaultMaxDepth())
6649  {
6650  result.maxDepth(_body.get().getDefaultMaxDepth());
6651  }
  // Issue the command with the stream as its handler and record the
  // returned id on the stream.
6652  result.setSubscription(_body.get().sowAndDeltaSubscribe(
6653  result.operator MessageHandler(),
6654  topic_, timeout_, filter_,
6655  batchSize_, oofEnabled_,
6656  sendEmpties_, topN_, false));
6657  return result;
6658  }
6678  std::string sowDelete(const MessageHandler& messageHandler,
6679  const std::string& topic,
6680  const std::string& filter,
6681  long timeout)
6682  {
6683  return _body.get().sowDelete(messageHandler, topic, filter, timeout);
6684  }
6701  Message sowDelete(const std::string& topic, const std::string& filter,
6702  long timeout = 0)
6703  {
6704  MessageStream stream(*this);
6705  char buf[Message::IdentifierLength + 1];
6706  buf[Message::IdentifierLength] = 0;
6707  AMPS_snprintf(buf, Message::IdentifierLength + 1, "%lx", MessageImpl::newId());
6708  Field cid(buf);
6709  try
6710  {
6711  stream.setStatsOnly(cid);
6712  _body.get().sowDelete(stream.operator MessageHandler(), topic, filter, timeout, cid);
6713  return *(stream.begin());
6714  }
6715  catch (const DisconnectedException&)
6716  {
6717  removeMessageHandler(cid);
6718  throw;
6719  }
6720  }
6721 
6726  void startTimer()
6727  {
6728  _body.get().startTimer();
6729  }
6730 
6737  std::string stopTimer(const MessageHandler& messageHandler)
6738  {
6739  return _body.get().stopTimer(messageHandler);
6740  }
6741 
6763  std::string sowDeleteByKeys(const MessageHandler& messageHandler_,
6764  const std::string& topic_,
6765  const std::string& keys_,
6766  long timeout_ = 0)
6767  {
6768  return _body.get().sowDeleteByKeys(messageHandler_, topic_, keys_, timeout_);
6769  }
6790  Message sowDeleteByKeys(const std::string& topic_, const std::string& keys_,
6791  long timeout_ = 0)
6792  {
6793  MessageStream stream(*this);
6794  char buf[Message::IdentifierLength + 1];
6795  buf[Message::IdentifierLength] = 0;
6796  AMPS_snprintf(buf, Message::IdentifierLength + 1, "%lx", MessageImpl::newId());
6797  Field cid(buf);
6798  try
6799  {
6800  stream.setStatsOnly(cid);
6801  _body.get().sowDeleteByKeys(stream.operator MessageHandler(), topic_, keys_, timeout_, cid);
6802  return *(stream.begin());
6803  }
6804  catch (const DisconnectedException&)
6805  {
6806  removeMessageHandler(cid);
6807  throw;
6808  }
6809  }
6810 
6825  std::string sowDeleteByData(const MessageHandler& messageHandler_,
6826  const std::string& topic_, const std::string& data_,
6827  long timeout_ = 0)
6828  {
6829  return _body.get().sowDeleteByData(messageHandler_, topic_, data_, timeout_);
6830  }
6831 
6846  Message sowDeleteByData(const std::string& topic_, const std::string& data_,
6847  long timeout_ = 0)
6848  {
6849  MessageStream stream(*this);
6850  char buf[Message::IdentifierLength + 1];
6851  buf[Message::IdentifierLength] = 0;
6852  AMPS_snprintf(buf, Message::IdentifierLength + 1, "%lx", MessageImpl::newId());
6853  Field cid(buf);
6854  try
6855  {
6856  stream.setStatsOnly(cid);
6857  _body.get().sowDeleteByData(stream.operator MessageHandler(), topic_, data_, timeout_, cid);
6858  return *(stream.begin());
6859  }
6860  catch (const DisconnectedException&)
6861  {
6862  removeMessageHandler(cid);
6863  throw;
6864  }
6865  }
6866 
6871  {
  // Returns whatever the client implementation's getHandle() returns.
6872  return _body.get().getHandle();
6873  }
6874 
6883  void setExceptionListener(const std::shared_ptr<const ExceptionListener>& pListener_)
6884  {
6885  _body.get().setExceptionListener(pListener_);
6886  }
6887 
6897  {
  // Passes listener_ to the client implementation's setExceptionListener().
  // NOTE(review): the parameter type is not visible in this listing.
6898  _body.get().setExceptionListener(listener_);
6899  }
6900 
6904  {
  // Returns whatever the client implementation's getExceptionListener()
  // returns.
6905  return _body.get().getExceptionListener();
6906  }
6907 
6915  // type of message) from the server for the specified interval (plus a grace period),
6929  void setHeartbeat(unsigned heartbeatTime_, unsigned readTimeout_)
6930  {
6931  _body.get().setHeartbeat(heartbeatTime_, readTimeout_);
6932  }
6933 
6941  // type of message) from the server for the specified interval (plus a grace period),
6953  void setHeartbeat(unsigned heartbeatTime_)
6954  {
6955  _body.get().setHeartbeat(heartbeatTime_, 2 * heartbeatTime_);
6956  }
6957 
6960  {
  // Pure alias: hands messageHandler to setLastChanceMessageHandler().
6961  setLastChanceMessageHandler(messageHandler);
6962  }
6963 
6967  {
  // Installs messageHandler as the implementation's global handler for the
  // LastChance slot.
6968  _body.get().setGlobalCommandTypeMessageHandler(ClientImpl::GlobalCommandTypeHandlers::LastChance,
6969  messageHandler);
6970  }
6971 
6992  void setGlobalCommandTypeMessageHandler(const std::string& command_, const MessageHandler& handler_)
6993  {
6994  _body.get().setGlobalCommandTypeMessageHandler(command_, handler_);
6995  }
6996 
7017  void setGlobalCommandTypeMessageHandler(const Message::Command::Type command_, const MessageHandler& handler_)
7018  {
7019  _body.get().setGlobalCommandTypeMessageHandler(command_, handler_);
7020  }
7021 
7027  static const char* BOOKMARK_NOW()
7028  {
7029  return AMPS_BOOKMARK_NOW;
7030  }
7036  static const char* NOW()
7037  {
7038  return AMPS_BOOKMARK_NOW;
7039  }
7040 
7046  static const char* BOOKMARK_EPOCH()
7047  {
7048  return AMPS_BOOKMARK_EPOCH;
7049  }
7050 
7056  static const char* EPOCH()
7057  {
7058  return AMPS_BOOKMARK_EPOCH;
7059  }
7060 
7067  static const char* BOOKMARK_MOST_RECENT()
7068  {
7069  return AMPS_BOOKMARK_RECENT;
7070  }
7071 
7078  static const char* MOST_RECENT()
7079  {
7080  return AMPS_BOOKMARK_RECENT;
7081  }
7082 
7089  static const char* BOOKMARK_RECENT()
7090  {
7091  return AMPS_BOOKMARK_RECENT;
7092  }
7093 
7094 
7101  {
  // Passes listener to the implementation's addConnectionStateListener().
  // NOTE(review): the parameter type and its ownership are not visible in
  // this listing.
7102  _body.get().addConnectionStateListener(listener);
7103  }
7104 
7109  {
  // Passes listener to the implementation's removeConnectionStateListener().
7110  _body.get().removeConnectionStateListener(listener);
7111  }
7112 
7116  {
  // Calls the implementation's clearConnectionStateListeners().
7117  _body.get().clearConnectionStateListeners();
7118  }
7119 
7145  std::string executeAsync(Command& command_, MessageHandler handler_)
7146  {
7147  return _body.get().executeAsync(command_, handler_);
7148  }
7149 
7179  std::string executeAsyncNoResubscribe(Command& command_,
7180  MessageHandler handler_)
7181  {
7182  std::string id;
7183  try
7184  {
7185  if (command_.isSubscribe())
7186  {
7187  Message& message = command_.getMessage();
7188  Field subId = message.getSubscriptionId();
7189  bool useExistingHandler = !subId.empty() && !message.getOptions().empty() && message.getOptions().contains("replace", 7);
7190  if (useExistingHandler)
7191  {
7192  MessageHandler existingHandler;
7193  if (_body.get()._routes.getRoute(subId, existingHandler))
7194  {
7195  // we found an existing handler.
7196  _body.get().executeAsync(command_, existingHandler, false);
7197  return id; // empty string indicates existing
7198  }
7199  }
7200  }
7201  id = _body.get().executeAsync(command_, handler_, false);
7202  }
7203  catch (const DisconnectedException&)
7204  {
7205  removeMessageHandler(command_.getMessage().getCommandId());
7206  if (command_.isSubscribe())
7207  {
7208  removeMessageHandler(command_.getMessage().getSubscriptionId());
7209  }
7210  if (command_.isSow())
7211  {
7212  removeMessageHandler(command_.getMessage().getQueryID());
7213  }
7214  throw;
7215  }
7216  return id;
7217  }
7218 
// Declaration only: takes a Command and returns a MessageStream. The
// definition is not part of this section of the listing.
7231  MessageStream execute(Command& command_);
7232 
7241  void ack(Field& topic_, Field& bookmark_, const char* options_ = NULL)
7242  {
7243  _body.get().ack(topic_, bookmark_, options_);
7244  }
7245 
7253  void ack(Message& message_, const char* options_ = NULL)
7254  {
7255  _body.get().ack(message_.getTopic(), message_.getBookmark(), options_);
7256  }
7265  void ack(const std::string& topic_, const std::string& bookmark_,
7266  const char* options_ = NULL)
7267  {
7268  _body.get().ack(Field(topic_.data(), topic_.length()), Field(bookmark_.data(), bookmark_.length()), options_);
7269  }
7270 
7276  void ackDeferredAutoAck(Field& topic_, Field& bookmark_, const char* options_ = NULL)
7277  {
7278  _body.get()._ack(topic_, bookmark_, options_);
7279  }
7289  void flushAcks(void)
7290  {
7291  _body.get().flushAcks();
7292  }
7293 
7298  bool getAutoAck(void) const
7299  {
7300  return _body.get().getAutoAck();
7301  }
7308  void setAutoAck(bool isAutoAckEnabled_)
7309  {
7310  _body.get().setAutoAck(isAutoAckEnabled_);
7311  }
7316  unsigned getAckBatchSize(void) const
7317  {
7318  return _body.get().getAckBatchSize();
7319  }
7326  void setAckBatchSize(const unsigned ackBatchSize_)
7327  {
7328  _body.get().setAckBatchSize(ackBatchSize_);
7329  }
7330 
7337  int getAckTimeout(void) const
7338  {
7339  return _body.get().getAckTimeout();
7340  }
7349  void setAckTimeout(const int ackTimeout_)
7350  {
7351  if (!ackTimeout_ && _body.get().getAckBatchSize() > 1)
7352  {
7353  throw UsageException("Ack timeout must be > 0 when ack batch size > 1");
7354  }
7355  _body.get().setAckTimeout(ackTimeout_);
7356  }
7357 
7358 
7367  void setRetryOnDisconnect(bool isRetryOnDisconnect_)
7368  {
7369  _body.get().setRetryOnDisconnect(isRetryOnDisconnect_);
7370  }
7371 
7376  bool getRetryOnDisconnect(void) const
7377  {
7378  return _body.get().getRetryOnDisconnect();
7379  }
7380 
7385  void setDefaultMaxDepth(unsigned maxDepth_)
7386  {
7387  _body.get().setDefaultMaxDepth(maxDepth_);
7388  }
7389 
7394  unsigned getDefaultMaxDepth(void) const
7395  {
7396  return _body.get().getDefaultMaxDepth();
7397  }
7398 
7406  void* userData_)
7407  {
  // Passes filter_ and the opaque userData_ pointer to the implementation's
  // setTransportFilterFunction() and returns its result.
7408  return _body.get().setTransportFilterFunction(filter_, userData_);
7409  }
7410 
7420  void* userData_)
7421  {
  // Passes callback_ and the opaque userData_ pointer to the implementation's
  // setThreadCreatedCallback() and returns its result.
7422  return _body.get().setThreadCreatedCallback(callback_, userData_);
7423  }
7424 
7430  void deferredExecution(DeferredExecutionFunc func_, void* userData_)
7431  {
7432  _body.get().deferredExecution(func_, userData_);
7433  }
7437  };
7438 
// Invokes the global handler stored in the LastChance slot on message. The
// call goes through AMPS_CALL_EXCEPTION_WRAPPER (macro defined elsewhere).
7439  inline void
7440  ClientImpl::lastChance(AMPS::Message& message)
7441  {
7442  AMPS_CALL_EXCEPTION_WRAPPER(_globalCommandTypeHandlers[GlobalCommandTypeHandlers::LastChance].invoke(message));
7443  }
7444 
// Handles an ack message against the failed-write handler, the publish
// store and the bookmark store.
// Returns the number of "deliveries" made while handling it: one for a
// failed-write notification, one for a publish store discard, and one for a
// bookmark store persisted() call (only attempted when nothing else
// counted). Any std::exception is routed to AMPS_UNHANDLED_EXCEPTION rather
// than propagated.
7445  inline unsigned
7446  ClientImpl::persistedAck(AMPS::Message& message)
7447  {
7448  unsigned deliveries = 0;
7449  try
7450  {
7451  /*
7452  * Best Practice: If you don't care about the dupe acks that
7453  * occur during failover or rapid disconnect/reconnect, then just
7454  * ignore them. We could discard each duplicate from the
7455  * persisted store, but the storage costs of doing 1 record
7456  * discards is heavy. In most scenarios we'll just quickly blow
7457  * through the duplicates and get back to processing the
7458  * non-dupes.
7459  */
7460  const char* data = NULL;
7461  size_t len = 0;
7462  const char* status = NULL;
7463  size_t statusLen = 0;
7464  amps_handle messageHandle = message.getMessage();
  // Field lengths used to classify the ack without comparing full strings;
  // presumably the lengths of "not entitled", "duplicate" and "failure"
  // (TODO confirm against the protocol's reason/status values).
7465  const size_t NotEntitled = 12, Duplicate = 9, Failure = 7;
7466  amps_message_get_field_value(messageHandle, AMPS_Reason, &data, &len);
7467  amps_message_get_field_value(messageHandle, AMPS_Status, &status, &statusLen);
  // status[0] is only read when statusLen equals Failure, i.e. when the
  // status field is non-empty.
7468  if (len == NotEntitled || len == Duplicate ||
7469  (statusLen == Failure && status[0] == 'f'))
7470  {
7471  if (_failedWriteHandler)
7472  {
7473  if (_publishStore.isValid())
7474  {
7475  amps_uint64_t sequence =
7476  amps_message_get_field_uint64(messageHandle, AMPS_Sequence);
  // Replay just this sequence from the store through a replayer that
  // carries the reason text.
7477  FailedWriteStoreReplayer replayer(this, data, len);
7478  AMPS_CALL_EXCEPTION_WRAPPER(_publishStore.replaySingle(
7479  replayer, sequence));
7480  }
7481  else // Call the handler with what little we have
7482  {
  // NOTE(review): function-local static, so one Message is shared by every
  // caller of this inline function; confirm this cannot be reached from
  // more than one thread at a time.
7483  static Message emptyMessage;
7484  emptyMessage.setSequence(message.getSequence());
7485  AMPS_CALL_EXCEPTION_WRAPPER(
7486  _failedWriteHandler->failedWrite(emptyMessage,
7487  data, len));
7488  }
7489  ++deliveries;
7490  }
7491  }
7492  if (_publishStore.isValid())
7493  {
7494  // Ack for publisher will have sequence while
7495  // ack for bookmark subscribe won't
7496  amps_uint64_t seq = amps_message_get_field_uint64(messageHandle,
7497  AMPS_Sequence);
7498  if (seq > 0)
7499  {
7500  ++deliveries;
7501  AMPS_CALL_EXCEPTION_WRAPPER(_publishStore.discardUpTo(seq));
7502  }
7503  }
7504 
  // Only consult the bookmark store when the ack was not already consumed
  // by the publish-side handling above.
7505  if (!deliveries && _bookmarkStore.isValid())
7506  {
7507  amps_message_get_field_value(messageHandle, AMPS_SubscriptionId,
7508  &data, &len);
7509  if (len > 0)
7510  {
7511  Message::Field subId(data, len);
7512  const char* bookmarkData = NULL;
7513  size_t bookmarkLen = 0;
7514  amps_message_get_field_value(messageHandle,
7515  AMPS_Bookmark,
7516  &bookmarkData,
7517  &bookmarkLen);
7518  // Everything is there and not unsubscribed AC-912
7519  if (bookmarkLen > 0 && _routes.hasRoute(subId))
7520  {
7521  ++deliveries;
7522  _bookmarkStore.persisted(subId, Message::Field(bookmarkData, bookmarkLen));
7523  }
7524  }
7525  }
7526  }
7527  catch (std::exception& ex)
7528  {
7529  AMPS_UNHANDLED_EXCEPTION(ex);
7530  }
7531  return deliveries;
7532  }
7533 
// Matches an ack message to an entry in _ackMap by command id and, when one
// is found, copies the ack's fields into that AckResponse and signals
// waiters on _lock.
// Returns 1 when a matching entry was found (and removed from the map),
// otherwise 0.
7534  inline unsigned
7535  ClientImpl::processedAck(Message& message)
7536  {
7537  unsigned deliveries = 0;
7538  AckResponse ack;
7539  const char* data = NULL;
7540  size_t len = 0;
7541  amps_handle messageHandle = message.getMessage();
7542  amps_message_get_field_value(messageHandle, AMPS_CommandId, &data, &len);
  // _lock is taken first and held to the end of the function; _ackMapLock is
  // taken second and only around the map lookup/erase.
7543  Lock<Mutex> l(_lock);
7544  if (data && len)
7545  {
7546  Lock<Mutex> guard(_ackMapLock);
7547  AckMap::iterator i = _ackMap.find(std::string(data, len));
7548  if (i != _ackMap.end())
7549  {
7550  ++deliveries;
7551  ack = i->second;
7552  _ackMap.erase(i);
7553  }
7554  }
7555  if (deliveries)
7556  {
  // data/len are reused as scratch outputs for each field read below.
7557  amps_message_get_field_value(messageHandle, AMPS_Status, &data, &len);
7558  ack.setStatus(data, len);
7559  amps_message_get_field_value(messageHandle, AMPS_Reason, &data, &len);
7560  ack.setReason(data, len);
7561  amps_message_get_field_value(messageHandle, AMPS_UserId, &data, &len);
7562  ack.setUsername(data, len);
7563  amps_message_get_field_value(messageHandle, AMPS_Password, &data, &len);
7564  ack.setPassword(data, len);
7565  amps_message_get_field_value(messageHandle, AMPS_Version, &data, &len);
7566  ack.setServerVersion(data, len);
7567  amps_message_get_field_value(messageHandle, AMPS_Options, &data, &len);
7568  ack.setOptions(data, len);
7569  // This sets bookmark, nameHashValue, and sequenceNo
7570  ack.setBookmark(message.getBookmark());
7571  ack.setResponded();
  // Wake every waiter on _lock now that the response has been filled in.
7572  _lock.signalAll();
7573  }
7574  return deliveries;
7575  }
7576 
7577  inline void
7578  ClientImpl::checkAndSendHeartbeat(bool force)
7579  {
7580  if (force || _heartbeatTimer.check())
7581  {
7582  _heartbeatTimer.start();
7583  try
7584  {
7585  sendWithoutRetry(_beatMessage);
7586  }
7587  catch (const AMPSException&)
7588  {
7589  ;
7590  }
7591  }
7592  }
7593 
7594  inline ConnectionInfo ClientImpl::getConnectionInfo() const
7595  {
7596  ConnectionInfo info;
7597  std::ostringstream writer;
7598 
7599  info["client.uri"] = _lastUri;
7600  info["client.name"] = _name;
7601  info["client.username"] = _username;
7602  if (_publishStore.isValid())
7603  {
7604  writer << _publishStore.unpersistedCount();
7605  info["publishStore.unpersistedCount"] = writer.str();
7606  writer.clear();
7607  writer.str("");
7608  }
7609 
7610  return info;
7611  }
7612 
// Static message callback. userData_ is cast to the owning ClientImpl and
// messageHandle_ is the message to dispatch (it may be null). Deferred
// executions are processed first; the message is then routed according to its
// command type. Every path returns AMPS_E_OK.
// NOTE(review): presumably registered with the C client as its receive
// callback; the registration is not visible in this block.
7613  inline amps_result
7614  ClientImpl::ClientImplMessageHandler(amps_handle messageHandle_, void* userData_)
7615  {
      // Bit masks grouping the command types that share a delivery path.
7616  const unsigned SOWMask = Message::Command::SOW | Message::Command::GroupBegin | Message::Command::GroupEnd;
7617  const unsigned PublishMask = Message::Command::OOF | Message::Command::Publish | Message::Command::DeltaPublish;
7618  ClientImpl* me = (ClientImpl*) userData_;
7619  AMPS_CALL_EXCEPTION_WRAPPER_2(me, me->processDeferredExecutions());
      // A null handle carries no message: only do periodic work (queue ack
      // check when _queueAckTimeout is non-zero, then a heartbeat check).
7620  if (!messageHandle_)
7621  {
7622  if (me->_queueAckTimeout)
7623  {
7624  me->checkQueueAcks();
7625  }
7626  me->checkAndSendHeartbeat();
7627  return AMPS_E_OK;
7628  }
7629 
      // Re-point the reusable _readMessage at the incoming handle.
7630  me->_readMessage.replace(messageHandle_);
7631  Message& message = me->_readMessage;
7632  Message::Command::Type commandType = message.getCommandEnum();
      // SOW / group_begin / group_end: routed by query id.
7633  if (commandType & SOWMask)
7634  {
7635 #if 0 // Not currently implemented, to avoid an extra branch in delivery
7636  // A small cheat here to get the right handler, using knowledge of the
7637  // Command values of SOW (8), GroupBegin (8192), and GroupEnd (16384)
7638  // and their GlobalCommandTypeHandlers values 1, 2, 3.
7639  AMPS_CALL_EXCEPTION_WRAPPER_2(me,
7640  me->_globalCommandTypeHandlers[1 + (commandType / 8192)].invoke(message));
7641 #endif
7642  AMPS_CALL_EXCEPTION_WRAPPER_2(me, me->_routes.deliverData(message,
7643  message.getQueryID()));
7644  }
      // OOF / publish / delta_publish: delivered once per subscription id
      // listed in the message's SubscriptionIds field.
7645  else if (commandType & PublishMask)
7646  {
7647 #if 0 // Not currently implemented, to avoid an extra branch in delivery
7648  AMPS_CALL_EXCEPTION_WRAPPER_2(me,
7649  me->_globalCommandTypeHandlers[(commandType == Message::Command::Publish ?
7650  GlobalCommandTypeHandlers::Publish :
7651  GlobalCommandTypeHandlers::OOF)].invoke(message));
7652 #endif
7653  const char* subIds = NULL;
7654  size_t subIdsLen = 0;
7655  // Publish command, send to subscriptions
7656  amps_message_get_field_value(messageHandle_, AMPS_SubscriptionIds,
7657  &subIds, &subIdsLen);
      // parseRoutes fills _routeCache and returns how many entries to visit.
7658  size_t subIdCount = me->_routes.parseRoutes(AMPS::Field(subIds, subIdsLen), me->_routeCache);
7659  for (size_t i = 0; i < subIdCount; ++i)
7660  {
7661  MessageRouter::RouteCache::value_type& lookupResult = me->_routeCache[i];
7662  MessageHandler& handler = lookupResult.handler;
7663  if (handler.isValid())
7664  {
      // Set the message's single SubscriptionId to the id for this route
      // before invoking its handler.
7665  amps_message_set_field_value(messageHandle_,
7666  AMPS_SubscriptionId,
7667  subIds + lookupResult.idOffset,
7668  lookupResult.idLength);
7669  Message::Field bookmark = message.getBookmark();
      // A non-empty lease period is what marks a message as a queue message.
7670  bool isMessageQueue = message.getLeasePeriod().len() != 0;
7671  bool isAutoAck = me->_isAutoAckEnabled;
7672 
      // Bookmarked, non-queue message with a bookmark store: either report
      // it as a duplicate or log it in the store and deliver it.
7673  if (!isMessageQueue && !bookmark.empty() &&
7674  me->_bookmarkStore.isValid())
7675  {
7676  if (me->_bookmarkStore.isDiscarded(me->_readMessage))
7677  {
7678  //Call duplicate message handler in handlers map
7679  if (me->_globalCommandTypeHandlers[GlobalCommandTypeHandlers::DuplicateMessage].isValid())
7680  {
7681  AMPS_CALL_EXCEPTION_WRAPPER_2(me, me->_globalCommandTypeHandlers[GlobalCommandTypeHandlers::DuplicateMessage].invoke(message));
7682  }
7683  }
7684  else
7685  {
7686  me->_bookmarkStore.log(me->_readMessage);
7687  AMPS_CALL_EXCEPTION_WRAPPER_2(me,
7688  handler.invoke(message));
7689  }
7690  }
7691  else
7692  {
      // Queue message with auto-ack enabled: ack after the handler returns,
      // or ack with "cancel" if a std::exception reaches the catch below.
      // Both acks are skipped when the message has getIgnoreAutoAck() set.
7693  if (isMessageQueue && isAutoAck)
7694  {
7695  try
7696  {
7697  AMPS_CALL_EXCEPTION_WRAPPER_STREAM_FULL_2(me, handler.invoke(message));
7698  if (!message.getIgnoreAutoAck())
7699  {
7700  AMPS_CALL_EXCEPTION_WRAPPER_2(me,
7701  me->_ack(message.getTopic(), message.getBookmark()));
7702  }
7703  }
7704  catch (std::exception& ex)
7705  {
7706  if (!message.getIgnoreAutoAck())
7707  {
7708  AMPS_CALL_EXCEPTION_WRAPPER_2(me,
7709  me->_ack(message.getTopic(), message.getBookmark(), "cancel"));
7710  }
7711  AMPS_UNHANDLED_EXCEPTION_2(me, ex);
7712  }
7713  }
7714  else
7715  {
7716  AMPS_CALL_EXCEPTION_WRAPPER_2(me,
7717  handler.invoke(message));
7718  }
7719  }
7720  }
7721  else
7722  {
      // No valid handler for this subscription id.
7723  me->lastChance(message);
7724  }
7725  } // for (subidsEnd)
7726  }
      // Ack: global Ack handler first, then persisted/processed bookkeeping,
      // then route delivery; lastChance() if nothing consumed the ack.
7727  else if (commandType == Message::Command::Ack)
7728  {
7729  AMPS_CALL_EXCEPTION_WRAPPER_2(me,
7730  me->_globalCommandTypeHandlers[GlobalCommandTypeHandlers::Ack].invoke(message));
7731  unsigned ackType = message.getAckTypeEnum();
7732  unsigned deliveries = 0U;
7733  switch (ackType)
7734  {
7735  case Message::AckType::Persisted:
7736  deliveries += me->persistedAck(message);
7737  break;
7738  case Message::AckType::Processed: // processed
7739  deliveries += me->processedAck(message);
7740  break;
7741  }
7742  AMPS_CALL_EXCEPTION_WRAPPER_2(me, deliveries += me->_routes.deliverAck(message, ackType));
7743  if (deliveries == 0)
7744  {
7745  me->lastChance(message);
7746  }
7747  }
      // Heartbeat: global Heartbeat handler, then a forced heartbeat send
      // when a heartbeat timeout is configured; otherwise lastChance().
      // This branch returns directly instead of falling through.
7748  else if (commandType == Message::Command::Heartbeat)
7749  {
7750  AMPS_CALL_EXCEPTION_WRAPPER_2(me,
7751  me->_globalCommandTypeHandlers[GlobalCommandTypeHandlers::Heartbeat].invoke(message));
7752  if (me->_heartbeatTimer.getTimeout() != 0.0) // -V550
7753  {
7754  me->checkAndSendHeartbeat(true);
7755  }
7756  else
7757  {
7758  me->lastChance(message);
7759  }
7760  return AMPS_E_OK;
7761  }
      // Any other command that carries a command id: routed by command id.
7762  else if (!message.getCommandId().empty())
7763  {
7764  unsigned deliveries = 0U;
7765  try
7766  {
      // Retry delivery for as long as the client is connected; each
      // MessageStreamFullException triggers a heartbeat check and a retry.
7767  while (me->_connected) // Keep sending heartbeats when stream is full
7768  {
7769  try
7770  {
7771  deliveries = me->_routes.deliverData(message, message.getCommandId());
7772  break;
7773  }
7774 #ifdef _WIN32
7775  catch (MessageStreamFullException&)
7776 #else
7777  catch (MessageStreamFullException& ex_)
7778 #endif
7779  {
7780  try
7781  {
7782  me->checkAndSendHeartbeat(false);
7783  }
7784 #ifdef _WIN32
7785  catch (std::exception&)
7786 #else
7787  catch (std::exception& ex_)
7788 #endif
7789  {
7790  ;
7791  }
7792  }
7793  }
7794  }
      // Other std::exceptions from delivery go to the exception listener;
      // anything the listener itself throws is swallowed.
7795  catch (std::exception& ex_)
7796  {
7797  try
7798  {
7799  me->_exceptionListener->exceptionThrown(ex_);
7800  }
7801  catch (...)
7802  {
7803  ;
7804  }
7805  }
7806  if (deliveries == 0)
7807  {
7808  me->lastChance(message);
7809  }
7810  }
      // Every non-heartbeat message path ends with a heartbeat check.
7811  me->checkAndSendHeartbeat();
7812  return AMPS_E_OK;
7813  }
7814 
7815  inline void
7816  ClientImpl::ClientImplPreDisconnectHandler(amps_handle /*client*/, unsigned failedConnectionVersion, void* userData)
7817  {
7818  ClientImpl* me = (ClientImpl*) userData;
7819  //Client wrapper(me);
7820  // Go ahead and signal any waiters if they are around...
7821  me->clearAcks(failedConnectionVersion);
7822  }
7823 
// Static disconnect callback: userData is the owning ClientImpl.
// Under the client lock it announces the disconnect, then loops: run the
// user's disconnect handler (with the lock released), and if that left the
// client connected, resubscribe through the subscription manager.
// Returns AMPS_E_OK after a successful pass, AMPS_E_DISCONNECTED when the
// handler did not reconnect, and AMPS_E_RETRY when resubscription throws
// something other than an AMPSException.
7824  inline amps_result
7825  ClientImpl::ClientImplDisconnectHandler(amps_handle /*client*/, void* userData)
7826  {
7827  ClientImpl* me = (ClientImpl*) userData;
7828  Lock<Mutex> l(me->_lock);
      // Client handle passed to the user callbacks below.
      // NOTE(review): meaning of the `false` argument is not visible here.
7829  Client wrapper(me, false);
7830  if (me->_connected)
7831  {
7832  me->broadcastConnectionStateChanged(ConnectionStateListener::Disconnected);
7833  }
7834  while (true)
7835  {
      // Scoped flip of _badTimeToHASubscribe for the duration of one pass.
7836  AtomicFlagFlip subFlip(&me->_badTimeToHASubscribe);
7837  try
7838  {
7839  me->_connected = false;
7840  {
7841  // Have to release the lock here or receive thread can't
7842  // invoke the message handler.
7843  Unlock<Mutex> unlock(me->_lock);
7844  me->_disconnectHandler.invoke(wrapper);
7845  }
7846  }
7847  catch (const std::exception& ex)
7848  {
7849  AMPS_UNHANDLED_EXCEPTION_2(me, ex);
7850  }
7851  me->_lock.signalAll();
7852 
      // _connected was cleared above, so it is only true here if something
      // set it again while the disconnect handler ran.
7853  if (!me->_connected)
7854  {
7855  me->broadcastConnectionStateChanged(ConnectionStateListener::Shutdown);
7856  AMPS_UNHANDLED_EXCEPTION_2(me, DisconnectedException("Reconnect failed."));
7857  return AMPS_E_DISCONNECTED;
7858  }
7859  try
7860  {
7861  // Resubscribe
7862  if (me->_subscriptionManager)
7863  {
7864  {
7865  // Have to release the lock here or receive thread can't
7866  // invoke the message handler.
7867  Unlock<Mutex> unlock(me->_lock);
7868  me->_subscriptionManager->resubscribe(wrapper);
7869  }
7870  me->broadcastConnectionStateChanged(ConnectionStateListener::Resubscribed);
7871  }
7872  return AMPS_E_OK;
7873  }
      // An AMPSException during resubscribe is reported and does not return:
      // control falls to the end of the loop body and the whole pass repeats.
7874  catch (const AMPSException& subEx)
7875  {
7876  AMPS_UNHANDLED_EXCEPTION_2(me, subEx);
7877  }
7878  catch (const std::exception& subEx)
7879  {
7880  AMPS_UNHANDLED_EXCEPTION_2(me, subEx);
7881  return AMPS_E_RETRY;
7882  }
7883  catch (...)
7884  {
7885  return AMPS_E_RETRY;
7886  }
7887  }
      // Not reached: the loop above has no break and only exits via return.
7888  return AMPS_E_RETRY;
7889  }
7890 
7891  class FIX
7892  {
7893  const char* _data;
7894  size_t _len;
7895  char _fieldSep;
7896  public:
7897  class iterator
7898  {
7899  const char* _data;
7900  size_t _len;
7901  size_t _pos;
7902  char _fieldSep;
7903  iterator(const char* data_, size_t len_, size_t pos_, char fieldSep_)
7904  : _data(data_), _len(len_), _pos(pos_), _fieldSep(fieldSep_)
7905  {
7906  while (_pos != _len && _data[_pos] == _fieldSep)
7907  {
7908  ++_pos;
7909  }
7910  }
7911  public:
7912  typedef void* difference_type;
7913  typedef std::forward_iterator_tag iterator_category;
7914  typedef std::pair<Message::Field, Message::Field> value_type;
7915  typedef value_type* pointer;
7916  typedef value_type& reference;
7917  bool operator==(const iterator& rhs) const
7918  {
7919  return _pos == rhs._pos;
7920  }
7921  bool operator!=(const iterator& rhs) const
7922  {
7923  return _pos != rhs._pos;
7924  }
7925  iterator& operator++()
7926  {
7927  // Skip through the data
7928  while (_pos != _len && _data[_pos] != _fieldSep)
7929  {
7930  ++_pos;
7931  }
7932  // Skip through any field separators
7933  while (_pos != _len && _data[_pos] == _fieldSep)
7934  {
7935  ++_pos;
7936  }
7937  return *this;
7938  }
7939 
7940  value_type operator*() const
7941  {
7942  value_type result;
7943  size_t i = _pos, keyLength = 0, valueStart = 0, valueLength = 0;
7944  for (; i < _len && _data[i] != '='; ++i)
7945  {
7946  ++keyLength;
7947  }
7948 
7949  result.first.assign(_data + _pos, keyLength);
7950 
7951  if (i < _len && _data[i] == '=')
7952  {
7953  ++i;
7954  valueStart = i;
7955  for (; i < _len && _data[i] != _fieldSep; ++i)
7956  {
7957  valueLength++;
7958  }
7959  }
7960  result.second.assign(_data + valueStart, valueLength);
7961  return result;
7962  }
7963 
7964  friend class FIX;
7965  };
7966  class reverse_iterator
7967  {
7968  const char* _data;
7969  size_t _len;
7970  const char* _pos;
7971  char _fieldSep;
7972  public:
7973  typedef std::pair<Message::Field, Message::Field> value_type;
7974  reverse_iterator(const char* data, size_t len, const char* pos, char fieldsep)
7975  : _data(data), _len(len), _pos(pos), _fieldSep(fieldsep)
7976  {
7977  if (_pos)
7978  {
7979  // skip past meaningless trailing fieldseps
7980  while (_pos >= _data && *_pos == _fieldSep)
7981  {
7982  --_pos;
7983  }
7984  while (_pos > _data && *_pos != _fieldSep)
7985  {
7986  --_pos;
7987  }
7988  // if we stopped before the 0th character, it's because
7989  // it's a field sep. advance one to point to the first character
7990  // of a key.
7991  if (_pos > _data || (_pos == _data && *_pos == _fieldSep))
7992  {
7993  ++_pos;
7994  }
7995  if (_pos < _data)
7996  {
7997  _pos = 0;
7998  }
7999  }
8000  }
8001  bool operator==(const reverse_iterator& rhs) const
8002  {
8003  return _pos == rhs._pos;
8004  }
8005  bool operator!=(const reverse_iterator& rhs) const
8006  {
8007  return _pos != rhs._pos;
8008  }
8009  reverse_iterator& operator++()
8010  {
8011  if (_pos == _data)
8012  {
8013  _pos = 0;
8014  }
8015  else
8016  {
8017  // back up 1 to a field separator
8018  --_pos;
8019  // keep backing up through field separators
8020  while (_pos >= _data && *_pos == _fieldSep)
8021  {
8022  --_pos;
8023  }
8024  // now back up to the beginning of this field
8025  while (_pos > _data && *_pos != _fieldSep)
8026  {
8027  --_pos;
8028  }
8029  if (_pos > _data || (_pos == _data && *_pos == _fieldSep))
8030  {
8031  ++_pos;
8032  }
8033  if (_pos < _data)
8034  {
8035  _pos = 0;
8036  }
8037  }
8038  return *this;
8039  }
8040  value_type operator*() const
8041  {
8042  value_type result;
8043  size_t keyLength = 0, valueStart = 0, valueLength = 0;
8044  size_t i = (size_t)(_pos - _data);
8045  for (; i < _len && _data[i] != '='; ++i)
8046  {
8047  ++keyLength;
8048  }
8049  result.first.assign(_pos, keyLength);
8050  if (i < _len && _data[i] == '=')
8051  {
8052  ++i;
8053  valueStart = i;
8054  for (; i < _len && _data[i] != _fieldSep; ++i)
8055  {
8056  valueLength++;
8057  }
8058  }
8059  result.second.assign(_data + valueStart, valueLength);
8060  return result;
8061  }
8062  };
8063  FIX(const Message::Field& data, char fieldSeparator = 1)
8064  : _data(data.data()), _len(data.len()),
8065  _fieldSep(fieldSeparator)
8066  {
8067  }
8068 
8069  FIX(const char* data, size_t len, char fieldSeparator = 1)
8070  : _data(data), _len(len), _fieldSep(fieldSeparator)
8071  {
8072  }
8073 
8074  iterator begin() const
8075  {
8076  return iterator(_data, _len, 0, _fieldSep);
8077  }
8078  iterator end() const
8079  {
8080  return iterator(_data, _len, _len, _fieldSep);
8081  }
8082 
8083 
8084  reverse_iterator rbegin() const
8085  {
8086  return reverse_iterator(_data, _len, _data + (_len - 1), _fieldSep);
8087  }
8088 
8089  reverse_iterator rend() const
8090  {
8091  return reverse_iterator(_data, _len, 0, _fieldSep);
8092  }
8093  };
8094 
8095 
8108 
// Accumulates `tag=value<sep>` fields into an internal string stream.
// T is the tag type and must be streamable with operator<<. The field
// separator defaults to 0x01.
template <class T>
class _FIXBuilder
{
  std::stringstream _data;
  char _fs;
public:
  // fieldSep_ is the character written after every appended value.
  _FIXBuilder(char fieldSep_ = (char)1) : _fs(fieldSep_) {;}

  // Appends tag, '=', then `length` bytes of `value` starting at `offset`,
  // followed by the field separator.
  void append(const T& tag, const char* value, size_t offset, size_t length)
  {
    _data << tag << '=';
    _data.write(value + offset, (std::streamsize)length);
    _data << _fs;
  }
  // Appends tag, '=', the whole of `value`, then the field separator.
  void append(const T& tag, const std::string& value)
  {
    _data << tag << '=' << value << _fs;
  }

  // Returns a copy of everything appended since construction or reset().
  std::string getString() const
  {
    return _data.str();
  }
  operator std::string() const
  {
    return _data.str();
  }

  // Discards the accumulated text. The stream's error flags are cleared as
  // well; otherwise a single failed insertion would make every later
  // append() a silent no-op even after a reset.
  void reset()
  {
    _data.str(std::string());
    _data.clear();
  }
};
8162 
8166 
8168 
8172 
8174 
8175 
8183 
8185  {
8186  char _fs;
8187  public:
8192  FIXShredder(char fieldSep_ = (char)1) : _fs(fieldSep_) {;}
8193 
8196  typedef std::map<Message::Field, Message::Field> map_type;
8197 
8203  map_type toMap(const Message::Field& data)
8204  {
8205  FIX fix(data, _fs);
8206  map_type retval;
8207  for (FIX::iterator a = fix.begin(); a != fix.end(); ++a)
8208  {
8209  retval.insert(*a);
8210  }
8211 
8212  return retval;
8213  }
8214  };
8215 
8216 #define AMPS_MESSAGE_STREAM_CACHE_MAX 128
8217  class MessageStreamImpl : public AMPS::RefBody, AMPS::ConnectionStateListener
8218  {
8219  Mutex _lock;
8220  std::deque<Message> _q;
8221  std::deque<Message> _cache;
8222  std::string _commandId;
8223  std::string _subId;
8224  std::string _queryId;
8225  Client _client;
8226  unsigned _timeout;
8227  unsigned _maxDepth;
8228  unsigned _requestedAcks;
8229  size_t _cacheMax;
8230  Message::Field _previousTopic;
8231  Message::Field _previousBookmark;
8232  typedef enum : unsigned int { Unset = 0x0, Running = 0x10, Subscribe = 0x11, SOWOnly = 0x12, AcksOnly = 0x13, Conflate = 0x14, Closed = 0x1, Disconnected = 0x2 } State;
8233 #if __cplusplus >= 201100L || _MSC_VER >= 1900
8234  std::atomic<State> _state;
8235 #else
8236  volatile State _state;
8237 #endif
8238  typedef std::map<std::string, Message*> SOWKeyMap;
8239  SOWKeyMap _sowKeyMap;
8240  public:
8241  MessageStreamImpl(const Client& client_)
8242  : _client(client_),
8243  _timeout(0),
8244  _maxDepth((unsigned)~0),
8245  _requestedAcks(0),
8246  _cacheMax(AMPS_MESSAGE_STREAM_CACHE_MAX),
8247  _state(Unset)
8248  {
8249  if (_client.isValid())
8250  {
8251  _client.addConnectionStateListener(this);
8252  }
8253  }
8254 
8255  MessageStreamImpl(ClientImpl* client_)
8256  : _client(client_),
8257  _timeout(0),
8258  _maxDepth((unsigned)~0),
8259  _requestedAcks(0),
8260  _state(Unset)
8261  {
8262  if (_client.isValid())
8263  {
8264  _client.addConnectionStateListener(this);
8265  }
8266  }
8267 
8268  ~MessageStreamImpl()
8269  {
8270  }
8271 
8272  virtual void destroy()
8273  {
8274  try
8275  {
8276  close();
8277  }
8278  catch (std::exception& e)
8279  {
8280  try
8281  {
8282  if (_client.isValid())
8283  {
8284  _client.getExceptionListener().exceptionThrown(e);
8285  }
8286  }
8287  catch (...) {/*Ignore exception listener exceptions*/} // -V565
8288  }
8289  if (_client.isValid())
8290  {
8291  _client.removeConnectionStateListener(this);
8292  Client c = _client;
8293  _client = Client((ClientImpl*)NULL);
8294  c.deferredExecution(MessageStreamImpl::destroyer, this);
8295  }
8296  else
8297  {
8298  delete this;
8299  }
8300  }
8301 
8302  static void destroyer(void* vpMessageStreamImpl_)
8303  {
8304  delete ((MessageStreamImpl*)vpMessageStreamImpl_);
8305  }
8306 
8307  void setSubscription(const std::string& subId_,
8308  const std::string& commandId_ = "",
8309  const std::string& queryId_ = "")
8310  {
8311  Lock<Mutex> lock(_lock);
8312  _subId = subId_;
8313  if (!commandId_.empty() && commandId_ != subId_)
8314  {
8315  _commandId = commandId_;
8316  }
8317  if (!queryId_.empty() && queryId_ != subId_ && queryId_ != commandId_)
8318  {
8319  _queryId = queryId_;
8320  }
8321  // It's possible to disconnect between creation/registration and here.
8322  if (Disconnected == _state)
8323  {
8324  return;
8325  }
8326  assert(Unset == _state);
8327  _state = Subscribe;
8328  }
8329 
8330  void setSOWOnly(const std::string& commandId_,
8331  const std::string& queryId_ = "")
8332  {
8333  Lock<Mutex> lock(_lock);
8334  _commandId = commandId_;
8335  if (!queryId_.empty() && queryId_ != commandId_)
8336  {
8337  _queryId = queryId_;
8338  }
8339  // It's possible to disconnect between creation/registration and here.
8340  if (Disconnected == _state)
8341  {
8342  return;
8343  }
8344  assert(Unset == _state);
8345  _state = SOWOnly;
8346  }
8347 
8348  void setStatsOnly(const std::string& commandId_,
8349  const std::string& queryId_ = "")
8350  {
8351  Lock<Mutex> lock(_lock);
8352  _commandId = commandId_;
8353  if (!queryId_.empty() && queryId_ != commandId_)
8354  {
8355  _queryId = queryId_;
8356  }
8357  // It's possible to disconnect between creation/registration and here.
8358  if (Disconnected == _state)
8359  {
8360  return;
8361  }
8362  assert(Unset == _state);
8363  _state = AcksOnly;
8364  _requestedAcks = Message::AckType::Stats;
8365  }
8366 
8367  void setAcksOnly(const std::string& commandId_, unsigned acks_)
8368  {
8369  Lock<Mutex> lock(_lock);
8370  _commandId = commandId_;
8371  // It's possible to disconnect between creation/registration and here.
8372  if (Disconnected == _state)
8373  {
8374  return;
8375  }
8376  assert(Unset == _state);
8377  _state = AcksOnly;
8378  _requestedAcks = acks_;
8379  }
8380 
8381  void connectionStateChanged(ConnectionStateListener::State state_)
8382  {
8383  Lock<Mutex> lock(_lock);
8384  if (state_ == AMPS::ConnectionStateListener::Disconnected)
8385  {
8386  _state = Disconnected;
8387  close();
8388  }
8389  _lock.signalAll();
8390  }
8391 
8392  void timeout(unsigned timeout_)
8393  {
8394  _timeout = timeout_;
8395  }
8396  void conflate(void)
8397  {
8398  if (_state == Subscribe)
8399  {
8400  _state = Conflate;
8401  }
8402  }
8403  void maxDepth(unsigned maxDepth_)
8404  {
8405  if (maxDepth_)
8406  {
8407  _maxDepth = maxDepth_;
8408  }
8409  else
8410  {
8411  _maxDepth = (unsigned)~0;
8412  }
8413  }
8414  unsigned getMaxDepth(void) const
8415  {
8416  return _maxDepth;
8417  }
8418  unsigned getDepth(void) const
8419  {
8420  return (unsigned)(_q.size());
8421  }
8422 
8423  bool next(Message& current_)
8424  {
8425  Lock<Mutex> lock(_lock);
8426  if (!_previousTopic.empty() && !_previousBookmark.empty())
8427  {
8428  try
8429  {
8430  if (_client.isValid())
8431  {
8432  _client.ackDeferredAutoAck(_previousTopic, _previousBookmark);
8433  }
8434  }
8435 #ifdef _WIN32
8436  catch (AMPSException&)
8437 #else
8438  catch (AMPSException& e)
8439 #endif
8440  {
8441  current_.invalidate();
8442  _previousTopic.clear();
8443  _previousBookmark.clear();
8444  return false;
8445  }
8446  _previousTopic.clear();
8447  _previousBookmark.clear();
8448  }
8449  // Don't wait to wait more than 1s at a time
8450  long minWaitTime = (_timeout && _timeout < 1000) ? _timeout : 1000;
8451  Timer timer((double)_timeout);
8452  timer.start();
8453  while (_q.empty() && _state & Running)
8454  {
8455  // Using timeout so python can interrupt
8456  _lock.wait(minWaitTime);
8457  {
8458  Unlock<Mutex> unlck(_lock);
8459  amps_invoke_waiting_function();
8460  }
8461  if (_timeout)
8462  {
8463  // In case we woke up early, see how much longer to wait
8464  if (timer.checkAndGetRemaining(&minWaitTime))
8465  {
8466  // No time left
8467  break;
8468  }
8469  // Adjust next wait time
8470  minWaitTime = (minWaitTime < 1000) ? minWaitTime : 1000;
8471  }
8472  }
8473  if (current_.isValid() && _cache.size() < _cacheMax)
8474  {
8475  current_.reset();
8476  _cache.push_back(current_);
8477  }
8478  if (!_q.empty())
8479  {
8480  current_ = _q.front();
8481  if (_q.size() == _maxDepth)
8482  {
8483  _lock.signalAll();
8484  }
8485  _q.pop_front();
8486  if (_state == Conflate)
8487  {
8488  std::string sowKey = current_.getSowKey();
8489  if (sowKey.length())
8490  {
8491  _sowKeyMap.erase(sowKey);
8492  }
8493  }
8494  else if (_state == AcksOnly)
8495  {
8496  _requestedAcks &= ~(current_.getAckTypeEnum());
8497  }
8498  if ((_state == AcksOnly && _requestedAcks == 0) ||
8499  (_state == SOWOnly && current_.getCommand() == "group_end"))
8500  {
8501  _state = Closed;
8502  }
8503  else if (current_.getCommandEnum() == Message::Command::Publish &&
8504  _client.isValid() && _client.getAutoAck() &&
8505  !current_.getLeasePeriod().empty() &&
8506  !current_.getBookmark().empty())
8507  {
8508  _previousTopic = current_.getTopic().deepCopy();
8509  _previousBookmark = current_.getBookmark().deepCopy();
8510  }
8511  return true;
8512  }
8513  if (_state == Disconnected)
8514  {
8515  throw DisconnectedException("Connection closed.");
8516  }
8517  current_.invalidate();
8518  if (_state == Closed)
8519  {
8520  return false;
8521  }
8522  return _timeout != 0;
8523  }
8524  void close(void)
8525  {
8526  if (_client.isValid())
8527  {
8528  if (_state == SOWOnly || _state == Subscribe) //not delete
8529  {
8530  if (!_commandId.empty())
8531  {
8532  _client.unsubscribe(_commandId);
8533  }
8534  if (!_subId.empty())
8535  {
8536  _client.unsubscribe(_subId);
8537  }
8538  if (!_queryId.empty())
8539  {
8540  _client.unsubscribe(_queryId);
8541  }
8542  }
8543  else
8544  {
8545  if (!_commandId.empty())
8546  {
8547  _client.removeMessageHandler(_commandId);
8548  }
8549  if (!_subId.empty())
8550  {
8551  _client.removeMessageHandler(_subId);
8552  }
8553  if (!_queryId.empty())
8554  {
8555  _client.removeMessageHandler(_queryId);
8556  }
8557  }
8558  }
8559  if (_state == SOWOnly || _state == Subscribe || _state == Unset)
8560  {
8561  _state = Closed;
8562  }
8563  }
8564  static void _messageHandler(const Message& message_, MessageStreamImpl* this_)
8565  {
8566  Lock<Mutex> lock(this_->_lock);
8567  if (this_->_state != Conflate)
8568  {
8569  AMPS_TESTING_SLOW_MESSAGE_STREAM
8570  if (this_->_q.size() >= this_->_maxDepth)
8571  {
8572  // We throw here so that heartbeats can be sent. The exception
8573  // will be handled internally only, and the same Message will
8574  // come back to try again. Make sure to signal.
8575  this_->_lock.signalAll();
8576  throw MessageStreamFullException("Stream is currently full.");
8577  }
8578  if (!this_->_cache.empty())
8579  {
8580  this_->_cache.front().deepCopy(message_);
8581  this_->_q.push_back(this_->_cache.front());
8582  this_->_cache.pop_front();
8583  }
8584  else
8585  {
8586 #ifdef AMPS_USE_EMPLACE
8587  this_->_q.emplace_back(message_.deepCopy());
8588 #else
8589  this_->_q.push_back(message_.deepCopy());
8590 #endif
8591  }
8592  if (message_.getCommandEnum() == Message::Command::Publish &&
8593  this_->_client.isValid() && this_->_client.getAutoAck() &&
8594  !message_.getLeasePeriod().empty() &&
8595  !message_.getBookmark().empty())
8596  {
8597  message_.setIgnoreAutoAck();
8598  }
8599  }
8600  else
8601  {
8602  std::string sowKey = message_.getSowKey();
8603  if (sowKey.length())
8604  {
8605  SOWKeyMap::iterator it = this_->_sowKeyMap.find(sowKey);
8606  if (it != this_->_sowKeyMap.end())
8607  {
8608  it->second->deepCopy(message_);
8609  }
8610  else
8611  {
8612  if (this_->_q.size() >= this_->_maxDepth)
8613  {
8614  // We throw here so that heartbeats can be sent. The
8615  // exception will be handled internally only, and the
8616  // same Message will come back to try again. Make sure
8617  // to signal.
8618  this_->_lock.signalAll();
8619  throw MessageStreamFullException("Stream is currently full.");
8620  }
8621  if (!this_->_cache.empty())
8622  {
8623  this_->_cache.front().deepCopy(message_);
8624  this_->_q.push_back(this_->_cache.front());
8625  this_->_cache.pop_front();
8626  }
8627  else
8628  {
8629 #ifdef AMPS_USE_EMPLACE
8630  this_->_q.emplace_back(message_.deepCopy());
8631 #else
8632  this_->_q.push_back(message_.deepCopy());
8633 #endif
8634  }
8635  this_->_sowKeyMap[sowKey] = &(this_->_q.back());
8636  }
8637  }
8638  else
8639  {
8640  if (this_->_q.size() >= this_->_maxDepth)
8641  {
8642  // We throw here so that heartbeats can be sent. The exception
8643  // will be handled internally only, and the same Message will
8644  // come back to try again. Make sure to signal.
8645  this_->_lock.signalAll();
8646  throw MessageStreamFullException("Stream is currently full.");
8647  }
8648  if (!this_->_cache.empty())
8649  {
8650  this_->_cache.front().deepCopy(message_);
8651  this_->_q.push_back(this_->_cache.front());
8652  this_->_cache.pop_front();
8653  }
8654  else
8655  {
8656 #ifdef AMPS_USE_EMPLACE
8657  this_->_q.emplace_back(message_.deepCopy());
8658 #else
8659  this_->_q.push_back(message_.deepCopy());
8660 #endif
8661  }
8662  if (message_.getCommandEnum() == Message::Command::Publish &&
8663  this_->_client.isValid() && this_->_client.getAutoAck() &&
8664  !message_.getLeasePeriod().empty() &&
8665  !message_.getBookmark().empty())
8666  {
8667  message_.setIgnoreAutoAck();
8668  }
8669  }
8670  }
8671  this_->_lock.signalAll();
8672  }
8673  };
// Default constructor: no initializers and an empty body, so _body is
// default-constructed.
8674  inline MessageStream::MessageStream(void)
8675  {
8676  }
// Creates a stream backed by a newly allocated MessageStreamImpl bound to
// client_.
8677  inline MessageStream::MessageStream(const Client& client_)
8678  : _body(new MessageStreamImpl(client_))
8679  {
8680  }
// Fetches the next message into _current; when the stream's next() returns
// false, _pStream is set to NULL.
8681  inline void MessageStream::iterator::advance(void)
8682  {
8683  _pStream = _pStream->_body->next(_current) ? _pStream : NULL;
8684  }
// Converts the stream to a MessageHandler whose function is
// MessageStreamImpl::_messageHandler (cast to the generic handler signature)
// and whose user data is the address of this stream's body.
8685  inline MessageStream::operator MessageHandler(void)
8686  {
8687  return MessageHandler((void(*)(const Message&, void*))MessageStreamImpl::_messageHandler, &_body.get());
8688  }
// Recovers a MessageStream from a handler: when handler_'s function is
// MessageStreamImpl::_messageHandler, the returned stream uses the
// MessageStreamImpl stored in the handler's user data; otherwise a
// default-constructed stream is returned.
8689  inline MessageStream MessageStream::fromExistingHandler(const MessageHandler& handler_)
8690  {
8691  MessageStream result;
8692  if (handler_._func == (MessageHandler::FunctionType)MessageStreamImpl::_messageHandler)
8693  {
8694  result._body = (MessageStreamImpl*)(handler_._userData);
8695  }
8696  return result;
8697  }
8698 
// Forwards to the body's setSOWOnly() with the same arguments.
8699  inline void MessageStream::setSOWOnly(const std::string& commandId_,
8700  const std::string& queryId_)
8701  {
8702  _body->setSOWOnly(commandId_, queryId_);
8703  }
// Forwards to the body's setSubscription() with the same arguments.
8704  inline void MessageStream::setSubscription(const std::string& subId_,
8705  const std::string& commandId_,
8706  const std::string& queryId_)
8707  {
8708  _body->setSubscription(subId_, commandId_, queryId_);
8709  }
// Forwards to the body's setStatsOnly() with the same arguments.
8710  inline void MessageStream::setStatsOnly(const std::string& commandId_,
8711  const std::string& queryId_)
8712  {
8713  _body->setStatsOnly(commandId_, queryId_);
8714  }
// Forwards to the body's setAcksOnly() with the same arguments.
8715  inline void MessageStream::setAcksOnly(const std::string& commandId_,
8716  unsigned acks_)
8717  {
8718  _body->setAcksOnly(commandId_, acks_);
8719  }
// Sets the body's timeout to timeout_ and returns a copy of this stream so
// calls can be chained.
8720  inline MessageStream MessageStream::timeout(unsigned timeout_)
8721  {
8722  _body->timeout(timeout_);
8723  return *this;
8724  }
8726  {
8727  _body->conflate();
8728  return *this;
8729  }
// Sets the body's maximum depth to maxDepth_ and returns a copy of this
// stream so calls can be chained.
8730  inline MessageStream MessageStream::maxDepth(unsigned maxDepth_)
8731  {
8732  _body->maxDepth(maxDepth_);
8733  return *this;
8734  }
// Returns the body's configured maximum depth.
8735  inline unsigned MessageStream::getMaxDepth(void) const
8736  {
8737  return _body->getMaxDepth();
8738  }
// Returns the body's current depth.
8739  inline unsigned MessageStream::getDepth(void) const
8740  {
8741  return _body->getDepth();
8742  }
8743 
// Returns a copy of the stream held by _pEmptyMessageStream. The pointer is
// dereferenced without a null check, so it must already have been set.
8744  inline MessageStream ClientImpl::getEmptyMessageStream(void)
8745  {
8746  return *(_pEmptyMessageStream.get());
8747  }
8748 
8750  {
8751  // If the command is sow and has a sub_id, OR
8752  // if the command has a replace option, return the existing
8753  // messagestream, don't create a new one.
8754  ClientImpl& body = _body.get();
8755  Message& message = command_.getMessage();
8756  Field subId = message.getSubscriptionId();
8757  unsigned ackTypes = message.getAckTypeEnum();
8758  bool useExistingHandler = !subId.empty() && ((!message.getOptions().empty() && message.getOptions().contains("replace", 7)) || message.getCommandEnum() == Message::Command::SOW);
8759  if (useExistingHandler)
8760  {
8761  // Try to find the existing message handler.
8762  if (!subId.empty())
8763  {
8764  MessageHandler existingHandler;
8765  if (body._routes.getRoute(subId, existingHandler))
8766  {
8767  // we found an existing handler. It might not be a message stream, but that's okay.
8768  body.executeAsync(command_, existingHandler, false);
8769  return MessageStream::fromExistingHandler(existingHandler);
8770  }
8771  }
8772  // fall through; we'll a new handler altogether.
8773  }
8774  // Make sure something will be returned to the stream or use the empty one
8775  // Check that: it's a command that doesn't normally return data, and there
8776  // are no acks requested for the cmd id
8777  Message::Command::Type command = message.getCommandEnum();
8778  if ((command & Message::Command::NoDataCommands)
8779  && (ackTypes == Message::AckType::Persisted
8780  || ackTypes == Message::AckType::None))
8781  {
8782  executeAsync(command_, MessageHandler());
8783  if (!body._pEmptyMessageStream)
8784  {
8785  body._pEmptyMessageStream.reset(new MessageStream((ClientImpl*)0));
8786  body._pEmptyMessageStream.get()->_body->close();
8787  }
8788  return body.getEmptyMessageStream();
8789  }
8790  MessageStream stream(*this);
8791  if (body.getDefaultMaxDepth())
8792  {
8793  stream.maxDepth(body.getDefaultMaxDepth());
8794  }
8795  MessageHandler handler = stream.operator MessageHandler();
8796  std::string commandID = body.executeAsync(command_, handler, false);
8797  if (command_.hasStatsAck())
8798  {
8799  stream.setStatsOnly(commandID, command_.getMessage().getQueryId());
8800  }
8801  else if (command_.isSow())
8802  {
8803  if (command_.getAckTypeEnum() & Message::AckType::Completed)
8804  {
8805  stream.setAcksOnly(commandID,
8806  ackTypes);
8807  }
8808  else
8809  {
8810  stream.setSOWOnly(commandID, command_.getMessage().getQueryId());
8811  }
8812  }
8813  else if (command_.isSubscribe())
8814  {
8815  stream.setSubscription(commandID,
8816  command_.getMessage().getCommandId(),
8817  command_.getMessage().getQueryId());
8818  }
8819  else
8820  {
8821  // Persisted acks for writes don't come back with command id
8822  if (command == Message::Command::Publish ||
8823  command == Message::Command::DeltaPublish ||
8824  command == Message::Command::SOWDelete)
8825  {
8826  stream.setAcksOnly(commandID,
8827  ackTypes & (unsigned)~Message::AckType::Persisted);
8828  }
8829  else
8830  {
8831  stream.setAcksOnly(commandID, ackTypes);
8832  }
8833  }
8834  return stream;
8835  }
8836 
8837 // This is here because it uses api from Client.
8838  inline void Message::ack(const char* options_) const
8839  {
8840  ClientImpl* pClient = _body.get().clientImpl();
8841  Message::Field bookmark = getBookmark();
8842  if (pClient && bookmark.len() &&
8843  !pClient->getAutoAck())
8844  //(!pClient->getAutoAck() || getIgnoreAutoAck()))
8845  {
8846  pClient->ack(getTopic(), bookmark, options_);
8847  }
8848  }
8849 }// end namespace AMPS
8850 #endif
Command & setBookmark(const std::string &bookmark_)
Set the bookmark to be used for this command.
Definition: ampsplusplus.hpp:744
Command & setFilter(const char *filter_, size_t filterLen_)
Definition: ampsplusplus.hpp:695
Class to hold string versions of failure reasons.
Definition: ampsplusplus.hpp:150
Message & setData(const std::string &v_)
Sets the data portion of self.
Definition: Message.hpp:1476
Core type and function declarations for the AMPS C client.
Client(const std::string &clientName="")
Constructs a new client with a given client name.
Definition: ampsplusplus.hpp:5085
Field getUserId() const
Retrieves the value of the UserId header of the Message as a new Field.
Definition: Message.hpp:1453
std::string sowDeleteByKeys(const MessageHandler &messageHandler_, const std::string &topic_, const std::string &keys_, long timeout_=0)
Deletes messages that match SOW keys from a topic's SOW cache.
Definition: ampsplusplus.hpp:6763
std::string stopTimer(const MessageHandler &messageHandler)
Definition: ampsplusplus.hpp:6737
static const unsigned int IdentifierLength
The length of identifiers used for unique identification of commands and subscriptions.
Definition: Message.hpp:542
std::string getAckType() const
Definition: ampsplusplus.hpp:942
AMPSDLL void amps_client_set_disconnect_handler(amps_handle client, amps_handler disconnectHandler, void *userData)
Sets the disconnect handler function to be called when a disconnect occurs.
bool removeMessageHandler(const Field &commandId_)
Removes a MessageHandler for a given CommandId from self.
Definition: ampsplusplus.hpp:5314
Message & assignTopic(const std::string &v)
Sets the value of the Topic header for this Message.
Definition: Message.hpp:1449
Provides a convenient way of building messages in FIX format, typically referenced using the typedefs...
Definition: ampsplusplus.hpp:8110
void startTimer()
Definition: ampsplusplus.hpp:6726
MessageStream sowAndSubscribe(const std::string &topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, bool oofEnabled_=false, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic and initiates a new subscription on it.
Definition: ampsplusplus.hpp:6268
Abstract base class for storing published messages for an HA publisher client.
Definition: ampsplusplus.hpp:1086
MessageStream conflate(void)
Sets self to conflation mode, where a new update for a matching sow key will replace the previous one...
Definition: ampsplusplus.hpp:8725
Field getSequence() const
Retrieves the value of the Sequence header of the Message as a new Field.
Definition: Message.hpp:1422
std::string send(const MessageHandler &messageHandler, Message &message, int timeout=0)
Sends a Message to the connected AMPS server, performing only minimal validation and bypassing client...
Definition: ampsplusplus.hpp:5342
AMPSDLL amps_result amps_client_set_name(amps_handle handle, const amps_char *clientName)
Sets the name on an amps client object.
Message & setCorrelationId(const std::string &v)
Sets the value of the CorrelationId header for this Message.
Definition: Message.hpp:1304
Message & setQueryID(const std::string &v)
Sets the value of the QueryID header for this Message.
Definition: Message.hpp:1417
Command(const std::string &command_)
Creates an object to represent the given AMPS command, such as "sow" or "subscribe".
Definition: ampsplusplus.hpp:556
Command::Type getCommandEnum() const
Decode self's "command" field and return one of the values from Command.
Definition: Message.hpp:1228
Command & setBookmark(const char *bookmark_, size_t bookmarkLen_)
Set the bookmark to be used by this command.
Definition: ampsplusplus.hpp:755
void setGlobalCommandTypeMessageHandler(const Message::Command::Type command_, const MessageHandler &handler_)
Sets a handler for all messages of a particular type: currently supported types are heartbeat message...
Definition: ampsplusplus.hpp:7017
Command & setAckType(unsigned ackType_)
Definition: ampsplusplus.hpp:920
void setServerVersion(size_t version_)
Internally used to set the server version so the store knows how to deal with persisted acks and call...
Definition: BookmarkStore.hpp:429
void setDefaultMaxDepth(unsigned maxDepth_)
Sets a default max depth on all subsequently created MessageStream objects.
Definition: ampsplusplus.hpp:7385
const amps_uint64_t getNameHashValue() const
Returns the numeric name hash of this client as generated by the server and returned when the client ...
Definition: ampsplusplus.hpp:5146
Field getSubscriptionId() const
Retrieves the value of the SubscriptionId header of the Message as a new Field.
Definition: Message.hpp:1427
AMPSDLL amps_result amps_client_connect(amps_handle handle, const amps_char *uri)
Connects to the AMPS server specified in uri.
static size_t convertVersionToNumber(const std::string &version_)
Converts a string version, such as "3.8.1.5" into the same numeric form used internally and returned ...
Definition: ampsplusplus.hpp:5202
void unsubscribe(const std::string &commandId)
Unsubscribe from a topic.
Definition: ampsplusplus.hpp:6057
Command & setOptions(const std::string &options_)
Sets the options string for this command: see Message.Options for a helper class for constructing the...
Definition: ampsplusplus.hpp:785
Message & setOrderBy(const std::string &v)
Sets the value of the OrderBy header for this Message.
Definition: Message.hpp:1415
Abstract base class for replaying a publish message.
Definition: ampsplusplus.hpp:1058
Command & setCommandId(const char *cmdId_, size_t cmdIdLen_)
Definition: ampsplusplus.hpp:669
void persisted(const Message::Field &subId_, const Message::Field &bookmark_)
Called internally to indicate messages up to and including bookmark are replicated to all replication...
Definition: BookmarkStore.hpp:405
unsigned getDefaultMaxDepth(void) const
Returns the default max depth for returned MessageStream objects.
Definition: ampsplusplus.hpp:7394
void ack(Message &message_, const char *options_=NULL)
Acknowledge a message queue message by supplying the message directly: this adds the ack to the curre...
Definition: ampsplusplus.hpp:7253
MessageStream bookmarkSubscribe(const std::string &topic_, long timeout_, const std::string &bookmark_, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Subscribe to a topic using a bookmark.
Definition: ampsplusplus.hpp:6008
AMPSDLL void amps_client_disconnect(amps_handle handle)
Disconnects from the AMPS server, if connected.
AMPSDLL AMPS_SOCKET amps_client_get_socket(amps_handle client)
Returns the socket from the underlying transport in client, or NULL if no transport is associated wit...
void discard(const Message::Field &subId_, size_t bookmarkSeqNo_)
Log a discard-bookmark entry to the persistent log based on a bookmark sequence number.
Definition: BookmarkStore.hpp:283
void setPublishStore(const Store &publishStore_)
Set the publish store to be used by the client.
Definition: ampsplusplus.hpp:5438
void setErrorOnPublishGap(bool errorOnPublishGap_)
Called to enable or disable throwing PublishStoreGapException.
Definition: ampsplusplus.hpp:1335
amps_uint64_t deltaPublish(const std::string &topic_, const std::string &data_, unsigned long expiration_)
Publish the changed fields of a message to an AMPS topic.
Definition: ampsplusplus.hpp:5703
static size_t convertVersionToNumber(const char *data_, size_t len_)
Converts a string version, such as "3.8.1.5" into the same numeric form used internally and returned ...
Definition: ampsplusplus.hpp:5217
Message & setFilter(const std::string &v)
Sets the value of the Filter header for this Message.
Definition: Message.hpp:1306
Command & setSubId(const char *subId_, size_t subIdLen_)
Definition: ampsplusplus.hpp:721
void setNoEmpties(void)
Set the option to not send empty messages on a delta subscription.
Definition: Message.hpp:841
void setDuplicateMessageHandler(const MessageHandler &duplicateMessageHandler_)
Sets a callback function that is invoked when a duplicate message is detected.
Definition: ampsplusplus.hpp:5454
Message deepCopy(void) const
Returns a deep copy of self.
Definition: Message.hpp:579
void removeConnectionStateListener(ConnectionStateListener *listener)
Attempts to remove listener from self's set of ConnectionStateListeners.
Definition: ampsplusplus.hpp:7108
void setOOF(void)
Set the option to receive out of focus (OOF) messages on a subscription, where applicable.
Definition: Message.hpp:824
int getAckTimeout(void) const
Returns the current value of the message queue ack timeout setting – that is, the amount of time aft...
Definition: ampsplusplus.hpp:7337
VersionInfo getServerVersionInfo() const
Returns the server version retrieved during logon.
Definition: ampsplusplus.hpp:5188
Message sowDeleteByKeys(const std::string &topic_, const std::string &keys_, long timeout_=0)
Deletes messages that match SOW keys from a topic's SOW cache.
Definition: ampsplusplus.hpp:6790
AMPSDLL amps_result amps_client_set_transport_filter_function(amps_handle client, amps_transport_filter_function filter, void *userData)
Sets a user-supplied callback function for filtering data before it is sent and after it is received...
Command(const char *command_, size_t commandLen_)
Creates an object to represent the given AMPS command, such as "sow" or "subscribe".
Definition: ampsplusplus.hpp:564
AMPSDLL amps_uint64_t amps_message_get_field_uint64(amps_handle message, FieldId field)
Gets the unsigned 64-bit int value of a header field in an AMPS message.
AMPSDLL amps_result amps_client_set_idle_time(amps_handle client, int idleTime)
Sets an idle-time (milliseconds).
Message encapsulates a single message sent to or received from an AMPS server, and provides methods f...
Definition: Message.hpp:531
Message & setAckTypeEnum(unsigned ackType_)
Encode self's "ack type" field from a bitmask of values from AckType.
Definition: Message.hpp:1183
amps_uint64_t getLastPersisted()
Get the last persisted message sequence in the store.
Definition: ampsplusplus.hpp:1306
std::string getString() const
Returns the current contents of this builder as a string.
Definition: ampsplusplus.hpp:8147
void setTransportFilterFunction(amps_transport_filter_function filter_, void *userData_)
Sets a filter function on the transport that is called with all raw data sent or received.
Definition: ampsplusplus.hpp:7405
amps_uint64_t publish(const char *topic_, size_t topicLength_, const char *data_, size_t dataLength_, unsigned long expiration_)
Publish a message to an AMPS topic, returning the sequence number assigned by the publish store if on...
Definition: ampsplusplus.hpp:5590
iterator begin(void)
Returns an iterator representing the beginning of the topic or subscription.
Definition: ampsplusplus.hpp:4989
Field getLeasePeriod() const
Retrieves the value of the LeasePeriod header of the Message as a new Field.
Definition: Message.hpp:1309
Command & setSowKeys(const char *sowKeys_, size_t sowKeysLen_)
Sets the SowKeys for the command.
Definition: ampsplusplus.hpp:656
static const char * NOW()
Convenience method for returning the special value to start a subscription at the end of the transact...
Definition: ampsplusplus.hpp:7036
Command & setAckType(const std::string &ackType_)
Definition: ampsplusplus.hpp:898
static const char * BOOKMARK_EPOCH()
Convenience method for returning the special value to start a subscription at the beginning of the tr...
Definition: ampsplusplus.hpp:7046
void send(const Message &message)
Sends a Message to the connected AMPS server, performing only minimal validation and bypassing client...
Definition: ampsplusplus.hpp:5273
Message & setSowKey(const std::string &v)
Sets the value of the SowKey header for this Message.
Definition: Message.hpp:1424
AMPSDLL void amps_message_set_field_value(amps_handle message, FieldId field, const amps_char *value, size_t length)
Sets the value of a header field in an AMPS message.
std::string bookmarkSubscribe(const MessageHandler &messageHandler_, const std::string &topic_, long timeout_, const std::string &bookmark_, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Subscribe to a topic using a bookmark.
Definition: ampsplusplus.hpp:5980
Field getFilter() const
Retrieves the value of the Filter header of the Message as a new Field.
Definition: Message.hpp:1306
static size_t getUnsetPosition()
Method to return the value used to represent not found or unset.
Definition: ampsplusplus.hpp:1149
void setRetryOnDisconnect(bool isRetryOnDisconnect_)
Enables or disables automatic retry of a command to AMPS after a reconnect.
Definition: ampsplusplus.hpp:7367
unsigned getMaxDepth(void) const
Gets the maximum number of messages that can be held in the underlying queue.
Definition: ampsplusplus.hpp:8735
Message & assignUserId(const std::string &v)
Sets the value of the UserId header for this Message.
Definition: Message.hpp:1453
void connect(const std::string &uri)
Connect to an AMPS server.
Definition: ampsplusplus.hpp:5248
void clear()
Deletes the data associated with this Field, should only be used on Fields that were created as deepC...
Definition: Field.hpp:247
void addMessageHandler(const Field &commandId_, const AMPS::MessageHandler &messageHandler_, unsigned requestedAcks_, Message::Command::Type commandType_)
Adds a MessageHandler to be invoked for Messages with the given CommandId as their command id...
Definition: ampsplusplus.hpp:5303
Abstract base class to manage all subscriptions placed on a client so that they can be re-established...
Definition: ampsplusplus.hpp:1450
std::string sowAndDeltaSubscribe(const MessageHandler &messageHandler_, const std::string &topic_, const std::string &filter_="", const std::string &orderBy_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic and initiates a new delta subscription on it.
Definition: ampsplusplus.hpp:6462
Success.
Definition: amps.h:209
Message & setCommandEnum(Command::Type command_)
Set self's "command" field from one of the values in Command.
Definition: Message.hpp:1288
Field getOptions() const
Retrieves the value of the Options header of the Message as a new Field.
Definition: Message.hpp:1316
std::string authenticate(const std::string &, const std::string &password_)
A simple implementation that returns an unmodified password.
Definition: ampsplusplus.hpp:1032
amps_uint64_t deltaPublish(const char *topic_, size_t topicLength_, const char *data_, size_t dataLength_)
Publish the changed fields of a message to an AMPS topic.
Definition: ampsplusplus.hpp:5680
FIXShredder(char fieldSep_=(char) 1)
Construct an instance of FIXShredder using the specified value as the delimiter between fields...
Definition: ampsplusplus.hpp:8192
std::string sow(const MessageHandler &messageHandler_, const std::string &topic_, const std::string &filter_="", const std::string &orderBy_="", const std::string &bookmark_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query a State-of-the-World topic.
Definition: ampsplusplus.hpp:6104
void * amps_handle
Opaque handle type used to refer to objects in the AMPS api.
Definition: amps.h:199
Class for constructing the options string to pass to AMPS in a Message.
Definition: Message.hpp:601
void addMessageHandler(const Field &commandId_, const AMPS::MessageHandler &messageHandler_, unsigned requestedAcks_, bool isSubscribe_)
Adds a MessageHandler to be invoked for Messages with the given CommandId as their command id...
Definition: ampsplusplus.hpp:5286
const char * data() const
Returns the (non-null-terminated) data underlying this field.
Definition: Field.hpp:260
bool isValid() const
Returns true if self is a valid stream that may be iterated.
Definition: ampsplusplus.hpp:4982
amps_result
Return values from amps_xxx functions.
Definition: amps.h:204
FailedWriteHandler * getFailedWriteHandler()
Get the handler that is invoked to report on failed writes.
Definition: ampsplusplus.hpp:5491
Field getAckType() const
Retrieves the value of the AckType header of the Message as a new Field.
Definition: Message.hpp:1130
AMPSDLL amps_handle amps_client_create(const amps_char *clientName)
Functions for creation of an AMPS client.
Field getCommand() const
Retrieves the value of the Command header of the Message as a new Field.
Definition: Message.hpp:1195
MessageStream execute(Command &command_)
Execute the provided command and return messages received in response in a MessageStream.
Definition: ampsplusplus.hpp:8749
void(* amps_transport_filter_function)(const unsigned char *, size_t, short, void *)
Prototype for a user-supplied callback function for filtering data before it is sent and after it is ...
Definition: amps.h:630
Message & setTopic(const std::string &v)
Sets the value of the Topic header for this Message.
Definition: Message.hpp:1449
Store getPublishStore()
Get the publish store used by the client.
Definition: ampsplusplus.hpp:5446
void setThreadCreatedCallback(amps_thread_created_callback callback_, void *userData_)
Sets a callback function on the transport that is called when a new thread is created to receive data...
Definition: ampsplusplus.hpp:7419
unsigned getAckTypeEnum() const
Definition: ampsplusplus.hpp:947
size_t getServerVersion() const
Returns the server version retrieved during logon.
Definition: ampsplusplus.hpp:5177
State
Constants for the state of the connection.
Definition: ampsplusplus.hpp:1494
Command & setData(const std::string &data_)
Sets the data for this command from an existing string.
Definition: ampsplusplus.hpp:825
std::string logon(int timeout_=0, Authenticator &authenticator_=DefaultAuthenticator::instance(), const char *options_=NULL)
Logon to the server, providing the client name, credentials (if available), and client information (s...
Definition: ampsplusplus.hpp:5752
void discardUpTo(amps_uint64_t index_)
Called by Client to indicate that all messages up to and including index_ can be discarded from the store.
Definition: ampsplusplus.hpp:1239
Command & setFilter(const std::string &filter_)
Definition: ampsplusplus.hpp:688
const std::string & getNameHash() const
Returns the name hash string of this client as generated by the server and returned when the client l...
Definition: ampsplusplus.hpp:5138
StoreImpl(bool errorOnPublishGap_=false)
Default constructor.
Definition: ampsplusplus.hpp:1094
MessageStream sowAndSubscribe(const char *topic_, const std::string &filter_="", const std::string &orderBy_="", const std::string &bookmark_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic and initiates a new subscription on it.
Definition: ampsplusplus.hpp:6416
Client represents a connection to an AMPS server, but does not provide failover or reconnection behav...
Definition: ampsplusplus.hpp:5068
void setGlobalCommandTypeMessageHandler(const std::string &command_, const MessageHandler &handler_)
Sets a handler for all messages of a particular type, or for messages that would be delivered to a pa...
Definition: ampsplusplus.hpp:6992
Command & setSubId(const std::string &subId_)
Definition: ampsplusplus.hpp:714
static const char * BOOKMARK_NOW()
Convenience method for returning the special value to start a subscription at the end of the transact...
Definition: ampsplusplus.hpp:7027
Message & newCommandId()
Creates and sets a new sequential value for the CommandId header for this Message.
Definition: Message.hpp:1302
void addConnectionStateListener(ConnectionStateListener *listener)
Adds a ConnectionStateListener to self's set of listeners.
Definition: ampsplusplus.hpp:7100
Command & reset(const std::string &command_)
Resets the fields of self, and sets the command to command_.
Definition: ampsplusplus.hpp:579
Command & setTimeout(unsigned timeout_)
Sets the client-side timeout for this command.
Definition: ampsplusplus.hpp:847
void clearConnectionStateListeners()
Clear all listeners from self's set of ConnectionStateListeners.
Definition: ampsplusplus.hpp:7115
Message & setSequence(const std::string &v)
Sets the value of the Sequence header for this Message.
Definition: Message.hpp:1422
amps_uint64_t publish(const char *topic_, size_t topicLength_, const char *data_, size_t dataLength_)
Publish a message to an AMPS topic, returning the sequence number assigned by the publish store if on...
Definition: ampsplusplus.hpp:5539
Command & setCommandId(const std::string &cmdId_)
Definition: ampsplusplus.hpp:662
Message & setUserId(const std::string &v)
Sets the value of the UserId header for this Message.
Definition: Message.hpp:1453
AMPSDLL amps_result amps_client_send_with_version(amps_handle client, amps_handle message, unsigned *version_out)
Sends a message to the AMPS server.
Field getTopic() const
Retrieves the value of the Topic header of the Message as a new Field.
Definition: Message.hpp:1449
AMPSDLL void amps_client_set_message_handler(amps_handle client, amps_handler messageHandler, void *userData)
Sets the message handler function for this client.
SubscriptionManager * getSubscriptionManager()
Get the subscription manager being used by the client.
Definition: ampsplusplus.hpp:5402
std::string subscribe(const MessageHandler &messageHandler_, const std::string &topic_, long timeout_=0, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Subscribe to a topic.
Definition: ampsplusplus.hpp:5815
Command(Message::Command::Type command_)
Creates an object to represent the given AMPS command, such as "sow" or "subscribe".
Definition: ampsplusplus.hpp:571
Message & setMessageType(const std::string &v)
Sets the value of the MessageType header for this Message.
Definition: Message.hpp:1312
Field getCommandId() const
Retrieves the value of the CommandId header of the Message as a new Field.
Definition: Message.hpp:1302
const ExceptionListener & getExceptionListener(void) const
Returns the exception listener set on this Client.
Definition: ampsplusplus.hpp:6903
Field getQueryId() const
Retrieves the value of the QueryID header of the Message as a new Field.
Definition: Message.hpp:1417
static Authenticator & instance()
Static function to return a static instance used when no Authenticator is supplied to a Client...
Definition: ampsplusplus.hpp:1049
Abstract base class for connection state listeners.
Definition: ampsplusplus.hpp:1490
Message & setSubscriptionId(const std::string &v)
Sets the value of the SubscriptionId header for this Message.
Definition: Message.hpp:1427
std::string sowDeleteByData(const MessageHandler &messageHandler_, const std::string &topic_, const std::string &data_, long timeout_=0)
Deletes the message whose keys match the message data provided.
Definition: ampsplusplus.hpp:6825
Command & setOptions(const char *options_, size_t optionsLen_)
Sets the options string for this command: see Message.Options for a helper class for constructing the...
Definition: ampsplusplus.hpp:793
Message & newQueryId()
Creates and sets a new sequential value for the QueryID header for this Message.
Definition: Message.hpp:1417
#define AMPS_BOOKMARK_RECENT
Start the subscription at the first undiscarded message in the bookmark store, or at the end of the b...
Definition: BookmarkStore.hpp:47
Message & assignExpiration(const std::string &v)
Sets the value of the Expiration header for this Message.
Definition: Message.hpp:1305
bool empty() const
Returns 'true' if empty, 'false' otherwise.
Definition: Field.hpp:128
bool replaySingle(StoreReplayer &replayer_, amps_uint64_t index_)
Called by Client to get a single message replayed by the store onto the StoreReplayer.
Definition: ampsplusplus.hpp:1260
void completed(const std::string &, const std::string &, const std::string &)
Called by Client once a logon completes successfully.
Definition: ampsplusplus.hpp:1044
MessageStream deltaSubscribe(const std::string &topic_, long timeout_, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Delta Subscribe to a topic.
Definition: ampsplusplus.hpp:5920
static const char * EPOCH()
Convenience method for returning the special value to start a subscription at the beginning of the tr...
Definition: ampsplusplus.hpp:7056
Interface for BookmarkStoreImpl classes.
Definition: BookmarkStore.hpp:228
Command & reset(const char *command_, size_t commandLen_)
Resets the fields of self, and sets the command to command_.
Definition: ampsplusplus.hpp:588
Command & setSequence(const char *seq_, size_t seqLen_)
Definition: ampsplusplus.hpp:806
std::string logon(const char *options_, int timeout_=0)
Logon to the server, providing the client name, credentials (if available) client information (such a...
Definition: ampsplusplus.hpp:5771
size_t len() const
Returns the length of the data underlying this field.
Definition: Field.hpp:267
virtual void completed(const std::string &userName_, const std::string &password_, const std::string &reason_)=0
Called by Client once a logon completes successfully.
Message & setClientName(const std::string &v)
Sets the value of the ClientName header for this Message.
Definition: Message.hpp:1303
Field getSowKey() const
Retrieves the value of the SowKey header of the Message as a new Field.
Definition: Message.hpp:1424
#define AMPS_BOOKMARK_EPOCH
Start the subscription at the beginning of the journal.
Definition: BookmarkStore.hpp:51
A default implementation of Authenticator that only uses an unchanged password and does not implement...
Definition: ampsplusplus.hpp:1026
void setLogonCorrelationData(const std::string &logonCorrelationData_)
Sets the logon correlation data for the client.
Definition: ampsplusplus.hpp:5157
Message & setCommand(const std::string &v)
Sets the value of the Command header for this Message.
Definition: Message.hpp:1195
_FIXBuilder(char fieldSep_=(char) 1)
Construct an instance of _FIXBuilder, using the specified separator between fields.
Definition: ampsplusplus.hpp:8120
void ack(const std::string &topic_, const std::string &bookmark_, const char *options_=NULL)
Acknowledge a message queue message by supplying a topic and bookmark string: this adds the ack to th...
Definition: ampsplusplus.hpp:7265
std::string retry(const std::string &, const std::string &)
Throws an AuthenticationException because retry is not implemented.
Definition: ampsplusplus.hpp:1039
Message & assignSubscriptionId(const std::string &v)
Sets the value of the SubscriptionId header for this Message.
Definition: Message.hpp:1427
bool(* PublishStoreResizeHandler)(Store store_, size_t size_, void *userData_)
Function type for PublishStore resize events. The store_ param is the store which is resizing.
Definition: ampsplusplus.hpp:1080
AMPSDLL void amps_message_get_field_value(amps_handle message, FieldId field, const amps_char **value_ptr, size_t *length_ptr)
Retrieves the value of a header field in an AMPS message.
MessageStream sowAndSubscribe(const char *topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, bool oofEnabled_=false, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic and initiates a new subscription on it.
Definition: ampsplusplus.hpp:6306
virtual void setResizeHandler(PublishStoreResizeHandler handler_, void *userData_)
Set a handler to be called if the Store needs to resize in order to keep storing messages.
Definition: ampsplusplus.hpp:1180
void append(const T &tag, const char *value, size_t offset, size_t length)
Write a field with the provided tag and value to the message being constructed.
Definition: ampsplusplus.hpp:8129
Message & assignCommand(const std::string &v)
Sets the value of the Command header for this Message.
Definition: Message.hpp:1195
#define AMPS_BOOKMARK_NOW
Start the subscription at the point in time when AMPS processes the subscription. ...
Definition: BookmarkStore.hpp:55
void setResizeHandler(PublishStoreResizeHandler handler_, void *userData_)
Set a handler to be called if the Store needs to resize in order to keep storing messages.
Definition: ampsplusplus.hpp:1320
virtual std::string retry(const std::string &userName_, const std::string &password_)=0
Called by Client when a logon ack is received with a status of retry.
BookmarkStore getBookmarkStore()
Get the bookmark store being used by the client.
Definition: ampsplusplus.hpp:5394
bool isValid() const
Method to return if there is an underlying implementation for the Store.
Definition: ampsplusplus.hpp:1277
void setAckBatchSize(const unsigned ackBatchSize_)
Sets the queue ack batch size setting.
Definition: ampsplusplus.hpp:7326
Field getPassword() const
Retrieves the value of the Password header of the Message as a new Field.
Definition: Message.hpp:1416
Message & setTopNRecordsReturned(const std::string &v)
Sets the value of the TopNRecordsReturned header for this Message.
Definition: Message.hpp:1451
Class to handle when a client receives a duplicate publish message, or not entitled message...
Definition: ampsplusplus.hpp:1370
Message sowDelete(const std::string &topic, const std::string &filter, long timeout=0)
Deletes one or more messages from a topic's SOW cache.
Definition: ampsplusplus.hpp:6701
virtual void setFailedResubscribeHandler(std::shared_ptr< FailedResubscribeHandler > handler_)
Set a handler to deal with failing subscriptions after a failover event.
Definition: ampsplusplus.hpp:1478
Message & setSowKeys(const std::string &v)
Sets the value of the SowKeys header for this Message.
Definition: Message.hpp:1425
void setBookmarkStore(const BookmarkStore &bookmarkStore_)
Set the bookmark store to be used by the client.
Definition: ampsplusplus.hpp:5386
This class multiplexes messages from AMPS to multiple subscribers and uses the stream of acks from AM...
Definition: MessageRouter.hpp:149
Message & setExpiration(const std::string &v)
Sets the value of the Expiration header for this Message.
Definition: Message.hpp:1305
virtual std::string authenticate(const std::string &userName_, const std::string &password_)=0
Called by Client just before the logon command is sent.
Command & setOrderBy(const std::string &orderBy_)
Definition: ampsplusplus.hpp:701
void setExceptionListener(const ExceptionListener &listener_)
Definition: ampsplusplus.hpp:6896
Command & reset(Message::Command::Type command_)
Resets the fields of self, and sets the command to command_.
Definition: ampsplusplus.hpp:596
void setSubscriptionManager(SubscriptionManager *subscriptionManager_)
Set the subscription manager to be used by the client.
Definition: ampsplusplus.hpp:5414
void setUnhandledMessageHandler(const AMPS::MessageHandler &messageHandler)
Definition: ampsplusplus.hpp:6959
void append(const T &tag, const std::string &value)
Write a field with the provided tag and value to the message being constructed.
Definition: ampsplusplus.hpp:8140
unsigned getAckBatchSize(void) const
Returns the value of the queue ack batch size setting.
Definition: ampsplusplus.hpp:7316
amps_uint64_t deltaPublish(const std::string &topic_, const std::string &data_)
Publish the changed fields of a message to an AMPS topic.
Definition: ampsplusplus.hpp:5657
MessageStream sowAndSubscribe(const std::string &topic_, const std::string &filter_="", const std::string &orderBy_="", const std::string &bookmark_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic and initiates a new subscription on it.
Definition: ampsplusplus.hpp:6393
Command & setExpiration(unsigned expiration_)
Set the expiration time for a publish command.
Definition: ampsplusplus.hpp:878
AMPSDLL void amps_client_set_predisconnect_handler(amps_handle client, amps_predisconnect_handler predisconnectHandler, void *userData)
Sets the predisconnect handler function to be called when a disconnect occurs.
Represents an iterator over messages in an AMPS topic.
Definition: ampsplusplus.hpp:4944
size_t log(Message &message_)
Log a bookmark to the persistent log.
Definition: BookmarkStore.hpp:268
MessageStream sowAndDeltaSubscribe(const char *topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, bool oofEnabled_=false, bool sendEmpties_=false, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic and initiates a new delta subscription on it.
Definition: ampsplusplus.hpp:6639
size_t unpersistedCount() const
Method to return how many messages are in the store that have not been discarded, indicating that the...
Definition: ampsplusplus.hpp:1269
Command & setData(const char *data_, size_t dataLen_)
Sets the data for this command.
Definition: ampsplusplus.hpp:833
Message & assignAckType(const std::string &v)
Sets the value of the AckType header for this Message.
Definition: Message.hpp:1130
void publishFlush(long timeout_=0, unsigned ackType_=Message::AckType::Processed)
Ensure that AMPS messages are sent and have been processed by the AMPS server.
Definition: ampsplusplus.hpp:5636
std::string executeAsync(Command &command_, MessageHandler handler_)
Execute the provided command and, once AMPS acknowledges the command, process messages in response to...
Definition: ampsplusplus.hpp:7145
Command & setSowKey(const char *sowKey_, size_t sowKeyLen_)
Sets the SowKey field of the command, typically used for a publish command to a topic in the state of...
Definition: ampsplusplus.hpp:621
MessageStream sowAndDeltaSubscribe(const std::string &topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, bool oofEnabled_=false, bool sendEmpties_=false, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic and initiate a new delta subscription on it.
Definition: ampsplusplus.hpp:6598
std::string logon(const std::string &options_, int timeout_=0)
Logon to the server, providing the client name, credentials (if available), client information (such a...
Definition: ampsplusplus.hpp:5790
void unsubscribe()
Unsubscribe from all topics.
Definition: ampsplusplus.hpp:6069
void setAutoAck(bool isAutoAckEnabled_)
Sets the queue auto-ack setting on this client.
Definition: ampsplusplus.hpp:7308
MessageStream deltaSubscribe(const char *topic_, long timeout_, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Delta Subscribe to a topic.
Definition: ampsplusplus.hpp:5938
Message & assignCorrelationId(const std::string &v)
Sets the value of the CorrelationId header for this Message.
Definition: Message.hpp:1304
amps_uint64_t store(const Message &message_)
Called by Client to store a message being published.
Definition: ampsplusplus.hpp:1228
std::string sowAndDeltaSubscribe(const MessageHandler &messageHandler_, const std::string &topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, bool oofEnabled_=false, bool sendEmpties_=false, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic and initiate a new delta subscription on it.
Definition: ampsplusplus.hpp:6562
Command & setBatchSize(unsigned batchSize_)
Sets the batch size for this command, which controls how many records are sent together in the result...
Definition: ampsplusplus.hpp:862
void setExceptionListener(const std::shared_ptr< const ExceptionListener > &pListener_)
Sets the exception listener for exceptions that are not thrown back to the user (for example...
Definition: ampsplusplus.hpp:6883
StoreImpl * get()
Used to get a pointer to the implementation.
Definition: ampsplusplus.hpp:1352
MessageHandler getDuplicateMessageHandler(void)
Returns the callback function that is invoked when a duplicate message is detected.
Definition: ampsplusplus.hpp:5469
Command & setQueryId(const char *queryId_, size_t queryIdLen_)
Definition: ampsplusplus.hpp:734
DisconnectHandler getDisconnectHandler(void) const
Returns the callback function that is invoked when a disconnect occurs.
Definition: ampsplusplus.hpp:5364
Class for parsing a FIX format message into a std::map of keys and values, where the keys and values ...
Definition: ampsplusplus.hpp:8184
Field represents the value of a single field in a Message.
Definition: Field.hpp:86
Message & setAckType(const std::string &v)
Sets the value of the AckType header for this Message.
Definition: Message.hpp:1130
virtual ConnectionInfo getConnectionInfo() const
Get the connection information for the current connection.
Definition: ampsplusplus.hpp:5373
Message & setOptions(const std::string &v)
Sets the value of the Options header for this Message.
Definition: Message.hpp:1344
void ack(Field &topic_, Field &bookmark_, const char *options_=NULL)
Acknowledge a message queue message by supplying a topic and bookmark: this adds the ack to the curre...
Definition: ampsplusplus.hpp:7241
Abstract base class where you can implement handling of exceptions that occur when a SubscriptionMana...
Definition: ampsplusplus.hpp:1428
AMPSDLL amps_result amps_client_send(amps_handle client, amps_handle message)
Sends a message to the AMPS server.
std::map< Message::Field, Message::Field > map_type
Convenience definition for the std::map specialization used for this class.
Definition: ampsplusplus.hpp:8196
Command & setTopic(const char *topic_, size_t topicLen_)
Definition: ampsplusplus.hpp:682
void setDisconnectHandler(const DisconnectHandler &disconnectHandler)
Sets the function to be called when the client is unintentionally disconnected.
Definition: ampsplusplus.hpp:5356
Handle class for StoreImpl classes that track publish messages.
Definition: ampsplusplus.hpp:1212
map_type toMap(const Message::Field &data)
Returns the key/value pairs within the message, represented as AMPS::Field objects that contain point...
Definition: ampsplusplus.hpp:8203
Exception listener for unhandled exceptions.
Definition: ampsplusplus.hpp:203
MessageStream sowAndDeltaSubscribe(const std::string &topic_, const std::string &filter_="", const std::string &orderBy_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic and initiate a new delta subscription on it.
Definition: ampsplusplus.hpp:6495
void setHeartbeat(unsigned heartbeatTime_)
Requests heartbeating with the AMPS server.
Definition: ampsplusplus.hpp:6953
MessageStream subscribe(const char *topic_, long timeout_=0, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Subscribe to a topic.
Definition: ampsplusplus.hpp:5873
An iterable object representing the results of an AMPS subscription and/or query. ...
Definition: ampsplusplus.hpp:4936
bool getErrorOnPublishGap() const
Called to check if the Store will throw PublishStoreGapException.
Definition: ampsplusplus.hpp:1344
Command & setSequence(const std::string &seq_)
Definition: ampsplusplus.hpp:799
static const char * MOST_RECENT()
Convenience method for returning the special value to start a subscription at a recovery point based ...
Definition: ampsplusplus.hpp:7078
Command & setQueryId(const std::string &queryId_)
Definition: ampsplusplus.hpp:727
AMPSDLL amps_result amps_client_set_thread_created_callback(amps_handle client, amps_thread_created_callback callback, void *userData)
Sets a user-supplied callback function to allow thread attributes to be set when a new thread is created...
Message & setCommandId(const std::string &v)
Sets the value of the CommandId header for this Message.
Definition: Message.hpp:1302
Command & setSequence(const amps_uint64_t seq_)
Definition: ampsplusplus.hpp:812
static const char * BOOKMARK_RECENT()
Convenience method for returning the special value to start a subscription at a recovery point based ...
Definition: ampsplusplus.hpp:7089
Message & setQueryId(const std::string &v)
Sets the value of the QueryID header for this Message.
Definition: Message.hpp:1417
amps_result(* amps_thread_created_callback)(AMPS_THREAD_T, void *)
Prototype for a user-supplied callback function to allow thread attributes to be set when a new threa...
Definition: amps.h:656
amps_uint64_t getLowestUnpersisted()
Get the oldest unpersisted message sequence in the store.
Definition: ampsplusplus.hpp:1298
Command & setTopic(const std::string &topic_)
Definition: ampsplusplus.hpp:675
MessageStream maxDepth(unsigned maxDepth_)
Sets the maximum number of messages that can be held in the underlying queue.
Definition: ampsplusplus.hpp:8730
Message & assignVersion(const std::string &v)
Sets the value of the Version header for this Message.
Definition: Message.hpp:1452
void disconnect()
Disconnect from an AMPS server.
Definition: ampsplusplus.hpp:5255
MessageStream sow(const std::string &topic_, const std::string &filter_="", const std::string &orderBy_="", const std::string &bookmark_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic.
Definition: ampsplusplus.hpp:6142
unsigned getDepth(void) const
Gets the current number of messages held in the underlying queue.
Definition: ampsplusplus.hpp:8739
Field getQueryID() const
Retrieves the value of the QueryID header of the Message as a new Field.
Definition: Message.hpp:1417
The operation has not succeeded, but ought to be retried.
Definition: amps.h:233
const std::string & getLogonCorrelationData() const
Returns the currently set logon correlation data for the client.
Definition: ampsplusplus.hpp:5164
void setHeartbeat(unsigned heartbeatTime_, unsigned readTimeout_)
Requests heartbeating with the AMPS server.
Definition: ampsplusplus.hpp:6929
Command & setTopN(unsigned topN_)
Definition: ampsplusplus.hpp:853
AMPSDLL amps_result amps_client_set_read_timeout(amps_handle client, int readTimeout)
Sets a read timeout (seconds); if no message is received within that time, the connection is presumed dead...
const std::string & getURI() const
Returns the last URI this client is connected to.
Definition: ampsplusplus.hpp:5224
Command & addAckType(const std::string &ackType_)
Definition: ampsplusplus.hpp:884
void replay(StoreReplayer &replayer_)
Called by Client to get all stored and non-discarded messages replayed by the store onto the StoreRep...
Definition: ampsplusplus.hpp:1248
static const char * BOOKMARK_MOST_RECENT()
Convenience method for returning the special value to start a subscription at a recovery point based ...
Definition: ampsplusplus.hpp:7067
Message & setBookmark(const std::string &v)
Sets the value of the Bookmark header for this Message.
Definition: Message.hpp:1194
void deepCopy(const Field &orig_)
Makes self a deep copy of the original field.
Definition: Field.hpp:218
Command & setOrderBy(const char *orderBy_, size_t orderByLen_)
Definition: ampsplusplus.hpp:708
unsigned getAckTypeEnum() const
Decode self's "ack type" field and return the corresponding bitmask of values from AckType...
Definition: Message.hpp:1160
BookmarkStoreImpl * get()
Used to get a pointer to the implementation.
Definition: BookmarkStore.hpp:465
amps_handle getHandle()
Returns the underlying amps_handle for this client, to be used with amps_client_* functions from the ...
Definition: ampsplusplus.hpp:6870
AMPSDLL amps_result amps_client_attempt_reconnect(amps_handle client, unsigned version)
Manually invokes the user-supplied disconnect handler for this client.
AMPSDLL void amps_client_destroy(amps_handle handle)
Disconnects and destroys an AMPS client object.
void reset()
Clear all data from the builder.
Definition: ampsplusplus.hpp:8157
void setFailedWriteHandler(FailedWriteHandler *handler_)
Set the handler that is invoked to report when a publish fails, for example if the publisher is not e...
Definition: ampsplusplus.hpp:5483
MessageStream sowAndDeltaSubscribe(const char *topic_, const std::string &filter_="", const std::string &orderBy_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic and initiate a new delta subscription on it.
Definition: ampsplusplus.hpp:6517
void setAckTimeout(const int ackTimeout_)
Sets the message queue ack timeout value.
Definition: ampsplusplus.hpp:7349
Command & setSowKeys(const std::string &sowKeys_)
Sets the SowKeys for the command.
Definition: ampsplusplus.hpp:638
Message & setPassword(const std::string &v)
Sets the value of the Password header for this Message.
Definition: Message.hpp:1416
bool getAutoAck(void) const
Returns the value of the queue auto-ack setting.
Definition: ampsplusplus.hpp:7298
amps_uint64_t publish(const std::string &topic_, const std::string &data_)
Publish a message to an AMPS topic, returning the sequence number assigned by the publish store if on...
Definition: ampsplusplus.hpp:5514
Definition: ampsplusplus.hpp:102
void setName(const std::string &name)
Sets the name of this client, assuming no name was provided previously.
Definition: ampsplusplus.hpp:5123
The interface for handling authentication with the AMPS server.
Definition: ampsplusplus.hpp:995
void flush(long timeout_=0)
Method to wait for the Store to discard everything that has been stored up to the point in time when ...
Definition: ampsplusplus.hpp:1290
Field getBookmark() const
Retrieves the value of the Bookmark header of the Message as a new Field.
Definition: Message.hpp:1194
bool DangerousFlushPublishStoreResizeHandler(Store store_, size_t, void *data_)
PublishStoreResizeHandler that will block up to the timeout specified in user data milliseconds tryin...
Definition: ampsplusplus.hpp:1400
static amps_uint64_t getUnsetSequence()
Method to return the value used to represent no such sequence.
Definition: ampsplusplus.hpp:1156
Command & setCorrelationId(const char *correlationId_, size_t correlationIdLen_)
Set the correlation ID for this command.
Definition: ampsplusplus.hpp:778
MessageStream sow(const char *topic_, const std::string &filter_="", const std::string &orderBy_="", const std::string &bookmark_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic.
Definition: ampsplusplus.hpp:6163
amps_uint64_t publish(const std::string &topic_, const std::string &data_, unsigned long expiration_)
Publish a message to an AMPS topic, returning the sequence number assigned by the publish store (if a...
Definition: ampsplusplus.hpp:5563
void setLastChanceMessageHandler(const AMPS::MessageHandler &messageHandler)
Sets the message handler called when no other handler matches.
Definition: ampsplusplus.hpp:6966
Message::Field getMostRecent(const Message::Field &subId_)
Returns the most recent bookmark from the log that ought to be used for (re-)subscriptions.
Definition: BookmarkStore.hpp:310
amps_uint64_t deltaPublish(const char *topic_, size_t topicLength_, const char *data_, size_t dataLength_, unsigned long expiration_)
Publish the changed fields of a message to an AMPS topic.
Definition: ampsplusplus.hpp:5729
The client and server are disconnected.
Definition: amps.h:237
Message sowDeleteByData(const std::string &topic_, const std::string &data_, long timeout_=0)
Deletes the message whose keys match the message data provided.
Definition: ampsplusplus.hpp:6846
std::string deltaSubscribe(const MessageHandler &messageHandler_, const std::string &topic_, long timeout_, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Delta Subscribe to a topic.
Definition: ampsplusplus.hpp:5902
MessageStream timeout(unsigned timeout_)
Sets the maximum time to wait for the next message in milliseconds; if no message is available within...
Definition: ampsplusplus.hpp:8720
std::string sow(const MessageHandler &messageHandler_, const std::string &topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic.
Definition: ampsplusplus.hpp:6204
const std::string & getName() const
Returns the name of this client passed in the constructor.
Definition: ampsplusplus.hpp:5130
Command is an encapsulation of a single AMPS command sent by the client.
Definition: ampsplusplus.hpp:468
std::string sowAndSubscribe(const MessageHandler &messageHandler_, const std::string &topic_, const std::string &filter_="", const std::string &orderBy_="", const std::string &bookmark_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic and initiate a new subscription on it.
Definition: ampsplusplus.hpp:6354
Message & setBatchSize(const std::string &v)
Sets the value of the BatchSize header for this Message.
Definition: Message.hpp:1193
MessageStream subscribe(const std::string &topic_, long timeout_=0, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Subscribe to a topic.
Definition: ampsplusplus.hpp:5841
MessageStream bookmarkSubscribe(const char *topic_, long timeout_, const std::string &bookmark_, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Subscribe to a topic using a bookmark.
Definition: ampsplusplus.hpp:6029
Command & setSowKey(const std::string &sowKey_)
Sets the SowKey field of the command, typically used for a publish command to a topic in the state of...
Definition: ampsplusplus.hpp:608
Message & assignSequence(const std::string &v)
Sets the value of the Sequence header for this Message.
Definition: Message.hpp:1422
std::string executeAsyncNoResubscribe(Command &command_, MessageHandler handler_)
Execute the provided command and, once AMPS acknowledges the command, process messages in response to...
Definition: ampsplusplus.hpp:7179
Command & setCorrelationId(const std::string &correlationId_)
Set the correlation ID for this command.
Definition: ampsplusplus.hpp:766
std::string sowDelete(const MessageHandler &messageHandler, const std::string &topic, const std::string &filter, long timeout)
Deletes one or more messages from a topic's SOW cache.
Definition: ampsplusplus.hpp:6678
iterator end(void)
Returns an iterator representing the end of the topic or subscription.
Definition: ampsplusplus.hpp:5000
bool getRetryOnDisconnect(void) const
Returns true if automatic retry of a command to AMPS after a reconnect is enabled.
Definition: ampsplusplus.hpp:7376
void flushAcks(void)
Sends any queued message queue ack messages to the server immediately.
Definition: ampsplusplus.hpp:7289
std::string sowAndSubscribe(const MessageHandler &messageHandler_, const std::string &topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, bool oofEnabled_=false, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic and initiate a new subscription on it.
Definition: ampsplusplus.hpp:6236