AMPS C/C++ Client Class Reference
AMPS C/C++ Client Version 5.3.4.5
ampsplusplus.hpp
Go to the documentation of this file.
1 //
3 // Copyright (c) 2010-2025 60East Technologies Inc., All Rights Reserved.
4 //
5 // This computer software is owned by 60East Technologies Inc. and is
6 // protected by U.S. copyright laws and other laws and by international
7 // treaties. This computer software is furnished by 60East Technologies
8 // Inc. pursuant to a written license agreement and may be used, copied,
9 // transmitted, and stored only in accordance with the terms of such
10 // license agreement and with the inclusion of the above copyright notice.
11 // This computer software or any other copies thereof may not be provided
12 // or otherwise made available to any other person.
13 //
14 // U.S. Government Restricted Rights. This computer software: (a) was
15 // developed at private expense and is in all respects the proprietary
16 // information of 60East Technologies Inc.; (b) was not developed with
17 // government funds; (c) is a trade secret of 60East Technologies Inc.
18 // for all purposes of the Freedom of Information Act; and (d) is a
19 // commercial item and thus, pursuant to Section 12.212 of the Federal
20 // Acquisition Regulations (FAR) and DFAR Supplement Section 227.7202,
21 // Government's use, duplication or disclosure of the computer software
22 // is subject to the restrictions set forth by 60East Technologies Inc..
23 //
25 #ifndef _AMPSPLUSPLUS_H_
26 #define _AMPSPLUSPLUS_H_
27 #include "amps/amps.h"
28 #include "amps/ampsver.h"
29 #include <string>
30 #include <map>
31 #include <sstream>
32 #include <iostream>
33 #include <memory>
34 #include <stdexcept>
35 #include <limits.h>
36 #include <list>
37 #include <memory>
38 #include <regex>
39 #include <set>
40 #include <deque>
41 #include <vector>
42 #include <assert.h>
43 #ifndef _WIN32
44  #include <inttypes.h>
45 #endif
46 #if defined(sun)
47  #include <sys/atomic.h>
48 #endif
49 #include "amps/BookmarkStore.hpp"
50 #include "amps/MessageRouter.hpp"
51 #include "amps/util.hpp"
52 #include "amps/ampscrc.hpp"
53 #if __cplusplus >= 201100L || _MSC_VER >= 1900
54  #include <atomic>
55 #endif
56 
57 #ifndef AMPS_TESTING_SLOW_MESSAGE_STREAM
58  #define AMPS_TESTING_SLOW_MESSAGE_STREAM
59 #endif
60 
65 
66 
73 
84 
85 // For StoreBuffer implementations
86 #define AMPS_MEMORYBUFFER_DEFAULT_BUFFERS 10
87 #define AMPS_MEMORYBUFFER_DEFAULT_LENGTH 40960
88 #define AMPS_SUBSCRIPTION_MANAGER_DEFAULT_TIMEOUT 0
89 #define AMPS_HACLIENT_TIMEOUT_DEFAULT 10000
90 #define AMPS_HACLIENT_RECONNECT_DEFAULT 200
91 #define AMPS_DEFAULT_COMMAND_TIMEOUT 5000
92 #define AMPS_DEFAULT_TOP_N -1
93 #define AMPS_DEFAULT_BATCH_SIZE 10
94 #define AMPS_NUMBER_BUFFER_LEN 20
95 #define AMPS_DEFAULT_QUEUE_ACK_TIMEOUT 1000
96 
97 #if defined(_M_X64) || defined(__x86_64) || defined(_WIN64)
98  #define AMPS_X64 1
99 #endif
100 
101 static thread_local AMPS::Message publishStoreMessage;
102 
103 namespace AMPS
104 {
105 
106  typedef std::map<std::string, std::string> ConnectionInfo;
107 
108  template<class Type>
109  inline std::string asString(Type x_)
110  {
111  std::ostringstream os;
112  os << x_;
113  return os.str();
114  }
115 
116  inline
117  size_t convertToCharArray(char* buf_, amps_uint64_t seqNo_)
118  {
119  size_t pos = AMPS_NUMBER_BUFFER_LEN;
120  for (int i = 0; i < AMPS_NUMBER_BUFFER_LEN; ++i)
121  {
122  if (seqNo_ > 0)
123  {
124  buf_[--pos] = (char)(seqNo_ % 10 + '0');
125  seqNo_ /= 10;
126  }
127  }
128  return pos;
129  }
130 
#ifdef _WIN32
  /// Windows-only overload of convertToCharArray taking an unsigned long.
  /// Writes the decimal digits of seqNo_ right-aligned into buf_ without a
  /// terminator; nothing is written when seqNo_ is 0.
  /// \param buf_ Destination; must hold at least AMPS_NUMBER_BUFFER_LEN chars.
  /// \param seqNo_ The value to render.
  /// \return Index in buf_ of the most significant digit, or
  /// AMPS_NUMBER_BUFFER_LEN when seqNo_ is 0.
  inline
  size_t convertToCharArray(char* buf_, unsigned long seqNo_)
  {
    size_t first = AMPS_NUMBER_BUFFER_LEN;
    while (seqNo_ > 0 && first > 0)
    {
      buf_[--first] = (char)(seqNo_ % 10 + '0');
      seqNo_ /= 10;
    }
    return first;
  }
#endif
147 
151  class Reason
152  {
153  public:
154  static const char* duplicate()
155  {
156  return "duplicate";
157  }
158  static const char* badFilter()
159  {
160  return "bad filter";
161  }
162  static const char* badRegexTopic()
163  {
164  return "bad regex topic";
165  }
166  static const char* subscriptionAlreadyExists()
167  {
168  return "subscription already exists";
169  }
170  static const char* nameInUse()
171  {
172  return "name in use";
173  }
174  static const char* authFailure()
175  {
176  return "auth failure";
177  }
178  static const char* notEntitled()
179  {
180  return "not entitled";
181  }
182  static const char* authDisabled()
183  {
184  return "authentication disabled";
185  }
186  static const char* subidInUse()
187  {
188  return "subid in use";
189  }
190  static const char* noTopic()
191  {
192  return "no topic";
193  }
194  };
195 
205  {
206  public:
207  virtual ~ExceptionListener() {;}
208  virtual void exceptionThrown(const std::exception&) const {;}
209  };
210 
212 
213 
// Runs statement `x` and routes any std::exception it throws to
// `_exceptionListener->exceptionThrown()`. `_exceptionListener` must be in
// scope at the expansion site. Anything the listener itself throws is
// swallowed. Exceptions not derived from std::exception are not caught here.
214 #define AMPS_CALL_EXCEPTION_WRAPPER(x) \
215  try\
216  {\
217  x;\
218  }\
219  catch (std::exception& ex_)\
220  {\
221  try\
222  {\
223  _exceptionListener->exceptionThrown(ex_);\
224  }\
225  catch(...)\
226  {\
227  ;\
228  }\
229  }
230  /*
231  * Note : we don't attempt to trap non std::exception exceptions
232  * here because doing so interferes with pthread_exit on some OSes.
233  catch (...)\
234  {\
235  try\
236  {\
237  _exceptionListener->exceptionThrown(AMPS::AMPSException(\
238  "An unhandled exception of unknown type was thrown by "\
239  "the registered handler.", AMPS_E_USAGE));\
240  }\
241  catch(...)\
242  {\
243  ;\
244  }\
245  }*/
// The two macros below previously existed in a _WIN32 and a non-Windows copy
// that differed only in whether the MessageStreamFullException handler named
// its (unused) exception variable. An unnamed handler is valid on every
// compiler and avoids the inner `ex_` shadowing the outer one, so a single
// definition is used for all platforms.

// Repeats statement `x` while `me->_connected` is true until it completes
// without throwing MessageStreamFullException. Each time that exception is
// caught, `me->checkAndSendHeartbeat(false)` is called before retrying. If the
// heartbeat call throws a std::exception, it is reported to
// `me->_exceptionListener` (listener failures are swallowed) and the loop
// ends. Any other exception thrown by `x` propagates to the caller.
#define AMPS_CALL_EXCEPTION_WRAPPER_STREAM_FULL_2(me, x)\
  while(me->_connected)\
  {\
    try\
    {\
      x;\
      break;\
    }\
    catch(MessageStreamFullException&)\
    {\
      try\
      {\
        me->checkAndSendHeartbeat(false);\
      }\
      catch (std::exception& ex_)\
      {\
        try\
        {\
          me->_exceptionListener->exceptionThrown(ex_);\
        }\
        catch(...)\
        {\
          ;\
        }\
        break;\
      }\
    }\
  }

// Same retry loop as AMPS_CALL_EXCEPTION_WRAPPER_STREAM_FULL_2, but any
// std::exception escaping `x` is also reported to `me->_exceptionListener`
// instead of propagating (listener failures are swallowed).
//
// Note : we don't attempt to trap non std::exception exceptions
// here because doing so interferes with pthread_exit on some OSes.
#define AMPS_CALL_EXCEPTION_WRAPPER_2(me,x) \
  try\
  {\
    AMPS_CALL_EXCEPTION_WRAPPER_STREAM_FULL_2(me, x)\
  }\
  catch (std::exception& ex_)\
  {\
    try\
    {\
      me->_exceptionListener->exceptionThrown(ex_);\
    }\
    catch(...)\
    {\
      ;\
    }\
  }
423 
// Reports exception object `ex` to `_exceptionListener->exceptionThrown()`,
// swallowing anything the listener itself throws. `_exceptionListener` must
// be in scope at the expansion site.
424 #define AMPS_UNHANDLED_EXCEPTION(ex) \
425  try\
426  {\
427  _exceptionListener->exceptionThrown(ex);\
428  }\
429  catch(...)\
430  {\
431  ;\
432  }
433 
// Reports exception object `ex` to `me->_exceptionListener->exceptionThrown()`,
// swallowing anything the listener itself throws. `me` is a pointer-like
// expression exposing `_exceptionListener`.
434 #define AMPS_UNHANDLED_EXCEPTION_2(me,ex) \
435  try\
436  {\
437  me->_exceptionListener->exceptionThrown(ex);\
438  }\
439  catch(...)\
440  {\
441  ;\
442  }
443 
444 
445  class Client;
446 
471 
472  class Command
473  {
474  Message _message;
475  unsigned _timeout;
476  unsigned _batchSize;
477  unsigned _flags;
478  static const unsigned Subscribe = 1;
479  static const unsigned SOW = 2;
480  static const unsigned NeedsSequenceNumber = 4;
481  static const unsigned ProcessedAck = 8;
482  static const unsigned StatsAck = 16;
483  void init(Message::Command::Type command_)
484  {
485  _timeout = 0;
486  _batchSize = 0;
487  _flags = 0;
488  _message.reset();
489  _message.setCommandEnum(command_);
490  _setIds();
491  }
492  void init(const std::string& command_)
493  {
494  _timeout = 0;
495  _batchSize = 0;
496  _flags = 0;
497  _message.reset();
498  _message.setCommand(command_);
499  _setIds();
500  }
501  void init(const char* command_, size_t commandLen_)
502  {
503  _timeout = 0;
504  _batchSize = 0;
505  _flags = 0;
506  _message.reset();
507  _message.setCommand(command_, commandLen_);
508  _setIds();
509  }
510  void _setIds(void)
511  {
512  Message::Command::Type command = _message.getCommandEnum();
513  if (!(command & Message::Command::NoDataCommands))
514  {
515  _message.newCommandId();
516  if (command == Message::Command::Subscribe ||
517  command == Message::Command::SOWAndSubscribe ||
518  command == Message::Command::DeltaSubscribe ||
519  command == Message::Command::SOWAndDeltaSubscribe)
520  {
521  _message.setSubscriptionId(_message.getCommandId());
522  _flags |= Subscribe;
523  }
524  if (command == Message::Command::SOW
525  || command == Message::Command::SOWAndSubscribe
526  || command == Message::Command::SOWAndDeltaSubscribe)
527  {
528  _message.setQueryID(_message.getCommandId());
529  if (_batchSize == 0)
530  {
531  setBatchSize(AMPS_DEFAULT_BATCH_SIZE);
532  }
533  if (command == Message::Command::SOW)
534  {
535  _flags |= SOW;
536  }
537  }
538  _flags |= ProcessedAck;
539  }
540  else if (command == Message::Command::SOWDelete)
541  {
542  _message.newCommandId();
543  _flags |= ProcessedAck;
544  _flags |= NeedsSequenceNumber;
545  }
546  else if (command == Message::Command::Publish
547  || command == Message::Command::DeltaPublish)
548  {
549  _flags |= NeedsSequenceNumber;
550  }
551  else if (command == Message::Command::StopTimer)
552  {
553  _message.newCommandId();
554  }
555  }
556  public:
560  Command(const std::string& command_)
561  {
562  init(command_);
563  }
568  Command(const char* command_, size_t commandLen_)
569  {
570  init(command_, commandLen_);
571  }
575  Command(Message::Command::Type command_)
576  {
577  init(command_);
578  }
579 
583  Command& reset(const std::string& command_)
584  {
585  init(command_);
586  return *this;
587  }
592  Command& reset(const char* command_, size_t commandLen_)
593  {
594  init(command_, commandLen_);
595  return *this;
596  }
600  Command& reset(Message::Command::Type command_)
601  {
602  init(command_);
603  return *this;
604  }
612  Command& setSowKey(const std::string& sowKey_)
613  {
614  _message.setSowKey(sowKey_);
615  return *this;
616  }
625  Command& setSowKey(const char* sowKey_, size_t sowKeyLen_)
626  {
627  _message.setSowKey(sowKey_, sowKeyLen_);
628  return *this;
629  }
642  Command& setSowKeys(const std::string& sowKeys_)
643  {
644  _message.setSowKeys(sowKeys_);
645  return *this;
646  }
660  Command& setSowKeys(const char* sowKeys_, size_t sowKeysLen_)
661  {
662  _message.setSowKeys(sowKeys_, sowKeysLen_);
663  return *this;
664  }
666  Command& setCommandId(const std::string& cmdId_)
667  {
668  _message.setCommandId(cmdId_);
669  return *this;
670  }
673  Command& setCommandId(const char* cmdId_, size_t cmdIdLen_)
674  {
675  _message.setCommandId(cmdId_, cmdIdLen_);
676  return *this;
677  }
679  Command& setTopic(const std::string& topic_)
680  {
681  _message.setTopic(topic_);
682  return *this;
683  }
686  Command& setTopic(const char* topic_, size_t topicLen_)
687  {
688  _message.setTopic(topic_, topicLen_);
689  return *this;
690  }
692  Command& setFilter(const std::string& filter_)
693  {
694  _message.setFilter(filter_);
695  return *this;
696  }
699  Command& setFilter(const char* filter_, size_t filterLen_)
700  {
701  _message.setFilter(filter_, filterLen_);
702  return *this;
703  }
705  Command& setOrderBy(const std::string& orderBy_)
706  {
707  _message.setOrderBy(orderBy_);
708  return *this;
709  }
712  Command& setOrderBy(const char* orderBy_, size_t orderByLen_)
713  {
714  _message.setOrderBy(orderBy_, orderByLen_);
715  return *this;
716  }
718  Command& setSubId(const std::string& subId_)
719  {
720  _message.setSubscriptionId(subId_);
721  return *this;
722  }
725  Command& setSubId(const char* subId_, size_t subIdLen_)
726  {
727  _message.setSubscriptionId(subId_, subIdLen_);
728  return *this;
729  }
731  Command& setQueryId(const std::string& queryId_)
732  {
733  _message.setQueryId(queryId_);
734  return *this;
735  }
738  Command& setQueryId(const char* queryId_, size_t queryIdLen_)
739  {
740  _message.setQueryId(queryId_, queryIdLen_);
741  return *this;
742  }
748  Command& setBookmark(const std::string& bookmark_)
749  {
750  _message.setBookmark(bookmark_);
751  return *this;
752  }
759  Command& setBookmark(const char* bookmark_, size_t bookmarkLen_)
760  {
761  _message.setBookmark(bookmark_, bookmarkLen_);
762  return *this;
763  }
770  Command& setCorrelationId(const std::string& correlationId_)
771  {
772  _message.setCorrelationId(correlationId_);
773  return *this;
774  }
782  Command& setCorrelationId(const char* correlationId_, size_t correlationIdLen_)
783  {
784  _message.setCorrelationId(correlationId_, correlationIdLen_);
785  return *this;
786  }
789  Command& setOptions(const std::string& options_)
790  {
791  _message.setOptions(options_);
792  return *this;
793  }
797  Command& setOptions(const char* options_, size_t optionsLen_)
798  {
799  _message.setOptions(options_, optionsLen_);
800  return *this;
801  }
803  Command& setSequence(const std::string& seq_)
804  {
805  _message.setSequence(seq_);
806  return *this;
807  }
810  Command& setSequence(const char* seq_, size_t seqLen_)
811  {
812  _message.setSequence(seq_, seqLen_);
813  return *this;
814  }
816  Command& setSequence(const amps_uint64_t seq_)
817  {
818  std::ostringstream os;
819  os << seq_;
820  _message.setSequence(os.str());
821  return *this;
822  }
823  amps_uint64_t getSequence() const
824  {
825  return amps_message_get_field_uint64(_message.getMessage(), AMPS_Sequence);
826  }
829  Command& setData(const std::string& data_)
830  {
831  _message.setData(data_);
832  return *this;
833  }
837  Command& setData(const char* data_, size_t dataLen_)
838  {
839  _message.setData(data_, dataLen_);
840  return *this;
841  }
851  Command& setTimeout(unsigned timeout_)
852  {
853  _timeout = timeout_;
854  return *this;
855  }
857  Command& setTopN(unsigned topN_)
858  {
859  _message.setTopNRecordsReturned(topN_);
860  return *this;
861  }
866  Command& setBatchSize(unsigned batchSize_)
867  {
868  _message.setBatchSize(batchSize_);
869  _batchSize = batchSize_;
870  return *this;
871  }
882  Command& setExpiration(unsigned expiration_)
883  {
884  _message.setExpiration(expiration_);
885  return *this;
886  }
888  Command& addAckType(const std::string& ackType_)
889  {
890  _message.setAckType(_message.getAckType() + "," + ackType_);
891  if (ackType_ == "processed")
892  {
893  _flags |= ProcessedAck;
894  }
895  else if (ackType_ == "stats")
896  {
897  _flags |= StatsAck;
898  }
899  return *this;
900  }
902  Command& setAckType(const std::string& ackType_)
903  {
904  _message.setAckType(ackType_);
905  if (ackType_.find("processed") != std::string::npos)
906  {
907  _flags |= ProcessedAck;
908  }
909  else
910  {
911  _flags &= ~ProcessedAck;
912  }
913  if (ackType_.find("stats") != std::string::npos)
914  {
915  _flags |= StatsAck;
916  }
917  else
918  {
919  _flags &= ~StatsAck;
920  }
921  return *this;
922  }
924  Command& setAckType(unsigned ackType_)
925  {
926  _message.setAckTypeEnum(ackType_);
927  if (ackType_ & Message::AckType::Processed)
928  {
929  _flags |= ProcessedAck;
930  }
931  else
932  {
933  _flags &= ~ProcessedAck;
934  }
935  if (ackType_ & Message::AckType::Stats)
936  {
937  _flags |= StatsAck;
938  }
939  else
940  {
941  _flags &= ~StatsAck;
942  }
943  return *this;
944  }
946  std::string getAckType() const
947  {
948  return (std::string)(_message.getAckType());
949  }
951  unsigned getAckTypeEnum() const
952  {
953  return _message.getAckTypeEnum();
954  }
955 
956  Message& getMessage(void)
957  {
958  return _message;
959  }
960  unsigned getTimeout(void) const
961  {
962  return _timeout;
963  }
964  unsigned getBatchSize(void) const
965  {
966  return _batchSize;
967  }
968  bool isSubscribe(void) const
969  {
970  return _flags & Subscribe;
971  }
972  bool isSow(void) const
973  {
974  return (_flags & SOW) != 0;
975  }
976  bool hasProcessedAck(void) const
977  {
978  return (_flags & ProcessedAck) != 0;
979  }
980  bool hasStatsAck(void) const
981  {
982  return (_flags & StatsAck) != 0;
983  }
984  bool needsSequenceNumber(void) const
985  {
986  return (_flags & NeedsSequenceNumber) != 0;
987  }
988  };
989 
992  typedef void(*DisconnectHandlerFunc)(Client&, void* userData);
993 
994  class Message;
996 
1000  {
1001  public:
1002  virtual ~Authenticator() {;}
1003 
1009  virtual std::string authenticate(const std::string& userName_, const std::string& password_) = 0;
1017  virtual std::string retry(const std::string& userName_, const std::string& password_) = 0;
1024  virtual void completed(const std::string& userName_, const std::string& password_, const std::string& reason_) = 0;
1025  };
1026 
1031  {
1032  public:
1033  virtual ~DefaultAuthenticator() {;}
1036  std::string authenticate(const std::string& /*userName_*/, const std::string& password_)
1037  {
1038  return password_;
1039  }
1040 
1043  std::string retry(const std::string& /*userName_*/, const std::string& /*password_*/)
1044  {
1045  throw AuthenticationException("retry not implemented by DefaultAuthenticator.");
1046  }
1047 
1048  void completed(const std::string& /*userName_*/, const std::string& /* password_ */, const std::string& /* reason */) {;}
1049 
1054  {
1055  static DefaultAuthenticator d;
1056  return d;
1057  }
1058  };
1059 
1063  {
1064  public:
1065 
1069  virtual void execute(Message& message_) = 0;
1070 
1071  virtual ~StoreReplayer() {;}
1072  };
1073 
1074  class Store;
1075 
1084  typedef bool (*PublishStoreResizeHandler)(Store store_,
1085  size_t size_,
1086  void* userData_);
1087 
1090  class StoreImpl : public RefBody
1091  {
1092  public:
1098  StoreImpl(bool errorOnPublishGap_ = false)
1099  : _resizeHandler(NULL)
1100  , _resizeHandlerData(NULL)
1101  , _errorOnPublishGap(errorOnPublishGap_)
1102  {;}
1103 
1108  virtual amps_uint64_t store(const Message& message_) = 0;
1109 
1116  virtual void discardUpTo(amps_uint64_t index_) = 0;
1117 
1122  virtual void replay(StoreReplayer& replayer_) = 0;
1123 
1131  virtual bool replaySingle(StoreReplayer& replayer_, amps_uint64_t index_) = 0;
1132 
1137  virtual size_t unpersistedCount() const = 0;
1138 
1139  virtual ~StoreImpl() {;}
1140 
1149  virtual void flush(long timeout_) = 0;
1150 
1153  static inline size_t getUnsetPosition()
1154  {
1155  return AMPS_UNSET_INDEX;
1156  }
1157 
1160  static inline amps_uint64_t getUnsetSequence()
1161  {
1162  return AMPS_UNSET_SEQUENCE;
1163  }
1164 
1168  virtual amps_uint64_t getLowestUnpersisted() const = 0;
1169 
1173  virtual amps_uint64_t getLastPersisted() = 0;
1174 
1184  inline virtual void setResizeHandler(PublishStoreResizeHandler handler_,
1185  void* userData_)
1186  {
1187  _resizeHandler = handler_;
1188  _resizeHandlerData = userData_;
1189  }
1190 
1191  inline virtual PublishStoreResizeHandler getResizeHandler() const
1192  {
1193  return _resizeHandler;
1194  }
1195 
1196  bool callResizeHandler(size_t newSize_);
1197 
1198  inline virtual void setErrorOnPublishGap(bool errorOnPublishGap_)
1199  {
1200  _errorOnPublishGap = errorOnPublishGap_;
1201  }
1202 
1203  inline virtual bool getErrorOnPublishGap() const
1204  {
1205  return _errorOnPublishGap;
1206  }
1207 
1208  private:
1209  PublishStoreResizeHandler _resizeHandler;
1210  void* _resizeHandlerData;
1211  bool _errorOnPublishGap;
1212  };
1213 
1216  class Store
1217  {
1218  RefHandle<StoreImpl> _body;
1219  public:
1220  Store() {;}
1221  Store(StoreImpl* body_) : _body(body_) {;}
1222  Store(const Store& rhs) : _body(rhs._body) {;}
1223  Store& operator=(const Store& rhs)
1224  {
1225  _body = rhs._body;
1226  return *this;
1227  }
1228 
1232  amps_uint64_t store(const Message& message_)
1233  {
1234  return _body.get().store(message_);
1235  }
1236 
1243  void discardUpTo(amps_uint64_t index_)
1244  {
1245  _body.get().discardUpTo(index_);
1246  }
1247 
1252  void replay(StoreReplayer& replayer_)
1253  {
1254  _body.get().replay(replayer_);
1255  }
1256 
1264  bool replaySingle(StoreReplayer& replayer_, amps_uint64_t index_)
1265  {
1266  return _body.get().replaySingle(replayer_, index_);
1267  }
1268 
1273  size_t unpersistedCount() const
1274  {
1275  return _body.get().unpersistedCount();
1276  }
1277 
1281  bool isValid() const
1282  {
1283  return _body.isValid();
1284  }
1285 
1294  void flush(long timeout_ = 0)
1295  {
1296  return _body.get().flush(timeout_);
1297  }
1298 
1302  amps_uint64_t getLowestUnpersisted()
1303  {
1304  return _body.get().getLowestUnpersisted();
1305  }
1306 
1310  amps_uint64_t getLastPersisted()
1311  {
1312  return _body.get().getLastPersisted();
1313  }
1314 
1324  void setResizeHandler(PublishStoreResizeHandler handler_,
1325  void* userData_)
1326  {
1327  _body.get().setResizeHandler(handler_, userData_);
1328  }
1329 
1330  PublishStoreResizeHandler getResizeHandler() const
1331  {
1332  return _body.get().getResizeHandler();
1333  }
1334 
1339  inline void setErrorOnPublishGap(bool errorOnPublishGap_)
1340  {
1341  _body.get().setErrorOnPublishGap(errorOnPublishGap_);
1342  }
1343 
1348  inline bool getErrorOnPublishGap() const
1349  {
1350  return _body.get().getErrorOnPublishGap();
1351  }
1352 
1356  StoreImpl* get()
1357  {
1358  if (_body.isValid())
1359  {
1360  return &_body.get();
1361  }
1362  else
1363  {
1364  return NULL;
1365  }
1366  }
1367 
1368  };
1369 
1375  {
1376  public:
1377  virtual ~FailedWriteHandler() {;}
1384  virtual void failedWrite(const Message& message_,
1385  const char* reason_, size_t reasonLength_) = 0;
1386  };
1387 
1388 
1389  inline bool StoreImpl::callResizeHandler(size_t newSize_)
1390  {
1391  if (_resizeHandler)
1392  {
1393  return _resizeHandler(Store(this), newSize_, _resizeHandlerData);
1394  }
1395  return true;
1396  }
1397 
1404  inline bool DangerousFlushPublishStoreResizeHandler(Store store_, size_t /*size_*/,
1405  void* data_)
1406  {
1407  long* timeoutp = (long*)data_;
1408  size_t count = store_.unpersistedCount();
1409  if (count == 0)
1410  {
1411  return false;
1412  }
1413  try
1414  {
1415  store_.flush(*timeoutp);
1416  }
1417 #ifdef _WIN32
1418  catch (const TimedOutException&)
1419 #else
1420  catch (const TimedOutException& e)
1421 #endif
1422  {
1423  return true;
1424  }
1425  return (count == store_.unpersistedCount());
1426  }
1427 
1433  {
1434  public:
1446  virtual bool failure(const Message& message_, const MessageHandler& handler_,
1447  unsigned requestedAckTypes_,
1448  const AMPSException& exception_) = 0;
1449  };
1450 
1455  {
1456  public:
1457  virtual ~SubscriptionManager() {;}
1465  virtual void subscribe(MessageHandler messageHandler_, const Message& message_,
1466  unsigned requestedAckTypes_) = 0;
1470  virtual void unsubscribe(const Message::Field& subId_) = 0;
1473  virtual void clear() = 0;
1477  virtual void resubscribe(Client& client_) = 0;
1482  virtual void setFailedResubscribeHandler(std::shared_ptr<FailedResubscribeHandler> handler_)
1483  {
1484  _failedResubscribeHandler = handler_;
1485  }
1486  protected:
1487  std::shared_ptr<FailedResubscribeHandler> _failedResubscribeHandler;
1488  };
1489 
1493 
1495  {
1496  public:
1498  typedef enum { Disconnected = 0,
1499  Shutdown = 1,
1500  Connected = 2,
1501  LoggedOn = 4,
1502  PublishReplayed = 8,
1503  HeartbeatInitiated = 16,
1504  Resubscribed = 32,
1505  UNKNOWN = 16384
1506  } State;
1507 
1517  virtual void connectionStateChanged(State newState_) = 0;
1518  virtual ~ConnectionStateListener() {;}
1519  };
1520 
1521 
1522  class MessageStreamImpl;
1523  class MessageStream;
1524 
1525  typedef void(*DeferredExecutionFunc)(void*);
1526 
1527  class ClientImpl : public RefBody // -V553
1528  {
1529  // Class to wrap turning of Nagle for things like flush and logon
1530  class NoDelay
1531  {
1532  private:
1533  AMPS_SOCKET _socket;
1534  int _noDelay;
1535  char* _valuePtr;
1536 #ifdef _WIN32
1537  int _valueLen;
1538 #else
1539  socklen_t _valueLen;
1540 #endif
1541  public:
1542  NoDelay(amps_handle client_)
1543  : _socket(AMPS_INVALID_SOCKET), _noDelay(0), _valueLen(sizeof(int))
1544  {
1545  _valuePtr = (char*)&_noDelay;
1546  _socket = amps_client_get_socket(client_);
1547  if (_socket != AMPS_INVALID_SOCKET)
1548  {
1549  getsockopt(_socket, IPPROTO_TCP, TCP_NODELAY, _valuePtr, &_valueLen);
1550  if (!_noDelay)
1551  {
1552  _noDelay = 1;
1553  setsockopt(_socket, IPPROTO_TCP, TCP_NODELAY, _valuePtr, _valueLen);
1554  }
1555  else
1556  {
1557  _socket = AMPS_INVALID_SOCKET;
1558  }
1559  }
1560  }
1561 
1562  ~NoDelay()
1563  {
1564  if (_socket != AMPS_INVALID_SOCKET)
1565  {
1566  _noDelay = 0;
1567  setsockopt(_socket, IPPROTO_TCP, TCP_NODELAY, _valuePtr, _valueLen);
1568  }
1569  }
1570  };
1571 
1572  friend class Client;
1573  protected:
1574  amps_handle _client;
1575  DisconnectHandler _disconnectHandler;
1576  enum GlobalCommandTypeHandlers : size_t
1577  {
1578  Publish = 0,
1579  SOW = 1,
1580  GroupBegin = 2,
1581  GroupEnd = 3,
1582  Heartbeat = 4,
1583  OOF = 5,
1584  Ack = 6,
1585  LastChance = 7,
1586  DuplicateMessage = 8,
1587  COUNT = 9
1588  };
1589  std::vector<MessageHandler> _globalCommandTypeHandlers;
1590  Message _message, _readMessage, _publishMessage, _deltaMessage, _beatMessage;
1591  MessageRouter _routes;
1592  MessageRouter::RouteCache _routeCache;
1593  mutable Mutex _lock;
1594  std::string _name, _nameHash, _lastUri, _logonCorrelationData, _preflightMessage;
1595  std::vector<std::string> _httpPreflightHeaders;
1596  amps_uint64_t _nameHashValue;
1597  BookmarkStore _bookmarkStore;
1598  Store _publishStore;
1599  bool _isRetryOnDisconnect;
1600  amps_unique_ptr<FailedWriteHandler> _failedWriteHandler;
1601 #if __cplusplus >= 201100L || _MSC_VER >= 1900
1602  std::atomic<amps_uint64_t> _lastSentHaSequenceNumber;
1603 #else
1604  volatile amps_uint64_t _lastSentHaSequenceNumber;
1605 #endif
1606  AMPS_ATOMIC_TYPE_8 _logonInProgress;
1607  AMPS_ATOMIC_TYPE_8 _badTimeToHASubscribe;
1608  VersionInfo _serverVersion;
1609  Timer _heartbeatTimer;
1610  amps_unique_ptr<MessageStream> _pEmptyMessageStream;
1611 
1612  // queue data
1613  int _queueAckTimeout;
1614  bool _isAutoAckEnabled;
1615  unsigned _ackBatchSize;
1616  unsigned _queuedAckCount;
1617  unsigned _defaultMaxDepth;
1618  struct QueueBookmarks
1619  {
1620  QueueBookmarks(const std::string& topic_)
1621  : _topic(topic_)
1622  , _oldestTime(0)
1623  , _bookmarkCount(0)
1624  {;}
1625  std::string _topic;
1626  std::string _data;
1627  amps_uint64_t _oldestTime;
1628  unsigned _bookmarkCount;
1629  };
1630  typedef amps_uint64_t topic_hash;
1631  typedef std::map<topic_hash, QueueBookmarks> TopicHashMap;
1632  TopicHashMap _topicHashMap;
1633 
1634  class ClientStoreReplayer : public StoreReplayer
1635  {
1636  ClientImpl* _client;
1637  public:
1638  unsigned _version;
1639  amps_result _res;
1640 
1641  ClientStoreReplayer()
1642  : _client(NULL), _version(0), _res(AMPS_E_OK)
1643  {}
1644 
1645  ClientStoreReplayer(ClientImpl* client_)
1646  : _client(client_), _version(0), _res(AMPS_E_OK)
1647  {}
1648 
1649  void setClient(ClientImpl* client_)
1650  {
1651  _client = client_;
1652  }
1653 
1654  void execute(Message& message_)
1655  {
1656  if (!_client)
1657  {
1658  throw CommandException("Can't replay without a client.");
1659  }
1660  amps_uint64_t index = amps_message_get_field_uint64(message_.getMessage(),
1661  AMPS_Sequence);
1662  if (index > _client->_lastSentHaSequenceNumber)
1663  {
1664  _client->_lastSentHaSequenceNumber = index;
1665  }
1666 
1667  _res = AMPS_E_OK;
1668  // Don't replay a queue cancel message after a reconnect.
1669  // Currently, the only messages that will have anything in options
1670  // are cancel messages.
1671  if (!message_.getCommand().empty() &&
1672  (!_client->_logonInProgress ||
1673  message_.getOptions().len() < 6))
1674  {
1675  _res = amps_client_send_with_version(_client->_client,
1676  message_.getMessage(),
1677  &_version);
1678  if (_res != AMPS_E_OK)
1679  {
1680  throw DisconnectedException("AMPS Server disconnected during replay");
1681  }
1682  }
1683  }
1684 
1685  };
1686  ClientStoreReplayer _replayer;
1687 
1688  class FailedWriteStoreReplayer : public StoreReplayer
1689  {
1690  ClientImpl* _parent;
1691  const char* _reason;
1692  size_t _reasonLength;
1693  size_t _replayCount;
1694  public:
1695  FailedWriteStoreReplayer(ClientImpl* parent, const char* reason_, size_t reasonLength_)
1696  : _parent(parent),
1697  _reason(reason_),
1698  _reasonLength(reasonLength_),
1699  _replayCount(0)
1700  {;}
1701  void execute(Message& message_)
1702  {
1703  if (_parent->_failedWriteHandler)
1704  {
1705  ++_replayCount;
1706  _parent->_failedWriteHandler->failedWrite(message_,
1707  _reason, _reasonLength);
1708  }
1709  }
1710  size_t replayCount(void) const
1711  {
1712  return _replayCount;
1713  }
1714  };
1715 
1716  struct AckResponseImpl : public RefBody
1717  {
1718  std::string username, password, reason, status, bookmark, options;
1719  amps_uint64_t sequenceNo;
1720  amps_uint64_t nameHashValue;
1721  VersionInfo serverVersion;
1722 #if __cplusplus >= 201100L || _MSC_VER >= 1900
1723  std::atomic<bool> responded;
1724  std::atomic<bool> abandoned;
1725 #else
1726  volatile bool responded;
1727  volatile bool abandoned;
1728 #endif
1729  unsigned connectionVersion;
1730  AckResponseImpl() :
1731  RefBody(),
1732  username(), password(), reason(), status(), bookmark(), options(),
1733  sequenceNo((amps_uint64_t)0),
1734  serverVersion(),
1735  responded(false),
1736  abandoned(false),
1737  connectionVersion(UINT_MAX) // Don't abandon if unsent AC-1329
1738  {
1739  }
1740  };
1741 
1742  class AckResponse
1743  {
1744  RefHandle<AckResponseImpl> _body;
1745  public:
1746  AckResponse() : _body(NULL) {;}
1747  AckResponse(const AckResponse& rhs) : _body(rhs._body) {;}
1748  static AckResponse create()
1749  {
1750  AckResponse r;
1751  r._body = new AckResponseImpl();
1752  return r;
1753  }
1754 
1755  const std::string& username()
1756  {
1757  return _body.get().username;
1758  }
1759  void setUsername(const char* data_, size_t len_)
1760  {
1761  if (data_)
1762  {
1763  _body.get().username.assign(data_, len_);
1764  }
1765  else
1766  {
1767  _body.get().username.clear();
1768  }
1769  }
1770  const std::string& password()
1771  {
1772  return _body.get().password;
1773  }
1774  void setPassword(const char* data_, size_t len_)
1775  {
1776  if (data_)
1777  {
1778  _body.get().password.assign(data_, len_);
1779  }
1780  else
1781  {
1782  _body.get().password.clear();
1783  }
1784  }
1785  const std::string& reason()
1786  {
1787  return _body.get().reason;
1788  }
1789  void setReason(const char* data_, size_t len_)
1790  {
1791  if (data_)
1792  {
1793  _body.get().reason.assign(data_, len_);
1794  }
1795  else
1796  {
1797  _body.get().reason.clear();
1798  }
1799  }
1800  const std::string& status()
1801  {
1802  return _body.get().status;
1803  }
1804  void setStatus(const char* data_, size_t len_)
1805  {
1806  if (data_)
1807  {
1808  _body.get().status.assign(data_, len_);
1809  }
1810  else
1811  {
1812  _body.get().status.clear();
1813  }
1814  }
1815  const std::string& bookmark()
1816  {
1817  return _body.get().bookmark;
1818  }
1819  void setBookmark(const Field& bookmark_)
1820  {
1821  if (!bookmark_.empty())
1822  {
1823  _body.get().bookmark.assign(bookmark_.data(), bookmark_.len());
1824  Field::parseBookmark(bookmark_, _body.get().nameHashValue,
1825  _body.get().sequenceNo);
1826  }
1827  else
1828  {
1829  _body.get().bookmark.clear();
1830  _body.get().sequenceNo = (amps_uint64_t)0;
1831  _body.get().nameHashValue = (amps_uint64_t)0;
1832  }
1833  }
1834  amps_uint64_t sequenceNo() const
1835  {
1836  return _body.get().sequenceNo;
1837  }
1838  amps_uint64_t nameHashValue() const
1839  {
1840  return _body.get().nameHashValue;
1841  }
1842  void setSequenceNo(const char* data_, size_t len_)
1843  {
1844  amps_uint64_t result = (amps_uint64_t)0;
1845  if (data_)
1846  {
1847  for (size_t i = 0; i < len_; ++i)
1848  {
1849  result *= (amps_uint64_t)10;
1850  result += (amps_uint64_t)(data_[i] - '0');
1851  }
1852  }
1853  _body.get().sequenceNo = result;
1854  }
1855  VersionInfo serverVersion() const
1856  {
1857  return _body.get().serverVersion;
1858  }
1859  void setServerVersion(const char* data_, size_t len_)
1860  {
1861  if (data_)
1862  {
1863  _body.get().serverVersion.setVersion(std::string(data_, len_));
1864  }
1865  }
1866  bool responded()
1867  {
1868  return _body.get().responded;
1869  }
1870  void setResponded()
1871  {
1872  _body.get().responded = true;
1873  }
1874  bool abandoned()
1875  {
1876  return _body.get().abandoned;
1877  }
1878  void setAbandoned()
1879  {
1880  if (_body.isValid())
1881  {
1882  _body.get().abandoned = true;
1883  }
1884  }
1885 
1886  void setConnectionVersion(unsigned connectionVersion)
1887  {
1888  _body.get().connectionVersion = connectionVersion;
1889  }
1890 
1891  unsigned getConnectionVersion()
1892  {
1893  return _body.get().connectionVersion;
1894  }
1895  void setOptions(const char* data_, size_t len_)
1896  {
1897  if (data_)
1898  {
1899  _body.get().options.assign(data_, len_);
1900  }
1901  else
1902  {
1903  _body.get().options.clear();
1904  }
1905  }
1906 
1907  const std::string& options()
1908  {
1909  return _body.get().options;
1910  }
1911 
1912  AckResponse& operator=(const AckResponse& rhs)
1913  {
1914  _body = rhs._body;
1915  return *this;
1916  }
1917  };
1918 
1919 
// Pending synchronous ack requests. syncAckProcessing() inserts an entry
// under the message's command id; clearAcks() marks entries abandoned and
// erases them.
1920  typedef std::map<std::string, AckResponse> AckMap;
1921  AckMap _ackMap;
// Held while _ackMap is modified in syncAckProcessing() and clearAcks().
1922  Mutex _ackMapLock;
// Listener that _exceptionListener points at after construction.
1923  DefaultExceptionListener _defaultExceptionListener;
1924  protected:
1925 
1926  struct DeferredExecutionRequest
1927  {
1928  DeferredExecutionRequest(DeferredExecutionFunc func_,
1929  void* userData_)
1930  : _func(func_),
1931  _userData(userData_)
1932  {;}
1933 
1934  DeferredExecutionFunc _func;
1935  void* _userData;
1936  };
// Listener currently in use; the constructor points it at
// _defaultExceptionListener.
1937  const ExceptionListener* _exceptionListener;
1938  std::shared_ptr<const ExceptionListener> _pExceptionListener;
// Optional subscription manager, installed via setSubscriptionManager().
1939  amps_unique_ptr<SubscriptionManager> _subscriptionManager;
// Set to true at the end of _connect() and to false in setDisconnected().
1940  volatile bool _connected;
1941  std::string _username;
1942  typedef std::set<ConnectionStateListener*> ConnectionStateListeners;
// Listeners notified by broadcastConnectionStateChanged().
1943  ConnectionStateListeners _connectionStateListeners;
1944  typedef std::vector<DeferredExecutionRequest> DeferredExecutionList;
1945  Mutex _deferredExecutionLock;
1946  DeferredExecutionList _deferredExecutionList;
// Both are initialized to 0 by the constructor.
1947  unsigned _heartbeatInterval;
1948  unsigned _readTimeout;
1949 
1950  void broadcastConnectionStateChanged(ConnectionStateListener::State newState_)
1951  {
1952  // If we disconnected before we got to notification, don't notify.
1953  // This should only be able to happen for Resubscribed, since the lock
1954  // is released to let the subscription manager run resubscribe so a
1955  // disconnect could be called before the change is broadcast.
1956  if (!_connected && newState_ > ConnectionStateListener::Connected)
1957  {
1958  return;
1959  }
1960  for (ConnectionStateListeners::iterator it = _connectionStateListeners.begin(); it != _connectionStateListeners.end(); ++it)
1961  {
1962  AMPS_CALL_EXCEPTION_WRAPPER(
1963  (*it)->connectionStateChanged(newState_));
1964  }
1965  }
1966  unsigned processedAck(Message& message);
1967  unsigned persistedAck(Message& meesage);
1968  void lastChance(Message& message);
1969  void checkAndSendHeartbeat(bool force = false);
1970  virtual ConnectionInfo getConnectionInfo() const;
1971  static amps_result
1972  ClientImplMessageHandler(amps_handle message, void* userData);
1973  static void
1974  ClientImplPreDisconnectHandler(amps_handle client, unsigned failedConnectionVersion, void* userData);
1975  static amps_result
1976  ClientImplDisconnectHandler(amps_handle client, void* userData);
1977  static const char*
1978  ClientImplGetHttpPreflightMessage(void* userData);
1979 
1980  void unsubscribeInternal(const std::string& id)
1981  {
1982  if (id.empty())
1983  {
1984  return;
1985  }
1986  // remove the handler first to avoid any more message delivery
1987  Message::Field subId;
1988  subId.assign(id.data(), id.length());
1989  _routes.removeRoute(subId);
1990  // Lock is already acquired
1991  if (_subscriptionManager)
1992  {
1993  // Have to unlock before calling into sub manager to avoid deadlock
1994  Unlock<Mutex> unlock(_lock);
1995  _subscriptionManager->unsubscribe(subId);
1996  }
1997  _message.reset();
1998  _message.setCommandEnum(Message::Command::Unsubscribe);
1999  _message.newCommandId();
2000  _message.setSubscriptionId(id);
2001  _sendWithoutRetry(_message);
2002  deferredExecution(&amps_noOpFn, NULL);
2003  }
2004 
2005  AckResponse syncAckProcessing(long timeout_, Message& message_,
2006  bool isHASubscribe_)
2007  {
2008  return syncAckProcessing(timeout_, message_,
2009  (amps_uint64_t)0, isHASubscribe_);
2010  }
2011 
2012  AckResponse syncAckProcessing(long timeout_, Message& message_,
2013  amps_uint64_t haSeq = (amps_uint64_t)0,
2014  bool isHASubscribe_ = false)
2015  {
2016  // inv: we already have _lock locked up.
2017  AckResponse ack = AckResponse::create();
2018  if (1)
2019  {
2020  Lock<Mutex> guard(_ackMapLock);
2021  _ackMap[message_.getCommandId()] = ack;
2022  }
2023  ack.setConnectionVersion((unsigned)_send(message_, haSeq, isHASubscribe_));
2024  if (ack.getConnectionVersion() == 0)
2025  {
2026  // Send failed
2027  throw DisconnectedException("Connection closed while waiting for response.");
2028  }
2029  bool timedOut = false;
2030  AMPS_START_TIMER(timeout_)
2031  while (!timedOut && !ack.responded() && !ack.abandoned())
2032  {
2033  if (timeout_)
2034  {
2035  timedOut = !_lock.wait(timeout_);
2036  // May have woken up early, check real time
2037  if (timedOut)
2038  {
2039  AMPS_RESET_TIMER(timedOut, timeout_);
2040  }
2041  }
2042  else
2043  {
2044  // Using a timeout version to ensure python can interrupt
2045  _lock.wait(1000);
2046  Unlock<Mutex> unlck(_lock);
2047  amps_invoke_waiting_function();
2048  }
2049  }
2050  if (ack.responded())
2051  {
2052  if (ack.status() != "failure")
2053  {
2054  if (message_.getCommand() == "logon")
2055  {
2056  amps_uint64_t ackSequence = ack.sequenceNo();
2057  if (_lastSentHaSequenceNumber < ackSequence)
2058  {
2059  _lastSentHaSequenceNumber = ackSequence;
2060  }
2061  if (_publishStore.isValid())
2062  {
2063  // If this throws, logon will fail and eitehr be
2064  // handled in HAClient/ServerChooser or by the caller
2065  // of logon.
2066  _publishStore.discardUpTo(ackSequence);
2067  if (_lastSentHaSequenceNumber < _publishStore.getLastPersisted())
2068  {
2069  _lastSentHaSequenceNumber = _publishStore.getLastPersisted();
2070  }
2071  }
2072  _nameHash = ack.bookmark().substr(0, ack.bookmark().find('|'));
2073  _nameHashValue = ack.nameHashValue();
2074  _serverVersion = ack.serverVersion();
2075  if (_bookmarkStore.isValid())
2076  {
2077  _bookmarkStore.setServerVersion(_serverVersion);
2078  }
2079  }
2080  if (_ackBatchSize)
2081  {
2082  const std::string& options = ack.options();
2083  size_t index = options.find_first_of("max_backlog=");
2084  if (index != std::string::npos)
2085  {
2086  unsigned data = 0;
2087  const char* c = options.c_str() + index + 12;
2088  while (*c && *c != ',')
2089  {
2090  data = (data * 10) + (unsigned)(*c++ -48);
2091  }
2092  if (_ackBatchSize > data)
2093  {
2094  _ackBatchSize = data;
2095  }
2096  }
2097  }
2098  return ack;
2099  }
2100  const size_t NotEntitled = 12;
2101  std::string ackReason = ack.reason();
2102  if (ackReason.length() == 0)
2103  {
2104  return ack; // none
2105  }
2106  if (ackReason.length() == NotEntitled &&
2107  ackReason[0] == 'n' &&
2108  message_.getUserId().len() == 0)
2109  {
2110  message_.assignUserId(_username);
2111  }
2112  message_.throwFor(_client, ackReason);
2113  }
2114  else // !ack.responded()
2115  {
2116  if (!ack.abandoned())
2117  {
2118  throw TimedOutException("timed out waiting for operation.");
2119  }
2120  else
2121  {
2122  throw DisconnectedException("Connection closed while waiting for response.");
2123  }
2124  }
2125  return ack;
2126  }
2127 
2128  void _cleanup(void)
2129  {
2130  if (!_client)
2131  {
2132  return;
2133  }
2134  amps_client_set_predisconnect_handler(_client, NULL, 0L);
2135  amps_client_set_disconnect_handler(_client, NULL, 0L);
2136  AMPS_CALL_EXCEPTION_WRAPPER(ClientImpl::disconnect());
2137  _pEmptyMessageStream.reset(NULL);
2138  amps_client_destroy(_client);
2139  _client = NULL;
2140  }
2141 
2142  public:
2143 
// Constructs a disconnected client named clientName: creates the C client
// handle, installs the static ClientImpl* callbacks with `this` as user
// data, selects the default exception listener, and fills
// _globalCommandTypeHandlers with one empty MessageHandler per
// GlobalCommandTypeHandlers value.
//
// NOTE(review): this listing is missing source lines 2160, 2163, 2166 and
// 2169 - the opening of each of the four registration calls whose
// arguments appear below. Restore them from the original header before
// compiling.
2144  ClientImpl(const std::string& clientName)
2145  : _client(NULL), _name(clientName)
2146  , _isRetryOnDisconnect(true)
2147  , _lastSentHaSequenceNumber((amps_uint64_t)0), _logonInProgress(0)
2148  , _badTimeToHASubscribe(0), _serverVersion()
2149  , _queueAckTimeout(AMPS_DEFAULT_QUEUE_ACK_TIMEOUT)
2150  , _isAutoAckEnabled(false)
2151  , _ackBatchSize(0)
2152  , _queuedAckCount(0)
2153  , _defaultMaxDepth(0)
2154  , _connected(false)
2155  , _heartbeatInterval(0)
2156  , _readTimeout(0)
2157  {
2158  _replayer.setClient(this);
2159  _client = amps_client_create(clientName.c_str());
// Message handler registration (call head missing, see note above).
2161  (amps_handler)ClientImpl::ClientImplMessageHandler,
2162  this);
// Pre-disconnect handler registration (call head missing).
2164  (amps_predisconnect_handler)ClientImpl::ClientImplPreDisconnectHandler,
2165  this);
// Disconnect handler registration (call head missing).
2167  (amps_handler)ClientImpl::ClientImplDisconnectHandler,
2168  this);
// HTTP preflight message callback registration (call head missing).
2170  ClientImpl::ClientImplGetHttpPreflightMessage,
2171  this);
2172  _exceptionListener = &_defaultExceptionListener;
// One default-constructed handler slot per global handler type.
2173  for (size_t i = 0; i < GlobalCommandTypeHandlers::COUNT; ++i)
2174  {
2175 #ifdef AMPS_USE_EMPLACE
2176  _globalCommandTypeHandlers.emplace_back(MessageHandler());
2177 #else
2178  _globalCommandTypeHandlers.push_back(MessageHandler());
2179 #endif
2180  }
2181  }
2182 
2183  virtual ~ClientImpl()
2184  {
2185  _cleanup();
2186  }
2187 
2188  const std::string& getName() const
2189  {
2190  return _name;
2191  }
2192 
2193  const std::string& getNameHash() const
2194  {
2195  return _nameHash;
2196  }
2197 
2198  const amps_uint64_t getNameHashValue() const
2199  {
2200  return _nameHashValue;
2201  }
2202 
2203  void setName(const std::string& name)
2204  {
2205  // This operation will fail if the client's
2206  // name is already set.
2207  amps_result result = amps_client_set_name(_client, name.c_str());
2208  if (result != AMPS_E_OK)
2209  {
2210  AMPSException::throwFor(_client, result);
2211  }
2212  _name = name;
2213  }
2214 
2215  const std::string& getLogonCorrelationData() const
2216  {
2217  return _logonCorrelationData;
2218  }
2219 
2220  void setLogonCorrelationData(const std::string& logonCorrelationData_)
2221  {
2222  _logonCorrelationData = logonCorrelationData_;
2223  }
2224 
2225  size_t getServerVersion() const
2226  {
2227  return _serverVersion.getOldStyleVersion();
2228  }
2229 
2230  VersionInfo getServerVersionInfo() const
2231  {
2232  return _serverVersion;
2233  }
2234 
2235  const std::string& getURI() const
2236  {
2237  return _lastUri;
2238  }
2239 
2240  virtual void connect(const std::string& uri)
2241  {
2242  Lock<Mutex> l(_lock);
2243  _connect(uri);
2244  }
2245 
2246  virtual void _connect(const std::string& uri)
2247  {
2248  _lastUri = uri;
2249  amps_result result = amps_client_connect(_client, uri.c_str());
2250  if (result != AMPS_E_OK)
2251  {
2252  AMPSException::throwFor(_client, result);
2253  }
2254  _message.reset();
2255  _deltaMessage.setCommandEnum(Message::Command::DeltaPublish);
2256  _publishMessage.setCommandEnum(Message::Command::Publish);
2257  _beatMessage.setCommandEnum(Message::Command::Heartbeat);
2258  _beatMessage.setOptions("beat");
2259  _readMessage.setClientImpl(this);
2260  if (_queueAckTimeout)
2261  {
2262  result = amps_client_set_idle_time(_client, _queueAckTimeout);
2263  if (result != AMPS_E_OK)
2264  {
2265  AMPSException::throwFor(_client, result);
2266  }
2267  }
2268  _connected = true;
2269  broadcastConnectionStateChanged(ConnectionStateListener::Connected);
2270  }
2271 
2272  void addHttpPreflightHeader(const std::string& header_)
2273  {
2274  _httpPreflightHeaders.push_back(header_);
2275  }
2276 
2277  void addHttpPreflightHeader(const std::string& key_, const std::string& value_)
2278  {
2279  _httpPreflightHeaders.push_back(key_ + std::string(": ") + value_);
2280  }
2281 
2282  void clearHttpPreflightHeaders()
2283  {
2284  _httpPreflightHeaders.clear();
2285  }
2286 
2287  template<class T>
2288  void setHttpPreflightHeaders(const T& headers_)
2289  {
2290  _httpPreflightHeaders.clear();
2291  for (typename T::const_iterator i = headers_.begin(); i != headers_.end(); ++i)
2292  {
2293  _httpPreflightHeaders.push_back(*i);
2294  }
2295  }
2296 
2297  void setDisconnected()
2298  {
2299  {
2300  Lock<Mutex> l(_lock);
2301  if (_connected)
2302  {
2303  AMPS_CALL_EXCEPTION_WRAPPER(broadcastConnectionStateChanged(ConnectionStateListener::Disconnected));
2304  }
2305  _connected = false;
2306  _heartbeatTimer.setTimeout(0.0);
2307  // AC-1189 AC-1329 AC-1337 We need acks cleared while lock is held,
2308  // but not for unsent commands.
2309  clearAcks(UINT_MAX-1);
2310  }
2311  amps_client_disconnect(_client);
2312  _routes.clear();
2313  }
2314 
2315  virtual void disconnect()
2316  {
2317  AMPS_CALL_EXCEPTION_WRAPPER(flushAcks());
2318  setDisconnected();
2319  // Abandon all acks, sent and unsent
2320  clearAcks(UINT_MAX);
2321  AMPS_CALL_EXCEPTION_WRAPPER(processDeferredExecutions());
2322  Lock<Mutex> l(_lock);
2323  broadcastConnectionStateChanged(ConnectionStateListener::Shutdown);
2324  }
2325 
2326  void clearAcks(unsigned failedVersion)
2327  {
2328  // Have to lock to prevent race conditions
2329  Lock<Mutex> guard(_ackMapLock);
2330  {
2331  // Go ahead and signal any waiters if they are around...
2332  std::vector<std::string> worklist;
2333  for (AckMap::iterator i = _ackMap.begin(), e = _ackMap.end(); i != e; ++i)
2334  {
2335  if (i->second.getConnectionVersion() <= failedVersion)
2336  {
2337  i->second.setAbandoned();
2338  worklist.push_back(i->first);
2339  }
2340  }
2341 
2342  for (std::vector<std::string>::iterator j = worklist.begin(), e = worklist.end(); j != e; ++j)
2343  {
2344  _ackMap.erase(*j);
2345  }
2346  }
2347 
2348  _lock.signalAll();
2349  }
2350 
2351  int send(const Message& message)
2352  {
2353  Lock<Mutex> l(_lock);
2354  return _send(message);
2355  }
2356 
2357  void sendWithoutRetry(const Message& message_)
2358  {
2359  Lock<Mutex> l(_lock);
2360  // If we got here while logon was in progress, then we tried to send
2361  // while we were disconnected so throw DisconnectedException
2362  if (_logonInProgress)
2363  {
2364  throw DisconnectedException("The client has been disconnected.");
2365  }
2366  _sendWithoutRetry(message_);
2367  }
2368 
2369  void _sendWithoutRetry(const Message& message_)
2370  {
2371  amps_result result = amps_client_send(_client, message_.getMessage());
2372  if (result != AMPS_E_OK)
2373  {
2374  AMPSException::throwFor(_client, result);
2375  }
2376  }
2377 
// Sends `message`, looping for as long as the outcome is AMPS_E_RETRY.
//
//  message         Command to send. A local handle is kept for the whole
//                  call so it survives a reconnect.
//  haSeq           HA publish sequence number of the message, or 0 when it
//                  is not a stored publish.
//  isHASubscribe_  True for an HA subscribe.
//
// Returns the connection version reported by the send, or 0 when the
// message was not sent here (haSeq already covered by
// _lastSentHaSequenceNumber, or an HA subscribe while
// _badTimeToHASubscribe is set).
//
// Throws via AMPSException::throwFor when the final result is not
// AMPS_E_OK, or immediately on a failure when retry-on-disconnect is off.
//
// The caller must already hold _lock; it is released only around
// amps_client_attempt_reconnect().
2378  int _send(const Message& message, amps_uint64_t haSeq = (amps_uint64_t)0,
2379  bool isHASubscribe_ = false)
2380  {
2381  // Lock is already acquired
2382  amps_result result = AMPS_E_RETRY;
2383 
2384  // Create a local reference to this message, as we'll need to hold on
2385  // to a reference to it in case reconnect occurs.
2386  Message localMessage = message;
2387  unsigned version = 0;
2388 
2389  while (result == AMPS_E_RETRY)
2390  {
// A stored publish must wait for an in-progress logon to finish.
2391  if (haSeq && _logonInProgress)
2392  {
2393  // If retrySend is disabled, do not wait for the reconnect
2394  // to finish, just throw.
2395  if (!_isRetryOnDisconnect)
2396  {
2397  AMPSException::throwFor(_client, AMPS_E_RETRY);
2398  }
2399  if (!_lock.wait(1000))
2400  {
2401  amps_invoke_waiting_function();
2402  }
2403  }
2404  else
2405  {
// Nothing to send: sequence already covered, or HA subscribes are
// currently being refused. version is returned as-is.
2406  if ((haSeq && haSeq <= _lastSentHaSequenceNumber) ||
2407  (isHASubscribe_ && _badTimeToHASubscribe))
2408  {
2409  return (int)version;
2410  }
2411  // It's possible to get here out of order, but this way we'll
2412  // always send in order.
2413  if (haSeq > _lastSentHaSequenceNumber)
2414  {
// Replay any stored messages between the last sent sequence and haSeq.
2415  while (haSeq > _lastSentHaSequenceNumber + 1)
2416  {
2417  try
2418  {
2419  // Replayer updates _lastSentHaSequenceNumber
2420  if (!_publishStore.replaySingle(_replayer,
2421  _lastSentHaSequenceNumber + 1))
2422  {
2423  //++_lastSentHaSequenceNumber;
2424  continue;
2425  }
2426  result = AMPS_E_OK;
2427  version = _replayer._version;
2428  }
2429 #ifdef _WIN32
2430  catch (const DisconnectedException&)
2431 #else
2432  catch (const DisconnectedException& e)
2433 #endif
2434  {
2435  result = _replayer._res;
2436  break;
2437  }
2438  }
2439  result = amps_client_send_with_version(_client,
2440  localMessage.getMessage(),
2441  &version);
2442  ++_lastSentHaSequenceNumber;
2443  }
2444  else
2445  {
// Commands whose name does not start with 'l' wait here until the
// logon has completed.
2446  if (_logonInProgress && localMessage.getCommand().data()[0] != 'l')
2447  {
2448  while (_logonInProgress)
2449  {
2450  if (!_lock.wait(1000))
2451  {
2452  amps_invoke_waiting_function();
2453  }
2454  }
2455  }
2456  result = amps_client_send_with_version(_client,
2457  localMessage.getMessage(),
2458  &version);
2459  }
2460  if (result != AMPS_E_OK)
2461  {
// For a plain (non-HA) command still sharing the caller's message, take
// a deep copy before attempting to reconnect.
2462  if (!isHASubscribe_ && !haSeq &&
2463  localMessage.getMessage() == message.getMessage())
2464  {
2465  localMessage = message.deepCopy();
2466  }
2467  if (_isRetryOnDisconnect)
2468  {
2469  Unlock<Mutex> u(_lock);
2470  result = amps_client_attempt_reconnect(_client, version);
2471  // If this is an HA publish or subscribe command, it was
2472  // stored first and will have already been replayed by the
2473  // store or sub manager after reconnect, so just return.
2474  if ((isHASubscribe_ || haSeq) &&
2475  result == AMPS_E_RETRY)
2476  {
2477  return (int)version;
2478  }
2479  }
2480  else
2481  {
2482  // retrySend is disabled so throw the error
2483  // from the send as an exception, do not retry.
2484  AMPSException::throwFor(_client, result);
2485  }
2486  }
2487  }
2488  if (result == AMPS_E_RETRY)
2489  {
2490  amps_invoke_waiting_function();
2491  }
2492  }
2493 
2494  if (result != AMPS_E_OK)
2495  {
2496  AMPSException::throwFor(_client, result);
2497  }
2498  return (int)version;
2499  }
2500 
2501  void addMessageHandler(const Field& commandId_,
2502  const AMPS::MessageHandler& messageHandler_,
2503  unsigned requestedAcks_, Message::Command::Type commandType_)
2504  {
2505  Lock<Mutex> lock(_lock);
2506  _routes.addRoute(commandId_, messageHandler_, requestedAcks_,
2507  0, commandType_);
2508  }
2509 
2510  bool removeMessageHandler(const Field& commandId_)
2511  {
2512  Lock<Mutex> lock(_lock);
2513  return _routes.removeRoute(commandId_);
2514  }
2515 
// Sends message_ to the server, first registering messageHandler_ on the
// routes the command needs.
//
//  messageHandler_  Handler for responses; routes are only added when it
//                   is valid.
//  message_         Command to send. Missing command, subscription and
//                   query ids are generated and written into it. Its ack
//                   type is widened for the send and restored to the
//                   caller's requested value before returning or
//                   rethrowing.
//  timeout_         Passed to syncAckProcessing() for commands that wait
//                   for a processed ack.
//
// Returns the command id used for the message (converted to std::string).
// Throws CommandException for command types that cannot be sent, and
// rethrows anything raised while sending/awaiting the ack after removing
// the routes added for a subscribe/SOW command.
2516  std::string send(const MessageHandler& messageHandler_, Message& message_, int timeout_ = 0)
2517  {
2518  Field id = message_.getCommandId();
2519  Field subId = message_.getSubscriptionId();
2520  Field qid = message_.getQueryId();
2521  bool isSubscribeOnly = false;
2522  bool replace = false;
2523  unsigned requestedAcks = message_.getAckTypeEnum();
2524  unsigned systemAddedAcks = Message::AckType::None;
2525  Message::Command::Type commandType = message_.getCommandEnum();
2526 
2527  switch (commandType)
2528  {
// Subscribe-style and SOW commands share one path through deliberate
// fall-through: each case adds its own id handling, then continues.
2529  case Message::Command::Subscribe:
2530  case Message::Command::DeltaSubscribe:
2531  replace = message_.getOptions().operator std::string().find(AMPS_OPTIONS_REPLACE, 0, strlen(AMPS_OPTIONS_REPLACE) - 1) != std::string::npos;
2532  isSubscribeOnly = true;
2533  // fall through
2534  case Message::Command::SOWAndSubscribe:
2535  case Message::Command::SOWAndDeltaSubscribe:
2536  if (id.empty())
2537  {
2538  id = message_.newCommandId().getCommandId();
2539  }
2540  else
2541  {
// Pick a fresh command id while the current one collides with an
// existing route (unless this is a replace or id is the sub id).
2542  while (!replace && id != subId && _routes.hasRoute(id))
2543  {
2544  id = message_.newCommandId().getCommandId();
2545  }
2546  }
2547  if (subId.empty())
2548  {
2549  message_.setSubscriptionId(id);
2550  subId = id;
2551  }
// Bookmark subscriptions with a bookmark store also request persisted acks.
2552  if (!message_.getBookmark().empty() && _bookmarkStore.isValid())
2553  {
2554  systemAddedAcks |= Message::AckType::Persisted;
2555  }
2556  // fall through
2557  case Message::Command::SOW:
2558  if (id.empty())
2559  {
2560  id = message_.newCommandId().getCommandId();
2561  }
2562  else
2563  {
2564  while (!replace && id != subId && _routes.hasRoute(id))
2565  {
2566  message_.newCommandId();
2567  if (qid == id)
2568  {
2569  qid = message_.getCommandId();
2570  message_.setQueryId(qid);
2571  }
2572  id = message_.getCommandId();
2573  }
2574  }
2575  if (!isSubscribeOnly)
2576  {
2577  if (qid.empty())
2578  {
2579  message_.setQueryID(id);
2580  qid = id;
2581  }
2582  else
2583  {
2584  while (!replace && qid != subId && qid != id
2585  && _routes.hasRoute(qid))
2586  {
2587  qid = message_.newQueryId().getQueryId();
2588  }
2589  }
2590  }
// A processed ack is always requested so the send can be synchronous.
2591  systemAddedAcks |= Message::AckType::Processed;
2592  message_.setAckTypeEnum(requestedAcks | systemAddedAcks);
2593  {
2594  int routesAdded = 0;
2595  Lock<Mutex> l(_lock);
2596  if (!subId.empty() && messageHandler_.isValid())
2597  {
2598  if (!_routes.hasRoute(subId))
2599  {
2600  ++routesAdded;
2601  }
2602  // This can replace a non-subscribe with a matching id
2603  // with a subscription but not another subscription.
2604  _routes.addRoute(subId, messageHandler_, requestedAcks,
2605  systemAddedAcks, commandType);
2606  }
2607  if (!isSubscribeOnly && !qid.empty()
2608  && messageHandler_.isValid() && qid != subId)
2609  {
2610  if (routesAdded == 0)
2611  {
2612  _routes.addRoute(qid, messageHandler_,
2613  requestedAcks, systemAddedAcks, commandType);
2614  }
2615  else
2616  {
// A route was already added for this handler: ask the copy-route
// function (with the lock released) for separate user data, and fall
// back to the original handler when it returns NULL.
2617  void* data = NULL;
2618  {
2619  Unlock<Mutex> u(_lock);
2620  data = amps_invoke_copy_route_function(
2621  messageHandler_.userData());
2622  }
2623  if (!data)
2624  {
2625  _routes.addRoute(qid, messageHandler_, requestedAcks,
2626  systemAddedAcks, commandType);
2627  }
2628  else
2629  {
2630  _routes.addRoute(qid,
2631  MessageHandler(messageHandler_.function(),
2632  data),
2633  requestedAcks, systemAddedAcks, commandType);
2634  }
2635  }
2636  ++routesAdded;
2637  }
// The command id gets its own route only when the caller requested an
// ack other than persisted and the id differs from both other ids.
2638  if (!id.empty() && messageHandler_.isValid()
2639  && requestedAcks & ~Message::AckType::Persisted
2640  && id != subId && id != qid)
2641  {
2642  if (routesAdded == 0)
2643  {
2644  _routes.addRoute(id, messageHandler_, requestedAcks,
2645  systemAddedAcks, commandType);
2646  }
2647  else
2648  {
2649  void* data = NULL;
2650  {
2651  Unlock<Mutex> u(_lock);
2652  data = amps_invoke_copy_route_function(
2653  messageHandler_.userData());
2654  }
2655  if (!data)
2656  {
2657  _routes.addRoute(id, messageHandler_, requestedAcks,
2658  systemAddedAcks, commandType);
2659  }
2660  else
2661  {
2662  _routes.addRoute(id,
2663  MessageHandler(messageHandler_.function(),
2664  data),
2665  requestedAcks,
2666  systemAddedAcks, commandType);
2667  }
2668  }
2669  ++routesAdded;
2670  }
2671  try
2672  {
2673  // We aren't adding to subscription manager, so this isn't
2674  // an HA subscribe.
2675  syncAckProcessing(timeout_, message_, 0, false);
2676  message_.setAckTypeEnum(requestedAcks);
2677  }
2678  catch (...)
2679  {
// Undo the routes and restore the caller's ack type before rethrowing.
2680  _routes.removeRoute(message_.getQueryID());
2681  _routes.removeRoute(message_.getSubscriptionId());
2682  _routes.removeRoute(id);
2683  message_.setAckTypeEnum(requestedAcks);
2684  throw;
2685  }
2686  }
2687  break;
2688  // These are valid commands that are used as-is
2689  case Message::Command::Unsubscribe:
2690  case Message::Command::Heartbeat:
2691  case Message::Command::Logon:
2692  case Message::Command::StartTimer:
2693  case Message::Command::StopTimer:
2694  case Message::Command::SOWDelete:
2695  {
2696  Lock<Mutex> l(_lock);
2697  // if an ack is requested, it'll need a command ID.
2698  if (message_.getAckTypeEnum() != Message::AckType::None)
2699  {
2700  if (id.empty())
2701  {
2702  message_.newCommandId();
2703  id = message_.getCommandId();
2704  }
2705  if (messageHandler_.isValid())
2706  {
2707  _routes.addRoute(id, messageHandler_, requestedAcks,
2708  Message::AckType::None, commandType);
2709  }
2710  }
2711  _send(message_);
2712  }
2713  break;
2714  case Message::Command::DeltaPublish:
2715  case Message::Command::Publish:
2716  {
// A publish that carries a filter is sent synchronously.
2717  bool useSync = message_.getFilter().len() > 0;
2718  Lock<Mutex> l(_lock);
2719  // if an ack is requested, it'll need a command ID.
2720  unsigned ackType = message_.getAckTypeEnum();
2721  if (ackType != Message::AckType::None
2722  || useSync)
2723  {
2724  if (id.empty())
2725  {
2726  message_.newCommandId();
2727  id = message_.getCommandId();
2728  }
2729  if (messageHandler_.isValid())
2730  {
2731  _routes.addRoute(id, messageHandler_, requestedAcks,
2732  Message::AckType::None, commandType);
2733  }
2734  }
2735  if (useSync)
2736  {
2737  message_.setAckTypeEnum(ackType | Message::AckType::Processed);
2738  syncAckProcessing(timeout_, message_, 0, false);
2739  }
2740  else
2741  {
2742  _send(message_);
2743  }
2744  }
2745  break;
2746  // These are things that shouldn't be sent (not meaningful)
2747  case Message::Command::GroupBegin:
2748  case Message::Command::GroupEnd:
2749  case Message::Command::OOF:
2750  case Message::Command::Ack:
2751  case Message::Command::Unknown:
2752  default:
2753  throw CommandException("Command type " + message_.getCommand() + " can not be sent directly to AMPS");
2754  }
// Restore the ack type the caller originally asked for.
2755  message_.setAckTypeEnum(requestedAcks);
2756  return id;
2757  }
2758 
2759  void setDisconnectHandler(const DisconnectHandler& disconnectHandler)
2760  {
2761  Lock<Mutex> l(_lock);
2762  _disconnectHandler = disconnectHandler;
2763  }
2764 
2765  void setGlobalCommandTypeMessageHandler(const std::string& command_, const MessageHandler& handler_)
2766  {
2767  switch (command_[0])
2768  {
2769 #if 0 // Not currently implemented to avoid an extra branch in delivery
2770  case 'p':
2771  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::Publish] = handler_;
2772  break;
2773  case 's':
2774  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::SOW] = handler_;
2775  break;
2776 #endif
2777  case 'h':
2778  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::Heartbeat] = handler_;
2779  break;
2780 #if 0 // Not currently implemented to avoid an extra branch in delivery
2781  case 'g':
2782  if (command_[6] == 'b')
2783  {
2784  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::GroupBegin] = handler_;
2785  }
2786  else if (command_[6] == 'e')
2787  {
2788  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::GroupEnd] = handler_;
2789  }
2790  else
2791  {
2792  std::ostringstream os;
2793  os << "Invalid command '" << command_ << "' passed to setGlobalCommandTypeHandler";
2794  throw CommandException(os.str());
2795  }
2796  break;
2797  case 'o':
2798  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::OOF] = handler_;
2799  break;
2800 #endif
2801  case 'a':
2802  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::Ack] = handler_;
2803  break;
2804  case 'l':
2805  case 'L':
2806  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::LastChance] = handler_;
2807  break;
2808  case 'd':
2809  case 'D':
2810  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::DuplicateMessage] = handler_;
2811  break;
2812  default:
2813  std::ostringstream os;
2814  os << "Invalid command '" << command_ << "' passed to setGlobalCommandTypeHandler";
2815  throw CommandException(os.str());
2816  break;
2817  }
2818  }
2819 
2820  void setGlobalCommandTypeMessageHandler(const Message::Command::Type command_, const MessageHandler& handler_)
2821  {
2822  switch (command_)
2823  {
2824 #if 0 // Not currently implemented to avoid an extra branch in delivery
2825  case Message::Command::Publish:
2826  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::Publish] = handler_;
2827  break;
2828  case Message::Command::SOW:
2829  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::SOW] = handler_;
2830  break;
2831 #endif
2832  case Message::Command::Heartbeat:
2833  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::Heartbeat] = handler_;
2834  break;
2835 #if 0 // Not currently implemented to avoid an extra branch in delivery
2836  case Message::Command::GroupBegin:
2837  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::GroupBegin] = handler_;
2838  break;
2839  case Message::Command::GroupEnd:
2840  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::GroupEnd] = handler_;
2841  break;
2842  case Message::Command::OOF:
2843  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::OOF] = handler_;
2844  break;
2845 #endif
2846  case Message::Command::Ack:
2847  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::Ack] = handler_;
2848  break;
2849  default:
2850  unsigned bits = 0;
2851  unsigned command = command_;
2852  while (command > 0)
2853  {
2854  ++bits;
2855  command >>= 1;
2856  }
2857  char errBuf[128];
2858  AMPS_snprintf(errBuf, sizeof(errBuf),
2859  "Invalid command '%.*s' passed to setGlobalCommandTypeHandler",
2860  CommandConstants<0>::Lengths[bits],
2861  CommandConstants<0>::Values[bits]);
2862  throw CommandException(errBuf);
2863  break;
2864  }
2865  }
2866 
2867  void setGlobalCommandTypeMessageHandler(const GlobalCommandTypeHandlers handlerType_, const MessageHandler& handler_)
2868  {
2869  _globalCommandTypeHandlers[handlerType_] = handler_;
2870  }
2871 
2872  void setFailedWriteHandler(FailedWriteHandler* handler_)
2873  {
2874  Lock<Mutex> l(_lock);
2875  _failedWriteHandler.reset(handler_);
2876  }
2877 
2878  void setPublishStore(const Store& publishStore_)
2879  {
2880  Lock<Mutex> l(_lock);
2881  if (_connected)
2882  {
2883  throw AlreadyConnectedException("Setting a publish store on a connected client is undefined behavior");
2884  }
2885  _publishStore = publishStore_;
2886  }
2887 
2888  void setBookmarkStore(const BookmarkStore& bookmarkStore_)
2889  {
2890  Lock<Mutex> l(_lock);
2891  if (_connected)
2892  {
2893  throw AlreadyConnectedException("Setting a bookmark store on a connected client is undefined behavior");
2894  }
2895  _bookmarkStore = bookmarkStore_;
2896  }
2897 
2898  void setSubscriptionManager(SubscriptionManager* subscriptionManager_)
2899  {
2900  Lock<Mutex> l(_lock);
2901  _subscriptionManager.reset(subscriptionManager_);
2902  }
2903 
2904  SubscriptionManager* getSubscriptionManager() const
2905  {
2906  return const_cast<SubscriptionManager*>(_subscriptionManager.get());
2907  }
2908 
2909  DisconnectHandler getDisconnectHandler() const
2910  {
2911  return _disconnectHandler;
2912  }
2913 
2914  MessageHandler getDuplicateMessageHandler() const
2915  {
2916  return _globalCommandTypeHandlers[GlobalCommandTypeHandlers::DuplicateMessage];
2917  }
2918 
2919  FailedWriteHandler* getFailedWriteHandler() const
2920  {
2921  return const_cast<FailedWriteHandler*>(_failedWriteHandler.get());
2922  }
2923 
2924  Store getPublishStore() const
2925  {
2926  return _publishStore;
2927  }
2928 
2929  BookmarkStore getBookmarkStore() const
2930  {
2931  return _bookmarkStore;
2932  }
2933 
2934  amps_uint64_t publish(const char* topic_, size_t topicLen_, const char* data_, size_t dataLen_)
2935  {
2936  if (!_publishStore.isValid())
2937  {
2938  Lock<Mutex> l(_lock);
2939  _publishMessage.assignTopic(topic_, topicLen_);
2940  _publishMessage.assignData(data_, dataLen_);
2941  _send(_publishMessage);
2942  return 0;
2943  }
2944  else
2945  {
2946  publishStoreMessage.reset();
2947  publishStoreMessage.setCommandEnum(Message::Command::Publish);
2948  return _publish(topic_, topicLen_, data_, dataLen_);
2949  }
2950  }
2951 
2952  amps_uint64_t publish(const char* topic_, size_t topicLen_, const char* data_,
2953  size_t dataLen_, unsigned long expiration_)
2954  {
2955  if (!_publishStore.isValid())
2956  {
2957  Lock<Mutex> l(_lock);
2958  _publishMessage.assignTopic(topic_, topicLen_);
2959  _publishMessage.assignData(data_, dataLen_);
2960  char exprBuf[AMPS_NUMBER_BUFFER_LEN];
2961  size_t pos = convertToCharArray(exprBuf, expiration_);
2962  _publishMessage.assignExpiration(exprBuf + pos, AMPS_NUMBER_BUFFER_LEN - pos);
2963  _send(_publishMessage);
2964  _publishMessage.assignExpiration(NULL, 0);
2965  return 0;
2966  }
2967  else
2968  {
2969  publishStoreMessage.reset();
2970  char exprBuf[AMPS_NUMBER_BUFFER_LEN];
2971  size_t exprPos = convertToCharArray(exprBuf, expiration_);
2972  publishStoreMessage.setCommandEnum(Message::Command::Publish)
2973  .assignExpiration(exprBuf + exprPos,
2974  AMPS_NUMBER_BUFFER_LEN - exprPos);
2975  return _publish(topic_, topicLen_, data_, dataLen_);
2976  }
2977  }
2978 
2979  class FlushAckHandler : ConnectionStateListener
2980  {
2981  private:
2982  ClientImpl* _pClient;
2983  Field _cmdId;
2984 #if __cplusplus >= 201100L || _MSC_VER >= 1900
2985  std::atomic<bool> _acked;
2986  std::atomic<bool> _disconnected;
2987 #else
2988  volatile bool _acked;
2989  volatile bool _disconnected;
2990 #endif
2991  public:
2992  FlushAckHandler(ClientImpl* pClient_)
2993  : _pClient(pClient_), _cmdId(), _acked(false), _disconnected(false)
2994  {
2995  pClient_->addConnectionStateListener(this);
2996  }
2997  ~FlushAckHandler()
2998  {
2999  _pClient->removeConnectionStateListener(this);
3000  _pClient->removeMessageHandler(_cmdId);
3001  _cmdId.clear();
3002  }
3003  void setCommandId(const Field& cmdId_)
3004  {
3005  _cmdId.deepCopy(cmdId_);
3006  }
3007  void invoke(const Message&)
3008  {
3009  _acked = true;
3010  }
3011  void connectionStateChanged(State state_)
3012  {
3013  if (state_ <= Shutdown)
3014  {
3015  _disconnected = true;
3016  }
3017  }
3018  bool acked()
3019  {
3020  return _acked;
3021  }
3022  bool done()
3023  {
3024  return _acked || _disconnected;
3025  }
3026  };
3027 
3028  void publishFlush(long timeout_, unsigned ackType_)
3029  {
3030  static const char* processed = "processed";
3031  static const size_t processedLen = strlen(processed);
3032  static const char* persisted = "persisted";
3033  static const size_t persistedLen = strlen(persisted);
3034  static const char* flush = "flush";
3035  static const size_t flushLen = strlen(flush);
3036  static VersionInfo minPersisted("5.3.3.0");
3037  static VersionInfo minFlush("4");
3038  if (ackType_ != Message::AckType::Processed
3039  && ackType_ != Message::AckType::Persisted)
3040  {
3041  throw CommandException("Flush can only be used with processed or persisted acks.");
3042  }
3043  FlushAckHandler flushHandler(this);
3044  if (_serverVersion >= minFlush)
3045  {
3046  Lock<Mutex> l(_lock);
3047  if (!_connected)
3048  {
3049  throw DisconnectedException("Not connected trying to flush");
3050  }
3051  _message.reset();
3052  _message.newCommandId();
3053  _message.assignCommand(flush, flushLen);
3054  if (_serverVersion < minPersisted
3055  || ackType_ == Message::AckType::Processed)
3056  {
3057  _message.assignAckType(processed, processedLen);
3058  }
3059  else
3060  {
3061  _message.assignAckType(persisted, persistedLen);
3062  }
3063  flushHandler.setCommandId(_message.getCommandId());
3064  addMessageHandler(_message.getCommandId(),
3065  std::bind(&FlushAckHandler::invoke,
3066  std::ref(flushHandler),
3067  std::placeholders::_1),
3068  ackType_, _message.getCommandEnum());
3069  NoDelay noDelay(_client);
3070  if (_send(_message) == -1)
3071  {
3072  throw DisconnectedException("Disconnected trying to flush");
3073  }
3074  }
3075  if (_publishStore.isValid())
3076  {
3077  try
3078  {
3079  _publishStore.flush(timeout_);
3080  }
3081  catch (const AMPSException& ex)
3082  {
3083  AMPS_UNHANDLED_EXCEPTION(ex);
3084  throw;
3085  }
3086  }
3087  else if (_serverVersion < minFlush)
3088  {
3089  if (timeout_ > 0)
3090  {
3091  AMPS_USLEEP(timeout_ * 1000);
3092  }
3093  else
3094  {
3095  AMPS_USLEEP(1000 * 1000);
3096  }
3097  return;
3098  }
3099  if (timeout_)
3100  {
3101  Timer timer((double)timeout_);
3102  timer.start();
3103  while (!timer.check() && !flushHandler.done())
3104  {
3105  AMPS_USLEEP(10000);
3106  amps_invoke_waiting_function();
3107  }
3108  }
3109  else
3110  {
3111  while (!flushHandler.done())
3112  {
3113  AMPS_USLEEP(10000);
3114  amps_invoke_waiting_function();
3115  }
3116  }
3117  // No response or disconnect in timeout interval
3118  if (!flushHandler.done())
3119  {
3120  throw TimedOutException("Timed out waiting for flush");
3121  }
3122  // We got disconnected and there is no publish store
3123  if (!flushHandler.acked() && !_publishStore.isValid())
3124  {
3125  throw DisconnectedException("Disconnected waiting for flush");
3126  }
3127  }
3128 
3129  amps_uint64_t deltaPublish(const char* topic_, size_t topicLength_,
3130  const char* data_, size_t dataLength_)
3131  {
3132  if (!_publishStore.isValid())
3133  {
3134  Lock<Mutex> l(_lock);
3135  _deltaMessage.assignTopic(topic_, topicLength_);
3136  _deltaMessage.assignData(data_, dataLength_);
3137  _send(_deltaMessage);
3138  return 0;
3139  }
3140  else
3141  {
3142  publishStoreMessage.reset();
3143  publishStoreMessage.setCommandEnum(Message::Command::DeltaPublish);
3144  return _publish(topic_, topicLength_, data_, dataLength_);
3145  }
3146  }
3147 
3148  amps_uint64_t deltaPublish(const char* topic_, size_t topicLength_,
3149  const char* data_, size_t dataLength_,
3150  unsigned long expiration_)
3151  {
3152  if (!_publishStore.isValid())
3153  {
3154  Lock<Mutex> l(_lock);
3155  _deltaMessage.assignTopic(topic_, topicLength_);
3156  _deltaMessage.assignData(data_, dataLength_);
3157  char exprBuf[AMPS_NUMBER_BUFFER_LEN];
3158  size_t pos = convertToCharArray(exprBuf, expiration_);
3159  _deltaMessage.assignExpiration(exprBuf + pos, AMPS_NUMBER_BUFFER_LEN - pos);
3160  _send(_deltaMessage);
3161  _deltaMessage.assignExpiration(NULL, 0);
3162  return 0;
3163  }
3164  else
3165  {
3166  publishStoreMessage.reset();
3167  char exprBuf[AMPS_NUMBER_BUFFER_LEN];
3168  size_t exprPos = convertToCharArray(exprBuf, expiration_);
3169  publishStoreMessage.setCommandEnum(Message::Command::DeltaPublish)
3170  .assignExpiration(exprBuf + exprPos,
3171  AMPS_NUMBER_BUFFER_LEN - exprPos);
3172  return _publish(topic_, topicLength_, data_, dataLength_);
3173  }
3174  }
3175 
3176  amps_uint64_t _publish(const char* topic_, size_t topicLength_,
3177  const char* data_, size_t dataLength_)
3178  {
3179  publishStoreMessage.assignTopic(topic_, topicLength_)
3180  .setAckTypeEnum(Message::AckType::Persisted)
3181  .assignData(data_, dataLength_);
3182  amps_uint64_t haSequenceNumber = _publishStore.store(publishStoreMessage);
3183  char buf[AMPS_NUMBER_BUFFER_LEN];
3184  size_t pos = convertToCharArray(buf, haSequenceNumber);
3185  publishStoreMessage.assignSequence(buf + pos, AMPS_NUMBER_BUFFER_LEN - pos);
3186  {
3187  Lock<Mutex> l(_lock);
3188  _send(publishStoreMessage, haSequenceNumber);
3189  }
3190  return haSequenceNumber;
3191  }
3192 
3193  virtual std::string logon(long timeout_, Authenticator& authenticator_,
3194  const char* options_ = NULL)
3195  {
3196  Lock<Mutex> l(_lock);
3197  return _logon(timeout_, authenticator_, options_);
3198  }
3199 
3200  virtual std::string _logon(long timeout_, Authenticator& authenticator_,
3201  const char* options_ = NULL)
3202  {
3203  _message.reset();
3204  _message.newCommandId();
3205  std::string newCommandId = _message.getCommandId();
3206  _message.setCommandEnum(Message::Command::Logon);
3207  _message.setClientName(_name);
3208 #ifdef AMPS_CLIENT_VERSION_WITH_LANGUAGE
3209  _message.assignVersion(AMPS_CLIENT_VERSION_WITH_LANGUAGE,
3210  strlen(AMPS_CLIENT_VERSION_WITH_LANGUAGE));
3211 #endif
3212  URI uri(_lastUri);
3213  if (uri.user().size())
3214  {
3215  _message.setUserId(uri.user());
3216  }
3217  if (uri.password().size())
3218  {
3219  _message.setPassword(uri.password());
3220  }
3221  if (uri.protocol() == "amps" && uri.messageType().size())
3222  {
3223  _message.setMessageType(uri.messageType());
3224  }
3225  if (uri.isTrue("pretty"))
3226  {
3227  _message.setOptions("pretty");
3228  }
3229 
3230  _message.setPassword(authenticator_.authenticate(_message.getUserId(), _message.getPassword()));
3231  if (!_logonCorrelationData.empty())
3232  {
3233  _message.assignCorrelationId(_logonCorrelationData);
3234  }
3235  if (options_)
3236  {
3237  _message.setOptions(options_);
3238  }
3239  _username = _message.getUserId();
3240  try
3241  {
3242  AtomicFlagFlip pubFlip(&_logonInProgress);
3243  NoDelay noDelay(_client);
3244  while (true)
3245  {
3246  _message.setAckTypeEnum(Message::AckType::Processed);
3247  AckResponse ack = syncAckProcessing(timeout_, _message);
3248  if (ack.status() == "retry")
3249  {
3250  _message.setPassword(authenticator_.retry(ack.username(), ack.password()));
3251  _username = ack.username();
3252  _message.setUserId(_username);
3253  }
3254  else
3255  {
3256  authenticator_.completed(ack.username(), ack.password(), ack.reason());
3257  break;
3258  }
3259  }
3260  broadcastConnectionStateChanged(ConnectionStateListener::LoggedOn);
3261 
3262  // Now re-send the heartbeat command if configured
3263  _sendHeartbeat();
3264  // Signal any threads waiting for _logonInProgress
3265  _lock.signalAll();
3266  }
3267  catch (const AMPSException& ex)
3268  {
3269  _lock.signalAll();
3270  AMPS_UNHANDLED_EXCEPTION(ex);
3271  throw;
3272  }
3273  catch (...)
3274  {
3275  _lock.signalAll();
3276  throw;
3277  }
3278 
3279  if (_publishStore.isValid())
3280  {
3281  try
3282  {
3283  _publishStore.replay(_replayer);
3284  broadcastConnectionStateChanged(ConnectionStateListener::PublishReplayed);
3285  }
3286  catch (const PublishStoreGapException& ex)
3287  {
3288  _lock.signalAll();
3289  AMPS_UNHANDLED_EXCEPTION(ex);
3290  throw;
3291  }
3292  catch (const StoreException& ex)
3293  {
3294  _lock.signalAll();
3295  std::ostringstream os;
3296  os << "A local store exception occurred while logging on."
3297  << ex.toString();
3298  throw ConnectionException(os.str());
3299  }
3300  catch (const AMPSException& ex)
3301  {
3302  _lock.signalAll();
3303  AMPS_UNHANDLED_EXCEPTION(ex);
3304  throw;
3305  }
3306  catch (const std::exception& ex)
3307  {
3308  _lock.signalAll();
3309  AMPS_UNHANDLED_EXCEPTION(ex);
3310  throw;
3311  }
3312  catch (...)
3313  {
3314  _lock.signalAll();
3315  throw;
3316  }
3317  }
3318  _lock.signalAll();
3319  return newCommandId;
3320  }
3321 
3322  std::string subscribe(const MessageHandler& messageHandler_,
3323  const std::string& topic_,
3324  long timeout_,
3325  const std::string& filter_,
3326  const std::string& bookmark_,
3327  const std::string& options_,
3328  const std::string& subId_,
3329  bool isHASubscribe_ = true)
3330  {
3331  isHASubscribe_ &= (bool)_subscriptionManager;
3332  Lock<Mutex> l(_lock);
3333  _message.reset();
3334  _message.setCommandEnum(Message::Command::Subscribe);
3335  _message.newCommandId();
3336  std::string subId(subId_);
3337  if (subId.empty())
3338  {
3339  if (options_.find(AMPS_OPTIONS_REPLACE, 0, strlen(AMPS_OPTIONS_REPLACE) - 1) != std::string::npos)
3340  {
3341  throw ConnectionException("Cannot issue a replacement subscription; a valid subscription id is required.");
3342  }
3343 
3344  subId = _message.getCommandId();
3345  }
3346  _message.setSubscriptionId(subId);
3347  // we need to deep copy this before sending the message; while we are
3348  // waiting for a response, the fields in _message may get blown away for
3349  // other operations.
3350  AMPS::Message::Field subIdField(subId);
3351  unsigned ackTypes = Message::AckType::Processed;
3352 
3353  if (!bookmark_.empty() && _bookmarkStore.isValid())
3354  {
3355  ackTypes |= Message::AckType::Persisted;
3356  }
3357  _message.setTopic(topic_);
3358 
3359  if (filter_.length())
3360  {
3361  _message.setFilter(filter_);
3362  }
3363  if (bookmark_.length())
3364  {
3365  if (bookmark_ == AMPS_BOOKMARK_RECENT)
3366  {
3367  Message::Field mostRecent = _bookmarkStore.getMostRecent(subIdField);
3368  _message.setBookmark(mostRecent);
3369  }
3370  else
3371  {
3372  _message.setBookmark(bookmark_);
3373  if (_bookmarkStore.isValid())
3374  {
3375  if (bookmark_ != AMPS_BOOKMARK_NOW &&
3376  bookmark_ != AMPS_BOOKMARK_EPOCH)
3377  {
3378  _bookmarkStore.log(_message);
3379  _bookmarkStore.discard(_message);
3380  _bookmarkStore.persisted(subIdField, _message.getBookmark());
3381  }
3382  }
3383  }
3384  }
3385  if (options_.length())
3386  {
3387  _message.setOptions(options_);
3388  }
3389 
3390  Message message = _message;
3391  if (isHASubscribe_)
3392  {
3393  message = _message.deepCopy();
3394  Unlock<Mutex> u(_lock);
3395  _subscriptionManager->subscribe(messageHandler_, message,
3396  Message::AckType::None);
3397  if (_badTimeToHASubscribe)
3398  {
3399  return subId;
3400  }
3401  }
3402  if (!_routes.hasRoute(_message.getSubscriptionId()))
3403  {
3404  _routes.addRoute(_message.getSubscriptionId(), messageHandler_,
3405  Message::AckType::None, ackTypes, _message.getCommandEnum());
3406  }
3407  message.setAckTypeEnum(ackTypes);
3408  if (!options_.empty())
3409  {
3410  message.setOptions(options_);
3411  }
3412  try
3413  {
3414  syncAckProcessing(timeout_, message, isHASubscribe_);
3415  }
3416  catch (const DisconnectedException&)
3417  {
3418  if (!isHASubscribe_)
3419  {
3420  _routes.removeRoute(subIdField);
3421  throw;
3422  }
3423  else
3424  {
3425  AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(subIdField));
3426  throw;
3427  }
3428  }
3429  catch (const TimedOutException&)
3430  {
3431  AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(subIdField));
3432  throw;
3433  }
3434  catch (...)
3435  {
3436  if (isHASubscribe_)
3437  {
3438  // Have to unlock before calling into sub manager to avoid deadlock
3439  Unlock<Mutex> unlock(_lock);
3440  _subscriptionManager->unsubscribe(subIdField);
3441  }
3442  _routes.removeRoute(subIdField);
3443  throw;
3444  }
3445 
3446  return subId;
3447  }
3448  std::string deltaSubscribe(const MessageHandler& messageHandler_,
3449  const std::string& topic_,
3450  long timeout_,
3451  const std::string& filter_,
3452  const std::string& bookmark_,
3453  const std::string& options_,
3454  const std::string& subId_ = "",
3455  bool isHASubscribe_ = true)
3456  {
3457  isHASubscribe_ &= (bool)_subscriptionManager;
3458  Lock<Mutex> l(_lock);
3459  _message.reset();
3460  _message.setCommandEnum(Message::Command::DeltaSubscribe);
3461  _message.newCommandId();
3462  std::string subId(subId_);
3463  if (subId.empty())
3464  {
3465  subId = _message.getCommandId();
3466  }
3467  _message.setSubscriptionId(subId);
3468  // we need to deep copy this before sending the message; while we are
3469  // waiting for a response, the fields in _message may get blown away for
3470  // other operations.
3471  AMPS::Message::Field subIdField(subId);
3472  unsigned ackTypes = Message::AckType::Processed;
3473 
3474  if (!bookmark_.empty() && _bookmarkStore.isValid())
3475  {
3476  ackTypes |= Message::AckType::Persisted;
3477  }
3478  _message.setTopic(topic_);
3479  if (filter_.length())
3480  {
3481  _message.setFilter(filter_);
3482  }
3483  if (bookmark_.length())
3484  {
3485  if (bookmark_ == AMPS_BOOKMARK_RECENT)
3486  {
3487  Message::Field mostRecent = _bookmarkStore.getMostRecent(subIdField);
3488  _message.setBookmark(mostRecent);
3489  }
3490  else
3491  {
3492  _message.setBookmark(bookmark_);
3493  if (_bookmarkStore.isValid())
3494  {
3495  if (bookmark_ != AMPS_BOOKMARK_NOW &&
3496  bookmark_ != AMPS_BOOKMARK_EPOCH)
3497  {
3498  _bookmarkStore.log(_message);
3499  _bookmarkStore.discard(_message);
3500  _bookmarkStore.persisted(subIdField, _message.getBookmark());
3501  }
3502  }
3503  }
3504  }
3505  if (options_.length())
3506  {
3507  _message.setOptions(options_);
3508  }
3509  Message message = _message;
3510  if (isHASubscribe_)
3511  {
3512  message = _message.deepCopy();
3513  Unlock<Mutex> u(_lock);
3514  _subscriptionManager->subscribe(messageHandler_, message,
3515  Message::AckType::None);
3516  if (_badTimeToHASubscribe)
3517  {
3518  return subId;
3519  }
3520  }
3521  if (!_routes.hasRoute(_message.getSubscriptionId()))
3522  {
3523  _routes.addRoute(_message.getSubscriptionId(), messageHandler_,
3524  Message::AckType::None, ackTypes, _message.getCommandEnum());
3525  }
3526  message.setAckTypeEnum(ackTypes);
3527  if (!options_.empty())
3528  {
3529  message.setOptions(options_);
3530  }
3531  try
3532  {
3533  syncAckProcessing(timeout_, message, isHASubscribe_);
3534  }
3535  catch (const DisconnectedException&)
3536  {
3537  if (!isHASubscribe_)
3538  {
3539  _routes.removeRoute(subIdField);
3540  throw;
3541  }
3542  }
3543  catch (const TimedOutException&)
3544  {
3545  AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(subIdField));
3546  throw;
3547  }
3548  catch (...)
3549  {
3550  if (isHASubscribe_)
3551  {
3552  // Have to unlock before calling into sub manager to avoid deadlock
3553  Unlock<Mutex> unlock(_lock);
3554  _subscriptionManager->unsubscribe(subIdField);
3555  }
3556  _routes.removeRoute(subIdField);
3557  throw;
3558  }
3559  return subId;
3560  }
3561 
3562  void unsubscribe(const std::string& id)
3563  {
3564  Lock<Mutex> l(_lock);
3565  unsubscribeInternal(id);
3566  }
3567 
3568  void unsubscribe(void)
3569  {
3570  if (_subscriptionManager)
3571  {
3572  _subscriptionManager->clear();
3573  }
3574  {
3575  _routes.unsubscribeAll();
3576  Lock<Mutex> l(_lock);
3577  _message.reset();
3578  _message.setCommandEnum(Message::Command::Unsubscribe);
3579  _message.newCommandId();
3580  _message.setSubscriptionId("all");
3581  _sendWithoutRetry(_message);
3582  }
3583  deferredExecution(&amps_noOpFn, NULL);
3584  }
3585 
3586  std::string sow(const MessageHandler& messageHandler_,
3587  const std::string& topic_,
3588  const std::string& filter_ = "",
3589  const std::string& orderBy_ = "",
3590  const std::string& bookmark_ = "",
3591  int batchSize_ = AMPS_DEFAULT_BATCH_SIZE,
3592  int topN_ = AMPS_DEFAULT_TOP_N,
3593  const std::string& options_ = "",
3594  long timeout_ = AMPS_DEFAULT_COMMAND_TIMEOUT)
3595  {
3596  Lock<Mutex> l(_lock);
3597  _message.reset();
3598  _message.setCommandEnum(Message::Command::SOW);
3599  _message.newCommandId();
3600  // need to keep our own copy of the command ID.
3601  std::string commandId = _message.getCommandId();
3602  _message.setQueryID(_message.getCommandId());
3603  unsigned ackTypes = Message::AckType::Processed | Message::AckType::Completed;
3604  _message.setAckTypeEnum(ackTypes);
3605  _message.setTopic(topic_);
3606  if (filter_.length())
3607  {
3608  _message.setFilter(filter_);
3609  }
3610  if (orderBy_.length())
3611  {
3612  _message.setOrderBy(orderBy_);
3613  }
3614  if (bookmark_.length())
3615  {
3616  _message.setBookmark(bookmark_);
3617  }
3618  _message.setBatchSize(AMPS::asString(batchSize_));
3619  if (topN_ != AMPS_DEFAULT_TOP_N)
3620  {
3621  _message.setTopNRecordsReturned(AMPS::asString(topN_));
3622  }
3623  if (options_.length())
3624  {
3625  _message.setOptions(options_);
3626  }
3627 
3628  _routes.addRoute(_message.getQueryID(), messageHandler_,
3629  Message::AckType::None, ackTypes, _message.getCommandEnum());
3630 
3631  try
3632  {
3633  syncAckProcessing(timeout_, _message);
3634  }
3635  catch (...)
3636  {
3637  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId));
3638  throw;
3639  }
3640 
3641  return commandId;
3642  }
3643 
3644  std::string sow(const MessageHandler& messageHandler_,
3645  const std::string& topic_,
3646  long timeout_,
3647  const std::string& filter_ = "",
3648  int batchSize_ = AMPS_DEFAULT_BATCH_SIZE,
3649  int topN_ = AMPS_DEFAULT_TOP_N)
3650  {
3651  std::string notSet;
3652  return sow(messageHandler_,
3653  topic_,
3654  filter_,
3655  notSet, // orderBy
3656  notSet, // bookmark
3657  batchSize_,
3658  topN_,
3659  notSet,
3660  timeout_);
3661  }
3662 
3663  std::string sowAndSubscribe(const MessageHandler& messageHandler_,
3664  const std::string& topic_,
3665  const std::string& filter_ = "",
3666  const std::string& orderBy_ = "",
3667  const std::string& bookmark_ = "",
3668  int batchSize_ = AMPS_DEFAULT_BATCH_SIZE,
3669  int topN_ = AMPS_DEFAULT_TOP_N,
3670  const std::string& options_ = "",
3671  long timeout_ = AMPS_DEFAULT_COMMAND_TIMEOUT,
3672  bool isHASubscribe_ = true)
3673  {
3674  isHASubscribe_ &= (bool)_subscriptionManager;
3675  unsigned ackTypes = Message::AckType::Processed;
3676  Lock<Mutex> l(_lock);
3677  _message.reset();
3678  _message.setCommandEnum(Message::Command::SOWAndSubscribe);
3679  _message.newCommandId();
3680  Field cid = _message.getCommandId();
3681  std::string subId = cid;
3682  _message.setQueryID(cid).setSubscriptionId(cid).setTopic(topic_);
3683  if (filter_.length())
3684  {
3685  _message.setFilter(filter_);
3686  }
3687  if (orderBy_.length())
3688  {
3689  _message.setOrderBy(orderBy_);
3690  }
3691  if (bookmark_.length())
3692  {
3693  _message.setBookmark(bookmark_);
3694  Message::Field bookmark = _message.getBookmark();
3695  if (_bookmarkStore.isValid())
3696  {
3697  ackTypes |= Message::AckType::Persisted;
3698  if (bookmark == AMPS_BOOKMARK_RECENT)
3699  {
3700  _message.setBookmark(_bookmarkStore.getMostRecent(_message.getSubscriptionId()));
3701  }
3702  else if (bookmark != AMPS_BOOKMARK_NOW &&
3703  bookmark != AMPS_BOOKMARK_EPOCH)
3704  {
3705  _bookmarkStore.log(_message);
3706  if (!BookmarkRange::isRange(bookmark))
3707  {
3708  _bookmarkStore.discard(_message);
3709  _bookmarkStore.persisted(_message.getSubscriptionId(),
3710  bookmark);
3711  }
3712  }
3713  }
3714  else if (bookmark == AMPS_BOOKMARK_RECENT)
3715  {
3716  _message.setBookmark(AMPS_BOOKMARK_EPOCH);
3717  }
3718  }
3719  _message.setBatchSize(AMPS::asString(batchSize_));
3720  if (topN_ != AMPS_DEFAULT_TOP_N)
3721  {
3722  _message.setTopNRecordsReturned(AMPS::asString(topN_));
3723  }
3724  if (options_.length())
3725  {
3726  _message.setOptions(options_);
3727  }
3728 
3729  Message message = _message;
3730  if (isHASubscribe_)
3731  {
3732  message = _message.deepCopy();
3733  Unlock<Mutex> u(_lock);
3734  _subscriptionManager->subscribe(messageHandler_, message,
3735  Message::AckType::None);
3736  if (_badTimeToHASubscribe)
3737  {
3738  return subId;
3739  }
3740  }
3741  _routes.addRoute(cid, messageHandler_,
3742  Message::AckType::None, ackTypes, _message.getCommandEnum());
3743  message.setAckTypeEnum(ackTypes);
3744  if (!options_.empty())
3745  {
3746  message.setOptions(options_);
3747  }
3748  try
3749  {
3750  syncAckProcessing(timeout_, message, isHASubscribe_);
3751  }
3752  catch (const DisconnectedException&)
3753  {
3754  if (!isHASubscribe_)
3755  {
3756  _routes.removeRoute(subId);
3757  throw;
3758  }
3759  }
3760  catch (const TimedOutException&)
3761  {
3762  AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(subId));
3763  throw;
3764  }
3765  catch (...)
3766  {
3767  if (isHASubscribe_)
3768  {
3769  // Have to unlock before calling into sub manager to avoid deadlock
3770  Unlock<Mutex> unlock(_lock);
3771  _subscriptionManager->unsubscribe(cid);
3772  }
3773  _routes.removeRoute(subId);
3774  throw;
3775  }
3776  return subId;
3777  }
3778 
3779  std::string sowAndSubscribe(const MessageHandler& messageHandler_,
3780  const std::string& topic_,
3781  long timeout_,
3782  const std::string& filter_ = "",
3783  int batchSize_ = AMPS_DEFAULT_BATCH_SIZE,
3784  bool oofEnabled_ = false,
3785  int topN_ = AMPS_DEFAULT_TOP_N,
3786  bool isHASubscribe_ = true)
3787  {
3788  std::string notSet;
3789  return sowAndSubscribe(messageHandler_,
3790  topic_,
3791  filter_,
3792  notSet, // orderBy
3793  notSet, // bookmark
3794  batchSize_,
3795  topN_,
3796  (oofEnabled_ ? "oof" : ""),
3797  timeout_,
3798  isHASubscribe_);
3799  }
3800 
3801  std::string sowAndDeltaSubscribe(const MessageHandler& messageHandler_,
3802  const std::string& topic_,
3803  const std::string& filter_ = "",
3804  const std::string& orderBy_ = "",
3805  int batchSize_ = AMPS_DEFAULT_BATCH_SIZE,
3806  int topN_ = AMPS_DEFAULT_TOP_N,
3807  const std::string& options_ = "",
3808  long timeout_ = AMPS_DEFAULT_COMMAND_TIMEOUT,
3809  bool isHASubscribe_ = true)
3810  {
3811  isHASubscribe_ &= (bool)_subscriptionManager;
3812  Lock<Mutex> l(_lock);
3813  _message.reset();
3814  _message.setCommandEnum(Message::Command::SOWAndDeltaSubscribe);
3815  _message.newCommandId();
3816  _message.setQueryID(_message.getCommandId());
3817  _message.setSubscriptionId(_message.getCommandId());
3818  std::string subId = _message.getSubscriptionId();
3819  _message.setTopic(topic_);
3820  if (filter_.length())
3821  {
3822  _message.setFilter(filter_);
3823  }
3824  if (orderBy_.length())
3825  {
3826  _message.setOrderBy(orderBy_);
3827  }
3828  _message.setBatchSize(AMPS::asString(batchSize_));
3829  if (topN_ != AMPS_DEFAULT_TOP_N)
3830  {
3831  _message.setTopNRecordsReturned(AMPS::asString(topN_));
3832  }
3833  if (options_.length())
3834  {
3835  _message.setOptions(options_);
3836  }
3837  Message message = _message;
3838  if (isHASubscribe_)
3839  {
3840  message = _message.deepCopy();
3841  Unlock<Mutex> u(_lock);
3842  _subscriptionManager->subscribe(messageHandler_, message,
3843  Message::AckType::None);
3844  if (_badTimeToHASubscribe)
3845  {
3846  return subId;
3847  }
3848  }
3849  _routes.addRoute(message.getQueryID(), messageHandler_,
3850  Message::AckType::None, Message::AckType::Processed, message.getCommandEnum());
3851  message.setAckTypeEnum(Message::AckType::Processed);
3852  if (!options_.empty())
3853  {
3854  message.setOptions(options_);
3855  }
3856  try
3857  {
3858  syncAckProcessing(timeout_, message, isHASubscribe_);
3859  }
3860  catch (const DisconnectedException&)
3861  {
3862  if (!isHASubscribe_)
3863  {
3864  _routes.removeRoute(subId);
3865  throw;
3866  }
3867  }
3868  catch (const TimedOutException&)
3869  {
3870  AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(subId));
3871  throw;
3872  }
3873  catch (...)
3874  {
3875  if (isHASubscribe_)
3876  {
3877  // Have to unlock before calling into sub manager to avoid deadlock
3878  Unlock<Mutex> unlock(_lock);
3879  _subscriptionManager->unsubscribe(Field(subId));
3880  }
3881  _routes.removeRoute(subId);
3882  throw;
3883  }
3884  return subId;
3885  }
3886 
3887  std::string sowAndDeltaSubscribe(const MessageHandler& messageHandler_,
3888  const std::string& topic_,
3889  long timeout_,
3890  const std::string& filter_ = "",
3891  int batchSize_ = AMPS_DEFAULT_BATCH_SIZE,
3892  bool oofEnabled_ = false,
3893  bool sendEmpties_ = false,
3894  int topN_ = AMPS_DEFAULT_TOP_N,
3895  bool isHASubscribe_ = true)
3896  {
3897  std::string notSet;
3898  Message::Options options;
3899  if (oofEnabled_)
3900  {
3901  options.setOOF();
3902  }
3903  if (sendEmpties_ == false)
3904  {
3905  options.setNoEmpties();
3906  }
3907  return sowAndDeltaSubscribe(messageHandler_,
3908  topic_,
3909  filter_,
3910  notSet, // orderBy
3911  batchSize_,
3912  topN_,
3913  options,
3914  timeout_,
3915  isHASubscribe_);
3916  }
3917 
3918  std::string sowDelete(const MessageHandler& messageHandler_,
3919  const std::string& topic_,
3920  const std::string& filter_,
3921  long timeout_,
3922  Message::Field commandId_ = Message::Field())
3923  {
3924  if (_publishStore.isValid())
3925  {
3926  unsigned ackType = Message::AckType::Processed |
3927  Message::AckType::Stats |
3928  Message::AckType::Persisted;
3929  publishStoreMessage.reset();
3930  if (commandId_.empty())
3931  {
3932  publishStoreMessage.newCommandId();
3933  commandId_ = publishStoreMessage.getCommandId();
3934  }
3935  else
3936  {
3937  publishStoreMessage.setCommandId(commandId_.data(), commandId_.len());
3938  }
3939  publishStoreMessage.setCommandEnum(Message::Command::SOWDelete)
3940  .assignSubscriptionId(commandId_.data(), commandId_.len())
3941  .assignQueryID(commandId_.data(), commandId_.len())
3942  .setAckTypeEnum(ackType)
3943  .assignTopic(topic_.c_str(), topic_.length())
3944  .assignFilter(filter_.c_str(), filter_.length());
3945  amps_uint64_t haSequenceNumber = _publishStore.store(publishStoreMessage);
3946  char buf[AMPS_NUMBER_BUFFER_LEN];
3947  size_t pos = convertToCharArray(buf, haSequenceNumber);
3948  publishStoreMessage.assignSequence(buf + pos, AMPS_NUMBER_BUFFER_LEN - pos);
3949  {
3950  try
3951  {
3952  Lock<Mutex> l(_lock);
3953  _routes.addRoute(commandId_, messageHandler_,
3954  Message::AckType::Stats,
3955  Message::AckType::Processed | Message::AckType::Persisted,
3956  publishStoreMessage.getCommandEnum());
3957  syncAckProcessing(timeout_, publishStoreMessage,
3958  haSequenceNumber);
3959  }
3960  catch (const DisconnectedException&)
3961  {
3962  // -V565
3963  // Pass - it will get replayed upon reconnect
3964  }
3965  catch (...)
3966  {
3967  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId_));
3968  throw;
3969  }
3970  }
3971  return (std::string)commandId_;
3972  }
3973  else
3974  {
3975  Lock<Mutex> l(_lock);
3976  _message.reset();
3977  if (commandId_.empty())
3978  {
3979  _message.newCommandId();
3980  commandId_ = _message.getCommandId();
3981  }
3982  else
3983  {
3984  _message.setCommandId(commandId_.data(), commandId_.len());
3985  }
3986  _message.setCommandEnum(Message::Command::SOWDelete)
3987  .assignSubscriptionId(commandId_.data(), commandId_.len())
3988  .assignQueryID(commandId_.data(), commandId_.len())
3989  .setAckTypeEnum(Message::AckType::Processed |
3990  Message::AckType::Stats)
3991  .assignTopic(topic_.c_str(), topic_.length())
3992  .assignFilter(filter_.c_str(), filter_.length());
3993  _routes.addRoute(commandId_, messageHandler_,
3994  Message::AckType::Stats,
3995  Message::AckType::Processed,
3996  _message.getCommandEnum());
3997  try
3998  {
3999  syncAckProcessing(timeout_, _message);
4000  }
4001  catch (...)
4002  {
4003  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId_));
4004  throw;
4005  }
4006  return (std::string)commandId_;
4007  }
4008  }
4009 
4010  std::string sowDeleteByData(const MessageHandler& messageHandler_,
4011  const std::string& topic_,
4012  const std::string& data_,
4013  long timeout_,
4014  Message::Field commandId_ = Message::Field())
4015  {
4016  if (_publishStore.isValid())
4017  {
4018  unsigned ackType = Message::AckType::Processed |
4019  Message::AckType::Stats |
4020  Message::AckType::Persisted;
4021  publishStoreMessage.reset();
4022  if (commandId_.empty())
4023  {
4024  publishStoreMessage.newCommandId();
4025  commandId_ = publishStoreMessage.getCommandId();
4026  }
4027  else
4028  {
4029  publishStoreMessage.setCommandId(commandId_.data(), commandId_.len());
4030  }
4031  publishStoreMessage.setCommandEnum(Message::Command::SOWDelete)
4032  .assignSubscriptionId(commandId_.data(), commandId_.len())
4033  .assignQueryID(commandId_.data(), commandId_.len())
4034  .setAckTypeEnum(ackType)
4035  .assignTopic(topic_.c_str(), topic_.length())
4036  .assignData(data_.c_str(), data_.length());
4037  amps_uint64_t haSequenceNumber = _publishStore.store(publishStoreMessage);
4038  char buf[AMPS_NUMBER_BUFFER_LEN];
4039  size_t pos = convertToCharArray(buf, haSequenceNumber);
4040  publishStoreMessage.assignSequence(buf + pos, AMPS_NUMBER_BUFFER_LEN - pos);
4041  {
4042  try
4043  {
4044  Lock<Mutex> l(_lock);
4045  _routes.addRoute(commandId_, messageHandler_,
4046  Message::AckType::Stats,
4047  Message::AckType::Processed | Message::AckType::Persisted,
4048  publishStoreMessage.getCommandEnum());
4049  syncAckProcessing(timeout_, publishStoreMessage,
4050  haSequenceNumber);
4051  }
4052  catch (const DisconnectedException&)
4053  {
4054  // -V565
4055  // Pass - it will get replayed upon reconnect
4056  }
4057  catch (...)
4058  {
4059  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId_));
4060  throw;
4061  }
4062  }
4063  return (std::string)commandId_;
4064  }
4065  else
4066  {
4067  Lock<Mutex> l(_lock);
4068  _message.reset();
4069  if (commandId_.empty())
4070  {
4071  _message.newCommandId();
4072  commandId_ = _message.getCommandId();
4073  }
4074  else
4075  {
4076  _message.setCommandId(commandId_.data(), commandId_.len());
4077  }
4078  _message.setCommandEnum(Message::Command::SOWDelete)
4079  .assignSubscriptionId(commandId_.data(), commandId_.len())
4080  .assignQueryID(commandId_.data(), commandId_.len())
4081  .setAckTypeEnum(Message::AckType::Processed |
4082  Message::AckType::Stats)
4083  .assignTopic(topic_.c_str(), topic_.length())
4084  .assignData(data_.c_str(), data_.length());
4085  _routes.addRoute(commandId_, messageHandler_,
4086  Message::AckType::Stats,
4087  Message::AckType::Processed,
4088  _message.getCommandEnum());
4089  try
4090  {
4091  syncAckProcessing(timeout_, _message);
4092  }
4093  catch (...)
4094  {
4095  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId_));
4096  throw;
4097  }
4098  return (std::string)commandId_;
4099  }
4100  }
4101 
4102  std::string sowDeleteByKeys(const MessageHandler& messageHandler_,
4103  const std::string& topic_,
4104  const std::string& keys_,
4105  long timeout_,
4106  Message::Field commandId_ = Message::Field())
4107  {
4108  if (_publishStore.isValid())
4109  {
4110  unsigned ackType = Message::AckType::Processed |
4111  Message::AckType::Stats |
4112  Message::AckType::Persisted;
4113  publishStoreMessage.reset();
4114  if (commandId_.empty())
4115  {
4116  publishStoreMessage.newCommandId();
4117  commandId_ = publishStoreMessage.getCommandId();
4118  }
4119  else
4120  {
4121  publishStoreMessage.setCommandId(commandId_.data(), commandId_.len());
4122  }
4123  publishStoreMessage.setCommandEnum(Message::Command::SOWDelete)
4124  .assignSubscriptionId(commandId_.data(), commandId_.len())
4125  .assignQueryID(commandId_.data(), commandId_.len())
4126  .setAckTypeEnum(ackType)
4127  .assignTopic(topic_.c_str(), topic_.length())
4128  .assignSowKeys(keys_.c_str(), keys_.length());
4129  amps_uint64_t haSequenceNumber = _publishStore.store(publishStoreMessage);
4130  char buf[AMPS_NUMBER_BUFFER_LEN];
4131  size_t pos = convertToCharArray(buf, haSequenceNumber);
4132  publishStoreMessage.assignSequence(buf + pos, AMPS_NUMBER_BUFFER_LEN - pos);
4133  {
4134  try
4135  {
4136  Lock<Mutex> l(_lock);
4137  _routes.addRoute(commandId_, messageHandler_,
4138  Message::AckType::Stats,
4139  Message::AckType::Processed | Message::AckType::Persisted,
4140  publishStoreMessage.getCommandEnum());
4141  syncAckProcessing(timeout_, publishStoreMessage,
4142  haSequenceNumber);
4143  }
4144  catch (const DisconnectedException&)
4145  {
4146  // -V565
4147  // Pass - it will get replayed upon reconnect
4148  }
4149  catch (...)
4150  {
4151  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId_));
4152  throw;
4153  }
4154  }
4155  return (std::string)commandId_;
4156  }
4157  else
4158  {
4159  Lock<Mutex> l(_lock);
4160  _message.reset();
4161  if (commandId_.empty())
4162  {
4163  _message.newCommandId();
4164  commandId_ = _message.getCommandId();
4165  }
4166  else
4167  {
4168  _message.setCommandId(commandId_.data(), commandId_.len());
4169  }
4170  _message.setCommandEnum(Message::Command::SOWDelete)
4171  .assignSubscriptionId(commandId_.data(), commandId_.len())
4172  .assignQueryID(commandId_.data(), commandId_.len())
4173  .setAckTypeEnum(Message::AckType::Processed |
4174  Message::AckType::Stats)
4175  .assignTopic(topic_.c_str(), topic_.length())
4176  .assignSowKeys(keys_.c_str(), keys_.length());
4177  _routes.addRoute(commandId_, messageHandler_,
4178  Message::AckType::Stats,
4179  Message::AckType::Processed,
4180  _message.getCommandEnum());
4181  try
4182  {
4183  syncAckProcessing(timeout_, _message);
4184  }
4185  catch (...)
4186  {
4187  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId_));
4188  throw;
4189  }
4190  return (std::string)commandId_;
4191  }
4192  }
4193 
4194  void startTimer(void)
4195  {
4196  if (_serverVersion >= "5.3.2.0")
4197  {
4198  throw CommandException("The start_timer command is deprecated.");
4199  }
4200  Lock<Mutex> l(_lock);
4201  _message.reset();
4202  _message.setCommandEnum(Message::Command::StartTimer);
4203 
4204  _send(_message);
4205  }
4206 
4207  std::string stopTimer(MessageHandler messageHandler_)
4208  {
4209  if (_serverVersion >= "5.3.2.0")
4210  {
4211  throw CommandException("The stop_timer command is deprecated.");
4212  }
4213  return executeAsync(Command("stop_timer").addAckType("completed"), messageHandler_);
4214  }
4215 
4216  amps_handle getHandle(void)
4217  {
4218  return _client;
4219  }
4220 
4228  void setExceptionListener(const std::shared_ptr<const ExceptionListener>& pListener_)
4229  {
4230  _pExceptionListener = pListener_;
4231  _exceptionListener = _pExceptionListener.get();
4232  }
4233 
4234  void setExceptionListener(const ExceptionListener& listener_)
4235  {
4236  _exceptionListener = &listener_;
4237  }
4238 
4239  const ExceptionListener& getExceptionListener(void) const
4240  {
4241  return *_exceptionListener;
4242  }
4243 
4244  void setHeartbeat(unsigned heartbeatInterval_, unsigned readTimeout_)
4245  {
4246  if (readTimeout_ < heartbeatInterval_)
4247  {
4248  throw UsageException("The socket read timeout must be >= the heartbeat interval.");
4249  }
4250  Lock<Mutex> l(_lock);
4251  if (_heartbeatInterval != heartbeatInterval_ ||
4252  _readTimeout != readTimeout_)
4253  {
4254  _heartbeatInterval = heartbeatInterval_;
4255  _readTimeout = readTimeout_;
4256  _sendHeartbeat();
4257  }
4258  }
4259 
4260  void _sendHeartbeat(void)
4261  {
4262  if (_connected && _heartbeatInterval != 0)
4263  {
4264  std::ostringstream options;
4265  options << "start," << _heartbeatInterval;
4266  _beatMessage.setOptions(options.str());
4267 
4268  _heartbeatTimer.setTimeout(_heartbeatInterval * 1000.0);
4269  _heartbeatTimer.start();
4270  try
4271  {
4272  _sendWithoutRetry(_beatMessage);
4273  broadcastConnectionStateChanged(ConnectionStateListener::HeartbeatInitiated);
4274  }
4275  catch (ConnectionException& ex_)
4276  {
4277  // If we are disconnected when we attempt to send, that's OK;
4278  // we'll send this message after we re-connect (if we do).
4279  AMPS_UNHANDLED_EXCEPTION(ex_);
4280  }
4281  _beatMessage.setOptions("beat");
4282  }
4283  amps_result result = AMPS_E_OK;
4284  if (_readTimeout && _connected)
4285  {
4286  result = amps_client_set_read_timeout(_client, (int)_readTimeout);
4287  if (result != AMPS_E_OK && result != AMPS_E_DISCONNECTED)
4288  {
4289  AMPSException::throwFor(_client, result);
4290  }
4291  if (!_queueAckTimeout)
4292  {
4293  result = amps_client_set_idle_time(_client,
4294  (int)(_heartbeatInterval * 1000));
4295  if (result != AMPS_E_OK && result != AMPS_E_DISCONNECTED)
4296  {
4297  AMPSException::throwFor(_client, result);
4298  }
4299  }
4300  }
4301  }
4302 
4303  void addConnectionStateListener(ConnectionStateListener* listener_)
4304  {
4305  Lock<Mutex> lock(_lock);
4306  _connectionStateListeners.insert(listener_);
4307  }
4308 
4309  void removeConnectionStateListener(ConnectionStateListener* listener_)
4310  {
4311  Lock<Mutex> lock(_lock);
4312  _connectionStateListeners.erase(listener_);
4313  }
4314 
4315  void clearConnectionStateListeners()
4316  {
4317  Lock<Mutex> lock(_lock);
4318  _connectionStateListeners.clear();
4319  }
4320 
// Registers handler_ in the route table (_routes) under the ids carried by
// command_'s message: its subscription id, its query id and/or cid_.
//
// command_         Command whose message supplies the sub id, query id and
//                  command type used for branching.
// cid_             In/out command id. May be replaced by the sub id, by a
//                  newly generated query id, or by a newly generated command id.
// handler_         Handler to register.
// requestedAcks_ / systemAddedAcks_ / commandType_
//                  Passed through unchanged to every _routes.addRoute call.
//
// Throws UsageException when none of the three ids is present and the command
// is not a publish that requests acks other than Persisted.
// The Unlock<Mutex> uses below release _lock temporarily, so the caller is
// expected to hold _lock when calling this function.
4321  void _registerHandler(Command& command_, Message::Field& cid_,
4322  MessageHandler& handler_, unsigned requestedAcks_,
4323  unsigned systemAddedAcks_, Message::Command::Type commandType_)
4324  {
4325  Message message = command_.getMessage();
// Note: branching below uses the command type read from the message, while
// routes are registered with the commandType_ parameter.
4326  Message::Command::Type commandType = message.getCommandEnum();
4327  Message::Field subid = message.getSubscriptionId();
4328  Message::Field qid = message.getQueryID();
4329  // If we have an id, we're good, even if it's an existing route
4330  bool added = qid.len() || subid.len() || cid_.len();
4331  bool cidIsQid = cid_ == qid;
// The command id is "unique" when it is non-empty and differs from both the
// query id and the subscription id.
4332  bool cidUnique = !cidIsQid && cid_.len() > 0 && cid_ != subid;
// Number of routes registered so far for this handler; decides below whether
// the handler's user data must be copied for an additional route.
4333  int addedCount = 0;
4334  if (subid.len() > 0)
4335  {
4336  // This can replace a non-subscribe with a matching id
4337  // with a subscription but not another subscription.
4338  addedCount += _routes.addRoute(subid, handler_, requestedAcks_,
4339  systemAddedAcks_, commandType_);
4340  if (!cidUnique
4341  && (commandType == Message::Command::Subscribe
4342  || commandType == Message::Command::DeltaSubscribe))
4343  {
4344  // We don't need to do anything else
4345  cid_ = subid;
4346  return;
4347  }
4348  }
// Query-id route: only for SOW-style commands whose query id differs from the
// subscription id.
4349  if (qid.len() > 0 && qid != subid
4350  && (commandType == Message::Command::SOW
4351  || commandType == Message::Command::SOWDelete
4352  || commandType == Message::Command::SOWAndSubscribe
4353  || commandType == Message::Command::SOWAndDeltaSubscribe))
4354  {
// Generate fresh query ids until one is found that has no existing route;
// cid_ follows along when it was the same value as the query id.
4355  while (_routes.hasRoute(qid))
4356  {
4357  message.newQueryId();
4358  if (cidIsQid)
4359  {
4360  cid_ = message.getQueryId();
4361  }
4362  qid = message.getQueryId();
4363  }
4364  if (addedCount == 0)
4365  {
4366  _routes.addRoute(qid, handler_, requestedAcks_,
4367  systemAddedAcks_, commandType_);
4368  }
4369  else
4370  {
// A route was already added for this handler: ask the copy-route function for
// a copy of the handler's user data, with _lock released during the call.
4371  void* data = NULL;
4372  {
4373  Unlock<Mutex> u(_lock);
4374  data = amps_invoke_copy_route_function(handler_.userData());
4375  }
// A NULL result means no copy was produced; register the original handler.
4376  if (!data)
4377  {
4378  _routes.addRoute(qid, handler_, requestedAcks_,
4379  systemAddedAcks_, commandType_);
4380  }
4381  else
4382  {
4383  _routes.addRoute(qid,
4384  MessageHandler(handler_.function(),
4385  data),
4386  requestedAcks_,
4387  systemAddedAcks_, commandType_);
4388  }
4389  }
4390  ++addedCount;
4391  }
// Command-id route: only when the command id is unique and acks other than
// Persisted were requested ('&' binds tighter than '&&' here).
4392  if (cidUnique && requestedAcks_ & ~Message::AckType::Persisted)
4393  {
// Generate fresh command ids until one is found that has no existing route.
4394  while (_routes.hasRoute(cid_))
4395  {
4396  cid_ = message.newCommandId().getCommandId();
4397  }
4398  if (addedCount == 0)
4399  {
4400  _routes.addRoute(cid_, handler_, requestedAcks_,
4401  systemAddedAcks_, commandType_);
4402  }
4403  else
4404  {
// Same user-data copy step as for the query-id route above.
4405  void* data = NULL;
4406  {
4407  Unlock<Mutex> u(_lock);
4408  data = amps_invoke_copy_route_function(handler_.userData());
4409  }
4410  if (!data)
4411  {
4412  _routes.addRoute(cid_, handler_, requestedAcks_,
4413  systemAddedAcks_, commandType_);
4414  }
4415  else
4416  {
4417  _routes.addRoute(cid_,
4418  MessageHandler(handler_.function(),
4419  data),
4420  requestedAcks_,
4421  systemAddedAcks_, commandType_);
4422  }
4423  }
4424  }
// Publishes that request acks other than Persisted get a newly generated
// command id, set on the command's own message, so the acks can be routed.
4425  else if ((commandType == Message::Command::Publish ||
4426  commandType == Message::Command::DeltaPublish)
4427  && requestedAcks_ & ~Message::AckType::Persisted)
4428  {
4429  cid_ = command_.getMessage().newCommandId().getCommandId();
4430  _routes.addRoute(cid_, handler_, requestedAcks_,
4431  systemAddedAcks_, commandType_);
4432  added = true;
4433  }
// No id was available to route on and no publish route was created.
4434  if (!added)
4435  {
4436  throw UsageException("To use a messagehandler, you must also supply a command or subscription ID.");
4437  }
4438  }
4439 
// Executes command_ and, when handler_ is valid, routes its responses to
// handler_. The caller must already hold _lock (executeAsync acquires it
// before calling this function); _lock is released temporarily around calls
// into the subscription manager and the publish store.
//
// command_        Command to execute. Its message is modified while sending;
//                 on the return and rethrow paths that reset it, the ack type
//                 is put back to the originally requested value.
// handler_        Handler for responses; may be invalid for fire-and-forget use.
// isHASubscribe_  Whether a subscribe should be recorded with the subscription
//                 manager; forced to false when no subscription manager is set.
//
// Returns the subscription id for a subscribe that has one, otherwise the
// command id (possibly newly generated).
4440  std::string executeAsyncNoLock(Command& command_, MessageHandler& handler_,
4441  bool isHASubscribe_ = true)
4442  {
4443  isHASubscribe_ &= (bool)_subscriptionManager;
4444  Message& message = command_.getMessage();
// A Processed ack is added by the client when there is a handler or the
// command itself asks for one.
4445  unsigned systemAddedAcks = (handler_.isValid() || command_.hasProcessedAck()) ?
4446  Message::AckType::Processed : Message::AckType::None;
4447  unsigned requestedAcks = message.getAckTypeEnum();
4448  bool isPublishStore = _publishStore.isValid() && command_.needsSequenceNumber();
4449  Message::Command::Type commandType = message.getCommandEnum();
4450  if (commandType == Message::Command::StopTimer)
4451  {
4452  systemAddedAcks |= Message::AckType::Completed;
4453  }
4454  else if (commandType == Message::Command::Unsubscribe)
4455  {
4456  // Clear routes and sub manager
4457  const std::string subId = message.getSubscriptionId();
4458  if (subId == "all")
4459  {
4460  _routes.unsubscribeAll();
4461  if (_subscriptionManager)
4462  {
4463  Unlock<Mutex> unlock(_lock);
4464  _subscriptionManager->clear();
4465  }
4466  }
4467  else
4468  {
4469  _routes.removeRoute(subId);
4470  // Lock is already acquired
4471  if (_subscriptionManager)
4472  {
4473  // Have to unlock before calling into sub manager to avoid deadlock
4474  Unlock<Mutex> unlock(_lock);
4475  _subscriptionManager->unsubscribe(subId);
4476  }
4477  }
4478  // Make sure the clear gets processed by receive thread
4479  deferredExecution(&amps_noOpFn, NULL);
4480  }
// A handler needs a command id to route on; generate one if none was given.
4481  Message::Field cid = message.getCommandId();
4482  if (handler_.isValid() && cid.empty())
4483  {
4484  cid = message.newCommandId().getCommandId();
4485  }
// Bookmark subscriptions: coordinate the requested bookmark with the
// bookmark store, when one is set.
4486  if (message.getBookmark().len() > 0)
4487  {
4488  if (command_.isSubscribe())
4489  {
4490  Message::Field bookmark = message.getBookmark();
4491  if (_bookmarkStore.isValid())
4492  {
4493  systemAddedAcks |= Message::AckType::Persisted;
4494  if (bookmark == AMPS_BOOKMARK_RECENT)
4495  {
// "Recent" is replaced by the store's most recent bookmark for this sub id.
4496  message.setBookmark(_bookmarkStore.getMostRecent(message.getSubscriptionId()));
4497  }
4498  else if (bookmark != AMPS_BOOKMARK_NOW &&
4499  bookmark != AMPS_BOOKMARK_EPOCH)
4500  {
// An explicit bookmark is logged to the store; when it is not a range it is
// also discarded and marked persisted right away.
4501  _bookmarkStore.log(message);
4502  if (!BookmarkRange::isRange(bookmark))
4503  {
4504  _bookmarkStore.discard(message);
4505  _bookmarkStore.persisted(message.getSubscriptionId(),
4506  bookmark);
4507  }
4508  }
4509  }
4510  else if (bookmark == AMPS_BOOKMARK_RECENT)
4511  {
// NOTE(review): this branch is empty in this listing and the embedded source
// numbering skips 4512, so a statement appears to be missing here - confirm
// against the original header.
4513  }
4514  }
4515  }
4516  if (isPublishStore)
4517  {
4518  systemAddedAcks |= Message::AckType::Persisted;
4519  }
4520  bool isSubscribe = command_.isSubscribe();
// Non-subscribe commands register their handler now; subscribes register
// later, after the subscription manager has been told about them.
4521  if (handler_.isValid() && !isSubscribe)
4522  {
4523  _registerHandler(command_, cid, handler_,
4524  requestedAcks, systemAddedAcks, commandType);
4525  }
4526  if (isPublishStore)
4527  {
4528  bool useSyncSend = cid.len() > 0 && command_.hasProcessedAck();
4529  amps_uint64_t haSequenceNumber = (amps_uint64_t)0;
4530  message.setAckTypeEnum(requestedAcks | systemAddedAcks);
4531  {
// Store the message with _lock released, then stamp it with the sequence
// number the store returned.
4532  Unlock<Mutex> u(_lock);
4533  haSequenceNumber = _publishStore.store(message);
4534  }
4535  message.setSequence(haSequenceNumber);
4536  try
4537  {
4538  if (useSyncSend)
4539  {
4540  syncAckProcessing((long)command_.getTimeout(), message,
4541  haSequenceNumber);
4542  }
4543  else
4544  {
4545  _send(message, haSequenceNumber);
4546  }
4547  }
4548  catch (const DisconnectedException&)
4549  {
// Rethrown as-is: unlike the catch-all below, the route is left in place.
4550  throw;
4551  }
4552  catch (...)
4553  {
4554  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(cid));
4555  throw;
4556  }
4557  }
4558  else
4559  {
4560  if (isSubscribe)
4561  {
4562  const Message::Field& subId = message.getSubscriptionId();
4563  if (isHASubscribe_)
4564  {
// Record the subscription with the subscription manager (lock released) using
// a deep copy of the message.
4565  Unlock<Mutex> u(_lock);
4566  _subscriptionManager->subscribe(handler_,
4567  message.deepCopy(),
4568  requestedAcks);
// When _badTimeToHASubscribe is set, return without sending; the
// subscription has only been recorded with the manager.
4569  if (_badTimeToHASubscribe)
4570  {
4571  message.setAckTypeEnum(requestedAcks);
4572  return std::string(subId.data(), subId.len());
4573  }
4574  }
4575  if (handler_.isValid())
4576  {
4577  _registerHandler(command_, cid, handler_,
4578  requestedAcks, systemAddedAcks, commandType);
4579  }
4580  message.setAckTypeEnum(requestedAcks | systemAddedAcks);
4581  try
4582  {
4583  syncAckProcessing((long)command_.getTimeout(), message,
4584  isHASubscribe_);
4585  }
4586  catch (const DisconnectedException&)
4587  {
// For an HA subscribe the disconnect is swallowed and execution continues
// below; otherwise the routes are removed and the exception propagates.
4588  if (!isHASubscribe_)
4589  {
4590  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(cid));
4591  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(subId));
4592  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(message.getQueryId()));
4593  message.setAckTypeEnum(requestedAcks);
4594  throw;
4595  }
4596  }
4597  catch (const TimedOutException&)
4598  {
// On timeout, unsubscribe by each id the subscription may be known under.
4599  AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(cid));
4600  AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(subId));
4601  AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(message.getQueryId()));
4602  message.setAckTypeEnum(requestedAcks);
4603  throw;
4604  }
4605  catch (...)
4606  {
4607  if (isHASubscribe_)
4608  {
4609  // Have to unlock before calling into sub manager to avoid deadlock
4610  Unlock<Mutex> unlock(_lock);
4611  _subscriptionManager->unsubscribe(subId);
4612  }
4613  if (message.getQueryID().len() > 0)
4614  {
4615  _routes.removeRoute(message.getQueryID());
4616  }
4617  _routes.removeRoute(cid);
4618  _routes.removeRoute(subId);
4619  message.setAckTypeEnum(requestedAcks);
4620  throw;
4621  }
// A subscribe that has a subscription id returns it instead of the command id.
4622  if (subId.len() > 0)
4623  {
4624  message.setAckTypeEnum(requestedAcks);
4625  return std::string(subId.data(), subId.len());
4626  }
4627  }
4628  else
4629  {
4630  // SOW, Flush, etc. should always be sync. Publish/delete may not be.
4631  bool useSyncSend = commandType & ~Message::Command::NoDataCommands
4632  || (cid.len() > 0 && command_.hasProcessedAck());
4633  message.setAckTypeEnum(requestedAcks | systemAddedAcks);
4634  try
4635  {
4636  if (useSyncSend)
4637  {
4638  syncAckProcessing((long)(command_.getTimeout()), message);
4639  }
4640  else
4641  {
4642  _send(message);
4643  }
4644  }
4645  catch (const TimedOutException&)
4646  {
4647  AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(cid));
4648  AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(message.getQueryId()));
4649  message.setAckTypeEnum(requestedAcks);
4650  throw;
4651  }
4652  catch (const DisconnectedException&)
4653  {
4654  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(cid));
4655  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(message.getQueryId()));
4656  message.setAckTypeEnum(requestedAcks);
4657  throw;
4658  }
4659  catch (...)
4660  {
4661  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(cid));
4662  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(message.getQueryId()));
4663  message.setAckTypeEnum(requestedAcks);
4664  throw;
4665  }
4666  }
4667  }
// Restore the caller's requested acks on the message before returning.
4668  message.setAckTypeEnum(requestedAcks);
4669  return cid;
4670  }
4671 
// Declaration only; the definition is not part of this listing.
// NOTE(review): presumably returns a MessageStream that yields no messages -
// confirm against the definition.
4672  MessageStream getEmptyMessageStream(void);
4673 
4674  std::string executeAsync(Command& command_, MessageHandler& handler_,
4675  bool isHASubscribe_ = true)
4676  {
4677  Lock<Mutex> lock(_lock);
4678  return executeAsyncNoLock(command_, handler_, isHASubscribe_);
4679  }
4680 
4681  // Queue Methods //
4682  void setAutoAck(bool isAutoAckEnabled_)
4683  {
4684  _isAutoAckEnabled = isAutoAckEnabled_;
4685  }
4686  bool getAutoAck(void) const
4687  {
4688  return _isAutoAckEnabled;
4689  }
4690  void setAckBatchSize(const unsigned batchSize_)
4691  {
4692  _ackBatchSize = batchSize_;
4693  if (!_queueAckTimeout)
4694  {
4695  _queueAckTimeout = AMPS_DEFAULT_QUEUE_ACK_TIMEOUT;
4696  amps_client_set_idle_time(_client, _queueAckTimeout);
4697  }
4698  }
4699  unsigned getAckBatchSize(void) const
4700  {
4701  return _ackBatchSize;
4702  }
4703  int getAckTimeout(void) const
4704  {
4705  return _queueAckTimeout;
4706  }
4707  void setAckTimeout(const int ackTimeout_)
4708  {
4709  amps_client_set_idle_time(_client, ackTimeout_);
4710  _queueAckTimeout = ackTimeout_;
4711  }
4712  size_t _ack(QueueBookmarks& queueBookmarks_)
4713  {
4714  if (queueBookmarks_._bookmarkCount)
4715  {
4716  publishStoreMessage.reset();
4717  publishStoreMessage.setCommandEnum(Message::Command::SOWDelete)
4718  .setTopic(queueBookmarks_._topic)
4719  .setBookmark(queueBookmarks_._data)
4720  .setCommandId("AMPS-queue-ack");
4721  amps_uint64_t haSequenceNumber = 0;
4722  if (_publishStore.isValid())
4723  {
4724  haSequenceNumber = _publishStore.store(publishStoreMessage);
4725  publishStoreMessage.setAckType("persisted")
4726  .setSequence(haSequenceNumber);
4727  queueBookmarks_._data.erase();
4728  queueBookmarks_._bookmarkCount = 0;
4729  }
4730  _send(publishStoreMessage, haSequenceNumber);
4731  if (!_publishStore.isValid())
4732  {
4733  queueBookmarks_._data.erase();
4734  queueBookmarks_._bookmarkCount = 0;
4735  }
4736  return 1;
4737  }
4738  return 0;
4739  }
4740  void ack(const Field& topic_, const Field& bookmark_, const char* options_ = NULL)
4741  {
4742  if (_isAutoAckEnabled)
4743  {
4744  return;
4745  }
4746  _ack(topic_, bookmark_, options_);
4747  }
4748  void _ack(const Field& topic_, const Field& bookmark_, const char* options_ = NULL)
4749  {
4750  if (bookmark_.len() == 0)
4751  {
4752  return;
4753  }
4754  Lock<Mutex> lock(_lock);
4755  if (_ackBatchSize < 2 || options_ != NULL)
4756  {
4757  publishStoreMessage.reset();
4758  publishStoreMessage.setCommandEnum(Message::Command::SOWDelete)
4759  .setCommandId("AMPS-queue-ack")
4760  .setTopic(topic_).setBookmark(bookmark_);
4761  if (options_)
4762  {
4763  publishStoreMessage.setOptions(options_);
4764  }
4765  amps_uint64_t haSequenceNumber = 0;
4766  if (_publishStore.isValid())
4767  {
4768  haSequenceNumber = _publishStore.store(publishStoreMessage);
4769  publishStoreMessage.setAckType("persisted")
4770  .setSequence(haSequenceNumber);
4771  }
4772  _send(publishStoreMessage, haSequenceNumber);
4773  return;
4774  }
4775  // have we acked anything for this hash
4776  topic_hash hash = CRC<0>::crcNoSSE(topic_.data(), topic_.len());
4777  TopicHashMap::iterator it = _topicHashMap.find(hash);
4778  if (it == _topicHashMap.end())
4779  {
4780  // add a new one to the map
4781 #ifdef AMPS_USE_EMPLACE
4782  it = _topicHashMap.emplace(TopicHashMap::value_type(hash, QueueBookmarks(topic_))).first;
4783 #else
4784  it = _topicHashMap.insert(TopicHashMap::value_type(hash, QueueBookmarks(topic_))).first;
4785 #endif
4786  }
4787  QueueBookmarks& queueBookmarks = it->second;
4788  if (queueBookmarks._data.length())
4789  {
4790  queueBookmarks._data.append(",");
4791  }
4792  else
4793  {
4794  queueBookmarks._oldestTime = amps_now();
4795  }
4796  queueBookmarks._data.append(bookmark_);
4797  if (++queueBookmarks._bookmarkCount >= _ackBatchSize)
4798  {
4799  _ack(queueBookmarks);
4800  }
4801  }
4802  void flushAcks(void)
4803  {
4804  size_t sendCount = 0;
4805  if (!_connected)
4806  {
4807  return;
4808  }
4809  else
4810  {
4811  Lock<Mutex> lock(_lock);
4812  typedef TopicHashMap::iterator iterator;
4813  for (iterator it = _topicHashMap.begin(), end = _topicHashMap.end(); it != end; ++it)
4814  {
4815  QueueBookmarks& queueBookmarks = it->second;
4816  sendCount += _ack(queueBookmarks);
4817  }
4818  }
4819  if (sendCount && _connected)
4820  {
4821  publishFlush(0, Message::AckType::Processed);
4822  }
4823  }
4824  // called when there's idle time, to see if we need to flush out any "acks"
4825  void checkQueueAcks(void)
4826  {
4827  if (!_topicHashMap.size())
4828  {
4829  return;
4830  }
4831  Lock<Mutex> lock(_lock);
4832  try
4833  {
4834  amps_uint64_t threshold = amps_now()
4835  - (amps_uint64_t)_queueAckTimeout;
4836  typedef TopicHashMap::iterator iterator;
4837  for (iterator it = _topicHashMap.begin(), end = _topicHashMap.end(); it != end; ++it)
4838  {
4839  QueueBookmarks& queueBookmarks = it->second;
4840  if (queueBookmarks._bookmarkCount && queueBookmarks._oldestTime < threshold)
4841  {
4842  _ack(queueBookmarks);
4843  }
4844  }
4845  }
4846  catch (std::exception& ex)
4847  {
4848  AMPS_UNHANDLED_EXCEPTION(ex);
4849  }
4850  }
4851 
4852  void deferredExecution(DeferredExecutionFunc func_, void* userData_)
4853  {
4854  Lock<Mutex> lock(_deferredExecutionLock);
4855 #ifdef AMPS_USE_EMPLACE
4856  _deferredExecutionList.emplace_back(
4857  DeferredExecutionRequest(func_, userData_));
4858 #else
4859  _deferredExecutionList.push_back(
4860  DeferredExecutionRequest(func_, userData_));
4861 #endif
4862  }
4863 
4864  inline void processDeferredExecutions(void)
4865  {
4866  if (_deferredExecutionList.size())
4867  {
4868  Lock<Mutex> lock(_deferredExecutionLock);
4869  DeferredExecutionList::iterator it = _deferredExecutionList.begin();
4870  DeferredExecutionList::iterator end = _deferredExecutionList.end();
4871  for (; it != end; ++it)
4872  {
4873  try
4874  {
4875  it->_func(it->_userData);
4876  }
4877  catch (...)
4878  {
4879  // -V565
4880  // Intentionally ignore errors
4881  }
4882  }
4883  _deferredExecutionList.clear();
4884  _routes.invalidateCache();
4885  _routeCache.invalidateCache();
4886  }
4887  }
4888 
4889  bool getRetryOnDisconnect(void) const
4890  {
4891  return _isRetryOnDisconnect;
4892  }
4893 
4894  void setRetryOnDisconnect(bool isRetryOnDisconnect_)
4895  {
4896  _isRetryOnDisconnect = isRetryOnDisconnect_;
4897  }
4898 
4899  void setDefaultMaxDepth(unsigned maxDepth_)
4900  {
4901  _defaultMaxDepth = maxDepth_;
4902  }
4903 
4904  unsigned getDefaultMaxDepth(void) const
4905  {
4906  return _defaultMaxDepth;
4907  }
4908 
4909  void setTransportFilterFunction(amps_transport_filter_function filter_,
4910  void* userData_)
4911  {
4912  amps_client_set_transport_filter_function(_client, filter_, userData_);
4913  }
4914 
4915  void setThreadCreatedCallback(amps_thread_created_callback callback_,
4916  void* userData_)
4917  {
4918  amps_client_set_thread_created_callback(_client, callback_, userData_);
4919  }
4920  }; // class ClientImpl
4995 
4997  {
4998  RefHandle<MessageStreamImpl> _body;
4999  public:
5004  class iterator
5005  {
5006  MessageStream* _pStream;
5007  Message _current;
5008  inline void advance(void);
5009 
5010  public:
5011  iterator() // end
5012  : _pStream(NULL)
5013  {;}
5014  iterator(MessageStream* pStream_)
5015  : _pStream(pStream_)
5016  {
5017  advance();
5018  }
5019 
5020  bool operator==(const iterator& rhs) const
5021  {
5022  return _pStream == rhs._pStream;
5023  }
5024  bool operator!=(const iterator& rhs) const
5025  {
5026  return _pStream != rhs._pStream;
5027  }
5028  void operator++(void)
5029  {
5030  advance();
5031  }
5032  Message operator*(void)
5033  {
5034  return _current;
5035  }
5036  Message* operator->(void)
5037  {
5038  return &_current;
5039  }
5040  };
5042  bool isValid() const
5043  {
5044  return _body.isValid();
5045  }
5046 
5050  {
5051  if (!_body.isValid())
5052  {
5053  throw UsageException("This MessageStream is not valid and cannot be iterated.");
5054  }
5055  return iterator(this);
5056  }
5059  // For non-SOW queries, the end is never reached.
5061  {
5062  return iterator();
5063  }
5064  inline MessageStream(void);
5065 
5071  MessageStream timeout(unsigned timeout_);
5072 
5076  MessageStream conflate(void);
5082  MessageStream maxDepth(unsigned maxDepth_);
5085  unsigned getMaxDepth(void) const;
5088  unsigned getDepth(void) const;
5089 
5090  private:
5091  inline MessageStream(const Client& client_);
5092  inline MessageStream(RefHandle<MessageStreamImpl> body_);
5093  inline void setSOWOnly(const std::string& commandId_,
5094  const std::string& queryId_ = "");
5095  inline void setSubscription(const std::string& subId_,
5096  const std::string& commandId_ = "",
5097  const std::string& queryId_ = "");
5098  inline void setStatsOnly(const std::string& commandId_,
5099  const std::string& queryId_ = "");
5100  inline void setAcksOnly(const std::string& commandId_, unsigned acks_);
5101 
5102  inline operator MessageHandler(void);
5103 
5104  inline static MessageStream fromExistingHandler(const MessageHandler& handler);
5105 
5106  friend class Client;
5107  friend class ClientImpl;
5108 
5109  };
5110 
5130  class Client // -V553
5131  {
5132  protected:
5133  BorrowRefHandle<ClientImpl> _body;
5134  public:
5135  static const int DEFAULT_COMMAND_TIMEOUT = AMPS_DEFAULT_COMMAND_TIMEOUT;
5136  static const int DEFAULT_BATCH_SIZE = AMPS_DEFAULT_BATCH_SIZE;
5137  static const int DEFAULT_TOP_N = AMPS_DEFAULT_TOP_N;
5138 
5147  Client(const std::string& clientName = "")
5148  : _body(new ClientImpl(clientName), true)
5149  {;}
5150 
5151  Client(ClientImpl* existingClient)
5152  : _body(existingClient, true)
5153  {;}
5154 
5155  Client(ClientImpl* existingClient, bool isRef)
5156  : _body(existingClient, isRef)
5157  {;}
5158 
5159  Client(const Client& rhs) : _body(rhs._body) {;}
5160  virtual ~Client(void) {;}
5161 
5162  Client& operator=(const Client& rhs)
5163  {
5164  _body = rhs._body;
5165  return *this;
5166  }
5167 
5168  bool isValid()
5169  {
5170  return _body.isValid();
5171  }
5172 
5185  void setName(const std::string& name)
5186  {
5187  _body.get().setName(name);
5188  }
5189 
5192  const std::string& getName() const
5193  {
5194  return _body.get().getName();
5195  }
5196 
5200  const std::string& getNameHash() const
5201  {
5202  return _body.get().getNameHash();
5203  }
5204 
5208  const amps_uint64_t getNameHashValue() const
5209  {
5210  return _body.get().getNameHashValue();
5211  }
5212 
5219  void setLogonCorrelationData(const std::string& logonCorrelationData_)
5220  {
5221  _body.get().setLogonCorrelationData(logonCorrelationData_);
5222  }
5223 
5226  const std::string& getLogonCorrelationData() const
5227  {
5228  return _body.get().getLogonCorrelationData();
5229  }
5230 
5234  void addHttpPreflightHeader(const std::string& header_)
5235  {
5236  _body.get().addHttpPreflightHeader(header_);
5237  }
5238 
5243  void addHttpPreflightHeader(const std::string& key_, const std::string& value_)
5244  {
5245  _body.get().addHttpPreflightHeader(key_, value_);
5246  }
5247 
5250  {
5251  _body.get().clearHttpPreflightHeaders();
5252  }
5253 
5257  template<class T>
5258  void setHttpPreflightHeaders(const T& headers_)
5259  {
5260  _body.get().setHttpPreflightHeaders(headers_);
5261  }
5262 
5271  size_t getServerVersion() const
5272  {
5273  return _body.get().getServerVersion();
5274  }
5275 
5282  VersionInfo getServerVersionInfo() const
5283  {
5284  return _body.get().getServerVersionInfo();
5285  }
5286 
5296  static size_t convertVersionToNumber(const std::string& version_)
5297  {
5298  return AMPS::convertVersionToNumber(version_.c_str(), version_.length());
5299  }
5300 
5311  static size_t convertVersionToNumber(const char* data_, size_t len_)
5312  {
5313  return AMPS::convertVersionToNumber(data_, len_);
5314  }
5315 
5318  const std::string& getURI() const
5319  {
5320  return _body.get().getURI();
5321  }
5322 
5329 
5331 
// Connects using the given URI by forwarding it to the implementation's connect().
5342  void connect(const std::string& uri)
5343  {
5344  _body.get().connect(uri);
5345  }
5346 
// Disconnects by calling the implementation's disconnect().
5349  void disconnect()
5350  {
5351  _body.get().disconnect();
5352  }
5353 
// Sends `message` through the implementation. No handler is registered by this overload.
5367  void send(const Message& message)
5368  {
5369  _body.get().send(message);
5370  }
5371 
// Registers `messageHandler_` for `commandId_` with the requested ack types.
// The boolean is translated to a command type before delegating:
// true -> Message::Command::Subscribe, false -> Message::Command::SOW.
5380  void addMessageHandler(const Field& commandId_,
5381  const AMPS::MessageHandler& messageHandler_,
5382  unsigned requestedAcks_, bool isSubscribe_)
5383  {
5384  Message::Command::Type commandType = isSubscribe_ ? Message::Command::Subscribe : Message::Command::SOW;
5385  _body.get().addMessageHandler(commandId_, messageHandler_,
5386  requestedAcks_, commandType);
5387  }
5388 
// Registers `messageHandler_` for `commandId_` with the requested ack types and an
// explicit command type; all four arguments are forwarded to the implementation.
5397  void addMessageHandler(const Field& commandId_,
5398  const AMPS::MessageHandler& messageHandler_,
5399  unsigned requestedAcks_, Message::Command::Type commandType_)
5400  {
5401  _body.get().addMessageHandler(commandId_, messageHandler_,
5402  requestedAcks_, commandType_);
5403  }
5404 
// Removes the handler registered under `commandId_` and returns the
// implementation's boolean result (presumably whether a handler was removed — confirm).
5408  bool removeMessageHandler(const Field& commandId_)
5409  {
5410  return _body.get().removeMessageHandler(commandId_);
5411  }
5412 
// Sends `message` with `messageHandler` attached and returns the string produced by
// the implementation (presumably the command identifier — confirm).
// timeout defaults to 0; its units and the meaning of 0 are defined by the implementation.
5436  std::string send(const MessageHandler& messageHandler, Message& message, int timeout = 0)
5437  {
5438  return _body.get().send(messageHandler, message, timeout);
5439  }
5440 
// Installs the disconnect handler by forwarding it to the implementation.
5450  void setDisconnectHandler(const DisconnectHandler& disconnectHandler)
5451  {
5452  _body.get().setDisconnectHandler(disconnectHandler);
5453  }
5454 
// Returns (by value) the disconnect handler currently held by the implementation.
5458  DisconnectHandler getDisconnectHandler(void) const
5459  {
5460  return _body.get().getDisconnectHandler();
5461  }
5462 
// Returns the ConnectionInfo produced by the implementation. Declared virtual, so a
// derived client type may supply its own version.
5467  virtual ConnectionInfo getConnectionInfo() const
5468  {
5469  return _body.get().getConnectionInfo();
5470  }
5471 
// Installs the bookmark store by forwarding it to the implementation.
5480  void setBookmarkStore(const BookmarkStore& bookmarkStore_)
5481  {
5482  _body.get().setBookmarkStore(bookmarkStore_);
5483  }
5484 
// NOTE(review): the signature line for this method is not present in this listing.
// The body returns the implementation's bookmark store.
5489  {
5490  return _body.get().getBookmarkStore();
5491  }
5492 
// NOTE(review): the signature line for this method is not present in this listing.
// The body returns the implementation's subscription manager.
5497  {
5498  return _body.get().getSubscriptionManager();
5499  }
5500 
// Installs the subscription manager by forwarding the raw pointer to the
// implementation. Ownership of the pointee is decided by the implementation and is
// not visible here — TODO confirm before deleting the object at the call site.
5508  void setSubscriptionManager(SubscriptionManager* subscriptionManager_)
5509  {
5510  _body.get().setSubscriptionManager(subscriptionManager_);
5511  }
5512 
// Installs the publish store by forwarding it to the implementation.
5532  void setPublishStore(const Store& publishStore_)
5533  {
5534  _body.get().setPublishStore(publishStore_);
5535  }
5536 
// NOTE(review): the signature line for this method is not present in this listing.
// The body returns the implementation's publish store.
5541  {
5542  return _body.get().getPublishStore();
5543  }
5544 
// Installs the handler for duplicate messages. It is registered through the global
// command-type handler table under the DuplicateMessage slot rather than a
// dedicated setter.
5548  void setDuplicateMessageHandler(const MessageHandler& duplicateMessageHandler_)
5549  {
5550  _body.get().setGlobalCommandTypeMessageHandler(ClientImpl::GlobalCommandTypeHandlers::DuplicateMessage,
5551  duplicateMessageHandler_);
5552  }
5553 
// NOTE(review): the signature line for this method is not present in this listing.
// The body returns the implementation's duplicate-message handler.
5564  {
5565  return _body.get().getDuplicateMessageHandler();
5566  }
5567 
// NOTE(review): the signature line for this method is not present in this listing;
// the parameter is named handler_ and its type is not visible here.
// The body forwards handler_ to the implementation's setFailedWriteHandler().
5578  {
5579  _body.get().setFailedWriteHandler(handler_);
5580  }
5581 
// NOTE(review): the signature line for this method is not present in this listing.
// The body returns the implementation's failed-write handler.
5586  {
5587  return _body.get().getFailedWriteHandler();
5588  }
5589 
5590 
// Publishes `data_` to `topic_`. The strings are passed to the implementation as
// pointer + length pairs, so no terminating NUL is relied upon for the length.
// Returns the 64-bit value produced by the implementation's publish().
5608  amps_uint64_t publish(const std::string& topic_, const std::string& data_)
5609  {
5610  return _body.get().publish(topic_.c_str(), topic_.length(),
5611  data_.c_str(), data_.length());
5612  }
5613 
// Publishes a payload given as raw buffers: `topic_`/`topicLength_` and
// `data_`/`dataLength_` are forwarded unchanged to the implementation.
// Returns the 64-bit value produced by the implementation's publish().
5633  amps_uint64_t publish(const char* topic_, size_t topicLength_,
5634  const char* data_, size_t dataLength_)
5635  {
5636  return _body.get().publish(topic_, topicLength_, data_, dataLength_);
5637  }
5638 
// Publishes `data_` to `topic_` with an expiration value. Strings are passed as
// pointer + length pairs; `expiration_` is forwarded as-is (units are defined by
// the implementation — presumably seconds, confirm).
5657  amps_uint64_t publish(const std::string& topic_, const std::string& data_,
5658  unsigned long expiration_)
5659  {
5660  return _body.get().publish(topic_.c_str(), topic_.length(),
5661  data_.c_str(), data_.length(), expiration_);
5662  }
5663 
// Publishes a payload given as raw buffers with an expiration value; all five
// arguments are forwarded unchanged to the implementation.
5684  amps_uint64_t publish(const char* topic_, size_t topicLength_,
5685  const char* data_, size_t dataLength_,
5686  unsigned long expiration_)
5687  {
5688  return _body.get().publish(topic_, topicLength_,
5689  data_, dataLength_, expiration_);
5690  }
5691 
// Flushes pending publishes via the implementation.
// timeout_ defaults to 0 and ackType_ defaults to Message::AckType::Processed;
// both are forwarded unchanged.
5730  void publishFlush(long timeout_ = 0, unsigned ackType_ = Message::AckType::Processed)
5731  {
5732  _body.get().publishFlush(timeout_, ackType_);
5733  }
5734 
5735 
// Delta-publishes `data_` to `topic_`. The strings are passed to the implementation
// as pointer + length pairs. Returns the implementation's 64-bit result.
5751  amps_uint64_t deltaPublish(const std::string& topic_, const std::string& data_)
5752  {
5753  return _body.get().deltaPublish(topic_.c_str(), topic_.length(),
5754  data_.c_str(), data_.length());
5755  }
5756 
// Delta-publishes a payload given as raw buffers; the four arguments are forwarded
// unchanged to the implementation. Returns the implementation's 64-bit result.
5774  amps_uint64_t deltaPublish(const char* topic_, size_t topicLength_,
5775  const char* data_, size_t dataLength_)
5776  {
5777  return _body.get().deltaPublish(topic_, topicLength_,
5778  data_, dataLength_);
5779  }
5780 
// Delta-publishes `data_` to `topic_` with an expiration value. Strings are passed
// as pointer + length pairs; `expiration_` is forwarded as-is.
5797  amps_uint64_t deltaPublish(const std::string& topic_, const std::string& data_,
5798  unsigned long expiration_)
5799  {
5800  return _body.get().deltaPublish(topic_.c_str(), topic_.length(),
5801  data_.c_str(), data_.length(),
5802  expiration_);
5803  }
5804 
// Delta-publishes a payload given as raw buffers with an expiration value; all five
// arguments are forwarded unchanged to the implementation.
5823  amps_uint64_t deltaPublish(const char* topic_, size_t topicLength_,
5824  const char* data_, size_t dataLength_,
5825  unsigned long expiration_)
5826  {
5827  return _body.get().deltaPublish(topic_, topicLength_,
5828  data_, dataLength_, expiration_);
5829  }
5830 
// Logs on through the implementation and returns the string it produces.
// Defaults: timeout_ = 0, the shared DefaultAuthenticator instance, and no options
// (NULL). The authenticator is taken by non-const reference and forwarded as such.
5846  std::string logon(int timeout_ = 0,
5847  Authenticator& authenticator_ = DefaultAuthenticator::instance(),
5848  const char* options_ = NULL)
5849  {
5850  return _body.get().logon(timeout_, authenticator_, options_);
5851  }
// Logs on with an options string, always using the shared DefaultAuthenticator
// instance. Note the argument order differs from the implementation call:
// options_ comes first here, timeout_ (default 0) second.
5865  std::string logon(const char* options_, int timeout_ = 0)
5866  {
5867  return _body.get().logon(timeout_, DefaultAuthenticator::instance(),
5868  options_);
5869  }
5870 
// std::string convenience overload of logon(options, timeout): passes
// options_.c_str() and the shared DefaultAuthenticator instance to the implementation.
5884  std::string logon(const std::string& options_, int timeout_ = 0)
5885  {
5886  return _body.get().logon(timeout_, DefaultAuthenticator::instance(),
5887  options_.c_str());
5888  }
5889 
// Subscribes to `topic_`, delivering messages to `messageHandler_`, and returns the
// string produced by the implementation (presumably the subscription identifier —
// confirm). The fifth implementation argument — the slot bookmarkSubscribe() fills
// with its bookmark — is passed as an empty string here.
5909  std::string subscribe(const MessageHandler& messageHandler_,
5910  const std::string& topic_,
5911  long timeout_ = 0,
5912  const std::string& filter_ = "",
5913  const std::string& options_ = "",
5914  const std::string& subId_ = "")
5915  {
5916  return _body.get().subscribe(messageHandler_, topic_, timeout_,
5917  filter_, "", options_, subId_);
5918  }
5919 
// Subscribes to `topic_` and returns a MessageStream bound to this client.
// The stream itself acts as the message handler (via its MessageHandler conversion),
// and the identifier returned by the implementation is recorded on the stream with
// setSubscription(). The trailing `false` is an implementation flag whose meaning is
// not visible in this block.
5935  MessageStream subscribe(const std::string& topic_,
5936  long timeout_ = 0, const std::string& filter_ = "",
5937  const std::string& options_ = "",
5938  const std::string& subId_ = "")
5939  {
5940  MessageStream result(*this);
      // Apply the client's default max depth only when it is non-zero.
5941  if (_body.get().getDefaultMaxDepth())
5942  {
5943  result.maxDepth(_body.get().getDefaultMaxDepth());
5944  }
5945  result.setSubscription(_body.get().subscribe(
5946  result.operator MessageHandler(),
5947  topic_, timeout_, filter_, "",
5948  options_, subId_, false));
5949  return result;
5950  }
5951 
// const char* topic overload of the MessageStream-returning subscribe(). The body is
// the same as the std::string overload: build a stream bound to this client, apply a
// non-zero default max depth, subscribe with the stream as handler, and record the
// returned identifier on the stream.
5967  MessageStream subscribe(const char* topic_,
5968  long timeout_ = 0, const std::string& filter_ = "",
5969  const std::string& options_ = "",
5970  const std::string& subId_ = "")
5971  {
5972  MessageStream result(*this);
5973  if (_body.get().getDefaultMaxDepth())
5974  {
5975  result.maxDepth(_body.get().getDefaultMaxDepth());
5976  }
5977  result.setSubscription(_body.get().subscribe(
5978  result.operator MessageHandler(),
5979  topic_, timeout_, filter_, "",
5980  options_, subId_, false));
5981  return result;
5982  }
5983 
// Delta-subscribes to `topic_`, delivering messages to `messageHandler_`, and returns
// the string produced by the implementation. Unlike subscribe(), timeout_ has no
// default here. The fifth implementation argument is passed as an empty string.
5996  std::string deltaSubscribe(const MessageHandler& messageHandler_,
5997  const std::string& topic_,
5998  long timeout_,
5999  const std::string& filter_ = "",
6000  const std::string& options_ = "",
6001  const std::string& subId_ = "")
6002  {
6003  return _body.get().deltaSubscribe(messageHandler_, topic_, timeout_,
6004  filter_, "", options_, subId_);
6005  }
// Delta-subscribes to `topic_` and returns a MessageStream bound to this client.
// The stream is used as the message handler; the identifier returned by the
// implementation is recorded on the stream with setSubscription().
6014  MessageStream deltaSubscribe(const std::string& topic_,
6015  long timeout_, const std::string& filter_ = "",
6016  const std::string& options_ = "",
6017  const std::string& subId_ = "")
6018  {
6019  MessageStream result(*this);
      // Apply the client's default max depth only when it is non-zero.
6020  if (_body.get().getDefaultMaxDepth())
6021  {
6022  result.maxDepth(_body.get().getDefaultMaxDepth());
6023  }
6024  result.setSubscription(_body.get().deltaSubscribe(
6025  result.operator MessageHandler(),
6026  topic_, timeout_, filter_, "",
6027  options_, subId_, false));
6028  return result;
6029  }
6030 
// const char* topic overload of the MessageStream-returning deltaSubscribe(); the
// body is the same as the std::string overload.
6032  MessageStream deltaSubscribe(const char* topic_,
6033  long timeout_, const std::string& filter_ = "",
6034  const std::string& options_ = "",
6035  const std::string& subId_ = "")
6036  {
6037  MessageStream result(*this);
6038  if (_body.get().getDefaultMaxDepth())
6039  {
6040  result.maxDepth(_body.get().getDefaultMaxDepth());
6041  }
6042  result.setSubscription(_body.get().deltaSubscribe(
6043  result.operator MessageHandler(),
6044  topic_, timeout_, filter_, "",
6045  options_, subId_, false));
6046  return result;
6047  }
6048 
// Subscribes to `topic_` starting from `bookmark_`, delivering messages to
// `messageHandler_`. Implemented with the same implementation subscribe() call that
// the plain subscribe() uses, with bookmark_ supplied in the fifth argument position.
// Returns the string produced by the implementation.
6074  std::string bookmarkSubscribe(const MessageHandler& messageHandler_,
6075  const std::string& topic_,
6076  long timeout_,
6077  const std::string& bookmark_,
6078  const std::string& filter_ = "",
6079  const std::string& options_ = "",
6080  const std::string& subId_ = "")
6081  {
6082  return _body.get().subscribe(messageHandler_, topic_, timeout_,
6083  filter_, bookmark_, options_, subId_);
6084  }
// Bookmark-subscribes to `topic_` and returns a MessageStream bound to this client.
// The stream is used as the message handler; the identifier returned by the
// implementation's subscribe() is recorded on the stream with setSubscription().
6102  MessageStream bookmarkSubscribe(const std::string& topic_,
6103  long timeout_,
6104  const std::string& bookmark_,
6105  const std::string& filter_ = "",
6106  const std::string& options_ = "",
6107  const std::string& subId_ = "")
6108  {
6109  MessageStream result(*this);
      // Apply the client's default max depth only when it is non-zero.
6110  if (_body.get().getDefaultMaxDepth())
6111  {
6112  result.maxDepth(_body.get().getDefaultMaxDepth());
6113  }
6114  result.setSubscription(_body.get().subscribe(
6115  result.operator MessageHandler(),
6116  topic_, timeout_, filter_,
6117  bookmark_, options_,
6118  subId_, false));
6119  return result;
6120  }
6121 
// const char* topic overload of the MessageStream-returning bookmarkSubscribe(); the
// body is the same as the std::string overload.
6123  MessageStream bookmarkSubscribe(const char* topic_,
6124  long timeout_,
6125  const std::string& bookmark_,
6126  const std::string& filter_ = "",
6127  const std::string& options_ = "",
6128  const std::string& subId_ = "")
6129  {
6130  MessageStream result(*this);
6131  if (_body.get().getDefaultMaxDepth())
6132  {
6133  result.maxDepth(_body.get().getDefaultMaxDepth());
6134  }
6135  result.setSubscription(_body.get().subscribe(
6136  result.operator MessageHandler(),
6137  topic_, timeout_, filter_,
6138  bookmark_, options_,
6139  subId_, false));
6140  return result;
6141  }
6142 
// Unsubscribes the subscription identified by `commandId` via the implementation.
// The `return` of a void expression is legal C++ and yields nothing to the caller.
6151  void unsubscribe(const std::string& commandId)
6152  {
6153  return _body.get().unsubscribe(commandId);
6154  }
6155 
// NOTE(review): the signature line for this method is not present in this listing.
// The body calls the implementation's zero-argument unsubscribe().
6164  {
6165  return _body.get().unsubscribe();
6166  }
6167 
6168 
// Runs a SOW query on `topic_`, delivering results to `messageHandler_`, and returns
// the string produced by the implementation. Every parameter is forwarded unchanged;
// defaults come from DEFAULT_BATCH_SIZE, DEFAULT_TOP_N and DEFAULT_COMMAND_TIMEOUT.
6198  std::string sow(const MessageHandler& messageHandler_,
6199  const std::string& topic_,
6200  const std::string& filter_ = "",
6201  const std::string& orderBy_ = "",
6202  const std::string& bookmark_ = "",
6203  int batchSize_ = DEFAULT_BATCH_SIZE,
6204  int topN_ = DEFAULT_TOP_N,
6205  const std::string& options_ = "",
6206  long timeout_ = DEFAULT_COMMAND_TIMEOUT)
6207  {
6208  return _body.get().sow(messageHandler_, topic_, filter_, orderBy_,
6209  bookmark_, batchSize_, topN_, options_,
6210  timeout_);
6211  }
// Runs a SOW query on `topic_` and returns a MessageStream bound to this client.
// The stream is used as the message handler, and the identifier returned by the
// implementation is recorded with setSOWOnly() (not setSubscription()).
6236  MessageStream sow(const std::string& topic_,
6237  const std::string& filter_ = "",
6238  const std::string& orderBy_ = "",
6239  const std::string& bookmark_ = "",
6240  int batchSize_ = DEFAULT_BATCH_SIZE,
6241  int topN_ = DEFAULT_TOP_N,
6242  const std::string& options_ = "",
6243  long timeout_ = DEFAULT_COMMAND_TIMEOUT)
6244  {
6245  MessageStream result(*this);
      // Apply the client's default max depth only when it is non-zero.
6246  if (_body.get().getDefaultMaxDepth())
6247  {
6248  result.maxDepth(_body.get().getDefaultMaxDepth());
6249  }
6250  result.setSOWOnly(_body.get().sow(result.operator MessageHandler(),
6251  topic_, filter_, orderBy_, bookmark_,
6252  batchSize_, topN_, options_, timeout_));
6253  return result;
6254  }
6255 
// const char* topic overload of the MessageStream-returning sow(); the body is the
// same as the std::string overload.
6257  MessageStream sow(const char* topic_,
6258  const std::string& filter_ = "",
6259  const std::string& orderBy_ = "",
6260  const std::string& bookmark_ = "",
6261  int batchSize_ = DEFAULT_BATCH_SIZE,
6262  int topN_ = DEFAULT_TOP_N,
6263  const std::string& options_ = "",
6264  long timeout_ = DEFAULT_COMMAND_TIMEOUT)
6265  {
6266  MessageStream result(*this);
6267  if (_body.get().getDefaultMaxDepth())
6268  {
6269  result.maxDepth(_body.get().getDefaultMaxDepth());
6270  }
6271  result.setSOWOnly(_body.get().sow(result.operator MessageHandler(),
6272  topic_, filter_, orderBy_, bookmark_,
6273  batchSize_, topN_, options_, timeout_));
6274  return result;
6275  }
// Timeout-first SOW query overload: delivers results to `messageHandler_` and returns
// the string produced by the implementation. timeout_ is required here and precedes
// the filter; there are no orderBy, bookmark or options parameters.
6298  std::string sow(const MessageHandler& messageHandler_,
6299  const std::string& topic_,
6300  long timeout_,
6301  const std::string& filter_ = "",
6302  int batchSize_ = DEFAULT_BATCH_SIZE,
6303  int topN_ = DEFAULT_TOP_N)
6304  {
6305  return _body.get().sow(messageHandler_, topic_, timeout_, filter_,
6306  batchSize_, topN_);
6307  }
// Timeout-first SOW-and-subscribe overload: delivers messages to `messageHandler_`
// and returns the string produced by the implementation. oofEnabled_ (default false)
// is forwarded as a flag; all other arguments are forwarded unchanged.
6330  std::string sowAndSubscribe(const MessageHandler& messageHandler_,
6331  const std::string& topic_,
6332  long timeout_,
6333  const std::string& filter_ = "",
6334  int batchSize_ = DEFAULT_BATCH_SIZE,
6335  bool oofEnabled_ = false,
6336  int topN_ = DEFAULT_TOP_N)
6337  {
6338  return _body.get().sowAndSubscribe(messageHandler_, topic_, timeout_,
6339  filter_, batchSize_, oofEnabled_,
6340  topN_);
6341  }
6342 
// Timeout-first SOW-and-subscribe returning a MessageStream bound to this client.
// The stream is used as the message handler; the identifier returned by the
// implementation is recorded with setSubscription(). The trailing `false` is an
// implementation flag whose meaning is not visible in this block.
6362  MessageStream sowAndSubscribe(const std::string& topic_,
6363  long timeout_,
6364  const std::string& filter_ = "",
6365  int batchSize_ = DEFAULT_BATCH_SIZE,
6366  bool oofEnabled_ = false,
6367  int topN_ = DEFAULT_TOP_N)
6368  {
6369  MessageStream result(*this);
      // Apply the client's default max depth only when it is non-zero.
6370  if (_body.get().getDefaultMaxDepth())
6371  {
6372  result.maxDepth(_body.get().getDefaultMaxDepth());
6373  }
6374  result.setSubscription(_body.get().sowAndSubscribe(
6375  result.operator MessageHandler(),
6376  topic_, timeout_, filter_,
6377  batchSize_, oofEnabled_,
6378  topN_, false));
6379  return result;
6380  }
// const char* topic overload of the timeout-first, MessageStream-returning
// sowAndSubscribe(); the body is the same as the std::string overload.
6400  MessageStream sowAndSubscribe(const char* topic_,
6401  long timeout_,
6402  const std::string& filter_ = "",
6403  int batchSize_ = DEFAULT_BATCH_SIZE,
6404  bool oofEnabled_ = false,
6405  int topN_ = DEFAULT_TOP_N)
6406  {
6407  MessageStream result(*this);
6408  if (_body.get().getDefaultMaxDepth())
6409  {
6410  result.maxDepth(_body.get().getDefaultMaxDepth());
6411  }
6412  result.setSubscription(_body.get().sowAndSubscribe(
6413  result.operator MessageHandler(),
6414  topic_, timeout_, filter_,
6415  batchSize_, oofEnabled_,
6416  topN_, false));
6417  return result;
6418  }
6419 
6420 
// Filter-first SOW-and-subscribe overload with orderBy, bookmark and options:
// delivers messages to `messageHandler_` and returns the string produced by the
// implementation. Every parameter is forwarded unchanged.
6448  std::string sowAndSubscribe(const MessageHandler& messageHandler_,
6449  const std::string& topic_,
6450  const std::string& filter_ = "",
6451  const std::string& orderBy_ = "",
6452  const std::string& bookmark_ = "",
6453  int batchSize_ = DEFAULT_BATCH_SIZE,
6454  int topN_ = DEFAULT_TOP_N,
6455  const std::string& options_ = "",
6456  long timeout_ = DEFAULT_COMMAND_TIMEOUT)
6457  {
6458  return _body.get().sowAndSubscribe(messageHandler_, topic_, filter_,
6459  orderBy_, bookmark_, batchSize_,
6460  topN_, options_, timeout_);
6461  }
6462 
// Filter-first SOW-and-subscribe returning a MessageStream bound to this client.
// The stream is used as the message handler; the identifier returned by the
// implementation is recorded with setSubscription().
6487  MessageStream sowAndSubscribe(const std::string& topic_,
6488  const std::string& filter_ = "",
6489  const std::string& orderBy_ = "",
6490  const std::string& bookmark_ = "",
6491  int batchSize_ = DEFAULT_BATCH_SIZE,
6492  int topN_ = DEFAULT_TOP_N,
6493  const std::string& options_ = "",
6494  long timeout_ = DEFAULT_COMMAND_TIMEOUT)
6495  {
6496  MessageStream result(*this);
      // Apply the client's default max depth only when it is non-zero.
6497  if (_body.get().getDefaultMaxDepth())
6498  {
6499  result.maxDepth(_body.get().getDefaultMaxDepth());
6500  }
6501  result.setSubscription(_body.get().sowAndSubscribe(
6502  result.operator MessageHandler(),
6503  topic_, filter_, orderBy_,
6504  bookmark_, batchSize_, topN_,
6505  options_, timeout_, false));
6506  return result;
6507  }
6508 
// const char* topic overload of the filter-first, MessageStream-returning
// sowAndSubscribe(); the body is the same as the std::string overload.
6510  MessageStream sowAndSubscribe(const char* topic_,
6511  const std::string& filter_ = "",
6512  const std::string& orderBy_ = "",
6513  const std::string& bookmark_ = "",
6514  int batchSize_ = DEFAULT_BATCH_SIZE,
6515  int topN_ = DEFAULT_TOP_N,
6516  const std::string& options_ = "",
6517  long timeout_ = DEFAULT_COMMAND_TIMEOUT)
6518  {
6519  MessageStream result(*this);
6520  if (_body.get().getDefaultMaxDepth())
6521  {
6522  result.maxDepth(_body.get().getDefaultMaxDepth());
6523  }
6524  result.setSubscription(_body.get().sowAndSubscribe(
6525  result.operator MessageHandler(),
6526  topic_, filter_, orderBy_,
6527  bookmark_, batchSize_, topN_,
6528  options_, timeout_, false));
6529  return result;
6530  }
6531 
// Filter-first SOW-and-delta-subscribe overload: delivers messages to
// `messageHandler_` and returns the string produced by the implementation. There is
// no bookmark parameter; every argument is forwarded unchanged.
6556  std::string sowAndDeltaSubscribe(const MessageHandler& messageHandler_,
6557  const std::string& topic_,
6558  const std::string& filter_ = "",
6559  const std::string& orderBy_ = "",
6560  int batchSize_ = DEFAULT_BATCH_SIZE,
6561  int topN_ = DEFAULT_TOP_N,
6562  const std::string& options_ = "",
6563  long timeout_ = DEFAULT_COMMAND_TIMEOUT)
6564  {
6565  return _body.get().sowAndDeltaSubscribe(messageHandler_, topic_,
6566  filter_, orderBy_, batchSize_,
6567  topN_, options_, timeout_);
6568  }
// Filter-first SOW-and-delta-subscribe returning a MessageStream bound to this
// client. The stream is used as the message handler; the identifier returned by the
// implementation is recorded with setSubscription().
6589  MessageStream sowAndDeltaSubscribe(const std::string& topic_,
6590  const std::string& filter_ = "",
6591  const std::string& orderBy_ = "",
6592  int batchSize_ = DEFAULT_BATCH_SIZE,
6593  int topN_ = DEFAULT_TOP_N,
6594  const std::string& options_ = "",
6595  long timeout_ = DEFAULT_COMMAND_TIMEOUT)
6596  {
6597  MessageStream result(*this);
      // Apply the client's default max depth only when it is non-zero.
6598  if (_body.get().getDefaultMaxDepth())
6599  {
6600  result.maxDepth(_body.get().getDefaultMaxDepth());
6601  }
6602  result.setSubscription(_body.get().sowAndDeltaSubscribe(
6603  result.operator MessageHandler(),
6604  topic_, filter_, orderBy_,
6605  batchSize_, topN_, options_,
6606  timeout_, false));
6607  return result;
6608  }
6609 
// NOTE(review): the first signature line (return type, name and topic parameter) is
// not present in this listing; the body uses a `topic_` parameter and matches the
// filter-first MessageStream sowAndDeltaSubscribe() — presumably this is its
// const char* topic overload, confirm against the header.
6612  const std::string& filter_ = "",
6613  const std::string& orderBy_ = "",
6614  int batchSize_ = DEFAULT_BATCH_SIZE,
6615  int topN_ = DEFAULT_TOP_N,
6616  const std::string& options_ = "",
6617  long timeout_ = DEFAULT_COMMAND_TIMEOUT)
6618  {
6619  MessageStream result(*this);
6620  if (_body.get().getDefaultMaxDepth())
6621  {
6622  result.maxDepth(_body.get().getDefaultMaxDepth());
6623  }
6624  result.setSubscription(_body.get().sowAndDeltaSubscribe(
6625  result.operator MessageHandler(),
6626  topic_, filter_, orderBy_,
6627  batchSize_, topN_, options_,
6628  timeout_, false));
6629  return result;
6630  }
6631 
// Timeout-first SOW-and-delta-subscribe overload: delivers messages to
// `messageHandler_` and returns the string produced by the implementation.
// oofEnabled_ and sendEmpties_ (both default false) are forwarded as flags.
6656  std::string sowAndDeltaSubscribe(const MessageHandler& messageHandler_,
6657  const std::string& topic_,
6658  long timeout_,
6659  const std::string& filter_ = "",
6660  int batchSize_ = DEFAULT_BATCH_SIZE,
6661  bool oofEnabled_ = false,
6662  bool sendEmpties_ = false,
6663  int topN_ = DEFAULT_TOP_N)
6664  {
6665  return _body.get().sowAndDeltaSubscribe(messageHandler_, topic_,
6666  timeout_, filter_, batchSize_,
6667  oofEnabled_, sendEmpties_,
6668  topN_);
6669  }
6670 
// Timeout-first SOW-and-delta-subscribe returning a MessageStream bound to this
// client. The stream is used as the message handler; the identifier returned by the
// implementation is recorded with setSubscription().
6692  MessageStream sowAndDeltaSubscribe(const std::string& topic_,
6693  long timeout_,
6694  const std::string& filter_ = "",
6695  int batchSize_ = DEFAULT_BATCH_SIZE,
6696  bool oofEnabled_ = false,
6697  bool sendEmpties_ = false,
6698  int topN_ = DEFAULT_TOP_N)
6699  {
6700  MessageStream result(*this);
      // Apply the client's default max depth only when it is non-zero.
6701  if (_body.get().getDefaultMaxDepth())
6702  {
6703  result.maxDepth(_body.get().getDefaultMaxDepth());
6704  }
6705  result.setSubscription(_body.get().sowAndDeltaSubscribe(
6706  result.operator MessageHandler(),
6707  topic_, timeout_, filter_,
6708  batchSize_, oofEnabled_,
6709  sendEmpties_, topN_, false));
6710  return result;
6711  }
// NOTE(review): the first signature line (return type, name and topic parameter) is
// not present in this listing; the body uses a `topic_` parameter and matches the
// timeout-first MessageStream sowAndDeltaSubscribe() — presumably this is its
// const char* topic overload, confirm against the header.
6734  long timeout_,
6735  const std::string& filter_ = "",
6736  int batchSize_ = DEFAULT_BATCH_SIZE,
6737  bool oofEnabled_ = false,
6738  bool sendEmpties_ = false,
6739  int topN_ = DEFAULT_TOP_N)
6740  {
6741  MessageStream result(*this);
6742  if (_body.get().getDefaultMaxDepth())
6743  {
6744  result.maxDepth(_body.get().getDefaultMaxDepth());
6745  }
6746  result.setSubscription(_body.get().sowAndDeltaSubscribe(
6747  result.operator MessageHandler(),
6748  topic_, timeout_, filter_,
6749  batchSize_, oofEnabled_,
6750  sendEmpties_, topN_, false));
6751  return result;
6752  }
// Issues a SOW delete on `topic` restricted by `filter`, delivering the response to
// `messageHandler`. All arguments are forwarded unchanged; returns the string
// produced by the implementation.
6772  std::string sowDelete(const MessageHandler& messageHandler,
6773  const std::string& topic,
6774  const std::string& filter,
6775  long timeout)
6776  {
6777  return _body.get().sowDelete(messageHandler, topic, filter, timeout);
6778  }
// Synchronous SOW delete: issues the delete and returns the first message taken from
// an internal MessageStream (the response to the command).
// A fresh command id is formatted in hex from MessageImpl::newId() and passed to the
// implementation so the stream can be registered for that id beforehand.
// If the client is disconnected, the handler registered for the id is removed and the
// DisconnectedException is rethrown; other exception types propagate untouched.
6795  Message sowDelete(const std::string& topic, const std::string& filter,
6796  long timeout = 0)
6797  {
6798  MessageStream stream(*this);
6799  char buf[Message::IdentifierLength + 1];
      // Guarantee termination regardless of what the formatter writes.
6800  buf[Message::IdentifierLength] = 0;
      // NOTE(review): "%lx" expects unsigned long; confirm this matches the return type
      // of MessageImpl::newId() on platforms where long is 32 bits.
6801  AMPS_snprintf(buf, Message::IdentifierLength + 1, "%lx", MessageImpl::newId());
6802  Field cid(buf);
6803  try
6804  {
6805  stream.setStatsOnly(cid);
6806  _body.get().sowDelete(stream.operator MessageHandler(), topic, filter, timeout, cid);
6807  return *(stream.begin());
6808  }
6809  catch (const DisconnectedException&)
6810  {
6811  removeMessageHandler(cid);
6812  throw;
6813  }
6814  }
6815 
// Starts a timer by calling the implementation's startTimer().
6820  void startTimer()
6821  {
6822  _body.get().startTimer();
6823  }
6824 
// Stops the timer via the implementation, passing `messageHandler` to receive the
// response, and returns the string the implementation produces.
6831  std::string stopTimer(const MessageHandler& messageHandler)
6832  {
6833  return _body.get().stopTimer(messageHandler);
6834  }
6835 
// Issues a SOW delete on `topic_` for the records named by `keys_`, delivering the
// response to `messageHandler_`. All arguments are forwarded unchanged (the format
// of keys_ is defined by the implementation/server). Returns the implementation's string.
6857  std::string sowDeleteByKeys(const MessageHandler& messageHandler_,
6858  const std::string& topic_,
6859  const std::string& keys_,
6860  long timeout_ = 0)
6861  {
6862  return _body.get().sowDeleteByKeys(messageHandler_, topic_, keys_, timeout_);
6863  }
// Synchronous SOW delete by keys: issues the delete and returns the first message
// taken from an internal MessageStream (the response to the command).
// A fresh command id is formatted in hex from MessageImpl::newId() and passed to the
// implementation. On DisconnectedException the handler registered for that id is
// removed and the exception is rethrown.
6884  Message sowDeleteByKeys(const std::string& topic_, const std::string& keys_,
6885  long timeout_ = 0)
6886  {
6887  MessageStream stream(*this);
6888  char buf[Message::IdentifierLength + 1];
      // Guarantee termination regardless of what the formatter writes.
6889  buf[Message::IdentifierLength] = 0;
      // NOTE(review): "%lx" expects unsigned long; confirm this matches the return type
      // of MessageImpl::newId() on platforms where long is 32 bits.
6890  AMPS_snprintf(buf, Message::IdentifierLength + 1, "%lx", MessageImpl::newId());
6891  Field cid(buf);
6892  try
6893  {
6894  stream.setStatsOnly(cid);
6895  _body.get().sowDeleteByKeys(stream.operator MessageHandler(), topic_, keys_, timeout_, cid);
6896  return *(stream.begin());
6897  }
6898  catch (const DisconnectedException&)
6899  {
6900  removeMessageHandler(cid);
6901  throw;
6902  }
6903  }
6904 
// Issues a SOW delete on `topic_` identified by the message payload `data_`,
// delivering the response to `messageHandler_`. All arguments are forwarded
// unchanged; returns the string produced by the implementation.
6919  std::string sowDeleteByData(const MessageHandler& messageHandler_,
6920  const std::string& topic_, const std::string& data_,
6921  long timeout_ = 0)
6922  {
6923  return _body.get().sowDeleteByData(messageHandler_, topic_, data_, timeout_);
6924  }
6925 
// Synchronous SOW delete by data: issues the delete and returns the first message
// taken from an internal MessageStream (the response to the command).
// A fresh command id is formatted in hex from MessageImpl::newId() and passed to the
// implementation. On DisconnectedException the handler registered for that id is
// removed and the exception is rethrown.
6940  Message sowDeleteByData(const std::string& topic_, const std::string& data_,
6941  long timeout_ = 0)
6942  {
6943  MessageStream stream(*this);
6944  char buf[Message::IdentifierLength + 1];
      // Guarantee termination regardless of what the formatter writes.
6945  buf[Message::IdentifierLength] = 0;
      // NOTE(review): "%lx" expects unsigned long; confirm this matches the return type
      // of MessageImpl::newId() on platforms where long is 32 bits.
6946  AMPS_snprintf(buf, Message::IdentifierLength + 1, "%lx", MessageImpl::newId());
6947  Field cid(buf);
6948  try
6949  {
6950  stream.setStatsOnly(cid);
6951  _body.get().sowDeleteByData(stream.operator MessageHandler(), topic_, data_, timeout_, cid);
6952  return *(stream.begin());
6953  }
6954  catch (const DisconnectedException&)
6955  {
6956  removeMessageHandler(cid);
6957  throw;
6958  }
6959  }
6960 
// NOTE(review): the signature line for this method is not present in this listing.
// The body returns the handle held by the implementation (getHandle()).
6965  {
6966  return _body.get().getHandle();
6967  }
6968 
// Installs an exception listener held through a shared_ptr; the pointer is forwarded
// unchanged to the implementation.
6977  void setExceptionListener(const std::shared_ptr<const ExceptionListener>& pListener_)
6978  {
6979  _body.get().setExceptionListener(pListener_);
6980  }
6981 
// NOTE(review): the signature line for this overload is not present in this listing;
// the parameter is named listener_ and its type is not visible here.
// The body forwards listener_ to the implementation's setExceptionListener().
6991  {
6992  _body.get().setExceptionListener(listener_);
6993  }
6994 
// NOTE(review): the signature line for this method is not present in this listing.
// The body returns the exception listener held by the implementation.
6998  {
6999  return _body.get().getExceptionListener();
7000  }
7001 
// Configures heartbeating by forwarding the heartbeat interval and the read timeout
// to the implementation. Units are not visible in this block (presumably seconds —
// TODO confirm).
7009  // [fragment left over from the stripped doc comment:] ...type of message) from the server for the specified interval (plus a grace period),...
7023  void setHeartbeat(unsigned heartbeatTime_, unsigned readTimeout_)
7024  {
7025  _body.get().setHeartbeat(heartbeatTime_, readTimeout_);
7026  }
7027 
// Configures heartbeating with only an interval: the read timeout passed to the
// implementation is twice the heartbeat interval. The multiplication is done in
// unsigned arithmetic, so very large intervals would wrap around.
7035  // [fragment left over from the stripped doc comment:] ...type of message) from the server for the specified interval (plus a grace period),...
7047  void setHeartbeat(unsigned heartbeatTime_)
7048  {
7049  _body.get().setHeartbeat(heartbeatTime_, 2 * heartbeatTime_);
7050  }
7051 
// NOTE(review): the signature line for this method is not present in this listing
// (presumably setUnhandledMessageHandler — confirm). The body simply forwards
// `messageHandler` to setLastChanceMessageHandler(), i.e. it is an alias for it.
7054  {
7055  setLastChanceMessageHandler(messageHandler);
7056  }
7057 
// NOTE(review): the signature line for this method is not present in this listing.
// The body registers `messageHandler` through the global command-type handler table
// under the LastChance slot.
7061  {
7062  _body.get().setGlobalCommandTypeMessageHandler(ClientImpl::GlobalCommandTypeHandlers::LastChance,
7063  messageHandler);
7064  }
7065 
// Registers `handler_` for a command type named by the string `command_`. Which
// strings are accepted is decided by the implementation and is not visible here.
7086  void setGlobalCommandTypeMessageHandler(const std::string& command_, const MessageHandler& handler_)
7087  {
7088  _body.get().setGlobalCommandTypeMessageHandler(command_, handler_);
7089  }
7090 
// Registers `handler_` for a command type given as a Message::Command::Type value;
// both arguments are forwarded unchanged to the implementation.
7111  void setGlobalCommandTypeMessageHandler(const Message::Command::Type command_, const MessageHandler& handler_)
7112  {
7113  _body.get().setGlobalCommandTypeMessageHandler(command_, handler_);
7114  }
7115 
// Returns the AMPS_BOOKMARK_NOW constant. Same value as NOW().
7121  static const char* BOOKMARK_NOW()
7122  {
7123  return AMPS_BOOKMARK_NOW;
7124  }
// Returns the AMPS_BOOKMARK_NOW constant. Same value as BOOKMARK_NOW().
7130  static const char* NOW()
7131  {
7132  return AMPS_BOOKMARK_NOW;
7133  }
7134 
// Returns the AMPS_BOOKMARK_EPOCH constant. Same value as EPOCH().
7140  static const char* BOOKMARK_EPOCH()
7141  {
7142  return AMPS_BOOKMARK_EPOCH;
7143  }
7144 
// Returns the AMPS_BOOKMARK_EPOCH constant. Same value as BOOKMARK_EPOCH().
7150  static const char* EPOCH()
7151  {
7152  return AMPS_BOOKMARK_EPOCH;
7153  }
7154 
// Returns the AMPS_BOOKMARK_RECENT constant. Same value as MOST_RECENT() and
// BOOKMARK_RECENT().
7161  static const char* BOOKMARK_MOST_RECENT()
7162  {
7163  return AMPS_BOOKMARK_RECENT;
7164  }
7165 
// Returns the AMPS_BOOKMARK_RECENT constant. Same value as BOOKMARK_MOST_RECENT()
// and BOOKMARK_RECENT().
7172  static const char* MOST_RECENT()
7173  {
7174  return AMPS_BOOKMARK_RECENT;
7175  }
7176 
// Returns the AMPS_BOOKMARK_RECENT constant. Same value as BOOKMARK_MOST_RECENT()
// and MOST_RECENT().
7183  static const char* BOOKMARK_RECENT()
7184  {
7185  return AMPS_BOOKMARK_RECENT;
7186  }
7187 
7188 
// NOTE(review): the signature line for this method is not present in this listing;
// the parameter is named `listener` and its type is not visible here.
// The body registers `listener` with the implementation.
7195  {
7196  _body.get().addConnectionStateListener(listener);
7197  }
7198 
// NOTE(review): the signature line for this method is not present in this listing;
// the parameter is named `listener` and its type is not visible here.
// The body unregisters `listener` from the implementation.
7203  {
7204  _body.get().removeConnectionStateListener(listener);
7205  }
7206 
// NOTE(review): the signature line for this method is not present in this listing.
// The body removes all connection state listeners via the implementation.
7210  {
7211  _body.get().clearConnectionStateListeners();
7212  }
7213 
// Executes `command_` with `handler_` receiving its messages, by delegating to the
// implementation's two-argument executeAsync(). Returns the string it produces.
// The handler is taken by value and passed on as-is.
7239  std::string executeAsync(Command& command_, MessageHandler handler_)
7240  {
7241  return _body.get().executeAsync(command_, handler_);
7242  }
7243 
// Executes `command_` through the implementation's three-argument executeAsync(),
// always passing `false` as the third argument (per this method's name, presumably
// the flag that disables resubscription tracking — confirm in ClientImpl).
//
// Returns the id produced by the implementation, or an empty string when the command
// is a subscribe carrying the "replace" option for a subscription id that already has
// a routed handler; in that case the existing handler is reused and `handler_` is
// ignored.
//
// If the client is disconnected, any handlers registered under the command id, the
// subscription id (subscribe commands) and the query id (SOW commands) are removed
// before the DisconnectedException is rethrown.
7273  std::string executeAsyncNoResubscribe(Command& command_,
7274  MessageHandler handler_)
7275  {
7276  std::string id;
7277  try
7278  {
7279  if (command_.isSubscribe())
7280  {
7281  Message& message = command_.getMessage();
7282  Field subId = message.getSubscriptionId();
      // A replace only makes sense when both a sub id and options are present.
7283  bool useExistingHandler = !subId.empty() && !message.getOptions().empty() && message.getOptions().contains("replace", 7);
7284  if (useExistingHandler)
7285  {
7286  MessageHandler existingHandler;
7287  if (_body.get()._routes.getRoute(subId, existingHandler))
7288  {
7289  // we found an existing handler.
7290  _body.get().executeAsync(command_, existingHandler, false);
7291  return id; // empty string indicates existing
7292  }
7293  }
7294  }
      // Not a replace of a known subscription: use the caller's handler.
7295  id = _body.get().executeAsync(command_, handler_, false);
7296  }
7297  catch (const DisconnectedException&)
7298  {
7299  removeMessageHandler(command_.getMessage().getCommandId());
7300  if (command_.isSubscribe())
7301  {
7302  removeMessageHandler(command_.getMessage().getSubscriptionId());
7303  }
7304  if (command_.isSow())
7305  {
7306  removeMessageHandler(command_.getMessage().getQueryID());
7307  }
7308  throw;
7309  }
7310  return id;
7311  }
7312 
// Declaration only: executes `command_` and returns a MessageStream for its results.
// The definition lives outside this class body and is not visible in this section.
7325  MessageStream execute(Command& command_);
7326 
// Acknowledges the message identified by `topic_` and `bookmark_`, with optional
// `options_` (default NULL); all arguments are forwarded to the implementation's ack().
7335  void ack(Field& topic_, Field& bookmark_, const char* options_ = NULL)
7336  {
7337  _body.get().ack(topic_, bookmark_, options_);
7338  }
7339 
// Acknowledges `message_` by extracting its topic and bookmark fields and passing
// them, with optional `options_`, to the implementation's ack().
7347  void ack(Message& message_, const char* options_ = NULL)
7348  {
7349  _body.get().ack(message_.getTopic(), message_.getBookmark(), options_);
7350  }
// std::string convenience overload of ack(): wraps the topic and bookmark strings in
// temporary Field objects (pointer + length, no copy made here) for the duration of
// the call to the implementation's ack().
7359  void ack(const std::string& topic_, const std::string& bookmark_,
7360  const char* options_ = NULL)
7361  {
7362  _body.get().ack(Field(topic_.data(), topic_.length()), Field(bookmark_.data(), bookmark_.length()), options_);
7363  }
7364 
// Acknowledges a message for which auto-ack was deferred. Unlike ack(), this calls
// the implementation's internal _ack() directly; the checks that ack() performs
// before reaching _ack() (if any) are not visible in this block.
7370  void ackDeferredAutoAck(Field& topic_, Field& bookmark_, const char* options_ = NULL)
7371  {
7372  _body.get()._ack(topic_, bookmark_, options_);
7373  }
// Flushes pending acknowledgements by calling the implementation's flushAcks().
7383  void flushAcks(void)
7384  {
7385  _body.get().flushAcks();
7386  }
7387 
// Returns the auto-ack setting reported by the implementation.
7392  bool getAutoAck(void) const
7393  {
7394  return _body.get().getAutoAck();
7395  }
// Enables or disables auto-ack by forwarding the flag to the implementation.
7402  void setAutoAck(bool isAutoAckEnabled_)
7403  {
7404  _body.get().setAutoAck(isAutoAckEnabled_);
7405  }
// Returns the ack batch size reported by the implementation.
7410  unsigned getAckBatchSize(void) const
7411  {
7412  return _body.get().getAckBatchSize();
7413  }
// Sets the ack batch size by forwarding the value to the implementation. No
// validation is done in this wrapper.
7420  void setAckBatchSize(const unsigned ackBatchSize_)
7421  {
7422  _body.get().setAckBatchSize(ackBatchSize_);
7423  }
7424 
// Returns the ack timeout reported by the implementation.
7431  int getAckTimeout(void) const
7432  {
7433  return _body.get().getAckTimeout();
7434  }
// Sets the ack timeout after validating it against the current ack batch size.
// Throws UsageException when the timeout is zero while the batch size is greater
// than 1.
// NOTE(review): the check is `!ackTimeout_`, so a negative timeout passes even though
// the message says "must be > 0" — confirm whether negative values are meaningful.
7443  void setAckTimeout(const int ackTimeout_)
7444  {
7445  if (!ackTimeout_ && _body.get().getAckBatchSize() > 1)
7446  {
7447  throw UsageException("Ack timeout must be > 0 when ack batch size > 1");
7448  }
7449  _body.get().setAckTimeout(ackTimeout_);
7450  }
7451 
7452 
// Enables or disables retry-on-disconnect by forwarding the flag to the implementation.
7461  void setRetryOnDisconnect(bool isRetryOnDisconnect_)
7462  {
7463  _body.get().setRetryOnDisconnect(isRetryOnDisconnect_);
7464  }
7465 
// Returns the retry-on-disconnect setting reported by the implementation.
7470  bool getRetryOnDisconnect(void) const
7471  {
7472  return _body.get().getRetryOnDisconnect();
7473  }
7474 
// Sets the default max depth applied to MessageStreams created by this client.
// The stream-returning methods of this class skip applying it when the value is 0.
7479  void setDefaultMaxDepth(unsigned maxDepth_)
7480  {
7481  _body.get().setDefaultMaxDepth(maxDepth_);
7482  }
7483 
// Returns the default max depth reported by the implementation.
7488  unsigned getDefaultMaxDepth(void) const
7489  {
7490  return _body.get().getDefaultMaxDepth();
7491  }
7492 
// NOTE(review): the first signature line (return type, name and the filter_
// parameter) is not present in this listing. The body forwards filter_ and the
// opaque userData_ pointer to the implementation's setTransportFilterFunction() and
// returns its result.
7500  void* userData_)
7501  {
7502  return _body.get().setTransportFilterFunction(filter_, userData_);
7503  }
7504 
// NOTE(review): the first signature line (return type, name and the callback_
// parameter) is not present in this listing. The body forwards callback_ and the
// opaque userData_ pointer to the implementation's setThreadCreatedCallback() and
// returns its result.
7514  void* userData_)
7515  {
7516  return _body.get().setThreadCreatedCallback(callback_, userData_);
7517  }
7518 
// Hands `func_` and the opaque `userData_` pointer to the implementation's
// deferredExecution(). When and on which thread func_ runs is decided there and is
// not visible in this block.
7524  void deferredExecution(DeferredExecutionFunc func_, void* userData_)
7525  {
7526  _body.get().deferredExecution(func_, userData_);
7527  }
7531  };
7532 
// Invokes the handler stored in the LastChance slot of the global command-type
// handler table with `message`. The call is wrapped in
// AMPS_CALL_EXCEPTION_WRAPPER (macro defined elsewhere; presumably it routes
// exceptions from the handler to the exception listener — confirm).
7533  inline void
7534  ClientImpl::lastChance(AMPS::Message& message)
7535  {
7536  AMPS_CALL_EXCEPTION_WRAPPER(_globalCommandTypeHandlers[GlobalCommandTypeHandlers::LastChance].invoke(message));
7537  }
7538 
// Processes a persisted ack and returns how many consumers it was delivered to
// (0 means nobody used it).
//
// Three independent steps, in order:
//  1. Failed write: when the ack's reason/status indicates a failure and a failed
//     write handler is installed, notify it (replaying the stored message when a
//     publish store is available).
//  2. Publish store: when a publish store is set and the ack carries a sequence
//     number, discard stored messages up to that sequence.
//  3. Bookmark store: only if nothing above consumed the ack, and the ack carries a
//     subscription id that is still routed plus a bookmark, mark that bookmark as
//     persisted.
// Any std::exception escaping these steps is handed to AMPS_UNHANDLED_EXCEPTION.
7539  inline unsigned
7540  ClientImpl::persistedAck(AMPS::Message& message)
7541  {
7542  unsigned deliveries = 0;
7543  try
7544  {
7545  /*
7546  * Best Practice: If you don't care about the dupe acks that
7547  * occur during failover or rapid disconnect/reconnect, then just
7548  * ignore them. We could discard each duplicate from the
7549  * persisted store, but the storage costs of doing 1 record
7550  * discards is heavy. In most scenarios we'll just quickly blow
7551  * through the duplicates and get back to processing the
7552  * non-dupes.
7553  */
7554  const char* data = NULL;
7555  size_t len = 0;
7556  const char* status = NULL;
7557  size_t statusLen = 0;
7558  amps_handle messageHandle = message.getMessage();
      // The reason is classified by its length alone, and the status by length plus
      // first character, rather than by full string comparison.
7559  const size_t NotEntitled = 12, Duplicate = 9, Failure = 7;
7560  amps_message_get_field_value(messageHandle, AMPS_Reason, &data, &len);
7561  amps_message_get_field_value(messageHandle, AMPS_Status, &status, &statusLen);
7562  if (len == NotEntitled || len == Duplicate ||
7563  (statusLen == Failure && status[0] == 'f'))
7564  {
7565  if (_failedWriteHandler)
7566  {
7567  if (_publishStore.isValid())
7568  {
7569  amps_uint64_t sequence =
7570  amps_message_get_field_uint64(messageHandle, AMPS_Sequence);
      // data/len still hold the reason text here.
7571  FailedWriteStoreReplayer replayer(this, data, len);
7572  AMPS_CALL_EXCEPTION_WRAPPER(_publishStore.replaySingle(
7573  replayer, sequence));
7574  }
7575  else // Call the handler with what little we have
7576  {
      // NOTE(review): this function-local static is shared by every caller of this
      // inline function; confirm it cannot be reached from two receive threads at once.
7577  static Message emptyMessage;
7578  emptyMessage.setSequence(message.getSequence());
7579  AMPS_CALL_EXCEPTION_WRAPPER(
7580  _failedWriteHandler->failedWrite(emptyMessage,
7581  data, len));
7582  }
7583  ++deliveries;
7584  }
7585  }
7586  if (_publishStore.isValid())
7587  {
7588  // Ack for publisher will have sequence while
7589  // ack for bookmark subscribe won't
7590  amps_uint64_t seq = amps_message_get_field_uint64(messageHandle,
7591  AMPS_Sequence);
7592  if (seq > 0)
7593  {
7594  ++deliveries;
7595  AMPS_CALL_EXCEPTION_WRAPPER(_publishStore.discardUpTo(seq));
7596  }
7597  }
7598 
      // Only treat the ack as a bookmark-subscription ack if no publisher path used it.
7599  if (!deliveries && _bookmarkStore.isValid())
7600  {
      // data/len are reused: from here on they hold the subscription id.
7601  amps_message_get_field_value(messageHandle, AMPS_SubscriptionId,
7602  &data, &len);
7603  if (len > 0)
7604  {
7605  Message::Field subId(data, len);
7606  const char* bookmarkData = NULL;
7607  size_t bookmarkLen = 0;
7608  amps_message_get_field_value(messageHandle,
7609  AMPS_Bookmark,
7610  &bookmarkData,
7611  &bookmarkLen);
7612  // Everything is there and not unsubscribed AC-912
7613  if (bookmarkLen > 0 && _routes.hasRoute(subId))
7614  {
7615  ++deliveries;
7616  _bookmarkStore.persisted(subId, Message::Field(bookmarkData, bookmarkLen));
7617  }
7618  }
7619  }
7620  }
7621  catch (std::exception& ex)
7622  {
7623  AMPS_UNHANDLED_EXCEPTION(ex);
7624  }
7625  return deliveries;
7626  }
7627 
// Processes a processed ack: matches it by command id against the pending entries in
// _ackMap and, on a match, fills in the AckResponse and wakes all waiters on _lock.
// Returns 1 when a pending entry was found, 0 otherwise.
//
// Locking: _lock is taken for the whole function; _ackMapLock is taken only around
// the map lookup/erase, nested inside _lock.
7628  inline unsigned
7629  ClientImpl::processedAck(Message& message)
7630  {
7631  unsigned deliveries = 0;
7632  AckResponse ack;
7633  const char* data = NULL;
7634  size_t len = 0;
7635  amps_handle messageHandle = message.getMessage();
7636  amps_message_get_field_value(messageHandle, AMPS_CommandId, &data, &len);
7637  Lock<Mutex> l(_lock);
7638  if (data && len)
7639  {
7640  Lock<Mutex> guard(_ackMapLock);
7641  AckMap::iterator i = _ackMap.find(std::string(data, len));
7642  if (i != _ackMap.end())
7643  {
7644  ++deliveries;
      // The entry is copied out before erase. NOTE(review): the updates below reach the
      // waiter only if AckResponse copies share state — confirm.
7645  ack = i->second;
7646  _ackMap.erase(i);
7647  }
7648  }
7649  if (deliveries)
7650  {
      // data/len are reused as scratch for each field read from the ack message.
7651  amps_message_get_field_value(messageHandle, AMPS_Status, &data, &len);
7652  ack.setStatus(data, len);
7653  amps_message_get_field_value(messageHandle, AMPS_Reason, &data, &len);
7654  ack.setReason(data, len);
7655  amps_message_get_field_value(messageHandle, AMPS_UserId, &data, &len);
7656  ack.setUsername(data, len);
7657  amps_message_get_field_value(messageHandle, AMPS_Password, &data, &len);
7658  ack.setPassword(data, len);
7659  amps_message_get_field_value(messageHandle, AMPS_Version, &data, &len);
7660  ack.setServerVersion(data, len);
7661  amps_message_get_field_value(messageHandle, AMPS_Options, &data, &len);
7662  ack.setOptions(data, len);
7663  // This sets bookmark, nameHashValue, and sequenceNo
7664  ack.setBookmark(message.getBookmark());
7665  ack.setResponded();
      // Wake every thread waiting on _lock so the one waiting for this ack can proceed.
7666  _lock.signalAll();
7667  }
7668  return deliveries;
7669  }
7670 
7671  inline void
7672  ClientImpl::checkAndSendHeartbeat(bool force)
7673  {
7674  if (force || _heartbeatTimer.check())
7675  {
7676  _heartbeatTimer.start();
7677  try
7678  {
7679  sendWithoutRetry(_beatMessage);
7680  }
7681  catch (const AMPSException&)
7682  {
7683  ;
7684  }
7685  }
7686  }
7687 
7688  inline ConnectionInfo ClientImpl::getConnectionInfo() const
7689  {
7690  ConnectionInfo info;
7691  std::ostringstream writer;
7692 
7693  info["client.uri"] = _lastUri;
7694  info["client.name"] = _name;
7695  info["client.username"] = _username;
7696  if (_publishStore.isValid())
7697  {
7698  writer << _publishStore.unpersistedCount();
7699  info["publishStore.unpersistedCount"] = writer.str();
7700  writer.clear();
7701  writer.str("");
7702  }
7703 
7704  return info;
7705  }
7706 
// Static callback that receives every message for a client; userData_ is the
// owning ClientImpl. Deferred executions are processed first on every call.
// A null messageHandle_ only runs the periodic work (queue ack check and
// heartbeat). Otherwise the message is dispatched by command type:
//   - SOW / group_begin / group_end: delivered to the route for the query id.
//   - publish / delta publish / OOF: delivered once per subscription id listed
//     in the message, with bookmark store and auto-ack handling.
//   - ack: global ack handler, persisted/processed ack processing, then routes.
//   - heartbeat: global heartbeat handler, then a forced heartbeat reply.
//   - anything else with a command id: delivered to the route for that id.
// Messages that nothing accepted are passed to lastChance().
// Every path visible here returns AMPS_E_OK.
7707  inline amps_result
7708  ClientImpl::ClientImplMessageHandler(amps_handle messageHandle_, void* userData_)
7709  {
7710  const unsigned SOWMask = Message::Command::SOW | Message::Command::GroupBegin | Message::Command::GroupEnd;
7711  const unsigned PublishMask = Message::Command::OOF | Message::Command::Publish | Message::Command::DeltaPublish;
7712  ClientImpl* me = (ClientImpl*) userData_;
7713  AMPS_CALL_EXCEPTION_WRAPPER_2(me, me->processDeferredExecutions());
// No message: only perform the periodic maintenance work.
7714  if (!messageHandle_)
7715  {
7716  if (me->_queueAckTimeout)
7717  {
7718  me->checkQueueAcks();
7719  }
7720  me->checkAndSendHeartbeat();
7721  return AMPS_E_OK;
7722  }
7723 
// Wrap the raw handle in the client's reusable read Message.
7724  me->_readMessage.replace(messageHandle_);
7725  Message& message = me->_readMessage;
7726  Message::Command::Type commandType = message.getCommandEnum();
7727  if (commandType & SOWMask)
7728  {
7729 #if 0 // Not currently implemented, to avoid an extra branch in delivery
7730  // A small cheat here to get the right handler, using knowledge of the
7731  // Command values of SOW (8), GroupBegin (8192), and GroupEnd (16384)
7732  // and their GlobalCommandTypeHandlers values 1, 2, 3.
7733  AMPS_CALL_EXCEPTION_WRAPPER_2(me,
7734  me->_globalCommandTypeHandlers[1 + (commandType / 8192)].invoke(message));
7735 #endif
7736  AMPS_CALL_EXCEPTION_WRAPPER_2(me, me->_routes.deliverData(message,
7737  message.getQueryID()));
7738  }
7739  else if (commandType & PublishMask)
7740  {
7741 #if 0 // Not currently implemented, to avoid an extra branch in delivery
7742  AMPS_CALL_EXCEPTION_WRAPPER_2(me,
7743  me->_globalCommandTypeHandlers[(commandType == Message::Command::Publish ?
7744  GlobalCommandTypeHandlers::Publish :
7745  GlobalCommandTypeHandlers::OOF)].invoke(message));
7746 #endif
7747  const char* subIds = NULL;
7748  size_t subIdsLen = 0;
7749  // Publish command, send to subscriptions
7750  amps_message_get_field_value(messageHandle_, AMPS_SubscriptionIds,
7751  &subIds, &subIdsLen);
7752  size_t subIdCount = me->_routes.parseRoutes(AMPS::Field(subIds, subIdsLen), me->_routeCache);
7753  for (size_t i = 0; i < subIdCount; ++i)
7754  {
7755  MessageRouter::RouteCache::value_type& lookupResult = me->_routeCache[i];
7756  MessageHandler& handler = lookupResult.handler;
7757  if (handler.isValid())
7758  {
// Set the single subscription id for this delivery from the parsed list.
7759  amps_message_set_field_value(messageHandle_,
7760  AMPS_SubscriptionId,
7761  subIds + lookupResult.idOffset,
7762  lookupResult.idLength);
7763  Message::Field bookmark = message.getBookmark();
// A non-empty lease period is what marks the message as a queue message here.
7764  bool isMessageQueue = message.getLeasePeriod().len() != 0;
7765  bool isAutoAck = me->_isAutoAckEnabled;
7766 
// Bookmarked, non-queue message with a bookmark store: either report it as a
// duplicate (already discarded) or log it and deliver it.
7767  if (!isMessageQueue && !bookmark.empty() &&
7768  me->_bookmarkStore.isValid())
7769  {
7770  if (me->_bookmarkStore.isDiscarded(me->_readMessage))
7771  {
7772  //Call duplicate message handler in handlers map
7773  if (me->_globalCommandTypeHandlers[GlobalCommandTypeHandlers::DuplicateMessage].isValid())
7774  {
7775  AMPS_CALL_EXCEPTION_WRAPPER_2(me, me->_globalCommandTypeHandlers[GlobalCommandTypeHandlers::DuplicateMessage].invoke(message));
7776  }
7777  }
7778  else
7779  {
7780  me->_bookmarkStore.log(me->_readMessage);
7781  AMPS_CALL_EXCEPTION_WRAPPER_2(me,
7782  handler.invoke(message));
7783  }
7784  }
7785  else
7786  {
// Queue message with auto-ack: ack after the handler returns, or send a
// "cancel" ack if the handler throws, unless the message asked to be ignored.
7787  if (isMessageQueue && isAutoAck)
7788  {
7789  try
7790  {
7791  AMPS_CALL_EXCEPTION_WRAPPER_STREAM_FULL_2(me, handler.invoke(message));
7792  if (!message.getIgnoreAutoAck())
7793  {
7794  AMPS_CALL_EXCEPTION_WRAPPER_2(me,
7795  me->_ack(message.getTopic(), message.getBookmark()));
7796  }
7797  }
7798  catch (std::exception& ex)
7799  {
7800  if (!message.getIgnoreAutoAck())
7801  {
7802  AMPS_CALL_EXCEPTION_WRAPPER_2(me,
7803  me->_ack(message.getTopic(), message.getBookmark(), "cancel"));
7804  }
7805  AMPS_UNHANDLED_EXCEPTION_2(me, ex);
7806  }
7807  }
7808  else
7809  {
7810  AMPS_CALL_EXCEPTION_WRAPPER_2(me,
7811  handler.invoke(message));
7812  }
7813  }
7814  }
7815  else
7816  {
7817  me->lastChance(message);
7818  }
7819  } // for (subidsEnd)
7820  }
7821  else if (commandType == Message::Command::Ack)
7822  {
7823  AMPS_CALL_EXCEPTION_WRAPPER_2(me,
7824  me->_globalCommandTypeHandlers[GlobalCommandTypeHandlers::Ack].invoke(message));
7825  unsigned ackType = message.getAckTypeEnum();
7826  unsigned deliveries = 0U;
// Only persisted and processed acks get client-internal processing; all ack
// types are then offered to the routes.
7827  switch (ackType)
7828  {
7829  case Message::AckType::Persisted:
7830  deliveries += me->persistedAck(message);
7831  break;
7832  case Message::AckType::Processed: // processed
7833  deliveries += me->processedAck(message);
7834  break;
7835  }
7836  AMPS_CALL_EXCEPTION_WRAPPER_2(me, deliveries += me->_routes.deliverAck(message, ackType));
7837  if (deliveries == 0)
7838  {
7839  me->lastChance(message);
7840  }
7841  }
7842  else if (commandType == Message::Command::Heartbeat)
7843  {
7844  AMPS_CALL_EXCEPTION_WRAPPER_2(me,
7845  me->_globalCommandTypeHandlers[GlobalCommandTypeHandlers::Heartbeat].invoke(message));
// With a non-zero heartbeat timeout, answer immediately; this branch returns
// without the trailing checkAndSendHeartbeat() call below.
7846  if (me->_heartbeatTimer.getTimeout() != 0.0) // -V550
7847  {
7848  me->checkAndSendHeartbeat(true);
7849  }
7850  else
7851  {
7852  me->lastChance(message);
7853  }
7854  return AMPS_E_OK;
7855  }
7856  else if (!message.getCommandId().empty())
7857  {
7858  unsigned deliveries = 0U;
7859  try
7860  {
// Retry delivery for as long as the handler reports a full stream and the
// client is still connected, attempting a heartbeat between tries.
7861  while (me->_connected) // Keep sending heartbeats when stream is full
7862  {
7863  try
7864  {
7865  deliveries = me->_routes.deliverData(message, message.getCommandId());
7866  break;
7867  }
7868 #ifdef _WIN32
7869  catch (MessageStreamFullException&)
7870 #else
7871  catch (MessageStreamFullException& ex_)
7872 #endif
7873  {
7874  try
7875  {
7876  me->checkAndSendHeartbeat(false);
7877  }
7878 #ifdef _WIN32
7879  catch (std::exception&)
7880 #else
7881  catch (std::exception& ex_)
7882 #endif
7883  {
7884  ;
7885  }
7886  }
7887  }
7888  }
7889  catch (std::exception& ex_)
7890  {
// Report to the exception listener; anything the listener throws is dropped.
7891  try
7892  {
7893  me->_exceptionListener->exceptionThrown(ex_);
7894  }
7895  catch (...)
7896  {
7897  ;
7898  }
7899  }
7900  if (deliveries == 0)
7901  {
7902  me->lastChance(message);
7903  }
7904  }
7905  me->checkAndSendHeartbeat();
7906  return AMPS_E_OK;
7907  }
7908 
7909  inline void
7910  ClientImpl::ClientImplPreDisconnectHandler(amps_handle /*client*/, unsigned failedConnectionVersion, void* userData)
7911  {
7912  ClientImpl* me = (ClientImpl*) userData;
7913  //Client wrapper(me);
7914  // Go ahead and signal any waiters if they are around...
7915  me->clearAcks(failedConnectionVersion);
7916  }
7917 
// Static callback run when the connection is lost; userData is the owning
// ClientImpl. Holding the client lock, it broadcasts Disconnected (if the
// client was connected), then loops: mark the client disconnected, invoke the
// user's disconnect handler with the lock released, and check the outcome.
// Returns AMPS_E_DISCONNECTED when the client is still not connected after
// the handler ran, AMPS_E_OK once reconnected (and resubscribed, when a
// subscription manager is set), or AMPS_E_RETRY when resubscribing throws a
// non-AMPS exception.
7918  inline amps_result
7919  ClientImpl::ClientImplDisconnectHandler(amps_handle /*client*/, void* userData)
7920  {
7921  ClientImpl* me = (ClientImpl*) userData;
7922  Lock<Mutex> l(me->_lock);
7923  Client wrapper(me, false);
7924  if (me->_connected)
7925  {
7926  me->broadcastConnectionStateChanged(ConnectionStateListener::Disconnected);
7927  }
7928  while (true)
7929  {
// NOTE(review): presumably flags that HA subscribes should not be attempted
// for the duration of this iteration -- confirm against AtomicFlagFlip.
7930  AtomicFlagFlip subFlip(&me->_badTimeToHASubscribe);
7931  bool retryInProgress = false;
7932  try
7933  {
7934  me->_connected = false;
7935  me->_lock.signalAll();
7936  // Have to release the lock here or receive thread can't
7937  // invoke the message handler.
7938  Unlock<Mutex> unlock(me->_lock);
7939  me->_disconnectHandler.invoke(wrapper);
7940  }
7941 #ifdef _WIN32
7942  catch (const RetryOperationException&)
7943 #else
7944  catch (const RetryOperationException& ex)
7945 #endif
7946  {
7947  retryInProgress = true;
7948  }
7949  catch (const std::exception& ex)
7950  {
7951  AMPS_UNHANDLED_EXCEPTION_2(me, ex);
7952  }
7953  me->_lock.signalAll();
7954 
// Still disconnected after the handler ran: report why and give up.
7955  if (!me->_connected)
7956  {
7957  if (retryInProgress)
7958  {
7959  AMPS_UNHANDLED_EXCEPTION_2(me, RetryOperationException("Reconnect in progress."));
7960  }
7961  else
7962  {
7963  me->broadcastConnectionStateChanged(ConnectionStateListener::Shutdown);
7964  AMPS_UNHANDLED_EXCEPTION_2(me, DisconnectedException("Reconnect failed."));
7965  }
7966  return AMPS_E_DISCONNECTED;
7967  }
7968  try
7969  {
7970  // Resubscribe
7971  if (me->_subscriptionManager)
7972  {
7973  {
7974  // Have to release the lock here or receive thread can't
7975  // invoke the message handler.
7976  Unlock<Mutex> unlock(me->_lock);
7977  me->_subscriptionManager->resubscribe(wrapper);
7978  }
7979  me->broadcastConnectionStateChanged(ConnectionStateListener::Resubscribed);
7980  }
7981  return AMPS_E_OK;
7982  }
// An AMPSException while resubscribing is reported but does not return, so
// the enclosing while loop runs the disconnect handler again.
7983  catch (const AMPSException& subEx)
7984  {
7985  AMPS_UNHANDLED_EXCEPTION_2(me, subEx);
7986  }
7987  catch (const std::exception& subEx)
7988  {
7989  AMPS_UNHANDLED_EXCEPTION_2(me, subEx);
7990  return AMPS_E_RETRY;
7991  }
7992  catch (...)
7993  {
7994  return AMPS_E_RETRY;
7995  }
7996  }
7997  return AMPS_E_RETRY;
7998  }
7999 
8000  inline const char*
8001  ClientImpl::ClientImplGetHttpPreflightMessage(void* userData_)
8002  {
8003  ClientImpl* me = (ClientImpl*)userData_;
8004  std::ostringstream os;
8005  // [transport]://[user[:password]@][host]:port[/path][?uri_params]
8006  // firstColon is after transport
8007  size_t firstColon = me->_lastUri.find(':');
8008  // pathEnd is start of uri_params or npos
8009  size_t pathEnd = me->_lastUri.find('?');
8010  // lastColon separates host and port, last before pathEnd
8011  size_t lastColon = me->_lastUri.rfind(':', pathEnd);
8012  // at ends user/password and precedes host
8013  size_t at = me->_lastUri.rfind('@', lastColon);
8014  // hostStart is either after at or following firstColon ://
8015  size_t hostStart = at == std::string::npos ? firstColon + 3 : at + 1;
8016  size_t hostLen = lastColon - hostStart;
8017  // pathStart follows port
8018  size_t pathStart = me->_lastUri.find('/', lastColon);
8019  size_t pathLen = pathEnd;
8020  if (pathEnd != std::string::npos)
8021  {
8022  pathLen = pathEnd - pathStart;
8023  }
8024  os << "GET " << me->_lastUri.substr(pathStart, pathLen)
8025  << " HTTP/1.1\r\nHost: " << me->_lastUri.substr(hostStart, hostLen)
8026  << "\r\nConnection: upgrade\r\nUpgrade: "
8027  << me->_lastUri.substr(0, firstColon) << "\r\n";
8028  for (auto header : me->_httpPreflightHeaders)
8029  {
8030  os << header << "\r\n";
8031  }
8032  os << "\r\n";
8033  me->_preflightMessage = os.str();
8034  return me->_preflightMessage.c_str();
8035  }
8036 
8037  class FIX
8038  {
8039  const char* _data;
8040  size_t _len;
8041  char _fieldSep;
8042  public:
8043  class iterator
8044  {
8045  const char* _data;
8046  size_t _len;
8047  size_t _pos;
8048  char _fieldSep;
8049  iterator(const char* data_, size_t len_, size_t pos_, char fieldSep_)
8050  : _data(data_), _len(len_), _pos(pos_), _fieldSep(fieldSep_)
8051  {
8052  while (_pos != _len && _data[_pos] == _fieldSep)
8053  {
8054  ++_pos;
8055  }
8056  }
8057  public:
8058  typedef void* difference_type;
8059  typedef std::forward_iterator_tag iterator_category;
8060  typedef std::pair<Message::Field, Message::Field> value_type;
8061  typedef value_type* pointer;
8062  typedef value_type& reference;
8063  bool operator==(const iterator& rhs) const
8064  {
8065  return _pos == rhs._pos;
8066  }
8067  bool operator!=(const iterator& rhs) const
8068  {
8069  return _pos != rhs._pos;
8070  }
8071  iterator& operator++()
8072  {
8073  // Skip through the data
8074  while (_pos != _len && _data[_pos] != _fieldSep)
8075  {
8076  ++_pos;
8077  }
8078  // Skip through any field separators
8079  while (_pos != _len && _data[_pos] == _fieldSep)
8080  {
8081  ++_pos;
8082  }
8083  return *this;
8084  }
8085 
8086  value_type operator*() const
8087  {
8088  value_type result;
8089  size_t i = _pos, keyLength = 0, valueStart = 0, valueLength = 0;
8090  for (; i < _len && _data[i] != '='; ++i)
8091  {
8092  ++keyLength;
8093  }
8094 
8095  result.first.assign(_data + _pos, keyLength);
8096 
8097  if (i < _len && _data[i] == '=')
8098  {
8099  ++i;
8100  valueStart = i;
8101  for (; i < _len && _data[i] != _fieldSep; ++i)
8102  {
8103  valueLength++;
8104  }
8105  }
8106  result.second.assign(_data + valueStart, valueLength);
8107  return result;
8108  }
8109 
8110  friend class FIX;
8111  };
8112  class reverse_iterator
8113  {
8114  const char* _data;
8115  size_t _len;
8116  const char* _pos;
8117  char _fieldSep;
8118  public:
8119  typedef std::pair<Message::Field, Message::Field> value_type;
8120  reverse_iterator(const char* data, size_t len, const char* pos, char fieldsep)
8121  : _data(data), _len(len), _pos(pos), _fieldSep(fieldsep)
8122  {
8123  if (_pos)
8124  {
8125  // skip past meaningless trailing fieldseps
8126  while (_pos >= _data && *_pos == _fieldSep)
8127  {
8128  --_pos;
8129  }
8130  while (_pos > _data && *_pos != _fieldSep)
8131  {
8132  --_pos;
8133  }
8134  // if we stopped before the 0th character, it's because
8135  // it's a field sep. advance one to point to the first character
8136  // of a key.
8137  if (_pos > _data || (_pos == _data && *_pos == _fieldSep))
8138  {
8139  ++_pos;
8140  }
8141  if (_pos < _data)
8142  {
8143  _pos = 0;
8144  }
8145  }
8146  }
8147  bool operator==(const reverse_iterator& rhs) const
8148  {
8149  return _pos == rhs._pos;
8150  }
8151  bool operator!=(const reverse_iterator& rhs) const
8152  {
8153  return _pos != rhs._pos;
8154  }
8155  reverse_iterator& operator++()
8156  {
8157  if (_pos == _data)
8158  {
8159  _pos = 0;
8160  }
8161  else
8162  {
8163  // back up 1 to a field separator
8164  --_pos;
8165  // keep backing up through field separators
8166  while (_pos >= _data && *_pos == _fieldSep)
8167  {
8168  --_pos;
8169  }
8170  // now back up to the beginning of this field
8171  while (_pos > _data && *_pos != _fieldSep)
8172  {
8173  --_pos;
8174  }
8175  if (_pos > _data || (_pos == _data && *_pos == _fieldSep))
8176  {
8177  ++_pos;
8178  }
8179  if (_pos < _data)
8180  {
8181  _pos = 0;
8182  }
8183  }
8184  return *this;
8185  }
8186  value_type operator*() const
8187  {
8188  value_type result;
8189  size_t keyLength = 0, valueStart = 0, valueLength = 0;
8190  size_t i = (size_t)(_pos - _data);
8191  for (; i < _len && _data[i] != '='; ++i)
8192  {
8193  ++keyLength;
8194  }
8195  result.first.assign(_pos, keyLength);
8196  if (i < _len && _data[i] == '=')
8197  {
8198  ++i;
8199  valueStart = i;
8200  for (; i < _len && _data[i] != _fieldSep; ++i)
8201  {
8202  valueLength++;
8203  }
8204  }
8205  result.second.assign(_data + valueStart, valueLength);
8206  return result;
8207  }
8208  };
8209  FIX(const Message::Field& data, char fieldSeparator = 1)
8210  : _data(data.data()), _len(data.len()),
8211  _fieldSep(fieldSeparator)
8212  {
8213  }
8214 
8215  FIX(const char* data, size_t len, char fieldSeparator = 1)
8216  : _data(data), _len(len), _fieldSep(fieldSeparator)
8217  {
8218  }
8219 
8220  iterator begin() const
8221  {
8222  return iterator(_data, _len, 0, _fieldSep);
8223  }
8224  iterator end() const
8225  {
8226  return iterator(_data, _len, _len, _fieldSep);
8227  }
8228 
8229 
8230  reverse_iterator rbegin() const
8231  {
8232  return reverse_iterator(_data, _len, _data + (_len - 1), _fieldSep);
8233  }
8234 
8235  reverse_iterator rend() const
8236  {
8237  return reverse_iterator(_data, _len, 0, _fieldSep);
8238  }
8239  };
8240 
8241 
8254 
/// Builds a FIX-style message by appending "tag=value" pairs, each followed
/// by the field separator character.
/// \tparam T Type of the tag; it is written with operator<<.
template <class T>
class _FIXBuilder
{
  std::stringstream _data;
  char _fs;
public:
  /// \param fieldSep_ Separator written after every value (default 0x01).
  _FIXBuilder(char fieldSep_ = (char)1) : _fs(fieldSep_) {;}

  /// Appends a tag and a value taken from part of a character buffer.
  /// \param tag The tag to write.
  /// \param value Buffer that contains the value.
  /// \param offset Offset in \p value at which the value starts.
  /// \param length Number of bytes of the value to write.
  void append(const T& tag, const char* value, size_t offset, size_t length)
  {
    _data << tag << '=';
    _data.write(value + offset, (std::streamsize)length);
    _data << _fs;
  }
  /// Appends a tag and a string value.
  /// \param tag The tag to write.
  /// \param value The value to write.
  void append(const T& tag, const std::string& value)
  {
    _data << tag << '=' << value << _fs;
  }

  /// Returns the message built so far.
  std::string getString() const
  {
    return _data.str();
  }
  operator std::string() const
  {
    return _data.str();
  }

  /// Discards everything appended so far. The stream state is cleared as
  /// well, so that an earlier stream error does not block later appends.
  void reset()
  {
    _data.str(std::string());
    _data.clear();
  }
};
8308 
8312 
8314 
8318 
8320 
8321 
8329 
8331  {
8332  char _fs;
8333  public:
8338  FIXShredder(char fieldSep_ = (char)1) : _fs(fieldSep_) {;}
8339 
8342  typedef std::map<Message::Field, Message::Field> map_type;
8343 
8349  map_type toMap(const Message::Field& data)
8350  {
8351  FIX fix(data, _fs);
8352  map_type retval;
8353  for (FIX::iterator a = fix.begin(); a != fix.end(); ++a)
8354  {
8355  retval.insert(*a);
8356  }
8357 
8358  return retval;
8359  }
8360  };
8361 
8362 #define AMPS_MESSAGE_STREAM_CACHE_MAX 128
8363  class MessageStreamImpl : public AMPS::RefBody, AMPS::ConnectionStateListener
8364  {
8365  Mutex _lock;
8366  std::deque<Message> _q;
8367  std::deque<Message> _cache;
8368  std::string _commandId;
8369  std::string _subId;
8370  std::string _queryId;
8371  Client _client;
8372  unsigned _timeout;
8373  unsigned _maxDepth;
8374  unsigned _requestedAcks;
8375  size_t _cacheMax;
8376  Message::Field _previousTopic;
8377  Message::Field _previousBookmark;
8378  typedef enum : unsigned int { Unset = 0x0, Running = 0x10, Subscribe = 0x11, SOWOnly = 0x12, AcksOnly = 0x13, Conflate = 0x14, Closed = 0x1, Disconnected = 0x2 } State;
8379 #if __cplusplus >= 201100L || _MSC_VER >= 1900
8380  std::atomic<State> _state;
8381 #else
8382  volatile State _state;
8383 #endif
8384  typedef std::map<std::string, Message*> SOWKeyMap;
8385  SOWKeyMap _sowKeyMap;
8386  public:
8387  MessageStreamImpl(const Client& client_)
8388  : _client(client_),
8389  _timeout(0),
8390  _maxDepth((unsigned)~0),
8391  _requestedAcks(0),
8392  _cacheMax(AMPS_MESSAGE_STREAM_CACHE_MAX),
8393  _state(Unset)
8394  {
8395  if (_client.isValid())
8396  {
8397  _client.addConnectionStateListener(this);
8398  }
8399  }
8400 
8401  MessageStreamImpl(ClientImpl* client_)
8402  : _client(client_),
8403  _timeout(0),
8404  _maxDepth((unsigned)~0),
8405  _requestedAcks(0),
8406  _state(Unset)
8407  {
8408  if (_client.isValid())
8409  {
8410  _client.addConnectionStateListener(this);
8411  }
8412  }
8413 
8414  ~MessageStreamImpl()
8415  {
8416  }
8417 
8418  virtual void destroy()
8419  {
8420  try
8421  {
8422  close();
8423  }
8424  catch (std::exception& e)
8425  {
8426  try
8427  {
8428  if (_client.isValid())
8429  {
8430  _client.getExceptionListener().exceptionThrown(e);
8431  }
8432  }
8433  catch (...) {/*Ignore exception listener exceptions*/} // -V565
8434  }
8435  if (_client.isValid())
8436  {
8437  _client.removeConnectionStateListener(this);
8438  Client c = _client;
8439  _client = Client((ClientImpl*)NULL);
8440  c.deferredExecution(MessageStreamImpl::destroyer, this);
8441  }
8442  else
8443  {
8444  delete this;
8445  }
8446  }
8447 
8448  static void destroyer(void* vpMessageStreamImpl_)
8449  {
8450  delete ((MessageStreamImpl*)vpMessageStreamImpl_);
8451  }
8452 
8453  void setSubscription(const std::string& subId_,
8454  const std::string& commandId_ = "",
8455  const std::string& queryId_ = "")
8456  {
8457  Lock<Mutex> lock(_lock);
8458  _subId = subId_;
8459  if (!commandId_.empty() && commandId_ != subId_)
8460  {
8461  _commandId = commandId_;
8462  }
8463  if (!queryId_.empty() && queryId_ != subId_ && queryId_ != commandId_)
8464  {
8465  _queryId = queryId_;
8466  }
8467  // It's possible to disconnect between creation/registration and here.
8468  if (Disconnected == _state)
8469  {
8470  return;
8471  }
8472  assert(Unset == _state);
8473  _state = Subscribe;
8474  }
8475 
8476  void setSOWOnly(const std::string& commandId_,
8477  const std::string& queryId_ = "")
8478  {
8479  Lock<Mutex> lock(_lock);
8480  _commandId = commandId_;
8481  if (!queryId_.empty() && queryId_ != commandId_)
8482  {
8483  _queryId = queryId_;
8484  }
8485  // It's possible to disconnect between creation/registration and here.
8486  if (Disconnected == _state)
8487  {
8488  return;
8489  }
8490  assert(Unset == _state);
8491  _state = SOWOnly;
8492  }
8493 
8494  void setStatsOnly(const std::string& commandId_,
8495  const std::string& queryId_ = "")
8496  {
8497  Lock<Mutex> lock(_lock);
8498  _commandId = commandId_;
8499  if (!queryId_.empty() && queryId_ != commandId_)
8500  {
8501  _queryId = queryId_;
8502  }
8503  // It's possible to disconnect between creation/registration and here.
8504  if (Disconnected == _state)
8505  {
8506  return;
8507  }
8508  assert(Unset == _state);
8509  _state = AcksOnly;
8510  _requestedAcks = Message::AckType::Stats;
8511  }
8512 
8513  void setAcksOnly(const std::string& commandId_, unsigned acks_)
8514  {
8515  Lock<Mutex> lock(_lock);
8516  _commandId = commandId_;
8517  // It's possible to disconnect between creation/registration and here.
8518  if (Disconnected == _state)
8519  {
8520  return;
8521  }
8522  assert(Unset == _state);
8523  _state = AcksOnly;
8524  _requestedAcks = acks_;
8525  }
8526 
8527  void connectionStateChanged(ConnectionStateListener::State state_)
8528  {
8529  Lock<Mutex> lock(_lock);
8530  if (state_ == AMPS::ConnectionStateListener::Disconnected)
8531  {
8532  _state = Disconnected;
8533  close();
8534  }
8535  else if (state_ == AMPS::ConnectionStateListener::Connected
8536  && _commandId.empty()
8537  && _subId.empty()
8538  && _queryId.empty())
8539  {
8540  // AC-1331 Reconnect before command was sent, so Unset
8541  _state = Unset;
8542  }
8543  _lock.signalAll();
8544  }
8545 
8546  void timeout(unsigned timeout_)
8547  {
8548  _timeout = timeout_;
8549  }
8550  void conflate(void)
8551  {
8552  if (_state == Subscribe)
8553  {
8554  _state = Conflate;
8555  }
8556  }
8557  void maxDepth(unsigned maxDepth_)
8558  {
8559  if (maxDepth_)
8560  {
8561  _maxDepth = maxDepth_;
8562  }
8563  else
8564  {
8565  _maxDepth = (unsigned)~0;
8566  }
8567  }
8568  unsigned getMaxDepth(void) const
8569  {
8570  return _maxDepth;
8571  }
8572  unsigned getDepth(void) const
8573  {
8574  return (unsigned)(_q.size());
8575  }
8576 
8577  bool next(Message& current_)
8578  {
8579  Lock<Mutex> lock(_lock);
8580  if (!_previousTopic.empty() && !_previousBookmark.empty())
8581  {
8582  try
8583  {
8584  if (_client.isValid())
8585  {
8586  _client.ackDeferredAutoAck(_previousTopic, _previousBookmark);
8587  }
8588  }
8589 #ifdef _WIN32
8590  catch (AMPSException&)
8591 #else
8592  catch (AMPSException& e)
8593 #endif
8594  {
8595  current_.invalidate();
8596  _previousTopic.clear();
8597  _previousBookmark.clear();
8598  return false;
8599  }
8600  _previousTopic.clear();
8601  _previousBookmark.clear();
8602  }
8603  // Don't wait to wait more than 1s at a time
8604  long minWaitTime = (_timeout && _timeout < 1000) ? _timeout : 1000;
8605  Timer timer((double)_timeout);
8606  timer.start();
8607  while (_q.empty() && _state & Running)
8608  {
8609  // Using timeout so python can interrupt
8610  _lock.wait(minWaitTime);
8611  {
8612  Unlock<Mutex> unlck(_lock);
8613  amps_invoke_waiting_function();
8614  }
8615  if (_timeout)
8616  {
8617  // In case we woke up early, see how much longer to wait
8618  if (timer.checkAndGetRemaining(&minWaitTime))
8619  {
8620  // No time left
8621  break;
8622  }
8623  // Adjust next wait time
8624  minWaitTime = (minWaitTime < 1000) ? minWaitTime : 1000;
8625  }
8626  }
8627  if (current_.isValid() && _cache.size() < _cacheMax)
8628  {
8629  current_.reset();
8630  _cache.push_back(current_);
8631  }
8632  if (!_q.empty())
8633  {
8634  current_ = _q.front();
8635  if (_q.size() == _maxDepth)
8636  {
8637  _lock.signalAll();
8638  }
8639  _q.pop_front();
8640  if (_state == Conflate)
8641  {
8642  std::string sowKey = current_.getSowKey();
8643  if (sowKey.length())
8644  {
8645  _sowKeyMap.erase(sowKey);
8646  }
8647  }
8648  else if (_state == AcksOnly)
8649  {
8650  _requestedAcks &= ~(current_.getAckTypeEnum());
8651  }
8652  if ((_state == AcksOnly && _requestedAcks == 0) ||
8653  (_state == SOWOnly && current_.getCommand() == "group_end"))
8654  {
8655  _state = Closed;
8656  }
8657  else if (current_.isValid()
8658  && current_.getCommandEnum() == Message::Command::Publish
8659  && _client.isValid() && _client.getAutoAck()
8660  && !current_.getLeasePeriod().empty()
8661  && !current_.getBookmark().empty())
8662  {
8663  _previousTopic = current_.getTopic().deepCopy();
8664  _previousBookmark = current_.getBookmark().deepCopy();
8665  }
8666  return true;
8667  }
8668  if (_state == Disconnected)
8669  {
8670  throw DisconnectedException("Connection closed.");
8671  }
8672  current_.invalidate();
8673  if (_state == Closed)
8674  {
8675  return false;
8676  }
8677  return _timeout != 0;
8678  }
8679  void close(void)
8680  {
8681  if (_client.isValid())
8682  {
8683  if (_state == SOWOnly || _state == Subscribe) //not delete
8684  {
8685  if (!_commandId.empty())
8686  {
8687  _client.unsubscribe(_commandId);
8688  }
8689  if (!_subId.empty())
8690  {
8691  _client.unsubscribe(_subId);
8692  }
8693  if (!_queryId.empty())
8694  {
8695  _client.unsubscribe(_queryId);
8696  }
8697  }
8698  else
8699  {
8700  if (!_commandId.empty())
8701  {
8702  _client.removeMessageHandler(_commandId);
8703  }
8704  if (!_subId.empty())
8705  {
8706  _client.removeMessageHandler(_subId);
8707  }
8708  if (!_queryId.empty())
8709  {
8710  _client.removeMessageHandler(_queryId);
8711  }
8712  }
8713  }
8714  if (_state == SOWOnly || _state == Subscribe || _state == Unset)
8715  {
8716  _state = Closed;
8717  }
8718  }
8719  static void _messageHandler(const Message& message_, MessageStreamImpl* this_)
8720  {
8721  Lock<Mutex> lock(this_->_lock);
8722  if (this_->_state != Conflate)
8723  {
8724  AMPS_TESTING_SLOW_MESSAGE_STREAM
8725  if (this_->_q.size() >= this_->_maxDepth)
8726  {
8727  // We throw here so that heartbeats can be sent. The exception
8728  // will be handled internally only, and the same Message will
8729  // come back to try again. Make sure to signal.
8730  this_->_lock.signalAll();
8731  throw MessageStreamFullException("Stream is currently full.");
8732  }
8733  if (!this_->_cache.empty())
8734  {
8735  this_->_cache.front().deepCopy(message_);
8736  this_->_q.push_back(this_->_cache.front());
8737  this_->_cache.pop_front();
8738  }
8739  else
8740  {
8741 #ifdef AMPS_USE_EMPLACE
8742  this_->_q.emplace_back(message_.deepCopy());
8743 #else
8744  this_->_q.push_back(message_.deepCopy());
8745 #endif
8746  }
8747  if (message_.getCommandEnum() == Message::Command::Publish &&
8748  this_->_client.isValid() && this_->_client.getAutoAck() &&
8749  !message_.getLeasePeriod().empty() &&
8750  !message_.getBookmark().empty())
8751  {
8752  message_.setIgnoreAutoAck();
8753  }
8754  }
8755  else
8756  {
8757  std::string sowKey = message_.getSowKey();
8758  if (sowKey.length())
8759  {
8760  SOWKeyMap::iterator it = this_->_sowKeyMap.find(sowKey);
8761  if (it != this_->_sowKeyMap.end())
8762  {
8763  it->second->deepCopy(message_);
8764  }
8765  else
8766  {
8767  if (this_->_q.size() >= this_->_maxDepth)
8768  {
8769  // We throw here so that heartbeats can be sent. The
8770  // exception will be handled internally only, and the
8771  // same Message will come back to try again. Make sure
8772  // to signal.
8773  this_->_lock.signalAll();
8774  throw MessageStreamFullException("Stream is currently full.");
8775  }
8776  if (!this_->_cache.empty())
8777  {
8778  this_->_cache.front().deepCopy(message_);
8779  this_->_q.push_back(this_->_cache.front());
8780  this_->_cache.pop_front();
8781  }
8782  else
8783  {
8784 #ifdef AMPS_USE_EMPLACE
8785  this_->_q.emplace_back(message_.deepCopy());
8786 #else
8787  this_->_q.push_back(message_.deepCopy());
8788 #endif
8789  }
8790  this_->_sowKeyMap[sowKey] = &(this_->_q.back());
8791  }
8792  }
8793  else
8794  {
8795  if (this_->_q.size() >= this_->_maxDepth)
8796  {
8797  // We throw here so that heartbeats can be sent. The exception
8798  // will be handled internally only, and the same Message will
8799  // come back to try again. Make sure to signal.
8800  this_->_lock.signalAll();
8801  throw MessageStreamFullException("Stream is currently full.");
8802  }
8803  if (!this_->_cache.empty())
8804  {
8805  this_->_cache.front().deepCopy(message_);
8806  this_->_q.push_back(this_->_cache.front());
8807  this_->_cache.pop_front();
8808  }
8809  else
8810  {
8811 #ifdef AMPS_USE_EMPLACE
8812  this_->_q.emplace_back(message_.deepCopy());
8813 #else
8814  this_->_q.push_back(message_.deepCopy());
8815 #endif
8816  }
8817  if (message_.getCommandEnum() == Message::Command::Publish &&
8818  this_->_client.isValid() && this_->_client.getAutoAck() &&
8819  !message_.getLeasePeriod().empty() &&
8820  !message_.getBookmark().empty())
8821  {
8822  message_.setIgnoreAutoAck();
8823  }
8824  }
8825  }
8826  this_->_lock.signalAll();
8827  }
8828  };
8829  inline MessageStream::MessageStream(void)
8830  {
8831  }
8832  inline MessageStream::MessageStream(const Client& client_)
8833  : _body(new MessageStreamImpl(client_))
8834  {
8835  }
8836  inline MessageStream::MessageStream(RefHandle<MessageStreamImpl> body_)
8837  : _body(body_)
8838  {
8839  }
8840  inline void MessageStream::iterator::advance(void)
8841  {
8842  _pStream = _pStream->_body->next(_current) ? _pStream : NULL;
8843  }
8844  inline MessageStream::operator MessageHandler(void)
8845  {
8846  return MessageHandler((void(*)(const Message&, void*))MessageStreamImpl::_messageHandler, &_body.get());
8847  }
8848  inline MessageStream MessageStream::fromExistingHandler(const MessageHandler& handler_)
8849  {
8850  MessageStream result;
8851  if (handler_._func == (MessageHandler::FunctionType)MessageStreamImpl::_messageHandler)
8852  {
8853  result._body = (MessageStreamImpl*)(handler_._userData);
8854  }
8855  return result;
8856  }
8857 
8858  inline void MessageStream::setSOWOnly(const std::string& commandId_,
8859  const std::string& queryId_)
8860  {
8861  _body->setSOWOnly(commandId_, queryId_);
8862  }
8863  inline void MessageStream::setSubscription(const std::string& subId_,
8864  const std::string& commandId_,
8865  const std::string& queryId_)
8866  {
8867  _body->setSubscription(subId_, commandId_, queryId_);
8868  }
8869  inline void MessageStream::setStatsOnly(const std::string& commandId_,
8870  const std::string& queryId_)
8871  {
8872  _body->setStatsOnly(commandId_, queryId_);
8873  }
8874  inline void MessageStream::setAcksOnly(const std::string& commandId_,
8875  unsigned acks_)
8876  {
8877  _body->setAcksOnly(commandId_, acks_);
8878  }
8879  inline MessageStream MessageStream::timeout(unsigned timeout_)
8880  {
8881  _body->timeout(timeout_);
8882  return *this;
8883  }
8885  {
8886  _body->conflate();
8887  return *this;
8888  }
8889  inline MessageStream MessageStream::maxDepth(unsigned maxDepth_)
8890  {
8891  _body->maxDepth(maxDepth_);
8892  return *this;
8893  }
8894  inline unsigned MessageStream::getMaxDepth(void) const
8895  {
8896  return _body->getMaxDepth();
8897  }
8898  inline unsigned MessageStream::getDepth(void) const
8899  {
8900  return _body->getDepth();
8901  }
8902 
8903  inline MessageStream ClientImpl::getEmptyMessageStream(void)
8904  {
8905  return MessageStream(_pEmptyMessageStream.get()->_body);
8906  }
8907 
8909  {
8910  // If the command is sow and has a sub_id, OR
8911  // if the command has a replace option, return the existing
8912  // messagestream, don't create a new one.
8913  ClientImpl& body = _body.get();
8914  Message& message = command_.getMessage();
8915  Field subId = message.getSubscriptionId();
8916  unsigned ackTypes = message.getAckTypeEnum();
8917  bool useExistingHandler = !subId.empty() && ((!message.getOptions().empty() && message.getOptions().contains("replace", 7)) || message.getCommandEnum() == Message::Command::SOW);
8918  if (useExistingHandler)
8919  {
8920  // Try to find the existing message handler.
8921  if (!subId.empty())
8922  {
8923  MessageHandler existingHandler;
8924  if (body._routes.getRoute(subId, existingHandler))
8925  {
8926  // we found an existing handler. It might not be a message stream, but that's okay.
8927  body.executeAsync(command_, existingHandler, false);
8928  return MessageStream::fromExistingHandler(existingHandler);
8929  }
8930  }
8931  // fall through; we'll a new handler altogether.
8932  }
8933  // Make sure something will be returned to the stream or use the empty one
8934  // Check that: it's a command that doesn't normally return data, and there
8935  // are no acks requested for the cmd id
8936  Message::Command::Type command = message.getCommandEnum();
8937  if ((command & Message::Command::NoDataCommands)
8938  && (ackTypes == Message::AckType::Persisted
8939  || ackTypes == Message::AckType::None))
8940  {
8941  executeAsync(command_, MessageHandler());
8942  if (!body._pEmptyMessageStream)
8943  {
8944  body._pEmptyMessageStream.reset(new MessageStream((ClientImpl*)0));
8945  body._pEmptyMessageStream.get()->_body->close();
8946  }
8947  return body.getEmptyMessageStream();
8948  }
8949  MessageStream stream(*this);
8950  if (body.getDefaultMaxDepth())
8951  {
8952  stream.maxDepth(body.getDefaultMaxDepth());
8953  }
8954  MessageHandler handler = stream.operator MessageHandler();
8955  std::string commandID = body.executeAsync(command_, handler, false);
8956  if (command_.hasStatsAck())
8957  {
8958  stream.setStatsOnly(commandID, command_.getMessage().getQueryId());
8959  }
8960  else if (command_.isSow())
8961  {
8962  if (command_.getAckTypeEnum() & Message::AckType::Completed)
8963  {
8964  stream.setAcksOnly(commandID,
8965  ackTypes);
8966  }
8967  else
8968  {
8969  stream.setSOWOnly(commandID, command_.getMessage().getQueryId());
8970  }
8971  }
8972  else if (command_.isSubscribe())
8973  {
8974  stream.setSubscription(commandID,
8975  command_.getMessage().getCommandId(),
8976  command_.getMessage().getQueryId());
8977  }
8978  else
8979  {
8980  // Persisted acks for writes don't come back with command id
8981  if (command == Message::Command::Publish ||
8982  command == Message::Command::DeltaPublish ||
8983  command == Message::Command::SOWDelete)
8984  {
8985  stream.setAcksOnly(commandID,
8986  ackTypes & (unsigned)~Message::AckType::Persisted);
8987  }
8988  else
8989  {
8990  stream.setAcksOnly(commandID, ackTypes);
8991  }
8992  }
8993  return stream;
8994  }
8995 
8996 // This is here because it uses api from Client.
8997  inline void Message::ack(const char* options_) const
8998  {
8999  ClientImpl* pClient = _body.get().clientImpl();
9000  Message::Field bookmark = getBookmark();
9001  if (pClient && bookmark.len() &&
9002  !pClient->getAutoAck())
9003  //(!pClient->getAutoAck() || getIgnoreAutoAck()))
9004  {
9005  pClient->ack(getTopic(), bookmark, options_);
9006  }
9007  }
9008 }// end namespace AMPS
9009 #endif
Command & setBookmark(const std::string &bookmark_)
Set the bookmark to be used by this command.
Definition: ampsplusplus.hpp:748
Command & setFilter(const char *filter_, size_t filterLen_)
Definition: ampsplusplus.hpp:699
Class to hold string versions of failure reasons.
Definition: ampsplusplus.hpp:151
Message & setData(const std::string &v_)
Sets the data portion of self.
Definition: Message.hpp:1476
Core type and function declarations for the AMPS C client.
Client(const std::string &clientName="")
Constructs a new client with a given client name.
Definition: ampsplusplus.hpp:5147
AMPSDLL amps_result amps_client_set_http_preflight_callback(amps_handle client, amps_http_preflight_callback callback, void *userData)
Sets a user-supplied callback function for when a connection is established and the provided uri incl...
Field getUserId() const
Retrieves the value of the UserId header of the Message as a new Field.
Definition: Message.hpp:1453
std::string sowDeleteByKeys(const MessageHandler &messageHandler_, const std::string &topic_, const std::string &keys_, long timeout_=0)
Deletes messages that match SOW keys from a topic's SOW cache.
Definition: ampsplusplus.hpp:6857
std::string stopTimer(const MessageHandler &messageHandler)
Definition: ampsplusplus.hpp:6831
static const unsigned int IdentifierLength
The length of identifiers used for unique identification of commands and subscriptions.
Definition: Message.hpp:542
std::string getAckType() const
Definition: ampsplusplus.hpp:946
AMPSDLL void amps_client_set_disconnect_handler(amps_handle client, amps_handler disconnectHandler, void *userData)
Sets the disconnect handler function to be called when a disconnect occurs.
bool removeMessageHandler(const Field &commandId_)
Removes a MessageHandler for a given CommandId from self.
Definition: ampsplusplus.hpp:5408
Message & assignTopic(const std::string &v)
Sets the value of the Topic header for this Message.
Definition: Message.hpp:1449
Provides a convenient way of building messages in FIX format, typically referenced using the typedefs...
Definition: ampsplusplus.hpp:8256
void startTimer()
Definition: ampsplusplus.hpp:6820
MessageStream sowAndSubscribe(const std::string &topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, bool oofEnabled_=false, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic and initiates a new subscription on it.
Definition: ampsplusplus.hpp:6362
Abstract base class for storing published messages for an HA publisher client.
Definition: ampsplusplus.hpp:1090
MessageStream conflate(void)
Sets self to conflation mode, where a new update for a matching sow key will replace the previous one...
Definition: ampsplusplus.hpp:8884
Field getSequence() const
Retrieves the value of the Sequence header of the Message as a new Field.
Definition: Message.hpp:1422
std::string send(const MessageHandler &messageHandler, Message &message, int timeout=0)
Sends a Message to the connected AMPS server, performing only minimal validation and bypassing client...
Definition: ampsplusplus.hpp:5436
AMPSDLL amps_result amps_client_set_name(amps_handle handle, const amps_char *clientName)
Sets the name on an amps client object.
Message & setCorrelationId(const std::string &v)
Sets the value of the CorrelationId header for this Message.
Definition: Message.hpp:1304
Message & setQueryID(const std::string &v)
Sets the value of the QueryID header for this Message.
Definition: Message.hpp:1417
Command(const std::string &command_)
Creates an object to represent the given AMPS command, such as "sow" or "subscribe".
Definition: ampsplusplus.hpp:560
Command::Type getCommandEnum() const
Decode self's "command" field and return one of the values from Command.
Definition: Message.hpp:1228
Command & setBookmark(const char *bookmark_, size_t bookmarkLen_)
Set the bookmark to be used by this command.
Definition: ampsplusplus.hpp:759
void setGlobalCommandTypeMessageHandler(const Message::Command::Type command_, const MessageHandler &handler_)
Sets a handler for all messages of a particular type: currently supported types are heartbeat message...
Definition: ampsplusplus.hpp:7111
Command & setAckType(unsigned ackType_)
Definition: ampsplusplus.hpp:924
void setServerVersion(size_t version_)
Internally used to set the server version so the store knows how to deal with persisted acks and call...
Definition: BookmarkStore.hpp:429
void setDefaultMaxDepth(unsigned maxDepth_)
Sets a default max depth on all subsequently created MessageStream objects.
Definition: ampsplusplus.hpp:7479
const amps_uint64_t getNameHashValue() const
Returns the numeric name hash of this client as generated by the server and returned when the client ...
Definition: ampsplusplus.hpp:5208
Field getSubscriptionId() const
Retrieves the value of the SubscriptionId header of the Message as a new Field.
Definition: Message.hpp:1427
AMPSDLL amps_result amps_client_connect(amps_handle handle, const amps_char *uri)
Connects to the AMPS server specified in uri.
static size_t convertVersionToNumber(const std::string &version_)
Converts a string version, such as "3.8.1.5" into the same numeric form used internally and returned ...
Definition: ampsplusplus.hpp:5296
void unsubscribe(const std::string &commandId)
Unsubscribe from a topic.
Definition: ampsplusplus.hpp:6151
Command & setOptions(const std::string &options_)
Sets the options string for this command: see Message.Options for a helper class for constructing the...
Definition: ampsplusplus.hpp:789
Message & setOrderBy(const std::string &v)
Sets the value of the OrderBy header for this Message.
Definition: Message.hpp:1415
Abstract base class for replaying a publish message.
Definition: ampsplusplus.hpp:1062
Command & setCommandId(const char *cmdId_, size_t cmdIdLen_)
Definition: ampsplusplus.hpp:673
void persisted(const Message::Field &subId_, const Message::Field &bookmark_)
Called internally to indicate messages up to and including bookmark are replicated to all replication...
Definition: BookmarkStore.hpp:405
unsigned getDefaultMaxDepth(void) const
Returns the default max depth for returned MessageStream objects.
Definition: ampsplusplus.hpp:7488
void ack(Message &message_, const char *options_=NULL)
Acknowledge a message queue message by supplying the message directly: this adds the ack to the curre...
Definition: ampsplusplus.hpp:7347
MessageStream bookmarkSubscribe(const std::string &topic_, long timeout_, const std::string &bookmark_, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Subscribe to a topic using a bookmark.
Definition: ampsplusplus.hpp:6102
AMPSDLL void amps_client_disconnect(amps_handle handle)
Disconnects from the AMPS server, if connected.
AMPSDLL AMPS_SOCKET amps_client_get_socket(amps_handle client)
Returns the socket from the underlying transport in client, or NULL if no transport is associated wit...
void discard(const Message::Field &subId_, size_t bookmarkSeqNo_)
Log a discard-bookmark entry to the persistent log based on a bookmark sequence number.
Definition: BookmarkStore.hpp:283
void setPublishStore(const Store &publishStore_)
Set the publish store to be used by the client.
Definition: ampsplusplus.hpp:5532
void setErrorOnPublishGap(bool errorOnPublishGap_)
Called to enable or disable throwing PublishStoreGapException.
Definition: ampsplusplus.hpp:1339
amps_uint64_t deltaPublish(const std::string &topic_, const std::string &data_, unsigned long expiration_)
Publish the changed fields of a message to an AMPS topic.
Definition: ampsplusplus.hpp:5797
static size_t convertVersionToNumber(const char *data_, size_t len_)
Converts a string version, such as "3.8.1.5" into the same numeric form used internally and returned ...
Definition: ampsplusplus.hpp:5311
Message & setFilter(const std::string &v)
Sets the value of the Filter header for this Message.
Definition: Message.hpp:1306
Command & setSubId(const char *subId_, size_t subIdLen_)
Definition: ampsplusplus.hpp:725
void setNoEmpties(void)
Set the option to not send empty messages on a delta subscription.
Definition: Message.hpp:841
void setDuplicateMessageHandler(const MessageHandler &duplicateMessageHandler_)
Sets a callback function that is invoked when a duplicate message is detected.
Definition: ampsplusplus.hpp:5548
Message deepCopy(void) const
Returns a deep copy of self.
Definition: Message.hpp:579
void removeConnectionStateListener(ConnectionStateListener *listener)
Attempts to remove listener from self's set of ConnectionStateListeners.
Definition: ampsplusplus.hpp:7202
void setOOF(void)
Set the option to receive out of focus (OOF) messages on a subscription, where applicable.
Definition: Message.hpp:824
int getAckTimeout(void) const
Returns the current value of the message queue ack timeout setting – that is, the amount of time aft...
Definition: ampsplusplus.hpp:7431
VersionInfo getServerVersionInfo() const
Returns the server version retrieved during logon.
Definition: ampsplusplus.hpp:5282
Message sowDeleteByKeys(const std::string &topic_, const std::string &keys_, long timeout_=0)
Deletes messages that match SOW keys from a topic's SOW cache.
Definition: ampsplusplus.hpp:6884
AMPSDLL amps_result amps_client_set_transport_filter_function(amps_handle client, amps_transport_filter_function filter, void *userData)
Sets a user-supplied callback function for filtering data before it is sent and after it is received...
Command(const char *command_, size_t commandLen_)
Creates an object to represent the given AMPS command, such as "sow" or "subscribe".
Definition: ampsplusplus.hpp:568
AMPSDLL amps_uint64_t amps_message_get_field_uint64(amps_handle message, FieldId field)
Gets the unsigned 64-bit int value of a header field in an AMPS message.
AMPSDLL amps_result amps_client_set_idle_time(amps_handle client, int idleTime)
Sets an idle-time (milliseconds).
Message encapsulates a single message sent to or received from an AMPS server, and provides methods f...
Definition: Message.hpp:531
Message & setAckTypeEnum(unsigned ackType_)
Encode self's "ack type" field from a bitmask of values from AckType.
Definition: Message.hpp:1183
amps_uint64_t getLastPersisted()
Get the last persisted message sequence in the store.
Definition: ampsplusplus.hpp:1310
std::string getString() const
Returns the current contents of this builder as a string.
Definition: ampsplusplus.hpp:8293
void setTransportFilterFunction(amps_transport_filter_function filter_, void *userData_)
Sets a filter function on the transport that is called with all raw data sent or received.
Definition: ampsplusplus.hpp:7499
amps_uint64_t publish(const char *topic_, size_t topicLength_, const char *data_, size_t dataLength_, unsigned long expiration_)
Publish a message to an AMPS topic, returning the sequence number assigned by the publish store if on...
Definition: ampsplusplus.hpp:5684
iterator begin(void)
Returns an iterator representing the beginning of the topic or subscription.
Definition: ampsplusplus.hpp:5049
void addHttpPreflightHeader(const std::string &key_, const std::string &value_)
Adds a given key/value pair as an HTTP header line as "key: value" to the end of the headers that wil...
Definition: ampsplusplus.hpp:5243
Field getLeasePeriod() const
Retrieves the value of the LeasePeriod header of the Message as a new Field.
Definition: Message.hpp:1309
Command & setSowKeys(const char *sowKeys_, size_t sowKeysLen_)
Sets the SowKeys for the command.
Definition: ampsplusplus.hpp:660
static const char * NOW()
Convenience method for returning the special value to start a subscription at the end of the transact...
Definition: ampsplusplus.hpp:7130
Command & setAckType(const std::string &ackType_)
Definition: ampsplusplus.hpp:902
static const char * BOOKMARK_EPOCH()
Convenience method for returning the special value to start a subscription at the beginning of the tr...
Definition: ampsplusplus.hpp:7140
void send(const Message &message)
Sends a Message to the connected AMPS server, performing only minimal validation and bypassing client...
Definition: ampsplusplus.hpp:5367
Message & setSowKey(const std::string &v)
Sets the value of the SowKey header for this Message.
Definition: Message.hpp:1424
AMPSDLL void amps_message_set_field_value(amps_handle message, FieldId field, const amps_char *value, size_t length)
Sets the value of a header field in an AMPS message.
std::string bookmarkSubscribe(const MessageHandler &messageHandler_, const std::string &topic_, long timeout_, const std::string &bookmark_, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Subscribe to a topic using a bookmark.
Definition: ampsplusplus.hpp:6074
Field getFilter() const
Retrieves the value of the Filter header of the Message as a new Field.
Definition: Message.hpp:1306
static size_t getUnsetPosition()
Method to return the value used to represent not found or unset.
Definition: ampsplusplus.hpp:1153
void setRetryOnDisconnect(bool isRetryOnDisconnect_)
Enables or disables automatic retry of a command to AMPS after a reconnect.
Definition: ampsplusplus.hpp:7461
unsigned getMaxDepth(void) const
Gets the maximum number of messages that can be held in the underlying queue.
Definition: ampsplusplus.hpp:8894
Message & assignUserId(const std::string &v)
Sets the value of the UserId header for this Message.
Definition: Message.hpp:1453
void connect(const std::string &uri)
Connect to an AMPS server.
Definition: ampsplusplus.hpp:5342
void clear()
Deletes the data associated with this Field, should only be used on Fields that were created as deepC...
Definition: Field.hpp:247
void addMessageHandler(const Field &commandId_, const AMPS::MessageHandler &messageHandler_, unsigned requestedAcks_, Message::Command::Type commandType_)
Adds a MessageHandler to be invoked for Messages with the given CommandId as their command id...
Definition: ampsplusplus.hpp:5397
Abstract base class to manage all subscriptions placed on a client so that they can be re-established...
Definition: ampsplusplus.hpp:1454
std::string sowAndDeltaSubscribe(const MessageHandler &messageHandler_, const std::string &topic_, const std::string &filter_="", const std::string &orderBy_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic and initiates a new delta subscription on it.
Definition: ampsplusplus.hpp:6556
Success.
Definition: amps.h:221
Message & setCommandEnum(Command::Type command_)
Set self's "command" field from one of the values in Command.
Definition: Message.hpp:1288
Field getOptions() const
Retrieves the value of the Options header of the Message as a new Field.
Definition: Message.hpp:1316
std::string authenticate(const std::string &, const std::string &password_)
A simple implementation that returns an unmodified password.
Definition: ampsplusplus.hpp:1036
amps_uint64_t deltaPublish(const char *topic_, size_t topicLength_, const char *data_, size_t dataLength_)
Publish the changed fields of a message to an AMPS topic.
Definition: ampsplusplus.hpp:5774
FIXShredder(char fieldSep_=(char) 1)
Construct an instance of FIXShredder using the specified value as the delimiter between fields...
Definition: ampsplusplus.hpp:8338
std::string sow(const MessageHandler &messageHandler_, const std::string &topic_, const std::string &filter_="", const std::string &orderBy_="", const std::string &bookmark_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query a State-of-the-World topic.
Definition: ampsplusplus.hpp:6198
void * amps_handle
Opaque handle type used to refer to objects in the AMPS api.
Definition: amps.h:211
Class for constructing the options string to pass to AMPS in a Message.
Definition: Message.hpp:601
void addMessageHandler(const Field &commandId_, const AMPS::MessageHandler &messageHandler_, unsigned requestedAcks_, bool isSubscribe_)
Adds a MessageHandler to be invoked for Messages with the given CommandId as their command id...
Definition: ampsplusplus.hpp:5380
const char * data() const
Returns the (non-null-terminated) data underlying this field.
Definition: Field.hpp:260
bool isValid() const
Returns true if self is a valid stream that may be iterated.
Definition: ampsplusplus.hpp:5042
amps_result
Return values from amps_xxx functions.
Definition: amps.h:216
FailedWriteHandler * getFailedWriteHandler()
Get the handler that is invoked to report on failed writes.
Definition: ampsplusplus.hpp:5585
Field getAckType() const
Retrieves the value of the AckType header of the Message as a new Field.
Definition: Message.hpp:1130
AMPSDLL amps_handle amps_client_create(const amps_char *clientName)
Functions for creation of an AMPS client.
Field getCommand() const
Retrieves the value of the Command header of the Message as a new Field.
Definition: Message.hpp:1195
MessageStream execute(Command &command_)
Execute the provided command and return messages received in response in a MessageStream.
Definition: ampsplusplus.hpp:8908
void(* amps_transport_filter_function)(const unsigned char *, size_t, short, void *)
Prototype for a user-supplied callback function for filtering data before it is sent and after it is ...
Definition: amps.h:642
Message & setTopic(const std::string &v)
Sets the value of the Topic header for this Message.
Definition: Message.hpp:1449
Store getPublishStore()
Get the publish store used by the client.
Definition: ampsplusplus.hpp:5540
void setThreadCreatedCallback(amps_thread_created_callback callback_, void *userData_)
Sets a callback function on the transport that is called when a new thread is created to receive data...
Definition: ampsplusplus.hpp:7513
unsigned getAckTypeEnum() const
Definition: ampsplusplus.hpp:951
size_t getServerVersion() const
Returns the server version retrieved during logon.
Definition: ampsplusplus.hpp:5271
State
Constants for the state of the connection.
Definition: ampsplusplus.hpp:1498
Command & setData(const std::string &data_)
Sets the data for this command from an existing string.
Definition: ampsplusplus.hpp:829
std::string logon(int timeout_=0, Authenticator &authenticator_=DefaultAuthenticator::instance(), const char *options_=NULL)
Logon to the server, providing the client name, credentials (if available), and client information (s...
Definition: ampsplusplus.hpp:5846
void discardUpTo(amps_uint64_t index_)
Called by Client to indicate that all messages up to and including.
Definition: ampsplusplus.hpp:1243
Command & setFilter(const std::string &filter_)
Definition: ampsplusplus.hpp:692
const std::string & getNameHash() const
Returns the name hash string of this client as generated by the server and returned when the client l...
Definition: ampsplusplus.hpp:5200
StoreImpl(bool errorOnPublishGap_=false)
Default constructor.
Definition: ampsplusplus.hpp:1098
MessageStream sowAndSubscribe(const char *topic_, const std::string &filter_="", const std::string &orderBy_="", const std::string &bookmark_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic and initiates a new subscription on it.
Definition: ampsplusplus.hpp:6510
Client represents a connection to an AMPS server, but does not provide failover or reconnection behav...
Definition: ampsplusplus.hpp:5130
void setGlobalCommandTypeMessageHandler(const std::string &command_, const MessageHandler &handler_)
Sets a handler for all messages of a particular type, or for messages that would be delivered to a pa...
Definition: ampsplusplus.hpp:7086
Command & setSubId(const std::string &subId_)
Definition: ampsplusplus.hpp:718
static const char * BOOKMARK_NOW()
Convenience method for returning the special value to start a subscription at the end of the transact...
Definition: ampsplusplus.hpp:7121
Message & newCommandId()
Creates and sets a new sequential value for the CommandId header for this Message.
Definition: Message.hpp:1302
void addConnectionStateListener(ConnectionStateListener *listener)
Adds a ConnectionStateListener to self's set of listeners.
Definition: ampsplusplus.hpp:7194
Command & reset(const std::string &command_)
Resets the fields of self, and sets the command to command_.
Definition: ampsplusplus.hpp:583
void clearHttpPreflightHeaders()
Clears all previously set HTTP header lines.
Definition: ampsplusplus.hpp:5249
Command & setTimeout(unsigned timeout_)
Sets the client-side timeout for this command.
Definition: ampsplusplus.hpp:851
void clearConnectionStateListeners()
Clear all listeners from self's set of ConnectionStateListeners.
Definition: ampsplusplus.hpp:7209
Message & setSequence(const std::string &v)
Sets the value of the Sequence header for this Message.
Definition: Message.hpp:1422
amps_uint64_t publish(const char *topic_, size_t topicLength_, const char *data_, size_t dataLength_)
Publish a message to an AMPS topic, returning the sequence number assigned by the publish store if on...
Definition: ampsplusplus.hpp:5633
Command & setCommandId(const std::string &cmdId_)
Definition: ampsplusplus.hpp:666
Message & setUserId(const std::string &v)
Sets the value of the UserId header for this Message.
Definition: Message.hpp:1453
AMPSDLL amps_result amps_client_send_with_version(amps_handle client, amps_handle message, unsigned *version_out)
Sends a message to the AMPS server.
Field getTopic() const
Retrieves the value of the Topic header of the Message as a new Field.
Definition: Message.hpp:1449
AMPSDLL void amps_client_set_message_handler(amps_handle client, amps_handler messageHandler, void *userData)
Sets the message handler function for this client.
SubscriptionManager * getSubscriptionManager()
Get the subscription manager being used by the client.
Definition: ampsplusplus.hpp:5496
std::string subscribe(const MessageHandler &messageHandler_, const std::string &topic_, long timeout_=0, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Subscribe to a topic.
Definition: ampsplusplus.hpp:5909
Command(Message::Command::Type command_)
Creates an object to represent the given AMPS command, such as "sow" or "subscribe".
Definition: ampsplusplus.hpp:575
Message & setMessageType(const std::string &v)
Sets the value of the MessageType header for this Message.
Definition: Message.hpp:1312
Field getCommandId() const
Retrieves the value of the CommandId header of the Message as a new Field.
Definition: Message.hpp:1302
const ExceptionListener & getExceptionListener(void) const
Returns the exception listener set on this Client.
Definition: ampsplusplus.hpp:6997
Field getQueryId() const
Retrieves the value of the QueryID header of the Message as a new Field.
Definition: Message.hpp:1417
static Authenticator & instance()
Static function to return a static instance used when no Authenticator is supplied to a Client...
Definition: ampsplusplus.hpp:1053
Abstract base class for connection state listeners.
Definition: ampsplusplus.hpp:1494
Message & setSubscriptionId(const std::string &v)
Sets the value of the SubscriptionId header for this Message.
Definition: Message.hpp:1427
std::string sowDeleteByData(const MessageHandler &messageHandler_, const std::string &topic_, const std::string &data_, long timeout_=0)
Deletes the message whose keys match the message data provided.
Definition: ampsplusplus.hpp:6919
Command & setOptions(const char *options_, size_t optionsLen_)
Sets the options string for this command: see Message.Options for a helper class for constructing the...
Definition: ampsplusplus.hpp:797
Message & newQueryId()
Creates and sets a new sequential value for the QueryID header for this Message.
Definition: Message.hpp:1417
#define AMPS_BOOKMARK_RECENT
Start the subscription at the first undiscarded message in the bookmark store, or at the end of the b...
Definition: BookmarkStore.hpp:47
Message & assignExpiration(const std::string &v)
Sets the value of the Expiration header for this Message.
Definition: Message.hpp:1305
bool empty() const
Returns 'true' if empty, 'false' otherwise.
Definition: Field.hpp:128
bool replaySingle(StoreReplayer &replayer_, amps_uint64_t index_)
Called by Client to get a single message replayed by the store onto the StoreReplayer.
Definition: ampsplusplus.hpp:1264
void completed(const std::string &, const std::string &, const std::string &)
Called by Client once a logon completes successfully.
Definition: ampsplusplus.hpp:1048
MessageStream deltaSubscribe(const std::string &topic_, long timeout_, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Delta Subscribe to a topic.
Definition: ampsplusplus.hpp:6014
static const char * EPOCH()
Convenience method for returning the special value to start a subscription at the beginning of the tr...
Definition: ampsplusplus.hpp:7150
Interface for BookmarkStoreImpl classes.
Definition: BookmarkStore.hpp:228
Command & reset(const char *command_, size_t commandLen_)
Resets the fields of self, and sets the command to command_.
Definition: ampsplusplus.hpp:592
Command & setSequence(const char *seq_, size_t seqLen_)
Definition: ampsplusplus.hpp:810
std::string logon(const char *options_, int timeout_=0)
Logon to the server, providing the client name, credentials (if available) client information (such a...
Definition: ampsplusplus.hpp:5865
size_t len() const
Returns the length of the data underlying this field.
Definition: Field.hpp:267
virtual void completed(const std::string &userName_, const std::string &password_, const std::string &reason_)=0
Called by Client once a logon completes successfully.
Message & setClientName(const std::string &v)
Sets the value of the ClientName header for this Message.
Definition: Message.hpp:1303
Field getSowKey() const
Retrieves the value of the SowKey header of the Message as a new Field.
Definition: Message.hpp:1424
#define AMPS_BOOKMARK_EPOCH
Start the subscription at the beginning of the journal.
Definition: BookmarkStore.hpp:51
A default implementation of Authenticator that only uses an unchanged password and does not implement...
Definition: ampsplusplus.hpp:1030
void setLogonCorrelationData(const std::string &logonCorrelationData_)
Sets the logon correlation data for the client.
Definition: ampsplusplus.hpp:5219
Message & setCommand(const std::string &v)
Sets the value of the Command header for this Message.
Definition: Message.hpp:1195
_FIXBuilder(char fieldSep_=(char) 1)
Construct an instance of _FIXBuilder, using the specified separator between fields.
Definition: ampsplusplus.hpp:8266
void addHttpPreflightHeader(const std::string &header_)
Adds a given HTTP header line to the end of the headers that will be sent for the HTTP GET Upgrade re...
Definition: ampsplusplus.hpp:5234
void ack(const std::string &topic_, const std::string &bookmark_, const char *options_=NULL)
Acknowledge a message queue message by supplying a topic and bookmark string: this adds the ack to th...
Definition: ampsplusplus.hpp:7359
std::string retry(const std::string &, const std::string &)
Throws an AuthenticationException because retry is not implemented.
Definition: ampsplusplus.hpp:1043
Message & assignSubscriptionId(const std::string &v)
Sets the value of the SubscriptionId header for this Message.
Definition: Message.hpp:1427
bool(* PublishStoreResizeHandler)(Store store_, size_t size_, void *userData_)
Function type for PublishStore resize events. The store_ param is the store which is resizing.
Definition: ampsplusplus.hpp:1084
AMPSDLL void amps_message_get_field_value(amps_handle message, FieldId field, const amps_char **value_ptr, size_t *length_ptr)
Retrieves the value of a header field in an AMPS message.
MessageStream sowAndSubscribe(const char *topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, bool oofEnabled_=false, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic and initiates a new subscription on it.
Definition: ampsplusplus.hpp:6400
virtual void setResizeHandler(PublishStoreResizeHandler handler_, void *userData_)
Set a handler to be called if the Store needs to resize in order to keep storing messages.
Definition: ampsplusplus.hpp:1184
void append(const T &tag, const char *value, size_t offset, size_t length)
Write a field with the provided tag and value to the message being constructed.
Definition: ampsplusplus.hpp:8275
Message & assignCommand(const std::string &v)
Sets the value of the Command header for this Message.
Definition: Message.hpp:1195
#define AMPS_BOOKMARK_NOW
Start the subscription at the point in time when AMPS processes the subscription. ...
Definition: BookmarkStore.hpp:55
void setResizeHandler(PublishStoreResizeHandler handler_, void *userData_)
Set a handler to be called if the Store needs to resize in order to keep storing messages.
Definition: ampsplusplus.hpp:1324
virtual std::string retry(const std::string &userName_, const std::string &password_)=0
Called by Client when a logon ack is received with a status of retry.
BookmarkStore getBookmarkStore()
Get the bookmark store being used by the client.
Definition: ampsplusplus.hpp:5488
void setHttpPreflightHeaders(const T &headers_)
Sets the given HTTP header lines to be sent for the HTTP GET Upgrade request.
Definition: ampsplusplus.hpp:5258
bool isValid() const
Method to return if there is an underlying implementation for the Store.
Definition: ampsplusplus.hpp:1281
void setAckBatchSize(const unsigned ackBatchSize_)
Sets the queue ack batch size setting.
Definition: ampsplusplus.hpp:7420
Field getPassword() const
Retrieves the value of the Password header of the Message as a new Field.
Definition: Message.hpp:1416
Message & setTopNRecordsReturned(const std::string &v)
Sets the value of the TopNRecordsReturned header for this Message.
Definition: Message.hpp:1451
Class to handle when a client receives a duplicate publish message, or not entitled message...
Definition: ampsplusplus.hpp:1374
Message sowDelete(const std::string &topic, const std::string &filter, long timeout=0)
Deletes one or more messages from a topic's SOW cache.
Definition: ampsplusplus.hpp:6795
virtual void setFailedResubscribeHandler(std::shared_ptr< FailedResubscribeHandler > handler_)
Set a handler to deal with failing subscriptions after a failover event.
Definition: ampsplusplus.hpp:1482
Message & setSowKeys(const std::string &v)
Sets the value of the SowKeys header for this Message.
Definition: Message.hpp:1425
void setBookmarkStore(const BookmarkStore &bookmarkStore_)
Set the bookmark store to be used by the client.
Definition: ampsplusplus.hpp:5480
This class multiplexes messages from AMPS to multiple subscribers and uses the stream of acks from AM...
Definition: MessageRouter.hpp:149
Message & setExpiration(const std::string &v)
Sets the value of the Expiration header for this Message.
Definition: Message.hpp:1305
virtual std::string authenticate(const std::string &userName_, const std::string &password_)=0
Called by Client just before the logon command is sent.
Command & setOrderBy(const std::string &orderBy_)
Definition: ampsplusplus.hpp:705
void setExceptionListener(const ExceptionListener &listener_)
Definition: ampsplusplus.hpp:6990
Command & reset(Message::Command::Type command_)
Resets the fields of self, and sets the command to command_.
Definition: ampsplusplus.hpp:600
void setSubscriptionManager(SubscriptionManager *subscriptionManager_)
Set the subscription manager to be used by the client.
Definition: ampsplusplus.hpp:5508
void setUnhandledMessageHandler(const AMPS::MessageHandler &messageHandler)
Definition: ampsplusplus.hpp:7053
void append(const T &tag, const std::string &value)
Write a field with the provided tag and value to the message being constructed.
Definition: ampsplusplus.hpp:8286
unsigned getAckBatchSize(void) const
Returns the value of the queue ack batch size setting.
Definition: ampsplusplus.hpp:7410
amps_uint64_t deltaPublish(const std::string &topic_, const std::string &data_)
Publish the changed fields of a message to an AMPS topic.
Definition: ampsplusplus.hpp:5751
MessageStream sowAndSubscribe(const std::string &topic_, const std::string &filter_="", const std::string &orderBy_="", const std::string &bookmark_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic and initiates a new subscription on it.
Definition: ampsplusplus.hpp:6487
Command & setExpiration(unsigned expiration_)
Set the expiration time for a publish command.
Definition: ampsplusplus.hpp:882
AMPSDLL void amps_client_set_predisconnect_handler(amps_handle client, amps_predisconnect_handler predisconnectHandler, void *userData)
Sets the predisconnect handler function to be called when a disconnect occurs.
Represents an iterator over messages in an AMPS topic.
Definition: ampsplusplus.hpp:5004
size_t log(Message &message_)
Log a bookmark to the persistent log.
Definition: BookmarkStore.hpp:268
MessageStream sowAndDeltaSubscribe(const char *topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, bool oofEnabled_=false, bool sendEmpties_=false, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic and initiates a new delta subscription on it.
Definition: ampsplusplus.hpp:6733
size_t unpersistedCount() const
Method to return how many messages are in the store that have not been discarded, indicating that the...
Definition: ampsplusplus.hpp:1273
Command & setData(const char *data_, size_t dataLen_)
Sets the data for this command.
Definition: ampsplusplus.hpp:837
Message & assignAckType(const std::string &v)
Sets the value of the AckType header for this Message.
Definition: Message.hpp:1130
void publishFlush(long timeout_=0, unsigned ackType_=Message::AckType::Processed)
Ensure that AMPS messages are sent and have been processed by the AMPS server.
Definition: ampsplusplus.hpp:5730
std::string executeAsync(Command &command_, MessageHandler handler_)
Execute the provided command and, once AMPS acknowledges the command, process messages in response to...
Definition: ampsplusplus.hpp:7239
Command & setSowKey(const char *sowKey_, size_t sowKeyLen_)
Sets the SowKey field of the command, typically used for a publish command to a topic in the state of...
Definition: ampsplusplus.hpp:625
MessageStream sowAndDeltaSubscribe(const std::string &topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, bool oofEnabled_=false, bool sendEmpties_=false, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic and initiates a new delta subscription on it.
Definition: ampsplusplus.hpp:6692
std::string logon(const std::string &options_, int timeout_=0)
Logon to the server, providing the client name, credentials (if available), client information (such a...
Definition: ampsplusplus.hpp:5884
void unsubscribe()
Unsubscribe from all topics.
Definition: ampsplusplus.hpp:6163
void setAutoAck(bool isAutoAckEnabled_)
Sets the queue auto-ack setting on this client.
Definition: ampsplusplus.hpp:7402
MessageStream deltaSubscribe(const char *topic_, long timeout_, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Delta Subscribe to a topic.
Definition: ampsplusplus.hpp:6032
Message & assignCorrelationId(const std::string &v)
Sets the value of the CorrelationId header for this Message.
Definition: Message.hpp:1304
amps_uint64_t store(const Message &message_)
Called by Client to store a message being published.
Definition: ampsplusplus.hpp:1232
std::string sowAndDeltaSubscribe(const MessageHandler &messageHandler_, const std::string &topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, bool oofEnabled_=false, bool sendEmpties_=false, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic and initiates a new delta subscription on it.
Definition: ampsplusplus.hpp:6656
Command & setBatchSize(unsigned batchSize_)
Sets the batch size for this command, which controls how many records are sent together in the result...
Definition: ampsplusplus.hpp:866
void setExceptionListener(const std::shared_ptr< const ExceptionListener > &pListener_)
Sets the exception listener for exceptions that are not thrown back to the user (for example...
Definition: ampsplusplus.hpp:6977
StoreImpl * get()
Used to get a pointer to the implementation.
Definition: ampsplusplus.hpp:1356
MessageHandler getDuplicateMessageHandler(void)
Returns the callback function that is invoked when a duplicate message is detected.
Definition: ampsplusplus.hpp:5563
Command & setQueryId(const char *queryId_, size_t queryIdLen_)
Definition: ampsplusplus.hpp:738
DisconnectHandler getDisconnectHandler(void) const
Returns the callback function that is invoked when a disconnect occurs.
Definition: ampsplusplus.hpp:5458
Class for parsing a FIX format message into a std::map of keys and values, where the keys and values ...
Definition: ampsplusplus.hpp:8330
Field represents the value of a single field in a Message.
Definition: Field.hpp:86
Message & setAckType(const std::string &v)
Sets the value of the AckType header for this Message.
Definition: Message.hpp:1130
virtual ConnectionInfo getConnectionInfo() const
Get the connection information for the current connection.
Definition: ampsplusplus.hpp:5467
Message & setOptions(const std::string &v)
Sets the value of the Options header for this Message.
Definition: Message.hpp:1344
void ack(Field &topic_, Field &bookmark_, const char *options_=NULL)
Acknowledge a message queue message by supplying a topic and bookmark: this adds the ack to the curre...
Definition: ampsplusplus.hpp:7335
Abstract base class where you can implement handling of exceptions that occur when a SubscriptionMana...
Definition: ampsplusplus.hpp:1432
AMPSDLL amps_result amps_client_send(amps_handle client, amps_handle message)
Sends a message to the AMPS server.
std::map< Message::Field, Message::Field > map_type
Convenience definition for the std::map specialization used for this class.
Definition: ampsplusplus.hpp:8342
Command & setTopic(const char *topic_, size_t topicLen_)
Definition: ampsplusplus.hpp:686
void setDisconnectHandler(const DisconnectHandler &disconnectHandler)
Sets the function to be called when the client is unintentionally disconnected.
Definition: ampsplusplus.hpp:5450
Handle class for StoreImpl classes that track publish messages.
Definition: ampsplusplus.hpp:1216
map_type toMap(const Message::Field &data)
Returns the key/value pairs within the message, represented as AMPS::Field objects that contain point...
Definition: ampsplusplus.hpp:8349
Exception listener for unhandled exceptions.
Definition: ampsplusplus.hpp:204
MessageStream sowAndDeltaSubscribe(const std::string &topic_, const std::string &filter_="", const std::string &orderBy_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic and initiates a new delta subscription on it.
Definition: ampsplusplus.hpp:6589
void setHeartbeat(unsigned heartbeatTime_)
Requests heartbeating with the AMPS server.
Definition: ampsplusplus.hpp:7047
MessageStream subscribe(const char *topic_, long timeout_=0, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Subscribe to a topic.
Definition: ampsplusplus.hpp:5967
An iterable object representing the results of an AMPS subscription and/or query. ...
Definition: ampsplusplus.hpp:4996
bool getErrorOnPublishGap() const
Called to check if the Store will throw PublishStoreGapException.
Definition: ampsplusplus.hpp:1348
Command & setSequence(const std::string &seq_)
Definition: ampsplusplus.hpp:803
static const char * MOST_RECENT()
Convenience method for returning the special value to start a subscription at a recovery point based ...
Definition: ampsplusplus.hpp:7172
Command & setQueryId(const std::string &queryId_)
Definition: ampsplusplus.hpp:731
AMPSDLL amps_result amps_client_set_thread_created_callback(amps_handle client, amps_thread_created_callback callback, void *userData)
Sets a user-supplied callback function to allow thread attributes to be set when a new thread is created...
Message & setCommandId(const std::string &v)
Sets the value of the CommandId header for this Message.
Definition: Message.hpp:1302
Command & setSequence(const amps_uint64_t seq_)
Definition: ampsplusplus.hpp:816
static const char * BOOKMARK_RECENT()
Convenience method for returning the special value to start a subscription at a recovery point based ...
Definition: ampsplusplus.hpp:7183
Message & setQueryId(const std::string &v)
Sets the value of the QueryID header for this Message.
Definition: Message.hpp:1417
amps_result(* amps_thread_created_callback)(AMPS_THREAD_T, void *)
Prototype for a user-supplied callback function to allow thread attributes to be set when a new threa...
Definition: amps.h:668
amps_uint64_t getLowestUnpersisted()
Get the oldest unpersisted message sequence in the store.
Definition: ampsplusplus.hpp:1302
Command & setTopic(const std::string &topic_)
Definition: ampsplusplus.hpp:679
MessageStream maxDepth(unsigned maxDepth_)
Sets the maximum number of messages that can be held in the underlying queue.
Definition: ampsplusplus.hpp:8889
Message & assignVersion(const std::string &v)
Sets the value of the Version header for this Message.
Definition: Message.hpp:1452
void disconnect()
Disconnect from an AMPS server.
Definition: ampsplusplus.hpp:5349
MessageStream sow(const std::string &topic_, const std::string &filter_="", const std::string &orderBy_="", const std::string &bookmark_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic.
Definition: ampsplusplus.hpp:6236
unsigned getDepth(void) const
Gets the current number of messages held in the underlying queue.
Definition: ampsplusplus.hpp:8898
Field getQueryID() const
Retrieves the value of the QueryID header of the Message as a new Field.
Definition: Message.hpp:1417
The operation has not succeeded, but ought to be retried.
Definition: amps.h:245
const std::string & getLogonCorrelationData() const
Returns the currently set logon correlation data for the client.
Definition: ampsplusplus.hpp:5226
void setHeartbeat(unsigned heartbeatTime_, unsigned readTimeout_)
Requests heartbeating with the AMPS server.
Definition: ampsplusplus.hpp:7023
Command & setTopN(unsigned topN_)
Definition: ampsplusplus.hpp:857
AMPSDLL amps_result amps_client_set_read_timeout(amps_handle client, int readTimeout)
Sets a read timeout (seconds), in which if no message is received, the connection is presumed dead...
const std::string & getURI() const
Returns the last URI this client is connected to.
Definition: ampsplusplus.hpp:5318
Command & addAckType(const std::string &ackType_)
Definition: ampsplusplus.hpp:888
void replay(StoreReplayer &replayer_)
Called by Client to get all stored and non-discarded messages replayed by the store onto the StoreRep...
Definition: ampsplusplus.hpp:1252
static const char * BOOKMARK_MOST_RECENT()
Convenience method for returning the special value to start a subscription at a recovery point based ...
Definition: ampsplusplus.hpp:7161
Message & setBookmark(const std::string &v)
Sets the value of the Bookmark header for this Message.
Definition: Message.hpp:1194
void deepCopy(const Field &orig_)
Makes self a deep copy of the original field.
Definition: Field.hpp:218
Command & setOrderBy(const char *orderBy_, size_t orderByLen_)
Definition: ampsplusplus.hpp:712
unsigned getAckTypeEnum() const
Decode self's "ack type" field and return the corresponding bitmask of values from AckType...
Definition: Message.hpp:1160
BookmarkStoreImpl * get()
Used to get a pointer to the implementation.
Definition: BookmarkStore.hpp:465
amps_handle getHandle()
Returns the underlying amps_handle for this client, to be used with amps_client_* functions from the ...
Definition: ampsplusplus.hpp:6964
AMPSDLL amps_result amps_client_attempt_reconnect(amps_handle client, unsigned version)
Manually invokes the user-supplied disconnect handler for this client.
AMPSDLL void amps_client_destroy(amps_handle handle)
Disconnects and destroys an AMPS client object.
void reset()
Clear all data from the builder.
Definition: ampsplusplus.hpp:8303
void setFailedWriteHandler(FailedWriteHandler *handler_)
Set the handler that is invoked to report when a publish fails, for example if the publisher is not e...
Definition: ampsplusplus.hpp:5577
MessageStream sowAndDeltaSubscribe(const char *topic_, const std::string &filter_="", const std::string &orderBy_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic and initiates a new delta subscription on it.
Definition: ampsplusplus.hpp:6611
void setAckTimeout(const int ackTimeout_)
Sets the message queue ack timeout value.
Definition: ampsplusplus.hpp:7443
Command & setSowKeys(const std::string &sowKeys_)
Sets the SowKeys for the command.
Definition: ampsplusplus.hpp:642
Message & setPassword(const std::string &v)
Sets the value of the Password header for this Message.
Definition: Message.hpp:1416
bool getAutoAck(void) const
Returns the value of the queue auto-ack setting.
Definition: ampsplusplus.hpp:7392
amps_uint64_t publish(const std::string &topic_, const std::string &data_)
Publish a message to an AMPS topic, returning the sequence number assigned by the publish store if on...
Definition: ampsplusplus.hpp:5608
Definition: ampsplusplus.hpp:103
void setName(const std::string &name)
Sets the name of this client, assuming no name was provided previously.
Definition: ampsplusplus.hpp:5185
The interface for handling authentication with the AMPS server.
Definition: ampsplusplus.hpp:999
void flush(long timeout_=0)
Method to wait for the Store to discard everything that has been stored up to the point in time when ...
Definition: ampsplusplus.hpp:1294
Field getBookmark() const
Retrieves the value of the Bookmark header of the Message as a new Field.
Definition: Message.hpp:1194
bool DangerousFlushPublishStoreResizeHandler(Store store_, size_t, void *data_)
PublishStoreResizeHandler that will block up to the timeout specified in user data milliseconds tryin...
Definition: ampsplusplus.hpp:1404
static amps_uint64_t getUnsetSequence()
Method to return the value used to represent no such sequence.
Definition: ampsplusplus.hpp:1160
Command & setCorrelationId(const char *correlationId_, size_t correlationIdLen_)
Set the correlation ID for this command.
Definition: ampsplusplus.hpp:782
MessageStream sow(const char *topic_, const std::string &filter_="", const std::string &orderBy_="", const std::string &bookmark_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic.
Definition: ampsplusplus.hpp:6257
amps_uint64_t publish(const std::string &topic_, const std::string &data_, unsigned long expiration_)
Publish a message to an AMPS topic, returning the sequence number assigned by the publish store (if a...
Definition: ampsplusplus.hpp:5657
void setLastChanceMessageHandler(const AMPS::MessageHandler &messageHandler)
Sets the message handler called when no other handler matches.
Definition: ampsplusplus.hpp:7060
Message::Field getMostRecent(const Message::Field &subId_)
Returns the most recent bookmark from the log that ought to be used for (re-)subscriptions.
Definition: BookmarkStore.hpp:310
amps_uint64_t deltaPublish(const char *topic_, size_t topicLength_, const char *data_, size_t dataLength_, unsigned long expiration_)
Publish the changed fields of a message to an AMPS topic.
Definition: ampsplusplus.hpp:5823
The client and server are disconnected.
Definition: amps.h:249
Message sowDeleteByData(const std::string &topic_, const std::string &data_, long timeout_=0)
Deletes the message whose keys match the message data provided.
Definition: ampsplusplus.hpp:6940
std::string deltaSubscribe(const MessageHandler &messageHandler_, const std::string &topic_, long timeout_, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Delta Subscribe to a topic.
Definition: ampsplusplus.hpp:5996
MessageStream timeout(unsigned timeout_)
Sets the maximum time to wait for the next message in milliseconds; if no message is available within...
Definition: ampsplusplus.hpp:8879
std::string sow(const MessageHandler &messageHandler_, const std::string &topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic.
Definition: ampsplusplus.hpp:6298
const std::string & getName() const
Returns the name of this client passed in the constructor.
Definition: ampsplusplus.hpp:5192
Command is an encapsulation of a single AMPS command sent by the client.
Definition: ampsplusplus.hpp:472
std::string sowAndSubscribe(const MessageHandler &messageHandler_, const std::string &topic_, const std::string &filter_="", const std::string &orderBy_="", const std::string &bookmark_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic and initiates a new subscription on it.
Definition: ampsplusplus.hpp:6448
Message & setBatchSize(const std::string &v)
Sets the value of the BatchSize header for this Message.
Definition: Message.hpp:1193
MessageStream subscribe(const std::string &topic_, long timeout_=0, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Subscribe to a topic.
Definition: ampsplusplus.hpp:5935
MessageStream bookmarkSubscribe(const char *topic_, long timeout_, const std::string &bookmark_, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Subscribe to a topic using a bookmark.
Definition: ampsplusplus.hpp:6123
Command & setSowKey(const std::string &sowKey_)
Sets the SowKey field of the command, typically used for a publish command to a topic in the state of...
Definition: ampsplusplus.hpp:612
Message & assignSequence(const std::string &v)
Sets the value of the Sequence header for this Message.
Definition: Message.hpp:1422
std::string executeAsyncNoResubscribe(Command &command_, MessageHandler handler_)
Execute the provided command and, once AMPS acknowledges the command, process messages in response to...
Definition: ampsplusplus.hpp:7273
Command & setCorrelationId(const std::string &correlationId_)
Set the correlation ID for this command.
Definition: ampsplusplus.hpp:770
std::string sowDelete(const MessageHandler &messageHandler, const std::string &topic, const std::string &filter, long timeout)
Deletes one or more messages from a topic's SOW cache.
Definition: ampsplusplus.hpp:6772
iterator end(void)
Returns an iterator representing the end of the topic or subscription.
Definition: ampsplusplus.hpp:5060
bool getRetryOnDisconnect(void) const
Returns true if automatic retry of a command to AMPS after a reconnect is enabled.
Definition: ampsplusplus.hpp:7470
void flushAcks(void)
Sends any queued message queue ack messages to the server immediately.
Definition: ampsplusplus.hpp:7383
std::string sowAndSubscribe(const MessageHandler &messageHandler_, const std::string &topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, bool oofEnabled_=false, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic and initiates a new subscription on it.
Definition: ampsplusplus.hpp:6330