AMPS C/C++ Client Class Reference
AMPS C/C++ Client Version 5.3.5.0
Message.hpp
Go to the documentation of this file.
1 //
3 // Copyright (c) 2010-2025 60East Technologies Inc., All Rights Reserved.
4 //
5 // This computer software is owned by 60East Technologies Inc. and is
6 // protected by U.S. copyright laws and other laws and by international
7 // treaties. This computer software is furnished by 60East Technologies
8 // Inc. pursuant to a written license agreement and may be used, copied,
9 // transmitted, and stored only in accordance with the terms of such
10 // license agreement and with the inclusion of the above copyright notice.
11 // This computer software or any other copies thereof may not be provided
12 // or otherwise made available to any other person.
13 //
14 // U.S. Government Restricted Rights. This computer software: (a) was
15 // developed at private expense and is in all respects the proprietary
16 // information of 60East Technologies Inc.; (b) was not developed with
17 // government funds; (c) is a trade secret of 60East Technologies Inc.
18 // for all purposes of the Freedom of Information Act; and (d) is a
19 // commercial item and thus, pursuant to Section 12.212 of the Federal
20 // Acquisition Regulations (FAR) and DFAR Supplement Section 227.7202,
21 // Government's use, duplication or disclosure of the computer software
22 // is subject to the restrictions set forth by 60East Technologies Inc..
23 //
25 #ifndef __AMPS_MESSAGE_HPP__
26 #define __AMPS_MESSAGE_HPP__
27 #include "amps/util.hpp"
28 #include "amps/constants.hpp"
29 #include "amps/amps_generated.h"
30 #include "amps/Field.hpp"
31 #include <stdio.h>
32 #include <algorithm>
33 #include <ostream>
34 #include <string>
35 #define AMPS_UNSET_SEQUENCE (amps_uint64_t)-1
36 
37 // Macros for determing what parts of TR1 we can depend on
38 #if defined(__GXX_EXPERIMENTAL_CXX0X__) || (_MSC_VER >= 1600)
39  #define AMPS_USE_FUNCTIONAL 1
40 #endif
41 
42 #if (_MSC_VER >= 1600) || (__GNUC__ > 4) || ( (__GNUC__ == 4) && (__GNUC_MINOR__) >=5 )
43  #define AMPS_USE_LAMBDAS 1
44 #endif
45 
46 #if (_MSC_VER >= 1600) || (__GNUC__ > 4) || ( (__GNUC__ == 4) && (__GNUC_MINOR__) >=8 )
47  #define AMPS_USE_EMPLACE 1
48 #endif
49 
50 #ifdef AMPS_USE_FUNCTIONAL
51  #include <functional>
52 #endif
53 
54 #include <algorithm>
55 
59 
60 #define AMPS_OPTIONS_NONE ""
61 #define AMPS_OPTIONS_LIVE "live,"
62 #define AMPS_OPTIONS_OOF "oof,"
63 #define AMPS_OPTIONS_REPLACE "replace,"
64 #define AMPS_OPTIONS_NOEMPTIES "no_empties,"
65 #define AMPS_OPTIONS_SENDKEYS "send_keys,"
66 #define AMPS_OPTIONS_TIMESTAMP "timestamp,"
67 #define AMPS_OPTIONS_NOSOWKEY "no_sowkey,"
68 #define AMPS_OPTIONS_CANCEL "cancel,"
69 #define AMPS_OPTIONS_RESUME "resume,"
70 #define AMPS_OPTIONS_PAUSE "pause,"
71 #define AMPS_OPTIONS_FULLY_DURABLE "fully_durable,"
72 #define AMPS_OPTIONS_EXPIRE "expire,"
73 #define AMPS_OPTIONS_TOPN(x) "top_n=##x,"
74 #define AMPS_OPTIONS_MAXBACKLOG(x) "max_backlog=##x,"
75 #define AMPS_OPTIONS_RATE(x) "rate=##x,"
76 
77 namespace AMPS
78 {
79  typedef void* amps_subscription_handle;
80 
81  class ClientImpl;
82 
87  class MessageImpl : public RefBody
88  {
89  private:
90  amps_handle _message;
91  //Mutex _lock;
92  bool _owner;
93  mutable bool _isIgnoreAutoAck;
94  size_t _bookmarkSeqNo;
95  amps_subscription_handle _subscription;
96  ClientImpl* _clientImpl;
97  public:
109  MessageImpl(amps_handle message_, bool owner_ = false,
110  bool ignoreAutoAck_ = false, size_t bookmarkSeqNo_ = 0,
111  amps_subscription_handle subscription_ = NULL,
112  ClientImpl* clientImpl_ = NULL)
113  : _message(message_), _owner(owner_), _isIgnoreAutoAck(ignoreAutoAck_)
114  , _bookmarkSeqNo(bookmarkSeqNo_)
115  , _subscription(subscription_), _clientImpl(clientImpl_)
116  {
117  }
118 
123  : _message(NULL), _owner(true), _isIgnoreAutoAck(false), _bookmarkSeqNo(0), _subscription(NULL), _clientImpl(NULL)
124  {
125  // try to create one
126  _message = amps_message_create(NULL);
127  }
128 
129  virtual ~MessageImpl()
130  {
131  if (_owner && _message)
132  {
133  amps_message_destroy(_message);
134  }
135  }
136 
137  MessageImpl* copy() const
138  {
139  amps_handle copy = amps_message_copy(_message);
140  return new MessageImpl(copy, true, _isIgnoreAutoAck, _bookmarkSeqNo,
141  _subscription, _clientImpl);
142  }
143 
144  void copy(const MessageImpl& rhs_)
145  {
146  if (_owner && _message)
147  {
148  amps_message_destroy(_message);
149  }
150  _message = amps_message_copy(rhs_._message);
151  _owner = true;
152  _bookmarkSeqNo = rhs_._bookmarkSeqNo;
153  _subscription = rhs_._subscription;
154  _isIgnoreAutoAck = rhs_._isIgnoreAutoAck;
155  _clientImpl = rhs_._clientImpl;
156  }
157 
158  void setClientImpl(ClientImpl* clientImpl_)
159  {
160  _clientImpl = clientImpl_;
161  }
162 
163  ClientImpl* clientImpl(void) const
164  {
165  return _clientImpl;
166  }
167 
171  {
172  return _message;
173  }
174 
175  void reset()
176  {
177  //Lock<Mutex> l(_lock);
178  amps_message_reset(_message);
179  _bookmarkSeqNo = 0;
180  _subscription = NULL;
181  _isIgnoreAutoAck = false;
182  _clientImpl = NULL;
183  }
184 
190  void replace(amps_handle message_, bool owner_ = false)
191  {
192  //Lock<Mutex> l(_lock);
193  if (_message == message_)
194  {
195  return;
196  }
197  if (_owner && _message)
198  {
199  amps_message_destroy(_message);
200  }
201  _owner = owner_;
202  _message = message_;
203  _bookmarkSeqNo = 0;
204  _subscription = NULL;
205  _isIgnoreAutoAck = false;
206  }
207 
208  void disown()
209  {
210  //Lock<Mutex> l(_lock);
211  _owner = false;
212  }
213 
214  static unsigned long newId()
215  {
216 #if __cplusplus >= 201100L || _MSC_VER >= 1900
217  static std::atomic<uint_fast64_t> _id(0);
218  return (unsigned long)++_id;
219 #else
220  static AMPS_ATOMIC_TYPE _id = 0;
221  return (unsigned long)(AMPS_FETCH_ADD(&_id, 1));
222 #endif
223  }
224 
225  void setBookmarkSeqNo(size_t val_)
226  {
227  _bookmarkSeqNo = val_;
228  }
229 
230  size_t getBookmarkSeqNo(void) const
231  {
232  return _bookmarkSeqNo;
233  }
234 
235  void setSubscriptionHandle(amps_subscription_handle subscription_)
236  {
237  _subscription = subscription_;
238  }
239 
240  amps_subscription_handle getSubscriptionHandle(void) const
241  {
242  return _subscription;
243  }
244 
245  void setIgnoreAutoAck() const
246  {
247  _isIgnoreAutoAck = true;
248  }
249 
250  bool getIgnoreAutoAck() const
251  {
252  return _isIgnoreAutoAck;
253  }
254  };
255 
256 
257 // This block of macros works with the Doxygen preprocessor to
258 // create documentation comments for fields defined with the AMPS_FIELD macro.
259 // A C++ compiler removes comments before expanding macros, so these macros
260 // must ONLY be defined for Doxygen and not for actual compilation.
261 
262 #ifdef DOXYGEN_PREPROCESSOR
263 
264 #define DOX_COMMENTHEAD(s) / ## ** ## s ## * ## /
265 #define DOX_GROUPNAME(s) DOX_COMMENTHEAD(@name s Functions)
266 #define DOX_OPENGROUP(s) DOX_COMMENTHEAD(@{) \
267  DOX_GROUPNAME(s)
268 #define DOX_CLOSEGROUP() DOX_COMMENTHEAD(@})
269 #define DOX_MAKEGETCOMMENT(x) DOX_COMMENTHEAD( Retrieves the value of the x header of the Message as a Field which references the underlying buffer managed by this Message. Notice that not all headers are present on all messages returned by AMPS. See the AMPS %Command Reference for details on which fields will be present in response to a specific command. )
270 #define DOX_MAKEGETRAWCOMMENT(x) DOX_COMMENTHEAD( Modifies the passed in arguments to reference the value of the x header of self in the underlying buffer managed by this Message. Notice that not all headers are present on all messages returned by AMPS. See the AMPS %Command Reference for details on which fields will be present in response to a specific command. )
271 #define DOX_MAKESETCOMMENT(x) DOX_COMMENTHEAD( Sets the value of the x header for this Message. Not all headers are processed by AMPS for all commands. See the AMPS %Command Reference for which headers are used by AMPS for a specific command. )
272 #define DOX_MAKEASSIGNCOMMENT(x) DOX_COMMENTHEAD( Assigns the value of the x header for this Message without copying. Not all headers are processed by AMPS for all commands. See the AMPS %Command Reference for which headers are used by AMPS for a specific command. )
273 #define DOX_MAKEASSIGNOWNCOMMENT(x) DOX_COMMENTHEAD( Assigns the value of the x header for this Message without copying and makes this Message responsible for deleting the value. Not all headers are processed by AMPS for all commands. See the AMPS %Command Reference for which headers are used by AMPS for a specific command. )
274 #define DOX_MAKENEWCOMMENT(x) DOX_COMMENTHEAD(Creates and sets a new sequential value for the x header for this Message. This function is most useful for headers such as %CommandId and %SubId.)
275 
276 #else
277 
278 #define DOX_COMMENTHEAD(s)
279 #define DOX_GROUPNAME(s)
280 #define DOX_OPENGROUP(x)
281 #define DOX_CLOSEGROUP()
282 #define DOX_MAKEGETCOMMENT(x)
283 #define DOX_MAKEGETRAWCOMMENT(x)
284 #define DOX_MAKESETCOMMENT(x)
285 #define DOX_MAKEASSIGNCOMMENT(x)
286 #define DOX_MAKEASSIGNOWNCOMMENT(x)
287 #define DOX_MAKENEWCOMMENT(x)
288 
289 #endif
290 
291 // Macro for defining all of the necessary methods for a field in an AMPS
292 // message.
293 
294 
295 #define AMPS_FIELD(x) \
296  DOX_OPENGROUP(x) \
297  DOX_MAKEGETCOMMENT(x) \
298  Field get##x() const {\
299  Field returnValue;\
300  const char* ptr;\
301  size_t sz;\
302  amps_message_get_field_value(_body.get().getMessage(),\
303  AMPS_##x, &ptr, &sz);\
304  returnValue.assign(ptr, sz);\
305  return returnValue;\
306  }\
307  DOX_MAKEGETRAWCOMMENT(x) \
308  void getRaw##x(const char** dataptr, size_t* sizeptr) const {\
309  amps_message_get_field_value(_body.get().getMessage(),\
310  AMPS_##x, dataptr, sizeptr);\
311  return;\
312  }\
313  DOX_MAKESETCOMMENT(x) \
314  Message& set##x(const std::string& v) {\
315  amps_message_set_field_value(_body.get().getMessage(),\
316  AMPS_##x, v.c_str(), v.length());\
317  return *this;\
318  }\
319  DOX_MAKESETCOMMENT(x) \
320  Message& set##x(amps_uint64_t v) {\
321  char buf[22];\
322  AMPS_snprintf_amps_uint64_t(buf,22,v);\
323  amps_message_set_field_value_nts(_body.get().getMessage(),\
324  AMPS_##x, buf);\
325  return *this;\
326  }\
327  DOX_MAKEASSIGNCOMMENT(x) \
328  Message& assign##x(const std::string& v) {\
329  amps_message_assign_field_value(_body.get().getMessage(),\
330  AMPS_##x, v.c_str(), v.length());\
331  return *this;\
332  }\
333  DOX_MAKEASSIGNCOMMENT(x) \
334  Message& assign##x(const char* data, size_t len) {\
335  amps_message_assign_field_value(_body.get().getMessage(),\
336  AMPS_##x, data, len);\
337  return *this;\
338  }\
339  DOX_MAKEASSIGNOWNCOMMENT(x) \
340  Message& assignOwnership##x(const Field& f) {\
341  amps_message_assign_field_value_ownership(_body.get().getMessage(),\
342  AMPS_##x, f.data(), f.len());\
343  return *this;\
344  }\
345  DOX_MAKESETCOMMENT(x) \
346  Message& set##x(const char* str) {\
347  amps_message_set_field_value_nts(_body.get().getMessage(),\
348  AMPS_##x, str);\
349  return *this;\
350  }\
351  DOX_MAKESETCOMMENT(x) \
352  Message& set##x(const char* str,size_t len) {\
353  amps_message_set_field_value(_body.get().getMessage(),\
354  AMPS_##x, str,len);\
355  return *this;\
356  }\
357  DOX_MAKENEWCOMMENT(x) \
358  Message& new##x() {\
359  char buf[Message::IdentifierLength+1];\
360  buf[Message::IdentifierLength] = 0;\
361  AMPS_snprintf(buf, Message::IdentifierLength+1, "auto%lu" , (unsigned long)(_body.get().newId()));\
362  amps_message_set_field_value_nts(_body.get().getMessage(),\
363  AMPS_##x, buf);\
364  return *this;\
365  } \
366  DOX_CLOSEGROUP()
367 
368 #define AMPS_FIELD_ALIAS(x,y) \
369  DOX_OPENGROUP(y) \
370  DOX_MAKEGETCOMMENT(y) \
371  Field get##y() const {\
372  Field returnValue;\
373  const char* ptr;\
374  size_t sz;\
375  amps_message_get_field_value(_body.get().getMessage(),\
376  AMPS_##y, &ptr, &sz);\
377  returnValue.assign(ptr, sz);\
378  return returnValue;\
379  }\
380  DOX_MAKEGETRAWCOMMENT(y) \
381  void getRaw##y(const char** dataptr, size_t* sizeptr) const {\
382  amps_message_get_field_value(_body.get().getMessage(),\
383  AMPS_##y, dataptr, sizeptr);\
384  return;\
385  }\
386  DOX_MAKESETCOMMENT(y) \
387  Message& set##y(const std::string& v) {\
388  amps_message_set_field_value(_body.get().getMessage(),\
389  AMPS_##y, v.c_str(), v.length());\
390  return *this;\
391  }\
392  DOX_MAKESETCOMMENT(y) \
393  Message& set##y(amps_uint64_t v) {\
394  char buf[22];\
395  AMPS_snprintf_amps_uint64_t(buf,22,v);\
396  amps_message_set_field_value_nts(_body.get().getMessage(),\
397  AMPS_##y, buf);\
398  return *this;\
399  }\
400  DOX_MAKEASSIGNCOMMENT(y) \
401  Message& assign##y(const std::string& v) {\
402  amps_message_assign_field_value(_body.get().getMessage(),\
403  AMPS_##y, v.c_str(), v.length());\
404  return *this;\
405  }\
406  DOX_MAKEASSIGNCOMMENT(y) \
407  Message& assign##y(const char* data, size_t len) {\
408  amps_message_assign_field_value(_body.get().getMessage(),\
409  AMPS_##y, data, len);\
410  return *this;\
411  }\
412  DOX_MAKESETCOMMENT(y) \
413  Message& set##y(const char* str) {\
414  amps_message_set_field_value_nts(_body.get().getMessage(),\
415  AMPS_##y, str);\
416  return *this;\
417  }\
418  DOX_MAKESETCOMMENT(y) \
419  Message& set##y(const char* str,size_t len) {\
420  amps_message_set_field_value(_body.get().getMessage(),\
421  AMPS_##y, str,len);\
422  return *this;\
423  }\
424  DOX_MAKENEWCOMMENT(y) \
425  Message& new##y() {\
426  char buf[Message::IdentifierLength+1];\
427  buf[Message::IdentifierLength] = 0;\
428  AMPS_snprintf(buf, Message::IdentifierLength+1, "auto%lux" , (unsigned long)(_body.get().newId()));\
429  amps_message_set_field_value_nts(_body.get().getMessage(),\
430  AMPS_##y, buf);\
431  return *this;\
432  }\
433  DOX_MAKEGETCOMMENT(y) \
434  Field get##x() const {\
435  Field returnValue;\
436  const char* ptr;\
437  size_t sz;\
438  amps_message_get_field_value(_body.get().getMessage(),\
439  AMPS_##y, &ptr, &sz);\
440  returnValue.assign(ptr, sz);\
441  return returnValue;\
442  }\
443  DOX_MAKEGETRAWCOMMENT(y) \
444  void getRaw##x(const char** dataptr, size_t* sizeptr) const {\
445  amps_message_get_field_value(_body.get().getMessage(),\
446  AMPS_##y, dataptr, sizeptr);\
447  return;\
448  }\
449  DOX_MAKESETCOMMENT(y) \
450  Message& set##x(const std::string& v) {\
451  amps_message_set_field_value(_body.get().getMessage(),\
452  AMPS_##y, v.c_str(), v.length());\
453  return *this;\
454  }\
455  DOX_MAKESETCOMMENT(y) \
456  Message& set##x(amps_uint64_t v) {\
457  char buf[22];\
458  AMPS_snprintf_amps_uint64_t(buf,22,v);\
459  amps_message_set_field_value_nts(_body.get().getMessage(),\
460  AMPS_##y, buf);\
461  return *this;\
462  }\
463  DOX_MAKEASSIGNCOMMENT(y) \
464  Message& assign##x(const std::string& v) {\
465  amps_message_assign_field_value(_body.get().getMessage(),\
466  AMPS_##y, v.c_str(), v.length());\
467  return *this;\
468  }\
469  DOX_MAKEASSIGNCOMMENT(y) \
470  Message& assign##x(const char* data, size_t len) {\
471  amps_message_assign_field_value(_body.get().getMessage(),\
472  AMPS_##y, data, len);\
473  return *this;\
474  }\
475  DOX_MAKESETCOMMENT(y) \
476  Message& set##x(const char* str) {\
477  amps_message_set_field_value_nts(_body.get().getMessage(),\
478  AMPS_##y, str);\
479  return *this;\
480  }\
481  DOX_MAKESETCOMMENT(y) \
482  Message& set##x(const char* str,size_t len) {\
483  amps_message_set_field_value(_body.get().getMessage(),\
484  AMPS_##y, str,len);\
485  return *this;\
486  }\
487  DOX_MAKENEWCOMMENT(y) \
488  Message& new##x() {\
489  char buf[Message::IdentifierLength+1];\
490  buf[Message::IdentifierLength] = 0;\
491  AMPS_snprintf(buf, Message::IdentifierLength+1, "auto%lux" , (unsigned long)(_body.get().newId()));\
492  amps_message_set_field_value_nts(_body.get().getMessage(),\
493  AMPS_##y, buf);\
494  return *this;\
495  } \
496  DOX_CLOSEGROUP()
497 
498 
539  class Message
540  {
541  RefHandle<MessageImpl> _body;
542 
543  Message(MessageImpl* body_) : _body(body_) { ; }
544 
545  public:
546  typedef AMPS::Field Field;
547 
550  static const unsigned int IdentifierLength = 32;
551 
554  static const size_t BOOKMARK_NONE = AMPS_UNSET_INDEX;
555 
559  enum CtorFlag { EMPTY };
560 
563  Message(CtorFlag) : _body()
564  {
565  }
566 
573  Message(amps_handle message_, bool owner_ = false)
574  : _body(new MessageImpl(message_, owner_))
575  {
576  }
577 
581  Message() : _body(new MessageImpl())
582  {
583  }
584 
587  Message deepCopy(void) const
588  {
589  return Message(_body.get().copy());
590  }
591 
594  void deepCopy(const Message& rhs_)
595  {
596  _body.get().copy(rhs_._body.get());
597  }
598 
609  class Options
610  {
611  public:
612  static const char* None(void)
613  {
614  return AMPS_OPTIONS_NONE;
615  }
616  static const char* Live(void)
617  {
618  return AMPS_OPTIONS_LIVE;
619  }
620  static const char* OOF(void)
621  {
622  return AMPS_OPTIONS_OOF;
623  }
624  static const char* Replace(void)
625  {
626  return AMPS_OPTIONS_REPLACE;
627  }
628  static const char* NoEmpties(void)
629  {
630  return AMPS_OPTIONS_NOEMPTIES;
631  }
632  static const char* SendKeys(void)
633  {
634  return AMPS_OPTIONS_SENDKEYS;
635  }
636  static const char* Timestamp(void)
637  {
638  return AMPS_OPTIONS_TIMESTAMP;
639  }
640  static const char* NoSowKey(void)
641  {
642  return AMPS_OPTIONS_NOSOWKEY;
643  }
644  static const char* Cancel(void)
645  {
646  return AMPS_OPTIONS_CANCEL;
647  }
648  static const char* Resume(void)
649  {
650  return AMPS_OPTIONS_RESUME;
651  }
652  static const char* Pause(void)
653  {
654  return AMPS_OPTIONS_PAUSE;
655  }
656  static const char* FullyDurable(void)
657  {
658  return AMPS_OPTIONS_FULLY_DURABLE;
659  }
660  static const char* Expire(void)
661  {
662  return AMPS_OPTIONS_EXPIRE;
663  }
664  static std::string Conflation(const char* conflation_)
665  {
666  char buf[64];
667  AMPS_snprintf(buf, sizeof(buf), "conflation=%s,", conflation_);
668  return buf;
669  }
670  static std::string ConflationKey(const char* conflationKey_)
671  {
672  std::string option("conflation_key=");
673  option.append(conflationKey_).append(",");
674  return option;
675  }
676  static std::string TopN(int topN_)
677  {
678  char buf[24];
679  AMPS_snprintf(buf, sizeof(buf), "top_n=%d,", topN_);
680  return buf;
681  }
682  static std::string MaxBacklog(int maxBacklog_)
683  {
684  char buf[24];
685  AMPS_snprintf(buf, sizeof(buf), "max_backlog=%d,", maxBacklog_);
686  return buf;
687  }
688  static std::string Rate(const char* rate_)
689  {
690  char buf[64];
691  AMPS_snprintf(buf, sizeof(buf), "rate=%s,", rate_);
692  return buf;
693  }
694  static std::string RateMaxGap(const char* rateMaxGap_)
695  {
696  char buf[64];
697  AMPS_snprintf(buf, sizeof(buf), "rate_max_gap=%s,", rateMaxGap_);
698  return buf;
699  }
700  static std::string SkipN(int skipN_)
701  {
702  char buf[24];
703  AMPS_snprintf(buf, sizeof(buf), "skip_n=%d,", skipN_);
704  return buf;
705  }
706 
707  static std::string Projection(const std::string& projection_)
708  {
709  return "projection=[" + projection_ + "],";
710  }
711 
712  template<class Iterator>
713  static std::string Projection(Iterator begin_, Iterator end_)
714  {
715  std::string projection = "projection=[";
716  for (Iterator i = begin_; i != end_; ++i)
717  {
718  projection += *i;
719  projection += ',';
720  }
721  projection.insert(projection.length() - 1, "]");
722  return projection;
723  }
724 
725  static std::string Grouping(const std::string& grouping_)
726  {
727  return "grouping=[" + grouping_ + "],";
728  }
729 
730  template<class Iterator>
731  static std::string Grouping(Iterator begin_, Iterator end_)
732  {
733  std::string grouping = "grouping=[";
734  for (Iterator i = begin_; i != end_; ++i)
735  {
736  grouping += *i;
737  grouping += ',';
738  }
739  grouping.insert(grouping.length() - 1, "]");
740  return grouping;
741  }
742 
743  static std::string Select(const std::string& select_)
744  {
745  return "select=[" + select_ + "],";
746  }
747 
748  template<class Iterator>
749  static std::string Select(Iterator begin_, Iterator end_)
750  {
751  std::string select = "select=[";
752  for (Iterator i = begin_; i != end_; ++i)
753  {
754  select += *i;
755  select += ',';
756  }
757  select.insert(select.length() - 1, "]");
758  return select;
759  }
760 
761  static std::string AckConflationInterval(const std::string& interval_)
762  {
763  return "ack_conflation=" + interval_ + ",";
764  }
765 
766  static std::string AckConflationInterval(const char* interval_)
767  {
768  static const std::string start("ack_conflation=");
769  return start + interval_ + ",";
770  }
771 
772  static std::string BookmarkNotFound(const char* action_)
773  {
774  static const std::string start("bookmark_not_found=");
775  return start + action_ + ",";
776  }
777 
778  static std::string BookmarkNotFoundNow()
779  {
780  return BookmarkNotFound("now");
781  }
782 
783  static std::string BookmarkNotFoundEpoch()
784  {
785  return BookmarkNotFound("epoch");
786  }
787 
788  static std::string BookmarkNotFoundFail()
789  {
790  return BookmarkNotFound("fail");
791  }
792 
795  Options(std::string options_ = "")
796  : _optionStr(options_)
797  , _maxBacklog(0)
798  , _topN(0)
799  , _skipN(0)
800  {;}
801 
802  int getMaxBacklog(void) const
803  {
804  return _maxBacklog;
805  }
806  std::string getConflation(void) const
807  {
808  return _conflation;
809  }
810  std::string getConflationKey(void) const
811  {
812  return _conflationKey;
813  }
814  int getTopN(void) const
815  {
816  return _topN;
817  }
818  std::string getRate(void) const
819  {
820  return _rate;
821  }
822  std::string getRateMaxGap(void) const
823  {
824  return _rateMaxGap;
825  }
826 
830  void setNone(void)
831  {
832  _optionStr.clear();
833  }
834 
844  void setLive(void)
845  {
846  _optionStr += AMPS_OPTIONS_LIVE;
847  }
848 
853  void setOOF(void)
854  {
855  _optionStr += AMPS_OPTIONS_OOF;
856  }
857 
862  void setReplace(void)
863  {
864  _optionStr += AMPS_OPTIONS_REPLACE;
865  }
866 
870  void setNoEmpties(void)
871  {
872  _optionStr += AMPS_OPTIONS_NOEMPTIES;
873  }
874 
878  void setSendKeys(void)
879  {
880  _optionStr += AMPS_OPTIONS_SENDKEYS;
881  }
882 
887  void setTimestamp(void)
888  {
889  _optionStr += AMPS_OPTIONS_TIMESTAMP;
890  }
891 
895  void setNoSowKey(void)
896  {
897  _optionStr += AMPS_OPTIONS_NOSOWKEY;
898  }
899 
903  void setCancel(void)
904  {
905  _optionStr += AMPS_OPTIONS_CANCEL;
906  }
907 
914  void setResume(void)
915  {
916  _optionStr += AMPS_OPTIONS_RESUME;
917  }
918 
929  void setPause(void)
930  {
931  _optionStr += AMPS_OPTIONS_PAUSE;
932  }
933 
940  void setFullyDurable(void)
941  {
942  _optionStr += AMPS_OPTIONS_FULLY_DURABLE;
943  }
944 
955  void setMaxBacklog(int maxBacklog_)
956  {
957  char buf[24];
958  AMPS_snprintf(buf, sizeof(buf), "max_backlog=%d,", maxBacklog_);
959  _optionStr += buf;
960  _maxBacklog = maxBacklog_;
961  }
962 
968  void setConflation(const char* conflation_)
969  {
970  char buf[64];
971  AMPS_snprintf(buf, sizeof(buf), "conflation=%s,", conflation_);
972  _optionStr += buf;
973  _conflation = conflation_;
974  }
975 
985  void setConflationKey(const char* conflationKey_)
986  {
987  char buf[64];
988  AMPS_snprintf(buf, sizeof(buf), "conflation_key=%s,", conflationKey_);
989  _optionStr += buf;
990  _conflationKey = conflationKey_;
991  }
992 
998  void setTopN(int topN_)
999  {
1000  char buf[24];
1001  AMPS_snprintf(buf, sizeof(buf), "top_n=%d,", topN_);
1002  _optionStr += buf;
1003  _topN = topN_;
1004  }
1005 
1012  void setRate(const char* rate_)
1013  {
1014  char buf[64];
1015  AMPS_snprintf(buf, sizeof(buf), "rate=%s,", rate_);
1016  _optionStr += buf;
1017  _rate = rate_;
1018  }
1019 
1034  void setRateMaxGap(const char* rateMaxGap_)
1035  {
1036  char buf[64];
1037  AMPS_snprintf(buf, sizeof(buf), "rate_max_gap=%s,", rateMaxGap_);
1038  _optionStr += buf;
1039  _rateMaxGap = rateMaxGap_;
1040  }
1041 
1047  void setSkipN(int skipN_)
1048  {
1049  char buf[24];
1050  AMPS_snprintf(buf, sizeof(buf), "skip_n=%d,", skipN_);
1051  _optionStr += buf;
1052  _skipN = skipN_;
1053  }
1054 
1059  void setProjection(const std::string& projection_)
1060  {
1061  _projection = "projection=[" + projection_ + "],";
1062  _optionStr += _projection;
1063  }
1064 
1065 
1071  template<class Iterator>
1072  void setProjection(Iterator begin_, Iterator end_)
1073  {
1074  _projection = "projection=[";
1075  for (Iterator i = begin_; i != end_; ++i)
1076  {
1077  _projection += *i;
1078  _projection += ',';
1079  }
1080  _projection.insert(_projection.length() - 1, "]");
1081  _optionStr += _projection;
1082  }
1083 
1088  void setGrouping(const std::string& grouping_)
1089  {
1090  _grouping = "grouping=[" + grouping_ + "],";
1091  _optionStr += _grouping;
1092  }
1093 
1094 
1100  template<class Iterator>
1101  void setGrouping(Iterator begin_, Iterator end_)
1102  {
1103  _grouping = "grouping=[";
1104  for (Iterator i = begin_; i != end_; ++i)
1105  {
1106  _grouping += *i;
1107  _grouping += ',';
1108  }
1109  _grouping.insert(_grouping.length() - 1, "]");
1110  _optionStr += _grouping;
1111  }
1112 
1117  void setBookmarkNotFound(const char* action_)
1118  {
1119  _optionStr += BookmarkNotFound(action_);
1120  }
1121 
1126  {
1127  _optionStr += BookmarkNotFoundNow();
1128  }
1129 
1134  {
1135  _optionStr += BookmarkNotFoundEpoch();
1136  }
1137 
1142  {
1143  _optionStr += BookmarkNotFoundFail();
1144  }
1145 
1149  operator const std::string()
1150  {
1151  return _optionStr.substr(0, _optionStr.length() - 1);
1152  }
1156  size_t getLength() const
1157  {
1158  return (_optionStr.empty() ? 0 : _optionStr.length() - 1);
1159  }
1160 
1165  const char* getStr() const
1166  {
1167  return (_optionStr.empty() ? 0 : _optionStr.data());
1168  }
1169 
1170  private:
1171  std::string _optionStr;
1172  int _maxBacklog;
1173  std::string _conflation;
1174  std::string _conflationKey;
1175  int _topN;
1176  std::string _rate;
1177  std::string _rateMaxGap;
1178  int _skipN;
1179  std::string _projection;
1180  std::string _grouping;
1181  };
1182 
1185  struct AckType
1186  {
1187  typedef enum : unsigned
1188  {
1189  None = 0, Received = 1, Parsed = 2, Processed = 4, Persisted = 8, Completed = 16, Stats = 32
1190  } Type;
1191  };
1192  AMPS_FIELD(AckType)
1195  static inline AckType::Type decodeSingleAckType(const char* begin, const char* end)
1196  {
1197  switch (end - begin)
1198  {
1199  case 5:
1200  return AckType::Stats;
1201  case 6:
1202  return AckType::Parsed;
1203  case 8:
1204  return AckType::Received;
1205  case 9:
1206  switch (begin[1])
1207  {
1208  case 'e': return AckType::Persisted;
1209  case 'r': return AckType::Processed;
1210  case 'o': return AckType::Completed;
1211  default: break;
1212  }
1213  break;
1214  default:
1215  break;
1216  }
1217  return AckType::None;
1218  }
1222  unsigned getAckTypeEnum() const
1223  {
1224  unsigned result = AckType::None;
1225  const char* data = NULL; size_t len = 0;
1226  amps_message_get_field_value(_body.get().getMessage(), AMPS_AckType, &data, &len);
1227  const char* mark = data;
1228  for (const char* end = data + len; data != end; ++data)
1229  {
1230  if (*data == ',')
1231  {
1232  result |= decodeSingleAckType(mark, data);
1233  mark = data + 1;
1234  }
1235  }
1236  if (mark < data)
1237  {
1238  result |= decodeSingleAckType(mark, data);
1239  }
1240  return result;
1241  }
1245  Message& setAckTypeEnum(unsigned ackType_)
1246  {
1247  if (ackType_ < AckTypeConstants<0>::Entries)
1248  {
1249  amps_message_assign_field_value(_body.get().getMessage(), AMPS_AckType,
1250  AckTypeConstants<0>::Values[ackType_], AckTypeConstants<0>::Lengths[ackType_]);
1251  }
1252  return *this;
1253  }
1254 
1255  AMPS_FIELD(BatchSize)
1256  AMPS_FIELD(Bookmark)
1257  AMPS_FIELD(Command)
1258 
1262  struct Command
1263  {
1264  typedef enum
1265  {
1266  Unknown = 0,
1267  Publish = 1,
1268  Subscribe = 2,
1269  Unsubscribe = 4,
1270  SOW = 8,
1271  Heartbeat = 16,
1272  SOWDelete = 32,
1273  DeltaPublish = 64,
1274  Logon = 128,
1275  SOWAndSubscribe = 256,
1276  DeltaSubscribe = 512,
1277  SOWAndDeltaSubscribe = 1024,
1278  StartTimer = 2048,
1279  StopTimer = 4096,
1280  GroupBegin = 8192,
1281  GroupEnd = 16384,
1282  OOF = 32768,
1283  Ack = 65536,
1284  Flush = 131072,
1285  NoDataCommands = Publish | Unsubscribe | Heartbeat | SOWDelete | DeltaPublish
1286  | Logon | StartTimer | StopTimer | Flush
1287  } Type;
1288  };
1290  Command::Type getCommandEnum() const
1291  {
1292  const char* data = NULL; size_t len = 0;
1293  amps_message_get_field_value(_body.get().getMessage(), AMPS_Command, &data, &len);
1294  switch (len)
1295  {
1296  case 1: return Command::Publish; // -V1037
1297  case 3:
1298  switch (data[0])
1299  {
1300  case 's': return Command::SOW;
1301  case 'o': return Command::OOF;
1302  case 'a': return Command::Ack;
1303  }
1304  break;
1305  case 5:
1306  switch (data[0])
1307  {
1308  case 'l': return Command::Logon;
1309  case 'f': return Command::Flush;
1310  }
1311  break;
1312  case 7:
1313  return Command::Publish; // -V1037
1314  break;
1315  case 9:
1316  switch (data[0])
1317  {
1318  case 's': return Command::Subscribe;
1319  case 'h': return Command::Heartbeat;
1320  case 'g': return Command::GroupEnd;
1321  }
1322  break;
1323  case 10:
1324  switch (data[1])
1325  {
1326  case 'o': return Command::SOWDelete;
1327  case 't': return Command::StopTimer;
1328  }
1329  break;
1330  case 11:
1331  switch (data[0])
1332  {
1333  case 'g': return Command::GroupBegin;
1334  case 'u': return Command::Unsubscribe;
1335  }
1336  break;
1337  case 13:
1338  return Command::DeltaPublish;
1339  case 15:
1340  return Command::DeltaSubscribe;
1341  case 17:
1342  return Command::SOWAndSubscribe;
1343  case 23:
1344  return Command::SOWAndDeltaSubscribe;
1345  }
1346  return Command::Unknown;
1347  }
1348 
1350  Message& setCommandEnum(Command::Type command_)
1351  {
1352  unsigned bits = 0;
1353  unsigned command = command_;
1354  while (command > 0)
1355  {
1356  ++bits;
1357  command >>= 1;
1358  }
1359  amps_message_assign_field_value(_body.get().getMessage(), AMPS_Command,
1360  CommandConstants<0>::Values[bits], CommandConstants<0>::Lengths[bits]);
1361  return *this;
1362  }
1363 
1364  AMPS_FIELD(CommandId)
1365  AMPS_FIELD(ClientName)
1366  AMPS_FIELD(CorrelationId)
1367  AMPS_FIELD(Expiration)
1368  AMPS_FIELD(Filter)
1369  AMPS_FIELD(GroupSequenceNumber)
1370  AMPS_FIELD(Heartbeat)
1371  AMPS_FIELD(LeasePeriod)
1372  AMPS_FIELD(Matches)
1373  AMPS_FIELD(MessageLength)
1374  AMPS_FIELD(MessageType)
1375 
1376  DOX_OPENGROUP(Options)
1377  DOX_MAKEGETCOMMENT(Options)
1378  Field getOptions() const
1379  {
1380  Field returnValue;
1381  const char* ptr;
1382  size_t sz;
1383  amps_message_get_field_value(_body.get().getMessage(),
1384  AMPS_Options, &ptr, &sz);
1385  if (sz && ptr[sz - 1] == ',')
1386  {
1387  --sz;
1388  }
1389  returnValue.assign(ptr, sz);
1390  return returnValue;
1391  }
1392 
1393  DOX_MAKEGETRAWCOMMENT(Options)
1394  void getRawOptions(const char** dataptr, size_t* sizeptr) const
1395  {
1396  amps_message_get_field_value(_body.get().getMessage(),
1397  AMPS_Options, dataptr, sizeptr);
1398  if (*sizeptr && *dataptr && (*dataptr)[*sizeptr - 1] == ',')
1399  {
1400  --*sizeptr;
1401  }
1402  return;
1403  }
1404 
1405  DOX_MAKESETCOMMENT(Options)
1406  Message& setOptions(const std::string& v)
1407  {
1408  size_t sz = v.length();
1409  if (sz && v[sz - 1] == ',')
1410  {
1411  --sz;
1412  }
1413  amps_message_set_field_value(_body.get().getMessage(),
1414  AMPS_Options, v.c_str(), sz);
1415  return *this;
1416  }
1417 
1418  DOX_MAKEASSIGNCOMMENT(Options)
1419  Message& assignOptions(const std::string& v)
1420  {
1421  size_t sz = v.length();
1422  if (sz && v[sz - 1] == ',')
1423  {
1424  --sz;
1425  }
1426  amps_message_assign_field_value(_body.get().getMessage(),
1427  AMPS_Options, v.c_str(), sz);
1428  return *this;
1429  }
1430 
1431  DOX_MAKEASSIGNCOMMENT(Options)
1432  Message& assignOptions(const char* data, size_t len)
1433  {
1434  if (len && data[len - 1] == ',')
1435  {
1436  --len;
1437  }
1438  amps_message_assign_field_value(_body.get().getMessage(),
1439  AMPS_Options, data, len);
1440  return *this;
1441  }
1442 
1443  DOX_MAKESETCOMMENT(Options)
1444  Message& setOptions(const char* str)
1445  {
1446  if (str)
1447  {
1448  size_t sz = strlen(str);
1449  if (sz && str[sz - 1] == ',')
1450  {
1451  --sz;
1452  }
1453  amps_message_set_field_value(_body.get().getMessage(),
1454  AMPS_Options, str, sz);
1455  }
1456  else
1457  {
1458  amps_message_set_field_value(_body.get().getMessage(),
1459  AMPS_Options, str, 0);
1460  }
1461  return *this;
1462  }
1463 
1464  DOX_MAKESETCOMMENT(Options)
1465  Message& setOptions(const char* str, size_t len)
1466  {
1467  if (len && str[len - 1] == ',')
1468  {
1469  --len;
1470  }
1471  amps_message_set_field_value(_body.get().getMessage(),
1472  AMPS_Options, str, len);
1473  return *this;
1474  }
1475  DOX_CLOSEGROUP()
1476 
1477  AMPS_FIELD(OrderBy)
1478  AMPS_FIELD(Password)
1479  AMPS_FIELD_ALIAS(QueryId, QueryID)
1480  AMPS_FIELD(Reason)
1481  AMPS_FIELD(RecordsInserted)
1482  AMPS_FIELD(RecordsReturned)
1483  AMPS_FIELD(RecordsUpdated)
1484  AMPS_FIELD(Sequence)
1485  AMPS_FIELD(SowDelete)
1486  AMPS_FIELD(SowKey)
1487  AMPS_FIELD(SowKeys)
1488  AMPS_FIELD(Status)
1489  AMPS_FIELD_ALIAS(SubId, SubscriptionId) // -V524
1490  AMPS_FIELD(SubscriptionIds)
1491  AMPS_FIELD(TimeoutInterval)
1492  AMPS_FIELD(Timestamp)
1493 
1497  Field getTransmissionTime() const
1498  {
1499  return getTimestamp();
1500  }
1501 
1506  void getRawTransmissionTime(const char** dataptr, size_t* sizeptr) const
1507  {
1508  getRawTimestamp(dataptr, sizeptr);
1509  }
1510 
1511  AMPS_FIELD(Topic)
1512  AMPS_FIELD(TopicMatches)
1513  AMPS_FIELD(TopNRecordsReturned)
1514  AMPS_FIELD(Version)
1515  AMPS_FIELD(UserId)
1516 
1521 
1522  Field getData() const
1523  {
1524  Field returnValue;
1525  char* ptr;
1526  size_t sz;
1527  amps_message_get_data(_body.get().getMessage(), &ptr, &sz);
1528  returnValue.assign(ptr, sz);
1529  return returnValue;
1530  }
1531 
1532  void getRawData(const char** data, size_t* sz) const
1533  {
1534  amps_message_get_data(_body.get().getMessage(), (char**)data, sz);
1535  }
1538  Message& setData(const std::string& v_)
1539  {
1540  amps_message_set_data(_body.get().getMessage(), v_.c_str(), v_.length());
1541  return *this;
1542  }
1543  Message& assignData(const std::string& v_)
1544  {
1545  amps_message_assign_data(_body.get().getMessage(), v_.c_str(), v_.length());
1546  return *this;
1547  }
1548 
1552  Message& setData(const char* data_, size_t length_)
1553  {
1554  amps_message_set_data(_body.get().getMessage(), data_, length_);
1555  return *this;
1556  }
1557  Message& assignData(const char* data_, size_t length_)
1558  {
1559  amps_message_assign_data(_body.get().getMessage(), data_, length_);
1560  return *this;
1561  }
1562 
1565  Message& setData(const char* data_)
1566  {
1567  amps_message_set_data_nts(_body.get().getMessage(), data_);
1568  return *this;
1569  }
1570  Message& assignData(const char* data_)
1571  {
1572  amps_message_assign_data(_body.get().getMessage(), data_, strlen(data_));
1573  return *this;
1574  }
1575  amps_handle getMessage() const
1576  {
1577  return _body.get().getMessage();
1578  }
1579  void replace(amps_handle message, bool owner = false)
1580  {
1581  _body.get().replace(message, owner);
1582  }
1583  void disown()
1584  {
1585  _body.get().disown();
1586  }
1587  void invalidate()
1588  {
1589  _body = NULL;
1590  }
1591  bool isValid(void) const
1592  {
1593  return _body.isValid();
1594  }
1595  Message& reset()
1596  {
1597  _body.get().reset();
1598  return *this;
1599  }
1600 
1601  void setBookmarkSeqNo(size_t val)
1602  {
1603  _body.get().setBookmarkSeqNo(val);
1604  }
1605 
1606  size_t getBookmarkSeqNo() const
1607  {
1608  return _body.get().getBookmarkSeqNo();
1609  }
1610 
1611  void setSubscriptionHandle(amps_handle val)
1612  {
1613  _body.get().setSubscriptionHandle(val);
1614  }
1615 
1616  amps_handle getSubscriptionHandle() const
1617  {
1618  return _body.get().getSubscriptionHandle();
1619  }
1620 
1621  void ack(const char* options_ = NULL) const;
1622 
1623  void setClientImpl(ClientImpl* pClientImpl)
1624  {
1625  _body.get().setClientImpl(pClientImpl);
1626  }
1627 
1628  void setIgnoreAutoAck() const
1629  {
1630  _body.get().setIgnoreAutoAck();
1631  }
1632 
1633  bool getIgnoreAutoAck() const
1634  {
1635  return _body.get().getIgnoreAutoAck();
1636  }
1637 
1638  // static
1639  template <class T>
1640  void throwFor(const T& /*context_*/, const std::string& ackReason_) const
1641  {
1642  switch (ackReason_[0])
1643  {
1644  case 'a': // auth failure
1645  throw AuthenticationException("Logon failed for user \"" +
1646  (std::string)getUserId() + "\"");
1647  break;
1648  case 'b':
1649  switch (ackReason_.length())
1650  {
1651  case 10: // bad filter
1652  throw BadFilterException("bad filter '" +
1653  (std::string)getFilter() +
1654  "'");
1655  break;
1656  case 11: // bad sow key
1657  if (getSowKeys().len())
1658  {
1659  throw BadSowKeyException("bad sow key '" +
1660  (std::string)getSowKeys() +
1661  "'");
1662  }
1663  else
1664  {
1665  throw BadSowKeyException("bad sow key '" +
1666  (std::string)getSowKey() +
1667  "'");
1668  }
1669  break;
1670  case 15: // bad regex topic
1671  throw BadRegexTopicException("bad regex topic '" +
1672  (std::string)getTopic() +
1673  "'.");
1674  break;
1675  default:
1676  break;
1677  }
1678  break;
1679  case 'd':
1680  if (ackReason_.length() == 23) // duplicate logon attempt
1681  {
1682  throw DuplicateLogonException("Client '" +
1683  (std::string)getClientName() +
1684  "' with userid '" +
1685  (std::string)getUserId() +
1686  "' duplicate logon attempt");
1687  }
1688  break;
1689  case 'i':
1690  if (ackReason_.length() >= 9)
1691  {
1692  switch (ackReason_[8])
1693  {
1694  case 'b': // invalid bookmark
1695  throw InvalidBookmarkException("invalid bookmark '" +
1696  (std::string)getBookmark() +
1697  "'.");
1698  break;
1699  case 'm': // invalid message type
1700  throw CommandException(std::string("invalid message type '") +
1701  (std::string)getMessageType() +
1702  "'.");
1703  break;
1704  case 'o':
1705  if (ackReason_[9] == 'p') // invalid options
1706  {
1707  throw InvalidOptionsException("invalid options '" +
1708  (std::string)getOptions() +
1709  "'.");
1710  }
1711  else if (ackReason_[9] == 'r') // invalid order by
1712  {
1713  throw InvalidOrderByException("invalid order by '" +
1714  (std::string)getOrderBy() +
1715  "'.");
1716  }
1717  break;
1718  case 's': // invalid subId
1719  throw InvalidSubIdException("invalid subid '" +
1720  (std::string)getSubscriptionId() +
1721  "'.");
1722  break;
1723  case 't':
1724  if (ackReason_.length() == 13) // invalid topic
1725  {
1726  throw InvalidTopicException("invalid topic '" +
1727  (std::string)getTopic() +
1728  "'.");
1729  }
1730  else if (ackReason_.length() == 23) // invalid topic or filter
1731  {
1732  throw InvalidTopicException("invalid topic or filter. Topic '" +
1733  (std::string)getTopic() +
1734  "' Filter '" +
1735  (std::string)getFilter() +
1736  "'.");
1737  }
1738  break;
1739  default:
1740  break;
1741  }
1742  }
1743  break;
1744  case 'l':
1745  if (ackReason_.length() == 14) // logon required
1746  {
1747  throw LogonRequiredException("logon required before command");
1748  }
1749  break;
1750  case 'n':
1751  switch (ackReason_[4])
1752  {
1753  case ' ': // name in use
1754  throw NameInUseException("name in use '" +
1755  (std::string)getClientName() +
1756  "'.");
1757  break;
1758  case 'e': // not entitled
1759  throw NotEntitledException("User \"" +
1760  (std::string)getUserId() +
1761  "\" not entitled to topic \"" +
1762  (std::string)getTopic() +
1763  "\".");
1764  break;
1765  case 'i': // no filter or bookmark
1766  throw MissingFieldsException("command sent with no filter or bookmark.");
1767  break;
1768  case 'l': // no client name
1769  throw MissingFieldsException("command sent with no client name.");
1770  break;
1771  case 'o': // no topic or filter
1772  throw MissingFieldsException("command sent with no topic or filter.");
1773  break;
1774  case 's': // not supported
1775  throw CommandException("operation on topic '" +
1776  (std::string)getTopic() +
1777  "' with options '" +
1778  (std::string)getOptions() +
1779  "' not supported.");
1780  break;
1781  default:
1782  break;
1783  }
1784  break;
1785  case 'o':
1786  switch (ackReason_.length())
1787  {
1788  case 16: // orderby required
1789  throw MissingFieldsException("orderby required");
1790  break;
1791  case 17: // orderby too large
1792  throw CommandException("orderby too large '" +
1793  (std::string)getOrderBy() +
1794  "'.");
1795  break;
1796  }
1797  break;
1798  case 'p':
1799  throw CommandException("projection clause too large in options '" +
1800  (std::string)getOptions() +
1801  "'.");
1802  break;
1803  case 'r':
1804  switch (ackReason_[2])
1805  {
1806  case 'g': // regex topic not supported
1807  throw BadRegexTopicException("'regex topic not supported '" +
1808  (std::string)getTopic() +
1809  "'.");
1810  break;
1811  default:
1812  break;
1813  }
1814  break;
1815  case 's':
1816  switch (ackReason_[5])
1817  {
1818  case ' ': // subid in use
1819  throw SubidInUseException("subid in use '" +
1820  (std::string)getSubscriptionId() +
1821  "'.");
1822  break;
1823  case 'e': // sow_delete command only supports one of: filter, sow_keys, bookmark, or data
1824  throw CommandException("sow_delete command only supports one of: filter '" +
1825  (std::string)getFilter() +
1826  "', sow_keys '" +
1827  (std::string)getSowKeys() +
1828  "', bookmark '" +
1829  (std::string)getBookmark() +
1830  "', or data '" +
1831  (std::string)getData() +
1832  "'.");
1833  break;
1834  case 't': // sow store failed
1835  throw PublishException("sow store failed.");
1836  break;
1837  default:
1838  break;
1839  }
1840  break;
1841  case 't':
1842  switch (ackReason_[2])
1843  {
1844  case ' ': // tx store failure
1845  throw PublishException("tx store failure.");
1846  break;
1847  case 'n': // txn replay failed
1848  throw CommandException("txn replay failed for '" +
1849  (std::string)getSubId() +
1850  "'.");
1851  break;
1852  }
1853  break;
1854  default:
1855  break;
1856  }
1857  throw CommandException("Error from server while processing this command: '" +
1858  ackReason_ + "'");
1859  }
1860  };
1861 
1862  inline std::string
1863  operator+(const std::string& lhs, const Message::Field& rhs)
1864  {
1865  return lhs + std::string(rhs);
1866  }
1867 
1868  inline std::basic_ostream<char>&
1869  operator<<(std::basic_ostream<char>& os, const Message::Field& rhs)
1870  {
1871  os.write(rhs.data(), (std::streamsize)rhs.len());
1872  return os;
1873  }
1874  inline bool
1875  AMPS::Field::operator<(const AMPS::Field& rhs) const
1876  {
1877  if (!data())
1878  {
1879  return rhs.data() != NULL;
1880  }
1881  if (!rhs.data())
1882  {
1883  return false;
1884  }
1885  return std::lexicographical_compare(data(), data() + len(), rhs.data(), rhs.data() + rhs.len());
1886  }
1887 
1888 }
1889 
1890 #endif
void setFullyDurable(void)
Set the option to only provide messages that have been persisted to all replication destinations that...
Definition: Message.hpp:940
Class to hold string versions of failure reasons.
Definition: ampsplusplus.hpp:151
Message & setData(const std::string &v_)
Sets the data portion of self.
Definition: Message.hpp:1538
AMPSDLL amps_handle amps_message_create(amps_handle client)
Functions for creation and manipulation of AMPS messages.
void setRateMaxGap(const char *rateMaxGap_)
Set the option for the maximum amount of time that a bookmark replay with a specified rate will allow...
Definition: Message.hpp:1034
Command::Type getCommandEnum() const
Decode self&#39;s "command" field and return one of the values from Command.
Definition: Message.hpp:1290
AMPSDLL amps_handle amps_message_copy(amps_handle message)
Creates and returns a handle to a new AMPS message object that is a deep copy of the message passed i...
void setBookmarkNotFoundNow()
Set the option for the action to take if the requested bookmark to start the subscription is not foun...
Definition: Message.hpp:1125
void setNoEmpties(void)
Set the option to not send empty messages on a delta subscription.
Definition: Message.hpp:870
Message deepCopy(void) const
Returns a deep copy of self.
Definition: Message.hpp:587
void setOOF(void)
Set the option to receive out of focus (OOF) messages on a subscription, where applicable.
Definition: Message.hpp:853
Message encapsulates a single message sent to or received from an AMPS server, and provides methods f...
Definition: Message.hpp:539
Message & setAckTypeEnum(unsigned ackType_)
Encode self&#39;s "ack type" field from a bitmask of values from AckType.
Definition: Message.hpp:1245
void setNoSowKey(void)
Set the option to not set the SowKey header on messages.
Definition: Message.hpp:895
MessageImpl(amps_handle message_, bool owner_=false, bool ignoreAutoAck_=false, size_t bookmarkSeqNo_=0, amps_subscription_handle subscription_=NULL, ClientImpl *clientImpl_=NULL)
Constructs a messageImpl from an existing AMPS message.
Definition: Message.hpp:109
AMPSDLL void amps_message_set_field_value(amps_handle message, FieldId field, const amps_char *value, size_t length)
Sets the value of a header field in an AMPS message.
STL namespace.
Message & setCommandEnum(Command::Type command_)
Set self&#39;s "command" field from one of the values in Command.
Definition: Message.hpp:1350
void setSendKeys(void)
Set the option to send key fields with a delta subscription.
Definition: Message.hpp:878
void * amps_handle
Opaque handle type used to refer to objects in the AMPS api.
Definition: amps.h:211
Class for constructing the options string to pass to AMPS in a Message.
Definition: Message.hpp:609
AMPSDLL void amps_message_assign_data(amps_handle message, const amps_char *value, size_t length)
Assigns the data component of an AMPS message, without copying the value.
const char * data() const
Returns the (non-null-terminated) data underlying this field.
Definition: Field.hpp:273
void setPause(void)
Set the option to pause a bookmark subscription.
Definition: Message.hpp:929
void setReplace(void)
Set the option to replace a current subscription with this one.
Definition: Message.hpp:862
Valid values for setCommandEnum() and getCommandEnum().
Definition: Message.hpp:1262
Message & setData(const char *data_)
Sets the data portion of self from a null-terminated string.
Definition: Message.hpp:1565
void setGrouping(Iterator begin_, Iterator end_)
Set the option for grouping the results of an aggregated query or subscription.
Definition: Message.hpp:1101
Message(CtorFlag)
Constructs a new empty, invalid Message.
Definition: Message.hpp:563
AMPSDLL void amps_message_set_data_nts(amps_handle message, const amps_char *value)
Sets the data component of an AMPS message.
AMPSDLL void amps_message_reset(amps_handle message)
Clears all fields and data in a message.
void setTimestamp(void)
Set the option to send a timestamp that the message was processed on a subscription or query...
Definition: Message.hpp:887
amps_handle getMessage() const
Returns the underling AMPS message object from the C layer.
Definition: Message.hpp:170
Defines the AMPS::Field class, which represents the value of a field in a message.
size_t len() const
Returns the length of the data underlying this field.
Definition: Field.hpp:280
void setResume(void)
Set the option to resume a subscription.
Definition: Message.hpp:914
AMPSDLL void amps_message_get_field_value(amps_handle message, FieldId field, const amps_char **value_ptr, size_t *length_ptr)
Retrieves the value of a header field in an AMPS message.
void setConflation(const char *conflation_)
Set the options for conflation on a subscription.
Definition: Message.hpp:968
void replace(amps_handle message_, bool owner_=false)
Causes self to refer to a new AMPS message, freeing any current message owned by self along the way...
Definition: Message.hpp:190
AMPSDLL void amps_message_get_data(amps_handle message, amps_char **value_ptr, size_t *length_ptr)
Gets the data component of an AMPS message.
AMPSDLL void amps_message_destroy(amps_handle message)
Destroys and frees the memory associated with an AMPS message object.
Options(std::string options_="")
ctor - default to None
Definition: Message.hpp:795
size_t getLength() const
Return the length of this Options object as a string.
Definition: Message.hpp:1156
AMPSDLL void amps_message_assign_field_value(amps_handle message, FieldId field, const amps_char *value, size_t length)
Assigns the value of a header field in an AMPS message, without copying the value.
void setRate(const char *rate_)
Set the option for the maximum rate at which messages are provided to the subscription.
Definition: Message.hpp:1012
Valid values for the setAckTypeEnum() and getAckTypeEnum() methods.
Definition: Message.hpp:1185
void setGrouping(const std::string &grouping_)
Set the option for grouping the results of an aggregated query or subscription.
Definition: Message.hpp:1088
void setBookmarkNotFoundEpoch()
Set the option for the action to take if the requested bookmark to start the subscription is not foun...
Definition: Message.hpp:1133
void setTopN(int topN_)
Set the top N option, which specifies the maximum number of messages to return for this command...
Definition: Message.hpp:998
AMPSDLL void amps_message_set_data(amps_handle message, const amps_char *value, size_t length)
Sets the data component of an AMPS message.
void setMaxBacklog(int maxBacklog_)
Set the option for maximum backlog this subscription is willing to accept.
Definition: Message.hpp:955
void setBookmarkNotFoundFail()
Set the option for the action to take if the requested bookmark to start the subscription is not foun...
Definition: Message.hpp:1141
void getRawTransmissionTime(const char **dataptr, size_t *sizeptr) const
Definition: Message.hpp:1506
void deepCopy(const Message &rhs_)
Makes self a deep copy of rhs_.
Definition: Message.hpp:594
MessageImpl()
Constructs a MessageImpl with a new, empty AMPS message.
Definition: Message.hpp:122
Field represents the value of a single field in a Message.
Definition: Field.hpp:86
void setBookmarkNotFound(const char *action_)
Set the option for the action to take if the requested bookmark to start the subscription is ot found...
Definition: Message.hpp:1117
void setConflationKey(const char *conflationKey_)
Set the options for the conflation key, the identifiers for the field or fields used by AMPS to deter...
Definition: Message.hpp:985
void setProjection(const std::string &projection_)
Set the option for projecting the results of an aggregated query or subscription. ...
Definition: Message.hpp:1059
void setSkipN(int skipN_)
Set the option for skip N, the number of messages in the result set to skip before returning messages...
Definition: Message.hpp:1047
Message(amps_handle message_, bool owner_=false)
Constructs a new Message to wrap message.
Definition: Message.hpp:573
CtorFlag
A flag to indicate not to create a body.
Definition: Message.hpp:559
void setNone(void)
Clear any previously set options and set the options to an empty string (AMPS_OPTIONS_NONE).
Definition: Message.hpp:830
Implementation class for a Message.
Definition: Message.hpp:87
void setProjection(Iterator begin_, Iterator end_)
Set the option for projecting the results of an aggregated query or subscription. ...
Definition: Message.hpp:1072
unsigned getAckTypeEnum() const
Decode self&#39;s "ack type" field and return the corresponding bitmask of values from AckType...
Definition: Message.hpp:1222
Message & setData(const char *data_, size_t length_)
Sets the data portion of self from a char array.
Definition: Message.hpp:1552
Definition: ampsplusplus.hpp:103
void setCancel(void)
Set the cancel option, used on a sow_delete command to return a message to the queue.
Definition: Message.hpp:903
const char * getStr() const
Return this Options object as a non-NULL-terminated string.
Definition: Message.hpp:1165
Message()
Construct a new, empty Message.
Definition: Message.hpp:581
void setLive(void)
Set the live option for a bookmark subscription, which requests that the subscription receives messag...
Definition: Message.hpp:844