26 #ifndef _RINGBOOKMARKSTORE_H_ 27 #define _RINGBOOKMARKSTORE_H_ 29 #define AMPS_RING_POSITIONS 3 31 #define AMPS_RING_BYTES_BOOKMARK (AMPS_MAX_BOOKMARK_LEN * 6 + 8) 32 #define AMPS_RING_ENTRY_SIZE 1024 33 #define AMPS_RING_BYTES_SUBID ( AMPS_RING_ENTRY_SIZE - ( AMPS_RING_POSITIONS * AMPS_RING_BYTES_BOOKMARK ) ) 34 #define AMPS_RING_ENTRIES 32 43 #include <sys/types.h> 47 #if !defined(MREMAP_MAYMOVE) 48 #define MREMAP_MAYMOVE 0 70 struct SubscriptionPosition
84 , _file(INVALID_HANDLE_VALUE), _mapFile(INVALID_HANDLE_VALUE)
88 , _ringRecovering(true)
96 , _file(INVALID_HANDLE_VALUE), _mapFile(INVALID_HANDLE_VALUE)
100 , _ringRecovering(
true)
102 init(fileName_.c_str());
108 UnmapViewOfFile(_log);
110 CloseHandle(_mapFile);
111 _mapFile = INVALID_HANDLE_VALUE;
113 _file = INVALID_HANDLE_VALUE;
115 munmap(_log, _fileSize);
122 _ringRecovering =
true;
131 Lock<Mutex> guard(_lock);
132 size_t ret = MemoryBookmarkStore::_log(message_);
133 if (BookmarkRange::isRange(message_.
getBookmark()))
140 write(subId, MemoryBookmarkStore::_getMostRecent(subId,
false));
152 Lock<Mutex> guard(_lock);
153 if (MemoryBookmarkStore::_discard(message_) && _recentChanged)
160 write(subId, MemoryBookmarkStore::_getMostRecent(subId,
false));
161 _recentChanged =
false;
174 Lock<Mutex> guard(_lock);
175 if (MemoryBookmarkStore::_discard(subId_, bookmarkSeqNo_)
178 write(subId_, MemoryBookmarkStore::_getMostRecent(subId_,
false));
179 _recentChanged =
false;
191 Lock<Mutex> guard(_lock);
192 MemoryBookmarkStore::_persisted(find(subId_), bookmark_);
195 write(subId_, MemoryBookmarkStore::_getMostRecent(subId_,
false));
196 _recentChanged =
false;
207 Lock<Mutex> guard(_lock);
208 return MemoryBookmarkStore::_getMostRecent(subId_);
218 Lock<Mutex> guard(_lock);
219 _positionMap.clear();
220 memset(_log, 0, _fileSize);
221 MemoryBookmarkStore::_purge();
232 Lock<Mutex> guard(_lock);
233 Lock<Mutex> fileGuard(_fileLock);
234 Lock<Mutex> posGuard(_posLock);
235 if (_positionMap.count(subId_) == 0)
240 MemoryBookmarkStore::_purge(subId_);
242 SubscriptionPosition pos = _positionMap[subId_];
243 memset(_log + (pos._index * AMPS_RING_ENTRY_SIZE), 0,
244 AMPS_RING_ENTRY_SIZE);
247 for (
size_t index = pos._index; index < _currentIndex - 1; ++index)
249 char* start = _log + (index * AMPS_RING_ENTRY_SIZE);
250 memcpy(start, start + AMPS_RING_ENTRY_SIZE, AMPS_RING_ENTRY_SIZE);
251 char* end = (
char*)memchr(start,
'\0', AMPS_RING_BYTES_SUBID);
256 sub.assign(start, (
size_t)(end - start));
257 _positionMap[sub]._index = index;
259 _positionMap.erase(subId_);
263 memset(_log + (_currentIndex * AMPS_RING_ENTRY_SIZE), 0,
264 AMPS_RING_ENTRY_SIZE);
268 void init(
const char* fileName_)
271 _file = CreateFileA(fileName_, GENERIC_READ | GENERIC_WRITE, 0,
272 NULL, OPEN_ALWAYS, FILE_ATTRIBUTE_NORMAL, NULL);
273 if ( _file == INVALID_HANDLE_VALUE )
275 DWORD err = getErrorNo();
276 std::ostringstream os;
277 os <<
"Failed to create file " << fileName_ <<
" for RingBookmarkStore\n";
278 error(os.str(), err);
280 LARGE_INTEGER liFileSize;
281 if (GetFileSizeEx(_file, &liFileSize) == 0)
283 error(
"Failure getting file size for RingBookmarkStore.", getErrorNo());
285 DWORD fsLow = liFileSize.LowPart;
286 DWORD fsHigh = liFileSize.HighPart;
288 size_t fileSize = liFileSize.QuadPart;
290 size_t fileSize = liFileSize.LowPart;
292 size_t existingSize = AMPS_RING_ENTRIES * AMPS_RING_ENTRY_SIZE;
293 if (existingSize > fileSize)
295 fsLow = (DWORD)existingSize;
297 fsHigh = (DWORD)(existingSize >> 32);
300 setFileSize(fsHigh, fsLow);
302 _fd = open(fileName_, O_RDWR | O_CREAT, (mode_t)0644);
305 int err = getErrorNo();
306 std::ostringstream os;
307 os <<
"Failed to open log file " << fileName_ <<
" for RingBookmarkStore";
308 error(os.str(), err);
311 if (fstat(_fd, &statBuf) == -1)
313 int err = getErrorNo();
314 std::ostringstream os;
315 os <<
"Failed to stat log file " << fileName_ <<
" for RingBookmarkStore";
316 error(os.str(), err);
318 size_t fSize = (size_t)statBuf.st_size;
320 if (::write(_fd,
"\0\0\0\0", 4) != 4)
322 error(
"Failed to initialize empty file.", getErrorNo());
324 setFileSize((fSize > AMPS_RING_ENTRIES * AMPS_RING_ENTRY_SIZE ?
325 fSize - 1 : AMPS_RING_ENTRIES * AMPS_RING_ENTRY_SIZE));
331 DWORD getErrorNo()
const 333 return GetLastError();
336 void error(
const std::string& message_, DWORD err)
338 std::ostringstream os;
339 static const DWORD msgSize = 2048;
341 DWORD sz = FormatMessageA(FORMAT_MESSAGE_FROM_SYSTEM |
342 FORMAT_MESSAGE_ARGUMENT_ARRAY,
343 NULL, err, LANG_NEUTRAL,
344 pMsg, msgSize, NULL);
345 os << message_ <<
". Error is " << pMsg;
346 throw StoreException(os.str());
349 int getErrorNo()
const 354 void error(
const std::string& message_,
int err)
356 std::ostringstream os;
357 os << message_ <<
". Error is " << strerror(err);
358 throw StoreException(os.str());
366 Lock<Mutex> guard(_fileLock);
367 if ( !_ringRecovering)
369 if (bookmark_.
len() > AMPS_RING_BYTES_BOOKMARK)
371 throw StoreException(
"Bookmark is too large for fixed size storage. Consider rebuilding after changing AMPS_RING_BYTES_BOOKMARK in include/RingBookmarkStore.hpp");
373 SubscriptionPosition& pos = findPos(subId_);
374 size_t nextPos = (pos._current + 1) % AMPS_RING_POSITIONS;
376 char* offset = _log + (pos._index * AMPS_RING_ENTRY_SIZE) + AMPS_RING_BYTES_SUBID + (nextPos * AMPS_RING_BYTES_BOOKMARK);
380 offset = _log + ((pos._index * AMPS_RING_ENTRY_SIZE) + AMPS_RING_BYTES_SUBID + (pos._current * AMPS_RING_BYTES_BOOKMARK) + 1);
381 size_t len = bookmark_.
len();
383 memcpy(offset, static_cast<const void*>(bookmark_.
data()), len);
386 memset(offset, 0, AMPS_RING_BYTES_BOOKMARK - (len + 2));
388 offset = offset - len - 1;
391 pos._current = nextPos;
396 size_t syncStart = (pos._index * AMPS_RING_ENTRY_SIZE) & ~((getPageSize() - 1) & 0xFFFFFFFFFFFFFFFF);
398 size_t syncStart = (pos._index * AMPS_RING_ENTRY_SIZE) & (
size_t)~(getPageSize() - 1);
400 if (!FlushViewOfFile(_log + syncStart, (pos._index * AMPS_RING_ENTRY_SIZE) - syncStart + AMPS_RING_ENTRY_SIZE))
402 size_t syncStart = (pos._index * AMPS_RING_ENTRY_SIZE) & ~(getPageSize() - 1);
403 if (msync(_log + syncStart, (pos._index * AMPS_RING_ENTRY_SIZE) - syncStart + AMPS_RING_ENTRY_SIZE, MS_ASYNC) != 0)
406 error(
"Failed to sync mapped memory", getErrorNo());
412 void setFileSize(DWORD newSizeHigh_, DWORD newSizeLow_)
414 bool remap = (_mapFile && _mapFile != INVALID_HANDLE_VALUE);
417 UnmapViewOfFile(_log);
418 CloseHandle(_mapFile);
419 _positionMap.clear();
421 _mapFile = CreateFileMappingA( _file, NULL, PAGE_READWRITE, newSizeHigh_, newSizeLow_, NULL);
422 if (_mapFile == NULL || _mapFile == INVALID_HANDLE_VALUE)
424 error(
"Failed to create map of log file", getErrorNo());
429 size_t sz = ((size_t)newSizeHigh_ << 32) | (size_t)newSizeLow_;
431 size_t sz = (size_t)newSizeLow_;
433 _log = (
char*)MapViewOfFile(_mapFile, FILE_MAP_ALL_ACCESS, 0, 0, sz);
436 error(
"Failed to map log file to memory", getErrorNo());
449 void setFileSize(
size_t newSize_)
451 bool remap = (_log != 0);
453 size_t sz = newSize_ & (size_t)(~(getPageSize() - 1));
459 if (newSize_ <= _fileSize)
464 if (lseek(_fd, (off_t)sz, SEEK_SET) == -1)
466 error(
"Seek failed for RingBookmarkStore", getErrorNo());
468 if (::write(_fd,
"", 1) == -1)
470 error(
"Failed to grow RingBookmarkStore", getErrorNo());
472 void* newLog = MAP_FAILED;
475 _positionMap.clear();
478 newLog = (mremap(_log, _fileSize, sz, MREMAP_MAYMOVE));
481 newLog = mmap(_log + _fileSize, sz, PROT_READ | PROT_WRITE,
482 MAP_SHARED | MAP_FIXED, _fd, (off_t)sz);
486 munmap(_log, _fileSize);
487 newLog = mmap(0, sz, PROT_READ | PROT_WRITE, MAP_SHARED, _fd, 0);
494 newLog = (mmap(0, sz, PROT_READ | PROT_WRITE, MAP_SHARED, _fd, 0));
498 if (newLog == MAP_FAILED)
502 error(
"Failed to map log file to memory", getErrorNo());
504 _log =
static_cast<char*
>(newLog);
515 _ringRecovering =
true;
520 size_t maxEntries = _fileSize / AMPS_RING_ENTRY_SIZE > AMPS_RING_ENTRIES ? _fileSize / AMPS_RING_ENTRY_SIZE : AMPS_RING_ENTRIES;
521 for (; _currentIndex < maxEntries; ++_currentIndex)
523 char* offset = _log + (_currentIndex * AMPS_RING_ENTRY_SIZE);
529 char* end = (
char*)memchr(offset,
'\0', AMPS_RING_BYTES_SUBID);
536 sub.assign(offset, (
size_t)(end - offset));
538 Subscription* subPtr = MemoryBookmarkStore::find(sub);
541 SubscriptionPosition& pos = _positionMap[sub];
542 pos._index = _currentIndex;
543 offset += AMPS_RING_BYTES_SUBID;
544 size_t foundCursor = AMPS_RING_POSITIONS;
545 for (pos._current = 0; pos._current < AMPS_RING_POSITIONS; pos._current++)
547 if (offset[pos._current * AMPS_RING_BYTES_BOOKMARK] ==
'*')
550 pos._current = (pos._current + (AMPS_RING_POSITIONS - 1)) % AMPS_RING_POSITIONS;
552 if (offset[foundCursor * AMPS_RING_BYTES_BOOKMARK] ==
'*')
554 pos._current = (pos._current + (AMPS_RING_POSITIONS - 1)) % AMPS_RING_POSITIONS;
559 if (pos._current >= AMPS_RING_POSITIONS)
567 offset += pos._current * AMPS_RING_BYTES_BOOKMARK;
569 end = (
char*)memchr(offset,
'\0', AMPS_RING_BYTES_BOOKMARK);
570 if (end && end != offset)
573 bookmarkField.assign(offset + 1, (
size_t)(end - offset - 1));
575 if (!BookmarkRange::isRange(bookmarkField))
578 subPtr->isDiscarded(bookmarkField);
579 subPtr->discard(subPtr->log(bookmarkField));
583 subPtr->log(bookmarkField);
588 _ringRecovering =
false;
593 Lock<Mutex> guard(_posLock);
594 if (_positionMap.count(subId_) == 0)
598 char* offset = _log + (_currentIndex * AMPS_RING_ENTRY_SIZE);
599 size_t len = subId_.
len();
600 memcpy(offset, static_cast<const void*>(subId_.
data()), len);
604 subId.assign(offset, len);
605 _positionMap[subId]._index = _currentIndex;
606 _positionMap[subId]._current = 0;
609 memset(offset, 0, AMPS_RING_BYTES_SUBID - len);
613 return _positionMap[subId_];
620 throw StoreException(
"A valid subscription ID must be provided to the RingBookmarkStore");
623 return MemoryBookmarkStore::find(subId_);
629 size_t _currentIndex;
638 typedef std::map<Message::Field, SubscriptionPosition, Message::Field::FieldHash> PositionMap;
639 PositionMap _positionMap;
640 bool _ringRecovering;
642 static DWORD getPageSize()
644 static DWORD pageSize = 0;
647 SYSTEM_INFO SYS_INFO;
648 GetSystemInfo(&SYS_INFO);
649 pageSize = SYS_INFO.dwPageSize;
651 static size_t getPageSize()
653 static size_t pageSize = 0UL;
656 pageSize = (size_t)sysconf(_SC_PAGESIZE);
668 #endif // _RINGBOOKMARKSTORE_H_ Field getSubscriptionId() const
Retrieves the value of the SubscriptionId header of the Message as a new Field.
Definition: Message.hpp:1427
Message encapsulates a single message sent to or received from an AMPS server, and provides methods f...
Definition: Message.hpp:531
Provides AMPS::MemoryBookmarkStore, a bookmark store that holds bookmarks in memory.
const char * data() const
Returns the (non-null-terminated) data underlying this field.
Definition: Field.hpp:260
virtual void purge()
Called to purge the contents of this store.
Definition: RingBookmarkStore.hpp:216
A BookmarkStoreImpl that stores only the MOST_RECENT bookmark to a file for recovery and keeps any bo...
Definition: RingBookmarkStore.hpp:68
MemoryBookmarkStore()
Creates a MemoryBookmarkStore.
Definition: MemoryBookmarkStore.hpp:1164
virtual void persisted(const Message::Field &subId_, const Message::Field &bookmark_)
Mark the bookmark provided as replicated to all sync replication destinations for the given subscript...
Definition: RingBookmarkStore.hpp:188
Field getSubscriptionIds() const
Retrieves the value of the SubscriptionIds header of the Message as a new Field.
Definition: Message.hpp:1428
bool empty() const
Returns 'true' if empty, 'false' otherwise.
Definition: Field.hpp:128
size_t len() const
Returns the length of the data underlying this field.
Definition: Field.hpp:267
virtual void discard(const Message &message_)
Log a discard-bookmark entry to the persistent log based on a bookmark sequence number.
Definition: RingBookmarkStore.hpp:150
virtual Message::Field getMostRecent(const Message::Field &subId_)
Returns the most recent bookmark from the log that ought to be used for (re-)subscriptions.
Definition: RingBookmarkStore.hpp:205
A BookmarkStoreImpl implementation that stores bookmarks in memory.
Definition: MemoryBookmarkStore.hpp:57
RingBookmarkStore(const char *fileName_)
Create a RingBookmarkStore using fileName_ for storage of most recent.
Definition: RingBookmarkStore.hpp:81
Field represents the value of a single field in a Message.
Definition: Field.hpp:86
virtual size_t log(Message &message_)
Return the corresponding sequence number for this bookmark.
Definition: RingBookmarkStore.hpp:129
Definition: ampsplusplus.hpp:102
Field getBookmark() const
Retrieves the value of the Bookmark header of the Message as a new Field.
Definition: Message.hpp:1194
virtual void discard(const Message::Field &subId_, size_t bookmarkSeqNo_)
Log a discard-bookmark entry to the persistent log based on a bookmark sequence number.
Definition: RingBookmarkStore.hpp:172
virtual void purge(const Message::Field &subId_)
Called to purge the contents of this store for particular subId.
Definition: RingBookmarkStore.hpp:230