26 #ifndef _MEMORYBOOKMARKSTORE_H_ 27 #define _MEMORYBOOKMARKSTORE_H_ 29 #include <amps/BookmarkStore.hpp> 41 #define AMPS_MIN_BOOKMARK_LEN 3 42 #define AMPS_INITIAL_MEMORY_BOOKMARK_SIZE 16384UL 63 typedef std::map<Message::Field, size_t, Message::Field::FieldHash> RecoveryMap;
64 typedef std::map<amps_uint64_t, amps_uint64_t> PublisherMap;
65 typedef std::map<Message::Field, size_t, Message::Field::FieldHash>::iterator
67 typedef std::map<amps_uint64_t, amps_uint64_t>::iterator PublisherIterator;
73 : _current(1), _currentBase(0), _least(1), _leastBase(0)
74 , _recoveryMin(AMPS_UNSET_INDEX), _recoveryBase(AMPS_UNSET_INDEX)
75 , _recoveryMax(AMPS_UNSET_INDEX), _recoveryMaxBase(AMPS_UNSET_INDEX)
76 , _entriesLength(AMPS_INITIAL_MEMORY_BOOKMARK_SIZE), _entries(NULL)
81 _store->resize(_id, (
char**)&_entries,
82 sizeof(Entry)*AMPS_INITIAL_MEMORY_BOOKMARK_SIZE,
false);
83 setLastPersistedToEpoch();
88 Lock<Mutex> guard(_subLock);
91 for (
size_t i = 0; i < _entriesLength; ++i)
93 _entries[i]._val.clear();
96 _store->resize(_id, (
char**)&_entries, 0);
100 _lastPersisted.clear();
103 _recoveryTimestamp.clear();
112 Lock<Mutex> guard(_subLock);
114 size_t index = recover(bookmark_,
true);
115 if (index == AMPS_UNSET_INDEX)
118 if (_current >= _entriesLength)
121 _currentBase += _entriesLength;
125 if ((_current == _least && _leastBase < _currentBase) ||
126 (_current == _recoveryMin && _recoveryBase < _currentBase))
128 if (!_store->resize(_id, (
char**)&_entries,
129 sizeof(Entry) * _entriesLength * 2))
132 return log(bookmark_);
165 bool isRange = BookmarkRange::isRange(bookmark_);
166 bool startExclusive =
false;
169 _range.set(bookmark_);
171 if (!_range.isValid())
174 throw CommandException(
"Invalid bookmark range specified.");
176 startExclusive = !_range.isStartInclusive();
178 if (isRange || _publishers.empty())
181 Message::Field parseable = isRange ? _range.getStart() : bookmark_;
182 amps_uint64_t publisher, sequence;
183 std::vector<Field> bmList = Field::parseBookmarkList(parseable);
184 if (bmList.empty() && !Field::isTimestamp(parseable))
192 for (std::vector<Field>::iterator bkmk = bmList.begin(); bkmk != bmList.end(); ++bkmk)
194 parseBookmark(*bkmk, publisher, sequence);
195 if (publisher != (amps_uint64_t)0)
197 if (isRange && startExclusive)
200 PublisherIterator pub = _publishers.find(publisher);
201 if (pub == _publishers.end() || pub->second < sequence)
203 _publishers[publisher] = sequence;
207 else if (!Field::isTimestamp(*bkmk))
224 _entries[_current]._val.deepCopy(bookmark_);
228 Unlock<Mutex> unlock(_subLock);
229 _store->updateAdapter(
this);
234 _entries[_current]._active =
true;
237 return index + _currentBase;
242 Lock<Mutex> guard(_subLock);
243 return _discard(index_);
253 Lock<Mutex> guard(_subLock);
254 size_t search = _least;
255 size_t searchBase = _leastBase;
256 size_t searchMax = _current;
257 size_t searchMaxBase = _currentBase;
258 if (_least + _leastBase == _current + _currentBase)
260 if (_recoveryMin != AMPS_UNSET_INDEX)
262 search = _recoveryMin;
263 searchBase = _recoveryBase;
264 searchMax = _recoveryMax;
265 searchMaxBase = _recoveryMaxBase;
272 assert(searchMax != AMPS_UNSET_INDEX);
273 assert(searchMaxBase != AMPS_UNSET_INDEX);
274 assert(search != AMPS_UNSET_INDEX);
275 assert(searchBase != AMPS_UNSET_INDEX);
277 while (search + searchBase < searchMax + searchMaxBase)
279 if (_entries[search]._val == bookmark_)
281 return _discard(search + searchBase);
283 if (++search == _entriesLength)
286 searchBase += _entriesLength;
295 amps_uint64_t& publisherId_,
296 amps_uint64_t& sequenceNumber_)
298 Message::Field::parseBookmark(field_, publisherId_, sequenceNumber_);
305 Lock<Mutex> guard(_subLock);
306 if (BookmarkRange::isRange(bookmark_))
311 amps_uint64_t publisher, sequence;
312 parseBookmark(bookmark_, publisher, sequence);
316 if (publisher == 0 && !Field::isTimestamp(bookmark_))
321 size_t recoveredIdx = recover(bookmark_,
false);
323 PublisherIterator pub = _publishers.find(publisher);
324 if (pub == _publishers.end() || pub->second < sequence)
326 _publishers[publisher] = sequence;
327 if (recoveredIdx == AMPS_UNSET_INDEX)
332 if (recoveredIdx != AMPS_UNSET_INDEX)
334 if (!_entries[recoveredIdx]._active)
336 _recovered.erase(bookmark_);
344 if (_store->_recovering)
352 size_t base = _leastBase;
353 for (
size_t i = _least; i + base < _current + _currentBase; i++)
355 if ( i >= _entriesLength )
360 if (_entries[i]._val == bookmark_)
362 return !_entries[i]._active;
369 bool empty(
void)
const 371 if (_least == AMPS_UNSET_INDEX ||
372 ((_least + _leastBase) == (_current + _currentBase) &&
373 _recoveryMin == AMPS_UNSET_INDEX))
380 void updateMostRecent()
382 Lock<Mutex> guard(_subLock);
386 const BookmarkRange& getRange()
const 393 Lock<Mutex> guard(_subLock);
394 bool useLastPersisted = !_lastPersisted.empty() &&
395 _lastPersisted.len() > 1;
400 bool useRecent = !_recent.empty() && _recent.len() > 1;
401 amps_uint64_t lastPublisher = 0;
402 amps_uint64_t lastSeq = 0;
403 amps_uint64_t recentPublisher = 0;
404 amps_uint64_t recentSeq = 0;
405 if (useLastPersisted)
407 parseBookmark(_lastPersisted, lastPublisher, lastSeq);
411 parseBookmark(_recent, recentPublisher, recentSeq);
412 if (empty() && useLastPersisted)
418 if (useLastPersisted && lastPublisher == recentPublisher)
420 if (lastSeq <= recentSeq)
426 useLastPersisted =
false;
432 size_t totalLen = (useLastPersisted ? _lastPersisted.len() + 1 : 0);
435 totalLen += _recent.len() + 1;
440 if (usePublishersList_
441 && ((!useLastPersisted && !useRecent)
444 std::ostringstream os;
445 for (PublisherIterator pub = _publishers.begin();
446 pub != _publishers.end(); ++pub)
448 if (pub->first == 0 && pub->second == 0)
452 if (pub->first == recentPublisher && recentSeq < pub->second)
454 os << recentPublisher <<
'|' << recentSeq <<
"|,";
458 os << pub->first <<
'|' << pub->second <<
"|,";
461 std::string recent = os.str();
464 totalLen = recent.length();
465 if (!_recoveryTimestamp.empty())
467 totalLen += _recoveryTimestamp.len();
468 recent += std::string(_recoveryTimestamp);
473 recent.erase(--totalLen);
478 if (_range.isValid())
480 if (_range.getStart() != recent
483 _range.replaceStart(_recentList,
true);
485 else if (_range.isStartInclusive())
487 amps_uint64_t publisher, sequence;
488 parseBookmark(_range.getStart(), publisher,
490 PublisherIterator pub = _publishers.find(publisher);
491 if (pub != _publishers.end()
492 && pub->second >= sequence)
494 _range.makeStartExclusive();
501 if (_range.isValid())
506 if (!_recoveryTimestamp.empty() && !_range.isValid())
508 totalLen += _recoveryTimestamp.len() + 1;
512 || (_recent.len() < 2 && !empty()))
514 if (_range.isValid())
522 _setLastPersistedToEpoch();
523 return _lastPersisted;
527 char* field =
new char[totalLen];
532 memcpy(field, _recent.data(), len);
538 if (useLastPersisted)
540 memcpy(field + len, _lastPersisted.data(), _lastPersisted.len());
541 len += _lastPersisted.len();
547 if (!_recoveryTimestamp.empty() && !_range.isValid())
549 memcpy(field + len, _recoveryTimestamp.data(),
550 _recoveryTimestamp.len());
557 _recentList.assign(field, totalLen);
558 if (_range.isValid())
562 if (_range.getStart() != _recentList)
564 _range.replaceStart(_recentList,
true);
566 else if (_range.isStartInclusive())
568 amps_uint64_t publisher, sequence;
569 parseBookmark(_range.getStart(), publisher,
571 PublisherIterator pub = _publishers.find(publisher);
572 if (pub != _publishers.end()
573 && pub->second >= sequence)
575 _range.makeStartExclusive();
586 Lock<Mutex> guard(_subLock);
589 if (update_ && _store->_recentChanged)
605 Lock<Mutex> guard(_subLock);
606 return _lastPersisted;
612 _recent.deepCopy(recent_);
615 void setRecoveryTimestamp(
const char* recoveryTimestamp_,
618 _recoveryTimestamp.clear();
619 size_t len = (len_ == 0) ? AMPS_TIMESTAMP_LEN : len_;
620 char* ts =
new char[len];
621 memcpy((
void*)ts, (
const void*)recoveryTimestamp_, len);
622 _recoveryTimestamp.assign(ts, len);
625 void moveEntries(
char* old_,
char* new_,
size_t newSize_)
627 size_t least = _least;
628 size_t leastBase = _leastBase;
629 if (_recoveryMin != AMPS_UNSET_INDEX)
631 least = _recoveryMin;
632 leastBase = _recoveryBase;
637 if (newSize_ - (
sizeof(Entry)*_entriesLength) >
sizeof(Entry)*least)
639 memcpy(new_ + (
sizeof(Entry)*_entriesLength),
640 old_, (
sizeof(Entry)*least));
642 memset(old_, 0,
sizeof(Entry)*least);
646 Entry* buffer =
new Entry[least];
647 memcpy((
void*)buffer, (
void*)old_,
sizeof(Entry)*least);
649 memcpy((
void*)new_, (
void*)((
char*)old_ + (
sizeof(Entry)*least)),
650 (_entriesLength - least)*
sizeof(Entry));
652 memcpy((
void*)((
char*)new_ + ((_entriesLength - least)*
sizeof(Entry))),
653 (
void*)buffer, least *
sizeof(Entry));
663 memcpy((
void*)new_, (
void*)((
char*)old_ + (
sizeof(Entry)*least)),
664 (_entriesLength - least)*
sizeof(Entry));
666 memcpy((
void*)((
char*)new_ + ((_entriesLength - least)*
sizeof(Entry))),
667 (
void*)old_, least *
sizeof(Entry));
672 if (_recoveryMin != AMPS_UNSET_INDEX)
674 _least = least + (_least + _leastBase) - (_recoveryMin + _recoveryBase);
675 _recoveryMax = least + (_recoveryMax + _recoveryMaxBase) -
676 (_recoveryMin + _recoveryBase);
677 _recoveryMaxBase = leastBase;
678 _recoveryMin = least;
679 _recoveryBase = leastBase;
685 _leastBase = leastBase;
687 _currentBase = _leastBase;
688 _current = least + _entriesLength;
693 Lock<Mutex> guard(_subLock);
695 return ((_least + _leastBase) == (_current + _currentBase)) ? AMPS_UNSET_INDEX :
703 || BookmarkRange::isRange(bookmark_))
707 Lock<Mutex> guard(_subLock);
708 return _setLastPersisted(bookmark_);
713 if (!_lastPersisted.empty())
715 amps_uint64_t publisher, publisher_lastPersisted;
716 amps_uint64_t sequence, sequence_lastPersisted;
717 parseBookmark(bookmark_, publisher, sequence);
718 parseBookmark(_lastPersisted, publisher_lastPersisted,
719 sequence_lastPersisted);
720 if (publisher == publisher_lastPersisted &&
721 sequence <= sequence_lastPersisted)
727 _lastPersisted.deepCopy(bookmark_);
728 _store->_recentChanged =
true;
729 _recoveryTimestamp.clear();
735 Lock<Mutex> guard(_subLock);
739 || BookmarkRange::isRange(bookmark))
743 _setLastPersisted(bookmark);
751 size_t recover(
const Message::Field& bookmark_,
bool relogIfNotDiscarded)
753 size_t retVal = AMPS_UNSET_INDEX;
754 if (_recovered.empty() || _recoveryBase == AMPS_UNSET_INDEX)
760 RecoveryIterator item = _recovered.find(bookmark_);
761 if (item != _recovered.end())
763 size_t seqNo = item->second;
764 size_t index = (seqNo - _recoveryBase) % _entriesLength;
767 if (_least + _leastBase == _current + _currentBase &&
768 !_entries[index]._active)
770 _store->_recentChanged =
true;
772 _recent = _entries[index]._val.deepCopy();
773 retVal = moveEntry(index);
774 if (retVal == AMPS_UNSET_INDEX)
776 recover(bookmark_, relogIfNotDiscarded);
779 _leastBase = _currentBase;
781 else if (!_entries[index]._active || relogIfNotDiscarded)
783 retVal = moveEntry(index);
784 if (retVal == AMPS_UNSET_INDEX)
786 recover(bookmark_, relogIfNotDiscarded);
793 _recovered.erase(item);
794 if (_recovered.empty())
796 _recoveryMin = AMPS_UNSET_INDEX;
797 _recoveryBase = AMPS_UNSET_INDEX;
798 _recoveryMax = AMPS_UNSET_INDEX;
799 _recoveryMaxBase = AMPS_UNSET_INDEX;
801 else if (index == _recoveryMin)
803 while (_entries[_recoveryMin]._val.empty() &&
804 (_recoveryMin + _recoveryBase) < (_recoveryMax + _recoveryMaxBase))
806 if (++_recoveryMin == _entriesLength)
809 _recoveryBase += _entriesLength;
829 Entry() : _active(
false)
835 typedef std::vector<Entry*> EntryPtrList;
837 void getRecoveryEntries(EntryPtrList& list_)
839 if (_recoveryMin == AMPS_UNSET_INDEX ||
840 _recoveryMax == AMPS_UNSET_INDEX)
844 size_t base = _recoveryBase;
845 size_t max = _recoveryMax + _recoveryMaxBase;
846 for (
size_t i = _recoveryMin; i + base < max; ++i)
848 if (i == _entriesLength)
851 base = _recoveryMaxBase;
854 list_.push_back(&(_entries[i]));
859 void getActiveEntries(EntryPtrList& list_)
861 size_t base = _leastBase;
862 for (
size_t i = _least; i + base < _current + _currentBase; ++i)
864 if (i >= _entriesLength)
870 list_.push_back(&(_entries[i]));
875 Entry* getEntryByIndex(
size_t index_)
877 Lock<Mutex> guard(_subLock);
878 size_t base = (_recoveryBase == AMPS_UNSET_INDEX ||
879 index_ >= _least + _leastBase)
880 ? _leastBase : _recoveryBase;
882 size_t min = (_recoveryMin == AMPS_UNSET_INDEX ?
883 _least + _leastBase :
884 _recoveryMin + _recoveryBase);
885 if (index_ >= _current + _currentBase || index_ < min)
889 return &(_entries[(index_ - base) % _entriesLength]);
894 Lock<Mutex> guard(_subLock);
897 getRecoveryEntries(list);
898 setPublishersToDiscarded(&list, &_publishers);
901 void setPublishersToDiscarded(EntryPtrList* recovered_,
902 PublisherMap* publishers_)
908 for (EntryPtrList::iterator i = recovered_->begin();
909 i != recovered_->end(); ++i)
911 if ((*i)->_val.empty())
915 amps_uint64_t publisher = (amps_uint64_t)0;
916 amps_uint64_t sequence = (amps_uint64_t)0;
917 parseBookmark((*i)->_val, publisher, sequence);
918 if (publisher && sequence && (*i)->_active &&
919 (*publishers_)[publisher] >= sequence)
921 (*publishers_)[publisher] = sequence - 1;
926 void clearLastPersisted()
928 Lock<Mutex> guard(_subLock);
929 _lastPersisted.clear();
932 void setLastPersistedToEpoch()
934 Lock<Mutex> guard(_subLock);
935 _setLastPersistedToEpoch();
939 Subscription(
const Subscription&);
940 Subscription& operator=(
const Subscription&);
942 size_t moveEntry(
size_t index_)
945 if (_current >= _entriesLength)
948 _currentBase += _entriesLength;
952 if ((_current == _least % _entriesLength &&
953 _leastBase < _currentBase) ||
954 (_current == _recoveryMin && _recoveryBase < _currentBase))
956 if (!_store->resize(_id, (
char**)&_entries,
957 sizeof(Entry) * _entriesLength * 2))
959 return AMPS_UNSET_INDEX;
964 _entries[_current]._val = _entries[index_]._val;
965 _entries[_current]._active = _entries[index_]._active;
967 _entries[index_]._val.assign(NULL, 0);
968 _entries[index_]._active =
false;
972 void _setLastPersistedToEpoch()
975 char* field =
new char[fieldLen];
977 _lastPersisted.clear();
978 _lastPersisted.assign(field, fieldLen);
981 bool _discard(
size_t index_)
985 assert((_recoveryBase == AMPS_UNSET_INDEX && _recoveryMin == AMPS_UNSET_INDEX) ||
986 (_recoveryBase != AMPS_UNSET_INDEX && _recoveryMin != AMPS_UNSET_INDEX));
987 size_t base = (_recoveryBase == AMPS_UNSET_INDEX
988 || index_ >= _least + _leastBase)
989 ? _leastBase : _recoveryBase;
991 size_t min = (_recoveryMin == AMPS_UNSET_INDEX ? _least + _leastBase :
992 _recoveryMin + _recoveryBase);
993 if (index_ >= _current + _currentBase || index_ < min)
1000 Entry& e = _entries[(index_ - base) % _entriesLength];
1003 size_t index = index_;
1004 if (_recoveryMin != AMPS_UNSET_INDEX &&
1005 index_ == _recoveryMin + _recoveryBase)
1008 size_t j = _recoveryMin;
1009 while (j + _recoveryBase < _recoveryMax + _recoveryMaxBase &&
1010 !_entries[j]._active)
1031 if (!bookmark.
empty())
1033 _recovered.erase(bookmark);
1035 amps_uint64_t publisher, sequence;
1036 parseBookmark(bookmark, publisher, sequence);
1037 PublisherIterator pub = _publishers.find(publisher);
1038 if (pub == _publishers.end() || pub->second < sequence)
1040 _publishers[publisher] = sequence;
1042 if (_least + _leastBase == _current + _currentBase ||
1043 ((_least + _leastBase) % _entriesLength) ==
1044 ((_recoveryMin + _recoveryBase + 1)) % _entriesLength)
1048 _store->_recentChanged =
true;
1049 _recoveryTimestamp.clear();
1052 bookmark.assign(NULL, 0);
1062 if (++j == _entriesLength)
1065 _recoveryBase += _entriesLength;
1069 assert(j + _recoveryBase != _recoveryMax + _recoveryMaxBase ||
1070 _recovered.empty());
1071 if (_recovered.empty())
1073 _recoveryMin = AMPS_UNSET_INDEX;
1074 _recoveryBase = AMPS_UNSET_INDEX;
1075 _recoveryMax = AMPS_UNSET_INDEX;
1076 _recoveryMaxBase = AMPS_UNSET_INDEX;
1078 index = _least + _leastBase;
1087 if (index == _least + _leastBase)
1091 while (j + _leastBase < _current + _currentBase &&
1092 !_entries[j]._active)
1096 _recent = _entries[j]._val;
1097 _entries[j]._val.assign(NULL, 0);
1098 _store->_recentChanged =
true;
1100 _recoveryTimestamp.clear();
1103 if (++j == _entriesLength)
1106 _leastBase += _entriesLength;
1115 void _updateMostRecent()
1119 assert((_recoveryBase == AMPS_UNSET_INDEX && _recoveryMin == AMPS_UNSET_INDEX) ||
1120 (_recoveryBase != AMPS_UNSET_INDEX && _recoveryMin != AMPS_UNSET_INDEX));
1121 size_t base = (_recoveryMin == AMPS_UNSET_INDEX) ? _leastBase : _recoveryBase;
1122 size_t start = (_recoveryMin == AMPS_UNSET_INDEX) ? _least : _recoveryMin;
1123 _recoveryMin = AMPS_UNSET_INDEX;
1124 _recoveryBase = AMPS_UNSET_INDEX;
1125 _recoveryMax = AMPS_UNSET_INDEX;
1126 _recoveryMaxBase = AMPS_UNSET_INDEX;
1127 for (
size_t i = start; i + base < _current + _currentBase; i++)
1129 if ( i >= _entriesLength )
1132 base = _currentBase;
1134 if (i >= _recoveryMax + _recoveryBase && i < _least + _leastBase)
1138 Entry& entry = _entries[i];
1139 if (!entry._val.empty())
1141 _recovered[entry._val] = i + base;
1142 if (_recoveryMin == AMPS_UNSET_INDEX)
1145 _recoveryBase = base;
1146 _recoveryMax = _current;
1147 _recoveryMaxBase = _currentBase;
1151 if (_current == _entriesLength)
1154 _currentBase += _entriesLength;
1157 _leastBase = _currentBase;
1164 BookmarkRange _range;
1167 size_t _currentBase;
1170 size_t _recoveryMin;
1171 size_t _recoveryBase;
1172 size_t _recoveryMax;
1173 size_t _recoveryMaxBase;
1174 size_t _entriesLength;
1178 RecoveryMap _recovered;
1180 PublisherMap _publishers;
1189 _serverVersion(AMPS_DEFAULT_MIN_VERSION),
1190 _recentChanged(true),
1192 _recoveryPointAdapter(NULL),
1193 _recoveryPointFactory(NULL)
1196 typedef RecoveryPointAdapter::iterator RecoveryIterator;
1205 RecoveryPointFactory factory_ = NULL)
1209 , _serverVersion(AMPS_DEFAULT_MIN_VERSION)
1210 , _recentChanged(true)
1212 , _recoveryPointAdapter(adapter_)
1213 , _recoveryPointFactory(factory_)
1216 if (!_recoveryPointFactory)
1220 for (RecoveryIterator recoveryPoint = _recoveryPointAdapter.begin();
1221 recoveryPoint != _recoveryPointAdapter.end();
1224 Field subId(recoveryPoint->getSubId());
1225 msg.setSubscriptionHandle(static_cast<amps_subscription_handle>(0));
1227 Field bookmark = recoveryPoint->getBookmark();
1228 if (BookmarkRange::isRange(bookmark))
1235 std::vector<Field> bmList = Field::parseBookmarkList(bookmark);
1236 for (std::vector<Field>::iterator bkmk = bmList.begin(); bkmk != bmList.end(); ++bkmk)
1238 if (Field::isTimestamp(*bkmk))
1240 find(subId)->setRecoveryTimestamp(bkmk->data(), bkmk->len());
1254 _recovering =
false;
1259 if (_recoveryPointAdapter.isValid())
1261 _recoveryPointAdapter.close();
1273 Lock<Mutex> guard(_lock);
1274 return _log(message_);
1284 Lock<Mutex> guard(_lock);
1285 (void)_discard(message_);
1297 Lock<Mutex> guard(_lock);
1298 (void)_discard(subId_, bookmarkSeqNo_);
1308 Lock<Mutex> guard(_lock);
1309 return _getMostRecent(subId_);
1322 Lock<Mutex> guard(_lock);
1323 return _isDiscarded(message_);
1333 Lock<Mutex> guard(_lock);
1344 Lock<Mutex> guard(_lock);
1354 Lock<Mutex> guard(_lock);
1355 return _getOldestBookmarkSeq(subId_);
1366 Lock<Mutex> guard(_lock);
1367 _persisted(find(subId_), bookmark_);
1379 Lock<Mutex> guard(_lock);
1380 return _persisted(find(subId_), bookmark_);
1398 Lock<Mutex> guard(_subsLock);
1399 _serverVersion = version_;
1402 inline bool isWritableBookmark(
size_t length)
1404 return length >= AMPS_MIN_BOOKMARK_LEN;
1407 typedef Subscription::EntryPtrList EntryPtrList;
1412 size_t _log(
Message& message_)
1415 Subscription* pSub = (Subscription*)(message_.getSubscriptionHandle());
1424 message_.setSubscriptionHandle(
1425 static_cast<amps_subscription_handle>(pSub));
1427 size_t retVal = pSub->log(bookmark);
1428 message_.setBookmarkSeqNo(retVal);
1433 bool _discard(
const Message& message_)
1435 size_t bookmarkSeqNo = message_.getBookmarkSeqNo();
1436 Subscription* pSub = (Subscription*)(message_.getSubscriptionHandle());
1446 bool retVal = pSub->discard(bookmarkSeqNo);
1449 updateAdapter(pSub);
1455 bool _discard(
const Message::Field& subId_,
size_t bookmarkSeqNo_)
1457 Subscription* pSub = find(subId_);
1458 bool retVal = pSub->discard(bookmarkSeqNo_);
1461 updateAdapter(pSub);
1468 bool usePublishersList_ =
true)
1470 Subscription* pSub = find(subId_);
1471 return pSub->getMostRecentList(usePublishersList_);
1475 bool _isDiscarded(
Message& message_)
1482 Subscription* pSub = find(subId);
1483 message_.setSubscriptionHandle(
1484 static_cast<amps_subscription_handle>(pSub));
1491 Subscription* pSub = find(subId_);
1492 return pSub->getOldestBookmarkSeq();
1496 virtual void _persisted(Subscription* pSub_,
1499 if (pSub_->lastPersisted(bookmark_))
1501 updateAdapter(pSub_);
1506 virtual Message::Field _persisted(Subscription* pSub_,
size_t bookmark_)
1508 return pSub_->lastPersisted(bookmark_);
1514 if (_recoveryPointAdapter.isValid())
1516 _recoveryPointAdapter.purge();
1525 while (!_subs.empty())
1527 SubscriptionMap::iterator iter = _subs.begin();
1531 delete (iter->second);
1540 if (_recoveryPointAdapter.isValid())
1542 _recoveryPointAdapter.purge(subId_);
1550 Lock<Mutex> guard(_subsLock);
1551 SubscriptionMap::iterator iter = _subs.find(subId_);
1552 if (iter == _subs.end())
1557 delete (iter->second);
1565 find(subId_)->setMostRecent(recent_);
1570 static const char ENTRY_BOOKMARK =
'b';
1571 static const char ENTRY_DISCARD =
'd';
1572 static const char ENTRY_PERSISTED =
'p';
1578 throw StoreException(
"A valid subscription ID must be provided to the Bookmark Store");
1580 Lock<Mutex> guard(_subsLock);
1581 if (_subs.count(subId_) == 0)
1586 _subs[id] =
new Subscription(
this,
id);
1589 return _subs[subId_];
1592 virtual bool resize(
const Message::Field& subId_,
char** newBuffer_,
size_t size_,
1593 bool callResizeHandler_ =
true)
1595 assert(newBuffer_ != 0);
1605 if (callResizeHandler_ && !callResizeHandler(subId_, size_))
1609 char* oldBuffer = *newBuffer_ ? *newBuffer_ : NULL;
1610 *newBuffer_ = (
char*)malloc(size_);
1611 memset(*newBuffer_, 0, size_);
1614 find(subId_)->moveEntries(oldBuffer, *newBuffer_, size_);
1621 void updateAdapter(Subscription* pSub_)
1623 if (_recovering || !_recentChanged || !_recoveryPointAdapter.isValid())
1630 pSub_->getMostRecentList(
false)).deepCopy();
1631 Unlock<Mutex> unlock(_lock);
1632 _recoveryPointAdapter.update(update);
1637 typedef std::map<Message::Field, Subscription*, Message::Field::FieldHash> SubscriptionMap;
1638 SubscriptionMap _subs;
1639 size_t _serverVersion;
1640 bool _recentChanged;
1642 typedef std::set<Subscription*> SubscriptionSet;
1649 #endif //_MEMORYBOOKMARKSTORE_H_ Defines the AMPS::Message class and related classes.
Abstract base class for storing received bookmarks for HA clients.
Definition: BookmarkStore.hpp:77
void clear()
Clear the internal state, possibly reclaiming memory.
Definition: RecoveryPoint.hpp:109
Field getSubscriptionId() const
Retrieves the value of the SubscriptionId header of the Message as a new Field.
Definition: Message.hpp:1427
virtual void purge()
Called to purge the contents of this store.
Definition: MemoryBookmarkStore.hpp:1331
virtual void discard(const Message::Field &subId_, size_t bookmarkSeqNo_)
Log a discard-bookmark entry to the persistent log based on a bookmark sequence number.
Definition: MemoryBookmarkStore.hpp:1295
virtual Message::Field getMostRecent(const Message::Field &subId_)
Returns the most recent bookmark from the log that ought to be used for (re-)subscriptions.
Definition: MemoryBookmarkStore.hpp:1306
virtual Message::Field persisted(const Message::Field &subId_, size_t bookmark_)
Mark the bookmark provided as replicated to all sync replication destinations for the given subscript...
Definition: MemoryBookmarkStore.hpp:1376
Message encapsulates a single message sent to or received from an AMPS server, and provides methods f...
Definition: Message.hpp:531
void clear()
Deletes the data associated with this Field; should only be used on Fields that were created as deepC...
Definition: Field.hpp:247
MemoryBookmarkStore(const RecoveryPointAdapter &adapter_, RecoveryPointFactory factory_=NULL)
Creates a MemoryBookmarkStore.
Definition: MemoryBookmarkStore.hpp:1204
RecoveryPointAdapter is a handle class for implementing external storage of subscription recovery points...
Definition: RecoveryPointAdapter.hpp:77
MemoryBookmarkStore()
Creates a MemoryBookmarkStore.
Definition: MemoryBookmarkStore.hpp:1186
Provides access to the subId and bookmark needed to restart a subscription.
Definition: RecoveryPoint.hpp:67
virtual void persisted(const Message::Field &subId_, const Message::Field &bookmark_)
Mark the bookmark provided as replicated to all sync replication destinations for the given subscript...
Definition: MemoryBookmarkStore.hpp:1363
Message & assignBookmark(const std::string &v)
Sets the value of the Bookmark header for this Message.
Definition: Message.hpp:1194
void setServerVersion(size_t version_)
Internally used to set the server version so the store knows how to deal with persisted acks and call...
Definition: MemoryBookmarkStore.hpp:1396
Field getSubscriptionIds() const
Retrieves the value of the SubscriptionIds header of the Message as a new Field.
Definition: Message.hpp:1428
bool empty() const
Returns 'true' if empty, 'false' otherwise.
Definition: Field.hpp:128
Defines the AMPS::Field class, which represents the value of a field in a message.
#define AMPS_BOOKMARK_EPOCH
Start the subscription at the beginning of the journal.
Definition: BookmarkStore.hpp:51
virtual bool isDiscarded(Message &message_)
Called for each arriving message to determine if the application has already seen this bookmark and s...
Definition: MemoryBookmarkStore.hpp:1320
Provides AMPS::RecoveryPointAdapter, an interface for implementing external storage of bookmark subscr...
void setServerVersion(const VersionInfo &version_)
Internally used to set the server version so the store knows how to deal with persisted acks and call...
Definition: MemoryBookmarkStore.hpp:1387
virtual void discard(const Message &message_)
Log a discard-bookmark entry to the persistent log based on a bookmark sequence number.
Definition: MemoryBookmarkStore.hpp:1282
#define AMPS_BOOKMARK_NOW
Start the subscription at the point in time when AMPS processes the subscription. ...
Definition: BookmarkStore.hpp:55
Message & setSubId(const std::string &v)
Sets the value of the SubscriptionId header for this Message.
Definition: Message.hpp:1427
RecoveryPoint(* RecoveryPointFactory)(const Field &subId_, const Field &bookmark_)
RecoveryPointFactory is a function type for producing a RecoveryPoint that is sent to a RecoveryPoint...
Definition: RecoveryPoint.hpp:126
Provides AMPS::RecoveryPoint, AMPS::RecoveryPointFactory, AMPS::FixedRecoveryPoint, and AMPS::DynamicRecoveryPoint.
A BookmarkStoreImpl implementation that stores bookmarks in memory.
Definition: MemoryBookmarkStore.hpp:57
virtual size_t log(Message &message_)
Log a bookmark to the persistent log and return the corresponding sequence number for this bookmark...
Definition: MemoryBookmarkStore.hpp:1271
Field represents the value of a single field in a Message.
Definition: Field.hpp:86
static RecoveryPoint create(const Field &subId_, const Field &bookmark_)
Use this function in BookmarkStore::setRecoveryPointFactory( std::bind(&FixedRecoveryPoint::create, std::placeholders::_1, std::placeholders::_2))
Definition: RecoveryPoint.hpp:139
virtual size_t getOldestBookmarkSeq(const Message::Field &subId_)
Called to find the oldest bookmark in the store.
Definition: MemoryBookmarkStore.hpp:1352
virtual void purge(const Message::Field &subId_)
Called to purge the contents of this store for a particular subId.
Definition: MemoryBookmarkStore.hpp:1342
Message & setBookmark(const std::string &v)
Sets the value of the Bookmark header for this Message.
Definition: Message.hpp:1194
void deepCopy(const Field &orig_)
Makes self a deep copy of the original field.
Definition: Field.hpp:218
Definition: ampsplusplus.hpp:103
Field getBookmark() const
Retrieves the value of the Bookmark header of the Message as a new Field.
Definition: Message.hpp:1194