AMPS C/C++ Client Class Reference
AMPS C/C++ Client Version 5.3.5.0
RingBookmarkStore.hpp
Go to the documentation of this file.
1 //
3 // Copyright (c) 2010-2025 60East Technologies Inc., All Rights Reserved.
4 //
5 // This computer software is owned by 60East Technologies Inc. and is
6 // protected by U.S. copyright laws and other laws and by international
7 // treaties. This computer software is furnished by 60East Technologies
8 // Inc. pursuant to a written license agreement and may be used, copied,
9 // transmitted, and stored only in accordance with the terms of such
10 // license agreement and with the inclusion of the above copyright notice.
11 // This computer software or any other copies thereof may not be provided
12 // or otherwise made available to any other person.
13 //
14 // U.S. Government Restricted Rights. This computer software: (a) was
15 // developed at private expense and is in all respects the proprietary
16 // information of 60East Technologies Inc.; (b) was not developed with
17 // government funds; (c) is a trade secret of 60East Technologies Inc.
18 // for all purposes of the Freedom of Information Act; and (d) is a
19 // commercial item and thus, pursuant to Section 12.212 of the Federal
20 // Acquisition Regulations (FAR) and DFAR Supplement Section 227.7202,
21 // Government's use, duplication or disclosure of the computer software
22 // is subject to the restrictions set forth by 60East Technologies Inc..
23 //
25 
26 #ifndef _RINGBOOKMARKSTORE_H_
27 #define _RINGBOOKMARKSTORE_H_
28 
// Layout constants for the memory-mapped ring file. Each subscription owns one
// fixed-size entry: the subscription id first, followed by
// AMPS_RING_POSITIONS bookmark slots.
// Number of bookmark slots per entry; write() rotates through them.
29 #define AMPS_RING_POSITIONS 3
30 // Setting bookmark max at 6 bookmarks in a range (5 commas, open, :, and close)
// AMPS_MAX_BOOKMARK_LEN is not defined in this file -- presumably it comes from
// an AMPS header included elsewhere; TODO confirm.
31 #define AMPS_RING_BYTES_BOOKMARK (AMPS_MAX_BOOKMARK_LEN * 6 + 8)
// Total bytes of one subscription entry in the file.
32 #define AMPS_RING_ENTRY_SIZE 1024
// Bytes left at the start of an entry for the subscription id once the
// bookmark slots have been reserved.
33 #define AMPS_RING_BYTES_SUBID ( AMPS_RING_ENTRY_SIZE - ( AMPS_RING_POSITIONS * AMPS_RING_BYTES_BOOKMARK ) )
// Number of entries the file is sized for at minimum (see init()).
34 #define AMPS_RING_ENTRIES 32
35 
37 #ifdef _WIN32
38  #include <windows.h>
39 #else
40  #include <sys/mman.h>
41  #include <unistd.h>
42 #endif
43 #include <sys/types.h>
44 #include <sys/stat.h>
45 #include <fcntl.h>
46 
// Fallback definition so the name used by the mremap() call in setFileSize()
// exists even when the system headers included above do not provide it.
47 #if !defined(MREMAP_MAYMOVE)
48  #define MREMAP_MAYMOVE 0
49 #endif
50 
55 
56 namespace AMPS
57 {
69  {
// Locates one subscription's data inside the mapped file.
70  struct SubscriptionPosition
71  {
72  size_t _index; // entry number; the entry starts at _index * AMPS_RING_ENTRY_SIZE bytes into the file
73  size_t _current; // bookmark slot (0 .. AMPS_RING_POSITIONS - 1) that write() stores the next bookmark in
74  };
75 
76  public:
// Creates a store backed by the file named fileName_.
// All members start in an "unmapped" state and _ringRecovering starts true so
// that write() is a no-op until recover() (called at the end of init()) has
// finished. init() reports failures by throwing StoreException via error().
81  RingBookmarkStore(const char* fileName_)
82  : MemoryBookmarkStore(), _fileSize(0), _currentIndex(0), _log(0)
83 #ifdef _WIN32
84  , _file(INVALID_HANDLE_VALUE), _mapFile(INVALID_HANDLE_VALUE)
85 #else
86  , _fd(0)
87 #endif
88  , _ringRecovering(true)
89  {
90  init(fileName_);
91  }
92 
// Same as the const char* constructor, taking the file name as a std::string.
// Members are initialized identically and init() is called with
// fileName_.c_str(); init() reports failures by throwing StoreException.
93  RingBookmarkStore(const std::string& fileName_)
94  : MemoryBookmarkStore(), _fileSize(0), _currentIndex(0), _log(0)
95 #ifdef _WIN32
96  , _file(INVALID_HANDLE_VALUE), _mapFile(INVALID_HANDLE_VALUE)
97 #else
98  , _fd(0)
99 #endif
100  , _ringRecovering(true)
101  {
102  init(fileName_.c_str());
103  }
104 
// Unmaps the file and closes the underlying handle(s)/descriptor.
// The return values of the unmap/close calls are not checked.
105  virtual ~RingBookmarkStore()
106  {
107 #ifdef _WIN32
// Windows: drop the view first, then the mapping object, then the file.
108  UnmapViewOfFile(_log);
109  _log = 0;
110  CloseHandle(_mapFile);
111  _mapFile = INVALID_HANDLE_VALUE;
112  CloseHandle(_file);
113  _file = INVALID_HANDLE_VALUE;
114 #else
// POSIX: unmap the whole mapped length, then close the descriptor.
115  munmap(_log, _fileSize);
116  _log = 0;
117  close(_fd);
118  _fd = 0;
119 #endif
120  // In case _lock gets acquired by reader thread between end of this
121  // destructor and start of base class destructor, prevent write()
// write() returns without touching _log while _ringRecovering is true.
122  _ringRecovering = true;
123  }
124 
129  virtual size_t log(Message& message_)
130  {
131  Lock<Mutex> guard(_lock);
132  size_t ret = MemoryBookmarkStore::_log(message_);
133  if (BookmarkRange::isRange(message_.getBookmark()))
134  {
135  Message::Field subId = message_.getSubscriptionId();
136  if (subId.empty())
137  {
138  subId = message_.getSubscriptionIds();
139  }
140  Field recent = MemoryBookmarkStore::_getMostRecent(subId, false);
141  write(subId, recent);
142  recent.clear();
143  }
144  return ret;
145  }
146 
152  virtual void discard(const Message& message_)
153  {
154  Lock<Mutex> guard(_lock);
155  if (MemoryBookmarkStore::_discard(message_) && _recentChanged)
156  {
157  Message::Field subId = message_.getSubscriptionId();
158  if (subId.empty())
159  {
160  subId = message_.getSubscriptionIds();
161  }
162  Field recent = MemoryBookmarkStore::_getMostRecent(subId, false);
163  write(subId, recent);
164  recent.clear();
165  _recentChanged = false;
166  }
167  }
168 
176  virtual void discard(const Message::Field& subId_, size_t bookmarkSeqNo_)
177  {
178  Lock<Mutex> guard(_lock);
179  if (MemoryBookmarkStore::_discard(subId_, bookmarkSeqNo_)
180  && _recentChanged)
181  {
182  Field recent = MemoryBookmarkStore::_getMostRecent(subId_, false);
183  write(subId_, recent);
184  recent.clear();
185  _recentChanged = false;
186  }
187  }
188 
194  virtual void persisted(const Message::Field& subId_,
195  const Message::Field& bookmark_)
196  {
197  Lock<Mutex> guard(_lock);
198  MemoryBookmarkStore::_persisted(find(subId_), bookmark_);
199  if (_recentChanged)
200  {
201  Field recent = MemoryBookmarkStore::_getMostRecent(subId_, false);
202  write(subId_, recent);
203  recent.clear();
204  _recentChanged = false;
205  }
206  }
207 
214  {
215  Lock<Mutex> guard(_lock);
216  return MemoryBookmarkStore::_getMostRecent(subId_);
217  }
218 
// Removes every subscription from this store: clears the position map,
// zeroes the whole mapped file, purges the in-memory store and resets the
// next-free entry index to 0.
// NOTE(review): unlike purge(subId_), this takes only _lock, not _fileLock or
// _posLock -- confirm that is intentional.
224  virtual void purge()
225  {
226  Lock<Mutex> guard(_lock);
227  _positionMap.clear();
228  memset(_log, 0, _fileSize);
229  MemoryBookmarkStore::_purge();
230  _currentIndex = 0;
231  }
232 
238  virtual void purge(const Message::Field& subId_)
239  {
240  Lock<Mutex> guard(_lock);
241  Lock<Mutex> fileGuard(_fileLock);
242  Lock<Mutex> posGuard(_posLock);
243  if (_positionMap.count(subId_) == 0)
244  {
245  return;
246  }
247  // Remove from memory
248  MemoryBookmarkStore::_purge(subId_);
249  // Remove from the file
250  SubscriptionPosition pos = _positionMap[subId_];
251  memset(_log + (pos._index * AMPS_RING_ENTRY_SIZE), 0,
252  AMPS_RING_ENTRY_SIZE);
253  // Move any following subs back an index
254  Message::Field sub;
255  for (size_t index = pos._index; index < _currentIndex - 1; ++index)
256  {
257  char* start = _log + (index * AMPS_RING_ENTRY_SIZE);
258  memcpy(start, start + AMPS_RING_ENTRY_SIZE, AMPS_RING_ENTRY_SIZE);
259  char* end = (char*)memchr(start, '\0', AMPS_RING_BYTES_SUBID);
260  if (!end)
261  {
262  break;
263  }
264  sub.assign(start, (size_t)(end - start));
265  _positionMap[sub]._index = index;
266  }
267  _positionMap.erase(subId_);
268  // We have one less sub
269  --_currentIndex;
270  // Clear the end
271  memset(_log + (_currentIndex * AMPS_RING_ENTRY_SIZE), 0,
272  AMPS_RING_ENTRY_SIZE);
273  }
274 
275  private:
// Opens (creating if necessary) the file fileName_, makes sure it is at least
// AMPS_RING_ENTRIES * AMPS_RING_ENTRY_SIZE bytes, maps it through
// setFileSize() and finally calls recover() to load any stored state.
// Every failure is reported through error(), which throws StoreException.
// NOTE(review): when a later step throws, the handle/descriptor opened here is
// not closed by this function -- confirm whether callers rely on that.
276  void init(const char* fileName_)
277  {
278 #ifdef _WIN32
// Share mode 0: the file is opened for exclusive access.
279  _file = CreateFileA(fileName_, GENERIC_READ | GENERIC_WRITE, 0,
280  NULL, OPEN_ALWAYS, FILE_ATTRIBUTE_NORMAL, NULL);
281  if ( _file == INVALID_HANDLE_VALUE )
282  {
283  DWORD err = getErrorNo();
284  std::ostringstream os;
285  os << "Failed to create file " << fileName_ << " for RingBookmarkStore\n";
286  error(os.str(), err);
287  }
288  LARGE_INTEGER liFileSize;
289  if (GetFileSizeEx(_file, &liFileSize) == 0)
290  {
291  error("Failure getting file size for RingBookmarkStore.", getErrorNo());
292  }
293  DWORD fsLow = liFileSize.LowPart;
294  DWORD fsHigh = liFileSize.HighPart;
295 #ifdef _WIN64
296  size_t fileSize = liFileSize.QuadPart;
297 #else
298  size_t fileSize = liFileSize.LowPart;
299 #endif
// Despite its name, existingSize is the minimum size the store requires;
// it replaces the on-disk size only when the file is smaller than that.
300  size_t existingSize = AMPS_RING_ENTRIES * AMPS_RING_ENTRY_SIZE;
301  if (existingSize > fileSize)
302  {
303  fsLow = (DWORD)existingSize;
304 #ifdef _WIN64
305  fsHigh = (DWORD)(existingSize >> 32);
306 #endif
307  }
308  setFileSize(fsHigh, fsLow);
309 #else
310  _fd = open(fileName_, O_RDWR | O_CREAT, (mode_t)0644);
311  if (_fd == -1)
312  {
313  int err = getErrorNo();
314  std::ostringstream os;
315  os << "Failed to open log file " << fileName_ << " for RingBookmarkStore";
316  error(os.str(), err);
317  }
318  struct stat statBuf;
319  if (fstat(_fd, &statBuf) == -1)
320  {
321  int err = getErrorNo();
322  std::ostringstream os;
323  os << "Failed to stat log file " << fileName_ << " for RingBookmarkStore";
324  error(os.str(), err);
325  }
326  size_t fSize = (size_t)statBuf.st_size;
// A brand-new (empty) file gets four zero bytes written before it is sized.
327  if (fSize == 0)
328  if (::write(_fd, "\0\0\0\0", 4) != 4)
329  {
330  error("Failed to initialize empty file.", getErrorNo());
331  }
// setFileSize() writes one byte at offset <mapped size>, so a file it has
// already grown is one byte longer than its mapping; hence fSize - 1 here.
332  setFileSize((fSize > AMPS_RING_ENTRIES * AMPS_RING_ENTRY_SIZE ?
333  fSize - 1 : AMPS_RING_ENTRIES * AMPS_RING_ENTRY_SIZE));
334 #endif
335  recover();
336  }
337 
338 #ifdef _WIN32
// Returns the calling thread's last Win32 error code (GetLastError()).
339  DWORD getErrorNo() const
340  {
341  return GetLastError();
342  }
343 
344  void error(const std::string& message_, DWORD err)
345  {
346  std::ostringstream os;
347  static const DWORD msgSize = 2048;
348  char pMsg[msgSize];
349  DWORD sz = FormatMessageA(FORMAT_MESSAGE_FROM_SYSTEM |
350  FORMAT_MESSAGE_ARGUMENT_ARRAY,
351  NULL, err, LANG_NEUTRAL,
352  pMsg, msgSize, NULL);
353  os << message_ << ". Error is " << pMsg;
354  throw StoreException(os.str());
355  }
356 #else
// Returns the current value of errno.
357  int getErrorNo() const
358  {
359  return errno;
360  }
361 
362  void error(const std::string& message_, int err)
363  {
364  std::ostringstream os;
365  os << message_ << ". Error is " << strerror(err);
366  throw StoreException(os.str());
367  }
368 #endif
369 
370  // Used to log the new _mostRecent bookmark
371  void write(const Message::Field& subId_,
372  const Message::Field& bookmark_)
373  {
374  Lock<Mutex> guard(_fileLock);
375  if ( !_ringRecovering)
376  {
377  if (bookmark_.len() > AMPS_RING_BYTES_BOOKMARK)
378  {
379  throw StoreException("Bookmark is too large for fixed size storage. Consider rebuilding after changing AMPS_RING_BYTES_BOOKMARK in include/RingBookmarkStore.hpp");
380  }
381  SubscriptionPosition& pos = findPos(subId_);
382  size_t nextPos = (pos._current + 1) % AMPS_RING_POSITIONS;
383  // Get pointer to start of next position for cursor
384  char* offset = _log + (pos._index * AMPS_RING_ENTRY_SIZE) + AMPS_RING_BYTES_SUBID + (nextPos * AMPS_RING_BYTES_BOOKMARK);
385  // Write the 'cursor' to start of following entry and advance offset
386  *offset = '*';
387  // Change offset to beginning of current bookmark
388  offset = _log + ((pos._index * AMPS_RING_ENTRY_SIZE) + AMPS_RING_BYTES_SUBID + (pos._current * AMPS_RING_BYTES_BOOKMARK) + 1);
389  size_t len = bookmark_.len();
390  // Write the bookmark and advance offset
391  memcpy(offset, static_cast<const void*>(bookmark_.data()), len);
392  offset += len;
393  // Set extra bytes to NULL
394  memset(offset, 0, AMPS_RING_BYTES_BOOKMARK - (len + 2));
395  // Return to beginning and change the cursor
396  offset = offset - len - 1;
397  *offset = '+';
398  // Update current for the next write
399  pos._current = nextPos;
400 
401  // Sync the changes to disk
402 #ifdef _WIN32
403 #ifdef _WIN64
404  size_t syncStart = (pos._index * AMPS_RING_ENTRY_SIZE) & ~((getPageSize() - 1) & 0xFFFFFFFFFFFFFFFF);
405 #else
406  size_t syncStart = (pos._index * AMPS_RING_ENTRY_SIZE) & (size_t)~(getPageSize() - 1);
407 #endif
408  if (!FlushViewOfFile(_log + syncStart, (pos._index * AMPS_RING_ENTRY_SIZE) - syncStart + AMPS_RING_ENTRY_SIZE))
409 #else
410  size_t syncStart = (pos._index * AMPS_RING_ENTRY_SIZE) & ~(getPageSize() - 1);
411  if (msync(_log + syncStart, (pos._index * AMPS_RING_ENTRY_SIZE) - syncStart + AMPS_RING_ENTRY_SIZE, MS_ASYNC) != 0)
412 #endif
413  {
414  error("Failed to sync mapped memory", getErrorNo());
415  }
416  }
417  }
418 
419 #ifdef _WIN32
420  void setFileSize(DWORD newSizeHigh_, DWORD newSizeLow_)
421  {
422  bool remap = (_mapFile && _mapFile != INVALID_HANDLE_VALUE);
423  if (remap)
424  {
425  UnmapViewOfFile(_log);
426  CloseHandle(_mapFile);
427  _positionMap.clear();
428  }
429  _mapFile = CreateFileMappingA( _file, NULL, PAGE_READWRITE, newSizeHigh_, newSizeLow_, NULL);
430  if (_mapFile == NULL || _mapFile == INVALID_HANDLE_VALUE)
431  {
432  error("Failed to create map of log file", getErrorNo());
433  _log = 0;
434  _fileSize = 0;
435  }
436 #ifdef _WIN64
437  size_t sz = ((size_t)newSizeHigh_ << 32) | (size_t)newSizeLow_;
438 #else
439  size_t sz = (size_t)newSizeLow_;
440 #endif
441  _log = (char*)MapViewOfFile(_mapFile, FILE_MAP_ALL_ACCESS, 0, 0, sz);
442  if (_log == NULL)
443  {
444  error("Failed to map log file to memory", getErrorNo());
445  _log = 0;
446  _fileSize = 0;
447  return;
448  }
449  _fileSize = sz;
450  // Call recover to reset the _positionMap
451  if (remap)
452  {
453  recover();
454  }
455  }
456 #else
457  void setFileSize(size_t newSize_)
458  {
459  bool remap = (_log != 0);
460  // Make sure we're using a multiple of page size
461  size_t sz = newSize_ & (size_t)(~(getPageSize() - 1));
462  if (sz < newSize_)
463  {
464  sz += getPageSize();
465  }
466  // Improper resize attempt
467  if (newSize_ <= _fileSize)
468  {
469  return;
470  }
471  // Extend the underlying file
472  if (lseek(_fd, (off_t)sz, SEEK_SET) == -1)
473  {
474  error("Seek failed for RingBookmarkStore", getErrorNo());
475  }
476  if (::write(_fd, "", 1) == -1)
477  {
478  error("Failed to grow RingBookmarkStore", getErrorNo());
479  }
480  void* newLog = MAP_FAILED;
481  if (_log)
482  {
483  _positionMap.clear();
484 
485 #ifdef linux
486  newLog = (mremap(_log, _fileSize, sz, MREMAP_MAYMOVE));
487 #else
488  // try making a new mmap right after the current one.
489  newLog = mmap(_log + _fileSize, sz, PROT_READ | PROT_WRITE,
490  MAP_SHARED | MAP_FIXED, _fd, (off_t)sz);
491  if (newLog != _log)
492  {
493  // this mmap is relatively small; better to just close the old mmap and reset.
494  munmap(_log, _fileSize);
495  newLog = mmap(0, sz, PROT_READ | PROT_WRITE, MAP_SHARED, _fd, 0);
496  }
497 #endif
498  }
499  else // New mapping
500  {
501  // New mapping, map the full file size for recovery or else it std size
502  newLog = (mmap(0, sz, PROT_READ | PROT_WRITE, MAP_SHARED, _fd, 0));
503  }
504  _fileSize = sz;
505 
506  if (newLog == MAP_FAILED)
507  {
508  _log = 0;
509  _fileSize = 0;
510  error("Failed to map log file to memory", getErrorNo());
511  }
512  _log = static_cast<char*>(newLog);
513  if (remap)
514  {
515  recover();
516  }
517  }
518 #endif
519 
520  void recover(void)
521  {
522  //Lock<Mutex> guard(_lock);
523  _ringRecovering = true;
524  Message::Field sub;
525  Message::Field bookmarkField;
526 
527  _currentIndex = 0;
528  size_t maxEntries = _fileSize / AMPS_RING_ENTRY_SIZE > AMPS_RING_ENTRIES ? _fileSize / AMPS_RING_ENTRY_SIZE : AMPS_RING_ENTRIES;
529  for (; _currentIndex < maxEntries; ++_currentIndex)
530  {
531  char* offset = _log + (_currentIndex * AMPS_RING_ENTRY_SIZE);
532  if (*offset == '\0')
533  {
534  break;
535  }
536  //It's possible we wrote the subId and not NULLs, so be careful
537  char* end = (char*)memchr(offset, '\0', AMPS_RING_BYTES_SUBID);
538  if (!end)
539  {
540  // Failed subscription id read, we're done
541  break;
542  }
543  // Safe to continue
544  sub.assign(offset, (size_t)(end - offset));
545  // Put this sub into the MemoryBookmarkStore
546  Subscription* subPtr = MemoryBookmarkStore::find(sub);
547  // Put this sub into the _positionMap
548  // This is recovery, so do it directly and not with findPos
549  SubscriptionPosition& pos = _positionMap[sub];
550  pos._index = _currentIndex;
551  offset += AMPS_RING_BYTES_SUBID;
552  size_t foundCursor = AMPS_RING_POSITIONS;
553  for (pos._current = 0; pos._current < AMPS_RING_POSITIONS; pos._current++)
554  {
555  if (offset[pos._current * AMPS_RING_BYTES_BOOKMARK] == '*')
556  {
557  // Subtract one position
558  pos._current = (pos._current + (AMPS_RING_POSITIONS - 1)) % AMPS_RING_POSITIONS;
559  // Subtract one more if a second bookmark is found
560  if (offset[foundCursor * AMPS_RING_BYTES_BOOKMARK] == '*')
561  {
562  pos._current = (pos._current + (AMPS_RING_POSITIONS - 1)) % AMPS_RING_POSITIONS;
563  }
564  break;
565  }
566  }
567  if (pos._current >= AMPS_RING_POSITIONS)
568  {
569  // No valid bookmark found, just use 0
570  pos._current = 0;
571  }
572  else
573  {
574  // We found a cursor
575  offset += pos._current * AMPS_RING_BYTES_BOOKMARK;
576  //It's possible we wrote bookmark and not NULLs, so be careful
577  end = (char*)memchr(offset, '\0', AMPS_RING_BYTES_BOOKMARK);
578  if (end && end != offset)
579  {
580  // add 1 to account for leading '+'
581  bookmarkField.assign(offset + 1, (size_t)(end - offset - 1));
582  // log, discard to make it the most recent
583  if (!BookmarkRange::isRange(bookmarkField))
584  {
585  // This adds bookmark to _publishers
586  subPtr->isDiscarded(bookmarkField);
587  subPtr->discard(subPtr->log(bookmarkField));
588  }
589  else
590  {
591  subPtr->log(bookmarkField);
592  }
593  }
594  }
595  }
596  _ringRecovering = false;
597  }
598 
599  SubscriptionPosition& findPos(const Message::Field& subId_)
600  {
601  Lock<Mutex> guard(_posLock);
602  if (_positionMap.count(subId_) == 0)
603  {
604  // New subid
605  // Move to its start position and write the sub id
606  char* offset = _log + (_currentIndex * AMPS_RING_ENTRY_SIZE);
607  size_t len = subId_.len();
608  memcpy(offset, static_cast<const void*>(subId_.data()), len);
609  // Add it to the map with the current index
610  // Use the data written to the mmap for the subid
611  Message::Field subId;
612  subId.assign(offset, len);
613  _positionMap[subId]._index = _currentIndex;
614  _positionMap[subId]._current = 0;
615  // Fill extra spaces with NULL
616  offset += len;
617  memset(offset, 0, AMPS_RING_BYTES_SUBID - len);
618  // Advance current index
619  ++_currentIndex;
620  }
621  return _positionMap[subId_];
622  }
623 
// Returns the in-memory Subscription for subId_, first making sure the
// subscription has an entry in the ring file (findPos() creates one if needed).
// Throws StoreException when subId_ is empty.
624  MemoryBookmarkStore::Subscription* find(const Message::Field& subId_)
625  {
626  if (subId_.empty())
627  {
628  throw StoreException("A valid subscription ID must be provided to the RingBookmarkStore");
629  }
// Called only for its side effect of creating the file entry; result unused.
630  findPos(subId_);
631  return MemoryBookmarkStore::find(subId_);
632  }
633 
634 
// Data members.
635  Mutex _fileLock; // held by write() and purge(subId_) while the mapped file is modified
636  size_t _fileSize; // size in bytes of the current mapping; 0 when nothing is mapped
637  size_t _currentIndex; // index of the next unused entry in the file
638  char* _log; // start of the mapped file; 0 when nothing is mapped
639 #ifdef _WIN32
640  HANDLE _file; // handle returned by CreateFileA in init()
641  HANDLE _mapFile; // handle returned by CreateFileMappingA in setFileSize()
642 #else
643  int _fd; // descriptor returned by open() in init()
644 #endif
645  Mutex _posLock; // held by findPos() and purge(subId_) while _positionMap is used
646  typedef std::map<Message::Field, SubscriptionPosition, Message::Field::FieldHash> PositionMap;
647  PositionMap _positionMap; // subscription id -> location of its entry in the file
648  bool _ringRecovering; // while true, write() returns without touching the file
649 #ifdef _WIN32
650  static DWORD getPageSize()
651  {
652  static DWORD pageSize = 0;
653  if (pageSize == 0)
654  {
655  SYSTEM_INFO SYS_INFO;
656  GetSystemInfo(&SYS_INFO);
657  pageSize = SYS_INFO.dwPageSize;
658  }
659  return pageSize;
660  }
661 #else
/// Returns the system page size reported by sysconf(_SC_PAGESIZE). The
/// value is looked up on the first call and cached in a function-local static.
static size_t getPageSize()
{
  static size_t cachedPageSize = 0UL;
  if (cachedPageSize != 0)
  {
    return cachedPageSize;
  }
  cachedPageSize = (size_t)sysconf(_SC_PAGESIZE);
  return cachedPageSize;
}
671 #endif
672 
673 
674  };
675 
676 } // end namespace AMPS
677 
678 
679 #endif // _RINGBOOKMARKSTORE_H_
680 
Field getSubscriptionId() const
Retrieves the value of the SubscriptionId header of the Message as a Field which references the under...
Definition: Message.hpp:1489
Message encapsulates a single message sent to or received from an AMPS server, and provides methods f...
Definition: Message.hpp:539
Provides AMPS::MemoryBookmarkStore, a bookmark store that holds bookmarks in memory.
void clear()
Deletes the data associated with this Field, should only be used on Fields that were created as deepC...
Definition: Field.hpp:260
const char * data() const
Returns the (non-null-terminated) data underlying this field.
Definition: Field.hpp:273
virtual void purge()
Called to purge the contents of this store.
Definition: RingBookmarkStore.hpp:224
A BookmarkStoreImpl that stores only the MOST_RECENT bookmark to a file for recovery and keeps any bo...
Definition: RingBookmarkStore.hpp:68
MemoryBookmarkStore()
Creates a MemoryBookmarkStore.
Definition: MemoryBookmarkStore.hpp:1186
virtual void persisted(const Message::Field &subId_, const Message::Field &bookmark_)
Mark the bookmark provided as replicated to all sync replication destinations for the given subscript...
Definition: RingBookmarkStore.hpp:194
Field getSubscriptionIds() const
Retrieves the value of the SubscriptionIds header of the Message as a Field which references the unde...
Definition: Message.hpp:1490
bool empty() const
Returns 'true' if empty, 'false' otherwise.
Definition: Field.hpp:128
size_t len() const
Returns the length of the data underlying this field.
Definition: Field.hpp:280
virtual void discard(const Message &message_)
Log a discard-bookmark entry to the persistent log based on a bookmark sequence number.
Definition: RingBookmarkStore.hpp:152
virtual Message::Field getMostRecent(const Message::Field &subId_)
Returns the most recent bookmark from the log that ought to be used for (re-)subscriptions.
Definition: RingBookmarkStore.hpp:213
A BookmarkStoreImpl implementation that stores bookmarks in memory.
Definition: MemoryBookmarkStore.hpp:57
RingBookmarkStore(const char *fileName_)
Create a RingBookmarkStore using fileName_ for storage of most recent.
Definition: RingBookmarkStore.hpp:81
Field represents the value of a single field in a Message.
Definition: Field.hpp:86
virtual size_t log(Message &message_)
Return the corresponding sequence number for this bookmark.
Definition: RingBookmarkStore.hpp:129
Definition: ampsplusplus.hpp:103
Field getBookmark() const
Retrieves the value of the Bookmark header of the Message as a Field which references the underlying ...
Definition: Message.hpp:1256
virtual void discard(const Message::Field &subId_, size_t bookmarkSeqNo_)
Log a discard-bookmark entry to the persistent log based on a bookmark sequence number.
Definition: RingBookmarkStore.hpp:176
virtual void purge(const Message::Field &subId_)
Called to purge the contents of this store for particular subId.
Definition: RingBookmarkStore.hpp:238