26 #ifndef _RINGBOOKMARKSTORE_H_ 27 #define _RINGBOOKMARKSTORE_H_ 29 #define AMPS_RING_POSITIONS 3 31 #define AMPS_RING_BYTES_BOOKMARK (AMPS_MAX_BOOKMARK_LEN * 6 + 8) 32 #define AMPS_RING_ENTRY_SIZE 1024 33 #define AMPS_RING_BYTES_SUBID ( AMPS_RING_ENTRY_SIZE - ( AMPS_RING_POSITIONS * AMPS_RING_BYTES_BOOKMARK ) ) 34 #define AMPS_RING_ENTRIES 32 43 #include <sys/types.h> 47 #if !defined(MREMAP_MAYMOVE) 48 #define MREMAP_MAYMOVE 0 70 struct SubscriptionPosition
// Member-initializer fragment of a RingBookmarkStore constructor.
// The constructor header and the remaining initializers are not part of this listing.
// The Win32 file and file-mapping handles start out invalid.
84 , _file(INVALID_HANDLE_VALUE), _mapFile(INVALID_HANDLE_VALUE)
// The store starts in the recovering state; write() only touches the file when this flag is false.
88 , _ringRecovering(true)
// Member-initializer fragment and body of a second RingBookmarkStore constructor.
// fileName_ exposes c_str(), so it is presumably a std::string (header not shown).
// The Win32 file and file-mapping handles start out invalid.
96 , _file(INVALID_HANDLE_VALUE), _mapFile(INVALID_HANDLE_VALUE)
// Start in the recovering state.
100 , _ringRecovering(
true)
// Open and map the backing file through the const char* based init().
102 init(fileName_.c_str());
// Teardown fragment, presumably the destructor body (header not shown).
// The platform conditionals that separate the Win32 and POSIX statements are not visible here.
// Win32 statements: drop the mapped view, then close and invalidate the mapping handle.
108 UnmapViewOfFile(_log);
110 CloseHandle(_mapFile);
111 _mapFile = INVALID_HANDLE_VALUE;
// The file handle is invalidated; the call that closes it is presumably on a line not shown.
113 _file = INVALID_HANDLE_VALUE;
// POSIX statement: unmap the whole mapped region.
115 munmap(_log, _fileSize);
// Put the store back into the recovering state, which makes write() skip its file update.
122 _ringRecovering =
true;
// Body fragment of log(Message&); the header and braces are not part of this listing.
// Hold the store lock for the duration of the call.
131 Lock<Mutex> guard(_lock);
// Delegate the logging to the in-memory store and keep its result.
132 size_t ret = MemoryBookmarkStore::_log(message_);
// Bookmark ranges get extra handling below.
133 if (BookmarkRange::isRange(message_.
getBookmark()))
// subId is obtained on lines not shown. Look up the most recent bookmark for it
// (second argument false) and store it in the ring file.
// NOTE(review): presumably these two statements sit inside the range branch; confirm against the full file.
140 Field recent = MemoryBookmarkStore::_getMostRecent(subId,
false);
141 write(subId, recent);
// Body fragment of discard(const Message&); the header and braces are not part of this listing.
// Hold the store lock for the duration of the call.
154 Lock<Mutex> guard(_lock);
// Only touch the file when the in-memory discard returns true and _recentChanged is set.
155 if (MemoryBookmarkStore::_discard(message_) && _recentChanged)
// subId is obtained on lines not shown. Fetch the most recent bookmark and write it to the ring file.
162 Field recent = MemoryBookmarkStore::_getMostRecent(subId,
false);
163 write(subId, recent);
// Clear the change flag once the value has been handed to write().
165 _recentChanged =
false;
// Body fragment of discard(subId_, bookmarkSeqNo_); the header and braces are not part of this listing.
// Hold the store lock for the duration of the call.
178 Lock<Mutex> guard(_lock);
// The condition continues on a line not shown. It starts with the in-memory discard by sequence number.
179 if (MemoryBookmarkStore::_discard(subId_, bookmarkSeqNo_)
// Fetch the most recent bookmark for the subscription and write it to the ring file.
182 Field recent = MemoryBookmarkStore::_getMostRecent(subId_,
false);
183 write(subId_, recent);
// Clear the change flag once the value has been handed to write().
185 _recentChanged =
false;
// Body fragment of persisted(subId_, bookmark_); the header and braces are not part of this listing.
// Hold the store lock for the duration of the call.
197 Lock<Mutex> guard(_lock);
// Resolve the subscription through find(), then forward the bookmark to the in-memory store.
198 MemoryBookmarkStore::_persisted(find(subId_), bookmark_);
// Fetch the most recent bookmark and write it to the ring file.
// NOTE(review): a guard on _recentChanged may sit on a line not shown; confirm against the full file.
201 Field recent = MemoryBookmarkStore::_getMostRecent(subId_,
false);
202 write(subId_, recent);
// Clear the change flag once the value has been handed to write().
204 _recentChanged =
false;
// Body fragment of getMostRecent(subId_); the header and braces are not part of this listing.
// Take the store lock, then return whatever the in-memory store reports for this subscription.
215 Lock<Mutex> guard(_lock);
216 return MemoryBookmarkStore::_getMostRecent(subId_);
// Body fragment of purge(); the header and braces are not part of this listing.
// Hold the store lock for the duration of the call.
226 Lock<Mutex> guard(_lock);
// Forget every subscription-to-entry mapping.
227 _positionMap.clear();
// Zero the whole mapped file.
228 memset(_log, 0, _fileSize);
// Purge the in-memory store as well.
229 MemoryBookmarkStore::_purge();
// Body fragment of purge(subId_); the header, braces and some statements are not part of this listing.
// Three locks are taken, in this order: the store lock, the file lock and the position-map lock.
240 Lock<Mutex> guard(_lock);
241 Lock<Mutex> fileGuard(_fileLock);
242 Lock<Mutex> posGuard(_posLock);
// Check whether the subscription has a ring entry; the action taken when it has none is not shown.
243 if (_positionMap.count(subId_) == 0)
// Purge the in-memory state for this subscription.
248 MemoryBookmarkStore::_purge(subId_);
// Copy the position record (by value) and zero the entry it points at in the mapped file.
250 SubscriptionPosition pos = _positionMap[subId_];
251 memset(_log + (pos._index * AMPS_RING_ENTRY_SIZE), 0,
252 AMPS_RING_ENTRY_SIZE);
// Close the gap: every later entry is copied one slot toward the start of the file.
// NOTE(review): _currentIndex - 1 is unsigned arithmetic; this assumes _currentIndex is at least 1 here.
255 for (
size_t index = pos._index; index < _currentIndex - 1; ++index)
257 char* start = _log + (index * AMPS_RING_ENTRY_SIZE);
258 memcpy(start, start + AMPS_RING_ENTRY_SIZE, AMPS_RING_ENTRY_SIZE);
// The subscription id is the NUL-terminated text at the start of the moved entry.
259 char* end = (
char*)memchr(start,
'\0', AMPS_RING_BYTES_SUBID);
// Build the key from the moved bytes (sub is declared on a line not shown) and record the new index.
264 sub.assign(start, (
size_t)(end - start));
265 _positionMap[sub]._index = index;
// Drop the purged subscription from the map.
267 _positionMap.erase(subId_);
// Zero the entry at _currentIndex.
// NOTE(review): presumably _currentIndex is decremented on a line not shown before this; confirm.
271 memset(_log + (_currentIndex * AMPS_RING_ENTRY_SIZE), 0,
272 AMPS_RING_ENTRY_SIZE);
// Opens the backing file named fileName_, makes sure it is at least
// AMPS_RING_ENTRIES * AMPS_RING_ENTRY_SIZE bytes, and hands the size to setFileSize().
// A Win32 path and a POSIX path are both visible; the conditionals that select between them are not shown.
// Failures are reported through error(), which throws StoreException.
276 void init(
const char* fileName_)
// Win32 path: open or create the file for read and write with no sharing (share mode 0).
279 _file = CreateFileA(fileName_, GENERIC_READ | GENERIC_WRITE, 0,
280 NULL, OPEN_ALWAYS, FILE_ATTRIBUTE_NORMAL, NULL);
281 if ( _file == INVALID_HANDLE_VALUE )
// Capture the error code before building the message.
283 DWORD err = getErrorNo();
284 std::ostringstream os;
285 os <<
"Failed to create file " << fileName_ <<
" for RingBookmarkStore\n";
286 error(os.str(), err);
// Read the current file size.
288 LARGE_INTEGER liFileSize;
289 if (GetFileSizeEx(_file, &liFileSize) == 0)
291 error(
"Failure getting file size for RingBookmarkStore.", getErrorNo());
293 DWORD fsLow = liFileSize.LowPart;
294 DWORD fsHigh = liFileSize.HighPart;
// Two alternative definitions of fileSize follow, presumably selected by a conditional not shown.
296 size_t fileSize = liFileSize.QuadPart;
298 size_t fileSize = liFileSize.LowPart;
// Despite its name, existingSize holds the minimum size required for the ring.
300 size_t existingSize = AMPS_RING_ENTRIES * AMPS_RING_ENTRY_SIZE;
// When the file is smaller than that, request the minimum size instead.
301 if (existingSize > fileSize)
303 fsLow = (DWORD)existingSize;
305 fsHigh = (DWORD)(existingSize >> 32);
// Create the mapping with the chosen size.
308 setFileSize(fsHigh, fsLow);
// POSIX path: open or create the file with mode 0644.
310 _fd = open(fileName_, O_RDWR | O_CREAT, (mode_t)0644);
// Error report for a failed open; the check itself is on a line not shown.
313 int err = getErrorNo();
314 std::ostringstream os;
315 os <<
"Failed to open log file " << fileName_ <<
" for RingBookmarkStore";
316 error(os.str(), err);
// Read the current file size (statBuf is declared on a line not shown).
319 if (fstat(_fd, &statBuf) == -1)
321 int err = getErrorNo();
322 std::ostringstream os;
323 os <<
"Failed to stat log file " << fileName_ <<
" for RingBookmarkStore";
324 error(os.str(), err);
326 size_t fSize = (size_t)statBuf.st_size;
// Write four zero bytes; judging by the error text this runs for an empty file (condition not shown).
328 if (::write(_fd,
"\0\0\0\0", 4) != 4)
330 error(
"Failed to initialize empty file.", getErrorNo());
// Map the file: fSize - 1 when the file already exceeds the minimum ring size, the minimum otherwise.
332 setFileSize((fSize > AMPS_RING_ENTRIES * AMPS_RING_ENTRY_SIZE ?
333 fSize - 1 : AMPS_RING_ENTRIES * AMPS_RING_ENTRY_SIZE));
// Win32 variant: returns the calling thread's last error code, as reported by GetLastError().
339 DWORD getErrorNo()
const 341 return GetLastError();
// Win32 variant: throws StoreException carrying message_ followed by the system text for err.
// message_ is the caller-supplied description; err is a Win32 error code.
344 void error(
const std::string& message_, DWORD err)
346 std::ostringstream os;
// Capacity passed to FormatMessageA for the pMsg buffer; pMsg is declared on a line not shown.
347 static const DWORD msgSize = 2048;
// Ask the system for the message text that belongs to err.
// NOTE(review): sz is not used on any visible line.
349 DWORD sz = FormatMessageA(FORMAT_MESSAGE_FROM_SYSTEM |
350 FORMAT_MESSAGE_ARGUMENT_ARRAY,
351 NULL, err, LANG_NEUTRAL,
352 pMsg, msgSize, NULL);
// Compose the final text and throw.
353 os << message_ <<
". Error is " << pMsg;
354 throw StoreException(os.str());
// POSIX variants of getErrorNo() and error(). The listing fuses the two: the body of
// getErrorNo() is not shown and its trailing const shares a line with the start of error().
// error() throws StoreException carrying message_ followed by the strerror() text for err.
357 int getErrorNo()
const 362 void error(
const std::string& message_,
int err)
364 std::ostringstream os;
// Compose the final text and throw.
365 os << message_ <<
". Error is " << strerror(err);
366 throw StoreException(os.str());
// Body fragment of write(subId_, bookmark_); the header, braces and a few statements are not part of this listing.
// It stores bookmark_ in one of the AMPS_RING_POSITIONS slots of the subscription entry and syncs that entry.
// Hold the file lock while the mapped file is modified.
374 Lock<Mutex> guard(_fileLock);
// Nothing below is meant to run while the store is recovering.
375 if ( !_ringRecovering)
// Reject bookmarks longer than one slot.
// NOTE(review): the copy below starts one byte into the slot and the memset subtracts len + 2 in
// unsigned arithmetic, so a length within two bytes of AMPS_RING_BYTES_BOOKMARK passes this check
// yet appears to overrun the slot; confirm against the full file.
377 if (bookmark_.
len() > AMPS_RING_BYTES_BOOKMARK)
379 throw StoreException(
"Bookmark is too large for fixed size storage. Consider rebuilding after changing AMPS_RING_BYTES_BOOKMARK in include/RingBookmarkStore.hpp");
// Locate the ring entry for this subscription; findPos creates one when none exists.
381 SubscriptionPosition& pos = findPos(subId_);
// Slot that follows the current one, wrapping around the ring.
382 size_t nextPos = (pos._current + 1) % AMPS_RING_POSITIONS;
// Address of the first byte of that next slot; what is stored there is on a line not shown.
384 char* offset = _log + (pos._index * AMPS_RING_ENTRY_SIZE) + AMPS_RING_BYTES_SUBID + (nextPos * AMPS_RING_BYTES_BOOKMARK);
// Address of the second byte of the current slot; the first byte is left for a marker.
388 offset = _log + ((pos._index * AMPS_RING_ENTRY_SIZE) + AMPS_RING_BYTES_SUBID + (pos._current * AMPS_RING_BYTES_BOOKMARK) + 1);
389 size_t len = bookmark_.
len();
// Copy the bookmark bytes into the slot.
391 memcpy(offset, static_cast<const void*>(bookmark_.
data()), len);
// Zero the rest of the slot; offset is presumably advanced by len on a line not shown.
394 memset(offset, 0, AMPS_RING_BYTES_BOOKMARK - (len + 2));
// Step back to the first byte of the slot; the value stored there is on a line not shown.
396 offset = offset - len - 1;
// The next slot becomes the current one.
399 pos._current = nextPos;
// Three alternative computations of the page-aligned start of the entry follow;
// the conditionals that select one of them are not shown.
404 size_t syncStart = (pos._index * AMPS_RING_ENTRY_SIZE) & ~((getPageSize() - 1) & 0xFFFFFFFFFFFFFFFF);
406 size_t syncStart = (pos._index * AMPS_RING_ENTRY_SIZE) & (
size_t)~(getPageSize() - 1);
// Win32: flush from the page start through the end of this entry.
408 if (!FlushViewOfFile(_log + syncStart, (pos._index * AMPS_RING_ENTRY_SIZE) - syncStart + AMPS_RING_ENTRY_SIZE))
// POSIX: schedule an asynchronous sync of the same range.
410 size_t syncStart = (pos._index * AMPS_RING_ENTRY_SIZE) & ~(getPageSize() - 1);
411 if (msync(_log + syncStart, (pos._index * AMPS_RING_ENTRY_SIZE) - syncStart + AMPS_RING_ENTRY_SIZE, MS_ASYNC) != 0)
// Either failure is reported through error(), which throws StoreException.
414 error(
"Failed to sync mapped memory", getErrorNo());
// Win32 variant: creates (or re-creates) the file mapping with the given size and maps a view into _log.
// newSizeHigh_ and newSizeLow_ are the high and low 32 bits of the requested size.
// Failures are reported through error(), which throws StoreException.
420 void setFileSize(DWORD newSizeHigh_, DWORD newSizeLow_)
// A usable mapping handle means an existing mapping has to be replaced.
422 bool remap = (_mapFile && _mapFile != INVALID_HANDLE_VALUE);
// Tear down the old view and mapping and forget the cached positions;
// presumably this runs only when remap is true (condition not shown).
425 UnmapViewOfFile(_log);
426 CloseHandle(_mapFile);
427 _positionMap.clear();
// Create the read-write mapping with the requested size.
429 _mapFile = CreateFileMappingA( _file, NULL, PAGE_READWRITE, newSizeHigh_, newSizeLow_, NULL);
430 if (_mapFile == NULL || _mapFile == INVALID_HANDLE_VALUE)
432 error(
"Failed to create map of log file", getErrorNo());
// Two alternative definitions of sz follow, presumably selected by a conditional not shown.
437 size_t sz = ((size_t)newSizeHigh_ << 32) | (size_t)newSizeLow_;
439 size_t sz = (size_t)newSizeLow_;
// Map sz bytes starting at file offset 0.
441 _log = (
char*)MapViewOfFile(_mapFile, FILE_MAP_ALL_ACCESS, 0, 0, sz);
// Error report for a failed map; the check itself is on a line not shown.
444 error(
"Failed to map log file to memory", getErrorNo());
// POSIX variant: grows the backing file and maps (or remaps) it into _log.
// newSize_ is the requested size in bytes.
// Failures are reported through error(), which throws StoreException.
// Several alternative mapping calls are visible; the conditionals that select between them are not shown.
457 void setFileSize(
size_t newSize_)
// A non-null _log means a mapping already exists.
459 bool remap = (_log != 0);
// Clear the low bits of the requested size so that sz is a multiple of the page size.
461 size_t sz = newSize_ & (size_t)(~(getPageSize() - 1));
// Test for a request that does not exceed the current size; the action taken is not shown.
467 if (newSize_ <= _fileSize)
// Extend the file: seek to offset sz and write a single zero byte there.
472 if (lseek(_fd, (off_t)sz, SEEK_SET) == -1)
474 error(
"Seek failed for RingBookmarkStore", getErrorNo());
476 if (::write(_fd,
"", 1) == -1)
478 error(
"Failed to grow RingBookmarkStore", getErrorNo());
480 void* newLog = MAP_FAILED;
// Cached positions are dropped before the mapping changes.
483 _positionMap.clear();
// Alternative 1: resize the existing mapping, allowing the kernel to move it.
486 newLog = (mremap(_log, _fileSize, sz, MREMAP_MAYMOVE));
// Alternative 2: map at the fixed address directly after the existing mapping.
489 newLog = mmap(_log + _fileSize, sz, PROT_READ | PROT_WRITE,
490 MAP_SHARED | MAP_FIXED, _fd, (off_t)sz);
// Alternative 3: unmap the old region and map the file again from offset 0.
494 munmap(_log, _fileSize);
495 newLog = mmap(0, sz, PROT_READ | PROT_WRITE, MAP_SHARED, _fd, 0);
// Plain mapping from offset 0, presumably used when there was no earlier mapping.
502 newLog = (mmap(0, sz, PROT_READ | PROT_WRITE, MAP_SHARED, _fd, 0));
506 if (newLog == MAP_FAILED)
510 error(
"Failed to map log file to memory", getErrorNo());
// Adopt the new mapping.
512 _log =
static_cast<char*
>(newLog);
// Recovery fragment, presumably the body of a recover routine (header and braces not shown).
// It walks the ring entries in the mapped file, rebuilds _positionMap, and replays each stored
// bookmark into the in-memory store.
// Mark the store as recovering.
523 _ringRecovering =
true;
// Scan whichever is larger: the number of entries the file can hold, or AMPS_RING_ENTRIES.
528 size_t maxEntries = _fileSize / AMPS_RING_ENTRY_SIZE > AMPS_RING_ENTRIES ? _fileSize / AMPS_RING_ENTRY_SIZE : AMPS_RING_ENTRIES;
// The loop continues from the current value of _currentIndex and advances the member itself.
529 for (; _currentIndex < maxEntries; ++_currentIndex)
531 char* offset = _log + (_currentIndex * AMPS_RING_ENTRY_SIZE);
// The subscription id is the NUL-terminated text at the start of the entry.
537 char* end = (
char*)memchr(offset,
'\0', AMPS_RING_BYTES_SUBID);
// sub is declared on a line not shown; it points at the id bytes inside the mapped file.
544 sub.assign(offset, (
size_t)(end - offset));
// Look up the in-memory subscription for this id.
546 Subscription* subPtr = MemoryBookmarkStore::find(sub);
// Record which ring entry belongs to this id.
549 SubscriptionPosition& pos = _positionMap[sub];
550 pos._index = _currentIndex;
// Move past the id area to the first bookmark slot.
551 offset += AMPS_RING_BYTES_SUBID;
// AMPS_RING_POSITIONS serves as the initial value of foundCursor.
552 size_t foundCursor = AMPS_RING_POSITIONS;
// Inspect the first byte of every slot, looking for the asterisk marker.
553 for (pos._current = 0; pos._current < AMPS_RING_POSITIONS; pos._current++)
555 if (offset[pos._current * AMPS_RING_BYTES_BOOKMARK] ==
'*')
// Step one slot backwards, wrapping around the ring.
558 pos._current = (pos._current + (AMPS_RING_POSITIONS - 1)) % AMPS_RING_POSITIONS;
// A second marker test uses foundCursor, which is updated on lines not shown.
560 if (offset[foundCursor * AMPS_RING_BYTES_BOOKMARK] ==
'*')
562 pos._current = (pos._current + (AMPS_RING_POSITIONS - 1)) % AMPS_RING_POSITIONS;
// Test for the loop having run to completion; the handling is on lines not shown.
567 if (pos._current >= AMPS_RING_POSITIONS)
// Move to the selected slot.
575 offset += pos._current * AMPS_RING_BYTES_BOOKMARK;
// The stored bookmark ends at the first NUL inside the slot.
577 end = (
char*)memchr(offset,
'\0', AMPS_RING_BYTES_BOOKMARK);
// Only non-empty slots are replayed.
578 if (end && end != offset)
// The first byte of the slot is skipped; the bookmark text starts at the second byte.
581 bookmarkField.assign(offset + 1, (
size_t)(end - offset - 1));
// Single bookmarks are logged and immediately discarded in the in-memory store.
583 if (!BookmarkRange::isRange(bookmarkField))
586 subPtr->isDiscarded(bookmarkField);
587 subPtr->discard(subPtr->log(bookmarkField));
// Bookmark ranges are only logged (presumably the else branch).
591 subPtr->log(bookmarkField);
// Recovery is finished.
596 _ringRecovering =
false;
// Position lookup fragment, presumably the body of findPos(subId_) that write() calls (header not shown).
// It returns the SubscriptionPosition for subId_, first creating a ring entry when none exists.
// Hold the position-map lock for the duration of the call.
601 Lock<Mutex> guard(_posLock);
// Unknown subscription: claim the entry at _currentIndex.
602 if (_positionMap.count(subId_) == 0)
606 char* offset = _log + (_currentIndex * AMPS_RING_ENTRY_SIZE);
607 size_t len = subId_.
len();
// Store the subscription id at the start of the entry.
608 memcpy(offset, static_cast<const void*>(subId_.
data()), len);
// The map key (subId, declared on a line not shown) points at the bytes just written to the mapped file.
612 subId.assign(offset, len);
613 _positionMap[subId]._index = _currentIndex;
614 _positionMap[subId]._current = 0;
// Zero the rest of the id area; offset is presumably advanced by len on a line not shown.
617 memset(offset, 0, AMPS_RING_BYTES_SUBID - len);
// Return the existing or freshly created position record.
621 return _positionMap[subId_];
// Returns the in-memory Subscription for subId_ by delegating to MemoryBookmarkStore::find.
// Throws StoreException when no valid subscription id was supplied; the condition itself is on a line not shown.
624 MemoryBookmarkStore::Subscription* find(
const Message::Field& subId_)
628 throw StoreException(
"A valid subscription ID must be provided to the RingBookmarkStore");
631 return MemoryBookmarkStore::find(subId_);
// Index of the ring entry that the next new subscription receives; the recovery loop advances it.
637 size_t _currentIndex;
// Maps a subscription id to the ring entry and slot used for it in the mapped file.
646 typedef std::map<Message::Field, SubscriptionPosition, Message::Field::FieldHash> PositionMap;
647 PositionMap _positionMap;
// True during construction, recovery and teardown; write() checks it before touching the file.
648 bool _ringRecovering;
// Win32 variant: returns the system page size obtained from GetSystemInfo().
// The value is kept in a function-local static; the test that skips the query once it is set is not shown.
650 static DWORD getPageSize()
652 static DWORD pageSize = 0;
655 SYSTEM_INFO SYS_INFO;
656 GetSystemInfo(&SYS_INFO);
657 pageSize = SYS_INFO.dwPageSize;
// POSIX variant: returns the system page size obtained from sysconf(_SC_PAGESIZE).
// The value is kept in a function-local static; the test that skips the query once it is set is not shown.
662 static size_t getPageSize()
664 static size_t pageSize = 0UL;
667 pageSize = (size_t)sysconf(_SC_PAGESIZE);
679 #endif // _RINGBOOKMARKSTORE_H_ Field getSubscriptionId() const
Retrieves the value of the SubscriptionId header of the Message as a Field which references the underlying buffer managed by this Message.
Definition: Message.hpp:1489
Message encapsulates a single message sent to or received from an AMPS server, and provides methods f...
Definition: Message.hpp:539
Provides AMPS::MemoryBookmarkStore, a bookmark store that holds bookmarks in memory.
void clear()
Deletes the data associated with this Field; should only be used on Fields that were created as a deepCopy of other Fields.
Definition: Field.hpp:260
const char * data() const
Returns the (non-null-terminated) data underlying this field.
Definition: Field.hpp:273
virtual void purge()
Called to purge the contents of this store.
Definition: RingBookmarkStore.hpp:224
A BookmarkStoreImpl that stores only the MOST_RECENT bookmark to a file for recovery and keeps any bo...
Definition: RingBookmarkStore.hpp:68
MemoryBookmarkStore()
Creates a MemoryBookmarkStore.
Definition: MemoryBookmarkStore.hpp:1186
virtual void persisted(const Message::Field &subId_, const Message::Field &bookmark_)
Mark the bookmark provided as replicated to all sync replication destinations for the given subscription.
Definition: RingBookmarkStore.hpp:194
Field getSubscriptionIds() const
Retrieves the value of the SubscriptionIds header of the Message as a Field which references the underlying buffer managed by this Message.
Definition: Message.hpp:1490
bool empty() const
Returns 'true' if empty, 'false' otherwise.
Definition: Field.hpp:128
size_t len() const
Returns the length of the data underlying this field.
Definition: Field.hpp:280
virtual void discard(const Message &message_)
Log a discard-bookmark entry to the persistent log based on the bookmark carried by the given Message.
Definition: RingBookmarkStore.hpp:152
virtual Message::Field getMostRecent(const Message::Field &subId_)
Returns the most recent bookmark from the log that ought to be used for (re-)subscriptions.
Definition: RingBookmarkStore.hpp:213
A BookmarkStoreImpl implementation that stores bookmarks in memory.
Definition: MemoryBookmarkStore.hpp:57
RingBookmarkStore(const char *fileName_)
Create a RingBookmarkStore using fileName_ for storage of most recent.
Definition: RingBookmarkStore.hpp:81
Field represents the value of a single field in a Message.
Definition: Field.hpp:86
virtual size_t log(Message &message_)
Return the corresponding sequence number for this bookmark.
Definition: RingBookmarkStore.hpp:129
Definition: ampsplusplus.hpp:103
Field getBookmark() const
Retrieves the value of the Bookmark header of the Message as a Field which references the underlying buffer managed by this Message.
Definition: Message.hpp:1256
virtual void discard(const Message::Field &subId_, size_t bookmarkSeqNo_)
Log a discard-bookmark entry to the persistent log based on a bookmark sequence number.
Definition: RingBookmarkStore.hpp:176
virtual void purge(const Message::Field &subId_)
Called to purge the contents of this store for particular subId.
Definition: RingBookmarkStore.hpp:238