AMPS C/C++ Client Class Reference
AMPS C/C++ Client Version 5.3.4.5
LoggedBookmarkStore.hpp
Go to the documentation of this file.
1 //
3 // Copyright (c) 2010-2025 60East Technologies Inc., All Rights Reserved.
4 //
5 // This computer software is owned by 60East Technologies Inc. and is
6 // protected by U.S. copyright laws and other laws and by international
7 // treaties. This computer software is furnished by 60East Technologies
8 // Inc. pursuant to a written license agreement and may be used, copied,
9 // transmitted, and stored only in accordance with the terms of such
10 // license agreement and with the inclusion of the above copyright notice.
11 // This computer software or any other copies thereof may not be provided
12 // or otherwise made available to any other person.
13 //
14 // U.S. Government Restricted Rights. This computer software: (a) was
15 // developed at private expense and is in all respects the proprietary
16 // information of 60East Technologies Inc.; (b) was not developed with
17 // government funds; (c) is a trade secret of 60East Technologies Inc.
18 // for all purposes of the Freedom of Information Act; and (d) is a
19 // commercial item and thus, pursuant to Section 12.212 of the Federal
20 // Acquisition Regulations (FAR) and DFAR Supplement Section 227.7202,
21 // Government's use, duplication or disclosure of the computer software
22 // is subject to the restrictions set forth by 60East Technologies Inc..
23 //
25 
26 #include <amps/ampsplusplus.hpp>
27 
28 #ifndef _LOGGEDBOOKMARKSTORE_H_
29 #define _LOGGEDBOOKMARKSTORE_H_
30 
32 #include <amps/RecoveryPoint.hpp>
34 #include <string>
35 #ifdef _WIN32
36  #include <windows.h>
37 #else
38  #include <sys/mman.h>
39  #include <unistd.h>
40  #include <sys/uio.h>
41 #endif
42 #include <sys/types.h>
43 #include <sys/stat.h>
44 #include <fcntl.h>
45 #include <map>
46 #include <set>
47 
48 #if defined(sun)
49  typedef char* amps_iovec_base_ptr;
50 #else
51  typedef void* amps_iovec_base_ptr;
52 #endif
53 
58 
59 namespace AMPS
60 {
65  {
66  private:
67  static void _clearBookmark(std::pair<const Message::Field, size_t>& pair)
68  {
69  Message::Field f(pair.first);
70  f.clear();
71  }
72 #ifdef _WIN32
73  typedef HANDLE FileType;
74 #else
75  typedef int FileType;
76 #endif
77  public:
88  LoggedBookmarkStore(const char* fileName_,
89  bool useLastModifiedTime_ = false)
91 #ifdef _WIN32
92  , _file(INVALID_HANDLE_VALUE)
93 #else
94  , _file(0)
95 #endif
96  , _fileName(fileName_)
97  {
98  init();
99  recover(useLastModifiedTime_, false);
100  }
101 
109  LoggedBookmarkStore(const std::string& fileName_,
110  bool useLastModifiedTime_ = false)
112 #ifdef _WIN32
113  , _file(INVALID_HANDLE_VALUE)
114 #else
115  , _file(0)
116 #endif
117  , _fileName(fileName_)
118  {
119  init();
120  recover(useLastModifiedTime_, false);
121  }
122 
138  const char* fileName_,
139  RecoveryPointFactory factory_ = NULL,
140  bool useLastModifiedTime_ = false)
141  : MemoryBookmarkStore(adapter_, factory_)
142 #ifdef _WIN32
143  , _file(INVALID_HANDLE_VALUE)
144 #else
145  , _file(0)
146 #endif
147  , _fileName(fileName_)
148  {
149  init();
150  recover(useLastModifiedTime_, true);
151  }
152 
165  const std::string& fileName_,
166  RecoveryPointFactory factory_ = NULL,
167  bool useLastModifiedTime_ = false)
168  : MemoryBookmarkStore(adapter_, factory_)
169 #ifdef _WIN32
170  , _file(INVALID_HANDLE_VALUE)
171 #else
172  , _file(0)
173 #endif
174  , _fileName(fileName_)
175  {
176  init();
177  recover(useLastModifiedTime_, true);
178  }
179 
180  virtual ~LoggedBookmarkStore()
181  {
182  // ~MemoryBookmarkStore handles closing the adapter
183  close();
184  // In case _lock gets acquired by reader thread between end of this
185  // destructor and start of base class destructor, prevent write()
186  _recoveringFile = true;
187  }
188 
189  void close()
190  {
191 #ifdef _WIN32
192  CloseHandle(_file);
193 #else
194  ::close(_file);
195 #endif
196  }
197 
203  virtual size_t log(Message& message_)
204  {
205  Message::Field bookmark = message_.getBookmark();
206  Subscription* sub = (Subscription*)(message_.getSubscriptionHandle());
207  Lock<Mutex> guard(_lock);
208  if (!sub)
209  {
210  Message::Field subId = message_.getSubscriptionId();
211  if (subId.empty())
212  {
213  subId = message_.getSubscriptionIds();
214  }
215  sub = find(subId);
216  message_.setSubscriptionHandle(static_cast<amps_subscription_handle>(sub));
217  }
218  write(_file, sub->id(), ENTRY_BOOKMARK, bookmark);
219  return MemoryBookmarkStore::_log(message_);
220  }
221 
226  virtual void discard(const Message& message_)
227  {
228  Message::Field bookmark = message_.getBookmark();
229  Message::Field subId = message_.getSubscriptionId();
230  if (subId.empty())
231  {
232  subId = message_.getSubscriptionIds();
233  }
234  Lock<Mutex> guard(_lock);
235  write(_file, subId, ENTRY_DISCARD, bookmark);
236  MemoryBookmarkStore::_discard(message_);
237  }
238 
246  virtual void discard(const Message::Field& subId_, size_t bookmarkSeqNo_)
247  {
248  Lock<Mutex> l(_lock);
249  Subscription::Entry* entry = find(subId_)->getEntryByIndex(bookmarkSeqNo_);
250  if (!entry || entry->_val.empty())
251  {
252  return;
253  }
254  write(_file, subId_, ENTRY_DISCARD, entry->_val);
255  MemoryBookmarkStore::_discard(subId_, bookmarkSeqNo_);
256  }
257 
264  {
265  Lock<Mutex> l(_lock);
266  return MemoryBookmarkStore::_getMostRecent(subId_);
267  }
268 
277  virtual bool isDiscarded(Message& message_)
278  {
279  Lock<Mutex> l(_lock);
280  bool retVal = MemoryBookmarkStore::_isDiscarded(message_);
281  if (retVal)
282  {
283  Message::Field subId = message_.getSubscriptionId();
284  if (subId.empty())
285  {
286  subId = message_.getSubscriptionIds();
287  }
288  write(_file, subId, ENTRY_BOOKMARK, message_.getBookmark());
289  write(_file, subId, ENTRY_DISCARD, message_.getBookmark());
290  }
291  return retVal;
292  }
293 
299  virtual void purge()
300  {
301  Lock<Mutex> guard(_lock);
302 #ifdef _WIN32
303  if (_file != INVALID_HANDLE_VALUE)
304  {
305  CloseHandle(_file);
306  }
307  DeleteFileA(_fileName.c_str());
308  _file = CreateFileA(_fileName.c_str(), GENERIC_READ | GENERIC_WRITE, 0,
309  NULL, CREATE_ALWAYS, FILE_ATTRIBUTE_NORMAL, NULL);
310  if ( _file == INVALID_HANDLE_VALUE )
311  {
312  DWORD err = getErrorNo();
313  std::ostringstream os;
314  os << "Failed to recreate log file after purge for LoggedBookmarkStore" << _fileName << " for LoggedBookmarkStore";
315  error(os.str(), err);
316  return;
317  }
318 #else
319  ::close(_file);
320  ::unlink(_fileName.c_str());
321  _file = open(_fileName.c_str(), O_RDWR | O_CREAT, (mode_t)0644);
322  if (_file == -1)
323  {
324  error("Failed to recreate log file after purge for LoggedBookmarkStore", getErrorNo());
325  return;
326  }
327 #endif
328  MemoryBookmarkStore::_purge();
329  }
330 
336  virtual void purge(const Message::Field& subId_)
337  {
338  Lock<Mutex> guard(_lock);
339  MemoryBookmarkStore::_purge(subId_);
340  std::string tmpFileName = _fileName + ".tmp";
341  __prune(tmpFileName);
342  }
343 
344  void setServerVersion(const VersionInfo& version_)
345  {
347  }
348 
349  void setServerVersion(size_t version_)
350  {
352  }
353 
354  // Yes, the argument is a non-const copy of what is passed in
355  void _prune(const std::string& tmpFileName_)
356  {
357  Lock<Mutex> guard(_lock);
358  // If nothing's changed with most recent, don't rewrite the file
359  if (!_recentChanged)
360  {
361  return;
362  }
363  if (tmpFileName_.empty())
364  {
365  __prune(_fileName + ".tmp");
366  }
367  else
368  {
369  __prune(tmpFileName_);
370  }
371  _recentChanged = false;
372  }
373 
374  void __prune(const std::string& tmpFileName_)
375  {
376 #ifdef _WIN32
377  HANDLE tmpFile;
378  tmpFile = CreateFileA(tmpFileName_.c_str(), GENERIC_READ | GENERIC_WRITE, 0,
379  NULL, OPEN_ALWAYS, FILE_ATTRIBUTE_NORMAL, NULL);
380  if (tmpFile == INVALID_HANDLE_VALUE )
381  {
382  DWORD err = getErrorNo();
383  std::ostringstream os;
384  os << "Failed to create temp log file " << tmpFileName_ <<
385  " to prune LoggedBookmarkStore " << _fileName;
386  error(os.str(), err);
387  return;
388  }
389 #else
390  int tmpFile;
391  tmpFile = open(tmpFileName_.c_str(), O_RDWR | O_CREAT, (mode_t)0644);
392  if (tmpFile == -1)
393  {
394  int err = getErrorNo();
395  std::ostringstream os;
396  os << "Failed to create temp log file " << tmpFileName_ <<
397  " to prune LoggedBookmarkStore " << _fileName;
398  error(os.str(), err);
399  return;
400  }
401 #endif
402  try
403  {
404  for (SubscriptionMap::iterator i = _subs.begin();
405  i != _subs.end(); ++i)
406  {
407  Message::Field subId = i->first;
408  assert(!subId.empty());
409  Subscription* subPtr = i->second;
410  const BookmarkRange& range = subPtr->getRange();
411  if (range.isValid())
412  {
413  write(tmpFile, subId, ENTRY_BOOKMARK, range);
414  }
415  Message::Field recent = subPtr->getMostRecent(false);
416  amps_uint64_t recentPub, recentSeq;
417  Subscription::parseBookmark(recent, recentPub, recentSeq);
418  Subscription::PublisherMap publishersDiscarded =
419  subPtr->_publishers;
420  MemoryBookmarkStore::EntryPtrList recovered;
421  subPtr->getRecoveryEntries(recovered);
422  subPtr->setPublishersToDiscarded(&recovered,
423  &publishersDiscarded);
424  char tmpBookmarkBuffer[128];
425  for (Subscription::PublisherIterator pub =
426  publishersDiscarded.begin(),
427  e = publishersDiscarded.end();
428  pub != e; ++pub)
429  {
430  // Don't log EPOCH if it got in the map
431  if (pub->first == 0 || pub->second == 0)
432  {
433  continue;
434  }
435  // Don't log the most recent yet
436  if (pub->first == recentPub)
437  {
438  continue;
439  }
440  int written = AMPS_snprintf_amps_uint64_t(
441  tmpBookmarkBuffer,
442  sizeof(tmpBookmarkBuffer),
443  pub->first);
444  *(tmpBookmarkBuffer + written++) = '|';
445  written += AMPS_snprintf_amps_uint64_t(
446  tmpBookmarkBuffer + written,
447  sizeof(tmpBookmarkBuffer)
448  - (size_t)written,
449  pub->second);
450  *(tmpBookmarkBuffer + written++) = '|';
451  Message::Field tmpBookmark(tmpBookmarkBuffer, (size_t)written);
452  write(tmpFile, subId, ENTRY_BOOKMARK, tmpBookmark);
453  write(tmpFile, subId, ENTRY_DISCARD, tmpBookmark);
454  }
455  if (isWritableBookmark(recent.len()))
456  {
457  write(tmpFile, subId, ENTRY_BOOKMARK, recent);
458  write(tmpFile, subId, ENTRY_DISCARD, recent);
459  }
460  else // set up _recentList
461  {
462  subPtr->getMostRecentList();
463  }
464  if (isWritableBookmark(subPtr->getLastPersisted().len()))
465  {
466  write(tmpFile, subId, ENTRY_PERSISTED,
467  subPtr->getLastPersisted());
468  }
469  subPtr->getActiveEntries(recovered);
470  for (MemoryBookmarkStore::EntryPtrList::iterator entry =
471  recovered.begin();
472  entry != recovered.end(); ++entry)
473  {
474  if ((*entry)->_val.empty() ||
475  !isWritableBookmark((*entry)->_val.len()))
476  {
477  continue;
478  }
479  write(tmpFile, subId, ENTRY_BOOKMARK, (*entry)->_val);
480  if (!(*entry)->_active)
481  {
482  write(tmpFile, subId, ENTRY_DISCARD, (*entry)->_val);
483  }
484  }
485  }
486  }
487  catch (StoreException& ex)
488  {
489 #ifdef _WIN32
490  CloseHandle(tmpFile);
491  DeleteFileA(tmpFileName_.c_str());
492 #else
493  ::close(tmpFile);
494  unlink(tmpFileName_.c_str());
495 #endif
496  std::ostringstream os;
497  os << "Exception during prune: " << ex.what();
498  throw StoreException(os.str());
499  }
500 #ifdef _WIN32
501  CloseHandle(_file);
502  CloseHandle(tmpFile);
503  _file = INVALID_HANDLE_VALUE;
504  tmpFile = INVALID_HANDLE_VALUE;
505  // Replace file with pruned file
506  int retryCount = 3;
507  while (!MoveFileExA(tmpFileName_.c_str(), _fileName.c_str(),
508  MOVEFILE_COPY_ALLOWED | MOVEFILE_REPLACE_EXISTING | MOVEFILE_WRITE_THROUGH))
509  {
510  DWORD err = getErrorNo();
511  if (--retryCount > 0)
512  {
513  continue;
514  }
515  // Try to set _file to the tmp file that won't move then throw
516  std::string desiredFileName = _fileName;
517  _fileName = tmpFileName_;
518  init();
519  std::ostringstream os;
520  os << "Failed to move completed temp file " << tmpFileName_
521  << " to " << desiredFileName
522  << " in prune in LoggedBookmarkStore. Continuing by using "
523  << tmpFileName_ << " as the LoggedBookmarkStore file.";
524  error(os.str(), err);
525  return;
526  }
527  init();
528  SetFilePointer(_file, 0, NULL, FILE_END);
529 #else
530  ::close(tmpFile);
531  ::close(_file);
532  if (-1 == ::unlink(_fileName.c_str()))
533  {
534  int err = getErrorNo();
535  // Try to set _file to the tmp file then throw
536  std::string desiredFileName = _fileName;
537  _fileName = tmpFileName_;
538  init();
539  std::ostringstream os;
540  os << "Failed to delete file " << desiredFileName
541  << " after creating temporary file " << tmpFileName_
542  << " in prune in LoggedBookmarkStore. Continuing by using "
543  << tmpFileName_ << " as the LoggedBookmarkStore file.";
544  error(os.str(), err);
545  return;
546  }
547  if (-1 == ::rename(tmpFileName_.c_str(), _fileName.c_str()))
548  {
549  int err = getErrorNo();
550  // Try to set _file to the tmp file that won't move then throw
551  std::string desiredFileName = _fileName;
552  _fileName = tmpFileName_;
553  init();
554  std::ostringstream os;
555  os << "Failed to move completed temp file " << tmpFileName_
556  << " to " << desiredFileName
557  << " in prune in LoggedBookmarkStore. Continuing by using "
558  << tmpFileName_ << " as the LoggedBookmarkStore file.";
559  error(os.str(), err);
560  return;
561  }
562  init();
563  struct stat fst;
564  if (-1 == ::fstat(_file, &fst))
565  {
566  int err = getErrorNo();
567  std::ostringstream os;
568  os << "Failed to get size of pruned file " << _fileName
569  << " in prune in LoggedBookmarkStore. ";
570  error(os.str(), err);
571  return;
572  }
573  ::lseek(_file, (off_t)fst.st_size, SEEK_SET);
574 #endif
575  }
576 
577  private:
578  virtual void _persisted(Subscription* subP_,
579  const Message::Field& bookmark_)
580  {
581  Lock<Mutex> guard(_lock);
582  write(_file, subP_->id(), ENTRY_PERSISTED, bookmark_);
583  MemoryBookmarkStore::_persisted(subP_, bookmark_);
584  }
585 
586  virtual Message::Field _persisted(Subscription* subP_, size_t bookmark_)
587  {
588  Lock<Mutex> l(_lock);
589  Subscription::Entry* entryPtr = subP_->getEntryByIndex(bookmark_);
590  if (!entryPtr || entryPtr->_val.empty())
591  {
592  return Message::Field();
593  }
594  Message::Field bookmarkField = entryPtr->_val;
595  write(_file, subP_->id(), ENTRY_PERSISTED, bookmarkField);
596  MemoryBookmarkStore::_persisted(subP_, bookmarkField);
597  return bookmarkField;
598  }
599 
600 #ifdef _WIN32
601  typedef DWORD ERRTYPE;
602  ERRTYPE getErrorNo() const
603  {
604  return GetLastError();
605  }
606 
607  void error(const std::string& message_, ERRTYPE err)
608  {
609  std::ostringstream os;
610  static const DWORD msgSize = 2048;
611  char pMsg[msgSize];
612  DWORD sz = FormatMessageA(FORMAT_MESSAGE_FROM_SYSTEM |
613  FORMAT_MESSAGE_ARGUMENT_ARRAY,
614  NULL, err, LANG_NEUTRAL,
615  pMsg, msgSize, NULL);
616  os << "File: " << _fileName << ". " << message_;
617  if (err != 0)
618  {
619  os << " with error " << pMsg;
620  }
621  throw StoreException(os.str());
622  }
623 #else
624  typedef int ERRTYPE;
625  ERRTYPE getErrorNo() const
626  {
627  return errno;
628  }
629 
630  void error(const std::string& message_, ERRTYPE err)
631  {
632  std::ostringstream os;
633  os << "File: " << _fileName << ". " << message_;
634  if (err != 0)
635  {
636  os << " with error " << strerror(err);
637  }
638  close();
639  throw StoreException(os.str());
640  }
641 #endif
642 
643  void init()
644  {
645 #ifdef _WIN32
646  _file = CreateFileA(_fileName.c_str(), GENERIC_READ | GENERIC_WRITE, 0,
647  NULL, OPEN_ALWAYS, FILE_ATTRIBUTE_NORMAL, NULL);
648  if ( _file == INVALID_HANDLE_VALUE )
649  {
650  DWORD err = getErrorNo();
651  std::ostringstream os;
652  os << "Failed to initialize log file " << _fileName << " for LoggedBookmarkStore";
653  error(os.str(), err);
654  return;
655  }
656 #else
657  _file = open(_fileName.c_str(), O_RDWR | O_CREAT, (mode_t)0644);
658  if (_file == -1)
659  {
660  int err = getErrorNo();
661  std::ostringstream os;
662  os << "Failed to initialize log file " << _fileName << " for LoggedBookmarkStore";
663  error(os.str(), err);
664  return;
665  }
666 #endif
667  }
668 
669  // This implementation will only ever use this when logging a bookmark
670  // Could be used to add a feature where discarded bookmark fields are logged in
671  // addition to the generated bookmark.
672  void write(FileType file_, const Message::Field& subId_, char type_,
673  const Message::Field& bookmark_)
674  {
675  Lock<Mutex> guard(_fileLock);
676  if (!_recoveringFile && isWritableBookmark(bookmark_.len()))
677  {
678 #ifdef _WIN32
679  DWORD written;
680  size_t len = subId_.len();
681  BOOL ok = WriteFile(file_, (LPVOID)&len, sizeof(size_t), &written, NULL);
682  ok |= WriteFile(file_, (LPVOID)subId_.data(), (DWORD)len, &written, NULL);
683  ok |= WriteFile(file_, (LPVOID)&type_, 1, &written, NULL);
684  len = bookmark_.len();
685  ok |= WriteFile(file_, (LPVOID)&len, sizeof(size_t), &written, NULL);
686  ok |= WriteFile(file_, (LPVOID)bookmark_.data(), (DWORD)len,
687  &written, NULL);
688  if (!ok)
689  {
690  error("Failed to write to bookmark log.", getErrorNo());
691  return;
692  }
693 
694 #else
695  if (file_ == -1)
696  {
697  file_ = open(_fileName.c_str(), O_RDWR | O_CREAT, (mode_t)0644);
698  if (file_ == -1)
699  {
700  int err = getErrorNo();
701  std::ostringstream os;
702  os << "Failed to open file " << _fileName
703  << " for write in LoggedBookmarkStore. ";
704  error(os.str(), err);
705  return;
706  }
707  }
708  struct iovec data[5];
709  size_t len = subId_.len();
710  data[0].iov_base = (amps_iovec_base_ptr)(void*)&len;
711  data[0].iov_len = sizeof(size_t);
712  data[1].iov_base = (amps_iovec_base_ptr)(void*)subId_.data();
713  data[1].iov_len = len;
714  data[2].iov_base = (amps_iovec_base_ptr)(void*)&type_;
715  data[2].iov_len = 1;
716  size_t bookmarkLen = bookmark_.len();
717  data[3].iov_base = (amps_iovec_base_ptr)(void*)&bookmarkLen;
718  data[3].iov_len = sizeof(size_t);
719  data[4].iov_base = (amps_iovec_base_ptr)(void*)bookmark_.data();
720  data[4].iov_len = bookmarkLen;
721  ssize_t written = ::writev(file_, data, 5);
722  if (written == -1)
723  {
724  error("Failed to write to bookmark log.", getErrorNo());
725  return;
726  }
727 #endif
728  }
729  }
730 
731  // This implementation will only ever use this when discarding a bookmark
732  // Could be used to add a feature where generated bookmarks are logged in
733  // addition to the bookmark field.
734  void write(FileType file_, const Message::Field& subId_,
735  char type_, size_t bookmark_)
736  {
737  Lock<Mutex> guard(_fileLock);
738  if (!_recoveringFile)
739  {
740 #ifdef _WIN32
741  DWORD written;
742  size_t len = subId_.len();
743  BOOL ok = WriteFile(file_, (LPVOID)&len, sizeof(size_t), &written, NULL);
744  ok |= WriteFile(file_, (LPVOID)subId_.data(), (DWORD)len, &written, NULL);
745  ok |= WriteFile(file_, (LPVOID)&type_, 1, &written, NULL);
746  ok |= WriteFile(file_, (LPVOID)&bookmark_, sizeof(size_t),
747  &written, NULL);
748  if (!ok)
749  {
750  error("Failed to write bookmark sequence to file.", getErrorNo());
751  return;
752  }
753 
754 #else
755  if (file_ == -1)
756  {
757  file_ = open(_fileName.c_str(), O_RDWR | O_CREAT, (mode_t)0644);
758  if (file_ == -1)
759  {
760  int err = getErrorNo();
761  std::ostringstream os;
762  os << "Failed to open file " << _fileName
763  << " to write bookmark sequence in LoggedBookmarkStore. ";
764  error(os.str(), err);
765  return;
766  }
767  }
768  struct iovec data[4];
769  size_t len = subId_.len();
770  data[0].iov_base = (amps_iovec_base_ptr)(void*)&len;
771  data[0].iov_len = sizeof(size_t);
772  data[1].iov_base = (amps_iovec_base_ptr)(void*)subId_.data();
773  data[1].iov_len = len;
774  data[2].iov_base = (amps_iovec_base_ptr)(void*)&type_;
775  data[2].iov_len = 1;
776  data[3].iov_base = (amps_iovec_base_ptr)(void*)&bookmark_;
777  data[3].iov_len = sizeof(size_t);
778  ssize_t written = ::writev(file_, data, 4);
779  if (written == -1)
780  {
781  error("Failed to write bookmark sequence to file.", getErrorNo());
782  return;
783  }
784 #endif
785  }
786  }
787 
788 #ifdef _WIN32
789 #define VOID_P(buf) (LPVOID)buf
790  bool readFileBytes(LPVOID buffer, size_t numBytes, DWORD* bytesRead)
791  {
792  return (ReadFile(_file, buffer, (DWORD)numBytes, bytesRead, NULL) == TRUE);
793  }
794 #else
795 #define VOID_P(buf) (void*)buf
796  bool readFileBytes(void* buffer, size_t numBytes, ssize_t* bytesRead)
797  {
798  *bytesRead = ::read(_file, buffer, numBytes);
799  return (*bytesRead >= 0);
800  }
801 #endif
802 
803  void recover(bool useLastModifiedTime_, bool hasAdapter_)
804  {
805  size_t bufferLen = 128;
806  char* buffer = new char[bufferLen];
807  size_t subIdBufferLen = 128;
808  char* subIdBuffer = new char[bufferLen];
809  Message::Field sub;
810  size_t subLen = 0;
811  Message::Field bookmarkField;
812  size_t bookmarkLen = 0;
813  Lock<Mutex> l(_lock);
814  Lock<Mutex> guard(_fileLock);
815  _recoveringFile = true;
816  char* fileTimestamp = new char[AMPS_TIMESTAMP_LEN];
817  fileTimestamp[0] = '\0';
818 #ifdef _WIN32
819  LARGE_INTEGER lifileSize;
820  if (GetFileSizeEx(_file, &lifileSize) == 0)
821  {
822  DWORD err = getErrorNo();
823  delete[] buffer;
824  delete[] subIdBuffer;
825  _recoveringFile = false;
826  error("Failure getting file size while trying to recover.", err);
827  return;
828  }
829 #ifdef _WIN64
830  size_t fileSize = lifileSize.QuadPart;
831 #else
832  size_t fileSize = lifileSize.LowPart;
833 #endif
834  if (useLastModifiedTime_ && fileSize > 0)
835  {
836  FILETIME ftModifiedTime;
837  if (GetFileTime(_file, NULL, NULL, &ftModifiedTime) == 0)
838  {
839  DWORD err = getErrorNo();
840  delete[] buffer;
841  delete[] subIdBuffer;
842  _recoveringFile = false;
843  error("Failure getting file time while trying to recover.", err);
844  return;
845  }
846  SYSTEMTIME st;
847  if (FileTimeToSystemTime(&ftModifiedTime, &st) == 0)
848  {
849  DWORD err = getErrorNo();
850  delete[] buffer;
851  delete[] subIdBuffer;
852  _recoveringFile = false;
853  error("Failure converting file time while trying to recover.", err);
854  return;
855  }
856  sprintf_s(fileTimestamp, AMPS_TIMESTAMP_LEN,
857  "%04d%02d%02dT%02d%02d%02d", st.wYear, st.wMonth,
858  st.wDay, st.wHour, st.wMinute, st.wSecond);
859  fileTimestamp[AMPS_TIMESTAMP_LEN - 1] = 'Z';
860  }
861  else if (fileSize == 0)
862  {
863  delete[] fileTimestamp;
864  delete[] buffer;
865  delete[] subIdBuffer;
866  _recoveringFile = false;
867  return;
868  }
869  DWORD readBytes = 0;
870  OFF_T loc = 0;
871  SetFilePointer(_file, 0, NULL, FILE_BEGIN);
872 #else
873  struct stat fst;
874  ::fstat(_file, &fst);
875  ssize_t fileSize = fst.st_size;
876  ssize_t readBytes = 0;
877  if (useLastModifiedTime_ && fileSize > 0)
878  {
879  struct tm timeInfo;
880  gmtime_r(&fst.st_mtime, &timeInfo);
881  strftime(fileTimestamp, AMPS_TIMESTAMP_LEN,
882  "%Y%m%dT%H%M%S", &timeInfo);
883  fileTimestamp[AMPS_TIMESTAMP_LEN - 1] = 'Z';
884  }
885  else if (fileSize == 0)
886  {
887  delete[] fileTimestamp;
888  delete[] buffer;
889  delete[] subIdBuffer;
890  _recoveringFile = false;
891  return;
892  }
893  OFF_T loc = 0;
894  ::lseek(_file, loc, SEEK_SET);
895 #endif
896  // We trust file recovery over Adapter recovery
897  if (hasAdapter_)
898  {
899  MemoryBookmarkStore::__purge();
900  }
901  if (!readFileBytes(VOID_P(&subLen), sizeof(size_t), &readBytes)
902  || subLen > getMaxSubIdLength())
903  {
904  delete[] fileTimestamp;
905  delete[] buffer;
906  delete[] subIdBuffer;
907  _recoveringFile = false;
908  error("Failure reading file while trying to recover.", getErrorNo());
909  return;
910  }
911 #ifdef _WIN32
912  size_t totalBytes = readBytes;
913 #else
914  ssize_t totalBytes = readBytes;
915 #endif
916  ERRTYPE err = 0; // 0 no error, -1 corruption, positive is errno file error
917  size_t tooManyBytes = 0;
918  typedef std::map<Message::Field, size_t,
919  Message::Field::FieldHash> BookmarkMap;
920  typedef std::map<Message::Field, size_t,
921  Message::Field::FieldHash>::iterator BookmarkMapIter;
922  // Map of subId to set of recovered bookmarks
923  typedef std::map<Message::Field, BookmarkMap*,
924  Message::Field::FieldHash> ReadMap;
925  typedef std::map<Message::Field, BookmarkMap*,
926  Message::Field::FieldHash>::iterator ReadMapIter;
927  ReadMap recovered;
928  while (subLen > 0 && (size_t)readBytes == sizeof(size_t) &&
929  (size_t)totalBytes <= (size_t)fileSize)
930  {
931  if (subLen >= ((size_t)fileSize - (size_t)totalBytes)
932  || subLen > getMaxSubIdLength())
933  {
934  err = (ERRTYPE) - 1;
935  tooManyBytes = subLen + 1;
936  break;
937  }
938  else
939  {
940  if (subIdBufferLen < subLen)
941  {
942  delete [] subIdBuffer;
943  subIdBufferLen = 2 * subLen;
944  subIdBuffer = new char[subIdBufferLen];
945  }
946  if (!readFileBytes(VOID_P(subIdBuffer), subLen, &readBytes))
947  {
948  err = getErrorNo();
949  tooManyBytes = subLen;
950  break;
951  }
952  totalBytes += readBytes;
953  sub.assign(subIdBuffer, subLen);
954  if (!readFileBytes(VOID_P(buffer), 1, &readBytes))
955  {
956  err = getErrorNo();
957  tooManyBytes = 1;
958  break;
959  }
960  totalBytes += readBytes;
961  switch (buffer[0])
962  {
963  case ENTRY_BOOKMARK:
964  {
965  if ((size_t)totalBytes + sizeof(size_t) >= (size_t)fileSize)
966  {
967  // Corrupt final record is ok
968  err = (ERRTYPE) - 1;
969  tooManyBytes = sizeof(size_t);
970  break;
971  }
972  if (!readFileBytes(VOID_P(&bookmarkLen), sizeof(size_t), &readBytes))
973  {
974  err = getErrorNo();
975  tooManyBytes = sizeof(size_t);
976  break;
977  }
978  totalBytes += readBytes;
979  if (bookmarkLen > (size_t)fileSize - (size_t)totalBytes)
980  {
981  // Corrupt final record is ok
982  err = (ERRTYPE) - 1;
983  tooManyBytes = bookmarkLen;
984  break;
985  }
986  if (bufferLen < bookmarkLen)
987  {
988  delete [] buffer;
989  bufferLen = 2 * bookmarkLen;
990  buffer = new char[bufferLen];
991  }
992  if (!readFileBytes(VOID_P(buffer), bookmarkLen, &readBytes))
993  {
994  err = getErrorNo();
995  tooManyBytes = bookmarkLen;
996  break;
997  }
998  totalBytes += readBytes;
999  bookmarkField.assign(buffer, bookmarkLen);
1000  Subscription* subP = find(sub);
1001  BookmarkMap* bookmarks = NULL;
1002  ReadMapIter iter = recovered.find(sub);
1003  if (iter == recovered.end())
1004  {
1005  Message::Field subKey;
1006  subKey.deepCopy(sub);
1007  bookmarks = new BookmarkMap();
1008  recovered[subKey] = bookmarks;
1009  }
1010  else
1011  {
1012  bookmarks = iter->second;
1013  }
1014  if (bookmarks->find(bookmarkField) != bookmarks->end())
1015  {
1016  std::for_each(bookmarks->begin(), bookmarks->end(),
1017  _clearBookmark);
1018  bookmarks->clear();
1019  subP->getMostRecent(true);
1020  }
1021  if (BookmarkRange::isRange(bookmarkField))
1022  {
1023  subP->log(bookmarkField);
1024  }
1025  else if (!subP->isDiscarded(bookmarkField))
1026  {
1027  size_t sequence = subP->log(bookmarkField);
1028  Message::Field copy;
1029  copy.deepCopy(bookmarkField);
1030  bookmarks->insert(std::make_pair(copy, sequence));
1031  }
1032  else
1033  {
1034  // We know it's discarded, but there may still be a
1035  // discard entry in the log, so avoid a search.
1036  Message::Field copy;
1037  copy.deepCopy(bookmarkField);
1038  bookmarks->insert(std::make_pair(copy, 0));
1039  }
1040  }
1041  break;
1042 
1043  case ENTRY_DISCARD:
1044  {
1045  if ((size_t)totalBytes + sizeof(size_t) >= (size_t)fileSize)
1046  {
1047  // Corrupt final record is ok
1048  err = (ERRTYPE) - 1;
1049  tooManyBytes = sizeof(size_t);
1050  break;
1051  }
1052  if (!readFileBytes(VOID_P(&bookmarkLen), sizeof(size_t), &readBytes))
1053  {
1054  err = getErrorNo();
1055  tooManyBytes = sizeof(size_t);
1056  break;
1057  }
1058  totalBytes += readBytes;
1059  if (bookmarkLen > (size_t)fileSize - (size_t)totalBytes)
1060  {
1061  // Corrupt final record is ok
1062  err = (ERRTYPE) - 1;
1063  tooManyBytes = bookmarkLen;
1064  break;
1065  }
1066  if (bufferLen < bookmarkLen)
1067  {
1068  delete [] buffer;
1069  bufferLen = 2 * bookmarkLen;
1070  buffer = new char[bufferLen];
1071  }
1072  if (!readFileBytes(VOID_P(buffer), bookmarkLen, &readBytes))
1073  {
1074  err = getErrorNo();
1075  tooManyBytes = bookmarkLen;
1076  break;
1077  }
1078  totalBytes += readBytes;
1079  bookmarkField.assign(buffer, bookmarkLen);
1080  size_t sequence = AMPS_UNSET_INDEX;
1081  ReadMapIter iter = recovered.find(sub);
1082  if (iter != recovered.end())
1083  {
1084  BookmarkMap* bookmarks = iter->second;
1085  BookmarkMapIter bookmarkIter = bookmarks->find(bookmarkField);
1086  if (bookmarkIter != bookmarks->end())
1087  {
1088  sequence = bookmarkIter->second;
1089  Message::Field bookmarkToClear(bookmarkIter->first);
1090  bookmarkToClear.clear();
1091  bookmarks->erase(bookmarkIter);
1092  }
1093  }
1094  Subscription* subP = find(sub);
1095  if (!BookmarkRange::isRange(bookmarkField))
1096  {
1097  if (sequence != AMPS_UNSET_INDEX)
1098  {
1099  // A sequence of 0 means it was already discarded
1100  if (sequence)
1101  {
1102  subP->discard(sequence);
1103  }
1104  }
1105  else // Shouldn't end up here, but just in case we'll search
1106  {
1107  subP->discard(bookmarkField);
1108  }
1109  }
1110  }
1111  break;
1112  case ENTRY_PERSISTED:
1113  {
1114  if ((size_t)totalBytes + sizeof(size_t) >= (size_t)fileSize)
1115  {
1116  // Corrupt final record is ok
1117  err = (ERRTYPE) - 1;
1118  tooManyBytes = sizeof(size_t);
1119  break;
1120  }
1121  if (!readFileBytes(VOID_P(&bookmarkLen), sizeof(size_t), &readBytes))
1122  {
1123  err = getErrorNo();
1124  tooManyBytes = sizeof(size_t);
1125  break;
1126  }
1127  totalBytes += readBytes;
1128  if (bookmarkLen > (size_t)fileSize - (size_t)totalBytes)
1129  {
1130  // Corrupt final record is ok
1131  err = (ERRTYPE) - 1;
1132  tooManyBytes = bookmarkLen;
1133  break;
1134  }
1135  if (bufferLen < bookmarkLen)
1136  {
1137  delete [] buffer;
1138  bufferLen = 2 * bookmarkLen;
1139  buffer = new char[bufferLen];
1140  }
1141  if (!readFileBytes(VOID_P(buffer), bookmarkLen, &readBytes))
1142  {
1143  err = getErrorNo();
1144  tooManyBytes = bookmarkLen;
1145  break;
1146  }
1147  totalBytes += readBytes;
1148  bookmarkField.assign(buffer, bookmarkLen);
1149  Subscription* subP = find(sub);
1150  MemoryBookmarkStore::_persisted(subP, bookmarkField);
1151  }
1152  break;
1153  default:
1154  {
1155  // Corrupt final record is ok
1156  err = (ERRTYPE) - 1;
1157  tooManyBytes = (size_t)fileSize - (size_t)totalBytes;
1158  }
1159  break;
1160  }
1161  }
1162  loc = (OFF_T)totalBytes;
1163  if ((size_t)totalBytes > (size_t)fileSize)
1164  {
1165  loc = (OFF_T)fileSize;
1166  break;
1167  }
1168  if (!readFileBytes(VOID_P(&subLen), sizeof(size_t), &readBytes))
1169  {
1170  err = getErrorNo();
1171  tooManyBytes = sizeof(size_t);
1172  break;
1173  }
1174  totalBytes += readBytes;
1175  }
1176  delete[] buffer;
1177  delete[] subIdBuffer;
1178  if (err == 0)
1179  {
1180  for (SubscriptionMap::iterator i = _subs.begin(); i != _subs.end(); ++i)
1181  {
1182  if (recovered.count(i->first) && !recovered[i->first]->empty())
1183  {
1184  Subscription* subPtr = i->second;
1185  if (subPtr->getMostRecent(false).len() > 1)
1186  {
1187  subPtr->justRecovered();
1188  }
1189  else
1190  {
1191  // Unlikely, but we may have recovered only undiscarded bookmarks
1192  // so we should really just restart as a new subscription.
1193  delete subPtr;
1194  _subs[i->first] = new Subscription(this, i->first);
1195  }
1196  }
1197  if (useLastModifiedTime_ && fileTimestamp[0] != '\0')
1198  {
1199  _subs[i->first]->setRecoveryTimestamp(fileTimestamp);
1200  }
1201  }
1202  }
1203  for (ReadMapIter i = recovered.begin(), e = recovered.end(); i != e; ++i)
1204  {
1205  std::for_each(i->second->begin(), i->second->end(), _clearBookmark);
1206  delete i->second;
1207  Message::Field f = i->first;
1208  f.clear();
1209  }
1210  delete[] fileTimestamp;
1211  _recoveringFile = false;
1212  if (err != 0)
1213  {
1214  // Arbitrary guess if we're on the last record
1215  // We set err to -1 if we read a corrupt value or
1216  // to errno/last error if a read failed.
1217  if (err != (ERRTYPE) - 1 || loc == 0 || fileSize - loc > 128)
1218  {
1219  std::ostringstream os;
1220  os << "Error while recovering LoggedBookmarkStore from "
1221  << _fileName
1222  << ". Record starting at " << loc
1223  << " reading at " << totalBytes
1224  << " requested " << tooManyBytes
1225  << " and file size is " << fileSize;
1226  error(os.str(), (err != (ERRTYPE) - 1 ? err : 0));
1227  }
1228  else
1229  {
1230 #ifdef _WIN32
1231 #ifdef _WIN64
1232  LONG low = (LONG)loc;
1233  LONG high = (LONG)((loc >> 32) & 0xffffffff);
1234  SetFilePointer(_file, low, &high, FILE_BEGIN);
1235 #else
1236  SetFilePointer(_file, loc, NULL, FILE_BEGIN);
1237 #endif
1238 #else
1239  ::lseek(_file, loc, SEEK_SET);
1240 #endif
1241  }
1242  }
1243  }
1244 
1245  private:
1246  FileType _file;
1247  Mutex _fileLock;
1248  std::string _fileName;
1249  bool _recoveringFile;
1250  };
1251 
1252 } // end namespace AMPS
1253 
1254 
1255 #endif // _LOGGEDBOOKMARKSTORE_H_
1256 
virtual bool isDiscarded(Message &message_)
Called for each arriving message to determine if the application has already seen this bookmark and s...
Definition: LoggedBookmarkStore.hpp:277
Field getSubscriptionId() const
Retrieves the value of the SubscriptionId header of the Message as a new Field.
Definition: Message.hpp:1427
Message encapsulates a single message sent to or received from an AMPS server, and provides methods f...
Definition: Message.hpp:531
void setServerVersion(const VersionInfo &version_)
Internally used to set the server version so the store knows how to deal with persisted acks and call...
Definition: LoggedBookmarkStore.hpp:344
Provides AMPS::MemoryBookmarkStore, a bookmark store that holds bookmarks in memory.
void clear()
Deletes the data associated with this Field, should only be used on Fields that were created as deepC...
Definition: Field.hpp:247
LoggedBookmarkStore(const std::string &fileName_, bool useLastModifiedTime_=false)
Creates a LoggedBookmarkStore using a file name fileName_.
Definition: LoggedBookmarkStore.hpp:109
LoggedBookmarkStore(const RecoveryPointAdapter &adapter_, const char *fileName_, RecoveryPointFactory factory_=NULL, bool useLastModifiedTime_=false)
Creates a LoggedBookmarkStore using fileName_ as its file storage.
Definition: LoggedBookmarkStore.hpp:137
virtual void discard(const Message::Field &subId_, size_t bookmarkSeqNo_)
Log a discard-bookmark entry to the persistent log based on a bookmark sequence number.
Definition: LoggedBookmarkStore.hpp:246
const char * data() const
Returns the (non-null-terminated) data underlying this field.
Definition: Field.hpp:260
RecoveryPointAdapter a handle class for implementing external storage of subscription recovery points...
Definition: RecoveryPointAdapter.hpp:77
LoggedBookmarkStore(const RecoveryPointAdapter &adapter_, const std::string &fileName_, RecoveryPointFactory factory_=NULL, bool useLastModifiedTime_=false)
Creates a LoggedBookmarkStore using a file name fileName_.
Definition: LoggedBookmarkStore.hpp:164
void setServerVersion(size_t version_)
Internally used to set the server version so the store knows how to deal with persisted acks and call...
Definition: LoggedBookmarkStore.hpp:349
Field getSubscriptionIds() const
Retrieves the value of the SubscriptionIds header of the Message as a new Field.
Definition: Message.hpp:1428
bool empty() const
Returns &#39;true&#39; if empty, &#39;false&#39; otherwise.
Definition: Field.hpp:128
Core type, function, and class declarations for the AMPS C++ client.
A BookmarkStoreImpl implementation that logs all messages to a file.
Definition: LoggedBookmarkStore.hpp:64
size_t len() const
Returns the length of the data underlying this field.
Definition: Field.hpp:267
Provides AMPS::RecoveryPointAdapter, an iterface for implementing external storage of bookmark subscr...
void setServerVersion(const VersionInfo &version_)
Internally used to set the server version so the store knows how to deal with persisted acks and call...
Definition: MemoryBookmarkStore.hpp:1387
virtual void purge(const Message::Field &subId_)
Called to purge the contents of this store for particular subId.
Definition: LoggedBookmarkStore.hpp:336
Provides AMPS::RecoveryPoint, AMPS::RecoveryPointFactory, AMPS::FixedRecoveryPoint, and AMPS::DynamicRecoveryPoint.
virtual void purge()
Called to purge the contents of this store.
Definition: LoggedBookmarkStore.hpp:299
virtual Message::Field getMostRecent(const Message::Field &subId_)
Returns the most recent bookmark from the log that ought to be used for (re-)subscriptions.
Definition: LoggedBookmarkStore.hpp:263
A BookmarkStoreImpl implementation that stores bookmarks in memory.
Definition: MemoryBookmarkStore.hpp:57
Field represents the value of a single field in a Message.
Definition: Field.hpp:86
size_t getMaxSubIdLength() const
Gets the maximum allowed length for a sub id when recovering a bookmark store from persistent storage...
Definition: BookmarkStore.hpp:206
virtual size_t log(Message &message_)
Log a bookmark to the persistent log and return the corresponding sequence number for this bookmark...
Definition: LoggedBookmarkStore.hpp:203
void deepCopy(const Field &orig_)
Makes self a deep copy of the original field.
Definition: Field.hpp:218
LoggedBookmarkStore(const char *fileName_, bool useLastModifiedTime_=false)
Creates a LoggedBookmarkStore using fileName_ as its file storage.
Definition: LoggedBookmarkStore.hpp:88
Definition: ampsplusplus.hpp:103
Field getBookmark() const
Retrieves the value of the Bookmark header of the Message as a new Field.
Definition: Message.hpp:1194
virtual void discard(const Message &message_)
Log a Message as discarded from the store.
Definition: LoggedBookmarkStore.hpp:226