AMPS C/C++ Client Class Reference
AMPS C/C++ Client Version 5.3.4.3
MMapBookmarkStore.hpp
Go to the documentation of this file.
1 //
3 // Copyright (c) 2010-2024 60East Technologies Inc., All Rights Reserved.
4 //
5 // This computer software is owned by 60East Technologies Inc. and is
6 // protected by U.S. copyright laws and other laws and by international
7 // treaties. This computer software is furnished by 60East Technologies
8 // Inc. pursuant to a written license agreement and may be used, copied,
9 // transmitted, and stored only in accordance with the terms of such
10 // license agreement and with the inclusion of the above copyright notice.
11 // This computer software or any other copies thereof may not be provided
12 // or otherwise made available to any other person.
13 //
14 // U.S. Government Restricted Rights. This computer software: (a) was
15 // developed at private expense and is in all respects the proprietary
16 // information of 60East Technologies Inc.; (b) was not developed with
17 // government funds; (c) is a trade secret of 60East Technologies Inc.
18 // for all purposes of the Freedom of Information Act; and (d) is a
19 // commercial item and thus, pursuant to Section 12.212 of the Federal
20 // Acquisition Regulations (FAR) and DFAR Supplement Section 227.7202,
21 // Government's use, duplication or disclosure of the computer software
22 // is subject to the restrictions set forth by 60East Technologies Inc..
23 //
25 
26 #ifndef _MMAPBOOKMARKSTORE_H_
27 #define _MMAPBOOKMARKSTORE_H_
28 
30 #include <amps/RecoveryPoint.hpp>
32 #ifdef _WIN32
33  #include <windows.h>
34 #else
35  #include <sys/mman.h>
36  #include <unistd.h>
37 #endif
38 #include <sys/types.h>
39 #include <sys/stat.h>
40 #include <fcntl.h>
41 #include <set>
42 
43 #define AMPS_INITIAL_LOG_SIZE 40960UL
44 
49 
50 namespace AMPS
51 {
57  {
58  private:
59 #ifdef _WIN32
60  typedef HANDLE FileType;
61  HANDLE _mapFile;
62 #else
63  typedef int FileType;
64 #endif
65  static void _clearBookmark(std::pair<const Message::Field, size_t>& pair)
66  {
67  Message::Field f(pair.first);
68  f.clear();
69  }
70 
71  public:
82  MMapBookmarkStore(const char* fileName_, bool useLastModifiedTime_ = false)
83  : MemoryBookmarkStore(), _fileName(fileName_), _fileSize(0)
84  , _logOffset(0), _log(0), _fileTimestamp(0)
85 #ifdef _WIN32
86  , _file(INVALID_HANDLE_VALUE), _mapFile(INVALID_HANDLE_VALUE)
87 #else
88  , _file(0)
89 #endif
90  {
91  if (init(useLastModifiedTime_))
92  {
93  recover(useLastModifiedTime_, false);
94  }
95  }
96 
107  MMapBookmarkStore(const std::string& fileName_,
108  bool useLastModifiedTime_ = false)
109  : MemoryBookmarkStore(), _fileName(fileName_), _fileSize(0)
110  , _logOffset(0), _log(0), _fileTimestamp(0)
111 #ifdef _WIN32
112  , _file(INVALID_HANDLE_VALUE), _mapFile(INVALID_HANDLE_VALUE)
113 #else
114  , _file(0)
115 #endif
116  {
117  if (init(useLastModifiedTime_))
118  {
119  recover(useLastModifiedTime_, false);
120  }
121  }
122 
// NOTE(review): the first line of this constructor's signature (the one that
// declares adapter_) is missing from this listing; only the remaining
// parameters are visible.
// adapter_ and factory_ are forwarded to MemoryBookmarkStore. When init()
// reports existing file content, recover() is run with hasAdapter_ = true.
138  const char* fileName_,
139  RecoveryPointFactory factory_ = NULL,
140  bool useLastModifiedTime_ = false)
141  : MemoryBookmarkStore(adapter_, factory_)
142  , _fileName(fileName_), _fileSize(0)
143  , _logOffset(0), _log(0), _fileTimestamp(0)
144 #ifdef _WIN32
145  , _file(INVALID_HANDLE_VALUE), _mapFile(INVALID_HANDLE_VALUE)
146 #else
147  , _file(0)
148 #endif
149  {
// init() returns true only when the file already holds data to replay
150  if (init(useLastModifiedTime_))
151  {
152  recover(useLastModifiedTime_, true);
153  }
154  }
155 
// NOTE(review): the first line of this constructor's signature (the one that
// declares adapter_) is missing from this listing; only the remaining
// parameters are visible.
// adapter_ and factory_ are forwarded to MemoryBookmarkStore. When init()
// reports existing file content, recover() is run with hasAdapter_ = true.
171  const std::string& fileName_,
172  RecoveryPointFactory factory_ = NULL,
173  bool useLastModifiedTime_ = false)
174  : MemoryBookmarkStore(adapter_, factory_)
175  , _fileName(fileName_), _fileSize(0)
176  , _logOffset(0), _log(0), _fileTimestamp(0)
177 #ifdef _WIN32
178  , _file(INVALID_HANDLE_VALUE), _mapFile(INVALID_HANDLE_VALUE)
179 #else
180  , _file(0)
181 #endif
182  {
// init() returns true only when the file already holds data to replay
183  if (init(useLastModifiedTime_))
184  {
185  recover(useLastModifiedTime_, true);
186  }
187  }
188 
189  virtual ~MMapBookmarkStore()
190  {
191 #ifdef _WIN32
192  UnmapViewOfFile(_log);
193  CloseHandle(_mapFile);
194  CloseHandle(_file);
195 #else
196  munmap(_log, _fileSize);
197  ::close(_file);
198 #endif
199  // In case _lock gets acquired by reader thread between end of this
200  // destructor and start of base class destructor, prevent write()
201  _recovering = true;
202  }
203 
209  virtual size_t log(Message& message_)
210  {
211  Message::Field bookmark = message_.getBookmark();
212  Subscription* sub = (Subscription*)(message_.getSubscriptionHandle());
213  Lock<Mutex> guard(_lock);
214  if (!sub)
215  {
216  Message::Field subId = message_.getSubscriptionId();
217  if (subId.empty())
218  {
219  subId = message_.getSubscriptionIds();
220  }
221  sub = find(subId);
222  message_.setSubscriptionHandle(static_cast<amps_subscription_handle>(sub));
223  }
224  write(sub->id(), ENTRY_BOOKMARK, bookmark);
225  return MemoryBookmarkStore::_log(message_);
226  }
227 
232  virtual void discard(const Message& message_)
233  {
234  Message::Field bookmark = message_.getBookmark();
235  Message::Field subId = message_.getSubscriptionId();
236  if (subId.empty())
237  {
238  subId = message_.getSubscriptionIds();
239  }
240  Lock<Mutex> guard(_lock);
241  write(subId, ENTRY_DISCARD, bookmark);
242  MemoryBookmarkStore::_discard(message_);
243  }
244 
252  virtual void discard(const Message::Field& subId_, size_t bookmarkSeqNo_)
253  {
254  Lock<Mutex> guard(_lock);
255  Subscription::Entry* entry = find(subId_)->getEntryByIndex(bookmarkSeqNo_);
256  if (!entry || entry->_val.empty())
257  {
258  return;
259  }
260  write(subId_, ENTRY_DISCARD, entry->_val);
261  MemoryBookmarkStore::_discard(subId_, bookmarkSeqNo_);
262  }
263 
// NOTE(review): the signature line of this method is missing from this
// listing; the body refers to a parameter named subId_.
270  {
271  Lock<Mutex> guard(_lock);
// Delegates to the in-memory store while _lock is held
272  return MemoryBookmarkStore::_getMostRecent(subId_);
273  }
274 
283  virtual bool isDiscarded(Message& message_)
284  {
285  Lock<Mutex> l(_lock);
286  bool retVal = MemoryBookmarkStore::_isDiscarded(message_);
287  if (retVal)
288  {
289  Message::Field subId = message_.getSubscriptionId();
290  if (subId.empty())
291  {
292  subId = message_.getSubscriptionIds();
293  }
294  write(subId, ENTRY_BOOKMARK, message_.getBookmark());
295  write(subId, ENTRY_DISCARD, message_.getBookmark());
296  }
297  return retVal;
298  }
299 
305  virtual void purge()
306  {
307  Lock<Mutex> guard(_lock);
308  Lock<Mutex> fileGuard(_fileLock);
309  memset(_log, 0, _logOffset);
310  _logOffset = 0;
311  MemoryBookmarkStore::_purge();
312  }
313 
319  virtual void purge(const Message::Field& subId_)
320  {
321  Lock<Mutex> guard(_lock);
322  Lock<Mutex> fileGuard(_fileLock);
323  MemoryBookmarkStore::_purge(subId_);
324  std::string tmpFileName = _fileName + ".tmp";
325  __prune(tmpFileName);
326  }
327 
// Acquires _lock for the duration of the call.
// NOTE(review): the listing's embedded numbering jumps from 330 to 332, so
// one statement of this body is missing here (presumably the call that
// forwards version_ to the base class - confirm against the real header).
328  void setServerVersion(const VersionInfo& version_)
329  {
330  Lock<Mutex> guard(_lock);
332  }
333 
// Acquires _lock for the duration of the call.
// NOTE(review): the listing's embedded numbering jumps from 336 to 338, so
// one statement of this body is missing here (presumably the call that
// forwards version_ to the base class - confirm against the real header).
334  void setServerVersion(size_t version_)
335  {
336  Lock<Mutex> guard(_lock);
338  }
339 
340  // Yes, the argument is a non-const copy of what is passed in
341  void _prune(const std::string& tmpFileName_)
342  {
343  Lock<Mutex> guard(_lock);
344  Lock<Mutex> fileGuard(_fileLock);
345  // If nothing's changed with most recent, don't rewrite the file
346  if (!_recentChanged)
347  {
348  return;
349  }
350  if (tmpFileName_.empty())
351  {
352  __prune(_fileName + ".tmp");
353  }
354  else
355  {
356  __prune(tmpFileName_);
357  }
358  _recentChanged = false;
359  }
360 
361  private:
362  void __prune(const std::string& tmpFileName_)
363  {
364  size_t sz = AMPS_INITIAL_LOG_SIZE;
365  FileType file;
366  char* log = NULL;
367  size_t bytesWritten = 0;
368 #ifdef _WIN32
369  file = CreateFileA(tmpFileName_.c_str(), GENERIC_READ | GENERIC_WRITE, 0,
370  NULL, OPEN_ALWAYS, FILE_ATTRIBUTE_NORMAL, NULL);
371  if ( file == INVALID_HANDLE_VALUE )
372  {
373  DWORD err = getErrorNo();
374  std::ostringstream os;
375  os << "Failed to create temp store file " << tmpFileName_ <<
376  " to prune MMapBookmarkStore " << _fileName;
377  error(os.str(), err);
378  }
379  HANDLE mapFile = NULL;
380  try
381  {
382  sz = _setFileSize(sz, &log, file, &mapFile);
383  }
384  catch (StoreException& ex)
385  {
386  if (mapFile == NULL || mapFile == INVALID_HANDLE_VALUE)
387  {
388  CloseHandle(file);
389  std::ostringstream os;
390  os << "Failed to create map of temp file " << tmpFileName_
391  << " while resizing it to prune MMapBookmarkStore " << _fileName
392  << ": " << ex.what();
393  throw StoreException(os.str());
394  return;
395  }
396  if (log == NULL)
397  {
398  CloseHandle(mapFile);
399  CloseHandle(file);
400  std::ostringstream os;
401  os << "Failed to map temp file " << tmpFileName_
402  << " to memory while resizing it to prune MMapBookmarkStore "
403  << _fileName << ": " << ex.what();
404  throw StoreException(os.str());
405  return;
406  }
407  }
408  if (sz == 0)
409  {
410  DWORD err = getErrorNo();
411  UnmapViewOfFile(log);
412  CloseHandle(mapFile);
413  CloseHandle(file);
414  std::ostringstream os;
415  os << "Failed to grow tmp file " << tmpFileName_
416  << " to prune MMapBookmarkStore " << _fileName;
417  error(os.str(), err);
418  }
419 #else
420  file = open(tmpFileName_.c_str(), O_RDWR | O_CREAT, (mode_t)0644);
421  if (file == -1)
422  {
423  int err = getErrorNo();
424  std::ostringstream os;
425  os << "Failed to create temp store file " << tmpFileName_ <<
426  " to prune MMapBookmarkStore " << _fileName;
427  error(os.str(), err);
428  return;
429  }
430  if (::write(file, "\0\0\0\0", 4) == -1)
431  {
432  int err = getErrorNo();
433  std::ostringstream os;
434  os << "Failed to write header to temp file " << tmpFileName_
435  << " to prune MMapBookmarkStore " << _fileName;
436  error(os.str(), err);
437  return;
438  }
439  try
440  {
441  sz = _setFileSize(sz, &log, file, 0);
442  }
443  catch (StoreException& ex)
444  {
445  std::ostringstream os;
446  os << "Failed to grow tmp file " << tmpFileName_
447  << " to prune MMapBookmarkStore " << _fileName << ex.what();
448  throw StoreException(os.str());
449  }
450  if (sz == 0)
451  {
452  int err = getErrorNo();
453  log = NULL;
454  ::close(file);
455  std::ostringstream os;
456  os << "Failed to grow tmp file " << tmpFileName_
457  << " to prune MMapBookmarkStore " << _fileName;
458  error(os.str(), err);
459  }
460 #endif
461  try
462  {
463  for (SubscriptionMap::iterator i = _subs.begin(); i != _subs.end(); ++i)
464  {
465  Message::Field subId = i->first;
466  assert(!subId.empty());
467  size_t subIdLen = subId.len();
468  Subscription* mapSubPtr = i->second;
469  const BookmarkRange& range = mapSubPtr->getRange();
470  if (range.isValid())
471  {
472  write(&log, &bytesWritten, subId, ENTRY_BOOKMARK, range);
473  }
474  Message::Field recent = mapSubPtr->getMostRecent(false);
475  amps_uint64_t recentPub, recentSeq;
476  Subscription::parseBookmark(recent, recentPub, recentSeq);
477  Subscription::PublisherMap publishersDiscarded =
478  mapSubPtr->_publishers;
479  MemoryBookmarkStore::EntryPtrList recovered;
480  mapSubPtr->getRecoveryEntries(recovered);
481  mapSubPtr->setPublishersToDiscarded(&recovered,
482  &publishersDiscarded);
483  char tmpBookmarkBuffer[128];
484  for (Subscription::PublisherIterator pub =
485  publishersDiscarded.begin(),
486  e = publishersDiscarded.end();
487  pub != e; ++pub)
488  {
489  // Don't log EPOCH if it got in the map
490  if (pub->first == 0 || pub->second == 0)
491  {
492  continue;
493  }
494  // Don't log the most recent yet
495  if (pub->first == recentPub)
496  {
497  continue;
498  }
499  int written = AMPS_snprintf_amps_uint64_t(
500  tmpBookmarkBuffer,
501  sizeof(tmpBookmarkBuffer),
502  pub->first);
503  *(tmpBookmarkBuffer + written++) = '|';
504  written += AMPS_snprintf_amps_uint64_t(
505  tmpBookmarkBuffer + written,
506  sizeof(tmpBookmarkBuffer)
507  - (size_t)written,
508  pub->second);
509  *(tmpBookmarkBuffer + written++) = '|';
510  Message::Field tmpBookmark(tmpBookmarkBuffer, (size_t)written);
511  // Check we'll be in the current boundaries
512  size_t blockLen = subIdLen + 2 * sizeof(size_t) + tmpBookmark.len() + 1;
513  if (bytesWritten + blockLen + blockLen >= sz)
514  {
515 #ifdef _WIN32
516  sz = _setFileSize(sz * 2, &log, file, &mapFile);
517 #else
518  sz = _setFileSize(sz * 2, &log, file, sz);
519 #endif
520  }
521  write(&log, &bytesWritten, subId, ENTRY_BOOKMARK, tmpBookmark);
522  write(&log, &bytesWritten, subId, ENTRY_DISCARD, tmpBookmark);
523  }
524  if (isWritableBookmark(recent.len()))
525  {
526  // Check we'll be in the current boundaries
527  size_t blockLen = subIdLen + 2 * sizeof(size_t) + recent.len() + 1;
528  if (bytesWritten + blockLen + blockLen >= sz)
529  {
530 #ifdef _WIN32
531  sz = _setFileSize(sz * 2, &log, file, &mapFile);
532 #else
533  sz = _setFileSize(sz * 2, &log, file, sz);
534 #endif
535  }
536  write(&log, &bytesWritten, subId, ENTRY_BOOKMARK, recent);
537  write(&log, &bytesWritten, subId, ENTRY_DISCARD, recent);
538  }
539  else // set up _recentList
540  {
541  mapSubPtr->getMostRecentList();
542  }
543  Message::Field bookmark = mapSubPtr->getLastPersisted();
544  if (isWritableBookmark(bookmark.len()))
545  {
546  // Check we'll be in the current boundaries
547  size_t blockLen = subIdLen + 2 * sizeof(size_t) +
548  bookmark.len() + 1;
549  if (bytesWritten + blockLen >= sz)
550  {
551 #ifdef _WIN32
552  sz = _setFileSize(sz * 2, &log, file, &mapFile);
553 #else
554  sz = _setFileSize(sz * 2, &log, file, sz);
555 #endif
556  }
557  write(&log, &bytesWritten, subId, ENTRY_PERSISTED,
558  mapSubPtr->getLastPersisted());
559  }
560  mapSubPtr->getActiveEntries(recovered);
561  for (MemoryBookmarkStore::EntryPtrList::iterator entry =
562  recovered.begin();
563  entry != recovered.end(); ++entry)
564  {
565  if ((*entry)->_val.empty() ||
566  !isWritableBookmark((*entry)->_val.len()))
567  {
568  continue;
569  }
570  // Check we'll be in the current boundaries
571  size_t blockLen = subIdLen + 2 * sizeof(size_t) +
572  (*entry)->_val.len() + 1;
573  if (bytesWritten + blockLen >= sz)
574  {
575 #ifdef _WIN32
576  sz = _setFileSize(sz * 2, &log, file, &mapFile);
577 #else
578  sz = _setFileSize(sz * 2, &log, file, sz);
579 #endif
580  }
581  write(&log, &bytesWritten, subId, ENTRY_BOOKMARK,
582  (*entry)->_val);
583  if (!(*entry)->_active)
584  {
585  // Check we'll be in the current boundaries
586  if (bytesWritten + blockLen >= sz)
587  {
588 #ifdef _WIN32
589  sz = _setFileSize(sz * 2, &log, file, &mapFile);
590 #else
591  sz = _setFileSize(sz * 2, &log, file, sz);
592 #endif
593  }
594  write(&log, &bytesWritten, subId, ENTRY_DISCARD,
595  (*entry)->_val);
596  }
597  }
598  }
599  }
600  catch (StoreException& ex)
601  {
602 #ifdef _WIN32
603  UnmapViewOfFile(log);
604  CloseHandle(mapFile);
605  CloseHandle(file);
606 #else
607  ::close(file);
608  ::unlink(tmpFileName_.c_str());
609 #endif
610  std::ostringstream os;
611  os << "Exception during prune: " << ex.what();
612  throw StoreException(os.str());
613  }
614 #ifdef _WIN32
615  BOOL success = FlushViewOfFile(_log, 0);
616  success |= UnmapViewOfFile(_log);
617  _log = NULL;
618  success |= CloseHandle(_mapFile);
619  success |= CloseHandle(_file);
620  if (!success)
621  {
622  DWORD err = getErrorNo();
623  std::ostringstream os;
624  os << "Failed to flush, unmap, and close current file "
625  << _fileName
626  << " in prune in MMapBookmarkStore. ";
627  error(os.str(), err);
628  }
629  _mapFile = INVALID_HANDLE_VALUE;
630  _file = INVALID_HANDLE_VALUE;
631  success = FlushViewOfFile(log, 0);
632  success |= UnmapViewOfFile(log);
633  log = NULL;
634  success |= CloseHandle(mapFile);
635  success |= CloseHandle(file);
636  if (!success)
637  {
638  DWORD err = getErrorNo();
639  std::ostringstream os;
640  os << "Failed to flush, unmap and close completed temp file "
641  << tmpFileName_
642  << " in prune in MMapBookmarkStore. ";
643  error(os.str(), err);
644  }
645  mapFile = INVALID_HANDLE_VALUE;
646  file = INVALID_HANDLE_VALUE;
647  // Replace current file with pruned file
648  int retryCount = 3;
649  while (!MoveFileExA(tmpFileName_.c_str(), _fileName.c_str(),
650  MOVEFILE_COPY_ALLOWED | MOVEFILE_REPLACE_EXISTING | MOVEFILE_WRITE_THROUGH))
651  {
652  DWORD err = getErrorNo();
653  if (--retryCount > 0)
654  {
655  continue;
656  }
657  // Try to set _file to the tmp file that won't move then throw
658  std::string desiredFileName = _fileName;
659  _fileName = tmpFileName_;
660  init();
661  std::ostringstream os;
662  os << "Failed to move completed temp file " << tmpFileName_
663  << " to " << desiredFileName
664  << " in prune in MMapBookmarkStore. Continuing by using "
665  << tmpFileName_ << " as the MMapBookmarkStore file.";
666  error(os.str(), err);
667  }
668  // Call init to set up file again
669  init();
670 #else
671  munmap(_log, _fileSize);
672  _log = NULL;
673  ::close(_file);
674  munmap(log, sz);
675  ::close(file);
676  if (-1 == ::unlink(_fileName.c_str()))
677  {
678  int err = getErrorNo();
679  // Try to set _file to the tmp file that won't move then throw
680  std::string desiredFileName = _fileName;
681  _fileName = tmpFileName_;
682  init();
683  std::ostringstream os;
684  os << "Failed to delete file " << desiredFileName
685  << " after creating temporary file " << tmpFileName_
686  << " in prune in MMapBookmarkStore. Continuing by using "
687  << tmpFileName_ << " as the MMapBookmarkStore file.";
688  error(os.str(), err);
689  }
690  if (-1 == ::rename(tmpFileName_.c_str(), _fileName.c_str()))
691  {
692  int err = getErrorNo();
693  // Try to set _file to the tmp file that won't move then throw
694  std::string desiredFileName = _fileName;
695  _fileName = tmpFileName_;
696  init();
697  std::ostringstream os;
698  os << "Failed to move completed temp file " << tmpFileName_
699  << " to " << desiredFileName
700  << " in prune in MMapBookmarkStore. Continuing by using "
701  << tmpFileName_ << " as the MMapBookmarkStore file.";
702  error(os.str(), err);
703  }
704  // Call init to set up file again
705  init();
706 #endif
707  _logOffset = bytesWritten;
708  }
709 
710  virtual void _persisted(Subscription* subP_,
711  const Message::Field& bookmarkField_)
712  {
713  Lock<Mutex> l(_lock);
714  write(subP_->id(), ENTRY_PERSISTED, bookmarkField_);
715  MemoryBookmarkStore::_persisted(subP_, bookmarkField_);
716  }
717 
718  virtual Message::Field _persisted(Subscription* subP_, size_t bookmark_)
719  {
720  Lock<Mutex> l(_lock);
721  Subscription::Entry* entryPtr = subP_->getEntryByIndex(bookmark_);
722  if (!entryPtr || entryPtr->_val.empty())
723  {
724  return Message::Field();
725  }
726  Message::Field bookmarkField = entryPtr->_val;
727  write(subP_->id(), ENTRY_PERSISTED, bookmarkField);
728  MemoryBookmarkStore::_persisted(subP_, bookmarkField);
729  return bookmarkField;
730  }
731 
732  // Returns true if file exists and is larger than 0 bytes and therefore
733  // should be used for recovery.
// Opens _fileName (creating it when absent), stores the handle/descriptor in
// _file and maps the file through setFileSize(), using at least
// AMPS_INITIAL_LOG_SIZE bytes.
// When useLastModifiedTime_ is set and the file already has content, the
// file's modification time is formatted as "YYYYmmddTHHMMSS" into a newly
// allocated _fileTimestamp buffer whose last character is then set to 'Z'.
// Failures are reported through error(), which throws StoreException; the
// "return false" statements that follow an error() call are not reached.
734  bool init(bool useLastModifiedTime_ = false)
735  {
736  bool retVal = true;
737 #ifdef _WIN32
// Share mode 0: the file cannot be opened again while this handle is open
738  _file = CreateFileA(_fileName.c_str(), GENERIC_READ | GENERIC_WRITE, 0,
739  NULL, OPEN_ALWAYS, FILE_ATTRIBUTE_NORMAL, NULL);
740  if ( _file == INVALID_HANDLE_VALUE )
741  {
742  DWORD err = getErrorNo();
743  std::ostringstream os;
744  os << "Failed to initialize file " << _fileName << " for MMapBookmarkStore";
745  error(os.str(), err);
746  }
747  LARGE_INTEGER liFileSize;
748  if (GetFileSizeEx(_file, &liFileSize) == 0)
749  {
750  DWORD err = getErrorNo();
751  CloseHandle(_file);
752  std::ostringstream os;
753  os << "Failure getting initial file size for MMapBookmarkStore " << _fileName;
754  error(os.str(), err);
755  return false;
756  }
// 32-bit builds keep only the low 32 bits of the reported size
757 #ifdef _WIN64
758  size_t fileSize = liFileSize.QuadPart;
759 #else
760  size_t fileSize = liFileSize.LowPart;
761 #endif
762  if (useLastModifiedTime_ && fileSize > 0)
763  {
764  FILETIME ftModifiedTime;
765  if (GetFileTime(_file, NULL, NULL, &ftModifiedTime) == 0)
766  {
767  DWORD err = getErrorNo();
768  CloseHandle(_file);
769  _recovering = false;
770  error("Failure getting file time while trying to recover.", err);
771  return false;
772  }
773  SYSTEMTIME st;
774  if (FileTimeToSystemTime(&ftModifiedTime, &st) == 0)
775  {
776  DWORD err = getErrorNo();
777  CloseHandle(_file);
778  _recovering = false;
779  error("Failure converting file time while trying to recover.", err);
780  return false;
781  }
782  _fileTimestamp = new char[AMPS_TIMESTAMP_LEN];
783  sprintf_s(_fileTimestamp, AMPS_TIMESTAMP_LEN,
784  "%04d%02d%02dT%02d%02d%02d", st.wYear, st.wMonth,
785  st.wDay, st.wHour, st.wMinute, st.wSecond);
786  _fileTimestamp[AMPS_TIMESTAMP_LEN - 1] = 'Z';
787  }
// Only a non-empty file has anything to recover
788  retVal = (fileSize != 0);
789  setFileSize( AMPS_INITIAL_LOG_SIZE > fileSize ?
790  AMPS_INITIAL_LOG_SIZE : fileSize);
791 #else
792  _file = open(_fileName.c_str(), O_RDWR | O_CREAT, (mode_t)0644);
793  if (_file == -1)
794  {
795  int err = getErrorNo();
796  std::ostringstream os;
797  os << "Failed to initialize log file " << _fileName << " for MMapBookmarkStore";
798  error(os.str(), err);
799  }
800  struct stat statBuf;
801  if (fstat(_file, &statBuf) == -1)
802  {
803  int err = getErrorNo();
804  ::close(_file);
805  std::ostringstream os;
806  os << "Failed to stat log file " << _fileName << " for MMapBookmarkStore";
807  error(os.str(), err);
808  return false;
809  }
810  size_t fSize = (size_t)statBuf.st_size;
811  if (fSize == 0)
812  {
// Empty (typically just created) file: nothing to recover; write a
// 4-byte zero header
813  retVal = false;
814  if (::write(_file, "\0\0\0\0", 4) == -1)
815  {
816  int err = getErrorNo();
817  ::close(_file);
818  std::ostringstream os;
819  os << "Failed to write header to log file " << _fileName
820  << " for MMapBookmarkStore";
821  error(os.str(), err);
822  return false;
823  }
824  }
825  else if (useLastModifiedTime_)
826  {
827  _fileTimestamp = new char[AMPS_TIMESTAMP_LEN];
828  struct tm timeInfo;
829  gmtime_r(&statBuf.st_mtime, &timeInfo);
830  strftime(_fileTimestamp, AMPS_TIMESTAMP_LEN,
831  "%Y%m%dT%H%M%S", &timeInfo);
832  _fileTimestamp[AMPS_TIMESTAMP_LEN - 1] = 'Z';
833  }
834 
// _setFileSize() grows a file by seeking to the target size and writing one
// more byte, which presumably is why 1 is subtracted from an existing
// file's size here - confirm.
835  setFileSize((fSize > AMPS_INITIAL_LOG_SIZE) ? fSize - 1 : AMPS_INITIAL_LOG_SIZE);
836 #endif
837  return retVal;
838  }
839 
840 #ifdef _WIN32
841  DWORD getErrorNo() const
842  {
843  return GetLastError();
844  }
845 
846  void error(const std::string& message_, DWORD err)
847  {
848  std::ostringstream os;
849  static const DWORD msgSize = 2048;
850  char pMsg[msgSize];
851  DWORD sz = FormatMessageA(FORMAT_MESSAGE_FROM_SYSTEM |
852  FORMAT_MESSAGE_ARGUMENT_ARRAY,
853  NULL, err, LANG_NEUTRAL,
854  pMsg, msgSize, NULL);
855  os << "File: " << _fileName << ". " << message_ << " with error " << pMsg;
856  throw StoreException(os.str());
857  }
858 #else
859  int getErrorNo() const
860  {
861  return errno;
862  }
863 
864  void error(const std::string& message_, int err)
865  {
866  std::ostringstream os;
867  os << message_ << ". Error is " << strerror(err);
868  throw StoreException(os.str());
869  }
870 #endif
// AMPS_WRITE8(p, v): stores the length/sequence value v at byte address p.
// AMPS_READ8(p, v):  loads the value stored at byte address p into v.
// Records are packed back to back in the log, so p is generally NOT aligned
// for size_t. The sparc variant writes the eight bytes individually, most
// significant first; the generic variant copies sizeof(size_t) bytes with
// memcpy instead of dereferencing a possibly misaligned size_t pointer.
#if defined(sparc)
#define AMPS_WRITE8(p,v) { p[0] = (v>>56)&0xFF; p[1] = (v>>48)&0xFF; p[2] = (v>>40)&0xFF; p[3] = (v>>32)&0xFF; p[4] = (v>>24)&0xFF; p[5] = (v>>16)&0xFF; p[6] = (v>>8)&0xFF; p[7]=v&0xFF; }
#define AMPS_READ8(p, v) { memcpy(&v,p,8); }
#else
#define AMPS_WRITE8(p,v) { size_t amps_write8_tmp_ = (size_t)(v); memcpy((p), &amps_write8_tmp_, sizeof(size_t)); }
#define AMPS_READ8(p,v) { size_t amps_read8_tmp_; memcpy(&amps_read8_tmp_, (p), sizeof(size_t)); (v) = amps_read8_tmp_; }
#endif
878 
879  // This implementation will use this when logging a bookmark or a persisted
880  void write(const Message::Field& subId_,
881  char type_, const Message::Field& bookmark_)
882  {
883  Lock<Mutex> guard(_fileLock);
884  write(&_log, &_logOffset, subId_, type_, bookmark_);
885  }
886 
887  void write(char** logPtr, size_t* logOffsetPtr, const Message::Field& subId_,
888  char type_, const Message::Field& bookmark_)
889  {
890  if (!_recovering && isWritableBookmark(bookmark_.len()))
891  {
892  size_t len = subId_.len();
893  // Check we'll be in the current boundaries
894  size_t blockLen = len + 2 * sizeof(size_t) + bookmark_.len() + 1;
895  if (*logOffsetPtr + blockLen >= _fileSize)
896  {
897  setFileSize(_fileSize * 2);
898  }
899  char* offset = *logPtr + *logOffsetPtr;
900  AMPS_WRITE8(offset, len);
901  offset += sizeof(size_t);
902  memcpy(offset, static_cast<const void*>(subId_.data()), len);
903  offset += len;
904  *offset++ = type_;
905  len = bookmark_.len();
906  AMPS_WRITE8(offset, len);
907  offset += sizeof(size_t);
908  memcpy(offset, static_cast<const void*>(bookmark_.data()), len);
909  *logOffsetPtr += blockLen;
910  }
911  }
912 
913  // This implementation will only ever use this when discarding a bookmark
914  // Could be used to add a feature where generated bookmarks are logged in
915  // addition to the bookmark field.
916  void write(const Message::Field& subId_, char type_, size_t bookmark_)
917  {
918  Lock<Mutex> guard(_fileLock);
919  write(&_log, &_logOffset, subId_, type_, bookmark_);
920  }
921 
922  void write(char** logPtr, size_t* logOffsetPtr, const Message::Field& subId_,
923  char type_, size_t bookmark_)
924  {
925  if (!_recovering)
926  {
927  size_t len = subId_.len();
928  size_t blockLen = len + 2 * sizeof(size_t) + 1;
929  // Check we'll be in the current boundaries
930  if (*logOffsetPtr + blockLen >= _fileSize)
931  {
932  setFileSize(_fileSize * 2);
933  }
934  char* offset = *logPtr + *logOffsetPtr;
935  *(reinterpret_cast<size_t*>(offset)) = len;
936  offset += sizeof(size_t);
937  memcpy(offset, static_cast<const void*>(subId_.data()), len);
938  offset += len;
939  *offset++ = type_;
940  *(reinterpret_cast<size_t*>(offset)) = bookmark_;
941  *logOffsetPtr += blockLen;
942  }
943  }
944 
945  void setFileSize(size_t newSize_)
946  {
947  if (_log && newSize_ <= _fileSize) // Improper resize attempt
948  {
949  return;
950  }
951 #ifdef _WIN32
952  _fileSize = _setFileSize(newSize_, &_log, _file, &_mapFile);
953 #else
954  _fileSize = _setFileSize(newSize_, &_log, _file, _fileSize);
955 #endif
956  }
957 
958  // Returns new file size, 0 if there is a failure
959  size_t _setFileSize(size_t newSize_, char** log_, FileType file_,
960 #ifdef WIN32
961  HANDLE* mapFile_
962 #else
963  size_t fileSize_
964 #endif
965  )
966  {
967  // Make sure we're using a multiple of page size
968  size_t sz = newSize_ & (size_t)(~(getPageSize() - 1));
969  if (sz < newSize_ || sz == 0)
970  {
971  sz += getPageSize();
972  }
973 #ifdef _WIN32
974  if (*mapFile_ && *mapFile_ != INVALID_HANDLE_VALUE)
975  {
976  if (*log_)
977  {
978  FlushViewOfFile(*log_, 0);
979  UnmapViewOfFile(*log_);
980  }
981  CloseHandle(*mapFile_);
982  }
983 #ifdef _WIN64
984  *mapFile_ = CreateFileMapping( file_, NULL, PAGE_READWRITE, (DWORD)((sz >> 32) & 0xffffffff), (DWORD)sz, NULL);
985 #else
986  *mapFile_ = CreateFileMapping( file_, NULL, PAGE_READWRITE, 0, (DWORD)sz, NULL);
987 #endif
988  if (*mapFile_ == NULL || *mapFile_ == INVALID_HANDLE_VALUE)
989  {
990  DWORD errNo = getErrorNo();
991  CloseHandle(file_);
992  std::ostringstream os;
993  os << "Failed to create map of MMapBookmarkStore file " << _fileName
994  << " during resize.";
995  error(os.str(), errNo);
996  *log_ = 0;
997  return 0;
998  }
999  else
1000  {
1001  *log_ = (char*)MapViewOfFile(*mapFile_, FILE_MAP_ALL_ACCESS, 0, 0, sz);
1002  if (*log_ == NULL)
1003  {
1004  DWORD errNo = getErrorNo();
1005  CloseHandle(*mapFile_);
1006  CloseHandle(file_);
1007  std::ostringstream os;
1008  os << "Failed to map MMapBookmarkStore file " << _fileName
1009  << " to memory during resize.";
1010  error(os.str(), errNo);
1011  *log_ = 0;
1012  return 0;
1013  }
1014  }
1015 #else
1016  // Extend the underlying file
1017  if (lseek(file_, (off_t)sz, SEEK_SET) == -1)
1018  {
1019  int err = getErrorNo();
1020  ::close(file_);
1021  std::ostringstream os;
1022  os << "Failed to seek in MMapBookmarkStore file " << _fileName
1023  << " during resize.";
1024  error(os.str(), err);
1025  }
1026  if (::write(file_, "", 1) == -1)
1027  {
1028  int err = getErrorNo();
1029  ::close(file_);
1030  std::ostringstream os;
1031  os << "Failed to grow MMapBookmarkStore file " << _fileName
1032  << " during resize.";
1033  error(os.str(), err);
1034  }
1035  if (*log_)
1036  {
1037 #if defined(linux)
1038  *log_ = static_cast<char*>(mremap(*log_, fileSize_, sz,
1039  MREMAP_MAYMOVE));
1040 #else
1041  munmap(*log_, fileSize_);
1042  *log_ = static_cast<char*>(mmap(0, sz, PROT_READ | PROT_WRITE,
1043  MAP_SHARED, file_, 0));
1044 #endif
1045  }
1046  else // New mapping
1047  {
1048  // New mapping, map the full file size for recovery or else it std size
1049  *log_ = static_cast<char*>(mmap(0, sz, PROT_READ | PROT_WRITE,
1050  MAP_SHARED, file_, 0));
1051  }
1052 
1053  if ((void*)(*log_) == MAP_FAILED)
1054  {
1055  int err = getErrorNo();
1056  ::close(file_);
1057  *log_ = 0;
1058  std::ostringstream os;
1059  os << "Failed to map MMapBookmarkStore file " << _fileName
1060  << " to memory during resize.";
1061  error(os.str(), err);
1062  return 0;
1063  }
1064 #endif
1065  return sz;
1066  }
1067 
1068  void recover(bool useLastModifiedTime_ = false,
1069  bool hasAdapter_ = false)
1070  {
1071  Message::Field sub;
1072  Message::Field bookmarkField;
1073  size_t bookmarkLen = 0;
1074  size_t lastGoodOffset = 0;
1075  bool inError = false;
1076  Lock<Mutex> guard(_lock);
1077  Lock<Mutex> fileGuard(_fileLock);
1078  _recovering = true;
1079  // Map of bookmark to sequence number
1080  typedef std::map<Message::Field, size_t, Message::Field::FieldHash> BookmarkMap;
1081  typedef std::map<Message::Field, size_t,
1082  Message::Field::FieldHash>::iterator BookmarkMapIter;
1083  // Map of subId to set of recovered bookmarks
1084  typedef std::map<Message::Field, BookmarkMap*,
1085  Message::Field::FieldHash> ReadMap;
1086  typedef std::map<Message::Field, BookmarkMap*,
1087  Message::Field::FieldHash>::iterator ReadMapIter;
1088  ReadMap recovered;
1089  size_t subLen = *(reinterpret_cast<size_t*>(_log));
1090  while (!inError && subLen > 0)
1091  {
1092  // If we recover something, remove anything adapter recovered
1093  if (_logOffset == 0 && hasAdapter_)
1094  {
1095  MemoryBookmarkStore::__purge();
1096  }
1097  _logOffset += sizeof(size_t);
1098  sub.assign(_log + _logOffset, subLen);
1099  _logOffset += subLen;
1100  switch (_log[_logOffset++])
1101  {
1102  case (char)-1:
1103  return;
1104  case ENTRY_BOOKMARK:
1105  {
1106  AMPS_READ8((_log + _logOffset), bookmarkLen);
1107  _logOffset += sizeof(size_t);
1108  bookmarkField.assign(_log + _logOffset, bookmarkLen);
1109  _logOffset += bookmarkLen;
1110  Subscription* subP = find(sub);
1111  BookmarkMap* bookmarks = NULL;
1112  ReadMapIter iter = recovered.find(sub);
1113  if (iter == recovered.end())
1114  {
1115  Message::Field subKey;
1116  subKey.deepCopy(sub);
1117  bookmarks = new BookmarkMap();
1118  recovered[subKey] = bookmarks;
1119  }
1120  else
1121  {
1122  bookmarks = iter->second;
1123  }
1124  if (bookmarks->find(bookmarkField) != bookmarks->end())
1125  {
1126  std::for_each(bookmarks->begin(), bookmarks->end(),
1127  _clearBookmark);
1128  bookmarks->clear();
1129  subP->getMostRecent(true);
1130  }
1131  if (BookmarkRange::isRange(bookmarkField))
1132  {
1133  subP->log(bookmarkField);
1134  }
1135  else if (!subP->isDiscarded(bookmarkField))
1136  {
1137  size_t sequence = subP->log(bookmarkField);
1138  Message::Field copy;
1139  copy.deepCopy(bookmarkField);
1140  bookmarks->insert(std::make_pair(copy, sequence));
1141  }
1142  else
1143  {
1144  // We know it's discarded, but there may still be a
1145  // discard entry in the log, so avoid a search.
1146  Message::Field copy;
1147  copy.deepCopy(bookmarkField);
1148  bookmarks->insert(std::make_pair(copy, 0));
1149  }
1150  }
1151  break;
1152  case ENTRY_DISCARD:
1153  {
1154  AMPS_READ8((_log + _logOffset), bookmarkLen);
1155  _logOffset += sizeof(size_t);
1156  bookmarkField.assign(_log + _logOffset, bookmarkLen);
1157  _logOffset += bookmarkLen;
1158  size_t sequence = AMPS_UNSET_INDEX;
1159  ReadMapIter iter = recovered.find(sub);
1160  if (iter != recovered.end())
1161  {
1162  BookmarkMap* bookmarks = iter->second;
1163  BookmarkMapIter bookmarkIter = bookmarks->find(bookmarkField);
1164  if (bookmarkIter != bookmarks->end())
1165  {
1166  sequence = bookmarkIter->second;
1167  Message::Field bookmarkToClear(bookmarkIter->first);
1168  bookmarkToClear.clear();
1169  bookmarks->erase(bookmarkIter);
1170  }
1171  }
1172  if (!BookmarkRange::isRange(bookmarkField))
1173  {
1174  Subscription* subP = find(sub);
1175  if (sequence != AMPS_UNSET_INDEX)
1176  {
1177  // A sequence of 0 means it was already discarded
1178  if (sequence)
1179  {
1180  subP->discard(sequence);
1181  }
1182  }
1183  else // Shouldn't end up here, but just in case we'll search
1184  {
1185  subP->discard(bookmarkField);
1186  }
1187  }
1188  }
1189  break;
1190  case ENTRY_PERSISTED:
1191  {
1192  AMPS_READ8((_log + _logOffset), bookmarkLen);
1193  _logOffset += sizeof(size_t);
1194  bookmarkField.assign(_log + _logOffset, bookmarkLen);
1195  _logOffset += bookmarkLen;
1196  MemoryBookmarkStore::_persisted(find(sub), bookmarkField);
1197  }
1198  break;
1199  default:
1200  if (lastGoodOffset == 0)
1201  {
1202  error("Error while recovering MMapBookmarkStore file.", getErrorNo());
1203  }
1204  else
1205  {
1206  _logOffset = lastGoodOffset;
1207  inError = true;
1208  }
1209  }
1210  lastGoodOffset = _logOffset;
1211  if (!inError && _logOffset + 8 < _fileSize)
1212  {
1213  subLen = *(reinterpret_cast<size_t*>(_log + _logOffset));
1214  }
1215  }
1216  for (SubscriptionMap::iterator i = _subs.begin(); i != _subs.end(); ++i)
1217  {
1218  if (recovered.count(i->first) && !recovered[i->first]->empty())
1219  {
1220  if (i->second->getMostRecent(false).len() > 1)
1221  {
1222  i->second->justRecovered();
1223  }
1224  else
1225  {
1226  // Unlikely, but we may have recovered only undiscarded
1227  // bookmarks so just restart as a new subscription.
1228  delete i->second;
1229  _subs[i->first] = new Subscription(this, i->first);
1230  }
1231  }
1232  if (useLastModifiedTime_ && _fileTimestamp)
1233  {
1234  _subs[i->first]->setRecoveryTimestamp(_fileTimestamp);
1235  }
1236  }
1237  if (_fileTimestamp)
1238  {
1239  delete[] _fileTimestamp;
1240  _fileTimestamp = 0;
1241  }
1242  for (ReadMapIter i = recovered.begin(), e = recovered.end(); i != e; ++i)
1243  {
1244  std::for_each(i->second->begin(), i->second->end(), _clearBookmark);
1245  delete i->second;
1246  Message::Field f = i->first;
1247  f.clear();
1248  }
1249  _recovering = false;
1250  }
1251 
1252  Mutex _fileLock; // NOTE(review): presumably serializes access to the file/log state; lock sites are not visible here - confirm
1253  std::string _fileName; // NOTE(review): presumably the fileName_ given to the constructor - confirm
1254  size_t _fileSize; // Upper bound used by recovery: the next entry is only read while _logOffset + 8 < _fileSize
1255  size_t _logOffset; // Current byte offset into _log; recovery advances it past each field it reads
1256  char* _log; // Base address of the log bytes; entries are read at _log + _logOffset
1257  char* _fileTimestamp; // Array released with delete[] after recovery; passed to setRecoveryTimestamp() when useLastModifiedTime_ is set
1258  FileType _file; // Underlying file handle: HANDLE on Windows, int elsewhere (see the FileType typedef)
1259  // Each log entry holds a subscription id length and the id, then a single type byte
1260  // (ENTRY_BOOKMARK, ENTRY_DISCARD or ENTRY_PERSISTED), then a bookmark length and the bookmark.
/// Returns the operating system's memory page size in bytes.
///
/// The value is queried from the OS on the first call and cached in a
/// function-local static, so later calls only return the cached value.
/// On non-Windows platforms, if sysconf() cannot report the page size,
/// 4096 is used instead of caching the error value.
///
/// \return The page size in bytes; always greater than zero.
static size_t getPageSize()
{
  static size_t pageSize = 0;
  if (pageSize == 0)
  {
#ifdef _WIN32
    SYSTEM_INFO systemInfo;
    GetSystemInfo(&systemInfo);
    pageSize = static_cast<size_t>(systemInfo.dwPageSize);
#else
    // sysconf() reports failure as -1. Casting that directly to size_t
    // would cache an enormous bogus page size, so validate it first.
    const size_t fallbackPageSize = 4096;
    const long reportedSize = sysconf(_SC_PAGESIZE);
    pageSize = (reportedSize > 0) ? static_cast<size_t>(reportedSize)
               : fallbackPageSize;
#endif
  }
  return pageSize;
}
1276 
1277  };
1278 
1279 } // end namespace AMPS
1280 
1281 
1282 #endif // _MMAPBOOKMARKSTORE_H_
1283 
virtual void purge()
Called to purge the contents of this store.
Definition: MMapBookmarkStore.hpp:305
Field getSubscriptionId() const
Retrieves the value of the SubscriptionId header of the Message as a new Field.
Definition: Message.hpp:1427
Message encapsulates a single message sent to or received from an AMPS server, and provides methods f...
Definition: Message.hpp:531
Provides AMPS::MemoryBookmarkStore, a bookmark store that holds bookmarks in memory.
A BookmarkStoreImpl implementation that uses a memory mapped file for storage of the bookmarks...
Definition: MMapBookmarkStore.hpp:56
void clear()
Deletes the data associated with this Field, should only be used on Fields that were created as deepC...
Definition: Field.hpp:247
virtual void discard(const Message &message_)
Log a Message as discarded from the store.
Definition: MMapBookmarkStore.hpp:232
const char * data() const
Returns the (non-null-terminated) data underlying this field.
Definition: Field.hpp:260
RecoveryPointAdapter a handle class for implementing external storage of subscription recovery points...
Definition: RecoveryPointAdapter.hpp:77
virtual Message::Field getMostRecent(const Message::Field &subId_)
Returns the most recent bookmark from the log that ought to be used for (re-)subscriptions.
Definition: MMapBookmarkStore.hpp:269
void setServerVersion(const VersionInfo &version_)
Internally used to set the server version so the store knows how to deal with persisted acks and call...
Definition: MMapBookmarkStore.hpp:328
virtual void discard(const Message::Field &subId_, size_t bookmarkSeqNo_)
Log a discard-bookmark entry to the persistent log based on a bookmark sequence number.
Definition: MMapBookmarkStore.hpp:252
Field getSubscriptionIds() const
Retrieves the value of the SubscriptionIds header of the Message as a new Field.
Definition: Message.hpp:1428
bool empty() const
Returns 'true' if empty, 'false' otherwise.
Definition: Field.hpp:128
size_t len() const
Returns the length of the data underlying this field.
Definition: Field.hpp:267
MMapBookmarkStore(const char *fileName_, bool useLastModifiedTime_=false)
Create an MMapBookmarkStore that uses fileName_ as its file storage.
Definition: MMapBookmarkStore.hpp:82
virtual void purge(const Message::Field &subId_)
Called to purge the contents of this store for particular subId.
Definition: MMapBookmarkStore.hpp:319
Provides AMPS::RecoveryPointAdapter, an interface for implementing external storage of bookmark subscr...
void setServerVersion(const VersionInfo &version_)
Internally used to set the server version so the store knows how to deal with persisted acks and call...
Definition: MemoryBookmarkStore.hpp:1365
Provides AMPS::RecoveryPoint, AMPS::RecoveryPointFactory, AMPS::FixedRecoveryPoint, and AMPS::DynamicRecoveryPoint.
MMapBookmarkStore(const RecoveryPointAdapter &adapter_, const std::string &fileName_, RecoveryPointFactory factory_=NULL, bool useLastModifiedTime_=false)
Create an MMapBookmarkStore that uses fileName_ as its file storage.
Definition: MMapBookmarkStore.hpp:170
A BookmarkStoreImpl implementation that stores bookmarks in memory.
Definition: MemoryBookmarkStore.hpp:57
void setServerVersion(size_t version_)
Internally used to set the server version so the store knows how to deal with persisted acks and call...
Definition: MMapBookmarkStore.hpp:334
Field represents the value of a single field in a Message.
Definition: Field.hpp:86
virtual size_t log(Message &message_)
Log a bookmark to the persistent log and return the corresponding sequence number for this bookmark...
Definition: MMapBookmarkStore.hpp:209
MMapBookmarkStore(const RecoveryPointAdapter &adapter_, const char *fileName_, RecoveryPointFactory factory_=NULL, bool useLastModifiedTime_=false)
Create an MMapBookmarkStore that uses fileName_ as its file storage.
Definition: MMapBookmarkStore.hpp:137
void deepCopy(const Field &orig_)
Makes self a deep copy of the original field.
Definition: Field.hpp:218
Definition: ampsplusplus.hpp:102
MMapBookmarkStore(const std::string &fileName_, bool useLastModifiedTime_=false)
Create an MMapBookmarkStore that uses fileName_ as its file storage.
Definition: MMapBookmarkStore.hpp:107
Field getBookmark() const
Retrieves the value of the Bookmark header of the Message as a new Field.
Definition: Message.hpp:1194
virtual bool isDiscarded(Message &message_)
Called for each arriving message to determine if the application has already seen this bookmark and s...
Definition: MMapBookmarkStore.hpp:283