AMPS C/C++ Client Class Reference
AMPS C/C++ Client Version 5.3.4.4
ampsplusplus.hpp
Go to the documentation of this file.
1 //
3 // Copyright (c) 2010-2024 60East Technologies Inc., All Rights Reserved.
4 //
5 // This computer software is owned by 60East Technologies Inc. and is
6 // protected by U.S. copyright laws and other laws and by international
7 // treaties. This computer software is furnished by 60East Technologies
8 // Inc. pursuant to a written license agreement and may be used, copied,
9 // transmitted, and stored only in accordance with the terms of such
10 // license agreement and with the inclusion of the above copyright notice.
11 // This computer software or any other copies thereof may not be provided
12 // or otherwise made available to any other person.
13 //
14 // U.S. Government Restricted Rights. This computer software: (a) was
15 // developed at private expense and is in all respects the proprietary
16 // information of 60East Technologies Inc.; (b) was not developed with
17 // government funds; (c) is a trade secret of 60East Technologies Inc.
18 // for all purposes of the Freedom of Information Act; and (d) is a
19 // commercial item and thus, pursuant to Section 12.212 of the Federal
20 // Acquisition Regulations (FAR) and DFAR Supplement Section 227.7202,
21 // Government's use, duplication or disclosure of the computer software
22 // is subject to the restrictions set forth by 60East Technologies Inc..
23 //
25 #ifndef _AMPSPLUSPLUS_H_
26 #define _AMPSPLUSPLUS_H_
27 #include "amps/amps.h"
28 #include "amps/ampsver.h"
29 #include <string>
30 #include <map>
31 #include <sstream>
32 #include <iostream>
33 #include <memory>
34 #include <stdexcept>
35 #include <limits.h>
36 #include <list>
37 #include <memory>
38 #include <set>
39 #include <deque>
40 #include <vector>
41 #include <assert.h>
42 #ifndef _WIN32
43  #include <inttypes.h>
44 #endif
45 #if defined(sun)
46  #include <sys/atomic.h>
47 #endif
48 #include "amps/BookmarkStore.hpp"
49 #include "amps/MessageRouter.hpp"
50 #include "amps/util.hpp"
51 #include "amps/ampscrc.hpp"
52 #if __cplusplus >= 201100L || _MSC_VER >= 1900
53  #include <atomic>
54 #endif
55 
56 #ifndef AMPS_TESTING_SLOW_MESSAGE_STREAM
57  #define AMPS_TESTING_SLOW_MESSAGE_STREAM
58 #endif
59 
64 
65 
72 
83 
84 // For StoreBuffer implementations
85 #define AMPS_MEMORYBUFFER_DEFAULT_BUFFERS 10
86 #define AMPS_MEMORYBUFFER_DEFAULT_LENGTH 40960
87 #define AMPS_SUBSCRIPTION_MANAGER_DEFAULT_TIMEOUT 0
88 #define AMPS_HACLIENT_TIMEOUT_DEFAULT 10000
89 #define AMPS_HACLIENT_RECONNECT_DEFAULT 200
90 #define AMPS_DEFAULT_COMMAND_TIMEOUT 5000
91 #define AMPS_DEFAULT_TOP_N -1
92 #define AMPS_DEFAULT_BATCH_SIZE 10
93 #define AMPS_NUMBER_BUFFER_LEN 20
94 #define AMPS_DEFAULT_QUEUE_ACK_TIMEOUT 1000
95 
96 #if defined(_M_X64) || defined(__x86_64) || defined(_WIN64)
97  #define AMPS_X64 1
98 #endif
99 
100 static thread_local AMPS::Message publishStoreMessage;
101 
102 namespace AMPS
103 {
104 
105  typedef std::map<std::string, std::string> ConnectionInfo;
106 
107  template<class Type>
108  inline std::string asString(Type x_)
109  {
110  std::ostringstream os;
111  os << x_;
112  return os.str();
113  }
114 
115  inline
116  size_t convertToCharArray(char* buf_, amps_uint64_t seqNo_)
117  {
118  size_t pos = AMPS_NUMBER_BUFFER_LEN;
119  for (int i = 0; i < AMPS_NUMBER_BUFFER_LEN; ++i)
120  {
121  if (seqNo_ > 0)
122  {
123  buf_[--pos] = (char)(seqNo_ % 10 + '0');
124  seqNo_ /= 10;
125  }
126  }
127  return pos;
128  }
129 
130 #ifdef _WIN32
131  inline
132  size_t convertToCharArray(char* buf_, unsigned long seqNo_)
133  {
134  size_t pos = AMPS_NUMBER_BUFFER_LEN;
135  for (int i = 0; i < AMPS_NUMBER_BUFFER_LEN; ++i)
136  {
137  if (seqNo_ > 0)
138  {
139  buf_[--pos] = (char)(seqNo_ % 10 + '0');
140  seqNo_ /= 10;
141  }
142  }
143  return pos;
144  }
145 #endif
146 
150  class Reason
151  {
152  public:
153  static const char* duplicate()
154  {
155  return "duplicate";
156  }
157  static const char* badFilter()
158  {
159  return "bad filter";
160  }
161  static const char* badRegexTopic()
162  {
163  return "bad regex topic";
164  }
165  static const char* subscriptionAlreadyExists()
166  {
167  return "subscription already exists";
168  }
169  static const char* nameInUse()
170  {
171  return "name in use";
172  }
173  static const char* authFailure()
174  {
175  return "auth failure";
176  }
177  static const char* notEntitled()
178  {
179  return "not entitled";
180  }
181  static const char* authDisabled()
182  {
183  return "authentication disabled";
184  }
185  static const char* subidInUse()
186  {
187  return "subid in use";
188  }
189  static const char* noTopic()
190  {
191  return "no topic";
192  }
193  };
194 
204  {
205  public:
206  virtual ~ExceptionListener() {;}
207  virtual void exceptionThrown(const std::exception&) const {;}
208  };
209 
211 
212 
213 #define AMPS_CALL_EXCEPTION_WRAPPER(x) \
214  try\
215  {\
216  x;\
217  }\
218  catch (std::exception& ex_)\
219  {\
220  try\
221  {\
222  _exceptionListener->exceptionThrown(ex_);\
223  }\
224  catch(...)\
225  {\
226  ;\
227  }\
228  }
229  /*
230  * Note : we don't attempt to trap non std::exception exceptions
231  * here because doing so interferes with pthread_exit on some OSes.
232  catch (...)\
233  {\
234  try\
235  {\
236  _exceptionListener->exceptionThrown(AMPS::AMPSException(\
237  "An unhandled exception of unknown type was thrown by "\
238  "the registered handler.", AMPS_E_USAGE));\
239  }\
240  catch(...)\
241  {\
242  ;\
243  }\
244  }*/
245 #ifdef _WIN32
246 #define AMPS_CALL_EXCEPTION_WRAPPER_2(me,x) \
247  try\
248  {\
249  while(me->_connected)\
250  {\
251  try\
252  {\
253  x;\
254  break;\
255  }\
256  catch(MessageStreamFullException&)\
257  {\
258  try\
259  {\
260  me->checkAndSendHeartbeat(false);\
261  }\
262  catch (std::exception& ex_)\
263  {\
264  try\
265  {\
266  me->_exceptionListener->exceptionThrown(ex_);\
267  }\
268  catch(...)\
269  {\
270  ;\
271  }\
272  break;\
273  }\
274  }\
275  }\
276  }\
277  catch (std::exception& ex_)\
278  {\
279  try\
280  {\
281  me->_exceptionListener->exceptionThrown(ex_);\
282  }\
283  catch(...)\
284  {\
285  ;\
286  }\
287  }
288  /*
289  * Note : we don't attempt to trap non std::exception exceptions
290  * here because doing so interferes with pthread_exit on some OSes.
291  catch (...)\
292  {\
293  try\
294  {\
295  me->_exceptionListener->exceptionThrown(AMPS::AMPSException(\
296  "An unhandled exception of unknown type was thrown by "\
297  "the registered handler.", AMPS_E_USAGE));\
298  }\
299  catch(...)\
300  {\
301  ;\
302  }\
303  }*/
304 
305 #define AMPS_CALL_EXCEPTION_WRAPPER_STREAM_FULL_2(me, x)\
306  while(me->_connected)\
307  {\
308  try\
309  {\
310  x;\
311  break;\
312  }\
313  catch(MessageStreamFullException&)\
314  {\
315  try\
316  {\
317  me->checkAndSendHeartbeat(false);\
318  }\
319  catch (std::exception& ex_)\
320  {\
321  try\
322  {\
323  me->_exceptionListener->exceptionThrown(ex_);\
324  }\
325  catch(...)\
326  {\
327  ;\
328  }\
329  break;\
330  }\
331  }\
332  }
333 #else
334 #define AMPS_CALL_EXCEPTION_WRAPPER_2(me,x) \
335  try\
336  {\
337  while(me->_connected)\
338  {\
339  try\
340  {\
341  x;\
342  break;\
343  }\
344  catch(MessageStreamFullException& ex_)\
345  {\
346  try\
347  {\
348  me->checkAndSendHeartbeat(false);\
349  }\
350  catch (std::exception& ex_)\
351  {\
352  try\
353  {\
354  me->_exceptionListener->exceptionThrown(ex_);\
355  }\
356  catch(...)\
357  {\
358  ;\
359  }\
360  break;\
361  }\
362  }\
363  }\
364  }\
365  catch (std::exception& ex_)\
366  {\
367  try\
368  {\
369  me->_exceptionListener->exceptionThrown(ex_);\
370  }\
371  catch(...)\
372  {\
373  ;\
374  }\
375  }
376  /*
377  * Note : we don't attempt to trap non std::exception exceptions
378  * here because doing so interferes with pthread_exit on some OSes.
379  catch (...)\
380  {\
381  try\
382  {\
383  me->_exceptionListener->exceptionThrown(AMPS::AMPSException(\
384  "An unhandled exception of unknown type was thrown by "\
385  "the registered handler.", AMPS_E_USAGE));\
386  }\
387  catch(...)\
388  {\
389  ;\
390  }\
391  }*/
392 
393 #define AMPS_CALL_EXCEPTION_WRAPPER_STREAM_FULL_2(me, x)\
394  while(me->_connected)\
395  {\
396  try\
397  {\
398  x;\
399  break;\
400  }\
401  catch(MessageStreamFullException& ex_)\
402  {\
403  try\
404  {\
405  me->checkAndSendHeartbeat(false);\
406  }\
407  catch (std::exception& ex_)\
408  {\
409  try\
410  {\
411  me->_exceptionListener->exceptionThrown(ex_);\
412  }\
413  catch(...)\
414  {\
415  ;\
416  }\
417  break;\
418  }\
419  }\
420  }
421 #endif
422 
423 #define AMPS_UNHANDLED_EXCEPTION(ex) \
424  try\
425  {\
426  _exceptionListener->exceptionThrown(ex);\
427  }\
428  catch(...)\
429  {\
430  ;\
431  }
432 
433 #define AMPS_UNHANDLED_EXCEPTION_2(me,ex) \
434  try\
435  {\
436  me->_exceptionListener->exceptionThrown(ex);\
437  }\
438  catch(...)\
439  {\
440  ;\
441  }
442 
443 
444  class Client;
445 
470 
471  class Command
472  {
473  Message _message;
474  unsigned _timeout;
475  unsigned _batchSize;
476  unsigned _flags;
477  static const unsigned Subscribe = 1;
478  static const unsigned SOW = 2;
479  static const unsigned NeedsSequenceNumber = 4;
480  static const unsigned ProcessedAck = 8;
481  static const unsigned StatsAck = 16;
482  void init(Message::Command::Type command_)
483  {
484  _timeout = 0;
485  _batchSize = 0;
486  _flags = 0;
487  _message.reset();
488  _message.setCommandEnum(command_);
489  _setIds();
490  }
491  void init(const std::string& command_)
492  {
493  _timeout = 0;
494  _batchSize = 0;
495  _flags = 0;
496  _message.reset();
497  _message.setCommand(command_);
498  _setIds();
499  }
500  void init(const char* command_, size_t commandLen_)
501  {
502  _timeout = 0;
503  _batchSize = 0;
504  _flags = 0;
505  _message.reset();
506  _message.setCommand(command_, commandLen_);
507  _setIds();
508  }
509  void _setIds(void)
510  {
511  Message::Command::Type command = _message.getCommandEnum();
512  if (!(command & Message::Command::NoDataCommands))
513  {
514  _message.newCommandId();
515  if (command == Message::Command::Subscribe ||
516  command == Message::Command::SOWAndSubscribe ||
517  command == Message::Command::DeltaSubscribe ||
518  command == Message::Command::SOWAndDeltaSubscribe)
519  {
520  _message.setSubscriptionId(_message.getCommandId());
521  _flags |= Subscribe;
522  }
523  if (command == Message::Command::SOW
524  || command == Message::Command::SOWAndSubscribe
525  || command == Message::Command::SOWAndDeltaSubscribe)
526  {
527  _message.setQueryID(_message.getCommandId());
528  if (_batchSize == 0)
529  {
530  setBatchSize(AMPS_DEFAULT_BATCH_SIZE);
531  }
532  if (command == Message::Command::SOW)
533  {
534  _flags |= SOW;
535  }
536  }
537  _flags |= ProcessedAck;
538  }
539  else if (command == Message::Command::SOWDelete)
540  {
541  _message.newCommandId();
542  _flags |= ProcessedAck;
543  _flags |= NeedsSequenceNumber;
544  }
545  else if (command == Message::Command::Publish
546  || command == Message::Command::DeltaPublish)
547  {
548  _flags |= NeedsSequenceNumber;
549  }
550  else if (command == Message::Command::StopTimer)
551  {
552  _message.newCommandId();
553  }
554  }
555  public:
559  Command(const std::string& command_)
560  {
561  init(command_);
562  }
567  Command(const char* command_, size_t commandLen_)
568  {
569  init(command_, commandLen_);
570  }
574  Command(Message::Command::Type command_)
575  {
576  init(command_);
577  }
578 
582  Command& reset(const std::string& command_)
583  {
584  init(command_);
585  return *this;
586  }
591  Command& reset(const char* command_, size_t commandLen_)
592  {
593  init(command_, commandLen_);
594  return *this;
595  }
599  Command& reset(Message::Command::Type command_)
600  {
601  init(command_);
602  return *this;
603  }
611  Command& setSowKey(const std::string& sowKey_)
612  {
613  _message.setSowKey(sowKey_);
614  return *this;
615  }
624  Command& setSowKey(const char* sowKey_, size_t sowKeyLen_)
625  {
626  _message.setSowKey(sowKey_, sowKeyLen_);
627  return *this;
628  }
641  Command& setSowKeys(const std::string& sowKeys_)
642  {
643  _message.setSowKeys(sowKeys_);
644  return *this;
645  }
659  Command& setSowKeys(const char* sowKeys_, size_t sowKeysLen_)
660  {
661  _message.setSowKeys(sowKeys_, sowKeysLen_);
662  return *this;
663  }
665  Command& setCommandId(const std::string& cmdId_)
666  {
667  _message.setCommandId(cmdId_);
668  return *this;
669  }
672  Command& setCommandId(const char* cmdId_, size_t cmdIdLen_)
673  {
674  _message.setCommandId(cmdId_, cmdIdLen_);
675  return *this;
676  }
678  Command& setTopic(const std::string& topic_)
679  {
680  _message.setTopic(topic_);
681  return *this;
682  }
685  Command& setTopic(const char* topic_, size_t topicLen_)
686  {
687  _message.setTopic(topic_, topicLen_);
688  return *this;
689  }
691  Command& setFilter(const std::string& filter_)
692  {
693  _message.setFilter(filter_);
694  return *this;
695  }
698  Command& setFilter(const char* filter_, size_t filterLen_)
699  {
700  _message.setFilter(filter_, filterLen_);
701  return *this;
702  }
704  Command& setOrderBy(const std::string& orderBy_)
705  {
706  _message.setOrderBy(orderBy_);
707  return *this;
708  }
711  Command& setOrderBy(const char* orderBy_, size_t orderByLen_)
712  {
713  _message.setOrderBy(orderBy_, orderByLen_);
714  return *this;
715  }
717  Command& setSubId(const std::string& subId_)
718  {
719  _message.setSubscriptionId(subId_);
720  return *this;
721  }
724  Command& setSubId(const char* subId_, size_t subIdLen_)
725  {
726  _message.setSubscriptionId(subId_, subIdLen_);
727  return *this;
728  }
730  Command& setQueryId(const std::string& queryId_)
731  {
732  _message.setQueryId(queryId_);
733  return *this;
734  }
737  Command& setQueryId(const char* queryId_, size_t queryIdLen_)
738  {
739  _message.setQueryId(queryId_, queryIdLen_);
740  return *this;
741  }
747  Command& setBookmark(const std::string& bookmark_)
748  {
749  _message.setBookmark(bookmark_);
750  return *this;
751  }
758  Command& setBookmark(const char* bookmark_, size_t bookmarkLen_)
759  {
760  _message.setBookmark(bookmark_, bookmarkLen_);
761  return *this;
762  }
769  Command& setCorrelationId(const std::string& correlationId_)
770  {
771  _message.setCorrelationId(correlationId_);
772  return *this;
773  }
781  Command& setCorrelationId(const char* correlationId_, size_t correlationIdLen_)
782  {
783  _message.setCorrelationId(correlationId_, correlationIdLen_);
784  return *this;
785  }
788  Command& setOptions(const std::string& options_)
789  {
790  _message.setOptions(options_);
791  return *this;
792  }
796  Command& setOptions(const char* options_, size_t optionsLen_)
797  {
798  _message.setOptions(options_, optionsLen_);
799  return *this;
800  }
802  Command& setSequence(const std::string& seq_)
803  {
804  _message.setSequence(seq_);
805  return *this;
806  }
809  Command& setSequence(const char* seq_, size_t seqLen_)
810  {
811  _message.setSequence(seq_, seqLen_);
812  return *this;
813  }
815  Command& setSequence(const amps_uint64_t seq_)
816  {
817  std::ostringstream os;
818  os << seq_;
819  _message.setSequence(os.str());
820  return *this;
821  }
822  amps_uint64_t getSequence() const
823  {
824  return amps_message_get_field_uint64(_message.getMessage(), AMPS_Sequence);
825  }
828  Command& setData(const std::string& data_)
829  {
830  _message.setData(data_);
831  return *this;
832  }
836  Command& setData(const char* data_, size_t dataLen_)
837  {
838  _message.setData(data_, dataLen_);
839  return *this;
840  }
850  Command& setTimeout(unsigned timeout_)
851  {
852  _timeout = timeout_;
853  return *this;
854  }
856  Command& setTopN(unsigned topN_)
857  {
858  _message.setTopNRecordsReturned(topN_);
859  return *this;
860  }
865  Command& setBatchSize(unsigned batchSize_)
866  {
867  _message.setBatchSize(batchSize_);
868  _batchSize = batchSize_;
869  return *this;
870  }
881  Command& setExpiration(unsigned expiration_)
882  {
883  _message.setExpiration(expiration_);
884  return *this;
885  }
887  Command& addAckType(const std::string& ackType_)
888  {
889  _message.setAckType(_message.getAckType() + "," + ackType_);
890  if (ackType_ == "processed")
891  {
892  _flags |= ProcessedAck;
893  }
894  else if (ackType_ == "stats")
895  {
896  _flags |= StatsAck;
897  }
898  return *this;
899  }
901  Command& setAckType(const std::string& ackType_)
902  {
903  _message.setAckType(ackType_);
904  if (ackType_.find("processed") != std::string::npos)
905  {
906  _flags |= ProcessedAck;
907  }
908  else
909  {
910  _flags &= ~ProcessedAck;
911  }
912  if (ackType_.find("stats") != std::string::npos)
913  {
914  _flags |= StatsAck;
915  }
916  else
917  {
918  _flags &= ~StatsAck;
919  }
920  return *this;
921  }
923  Command& setAckType(unsigned ackType_)
924  {
925  _message.setAckTypeEnum(ackType_);
926  if (ackType_ & Message::AckType::Processed)
927  {
928  _flags |= ProcessedAck;
929  }
930  else
931  {
932  _flags &= ~ProcessedAck;
933  }
934  if (ackType_ & Message::AckType::Stats)
935  {
936  _flags |= StatsAck;
937  }
938  else
939  {
940  _flags &= ~StatsAck;
941  }
942  return *this;
943  }
945  std::string getAckType() const
946  {
947  return (std::string)(_message.getAckType());
948  }
950  unsigned getAckTypeEnum() const
951  {
952  return _message.getAckTypeEnum();
953  }
954 
955  Message& getMessage(void)
956  {
957  return _message;
958  }
959  unsigned getTimeout(void) const
960  {
961  return _timeout;
962  }
963  unsigned getBatchSize(void) const
964  {
965  return _batchSize;
966  }
967  bool isSubscribe(void) const
968  {
969  return _flags & Subscribe;
970  }
971  bool isSow(void) const
972  {
973  return (_flags & SOW) != 0;
974  }
975  bool hasProcessedAck(void) const
976  {
977  return (_flags & ProcessedAck) != 0;
978  }
979  bool hasStatsAck(void) const
980  {
981  return (_flags & StatsAck) != 0;
982  }
983  bool needsSequenceNumber(void) const
984  {
985  return (_flags & NeedsSequenceNumber) != 0;
986  }
987  };
988 
991  typedef void(*DisconnectHandlerFunc)(Client&, void* userData);
992 
993  class Message;
995 
999  {
1000  public:
1001  virtual ~Authenticator() {;}
1002 
1008  virtual std::string authenticate(const std::string& userName_, const std::string& password_) = 0;
1016  virtual std::string retry(const std::string& userName_, const std::string& password_) = 0;
1023  virtual void completed(const std::string& userName_, const std::string& password_, const std::string& reason_) = 0;
1024  };
1025 
1030  {
1031  public:
1032  virtual ~DefaultAuthenticator() {;}
1035  std::string authenticate(const std::string& /*userName_*/, const std::string& password_)
1036  {
1037  return password_;
1038  }
1039 
1042  std::string retry(const std::string& /*userName_*/, const std::string& /*password_*/)
1043  {
1044  throw AuthenticationException("retry not implemented by DefaultAuthenticator.");
1045  }
1046 
1047  void completed(const std::string& /*userName_*/, const std::string& /* password_ */, const std::string& /* reason */) {;}
1048 
1053  {
1054  static DefaultAuthenticator d;
1055  return d;
1056  }
1057  };
1058 
1062  {
1063  public:
1064 
1068  virtual void execute(Message& message_) = 0;
1069 
1070  virtual ~StoreReplayer() {;}
1071  };
1072 
1073  class Store;
1074 
1083  typedef bool (*PublishStoreResizeHandler)(Store store_,
1084  size_t size_,
1085  void* userData_);
1086 
1089  class StoreImpl : public RefBody
1090  {
1091  public:
1097  StoreImpl(bool errorOnPublishGap_ = false)
1098  : _resizeHandler(NULL)
1099  , _resizeHandlerData(NULL)
1100  , _errorOnPublishGap(errorOnPublishGap_)
1101  {;}
1102 
1107  virtual amps_uint64_t store(const Message& message_) = 0;
1108 
1115  virtual void discardUpTo(amps_uint64_t index_) = 0;
1116 
1121  virtual void replay(StoreReplayer& replayer_) = 0;
1122 
1130  virtual bool replaySingle(StoreReplayer& replayer_, amps_uint64_t index_) = 0;
1131 
1136  virtual size_t unpersistedCount() const = 0;
1137 
1138  virtual ~StoreImpl() {;}
1139 
1148  virtual void flush(long timeout_) = 0;
1149 
1152  static inline size_t getUnsetPosition()
1153  {
1154  return AMPS_UNSET_INDEX;
1155  }
1156 
1159  static inline amps_uint64_t getUnsetSequence()
1160  {
1161  return AMPS_UNSET_SEQUENCE;
1162  }
1163 
1167  virtual amps_uint64_t getLowestUnpersisted() const = 0;
1168 
1172  virtual amps_uint64_t getLastPersisted() = 0;
1173 
1183  inline virtual void setResizeHandler(PublishStoreResizeHandler handler_,
1184  void* userData_)
1185  {
1186  _resizeHandler = handler_;
1187  _resizeHandlerData = userData_;
1188  }
1189 
1190  inline virtual PublishStoreResizeHandler getResizeHandler() const
1191  {
1192  return _resizeHandler;
1193  }
1194 
1195  bool callResizeHandler(size_t newSize_);
1196 
1197  inline virtual void setErrorOnPublishGap(bool errorOnPublishGap_)
1198  {
1199  _errorOnPublishGap = errorOnPublishGap_;
1200  }
1201 
1202  inline virtual bool getErrorOnPublishGap() const
1203  {
1204  return _errorOnPublishGap;
1205  }
1206 
1207  private:
1208  PublishStoreResizeHandler _resizeHandler;
1209  void* _resizeHandlerData;
1210  bool _errorOnPublishGap;
1211  };
1212 
1215  class Store
1216  {
1217  RefHandle<StoreImpl> _body;
1218  public:
1219  Store() {;}
1220  Store(StoreImpl* body_) : _body(body_) {;}
1221  Store(const Store& rhs) : _body(rhs._body) {;}
1222  Store& operator=(const Store& rhs)
1223  {
1224  _body = rhs._body;
1225  return *this;
1226  }
1227 
1231  amps_uint64_t store(const Message& message_)
1232  {
1233  return _body.get().store(message_);
1234  }
1235 
1242  void discardUpTo(amps_uint64_t index_)
1243  {
1244  _body.get().discardUpTo(index_);
1245  }
1246 
1251  void replay(StoreReplayer& replayer_)
1252  {
1253  _body.get().replay(replayer_);
1254  }
1255 
1263  bool replaySingle(StoreReplayer& replayer_, amps_uint64_t index_)
1264  {
1265  return _body.get().replaySingle(replayer_, index_);
1266  }
1267 
1272  size_t unpersistedCount() const
1273  {
1274  return _body.get().unpersistedCount();
1275  }
1276 
1280  bool isValid() const
1281  {
1282  return _body.isValid();
1283  }
1284 
1293  void flush(long timeout_ = 0)
1294  {
1295  return _body.get().flush(timeout_);
1296  }
1297 
1301  amps_uint64_t getLowestUnpersisted()
1302  {
1303  return _body.get().getLowestUnpersisted();
1304  }
1305 
1309  amps_uint64_t getLastPersisted()
1310  {
1311  return _body.get().getLastPersisted();
1312  }
1313 
1323  void setResizeHandler(PublishStoreResizeHandler handler_,
1324  void* userData_)
1325  {
1326  _body.get().setResizeHandler(handler_, userData_);
1327  }
1328 
1329  PublishStoreResizeHandler getResizeHandler() const
1330  {
1331  return _body.get().getResizeHandler();
1332  }
1333 
1338  inline void setErrorOnPublishGap(bool errorOnPublishGap_)
1339  {
1340  _body.get().setErrorOnPublishGap(errorOnPublishGap_);
1341  }
1342 
1347  inline bool getErrorOnPublishGap() const
1348  {
1349  return _body.get().getErrorOnPublishGap();
1350  }
1351 
1355  StoreImpl* get()
1356  {
1357  if (_body.isValid())
1358  {
1359  return &_body.get();
1360  }
1361  else
1362  {
1363  return NULL;
1364  }
1365  }
1366 
1367  };
1368 
1374  {
1375  public:
1376  virtual ~FailedWriteHandler() {;}
1383  virtual void failedWrite(const Message& message_,
1384  const char* reason_, size_t reasonLength_) = 0;
1385  };
1386 
1387 
1388  inline bool StoreImpl::callResizeHandler(size_t newSize_)
1389  {
1390  if (_resizeHandler)
1391  {
1392  return _resizeHandler(Store(this), newSize_, _resizeHandlerData);
1393  }
1394  return true;
1395  }
1396 
1403  inline bool DangerousFlushPublishStoreResizeHandler(Store store_, size_t /*size_*/,
1404  void* data_)
1405  {
1406  long* timeoutp = (long*)data_;
1407  size_t count = store_.unpersistedCount();
1408  if (count == 0)
1409  {
1410  return false;
1411  }
1412  try
1413  {
1414  store_.flush(*timeoutp);
1415  }
1416 #ifdef _WIN32
1417  catch (const TimedOutException&)
1418 #else
1419  catch (const TimedOutException& e)
1420 #endif
1421  {
1422  return true;
1423  }
1424  return (count == store_.unpersistedCount());
1425  }
1426 
1432  {
1433  public:
1445  virtual bool failure(const Message& message_, const MessageHandler& handler_,
1446  unsigned requestedAckTypes_,
1447  const AMPSException& exception_) = 0;
1448  };
1449 
1454  {
1455  public:
1456  virtual ~SubscriptionManager() {;}
1464  virtual void subscribe(MessageHandler messageHandler_, const Message& message_,
1465  unsigned requestedAckTypes_) = 0;
1469  virtual void unsubscribe(const Message::Field& subId_) = 0;
1472  virtual void clear() = 0;
1476  virtual void resubscribe(Client& client_) = 0;
1481  virtual void setFailedResubscribeHandler(std::shared_ptr<FailedResubscribeHandler> handler_)
1482  {
1483  _failedResubscribeHandler = handler_;
1484  }
1485  protected:
1486  std::shared_ptr<FailedResubscribeHandler> _failedResubscribeHandler;
1487  };
1488 
1492 
1494  {
1495  public:
1497  typedef enum { Disconnected = 0,
1498  Shutdown = 1,
1499  Connected = 2,
1500  LoggedOn = 4,
1501  PublishReplayed = 8,
1502  HeartbeatInitiated = 16,
1503  Resubscribed = 32,
1504  UNKNOWN = 16384
1505  } State;
1506 
1516  virtual void connectionStateChanged(State newState_) = 0;
1517  virtual ~ConnectionStateListener() {;}
1518  };
1519 
1520 
1521  class MessageStreamImpl;
1522  class MessageStream;
1523 
1524  typedef void(*DeferredExecutionFunc)(void*);
1525 
1526  class ClientImpl : public RefBody // -V553
1527  {
1528  // Class to wrap turning of Nagle for things like flush and logon
1529  class NoDelay
1530  {
1531  private:
1532  AMPS_SOCKET _socket;
1533  int _noDelay;
1534  char* _valuePtr;
1535 #ifdef _WIN32
1536  int _valueLen;
1537 #else
1538  socklen_t _valueLen;
1539 #endif
1540  public:
1541  NoDelay(amps_handle client_)
1542  : _socket(AMPS_INVALID_SOCKET), _noDelay(0), _valueLen(sizeof(int))
1543  {
1544  _valuePtr = (char*)&_noDelay;
1545  _socket = amps_client_get_socket(client_);
1546  if (_socket != AMPS_INVALID_SOCKET)
1547  {
1548  getsockopt(_socket, IPPROTO_TCP, TCP_NODELAY, _valuePtr, &_valueLen);
1549  if (!_noDelay)
1550  {
1551  _noDelay = 1;
1552  setsockopt(_socket, IPPROTO_TCP, TCP_NODELAY, _valuePtr, _valueLen);
1553  }
1554  else
1555  {
1556  _socket = AMPS_INVALID_SOCKET;
1557  }
1558  }
1559  }
1560 
1561  ~NoDelay()
1562  {
1563  if (_socket != AMPS_INVALID_SOCKET)
1564  {
1565  _noDelay = 0;
1566  setsockopt(_socket, IPPROTO_TCP, TCP_NODELAY, _valuePtr, _valueLen);
1567  }
1568  }
1569  };
1570 
1571  friend class Client;
1572  protected:
1573  amps_handle _client;
1574  DisconnectHandler _disconnectHandler;
1575  enum GlobalCommandTypeHandlers : size_t
1576  {
1577  Publish = 0,
1578  SOW = 1,
1579  GroupBegin = 2,
1580  GroupEnd = 3,
1581  Heartbeat = 4,
1582  OOF = 5,
1583  Ack = 6,
1584  LastChance = 7,
1585  DuplicateMessage = 8,
1586  COUNT = 9
1587  };
1588  std::vector<MessageHandler> _globalCommandTypeHandlers;
1589  Message _message, _readMessage, _publishMessage, _deltaMessage, _beatMessage;
1590  MessageRouter _routes;
1591  MessageRouter::RouteCache _routeCache;
1592  mutable Mutex _lock;
1593  std::string _name, _nameHash, _lastUri, _logonCorrelationData;
1594  amps_uint64_t _nameHashValue;
1595  BookmarkStore _bookmarkStore;
1596  Store _publishStore;
1597  bool _isRetryOnDisconnect;
1598  amps_unique_ptr<FailedWriteHandler> _failedWriteHandler;
1599 #if __cplusplus >= 201100L || _MSC_VER >= 1900
1600  std::atomic<amps_uint64_t> _lastSentHaSequenceNumber;
1601 #else
1602  volatile amps_uint64_t _lastSentHaSequenceNumber;
1603 #endif
1604  AMPS_ATOMIC_TYPE_8 _logonInProgress;
1605  AMPS_ATOMIC_TYPE_8 _badTimeToHASubscribe;
1606  VersionInfo _serverVersion;
1607  Timer _heartbeatTimer;
1608  amps_unique_ptr<MessageStream> _pEmptyMessageStream;
1609 
1610  // queue data
1611  int _queueAckTimeout;
1612  bool _isAutoAckEnabled;
1613  unsigned _ackBatchSize;
1614  unsigned _queuedAckCount;
1615  unsigned _defaultMaxDepth;
1616  struct QueueBookmarks
1617  {
1618  QueueBookmarks(const std::string& topic_)
1619  : _topic(topic_)
1620  , _oldestTime(0)
1621  , _bookmarkCount(0)
1622  {;}
1623  std::string _topic;
1624  std::string _data;
1625  amps_uint64_t _oldestTime;
1626  unsigned _bookmarkCount;
1627  };
1628  typedef amps_uint64_t topic_hash;
1629  typedef std::map<topic_hash, QueueBookmarks> TopicHashMap;
1630  TopicHashMap _topicHashMap;
1631 
1632  class ClientStoreReplayer : public StoreReplayer
1633  {
1634  ClientImpl* _client;
1635  public:
1636  unsigned _version;
1637  amps_result _res;
1638 
1639  ClientStoreReplayer()
1640  : _client(NULL), _version(0), _res(AMPS_E_OK)
1641  {}
1642 
1643  ClientStoreReplayer(ClientImpl* client_)
1644  : _client(client_), _version(0), _res(AMPS_E_OK)
1645  {}
1646 
1647  void setClient(ClientImpl* client_)
1648  {
1649  _client = client_;
1650  }
1651 
1652  void execute(Message& message_)
1653  {
1654  if (!_client)
1655  {
1656  throw CommandException("Can't replay without a client.");
1657  }
1658  amps_uint64_t index = amps_message_get_field_uint64(message_.getMessage(),
1659  AMPS_Sequence);
1660  if (index > _client->_lastSentHaSequenceNumber)
1661  {
1662  _client->_lastSentHaSequenceNumber = index;
1663  }
1664 
1665  _res = AMPS_E_OK;
1666  // Don't replay a queue cancel message after a reconnect.
1667  // Currently, the only messages that will have anything in options
1668  // are cancel messages.
1669  if (!message_.getCommand().empty() &&
1670  (!_client->_logonInProgress ||
1671  message_.getOptions().len() < 6))
1672  {
1673  _res = amps_client_send_with_version(_client->_client,
1674  message_.getMessage(),
1675  &_version);
1676  if (_res != AMPS_E_OK)
1677  {
1678  throw DisconnectedException("AMPS Server disconnected during replay");
1679  }
1680  }
1681  }
1682 
1683  };
1684  ClientStoreReplayer _replayer;
1685 
1686  class FailedWriteStoreReplayer : public StoreReplayer
1687  {
1688  ClientImpl* _parent;
1689  const char* _reason;
1690  size_t _reasonLength;
1691  size_t _replayCount;
1692  public:
1693  FailedWriteStoreReplayer(ClientImpl* parent, const char* reason_, size_t reasonLength_)
1694  : _parent(parent),
1695  _reason(reason_),
1696  _reasonLength(reasonLength_),
1697  _replayCount(0)
1698  {;}
1699  void execute(Message& message_)
1700  {
1701  if (_parent->_failedWriteHandler)
1702  {
1703  ++_replayCount;
1704  _parent->_failedWriteHandler->failedWrite(message_,
1705  _reason, _reasonLength);
1706  }
1707  }
1708  size_t replayCount(void) const
1709  {
1710  return _replayCount;
1711  }
1712  };
1713 
1714  struct AckResponseImpl : public RefBody
1715  {
1716  std::string username, password, reason, status, bookmark, options;
1717  amps_uint64_t sequenceNo;
1718  amps_uint64_t nameHashValue;
1719  VersionInfo serverVersion;
1720 #if __cplusplus >= 201100L || _MSC_VER >= 1900
1721  std::atomic<bool> responded;
1722  std::atomic<bool> abandoned;
1723 #else
1724  volatile bool responded;
1725  volatile bool abandoned;
1726 #endif
1727  unsigned connectionVersion;
1728  AckResponseImpl() :
1729  RefBody(),
1730  username(), password(), reason(), status(), bookmark(), options(),
1731  sequenceNo((amps_uint64_t)0),
1732  serverVersion(),
1733  responded(false),
1734  abandoned(false),
1735  connectionVersion(UINT_MAX) // Don't abandon if unsent AC-1329
1736  {
1737  }
1738  };
1739 
1740  class AckResponse
1741  {
1742  RefHandle<AckResponseImpl> _body;
1743  public:
1744  AckResponse() : _body(NULL) {;}
1745  AckResponse(const AckResponse& rhs) : _body(rhs._body) {;}
1746  static AckResponse create()
1747  {
1748  AckResponse r;
1749  r._body = new AckResponseImpl();
1750  return r;
1751  }
1752 
1753  const std::string& username()
1754  {
1755  return _body.get().username;
1756  }
1757  void setUsername(const char* data_, size_t len_)
1758  {
1759  if (data_)
1760  {
1761  _body.get().username.assign(data_, len_);
1762  }
1763  else
1764  {
1765  _body.get().username.clear();
1766  }
1767  }
1768  const std::string& password()
1769  {
1770  return _body.get().password;
1771  }
1772  void setPassword(const char* data_, size_t len_)
1773  {
1774  if (data_)
1775  {
1776  _body.get().password.assign(data_, len_);
1777  }
1778  else
1779  {
1780  _body.get().password.clear();
1781  }
1782  }
1783  const std::string& reason()
1784  {
1785  return _body.get().reason;
1786  }
1787  void setReason(const char* data_, size_t len_)
1788  {
1789  if (data_)
1790  {
1791  _body.get().reason.assign(data_, len_);
1792  }
1793  else
1794  {
1795  _body.get().reason.clear();
1796  }
1797  }
1798  const std::string& status()
1799  {
1800  return _body.get().status;
1801  }
1802  void setStatus(const char* data_, size_t len_)
1803  {
1804  if (data_)
1805  {
1806  _body.get().status.assign(data_, len_);
1807  }
1808  else
1809  {
1810  _body.get().status.clear();
1811  }
1812  }
1813  const std::string& bookmark()
1814  {
1815  return _body.get().bookmark;
1816  }
1817  void setBookmark(const Field& bookmark_)
1818  {
1819  if (!bookmark_.empty())
1820  {
1821  _body.get().bookmark.assign(bookmark_.data(), bookmark_.len());
1822  Field::parseBookmark(bookmark_, _body.get().nameHashValue,
1823  _body.get().sequenceNo);
1824  }
1825  else
1826  {
1827  _body.get().bookmark.clear();
1828  _body.get().sequenceNo = (amps_uint64_t)0;
1829  _body.get().nameHashValue = (amps_uint64_t)0;
1830  }
1831  }
1832  amps_uint64_t sequenceNo() const
1833  {
1834  return _body.get().sequenceNo;
1835  }
1836  amps_uint64_t nameHashValue() const
1837  {
1838  return _body.get().nameHashValue;
1839  }
1840  void setSequenceNo(const char* data_, size_t len_)
1841  {
1842  amps_uint64_t result = (amps_uint64_t)0;
1843  if (data_)
1844  {
1845  for (size_t i = 0; i < len_; ++i)
1846  {
1847  result *= (amps_uint64_t)10;
1848  result += (amps_uint64_t)(data_[i] - '0');
1849  }
1850  }
1851  _body.get().sequenceNo = result;
1852  }
1853  VersionInfo serverVersion() const
1854  {
1855  return _body.get().serverVersion;
1856  }
1857  void setServerVersion(const char* data_, size_t len_)
1858  {
1859  if (data_)
1860  {
1861  _body.get().serverVersion.setVersion(std::string(data_, len_));
1862  }
1863  }
1864  bool responded()
1865  {
1866  return _body.get().responded;
1867  }
1868  void setResponded()
1869  {
1870  _body.get().responded = true;
1871  }
1872  bool abandoned()
1873  {
1874  return _body.get().abandoned;
1875  }
1876  void setAbandoned()
1877  {
1878  if (_body.isValid())
1879  {
1880  _body.get().abandoned = true;
1881  }
1882  }
1883 
1884  void setConnectionVersion(unsigned connectionVersion)
1885  {
1886  _body.get().connectionVersion = connectionVersion;
1887  }
1888 
1889  unsigned getConnectionVersion()
1890  {
1891  return _body.get().connectionVersion;
1892  }
1893  void setOptions(const char* data_, size_t len_)
1894  {
1895  if (data_)
1896  {
1897  _body.get().options.assign(data_, len_);
1898  }
1899  else
1900  {
1901  _body.get().options.clear();
1902  }
1903  }
1904 
1905  const std::string& options()
1906  {
1907  return _body.get().options;
1908  }
1909 
1910  AckResponse& operator=(const AckResponse& rhs)
1911  {
1912  _body = rhs._body;
1913  return *this;
1914  }
1915  };
1916 
1917 
1918  typedef std::map<std::string, AckResponse> AckMap;
1919  AckMap _ackMap;
1920  Mutex _ackMapLock;
1921  DefaultExceptionListener _defaultExceptionListener;
1922  protected:
1923 
1924  struct DeferredExecutionRequest
1925  {
1926  DeferredExecutionRequest(DeferredExecutionFunc func_,
1927  void* userData_)
1928  : _func(func_),
1929  _userData(userData_)
1930  {;}
1931 
1932  DeferredExecutionFunc _func;
1933  void* _userData;
1934  };
1935  const ExceptionListener* _exceptionListener;
1936  std::shared_ptr<const ExceptionListener> _pExceptionListener;
1937  amps_unique_ptr<SubscriptionManager> _subscriptionManager;
1938  volatile bool _connected;
1939  std::string _username;
1940  typedef std::set<ConnectionStateListener*> ConnectionStateListeners;
1941  ConnectionStateListeners _connectionStateListeners;
1942  typedef std::vector<DeferredExecutionRequest> DeferredExecutionList;
1943  Mutex _deferredExecutionLock;
1944  DeferredExecutionList _deferredExecutionList;
1945  unsigned _heartbeatInterval;
1946  unsigned _readTimeout;
1947 
1948  void broadcastConnectionStateChanged(ConnectionStateListener::State newState_)
1949  {
1950  // If we disconnected before we got to notification, don't notify.
1951  // This should only be able to happen for Resubscribed, since the lock
1952  // is released to let the subscription manager run resubscribe so a
1953  // disconnect could be called before the change is broadcast.
1954  if (!_connected && newState_ > ConnectionStateListener::Connected)
1955  {
1956  return;
1957  }
1958  for (ConnectionStateListeners::iterator it = _connectionStateListeners.begin(); it != _connectionStateListeners.end(); ++it)
1959  {
1960  AMPS_CALL_EXCEPTION_WRAPPER(
1961  (*it)->connectionStateChanged(newState_));
1962  }
1963  }
1964  unsigned processedAck(Message& message);
1965  unsigned persistedAck(Message& meesage);
1966  void lastChance(Message& message);
1967  void checkAndSendHeartbeat(bool force = false);
1968  virtual ConnectionInfo getConnectionInfo() const;
1969  static amps_result
1970  ClientImplMessageHandler(amps_handle message, void* userData);
1971  static void
1972  ClientImplPreDisconnectHandler(amps_handle client, unsigned failedConnectionVersion, void* userData);
1973  static amps_result
1974  ClientImplDisconnectHandler(amps_handle client, void* userData);
1975 
1976  void unsubscribeInternal(const std::string& id)
1977  {
1978  if (id.empty())
1979  {
1980  return;
1981  }
1982  // remove the handler first to avoid any more message delivery
1983  Message::Field subId;
1984  subId.assign(id.data(), id.length());
1985  _routes.removeRoute(subId);
1986  // Lock is already acquired
1987  if (_subscriptionManager)
1988  {
1989  // Have to unlock before calling into sub manager to avoid deadlock
1990  Unlock<Mutex> unlock(_lock);
1991  _subscriptionManager->unsubscribe(subId);
1992  }
1993  _message.reset();
1994  _message.setCommandEnum(Message::Command::Unsubscribe);
1995  _message.newCommandId();
1996  _message.setSubscriptionId(id);
1997  _sendWithoutRetry(_message);
1998  deferredExecution(&amps_noOpFn, NULL);
1999  }
2000 
2001  AckResponse syncAckProcessing(long timeout_, Message& message_,
2002  bool isHASubscribe_)
2003  {
2004  return syncAckProcessing(timeout_, message_,
2005  (amps_uint64_t)0, isHASubscribe_);
2006  }
2007 
2008  AckResponse syncAckProcessing(long timeout_, Message& message_,
2009  amps_uint64_t haSeq = (amps_uint64_t)0,
2010  bool isHASubscribe_ = false)
2011  {
2012  // inv: we already have _lock locked up.
2013  AckResponse ack = AckResponse::create();
2014  if (1)
2015  {
2016  Lock<Mutex> guard(_ackMapLock);
2017  _ackMap[message_.getCommandId()] = ack;
2018  }
2019  ack.setConnectionVersion((unsigned)_send(message_, haSeq, isHASubscribe_));
2020  if (ack.getConnectionVersion() == 0)
2021  {
2022  // Send failed
2023  throw DisconnectedException("Connection closed while waiting for response.");
2024  }
2025  bool timedOut = false;
2026  AMPS_START_TIMER(timeout_)
2027  while (!timedOut && !ack.responded() && !ack.abandoned())
2028  {
2029  if (timeout_)
2030  {
2031  timedOut = !_lock.wait(timeout_);
2032  // May have woken up early, check real time
2033  if (timedOut)
2034  {
2035  AMPS_RESET_TIMER(timedOut, timeout_);
2036  }
2037  }
2038  else
2039  {
2040  // Using a timeout version to ensure python can interrupt
2041  _lock.wait(1000);
2042  Unlock<Mutex> unlck(_lock);
2043  amps_invoke_waiting_function();
2044  }
2045  }
2046  if (ack.responded())
2047  {
2048  if (ack.status() != "failure")
2049  {
2050  if (message_.getCommand() == "logon")
2051  {
2052  amps_uint64_t ackSequence = ack.sequenceNo();
2053  if (_lastSentHaSequenceNumber < ackSequence)
2054  {
2055  _lastSentHaSequenceNumber = ackSequence;
2056  }
2057  if (_publishStore.isValid())
2058  {
2059  // If this throws, logon will fail and eitehr be
2060  // handled in HAClient/ServerChooser or by the caller
2061  // of logon.
2062  _publishStore.discardUpTo(ackSequence);
2063  if (_lastSentHaSequenceNumber < _publishStore.getLastPersisted())
2064  {
2065  _lastSentHaSequenceNumber = _publishStore.getLastPersisted();
2066  }
2067  }
2068  _nameHash = ack.bookmark().substr(0, ack.bookmark().find('|'));
2069  _nameHashValue = ack.nameHashValue();
2070  _serverVersion = ack.serverVersion();
2071  if (_bookmarkStore.isValid())
2072  {
2073  _bookmarkStore.setServerVersion(_serverVersion);
2074  }
2075  }
2076  if (_ackBatchSize)
2077  {
2078  const std::string& options = ack.options();
2079  size_t index = options.find_first_of("max_backlog=");
2080  if (index != std::string::npos)
2081  {
2082  unsigned data = 0;
2083  const char* c = options.c_str() + index + 12;
2084  while (*c && *c != ',')
2085  {
2086  data = (data * 10) + (unsigned)(*c++ -48);
2087  }
2088  if (_ackBatchSize > data)
2089  {
2090  _ackBatchSize = data;
2091  }
2092  }
2093  }
2094  return ack;
2095  }
2096  const size_t NotEntitled = 12;
2097  std::string ackReason = ack.reason();
2098  if (ackReason.length() == 0)
2099  {
2100  return ack; // none
2101  }
2102  if (ackReason.length() == NotEntitled &&
2103  ackReason[0] == 'n' &&
2104  message_.getUserId().len() == 0)
2105  {
2106  message_.assignUserId(_username);
2107  }
2108  message_.throwFor(_client, ackReason);
2109  }
2110  else // !ack.responded()
2111  {
2112  if (!ack.abandoned())
2113  {
2114  throw TimedOutException("timed out waiting for operation.");
2115  }
2116  else
2117  {
2118  throw DisconnectedException("Connection closed while waiting for response.");
2119  }
2120  }
2121  return ack;
2122  }
2123 
2124  void _cleanup(void)
2125  {
2126  if (!_client)
2127  {
2128  return;
2129  }
2130  amps_client_set_predisconnect_handler(_client, NULL, 0L);
2131  amps_client_set_disconnect_handler(_client, NULL, 0L);
2132  AMPS_CALL_EXCEPTION_WRAPPER(ClientImpl::disconnect());
2133  _pEmptyMessageStream.reset(NULL);
2134  amps_client_destroy(_client);
2135  _client = NULL;
2136  }
2137 
2138  public:
2139 
2140  ClientImpl(const std::string& clientName)
2141  : _client(NULL), _name(clientName)
2142  , _isRetryOnDisconnect(true)
2143  , _lastSentHaSequenceNumber((amps_uint64_t)0), _logonInProgress(0)
2144  , _badTimeToHASubscribe(0), _serverVersion()
2145  , _queueAckTimeout(AMPS_DEFAULT_QUEUE_ACK_TIMEOUT)
2146  , _isAutoAckEnabled(false)
2147  , _ackBatchSize(0)
2148  , _queuedAckCount(0)
2149  , _defaultMaxDepth(0)
2150  , _connected(false)
2151  , _heartbeatInterval(0)
2152  , _readTimeout(0)
2153  {
2154  _replayer.setClient(this);
2155  _client = amps_client_create(clientName.c_str());
2157  (amps_handler)ClientImpl::ClientImplMessageHandler,
2158  this);
2160  (amps_predisconnect_handler)ClientImpl::ClientImplPreDisconnectHandler,
2161  this);
2163  (amps_handler)ClientImpl::ClientImplDisconnectHandler,
2164  this);
2165  _exceptionListener = &_defaultExceptionListener;
2166  for (size_t i = 0; i < GlobalCommandTypeHandlers::COUNT; ++i)
2167  {
2168 #ifdef AMPS_USE_EMPLACE
2169  _globalCommandTypeHandlers.emplace_back(MessageHandler());
2170 #else
2171  _globalCommandTypeHandlers.push_back(MessageHandler());
2172 #endif
2173  }
2174  }
2175 
2176  virtual ~ClientImpl()
2177  {
2178  _cleanup();
2179  }
2180 
2181  const std::string& getName() const
2182  {
2183  return _name;
2184  }
2185 
2186  const std::string& getNameHash() const
2187  {
2188  return _nameHash;
2189  }
2190 
2191  const amps_uint64_t getNameHashValue() const
2192  {
2193  return _nameHashValue;
2194  }
2195 
2196  void setName(const std::string& name)
2197  {
2198  // This operation will fail if the client's
2199  // name is already set.
2200  amps_result result = amps_client_set_name(_client, name.c_str());
2201  if (result != AMPS_E_OK)
2202  {
2203  AMPSException::throwFor(_client, result);
2204  }
2205  _name = name;
2206  }
2207 
2208  const std::string& getLogonCorrelationData() const
2209  {
2210  return _logonCorrelationData;
2211  }
2212 
2213  void setLogonCorrelationData(const std::string& logonCorrelationData_)
2214  {
2215  _logonCorrelationData = logonCorrelationData_;
2216  }
2217 
2218  size_t getServerVersion() const
2219  {
2220  return _serverVersion.getOldStyleVersion();
2221  }
2222 
2223  VersionInfo getServerVersionInfo() const
2224  {
2225  return _serverVersion;
2226  }
2227 
2228  const std::string& getURI() const
2229  {
2230  return _lastUri;
2231  }
2232 
2233  virtual void connect(const std::string& uri)
2234  {
2235  Lock<Mutex> l(_lock);
2236  _connect(uri);
2237  }
2238 
2239  virtual void _connect(const std::string& uri)
2240  {
2241  _lastUri = uri;
2242  amps_result result = amps_client_connect(_client, uri.c_str());
2243  if (result != AMPS_E_OK)
2244  {
2245  AMPSException::throwFor(_client, result);
2246  }
2247  _message.reset();
2248  _deltaMessage.setCommandEnum(Message::Command::DeltaPublish);
2249  _publishMessage.setCommandEnum(Message::Command::Publish);
2250  _beatMessage.setCommandEnum(Message::Command::Heartbeat);
2251  _beatMessage.setOptions("beat");
2252  _readMessage.setClientImpl(this);
2253  if (_queueAckTimeout)
2254  {
2255  result = amps_client_set_idle_time(_client, _queueAckTimeout);
2256  if (result != AMPS_E_OK)
2257  {
2258  AMPSException::throwFor(_client, result);
2259  }
2260  }
2261  _connected = true;
2262  broadcastConnectionStateChanged(ConnectionStateListener::Connected);
2263  }
2264 
2265  void setDisconnected()
2266  {
2267  {
2268  Lock<Mutex> l(_lock);
2269  if (_connected)
2270  {
2271  AMPS_CALL_EXCEPTION_WRAPPER(broadcastConnectionStateChanged(ConnectionStateListener::Disconnected));
2272  }
2273  _connected = false;
2274  _heartbeatTimer.setTimeout(0.0);
2275  // AC-1189 AC-1329 AC-1337 We need acks cleared while lock is held,
2276  // but not for unsent commands.
2277  clearAcks(UINT_MAX-1);
2278  }
2279  amps_client_disconnect(_client);
2280  _routes.clear();
2281  }
2282 
2283  virtual void disconnect()
2284  {
2285  AMPS_CALL_EXCEPTION_WRAPPER(flushAcks());
2286  setDisconnected();
2287  // Abandon all acks, sent and unsent
2288  clearAcks(UINT_MAX);
2289  AMPS_CALL_EXCEPTION_WRAPPER(processDeferredExecutions());
2290  Lock<Mutex> l(_lock);
2291  broadcastConnectionStateChanged(ConnectionStateListener::Shutdown);
2292  }
2293 
2294  void clearAcks(unsigned failedVersion)
2295  {
2296  // Have to lock to prevent race conditions
2297  Lock<Mutex> guard(_ackMapLock);
2298  {
2299  // Go ahead and signal any waiters if they are around...
2300  std::vector<std::string> worklist;
2301  for (AckMap::iterator i = _ackMap.begin(), e = _ackMap.end(); i != e; ++i)
2302  {
2303  if (i->second.getConnectionVersion() <= failedVersion)
2304  {
2305  i->second.setAbandoned();
2306  worklist.push_back(i->first);
2307  }
2308  }
2309 
2310  for (std::vector<std::string>::iterator j = worklist.begin(), e = worklist.end(); j != e; ++j)
2311  {
2312  _ackMap.erase(*j);
2313  }
2314  }
2315 
2316  _lock.signalAll();
2317  }
2318 
2319  int send(const Message& message)
2320  {
2321  Lock<Mutex> l(_lock);
2322  return _send(message);
2323  }
2324 
2325  void sendWithoutRetry(const Message& message_)
2326  {
2327  Lock<Mutex> l(_lock);
2328  // If we got here while logon was in progress, then we tried to send
2329  // while we were disconnected so throw DisconnectedException
2330  if (_logonInProgress)
2331  {
2332  throw DisconnectedException("The client has been disconnected.");
2333  }
2334  _sendWithoutRetry(message_);
2335  }
2336 
2337  void _sendWithoutRetry(const Message& message_)
2338  {
2339  amps_result result = amps_client_send(_client, message_.getMessage());
2340  if (result != AMPS_E_OK)
2341  {
2342  AMPSException::throwFor(_client, result);
2343  }
2344  }
2345 
2346  int _send(const Message& message, amps_uint64_t haSeq = (amps_uint64_t)0,
2347  bool isHASubscribe_ = false)
2348  {
2349  // Lock is already acquired
2350  amps_result result = AMPS_E_RETRY;
2351 
2352  // Create a local reference to this message, as we'll need to hold on
2353  // to a reference to it in case reconnect occurs.
2354  Message localMessage = message;
2355  unsigned version = 0;
2356 
2357  while (result == AMPS_E_RETRY)
2358  {
2359  if (haSeq && _logonInProgress)
2360  {
2361  // If retrySend is disabled, do not wait for the reconnect
2362  // to finish, just throw.
2363  if (!_isRetryOnDisconnect)
2364  {
2365  AMPSException::throwFor(_client, AMPS_E_RETRY);
2366  }
2367  if (!_lock.wait(1000))
2368  {
2369  amps_invoke_waiting_function();
2370  }
2371  }
2372  else
2373  {
2374  if ((haSeq && haSeq <= _lastSentHaSequenceNumber) ||
2375  (isHASubscribe_ && _badTimeToHASubscribe))
2376  {
2377  return (int)version;
2378  }
2379  // It's possible to get here out of order, but this way we'll
2380  // always send in order.
2381  if (haSeq > _lastSentHaSequenceNumber)
2382  {
2383  while (haSeq > _lastSentHaSequenceNumber + 1)
2384  {
2385  try
2386  {
2387  // Replayer updates _lastSentHaSsequenceNumber
2388  if (!_publishStore.replaySingle(_replayer,
2389  _lastSentHaSequenceNumber + 1))
2390  {
2391  //++_lastSentHaSequenceNumber;
2392  continue;
2393  }
2394  result = AMPS_E_OK;
2395  version = _replayer._version;
2396  }
2397 #ifdef _WIN32
2398  catch (const DisconnectedException&)
2399 #else
2400  catch (const DisconnectedException& e)
2401 #endif
2402  {
2403  result = _replayer._res;
2404  break;
2405  }
2406  }
2407  result = amps_client_send_with_version(_client,
2408  localMessage.getMessage(),
2409  &version);
2410  ++_lastSentHaSequenceNumber;
2411  }
2412  else
2413  {
2414  if (_logonInProgress && localMessage.getCommand().data()[0] != 'l')
2415  {
2416  while (_logonInProgress)
2417  {
2418  if (!_lock.wait(1000))
2419  {
2420  amps_invoke_waiting_function();
2421  }
2422  }
2423  }
2424  result = amps_client_send_with_version(_client,
2425  localMessage.getMessage(),
2426  &version);
2427  }
2428  if (result != AMPS_E_OK)
2429  {
2430  if (!isHASubscribe_ && !haSeq &&
2431  localMessage.getMessage() == message.getMessage())
2432  {
2433  localMessage = message.deepCopy();
2434  }
2435  if (_isRetryOnDisconnect)
2436  {
2437  Unlock<Mutex> u(_lock);
2438  result = amps_client_attempt_reconnect(_client, version);
2439  // If this is an HA publish or subscribe command, it was
2440  // stored first and will have already been replayed by the
2441  // store or sub manager after reconnect, so just return.
2442  if ((isHASubscribe_ || haSeq) &&
2443  result == AMPS_E_RETRY)
2444  {
2445  return (int)version;
2446  }
2447  }
2448  else
2449  {
2450  // retrySend is disabled so throw the error
2451  // from the send as an exception, do not retry.
2452  AMPSException::throwFor(_client, result);
2453  }
2454  }
2455  }
2456  if (result == AMPS_E_RETRY)
2457  {
2458  amps_invoke_waiting_function();
2459  }
2460  }
2461 
2462  if (result != AMPS_E_OK)
2463  {
2464  AMPSException::throwFor(_client, result);
2465  }
2466  return (int)version;
2467  }
2468 
2469  void addMessageHandler(const Field& commandId_,
2470  const AMPS::MessageHandler& messageHandler_,
2471  unsigned requestedAcks_, Message::Command::Type commandType_)
2472  {
2473  Lock<Mutex> lock(_lock);
2474  _routes.addRoute(commandId_, messageHandler_, requestedAcks_,
2475  0, commandType_);
2476  }
2477 
2478  bool removeMessageHandler(const Field& commandId_)
2479  {
2480  Lock<Mutex> lock(_lock);
2481  return _routes.removeRoute(commandId_);
2482  }
2483 
2484  std::string send(const MessageHandler& messageHandler_, Message& message_, int timeout_ = 0)
2485  {
2486  Field id = message_.getCommandId();
2487  Field subId = message_.getSubscriptionId();
2488  Field qid = message_.getQueryId();
2489  bool isSubscribeOnly = false;
2490  bool replace = false;
2491  unsigned requestedAcks = message_.getAckTypeEnum();
2492  unsigned systemAddedAcks = Message::AckType::None;
2493  Message::Command::Type commandType = message_.getCommandEnum();
2494 
2495  switch (commandType)
2496  {
2497  case Message::Command::Subscribe:
2498  case Message::Command::DeltaSubscribe:
2499  replace = message_.getOptions().operator std::string().find(AMPS_OPTIONS_REPLACE, 0, strlen(AMPS_OPTIONS_REPLACE) - 1) != std::string::npos;
2500  isSubscribeOnly = true;
2501  // fall through
2502  case Message::Command::SOWAndSubscribe:
2503  case Message::Command::SOWAndDeltaSubscribe:
2504  if (id.empty())
2505  {
2506  id = message_.newCommandId().getCommandId();
2507  }
2508  else
2509  {
2510  while (!replace && id != subId && _routes.hasRoute(id))
2511  {
2512  id = message_.newCommandId().getCommandId();
2513  }
2514  }
2515  if (subId.empty())
2516  {
2517  message_.setSubscriptionId(id);
2518  subId = id;
2519  }
2520  if (!message_.getBookmark().empty() && _bookmarkStore.isValid())
2521  {
2522  systemAddedAcks |= Message::AckType::Persisted;
2523  }
2524  // fall through
2525  case Message::Command::SOW:
2526  if (id.empty())
2527  {
2528  id = message_.newCommandId().getCommandId();
2529  }
2530  else
2531  {
2532  while (!replace && id != subId && _routes.hasRoute(id))
2533  {
2534  message_.newCommandId();
2535  if (qid == id)
2536  {
2537  qid = message_.getCommandId();
2538  message_.setQueryId(qid);
2539  }
2540  id = message_.getCommandId();
2541  }
2542  }
2543  if (!isSubscribeOnly)
2544  {
2545  if (qid.empty())
2546  {
2547  message_.setQueryID(id);
2548  qid = id;
2549  }
2550  else
2551  {
2552  while (!replace && qid != subId && qid != id
2553  && _routes.hasRoute(qid))
2554  {
2555  qid = message_.newQueryId().getQueryId();
2556  }
2557  }
2558  }
2559  systemAddedAcks |= Message::AckType::Processed;
2560  message_.setAckTypeEnum(requestedAcks | systemAddedAcks);
2561  {
2562  int routesAdded = 0;
2563  Lock<Mutex> l(_lock);
2564  if (!subId.empty() && messageHandler_.isValid())
2565  {
2566  if (!_routes.hasRoute(subId))
2567  {
2568  ++routesAdded;
2569  }
2570  // This can replace a non-subscribe with a matching id
2571  // with a subscription but not another subscription.
2572  _routes.addRoute(subId, messageHandler_, requestedAcks,
2573  systemAddedAcks, commandType);
2574  }
2575  if (!isSubscribeOnly && !qid.empty()
2576  && messageHandler_.isValid() && qid != subId)
2577  {
2578  if (routesAdded == 0)
2579  {
2580  _routes.addRoute(qid, messageHandler_,
2581  requestedAcks, systemAddedAcks, commandType);
2582  }
2583  else
2584  {
2585  void* data = NULL;
2586  {
2587  Unlock<Mutex> u(_lock);
2588  data = amps_invoke_copy_route_function(
2589  messageHandler_.userData());
2590  }
2591  if (!data)
2592  {
2593  _routes.addRoute(qid, messageHandler_, requestedAcks,
2594  systemAddedAcks, commandType);
2595  }
2596  else
2597  {
2598  _routes.addRoute(qid,
2599  MessageHandler(messageHandler_.function(),
2600  data),
2601  requestedAcks, systemAddedAcks, commandType);
2602  }
2603  }
2604  ++routesAdded;
2605  }
2606  if (!id.empty() && messageHandler_.isValid()
2607  && requestedAcks & ~Message::AckType::Persisted
2608  && id != subId && id != qid)
2609  {
2610  if (routesAdded == 0)
2611  {
2612  _routes.addRoute(id, messageHandler_, requestedAcks,
2613  systemAddedAcks, commandType);
2614  }
2615  else
2616  {
2617  void* data = NULL;
2618  {
2619  Unlock<Mutex> u(_lock);
2620  data = amps_invoke_copy_route_function(
2621  messageHandler_.userData());
2622  }
2623  if (!data)
2624  {
2625  _routes.addRoute(id, messageHandler_, requestedAcks,
2626  systemAddedAcks, commandType);
2627  }
2628  else
2629  {
2630  _routes.addRoute(id,
2631  MessageHandler(messageHandler_.function(),
2632  data),
2633  requestedAcks,
2634  systemAddedAcks, commandType);
2635  }
2636  }
2637  ++routesAdded;
2638  }
2639  try
2640  {
2641  // We aren't adding to subscription manager, so this isn't
2642  // an HA subscribe.
2643  syncAckProcessing(timeout_, message_, 0, false);
2644  message_.setAckTypeEnum(requestedAcks);
2645  }
2646  catch (...)
2647  {
2648  _routes.removeRoute(message_.getQueryID());
2649  _routes.removeRoute(message_.getSubscriptionId());
2650  _routes.removeRoute(id);
2651  message_.setAckTypeEnum(requestedAcks);
2652  throw;
2653  }
2654  }
2655  break;
2656  // These are valid commands that are used as-is
2657  case Message::Command::Unsubscribe:
2658  case Message::Command::Heartbeat:
2659  case Message::Command::Logon:
2660  case Message::Command::StartTimer:
2661  case Message::Command::StopTimer:
2662  case Message::Command::SOWDelete:
2663  {
2664  Lock<Mutex> l(_lock);
2665  // if an ack is requested, it'll need a command ID.
2666  if (message_.getAckTypeEnum() != Message::AckType::None)
2667  {
2668  if (id.empty())
2669  {
2670  message_.newCommandId();
2671  id = message_.getCommandId();
2672  }
2673  if (messageHandler_.isValid())
2674  {
2675  _routes.addRoute(id, messageHandler_, requestedAcks,
2676  Message::AckType::None, commandType);
2677  }
2678  }
2679  _send(message_);
2680  }
2681  break;
2682  case Message::Command::DeltaPublish:
2683  case Message::Command::Publish:
2684  {
2685  bool useSync = message_.getFilter().len() > 0;
2686  Lock<Mutex> l(_lock);
2687  // if an ack is requested, it'll need a command ID.
2688  unsigned ackType = message_.getAckTypeEnum();
2689  if (ackType != Message::AckType::None
2690  || useSync)
2691  {
2692  if (id.empty())
2693  {
2694  message_.newCommandId();
2695  id = message_.getCommandId();
2696  }
2697  if (messageHandler_.isValid())
2698  {
2699  _routes.addRoute(id, messageHandler_, requestedAcks,
2700  Message::AckType::None, commandType);
2701  }
2702  }
2703  if (useSync)
2704  {
2705  message_.setAckTypeEnum(ackType | Message::AckType::Processed);
2706  syncAckProcessing(timeout_, message_, 0, false);
2707  }
2708  else
2709  {
2710  _send(message_);
2711  }
2712  }
2713  break;
2714  // These are things that shouldn't be sent (not meaningful)
2715  case Message::Command::GroupBegin:
2716  case Message::Command::GroupEnd:
2717  case Message::Command::OOF:
2718  case Message::Command::Ack:
2719  case Message::Command::Unknown:
2720  default:
2721  throw CommandException("Command type " + message_.getCommand() + " can not be sent directly to AMPS");
2722  }
2723  message_.setAckTypeEnum(requestedAcks);
2724  return id;
2725  }
2726 
2727  void setDisconnectHandler(const DisconnectHandler& disconnectHandler)
2728  {
2729  Lock<Mutex> l(_lock);
2730  _disconnectHandler = disconnectHandler;
2731  }
2732 
2733  void setGlobalCommandTypeMessageHandler(const std::string& command_, const MessageHandler& handler_)
2734  {
2735  switch (command_[0])
2736  {
2737 #if 0 // Not currently implemented to avoid an extra branch in delivery
2738  case 'p':
2739  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::Publish] = handler_;
2740  break;
2741  case 's':
2742  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::SOW] = handler_;
2743  break;
2744 #endif
2745  case 'h':
2746  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::Heartbeat] = handler_;
2747  break;
2748 #if 0 // Not currently implemented to avoid an extra branch in delivery
2749  case 'g':
2750  if (command_[6] == 'b')
2751  {
2752  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::GroupBegin] = handler_;
2753  }
2754  else if (command_[6] == 'e')
2755  {
2756  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::GroupEnd] = handler_;
2757  }
2758  else
2759  {
2760  std::ostringstream os;
2761  os << "Invalid command '" << command_ << "' passed to setGlobalCommandTypeHandler";
2762  throw CommandException(os.str());
2763  }
2764  break;
2765  case 'o':
2766  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::OOF] = handler_;
2767  break;
2768 #endif
2769  case 'a':
2770  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::Ack] = handler_;
2771  break;
2772  case 'l':
2773  case 'L':
2774  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::LastChance] = handler_;
2775  break;
2776  case 'd':
2777  case 'D':
2778  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::DuplicateMessage] = handler_;
2779  break;
2780  default:
2781  std::ostringstream os;
2782  os << "Invalid command '" << command_ << "' passed to setGlobalCommandTypeHandler";
2783  throw CommandException(os.str());
2784  break;
2785  }
2786  }
2787 
2788  void setGlobalCommandTypeMessageHandler(const Message::Command::Type command_, const MessageHandler& handler_)
2789  {
2790  switch (command_)
2791  {
2792 #if 0 // Not currently implemented to avoid an extra branch in delivery
2793  case Message::Command::Publish:
2794  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::Publish] = handler_;
2795  break;
2796  case Message::Command::SOW:
2797  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::SOW] = handler_;
2798  break;
2799 #endif
2800  case Message::Command::Heartbeat:
2801  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::Heartbeat] = handler_;
2802  break;
2803 #if 0 // Not currently implemented to avoid an extra branch in delivery
2804  case Message::Command::GroupBegin:
2805  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::GroupBegin] = handler_;
2806  break;
2807  case Message::Command::GroupEnd:
2808  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::GroupEnd] = handler_;
2809  break;
2810  case Message::Command::OOF:
2811  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::OOF] = handler_;
2812  break;
2813 #endif
2814  case Message::Command::Ack:
2815  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::Ack] = handler_;
2816  break;
2817  default:
2818  unsigned bits = 0;
2819  unsigned command = command_;
2820  while (command > 0)
2821  {
2822  ++bits;
2823  command >>= 1;
2824  }
2825  char errBuf[128];
2826  AMPS_snprintf(errBuf, sizeof(errBuf),
2827  "Invalid command '%.*s' passed to setGlobalCommandTypeHandler",
2828  CommandConstants<0>::Lengths[bits],
2829  CommandConstants<0>::Values[bits]);
2830  throw CommandException(errBuf);
2831  break;
2832  }
2833  }
2834 
2835  void setGlobalCommandTypeMessageHandler(const GlobalCommandTypeHandlers handlerType_, const MessageHandler& handler_)
2836  {
2837  _globalCommandTypeHandlers[handlerType_] = handler_;
2838  }
2839 
2840  void setFailedWriteHandler(FailedWriteHandler* handler_)
2841  {
2842  Lock<Mutex> l(_lock);
2843  _failedWriteHandler.reset(handler_);
2844  }
2845 
2846  void setPublishStore(const Store& publishStore_)
2847  {
2848  Lock<Mutex> l(_lock);
2849  if (_connected)
2850  {
2851  throw AlreadyConnectedException("Setting a publish store on a connected client is undefined behavior");
2852  }
2853  _publishStore = publishStore_;
2854  }
2855 
2856  void setBookmarkStore(const BookmarkStore& bookmarkStore_)
2857  {
2858  Lock<Mutex> l(_lock);
2859  if (_connected)
2860  {
2861  throw AlreadyConnectedException("Setting a bookmark store on a connected client is undefined behavior");
2862  }
2863  _bookmarkStore = bookmarkStore_;
2864  }
2865 
2866  void setSubscriptionManager(SubscriptionManager* subscriptionManager_)
2867  {
2868  Lock<Mutex> l(_lock);
2869  _subscriptionManager.reset(subscriptionManager_);
2870  }
2871 
2872  SubscriptionManager* getSubscriptionManager() const
2873  {
2874  return const_cast<SubscriptionManager*>(_subscriptionManager.get());
2875  }
2876 
2877  DisconnectHandler getDisconnectHandler() const
2878  {
2879  return _disconnectHandler;
2880  }
2881 
2882  MessageHandler getDuplicateMessageHandler() const
2883  {
2884  return _globalCommandTypeHandlers[GlobalCommandTypeHandlers::DuplicateMessage];
2885  }
2886 
2887  FailedWriteHandler* getFailedWriteHandler() const
2888  {
2889  return const_cast<FailedWriteHandler*>(_failedWriteHandler.get());
2890  }
2891 
2892  Store getPublishStore() const
2893  {
2894  return _publishStore;
2895  }
2896 
2897  BookmarkStore getBookmarkStore() const
2898  {
2899  return _bookmarkStore;
2900  }
2901 
2902  amps_uint64_t publish(const char* topic_, size_t topicLen_, const char* data_, size_t dataLen_)
2903  {
2904  if (!_publishStore.isValid())
2905  {
2906  Lock<Mutex> l(_lock);
2907  _publishMessage.assignTopic(topic_, topicLen_);
2908  _publishMessage.assignData(data_, dataLen_);
2909  _send(_publishMessage);
2910  return 0;
2911  }
2912  else
2913  {
2914  publishStoreMessage.reset();
2915  publishStoreMessage.setCommandEnum(Message::Command::Publish);
2916  return _publish(topic_, topicLen_, data_, dataLen_);
2917  }
2918  }
2919 
2920  amps_uint64_t publish(const char* topic_, size_t topicLen_, const char* data_,
2921  size_t dataLen_, unsigned long expiration_)
2922  {
2923  if (!_publishStore.isValid())
2924  {
2925  Lock<Mutex> l(_lock);
2926  _publishMessage.assignTopic(topic_, topicLen_);
2927  _publishMessage.assignData(data_, dataLen_);
2928  char exprBuf[AMPS_NUMBER_BUFFER_LEN];
2929  size_t pos = convertToCharArray(exprBuf, expiration_);
2930  _publishMessage.assignExpiration(exprBuf + pos, AMPS_NUMBER_BUFFER_LEN - pos);
2931  _send(_publishMessage);
2932  _publishMessage.assignExpiration(NULL, 0);
2933  return 0;
2934  }
2935  else
2936  {
2937  publishStoreMessage.reset();
2938  char exprBuf[AMPS_NUMBER_BUFFER_LEN];
2939  size_t exprPos = convertToCharArray(exprBuf, expiration_);
2940  publishStoreMessage.setCommandEnum(Message::Command::Publish)
2941  .assignExpiration(exprBuf + exprPos,
2942  AMPS_NUMBER_BUFFER_LEN - exprPos);
2943  return _publish(topic_, topicLen_, data_, dataLen_);
2944  }
2945  }
2946 
2947  class FlushAckHandler : ConnectionStateListener
2948  {
2949  private:
2950  ClientImpl* _pClient;
2951  Field _cmdId;
2952 #if __cplusplus >= 201100L || _MSC_VER >= 1900
2953  std::atomic<bool> _acked;
2954  std::atomic<bool> _disconnected;
2955 #else
2956  volatile bool _acked;
2957  volatile bool _disconnected;
2958 #endif
2959  public:
2960  FlushAckHandler(ClientImpl* pClient_)
2961  : _pClient(pClient_), _cmdId(), _acked(false), _disconnected(false)
2962  {
2963  pClient_->addConnectionStateListener(this);
2964  }
2965  ~FlushAckHandler()
2966  {
2967  _pClient->removeConnectionStateListener(this);
2968  _pClient->removeMessageHandler(_cmdId);
2969  _cmdId.clear();
2970  }
2971  void setCommandId(const Field& cmdId_)
2972  {
2973  _cmdId.deepCopy(cmdId_);
2974  }
2975  void invoke(const Message&)
2976  {
2977  _acked = true;
2978  }
2979  void connectionStateChanged(State state_)
2980  {
2981  if (state_ <= Shutdown)
2982  {
2983  _disconnected = true;
2984  }
2985  }
2986  bool acked()
2987  {
2988  return _acked;
2989  }
2990  bool done()
2991  {
2992  return _acked || _disconnected;
2993  }
2994  };
2995 
2996  void publishFlush(long timeout_, unsigned ackType_)
2997  {
2998  static const char* processed = "processed";
2999  static const size_t processedLen = strlen(processed);
3000  static const char* persisted = "persisted";
3001  static const size_t persistedLen = strlen(persisted);
3002  static const char* flush = "flush";
3003  static const size_t flushLen = strlen(flush);
3004  static VersionInfo minPersisted("5.3.3.0");
3005  static VersionInfo minFlush("4");
3006  if (ackType_ != Message::AckType::Processed
3007  && ackType_ != Message::AckType::Persisted)
3008  {
3009  throw CommandException("Flush can only be used with processed or persisted acks.");
3010  }
3011  FlushAckHandler flushHandler(this);
3012  if (_serverVersion >= minFlush)
3013  {
3014  Lock<Mutex> l(_lock);
3015  if (!_connected)
3016  {
3017  throw DisconnectedException("Not connected trying to flush");
3018  }
3019  _message.reset();
3020  _message.newCommandId();
3021  _message.assignCommand(flush, flushLen);
3022  if (_serverVersion < minPersisted
3023  || ackType_ == Message::AckType::Processed)
3024  {
3025  _message.assignAckType(processed, processedLen);
3026  }
3027  else
3028  {
3029  _message.assignAckType(persisted, persistedLen);
3030  }
3031  flushHandler.setCommandId(_message.getCommandId());
3032  addMessageHandler(_message.getCommandId(),
3033  std::bind(&FlushAckHandler::invoke,
3034  std::ref(flushHandler),
3035  std::placeholders::_1),
3036  ackType_, _message.getCommandEnum());
3037  NoDelay noDelay(_client);
3038  if (_send(_message) == -1)
3039  {
3040  throw DisconnectedException("Disconnected trying to flush");
3041  }
3042  }
3043  if (_publishStore.isValid())
3044  {
3045  try
3046  {
3047  _publishStore.flush(timeout_);
3048  }
3049  catch (const AMPSException& ex)
3050  {
3051  AMPS_UNHANDLED_EXCEPTION(ex);
3052  throw;
3053  }
3054  }
3055  else if (_serverVersion < minFlush)
3056  {
3057  if (timeout_ > 0)
3058  {
3059  AMPS_USLEEP(timeout_ * 1000);
3060  }
3061  else
3062  {
3063  AMPS_USLEEP(1000 * 1000);
3064  }
3065  return;
3066  }
3067  if (timeout_)
3068  {
3069  Timer timer((double)timeout_);
3070  timer.start();
3071  while (!timer.check() && !flushHandler.done())
3072  {
3073  AMPS_USLEEP(10000);
3074  amps_invoke_waiting_function();
3075  }
3076  }
3077  else
3078  {
3079  while (!flushHandler.done())
3080  {
3081  AMPS_USLEEP(10000);
3082  amps_invoke_waiting_function();
3083  }
3084  }
3085  // No response or disconnect in timeout interval
3086  if (!flushHandler.done())
3087  {
3088  throw TimedOutException("Timed out waiting for flush");
3089  }
3090  // We got disconnected and there is no publish store
3091  if (!flushHandler.acked() && !_publishStore.isValid())
3092  {
3093  throw DisconnectedException("Disconnected waiting for flush");
3094  }
3095  }
3096 
3097  amps_uint64_t deltaPublish(const char* topic_, size_t topicLength_,
3098  const char* data_, size_t dataLength_)
3099  {
3100  if (!_publishStore.isValid())
3101  {
3102  Lock<Mutex> l(_lock);
3103  _deltaMessage.assignTopic(topic_, topicLength_);
3104  _deltaMessage.assignData(data_, dataLength_);
3105  _send(_deltaMessage);
3106  return 0;
3107  }
3108  else
3109  {
3110  publishStoreMessage.reset();
3111  publishStoreMessage.setCommandEnum(Message::Command::DeltaPublish);
3112  return _publish(topic_, topicLength_, data_, dataLength_);
3113  }
3114  }
3115 
3116  amps_uint64_t deltaPublish(const char* topic_, size_t topicLength_,
3117  const char* data_, size_t dataLength_,
3118  unsigned long expiration_)
3119  {
3120  if (!_publishStore.isValid())
3121  {
3122  Lock<Mutex> l(_lock);
3123  _deltaMessage.assignTopic(topic_, topicLength_);
3124  _deltaMessage.assignData(data_, dataLength_);
3125  char exprBuf[AMPS_NUMBER_BUFFER_LEN];
3126  size_t pos = convertToCharArray(exprBuf, expiration_);
3127  _deltaMessage.assignExpiration(exprBuf + pos, AMPS_NUMBER_BUFFER_LEN - pos);
3128  _send(_deltaMessage);
3129  _deltaMessage.assignExpiration(NULL, 0);
3130  return 0;
3131  }
3132  else
3133  {
3134  publishStoreMessage.reset();
3135  char exprBuf[AMPS_NUMBER_BUFFER_LEN];
3136  size_t exprPos = convertToCharArray(exprBuf, expiration_);
3137  publishStoreMessage.setCommandEnum(Message::Command::DeltaPublish)
3138  .assignExpiration(exprBuf + exprPos,
3139  AMPS_NUMBER_BUFFER_LEN - exprPos);
3140  return _publish(topic_, topicLength_, data_, dataLength_);
3141  }
3142  }
3143 
3144  amps_uint64_t _publish(const char* topic_, size_t topicLength_,
3145  const char* data_, size_t dataLength_)
3146  {
3147  publishStoreMessage.assignTopic(topic_, topicLength_)
3148  .setAckTypeEnum(Message::AckType::Persisted)
3149  .assignData(data_, dataLength_);
3150  amps_uint64_t haSequenceNumber = _publishStore.store(publishStoreMessage);
3151  char buf[AMPS_NUMBER_BUFFER_LEN];
3152  size_t pos = convertToCharArray(buf, haSequenceNumber);
3153  publishStoreMessage.assignSequence(buf + pos, AMPS_NUMBER_BUFFER_LEN - pos);
3154  {
3155  Lock<Mutex> l(_lock);
3156  _send(publishStoreMessage, haSequenceNumber);
3157  }
3158  return haSequenceNumber;
3159  }
3160 
3161  virtual std::string logon(long timeout_, Authenticator& authenticator_,
3162  const char* options_ = NULL)
3163  {
3164  Lock<Mutex> l(_lock);
3165  return _logon(timeout_, authenticator_, options_);
3166  }
3167 
3168  virtual std::string _logon(long timeout_, Authenticator& authenticator_,
3169  const char* options_ = NULL)
3170  {
3171  _message.reset();
3172  _message.newCommandId();
3173  std::string newCommandId = _message.getCommandId();
3174  _message.setCommandEnum(Message::Command::Logon);
3175  _message.setClientName(_name);
3176 #ifdef AMPS_CLIENT_VERSION_WITH_LANGUAGE
3177  _message.assignVersion(AMPS_CLIENT_VERSION_WITH_LANGUAGE,
3178  strlen(AMPS_CLIENT_VERSION_WITH_LANGUAGE));
3179 #endif
3180  URI uri(_lastUri);
3181  if (uri.user().size())
3182  {
3183  _message.setUserId(uri.user());
3184  }
3185  if (uri.password().size())
3186  {
3187  _message.setPassword(uri.password());
3188  }
3189  if (uri.protocol() == "amps" && uri.messageType().size())
3190  {
3191  _message.setMessageType(uri.messageType());
3192  }
3193  if (uri.isTrue("pretty"))
3194  {
3195  _message.setOptions("pretty");
3196  }
3197 
3198  _message.setPassword(authenticator_.authenticate(_message.getUserId(), _message.getPassword()));
3199  if (!_logonCorrelationData.empty())
3200  {
3201  _message.assignCorrelationId(_logonCorrelationData);
3202  }
3203  if (options_)
3204  {
3205  _message.setOptions(options_);
3206  }
3207  _username = _message.getUserId();
3208  try
3209  {
3210  AtomicFlagFlip pubFlip(&_logonInProgress);
3211  NoDelay noDelay(_client);
3212  while (true)
3213  {
3214  _message.setAckTypeEnum(Message::AckType::Processed);
3215  AckResponse ack = syncAckProcessing(timeout_, _message);
3216  if (ack.status() == "retry")
3217  {
3218  _message.setPassword(authenticator_.retry(ack.username(), ack.password()));
3219  _username = ack.username();
3220  _message.setUserId(_username);
3221  }
3222  else
3223  {
3224  authenticator_.completed(ack.username(), ack.password(), ack.reason());
3225  break;
3226  }
3227  }
3228  broadcastConnectionStateChanged(ConnectionStateListener::LoggedOn);
3229 
3230  // Now re-send the heartbeat command if configured
3231  _sendHeartbeat();
3232  // Signal any threads waiting for _logonInProgress
3233  _lock.signalAll();
3234  }
3235  catch (const AMPSException& ex)
3236  {
3237  _lock.signalAll();
3238  AMPS_UNHANDLED_EXCEPTION(ex);
3239  throw;
3240  }
3241  catch (...)
3242  {
3243  _lock.signalAll();
3244  throw;
3245  }
3246 
3247  if (_publishStore.isValid())
3248  {
3249  try
3250  {
3251  _publishStore.replay(_replayer);
3252  broadcastConnectionStateChanged(ConnectionStateListener::PublishReplayed);
3253  }
3254  catch (const PublishStoreGapException& ex)
3255  {
3256  _lock.signalAll();
3257  AMPS_UNHANDLED_EXCEPTION(ex);
3258  throw;
3259  }
3260  catch (const StoreException& ex)
3261  {
3262  _lock.signalAll();
3263  std::ostringstream os;
3264  os << "A local store exception occurred while logging on."
3265  << ex.toString();
3266  throw ConnectionException(os.str());
3267  }
3268  catch (const AMPSException& ex)
3269  {
3270  _lock.signalAll();
3271  AMPS_UNHANDLED_EXCEPTION(ex);
3272  throw;
3273  }
3274  catch (const std::exception& ex)
3275  {
3276  _lock.signalAll();
3277  AMPS_UNHANDLED_EXCEPTION(ex);
3278  throw;
3279  }
3280  catch (...)
3281  {
3282  _lock.signalAll();
3283  throw;
3284  }
3285  }
3286  _lock.signalAll();
3287  return newCommandId;
3288  }
3289 
3290  std::string subscribe(const MessageHandler& messageHandler_,
3291  const std::string& topic_,
3292  long timeout_,
3293  const std::string& filter_,
3294  const std::string& bookmark_,
3295  const std::string& options_,
3296  const std::string& subId_,
3297  bool isHASubscribe_ = true)
3298  {
3299  isHASubscribe_ &= (bool)_subscriptionManager;
3300  Lock<Mutex> l(_lock);
3301  _message.reset();
3302  _message.setCommandEnum(Message::Command::Subscribe);
3303  _message.newCommandId();
3304  std::string subId(subId_);
3305  if (subId.empty())
3306  {
3307  if (options_.find(AMPS_OPTIONS_REPLACE, 0, strlen(AMPS_OPTIONS_REPLACE) - 1) != std::string::npos)
3308  {
3309  throw ConnectionException("Cannot issue a replacement subscription; a valid subscription id is required.");
3310  }
3311 
3312  subId = _message.getCommandId();
3313  }
3314  _message.setSubscriptionId(subId);
3315  // we need to deep copy this before sending the message; while we are
3316  // waiting for a response, the fields in _message may get blown away for
3317  // other operations.
3318  AMPS::Message::Field subIdField(subId);
3319  unsigned ackTypes = Message::AckType::Processed;
3320 
3321  if (!bookmark_.empty() && _bookmarkStore.isValid())
3322  {
3323  ackTypes |= Message::AckType::Persisted;
3324  }
3325  _message.setTopic(topic_);
3326 
3327  if (filter_.length())
3328  {
3329  _message.setFilter(filter_);
3330  }
3331  if (bookmark_.length())
3332  {
3333  if (bookmark_ == AMPS_BOOKMARK_RECENT)
3334  {
3335  Message::Field mostRecent = _bookmarkStore.getMostRecent(subIdField);
3336  _message.setBookmark(mostRecent);
3337  }
3338  else
3339  {
3340  _message.setBookmark(bookmark_);
3341  if (_bookmarkStore.isValid())
3342  {
3343  if (bookmark_ != AMPS_BOOKMARK_NOW &&
3344  bookmark_ != AMPS_BOOKMARK_EPOCH)
3345  {
3346  _bookmarkStore.log(_message);
3347  _bookmarkStore.discard(_message);
3348  _bookmarkStore.persisted(subIdField, _message.getBookmark());
3349  }
3350  }
3351  }
3352  }
3353  if (options_.length())
3354  {
3355  _message.setOptions(options_);
3356  }
3357 
3358  Message message = _message;
3359  if (isHASubscribe_)
3360  {
3361  message = _message.deepCopy();
3362  Unlock<Mutex> u(_lock);
3363  _subscriptionManager->subscribe(messageHandler_, message,
3364  Message::AckType::None);
3365  if (_badTimeToHASubscribe)
3366  {
3367  return subId;
3368  }
3369  }
3370  if (!_routes.hasRoute(_message.getSubscriptionId()))
3371  {
3372  _routes.addRoute(_message.getSubscriptionId(), messageHandler_,
3373  Message::AckType::None, ackTypes, _message.getCommandEnum());
3374  }
3375  message.setAckTypeEnum(ackTypes);
3376  if (!options_.empty())
3377  {
3378  message.setOptions(options_);
3379  }
3380  try
3381  {
3382  syncAckProcessing(timeout_, message, isHASubscribe_);
3383  }
3384  catch (const DisconnectedException&)
3385  {
3386  if (!isHASubscribe_)
3387  {
3388  _routes.removeRoute(subIdField);
3389  throw;
3390  }
3391  else
3392  {
3393  AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(subIdField));
3394  throw;
3395  }
3396  }
3397  catch (const TimedOutException&)
3398  {
3399  AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(subIdField));
3400  throw;
3401  }
3402  catch (...)
3403  {
3404  if (isHASubscribe_)
3405  {
3406  // Have to unlock before calling into sub manager to avoid deadlock
3407  Unlock<Mutex> unlock(_lock);
3408  _subscriptionManager->unsubscribe(subIdField);
3409  }
3410  _routes.removeRoute(subIdField);
3411  throw;
3412  }
3413 
3414  return subId;
3415  }
3416  std::string deltaSubscribe(const MessageHandler& messageHandler_,
3417  const std::string& topic_,
3418  long timeout_,
3419  const std::string& filter_,
3420  const std::string& bookmark_,
3421  const std::string& options_,
3422  const std::string& subId_ = "",
3423  bool isHASubscribe_ = true)
3424  {
3425  isHASubscribe_ &= (bool)_subscriptionManager;
3426  Lock<Mutex> l(_lock);
3427  _message.reset();
3428  _message.setCommandEnum(Message::Command::DeltaSubscribe);
3429  _message.newCommandId();
3430  std::string subId(subId_);
3431  if (subId.empty())
3432  {
3433  subId = _message.getCommandId();
3434  }
3435  _message.setSubscriptionId(subId);
3436  // we need to deep copy this before sending the message; while we are
3437  // waiting for a response, the fields in _message may get blown away for
3438  // other operations.
3439  AMPS::Message::Field subIdField(subId);
3440  unsigned ackTypes = Message::AckType::Processed;
3441 
3442  if (!bookmark_.empty() && _bookmarkStore.isValid())
3443  {
3444  ackTypes |= Message::AckType::Persisted;
3445  }
3446  _message.setTopic(topic_);
3447  if (filter_.length())
3448  {
3449  _message.setFilter(filter_);
3450  }
3451  if (bookmark_.length())
3452  {
3453  if (bookmark_ == AMPS_BOOKMARK_RECENT)
3454  {
3455  Message::Field mostRecent = _bookmarkStore.getMostRecent(subIdField);
3456  _message.setBookmark(mostRecent);
3457  }
3458  else
3459  {
3460  _message.setBookmark(bookmark_);
3461  if (_bookmarkStore.isValid())
3462  {
3463  if (bookmark_ != AMPS_BOOKMARK_NOW &&
3464  bookmark_ != AMPS_BOOKMARK_EPOCH)
3465  {
3466  _bookmarkStore.log(_message);
3467  _bookmarkStore.discard(_message);
3468  _bookmarkStore.persisted(subIdField, _message.getBookmark());
3469  }
3470  }
3471  }
3472  }
3473  if (options_.length())
3474  {
3475  _message.setOptions(options_);
3476  }
3477  Message message = _message;
3478  if (isHASubscribe_)
3479  {
3480  message = _message.deepCopy();
3481  Unlock<Mutex> u(_lock);
3482  _subscriptionManager->subscribe(messageHandler_, message,
3483  Message::AckType::None);
3484  if (_badTimeToHASubscribe)
3485  {
3486  return subId;
3487  }
3488  }
3489  if (!_routes.hasRoute(_message.getSubscriptionId()))
3490  {
3491  _routes.addRoute(_message.getSubscriptionId(), messageHandler_,
3492  Message::AckType::None, ackTypes, _message.getCommandEnum());
3493  }
3494  message.setAckTypeEnum(ackTypes);
3495  if (!options_.empty())
3496  {
3497  message.setOptions(options_);
3498  }
3499  try
3500  {
3501  syncAckProcessing(timeout_, message, isHASubscribe_);
3502  }
3503  catch (const DisconnectedException&)
3504  {
3505  if (!isHASubscribe_)
3506  {
3507  _routes.removeRoute(subIdField);
3508  throw;
3509  }
3510  }
3511  catch (const TimedOutException&)
3512  {
3513  AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(subIdField));
3514  throw;
3515  }
3516  catch (...)
3517  {
3518  if (isHASubscribe_)
3519  {
3520  // Have to unlock before calling into sub manager to avoid deadlock
3521  Unlock<Mutex> unlock(_lock);
3522  _subscriptionManager->unsubscribe(subIdField);
3523  }
3524  _routes.removeRoute(subIdField);
3525  throw;
3526  }
3527  return subId;
3528  }
3529 
3530  void unsubscribe(const std::string& id)
3531  {
3532  Lock<Mutex> l(_lock);
3533  unsubscribeInternal(id);
3534  }
3535 
3536  void unsubscribe(void)
3537  {
3538  if (_subscriptionManager)
3539  {
3540  _subscriptionManager->clear();
3541  }
3542  {
3543  _routes.unsubscribeAll();
3544  Lock<Mutex> l(_lock);
3545  _message.reset();
3546  _message.setCommandEnum(Message::Command::Unsubscribe);
3547  _message.newCommandId();
3548  _message.setSubscriptionId("all");
3549  _sendWithoutRetry(_message);
3550  }
3551  deferredExecution(&amps_noOpFn, NULL);
3552  }
3553 
3554  std::string sow(const MessageHandler& messageHandler_,
3555  const std::string& topic_,
3556  const std::string& filter_ = "",
3557  const std::string& orderBy_ = "",
3558  const std::string& bookmark_ = "",
3559  int batchSize_ = AMPS_DEFAULT_BATCH_SIZE,
3560  int topN_ = AMPS_DEFAULT_TOP_N,
3561  const std::string& options_ = "",
3562  long timeout_ = AMPS_DEFAULT_COMMAND_TIMEOUT)
3563  {
3564  Lock<Mutex> l(_lock);
3565  _message.reset();
3566  _message.setCommandEnum(Message::Command::SOW);
3567  _message.newCommandId();
3568  // need to keep our own copy of the command ID.
3569  std::string commandId = _message.getCommandId();
3570  _message.setQueryID(_message.getCommandId());
3571  unsigned ackTypes = Message::AckType::Processed | Message::AckType::Completed;
3572  _message.setAckTypeEnum(ackTypes);
3573  _message.setTopic(topic_);
3574  if (filter_.length())
3575  {
3576  _message.setFilter(filter_);
3577  }
3578  if (orderBy_.length())
3579  {
3580  _message.setOrderBy(orderBy_);
3581  }
3582  if (bookmark_.length())
3583  {
3584  _message.setBookmark(bookmark_);
3585  }
3586  _message.setBatchSize(AMPS::asString(batchSize_));
3587  if (topN_ != AMPS_DEFAULT_TOP_N)
3588  {
3589  _message.setTopNRecordsReturned(AMPS::asString(topN_));
3590  }
3591  if (options_.length())
3592  {
3593  _message.setOptions(options_);
3594  }
3595 
3596  _routes.addRoute(_message.getQueryID(), messageHandler_,
3597  Message::AckType::None, ackTypes, _message.getCommandEnum());
3598 
3599  try
3600  {
3601  syncAckProcessing(timeout_, _message);
3602  }
3603  catch (...)
3604  {
3605  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId));
3606  throw;
3607  }
3608 
3609  return commandId;
3610  }
3611 
3612  std::string sow(const MessageHandler& messageHandler_,
3613  const std::string& topic_,
3614  long timeout_,
3615  const std::string& filter_ = "",
3616  int batchSize_ = AMPS_DEFAULT_BATCH_SIZE,
3617  int topN_ = AMPS_DEFAULT_TOP_N)
3618  {
3619  std::string notSet;
3620  return sow(messageHandler_,
3621  topic_,
3622  filter_,
3623  notSet, // orderBy
3624  notSet, // bookmark
3625  batchSize_,
3626  topN_,
3627  notSet,
3628  timeout_);
3629  }
3630 
3631  std::string sowAndSubscribe(const MessageHandler& messageHandler_,
3632  const std::string& topic_,
3633  const std::string& filter_ = "",
3634  const std::string& orderBy_ = "",
3635  const std::string& bookmark_ = "",
3636  int batchSize_ = AMPS_DEFAULT_BATCH_SIZE,
3637  int topN_ = AMPS_DEFAULT_TOP_N,
3638  const std::string& options_ = "",
3639  long timeout_ = AMPS_DEFAULT_COMMAND_TIMEOUT,
3640  bool isHASubscribe_ = true)
3641  {
3642  isHASubscribe_ &= (bool)_subscriptionManager;
3643  unsigned ackTypes = Message::AckType::Processed;
3644  Lock<Mutex> l(_lock);
3645  _message.reset();
3646  _message.setCommandEnum(Message::Command::SOWAndSubscribe);
3647  _message.newCommandId();
3648  Field cid = _message.getCommandId();
3649  std::string subId = cid;
3650  _message.setQueryID(cid).setSubscriptionId(cid).setTopic(topic_);
3651  if (filter_.length())
3652  {
3653  _message.setFilter(filter_);
3654  }
3655  if (orderBy_.length())
3656  {
3657  _message.setOrderBy(orderBy_);
3658  }
3659  if (bookmark_.length())
3660  {
3661  _message.setBookmark(bookmark_);
3662  Message::Field bookmark = _message.getBookmark();
3663  if (_bookmarkStore.isValid())
3664  {
3665  ackTypes |= Message::AckType::Persisted;
3666  if (bookmark == AMPS_BOOKMARK_RECENT)
3667  {
3668  _message.setBookmark(_bookmarkStore.getMostRecent(_message.getSubscriptionId()));
3669  }
3670  else if (bookmark != AMPS_BOOKMARK_NOW &&
3671  bookmark != AMPS_BOOKMARK_EPOCH)
3672  {
3673  _bookmarkStore.log(_message);
3674  if (!BookmarkRange::isRange(bookmark))
3675  {
3676  _bookmarkStore.discard(_message);
3677  _bookmarkStore.persisted(_message.getSubscriptionId(),
3678  bookmark);
3679  }
3680  }
3681  }
3682  else if (bookmark == AMPS_BOOKMARK_RECENT)
3683  {
3684  _message.setBookmark(AMPS_BOOKMARK_EPOCH);
3685  }
3686  }
3687  _message.setBatchSize(AMPS::asString(batchSize_));
3688  if (topN_ != AMPS_DEFAULT_TOP_N)
3689  {
3690  _message.setTopNRecordsReturned(AMPS::asString(topN_));
3691  }
3692  if (options_.length())
3693  {
3694  _message.setOptions(options_);
3695  }
3696 
3697  Message message = _message;
3698  if (isHASubscribe_)
3699  {
3700  message = _message.deepCopy();
3701  Unlock<Mutex> u(_lock);
3702  _subscriptionManager->subscribe(messageHandler_, message,
3703  Message::AckType::None);
3704  if (_badTimeToHASubscribe)
3705  {
3706  return subId;
3707  }
3708  }
3709  _routes.addRoute(cid, messageHandler_,
3710  Message::AckType::None, ackTypes, _message.getCommandEnum());
3711  message.setAckTypeEnum(ackTypes);
3712  if (!options_.empty())
3713  {
3714  message.setOptions(options_);
3715  }
3716  try
3717  {
3718  syncAckProcessing(timeout_, message, isHASubscribe_);
3719  }
3720  catch (const DisconnectedException&)
3721  {
3722  if (!isHASubscribe_)
3723  {
3724  _routes.removeRoute(subId);
3725  throw;
3726  }
3727  }
3728  catch (const TimedOutException&)
3729  {
3730  AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(subId));
3731  throw;
3732  }
3733  catch (...)
3734  {
3735  if (isHASubscribe_)
3736  {
3737  // Have to unlock before calling into sub manager to avoid deadlock
3738  Unlock<Mutex> unlock(_lock);
3739  _subscriptionManager->unsubscribe(cid);
3740  }
3741  _routes.removeRoute(subId);
3742  throw;
3743  }
3744  return subId;
3745  }
3746 
3747  std::string sowAndSubscribe(const MessageHandler& messageHandler_,
3748  const std::string& topic_,
3749  long timeout_,
3750  const std::string& filter_ = "",
3751  int batchSize_ = AMPS_DEFAULT_BATCH_SIZE,
3752  bool oofEnabled_ = false,
3753  int topN_ = AMPS_DEFAULT_TOP_N,
3754  bool isHASubscribe_ = true)
3755  {
3756  std::string notSet;
3757  return sowAndSubscribe(messageHandler_,
3758  topic_,
3759  filter_,
3760  notSet, // orderBy
3761  notSet, // bookmark
3762  batchSize_,
3763  topN_,
3764  (oofEnabled_ ? "oof" : ""),
3765  timeout_,
3766  isHASubscribe_);
3767  }
3768 
3769  std::string sowAndDeltaSubscribe(const MessageHandler& messageHandler_,
3770  const std::string& topic_,
3771  const std::string& filter_ = "",
3772  const std::string& orderBy_ = "",
3773  int batchSize_ = AMPS_DEFAULT_BATCH_SIZE,
3774  int topN_ = AMPS_DEFAULT_TOP_N,
3775  const std::string& options_ = "",
3776  long timeout_ = AMPS_DEFAULT_COMMAND_TIMEOUT,
3777  bool isHASubscribe_ = true)
3778  {
3779  isHASubscribe_ &= (bool)_subscriptionManager;
3780  Lock<Mutex> l(_lock);
3781  _message.reset();
3782  _message.setCommandEnum(Message::Command::SOWAndDeltaSubscribe);
3783  _message.newCommandId();
3784  _message.setQueryID(_message.getCommandId());
3785  _message.setSubscriptionId(_message.getCommandId());
3786  std::string subId = _message.getSubscriptionId();
3787  _message.setTopic(topic_);
3788  if (filter_.length())
3789  {
3790  _message.setFilter(filter_);
3791  }
3792  if (orderBy_.length())
3793  {
3794  _message.setOrderBy(orderBy_);
3795  }
3796  _message.setBatchSize(AMPS::asString(batchSize_));
3797  if (topN_ != AMPS_DEFAULT_TOP_N)
3798  {
3799  _message.setTopNRecordsReturned(AMPS::asString(topN_));
3800  }
3801  if (options_.length())
3802  {
3803  _message.setOptions(options_);
3804  }
3805  Message message = _message;
3806  if (isHASubscribe_)
3807  {
3808  message = _message.deepCopy();
3809  Unlock<Mutex> u(_lock);
3810  _subscriptionManager->subscribe(messageHandler_, message,
3811  Message::AckType::None);
3812  if (_badTimeToHASubscribe)
3813  {
3814  return subId;
3815  }
3816  }
3817  _routes.addRoute(message.getQueryID(), messageHandler_,
3818  Message::AckType::None, Message::AckType::Processed, message.getCommandEnum());
3819  message.setAckTypeEnum(Message::AckType::Processed);
3820  if (!options_.empty())
3821  {
3822  message.setOptions(options_);
3823  }
3824  try
3825  {
3826  syncAckProcessing(timeout_, message, isHASubscribe_);
3827  }
3828  catch (const DisconnectedException&)
3829  {
3830  if (!isHASubscribe_)
3831  {
3832  _routes.removeRoute(subId);
3833  throw;
3834  }
3835  }
3836  catch (const TimedOutException&)
3837  {
3838  AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(subId));
3839  throw;
3840  }
3841  catch (...)
3842  {
3843  if (isHASubscribe_)
3844  {
3845  // Have to unlock before calling into sub manager to avoid deadlock
3846  Unlock<Mutex> unlock(_lock);
3847  _subscriptionManager->unsubscribe(Field(subId));
3848  }
3849  _routes.removeRoute(subId);
3850  throw;
3851  }
3852  return subId;
3853  }
3854 
3855  std::string sowAndDeltaSubscribe(const MessageHandler& messageHandler_,
3856  const std::string& topic_,
3857  long timeout_,
3858  const std::string& filter_ = "",
3859  int batchSize_ = AMPS_DEFAULT_BATCH_SIZE,
3860  bool oofEnabled_ = false,
3861  bool sendEmpties_ = false,
3862  int topN_ = AMPS_DEFAULT_TOP_N,
3863  bool isHASubscribe_ = true)
3864  {
3865  std::string notSet;
3866  Message::Options options;
3867  if (oofEnabled_)
3868  {
3869  options.setOOF();
3870  }
3871  if (sendEmpties_ == false)
3872  {
3873  options.setNoEmpties();
3874  }
3875  return sowAndDeltaSubscribe(messageHandler_,
3876  topic_,
3877  filter_,
3878  notSet, // orderBy
3879  batchSize_,
3880  topN_,
3881  options,
3882  timeout_,
3883  isHASubscribe_);
3884  }
3885 
3886  std::string sowDelete(const MessageHandler& messageHandler_,
3887  const std::string& topic_,
3888  const std::string& filter_,
3889  long timeout_,
3890  Message::Field commandId_ = Message::Field())
3891  {
3892  if (_publishStore.isValid())
3893  {
3894  unsigned ackType = Message::AckType::Processed |
3895  Message::AckType::Stats |
3896  Message::AckType::Persisted;
3897  publishStoreMessage.reset();
3898  if (commandId_.empty())
3899  {
3900  publishStoreMessage.newCommandId();
3901  commandId_ = publishStoreMessage.getCommandId();
3902  }
3903  else
3904  {
3905  publishStoreMessage.setCommandId(commandId_.data(), commandId_.len());
3906  }
3907  publishStoreMessage.setCommandEnum(Message::Command::SOWDelete)
3908  .assignSubscriptionId(commandId_.data(), commandId_.len())
3909  .assignQueryID(commandId_.data(), commandId_.len())
3910  .setAckTypeEnum(ackType)
3911  .assignTopic(topic_.c_str(), topic_.length())
3912  .assignFilter(filter_.c_str(), filter_.length());
3913  amps_uint64_t haSequenceNumber = _publishStore.store(publishStoreMessage);
3914  char buf[AMPS_NUMBER_BUFFER_LEN];
3915  size_t pos = convertToCharArray(buf, haSequenceNumber);
3916  publishStoreMessage.assignSequence(buf + pos, AMPS_NUMBER_BUFFER_LEN - pos);
3917  {
3918  try
3919  {
3920  Lock<Mutex> l(_lock);
3921  _routes.addRoute(commandId_, messageHandler_,
3922  Message::AckType::Stats,
3923  Message::AckType::Processed | Message::AckType::Persisted,
3924  publishStoreMessage.getCommandEnum());
3925  syncAckProcessing(timeout_, publishStoreMessage,
3926  haSequenceNumber);
3927  }
3928  catch (const DisconnectedException&)
3929  {
3930  // -V565
3931  // Pass - it will get replayed upon reconnect
3932  }
3933  catch (...)
3934  {
3935  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId_));
3936  throw;
3937  }
3938  }
3939  return (std::string)commandId_;
3940  }
3941  else
3942  {
3943  Lock<Mutex> l(_lock);
3944  _message.reset();
3945  if (commandId_.empty())
3946  {
3947  _message.newCommandId();
3948  commandId_ = _message.getCommandId();
3949  }
3950  else
3951  {
3952  _message.setCommandId(commandId_.data(), commandId_.len());
3953  }
3954  _message.setCommandEnum(Message::Command::SOWDelete)
3955  .assignSubscriptionId(commandId_.data(), commandId_.len())
3956  .assignQueryID(commandId_.data(), commandId_.len())
3957  .setAckTypeEnum(Message::AckType::Processed |
3958  Message::AckType::Stats)
3959  .assignTopic(topic_.c_str(), topic_.length())
3960  .assignFilter(filter_.c_str(), filter_.length());
3961  _routes.addRoute(commandId_, messageHandler_,
3962  Message::AckType::Stats,
3963  Message::AckType::Processed,
3964  _message.getCommandEnum());
3965  try
3966  {
3967  syncAckProcessing(timeout_, _message);
3968  }
3969  catch (...)
3970  {
3971  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId_));
3972  throw;
3973  }
3974  return (std::string)commandId_;
3975  }
3976  }
3977 
3978  std::string sowDeleteByData(const MessageHandler& messageHandler_,
3979  const std::string& topic_,
3980  const std::string& data_,
3981  long timeout_,
3982  Message::Field commandId_ = Message::Field())
3983  {
3984  if (_publishStore.isValid())
3985  {
3986  unsigned ackType = Message::AckType::Processed |
3987  Message::AckType::Stats |
3988  Message::AckType::Persisted;
3989  publishStoreMessage.reset();
3990  if (commandId_.empty())
3991  {
3992  publishStoreMessage.newCommandId();
3993  commandId_ = publishStoreMessage.getCommandId();
3994  }
3995  else
3996  {
3997  publishStoreMessage.setCommandId(commandId_.data(), commandId_.len());
3998  }
3999  publishStoreMessage.setCommandEnum(Message::Command::SOWDelete)
4000  .assignSubscriptionId(commandId_.data(), commandId_.len())
4001  .assignQueryID(commandId_.data(), commandId_.len())
4002  .setAckTypeEnum(ackType)
4003  .assignTopic(topic_.c_str(), topic_.length())
4004  .assignData(data_.c_str(), data_.length());
4005  amps_uint64_t haSequenceNumber = _publishStore.store(publishStoreMessage);
4006  char buf[AMPS_NUMBER_BUFFER_LEN];
4007  size_t pos = convertToCharArray(buf, haSequenceNumber);
4008  publishStoreMessage.assignSequence(buf + pos, AMPS_NUMBER_BUFFER_LEN - pos);
4009  {
4010  try
4011  {
4012  Lock<Mutex> l(_lock);
4013  _routes.addRoute(commandId_, messageHandler_,
4014  Message::AckType::Stats,
4015  Message::AckType::Processed | Message::AckType::Persisted,
4016  publishStoreMessage.getCommandEnum());
4017  syncAckProcessing(timeout_, publishStoreMessage,
4018  haSequenceNumber);
4019  }
4020  catch (const DisconnectedException&)
4021  {
4022  // -V565
4023  // Pass - it will get replayed upon reconnect
4024  }
4025  catch (...)
4026  {
4027  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId_));
4028  throw;
4029  }
4030  }
4031  return (std::string)commandId_;
4032  }
4033  else
4034  {
4035  Lock<Mutex> l(_lock);
4036  _message.reset();
4037  if (commandId_.empty())
4038  {
4039  _message.newCommandId();
4040  commandId_ = _message.getCommandId();
4041  }
4042  else
4043  {
4044  _message.setCommandId(commandId_.data(), commandId_.len());
4045  }
4046  _message.setCommandEnum(Message::Command::SOWDelete)
4047  .assignSubscriptionId(commandId_.data(), commandId_.len())
4048  .assignQueryID(commandId_.data(), commandId_.len())
4049  .setAckTypeEnum(Message::AckType::Processed |
4050  Message::AckType::Stats)
4051  .assignTopic(topic_.c_str(), topic_.length())
4052  .assignData(data_.c_str(), data_.length());
4053  _routes.addRoute(commandId_, messageHandler_,
4054  Message::AckType::Stats,
4055  Message::AckType::Processed,
4056  _message.getCommandEnum());
4057  try
4058  {
4059  syncAckProcessing(timeout_, _message);
4060  }
4061  catch (...)
4062  {
4063  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId_));
4064  throw;
4065  }
4066  return (std::string)commandId_;
4067  }
4068  }
4069 
4070  std::string sowDeleteByKeys(const MessageHandler& messageHandler_,
4071  const std::string& topic_,
4072  const std::string& keys_,
4073  long timeout_,
4074  Message::Field commandId_ = Message::Field())
4075  {
4076  if (_publishStore.isValid())
4077  {
4078  unsigned ackType = Message::AckType::Processed |
4079  Message::AckType::Stats |
4080  Message::AckType::Persisted;
4081  publishStoreMessage.reset();
4082  if (commandId_.empty())
4083  {
4084  publishStoreMessage.newCommandId();
4085  commandId_ = publishStoreMessage.getCommandId();
4086  }
4087  else
4088  {
4089  publishStoreMessage.setCommandId(commandId_.data(), commandId_.len());
4090  }
4091  publishStoreMessage.setCommandEnum(Message::Command::SOWDelete)
4092  .assignSubscriptionId(commandId_.data(), commandId_.len())
4093  .assignQueryID(commandId_.data(), commandId_.len())
4094  .setAckTypeEnum(ackType)
4095  .assignTopic(topic_.c_str(), topic_.length())
4096  .assignSowKeys(keys_.c_str(), keys_.length());
4097  amps_uint64_t haSequenceNumber = _publishStore.store(publishStoreMessage);
4098  char buf[AMPS_NUMBER_BUFFER_LEN];
4099  size_t pos = convertToCharArray(buf, haSequenceNumber);
4100  publishStoreMessage.assignSequence(buf + pos, AMPS_NUMBER_BUFFER_LEN - pos);
4101  {
4102  try
4103  {
4104  Lock<Mutex> l(_lock);
4105  _routes.addRoute(commandId_, messageHandler_,
4106  Message::AckType::Stats,
4107  Message::AckType::Processed | Message::AckType::Persisted,
4108  publishStoreMessage.getCommandEnum());
4109  syncAckProcessing(timeout_, publishStoreMessage,
4110  haSequenceNumber);
4111  }
4112  catch (const DisconnectedException&)
4113  {
4114  // -V565
4115  // Pass - it will get replayed upon reconnect
4116  }
4117  catch (...)
4118  {
4119  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId_));
4120  throw;
4121  }
4122  }
4123  return (std::string)commandId_;
4124  }
4125  else
4126  {
4127  Lock<Mutex> l(_lock);
4128  _message.reset();
4129  if (commandId_.empty())
4130  {
4131  _message.newCommandId();
4132  commandId_ = _message.getCommandId();
4133  }
4134  else
4135  {
4136  _message.setCommandId(commandId_.data(), commandId_.len());
4137  }
4138  _message.setCommandEnum(Message::Command::SOWDelete)
4139  .assignSubscriptionId(commandId_.data(), commandId_.len())
4140  .assignQueryID(commandId_.data(), commandId_.len())
4141  .setAckTypeEnum(Message::AckType::Processed |
4142  Message::AckType::Stats)
4143  .assignTopic(topic_.c_str(), topic_.length())
4144  .assignSowKeys(keys_.c_str(), keys_.length());
4145  _routes.addRoute(commandId_, messageHandler_,
4146  Message::AckType::Stats,
4147  Message::AckType::Processed,
4148  _message.getCommandEnum());
4149  try
4150  {
4151  syncAckProcessing(timeout_, _message);
4152  }
4153  catch (...)
4154  {
4155  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId_));
4156  throw;
4157  }
4158  return (std::string)commandId_;
4159  }
4160  }
4161 
4162  void startTimer(void)
4163  {
4164  if (_serverVersion >= "5.3.2.0")
4165  {
4166  throw CommandException("The start_timer command is deprecated.");
4167  }
4168  Lock<Mutex> l(_lock);
4169  _message.reset();
4170  _message.setCommandEnum(Message::Command::StartTimer);
4171 
4172  _send(_message);
4173  }
4174 
4175  std::string stopTimer(MessageHandler messageHandler_)
4176  {
4177  if (_serverVersion >= "5.3.2.0")
4178  {
4179  throw CommandException("The stop_timer command is deprecated.");
4180  }
4181  return executeAsync(Command("stop_timer").addAckType("completed"), messageHandler_);
4182  }
4183 
4184  amps_handle getHandle(void)
4185  {
4186  return _client;
4187  }
4188 
4196  void setExceptionListener(const std::shared_ptr<const ExceptionListener>& pListener_)
4197  {
4198  _pExceptionListener = pListener_;
4199  _exceptionListener = _pExceptionListener.get();
4200  }
4201 
4202  void setExceptionListener(const ExceptionListener& listener_)
4203  {
4204  _exceptionListener = &listener_;
4205  }
4206 
4207  const ExceptionListener& getExceptionListener(void) const
4208  {
4209  return *_exceptionListener;
4210  }
4211 
4212  void setHeartbeat(unsigned heartbeatInterval_, unsigned readTimeout_)
4213  {
4214  if (readTimeout_ < heartbeatInterval_)
4215  {
4216  throw UsageException("The socket read timeout must be >= the heartbeat interval.");
4217  }
4218  Lock<Mutex> l(_lock);
4219  if (_heartbeatInterval != heartbeatInterval_ ||
4220  _readTimeout != readTimeout_)
4221  {
4222  _heartbeatInterval = heartbeatInterval_;
4223  _readTimeout = readTimeout_;
4224  _sendHeartbeat();
4225  }
4226  }
4227 
4228  void _sendHeartbeat(void)
4229  {
4230  if (_connected && _heartbeatInterval != 0)
4231  {
4232  std::ostringstream options;
4233  options << "start," << _heartbeatInterval;
4234  _beatMessage.setOptions(options.str());
4235 
4236  _heartbeatTimer.setTimeout(_heartbeatInterval * 1000.0);
4237  _heartbeatTimer.start();
4238  try
4239  {
4240  _sendWithoutRetry(_beatMessage);
4241  broadcastConnectionStateChanged(ConnectionStateListener::HeartbeatInitiated);
4242  }
4243  catch (ConnectionException& ex_)
4244  {
4245  // If we are disconnected when we attempt to send, that's OK;
4246  // we'll send this message after we re-connect (if we do).
4247  AMPS_UNHANDLED_EXCEPTION(ex_);
4248  }
4249  _beatMessage.setOptions("beat");
4250  }
4251  amps_result result = AMPS_E_OK;
4252  if (_readTimeout && _connected)
4253  {
4254  result = amps_client_set_read_timeout(_client, (int)_readTimeout);
4255  if (result != AMPS_E_OK && result != AMPS_E_DISCONNECTED)
4256  {
4257  AMPSException::throwFor(_client, result);
4258  }
4259  if (!_queueAckTimeout)
4260  {
4261  result = amps_client_set_idle_time(_client,
4262  (int)(_heartbeatInterval * 1000));
4263  if (result != AMPS_E_OK && result != AMPS_E_DISCONNECTED)
4264  {
4265  AMPSException::throwFor(_client, result);
4266  }
4267  }
4268  }
4269  }
4270 
4271  void addConnectionStateListener(ConnectionStateListener* listener_)
4272  {
4273  Lock<Mutex> lock(_lock);
4274  _connectionStateListeners.insert(listener_);
4275  }
4276 
4277  void removeConnectionStateListener(ConnectionStateListener* listener_)
4278  {
4279  Lock<Mutex> lock(_lock);
4280  _connectionStateListeners.erase(listener_);
4281  }
4282 
4283  void clearConnectionStateListeners()
4284  {
4285  Lock<Mutex> lock(_lock);
4286  _connectionStateListeners.clear();
4287  }
4288 
4289  void _registerHandler(Command& command_, Message::Field& cid_,
4290  MessageHandler& handler_, unsigned requestedAcks_,
4291  unsigned systemAddedAcks_, Message::Command::Type commandType_)
4292  {
4293  Message message = command_.getMessage();
4294  Message::Command::Type commandType = message.getCommandEnum();
4295  Message::Field subid = message.getSubscriptionId();
4296  Message::Field qid = message.getQueryID();
4297  // If we have an id, we're good, even if it's an existing route
4298  bool added = qid.len() || subid.len() || cid_.len();
4299  bool cidIsQid = cid_ == qid;
4300  bool cidUnique = !cidIsQid && cid_.len() > 0 && cid_ != subid;
4301  int addedCount = 0;
4302  if (subid.len() > 0)
4303  {
4304  // This can replace a non-subscribe with a matching id
4305  // with a subscription but not another subscription.
4306  addedCount += _routes.addRoute(subid, handler_, requestedAcks_,
4307  systemAddedAcks_, commandType_);
4308  if (!cidUnique
4309  && (commandType == Message::Command::Subscribe
4310  || commandType == Message::Command::DeltaSubscribe))
4311  {
4312  // We don't need to do anything else
4313  cid_ = subid;
4314  return;
4315  }
4316  }
4317  if (qid.len() > 0 && qid != subid
4318  && (commandType == Message::Command::SOW
4319  || commandType == Message::Command::SOWDelete
4320  || commandType == Message::Command::SOWAndSubscribe
4321  || commandType == Message::Command::SOWAndDeltaSubscribe))
4322  {
4323  while (_routes.hasRoute(qid))
4324  {
4325  message.newQueryId();
4326  if (cidIsQid)
4327  {
4328  cid_ = message.getQueryId();
4329  }
4330  qid = message.getQueryId();
4331  }
4332  if (addedCount == 0)
4333  {
4334  _routes.addRoute(qid, handler_, requestedAcks_,
4335  systemAddedAcks_, commandType_);
4336  }
4337  else
4338  {
4339  void* data = NULL;
4340  {
4341  Unlock<Mutex> u(_lock);
4342  data = amps_invoke_copy_route_function(handler_.userData());
4343  }
4344  if (!data)
4345  {
4346  _routes.addRoute(qid, handler_, requestedAcks_,
4347  systemAddedAcks_, commandType_);
4348  }
4349  else
4350  {
4351  _routes.addRoute(qid,
4352  MessageHandler(handler_.function(),
4353  data),
4354  requestedAcks_,
4355  systemAddedAcks_, commandType_);
4356  }
4357  }
4358  ++addedCount;
4359  }
4360  if (cidUnique && requestedAcks_ & ~Message::AckType::Persisted)
4361  {
4362  while (_routes.hasRoute(cid_))
4363  {
4364  cid_ = message.newCommandId().getCommandId();
4365  }
4366  if (addedCount == 0)
4367  {
4368  _routes.addRoute(cid_, handler_, requestedAcks_,
4369  systemAddedAcks_, commandType_);
4370  }
4371  else
4372  {
4373  void* data = NULL;
4374  {
4375  Unlock<Mutex> u(_lock);
4376  data = amps_invoke_copy_route_function(handler_.userData());
4377  }
4378  if (!data)
4379  {
4380  _routes.addRoute(cid_, handler_, requestedAcks_,
4381  systemAddedAcks_, commandType_);
4382  }
4383  else
4384  {
4385  _routes.addRoute(cid_,
4386  MessageHandler(handler_.function(),
4387  data),
4388  requestedAcks_,
4389  systemAddedAcks_, commandType_);
4390  }
4391  }
4392  }
4393  else if ((commandType == Message::Command::Publish ||
4394  commandType == Message::Command::DeltaPublish)
4395  && requestedAcks_ & ~Message::AckType::Persisted)
4396  {
4397  cid_ = command_.getMessage().newCommandId().getCommandId();
4398  _routes.addRoute(cid_, handler_, requestedAcks_,
4399  systemAddedAcks_, commandType_);
4400  added = true;
4401  }
4402  if (!added)
4403  {
4404  throw UsageException("To use a messagehandler, you must also supply a command or subscription ID.");
4405  }
4406  }
4407 
4408  std::string executeAsyncNoLock(Command& command_, MessageHandler& handler_,
4409  bool isHASubscribe_ = true)
4410  {
4411  isHASubscribe_ &= (bool)_subscriptionManager;
4412  Message& message = command_.getMessage();
4413  unsigned systemAddedAcks = (handler_.isValid() || command_.hasProcessedAck()) ?
4414  Message::AckType::Processed : Message::AckType::None;
4415  unsigned requestedAcks = message.getAckTypeEnum();
4416  bool isPublishStore = _publishStore.isValid() && command_.needsSequenceNumber();
4417  Message::Command::Type commandType = message.getCommandEnum();
4418  if (commandType == Message::Command::StopTimer)
4419  {
4420  systemAddedAcks |= Message::AckType::Completed;
4421  }
4422  Message::Field cid = message.getCommandId();
4423  if (handler_.isValid() && cid.empty())
4424  {
4425  cid = message.newCommandId().getCommandId();
4426  }
4427  if (message.getBookmark().len() > 0)
4428  {
4429  if (command_.isSubscribe())
4430  {
4431  Message::Field bookmark = message.getBookmark();
4432  if (_bookmarkStore.isValid())
4433  {
4434  systemAddedAcks |= Message::AckType::Persisted;
4435  if (bookmark == AMPS_BOOKMARK_RECENT)
4436  {
4437  message.setBookmark(_bookmarkStore.getMostRecent(message.getSubscriptionId()));
4438  }
4439  else if (bookmark != AMPS_BOOKMARK_NOW &&
4440  bookmark != AMPS_BOOKMARK_EPOCH)
4441  {
4442  _bookmarkStore.log(message);
4443  if (!BookmarkRange::isRange(bookmark))
4444  {
4445  _bookmarkStore.discard(message);
4446  _bookmarkStore.persisted(message.getSubscriptionId(),
4447  bookmark);
4448  }
4449  }
4450  }
4451  else if (bookmark == AMPS_BOOKMARK_RECENT)
4452  {
4454  }
4455  }
4456  }
4457  if (isPublishStore)
4458  {
4459  systemAddedAcks |= Message::AckType::Persisted;
4460  }
4461  bool isSubscribe = command_.isSubscribe();
4462  if (handler_.isValid() && !isSubscribe)
4463  {
4464  _registerHandler(command_, cid, handler_,
4465  requestedAcks, systemAddedAcks, commandType);
4466  }
4467  if (isPublishStore)
4468  {
4469  bool useSyncSend = cid.len() > 0 && command_.hasProcessedAck();
4470  amps_uint64_t haSequenceNumber = (amps_uint64_t)0;
4471  message.setAckTypeEnum(requestedAcks | systemAddedAcks);
4472  {
4473  Unlock<Mutex> u(_lock);
4474  haSequenceNumber = _publishStore.store(message);
4475  }
4476  message.setSequence(haSequenceNumber);
4477  try
4478  {
4479  if (useSyncSend)
4480  {
4481  syncAckProcessing((long)command_.getTimeout(), message,
4482  haSequenceNumber);
4483  }
4484  else
4485  {
4486  _send(message, haSequenceNumber);
4487  }
4488  }
4489  catch (const DisconnectedException&)
4490  {
4491  throw;
4492  }
4493  catch (...)
4494  {
4495  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(cid));
4496  throw;
4497  }
4498  }
4499  else
4500  {
4501  if (isSubscribe)
4502  {
4503  const Message::Field& subId = message.getSubscriptionId();
4504  if (isHASubscribe_)
4505  {
4506  Unlock<Mutex> u(_lock);
4507  _subscriptionManager->subscribe(handler_,
4508  message.deepCopy(),
4509  requestedAcks);
4510  if (_badTimeToHASubscribe)
4511  {
4512  message.setAckTypeEnum(requestedAcks);
4513  return std::string(subId.data(), subId.len());
4514  }
4515  }
4516  if (handler_.isValid())
4517  {
4518  _registerHandler(command_, cid, handler_,
4519  requestedAcks, systemAddedAcks, commandType);
4520  }
4521  message.setAckTypeEnum(requestedAcks | systemAddedAcks);
4522  try
4523  {
4524  syncAckProcessing((long)command_.getTimeout(), message,
4525  isHASubscribe_);
4526  }
4527  catch (const DisconnectedException&)
4528  {
4529  if (!isHASubscribe_)
4530  {
4531  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(cid));
4532  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(subId));
4533  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(message.getQueryId()));
4534  message.setAckTypeEnum(requestedAcks);
4535  throw;
4536  }
4537  }
4538  catch (const TimedOutException&)
4539  {
4540  AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(cid));
4541  AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(subId));
4542  AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(message.getQueryId()));
4543  message.setAckTypeEnum(requestedAcks);
4544  throw;
4545  }
4546  catch (...)
4547  {
4548  if (isHASubscribe_)
4549  {
4550  // Have to unlock before calling into sub manager to avoid deadlock
4551  Unlock<Mutex> unlock(_lock);
4552  _subscriptionManager->unsubscribe(subId);
4553  }
4554  if (message.getQueryID().len() > 0)
4555  {
4556  _routes.removeRoute(message.getQueryID());
4557  }
4558  _routes.removeRoute(cid);
4559  _routes.removeRoute(subId);
4560  message.setAckTypeEnum(requestedAcks);
4561  throw;
4562  }
4563  if (subId.len() > 0)
4564  {
4565  message.setAckTypeEnum(requestedAcks);
4566  return std::string(subId.data(), subId.len());
4567  }
4568  }
4569  else
4570  {
4571  // SOW, Flush, etc. should always be sync. Publish/delete may not be.
4572  bool useSyncSend = commandType & ~Message::Command::NoDataCommands
4573  || (cid.len() > 0 && command_.hasProcessedAck());
4574  message.setAckTypeEnum(requestedAcks | systemAddedAcks);
4575  try
4576  {
4577  if (useSyncSend)
4578  {
4579  syncAckProcessing((long)(command_.getTimeout()), message);
4580  }
4581  else
4582  {
4583  _send(message);
4584  }
4585  }
4586  catch (const TimedOutException&)
4587  {
4588  AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(cid));
4589  AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(message.getQueryId()));
4590  message.setAckTypeEnum(requestedAcks);
4591  throw;
4592  }
4593  catch (const DisconnectedException&)
4594  {
4595  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(cid));
4596  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(message.getQueryId()));
4597  message.setAckTypeEnum(requestedAcks);
4598  throw;
4599  }
4600  catch (...)
4601  {
4602  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(cid));
4603  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(message.getQueryId()));
4604  message.setAckTypeEnum(requestedAcks);
4605  throw;
4606  }
4607  }
4608  }
4609  message.setAckTypeEnum(requestedAcks);
4610  return cid;
4611  }
4612 
4613  MessageStream getEmptyMessageStream(void);
4614 
4615  std::string executeAsync(Command& command_, MessageHandler& handler_,
4616  bool isHASubscribe_ = true)
4617  {
4618  Lock<Mutex> lock(_lock);
4619  return executeAsyncNoLock(command_, handler_, isHASubscribe_);
4620  }
4621 
4622  // Queue Methods //
4623  void setAutoAck(bool isAutoAckEnabled_)
4624  {
4625  _isAutoAckEnabled = isAutoAckEnabled_;
4626  }
4627  bool getAutoAck(void) const
4628  {
4629  return _isAutoAckEnabled;
4630  }
4631  void setAckBatchSize(const unsigned batchSize_)
4632  {
4633  _ackBatchSize = batchSize_;
4634  if (!_queueAckTimeout)
4635  {
4636  _queueAckTimeout = AMPS_DEFAULT_QUEUE_ACK_TIMEOUT;
4637  amps_client_set_idle_time(_client, _queueAckTimeout);
4638  }
4639  }
4640  unsigned getAckBatchSize(void) const
4641  {
4642  return _ackBatchSize;
4643  }
4644  int getAckTimeout(void) const
4645  {
4646  return _queueAckTimeout;
4647  }
4648  void setAckTimeout(const int ackTimeout_)
4649  {
4650  amps_client_set_idle_time(_client, ackTimeout_);
4651  _queueAckTimeout = ackTimeout_;
4652  }
4653  size_t _ack(QueueBookmarks& queueBookmarks_)
4654  {
4655  if (queueBookmarks_._bookmarkCount)
4656  {
4657  publishStoreMessage.reset();
4658  publishStoreMessage.setCommandEnum(Message::Command::SOWDelete)
4659  .setTopic(queueBookmarks_._topic)
4660  .setBookmark(queueBookmarks_._data)
4661  .setCommandId("AMPS-queue-ack");
4662  amps_uint64_t haSequenceNumber = 0;
4663  if (_publishStore.isValid())
4664  {
4665  haSequenceNumber = _publishStore.store(publishStoreMessage);
4666  publishStoreMessage.setAckType("persisted")
4667  .setSequence(haSequenceNumber);
4668  queueBookmarks_._data.erase();
4669  queueBookmarks_._bookmarkCount = 0;
4670  }
4671  _send(publishStoreMessage, haSequenceNumber);
4672  if (!_publishStore.isValid())
4673  {
4674  queueBookmarks_._data.erase();
4675  queueBookmarks_._bookmarkCount = 0;
4676  }
4677  return 1;
4678  }
4679  return 0;
4680  }
4681  void ack(const Field& topic_, const Field& bookmark_, const char* options_ = NULL)
4682  {
4683  if (_isAutoAckEnabled)
4684  {
4685  return;
4686  }
4687  _ack(topic_, bookmark_, options_);
4688  }
4689  void _ack(const Field& topic_, const Field& bookmark_, const char* options_ = NULL)
4690  {
4691  if (bookmark_.len() == 0)
4692  {
4693  return;
4694  }
4695  Lock<Mutex> lock(_lock);
4696  if (_ackBatchSize < 2 || options_ != NULL)
4697  {
4698  publishStoreMessage.reset();
4699  publishStoreMessage.setCommandEnum(Message::Command::SOWDelete)
4700  .setCommandId("AMPS-queue-ack")
4701  .setTopic(topic_).setBookmark(bookmark_);
4702  if (options_)
4703  {
4704  publishStoreMessage.setOptions(options_);
4705  }
4706  amps_uint64_t haSequenceNumber = 0;
4707  if (_publishStore.isValid())
4708  {
4709  haSequenceNumber = _publishStore.store(publishStoreMessage);
4710  publishStoreMessage.setAckType("persisted")
4711  .setSequence(haSequenceNumber);
4712  }
4713  _send(publishStoreMessage, haSequenceNumber);
4714  return;
4715  }
4716  // have we acked anything for this hash
4717  topic_hash hash = CRC<0>::crcNoSSE(topic_.data(), topic_.len());
4718  TopicHashMap::iterator it = _topicHashMap.find(hash);
4719  if (it == _topicHashMap.end())
4720  {
4721  // add a new one to the map
4722 #ifdef AMPS_USE_EMPLACE
4723  it = _topicHashMap.emplace(TopicHashMap::value_type(hash, QueueBookmarks(topic_))).first;
4724 #else
4725  it = _topicHashMap.insert(TopicHashMap::value_type(hash, QueueBookmarks(topic_))).first;
4726 #endif
4727  }
4728  QueueBookmarks& queueBookmarks = it->second;
4729  if (queueBookmarks._data.length())
4730  {
4731  queueBookmarks._data.append(",");
4732  }
4733  else
4734  {
4735  queueBookmarks._oldestTime = amps_now();
4736  }
4737  queueBookmarks._data.append(bookmark_);
4738  if (++queueBookmarks._bookmarkCount >= _ackBatchSize)
4739  {
4740  _ack(queueBookmarks);
4741  }
4742  }
4743  void flushAcks(void)
4744  {
4745  size_t sendCount = 0;
4746  if (!_connected)
4747  {
4748  return;
4749  }
4750  else
4751  {
4752  Lock<Mutex> lock(_lock);
4753  typedef TopicHashMap::iterator iterator;
4754  for (iterator it = _topicHashMap.begin(), end = _topicHashMap.end(); it != end; ++it)
4755  {
4756  QueueBookmarks& queueBookmarks = it->second;
4757  sendCount += _ack(queueBookmarks);
4758  }
4759  }
4760  if (sendCount && _connected)
4761  {
4762  publishFlush(0, Message::AckType::Processed);
4763  }
4764  }
4765  // called when there's idle time, to see if we need to flush out any "acks"
4766  void checkQueueAcks(void)
4767  {
4768  if (!_topicHashMap.size())
4769  {
4770  return;
4771  }
4772  Lock<Mutex> lock(_lock);
4773  try
4774  {
4775  amps_uint64_t threshold = amps_now()
4776  - (amps_uint64_t)_queueAckTimeout;
4777  typedef TopicHashMap::iterator iterator;
4778  for (iterator it = _topicHashMap.begin(), end = _topicHashMap.end(); it != end; ++it)
4779  {
4780  QueueBookmarks& queueBookmarks = it->second;
4781  if (queueBookmarks._bookmarkCount && queueBookmarks._oldestTime < threshold)
4782  {
4783  _ack(queueBookmarks);
4784  }
4785  }
4786  }
4787  catch (std::exception& ex)
4788  {
4789  AMPS_UNHANDLED_EXCEPTION(ex);
4790  }
4791  }
4792 
4793  void deferredExecution(DeferredExecutionFunc func_, void* userData_)
4794  {
4795  Lock<Mutex> lock(_deferredExecutionLock);
4796 #ifdef AMPS_USE_EMPLACE
4797  _deferredExecutionList.emplace_back(
4798  DeferredExecutionRequest(func_, userData_));
4799 #else
4800  _deferredExecutionList.push_back(
4801  DeferredExecutionRequest(func_, userData_));
4802 #endif
4803  }
4804 
4805  inline void processDeferredExecutions(void)
4806  {
4807  if (_deferredExecutionList.size())
4808  {
4809  Lock<Mutex> lock(_deferredExecutionLock);
4810  DeferredExecutionList::iterator it = _deferredExecutionList.begin();
4811  DeferredExecutionList::iterator end = _deferredExecutionList.end();
4812  for (; it != end; ++it)
4813  {
4814  try
4815  {
4816  it->_func(it->_userData);
4817  }
4818  catch (...)
4819  {
4820  // -V565
4821  // Intentionally ignore errors
4822  }
4823  }
4824  _deferredExecutionList.clear();
4825  _routes.invalidateCache();
4826  _routeCache.invalidateCache();
4827  }
4828  }
4829 
4830  bool getRetryOnDisconnect(void) const
4831  {
4832  return _isRetryOnDisconnect;
4833  }
4834 
4835  void setRetryOnDisconnect(bool isRetryOnDisconnect_)
4836  {
4837  _isRetryOnDisconnect = isRetryOnDisconnect_;
4838  }
4839 
4840  void setDefaultMaxDepth(unsigned maxDepth_)
4841  {
4842  _defaultMaxDepth = maxDepth_;
4843  }
4844 
4845  unsigned getDefaultMaxDepth(void) const
4846  {
4847  return _defaultMaxDepth;
4848  }
4849 
4850  void setTransportFilterFunction(amps_transport_filter_function filter_,
4851  void* userData_)
4852  {
4853  amps_client_set_transport_filter_function(_client, filter_, userData_);
4854  }
4855 
4856  void setThreadCreatedCallback(amps_thread_created_callback callback_,
4857  void* userData_)
4858  {
4859  amps_client_set_thread_created_callback(_client, callback_, userData_);
4860  }
4861  }; // class ClientImpl
4936 
4938  {
4939  RefHandle<MessageStreamImpl> _body;
4940  public:
4945  class iterator
4946  {
4947  MessageStream* _pStream;
4948  Message _current;
4949  inline void advance(void);
4950 
4951  public:
4952  iterator() // end
4953  : _pStream(NULL)
4954  {;}
4955  iterator(MessageStream* pStream_)
4956  : _pStream(pStream_)
4957  {
4958  advance();
4959  }
4960 
4961  bool operator==(const iterator& rhs) const
4962  {
4963  return _pStream == rhs._pStream;
4964  }
4965  bool operator!=(const iterator& rhs) const
4966  {
4967  return _pStream != rhs._pStream;
4968  }
4969  void operator++(void)
4970  {
4971  advance();
4972  }
4973  Message operator*(void)
4974  {
4975  return _current;
4976  }
4977  Message* operator->(void)
4978  {
4979  return &_current;
4980  }
4981  };
4983  bool isValid() const
4984  {
4985  return _body.isValid();
4986  }
4987 
4991  {
4992  if (!_body.isValid())
4993  {
4994  throw UsageException("This MessageStream is not valid and cannot be iterated.");
4995  }
4996  return iterator(this);
4997  }
5000  // For non-SOW queries, the end is never reached.
5002  {
5003  return iterator();
5004  }
5005  inline MessageStream(void);
5006 
5012  MessageStream timeout(unsigned timeout_);
5013 
5017  MessageStream conflate(void);
5023  MessageStream maxDepth(unsigned maxDepth_);
5026  unsigned getMaxDepth(void) const;
5029  unsigned getDepth(void) const;
5030 
5031  private:
5032  inline MessageStream(const Client& client_);
5033  inline void setSOWOnly(const std::string& commandId_,
5034  const std::string& queryId_ = "");
5035  inline void setSubscription(const std::string& subId_,
5036  const std::string& commandId_ = "",
5037  const std::string& queryId_ = "");
5038  inline void setStatsOnly(const std::string& commandId_,
5039  const std::string& queryId_ = "");
5040  inline void setAcksOnly(const std::string& commandId_, unsigned acks_);
5041 
5042  inline operator MessageHandler(void);
5043 
5044  inline static MessageStream fromExistingHandler(const MessageHandler& handler);
5045 
5046  friend class Client;
5047 
5048  };
5049 
5069  class Client // -V553
5070  {
5071  protected:
5072  BorrowRefHandle<ClientImpl> _body;
5073  public:
5074  static const int DEFAULT_COMMAND_TIMEOUT = AMPS_DEFAULT_COMMAND_TIMEOUT;
5075  static const int DEFAULT_BATCH_SIZE = AMPS_DEFAULT_BATCH_SIZE;
5076  static const int DEFAULT_TOP_N = AMPS_DEFAULT_TOP_N;
5077 
5086  Client(const std::string& clientName = "")
5087  : _body(new ClientImpl(clientName), true)
5088  {;}
5089 
5090  Client(ClientImpl* existingClient)
5091  : _body(existingClient, true)
5092  {;}
5093 
5094  Client(ClientImpl* existingClient, bool isRef)
5095  : _body(existingClient, isRef)
5096  {;}
5097 
5098  Client(const Client& rhs) : _body(rhs._body) {;}
5099  virtual ~Client(void) {;}
5100 
5101  Client& operator=(const Client& rhs)
5102  {
5103  _body = rhs._body;
5104  return *this;
5105  }
5106 
5107  bool isValid()
5108  {
5109  return _body.isValid();
5110  }
5111 
5124  void setName(const std::string& name)
5125  {
5126  _body.get().setName(name);
5127  }
5128 
5131  const std::string& getName() const
5132  {
5133  return _body.get().getName();
5134  }
5135 
5139  const std::string& getNameHash() const
5140  {
5141  return _body.get().getNameHash();
5142  }
5143 
5147  const amps_uint64_t getNameHashValue() const
5148  {
5149  return _body.get().getNameHashValue();
5150  }
5151 
5158  void setLogonCorrelationData(const std::string& logonCorrelationData_)
5159  {
5160  _body.get().setLogonCorrelationData(logonCorrelationData_);
5161  }
5162 
5165  const std::string& getLogonCorrelationData() const
5166  {
5167  return _body.get().getLogonCorrelationData();
5168  }
5169 
5178  size_t getServerVersion() const
5179  {
5180  return _body.get().getServerVersion();
5181  }
5182 
5189  VersionInfo getServerVersionInfo() const
5190  {
5191  return _body.get().getServerVersionInfo();
5192  }
5193 
5203  static size_t convertVersionToNumber(const std::string& version_)
5204  {
5205  return AMPS::convertVersionToNumber(version_.c_str(), version_.length());
5206  }
5207 
5218  static size_t convertVersionToNumber(const char* data_, size_t len_)
5219  {
5220  return AMPS::convertVersionToNumber(data_, len_);
5221  }
5222 
5225  const std::string& getURI() const
5226  {
5227  return _body.get().getURI();
5228  }
5229 
5236 
5238 
5249  void connect(const std::string& uri)
5250  {
5251  _body.get().connect(uri);
5252  }
5253 
5256  void disconnect()
5257  {
5258  _body.get().disconnect();
5259  }
5260 
5274  void send(const Message& message)
5275  {
5276  _body.get().send(message);
5277  }
5278 
5287  void addMessageHandler(const Field& commandId_,
5288  const AMPS::MessageHandler& messageHandler_,
5289  unsigned requestedAcks_, bool isSubscribe_)
5290  {
5291  Message::Command::Type commandType = isSubscribe_ ? Message::Command::Subscribe : Message::Command::SOW;
5292  _body.get().addMessageHandler(commandId_, messageHandler_,
5293  requestedAcks_, commandType);
5294  }
5295 
5304  void addMessageHandler(const Field& commandId_,
5305  const AMPS::MessageHandler& messageHandler_,
5306  unsigned requestedAcks_, Message::Command::Type commandType_)
5307  {
5308  _body.get().addMessageHandler(commandId_, messageHandler_,
5309  requestedAcks_, commandType_);
5310  }
5311 
5315  bool removeMessageHandler(const Field& commandId_)
5316  {
5317  return _body.get().removeMessageHandler(commandId_);
5318  }
5319 
5343  std::string send(const MessageHandler& messageHandler, Message& message, int timeout = 0)
5344  {
5345  return _body.get().send(messageHandler, message, timeout);
5346  }
5347 
5357  void setDisconnectHandler(const DisconnectHandler& disconnectHandler)
5358  {
5359  _body.get().setDisconnectHandler(disconnectHandler);
5360  }
5361 
5365  DisconnectHandler getDisconnectHandler(void) const
5366  {
5367  return _body.get().getDisconnectHandler();
5368  }
5369 
5374  virtual ConnectionInfo getConnectionInfo() const
5375  {
5376  return _body.get().getConnectionInfo();
5377  }
5378 
5387  void setBookmarkStore(const BookmarkStore& bookmarkStore_)
5388  {
5389  _body.get().setBookmarkStore(bookmarkStore_);
5390  }
5391 
5396  {
5397  return _body.get().getBookmarkStore();
5398  }
5399 
5404  {
5405  return _body.get().getSubscriptionManager();
5406  }
5407 
5415  void setSubscriptionManager(SubscriptionManager* subscriptionManager_)
5416  {
5417  _body.get().setSubscriptionManager(subscriptionManager_);
5418  }
5419 
5439  void setPublishStore(const Store& publishStore_)
5440  {
5441  _body.get().setPublishStore(publishStore_);
5442  }
5443 
5448  {
5449  return _body.get().getPublishStore();
5450  }
5451 
5455  void setDuplicateMessageHandler(const MessageHandler& duplicateMessageHandler_)
5456  {
5457  _body.get().setGlobalCommandTypeMessageHandler(ClientImpl::GlobalCommandTypeHandlers::DuplicateMessage,
5458  duplicateMessageHandler_);
5459  }
5460 
5471  {
5472  return _body.get().getDuplicateMessageHandler();
5473  }
5474 
5485  {
5486  _body.get().setFailedWriteHandler(handler_);
5487  }
5488 
5493  {
5494  return _body.get().getFailedWriteHandler();
5495  }
5496 
5497 
5515  amps_uint64_t publish(const std::string& topic_, const std::string& data_)
5516  {
5517  return _body.get().publish(topic_.c_str(), topic_.length(),
5518  data_.c_str(), data_.length());
5519  }
5520 
5540  amps_uint64_t publish(const char* topic_, size_t topicLength_,
5541  const char* data_, size_t dataLength_)
5542  {
5543  return _body.get().publish(topic_, topicLength_, data_, dataLength_);
5544  }
5545 
5564  amps_uint64_t publish(const std::string& topic_, const std::string& data_,
5565  unsigned long expiration_)
5566  {
5567  return _body.get().publish(topic_.c_str(), topic_.length(),
5568  data_.c_str(), data_.length(), expiration_);
5569  }
5570 
5591  amps_uint64_t publish(const char* topic_, size_t topicLength_,
5592  const char* data_, size_t dataLength_,
5593  unsigned long expiration_)
5594  {
5595  return _body.get().publish(topic_, topicLength_,
5596  data_, dataLength_, expiration_);
5597  }
5598 
5637  void publishFlush(long timeout_ = 0, unsigned ackType_ = Message::AckType::Processed)
5638  {
5639  _body.get().publishFlush(timeout_, ackType_);
5640  }
5641 
5642 
5658  amps_uint64_t deltaPublish(const std::string& topic_, const std::string& data_)
5659  {
5660  return _body.get().deltaPublish(topic_.c_str(), topic_.length(),
5661  data_.c_str(), data_.length());
5662  }
5663 
5681  amps_uint64_t deltaPublish(const char* topic_, size_t topicLength_,
5682  const char* data_, size_t dataLength_)
5683  {
5684  return _body.get().deltaPublish(topic_, topicLength_,
5685  data_, dataLength_);
5686  }
5687 
5704  amps_uint64_t deltaPublish(const std::string& topic_, const std::string& data_,
5705  unsigned long expiration_)
5706  {
5707  return _body.get().deltaPublish(topic_.c_str(), topic_.length(),
5708  data_.c_str(), data_.length(),
5709  expiration_);
5710  }
5711 
5730  amps_uint64_t deltaPublish(const char* topic_, size_t topicLength_,
5731  const char* data_, size_t dataLength_,
5732  unsigned long expiration_)
5733  {
5734  return _body.get().deltaPublish(topic_, topicLength_,
5735  data_, dataLength_, expiration_);
5736  }
5737 
5753  std::string logon(int timeout_ = 0,
5754  Authenticator& authenticator_ = DefaultAuthenticator::instance(),
5755  const char* options_ = NULL)
5756  {
5757  return _body.get().logon(timeout_, authenticator_, options_);
5758  }
5772  std::string logon(const char* options_, int timeout_ = 0)
5773  {
5774  return _body.get().logon(timeout_, DefaultAuthenticator::instance(),
5775  options_);
5776  }
5777 
5791  std::string logon(const std::string& options_, int timeout_ = 0)
5792  {
5793  return _body.get().logon(timeout_, DefaultAuthenticator::instance(),
5794  options_.c_str());
5795  }
5796 
5816  std::string subscribe(const MessageHandler& messageHandler_,
5817  const std::string& topic_,
5818  long timeout_ = 0,
5819  const std::string& filter_ = "",
5820  const std::string& options_ = "",
5821  const std::string& subId_ = "")
5822  {
5823  return _body.get().subscribe(messageHandler_, topic_, timeout_,
5824  filter_, "", options_, subId_);
5825  }
5826 
5842  MessageStream subscribe(const std::string& topic_,
5843  long timeout_ = 0, const std::string& filter_ = "",
5844  const std::string& options_ = "",
5845  const std::string& subId_ = "")
5846  {
5847  MessageStream result(*this);
5848  if (_body.get().getDefaultMaxDepth())
5849  {
5850  result.maxDepth(_body.get().getDefaultMaxDepth());
5851  }
5852  result.setSubscription(_body.get().subscribe(
5853  result.operator MessageHandler(),
5854  topic_, timeout_, filter_, "",
5855  options_, subId_, false));
5856  return result;
5857  }
5858 
5874  MessageStream subscribe(const char* topic_,
5875  long timeout_ = 0, const std::string& filter_ = "",
5876  const std::string& options_ = "",
5877  const std::string& subId_ = "")
5878  {
5879  MessageStream result(*this);
5880  if (_body.get().getDefaultMaxDepth())
5881  {
5882  result.maxDepth(_body.get().getDefaultMaxDepth());
5883  }
5884  result.setSubscription(_body.get().subscribe(
5885  result.operator MessageHandler(),
5886  topic_, timeout_, filter_, "",
5887  options_, subId_, false));
5888  return result;
5889  }
5890 
5903  std::string deltaSubscribe(const MessageHandler& messageHandler_,
5904  const std::string& topic_,
5905  long timeout_,
5906  const std::string& filter_ = "",
5907  const std::string& options_ = "",
5908  const std::string& subId_ = "")
5909  {
5910  return _body.get().deltaSubscribe(messageHandler_, topic_, timeout_,
5911  filter_, "", options_, subId_);
5912  }
5921  MessageStream deltaSubscribe(const std::string& topic_,
5922  long timeout_, const std::string& filter_ = "",
5923  const std::string& options_ = "",
5924  const std::string& subId_ = "")
5925  {
5926  MessageStream result(*this);
5927  if (_body.get().getDefaultMaxDepth())
5928  {
5929  result.maxDepth(_body.get().getDefaultMaxDepth());
5930  }
5931  result.setSubscription(_body.get().deltaSubscribe(
5932  result.operator MessageHandler(),
5933  topic_, timeout_, filter_, "",
5934  options_, subId_, false));
5935  return result;
5936  }
5937 
5939  MessageStream deltaSubscribe(const char* topic_,
5940  long timeout_, const std::string& filter_ = "",
5941  const std::string& options_ = "",
5942  const std::string& subId_ = "")
5943  {
5944  MessageStream result(*this);
5945  if (_body.get().getDefaultMaxDepth())
5946  {
5947  result.maxDepth(_body.get().getDefaultMaxDepth());
5948  }
5949  result.setSubscription(_body.get().deltaSubscribe(
5950  result.operator MessageHandler(),
5951  topic_, timeout_, filter_, "",
5952  options_, subId_, false));
5953  return result;
5954  }
5955 
5981  std::string bookmarkSubscribe(const MessageHandler& messageHandler_,
5982  const std::string& topic_,
5983  long timeout_,
5984  const std::string& bookmark_,
5985  const std::string& filter_ = "",
5986  const std::string& options_ = "",
5987  const std::string& subId_ = "")
5988  {
5989  return _body.get().subscribe(messageHandler_, topic_, timeout_,
5990  filter_, bookmark_, options_, subId_);
5991  }
6009  MessageStream bookmarkSubscribe(const std::string& topic_,
6010  long timeout_,
6011  const std::string& bookmark_,
6012  const std::string& filter_ = "",
6013  const std::string& options_ = "",
6014  const std::string& subId_ = "")
6015  {
6016  MessageStream result(*this);
6017  if (_body.get().getDefaultMaxDepth())
6018  {
6019  result.maxDepth(_body.get().getDefaultMaxDepth());
6020  }
6021  result.setSubscription(_body.get().subscribe(
6022  result.operator MessageHandler(),
6023  topic_, timeout_, filter_,
6024  bookmark_, options_,
6025  subId_, false));
6026  return result;
6027  }
6028 
6030  MessageStream bookmarkSubscribe(const char* topic_,
6031  long timeout_,
6032  const std::string& bookmark_,
6033  const std::string& filter_ = "",
6034  const std::string& options_ = "",
6035  const std::string& subId_ = "")
6036  {
6037  MessageStream result(*this);
6038  if (_body.get().getDefaultMaxDepth())
6039  {
6040  result.maxDepth(_body.get().getDefaultMaxDepth());
6041  }
6042  result.setSubscription(_body.get().subscribe(
6043  result.operator MessageHandler(),
6044  topic_, timeout_, filter_,
6045  bookmark_, options_,
6046  subId_, false));
6047  return result;
6048  }
6049 
6058  void unsubscribe(const std::string& commandId)
6059  {
6060  return _body.get().unsubscribe(commandId);
6061  }
6062 
6071  {
6072  return _body.get().unsubscribe();
6073  }
6074 
6075 
6105  std::string sow(const MessageHandler& messageHandler_,
6106  const std::string& topic_,
6107  const std::string& filter_ = "",
6108  const std::string& orderBy_ = "",
6109  const std::string& bookmark_ = "",
6110  int batchSize_ = DEFAULT_BATCH_SIZE,
6111  int topN_ = DEFAULT_TOP_N,
6112  const std::string& options_ = "",
6113  long timeout_ = DEFAULT_COMMAND_TIMEOUT)
6114  {
6115  return _body.get().sow(messageHandler_, topic_, filter_, orderBy_,
6116  bookmark_, batchSize_, topN_, options_,
6117  timeout_);
6118  }
6143  MessageStream sow(const std::string& topic_,
6144  const std::string& filter_ = "",
6145  const std::string& orderBy_ = "",
6146  const std::string& bookmark_ = "",
6147  int batchSize_ = DEFAULT_BATCH_SIZE,
6148  int topN_ = DEFAULT_TOP_N,
6149  const std::string& options_ = "",
6150  long timeout_ = DEFAULT_COMMAND_TIMEOUT)
6151  {
6152  MessageStream result(*this);
6153  if (_body.get().getDefaultMaxDepth())
6154  {
6155  result.maxDepth(_body.get().getDefaultMaxDepth());
6156  }
6157  result.setSOWOnly(_body.get().sow(result.operator MessageHandler(),
6158  topic_, filter_, orderBy_, bookmark_,
6159  batchSize_, topN_, options_, timeout_));
6160  return result;
6161  }
6162 
6164  MessageStream sow(const char* topic_,
6165  const std::string& filter_ = "",
6166  const std::string& orderBy_ = "",
6167  const std::string& bookmark_ = "",
6168  int batchSize_ = DEFAULT_BATCH_SIZE,
6169  int topN_ = DEFAULT_TOP_N,
6170  const std::string& options_ = "",
6171  long timeout_ = DEFAULT_COMMAND_TIMEOUT)
6172  {
6173  MessageStream result(*this);
6174  if (_body.get().getDefaultMaxDepth())
6175  {
6176  result.maxDepth(_body.get().getDefaultMaxDepth());
6177  }
6178  result.setSOWOnly(_body.get().sow(result.operator MessageHandler(),
6179  topic_, filter_, orderBy_, bookmark_,
6180  batchSize_, topN_, options_, timeout_));
6181  return result;
6182  }
6205  std::string sow(const MessageHandler& messageHandler_,
6206  const std::string& topic_,
6207  long timeout_,
6208  const std::string& filter_ = "",
6209  int batchSize_ = DEFAULT_BATCH_SIZE,
6210  int topN_ = DEFAULT_TOP_N)
6211  {
6212  return _body.get().sow(messageHandler_, topic_, timeout_, filter_,
6213  batchSize_, topN_);
6214  }
6237  std::string sowAndSubscribe(const MessageHandler& messageHandler_,
6238  const std::string& topic_,
6239  long timeout_,
6240  const std::string& filter_ = "",
6241  int batchSize_ = DEFAULT_BATCH_SIZE,
6242  bool oofEnabled_ = false,
6243  int topN_ = DEFAULT_TOP_N)
6244  {
6245  return _body.get().sowAndSubscribe(messageHandler_, topic_, timeout_,
6246  filter_, batchSize_, oofEnabled_,
6247  topN_);
6248  }
6249 
6269  MessageStream sowAndSubscribe(const std::string& topic_,
6270  long timeout_,
6271  const std::string& filter_ = "",
6272  int batchSize_ = DEFAULT_BATCH_SIZE,
6273  bool oofEnabled_ = false,
6274  int topN_ = DEFAULT_TOP_N)
6275  {
6276  MessageStream result(*this);
6277  if (_body.get().getDefaultMaxDepth())
6278  {
6279  result.maxDepth(_body.get().getDefaultMaxDepth());
6280  }
6281  result.setSubscription(_body.get().sowAndSubscribe(
6282  result.operator MessageHandler(),
6283  topic_, timeout_, filter_,
6284  batchSize_, oofEnabled_,
6285  topN_, false));
6286  return result;
6287  }
6307  MessageStream sowAndSubscribe(const char* topic_,
6308  long timeout_,
6309  const std::string& filter_ = "",
6310  int batchSize_ = DEFAULT_BATCH_SIZE,
6311  bool oofEnabled_ = false,
6312  int topN_ = DEFAULT_TOP_N)
6313  {
6314  MessageStream result(*this);
6315  if (_body.get().getDefaultMaxDepth())
6316  {
6317  result.maxDepth(_body.get().getDefaultMaxDepth());
6318  }
6319  result.setSubscription(_body.get().sowAndSubscribe(
6320  result.operator MessageHandler(),
6321  topic_, timeout_, filter_,
6322  batchSize_, oofEnabled_,
6323  topN_, false));
6324  return result;
6325  }
6326 
6327 
6355  std::string sowAndSubscribe(const MessageHandler& messageHandler_,
6356  const std::string& topic_,
6357  const std::string& filter_ = "",
6358  const std::string& orderBy_ = "",
6359  const std::string& bookmark_ = "",
6360  int batchSize_ = DEFAULT_BATCH_SIZE,
6361  int topN_ = DEFAULT_TOP_N,
6362  const std::string& options_ = "",
6363  long timeout_ = DEFAULT_COMMAND_TIMEOUT)
6364  {
6365  return _body.get().sowAndSubscribe(messageHandler_, topic_, filter_,
6366  orderBy_, bookmark_, batchSize_,
6367  topN_, options_, timeout_);
6368  }
6369 
6394  MessageStream sowAndSubscribe(const std::string& topic_,
6395  const std::string& filter_ = "",
6396  const std::string& orderBy_ = "",
6397  const std::string& bookmark_ = "",
6398  int batchSize_ = DEFAULT_BATCH_SIZE,
6399  int topN_ = DEFAULT_TOP_N,
6400  const std::string& options_ = "",
6401  long timeout_ = DEFAULT_COMMAND_TIMEOUT)
6402  {
6403  MessageStream result(*this);
6404  if (_body.get().getDefaultMaxDepth())
6405  {
6406  result.maxDepth(_body.get().getDefaultMaxDepth());
6407  }
6408  result.setSubscription(_body.get().sowAndSubscribe(
6409  result.operator MessageHandler(),
6410  topic_, filter_, orderBy_,
6411  bookmark_, batchSize_, topN_,
6412  options_, timeout_, false));
6413  return result;
6414  }
6415 
6417  MessageStream sowAndSubscribe(const char* topic_,
6418  const std::string& filter_ = "",
6419  const std::string& orderBy_ = "",
6420  const std::string& bookmark_ = "",
6421  int batchSize_ = DEFAULT_BATCH_SIZE,
6422  int topN_ = DEFAULT_TOP_N,
6423  const std::string& options_ = "",
6424  long timeout_ = DEFAULT_COMMAND_TIMEOUT)
6425  {
6426  MessageStream result(*this);
6427  if (_body.get().getDefaultMaxDepth())
6428  {
6429  result.maxDepth(_body.get().getDefaultMaxDepth());
6430  }
6431  result.setSubscription(_body.get().sowAndSubscribe(
6432  result.operator MessageHandler(),
6433  topic_, filter_, orderBy_,
6434  bookmark_, batchSize_, topN_,
6435  options_, timeout_, false));
6436  return result;
6437  }
6438 
6463  std::string sowAndDeltaSubscribe(const MessageHandler& messageHandler_,
6464  const std::string& topic_,
6465  const std::string& filter_ = "",
6466  const std::string& orderBy_ = "",
6467  int batchSize_ = DEFAULT_BATCH_SIZE,
6468  int topN_ = DEFAULT_TOP_N,
6469  const std::string& options_ = "",
6470  long timeout_ = DEFAULT_COMMAND_TIMEOUT)
6471  {
6472  return _body.get().sowAndDeltaSubscribe(messageHandler_, topic_,
6473  filter_, orderBy_, batchSize_,
6474  topN_, options_, timeout_);
6475  }
6496  MessageStream sowAndDeltaSubscribe(const std::string& topic_,
6497  const std::string& filter_ = "",
6498  const std::string& orderBy_ = "",
6499  int batchSize_ = DEFAULT_BATCH_SIZE,
6500  int topN_ = DEFAULT_TOP_N,
6501  const std::string& options_ = "",
6502  long timeout_ = DEFAULT_COMMAND_TIMEOUT)
6503  {
6504  MessageStream result(*this);
6505  if (_body.get().getDefaultMaxDepth())
6506  {
6507  result.maxDepth(_body.get().getDefaultMaxDepth());
6508  }
6509  result.setSubscription(_body.get().sowAndDeltaSubscribe(
6510  result.operator MessageHandler(),
6511  topic_, filter_, orderBy_,
6512  batchSize_, topN_, options_,
6513  timeout_, false));
6514  return result;
6515  }
6516 
6519  const std::string& filter_ = "",
6520  const std::string& orderBy_ = "",
6521  int batchSize_ = DEFAULT_BATCH_SIZE,
6522  int topN_ = DEFAULT_TOP_N,
6523  const std::string& options_ = "",
6524  long timeout_ = DEFAULT_COMMAND_TIMEOUT)
6525  {
6526  MessageStream result(*this);
6527  if (_body.get().getDefaultMaxDepth())
6528  {
6529  result.maxDepth(_body.get().getDefaultMaxDepth());
6530  }
6531  result.setSubscription(_body.get().sowAndDeltaSubscribe(
6532  result.operator MessageHandler(),
6533  topic_, filter_, orderBy_,
6534  batchSize_, topN_, options_,
6535  timeout_, false));
6536  return result;
6537  }
6538 
6563  std::string sowAndDeltaSubscribe(const MessageHandler& messageHandler_,
6564  const std::string& topic_,
6565  long timeout_,
6566  const std::string& filter_ = "",
6567  int batchSize_ = DEFAULT_BATCH_SIZE,
6568  bool oofEnabled_ = false,
6569  bool sendEmpties_ = false,
6570  int topN_ = DEFAULT_TOP_N)
6571  {
6572  return _body.get().sowAndDeltaSubscribe(messageHandler_, topic_,
6573  timeout_, filter_, batchSize_,
6574  oofEnabled_, sendEmpties_,
6575  topN_);
6576  }
6577 
6599  MessageStream sowAndDeltaSubscribe(const std::string& topic_,
6600  long timeout_,
6601  const std::string& filter_ = "",
6602  int batchSize_ = DEFAULT_BATCH_SIZE,
6603  bool oofEnabled_ = false,
6604  bool sendEmpties_ = false,
6605  int topN_ = DEFAULT_TOP_N)
6606  {
6607  MessageStream result(*this);
6608  if (_body.get().getDefaultMaxDepth())
6609  {
6610  result.maxDepth(_body.get().getDefaultMaxDepth());
6611  }
6612  result.setSubscription(_body.get().sowAndDeltaSubscribe(
6613  result.operator MessageHandler(),
6614  topic_, timeout_, filter_,
6615  batchSize_, oofEnabled_,
6616  sendEmpties_, topN_, false));
6617  return result;
6618  }
6641  long timeout_,
6642  const std::string& filter_ = "",
6643  int batchSize_ = DEFAULT_BATCH_SIZE,
6644  bool oofEnabled_ = false,
6645  bool sendEmpties_ = false,
6646  int topN_ = DEFAULT_TOP_N)
6647  {
6648  MessageStream result(*this);
6649  if (_body.get().getDefaultMaxDepth())
6650  {
6651  result.maxDepth(_body.get().getDefaultMaxDepth());
6652  }
6653  result.setSubscription(_body.get().sowAndDeltaSubscribe(
6654  result.operator MessageHandler(),
6655  topic_, timeout_, filter_,
6656  batchSize_, oofEnabled_,
6657  sendEmpties_, topN_, false));
6658  return result;
6659  }
6679  std::string sowDelete(const MessageHandler& messageHandler,
6680  const std::string& topic,
6681  const std::string& filter,
6682  long timeout)
6683  {
6684  return _body.get().sowDelete(messageHandler, topic, filter, timeout);
6685  }
6702  Message sowDelete(const std::string& topic, const std::string& filter,
6703  long timeout = 0)
6704  {
6705  MessageStream stream(*this);
6706  char buf[Message::IdentifierLength + 1];
6707  buf[Message::IdentifierLength] = 0;
6708  AMPS_snprintf(buf, Message::IdentifierLength + 1, "%lx", MessageImpl::newId());
6709  Field cid(buf);
6710  try
6711  {
6712  stream.setStatsOnly(cid);
6713  _body.get().sowDelete(stream.operator MessageHandler(), topic, filter, timeout, cid);
6714  return *(stream.begin());
6715  }
6716  catch (const DisconnectedException&)
6717  {
6718  removeMessageHandler(cid);
6719  throw;
6720  }
6721  }
6722 
6727  void startTimer()
6728  {
6729  _body.get().startTimer();
6730  }
6731 
6738  std::string stopTimer(const MessageHandler& messageHandler)
6739  {
6740  return _body.get().stopTimer(messageHandler);
6741  }
6742 
6764  std::string sowDeleteByKeys(const MessageHandler& messageHandler_,
6765  const std::string& topic_,
6766  const std::string& keys_,
6767  long timeout_ = 0)
6768  {
6769  return _body.get().sowDeleteByKeys(messageHandler_, topic_, keys_, timeout_);
6770  }
6791  Message sowDeleteByKeys(const std::string& topic_, const std::string& keys_,
6792  long timeout_ = 0)
6793  {
6794  MessageStream stream(*this);
6795  char buf[Message::IdentifierLength + 1];
6796  buf[Message::IdentifierLength] = 0;
6797  AMPS_snprintf(buf, Message::IdentifierLength + 1, "%lx", MessageImpl::newId());
6798  Field cid(buf);
6799  try
6800  {
6801  stream.setStatsOnly(cid);
6802  _body.get().sowDeleteByKeys(stream.operator MessageHandler(), topic_, keys_, timeout_, cid);
6803  return *(stream.begin());
6804  }
6805  catch (const DisconnectedException&)
6806  {
6807  removeMessageHandler(cid);
6808  throw;
6809  }
6810  }
6811 
6826  std::string sowDeleteByData(const MessageHandler& messageHandler_,
6827  const std::string& topic_, const std::string& data_,
6828  long timeout_ = 0)
6829  {
6830  return _body.get().sowDeleteByData(messageHandler_, topic_, data_, timeout_);
6831  }
6832 
6847  Message sowDeleteByData(const std::string& topic_, const std::string& data_,
6848  long timeout_ = 0)
6849  {
6850  MessageStream stream(*this);
6851  char buf[Message::IdentifierLength + 1];
6852  buf[Message::IdentifierLength] = 0;
6853  AMPS_snprintf(buf, Message::IdentifierLength + 1, "%lx", MessageImpl::newId());
6854  Field cid(buf);
6855  try
6856  {
6857  stream.setStatsOnly(cid);
6858  _body.get().sowDeleteByData(stream.operator MessageHandler(), topic_, data_, timeout_, cid);
6859  return *(stream.begin());
6860  }
6861  catch (const DisconnectedException&)
6862  {
6863  removeMessageHandler(cid);
6864  throw;
6865  }
6866  }
6867 
6872  {
6873  return _body.get().getHandle();
6874  }
6875 
6884  void setExceptionListener(const std::shared_ptr<const ExceptionListener>& pListener_)
6885  {
6886  _body.get().setExceptionListener(pListener_);
6887  }
6888 
6898  {
6899  _body.get().setExceptionListener(listener_);
6900  }
6901 
6905  {
6906  return _body.get().getExceptionListener();
6907  }
6908 
6916  // type of message) from the server for the specified interval (plus a grace period),
6930  void setHeartbeat(unsigned heartbeatTime_, unsigned readTimeout_)
6931  {
6932  _body.get().setHeartbeat(heartbeatTime_, readTimeout_);
6933  }
6934 
6942  // type of message) from the server for the specified interval (plus a grace period),
6954  void setHeartbeat(unsigned heartbeatTime_)
6955  {
6956  _body.get().setHeartbeat(heartbeatTime_, 2 * heartbeatTime_);
6957  }
6958 
6961  {
6962  setLastChanceMessageHandler(messageHandler);
6963  }
6964 
6968  {
6969  _body.get().setGlobalCommandTypeMessageHandler(ClientImpl::GlobalCommandTypeHandlers::LastChance,
6970  messageHandler);
6971  }
6972 
6993  void setGlobalCommandTypeMessageHandler(const std::string& command_, const MessageHandler& handler_)
6994  {
6995  _body.get().setGlobalCommandTypeMessageHandler(command_, handler_);
6996  }
6997 
7018  void setGlobalCommandTypeMessageHandler(const Message::Command::Type command_, const MessageHandler& handler_)
7019  {
7020  _body.get().setGlobalCommandTypeMessageHandler(command_, handler_);
7021  }
7022 
7028  static const char* BOOKMARK_NOW()
7029  {
7030  return AMPS_BOOKMARK_NOW;
7031  }
7037  static const char* NOW()
7038  {
7039  return AMPS_BOOKMARK_NOW;
7040  }
7041 
7047  static const char* BOOKMARK_EPOCH()
7048  {
7049  return AMPS_BOOKMARK_EPOCH;
7050  }
7051 
7057  static const char* EPOCH()
7058  {
7059  return AMPS_BOOKMARK_EPOCH;
7060  }
7061 
7068  static const char* BOOKMARK_MOST_RECENT()
7069  {
7070  return AMPS_BOOKMARK_RECENT;
7071  }
7072 
7079  static const char* MOST_RECENT()
7080  {
7081  return AMPS_BOOKMARK_RECENT;
7082  }
7083 
7090  static const char* BOOKMARK_RECENT()
7091  {
7092  return AMPS_BOOKMARK_RECENT;
7093  }
7094 
7095 
7102  {
7103  _body.get().addConnectionStateListener(listener);
7104  }
7105 
7110  {
7111  _body.get().removeConnectionStateListener(listener);
7112  }
7113 
7117  {
7118  _body.get().clearConnectionStateListeners();
7119  }
7120 
7146  std::string executeAsync(Command& command_, MessageHandler handler_)
7147  {
7148  return _body.get().executeAsync(command_, handler_);
7149  }
7150 
7180  std::string executeAsyncNoResubscribe(Command& command_,
7181  MessageHandler handler_)
7182  {
7183  std::string id;
7184  try
7185  {
7186  if (command_.isSubscribe())
7187  {
7188  Message& message = command_.getMessage();
7189  Field subId = message.getSubscriptionId();
7190  bool useExistingHandler = !subId.empty() && !message.getOptions().empty() && message.getOptions().contains("replace", 7);
7191  if (useExistingHandler)
7192  {
7193  MessageHandler existingHandler;
7194  if (_body.get()._routes.getRoute(subId, existingHandler))
7195  {
7196  // we found an existing handler.
7197  _body.get().executeAsync(command_, existingHandler, false);
7198  return id; // empty string indicates existing
7199  }
7200  }
7201  }
7202  id = _body.get().executeAsync(command_, handler_, false);
7203  }
7204  catch (const DisconnectedException&)
7205  {
7206  removeMessageHandler(command_.getMessage().getCommandId());
7207  if (command_.isSubscribe())
7208  {
7209  removeMessageHandler(command_.getMessage().getSubscriptionId());
7210  }
7211  if (command_.isSow())
7212  {
7213  removeMessageHandler(command_.getMessage().getQueryID());
7214  }
7215  throw;
7216  }
7217  return id;
7218  }
7219 
7232  MessageStream execute(Command& command_);
7233 
7242  void ack(Field& topic_, Field& bookmark_, const char* options_ = NULL)
7243  {
7244  _body.get().ack(topic_, bookmark_, options_);
7245  }
7246 
7254  void ack(Message& message_, const char* options_ = NULL)
7255  {
7256  _body.get().ack(message_.getTopic(), message_.getBookmark(), options_);
7257  }
7266  void ack(const std::string& topic_, const std::string& bookmark_,
7267  const char* options_ = NULL)
7268  {
7269  _body.get().ack(Field(topic_.data(), topic_.length()), Field(bookmark_.data(), bookmark_.length()), options_);
7270  }
7271 
7277  void ackDeferredAutoAck(Field& topic_, Field& bookmark_, const char* options_ = NULL)
7278  {
7279  _body.get()._ack(topic_, bookmark_, options_);
7280  }
7290  void flushAcks(void)
7291  {
7292  _body.get().flushAcks();
7293  }
7294 
7299  bool getAutoAck(void) const
7300  {
7301  return _body.get().getAutoAck();
7302  }
7309  void setAutoAck(bool isAutoAckEnabled_)
7310  {
7311  _body.get().setAutoAck(isAutoAckEnabled_);
7312  }
7317  unsigned getAckBatchSize(void) const
7318  {
7319  return _body.get().getAckBatchSize();
7320  }
7327  void setAckBatchSize(const unsigned ackBatchSize_)
7328  {
7329  _body.get().setAckBatchSize(ackBatchSize_);
7330  }
7331 
7338  int getAckTimeout(void) const
7339  {
7340  return _body.get().getAckTimeout();
7341  }
7350  void setAckTimeout(const int ackTimeout_)
7351  {
7352  if (!ackTimeout_ && _body.get().getAckBatchSize() > 1)
7353  {
7354  throw UsageException("Ack timeout must be > 0 when ack batch size > 1");
7355  }
7356  _body.get().setAckTimeout(ackTimeout_);
7357  }
7358 
7359 
7368  void setRetryOnDisconnect(bool isRetryOnDisconnect_)
7369  {
7370  _body.get().setRetryOnDisconnect(isRetryOnDisconnect_);
7371  }
7372 
7377  bool getRetryOnDisconnect(void) const
7378  {
7379  return _body.get().getRetryOnDisconnect();
7380  }
7381 
7386  void setDefaultMaxDepth(unsigned maxDepth_)
7387  {
7388  _body.get().setDefaultMaxDepth(maxDepth_);
7389  }
7390 
7395  unsigned getDefaultMaxDepth(void) const
7396  {
7397  return _body.get().getDefaultMaxDepth();
7398  }
7399 
7407  void* userData_)
7408  {
7409  return _body.get().setTransportFilterFunction(filter_, userData_);
7410  }
7411 
7421  void* userData_)
7422  {
7423  return _body.get().setThreadCreatedCallback(callback_, userData_);
7424  }
7425 
7431  void deferredExecution(DeferredExecutionFunc func_, void* userData_)
7432  {
7433  _body.get().deferredExecution(func_, userData_);
7434  }
7438  };
7439 
7440  inline void
7441  ClientImpl::lastChance(AMPS::Message& message)
7442  {
7443  AMPS_CALL_EXCEPTION_WRAPPER(_globalCommandTypeHandlers[GlobalCommandTypeHandlers::LastChance].invoke(message));
7444  }
7445 
7446  inline unsigned
7447  ClientImpl::persistedAck(AMPS::Message& message)
7448  {
7449  unsigned deliveries = 0;
7450  try
7451  {
7452  /*
7453  * Best Practice: If you don't care about the dupe acks that
7454  * occur during failover or rapid disconnect/reconnect, then just
7455  * ignore them. We could discard each duplicate from the
7456  * persisted store, but the storage costs of doing 1 record
7457  * discards is heavy. In most scenarios we'll just quickly blow
7458  * through the duplicates and get back to processing the
7459  * non-dupes.
7460  */
7461  const char* data = NULL;
7462  size_t len = 0;
7463  const char* status = NULL;
7464  size_t statusLen = 0;
7465  amps_handle messageHandle = message.getMessage();
7466  const size_t NotEntitled = 12, Duplicate = 9, Failure = 7;
7467  amps_message_get_field_value(messageHandle, AMPS_Reason, &data, &len);
7468  amps_message_get_field_value(messageHandle, AMPS_Status, &status, &statusLen);
7469  if (len == NotEntitled || len == Duplicate ||
7470  (statusLen == Failure && status[0] == 'f'))
7471  {
7472  if (_failedWriteHandler)
7473  {
7474  if (_publishStore.isValid())
7475  {
7476  amps_uint64_t sequence =
7477  amps_message_get_field_uint64(messageHandle, AMPS_Sequence);
7478  FailedWriteStoreReplayer replayer(this, data, len);
7479  AMPS_CALL_EXCEPTION_WRAPPER(_publishStore.replaySingle(
7480  replayer, sequence));
7481  }
7482  else // Call the handler with what little we have
7483  {
7484  static Message emptyMessage;
7485  emptyMessage.setSequence(message.getSequence());
7486  AMPS_CALL_EXCEPTION_WRAPPER(
7487  _failedWriteHandler->failedWrite(emptyMessage,
7488  data, len));
7489  }
7490  ++deliveries;
7491  }
7492  }
7493  if (_publishStore.isValid())
7494  {
7495  // Ack for publisher will have sequence while
7496  // ack for bookmark subscribe won't
7497  amps_uint64_t seq = amps_message_get_field_uint64(messageHandle,
7498  AMPS_Sequence);
7499  if (seq > 0)
7500  {
7501  ++deliveries;
7502  AMPS_CALL_EXCEPTION_WRAPPER(_publishStore.discardUpTo(seq));
7503  }
7504  }
7505 
7506  if (!deliveries && _bookmarkStore.isValid())
7507  {
7508  amps_message_get_field_value(messageHandle, AMPS_SubscriptionId,
7509  &data, &len);
7510  if (len > 0)
7511  {
7512  Message::Field subId(data, len);
7513  const char* bookmarkData = NULL;
7514  size_t bookmarkLen = 0;
7515  amps_message_get_field_value(messageHandle,
7516  AMPS_Bookmark,
7517  &bookmarkData,
7518  &bookmarkLen);
7519  // Everything is there and not unsubscribed AC-912
7520  if (bookmarkLen > 0 && _routes.hasRoute(subId))
7521  {
7522  ++deliveries;
7523  _bookmarkStore.persisted(subId, Message::Field(bookmarkData, bookmarkLen));
7524  }
7525  }
7526  }
7527  }
7528  catch (std::exception& ex)
7529  {
7530  AMPS_UNHANDLED_EXCEPTION(ex);
7531  }
7532  return deliveries;
7533  }
7534 
7535  inline unsigned
7536  ClientImpl::processedAck(Message& message)
7537  {
7538  unsigned deliveries = 0;
7539  AckResponse ack;
7540  const char* data = NULL;
7541  size_t len = 0;
7542  amps_handle messageHandle = message.getMessage();
7543  amps_message_get_field_value(messageHandle, AMPS_CommandId, &data, &len);
7544  Lock<Mutex> l(_lock);
7545  if (data && len)
7546  {
7547  Lock<Mutex> guard(_ackMapLock);
7548  AckMap::iterator i = _ackMap.find(std::string(data, len));
7549  if (i != _ackMap.end())
7550  {
7551  ++deliveries;
7552  ack = i->second;
7553  _ackMap.erase(i);
7554  }
7555  }
7556  if (deliveries)
7557  {
7558  amps_message_get_field_value(messageHandle, AMPS_Status, &data, &len);
7559  ack.setStatus(data, len);
7560  amps_message_get_field_value(messageHandle, AMPS_Reason, &data, &len);
7561  ack.setReason(data, len);
7562  amps_message_get_field_value(messageHandle, AMPS_UserId, &data, &len);
7563  ack.setUsername(data, len);
7564  amps_message_get_field_value(messageHandle, AMPS_Password, &data, &len);
7565  ack.setPassword(data, len);
7566  amps_message_get_field_value(messageHandle, AMPS_Version, &data, &len);
7567  ack.setServerVersion(data, len);
7568  amps_message_get_field_value(messageHandle, AMPS_Options, &data, &len);
7569  ack.setOptions(data, len);
7570  // This sets bookmark, nameHashValue, and sequenceNo
7571  ack.setBookmark(message.getBookmark());
7572  ack.setResponded();
7573  _lock.signalAll();
7574  }
7575  return deliveries;
7576  }
7577 
7578  inline void
7579  ClientImpl::checkAndSendHeartbeat(bool force)
7580  {
7581  if (force || _heartbeatTimer.check())
7582  {
7583  _heartbeatTimer.start();
7584  try
7585  {
7586  sendWithoutRetry(_beatMessage);
7587  }
7588  catch (const AMPSException&)
7589  {
7590  ;
7591  }
7592  }
7593  }
7594 
7595  inline ConnectionInfo ClientImpl::getConnectionInfo() const
7596  {
7597  ConnectionInfo info;
7598  std::ostringstream writer;
7599 
7600  info["client.uri"] = _lastUri;
7601  info["client.name"] = _name;
7602  info["client.username"] = _username;
7603  if (_publishStore.isValid())
7604  {
7605  writer << _publishStore.unpersistedCount();
7606  info["publishStore.unpersistedCount"] = writer.str();
7607  writer.clear();
7608  writer.str("");
7609  }
7610 
7611  return info;
7612  }
7613 
7614  inline amps_result
7615  ClientImpl::ClientImplMessageHandler(amps_handle messageHandle_, void* userData_)
7616  {
7617  const unsigned SOWMask = Message::Command::SOW | Message::Command::GroupBegin | Message::Command::GroupEnd;
7618  const unsigned PublishMask = Message::Command::OOF | Message::Command::Publish | Message::Command::DeltaPublish;
7619  ClientImpl* me = (ClientImpl*) userData_;
7620  AMPS_CALL_EXCEPTION_WRAPPER_2(me, me->processDeferredExecutions());
7621  if (!messageHandle_)
7622  {
7623  if (me->_queueAckTimeout)
7624  {
7625  me->checkQueueAcks();
7626  }
7627  me->checkAndSendHeartbeat();
7628  return AMPS_E_OK;
7629  }
7630 
7631  me->_readMessage.replace(messageHandle_);
7632  Message& message = me->_readMessage;
7633  Message::Command::Type commandType = message.getCommandEnum();
7634  if (commandType & SOWMask)
7635  {
7636 #if 0 // Not currently implemented, to avoid an extra branch in delivery
7637  // A small cheat here to get the right handler, using knowledge of the
7638  // Command values of SOW (8), GroupBegin (8192), and GroupEnd (16384)
7639  // and their GlobalCommandTypeHandlers values 1, 2, 3.
7640  AMPS_CALL_EXCEPTION_WRAPPER_2(me,
7641  me->_globalCommandTypeHandlers[1 + (commandType / 8192)].invoke(message));
7642 #endif
7643  AMPS_CALL_EXCEPTION_WRAPPER_2(me, me->_routes.deliverData(message,
7644  message.getQueryID()));
7645  }
7646  else if (commandType & PublishMask)
7647  {
7648 #if 0 // Not currently implemented, to avoid an extra branch in delivery
7649  AMPS_CALL_EXCEPTION_WRAPPER_2(me,
7650  me->_globalCommandTypeHandlers[(commandType == Message::Command::Publish ?
7651  GlobalCommandTypeHandlers::Publish :
7652  GlobalCommandTypeHandlers::OOF)].invoke(message));
7653 #endif
7654  const char* subIds = NULL;
7655  size_t subIdsLen = 0;
7656  // Publish command, send to subscriptions
7657  amps_message_get_field_value(messageHandle_, AMPS_SubscriptionIds,
7658  &subIds, &subIdsLen);
7659  size_t subIdCount = me->_routes.parseRoutes(AMPS::Field(subIds, subIdsLen), me->_routeCache);
7660  for (size_t i = 0; i < subIdCount; ++i)
7661  {
7662  MessageRouter::RouteCache::value_type& lookupResult = me->_routeCache[i];
7663  MessageHandler& handler = lookupResult.handler;
7664  if (handler.isValid())
7665  {
7666  amps_message_set_field_value(messageHandle_,
7667  AMPS_SubscriptionId,
7668  subIds + lookupResult.idOffset,
7669  lookupResult.idLength);
7670  Message::Field bookmark = message.getBookmark();
7671  bool isMessageQueue = message.getLeasePeriod().len() != 0;
7672  bool isAutoAck = me->_isAutoAckEnabled;
7673 
7674  if (!isMessageQueue && !bookmark.empty() &&
7675  me->_bookmarkStore.isValid())
7676  {
7677  if (me->_bookmarkStore.isDiscarded(me->_readMessage))
7678  {
7679  //Call duplicate message handler in handlers map
7680  if (me->_globalCommandTypeHandlers[GlobalCommandTypeHandlers::DuplicateMessage].isValid())
7681  {
7682  AMPS_CALL_EXCEPTION_WRAPPER_2(me, me->_globalCommandTypeHandlers[GlobalCommandTypeHandlers::DuplicateMessage].invoke(message));
7683  }
7684  }
7685  else
7686  {
7687  me->_bookmarkStore.log(me->_readMessage);
7688  AMPS_CALL_EXCEPTION_WRAPPER_2(me,
7689  handler.invoke(message));
7690  }
7691  }
7692  else
7693  {
7694  if (isMessageQueue && isAutoAck)
7695  {
7696  try
7697  {
7698  AMPS_CALL_EXCEPTION_WRAPPER_STREAM_FULL_2(me, handler.invoke(message));
7699  if (!message.getIgnoreAutoAck())
7700  {
7701  AMPS_CALL_EXCEPTION_WRAPPER_2(me,
7702  me->_ack(message.getTopic(), message.getBookmark()));
7703  }
7704  }
7705  catch (std::exception& ex)
7706  {
7707  if (!message.getIgnoreAutoAck())
7708  {
7709  AMPS_CALL_EXCEPTION_WRAPPER_2(me,
7710  me->_ack(message.getTopic(), message.getBookmark(), "cancel"));
7711  }
7712  AMPS_UNHANDLED_EXCEPTION_2(me, ex);
7713  }
7714  }
7715  else
7716  {
7717  AMPS_CALL_EXCEPTION_WRAPPER_2(me,
7718  handler.invoke(message));
7719  }
7720  }
7721  }
7722  else
7723  {
7724  me->lastChance(message);
7725  }
7726  } // for (subidsEnd)
7727  }
7728  else if (commandType == Message::Command::Ack)
7729  {
7730  AMPS_CALL_EXCEPTION_WRAPPER_2(me,
7731  me->_globalCommandTypeHandlers[GlobalCommandTypeHandlers::Ack].invoke(message));
7732  unsigned ackType = message.getAckTypeEnum();
7733  unsigned deliveries = 0U;
7734  switch (ackType)
7735  {
7736  case Message::AckType::Persisted:
7737  deliveries += me->persistedAck(message);
7738  break;
7739  case Message::AckType::Processed: // processed
7740  deliveries += me->processedAck(message);
7741  break;
7742  }
7743  AMPS_CALL_EXCEPTION_WRAPPER_2(me, deliveries += me->_routes.deliverAck(message, ackType));
7744  if (deliveries == 0)
7745  {
7746  me->lastChance(message);
7747  }
7748  }
7749  else if (commandType == Message::Command::Heartbeat)
7750  {
7751  AMPS_CALL_EXCEPTION_WRAPPER_2(me,
7752  me->_globalCommandTypeHandlers[GlobalCommandTypeHandlers::Heartbeat].invoke(message));
7753  if (me->_heartbeatTimer.getTimeout() != 0.0) // -V550
7754  {
7755  me->checkAndSendHeartbeat(true);
7756  }
7757  else
7758  {
7759  me->lastChance(message);
7760  }
7761  return AMPS_E_OK;
7762  }
7763  else if (!message.getCommandId().empty())
7764  {
7765  unsigned deliveries = 0U;
7766  try
7767  {
7768  while (me->_connected) // Keep sending heartbeats when stream is full
7769  {
7770  try
7771  {
7772  deliveries = me->_routes.deliverData(message, message.getCommandId());
7773  break;
7774  }
7775 #ifdef _WIN32
7776  catch (MessageStreamFullException&)
7777 #else
7778  catch (MessageStreamFullException& ex_)
7779 #endif
7780  {
7781  try
7782  {
7783  me->checkAndSendHeartbeat(false);
7784  }
7785 #ifdef _WIN32
7786  catch (std::exception&)
7787 #else
7788  catch (std::exception& ex_)
7789 #endif
7790  {
7791  ;
7792  }
7793  }
7794  }
7795  }
7796  catch (std::exception& ex_)
7797  {
7798  try
7799  {
7800  me->_exceptionListener->exceptionThrown(ex_);
7801  }
7802  catch (...)
7803  {
7804  ;
7805  }
7806  }
7807  if (deliveries == 0)
7808  {
7809  me->lastChance(message);
7810  }
7811  }
7812  me->checkAndSendHeartbeat();
7813  return AMPS_E_OK;
7814  }
7815 
7816  inline void
7817  ClientImpl::ClientImplPreDisconnectHandler(amps_handle /*client*/, unsigned failedConnectionVersion, void* userData)
7818  {
7819  ClientImpl* me = (ClientImpl*) userData;
7820  //Client wrapper(me);
7821  // Go ahead and signal any waiters if they are around...
7822  me->clearAcks(failedConnectionVersion);
7823  }
7824 
7825  inline amps_result
7826  ClientImpl::ClientImplDisconnectHandler(amps_handle /*client*/, void* userData)
7827  {
7828  ClientImpl* me = (ClientImpl*) userData;
7829  Lock<Mutex> l(me->_lock);
7830  Client wrapper(me, false);
7831  if (me->_connected)
7832  {
7833  me->broadcastConnectionStateChanged(ConnectionStateListener::Disconnected);
7834  }
7835  while (true)
7836  {
7837  AtomicFlagFlip subFlip(&me->_badTimeToHASubscribe);
7838  bool retryInProgress = false;
7839  try
7840  {
7841  me->_connected = false;
7842  me->_lock.signalAll();
7843  // Have to release the lock here or receive thread can't
7844  // invoke the message handler.
7845  Unlock<Mutex> unlock(me->_lock);
7846  me->_disconnectHandler.invoke(wrapper);
7847  }
7848 #ifdef _WIN32
7849  catch (const RetryOperationException&)
7850 #else
7851  catch (const RetryOperationException& ex)
7852 #endif
7853  {
7854  retryInProgress = true;
7855  }
7856  catch (const std::exception& ex)
7857  {
7858  AMPS_UNHANDLED_EXCEPTION_2(me, ex);
7859  }
7860  me->_lock.signalAll();
7861 
7862  if (!me->_connected)
7863  {
7864  if (retryInProgress)
7865  {
7866  AMPS_UNHANDLED_EXCEPTION_2(me, RetryOperationException("Reconnect in progress."));
7867  }
7868  else
7869  {
7870  me->broadcastConnectionStateChanged(ConnectionStateListener::Shutdown);
7871  AMPS_UNHANDLED_EXCEPTION_2(me, DisconnectedException("Reconnect failed."));
7872  }
7873  return AMPS_E_DISCONNECTED;
7874  }
7875  try
7876  {
7877  // Resubscribe
7878  if (me->_subscriptionManager)
7879  {
7880  {
7881  // Have to release the lock here or receive thread can't
7882  // invoke the message handler.
7883  Unlock<Mutex> unlock(me->_lock);
7884  me->_subscriptionManager->resubscribe(wrapper);
7885  }
7886  me->broadcastConnectionStateChanged(ConnectionStateListener::Resubscribed);
7887  }
7888  return AMPS_E_OK;
7889  }
7890  catch (const AMPSException& subEx)
7891  {
7892  AMPS_UNHANDLED_EXCEPTION_2(me, subEx);
7893  }
7894  catch (const std::exception& subEx)
7895  {
7896  AMPS_UNHANDLED_EXCEPTION_2(me, subEx);
7897  return AMPS_E_RETRY;
7898  }
7899  catch (...)
7900  {
7901  return AMPS_E_RETRY;
7902  }
7903  }
7904  return AMPS_E_RETRY;
7905  }
7906 
7907  class FIX
7908  {
7909  const char* _data;
7910  size_t _len;
7911  char _fieldSep;
7912  public:
7913  class iterator
7914  {
7915  const char* _data;
7916  size_t _len;
7917  size_t _pos;
7918  char _fieldSep;
7919  iterator(const char* data_, size_t len_, size_t pos_, char fieldSep_)
7920  : _data(data_), _len(len_), _pos(pos_), _fieldSep(fieldSep_)
7921  {
7922  while (_pos != _len && _data[_pos] == _fieldSep)
7923  {
7924  ++_pos;
7925  }
7926  }
7927  public:
7928  typedef void* difference_type;
7929  typedef std::forward_iterator_tag iterator_category;
7930  typedef std::pair<Message::Field, Message::Field> value_type;
7931  typedef value_type* pointer;
7932  typedef value_type& reference;
7933  bool operator==(const iterator& rhs) const
7934  {
7935  return _pos == rhs._pos;
7936  }
7937  bool operator!=(const iterator& rhs) const
7938  {
7939  return _pos != rhs._pos;
7940  }
7941  iterator& operator++()
7942  {
7943  // Skip through the data
7944  while (_pos != _len && _data[_pos] != _fieldSep)
7945  {
7946  ++_pos;
7947  }
7948  // Skip through any field separators
7949  while (_pos != _len && _data[_pos] == _fieldSep)
7950  {
7951  ++_pos;
7952  }
7953  return *this;
7954  }
7955 
7956  value_type operator*() const
7957  {
7958  value_type result;
7959  size_t i = _pos, keyLength = 0, valueStart = 0, valueLength = 0;
7960  for (; i < _len && _data[i] != '='; ++i)
7961  {
7962  ++keyLength;
7963  }
7964 
7965  result.first.assign(_data + _pos, keyLength);
7966 
7967  if (i < _len && _data[i] == '=')
7968  {
7969  ++i;
7970  valueStart = i;
7971  for (; i < _len && _data[i] != _fieldSep; ++i)
7972  {
7973  valueLength++;
7974  }
7975  }
7976  result.second.assign(_data + valueStart, valueLength);
7977  return result;
7978  }
7979 
7980  friend class FIX;
7981  };
7982  class reverse_iterator
7983  {
7984  const char* _data;
7985  size_t _len;
7986  const char* _pos;
7987  char _fieldSep;
7988  public:
7989  typedef std::pair<Message::Field, Message::Field> value_type;
7990  reverse_iterator(const char* data, size_t len, const char* pos, char fieldsep)
7991  : _data(data), _len(len), _pos(pos), _fieldSep(fieldsep)
7992  {
7993  if (_pos)
7994  {
7995  // skip past meaningless trailing fieldseps
7996  while (_pos >= _data && *_pos == _fieldSep)
7997  {
7998  --_pos;
7999  }
8000  while (_pos > _data && *_pos != _fieldSep)
8001  {
8002  --_pos;
8003  }
8004  // if we stopped before the 0th character, it's because
8005  // it's a field sep. advance one to point to the first character
8006  // of a key.
8007  if (_pos > _data || (_pos == _data && *_pos == _fieldSep))
8008  {
8009  ++_pos;
8010  }
8011  if (_pos < _data)
8012  {
8013  _pos = 0;
8014  }
8015  }
8016  }
8017  bool operator==(const reverse_iterator& rhs) const
8018  {
8019  return _pos == rhs._pos;
8020  }
8021  bool operator!=(const reverse_iterator& rhs) const
8022  {
8023  return _pos != rhs._pos;
8024  }
8025  reverse_iterator& operator++()
8026  {
8027  if (_pos == _data)
8028  {
8029  _pos = 0;
8030  }
8031  else
8032  {
8033  // back up 1 to a field separator
8034  --_pos;
8035  // keep backing up through field separators
8036  while (_pos >= _data && *_pos == _fieldSep)
8037  {
8038  --_pos;
8039  }
8040  // now back up to the beginning of this field
8041  while (_pos > _data && *_pos != _fieldSep)
8042  {
8043  --_pos;
8044  }
8045  if (_pos > _data || (_pos == _data && *_pos == _fieldSep))
8046  {
8047  ++_pos;
8048  }
8049  if (_pos < _data)
8050  {
8051  _pos = 0;
8052  }
8053  }
8054  return *this;
8055  }
8056  value_type operator*() const
8057  {
8058  value_type result;
8059  size_t keyLength = 0, valueStart = 0, valueLength = 0;
8060  size_t i = (size_t)(_pos - _data);
8061  for (; i < _len && _data[i] != '='; ++i)
8062  {
8063  ++keyLength;
8064  }
8065  result.first.assign(_pos, keyLength);
8066  if (i < _len && _data[i] == '=')
8067  {
8068  ++i;
8069  valueStart = i;
8070  for (; i < _len && _data[i] != _fieldSep; ++i)
8071  {
8072  valueLength++;
8073  }
8074  }
8075  result.second.assign(_data + valueStart, valueLength);
8076  return result;
8077  }
8078  };
8079  FIX(const Message::Field& data, char fieldSeparator = 1)
8080  : _data(data.data()), _len(data.len()),
8081  _fieldSep(fieldSeparator)
8082  {
8083  }
8084 
8085  FIX(const char* data, size_t len, char fieldSeparator = 1)
8086  : _data(data), _len(len), _fieldSep(fieldSeparator)
8087  {
8088  }
8089 
8090  iterator begin() const
8091  {
8092  return iterator(_data, _len, 0, _fieldSep);
8093  }
8094  iterator end() const
8095  {
8096  return iterator(_data, _len, _len, _fieldSep);
8097  }
8098 
8099 
8100  reverse_iterator rbegin() const
8101  {
8102  return reverse_iterator(_data, _len, _data + (_len - 1), _fieldSep);
8103  }
8104 
8105  reverse_iterator rend() const
8106  {
8107  return reverse_iterator(_data, _len, 0, _fieldSep);
8108  }
8109  };
8110 
8111 
8124 
8125  template <class T>
8127  {
8128  std::stringstream _data;
8129  char _fs;
8130  public:
8136  _FIXBuilder(char fieldSep_ = (char)1) : _fs(fieldSep_) {;}
8137 
8145  void append(const T& tag, const char* value, size_t offset, size_t length)
8146  {
8147  _data << tag << '=';
8148  _data.write(value + offset, (std::streamsize)length);
8149  _data << _fs;
8150  }
8156  void append(const T& tag, const std::string& value)
8157  {
8158  _data << tag << '=' << value << _fs;
8159  }
8160 
8163  std::string getString() const
8164  {
8165  return _data.str();
8166  }
8167  operator std::string() const
8168  {
8169  return _data.str();
8170  }
8171 
8173  void reset()
8174  {
8175  _data.str(std::string());
8176  }
8177  };
8178 
8182 
8184 
8188 
8190 
8191 
8199 
8201  {
8202  char _fs;
8203  public:
8208  FIXShredder(char fieldSep_ = (char)1) : _fs(fieldSep_) {;}
8209 
8212  typedef std::map<Message::Field, Message::Field> map_type;
8213 
8219  map_type toMap(const Message::Field& data)
8220  {
8221  FIX fix(data, _fs);
8222  map_type retval;
8223  for (FIX::iterator a = fix.begin(); a != fix.end(); ++a)
8224  {
8225  retval.insert(*a);
8226  }
8227 
8228  return retval;
8229  }
8230  };
8231 
8232 #define AMPS_MESSAGE_STREAM_CACHE_MAX 128
8233  class MessageStreamImpl : public AMPS::RefBody, AMPS::ConnectionStateListener
8234  {
8235  Mutex _lock;
8236  std::deque<Message> _q;
8237  std::deque<Message> _cache;
8238  std::string _commandId;
8239  std::string _subId;
8240  std::string _queryId;
8241  Client _client;
8242  unsigned _timeout;
8243  unsigned _maxDepth;
8244  unsigned _requestedAcks;
8245  size_t _cacheMax;
8246  Message::Field _previousTopic;
8247  Message::Field _previousBookmark;
8248  typedef enum : unsigned int { Unset = 0x0, Running = 0x10, Subscribe = 0x11, SOWOnly = 0x12, AcksOnly = 0x13, Conflate = 0x14, Closed = 0x1, Disconnected = 0x2 } State;
8249 #if __cplusplus >= 201100L || _MSC_VER >= 1900
8250  std::atomic<State> _state;
8251 #else
8252  volatile State _state;
8253 #endif
8254  typedef std::map<std::string, Message*> SOWKeyMap;
8255  SOWKeyMap _sowKeyMap;
8256  public:
8257  MessageStreamImpl(const Client& client_)
8258  : _client(client_),
8259  _timeout(0),
8260  _maxDepth((unsigned)~0),
8261  _requestedAcks(0),
8262  _cacheMax(AMPS_MESSAGE_STREAM_CACHE_MAX),
8263  _state(Unset)
8264  {
8265  if (_client.isValid())
8266  {
8267  _client.addConnectionStateListener(this);
8268  }
8269  }
8270 
8271  MessageStreamImpl(ClientImpl* client_)
8272  : _client(client_),
8273  _timeout(0),
8274  _maxDepth((unsigned)~0),
8275  _requestedAcks(0),
8276  _state(Unset)
8277  {
8278  if (_client.isValid())
8279  {
8280  _client.addConnectionStateListener(this);
8281  }
8282  }
8283 
8284  ~MessageStreamImpl()
8285  {
8286  }
8287 
8288  virtual void destroy()
8289  {
8290  try
8291  {
8292  close();
8293  }
8294  catch (std::exception& e)
8295  {
8296  try
8297  {
8298  if (_client.isValid())
8299  {
8300  _client.getExceptionListener().exceptionThrown(e);
8301  }
8302  }
8303  catch (...) {/*Ignore exception listener exceptions*/} // -V565
8304  }
8305  if (_client.isValid())
8306  {
8307  _client.removeConnectionStateListener(this);
8308  Client c = _client;
8309  _client = Client((ClientImpl*)NULL);
8310  c.deferredExecution(MessageStreamImpl::destroyer, this);
8311  }
8312  else
8313  {
8314  delete this;
8315  }
8316  }
8317 
8318  static void destroyer(void* vpMessageStreamImpl_)
8319  {
8320  delete ((MessageStreamImpl*)vpMessageStreamImpl_);
8321  }
8322 
8323  void setSubscription(const std::string& subId_,
8324  const std::string& commandId_ = "",
8325  const std::string& queryId_ = "")
8326  {
8327  Lock<Mutex> lock(_lock);
8328  _subId = subId_;
8329  if (!commandId_.empty() && commandId_ != subId_)
8330  {
8331  _commandId = commandId_;
8332  }
8333  if (!queryId_.empty() && queryId_ != subId_ && queryId_ != commandId_)
8334  {
8335  _queryId = queryId_;
8336  }
8337  // It's possible to disconnect between creation/registration and here.
8338  if (Disconnected == _state)
8339  {
8340  return;
8341  }
8342  assert(Unset == _state);
8343  _state = Subscribe;
8344  }
8345 
8346  void setSOWOnly(const std::string& commandId_,
8347  const std::string& queryId_ = "")
8348  {
8349  Lock<Mutex> lock(_lock);
8350  _commandId = commandId_;
8351  if (!queryId_.empty() && queryId_ != commandId_)
8352  {
8353  _queryId = queryId_;
8354  }
8355  // It's possible to disconnect between creation/registration and here.
8356  if (Disconnected == _state)
8357  {
8358  return;
8359  }
8360  assert(Unset == _state);
8361  _state = SOWOnly;
8362  }
8363 
8364  void setStatsOnly(const std::string& commandId_,
8365  const std::string& queryId_ = "")
8366  {
8367  Lock<Mutex> lock(_lock);
8368  _commandId = commandId_;
8369  if (!queryId_.empty() && queryId_ != commandId_)
8370  {
8371  _queryId = queryId_;
8372  }
8373  // It's possible to disconnect between creation/registration and here.
8374  if (Disconnected == _state)
8375  {
8376  return;
8377  }
8378  assert(Unset == _state);
8379  _state = AcksOnly;
8380  _requestedAcks = Message::AckType::Stats;
8381  }
8382 
8383  void setAcksOnly(const std::string& commandId_, unsigned acks_)
8384  {
8385  Lock<Mutex> lock(_lock);
8386  _commandId = commandId_;
8387  // It's possible to disconnect between creation/registration and here.
8388  if (Disconnected == _state)
8389  {
8390  return;
8391  }
8392  assert(Unset == _state);
8393  _state = AcksOnly;
8394  _requestedAcks = acks_;
8395  }
8396 
8397  void connectionStateChanged(ConnectionStateListener::State state_)
8398  {
8399  Lock<Mutex> lock(_lock);
8400  if (state_ == AMPS::ConnectionStateListener::Disconnected)
8401  {
8402  _state = Disconnected;
8403  close();
8404  }
8405  else if (state_ == AMPS::ConnectionStateListener::Connected
8406  && _commandId.empty()
8407  && _subId.empty()
8408  && _queryId.empty())
8409  {
8410  // AC-1331 Reconnect before command was sent, so Unset
8411  _state = Unset;
8412  }
8413  _lock.signalAll();
8414  }
8415 
8416  void timeout(unsigned timeout_)
8417  {
8418  _timeout = timeout_;
8419  }
8420  void conflate(void)
8421  {
8422  if (_state == Subscribe)
8423  {
8424  _state = Conflate;
8425  }
8426  }
8427  void maxDepth(unsigned maxDepth_)
8428  {
8429  if (maxDepth_)
8430  {
8431  _maxDepth = maxDepth_;
8432  }
8433  else
8434  {
8435  _maxDepth = (unsigned)~0;
8436  }
8437  }
8438  unsigned getMaxDepth(void) const
8439  {
8440  return _maxDepth;
8441  }
8442  unsigned getDepth(void) const
8443  {
8444  return (unsigned)(_q.size());
8445  }
8446 
8447  bool next(Message& current_)
8448  {
8449  Lock<Mutex> lock(_lock);
8450  if (!_previousTopic.empty() && !_previousBookmark.empty())
8451  {
8452  try
8453  {
8454  if (_client.isValid())
8455  {
8456  _client.ackDeferredAutoAck(_previousTopic, _previousBookmark);
8457  }
8458  }
8459 #ifdef _WIN32
8460  catch (AMPSException&)
8461 #else
8462  catch (AMPSException& e)
8463 #endif
8464  {
8465  current_.invalidate();
8466  _previousTopic.clear();
8467  _previousBookmark.clear();
8468  return false;
8469  }
8470  _previousTopic.clear();
8471  _previousBookmark.clear();
8472  }
8473  // Don't wait to wait more than 1s at a time
8474  long minWaitTime = (_timeout && _timeout < 1000) ? _timeout : 1000;
8475  Timer timer((double)_timeout);
8476  timer.start();
8477  while (_q.empty() && _state & Running)
8478  {
8479  // Using timeout so python can interrupt
8480  _lock.wait(minWaitTime);
8481  {
8482  Unlock<Mutex> unlck(_lock);
8483  amps_invoke_waiting_function();
8484  }
8485  if (_timeout)
8486  {
8487  // In case we woke up early, see how much longer to wait
8488  if (timer.checkAndGetRemaining(&minWaitTime))
8489  {
8490  // No time left
8491  break;
8492  }
8493  // Adjust next wait time
8494  minWaitTime = (minWaitTime < 1000) ? minWaitTime : 1000;
8495  }
8496  }
8497  if (current_.isValid() && _cache.size() < _cacheMax)
8498  {
8499  current_.reset();
8500  _cache.push_back(current_);
8501  }
8502  if (!_q.empty())
8503  {
8504  current_ = _q.front();
8505  if (_q.size() == _maxDepth)
8506  {
8507  _lock.signalAll();
8508  }
8509  _q.pop_front();
8510  if (_state == Conflate)
8511  {
8512  std::string sowKey = current_.getSowKey();
8513  if (sowKey.length())
8514  {
8515  _sowKeyMap.erase(sowKey);
8516  }
8517  }
8518  else if (_state == AcksOnly)
8519  {
8520  _requestedAcks &= ~(current_.getAckTypeEnum());
8521  }
8522  if ((_state == AcksOnly && _requestedAcks == 0) ||
8523  (_state == SOWOnly && current_.getCommand() == "group_end"))
8524  {
8525  _state = Closed;
8526  }
8527  else if (current_.isValid()
8528  && current_.getCommandEnum() == Message::Command::Publish
8529  && _client.isValid() && _client.getAutoAck()
8530  && !current_.getLeasePeriod().empty()
8531  && !current_.getBookmark().empty())
8532  {
8533  _previousTopic = current_.getTopic().deepCopy();
8534  _previousBookmark = current_.getBookmark().deepCopy();
8535  }
8536  return true;
8537  }
8538  if (_state == Disconnected)
8539  {
8540  throw DisconnectedException("Connection closed.");
8541  }
8542  current_.invalidate();
8543  if (_state == Closed)
8544  {
8545  return false;
8546  }
8547  return _timeout != 0;
8548  }
8549  void close(void)
8550  {
8551  if (_client.isValid())
8552  {
8553  if (_state == SOWOnly || _state == Subscribe) //not delete
8554  {
8555  if (!_commandId.empty())
8556  {
8557  _client.unsubscribe(_commandId);
8558  }
8559  if (!_subId.empty())
8560  {
8561  _client.unsubscribe(_subId);
8562  }
8563  if (!_queryId.empty())
8564  {
8565  _client.unsubscribe(_queryId);
8566  }
8567  }
8568  else
8569  {
8570  if (!_commandId.empty())
8571  {
8572  _client.removeMessageHandler(_commandId);
8573  }
8574  if (!_subId.empty())
8575  {
8576  _client.removeMessageHandler(_subId);
8577  }
8578  if (!_queryId.empty())
8579  {
8580  _client.removeMessageHandler(_queryId);
8581  }
8582  }
8583  }
8584  if (_state == SOWOnly || _state == Subscribe || _state == Unset)
8585  {
8586  _state = Closed;
8587  }
8588  }
8589  static void _messageHandler(const Message& message_, MessageStreamImpl* this_)
8590  {
8591  Lock<Mutex> lock(this_->_lock);
8592  if (this_->_state != Conflate)
8593  {
8594  AMPS_TESTING_SLOW_MESSAGE_STREAM
8595  if (this_->_q.size() >= this_->_maxDepth)
8596  {
8597  // We throw here so that heartbeats can be sent. The exception
8598  // will be handled internally only, and the same Message will
8599  // come back to try again. Make sure to signal.
8600  this_->_lock.signalAll();
8601  throw MessageStreamFullException("Stream is currently full.");
8602  }
8603  if (!this_->_cache.empty())
8604  {
8605  this_->_cache.front().deepCopy(message_);
8606  this_->_q.push_back(this_->_cache.front());
8607  this_->_cache.pop_front();
8608  }
8609  else
8610  {
8611 #ifdef AMPS_USE_EMPLACE
8612  this_->_q.emplace_back(message_.deepCopy());
8613 #else
8614  this_->_q.push_back(message_.deepCopy());
8615 #endif
8616  }
8617  if (message_.getCommandEnum() == Message::Command::Publish &&
8618  this_->_client.isValid() && this_->_client.getAutoAck() &&
8619  !message_.getLeasePeriod().empty() &&
8620  !message_.getBookmark().empty())
8621  {
8622  message_.setIgnoreAutoAck();
8623  }
8624  }
8625  else
8626  {
8627  std::string sowKey = message_.getSowKey();
8628  if (sowKey.length())
8629  {
8630  SOWKeyMap::iterator it = this_->_sowKeyMap.find(sowKey);
8631  if (it != this_->_sowKeyMap.end())
8632  {
8633  it->second->deepCopy(message_);
8634  }
8635  else
8636  {
8637  if (this_->_q.size() >= this_->_maxDepth)
8638  {
8639  // We throw here so that heartbeats can be sent. The
8640  // exception will be handled internally only, and the
8641  // same Message will come back to try again. Make sure
8642  // to signal.
8643  this_->_lock.signalAll();
8644  throw MessageStreamFullException("Stream is currently full.");
8645  }
8646  if (!this_->_cache.empty())
8647  {
8648  this_->_cache.front().deepCopy(message_);
8649  this_->_q.push_back(this_->_cache.front());
8650  this_->_cache.pop_front();
8651  }
8652  else
8653  {
8654 #ifdef AMPS_USE_EMPLACE
8655  this_->_q.emplace_back(message_.deepCopy());
8656 #else
8657  this_->_q.push_back(message_.deepCopy());
8658 #endif
8659  }
8660  this_->_sowKeyMap[sowKey] = &(this_->_q.back());
8661  }
8662  }
8663  else
8664  {
8665  if (this_->_q.size() >= this_->_maxDepth)
8666  {
8667  // We throw here so that heartbeats can be sent. The exception
8668  // will be handled internally only, and the same Message will
8669  // come back to try again. Make sure to signal.
8670  this_->_lock.signalAll();
8671  throw MessageStreamFullException("Stream is currently full.");
8672  }
8673  if (!this_->_cache.empty())
8674  {
8675  this_->_cache.front().deepCopy(message_);
8676  this_->_q.push_back(this_->_cache.front());
8677  this_->_cache.pop_front();
8678  }
8679  else
8680  {
8681 #ifdef AMPS_USE_EMPLACE
8682  this_->_q.emplace_back(message_.deepCopy());
8683 #else
8684  this_->_q.push_back(message_.deepCopy());
8685 #endif
8686  }
8687  if (message_.getCommandEnum() == Message::Command::Publish &&
8688  this_->_client.isValid() && this_->_client.getAutoAck() &&
8689  !message_.getLeasePeriod().empty() &&
8690  !message_.getBookmark().empty())
8691  {
8692  message_.setIgnoreAutoAck();
8693  }
8694  }
8695  }
8696  this_->_lock.signalAll();
8697  }
8698  };
8699  inline MessageStream::MessageStream(void)
8700  {
8701  }
8702  inline MessageStream::MessageStream(const Client& client_)
8703  : _body(new MessageStreamImpl(client_))
8704  {
8705  }
8706  inline void MessageStream::iterator::advance(void)
8707  {
8708  _pStream = _pStream->_body->next(_current) ? _pStream : NULL;
8709  }
8710  inline MessageStream::operator MessageHandler(void)
8711  {
8712  return MessageHandler((void(*)(const Message&, void*))MessageStreamImpl::_messageHandler, &_body.get());
8713  }
8714  inline MessageStream MessageStream::fromExistingHandler(const MessageHandler& handler_)
8715  {
8716  MessageStream result;
8717  if (handler_._func == (MessageHandler::FunctionType)MessageStreamImpl::_messageHandler)
8718  {
8719  result._body = (MessageStreamImpl*)(handler_._userData);
8720  }
8721  return result;
8722  }
8723 
8724  inline void MessageStream::setSOWOnly(const std::string& commandId_,
8725  const std::string& queryId_)
8726  {
8727  _body->setSOWOnly(commandId_, queryId_);
8728  }
8729  inline void MessageStream::setSubscription(const std::string& subId_,
8730  const std::string& commandId_,
8731  const std::string& queryId_)
8732  {
8733  _body->setSubscription(subId_, commandId_, queryId_);
8734  }
8735  inline void MessageStream::setStatsOnly(const std::string& commandId_,
8736  const std::string& queryId_)
8737  {
8738  _body->setStatsOnly(commandId_, queryId_);
8739  }
8740  inline void MessageStream::setAcksOnly(const std::string& commandId_,
8741  unsigned acks_)
8742  {
8743  _body->setAcksOnly(commandId_, acks_);
8744  }
8745  inline MessageStream MessageStream::timeout(unsigned timeout_)
8746  {
8747  _body->timeout(timeout_);
8748  return *this;
8749  }
8751  {
8752  _body->conflate();
8753  return *this;
8754  }
8755  inline MessageStream MessageStream::maxDepth(unsigned maxDepth_)
8756  {
8757  _body->maxDepth(maxDepth_);
8758  return *this;
8759  }
8760  inline unsigned MessageStream::getMaxDepth(void) const
8761  {
8762  return _body->getMaxDepth();
8763  }
8764  inline unsigned MessageStream::getDepth(void) const
8765  {
8766  return _body->getDepth();
8767  }
8768 
8769  inline MessageStream ClientImpl::getEmptyMessageStream(void)
8770  {
8771  return *(_pEmptyMessageStream.get());
8772  }
8773 
8775  {
8776  // If the command is sow and has a sub_id, OR
8777  // if the command has a replace option, return the existing
8778  // messagestream, don't create a new one.
8779  ClientImpl& body = _body.get();
8780  Message& message = command_.getMessage();
8781  Field subId = message.getSubscriptionId();
8782  unsigned ackTypes = message.getAckTypeEnum();
8783  bool useExistingHandler = !subId.empty() && ((!message.getOptions().empty() && message.getOptions().contains("replace", 7)) || message.getCommandEnum() == Message::Command::SOW);
8784  if (useExistingHandler)
8785  {
8786  // Try to find the existing message handler.
8787  if (!subId.empty())
8788  {
8789  MessageHandler existingHandler;
8790  if (body._routes.getRoute(subId, existingHandler))
8791  {
8792  // we found an existing handler. It might not be a message stream, but that's okay.
8793  body.executeAsync(command_, existingHandler, false);
8794  return MessageStream::fromExistingHandler(existingHandler);
8795  }
8796  }
8797  // fall through; we'll a new handler altogether.
8798  }
8799  // Make sure something will be returned to the stream or use the empty one
8800  // Check that: it's a command that doesn't normally return data, and there
8801  // are no acks requested for the cmd id
8802  Message::Command::Type command = message.getCommandEnum();
8803  if ((command & Message::Command::NoDataCommands)
8804  && (ackTypes == Message::AckType::Persisted
8805  || ackTypes == Message::AckType::None))
8806  {
8807  executeAsync(command_, MessageHandler());
8808  if (!body._pEmptyMessageStream)
8809  {
8810  body._pEmptyMessageStream.reset(new MessageStream((ClientImpl*)0));
8811  body._pEmptyMessageStream.get()->_body->close();
8812  }
8813  return body.getEmptyMessageStream();
8814  }
8815  MessageStream stream(*this);
8816  if (body.getDefaultMaxDepth())
8817  {
8818  stream.maxDepth(body.getDefaultMaxDepth());
8819  }
8820  MessageHandler handler = stream.operator MessageHandler();
8821  std::string commandID = body.executeAsync(command_, handler, false);
8822  if (command_.hasStatsAck())
8823  {
8824  stream.setStatsOnly(commandID, command_.getMessage().getQueryId());
8825  }
8826  else if (command_.isSow())
8827  {
8828  if (command_.getAckTypeEnum() & Message::AckType::Completed)
8829  {
8830  stream.setAcksOnly(commandID,
8831  ackTypes);
8832  }
8833  else
8834  {
8835  stream.setSOWOnly(commandID, command_.getMessage().getQueryId());
8836  }
8837  }
8838  else if (command_.isSubscribe())
8839  {
8840  stream.setSubscription(commandID,
8841  command_.getMessage().getCommandId(),
8842  command_.getMessage().getQueryId());
8843  }
8844  else
8845  {
8846  // Persisted acks for writes don't come back with command id
8847  if (command == Message::Command::Publish ||
8848  command == Message::Command::DeltaPublish ||
8849  command == Message::Command::SOWDelete)
8850  {
8851  stream.setAcksOnly(commandID,
8852  ackTypes & (unsigned)~Message::AckType::Persisted);
8853  }
8854  else
8855  {
8856  stream.setAcksOnly(commandID, ackTypes);
8857  }
8858  }
8859  return stream;
8860  }
8861 
8862 // This is here because it uses api from Client.
8863  inline void Message::ack(const char* options_) const
8864  {
8865  ClientImpl* pClient = _body.get().clientImpl();
8866  Message::Field bookmark = getBookmark();
8867  if (pClient && bookmark.len() &&
8868  !pClient->getAutoAck())
8869  //(!pClient->getAutoAck() || getIgnoreAutoAck()))
8870  {
8871  pClient->ack(getTopic(), bookmark, options_);
8872  }
8873  }
8874 }// end namespace AMPS
8875 #endif
Command & setBookmark(const std::string &bookmark_)
Set the bookmark to be used this command.
Definition: ampsplusplus.hpp:747
Command & setFilter(const char *filter_, size_t filterLen_)
Definition: ampsplusplus.hpp:698
Class to hold string versions of failure reasons.
Definition: ampsplusplus.hpp:150
Message & setData(const std::string &v_)
Sets the data portion of self.
Definition: Message.hpp:1476
Core type and function declarations for the AMPS C client.
Client(const std::string &clientName="")
Constructs a new client with a given client name.
Definition: ampsplusplus.hpp:5086
Field getUserId() const
Retrieves the value of the UserId header of the Message as a new Field.
Definition: Message.hpp:1453
std::string sowDeleteByKeys(const MessageHandler &messageHandler_, const std::string &topic_, const std::string &keys_, long timeout_=0)
Deletes messages that match SOW keys from a topic&#39;s SOW cache.
Definition: ampsplusplus.hpp:6764
std::string stopTimer(const MessageHandler &messageHandler)
Definition: ampsplusplus.hpp:6738
static const unsigned int IdentifierLength
The length of identifiers used for unique identification of commands and subscriptions.
Definition: Message.hpp:542
std::string getAckType() const
Definition: ampsplusplus.hpp:945
AMPSDLL void amps_client_set_disconnect_handler(amps_handle client, amps_handler disconnectHandler, void *userData)
Sets the disconnect handler function to be called when a disconnect occurs.
bool removeMessageHandler(const Field &commandId_)
Removes a MessageHandler for a given ComandId from self.
Definition: ampsplusplus.hpp:5315
Message & assignTopic(const std::string &v)
Sets the value of the Topic header for this Message.
Definition: Message.hpp:1449
Provides a convenient way of building messages in FIX format, typically referenced using the typedefs...
Definition: ampsplusplus.hpp:8126
void startTimer()
Definition: ampsplusplus.hpp:6727
MessageStream sowAndSubscribe(const std::string &topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, bool oofEnabled_=false, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic and initiates a new subscription on it.
Definition: ampsplusplus.hpp:6269
Abstract base class for storing published messages for an HA publisher client.
Definition: ampsplusplus.hpp:1089
MessageStream conflate(void)
Sets self to conflation mode, where a new update for a matching sow key will replace the previous one...
Definition: ampsplusplus.hpp:8750
Field getSequence() const
Retrieves the value of the Sequence header of the Message as a new Field.
Definition: Message.hpp:1422
std::string send(const MessageHandler &messageHandler, Message &message, int timeout=0)
Sends a Message to the connected AMPS server, performing only minimal validation and bypassing client...
Definition: ampsplusplus.hpp:5343
AMPSDLL amps_result amps_client_set_name(amps_handle handle, const amps_char *clientName)
Sets the name on an amps client object.
Message & setCorrelationId(const std::string &v)
Sets the value of the CorrelationId header for this Message.
Definition: Message.hpp:1304
Message & setQueryID(const std::string &v)
Sets the value of the QueryID header for this Message.
Definition: Message.hpp:1417
Command(const std::string &command_)
Creates an object to represent the given AMPS command, such as "sow" or "subscribe".
Definition: ampsplusplus.hpp:559
Command::Type getCommandEnum() const
Decode self&#39;s "command" field and return one of the values from Command.
Definition: Message.hpp:1228
Command & setBookmark(const char *bookmark_, size_t bookmarkLen_)
Set the bookmark to be used this command.
Definition: ampsplusplus.hpp:758
void setGlobalCommandTypeMessageHandler(const Message::Command::Type command_, const MessageHandler &handler_)
Sets a handler for all messages of a particular type: currently supported types are heartbeat message...
Definition: ampsplusplus.hpp:7018
Command & setAckType(unsigned ackType_)
Definition: ampsplusplus.hpp:923
void setServerVersion(size_t version_)
Internally used to set the server version so the store knows how to deal with persisted acks and call...
Definition: BookmarkStore.hpp:429
void setDefaultMaxDepth(unsigned maxDepth_)
Sets a default max depth on all subsequently created MessageStream objects.
Definition: ampsplusplus.hpp:7386
const amps_uint64_t getNameHashValue() const
Returns the numeric name hash of this client as generated by the server and returned when the client ...
Definition: ampsplusplus.hpp:5147
Field getSubscriptionId() const
Retrieves the value of the SubscriptionId header of the Message as a new Field.
Definition: Message.hpp:1427
AMPSDLL amps_result amps_client_connect(amps_handle handle, const amps_char *uri)
Connects to the AMPS server specified in uri.
static size_t convertVersionToNumber(const std::string &version_)
Converts a string version, such as "3.8.1.5" into the same numeric form used internally and returned ...
Definition: ampsplusplus.hpp:5203
void unsubscribe(const std::string &commandId)
Unsubscribe from a topic.
Definition: ampsplusplus.hpp:6058
Command & setOptions(const std::string &options_)
Sets the options string for this command: see Message.Options for a helper class for constructing the...
Definition: ampsplusplus.hpp:788
Message & setOrderBy(const std::string &v)
Sets the value of the OrderBy header for this Message.
Definition: Message.hpp:1415
Abstract base class for replaying a publish message.
Definition: ampsplusplus.hpp:1061
Command & setCommandId(const char *cmdId_, size_t cmdIdLen_)
Definition: ampsplusplus.hpp:672
void persisted(const Message::Field &subId_, const Message::Field &bookmark_)
Called internally to indicate messages up to and including bookmark are replicated to all replication...
Definition: BookmarkStore.hpp:405
unsigned getDefaultMaxDepth(void) const
Returns the default max depth for returned MessageStream objects.
Definition: ampsplusplus.hpp:7395
void ack(Message &message_, const char *options_=NULL)
Acknowledge a message queue message by supplying the message directly: this adds the ack to the curre...
Definition: ampsplusplus.hpp:7254
MessageStream bookmarkSubscribe(const std::string &topic_, long timeout_, const std::string &bookmark_, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Subscribe to a topic using a bookmark.
Definition: ampsplusplus.hpp:6009
AMPSDLL void amps_client_disconnect(amps_handle handle)
Disconnects from the AMPS server, if connected.
AMPSDLL AMPS_SOCKET amps_client_get_socket(amps_handle client)
Returns the socket from the underlying transport in client, or NULL if no transport is associated wit...
void discard(const Message::Field &subId_, size_t bookmarkSeqNo_)
Log a discard-bookmark entry to the persistent log based on a bookmark sequence number.
Definition: BookmarkStore.hpp:283
void setPublishStore(const Store &publishStore_)
Set the publish store to be used by the client.
Definition: ampsplusplus.hpp:5439
void setErrorOnPublishGap(bool errorOnPublishGap_)
Called to enable or disable throwing PublishStoreGapException.
Definition: ampsplusplus.hpp:1338
amps_uint64_t deltaPublish(const std::string &topic_, const std::string &data_, unsigned long expiration_)
Publish the changed fields of a message to an AMPS topic.
Definition: ampsplusplus.hpp:5704
static size_t convertVersionToNumber(const char *data_, size_t len_)
Converts a string version, such as "3.8.1.5" into the same numeric form used internally and returned ...
Definition: ampsplusplus.hpp:5218
Message & setFilter(const std::string &v)
Sets the value of the Filter header for this Message.
Definition: Message.hpp:1306
Command & setSubId(const char *subId_, size_t subIdLen_)
Definition: ampsplusplus.hpp:724
void setNoEmpties(void)
Set the option to not send empty messages on a delta subscription.
Definition: Message.hpp:841
void setDuplicateMessageHandler(const MessageHandler &duplicateMessageHandler_)
Sets a callback function that is invoked when a duplicate message is detected.
Definition: ampsplusplus.hpp:5455
Message deepCopy(void) const
Returns a deep copy of self.
Definition: Message.hpp:579
void removeConnectionStateListener(ConnectionStateListener *listener)
Attempts to remove listener from self&#39;s set of ConnectionStateListeners.
Definition: ampsplusplus.hpp:7109
void setOOF(void)
Set the option to receive out of focus (OOF) messages on a subscription, where applicable.
Definition: Message.hpp:824
int getAckTimeout(void) const
Returns the current value of the message queue ack timeout setting – that is, the amount of time aft...
Definition: ampsplusplus.hpp:7338
VersionInfo getServerVersionInfo() const
Returns the server version retrieved during logon.
Definition: ampsplusplus.hpp:5189
Message sowDeleteByKeys(const std::string &topic_, const std::string &keys_, long timeout_=0)
Deletes messages that match SOW keys from a topic&#39;s SOW cache.
Definition: ampsplusplus.hpp:6791
AMPSDLL amps_result amps_client_set_transport_filter_function(amps_handle client, amps_transport_filter_function filter, void *userData)
Sets a user-supplied callback function for filtering data before it is sent and after it is received...
Command(const char *command_, size_t commandLen_)
Creates an object to represent the given AMPS command, such as "sow" or "subscribe".
Definition: ampsplusplus.hpp:567
AMPSDLL amps_uint64_t amps_message_get_field_uint64(amps_handle message, FieldId field)
Gets the unsigned 64-bit int value of a header field in an AMPS message.
AMPSDLL amps_result amps_client_set_idle_time(amps_handle client, int idleTime)
Sets an idle-time (milliseconds).
Message encapsulates a single message sent to or received from an AMPS server, and provides methods f...
Definition: Message.hpp:531
Message & setAckTypeEnum(unsigned ackType_)
Encode self&#39;s "ack type" field from a bitmask of values from AckType.
Definition: Message.hpp:1183
amps_uint64_t getLastPersisted()
Get the last persisted message sequence in the store.
Definition: ampsplusplus.hpp:1309
std::string getString() const
Returns the current contents of this builder as a string.
Definition: ampsplusplus.hpp:8163
void setTransportFilterFunction(amps_transport_filter_function filter_, void *userData_)
Sets a filter function on the transport that is called with all raw data sent or received.
Definition: ampsplusplus.hpp:7406
amps_uint64_t publish(const char *topic_, size_t topicLength_, const char *data_, size_t dataLength_, unsigned long expiration_)
Publish a message to an AMPS topic, returning the sequence number assigned by the publish store if on...
Definition: ampsplusplus.hpp:5591
iterator begin(void)
Returns an iterator representing the beginning of the topic or subscription.
Definition: ampsplusplus.hpp:4990
Field getLeasePeriod() const
Retrieves the value of the LeasePeriod header of the Message as a new Field.
Definition: Message.hpp:1309
Command & setSowKeys(const char *sowKeys_, size_t sowKeysLen_)
Sets the SowKeys for the command.
Definition: ampsplusplus.hpp:659
static const char * NOW()
Convenience method for returning the special value to start a subscription at the end of the transact...
Definition: ampsplusplus.hpp:7037
Command & setAckType(const std::string &ackType_)
Definition: ampsplusplus.hpp:901
static const char * BOOKMARK_EPOCH()
Convenience method for returning the special value to start a subscription at the beginning of the tr...
Definition: ampsplusplus.hpp:7047
void send(const Message &message)
Sends a Message to the connected AMPS server, performing only minimal validation and bypassing client...
Definition: ampsplusplus.hpp:5274
Message & setSowKey(const std::string &v)
Sets the value of the SowKey header for this Message.
Definition: Message.hpp:1424
AMPSDLL void amps_message_set_field_value(amps_handle message, FieldId field, const amps_char *value, size_t length)
Sets the value of a header field in an AMPS message.
std::string bookmarkSubscribe(const MessageHandler &messageHandler_, const std::string &topic_, long timeout_, const std::string &bookmark_, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Subscribe to a topic using a bookmark.
Definition: ampsplusplus.hpp:5981
Field getFilter() const
Retrieves the value of the Filter header of the Message as a new Field.
Definition: Message.hpp:1306
static size_t getUnsetPosition()
Method to return the value used to represent not found or unset.
Definition: ampsplusplus.hpp:1152
void setRetryOnDisconnect(bool isRetryOnDisconnect_)
Enables or disables automatic retry of a command to AMPS after a reconnect.
Definition: ampsplusplus.hpp:7368
unsigned getMaxDepth(void) const
Gets the maximum number of messages that can be held in the underlying queue.
Definition: ampsplusplus.hpp:8760
Message & assignUserId(const std::string &v)
Sets the value of the UserId header for this Message.
Definition: Message.hpp:1453
void connect(const std::string &uri)
Connect to an AMPS server.
Definition: ampsplusplus.hpp:5249
void clear()
Deletes the data associated with this Field, should only be used on Fields that were created as deepC...
Definition: Field.hpp:247
void addMessageHandler(const Field &commandId_, const AMPS::MessageHandler &messageHandler_, unsigned requestedAcks_, Message::Command::Type commandType_)
Adds a MessageHandler to be invoked for Messages with the given CommandId as their command id...
Definition: ampsplusplus.hpp:5304
Abstract base class to manage all subscriptions placed on a client so that they can be re-established...
Definition: ampsplusplus.hpp:1453
std::string sowAndDeltaSubscribe(const MessageHandler &messageHandler_, const std::string &topic_, const std::string &filter_="", const std::string &orderBy_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic and initiates a new delta subscription on it.
Definition: ampsplusplus.hpp:6463
Success.
Definition: amps.h:221
Message & setCommandEnum(Command::Type command_)
Set self&#39;s "command" field from one of the values in Command.
Definition: Message.hpp:1288
Field getOptions() const
Retrieves the value of the Options header of the Message as a new Field.
Definition: Message.hpp:1316
std::string authenticate(const std::string &, const std::string &password_)
A simple implementation that returns an unmodified password.
Definition: ampsplusplus.hpp:1035
amps_uint64_t deltaPublish(const char *topic_, size_t topicLength_, const char *data_, size_t dataLength_)
Publish the changed fields of a message to an AMPS topic.
Definition: ampsplusplus.hpp:5681
FIXShredder(char fieldSep_=(char) 1)
Construct an instance of FIXShredder using the specified value as the delimiter between fields...
Definition: ampsplusplus.hpp:8208
std::string sow(const MessageHandler &messageHandler_, const std::string &topic_, const std::string &filter_="", const std::string &orderBy_="", const std::string &bookmark_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query a State-of-the-World topic.
Definition: ampsplusplus.hpp:6105
void * amps_handle
Opaque handle type used to refer to objects in the AMPS api.
Definition: amps.h:211
Class for constructing the options string to pass to AMPS in a Message.
Definition: Message.hpp:601
void addMessageHandler(const Field &commandId_, const AMPS::MessageHandler &messageHandler_, unsigned requestedAcks_, bool isSubscribe_)
Adds a MessageHandler to be invoked for Messages with the given CommandId as their command id...
Definition: ampsplusplus.hpp:5287
const char * data() const
Returns the (non-null-terminated) data underlying this field.
Definition: Field.hpp:260
bool isValid() const
Returns true if self is a valid stream that may be iterated.
Definition: ampsplusplus.hpp:4983
amps_result
Return values from amps_xxx functions.
Definition: amps.h:216
FailedWriteHandler * getFailedWriteHandler()
Get the handler that is invoked to report on failed writes.
Definition: ampsplusplus.hpp:5492
Field getAckType() const
Retrieves the value of the AckType header of the Message as a new Field.
Definition: Message.hpp:1130
AMPSDLL amps_handle amps_client_create(const amps_char *clientName)
Functions for creation of an AMPS client.
Field getCommand() const
Retrieves the value of the Command header of the Message as a new Field.
Definition: Message.hpp:1195
MessageStream execute(Command &command_)
Execute the provided command and return messages received in response in a MessageStream.
Definition: ampsplusplus.hpp:8774
void(* amps_transport_filter_function)(const unsigned char *, size_t, short, void *)
Prototype for a user-supplied callback function for filtering data before it is sent and after it is ...
Definition: amps.h:642
Message & setTopic(const std::string &v)
Sets the value of the Topic header for this Message.
Definition: Message.hpp:1449
Store getPublishStore()
Get the publish store used by the client.
Definition: ampsplusplus.hpp:5447
void setThreadCreatedCallback(amps_thread_created_callback callback_, void *userData_)
Sets a callback function on the transport that is called when a new thread is created to receive data...
Definition: ampsplusplus.hpp:7420
unsigned getAckTypeEnum() const
Definition: ampsplusplus.hpp:950
size_t getServerVersion() const
Returns the server version retrieved during logon.
Definition: ampsplusplus.hpp:5178
State
Constants for the state of the connection.
Definition: ampsplusplus.hpp:1497
Command & setData(const std::string &data_)
Sets the data for this command from an existing string.
Definition: ampsplusplus.hpp:828
std::string logon(int timeout_=0, Authenticator &authenticator_=DefaultAuthenticator::instance(), const char *options_=NULL)
Logon to the server, providing the client name, credentials (if available), and client information (s...
Definition: ampsplusplus.hpp:5753
void discardUpTo(amps_uint64_t index_)
Called by Client to indicate that all messages up to and including.
Definition: ampsplusplus.hpp:1242
Command & setFilter(const std::string &filter_)
Definition: ampsplusplus.hpp:691
const std::string & getNameHash() const
Returns the name hash string of this client as generated by the server and returned when the client l...
Definition: ampsplusplus.hpp:5139
StoreImpl(bool errorOnPublishGap_=false)
Default constructor.
Definition: ampsplusplus.hpp:1097
MessageStream sowAndSubscribe(const char *topic_, const std::string &filter_="", const std::string &orderBy_="", const std::string &bookmark_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic and initiates a new subscription on it.
Definition: ampsplusplus.hpp:6417
Client represents a connection to an AMPS server, but does not provide failover or reconnection behav...
Definition: ampsplusplus.hpp:5069
void setGlobalCommandTypeMessageHandler(const std::string &command_, const MessageHandler &handler_)
Sets a handler for all messages of a particular type, or for messages that would be delivered to a pa...
Definition: ampsplusplus.hpp:6993
Command & setSubId(const std::string &subId_)
Definition: ampsplusplus.hpp:717
static const char * BOOKMARK_NOW()
Convenience method for returning the special value to start a subscription at the end of the transact...
Definition: ampsplusplus.hpp:7028
Message & newCommandId()
Creates and sets a new sequential value for the CommandId header for this Message.
Definition: Message.hpp:1302
void addConnectionStateListener(ConnectionStateListener *listener)
Adds a ConnectionStateListener to self&#39;s set of listeners.
Definition: ampsplusplus.hpp:7101
Command & reset(const std::string &command_)
Resets the fields of self, and sets the command to command_.
Definition: ampsplusplus.hpp:582
Command & setTimeout(unsigned timeout_)
Sets the client-side timeout for this command.
Definition: ampsplusplus.hpp:850
void clearConnectionStateListeners()
Clear all listeners from self&#39;s set of ConnectionStateListeners.
Definition: ampsplusplus.hpp:7116
Message & setSequence(const std::string &v)
Sets the value of the Sequence header for this Message.
Definition: Message.hpp:1422
amps_uint64_t publish(const char *topic_, size_t topicLength_, const char *data_, size_t dataLength_)
Publish a message to an AMPS topic, returning the sequence number assigned by the publish store if on...
Definition: ampsplusplus.hpp:5540
Command & setCommandId(const std::string &cmdId_)
Definition: ampsplusplus.hpp:665
Message & setUserId(const std::string &v)
Sets the value of the UserId header for this Message.
Definition: Message.hpp:1453
AMPSDLL amps_result amps_client_send_with_version(amps_handle client, amps_handle message, unsigned *version_out)
Sends a message to the AMPS server.
Field getTopic() const
Retrieves the value of the Topic header of the Message as a new Field.
Definition: Message.hpp:1449
AMPSDLL void amps_client_set_message_handler(amps_handle client, amps_handler messageHandler, void *userData)
Sets the message handler function for this client.
SubscriptionManager * getSubscriptionManager()
Get the subscription manager being used by the client.
Definition: ampsplusplus.hpp:5403
std::string subscribe(const MessageHandler &messageHandler_, const std::string &topic_, long timeout_=0, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Subscribe to a topic.
Definition: ampsplusplus.hpp:5816
Command(Message::Command::Type command_)
Creates an object to represent the given AMPS command, such as "sow" or "subscribe".
Definition: ampsplusplus.hpp:574
Message & setMessageType(const std::string &v)
Sets the value of the MessageType header for this Message.
Definition: Message.hpp:1312
Field getCommandId() const
Retrieves the value of the CommandId header of the Message as a new Field.
Definition: Message.hpp:1302
const ExceptionListener & getExceptionListener(void) const
Returns the exception listener set on this Client.
Definition: ampsplusplus.hpp:6904
Field getQueryId() const
Retrieves the value of the QueryID header of the Message as a new Field.
Definition: Message.hpp:1417
static Authenticator & instance()
Static function to return a static instance used when no Authenticator is supplied to a Client...
Definition: ampsplusplus.hpp:1052
Abstract base class for connection state listeners.
Definition: ampsplusplus.hpp:1493
Message & setSubscriptionId(const std::string &v)
Sets the value of the SubscriptionId header for this Message.
Definition: Message.hpp:1427
std::string sowDeleteByData(const MessageHandler &messageHandler_, const std::string &topic_, const std::string &data_, long timeout_=0)
Deletes the message whose keys match the message data provided.
Definition: ampsplusplus.hpp:6826
Command & setOptions(const char *options_, size_t optionsLen_)
Sets the options string for this command: see Message.Options for a helper class for constructing the...
Definition: ampsplusplus.hpp:796
Message & newQueryId()
Creates and sets a new sequential value for the QueryID header for this Message.
Definition: Message.hpp:1417
#define AMPS_BOOKMARK_RECENT
Start the subscription at the first undiscarded message in the bookmark store, or at the end of the b...
Definition: BookmarkStore.hpp:47
Message & assignExpiration(const std::string &v)
Sets the value of the Expiration header for this Message.
Definition: Message.hpp:1305
bool empty() const
Returns &#39;true&#39; if empty, &#39;false&#39; otherwise.
Definition: Field.hpp:128
bool replaySingle(StoreReplayer &replayer_, amps_uint64_t index_)
Called by Client to get a single message replayed by the store onto the StoreReplayer.
Definition: ampsplusplus.hpp:1263
void completed(const std::string &, const std::string &, const std::string &)
Called by Client once a logon completes successfully.
Definition: ampsplusplus.hpp:1047
MessageStream deltaSubscribe(const std::string &topic_, long timeout_, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Delta Subscribe to a topic.
Definition: ampsplusplus.hpp:5921
static const char * EPOCH()
Convenience method for returning the special value to start a subscription at the beginning of the tr...
Definition: ampsplusplus.hpp:7057
Interface for BookmarkStoreImpl classes.
Definition: BookmarkStore.hpp:228
Command & reset(const char *command_, size_t commandLen_)
Resets the fields of self, and sets the command to command_.
Definition: ampsplusplus.hpp:591
Command & setSequence(const char *seq_, size_t seqLen_)
Definition: ampsplusplus.hpp:809
std::string logon(const char *options_, int timeout_=0)
Logon to the server, providing the client name, credentials (if available) client information (such a...
Definition: ampsplusplus.hpp:5772
size_t len() const
Returns the length of the data underlying this field.
Definition: Field.hpp:267
virtual void completed(const std::string &userName_, const std::string &password_, const std::string &reason_)=0
Called by Client once a logon completes successfully.
Message & setClientName(const std::string &v)
Sets the value of the ClientName header for this Message.
Definition: Message.hpp:1303
Field getSowKey() const
Retrieves the value of the SowKey header of the Message as a new Field.
Definition: Message.hpp:1424
#define AMPS_BOOKMARK_EPOCH
Start the subscription at the beginning of the journal.
Definition: BookmarkStore.hpp:51
A default implementation of Authenticator that only uses an unchanged password and does not implement...
Definition: ampsplusplus.hpp:1029
void setLogonCorrelationData(const std::string &logonCorrelationData_)
Sets the logon correlation data for the client.
Definition: ampsplusplus.hpp:5158
Message & setCommand(const std::string &v)
Sets the value of the Command header for this Message.
Definition: Message.hpp:1195
_FIXBuilder(char fieldSep_=(char) 1)
Construct an instance of _FIXBuilder, using the specified separator between fields.
Definition: ampsplusplus.hpp:8136
void ack(const std::string &topic_, const std::string &bookmark_, const char *options_=NULL)
Acknowledge a message queue message by supplying a topic and bookmark string: this adds the ack to th...
Definition: ampsplusplus.hpp:7266
std::string retry(const std::string &, const std::string &)
Throws an AuthenticationException because retry is not implemented.
Definition: ampsplusplus.hpp:1042
Message & assignSubscriptionId(const std::string &v)
Sets the value of the SubscriptionId header for this Message.
Definition: Message.hpp:1427
bool(* PublishStoreResizeHandler)(Store store_, size_t size_, void *userData_)
Function type for PublishStore resize events The store_ param is store which is resizing.
Definition: ampsplusplus.hpp:1083
AMPSDLL void amps_message_get_field_value(amps_handle message, FieldId field, const amps_char **value_ptr, size_t *length_ptr)
Retrieves the value of a header field in an AMPS message.
MessageStream sowAndSubscribe(const char *topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, bool oofEnabled_=false, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic and initiates a new subscription on it.
Definition: ampsplusplus.hpp:6307
virtual void setResizeHandler(PublishStoreResizeHandler handler_, void *userData_)
Set a handler to be called if the Store needs to resize in order to keep storing messages.
Definition: ampsplusplus.hpp:1183
void append(const T &tag, const char *value, size_t offset, size_t length)
Write a field with the provided tag and value to the message being constructed.
Definition: ampsplusplus.hpp:8145
Message & assignCommand(const std::string &v)
Sets the value of the Command header for this Message.
Definition: Message.hpp:1195
#define AMPS_BOOKMARK_NOW
Start the subscription at the point in time when AMPS processes the subscription. ...
Definition: BookmarkStore.hpp:55
void setResizeHandler(PublishStoreResizeHandler handler_, void *userData_)
Set a handler to be called if the Store needs to resize in order to keep storing messages.
Definition: ampsplusplus.hpp:1323
virtual std::string retry(const std::string &userName_, const std::string &password_)=0
Called by Client when a logon ack is received with a status of retry.
BookmarkStore getBookmarkStore()
Get the bookmark store being used by the client.
Definition: ampsplusplus.hpp:5395
bool isValid() const
Method to return if there is an underlying implementation for the Store.
Definition: ampsplusplus.hpp:1280
void setAckBatchSize(const unsigned ackBatchSize_)
Sets the queue ack batch size setting.
Definition: ampsplusplus.hpp:7327
Field getPassword() const
Retrieves the value of the Password header of the Message as a new Field.
Definition: Message.hpp:1416
Message & setTopNRecordsReturned(const std::string &v)
Sets the value of the TopNRecordsReturned header for this Message.
Definition: Message.hpp:1451
Class to handle when a client receives a duplicate publish message, or not entitled message...
Definition: ampsplusplus.hpp:1373
Message sowDelete(const std::string &topic, const std::string &filter, long timeout=0)
Deletes one or more messages from a topic&#39;s SOW cache.
Definition: ampsplusplus.hpp:6702
virtual void setFailedResubscribeHandler(std::shared_ptr< FailedResubscribeHandler > handler_)
Set a handler to deal with failing subscriptions after a failover event.
Definition: ampsplusplus.hpp:1481
Message & setSowKeys(const std::string &v)
Sets the value of the SowKeys header for this Message.
Definition: Message.hpp:1425
void setBookmarkStore(const BookmarkStore &bookmarkStore_)
Set the bookmark store to be used by the client.
Definition: ampsplusplus.hpp:5387
This class multiplexes messages from AMPS to multiple subscribers and uses the stream of acks from AM...
Definition: MessageRouter.hpp:149
Message & setExpiration(const std::string &v)
Sets the value of the Expiration header for this Message.
Definition: Message.hpp:1305
virtual std::string authenticate(const std::string &userName_, const std::string &password_)=0
Called by Client just before the logon command is sent.
Command & setOrderBy(const std::string &orderBy_)
Definition: ampsplusplus.hpp:704
void setExceptionListener(const ExceptionListener &listener_)
Definition: ampsplusplus.hpp:6897
Command & reset(Message::Command::Type command_)
Resets the fields of self, and sets the command to command_.
Definition: ampsplusplus.hpp:599
void setSubscriptionManager(SubscriptionManager *subscriptionManager_)
Set the subscription manager to be used by the client.
Definition: ampsplusplus.hpp:5415
void setUnhandledMessageHandler(const AMPS::MessageHandler &messageHandler)
Definition: ampsplusplus.hpp:6960
void append(const T &tag, const std::string &value)
Write a field with the provided tag and value to the message being constructed.
Definition: ampsplusplus.hpp:8156
unsigned getAckBatchSize(void) const
Returns the value of the queue ack batch size setting.
Definition: ampsplusplus.hpp:7317
amps_uint64_t deltaPublish(const std::string &topic_, const std::string &data_)
Publish the changed fields of a message to an AMPS topic.
Definition: ampsplusplus.hpp:5658
MessageStream sowAndSubscribe(const std::string &topic_, const std::string &filter_="", const std::string &orderBy_="", const std::string &bookmark_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic and initiates a new subscription on it.
Definition: ampsplusplus.hpp:6394
Command & setExpiration(unsigned expiration_)
Set the expiration time for a publish command.
Definition: ampsplusplus.hpp:881
AMPSDLL void amps_client_set_predisconnect_handler(amps_handle client, amps_predisconnect_handler predisconnectHandler, void *userData)
Sets the predisconnect handler function to be called when a disconnect occurs.
Represents an iterator over messages in an AMPS topic.
Definition: ampsplusplus.hpp:4945
size_t log(Message &message_)
Log a bookmark to the persistent log.
Definition: BookmarkStore.hpp:268
MessageStream sowAndDeltaSubscribe(const char *topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, bool oofEnabled_=false, bool sendEmpties_=false, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic and initiates a new delta subscription on it.
Definition: ampsplusplus.hpp:6640
size_t unpersistedCount() const
Method to return how many messages are in the store that have not been discarded, indicating that the...
Definition: ampsplusplus.hpp:1272
Command & setData(const char *data_, size_t dataLen_)
Sets the data for this command.
Definition: ampsplusplus.hpp:836
Message & assignAckType(const std::string &v)
Sets the value of the AckType header for this Message.
Definition: Message.hpp:1130
void publishFlush(long timeout_=0, unsigned ackType_=Message::AckType::Processed)
Ensure that AMPS messages are sent and have been processed by the AMPS server.
Definition: ampsplusplus.hpp:5637
std::string executeAsync(Command &command_, MessageHandler handler_)
Execute the provided command and, once AMPS acknowledges the command, process messages in response to...
Definition: ampsplusplus.hpp:7146
Command & setSowKey(const char *sowKey_, size_t sowKeyLen_)
Sets the SowKey field of the command, typically used for a publish command to a topic in the state of...
Definition: ampsplusplus.hpp:624
MessageStream sowAndDeltaSubscribe(const std::string &topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, bool oofEnabled_=false, bool sendEmpties_=false, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic and initiates a new delta subscription on it.
Definition: ampsplusplus.hpp:6599
std::string logon(const std::string &options_, int timeout_=0)
Logon to the server, providing the client name, credentials (if available) client information (such a...
Definition: ampsplusplus.hpp:5791
void unsubscribe()
Unsubscribe from all topics.
Definition: ampsplusplus.hpp:6070
void setAutoAck(bool isAutoAckEnabled_)
Sets the queue auto-ack setting on this client.
Definition: ampsplusplus.hpp:7309
MessageStream deltaSubscribe(const char *topic_, long timeout_, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Delta Subscribe to a topic.
Definition: ampsplusplus.hpp:5939
Message & assignCorrelationId(const std::string &v)
Sets the value of the CorrelationId header for this Message.
Definition: Message.hpp:1304
amps_uint64_t store(const Message &message_)
Called by Client to store a message being published.
Definition: ampsplusplus.hpp:1231
std::string sowAndDeltaSubscribe(const MessageHandler &messageHandler_, const std::string &topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, bool oofEnabled_=false, bool sendEmpties_=false, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic and initiates a new delta subscription on it.
Definition: ampsplusplus.hpp:6563
Command & setBatchSize(unsigned batchSize_)
Sets the batch size for this command, which controls how many records are sent together in the result...
Definition: ampsplusplus.hpp:865
void setExceptionListener(const std::shared_ptr< const ExceptionListener > &pListener_)
Sets the exception listener for exceptions that are not thrown back to the user (for example...
Definition: ampsplusplus.hpp:6884
StoreImpl * get()
Used to get a pointer to the implementation.
Definition: ampsplusplus.hpp:1355
MessageHandler getDuplicateMessageHandler(void)
Returns the callback function that is invoked when a duplicate message is detected.
Definition: ampsplusplus.hpp:5470
Command & setQueryId(const char *queryId_, size_t queryIdLen_)
Definition: ampsplusplus.hpp:737
DisconnectHandler getDisconnectHandler(void) const
Returns the callback function that is invoked when a disconnect occurs.
Definition: ampsplusplus.hpp:5365
Class for parsing a FIX format message into a std::map of keys and values, where the keys and values ...
Definition: ampsplusplus.hpp:8200
Field represents the value of a single field in a Message.
Definition: Field.hpp:86
Message & setAckType(const std::string &v)
Sets the value of the AckType header for this Message.
Definition: Message.hpp:1130
virtual ConnectionInfo getConnectionInfo() const
Get the connection information for the current connection.
Definition: ampsplusplus.hpp:5374
Message & setOptions(const std::string &v)
Sets the value of the Options header for this Message.
Definition: Message.hpp:1344
void ack(Field &topic_, Field &bookmark_, const char *options_=NULL)
Acknowledge a message queue message by supplying a topic and bookmark: this adds the ack to the curre...
Definition: ampsplusplus.hpp:7242
Abstract base class where you can implement handling of exceptions that occur when a SubscriptionMana...
Definition: ampsplusplus.hpp:1431
AMPSDLL amps_result amps_client_send(amps_handle client, amps_handle message)
Sends a message to the AMPS server.
std::map< Message::Field, Message::Field > map_type
Convenience defintion for the std::map specialization used for this class.
Definition: ampsplusplus.hpp:8212
Command & setTopic(const char *topic_, size_t topicLen_)
Definition: ampsplusplus.hpp:685
void setDisconnectHandler(const DisconnectHandler &disconnectHandler)
Sets the function to be called when the client is unintentionally disconnected.
Definition: ampsplusplus.hpp:5357
Handle class for StoreImpl classes that track publish messages.
Definition: ampsplusplus.hpp:1215
map_type toMap(const Message::Field &data)
Returns the key/value pairs within the message, represented as AMPS::Field objects that contain point...
Definition: ampsplusplus.hpp:8219
Exception listener for unhandled exceptions.
Definition: ampsplusplus.hpp:203
MessageStream sowAndDeltaSubscribe(const std::string &topic_, const std::string &filter_="", const std::string &orderBy_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic and initiates a new delta subscription on it.
Definition: ampsplusplus.hpp:6496
void setHeartbeat(unsigned heartbeatTime_)
Requests heartbeating with the AMPS server.
Definition: ampsplusplus.hpp:6954
MessageStream subscribe(const char *topic_, long timeout_=0, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Subscribe to a topic.
Definition: ampsplusplus.hpp:5874
An iterable object representing the results of an AMPS subscription and/or query. ...
Definition: ampsplusplus.hpp:4937
bool getErrorOnPublishGap() const
Called to check if the Store will throw PublishStoreGapException.
Definition: ampsplusplus.hpp:1347
Command & setSequence(const std::string &seq_)
Definition: ampsplusplus.hpp:802
static const char * MOST_RECENT()
Convenience method for returning the special value to start a subscription at a recovery point based ...
Definition: ampsplusplus.hpp:7079
Command & setQueryId(const std::string &queryId_)
Definition: ampsplusplus.hpp:730
AMPSDLL amps_result amps_client_set_thread_created_callback(amps_handle client, amps_thread_created_callback callback, void *userData)
Sets a user-supplied callback function to allow thread attributes to set when a new thread is created...
Message & setCommandId(const std::string &v)
Sets the value of the CommandId header for this Message.
Definition: Message.hpp:1302
Command & setSequence(const amps_uint64_t seq_)
Definition: ampsplusplus.hpp:815
static const char * BOOKMARK_RECENT()
Convenience method for returning the special value to start a subscription at a recovery point based ...
Definition: ampsplusplus.hpp:7090
Message & setQueryId(const std::string &v)
Sets the value of the QueryID header for this Message.
Definition: Message.hpp:1417
amps_result(* amps_thread_created_callback)(AMPS_THREAD_T, void *)
Prototype for a user-supplied callback function to allow thread attributes to be set when a new threa...
Definition: amps.h:668
amps_uint64_t getLowestUnpersisted()
Get the oldest unpersisted message sequence in the store.
Definition: ampsplusplus.hpp:1301
Command & setTopic(const std::string &topic_)
Definition: ampsplusplus.hpp:678
MessageStream maxDepth(unsigned maxDepth_)
Sets the maximum number of messages that can be held in the underlying queue.
Definition: ampsplusplus.hpp:8755
Message & assignVersion(const std::string &v)
Sets the value of the Version header for this Message.
Definition: Message.hpp:1452
void disconnect()
Disconnect from an AMPS server.
Definition: ampsplusplus.hpp:5256
MessageStream sow(const std::string &topic_, const std::string &filter_="", const std::string &orderBy_="", const std::string &bookmark_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic.
Definition: ampsplusplus.hpp:6143
unsigned getDepth(void) const
Gets the current number of messages held in the underlying queue.
Definition: ampsplusplus.hpp:8764
Field getQueryID() const
Retrieves the value of the QueryID header of the Message as a new Field.
Definition: Message.hpp:1417
The operation has not succeeded, but ought to be retried.
Definition: amps.h:245
const std::string & getLogonCorrelationData() const
Returns the currently set logon correlation data for the client.
Definition: ampsplusplus.hpp:5165
void setHeartbeat(unsigned heartbeatTime_, unsigned readTimeout_)
Requests heartbeating with the AMPS server.
Definition: ampsplusplus.hpp:6930
Command & setTopN(unsigned topN_)
Definition: ampsplusplus.hpp:856
AMPSDLL amps_result amps_client_set_read_timeout(amps_handle client, int readTimeout)
Sets a read timeout (seconds), in which if no message is received, the connection is presumed dead...
const std::string & getURI() const
Returns the last URI this client is connected to.
Definition: ampsplusplus.hpp:5225
Command & addAckType(const std::string &ackType_)
Definition: ampsplusplus.hpp:887
void replay(StoreReplayer &replayer_)
Called by Client to get all stored and non-discarded messages replayed by the store onto the StoreRep...
Definition: ampsplusplus.hpp:1251
static const char * BOOKMARK_MOST_RECENT()
Convenience method for returning the special value to start a subscription at a recovery point based ...
Definition: ampsplusplus.hpp:7068
Message & setBookmark(const std::string &v)
Sets the value of the Bookmark header for this Message.
Definition: Message.hpp:1194
void deepCopy(const Field &orig_)
Makes self a deep copy of the original field.
Definition: Field.hpp:218
Command & setOrderBy(const char *orderBy_, size_t orderByLen_)
Definition: ampsplusplus.hpp:711
unsigned getAckTypeEnum() const
Decode self&#39;s "ack type" field and return the corresponding bitmask of values from AckType...
Definition: Message.hpp:1160
BookmarkStoreImpl * get()
Used to get a pointer to the implementation.
Definition: BookmarkStore.hpp:465
amps_handle getHandle()
Returns the underlying amps_handle for this client, to be used with amps_client_* functions from the ...
Definition: ampsplusplus.hpp:6871
AMPSDLL amps_result amps_client_attempt_reconnect(amps_handle client, unsigned version)
Manually invokes the user-supplied disconnect handler for this client.
AMPSDLL void amps_client_destroy(amps_handle handle)
Disconnects and destroys an AMPS client object.
void reset()
Clear all data from the builder.
Definition: ampsplusplus.hpp:8173
void setFailedWriteHandler(FailedWriteHandler *handler_)
Set the handler that is invoked to report when a publish fails, for example if the publisher is not e...
Definition: ampsplusplus.hpp:5484
MessageStream sowAndDeltaSubscribe(const char *topic_, const std::string &filter_="", const std::string &orderBy_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic and initiates a new delta subscription on it.
Definition: ampsplusplus.hpp:6518
void setAckTimeout(const int ackTimeout_)
Sets the message queue ack timeout value.
Definition: ampsplusplus.hpp:7350
Command & setSowKeys(const std::string &sowKeys_)
Sets the SowKeys for the command.
Definition: ampsplusplus.hpp:641
Message & setPassword(const std::string &v)
Sets the value of the Password header for this Message.
Definition: Message.hpp:1416
bool getAutoAck(void) const
Returns the value of the queue auto-ack setting.
Definition: ampsplusplus.hpp:7299
amps_uint64_t publish(const std::string &topic_, const std::string &data_)
Publish a message to an AMPS topic, returning the sequence number assigned by the publish store if on...
Definition: ampsplusplus.hpp:5515
Definition: ampsplusplus.hpp:102
void setName(const std::string &name)
Sets the name of this client, assuming no name was provided previously.
Definition: ampsplusplus.hpp:5124
The interface for handling authentication with the AMPS server.
Definition: ampsplusplus.hpp:998
void flush(long timeout_=0)
Method to wait for the Store to discard everything that has been stored up to the point in time when ...
Definition: ampsplusplus.hpp:1293
Field getBookmark() const
Retrieves the value of the Bookmark header of the Message as a new Field.
Definition: Message.hpp:1194
bool DangerousFlushPublishStoreResizeHandler(Store store_, size_t, void *data_)
PublishStoreResizeHandler that will block up to the timeout specified in user data milliseconds tryin...
Definition: ampsplusplus.hpp:1403
static amps_uint64_t getUnsetSequence()
Method to return the value used to represent no such sequence.
Definition: ampsplusplus.hpp:1159
Command & setCorrelationId(const char *correlationId_, size_t correlationIdLen_)
Set the correlation ID for this command.
Definition: ampsplusplus.hpp:781
MessageStream sow(const char *topic_, const std::string &filter_="", const std::string &orderBy_="", const std::string &bookmark_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic.
Definition: ampsplusplus.hpp:6164
amps_uint64_t publish(const std::string &topic_, const std::string &data_, unsigned long expiration_)
Publish a message to an AMPS topic, returning the sequence number assigned by the publish store (if a...
Definition: ampsplusplus.hpp:5564
void setLastChanceMessageHandler(const AMPS::MessageHandler &messageHandler)
Sets the message handler called when no other handler matches.
Definition: ampsplusplus.hpp:6967
Message::Field getMostRecent(const Message::Field &subId_)
Returns the most recent bookmark from the log that ought to be used for (re-)subscriptions.
Definition: BookmarkStore.hpp:310
amps_uint64_t deltaPublish(const char *topic_, size_t topicLength_, const char *data_, size_t dataLength_, unsigned long expiration_)
Publish the changed fields of a message to an AMPS topic.
Definition: ampsplusplus.hpp:5730
The client and server are disconnected.
Definition: amps.h:249
Message sowDeleteByData(const std::string &topic_, const std::string &data_, long timeout_=0)
Deletes the message whose keys match the message data provided.
Definition: ampsplusplus.hpp:6847
std::string deltaSubscribe(const MessageHandler &messageHandler_, const std::string &topic_, long timeout_, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Delta Subscribe to a topic.
Definition: ampsplusplus.hpp:5903
MessageStream timeout(unsigned timeout_)
Sets the maximum time to wait for the next message in milliseconds; if no message is available within...
Definition: ampsplusplus.hpp:8745
std::string sow(const MessageHandler &messageHandler_, const std::string &topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic.
Definition: ampsplusplus.hpp:6205
const std::string & getName() const
Returns the name of this client passed in the constructor.
Definition: ampsplusplus.hpp:5131
Command is an encapsulation of a single AMPS command sent by the client.
Definition: ampsplusplus.hpp:471
std::string sowAndSubscribe(const MessageHandler &messageHandler_, const std::string &topic_, const std::string &filter_="", const std::string &orderBy_="", const std::string &bookmark_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic and initiates a new subscription on it.
Definition: ampsplusplus.hpp:6355
Message & setBatchSize(const std::string &v)
Sets the value of the BatchSize header for this Message.
Definition: Message.hpp:1193
MessageStream subscribe(const std::string &topic_, long timeout_=0, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Subscribe to a topic.
Definition: ampsplusplus.hpp:5842
MessageStream bookmarkSubscribe(const char *topic_, long timeout_, const std::string &bookmark_, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Subscribe to a topic using a bookmark.
Definition: ampsplusplus.hpp:6030
Command & setSowKey(const std::string &sowKey_)
Sets the SowKey field of the command, typically used for a publish command to a topic in the state of...
Definition: ampsplusplus.hpp:611
Message & assignSequence(const std::string &v)
Sets the value of the Sequence header for this Message.
Definition: Message.hpp:1422
std::string executeAsyncNoResubscribe(Command &command_, MessageHandler handler_)
Execute the provided command and, once AMPS acknowledges the command, process messages in response to...
Definition: ampsplusplus.hpp:7180
Command & setCorrelationId(const std::string &correlationId_)
Set the correlation ID for this command.
Definition: ampsplusplus.hpp:769
std::string sowDelete(const MessageHandler &messageHandler, const std::string &topic, const std::string &filter, long timeout)
Deletes one or more messages from a topic&#39;s SOW cache.
Definition: ampsplusplus.hpp:6679
iterator end(void)
Returns an iterator representing the end of the topic or subscription.
Definition: ampsplusplus.hpp:5001
bool getRetryOnDisconnect(void) const
Returns true if automatic retry of a command to AMPS after a reconnect is enabled.
Definition: ampsplusplus.hpp:7377
void flushAcks(void)
Sends any queued message queue ack messages to the server immediately.
Definition: ampsplusplus.hpp:7290
std::string sowAndSubscribe(const MessageHandler &messageHandler_, const std::string &topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, bool oofEnabled_=false, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic and initiates a new subscription on it.
Definition: ampsplusplus.hpp:6237