AMPS C/C++ Client Class Reference
AMPS C/C++ Client Version 5.3.5.1
ampsplusplus.hpp
Go to the documentation of this file.
1 //
3 // Copyright (c) 2010-2025 60East Technologies Inc., All Rights Reserved.
4 //
5 // This computer software is owned by 60East Technologies Inc. and is
6 // protected by U.S. copyright laws and other laws and by international
7 // treaties. This computer software is furnished by 60East Technologies
8 // Inc. pursuant to a written license agreement and may be used, copied,
9 // transmitted, and stored only in accordance with the terms of such
10 // license agreement and with the inclusion of the above copyright notice.
11 // This computer software or any other copies thereof may not be provided
12 // or otherwise made available to any other person.
13 //
14 // U.S. Government Restricted Rights. This computer software: (a) was
15 // developed at private expense and is in all respects the proprietary
16 // information of 60East Technologies Inc.; (b) was not developed with
17 // government funds; (c) is a trade secret of 60East Technologies Inc.
18 // for all purposes of the Freedom of Information Act; and (d) is a
19 // commercial item and thus, pursuant to Section 12.212 of the Federal
20 // Acquisition Regulations (FAR) and DFAR Supplement Section 227.7202,
21 // Government's use, duplication or disclosure of the computer software
22 // is subject to the restrictions set forth by 60East Technologies Inc..
23 //
25 #ifndef _AMPSPLUSPLUS_H_
26 #define _AMPSPLUSPLUS_H_
27 #include "amps/amps.h"
28 #include "amps/ampsver.h"
29 #include <string>
30 #include <map>
31 #include <sstream>
32 #include <iostream>
33 #include <memory>
34 #include <stdexcept>
35 #include <limits.h>
36 #include <list>
37 #include <memory>
38 #include <regex>
39 #include <set>
40 #include <deque>
41 #include <vector>
42 #include <assert.h>
43 #ifndef _WIN32
44  #include <inttypes.h>
45 #endif
46 #if defined(sun)
47  #include <sys/atomic.h>
48 #endif
49 #include "amps/BookmarkStore.hpp"
50 #include "amps/MessageRouter.hpp"
51 #include "amps/util.hpp"
52 #include "amps/ampscrc.hpp"
53 #if __cplusplus >= 201100L || _MSC_VER >= 1900
54  #include <atomic>
55 #endif
56 
57 #ifndef AMPS_TESTING_SLOW_MESSAGE_STREAM
58  #define AMPS_TESTING_SLOW_MESSAGE_STREAM
59 #endif
60 
65 
66 
73 
84 
85 // For StoreBuffer implementations
86 #define AMPS_MEMORYBUFFER_DEFAULT_BUFFERS 10
87 #define AMPS_MEMORYBUFFER_DEFAULT_LENGTH 40960
88 #define AMPS_SUBSCRIPTION_MANAGER_DEFAULT_TIMEOUT 0
89 #define AMPS_HACLIENT_TIMEOUT_DEFAULT 10000
90 #define AMPS_HACLIENT_RECONNECT_DEFAULT 200
91 #define AMPS_DEFAULT_COMMAND_TIMEOUT 5000
92 #define AMPS_DEFAULT_TOP_N -1
93 #define AMPS_DEFAULT_BATCH_SIZE 10
94 #define AMPS_NUMBER_BUFFER_LEN 20
95 #define AMPS_DEFAULT_QUEUE_ACK_TIMEOUT 1000
96 
97 #if defined(_M_X64) || defined(__x86_64) || defined(_WIN64)
98  #define AMPS_X64 1
99 #endif
100 
101 static thread_local AMPS::Message publishStoreMessage;
102 
103 namespace AMPS
104 {
105 
106  typedef std::map<std::string, std::string> ConnectionInfo;
107 
108  template<class Type>
109  inline std::string asString(Type x_)
110  {
111  std::ostringstream os;
112  os << x_;
113  return os.str();
114  }
115 
116  inline
117  size_t convertToCharArray(char* buf_, amps_uint64_t seqNo_)
118  {
119  size_t pos = AMPS_NUMBER_BUFFER_LEN;
120  for (int i = 0; i < AMPS_NUMBER_BUFFER_LEN; ++i)
121  {
122  if (seqNo_ > 0)
123  {
124  buf_[--pos] = (char)(seqNo_ % 10 + '0');
125  seqNo_ /= 10;
126  }
127  }
128  return pos;
129  }
130 
131 #ifdef _WIN32
132  inline
133  size_t convertToCharArray(char* buf_, unsigned long seqNo_)
134  {
135  size_t pos = AMPS_NUMBER_BUFFER_LEN;
136  for (int i = 0; i < AMPS_NUMBER_BUFFER_LEN; ++i)
137  {
138  if (seqNo_ > 0)
139  {
140  buf_[--pos] = (char)(seqNo_ % 10 + '0');
141  seqNo_ /= 10;
142  }
143  }
144  return pos;
145  }
146 #endif
147 
151  class Reason
152  {
153  public:
154  static const char* duplicate()
155  {
156  return "duplicate";
157  }
158  static const char* badFilter()
159  {
160  return "bad filter";
161  }
162  static const char* badRegexTopic()
163  {
164  return "bad regex topic";
165  }
166  static const char* subscriptionAlreadyExists()
167  {
168  return "subscription already exists";
169  }
170  static const char* nameInUse()
171  {
172  return "name in use";
173  }
174  static const char* authFailure()
175  {
176  return "auth failure";
177  }
178  static const char* notEntitled()
179  {
180  return "not entitled";
181  }
182  static const char* authDisabled()
183  {
184  return "authentication disabled";
185  }
186  static const char* subidInUse()
187  {
188  return "subid in use";
189  }
190  static const char* noTopic()
191  {
192  return "no topic";
193  }
194  };
195 
205  {
206  public:
207  virtual ~ExceptionListener() {;}
208  virtual void exceptionThrown(const std::exception&) const {;}
209  };
210 
212 
213 
214 #define AMPS_CALL_EXCEPTION_WRAPPER(x) \
215  try\
216  {\
217  x;\
218  }\
219  catch (std::exception& stdEx_)\
220  {\
221  try\
222  {\
223  _exceptionListener->exceptionThrown(stdEx_);\
224  }\
225  catch(...)\
226  {\
227  ;\
228  }\
229  }
230  /*
231  * Note : we don't attempt to trap non std::exception exceptions
232  * here because doing so interferes with pthread_exit on some OSes.
233  catch (...)\
234  {\
235  try\
236  {\
237  _exceptionListener->exceptionThrown(AMPS::AMPSException(\
238  "An unhandled exception of unknown type was thrown by "\
239  "the registered handler.", AMPS_E_USAGE));\
240  }\
241  catch(...)\
242  {\
243  ;\
244  }\
245  }*/
246 #ifdef _WIN32
247 #define AMPS_CALL_EXCEPTION_WRAPPER_2(me,x) \
248  try\
249  {\
250  while(me->_connected)\
251  {\
252  try\
253  {\
254  x;\
255  break;\
256  }\
257  catch(MessageStreamFullException&)\
258  {\
259  try\
260  {\
261  me->checkAndSendHeartbeat(false);\
262  }\
263  catch (std::exception& stdEx_)\
264  {\
265  try\
266  {\
267  me->_exceptionListener->exceptionThrown(stdEx_);\
268  }\
269  catch(...)\
270  {\
271  ;\
272  }\
273  break;\
274  }\
275  }\
276  }\
277  }\
278  catch (std::exception& stdEx_)\
279  {\
280  try\
281  {\
282  me->_exceptionListener->exceptionThrown(stdEx_);\
283  }\
284  catch(...)\
285  {\
286  ;\
287  }\
288  }
289  /*
290  * Note : we don't attempt to trap non std::exception exceptions
291  * here because doing so interferes with pthread_exit on some OSes.
292  catch (...)\
293  {\
294  try\
295  {\
296  me->_exceptionListener->exceptionThrown(AMPS::AMPSException(\
297  "An unhandled exception of unknown type was thrown by "\
298  "the registered handler.", AMPS_E_USAGE));\
299  }\
300  catch(...)\
301  {\
302  ;\
303  }\
304  }*/
305 
306 #define AMPS_CALL_EXCEPTION_WRAPPER_STREAM_FULL_2(me, x)\
307  while(me->_connected)\
308  {\
309  try\
310  {\
311  x;\
312  break;\
313  }\
314  catch(MessageStreamFullException&)\
315  {\
316  try\
317  {\
318  me->checkAndSendHeartbeat(false);\
319  }\
320  catch (std::exception& stdEx_)\
321  {\
322  try\
323  {\
324  me->_exceptionListener->exceptionThrown(stdEx_);\
325  }\
326  catch(...)\
327  {\
328  ;\
329  }\
330  break;\
331  }\
332  }\
333  }
334 #else
335 #define AMPS_CALL_EXCEPTION_WRAPPER_2(me,x) \
336  try\
337  {\
338  while(me->_connected)\
339  {\
340  try\
341  {\
342  x;\
343  break;\
344  }\
345  catch(MessageStreamFullException& msfEx_)\
346  {\
347  try\
348  {\
349  me->checkAndSendHeartbeat(false);\
350  }\
351  catch (std::exception& stdEx_)\
352  {\
353  try\
354  {\
355  me->_exceptionListener->exceptionThrown(stdEx_);\
356  }\
357  catch(...)\
358  {\
359  ;\
360  }\
361  break;\
362  }\
363  }\
364  }\
365  }\
366  catch (std::exception& stdEx_)\
367  {\
368  try\
369  {\
370  me->_exceptionListener->exceptionThrown(stdEx_);\
371  }\
372  catch(...)\
373  {\
374  ;\
375  }\
376  }
377  /*
378  * Note : we don't attempt to trap non std::exception exceptions
379  * here because doing so interferes with pthread_exit on some OSes.
380  catch (...)\
381  {\
382  try\
383  {\
384  me->_exceptionListener->exceptionThrown(AMPS::AMPSException(\
385  "An unhandled exception of unknown type was thrown by "\
386  "the registered handler.", AMPS_E_USAGE));\
387  }\
388  catch(...)\
389  {\
390  ;\
391  }\
392  }*/
393 
394 #define AMPS_CALL_EXCEPTION_WRAPPER_STREAM_FULL_2(me, x)\
395  while(me->_connected)\
396  {\
397  try\
398  {\
399  x;\
400  break;\
401  }\
402  catch(MessageStreamFullException& msfEx_)\
403  {\
404  try\
405  {\
406  me->checkAndSendHeartbeat(false);\
407  }\
408  catch (std::exception& stdEx_)\
409  {\
410  try\
411  {\
412  me->_exceptionListener->exceptionThrown(stdEx_);\
413  }\
414  catch(...)\
415  {\
416  ;\
417  }\
418  break;\
419  }\
420  }\
421  }
422 #endif
423 
424 #define AMPS_UNHANDLED_EXCEPTION(ex) \
425  try\
426  {\
427  _exceptionListener->exceptionThrown(ex);\
428  }\
429  catch(...)\
430  {\
431  ;\
432  }
433 
434 #define AMPS_UNHANDLED_EXCEPTION_2(me,ex) \
435  try\
436  {\
437  me->_exceptionListener->exceptionThrown(ex);\
438  }\
439  catch(...)\
440  {\
441  ;\
442  }
443 
444 
445  class Client;
446 
471 
472  class Command
473  {
474  Message _message;
475  unsigned _timeout;
476  unsigned _batchSize;
477  unsigned _flags;
478  static const unsigned Subscribe = 1;
479  static const unsigned SOW = 2;
480  static const unsigned NeedsSequenceNumber = 4;
481  static const unsigned ProcessedAck = 8;
482  static const unsigned StatsAck = 16;
483  void init(Message::Command::Type command_)
484  {
485  _timeout = 0;
486  _batchSize = 0;
487  _flags = 0;
488  _message.reset();
489  _message.setCommandEnum(command_);
490  _setIds();
491  }
492  void init(const std::string& command_)
493  {
494  _timeout = 0;
495  _batchSize = 0;
496  _flags = 0;
497  _message.reset();
498  _message.setCommand(command_);
499  _setIds();
500  }
501  void init(const char* command_, size_t commandLen_)
502  {
503  _timeout = 0;
504  _batchSize = 0;
505  _flags = 0;
506  _message.reset();
507  _message.setCommand(command_, commandLen_);
508  _setIds();
509  }
510  void _setIds(void)
511  {
512  Message::Command::Type command = _message.getCommandEnum();
513  if (!(command & Message::Command::NoDataCommands))
514  {
515  _message.newCommandId();
516  if (command == Message::Command::Subscribe ||
517  command == Message::Command::SOWAndSubscribe ||
518  command == Message::Command::DeltaSubscribe ||
519  command == Message::Command::SOWAndDeltaSubscribe)
520  {
521  _message.setSubscriptionId(_message.getCommandId());
522  _flags |= Subscribe;
523  }
524  if (command == Message::Command::SOW
525  || command == Message::Command::SOWAndSubscribe
526  || command == Message::Command::SOWAndDeltaSubscribe)
527  {
528  _message.setQueryID(_message.getCommandId());
529  if (_batchSize == 0)
530  {
531  setBatchSize(AMPS_DEFAULT_BATCH_SIZE);
532  }
533  if (command == Message::Command::SOW)
534  {
535  _flags |= SOW;
536  }
537  }
538  _flags |= ProcessedAck;
539  }
540  else if (command == Message::Command::SOWDelete)
541  {
542  _message.newCommandId();
543  _flags |= ProcessedAck;
544  _flags |= NeedsSequenceNumber;
545  }
546  else if (command == Message::Command::Publish
547  || command == Message::Command::DeltaPublish)
548  {
549  _flags |= NeedsSequenceNumber;
550  }
551  else if (command == Message::Command::StopTimer)
552  {
553  _message.newCommandId();
554  }
555  }
556  public:
560  Command(const std::string& command_)
561  {
562  init(command_);
563  }
568  Command(const char* command_, size_t commandLen_)
569  {
570  init(command_, commandLen_);
571  }
575  Command(Message::Command::Type command_)
576  {
577  init(command_);
578  }
579 
583  Command& reset(const std::string& command_)
584  {
585  init(command_);
586  return *this;
587  }
592  Command& reset(const char* command_, size_t commandLen_)
593  {
594  init(command_, commandLen_);
595  return *this;
596  }
600  Command& reset(Message::Command::Type command_)
601  {
602  init(command_);
603  return *this;
604  }
612  Command& setSowKey(const std::string& sowKey_)
613  {
614  _message.setSowKey(sowKey_);
615  return *this;
616  }
625  Command& setSowKey(const char* sowKey_, size_t sowKeyLen_)
626  {
627  _message.setSowKey(sowKey_, sowKeyLen_);
628  return *this;
629  }
642  Command& setSowKeys(const std::string& sowKeys_)
643  {
644  _message.setSowKeys(sowKeys_);
645  return *this;
646  }
660  Command& setSowKeys(const char* sowKeys_, size_t sowKeysLen_)
661  {
662  _message.setSowKeys(sowKeys_, sowKeysLen_);
663  return *this;
664  }
666  Command& setCommandId(const std::string& cmdId_)
667  {
668  _message.setCommandId(cmdId_);
669  return *this;
670  }
673  Command& setCommandId(const char* cmdId_, size_t cmdIdLen_)
674  {
675  _message.setCommandId(cmdId_, cmdIdLen_);
676  return *this;
677  }
679  Command& setTopic(const std::string& topic_)
680  {
681  _message.setTopic(topic_);
682  return *this;
683  }
686  Command& setTopic(const char* topic_, size_t topicLen_)
687  {
688  _message.setTopic(topic_, topicLen_);
689  return *this;
690  }
692  Command& setFilter(const std::string& filter_)
693  {
694  _message.setFilter(filter_);
695  return *this;
696  }
699  Command& setFilter(const char* filter_, size_t filterLen_)
700  {
701  _message.setFilter(filter_, filterLen_);
702  return *this;
703  }
705  Command& setOrderBy(const std::string& orderBy_)
706  {
707  _message.setOrderBy(orderBy_);
708  return *this;
709  }
712  Command& setOrderBy(const char* orderBy_, size_t orderByLen_)
713  {
714  _message.setOrderBy(orderBy_, orderByLen_);
715  return *this;
716  }
718  Command& setSubId(const std::string& subId_)
719  {
720  _message.setSubscriptionId(subId_);
721  return *this;
722  }
725  Command& setSubId(const char* subId_, size_t subIdLen_)
726  {
727  _message.setSubscriptionId(subId_, subIdLen_);
728  return *this;
729  }
731  Command& setQueryId(const std::string& queryId_)
732  {
733  _message.setQueryId(queryId_);
734  return *this;
735  }
738  Command& setQueryId(const char* queryId_, size_t queryIdLen_)
739  {
740  _message.setQueryId(queryId_, queryIdLen_);
741  return *this;
742  }
748  Command& setBookmark(const std::string& bookmark_)
749  {
750  _message.setBookmark(bookmark_);
751  return *this;
752  }
759  Command& setBookmark(const char* bookmark_, size_t bookmarkLen_)
760  {
761  _message.setBookmark(bookmark_, bookmarkLen_);
762  return *this;
763  }
770  Command& setCorrelationId(const std::string& correlationId_)
771  {
772  _message.setCorrelationId(correlationId_);
773  return *this;
774  }
782  Command& setCorrelationId(const char* correlationId_, size_t correlationIdLen_)
783  {
784  _message.setCorrelationId(correlationId_, correlationIdLen_);
785  return *this;
786  }
789  Command& setOptions(const std::string& options_)
790  {
791  _message.setOptions(options_);
792  return *this;
793  }
797  Command& setOptions(const char* options_, size_t optionsLen_)
798  {
799  _message.setOptions(options_, optionsLen_);
800  return *this;
801  }
803  Command& setSequence(const std::string& seq_)
804  {
805  _message.setSequence(seq_);
806  return *this;
807  }
810  Command& setSequence(const char* seq_, size_t seqLen_)
811  {
812  _message.setSequence(seq_, seqLen_);
813  return *this;
814  }
816  Command& setSequence(const amps_uint64_t seq_)
817  {
818  std::ostringstream os;
819  os << seq_;
820  _message.setSequence(os.str());
821  return *this;
822  }
823  amps_uint64_t getSequence() const
824  {
825  return amps_message_get_field_uint64(_message.getMessage(), AMPS_Sequence);
826  }
829  Command& setData(const std::string& data_)
830  {
831  _message.setData(data_);
832  return *this;
833  }
837  Command& setData(const char* data_, size_t dataLen_)
838  {
839  _message.setData(data_, dataLen_);
840  return *this;
841  }
851  Command& setTimeout(unsigned timeout_)
852  {
853  _timeout = timeout_;
854  return *this;
855  }
857  Command& setTopN(unsigned topN_)
858  {
859  if (topN_ != (unsigned)AMPS_DEFAULT_TOP_N)
860  {
861  _message.setTopNRecordsReturned(topN_);
862  }
863  else
864  {
865  _message.setTopNRecordsReturned(nullptr, 0);
866  }
867  return *this;
868  }
873  Command& setBatchSize(unsigned batchSize_)
874  {
875  _message.setBatchSize(batchSize_);
876  _batchSize = batchSize_;
877  return *this;
878  }
889  Command& setExpiration(unsigned expiration_)
890  {
891  _message.setExpiration(expiration_);
892  return *this;
893  }
895  Command& addAckType(const std::string& ackType_)
896  {
897  _message.setAckType(_message.getAckType() + "," + ackType_);
898  if (ackType_ == "processed")
899  {
900  _flags |= ProcessedAck;
901  }
902  else if (ackType_ == "stats")
903  {
904  _flags |= StatsAck;
905  }
906  return *this;
907  }
909  Command& setAckType(const std::string& ackType_)
910  {
911  _message.setAckType(ackType_);
912  if (ackType_.find("processed") != std::string::npos)
913  {
914  _flags |= ProcessedAck;
915  }
916  else
917  {
918  _flags &= ~ProcessedAck;
919  }
920  if (ackType_.find("stats") != std::string::npos)
921  {
922  _flags |= StatsAck;
923  }
924  else
925  {
926  _flags &= ~StatsAck;
927  }
928  return *this;
929  }
931  Command& setAckType(unsigned ackType_)
932  {
933  _message.setAckTypeEnum(ackType_);
934  if (ackType_ & Message::AckType::Processed)
935  {
936  _flags |= ProcessedAck;
937  }
938  else
939  {
940  _flags &= ~ProcessedAck;
941  }
942  if (ackType_ & Message::AckType::Stats)
943  {
944  _flags |= StatsAck;
945  }
946  else
947  {
948  _flags &= ~StatsAck;
949  }
950  return *this;
951  }
953  std::string getAckType() const
954  {
955  return (std::string)(_message.getAckType());
956  }
958  unsigned getAckTypeEnum() const
959  {
960  return _message.getAckTypeEnum();
961  }
962 
963  Message& getMessage(void)
964  {
965  return _message;
966  }
967  unsigned getTimeout(void) const
968  {
969  return _timeout;
970  }
971  unsigned getBatchSize(void) const
972  {
973  return _batchSize;
974  }
975  bool isSubscribe(void) const
976  {
977  return _flags & Subscribe;
978  }
979  bool isSow(void) const
980  {
981  return (_flags & SOW) != 0;
982  }
983  bool hasProcessedAck(void) const
984  {
985  return (_flags & ProcessedAck) != 0;
986  }
987  bool hasStatsAck(void) const
988  {
989  return (_flags & StatsAck) != 0;
990  }
991  bool needsSequenceNumber(void) const
992  {
993  return (_flags & NeedsSequenceNumber) != 0;
994  }
995  };
996 
999  typedef void(*DisconnectHandlerFunc)(Client&, void* userData);
1000 
1001  class Message;
1003 
1007  {
1008  public:
1009  virtual ~Authenticator() {;}
1010 
1016  virtual std::string authenticate(const std::string& userName_, const std::string& password_) = 0;
1024  virtual std::string retry(const std::string& userName_, const std::string& password_) = 0;
1031  virtual void completed(const std::string& userName_, const std::string& password_, const std::string& reason_) = 0;
1032  };
1033 
1038  {
1039  public:
1040  virtual ~DefaultAuthenticator() {;}
1043  std::string authenticate(const std::string& /*userName_*/, const std::string& password_)
1044  {
1045  return password_;
1046  }
1047 
1050  std::string retry(const std::string& /*userName_*/, const std::string& /*password_*/)
1051  {
1052  throw AuthenticationException("retry not implemented by DefaultAuthenticator.");
1053  }
1054 
1055  void completed(const std::string& /*userName_*/, const std::string& /* password_ */, const std::string& /* reason */) {;}
1056 
1061  {
1062  static DefaultAuthenticator d;
1063  return d;
1064  }
1065  };
1066 
1070  {
1071  public:
1072 
1076  virtual void execute(Message& message_) = 0;
1077 
1078  virtual ~StoreReplayer() {;}
1079  };
1080 
1081  class Store;
1082 
1091  typedef bool (*PublishStoreResizeHandler)(Store store_,
1092  size_t size_,
1093  void* userData_);
1094 
1097  class StoreImpl : public RefBody
1098  {
1099  public:
1105  StoreImpl(bool errorOnPublishGap_ = false)
1106  : _resizeHandler(NULL)
1107  , _resizeHandlerData(NULL)
1108  , _errorOnPublishGap(errorOnPublishGap_)
1109  {;}
1110 
1115  virtual amps_uint64_t store(const Message& message_) = 0;
1116 
1123  virtual void discardUpTo(amps_uint64_t index_) = 0;
1124 
1129  virtual void replay(StoreReplayer& replayer_) = 0;
1130 
1138  virtual bool replaySingle(StoreReplayer& replayer_, amps_uint64_t index_) = 0;
1139 
1144  virtual size_t unpersistedCount() const = 0;
1145 
1146  virtual ~StoreImpl() {;}
1147 
1156  virtual void flush(long timeout_) = 0;
1157 
1160  static inline size_t getUnsetPosition()
1161  {
1162  return AMPS_UNSET_INDEX;
1163  }
1164 
1167  static inline amps_uint64_t getUnsetSequence()
1168  {
1169  return AMPS_UNSET_SEQUENCE;
1170  }
1171 
1175  virtual amps_uint64_t getLowestUnpersisted() const = 0;
1176 
1180  virtual amps_uint64_t getLastPersisted() = 0;
1181 
1191  inline virtual void setResizeHandler(PublishStoreResizeHandler handler_,
1192  void* userData_)
1193  {
1194  _resizeHandler = handler_;
1195  _resizeHandlerData = userData_;
1196  }
1197 
1198  inline virtual PublishStoreResizeHandler getResizeHandler() const
1199  {
1200  return _resizeHandler;
1201  }
1202 
1203  bool callResizeHandler(size_t newSize_);
1204 
1205  inline virtual void setErrorOnPublishGap(bool errorOnPublishGap_)
1206  {
1207  _errorOnPublishGap = errorOnPublishGap_;
1208  }
1209 
1210  inline virtual bool getErrorOnPublishGap() const
1211  {
1212  return _errorOnPublishGap;
1213  }
1214 
1215  private:
1216  PublishStoreResizeHandler _resizeHandler;
1217  void* _resizeHandlerData;
1218  bool _errorOnPublishGap;
1219  };
1220 
1223  class Store
1224  {
1225  RefHandle<StoreImpl> _body;
1226  public:
1227  Store() {;}
1228  Store(StoreImpl* body_) : _body(body_) {;}
1229  Store(const Store& rhs) : _body(rhs._body) {;}
1230  Store& operator=(const Store& rhs)
1231  {
1232  _body = rhs._body;
1233  return *this;
1234  }
1235 
1239  amps_uint64_t store(const Message& message_)
1240  {
1241  return _body.get().store(message_);
1242  }
1243 
1250  void discardUpTo(amps_uint64_t index_)
1251  {
1252  _body.get().discardUpTo(index_);
1253  }
1254 
1259  void replay(StoreReplayer& replayer_)
1260  {
1261  _body.get().replay(replayer_);
1262  }
1263 
1271  bool replaySingle(StoreReplayer& replayer_, amps_uint64_t index_)
1272  {
1273  return _body.get().replaySingle(replayer_, index_);
1274  }
1275 
1280  size_t unpersistedCount() const
1281  {
1282  return _body.get().unpersistedCount();
1283  }
1284 
1288  bool isValid() const
1289  {
1290  return _body.isValid();
1291  }
1292 
1301  void flush(long timeout_ = 0)
1302  {
1303  return _body.get().flush(timeout_);
1304  }
1305 
1309  amps_uint64_t getLowestUnpersisted()
1310  {
1311  return _body.get().getLowestUnpersisted();
1312  }
1313 
1317  amps_uint64_t getLastPersisted()
1318  {
1319  return _body.get().getLastPersisted();
1320  }
1321 
1331  void setResizeHandler(PublishStoreResizeHandler handler_,
1332  void* userData_)
1333  {
1334  _body.get().setResizeHandler(handler_, userData_);
1335  }
1336 
1337  PublishStoreResizeHandler getResizeHandler() const
1338  {
1339  return _body.get().getResizeHandler();
1340  }
1341 
1346  inline void setErrorOnPublishGap(bool errorOnPublishGap_)
1347  {
1348  _body.get().setErrorOnPublishGap(errorOnPublishGap_);
1349  }
1350 
1355  inline bool getErrorOnPublishGap() const
1356  {
1357  return _body.get().getErrorOnPublishGap();
1358  }
1359 
1363  StoreImpl* get()
1364  {
1365  if (_body.isValid())
1366  {
1367  return &_body.get();
1368  }
1369  else
1370  {
1371  return NULL;
1372  }
1373  }
1374 
1375  };
1376 
1382  {
1383  public:
1384  virtual ~FailedWriteHandler() {;}
1391  virtual void failedWrite(const Message& message_,
1392  const char* reason_, size_t reasonLength_) = 0;
1393  };
1394 
1395 
1396  inline bool StoreImpl::callResizeHandler(size_t newSize_)
1397  {
1398  if (_resizeHandler)
1399  {
1400  return _resizeHandler(Store(this), newSize_, _resizeHandlerData);
1401  }
1402  return true;
1403  }
1404 
1411  inline bool DangerousFlushPublishStoreResizeHandler(Store store_, size_t /*size_*/,
1412  void* data_)
1413  {
1414  long* timeoutp = (long*)data_;
1415  size_t count = store_.unpersistedCount();
1416  if (count == 0)
1417  {
1418  return false;
1419  }
1420  try
1421  {
1422  store_.flush(*timeoutp);
1423  }
1424 #ifdef _WIN32
1425  catch (const TimedOutException&)
1426 #else
1427  catch (const TimedOutException& e)
1428 #endif
1429  {
1430  return true;
1431  }
1432  return (count == store_.unpersistedCount());
1433  }
1434 
1440  {
1441  public:
1453  virtual bool failure(const Message& message_, const MessageHandler& handler_,
1454  unsigned requestedAckTypes_,
1455  const AMPSException& exception_) = 0;
1456  };
1457 
1462  {
1463  public:
1464  virtual ~SubscriptionManager() {;}
1472  virtual void subscribe(MessageHandler messageHandler_, const Message& message_,
1473  unsigned requestedAckTypes_) = 0;
1477  virtual void unsubscribe(const Message::Field& subId_) = 0;
1480  virtual void clear() = 0;
1484  virtual void resubscribe(Client& client_) = 0;
1489  virtual void setFailedResubscribeHandler(std::shared_ptr<FailedResubscribeHandler> handler_)
1490  {
1491  _failedResubscribeHandler = handler_;
1492  }
1493  protected:
1494  std::shared_ptr<FailedResubscribeHandler> _failedResubscribeHandler;
1495  };
1496 
1500 
1502  {
1503  public:
1505  typedef enum { Disconnected = 0,
1506  Shutdown = 1,
1507  Connected = 2,
1508  LoggedOn = 4,
1509  PublishReplayed = 8,
1510  HeartbeatInitiated = 16,
1511  Resubscribed = 32,
1512  UNKNOWN = 16384
1513  } State;
1514 
1524  virtual void connectionStateChanged(State newState_) = 0;
1525  virtual ~ConnectionStateListener() {;}
1526  };
1527 
1528 
1529  class MessageStreamImpl;
1530  class MessageStream;
1531 
1532  typedef void(*DeferredExecutionFunc)(void*);
1533 
1534  class ClientImpl : public RefBody // -V553
1535  {
1536  // Class to wrap turning of Nagle for things like flush and logon
1537  class NoDelay
1538  {
1539  private:
1540  AMPS_SOCKET _socket;
1541  int _noDelay;
1542  char* _valuePtr;
1543 #ifdef _WIN32
1544  int _valueLen;
1545 #else
1546  socklen_t _valueLen;
1547 #endif
1548  public:
1549  NoDelay(amps_handle client_)
1550  : _socket(AMPS_INVALID_SOCKET), _noDelay(0), _valueLen(sizeof(int))
1551  {
1552  _valuePtr = (char*)&_noDelay;
1553  _socket = amps_client_get_socket(client_);
1554  if (_socket != AMPS_INVALID_SOCKET)
1555  {
1556  getsockopt(_socket, IPPROTO_TCP, TCP_NODELAY, _valuePtr, &_valueLen);
1557  if (!_noDelay)
1558  {
1559  _noDelay = 1;
1560  setsockopt(_socket, IPPROTO_TCP, TCP_NODELAY, _valuePtr, _valueLen);
1561  }
1562  else
1563  {
1564  _socket = AMPS_INVALID_SOCKET;
1565  }
1566  }
1567  }
1568 
1569  ~NoDelay()
1570  {
1571  if (_socket != AMPS_INVALID_SOCKET)
1572  {
1573  _noDelay = 0;
1574  setsockopt(_socket, IPPROTO_TCP, TCP_NODELAY, _valuePtr, _valueLen);
1575  }
1576  }
1577  };
1578 
1579  friend class Client;
1580  protected:
1581  amps_handle _client;
1582  DisconnectHandler _disconnectHandler;
1583  enum GlobalCommandTypeHandlers : size_t
1584  {
1585  Publish = 0,
1586  SOW = 1,
1587  GroupBegin = 2,
1588  GroupEnd = 3,
1589  Heartbeat = 4,
1590  OOF = 5,
1591  Ack = 6,
1592  LastChance = 7,
1593  DuplicateMessage = 8,
1594  COUNT = 9
1595  };
1596  std::vector<MessageHandler> _globalCommandTypeHandlers;
1597  Message _message, _readMessage, _publishMessage, _deltaMessage, _beatMessage;
1598  MessageRouter _routes;
1599  MessageRouter::RouteCache _routeCache;
1600  mutable Mutex _lock;
1601  std::string _name, _nameHash, _lastUri, _logonCorrelationData, _preflightMessage;
1602  std::vector<std::string> _httpPreflightHeaders;
1603  amps_uint64_t _nameHashValue;
1604  BookmarkStore _bookmarkStore;
1605  Store _publishStore;
1606  bool _isRetryOnDisconnect;
1607  amps_unique_ptr<FailedWriteHandler> _failedWriteHandler;
1608 #if __cplusplus >= 201100L || _MSC_VER >= 1900
1609  std::atomic<amps_uint64_t> _lastSentHaSequenceNumber;
1610 #else
1611  volatile amps_uint64_t _lastSentHaSequenceNumber;
1612 #endif
1613  AMPS_ATOMIC_TYPE_8 _logonInProgress;
1614  AMPS_ATOMIC_TYPE_8 _badTimeToHASubscribe;
1615  VersionInfo _serverVersion;
1616  Timer _heartbeatTimer;
1617  amps_unique_ptr<MessageStream> _pEmptyMessageStream;
1618 
1619  // queue data
1620  int _queueAckTimeout;
1621  bool _isAutoAckEnabled;
1622  unsigned _ackBatchSize;
1623  unsigned _queuedAckCount;
1624  unsigned _defaultMaxDepth;
1625  struct QueueBookmarks
1626  {
1627  QueueBookmarks(const std::string& topic_)
1628  : _topic(topic_)
1629  , _oldestTime(0)
1630  , _bookmarkCount(0)
1631  {;}
1632  std::string _topic;
1633  std::string _data;
1634  amps_uint64_t _oldestTime;
1635  unsigned _bookmarkCount;
1636  };
1637  typedef amps_uint64_t topic_hash;
1638  typedef std::map<topic_hash, QueueBookmarks> TopicHashMap;
1639  TopicHashMap _topicHashMap;
1640 
1641  class ClientStoreReplayer : public StoreReplayer
1642  {
1643  ClientImpl* _client;
1644  public:
1645  unsigned _version;
1646  amps_result _res;
1647 
1648  ClientStoreReplayer()
1649  : _client(NULL), _version(0), _res(AMPS_E_OK)
1650  {}
1651 
1652  ClientStoreReplayer(ClientImpl* client_)
1653  : _client(client_), _version(0), _res(AMPS_E_OK)
1654  {}
1655 
1656  void setClient(ClientImpl* client_)
1657  {
1658  _client = client_;
1659  }
1660 
1661  void execute(Message& message_)
1662  {
1663  if (!_client)
1664  {
1665  throw CommandException("Can't replay without a client.");
1666  }
1667  amps_uint64_t index = amps_message_get_field_uint64(message_.getMessage(),
1668  AMPS_Sequence);
1669  if (index > _client->_lastSentHaSequenceNumber)
1670  {
1671  _client->_lastSentHaSequenceNumber = index;
1672  }
1673 
1674  _res = AMPS_E_OK;
1675  // Don't replay a queue cancel message after a reconnect.
1676  // Currently, the only messages that will have anything in options
1677  // are cancel messages.
1678  if (!message_.getCommand().empty() &&
1679  (!_client->_logonInProgress ||
1680  message_.getOptions().len() < 6))
1681  {
1682  _res = amps_client_send_batch(_client->_client,
1683  message_.getMessage(),
1684  &_version,
1685  1);
1686  if (_res != AMPS_E_OK)
1687  {
1688  throw DisconnectedException("AMPS Server disconnected during replay");
1689  }
1690  }
1691  }
1692 
1693  };
1694  ClientStoreReplayer _replayer;
1695 
1696  class FailedWriteStoreReplayer : public StoreReplayer
1697  {
1698  ClientImpl* _parent;
1699  const char* _reason;
1700  size_t _reasonLength;
1701  size_t _replayCount;
1702  public:
1703  FailedWriteStoreReplayer(ClientImpl* parent, const char* reason_, size_t reasonLength_)
1704  : _parent(parent),
1705  _reason(reason_),
1706  _reasonLength(reasonLength_),
1707  _replayCount(0)
1708  {;}
1709  void execute(Message& message_)
1710  {
1711  if (_parent->_failedWriteHandler)
1712  {
1713  ++_replayCount;
1714  _parent->_failedWriteHandler->failedWrite(message_,
1715  _reason, _reasonLength);
1716  }
1717  }
1718  size_t replayCount(void) const
1719  {
1720  return _replayCount;
1721  }
1722  };
1723 
1724  struct AckResponseImpl : public RefBody
1725  {
1726  std::string username, password, reason, status, bookmark, options;
1727  amps_uint64_t sequenceNo;
1728  amps_uint64_t nameHashValue;
1729  VersionInfo serverVersion;
1730 #if __cplusplus >= 201100L || _MSC_VER >= 1900
1731  std::atomic<bool> responded;
1732  std::atomic<bool> abandoned;
1733 #else
1734  volatile bool responded;
1735  volatile bool abandoned;
1736 #endif
1737  unsigned connectionVersion;
1738  AckResponseImpl() :
1739  RefBody(),
1740  username(), password(), reason(), status(), bookmark(), options(),
1741  sequenceNo((amps_uint64_t)0),
1742  serverVersion(),
1743  responded(false),
1744  abandoned(false),
1745  connectionVersion(UINT_MAX) // Don't abandon if unsent AC-1329
1746  {
1747  }
1748  };
1749 
1750  class AckResponse
1751  {
1752  RefHandle<AckResponseImpl> _body;
1753  public:
1754  AckResponse() : _body(NULL) {;}
1755  AckResponse(const AckResponse& rhs) : _body(rhs._body) {;}
1756  static AckResponse create()
1757  {
1758  AckResponse r;
1759  r._body = new AckResponseImpl();
1760  return r;
1761  }
1762 
1763  const std::string& username()
1764  {
1765  return _body.get().username;
1766  }
1767  void setUsername(const char* data_, size_t len_)
1768  {
1769  if (data_)
1770  {
1771  _body.get().username.assign(data_, len_);
1772  }
1773  else
1774  {
1775  _body.get().username.clear();
1776  }
1777  }
1778  const std::string& password()
1779  {
1780  return _body.get().password;
1781  }
1782  void setPassword(const char* data_, size_t len_)
1783  {
1784  if (data_)
1785  {
1786  _body.get().password.assign(data_, len_);
1787  }
1788  else
1789  {
1790  _body.get().password.clear();
1791  }
1792  }
1793  const std::string& reason()
1794  {
1795  return _body.get().reason;
1796  }
1797  void setReason(const char* data_, size_t len_)
1798  {
1799  if (data_)
1800  {
1801  _body.get().reason.assign(data_, len_);
1802  }
1803  else
1804  {
1805  _body.get().reason.clear();
1806  }
1807  }
1808  const std::string& status()
1809  {
1810  return _body.get().status;
1811  }
1812  void setStatus(const char* data_, size_t len_)
1813  {
1814  if (data_)
1815  {
1816  _body.get().status.assign(data_, len_);
1817  }
1818  else
1819  {
1820  _body.get().status.clear();
1821  }
1822  }
1823  const std::string& bookmark()
1824  {
1825  return _body.get().bookmark;
1826  }
1827  void setBookmark(const Field& bookmark_)
1828  {
1829  if (!bookmark_.empty())
1830  {
1831  _body.get().bookmark.assign(bookmark_.data(), bookmark_.len());
1832  Field::parseBookmark(bookmark_, _body.get().nameHashValue,
1833  _body.get().sequenceNo);
1834  }
1835  else
1836  {
1837  _body.get().bookmark.clear();
1838  _body.get().sequenceNo = (amps_uint64_t)0;
1839  _body.get().nameHashValue = (amps_uint64_t)0;
1840  }
1841  }
1842  amps_uint64_t sequenceNo() const
1843  {
1844  return _body.get().sequenceNo;
1845  }
1846  amps_uint64_t nameHashValue() const
1847  {
1848  return _body.get().nameHashValue;
1849  }
1850  void setSequenceNo(const char* data_, size_t len_)
1851  {
1852  amps_uint64_t result = (amps_uint64_t)0;
1853  if (data_)
1854  {
1855  for (size_t i = 0; i < len_; ++i)
1856  {
1857  result *= (amps_uint64_t)10;
1858  result += (amps_uint64_t)(data_[i] - '0');
1859  }
1860  }
1861  _body.get().sequenceNo = result;
1862  }
1863  VersionInfo serverVersion() const
1864  {
1865  return _body.get().serverVersion;
1866  }
1867  void setServerVersion(const char* data_, size_t len_)
1868  {
1869  if (data_)
1870  {
1871  _body.get().serverVersion.setVersion(std::string(data_, len_));
1872  }
1873  }
1874  bool responded()
1875  {
1876  return _body.get().responded;
1877  }
1878  void setResponded()
1879  {
1880  _body.get().responded = true;
1881  }
1882  bool abandoned()
1883  {
1884  return _body.get().abandoned;
1885  }
1886  void setAbandoned()
1887  {
1888  if (_body.isValid())
1889  {
1890  _body.get().abandoned = true;
1891  }
1892  }
1893 
1894  void setConnectionVersion(unsigned connectionVersion)
1895  {
1896  _body.get().connectionVersion = connectionVersion;
1897  }
1898 
1899  unsigned getConnectionVersion()
1900  {
1901  return _body.get().connectionVersion;
1902  }
1903  void setOptions(const char* data_, size_t len_)
1904  {
1905  if (data_)
1906  {
1907  _body.get().options.assign(data_, len_);
1908  }
1909  else
1910  {
1911  _body.get().options.clear();
1912  }
1913  }
1914 
1915  const std::string& options()
1916  {
1917  return _body.get().options;
1918  }
1919 
1920  AckResponse& operator=(const AckResponse& rhs)
1921  {
1922  _body = rhs._body;
1923  return *this;
1924  }
1925  };
1926 
1927 
1928  typedef std::map<std::string, AckResponse> AckMap;
1929  AckMap _ackMap;
1930  Mutex _ackMapLock;
1931  DefaultExceptionListener _defaultExceptionListener;
1932  protected:
1933 
1934  struct DeferredExecutionRequest
1935  {
1936  DeferredExecutionRequest(DeferredExecutionFunc func_,
1937  void* userData_)
1938  : _func(func_),
1939  _userData(userData_)
1940  {;}
1941 
1942  DeferredExecutionFunc _func;
1943  void* _userData;
1944  };
1945  const ExceptionListener* _exceptionListener;
1946  std::shared_ptr<const ExceptionListener> _pExceptionListener;
1947  amps_unique_ptr<SubscriptionManager> _subscriptionManager;
1948  volatile bool _connected;
1949  std::string _username;
1950  typedef std::set<ConnectionStateListener*> ConnectionStateListeners;
1951  ConnectionStateListeners _connectionStateListeners;
1952  typedef std::vector<DeferredExecutionRequest> DeferredExecutionList;
1953  Mutex _deferredExecutionLock;
1954  DeferredExecutionList _deferredExecutionList;
1955  unsigned _heartbeatInterval;
1956  unsigned _readTimeout;
1957 
1958  void broadcastConnectionStateChanged(ConnectionStateListener::State newState_)
1959  {
1960  // If we disconnected before we got to notification, don't notify.
1961  // This should only be able to happen for Resubscribed, since the lock
1962  // is released to let the subscription manager run resubscribe so a
1963  // disconnect could be called before the change is broadcast.
1964  if (!_connected && newState_ > ConnectionStateListener::Connected)
1965  {
1966  return;
1967  }
1968  for (ConnectionStateListeners::iterator it = _connectionStateListeners.begin(); it != _connectionStateListeners.end(); ++it)
1969  {
1970  AMPS_CALL_EXCEPTION_WRAPPER(
1971  (*it)->connectionStateChanged(newState_));
1972  }
1973  }
1974  unsigned processedAck(Message& message);
1975  unsigned persistedAck(Message& meesage);
1976  void lastChance(Message& message);
1977  void checkAndSendHeartbeat(bool force = false);
1978  virtual ConnectionInfo getConnectionInfo() const;
1979  static amps_result
1980  ClientImplMessageHandler(amps_handle message, void* userData);
1981  static void
1982  ClientImplPreDisconnectHandler(amps_handle client, unsigned failedConnectionVersion, void* userData);
1983  static amps_result
1984  ClientImplDisconnectHandler(amps_handle client, void* userData);
1985  static const char*
1986  ClientImplGetHttpPreflightMessage(void* userData);
1987 
1988  void unsubscribeInternal(const std::string& id)
1989  {
1990  if (id.empty())
1991  {
1992  return;
1993  }
1994  // remove the handler first to avoid any more message delivery
1995  Message::Field subId;
1996  subId.assign(id.data(), id.length());
1997  _routes.removeRoute(subId);
1998  // Lock is already acquired
1999  if (_subscriptionManager)
2000  {
2001  // Have to unlock before calling into sub manager to avoid deadlock
2002  Unlock<Mutex> unlock(_lock);
2003  _subscriptionManager->unsubscribe(subId);
2004  }
2005  _message.reset();
2006  _message.setCommandEnum(Message::Command::Unsubscribe);
2007  _message.newCommandId();
2008  _message.setSubscriptionId(id);
2009  _sendWithoutRetry(_message);
2010  deferredExecution(&amps_noOpFn, NULL);
2011  }
2012 
2013  AckResponse syncAckProcessing(long timeout_, Message& message_,
2014  bool isHASubscribe_)
2015  {
2016  return syncAckProcessing(timeout_, message_,
2017  (amps_uint64_t)0, isHASubscribe_);
2018  }
2019 
2020  AckResponse syncAckProcessing(long timeout_, Message& message_,
2021  amps_uint64_t haSeq = (amps_uint64_t)0,
2022  bool isHASubscribe_ = false)
2023  {
2024  // inv: we already have _lock locked up.
2025  AckResponse ack = AckResponse::create();
2026  if (1)
2027  {
2028  Lock<Mutex> guard(_ackMapLock);
2029  _ackMap[message_.getCommandId()] = ack;
2030  }
2031  ack.setConnectionVersion((unsigned)_send(message_, haSeq, isHASubscribe_));
2032  if (ack.getConnectionVersion() == 0)
2033  {
2034  // Send failed
2035  throw DisconnectedException("Connection closed while waiting for response.");
2036  }
2037  bool timedOut = false;
2038  AMPS_START_TIMER(timeout_)
2039  while (!timedOut && !ack.responded() && !ack.abandoned())
2040  {
2041  if (timeout_)
2042  {
2043  timedOut = !_lock.wait(timeout_);
2044  // May have woken up early, check real time
2045  if (timedOut)
2046  {
2047  AMPS_RESET_TIMER(timedOut, timeout_);
2048  }
2049  }
2050  else
2051  {
2052  // Using a timeout version to ensure python can interrupt
2053  _lock.wait(1000);
2054  Unlock<Mutex> unlck(_lock);
2055  amps_invoke_waiting_function();
2056  }
2057  }
2058  if (ack.responded())
2059  {
2060  if (ack.status() != "failure")
2061  {
2062  if (message_.getCommand() == "logon")
2063  {
2064  amps_uint64_t ackSequence = ack.sequenceNo();
2065  if (_lastSentHaSequenceNumber < ackSequence)
2066  {
2067  _lastSentHaSequenceNumber = ackSequence;
2068  }
2069  if (_publishStore.isValid())
2070  {
2071  // If this throws, logon will fail and eitehr be
2072  // handled in HAClient/ServerChooser or by the caller
2073  // of logon.
2074  _publishStore.discardUpTo(ackSequence);
2075  if (_lastSentHaSequenceNumber < _publishStore.getLastPersisted())
2076  {
2077  _lastSentHaSequenceNumber = _publishStore.getLastPersisted();
2078  }
2079  }
2080  _nameHash = ack.bookmark().substr(0, ack.bookmark().find('|'));
2081  _nameHashValue = ack.nameHashValue();
2082  _serverVersion = ack.serverVersion();
2083  if (_bookmarkStore.isValid())
2084  {
2085  _bookmarkStore.setServerVersion(_serverVersion);
2086  }
2087  }
2088  if (_ackBatchSize)
2089  {
2090  const std::string& options = ack.options();
2091  size_t index = options.find_first_of("max_backlog=");
2092  if (index != std::string::npos)
2093  {
2094  unsigned data = 0;
2095  const char* c = options.c_str() + index + 12;
2096  while (*c && *c != ',')
2097  {
2098  data = (data * 10) + (unsigned)(*c++ -48);
2099  }
2100  if (_ackBatchSize > data)
2101  {
2102  _ackBatchSize = data;
2103  }
2104  }
2105  }
2106  return ack;
2107  }
2108  const size_t NotEntitled = 12;
2109  std::string ackReason = ack.reason();
2110  if (ackReason.length() == 0)
2111  {
2112  return ack; // none
2113  }
2114  if (ackReason.length() == NotEntitled &&
2115  ackReason[0] == 'n' &&
2116  message_.getUserId().len() == 0)
2117  {
2118  message_.assignUserId(_username);
2119  }
2120  message_.throwFor(_client, ackReason);
2121  }
2122  else // !ack.responded()
2123  {
2124  if (!ack.abandoned())
2125  {
2126  throw TimedOutException("timed out waiting for operation.");
2127  }
2128  else
2129  {
2130  throw DisconnectedException("Connection closed while waiting for response.");
2131  }
2132  }
2133  return ack;
2134  }
2135 
2136  void _cleanup(void)
2137  {
2138  if (!_client)
2139  {
2140  return;
2141  }
2142  amps_client_set_predisconnect_handler(_client, NULL, 0L);
2143  amps_client_set_disconnect_handler(_client, NULL, 0L);
2144  AMPS_CALL_EXCEPTION_WRAPPER(ClientImpl::disconnect());
2145  _pEmptyMessageStream.reset(NULL);
2146  amps_client_destroy(_client);
2147  _client = NULL;
2148  }
2149 
2150  public:
2151 
2152  ClientImpl(const std::string& clientName)
2153  : _client(NULL), _name(clientName)
2154  , _isRetryOnDisconnect(true)
2155  , _lastSentHaSequenceNumber((amps_uint64_t)0), _logonInProgress(0)
2156  , _badTimeToHASubscribe(0), _serverVersion()
2157  , _queueAckTimeout(AMPS_DEFAULT_QUEUE_ACK_TIMEOUT)
2158  , _isAutoAckEnabled(false)
2159  , _ackBatchSize(0)
2160  , _queuedAckCount(0)
2161  , _defaultMaxDepth(0)
2162  , _connected(false)
2163  , _heartbeatInterval(0)
2164  , _readTimeout(0)
2165  {
2166  _replayer.setClient(this);
2167  _client = amps_client_create(clientName.c_str());
2169  (amps_handler)ClientImpl::ClientImplMessageHandler,
2170  this);
2172  (amps_predisconnect_handler)ClientImpl::ClientImplPreDisconnectHandler,
2173  this);
2175  (amps_handler)ClientImpl::ClientImplDisconnectHandler,
2176  this);
2178  ClientImpl::ClientImplGetHttpPreflightMessage,
2179  this);
2180  _exceptionListener = &_defaultExceptionListener;
2181  for (size_t i = 0; i < GlobalCommandTypeHandlers::COUNT; ++i)
2182  {
2183 #ifdef AMPS_USE_EMPLACE
2184  _globalCommandTypeHandlers.emplace_back(MessageHandler());
2185 #else
2186  _globalCommandTypeHandlers.push_back(MessageHandler());
2187 #endif
2188  }
2189  }
2190 
2191  virtual ~ClientImpl()
2192  {
2193  _cleanup();
2194  }
2195 
2196  const std::string& getName() const
2197  {
2198  return _name;
2199  }
2200 
2201  const std::string& getNameHash() const
2202  {
2203  return _nameHash;
2204  }
2205 
2206  const amps_uint64_t getNameHashValue() const
2207  {
2208  return _nameHashValue;
2209  }
2210 
2211  void setName(const std::string& name)
2212  {
2213  // This operation will fail if the client's
2214  // name is already set.
2215  amps_result result = amps_client_set_name(_client, name.c_str());
2216  if (result != AMPS_E_OK)
2217  {
2218  AMPSException::throwFor(_client, result);
2219  }
2220  _name = name;
2221  }
2222 
2223  const std::string& getLogonCorrelationData() const
2224  {
2225  return _logonCorrelationData;
2226  }
2227 
2228  void setLogonCorrelationData(const std::string& logonCorrelationData_)
2229  {
2230  _logonCorrelationData = logonCorrelationData_;
2231  }
2232 
2233  size_t getServerVersion() const
2234  {
2235  return _serverVersion.getOldStyleVersion();
2236  }
2237 
2238  VersionInfo getServerVersionInfo() const
2239  {
2240  return _serverVersion;
2241  }
2242 
2243  const std::string& getURI() const
2244  {
2245  return _lastUri;
2246  }
2247 
2248  virtual void connect(const std::string& uri)
2249  {
2250  Lock<Mutex> l(_lock);
2251  _connect(uri);
2252  }
2253 
2254  virtual void _connect(const std::string& uri)
2255  {
2256  _lastUri = uri;
2257  amps_result result = amps_client_connect(_client, uri.c_str());
2258  if (result != AMPS_E_OK)
2259  {
2260  AMPSException::throwFor(_client, result);
2261  }
2262  _message.reset();
2263  _deltaMessage.setCommandEnum(Message::Command::DeltaPublish);
2264  _publishMessage.setCommandEnum(Message::Command::Publish);
2265  _beatMessage.setCommandEnum(Message::Command::Heartbeat);
2266  _beatMessage.setOptions("beat");
2267  _readMessage.setClientImpl(this);
2268  if (_queueAckTimeout)
2269  {
2270  result = amps_client_set_idle_time(_client, _queueAckTimeout);
2271  if (result != AMPS_E_OK)
2272  {
2273  AMPSException::throwFor(_client, result);
2274  }
2275  }
2276  _connected = true;
2277  broadcastConnectionStateChanged(ConnectionStateListener::Connected);
2278  }
2279 
2280  void addHttpPreflightHeader(const std::string& header_)
2281  {
2282  _httpPreflightHeaders.push_back(header_);
2283  }
2284 
2285  void addHttpPreflightHeader(const std::string& key_, const std::string& value_)
2286  {
2287  _httpPreflightHeaders.push_back(key_ + std::string(": ") + value_);
2288  }
2289 
2290  void clearHttpPreflightHeaders()
2291  {
2292  _httpPreflightHeaders.clear();
2293  }
2294 
2295  template<class T>
2296  void setHttpPreflightHeaders(const T& headers_)
2297  {
2298  _httpPreflightHeaders.clear();
2299  for (typename T::const_iterator i = headers_.begin(); i != headers_.end(); ++i)
2300  {
2301  _httpPreflightHeaders.push_back(*i);
2302  }
2303  }
2304 
2305  void setDisconnected()
2306  {
2307  {
2308  Lock<Mutex> l(_lock);
2309  if (_connected)
2310  {
2311  AMPS_CALL_EXCEPTION_WRAPPER(broadcastConnectionStateChanged(ConnectionStateListener::Disconnected));
2312  }
2313  _connected = false;
2314  _heartbeatTimer.setTimeout(0.0);
2315  // AC-1189 AC-1329 AC-1337 We need acks cleared while lock is held,
2316  // but not for unsent commands.
2317  clearAcks(UINT_MAX-1);
2318  }
2319  amps_client_disconnect(_client);
2320  _routes.clear();
2321  }
2322 
2323  virtual void disconnect()
2324  {
2325  AMPS_CALL_EXCEPTION_WRAPPER(flushAcks());
2326  setDisconnected();
2327  // Abandon all acks, sent and unsent
2328  clearAcks(UINT_MAX);
2329  AMPS_CALL_EXCEPTION_WRAPPER(processDeferredExecutions());
2330  Lock<Mutex> l(_lock);
2331  broadcastConnectionStateChanged(ConnectionStateListener::Shutdown);
2332  }
2333 
2334  void clearAcks(unsigned failedVersion)
2335  {
2336  // Have to lock to prevent race conditions
2337  Lock<Mutex> guard(_ackMapLock);
2338  {
2339  // Go ahead and signal any waiters if they are around...
2340  std::vector<std::string> worklist;
2341  for (AckMap::iterator i = _ackMap.begin(), e = _ackMap.end(); i != e; ++i)
2342  {
2343  if (i->second.getConnectionVersion() <= failedVersion)
2344  {
2345  i->second.setAbandoned();
2346  worklist.push_back(i->first);
2347  }
2348  }
2349 
2350  for (std::vector<std::string>::iterator j = worklist.begin(), e = worklist.end(); j != e; ++j)
2351  {
2352  _ackMap.erase(*j);
2353  }
2354  }
2355 
2356  _lock.signalAll();
2357  }
2358 
2359  int send(const Message& message)
2360  {
2361  Lock<Mutex> l(_lock);
2362  return _send(message);
2363  }
2364 
2365  void sendWithoutRetry(const Message& message_)
2366  {
2367  Lock<Mutex> l(_lock);
2368  // If we got here while logon was in progress, then we tried to send
2369  // while we were disconnected so throw DisconnectedException
2370  if (_logonInProgress)
2371  {
2372  throw DisconnectedException("The client has been disconnected.");
2373  }
2374  _sendWithoutRetry(message_);
2375  }
2376 
2377  void _sendWithoutRetry(const Message& message_)
2378  {
2379  amps_result result = amps_client_send(_client, message_.getMessage());
2380  if (result != AMPS_E_OK)
2381  {
2382  AMPSException::throwFor(_client, result);
2383  }
2384  }
2385 
2386  int _send(const Message& message, amps_uint64_t haSeq = (amps_uint64_t)0,
2387  bool isHASubscribe_ = false, int isBatch_ = 0)
2388  {
2389  // Lock is already acquired
2390  amps_result result = AMPS_E_RETRY;
2391 
2392  // Create a local reference to this message, as we'll need to hold on
2393  // to a reference to it in case reconnect occurs.
2394  Message localMessage = message;
2395  unsigned version = 0;
2396 
2397  while (result == AMPS_E_RETRY)
2398  {
2399  if (haSeq && _logonInProgress)
2400  {
2401  // If retrySend is disabled, do not wait for the reconnect
2402  // to finish, just throw.
2403  if (!_isRetryOnDisconnect)
2404  {
2405  AMPSException::throwFor(_client, AMPS_E_RETRY);
2406  }
2407  if (!_lock.wait(1000))
2408  {
2409  amps_invoke_waiting_function();
2410  }
2411  }
2412  else
2413  {
2414  if ((haSeq && haSeq <= _lastSentHaSequenceNumber) ||
2415  (isHASubscribe_ && _badTimeToHASubscribe))
2416  {
2417  return (int)version;
2418  }
2419  // It's possible to get here out of order, but this way we'll
2420  // always send in order.
2421  if (haSeq > _lastSentHaSequenceNumber)
2422  {
2423  while (haSeq > _lastSentHaSequenceNumber + 1)
2424  {
2425  try
2426  {
2427  // Replayer updates _lastSentHaSsequenceNumber
2428  if (!_publishStore.replaySingle(_replayer,
2429  _lastSentHaSequenceNumber + 1))
2430  {
2431  //++_lastSentHaSequenceNumber;
2432  continue;
2433  }
2434  result = AMPS_E_OK;
2435  version = _replayer._version;
2436  }
2437 #ifdef _WIN32
2438  catch (const DisconnectedException&)
2439 #else
2440  catch (const DisconnectedException& e)
2441 #endif
2442  {
2443  if (!_isRetryOnDisconnect)
2444  {
2445  AMPSException::throwFor(_client, AMPS_E_RETRY);
2446  }
2447  result = _replayer._res;
2448  break;
2449  }
2450  }
2451  result = amps_client_send_batch(_client,
2452  localMessage.getMessage(),
2453  &version,
2454  isBatch_);
2455  ++_lastSentHaSequenceNumber;
2456  }
2457  else
2458  {
2459  if (_logonInProgress && localMessage.getCommand().data()[0] != 'l')
2460  {
2461  while (_logonInProgress)
2462  {
2463  if (!_lock.wait(1000))
2464  {
2465  amps_invoke_waiting_function();
2466  if (!_isRetryOnDisconnect)
2467  {
2468  // retrySend is disabled so throw the error
2469  AMPSException::throwFor(_client, result);
2470  }
2471  }
2472  }
2473  }
2474  result = amps_client_send_batch(_client,
2475  localMessage.getMessage(),
2476  &version,
2477  isBatch_);
2478  }
2479  if (result != AMPS_E_OK)
2480  {
2481  if (!isHASubscribe_ && !haSeq &&
2482  localMessage.getMessage() == message.getMessage())
2483  {
2484  localMessage = message.deepCopy();
2485  }
2486  if (_isRetryOnDisconnect)
2487  {
2488  Unlock<Mutex> u(_lock);
2489  result = amps_client_attempt_reconnect(_client, version);
2490  // If this is an HA publish or subscribe command, it was
2491  // stored first and will have already been replayed by the
2492  // store or sub manager after reconnect, so just return.
2493  if ((isHASubscribe_ || haSeq) &&
2494  result == AMPS_E_RETRY)
2495  {
2496  return (int)version;
2497  }
2498  }
2499  else
2500  {
2501  // retrySend is disabled so throw the error
2502  // from the send as an exception, do not retry.
2503  AMPSException::throwFor(_client, result);
2504  }
2505  }
2506  }
2507  if (result == AMPS_E_RETRY)
2508  {
2509  amps_invoke_waiting_function();
2510  }
2511  }
2512 
2513  if (result != AMPS_E_OK)
2514  {
2515  AMPSException::throwFor(_client, result);
2516  }
2517  return (int)version;
2518  }
2519 
2520  void addMessageHandler(const Field& commandId_,
2521  const AMPS::MessageHandler& messageHandler_,
2522  unsigned requestedAcks_, Message::Command::Type commandType_)
2523  {
2524  Lock<Mutex> lock(_lock);
2525  _routes.addRoute(commandId_, messageHandler_, requestedAcks_,
2526  0, commandType_);
2527  }
2528 
2529  bool removeMessageHandler(const Field& commandId_)
2530  {
2531  Lock<Mutex> lock(_lock);
2532  return _routes.removeRoute(commandId_);
2533  }
2534 
2535  std::string send(const MessageHandler& messageHandler_, Message& message_, int timeout_ = 0)
2536  {
2537  Field id = message_.getCommandId();
2538  Field subId = message_.getSubscriptionId();
2539  Field qid = message_.getQueryId();
2540  bool isSubscribeOnly = false;
2541  bool replace = false;
2542  unsigned requestedAcks = message_.getAckTypeEnum();
2543  unsigned systemAddedAcks = Message::AckType::None;
2544  Message::Command::Type commandType = message_.getCommandEnum();
2545 
2546  switch (commandType)
2547  {
2548  case Message::Command::Subscribe:
2549  case Message::Command::DeltaSubscribe:
2550  replace = message_.getOptions().operator std::string().find(AMPS_OPTIONS_REPLACE, 0, strlen(AMPS_OPTIONS_REPLACE) - 1) != std::string::npos;
2551  isSubscribeOnly = true;
2552  // fall through
2553  case Message::Command::SOWAndSubscribe:
2554  case Message::Command::SOWAndDeltaSubscribe:
2555  if (id.empty())
2556  {
2557  id = message_.newCommandId().getCommandId();
2558  }
2559  else
2560  {
2561  while (!replace && id != subId && _routes.hasRoute(id))
2562  {
2563  id = message_.newCommandId().getCommandId();
2564  }
2565  }
2566  if (subId.empty())
2567  {
2568  message_.setSubscriptionId(id);
2569  subId = id;
2570  }
2571  if (!message_.getBookmark().empty() && _bookmarkStore.isValid())
2572  {
2573  systemAddedAcks |= Message::AckType::Persisted;
2574  }
2575  // fall through
2576  case Message::Command::SOW:
2577  if (id.empty())
2578  {
2579  id = message_.newCommandId().getCommandId();
2580  }
2581  else
2582  {
2583  while (!replace && id != subId && _routes.hasRoute(id))
2584  {
2585  message_.newCommandId();
2586  if (qid == id)
2587  {
2588  qid = message_.getCommandId();
2589  message_.setQueryId(qid);
2590  }
2591  id = message_.getCommandId();
2592  }
2593  }
2594  if (!isSubscribeOnly)
2595  {
2596  if (qid.empty())
2597  {
2598  message_.setQueryID(id);
2599  qid = id;
2600  }
2601  else
2602  {
2603  while (!replace && qid != subId && qid != id
2604  && _routes.hasRoute(qid))
2605  {
2606  qid = message_.newQueryId().getQueryId();
2607  }
2608  }
2609  }
2610  systemAddedAcks |= Message::AckType::Processed;
2611  message_.setAckTypeEnum(requestedAcks | systemAddedAcks);
2612  {
2613  int routesAdded = 0;
2614  Lock<Mutex> l(_lock);
2615  if (!subId.empty() && messageHandler_.isValid())
2616  {
2617  if (!_routes.hasRoute(subId))
2618  {
2619  ++routesAdded;
2620  }
2621  // This can replace a non-subscribe with a matching id
2622  // with a subscription but not another subscription.
2623  _routes.addRoute(subId, messageHandler_, requestedAcks,
2624  systemAddedAcks, commandType);
2625  }
2626  if (!isSubscribeOnly && !qid.empty()
2627  && messageHandler_.isValid() && qid != subId)
2628  {
2629  if (routesAdded == 0)
2630  {
2631  _routes.addRoute(qid, messageHandler_,
2632  requestedAcks, systemAddedAcks, commandType);
2633  }
2634  else
2635  {
2636  void* data = NULL;
2637  {
2638  Unlock<Mutex> u(_lock);
2639  data = amps_invoke_copy_route_function(
2640  messageHandler_.userData());
2641  }
2642  if (!data)
2643  {
2644  _routes.addRoute(qid, messageHandler_, requestedAcks,
2645  systemAddedAcks, commandType);
2646  }
2647  else
2648  {
2649  _routes.addRoute(qid,
2650  MessageHandler(messageHandler_.function(),
2651  data),
2652  requestedAcks, systemAddedAcks, commandType);
2653  }
2654  }
2655  ++routesAdded;
2656  }
2657  if (!id.empty() && messageHandler_.isValid()
2658  && requestedAcks & ~Message::AckType::Persisted
2659  && id != subId && id != qid)
2660  {
2661  if (routesAdded == 0)
2662  {
2663  _routes.addRoute(id, messageHandler_, requestedAcks,
2664  systemAddedAcks, commandType);
2665  }
2666  else
2667  {
2668  void* data = NULL;
2669  {
2670  Unlock<Mutex> u(_lock);
2671  data = amps_invoke_copy_route_function(
2672  messageHandler_.userData());
2673  }
2674  if (!data)
2675  {
2676  _routes.addRoute(id, messageHandler_, requestedAcks,
2677  systemAddedAcks, commandType);
2678  }
2679  else
2680  {
2681  _routes.addRoute(id,
2682  MessageHandler(messageHandler_.function(),
2683  data),
2684  requestedAcks,
2685  systemAddedAcks, commandType);
2686  }
2687  }
2688  ++routesAdded;
2689  }
2690  try
2691  {
2692  // We aren't adding to subscription manager, so this isn't
2693  // an HA subscribe.
2694  syncAckProcessing(timeout_, message_, 0, false);
2695  message_.setAckTypeEnum(requestedAcks);
2696  }
2697  catch (...)
2698  {
2699  _routes.removeRoute(message_.getQueryID());
2700  _routes.removeRoute(message_.getSubscriptionId());
2701  _routes.removeRoute(id);
2702  message_.setAckTypeEnum(requestedAcks);
2703  throw;
2704  }
2705  }
2706  break;
2707  // These are valid commands that are used as-is
2708  case Message::Command::Unsubscribe:
2709  case Message::Command::Heartbeat:
2710  case Message::Command::Logon:
2711  case Message::Command::StartTimer:
2712  case Message::Command::StopTimer:
2713  case Message::Command::SOWDelete:
2714  {
2715  Lock<Mutex> l(_lock);
2716  // if an ack is requested, it'll need a command ID.
2717  if (message_.getAckTypeEnum() != Message::AckType::None)
2718  {
2719  if (id.empty())
2720  {
2721  message_.newCommandId();
2722  id = message_.getCommandId();
2723  }
2724  if (messageHandler_.isValid())
2725  {
2726  _routes.addRoute(id, messageHandler_, requestedAcks,
2727  Message::AckType::None, commandType);
2728  }
2729  }
2730  _send(message_);
2731  }
2732  break;
2733  case Message::Command::DeltaPublish:
2734  case Message::Command::Publish:
2735  {
2736  bool useSync = message_.getFilter().len() > 0;
2737  Lock<Mutex> l(_lock);
2738  // if an ack is requested, it'll need a command ID.
2739  unsigned ackType = message_.getAckTypeEnum();
2740  if (ackType != Message::AckType::None
2741  || useSync)
2742  {
2743  if (id.empty())
2744  {
2745  message_.newCommandId();
2746  id = message_.getCommandId();
2747  }
2748  if (messageHandler_.isValid())
2749  {
2750  _routes.addRoute(id, messageHandler_, requestedAcks,
2751  Message::AckType::None, commandType);
2752  }
2753  }
2754  if (useSync)
2755  {
2756  message_.setAckTypeEnum(ackType | Message::AckType::Processed);
2757  syncAckProcessing(timeout_, message_, 0, false);
2758  }
2759  else
2760  {
2761  _send(message_, 0, false, 1);
2762  }
2763  }
2764  break;
2765  // These are things that shouldn't be sent (not meaningful)
2766  case Message::Command::GroupBegin:
2767  case Message::Command::GroupEnd:
2768  case Message::Command::OOF:
2769  case Message::Command::Ack:
2770  case Message::Command::Unknown:
2771  default:
2772  throw CommandException("Command type " + message_.getCommand() + " can not be sent directly to AMPS");
2773  }
2774  message_.setAckTypeEnum(requestedAcks);
2775  return id;
2776  }
2777 
2778  void setDisconnectHandler(const DisconnectHandler& disconnectHandler)
2779  {
2780  Lock<Mutex> l(_lock);
2781  _disconnectHandler = disconnectHandler;
2782  }
2783 
2784  void setGlobalCommandTypeMessageHandler(const std::string& command_, const MessageHandler& handler_)
2785  {
2786  switch (command_[0])
2787  {
2788 #if 0 // Not currently implemented to avoid an extra branch in delivery
2789  case 'p':
2790  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::Publish] = handler_;
2791  break;
2792  case 's':
2793  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::SOW] = handler_;
2794  break;
2795 #endif
2796  case 'h':
2797  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::Heartbeat] = handler_;
2798  break;
2799 #if 0 // Not currently implemented to avoid an extra branch in delivery
2800  case 'g':
2801  if (command_[6] == 'b')
2802  {
2803  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::GroupBegin] = handler_;
2804  }
2805  else if (command_[6] == 'e')
2806  {
2807  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::GroupEnd] = handler_;
2808  }
2809  else
2810  {
2811  std::ostringstream os;
2812  os << "Invalid command '" << command_ << "' passed to setGlobalCommandTypeHandler";
2813  throw CommandException(os.str());
2814  }
2815  break;
2816  case 'o':
2817  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::OOF] = handler_;
2818  break;
2819 #endif
2820  case 'a':
2821  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::Ack] = handler_;
2822  break;
2823  case 'l':
2824  case 'L':
2825  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::LastChance] = handler_;
2826  break;
2827  case 'd':
2828  case 'D':
2829  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::DuplicateMessage] = handler_;
2830  break;
2831  default:
2832  std::ostringstream os;
2833  os << "Invalid command '" << command_ << "' passed to setGlobalCommandTypeHandler";
2834  throw CommandException(os.str());
2835  break;
2836  }
2837  }
2838 
2839  void setGlobalCommandTypeMessageHandler(const Message::Command::Type command_, const MessageHandler& handler_)
2840  {
2841  switch (command_)
2842  {
2843 #if 0 // Not currently implemented to avoid an extra branch in delivery
2844  case Message::Command::Publish:
2845  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::Publish] = handler_;
2846  break;
2847  case Message::Command::SOW:
2848  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::SOW] = handler_;
2849  break;
2850 #endif
2851  case Message::Command::Heartbeat:
2852  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::Heartbeat] = handler_;
2853  break;
2854 #if 0 // Not currently implemented to avoid an extra branch in delivery
2855  case Message::Command::GroupBegin:
2856  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::GroupBegin] = handler_;
2857  break;
2858  case Message::Command::GroupEnd:
2859  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::GroupEnd] = handler_;
2860  break;
2861  case Message::Command::OOF:
2862  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::OOF] = handler_;
2863  break;
2864 #endif
2865  case Message::Command::Ack:
2866  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::Ack] = handler_;
2867  break;
2868  default:
2869  unsigned bits = 0;
2870  unsigned command = command_;
2871  while (command > 0)
2872  {
2873  ++bits;
2874  command >>= 1;
2875  }
2876  char errBuf[128];
2877  AMPS_snprintf(errBuf, sizeof(errBuf),
2878  "Invalid command '%.*s' passed to setGlobalCommandTypeHandler",
2879  CommandConstants<0>::Lengths[bits],
2880  CommandConstants<0>::Values[bits]);
2881  throw CommandException(errBuf);
2882  break;
2883  }
2884  }
2885 
2886  void setGlobalCommandTypeMessageHandler(const GlobalCommandTypeHandlers handlerType_, const MessageHandler& handler_)
2887  {
2888  _globalCommandTypeHandlers[handlerType_] = handler_;
2889  }
2890 
2891  void setFailedWriteHandler(FailedWriteHandler* handler_)
2892  {
2893  Lock<Mutex> l(_lock);
2894  _failedWriteHandler.reset(handler_);
2895  }
2896 
2897  void setPublishStore(const Store& publishStore_)
2898  {
2899  Lock<Mutex> l(_lock);
2900  if (_connected)
2901  {
2902  throw AlreadyConnectedException("Setting a publish store on a connected client is undefined behavior");
2903  }
2904  _publishStore = publishStore_;
2905  }
2906 
2907  void setBookmarkStore(const BookmarkStore& bookmarkStore_)
2908  {
2909  Lock<Mutex> l(_lock);
2910  if (_connected)
2911  {
2912  throw AlreadyConnectedException("Setting a bookmark store on a connected client is undefined behavior");
2913  }
2914  _bookmarkStore = bookmarkStore_;
2915  }
2916 
2917  void setSubscriptionManager(SubscriptionManager* subscriptionManager_)
2918  {
2919  Lock<Mutex> l(_lock);
2920  _subscriptionManager.reset(subscriptionManager_);
2921  }
2922 
2923  SubscriptionManager* getSubscriptionManager() const
2924  {
2925  return const_cast<SubscriptionManager*>(_subscriptionManager.get());
2926  }
2927 
2928  DisconnectHandler getDisconnectHandler() const
2929  {
2930  return _disconnectHandler;
2931  }
2932 
2933  MessageHandler getDuplicateMessageHandler() const
2934  {
2935  return _globalCommandTypeHandlers[GlobalCommandTypeHandlers::DuplicateMessage];
2936  }
2937 
2938  FailedWriteHandler* getFailedWriteHandler() const
2939  {
2940  return const_cast<FailedWriteHandler*>(_failedWriteHandler.get());
2941  }
2942 
2943  Store getPublishStore() const
2944  {
2945  return _publishStore;
2946  }
2947 
2948  BookmarkStore getBookmarkStore() const
2949  {
2950  return _bookmarkStore;
2951  }
2952 
2953  amps_uint64_t publish(const char* topic_, size_t topicLen_, const char* data_, size_t dataLen_)
2954  {
2955  if (!_publishStore.isValid())
2956  {
2957  Lock<Mutex> l(_lock);
2958  _publishMessage.assignTopic(topic_, topicLen_);
2959  _publishMessage.assignData(data_, dataLen_);
2960  _send(_publishMessage, 0, false, 1);
2961  return 0;
2962  }
2963  else
2964  {
2965  publishStoreMessage.reset();
2966  publishStoreMessage.setCommandEnum(Message::Command::Publish);
2967  return _publish(topic_, topicLen_, data_, dataLen_);
2968  }
2969  }
2970 
2971  amps_uint64_t publish(const char* topic_, size_t topicLen_, const char* data_,
2972  size_t dataLen_, unsigned long expiration_)
2973  {
2974  if (!_publishStore.isValid())
2975  {
2976  Lock<Mutex> l(_lock);
2977  _publishMessage.assignTopic(topic_, topicLen_);
2978  _publishMessage.assignData(data_, dataLen_);
2979  char exprBuf[AMPS_NUMBER_BUFFER_LEN];
2980  size_t pos = convertToCharArray(exprBuf, expiration_);
2981  _publishMessage.assignExpiration(exprBuf + pos, AMPS_NUMBER_BUFFER_LEN - pos);
2982  _send(_publishMessage, 0, false, 1);
2983  _publishMessage.assignExpiration(NULL, 0);
2984  return 0;
2985  }
2986  else
2987  {
2988  publishStoreMessage.reset();
2989  char exprBuf[AMPS_NUMBER_BUFFER_LEN];
2990  size_t exprPos = convertToCharArray(exprBuf, expiration_);
2991  publishStoreMessage.setCommandEnum(Message::Command::Publish)
2992  .assignExpiration(exprBuf + exprPos,
2993  AMPS_NUMBER_BUFFER_LEN - exprPos);
2994  return _publish(topic_, topicLen_, data_, dataLen_);
2995  }
2996  }
2997 
2998  class FlushAckHandler : ConnectionStateListener
2999  {
3000  private:
3001  ClientImpl* _pClient;
3002  Field _cmdId;
3003 #if __cplusplus >= 201100L || _MSC_VER >= 1900
3004  std::atomic<bool> _acked;
3005  std::atomic<bool> _disconnected;
3006 #else
3007  volatile bool _acked;
3008  volatile bool _disconnected;
3009 #endif
3010  public:
3011  FlushAckHandler(ClientImpl* pClient_)
3012  : _pClient(pClient_), _cmdId(), _acked(false), _disconnected(false)
3013  {
3014  pClient_->addConnectionStateListener(this);
3015  }
3016  ~FlushAckHandler()
3017  {
3018  _pClient->removeConnectionStateListener(this);
3019  _pClient->removeMessageHandler(_cmdId);
3020  _cmdId.clear();
3021  }
3022  void setCommandId(const Field& cmdId_)
3023  {
3024  _cmdId.deepCopy(cmdId_);
3025  }
3026  void invoke(const Message&)
3027  {
3028  _acked = true;
3029  }
3030  void connectionStateChanged(State state_)
3031  {
3032  if (state_ <= Shutdown)
3033  {
3034  _disconnected = true;
3035  }
3036  }
3037  bool acked()
3038  {
3039  return _acked;
3040  }
3041  bool done()
3042  {
3043  return _acked || _disconnected;
3044  }
3045  };
3046 
3047  void publishFlush(long timeout_, unsigned ackType_)
3048  {
3049  static const char* processed = "processed";
3050  static const size_t processedLen = strlen(processed);
3051  static const char* persisted = "persisted";
3052  static const size_t persistedLen = strlen(persisted);
3053  static const char* flush = "flush";
3054  static const size_t flushLen = strlen(flush);
3055  static VersionInfo minPersisted("5.3.3.0");
3056  static VersionInfo minFlush("4");
3057  if (ackType_ != Message::AckType::Processed
3058  && ackType_ != Message::AckType::Persisted)
3059  {
3060  throw CommandException("Flush can only be used with processed or persisted acks.");
3061  }
3062  FlushAckHandler flushHandler(this);
3063  if (_serverVersion >= minFlush)
3064  {
3065  Lock<Mutex> l(_lock);
3066  if (!_connected)
3067  {
3068  throw DisconnectedException("Not connected trying to flush");
3069  }
3070  _message.reset();
3071  _message.newCommandId();
3072  _message.assignCommand(flush, flushLen);
3073  if (_serverVersion < minPersisted
3074  || ackType_ == Message::AckType::Processed)
3075  {
3076  _message.assignAckType(processed, processedLen);
3077  }
3078  else
3079  {
3080  _message.assignAckType(persisted, persistedLen);
3081  }
3082  flushHandler.setCommandId(_message.getCommandId());
3083  addMessageHandler(_message.getCommandId(),
3084  std::bind(&FlushAckHandler::invoke,
3085  std::ref(flushHandler),
3086  std::placeholders::_1),
3087  ackType_, _message.getCommandEnum());
3088  NoDelay noDelay(_client);
3089  if (_send(_message) == -1)
3090  {
3091  throw DisconnectedException("Disconnected trying to flush");
3092  }
3093  }
3094  if (_publishStore.isValid())
3095  {
3096  try
3097  {
3098  _publishStore.flush(timeout_);
3099  }
3100  catch (const AMPSException& ex)
3101  {
3102  AMPS_UNHANDLED_EXCEPTION(ex);
3103  throw;
3104  }
3105  }
3106  else if (_serverVersion < minFlush)
3107  {
3108  if (timeout_ > 0)
3109  {
3110  AMPS_USLEEP(timeout_ * 1000);
3111  }
3112  else
3113  {
3114  AMPS_USLEEP(1000 * 1000);
3115  }
3116  return;
3117  }
3118  if (timeout_)
3119  {
3120  Timer timer((double)timeout_);
3121  timer.start();
3122  while (!timer.check() && !flushHandler.done())
3123  {
3124  AMPS_USLEEP(10000);
3125  amps_invoke_waiting_function();
3126  }
3127  }
3128  else
3129  {
3130  while (!flushHandler.done())
3131  {
3132  AMPS_USLEEP(10000);
3133  amps_invoke_waiting_function();
3134  }
3135  }
3136  // No response or disconnect in timeout interval
3137  if (!flushHandler.done())
3138  {
3139  throw TimedOutException("Timed out waiting for flush");
3140  }
3141  // We got disconnected and there is no publish store
3142  if (!flushHandler.acked() && !_publishStore.isValid())
3143  {
3144  throw DisconnectedException("Disconnected waiting for flush");
3145  }
3146  }
3147 
3148  amps_uint64_t deltaPublish(const char* topic_, size_t topicLength_,
3149  const char* data_, size_t dataLength_)
3150  {
3151  if (!_publishStore.isValid())
3152  {
3153  Lock<Mutex> l(_lock);
3154  _deltaMessage.assignTopic(topic_, topicLength_);
3155  _deltaMessage.assignData(data_, dataLength_);
3156  _send(_deltaMessage, 0, false, 1);
3157  return 0;
3158  }
3159  else
3160  {
3161  publishStoreMessage.reset();
3162  publishStoreMessage.setCommandEnum(Message::Command::DeltaPublish);
3163  return _publish(topic_, topicLength_, data_, dataLength_);
3164  }
3165  }
3166 
3167  amps_uint64_t deltaPublish(const char* topic_, size_t topicLength_,
3168  const char* data_, size_t dataLength_,
3169  unsigned long expiration_)
3170  {
3171  if (!_publishStore.isValid())
3172  {
3173  Lock<Mutex> l(_lock);
3174  _deltaMessage.assignTopic(topic_, topicLength_);
3175  _deltaMessage.assignData(data_, dataLength_);
3176  char exprBuf[AMPS_NUMBER_BUFFER_LEN];
3177  size_t pos = convertToCharArray(exprBuf, expiration_);
3178  _deltaMessage.assignExpiration(exprBuf + pos, AMPS_NUMBER_BUFFER_LEN - pos);
3179  _send(_deltaMessage, 0, false, 1);
3180  _deltaMessage.assignExpiration(NULL, 0);
3181  return 0;
3182  }
3183  else
3184  {
3185  publishStoreMessage.reset();
3186  char exprBuf[AMPS_NUMBER_BUFFER_LEN];
3187  size_t exprPos = convertToCharArray(exprBuf, expiration_);
3188  publishStoreMessage.setCommandEnum(Message::Command::DeltaPublish)
3189  .assignExpiration(exprBuf + exprPos,
3190  AMPS_NUMBER_BUFFER_LEN - exprPos);
3191  return _publish(topic_, topicLength_, data_, dataLength_);
3192  }
3193  }
3194 
3195  amps_uint64_t _publish(const char* topic_, size_t topicLength_,
3196  const char* data_, size_t dataLength_)
3197  {
3198  publishStoreMessage.assignTopic(topic_, topicLength_)
3199  .setAckTypeEnum(Message::AckType::Persisted)
3200  .assignData(data_, dataLength_);
3201  amps_uint64_t haSequenceNumber = _publishStore.store(publishStoreMessage);
3202  char buf[AMPS_NUMBER_BUFFER_LEN];
3203  size_t pos = convertToCharArray(buf, haSequenceNumber);
3204  publishStoreMessage.assignSequence(buf + pos, AMPS_NUMBER_BUFFER_LEN - pos);
3205  {
3206  Lock<Mutex> l(_lock);
3207  _send(publishStoreMessage, haSequenceNumber, false, 1);
3208  }
3209  return haSequenceNumber;
3210  }
3211 
3212  virtual std::string logon(long timeout_, Authenticator& authenticator_,
3213  const char* options_ = NULL)
3214  {
3215  Lock<Mutex> l(_lock);
3216  return _logon(timeout_, authenticator_, options_);
3217  }
3218 
3219  virtual std::string _logon(long timeout_, Authenticator& authenticator_,
3220  const char* options_ = NULL)
3221  {
3222  _message.reset();
3223  _message.newCommandId();
3224  std::string newCommandId = _message.getCommandId();
3225  _message.setCommandEnum(Message::Command::Logon);
3226  _message.setClientName(_name);
3227 #ifdef AMPS_CLIENT_VERSION_WITH_LANGUAGE
3228  _message.assignVersion(AMPS_CLIENT_VERSION_WITH_LANGUAGE,
3229  strlen(AMPS_CLIENT_VERSION_WITH_LANGUAGE));
3230 #endif
3231  URI uri(_lastUri);
3232  if (uri.user().size())
3233  {
3234  _message.setUserId(uri.user());
3235  }
3236  if (uri.password().size())
3237  {
3238  _message.setPassword(uri.password());
3239  }
3240  if (uri.protocol() == "amps" && uri.messageType().size())
3241  {
3242  _message.setMessageType(uri.messageType());
3243  }
3244  if (uri.isTrue("pretty"))
3245  {
3246  _message.setOptions("pretty");
3247  }
3248 
3249  _message.setPassword(authenticator_.authenticate(_message.getUserId(), _message.getPassword()));
3250  if (!_logonCorrelationData.empty())
3251  {
3252  _message.assignCorrelationId(_logonCorrelationData);
3253  }
3254  if (options_)
3255  {
3256  _message.setOptions(options_);
3257  }
3258  _username = _message.getUserId();
3259  try
3260  {
3261  AtomicFlagFlip pubFlip(&_logonInProgress);
3262  NoDelay noDelay(_client);
3263  while (true)
3264  {
3265  _message.setAckTypeEnum(Message::AckType::Processed);
3266  AckResponse ack = syncAckProcessing(timeout_, _message);
3267  if (ack.status() == "retry")
3268  {
3269  _message.setPassword(authenticator_.retry(ack.username(), ack.password()));
3270  _username = ack.username();
3271  _message.setUserId(_username);
3272  }
3273  else
3274  {
3275  authenticator_.completed(ack.username(), ack.password(), ack.reason());
3276  break;
3277  }
3278  }
3279  broadcastConnectionStateChanged(ConnectionStateListener::LoggedOn);
3280 
3281  // Now re-send the heartbeat command if configured
3282  _sendHeartbeat();
3283  // Signal any threads waiting for _logonInProgress
3284  _lock.signalAll();
3285  }
3286  catch (const AMPSException& ex)
3287  {
3288  {
3289  Unlock<Mutex> u(_lock);
3290  setDisconnected();
3291  }
3292  _lock.signalAll();
3293  AMPS_UNHANDLED_EXCEPTION(ex);
3294  throw;
3295  }
3296  catch (...)
3297  {
3298  {
3299  Unlock<Mutex> u(_lock);
3300  setDisconnected();
3301  }
3302  _lock.signalAll();
3303  throw;
3304  }
3305 
3306  if (_publishStore.isValid())
3307  {
3308  try
3309  {
3310  _publishStore.replay(_replayer);
3311  broadcastConnectionStateChanged(ConnectionStateListener::PublishReplayed);
3312  }
3313  catch (const PublishStoreGapException& ex)
3314  {
3315  {
3316  Unlock<Mutex> u(_lock);
3317  setDisconnected();
3318  }
3319  _lock.signalAll();
3320  AMPS_UNHANDLED_EXCEPTION(ex);
3321  throw;
3322  }
3323  catch (const StoreException& ex)
3324  {
3325  {
3326  Unlock<Mutex> u(_lock);
3327  setDisconnected();
3328  }
3329  _lock.signalAll();
3330  std::ostringstream os;
3331  os << "A local store exception occurred while logging on."
3332  << ex.toString();
3333  throw ConnectionException(os.str());
3334  }
3335  catch (const AMPSException& ex)
3336  {
3337  {
3338  Unlock<Mutex> u(_lock);
3339  setDisconnected();
3340  }
3341  _lock.signalAll();
3342  AMPS_UNHANDLED_EXCEPTION(ex);
3343  throw;
3344  }
3345  catch (const std::exception& ex)
3346  {
3347  {
3348  Unlock<Mutex> u(_lock);
3349  setDisconnected();
3350  }
3351  _lock.signalAll();
3352  AMPS_UNHANDLED_EXCEPTION(ex);
3353  throw;
3354  }
3355  catch (...)
3356  {
3357  {
3358  Unlock<Mutex> u(_lock);
3359  setDisconnected();
3360  }
3361  _lock.signalAll();
3362  throw;
3363  }
3364  }
3365  _lock.signalAll();
3366  return newCommandId;
3367  }
3368 
3369  std::string subscribe(const MessageHandler& messageHandler_,
3370  const std::string& topic_,
3371  long timeout_,
3372  const std::string& filter_,
3373  const std::string& bookmark_,
3374  const std::string& options_,
3375  const std::string& subId_,
3376  bool isHASubscribe_ = true)
3377  {
3378  isHASubscribe_ &= (bool)_subscriptionManager;
3379  Lock<Mutex> l(_lock);
3380  _message.reset();
3381  _message.setCommandEnum(Message::Command::Subscribe);
3382  _message.newCommandId();
3383  std::string subId(subId_);
3384  if (subId.empty())
3385  {
3386  if (options_.find(AMPS_OPTIONS_REPLACE, 0, strlen(AMPS_OPTIONS_REPLACE) - 1) != std::string::npos)
3387  {
3388  throw ConnectionException("Cannot issue a replacement subscription; a valid subscription id is required.");
3389  }
3390 
3391  subId = _message.getCommandId();
3392  }
3393  _message.setSubscriptionId(subId);
3394  // we need to deep copy this before sending the message; while we are
3395  // waiting for a response, the fields in _message may get blown away for
3396  // other operations.
3397  AMPS::Message::Field subIdField(subId);
3398  unsigned ackTypes = Message::AckType::Processed;
3399 
3400  if (!bookmark_.empty() && _bookmarkStore.isValid())
3401  {
3402  ackTypes |= Message::AckType::Persisted;
3403  }
3404  _message.setTopic(topic_);
3405 
3406  if (filter_.length())
3407  {
3408  _message.setFilter(filter_);
3409  }
3410  if (bookmark_.length())
3411  {
3412  if (bookmark_ == AMPS_BOOKMARK_RECENT)
3413  {
3414  // The returned Field is a deep copy, so use assign to get it cleared
3415  Message::Field mostRecent = _bookmarkStore.getMostRecent(subIdField);
3416  _message.assignOwnershipBookmark(mostRecent);
3417  }
3418  else
3419  {
3420  _message.setBookmark(bookmark_);
3421  if (_bookmarkStore.isValid())
3422  {
3423  if (bookmark_ != AMPS_BOOKMARK_NOW &&
3424  bookmark_ != AMPS_BOOKMARK_EPOCH)
3425  {
3426  _bookmarkStore.log(_message);
3427  _bookmarkStore.discard(_message);
3428  _bookmarkStore.persisted(subIdField, _message.getBookmark());
3429  }
3430  }
3431  }
3432  }
3433  if (options_.length())
3434  {
3435  _message.setOptions(options_);
3436  }
3437 
3438  Message message = _message;
3439  if (isHASubscribe_)
3440  {
3441  message = _message.deepCopy();
3442  Unlock<Mutex> u(_lock);
3443  _subscriptionManager->subscribe(messageHandler_, message,
3444  Message::AckType::None);
3445  if (_badTimeToHASubscribe)
3446  {
3447  return subId;
3448  }
3449  }
3450  if (!_routes.hasRoute(message.getSubscriptionId()))
3451  {
3452  _routes.addRoute(message.getSubscriptionId(), messageHandler_,
3453  Message::AckType::None, ackTypes, message.getCommandEnum());
3454  }
3455  message.setAckTypeEnum(ackTypes);
3456  if (!options_.empty())
3457  {
3458  message.setOptions(options_);
3459  }
3460  try
3461  {
3462  syncAckProcessing(timeout_, message, isHASubscribe_);
3463  }
3464  catch (const DisconnectedException&)
3465  {
3466  if (!isHASubscribe_)
3467  {
3468  _routes.removeRoute(subIdField);
3469  throw;
3470  }
3471  else
3472  {
3473  AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(subIdField));
3474  throw;
3475  }
3476  }
3477  catch (const TimedOutException&)
3478  {
3479  AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(subIdField));
3480  throw;
3481  }
3482  catch (...)
3483  {
3484  if (isHASubscribe_)
3485  {
3486  // Have to unlock before calling into sub manager to avoid deadlock
3487  Unlock<Mutex> unlock(_lock);
3488  _subscriptionManager->unsubscribe(subIdField);
3489  }
3490  _routes.removeRoute(subIdField);
3491  throw;
3492  }
3493 
3494  return subId;
3495  }
3496  std::string deltaSubscribe(const MessageHandler& messageHandler_,
3497  const std::string& topic_,
3498  long timeout_,
3499  const std::string& filter_,
3500  const std::string& bookmark_,
3501  const std::string& options_,
3502  const std::string& subId_ = "",
3503  bool isHASubscribe_ = true)
3504  {
3505  isHASubscribe_ &= (bool)_subscriptionManager;
3506  Lock<Mutex> l(_lock);
3507  _message.reset();
3508  _message.setCommandEnum(Message::Command::DeltaSubscribe);
3509  _message.newCommandId();
3510  std::string subId(subId_);
3511  if (subId.empty())
3512  {
3513  subId = _message.getCommandId();
3514  }
3515  _message.setSubscriptionId(subId);
3516  // we need to deep copy this before sending the message; while we are
3517  // waiting for a response, the fields in _message may get blown away for
3518  // other operations.
3519  AMPS::Message::Field subIdField(subId);
3520  unsigned ackTypes = Message::AckType::Processed;
3521 
3522  if (!bookmark_.empty() && _bookmarkStore.isValid())
3523  {
3524  ackTypes |= Message::AckType::Persisted;
3525  }
3526  _message.setTopic(topic_);
3527  if (filter_.length())
3528  {
3529  _message.setFilter(filter_);
3530  }
3531  if (bookmark_.length())
3532  {
3533  if (bookmark_ == AMPS_BOOKMARK_RECENT)
3534  {
3535  // The returned Field is a deep copy, so use assign to get it cleared
3536  Message::Field mostRecent = _bookmarkStore.getMostRecent(subIdField);
3537  _message.assignOwnershipBookmark(mostRecent);
3538  }
3539  else
3540  {
3541  _message.setBookmark(bookmark_);
3542  if (_bookmarkStore.isValid())
3543  {
3544  if (bookmark_ != AMPS_BOOKMARK_NOW &&
3545  bookmark_ != AMPS_BOOKMARK_EPOCH)
3546  {
3547  _bookmarkStore.log(_message);
3548  _bookmarkStore.discard(_message);
3549  _bookmarkStore.persisted(subIdField, _message.getBookmark());
3550  }
3551  }
3552  }
3553  }
3554  if (options_.length())
3555  {
3556  _message.setOptions(options_);
3557  }
3558  Message message = _message;
3559  if (isHASubscribe_)
3560  {
3561  message = _message.deepCopy();
3562  Unlock<Mutex> u(_lock);
3563  _subscriptionManager->subscribe(messageHandler_, message,
3564  Message::AckType::None);
3565  if (_badTimeToHASubscribe)
3566  {
3567  return subId;
3568  }
3569  }
3570  if (!_routes.hasRoute(message.getSubscriptionId()))
3571  {
3572  _routes.addRoute(message.getSubscriptionId(), messageHandler_,
3573  Message::AckType::None, ackTypes, message.getCommandEnum());
3574  }
3575  message.setAckTypeEnum(ackTypes);
3576  if (!options_.empty())
3577  {
3578  message.setOptions(options_);
3579  }
3580  try
3581  {
3582  syncAckProcessing(timeout_, message, isHASubscribe_);
3583  }
3584  catch (const DisconnectedException&)
3585  {
3586  if (!isHASubscribe_)
3587  {
3588  _routes.removeRoute(subIdField);
3589  throw;
3590  }
3591  }
3592  catch (const TimedOutException&)
3593  {
3594  AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(subIdField));
3595  throw;
3596  }
3597  catch (...)
3598  {
3599  if (isHASubscribe_)
3600  {
3601  // Have to unlock before calling into sub manager to avoid deadlock
3602  Unlock<Mutex> unlock(_lock);
3603  _subscriptionManager->unsubscribe(subIdField);
3604  }
3605  _routes.removeRoute(subIdField);
3606  throw;
3607  }
3608  return subId;
3609  }
3610 
3611  void unsubscribe(const std::string& id)
3612  {
3613  Lock<Mutex> l(_lock);
3614  unsubscribeInternal(id);
3615  }
3616 
3617  void unsubscribe(void)
3618  {
3619  if (_subscriptionManager)
3620  {
3621  _subscriptionManager->clear();
3622  }
3623  {
3624  _routes.unsubscribeAll();
3625  Lock<Mutex> l(_lock);
3626  _message.reset();
3627  _message.setCommandEnum(Message::Command::Unsubscribe);
3628  _message.newCommandId();
3629  _message.setSubscriptionId("all");
3630  _sendWithoutRetry(_message);
3631  }
3632  deferredExecution(&amps_noOpFn, NULL);
3633  }
3634 
3635  std::string sow(const MessageHandler& messageHandler_,
3636  const std::string& topic_,
3637  const std::string& filter_ = "",
3638  const std::string& orderBy_ = "",
3639  const std::string& bookmark_ = "",
3640  int batchSize_ = AMPS_DEFAULT_BATCH_SIZE,
3641  int topN_ = AMPS_DEFAULT_TOP_N,
3642  const std::string& options_ = "",
3643  long timeout_ = AMPS_DEFAULT_COMMAND_TIMEOUT)
3644  {
3645  Lock<Mutex> l(_lock);
3646  _message.reset();
3647  _message.setCommandEnum(Message::Command::SOW);
3648  _message.newCommandId();
3649  // need to keep our own copy of the command ID.
3650  std::string commandId = _message.getCommandId();
3651  _message.setQueryID(_message.getCommandId());
3652  unsigned ackTypes = Message::AckType::Processed | Message::AckType::Completed;
3653  _message.setAckTypeEnum(ackTypes);
3654  _message.setTopic(topic_);
3655  if (filter_.length())
3656  {
3657  _message.setFilter(filter_);
3658  }
3659  if (orderBy_.length())
3660  {
3661  _message.setOrderBy(orderBy_);
3662  }
3663  if (bookmark_.length())
3664  {
3665  _message.setBookmark(bookmark_);
3666  }
3667  _message.setBatchSize(AMPS::asString(batchSize_));
3668  if (topN_ != AMPS_DEFAULT_TOP_N)
3669  {
3670  _message.setTopNRecordsReturned(AMPS::asString(topN_));
3671  }
3672  if (options_.length())
3673  {
3674  _message.setOptions(options_);
3675  }
3676 
3677  _routes.addRoute(_message.getQueryID(), messageHandler_,
3678  Message::AckType::None, ackTypes, _message.getCommandEnum());
3679 
3680  try
3681  {
3682  syncAckProcessing(timeout_, _message);
3683  }
3684  catch (...)
3685  {
3686  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId));
3687  throw;
3688  }
3689 
3690  return commandId;
3691  }
3692 
3693  std::string sow(const MessageHandler& messageHandler_,
3694  const std::string& topic_,
3695  long timeout_,
3696  const std::string& filter_ = "",
3697  int batchSize_ = AMPS_DEFAULT_BATCH_SIZE,
3698  int topN_ = AMPS_DEFAULT_TOP_N)
3699  {
3700  std::string notSet;
3701  return sow(messageHandler_,
3702  topic_,
3703  filter_,
3704  notSet, // orderBy
3705  notSet, // bookmark
3706  batchSize_,
3707  topN_,
3708  notSet,
3709  timeout_);
3710  }
3711 
3712  std::string sowAndSubscribe(const MessageHandler& messageHandler_,
3713  const std::string& topic_,
3714  const std::string& filter_ = "",
3715  const std::string& orderBy_ = "",
3716  const std::string& bookmark_ = "",
3717  int batchSize_ = AMPS_DEFAULT_BATCH_SIZE,
3718  int topN_ = AMPS_DEFAULT_TOP_N,
3719  const std::string& options_ = "",
3720  long timeout_ = AMPS_DEFAULT_COMMAND_TIMEOUT,
3721  bool isHASubscribe_ = true)
3722  {
3723  isHASubscribe_ &= (bool)_subscriptionManager;
3724  unsigned ackTypes = Message::AckType::Processed;
3725  Lock<Mutex> l(_lock);
3726  _message.reset();
3727  _message.setCommandEnum(Message::Command::SOWAndSubscribe);
3728  _message.newCommandId();
3729  Field cid = _message.getCommandId();
3730  std::string subId = cid;
3731  _message.setQueryID(cid).setSubscriptionId(cid).setTopic(topic_);
3732  if (filter_.length())
3733  {
3734  _message.setFilter(filter_);
3735  }
3736  if (orderBy_.length())
3737  {
3738  _message.setOrderBy(orderBy_);
3739  }
3740  if (bookmark_.length())
3741  {
3742  _message.setBookmark(bookmark_);
3743  Message::Field bookmark = _message.getBookmark();
3744  if (_bookmarkStore.isValid())
3745  {
3746  ackTypes |= Message::AckType::Persisted;
3747  if (bookmark == AMPS_BOOKMARK_RECENT)
3748  {
3749  _message.assignOwnershipBookmark(_bookmarkStore.getMostRecent(_message.getSubscriptionId()));
3750  }
3751  else if (bookmark != AMPS_BOOKMARK_NOW &&
3752  bookmark != AMPS_BOOKMARK_EPOCH)
3753  {
3754  _bookmarkStore.log(_message);
3755  if (!BookmarkRange::isRange(bookmark))
3756  {
3757  _bookmarkStore.discard(_message);
3758  _bookmarkStore.persisted(_message.getSubscriptionId(),
3759  bookmark);
3760  }
3761  }
3762  }
3763  else if (bookmark == AMPS_BOOKMARK_RECENT)
3764  {
3765  _message.setBookmark(AMPS_BOOKMARK_EPOCH);
3766  }
3767  }
3768  _message.setBatchSize(AMPS::asString(batchSize_));
3769  if (topN_ != AMPS_DEFAULT_TOP_N)
3770  {
3771  _message.setTopNRecordsReturned(AMPS::asString(topN_));
3772  }
3773  if (options_.length())
3774  {
3775  _message.setOptions(options_);
3776  }
3777 
3778  Message message = _message;
3779  if (isHASubscribe_)
3780  {
3781  message = _message.deepCopy();
3782  Unlock<Mutex> u(_lock);
3783  _subscriptionManager->subscribe(messageHandler_, message,
3784  Message::AckType::None);
3785  if (_badTimeToHASubscribe)
3786  {
3787  return subId;
3788  }
3789  }
3790  _routes.addRoute(cid, messageHandler_,
3791  Message::AckType::None, ackTypes, message.getCommandEnum());
3792  message.setAckTypeEnum(ackTypes);
3793  if (!options_.empty())
3794  {
3795  message.setOptions(options_);
3796  }
3797  try
3798  {
3799  syncAckProcessing(timeout_, message, isHASubscribe_);
3800  }
3801  catch (const DisconnectedException&)
3802  {
3803  if (!isHASubscribe_)
3804  {
3805  _routes.removeRoute(subId);
3806  throw;
3807  }
3808  }
3809  catch (const TimedOutException&)
3810  {
3811  AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(subId));
3812  throw;
3813  }
3814  catch (...)
3815  {
3816  if (isHASubscribe_)
3817  {
3818  // Have to unlock before calling into sub manager to avoid deadlock
3819  Unlock<Mutex> unlock(_lock);
3820  _subscriptionManager->unsubscribe(cid);
3821  }
3822  _routes.removeRoute(subId);
3823  throw;
3824  }
3825  return subId;
3826  }
3827 
3828  std::string sowAndSubscribe(const MessageHandler& messageHandler_,
3829  const std::string& topic_,
3830  long timeout_,
3831  const std::string& filter_ = "",
3832  int batchSize_ = AMPS_DEFAULT_BATCH_SIZE,
3833  bool oofEnabled_ = false,
3834  int topN_ = AMPS_DEFAULT_TOP_N,
3835  bool isHASubscribe_ = true)
3836  {
3837  std::string notSet;
3838  return sowAndSubscribe(messageHandler_,
3839  topic_,
3840  filter_,
3841  notSet, // orderBy
3842  notSet, // bookmark
3843  batchSize_,
3844  topN_,
3845  (oofEnabled_ ? "oof" : ""),
3846  timeout_,
3847  isHASubscribe_);
3848  }
3849 
3850  std::string sowAndDeltaSubscribe(const MessageHandler& messageHandler_,
3851  const std::string& topic_,
3852  const std::string& filter_ = "",
3853  const std::string& orderBy_ = "",
3854  int batchSize_ = AMPS_DEFAULT_BATCH_SIZE,
3855  int topN_ = AMPS_DEFAULT_TOP_N,
3856  const std::string& options_ = "",
3857  long timeout_ = AMPS_DEFAULT_COMMAND_TIMEOUT,
3858  bool isHASubscribe_ = true)
3859  {
3860  isHASubscribe_ &= (bool)_subscriptionManager;
3861  Lock<Mutex> l(_lock);
3862  _message.reset();
3863  _message.setCommandEnum(Message::Command::SOWAndDeltaSubscribe);
3864  _message.newCommandId();
3865  _message.setQueryID(_message.getCommandId());
3866  _message.setSubscriptionId(_message.getCommandId());
3867  std::string subId = _message.getSubscriptionId();
3868  _message.setTopic(topic_);
3869  if (filter_.length())
3870  {
3871  _message.setFilter(filter_);
3872  }
3873  if (orderBy_.length())
3874  {
3875  _message.setOrderBy(orderBy_);
3876  }
3877  _message.setBatchSize(AMPS::asString(batchSize_));
3878  if (topN_ != AMPS_DEFAULT_TOP_N)
3879  {
3880  _message.setTopNRecordsReturned(AMPS::asString(topN_));
3881  }
3882  if (options_.length())
3883  {
3884  _message.setOptions(options_);
3885  }
3886  Message message = _message;
3887  if (isHASubscribe_)
3888  {
3889  message = _message.deepCopy();
3890  Unlock<Mutex> u(_lock);
3891  _subscriptionManager->subscribe(messageHandler_, message,
3892  Message::AckType::None);
3893  if (_badTimeToHASubscribe)
3894  {
3895  return subId;
3896  }
3897  }
3898  _routes.addRoute(message.getQueryID(), messageHandler_,
3899  Message::AckType::None, Message::AckType::Processed, message.getCommandEnum());
3900  message.setAckTypeEnum(Message::AckType::Processed);
3901  if (!options_.empty())
3902  {
3903  message.setOptions(options_);
3904  }
3905  try
3906  {
3907  syncAckProcessing(timeout_, message, isHASubscribe_);
3908  }
3909  catch (const DisconnectedException&)
3910  {
3911  if (!isHASubscribe_)
3912  {
3913  _routes.removeRoute(subId);
3914  throw;
3915  }
3916  }
3917  catch (const TimedOutException&)
3918  {
3919  AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(subId));
3920  throw;
3921  }
3922  catch (...)
3923  {
3924  if (isHASubscribe_)
3925  {
3926  // Have to unlock before calling into sub manager to avoid deadlock
3927  Unlock<Mutex> unlock(_lock);
3928  _subscriptionManager->unsubscribe(Field(subId));
3929  }
3930  _routes.removeRoute(subId);
3931  throw;
3932  }
3933  return subId;
3934  }
3935 
3936  std::string sowAndDeltaSubscribe(const MessageHandler& messageHandler_,
3937  const std::string& topic_,
3938  long timeout_,
3939  const std::string& filter_ = "",
3940  int batchSize_ = AMPS_DEFAULT_BATCH_SIZE,
3941  bool oofEnabled_ = false,
3942  bool sendEmpties_ = false,
3943  int topN_ = AMPS_DEFAULT_TOP_N,
3944  bool isHASubscribe_ = true)
3945  {
3946  std::string notSet;
3947  Message::Options options;
3948  if (oofEnabled_)
3949  {
3950  options.setOOF();
3951  }
3952  if (sendEmpties_ == false)
3953  {
3954  options.setNoEmpties();
3955  }
3956  return sowAndDeltaSubscribe(messageHandler_,
3957  topic_,
3958  filter_,
3959  notSet, // orderBy
3960  batchSize_,
3961  topN_,
3962  options,
3963  timeout_,
3964  isHASubscribe_);
3965  }
3966 
3967  std::string sowDelete(const MessageHandler& messageHandler_,
3968  const std::string& topic_,
3969  const std::string& filter_,
3970  long timeout_,
3971  Message::Field commandId_ = Message::Field())
3972  {
3973  if (_publishStore.isValid())
3974  {
3975  unsigned ackType = Message::AckType::Processed |
3976  Message::AckType::Stats |
3977  Message::AckType::Persisted;
3978  publishStoreMessage.reset();
3979  if (commandId_.empty())
3980  {
3981  publishStoreMessage.newCommandId();
3982  commandId_ = publishStoreMessage.getCommandId();
3983  }
3984  else
3985  {
3986  publishStoreMessage.setCommandId(commandId_.data(), commandId_.len());
3987  }
3988  publishStoreMessage.setCommandEnum(Message::Command::SOWDelete)
3989  .assignSubscriptionId(commandId_.data(), commandId_.len())
3990  .assignQueryID(commandId_.data(), commandId_.len())
3991  .setAckTypeEnum(ackType)
3992  .assignTopic(topic_.c_str(), topic_.length())
3993  .assignFilter(filter_.c_str(), filter_.length());
3994  amps_uint64_t haSequenceNumber = _publishStore.store(publishStoreMessage);
3995  char buf[AMPS_NUMBER_BUFFER_LEN];
3996  size_t pos = convertToCharArray(buf, haSequenceNumber);
3997  publishStoreMessage.assignSequence(buf + pos, AMPS_NUMBER_BUFFER_LEN - pos);
3998  {
3999  try
4000  {
4001  Lock<Mutex> l(_lock);
4002  _routes.addRoute(commandId_, messageHandler_,
4003  Message::AckType::Stats,
4004  Message::AckType::Processed | Message::AckType::Persisted,
4005  publishStoreMessage.getCommandEnum());
4006  syncAckProcessing(timeout_, publishStoreMessage,
4007  haSequenceNumber);
4008  }
4009  catch (const DisconnectedException&)
4010  {
4011  // -V565
4012  // Pass - it will get replayed upon reconnect
4013  }
4014  catch (...)
4015  {
4016  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId_));
4017  throw;
4018  }
4019  }
4020  return (std::string)commandId_;
4021  }
4022  else
4023  {
4024  Lock<Mutex> l(_lock);
4025  _message.reset();
4026  if (commandId_.empty())
4027  {
4028  _message.newCommandId();
4029  commandId_ = _message.getCommandId();
4030  }
4031  else
4032  {
4033  _message.setCommandId(commandId_.data(), commandId_.len());
4034  }
4035  _message.setCommandEnum(Message::Command::SOWDelete)
4036  .assignSubscriptionId(commandId_.data(), commandId_.len())
4037  .assignQueryID(commandId_.data(), commandId_.len())
4038  .setAckTypeEnum(Message::AckType::Processed |
4039  Message::AckType::Stats)
4040  .assignTopic(topic_.c_str(), topic_.length())
4041  .assignFilter(filter_.c_str(), filter_.length());
4042  _routes.addRoute(commandId_, messageHandler_,
4043  Message::AckType::Stats,
4044  Message::AckType::Processed,
4045  _message.getCommandEnum());
4046  try
4047  {
4048  syncAckProcessing(timeout_, _message);
4049  }
4050  catch (...)
4051  {
4052  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId_));
4053  throw;
4054  }
4055  return (std::string)commandId_;
4056  }
4057  }
4058 
4059  std::string sowDeleteByData(const MessageHandler& messageHandler_,
4060  const std::string& topic_,
4061  const std::string& data_,
4062  long timeout_,
4063  Message::Field commandId_ = Message::Field())
4064  {
4065  if (_publishStore.isValid())
4066  {
4067  unsigned ackType = Message::AckType::Processed |
4068  Message::AckType::Stats |
4069  Message::AckType::Persisted;
4070  publishStoreMessage.reset();
4071  if (commandId_.empty())
4072  {
4073  publishStoreMessage.newCommandId();
4074  commandId_ = publishStoreMessage.getCommandId();
4075  }
4076  else
4077  {
4078  publishStoreMessage.setCommandId(commandId_.data(), commandId_.len());
4079  }
4080  publishStoreMessage.setCommandEnum(Message::Command::SOWDelete)
4081  .assignSubscriptionId(commandId_.data(), commandId_.len())
4082  .assignQueryID(commandId_.data(), commandId_.len())
4083  .setAckTypeEnum(ackType)
4084  .assignTopic(topic_.c_str(), topic_.length())
4085  .assignData(data_.c_str(), data_.length());
4086  amps_uint64_t haSequenceNumber = _publishStore.store(publishStoreMessage);
4087  char buf[AMPS_NUMBER_BUFFER_LEN];
4088  size_t pos = convertToCharArray(buf, haSequenceNumber);
4089  publishStoreMessage.assignSequence(buf + pos, AMPS_NUMBER_BUFFER_LEN - pos);
4090  {
4091  try
4092  {
4093  Lock<Mutex> l(_lock);
4094  _routes.addRoute(commandId_, messageHandler_,
4095  Message::AckType::Stats,
4096  Message::AckType::Processed | Message::AckType::Persisted,
4097  publishStoreMessage.getCommandEnum());
4098  syncAckProcessing(timeout_, publishStoreMessage,
4099  haSequenceNumber);
4100  }
4101  catch (const DisconnectedException&)
4102  {
4103  // -V565
4104  // Pass - it will get replayed upon reconnect
4105  }
4106  catch (...)
4107  {
4108  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId_));
4109  throw;
4110  }
4111  }
4112  return (std::string)commandId_;
4113  }
4114  else
4115  {
4116  Lock<Mutex> l(_lock);
4117  _message.reset();
4118  if (commandId_.empty())
4119  {
4120  _message.newCommandId();
4121  commandId_ = _message.getCommandId();
4122  }
4123  else
4124  {
4125  _message.setCommandId(commandId_.data(), commandId_.len());
4126  }
4127  _message.setCommandEnum(Message::Command::SOWDelete)
4128  .assignSubscriptionId(commandId_.data(), commandId_.len())
4129  .assignQueryID(commandId_.data(), commandId_.len())
4130  .setAckTypeEnum(Message::AckType::Processed |
4131  Message::AckType::Stats)
4132  .assignTopic(topic_.c_str(), topic_.length())
4133  .assignData(data_.c_str(), data_.length());
4134  _routes.addRoute(commandId_, messageHandler_,
4135  Message::AckType::Stats,
4136  Message::AckType::Processed,
4137  _message.getCommandEnum());
4138  try
4139  {
4140  syncAckProcessing(timeout_, _message);
4141  }
4142  catch (...)
4143  {
4144  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId_));
4145  throw;
4146  }
4147  return (std::string)commandId_;
4148  }
4149  }
4150 
4151  std::string sowDeleteByKeys(const MessageHandler& messageHandler_,
4152  const std::string& topic_,
4153  const std::string& keys_,
4154  long timeout_,
4155  Message::Field commandId_ = Message::Field())
4156  {
4157  if (_publishStore.isValid())
4158  {
4159  unsigned ackType = Message::AckType::Processed |
4160  Message::AckType::Stats |
4161  Message::AckType::Persisted;
4162  publishStoreMessage.reset();
4163  if (commandId_.empty())
4164  {
4165  publishStoreMessage.newCommandId();
4166  commandId_ = publishStoreMessage.getCommandId();
4167  }
4168  else
4169  {
4170  publishStoreMessage.setCommandId(commandId_.data(), commandId_.len());
4171  }
4172  publishStoreMessage.setCommandEnum(Message::Command::SOWDelete)
4173  .assignSubscriptionId(commandId_.data(), commandId_.len())
4174  .assignQueryID(commandId_.data(), commandId_.len())
4175  .setAckTypeEnum(ackType)
4176  .assignTopic(topic_.c_str(), topic_.length())
4177  .assignSowKeys(keys_.c_str(), keys_.length());
4178  amps_uint64_t haSequenceNumber = _publishStore.store(publishStoreMessage);
4179  char buf[AMPS_NUMBER_BUFFER_LEN];
4180  size_t pos = convertToCharArray(buf, haSequenceNumber);
4181  publishStoreMessage.assignSequence(buf + pos, AMPS_NUMBER_BUFFER_LEN - pos);
4182  {
4183  try
4184  {
4185  Lock<Mutex> l(_lock);
4186  _routes.addRoute(commandId_, messageHandler_,
4187  Message::AckType::Stats,
4188  Message::AckType::Processed | Message::AckType::Persisted,
4189  publishStoreMessage.getCommandEnum());
4190  syncAckProcessing(timeout_, publishStoreMessage,
4191  haSequenceNumber);
4192  }
4193  catch (const DisconnectedException&)
4194  {
4195  // -V565
4196  // Pass - it will get replayed upon reconnect
4197  }
4198  catch (...)
4199  {
4200  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId_));
4201  throw;
4202  }
4203  }
4204  return (std::string)commandId_;
4205  }
4206  else
4207  {
4208  Lock<Mutex> l(_lock);
4209  _message.reset();
4210  if (commandId_.empty())
4211  {
4212  _message.newCommandId();
4213  commandId_ = _message.getCommandId();
4214  }
4215  else
4216  {
4217  _message.setCommandId(commandId_.data(), commandId_.len());
4218  }
4219  _message.setCommandEnum(Message::Command::SOWDelete)
4220  .assignSubscriptionId(commandId_.data(), commandId_.len())
4221  .assignQueryID(commandId_.data(), commandId_.len())
4222  .setAckTypeEnum(Message::AckType::Processed |
4223  Message::AckType::Stats)
4224  .assignTopic(topic_.c_str(), topic_.length())
4225  .assignSowKeys(keys_.c_str(), keys_.length());
4226  _routes.addRoute(commandId_, messageHandler_,
4227  Message::AckType::Stats,
4228  Message::AckType::Processed,
4229  _message.getCommandEnum());
4230  try
4231  {
4232  syncAckProcessing(timeout_, _message);
4233  }
4234  catch (...)
4235  {
4236  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId_));
4237  throw;
4238  }
4239  return (std::string)commandId_;
4240  }
4241  }
4242 
4243  void startTimer(void)
4244  {
4245  if (_serverVersion >= "5.3.2.0")
4246  {
4247  throw CommandException("The start_timer command is deprecated.");
4248  }
4249  Lock<Mutex> l(_lock);
4250  _message.reset();
4251  _message.setCommandEnum(Message::Command::StartTimer);
4252 
4253  _send(_message);
4254  }
4255 
4256  std::string stopTimer(MessageHandler messageHandler_)
4257  {
4258  if (_serverVersion >= "5.3.2.0")
4259  {
4260  throw CommandException("The stop_timer command is deprecated.");
4261  }
4262  return executeAsync(Command("stop_timer").addAckType("completed"), messageHandler_);
4263  }
4264 
4265  amps_handle getHandle(void)
4266  {
4267  return _client;
4268  }
4269 
4277  void setExceptionListener(const std::shared_ptr<const ExceptionListener>& pListener_)
4278  {
4279  _pExceptionListener = pListener_;
4280  _exceptionListener = _pExceptionListener.get();
4281  }
4282 
4283  void setExceptionListener(const ExceptionListener& listener_)
4284  {
4285  _exceptionListener = &listener_;
4286  }
4287 
4288  const ExceptionListener& getExceptionListener(void) const
4289  {
4290  return *_exceptionListener;
4291  }
4292 
4293  void setHeartbeat(unsigned heartbeatInterval_, unsigned readTimeout_)
4294  {
4295  if (readTimeout_ < heartbeatInterval_)
4296  {
4297  throw UsageException("The socket read timeout must be >= the heartbeat interval.");
4298  }
4299  Lock<Mutex> l(_lock);
4300  if (_heartbeatInterval != heartbeatInterval_ ||
4301  _readTimeout != readTimeout_)
4302  {
4303  _heartbeatInterval = heartbeatInterval_;
4304  _readTimeout = readTimeout_;
4305  _sendHeartbeat();
4306  }
4307  }
4308 
4309  void _sendHeartbeat(void)
4310  {
4311  if (_connected && _heartbeatInterval != 0)
4312  {
4313  std::ostringstream options;
4314  options << "start," << _heartbeatInterval;
4315  _beatMessage.setOptions(options.str());
4316 
4317  _heartbeatTimer.setTimeout(_heartbeatInterval * 1000.0);
4318  _heartbeatTimer.start();
4319  try
4320  {
4321  _sendWithoutRetry(_beatMessage);
4322  broadcastConnectionStateChanged(ConnectionStateListener::HeartbeatInitiated);
4323  }
4324  catch (ConnectionException& ex_)
4325  {
4326  // If we are disconnected when we attempt to send, that's OK;
4327  // we'll send this message after we re-connect (if we do).
4328  AMPS_UNHANDLED_EXCEPTION(ex_);
4329  }
4330  _beatMessage.setOptions("beat");
4331  }
4332  amps_result result = AMPS_E_OK;
4333  if (_readTimeout && _connected)
4334  {
4335  result = amps_client_set_read_timeout(_client, (int)_readTimeout);
4336  if (result != AMPS_E_OK && result != AMPS_E_DISCONNECTED)
4337  {
4338  AMPSException::throwFor(_client, result);
4339  }
4340  if (!_queueAckTimeout)
4341  {
4342  result = amps_client_set_idle_time(_client,
4343  (int)(_heartbeatInterval * 1000));
4344  if (result != AMPS_E_OK && result != AMPS_E_DISCONNECTED)
4345  {
4346  AMPSException::throwFor(_client, result);
4347  }
4348  }
4349  }
4350  }
4351 
4352  void addConnectionStateListener(ConnectionStateListener* listener_)
4353  {
4354  Lock<Mutex> lock(_lock);
4355  _connectionStateListeners.insert(listener_);
4356  }
4357 
4358  void removeConnectionStateListener(ConnectionStateListener* listener_)
4359  {
4360  Lock<Mutex> lock(_lock);
4361  _connectionStateListeners.erase(listener_);
4362  }
4363 
4364  void clearConnectionStateListeners()
4365  {
4366  Lock<Mutex> lock(_lock);
4367  _connectionStateListeners.clear();
4368  }
4369 
4370  void _registerHandler(Command& command_, Message::Field& cid_,
4371  MessageHandler& handler_, unsigned requestedAcks_,
4372  unsigned systemAddedAcks_, Message::Command::Type commandType_)
4373  {
4374  Message message = command_.getMessage();
4375  Message::Command::Type commandType = message.getCommandEnum();
4376  Message::Field subid = message.getSubscriptionId();
4377  Message::Field qid = message.getQueryID();
4378  // If we have an id, we're good, even if it's an existing route
4379  bool added = qid.len() || subid.len() || cid_.len();
4380  bool cidIsQid = cid_ == qid;
4381  bool cidUnique = !cidIsQid && cid_.len() > 0 && cid_ != subid;
4382  int addedCount = 0;
4383  if (subid.len() > 0)
4384  {
4385  // This can replace a non-subscribe with a matching id
4386  // with a subscription but not another subscription.
4387  addedCount += _routes.addRoute(subid, handler_, requestedAcks_,
4388  systemAddedAcks_, commandType_);
4389  if (!cidUnique
4390  && (commandType == Message::Command::Subscribe
4391  || commandType == Message::Command::DeltaSubscribe))
4392  {
4393  // We don't need to do anything else
4394  cid_ = subid;
4395  return;
4396  }
4397  }
4398  if (qid.len() > 0 && qid != subid
4399  && (commandType == Message::Command::SOW
4400  || commandType == Message::Command::SOWDelete
4401  || commandType == Message::Command::SOWAndSubscribe
4402  || commandType == Message::Command::SOWAndDeltaSubscribe))
4403  {
4404  while (_routes.hasRoute(qid))
4405  {
4406  message.newQueryId();
4407  if (cidIsQid)
4408  {
4409  cid_ = message.getQueryId();
4410  }
4411  qid = message.getQueryId();
4412  }
4413  if (addedCount == 0)
4414  {
4415  _routes.addRoute(qid, handler_, requestedAcks_,
4416  systemAddedAcks_, commandType_);
4417  }
4418  else
4419  {
4420  void* data = NULL;
4421  {
4422  Unlock<Mutex> u(_lock);
4423  data = amps_invoke_copy_route_function(handler_.userData());
4424  }
4425  if (!data)
4426  {
4427  _routes.addRoute(qid, handler_, requestedAcks_,
4428  systemAddedAcks_, commandType_);
4429  }
4430  else
4431  {
4432  _routes.addRoute(qid,
4433  MessageHandler(handler_.function(),
4434  data),
4435  requestedAcks_,
4436  systemAddedAcks_, commandType_);
4437  }
4438  }
4439  ++addedCount;
4440  }
4441  if (cidUnique && requestedAcks_ & ~Message::AckType::Persisted)
4442  {
4443  while (_routes.hasRoute(cid_))
4444  {
4445  cid_ = message.newCommandId().getCommandId();
4446  }
4447  if (addedCount == 0)
4448  {
4449  _routes.addRoute(cid_, handler_, requestedAcks_,
4450  systemAddedAcks_, commandType_);
4451  }
4452  else
4453  {
4454  void* data = NULL;
4455  {
4456  Unlock<Mutex> u(_lock);
4457  data = amps_invoke_copy_route_function(handler_.userData());
4458  }
4459  if (!data)
4460  {
4461  _routes.addRoute(cid_, handler_, requestedAcks_,
4462  systemAddedAcks_, commandType_);
4463  }
4464  else
4465  {
4466  _routes.addRoute(cid_,
4467  MessageHandler(handler_.function(),
4468  data),
4469  requestedAcks_,
4470  systemAddedAcks_, commandType_);
4471  }
4472  }
4473  }
4474  else if ((commandType == Message::Command::Publish ||
4475  commandType == Message::Command::DeltaPublish)
4476  && requestedAcks_ & ~Message::AckType::Persisted)
4477  {
4478  cid_ = command_.getMessage().newCommandId().getCommandId();
4479  _routes.addRoute(cid_, handler_, requestedAcks_,
4480  systemAddedAcks_, commandType_);
4481  added = true;
4482  }
4483  if (!added)
4484  {
4485  throw UsageException("To use a messagehandler, you must also supply a command or subscription ID.");
4486  }
4487  }
4488 
4489  std::string executeAsyncNoLock(Command& command_, MessageHandler& handler_,
4490  bool isHASubscribe_ = true)
4491  {
4492  isHASubscribe_ &= (bool)_subscriptionManager;
4493  Message& message = command_.getMessage();
4494  unsigned systemAddedAcks = (handler_.isValid() || command_.hasProcessedAck()) ?
4495  Message::AckType::Processed : Message::AckType::None;
4496  unsigned requestedAcks = message.getAckTypeEnum();
4497  bool isPublishStore = _publishStore.isValid() && command_.needsSequenceNumber();
4498  Message::Command::Type commandType = message.getCommandEnum();
4499  if (commandType == Message::Command::StopTimer)
4500  {
4501  systemAddedAcks |= Message::AckType::Completed;
4502  }
4503  else if (commandType == Message::Command::Unsubscribe)
4504  {
4505  // Clear routes and sub manager
4506  const std::string subId = message.getSubscriptionId();
4507  if (subId == "all")
4508  {
4509  _routes.unsubscribeAll();
4510  if (_subscriptionManager)
4511  {
4512  Unlock<Mutex> unlock(_lock);
4513  _subscriptionManager->clear();
4514  }
4515  }
4516  else
4517  {
4518  _routes.removeRoute(subId);
4519  // Lock is already acquired
4520  if (_subscriptionManager)
4521  {
4522  // Have to unlock before calling into sub manager to avoid deadlock
4523  Unlock<Mutex> unlock(_lock);
4524  _subscriptionManager->unsubscribe(subId);
4525  }
4526  }
4527  // Make sure the clear gets processed by receive thread
4528  deferredExecution(&amps_noOpFn, NULL);
4529  }
4530  Message::Field cid = message.getCommandId();
4531  if (handler_.isValid() && cid.empty())
4532  {
4533  cid = message.newCommandId().getCommandId();
4534  }
4535  if (message.getBookmark().len() > 0)
4536  {
4537  if (command_.isSubscribe())
4538  {
4539  Message::Field bookmark = message.getBookmark();
4540  if (_bookmarkStore.isValid())
4541  {
4542  systemAddedAcks |= Message::AckType::Persisted;
4543  if (bookmark == AMPS_BOOKMARK_RECENT)
4544  {
4545  message.assignOwnershipBookmark(_bookmarkStore.getMostRecent(message.getSubscriptionId()));
4546  }
4547  else if (bookmark != AMPS_BOOKMARK_NOW &&
4548  bookmark != AMPS_BOOKMARK_EPOCH)
4549  {
4550  _bookmarkStore.log(message);
4551  if (!BookmarkRange::isRange(bookmark))
4552  {
4553  _bookmarkStore.discard(message);
4554  _bookmarkStore.persisted(message.getSubscriptionId(),
4555  bookmark);
4556  }
4557  }
4558  }
4559  else if (bookmark == AMPS_BOOKMARK_RECENT)
4560  {
4562  }
4563  }
4564  }
4565  if (isPublishStore)
4566  {
4567  systemAddedAcks |= Message::AckType::Persisted;
4568  }
4569  bool isSubscribe = command_.isSubscribe();
4570  if (handler_.isValid() && !isSubscribe)
4571  {
4572  _registerHandler(command_, cid, handler_,
4573  requestedAcks, systemAddedAcks, commandType);
4574  }
4575  if (isPublishStore)
4576  {
4577  bool useSyncSend = cid.len() > 0 && command_.hasProcessedAck();
4578  amps_uint64_t haSequenceNumber = (amps_uint64_t)0;
4579  message.setAckTypeEnum(requestedAcks | systemAddedAcks);
4580  {
4581  Unlock<Mutex> u(_lock);
4582  haSequenceNumber = _publishStore.store(message);
4583  }
4584  message.setSequence(haSequenceNumber);
4585  try
4586  {
4587  if (useSyncSend)
4588  {
4589  syncAckProcessing((long)command_.getTimeout(), message,
4590  haSequenceNumber);
4591  }
4592  else
4593  {
4594  _send(message, haSequenceNumber, false,
4595  commandType & (Message::Command::Publish
4596  | Message::Command::DeltaPublish));
4597  }
4598  }
4599  catch (const DisconnectedException&)
4600  {
4601  throw;
4602  }
4603  catch (...)
4604  {
4605  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(cid));
4606  throw;
4607  }
4608  }
4609  else
4610  {
4611  if (isSubscribe)
4612  {
4613  const Message::Field& subId = message.getSubscriptionId();
4614  if (isHASubscribe_)
4615  {
4616  Unlock<Mutex> u(_lock);
4617  _subscriptionManager->subscribe(handler_,
4618  message.deepCopy(),
4619  requestedAcks);
4620  if (_badTimeToHASubscribe)
4621  {
4622  message.setAckTypeEnum(requestedAcks);
4623  return std::string(subId.data(), subId.len());
4624  }
4625  }
4626  if (handler_.isValid())
4627  {
4628  _registerHandler(command_, cid, handler_,
4629  requestedAcks, systemAddedAcks, commandType);
4630  }
4631  message.setAckTypeEnum(requestedAcks | systemAddedAcks);
4632  try
4633  {
4634  syncAckProcessing((long)command_.getTimeout(), message,
4635  isHASubscribe_);
4636  }
4637  catch (const DisconnectedException&)
4638  {
4639  if (!isHASubscribe_)
4640  {
4641  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(cid));
4642  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(subId));
4643  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(message.getQueryId()));
4644  message.setAckTypeEnum(requestedAcks);
4645  throw;
4646  }
4647  }
4648  catch (const TimedOutException&)
4649  {
4650  AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(cid));
4651  AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(subId));
4652  AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(message.getQueryId()));
4653  message.setAckTypeEnum(requestedAcks);
4654  throw;
4655  }
4656  catch (...)
4657  {
4658  if (isHASubscribe_)
4659  {
4660  // Have to unlock before calling into sub manager to avoid deadlock
4661  Unlock<Mutex> unlock(_lock);
4662  _subscriptionManager->unsubscribe(subId);
4663  }
4664  if (message.getQueryID().len() > 0)
4665  {
4666  _routes.removeRoute(message.getQueryID());
4667  }
4668  _routes.removeRoute(cid);
4669  _routes.removeRoute(subId);
4670  message.setAckTypeEnum(requestedAcks);
4671  throw;
4672  }
4673  if (subId.len() > 0)
4674  {
4675  message.setAckTypeEnum(requestedAcks);
4676  return std::string(subId.data(), subId.len());
4677  }
4678  }
4679  else
4680  {
4681  // SOW, Flush, etc. should always be sync. Publish/delete may not be.
4682  bool useSyncSend = commandType & ~Message::Command::NoDataCommands
4683  || (cid.len() > 0 && command_.hasProcessedAck());
4684  message.setAckTypeEnum(requestedAcks | systemAddedAcks);
4685  try
4686  {
4687  if (useSyncSend)
4688  {
4689  syncAckProcessing((long)(command_.getTimeout()), message);
4690  }
4691  else
4692  {
4693  _send(message, 0, false,
4694  commandType & (Message::Command::Publish
4695  | Message::Command::DeltaPublish));
4696  }
4697  }
4698  catch (const TimedOutException&)
4699  {
4700  AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(cid));
4701  AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(message.getQueryId()));
4702  message.setAckTypeEnum(requestedAcks);
4703  throw;
4704  }
4705  catch (const DisconnectedException&)
4706  {
4707  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(cid));
4708  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(message.getQueryId()));
4709  message.setAckTypeEnum(requestedAcks);
4710  throw;
4711  }
4712  catch (...)
4713  {
4714  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(cid));
4715  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(message.getQueryId()));
4716  message.setAckTypeEnum(requestedAcks);
4717  throw;
4718  }
4719  }
4720  }
4721  message.setAckTypeEnum(requestedAcks);
4722  return cid;
4723  }
4724 
4725  MessageStream getEmptyMessageStream(void);
4726 
4727  std::string executeAsync(Command& command_, MessageHandler& handler_,
4728  bool isHASubscribe_ = true)
4729  {
4730  Lock<Mutex> lock(_lock);
4731  return executeAsyncNoLock(command_, handler_, isHASubscribe_);
4732  }
4733 
4734  // Queue Methods //
4735  void setAutoAck(bool isAutoAckEnabled_)
4736  {
4737  _isAutoAckEnabled = isAutoAckEnabled_;
4738  }
4739  bool getAutoAck(void) const
4740  {
4741  return _isAutoAckEnabled;
4742  }
4743  void setAckBatchSize(const unsigned batchSize_)
4744  {
4745  _ackBatchSize = batchSize_;
4746  if (!_queueAckTimeout)
4747  {
4748  _queueAckTimeout = AMPS_DEFAULT_QUEUE_ACK_TIMEOUT;
4749  amps_client_set_idle_time(_client, _queueAckTimeout);
4750  }
4751  }
4752  unsigned getAckBatchSize(void) const
4753  {
4754  return _ackBatchSize;
4755  }
4756  int getAckTimeout(void) const
4757  {
4758  return _queueAckTimeout;
4759  }
4760  void setAckTimeout(const int ackTimeout_)
4761  {
4762  amps_client_set_idle_time(_client, ackTimeout_);
4763  _queueAckTimeout = ackTimeout_;
4764  }
4765  size_t _ack(QueueBookmarks& queueBookmarks_)
4766  {
4767  if (queueBookmarks_._bookmarkCount)
4768  {
4769  publishStoreMessage.reset();
4770  publishStoreMessage.setCommandEnum(Message::Command::SOWDelete)
4771  .setTopic(queueBookmarks_._topic)
4772  .setBookmark(queueBookmarks_._data)
4773  .setCommandId("AMPS-queue-ack");
4774  amps_uint64_t haSequenceNumber = 0;
4775  if (_publishStore.isValid())
4776  {
4777  haSequenceNumber = _publishStore.store(publishStoreMessage);
4778  publishStoreMessage.setAckType("persisted")
4779  .setSequence(haSequenceNumber);
4780  queueBookmarks_._data.erase();
4781  queueBookmarks_._bookmarkCount = 0;
4782  }
4783  _send(publishStoreMessage, haSequenceNumber);
4784  if (!_publishStore.isValid())
4785  {
4786  queueBookmarks_._data.erase();
4787  queueBookmarks_._bookmarkCount = 0;
4788  }
4789  return 1;
4790  }
4791  return 0;
4792  }
4793  void ack(const Field& topic_, const Field& bookmark_, const char* options_ = NULL)
4794  {
4795  if (_isAutoAckEnabled)
4796  {
4797  return;
4798  }
4799  _ack(topic_, bookmark_, options_);
4800  }
4801  void _ack(const Field& topic_, const Field& bookmark_, const char* options_ = NULL)
4802  {
4803  if (bookmark_.len() == 0)
4804  {
4805  return;
4806  }
4807  Lock<Mutex> lock(_lock);
4808  if (_ackBatchSize < 2 || options_ != NULL)
4809  {
4810  publishStoreMessage.reset();
4811  publishStoreMessage.setCommandEnum(Message::Command::SOWDelete)
4812  .setCommandId("AMPS-queue-ack")
4813  .setTopic(topic_).setBookmark(bookmark_);
4814  if (options_)
4815  {
4816  publishStoreMessage.setOptions(options_);
4817  }
4818  amps_uint64_t haSequenceNumber = 0;
4819  if (_publishStore.isValid())
4820  {
4821  haSequenceNumber = _publishStore.store(publishStoreMessage);
4822  publishStoreMessage.setAckType("persisted")
4823  .setSequence(haSequenceNumber);
4824  }
4825  _send(publishStoreMessage, haSequenceNumber);
4826  return;
4827  }
4828  // have we acked anything for this hash
4829  topic_hash hash = CRC<0>::crcNoSSE(topic_.data(), topic_.len());
4830  TopicHashMap::iterator it = _topicHashMap.find(hash);
4831  if (it == _topicHashMap.end())
4832  {
4833  // add a new one to the map
4834 #ifdef AMPS_USE_EMPLACE
4835  it = _topicHashMap.emplace(TopicHashMap::value_type(hash, QueueBookmarks(topic_))).first;
4836 #else
4837  it = _topicHashMap.insert(TopicHashMap::value_type(hash, QueueBookmarks(topic_))).first;
4838 #endif
4839  }
4840  QueueBookmarks& queueBookmarks = it->second;
4841  if (queueBookmarks._data.length())
4842  {
4843  queueBookmarks._data.append(",");
4844  }
4845  else
4846  {
4847  queueBookmarks._oldestTime = amps_now();
4848  }
4849  queueBookmarks._data.append(bookmark_);
4850  if (++queueBookmarks._bookmarkCount >= _ackBatchSize)
4851  {
4852  _ack(queueBookmarks);
4853  }
4854  }
4855  void flushAcks(void)
4856  {
4857  size_t sendCount = 0;
4858  if (!_connected)
4859  {
4860  return;
4861  }
4862  else
4863  {
4864  Lock<Mutex> lock(_lock);
4865  typedef TopicHashMap::iterator iterator;
4866  for (iterator it = _topicHashMap.begin(), end = _topicHashMap.end(); it != end; ++it)
4867  {
4868  QueueBookmarks& queueBookmarks = it->second;
4869  sendCount += _ack(queueBookmarks);
4870  }
4871  }
4872  if (sendCount && _connected)
4873  {
4874  publishFlush(0, Message::AckType::Processed);
4875  }
4876  }
4877  // called when there's idle time, to see if we need to flush out any "acks"
4878  void checkQueueAcks(void)
4879  {
4880  if (!_topicHashMap.size())
4881  {
4882  return;
4883  }
4884  Lock<Mutex> lock(_lock);
4885  try
4886  {
4887  amps_uint64_t threshold = amps_now()
4888  - (amps_uint64_t)_queueAckTimeout;
4889  typedef TopicHashMap::iterator iterator;
4890  for (iterator it = _topicHashMap.begin(), end = _topicHashMap.end(); it != end; ++it)
4891  {
4892  QueueBookmarks& queueBookmarks = it->second;
4893  if (queueBookmarks._bookmarkCount && queueBookmarks._oldestTime < threshold)
4894  {
4895  _ack(queueBookmarks);
4896  }
4897  }
4898  }
4899  catch (std::exception& ex)
4900  {
4901  AMPS_UNHANDLED_EXCEPTION(ex);
4902  }
4903  }
4904 
4905  void deferredExecution(DeferredExecutionFunc func_, void* userData_)
4906  {
4907  Lock<Mutex> lock(_deferredExecutionLock);
4908 #ifdef AMPS_USE_EMPLACE
4909  _deferredExecutionList.emplace_back(
4910  DeferredExecutionRequest(func_, userData_));
4911 #else
4912  _deferredExecutionList.push_back(
4913  DeferredExecutionRequest(func_, userData_));
4914 #endif
4915  }
4916 
4917  inline void processDeferredExecutions(void)
4918  {
4919  if (_deferredExecutionList.size())
4920  {
4921  Lock<Mutex> lock(_deferredExecutionLock);
4922  DeferredExecutionList::iterator it = _deferredExecutionList.begin();
4923  DeferredExecutionList::iterator end = _deferredExecutionList.end();
4924  for (; it != end; ++it)
4925  {
4926  try
4927  {
4928  it->_func(it->_userData);
4929  }
4930  catch (...)
4931  {
4932  // -V565
4933  // Intentionally ignore errors
4934  }
4935  }
4936  _deferredExecutionList.clear();
4937  _routes.invalidateCache();
4938  _routeCache.invalidateCache();
4939  }
4940  }
4941 
4942  bool getRetryOnDisconnect(void) const
4943  {
4944  return _isRetryOnDisconnect;
4945  }
4946 
4947  void setRetryOnDisconnect(bool isRetryOnDisconnect_)
4948  {
4949  _isRetryOnDisconnect = isRetryOnDisconnect_;
4950  }
4951 
4952  void setDefaultMaxDepth(unsigned maxDepth_)
4953  {
4954  _defaultMaxDepth = maxDepth_;
4955  }
4956 
4957  unsigned getDefaultMaxDepth(void) const
4958  {
4959  return _defaultMaxDepth;
4960  }
4961 
4962  void setTransportFilterFunction(amps_transport_filter_function filter_,
4963  void* userData_)
4964  {
4965  amps_client_set_transport_filter_function(_client, filter_, userData_);
4966  }
4967 
4968  void setThreadCreatedCallback(amps_thread_created_callback callback_,
4969  void* userData_)
4970  {
4971  amps_client_set_thread_created_callback(_client, callback_, userData_);
4972  }
4973 
4974  void setPublishBatching(amps_uint64_t batchSizeBytes_, amps_uint64_t batchTimeoutMillis_)
4975  {
4976  amps_client_set_batch_send(_client, batchSizeBytes_, batchTimeoutMillis_);
4977  }
4978  }; // class ClientImpl
4979 
5054 
5056  {
5057  RefHandle<MessageStreamImpl> _body;
5058  public:
5063  class iterator
5064  {
5065  MessageStream* _pStream;
5066  Message _current;
5067  inline void advance(void);
5068 
5069  public:
5070  iterator() // end
5071  : _pStream(NULL)
5072  {;}
5073  iterator(MessageStream* pStream_)
5074  : _pStream(pStream_)
5075  {
5076  advance();
5077  }
5078 
5079  bool operator==(const iterator& rhs) const
5080  {
5081  return _pStream == rhs._pStream;
5082  }
5083  bool operator!=(const iterator& rhs) const
5084  {
5085  return _pStream != rhs._pStream;
5086  }
5087  void operator++(void)
5088  {
5089  advance();
5090  }
5091  Message operator*(void)
5092  {
5093  return _current;
5094  }
5095  Message* operator->(void)
5096  {
5097  return &_current;
5098  }
5099  };
5101  bool isValid() const
5102  {
5103  return _body.isValid();
5104  }
5105 
5109  {
5110  if (!_body.isValid())
5111  {
5112  throw UsageException("This MessageStream is not valid and cannot be iterated.");
5113  }
5114  return iterator(this);
5115  }
5118  // For non-SOW queries, the end is never reached.
5120  {
5121  return iterator();
5122  }
5123  inline MessageStream(void);
5124 
5130  MessageStream timeout(unsigned timeout_);
5131 
5135  MessageStream conflate(void);
5141  MessageStream maxDepth(unsigned maxDepth_);
5144  unsigned getMaxDepth(void) const;
5147  unsigned getDepth(void) const;
5148 
5149  private:
5150  inline MessageStream(const Client& client_);
5151  inline MessageStream(RefHandle<MessageStreamImpl> body_);
5152  inline void setSOWOnly(const std::string& commandId_,
5153  const std::string& queryId_ = "");
5154  inline void setSubscription(const std::string& subId_,
5155  const std::string& commandId_ = "",
5156  const std::string& queryId_ = "");
5157  inline void setStatsOnly(const std::string& commandId_,
5158  const std::string& queryId_ = "");
5159  inline void setAcksOnly(const std::string& commandId_, unsigned acks_);
5160 
5161  inline operator MessageHandler(void);
5162 
5163  inline static MessageStream fromExistingHandler(const MessageHandler& handler);
5164 
5165  friend class Client;
5166  friend class ClientImpl;
5167 
5168  };
5169 
5189  class Client // -V553
5190  {
5191  protected:
5192  BorrowRefHandle<ClientImpl> _body;
5193  public:
5194  static const int DEFAULT_COMMAND_TIMEOUT = AMPS_DEFAULT_COMMAND_TIMEOUT;
5195  static const int DEFAULT_BATCH_SIZE = AMPS_DEFAULT_BATCH_SIZE;
5196  static const int DEFAULT_TOP_N = AMPS_DEFAULT_TOP_N;
5197 
5206  Client(const std::string& clientName = "")
5207  : _body(new ClientImpl(clientName), true)
5208  {;}
5209 
5210  Client(ClientImpl* existingClient)
5211  : _body(existingClient, true)
5212  {;}
5213 
5214  Client(ClientImpl* existingClient, bool isRef)
5215  : _body(existingClient, isRef)
5216  {;}
5217 
5218  Client(const Client& rhs) : _body(rhs._body) {;}
5219  virtual ~Client(void) {;}
5220 
5221  Client& operator=(const Client& rhs)
5222  {
5223  _body = rhs._body;
5224  return *this;
5225  }
5226 
5227  bool isValid()
5228  {
5229  return _body.isValid();
5230  }
5231 
5244  void setName(const std::string& name)
5245  {
5246  _body.get().setName(name);
5247  }
5248 
5251  const std::string& getName() const
5252  {
5253  return _body.get().getName();
5254  }
5255 
5259  const std::string& getNameHash() const
5260  {
5261  return _body.get().getNameHash();
5262  }
5263 
5267  const amps_uint64_t getNameHashValue() const
5268  {
5269  return _body.get().getNameHashValue();
5270  }
5271 
5278  void setLogonCorrelationData(const std::string& logonCorrelationData_)
5279  {
5280  _body.get().setLogonCorrelationData(logonCorrelationData_);
5281  }
5282 
5285  const std::string& getLogonCorrelationData() const
5286  {
5287  return _body.get().getLogonCorrelationData();
5288  }
5289 
5293  void addHttpPreflightHeader(const std::string& header_)
5294  {
5295  _body.get().addHttpPreflightHeader(header_);
5296  }
5297 
5302  void addHttpPreflightHeader(const std::string& key_, const std::string& value_)
5303  {
5304  _body.get().addHttpPreflightHeader(key_, value_);
5305  }
5306 
5309  {
5310  _body.get().clearHttpPreflightHeaders();
5311  }
5312 
5316  template<class T>
5317  void setHttpPreflightHeaders(const T& headers_)
5318  {
5319  _body.get().setHttpPreflightHeaders(headers_);
5320  }
5321 
5330  size_t getServerVersion() const
5331  {
5332  return _body.get().getServerVersion();
5333  }
5334 
5341  VersionInfo getServerVersionInfo() const
5342  {
5343  return _body.get().getServerVersionInfo();
5344  }
5345 
5355  static size_t convertVersionToNumber(const std::string& version_)
5356  {
5357  return AMPS::convertVersionToNumber(version_.c_str(), version_.length());
5358  }
5359 
5370  static size_t convertVersionToNumber(const char* data_, size_t len_)
5371  {
5372  return AMPS::convertVersionToNumber(data_, len_);
5373  }
5374 
5377  const std::string& getURI() const
5378  {
5379  return _body.get().getURI();
5380  }
5381 
5388 
5390 
5401  void connect(const std::string& uri)
5402  {
5403  _body.get().connect(uri);
5404  }
5405 
5408  void disconnect()
5409  {
5410  _body.get().disconnect();
5411  }
5412 
5426  void send(const Message& message)
5427  {
5428  _body.get().send(message);
5429  }
5430 
5439  void addMessageHandler(const Field& commandId_,
5440  const AMPS::MessageHandler& messageHandler_,
5441  unsigned requestedAcks_, bool isSubscribe_)
5442  {
5443  Message::Command::Type commandType = isSubscribe_ ? Message::Command::Subscribe : Message::Command::SOW;
5444  _body.get().addMessageHandler(commandId_, messageHandler_,
5445  requestedAcks_, commandType);
5446  }
5447 
5456  void addMessageHandler(const Field& commandId_,
5457  const AMPS::MessageHandler& messageHandler_,
5458  unsigned requestedAcks_, Message::Command::Type commandType_)
5459  {
5460  _body.get().addMessageHandler(commandId_, messageHandler_,
5461  requestedAcks_, commandType_);
5462  }
5463 
5467  bool removeMessageHandler(const Field& commandId_)
5468  {
5469  return _body.get().removeMessageHandler(commandId_);
5470  }
5471 
5495  std::string send(const MessageHandler& messageHandler, Message& message, int timeout = 0)
5496  {
5497  return _body.get().send(messageHandler, message, timeout);
5498  }
5499 
5513 #if defined(_WIN32) || __cplusplus >= 201402L
5514  [[deprecated("Use HAClient for automatic reconnection and a ConnectionStateListener to monitor connection state.")]]
5515 #endif
5516  virtual void setDisconnectHandler(const DisconnectHandler& disconnectHandler)
5517  {
5518  _body.get().setDisconnectHandler(disconnectHandler);
5519  }
5520 
5527 #if defined(_WIN32) || __cplusplus >= 201402L
5528  [[deprecated("Use HAClient for automatic reconnection and a ConnectionStateListener to monitor connection state.")]]
5529 #endif
5530  DisconnectHandler getDisconnectHandler(void) const
5531  {
5532  return _body.get().getDisconnectHandler();
5533  }
5534 
5539  virtual ConnectionInfo getConnectionInfo() const
5540  {
5541  return _body.get().getConnectionInfo();
5542  }
5543 
5552  void setBookmarkStore(const BookmarkStore& bookmarkStore_)
5553  {
5554  _body.get().setBookmarkStore(bookmarkStore_);
5555  }
5556 
5561  {
5562  return _body.get().getBookmarkStore();
5563  }
5564 
5569  {
5570  return _body.get().getSubscriptionManager();
5571  }
5572 
5580  void setSubscriptionManager(SubscriptionManager* subscriptionManager_)
5581  {
5582  _body.get().setSubscriptionManager(subscriptionManager_);
5583  }
5584 
5604  void setPublishStore(const Store& publishStore_)
5605  {
5606  _body.get().setPublishStore(publishStore_);
5607  }
5608 
5613  {
5614  return _body.get().getPublishStore();
5615  }
5616 
5620  void setDuplicateMessageHandler(const MessageHandler& duplicateMessageHandler_)
5621  {
5622  _body.get().setGlobalCommandTypeMessageHandler(ClientImpl::GlobalCommandTypeHandlers::DuplicateMessage,
5623  duplicateMessageHandler_);
5624  }
5625 
5636  {
5637  return _body.get().getDuplicateMessageHandler();
5638  }
5639 
5650  {
5651  _body.get().setFailedWriteHandler(handler_);
5652  }
5653 
5658  {
5659  return _body.get().getFailedWriteHandler();
5660  }
5661 
5662 
5680  amps_uint64_t publish(const std::string& topic_, const std::string& data_)
5681  {
5682  return _body.get().publish(topic_.c_str(), topic_.length(),
5683  data_.c_str(), data_.length());
5684  }
5685 
5705  amps_uint64_t publish(const char* topic_, size_t topicLength_,
5706  const char* data_, size_t dataLength_)
5707  {
5708  return _body.get().publish(topic_, topicLength_, data_, dataLength_);
5709  }
5710 
5729  amps_uint64_t publish(const std::string& topic_, const std::string& data_,
5730  unsigned long expiration_)
5731  {
5732  return _body.get().publish(topic_.c_str(), topic_.length(),
5733  data_.c_str(), data_.length(), expiration_);
5734  }
5735 
5756  amps_uint64_t publish(const char* topic_, size_t topicLength_,
5757  const char* data_, size_t dataLength_,
5758  unsigned long expiration_)
5759  {
5760  return _body.get().publish(topic_, topicLength_,
5761  data_, dataLength_, expiration_);
5762  }
5763 
5802  void publishFlush(long timeout_ = 0, unsigned ackType_ = Message::AckType::Processed)
5803  {
5804  _body.get().publishFlush(timeout_, ackType_);
5805  }
5806 
5807 
5823  amps_uint64_t deltaPublish(const std::string& topic_, const std::string& data_)
5824  {
5825  return _body.get().deltaPublish(topic_.c_str(), topic_.length(),
5826  data_.c_str(), data_.length());
5827  }
5828 
5846  amps_uint64_t deltaPublish(const char* topic_, size_t topicLength_,
5847  const char* data_, size_t dataLength_)
5848  {
5849  return _body.get().deltaPublish(topic_, topicLength_,
5850  data_, dataLength_);
5851  }
5852 
5869  amps_uint64_t deltaPublish(const std::string& topic_, const std::string& data_,
5870  unsigned long expiration_)
5871  {
5872  return _body.get().deltaPublish(topic_.c_str(), topic_.length(),
5873  data_.c_str(), data_.length(),
5874  expiration_);
5875  }
5876 
5895  amps_uint64_t deltaPublish(const char* topic_, size_t topicLength_,
5896  const char* data_, size_t dataLength_,
5897  unsigned long expiration_)
5898  {
5899  return _body.get().deltaPublish(topic_, topicLength_,
5900  data_, dataLength_, expiration_);
5901  }
5902 
5918  std::string logon(int timeout_ = 0,
5919  Authenticator& authenticator_ = DefaultAuthenticator::instance(),
5920  const char* options_ = NULL)
5921  {
5922  return _body.get().logon(timeout_, authenticator_, options_);
5923  }
5937  std::string logon(const char* options_, int timeout_ = 0)
5938  {
5939  return _body.get().logon(timeout_, DefaultAuthenticator::instance(),
5940  options_);
5941  }
5942 
5956  std::string logon(const std::string& options_, int timeout_ = 0)
5957  {
5958  return _body.get().logon(timeout_, DefaultAuthenticator::instance(),
5959  options_.c_str());
5960  }
5961 
5981  std::string subscribe(const MessageHandler& messageHandler_,
5982  const std::string& topic_,
5983  long timeout_ = 0,
5984  const std::string& filter_ = "",
5985  const std::string& options_ = "",
5986  const std::string& subId_ = "")
5987  {
5988  return _body.get().subscribe(messageHandler_, topic_, timeout_,
5989  filter_, "", options_, subId_);
5990  }
5991 
6007  MessageStream subscribe(const std::string& topic_,
6008  long timeout_ = 0, const std::string& filter_ = "",
6009  const std::string& options_ = "",
6010  const std::string& subId_ = "")
6011  {
6012  MessageStream result(*this);
6013  if (_body.get().getDefaultMaxDepth())
6014  {
6015  result.maxDepth(_body.get().getDefaultMaxDepth());
6016  }
6017  result.setSubscription(_body.get().subscribe(
6018  result.operator MessageHandler(),
6019  topic_, timeout_, filter_, "",
6020  options_, subId_, false));
6021  return result;
6022  }
6023 
6039  MessageStream subscribe(const char* topic_,
6040  long timeout_ = 0, const std::string& filter_ = "",
6041  const std::string& options_ = "",
6042  const std::string& subId_ = "")
6043  {
6044  MessageStream result(*this);
6045  if (_body.get().getDefaultMaxDepth())
6046  {
6047  result.maxDepth(_body.get().getDefaultMaxDepth());
6048  }
6049  result.setSubscription(_body.get().subscribe(
6050  result.operator MessageHandler(),
6051  topic_, timeout_, filter_, "",
6052  options_, subId_, false));
6053  return result;
6054  }
6055 
6068  std::string deltaSubscribe(const MessageHandler& messageHandler_,
6069  const std::string& topic_,
6070  long timeout_,
6071  const std::string& filter_ = "",
6072  const std::string& options_ = "",
6073  const std::string& subId_ = "")
6074  {
6075  return _body.get().deltaSubscribe(messageHandler_, topic_, timeout_,
6076  filter_, "", options_, subId_);
6077  }
6086  MessageStream deltaSubscribe(const std::string& topic_,
6087  long timeout_, const std::string& filter_ = "",
6088  const std::string& options_ = "",
6089  const std::string& subId_ = "")
6090  {
6091  MessageStream result(*this);
6092  if (_body.get().getDefaultMaxDepth())
6093  {
6094  result.maxDepth(_body.get().getDefaultMaxDepth());
6095  }
6096  result.setSubscription(_body.get().deltaSubscribe(
6097  result.operator MessageHandler(),
6098  topic_, timeout_, filter_, "",
6099  options_, subId_, false));
6100  return result;
6101  }
6102 
6104  MessageStream deltaSubscribe(const char* topic_,
6105  long timeout_, const std::string& filter_ = "",
6106  const std::string& options_ = "",
6107  const std::string& subId_ = "")
6108  {
6109  MessageStream result(*this);
6110  if (_body.get().getDefaultMaxDepth())
6111  {
6112  result.maxDepth(_body.get().getDefaultMaxDepth());
6113  }
6114  result.setSubscription(_body.get().deltaSubscribe(
6115  result.operator MessageHandler(),
6116  topic_, timeout_, filter_, "",
6117  options_, subId_, false));
6118  return result;
6119  }
6120 
6146  std::string bookmarkSubscribe(const MessageHandler& messageHandler_,
6147  const std::string& topic_,
6148  long timeout_,
6149  const std::string& bookmark_,
6150  const std::string& filter_ = "",
6151  const std::string& options_ = "",
6152  const std::string& subId_ = "")
6153  {
6154  return _body.get().subscribe(messageHandler_, topic_, timeout_,
6155  filter_, bookmark_, options_, subId_);
6156  }
6174  MessageStream bookmarkSubscribe(const std::string& topic_,
6175  long timeout_,
6176  const std::string& bookmark_,
6177  const std::string& filter_ = "",
6178  const std::string& options_ = "",
6179  const std::string& subId_ = "")
6180  {
6181  MessageStream result(*this);
6182  if (_body.get().getDefaultMaxDepth())
6183  {
6184  result.maxDepth(_body.get().getDefaultMaxDepth());
6185  }
6186  result.setSubscription(_body.get().subscribe(
6187  result.operator MessageHandler(),
6188  topic_, timeout_, filter_,
6189  bookmark_, options_,
6190  subId_, false));
6191  return result;
6192  }
6193 
6195  MessageStream bookmarkSubscribe(const char* topic_,
6196  long timeout_,
6197  const std::string& bookmark_,
6198  const std::string& filter_ = "",
6199  const std::string& options_ = "",
6200  const std::string& subId_ = "")
6201  {
6202  MessageStream result(*this);
6203  if (_body.get().getDefaultMaxDepth())
6204  {
6205  result.maxDepth(_body.get().getDefaultMaxDepth());
6206  }
6207  result.setSubscription(_body.get().subscribe(
6208  result.operator MessageHandler(),
6209  topic_, timeout_, filter_,
6210  bookmark_, options_,
6211  subId_, false));
6212  return result;
6213  }
6214 
6223  void unsubscribe(const std::string& commandId)
6224  {
6225  return _body.get().unsubscribe(commandId);
6226  }
6227 
6236  {
6237  return _body.get().unsubscribe();
6238  }
6239 
6240 
6270  std::string sow(const MessageHandler& messageHandler_,
6271  const std::string& topic_,
6272  const std::string& filter_ = "",
6273  const std::string& orderBy_ = "",
6274  const std::string& bookmark_ = "",
6275  int batchSize_ = DEFAULT_BATCH_SIZE,
6276  int topN_ = DEFAULT_TOP_N,
6277  const std::string& options_ = "",
6278  long timeout_ = DEFAULT_COMMAND_TIMEOUT)
6279  {
6280  return _body.get().sow(messageHandler_, topic_, filter_, orderBy_,
6281  bookmark_, batchSize_, topN_, options_,
6282  timeout_);
6283  }
6308  MessageStream sow(const std::string& topic_,
6309  const std::string& filter_ = "",
6310  const std::string& orderBy_ = "",
6311  const std::string& bookmark_ = "",
6312  int batchSize_ = DEFAULT_BATCH_SIZE,
6313  int topN_ = DEFAULT_TOP_N,
6314  const std::string& options_ = "",
6315  long timeout_ = DEFAULT_COMMAND_TIMEOUT)
6316  {
6317  MessageStream result(*this);
6318  if (_body.get().getDefaultMaxDepth())
6319  {
6320  result.maxDepth(_body.get().getDefaultMaxDepth());
6321  }
6322  result.setSOWOnly(_body.get().sow(result.operator MessageHandler(),
6323  topic_, filter_, orderBy_, bookmark_,
6324  batchSize_, topN_, options_, timeout_));
6325  return result;
6326  }
6327 
6329  MessageStream sow(const char* topic_,
6330  const std::string& filter_ = "",
6331  const std::string& orderBy_ = "",
6332  const std::string& bookmark_ = "",
6333  int batchSize_ = DEFAULT_BATCH_SIZE,
6334  int topN_ = DEFAULT_TOP_N,
6335  const std::string& options_ = "",
6336  long timeout_ = DEFAULT_COMMAND_TIMEOUT)
6337  {
6338  MessageStream result(*this);
6339  if (_body.get().getDefaultMaxDepth())
6340  {
6341  result.maxDepth(_body.get().getDefaultMaxDepth());
6342  }
6343  result.setSOWOnly(_body.get().sow(result.operator MessageHandler(),
6344  topic_, filter_, orderBy_, bookmark_,
6345  batchSize_, topN_, options_, timeout_));
6346  return result;
6347  }
6370  std::string sow(const MessageHandler& messageHandler_,
6371  const std::string& topic_,
6372  long timeout_,
6373  const std::string& filter_ = "",
6374  int batchSize_ = DEFAULT_BATCH_SIZE,
6375  int topN_ = DEFAULT_TOP_N)
6376  {
6377  return _body.get().sow(messageHandler_, topic_, timeout_, filter_,
6378  batchSize_, topN_);
6379  }
6402  std::string sowAndSubscribe(const MessageHandler& messageHandler_,
6403  const std::string& topic_,
6404  long timeout_,
6405  const std::string& filter_ = "",
6406  int batchSize_ = DEFAULT_BATCH_SIZE,
6407  bool oofEnabled_ = false,
6408  int topN_ = DEFAULT_TOP_N)
6409  {
6410  return _body.get().sowAndSubscribe(messageHandler_, topic_, timeout_,
6411  filter_, batchSize_, oofEnabled_,
6412  topN_);
6413  }
6414 
6434  MessageStream sowAndSubscribe(const std::string& topic_,
6435  long timeout_,
6436  const std::string& filter_ = "",
6437  int batchSize_ = DEFAULT_BATCH_SIZE,
6438  bool oofEnabled_ = false,
6439  int topN_ = DEFAULT_TOP_N)
6440  {
6441  MessageStream result(*this);
6442  if (_body.get().getDefaultMaxDepth())
6443  {
6444  result.maxDepth(_body.get().getDefaultMaxDepth());
6445  }
6446  result.setSubscription(_body.get().sowAndSubscribe(
6447  result.operator MessageHandler(),
6448  topic_, timeout_, filter_,
6449  batchSize_, oofEnabled_,
6450  topN_, false));
6451  return result;
6452  }
6472  MessageStream sowAndSubscribe(const char* topic_,
6473  long timeout_,
6474  const std::string& filter_ = "",
6475  int batchSize_ = DEFAULT_BATCH_SIZE,
6476  bool oofEnabled_ = false,
6477  int topN_ = DEFAULT_TOP_N)
6478  {
6479  MessageStream result(*this);
6480  if (_body.get().getDefaultMaxDepth())
6481  {
6482  result.maxDepth(_body.get().getDefaultMaxDepth());
6483  }
6484  result.setSubscription(_body.get().sowAndSubscribe(
6485  result.operator MessageHandler(),
6486  topic_, timeout_, filter_,
6487  batchSize_, oofEnabled_,
6488  topN_, false));
6489  return result;
6490  }
6491 
6492 
6520  std::string sowAndSubscribe(const MessageHandler& messageHandler_,
6521  const std::string& topic_,
6522  const std::string& filter_ = "",
6523  const std::string& orderBy_ = "",
6524  const std::string& bookmark_ = "",
6525  int batchSize_ = DEFAULT_BATCH_SIZE,
6526  int topN_ = DEFAULT_TOP_N,
6527  const std::string& options_ = "",
6528  long timeout_ = DEFAULT_COMMAND_TIMEOUT)
6529  {
6530  return _body.get().sowAndSubscribe(messageHandler_, topic_, filter_,
6531  orderBy_, bookmark_, batchSize_,
6532  topN_, options_, timeout_);
6533  }
6534 
6559  MessageStream sowAndSubscribe(const std::string& topic_,
6560  const std::string& filter_ = "",
6561  const std::string& orderBy_ = "",
6562  const std::string& bookmark_ = "",
6563  int batchSize_ = DEFAULT_BATCH_SIZE,
6564  int topN_ = DEFAULT_TOP_N,
6565  const std::string& options_ = "",
6566  long timeout_ = DEFAULT_COMMAND_TIMEOUT)
6567  {
6568  MessageStream result(*this);
6569  if (_body.get().getDefaultMaxDepth())
6570  {
6571  result.maxDepth(_body.get().getDefaultMaxDepth());
6572  }
6573  result.setSubscription(_body.get().sowAndSubscribe(
6574  result.operator MessageHandler(),
6575  topic_, filter_, orderBy_,
6576  bookmark_, batchSize_, topN_,
6577  options_, timeout_, false));
6578  return result;
6579  }
6580 
6582  MessageStream sowAndSubscribe(const char* topic_,
6583  const std::string& filter_ = "",
6584  const std::string& orderBy_ = "",
6585  const std::string& bookmark_ = "",
6586  int batchSize_ = DEFAULT_BATCH_SIZE,
6587  int topN_ = DEFAULT_TOP_N,
6588  const std::string& options_ = "",
6589  long timeout_ = DEFAULT_COMMAND_TIMEOUT)
6590  {
6591  MessageStream result(*this);
6592  if (_body.get().getDefaultMaxDepth())
6593  {
6594  result.maxDepth(_body.get().getDefaultMaxDepth());
6595  }
6596  result.setSubscription(_body.get().sowAndSubscribe(
6597  result.operator MessageHandler(),
6598  topic_, filter_, orderBy_,
6599  bookmark_, batchSize_, topN_,
6600  options_, timeout_, false));
6601  return result;
6602  }
6603 
6628  std::string sowAndDeltaSubscribe(const MessageHandler& messageHandler_,
6629  const std::string& topic_,
6630  const std::string& filter_ = "",
6631  const std::string& orderBy_ = "",
6632  int batchSize_ = DEFAULT_BATCH_SIZE,
6633  int topN_ = DEFAULT_TOP_N,
6634  const std::string& options_ = "",
6635  long timeout_ = DEFAULT_COMMAND_TIMEOUT)
6636  {
6637  return _body.get().sowAndDeltaSubscribe(messageHandler_, topic_,
6638  filter_, orderBy_, batchSize_,
6639  topN_, options_, timeout_);
6640  }
6661  MessageStream sowAndDeltaSubscribe(const std::string& topic_,
6662  const std::string& filter_ = "",
6663  const std::string& orderBy_ = "",
6664  int batchSize_ = DEFAULT_BATCH_SIZE,
6665  int topN_ = DEFAULT_TOP_N,
6666  const std::string& options_ = "",
6667  long timeout_ = DEFAULT_COMMAND_TIMEOUT)
6668  {
6669  MessageStream result(*this);
6670  if (_body.get().getDefaultMaxDepth())
6671  {
6672  result.maxDepth(_body.get().getDefaultMaxDepth());
6673  }
6674  result.setSubscription(_body.get().sowAndDeltaSubscribe(
6675  result.operator MessageHandler(),
6676  topic_, filter_, orderBy_,
6677  batchSize_, topN_, options_,
6678  timeout_, false));
6679  return result;
6680  }
6681 
6684  const std::string& filter_ = "",
6685  const std::string& orderBy_ = "",
6686  int batchSize_ = DEFAULT_BATCH_SIZE,
6687  int topN_ = DEFAULT_TOP_N,
6688  const std::string& options_ = "",
6689  long timeout_ = DEFAULT_COMMAND_TIMEOUT)
6690  {
6691  MessageStream result(*this);
6692  if (_body.get().getDefaultMaxDepth())
6693  {
6694  result.maxDepth(_body.get().getDefaultMaxDepth());
6695  }
6696  result.setSubscription(_body.get().sowAndDeltaSubscribe(
6697  result.operator MessageHandler(),
6698  topic_, filter_, orderBy_,
6699  batchSize_, topN_, options_,
6700  timeout_, false));
6701  return result;
6702  }
6703 
6728  std::string sowAndDeltaSubscribe(const MessageHandler& messageHandler_,
6729  const std::string& topic_,
6730  long timeout_,
6731  const std::string& filter_ = "",
6732  int batchSize_ = DEFAULT_BATCH_SIZE,
6733  bool oofEnabled_ = false,
6734  bool sendEmpties_ = false,
6735  int topN_ = DEFAULT_TOP_N)
6736  {
6737  return _body.get().sowAndDeltaSubscribe(messageHandler_, topic_,
6738  timeout_, filter_, batchSize_,
6739  oofEnabled_, sendEmpties_,
6740  topN_);
6741  }
6742 
6764  MessageStream sowAndDeltaSubscribe(const std::string& topic_,
6765  long timeout_,
6766  const std::string& filter_ = "",
6767  int batchSize_ = DEFAULT_BATCH_SIZE,
6768  bool oofEnabled_ = false,
6769  bool sendEmpties_ = false,
6770  int topN_ = DEFAULT_TOP_N)
6771  {
6772  MessageStream result(*this);
6773  if (_body.get().getDefaultMaxDepth())
6774  {
6775  result.maxDepth(_body.get().getDefaultMaxDepth());
6776  }
6777  result.setSubscription(_body.get().sowAndDeltaSubscribe(
6778  result.operator MessageHandler(),
6779  topic_, timeout_, filter_,
6780  batchSize_, oofEnabled_,
6781  sendEmpties_, topN_, false));
6782  return result;
6783  }
6806  long timeout_,
6807  const std::string& filter_ = "",
6808  int batchSize_ = DEFAULT_BATCH_SIZE,
6809  bool oofEnabled_ = false,
6810  bool sendEmpties_ = false,
6811  int topN_ = DEFAULT_TOP_N)
6812  {
6813  MessageStream result(*this);
6814  if (_body.get().getDefaultMaxDepth())
6815  {
6816  result.maxDepth(_body.get().getDefaultMaxDepth());
6817  }
6818  result.setSubscription(_body.get().sowAndDeltaSubscribe(
6819  result.operator MessageHandler(),
6820  topic_, timeout_, filter_,
6821  batchSize_, oofEnabled_,
6822  sendEmpties_, topN_, false));
6823  return result;
6824  }
6844  std::string sowDelete(const MessageHandler& messageHandler,
6845  const std::string& topic,
6846  const std::string& filter,
6847  long timeout)
6848  {
6849  return _body.get().sowDelete(messageHandler, topic, filter, timeout);
6850  }
6867  Message sowDelete(const std::string& topic_, const std::string& filter_,
6868  long timeout_ = 0)
6869  {
6870  MessageStream stream(*this);
6871  stream.timeout((unsigned int)timeout_);
6872  char buf[Message::IdentifierLength + 1];
6873  buf[Message::IdentifierLength] = 0;
6874  AMPS_snprintf(buf, Message::IdentifierLength + 1, "%lx", MessageImpl::newId());
6875  Field cid(buf);
6876  try
6877  {
6878  stream.setStatsOnly(cid);
6879  _body.get().sowDelete(stream.operator MessageHandler(), topic_, filter_, timeout_, cid);
6880  return *(stream.begin());
6881  }
6882  catch (const DisconnectedException&)
6883  {
6884  removeMessageHandler(cid);
6885  throw;
6886  }
6887  catch (const TimedOutException&)
6888  {
6889  removeMessageHandler(cid);
6890  throw;
6891  }
6892  }
6893 
6898  void startTimer()
6899  {
6900  _body.get().startTimer();
6901  }
6902 
6909  std::string stopTimer(const MessageHandler& messageHandler)
6910  {
6911  return _body.get().stopTimer(messageHandler);
6912  }
6913 
6935  std::string sowDeleteByKeys(const MessageHandler& messageHandler_,
6936  const std::string& topic_,
6937  const std::string& keys_,
6938  long timeout_ = 0)
6939  {
6940  return _body.get().sowDeleteByKeys(messageHandler_, topic_, keys_, timeout_);
6941  }
6962  Message sowDeleteByKeys(const std::string& topic_, const std::string& keys_,
6963  long timeout_ = 0)
6964  {
6965  MessageStream stream(*this);
6966  stream.timeout((unsigned int)timeout_);
6967  char buf[Message::IdentifierLength + 1];
6968  buf[Message::IdentifierLength] = 0;
6969  AMPS_snprintf(buf, Message::IdentifierLength + 1, "%lx", MessageImpl::newId());
6970  Field cid(buf);
6971  try
6972  {
6973  stream.setStatsOnly(cid);
6974  _body.get().sowDeleteByKeys(stream.operator MessageHandler(), topic_, keys_, timeout_, cid);
6975  return *(stream.begin());
6976  }
6977  catch (const DisconnectedException&)
6978  {
6979  removeMessageHandler(cid);
6980  throw;
6981  }
6982  catch (const TimedOutException&)
6983  {
6984  removeMessageHandler(cid);
6985  throw;
6986  }
6987  }
6988 
7003  std::string sowDeleteByData(const MessageHandler& messageHandler_,
7004  const std::string& topic_, const std::string& data_,
7005  long timeout_ = 0)
7006  {
7007  return _body.get().sowDeleteByData(messageHandler_, topic_, data_, timeout_);
7008  }
7009 
7024  Message sowDeleteByData(const std::string& topic_, const std::string& data_,
7025  long timeout_ = 0)
7026  {
7027  MessageStream stream(*this);
7028  stream.timeout((unsigned int)timeout_);
7029  char buf[Message::IdentifierLength + 1];
7030  buf[Message::IdentifierLength] = 0;
7031  AMPS_snprintf(buf, Message::IdentifierLength + 1, "%lx", MessageImpl::newId());
7032  Field cid(buf);
7033  try
7034  {
7035  stream.setStatsOnly(cid);
7036  _body.get().sowDeleteByData(stream.operator MessageHandler(), topic_, data_, timeout_, cid);
7037  return *(stream.begin());
7038  }
7039  catch (const DisconnectedException&)
7040  {
7041  removeMessageHandler(cid);
7042  throw;
7043  }
7044  catch (const TimedOutException&)
7045  {
7046  removeMessageHandler(cid);
7047  throw;
7048  }
7049  }
7050 
7055  {
7056  return _body.get().getHandle();
7057  }
7058 
7067  void setExceptionListener(const std::shared_ptr<const ExceptionListener>& pListener_)
7068  {
7069  _body.get().setExceptionListener(pListener_);
7070  }
7071 
7080 #if defined(_WIN32) || __cplusplus >= 201402L
7081  [[deprecated("Use setExceptionListener(std::shared_ptr<const ExceptionListener>&)")]]
7082 #endif
7084  {
7085  _body.get().setExceptionListener(listener_);
7086  }
7087 
7091  {
7092  return _body.get().getExceptionListener();
7093  }
7094 
7102  // type of message) from the server for the specified interval (plus a grace period),
7116  void setHeartbeat(unsigned heartbeatTime_, unsigned readTimeout_)
7117  {
7118  _body.get().setHeartbeat(heartbeatTime_, readTimeout_);
7119  }
7120 
7128  // type of message) from the server for the specified interval (plus a grace period),
7140  void setHeartbeat(unsigned heartbeatTime_)
7141  {
7142  _body.get().setHeartbeat(heartbeatTime_, 2 * heartbeatTime_);
7143  }
7144 
7146 #if defined(_WIN32) || __cplusplus >= 201402L
7147  [[deprecated("Use setLastChanceMessageHandler.")]]
7148 #endif
7150  {
7151  setLastChanceMessageHandler(messageHandler);
7152  }
7153 
7157  {
7158  _body.get().setGlobalCommandTypeMessageHandler(ClientImpl::GlobalCommandTypeHandlers::LastChance,
7159  messageHandler);
7160  }
7161 
7182  void setGlobalCommandTypeMessageHandler(const std::string& command_, const MessageHandler& handler_)
7183  {
7184  _body.get().setGlobalCommandTypeMessageHandler(command_, handler_);
7185  }
7186 
7207  void setGlobalCommandTypeMessageHandler(const Message::Command::Type command_, const MessageHandler& handler_)
7208  {
7209  _body.get().setGlobalCommandTypeMessageHandler(command_, handler_);
7210  }
7211 
7217  static const char* BOOKMARK_NOW()
7218  {
7219  return AMPS_BOOKMARK_NOW;
7220  }
7226  static const char* NOW()
7227  {
7228  return AMPS_BOOKMARK_NOW;
7229  }
7230 
7236  static const char* BOOKMARK_EPOCH()
7237  {
7238  return AMPS_BOOKMARK_EPOCH;
7239  }
7240 
7246  static const char* EPOCH()
7247  {
7248  return AMPS_BOOKMARK_EPOCH;
7249  }
7250 
7257  static const char* BOOKMARK_MOST_RECENT()
7258  {
7259  return AMPS_BOOKMARK_RECENT;
7260  }
7261 
7268  static const char* MOST_RECENT()
7269  {
7270  return AMPS_BOOKMARK_RECENT;
7271  }
7272 
7279  static const char* BOOKMARK_RECENT()
7280  {
7281  return AMPS_BOOKMARK_RECENT;
7282  }
7283 
7284 
7291  {
7292  _body.get().addConnectionStateListener(listener);
7293  }
7294 
7299  {
7300  _body.get().removeConnectionStateListener(listener);
7301  }
7302 
7306  {
7307  _body.get().clearConnectionStateListeners();
7308  }
7309 
7335  std::string executeAsync(Command& command_, MessageHandler handler_)
7336  {
7337  return _body.get().executeAsync(command_, handler_);
7338  }
7339 
7369  std::string executeAsyncNoResubscribe(Command& command_,
7370  MessageHandler handler_)
7371  {
7372  std::string id;
7373  try
7374  {
7375  if (command_.isSubscribe())
7376  {
7377  Message& message = command_.getMessage();
7378  Field subId = message.getSubscriptionId();
7379  bool useExistingHandler = !subId.empty() && !message.getOptions().empty() && message.getOptions().contains("replace", 7);
7380  if (useExistingHandler)
7381  {
7382  MessageHandler existingHandler;
7383  if (_body.get()._routes.getRoute(subId, existingHandler))
7384  {
7385  // we found an existing handler.
7386  _body.get().executeAsync(command_, existingHandler, false);
7387  return id; // empty string indicates existing
7388  }
7389  }
7390  }
7391  id = _body.get().executeAsync(command_, handler_, false);
7392  }
7393  catch (const DisconnectedException&)
7394  {
7395  removeMessageHandler(command_.getMessage().getCommandId());
7396  if (command_.isSubscribe())
7397  {
7398  removeMessageHandler(command_.getMessage().getSubscriptionId());
7399  }
7400  if (command_.isSow())
7401  {
7402  removeMessageHandler(command_.getMessage().getQueryID());
7403  }
7404  throw;
7405  }
7406  return id;
7407  }
7408 
7421  MessageStream execute(Command& command_);
7422 
7431  void ack(Field& topic_, Field& bookmark_, const char* options_ = NULL)
7432  {
7433  _body.get().ack(topic_, bookmark_, options_);
7434  }
7435 
7443  void ack(Message& message_, const char* options_ = NULL)
7444  {
7445  _body.get().ack(message_.getTopic(), message_.getBookmark(), options_);
7446  }
7455  void ack(const std::string& topic_, const std::string& bookmark_,
7456  const char* options_ = NULL)
7457  {
7458  _body.get().ack(Field(topic_.data(), topic_.length()), Field(bookmark_.data(), bookmark_.length()), options_);
7459  }
7460 
7466  void ackDeferredAutoAck(Field& topic_, Field& bookmark_, const char* options_ = NULL)
7467  {
7468  _body.get()._ack(topic_, bookmark_, options_);
7469  }
7479  void flushAcks(void)
7480  {
7481  _body.get().flushAcks();
7482  }
7483 
7488  bool getAutoAck(void) const
7489  {
7490  return _body.get().getAutoAck();
7491  }
7498  void setAutoAck(bool isAutoAckEnabled_)
7499  {
7500  _body.get().setAutoAck(isAutoAckEnabled_);
7501  }
7506  unsigned getAckBatchSize(void) const
7507  {
7508  return _body.get().getAckBatchSize();
7509  }
7516  void setAckBatchSize(const unsigned ackBatchSize_)
7517  {
7518  _body.get().setAckBatchSize(ackBatchSize_);
7519  }
7520 
7527  int getAckTimeout(void) const
7528  {
7529  return _body.get().getAckTimeout();
7530  }
7539  void setAckTimeout(const int ackTimeout_)
7540  {
7541  if (!ackTimeout_ && _body.get().getAckBatchSize() > 1)
7542  {
7543  throw UsageException("Ack timeout must be > 0 when ack batch size > 1");
7544  }
7545  _body.get().setAckTimeout(ackTimeout_);
7546  }
7547 
7548 
7557  void setRetryOnDisconnect(bool isRetryOnDisconnect_)
7558  {
7559  _body.get().setRetryOnDisconnect(isRetryOnDisconnect_);
7560  }
7561 
7566  bool getRetryOnDisconnect(void) const
7567  {
7568  return _body.get().getRetryOnDisconnect();
7569  }
7570 
7575  void setDefaultMaxDepth(unsigned maxDepth_)
7576  {
7577  _body.get().setDefaultMaxDepth(maxDepth_);
7578  }
7579 
7584  unsigned getDefaultMaxDepth(void) const
7585  {
7586  return _body.get().getDefaultMaxDepth();
7587  }
7588 
7596  void* userData_)
7597  {
7598  return _body.get().setTransportFilterFunction(filter_, userData_);
7599  }
7600 
7610  void* userData_)
7611  {
7612  return _body.get().setThreadCreatedCallback(callback_, userData_);
7613  }
7614 
7621  void setPublishBatching(size_t batchSize_, amps_uint64_t batchTimeoutMillis_)
7622  {
7623  return _body.get().setPublishBatching(batchSize_, batchTimeoutMillis_);
7624  }
7625 
7631  void deferredExecution(DeferredExecutionFunc func_, void* userData_)
7632  {
7633  _body.get().deferredExecution(func_, userData_);
7634  }
7638  };
7639 
7640  inline void
7641  ClientImpl::lastChance(AMPS::Message& message)
7642  {
7643  AMPS_CALL_EXCEPTION_WRAPPER(_globalCommandTypeHandlers[GlobalCommandTypeHandlers::LastChance].invoke(message));
7644  }
7645 
7646  inline unsigned
7647  ClientImpl::persistedAck(AMPS::Message& message)
7648  {
7649  unsigned deliveries = 0;
7650  try
7651  {
7652  /*
7653  * Best Practice: If you don't care about the dupe acks that
7654  * occur during failover or rapid disconnect/reconnect, then just
7655  * ignore them. We could discard each duplicate from the
7656  * persisted store, but the storage costs of doing 1 record
7657  * discards is heavy. In most scenarios we'll just quickly blow
7658  * through the duplicates and get back to processing the
7659  * non-dupes.
7660  */
7661  const char* data = NULL;
7662  size_t len = 0;
7663  const char* status = NULL;
7664  size_t statusLen = 0;
7665  amps_handle messageHandle = message.getMessage();
7666  const size_t NotEntitled = 12, Duplicate = 9, Failure = 7;
7667  amps_message_get_field_value(messageHandle, AMPS_Reason, &data, &len);
7668  amps_message_get_field_value(messageHandle, AMPS_Status, &status, &statusLen);
7669  if (len == NotEntitled || len == Duplicate ||
7670  (statusLen == Failure && status[0] == 'f'))
7671  {
7672  if (_failedWriteHandler)
7673  {
7674  if (_publishStore.isValid())
7675  {
7676  amps_uint64_t sequence =
7677  amps_message_get_field_uint64(messageHandle, AMPS_Sequence);
7678  FailedWriteStoreReplayer replayer(this, data, len);
7679  AMPS_CALL_EXCEPTION_WRAPPER(_publishStore.replaySingle(
7680  replayer, sequence));
7681  }
7682  else // Call the handler with what little we have
7683  {
7684  static Message emptyMessage;
7685  emptyMessage.setSequence(message.getSequence());
7686  AMPS_CALL_EXCEPTION_WRAPPER(
7687  _failedWriteHandler->failedWrite(emptyMessage,
7688  data, len));
7689  }
7690  ++deliveries;
7691  }
7692  }
7693  if (_publishStore.isValid())
7694  {
7695  // Ack for publisher will have sequence while
7696  // ack for bookmark subscribe won't
7697  amps_uint64_t seq = amps_message_get_field_uint64(messageHandle,
7698  AMPS_Sequence);
7699  if (seq > 0)
7700  {
7701  ++deliveries;
7702  AMPS_CALL_EXCEPTION_WRAPPER(_publishStore.discardUpTo(seq));
7703  }
7704  }
7705 
7706  if (!deliveries && _bookmarkStore.isValid())
7707  {
7708  amps_message_get_field_value(messageHandle, AMPS_SubscriptionId,
7709  &data, &len);
7710  if (len > 0)
7711  {
7712  Message::Field subId(data, len);
7713  const char* bookmarkData = NULL;
7714  size_t bookmarkLen = 0;
7715  amps_message_get_field_value(messageHandle,
7716  AMPS_Bookmark,
7717  &bookmarkData,
7718  &bookmarkLen);
7719  // Everything is there and not unsubscribed AC-912
7720  if (bookmarkLen > 0 && _routes.hasRoute(subId))
7721  {
7722  ++deliveries;
7723  _bookmarkStore.persisted(subId, Message::Field(bookmarkData, bookmarkLen));
7724  }
7725  }
7726  }
7727  }
7728  catch (std::exception& ex)
7729  {
7730  AMPS_UNHANDLED_EXCEPTION(ex);
7731  }
7732  return deliveries;
7733  }
7734 
7735  inline unsigned
7736  ClientImpl::processedAck(Message& message)
7737  {
7738  unsigned deliveries = 0;
7739  AckResponse ack;
7740  const char* data = NULL;
7741  size_t len = 0;
7742  amps_handle messageHandle = message.getMessage();
7743  amps_message_get_field_value(messageHandle, AMPS_CommandId, &data, &len);
7744  Lock<Mutex> l(_lock);
7745  if (data && len)
7746  {
7747  Lock<Mutex> guard(_ackMapLock);
7748  AckMap::iterator i = _ackMap.find(std::string(data, len));
7749  if (i != _ackMap.end())
7750  {
7751  ++deliveries;
7752  ack = i->second;
7753  _ackMap.erase(i);
7754  }
7755  }
7756  if (deliveries)
7757  {
7758  amps_message_get_field_value(messageHandle, AMPS_Status, &data, &len);
7759  ack.setStatus(data, len);
7760  amps_message_get_field_value(messageHandle, AMPS_Reason, &data, &len);
7761  ack.setReason(data, len);
7762  amps_message_get_field_value(messageHandle, AMPS_UserId, &data, &len);
7763  ack.setUsername(data, len);
7764  amps_message_get_field_value(messageHandle, AMPS_Password, &data, &len);
7765  ack.setPassword(data, len);
7766  amps_message_get_field_value(messageHandle, AMPS_Version, &data, &len);
7767  ack.setServerVersion(data, len);
7768  amps_message_get_field_value(messageHandle, AMPS_Options, &data, &len);
7769  ack.setOptions(data, len);
7770  // This sets bookmark, nameHashValue, and sequenceNo
7771  ack.setBookmark(message.getBookmark());
7772  ack.setResponded();
7773  _lock.signalAll();
7774  }
7775  return deliveries;
7776  }
7777 
7778  inline void
7779  ClientImpl::checkAndSendHeartbeat(bool force)
7780  {
7781  if (force || _heartbeatTimer.check())
7782  {
7783  _heartbeatTimer.start();
7784  try
7785  {
7786  sendWithoutRetry(_beatMessage);
7787  }
7788  catch (const AMPSException&)
7789  {
7790  ;
7791  }
7792  }
7793  }
7794 
7795  inline ConnectionInfo ClientImpl::getConnectionInfo() const
7796  {
7797  ConnectionInfo info;
7798  std::ostringstream writer;
7799 
7800  info["client.uri"] = _lastUri;
7801  info["client.name"] = _name;
7802  info["client.username"] = _username;
7803  if (_publishStore.isValid())
7804  {
7805  writer << _publishStore.unpersistedCount();
7806  info["publishStore.unpersistedCount"] = writer.str();
7807  writer.clear();
7808  writer.str("");
7809  }
7810 
7811  return info;
7812  }
7813 
7814  inline amps_result
7815  ClientImpl::ClientImplMessageHandler(amps_handle messageHandle_, void* userData_)
7816  {
7817  const unsigned SOWMask = Message::Command::SOW | Message::Command::GroupBegin | Message::Command::GroupEnd;
7818  const unsigned PublishMask = Message::Command::OOF | Message::Command::Publish | Message::Command::DeltaPublish;
7819  ClientImpl* me = (ClientImpl*) userData_;
7820  AMPS_CALL_EXCEPTION_WRAPPER_2(me, me->processDeferredExecutions());
7821  if (!messageHandle_)
7822  {
7823  if (me->_queueAckTimeout)
7824  {
7825  me->checkQueueAcks();
7826  }
7827  me->checkAndSendHeartbeat();
7828  return AMPS_E_OK;
7829  }
7830 
7831  me->_readMessage.replace(messageHandle_);
7832  Message& message = me->_readMessage;
7833  Message::Command::Type commandType = message.getCommandEnum();
7834  if (commandType & SOWMask)
7835  {
7836 #if 0 // Not currently implemented, to avoid an extra branch in delivery
7837  // A small cheat here to get the right handler, using knowledge of the
7838  // Command values of SOW (8), GroupBegin (8192), and GroupEnd (16384)
7839  // and their GlobalCommandTypeHandlers values 1, 2, 3.
7840  AMPS_CALL_EXCEPTION_WRAPPER_2(me,
7841  me->_globalCommandTypeHandlers[1 + (commandType / 8192)].invoke(message));
7842 #endif
7843  AMPS_CALL_EXCEPTION_WRAPPER_2(me, me->_routes.deliverData(message,
7844  message.getQueryID()));
7845  }
7846  else if (commandType & PublishMask)
7847  {
7848 #if 0 // Not currently implemented, to avoid an extra branch in delivery
7849  AMPS_CALL_EXCEPTION_WRAPPER_2(me,
7850  me->_globalCommandTypeHandlers[(commandType == Message::Command::Publish ?
7851  GlobalCommandTypeHandlers::Publish :
7852  GlobalCommandTypeHandlers::OOF)].invoke(message));
7853 #endif
7854  const char* subIds = NULL;
7855  size_t subIdsLen = 0;
7856  // Publish command, send to subscriptions
7857  amps_message_get_field_value(messageHandle_, AMPS_SubscriptionIds,
7858  &subIds, &subIdsLen);
7859  size_t subIdCount = me->_routes.parseRoutes(AMPS::Field(subIds, subIdsLen), me->_routeCache);
7860  for (size_t i = 0; i < subIdCount; ++i)
7861  {
7862  MessageRouter::RouteCache::value_type& lookupResult = me->_routeCache[i];
7863  MessageHandler& handler = lookupResult.handler;
7864  if (handler.isValid())
7865  {
7866  amps_message_set_field_value(messageHandle_,
7867  AMPS_SubscriptionId,
7868  subIds + lookupResult.idOffset,
7869  lookupResult.idLength);
7870  Message::Field bookmark = message.getBookmark();
7871  bool isMessageQueue = message.getLeasePeriod().len() != 0;
7872  bool isAutoAck = me->_isAutoAckEnabled;
7873 
7874  if (!isMessageQueue && !bookmark.empty() &&
7875  me->_bookmarkStore.isValid())
7876  {
7877  if (me->_bookmarkStore.isDiscarded(me->_readMessage))
7878  {
7879  //Call duplicate message handler in handlers map
7880  if (me->_globalCommandTypeHandlers[GlobalCommandTypeHandlers::DuplicateMessage].isValid())
7881  {
7882  AMPS_CALL_EXCEPTION_WRAPPER_2(me, me->_globalCommandTypeHandlers[GlobalCommandTypeHandlers::DuplicateMessage].invoke(message));
7883  }
7884  }
7885  else
7886  {
7887  me->_bookmarkStore.log(me->_readMessage);
7888  AMPS_CALL_EXCEPTION_WRAPPER_2(me,
7889  handler.invoke(message));
7890  }
7891  }
7892  else
7893  {
7894  if (isMessageQueue && isAutoAck)
7895  {
7896  try
7897  {
7898  AMPS_CALL_EXCEPTION_WRAPPER_STREAM_FULL_2(me, handler.invoke(message));
7899  if (!message.getIgnoreAutoAck())
7900  {
7901  AMPS_CALL_EXCEPTION_WRAPPER_2(me,
7902  me->_ack(message.getTopic(), message.getBookmark()));
7903  }
7904  }
7905  catch (std::exception& ex)
7906  {
7907  if (!message.getIgnoreAutoAck())
7908  {
7909  AMPS_CALL_EXCEPTION_WRAPPER_2(me,
7910  me->_ack(message.getTopic(), message.getBookmark(), "cancel"));
7911  }
7912  AMPS_UNHANDLED_EXCEPTION_2(me, ex);
7913  }
7914  }
7915  else
7916  {
7917  AMPS_CALL_EXCEPTION_WRAPPER_2(me,
7918  handler.invoke(message));
7919  }
7920  }
7921  }
7922  else
7923  {
7924  me->lastChance(message);
7925  }
7926  } // for (subidsEnd)
7927  }
7928  else if (commandType == Message::Command::Ack)
7929  {
7930  unsigned ackType = message.getAckTypeEnum();
7931  unsigned deliveries = 0U;
7932  switch (ackType)
7933  {
7934  case Message::AckType::Persisted:
7935  deliveries += me->persistedAck(message);
7936  break;
7937  case Message::AckType::Processed: // processed
7938  deliveries += me->processedAck(message);
7939  break;
7940  }
7941  MessageHandler ackHandler = me->_globalCommandTypeHandlers[GlobalCommandTypeHandlers::Ack];
7942  if (ackHandler.isValid())
7943  {
7944  AMPS_CALL_EXCEPTION_WRAPPER_2(me, ackHandler.invoke(message));
7945  ++deliveries;
7946  }
7947  AMPS_CALL_EXCEPTION_WRAPPER_2(me, deliveries += me->_routes.deliverAck(message, ackType));
7948  if (deliveries == 0)
7949  {
7950  me->lastChance(message);
7951  }
7952  }
7953  else if (commandType == Message::Command::Heartbeat)
7954  {
7955  AMPS_CALL_EXCEPTION_WRAPPER_2(me,
7956  me->_globalCommandTypeHandlers[GlobalCommandTypeHandlers::Heartbeat].invoke(message));
7957  if (me->_heartbeatTimer.getTimeout() != 0.0) // -V550
7958  {
7959  me->checkAndSendHeartbeat(true);
7960  }
7961  else
7962  {
7963  me->lastChance(message);
7964  }
7965  return AMPS_E_OK;
7966  }
7967  else if (!message.getCommandId().empty())
7968  {
7969  unsigned deliveries = 0U;
7970  try
7971  {
7972  while (me->_connected) // Keep sending heartbeats when stream is full
7973  {
7974  try
7975  {
7976  deliveries = me->_routes.deliverData(message, message.getCommandId());
7977  break;
7978  }
7979 #ifdef _WIN32
7980  catch (MessageStreamFullException&)
7981 #else
7982  catch (MessageStreamFullException& msfEx_)
7983 #endif
7984  {
7985  try
7986  {
7987  me->checkAndSendHeartbeat(false);
7988  }
7989 #ifdef _WIN32
7990  catch (std::exception&)
7991 #else
7992  catch (std::exception& stdEx_)
7993 #endif
7994  {
7995  ;
7996  }
7997  }
7998  }
7999  }
8000  catch (std::exception& stdEx_)
8001  {
8002  try
8003  {
8004  me->_exceptionListener->exceptionThrown(stdEx_);
8005  }
8006  catch (...)
8007  {
8008  ;
8009  }
8010  }
8011  if (deliveries == 0)
8012  {
8013  me->lastChance(message);
8014  }
8015  }
8016  me->checkAndSendHeartbeat();
8017  return AMPS_E_OK;
8018  }
8019 
8020  inline void
8021  ClientImpl::ClientImplPreDisconnectHandler(amps_handle /*client*/, unsigned failedConnectionVersion, void* userData)
8022  {
8023  ClientImpl* me = (ClientImpl*) userData;
8024  //Client wrapper(me);
8025  // Go ahead and signal any waiters if they are around...
8026  me->clearAcks(failedConnectionVersion);
8027  }
8028 
8029  inline amps_result
8030  ClientImpl::ClientImplDisconnectHandler(amps_handle /*client*/, void* userData)
8031  {
8032  ClientImpl* me = (ClientImpl*) userData;
8033  Lock<Mutex> l(me->_lock);
8034  Client wrapper(me, false);
8035  if (me->_connected)
8036  {
8037  me->broadcastConnectionStateChanged(ConnectionStateListener::Disconnected);
8038  }
8039  while (true)
8040  {
8041  AtomicFlagFlip subFlip(&me->_badTimeToHASubscribe);
8042  bool retryInProgress = false;
8043  try
8044  {
8045  me->_connected = false;
8046  me->_lock.signalAll();
8047  // Have to release the lock here or receive thread can't
8048  // invoke the message handler.
8049  Unlock<Mutex> unlock(me->_lock);
8050  me->_disconnectHandler.invoke(wrapper);
8051  }
8052 #ifdef _WIN32
8053  catch (const RetryOperationException&)
8054 #else
8055  catch (const RetryOperationException& ex)
8056 #endif
8057  {
8058  retryInProgress = true;
8059  }
8060  catch (const std::exception& ex)
8061  {
8062  AMPS_UNHANDLED_EXCEPTION_2(me, ex);
8063  }
8064  me->_lock.signalAll();
8065 
8066  if (!me->_connected)
8067  {
8068  if (retryInProgress)
8069  {
8070  AMPS_UNHANDLED_EXCEPTION_2(me, RetryOperationException("Reconnect in progress."));
8071  }
8072  else
8073  {
8074  me->broadcastConnectionStateChanged(ConnectionStateListener::Shutdown);
8075  AMPS_UNHANDLED_EXCEPTION_2(me, DisconnectedException("Reconnect failed."));
8076  }
8077  return AMPS_E_DISCONNECTED;
8078  }
8079  try
8080  {
8081  // Resubscribe
8082  if (me->_subscriptionManager)
8083  {
8084  {
8085  // Have to release the lock here or receive thread can't
8086  // invoke the message handler.
8087  Unlock<Mutex> unlock(me->_lock);
8088  me->_subscriptionManager->resubscribe(wrapper);
8089  }
8090  me->broadcastConnectionStateChanged(ConnectionStateListener::Resubscribed);
8091  }
8092  return AMPS_E_OK;
8093  }
8094  catch (const AMPSException& subEx)
8095  {
8096  AMPS_UNHANDLED_EXCEPTION_2(me, subEx);
8097  }
8098  catch (const std::exception& subEx)
8099  {
8100  AMPS_UNHANDLED_EXCEPTION_2(me, subEx);
8101  return AMPS_E_RETRY;
8102  }
8103  catch (...)
8104  {
8105  return AMPS_E_RETRY;
8106  }
8107  }
8108  return AMPS_E_RETRY;
8109  }
8110 
8111  inline const char*
8112  ClientImpl::ClientImplGetHttpPreflightMessage(void* userData_)
8113  {
8114  ClientImpl* me = (ClientImpl*)userData_;
8115  std::ostringstream os;
8116  // [transport]://[user[:password]@][host]:port[/path][?uri_params]
8117  // firstColon is after transport
8118  size_t firstColon = me->_lastUri.find(':');
8119  // pathEnd is start of uri_params or npos
8120  size_t pathEnd = me->_lastUri.find('?');
8121  // lastColon separates host and port, last before pathEnd
8122  size_t lastColon = me->_lastUri.rfind(':', pathEnd);
8123  // at ends user/password and precedes host
8124  size_t at = me->_lastUri.rfind('@', lastColon);
8125  // hostStart is either after at or following firstColon ://
8126  size_t hostStart = at == std::string::npos ? firstColon + 3 : at + 1;
8127  size_t hostLen = lastColon - hostStart;
8128  // pathStart follows port
8129  size_t pathStart = me->_lastUri.find('/', lastColon);
8130  size_t pathLen = pathEnd;
8131  if (pathEnd != std::string::npos)
8132  {
8133  pathLen = pathEnd - pathStart;
8134  }
8135  os << "GET " << me->_lastUri.substr(pathStart, pathLen)
8136  << " HTTP/1.1\r\nHost: " << me->_lastUri.substr(hostStart, hostLen)
8137  << "\r\nConnection: upgrade\r\nUpgrade: "
8138  << me->_lastUri.substr(0, firstColon) << "\r\n";
8139  for (auto header : me->_httpPreflightHeaders)
8140  {
8141  os << header << "\r\n";
8142  }
8143  os << "\r\n";
8144  me->_preflightMessage = os.str();
8145  return me->_preflightMessage.c_str();
8146  }
8147 
8148  class FIX
8149  {
8150  const char* _data;
8151  size_t _len;
8152  char _fieldSep;
8153  public:
8154  class iterator
8155  {
8156  const char* _data;
8157  size_t _len;
8158  size_t _pos;
8159  char _fieldSep;
8160  iterator(const char* data_, size_t len_, size_t pos_, char fieldSep_)
8161  : _data(data_), _len(len_), _pos(pos_), _fieldSep(fieldSep_)
8162  {
8163  while (_pos != _len && _data[_pos] == _fieldSep)
8164  {
8165  ++_pos;
8166  }
8167  }
8168  public:
8169  typedef void* difference_type;
8170  typedef std::forward_iterator_tag iterator_category;
8171  typedef std::pair<Message::Field, Message::Field> value_type;
8172  typedef value_type* pointer;
8173  typedef value_type& reference;
8174  bool operator==(const iterator& rhs) const
8175  {
8176  return _pos == rhs._pos;
8177  }
8178  bool operator!=(const iterator& rhs) const
8179  {
8180  return _pos != rhs._pos;
8181  }
8182  iterator& operator++()
8183  {
8184  // Skip through the data
8185  while (_pos != _len && _data[_pos] != _fieldSep)
8186  {
8187  ++_pos;
8188  }
8189  // Skip through any field separators
8190  while (_pos != _len && _data[_pos] == _fieldSep)
8191  {
8192  ++_pos;
8193  }
8194  return *this;
8195  }
8196 
8197  value_type operator*() const
8198  {
8199  value_type result;
8200  size_t i = _pos, keyLength = 0, valueStart = 0, valueLength = 0;
8201  for (; i < _len && _data[i] != '='; ++i)
8202  {
8203  ++keyLength;
8204  }
8205 
8206  result.first.assign(_data + _pos, keyLength);
8207 
8208  if (i < _len && _data[i] == '=')
8209  {
8210  ++i;
8211  valueStart = i;
8212  for (; i < _len && _data[i] != _fieldSep; ++i)
8213  {
8214  valueLength++;
8215  }
8216  }
8217  result.second.assign(_data + valueStart, valueLength);
8218  return result;
8219  }
8220 
8221  friend class FIX;
8222  };
8223  class reverse_iterator
8224  {
8225  const char* _data;
8226  size_t _len;
8227  const char* _pos;
8228  char _fieldSep;
8229  public:
8230  typedef std::pair<Message::Field, Message::Field> value_type;
8231  reverse_iterator(const char* data, size_t len, const char* pos, char fieldsep)
8232  : _data(data), _len(len), _pos(pos), _fieldSep(fieldsep)
8233  {
8234  if (_pos)
8235  {
8236  // skip past meaningless trailing fieldseps
8237  while (_pos >= _data && *_pos == _fieldSep)
8238  {
8239  --_pos;
8240  }
8241  while (_pos > _data && *_pos != _fieldSep)
8242  {
8243  --_pos;
8244  }
8245  // if we stopped before the 0th character, it's because
8246  // it's a field sep. advance one to point to the first character
8247  // of a key.
8248  if (_pos > _data || (_pos == _data && *_pos == _fieldSep))
8249  {
8250  ++_pos;
8251  }
8252  if (_pos < _data)
8253  {
8254  _pos = 0;
8255  }
8256  }
8257  }
8258  bool operator==(const reverse_iterator& rhs) const
8259  {
8260  return _pos == rhs._pos;
8261  }
8262  bool operator!=(const reverse_iterator& rhs) const
8263  {
8264  return _pos != rhs._pos;
8265  }
8266  reverse_iterator& operator++()
8267  {
8268  if (_pos == _data)
8269  {
8270  _pos = 0;
8271  }
8272  else
8273  {
8274  // back up 1 to a field separator
8275  --_pos;
8276  // keep backing up through field separators
8277  while (_pos >= _data && *_pos == _fieldSep)
8278  {
8279  --_pos;
8280  }
8281  // now back up to the beginning of this field
8282  while (_pos > _data && *_pos != _fieldSep)
8283  {
8284  --_pos;
8285  }
8286  if (_pos > _data || (_pos == _data && *_pos == _fieldSep))
8287  {
8288  ++_pos;
8289  }
8290  if (_pos < _data)
8291  {
8292  _pos = 0;
8293  }
8294  }
8295  return *this;
8296  }
8297  value_type operator*() const
8298  {
8299  value_type result;
8300  size_t keyLength = 0, valueStart = 0, valueLength = 0;
8301  size_t i = (size_t)(_pos - _data);
8302  for (; i < _len && _data[i] != '='; ++i)
8303  {
8304  ++keyLength;
8305  }
8306  result.first.assign(_pos, keyLength);
8307  if (i < _len && _data[i] == '=')
8308  {
8309  ++i;
8310  valueStart = i;
8311  for (; i < _len && _data[i] != _fieldSep; ++i)
8312  {
8313  valueLength++;
8314  }
8315  }
8316  result.second.assign(_data + valueStart, valueLength);
8317  return result;
8318  }
8319  };
8320  FIX(const Message::Field& data, char fieldSeparator = 1)
8321  : _data(data.data()), _len(data.len()),
8322  _fieldSep(fieldSeparator)
8323  {
8324  }
8325 
8326  FIX(const char* data, size_t len, char fieldSeparator = 1)
8327  : _data(data), _len(len), _fieldSep(fieldSeparator)
8328  {
8329  }
8330 
8331  iterator begin() const
8332  {
8333  return iterator(_data, _len, 0, _fieldSep);
8334  }
8335  iterator end() const
8336  {
8337  return iterator(_data, _len, _len, _fieldSep);
8338  }
8339 
8340 
8341  reverse_iterator rbegin() const
8342  {
8343  return reverse_iterator(_data, _len, _data + (_len - 1), _fieldSep);
8344  }
8345 
8346  reverse_iterator rend() const
8347  {
8348  return reverse_iterator(_data, _len, 0, _fieldSep);
8349  }
8350  };
8351 
8352 
8365 
8366  template <class T>
8368  {
8369  std::stringstream _data;
8370  char _fs;
8371  public:
8377  _FIXBuilder(char fieldSep_ = (char)1) : _fs(fieldSep_) {;}
8378 
8386  void append(const T& tag, const char* value, size_t offset, size_t length)
8387  {
8388  _data << tag << '=';
8389  _data.write(value + offset, (std::streamsize)length);
8390  _data << _fs;
8391  }
8397  void append(const T& tag, const std::string& value)
8398  {
8399  _data << tag << '=' << value << _fs;
8400  }
8401 
8404  std::string getString() const
8405  {
8406  return _data.str();
8407  }
8408  operator std::string() const
8409  {
8410  return _data.str();
8411  }
8412 
8414  void reset()
8415  {
8416  _data.str(std::string());
8417  }
8418  };
8419 
8423 
8425 
8429 
8431 
8432 
8440 
8442  {
8443  char _fs;
8444  public:
8449  FIXShredder(char fieldSep_ = (char)1) : _fs(fieldSep_) {;}
8450 
8453  typedef std::map<Message::Field, Message::Field> map_type;
8454 
8460  map_type toMap(const Message::Field& data)
8461  {
8462  FIX fix(data, _fs);
8463  map_type retval;
8464  for (FIX::iterator a = fix.begin(); a != fix.end(); ++a)
8465  {
8466  retval.insert(*a);
8467  }
8468 
8469  return retval;
8470  }
8471  };
8472 
8473 #define AMPS_MESSAGE_STREAM_CACHE_MAX 128
8474  class MessageStreamImpl : public AMPS::RefBody, AMPS::ConnectionStateListener
8475  {
8476  Mutex _lock;
8477  std::deque<Message> _q;
8478  std::deque<Message> _cache;
8479  std::string _commandId;
8480  std::string _subId;
8481  std::string _queryId;
8482  Client _client;
8483  unsigned _timeout;
8484  unsigned _maxDepth;
8485  unsigned _requestedAcks;
8486  size_t _cacheMax;
8487  Message::Field _previousTopic;
8488  Message::Field _previousBookmark;
8489  typedef enum : unsigned int { Unset = 0x0, Running = 0x10, Subscribe = 0x11, SOWOnly = 0x12, AcksOnly = 0x13, Conflate = 0x14, Closed = 0x1, Disconnected = 0x2 } State;
8490 #if __cplusplus >= 201100L || _MSC_VER >= 1900
8491  std::atomic<State> _state;
8492 #else
8493  volatile State _state;
8494 #endif
8495  typedef std::map<std::string, Message*> SOWKeyMap;
8496  SOWKeyMap _sowKeyMap;
8497  public:
8498  MessageStreamImpl(const Client& client_)
8499  : _client(client_),
8500  _timeout(0),
8501  _maxDepth((unsigned)~0),
8502  _requestedAcks(0),
8503  _cacheMax(AMPS_MESSAGE_STREAM_CACHE_MAX),
8504  _state(Unset)
8505  {
8506  if (_client.isValid())
8507  {
8508  _client.addConnectionStateListener(this);
8509  }
8510  }
8511 
8512  MessageStreamImpl(ClientImpl* client_)
8513  : _client(client_),
8514  _timeout(0),
8515  _maxDepth((unsigned)~0),
8516  _requestedAcks(0),
8517  _state(Unset)
8518  {
8519  if (_client.isValid())
8520  {
8521  _client.addConnectionStateListener(this);
8522  }
8523  }
8524 
8525  ~MessageStreamImpl()
8526  {
8527  }
8528 
8529  virtual void destroy()
8530  {
8531  try
8532  {
8533  close();
8534  }
8535  catch (std::exception& e)
8536  {
8537  try
8538  {
8539  if (_client.isValid())
8540  {
8541  _client.getExceptionListener().exceptionThrown(e);
8542  }
8543  }
8544  catch (...) {/*Ignore exception listener exceptions*/} // -V565
8545  }
8546  if (_client.isValid())
8547  {
8548  _client.removeConnectionStateListener(this);
8549  Client c = _client;
8550  _client = Client((ClientImpl*)NULL);
8551  c.deferredExecution(MessageStreamImpl::destroyer, this);
8552  }
8553  else
8554  {
8555  delete this;
8556  }
8557  }
8558 
8559  static void destroyer(void* vpMessageStreamImpl_)
8560  {
8561  delete ((MessageStreamImpl*)vpMessageStreamImpl_);
8562  }
8563 
8564  void setSubscription(const std::string& subId_,
8565  const std::string& commandId_ = "",
8566  const std::string& queryId_ = "")
8567  {
8568  Lock<Mutex> lock(_lock);
8569  _subId = subId_;
8570  if (!commandId_.empty() && commandId_ != subId_)
8571  {
8572  _commandId = commandId_;
8573  }
8574  if (!queryId_.empty() && queryId_ != subId_ && queryId_ != commandId_)
8575  {
8576  _queryId = queryId_;
8577  }
8578  // It's possible to disconnect between creation/registration and here.
8579  if (Disconnected == _state)
8580  {
8581  return;
8582  }
8583  assert(Unset == _state);
8584  _state = Subscribe;
8585  }
8586 
8587  void setSOWOnly(const std::string& commandId_,
8588  const std::string& queryId_ = "")
8589  {
8590  Lock<Mutex> lock(_lock);
8591  _commandId = commandId_;
8592  if (!queryId_.empty() && queryId_ != commandId_)
8593  {
8594  _queryId = queryId_;
8595  }
8596  // It's possible to disconnect between creation/registration and here.
8597  if (Disconnected == _state)
8598  {
8599  return;
8600  }
8601  assert(Unset == _state);
8602  _state = SOWOnly;
8603  }
8604 
8605  void setStatsOnly(const std::string& commandId_,
8606  const std::string& queryId_ = "")
8607  {
8608  Lock<Mutex> lock(_lock);
8609  _commandId = commandId_;
8610  if (!queryId_.empty() && queryId_ != commandId_)
8611  {
8612  _queryId = queryId_;
8613  }
8614  // It's possible to disconnect between creation/registration and here.
8615  if (Disconnected == _state)
8616  {
8617  return;
8618  }
8619  assert(Unset == _state);
8620  _state = AcksOnly;
8621  _requestedAcks = Message::AckType::Stats;
8622  }
8623 
8624  void setAcksOnly(const std::string& commandId_, unsigned acks_)
8625  {
8626  Lock<Mutex> lock(_lock);
8627  _commandId = commandId_;
8628  // It's possible to disconnect between creation/registration and here.
8629  if (Disconnected == _state)
8630  {
8631  return;
8632  }
8633  assert(Unset == _state);
8634  _state = AcksOnly;
8635  _requestedAcks = acks_;
8636  }
8637 
8638  void connectionStateChanged(ConnectionStateListener::State state_)
8639  {
8640  Lock<Mutex> lock(_lock);
8641  if (state_ == AMPS::ConnectionStateListener::Disconnected)
8642  {
8643  _state = Disconnected;
8644  close();
8645  }
8646  else if (state_ == AMPS::ConnectionStateListener::Connected
8647  && _commandId.empty()
8648  && _subId.empty()
8649  && _queryId.empty())
8650  {
8651  // AC-1331 Reconnect before command was sent, so Unset
8652  _state = Unset;
8653  }
8654  _lock.signalAll();
8655  }
8656 
8657  void timeout(unsigned timeout_)
8658  {
8659  _timeout = timeout_;
8660  }
8661  void conflate(void)
8662  {
8663  if (_state == Subscribe)
8664  {
8665  _state = Conflate;
8666  }
8667  }
8668  void maxDepth(unsigned maxDepth_)
8669  {
8670  if (maxDepth_)
8671  {
8672  _maxDepth = maxDepth_;
8673  }
8674  else
8675  {
8676  _maxDepth = (unsigned)~0;
8677  }
8678  }
8679  unsigned getMaxDepth(void) const
8680  {
8681  return _maxDepth;
8682  }
8683  unsigned getDepth(void) const
8684  {
8685  return (unsigned)(_q.size());
8686  }
8687 
8688  bool next(Message& current_)
8689  {
8690  Lock<Mutex> lock(_lock);
8691  if (!_previousTopic.empty() && !_previousBookmark.empty())
8692  {
8693  try
8694  {
8695  if (_client.isValid())
8696  {
8697  _client.ackDeferredAutoAck(_previousTopic, _previousBookmark);
8698  }
8699  }
8700 #ifdef _WIN32
8701  catch (AMPSException&)
8702 #else
8703  catch (AMPSException& e)
8704 #endif
8705  {
8706  current_.invalidate();
8707  _previousTopic.clear();
8708  _previousBookmark.clear();
8709  return false;
8710  }
8711  _previousTopic.clear();
8712  _previousBookmark.clear();
8713  }
8714  // Don't wait to wait more than 1s at a time
8715  long minWaitTime = (_timeout && _timeout < 1000) ? _timeout : 1000;
8716  Timer timer((double)_timeout);
8717  timer.start();
8718  while (_q.empty() && _state & Running)
8719  {
8720  // Using timeout so python can interrupt
8721  _lock.wait(minWaitTime);
8722  {
8723  Unlock<Mutex> unlck(_lock);
8724  amps_invoke_waiting_function();
8725  }
8726  if (_timeout)
8727  {
8728  // In case we woke up early, see how much longer to wait
8729  if (timer.checkAndGetRemaining(&minWaitTime))
8730  {
8731  // No time left
8732  break;
8733  }
8734  // Adjust next wait time
8735  minWaitTime = (minWaitTime < 1000) ? minWaitTime : 1000;
8736  }
8737  }
8738  if (current_.isValid() && _cache.size() < _cacheMax)
8739  {
8740  current_.reset();
8741  _cache.push_back(current_);
8742  }
8743  if (!_q.empty())
8744  {
8745  current_ = _q.front();
8746  if (_q.size() == _maxDepth)
8747  {
8748  _lock.signalAll();
8749  }
8750  _q.pop_front();
8751  if (_state == Conflate)
8752  {
8753  std::string sowKey = current_.getSowKey();
8754  if (sowKey.length())
8755  {
8756  _sowKeyMap.erase(sowKey);
8757  }
8758  }
8759  else if (_state == AcksOnly)
8760  {
8761  _requestedAcks &= ~(current_.getAckTypeEnum());
8762  }
8763  if ((_state == AcksOnly && _requestedAcks == 0) ||
8764  (_state == SOWOnly && current_.getCommand() == "group_end"))
8765  {
8766  _state = Closed;
8767  }
8768  else if (current_.isValid()
8769  && current_.getCommandEnum() == Message::Command::Publish
8770  && _client.isValid() && _client.getAutoAck()
8771  && !current_.getLeasePeriod().empty()
8772  && !current_.getBookmark().empty())
8773  {
8774  _previousTopic = current_.getTopic().deepCopy();
8775  _previousBookmark = current_.getBookmark().deepCopy();
8776  }
8777  return true;
8778  }
8779  if (_state == Disconnected)
8780  {
8781  throw DisconnectedException("Connection closed.");
8782  }
8783  current_.invalidate();
8784  if (_state == Closed)
8785  {
8786  return false;
8787  }
8788  return _timeout != 0;
8789  }
8790  void close(void)
8791  {
8792  if (_client.isValid())
8793  {
8794  if (_state == SOWOnly || _state == Subscribe) //not delete
8795  {
8796  if (!_commandId.empty())
8797  {
8798  _client.unsubscribe(_commandId);
8799  }
8800  if (!_subId.empty())
8801  {
8802  _client.unsubscribe(_subId);
8803  }
8804  if (!_queryId.empty())
8805  {
8806  _client.unsubscribe(_queryId);
8807  }
8808  }
8809  else
8810  {
8811  if (!_commandId.empty())
8812  {
8813  _client.removeMessageHandler(_commandId);
8814  }
8815  if (!_subId.empty())
8816  {
8817  _client.removeMessageHandler(_subId);
8818  }
8819  if (!_queryId.empty())
8820  {
8821  _client.removeMessageHandler(_queryId);
8822  }
8823  }
8824  }
8825  if (_state == SOWOnly || _state == Subscribe || _state == Unset)
8826  {
8827  _state = Closed;
8828  }
8829  }
8830  static void _messageHandler(const Message& message_, MessageStreamImpl* this_)
8831  {
8832  Lock<Mutex> lock(this_->_lock);
8833  if (this_->_state != Conflate)
8834  {
8835  AMPS_TESTING_SLOW_MESSAGE_STREAM
8836  if (this_->_q.size() >= this_->_maxDepth)
8837  {
8838  // We throw here so that heartbeats can be sent. The exception
8839  // will be handled internally only, and the same Message will
8840  // come back to try again. Make sure to signal.
8841  this_->_lock.signalAll();
8842  throw MessageStreamFullException("Stream is currently full.");
8843  }
8844  if (!this_->_cache.empty())
8845  {
8846  this_->_cache.front().deepCopy(message_);
8847  this_->_q.push_back(this_->_cache.front());
8848  this_->_cache.pop_front();
8849  }
8850  else
8851  {
8852 #ifdef AMPS_USE_EMPLACE
8853  this_->_q.emplace_back(message_.deepCopy());
8854 #else
8855  this_->_q.push_back(message_.deepCopy());
8856 #endif
8857  }
8858  if (message_.getCommandEnum() == Message::Command::Publish &&
8859  this_->_client.isValid() && this_->_client.getAutoAck() &&
8860  !message_.getLeasePeriod().empty() &&
8861  !message_.getBookmark().empty())
8862  {
8863  message_.setIgnoreAutoAck();
8864  }
8865  }
8866  else
8867  {
8868  std::string sowKey = message_.getSowKey();
8869  if (sowKey.length())
8870  {
8871  SOWKeyMap::iterator it = this_->_sowKeyMap.find(sowKey);
8872  if (it != this_->_sowKeyMap.end())
8873  {
8874  it->second->deepCopy(message_);
8875  }
8876  else
8877  {
8878  if (this_->_q.size() >= this_->_maxDepth)
8879  {
8880  // We throw here so that heartbeats can be sent. The
8881  // exception will be handled internally only, and the
8882  // same Message will come back to try again. Make sure
8883  // to signal.
8884  this_->_lock.signalAll();
8885  throw MessageStreamFullException("Stream is currently full.");
8886  }
8887  if (!this_->_cache.empty())
8888  {
8889  this_->_cache.front().deepCopy(message_);
8890  this_->_q.push_back(this_->_cache.front());
8891  this_->_cache.pop_front();
8892  }
8893  else
8894  {
8895 #ifdef AMPS_USE_EMPLACE
8896  this_->_q.emplace_back(message_.deepCopy());
8897 #else
8898  this_->_q.push_back(message_.deepCopy());
8899 #endif
8900  }
8901  this_->_sowKeyMap[sowKey] = &(this_->_q.back());
8902  }
8903  }
8904  else
8905  {
8906  if (this_->_q.size() >= this_->_maxDepth)
8907  {
8908  // We throw here so that heartbeats can be sent. The exception
8909  // will be handled internally only, and the same Message will
8910  // come back to try again. Make sure to signal.
8911  this_->_lock.signalAll();
8912  throw MessageStreamFullException("Stream is currently full.");
8913  }
8914  if (!this_->_cache.empty())
8915  {
8916  this_->_cache.front().deepCopy(message_);
8917  this_->_q.push_back(this_->_cache.front());
8918  this_->_cache.pop_front();
8919  }
8920  else
8921  {
8922 #ifdef AMPS_USE_EMPLACE
8923  this_->_q.emplace_back(message_.deepCopy());
8924 #else
8925  this_->_q.push_back(message_.deepCopy());
8926 #endif
8927  }
8928  if (message_.getCommandEnum() == Message::Command::Publish &&
8929  this_->_client.isValid() && this_->_client.getAutoAck() &&
8930  !message_.getLeasePeriod().empty() &&
8931  !message_.getBookmark().empty())
8932  {
8933  message_.setIgnoreAutoAck();
8934  }
8935  }
8936  }
8937  this_->_lock.signalAll();
8938  }
8939  };
8940  inline MessageStream::MessageStream(void)
8941  {
8942  }
8943  inline MessageStream::MessageStream(const Client& client_)
8944  : _body(new MessageStreamImpl(client_))
8945  {
8946  }
8947  inline MessageStream::MessageStream(RefHandle<MessageStreamImpl> body_)
8948  : _body(body_)
8949  {
8950  }
8951  inline void MessageStream::iterator::advance(void)
8952  {
8953  _pStream = _pStream->_body->next(_current) ? _pStream : NULL;
8954  }
8955  inline MessageStream::operator MessageHandler(void)
8956  {
8957  return MessageHandler((void(*)(const Message&, void*))MessageStreamImpl::_messageHandler, &_body.get());
8958  }
8959  inline MessageStream MessageStream::fromExistingHandler(const MessageHandler& handler_)
8960  {
8961  MessageStream result;
8962  if (handler_._func == (MessageHandler::FunctionType)MessageStreamImpl::_messageHandler)
8963  {
8964  result._body = (MessageStreamImpl*)(handler_._userData);
8965  }
8966  return result;
8967  }
8968 
8969  inline void MessageStream::setSOWOnly(const std::string& commandId_,
8970  const std::string& queryId_)
8971  {
8972  _body->setSOWOnly(commandId_, queryId_);
8973  }
8974  inline void MessageStream::setSubscription(const std::string& subId_,
8975  const std::string& commandId_,
8976  const std::string& queryId_)
8977  {
8978  _body->setSubscription(subId_, commandId_, queryId_);
8979  }
8980  inline void MessageStream::setStatsOnly(const std::string& commandId_,
8981  const std::string& queryId_)
8982  {
8983  _body->setStatsOnly(commandId_, queryId_);
8984  }
8985  inline void MessageStream::setAcksOnly(const std::string& commandId_,
8986  unsigned acks_)
8987  {
8988  _body->setAcksOnly(commandId_, acks_);
8989  }
8990  inline MessageStream MessageStream::timeout(unsigned timeout_)
8991  {
8992  _body->timeout(timeout_);
8993  return *this;
8994  }
8996  {
8997  _body->conflate();
8998  return *this;
8999  }
9000  inline MessageStream MessageStream::maxDepth(unsigned maxDepth_)
9001  {
9002  _body->maxDepth(maxDepth_);
9003  return *this;
9004  }
9005  inline unsigned MessageStream::getMaxDepth(void) const
9006  {
9007  return _body->getMaxDepth();
9008  }
9009  inline unsigned MessageStream::getDepth(void) const
9010  {
9011  return _body->getDepth();
9012  }
9013 
9014  inline MessageStream ClientImpl::getEmptyMessageStream(void)
9015  {
9016  return MessageStream(_pEmptyMessageStream.get()->_body);
9017  }
9018 
9020  {
9021  // If the command is sow and has a sub_id, OR
9022  // if the command has a replace option, return the existing
9023  // messagestream, don't create a new one.
9024  ClientImpl& body = _body.get();
9025  Message& message = command_.getMessage();
9026  Field subId = message.getSubscriptionId();
9027  unsigned ackTypes = message.getAckTypeEnum();
9028  bool useExistingHandler = !subId.empty() && ((!message.getOptions().empty() && message.getOptions().contains("replace", 7)) || message.getCommandEnum() == Message::Command::SOW);
9029  if (useExistingHandler)
9030  {
9031  // Try to find the existing message handler.
9032  if (!subId.empty())
9033  {
9034  MessageHandler existingHandler;
9035  if (body._routes.getRoute(subId, existingHandler))
9036  {
9037  // we found an existing handler. It might not be a message stream, but that's okay.
9038  body.executeAsync(command_, existingHandler, false);
9039  return MessageStream::fromExistingHandler(existingHandler);
9040  }
9041  }
9042  // fall through; we'll a new handler altogether.
9043  }
9044  // Make sure something will be returned to the stream or use the empty one
9045  // Check that: it's a command that doesn't normally return data, and there
9046  // are no acks requested for the cmd id
9047  Message::Command::Type command = message.getCommandEnum();
9048  if ((command & Message::Command::NoDataCommands)
9049  && (ackTypes == Message::AckType::Persisted
9050  || ackTypes == Message::AckType::None))
9051  {
9052  executeAsync(command_, MessageHandler());
9053  if (!body._pEmptyMessageStream)
9054  {
9055  body._pEmptyMessageStream.reset(new MessageStream((ClientImpl*)0));
9056  body._pEmptyMessageStream.get()->_body->close();
9057  }
9058  return body.getEmptyMessageStream();
9059  }
9060  MessageStream stream(*this);
9061  if (body.getDefaultMaxDepth())
9062  {
9063  stream.maxDepth(body.getDefaultMaxDepth());
9064  }
9065  MessageHandler handler = stream.operator MessageHandler();
9066  std::string commandID = body.executeAsync(command_, handler, false);
9067  if (command_.hasStatsAck())
9068  {
9069  stream.setStatsOnly(commandID, command_.getMessage().getQueryId());
9070  }
9071  else if (command_.isSow())
9072  {
9073  if (command_.getAckTypeEnum() & Message::AckType::Completed)
9074  {
9075  stream.setAcksOnly(commandID,
9076  ackTypes);
9077  }
9078  else
9079  {
9080  stream.setSOWOnly(commandID, command_.getMessage().getQueryId());
9081  }
9082  }
9083  else if (command_.isSubscribe())
9084  {
9085  stream.setSubscription(commandID,
9086  command_.getMessage().getCommandId(),
9087  command_.getMessage().getQueryId());
9088  }
9089  else
9090  {
9091  // Persisted acks for writes don't come back with command id
9092  if (command == Message::Command::Publish ||
9093  command == Message::Command::DeltaPublish ||
9094  command == Message::Command::SOWDelete)
9095  {
9096  stream.setAcksOnly(commandID,
9097  ackTypes & (unsigned)~Message::AckType::Persisted);
9098  }
9099  else
9100  {
9101  stream.setAcksOnly(commandID, ackTypes);
9102  }
9103  }
9104  return stream;
9105  }
9106 
9107 // This is here because it uses api from Client.
9108  inline void Message::ack(const char* options_) const
9109  {
9110  ClientImpl* pClient = _body.get().clientImpl();
9111  Message::Field bookmark = getBookmark();
9112  if (pClient && bookmark.len() &&
9113  !pClient->getAutoAck())
9114  //(!pClient->getAutoAck() || getIgnoreAutoAck()))
9115  {
9116  pClient->ack(getTopic(), bookmark, options_);
9117  }
9118  }
9119 }// end namespace AMPS
9120 #endif
Command & setBookmark(const std::string &bookmark_)
Set the bookmark to be used this command.
Definition: ampsplusplus.hpp:748
Command & setFilter(const char *filter_, size_t filterLen_)
Definition: ampsplusplus.hpp:699
Class to hold string versions of failure reasons.
Definition: ampsplusplus.hpp:151
Message & setData(const std::string &v_)
Sets the data portion of self.
Definition: Message.hpp:1538
Core type and function declarations for the AMPS C client.
Client(const std::string &clientName="")
Constructs a new client with a given client name.
Definition: ampsplusplus.hpp:5206
AMPSDLL amps_result amps_client_set_http_preflight_callback(amps_handle client, amps_http_preflight_callback callback, void *userData)
Sets a user-supplied callback function for when a connection is established and the provided uri incl...
Field getUserId() const
Retrieves the value of the UserId header of the Message as a Field which references the underlying bu...
Definition: Message.hpp:1515
std::string sowDeleteByKeys(const MessageHandler &messageHandler_, const std::string &topic_, const std::string &keys_, long timeout_=0)
Deletes messages that match SOW keys from a topic&#39;s SOW cache.
Definition: ampsplusplus.hpp:6935
std::string stopTimer(const MessageHandler &messageHandler)
Definition: ampsplusplus.hpp:6909
static const unsigned int IdentifierLength
The length of identifiers used for unique identification of commands and subscriptions.
Definition: Message.hpp:550
std::string getAckType() const
Definition: ampsplusplus.hpp:953
AMPSDLL void amps_client_set_disconnect_handler(amps_handle client, amps_handler disconnectHandler, void *userData)
Sets the disconnect handler function to be called when a disconnect occurs.
Message & assignOwnershipBookmark(const Field &f)
Assigns the value of the Bookmark header for this Message without copying and makes this Message resp...
Definition: Message.hpp:1256
bool removeMessageHandler(const Field &commandId_)
Removes a MessageHandler for a given ComandId from self.
Definition: ampsplusplus.hpp:5467
Message & assignTopic(const std::string &v)
Assigns the value of the Topic header for this Message without copying.
Definition: Message.hpp:1511
Provides a convenient way of building messages in FIX format, typically referenced using the typedefs...
Definition: ampsplusplus.hpp:8367
void startTimer()
Definition: ampsplusplus.hpp:6898
MessageStream sowAndSubscribe(const std::string &topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, bool oofEnabled_=false, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic and initiates a new subscription on it.
Definition: ampsplusplus.hpp:6434
Abstract base class for storing published messages for an HA publisher client.
Definition: ampsplusplus.hpp:1097
MessageStream conflate(void)
Sets self to conflation mode, where a new update for a matching sow key will replace the previous one...
Definition: ampsplusplus.hpp:8995
Field getSequence() const
Retrieves the value of the Sequence header of the Message as a Field which references the underlying ...
Definition: Message.hpp:1484
std::string send(const MessageHandler &messageHandler, Message &message, int timeout=0)
Sends a Message to the connected AMPS server, performing only minimal validation and bypassing client...
Definition: ampsplusplus.hpp:5495
AMPSDLL amps_result amps_client_set_name(amps_handle handle, const amps_char *clientName)
Sets the name on an amps client object.
Message & setCorrelationId(const std::string &v)
Sets the value of the CorrelationId header for this Message.
Definition: Message.hpp:1366
Message & setQueryID(const std::string &v)
Sets the value of the QueryID header for this Message.
Definition: Message.hpp:1479
Command(const std::string &command_)
Creates an object to represent the given AMPS command, such as "sow" or "subscribe".
Definition: ampsplusplus.hpp:560
Command::Type getCommandEnum() const
Decode self&#39;s "command" field and return one of the values from Command.
Definition: Message.hpp:1290
Command & setBookmark(const char *bookmark_, size_t bookmarkLen_)
Set the bookmark to be used this command.
Definition: ampsplusplus.hpp:759
void setGlobalCommandTypeMessageHandler(const Message::Command::Type command_, const MessageHandler &handler_)
Sets a handler for all messages of a particular type: currently supported types are heartbeat message...
Definition: ampsplusplus.hpp:7207
Command & setAckType(unsigned ackType_)
Definition: ampsplusplus.hpp:931
void setServerVersion(size_t version_)
Internally used to set the server version so the store knows how to deal with persisted acks and call...
Definition: BookmarkStore.hpp:429
void setDefaultMaxDepth(unsigned maxDepth_)
Sets a default max depth on all subsequently created MessageStream objects.
Definition: ampsplusplus.hpp:7575
const amps_uint64_t getNameHashValue() const
Returns the numeric name hash of this client as generated by the server and returned when the client ...
Definition: ampsplusplus.hpp:5267
Field getSubscriptionId() const
Retrieves the value of the SubscriptionId header of the Message as a Field which references the under...
Definition: Message.hpp:1489
AMPSDLL amps_result amps_client_connect(amps_handle handle, const amps_char *uri)
Connects to the AMPS server specified in uri.
static size_t convertVersionToNumber(const std::string &version_)
Converts a string version, such as "3.8.1.5" into the same numeric form used internally and returned ...
Definition: ampsplusplus.hpp:5355
void unsubscribe(const std::string &commandId)
Unsubscribe from a topic.
Definition: ampsplusplus.hpp:6223
Command & setOptions(const std::string &options_)
Sets the options string for this command: see Message.Options for a helper class for constructing the...
Definition: ampsplusplus.hpp:789
Message & setOrderBy(const std::string &v)
Sets the value of the OrderBy header for this Message.
Definition: Message.hpp:1477
Abstract base class for replaying a publish message.
Definition: ampsplusplus.hpp:1069
Command & setCommandId(const char *cmdId_, size_t cmdIdLen_)
Definition: ampsplusplus.hpp:673
void persisted(const Message::Field &subId_, const Message::Field &bookmark_)
Called internally to indicate messages up to and including bookmark are replicated to all replication...
Definition: BookmarkStore.hpp:405
unsigned getDefaultMaxDepth(void) const
Returns the default max depth for returned MessageStream objects.
Definition: ampsplusplus.hpp:7584
void ack(Message &message_, const char *options_=NULL)
Acknowledge a message queue message by supplying the message directly: this adds the ack to the curre...
Definition: ampsplusplus.hpp:7443
MessageStream bookmarkSubscribe(const std::string &topic_, long timeout_, const std::string &bookmark_, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Subscribe to a topic using a bookmark.
Definition: ampsplusplus.hpp:6174
AMPSDLL void amps_client_disconnect(amps_handle handle)
Disconnects from the AMPS server, if connected.
AMPSDLL AMPS_SOCKET amps_client_get_socket(amps_handle client)
Returns the socket from the underlying transport in client, or NULL if no transport is associated wit...
void discard(const Message::Field &subId_, size_t bookmarkSeqNo_)
Log a discard-bookmark entry to the persistent log based on a bookmark sequence number.
Definition: BookmarkStore.hpp:283
void setPublishStore(const Store &publishStore_)
Set the publish store to be used by the client.
Definition: ampsplusplus.hpp:5604
void setErrorOnPublishGap(bool errorOnPublishGap_)
Called to enable or disable throwing PublishStoreGapException.
Definition: ampsplusplus.hpp:1346
amps_uint64_t deltaPublish(const std::string &topic_, const std::string &data_, unsigned long expiration_)
Publish the changed fields of a message to an AMPS topic.
Definition: ampsplusplus.hpp:5869
static size_t convertVersionToNumber(const char *data_, size_t len_)
Converts a string version, such as "3.8.1.5" into the same numeric form used internally and returned ...
Definition: ampsplusplus.hpp:5370
Message & setFilter(const std::string &v)
Sets the value of the Filter header for this Message.
Definition: Message.hpp:1368
Command & setSubId(const char *subId_, size_t subIdLen_)
Definition: ampsplusplus.hpp:725
void setNoEmpties(void)
Set the option to not send empty messages on a delta subscription.
Definition: Message.hpp:870
void setDuplicateMessageHandler(const MessageHandler &duplicateMessageHandler_)
Sets a callback function that is invoked when a duplicate message is detected.
Definition: ampsplusplus.hpp:5620
Message deepCopy(void) const
Returns a deep copy of self.
Definition: Message.hpp:587
void removeConnectionStateListener(ConnectionStateListener *listener)
Attempts to remove listener from self&#39;s set of ConnectionStateListeners.
Definition: ampsplusplus.hpp:7298
void setOOF(void)
Set the option to receive out of focus (OOF) messages on a subscription, where applicable.
Definition: Message.hpp:853
int getAckTimeout(void) const
Returns the current value of the message queue ack timeout setting – that is, the amount of time aft...
Definition: ampsplusplus.hpp:7527
VersionInfo getServerVersionInfo() const
Returns the server version retrieved during logon.
Definition: ampsplusplus.hpp:5341
Message sowDeleteByKeys(const std::string &topic_, const std::string &keys_, long timeout_=0)
Deletes messages that match SOW keys from a topic&#39;s SOW cache.
Definition: ampsplusplus.hpp:6962
AMPSDLL amps_result amps_client_set_transport_filter_function(amps_handle client, amps_transport_filter_function filter, void *userData)
Sets a user-supplied callback function for filtering data before it is sent and after it is received...
Command(const char *command_, size_t commandLen_)
Creates an object to represent the given AMPS command, such as "sow" or "subscribe".
Definition: ampsplusplus.hpp:568
AMPSDLL amps_uint64_t amps_message_get_field_uint64(amps_handle message, FieldId field)
Gets the unsigned 64-bit int value of a header field in an AMPS message.
AMPSDLL amps_result amps_client_set_idle_time(amps_handle client, int idleTime)
Sets an idle-time (milliseconds).
Message encapsulates a single message sent to or received from an AMPS server, and provides methods f...
Definition: Message.hpp:539
Message & setAckTypeEnum(unsigned ackType_)
Encode self&#39;s "ack type" field from a bitmask of values from AckType.
Definition: Message.hpp:1245
amps_uint64_t getLastPersisted()
Get the last persisted message sequence in the store.
Definition: ampsplusplus.hpp:1317
std::string getString() const
Returns the current contents of this builder as a string.
Definition: ampsplusplus.hpp:8404
void setTransportFilterFunction(amps_transport_filter_function filter_, void *userData_)
Sets a filter function on the transport that is called with all raw data sent or received.
Definition: ampsplusplus.hpp:7595
amps_uint64_t publish(const char *topic_, size_t topicLength_, const char *data_, size_t dataLength_, unsigned long expiration_)
Publish a message to an AMPS topic, returning the sequence number assigned by the publish store if on...
Definition: ampsplusplus.hpp:5756
iterator begin(void)
Returns an iterator representing the beginning of the topic or subscription.
Definition: ampsplusplus.hpp:5108
void addHttpPreflightHeader(const std::string &key_, const std::string &value_)
Adds a given key/value pair as an HTTP header line as "key: value" to the end of the headers that wil...
Definition: ampsplusplus.hpp:5302
Field getLeasePeriod() const
Retrieves the value of the LeasePeriod header of the Message as a Field which references the underlyi...
Definition: Message.hpp:1371
Command & setSowKeys(const char *sowKeys_, size_t sowKeysLen_)
Sets the SowKeys for the command.
Definition: ampsplusplus.hpp:660
static const char * NOW()
Convenience method for returning the special value to start a subscription at the end of the transact...
Definition: ampsplusplus.hpp:7226
Command & setAckType(const std::string &ackType_)
Definition: ampsplusplus.hpp:909
static const char * BOOKMARK_EPOCH()
Convenience method for returning the special value to start a subscription at the beginning of the tr...
Definition: ampsplusplus.hpp:7236
void send(const Message &message)
Sends a Message to the connected AMPS server, performing only minimal validation and bypassing client...
Definition: ampsplusplus.hpp:5426
Message & setSowKey(const std::string &v)
Sets the value of the SowKey header for this Message.
Definition: Message.hpp:1486
AMPSDLL void amps_message_set_field_value(amps_handle message, FieldId field, const amps_char *value, size_t length)
Sets the value of a header field in an AMPS message.
std::string bookmarkSubscribe(const MessageHandler &messageHandler_, const std::string &topic_, long timeout_, const std::string &bookmark_, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Subscribe to a topic using a bookmark.
Definition: ampsplusplus.hpp:6146
Field getFilter() const
Retrieves the value of the Filter header of the Message as a Field which references the underlying bu...
Definition: Message.hpp:1368
static size_t getUnsetPosition()
Method to return the value used to represent not found or unset.
Definition: ampsplusplus.hpp:1160
void setRetryOnDisconnect(bool isRetryOnDisconnect_)
Enables or disables automatic retry of a command to AMPS after a reconnect.
Definition: ampsplusplus.hpp:7557
unsigned getMaxDepth(void) const
Gets the maximum number of messages that can be held in the underlying queue.
Definition: ampsplusplus.hpp:9005
Message & assignUserId(const std::string &v)
Assigns the value of the UserId header for this Message without copying.
Definition: Message.hpp:1515
void connect(const std::string &uri)
Connect to an AMPS server.
Definition: ampsplusplus.hpp:5401
void clear()
Deletes the data associated with this Field, should only be used on Fields that were created as deepC...
Definition: Field.hpp:260
void addMessageHandler(const Field &commandId_, const AMPS::MessageHandler &messageHandler_, unsigned requestedAcks_, Message::Command::Type commandType_)
Adds a MessageHandler to be invoked for Messages with the given CommandId as their command id...
Definition: ampsplusplus.hpp:5456
Abstract base class to manage all subscriptions placed on a client so that they can be re-established...
Definition: ampsplusplus.hpp:1461
std::string sowAndDeltaSubscribe(const MessageHandler &messageHandler_, const std::string &topic_, const std::string &filter_="", const std::string &orderBy_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic and initiates a new delta subscription on it.
Definition: ampsplusplus.hpp:6628
Success.
Definition: amps.h:221
AMPSDLL void amps_client_set_batch_send(amps_handle client_, amps_uint64_t batchSizeBytes_, amps_uint64_t batchTimeout_)
Sets a byte size batchSizeBytes and timeout for using batch sends of publish and delta_publish messag...
Message & setCommandEnum(Command::Type command_)
Set self&#39;s "command" field from one of the values in Command.
Definition: Message.hpp:1350
Field getOptions() const
Retrieves the value of the Options header of the Message as a Field which references the underlying b...
Definition: Message.hpp:1378
std::string authenticate(const std::string &, const std::string &password_)
A simple implementation that returns an unmodified password.
Definition: ampsplusplus.hpp:1043
amps_uint64_t deltaPublish(const char *topic_, size_t topicLength_, const char *data_, size_t dataLength_)
Publish the changed fields of a message to an AMPS topic.
Definition: ampsplusplus.hpp:5846
FIXShredder(char fieldSep_=(char) 1)
Construct an instance of FIXShredder using the specified value as the delimiter between fields...
Definition: ampsplusplus.hpp:8449
std::string sow(const MessageHandler &messageHandler_, const std::string &topic_, const std::string &filter_="", const std::string &orderBy_="", const std::string &bookmark_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query a State-of-the-World topic.
Definition: ampsplusplus.hpp:6270
void * amps_handle
Opaque handle type used to refer to objects in the AMPS api.
Definition: amps.h:211
Class for constructing the options string to pass to AMPS in a Message.
Definition: Message.hpp:609
void addMessageHandler(const Field &commandId_, const AMPS::MessageHandler &messageHandler_, unsigned requestedAcks_, bool isSubscribe_)
Adds a MessageHandler to be invoked for Messages with the given CommandId as their command id...
Definition: ampsplusplus.hpp:5439
const char * data() const
Returns the (non-null-terminated) data underlying this field.
Definition: Field.hpp:273
bool isValid() const
Returns true if self is a valid stream that may be iterated.
Definition: ampsplusplus.hpp:5101
amps_result
Return values from amps_xxx functions.
Definition: amps.h:216
FailedWriteHandler * getFailedWriteHandler()
Get the handler that is invoked to report on failed writes.
Definition: ampsplusplus.hpp:5657
Field getAckType() const
Retrieves the value of the AckType header of the Message as a Field which references the underlying b...
Definition: Message.hpp:1192
AMPSDLL amps_handle amps_client_create(const amps_char *clientName)
Functions for creation of an AMPS client.
Field getCommand() const
Retrieves the value of the Command header of the Message as a Field which references the underlying b...
Definition: Message.hpp:1257
MessageStream execute(Command &command_)
Execute the provided command and return messages received in response in a MessageStream.
Definition: ampsplusplus.hpp:9019
void(* amps_transport_filter_function)(const unsigned char *, size_t, short, void *)
Prototype for a user-supplied callback function for filtering data before it is sent and after it is ...
Definition: amps.h:670
Message & setTopic(const std::string &v)
Sets the value of the Topic header for this Message.
Definition: Message.hpp:1511
Store getPublishStore()
Get the publish store used by the client.
Definition: ampsplusplus.hpp:5612
void setThreadCreatedCallback(amps_thread_created_callback callback_, void *userData_)
Sets a callback function on the transport that is called when a new thread is created to receive data...
Definition: ampsplusplus.hpp:7609
unsigned getAckTypeEnum() const
Definition: ampsplusplus.hpp:958
size_t getServerVersion() const
Returns the server version retrieved during logon.
Definition: ampsplusplus.hpp:5330
State
Constants for the state of the connection.
Definition: ampsplusplus.hpp:1505
Command & setData(const std::string &data_)
Sets the data for this command from an existing string.
Definition: ampsplusplus.hpp:829
std::string logon(int timeout_=0, Authenticator &authenticator_=DefaultAuthenticator::instance(), const char *options_=NULL)
Logon to the server, providing the client name, credentials (if available), and client information (s...
Definition: ampsplusplus.hpp:5918
void discardUpTo(amps_uint64_t index_)
Called by Client to indicate that all messages up to and including.
Definition: ampsplusplus.hpp:1250
Command & setFilter(const std::string &filter_)
Definition: ampsplusplus.hpp:692
const std::string & getNameHash() const
Returns the name hash string of this client as generated by the server and returned when the client l...
Definition: ampsplusplus.hpp:5259
StoreImpl(bool errorOnPublishGap_=false)
Default constructor.
Definition: ampsplusplus.hpp:1105
MessageStream sowAndSubscribe(const char *topic_, const std::string &filter_="", const std::string &orderBy_="", const std::string &bookmark_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic and initiates a new subscription on it.
Definition: ampsplusplus.hpp:6582
Client represents a connection to an AMPS server, but does not provide failover or reconnection behav...
Definition: ampsplusplus.hpp:5189
void setGlobalCommandTypeMessageHandler(const std::string &command_, const MessageHandler &handler_)
Sets a handler for all messages of a particular type, or for messages that would be delivered to a pa...
Definition: ampsplusplus.hpp:7182
Command & setSubId(const std::string &subId_)
Definition: ampsplusplus.hpp:718
static const char * BOOKMARK_NOW()
Convenience method for returning the special value to start a subscription at the end of the transact...
Definition: ampsplusplus.hpp:7217
Message & newCommandId()
Creates and sets a new sequential value for the CommandId header for this Message.
Definition: Message.hpp:1364
void addConnectionStateListener(ConnectionStateListener *listener)
Adds a ConnectionStateListener to self&#39;s set of listeners.
Definition: ampsplusplus.hpp:7290
Command & reset(const std::string &command_)
Resets the fields of self, and sets the command to command_.
Definition: ampsplusplus.hpp:583
void clearHttpPreflightHeaders()
Clears all previously set HTTP header lines.
Definition: ampsplusplus.hpp:5308
Command & setTimeout(unsigned timeout_)
Sets the client-side timeout for this command.
Definition: ampsplusplus.hpp:851
void clearConnectionStateListeners()
Clear all listeners from self&#39;s set of ConnectionStateListeners.
Definition: ampsplusplus.hpp:7305
Message & setSequence(const std::string &v)
Sets the value of the Sequence header for this Message.
Definition: Message.hpp:1484
amps_uint64_t publish(const char *topic_, size_t topicLength_, const char *data_, size_t dataLength_)
Publish a message to an AMPS topic, returning the sequence number assigned by the publish store if on...
Definition: ampsplusplus.hpp:5705
Command & setCommandId(const std::string &cmdId_)
Definition: ampsplusplus.hpp:666
Message & setUserId(const std::string &v)
Sets the value of the UserId header for this Message.
Definition: Message.hpp:1515
Field getTopic() const
Retrieves the value of the Topic header of the Message as a Field which references the underlying buf...
Definition: Message.hpp:1511
AMPSDLL void amps_client_set_message_handler(amps_handle client, amps_handler messageHandler, void *userData)
Sets the message handler function for this client.
SubscriptionManager * getSubscriptionManager()
Get the subscription manager being used by the client.
Definition: ampsplusplus.hpp:5568
std::string subscribe(const MessageHandler &messageHandler_, const std::string &topic_, long timeout_=0, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Subscribe to a topic.
Definition: ampsplusplus.hpp:5981
Command(Message::Command::Type command_)
Creates an object to represent the given AMPS command, such as "sow" or "subscribe".
Definition: ampsplusplus.hpp:575
Message & setMessageType(const std::string &v)
Sets the value of the MessageType header for this Message.
Definition: Message.hpp:1374
Field getCommandId() const
Retrieves the value of the CommandId header of the Message as a Field which references the underlying...
Definition: Message.hpp:1364
const ExceptionListener & getExceptionListener(void) const
Returns the exception listener set on this Client.
Definition: ampsplusplus.hpp:7090
Field getQueryId() const
Retrieves the value of the QueryID header of the Message as a Field which references the underlying b...
Definition: Message.hpp:1479
static Authenticator & instance()
Static function to return a static instance used when no Authenticator is supplied to a Client...
Definition: ampsplusplus.hpp:1060
Abstract base class for connection state listeners.
Definition: ampsplusplus.hpp:1501
Message & setSubscriptionId(const std::string &v)
Sets the value of the SubscriptionId header for this Message.
Definition: Message.hpp:1489
virtual void setDisconnectHandler(const DisconnectHandler &disconnectHandler)
Definition: ampsplusplus.hpp:5516
std::string sowDeleteByData(const MessageHandler &messageHandler_, const std::string &topic_, const std::string &data_, long timeout_=0)
Deletes the message whose keys match the message data provided.
Definition: ampsplusplus.hpp:7003
Command & setOptions(const char *options_, size_t optionsLen_)
Sets the options string for this command: see Message.Options for a helper class for constructing the...
Definition: ampsplusplus.hpp:797
Message & newQueryId()
Creates and sets a new sequential value for the QueryID header for this Message.
Definition: Message.hpp:1479
#define AMPS_BOOKMARK_RECENT
Start the subscription at the first undiscarded message in the bookmark store, or at the end of the b...
Definition: BookmarkStore.hpp:47
Message & assignExpiration(const std::string &v)
Assigns the value of the Expiration header for this Message without copying.
Definition: Message.hpp:1367
bool empty() const
Returns &#39;true&#39; if empty, &#39;false&#39; otherwise.
Definition: Field.hpp:128
bool replaySingle(StoreReplayer &replayer_, amps_uint64_t index_)
Called by Client to get a single message replayed by the store onto the StoreReplayer.
Definition: ampsplusplus.hpp:1271
void completed(const std::string &, const std::string &, const std::string &)
Called by Client once a logon completes successfully.
Definition: ampsplusplus.hpp:1055
MessageStream deltaSubscribe(const std::string &topic_, long timeout_, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Delta Subscribe to a topic.
Definition: ampsplusplus.hpp:6086
static const char * EPOCH()
Convenience method for returning the special value to start a subscription at the beginning of the tr...
Definition: ampsplusplus.hpp:7246
Interface for BookmarkStoreImpl classes.
Definition: BookmarkStore.hpp:228
Command & reset(const char *command_, size_t commandLen_)
Resets the fields of self, and sets the command to command_.
Definition: ampsplusplus.hpp:592
Command & setSequence(const char *seq_, size_t seqLen_)
Definition: ampsplusplus.hpp:810
std::string logon(const char *options_, int timeout_=0)
Logon to the server, providing the client name, credentials (if available) client information (such a...
Definition: ampsplusplus.hpp:5937
size_t len() const
Returns the length of the data underlying this field.
Definition: Field.hpp:280
virtual void completed(const std::string &userName_, const std::string &password_, const std::string &reason_)=0
Called by Client once a logon completes successfully.
Message & setClientName(const std::string &v)
Sets the value of the ClientName header for this Message.
Definition: Message.hpp:1365
Field getSowKey() const
Retrieves the value of the SowKey header of the Message as a Field which references the underlying bu...
Definition: Message.hpp:1486
#define AMPS_BOOKMARK_EPOCH
Start the subscription at the beginning of the journal.
Definition: BookmarkStore.hpp:51
A default implementation of Authenticator that only uses an unchanged password and does not implement...
Definition: ampsplusplus.hpp:1037
void setLogonCorrelationData(const std::string &logonCorrelationData_)
Sets the logon correlation data for the client.
Definition: ampsplusplus.hpp:5278
Message & setCommand(const std::string &v)
Sets the value of the Command header for this Message.
Definition: Message.hpp:1257
_FIXBuilder(char fieldSep_=(char) 1)
Construct an instance of _FIXBuilder, using the specified separator between fields.
Definition: ampsplusplus.hpp:8377
void addHttpPreflightHeader(const std::string &header_)
Adds a given HTTP header line to the end of the headers that will be sent for the HTTP GET Upgrade re...
Definition: ampsplusplus.hpp:5293
void ack(const std::string &topic_, const std::string &bookmark_, const char *options_=NULL)
Acknowledge a message queue message by supplying a topic and bookmark string: this adds the ack to th...
Definition: ampsplusplus.hpp:7455
std::string retry(const std::string &, const std::string &)
Throws an AuthenticationException because retry is not implemented.
Definition: ampsplusplus.hpp:1050
Message & assignSubscriptionId(const std::string &v)
Assigns the value of the SubscriptionId header for this Message without copying.
Definition: Message.hpp:1489
bool(* PublishStoreResizeHandler)(Store store_, size_t size_, void *userData_)
Function type for PublishStore resize events The store_ param is store which is resizing.
Definition: ampsplusplus.hpp:1091
AMPSDLL void amps_message_get_field_value(amps_handle message, FieldId field, const amps_char **value_ptr, size_t *length_ptr)
Retrieves the value of a header field in an AMPS message.
MessageStream sowAndSubscribe(const char *topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, bool oofEnabled_=false, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic and initiates a new subscription on it.
Definition: ampsplusplus.hpp:6472
virtual void setResizeHandler(PublishStoreResizeHandler handler_, void *userData_)
Set a handler to be called if the Store needs to resize in order to keep storing messages.
Definition: ampsplusplus.hpp:1191
void append(const T &tag, const char *value, size_t offset, size_t length)
Write a field with the provided tag and value to the message being constructed.
Definition: ampsplusplus.hpp:8386
Message & assignCommand(const std::string &v)
Assigns the value of the Command header for this Message without copying.
Definition: Message.hpp:1257
#define AMPS_BOOKMARK_NOW
Start the subscription at the point in time when AMPS processes the subscription. ...
Definition: BookmarkStore.hpp:55
void setResizeHandler(PublishStoreResizeHandler handler_, void *userData_)
Set a handler to be called if the Store needs to resize in order to keep storing messages.
Definition: ampsplusplus.hpp:1331
virtual std::string retry(const std::string &userName_, const std::string &password_)=0
Called by Client when a logon ack is received with a status of retry.
BookmarkStore getBookmarkStore()
Get the bookmark store being used by the client.
Definition: ampsplusplus.hpp:5560
void setHttpPreflightHeaders(const T &headers_)
Sets the given HTTP header lines to be sent for the HTTP GET Upgrade request.
Definition: ampsplusplus.hpp:5317
bool isValid() const
Method to return if there is an underlying implementation for the Store.
Definition: ampsplusplus.hpp:1288
void setAckBatchSize(const unsigned ackBatchSize_)
Sets the queue ack batch size setting.
Definition: ampsplusplus.hpp:7516
Field getPassword() const
Retrieves the value of the Password header of the Message as a Field which references the underlying ...
Definition: Message.hpp:1478
Message & setTopNRecordsReturned(const std::string &v)
Sets the value of the TopNRecordsReturned header for this Message.
Definition: Message.hpp:1513
Class to handle when a client receives a duplicate publish message, or not entitled message...
Definition: ampsplusplus.hpp:1381
virtual void setFailedResubscribeHandler(std::shared_ptr< FailedResubscribeHandler > handler_)
Set a handler to deal with failing subscriptions after a failover event.
Definition: ampsplusplus.hpp:1489
void setPublishBatching(size_t batchSize_, amps_uint64_t batchTimeoutMillis_)
Sets the max bytes to cache and max timeout in millis for caching delta_publish and publish commands...
Definition: ampsplusplus.hpp:7621
Message & setSowKeys(const std::string &v)
Sets the value of the SowKeys header for this Message.
Definition: Message.hpp:1487
void setBookmarkStore(const BookmarkStore &bookmarkStore_)
Set the bookmark store to be used by the client.
Definition: ampsplusplus.hpp:5552
This class multiplexes messages from AMPS to multiple subscribers and uses the stream of acks from AM...
Definition: MessageRouter.hpp:149
Message & setExpiration(const std::string &v)
Sets the value of the Expiration header for this Message.
Definition: Message.hpp:1367
virtual std::string authenticate(const std::string &userName_, const std::string &password_)=0
Called by Client just before the logon command is sent.
Command & setOrderBy(const std::string &orderBy_)
Definition: ampsplusplus.hpp:705
void setExceptionListener(const ExceptionListener &listener_)
Definition: ampsplusplus.hpp:7083
Command & reset(Message::Command::Type command_)
Resets the fields of self, and sets the command to command_.
Definition: ampsplusplus.hpp:600
void setSubscriptionManager(SubscriptionManager *subscriptionManager_)
Set the subscription manager to be used by the client.
Definition: ampsplusplus.hpp:5580
void setUnhandledMessageHandler(const AMPS::MessageHandler &messageHandler)
Definition: ampsplusplus.hpp:7149
void append(const T &tag, const std::string &value)
Write a field with the provided tag and value to the message being constructed.
Definition: ampsplusplus.hpp:8397
unsigned getAckBatchSize(void) const
Returns the value of the queue ack batch size setting.
Definition: ampsplusplus.hpp:7506
amps_uint64_t deltaPublish(const std::string &topic_, const std::string &data_)
Publish the changed fields of a message to an AMPS topic.
Definition: ampsplusplus.hpp:5823
MessageStream sowAndSubscribe(const std::string &topic_, const std::string &filter_="", const std::string &orderBy_="", const std::string &bookmark_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic and initiates a new subscription on it.
Definition: ampsplusplus.hpp:6559
Command & setExpiration(unsigned expiration_)
Set the expiration time for a publish command.
Definition: ampsplusplus.hpp:889
AMPSDLL void amps_client_set_predisconnect_handler(amps_handle client, amps_predisconnect_handler predisconnectHandler, void *userData)
Sets the predisconnect handler function to be called when a disconnect occurs.
Represents an iterator over messages in an AMPS topic.
Definition: ampsplusplus.hpp:5063
size_t log(Message &message_)
Log a bookmark to the persistent log.
Definition: BookmarkStore.hpp:268
MessageStream sowAndDeltaSubscribe(const char *topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, bool oofEnabled_=false, bool sendEmpties_=false, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic and initiates a new delta subscription on it.
Definition: ampsplusplus.hpp:6805
size_t unpersistedCount() const
Method to return how many messages are in the store that have not been discarded, indicating that the...
Definition: ampsplusplus.hpp:1280
AMPSDLL amps_result amps_client_send_batch(amps_handle client, amps_handle message, unsigned *version_out, int addToBatch)
Adds a message to the send cache, possibly sending the cache.
Command & setData(const char *data_, size_t dataLen_)
Sets the data for this command.
Definition: ampsplusplus.hpp:837
Message & assignAckType(const std::string &v)
Assigns the value of the AckType header for this Message without copying.
Definition: Message.hpp:1192
void publishFlush(long timeout_=0, unsigned ackType_=Message::AckType::Processed)
Ensure that AMPS messages are sent and have been processed by the AMPS server.
Definition: ampsplusplus.hpp:5802
std::string executeAsync(Command &command_, MessageHandler handler_)
Execute the provided command and, once AMPS acknowledges the command, process messages in response to...
Definition: ampsplusplus.hpp:7335
Command & setSowKey(const char *sowKey_, size_t sowKeyLen_)
Sets the SowKey field of the command, typically used for a publish command to a topic in the state of...
Definition: ampsplusplus.hpp:625
MessageStream sowAndDeltaSubscribe(const std::string &topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, bool oofEnabled_=false, bool sendEmpties_=false, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic and initiates a new delta subscription on it.
Definition: ampsplusplus.hpp:6764
std::string logon(const std::string &options_, int timeout_=0)
Logon to the server, providing the client name, credentials (if available) client information (such a...
Definition: ampsplusplus.hpp:5956
void unsubscribe()
Unsubscribe from all topics.
Definition: ampsplusplus.hpp:6235
void setAutoAck(bool isAutoAckEnabled_)
Sets the queue auto-ack setting on this client.
Definition: ampsplusplus.hpp:7498
MessageStream deltaSubscribe(const char *topic_, long timeout_, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Delta Subscribe to a topic.
Definition: ampsplusplus.hpp:6104
Message & assignCorrelationId(const std::string &v)
Assigns the value of the CorrelationId header for this Message without copying.
Definition: Message.hpp:1366
amps_uint64_t store(const Message &message_)
Called by Client to store a message being published.
Definition: ampsplusplus.hpp:1239
std::string sowAndDeltaSubscribe(const MessageHandler &messageHandler_, const std::string &topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, bool oofEnabled_=false, bool sendEmpties_=false, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic and initiates a new delta subscription on it.
Definition: ampsplusplus.hpp:6728
Command & setBatchSize(unsigned batchSize_)
Sets the batch size for this command, which controls how many records are sent together in the result...
Definition: ampsplusplus.hpp:873
void setExceptionListener(const std::shared_ptr< const ExceptionListener > &pListener_)
Sets the exception listener for exceptions that are not thrown back to the user (for example...
Definition: ampsplusplus.hpp:7067
StoreImpl * get()
Used to get a pointer to the implementation.
Definition: ampsplusplus.hpp:1363
MessageHandler getDuplicateMessageHandler(void)
Returns the callback function that is invoked when a duplicate message is detected.
Definition: ampsplusplus.hpp:5635
Command & setQueryId(const char *queryId_, size_t queryIdLen_)
Definition: ampsplusplus.hpp:738
DisconnectHandler getDisconnectHandler(void) const
Definition: ampsplusplus.hpp:5530
Class for parsing a FIX format message into a std::map of keys and values, where the keys and values ...
Definition: ampsplusplus.hpp:8441
Field represents the value of a single field in a Message.
Definition: Field.hpp:86
Message & setAckType(const std::string &v)
Sets the value of the AckType header for this Message.
Definition: Message.hpp:1192
virtual ConnectionInfo getConnectionInfo() const
Get the connection information for the current connection.
Definition: ampsplusplus.hpp:5539
Message & setOptions(const std::string &v)
Sets the value of the Options header for this Message.
Definition: Message.hpp:1406
void ack(Field &topic_, Field &bookmark_, const char *options_=NULL)
Acknowledge a message queue message by supplying a topic and bookmark: this adds the ack to the curre...
Definition: ampsplusplus.hpp:7431
Abstract base class where you can implement handling of exceptions that occur when a SubscriptionMana...
Definition: ampsplusplus.hpp:1439
AMPSDLL amps_result amps_client_send(amps_handle client, amps_handle message)
Sends a message to the AMPS server.
std::map< Message::Field, Message::Field > map_type
Convenience defintion for the std::map specialization used for this class.
Definition: ampsplusplus.hpp:8453
Command & setTopic(const char *topic_, size_t topicLen_)
Definition: ampsplusplus.hpp:686
Handle class for StoreImpl classes that track publish messages.
Definition: ampsplusplus.hpp:1223
map_type toMap(const Message::Field &data)
Returns the key/value pairs within the message, represented as AMPS::Field objects that contain point...
Definition: ampsplusplus.hpp:8460
Exception listener for unhandled exceptions.
Definition: ampsplusplus.hpp:204
MessageStream sowAndDeltaSubscribe(const std::string &topic_, const std::string &filter_="", const std::string &orderBy_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic and initiates a new delta subscription on it.
Definition: ampsplusplus.hpp:6661
void setHeartbeat(unsigned heartbeatTime_)
Requests heartbeating with the AMPS server.
Definition: ampsplusplus.hpp:7140
MessageStream subscribe(const char *topic_, long timeout_=0, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Subscribe to a topic.
Definition: ampsplusplus.hpp:6039
An iterable object representing the results of an AMPS subscription and/or query. ...
Definition: ampsplusplus.hpp:5055
bool getErrorOnPublishGap() const
Called to check if the Store will throw PublishStoreGapException.
Definition: ampsplusplus.hpp:1355
Command & setSequence(const std::string &seq_)
Definition: ampsplusplus.hpp:803
static const char * MOST_RECENT()
Convenience method for returning the special value to start a subscription at a recovery point based ...
Definition: ampsplusplus.hpp:7268
Command & setQueryId(const std::string &queryId_)
Definition: ampsplusplus.hpp:731
AMPSDLL amps_result amps_client_set_thread_created_callback(amps_handle client, amps_thread_created_callback callback, void *userData)
Sets a user-supplied callback function to allow thread attributes to set when a new thread is created...
Message sowDelete(const std::string &topic_, const std::string &filter_, long timeout_=0)
Deletes one or more messages from a topic&#39;s SOW cache.
Definition: ampsplusplus.hpp:6867
Message & setCommandId(const std::string &v)
Sets the value of the CommandId header for this Message.
Definition: Message.hpp:1364
Command & setSequence(const amps_uint64_t seq_)
Definition: ampsplusplus.hpp:816
static const char * BOOKMARK_RECENT()
Convenience method for returning the special value to start a subscription at a recovery point based ...
Definition: ampsplusplus.hpp:7279
Message & setQueryId(const std::string &v)
Sets the value of the QueryID header for this Message.
Definition: Message.hpp:1479
amps_result(* amps_thread_created_callback)(AMPS_THREAD_T, void *)
Prototype for a user-supplied callback function to allow thread attributes to be set when a new threa...
Definition: amps.h:696
amps_uint64_t getLowestUnpersisted()
Get the oldest unpersisted message sequence in the store.
Definition: ampsplusplus.hpp:1309
Command & setTopic(const std::string &topic_)
Definition: ampsplusplus.hpp:679
MessageStream maxDepth(unsigned maxDepth_)
Sets the maximum number of messages that can be held in the underlying queue.
Definition: ampsplusplus.hpp:9000
Message & assignVersion(const std::string &v)
Assigns the value of the Version header for this Message without copying.
Definition: Message.hpp:1514
void disconnect()
Disconnect from an AMPS server.
Definition: ampsplusplus.hpp:5408
MessageStream sow(const std::string &topic_, const std::string &filter_="", const std::string &orderBy_="", const std::string &bookmark_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic.
Definition: ampsplusplus.hpp:6308
unsigned getDepth(void) const
Gets the current number of messages held in the underlying queue.
Definition: ampsplusplus.hpp:9009
Field getQueryID() const
Retrieves the value of the QueryID header of the Message as a Field which references the underlying b...
Definition: Message.hpp:1479
The operation has not succeeded, but ought to be retried.
Definition: amps.h:245
const std::string & getLogonCorrelationData() const
Returns the currently set logon correlation data for the client.
Definition: ampsplusplus.hpp:5285
void setHeartbeat(unsigned heartbeatTime_, unsigned readTimeout_)
Requests heartbeating with the AMPS server.
Definition: ampsplusplus.hpp:7116
Command & setTopN(unsigned topN_)
Definition: ampsplusplus.hpp:857
AMPSDLL amps_result amps_client_set_read_timeout(amps_handle client, int readTimeout)
Sets a read timeout (seconds), in which if no message is received, the connection is presumed dead...
const std::string & getURI() const
Returns the last URI this client is connected to.
Definition: ampsplusplus.hpp:5377
Command & addAckType(const std::string &ackType_)
Definition: ampsplusplus.hpp:895
void replay(StoreReplayer &replayer_)
Called by Client to get all stored and non-discarded messages replayed by the store onto the StoreRep...
Definition: ampsplusplus.hpp:1259
static const char * BOOKMARK_MOST_RECENT()
Convenience method for returning the special value to start a subscription at a recovery point based ...
Definition: ampsplusplus.hpp:7257
Message & setBookmark(const std::string &v)
Sets the value of the Bookmark header for this Message.
Definition: Message.hpp:1256
void deepCopy(const Field &orig_)
Makes self a deep copy of the original field.
Definition: Field.hpp:218
Command & setOrderBy(const char *orderBy_, size_t orderByLen_)
Definition: ampsplusplus.hpp:712
unsigned getAckTypeEnum() const
Decode self&#39;s "ack type" field and return the corresponding bitmask of values from AckType...
Definition: Message.hpp:1222
BookmarkStoreImpl * get()
Used to get a pointer to the implementation.
Definition: BookmarkStore.hpp:465
amps_handle getHandle()
Returns the underlying amps_handle for this client, to be used with amps_client_* functions from the ...
Definition: ampsplusplus.hpp:7054
AMPSDLL amps_result amps_client_attempt_reconnect(amps_handle client, unsigned version)
Manually invokes the user-supplied disconnect handler for this client.
AMPSDLL void amps_client_destroy(amps_handle handle)
Disconnects and destroys an AMPS client object.
void reset()
Clear all data from the builder.
Definition: ampsplusplus.hpp:8414
void setFailedWriteHandler(FailedWriteHandler *handler_)
Set the handler that is invoked to report when a publish fails, for example if the publisher is not e...
Definition: ampsplusplus.hpp:5649
MessageStream sowAndDeltaSubscribe(const char *topic_, const std::string &filter_="", const std::string &orderBy_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic and initiates a new delta subscription on it.
Definition: ampsplusplus.hpp:6683
void setAckTimeout(const int ackTimeout_)
Sets the message queue ack timeout value.
Definition: ampsplusplus.hpp:7539
Command & setSowKeys(const std::string &sowKeys_)
Sets the SowKeys for the command.
Definition: ampsplusplus.hpp:642
Message & setPassword(const std::string &v)
Sets the value of the Password header for this Message.
Definition: Message.hpp:1478
bool getAutoAck(void) const
Returns the value of the queue auto-ack setting.
Definition: ampsplusplus.hpp:7488
amps_uint64_t publish(const std::string &topic_, const std::string &data_)
Publish a message to an AMPS topic, returning the sequence number assigned by the publish store if on...
Definition: ampsplusplus.hpp:5680
Definition: ampsplusplus.hpp:103
void setName(const std::string &name)
Sets the name of this client, assuming no name was provided previously.
Definition: ampsplusplus.hpp:5244
The interface for handling authentication with the AMPS server.
Definition: ampsplusplus.hpp:1006
void flush(long timeout_=0)
Method to wait for the Store to discard everything that has been stored up to the point in time when ...
Definition: ampsplusplus.hpp:1301
Field getBookmark() const
Retrieves the value of the Bookmark header of the Message as a Field which references the underlying ...
Definition: Message.hpp:1256
bool DangerousFlushPublishStoreResizeHandler(Store store_, size_t, void *data_)
PublishStoreResizeHandler that will block up to the timeout specified in user data milliseconds tryin...
Definition: ampsplusplus.hpp:1411
static amps_uint64_t getUnsetSequence()
Method to return the value used to represent no such sequence.
Definition: ampsplusplus.hpp:1167
Command & setCorrelationId(const char *correlationId_, size_t correlationIdLen_)
Set the correlation ID for this command.
Definition: ampsplusplus.hpp:782
MessageStream sow(const char *topic_, const std::string &filter_="", const std::string &orderBy_="", const std::string &bookmark_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic.
Definition: ampsplusplus.hpp:6329
amps_uint64_t publish(const std::string &topic_, const std::string &data_, unsigned long expiration_)
Publish a message to an AMPS topic, returning the sequence number assigned by the publish store (if a...
Definition: ampsplusplus.hpp:5729
void setLastChanceMessageHandler(const AMPS::MessageHandler &messageHandler)
Sets the message handler called when no other handler matches.
Definition: ampsplusplus.hpp:7156
Message::Field getMostRecent(const Message::Field &subId_)
Returns the most recent bookmark from the log that ought to be used for (re-)subscriptions.
Definition: BookmarkStore.hpp:310
amps_uint64_t deltaPublish(const char *topic_, size_t topicLength_, const char *data_, size_t dataLength_, unsigned long expiration_)
Publish the changed fields of a message to an AMPS topic.
Definition: ampsplusplus.hpp:5895
The client and server are disconnected.
Definition: amps.h:249
Message sowDeleteByData(const std::string &topic_, const std::string &data_, long timeout_=0)
Deletes the message whose keys match the message data provided.
Definition: ampsplusplus.hpp:7024
std::string deltaSubscribe(const MessageHandler &messageHandler_, const std::string &topic_, long timeout_, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Delta Subscribe to a topic.
Definition: ampsplusplus.hpp:6068
MessageStream timeout(unsigned timeout_)
Sets the maximum time to wait for the next message in milliseconds; if no message is available within...
Definition: ampsplusplus.hpp:8990
std::string sow(const MessageHandler &messageHandler_, const std::string &topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic.
Definition: ampsplusplus.hpp:6370
const std::string & getName() const
Returns the name of this client passed in the constructor.
Definition: ampsplusplus.hpp:5251
Command is an encapsulation of a single AMPS command sent by the client.
Definition: ampsplusplus.hpp:472
std::string sowAndSubscribe(const MessageHandler &messageHandler_, const std::string &topic_, const std::string &filter_="", const std::string &orderBy_="", const std::string &bookmark_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic and initiates a new subscription on it.
Definition: ampsplusplus.hpp:6520
Message & setBatchSize(const std::string &v)
Sets the value of the BatchSize header for this Message.
Definition: Message.hpp:1255
MessageStream subscribe(const std::string &topic_, long timeout_=0, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Subscribe to a topic.
Definition: ampsplusplus.hpp:6007
MessageStream bookmarkSubscribe(const char *topic_, long timeout_, const std::string &bookmark_, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Subscribe to a topic using a bookmark.
Definition: ampsplusplus.hpp:6195
Command & setSowKey(const std::string &sowKey_)
Sets the SowKey field of the command, typically used for a publish command to a topic in the state of...
Definition: ampsplusplus.hpp:612
Message & assignSequence(const std::string &v)
Assigns the value of the Sequence header for this Message without copying.
Definition: Message.hpp:1484
std::string executeAsyncNoResubscribe(Command &command_, MessageHandler handler_)
Execute the provided command and, once AMPS acknowledges the command, process messages in response to...
Definition: ampsplusplus.hpp:7369
Command & setCorrelationId(const std::string &correlationId_)
Set the correlation ID for this command.
Definition: ampsplusplus.hpp:770
std::string sowDelete(const MessageHandler &messageHandler, const std::string &topic, const std::string &filter, long timeout)
Deletes one or more messages from a topic&#39;s SOW cache.
Definition: ampsplusplus.hpp:6844
iterator end(void)
Returns an iterator representing the end of the topic or subscription.
Definition: ampsplusplus.hpp:5119
bool getRetryOnDisconnect(void) const
Returns true if automatic retry of a command to AMPS after a reconnect is enabled.
Definition: ampsplusplus.hpp:7566
void flushAcks(void)
Sends any queued message queue ack messages to the server immediately.
Definition: ampsplusplus.hpp:7479
std::string sowAndSubscribe(const MessageHandler &messageHandler_, const std::string &topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, bool oofEnabled_=false, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic and initiates a new subscription on it.
Definition: ampsplusplus.hpp:6402