25 #ifndef __AMPS_MESSAGE_HPP__ 26 #define __AMPS_MESSAGE_HPP__ 27 #include "amps/util.hpp" 28 #include "amps/constants.hpp" 29 #include "amps/amps_generated.h" 35 #define AMPS_UNSET_SEQUENCE (amps_uint64_t)-1 38 #if defined(__GXX_EXPERIMENTAL_CXX0X__) || (_MSC_VER >= 1600) 39 #define AMPS_USE_FUNCTIONAL 1 42 #if (_MSC_VER >= 1600) || (__GNUC__ > 4) || ( (__GNUC__ == 4) && (__GNUC_MINOR__) >=5 ) 43 #define AMPS_USE_LAMBDAS 1 46 #if (_MSC_VER >= 1600) || (__GNUC__ > 4) || ( (__GNUC__ == 4) && (__GNUC_MINOR__) >=8 ) 47 #define AMPS_USE_EMPLACE 1 50 #ifdef AMPS_USE_FUNCTIONAL 60 #define AMPS_OPTIONS_NONE "" 61 #define AMPS_OPTIONS_LIVE "live," 62 #define AMPS_OPTIONS_OOF "oof," 63 #define AMPS_OPTIONS_REPLACE "replace," 64 #define AMPS_OPTIONS_NOEMPTIES "no_empties," 65 #define AMPS_OPTIONS_SENDKEYS "send_keys," 66 #define AMPS_OPTIONS_TIMESTAMP "timestamp," 67 #define AMPS_OPTIONS_NOSOWKEY "no_sowkey," 68 #define AMPS_OPTIONS_CANCEL "cancel," 69 #define AMPS_OPTIONS_RESUME "resume," 70 #define AMPS_OPTIONS_PAUSE "pause," 71 #define AMPS_OPTIONS_FULLY_DURABLE "fully_durable," 72 #define AMPS_OPTIONS_EXPIRE "expire," 73 #define AMPS_OPTIONS_TOPN(x) "top_n=##x," 74 #define AMPS_OPTIONS_MAXBACKLOG(x) "max_backlog=##x," 75 #define AMPS_OPTIONS_RATE(x) "rate=##x," 79 typedef void* amps_subscription_handle;
93 mutable bool _isIgnoreAutoAck;
94 size_t _bookmarkSeqNo;
95 amps_subscription_handle _subscription;
96 ClientImpl* _clientImpl;
110 bool ignoreAutoAck_ =
false,
size_t bookmarkSeqNo_ = 0,
111 amps_subscription_handle subscription_ = NULL,
112 ClientImpl* clientImpl_ = NULL)
113 : _message(message_), _owner(owner_), _isIgnoreAutoAck(ignoreAutoAck_)
114 , _bookmarkSeqNo(bookmarkSeqNo_)
115 , _subscription(subscription_), _clientImpl(clientImpl_)
123 : _message(NULL), _owner(true), _isIgnoreAutoAck(false), _bookmarkSeqNo(0), _subscription(NULL), _clientImpl(NULL)
131 if (_owner && _message)
140 return new MessageImpl(copy,
true, _isIgnoreAutoAck, _bookmarkSeqNo,
141 _subscription, _clientImpl);
146 if (_owner && _message)
152 _bookmarkSeqNo = rhs_._bookmarkSeqNo;
153 _subscription = rhs_._subscription;
154 _isIgnoreAutoAck = rhs_._isIgnoreAutoAck;
155 _clientImpl = rhs_._clientImpl;
158 void setClientImpl(ClientImpl* clientImpl_)
160 _clientImpl = clientImpl_;
163 ClientImpl* clientImpl(
void)
const 180 _subscription = NULL;
181 _isIgnoreAutoAck =
false;
193 if (_message == message_)
197 if (_owner && _message)
204 _subscription = NULL;
205 _isIgnoreAutoAck =
false;
214 static unsigned long newId()
216 #if __cplusplus >= 201100L || _MSC_VER >= 1900 217 static std::atomic<uint_fast64_t> _id(0);
218 return (
unsigned long)++_id;
220 static AMPS_ATOMIC_TYPE _id = 0;
221 return (
unsigned long)(AMPS_FETCH_ADD(&_id, 1));
225 void setBookmarkSeqNo(
size_t val_)
227 _bookmarkSeqNo = val_;
230 size_t getBookmarkSeqNo(
void)
const 232 return _bookmarkSeqNo;
235 void setSubscriptionHandle(amps_subscription_handle subscription_)
237 _subscription = subscription_;
240 amps_subscription_handle getSubscriptionHandle(
void)
const 242 return _subscription;
245 void setIgnoreAutoAck()
const 247 _isIgnoreAutoAck =
true;
250 bool getIgnoreAutoAck()
const 252 return _isIgnoreAutoAck;
262 #ifdef DOXYGEN_PREPROCESSOR 264 #define DOX_COMMENTHEAD(s) / ## ** ## s ## * ## / 265 #define DOX_GROUPNAME(s) DOX_COMMENTHEAD(@name s Functions) 266 #define DOX_OPENGROUP(s) DOX_COMMENTHEAD(@{) \ 268 #define DOX_CLOSEGROUP() DOX_COMMENTHEAD(@}) 269 #define DOX_MAKEGETCOMMENT(x) DOX_COMMENTHEAD( Retrieves the value of the x header of the Message as a new Field. Notice that not all headers are present on all messages returned by AMPS. See the AMPS %Command Reference for details on which fields will be present in response to a specific command. ) 270 #define DOX_MAKEGETRAWCOMMENT(x) DOX_COMMENTHEAD( Retrieves the value of the x header of self as a Field that references the underlying buffer managed by this Message. Notice that not all headers are present on all messages returned by AMPS. See the AMPS %Command Reference for details on which fields will be present in response to a specific command. ) 271 #define DOX_MAKESETCOMMENT(x) DOX_COMMENTHEAD( Sets the value of the x header for this Message. Not all headers are processed by AMPS for all commands. See the AMPS %Command Reference for which headers are used by AMPS for a specific command. ) 272 #define DOX_MAKEASSIGNCOMMENT(x) DOX_COMMENTHEAD( Sets the value of the x header for this Message. Not all headers are processed by AMPS for all commands. See the AMPS %Command Reference for which headers are used by AMPS for a specific command. ) 273 #define DOX_MAKENEWCOMMENT(x) DOX_COMMENTHEAD(Creates and sets a new sequential value for the x header for this Message. This function is most useful for headers such as %CommandId and %SubId.) 277 #define DOX_COMMENTHEAD(s) 278 #define DOX_GROUPNAME(s) 279 #define DOX_OPENGROUP(x) 280 #define DOX_CLOSEGROUP() 281 #define DOX_MAKEGETCOMMENT(x) 282 #define DOX_MAKEGETRAWCOMMENT(x) 283 #define DOX_MAKESETCOMMENT(x) 284 #define DOX_MAKEASSIGNCOMMENT(x) 285 #define DOX_MAKENEWCOMMENT(x) 293 #define AMPS_FIELD(x) \ 295 DOX_MAKEGETCOMMENT(x) \ 296 Field get##x() const {\ 300 amps_message_get_field_value(_body.get().getMessage(),\ 301 AMPS_##x, &ptr, &sz);\ 302 returnValue.assign(ptr, sz);\ 305 DOX_MAKEGETRAWCOMMENT(x) \ 306 void getRaw##x(const char** dataptr, size_t* sizeptr) const {\ 307 amps_message_get_field_value(_body.get().getMessage(),\ 308 AMPS_##x, dataptr, sizeptr);\ 311 DOX_MAKESETCOMMENT(x) \ 312 Message& set##x(const std::string& v) {\ 313 amps_message_set_field_value(_body.get().getMessage(),\ 314 AMPS_##x, v.c_str(), v.length());\ 317 DOX_MAKESETCOMMENT(x) \ 318 Message& set##x(amps_uint64_t v) {\ 320 AMPS_snprintf_amps_uint64_t(buf,22,v);\ 321 amps_message_set_field_value_nts(_body.get().getMessage(),\ 325 DOX_MAKEASSIGNCOMMENT(x) \ 326 Message& assign##x(const std::string& v) {\ 327 amps_message_assign_field_value(_body.get().getMessage(),\ 328 AMPS_##x, v.c_str(), v.length());\ 331 DOX_MAKEASSIGNCOMMENT(x) \ 332 Message& assign##x(const char* data, size_t len) {\ 333 amps_message_assign_field_value(_body.get().getMessage(),\ 334 AMPS_##x, data, len);\ 337 DOX_MAKESETCOMMENT(x) \ 338 Message& set##x(const char* str) {\ 339 amps_message_set_field_value_nts(_body.get().getMessage(),\ 343 DOX_MAKESETCOMMENT(x) \ 344 Message& set##x(const char* str,size_t len) {\ 345 amps_message_set_field_value(_body.get().getMessage(),\ 349 DOX_MAKENEWCOMMENT(x) \ 351 char buf[Message::IdentifierLength+1];\ 352 buf[Message::IdentifierLength] = 0;\ 353 AMPS_snprintf(buf, Message::IdentifierLength+1, "auto%lu" , (unsigned long)(_body.get().newId()));\ 354 amps_message_set_field_value_nts(_body.get().getMessage(),\ 360 #define AMPS_FIELD_ALIAS(x,y) \ 362 DOX_MAKEGETCOMMENT(y) \ 363 Field get##y() const {\ 367 amps_message_get_field_value(_body.get().getMessage(),\ 368 AMPS_##y, &ptr, &sz);\ 369 returnValue.assign(ptr, sz);\ 372 DOX_MAKEGETRAWCOMMENT(y) \ 373 void getRaw##y(const char** dataptr, size_t* sizeptr) const {\ 374 amps_message_get_field_value(_body.get().getMessage(),\ 375 AMPS_##y, dataptr, sizeptr);\ 378 DOX_MAKESETCOMMENT(y) \ 379 Message& set##y(const std::string& v) {\ 380 amps_message_set_field_value(_body.get().getMessage(),\ 381 AMPS_##y, v.c_str(), v.length());\ 384 DOX_MAKESETCOMMENT(y) \ 385 Message& set##y(amps_uint64_t v) {\ 387 AMPS_snprintf_amps_uint64_t(buf,22,v);\ 388 amps_message_set_field_value_nts(_body.get().getMessage(),\ 392 DOX_MAKEASSIGNCOMMENT(y) \ 393 Message& assign##y(const std::string& v) {\ 394 amps_message_assign_field_value(_body.get().getMessage(),\ 395 AMPS_##y, v.c_str(), v.length());\ 398 DOX_MAKEASSIGNCOMMENT(y) \ 399 Message& assign##y(const char* data, size_t len) {\ 400 amps_message_assign_field_value(_body.get().getMessage(),\ 401 AMPS_##y, data, len);\ 404 DOX_MAKESETCOMMENT(y) \ 405 Message& set##y(const char* str) {\ 406 amps_message_set_field_value_nts(_body.get().getMessage(),\ 410 DOX_MAKESETCOMMENT(y) \ 411 Message& set##y(const char* str,size_t len) {\ 412 amps_message_set_field_value(_body.get().getMessage(),\ 416 DOX_MAKENEWCOMMENT(y) \ 418 char buf[Message::IdentifierLength+1];\ 419 buf[Message::IdentifierLength] = 0;\ 420 AMPS_snprintf(buf, Message::IdentifierLength+1, "auto%lux" , (unsigned long)(_body.get().newId()));\ 421 amps_message_set_field_value_nts(_body.get().getMessage(),\ 425 DOX_MAKEGETCOMMENT(y) \ 426 Field get##x() const {\ 430 amps_message_get_field_value(_body.get().getMessage(),\ 431 AMPS_##y, &ptr, &sz);\ 432 returnValue.assign(ptr, sz);\ 435 DOX_MAKEGETRAWCOMMENT(y) \ 436 void getRaw##x(const char** dataptr, size_t* sizeptr) const {\ 437 amps_message_get_field_value(_body.get().getMessage(),\ 438 AMPS_##y, dataptr, sizeptr);\ 441 DOX_MAKESETCOMMENT(y) \ 442 Message& set##x(const std::string& v) {\ 443 amps_message_set_field_value(_body.get().getMessage(),\ 444 AMPS_##y, v.c_str(), v.length());\ 447 DOX_MAKESETCOMMENT(y) \ 448 Message& set##x(amps_uint64_t v) {\ 450 AMPS_snprintf_amps_uint64_t(buf,22,v);\ 451 amps_message_set_field_value_nts(_body.get().getMessage(),\ 455 DOX_MAKEASSIGNCOMMENT(y) \ 456 Message& assign##x(const std::string& v) {\ 457 amps_message_assign_field_value(_body.get().getMessage(),\ 458 AMPS_##y, v.c_str(), v.length());\ 461 DOX_MAKEASSIGNCOMMENT(y) \ 462 Message& assign##x(const char* data, size_t len) {\ 463 amps_message_assign_field_value(_body.get().getMessage(),\ 464 AMPS_##y, data, len);\ 467 DOX_MAKESETCOMMENT(y) \ 468 Message& set##x(const char* str) {\ 469 amps_message_set_field_value_nts(_body.get().getMessage(),\ 473 DOX_MAKESETCOMMENT(y) \ 474 Message& set##x(const char* str,size_t len) {\ 475 amps_message_set_field_value(_body.get().getMessage(),\ 479 DOX_MAKENEWCOMMENT(y) \ 481 char buf[Message::IdentifierLength+1];\ 482 buf[Message::IdentifierLength] = 0;\ 483 AMPS_snprintf(buf, Message::IdentifierLength+1, "auto%lux" , (unsigned long)(_body.get().newId()));\ 484 amps_message_set_field_value_nts(_body.get().getMessage(),\ 533 RefHandle<MessageImpl> _body;
542 static const unsigned int IdentifierLength = 32;
546 static const size_t BOOKMARK_NONE = AMPS_UNSET_INDEX;
581 return Message(_body.get().copy());
588 _body.get().copy(rhs_._body.get());
604 static const char* None(
void)
606 return AMPS_OPTIONS_NONE;
608 static const char* Live(
void)
610 return AMPS_OPTIONS_LIVE;
612 static const char* OOF(
void)
614 return AMPS_OPTIONS_OOF;
616 static const char* Replace(
void)
618 return AMPS_OPTIONS_REPLACE;
620 static const char* NoEmpties(
void)
622 return AMPS_OPTIONS_NOEMPTIES;
624 static const char* SendKeys(
void)
626 return AMPS_OPTIONS_SENDKEYS;
628 static const char* Timestamp(
void)
630 return AMPS_OPTIONS_TIMESTAMP;
632 static const char* NoSowKey(
void)
634 return AMPS_OPTIONS_NOSOWKEY;
636 static const char* Cancel(
void)
638 return AMPS_OPTIONS_CANCEL;
640 static const char* Resume(
void)
642 return AMPS_OPTIONS_RESUME;
644 static const char* Pause(
void)
646 return AMPS_OPTIONS_PAUSE;
648 static const char* FullyDurable(
void)
650 return AMPS_OPTIONS_FULLY_DURABLE;
652 static const char* Expire(
void)
654 return AMPS_OPTIONS_EXPIRE;
656 static std::string Conflation(
const char* conflation_)
659 AMPS_snprintf(buf,
sizeof(buf),
"conflation=%s,", conflation_);
662 static std::string ConflationKey(
const char* conflationKey_)
664 std::string option(
"conflation_key=");
665 option.append(conflationKey_).append(
",");
668 static std::string TopN(
int topN_)
671 AMPS_snprintf(buf,
sizeof(buf),
"top_n=%d,", topN_);
674 static std::string MaxBacklog(
int maxBacklog_)
677 AMPS_snprintf(buf,
sizeof(buf),
"max_backlog=%d,", maxBacklog_);
680 static std::string Rate(
const char* rate_)
683 AMPS_snprintf(buf,
sizeof(buf),
"rate=%s,", rate_);
686 static std::string RateMaxGap(
const char* rateMaxGap_)
689 AMPS_snprintf(buf,
sizeof(buf),
"rate_max_gap=%s,", rateMaxGap_);
692 static std::string SkipN(
int skipN_)
695 AMPS_snprintf(buf,
sizeof(buf),
"skip_n=%d,", skipN_);
699 static std::string Projection(
const std::string& projection_)
701 return "projection=[" + projection_ +
"],";
704 template<
class Iterator>
705 static std::string Projection(Iterator begin_, Iterator end_)
707 std::string projection =
"projection=[";
708 for (Iterator i = begin_; i != end_; ++i)
713 projection.insert(projection.length() - 1,
"]");
717 static std::string Grouping(
const std::string& grouping_)
719 return "grouping=[" + grouping_ +
"],";
722 template<
class Iterator>
723 static std::string Grouping(Iterator begin_, Iterator end_)
725 std::string grouping =
"grouping=[";
726 for (Iterator i = begin_; i != end_; ++i)
731 grouping.insert(grouping.length() - 1,
"]");
735 static std::string Select(
const std::string& select_)
737 return "select=[" + select_ +
"],";
740 template<
class Iterator>
741 static std::string Select(Iterator begin_, Iterator end_)
743 std::string select =
"select=[";
744 for (Iterator i = begin_; i != end_; ++i)
749 select.insert(select.length() - 1,
"]");
753 static std::string AckConflationInterval(
const std::string& interval_)
755 return "ack_conflation=" + interval_ +
",";
758 static std::string AckConflationInterval(
const char* interval_)
760 static const std::string start(
"ack_conflation=");
761 return start + interval_ +
",";
767 : _optionStr(options_)
773 int getMaxBacklog(
void)
const 777 std::string getConflation(
void)
const 781 std::string getConflationKey(
void)
const 783 return _conflationKey;
785 int getTopN(
void)
const 789 std::string getRate(
void)
const 793 std::string getRateMaxGap(
void)
const 817 _optionStr += AMPS_OPTIONS_LIVE;
826 _optionStr += AMPS_OPTIONS_OOF;
835 _optionStr += AMPS_OPTIONS_REPLACE;
843 _optionStr += AMPS_OPTIONS_NOEMPTIES;
851 _optionStr += AMPS_OPTIONS_SENDKEYS;
860 _optionStr += AMPS_OPTIONS_TIMESTAMP;
868 _optionStr += AMPS_OPTIONS_NOSOWKEY;
876 _optionStr += AMPS_OPTIONS_CANCEL;
887 _optionStr += AMPS_OPTIONS_RESUME;
902 _optionStr += AMPS_OPTIONS_PAUSE;
913 _optionStr += AMPS_OPTIONS_FULLY_DURABLE;
929 AMPS_snprintf(buf,
sizeof(buf),
"max_backlog=%d,", maxBacklog_);
931 _maxBacklog = maxBacklog_;
942 AMPS_snprintf(buf,
sizeof(buf),
"conflation=%s,", conflation_);
944 _conflation = conflation_;
959 AMPS_snprintf(buf,
sizeof(buf),
"conflation_key=%s,", conflationKey_);
961 _conflationKey = conflationKey_;
972 AMPS_snprintf(buf,
sizeof(buf),
"top_n=%d,", topN_);
986 AMPS_snprintf(buf,
sizeof(buf),
"rate=%s,", rate_);
1008 AMPS_snprintf(buf,
sizeof(buf),
"rate_max_gap=%s,", rateMaxGap_);
1010 _rateMaxGap = rateMaxGap_;
1021 AMPS_snprintf(buf,
sizeof(buf),
"skip_n=%d,", skipN_);
1032 _projection =
"projection=[" + projection_ +
"],";
1033 _optionStr += _projection;
1042 template<
class Iterator>
1045 _projection =
"projection=[";
1046 for (Iterator i = begin_; i != end_; ++i)
1051 _projection.insert(_projection.length() - 1,
"]");
1052 _optionStr += _projection;
1061 _grouping =
"grouping=[" + grouping_ +
"],";
1062 _optionStr += _grouping;
1071 template<
class Iterator>
1074 _grouping =
"grouping=[";
1075 for (Iterator i = begin_; i != end_; ++i)
1080 _grouping.insert(_grouping.length() - 1,
"]");
1081 _optionStr += _grouping;
1087 operator const std::string()
1089 return _optionStr.substr(0, _optionStr.length() - 1);
1096 return (_optionStr.empty() ? 0 : _optionStr.length() - 1);
1105 return (_optionStr.empty() ? 0 : _optionStr.data());
1109 std::string _optionStr;
1111 std::string _conflation;
1112 std::string _conflationKey;
1115 std::string _rateMaxGap;
1117 std::string _projection;
1118 std::string _grouping;
1125 typedef enum :
unsigned 1127 None = 0, Received = 1, Parsed = 2, Processed = 4, Persisted = 8, Completed = 16, Stats = 32
1133 static inline
AckType::Type decodeSingleAckType(const
char* begin, const
char* end)
1135 switch (end - begin)
1138 return AckType::Stats;
1140 return AckType::Parsed;
1142 return AckType::Received;
1146 case 'e':
return AckType::Persisted;
1147 case 'r':
return AckType::Processed;
1148 case 'o':
return AckType::Completed;
1155 return AckType::None;
1162 unsigned result = AckType::None;
1163 const char* data = NULL;
size_t len = 0;
1165 const char* mark = data;
1166 for (
const char* end = data + len; data != end; ++data)
1170 result |= decodeSingleAckType(mark, data);
1176 result |= decodeSingleAckType(mark, data);
1185 if (ackType_ < AckTypeConstants<0>::Entries)
1188 AckTypeConstants<0>::Values[ackType_], AckTypeConstants<0>::Lengths[ackType_]);
1193 AMPS_FIELD(BatchSize)
1194 AMPS_FIELD(Bookmark)
1213 SOWAndSubscribe = 256,
1214 DeltaSubscribe = 512,
1215 SOWAndDeltaSubscribe = 1024,
1223 NoDataCommands = Publish | Unsubscribe | Heartbeat | SOWDelete | DeltaPublish
1224 | Logon | StartTimer | StopTimer | Flush
1230 const char* data = NULL;
size_t len = 0;
1234 case 1:
return Command::Publish;
1238 case 's':
return Command::SOW;
1239 case 'o':
return Command::OOF;
1240 case 'a':
return Command::Ack;
1246 case 'l':
return Command::Logon;
1247 case 'f':
return Command::Flush;
1251 return Command::Publish;
1256 case 's':
return Command::Subscribe;
1257 case 'h':
return Command::Heartbeat;
1258 case 'g':
return Command::GroupEnd;
1264 case 'o':
return Command::SOWDelete;
1265 case 't':
return Command::StopTimer;
1271 case 'g':
return Command::GroupBegin;
1272 case 'u':
return Command::Unsubscribe;
1276 return Command::DeltaPublish;
1278 return Command::DeltaSubscribe;
1280 return Command::SOWAndSubscribe;
1282 return Command::SOWAndDeltaSubscribe;
1284 return Command::Unknown;
1291 unsigned command = command_;
1298 CommandConstants<0>::Values[bits], CommandConstants<0>::Lengths[bits]);
1302 AMPS_FIELD(CommandId)
1303 AMPS_FIELD(ClientName)
1304 AMPS_FIELD(CorrelationId)
1305 AMPS_FIELD(Expiration)
1307 AMPS_FIELD(GroupSequenceNumber)
1308 AMPS_FIELD(Heartbeat)
1309 AMPS_FIELD(LeasePeriod)
1311 AMPS_FIELD(MessageLength)
1312 AMPS_FIELD(MessageType)
1316 Field getOptions()
const 1322 AMPS_Options, &ptr, &sz);
1323 if (sz && ptr[sz - 1] ==
',')
1327 returnValue.assign(ptr, sz);
1331 DOX_MAKEGETRAWCOMMENT(
Options)
1332 void getRawOptions(const
char** dataptr,
size_t* sizeptr)
const 1335 AMPS_Options, dataptr, sizeptr);
1336 if (*sizeptr && *dataptr && (*dataptr)[*sizeptr - 1] ==
',')
1346 size_t sz = v.length();
1347 if (sz && v[sz - 1] ==
',')
1352 AMPS_Options, v.c_str(), sz);
1356 DOX_MAKEASSIGNCOMMENT(
Options)
1359 size_t sz = v.length();
1360 if (sz && v[sz - 1] ==
',')
1365 AMPS_Options, v.c_str(), sz);
1369 DOX_MAKEASSIGNCOMMENT(
Options)
1370 Message& assignOptions(const
char* data,
size_t len)
1372 if (len && data[len - 1] ==
',')
1377 AMPS_Options, data, len);
1386 size_t sz = strlen(str);
1387 if (sz && str[sz - 1] ==
',')
1392 AMPS_Options, str, sz);
1397 AMPS_Options, str, 0);
1405 if (len && str[len - 1] ==
',')
1410 AMPS_Options, str, len);
1416 AMPS_FIELD(Password)
1417 AMPS_FIELD_ALIAS(QueryId, QueryID)
1419 AMPS_FIELD(RecordsInserted)
1420 AMPS_FIELD(RecordsReturned)
1421 AMPS_FIELD(RecordsUpdated)
1422 AMPS_FIELD(Sequence)
1423 AMPS_FIELD(SowDelete)
1427 AMPS_FIELD_ALIAS(SubId, SubscriptionId)
1428 AMPS_FIELD(SubscriptionIds)
1429 AMPS_FIELD(TimeoutInterval)
1430 AMPS_FIELD(Timestamp)
1437 return getTimestamp();
1446 getRawTimestamp(dataptr, sizeptr);
1450 AMPS_FIELD(TopicMatches)
1451 AMPS_FIELD(TopNRecordsReturned)
1466 returnValue.assign(ptr, sz);
1470 void getRawData(
const char** data,
size_t* sz)
const 1481 Message& assignData(
const std::string& v_)
1495 Message& assignData(
const char* data_,
size_t length_)
1508 Message& assignData(
const char* data_)
1515 return _body.get().getMessage();
1519 _body.get().replace(message, owner);
1523 _body.get().disown();
1529 bool isValid(
void)
const 1531 return _body.isValid();
1535 _body.get().reset();
1539 void setBookmarkSeqNo(
size_t val)
1541 _body.get().setBookmarkSeqNo(val);
1544 size_t getBookmarkSeqNo()
const 1546 return _body.get().getBookmarkSeqNo();
1551 _body.get().setSubscriptionHandle(val);
1556 return _body.get().getSubscriptionHandle();
1559 void ack(
const char* options_ = NULL)
const;
1561 void setClientImpl(ClientImpl* pClientImpl)
1563 _body.get().setClientImpl(pClientImpl);
1566 void setIgnoreAutoAck()
const 1568 _body.get().setIgnoreAutoAck();
1571 bool getIgnoreAutoAck()
const 1573 return _body.get().getIgnoreAutoAck();
1577 void throwFor(
const T& ,
const std::string& ackReason_)
const 1579 switch (ackReason_[0])
1582 throw AuthenticationException(
"Logon failed for user \"" +
1583 (std::string)getUserId() +
"\"");
1586 switch (ackReason_.length())
1589 throw BadFilterException(
"bad filter '" +
1590 (std::string)getFilter() +
1594 if (getSowKeys().len())
1596 throw BadSowKeyException(
"bad sow key '" +
1597 (std::string)getSowKeys() +
1602 throw BadSowKeyException(
"bad sow key '" +
1603 (std::string)getSowKey() +
1608 throw BadRegexTopicException(
"bad regex topic '" +
1609 (std::string)getTopic() +
1617 if (ackReason_.length() == 23)
1619 throw DuplicateLogonException(
"Client '" +
1620 (std::string)getClientName() +
1622 (std::string)getUserId() +
1623 "' duplicate logon attempt");
1627 if (ackReason_.length() >= 9)
1629 switch (ackReason_[8])
1632 throw InvalidBookmarkException(
"invalid bookmark '" +
1633 (std::string)getBookmark() +
1637 throw CommandException(std::string(
"invalid message type '") +
1638 (std::string)getMessageType() +
1642 if (ackReason_[9] ==
'p')
1644 throw InvalidOptionsException(
"invalid options '" +
1645 (std::string)getOptions() +
1648 else if (ackReason_[9] ==
'r')
1650 throw InvalidOrderByException(
"invalid order by '" +
1651 (std::string)getOrderBy() +
1656 throw InvalidSubIdException(
"invalid subid '" +
1657 (std::string)getSubscriptionId() +
1661 if (ackReason_.length() == 13)
1663 throw InvalidTopicException(
"invalid topic '" +
1664 (std::string)getTopic() +
1667 else if (ackReason_.length() == 23)
1669 throw InvalidTopicException(
"invalid topic or filter. Topic '" +
1670 (std::string)getTopic() +
1672 (std::string)getFilter() +
1682 if (ackReason_.length() == 14)
1684 throw LogonRequiredException(
"logon required before command");
1688 switch (ackReason_[4])
1691 throw NameInUseException(
"name in use '" +
1692 (std::string)getClientName() +
1696 throw NotEntitledException(
"User \"" +
1697 (std::string)getUserId() +
1698 "\" not entitled to topic \"" +
1699 (std::string)getTopic() +
1703 throw MissingFieldsException(
"command sent with no filter or bookmark.");
1706 throw MissingFieldsException(
"command sent with no client name.");
1709 throw MissingFieldsException(
"command sent with no topic or filter.");
1712 throw CommandException(
"operation on topic '" +
1713 (std::string)getTopic() +
1714 "' with options '" +
1715 (std::string)getOptions() +
1716 "' not supported.");
1723 switch (ackReason_.length())
1726 throw MissingFieldsException(
"orderby required");
1729 throw CommandException(
"orderby too large '" +
1730 (std::string)getOrderBy() +
1736 throw CommandException(
"projection clause too large in options '" +
1737 (std::string)getOptions() +
1741 switch (ackReason_[2])
1744 throw BadRegexTopicException(
"'regex topic not supported '" +
1745 (std::string)getTopic() +
1753 switch (ackReason_[5])
1756 throw SubidInUseException(
"subid in use '" +
1757 (std::string)getSubscriptionId() +
1761 throw CommandException(
"sow_delete command only supports one of: filter '" +
1762 (std::string)getFilter() +
1764 (std::string)getSowKeys() +
1766 (std::string)getBookmark() +
1768 (std::string)getData() +
1772 throw PublishException(
"sow store failed.");
1779 switch (ackReason_[2])
1782 throw PublishException(
"tx store failure.");
1785 throw CommandException(
"txn replay failed for '" +
1786 (std::string)getSubId() +
1794 throw CommandException(
"Error from server while processing this command: '" +
1802 return lhs + std::string(rhs);
1805 inline std::basic_ostream<char>&
1806 operator<<(std::basic_ostream<char>& os,
const Message::Field& rhs)
1808 os.write(rhs.
data(), (std::streamsize)rhs.
len());
1812 AMPS::Field::operator<(
const AMPS::Field& rhs)
const 1816 return rhs.
data() != NULL;
1822 return std::lexicographical_compare(data(), data() + len(), rhs.
data(), rhs.
data() + rhs.
len());
void setFullyDurable(void)
Set the option to only provide messages that have been persisted to all replication destinations that...
Definition: Message.hpp:911
Class to hold string versions of failure reasons.
Definition: ampsplusplus.hpp:150
Message & setData(const std::string &v_)
Sets the data portion of self.
Definition: Message.hpp:1476
AMPSDLL amps_handle amps_message_create(amps_handle client)
Functions for creation and manipulation of AMPS messages.
void setRateMaxGap(const char *rateMaxGap_)
Set the option for the maximum amount of time that a bookmark replay with a specified rate will allow...
Definition: Message.hpp:1005
Command::Type getCommandEnum() const
Decode self's "command" field and return one of the values from Command.
Definition: Message.hpp:1228
AMPSDLL amps_handle amps_message_copy(amps_handle message)
Creates and returns a handle to a new AMPS message object that is a deep copy of the message passed i...
void setNoEmpties(void)
Set the option to not send empty messages on a delta subscription.
Definition: Message.hpp:841
Message deepCopy(void) const
Returns a deep copy of self.
Definition: Message.hpp:579
void setOOF(void)
Set the option to receive out of focus (OOF) messages on a subscription, where applicable.
Definition: Message.hpp:824
Message encapsulates a single message sent to or received from an AMPS server, and provides methods f...
Definition: Message.hpp:531
Message & setAckTypeEnum(unsigned ackType_)
Encode self's "ack type" field from a bitmask of values from AckType.
Definition: Message.hpp:1183
void setNoSowKey(void)
Set the option to not set the SowKey header on messages.
Definition: Message.hpp:866
MessageImpl(amps_handle message_, bool owner_=false, bool ignoreAutoAck_=false, size_t bookmarkSeqNo_=0, amps_subscription_handle subscription_=NULL, ClientImpl *clientImpl_=NULL)
Constructs a messageImpl from an existing AMPS message.
Definition: Message.hpp:109
AMPSDLL void amps_message_set_field_value(amps_handle message, FieldId field, const amps_char *value, size_t length)
Sets the value of a header field in an AMPS message.
Message & setCommandEnum(Command::Type command_)
Set self's "command" field from one of the values in Command.
Definition: Message.hpp:1288
void setSendKeys(void)
Set the option to send key fields with a delta subscription.
Definition: Message.hpp:849
void * amps_handle
Opaque handle type used to refer to objects in the AMPS api.
Definition: amps.h:211
Class for constructing the options string to pass to AMPS in a Message.
Definition: Message.hpp:601
AMPSDLL void amps_message_assign_data(amps_handle message, const amps_char *value, size_t length)
Assigns the data component of an AMPS message, without copying the value.
const char * data() const
Returns the (non-null-terminated) data underlying this field.
Definition: Field.hpp:260
void setPause(void)
Set the option to pause a bookmark subscription.
Definition: Message.hpp:900
void setReplace(void)
Set the option to replace a current subscription with this one.
Definition: Message.hpp:833
Valid values for setCommandEnum() and getCommandEnum().
Definition: Message.hpp:1200
Message & setData(const char *data_)
Sets the data portion of self from a null-terminated string.
Definition: Message.hpp:1503
void setGrouping(Iterator begin_, Iterator end_)
Set the option for grouping the results of an aggregated query or subscription.
Definition: Message.hpp:1072
Message(CtorFlag)
Constructs a new empty, invalid Message.
Definition: Message.hpp:555
AMPSDLL void amps_message_set_data_nts(amps_handle message, const amps_char *value)
Sets the data component of an AMPS message.
AMPSDLL void amps_message_reset(amps_handle message)
Clears all fields and data in a message.
void setTimestamp(void)
Set the option to send a timestamp that the message was processed on a subscription or query...
Definition: Message.hpp:858
amps_handle getMessage() const
Returns the underling AMPS message object from the C layer.
Definition: Message.hpp:170
Defines the AMPS::Field class, which represents the value of a field in a message.
size_t len() const
Returns the length of the data underlying this field.
Definition: Field.hpp:267
void setResume(void)
Set the option to resume a subscription.
Definition: Message.hpp:885
AMPSDLL void amps_message_get_field_value(amps_handle message, FieldId field, const amps_char **value_ptr, size_t *length_ptr)
Retrieves the value of a header field in an AMPS message.
void setConflation(const char *conflation_)
Set the options for conflation on a subscription.
Definition: Message.hpp:939
void replace(amps_handle message_, bool owner_=false)
Causes self to refer to a new AMPS message, freeing any current message owned by self along the way...
Definition: Message.hpp:190
AMPSDLL void amps_message_get_data(amps_handle message, amps_char **value_ptr, size_t *length_ptr)
Gets the data component of an AMPS message.
AMPSDLL void amps_message_destroy(amps_handle message)
Destroys and frees the memory associated with an AMPS message object.
Options(std::string options_="")
ctor - default to None
Definition: Message.hpp:766
size_t getLength() const
Return the length of this Options object as a string.
Definition: Message.hpp:1094
AMPSDLL void amps_message_assign_field_value(amps_handle message, FieldId field, const amps_char *value, size_t length)
Assigns the value of a header field in an AMPS message, without copying the value.
void setRate(const char *rate_)
Set the option for the maximum rate at which messages are provided to the subscription.
Definition: Message.hpp:983
Valid values for the setAckTypeEnum() and getAckTypeEnum() methods.
Definition: Message.hpp:1123
void setGrouping(const std::string &grouping_)
Set the option for grouping the results of an aggregated query or subscription.
Definition: Message.hpp:1059
void setTopN(int topN_)
Set the top N option, which specifies the maximum number of messages to return for this command...
Definition: Message.hpp:969
AMPSDLL void amps_message_set_data(amps_handle message, const amps_char *value, size_t length)
Sets the data component of an AMPS message.
void setMaxBacklog(int maxBacklog_)
Set the option for maximum backlog this subscription is willing to accept.
Definition: Message.hpp:926
void getRawTransmissionTime(const char **dataptr, size_t *sizeptr) const
Definition: Message.hpp:1444
void deepCopy(const Message &rhs_)
Makes self a deep copy of rhs_.
Definition: Message.hpp:586
MessageImpl()
Constructs a MessageImpl with a new, empty AMPS message.
Definition: Message.hpp:122
Field represents the value of a single field in a Message.
Definition: Field.hpp:86
void setConflationKey(const char *conflationKey_)
Set the options for the conflation key, the identifiers for the field or fields used by AMPS to deter...
Definition: Message.hpp:956
void setProjection(const std::string &projection_)
Set the option for projecting the results of an aggregated query or subscription. ...
Definition: Message.hpp:1030
void setSkipN(int skipN_)
Set the option for skip N, the number of messages in the result set to skip before returning messages...
Definition: Message.hpp:1018
Message(amps_handle message_, bool owner_=false)
Constructs a new Message to wrap message.
Definition: Message.hpp:565
CtorFlag
A flag to indicate not to create a body.
Definition: Message.hpp:551
void setNone(void)
Clear any previously set options and set the options to an empty string (AMPS_OPTIONS_NONE).
Definition: Message.hpp:801
Implementation class for a Message.
Definition: Message.hpp:87
void setProjection(Iterator begin_, Iterator end_)
Set the option for projecting the results of an aggregated query or subscription. ...
Definition: Message.hpp:1043
unsigned getAckTypeEnum() const
Decode self's "ack type" field and return the corresponding bitmask of values from AckType...
Definition: Message.hpp:1160
Message & setData(const char *data_, size_t length_)
Sets the data portion of self from a char array.
Definition: Message.hpp:1490
Definition: ampsplusplus.hpp:102
void setCancel(void)
Set the cancel option, used on a sow_delete command to return a message to the queue.
Definition: Message.hpp:874
const char * getStr() const
Return this Options object as a non-NULL-terminated string.
Definition: Message.hpp:1103
Message()
Construct a new, empty Message.
Definition: Message.hpp:573
void setLive(void)
Set the live option for a bookmark subscription, which requests that the subscription receives messag...
Definition: Message.hpp:815