28 #ifndef _LOGGEDBOOKMARKSTORE_H_ 29 #define _LOGGEDBOOKMARKSTORE_H_ 42 #include <sys/types.h> 49 typedef char* amps_iovec_base_ptr;
51 typedef void* amps_iovec_base_ptr;
67 static void _clearBookmark(std::pair<const Message::Field, size_t>& pair)
73 typedef HANDLE FileType;
89 bool useLastModifiedTime_ =
false)
92 , _file(INVALID_HANDLE_VALUE)
96 , _fileName(fileName_)
99 recover(useLastModifiedTime_,
false);
110 bool useLastModifiedTime_ =
false)
113 , _file(INVALID_HANDLE_VALUE)
117 , _fileName(fileName_)
120 recover(useLastModifiedTime_,
false);
138 const char* fileName_,
139 RecoveryPointFactory factory_ = NULL,
140 bool useLastModifiedTime_ =
false)
143 , _file(INVALID_HANDLE_VALUE)
147 , _fileName(fileName_)
150 recover(useLastModifiedTime_,
true);
165 const std::string& fileName_,
166 RecoveryPointFactory factory_ = NULL,
167 bool useLastModifiedTime_ =
false)
170 , _file(INVALID_HANDLE_VALUE)
174 , _fileName(fileName_)
177 recover(useLastModifiedTime_,
true);
185 _recoveringFile =
true;
205 Subscription* sub = (Subscription*)(message_.getSubscriptionHandle());
206 Lock<Mutex> guard(_lock);
215 message_.setSubscriptionHandle(static_cast<amps_subscription_handle>(sub));
217 write(_file, sub->id(), ENTRY_BOOKMARK, bookmark);
218 return MemoryBookmarkStore::_log(message_);
233 Lock<Mutex> guard(_lock);
234 write(_file, subId, ENTRY_DISCARD, bookmark);
235 MemoryBookmarkStore::_discard(message_);
247 Lock<Mutex> l(_lock);
248 Subscription::Entry* entry = find(subId_)->getEntryByIndex(bookmarkSeqNo_);
249 if (!entry || entry->_val.empty())
253 write(_file, subId_, ENTRY_DISCARD, entry->_val);
254 MemoryBookmarkStore::_discard(subId_, bookmarkSeqNo_);
264 Lock<Mutex> l(_lock);
265 return MemoryBookmarkStore::_getMostRecent(subId_);
278 Lock<Mutex> l(_lock);
279 bool retVal = MemoryBookmarkStore::_isDiscarded(message_);
287 write(_file, subId, ENTRY_BOOKMARK, message_.
getBookmark());
288 write(_file, subId, ENTRY_DISCARD, message_.
getBookmark());
300 Lock<Mutex> guard(_lock);
302 if (_file != INVALID_HANDLE_VALUE)
306 DeleteFileA(_fileName.c_str());
307 _file = CreateFileA(_fileName.c_str(), GENERIC_READ | GENERIC_WRITE, 0,
308 NULL, CREATE_ALWAYS, FILE_ATTRIBUTE_NORMAL, NULL);
309 if ( _file == INVALID_HANDLE_VALUE )
311 DWORD err = getErrorNo();
312 std::ostringstream os;
313 os <<
"Failed to recreate log file after purge for LoggedBookmarkStore" << _fileName <<
" for LoggedBookmarkStore";
314 error(os.str(), err);
319 ::unlink(_fileName.c_str());
320 _file = open(_fileName.c_str(), O_RDWR | O_CREAT, (mode_t)0644);
323 error(
"Failed to recreate log file after purge for LoggedBookmarkStore", getErrorNo());
327 MemoryBookmarkStore::_purge();
337 Lock<Mutex> guard(_lock);
338 MemoryBookmarkStore::_purge(subId_);
339 std::string tmpFileName = _fileName +
".tmp";
340 __prune(tmpFileName);
354 void _prune(
const std::string& tmpFileName_)
356 Lock<Mutex> guard(_lock);
362 if (tmpFileName_.empty())
364 __prune(_fileName +
".tmp");
368 __prune(tmpFileName_);
370 _recentChanged =
false;
373 void __prune(
const std::string& tmpFileName_)
377 tmpFile = CreateFileA(tmpFileName_.c_str(), GENERIC_READ | GENERIC_WRITE, 0,
378 NULL, OPEN_ALWAYS, FILE_ATTRIBUTE_NORMAL, NULL);
379 if (tmpFile == INVALID_HANDLE_VALUE )
381 DWORD err = getErrorNo();
382 std::ostringstream os;
383 os <<
"Failed to create temp log file " << tmpFileName_ <<
384 " to prune LoggedBookmarkStore " << _fileName;
385 error(os.str(), err);
390 tmpFile = open(tmpFileName_.c_str(), O_RDWR | O_CREAT, (mode_t)0644);
393 int err = getErrorNo();
394 std::ostringstream os;
395 os <<
"Failed to create temp log file " << tmpFileName_ <<
396 " to prune LoggedBookmarkStore " << _fileName;
397 error(os.str(), err);
403 for (SubscriptionMap::iterator i = _subs.begin();
404 i != _subs.end(); ++i)
407 assert(!subId.
empty());
408 Subscription* subPtr = i->second;
409 const BookmarkRange& range = subPtr->getRange();
412 write(tmpFile, subId, ENTRY_BOOKMARK, range);
415 amps_uint64_t recentPub, recentSeq;
416 Subscription::parseBookmark(recent, recentPub, recentSeq);
417 Subscription::PublisherMap publishersDiscarded =
419 MemoryBookmarkStore::EntryPtrList recovered;
420 subPtr->getRecoveryEntries(recovered);
421 subPtr->setPublishersToDiscarded(&recovered,
422 &publishersDiscarded);
423 char tmpBookmarkBuffer[128];
424 for (Subscription::PublisherIterator pub =
425 publishersDiscarded.begin(),
426 e = publishersDiscarded.end();
430 if (pub->first == 0 || pub->second == 0)
435 if (pub->first == recentPub)
439 int written = AMPS_snprintf_amps_uint64_t(
441 sizeof(tmpBookmarkBuffer),
443 *(tmpBookmarkBuffer + written++) =
'|';
444 written += AMPS_snprintf_amps_uint64_t(
445 tmpBookmarkBuffer + written,
446 sizeof(tmpBookmarkBuffer)
449 *(tmpBookmarkBuffer + written++) =
'|';
451 write(tmpFile, subId, ENTRY_BOOKMARK, tmpBookmark);
452 write(tmpFile, subId, ENTRY_DISCARD, tmpBookmark);
454 if (isWritableBookmark(recent.
len()))
456 write(tmpFile, subId, ENTRY_BOOKMARK, recent);
457 write(tmpFile, subId, ENTRY_DISCARD, recent);
461 subPtr->getMostRecentList();
463 if (isWritableBookmark(subPtr->getLastPersisted().len()))
465 write(tmpFile, subId, ENTRY_PERSISTED,
466 subPtr->getLastPersisted());
468 subPtr->getActiveEntries(recovered);
469 for (MemoryBookmarkStore::EntryPtrList::iterator entry =
471 entry != recovered.end(); ++entry)
473 if ((*entry)->_val.empty() ||
474 !isWritableBookmark((*entry)->_val.len()))
478 write(tmpFile, subId, ENTRY_BOOKMARK, (*entry)->_val);
479 if (!(*entry)->_active)
481 write(tmpFile, subId, ENTRY_DISCARD, (*entry)->_val);
486 catch (StoreException& ex)
489 CloseHandle(tmpFile);
490 DeleteFileA(tmpFileName_.c_str());
493 unlink(tmpFileName_.c_str());
495 std::ostringstream os;
496 os <<
"Exception during prune: " << ex.what();
497 throw StoreException(os.str());
501 CloseHandle(tmpFile);
502 _file = INVALID_HANDLE_VALUE;
503 tmpFile = INVALID_HANDLE_VALUE;
506 while (!MoveFileExA(tmpFileName_.c_str(), _fileName.c_str(),
507 MOVEFILE_COPY_ALLOWED | MOVEFILE_REPLACE_EXISTING | MOVEFILE_WRITE_THROUGH))
509 DWORD err = getErrorNo();
510 if (--retryCount > 0)
515 std::string desiredFileName = _fileName;
516 _fileName = tmpFileName_;
518 std::ostringstream os;
519 os <<
"Failed to move completed temp file " << tmpFileName_
520 <<
" to " << desiredFileName
521 <<
" in prune in LoggedBookmarkStore. Continuing by using " 522 << tmpFileName_ <<
" as the LoggedBookmarkStore file.";
523 error(os.str(), err);
527 SetFilePointer(_file, 0, NULL, FILE_END);
531 if (-1 == ::unlink(_fileName.c_str()))
533 int err = getErrorNo();
535 std::string desiredFileName = _fileName;
536 _fileName = tmpFileName_;
538 std::ostringstream os;
539 os <<
"Failed to delete file " << desiredFileName
540 <<
" after creating temporary file " << tmpFileName_
541 <<
" in prune in LoggedBookmarkStore. Continuing by using " 542 << tmpFileName_ <<
" as the LoggedBookmarkStore file.";
543 error(os.str(), err);
546 if (-1 == ::rename(tmpFileName_.c_str(), _fileName.c_str()))
548 int err = getErrorNo();
550 std::string desiredFileName = _fileName;
551 _fileName = tmpFileName_;
553 std::ostringstream os;
554 os <<
"Failed to move completed temp file " << tmpFileName_
555 <<
" to " << desiredFileName
556 <<
" in prune in LoggedBookmarkStore. Continuing by using " 557 << tmpFileName_ <<
" as the LoggedBookmarkStore file.";
558 error(os.str(), err);
563 if (-1 == ::fstat(_file, &fst))
565 int err = getErrorNo();
566 std::ostringstream os;
567 os <<
"Failed to get size of pruned file " << _fileName
568 <<
" in prune in LoggedBookmarkStore. ";
569 error(os.str(), err);
572 ::lseek(_file, (off_t)fst.st_size, SEEK_SET);
577 virtual void _persisted(Subscription* subP_,
580 Lock<Mutex> guard(_lock);
581 write(_file, subP_->id(), ENTRY_PERSISTED, bookmark_);
582 MemoryBookmarkStore::_persisted(subP_, bookmark_);
585 virtual Message::Field _persisted(Subscription* subP_,
size_t bookmark_)
587 Lock<Mutex> l(_lock);
588 Subscription::Entry* entryPtr = subP_->getEntryByIndex(bookmark_);
589 if (!entryPtr || entryPtr->_val.empty())
594 write(_file, subP_->id(), ENTRY_PERSISTED, bookmarkField);
595 MemoryBookmarkStore::_persisted(subP_, bookmarkField);
596 return bookmarkField;
600 typedef DWORD ERRTYPE ;
601 ERRTYPE getErrorNo()
const 603 return GetLastError();
606 void error(
const std::string& message_, ERRTYPE err)
608 std::ostringstream os;
609 static const DWORD msgSize = 2048;
611 DWORD sz = FormatMessageA(FORMAT_MESSAGE_FROM_SYSTEM |
612 FORMAT_MESSAGE_ARGUMENT_ARRAY,
613 NULL, err, LANG_NEUTRAL,
614 pMsg, msgSize, NULL);
615 os <<
"File: " << _fileName <<
". " << message_;
618 os <<
" with error " << pMsg;
620 throw StoreException(os.str());
624 ERRTYPE getErrorNo()
const 629 void error(
const std::string& message_, ERRTYPE err)
631 std::ostringstream os;
632 os <<
"File: " << _fileName <<
". " << message_;
635 os <<
" with error " << strerror(err);
638 throw StoreException(os.str());
645 _file = CreateFileA(_fileName.c_str(), GENERIC_READ | GENERIC_WRITE, 0,
646 NULL, OPEN_ALWAYS, FILE_ATTRIBUTE_NORMAL, NULL);
647 if ( _file == INVALID_HANDLE_VALUE )
649 DWORD err = getErrorNo();
650 std::ostringstream os;
651 os <<
"Failed to initialize log file " << _fileName <<
" for LoggedBookmarkStore";
652 error(os.str(), err);
656 _file = open(_fileName.c_str(), O_RDWR | O_CREAT, (mode_t)0644);
659 int err = getErrorNo();
660 std::ostringstream os;
661 os <<
"Failed to initialize log file " << _fileName <<
" for LoggedBookmarkStore";
662 error(os.str(), err);
671 void write(FileType file_,
const Message::Field& subId_,
char type_,
674 Lock<Mutex> guard(_fileLock);
675 if (!_recoveringFile && isWritableBookmark(bookmark_.
len()))
679 size_t len = subId_.
len();
680 BOOL ok = WriteFile(file_, (LPVOID)&len,
sizeof(
size_t), &written, NULL);
681 ok |= WriteFile(file_, (LPVOID)subId_.
data(), (DWORD)len, &written, NULL);
682 ok |= WriteFile(file_, (LPVOID)&type_, 1, &written, NULL);
683 len = bookmark_.
len();
684 ok |= WriteFile(file_, (LPVOID)&len,
sizeof(
size_t), &written, NULL);
685 ok |= WriteFile(file_, (LPVOID)bookmark_.
data(), (DWORD)len,
689 error(
"Failed to write to bookmark log.", getErrorNo());
696 file_ = open(_fileName.c_str(), O_RDWR | O_CREAT, (mode_t)0644);
699 int err = getErrorNo();
700 std::ostringstream os;
701 os <<
"Failed to open file " << _fileName
702 <<
" for write in LoggedBookmarkStore. ";
703 error(os.str(), err);
707 struct iovec data[5];
708 size_t len = subId_.
len();
709 data[0].iov_base = (amps_iovec_base_ptr)(
void*)&len;
710 data[0].iov_len =
sizeof(size_t);
711 data[1].iov_base = (amps_iovec_base_ptr)(
void*)subId_.
data();
712 data[1].iov_len = len;
713 data[2].iov_base = (amps_iovec_base_ptr)(
void*)&type_;
715 size_t bookmarkLen = bookmark_.
len();
716 data[3].iov_base = (amps_iovec_base_ptr)(
void*)&bookmarkLen;
717 data[3].iov_len =
sizeof(size_t);
718 data[4].iov_base = (amps_iovec_base_ptr)(
void*)bookmark_.
data();
719 data[4].iov_len = bookmarkLen;
720 ssize_t written = ::writev(file_, data, 5);
723 error(
"Failed to write to bookmark log.", getErrorNo());
734 char type_,
size_t bookmark_)
736 Lock<Mutex> guard(_fileLock);
737 if (!_recoveringFile)
741 size_t len = subId_.
len();
742 BOOL ok = WriteFile(file_, (LPVOID)&len,
sizeof(
size_t), &written, NULL);
743 ok |= WriteFile(file_, (LPVOID)subId_.
data(), (DWORD)len, &written, NULL);
744 ok |= WriteFile(file_, (LPVOID)&type_, 1, &written, NULL);
745 ok |= WriteFile(file_, (LPVOID)&bookmark_,
sizeof(
size_t),
749 error(
"Failed to write bookmark sequence to file.", getErrorNo());
756 file_ = open(_fileName.c_str(), O_RDWR | O_CREAT, (mode_t)0644);
759 int err = getErrorNo();
760 std::ostringstream os;
761 os <<
"Failed to open file " << _fileName
762 <<
" to write bookmark sequence in LoggedBookmarkStore. ";
763 error(os.str(), err);
767 struct iovec data[4];
768 size_t len = subId_.
len();
769 data[0].iov_base = (amps_iovec_base_ptr)(
void*)&len;
770 data[0].iov_len =
sizeof(size_t);
771 data[1].iov_base = (amps_iovec_base_ptr)(
void*)subId_.
data();
772 data[1].iov_len = len;
773 data[2].iov_base = (amps_iovec_base_ptr)(
void*)&type_;
775 data[3].iov_base = (amps_iovec_base_ptr)(
void*)&bookmark_;
776 data[3].iov_len =
sizeof(size_t);
777 ssize_t written = ::writev(file_, data, 4);
780 error(
"Failed to write bookmark sequence to file.", getErrorNo());
788 #define VOID_P(buf) (LPVOID)buf 789 bool readFileBytes(LPVOID buffer,
size_t numBytes, DWORD* bytesRead)
791 return (ReadFile(_file, buffer, (DWORD)numBytes, bytesRead, NULL) == TRUE);
794 #define VOID_P(buf) (void*)buf 795 bool readFileBytes(
void* buffer,
size_t numBytes, ssize_t* bytesRead)
797 *bytesRead = ::read(_file, buffer, numBytes);
798 return (*bytesRead >= 0);
802 void recover(
bool useLastModifiedTime_,
bool hasAdapter_)
804 size_t bufferLen = 128;
805 char* buffer =
new char[bufferLen];
806 size_t subIdBufferLen = 128;
807 char* subIdBuffer =
new char[bufferLen];
811 size_t bookmarkLen = 0;
812 Lock<Mutex> l(_lock);
813 Lock<Mutex> guard(_fileLock);
814 _recoveringFile =
true;
815 char* fileTimestamp =
new char[AMPS_TIMESTAMP_LEN];
816 fileTimestamp[0] =
'\0';
818 LARGE_INTEGER lifileSize;
819 if (GetFileSizeEx(_file, &lifileSize) == 0)
821 DWORD err = getErrorNo();
823 delete[] subIdBuffer;
824 _recoveringFile =
false;
825 error(
"Failure getting file size while trying to recover.", err);
829 size_t fileSize = lifileSize.QuadPart;
831 size_t fileSize = lifileSize.LowPart;
833 if (useLastModifiedTime_ && fileSize > 0)
835 FILETIME ftModifiedTime;
836 if (GetFileTime(_file, NULL, NULL, &ftModifiedTime) == 0)
838 DWORD err = getErrorNo();
840 delete[] subIdBuffer;
841 _recoveringFile =
false;
842 error(
"Failure getting file time while trying to recover.", err);
846 if (FileTimeToSystemTime(&ftModifiedTime, &st) == 0)
848 DWORD err = getErrorNo();
850 delete[] subIdBuffer;
851 _recoveringFile =
false;
852 error(
"Failure converting file time while trying to recover.", err);
855 sprintf_s(fileTimestamp, AMPS_TIMESTAMP_LEN,
856 "%04d%02d%02dT%02d%02d%02d", st.wYear, st.wMonth,
857 st.wDay, st.wHour, st.wMinute, st.wSecond);
858 fileTimestamp[AMPS_TIMESTAMP_LEN - 1] =
'Z';
860 else if (fileSize == 0)
862 delete[] fileTimestamp;
864 delete[] subIdBuffer;
865 _recoveringFile =
false;
870 SetFilePointer(_file, 0, NULL, FILE_BEGIN);
873 ::fstat(_file, &fst);
874 ssize_t fileSize = fst.st_size;
875 ssize_t readBytes = 0;
876 if (useLastModifiedTime_ && fileSize > 0)
879 gmtime_r(&fst.st_mtime, &timeInfo);
880 strftime(fileTimestamp, AMPS_TIMESTAMP_LEN,
881 "%Y%m%dT%H%M%S", &timeInfo);
882 fileTimestamp[AMPS_TIMESTAMP_LEN - 1] =
'Z';
884 else if (fileSize == 0)
886 delete[] fileTimestamp;
888 delete[] subIdBuffer;
889 _recoveringFile =
false;
893 ::lseek(_file, loc, SEEK_SET);
898 MemoryBookmarkStore::__purge();
900 if (!readFileBytes(VOID_P(&subLen),
sizeof(
size_t), &readBytes)
903 delete[] fileTimestamp;
905 delete[] subIdBuffer;
906 _recoveringFile =
false;
907 error(
"Failure reading file while trying to recover.", getErrorNo());
911 size_t totalBytes = readBytes;
913 ssize_t totalBytes = readBytes;
916 size_t tooManyBytes = 0;
918 Message::Field::FieldHash> BookmarkMap;
919 typedef std::map<Message::Field, size_t,
920 Message::Field::FieldHash>::iterator BookmarkMapIter;
922 typedef std::map<Message::Field, BookmarkMap*,
923 Message::Field::FieldHash> ReadMap;
924 typedef std::map<Message::Field, BookmarkMap*,
925 Message::Field::FieldHash>::iterator ReadMapIter;
927 while (subLen > 0 && (
size_t)readBytes ==
sizeof(
size_t) &&
928 (
size_t)totalBytes <= (
size_t)fileSize)
930 if (subLen >= ((
size_t)fileSize - (
size_t)totalBytes)
934 tooManyBytes = subLen + 1;
939 if (subIdBufferLen < subLen)
941 delete [] subIdBuffer;
942 subIdBufferLen = 2 * subLen;
943 subIdBuffer =
new char[subIdBufferLen];
945 if (!readFileBytes(VOID_P(subIdBuffer), subLen, &readBytes))
948 tooManyBytes = subLen;
951 totalBytes += readBytes;
952 sub.assign(subIdBuffer, subLen);
953 if (!readFileBytes(VOID_P(buffer), 1, &readBytes))
959 totalBytes += readBytes;
964 if ((
size_t)totalBytes +
sizeof(
size_t) >= (
size_t)fileSize)
968 tooManyBytes =
sizeof(size_t);
971 if (!readFileBytes(VOID_P(&bookmarkLen),
sizeof(size_t), &readBytes))
974 tooManyBytes =
sizeof(size_t);
977 totalBytes += readBytes;
978 if (bookmarkLen > (
size_t)fileSize - (size_t)totalBytes)
982 tooManyBytes = bookmarkLen;
985 if (bufferLen < bookmarkLen)
988 bufferLen = 2 * bookmarkLen;
989 buffer =
new char[bufferLen];
991 if (!readFileBytes(VOID_P(buffer), bookmarkLen, &readBytes))
994 tooManyBytes = bookmarkLen;
997 totalBytes += readBytes;
998 bookmarkField.assign(buffer, bookmarkLen);
999 Subscription* subP = find(sub);
1000 BookmarkMap* bookmarks = NULL;
1001 ReadMapIter iter = recovered.find(sub);
1002 if (iter == recovered.end())
1004 Message::Field subKey;
1006 bookmarks =
new BookmarkMap();
1007 recovered[subKey] = bookmarks;
1011 bookmarks = iter->second;
1013 if (bookmarks->find(bookmarkField) != bookmarks->end())
1015 std::for_each(bookmarks->begin(), bookmarks->end(),
1018 subP->getMostRecent(
true);
1020 if (BookmarkRange::isRange(bookmarkField))
1022 subP->log(bookmarkField);
1024 else if (!subP->isDiscarded(bookmarkField))
1026 size_t sequence = subP->log(bookmarkField);
1027 Message::Field copy;
1029 bookmarks->insert(std::make_pair(copy, sequence));
1035 Message::Field copy;
1037 bookmarks->insert(std::make_pair(copy, 0));
1044 if ((
size_t)totalBytes +
sizeof(
size_t) >= (
size_t)fileSize)
1047 err = (ERRTYPE) - 1;
1048 tooManyBytes =
sizeof(size_t);
1051 if (!readFileBytes(VOID_P(&bookmarkLen),
sizeof(size_t), &readBytes))
1054 tooManyBytes =
sizeof(size_t);
1057 totalBytes += readBytes;
1058 if (bookmarkLen > (
size_t)fileSize - (size_t)totalBytes)
1061 err = (ERRTYPE) - 1;
1062 tooManyBytes = bookmarkLen;
1065 if (bufferLen < bookmarkLen)
1068 bufferLen = 2 * bookmarkLen;
1069 buffer =
new char[bufferLen];
1071 if (!readFileBytes(VOID_P(buffer), bookmarkLen, &readBytes))
1074 tooManyBytes = bookmarkLen;
1077 totalBytes += readBytes;
1078 bookmarkField.assign(buffer, bookmarkLen);
1079 size_t sequence = AMPS_UNSET_INDEX;
1080 ReadMapIter iter = recovered.find(sub);
1081 if (iter != recovered.end())
1083 BookmarkMap* bookmarks = iter->second;
1084 BookmarkMapIter bookmarkIter = bookmarks->find(bookmarkField);
1085 if (bookmarkIter != bookmarks->end())
1087 sequence = bookmarkIter->second;
1088 Message::Field bookmarkToClear(bookmarkIter->first);
1089 bookmarkToClear.
clear();
1090 bookmarks->erase(bookmarkIter);
1093 Subscription* subP = find(sub);
1094 if (!BookmarkRange::isRange(bookmarkField))
1096 if (sequence != AMPS_UNSET_INDEX)
1101 subP->discard(sequence);
1106 subP->discard(bookmarkField);
1111 case ENTRY_PERSISTED:
1113 if ((
size_t)totalBytes +
sizeof(
size_t) >= (
size_t)fileSize)
1116 err = (ERRTYPE) - 1;
1117 tooManyBytes =
sizeof(size_t);
1120 if (!readFileBytes(VOID_P(&bookmarkLen),
sizeof(size_t), &readBytes))
1123 tooManyBytes =
sizeof(size_t);
1126 totalBytes += readBytes;
1127 if (bookmarkLen > (
size_t)fileSize - (size_t)totalBytes)
1130 err = (ERRTYPE) - 1;
1131 tooManyBytes = bookmarkLen;
1134 if (bufferLen < bookmarkLen)
1137 bufferLen = 2 * bookmarkLen;
1138 buffer =
new char[bufferLen];
1140 if (!readFileBytes(VOID_P(buffer), bookmarkLen, &readBytes))
1143 tooManyBytes = bookmarkLen;
1146 totalBytes += readBytes;
1147 bookmarkField.assign(buffer, bookmarkLen);
1148 Subscription* subP = find(sub);
1149 MemoryBookmarkStore::_persisted(subP, bookmarkField);
1155 err = (ERRTYPE) - 1;
1156 tooManyBytes = (size_t)fileSize - (
size_t)totalBytes;
1161 loc = (OFF_T)totalBytes;
1162 if ((
size_t)totalBytes > (size_t)fileSize)
1164 loc = (OFF_T)fileSize;
1167 if (!readFileBytes(VOID_P(&subLen),
sizeof(size_t), &readBytes))
1170 tooManyBytes =
sizeof(size_t);
1173 totalBytes += readBytes;
1176 delete[] subIdBuffer;
1179 for (SubscriptionMap::iterator i = _subs.begin(); i != _subs.end(); ++i)
1181 if (recovered.count(i->first) && !recovered[i->first]->empty())
1183 Subscription* subPtr = i->second;
1184 if (subPtr->getMostRecent(
false).len() > 1)
1186 subPtr->justRecovered();
1193 _subs[i->first] =
new Subscription(
this, i->first);
1196 if (useLastModifiedTime_ && fileTimestamp[0] !=
'\0')
1198 _subs[i->first]->setRecoveryTimestamp(fileTimestamp);
1202 for (ReadMapIter i = recovered.begin(), e = recovered.end(); i != e; ++i)
1204 std::for_each(i->second->begin(), i->second->end(), _clearBookmark);
1206 Message::Field f = i->first;
1209 delete[] fileTimestamp;
1210 _recoveringFile =
false;
1216 if (err != (ERRTYPE) - 1 || loc == 0 || fileSize - loc > 128)
1218 std::ostringstream os;
1219 os <<
"Error while recovering LoggedBookmarkStore from " 1221 <<
". Record starting at " << loc
1222 <<
" reading at " << totalBytes
1223 <<
" requested " << tooManyBytes
1224 <<
" and file size is " << fileSize;
1225 error(os.str(), (err != (ERRTYPE) - 1 ? err : 0));
1231 LONG low = (LONG)loc;
1232 LONG high = (LONG)((loc >> 32) & 0xffffffff);
1233 SetFilePointer(_file, low, &high, FILE_BEGIN);
1235 SetFilePointer(_file, loc, NULL, FILE_BEGIN);
1238 ::lseek(_file, loc, SEEK_SET);
1247 std::string _fileName;
1248 bool _recoveringFile;
1254 #endif // _LOGGEDBOOKMARKSTORE_H_ virtual bool isDiscarded(Message &message_)
Called for each arriving message to determine if the application has already seen this bookmark and s...
Definition: LoggedBookmarkStore.hpp:276
Field getSubscriptionId() const
Retrieves the value of the SubscriptionId header of the Message as a new Field.
Definition: Message.hpp:1427
Message encapsulates a single message sent to or received from an AMPS server, and provides methods f...
Definition: Message.hpp:531
void setServerVersion(const VersionInfo &version_)
Internally used to set the server version so the store knows how to deal with persisted acks and call...
Definition: LoggedBookmarkStore.hpp:343
Provides AMPS::MemoryBookmarkStore, a bookmark store that holds bookmarks in memory.
void clear()
Deletes the data associated with this Field, should only be used on Fields that were created as deepC...
Definition: Field.hpp:247
LoggedBookmarkStore(const std::string &fileName_, bool useLastModifiedTime_=false)
Creates a LoggedBookmarkStore using a file name fileName_.
Definition: LoggedBookmarkStore.hpp:109
LoggedBookmarkStore(const RecoveryPointAdapter &adapter_, const char *fileName_, RecoveryPointFactory factory_=NULL, bool useLastModifiedTime_=false)
Creates a LoggedBookmarkStore using fileName_ as its file storage.
Definition: LoggedBookmarkStore.hpp:137
virtual void discard(const Message::Field &subId_, size_t bookmarkSeqNo_)
Log a discard-bookmark entry to the persistent log based on a bookmark sequence number.
Definition: LoggedBookmarkStore.hpp:245
const char * data() const
Returns the (non-null-terminated) data underlying this field.
Definition: Field.hpp:260
RecoveryPointAdapter a handle class for implementing external storage of subscription recovery points...
Definition: RecoveryPointAdapter.hpp:77
LoggedBookmarkStore(const RecoveryPointAdapter &adapter_, const std::string &fileName_, RecoveryPointFactory factory_=NULL, bool useLastModifiedTime_=false)
Creates a LoggedBookmarkStore using a file name fileName_.
Definition: LoggedBookmarkStore.hpp:164
void setServerVersion(size_t version_)
Internally used to set the server version so the store knows how to deal with persisted acks and call...
Definition: LoggedBookmarkStore.hpp:348
Field getSubscriptionIds() const
Retrieves the value of the SubscriptionIds header of the Message as a new Field.
Definition: Message.hpp:1428
bool empty() const
Returns 'true' if empty, 'false' otherwise.
Definition: Field.hpp:128
Core type, function, and class declarations for the AMPS C++ client.
A BookmarkStoreImpl implementation that logs all messages to a file.
Definition: LoggedBookmarkStore.hpp:64
size_t len() const
Returns the length of the data underlying this field.
Definition: Field.hpp:267
Provides AMPS::RecoveryPointAdapter, an iterface for implementing external storage of bookmark subscr...
void setServerVersion(const VersionInfo &version_)
Internally used to set the server version so the store knows how to deal with persisted acks and call...
Definition: MemoryBookmarkStore.hpp:1355
virtual void purge(const Message::Field &subId_)
Called to purge the contents of this store for particular subId.
Definition: LoggedBookmarkStore.hpp:335
Provides AMPS::RecoveryPoint, AMPS::RecoveryPointFactory, AMPS::FixedRecoveryPoint, and AMPS::DynamicRecoveryPoint.
virtual void purge()
Called to purge the contents of this store.
Definition: LoggedBookmarkStore.hpp:298
virtual Message::Field getMostRecent(const Message::Field &subId_)
Returns the most recent bookmark from the log that ought to be used for (re-)subscriptions.
Definition: LoggedBookmarkStore.hpp:262
A BookmarkStoreImpl implementation that stores bookmarks in memory.
Definition: MemoryBookmarkStore.hpp:56
Field represents the value of a single field in a Message.
Definition: Field.hpp:86
size_t getMaxSubIdLength() const
Gets the maximum allowed length for a sub id when recovering a bookmark store from persistent storage...
Definition: BookmarkStore.hpp:206
virtual size_t log(Message &message_)
Log a bookmark to the persistent log and return the corresponding sequence number for this bookmark...
Definition: LoggedBookmarkStore.hpp:202
void deepCopy(const Field &orig_)
Makes self a deep copy of the original field.
Definition: Field.hpp:218
LoggedBookmarkStore(const char *fileName_, bool useLastModifiedTime_=false)
Creates a LoggedBookmarkStore using fileName_ as its file storage.
Definition: LoggedBookmarkStore.hpp:88
Definition: ampsplusplus.hpp:106
Field getBookmark() const
Retrieves the value of the Bookmark header of the Message as a new Field.
Definition: Message.hpp:1194
virtual void discard(const Message &message_)
Log a Message as discarded from the store.
Definition: LoggedBookmarkStore.hpp:225