28 #ifndef _LOGGEDBOOKMARKSTORE_H_ 29 #define _LOGGEDBOOKMARKSTORE_H_ 42 #include <sys/types.h> 49 typedef char* amps_iovec_base_ptr;
51 typedef void* amps_iovec_base_ptr;
67 static void _clearBookmark(std::pair<const Message::Field, size_t>& pair)
73 typedef HANDLE FileType;
89 bool useLastModifiedTime_ =
false)
92 , _file(INVALID_HANDLE_VALUE)
96 , _fileName(fileName_)
99 recover(useLastModifiedTime_,
false);
110 bool useLastModifiedTime_ =
false)
113 , _file(INVALID_HANDLE_VALUE)
117 , _fileName(fileName_)
120 recover(useLastModifiedTime_,
false);
138 const char* fileName_,
139 RecoveryPointFactory factory_ = NULL,
140 bool useLastModifiedTime_ =
false)
143 , _file(INVALID_HANDLE_VALUE)
147 , _fileName(fileName_)
150 recover(useLastModifiedTime_,
true);
165 const std::string& fileName_,
166 RecoveryPointFactory factory_ = NULL,
167 bool useLastModifiedTime_ =
false)
170 , _file(INVALID_HANDLE_VALUE)
174 , _fileName(fileName_)
177 recover(useLastModifiedTime_,
true);
186 _recoveringFile =
true;
206 Subscription* sub = (Subscription*)(message_.getSubscriptionHandle());
207 Lock<Mutex> guard(_lock);
216 message_.setSubscriptionHandle(static_cast<amps_subscription_handle>(sub));
218 write(_file, sub->id(), ENTRY_BOOKMARK, bookmark);
219 return MemoryBookmarkStore::_log(message_);
234 Lock<Mutex> guard(_lock);
235 write(_file, subId, ENTRY_DISCARD, bookmark);
236 MemoryBookmarkStore::_discard(message_);
248 Lock<Mutex> l(_lock);
249 Subscription::Entry* entry = find(subId_)->getEntryByIndex(bookmarkSeqNo_);
250 if (!entry || entry->_val.empty())
254 write(_file, subId_, ENTRY_DISCARD, entry->_val);
255 MemoryBookmarkStore::_discard(subId_, bookmarkSeqNo_);
265 Lock<Mutex> l(_lock);
266 return MemoryBookmarkStore::_getMostRecent(subId_);
279 Lock<Mutex> l(_lock);
280 bool retVal = MemoryBookmarkStore::_isDiscarded(message_);
288 write(_file, subId, ENTRY_BOOKMARK, message_.
getBookmark());
289 write(_file, subId, ENTRY_DISCARD, message_.
getBookmark());
301 Lock<Mutex> guard(_lock);
303 if (_file != INVALID_HANDLE_VALUE)
307 DeleteFileA(_fileName.c_str());
308 _file = CreateFileA(_fileName.c_str(), GENERIC_READ | GENERIC_WRITE, 0,
309 NULL, CREATE_ALWAYS, FILE_ATTRIBUTE_NORMAL, NULL);
310 if ( _file == INVALID_HANDLE_VALUE )
312 DWORD err = getErrorNo();
313 std::ostringstream os;
314 os <<
"Failed to recreate log file after purge for LoggedBookmarkStore" << _fileName <<
" for LoggedBookmarkStore";
315 error(os.str(), err);
320 ::unlink(_fileName.c_str());
321 _file = open(_fileName.c_str(), O_RDWR | O_CREAT, (mode_t)0644);
324 error(
"Failed to recreate log file after purge for LoggedBookmarkStore", getErrorNo());
328 MemoryBookmarkStore::_purge();
338 Lock<Mutex> guard(_lock);
339 MemoryBookmarkStore::_purge(subId_);
340 std::string tmpFileName = _fileName +
".tmp";
341 __prune(tmpFileName);
355 void _prune(
const std::string& tmpFileName_)
357 Lock<Mutex> guard(_lock);
363 if (tmpFileName_.empty())
365 __prune(_fileName +
".tmp");
369 __prune(tmpFileName_);
371 _recentChanged =
false;
374 void __prune(
const std::string& tmpFileName_)
378 tmpFile = CreateFileA(tmpFileName_.c_str(), GENERIC_READ | GENERIC_WRITE, 0,
379 NULL, OPEN_ALWAYS, FILE_ATTRIBUTE_NORMAL, NULL);
380 if (tmpFile == INVALID_HANDLE_VALUE )
382 DWORD err = getErrorNo();
383 std::ostringstream os;
384 os <<
"Failed to create temp log file " << tmpFileName_ <<
385 " to prune LoggedBookmarkStore " << _fileName;
386 error(os.str(), err);
391 tmpFile = open(tmpFileName_.c_str(), O_RDWR | O_CREAT, (mode_t)0644);
394 int err = getErrorNo();
395 std::ostringstream os;
396 os <<
"Failed to create temp log file " << tmpFileName_ <<
397 " to prune LoggedBookmarkStore " << _fileName;
398 error(os.str(), err);
404 for (SubscriptionMap::iterator i = _subs.begin();
405 i != _subs.end(); ++i)
408 assert(!subId.
empty());
409 Subscription* subPtr = i->second;
410 const BookmarkRange& range = subPtr->getRange();
413 write(tmpFile, subId, ENTRY_BOOKMARK, range);
416 amps_uint64_t recentPub, recentSeq;
417 Subscription::parseBookmark(recent, recentPub, recentSeq);
418 Subscription::PublisherMap publishersDiscarded =
420 MemoryBookmarkStore::EntryPtrList recovered;
421 subPtr->getRecoveryEntries(recovered);
422 subPtr->setPublishersToDiscarded(&recovered,
423 &publishersDiscarded);
424 char tmpBookmarkBuffer[128];
425 for (Subscription::PublisherIterator pub =
426 publishersDiscarded.begin(),
427 e = publishersDiscarded.end();
431 if (pub->first == 0 || pub->second == 0)
436 if (pub->first == recentPub)
440 int written = AMPS_snprintf_amps_uint64_t(
442 sizeof(tmpBookmarkBuffer),
444 *(tmpBookmarkBuffer + written++) =
'|';
445 written += AMPS_snprintf_amps_uint64_t(
446 tmpBookmarkBuffer + written,
447 sizeof(tmpBookmarkBuffer)
450 *(tmpBookmarkBuffer + written++) =
'|';
452 write(tmpFile, subId, ENTRY_BOOKMARK, tmpBookmark);
453 write(tmpFile, subId, ENTRY_DISCARD, tmpBookmark);
455 if (isWritableBookmark(recent.
len()))
457 write(tmpFile, subId, ENTRY_BOOKMARK, recent);
458 write(tmpFile, subId, ENTRY_DISCARD, recent);
462 subPtr->getMostRecentList();
464 if (isWritableBookmark(subPtr->getLastPersisted().len()))
466 write(tmpFile, subId, ENTRY_PERSISTED,
467 subPtr->getLastPersisted());
469 subPtr->getActiveEntries(recovered);
470 for (MemoryBookmarkStore::EntryPtrList::iterator entry =
472 entry != recovered.end(); ++entry)
474 if ((*entry)->_val.empty() ||
475 !isWritableBookmark((*entry)->_val.len()))
479 write(tmpFile, subId, ENTRY_BOOKMARK, (*entry)->_val);
480 if (!(*entry)->_active)
482 write(tmpFile, subId, ENTRY_DISCARD, (*entry)->_val);
487 catch (StoreException& ex)
490 CloseHandle(tmpFile);
491 DeleteFileA(tmpFileName_.c_str());
494 unlink(tmpFileName_.c_str());
496 std::ostringstream os;
497 os <<
"Exception during prune: " << ex.what();
498 throw StoreException(os.str());
502 CloseHandle(tmpFile);
503 _file = INVALID_HANDLE_VALUE;
504 tmpFile = INVALID_HANDLE_VALUE;
507 while (!MoveFileExA(tmpFileName_.c_str(), _fileName.c_str(),
508 MOVEFILE_COPY_ALLOWED | MOVEFILE_REPLACE_EXISTING | MOVEFILE_WRITE_THROUGH))
510 DWORD err = getErrorNo();
511 if (--retryCount > 0)
516 std::string desiredFileName = _fileName;
517 _fileName = tmpFileName_;
519 std::ostringstream os;
520 os <<
"Failed to move completed temp file " << tmpFileName_
521 <<
" to " << desiredFileName
522 <<
" in prune in LoggedBookmarkStore. Continuing by using " 523 << tmpFileName_ <<
" as the LoggedBookmarkStore file.";
524 error(os.str(), err);
528 SetFilePointer(_file, 0, NULL, FILE_END);
532 if (-1 == ::unlink(_fileName.c_str()))
534 int err = getErrorNo();
536 std::string desiredFileName = _fileName;
537 _fileName = tmpFileName_;
539 std::ostringstream os;
540 os <<
"Failed to delete file " << desiredFileName
541 <<
" after creating temporary file " << tmpFileName_
542 <<
" in prune in LoggedBookmarkStore. Continuing by using " 543 << tmpFileName_ <<
" as the LoggedBookmarkStore file.";
544 error(os.str(), err);
547 if (-1 == ::rename(tmpFileName_.c_str(), _fileName.c_str()))
549 int err = getErrorNo();
551 std::string desiredFileName = _fileName;
552 _fileName = tmpFileName_;
554 std::ostringstream os;
555 os <<
"Failed to move completed temp file " << tmpFileName_
556 <<
" to " << desiredFileName
557 <<
" in prune in LoggedBookmarkStore. Continuing by using " 558 << tmpFileName_ <<
" as the LoggedBookmarkStore file.";
559 error(os.str(), err);
564 if (-1 == ::fstat(_file, &fst))
566 int err = getErrorNo();
567 std::ostringstream os;
568 os <<
"Failed to get size of pruned file " << _fileName
569 <<
" in prune in LoggedBookmarkStore. ";
570 error(os.str(), err);
573 ::lseek(_file, (off_t)fst.st_size, SEEK_SET);
578 virtual void _persisted(Subscription* subP_,
581 Lock<Mutex> guard(_lock);
582 write(_file, subP_->id(), ENTRY_PERSISTED, bookmark_);
583 MemoryBookmarkStore::_persisted(subP_, bookmark_);
586 virtual Message::Field _persisted(Subscription* subP_,
size_t bookmark_)
588 Lock<Mutex> l(_lock);
589 Subscription::Entry* entryPtr = subP_->getEntryByIndex(bookmark_);
590 if (!entryPtr || entryPtr->_val.empty())
595 write(_file, subP_->id(), ENTRY_PERSISTED, bookmarkField);
596 MemoryBookmarkStore::_persisted(subP_, bookmarkField);
597 return bookmarkField;
601 typedef DWORD ERRTYPE;
602 ERRTYPE getErrorNo()
const 604 return GetLastError();
607 void error(
const std::string& message_, ERRTYPE err)
609 std::ostringstream os;
610 static const DWORD msgSize = 2048;
612 DWORD sz = FormatMessageA(FORMAT_MESSAGE_FROM_SYSTEM |
613 FORMAT_MESSAGE_ARGUMENT_ARRAY,
614 NULL, err, LANG_NEUTRAL,
615 pMsg, msgSize, NULL);
616 os <<
"File: " << _fileName <<
". " << message_;
619 os <<
" with error " << pMsg;
621 throw StoreException(os.str());
625 ERRTYPE getErrorNo()
const 630 void error(
const std::string& message_, ERRTYPE err)
632 std::ostringstream os;
633 os <<
"File: " << _fileName <<
". " << message_;
636 os <<
" with error " << strerror(err);
639 throw StoreException(os.str());
646 _file = CreateFileA(_fileName.c_str(), GENERIC_READ | GENERIC_WRITE, 0,
647 NULL, OPEN_ALWAYS, FILE_ATTRIBUTE_NORMAL, NULL);
648 if ( _file == INVALID_HANDLE_VALUE )
650 DWORD err = getErrorNo();
651 std::ostringstream os;
652 os <<
"Failed to initialize log file " << _fileName <<
" for LoggedBookmarkStore";
653 error(os.str(), err);
657 _file = open(_fileName.c_str(), O_RDWR | O_CREAT, (mode_t)0644);
660 int err = getErrorNo();
661 std::ostringstream os;
662 os <<
"Failed to initialize log file " << _fileName <<
" for LoggedBookmarkStore";
663 error(os.str(), err);
672 void write(FileType file_,
const Message::Field& subId_,
char type_,
675 Lock<Mutex> guard(_fileLock);
676 if (!_recoveringFile && isWritableBookmark(bookmark_.
len()))
680 size_t len = subId_.
len();
681 BOOL ok = WriteFile(file_, (LPVOID)&len,
sizeof(
size_t), &written, NULL);
682 ok |= WriteFile(file_, (LPVOID)subId_.
data(), (DWORD)len, &written, NULL);
683 ok |= WriteFile(file_, (LPVOID)&type_, 1, &written, NULL);
684 len = bookmark_.
len();
685 ok |= WriteFile(file_, (LPVOID)&len,
sizeof(
size_t), &written, NULL);
686 ok |= WriteFile(file_, (LPVOID)bookmark_.
data(), (DWORD)len,
690 error(
"Failed to write to bookmark log.", getErrorNo());
697 file_ = open(_fileName.c_str(), O_RDWR | O_CREAT, (mode_t)0644);
700 int err = getErrorNo();
701 std::ostringstream os;
702 os <<
"Failed to open file " << _fileName
703 <<
" for write in LoggedBookmarkStore. ";
704 error(os.str(), err);
708 struct iovec data[5];
709 size_t len = subId_.
len();
710 data[0].iov_base = (amps_iovec_base_ptr)(
void*)&len;
711 data[0].iov_len =
sizeof(size_t);
712 data[1].iov_base = (amps_iovec_base_ptr)(
void*)subId_.
data();
713 data[1].iov_len = len;
714 data[2].iov_base = (amps_iovec_base_ptr)(
void*)&type_;
716 size_t bookmarkLen = bookmark_.
len();
717 data[3].iov_base = (amps_iovec_base_ptr)(
void*)&bookmarkLen;
718 data[3].iov_len =
sizeof(size_t);
719 data[4].iov_base = (amps_iovec_base_ptr)(
void*)bookmark_.
data();
720 data[4].iov_len = bookmarkLen;
721 ssize_t written = ::writev(file_, data, 5);
724 error(
"Failed to write to bookmark log.", getErrorNo());
735 char type_,
size_t bookmark_)
737 Lock<Mutex> guard(_fileLock);
738 if (!_recoveringFile)
742 size_t len = subId_.
len();
743 BOOL ok = WriteFile(file_, (LPVOID)&len,
sizeof(
size_t), &written, NULL);
744 ok |= WriteFile(file_, (LPVOID)subId_.
data(), (DWORD)len, &written, NULL);
745 ok |= WriteFile(file_, (LPVOID)&type_, 1, &written, NULL);
746 ok |= WriteFile(file_, (LPVOID)&bookmark_,
sizeof(
size_t),
750 error(
"Failed to write bookmark sequence to file.", getErrorNo());
757 file_ = open(_fileName.c_str(), O_RDWR | O_CREAT, (mode_t)0644);
760 int err = getErrorNo();
761 std::ostringstream os;
762 os <<
"Failed to open file " << _fileName
763 <<
" to write bookmark sequence in LoggedBookmarkStore. ";
764 error(os.str(), err);
768 struct iovec data[4];
769 size_t len = subId_.
len();
770 data[0].iov_base = (amps_iovec_base_ptr)(
void*)&len;
771 data[0].iov_len =
sizeof(size_t);
772 data[1].iov_base = (amps_iovec_base_ptr)(
void*)subId_.
data();
773 data[1].iov_len = len;
774 data[2].iov_base = (amps_iovec_base_ptr)(
void*)&type_;
776 data[3].iov_base = (amps_iovec_base_ptr)(
void*)&bookmark_;
777 data[3].iov_len =
sizeof(size_t);
778 ssize_t written = ::writev(file_, data, 4);
781 error(
"Failed to write bookmark sequence to file.", getErrorNo());
789 #define VOID_P(buf) (LPVOID)buf 790 bool readFileBytes(LPVOID buffer,
size_t numBytes, DWORD* bytesRead)
792 return (ReadFile(_file, buffer, (DWORD)numBytes, bytesRead, NULL) == TRUE);
795 #define VOID_P(buf) (void*)buf 796 bool readFileBytes(
void* buffer,
size_t numBytes, ssize_t* bytesRead)
798 *bytesRead = ::read(_file, buffer, numBytes);
799 return (*bytesRead >= 0);
803 void recover(
bool useLastModifiedTime_,
bool hasAdapter_)
805 size_t bufferLen = 128;
806 char* buffer =
new char[bufferLen];
807 size_t subIdBufferLen = 128;
808 char* subIdBuffer =
new char[bufferLen];
812 size_t bookmarkLen = 0;
813 Lock<Mutex> l(_lock);
814 Lock<Mutex> guard(_fileLock);
815 _recoveringFile =
true;
816 char* fileTimestamp =
new char[AMPS_TIMESTAMP_LEN];
817 fileTimestamp[0] =
'\0';
819 LARGE_INTEGER lifileSize;
820 if (GetFileSizeEx(_file, &lifileSize) == 0)
822 DWORD err = getErrorNo();
824 delete[] subIdBuffer;
825 _recoveringFile =
false;
826 error(
"Failure getting file size while trying to recover.", err);
830 size_t fileSize = lifileSize.QuadPart;
832 size_t fileSize = lifileSize.LowPart;
834 if (useLastModifiedTime_ && fileSize > 0)
836 FILETIME ftModifiedTime;
837 if (GetFileTime(_file, NULL, NULL, &ftModifiedTime) == 0)
839 DWORD err = getErrorNo();
841 delete[] subIdBuffer;
842 _recoveringFile =
false;
843 error(
"Failure getting file time while trying to recover.", err);
847 if (FileTimeToSystemTime(&ftModifiedTime, &st) == 0)
849 DWORD err = getErrorNo();
851 delete[] subIdBuffer;
852 _recoveringFile =
false;
853 error(
"Failure converting file time while trying to recover.", err);
856 sprintf_s(fileTimestamp, AMPS_TIMESTAMP_LEN,
857 "%04d%02d%02dT%02d%02d%02d", st.wYear, st.wMonth,
858 st.wDay, st.wHour, st.wMinute, st.wSecond);
859 fileTimestamp[AMPS_TIMESTAMP_LEN - 1] =
'Z';
861 else if (fileSize == 0)
863 delete[] fileTimestamp;
865 delete[] subIdBuffer;
866 _recoveringFile =
false;
871 SetFilePointer(_file, 0, NULL, FILE_BEGIN);
874 ::fstat(_file, &fst);
875 ssize_t fileSize = fst.st_size;
876 ssize_t readBytes = 0;
877 if (useLastModifiedTime_ && fileSize > 0)
880 gmtime_r(&fst.st_mtime, &timeInfo);
881 strftime(fileTimestamp, AMPS_TIMESTAMP_LEN,
882 "%Y%m%dT%H%M%S", &timeInfo);
883 fileTimestamp[AMPS_TIMESTAMP_LEN - 1] =
'Z';
885 else if (fileSize == 0)
887 delete[] fileTimestamp;
889 delete[] subIdBuffer;
890 _recoveringFile =
false;
894 ::lseek(_file, loc, SEEK_SET);
899 MemoryBookmarkStore::__purge();
901 if (!readFileBytes(VOID_P(&subLen),
sizeof(
size_t), &readBytes)
904 delete[] fileTimestamp;
906 delete[] subIdBuffer;
907 _recoveringFile =
false;
908 error(
"Failure reading file while trying to recover.", getErrorNo());
912 size_t totalBytes = readBytes;
914 ssize_t totalBytes = readBytes;
917 size_t tooManyBytes = 0;
919 Message::Field::FieldHash> BookmarkMap;
920 typedef std::map<Message::Field, size_t,
921 Message::Field::FieldHash>::iterator BookmarkMapIter;
923 typedef std::map<Message::Field, BookmarkMap*,
924 Message::Field::FieldHash> ReadMap;
925 typedef std::map<Message::Field, BookmarkMap*,
926 Message::Field::FieldHash>::iterator ReadMapIter;
928 while (subLen > 0 && (
size_t)readBytes ==
sizeof(
size_t) &&
929 (
size_t)totalBytes <= (
size_t)fileSize)
931 if (subLen >= ((
size_t)fileSize - (
size_t)totalBytes)
935 tooManyBytes = subLen + 1;
940 if (subIdBufferLen < subLen)
942 delete [] subIdBuffer;
943 subIdBufferLen = 2 * subLen;
944 subIdBuffer =
new char[subIdBufferLen];
946 if (!readFileBytes(VOID_P(subIdBuffer), subLen, &readBytes))
949 tooManyBytes = subLen;
952 totalBytes += readBytes;
953 sub.assign(subIdBuffer, subLen);
954 if (!readFileBytes(VOID_P(buffer), 1, &readBytes))
960 totalBytes += readBytes;
965 if ((
size_t)totalBytes +
sizeof(
size_t) >= (
size_t)fileSize)
969 tooManyBytes =
sizeof(size_t);
972 if (!readFileBytes(VOID_P(&bookmarkLen),
sizeof(size_t), &readBytes))
975 tooManyBytes =
sizeof(size_t);
978 totalBytes += readBytes;
979 if (bookmarkLen > (
size_t)fileSize - (size_t)totalBytes)
983 tooManyBytes = bookmarkLen;
986 if (bufferLen < bookmarkLen)
989 bufferLen = 2 * bookmarkLen;
990 buffer =
new char[bufferLen];
992 if (!readFileBytes(VOID_P(buffer), bookmarkLen, &readBytes))
995 tooManyBytes = bookmarkLen;
998 totalBytes += readBytes;
999 bookmarkField.assign(buffer, bookmarkLen);
1000 Subscription* subP = find(sub);
1001 BookmarkMap* bookmarks = NULL;
1002 ReadMapIter iter = recovered.find(sub);
1003 if (iter == recovered.end())
1005 Message::Field subKey;
1007 bookmarks =
new BookmarkMap();
1008 recovered[subKey] = bookmarks;
1012 bookmarks = iter->second;
1014 if (bookmarks->find(bookmarkField) != bookmarks->end())
1016 std::for_each(bookmarks->begin(), bookmarks->end(),
1019 subP->getMostRecent(
true);
1021 if (BookmarkRange::isRange(bookmarkField))
1023 subP->log(bookmarkField);
1025 else if (!subP->isDiscarded(bookmarkField))
1027 size_t sequence = subP->log(bookmarkField);
1028 Message::Field copy;
1030 bookmarks->insert(std::make_pair(copy, sequence));
1036 Message::Field copy;
1038 bookmarks->insert(std::make_pair(copy, 0));
1045 if ((
size_t)totalBytes +
sizeof(
size_t) >= (
size_t)fileSize)
1048 err = (ERRTYPE) - 1;
1049 tooManyBytes =
sizeof(size_t);
1052 if (!readFileBytes(VOID_P(&bookmarkLen),
sizeof(size_t), &readBytes))
1055 tooManyBytes =
sizeof(size_t);
1058 totalBytes += readBytes;
1059 if (bookmarkLen > (
size_t)fileSize - (size_t)totalBytes)
1062 err = (ERRTYPE) - 1;
1063 tooManyBytes = bookmarkLen;
1066 if (bufferLen < bookmarkLen)
1069 bufferLen = 2 * bookmarkLen;
1070 buffer =
new char[bufferLen];
1072 if (!readFileBytes(VOID_P(buffer), bookmarkLen, &readBytes))
1075 tooManyBytes = bookmarkLen;
1078 totalBytes += readBytes;
1079 bookmarkField.assign(buffer, bookmarkLen);
1080 size_t sequence = AMPS_UNSET_INDEX;
1081 ReadMapIter iter = recovered.find(sub);
1082 if (iter != recovered.end())
1084 BookmarkMap* bookmarks = iter->second;
1085 BookmarkMapIter bookmarkIter = bookmarks->find(bookmarkField);
1086 if (bookmarkIter != bookmarks->end())
1088 sequence = bookmarkIter->second;
1089 Message::Field bookmarkToClear(bookmarkIter->first);
1090 bookmarkToClear.
clear();
1091 bookmarks->erase(bookmarkIter);
1094 Subscription* subP = find(sub);
1095 if (!BookmarkRange::isRange(bookmarkField))
1097 if (sequence != AMPS_UNSET_INDEX)
1102 subP->discard(sequence);
1107 subP->discard(bookmarkField);
1112 case ENTRY_PERSISTED:
1114 if ((
size_t)totalBytes +
sizeof(
size_t) >= (
size_t)fileSize)
1117 err = (ERRTYPE) - 1;
1118 tooManyBytes =
sizeof(size_t);
1121 if (!readFileBytes(VOID_P(&bookmarkLen),
sizeof(size_t), &readBytes))
1124 tooManyBytes =
sizeof(size_t);
1127 totalBytes += readBytes;
1128 if (bookmarkLen > (
size_t)fileSize - (size_t)totalBytes)
1131 err = (ERRTYPE) - 1;
1132 tooManyBytes = bookmarkLen;
1135 if (bufferLen < bookmarkLen)
1138 bufferLen = 2 * bookmarkLen;
1139 buffer =
new char[bufferLen];
1141 if (!readFileBytes(VOID_P(buffer), bookmarkLen, &readBytes))
1144 tooManyBytes = bookmarkLen;
1147 totalBytes += readBytes;
1148 bookmarkField.assign(buffer, bookmarkLen);
1149 Subscription* subP = find(sub);
1150 MemoryBookmarkStore::_persisted(subP, bookmarkField);
1156 err = (ERRTYPE) - 1;
1157 tooManyBytes = (size_t)fileSize - (
size_t)totalBytes;
1162 loc = (OFF_T)totalBytes;
1163 if ((
size_t)totalBytes > (size_t)fileSize)
1165 loc = (OFF_T)fileSize;
1168 if (!readFileBytes(VOID_P(&subLen),
sizeof(size_t), &readBytes))
1171 tooManyBytes =
sizeof(size_t);
1174 totalBytes += readBytes;
1177 delete[] subIdBuffer;
1180 for (SubscriptionMap::iterator i = _subs.begin(); i != _subs.end(); ++i)
1182 if (recovered.count(i->first) && !recovered[i->first]->empty())
1184 Subscription* subPtr = i->second;
1185 if (subPtr->getMostRecent(
false).len() > 1)
1187 subPtr->justRecovered();
1194 _subs[i->first] =
new Subscription(
this, i->first);
1197 if (useLastModifiedTime_ && fileTimestamp[0] !=
'\0')
1199 _subs[i->first]->setRecoveryTimestamp(fileTimestamp);
1203 for (ReadMapIter i = recovered.begin(), e = recovered.end(); i != e; ++i)
1205 std::for_each(i->second->begin(), i->second->end(), _clearBookmark);
1207 Message::Field f = i->first;
1210 delete[] fileTimestamp;
1211 _recoveringFile =
false;
1217 if (err != (ERRTYPE) - 1 || loc == 0 || fileSize - loc > 128)
1219 std::ostringstream os;
1220 os <<
"Error while recovering LoggedBookmarkStore from " 1222 <<
". Record starting at " << loc
1223 <<
" reading at " << totalBytes
1224 <<
" requested " << tooManyBytes
1225 <<
" and file size is " << fileSize;
1226 error(os.str(), (err != (ERRTYPE) - 1 ? err : 0));
1232 LONG low = (LONG)loc;
1233 LONG high = (LONG)((loc >> 32) & 0xffffffff);
1234 SetFilePointer(_file, low, &high, FILE_BEGIN);
1236 SetFilePointer(_file, loc, NULL, FILE_BEGIN);
1239 ::lseek(_file, loc, SEEK_SET);
1248 std::string _fileName;
1249 bool _recoveringFile;
1255 #endif // _LOGGEDBOOKMARKSTORE_H_ virtual bool isDiscarded(Message &message_)
Called for each arriving message to determine if the application has already seen this bookmark and s...
Definition: LoggedBookmarkStore.hpp:277
Field getSubscriptionId() const
Retrieves the value of the SubscriptionId header of the Message as a new Field.
Definition: Message.hpp:1427
Message encapsulates a single message sent to or received from an AMPS server, and provides methods f...
Definition: Message.hpp:531
void setServerVersion(const VersionInfo &version_)
Internally used to set the server version so the store knows how to deal with persisted acks and call...
Definition: LoggedBookmarkStore.hpp:344
Provides AMPS::MemoryBookmarkStore, a bookmark store that holds bookmarks in memory.
void clear()
Deletes the data associated with this Field, should only be used on Fields that were created as deepC...
Definition: Field.hpp:247
LoggedBookmarkStore(const std::string &fileName_, bool useLastModifiedTime_=false)
Creates a LoggedBookmarkStore using a file name fileName_.
Definition: LoggedBookmarkStore.hpp:109
LoggedBookmarkStore(const RecoveryPointAdapter &adapter_, const char *fileName_, RecoveryPointFactory factory_=NULL, bool useLastModifiedTime_=false)
Creates a LoggedBookmarkStore using fileName_ as its file storage.
Definition: LoggedBookmarkStore.hpp:137
virtual void discard(const Message::Field &subId_, size_t bookmarkSeqNo_)
Log a discard-bookmark entry to the persistent log based on a bookmark sequence number.
Definition: LoggedBookmarkStore.hpp:246
const char * data() const
Returns the (non-null-terminated) data underlying this field.
Definition: Field.hpp:260
RecoveryPointAdapter a handle class for implementing external storage of subscription recovery points...
Definition: RecoveryPointAdapter.hpp:77
LoggedBookmarkStore(const RecoveryPointAdapter &adapter_, const std::string &fileName_, RecoveryPointFactory factory_=NULL, bool useLastModifiedTime_=false)
Creates a LoggedBookmarkStore using a file name fileName_.
Definition: LoggedBookmarkStore.hpp:164
void setServerVersion(size_t version_)
Internally used to set the server version so the store knows how to deal with persisted acks and call...
Definition: LoggedBookmarkStore.hpp:349
Field getSubscriptionIds() const
Retrieves the value of the SubscriptionIds header of the Message as a new Field.
Definition: Message.hpp:1428
bool empty() const
Returns 'true' if empty, 'false' otherwise.
Definition: Field.hpp:128
Core type, function, and class declarations for the AMPS C++ client.
A BookmarkStoreImpl implementation that logs all messages to a file.
Definition: LoggedBookmarkStore.hpp:64
size_t len() const
Returns the length of the data underlying this field.
Definition: Field.hpp:267
Provides AMPS::RecoveryPointAdapter, an iterface for implementing external storage of bookmark subscr...
void setServerVersion(const VersionInfo &version_)
Internally used to set the server version so the store knows how to deal with persisted acks and call...
Definition: MemoryBookmarkStore.hpp:1387
virtual void purge(const Message::Field &subId_)
Called to purge the contents of this store for particular subId.
Definition: LoggedBookmarkStore.hpp:336
Provides AMPS::RecoveryPoint, AMPS::RecoveryPointFactory, AMPS::FixedRecoveryPoint, and AMPS::DynamicRecoveryPoint.
virtual void purge()
Called to purge the contents of this store.
Definition: LoggedBookmarkStore.hpp:299
virtual Message::Field getMostRecent(const Message::Field &subId_)
Returns the most recent bookmark from the log that ought to be used for (re-)subscriptions.
Definition: LoggedBookmarkStore.hpp:263
A BookmarkStoreImpl implementation that stores bookmarks in memory.
Definition: MemoryBookmarkStore.hpp:57
Field represents the value of a single field in a Message.
Definition: Field.hpp:86
size_t getMaxSubIdLength() const
Gets the maximum allowed length for a sub id when recovering a bookmark store from persistent storage...
Definition: BookmarkStore.hpp:206
virtual size_t log(Message &message_)
Log a bookmark to the persistent log and return the corresponding sequence number for this bookmark...
Definition: LoggedBookmarkStore.hpp:203
void deepCopy(const Field &orig_)
Makes self a deep copy of the original field.
Definition: Field.hpp:218
LoggedBookmarkStore(const char *fileName_, bool useLastModifiedTime_=false)
Creates a LoggedBookmarkStore using fileName_ as its file storage.
Definition: LoggedBookmarkStore.hpp:88
Definition: ampsplusplus.hpp:102
Field getBookmark() const
Retrieves the value of the Bookmark header of the Message as a new Field.
Definition: Message.hpp:1194
virtual void discard(const Message &message_)
Log a Message as discarded from the store.
Definition: LoggedBookmarkStore.hpp:226