25 #ifndef __AMPS_MESSAGE_HPP__
26 #define __AMPS_MESSAGE_HPP__
27 #include "amps/util.hpp"
28 #include "amps/constants.hpp"
29 #include "amps/amps_generated.h"
// Sentinel sequence value: every bit of an amps_uint64_t set (the result of
// converting -1 to the unsigned type).
// The expansion is fully parenthesized so that it behaves as a single
// expression wherever it is used; without the outer parentheses,
// `sizeof AMPS_UNSET_SEQUENCE` would parse as `sizeof(amps_uint64_t) - 1`.
#define AMPS_UNSET_SEQUENCE ((amps_uint64_t)-1)
38 #if defined(__GXX_EXPERIMENTAL_CXX0X__) || (_MSC_VER >= 1600)
39 #define AMPS_USE_FUNCTIONAL 1
42 #if (_MSC_VER >= 1600) || (__GNUC__ > 4) || ( (__GNUC__ == 4) && (__GNUC_MINOR__) >=5 )
43 #define AMPS_USE_LAMBDAS 1
46 #if (_MSC_VER >= 1600) || (__GNUC__ > 4) || ( (__GNUC__ == 4) && (__GNUC_MINOR__) >=8 )
47 #define AMPS_USE_EMPLACE 1
50 #ifdef AMPS_USE_FUNCTIONAL
// Option-string fragments. Every non-empty fragment ends with a trailing
// comma so that fragments can be concatenated directly, either as adjacent
// string literals (AMPS_OPTIONS_LIVE AMPS_OPTIONS_OOF -> "live,oof,") or by
// appending them to a std::string.
#define AMPS_OPTIONS_NONE ""
#define AMPS_OPTIONS_LIVE "live,"
#define AMPS_OPTIONS_OOF "oof,"
#define AMPS_OPTIONS_REPLACE "replace,"
#define AMPS_OPTIONS_NOEMPTIES "no_empties,"
#define AMPS_OPTIONS_SENDKEYS "send_keys,"
#define AMPS_OPTIONS_TIMESTAMP "timestamp,"
#define AMPS_OPTIONS_NOSOWKEY "no_sowkey,"
#define AMPS_OPTIONS_CANCEL "cancel,"
#define AMPS_OPTIONS_RESUME "resume,"
#define AMPS_OPTIONS_PAUSE "pause,"
#define AMPS_OPTIONS_FULLY_DURABLE "fully_durable,"
#define AMPS_OPTIONS_EXPIRE "expire,"
// Parameterized fragments. The argument is stringized with the preprocessor
// `#` operator and joined to its neighbours by adjacent-literal
// concatenation, so AMPS_OPTIONS_TOPN(5) yields "top_n=5,".
// (A `##x` written inside a string literal is never substituted; it would
// be emitted verbatim as the characters "##x".)
// The argument is stringized exactly as written: it is not macro-expanded
// first, so pass a literal value rather than another macro name.
#define AMPS_OPTIONS_TOPN(x) "top_n=" #x ","
#define AMPS_OPTIONS_MAXBACKLOG(x) "max_backlog=" #x ","
#define AMPS_OPTIONS_RATE(x) "rate=" #x ","
79 typedef void* amps_subscription_handle;
93 mutable bool _isIgnoreAutoAck;
94 size_t _bookmarkSeqNo;
95 amps_subscription_handle _subscription;
96 ClientImpl* _clientImpl;
110 bool ignoreAutoAck_ =
false,
size_t bookmarkSeqNo_ = 0,
111 amps_subscription_handle subscription_ = NULL,
112 ClientImpl* clientImpl_ = NULL)
113 : _message(message_), _owner(owner_), _isIgnoreAutoAck(ignoreAutoAck_)
114 , _bookmarkSeqNo(bookmarkSeqNo_)
115 , _subscription(subscription_), _clientImpl(clientImpl_)
123 : _message(NULL), _owner(true), _isIgnoreAutoAck(false), _bookmarkSeqNo(0), _subscription(NULL), _clientImpl(NULL)
131 if (_owner && _message)
140 return new MessageImpl(copy,
true, _isIgnoreAutoAck, _bookmarkSeqNo,
141 _subscription, _clientImpl);
146 if (_owner && _message)
152 _bookmarkSeqNo = rhs_._bookmarkSeqNo;
153 _subscription = rhs_._subscription;
154 _isIgnoreAutoAck = rhs_._isIgnoreAutoAck;
155 _clientImpl = rhs_._clientImpl;
158 void setClientImpl(ClientImpl* clientImpl_)
160 _clientImpl = clientImpl_;
163 ClientImpl* clientImpl(
void)
const
180 _subscription = NULL;
181 _isIgnoreAutoAck =
false;
193 if (_message == message_)
197 if (_owner && _message)
204 _subscription = NULL;
205 _isIgnoreAutoAck =
false;
214 static unsigned long newId()
216 #if __cplusplus >= 201100L || _MSC_VER >= 1900
217 static std::atomic<uint_fast64_t> _id(0);
218 return (
unsigned long)++_id;
220 static AMPS_ATOMIC_TYPE _id = 0;
221 return (
unsigned long)(AMPS_FETCH_ADD(&_id, 1));
225 void setBookmarkSeqNo(
size_t val_)
227 _bookmarkSeqNo = val_;
230 size_t getBookmarkSeqNo(
void)
const
232 return _bookmarkSeqNo;
235 void setSubscriptionHandle(amps_subscription_handle subscription_)
237 _subscription = subscription_;
240 amps_subscription_handle getSubscriptionHandle(
void)
const
242 return _subscription;
245 void setIgnoreAutoAck()
const
247 _isIgnoreAutoAck =
true;
250 bool getIgnoreAutoAck()
const
252 return _isIgnoreAutoAck;
262 #ifdef DOXYGEN_PREPROCESSOR
264 #define DOX_COMMENTHEAD(s) / ## ** ## s ## * ## /
265 #define DOX_GROUPNAME(s) DOX_COMMENTHEAD(@name s Functions)
266 #define DOX_OPENGROUP(s) DOX_COMMENTHEAD(@{) \
268 #define DOX_CLOSEGROUP() DOX_COMMENTHEAD(@})
269 #define DOX_MAKEGETCOMMENT(x) DOX_COMMENTHEAD( Retrieves the value of the x header of the Message as a new Field. Notice that not all headers are present on all messages returned by AMPS. See the AMPS %Command Reference for details on which fields will be present in response to a specific command. )
270 #define DOX_MAKEGETRAWCOMMENT(x) DOX_COMMENTHEAD( Retrieves the value of the x header of self as a Field that references the underlying buffer managed by this Message. Notice that not all headers are present on all messages returned by AMPS. See the AMPS %Command Reference for details on which fields will be present in response to a specific command. )
271 #define DOX_MAKESETCOMMENT(x) DOX_COMMENTHEAD( Sets the value of the x header for this Message. Not all headers are processed by AMPS for all commands. See the AMPS %Command Reference for which headers are used by AMPS for a specific command. )
272 #define DOX_MAKEASSIGNCOMMENT(x) DOX_COMMENTHEAD( Sets the value of the x header for this Message. Not all headers are processed by AMPS for all commands. See the AMPS %Command Reference for which headers are used by AMPS for a specific command. )
273 #define DOX_MAKENEWCOMMENT(x) DOX_COMMENTHEAD(Creates and sets a new sequential value for the x header for this Message. This function is most useful for headers such as %CommandId and %SubId.)
277 #define DOX_COMMENTHEAD(s)
278 #define DOX_GROUPNAME(s)
279 #define DOX_OPENGROUP(x)
280 #define DOX_CLOSEGROUP()
281 #define DOX_MAKEGETCOMMENT(x)
282 #define DOX_MAKEGETRAWCOMMENT(x)
283 #define DOX_MAKESETCOMMENT(x)
284 #define DOX_MAKEASSIGNCOMMENT(x)
285 #define DOX_MAKENEWCOMMENT(x)
293 #define AMPS_FIELD(x) \
295 DOX_MAKEGETCOMMENT(x) \
296 Field get##x() const {\
300 amps_message_get_field_value(_body.get().getMessage(),\
301 AMPS_##x, &ptr, &sz);\
302 returnValue.assign(ptr, sz);\
305 DOX_MAKEGETRAWCOMMENT(x) \
306 void getRaw##x(const char** dataptr, size_t* sizeptr) const {\
307 amps_message_get_field_value(_body.get().getMessage(),\
308 AMPS_##x, dataptr, sizeptr);\
311 DOX_MAKESETCOMMENT(x) \
312 Message& set##x(const std::string& v) {\
313 amps_message_set_field_value(_body.get().getMessage(),\
314 AMPS_##x, v.c_str(), v.length());\
317 DOX_MAKESETCOMMENT(x) \
318 Message& set##x(amps_uint64_t v) {\
320 AMPS_snprintf_amps_uint64_t(buf,22,v);\
321 amps_message_set_field_value_nts(_body.get().getMessage(),\
325 DOX_MAKEASSIGNCOMMENT(x) \
326 Message& assign##x(const std::string& v) {\
327 amps_message_assign_field_value(_body.get().getMessage(),\
328 AMPS_##x, v.c_str(), v.length());\
331 DOX_MAKEASSIGNCOMMENT(x) \
332 Message& assign##x(const char* data, size_t len) {\
333 amps_message_assign_field_value(_body.get().getMessage(),\
334 AMPS_##x, data, len);\
337 DOX_MAKESETCOMMENT(x) \
338 Message& set##x(const char* str) {\
339 amps_message_set_field_value_nts(_body.get().getMessage(),\
343 DOX_MAKESETCOMMENT(x) \
344 Message& set##x(const char* str,size_t len) {\
345 amps_message_set_field_value(_body.get().getMessage(),\
349 DOX_MAKENEWCOMMENT(x) \
351 char buf[Message::IdentifierLength+1];\
352 buf[Message::IdentifierLength] = 0;\
353 AMPS_snprintf(buf, Message::IdentifierLength+1, "auto%lu" , (unsigned long)(_body.get().newId()));\
354 amps_message_set_field_value_nts(_body.get().getMessage(),\
360 #define AMPS_FIELD_ALIAS(x,y) \
362 DOX_MAKEGETCOMMENT(y) \
363 Field get##y() const {\
367 amps_message_get_field_value(_body.get().getMessage(),\
368 AMPS_##y, &ptr, &sz);\
369 returnValue.assign(ptr, sz);\
372 DOX_MAKEGETRAWCOMMENT(y) \
373 void getRaw##y(const char** dataptr, size_t* sizeptr) const {\
374 amps_message_get_field_value(_body.get().getMessage(),\
375 AMPS_##y, dataptr, sizeptr);\
378 DOX_MAKESETCOMMENT(y) \
379 Message& set##y(const std::string& v) {\
380 amps_message_set_field_value(_body.get().getMessage(),\
381 AMPS_##y, v.c_str(), v.length());\
384 DOX_MAKESETCOMMENT(y) \
385 Message& set##y(amps_uint64_t v) {\
387 AMPS_snprintf_amps_uint64_t(buf,22,v);\
388 amps_message_set_field_value_nts(_body.get().getMessage(),\
392 DOX_MAKEASSIGNCOMMENT(y) \
393 Message& assign##y(const std::string& v) {\
394 amps_message_assign_field_value(_body.get().getMessage(),\
395 AMPS_##y, v.c_str(), v.length());\
398 DOX_MAKEASSIGNCOMMENT(y) \
399 Message& assign##y(const char* data, size_t len) {\
400 amps_message_assign_field_value(_body.get().getMessage(),\
401 AMPS_##y, data, len);\
404 DOX_MAKESETCOMMENT(y) \
405 Message& set##y(const char* str) {\
406 amps_message_set_field_value_nts(_body.get().getMessage(),\
410 DOX_MAKESETCOMMENT(y) \
411 Message& set##y(const char* str,size_t len) {\
412 amps_message_set_field_value(_body.get().getMessage(),\
416 DOX_MAKENEWCOMMENT(y) \
418 char buf[Message::IdentifierLength+1];\
419 buf[Message::IdentifierLength] = 0;\
420 AMPS_snprintf(buf, Message::IdentifierLength+1, "auto%lux" , (unsigned long)(_body.get().newId()));\
421 amps_message_set_field_value_nts(_body.get().getMessage(),\
425 DOX_MAKEGETCOMMENT(y) \
426 Field get##x() const {\
430 amps_message_get_field_value(_body.get().getMessage(),\
431 AMPS_##y, &ptr, &sz);\
432 returnValue.assign(ptr, sz);\
435 DOX_MAKEGETRAWCOMMENT(y) \
436 void getRaw##x(const char** dataptr, size_t* sizeptr) const {\
437 amps_message_get_field_value(_body.get().getMessage(),\
438 AMPS_##y, dataptr, sizeptr);\
441 DOX_MAKESETCOMMENT(y) \
442 Message& set##x(const std::string& v) {\
443 amps_message_set_field_value(_body.get().getMessage(),\
444 AMPS_##y, v.c_str(), v.length());\
447 DOX_MAKESETCOMMENT(y) \
448 Message& set##x(amps_uint64_t v) {\
450 AMPS_snprintf_amps_uint64_t(buf,22,v);\
451 amps_message_set_field_value_nts(_body.get().getMessage(),\
455 DOX_MAKEASSIGNCOMMENT(y) \
456 Message& assign##x(const std::string& v) {\
457 amps_message_assign_field_value(_body.get().getMessage(),\
458 AMPS_##y, v.c_str(), v.length());\
461 DOX_MAKEASSIGNCOMMENT(y) \
462 Message& assign##x(const char* data, size_t len) {\
463 amps_message_assign_field_value(_body.get().getMessage(),\
464 AMPS_##y, data, len);\
467 DOX_MAKESETCOMMENT(y) \
468 Message& set##x(const char* str) {\
469 amps_message_set_field_value_nts(_body.get().getMessage(),\
473 DOX_MAKESETCOMMENT(y) \
474 Message& set##x(const char* str,size_t len) {\
475 amps_message_set_field_value(_body.get().getMessage(),\
479 DOX_MAKENEWCOMMENT(y) \
481 char buf[Message::IdentifierLength+1];\
482 buf[Message::IdentifierLength] = 0;\
483 AMPS_snprintf(buf, Message::IdentifierLength+1, "auto%lux" , (unsigned long)(_body.get().newId()));\
484 amps_message_set_field_value_nts(_body.get().getMessage(),\
533 RefHandle<MessageImpl> _body;
581 return Message(_body.get().copy());
588 _body.get().copy(rhs_._body.get());
604 static const char* None(
void)
606 return AMPS_OPTIONS_NONE;
608 static const char* Live(
void)
610 return AMPS_OPTIONS_LIVE;
612 static const char* OOF(
void)
614 return AMPS_OPTIONS_OOF;
616 static const char* Replace(
void)
618 return AMPS_OPTIONS_REPLACE;
620 static const char* NoEmpties(
void)
622 return AMPS_OPTIONS_NOEMPTIES;
624 static const char* SendKeys(
void)
626 return AMPS_OPTIONS_SENDKEYS;
628 static const char* Timestamp(
void)
630 return AMPS_OPTIONS_TIMESTAMP;
632 static const char* NoSowKey(
void)
634 return AMPS_OPTIONS_NOSOWKEY;
636 static const char* Cancel(
void)
638 return AMPS_OPTIONS_CANCEL;
640 static const char* Resume(
void)
642 return AMPS_OPTIONS_RESUME;
644 static const char* Pause(
void)
646 return AMPS_OPTIONS_PAUSE;
648 static const char* FullyDurable(
void)
650 return AMPS_OPTIONS_FULLY_DURABLE;
652 static const char* Expire(
void)
654 return AMPS_OPTIONS_EXPIRE;
656 static std::string Conflation(
const char* conflation_)
659 AMPS_snprintf(buf,
sizeof(buf),
"conflation=%s,", conflation_);
662 static std::string ConflationKey(
const char* conflationKey_)
664 std::string option(
"conflation_key=");
665 option.append(conflationKey_).append(
",");
668 static std::string TopN(
int topN_)
671 AMPS_snprintf(buf,
sizeof(buf),
"top_n=%d,", topN_);
674 static std::string MaxBacklog(
int maxBacklog_)
677 AMPS_snprintf(buf,
sizeof(buf),
"max_backlog=%d,", maxBacklog_);
680 static std::string Rate(
const char* rate_)
683 AMPS_snprintf(buf,
sizeof(buf),
"rate=%s,", rate_);
686 static std::string RateMaxGap(
const char* rateMaxGap_)
689 AMPS_snprintf(buf,
sizeof(buf),
"rate_max_gap=%s,", rateMaxGap_);
692 static std::string SkipN(
int skipN_)
695 AMPS_snprintf(buf,
sizeof(buf),
"skip_n=%d,", skipN_);
699 static std::string Projection(
const std::string& projection_)
701 return "projection=[" + projection_ +
"],";
704 template<
class Iterator>
705 static std::string Projection(Iterator begin_, Iterator end_)
707 std::string projection =
"projection=[";
708 for (Iterator i = begin_; i != end_; ++i)
713 projection.insert(projection.length() - 1,
"]");
717 static std::string Grouping(
const std::string& grouping_)
719 return "grouping=[" + grouping_ +
"],";
722 template<
class Iterator>
723 static std::string Grouping(Iterator begin_, Iterator end_)
725 std::string grouping =
"grouping=[";
726 for (Iterator i = begin_; i != end_; ++i)
731 grouping.insert(grouping.length() - 1,
"]");
735 static std::string Select(
const std::string& select_)
737 return "select=[" + select_ +
"],";
740 template<
class Iterator>
741 static std::string Select(Iterator begin_, Iterator end_)
743 std::string select =
"select=[";
744 for (Iterator i = begin_; i != end_; ++i)
749 select.insert(select.length() - 1,
"]");
753 static std::string AckConflationInterval(
const std::string& interval_)
755 return "ack_conflation=" + interval_ +
",";
758 static std::string AckConflationInterval(
const char* interval_)
760 static const std::string start(
"ack_conflation=");
761 return start + interval_ +
",";
767 : _optionStr(options_)
773 int getMaxBacklog(
void)
const
777 std::string getConflation(
void)
const
781 std::string getConflationKey(
void)
const
783 return _conflationKey;
785 int getTopN(
void)
const
789 std::string getRate(
void)
const
793 std::string getRateMaxGap(
void)
const
817 _optionStr += AMPS_OPTIONS_LIVE;
826 _optionStr += AMPS_OPTIONS_OOF;
835 _optionStr += AMPS_OPTIONS_REPLACE;
843 _optionStr += AMPS_OPTIONS_NOEMPTIES;
851 _optionStr += AMPS_OPTIONS_SENDKEYS;
860 _optionStr += AMPS_OPTIONS_TIMESTAMP;
868 _optionStr += AMPS_OPTIONS_NOSOWKEY;
876 _optionStr += AMPS_OPTIONS_CANCEL;
887 _optionStr += AMPS_OPTIONS_RESUME;
902 _optionStr += AMPS_OPTIONS_PAUSE;
913 _optionStr += AMPS_OPTIONS_FULLY_DURABLE;
929 AMPS_snprintf(buf,
sizeof(buf),
"max_backlog=%d,", maxBacklog_);
931 _maxBacklog = maxBacklog_;
942 AMPS_snprintf(buf,
sizeof(buf),
"conflation=%s,", conflation_);
944 _conflation = conflation_;
959 AMPS_snprintf(buf,
sizeof(buf),
"conflation_key=%s,", conflationKey_);
961 _conflationKey = conflationKey_;
972 AMPS_snprintf(buf,
sizeof(buf),
"top_n=%d,", topN_);
986 AMPS_snprintf(buf,
sizeof(buf),
"rate=%s,", rate_);
1008 AMPS_snprintf(buf,
sizeof(buf),
"rate_max_gap=%s,", rateMaxGap_);
1010 _rateMaxGap = rateMaxGap_;
1021 AMPS_snprintf(buf,
sizeof(buf),
"skip_n=%d,", skipN_);
1032 _projection =
"projection=[" + projection_ +
"],";
1033 _optionStr += _projection;
1042 template<
class Iterator>
1045 _projection =
"projection=[";
1046 for (Iterator i = begin_; i != end_; ++i)
1051 _projection.insert(_projection.length() - 1,
"]");
1052 _optionStr += _projection;
1061 _grouping =
"grouping=[" + grouping_ +
"],";
1062 _optionStr += _grouping;
1071 template<
class Iterator>
1074 _grouping =
"grouping=[";
1075 for (Iterator i = begin_; i != end_; ++i)
1080 _grouping.insert(_grouping.length() - 1,
"]");
1081 _optionStr += _grouping;
1087 operator const std::string()
1089 return _optionStr.substr(0, _optionStr.length() - 1);
1096 return (_optionStr.empty() ? 0 : _optionStr.length() - 1);
1105 return (_optionStr.empty() ? 0 : _optionStr.data());
1109 std::string _optionStr;
1111 std::string _conflation;
1112 std::string _conflationKey;
1115 std::string _rateMaxGap;
1117 std::string _projection;
1118 std::string _grouping;
1125 typedef enum :
unsigned
1127 None = 0, Received = 1, Parsed = 2, Processed = 4, Persisted = 8, Completed = 16, Stats = 32
1135 switch (end - begin)
1138 return AckType::Stats;
1140 return AckType::Parsed;
1142 return AckType::Received;
1146 case 'e':
return AckType::Persisted;
1147 case 'r':
return AckType::Processed;
1148 case 'o':
return AckType::Completed;
1155 return AckType::None;
1162 unsigned result = AckType::None;
1163 const char* data = NULL;
size_t len = 0;
1165 const char* mark = data;
1166 for (
const char* end = data + len; data != end; ++data)
1185 if (ackType_ < AckTypeConstants<0>::Entries)
1188 AckTypeConstants<0>::Values[ackType_], AckTypeConstants<0>::Lengths[ackType_]);
1193 AMPS_FIELD(BatchSize)
1194 AMPS_FIELD(Bookmark)
1213 SOWAndSubscribe = 256,
1214 DeltaSubscribe = 512,
1215 SOWAndDeltaSubscribe = 1024,
1223 NoDataCommands = Publish | Unsubscribe | Heartbeat | SOWDelete | DeltaPublish
1224 | Logon | StartTimer | StopTimer | Flush
1230 const char* data = NULL;
size_t len = 0;
1234 case 1:
return Command::Publish;
1238 case 's':
return Command::SOW;
1239 case 'o':
return Command::OOF;
1240 case 'a':
return Command::Ack;
1246 case 'l':
return Command::Logon;
1247 case 'f':
return Command::Flush;
1251 return Command::Publish;
1256 case 's':
return Command::Subscribe;
1257 case 'h':
return Command::Heartbeat;
1258 case 'g':
return Command::GroupEnd;
1264 case 'o':
return Command::SOWDelete;
1265 case 't':
return Command::StopTimer;
1271 case 'g':
return Command::GroupBegin;
1272 case 'u':
return Command::Unsubscribe;
1276 return Command::DeltaPublish;
1278 return Command::DeltaSubscribe;
1280 return Command::SOWAndSubscribe;
1282 return Command::SOWAndDeltaSubscribe;
1284 return Command::Unknown;
1291 unsigned command = command_;
1298 CommandConstants<0>::Values[bits], CommandConstants<0>::Lengths[bits]);
1302 AMPS_FIELD(CommandId)
1303 AMPS_FIELD(ClientName)
1304 AMPS_FIELD(CorrelationId)
1305 AMPS_FIELD(Expiration)
1307 AMPS_FIELD(GroupSequenceNumber)
1308 AMPS_FIELD(Heartbeat)
1309 AMPS_FIELD(LeasePeriod)
1311 AMPS_FIELD(MessageLength)
1312 AMPS_FIELD(MessageType)
1322 AMPS_Options, &ptr, &sz);
1323 if (sz && ptr[sz - 1] ==
',')
1327 returnValue.assign(ptr, sz);
1331 DOX_MAKEGETRAWCOMMENT(Options)
1335 AMPS_Options, dataptr, sizeptr);
1336 if (*sizeptr && *dataptr && (*dataptr)[*sizeptr - 1] ==
',')
1343 DOX_MAKESETCOMMENT(Options)
1346 size_t sz = v.length();
1347 if (sz && v[sz - 1] ==
',')
1352 AMPS_Options, v.c_str(), sz);
1356 DOX_MAKEASSIGNCOMMENT(Options)
1359 size_t sz = v.length();
1360 if (sz && v[sz - 1] ==
',')
1365 AMPS_Options, v.c_str(), sz);
1369 DOX_MAKEASSIGNCOMMENT(Options)
1372 if (len && data[len - 1] ==
',')
1377 AMPS_Options, data, len);
1381 DOX_MAKESETCOMMENT(Options)
1386 size_t sz = strlen(str);
1387 if (sz && str[sz - 1] ==
',')
1392 AMPS_Options, str, sz);
1397 AMPS_Options, str, 0);
1402 DOX_MAKESETCOMMENT(Options)
1405 if (len && str[len - 1] ==
',')
1410 AMPS_Options, str, len);
1416 AMPS_FIELD(Password)
1417 AMPS_FIELD_ALIAS(QueryId, QueryID)
1419 AMPS_FIELD(RecordsInserted)
1420 AMPS_FIELD(RecordsReturned)
1421 AMPS_FIELD(RecordsUpdated)
1422 AMPS_FIELD(Sequence)
1423 AMPS_FIELD(SowDelete)
1427 AMPS_FIELD_ALIAS(SubId, SubscriptionId)
1428 AMPS_FIELD(SubscriptionIds)
1429 AMPS_FIELD(TimeoutInterval)
1430 AMPS_FIELD(Timestamp)
1450 AMPS_FIELD(TopicMatches)
1451 AMPS_FIELD(TopNRecordsReturned)
1466 returnValue.assign(ptr, sz);
1470 void getRawData(
const char** data,
size_t* sz)
const
1481 Message& assignData(
const std::string& v_)
1495 Message& assignData(
const char* data_,
size_t length_)
1508 Message& assignData(
const char* data_)
1515 return _body.get().getMessage();
1517 void replace(
amps_handle message,
bool owner =
false)
1519 _body.get().replace(message, owner);
1523 _body.get().disown();
1529 bool isValid(
void)
const
1531 return _body.isValid();
1535 _body.get().reset();
1539 void setBookmarkSeqNo(
size_t val)
1541 _body.get().setBookmarkSeqNo(val);
1544 size_t getBookmarkSeqNo()
const
1546 return _body.get().getBookmarkSeqNo();
1551 _body.get().setSubscriptionHandle(val);
1556 return _body.get().getSubscriptionHandle();
1559 void ack(
const char* options_ = NULL)
const;
1561 void setClientImpl(ClientImpl* pClientImpl)
1563 _body.get().setClientImpl(pClientImpl);
1566 void setIgnoreAutoAck()
const
1568 _body.get().setIgnoreAutoAck();
1571 bool getIgnoreAutoAck()
const
1573 return _body.get().getIgnoreAutoAck();
1578 void throwFor(
const T& ,
const std::string& ackReason_)
const
1580 switch (ackReason_[0])
1583 throw AuthenticationException(
"Logon failed for user \"" +
1587 switch (ackReason_.length())
1590 throw BadFilterException(
"bad filter '" +
1597 throw BadSowKeyException(
"bad sow key '" +
1603 throw BadSowKeyException(
"bad sow key '" +
1609 throw BadRegexTopicException(
"bad regex topic '" +
1618 if (ackReason_.length() == 23)
1620 throw DuplicateLogonException(
"Client '" +
1624 "' duplicate logon attempt");
1628 if (ackReason_.length() >= 9)
1630 switch (ackReason_[8])
1633 throw InvalidBookmarkException(
"invalid bookmark '" +
1638 throw CommandException(std::string(
"invalid message type '") +
1643 if (ackReason_[9] ==
'p')
1645 throw InvalidOptionsException(
"invalid options '" +
1649 else if (ackReason_[9] ==
'r')
1651 throw InvalidOrderByException(
"invalid order by '" +
1657 throw InvalidSubIdException(
"invalid subid '" +
1662 if (ackReason_.length() == 13)
1664 throw InvalidTopicException(
"invalid topic '" +
1668 else if (ackReason_.length() == 23)
1670 throw InvalidTopicException(
"invalid topic or filter. Topic '" +
1683 if (ackReason_.length() == 14)
1685 throw LogonRequiredException(
"logon required before command");
1689 switch (ackReason_[4])
1692 throw NameInUseException(
"name in use '" +
1697 throw NotEntitledException(
"User \"" +
1699 "\" not entitled to topic \"" +
1704 throw MissingFieldsException(
"command sent with no filter or bookmark.");
1707 throw MissingFieldsException(
"command sent with no client name.");
1710 throw MissingFieldsException(
"command sent with no topic or filter.");
1713 throw CommandException(
"operation on topic '" +
1715 "' with options '" +
1717 "' not supported.");
1724 switch (ackReason_.length())
1727 throw MissingFieldsException(
"orderby required");
1730 throw CommandException(
"orderby too large '" +
1737 throw CommandException(
"projection clause too large in options '" +
1742 switch (ackReason_[2])
1745 throw BadRegexTopicException(
"'regex topic not supported '" +
1754 switch (ackReason_[5])
1757 throw SubidInUseException(
"subid in use '" +
1762 throw CommandException(
"sow_delete command only supports one of: filter '" +
1773 throw PublishException(
"sow store failed.");
1780 switch (ackReason_[2])
1783 throw PublishException(
"tx store failure.");
1786 throw CommandException(
"txn replay failed for '" +
1795 throw CommandException(
"Error from server while processing this command: '" +
1801 operator+(
const std::string& lhs,
const Message::Field& rhs)
1803 return lhs + std::string(rhs);
1806 inline std::basic_ostream<char>&
1807 operator<<(std::basic_ostream<char>& os,
const Message::Field& rhs)
1809 os.write(rhs.data(), (std::streamsize)rhs.len());
1813 AMPS::Field::operator<(
const AMPS::Field& rhs)
const
1817 return rhs.
data() != NULL;
1823 return std::lexicographical_compare(data(), data() + len(), rhs.
data(), rhs.
data() + rhs.
len());
Defines the AMPS::Field class, which represents the value of a field in a message.
AMPSDLL amps_handle amps_message_create(amps_handle client)
Functions for creation and manipulation of AMPS messages.
AMPSDLL void amps_message_set_data(amps_handle message, const amps_char *value, size_t length)
Sets the data component of an AMPS message.
AMPSDLL void amps_message_get_field_value(amps_handle message, FieldId field, const amps_char **value_ptr, size_t *length_ptr)
Retrieves the value of a header field in an AMPS message.
AMPSDLL void amps_message_assign_field_value(amps_handle message, FieldId field, const amps_char *value, size_t length)
Assigns the value of a header field in an AMPS message, without copying the value.
AMPSDLL void amps_message_set_data_nts(amps_handle message, const amps_char *value)
Sets the data component of an AMPS message.
AMPSDLL void amps_message_reset(amps_handle message)
Clears all fields and data in a message.
AMPSDLL void amps_message_destroy(amps_handle message)
Destroys and frees the memory associated with an AMPS message object.
AMPSDLL void amps_message_assign_data(amps_handle message, const amps_char *value, size_t length)
Assigns the data component of an AMPS message, without copying the value.
AMPSDLL amps_handle amps_message_copy(amps_handle message)
Creates and returns a handle to a new AMPS message object that is a deep copy of the message passed in.
AMPSDLL void amps_message_set_field_value(amps_handle message, FieldId field, const amps_char *value, size_t length)
Sets the value of a header field in an AMPS message.
void * amps_handle
Opaque handle type used to refer to objects in the AMPS api.
Definition: amps.h:211
AMPSDLL void amps_message_get_data(amps_handle message, amps_char **value_ptr, size_t *length_ptr)
Gets the data component of an AMPS message.
Field represents the value of a single field in a Message.
Definition: Field.hpp:87
size_t len() const
Returns the length of the data underlying this field.
Definition: Field.hpp:267
const char * data() const
Returns the (non-null-terminated) data underlying this field.
Definition: Field.hpp:260
Implementation class for a Message.
Definition: Message.hpp:88
void replace(amps_handle message_, bool owner_=false)
Causes self to refer to a new AMPS message, freeing any current message owned by self along the way.
Definition: Message.hpp:190
MessageImpl(amps_handle message_, bool owner_=false, bool ignoreAutoAck_=false, size_t bookmarkSeqNo_=0, amps_subscription_handle subscription_=NULL, ClientImpl *clientImpl_=NULL)
Constructs a MessageImpl from an existing AMPS message.
Definition: Message.hpp:109
amps_handle getMessage() const
Returns the underlying AMPS message object from the C layer.
Definition: Message.hpp:170
MessageImpl()
Constructs a MessageImpl with a new, empty AMPS message.
Definition: Message.hpp:122
Class for constructing the options string to pass to AMPS in a Message.
Definition: Message.hpp:602
size_t getLength() const
Return the length of this Options object as a string.
Definition: Message.hpp:1094
void setConflationKey(const char *conflationKey_)
Set the options for the conflation key, the identifiers for the field or fields used by AMPS to deter...
Definition: Message.hpp:956
void setReplace(void)
Set the option to replace a current subscription with this one.
Definition: Message.hpp:833
void setNone(void)
Clear any previously set options and set the options to an empty string (AMPS_OPTIONS_NONE).
Definition: Message.hpp:801
void setResume(void)
Set the option to resume a subscription.
Definition: Message.hpp:885
void setProjection(const std::string &projection_)
Set the option for projecting the results of an aggregated query or subscription.
Definition: Message.hpp:1030
void setGrouping(const std::string &grouping_)
Set the option for grouping the results of an aggregated query or subscription.
Definition: Message.hpp:1059
void setNoEmpties(void)
Set the option to not send empty messages on a delta subscription.
Definition: Message.hpp:841
void setNoSowKey(void)
Set the option to not set the SowKey header on messages.
Definition: Message.hpp:866
void setFullyDurable(void)
Set the option to only provide messages that have been persisted to all replication destinations that...
Definition: Message.hpp:911
void setConflation(const char *conflation_)
Set the options for conflation on a subscription.
Definition: Message.hpp:939
void setTopN(int topN_)
Set the top N option, which specifies the maximum number of messages to return for this command.
Definition: Message.hpp:969
void setSendKeys(void)
Set the option to send key fields with a delta subscription.
Definition: Message.hpp:849
void setTimestamp(void)
Set the option to send a timestamp that the message was processed on a subscription or query.
Definition: Message.hpp:858
void setProjection(Iterator begin_, Iterator end_)
Set the option for projecting the results of an aggregated query or subscription.
Definition: Message.hpp:1043
void setMaxBacklog(int maxBacklog_)
Set the option for maximum backlog this subscription is willing to accept.
Definition: Message.hpp:926
void setSkipN(int skipN_)
Set the option for skip N, the number of messages in the result set to skip before returning messages...
Definition: Message.hpp:1018
const char * getStr() const
Return this Options object as a non-NULL-terminated string.
Definition: Message.hpp:1103
void setGrouping(Iterator begin_, Iterator end_)
Set the option for grouping the results of an aggregated query or subscription.
Definition: Message.hpp:1072
void setRate(const char *rate_)
Set the option for the maximum rate at which messages are provided to the subscription.
Definition: Message.hpp:983
void setOOF(void)
Set the option to receive out of focus (OOF) messages on a subscription, where applicable.
Definition: Message.hpp:824
void setPause(void)
Set the option to pause a bookmark subscription.
Definition: Message.hpp:900
Options(std::string options_="")
ctor - default to None
Definition: Message.hpp:766
void setCancel(void)
Set the cancel option, used on a sow_delete command to return a message to the queue.
Definition: Message.hpp:874
void setRateMaxGap(const char *rateMaxGap_)
Set the option for the maximum amount of time that a bookmark replay with a specified rate will allow...
Definition: Message.hpp:1005
void setLive(void)
Set the live option for a bookmark subscription, which requests that the subscription receives messag...
Definition: Message.hpp:815
Message encapsulates a single message sent to or received from an AMPS server, and provides methods f...
Definition: Message.hpp:532
Field getUserId() const
Retrieves the value of the UserId header of the Message as a new Field.
Definition: Message.hpp:1453
void getRawTimestamp(const char **dataptr, size_t *sizeptr) const
Retrieves the value of the Timestamp header of self as a Field that references the underlying buffer managed by this Message.
Definition: Message.hpp:1430
Field getBookmark() const
Retrieves the value of the Bookmark header of the Message as a new Field.
Definition: Message.hpp:1194
void deepCopy(const Message &rhs_)
Makes self a deep copy of rhs_.
Definition: Message.hpp:586
static const unsigned int IdentifierLength
The length of identifiers used for unique identification of commands and subscriptions.
Definition: Message.hpp:542
Field getOrderBy() const
Retrieves the value of the OrderBy header of the Message as a new Field.
Definition: Message.hpp:1415
Message(amps_handle message_, bool owner_=false)
Constructs a new Message to wrap message.
Definition: Message.hpp:565
Field getData() const
Returns the data from this message.
Definition: Message.hpp:1460
Message deepCopy(void) const
Returns a deep copy of self.
Definition: Message.hpp:579
CtorFlag
A flag to indicate not to create a body.
Definition: Message.hpp:551
static const size_t BOOKMARK_NONE
An indicator of no bookmark value.
Definition: Message.hpp:546
void getRawTransmissionTime(const char **dataptr, size_t *sizeptr) const
Definition: Message.hpp:1444
Message & assignOptions(const std::string &v)
Sets the value of the Options header for this Message.
Definition: Message.hpp:1357
Field getMessageType() const
Retrieves the value of the MessageType header of the Message as a new Field.
Definition: Message.hpp:1312
Message(CtorFlag)
Constructs a new empty, invalid Message.
Definition: Message.hpp:555
Field getSowKeys() const
Retrieves the value of the SowKeys header of the Message as a new Field.
Definition: Message.hpp:1425
Field getTimestamp() const
Retrieves the value of the Timestamp header of the Message as a new Field.
Definition: Message.hpp:1430
Field getSubscriptionId() const
Retrieves the value of the SubscriptionId header of the Message as a new Field.
Definition: Message.hpp:1427
Field getFilter() const
Retrieves the value of the Filter header of the Message as a new Field.
Definition: Message.hpp:1306
void getRawOptions(const char **dataptr, size_t *sizeptr) const
Retrieves the value of the Options header of self as a Field that references the underlying buffer managed by this Message.
Definition: Message.hpp:1332
static AckType::Type decodeSingleAckType(const char *begin, const char *end)
Decodes a single ack string.
Definition: Message.hpp:1133
Message & setAckTypeEnum(unsigned ackType_)
Encode self's "ack type" field from a bitmask of values from AckType.
Definition: Message.hpp:1183
unsigned getAckTypeEnum() const
Decode self's "ack type" field and return the corresponding bitmask of values from AckType.
Definition: Message.hpp:1160
Message & setOptions(const std::string &v)
Sets the value of the Options header for this Message.
Definition: Message.hpp:1344
Command::Type getCommandEnum() const
Decode self's "command" field and return one of the values from Command.
Definition: Message.hpp:1228
Field getSowKey() const
Retrieves the value of the SowKey header of the Message as a new Field.
Definition: Message.hpp:1424
Message()
Construct a new, empty Message.
Definition: Message.hpp:573
Message & setCommandEnum(Command::Type command_)
Set self's "command" field from one of the values in Command.
Definition: Message.hpp:1288
Field getClientName() const
Retrieves the value of the ClientName header of the Message as a new Field.
Definition: Message.hpp:1303
Field getTransmissionTime() const
Definition: Message.hpp:1435
Field getSubId() const
Retrieves the value of the SubscriptionId header of the Message as a new Field.
Definition: Message.hpp:1427
Field getTopic() const
Retrieves the value of the Topic header of the Message as a new Field.
Definition: Message.hpp:1449
Message & setData(const char *data_, size_t length_)
Sets the data portion of self from a char array.
Definition: Message.hpp:1490
Message & setData(const char *data_)
Sets the data portion of self from a null-terminated string.
Definition: Message.hpp:1503
Field getOptions() const
Retrieves the value of the Options header of the Message as a new Field.
Definition: Message.hpp:1316
Message & setData(const std::string &v_)
Sets the data portion of self.
Definition: Message.hpp:1476
Class to hold string versions of failure reasons.
Definition: ampsplusplus.hpp:151
Valid values for the setAckTypeEnum() and getAckTypeEnum() methods.
Definition: Message.hpp:1124
Valid values for setCommandEnum() and getCommandEnum().
Definition: Message.hpp:1201