26 #ifndef _MEMORYBOOKMARKSTORE_H_ 27 #define _MEMORYBOOKMARKSTORE_H_ 29 #include <amps/BookmarkStore.hpp> 40 #define AMPS_MIN_BOOKMARK_LEN 3 41 #define AMPS_INITIAL_MEMORY_BOOKMARK_SIZE 16384UL 62 typedef std::map<Message::Field, size_t, Message::Field::FieldHash> RecoveryMap;
63 typedef std::map<amps_uint64_t, amps_uint64_t> PublisherMap;
64 typedef std::map<Message::Field, size_t, Message::Field::FieldHash>::iterator
66 typedef std::map<amps_uint64_t, amps_uint64_t>::iterator PublisherIterator;
72 : _current(1), _currentBase(0), _least(1), _leastBase(0)
73 , _recoveryMin(AMPS_UNSET_INDEX), _recoveryBase(AMPS_UNSET_INDEX)
74 , _recoveryMax(AMPS_UNSET_INDEX), _recoveryMaxBase(AMPS_UNSET_INDEX)
75 , _entriesLength(AMPS_INITIAL_MEMORY_BOOKMARK_SIZE), _entries(NULL)
80 _store->resize(_id, (
char**)&_entries,
81 sizeof(Entry)*AMPS_INITIAL_MEMORY_BOOKMARK_SIZE,
false);
82 setLastPersistedToEpoch();
87 Lock<Mutex> guard(_subLock);
90 for (
size_t i = 0; i < _entriesLength; ++i)
92 _entries[i]._val.clear();
95 _store->resize(_id, (
char**)&_entries, 0);
99 _lastPersisted.clear();
102 _recoveryTimestamp.clear();
111 Lock<Mutex> guard(_subLock);
113 size_t index = recover(bookmark_,
true);
114 if (index == AMPS_UNSET_INDEX)
117 if (_current >= _entriesLength)
120 _currentBase += _entriesLength;
124 if ((_current == _least && _leastBase < _currentBase) ||
125 (_current == _recoveryMin && _recoveryBase < _currentBase))
127 if (!_store->resize(_id, (
char**)&_entries,
128 sizeof(Entry) * _entriesLength * 2))
131 return log(bookmark_);
164 if (!BookmarkRange::isRange(bookmark_))
166 _entries[_current]._val.deepCopy(bookmark_);
171 _range.set(bookmark_);
173 if (!_range.isValid())
175 throw CommandException(
"Invalid bookmark range specified.");
177 _store->updateAdapter(
this);
178 if (!_range.isStartInclusive())
181 amps_uint64_t publisher, sequence;
182 std::vector<Field> bmList = Field::parseBookmarkList(_range.getStart());
183 for (std::vector<Field>::iterator bkmk = bmList.begin(); bkmk != bmList.end(); ++bkmk)
185 parseBookmark(*bkmk, publisher, sequence);
186 if (publisher != (amps_uint64_t)0)
189 PublisherIterator pub = _publishers.find(publisher);
190 if (pub == _publishers.end() || pub->second < sequence)
192 _publishers[publisher] = sequence;
201 _entries[_current]._active =
true;
204 return index + _currentBase;
209 Lock<Mutex> guard(_subLock);
210 return _discard(index_);
220 Lock<Mutex> guard(_subLock);
221 size_t search = _least;
222 size_t searchBase = _leastBase;
223 size_t searchMax = _current;
224 size_t searchMaxBase = _currentBase;
225 if (_least + _leastBase == _current + _currentBase)
227 if (_recoveryMin != AMPS_UNSET_INDEX)
229 search = _recoveryMin;
230 searchBase = _recoveryBase;
231 searchMax = _recoveryMax;
232 searchMaxBase = _recoveryMaxBase;
239 assert(searchMax != AMPS_UNSET_INDEX);
240 assert(searchMaxBase != AMPS_UNSET_INDEX);
241 assert(search != AMPS_UNSET_INDEX);
242 assert(searchBase != AMPS_UNSET_INDEX);
244 while (search + searchBase < searchMax + searchMaxBase)
246 if (_entries[search]._val == bookmark_)
248 return _discard(search + searchBase);
250 if (++search == _entriesLength)
253 searchBase += _entriesLength;
262 amps_uint64_t& publisherId_,
263 amps_uint64_t& sequenceNumber_)
265 Message::Field::parseBookmark(field_, publisherId_, sequenceNumber_);
272 Lock<Mutex> guard(_subLock);
273 if (BookmarkRange::isRange(bookmark_))
278 size_t recoveredIdx = recover(bookmark_,
false);
280 amps_uint64_t publisher, sequence;
281 parseBookmark(bookmark_, publisher, sequence);
283 PublisherIterator pub = _publishers.find(publisher);
284 if (pub == _publishers.end() || pub->second < sequence)
286 _publishers[publisher] = sequence;
287 if (recoveredIdx == AMPS_UNSET_INDEX)
292 if (recoveredIdx != AMPS_UNSET_INDEX)
294 if (!_entries[recoveredIdx]._active)
296 _recovered.erase(bookmark_);
304 if (_store->_recovering)
312 size_t base = _leastBase;
313 for (
size_t i = _least; i + base < _current + _currentBase; i++)
315 if ( i >= _entriesLength )
320 if (_entries[i]._val == bookmark_)
322 return !_entries[i]._active;
329 bool empty(
void)
const 331 if (_least == AMPS_UNSET_INDEX ||
332 ((_least + _leastBase) == (_current + _currentBase) &&
333 _recoveryMin == AMPS_UNSET_INDEX))
340 void updateMostRecent()
342 Lock<Mutex> guard(_subLock);
346 const BookmarkRange& getRange()
const 353 Lock<Mutex> guard(_subLock);
354 bool useLastPersisted = !_lastPersisted.empty() &&
355 _lastPersisted.len() > 1;
360 bool useRecent = !_recent.empty() && _recent.len() > 1;
361 amps_uint64_t lastPublisher = 0;
362 amps_uint64_t lastSeq = 0;
363 amps_uint64_t recentPublisher = 0;
364 amps_uint64_t recentSeq = 0;
365 if (useLastPersisted)
367 parseBookmark(_lastPersisted, lastPublisher, lastSeq);
371 parseBookmark(_recent, recentPublisher, recentSeq);
372 if (empty() && useLastPersisted)
378 if (useLastPersisted && lastPublisher == recentPublisher)
380 if (lastSeq <= recentSeq)
386 useLastPersisted =
false;
392 size_t totalLen = (useLastPersisted ? _lastPersisted.len() + 1 : 0);
395 totalLen += _recent.len() + 1;
400 if (usePublishersList_
401 && ((!useLastPersisted && !useRecent)
404 std::ostringstream os;
405 for (PublisherIterator pub = _publishers.begin();
406 pub != _publishers.end(); ++pub)
408 if (pub->first == 0 && pub->second == 0)
412 if (pub->first == recentPublisher && recentSeq < pub->second)
414 os << recentPublisher <<
'|' << recentSeq <<
"|,";
418 os << pub->first <<
'|' << pub->second <<
"|,";
421 std::string recent = os.str();
422 totalLen = recent.length();
425 if (!_recoveryTimestamp.empty())
427 totalLen += _recoveryTimestamp.len();
428 recent += std::string(_recoveryTimestamp);
433 recent.erase(--totalLen);
438 if (_range.isValid())
440 if (_range.getStart() != recent
443 _range.replaceStart(_recentList,
true);
445 else if (_range.isStartInclusive())
447 amps_uint64_t publisher, sequence;
448 parseBookmark(_range.getStart(), publisher,
450 PublisherIterator pub = _publishers.find(publisher);
451 if (pub != _publishers.end()
452 && pub->second >= sequence)
454 _range.makeStartExclusive();
461 if (_range.isValid())
466 if (!_recoveryTimestamp.empty() && !_range.isValid())
468 totalLen += _recoveryTimestamp.len() + 1;
472 || (_recent.len() < 2 && !empty()))
474 if (_range.isValid())
482 _setLastPersistedToEpoch();
483 return _lastPersisted;
487 char* field =
new char[totalLen];
492 memcpy(field, _recent.data(), len);
498 if (useLastPersisted)
500 memcpy(field + len, _lastPersisted.data(), _lastPersisted.len());
501 len += _lastPersisted.len();
507 if (!_recoveryTimestamp.empty() && !_range.isValid())
509 memcpy(field + len, _recoveryTimestamp.data(),
510 _recoveryTimestamp.len());
517 _recentList.assign(field, totalLen);
518 if (_range.isValid())
522 if (_range.getStart() != _recentList)
524 _range.replaceStart(_recentList,
true);
526 else if (_range.isStartInclusive())
528 amps_uint64_t publisher, sequence;
529 parseBookmark(_range.getStart(), publisher,
531 PublisherIterator pub = _publishers.find(publisher);
532 if (pub != _publishers.end()
533 && pub->second >= sequence)
535 _range.makeStartExclusive();
546 Lock<Mutex> guard(_subLock);
549 if (update_ && _store->_recentChanged)
565 Lock<Mutex> guard(_subLock);
566 return _lastPersisted;
572 _recent.deepCopy(recent_);
575 void setRecoveryTimestamp(
const char* recoveryTimestamp_,
578 _recoveryTimestamp.clear();
579 size_t len = (len_ == 0) ? AMPS_TIMESTAMP_LEN : len_;
580 char* ts =
new char[len];
581 memcpy((
void*)ts, (
const void*)recoveryTimestamp_, len);
582 _recoveryTimestamp.assign(ts, len);
585 void moveEntries(
char* old_,
char* new_,
size_t newSize_)
587 size_t least = _least;
588 size_t leastBase = _leastBase;
589 if (_recoveryMin != AMPS_UNSET_INDEX)
591 least = _recoveryMin;
592 leastBase = _recoveryBase;
597 if (newSize_ - (
sizeof(Entry)*_entriesLength) >
sizeof(Entry)*least)
599 memcpy(new_ + (
sizeof(Entry)*_entriesLength),
600 old_, (
sizeof(Entry)*least));
602 memset(old_, 0,
sizeof(Entry)*least);
606 Entry* buffer =
new Entry[least];
607 memcpy((
void*)buffer, (
void*)old_,
sizeof(Entry)*least);
609 memcpy((
void*)new_, (
void*)((
char*)old_ + (
sizeof(Entry)*least)),
610 (_entriesLength - least)*
sizeof(Entry));
612 memcpy((
void*)((
char*)new_ + ((_entriesLength - least)*
sizeof(Entry))),
613 (
void*)buffer, least *
sizeof(Entry));
623 memcpy((
void*)new_, (
void*)((
char*)old_ + (
sizeof(Entry)*least)),
624 (_entriesLength - least)*
sizeof(Entry));
626 memcpy((
void*)((
char*)new_ + ((_entriesLength - least)*
sizeof(Entry))),
627 (
void*)old_, least *
sizeof(Entry));
632 if (_recoveryMin != AMPS_UNSET_INDEX)
634 _least = least + (_least + _leastBase) - (_recoveryMin + _recoveryBase);
635 _recoveryMax = least + (_recoveryMax + _recoveryMaxBase) -
636 (_recoveryMin + _recoveryBase);
637 _recoveryMaxBase = leastBase;
638 _recoveryMin = least;
639 _recoveryBase = leastBase;
645 _leastBase = leastBase;
647 _currentBase = _leastBase;
648 _current = least + _entriesLength;
653 Lock<Mutex> guard(_subLock);
655 return ((_least + _leastBase) == (_current + _currentBase)) ? AMPS_UNSET_INDEX :
663 || BookmarkRange::isRange(bookmark_))
667 Lock<Mutex> guard(_subLock);
668 return _setLastPersisted(bookmark_);
673 if (!_lastPersisted.empty())
675 amps_uint64_t publisher, publisher_lastPersisted;
676 amps_uint64_t sequence, sequence_lastPersisted;
677 parseBookmark(bookmark_, publisher, sequence);
678 parseBookmark(_lastPersisted, publisher_lastPersisted,
679 sequence_lastPersisted);
680 if (publisher == publisher_lastPersisted &&
681 sequence <= sequence_lastPersisted)
687 _lastPersisted.deepCopy(bookmark_);
688 _store->_recentChanged =
true;
689 _recoveryTimestamp.clear();
695 Lock<Mutex> guard(_subLock);
699 || BookmarkRange::isRange(bookmark))
703 _setLastPersisted(bookmark);
711 size_t recover(
const Message::Field& bookmark_,
bool relogIfNotDiscarded)
713 size_t retVal = AMPS_UNSET_INDEX;
714 if (_recovered.empty() || _recoveryBase == AMPS_UNSET_INDEX)
720 RecoveryIterator item = _recovered.find(bookmark_);
721 if (item != _recovered.end())
723 size_t seqNo = item->second;
724 size_t index = (seqNo - _recoveryBase) % _entriesLength;
727 if (_least + _leastBase == _current + _currentBase &&
728 !_entries[index]._active)
730 _store->_recentChanged =
true;
732 _recent = _entries[index]._val.deepCopy();
733 retVal = moveEntry(index);
734 if (retVal == AMPS_UNSET_INDEX)
736 recover(bookmark_, relogIfNotDiscarded);
739 _leastBase = _currentBase;
741 else if (!_entries[index]._active || relogIfNotDiscarded)
743 retVal = moveEntry(index);
744 if (retVal == AMPS_UNSET_INDEX)
746 recover(bookmark_, relogIfNotDiscarded);
753 _recovered.erase(item);
754 if (_recovered.empty())
756 _recoveryMin = AMPS_UNSET_INDEX;
757 _recoveryBase = AMPS_UNSET_INDEX;
758 _recoveryMax = AMPS_UNSET_INDEX;
759 _recoveryMaxBase = AMPS_UNSET_INDEX;
761 else if (index == _recoveryMin)
763 while (_entries[_recoveryMin]._val.empty() &&
764 (_recoveryMin + _recoveryBase) < (_recoveryMax + _recoveryMaxBase))
766 if (++_recoveryMin == _entriesLength)
769 _recoveryBase += _entriesLength;
789 Entry() : _active(
false)
797 Field::FieldHash _hasher;
799 size_t operator()(
const Entry* entryPtr_)
const 801 return _hasher(entryPtr_->_val);
804 bool operator()(
const Entry* lhsPtr_,
const Entry* rhsPtr_)
const 806 return _hasher(lhsPtr_->_val, rhsPtr_->_val);
811 typedef std::vector<Entry*> EntryPtrList;
813 void getRecoveryEntries(EntryPtrList& list_)
815 if (_recoveryMin == AMPS_UNSET_INDEX ||
816 _recoveryMax == AMPS_UNSET_INDEX)
820 size_t base = _recoveryBase;
821 size_t max = _recoveryMax + _recoveryMaxBase;
822 for (
size_t i = _recoveryMin; i + base < max; ++i)
824 if (i == _entriesLength)
827 base = _recoveryMaxBase;
830 list_.push_back(&(_entries[i]));
835 void getActiveEntries(EntryPtrList& list_)
837 size_t base = _leastBase;
838 for (
size_t i = _least; i + base < _current + _currentBase; ++i)
840 if (i >= _entriesLength)
846 list_.push_back(&(_entries[i]));
851 Entry* getEntryByIndex(
size_t index_)
853 Lock<Mutex> guard(_subLock);
854 size_t base = (_recoveryBase == AMPS_UNSET_INDEX ||
855 index_ >= _least + _leastBase)
856 ? _leastBase : _recoveryBase;
858 size_t min = (_recoveryMin == AMPS_UNSET_INDEX ?
859 _least + _leastBase :
860 _recoveryMin + _recoveryBase);
861 if (index_ >= _current + _currentBase || index_ < min)
865 return &(_entries[(index_ - base) % _entriesLength]);
870 Lock<Mutex> guard(_subLock);
873 getRecoveryEntries(list);
874 setPublishersToDiscarded(&list, &_publishers);
877 void setPublishersToDiscarded(EntryPtrList* recovered_,
878 PublisherMap* publishers_)
884 for (EntryPtrList::iterator i = recovered_->begin();
885 i != recovered_->end(); ++i)
887 if ((*i)->_val.empty())
891 amps_uint64_t publisher = (amps_uint64_t)0;
892 amps_uint64_t sequence = (amps_uint64_t)0;
893 parseBookmark((*i)->_val, publisher, sequence);
894 if (publisher && sequence && (*i)->_active &&
895 (*publishers_)[publisher] >= sequence)
897 (*publishers_)[publisher] = sequence - 1;
902 void clearLastPersisted()
904 Lock<Mutex> guard(_subLock);
905 _lastPersisted.clear();
908 void setLastPersistedToEpoch()
910 Lock<Mutex> guard(_subLock);
911 _setLastPersistedToEpoch();
915 Subscription(
const Subscription&);
916 Subscription& operator=(
const Subscription&);
918 size_t moveEntry(
size_t index_)
921 if (_current >= _entriesLength)
924 _currentBase += _entriesLength;
928 if ((_current == _least % _entriesLength &&
929 _leastBase < _currentBase) ||
930 (_current == _recoveryMin && _recoveryBase < _currentBase))
932 if (!_store->resize(_id, (
char**)&_entries,
933 sizeof(Entry) * _entriesLength * 2))
935 return AMPS_UNSET_INDEX;
940 _entries[_current]._val = _entries[index_]._val;
941 _entries[_current]._active = _entries[index_]._active;
943 _entries[index_]._val.assign(NULL, 0);
944 _entries[index_]._active =
false;
948 void _setLastPersistedToEpoch()
951 char* field =
new char[fieldLen];
953 _lastPersisted.clear();
954 _lastPersisted.assign(field, fieldLen);
957 bool _discard(
size_t index_)
961 assert((_recoveryBase == AMPS_UNSET_INDEX && _recoveryMin == AMPS_UNSET_INDEX) ||
962 (_recoveryBase != AMPS_UNSET_INDEX && _recoveryMin != AMPS_UNSET_INDEX));
963 size_t base = (_recoveryBase == AMPS_UNSET_INDEX
964 || index_ >= _least + _leastBase)
965 ? _leastBase : _recoveryBase;
967 size_t min = (_recoveryMin == AMPS_UNSET_INDEX ? _least + _leastBase :
968 _recoveryMin + _recoveryBase);
969 if (index_ >= _current + _currentBase || index_ < min)
976 Entry& e = _entries[(index_ - base) % _entriesLength];
979 size_t index = index_;
980 if (_recoveryMin != AMPS_UNSET_INDEX &&
981 index_ == _recoveryMin + _recoveryBase)
984 size_t j = _recoveryMin;
985 while (j + _recoveryBase < _recoveryMax + _recoveryMaxBase &&
986 !_entries[j]._active)
1007 if (!bookmark.
empty())
1009 _recovered.erase(bookmark);
1011 amps_uint64_t publisher, sequence;
1012 parseBookmark(bookmark, publisher, sequence);
1013 PublisherIterator pub = _publishers.find(publisher);
1014 if (pub == _publishers.end() || pub->second < sequence)
1016 _publishers[publisher] = sequence;
1018 if (_least + _leastBase == _current + _currentBase ||
1019 ((_least + _leastBase) % _entriesLength) ==
1020 ((_recoveryMin + _recoveryBase + 1)) % _entriesLength)
1024 _store->_recentChanged =
true;
1025 _recoveryTimestamp.clear();
1028 bookmark.assign(NULL, 0);
1038 if (++j == _entriesLength)
1041 _recoveryBase += _entriesLength;
1045 assert(j + _recoveryBase != _recoveryMax + _recoveryMaxBase ||
1046 _recovered.empty());
1047 if (_recovered.empty())
1049 _recoveryMin = AMPS_UNSET_INDEX;
1050 _recoveryBase = AMPS_UNSET_INDEX;
1051 _recoveryMax = AMPS_UNSET_INDEX;
1052 _recoveryMaxBase = AMPS_UNSET_INDEX;
1054 index = _least + _leastBase;
1063 if (index == _least + _leastBase)
1067 while (j + _leastBase < _current + _currentBase &&
1068 !_entries[j]._active)
1072 _recent = _entries[j]._val;
1073 _entries[j]._val.assign(NULL, 0);
1074 _store->_recentChanged =
true;
1076 _recoveryTimestamp.clear();
1079 if (++j == _entriesLength)
1082 _leastBase += _entriesLength;
1091 void _updateMostRecent()
1095 assert((_recoveryBase == AMPS_UNSET_INDEX && _recoveryMin == AMPS_UNSET_INDEX) ||
1096 (_recoveryBase != AMPS_UNSET_INDEX && _recoveryMin != AMPS_UNSET_INDEX));
1097 size_t base = (_recoveryMin == AMPS_UNSET_INDEX) ? _leastBase : _recoveryBase;
1098 size_t start = (_recoveryMin == AMPS_UNSET_INDEX) ? _least : _recoveryMin;
1099 _recoveryMin = AMPS_UNSET_INDEX;
1100 _recoveryBase = AMPS_UNSET_INDEX;
1101 _recoveryMax = AMPS_UNSET_INDEX;
1102 _recoveryMaxBase = AMPS_UNSET_INDEX;
1103 for (
size_t i = start; i + base < _current + _currentBase; i++)
1105 if ( i >= _entriesLength )
1108 base = _currentBase;
1110 if (i >= _recoveryMax + _recoveryBase && i < _least + _leastBase)
1114 Entry& entry = _entries[i];
1115 if (!entry._val.empty())
1117 _recovered[entry._val] = i + base;
1118 if (_recoveryMin == AMPS_UNSET_INDEX)
1121 _recoveryBase = base;
1122 _recoveryMax = _current;
1123 _recoveryMaxBase = _currentBase;
1127 if (_current == _entriesLength)
1130 _currentBase += _entriesLength;
1133 _leastBase = _currentBase;
1140 BookmarkRange _range;
1143 size_t _currentBase;
1146 size_t _recoveryMin;
1147 size_t _recoveryBase;
1148 size_t _recoveryMax;
1149 size_t _recoveryMaxBase;
1150 size_t _entriesLength;
1154 RecoveryMap _recovered;
1156 PublisherMap _publishers;
1165 _serverVersion(AMPS_DEFAULT_MIN_VERSION),
1166 _recentChanged(true),
1168 _recoveryPointAdapter(NULL),
1169 _recoveryPointFactory(NULL)
1172 typedef RecoveryPointAdapter::iterator RecoveryIterator;
1181 RecoveryPointFactory factory_ = NULL)
1185 , _serverVersion(AMPS_DEFAULT_MIN_VERSION)
1186 , _recentChanged(true)
1188 , _recoveryPointAdapter(adapter_)
1189 , _recoveryPointFactory(factory_)
1192 for (RecoveryIterator recoveryPoint = _recoveryPointAdapter.begin();
1193 recoveryPoint != _recoveryPointAdapter.end();
1196 Field subId(recoveryPoint->getSubId());
1197 msg.setSubscriptionHandle(static_cast<amps_subscription_handle>(0));
1199 Field bookmark = recoveryPoint->getBookmark();
1200 if (BookmarkRange::isRange(bookmark))
1207 std::vector<Field> bmList = Field::parseBookmarkList(bookmark);
1208 for (std::vector<Field>::iterator bkmk = bmList.begin(); bkmk != bmList.end(); ++bkmk)
1210 if (Field::isTimestamp(*bkmk))
1212 find(subId)->setRecoveryTimestamp(bkmk->data(), bkmk->len());
1226 _recovering =
false;
1241 Lock<Mutex> guard(_lock);
1242 return _log(message_);
1252 Lock<Mutex> guard(_lock);
1253 (void)_discard(message_);
1265 Lock<Mutex> guard(_lock);
1266 (void)_discard(subId_, bookmarkSeqNo_);
1276 Lock<Mutex> guard(_lock);
1277 return _getMostRecent(subId_);
1290 Lock<Mutex> guard(_lock);
1291 return _isDiscarded(message_);
1301 Lock<Mutex> guard(_lock);
1312 Lock<Mutex> guard(_lock);
1322 Lock<Mutex> guard(_lock);
1323 return _getOldestBookmarkSeq(subId_);
1334 Lock<Mutex> guard(_lock);
1335 _persisted(find(subId_), bookmark_);
1347 Lock<Mutex> guard(_lock);
1348 return _persisted(find(subId_), bookmark_);
1366 Lock<Mutex> guard(_subsLock);
1367 _serverVersion = version_;
1370 inline bool isWritableBookmark(
size_t length)
1372 return length >= AMPS_MIN_BOOKMARK_LEN;
1375 typedef Subscription::EntryPtrList EntryPtrList;
1380 size_t _log(
Message& message_)
1383 Subscription* pSub = (Subscription*)(message_.getSubscriptionHandle());
1392 message_.setSubscriptionHandle(
1393 static_cast<amps_subscription_handle>(pSub));
1395 size_t retVal = pSub->log(bookmark);
1396 message_.setBookmarkSeqNo(retVal);
1401 bool _discard(
const Message& message_)
1403 size_t bookmarkSeqNo = message_.getBookmarkSeqNo();
1404 Subscription* pSub = (Subscription*)(message_.getSubscriptionHandle());
1414 bool retVal = pSub->discard(bookmarkSeqNo);
1417 updateAdapter(pSub);
1423 bool _discard(
const Message::Field& subId_,
size_t bookmarkSeqNo_)
1425 Subscription* pSub = find(subId_);
1426 bool retVal = pSub->discard(bookmarkSeqNo_);
1429 updateAdapter(pSub);
1436 bool usePublishersList_ =
true)
1438 Subscription* pSub = find(subId_);
1439 return pSub->getMostRecentList(usePublishersList_);
1443 bool _isDiscarded(
Message& message_)
1450 Subscription* pSub = find(subId);
1451 message_.setSubscriptionHandle(
1452 static_cast<amps_subscription_handle>(pSub));
1459 Subscription* pSub = find(subId_);
1460 return pSub->getOldestBookmarkSeq();
1464 virtual void _persisted(Subscription* pSub_,
1467 if (pSub_->lastPersisted(bookmark_))
1469 updateAdapter(pSub_);
1474 virtual Message::Field _persisted(Subscription* pSub_,
size_t bookmark_)
1476 return pSub_->lastPersisted(bookmark_);
1482 if (_recoveryPointAdapter.isValid())
1484 _recoveryPointAdapter.purge();
1493 while (!_subs.empty())
1495 SubscriptionMap::iterator iter = _subs.begin();
1499 delete (iter->second);
1508 if (_recoveryPointAdapter.isValid())
1510 _recoveryPointAdapter.purge(subId_);
1518 Lock<Mutex> guard(_subsLock);
1519 SubscriptionMap::iterator iter = _subs.find(subId_);
1520 if (iter == _subs.end())
1525 delete (iter->second);
1533 find(subId_)->setMostRecent(recent_);
1538 static const char ENTRY_BOOKMARK =
'b';
1539 static const char ENTRY_DISCARD =
'd';
1540 static const char ENTRY_PERSISTED =
'p';
1546 throw StoreException(
"A valid subscription ID must be provided to the Bookmark Store");
1548 Lock<Mutex> guard(_subsLock);
1549 if (_subs.count(subId_) == 0)
1554 _subs[id] =
new Subscription(
this,
id);
1557 return _subs[subId_];
1560 virtual bool resize(
const Message::Field& subId_,
char** newBuffer_,
size_t size_,
1561 bool callResizeHandler_ =
true)
1563 assert(newBuffer_ != 0);
1573 if (callResizeHandler_ && !callResizeHandler(subId_, size_))
1577 char* oldBuffer = *newBuffer_ ? *newBuffer_ : NULL;
1578 *newBuffer_ = (
char*)malloc(size_);
1579 memset(*newBuffer_, 0, size_);
1582 find(subId_)->moveEntries(oldBuffer, *newBuffer_, size_);
1589 void updateAdapter(Subscription* pSub_)
1591 if (_recovering || !_recentChanged || !_recoveryPointAdapter.isValid())
1595 if (_recoveryPointFactory)
1598 pSub_->getMostRecentList(
false)));
1599 _recoveryPointAdapter.update(update);
1604 pSub_->getMostRecentList(
false)));
1605 _recoveryPointAdapter.update(update);
1609 typedef std::map<Message::Field, Subscription*, Message::Field::FieldHash> SubscriptionMap;
1610 SubscriptionMap _subs;
1611 size_t _serverVersion;
1612 bool _recentChanged;
1614 typedef std::set<Subscription*> SubscriptionSet;
1621 #endif //_MEMORYBOOKMARKSTORE_H_ Defines the AMPS::Message class and related classes.
Abstract base class for storing received bookmarks for HA clients.
Definition: BookmarkStore.hpp:77
Field getSubscriptionId() const
Retrieves the value of the SubscriptionId header of the Message as a new Field.
Definition: Message.hpp:1427
virtual void purge()
Called to purge the contents of this store.
Definition: MemoryBookmarkStore.hpp:1299
virtual void discard(const Message::Field &subId_, size_t bookmarkSeqNo_)
Log a discard-bookmark entry to the persistent log based on a bookmark sequence number.
Definition: MemoryBookmarkStore.hpp:1263
virtual Message::Field getMostRecent(const Message::Field &subId_)
Returns the most recent bookmark from the log that ought to be used for (re-)subscriptions.
Definition: MemoryBookmarkStore.hpp:1274
virtual Message::Field persisted(const Message::Field &subId_, size_t bookmark_)
Mark the bookmark provided as replicated to all sync replication destinations for the given subscript...
Definition: MemoryBookmarkStore.hpp:1344
Message encapsulates a single message sent to or received from an AMPS server, and provides methods f...
Definition: Message.hpp:531
void clear()
Deletes the data associated with this Field, should only be used on Fields that were created as deepC...
Definition: Field.hpp:247
MemoryBookmarkStore(const RecoveryPointAdapter &adapter_, RecoveryPointFactory factory_=NULL)
Creates a MemoryBookmarkStore.
Definition: MemoryBookmarkStore.hpp:1180
RecoveryPointAdapter a handle class for implementing external storage of subscription recovery points...
Definition: RecoveryPointAdapter.hpp:77
MemoryBookmarkStore()
Creates a MemoryBookmarkStore.
Definition: MemoryBookmarkStore.hpp:1162
Provides access to the subId and bookmark needed to restart a subscription.
Definition: RecoveryPoint.hpp:67
virtual void persisted(const Message::Field &subId_, const Message::Field &bookmark_)
Mark the bookmark provided as replicated to all sync replication destinations for the given subscript...
Definition: MemoryBookmarkStore.hpp:1331
Message & assignBookmark(const std::string &v)
Sets the value of the Bookmark header for this Message.
Definition: Message.hpp:1194
void setServerVersion(size_t version_)
Internally used to set the server version so the store knows how to deal with persisted acks and call...
Definition: MemoryBookmarkStore.hpp:1364
Field getSubscriptionIds() const
Retrieves the value of the SubscriptionIds header of the Message as a new Field.
Definition: Message.hpp:1428
bool empty() const
Returns 'true' if empty, 'false' otherwise.
Definition: Field.hpp:128
Defines the AMPS::Field class, which represents the value of a field in a message.
#define AMPS_BOOKMARK_EPOCH
Start the subscription at the beginning of the journal.
Definition: BookmarkStore.hpp:51
virtual bool isDiscarded(Message &message_)
Called for each arriving message to determine if the application has already seen this bookmark and s...
Definition: MemoryBookmarkStore.hpp:1288
Provides AMPS::RecoveryPointAdapter, an iterface for implementing external storage of bookmark subscr...
void setServerVersion(const VersionInfo &version_)
Internally used to set the server version so the store knows how to deal with persisted acks and call...
Definition: MemoryBookmarkStore.hpp:1355
virtual void discard(const Message &message_)
Log a discard-bookmark entry to the persistent log based on a bookmark sequence number.
Definition: MemoryBookmarkStore.hpp:1250
#define AMPS_BOOKMARK_NOW
Start the subscription at the point in time when AMPS processes the subscription. ...
Definition: BookmarkStore.hpp:55
Message & setSubId(const std::string &v)
Sets the value of the SubscriptionId header for this Message.
Definition: Message.hpp:1427
RecoveryPoint(* RecoveryPointFactory)(const Field &subId_, const Field &bookmark_)
RecoveryPointFactory is a function type for producing a RecoveryPoint that is sent to a RecoveryPoint...
Definition: RecoveryPoint.hpp:126
Provides AMPS::RecoveryPoint, AMPS::RecoveryPointFactory, AMPS::FixedRecoveryPoint, and AMPS::DynamicRecoveryPoint.
A BookmarkStoreImpl implementation that stores bookmarks in memory.
Definition: MemoryBookmarkStore.hpp:56
virtual size_t log(Message &message_)
Log a bookmark to the persistent log and return the corresponding sequence number for this bookmark...
Definition: MemoryBookmarkStore.hpp:1239
Field represents the value of a single field in a Message.
Definition: Field.hpp:86
FixedRecoveryPoint is a RecoveryPoint implementation where subId and bookmark are set explicitly...
Definition: RecoveryPoint.hpp:133
virtual size_t getOldestBookmarkSeq(const Message::Field &subId_)
Called to find the oldest bookmark in the store.
Definition: MemoryBookmarkStore.hpp:1320
virtual void purge(const Message::Field &subId_)
Called to purge the contents of this store for particular subId.
Definition: MemoryBookmarkStore.hpp:1310
Message & setBookmark(const std::string &v)
Sets the value of the Bookmark header for this Message.
Definition: Message.hpp:1194
void deepCopy(const Field &orig_)
Makes self a deep copy of the original field.
Definition: Field.hpp:218
Definition: ampsplusplus.hpp:106
Field getBookmark() const
Retrieves the value of the Bookmark header of the Message as a new Field.
Definition: Message.hpp:1194