26 #ifndef _MMAPBOOKMARKSTORE_H_ 27 #define _MMAPBOOKMARKSTORE_H_ 38 #include <sys/types.h> 43 #define AMPS_INITIAL_LOG_SIZE 40960UL 60 typedef HANDLE FileType;
65 static void _clearBookmark(std::pair<const Message::Field, size_t>& pair)
84 , _logOffset(0), _log(0), _fileTimestamp(0)
86 , _file(INVALID_HANDLE_VALUE), _mapFile(INVALID_HANDLE_VALUE)
91 if (init(useLastModifiedTime_))
93 recover(useLastModifiedTime_,
false);
108 bool useLastModifiedTime_ =
false)
110 , _logOffset(0), _log(0), _fileTimestamp(0)
112 , _file(INVALID_HANDLE_VALUE), _mapFile(INVALID_HANDLE_VALUE)
117 if (init(useLastModifiedTime_))
119 recover(useLastModifiedTime_,
false);
138 const char* fileName_,
139 RecoveryPointFactory factory_ = NULL,
140 bool useLastModifiedTime_ =
false)
142 , _fileName(fileName_), _fileSize(0)
143 , _logOffset(0), _log(0), _fileTimestamp(0)
145 , _file(INVALID_HANDLE_VALUE), _mapFile(INVALID_HANDLE_VALUE)
150 if (init(useLastModifiedTime_))
152 recover(useLastModifiedTime_,
true);
171 const std::string& fileName_,
172 RecoveryPointFactory factory_ = NULL,
173 bool useLastModifiedTime_ =
false)
175 , _fileName(fileName_), _fileSize(0)
176 , _logOffset(0), _log(0), _fileTimestamp(0)
178 , _file(INVALID_HANDLE_VALUE), _mapFile(INVALID_HANDLE_VALUE)
183 if (init(useLastModifiedTime_))
185 recover(useLastModifiedTime_,
true);
192 UnmapViewOfFile(_log);
193 CloseHandle(_mapFile);
196 munmap(_log, _fileSize);
212 Subscription* sub = (Subscription*)(message_.getSubscriptionHandle());
213 Lock<Mutex> guard(_lock);
222 message_.setSubscriptionHandle(static_cast<amps_subscription_handle>(sub));
224 write(sub->id(), ENTRY_BOOKMARK, bookmark);
225 return MemoryBookmarkStore::_log(message_);
240 Lock<Mutex> guard(_lock);
241 write(subId, ENTRY_DISCARD, bookmark);
242 MemoryBookmarkStore::_discard(message_);
254 Lock<Mutex> guard(_lock);
255 Subscription::Entry* entry = find(subId_)->getEntryByIndex(bookmarkSeqNo_);
256 if (!entry || entry->_val.empty())
260 write(subId_, ENTRY_DISCARD, entry->_val);
261 MemoryBookmarkStore::_discard(subId_, bookmarkSeqNo_);
271 Lock<Mutex> guard(_lock);
272 return MemoryBookmarkStore::_getMostRecent(subId_);
285 Lock<Mutex> l(_lock);
286 bool retVal = MemoryBookmarkStore::_isDiscarded(message_);
294 write(subId, ENTRY_BOOKMARK, message_.
getBookmark());
295 write(subId, ENTRY_DISCARD, message_.
getBookmark());
307 Lock<Mutex> guard(_lock);
308 Lock<Mutex> fileGuard(_fileLock);
309 memset(_log, 0, _logOffset);
311 MemoryBookmarkStore::_purge();
321 Lock<Mutex> guard(_lock);
322 Lock<Mutex> fileGuard(_fileLock);
323 MemoryBookmarkStore::_purge(subId_);
324 std::string tmpFileName = _fileName +
".tmp";
325 __prune(tmpFileName);
330 Lock<Mutex> guard(_lock);
336 Lock<Mutex> guard(_lock);
341 void _prune(
const std::string& tmpFileName_)
343 Lock<Mutex> guard(_lock);
344 Lock<Mutex> fileGuard(_fileLock);
350 if (tmpFileName_.empty())
352 __prune(_fileName +
".tmp");
356 __prune(tmpFileName_);
358 _recentChanged =
false;
362 void __prune(
const std::string& tmpFileName_)
364 size_t sz = AMPS_INITIAL_LOG_SIZE;
367 size_t bytesWritten = 0;
369 file = CreateFileA(tmpFileName_.c_str(), GENERIC_READ | GENERIC_WRITE, 0,
370 NULL, OPEN_ALWAYS, FILE_ATTRIBUTE_NORMAL, NULL);
371 if ( file == INVALID_HANDLE_VALUE )
373 DWORD err = getErrorNo();
374 std::ostringstream os;
375 os <<
"Failed to create temp store file " << tmpFileName_ <<
376 " to prune MMapBookmarkStore " << _fileName;
377 error(os.str(), err);
379 HANDLE mapFile = NULL;
382 sz = _setFileSize(sz, &log, file, &mapFile);
384 catch (StoreException& ex)
386 if (mapFile == NULL || mapFile == INVALID_HANDLE_VALUE)
389 std::ostringstream os;
390 os <<
"Failed to create map of temp file " << tmpFileName_
391 <<
" while resizing it to prune MMapBookmarkStore " << _fileName
392 <<
": " << ex.what();
393 throw StoreException(os.str());
398 CloseHandle(mapFile);
400 std::ostringstream os;
401 os <<
"Failed to map temp file " << tmpFileName_
402 <<
" to memory while resizing it to prune MMapBookmarkStore " 403 << _fileName <<
": " << ex.what();
404 throw StoreException(os.str());
410 DWORD err = getErrorNo();
411 UnmapViewOfFile(log);
412 CloseHandle(mapFile);
414 std::ostringstream os;
415 os <<
"Failed to grow tmp file " << tmpFileName_
416 <<
" to prune MMapBookmarkStore " << _fileName;
417 error(os.str(), err);
420 file = open(tmpFileName_.c_str(), O_RDWR | O_CREAT, (mode_t)0644);
423 int err = getErrorNo();
424 std::ostringstream os;
425 os <<
"Failed to create temp store file " << tmpFileName_ <<
426 " to prune MMapBookmarkStore " << _fileName;
427 error(os.str(), err);
430 if (::write(file,
"\0\0\0\0", 4) == -1)
432 int err = getErrorNo();
433 std::ostringstream os;
434 os <<
"Failed to write header to temp file " << tmpFileName_
435 <<
" to prune MMapBookmarkStore " << _fileName;
436 error(os.str(), err);
441 sz = _setFileSize(sz, &log, file, 0);
443 catch (StoreException& ex)
445 std::ostringstream os;
446 os <<
"Failed to grow tmp file " << tmpFileName_
447 <<
" to prune MMapBookmarkStore " << _fileName << ex.what();
448 throw StoreException(os.str());
452 int err = getErrorNo();
455 std::ostringstream os;
456 os <<
"Failed to grow tmp file " << tmpFileName_
457 <<
" to prune MMapBookmarkStore " << _fileName;
458 error(os.str(), err);
463 for (SubscriptionMap::iterator i = _subs.begin(); i != _subs.end(); ++i)
466 assert(!subId.
empty());
467 size_t subIdLen = subId.
len();
468 Subscription* mapSubPtr = i->second;
469 const BookmarkRange& range = mapSubPtr->getRange();
472 write(&log, &bytesWritten, subId, ENTRY_BOOKMARK, range);
475 amps_uint64_t recentPub, recentSeq;
476 Subscription::parseBookmark(recent, recentPub, recentSeq);
477 Subscription::PublisherMap publishersDiscarded =
478 mapSubPtr->_publishers;
479 MemoryBookmarkStore::EntryPtrList recovered;
480 mapSubPtr->getRecoveryEntries(recovered);
481 mapSubPtr->setPublishersToDiscarded(&recovered,
482 &publishersDiscarded);
483 char tmpBookmarkBuffer[128];
484 for (Subscription::PublisherIterator pub =
485 publishersDiscarded.begin(),
486 e = publishersDiscarded.end();
490 if (pub->first == 0 || pub->second == 0)
495 if (pub->first == recentPub)
499 int written = AMPS_snprintf_amps_uint64_t(
501 sizeof(tmpBookmarkBuffer),
503 *(tmpBookmarkBuffer + written++) =
'|';
504 written += AMPS_snprintf_amps_uint64_t(
505 tmpBookmarkBuffer + written,
506 sizeof(tmpBookmarkBuffer)
509 *(tmpBookmarkBuffer + written++) =
'|';
512 size_t blockLen = subIdLen + 2 *
sizeof(size_t) + tmpBookmark.
len() + 1;
513 if (bytesWritten + blockLen + blockLen >= sz)
516 sz = _setFileSize(sz * 2, &log, file, &mapFile);
518 sz = _setFileSize(sz * 2, &log, file, sz);
521 write(&log, &bytesWritten, subId, ENTRY_BOOKMARK, tmpBookmark);
522 write(&log, &bytesWritten, subId, ENTRY_DISCARD, tmpBookmark);
524 if (isWritableBookmark(recent.
len()))
527 size_t blockLen = subIdLen + 2 *
sizeof(size_t) + recent.
len() + 1;
528 if (bytesWritten + blockLen + blockLen >= sz)
531 sz = _setFileSize(sz * 2, &log, file, &mapFile);
533 sz = _setFileSize(sz * 2, &log, file, sz);
536 write(&log, &bytesWritten, subId, ENTRY_BOOKMARK, recent);
537 write(&log, &bytesWritten, subId, ENTRY_DISCARD, recent);
541 mapSubPtr->getMostRecentList();
544 if (isWritableBookmark(bookmark.
len()))
547 size_t blockLen = subIdLen + 2 *
sizeof(size_t) +
549 if (bytesWritten + blockLen >= sz)
552 sz = _setFileSize(sz * 2, &log, file, &mapFile);
554 sz = _setFileSize(sz * 2, &log, file, sz);
557 write(&log, &bytesWritten, subId, ENTRY_PERSISTED,
558 mapSubPtr->getLastPersisted());
560 mapSubPtr->getActiveEntries(recovered);
561 for (MemoryBookmarkStore::EntryPtrList::iterator entry =
563 entry != recovered.end(); ++entry)
565 if ((*entry)->_val.empty() ||
566 !isWritableBookmark((*entry)->_val.len()))
571 size_t blockLen = subIdLen + 2 *
sizeof(size_t) +
572 (*entry)->_val.len() + 1;
573 if (bytesWritten + blockLen >= sz)
576 sz = _setFileSize(sz * 2, &log, file, &mapFile);
578 sz = _setFileSize(sz * 2, &log, file, sz);
581 write(&log, &bytesWritten, subId, ENTRY_BOOKMARK,
583 if (!(*entry)->_active)
586 if (bytesWritten + blockLen >= sz)
589 sz = _setFileSize(sz * 2, &log, file, &mapFile);
591 sz = _setFileSize(sz * 2, &log, file, sz);
594 write(&log, &bytesWritten, subId, ENTRY_DISCARD,
600 catch (StoreException& ex)
603 UnmapViewOfFile(log);
604 CloseHandle(mapFile);
608 ::unlink(tmpFileName_.c_str());
610 std::ostringstream os;
611 os <<
"Exception during prune: " << ex.what();
612 throw StoreException(os.str());
615 BOOL success = FlushViewOfFile(_log, 0);
616 success |= UnmapViewOfFile(_log);
618 success |= CloseHandle(_mapFile);
619 success |= CloseHandle(_file);
622 DWORD err = getErrorNo();
623 std::ostringstream os;
624 os <<
"Failed to flush, unmap, and close current file " 626 <<
" in prune in MMapBookmarkStore. ";
627 error(os.str(), err);
629 _mapFile = INVALID_HANDLE_VALUE;
630 _file = INVALID_HANDLE_VALUE;
631 success = FlushViewOfFile(log, 0);
632 success |= UnmapViewOfFile(log);
634 success |= CloseHandle(mapFile);
635 success |= CloseHandle(file);
638 DWORD err = getErrorNo();
639 std::ostringstream os;
640 os <<
"Failed to flush, unmap and close completed temp file " 642 <<
" in prune in MMapBookmarkStore. ";
643 error(os.str(), err);
645 mapFile = INVALID_HANDLE_VALUE;
646 file = INVALID_HANDLE_VALUE;
649 while (!MoveFileExA(tmpFileName_.c_str(), _fileName.c_str(),
650 MOVEFILE_COPY_ALLOWED | MOVEFILE_REPLACE_EXISTING | MOVEFILE_WRITE_THROUGH))
652 DWORD err = getErrorNo();
653 if (--retryCount > 0)
658 std::string desiredFileName = _fileName;
659 _fileName = tmpFileName_;
661 std::ostringstream os;
662 os <<
"Failed to move completed temp file " << tmpFileName_
663 <<
" to " << desiredFileName
664 <<
" in prune in MMapBookmarkStore. Continuing by using " 665 << tmpFileName_ <<
" as the MMapBookmarkStore file.";
666 error(os.str(), err);
671 munmap(_log, _fileSize);
676 if (-1 == ::unlink(_fileName.c_str()))
678 int err = getErrorNo();
680 std::string desiredFileName = _fileName;
681 _fileName = tmpFileName_;
683 std::ostringstream os;
684 os <<
"Failed to delete file " << desiredFileName
685 <<
" after creating temporary file " << tmpFileName_
686 <<
" in prune in MMapBookmarkStore. Continuing by using " 687 << tmpFileName_ <<
" as the MMapBookmarkStore file.";
688 error(os.str(), err);
690 if (-1 == ::rename(tmpFileName_.c_str(), _fileName.c_str()))
692 int err = getErrorNo();
694 std::string desiredFileName = _fileName;
695 _fileName = tmpFileName_;
697 std::ostringstream os;
698 os <<
"Failed to move completed temp file " << tmpFileName_
699 <<
" to " << desiredFileName
700 <<
" in prune in MMapBookmarkStore. Continuing by using " 701 << tmpFileName_ <<
" as the MMapBookmarkStore file.";
702 error(os.str(), err);
707 _logOffset = bytesWritten;
710 virtual void _persisted(Subscription* subP_,
713 Lock<Mutex> l(_lock);
714 write(subP_->id(), ENTRY_PERSISTED, bookmarkField_);
715 MemoryBookmarkStore::_persisted(subP_, bookmarkField_);
718 virtual Message::Field _persisted(Subscription* subP_,
size_t bookmark_)
720 Lock<Mutex> l(_lock);
721 Subscription::Entry* entryPtr = subP_->getEntryByIndex(bookmark_);
722 if (!entryPtr || entryPtr->_val.empty())
727 write(subP_->id(), ENTRY_PERSISTED, bookmarkField);
728 MemoryBookmarkStore::_persisted(subP_, bookmarkField);
729 return bookmarkField;
734 bool init(
bool useLastModifiedTime_ =
false)
738 _file = CreateFileA(_fileName.c_str(), GENERIC_READ | GENERIC_WRITE, 0,
739 NULL, OPEN_ALWAYS, FILE_ATTRIBUTE_NORMAL, NULL);
740 if ( _file == INVALID_HANDLE_VALUE )
742 DWORD err = getErrorNo();
743 std::ostringstream os;
744 os <<
"Failed to initialize file " << _fileName <<
" for MMapBookmarkStore";
745 error(os.str(), err);
747 LARGE_INTEGER liFileSize;
748 if (GetFileSizeEx(_file, &liFileSize) == 0)
750 DWORD err = getErrorNo();
752 std::ostringstream os;
753 os <<
"Failure getting initial file size for MMapBookmarkStore " << _fileName;
754 error(os.str(), err);
758 size_t fileSize = liFileSize.QuadPart;
760 size_t fileSize = liFileSize.LowPart;
762 if (useLastModifiedTime_ && fileSize > 0)
764 FILETIME ftModifiedTime;
765 if (GetFileTime(_file, NULL, NULL, &ftModifiedTime) == 0)
767 DWORD err = getErrorNo();
770 error(
"Failure getting file time while trying to recover.", err);
774 if (FileTimeToSystemTime(&ftModifiedTime, &st) == 0)
776 DWORD err = getErrorNo();
779 error(
"Failure converting file time while trying to recover.", err);
782 _fileTimestamp =
new char[AMPS_TIMESTAMP_LEN];
783 sprintf_s(_fileTimestamp, AMPS_TIMESTAMP_LEN,
784 "%04d%02d%02dT%02d%02d%02d", st.wYear, st.wMonth,
785 st.wDay, st.wHour, st.wMinute, st.wSecond);
786 _fileTimestamp[AMPS_TIMESTAMP_LEN - 1] =
'Z';
788 retVal = (fileSize != 0);
789 setFileSize( AMPS_INITIAL_LOG_SIZE > fileSize ?
790 AMPS_INITIAL_LOG_SIZE : fileSize);
792 _file = open(_fileName.c_str(), O_RDWR | O_CREAT, (mode_t)0644);
795 int err = getErrorNo();
796 std::ostringstream os;
797 os <<
"Failed to initialize log file " << _fileName <<
" for MMapBookmarkStore";
798 error(os.str(), err);
801 if (fstat(_file, &statBuf) == -1)
803 int err = getErrorNo();
805 std::ostringstream os;
806 os <<
"Failed to stat log file " << _fileName <<
" for MMapBookmarkStore";
807 error(os.str(), err);
810 size_t fSize = (size_t)statBuf.st_size;
814 if (::write(_file,
"\0\0\0\0", 4) == -1)
816 int err = getErrorNo();
818 std::ostringstream os;
819 os <<
"Failed to write header to log file " << _fileName
820 <<
" for MMapBookmarkStore";
821 error(os.str(), err);
825 else if (useLastModifiedTime_)
827 _fileTimestamp =
new char[AMPS_TIMESTAMP_LEN];
829 gmtime_r(&statBuf.st_mtime, &timeInfo);
830 strftime(_fileTimestamp, AMPS_TIMESTAMP_LEN,
831 "%Y%m%dT%H%M%S", &timeInfo);
832 _fileTimestamp[AMPS_TIMESTAMP_LEN - 1] =
'Z';
835 setFileSize((fSize > AMPS_INITIAL_LOG_SIZE) ? fSize - 1 : AMPS_INITIAL_LOG_SIZE);
841 DWORD getErrorNo()
const 843 return GetLastError();
846 void error(
const std::string& message_, DWORD err)
848 std::ostringstream os;
849 static const DWORD msgSize = 2048;
851 DWORD sz = FormatMessageA(FORMAT_MESSAGE_FROM_SYSTEM |
852 FORMAT_MESSAGE_ARGUMENT_ARRAY,
853 NULL, err, LANG_NEUTRAL,
854 pMsg, msgSize, NULL);
855 os <<
"File: " << _fileName <<
". " << message_ <<
" with error " << pMsg;
856 throw StoreException(os.str());
859 int getErrorNo()
const 864 void error(
const std::string& message_,
int err)
866 std::ostringstream os;
867 os << message_ <<
". Error is " << strerror(err);
868 throw StoreException(os.str());
872 #define AMPS_WRITE8(p,v) { p[0] = (v>>56)&0xFF; p[1] = (v>>48)&0xFF; p[2] = (v>>40)&0xFF; p[3] = (v>>32)&0xFF; p[4] = (v>>24)&0xFF; p[5] = (v>>16)&0xFF; p[6] = (v>>8)&0xFF; p[7]=v&0xFF; } 873 #define AMPS_READ8(p, v) { memcpy(&v,p,8); } 875 #define AMPS_WRITE8(p,v) { *(size_t*)p = (size_t)v; } 876 #define AMPS_READ8(p,v) { v = *(const size_t*)p; } 883 Lock<Mutex> guard(_fileLock);
884 write(&_log, &_logOffset, subId_, type_, bookmark_);
887 void write(
char** logPtr,
size_t* logOffsetPtr,
const Message::Field& subId_,
890 if (!_recovering && isWritableBookmark(bookmark_.
len()))
892 size_t len = subId_.
len();
894 size_t blockLen = len + 2 *
sizeof(size_t) + bookmark_.
len() + 1;
895 if (*logOffsetPtr + blockLen >= _fileSize)
897 setFileSize(_fileSize * 2);
899 char* offset = *logPtr + *logOffsetPtr;
900 AMPS_WRITE8(offset, len);
901 offset +=
sizeof(size_t);
902 memcpy(offset, static_cast<const void*>(subId_.
data()), len);
905 len = bookmark_.
len();
906 AMPS_WRITE8(offset, len);
907 offset +=
sizeof(size_t);
908 memcpy(offset, static_cast<const void*>(bookmark_.
data()), len);
909 *logOffsetPtr += blockLen;
916 void write(
const Message::Field& subId_,
char type_,
size_t bookmark_)
918 Lock<Mutex> guard(_fileLock);
919 write(&_log, &_logOffset, subId_, type_, bookmark_);
922 void write(
char** logPtr,
size_t* logOffsetPtr,
const Message::Field& subId_,
923 char type_,
size_t bookmark_)
927 size_t len = subId_.
len();
928 size_t blockLen = len + 2 *
sizeof(size_t) + 1;
930 if (*logOffsetPtr + blockLen >= _fileSize)
932 setFileSize(_fileSize * 2);
934 char* offset = *logPtr + *logOffsetPtr;
935 *(
reinterpret_cast<size_t*
>(offset)) = len;
936 offset +=
sizeof(size_t);
937 memcpy(offset, static_cast<const void*>(subId_.
data()), len);
940 *(
reinterpret_cast<size_t*
>(offset)) = bookmark_;
941 *logOffsetPtr += blockLen;
945 void setFileSize(
size_t newSize_)
947 if (_log && newSize_ <= _fileSize)
952 _fileSize = _setFileSize(newSize_, &_log, _file, &_mapFile);
954 _fileSize = _setFileSize(newSize_, &_log, _file, _fileSize);
959 size_t _setFileSize(
size_t newSize_,
char** log_, FileType file_,
968 size_t sz = newSize_ & (size_t)(~(getPageSize() - 1));
969 if (sz < newSize_ || sz == 0)
974 if (*mapFile_ && *mapFile_ != INVALID_HANDLE_VALUE)
978 FlushViewOfFile(*log_, 0);
979 UnmapViewOfFile(*log_);
981 CloseHandle(*mapFile_);
984 *mapFile_ = CreateFileMapping( file_, NULL, PAGE_READWRITE, (DWORD)((sz >> 32) & 0xffffffff), (DWORD)sz, NULL);
986 *mapFile_ = CreateFileMapping( file_, NULL, PAGE_READWRITE, 0, (DWORD)sz, NULL);
988 if (*mapFile_ == NULL || *mapFile_ == INVALID_HANDLE_VALUE)
990 DWORD errNo = getErrorNo();
992 std::ostringstream os;
993 os <<
"Failed to create map of MMapBookmarkStore file " << _fileName
994 <<
" during resize.";
995 error(os.str(), errNo);
1001 *log_ = (
char*)MapViewOfFile(*mapFile_, FILE_MAP_ALL_ACCESS, 0, 0, sz);
1004 DWORD errNo = getErrorNo();
1005 CloseHandle(*mapFile_);
1007 std::ostringstream os;
1008 os <<
"Failed to map MMapBookmarkStore file " << _fileName
1009 <<
" to memory during resize.";
1010 error(os.str(), errNo);
1017 if (lseek(file_, (off_t)sz, SEEK_SET) == -1)
1019 int err = getErrorNo();
1021 std::ostringstream os;
1022 os <<
"Failed to seek in MMapBookmarkStore file " << _fileName
1023 <<
" during resize.";
1024 error(os.str(), err);
1026 if (::write(file_,
"", 1) == -1)
1028 int err = getErrorNo();
1030 std::ostringstream os;
1031 os <<
"Failed to grow MMapBookmarkStore file " << _fileName
1032 <<
" during resize.";
1033 error(os.str(), err);
1038 *log_ =
static_cast<char*
>(mremap(*log_, fileSize_, sz,
1041 munmap(*log_, fileSize_);
1042 *log_ =
static_cast<char*
>(mmap(0, sz, PROT_READ | PROT_WRITE,
1043 MAP_SHARED, file_, 0));
1049 *log_ =
static_cast<char*
>(mmap(0, sz, PROT_READ | PROT_WRITE,
1050 MAP_SHARED, file_, 0));
1053 if ((
void*)(*log_) == MAP_FAILED)
1055 int err = getErrorNo();
1058 std::ostringstream os;
1059 os <<
"Failed to map MMapBookmarkStore file " << _fileName
1060 <<
" to memory during resize.";
1061 error(os.str(), err);
1068 void recover(
bool useLastModifiedTime_ =
false,
1069 bool hasAdapter_ =
false)
1073 size_t bookmarkLen = 0;
1074 size_t lastGoodOffset = 0;
1075 bool inError =
false;
1076 Lock<Mutex> guard(_lock);
1077 Lock<Mutex> fileGuard(_fileLock);
1080 typedef std::map<Message::Field, size_t, Message::Field::FieldHash> BookmarkMap;
1082 Message::Field::FieldHash>::iterator BookmarkMapIter;
1084 typedef std::map<Message::Field, BookmarkMap*,
1085 Message::Field::FieldHash> ReadMap;
1086 typedef std::map<Message::Field, BookmarkMap*,
1087 Message::Field::FieldHash>::iterator ReadMapIter;
1089 size_t subLen = *(
reinterpret_cast<size_t*
>(_log));
1090 while (!inError && subLen > 0)
1093 if (_logOffset == 0 && hasAdapter_)
1095 MemoryBookmarkStore::__purge();
1097 _logOffset +=
sizeof(size_t);
1098 sub.assign(_log + _logOffset, subLen);
1099 _logOffset += subLen;
1100 switch (_log[_logOffset++])
1104 case ENTRY_BOOKMARK:
1106 AMPS_READ8((_log + _logOffset), bookmarkLen);
1107 _logOffset +=
sizeof(size_t);
1108 bookmarkField.assign(_log + _logOffset, bookmarkLen);
1109 _logOffset += bookmarkLen;
1110 Subscription* subP = find(sub);
1111 BookmarkMap* bookmarks = NULL;
1112 ReadMapIter iter = recovered.find(sub);
1113 if (iter == recovered.end())
1115 Message::Field subKey;
1117 bookmarks =
new BookmarkMap();
1118 recovered[subKey] = bookmarks;
1122 bookmarks = iter->second;
1124 if (bookmarks->find(bookmarkField) != bookmarks->end())
1126 std::for_each(bookmarks->begin(), bookmarks->end(),
1129 subP->getMostRecent(
true);
1131 if (BookmarkRange::isRange(bookmarkField))
1133 subP->log(bookmarkField);
1135 else if (!subP->isDiscarded(bookmarkField))
1137 size_t sequence = subP->log(bookmarkField);
1138 Message::Field copy;
1140 bookmarks->insert(std::make_pair(copy, sequence));
1146 Message::Field copy;
1148 bookmarks->insert(std::make_pair(copy, 0));
1154 AMPS_READ8((_log + _logOffset), bookmarkLen);
1155 _logOffset +=
sizeof(size_t);
1156 bookmarkField.assign(_log + _logOffset, bookmarkLen);
1157 _logOffset += bookmarkLen;
1158 size_t sequence = AMPS_UNSET_INDEX;
1159 ReadMapIter iter = recovered.find(sub);
1160 if (iter != recovered.end())
1162 BookmarkMap* bookmarks = iter->second;
1163 BookmarkMapIter bookmarkIter = bookmarks->find(bookmarkField);
1164 if (bookmarkIter != bookmarks->end())
1166 sequence = bookmarkIter->second;
1167 Message::Field bookmarkToClear(bookmarkIter->first);
1168 bookmarkToClear.
clear();
1169 bookmarks->erase(bookmarkIter);
1172 if (!BookmarkRange::isRange(bookmarkField))
1174 Subscription* subP = find(sub);
1175 if (sequence != AMPS_UNSET_INDEX)
1180 subP->discard(sequence);
1185 subP->discard(bookmarkField);
1190 case ENTRY_PERSISTED:
1192 AMPS_READ8((_log + _logOffset), bookmarkLen);
1193 _logOffset +=
sizeof(size_t);
1194 bookmarkField.assign(_log + _logOffset, bookmarkLen);
1195 _logOffset += bookmarkLen;
1196 MemoryBookmarkStore::_persisted(find(sub), bookmarkField);
1200 if (lastGoodOffset == 0)
1202 error(
"Error while recovering MMapBookmarkStore file.", getErrorNo());
1206 _logOffset = lastGoodOffset;
1210 lastGoodOffset = _logOffset;
1211 if (!inError && _logOffset + 8 < _fileSize)
1213 subLen = *(
reinterpret_cast<size_t*
>(_log + _logOffset));
1216 for (SubscriptionMap::iterator i = _subs.begin(); i != _subs.end(); ++i)
1218 if (recovered.count(i->first) && !recovered[i->first]->empty())
1220 if (i->second->getMostRecent(
false).len() > 1)
1222 i->second->justRecovered();
1229 _subs[i->first] =
new Subscription(
this, i->first);
1232 if (useLastModifiedTime_ && _fileTimestamp)
1234 _subs[i->first]->setRecoveryTimestamp(_fileTimestamp);
1239 delete[] _fileTimestamp;
1242 for (ReadMapIter i = recovered.begin(), e = recovered.end(); i != e; ++i)
1244 std::for_each(i->second->begin(), i->second->end(), _clearBookmark);
1246 Message::Field f = i->first;
1249 _recovering =
false;
1253 std::string _fileName;
1257 char* _fileTimestamp;
1261 static size_t getPageSize()
1263 static size_t pageSize;
1267 SYSTEM_INFO SYS_INFO;
1268 GetSystemInfo(&SYS_INFO);
1269 pageSize = SYS_INFO.dwPageSize;
1271 pageSize = (size_t)sysconf(_SC_PAGESIZE);
1282 #endif // _MMAPBOOKMARKSTORE_H_ virtual void purge()
Called to purge the contents of this store.
Definition: MMapBookmarkStore.hpp:305
Field getSubscriptionId() const
Retrieves the value of the SubscriptionId header of the Message as a new Field.
Definition: Message.hpp:1427
Message encapsulates a single message sent to or received from an AMPS server, and provides methods f...
Definition: Message.hpp:531
Provides AMPS::MemoryBookmarkStore, a bookmark store that holds bookmarks in memory.
A BookmarkStoreImpl implementation that uses a memory mapped file for storage of the bookmarks...
Definition: MMapBookmarkStore.hpp:56
void clear()
Deletes the data associated with this Field, should only be used on Fields that were created as deepC...
Definition: Field.hpp:247
virtual void discard(const Message &message_)
Log a Message as discarded from the store.
Definition: MMapBookmarkStore.hpp:232
const char * data() const
Returns the (non-null-terminated) data underlying this field.
Definition: Field.hpp:260
RecoveryPointAdapter a handle class for implementing external storage of subscription recovery points...
Definition: RecoveryPointAdapter.hpp:77
virtual Message::Field getMostRecent(const Message::Field &subId_)
Returns the most recent bookmark from the log that ought to be used for (re-)subscriptions.
Definition: MMapBookmarkStore.hpp:269
void setServerVersion(const VersionInfo &version_)
Internally used to set the server version so the store knows how to deal with persisted acks and call...
Definition: MMapBookmarkStore.hpp:328
virtual void discard(const Message::Field &subId_, size_t bookmarkSeqNo_)
Log a discard-bookmark entry to the persistent log based on a bookmark sequence number.
Definition: MMapBookmarkStore.hpp:252
Field getSubscriptionIds() const
Retrieves the value of the SubscriptionIds header of the Message as a new Field.
Definition: Message.hpp:1428
bool empty() const
Returns 'true' if empty, 'false' otherwise.
Definition: Field.hpp:128
size_t len() const
Returns the length of the data underlying this field.
Definition: Field.hpp:267
MMapBookmarkStore(const char *fileName_, bool useLastModifiedTime_=false)
Create an MMapBookmarkStore that uses fileName_ as its file storage.
Definition: MMapBookmarkStore.hpp:82
virtual void purge(const Message::Field &subId_)
Called to purge the contents of this store for particular subId.
Definition: MMapBookmarkStore.hpp:319
Provides AMPS::RecoveryPointAdapter, an iterface for implementing external storage of bookmark subscr...
void setServerVersion(const VersionInfo &version_)
Internally used to set the server version so the store knows how to deal with persisted acks and call...
Definition: MemoryBookmarkStore.hpp:1365
Provides AMPS::RecoveryPoint, AMPS::RecoveryPointFactory, AMPS::FixedRecoveryPoint, and AMPS::DynamicRecoveryPoint.
MMapBookmarkStore(const RecoveryPointAdapter &adapter_, const std::string &fileName_, RecoveryPointFactory factory_=NULL, bool useLastModifiedTime_=false)
Create an MMapBookmarkStore that uses fileName_ as its file storage.
Definition: MMapBookmarkStore.hpp:170
A BookmarkStoreImpl implementation that stores bookmarks in memory.
Definition: MemoryBookmarkStore.hpp:57
void setServerVersion(size_t version_)
Internally used to set the server version so the store knows how to deal with persisted acks and call...
Definition: MMapBookmarkStore.hpp:334
Field represents the value of a single field in a Message.
Definition: Field.hpp:86
virtual size_t log(Message &message_)
Log a bookmark to the persistent log and return the corresponding sequence number for this bookmark...
Definition: MMapBookmarkStore.hpp:209
MMapBookmarkStore(const RecoveryPointAdapter &adapter_, const char *fileName_, RecoveryPointFactory factory_=NULL, bool useLastModifiedTime_=false)
Create an MMapBookmarkStore that uses fileName_ as its file storage.
Definition: MMapBookmarkStore.hpp:137
void deepCopy(const Field &orig_)
Makes self a deep copy of the original field.
Definition: Field.hpp:218
Definition: ampsplusplus.hpp:102
MMapBookmarkStore(const std::string &fileName_, bool useLastModifiedTime_=false)
Create an MMapBookmarkStore that uses fileName_ as its file storage.
Definition: MMapBookmarkStore.hpp:107
Field getBookmark() const
Retrieves the value of the Bookmark header of the Message as a new Field.
Definition: Message.hpp:1194
virtual bool isDiscarded(Message &message_)
Called for each arriving message to determine if the application has already seen this bookmark and s...
Definition: MMapBookmarkStore.hpp:283