AMPS C/C++ Client Class Reference
AMPS C/C++ Client Version 5.3.4.3
Message.hpp
Go to the documentation of this file.
1 //
3 // Copyright (c) 2010-2024 60East Technologies Inc., All Rights Reserved.
4 //
5 // This computer software is owned by 60East Technologies Inc. and is
6 // protected by U.S. copyright laws and other laws and by international
7 // treaties. This computer software is furnished by 60East Technologies
8 // Inc. pursuant to a written license agreement and may be used, copied,
9 // transmitted, and stored only in accordance with the terms of such
10 // license agreement and with the inclusion of the above copyright notice.
11 // This computer software or any other copies thereof may not be provided
12 // or otherwise made available to any other person.
13 //
14 // U.S. Government Restricted Rights. This computer software: (a) was
15 // developed at private expense and is in all respects the proprietary
16 // information of 60East Technologies Inc.; (b) was not developed with
17 // government funds; (c) is a trade secret of 60East Technologies Inc.
18 // for all purposes of the Freedom of Information Act; and (d) is a
19 // commercial item and thus, pursuant to Section 12.212 of the Federal
20 // Acquisition Regulations (FAR) and DFAR Supplement Section 227.7202,
21 // Government's use, duplication or disclosure of the computer software
22 // is subject to the restrictions set forth by 60East Technologies Inc..
23 //
25 #ifndef __AMPS_MESSAGE_HPP__
26 #define __AMPS_MESSAGE_HPP__
27 #include "amps/util.hpp"
28 #include "amps/constants.hpp"
29 #include "amps/amps_generated.h"
30 #include "amps/Field.hpp"
31 #include <stdio.h>
32 #include <algorithm>
33 #include <ostream>
34 #include <string>
35 #define AMPS_UNSET_SEQUENCE (amps_uint64_t)-1
36 
37 // Macros for determing what parts of TR1 we can depend on
38 #if defined(__GXX_EXPERIMENTAL_CXX0X__) || (_MSC_VER >= 1600)
39  #define AMPS_USE_FUNCTIONAL 1
40 #endif
41 
42 #if (_MSC_VER >= 1600) || (__GNUC__ > 4) || ( (__GNUC__ == 4) && (__GNUC_MINOR__) >=5 )
43  #define AMPS_USE_LAMBDAS 1
44 #endif
45 
46 #if (_MSC_VER >= 1600) || (__GNUC__ > 4) || ( (__GNUC__ == 4) && (__GNUC_MINOR__) >=8 )
47  #define AMPS_USE_EMPLACE 1
48 #endif
49 
50 #ifdef AMPS_USE_FUNCTIONAL
51  #include <functional>
52 #endif
53 
54 #include <algorithm>
55 
59 
60 #define AMPS_OPTIONS_NONE ""
61 #define AMPS_OPTIONS_LIVE "live,"
62 #define AMPS_OPTIONS_OOF "oof,"
63 #define AMPS_OPTIONS_REPLACE "replace,"
64 #define AMPS_OPTIONS_NOEMPTIES "no_empties,"
65 #define AMPS_OPTIONS_SENDKEYS "send_keys,"
66 #define AMPS_OPTIONS_TIMESTAMP "timestamp,"
67 #define AMPS_OPTIONS_NOSOWKEY "no_sowkey,"
68 #define AMPS_OPTIONS_CANCEL "cancel,"
69 #define AMPS_OPTIONS_RESUME "resume,"
70 #define AMPS_OPTIONS_PAUSE "pause,"
71 #define AMPS_OPTIONS_FULLY_DURABLE "fully_durable,"
72 #define AMPS_OPTIONS_EXPIRE "expire,"
73 #define AMPS_OPTIONS_TOPN(x) "top_n=##x,"
74 #define AMPS_OPTIONS_MAXBACKLOG(x) "max_backlog=##x,"
75 #define AMPS_OPTIONS_RATE(x) "rate=##x,"
76 
77 namespace AMPS
78 {
79  typedef void* amps_subscription_handle;
80 
81  class ClientImpl;
82 
87  class MessageImpl : public RefBody
88  {
89  private:
90  amps_handle _message;
91  //Mutex _lock;
92  bool _owner;
93  mutable bool _isIgnoreAutoAck;
94  size_t _bookmarkSeqNo;
95  amps_subscription_handle _subscription;
96  ClientImpl* _clientImpl;
97  public:
109  MessageImpl(amps_handle message_, bool owner_ = false,
110  bool ignoreAutoAck_ = false, size_t bookmarkSeqNo_ = 0,
111  amps_subscription_handle subscription_ = NULL,
112  ClientImpl* clientImpl_ = NULL)
113  : _message(message_), _owner(owner_), _isIgnoreAutoAck(ignoreAutoAck_)
114  , _bookmarkSeqNo(bookmarkSeqNo_)
115  , _subscription(subscription_), _clientImpl(clientImpl_)
116  {
117  }
118 
123  : _message(NULL), _owner(true), _isIgnoreAutoAck(false), _bookmarkSeqNo(0), _subscription(NULL), _clientImpl(NULL)
124  {
125  // try to create one
126  _message = amps_message_create(NULL);
127  }
128 
129  virtual ~MessageImpl()
130  {
131  if (_owner && _message)
132  {
133  amps_message_destroy(_message);
134  }
135  }
136 
137  MessageImpl* copy() const
138  {
139  amps_handle copy = amps_message_copy(_message);
140  return new MessageImpl(copy, true, _isIgnoreAutoAck, _bookmarkSeqNo,
141  _subscription, _clientImpl);
142  }
143 
144  void copy(const MessageImpl& rhs_)
145  {
146  if (_owner && _message)
147  {
148  amps_message_destroy(_message);
149  }
150  _message = amps_message_copy(rhs_._message);
151  _owner = true;
152  _bookmarkSeqNo = rhs_._bookmarkSeqNo;
153  _subscription = rhs_._subscription;
154  _isIgnoreAutoAck = rhs_._isIgnoreAutoAck;
155  _clientImpl = rhs_._clientImpl;
156  }
157 
158  void setClientImpl(ClientImpl* clientImpl_)
159  {
160  _clientImpl = clientImpl_;
161  }
162 
163  ClientImpl* clientImpl(void) const
164  {
165  return _clientImpl;
166  }
167 
171  {
172  return _message;
173  }
174 
175  void reset()
176  {
177  //Lock<Mutex> l(_lock);
178  amps_message_reset(_message);
179  _bookmarkSeqNo = 0;
180  _subscription = NULL;
181  _isIgnoreAutoAck = false;
182  _clientImpl = NULL;
183  }
184 
190  void replace(amps_handle message_, bool owner_ = false)
191  {
192  //Lock<Mutex> l(_lock);
193  if (_message == message_)
194  {
195  return;
196  }
197  if (_owner && _message)
198  {
199  amps_message_destroy(_message);
200  }
201  _owner = owner_;
202  _message = message_;
203  _bookmarkSeqNo = 0;
204  _subscription = NULL;
205  _isIgnoreAutoAck = false;
206  }
207 
208  void disown()
209  {
210  //Lock<Mutex> l(_lock);
211  _owner = false;
212  }
213 
214  static unsigned long newId()
215  {
216 #if __cplusplus >= 201100L || _MSC_VER >= 1900
217  static std::atomic<uint_fast64_t> _id(0);
218  return (unsigned long)++_id;
219 #else
220  static AMPS_ATOMIC_TYPE _id = 0;
221  return (unsigned long)(AMPS_FETCH_ADD(&_id, 1));
222 #endif
223  }
224 
225  void setBookmarkSeqNo(size_t val_)
226  {
227  _bookmarkSeqNo = val_;
228  }
229 
230  size_t getBookmarkSeqNo(void) const
231  {
232  return _bookmarkSeqNo;
233  }
234 
235  void setSubscriptionHandle(amps_subscription_handle subscription_)
236  {
237  _subscription = subscription_;
238  }
239 
240  amps_subscription_handle getSubscriptionHandle(void) const
241  {
242  return _subscription;
243  }
244 
245  void setIgnoreAutoAck() const
246  {
247  _isIgnoreAutoAck = true;
248  }
249 
250  bool getIgnoreAutoAck() const
251  {
252  return _isIgnoreAutoAck;
253  }
254  };
255 
256 
257 // This block of macros works with the Doxygen preprocessor to
258 // create documentation comments for fields defined with the AMPS_FIELD macro.
259 // A C++ compiler removes comments before expanding macros, so these macros
260 // must ONLY be defined for Doxygen and not for actual compilation.
261 
262 #ifdef DOXYGEN_PREPROCESSOR
263 
264 #define DOX_COMMENTHEAD(s) / ## ** ## s ## * ## /
265 #define DOX_GROUPNAME(s) DOX_COMMENTHEAD(@name s Functions)
266 #define DOX_OPENGROUP(s) DOX_COMMENTHEAD(@{) \
267  DOX_GROUPNAME(s)
268 #define DOX_CLOSEGROUP() DOX_COMMENTHEAD(@})
269 #define DOX_MAKEGETCOMMENT(x) DOX_COMMENTHEAD( Retrieves the value of the x header of the Message as a new Field. Notice that not all headers are present on all messages returned by AMPS. See the AMPS %Command Reference for details on which fields will be present in response to a specific command. )
270 #define DOX_MAKEGETRAWCOMMENT(x) DOX_COMMENTHEAD( Retrieves the value of the x header of self as a Field that references the underlying buffer managed by this Message. Notice that not all headers are present on all messages returned by AMPS. See the AMPS %Command Reference for details on which fields will be present in response to a specific command. )
271 #define DOX_MAKESETCOMMENT(x) DOX_COMMENTHEAD( Sets the value of the x header for this Message. Not all headers are processed by AMPS for all commands. See the AMPS %Command Reference for which headers are used by AMPS for a specific command. )
272 #define DOX_MAKEASSIGNCOMMENT(x) DOX_COMMENTHEAD( Sets the value of the x header for this Message. Not all headers are processed by AMPS for all commands. See the AMPS %Command Reference for which headers are used by AMPS for a specific command. )
273 #define DOX_MAKENEWCOMMENT(x) DOX_COMMENTHEAD(Creates and sets a new sequential value for the x header for this Message. This function is most useful for headers such as %CommandId and %SubId.)
274 
275 #else
276 
277 #define DOX_COMMENTHEAD(s)
278 #define DOX_GROUPNAME(s)
279 #define DOX_OPENGROUP(x)
280 #define DOX_CLOSEGROUP()
281 #define DOX_MAKEGETCOMMENT(x)
282 #define DOX_MAKEGETRAWCOMMENT(x)
283 #define DOX_MAKESETCOMMENT(x)
284 #define DOX_MAKEASSIGNCOMMENT(x)
285 #define DOX_MAKENEWCOMMENT(x)
286 
287 #endif
288 
289 // Macro for defining all of the necessary methods for a field in an AMPS
290 // message.
291 
292 
293 #define AMPS_FIELD(x) \
294  DOX_OPENGROUP(x) \
295  DOX_MAKEGETCOMMENT(x) \
296  Field get##x() const {\
297  Field returnValue;\
298  const char* ptr;\
299  size_t sz;\
300  amps_message_get_field_value(_body.get().getMessage(),\
301  AMPS_##x, &ptr, &sz);\
302  returnValue.assign(ptr, sz);\
303  return returnValue;\
304  }\
305  DOX_MAKEGETRAWCOMMENT(x) \
306  void getRaw##x(const char** dataptr, size_t* sizeptr) const {\
307  amps_message_get_field_value(_body.get().getMessage(),\
308  AMPS_##x, dataptr, sizeptr);\
309  return;\
310  }\
311  DOX_MAKESETCOMMENT(x) \
312  Message& set##x(const std::string& v) {\
313  amps_message_set_field_value(_body.get().getMessage(),\
314  AMPS_##x, v.c_str(), v.length());\
315  return *this;\
316  }\
317  DOX_MAKESETCOMMENT(x) \
318  Message& set##x(amps_uint64_t v) {\
319  char buf[22];\
320  AMPS_snprintf_amps_uint64_t(buf,22,v);\
321  amps_message_set_field_value_nts(_body.get().getMessage(),\
322  AMPS_##x, buf);\
323  return *this;\
324  }\
325  DOX_MAKEASSIGNCOMMENT(x) \
326  Message& assign##x(const std::string& v) {\
327  amps_message_assign_field_value(_body.get().getMessage(),\
328  AMPS_##x, v.c_str(), v.length());\
329  return *this;\
330  }\
331  DOX_MAKEASSIGNCOMMENT(x) \
332  Message& assign##x(const char* data, size_t len) {\
333  amps_message_assign_field_value(_body.get().getMessage(),\
334  AMPS_##x, data, len);\
335  return *this;\
336  }\
337  DOX_MAKESETCOMMENT(x) \
338  Message& set##x(const char* str) {\
339  amps_message_set_field_value_nts(_body.get().getMessage(),\
340  AMPS_##x, str);\
341  return *this;\
342  }\
343  DOX_MAKESETCOMMENT(x) \
344  Message& set##x(const char* str,size_t len) {\
345  amps_message_set_field_value(_body.get().getMessage(),\
346  AMPS_##x, str,len);\
347  return *this;\
348  }\
349  DOX_MAKENEWCOMMENT(x) \
350  Message& new##x() {\
351  char buf[Message::IdentifierLength+1];\
352  buf[Message::IdentifierLength] = 0;\
353  AMPS_snprintf(buf, Message::IdentifierLength+1, "auto%lu" , (unsigned long)(_body.get().newId()));\
354  amps_message_set_field_value_nts(_body.get().getMessage(),\
355  AMPS_##x, buf);\
356  return *this;\
357  } \
358  DOX_CLOSEGROUP()
359 
360 #define AMPS_FIELD_ALIAS(x,y) \
361  DOX_OPENGROUP(y) \
362  DOX_MAKEGETCOMMENT(y) \
363  Field get##y() const {\
364  Field returnValue;\
365  const char* ptr;\
366  size_t sz;\
367  amps_message_get_field_value(_body.get().getMessage(),\
368  AMPS_##y, &ptr, &sz);\
369  returnValue.assign(ptr, sz);\
370  return returnValue;\
371  }\
372  DOX_MAKEGETRAWCOMMENT(y) \
373  void getRaw##y(const char** dataptr, size_t* sizeptr) const {\
374  amps_message_get_field_value(_body.get().getMessage(),\
375  AMPS_##y, dataptr, sizeptr);\
376  return;\
377  }\
378  DOX_MAKESETCOMMENT(y) \
379  Message& set##y(const std::string& v) {\
380  amps_message_set_field_value(_body.get().getMessage(),\
381  AMPS_##y, v.c_str(), v.length());\
382  return *this;\
383  }\
384  DOX_MAKESETCOMMENT(y) \
385  Message& set##y(amps_uint64_t v) {\
386  char buf[22];\
387  AMPS_snprintf_amps_uint64_t(buf,22,v);\
388  amps_message_set_field_value_nts(_body.get().getMessage(),\
389  AMPS_##y, buf);\
390  return *this;\
391  }\
392  DOX_MAKEASSIGNCOMMENT(y) \
393  Message& assign##y(const std::string& v) {\
394  amps_message_assign_field_value(_body.get().getMessage(),\
395  AMPS_##y, v.c_str(), v.length());\
396  return *this;\
397  }\
398  DOX_MAKEASSIGNCOMMENT(y) \
399  Message& assign##y(const char* data, size_t len) {\
400  amps_message_assign_field_value(_body.get().getMessage(),\
401  AMPS_##y, data, len);\
402  return *this;\
403  }\
404  DOX_MAKESETCOMMENT(y) \
405  Message& set##y(const char* str) {\
406  amps_message_set_field_value_nts(_body.get().getMessage(),\
407  AMPS_##y, str);\
408  return *this;\
409  }\
410  DOX_MAKESETCOMMENT(y) \
411  Message& set##y(const char* str,size_t len) {\
412  amps_message_set_field_value(_body.get().getMessage(),\
413  AMPS_##y, str,len);\
414  return *this;\
415  }\
416  DOX_MAKENEWCOMMENT(y) \
417  Message& new##y() {\
418  char buf[Message::IdentifierLength+1];\
419  buf[Message::IdentifierLength] = 0;\
420  AMPS_snprintf(buf, Message::IdentifierLength+1, "auto%lux" , (unsigned long)(_body.get().newId()));\
421  amps_message_set_field_value_nts(_body.get().getMessage(),\
422  AMPS_##y, buf);\
423  return *this;\
424  }\
425  DOX_MAKEGETCOMMENT(y) \
426  Field get##x() const {\
427  Field returnValue;\
428  const char* ptr;\
429  size_t sz;\
430  amps_message_get_field_value(_body.get().getMessage(),\
431  AMPS_##y, &ptr, &sz);\
432  returnValue.assign(ptr, sz);\
433  return returnValue;\
434  }\
435  DOX_MAKEGETRAWCOMMENT(y) \
436  void getRaw##x(const char** dataptr, size_t* sizeptr) const {\
437  amps_message_get_field_value(_body.get().getMessage(),\
438  AMPS_##y, dataptr, sizeptr);\
439  return;\
440  }\
441  DOX_MAKESETCOMMENT(y) \
442  Message& set##x(const std::string& v) {\
443  amps_message_set_field_value(_body.get().getMessage(),\
444  AMPS_##y, v.c_str(), v.length());\
445  return *this;\
446  }\
447  DOX_MAKESETCOMMENT(y) \
448  Message& set##x(amps_uint64_t v) {\
449  char buf[22];\
450  AMPS_snprintf_amps_uint64_t(buf,22,v);\
451  amps_message_set_field_value_nts(_body.get().getMessage(),\
452  AMPS_##y, buf);\
453  return *this;\
454  }\
455  DOX_MAKEASSIGNCOMMENT(y) \
456  Message& assign##x(const std::string& v) {\
457  amps_message_assign_field_value(_body.get().getMessage(),\
458  AMPS_##y, v.c_str(), v.length());\
459  return *this;\
460  }\
461  DOX_MAKEASSIGNCOMMENT(y) \
462  Message& assign##x(const char* data, size_t len) {\
463  amps_message_assign_field_value(_body.get().getMessage(),\
464  AMPS_##y, data, len);\
465  return *this;\
466  }\
467  DOX_MAKESETCOMMENT(y) \
468  Message& set##x(const char* str) {\
469  amps_message_set_field_value_nts(_body.get().getMessage(),\
470  AMPS_##y, str);\
471  return *this;\
472  }\
473  DOX_MAKESETCOMMENT(y) \
474  Message& set##x(const char* str,size_t len) {\
475  amps_message_set_field_value(_body.get().getMessage(),\
476  AMPS_##y, str,len);\
477  return *this;\
478  }\
479  DOX_MAKENEWCOMMENT(y) \
480  Message& new##x() {\
481  char buf[Message::IdentifierLength+1];\
482  buf[Message::IdentifierLength] = 0;\
483  AMPS_snprintf(buf, Message::IdentifierLength+1, "auto%lux" , (unsigned long)(_body.get().newId()));\
484  amps_message_set_field_value_nts(_body.get().getMessage(),\
485  AMPS_##y, buf);\
486  return *this;\
487  } \
488  DOX_CLOSEGROUP()
489 
490 
531  class Message
532  {
533  RefHandle<MessageImpl> _body;
534 
535  Message(MessageImpl* body_) : _body(body_) { ; }
536 
537  public:
538  typedef AMPS::Field Field;
539 
542  static const unsigned int IdentifierLength = 32;
543 
546  static const size_t BOOKMARK_NONE = AMPS_UNSET_INDEX;
547 
551  enum CtorFlag { EMPTY };
552 
555  Message(CtorFlag) : _body()
556  {
557  }
558 
565  Message(amps_handle message_, bool owner_ = false)
566  : _body(new MessageImpl(message_, owner_))
567  {
568  }
569 
573  Message() : _body(new MessageImpl())
574  {
575  }
576 
579  Message deepCopy(void) const
580  {
581  return Message(_body.get().copy());
582  }
583 
586  void deepCopy(const Message& rhs_)
587  {
588  _body.get().copy(rhs_._body.get());
589  }
590 
601  class Options
602  {
603  public:
604  static const char* None(void)
605  {
606  return AMPS_OPTIONS_NONE;
607  }
608  static const char* Live(void)
609  {
610  return AMPS_OPTIONS_LIVE;
611  }
612  static const char* OOF(void)
613  {
614  return AMPS_OPTIONS_OOF;
615  }
616  static const char* Replace(void)
617  {
618  return AMPS_OPTIONS_REPLACE;
619  }
620  static const char* NoEmpties(void)
621  {
622  return AMPS_OPTIONS_NOEMPTIES;
623  }
624  static const char* SendKeys(void)
625  {
626  return AMPS_OPTIONS_SENDKEYS;
627  }
628  static const char* Timestamp(void)
629  {
630  return AMPS_OPTIONS_TIMESTAMP;
631  }
632  static const char* NoSowKey(void)
633  {
634  return AMPS_OPTIONS_NOSOWKEY;
635  }
636  static const char* Cancel(void)
637  {
638  return AMPS_OPTIONS_CANCEL;
639  }
640  static const char* Resume(void)
641  {
642  return AMPS_OPTIONS_RESUME;
643  }
644  static const char* Pause(void)
645  {
646  return AMPS_OPTIONS_PAUSE;
647  }
648  static const char* FullyDurable(void)
649  {
650  return AMPS_OPTIONS_FULLY_DURABLE;
651  }
652  static const char* Expire(void)
653  {
654  return AMPS_OPTIONS_EXPIRE;
655  }
656  static std::string Conflation(const char* conflation_)
657  {
658  char buf[64];
659  AMPS_snprintf(buf, sizeof(buf), "conflation=%s,", conflation_);
660  return buf;
661  }
662  static std::string ConflationKey(const char* conflationKey_)
663  {
664  std::string option("conflation_key=");
665  option.append(conflationKey_).append(",");
666  return option;
667  }
668  static std::string TopN(int topN_)
669  {
670  char buf[24];
671  AMPS_snprintf(buf, sizeof(buf), "top_n=%d,", topN_);
672  return buf;
673  }
674  static std::string MaxBacklog(int maxBacklog_)
675  {
676  char buf[24];
677  AMPS_snprintf(buf, sizeof(buf), "max_backlog=%d,", maxBacklog_);
678  return buf;
679  }
680  static std::string Rate(const char* rate_)
681  {
682  char buf[64];
683  AMPS_snprintf(buf, sizeof(buf), "rate=%s,", rate_);
684  return buf;
685  }
686  static std::string RateMaxGap(const char* rateMaxGap_)
687  {
688  char buf[64];
689  AMPS_snprintf(buf, sizeof(buf), "rate_max_gap=%s,", rateMaxGap_);
690  return buf;
691  }
692  static std::string SkipN(int skipN_)
693  {
694  char buf[24];
695  AMPS_snprintf(buf, sizeof(buf), "skip_n=%d,", skipN_);
696  return buf;
697  }
698 
699  static std::string Projection(const std::string& projection_)
700  {
701  return "projection=[" + projection_ + "],";
702  }
703 
704  template<class Iterator>
705  static std::string Projection(Iterator begin_, Iterator end_)
706  {
707  std::string projection = "projection=[";
708  for (Iterator i = begin_; i != end_; ++i)
709  {
710  projection += *i;
711  projection += ',';
712  }
713  projection.insert(projection.length() - 1, "]");
714  return projection;
715  }
716 
717  static std::string Grouping(const std::string& grouping_)
718  {
719  return "grouping=[" + grouping_ + "],";
720  }
721 
722  template<class Iterator>
723  static std::string Grouping(Iterator begin_, Iterator end_)
724  {
725  std::string grouping = "grouping=[";
726  for (Iterator i = begin_; i != end_; ++i)
727  {
728  grouping += *i;
729  grouping += ',';
730  }
731  grouping.insert(grouping.length() - 1, "]");
732  return grouping;
733  }
734 
735  static std::string Select(const std::string& select_)
736  {
737  return "select=[" + select_ + "],";
738  }
739 
740  template<class Iterator>
741  static std::string Select(Iterator begin_, Iterator end_)
742  {
743  std::string select = "select=[";
744  for (Iterator i = begin_; i != end_; ++i)
745  {
746  select += *i;
747  select += ',';
748  }
749  select.insert(select.length() - 1, "]");
750  return select;
751  }
752 
753  static std::string AckConflationInterval(const std::string& interval_)
754  {
755  return "ack_conflation=" + interval_ + ",";
756  }
757 
758  static std::string AckConflationInterval(const char* interval_)
759  {
760  static const std::string start("ack_conflation=");
761  return start + interval_ + ",";
762  }
763 
766  Options(std::string options_ = "")
767  : _optionStr(options_)
768  , _maxBacklog(0)
769  , _topN(0)
770  , _skipN(0)
771  {;}
772 
773  int getMaxBacklog(void) const
774  {
775  return _maxBacklog;
776  }
777  std::string getConflation(void) const
778  {
779  return _conflation;
780  }
781  std::string getConflationKey(void) const
782  {
783  return _conflationKey;
784  }
785  int getTopN(void) const
786  {
787  return _topN;
788  }
789  std::string getRate(void) const
790  {
791  return _rate;
792  }
793  std::string getRateMaxGap(void) const
794  {
795  return _rateMaxGap;
796  }
797 
801  void setNone(void)
802  {
803  _optionStr.clear();
804  }
805 
815  void setLive(void)
816  {
817  _optionStr += AMPS_OPTIONS_LIVE;
818  }
819 
824  void setOOF(void)
825  {
826  _optionStr += AMPS_OPTIONS_OOF;
827  }
828 
833  void setReplace(void)
834  {
835  _optionStr += AMPS_OPTIONS_REPLACE;
836  }
837 
841  void setNoEmpties(void)
842  {
843  _optionStr += AMPS_OPTIONS_NOEMPTIES;
844  }
845 
849  void setSendKeys(void)
850  {
851  _optionStr += AMPS_OPTIONS_SENDKEYS;
852  }
853 
858  void setTimestamp(void)
859  {
860  _optionStr += AMPS_OPTIONS_TIMESTAMP;
861  }
862 
866  void setNoSowKey(void)
867  {
868  _optionStr += AMPS_OPTIONS_NOSOWKEY;
869  }
870 
874  void setCancel(void)
875  {
876  _optionStr += AMPS_OPTIONS_CANCEL;
877  }
878 
885  void setResume(void)
886  {
887  _optionStr += AMPS_OPTIONS_RESUME;
888  }
889 
900  void setPause(void)
901  {
902  _optionStr += AMPS_OPTIONS_PAUSE;
903  }
904 
911  void setFullyDurable(void)
912  {
913  _optionStr += AMPS_OPTIONS_FULLY_DURABLE;
914  }
915 
926  void setMaxBacklog(int maxBacklog_)
927  {
928  char buf[24];
929  AMPS_snprintf(buf, sizeof(buf), "max_backlog=%d,", maxBacklog_);
930  _optionStr += buf;
931  _maxBacklog = maxBacklog_;
932  }
933 
939  void setConflation(const char* conflation_)
940  {
941  char buf[64];
942  AMPS_snprintf(buf, sizeof(buf), "conflation=%s,", conflation_);
943  _optionStr += buf;
944  _conflation = conflation_;
945  }
946 
956  void setConflationKey(const char* conflationKey_)
957  {
958  char buf[64];
959  AMPS_snprintf(buf, sizeof(buf), "conflation_key=%s,", conflationKey_);
960  _optionStr += buf;
961  _conflationKey = conflationKey_;
962  }
963 
969  void setTopN(int topN_)
970  {
971  char buf[24];
972  AMPS_snprintf(buf, sizeof(buf), "top_n=%d,", topN_);
973  _optionStr += buf;
974  _topN = topN_;
975  }
976 
983  void setRate(const char* rate_)
984  {
985  char buf[64];
986  AMPS_snprintf(buf, sizeof(buf), "rate=%s,", rate_);
987  _optionStr += buf;
988  _rate = rate_;
989  }
990 
1005  void setRateMaxGap(const char* rateMaxGap_)
1006  {
1007  char buf[64];
1008  AMPS_snprintf(buf, sizeof(buf), "rate_max_gap=%s,", rateMaxGap_);
1009  _optionStr += buf;
1010  _rateMaxGap = rateMaxGap_;
1011  }
1012 
1018  void setSkipN(int skipN_)
1019  {
1020  char buf[24];
1021  AMPS_snprintf(buf, sizeof(buf), "skip_n=%d,", skipN_);
1022  _optionStr += buf;
1023  _skipN = skipN_;
1024  }
1025 
1030  void setProjection(const std::string& projection_)
1031  {
1032  _projection = "projection=[" + projection_ + "],";
1033  _optionStr += _projection;
1034  }
1035 
1036 
1042  template<class Iterator>
1043  void setProjection(Iterator begin_, Iterator end_)
1044  {
1045  _projection = "projection=[";
1046  for (Iterator i = begin_; i != end_; ++i)
1047  {
1048  _projection += *i;
1049  _projection += ',';
1050  }
1051  _projection.insert(_projection.length() - 1, "]");
1052  _optionStr += _projection;
1053  }
1054 
1059  void setGrouping(const std::string& grouping_)
1060  {
1061  _grouping = "grouping=[" + grouping_ + "],";
1062  _optionStr += _grouping;
1063  }
1064 
1065 
1071  template<class Iterator>
1072  void setGrouping(Iterator begin_, Iterator end_)
1073  {
1074  _grouping = "grouping=[";
1075  for (Iterator i = begin_; i != end_; ++i)
1076  {
1077  _grouping += *i;
1078  _grouping += ',';
1079  }
1080  _grouping.insert(_grouping.length() - 1, "]");
1081  _optionStr += _grouping;
1082  }
1083 
1087  operator const std::string()
1088  {
1089  return _optionStr.substr(0, _optionStr.length() - 1);
1090  }
1094  size_t getLength() const
1095  {
1096  return (_optionStr.empty() ? 0 : _optionStr.length() - 1);
1097  }
1098 
1103  const char* getStr() const
1104  {
1105  return (_optionStr.empty() ? 0 : _optionStr.data());
1106  }
1107 
1108  private:
1109  std::string _optionStr;
1110  int _maxBacklog;
1111  std::string _conflation;
1112  std::string _conflationKey;
1113  int _topN;
1114  std::string _rate;
1115  std::string _rateMaxGap;
1116  int _skipN;
1117  std::string _projection;
1118  std::string _grouping;
1119  };
1120 
1123  struct AckType
1124  {
1125  typedef enum : unsigned
1126  {
1127  None = 0, Received = 1, Parsed = 2, Processed = 4, Persisted = 8, Completed = 16, Stats = 32
1128  } Type;
1129  };
1130  AMPS_FIELD(AckType)
1133  static inline AckType::Type decodeSingleAckType(const char* begin, const char* end)
1134  {
1135  switch (end - begin)
1136  {
1137  case 5:
1138  return AckType::Stats;
1139  case 6:
1140  return AckType::Parsed;
1141  case 8:
1142  return AckType::Received;
1143  case 9:
1144  switch (begin[1])
1145  {
1146  case 'e': return AckType::Persisted;
1147  case 'r': return AckType::Processed;
1148  case 'o': return AckType::Completed;
1149  default: break;
1150  }
1151  break;
1152  default:
1153  break;
1154  }
1155  return AckType::None;
1156  }
1160  unsigned getAckTypeEnum() const
1161  {
1162  unsigned result = AckType::None;
1163  const char* data = NULL; size_t len = 0;
1164  amps_message_get_field_value(_body.get().getMessage(), AMPS_AckType, &data, &len);
1165  const char* mark = data;
1166  for (const char* end = data + len; data != end; ++data)
1167  {
1168  if (*data == ',')
1169  {
1170  result |= decodeSingleAckType(mark, data);
1171  mark = data + 1;
1172  }
1173  }
1174  if (mark < data)
1175  {
1176  result |= decodeSingleAckType(mark, data);
1177  }
1178  return result;
1179  }
1183  Message& setAckTypeEnum(unsigned ackType_)
1184  {
1185  if (ackType_ < AckTypeConstants<0>::Entries)
1186  {
1187  amps_message_assign_field_value(_body.get().getMessage(), AMPS_AckType,
1188  AckTypeConstants<0>::Values[ackType_], AckTypeConstants<0>::Lengths[ackType_]);
1189  }
1190  return *this;
1191  }
1192 
1193  AMPS_FIELD(BatchSize)
1194  AMPS_FIELD(Bookmark)
1195  AMPS_FIELD(Command)
1196 
1200  struct Command
1201  {
1202  typedef enum
1203  {
1204  Unknown = 0,
1205  Publish = 1,
1206  Subscribe = 2,
1207  Unsubscribe = 4,
1208  SOW = 8,
1209  Heartbeat = 16,
1210  SOWDelete = 32,
1211  DeltaPublish = 64,
1212  Logon = 128,
1213  SOWAndSubscribe = 256,
1214  DeltaSubscribe = 512,
1215  SOWAndDeltaSubscribe = 1024,
1216  StartTimer = 2048,
1217  StopTimer = 4096,
1218  GroupBegin = 8192,
1219  GroupEnd = 16384,
1220  OOF = 32768,
1221  Ack = 65536,
1222  Flush = 131072,
1223  NoDataCommands = Publish | Unsubscribe | Heartbeat | SOWDelete | DeltaPublish
1224  | Logon | StartTimer | StopTimer | Flush
1225  } Type;
1226  };
1228  Command::Type getCommandEnum() const
1229  {
1230  const char* data = NULL; size_t len = 0;
1231  amps_message_get_field_value(_body.get().getMessage(), AMPS_Command, &data, &len);
1232  switch (len)
1233  {
1234  case 1: return Command::Publish; // -V1037
1235  case 3:
1236  switch (data[0])
1237  {
1238  case 's': return Command::SOW;
1239  case 'o': return Command::OOF;
1240  case 'a': return Command::Ack;
1241  }
1242  break;
1243  case 5:
1244  switch (data[0])
1245  {
1246  case 'l': return Command::Logon;
1247  case 'f': return Command::Flush;
1248  }
1249  break;
1250  case 7:
1251  return Command::Publish; // -V1037
1252  break;
1253  case 9:
1254  switch (data[0])
1255  {
1256  case 's': return Command::Subscribe;
1257  case 'h': return Command::Heartbeat;
1258  case 'g': return Command::GroupEnd;
1259  }
1260  break;
1261  case 10:
1262  switch (data[1])
1263  {
1264  case 'o': return Command::SOWDelete;
1265  case 't': return Command::StopTimer;
1266  }
1267  break;
1268  case 11:
1269  switch (data[0])
1270  {
1271  case 'g': return Command::GroupBegin;
1272  case 'u': return Command::Unsubscribe;
1273  }
1274  break;
1275  case 13:
1276  return Command::DeltaPublish;
1277  case 15:
1278  return Command::DeltaSubscribe;
1279  case 17:
1280  return Command::SOWAndSubscribe;
1281  case 23:
1282  return Command::SOWAndDeltaSubscribe;
1283  }
1284  return Command::Unknown;
1285  }
1286 
1288  Message& setCommandEnum(Command::Type command_)
1289  {
1290  unsigned bits = 0;
1291  unsigned command = command_;
1292  while (command > 0)
1293  {
1294  ++bits;
1295  command >>= 1;
1296  }
1297  amps_message_assign_field_value(_body.get().getMessage(), AMPS_Command,
1298  CommandConstants<0>::Values[bits], CommandConstants<0>::Lengths[bits]);
1299  return *this;
1300  }
1301 
1302  AMPS_FIELD(CommandId)
1303  AMPS_FIELD(ClientName)
1304  AMPS_FIELD(CorrelationId)
1305  AMPS_FIELD(Expiration)
1306  AMPS_FIELD(Filter)
1307  AMPS_FIELD(GroupSequenceNumber)
1308  AMPS_FIELD(Heartbeat)
1309  AMPS_FIELD(LeasePeriod)
1310  AMPS_FIELD(Matches)
1311  AMPS_FIELD(MessageLength)
1312  AMPS_FIELD(MessageType)
1313 
1314  DOX_OPENGROUP(Options)
1315  DOX_MAKEGETCOMMENT(Options)
1316  Field getOptions() const
1317  {
1318  Field returnValue;
1319  const char* ptr;
1320  size_t sz;
1321  amps_message_get_field_value(_body.get().getMessage(),
1322  AMPS_Options, &ptr, &sz);
1323  if (sz && ptr[sz - 1] == ',')
1324  {
1325  --sz;
1326  }
1327  returnValue.assign(ptr, sz);
1328  return returnValue;
1329  }
1330 
1331  DOX_MAKEGETRAWCOMMENT(Options)
1332  void getRawOptions(const char** dataptr, size_t* sizeptr) const
1333  {
1334  amps_message_get_field_value(_body.get().getMessage(),
1335  AMPS_Options, dataptr, sizeptr);
1336  if (*sizeptr && *dataptr && (*dataptr)[*sizeptr - 1] == ',')
1337  {
1338  --*sizeptr;
1339  }
1340  return;
1341  }
1342 
1343  DOX_MAKESETCOMMENT(Options)
1344  Message& setOptions(const std::string& v)
1345  {
1346  size_t sz = v.length();
1347  if (sz && v[sz - 1] == ',')
1348  {
1349  --sz;
1350  }
1351  amps_message_set_field_value(_body.get().getMessage(),
1352  AMPS_Options, v.c_str(), sz);
1353  return *this;
1354  }
1355 
1356  DOX_MAKEASSIGNCOMMENT(Options)
1357  Message& assignOptions(const std::string& v)
1358  {
1359  size_t sz = v.length();
1360  if (sz && v[sz - 1] == ',')
1361  {
1362  --sz;
1363  }
1364  amps_message_assign_field_value(_body.get().getMessage(),
1365  AMPS_Options, v.c_str(), sz);
1366  return *this;
1367  }
1368 
1369  DOX_MAKEASSIGNCOMMENT(Options)
1370  Message& assignOptions(const char* data, size_t len)
1371  {
1372  if (len && data[len - 1] == ',')
1373  {
1374  --len;
1375  }
1376  amps_message_assign_field_value(_body.get().getMessage(),
1377  AMPS_Options, data, len);
1378  return *this;
1379  }
1380 
1381  DOX_MAKESETCOMMENT(Options)
1382  Message& setOptions(const char* str)
1383  {
1384  if (str)
1385  {
1386  size_t sz = strlen(str);
1387  if (sz && str[sz - 1] == ',')
1388  {
1389  --sz;
1390  }
1391  amps_message_set_field_value(_body.get().getMessage(),
1392  AMPS_Options, str, sz);
1393  }
1394  else
1395  {
1396  amps_message_set_field_value(_body.get().getMessage(),
1397  AMPS_Options, str, 0);
1398  }
1399  return *this;
1400  }
1401 
1402  DOX_MAKESETCOMMENT(Options)
1403  Message& setOptions(const char* str, size_t len)
1404  {
1405  if (len && str[len - 1] == ',')
1406  {
1407  --len;
1408  }
1409  amps_message_set_field_value(_body.get().getMessage(),
1410  AMPS_Options, str, len);
1411  return *this;
1412  }
1413  DOX_CLOSEGROUP()
1414 
1415  AMPS_FIELD(OrderBy)
1416  AMPS_FIELD(Password)
1417  AMPS_FIELD_ALIAS(QueryId, QueryID)
1418  AMPS_FIELD(Reason)
1419  AMPS_FIELD(RecordsInserted)
1420  AMPS_FIELD(RecordsReturned)
1421  AMPS_FIELD(RecordsUpdated)
1422  AMPS_FIELD(Sequence)
1423  AMPS_FIELD(SowDelete)
1424  AMPS_FIELD(SowKey)
1425  AMPS_FIELD(SowKeys)
1426  AMPS_FIELD(Status)
1427  AMPS_FIELD_ALIAS(SubId, SubscriptionId) // -V524
1428  AMPS_FIELD(SubscriptionIds)
1429  AMPS_FIELD(TimeoutInterval)
1430  AMPS_FIELD(Timestamp)
1431 
1435  Field getTransmissionTime() const
1436  {
1437  return getTimestamp();
1438  }
1439 
1444  void getRawTransmissionTime(const char** dataptr, size_t* sizeptr) const
1445  {
1446  getRawTimestamp(dataptr, sizeptr);
1447  }
1448 
1449  AMPS_FIELD(Topic)
1450  AMPS_FIELD(TopicMatches)
1451  AMPS_FIELD(TopNRecordsReturned)
1452  AMPS_FIELD(Version)
1453  AMPS_FIELD(UserId)
1454 
1459 
1460  Field getData() const
1461  {
1462  Field returnValue;
1463  char* ptr;
1464  size_t sz;
1465  amps_message_get_data(_body.get().getMessage(), &ptr, &sz);
1466  returnValue.assign(ptr, sz);
1467  return returnValue;
1468  }
1469 
1470  void getRawData(const char** data, size_t* sz) const
1471  {
1472  amps_message_get_data(_body.get().getMessage(), (char**)data, sz);
1473  }
1476  Message& setData(const std::string& v_)
1477  {
1478  amps_message_set_data(_body.get().getMessage(), v_.c_str(), v_.length());
1479  return *this;
1480  }
1481  Message& assignData(const std::string& v_)
1482  {
1483  amps_message_assign_data(_body.get().getMessage(), v_.c_str(), v_.length());
1484  return *this;
1485  }
1486 
1490  Message& setData(const char* data_, size_t length_)
1491  {
1492  amps_message_set_data(_body.get().getMessage(), data_, length_);
1493  return *this;
1494  }
1495  Message& assignData(const char* data_, size_t length_)
1496  {
1497  amps_message_assign_data(_body.get().getMessage(), data_, length_);
1498  return *this;
1499  }
1500 
1503  Message& setData(const char* data_)
1504  {
1505  amps_message_set_data_nts(_body.get().getMessage(), data_);
1506  return *this;
1507  }
1508  Message& assignData(const char* data_)
1509  {
1510  amps_message_assign_data(_body.get().getMessage(), data_, strlen(data_));
1511  return *this;
1512  }
1513  amps_handle getMessage() const
1514  {
1515  return _body.get().getMessage();
1516  }
1517  void replace(amps_handle message, bool owner = false)
1518  {
1519  _body.get().replace(message, owner);
1520  }
1521  void disown()
1522  {
1523  _body.get().disown();
1524  }
1525  void invalidate()
1526  {
1527  _body = NULL;
1528  }
1529  bool isValid(void) const
1530  {
1531  return _body.isValid();
1532  }
1533  Message& reset()
1534  {
1535  _body.get().reset();
1536  return *this;
1537  }
1538 
1539  void setBookmarkSeqNo(size_t val)
1540  {
1541  _body.get().setBookmarkSeqNo(val);
1542  }
1543 
1544  size_t getBookmarkSeqNo() const
1545  {
1546  return _body.get().getBookmarkSeqNo();
1547  }
1548 
1549  void setSubscriptionHandle(amps_handle val)
1550  {
1551  _body.get().setSubscriptionHandle(val);
1552  }
1553 
1554  amps_handle getSubscriptionHandle() const
1555  {
1556  return _body.get().getSubscriptionHandle();
1557  }
1558 
1559  void ack(const char* options_ = NULL) const;
1560 
1561  void setClientImpl(ClientImpl* pClientImpl)
1562  {
1563  _body.get().setClientImpl(pClientImpl);
1564  }
1565 
1566  void setIgnoreAutoAck() const
1567  {
1568  _body.get().setIgnoreAutoAck();
1569  }
1570 
1571  bool getIgnoreAutoAck() const
1572  {
1573  return _body.get().getIgnoreAutoAck();
1574  }
1575 
1576  template <class T> // static
1577  void throwFor(const T& /*context_*/, const std::string& ackReason_) const
1578  {
1579  switch (ackReason_[0])
1580  {
1581  case 'a': // auth failure
1582  throw AuthenticationException("Logon failed for user \"" +
1583  (std::string)getUserId() + "\"");
1584  break;
1585  case 'b':
1586  switch (ackReason_.length())
1587  {
1588  case 10: // bad filter
1589  throw BadFilterException("bad filter '" +
1590  (std::string)getFilter() +
1591  "'");
1592  break;
1593  case 11: // bad sow key
1594  if (getSowKeys().len())
1595  {
1596  throw BadSowKeyException("bad sow key '" +
1597  (std::string)getSowKeys() +
1598  "'");
1599  }
1600  else
1601  {
1602  throw BadSowKeyException("bad sow key '" +
1603  (std::string)getSowKey() +
1604  "'");
1605  }
1606  break;
1607  case 15: // bad regex topic
1608  throw BadRegexTopicException("bad regex topic '" +
1609  (std::string)getTopic() +
1610  "'.");
1611  break;
1612  default:
1613  break;
1614  }
1615  break;
1616  case 'd':
1617  if (ackReason_.length() == 23) // duplicate logon attempt
1618  {
1619  throw DuplicateLogonException("Client '" +
1620  (std::string)getClientName() +
1621  "' with userid '" +
1622  (std::string)getUserId() +
1623  "' duplicate logon attempt");
1624  }
1625  break;
1626  case 'i':
1627  if (ackReason_.length() >= 9)
1628  {
1629  switch (ackReason_[8])
1630  {
1631  case 'b': // invalid bookmark
1632  throw InvalidBookmarkException("invalid bookmark '" +
1633  (std::string)getBookmark() +
1634  "'.");
1635  break;
1636  case 'm': // invalid message type
1637  throw CommandException(std::string("invalid message type '") +
1638  (std::string)getMessageType() +
1639  "'.");
1640  break;
1641  case 'o':
1642  if (ackReason_[9] == 'p') // invalid options
1643  {
1644  throw InvalidOptionsException("invalid options '" +
1645  (std::string)getOptions() +
1646  "'.");
1647  }
1648  else if (ackReason_[9] == 'r') // invalid order by
1649  {
1650  throw InvalidOrderByException("invalid order by '" +
1651  (std::string)getOrderBy() +
1652  "'.");
1653  }
1654  break;
1655  case 's': // invalid subId
1656  throw InvalidSubIdException("invalid subid '" +
1657  (std::string)getSubscriptionId() +
1658  "'.");
1659  break;
1660  case 't':
1661  if (ackReason_.length() == 13) // invalid topic
1662  {
1663  throw InvalidTopicException("invalid topic '" +
1664  (std::string)getTopic() +
1665  "'.");
1666  }
1667  else if (ackReason_.length() == 23) // invalid topic or filter
1668  {
1669  throw InvalidTopicException("invalid topic or filter. Topic '" +
1670  (std::string)getTopic() +
1671  "' Filter '" +
1672  (std::string)getFilter() +
1673  "'.");
1674  }
1675  break;
1676  default:
1677  break;
1678  }
1679  }
1680  break;
1681  case 'l':
1682  if (ackReason_.length() == 14) // logon required
1683  {
1684  throw LogonRequiredException("logon required before command");
1685  }
1686  break;
1687  case 'n':
1688  switch (ackReason_[4])
1689  {
1690  case ' ': // name in use
1691  throw NameInUseException("name in use '" +
1692  (std::string)getClientName() +
1693  "'.");
1694  break;
1695  case 'e': // not entitled
1696  throw NotEntitledException("User \"" +
1697  (std::string)getUserId() +
1698  "\" not entitled to topic \"" +
1699  (std::string)getTopic() +
1700  "\".");
1701  break;
1702  case 'i': // no filter or bookmark
1703  throw MissingFieldsException("command sent with no filter or bookmark.");
1704  break;
1705  case 'l': // no client name
1706  throw MissingFieldsException("command sent with no client name.");
1707  break;
1708  case 'o': // no topic or filter
1709  throw MissingFieldsException("command sent with no topic or filter.");
1710  break;
1711  case 's': // not supported
1712  throw CommandException("operation on topic '" +
1713  (std::string)getTopic() +
1714  "' with options '" +
1715  (std::string)getOptions() +
1716  "' not supported.");
1717  break;
1718  default:
1719  break;
1720  }
1721  break;
1722  case 'o':
1723  switch (ackReason_.length())
1724  {
1725  case 16: // orderby required
1726  throw MissingFieldsException("orderby required");
1727  break;
1728  case 17: // orderby too large
1729  throw CommandException("orderby too large '" +
1730  (std::string)getOrderBy() +
1731  "'.");
1732  break;
1733  }
1734  break;
1735  case 'p':
1736  throw CommandException("projection clause too large in options '" +
1737  (std::string)getOptions() +
1738  "'.");
1739  break;
1740  case 'r':
1741  switch (ackReason_[2])
1742  {
1743  case 'g': // regex topic not supported
1744  throw BadRegexTopicException("'regex topic not supported '" +
1745  (std::string)getTopic() +
1746  "'.");
1747  break;
1748  default:
1749  break;
1750  }
1751  break;
1752  case 's':
1753  switch (ackReason_[5])
1754  {
1755  case ' ': // subid in use
1756  throw SubidInUseException("subid in use '" +
1757  (std::string)getSubscriptionId() +
1758  "'.");
1759  break;
1760  case 'e': // sow_delete command only supports one of: filter, sow_keys, bookmark, or data
1761  throw CommandException("sow_delete command only supports one of: filter '" +
1762  (std::string)getFilter() +
1763  "', sow_keys '" +
1764  (std::string)getSowKeys() +
1765  "', bookmark '" +
1766  (std::string)getBookmark() +
1767  "', or data '" +
1768  (std::string)getData() +
1769  "'.");
1770  break;
1771  case 't': // sow store failed
1772  throw PublishException("sow store failed.");
1773  break;
1774  default:
1775  break;
1776  }
1777  break;
1778  case 't':
1779  switch (ackReason_[2])
1780  {
1781  case ' ': // tx store failure
1782  throw PublishException("tx store failure.");
1783  break;
1784  case 'n': // txn replay failed
1785  throw CommandException("txn replay failed for '" +
1786  (std::string)getSubId() +
1787  "'.");
1788  break;
1789  }
1790  break;
1791  default:
1792  break;
1793  }
1794  throw CommandException("Error from server while processing this command: '" +
1795  ackReason_ + "'");
1796  }
1797  };
1798 
1799  inline std::string
1800  operator+(const std::string& lhs, const Message::Field& rhs)
1801  {
1802  return lhs + std::string(rhs);
1803  }
1804 
1805  inline std::basic_ostream<char>&
1806  operator<<(std::basic_ostream<char>& os, const Message::Field& rhs)
1807  {
1808  os.write(rhs.data(), (std::streamsize)rhs.len());
1809  return os;
1810  }
1811  inline bool
1812  AMPS::Field::operator<(const AMPS::Field& rhs) const
1813  {
1814  if (!data())
1815  {
1816  return rhs.data() != NULL;
1817  }
1818  if (!rhs.data())
1819  {
1820  return false;
1821  }
1822  return std::lexicographical_compare(data(), data() + len(), rhs.data(), rhs.data() + rhs.len());
1823  }
1824 
1825 }
1826 
1827 #endif
void setFullyDurable(void)
Set the option to only provide messages that have been persisted to all replication destinations that...
Definition: Message.hpp:911
Class to hold string versions of failure reasons.
Definition: ampsplusplus.hpp:150
Message & setData(const std::string &v_)
Sets the data portion of self.
Definition: Message.hpp:1476
AMPSDLL amps_handle amps_message_create(amps_handle client)
Functions for creation and manipulation of AMPS messages.
void setRateMaxGap(const char *rateMaxGap_)
Set the option for the maximum amount of time that a bookmark replay with a specified rate will allow...
Definition: Message.hpp:1005
Command::Type getCommandEnum() const
Decode self&#39;s "command" field and return one of the values from Command.
Definition: Message.hpp:1228
AMPSDLL amps_handle amps_message_copy(amps_handle message)
Creates and returns a handle to a new AMPS message object that is a deep copy of the message passed i...
void setNoEmpties(void)
Set the option to not send empty messages on a delta subscription.
Definition: Message.hpp:841
Message deepCopy(void) const
Returns a deep copy of self.
Definition: Message.hpp:579
void setOOF(void)
Set the option to receive out of focus (OOF) messages on a subscription, where applicable.
Definition: Message.hpp:824
Message encapsulates a single message sent to or received from an AMPS server, and provides methods f...
Definition: Message.hpp:531
Message & setAckTypeEnum(unsigned ackType_)
Encode self&#39;s "ack type" field from a bitmask of values from AckType.
Definition: Message.hpp:1183
void setNoSowKey(void)
Set the option to not set the SowKey header on messages.
Definition: Message.hpp:866
MessageImpl(amps_handle message_, bool owner_=false, bool ignoreAutoAck_=false, size_t bookmarkSeqNo_=0, amps_subscription_handle subscription_=NULL, ClientImpl *clientImpl_=NULL)
Constructs a messageImpl from an existing AMPS message.
Definition: Message.hpp:109
AMPSDLL void amps_message_set_field_value(amps_handle message, FieldId field, const amps_char *value, size_t length)
Sets the value of a header field in an AMPS message.
STL namespace.
Message & setCommandEnum(Command::Type command_)
Set self&#39;s "command" field from one of the values in Command.
Definition: Message.hpp:1288
void setSendKeys(void)
Set the option to send key fields with a delta subscription.
Definition: Message.hpp:849
void * amps_handle
Opaque handle type used to refer to objects in the AMPS api.
Definition: amps.h:211
Class for constructing the options string to pass to AMPS in a Message.
Definition: Message.hpp:601
AMPSDLL void amps_message_assign_data(amps_handle message, const amps_char *value, size_t length)
Assigns the data component of an AMPS message, without copying the value.
const char * data() const
Returns the (non-null-terminated) data underlying this field.
Definition: Field.hpp:260
void setPause(void)
Set the option to pause a bookmark subscription.
Definition: Message.hpp:900
void setReplace(void)
Set the option to replace a current subscription with this one.
Definition: Message.hpp:833
Valid values for setCommandEnum() and getCommandEnum().
Definition: Message.hpp:1200
Message & setData(const char *data_)
Sets the data portion of self from a null-terminated string.
Definition: Message.hpp:1503
void setGrouping(Iterator begin_, Iterator end_)
Set the option for grouping the results of an aggregated query or subscription.
Definition: Message.hpp:1072
Message(CtorFlag)
Constructs a new empty, invalid Message.
Definition: Message.hpp:555
AMPSDLL void amps_message_set_data_nts(amps_handle message, const amps_char *value)
Sets the data component of an AMPS message.
AMPSDLL void amps_message_reset(amps_handle message)
Clears all fields and data in a message.
void setTimestamp(void)
Set the option to send a timestamp that the message was processed on a subscription or query...
Definition: Message.hpp:858
amps_handle getMessage() const
Returns the underling AMPS message object from the C layer.
Definition: Message.hpp:170
Defines the AMPS::Field class, which represents the value of a field in a message.
size_t len() const
Returns the length of the data underlying this field.
Definition: Field.hpp:267
void setResume(void)
Set the option to resume a subscription.
Definition: Message.hpp:885
AMPSDLL void amps_message_get_field_value(amps_handle message, FieldId field, const amps_char **value_ptr, size_t *length_ptr)
Retrieves the value of a header field in an AMPS message.
void setConflation(const char *conflation_)
Set the options for conflation on a subscription.
Definition: Message.hpp:939
void replace(amps_handle message_, bool owner_=false)
Causes self to refer to a new AMPS message, freeing any current message owned by self along the way...
Definition: Message.hpp:190
AMPSDLL void amps_message_get_data(amps_handle message, amps_char **value_ptr, size_t *length_ptr)
Gets the data component of an AMPS message.
AMPSDLL void amps_message_destroy(amps_handle message)
Destroys and frees the memory associated with an AMPS message object.
Options(std::string options_="")
ctor - default to None
Definition: Message.hpp:766
size_t getLength() const
Return the length of this Options object as a string.
Definition: Message.hpp:1094
AMPSDLL void amps_message_assign_field_value(amps_handle message, FieldId field, const amps_char *value, size_t length)
Assigns the value of a header field in an AMPS message, without copying the value.
void setRate(const char *rate_)
Set the option for the maximum rate at which messages are provided to the subscription.
Definition: Message.hpp:983
Valid values for the setAckTypeEnum() and getAckTypeEnum() methods.
Definition: Message.hpp:1123
void setGrouping(const std::string &grouping_)
Set the option for grouping the results of an aggregated query or subscription.
Definition: Message.hpp:1059
void setTopN(int topN_)
Set the top N option, which specifies the maximum number of messages to return for this command...
Definition: Message.hpp:969
AMPSDLL void amps_message_set_data(amps_handle message, const amps_char *value, size_t length)
Sets the data component of an AMPS message.
void setMaxBacklog(int maxBacklog_)
Set the option for maximum backlog this subscription is willing to accept.
Definition: Message.hpp:926
void getRawTransmissionTime(const char **dataptr, size_t *sizeptr) const
Definition: Message.hpp:1444
void deepCopy(const Message &rhs_)
Makes self a deep copy of rhs_.
Definition: Message.hpp:586
MessageImpl()
Constructs a MessageImpl with a new, empty AMPS message.
Definition: Message.hpp:122
Field represents the value of a single field in a Message.
Definition: Field.hpp:86
void setConflationKey(const char *conflationKey_)
Set the options for the conflation key, the identifiers for the field or fields used by AMPS to deter...
Definition: Message.hpp:956
void setProjection(const std::string &projection_)
Set the option for projecting the results of an aggregated query or subscription. ...
Definition: Message.hpp:1030
void setSkipN(int skipN_)
Set the option for skip N, the number of messages in the result set to skip before returning messages...
Definition: Message.hpp:1018
Message(amps_handle message_, bool owner_=false)
Constructs a new Message to wrap message.
Definition: Message.hpp:565
CtorFlag
A flag to indicate not to create a body.
Definition: Message.hpp:551
void setNone(void)
Clear any previously set options and set the options to an empty string (AMPS_OPTIONS_NONE).
Definition: Message.hpp:801
Implementation class for a Message.
Definition: Message.hpp:87
void setProjection(Iterator begin_, Iterator end_)
Set the option for projecting the results of an aggregated query or subscription. ...
Definition: Message.hpp:1043
unsigned getAckTypeEnum() const
Decode self&#39;s "ack type" field and return the corresponding bitmask of values from AckType...
Definition: Message.hpp:1160
Message & setData(const char *data_, size_t length_)
Sets the data portion of self from a char array.
Definition: Message.hpp:1490
Definition: ampsplusplus.hpp:102
void setCancel(void)
Set the cancel option, used on a sow_delete command to return a message to the queue.
Definition: Message.hpp:874
const char * getStr() const
Return this Options object as a non-NULL-terminated string.
Definition: Message.hpp:1103
Message()
Construct a new, empty Message.
Definition: Message.hpp:573
void setLive(void)
Set the live option for a bookmark subscription, which requests that the subscription receives messag...
Definition: Message.hpp:815