AMPS C/C++ Client Class Reference
AMPS C/C++ Client Version 5.3.5.0
ampsplusplus.hpp
Go to the documentation of this file.
1 //
3 // Copyright (c) 2010-2025 60East Technologies Inc., All Rights Reserved.
4 //
5 // This computer software is owned by 60East Technologies Inc. and is
6 // protected by U.S. copyright laws and other laws and by international
7 // treaties. This computer software is furnished by 60East Technologies
8 // Inc. pursuant to a written license agreement and may be used, copied,
9 // transmitted, and stored only in accordance with the terms of such
10 // license agreement and with the inclusion of the above copyright notice.
11 // This computer software or any other copies thereof may not be provided
12 // or otherwise made available to any other person.
13 //
14 // U.S. Government Restricted Rights. This computer software: (a) was
15 // developed at private expense and is in all respects the proprietary
16 // information of 60East Technologies Inc.; (b) was not developed with
17 // government funds; (c) is a trade secret of 60East Technologies Inc.
18 // for all purposes of the Freedom of Information Act; and (d) is a
19 // commercial item and thus, pursuant to Section 12.212 of the Federal
20 // Acquisition Regulations (FAR) and DFAR Supplement Section 227.7202,
21 // Government's use, duplication or disclosure of the computer software
22 // is subject to the restrictions set forth by 60East Technologies Inc..
23 //
25 #ifndef _AMPSPLUSPLUS_H_
26 #define _AMPSPLUSPLUS_H_
27 #include "amps/amps.h"
28 #include "amps/ampsver.h"
29 #include <string>
30 #include <map>
31 #include <sstream>
32 #include <iostream>
33 #include <memory>
34 #include <stdexcept>
35 #include <limits.h>
36 #include <list>
37 #include <memory>
38 #include <regex>
39 #include <set>
40 #include <deque>
41 #include <vector>
42 #include <assert.h>
43 #ifndef _WIN32
44  #include <inttypes.h>
45 #endif
46 #if defined(sun)
47  #include <sys/atomic.h>
48 #endif
49 #include "amps/BookmarkStore.hpp"
50 #include "amps/MessageRouter.hpp"
51 #include "amps/util.hpp"
52 #include "amps/ampscrc.hpp"
53 #if __cplusplus >= 201100L || _MSC_VER >= 1900
54  #include <atomic>
55 #endif
56 
57 #ifndef AMPS_TESTING_SLOW_MESSAGE_STREAM
58  #define AMPS_TESTING_SLOW_MESSAGE_STREAM
59 #endif
60 
65 
66 
73 
84 
85 // For StoreBuffer implementations
86 #define AMPS_MEMORYBUFFER_DEFAULT_BUFFERS 10
87 #define AMPS_MEMORYBUFFER_DEFAULT_LENGTH 40960
88 #define AMPS_SUBSCRIPTION_MANAGER_DEFAULT_TIMEOUT 0
89 #define AMPS_HACLIENT_TIMEOUT_DEFAULT 10000
90 #define AMPS_HACLIENT_RECONNECT_DEFAULT 200
91 #define AMPS_DEFAULT_COMMAND_TIMEOUT 5000
92 #define AMPS_DEFAULT_TOP_N -1
93 #define AMPS_DEFAULT_BATCH_SIZE 10
94 #define AMPS_NUMBER_BUFFER_LEN 20
95 #define AMPS_DEFAULT_QUEUE_ACK_TIMEOUT 1000
96 
97 #if defined(_M_X64) || defined(__x86_64) || defined(_WIN64)
98  #define AMPS_X64 1
99 #endif
100 
101 static thread_local AMPS::Message publishStoreMessage;
102 
103 namespace AMPS
104 {
105 
// String-to-string map type; presumably holds named connection properties
// (property name -> value) -- confirm against the code that fills it.
using ConnectionInfo = std::map<std::string, std::string>;
107 
// Formats any value that can be streamed with operator<< and returns the
// resulting text.
template<class Type>
inline std::string asString(Type x_)
{
  std::ostringstream formatted;
  formatted << x_;
  return formatted.str();
}
115 
116  inline
117  size_t convertToCharArray(char* buf_, amps_uint64_t seqNo_)
118  {
119  size_t pos = AMPS_NUMBER_BUFFER_LEN;
120  for (int i = 0; i < AMPS_NUMBER_BUFFER_LEN; ++i)
121  {
122  if (seqNo_ > 0)
123  {
124  buf_[--pos] = (char)(seqNo_ % 10 + '0');
125  seqNo_ /= 10;
126  }
127  }
128  return pos;
129  }
130 
#ifdef _WIN32
// Windows-only overload for unsigned long. Writes the decimal digits of
// seqNo_ right-aligned into buf_ (at least AMPS_NUMBER_BUFFER_LEN chars)
// and returns the index of the first digit. A seqNo_ of 0 writes nothing
// and returns AMPS_NUMBER_BUFFER_LEN.
inline
size_t convertToCharArray(char* buf_, unsigned long seqNo_)
{
  size_t start = AMPS_NUMBER_BUFFER_LEN;
  // Peel off the least-significant digit each pass, filling backwards.
  while (seqNo_ > 0 && start > 0)
  {
    buf_[--start] = (char)(seqNo_ % 10 + '0');
    seqNo_ /= 10;
  }
  return start;
}
#endif
147 
// Named accessors for fixed reason strings. Presumably these are compared
// against the reason reported in a server acknowledgement -- confirm at the
// call sites. Each accessor returns a pointer to a string literal.
class Reason
{
public:
  static const char* duplicate()                 { return "duplicate"; }
  static const char* badFilter()                 { return "bad filter"; }
  static const char* badRegexTopic()             { return "bad regex topic"; }
  static const char* subscriptionAlreadyExists() { return "subscription already exists"; }
  static const char* nameInUse()                 { return "name in use"; }
  static const char* authFailure()               { return "auth failure"; }
  static const char* notEntitled()               { return "not entitled"; }
  static const char* authDisabled()              { return "authentication disabled"; }
  static const char* subidInUse()                { return "subid in use"; }
  static const char* noTopic()                   { return "no topic"; }
};
195 
// Callback interface for reporting exceptions. The exception wrapper macros
// in this file forward caught std::exception objects to exceptionThrown().
// (The class header line was missing from this listing; it is restored here
// from the destructor name.)
class ExceptionListener
{
public:
  virtual ~ExceptionListener() {;}
  // Called with the exception that was caught. The default implementation
  // ignores it; override to log or otherwise react.
  virtual void exceptionThrown(const std::exception&) const {;}
};
210 
212 
213 
// Exception wrapper macros.
//
// All of these report std::exception-derived exceptions to an
// ExceptionListener-style object through exceptionThrown(), and swallow
// anything the listener itself throws.
//
// Note : we don't attempt to trap non std::exception exceptions (no
// trailing catch (...) around the wrapped statement) because doing so
// interferes with pthread_exit on some OSes.
//
// The _2 variants previously had separate _WIN32 and non-Windows copies
// that differed only in whether the MessageStreamFullException catch
// parameter was named. The parameter is never used, and naming it ex_
// was shadowed by the inner handler, so a single unnamed form is used on
// every platform.

// Runs statement x; reports any std::exception to the _exceptionListener
// that is in scope at the point of use.
#define AMPS_CALL_EXCEPTION_WRAPPER(x) \
  try\
  {\
    x;\
  }\
  catch (std::exception& ex_)\
  {\
    try\
    {\
      _exceptionListener->exceptionThrown(ex_);\
    }\
    catch(...)\
    {\
      ;\
    }\
  }

// Runs statement x while me->_connected is true. If x throws
// MessageStreamFullException, me->checkAndSendHeartbeat(false) is called
// and x is retried; if the heartbeat call itself throws a std::exception
// that exception is reported and the loop ends. Any other std::exception
// thrown by x is reported to me->_exceptionListener.
#define AMPS_CALL_EXCEPTION_WRAPPER_2(me,x) \
  try\
  {\
    while(me->_connected)\
    {\
      try\
      {\
        x;\
        break;\
      }\
      catch(MessageStreamFullException&)\
      {\
        try\
        {\
          me->checkAndSendHeartbeat(false);\
        }\
        catch (std::exception& ex_)\
        {\
          try\
          {\
            me->_exceptionListener->exceptionThrown(ex_);\
          }\
          catch(...)\
          {\
            ;\
          }\
          break;\
        }\
      }\
    }\
  }\
  catch (std::exception& ex_)\
  {\
    try\
    {\
      me->_exceptionListener->exceptionThrown(ex_);\
    }\
    catch(...)\
    {\
      ;\
    }\
  }

// Same retry loop as AMPS_CALL_EXCEPTION_WRAPPER_2 but without the outer
// handler: exceptions from x other than MessageStreamFullException
// propagate to the caller.
#define AMPS_CALL_EXCEPTION_WRAPPER_STREAM_FULL_2(me, x)\
  while(me->_connected)\
  {\
    try\
    {\
      x;\
      break;\
    }\
    catch(MessageStreamFullException&)\
    {\
      try\
      {\
        me->checkAndSendHeartbeat(false);\
      }\
      catch (std::exception& ex_)\
      {\
        try\
        {\
          me->_exceptionListener->exceptionThrown(ex_);\
        }\
        catch(...)\
        {\
          ;\
        }\
        break;\
      }\
    }\
  }

// Reports an already-caught exception ex to the _exceptionListener in
// scope, ignoring anything the listener throws.
#define AMPS_UNHANDLED_EXCEPTION(ex) \
  try\
  {\
    _exceptionListener->exceptionThrown(ex);\
  }\
  catch(...)\
  {\
    ;\
  }

// Reports an already-caught exception ex to me->_exceptionListener,
// ignoring anything the listener throws.
#define AMPS_UNHANDLED_EXCEPTION_2(me,ex) \
  try\
  {\
    me->_exceptionListener->exceptionThrown(ex);\
  }\
  catch(...)\
  {\
    ;\
  }
443 
444 
445  class Client;
446 
471 
472  class Command
473  {
474  Message _message;
475  unsigned _timeout;
476  unsigned _batchSize;
477  unsigned _flags;
478  static const unsigned Subscribe = 1;
479  static const unsigned SOW = 2;
480  static const unsigned NeedsSequenceNumber = 4;
481  static const unsigned ProcessedAck = 8;
482  static const unsigned StatsAck = 16;
483  void init(Message::Command::Type command_)
484  {
485  _timeout = 0;
486  _batchSize = 0;
487  _flags = 0;
488  _message.reset();
489  _message.setCommandEnum(command_);
490  _setIds();
491  }
492  void init(const std::string& command_)
493  {
494  _timeout = 0;
495  _batchSize = 0;
496  _flags = 0;
497  _message.reset();
498  _message.setCommand(command_);
499  _setIds();
500  }
501  void init(const char* command_, size_t commandLen_)
502  {
503  _timeout = 0;
504  _batchSize = 0;
505  _flags = 0;
506  _message.reset();
507  _message.setCommand(command_, commandLen_);
508  _setIds();
509  }
510  void _setIds(void)
511  {
512  Message::Command::Type command = _message.getCommandEnum();
513  if (!(command & Message::Command::NoDataCommands))
514  {
515  _message.newCommandId();
516  if (command == Message::Command::Subscribe ||
517  command == Message::Command::SOWAndSubscribe ||
518  command == Message::Command::DeltaSubscribe ||
519  command == Message::Command::SOWAndDeltaSubscribe)
520  {
521  _message.setSubscriptionId(_message.getCommandId());
522  _flags |= Subscribe;
523  }
524  if (command == Message::Command::SOW
525  || command == Message::Command::SOWAndSubscribe
526  || command == Message::Command::SOWAndDeltaSubscribe)
527  {
528  _message.setQueryID(_message.getCommandId());
529  if (_batchSize == 0)
530  {
531  setBatchSize(AMPS_DEFAULT_BATCH_SIZE);
532  }
533  if (command == Message::Command::SOW)
534  {
535  _flags |= SOW;
536  }
537  }
538  _flags |= ProcessedAck;
539  }
540  else if (command == Message::Command::SOWDelete)
541  {
542  _message.newCommandId();
543  _flags |= ProcessedAck;
544  _flags |= NeedsSequenceNumber;
545  }
546  else if (command == Message::Command::Publish
547  || command == Message::Command::DeltaPublish)
548  {
549  _flags |= NeedsSequenceNumber;
550  }
551  else if (command == Message::Command::StopTimer)
552  {
553  _message.newCommandId();
554  }
555  }
556  public:
560  Command(const std::string& command_)
561  {
562  init(command_);
563  }
568  Command(const char* command_, size_t commandLen_)
569  {
570  init(command_, commandLen_);
571  }
575  Command(Message::Command::Type command_)
576  {
577  init(command_);
578  }
579 
583  Command& reset(const std::string& command_)
584  {
585  init(command_);
586  return *this;
587  }
592  Command& reset(const char* command_, size_t commandLen_)
593  {
594  init(command_, commandLen_);
595  return *this;
596  }
600  Command& reset(Message::Command::Type command_)
601  {
602  init(command_);
603  return *this;
604  }
612  Command& setSowKey(const std::string& sowKey_)
613  {
614  _message.setSowKey(sowKey_);
615  return *this;
616  }
625  Command& setSowKey(const char* sowKey_, size_t sowKeyLen_)
626  {
627  _message.setSowKey(sowKey_, sowKeyLen_);
628  return *this;
629  }
642  Command& setSowKeys(const std::string& sowKeys_)
643  {
644  _message.setSowKeys(sowKeys_);
645  return *this;
646  }
660  Command& setSowKeys(const char* sowKeys_, size_t sowKeysLen_)
661  {
662  _message.setSowKeys(sowKeys_, sowKeysLen_);
663  return *this;
664  }
666  Command& setCommandId(const std::string& cmdId_)
667  {
668  _message.setCommandId(cmdId_);
669  return *this;
670  }
673  Command& setCommandId(const char* cmdId_, size_t cmdIdLen_)
674  {
675  _message.setCommandId(cmdId_, cmdIdLen_);
676  return *this;
677  }
679  Command& setTopic(const std::string& topic_)
680  {
681  _message.setTopic(topic_);
682  return *this;
683  }
686  Command& setTopic(const char* topic_, size_t topicLen_)
687  {
688  _message.setTopic(topic_, topicLen_);
689  return *this;
690  }
692  Command& setFilter(const std::string& filter_)
693  {
694  _message.setFilter(filter_);
695  return *this;
696  }
699  Command& setFilter(const char* filter_, size_t filterLen_)
700  {
701  _message.setFilter(filter_, filterLen_);
702  return *this;
703  }
705  Command& setOrderBy(const std::string& orderBy_)
706  {
707  _message.setOrderBy(orderBy_);
708  return *this;
709  }
712  Command& setOrderBy(const char* orderBy_, size_t orderByLen_)
713  {
714  _message.setOrderBy(orderBy_, orderByLen_);
715  return *this;
716  }
718  Command& setSubId(const std::string& subId_)
719  {
720  _message.setSubscriptionId(subId_);
721  return *this;
722  }
725  Command& setSubId(const char* subId_, size_t subIdLen_)
726  {
727  _message.setSubscriptionId(subId_, subIdLen_);
728  return *this;
729  }
731  Command& setQueryId(const std::string& queryId_)
732  {
733  _message.setQueryId(queryId_);
734  return *this;
735  }
738  Command& setQueryId(const char* queryId_, size_t queryIdLen_)
739  {
740  _message.setQueryId(queryId_, queryIdLen_);
741  return *this;
742  }
748  Command& setBookmark(const std::string& bookmark_)
749  {
750  _message.setBookmark(bookmark_);
751  return *this;
752  }
759  Command& setBookmark(const char* bookmark_, size_t bookmarkLen_)
760  {
761  _message.setBookmark(bookmark_, bookmarkLen_);
762  return *this;
763  }
770  Command& setCorrelationId(const std::string& correlationId_)
771  {
772  _message.setCorrelationId(correlationId_);
773  return *this;
774  }
782  Command& setCorrelationId(const char* correlationId_, size_t correlationIdLen_)
783  {
784  _message.setCorrelationId(correlationId_, correlationIdLen_);
785  return *this;
786  }
789  Command& setOptions(const std::string& options_)
790  {
791  _message.setOptions(options_);
792  return *this;
793  }
797  Command& setOptions(const char* options_, size_t optionsLen_)
798  {
799  _message.setOptions(options_, optionsLen_);
800  return *this;
801  }
803  Command& setSequence(const std::string& seq_)
804  {
805  _message.setSequence(seq_);
806  return *this;
807  }
810  Command& setSequence(const char* seq_, size_t seqLen_)
811  {
812  _message.setSequence(seq_, seqLen_);
813  return *this;
814  }
816  Command& setSequence(const amps_uint64_t seq_)
817  {
818  std::ostringstream os;
819  os << seq_;
820  _message.setSequence(os.str());
821  return *this;
822  }
823  amps_uint64_t getSequence() const
824  {
825  return amps_message_get_field_uint64(_message.getMessage(), AMPS_Sequence);
826  }
829  Command& setData(const std::string& data_)
830  {
831  _message.setData(data_);
832  return *this;
833  }
837  Command& setData(const char* data_, size_t dataLen_)
838  {
839  _message.setData(data_, dataLen_);
840  return *this;
841  }
851  Command& setTimeout(unsigned timeout_)
852  {
853  _timeout = timeout_;
854  return *this;
855  }
857  Command& setTopN(unsigned topN_)
858  {
859  if (topN_ != (unsigned)AMPS_DEFAULT_TOP_N)
860  {
861  _message.setTopNRecordsReturned(topN_);
862  }
863  else
864  {
865  _message.setTopNRecordsReturned(nullptr, 0);
866  }
867  return *this;
868  }
873  Command& setBatchSize(unsigned batchSize_)
874  {
875  _message.setBatchSize(batchSize_);
876  _batchSize = batchSize_;
877  return *this;
878  }
889  Command& setExpiration(unsigned expiration_)
890  {
891  _message.setExpiration(expiration_);
892  return *this;
893  }
895  Command& addAckType(const std::string& ackType_)
896  {
897  _message.setAckType(_message.getAckType() + "," + ackType_);
898  if (ackType_ == "processed")
899  {
900  _flags |= ProcessedAck;
901  }
902  else if (ackType_ == "stats")
903  {
904  _flags |= StatsAck;
905  }
906  return *this;
907  }
909  Command& setAckType(const std::string& ackType_)
910  {
911  _message.setAckType(ackType_);
912  if (ackType_.find("processed") != std::string::npos)
913  {
914  _flags |= ProcessedAck;
915  }
916  else
917  {
918  _flags &= ~ProcessedAck;
919  }
920  if (ackType_.find("stats") != std::string::npos)
921  {
922  _flags |= StatsAck;
923  }
924  else
925  {
926  _flags &= ~StatsAck;
927  }
928  return *this;
929  }
931  Command& setAckType(unsigned ackType_)
932  {
933  _message.setAckTypeEnum(ackType_);
934  if (ackType_ & Message::AckType::Processed)
935  {
936  _flags |= ProcessedAck;
937  }
938  else
939  {
940  _flags &= ~ProcessedAck;
941  }
942  if (ackType_ & Message::AckType::Stats)
943  {
944  _flags |= StatsAck;
945  }
946  else
947  {
948  _flags &= ~StatsAck;
949  }
950  return *this;
951  }
953  std::string getAckType() const
954  {
955  return (std::string)(_message.getAckType());
956  }
958  unsigned getAckTypeEnum() const
959  {
960  return _message.getAckTypeEnum();
961  }
962 
963  Message& getMessage(void)
964  {
965  return _message;
966  }
967  unsigned getTimeout(void) const
968  {
969  return _timeout;
970  }
971  unsigned getBatchSize(void) const
972  {
973  return _batchSize;
974  }
975  bool isSubscribe(void) const
976  {
977  return _flags & Subscribe;
978  }
979  bool isSow(void) const
980  {
981  return (_flags & SOW) != 0;
982  }
983  bool hasProcessedAck(void) const
984  {
985  return (_flags & ProcessedAck) != 0;
986  }
987  bool hasStatsAck(void) const
988  {
989  return (_flags & StatsAck) != 0;
990  }
991  bool needsSequenceNumber(void) const
992  {
993  return (_flags & NeedsSequenceNumber) != 0;
994  }
995  };
996 
999  typedef void(*DisconnectHandlerFunc)(Client&, void* userData);
1000 
1001  class Message;
1003 
// Interface for supplying credentials. (The class header line was missing
// from this listing; it is restored here from the destructor name.)
// NOTE(review): the descriptions of when each method is called are
// inferred from the method names -- confirm against the logon code.
class Authenticator
{
public:
  virtual ~Authenticator() {;}

  // Returns the password value to use for the given user name and
  // password.
  virtual std::string authenticate(const std::string& userName_, const std::string& password_) = 0;
  // Returns the password value to use when another attempt is required.
  virtual std::string retry(const std::string& userName_, const std::string& password_) = 0;
  // Notification carrying the final user name, password and reason text.
  virtual void completed(const std::string& userName_, const std::string& password_, const std::string& reason_) = 0;
};
1033 
1038  {
1039  public:
1040  virtual ~DefaultAuthenticator() {;}
1043  std::string authenticate(const std::string& /*userName_*/, const std::string& password_)
1044  {
1045  return password_;
1046  }
1047 
1050  std::string retry(const std::string& /*userName_*/, const std::string& /*password_*/)
1051  {
1052  throw AuthenticationException("retry not implemented by DefaultAuthenticator.");
1053  }
1054 
1055  void completed(const std::string& /*userName_*/, const std::string& /* password_ */, const std::string& /* reason */) {;}
1056 
1061  {
1062  static DefaultAuthenticator d;
1063  return d;
1064  }
1065  };
1066 
1070  {
1071  public:
1072 
1076  virtual void execute(Message& message_) = 0;
1077 
1078  virtual ~StoreReplayer() {;}
1079  };
1080 
1081  class Store;
1082 
1091  typedef bool (*PublishStoreResizeHandler)(Store store_,
1092  size_t size_,
1093  void* userData_);
1094 
1097  class StoreImpl : public RefBody
1098  {
1099  public:
1105  StoreImpl(bool errorOnPublishGap_ = false)
1106  : _resizeHandler(NULL)
1107  , _resizeHandlerData(NULL)
1108  , _errorOnPublishGap(errorOnPublishGap_)
1109  {;}
1110 
1115  virtual amps_uint64_t store(const Message& message_) = 0;
1116 
1123  virtual void discardUpTo(amps_uint64_t index_) = 0;
1124 
1129  virtual void replay(StoreReplayer& replayer_) = 0;
1130 
1138  virtual bool replaySingle(StoreReplayer& replayer_, amps_uint64_t index_) = 0;
1139 
1144  virtual size_t unpersistedCount() const = 0;
1145 
1146  virtual ~StoreImpl() {;}
1147 
1156  virtual void flush(long timeout_) = 0;
1157 
1160  static inline size_t getUnsetPosition()
1161  {
1162  return AMPS_UNSET_INDEX;
1163  }
1164 
1167  static inline amps_uint64_t getUnsetSequence()
1168  {
1169  return AMPS_UNSET_SEQUENCE;
1170  }
1171 
1175  virtual amps_uint64_t getLowestUnpersisted() const = 0;
1176 
1180  virtual amps_uint64_t getLastPersisted() = 0;
1181 
1191  inline virtual void setResizeHandler(PublishStoreResizeHandler handler_,
1192  void* userData_)
1193  {
1194  _resizeHandler = handler_;
1195  _resizeHandlerData = userData_;
1196  }
1197 
1198  inline virtual PublishStoreResizeHandler getResizeHandler() const
1199  {
1200  return _resizeHandler;
1201  }
1202 
1203  bool callResizeHandler(size_t newSize_);
1204 
1205  inline virtual void setErrorOnPublishGap(bool errorOnPublishGap_)
1206  {
1207  _errorOnPublishGap = errorOnPublishGap_;
1208  }
1209 
1210  inline virtual bool getErrorOnPublishGap() const
1211  {
1212  return _errorOnPublishGap;
1213  }
1214 
1215  private:
1216  PublishStoreResizeHandler _resizeHandler;
1217  void* _resizeHandlerData;
1218  bool _errorOnPublishGap;
1219  };
1220 
1223  class Store
1224  {
1225  RefHandle<StoreImpl> _body;
1226  public:
1227  Store() {;}
1228  Store(StoreImpl* body_) : _body(body_) {;}
1229  Store(const Store& rhs) : _body(rhs._body) {;}
1230  Store& operator=(const Store& rhs)
1231  {
1232  _body = rhs._body;
1233  return *this;
1234  }
1235 
1239  amps_uint64_t store(const Message& message_)
1240  {
1241  return _body.get().store(message_);
1242  }
1243 
1250  void discardUpTo(amps_uint64_t index_)
1251  {
1252  _body.get().discardUpTo(index_);
1253  }
1254 
1259  void replay(StoreReplayer& replayer_)
1260  {
1261  _body.get().replay(replayer_);
1262  }
1263 
1271  bool replaySingle(StoreReplayer& replayer_, amps_uint64_t index_)
1272  {
1273  return _body.get().replaySingle(replayer_, index_);
1274  }
1275 
1280  size_t unpersistedCount() const
1281  {
1282  return _body.get().unpersistedCount();
1283  }
1284 
1288  bool isValid() const
1289  {
1290  return _body.isValid();
1291  }
1292 
1301  void flush(long timeout_ = 0)
1302  {
1303  return _body.get().flush(timeout_);
1304  }
1305 
1309  amps_uint64_t getLowestUnpersisted()
1310  {
1311  return _body.get().getLowestUnpersisted();
1312  }
1313 
1317  amps_uint64_t getLastPersisted()
1318  {
1319  return _body.get().getLastPersisted();
1320  }
1321 
1331  void setResizeHandler(PublishStoreResizeHandler handler_,
1332  void* userData_)
1333  {
1334  _body.get().setResizeHandler(handler_, userData_);
1335  }
1336 
1337  PublishStoreResizeHandler getResizeHandler() const
1338  {
1339  return _body.get().getResizeHandler();
1340  }
1341 
1346  inline void setErrorOnPublishGap(bool errorOnPublishGap_)
1347  {
1348  _body.get().setErrorOnPublishGap(errorOnPublishGap_);
1349  }
1350 
1355  inline bool getErrorOnPublishGap() const
1356  {
1357  return _body.get().getErrorOnPublishGap();
1358  }
1359 
1363  StoreImpl* get()
1364  {
1365  if (_body.isValid())
1366  {
1367  return &_body.get();
1368  }
1369  else
1370  {
1371  return NULL;
1372  }
1373  }
1374 
1375  };
1376 
1382  {
1383  public:
1384  virtual ~FailedWriteHandler() {;}
1391  virtual void failedWrite(const Message& message_,
1392  const char* reason_, size_t reasonLength_) = 0;
1393  };
1394 
1395 
1396  inline bool StoreImpl::callResizeHandler(size_t newSize_)
1397  {
1398  if (_resizeHandler)
1399  {
1400  return _resizeHandler(Store(this), newSize_, _resizeHandlerData);
1401  }
1402  return true;
1403  }
1404 
1411  inline bool DangerousFlushPublishStoreResizeHandler(Store store_, size_t /*size_*/,
1412  void* data_)
1413  {
1414  long* timeoutp = (long*)data_;
1415  size_t count = store_.unpersistedCount();
1416  if (count == 0)
1417  {
1418  return false;
1419  }
1420  try
1421  {
1422  store_.flush(*timeoutp);
1423  }
1424 #ifdef _WIN32
1425  catch (const TimedOutException&)
1426 #else
1427  catch (const TimedOutException& e)
1428 #endif
1429  {
1430  return true;
1431  }
1432  return (count == store_.unpersistedCount());
1433  }
1434 
1440  {
1441  public:
1453  virtual bool failure(const Message& message_, const MessageHandler& handler_,
1454  unsigned requestedAckTypes_,
1455  const AMPSException& exception_) = 0;
1456  };
1457 
1462  {
1463  public:
1464  virtual ~SubscriptionManager() {;}
1472  virtual void subscribe(MessageHandler messageHandler_, const Message& message_,
1473  unsigned requestedAckTypes_) = 0;
1477  virtual void unsubscribe(const Message::Field& subId_) = 0;
1480  virtual void clear() = 0;
1484  virtual void resubscribe(Client& client_) = 0;
1489  virtual void setFailedResubscribeHandler(std::shared_ptr<FailedResubscribeHandler> handler_)
1490  {
1491  _failedResubscribeHandler = handler_;
1492  }
1493  protected:
1494  std::shared_ptr<FailedResubscribeHandler> _failedResubscribeHandler;
1495  };
1496 
1500 
// Callback interface notified of connection state changes. (The class
// header line was missing from this listing; it is restored here from the
// destructor name.)
class ConnectionStateListener
{
public:
  // States reported to connectionStateChanged().
  typedef enum { Disconnected = 0,
                 Shutdown = 1,
                 Connected = 2,
                 LoggedOn = 4,
                 PublishReplayed = 8,
                 HeartbeatInitiated = 16,
                 Resubscribed = 32,
                 UNKNOWN = 16384
               } State;

  // Called with the new state.
  virtual void connectionStateChanged(State newState_) = 0;
  virtual ~ConnectionStateListener() {;}
};
1527 
1528 
1529  class MessageStreamImpl;
1530  class MessageStream;
1531 
// Function-pointer type for a deferred callback taking one opaque pointer.
using DeferredExecutionFunc = void(*)(void*);
1533 
1534  class ClientImpl : public RefBody // -V553
1535  {
1536  // Class to wrap turning of Nagle for things like flush and logon
1537  class NoDelay
1538  {
1539  private:
1540  AMPS_SOCKET _socket;
1541  int _noDelay;
1542  char* _valuePtr;
1543 #ifdef _WIN32
1544  int _valueLen;
1545 #else
1546  socklen_t _valueLen;
1547 #endif
1548  public:
1549  NoDelay(amps_handle client_)
1550  : _socket(AMPS_INVALID_SOCKET), _noDelay(0), _valueLen(sizeof(int))
1551  {
1552  _valuePtr = (char*)&_noDelay;
1553  _socket = amps_client_get_socket(client_);
1554  if (_socket != AMPS_INVALID_SOCKET)
1555  {
1556  getsockopt(_socket, IPPROTO_TCP, TCP_NODELAY, _valuePtr, &_valueLen);
1557  if (!_noDelay)
1558  {
1559  _noDelay = 1;
1560  setsockopt(_socket, IPPROTO_TCP, TCP_NODELAY, _valuePtr, _valueLen);
1561  }
1562  else
1563  {
1564  _socket = AMPS_INVALID_SOCKET;
1565  }
1566  }
1567  }
1568 
1569  ~NoDelay()
1570  {
1571  if (_socket != AMPS_INVALID_SOCKET)
1572  {
1573  _noDelay = 0;
1574  setsockopt(_socket, IPPROTO_TCP, TCP_NODELAY, _valuePtr, _valueLen);
1575  }
1576  }
1577  };
1578 
1579  friend class Client;
1580  protected:
1581  amps_handle _client;
1582  DisconnectHandler _disconnectHandler;
// Slot indices for the global per-command-type handlers. Values are
// consecutive starting at 0; COUNT is the number of slots.
enum GlobalCommandTypeHandlers : size_t
{
  Publish = 0,
  SOW,              // 1
  GroupBegin,       // 2
  GroupEnd,         // 3
  Heartbeat,        // 4
  OOF,              // 5
  Ack,              // 6
  LastChance,       // 7
  DuplicateMessage, // 8
  COUNT             // 9
};
1596  std::vector<MessageHandler> _globalCommandTypeHandlers;
1597  Message _message, _readMessage, _publishMessage, _deltaMessage, _beatMessage;
1598  MessageRouter _routes;
1599  MessageRouter::RouteCache _routeCache;
1600  mutable Mutex _lock;
1601  std::string _name, _nameHash, _lastUri, _logonCorrelationData, _preflightMessage;
1602  std::vector<std::string> _httpPreflightHeaders;
1603  amps_uint64_t _nameHashValue;
1604  BookmarkStore _bookmarkStore;
1605  Store _publishStore;
1606  bool _isRetryOnDisconnect;
1607  amps_unique_ptr<FailedWriteHandler> _failedWriteHandler;
1608 #if __cplusplus >= 201100L || _MSC_VER >= 1900
1609  std::atomic<amps_uint64_t> _lastSentHaSequenceNumber;
1610 #else
1611  volatile amps_uint64_t _lastSentHaSequenceNumber;
1612 #endif
1613  AMPS_ATOMIC_TYPE_8 _logonInProgress;
1614  AMPS_ATOMIC_TYPE_8 _badTimeToHASubscribe;
1615  VersionInfo _serverVersion;
1616  Timer _heartbeatTimer;
1617  amps_unique_ptr<MessageStream> _pEmptyMessageStream;
1618 
1619  // queue data
1620  int _queueAckTimeout;
1621  bool _isAutoAckEnabled;
1622  unsigned _ackBatchSize;
1623  unsigned _queuedAckCount;
1624  unsigned _defaultMaxDepth;
1625  struct QueueBookmarks
1626  {
1627  QueueBookmarks(const std::string& topic_)
1628  : _topic(topic_)
1629  , _oldestTime(0)
1630  , _bookmarkCount(0)
1631  {;}
1632  std::string _topic;
1633  std::string _data;
1634  amps_uint64_t _oldestTime;
1635  unsigned _bookmarkCount;
1636  };
1637  typedef amps_uint64_t topic_hash;
1638  typedef std::map<topic_hash, QueueBookmarks> TopicHashMap;
1639  TopicHashMap _topicHashMap;
1640 
1641  class ClientStoreReplayer : public StoreReplayer
1642  {
1643  ClientImpl* _client;
1644  public:
1645  unsigned _version;
1646  amps_result _res;
1647 
1648  ClientStoreReplayer()
1649  : _client(NULL), _version(0), _res(AMPS_E_OK)
1650  {}
1651 
1652  ClientStoreReplayer(ClientImpl* client_)
1653  : _client(client_), _version(0), _res(AMPS_E_OK)
1654  {}
1655 
1656  void setClient(ClientImpl* client_)
1657  {
1658  _client = client_;
1659  }
1660 
1661  void execute(Message& message_)
1662  {
1663  if (!_client)
1664  {
1665  throw CommandException("Can't replay without a client.");
1666  }
1667  amps_uint64_t index = amps_message_get_field_uint64(message_.getMessage(),
1668  AMPS_Sequence);
1669  if (index > _client->_lastSentHaSequenceNumber)
1670  {
1671  _client->_lastSentHaSequenceNumber = index;
1672  }
1673 
1674  _res = AMPS_E_OK;
1675  // Don't replay a queue cancel message after a reconnect.
1676  // Currently, the only messages that will have anything in options
1677  // are cancel messages.
1678  if (!message_.getCommand().empty() &&
1679  (!_client->_logonInProgress ||
1680  message_.getOptions().len() < 6))
1681  {
1682  _res = amps_client_send_with_version(_client->_client,
1683  message_.getMessage(),
1684  &_version);
1685  if (_res != AMPS_E_OK)
1686  {
1687  throw DisconnectedException("AMPS Server disconnected during replay");
1688  }
1689  }
1690  }
1691 
1692  };
1693  ClientStoreReplayer _replayer;
1694 
1695  class FailedWriteStoreReplayer : public StoreReplayer
1696  {
1697  ClientImpl* _parent;
1698  const char* _reason;
1699  size_t _reasonLength;
1700  size_t _replayCount;
1701  public:
1702  FailedWriteStoreReplayer(ClientImpl* parent, const char* reason_, size_t reasonLength_)
1703  : _parent(parent),
1704  _reason(reason_),
1705  _reasonLength(reasonLength_),
1706  _replayCount(0)
1707  {;}
1708  void execute(Message& message_)
1709  {
1710  if (_parent->_failedWriteHandler)
1711  {
1712  ++_replayCount;
1713  _parent->_failedWriteHandler->failedWrite(message_,
1714  _reason, _reasonLength);
1715  }
1716  }
1717  size_t replayCount(void) const
1718  {
1719  return _replayCount;
1720  }
1721  };
1722 
1723  struct AckResponseImpl : public RefBody
1724  {
1725  std::string username, password, reason, status, bookmark, options;
1726  amps_uint64_t sequenceNo;
1727  amps_uint64_t nameHashValue;
1728  VersionInfo serverVersion;
1729 #if __cplusplus >= 201100L || _MSC_VER >= 1900
1730  std::atomic<bool> responded;
1731  std::atomic<bool> abandoned;
1732 #else
1733  volatile bool responded;
1734  volatile bool abandoned;
1735 #endif
1736  unsigned connectionVersion;
1737  AckResponseImpl() :
1738  RefBody(),
1739  username(), password(), reason(), status(), bookmark(), options(),
1740  sequenceNo((amps_uint64_t)0),
1741  serverVersion(),
1742  responded(false),
1743  abandoned(false),
1744  connectionVersion(UINT_MAX) // Don't abandon if unsent AC-1329
1745  {
1746  }
1747  };
1748 
1749  class AckResponse
1750  {
1751  RefHandle<AckResponseImpl> _body;
1752  public:
1753  AckResponse() : _body(NULL) {;}
1754  AckResponse(const AckResponse& rhs) : _body(rhs._body) {;}
1755  static AckResponse create()
1756  {
1757  AckResponse r;
1758  r._body = new AckResponseImpl();
1759  return r;
1760  }
1761 
1762  const std::string& username()
1763  {
1764  return _body.get().username;
1765  }
1766  void setUsername(const char* data_, size_t len_)
1767  {
1768  if (data_)
1769  {
1770  _body.get().username.assign(data_, len_);
1771  }
1772  else
1773  {
1774  _body.get().username.clear();
1775  }
1776  }
1777  const std::string& password()
1778  {
1779  return _body.get().password;
1780  }
1781  void setPassword(const char* data_, size_t len_)
1782  {
1783  if (data_)
1784  {
1785  _body.get().password.assign(data_, len_);
1786  }
1787  else
1788  {
1789  _body.get().password.clear();
1790  }
1791  }
1792  const std::string& reason()
1793  {
1794  return _body.get().reason;
1795  }
1796  void setReason(const char* data_, size_t len_)
1797  {
1798  if (data_)
1799  {
1800  _body.get().reason.assign(data_, len_);
1801  }
1802  else
1803  {
1804  _body.get().reason.clear();
1805  }
1806  }
1807  const std::string& status()
1808  {
1809  return _body.get().status;
1810  }
1811  void setStatus(const char* data_, size_t len_)
1812  {
1813  if (data_)
1814  {
1815  _body.get().status.assign(data_, len_);
1816  }
1817  else
1818  {
1819  _body.get().status.clear();
1820  }
1821  }
1822  const std::string& bookmark()
1823  {
1824  return _body.get().bookmark;
1825  }
1826  void setBookmark(const Field& bookmark_)
1827  {
1828  if (!bookmark_.empty())
1829  {
1830  _body.get().bookmark.assign(bookmark_.data(), bookmark_.len());
1831  Field::parseBookmark(bookmark_, _body.get().nameHashValue,
1832  _body.get().sequenceNo);
1833  }
1834  else
1835  {
1836  _body.get().bookmark.clear();
1837  _body.get().sequenceNo = (amps_uint64_t)0;
1838  _body.get().nameHashValue = (amps_uint64_t)0;
1839  }
1840  }
1841  amps_uint64_t sequenceNo() const
1842  {
1843  return _body.get().sequenceNo;
1844  }
1845  amps_uint64_t nameHashValue() const
1846  {
1847  return _body.get().nameHashValue;
1848  }
1849  void setSequenceNo(const char* data_, size_t len_)
1850  {
1851  amps_uint64_t result = (amps_uint64_t)0;
1852  if (data_)
1853  {
1854  for (size_t i = 0; i < len_; ++i)
1855  {
1856  result *= (amps_uint64_t)10;
1857  result += (amps_uint64_t)(data_[i] - '0');
1858  }
1859  }
1860  _body.get().sequenceNo = result;
1861  }
1862  VersionInfo serverVersion() const
1863  {
1864  return _body.get().serverVersion;
1865  }
1866  void setServerVersion(const char* data_, size_t len_)
1867  {
1868  if (data_)
1869  {
1870  _body.get().serverVersion.setVersion(std::string(data_, len_));
1871  }
1872  }
1873  bool responded()
1874  {
1875  return _body.get().responded;
1876  }
1877  void setResponded()
1878  {
1879  _body.get().responded = true;
1880  }
1881  bool abandoned()
1882  {
1883  return _body.get().abandoned;
1884  }
1885  void setAbandoned()
1886  {
1887  if (_body.isValid())
1888  {
1889  _body.get().abandoned = true;
1890  }
1891  }
1892 
1893  void setConnectionVersion(unsigned connectionVersion)
1894  {
1895  _body.get().connectionVersion = connectionVersion;
1896  }
1897 
1898  unsigned getConnectionVersion()
1899  {
1900  return _body.get().connectionVersion;
1901  }
1902  void setOptions(const char* data_, size_t len_)
1903  {
1904  if (data_)
1905  {
1906  _body.get().options.assign(data_, len_);
1907  }
1908  else
1909  {
1910  _body.get().options.clear();
1911  }
1912  }
1913 
1914  const std::string& options()
1915  {
1916  return _body.get().options;
1917  }
1918 
1919  AckResponse& operator=(const AckResponse& rhs)
1920  {
1921  _body = rhs._body;
1922  return *this;
1923  }
1924  };
1925 
1926 
// Pending synchronous acks. syncAckProcessing() inserts an entry keyed by the
// message's command id, and clearAcks() abandons and erases entries.
1927  typedef std::map<std::string, AckResponse> AckMap;
1928  AckMap _ackMap;
 // Held by syncAckProcessing() and clearAcks() while they touch _ackMap.
1929  Mutex _ackMapLock;
 // The constructor points _exceptionListener at this instance.
1930  DefaultExceptionListener _defaultExceptionListener;
1931  protected:
1932 
1933  struct DeferredExecutionRequest
1934  {
1935  DeferredExecutionRequest(DeferredExecutionFunc func_,
1936  void* userData_)
1937  : _func(func_),
1938  _userData(userData_)
1939  {;}
1940 
1941  DeferredExecutionFunc _func;
1942  void* _userData;
1943  };
// Listener currently in use; the constructor sets it to &_defaultExceptionListener.
1944  const ExceptionListener* _exceptionListener;
1945  std::shared_ptr<const ExceptionListener> _pExceptionListener;
 // Optional; unsubscribeInternal() notifies it when it is set.
1946  amps_unique_ptr<SubscriptionManager> _subscriptionManager;
 // Set to true at the end of _connect() and to false in setDisconnected().
 // NOTE(review): volatile alone does not provide inter-thread synchronization;
 // the visible writers update it while _lock is held - confirm all readers do too.
1947  volatile bool _connected;
 // syncAckProcessing() assigns this as the message's user id before throwing
 // for a failure ack whose reason is 12 characters long and starts with 'n'.
1948  std::string _username;
 // Listeners invoked by broadcastConnectionStateChanged().
1949  typedef std::set<ConnectionStateListener*> ConnectionStateListeners;
1950  ConnectionStateListeners _connectionStateListeners;
1951  typedef std::vector<DeferredExecutionRequest> DeferredExecutionList;
 // presumably guards _deferredExecutionList - confirm against deferredExecution()
1952  Mutex _deferredExecutionLock;
1953  DeferredExecutionList _deferredExecutionList;
 // Both are initialized to 0 by the constructor.
1954  unsigned _heartbeatInterval;
1955  unsigned _readTimeout;
1956 
1957  void broadcastConnectionStateChanged(ConnectionStateListener::State newState_)
1958  {
1959  // If we disconnected before we got to notification, don't notify.
1960  // This should only be able to happen for Resubscribed, since the lock
1961  // is released to let the subscription manager run resubscribe so a
1962  // disconnect could be called before the change is broadcast.
1963  if (!_connected && newState_ > ConnectionStateListener::Connected)
1964  {
1965  return;
1966  }
1967  for (ConnectionStateListeners::iterator it = _connectionStateListeners.begin(); it != _connectionStateListeners.end(); ++it)
1968  {
1969  AMPS_CALL_EXCEPTION_WRAPPER(
1970  (*it)->connectionStateChanged(newState_));
1971  }
1972  }
// Member functions declared here; their definitions are not part of this section.
1973  unsigned processedAck(Message& message);
1974  unsigned persistedAck(Message& meesage);
1975  void lastChance(Message& message);
1976  void checkAndSendHeartbeat(bool force = false);
1977  virtual ConnectionInfo getConnectionInfo() const;
 // Static callbacks. The constructor hands each of these to the C client
 // together with `this` as the user-data pointer.
1978  static amps_result
1979  ClientImplMessageHandler(amps_handle message, void* userData);
1980  static void
1981  ClientImplPreDisconnectHandler(amps_handle client, unsigned failedConnectionVersion, void* userData);
1982  static amps_result
1983  ClientImplDisconnectHandler(amps_handle client, void* userData);
1984  static const char*
1985  ClientImplGetHttpPreflightMessage(void* userData);
1986 
1987  void unsubscribeInternal(const std::string& id)
1988  {
1989  if (id.empty())
1990  {
1991  return;
1992  }
1993  // remove the handler first to avoid any more message delivery
1994  Message::Field subId;
1995  subId.assign(id.data(), id.length());
1996  _routes.removeRoute(subId);
1997  // Lock is already acquired
1998  if (_subscriptionManager)
1999  {
2000  // Have to unlock before calling into sub manager to avoid deadlock
2001  Unlock<Mutex> unlock(_lock);
2002  _subscriptionManager->unsubscribe(subId);
2003  }
2004  _message.reset();
2005  _message.setCommandEnum(Message::Command::Unsubscribe);
2006  _message.newCommandId();
2007  _message.setSubscriptionId(id);
2008  _sendWithoutRetry(_message);
2009  deferredExecution(&amps_noOpFn, NULL);
2010  }
2011 
2012  AckResponse syncAckProcessing(long timeout_, Message& message_,
2013  bool isHASubscribe_)
2014  {
2015  return syncAckProcessing(timeout_, message_,
2016  (amps_uint64_t)0, isHASubscribe_);
2017  }
2018 
2019  AckResponse syncAckProcessing(long timeout_, Message& message_,
2020  amps_uint64_t haSeq = (amps_uint64_t)0,
2021  bool isHASubscribe_ = false)
2022  {
2023  // inv: we already have _lock locked up.
2024  AckResponse ack = AckResponse::create();
2025  if (1)
2026  {
2027  Lock<Mutex> guard(_ackMapLock);
2028  _ackMap[message_.getCommandId()] = ack;
2029  }
2030  ack.setConnectionVersion((unsigned)_send(message_, haSeq, isHASubscribe_));
2031  if (ack.getConnectionVersion() == 0)
2032  {
2033  // Send failed
2034  throw DisconnectedException("Connection closed while waiting for response.");
2035  }
2036  bool timedOut = false;
2037  AMPS_START_TIMER(timeout_)
2038  while (!timedOut && !ack.responded() && !ack.abandoned())
2039  {
2040  if (timeout_)
2041  {
2042  timedOut = !_lock.wait(timeout_);
2043  // May have woken up early, check real time
2044  if (timedOut)
2045  {
2046  AMPS_RESET_TIMER(timedOut, timeout_);
2047  }
2048  }
2049  else
2050  {
2051  // Using a timeout version to ensure python can interrupt
2052  _lock.wait(1000);
2053  Unlock<Mutex> unlck(_lock);
2054  amps_invoke_waiting_function();
2055  }
2056  }
2057  if (ack.responded())
2058  {
2059  if (ack.status() != "failure")
2060  {
2061  if (message_.getCommand() == "logon")
2062  {
2063  amps_uint64_t ackSequence = ack.sequenceNo();
2064  if (_lastSentHaSequenceNumber < ackSequence)
2065  {
2066  _lastSentHaSequenceNumber = ackSequence;
2067  }
2068  if (_publishStore.isValid())
2069  {
2070  // If this throws, logon will fail and eitehr be
2071  // handled in HAClient/ServerChooser or by the caller
2072  // of logon.
2073  _publishStore.discardUpTo(ackSequence);
2074  if (_lastSentHaSequenceNumber < _publishStore.getLastPersisted())
2075  {
2076  _lastSentHaSequenceNumber = _publishStore.getLastPersisted();
2077  }
2078  }
2079  _nameHash = ack.bookmark().substr(0, ack.bookmark().find('|'));
2080  _nameHashValue = ack.nameHashValue();
2081  _serverVersion = ack.serverVersion();
2082  if (_bookmarkStore.isValid())
2083  {
2084  _bookmarkStore.setServerVersion(_serverVersion);
2085  }
2086  }
2087  if (_ackBatchSize)
2088  {
2089  const std::string& options = ack.options();
2090  size_t index = options.find_first_of("max_backlog=");
2091  if (index != std::string::npos)
2092  {
2093  unsigned data = 0;
2094  const char* c = options.c_str() + index + 12;
2095  while (*c && *c != ',')
2096  {
2097  data = (data * 10) + (unsigned)(*c++ -48);
2098  }
2099  if (_ackBatchSize > data)
2100  {
2101  _ackBatchSize = data;
2102  }
2103  }
2104  }
2105  return ack;
2106  }
2107  const size_t NotEntitled = 12;
2108  std::string ackReason = ack.reason();
2109  if (ackReason.length() == 0)
2110  {
2111  return ack; // none
2112  }
2113  if (ackReason.length() == NotEntitled &&
2114  ackReason[0] == 'n' &&
2115  message_.getUserId().len() == 0)
2116  {
2117  message_.assignUserId(_username);
2118  }
2119  message_.throwFor(_client, ackReason);
2120  }
2121  else // !ack.responded()
2122  {
2123  if (!ack.abandoned())
2124  {
2125  throw TimedOutException("timed out waiting for operation.");
2126  }
2127  else
2128  {
2129  throw DisconnectedException("Connection closed while waiting for response.");
2130  }
2131  }
2132  return ack;
2133  }
2134 
2135  void _cleanup(void)
2136  {
2137  if (!_client)
2138  {
2139  return;
2140  }
2141  amps_client_set_predisconnect_handler(_client, NULL, 0L);
2142  amps_client_set_disconnect_handler(_client, NULL, 0L);
2143  AMPS_CALL_EXCEPTION_WRAPPER(ClientImpl::disconnect());
2144  _pEmptyMessageStream.reset(NULL);
2145  amps_client_destroy(_client);
2146  _client = NULL;
2147  }
2148 
2149  public:
2150 
// Constructs the implementation object for a client named clientName:
// creates the C client handle, registers the static callbacks with `this`
// as user data, selects the default exception listener, and fills
// _globalCommandTypeHandlers with GlobalCommandTypeHandlers::COUNT
// default-constructed MessageHandler entries.
//
// NOTE(review): _nameHashValue does not appear in this initializer list -
// confirm it is initialized elsewhere before getNameHashValue() can be called.
2151  ClientImpl(const std::string& clientName)
2152  : _client(NULL), _name(clientName)
2153  , _isRetryOnDisconnect(true)
2154  , _lastSentHaSequenceNumber((amps_uint64_t)0), _logonInProgress(0)
2155  , _badTimeToHASubscribe(0), _serverVersion()
2156  , _queueAckTimeout(AMPS_DEFAULT_QUEUE_ACK_TIMEOUT)
2157  , _isAutoAckEnabled(false)
2158  , _ackBatchSize(0)
2159  , _queuedAckCount(0)
2160  , _defaultMaxDepth(0)
2161  , _connected(false)
2162  , _heartbeatInterval(0)
2163  , _readTimeout(0)
2164  {
2165  _replayer.setClient(this);
2166  _client = amps_client_create(clientName.c_str());
 // NOTE(review): listing lines 2167, 2170, 2173 and 2176 are missing from this
 // extract. Each presumably opens the call whose remaining arguments follow
 // (registering the message, pre-disconnect, disconnect and HTTP-preflight
 // callbacks on _client) - restore them from the original header.
2168  (amps_handler)ClientImpl::ClientImplMessageHandler,
2169  this);
2171  (amps_predisconnect_handler)ClientImpl::ClientImplPreDisconnectHandler,
2172  this);
2174  (amps_handler)ClientImpl::ClientImplDisconnectHandler,
2175  this);
2177  ClientImpl::ClientImplGetHttpPreflightMessage,
2178  this);
2179  _exceptionListener = &_defaultExceptionListener;
 // One slot per GlobalCommandTypeHandlers value, each a default MessageHandler.
2180  for (size_t i = 0; i < GlobalCommandTypeHandlers::COUNT; ++i)
2181  {
2182 #ifdef AMPS_USE_EMPLACE
2183  _globalCommandTypeHandlers.emplace_back(MessageHandler());
2184 #else
2185  _globalCommandTypeHandlers.push_back(MessageHandler());
2186 #endif
2187  }
2188  }
2189 
2190  virtual ~ClientImpl()
2191  {
2192  _cleanup();
2193  }
2194 
2195  const std::string& getName() const
2196  {
2197  return _name;
2198  }
2199 
2200  const std::string& getNameHash() const
2201  {
2202  return _nameHash;
2203  }
2204 
2205  const amps_uint64_t getNameHashValue() const
2206  {
2207  return _nameHashValue;
2208  }
2209 
2210  void setName(const std::string& name)
2211  {
2212  // This operation will fail if the client's
2213  // name is already set.
2214  amps_result result = amps_client_set_name(_client, name.c_str());
2215  if (result != AMPS_E_OK)
2216  {
2217  AMPSException::throwFor(_client, result);
2218  }
2219  _name = name;
2220  }
2221 
2222  const std::string& getLogonCorrelationData() const
2223  {
2224  return _logonCorrelationData;
2225  }
2226 
2227  void setLogonCorrelationData(const std::string& logonCorrelationData_)
2228  {
2229  _logonCorrelationData = logonCorrelationData_;
2230  }
2231 
2232  size_t getServerVersion() const
2233  {
2234  return _serverVersion.getOldStyleVersion();
2235  }
2236 
2237  VersionInfo getServerVersionInfo() const
2238  {
2239  return _serverVersion;
2240  }
2241 
2242  const std::string& getURI() const
2243  {
2244  return _lastUri;
2245  }
2246 
2247  virtual void connect(const std::string& uri)
2248  {
2249  Lock<Mutex> l(_lock);
2250  _connect(uri);
2251  }
2252 
2253  virtual void _connect(const std::string& uri)
2254  {
2255  _lastUri = uri;
2256  amps_result result = amps_client_connect(_client, uri.c_str());
2257  if (result != AMPS_E_OK)
2258  {
2259  AMPSException::throwFor(_client, result);
2260  }
2261  _message.reset();
2262  _deltaMessage.setCommandEnum(Message::Command::DeltaPublish);
2263  _publishMessage.setCommandEnum(Message::Command::Publish);
2264  _beatMessage.setCommandEnum(Message::Command::Heartbeat);
2265  _beatMessage.setOptions("beat");
2266  _readMessage.setClientImpl(this);
2267  if (_queueAckTimeout)
2268  {
2269  result = amps_client_set_idle_time(_client, _queueAckTimeout);
2270  if (result != AMPS_E_OK)
2271  {
2272  AMPSException::throwFor(_client, result);
2273  }
2274  }
2275  _connected = true;
2276  broadcastConnectionStateChanged(ConnectionStateListener::Connected);
2277  }
2278 
2279  void addHttpPreflightHeader(const std::string& header_)
2280  {
2281  _httpPreflightHeaders.push_back(header_);
2282  }
2283 
2284  void addHttpPreflightHeader(const std::string& key_, const std::string& value_)
2285  {
2286  _httpPreflightHeaders.push_back(key_ + std::string(": ") + value_);
2287  }
2288 
2289  void clearHttpPreflightHeaders()
2290  {
2291  _httpPreflightHeaders.clear();
2292  }
2293 
2294  template<class T>
2295  void setHttpPreflightHeaders(const T& headers_)
2296  {
2297  _httpPreflightHeaders.clear();
2298  for (typename T::const_iterator i = headers_.begin(); i != headers_.end(); ++i)
2299  {
2300  _httpPreflightHeaders.push_back(*i);
2301  }
2302  }
2303 
2304  void setDisconnected()
2305  {
2306  {
2307  Lock<Mutex> l(_lock);
2308  if (_connected)
2309  {
2310  AMPS_CALL_EXCEPTION_WRAPPER(broadcastConnectionStateChanged(ConnectionStateListener::Disconnected));
2311  }
2312  _connected = false;
2313  _heartbeatTimer.setTimeout(0.0);
2314  // AC-1189 AC-1329 AC-1337 We need acks cleared while lock is held,
2315  // but not for unsent commands.
2316  clearAcks(UINT_MAX-1);
2317  }
2318  amps_client_disconnect(_client);
2319  _routes.clear();
2320  }
2321 
2322  virtual void disconnect()
2323  {
2324  AMPS_CALL_EXCEPTION_WRAPPER(flushAcks());
2325  setDisconnected();
2326  // Abandon all acks, sent and unsent
2327  clearAcks(UINT_MAX);
2328  AMPS_CALL_EXCEPTION_WRAPPER(processDeferredExecutions());
2329  Lock<Mutex> l(_lock);
2330  broadcastConnectionStateChanged(ConnectionStateListener::Shutdown);
2331  }
2332 
2333  void clearAcks(unsigned failedVersion)
2334  {
2335  // Have to lock to prevent race conditions
2336  Lock<Mutex> guard(_ackMapLock);
2337  {
2338  // Go ahead and signal any waiters if they are around...
2339  std::vector<std::string> worklist;
2340  for (AckMap::iterator i = _ackMap.begin(), e = _ackMap.end(); i != e; ++i)
2341  {
2342  if (i->second.getConnectionVersion() <= failedVersion)
2343  {
2344  i->second.setAbandoned();
2345  worklist.push_back(i->first);
2346  }
2347  }
2348 
2349  for (std::vector<std::string>::iterator j = worklist.begin(), e = worklist.end(); j != e; ++j)
2350  {
2351  _ackMap.erase(*j);
2352  }
2353  }
2354 
2355  _lock.signalAll();
2356  }
2357 
2358  int send(const Message& message)
2359  {
2360  Lock<Mutex> l(_lock);
2361  return _send(message);
2362  }
2363 
2364  void sendWithoutRetry(const Message& message_)
2365  {
2366  Lock<Mutex> l(_lock);
2367  // If we got here while logon was in progress, then we tried to send
2368  // while we were disconnected so throw DisconnectedException
2369  if (_logonInProgress)
2370  {
2371  throw DisconnectedException("The client has been disconnected.");
2372  }
2373  _sendWithoutRetry(message_);
2374  }
2375 
2376  void _sendWithoutRetry(const Message& message_)
2377  {
2378  amps_result result = amps_client_send(_client, message_.getMessage());
2379  if (result != AMPS_E_OK)
2380  {
2381  AMPSException::throwFor(_client, result);
2382  }
2383  }
2384 
// Sends `message` through the C client, looping for as long as the result is
// AMPS_E_RETRY. Per the note below, the caller already holds _lock.
//
//   message        - message to send; a handle copy is taken up front and a
//                    deep copy is made if a plain (non-HA) send fails.
//   haSeq          - when non-zero, compared against _lastSentHaSequenceNumber:
//                    values at or below it are not re-sent, and any gap before
//                    it is first replayed from _publishStore.
//   isHASubscribe_ - when true, nothing is sent while _badTimeToHASubscribe is set.
//
// Returns the `version` value as an int: 0 if nothing was sent on this call,
// otherwise whatever amps_client_send_with_version / the replayer reported.
// Throws via AMPSException::throwFor when a send fails and retry is disabled,
// or when the final result is not AMPS_E_OK.
2385  int _send(const Message& message, amps_uint64_t haSeq = (amps_uint64_t)0,
2386  bool isHASubscribe_ = false)
2387  {
2388  // Lock is already acquired
2389  amps_result result = AMPS_E_RETRY;
2390 
2391  // Create a local reference to this message, as we'll need to hold on
2392  // to a reference to it in case reconnect occurs.
2393  Message localMessage = message;
2394  unsigned version = 0;
2395 
2396  while (result == AMPS_E_RETRY)
2397  {
2398  if (haSeq && _logonInProgress)
2399  {
2400  // If retrySend is disabled, do not wait for the reconnect
2401  // to finish, just throw.
2402  if (!_isRetryOnDisconnect)
2403  {
2404  AMPSException::throwFor(_client, AMPS_E_RETRY);
2405  }
 // Wait on _lock; run the waiting function only when the wait timed out.
2406  if (!_lock.wait(1000))
2407  {
2408  amps_invoke_waiting_function();
2409  }
2410  }
2411  else
2412  {
 // Nothing to do: this sequence number was already sent, or HA
 // subscribes are currently being held off.
2413  if ((haSeq && haSeq <= _lastSentHaSequenceNumber) ||
2414  (isHASubscribe_ && _badTimeToHASubscribe))
2415  {
2416  return (int)version;
2417  }
2418  // It's possible to get here out of order, but this way we'll
2419  // always send in order.
2420  if (haSeq > _lastSentHaSequenceNumber)
2421  {
2422  while (haSeq > _lastSentHaSequenceNumber + 1)
2423  {
2424  try
2425  {
2426  // Replayer updates _lastSentHaSequenceNumber
 // NOTE(review): when replaySingle returns false the loop retries the
 // same sequence number; confirm _lastSentHaSequenceNumber always
 // advances in that case, otherwise this cannot terminate.
2427  if (!_publishStore.replaySingle(_replayer,
2428  _lastSentHaSequenceNumber + 1))
2429  {
2430  //++_lastSentHaSequenceNumber;
2431  continue;
2432  }
2433  result = AMPS_E_OK;
2434  version = _replayer._version;
2435  }
2436 #ifdef _WIN32
2437  catch (const DisconnectedException&)
2438 #else
2439  catch (const DisconnectedException& e)
2440 #endif
2441  {
2442  result = _replayer._res;
2443  break;
2444  }
2445  }
 // Send the message itself; this overwrites any result set during replay.
2446  result = amps_client_send_with_version(_client,
2447  localMessage.getMessage(),
2448  &version);
2449  ++_lastSentHaSequenceNumber;
2450  }
2451  else
2452  {
 // Any command whose name does not start with 'l' waits here until the
 // in-progress logon has finished.
2453  if (_logonInProgress && localMessage.getCommand().data()[0] != 'l')
2454  {
2455  while (_logonInProgress)
2456  {
2457  if (!_lock.wait(1000))
2458  {
2459  amps_invoke_waiting_function();
2460  }
2461  }
2462  }
2463  result = amps_client_send_with_version(_client,
2464  localMessage.getMessage(),
2465  &version);
2466  }
2467  if (result != AMPS_E_OK)
2468  {
 // For a plain send still sharing the caller's underlying message,
 // switch to a deep copy before attempting to reconnect.
2469  if (!isHASubscribe_ && !haSeq &&
2470  localMessage.getMessage() == message.getMessage())
2471  {
2472  localMessage = message.deepCopy();
2473  }
2474  if (_isRetryOnDisconnect)
2475  {
 // The reconnect attempt runs with _lock released.
2476  Unlock<Mutex> u(_lock);
2477  result = amps_client_attempt_reconnect(_client, version);
2478  // If this is an HA publish or subscribe command, it was
2479  // stored first and will have already been replayed by the
2480  // store or sub manager after reconnect, so just return.
2481  if ((isHASubscribe_ || haSeq) &&
2482  result == AMPS_E_RETRY)
2483  {
2484  return (int)version;
2485  }
2486  }
2487  else
2488  {
2489  // retrySend is disabled so throw the error
2490  // from the send as an exception, do not retry.
2491  AMPSException::throwFor(_client, result);
2492  }
2493  }
2494  }
2495  if (result == AMPS_E_RETRY)
2496  {
2497  amps_invoke_waiting_function();
2498  }
2499  }
2500 
2501  if (result != AMPS_E_OK)
2502  {
2503  AMPSException::throwFor(_client, result);
2504  }
2505  return (int)version;
2506  }
2507 
2508  void addMessageHandler(const Field& commandId_,
2509  const AMPS::MessageHandler& messageHandler_,
2510  unsigned requestedAcks_, Message::Command::Type commandType_)
2511  {
2512  Lock<Mutex> lock(_lock);
2513  _routes.addRoute(commandId_, messageHandler_, requestedAcks_,
2514  0, commandType_);
2515  }
2516 
2517  bool removeMessageHandler(const Field& commandId_)
2518  {
2519  Lock<Mutex> lock(_lock);
2520  return _routes.removeRoute(commandId_);
2521  }
2522 
// Sends message_ and, where applicable, registers messageHandler_ as the
// route for the ids the command will be answered under.
//
// Behavior by command type:
//  - Subscribe / DeltaSubscribe / SOWAndSubscribe / SOWAndDeltaSubscribe / SOW:
//    fills in any missing command, subscription and query ids, adds a
//    Processed ack (plus Persisted for a bookmark subscription when a bookmark
//    store is set), registers routes, then waits for the ack through
//    syncAckProcessing(). On any exception the routes are removed again and
//    the exception is rethrown.
//  - Unsubscribe / Heartbeat / Logon / StartTimer / StopTimer / SOWDelete:
//    sent as-is via _send(); a route is registered only when an ack was
//    requested and the handler is valid.
//  - Publish / DeltaPublish: sent via _send(), or via syncAckProcessing()
//    with a Processed ack added when the message carries a filter.
//  - Anything else throws CommandException.
//
// In every non-throwing path the message's ack type is restored to what the
// caller requested. Returns the command id (converted to std::string).
// timeout_ is forwarded to syncAckProcessing().
2523  std::string send(const MessageHandler& messageHandler_, Message& message_, int timeout_ = 0)
2524  {
2525  Field id = message_.getCommandId();
2526  Field subId = message_.getSubscriptionId();
2527  Field qid = message_.getQueryId();
2528  bool isSubscribeOnly = false;
2529  bool replace = false;
2530  unsigned requestedAcks = message_.getAckTypeEnum();
2531  unsigned systemAddedAcks = Message::AckType::None;
2532  Message::Command::Type commandType = message_.getCommandEnum();
2533 
2534  switch (commandType)
2535  {
2536  case Message::Command::Subscribe:
2537  case Message::Command::DeltaSubscribe:
 // Searches the options for AMPS_OPTIONS_REPLACE minus its final character.
2538  replace = message_.getOptions().operator std::string().find(AMPS_OPTIONS_REPLACE, 0, strlen(AMPS_OPTIONS_REPLACE) - 1) != std::string::npos;
2539  isSubscribeOnly = true;
2540  // fall through
2541  case Message::Command::SOWAndSubscribe:
2542  case Message::Command::SOWAndDeltaSubscribe:
2543  if (id.empty())
2544  {
2545  id = message_.newCommandId().getCommandId();
2546  }
2547  else
2548  {
 // Pick a fresh command id while the current one already has a route.
2549  while (!replace && id != subId && _routes.hasRoute(id))
2550  {
2551  id = message_.newCommandId().getCommandId();
2552  }
2553  }
 // A subscription without its own id reuses the command id.
2554  if (subId.empty())
2555  {
2556  message_.setSubscriptionId(id);
2557  subId = id;
2558  }
2559  if (!message_.getBookmark().empty() && _bookmarkStore.isValid())
2560  {
2561  systemAddedAcks |= Message::AckType::Persisted;
2562  }
2563  // fall through
2564  case Message::Command::SOW:
2565  if (id.empty())
2566  {
2567  id = message_.newCommandId().getCommandId();
2568  }
2569  else
2570  {
2571  while (!replace && id != subId && _routes.hasRoute(id))
2572  {
2573  message_.newCommandId();
 // Keep the query id in step when it was the same as the command id.
2574  if (qid == id)
2575  {
2576  qid = message_.getCommandId();
2577  message_.setQueryId(qid);
2578  }
2579  id = message_.getCommandId();
2580  }
2581  }
2582  if (!isSubscribeOnly)
2583  {
2584  if (qid.empty())
2585  {
2586  message_.setQueryID(id);
2587  qid = id;
2588  }
2589  else
2590  {
2591  while (!replace && qid != subId && qid != id
2592  && _routes.hasRoute(qid))
2593  {
2594  qid = message_.newQueryId().getQueryId();
2595  }
2596  }
2597  }
 // A Processed ack is always added so syncAckProcessing() has something to wait for.
2598  systemAddedAcks |= Message::AckType::Processed;
2599  message_.setAckTypeEnum(requestedAcks | systemAddedAcks);
2600  {
 // Counts routes registered so far; once it is non-zero, later routes
 // first ask amps_invoke_copy_route_function for a copy of the user data.
2601  int routesAdded = 0;
2602  Lock<Mutex> l(_lock);
2603  if (!subId.empty() && messageHandler_.isValid())
2604  {
2605  if (!_routes.hasRoute(subId))
2606  {
2607  ++routesAdded;
2608  }
2609  // This can replace a non-subscribe with a matching id
2610  // with a subscription but not another subscription.
2611  _routes.addRoute(subId, messageHandler_, requestedAcks,
2612  systemAddedAcks, commandType);
2613  }
2614  if (!isSubscribeOnly && !qid.empty()
2615  && messageHandler_.isValid() && qid != subId)
2616  {
2617  if (routesAdded == 0)
2618  {
2619  _routes.addRoute(qid, messageHandler_,
2620  requestedAcks, systemAddedAcks, commandType);
2621  }
2622  else
2623  {
2624  void* data = NULL;
2625  {
 // The copy function is invoked with _lock released.
2626  Unlock<Mutex> u(_lock);
2627  data = amps_invoke_copy_route_function(
2628  messageHandler_.userData());
2629  }
 // A NULL result means no copy was made: register the handler as-is.
2630  if (!data)
2631  {
2632  _routes.addRoute(qid, messageHandler_, requestedAcks,
2633  systemAddedAcks, commandType);
2634  }
2635  else
2636  {
2637  _routes.addRoute(qid,
2638  MessageHandler(messageHandler_.function(),
2639  data),
2640  requestedAcks, systemAddedAcks, commandType);
2641  }
2642  }
2643  ++routesAdded;
2644  }
 // The command id gets its own route only when acks other than
 // Persisted were requested and it differs from both other ids.
2645  if (!id.empty() && messageHandler_.isValid()
2646  && requestedAcks & ~Message::AckType::Persisted
2647  && id != subId && id != qid)
2648  {
2649  if (routesAdded == 0)
2650  {
2651  _routes.addRoute(id, messageHandler_, requestedAcks,
2652  systemAddedAcks, commandType);
2653  }
2654  else
2655  {
2656  void* data = NULL;
2657  {
2658  Unlock<Mutex> u(_lock);
2659  data = amps_invoke_copy_route_function(
2660  messageHandler_.userData());
2661  }
2662  if (!data)
2663  {
2664  _routes.addRoute(id, messageHandler_, requestedAcks,
2665  systemAddedAcks, commandType);
2666  }
2667  else
2668  {
2669  _routes.addRoute(id,
2670  MessageHandler(messageHandler_.function(),
2671  data),
2672  requestedAcks,
2673  systemAddedAcks, commandType);
2674  }
2675  }
2676  ++routesAdded;
2677  }
2678  try
2679  {
2680  // We aren't adding to subscription manager, so this isn't
2681  // an HA subscribe.
2682  syncAckProcessing(timeout_, message_, 0, false);
2683  message_.setAckTypeEnum(requestedAcks);
2684  }
2685  catch (...)
2686  {
 // Undo the route registrations and restore the caller's ack type
 // before rethrowing.
2687  _routes.removeRoute(message_.getQueryID());
2688  _routes.removeRoute(message_.getSubscriptionId());
2689  _routes.removeRoute(id);
2690  message_.setAckTypeEnum(requestedAcks);
2691  throw;
2692  }
2693  }
2694  break;
2695  // These are valid commands that are used as-is
2696  case Message::Command::Unsubscribe:
2697  case Message::Command::Heartbeat:
2698  case Message::Command::Logon:
2699  case Message::Command::StartTimer:
2700  case Message::Command::StopTimer:
2701  case Message::Command::SOWDelete:
2702  {
2703  Lock<Mutex> l(_lock);
2704  // if an ack is requested, it'll need a command ID.
2705  if (message_.getAckTypeEnum() != Message::AckType::None)
2706  {
2707  if (id.empty())
2708  {
2709  message_.newCommandId();
2710  id = message_.getCommandId();
2711  }
2712  if (messageHandler_.isValid())
2713  {
2714  _routes.addRoute(id, messageHandler_, requestedAcks,
2715  Message::AckType::None, commandType);
2716  }
2717  }
2718  _send(message_);
2719  }
2720  break;
2721  case Message::Command::DeltaPublish:
2722  case Message::Command::Publish:
2723  {
 // A publish that carries a filter is sent synchronously (see below).
2724  bool useSync = message_.getFilter().len() > 0;
2725  Lock<Mutex> l(_lock);
2726  // if an ack is requested, it'll need a command ID.
2727  unsigned ackType = message_.getAckTypeEnum();
2728  if (ackType != Message::AckType::None
2729  || useSync)
2730  {
2731  if (id.empty())
2732  {
2733  message_.newCommandId();
2734  id = message_.getCommandId();
2735  }
2736  if (messageHandler_.isValid())
2737  {
2738  _routes.addRoute(id, messageHandler_, requestedAcks,
2739  Message::AckType::None, commandType);
2740  }
2741  }
2742  if (useSync)
2743  {
2744  message_.setAckTypeEnum(ackType | Message::AckType::Processed);
2745  syncAckProcessing(timeout_, message_, 0, false);
2746  }
2747  else
2748  {
2749  _send(message_);
2750  }
2751  }
2752  break;
2753  // These are things that shouldn't be sent (not meaningful)
2754  case Message::Command::GroupBegin:
2755  case Message::Command::GroupEnd:
2756  case Message::Command::OOF:
2757  case Message::Command::Ack:
2758  case Message::Command::Unknown:
2759  default:
2760  throw CommandException("Command type " + message_.getCommand() + " can not be sent directly to AMPS");
2761  }
 // Restore the ack type the caller originally requested.
2762  message_.setAckTypeEnum(requestedAcks);
2763  return id;
2764  }
2765 
2766  void setDisconnectHandler(const DisconnectHandler& disconnectHandler)
2767  {
2768  Lock<Mutex> l(_lock);
2769  _disconnectHandler = disconnectHandler;
2770  }
2771 
2772  void setGlobalCommandTypeMessageHandler(const std::string& command_, const MessageHandler& handler_)
2773  {
2774  switch (command_[0])
2775  {
2776 #if 0 // Not currently implemented to avoid an extra branch in delivery
2777  case 'p':
2778  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::Publish] = handler_;
2779  break;
2780  case 's':
2781  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::SOW] = handler_;
2782  break;
2783 #endif
2784  case 'h':
2785  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::Heartbeat] = handler_;
2786  break;
2787 #if 0 // Not currently implemented to avoid an extra branch in delivery
2788  case 'g':
2789  if (command_[6] == 'b')
2790  {
2791  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::GroupBegin] = handler_;
2792  }
2793  else if (command_[6] == 'e')
2794  {
2795  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::GroupEnd] = handler_;
2796  }
2797  else
2798  {
2799  std::ostringstream os;
2800  os << "Invalid command '" << command_ << "' passed to setGlobalCommandTypeHandler";
2801  throw CommandException(os.str());
2802  }
2803  break;
2804  case 'o':
2805  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::OOF] = handler_;
2806  break;
2807 #endif
2808  case 'a':
2809  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::Ack] = handler_;
2810  break;
2811  case 'l':
2812  case 'L':
2813  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::LastChance] = handler_;
2814  break;
2815  case 'd':
2816  case 'D':
2817  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::DuplicateMessage] = handler_;
2818  break;
2819  default:
2820  std::ostringstream os;
2821  os << "Invalid command '" << command_ << "' passed to setGlobalCommandTypeHandler";
2822  throw CommandException(os.str());
2823  break;
2824  }
2825  }
2826 
2827  void setGlobalCommandTypeMessageHandler(const Message::Command::Type command_, const MessageHandler& handler_)
2828  {
2829  switch (command_)
2830  {
2831 #if 0 // Not currently implemented to avoid an extra branch in delivery
2832  case Message::Command::Publish:
2833  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::Publish] = handler_;
2834  break;
2835  case Message::Command::SOW:
2836  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::SOW] = handler_;
2837  break;
2838 #endif
2839  case Message::Command::Heartbeat:
2840  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::Heartbeat] = handler_;
2841  break;
2842 #if 0 // Not currently implemented to avoid an extra branch in delivery
2843  case Message::Command::GroupBegin:
2844  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::GroupBegin] = handler_;
2845  break;
2846  case Message::Command::GroupEnd:
2847  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::GroupEnd] = handler_;
2848  break;
2849  case Message::Command::OOF:
2850  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::OOF] = handler_;
2851  break;
2852 #endif
2853  case Message::Command::Ack:
2854  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::Ack] = handler_;
2855  break;
2856  default:
2857  unsigned bits = 0;
2858  unsigned command = command_;
2859  while (command > 0)
2860  {
2861  ++bits;
2862  command >>= 1;
2863  }
2864  char errBuf[128];
2865  AMPS_snprintf(errBuf, sizeof(errBuf),
2866  "Invalid command '%.*s' passed to setGlobalCommandTypeHandler",
2867  CommandConstants<0>::Lengths[bits],
2868  CommandConstants<0>::Values[bits]);
2869  throw CommandException(errBuf);
2870  break;
2871  }
2872  }
2873 
2874  void setGlobalCommandTypeMessageHandler(const GlobalCommandTypeHandlers handlerType_, const MessageHandler& handler_)
2875  {
2876  _globalCommandTypeHandlers[handlerType_] = handler_;
2877  }
2878 
2879  void setFailedWriteHandler(FailedWriteHandler* handler_)
2880  {
2881  Lock<Mutex> l(_lock);
2882  _failedWriteHandler.reset(handler_);
2883  }
2884 
2885  void setPublishStore(const Store& publishStore_)
2886  {
2887  Lock<Mutex> l(_lock);
2888  if (_connected)
2889  {
2890  throw AlreadyConnectedException("Setting a publish store on a connected client is undefined behavior");
2891  }
2892  _publishStore = publishStore_;
2893  }
2894 
2895  void setBookmarkStore(const BookmarkStore& bookmarkStore_)
2896  {
2897  Lock<Mutex> l(_lock);
2898  if (_connected)
2899  {
2900  throw AlreadyConnectedException("Setting a bookmark store on a connected client is undefined behavior");
2901  }
2902  _bookmarkStore = bookmarkStore_;
2903  }
2904 
2905  void setSubscriptionManager(SubscriptionManager* subscriptionManager_)
2906  {
2907  Lock<Mutex> l(_lock);
2908  _subscriptionManager.reset(subscriptionManager_);
2909  }
2910 
2911  SubscriptionManager* getSubscriptionManager() const
2912  {
2913  return const_cast<SubscriptionManager*>(_subscriptionManager.get());
2914  }
2915 
2916  DisconnectHandler getDisconnectHandler() const
2917  {
2918  return _disconnectHandler;
2919  }
2920 
2921  MessageHandler getDuplicateMessageHandler() const
2922  {
2923  return _globalCommandTypeHandlers[GlobalCommandTypeHandlers::DuplicateMessage];
2924  }
2925 
2926  FailedWriteHandler* getFailedWriteHandler() const
2927  {
2928  return const_cast<FailedWriteHandler*>(_failedWriteHandler.get());
2929  }
2930 
2931  Store getPublishStore() const
2932  {
2933  return _publishStore;
2934  }
2935 
2936  BookmarkStore getBookmarkStore() const
2937  {
2938  return _bookmarkStore;
2939  }
2940 
2941  amps_uint64_t publish(const char* topic_, size_t topicLen_, const char* data_, size_t dataLen_)
2942  {
2943  if (!_publishStore.isValid())
2944  {
2945  Lock<Mutex> l(_lock);
2946  _publishMessage.assignTopic(topic_, topicLen_);
2947  _publishMessage.assignData(data_, dataLen_);
2948  _send(_publishMessage);
2949  return 0;
2950  }
2951  else
2952  {
2953  publishStoreMessage.reset();
2954  publishStoreMessage.setCommandEnum(Message::Command::Publish);
2955  return _publish(topic_, topicLen_, data_, dataLen_);
2956  }
2957  }
2958 
2959  amps_uint64_t publish(const char* topic_, size_t topicLen_, const char* data_,
2960  size_t dataLen_, unsigned long expiration_)
2961  {
2962  if (!_publishStore.isValid())
2963  {
2964  Lock<Mutex> l(_lock);
2965  _publishMessage.assignTopic(topic_, topicLen_);
2966  _publishMessage.assignData(data_, dataLen_);
2967  char exprBuf[AMPS_NUMBER_BUFFER_LEN];
2968  size_t pos = convertToCharArray(exprBuf, expiration_);
2969  _publishMessage.assignExpiration(exprBuf + pos, AMPS_NUMBER_BUFFER_LEN - pos);
2970  _send(_publishMessage);
2971  _publishMessage.assignExpiration(NULL, 0);
2972  return 0;
2973  }
2974  else
2975  {
2976  publishStoreMessage.reset();
2977  char exprBuf[AMPS_NUMBER_BUFFER_LEN];
2978  size_t exprPos = convertToCharArray(exprBuf, expiration_);
2979  publishStoreMessage.setCommandEnum(Message::Command::Publish)
2980  .assignExpiration(exprBuf + exprPos,
2981  AMPS_NUMBER_BUFFER_LEN - exprPos);
2982  return _publish(topic_, topicLen_, data_, dataLen_);
2983  }
2984  }
2985 
2986  class FlushAckHandler : ConnectionStateListener
2987  {
2988  private:
2989  ClientImpl* _pClient;
2990  Field _cmdId;
2991 #if __cplusplus >= 201100L || _MSC_VER >= 1900
2992  std::atomic<bool> _acked;
2993  std::atomic<bool> _disconnected;
2994 #else
2995  volatile bool _acked;
2996  volatile bool _disconnected;
2997 #endif
2998  public:
2999  FlushAckHandler(ClientImpl* pClient_)
3000  : _pClient(pClient_), _cmdId(), _acked(false), _disconnected(false)
3001  {
3002  pClient_->addConnectionStateListener(this);
3003  }
3004  ~FlushAckHandler()
3005  {
3006  _pClient->removeConnectionStateListener(this);
3007  _pClient->removeMessageHandler(_cmdId);
3008  _cmdId.clear();
3009  }
3010  void setCommandId(const Field& cmdId_)
3011  {
3012  _cmdId.deepCopy(cmdId_);
3013  }
3014  void invoke(const Message&)
3015  {
3016  _acked = true;
3017  }
3018  void connectionStateChanged(State state_)
3019  {
3020  if (state_ <= Shutdown)
3021  {
3022  _disconnected = true;
3023  }
3024  }
3025  bool acked()
3026  {
3027  return _acked;
3028  }
3029  bool done()
3030  {
3031  return _acked || _disconnected;
3032  }
3033  };
3034 
3035  void publishFlush(long timeout_, unsigned ackType_)
3036  {
3037  static const char* processed = "processed";
3038  static const size_t processedLen = strlen(processed);
3039  static const char* persisted = "persisted";
3040  static const size_t persistedLen = strlen(persisted);
3041  static const char* flush = "flush";
3042  static const size_t flushLen = strlen(flush);
3043  static VersionInfo minPersisted("5.3.3.0");
3044  static VersionInfo minFlush("4");
3045  if (ackType_ != Message::AckType::Processed
3046  && ackType_ != Message::AckType::Persisted)
3047  {
3048  throw CommandException("Flush can only be used with processed or persisted acks.");
3049  }
3050  FlushAckHandler flushHandler(this);
3051  if (_serverVersion >= minFlush)
3052  {
3053  Lock<Mutex> l(_lock);
3054  if (!_connected)
3055  {
3056  throw DisconnectedException("Not connected trying to flush");
3057  }
3058  _message.reset();
3059  _message.newCommandId();
3060  _message.assignCommand(flush, flushLen);
3061  if (_serverVersion < minPersisted
3062  || ackType_ == Message::AckType::Processed)
3063  {
3064  _message.assignAckType(processed, processedLen);
3065  }
3066  else
3067  {
3068  _message.assignAckType(persisted, persistedLen);
3069  }
3070  flushHandler.setCommandId(_message.getCommandId());
3071  addMessageHandler(_message.getCommandId(),
3072  std::bind(&FlushAckHandler::invoke,
3073  std::ref(flushHandler),
3074  std::placeholders::_1),
3075  ackType_, _message.getCommandEnum());
3076  NoDelay noDelay(_client);
3077  if (_send(_message) == -1)
3078  {
3079  throw DisconnectedException("Disconnected trying to flush");
3080  }
3081  }
3082  if (_publishStore.isValid())
3083  {
3084  try
3085  {
3086  _publishStore.flush(timeout_);
3087  }
3088  catch (const AMPSException& ex)
3089  {
3090  AMPS_UNHANDLED_EXCEPTION(ex);
3091  throw;
3092  }
3093  }
3094  else if (_serverVersion < minFlush)
3095  {
3096  if (timeout_ > 0)
3097  {
3098  AMPS_USLEEP(timeout_ * 1000);
3099  }
3100  else
3101  {
3102  AMPS_USLEEP(1000 * 1000);
3103  }
3104  return;
3105  }
3106  if (timeout_)
3107  {
3108  Timer timer((double)timeout_);
3109  timer.start();
3110  while (!timer.check() && !flushHandler.done())
3111  {
3112  AMPS_USLEEP(10000);
3113  amps_invoke_waiting_function();
3114  }
3115  }
3116  else
3117  {
3118  while (!flushHandler.done())
3119  {
3120  AMPS_USLEEP(10000);
3121  amps_invoke_waiting_function();
3122  }
3123  }
3124  // No response or disconnect in timeout interval
3125  if (!flushHandler.done())
3126  {
3127  throw TimedOutException("Timed out waiting for flush");
3128  }
3129  // We got disconnected and there is no publish store
3130  if (!flushHandler.acked() && !_publishStore.isValid())
3131  {
3132  throw DisconnectedException("Disconnected waiting for flush");
3133  }
3134  }
3135 
3136  amps_uint64_t deltaPublish(const char* topic_, size_t topicLength_,
3137  const char* data_, size_t dataLength_)
3138  {
3139  if (!_publishStore.isValid())
3140  {
3141  Lock<Mutex> l(_lock);
3142  _deltaMessage.assignTopic(topic_, topicLength_);
3143  _deltaMessage.assignData(data_, dataLength_);
3144  _send(_deltaMessage);
3145  return 0;
3146  }
3147  else
3148  {
3149  publishStoreMessage.reset();
3150  publishStoreMessage.setCommandEnum(Message::Command::DeltaPublish);
3151  return _publish(topic_, topicLength_, data_, dataLength_);
3152  }
3153  }
3154 
3155  amps_uint64_t deltaPublish(const char* topic_, size_t topicLength_,
3156  const char* data_, size_t dataLength_,
3157  unsigned long expiration_)
3158  {
3159  if (!_publishStore.isValid())
3160  {
3161  Lock<Mutex> l(_lock);
3162  _deltaMessage.assignTopic(topic_, topicLength_);
3163  _deltaMessage.assignData(data_, dataLength_);
3164  char exprBuf[AMPS_NUMBER_BUFFER_LEN];
3165  size_t pos = convertToCharArray(exprBuf, expiration_);
3166  _deltaMessage.assignExpiration(exprBuf + pos, AMPS_NUMBER_BUFFER_LEN - pos);
3167  _send(_deltaMessage);
3168  _deltaMessage.assignExpiration(NULL, 0);
3169  return 0;
3170  }
3171  else
3172  {
3173  publishStoreMessage.reset();
3174  char exprBuf[AMPS_NUMBER_BUFFER_LEN];
3175  size_t exprPos = convertToCharArray(exprBuf, expiration_);
3176  publishStoreMessage.setCommandEnum(Message::Command::DeltaPublish)
3177  .assignExpiration(exprBuf + exprPos,
3178  AMPS_NUMBER_BUFFER_LEN - exprPos);
3179  return _publish(topic_, topicLength_, data_, dataLength_);
3180  }
3181  }
3182 
3183  amps_uint64_t _publish(const char* topic_, size_t topicLength_,
3184  const char* data_, size_t dataLength_)
3185  {
3186  publishStoreMessage.assignTopic(topic_, topicLength_)
3187  .setAckTypeEnum(Message::AckType::Persisted)
3188  .assignData(data_, dataLength_);
3189  amps_uint64_t haSequenceNumber = _publishStore.store(publishStoreMessage);
3190  char buf[AMPS_NUMBER_BUFFER_LEN];
3191  size_t pos = convertToCharArray(buf, haSequenceNumber);
3192  publishStoreMessage.assignSequence(buf + pos, AMPS_NUMBER_BUFFER_LEN - pos);
3193  {
3194  Lock<Mutex> l(_lock);
3195  _send(publishStoreMessage, haSequenceNumber);
3196  }
3197  return haSequenceNumber;
3198  }
3199 
3200  virtual std::string logon(long timeout_, Authenticator& authenticator_,
3201  const char* options_ = NULL)
3202  {
3203  Lock<Mutex> l(_lock);
3204  return _logon(timeout_, authenticator_, options_);
3205  }
3206 
3207  virtual std::string _logon(long timeout_, Authenticator& authenticator_,
3208  const char* options_ = NULL)
3209  {
3210  _message.reset();
3211  _message.newCommandId();
3212  std::string newCommandId = _message.getCommandId();
3213  _message.setCommandEnum(Message::Command::Logon);
3214  _message.setClientName(_name);
3215 #ifdef AMPS_CLIENT_VERSION_WITH_LANGUAGE
3216  _message.assignVersion(AMPS_CLIENT_VERSION_WITH_LANGUAGE,
3217  strlen(AMPS_CLIENT_VERSION_WITH_LANGUAGE));
3218 #endif
3219  URI uri(_lastUri);
3220  if (uri.user().size())
3221  {
3222  _message.setUserId(uri.user());
3223  }
3224  if (uri.password().size())
3225  {
3226  _message.setPassword(uri.password());
3227  }
3228  if (uri.protocol() == "amps" && uri.messageType().size())
3229  {
3230  _message.setMessageType(uri.messageType());
3231  }
3232  if (uri.isTrue("pretty"))
3233  {
3234  _message.setOptions("pretty");
3235  }
3236 
3237  _message.setPassword(authenticator_.authenticate(_message.getUserId(), _message.getPassword()));
3238  if (!_logonCorrelationData.empty())
3239  {
3240  _message.assignCorrelationId(_logonCorrelationData);
3241  }
3242  if (options_)
3243  {
3244  _message.setOptions(options_);
3245  }
3246  _username = _message.getUserId();
3247  try
3248  {
3249  AtomicFlagFlip pubFlip(&_logonInProgress);
3250  NoDelay noDelay(_client);
3251  while (true)
3252  {
3253  _message.setAckTypeEnum(Message::AckType::Processed);
3254  AckResponse ack = syncAckProcessing(timeout_, _message);
3255  if (ack.status() == "retry")
3256  {
3257  _message.setPassword(authenticator_.retry(ack.username(), ack.password()));
3258  _username = ack.username();
3259  _message.setUserId(_username);
3260  }
3261  else
3262  {
3263  authenticator_.completed(ack.username(), ack.password(), ack.reason());
3264  break;
3265  }
3266  }
3267  broadcastConnectionStateChanged(ConnectionStateListener::LoggedOn);
3268 
3269  // Now re-send the heartbeat command if configured
3270  _sendHeartbeat();
3271  // Signal any threads waiting for _logonInProgress
3272  _lock.signalAll();
3273  }
3274  catch (const AMPSException& ex)
3275  {
3276  {
3277  Unlock<Mutex> u(_lock);
3278  setDisconnected();
3279  }
3280  _lock.signalAll();
3281  AMPS_UNHANDLED_EXCEPTION(ex);
3282  throw;
3283  }
3284  catch (...)
3285  {
3286  {
3287  Unlock<Mutex> u(_lock);
3288  setDisconnected();
3289  }
3290  _lock.signalAll();
3291  throw;
3292  }
3293 
3294  if (_publishStore.isValid())
3295  {
3296  try
3297  {
3298  _publishStore.replay(_replayer);
3299  broadcastConnectionStateChanged(ConnectionStateListener::PublishReplayed);
3300  }
3301  catch (const PublishStoreGapException& ex)
3302  {
3303  {
3304  Unlock<Mutex> u(_lock);
3305  setDisconnected();
3306  }
3307  _lock.signalAll();
3308  AMPS_UNHANDLED_EXCEPTION(ex);
3309  throw;
3310  }
3311  catch (const StoreException& ex)
3312  {
3313  {
3314  Unlock<Mutex> u(_lock);
3315  setDisconnected();
3316  }
3317  _lock.signalAll();
3318  std::ostringstream os;
3319  os << "A local store exception occurred while logging on."
3320  << ex.toString();
3321  throw ConnectionException(os.str());
3322  }
3323  catch (const AMPSException& ex)
3324  {
3325  {
3326  Unlock<Mutex> u(_lock);
3327  setDisconnected();
3328  }
3329  _lock.signalAll();
3330  AMPS_UNHANDLED_EXCEPTION(ex);
3331  throw;
3332  }
3333  catch (const std::exception& ex)
3334  {
3335  {
3336  Unlock<Mutex> u(_lock);
3337  setDisconnected();
3338  }
3339  _lock.signalAll();
3340  AMPS_UNHANDLED_EXCEPTION(ex);
3341  throw;
3342  }
3343  catch (...)
3344  {
3345  {
3346  Unlock<Mutex> u(_lock);
3347  setDisconnected();
3348  }
3349  _lock.signalAll();
3350  throw;
3351  }
3352  }
3353  _lock.signalAll();
3354  return newCommandId;
3355  }
3356 
3357  std::string subscribe(const MessageHandler& messageHandler_,
3358  const std::string& topic_,
3359  long timeout_,
3360  const std::string& filter_,
3361  const std::string& bookmark_,
3362  const std::string& options_,
3363  const std::string& subId_,
3364  bool isHASubscribe_ = true)
3365  {
3366  isHASubscribe_ &= (bool)_subscriptionManager;
3367  Lock<Mutex> l(_lock);
3368  _message.reset();
3369  _message.setCommandEnum(Message::Command::Subscribe);
3370  _message.newCommandId();
3371  std::string subId(subId_);
3372  if (subId.empty())
3373  {
3374  if (options_.find(AMPS_OPTIONS_REPLACE, 0, strlen(AMPS_OPTIONS_REPLACE) - 1) != std::string::npos)
3375  {
3376  throw ConnectionException("Cannot issue a replacement subscription; a valid subscription id is required.");
3377  }
3378 
3379  subId = _message.getCommandId();
3380  }
3381  _message.setSubscriptionId(subId);
3382  // we need to deep copy this before sending the message; while we are
3383  // waiting for a response, the fields in _message may get blown away for
3384  // other operations.
3385  AMPS::Message::Field subIdField(subId);
3386  unsigned ackTypes = Message::AckType::Processed;
3387 
3388  if (!bookmark_.empty() && _bookmarkStore.isValid())
3389  {
3390  ackTypes |= Message::AckType::Persisted;
3391  }
3392  _message.setTopic(topic_);
3393 
3394  if (filter_.length())
3395  {
3396  _message.setFilter(filter_);
3397  }
3398  if (bookmark_.length())
3399  {
3400  if (bookmark_ == AMPS_BOOKMARK_RECENT)
3401  {
3402  // The returned Field is a deep copy, so use assign to get it cleared
3403  Message::Field mostRecent = _bookmarkStore.getMostRecent(subIdField);
3404  _message.assignOwnershipBookmark(mostRecent);
3405  }
3406  else
3407  {
3408  _message.setBookmark(bookmark_);
3409  if (_bookmarkStore.isValid())
3410  {
3411  if (bookmark_ != AMPS_BOOKMARK_NOW &&
3412  bookmark_ != AMPS_BOOKMARK_EPOCH)
3413  {
3414  _bookmarkStore.log(_message);
3415  _bookmarkStore.discard(_message);
3416  _bookmarkStore.persisted(subIdField, _message.getBookmark());
3417  }
3418  }
3419  }
3420  }
3421  if (options_.length())
3422  {
3423  _message.setOptions(options_);
3424  }
3425 
3426  Message message = _message;
3427  if (isHASubscribe_)
3428  {
3429  message = _message.deepCopy();
3430  Unlock<Mutex> u(_lock);
3431  _subscriptionManager->subscribe(messageHandler_, message,
3432  Message::AckType::None);
3433  if (_badTimeToHASubscribe)
3434  {
3435  return subId;
3436  }
3437  }
3438  if (!_routes.hasRoute(_message.getSubscriptionId()))
3439  {
3440  _routes.addRoute(_message.getSubscriptionId(), messageHandler_,
3441  Message::AckType::None, ackTypes, _message.getCommandEnum());
3442  }
3443  message.setAckTypeEnum(ackTypes);
3444  if (!options_.empty())
3445  {
3446  message.setOptions(options_);
3447  }
3448  try
3449  {
3450  syncAckProcessing(timeout_, message, isHASubscribe_);
3451  }
3452  catch (const DisconnectedException&)
3453  {
3454  if (!isHASubscribe_)
3455  {
3456  _routes.removeRoute(subIdField);
3457  throw;
3458  }
3459  else
3460  {
3461  AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(subIdField));
3462  throw;
3463  }
3464  }
3465  catch (const TimedOutException&)
3466  {
3467  AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(subIdField));
3468  throw;
3469  }
3470  catch (...)
3471  {
3472  if (isHASubscribe_)
3473  {
3474  // Have to unlock before calling into sub manager to avoid deadlock
3475  Unlock<Mutex> unlock(_lock);
3476  _subscriptionManager->unsubscribe(subIdField);
3477  }
3478  _routes.removeRoute(subIdField);
3479  throw;
3480  }
3481 
3482  return subId;
3483  }
3484  std::string deltaSubscribe(const MessageHandler& messageHandler_,
3485  const std::string& topic_,
3486  long timeout_,
3487  const std::string& filter_,
3488  const std::string& bookmark_,
3489  const std::string& options_,
3490  const std::string& subId_ = "",
3491  bool isHASubscribe_ = true)
3492  {
3493  isHASubscribe_ &= (bool)_subscriptionManager;
3494  Lock<Mutex> l(_lock);
3495  _message.reset();
3496  _message.setCommandEnum(Message::Command::DeltaSubscribe);
3497  _message.newCommandId();
3498  std::string subId(subId_);
3499  if (subId.empty())
3500  {
3501  subId = _message.getCommandId();
3502  }
3503  _message.setSubscriptionId(subId);
3504  // we need to deep copy this before sending the message; while we are
3505  // waiting for a response, the fields in _message may get blown away for
3506  // other operations.
3507  AMPS::Message::Field subIdField(subId);
3508  unsigned ackTypes = Message::AckType::Processed;
3509 
3510  if (!bookmark_.empty() && _bookmarkStore.isValid())
3511  {
3512  ackTypes |= Message::AckType::Persisted;
3513  }
3514  _message.setTopic(topic_);
3515  if (filter_.length())
3516  {
3517  _message.setFilter(filter_);
3518  }
3519  if (bookmark_.length())
3520  {
3521  if (bookmark_ == AMPS_BOOKMARK_RECENT)
3522  {
3523  // The returned Field is a deep copy, so use assign to get it cleared
3524  Message::Field mostRecent = _bookmarkStore.getMostRecent(subIdField);
3525  _message.assignOwnershipBookmark(mostRecent);
3526  }
3527  else
3528  {
3529  _message.setBookmark(bookmark_);
3530  if (_bookmarkStore.isValid())
3531  {
3532  if (bookmark_ != AMPS_BOOKMARK_NOW &&
3533  bookmark_ != AMPS_BOOKMARK_EPOCH)
3534  {
3535  _bookmarkStore.log(_message);
3536  _bookmarkStore.discard(_message);
3537  _bookmarkStore.persisted(subIdField, _message.getBookmark());
3538  }
3539  }
3540  }
3541  }
3542  if (options_.length())
3543  {
3544  _message.setOptions(options_);
3545  }
3546  Message message = _message;
3547  if (isHASubscribe_)
3548  {
3549  message = _message.deepCopy();
3550  Unlock<Mutex> u(_lock);
3551  _subscriptionManager->subscribe(messageHandler_, message,
3552  Message::AckType::None);
3553  if (_badTimeToHASubscribe)
3554  {
3555  return subId;
3556  }
3557  }
3558  if (!_routes.hasRoute(_message.getSubscriptionId()))
3559  {
3560  _routes.addRoute(_message.getSubscriptionId(), messageHandler_,
3561  Message::AckType::None, ackTypes, _message.getCommandEnum());
3562  }
3563  message.setAckTypeEnum(ackTypes);
3564  if (!options_.empty())
3565  {
3566  message.setOptions(options_);
3567  }
3568  try
3569  {
3570  syncAckProcessing(timeout_, message, isHASubscribe_);
3571  }
3572  catch (const DisconnectedException&)
3573  {
3574  if (!isHASubscribe_)
3575  {
3576  _routes.removeRoute(subIdField);
3577  throw;
3578  }
3579  }
3580  catch (const TimedOutException&)
3581  {
3582  AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(subIdField));
3583  throw;
3584  }
3585  catch (...)
3586  {
3587  if (isHASubscribe_)
3588  {
3589  // Have to unlock before calling into sub manager to avoid deadlock
3590  Unlock<Mutex> unlock(_lock);
3591  _subscriptionManager->unsubscribe(subIdField);
3592  }
3593  _routes.removeRoute(subIdField);
3594  throw;
3595  }
3596  return subId;
3597  }
3598 
3599  void unsubscribe(const std::string& id)
3600  {
3601  Lock<Mutex> l(_lock);
3602  unsubscribeInternal(id);
3603  }
3604 
3605  void unsubscribe(void)
3606  {
3607  if (_subscriptionManager)
3608  {
3609  _subscriptionManager->clear();
3610  }
3611  {
3612  _routes.unsubscribeAll();
3613  Lock<Mutex> l(_lock);
3614  _message.reset();
3615  _message.setCommandEnum(Message::Command::Unsubscribe);
3616  _message.newCommandId();
3617  _message.setSubscriptionId("all");
3618  _sendWithoutRetry(_message);
3619  }
3620  deferredExecution(&amps_noOpFn, NULL);
3621  }
3622 
3623  std::string sow(const MessageHandler& messageHandler_,
3624  const std::string& topic_,
3625  const std::string& filter_ = "",
3626  const std::string& orderBy_ = "",
3627  const std::string& bookmark_ = "",
3628  int batchSize_ = AMPS_DEFAULT_BATCH_SIZE,
3629  int topN_ = AMPS_DEFAULT_TOP_N,
3630  const std::string& options_ = "",
3631  long timeout_ = AMPS_DEFAULT_COMMAND_TIMEOUT)
3632  {
3633  Lock<Mutex> l(_lock);
3634  _message.reset();
3635  _message.setCommandEnum(Message::Command::SOW);
3636  _message.newCommandId();
3637  // need to keep our own copy of the command ID.
3638  std::string commandId = _message.getCommandId();
3639  _message.setQueryID(_message.getCommandId());
3640  unsigned ackTypes = Message::AckType::Processed | Message::AckType::Completed;
3641  _message.setAckTypeEnum(ackTypes);
3642  _message.setTopic(topic_);
3643  if (filter_.length())
3644  {
3645  _message.setFilter(filter_);
3646  }
3647  if (orderBy_.length())
3648  {
3649  _message.setOrderBy(orderBy_);
3650  }
3651  if (bookmark_.length())
3652  {
3653  _message.setBookmark(bookmark_);
3654  }
3655  _message.setBatchSize(AMPS::asString(batchSize_));
3656  if (topN_ != AMPS_DEFAULT_TOP_N)
3657  {
3658  _message.setTopNRecordsReturned(AMPS::asString(topN_));
3659  }
3660  if (options_.length())
3661  {
3662  _message.setOptions(options_);
3663  }
3664 
3665  _routes.addRoute(_message.getQueryID(), messageHandler_,
3666  Message::AckType::None, ackTypes, _message.getCommandEnum());
3667 
3668  try
3669  {
3670  syncAckProcessing(timeout_, _message);
3671  }
3672  catch (...)
3673  {
3674  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId));
3675  throw;
3676  }
3677 
3678  return commandId;
3679  }
3680 
3681  std::string sow(const MessageHandler& messageHandler_,
3682  const std::string& topic_,
3683  long timeout_,
3684  const std::string& filter_ = "",
3685  int batchSize_ = AMPS_DEFAULT_BATCH_SIZE,
3686  int topN_ = AMPS_DEFAULT_TOP_N)
3687  {
3688  std::string notSet;
3689  return sow(messageHandler_,
3690  topic_,
3691  filter_,
3692  notSet, // orderBy
3693  notSet, // bookmark
3694  batchSize_,
3695  topN_,
3696  notSet,
3697  timeout_);
3698  }
3699 
3700  std::string sowAndSubscribe(const MessageHandler& messageHandler_,
3701  const std::string& topic_,
3702  const std::string& filter_ = "",
3703  const std::string& orderBy_ = "",
3704  const std::string& bookmark_ = "",
3705  int batchSize_ = AMPS_DEFAULT_BATCH_SIZE,
3706  int topN_ = AMPS_DEFAULT_TOP_N,
3707  const std::string& options_ = "",
3708  long timeout_ = AMPS_DEFAULT_COMMAND_TIMEOUT,
3709  bool isHASubscribe_ = true)
3710  {
3711  isHASubscribe_ &= (bool)_subscriptionManager;
3712  unsigned ackTypes = Message::AckType::Processed;
3713  Lock<Mutex> l(_lock);
3714  _message.reset();
3715  _message.setCommandEnum(Message::Command::SOWAndSubscribe);
3716  _message.newCommandId();
3717  Field cid = _message.getCommandId();
3718  std::string subId = cid;
3719  _message.setQueryID(cid).setSubscriptionId(cid).setTopic(topic_);
3720  if (filter_.length())
3721  {
3722  _message.setFilter(filter_);
3723  }
3724  if (orderBy_.length())
3725  {
3726  _message.setOrderBy(orderBy_);
3727  }
3728  if (bookmark_.length())
3729  {
3730  _message.setBookmark(bookmark_);
3731  Message::Field bookmark = _message.getBookmark();
3732  if (_bookmarkStore.isValid())
3733  {
3734  ackTypes |= Message::AckType::Persisted;
3735  if (bookmark == AMPS_BOOKMARK_RECENT)
3736  {
3737  _message.assignOwnershipBookmark(_bookmarkStore.getMostRecent(_message.getSubscriptionId()));
3738  }
3739  else if (bookmark != AMPS_BOOKMARK_NOW &&
3740  bookmark != AMPS_BOOKMARK_EPOCH)
3741  {
3742  _bookmarkStore.log(_message);
3743  if (!BookmarkRange::isRange(bookmark))
3744  {
3745  _bookmarkStore.discard(_message);
3746  _bookmarkStore.persisted(_message.getSubscriptionId(),
3747  bookmark);
3748  }
3749  }
3750  }
3751  else if (bookmark == AMPS_BOOKMARK_RECENT)
3752  {
3753  _message.setBookmark(AMPS_BOOKMARK_EPOCH);
3754  }
3755  }
3756  _message.setBatchSize(AMPS::asString(batchSize_));
3757  if (topN_ != AMPS_DEFAULT_TOP_N)
3758  {
3759  _message.setTopNRecordsReturned(AMPS::asString(topN_));
3760  }
3761  if (options_.length())
3762  {
3763  _message.setOptions(options_);
3764  }
3765 
3766  Message message = _message;
3767  if (isHASubscribe_)
3768  {
3769  message = _message.deepCopy();
3770  Unlock<Mutex> u(_lock);
3771  _subscriptionManager->subscribe(messageHandler_, message,
3772  Message::AckType::None);
3773  if (_badTimeToHASubscribe)
3774  {
3775  return subId;
3776  }
3777  }
3778  _routes.addRoute(cid, messageHandler_,
3779  Message::AckType::None, ackTypes, _message.getCommandEnum());
3780  message.setAckTypeEnum(ackTypes);
3781  if (!options_.empty())
3782  {
3783  message.setOptions(options_);
3784  }
3785  try
3786  {
3787  syncAckProcessing(timeout_, message, isHASubscribe_);
3788  }
3789  catch (const DisconnectedException&)
3790  {
3791  if (!isHASubscribe_)
3792  {
3793  _routes.removeRoute(subId);
3794  throw;
3795  }
3796  }
3797  catch (const TimedOutException&)
3798  {
3799  AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(subId));
3800  throw;
3801  }
3802  catch (...)
3803  {
3804  if (isHASubscribe_)
3805  {
3806  // Have to unlock before calling into sub manager to avoid deadlock
3807  Unlock<Mutex> unlock(_lock);
3808  _subscriptionManager->unsubscribe(cid);
3809  }
3810  _routes.removeRoute(subId);
3811  throw;
3812  }
3813  return subId;
3814  }
3815 
3816  std::string sowAndSubscribe(const MessageHandler& messageHandler_,
3817  const std::string& topic_,
3818  long timeout_,
3819  const std::string& filter_ = "",
3820  int batchSize_ = AMPS_DEFAULT_BATCH_SIZE,
3821  bool oofEnabled_ = false,
3822  int topN_ = AMPS_DEFAULT_TOP_N,
3823  bool isHASubscribe_ = true)
3824  {
3825  std::string notSet;
3826  return sowAndSubscribe(messageHandler_,
3827  topic_,
3828  filter_,
3829  notSet, // orderBy
3830  notSet, // bookmark
3831  batchSize_,
3832  topN_,
3833  (oofEnabled_ ? "oof" : ""),
3834  timeout_,
3835  isHASubscribe_);
3836  }
3837 
3838  std::string sowAndDeltaSubscribe(const MessageHandler& messageHandler_,
3839  const std::string& topic_,
3840  const std::string& filter_ = "",
3841  const std::string& orderBy_ = "",
3842  int batchSize_ = AMPS_DEFAULT_BATCH_SIZE,
3843  int topN_ = AMPS_DEFAULT_TOP_N,
3844  const std::string& options_ = "",
3845  long timeout_ = AMPS_DEFAULT_COMMAND_TIMEOUT,
3846  bool isHASubscribe_ = true)
3847  {
3848  isHASubscribe_ &= (bool)_subscriptionManager;
3849  Lock<Mutex> l(_lock);
3850  _message.reset();
3851  _message.setCommandEnum(Message::Command::SOWAndDeltaSubscribe);
3852  _message.newCommandId();
3853  _message.setQueryID(_message.getCommandId());
3854  _message.setSubscriptionId(_message.getCommandId());
3855  std::string subId = _message.getSubscriptionId();
3856  _message.setTopic(topic_);
3857  if (filter_.length())
3858  {
3859  _message.setFilter(filter_);
3860  }
3861  if (orderBy_.length())
3862  {
3863  _message.setOrderBy(orderBy_);
3864  }
3865  _message.setBatchSize(AMPS::asString(batchSize_));
3866  if (topN_ != AMPS_DEFAULT_TOP_N)
3867  {
3868  _message.setTopNRecordsReturned(AMPS::asString(topN_));
3869  }
3870  if (options_.length())
3871  {
3872  _message.setOptions(options_);
3873  }
3874  Message message = _message;
3875  if (isHASubscribe_)
3876  {
3877  message = _message.deepCopy();
3878  Unlock<Mutex> u(_lock);
3879  _subscriptionManager->subscribe(messageHandler_, message,
3880  Message::AckType::None);
3881  if (_badTimeToHASubscribe)
3882  {
3883  return subId;
3884  }
3885  }
3886  _routes.addRoute(message.getQueryID(), messageHandler_,
3887  Message::AckType::None, Message::AckType::Processed, message.getCommandEnum());
3888  message.setAckTypeEnum(Message::AckType::Processed);
3889  if (!options_.empty())
3890  {
3891  message.setOptions(options_);
3892  }
3893  try
3894  {
3895  syncAckProcessing(timeout_, message, isHASubscribe_);
3896  }
3897  catch (const DisconnectedException&)
3898  {
3899  if (!isHASubscribe_)
3900  {
3901  _routes.removeRoute(subId);
3902  throw;
3903  }
3904  }
3905  catch (const TimedOutException&)
3906  {
3907  AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(subId));
3908  throw;
3909  }
3910  catch (...)
3911  {
3912  if (isHASubscribe_)
3913  {
3914  // Have to unlock before calling into sub manager to avoid deadlock
3915  Unlock<Mutex> unlock(_lock);
3916  _subscriptionManager->unsubscribe(Field(subId));
3917  }
3918  _routes.removeRoute(subId);
3919  throw;
3920  }
3921  return subId;
3922  }
3923 
3924  std::string sowAndDeltaSubscribe(const MessageHandler& messageHandler_,
3925  const std::string& topic_,
3926  long timeout_,
3927  const std::string& filter_ = "",
3928  int batchSize_ = AMPS_DEFAULT_BATCH_SIZE,
3929  bool oofEnabled_ = false,
3930  bool sendEmpties_ = false,
3931  int topN_ = AMPS_DEFAULT_TOP_N,
3932  bool isHASubscribe_ = true)
3933  {
3934  std::string notSet;
3935  Message::Options options;
3936  if (oofEnabled_)
3937  {
3938  options.setOOF();
3939  }
3940  if (sendEmpties_ == false)
3941  {
3942  options.setNoEmpties();
3943  }
3944  return sowAndDeltaSubscribe(messageHandler_,
3945  topic_,
3946  filter_,
3947  notSet, // orderBy
3948  batchSize_,
3949  topN_,
3950  options,
3951  timeout_,
3952  isHASubscribe_);
3953  }
3954 
3955  std::string sowDelete(const MessageHandler& messageHandler_,
3956  const std::string& topic_,
3957  const std::string& filter_,
3958  long timeout_,
3959  Message::Field commandId_ = Message::Field())
3960  {
3961  if (_publishStore.isValid())
3962  {
3963  unsigned ackType = Message::AckType::Processed |
3964  Message::AckType::Stats |
3965  Message::AckType::Persisted;
3966  publishStoreMessage.reset();
3967  if (commandId_.empty())
3968  {
3969  publishStoreMessage.newCommandId();
3970  commandId_ = publishStoreMessage.getCommandId();
3971  }
3972  else
3973  {
3974  publishStoreMessage.setCommandId(commandId_.data(), commandId_.len());
3975  }
3976  publishStoreMessage.setCommandEnum(Message::Command::SOWDelete)
3977  .assignSubscriptionId(commandId_.data(), commandId_.len())
3978  .assignQueryID(commandId_.data(), commandId_.len())
3979  .setAckTypeEnum(ackType)
3980  .assignTopic(topic_.c_str(), topic_.length())
3981  .assignFilter(filter_.c_str(), filter_.length());
3982  amps_uint64_t haSequenceNumber = _publishStore.store(publishStoreMessage);
3983  char buf[AMPS_NUMBER_BUFFER_LEN];
3984  size_t pos = convertToCharArray(buf, haSequenceNumber);
3985  publishStoreMessage.assignSequence(buf + pos, AMPS_NUMBER_BUFFER_LEN - pos);
3986  {
3987  try
3988  {
3989  Lock<Mutex> l(_lock);
3990  _routes.addRoute(commandId_, messageHandler_,
3991  Message::AckType::Stats,
3992  Message::AckType::Processed | Message::AckType::Persisted,
3993  publishStoreMessage.getCommandEnum());
3994  syncAckProcessing(timeout_, publishStoreMessage,
3995  haSequenceNumber);
3996  }
3997  catch (const DisconnectedException&)
3998  {
3999  // -V565
4000  // Pass - it will get replayed upon reconnect
4001  }
4002  catch (...)
4003  {
4004  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId_));
4005  throw;
4006  }
4007  }
4008  return (std::string)commandId_;
4009  }
4010  else
4011  {
4012  Lock<Mutex> l(_lock);
4013  _message.reset();
4014  if (commandId_.empty())
4015  {
4016  _message.newCommandId();
4017  commandId_ = _message.getCommandId();
4018  }
4019  else
4020  {
4021  _message.setCommandId(commandId_.data(), commandId_.len());
4022  }
4023  _message.setCommandEnum(Message::Command::SOWDelete)
4024  .assignSubscriptionId(commandId_.data(), commandId_.len())
4025  .assignQueryID(commandId_.data(), commandId_.len())
4026  .setAckTypeEnum(Message::AckType::Processed |
4027  Message::AckType::Stats)
4028  .assignTopic(topic_.c_str(), topic_.length())
4029  .assignFilter(filter_.c_str(), filter_.length());
4030  _routes.addRoute(commandId_, messageHandler_,
4031  Message::AckType::Stats,
4032  Message::AckType::Processed,
4033  _message.getCommandEnum());
4034  try
4035  {
4036  syncAckProcessing(timeout_, _message);
4037  }
4038  catch (...)
4039  {
4040  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId_));
4041  throw;
4042  }
4043  return (std::string)commandId_;
4044  }
4045  }
4046 
4047  std::string sowDeleteByData(const MessageHandler& messageHandler_,
4048  const std::string& topic_,
4049  const std::string& data_,
4050  long timeout_,
4051  Message::Field commandId_ = Message::Field())
4052  {
4053  if (_publishStore.isValid())
4054  {
4055  unsigned ackType = Message::AckType::Processed |
4056  Message::AckType::Stats |
4057  Message::AckType::Persisted;
4058  publishStoreMessage.reset();
4059  if (commandId_.empty())
4060  {
4061  publishStoreMessage.newCommandId();
4062  commandId_ = publishStoreMessage.getCommandId();
4063  }
4064  else
4065  {
4066  publishStoreMessage.setCommandId(commandId_.data(), commandId_.len());
4067  }
4068  publishStoreMessage.setCommandEnum(Message::Command::SOWDelete)
4069  .assignSubscriptionId(commandId_.data(), commandId_.len())
4070  .assignQueryID(commandId_.data(), commandId_.len())
4071  .setAckTypeEnum(ackType)
4072  .assignTopic(topic_.c_str(), topic_.length())
4073  .assignData(data_.c_str(), data_.length());
4074  amps_uint64_t haSequenceNumber = _publishStore.store(publishStoreMessage);
4075  char buf[AMPS_NUMBER_BUFFER_LEN];
4076  size_t pos = convertToCharArray(buf, haSequenceNumber);
4077  publishStoreMessage.assignSequence(buf + pos, AMPS_NUMBER_BUFFER_LEN - pos);
4078  {
4079  try
4080  {
4081  Lock<Mutex> l(_lock);
4082  _routes.addRoute(commandId_, messageHandler_,
4083  Message::AckType::Stats,
4084  Message::AckType::Processed | Message::AckType::Persisted,
4085  publishStoreMessage.getCommandEnum());
4086  syncAckProcessing(timeout_, publishStoreMessage,
4087  haSequenceNumber);
4088  }
4089  catch (const DisconnectedException&)
4090  {
4091  // -V565
4092  // Pass - it will get replayed upon reconnect
4093  }
4094  catch (...)
4095  {
4096  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId_));
4097  throw;
4098  }
4099  }
4100  return (std::string)commandId_;
4101  }
4102  else
4103  {
4104  Lock<Mutex> l(_lock);
4105  _message.reset();
4106  if (commandId_.empty())
4107  {
4108  _message.newCommandId();
4109  commandId_ = _message.getCommandId();
4110  }
4111  else
4112  {
4113  _message.setCommandId(commandId_.data(), commandId_.len());
4114  }
4115  _message.setCommandEnum(Message::Command::SOWDelete)
4116  .assignSubscriptionId(commandId_.data(), commandId_.len())
4117  .assignQueryID(commandId_.data(), commandId_.len())
4118  .setAckTypeEnum(Message::AckType::Processed |
4119  Message::AckType::Stats)
4120  .assignTopic(topic_.c_str(), topic_.length())
4121  .assignData(data_.c_str(), data_.length());
4122  _routes.addRoute(commandId_, messageHandler_,
4123  Message::AckType::Stats,
4124  Message::AckType::Processed,
4125  _message.getCommandEnum());
4126  try
4127  {
4128  syncAckProcessing(timeout_, _message);
4129  }
4130  catch (...)
4131  {
4132  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId_));
4133  throw;
4134  }
4135  return (std::string)commandId_;
4136  }
4137  }
4138 
4139  std::string sowDeleteByKeys(const MessageHandler& messageHandler_,
4140  const std::string& topic_,
4141  const std::string& keys_,
4142  long timeout_,
4143  Message::Field commandId_ = Message::Field())
4144  {
4145  if (_publishStore.isValid())
4146  {
4147  unsigned ackType = Message::AckType::Processed |
4148  Message::AckType::Stats |
4149  Message::AckType::Persisted;
4150  publishStoreMessage.reset();
4151  if (commandId_.empty())
4152  {
4153  publishStoreMessage.newCommandId();
4154  commandId_ = publishStoreMessage.getCommandId();
4155  }
4156  else
4157  {
4158  publishStoreMessage.setCommandId(commandId_.data(), commandId_.len());
4159  }
4160  publishStoreMessage.setCommandEnum(Message::Command::SOWDelete)
4161  .assignSubscriptionId(commandId_.data(), commandId_.len())
4162  .assignQueryID(commandId_.data(), commandId_.len())
4163  .setAckTypeEnum(ackType)
4164  .assignTopic(topic_.c_str(), topic_.length())
4165  .assignSowKeys(keys_.c_str(), keys_.length());
4166  amps_uint64_t haSequenceNumber = _publishStore.store(publishStoreMessage);
4167  char buf[AMPS_NUMBER_BUFFER_LEN];
4168  size_t pos = convertToCharArray(buf, haSequenceNumber);
4169  publishStoreMessage.assignSequence(buf + pos, AMPS_NUMBER_BUFFER_LEN - pos);
4170  {
4171  try
4172  {
4173  Lock<Mutex> l(_lock);
4174  _routes.addRoute(commandId_, messageHandler_,
4175  Message::AckType::Stats,
4176  Message::AckType::Processed | Message::AckType::Persisted,
4177  publishStoreMessage.getCommandEnum());
4178  syncAckProcessing(timeout_, publishStoreMessage,
4179  haSequenceNumber);
4180  }
4181  catch (const DisconnectedException&)
4182  {
4183  // -V565
4184  // Pass - it will get replayed upon reconnect
4185  }
4186  catch (...)
4187  {
4188  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId_));
4189  throw;
4190  }
4191  }
4192  return (std::string)commandId_;
4193  }
4194  else
4195  {
4196  Lock<Mutex> l(_lock);
4197  _message.reset();
4198  if (commandId_.empty())
4199  {
4200  _message.newCommandId();
4201  commandId_ = _message.getCommandId();
4202  }
4203  else
4204  {
4205  _message.setCommandId(commandId_.data(), commandId_.len());
4206  }
4207  _message.setCommandEnum(Message::Command::SOWDelete)
4208  .assignSubscriptionId(commandId_.data(), commandId_.len())
4209  .assignQueryID(commandId_.data(), commandId_.len())
4210  .setAckTypeEnum(Message::AckType::Processed |
4211  Message::AckType::Stats)
4212  .assignTopic(topic_.c_str(), topic_.length())
4213  .assignSowKeys(keys_.c_str(), keys_.length());
4214  _routes.addRoute(commandId_, messageHandler_,
4215  Message::AckType::Stats,
4216  Message::AckType::Processed,
4217  _message.getCommandEnum());
4218  try
4219  {
4220  syncAckProcessing(timeout_, _message);
4221  }
4222  catch (...)
4223  {
4224  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId_));
4225  throw;
4226  }
4227  return (std::string)commandId_;
4228  }
4229  }
4230 
4231  void startTimer(void)
4232  {
4233  if (_serverVersion >= "5.3.2.0")
4234  {
4235  throw CommandException("The start_timer command is deprecated.");
4236  }
4237  Lock<Mutex> l(_lock);
4238  _message.reset();
4239  _message.setCommandEnum(Message::Command::StartTimer);
4240 
4241  _send(_message);
4242  }
4243 
4244  std::string stopTimer(MessageHandler messageHandler_)
4245  {
4246  if (_serverVersion >= "5.3.2.0")
4247  {
4248  throw CommandException("The stop_timer command is deprecated.");
4249  }
4250  return executeAsync(Command("stop_timer").addAckType("completed"), messageHandler_);
4251  }
4252 
4253  amps_handle getHandle(void)
4254  {
4255  return _client;
4256  }
4257 
4265  void setExceptionListener(const std::shared_ptr<const ExceptionListener>& pListener_)
4266  {
4267  _pExceptionListener = pListener_;
4268  _exceptionListener = _pExceptionListener.get();
4269  }
4270 
4271  void setExceptionListener(const ExceptionListener& listener_)
4272  {
4273  _exceptionListener = &listener_;
4274  }
4275 
4276  const ExceptionListener& getExceptionListener(void) const
4277  {
4278  return *_exceptionListener;
4279  }
4280 
4281  void setHeartbeat(unsigned heartbeatInterval_, unsigned readTimeout_)
4282  {
4283  if (readTimeout_ < heartbeatInterval_)
4284  {
4285  throw UsageException("The socket read timeout must be >= the heartbeat interval.");
4286  }
4287  Lock<Mutex> l(_lock);
4288  if (_heartbeatInterval != heartbeatInterval_ ||
4289  _readTimeout != readTimeout_)
4290  {
4291  _heartbeatInterval = heartbeatInterval_;
4292  _readTimeout = readTimeout_;
4293  _sendHeartbeat();
4294  }
4295  }
4296 
4297  void _sendHeartbeat(void)
4298  {
4299  if (_connected && _heartbeatInterval != 0)
4300  {
4301  std::ostringstream options;
4302  options << "start," << _heartbeatInterval;
4303  _beatMessage.setOptions(options.str());
4304 
4305  _heartbeatTimer.setTimeout(_heartbeatInterval * 1000.0);
4306  _heartbeatTimer.start();
4307  try
4308  {
4309  _sendWithoutRetry(_beatMessage);
4310  broadcastConnectionStateChanged(ConnectionStateListener::HeartbeatInitiated);
4311  }
4312  catch (ConnectionException& ex_)
4313  {
4314  // If we are disconnected when we attempt to send, that's OK;
4315  // we'll send this message after we re-connect (if we do).
4316  AMPS_UNHANDLED_EXCEPTION(ex_);
4317  }
4318  _beatMessage.setOptions("beat");
4319  }
4320  amps_result result = AMPS_E_OK;
4321  if (_readTimeout && _connected)
4322  {
4323  result = amps_client_set_read_timeout(_client, (int)_readTimeout);
4324  if (result != AMPS_E_OK && result != AMPS_E_DISCONNECTED)
4325  {
4326  AMPSException::throwFor(_client, result);
4327  }
4328  if (!_queueAckTimeout)
4329  {
4330  result = amps_client_set_idle_time(_client,
4331  (int)(_heartbeatInterval * 1000));
4332  if (result != AMPS_E_OK && result != AMPS_E_DISCONNECTED)
4333  {
4334  AMPSException::throwFor(_client, result);
4335  }
4336  }
4337  }
4338  }
4339 
4340  void addConnectionStateListener(ConnectionStateListener* listener_)
4341  {
4342  Lock<Mutex> lock(_lock);
4343  _connectionStateListeners.insert(listener_);
4344  }
4345 
4346  void removeConnectionStateListener(ConnectionStateListener* listener_)
4347  {
4348  Lock<Mutex> lock(_lock);
4349  _connectionStateListeners.erase(listener_);
4350  }
4351 
4352  void clearConnectionStateListeners()
4353  {
4354  Lock<Mutex> lock(_lock);
4355  _connectionStateListeners.clear();
4356  }
4357 
// Registers handler_ in _routes under the ids carried by command_.
//
// Routes may be added for the message's subscription id, its query id and
// the command id passed in cid_:
//  - A non-empty sub id always gets a route. For subscribe/delta subscribe
//    commands whose command id is not distinct from the sub/query id, cid_
//    is set to the sub id and the function returns.
//  - For SOW-type commands with a query id different from the sub id, new
//    query ids are generated until one has no existing route, then a route
//    is added for it.
//  - A distinct command id gets a route when acks other than persisted
//    were requested; otherwise publish/delta publish commands requesting
//    such acks get a freshly generated command id and a route.
// When a route has already been added in this call, the handler's user
// data is copied via amps_invoke_copy_route_function (with _lock released
// around that call) and the copy is used if one is returned.
// cid_ is an in/out parameter and may be replaced as described above.
// Throws UsageException when none of the ids is present and no publish
// route was created.
4358  void _registerHandler(Command& command_, Message::Field& cid_,
4359  MessageHandler& handler_, unsigned requestedAcks_,
4360  unsigned systemAddedAcks_, Message::Command::Type commandType_)
4361  {
4362  Message message = command_.getMessage();
      // commandType (read from the message) drives the decisions below;
      // commandType_ (the parameter) is what gets recorded on each route.
4363  Message::Command::Type commandType = message.getCommandEnum();
4364  Message::Field subid = message.getSubscriptionId();
4365  Message::Field qid = message.getQueryID();
4366  // If we have an id, we're good, even if it's an existing route
4367  bool added = qid.len() || subid.len() || cid_.len();
4368  bool cidIsQid = cid_ == qid;
      // True when the command id is non-empty and differs from both the
      // query id and the sub id.
4369  bool cidUnique = !cidIsQid && cid_.len() > 0 && cid_ != subid;
4370  int addedCount = 0;
4371  if (subid.len() > 0)
4372  {
4373  // This can replace a non-subscribe with a matching id
4374  // with a subscription but not another subscription.
4375  addedCount += _routes.addRoute(subid, handler_, requestedAcks_,
4376  systemAddedAcks_, commandType_);
4377  if (!cidUnique
4378  && (commandType == Message::Command::Subscribe
4379  || commandType == Message::Command::DeltaSubscribe))
4380  {
4381  // We don't need to do anything else
4382  cid_ = subid;
4383  return;
4384  }
4385  }
4386  if (qid.len() > 0 && qid != subid
4387  && (commandType == Message::Command::SOW
4388  || commandType == Message::Command::SOWDelete
4389  || commandType == Message::Command::SOWAndSubscribe
4390  || commandType == Message::Command::SOWAndDeltaSubscribe))
4391  {
      // Keep generating query ids until one is not already routed; keep
      // cid_ in step when it started out equal to the query id.
4392  while (_routes.hasRoute(qid))
4393  {
4394  message.newQueryId();
4395  if (cidIsQid)
4396  {
4397  cid_ = message.getQueryId();
4398  }
4399  qid = message.getQueryId();
4400  }
4401  if (addedCount == 0)
4402  {
4403  _routes.addRoute(qid, handler_, requestedAcks_,
4404  systemAddedAcks_, commandType_);
4405  }
4406  else
4407  {
      // A route was already added above, so ask for a copy of the
      // handler's user data; _lock is released around the callback.
4408  void* data = NULL;
4409  {
4410  Unlock<Mutex> u(_lock);
4411  data = amps_invoke_copy_route_function(handler_.userData());
4412  }
4413  if (!data)
4414  {
4415  _routes.addRoute(qid, handler_, requestedAcks_,
4416  systemAddedAcks_, commandType_);
4417  }
4418  else
4419  {
4420  _routes.addRoute(qid,
4421  MessageHandler(handler_.function(),
4422  data),
4423  requestedAcks_,
4424  systemAddedAcks_, commandType_);
4425  }
4426  }
4427  ++addedCount;
4428  }
      // Parsed as: cidUnique && (requestedAcks_ & ~Persisted), i.e. some
      // ack other than persisted was requested.
4429  if (cidUnique && requestedAcks_ & ~Message::AckType::Persisted)
4430  {
4431  while (_routes.hasRoute(cid_))
4432  {
4433  cid_ = message.newCommandId().getCommandId();
4434  }
4435  if (addedCount == 0)
4436  {
4437  _routes.addRoute(cid_, handler_, requestedAcks_,
4438  systemAddedAcks_, commandType_);
4439  }
4440  else
4441  {
      // Same user-data copy step as for the query id route above.
4442  void* data = NULL;
4443  {
4444  Unlock<Mutex> u(_lock);
4445  data = amps_invoke_copy_route_function(handler_.userData());
4446  }
4447  if (!data)
4448  {
4449  _routes.addRoute(cid_, handler_, requestedAcks_,
4450  systemAddedAcks_, commandType_);
4451  }
4452  else
4453  {
4454  _routes.addRoute(cid_,
4455  MessageHandler(handler_.function(),
4456  data),
4457  requestedAcks_,
4458  systemAddedAcks_, commandType_);
4459  }
4460  }
4461  }
4462  else if ((commandType == Message::Command::Publish ||
4463  commandType == Message::Command::DeltaPublish)
4464  && requestedAcks_ & ~Message::AckType::Persisted)
4465  {
      // Publishes get a new command id so the requested acks can be
      // routed to the handler.
4466  cid_ = command_.getMessage().newCommandId().getCommandId();
4467  _routes.addRoute(cid_, handler_, requestedAcks_,
4468  systemAddedAcks_, commandType_);
4469  added = true;
4470  }
4471  if (!added)
4472  {
4473  throw UsageException("To use a messagehandler, you must also supply a command or subscription ID.");
4474  }
4475  }
4476 
// Executes command_ and registers handler_ for its results. Callers are
// expected to already hold _lock (executeAsync takes it before calling
// this); the lock is released temporarily around calls into the publish
// store and the subscription manager.
//
// command_       Command to run; its message's ack type is temporarily
//                widened with system-added acks and restored to the
//                requested acks before returning or rethrowing on most
//                paths.
// handler_       Handler to register; may be invalid.
// isHASubscribe_ Whether a subscribe should be recorded with the
//                subscription manager; forced to false when there is no
//                subscription manager.
//
// Returns the subscription id for subscribe commands that have one,
// otherwise the command id (which may be empty).
//
// Three send paths exist: commands needing a sequence number when a
// publish store is set, subscribe commands, and everything else. Each
// path removes the routes it added (or unsubscribes) when the send fails,
// except where a disconnect is expected to be recovered by replay or by
// the subscription manager.
4477  std::string executeAsyncNoLock(Command& command_, MessageHandler& handler_,
4478  bool isHASubscribe_ = true)
4479  {
4480  isHASubscribe_ &= (bool)_subscriptionManager;
4481  Message& message = command_.getMessage();
      // A processed ack is added by the client when there is a handler or
      // the command already asks for one.
4482  unsigned systemAddedAcks = (handler_.isValid() || command_.hasProcessedAck()) ?
4483  Message::AckType::Processed : Message::AckType::None;
4484  unsigned requestedAcks = message.getAckTypeEnum();
4485  bool isPublishStore = _publishStore.isValid() && command_.needsSequenceNumber();
4486  Message::Command::Type commandType = message.getCommandEnum();
4487  if (commandType == Message::Command::StopTimer)
4488  {
4489  systemAddedAcks |= Message::AckType::Completed;
4490  }
4491  else if (commandType == Message::Command::Unsubscribe)
4492  {
4493  // Clear routes and sub manager
4494  const std::string subId = message.getSubscriptionId();
4495  if (subId == "all")
4496  {
4497  _routes.unsubscribeAll();
4498  if (_subscriptionManager)
4499  {
4500  Unlock<Mutex> unlock(_lock);
4501  _subscriptionManager->clear();
4502  }
4503  }
4504  else
4505  {
4506  _routes.removeRoute(subId);
4507  // Lock is already acquired
4508  if (_subscriptionManager)
4509  {
4510  // Have to unlock before calling into sub manager to avoid deadlock
4511  Unlock<Mutex> unlock(_lock);
4512  _subscriptionManager->unsubscribe(subId);
4513  }
4514  }
4515  // Make sure the clear gets processed by receive thread
4516  deferredExecution(&amps_noOpFn, NULL);
4517  }
4518  Message::Field cid = message.getCommandId();
      // A handler needs an id to be routed by, so create one if missing.
4519  if (handler_.isValid() && cid.empty())
4520  {
4521  cid = message.newCommandId().getCommandId();
4522  }
4523  if (message.getBookmark().len() > 0)
4524  {
4525  if (command_.isSubscribe())
4526  {
4527  Message::Field bookmark = message.getBookmark();
4528  if (_bookmarkStore.isValid())
4529  {
4530  systemAddedAcks |= Message::AckType::Persisted;
4531  if (bookmark == AMPS_BOOKMARK_RECENT)
4532  {
      // Replace the "recent" placeholder with the store's most recent
      // bookmark for this subscription.
4533  message.assignOwnershipBookmark(_bookmarkStore.getMostRecent(message.getSubscriptionId()));
4534  }
4535  else if (bookmark != AMPS_BOOKMARK_NOW &&
4536  bookmark != AMPS_BOOKMARK_EPOCH)
4537  {
4538  _bookmarkStore.log(message);
4539  if (!BookmarkRange::isRange(bookmark))
4540  {
4541  _bookmarkStore.discard(message);
4542  _bookmarkStore.persisted(message.getSubscriptionId(),
4543  bookmark);
4544  }
4545  }
4546  }
4547  else if (bookmark == AMPS_BOOKMARK_RECENT)
4548  {
      // NOTE(review): listing line 4549 is absent from this extract, so
      // the content of this branch is not visible here - confirm against
      // the original header.
4550  }
4551  }
4552  }
4553  if (isPublishStore)
4554  {
4555  systemAddedAcks |= Message::AckType::Persisted;
4556  }
4557  bool isSubscribe = command_.isSubscribe();
      // Subscribes register their handler later, after the subscription
      // manager has been told about them.
4558  if (handler_.isValid() && !isSubscribe)
4559  {
4560  _registerHandler(command_, cid, handler_,
4561  requestedAcks, systemAddedAcks, commandType);
4562  }
4563  if (isPublishStore)
4564  {
4565  bool useSyncSend = cid.len() > 0 && command_.hasProcessedAck();
4566  amps_uint64_t haSequenceNumber = (amps_uint64_t)0;
4567  message.setAckTypeEnum(requestedAcks | systemAddedAcks);
4568  {
      // The publish store is called with _lock released.
4569  Unlock<Mutex> u(_lock);
4570  haSequenceNumber = _publishStore.store(message);
4571  }
4572  message.setSequence(haSequenceNumber);
4573  try
4574  {
4575  if (useSyncSend)
4576  {
4577  syncAckProcessing((long)command_.getTimeout(), message,
4578  haSequenceNumber);
4579  }
4580  else
4581  {
4582  _send(message, haSequenceNumber);
4583  }
4584  }
4585  catch (const DisconnectedException&)
4586  {
      // Rethrown without removing the route, unlike other failures.
4587  throw;
4588  }
4589  catch (...)
4590  {
4591  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(cid));
4592  throw;
4593  }
4594  }
4595  else
4596  {
4597  if (isSubscribe)
4598  {
4599  const Message::Field& subId = message.getSubscriptionId();
4600  if (isHASubscribe_)
4601  {
4602  Unlock<Mutex> u(_lock);
4603  _subscriptionManager->subscribe(handler_,
4604  message.deepCopy(),
4605  requestedAcks);
      // When _badTimeToHASubscribe is set, the subscription is only
      // recorded with the manager and nothing is sent from here.
4606  if (_badTimeToHASubscribe)
4607  {
4608  message.setAckTypeEnum(requestedAcks);
4609  return std::string(subId.data(), subId.len());
4610  }
4611  }
4612  if (handler_.isValid())
4613  {
4614  _registerHandler(command_, cid, handler_,
4615  requestedAcks, systemAddedAcks, commandType);
4616  }
4617  message.setAckTypeEnum(requestedAcks | systemAddedAcks);
4618  try
4619  {
4620  syncAckProcessing((long)command_.getTimeout(), message,
4621  isHASubscribe_);
4622  }
4623  catch (const DisconnectedException&)
4624  {
      // For HA subscriptions the disconnect is swallowed here; for
      // others the routes are removed and the exception propagates.
4625  if (!isHASubscribe_)
4626  {
4627  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(cid));
4628  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(subId));
4629  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(message.getQueryId()));
4630  message.setAckTypeEnum(requestedAcks);
4631  throw;
4632  }
4633  }
4634  catch (const TimedOutException&)
4635  {
4636  AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(cid));
4637  AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(subId));
4638  AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(message.getQueryId()));
4639  message.setAckTypeEnum(requestedAcks);
4640  throw;
4641  }
4642  catch (...)
4643  {
4644  if (isHASubscribe_)
4645  {
4646  // Have to unlock before calling into sub manager to avoid deadlock
4647  Unlock<Mutex> unlock(_lock);
4648  _subscriptionManager->unsubscribe(subId);
4649  }
4650  if (message.getQueryID().len() > 0)
4651  {
4652  _routes.removeRoute(message.getQueryID());
4653  }
4654  _routes.removeRoute(cid);
4655  _routes.removeRoute(subId);
4656  message.setAckTypeEnum(requestedAcks);
4657  throw;
4658  }
4659  if (subId.len() > 0)
4660  {
4661  message.setAckTypeEnum(requestedAcks);
4662  return std::string(subId.data(), subId.len());
4663  }
4664  }
4665  else
4666  {
4667  // SOW, Flush, etc. should always be sync. Publish/delete may not be.
4668  bool useSyncSend = commandType & ~Message::Command::NoDataCommands
4669  || (cid.len() > 0 && command_.hasProcessedAck());
4670  message.setAckTypeEnum(requestedAcks | systemAddedAcks);
4671  try
4672  {
4673  if (useSyncSend)
4674  {
4675  syncAckProcessing((long)(command_.getTimeout()), message);
4676  }
4677  else
4678  {
4679  _send(message);
4680  }
4681  }
4682  catch (const TimedOutException&)
4683  {
4684  AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(cid));
4685  AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(message.getQueryId()));
4686  message.setAckTypeEnum(requestedAcks);
4687  throw;
4688  }
4689  catch (const DisconnectedException&)
4690  {
4691  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(cid));
4692  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(message.getQueryId()));
4693  message.setAckTypeEnum(requestedAcks);
4694  throw;
4695  }
4696  catch (...)
4697  {
4698  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(cid));
4699  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(message.getQueryId()));
4700  message.setAckTypeEnum(requestedAcks);
4701  throw;
4702  }
4703  }
4704  }
      // Restore the caller's requested acks on the command's message.
4705  message.setAckTypeEnum(requestedAcks);
4706  return cid;
4707  }
4708 
4709  MessageStream getEmptyMessageStream(void);
4710 
4711  std::string executeAsync(Command& command_, MessageHandler& handler_,
4712  bool isHASubscribe_ = true)
4713  {
4714  Lock<Mutex> lock(_lock);
4715  return executeAsyncNoLock(command_, handler_, isHASubscribe_);
4716  }
4717 
4718  // Queue Methods //
4719  void setAutoAck(bool isAutoAckEnabled_)
4720  {
4721  _isAutoAckEnabled = isAutoAckEnabled_;
4722  }
4723  bool getAutoAck(void) const
4724  {
4725  return _isAutoAckEnabled;
4726  }
4727  void setAckBatchSize(const unsigned batchSize_)
4728  {
4729  _ackBatchSize = batchSize_;
4730  if (!_queueAckTimeout)
4731  {
4732  _queueAckTimeout = AMPS_DEFAULT_QUEUE_ACK_TIMEOUT;
4733  amps_client_set_idle_time(_client, _queueAckTimeout);
4734  }
4735  }
4736  unsigned getAckBatchSize(void) const
4737  {
4738  return _ackBatchSize;
4739  }
4740  int getAckTimeout(void) const
4741  {
4742  return _queueAckTimeout;
4743  }
4744  void setAckTimeout(const int ackTimeout_)
4745  {
4746  amps_client_set_idle_time(_client, ackTimeout_);
4747  _queueAckTimeout = ackTimeout_;
4748  }
4749  size_t _ack(QueueBookmarks& queueBookmarks_)
4750  {
4751  if (queueBookmarks_._bookmarkCount)
4752  {
4753  publishStoreMessage.reset();
4754  publishStoreMessage.setCommandEnum(Message::Command::SOWDelete)
4755  .setTopic(queueBookmarks_._topic)
4756  .setBookmark(queueBookmarks_._data)
4757  .setCommandId("AMPS-queue-ack");
4758  amps_uint64_t haSequenceNumber = 0;
4759  if (_publishStore.isValid())
4760  {
4761  haSequenceNumber = _publishStore.store(publishStoreMessage);
4762  publishStoreMessage.setAckType("persisted")
4763  .setSequence(haSequenceNumber);
4764  queueBookmarks_._data.erase();
4765  queueBookmarks_._bookmarkCount = 0;
4766  }
4767  _send(publishStoreMessage, haSequenceNumber);
4768  if (!_publishStore.isValid())
4769  {
4770  queueBookmarks_._data.erase();
4771  queueBookmarks_._bookmarkCount = 0;
4772  }
4773  return 1;
4774  }
4775  return 0;
4776  }
4777  void ack(const Field& topic_, const Field& bookmark_, const char* options_ = NULL)
4778  {
4779  if (_isAutoAckEnabled)
4780  {
4781  return;
4782  }
4783  _ack(topic_, bookmark_, options_);
4784  }
4785  void _ack(const Field& topic_, const Field& bookmark_, const char* options_ = NULL)
4786  {
4787  if (bookmark_.len() == 0)
4788  {
4789  return;
4790  }
4791  Lock<Mutex> lock(_lock);
4792  if (_ackBatchSize < 2 || options_ != NULL)
4793  {
4794  publishStoreMessage.reset();
4795  publishStoreMessage.setCommandEnum(Message::Command::SOWDelete)
4796  .setCommandId("AMPS-queue-ack")
4797  .setTopic(topic_).setBookmark(bookmark_);
4798  if (options_)
4799  {
4800  publishStoreMessage.setOptions(options_);
4801  }
4802  amps_uint64_t haSequenceNumber = 0;
4803  if (_publishStore.isValid())
4804  {
4805  haSequenceNumber = _publishStore.store(publishStoreMessage);
4806  publishStoreMessage.setAckType("persisted")
4807  .setSequence(haSequenceNumber);
4808  }
4809  _send(publishStoreMessage, haSequenceNumber);
4810  return;
4811  }
4812  // have we acked anything for this hash
4813  topic_hash hash = CRC<0>::crcNoSSE(topic_.data(), topic_.len());
4814  TopicHashMap::iterator it = _topicHashMap.find(hash);
4815  if (it == _topicHashMap.end())
4816  {
4817  // add a new one to the map
4818 #ifdef AMPS_USE_EMPLACE
4819  it = _topicHashMap.emplace(TopicHashMap::value_type(hash, QueueBookmarks(topic_))).first;
4820 #else
4821  it = _topicHashMap.insert(TopicHashMap::value_type(hash, QueueBookmarks(topic_))).first;
4822 #endif
4823  }
4824  QueueBookmarks& queueBookmarks = it->second;
4825  if (queueBookmarks._data.length())
4826  {
4827  queueBookmarks._data.append(",");
4828  }
4829  else
4830  {
4831  queueBookmarks._oldestTime = amps_now();
4832  }
4833  queueBookmarks._data.append(bookmark_);
4834  if (++queueBookmarks._bookmarkCount >= _ackBatchSize)
4835  {
4836  _ack(queueBookmarks);
4837  }
4838  }
4839  void flushAcks(void)
4840  {
4841  size_t sendCount = 0;
4842  if (!_connected)
4843  {
4844  return;
4845  }
4846  else
4847  {
4848  Lock<Mutex> lock(_lock);
4849  typedef TopicHashMap::iterator iterator;
4850  for (iterator it = _topicHashMap.begin(), end = _topicHashMap.end(); it != end; ++it)
4851  {
4852  QueueBookmarks& queueBookmarks = it->second;
4853  sendCount += _ack(queueBookmarks);
4854  }
4855  }
4856  if (sendCount && _connected)
4857  {
4858  publishFlush(0, Message::AckType::Processed);
4859  }
4860  }
4861  // called when there's idle time, to see if we need to flush out any "acks"
4862  void checkQueueAcks(void)
4863  {
4864  if (!_topicHashMap.size())
4865  {
4866  return;
4867  }
4868  Lock<Mutex> lock(_lock);
4869  try
4870  {
4871  amps_uint64_t threshold = amps_now()
4872  - (amps_uint64_t)_queueAckTimeout;
4873  typedef TopicHashMap::iterator iterator;
4874  for (iterator it = _topicHashMap.begin(), end = _topicHashMap.end(); it != end; ++it)
4875  {
4876  QueueBookmarks& queueBookmarks = it->second;
4877  if (queueBookmarks._bookmarkCount && queueBookmarks._oldestTime < threshold)
4878  {
4879  _ack(queueBookmarks);
4880  }
4881  }
4882  }
4883  catch (std::exception& ex)
4884  {
4885  AMPS_UNHANDLED_EXCEPTION(ex);
4886  }
4887  }
4888 
4889  void deferredExecution(DeferredExecutionFunc func_, void* userData_)
4890  {
4891  Lock<Mutex> lock(_deferredExecutionLock);
4892 #ifdef AMPS_USE_EMPLACE
4893  _deferredExecutionList.emplace_back(
4894  DeferredExecutionRequest(func_, userData_));
4895 #else
4896  _deferredExecutionList.push_back(
4897  DeferredExecutionRequest(func_, userData_));
4898 #endif
4899  }
4900 
4901  inline void processDeferredExecutions(void)
4902  {
4903  if (_deferredExecutionList.size())
4904  {
4905  Lock<Mutex> lock(_deferredExecutionLock);
4906  DeferredExecutionList::iterator it = _deferredExecutionList.begin();
4907  DeferredExecutionList::iterator end = _deferredExecutionList.end();
4908  for (; it != end; ++it)
4909  {
4910  try
4911  {
4912  it->_func(it->_userData);
4913  }
4914  catch (...)
4915  {
4916  // -V565
4917  // Intentionally ignore errors
4918  }
4919  }
4920  _deferredExecutionList.clear();
4921  _routes.invalidateCache();
4922  _routeCache.invalidateCache();
4923  }
4924  }
4925 
4926  bool getRetryOnDisconnect(void) const
4927  {
4928  return _isRetryOnDisconnect;
4929  }
4930 
4931  void setRetryOnDisconnect(bool isRetryOnDisconnect_)
4932  {
4933  _isRetryOnDisconnect = isRetryOnDisconnect_;
4934  }
4935 
4936  void setDefaultMaxDepth(unsigned maxDepth_)
4937  {
4938  _defaultMaxDepth = maxDepth_;
4939  }
4940 
4941  unsigned getDefaultMaxDepth(void) const
4942  {
4943  return _defaultMaxDepth;
4944  }
4945 
4946  void setTransportFilterFunction(amps_transport_filter_function filter_,
4947  void* userData_)
4948  {
4949  amps_client_set_transport_filter_function(_client, filter_, userData_);
4950  }
4951 
4952  void setThreadCreatedCallback(amps_thread_created_callback callback_,
4953  void* userData_)
4954  {
4955  amps_client_set_thread_created_callback(_client, callback_, userData_);
4956  }
4957  }; // class ClientImpl
5032 
5034  {
5035  RefHandle<MessageStreamImpl> _body;
5036  public:
5041  class iterator
5042  {
5043  MessageStream* _pStream;
5044  Message _current;
5045  inline void advance(void);
5046 
5047  public:
5048  iterator() // end
5049  : _pStream(NULL)
5050  {;}
5051  iterator(MessageStream* pStream_)
5052  : _pStream(pStream_)
5053  {
5054  advance();
5055  }
5056 
5057  bool operator==(const iterator& rhs) const
5058  {
5059  return _pStream == rhs._pStream;
5060  }
5061  bool operator!=(const iterator& rhs) const
5062  {
5063  return _pStream != rhs._pStream;
5064  }
5065  void operator++(void)
5066  {
5067  advance();
5068  }
5069  Message operator*(void)
5070  {
5071  return _current;
5072  }
5073  Message* operator->(void)
5074  {
5075  return &_current;
5076  }
5077  };
5079  bool isValid() const
5080  {
5081  return _body.isValid();
5082  }
5083 
5087  {
5088  if (!_body.isValid())
5089  {
5090  throw UsageException("This MessageStream is not valid and cannot be iterated.");
5091  }
5092  return iterator(this);
5093  }
5096  // For non-SOW queries, the end is never reached.
5098  {
5099  return iterator();
5100  }
5101  inline MessageStream(void);
5102 
5108  MessageStream timeout(unsigned timeout_);
5109 
5113  MessageStream conflate(void);
5119  MessageStream maxDepth(unsigned maxDepth_);
5122  unsigned getMaxDepth(void) const;
5125  unsigned getDepth(void) const;
5126 
5127  private:
5128  inline MessageStream(const Client& client_);
5129  inline MessageStream(RefHandle<MessageStreamImpl> body_);
5130  inline void setSOWOnly(const std::string& commandId_,
5131  const std::string& queryId_ = "");
5132  inline void setSubscription(const std::string& subId_,
5133  const std::string& commandId_ = "",
5134  const std::string& queryId_ = "");
5135  inline void setStatsOnly(const std::string& commandId_,
5136  const std::string& queryId_ = "");
5137  inline void setAcksOnly(const std::string& commandId_, unsigned acks_);
5138 
5139  inline operator MessageHandler(void);
5140 
5141  inline static MessageStream fromExistingHandler(const MessageHandler& handler);
5142 
5143  friend class Client;
5144  friend class ClientImpl;
5145 
5146  };
5147 
5167  class Client // -V553
5168  {
5169  protected:
5170  BorrowRefHandle<ClientImpl> _body;
5171  public:
5172  static const int DEFAULT_COMMAND_TIMEOUT = AMPS_DEFAULT_COMMAND_TIMEOUT;
5173  static const int DEFAULT_BATCH_SIZE = AMPS_DEFAULT_BATCH_SIZE;
5174  static const int DEFAULT_TOP_N = AMPS_DEFAULT_TOP_N;
5175 
5184  Client(const std::string& clientName = "")
5185  : _body(new ClientImpl(clientName), true)
5186  {;}
5187 
5188  Client(ClientImpl* existingClient)
5189  : _body(existingClient, true)
5190  {;}
5191 
5192  Client(ClientImpl* existingClient, bool isRef)
5193  : _body(existingClient, isRef)
5194  {;}
5195 
5196  Client(const Client& rhs) : _body(rhs._body) {;}
5197  virtual ~Client(void) {;}
5198 
5199  Client& operator=(const Client& rhs)
5200  {
5201  _body = rhs._body;
5202  return *this;
5203  }
5204 
5205  bool isValid()
5206  {
5207  return _body.isValid();
5208  }
5209 
5222  void setName(const std::string& name)
5223  {
5224  _body.get().setName(name);
5225  }
5226 
5229  const std::string& getName() const
5230  {
5231  return _body.get().getName();
5232  }
5233 
5237  const std::string& getNameHash() const
5238  {
5239  return _body.get().getNameHash();
5240  }
5241 
5245  const amps_uint64_t getNameHashValue() const
5246  {
5247  return _body.get().getNameHashValue();
5248  }
5249 
5256  void setLogonCorrelationData(const std::string& logonCorrelationData_)
5257  {
5258  _body.get().setLogonCorrelationData(logonCorrelationData_);
5259  }
5260 
5263  const std::string& getLogonCorrelationData() const
5264  {
5265  return _body.get().getLogonCorrelationData();
5266  }
5267 
5271  void addHttpPreflightHeader(const std::string& header_)
5272  {
5273  _body.get().addHttpPreflightHeader(header_);
5274  }
5275 
5280  void addHttpPreflightHeader(const std::string& key_, const std::string& value_)
5281  {
5282  _body.get().addHttpPreflightHeader(key_, value_);
5283  }
5284 
5287  {
5288  _body.get().clearHttpPreflightHeaders();
5289  }
5290 
5294  template<class T>
5295  void setHttpPreflightHeaders(const T& headers_)
5296  {
5297  _body.get().setHttpPreflightHeaders(headers_);
5298  }
5299 
5308  size_t getServerVersion() const
5309  {
5310  return _body.get().getServerVersion();
5311  }
5312 
5319  VersionInfo getServerVersionInfo() const
5320  {
5321  return _body.get().getServerVersionInfo();
5322  }
5323 
5333  static size_t convertVersionToNumber(const std::string& version_)
5334  {
5335  return AMPS::convertVersionToNumber(version_.c_str(), version_.length());
5336  }
5337 
5348  static size_t convertVersionToNumber(const char* data_, size_t len_)
5349  {
5350  return AMPS::convertVersionToNumber(data_, len_);
5351  }
5352 
5355  const std::string& getURI() const
5356  {
5357  return _body.get().getURI();
5358  }
5359 
5366 
5368 
5379  void connect(const std::string& uri)
5380  {
5381  _body.get().connect(uri);
5382  }
5383 
5386  void disconnect()
5387  {
5388  _body.get().disconnect();
5389  }
5390 
5404  void send(const Message& message)
5405  {
5406  _body.get().send(message);
5407  }
5408 
5417  void addMessageHandler(const Field& commandId_,
5418  const AMPS::MessageHandler& messageHandler_,
5419  unsigned requestedAcks_, bool isSubscribe_)
5420  {
5421  Message::Command::Type commandType = isSubscribe_ ? Message::Command::Subscribe : Message::Command::SOW;
5422  _body.get().addMessageHandler(commandId_, messageHandler_,
5423  requestedAcks_, commandType);
5424  }
5425 
5434  void addMessageHandler(const Field& commandId_,
5435  const AMPS::MessageHandler& messageHandler_,
5436  unsigned requestedAcks_, Message::Command::Type commandType_)
5437  {
5438  _body.get().addMessageHandler(commandId_, messageHandler_,
5439  requestedAcks_, commandType_);
5440  }
5441 
5445  bool removeMessageHandler(const Field& commandId_)
5446  {
5447  return _body.get().removeMessageHandler(commandId_);
5448  }
5449 
5473  std::string send(const MessageHandler& messageHandler, Message& message, int timeout = 0)
5474  {
5475  return _body.get().send(messageHandler, message, timeout);
5476  }
5477 
5491 #if defined(_WIN32) || __cplusplus >= 201402L
5492  [[deprecated("Use HAClient for automatic reconnection and a ConnectionStateListener to monitor connection state.")]]
5493 #endif
5494  virtual void setDisconnectHandler(const DisconnectHandler& disconnectHandler)
5495  {
5496  _body.get().setDisconnectHandler(disconnectHandler);
5497  }
5498 
5505 #if defined(_WIN32) || __cplusplus >= 201402L
5506  [[deprecated("Use HAClient for automatic reconnection and a ConnectionStateListener to monitor connection state.")]]
5507 #endif
5508  DisconnectHandler getDisconnectHandler(void) const
5509  {
5510  return _body.get().getDisconnectHandler();
5511  }
5512 
5517  virtual ConnectionInfo getConnectionInfo() const
5518  {
5519  return _body.get().getConnectionInfo();
5520  }
5521 
5530  void setBookmarkStore(const BookmarkStore& bookmarkStore_)
5531  {
5532  _body.get().setBookmarkStore(bookmarkStore_);
5533  }
5534 
5539  {
5540  return _body.get().getBookmarkStore();
5541  }
5542 
5547  {
5548  return _body.get().getSubscriptionManager();
5549  }
5550 
5558  void setSubscriptionManager(SubscriptionManager* subscriptionManager_)
5559  {
5560  _body.get().setSubscriptionManager(subscriptionManager_);
5561  }
5562 
5582  void setPublishStore(const Store& publishStore_)
5583  {
5584  _body.get().setPublishStore(publishStore_);
5585  }
5586 
5591  {
5592  return _body.get().getPublishStore();
5593  }
5594 
5598  void setDuplicateMessageHandler(const MessageHandler& duplicateMessageHandler_)
5599  {
5600  _body.get().setGlobalCommandTypeMessageHandler(ClientImpl::GlobalCommandTypeHandlers::DuplicateMessage,
5601  duplicateMessageHandler_);
5602  }
5603 
5614  {
5615  return _body.get().getDuplicateMessageHandler();
5616  }
5617 
5628  {
5629  _body.get().setFailedWriteHandler(handler_);
5630  }
5631 
5636  {
5637  return _body.get().getFailedWriteHandler();
5638  }
5639 
5640 
5658  amps_uint64_t publish(const std::string& topic_, const std::string& data_)
5659  {
5660  return _body.get().publish(topic_.c_str(), topic_.length(),
5661  data_.c_str(), data_.length());
5662  }
5663 
5683  amps_uint64_t publish(const char* topic_, size_t topicLength_,
5684  const char* data_, size_t dataLength_)
5685  {
5686  return _body.get().publish(topic_, topicLength_, data_, dataLength_);
5687  }
5688 
5707  amps_uint64_t publish(const std::string& topic_, const std::string& data_,
5708  unsigned long expiration_)
5709  {
5710  return _body.get().publish(topic_.c_str(), topic_.length(),
5711  data_.c_str(), data_.length(), expiration_);
5712  }
5713 
5734  amps_uint64_t publish(const char* topic_, size_t topicLength_,
5735  const char* data_, size_t dataLength_,
5736  unsigned long expiration_)
5737  {
5738  return _body.get().publish(topic_, topicLength_,
5739  data_, dataLength_, expiration_);
5740  }
5741 
5780  void publishFlush(long timeout_ = 0, unsigned ackType_ = Message::AckType::Processed)
5781  {
5782  _body.get().publishFlush(timeout_, ackType_);
5783  }
5784 
5785 
5801  amps_uint64_t deltaPublish(const std::string& topic_, const std::string& data_)
5802  {
5803  return _body.get().deltaPublish(topic_.c_str(), topic_.length(),
5804  data_.c_str(), data_.length());
5805  }
5806 
5824  amps_uint64_t deltaPublish(const char* topic_, size_t topicLength_,
5825  const char* data_, size_t dataLength_)
5826  {
5827  return _body.get().deltaPublish(topic_, topicLength_,
5828  data_, dataLength_);
5829  }
5830 
5847  amps_uint64_t deltaPublish(const std::string& topic_, const std::string& data_,
5848  unsigned long expiration_)
5849  {
5850  return _body.get().deltaPublish(topic_.c_str(), topic_.length(),
5851  data_.c_str(), data_.length(),
5852  expiration_);
5853  }
5854 
5873  amps_uint64_t deltaPublish(const char* topic_, size_t topicLength_,
5874  const char* data_, size_t dataLength_,
5875  unsigned long expiration_)
5876  {
5877  return _body.get().deltaPublish(topic_, topicLength_,
5878  data_, dataLength_, expiration_);
5879  }
5880 
5896  std::string logon(int timeout_ = 0,
5897  Authenticator& authenticator_ = DefaultAuthenticator::instance(),
5898  const char* options_ = NULL)
5899  {
5900  return _body.get().logon(timeout_, authenticator_, options_);
5901  }
5915  std::string logon(const char* options_, int timeout_ = 0)
5916  {
5917  return _body.get().logon(timeout_, DefaultAuthenticator::instance(),
5918  options_);
5919  }
5920 
5934  std::string logon(const std::string& options_, int timeout_ = 0)
5935  {
5936  return _body.get().logon(timeout_, DefaultAuthenticator::instance(),
5937  options_.c_str());
5938  }
5939 
5959  std::string subscribe(const MessageHandler& messageHandler_,
5960  const std::string& topic_,
5961  long timeout_ = 0,
5962  const std::string& filter_ = "",
5963  const std::string& options_ = "",
5964  const std::string& subId_ = "")
5965  {
5966  return _body.get().subscribe(messageHandler_, topic_, timeout_,
5967  filter_, "", options_, subId_);
5968  }
5969 
5985  MessageStream subscribe(const std::string& topic_,
5986  long timeout_ = 0, const std::string& filter_ = "",
5987  const std::string& options_ = "",
5988  const std::string& subId_ = "")
5989  {
5990  MessageStream result(*this);
5991  if (_body.get().getDefaultMaxDepth())
5992  {
5993  result.maxDepth(_body.get().getDefaultMaxDepth());
5994  }
5995  result.setSubscription(_body.get().subscribe(
5996  result.operator MessageHandler(),
5997  topic_, timeout_, filter_, "",
5998  options_, subId_, false));
5999  return result;
6000  }
6001 
6017  MessageStream subscribe(const char* topic_,
6018  long timeout_ = 0, const std::string& filter_ = "",
6019  const std::string& options_ = "",
6020  const std::string& subId_ = "")
6021  {
6022  MessageStream result(*this);
6023  if (_body.get().getDefaultMaxDepth())
6024  {
6025  result.maxDepth(_body.get().getDefaultMaxDepth());
6026  }
6027  result.setSubscription(_body.get().subscribe(
6028  result.operator MessageHandler(),
6029  topic_, timeout_, filter_, "",
6030  options_, subId_, false));
6031  return result;
6032  }
6033 
6046  std::string deltaSubscribe(const MessageHandler& messageHandler_,
6047  const std::string& topic_,
6048  long timeout_,
6049  const std::string& filter_ = "",
6050  const std::string& options_ = "",
6051  const std::string& subId_ = "")
6052  {
6053  return _body.get().deltaSubscribe(messageHandler_, topic_, timeout_,
6054  filter_, "", options_, subId_);
6055  }
6064  MessageStream deltaSubscribe(const std::string& topic_,
6065  long timeout_, const std::string& filter_ = "",
6066  const std::string& options_ = "",
6067  const std::string& subId_ = "")
6068  {
6069  MessageStream result(*this);
6070  if (_body.get().getDefaultMaxDepth())
6071  {
6072  result.maxDepth(_body.get().getDefaultMaxDepth());
6073  }
6074  result.setSubscription(_body.get().deltaSubscribe(
6075  result.operator MessageHandler(),
6076  topic_, timeout_, filter_, "",
6077  options_, subId_, false));
6078  return result;
6079  }
6080 
6082  MessageStream deltaSubscribe(const char* topic_,
6083  long timeout_, const std::string& filter_ = "",
6084  const std::string& options_ = "",
6085  const std::string& subId_ = "")
6086  {
6087  MessageStream result(*this);
6088  if (_body.get().getDefaultMaxDepth())
6089  {
6090  result.maxDepth(_body.get().getDefaultMaxDepth());
6091  }
6092  result.setSubscription(_body.get().deltaSubscribe(
6093  result.operator MessageHandler(),
6094  topic_, timeout_, filter_, "",
6095  options_, subId_, false));
6096  return result;
6097  }
6098 
6124  std::string bookmarkSubscribe(const MessageHandler& messageHandler_,
6125  const std::string& topic_,
6126  long timeout_,
6127  const std::string& bookmark_,
6128  const std::string& filter_ = "",
6129  const std::string& options_ = "",
6130  const std::string& subId_ = "")
6131  {
6132  return _body.get().subscribe(messageHandler_, topic_, timeout_,
6133  filter_, bookmark_, options_, subId_);
6134  }
6152  MessageStream bookmarkSubscribe(const std::string& topic_,
6153  long timeout_,
6154  const std::string& bookmark_,
6155  const std::string& filter_ = "",
6156  const std::string& options_ = "",
6157  const std::string& subId_ = "")
6158  {
6159  MessageStream result(*this);
6160  if (_body.get().getDefaultMaxDepth())
6161  {
6162  result.maxDepth(_body.get().getDefaultMaxDepth());
6163  }
6164  result.setSubscription(_body.get().subscribe(
6165  result.operator MessageHandler(),
6166  topic_, timeout_, filter_,
6167  bookmark_, options_,
6168  subId_, false));
6169  return result;
6170  }
6171 
6173  MessageStream bookmarkSubscribe(const char* topic_,
6174  long timeout_,
6175  const std::string& bookmark_,
6176  const std::string& filter_ = "",
6177  const std::string& options_ = "",
6178  const std::string& subId_ = "")
6179  {
6180  MessageStream result(*this);
6181  if (_body.get().getDefaultMaxDepth())
6182  {
6183  result.maxDepth(_body.get().getDefaultMaxDepth());
6184  }
6185  result.setSubscription(_body.get().subscribe(
6186  result.operator MessageHandler(),
6187  topic_, timeout_, filter_,
6188  bookmark_, options_,
6189  subId_, false));
6190  return result;
6191  }
6192 
6201  void unsubscribe(const std::string& commandId)
6202  {
6203  return _body.get().unsubscribe(commandId);
6204  }
6205 
6214  {
6215  return _body.get().unsubscribe();
6216  }
6217 
6218 
6248  std::string sow(const MessageHandler& messageHandler_,
6249  const std::string& topic_,
6250  const std::string& filter_ = "",
6251  const std::string& orderBy_ = "",
6252  const std::string& bookmark_ = "",
6253  int batchSize_ = DEFAULT_BATCH_SIZE,
6254  int topN_ = DEFAULT_TOP_N,
6255  const std::string& options_ = "",
6256  long timeout_ = DEFAULT_COMMAND_TIMEOUT)
6257  {
6258  return _body.get().sow(messageHandler_, topic_, filter_, orderBy_,
6259  bookmark_, batchSize_, topN_, options_,
6260  timeout_);
6261  }
6286  MessageStream sow(const std::string& topic_,
6287  const std::string& filter_ = "",
6288  const std::string& orderBy_ = "",
6289  const std::string& bookmark_ = "",
6290  int batchSize_ = DEFAULT_BATCH_SIZE,
6291  int topN_ = DEFAULT_TOP_N,
6292  const std::string& options_ = "",
6293  long timeout_ = DEFAULT_COMMAND_TIMEOUT)
6294  {
6295  MessageStream result(*this);
6296  if (_body.get().getDefaultMaxDepth())
6297  {
6298  result.maxDepth(_body.get().getDefaultMaxDepth());
6299  }
6300  result.setSOWOnly(_body.get().sow(result.operator MessageHandler(),
6301  topic_, filter_, orderBy_, bookmark_,
6302  batchSize_, topN_, options_, timeout_));
6303  return result;
6304  }
6305 
6307  MessageStream sow(const char* topic_,
6308  const std::string& filter_ = "",
6309  const std::string& orderBy_ = "",
6310  const std::string& bookmark_ = "",
6311  int batchSize_ = DEFAULT_BATCH_SIZE,
6312  int topN_ = DEFAULT_TOP_N,
6313  const std::string& options_ = "",
6314  long timeout_ = DEFAULT_COMMAND_TIMEOUT)
6315  {
6316  MessageStream result(*this);
6317  if (_body.get().getDefaultMaxDepth())
6318  {
6319  result.maxDepth(_body.get().getDefaultMaxDepth());
6320  }
6321  result.setSOWOnly(_body.get().sow(result.operator MessageHandler(),
6322  topic_, filter_, orderBy_, bookmark_,
6323  batchSize_, topN_, options_, timeout_));
6324  return result;
6325  }
6348  std::string sow(const MessageHandler& messageHandler_,
6349  const std::string& topic_,
6350  long timeout_,
6351  const std::string& filter_ = "",
6352  int batchSize_ = DEFAULT_BATCH_SIZE,
6353  int topN_ = DEFAULT_TOP_N)
6354  {
6355  return _body.get().sow(messageHandler_, topic_, timeout_, filter_,
6356  batchSize_, topN_);
6357  }
6380  std::string sowAndSubscribe(const MessageHandler& messageHandler_,
6381  const std::string& topic_,
6382  long timeout_,
6383  const std::string& filter_ = "",
6384  int batchSize_ = DEFAULT_BATCH_SIZE,
6385  bool oofEnabled_ = false,
6386  int topN_ = DEFAULT_TOP_N)
6387  {
6388  return _body.get().sowAndSubscribe(messageHandler_, topic_, timeout_,
6389  filter_, batchSize_, oofEnabled_,
6390  topN_);
6391  }
6392 
6412  MessageStream sowAndSubscribe(const std::string& topic_,
6413  long timeout_,
6414  const std::string& filter_ = "",
6415  int batchSize_ = DEFAULT_BATCH_SIZE,
6416  bool oofEnabled_ = false,
6417  int topN_ = DEFAULT_TOP_N)
6418  {
6419  MessageStream result(*this);
6420  if (_body.get().getDefaultMaxDepth())
6421  {
6422  result.maxDepth(_body.get().getDefaultMaxDepth());
6423  }
6424  result.setSubscription(_body.get().sowAndSubscribe(
6425  result.operator MessageHandler(),
6426  topic_, timeout_, filter_,
6427  batchSize_, oofEnabled_,
6428  topN_, false));
6429  return result;
6430  }
6450  MessageStream sowAndSubscribe(const char* topic_,
6451  long timeout_,
6452  const std::string& filter_ = "",
6453  int batchSize_ = DEFAULT_BATCH_SIZE,
6454  bool oofEnabled_ = false,
6455  int topN_ = DEFAULT_TOP_N)
6456  {
6457  MessageStream result(*this);
6458  if (_body.get().getDefaultMaxDepth())
6459  {
6460  result.maxDepth(_body.get().getDefaultMaxDepth());
6461  }
6462  result.setSubscription(_body.get().sowAndSubscribe(
6463  result.operator MessageHandler(),
6464  topic_, timeout_, filter_,
6465  batchSize_, oofEnabled_,
6466  topN_, false));
6467  return result;
6468  }
6469 
6470 
6498  std::string sowAndSubscribe(const MessageHandler& messageHandler_,
6499  const std::string& topic_,
6500  const std::string& filter_ = "",
6501  const std::string& orderBy_ = "",
6502  const std::string& bookmark_ = "",
6503  int batchSize_ = DEFAULT_BATCH_SIZE,
6504  int topN_ = DEFAULT_TOP_N,
6505  const std::string& options_ = "",
6506  long timeout_ = DEFAULT_COMMAND_TIMEOUT)
6507  {
6508  return _body.get().sowAndSubscribe(messageHandler_, topic_, filter_,
6509  orderBy_, bookmark_, batchSize_,
6510  topN_, options_, timeout_);
6511  }
6512 
6537  MessageStream sowAndSubscribe(const std::string& topic_,
6538  const std::string& filter_ = "",
6539  const std::string& orderBy_ = "",
6540  const std::string& bookmark_ = "",
6541  int batchSize_ = DEFAULT_BATCH_SIZE,
6542  int topN_ = DEFAULT_TOP_N,
6543  const std::string& options_ = "",
6544  long timeout_ = DEFAULT_COMMAND_TIMEOUT)
6545  {
6546  MessageStream result(*this);
6547  if (_body.get().getDefaultMaxDepth())
6548  {
6549  result.maxDepth(_body.get().getDefaultMaxDepth());
6550  }
6551  result.setSubscription(_body.get().sowAndSubscribe(
6552  result.operator MessageHandler(),
6553  topic_, filter_, orderBy_,
6554  bookmark_, batchSize_, topN_,
6555  options_, timeout_, false));
6556  return result;
6557  }
6558 
6560  MessageStream sowAndSubscribe(const char* topic_,
6561  const std::string& filter_ = "",
6562  const std::string& orderBy_ = "",
6563  const std::string& bookmark_ = "",
6564  int batchSize_ = DEFAULT_BATCH_SIZE,
6565  int topN_ = DEFAULT_TOP_N,
6566  const std::string& options_ = "",
6567  long timeout_ = DEFAULT_COMMAND_TIMEOUT)
6568  {
6569  MessageStream result(*this);
6570  if (_body.get().getDefaultMaxDepth())
6571  {
6572  result.maxDepth(_body.get().getDefaultMaxDepth());
6573  }
6574  result.setSubscription(_body.get().sowAndSubscribe(
6575  result.operator MessageHandler(),
6576  topic_, filter_, orderBy_,
6577  bookmark_, batchSize_, topN_,
6578  options_, timeout_, false));
6579  return result;
6580  }
6581 
6606  std::string sowAndDeltaSubscribe(const MessageHandler& messageHandler_,
6607  const std::string& topic_,
6608  const std::string& filter_ = "",
6609  const std::string& orderBy_ = "",
6610  int batchSize_ = DEFAULT_BATCH_SIZE,
6611  int topN_ = DEFAULT_TOP_N,
6612  const std::string& options_ = "",
6613  long timeout_ = DEFAULT_COMMAND_TIMEOUT)
6614  {
6615  return _body.get().sowAndDeltaSubscribe(messageHandler_, topic_,
6616  filter_, orderBy_, batchSize_,
6617  topN_, options_, timeout_);
6618  }
6639  MessageStream sowAndDeltaSubscribe(const std::string& topic_,
6640  const std::string& filter_ = "",
6641  const std::string& orderBy_ = "",
6642  int batchSize_ = DEFAULT_BATCH_SIZE,
6643  int topN_ = DEFAULT_TOP_N,
6644  const std::string& options_ = "",
6645  long timeout_ = DEFAULT_COMMAND_TIMEOUT)
6646  {
6647  MessageStream result(*this);
6648  if (_body.get().getDefaultMaxDepth())
6649  {
6650  result.maxDepth(_body.get().getDefaultMaxDepth());
6651  }
6652  result.setSubscription(_body.get().sowAndDeltaSubscribe(
6653  result.operator MessageHandler(),
6654  topic_, filter_, orderBy_,
6655  batchSize_, topN_, options_,
6656  timeout_, false));
6657  return result;
6658  }
6659 
6662  const std::string& filter_ = "",
6663  const std::string& orderBy_ = "",
6664  int batchSize_ = DEFAULT_BATCH_SIZE,
6665  int topN_ = DEFAULT_TOP_N,
6666  const std::string& options_ = "",
6667  long timeout_ = DEFAULT_COMMAND_TIMEOUT)
6668  {
6669  MessageStream result(*this);
6670  if (_body.get().getDefaultMaxDepth())
6671  {
6672  result.maxDepth(_body.get().getDefaultMaxDepth());
6673  }
6674  result.setSubscription(_body.get().sowAndDeltaSubscribe(
6675  result.operator MessageHandler(),
6676  topic_, filter_, orderBy_,
6677  batchSize_, topN_, options_,
6678  timeout_, false));
6679  return result;
6680  }
6681 
6706  std::string sowAndDeltaSubscribe(const MessageHandler& messageHandler_,
6707  const std::string& topic_,
6708  long timeout_,
6709  const std::string& filter_ = "",
6710  int batchSize_ = DEFAULT_BATCH_SIZE,
6711  bool oofEnabled_ = false,
6712  bool sendEmpties_ = false,
6713  int topN_ = DEFAULT_TOP_N)
6714  {
6715  return _body.get().sowAndDeltaSubscribe(messageHandler_, topic_,
6716  timeout_, filter_, batchSize_,
6717  oofEnabled_, sendEmpties_,
6718  topN_);
6719  }
6720 
6742  MessageStream sowAndDeltaSubscribe(const std::string& topic_,
6743  long timeout_,
6744  const std::string& filter_ = "",
6745  int batchSize_ = DEFAULT_BATCH_SIZE,
6746  bool oofEnabled_ = false,
6747  bool sendEmpties_ = false,
6748  int topN_ = DEFAULT_TOP_N)
6749  {
6750  MessageStream result(*this);
6751  if (_body.get().getDefaultMaxDepth())
6752  {
6753  result.maxDepth(_body.get().getDefaultMaxDepth());
6754  }
6755  result.setSubscription(_body.get().sowAndDeltaSubscribe(
6756  result.operator MessageHandler(),
6757  topic_, timeout_, filter_,
6758  batchSize_, oofEnabled_,
6759  sendEmpties_, topN_, false));
6760  return result;
6761  }
6784  long timeout_,
6785  const std::string& filter_ = "",
6786  int batchSize_ = DEFAULT_BATCH_SIZE,
6787  bool oofEnabled_ = false,
6788  bool sendEmpties_ = false,
6789  int topN_ = DEFAULT_TOP_N)
6790  {
6791  MessageStream result(*this);
6792  if (_body.get().getDefaultMaxDepth())
6793  {
6794  result.maxDepth(_body.get().getDefaultMaxDepth());
6795  }
6796  result.setSubscription(_body.get().sowAndDeltaSubscribe(
6797  result.operator MessageHandler(),
6798  topic_, timeout_, filter_,
6799  batchSize_, oofEnabled_,
6800  sendEmpties_, topN_, false));
6801  return result;
6802  }
6822  std::string sowDelete(const MessageHandler& messageHandler,
6823  const std::string& topic,
6824  const std::string& filter,
6825  long timeout)
6826  {
6827  return _body.get().sowDelete(messageHandler, topic, filter, timeout);
6828  }
6845  Message sowDelete(const std::string& topic_, const std::string& filter_,
6846  long timeout_ = 0)
6847  {
6848  MessageStream stream(*this);
6849  stream.timeout((unsigned int)timeout_);
6850  char buf[Message::IdentifierLength + 1];
6851  buf[Message::IdentifierLength] = 0;
6852  AMPS_snprintf(buf, Message::IdentifierLength + 1, "%lx", MessageImpl::newId());
6853  Field cid(buf);
6854  try
6855  {
6856  stream.setStatsOnly(cid);
6857  _body.get().sowDelete(stream.operator MessageHandler(), topic_, filter_, timeout_, cid);
6858  return *(stream.begin());
6859  }
6860  catch (const DisconnectedException&)
6861  {
6862  removeMessageHandler(cid);
6863  throw;
6864  }
6865  catch (const TimedOutException&)
6866  {
6867  removeMessageHandler(cid);
6868  throw;
6869  }
6870  }
6871 
6876  void startTimer()
6877  {
6878  _body.get().startTimer();
6879  }
6880 
6887  std::string stopTimer(const MessageHandler& messageHandler)
6888  {
6889  return _body.get().stopTimer(messageHandler);
6890  }
6891 
6913  std::string sowDeleteByKeys(const MessageHandler& messageHandler_,
6914  const std::string& topic_,
6915  const std::string& keys_,
6916  long timeout_ = 0)
6917  {
6918  return _body.get().sowDeleteByKeys(messageHandler_, topic_, keys_, timeout_);
6919  }
6940  Message sowDeleteByKeys(const std::string& topic_, const std::string& keys_,
6941  long timeout_ = 0)
6942  {
6943  MessageStream stream(*this);
6944  stream.timeout((unsigned int)timeout_);
6945  char buf[Message::IdentifierLength + 1];
6946  buf[Message::IdentifierLength] = 0;
6947  AMPS_snprintf(buf, Message::IdentifierLength + 1, "%lx", MessageImpl::newId());
6948  Field cid(buf);
6949  try
6950  {
6951  stream.setStatsOnly(cid);
6952  _body.get().sowDeleteByKeys(stream.operator MessageHandler(), topic_, keys_, timeout_, cid);
6953  return *(stream.begin());
6954  }
6955  catch (const DisconnectedException&)
6956  {
6957  removeMessageHandler(cid);
6958  throw;
6959  }
6960  catch (const TimedOutException&)
6961  {
6962  removeMessageHandler(cid);
6963  throw;
6964  }
6965  }
6966 
6981  std::string sowDeleteByData(const MessageHandler& messageHandler_,
6982  const std::string& topic_, const std::string& data_,
6983  long timeout_ = 0)
6984  {
6985  return _body.get().sowDeleteByData(messageHandler_, topic_, data_, timeout_);
6986  }
6987 
7002  Message sowDeleteByData(const std::string& topic_, const std::string& data_,
7003  long timeout_ = 0)
7004  {
7005  MessageStream stream(*this);
7006  stream.timeout((unsigned int)timeout_);
7007  char buf[Message::IdentifierLength + 1];
7008  buf[Message::IdentifierLength] = 0;
7009  AMPS_snprintf(buf, Message::IdentifierLength + 1, "%lx", MessageImpl::newId());
7010  Field cid(buf);
7011  try
7012  {
7013  stream.setStatsOnly(cid);
7014  _body.get().sowDeleteByData(stream.operator MessageHandler(), topic_, data_, timeout_, cid);
7015  return *(stream.begin());
7016  }
7017  catch (const DisconnectedException&)
7018  {
7019  removeMessageHandler(cid);
7020  throw;
7021  }
7022  catch (const TimedOutException&)
7023  {
7024  removeMessageHandler(cid);
7025  throw;
7026  }
7027  }
7028 
7033  {
7034  return _body.get().getHandle();
7035  }
7036 
7045  void setExceptionListener(const std::shared_ptr<const ExceptionListener>& pListener_)
7046  {
7047  _body.get().setExceptionListener(pListener_);
7048  }
7049 
7058 #if defined(_WIN32) || __cplusplus >= 201402L
7059  [[deprecated("Use setExceptionListener(std::shared_ptr<const ExceptionListener>&)")]]
7060 #endif
7062  {
7063  _body.get().setExceptionListener(listener_);
7064  }
7065 
7069  {
7070  return _body.get().getExceptionListener();
7071  }
7072 
7080  // type of message) from the server for the specified interval (plus a grace period),
7094  void setHeartbeat(unsigned heartbeatTime_, unsigned readTimeout_)
7095  {
7096  _body.get().setHeartbeat(heartbeatTime_, readTimeout_);
7097  }
7098 
7106  // type of message) from the server for the specified interval (plus a grace period),
7118  void setHeartbeat(unsigned heartbeatTime_)
7119  {
7120  _body.get().setHeartbeat(heartbeatTime_, 2 * heartbeatTime_);
7121  }
7122 
7124 #if defined(_WIN32) || __cplusplus >= 201402L
7125  [[deprecated("Use setLastChanceMessageHandler.")]]
7126 #endif
7128  {
7129  setLastChanceMessageHandler(messageHandler);
7130  }
7131 
7135  {
7136  _body.get().setGlobalCommandTypeMessageHandler(ClientImpl::GlobalCommandTypeHandlers::LastChance,
7137  messageHandler);
7138  }
7139 
7160  void setGlobalCommandTypeMessageHandler(const std::string& command_, const MessageHandler& handler_)
7161  {
7162  _body.get().setGlobalCommandTypeMessageHandler(command_, handler_);
7163  }
7164 
7185  void setGlobalCommandTypeMessageHandler(const Message::Command::Type command_, const MessageHandler& handler_)
7186  {
7187  _body.get().setGlobalCommandTypeMessageHandler(command_, handler_);
7188  }
7189 
7195  static const char* BOOKMARK_NOW()
7196  {
7197  return AMPS_BOOKMARK_NOW;
7198  }
7204  static const char* NOW()
7205  {
7206  return AMPS_BOOKMARK_NOW;
7207  }
7208 
7214  static const char* BOOKMARK_EPOCH()
7215  {
7216  return AMPS_BOOKMARK_EPOCH;
7217  }
7218 
7224  static const char* EPOCH()
7225  {
7226  return AMPS_BOOKMARK_EPOCH;
7227  }
7228 
7235  static const char* BOOKMARK_MOST_RECENT()
7236  {
7237  return AMPS_BOOKMARK_RECENT;
7238  }
7239 
7246  static const char* MOST_RECENT()
7247  {
7248  return AMPS_BOOKMARK_RECENT;
7249  }
7250 
7257  static const char* BOOKMARK_RECENT()
7258  {
7259  return AMPS_BOOKMARK_RECENT;
7260  }
7261 
7262 
7269  {
7270  _body.get().addConnectionStateListener(listener);
7271  }
7272 
7277  {
7278  _body.get().removeConnectionStateListener(listener);
7279  }
7280 
7284  {
7285  _body.get().clearConnectionStateListeners();
7286  }
7287 
7313  std::string executeAsync(Command& command_, MessageHandler handler_)
7314  {
7315  return _body.get().executeAsync(command_, handler_);
7316  }
7317 
7347  std::string executeAsyncNoResubscribe(Command& command_,
7348  MessageHandler handler_)
7349  {
7350  std::string id;
7351  try
7352  {
7353  if (command_.isSubscribe())
7354  {
7355  Message& message = command_.getMessage();
7356  Field subId = message.getSubscriptionId();
7357  bool useExistingHandler = !subId.empty() && !message.getOptions().empty() && message.getOptions().contains("replace", 7);
7358  if (useExistingHandler)
7359  {
7360  MessageHandler existingHandler;
7361  if (_body.get()._routes.getRoute(subId, existingHandler))
7362  {
7363  // we found an existing handler.
7364  _body.get().executeAsync(command_, existingHandler, false);
7365  return id; // empty string indicates existing
7366  }
7367  }
7368  }
7369  id = _body.get().executeAsync(command_, handler_, false);
7370  }
7371  catch (const DisconnectedException&)
7372  {
7373  removeMessageHandler(command_.getMessage().getCommandId());
7374  if (command_.isSubscribe())
7375  {
7376  removeMessageHandler(command_.getMessage().getSubscriptionId());
7377  }
7378  if (command_.isSow())
7379  {
7380  removeMessageHandler(command_.getMessage().getQueryID());
7381  }
7382  throw;
7383  }
7384  return id;
7385  }
7386 
7399  MessageStream execute(Command& command_);
7400 
7409  void ack(Field& topic_, Field& bookmark_, const char* options_ = NULL)
7410  {
7411  _body.get().ack(topic_, bookmark_, options_);
7412  }
7413 
7421  void ack(Message& message_, const char* options_ = NULL)
7422  {
7423  _body.get().ack(message_.getTopic(), message_.getBookmark(), options_);
7424  }
7433  void ack(const std::string& topic_, const std::string& bookmark_,
7434  const char* options_ = NULL)
7435  {
7436  _body.get().ack(Field(topic_.data(), topic_.length()), Field(bookmark_.data(), bookmark_.length()), options_);
7437  }
7438 
7444  void ackDeferredAutoAck(Field& topic_, Field& bookmark_, const char* options_ = NULL)
7445  {
7446  _body.get()._ack(topic_, bookmark_, options_);
7447  }
7457  void flushAcks(void)
7458  {
7459  _body.get().flushAcks();
7460  }
7461 
7466  bool getAutoAck(void) const
7467  {
7468  return _body.get().getAutoAck();
7469  }
7476  void setAutoAck(bool isAutoAckEnabled_)
7477  {
7478  _body.get().setAutoAck(isAutoAckEnabled_);
7479  }
7484  unsigned getAckBatchSize(void) const
7485  {
7486  return _body.get().getAckBatchSize();
7487  }
7494  void setAckBatchSize(const unsigned ackBatchSize_)
7495  {
7496  _body.get().setAckBatchSize(ackBatchSize_);
7497  }
7498 
7505  int getAckTimeout(void) const
7506  {
7507  return _body.get().getAckTimeout();
7508  }
7517  void setAckTimeout(const int ackTimeout_)
7518  {
7519  if (!ackTimeout_ && _body.get().getAckBatchSize() > 1)
7520  {
7521  throw UsageException("Ack timeout must be > 0 when ack batch size > 1");
7522  }
7523  _body.get().setAckTimeout(ackTimeout_);
7524  }
7525 
7526 
7535  void setRetryOnDisconnect(bool isRetryOnDisconnect_)
7536  {
7537  _body.get().setRetryOnDisconnect(isRetryOnDisconnect_);
7538  }
7539 
7544  bool getRetryOnDisconnect(void) const
7545  {
7546  return _body.get().getRetryOnDisconnect();
7547  }
7548 
7553  void setDefaultMaxDepth(unsigned maxDepth_)
7554  {
7555  _body.get().setDefaultMaxDepth(maxDepth_);
7556  }
7557 
7562  unsigned getDefaultMaxDepth(void) const
7563  {
7564  return _body.get().getDefaultMaxDepth();
7565  }
7566 
7574  void* userData_)
7575  {
7576  return _body.get().setTransportFilterFunction(filter_, userData_);
7577  }
7578 
7588  void* userData_)
7589  {
7590  return _body.get().setThreadCreatedCallback(callback_, userData_);
7591  }
7592 
7598  void deferredExecution(DeferredExecutionFunc func_, void* userData_)
7599  {
7600  _body.get().deferredExecution(func_, userData_);
7601  }
7605  };
7606 
7607  inline void
7608  ClientImpl::lastChance(AMPS::Message& message)
7609  {
7610  AMPS_CALL_EXCEPTION_WRAPPER(_globalCommandTypeHandlers[GlobalCommandTypeHandlers::LastChance].invoke(message));
7611  }
7612 
7613  inline unsigned
7614  ClientImpl::persistedAck(AMPS::Message& message)
7615  {
7616  unsigned deliveries = 0;
7617  try
7618  {
7619  /*
7620  * Best Practice: If you don't care about the dupe acks that
7621  * occur during failover or rapid disconnect/reconnect, then just
7622  * ignore them. We could discard each duplicate from the
7623  * persisted store, but the storage costs of doing 1 record
7624  * discards is heavy. In most scenarios we'll just quickly blow
7625  * through the duplicates and get back to processing the
7626  * non-dupes.
7627  */
7628  const char* data = NULL;
7629  size_t len = 0;
7630  const char* status = NULL;
7631  size_t statusLen = 0;
7632  amps_handle messageHandle = message.getMessage();
7633  const size_t NotEntitled = 12, Duplicate = 9, Failure = 7;
7634  amps_message_get_field_value(messageHandle, AMPS_Reason, &data, &len);
7635  amps_message_get_field_value(messageHandle, AMPS_Status, &status, &statusLen);
7636  if (len == NotEntitled || len == Duplicate ||
7637  (statusLen == Failure && status[0] == 'f'))
7638  {
7639  if (_failedWriteHandler)
7640  {
7641  if (_publishStore.isValid())
7642  {
7643  amps_uint64_t sequence =
7644  amps_message_get_field_uint64(messageHandle, AMPS_Sequence);
7645  FailedWriteStoreReplayer replayer(this, data, len);
7646  AMPS_CALL_EXCEPTION_WRAPPER(_publishStore.replaySingle(
7647  replayer, sequence));
7648  }
7649  else // Call the handler with what little we have
7650  {
7651  static Message emptyMessage;
7652  emptyMessage.setSequence(message.getSequence());
7653  AMPS_CALL_EXCEPTION_WRAPPER(
7654  _failedWriteHandler->failedWrite(emptyMessage,
7655  data, len));
7656  }
7657  ++deliveries;
7658  }
7659  }
7660  if (_publishStore.isValid())
7661  {
7662  // Ack for publisher will have sequence while
7663  // ack for bookmark subscribe won't
7664  amps_uint64_t seq = amps_message_get_field_uint64(messageHandle,
7665  AMPS_Sequence);
7666  if (seq > 0)
7667  {
7668  ++deliveries;
7669  AMPS_CALL_EXCEPTION_WRAPPER(_publishStore.discardUpTo(seq));
7670  }
7671  }
7672 
7673  if (!deliveries && _bookmarkStore.isValid())
7674  {
7675  amps_message_get_field_value(messageHandle, AMPS_SubscriptionId,
7676  &data, &len);
7677  if (len > 0)
7678  {
7679  Message::Field subId(data, len);
7680  const char* bookmarkData = NULL;
7681  size_t bookmarkLen = 0;
7682  amps_message_get_field_value(messageHandle,
7683  AMPS_Bookmark,
7684  &bookmarkData,
7685  &bookmarkLen);
7686  // Everything is there and not unsubscribed AC-912
7687  if (bookmarkLen > 0 && _routes.hasRoute(subId))
7688  {
7689  ++deliveries;
7690  _bookmarkStore.persisted(subId, Message::Field(bookmarkData, bookmarkLen));
7691  }
7692  }
7693  }
7694  }
7695  catch (std::exception& ex)
7696  {
7697  AMPS_UNHANDLED_EXCEPTION(ex);
7698  }
7699  return deliveries;
7700  }
7701 
7702  inline unsigned
7703  ClientImpl::processedAck(Message& message)
7704  {
7705  unsigned deliveries = 0;
7706  AckResponse ack;
7707  const char* data = NULL;
7708  size_t len = 0;
7709  amps_handle messageHandle = message.getMessage();
7710  amps_message_get_field_value(messageHandle, AMPS_CommandId, &data, &len);
7711  Lock<Mutex> l(_lock);
7712  if (data && len)
7713  {
7714  Lock<Mutex> guard(_ackMapLock);
7715  AckMap::iterator i = _ackMap.find(std::string(data, len));
7716  if (i != _ackMap.end())
7717  {
7718  ++deliveries;
7719  ack = i->second;
7720  _ackMap.erase(i);
7721  }
7722  }
7723  if (deliveries)
7724  {
7725  amps_message_get_field_value(messageHandle, AMPS_Status, &data, &len);
7726  ack.setStatus(data, len);
7727  amps_message_get_field_value(messageHandle, AMPS_Reason, &data, &len);
7728  ack.setReason(data, len);
7729  amps_message_get_field_value(messageHandle, AMPS_UserId, &data, &len);
7730  ack.setUsername(data, len);
7731  amps_message_get_field_value(messageHandle, AMPS_Password, &data, &len);
7732  ack.setPassword(data, len);
7733  amps_message_get_field_value(messageHandle, AMPS_Version, &data, &len);
7734  ack.setServerVersion(data, len);
7735  amps_message_get_field_value(messageHandle, AMPS_Options, &data, &len);
7736  ack.setOptions(data, len);
7737  // This sets bookmark, nameHashValue, and sequenceNo
7738  ack.setBookmark(message.getBookmark());
7739  ack.setResponded();
7740  _lock.signalAll();
7741  }
7742  return deliveries;
7743  }
7744 
7745  inline void
7746  ClientImpl::checkAndSendHeartbeat(bool force)
7747  {
7748  if (force || _heartbeatTimer.check())
7749  {
7750  _heartbeatTimer.start();
7751  try
7752  {
7753  sendWithoutRetry(_beatMessage);
7754  }
7755  catch (const AMPSException&)
7756  {
7757  ;
7758  }
7759  }
7760  }
7761 
7762  inline ConnectionInfo ClientImpl::getConnectionInfo() const
7763  {
7764  ConnectionInfo info;
7765  std::ostringstream writer;
7766 
7767  info["client.uri"] = _lastUri;
7768  info["client.name"] = _name;
7769  info["client.username"] = _username;
7770  if (_publishStore.isValid())
7771  {
7772  writer << _publishStore.unpersistedCount();
7773  info["publishStore.unpersistedCount"] = writer.str();
7774  writer.clear();
7775  writer.str("");
7776  }
7777 
7778  return info;
7779  }
7780 
7781  inline amps_result
7782  ClientImpl::ClientImplMessageHandler(amps_handle messageHandle_, void* userData_)
7783  {
7784  const unsigned SOWMask = Message::Command::SOW | Message::Command::GroupBegin | Message::Command::GroupEnd;
7785  const unsigned PublishMask = Message::Command::OOF | Message::Command::Publish | Message::Command::DeltaPublish;
7786  ClientImpl* me = (ClientImpl*) userData_;
7787  AMPS_CALL_EXCEPTION_WRAPPER_2(me, me->processDeferredExecutions());
7788  if (!messageHandle_)
7789  {
7790  if (me->_queueAckTimeout)
7791  {
7792  me->checkQueueAcks();
7793  }
7794  me->checkAndSendHeartbeat();
7795  return AMPS_E_OK;
7796  }
7797 
7798  me->_readMessage.replace(messageHandle_);
7799  Message& message = me->_readMessage;
7800  Message::Command::Type commandType = message.getCommandEnum();
7801  if (commandType & SOWMask)
7802  {
7803 #if 0 // Not currently implemented, to avoid an extra branch in delivery
7804  // A small cheat here to get the right handler, using knowledge of the
7805  // Command values of SOW (8), GroupBegin (8192), and GroupEnd (16384)
7806  // and their GlobalCommandTypeHandlers values 1, 2, 3.
7807  AMPS_CALL_EXCEPTION_WRAPPER_2(me,
7808  me->_globalCommandTypeHandlers[1 + (commandType / 8192)].invoke(message));
7809 #endif
7810  AMPS_CALL_EXCEPTION_WRAPPER_2(me, me->_routes.deliverData(message,
7811  message.getQueryID()));
7812  }
7813  else if (commandType & PublishMask)
7814  {
7815 #if 0 // Not currently implemented, to avoid an extra branch in delivery
7816  AMPS_CALL_EXCEPTION_WRAPPER_2(me,
7817  me->_globalCommandTypeHandlers[(commandType == Message::Command::Publish ?
7818  GlobalCommandTypeHandlers::Publish :
7819  GlobalCommandTypeHandlers::OOF)].invoke(message));
7820 #endif
7821  const char* subIds = NULL;
7822  size_t subIdsLen = 0;
7823  // Publish command, send to subscriptions
7824  amps_message_get_field_value(messageHandle_, AMPS_SubscriptionIds,
7825  &subIds, &subIdsLen);
7826  size_t subIdCount = me->_routes.parseRoutes(AMPS::Field(subIds, subIdsLen), me->_routeCache);
7827  for (size_t i = 0; i < subIdCount; ++i)
7828  {
7829  MessageRouter::RouteCache::value_type& lookupResult = me->_routeCache[i];
7830  MessageHandler& handler = lookupResult.handler;
7831  if (handler.isValid())
7832  {
7833  amps_message_set_field_value(messageHandle_,
7834  AMPS_SubscriptionId,
7835  subIds + lookupResult.idOffset,
7836  lookupResult.idLength);
7837  Message::Field bookmark = message.getBookmark();
7838  bool isMessageQueue = message.getLeasePeriod().len() != 0;
7839  bool isAutoAck = me->_isAutoAckEnabled;
7840 
7841  if (!isMessageQueue && !bookmark.empty() &&
7842  me->_bookmarkStore.isValid())
7843  {
7844  if (me->_bookmarkStore.isDiscarded(me->_readMessage))
7845  {
7846  //Call duplicate message handler in handlers map
7847  if (me->_globalCommandTypeHandlers[GlobalCommandTypeHandlers::DuplicateMessage].isValid())
7848  {
7849  AMPS_CALL_EXCEPTION_WRAPPER_2(me, me->_globalCommandTypeHandlers[GlobalCommandTypeHandlers::DuplicateMessage].invoke(message));
7850  }
7851  }
7852  else
7853  {
7854  me->_bookmarkStore.log(me->_readMessage);
7855  AMPS_CALL_EXCEPTION_WRAPPER_2(me,
7856  handler.invoke(message));
7857  }
7858  }
7859  else
7860  {
7861  if (isMessageQueue && isAutoAck)
7862  {
7863  try
7864  {
7865  AMPS_CALL_EXCEPTION_WRAPPER_STREAM_FULL_2(me, handler.invoke(message));
7866  if (!message.getIgnoreAutoAck())
7867  {
7868  AMPS_CALL_EXCEPTION_WRAPPER_2(me,
7869  me->_ack(message.getTopic(), message.getBookmark()));
7870  }
7871  }
7872  catch (std::exception& ex)
7873  {
7874  if (!message.getIgnoreAutoAck())
7875  {
7876  AMPS_CALL_EXCEPTION_WRAPPER_2(me,
7877  me->_ack(message.getTopic(), message.getBookmark(), "cancel"));
7878  }
7879  AMPS_UNHANDLED_EXCEPTION_2(me, ex);
7880  }
7881  }
7882  else
7883  {
7884  AMPS_CALL_EXCEPTION_WRAPPER_2(me,
7885  handler.invoke(message));
7886  }
7887  }
7888  }
7889  else
7890  {
7891  me->lastChance(message);
7892  }
7893  } // for (subidsEnd)
7894  }
7895  else if (commandType == Message::Command::Ack)
7896  {
7897  AMPS_CALL_EXCEPTION_WRAPPER_2(me,
7898  me->_globalCommandTypeHandlers[GlobalCommandTypeHandlers::Ack].invoke(message));
7899  unsigned ackType = message.getAckTypeEnum();
7900  unsigned deliveries = 0U;
7901  switch (ackType)
7902  {
7903  case Message::AckType::Persisted:
7904  deliveries += me->persistedAck(message);
7905  break;
7906  case Message::AckType::Processed: // processed
7907  deliveries += me->processedAck(message);
7908  break;
7909  }
7910  AMPS_CALL_EXCEPTION_WRAPPER_2(me, deliveries += me->_routes.deliverAck(message, ackType));
7911  if (deliveries == 0)
7912  {
7913  me->lastChance(message);
7914  }
7915  }
7916  else if (commandType == Message::Command::Heartbeat)
7917  {
7918  AMPS_CALL_EXCEPTION_WRAPPER_2(me,
7919  me->_globalCommandTypeHandlers[GlobalCommandTypeHandlers::Heartbeat].invoke(message));
7920  if (me->_heartbeatTimer.getTimeout() != 0.0) // -V550
7921  {
7922  me->checkAndSendHeartbeat(true);
7923  }
7924  else
7925  {
7926  me->lastChance(message);
7927  }
7928  return AMPS_E_OK;
7929  }
7930  else if (!message.getCommandId().empty())
7931  {
7932  unsigned deliveries = 0U;
7933  try
7934  {
7935  while (me->_connected) // Keep sending heartbeats when stream is full
7936  {
7937  try
7938  {
7939  deliveries = me->_routes.deliverData(message, message.getCommandId());
7940  break;
7941  }
7942 #ifdef _WIN32
7943  catch (MessageStreamFullException&)
7944 #else
7945  catch (MessageStreamFullException& ex_)
7946 #endif
7947  {
7948  try
7949  {
7950  me->checkAndSendHeartbeat(false);
7951  }
7952 #ifdef _WIN32
7953  catch (std::exception&)
7954 #else
7955  catch (std::exception& ex_)
7956 #endif
7957  {
7958  ;
7959  }
7960  }
7961  }
7962  }
7963  catch (std::exception& ex_)
7964  {
7965  try
7966  {
7967  me->_exceptionListener->exceptionThrown(ex_);
7968  }
7969  catch (...)
7970  {
7971  ;
7972  }
7973  }
7974  if (deliveries == 0)
7975  {
7976  me->lastChance(message);
7977  }
7978  }
7979  me->checkAndSendHeartbeat();
7980  return AMPS_E_OK;
7981  }
7982 
7983  inline void
7984  ClientImpl::ClientImplPreDisconnectHandler(amps_handle /*client*/, unsigned failedConnectionVersion, void* userData)
7985  {
7986  ClientImpl* me = (ClientImpl*) userData;
7987  //Client wrapper(me);
7988  // Go ahead and signal any waiters if they are around...
7989  me->clearAcks(failedConnectionVersion);
7990  }
7991 
7992  inline amps_result
7993  ClientImpl::ClientImplDisconnectHandler(amps_handle /*client*/, void* userData)
7994  {
7995  ClientImpl* me = (ClientImpl*) userData;
7996  Lock<Mutex> l(me->_lock);
7997  Client wrapper(me, false);
7998  if (me->_connected)
7999  {
8000  me->broadcastConnectionStateChanged(ConnectionStateListener::Disconnected);
8001  }
8002  while (true)
8003  {
8004  AtomicFlagFlip subFlip(&me->_badTimeToHASubscribe);
8005  bool retryInProgress = false;
8006  try
8007  {
8008  me->_connected = false;
8009  me->_lock.signalAll();
8010  // Have to release the lock here or receive thread can't
8011  // invoke the message handler.
8012  Unlock<Mutex> unlock(me->_lock);
8013  me->_disconnectHandler.invoke(wrapper);
8014  }
8015 #ifdef _WIN32
8016  catch (const RetryOperationException&)
8017 #else
8018  catch (const RetryOperationException& ex)
8019 #endif
8020  {
8021  retryInProgress = true;
8022  }
8023  catch (const std::exception& ex)
8024  {
8025  AMPS_UNHANDLED_EXCEPTION_2(me, ex);
8026  }
8027  me->_lock.signalAll();
8028 
8029  if (!me->_connected)
8030  {
8031  if (retryInProgress)
8032  {
8033  AMPS_UNHANDLED_EXCEPTION_2(me, RetryOperationException("Reconnect in progress."));
8034  }
8035  else
8036  {
8037  me->broadcastConnectionStateChanged(ConnectionStateListener::Shutdown);
8038  AMPS_UNHANDLED_EXCEPTION_2(me, DisconnectedException("Reconnect failed."));
8039  }
8040  return AMPS_E_DISCONNECTED;
8041  }
8042  try
8043  {
8044  // Resubscribe
8045  if (me->_subscriptionManager)
8046  {
8047  {
8048  // Have to release the lock here or receive thread can't
8049  // invoke the message handler.
8050  Unlock<Mutex> unlock(me->_lock);
8051  me->_subscriptionManager->resubscribe(wrapper);
8052  }
8053  me->broadcastConnectionStateChanged(ConnectionStateListener::Resubscribed);
8054  }
8055  return AMPS_E_OK;
8056  }
8057  catch (const AMPSException& subEx)
8058  {
8059  AMPS_UNHANDLED_EXCEPTION_2(me, subEx);
8060  }
8061  catch (const std::exception& subEx)
8062  {
8063  AMPS_UNHANDLED_EXCEPTION_2(me, subEx);
8064  return AMPS_E_RETRY;
8065  }
8066  catch (...)
8067  {
8068  return AMPS_E_RETRY;
8069  }
8070  }
8071  return AMPS_E_RETRY;
8072  }
8073 
8074  inline const char*
8075  ClientImpl::ClientImplGetHttpPreflightMessage(void* userData_)
8076  {
8077  ClientImpl* me = (ClientImpl*)userData_;
8078  std::ostringstream os;
8079  // [transport]://[user[:password]@][host]:port[/path][?uri_params]
8080  // firstColon is after transport
8081  size_t firstColon = me->_lastUri.find(':');
8082  // pathEnd is start of uri_params or npos
8083  size_t pathEnd = me->_lastUri.find('?');
8084  // lastColon separates host and port, last before pathEnd
8085  size_t lastColon = me->_lastUri.rfind(':', pathEnd);
8086  // at ends user/password and precedes host
8087  size_t at = me->_lastUri.rfind('@', lastColon);
8088  // hostStart is either after at or following firstColon ://
8089  size_t hostStart = at == std::string::npos ? firstColon + 3 : at + 1;
8090  size_t hostLen = lastColon - hostStart;
8091  // pathStart follows port
8092  size_t pathStart = me->_lastUri.find('/', lastColon);
8093  size_t pathLen = pathEnd;
8094  if (pathEnd != std::string::npos)
8095  {
8096  pathLen = pathEnd - pathStart;
8097  }
8098  os << "GET " << me->_lastUri.substr(pathStart, pathLen)
8099  << " HTTP/1.1\r\nHost: " << me->_lastUri.substr(hostStart, hostLen)
8100  << "\r\nConnection: upgrade\r\nUpgrade: "
8101  << me->_lastUri.substr(0, firstColon) << "\r\n";
8102  for (auto header : me->_httpPreflightHeaders)
8103  {
8104  os << header << "\r\n";
8105  }
8106  os << "\r\n";
8107  me->_preflightMessage = os.str();
8108  return me->_preflightMessage.c_str();
8109  }
8110 
8111  class FIX
8112  {
8113  const char* _data;
8114  size_t _len;
8115  char _fieldSep;
8116  public:
8117  class iterator
8118  {
8119  const char* _data;
8120  size_t _len;
8121  size_t _pos;
8122  char _fieldSep;
8123  iterator(const char* data_, size_t len_, size_t pos_, char fieldSep_)
8124  : _data(data_), _len(len_), _pos(pos_), _fieldSep(fieldSep_)
8125  {
8126  while (_pos != _len && _data[_pos] == _fieldSep)
8127  {
8128  ++_pos;
8129  }
8130  }
8131  public:
8132  typedef void* difference_type;
8133  typedef std::forward_iterator_tag iterator_category;
8134  typedef std::pair<Message::Field, Message::Field> value_type;
8135  typedef value_type* pointer;
8136  typedef value_type& reference;
8137  bool operator==(const iterator& rhs) const
8138  {
8139  return _pos == rhs._pos;
8140  }
8141  bool operator!=(const iterator& rhs) const
8142  {
8143  return _pos != rhs._pos;
8144  }
8145  iterator& operator++()
8146  {
8147  // Skip through the data
8148  while (_pos != _len && _data[_pos] != _fieldSep)
8149  {
8150  ++_pos;
8151  }
8152  // Skip through any field separators
8153  while (_pos != _len && _data[_pos] == _fieldSep)
8154  {
8155  ++_pos;
8156  }
8157  return *this;
8158  }
8159 
8160  value_type operator*() const
8161  {
8162  value_type result;
8163  size_t i = _pos, keyLength = 0, valueStart = 0, valueLength = 0;
8164  for (; i < _len && _data[i] != '='; ++i)
8165  {
8166  ++keyLength;
8167  }
8168 
8169  result.first.assign(_data + _pos, keyLength);
8170 
8171  if (i < _len && _data[i] == '=')
8172  {
8173  ++i;
8174  valueStart = i;
8175  for (; i < _len && _data[i] != _fieldSep; ++i)
8176  {
8177  valueLength++;
8178  }
8179  }
8180  result.second.assign(_data + valueStart, valueLength);
8181  return result;
8182  }
8183 
8184  friend class FIX;
8185  };
8186  class reverse_iterator
8187  {
8188  const char* _data;
8189  size_t _len;
8190  const char* _pos;
8191  char _fieldSep;
8192  public:
8193  typedef std::pair<Message::Field, Message::Field> value_type;
8194  reverse_iterator(const char* data, size_t len, const char* pos, char fieldsep)
8195  : _data(data), _len(len), _pos(pos), _fieldSep(fieldsep)
8196  {
8197  if (_pos)
8198  {
8199  // skip past meaningless trailing fieldseps
8200  while (_pos >= _data && *_pos == _fieldSep)
8201  {
8202  --_pos;
8203  }
8204  while (_pos > _data && *_pos != _fieldSep)
8205  {
8206  --_pos;
8207  }
8208  // if we stopped before the 0th character, it's because
8209  // it's a field sep. advance one to point to the first character
8210  // of a key.
8211  if (_pos > _data || (_pos == _data && *_pos == _fieldSep))
8212  {
8213  ++_pos;
8214  }
8215  if (_pos < _data)
8216  {
8217  _pos = 0;
8218  }
8219  }
8220  }
8221  bool operator==(const reverse_iterator& rhs) const
8222  {
8223  return _pos == rhs._pos;
8224  }
8225  bool operator!=(const reverse_iterator& rhs) const
8226  {
8227  return _pos != rhs._pos;
8228  }
8229  reverse_iterator& operator++()
8230  {
8231  if (_pos == _data)
8232  {
8233  _pos = 0;
8234  }
8235  else
8236  {
8237  // back up 1 to a field separator
8238  --_pos;
8239  // keep backing up through field separators
8240  while (_pos >= _data && *_pos == _fieldSep)
8241  {
8242  --_pos;
8243  }
8244  // now back up to the beginning of this field
8245  while (_pos > _data && *_pos != _fieldSep)
8246  {
8247  --_pos;
8248  }
8249  if (_pos > _data || (_pos == _data && *_pos == _fieldSep))
8250  {
8251  ++_pos;
8252  }
8253  if (_pos < _data)
8254  {
8255  _pos = 0;
8256  }
8257  }
8258  return *this;
8259  }
8260  value_type operator*() const
8261  {
8262  value_type result;
8263  size_t keyLength = 0, valueStart = 0, valueLength = 0;
8264  size_t i = (size_t)(_pos - _data);
8265  for (; i < _len && _data[i] != '='; ++i)
8266  {
8267  ++keyLength;
8268  }
8269  result.first.assign(_pos, keyLength);
8270  if (i < _len && _data[i] == '=')
8271  {
8272  ++i;
8273  valueStart = i;
8274  for (; i < _len && _data[i] != _fieldSep; ++i)
8275  {
8276  valueLength++;
8277  }
8278  }
8279  result.second.assign(_data + valueStart, valueLength);
8280  return result;
8281  }
8282  };
8283  FIX(const Message::Field& data, char fieldSeparator = 1)
8284  : _data(data.data()), _len(data.len()),
8285  _fieldSep(fieldSeparator)
8286  {
8287  }
8288 
8289  FIX(const char* data, size_t len, char fieldSeparator = 1)
8290  : _data(data), _len(len), _fieldSep(fieldSeparator)
8291  {
8292  }
8293 
8294  iterator begin() const
8295  {
8296  return iterator(_data, _len, 0, _fieldSep);
8297  }
8298  iterator end() const
8299  {
8300  return iterator(_data, _len, _len, _fieldSep);
8301  }
8302 
8303 
8304  reverse_iterator rbegin() const
8305  {
8306  return reverse_iterator(_data, _len, _data + (_len - 1), _fieldSep);
8307  }
8308 
8309  reverse_iterator rend() const
8310  {
8311  return reverse_iterator(_data, _len, 0, _fieldSep);
8312  }
8313  };
8314 
8315 
8328 
/// Accumulates "tag=value" fields, each followed by a field separator
/// character, into a single string.
/// \tparam T Type of the tag; it is written with operator<< to a stream.
template <class T>
class _FIXBuilder
{
  std::stringstream _data;
  char _fs;
public:
  /// \param fieldSep_ Separator appended after every field (default: the
  ///        character with value 1).
  _FIXBuilder(char fieldSep_ = (char)1) : _fs(fieldSep_) {;}

  /// Appends tag, '=', then length bytes of value starting at offset,
  /// followed by the separator.
  /// \param tag The tag to write.
  /// \param value Base pointer of the value bytes.
  /// \param offset Offset from value at which to start copying.
  /// \param length Number of bytes to copy.
  void append(const T& tag, const char* value, size_t offset, size_t length)
  {
    _data << tag << '=';
    _data.write(value + offset, (std::streamsize)length);
    _data << _fs;
  }

  /// Appends tag, '=', the whole string value, and the separator.
  /// \param tag The tag to write.
  /// \param value The value to write.
  void append(const T& tag, const std::string& value)
  {
    _data << tag << '=' << value << _fs;
  }

  /// \return A copy of everything appended so far.
  std::string getString() const
  {
    return _data.str();
  }
  /// Conversion equivalent to getString().
  operator std::string() const
  {
    return _data.str();
  }

  /// Discards everything appended so far. Stream error flags are cleared as
  /// well, so a failed earlier write cannot silently disable later appends.
  void reset()
  {
    _data.str(std::string());
    _data.clear();
  }
};
8382 
8386 
8388 
8392 
8394 
8395 
8403 
8405  {
8406  char _fs;
8407  public:
8412  FIXShredder(char fieldSep_ = (char)1) : _fs(fieldSep_) {;}
8413 
8416  typedef std::map<Message::Field, Message::Field> map_type;
8417 
8423  map_type toMap(const Message::Field& data)
8424  {
8425  FIX fix(data, _fs);
8426  map_type retval;
8427  for (FIX::iterator a = fix.begin(); a != fix.end(); ++a)
8428  {
8429  retval.insert(*a);
8430  }
8431 
8432  return retval;
8433  }
8434  };
8435 
8436 #define AMPS_MESSAGE_STREAM_CACHE_MAX 128
8437  class MessageStreamImpl : public AMPS::RefBody, AMPS::ConnectionStateListener
8438  {
8439  Mutex _lock;
8440  std::deque<Message> _q;
8441  std::deque<Message> _cache;
8442  std::string _commandId;
8443  std::string _subId;
8444  std::string _queryId;
8445  Client _client;
8446  unsigned _timeout;
8447  unsigned _maxDepth;
8448  unsigned _requestedAcks;
8449  size_t _cacheMax;
8450  Message::Field _previousTopic;
8451  Message::Field _previousBookmark;
8452  typedef enum : unsigned int { Unset = 0x0, Running = 0x10, Subscribe = 0x11, SOWOnly = 0x12, AcksOnly = 0x13, Conflate = 0x14, Closed = 0x1, Disconnected = 0x2 } State;
8453 #if __cplusplus >= 201100L || _MSC_VER >= 1900
8454  std::atomic<State> _state;
8455 #else
8456  volatile State _state;
8457 #endif
8458  typedef std::map<std::string, Message*> SOWKeyMap;
8459  SOWKeyMap _sowKeyMap;
8460  public:
8461  MessageStreamImpl(const Client& client_)
8462  : _client(client_),
8463  _timeout(0),
8464  _maxDepth((unsigned)~0),
8465  _requestedAcks(0),
8466  _cacheMax(AMPS_MESSAGE_STREAM_CACHE_MAX),
8467  _state(Unset)
8468  {
8469  if (_client.isValid())
8470  {
8471  _client.addConnectionStateListener(this);
8472  }
8473  }
8474 
8475  MessageStreamImpl(ClientImpl* client_)
8476  : _client(client_),
8477  _timeout(0),
8478  _maxDepth((unsigned)~0),
8479  _requestedAcks(0),
8480  _state(Unset)
8481  {
8482  if (_client.isValid())
8483  {
8484  _client.addConnectionStateListener(this);
8485  }
8486  }
8487 
8488  ~MessageStreamImpl()
8489  {
8490  }
8491 
8492  virtual void destroy()
8493  {
8494  try
8495  {
8496  close();
8497  }
8498  catch (std::exception& e)
8499  {
8500  try
8501  {
8502  if (_client.isValid())
8503  {
8504  _client.getExceptionListener().exceptionThrown(e);
8505  }
8506  }
8507  catch (...) {/*Ignore exception listener exceptions*/} // -V565
8508  }
8509  if (_client.isValid())
8510  {
8511  _client.removeConnectionStateListener(this);
8512  Client c = _client;
8513  _client = Client((ClientImpl*)NULL);
8514  c.deferredExecution(MessageStreamImpl::destroyer, this);
8515  }
8516  else
8517  {
8518  delete this;
8519  }
8520  }
8521 
8522  static void destroyer(void* vpMessageStreamImpl_)
8523  {
8524  delete ((MessageStreamImpl*)vpMessageStreamImpl_);
8525  }
8526 
8527  void setSubscription(const std::string& subId_,
8528  const std::string& commandId_ = "",
8529  const std::string& queryId_ = "")
8530  {
8531  Lock<Mutex> lock(_lock);
8532  _subId = subId_;
8533  if (!commandId_.empty() && commandId_ != subId_)
8534  {
8535  _commandId = commandId_;
8536  }
8537  if (!queryId_.empty() && queryId_ != subId_ && queryId_ != commandId_)
8538  {
8539  _queryId = queryId_;
8540  }
8541  // It's possible to disconnect between creation/registration and here.
8542  if (Disconnected == _state)
8543  {
8544  return;
8545  }
8546  assert(Unset == _state);
8547  _state = Subscribe;
8548  }
8549 
8550  void setSOWOnly(const std::string& commandId_,
8551  const std::string& queryId_ = "")
8552  {
8553  Lock<Mutex> lock(_lock);
8554  _commandId = commandId_;
8555  if (!queryId_.empty() && queryId_ != commandId_)
8556  {
8557  _queryId = queryId_;
8558  }
8559  // It's possible to disconnect between creation/registration and here.
8560  if (Disconnected == _state)
8561  {
8562  return;
8563  }
8564  assert(Unset == _state);
8565  _state = SOWOnly;
8566  }
8567 
8568  void setStatsOnly(const std::string& commandId_,
8569  const std::string& queryId_ = "")
8570  {
8571  Lock<Mutex> lock(_lock);
8572  _commandId = commandId_;
8573  if (!queryId_.empty() && queryId_ != commandId_)
8574  {
8575  _queryId = queryId_;
8576  }
8577  // It's possible to disconnect between creation/registration and here.
8578  if (Disconnected == _state)
8579  {
8580  return;
8581  }
8582  assert(Unset == _state);
8583  _state = AcksOnly;
8584  _requestedAcks = Message::AckType::Stats;
8585  }
8586 
8587  void setAcksOnly(const std::string& commandId_, unsigned acks_)
8588  {
8589  Lock<Mutex> lock(_lock);
8590  _commandId = commandId_;
8591  // It's possible to disconnect between creation/registration and here.
8592  if (Disconnected == _state)
8593  {
8594  return;
8595  }
8596  assert(Unset == _state);
8597  _state = AcksOnly;
8598  _requestedAcks = acks_;
8599  }
8600 
8601  void connectionStateChanged(ConnectionStateListener::State state_)
8602  {
8603  Lock<Mutex> lock(_lock);
8604  if (state_ == AMPS::ConnectionStateListener::Disconnected)
8605  {
8606  _state = Disconnected;
8607  close();
8608  }
8609  else if (state_ == AMPS::ConnectionStateListener::Connected
8610  && _commandId.empty()
8611  && _subId.empty()
8612  && _queryId.empty())
8613  {
8614  // AC-1331 Reconnect before command was sent, so Unset
8615  _state = Unset;
8616  }
8617  _lock.signalAll();
8618  }
8619 
8620  void timeout(unsigned timeout_)
8621  {
8622  _timeout = timeout_;
8623  }
8624  void conflate(void)
8625  {
8626  if (_state == Subscribe)
8627  {
8628  _state = Conflate;
8629  }
8630  }
8631  void maxDepth(unsigned maxDepth_)
8632  {
8633  if (maxDepth_)
8634  {
8635  _maxDepth = maxDepth_;
8636  }
8637  else
8638  {
8639  _maxDepth = (unsigned)~0;
8640  }
8641  }
8642  unsigned getMaxDepth(void) const
8643  {
8644  return _maxDepth;
8645  }
8646  unsigned getDepth(void) const
8647  {
8648  return (unsigned)(_q.size());
8649  }
8650 
8651  bool next(Message& current_)
8652  {
8653  Lock<Mutex> lock(_lock);
8654  if (!_previousTopic.empty() && !_previousBookmark.empty())
8655  {
8656  try
8657  {
8658  if (_client.isValid())
8659  {
8660  _client.ackDeferredAutoAck(_previousTopic, _previousBookmark);
8661  }
8662  }
8663 #ifdef _WIN32
8664  catch (AMPSException&)
8665 #else
8666  catch (AMPSException& e)
8667 #endif
8668  {
8669  current_.invalidate();
8670  _previousTopic.clear();
8671  _previousBookmark.clear();
8672  return false;
8673  }
8674  _previousTopic.clear();
8675  _previousBookmark.clear();
8676  }
8677  // Don't wait to wait more than 1s at a time
8678  long minWaitTime = (_timeout && _timeout < 1000) ? _timeout : 1000;
8679  Timer timer((double)_timeout);
8680  timer.start();
8681  while (_q.empty() && _state & Running)
8682  {
8683  // Using timeout so python can interrupt
8684  _lock.wait(minWaitTime);
8685  {
8686  Unlock<Mutex> unlck(_lock);
8687  amps_invoke_waiting_function();
8688  }
8689  if (_timeout)
8690  {
8691  // In case we woke up early, see how much longer to wait
8692  if (timer.checkAndGetRemaining(&minWaitTime))
8693  {
8694  // No time left
8695  break;
8696  }
8697  // Adjust next wait time
8698  minWaitTime = (minWaitTime < 1000) ? minWaitTime : 1000;
8699  }
8700  }
8701  if (current_.isValid() && _cache.size() < _cacheMax)
8702  {
8703  current_.reset();
8704  _cache.push_back(current_);
8705  }
8706  if (!_q.empty())
8707  {
8708  current_ = _q.front();
8709  if (_q.size() == _maxDepth)
8710  {
8711  _lock.signalAll();
8712  }
8713  _q.pop_front();
8714  if (_state == Conflate)
8715  {
8716  std::string sowKey = current_.getSowKey();
8717  if (sowKey.length())
8718  {
8719  _sowKeyMap.erase(sowKey);
8720  }
8721  }
8722  else if (_state == AcksOnly)
8723  {
8724  _requestedAcks &= ~(current_.getAckTypeEnum());
8725  }
8726  if ((_state == AcksOnly && _requestedAcks == 0) ||
8727  (_state == SOWOnly && current_.getCommand() == "group_end"))
8728  {
8729  _state = Closed;
8730  }
8731  else if (current_.isValid()
8732  && current_.getCommandEnum() == Message::Command::Publish
8733  && _client.isValid() && _client.getAutoAck()
8734  && !current_.getLeasePeriod().empty()
8735  && !current_.getBookmark().empty())
8736  {
8737  _previousTopic = current_.getTopic().deepCopy();
8738  _previousBookmark = current_.getBookmark().deepCopy();
8739  }
8740  return true;
8741  }
8742  if (_state == Disconnected)
8743  {
8744  throw DisconnectedException("Connection closed.");
8745  }
8746  current_.invalidate();
8747  if (_state == Closed)
8748  {
8749  return false;
8750  }
8751  return _timeout != 0;
8752  }
8753  void close(void)
8754  {
8755  if (_client.isValid())
8756  {
8757  if (_state == SOWOnly || _state == Subscribe) //not delete
8758  {
8759  if (!_commandId.empty())
8760  {
8761  _client.unsubscribe(_commandId);
8762  }
8763  if (!_subId.empty())
8764  {
8765  _client.unsubscribe(_subId);
8766  }
8767  if (!_queryId.empty())
8768  {
8769  _client.unsubscribe(_queryId);
8770  }
8771  }
8772  else
8773  {
8774  if (!_commandId.empty())
8775  {
8776  _client.removeMessageHandler(_commandId);
8777  }
8778  if (!_subId.empty())
8779  {
8780  _client.removeMessageHandler(_subId);
8781  }
8782  if (!_queryId.empty())
8783  {
8784  _client.removeMessageHandler(_queryId);
8785  }
8786  }
8787  }
8788  if (_state == SOWOnly || _state == Subscribe || _state == Unset)
8789  {
8790  _state = Closed;
8791  }
8792  }
8793  static void _messageHandler(const Message& message_, MessageStreamImpl* this_)
8794  {
8795  Lock<Mutex> lock(this_->_lock);
8796  if (this_->_state != Conflate)
8797  {
8798  AMPS_TESTING_SLOW_MESSAGE_STREAM
8799  if (this_->_q.size() >= this_->_maxDepth)
8800  {
8801  // We throw here so that heartbeats can be sent. The exception
8802  // will be handled internally only, and the same Message will
8803  // come back to try again. Make sure to signal.
8804  this_->_lock.signalAll();
8805  throw MessageStreamFullException("Stream is currently full.");
8806  }
8807  if (!this_->_cache.empty())
8808  {
8809  this_->_cache.front().deepCopy(message_);
8810  this_->_q.push_back(this_->_cache.front());
8811  this_->_cache.pop_front();
8812  }
8813  else
8814  {
8815 #ifdef AMPS_USE_EMPLACE
8816  this_->_q.emplace_back(message_.deepCopy());
8817 #else
8818  this_->_q.push_back(message_.deepCopy());
8819 #endif
8820  }
8821  if (message_.getCommandEnum() == Message::Command::Publish &&
8822  this_->_client.isValid() && this_->_client.getAutoAck() &&
8823  !message_.getLeasePeriod().empty() &&
8824  !message_.getBookmark().empty())
8825  {
8826  message_.setIgnoreAutoAck();
8827  }
8828  }
8829  else
8830  {
8831  std::string sowKey = message_.getSowKey();
8832  if (sowKey.length())
8833  {
8834  SOWKeyMap::iterator it = this_->_sowKeyMap.find(sowKey);
8835  if (it != this_->_sowKeyMap.end())
8836  {
8837  it->second->deepCopy(message_);
8838  }
8839  else
8840  {
8841  if (this_->_q.size() >= this_->_maxDepth)
8842  {
8843  // We throw here so that heartbeats can be sent. The
8844  // exception will be handled internally only, and the
8845  // same Message will come back to try again. Make sure
8846  // to signal.
8847  this_->_lock.signalAll();
8848  throw MessageStreamFullException("Stream is currently full.");
8849  }
8850  if (!this_->_cache.empty())
8851  {
8852  this_->_cache.front().deepCopy(message_);
8853  this_->_q.push_back(this_->_cache.front());
8854  this_->_cache.pop_front();
8855  }
8856  else
8857  {
8858 #ifdef AMPS_USE_EMPLACE
8859  this_->_q.emplace_back(message_.deepCopy());
8860 #else
8861  this_->_q.push_back(message_.deepCopy());
8862 #endif
8863  }
8864  this_->_sowKeyMap[sowKey] = &(this_->_q.back());
8865  }
8866  }
8867  else
8868  {
8869  if (this_->_q.size() >= this_->_maxDepth)
8870  {
8871  // We throw here so that heartbeats can be sent. The exception
8872  // will be handled internally only, and the same Message will
8873  // come back to try again. Make sure to signal.
8874  this_->_lock.signalAll();
8875  throw MessageStreamFullException("Stream is currently full.");
8876  }
8877  if (!this_->_cache.empty())
8878  {
8879  this_->_cache.front().deepCopy(message_);
8880  this_->_q.push_back(this_->_cache.front());
8881  this_->_cache.pop_front();
8882  }
8883  else
8884  {
8885 #ifdef AMPS_USE_EMPLACE
8886  this_->_q.emplace_back(message_.deepCopy());
8887 #else
8888  this_->_q.push_back(message_.deepCopy());
8889 #endif
8890  }
8891  if (message_.getCommandEnum() == Message::Command::Publish &&
8892  this_->_client.isValid() && this_->_client.getAutoAck() &&
8893  !message_.getLeasePeriod().empty() &&
8894  !message_.getBookmark().empty())
8895  {
8896  message_.setIgnoreAutoAck();
8897  }
8898  }
8899  }
8900  this_->_lock.signalAll();
8901  }
8902  };
// Default constructor. No implementation object is supplied, so _body is
// left default-constructed.
8903  inline MessageStream::MessageStream(void)
8904  {
8905  }
// Constructs a stream whose body is a newly allocated MessageStreamImpl
// built from client_.
8906  inline MessageStream::MessageStream(const Client& client_)
8907  : _body(new MessageStreamImpl(client_))
8908  {
8909  }
// Constructs a stream that shares an existing implementation handle, body_.
8910  inline MessageStream::MessageStream(RefHandle<MessageStreamImpl> body_)
8911  : _body(body_)
8912  {
8913  }
// Moves the iterator forward by calling next(_current) on the stream's
// implementation. When next() returns false, _pStream is reset to NULL
// (presumably the end-of-stream marker for iterator comparison -- confirm
// against the iterator's operator== / end()).
8914  inline void MessageStream::iterator::advance(void)
8915  {
8916  _pStream = _pStream->_body->next(_current) ? _pStream : NULL;
8917  }
// Conversion to MessageHandler. The resulting handler invokes
// MessageStreamImpl::_messageHandler and carries the address of this
// stream's implementation object as its user data.
8918  inline MessageStream::operator MessageHandler(void)
8919  {
8920  return MessageHandler((void(*)(const Message&, void*))MessageStreamImpl::_messageHandler, &_body.get());
8921  }
// Recovers a MessageStream from a MessageHandler.
// If handler_'s function is MessageStreamImpl::_messageHandler, the returned
// stream's body is taken from the handler's user data (cast back to
// MessageStreamImpl*). Otherwise a default-constructed MessageStream is
// returned.
8922  inline MessageStream MessageStream::fromExistingHandler(const MessageHandler& handler_)
8923  {
8924  MessageStream result;
// Only handlers whose function pointer matches the stream trampoline carry a
// MessageStreamImpl* as user data.
8925  if (handler_._func == (MessageHandler::FunctionType)MessageStreamImpl::_messageHandler)
8926  {
8927  result._body = (MessageStreamImpl*)(handler_._userData);
8928  }
8929  return result;
8930  }
8931 
// Forwards commandId_ and queryId_ to the implementation's setSOWOnly().
8932  inline void MessageStream::setSOWOnly(const std::string& commandId_,
8933  const std::string& queryId_)
8934  {
8935  _body->setSOWOnly(commandId_, queryId_);
8936  }
// Forwards subId_, commandId_ and queryId_ to the implementation's
// setSubscription().
8937  inline void MessageStream::setSubscription(const std::string& subId_,
8938  const std::string& commandId_,
8939  const std::string& queryId_)
8940  {
8941  _body->setSubscription(subId_, commandId_, queryId_);
8942  }
// Forwards commandId_ and queryId_ to the implementation's setStatsOnly().
8943  inline void MessageStream::setStatsOnly(const std::string& commandId_,
8944  const std::string& queryId_)
8945  {
8946  _body->setStatsOnly(commandId_, queryId_);
8947  }
// Forwards commandId_ and the ack-type bitmask acks_ to the implementation's
// setAcksOnly().
8948  inline void MessageStream::setAcksOnly(const std::string& commandId_,
8949  unsigned acks_)
8950  {
8951  _body->setAcksOnly(commandId_, acks_);
8952  }
// Passes timeout_ to the implementation's timeout() and returns a copy of
// this stream handle so calls can be chained.
8953  inline MessageStream MessageStream::timeout(unsigned timeout_)
8954  {
8955  _body->timeout(timeout_);
8956  return *this;
8957  }
// Sets self to conflation mode (a new update for a matching sow key replaces
// the previous one) by calling the implementation's conflate(), then returns
// a copy of this stream handle so calls can be chained.
8958  inline MessageStream MessageStream::conflate(void)
8959  {
8960  _body->conflate();
8961  return *this;
8962  }
// Passes maxDepth_ to the implementation's maxDepth() and returns a copy of
// this stream handle so calls can be chained.
8963  inline MessageStream MessageStream::maxDepth(unsigned maxDepth_)
8964  {
8965  _body->maxDepth(maxDepth_);
8966  return *this;
8967  }
// Gets the maximum number of messages that can be held in the underlying
// queue, as reported by the implementation.
8968  inline unsigned MessageStream::getMaxDepth(void) const
8969  {
8970  return _body->getMaxDepth();
8971  }
// Returns the value of the implementation's getDepth() (presumably the
// number of messages currently queued -- confirm in MessageStreamImpl).
8972  inline unsigned MessageStream::getDepth(void) const
8973  {
8974  return _body->getDepth();
8975  }
8976 
// Returns a MessageStream that shares the body of the stream held in
// _pEmptyMessageStream. _pEmptyMessageStream is dereferenced without a
// check, so it must already have been set by the caller.
8977  inline MessageStream ClientImpl::getEmptyMessageStream(void)
8978  {
8979  return MessageStream(_pEmptyMessageStream.get()->_body);
8980  }
8981 
8983  {
8984  // If the command is sow and has a sub_id, OR
8985  // if the command has a replace option, return the existing
8986  // messagestream, don't create a new one.
8987  ClientImpl& body = _body.get();
8988  Message& message = command_.getMessage();
8989  Field subId = message.getSubscriptionId();
8990  unsigned ackTypes = message.getAckTypeEnum();
8991  bool useExistingHandler = !subId.empty() && ((!message.getOptions().empty() && message.getOptions().contains("replace", 7)) || message.getCommandEnum() == Message::Command::SOW);
8992  if (useExistingHandler)
8993  {
8994  // Try to find the existing message handler.
8995  if (!subId.empty())
8996  {
8997  MessageHandler existingHandler;
8998  if (body._routes.getRoute(subId, existingHandler))
8999  {
9000  // we found an existing handler. It might not be a message stream, but that's okay.
9001  body.executeAsync(command_, existingHandler, false);
9002  return MessageStream::fromExistingHandler(existingHandler);
9003  }
9004  }
9005  // fall through; we'll a new handler altogether.
9006  }
9007  // Make sure something will be returned to the stream or use the empty one
9008  // Check that: it's a command that doesn't normally return data, and there
9009  // are no acks requested for the cmd id
9010  Message::Command::Type command = message.getCommandEnum();
9011  if ((command & Message::Command::NoDataCommands)
9012  && (ackTypes == Message::AckType::Persisted
9013  || ackTypes == Message::AckType::None))
9014  {
9015  executeAsync(command_, MessageHandler());
9016  if (!body._pEmptyMessageStream)
9017  {
9018  body._pEmptyMessageStream.reset(new MessageStream((ClientImpl*)0));
9019  body._pEmptyMessageStream.get()->_body->close();
9020  }
9021  return body.getEmptyMessageStream();
9022  }
9023  MessageStream stream(*this);
9024  if (body.getDefaultMaxDepth())
9025  {
9026  stream.maxDepth(body.getDefaultMaxDepth());
9027  }
9028  MessageHandler handler = stream.operator MessageHandler();
9029  std::string commandID = body.executeAsync(command_, handler, false);
9030  if (command_.hasStatsAck())
9031  {
9032  stream.setStatsOnly(commandID, command_.getMessage().getQueryId());
9033  }
9034  else if (command_.isSow())
9035  {
9036  if (command_.getAckTypeEnum() & Message::AckType::Completed)
9037  {
9038  stream.setAcksOnly(commandID,
9039  ackTypes);
9040  }
9041  else
9042  {
9043  stream.setSOWOnly(commandID, command_.getMessage().getQueryId());
9044  }
9045  }
9046  else if (command_.isSubscribe())
9047  {
9048  stream.setSubscription(commandID,
9049  command_.getMessage().getCommandId(),
9050  command_.getMessage().getQueryId());
9051  }
9052  else
9053  {
9054  // Persisted acks for writes don't come back with command id
9055  if (command == Message::Command::Publish ||
9056  command == Message::Command::DeltaPublish ||
9057  command == Message::Command::SOWDelete)
9058  {
9059  stream.setAcksOnly(commandID,
9060  ackTypes & (unsigned)~Message::AckType::Persisted);
9061  }
9062  else
9063  {
9064  stream.setAcksOnly(commandID, ackTypes);
9065  }
9066  }
9067  return stream;
9068  }
9069 
9070 // This is here because it uses api from Client.
// Acknowledges this message through the client it is associated with.
// The ack is sent (pClient->ack(topic, bookmark, options_)) only when all of
// the following hold: the message has a client implementation, its bookmark
// is non-empty, and the client's auto-ack setting is off. Otherwise this is
// a no-op.
9071  inline void Message::ack(const char* options_) const
9072  {
9073  ClientImpl* pClient = _body.get().clientImpl();
9074  Message::Field bookmark = getBookmark();
9075  if (pClient && bookmark.len() &&
9076  !pClient->getAutoAck())
// The line below is a disabled alternative condition that would also allow
// the ack when the message is flagged to ignore auto-ack.
9077  //(!pClient->getAutoAck() || getIgnoreAutoAck()))
9078  {
9079  pClient->ack(getTopic(), bookmark, options_);
9080  }
9081  }
9082 }// end namespace AMPS
9083 #endif
Command & setBookmark(const std::string &bookmark_)
Set the bookmark to be used by this command.
Definition: ampsplusplus.hpp:748
Command & setFilter(const char *filter_, size_t filterLen_)
Definition: ampsplusplus.hpp:699
Class to hold string versions of failure reasons.
Definition: ampsplusplus.hpp:151
Message & setData(const std::string &v_)
Sets the data portion of self.
Definition: Message.hpp:1538
Core type and function declarations for the AMPS C client.
Client(const std::string &clientName="")
Constructs a new client with a given client name.
Definition: ampsplusplus.hpp:5184
AMPSDLL amps_result amps_client_set_http_preflight_callback(amps_handle client, amps_http_preflight_callback callback, void *userData)
Sets a user-supplied callback function for when a connection is established and the provided uri incl...
Field getUserId() const
Retrieves the value of the UserId header of the Message as a Field which references the underlying bu...
Definition: Message.hpp:1515
std::string sowDeleteByKeys(const MessageHandler &messageHandler_, const std::string &topic_, const std::string &keys_, long timeout_=0)
Deletes messages that match SOW keys from a topic's SOW cache.
Definition: ampsplusplus.hpp:6913
std::string stopTimer(const MessageHandler &messageHandler)
Definition: ampsplusplus.hpp:6887
static const unsigned int IdentifierLength
The length of identifiers used for unique identification of commands and subscriptions.
Definition: Message.hpp:550
std::string getAckType() const
Definition: ampsplusplus.hpp:953
AMPSDLL void amps_client_set_disconnect_handler(amps_handle client, amps_handler disconnectHandler, void *userData)
Sets the disconnect handler function to be called when a disconnect occurs.
Message & assignOwnershipBookmark(const Field &f)
Assigns the value of the Bookmark header for this Message without copying and makes this Message resp...
Definition: Message.hpp:1256
bool removeMessageHandler(const Field &commandId_)
Removes a MessageHandler for a given CommandId from self.
Definition: ampsplusplus.hpp:5445
Message & assignTopic(const std::string &v)
Assigns the value of the Topic header for this Message without copying.
Definition: Message.hpp:1511
Provides a convenient way of building messages in FIX format, typically referenced using the typedefs...
Definition: ampsplusplus.hpp:8330
void startTimer()
Definition: ampsplusplus.hpp:6876
MessageStream sowAndSubscribe(const std::string &topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, bool oofEnabled_=false, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic and initiates a new subscription on it.
Definition: ampsplusplus.hpp:6412
Abstract base class for storing published messages for an HA publisher client.
Definition: ampsplusplus.hpp:1097
MessageStream conflate(void)
Sets self to conflation mode, where a new update for a matching sow key will replace the previous one...
Definition: ampsplusplus.hpp:8958
Field getSequence() const
Retrieves the value of the Sequence header of the Message as a Field which references the underlying ...
Definition: Message.hpp:1484
std::string send(const MessageHandler &messageHandler, Message &message, int timeout=0)
Sends a Message to the connected AMPS server, performing only minimal validation and bypassing client...
Definition: ampsplusplus.hpp:5473
AMPSDLL amps_result amps_client_set_name(amps_handle handle, const amps_char *clientName)
Sets the name on an amps client object.
Message & setCorrelationId(const std::string &v)
Sets the value of the CorrelationId header for this Message.
Definition: Message.hpp:1366
Message & setQueryID(const std::string &v)
Sets the value of the QueryID header for this Message.
Definition: Message.hpp:1479
Command(const std::string &command_)
Creates an object to represent the given AMPS command, such as "sow" or "subscribe".
Definition: ampsplusplus.hpp:560
Command::Type getCommandEnum() const
Decode self's "command" field and return one of the values from Command.
Definition: Message.hpp:1290
Command & setBookmark(const char *bookmark_, size_t bookmarkLen_)
Set the bookmark to be used by this command.
Definition: ampsplusplus.hpp:759
void setGlobalCommandTypeMessageHandler(const Message::Command::Type command_, const MessageHandler &handler_)
Sets a handler for all messages of a particular type: currently supported types are heartbeat message...
Definition: ampsplusplus.hpp:7185
Command & setAckType(unsigned ackType_)
Definition: ampsplusplus.hpp:931
void setServerVersion(size_t version_)
Internally used to set the server version so the store knows how to deal with persisted acks and call...
Definition: BookmarkStore.hpp:429
void setDefaultMaxDepth(unsigned maxDepth_)
Sets a default max depth on all subsequently created MessageStream objects.
Definition: ampsplusplus.hpp:7553
const amps_uint64_t getNameHashValue() const
Returns the numeric name hash of this client as generated by the server and returned when the client ...
Definition: ampsplusplus.hpp:5245
Field getSubscriptionId() const
Retrieves the value of the SubscriptionId header of the Message as a Field which references the under...
Definition: Message.hpp:1489
AMPSDLL amps_result amps_client_connect(amps_handle handle, const amps_char *uri)
Connects to the AMPS server specified in uri.
static size_t convertVersionToNumber(const std::string &version_)
Converts a string version, such as "3.8.1.5" into the same numeric form used internally and returned ...
Definition: ampsplusplus.hpp:5333
void unsubscribe(const std::string &commandId)
Unsubscribe from a topic.
Definition: ampsplusplus.hpp:6201
Command & setOptions(const std::string &options_)
Sets the options string for this command: see Message.Options for a helper class for constructing the...
Definition: ampsplusplus.hpp:789
Message & setOrderBy(const std::string &v)
Sets the value of the OrderBy header for this Message.
Definition: Message.hpp:1477
Abstract base class for replaying a publish message.
Definition: ampsplusplus.hpp:1069
Command & setCommandId(const char *cmdId_, size_t cmdIdLen_)
Definition: ampsplusplus.hpp:673
void persisted(const Message::Field &subId_, const Message::Field &bookmark_)
Called internally to indicate messages up to and including bookmark are replicated to all replication...
Definition: BookmarkStore.hpp:405
unsigned getDefaultMaxDepth(void) const
Returns the default max depth for returned MessageStream objects.
Definition: ampsplusplus.hpp:7562
void ack(Message &message_, const char *options_=NULL)
Acknowledge a message queue message by supplying the message directly: this adds the ack to the curre...
Definition: ampsplusplus.hpp:7421
MessageStream bookmarkSubscribe(const std::string &topic_, long timeout_, const std::string &bookmark_, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Subscribe to a topic using a bookmark.
Definition: ampsplusplus.hpp:6152
AMPSDLL void amps_client_disconnect(amps_handle handle)
Disconnects from the AMPS server, if connected.
AMPSDLL AMPS_SOCKET amps_client_get_socket(amps_handle client)
Returns the socket from the underlying transport in client, or NULL if no transport is associated wit...
void discard(const Message::Field &subId_, size_t bookmarkSeqNo_)
Log a discard-bookmark entry to the persistent log based on a bookmark sequence number.
Definition: BookmarkStore.hpp:283
void setPublishStore(const Store &publishStore_)
Set the publish store to be used by the client.
Definition: ampsplusplus.hpp:5582
void setErrorOnPublishGap(bool errorOnPublishGap_)
Called to enable or disable throwing PublishStoreGapException.
Definition: ampsplusplus.hpp:1346
amps_uint64_t deltaPublish(const std::string &topic_, const std::string &data_, unsigned long expiration_)
Publish the changed fields of a message to an AMPS topic.
Definition: ampsplusplus.hpp:5847
static size_t convertVersionToNumber(const char *data_, size_t len_)
Converts a string version, such as "3.8.1.5" into the same numeric form used internally and returned ...
Definition: ampsplusplus.hpp:5348
Message & setFilter(const std::string &v)
Sets the value of the Filter header for this Message.
Definition: Message.hpp:1368
Command & setSubId(const char *subId_, size_t subIdLen_)
Definition: ampsplusplus.hpp:725
void setNoEmpties(void)
Set the option to not send empty messages on a delta subscription.
Definition: Message.hpp:870
void setDuplicateMessageHandler(const MessageHandler &duplicateMessageHandler_)
Sets a callback function that is invoked when a duplicate message is detected.
Definition: ampsplusplus.hpp:5598
Message deepCopy(void) const
Returns a deep copy of self.
Definition: Message.hpp:587
void removeConnectionStateListener(ConnectionStateListener *listener)
Attempts to remove listener from self's set of ConnectionStateListeners.
Definition: ampsplusplus.hpp:7276
void setOOF(void)
Set the option to receive out of focus (OOF) messages on a subscription, where applicable.
Definition: Message.hpp:853
int getAckTimeout(void) const
Returns the current value of the message queue ack timeout setting – that is, the amount of time aft...
Definition: ampsplusplus.hpp:7505
VersionInfo getServerVersionInfo() const
Returns the server version retrieved during logon.
Definition: ampsplusplus.hpp:5319
Message sowDeleteByKeys(const std::string &topic_, const std::string &keys_, long timeout_=0)
Deletes messages that match SOW keys from a topic's SOW cache.
Definition: ampsplusplus.hpp:6940
AMPSDLL amps_result amps_client_set_transport_filter_function(amps_handle client, amps_transport_filter_function filter, void *userData)
Sets a user-supplied callback function for filtering data before it is sent and after it is received...
Command(const char *command_, size_t commandLen_)
Creates an object to represent the given AMPS command, such as "sow" or "subscribe".
Definition: ampsplusplus.hpp:568
AMPSDLL amps_uint64_t amps_message_get_field_uint64(amps_handle message, FieldId field)
Gets the unsigned 64-bit int value of a header field in an AMPS message.
AMPSDLL amps_result amps_client_set_idle_time(amps_handle client, int idleTime)
Sets an idle-time (milliseconds).
Message encapsulates a single message sent to or received from an AMPS server, and provides methods f...
Definition: Message.hpp:539
Message & setAckTypeEnum(unsigned ackType_)
Encode self's "ack type" field from a bitmask of values from AckType.
Definition: Message.hpp:1245
amps_uint64_t getLastPersisted()
Get the last persisted message sequence in the store.
Definition: ampsplusplus.hpp:1317
std::string getString() const
Returns the current contents of this builder as a string.
Definition: ampsplusplus.hpp:8367
void setTransportFilterFunction(amps_transport_filter_function filter_, void *userData_)
Sets a filter function on the transport that is called with all raw data sent or received.
Definition: ampsplusplus.hpp:7573
amps_uint64_t publish(const char *topic_, size_t topicLength_, const char *data_, size_t dataLength_, unsigned long expiration_)
Publish a message to an AMPS topic, returning the sequence number assigned by the publish store if on...
Definition: ampsplusplus.hpp:5734
iterator begin(void)
Returns an iterator representing the beginning of the topic or subscription.
Definition: ampsplusplus.hpp:5086
void addHttpPreflightHeader(const std::string &key_, const std::string &value_)
Adds a given key/value pair as an HTTP header line as "key: value" to the end of the headers that wil...
Definition: ampsplusplus.hpp:5280
Field getLeasePeriod() const
Retrieves the value of the LeasePeriod header of the Message as a Field which references the underlyi...
Definition: Message.hpp:1371
Command & setSowKeys(const char *sowKeys_, size_t sowKeysLen_)
Sets the SowKeys for the command.
Definition: ampsplusplus.hpp:660
static const char * NOW()
Convenience method for returning the special value to start a subscription at the end of the transact...
Definition: ampsplusplus.hpp:7204
Command & setAckType(const std::string &ackType_)
Definition: ampsplusplus.hpp:909
static const char * BOOKMARK_EPOCH()
Convenience method for returning the special value to start a subscription at the beginning of the tr...
Definition: ampsplusplus.hpp:7214
void send(const Message &message)
Sends a Message to the connected AMPS server, performing only minimal validation and bypassing client...
Definition: ampsplusplus.hpp:5404
Message & setSowKey(const std::string &v)
Sets the value of the SowKey header for this Message.
Definition: Message.hpp:1486
AMPSDLL void amps_message_set_field_value(amps_handle message, FieldId field, const amps_char *value, size_t length)
Sets the value of a header field in an AMPS message.
std::string bookmarkSubscribe(const MessageHandler &messageHandler_, const std::string &topic_, long timeout_, const std::string &bookmark_, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Subscribe to a topic using a bookmark.
Definition: ampsplusplus.hpp:6124
Field getFilter() const
Retrieves the value of the Filter header of the Message as a Field which references the underlying bu...
Definition: Message.hpp:1368
static size_t getUnsetPosition()
Method to return the value used to represent not found or unset.
Definition: ampsplusplus.hpp:1160
void setRetryOnDisconnect(bool isRetryOnDisconnect_)
Enables or disables automatic retry of a command to AMPS after a reconnect.
Definition: ampsplusplus.hpp:7535
unsigned getMaxDepth(void) const
Gets the maximum number of messages that can be held in the underlying queue.
Definition: ampsplusplus.hpp:8968
Message & assignUserId(const std::string &v)
Assigns the value of the UserId header for this Message without copying.
Definition: Message.hpp:1515
void connect(const std::string &uri)
Connect to an AMPS server.
Definition: ampsplusplus.hpp:5379
void clear()
Deletes the data associated with this Field, should only be used on Fields that were created as deepC...
Definition: Field.hpp:260
void addMessageHandler(const Field &commandId_, const AMPS::MessageHandler &messageHandler_, unsigned requestedAcks_, Message::Command::Type commandType_)
Adds a MessageHandler to be invoked for Messages with the given CommandId as their command id...
Definition: ampsplusplus.hpp:5434
Abstract base class to manage all subscriptions placed on a client so that they can be re-established...
Definition: ampsplusplus.hpp:1461
std::string sowAndDeltaSubscribe(const MessageHandler &messageHandler_, const std::string &topic_, const std::string &filter_="", const std::string &orderBy_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic and initiates a new delta subscription on it.
Definition: ampsplusplus.hpp:6606
Success.
Definition: amps.h:221
Message & setCommandEnum(Command::Type command_)
Set self's "command" field from one of the values in Command.
Definition: Message.hpp:1350
Field getOptions() const
Retrieves the value of the Options header of the Message as a Field which references the underlying b...
Definition: Message.hpp:1378
std::string authenticate(const std::string &, const std::string &password_)
A simple implementation that returns an unmodified password.
Definition: ampsplusplus.hpp:1043
amps_uint64_t deltaPublish(const char *topic_, size_t topicLength_, const char *data_, size_t dataLength_)
Publish the changed fields of a message to an AMPS topic.
Definition: ampsplusplus.hpp:5824
FIXShredder(char fieldSep_=(char) 1)
Construct an instance of FIXShredder using the specified value as the delimiter between fields...
Definition: ampsplusplus.hpp:8412
std::string sow(const MessageHandler &messageHandler_, const std::string &topic_, const std::string &filter_="", const std::string &orderBy_="", const std::string &bookmark_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query a State-of-the-World topic.
Definition: ampsplusplus.hpp:6248
void * amps_handle
Opaque handle type used to refer to objects in the AMPS api.
Definition: amps.h:211
Class for constructing the options string to pass to AMPS in a Message.
Definition: Message.hpp:609
void addMessageHandler(const Field &commandId_, const AMPS::MessageHandler &messageHandler_, unsigned requestedAcks_, bool isSubscribe_)
Adds a MessageHandler to be invoked for Messages with the given CommandId as their command id...
Definition: ampsplusplus.hpp:5417
const char * data() const
Returns the (non-null-terminated) data underlying this field.
Definition: Field.hpp:273
bool isValid() const
Returns true if self is a valid stream that may be iterated.
Definition: ampsplusplus.hpp:5079
amps_result
Return values from amps_xxx functions.
Definition: amps.h:216
FailedWriteHandler * getFailedWriteHandler()
Get the handler that is invoked to report on failed writes.
Definition: ampsplusplus.hpp:5635
Field getAckType() const
Retrieves the value of the AckType header of the Message as a Field which references the underlying b...
Definition: Message.hpp:1192
AMPSDLL amps_handle amps_client_create(const amps_char *clientName)
Functions for creation of an AMPS client.
Field getCommand() const
Retrieves the value of the Command header of the Message as a Field which references the underlying b...
Definition: Message.hpp:1257
MessageStream execute(Command &command_)
Execute the provided command and return messages received in response in a MessageStream.
Definition: ampsplusplus.hpp:8982
void(* amps_transport_filter_function)(const unsigned char *, size_t, short, void *)
Prototype for a user-supplied callback function for filtering data before it is sent and after it is ...
Definition: amps.h:656
Message & setTopic(const std::string &v)
Sets the value of the Topic header for this Message.
Definition: Message.hpp:1511
Store getPublishStore()
Get the publish store used by the client.
Definition: ampsplusplus.hpp:5590
void setThreadCreatedCallback(amps_thread_created_callback callback_, void *userData_)
Sets a callback function on the transport that is called when a new thread is created to receive data...
Definition: ampsplusplus.hpp:7587
unsigned getAckTypeEnum() const
Definition: ampsplusplus.hpp:958
size_t getServerVersion() const
Returns the server version retrieved during logon.
Definition: ampsplusplus.hpp:5308
State
Constants for the state of the connection.
Definition: ampsplusplus.hpp:1505
Command & setData(const std::string &data_)
Sets the data for this command from an existing string.
Definition: ampsplusplus.hpp:829
std::string logon(int timeout_=0, Authenticator &authenticator_=DefaultAuthenticator::instance(), const char *options_=NULL)
Logon to the server, providing the client name, credentials (if available), and client information (s...
Definition: ampsplusplus.hpp:5896
void discardUpTo(amps_uint64_t index_)
Called by Client to indicate that all messages up to and including index_ can be discarded.
Definition: ampsplusplus.hpp:1250
Command & setFilter(const std::string &filter_)
Definition: ampsplusplus.hpp:692
const std::string & getNameHash() const
Returns the name hash string of this client as generated by the server and returned when the client l...
Definition: ampsplusplus.hpp:5237
StoreImpl(bool errorOnPublishGap_=false)
Default constructor.
Definition: ampsplusplus.hpp:1105
MessageStream sowAndSubscribe(const char *topic_, const std::string &filter_="", const std::string &orderBy_="", const std::string &bookmark_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic and initiates a new subscription on it.
Definition: ampsplusplus.hpp:6560
Client represents a connection to an AMPS server, but does not provide failover or reconnection behav...
Definition: ampsplusplus.hpp:5167
void setGlobalCommandTypeMessageHandler(const std::string &command_, const MessageHandler &handler_)
Sets a handler for all messages of a particular type, or for messages that would be delivered to a pa...
Definition: ampsplusplus.hpp:7160
Command & setSubId(const std::string &subId_)
Definition: ampsplusplus.hpp:718
static const char * BOOKMARK_NOW()
Convenience method for returning the special value to start a subscription at the end of the transact...
Definition: ampsplusplus.hpp:7195
Message & newCommandId()
Creates and sets a new sequential value for the CommandId header for this Message.
Definition: Message.hpp:1364
void addConnectionStateListener(ConnectionStateListener *listener)
Adds a ConnectionStateListener to self's set of listeners.
Definition: ampsplusplus.hpp:7268
Command & reset(const std::string &command_)
Resets the fields of self, and sets the command to command_.
Definition: ampsplusplus.hpp:583
void clearHttpPreflightHeaders()
Clears all previously set HTTP header lines.
Definition: ampsplusplus.hpp:5286
Command & setTimeout(unsigned timeout_)
Sets the client-side timeout for this command.
Definition: ampsplusplus.hpp:851
void clearConnectionStateListeners()
Clear all listeners from self's set of ConnectionStateListeners.
Definition: ampsplusplus.hpp:7283
Message & setSequence(const std::string &v)
Sets the value of the Sequence header for this Message.
Definition: Message.hpp:1484
amps_uint64_t publish(const char *topic_, size_t topicLength_, const char *data_, size_t dataLength_)
Publish a message to an AMPS topic, returning the sequence number assigned by the publish store if on...
Definition: ampsplusplus.hpp:5683
Command & setCommandId(const std::string &cmdId_)
Definition: ampsplusplus.hpp:666
Message & setUserId(const std::string &v)
Sets the value of the UserId header for this Message.
Definition: Message.hpp:1515
AMPSDLL amps_result amps_client_send_with_version(amps_handle client, amps_handle message, unsigned *version_out)
Sends a message to the AMPS server.
Field getTopic() const
Retrieves the value of the Topic header of the Message as a Field which references the underlying buf...
Definition: Message.hpp:1511
AMPSDLL void amps_client_set_message_handler(amps_handle client, amps_handler messageHandler, void *userData)
Sets the message handler function for this client.
SubscriptionManager * getSubscriptionManager()
Get the subscription manager being used by the client.
Definition: ampsplusplus.hpp:5546
std::string subscribe(const MessageHandler &messageHandler_, const std::string &topic_, long timeout_=0, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Subscribe to a topic.
Definition: ampsplusplus.hpp:5959
Command(Message::Command::Type command_)
Creates an object to represent the given AMPS command, such as "sow" or "subscribe".
Definition: ampsplusplus.hpp:575
Message & setMessageType(const std::string &v)
Sets the value of the MessageType header for this Message.
Definition: Message.hpp:1374
Field getCommandId() const
Retrieves the value of the CommandId header of the Message as a Field which references the underlying...
Definition: Message.hpp:1364
const ExceptionListener & getExceptionListener(void) const
Returns the exception listener set on this Client.
Definition: ampsplusplus.hpp:7068
Field getQueryId() const
Retrieves the value of the QueryID header of the Message as a Field which references the underlying b...
Definition: Message.hpp:1479
static Authenticator & instance()
Static function to return a static instance used when no Authenticator is supplied to a Client...
Definition: ampsplusplus.hpp:1060
Abstract base class for connection state listeners.
Definition: ampsplusplus.hpp:1501
Message & setSubscriptionId(const std::string &v)
Sets the value of the SubscriptionId header for this Message.
Definition: Message.hpp:1489
virtual void setDisconnectHandler(const DisconnectHandler &disconnectHandler)
Definition: ampsplusplus.hpp:5494
std::string sowDeleteByData(const MessageHandler &messageHandler_, const std::string &topic_, const std::string &data_, long timeout_=0)
Deletes the message whose keys match the message data provided.
Definition: ampsplusplus.hpp:6981
Command & setOptions(const char *options_, size_t optionsLen_)
Sets the options string for this command: see Message.Options for a helper class for constructing the...
Definition: ampsplusplus.hpp:797
Message & newQueryId()
Creates and sets a new sequential value for the QueryID header for this Message.
Definition: Message.hpp:1479
#define AMPS_BOOKMARK_RECENT
Start the subscription at the first undiscarded message in the bookmark store, or at the end of the b...
Definition: BookmarkStore.hpp:47
Message & assignExpiration(const std::string &v)
Assigns the value of the Expiration header for this Message without copying.
Definition: Message.hpp:1367
bool empty() const
Returns 'true' if empty, 'false' otherwise.
Definition: Field.hpp:128
bool replaySingle(StoreReplayer &replayer_, amps_uint64_t index_)
Called by Client to get a single message replayed by the store onto the StoreReplayer.
Definition: ampsplusplus.hpp:1271
void completed(const std::string &, const std::string &, const std::string &)
Called by Client once a logon completes successfully.
Definition: ampsplusplus.hpp:1055
MessageStream deltaSubscribe(const std::string &topic_, long timeout_, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Delta Subscribe to a topic.
Definition: ampsplusplus.hpp:6064
static const char * EPOCH()
Convenience method for returning the special value to start a subscription at the beginning of the tr...
Definition: ampsplusplus.hpp:7224
Interface for BookmarkStoreImpl classes.
Definition: BookmarkStore.hpp:228
Command & reset(const char *command_, size_t commandLen_)
Resets the fields of self, and sets the command to command_.
Definition: ampsplusplus.hpp:592
Command & setSequence(const char *seq_, size_t seqLen_)
Definition: ampsplusplus.hpp:810
std::string logon(const char *options_, int timeout_=0)
Logon to the server, providing the client name, credentials (if available), client information (such a...
Definition: ampsplusplus.hpp:5915
size_t len() const
Returns the length of the data underlying this field.
Definition: Field.hpp:280
virtual void completed(const std::string &userName_, const std::string &password_, const std::string &reason_)=0
Called by Client once a logon completes successfully.
Message & setClientName(const std::string &v)
Sets the value of the ClientName header for this Message.
Definition: Message.hpp:1365
Field getSowKey() const
Retrieves the value of the SowKey header of the Message as a Field which references the underlying bu...
Definition: Message.hpp:1486
#define AMPS_BOOKMARK_EPOCH
Start the subscription at the beginning of the journal.
Definition: BookmarkStore.hpp:51
A default implementation of Authenticator that only uses an unchanged password and does not implement...
Definition: ampsplusplus.hpp:1037
void setLogonCorrelationData(const std::string &logonCorrelationData_)
Sets the logon correlation data for the client.
Definition: ampsplusplus.hpp:5256
Message & setCommand(const std::string &v)
Sets the value of the Command header for this Message.
Definition: Message.hpp:1257
_FIXBuilder(char fieldSep_=(char) 1)
Construct an instance of _FIXBuilder, using the specified separator between fields.
Definition: ampsplusplus.hpp:8340
void addHttpPreflightHeader(const std::string &header_)
Adds a given HTTP header line to the end of the headers that will be sent for the HTTP GET Upgrade re...
Definition: ampsplusplus.hpp:5271
void ack(const std::string &topic_, const std::string &bookmark_, const char *options_=NULL)
Acknowledge a message queue message by supplying a topic and bookmark string: this adds the ack to th...
Definition: ampsplusplus.hpp:7433
std::string retry(const std::string &, const std::string &)
Throws an AuthenticationException because retry is not implemented.
Definition: ampsplusplus.hpp:1050
Message & assignSubscriptionId(const std::string &v)
Assigns the value of the SubscriptionId header for this Message without copying.
Definition: Message.hpp:1489
bool(* PublishStoreResizeHandler)(Store store_, size_t size_, void *userData_)
Function type for PublishStore resize events. The store_ param is the store which is resizing.
Definition: ampsplusplus.hpp:1091
AMPSDLL void amps_message_get_field_value(amps_handle message, FieldId field, const amps_char **value_ptr, size_t *length_ptr)
Retrieves the value of a header field in an AMPS message.
MessageStream sowAndSubscribe(const char *topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, bool oofEnabled_=false, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic and initiates a new subscription on it.
Definition: ampsplusplus.hpp:6450
virtual void setResizeHandler(PublishStoreResizeHandler handler_, void *userData_)
Set a handler to be called if the Store needs to resize in order to keep storing messages.
Definition: ampsplusplus.hpp:1191
void append(const T &tag, const char *value, size_t offset, size_t length)
Write a field with the provided tag and value to the message being constructed.
Definition: ampsplusplus.hpp:8349
Message & assignCommand(const std::string &v)
Assigns the value of the Command header for this Message without copying.
Definition: Message.hpp:1257
#define AMPS_BOOKMARK_NOW
Start the subscription at the point in time when AMPS processes the subscription. ...
Definition: BookmarkStore.hpp:55
void setResizeHandler(PublishStoreResizeHandler handler_, void *userData_)
Set a handler to be called if the Store needs to resize in order to keep storing messages.
Definition: ampsplusplus.hpp:1331
virtual std::string retry(const std::string &userName_, const std::string &password_)=0
Called by Client when a logon ack is received with a status of retry.
BookmarkStore getBookmarkStore()
Get the bookmark store being used by the client.
Definition: ampsplusplus.hpp:5538
void setHttpPreflightHeaders(const T &headers_)
Sets the given HTTP header lines to be sent for the HTTP GET Upgrade request.
Definition: ampsplusplus.hpp:5295
bool isValid() const
Method to return if there is an underlying implementation for the Store.
Definition: ampsplusplus.hpp:1288
void setAckBatchSize(const unsigned ackBatchSize_)
Sets the queue ack batch size setting.
Definition: ampsplusplus.hpp:7494
Field getPassword() const
Retrieves the value of the Password header of the Message as a Field which references the underlying ...
Definition: Message.hpp:1478
Message & setTopNRecordsReturned(const std::string &v)
Sets the value of the TopNRecordsReturned header for this Message.
Definition: Message.hpp:1513
Class to handle when a client receives a duplicate publish message, or not entitled message...
Definition: ampsplusplus.hpp:1381
virtual void setFailedResubscribeHandler(std::shared_ptr< FailedResubscribeHandler > handler_)
Set a handler to deal with failing subscriptions after a failover event.
Definition: ampsplusplus.hpp:1489
Message & setSowKeys(const std::string &v)
Sets the value of the SowKeys header for this Message.
Definition: Message.hpp:1487
void setBookmarkStore(const BookmarkStore &bookmarkStore_)
Set the bookmark store to be used by the client.
Definition: ampsplusplus.hpp:5530
This class multiplexes messages from AMPS to multiple subscribers and uses the stream of acks from AM...
Definition: MessageRouter.hpp:149
Message & setExpiration(const std::string &v)
Sets the value of the Expiration header for this Message.
Definition: Message.hpp:1367
virtual std::string authenticate(const std::string &userName_, const std::string &password_)=0
Called by Client just before the logon command is sent.
Command & setOrderBy(const std::string &orderBy_)
Definition: ampsplusplus.hpp:705
void setExceptionListener(const ExceptionListener &listener_)
Definition: ampsplusplus.hpp:7061
Command & reset(Message::Command::Type command_)
Resets the fields of self, and sets the command to command_.
Definition: ampsplusplus.hpp:600
void setSubscriptionManager(SubscriptionManager *subscriptionManager_)
Set the subscription manager to be used by the client.
Definition: ampsplusplus.hpp:5558
void setUnhandledMessageHandler(const AMPS::MessageHandler &messageHandler)
Definition: ampsplusplus.hpp:7127
void append(const T &tag, const std::string &value)
Write a field with the provided tag and value to the message being constructed.
Definition: ampsplusplus.hpp:8360
unsigned getAckBatchSize(void) const
Returns the value of the queue ack batch size setting.
Definition: ampsplusplus.hpp:7484
amps_uint64_t deltaPublish(const std::string &topic_, const std::string &data_)
Publish the changed fields of a message to an AMPS topic.
Definition: ampsplusplus.hpp:5801
MessageStream sowAndSubscribe(const std::string &topic_, const std::string &filter_="", const std::string &orderBy_="", const std::string &bookmark_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic and initiates a new subscription on it.
Definition: ampsplusplus.hpp:6537
Command & setExpiration(unsigned expiration_)
Set the expiration time for a publish command.
Definition: ampsplusplus.hpp:889
AMPSDLL void amps_client_set_predisconnect_handler(amps_handle client, amps_predisconnect_handler predisconnectHandler, void *userData)
Sets the predisconnect handler function to be called when a disconnect occurs.
Represents an iterator over messages in an AMPS topic.
Definition: ampsplusplus.hpp:5041
size_t log(Message &message_)
Log a bookmark to the persistent log.
Definition: BookmarkStore.hpp:268
MessageStream sowAndDeltaSubscribe(const char *topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, bool oofEnabled_=false, bool sendEmpties_=false, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic and initiates a new delta subscription on it.
Definition: ampsplusplus.hpp:6783
size_t unpersistedCount() const
Method to return how many messages are in the store that have not been discarded, indicating that the...
Definition: ampsplusplus.hpp:1280
Command & setData(const char *data_, size_t dataLen_)
Sets the data for this command.
Definition: ampsplusplus.hpp:837
Message & assignAckType(const std::string &v)
Assigns the value of the AckType header for this Message without copying.
Definition: Message.hpp:1192
void publishFlush(long timeout_=0, unsigned ackType_=Message::AckType::Processed)
Ensure that AMPS messages are sent and have been processed by the AMPS server.
Definition: ampsplusplus.hpp:5780
std::string executeAsync(Command &command_, MessageHandler handler_)
Execute the provided command and, once AMPS acknowledges the command, process messages in response to...
Definition: ampsplusplus.hpp:7313
Command & setSowKey(const char *sowKey_, size_t sowKeyLen_)
Sets the SowKey field of the command, typically used for a publish command to a topic in the state of...
Definition: ampsplusplus.hpp:625
MessageStream sowAndDeltaSubscribe(const std::string &topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, bool oofEnabled_=false, bool sendEmpties_=false, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic and initiates a new delta subscription on it.
Definition: ampsplusplus.hpp:6742
std::string logon(const std::string &options_, int timeout_=0)
Logon to the server, providing the client name, credentials (if available), client information (such a...
Definition: ampsplusplus.hpp:5934
void unsubscribe()
Unsubscribe from all topics.
Definition: ampsplusplus.hpp:6213
void setAutoAck(bool isAutoAckEnabled_)
Sets the queue auto-ack setting on this client.
Definition: ampsplusplus.hpp:7476
MessageStream deltaSubscribe(const char *topic_, long timeout_, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Delta Subscribe to a topic.
Definition: ampsplusplus.hpp:6082
Message & assignCorrelationId(const std::string &v)
Assigns the value of the CorrelationId header for this Message without copying.
Definition: Message.hpp:1366
amps_uint64_t store(const Message &message_)
Called by Client to store a message being published.
Definition: ampsplusplus.hpp:1239
std::string sowAndDeltaSubscribe(const MessageHandler &messageHandler_, const std::string &topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, bool oofEnabled_=false, bool sendEmpties_=false, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic and initiates a new delta subscription on it.
Definition: ampsplusplus.hpp:6706
Command & setBatchSize(unsigned batchSize_)
Sets the batch size for this command, which controls how many records are sent together in the result...
Definition: ampsplusplus.hpp:873
void setExceptionListener(const std::shared_ptr< const ExceptionListener > &pListener_)
Sets the exception listener for exceptions that are not thrown back to the user (for example...
Definition: ampsplusplus.hpp:7045
StoreImpl * get()
Used to get a pointer to the implementation.
Definition: ampsplusplus.hpp:1363
MessageHandler getDuplicateMessageHandler(void)
Returns the callback function that is invoked when a duplicate message is detected.
Definition: ampsplusplus.hpp:5613
Command & setQueryId(const char *queryId_, size_t queryIdLen_)
Definition: ampsplusplus.hpp:738
DisconnectHandler getDisconnectHandler(void) const
Definition: ampsplusplus.hpp:5508
Class for parsing a FIX format message into a std::map of keys and values, where the keys and values ...
Definition: ampsplusplus.hpp:8404
Field represents the value of a single field in a Message.
Definition: Field.hpp:86
Message & setAckType(const std::string &v)
Sets the value of the AckType header for this Message.
Definition: Message.hpp:1192
virtual ConnectionInfo getConnectionInfo() const
Get the connection information for the current connection.
Definition: ampsplusplus.hpp:5517
Message & setOptions(const std::string &v)
Sets the value of the Options header for this Message.
Definition: Message.hpp:1406
void ack(Field &topic_, Field &bookmark_, const char *options_=NULL)
Acknowledge a message queue message by supplying a topic and bookmark: this adds the ack to the curre...
Definition: ampsplusplus.hpp:7409
Abstract base class where you can implement handling of exceptions that occur when a SubscriptionMana...
Definition: ampsplusplus.hpp:1439
AMPSDLL amps_result amps_client_send(amps_handle client, amps_handle message)
Sends a message to the AMPS server.
std::map< Message::Field, Message::Field > map_type
Convenience definition for the std::map specialization used for this class.
Definition: ampsplusplus.hpp:8416
Command & setTopic(const char *topic_, size_t topicLen_)
Definition: ampsplusplus.hpp:686
Handle class for StoreImpl classes that track publish messages.
Definition: ampsplusplus.hpp:1223
map_type toMap(const Message::Field &data)
Returns the key/value pairs within the message, represented as AMPS::Field objects that contain point...
Definition: ampsplusplus.hpp:8423
Exception listener for unhandled exceptions.
Definition: ampsplusplus.hpp:204
MessageStream sowAndDeltaSubscribe(const std::string &topic_, const std::string &filter_="", const std::string &orderBy_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic and initiates a new delta subscription on it.
Definition: ampsplusplus.hpp:6639
void setHeartbeat(unsigned heartbeatTime_)
Requests heartbeating with the AMPS server.
Definition: ampsplusplus.hpp:7118
MessageStream subscribe(const char *topic_, long timeout_=0, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Subscribe to a topic.
Definition: ampsplusplus.hpp:6017
An iterable object representing the results of an AMPS subscription and/or query. ...
Definition: ampsplusplus.hpp:5033
bool getErrorOnPublishGap() const
Called to check if the Store will throw PublishStoreGapException.
Definition: ampsplusplus.hpp:1355
Command & setSequence(const std::string &seq_)
Definition: ampsplusplus.hpp:803
static const char * MOST_RECENT()
Convenience method for returning the special value to start a subscription at a recovery point based ...
Definition: ampsplusplus.hpp:7246
Command & setQueryId(const std::string &queryId_)
Definition: ampsplusplus.hpp:731
AMPSDLL amps_result amps_client_set_thread_created_callback(amps_handle client, amps_thread_created_callback callback, void *userData)
Sets a user-supplied callback function to allow thread attributes to be set when a new thread is created...
Message sowDelete(const std::string &topic_, const std::string &filter_, long timeout_=0)
Deletes one or more messages from a topic's SOW cache.
Definition: ampsplusplus.hpp:6845
Message & setCommandId(const std::string &v)
Sets the value of the CommandId header for this Message.
Definition: Message.hpp:1364
Command & setSequence(const amps_uint64_t seq_)
Definition: ampsplusplus.hpp:816
static const char * BOOKMARK_RECENT()
Convenience method for returning the special value to start a subscription at a recovery point based ...
Definition: ampsplusplus.hpp:7257
Message & setQueryId(const std::string &v)
Sets the value of the QueryID header for this Message.
Definition: Message.hpp:1479
amps_result(* amps_thread_created_callback)(AMPS_THREAD_T, void *)
Prototype for a user-supplied callback function to allow thread attributes to be set when a new threa...
Definition: amps.h:682
amps_uint64_t getLowestUnpersisted()
Get the oldest unpersisted message sequence in the store.
Definition: ampsplusplus.hpp:1309
Command & setTopic(const std::string &topic_)
Definition: ampsplusplus.hpp:679
MessageStream maxDepth(unsigned maxDepth_)
Sets the maximum number of messages that can be held in the underlying queue.
Definition: ampsplusplus.hpp:8963
Message & assignVersion(const std::string &v)
Assigns the value of the Version header for this Message without copying.
Definition: Message.hpp:1514
void disconnect()
Disconnect from an AMPS server.
Definition: ampsplusplus.hpp:5386
MessageStream sow(const std::string &topic_, const std::string &filter_="", const std::string &orderBy_="", const std::string &bookmark_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic.
Definition: ampsplusplus.hpp:6286
unsigned getDepth(void) const
Gets the current number of messages held in the underlying queue.
Definition: ampsplusplus.hpp:8972
Field getQueryID() const
Retrieves the value of the QueryID header of the Message as a Field which references the underlying b...
Definition: Message.hpp:1479
The operation has not succeeded, but ought to be retried.
Definition: amps.h:245
const std::string & getLogonCorrelationData() const
Returns the currently set logon correlation data for the client.
Definition: ampsplusplus.hpp:5263
void setHeartbeat(unsigned heartbeatTime_, unsigned readTimeout_)
Requests heartbeating with the AMPS server.
Definition: ampsplusplus.hpp:7094
Command & setTopN(unsigned topN_)
Definition: ampsplusplus.hpp:857
AMPSDLL amps_result amps_client_set_read_timeout(amps_handle client, int readTimeout)
Sets a read timeout (seconds), in which if no message is received, the connection is presumed dead...
const std::string & getURI() const
Returns the last URI this client is connected to.
Definition: ampsplusplus.hpp:5355
Command & addAckType(const std::string &ackType_)
Definition: ampsplusplus.hpp:895
void replay(StoreReplayer &replayer_)
Called by Client to get all stored and non-discarded messages replayed by the store onto the StoreRep...
Definition: ampsplusplus.hpp:1259
static const char * BOOKMARK_MOST_RECENT()
Convenience method for returning the special value to start a subscription at a recovery point based ...
Definition: ampsplusplus.hpp:7235
Message & setBookmark(const std::string &v)
Sets the value of the Bookmark header for this Message.
Definition: Message.hpp:1256
void deepCopy(const Field &orig_)
Makes self a deep copy of the original field.
Definition: Field.hpp:218
Command & setOrderBy(const char *orderBy_, size_t orderByLen_)
Definition: ampsplusplus.hpp:712
unsigned getAckTypeEnum() const
Decode self's "ack type" field and return the corresponding bitmask of values from AckType...
Definition: Message.hpp:1222
BookmarkStoreImpl * get()
Used to get a pointer to the implementation.
Definition: BookmarkStore.hpp:465
amps_handle getHandle()
Returns the underlying amps_handle for this client, to be used with amps_client_* functions from the ...
Definition: ampsplusplus.hpp:7032
AMPSDLL amps_result amps_client_attempt_reconnect(amps_handle client, unsigned version)
Manually invokes the user-supplied disconnect handler for this client.
AMPSDLL void amps_client_destroy(amps_handle handle)
Disconnects and destroys an AMPS client object.
void reset()
Clear all data from the builder.
Definition: ampsplusplus.hpp:8377
void setFailedWriteHandler(FailedWriteHandler *handler_)
Set the handler that is invoked to report when a publish fails, for example if the publisher is not e...
Definition: ampsplusplus.hpp:5627
MessageStream sowAndDeltaSubscribe(const char *topic_, const std::string &filter_="", const std::string &orderBy_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic and initiates a new delta subscription on it.
Definition: ampsplusplus.hpp:6661
void setAckTimeout(const int ackTimeout_)
Sets the message queue ack timeout value.
Definition: ampsplusplus.hpp:7517
Command & setSowKeys(const std::string &sowKeys_)
Sets the SowKeys for the command.
Definition: ampsplusplus.hpp:642
Message & setPassword(const std::string &v)
Sets the value of the Password header for this Message.
Definition: Message.hpp:1478
bool getAutoAck(void) const
Returns the value of the queue auto-ack setting.
Definition: ampsplusplus.hpp:7466
amps_uint64_t publish(const std::string &topic_, const std::string &data_)
Publish a message to an AMPS topic, returning the sequence number assigned by the publish store if on...
Definition: ampsplusplus.hpp:5658
Definition: ampsplusplus.hpp:103
void setName(const std::string &name)
Sets the name of this client, assuming no name was provided previously.
Definition: ampsplusplus.hpp:5222
The interface for handling authentication with the AMPS server.
Definition: ampsplusplus.hpp:1006
void flush(long timeout_=0)
Method to wait for the Store to discard everything that has been stored up to the point in time when ...
Definition: ampsplusplus.hpp:1301
Field getBookmark() const
Retrieves the value of the Bookmark header of the Message as a Field which references the underlying ...
Definition: Message.hpp:1256
bool DangerousFlushPublishStoreResizeHandler(Store store_, size_t, void *data_)
PublishStoreResizeHandler that will block up to the timeout specified in user data milliseconds tryin...
Definition: ampsplusplus.hpp:1411
static amps_uint64_t getUnsetSequence()
Method to return the value used to represent no such sequence.
Definition: ampsplusplus.hpp:1167
Command & setCorrelationId(const char *correlationId_, size_t correlationIdLen_)
Set the correlation ID for this command.
Definition: ampsplusplus.hpp:782
MessageStream sow(const char *topic_, const std::string &filter_="", const std::string &orderBy_="", const std::string &bookmark_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic.
Definition: ampsplusplus.hpp:6307
amps_uint64_t publish(const std::string &topic_, const std::string &data_, unsigned long expiration_)
Publish a message to an AMPS topic, returning the sequence number assigned by the publish store (if a...
Definition: ampsplusplus.hpp:5707
void setLastChanceMessageHandler(const AMPS::MessageHandler &messageHandler)
Sets the message handler called when no other handler matches.
Definition: ampsplusplus.hpp:7134
Message::Field getMostRecent(const Message::Field &subId_)
Returns the most recent bookmark from the log that ought to be used for (re-)subscriptions.
Definition: BookmarkStore.hpp:310
amps_uint64_t deltaPublish(const char *topic_, size_t topicLength_, const char *data_, size_t dataLength_, unsigned long expiration_)
Publish the changed fields of a message to an AMPS topic.
Definition: ampsplusplus.hpp:5873
The client and server are disconnected.
Definition: amps.h:249
Message sowDeleteByData(const std::string &topic_, const std::string &data_, long timeout_=0)
Deletes the message whose keys match the message data provided.
Definition: ampsplusplus.hpp:7002
std::string deltaSubscribe(const MessageHandler &messageHandler_, const std::string &topic_, long timeout_, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Delta Subscribe to a topic.
Definition: ampsplusplus.hpp:6046
MessageStream timeout(unsigned timeout_)
Sets the maximum time to wait for the next message in milliseconds; if no message is available within...
Definition: ampsplusplus.hpp:8953
std::string sow(const MessageHandler &messageHandler_, const std::string &topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic.
Definition: ampsplusplus.hpp:6348
const std::string & getName() const
Returns the name of this client passed in the constructor.
Definition: ampsplusplus.hpp:5229
Command is an encapsulation of a single AMPS command sent by the client.
Definition: ampsplusplus.hpp:472
std::string sowAndSubscribe(const MessageHandler &messageHandler_, const std::string &topic_, const std::string &filter_="", const std::string &orderBy_="", const std::string &bookmark_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic and initiates a new subscription on it.
Definition: ampsplusplus.hpp:6498
Message & setBatchSize(const std::string &v)
Sets the value of the BatchSize header for this Message.
Definition: Message.hpp:1255
MessageStream subscribe(const std::string &topic_, long timeout_=0, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Subscribe to a topic.
Definition: ampsplusplus.hpp:5985
MessageStream bookmarkSubscribe(const char *topic_, long timeout_, const std::string &bookmark_, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Subscribe to a topic using a bookmark.
Definition: ampsplusplus.hpp:6173
Command & setSowKey(const std::string &sowKey_)
Sets the SowKey field of the command, typically used for a publish command to a topic in the state of...
Definition: ampsplusplus.hpp:612
Message & assignSequence(const std::string &v)
Assigns the value of the Sequence header for this Message without copying.
Definition: Message.hpp:1484
std::string executeAsyncNoResubscribe(Command &command_, MessageHandler handler_)
Execute the provided command and, once AMPS acknowledges the command, process messages in response to...
Definition: ampsplusplus.hpp:7347
Command & setCorrelationId(const std::string &correlationId_)
Set the correlation ID for this command.
Definition: ampsplusplus.hpp:770
std::string sowDelete(const MessageHandler &messageHandler, const std::string &topic, const std::string &filter, long timeout)
Deletes one or more messages from a topic's SOW cache.
Definition: ampsplusplus.hpp:6822
iterator end(void)
Returns an iterator representing the end of the topic or subscription.
Definition: ampsplusplus.hpp:5097
bool getRetryOnDisconnect(void) const
Returns true if automatic retry of a command to AMPS after a reconnect is enabled.
Definition: ampsplusplus.hpp:7544
void flushAcks(void)
Sends any queued message queue ack messages to the server immediately.
Definition: ampsplusplus.hpp:7457
std::string sowAndSubscribe(const MessageHandler &messageHandler_, const std::string &topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, bool oofEnabled_=false, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic and initiates a new subscription on it.
Definition: ampsplusplus.hpp:6380