AMPS C/C++ Client Class Reference
AMPS C/C++ Client Version 5.3.4.1
LoggedBookmarkStore.hpp
Go to the documentation of this file.
1 //
3 // Copyright (c) 2010-2023 60East Technologies Inc., All Rights Reserved.
4 //
5 // This computer software is owned by 60East Technologies Inc. and is
6 // protected by U.S. copyright laws and other laws and by international
7 // treaties. This computer software is furnished by 60East Technologies
8 // Inc. pursuant to a written license agreement and may be used, copied,
9 // transmitted, and stored only in accordance with the terms of such
10 // license agreement and with the inclusion of the above copyright notice.
11 // This computer software or any other copies thereof may not be provided
12 // or otherwise made available to any other person.
13 //
14 // U.S. Government Restricted Rights. This computer software: (a) was
15 // developed at private expense and is in all respects the proprietary
16 // information of 60East Technologies Inc.; (b) was not developed with
17 // government funds; (c) is a trade secret of 60East Technologies Inc.
18 // for all purposes of the Freedom of Information Act; and (d) is a
19 // commercial item and thus, pursuant to Section 12.212 of the Federal
20 // Acquisition Regulations (FAR) and DFAR Supplement Section 227.7202,
21 // Government's use, duplication or disclosure of the computer software
22 // is subject to the restrictions set forth by 60East Technologies Inc..
23 //
25 
26 #include <amps/ampsplusplus.hpp>
27 
28 #ifndef _LOGGEDBOOKMARKSTORE_H_
29 #define _LOGGEDBOOKMARKSTORE_H_
30 
32 #include <amps/RecoveryPoint.hpp>
34 #include <string>
35 #ifdef _WIN32
36  #include <windows.h>
37 #else
38  #include <sys/mman.h>
39  #include <unistd.h>
40  #include <sys/uio.h>
41 #endif
42 #include <sys/types.h>
43 #include <sys/stat.h>
44 #include <fcntl.h>
45 #include <map>
46 #include <set>
47 
// Pointer type used when filling in iovec::iov_base for writev().
// NOTE(review): presumably iov_base is declared as char* on Solaris and
// void* elsewhere -- confirm against the platform headers.
#if !defined(sun)
typedef void* amps_iovec_base_ptr;
#else
typedef char* amps_iovec_base_ptr;
#endif
53 
58 
59 namespace AMPS
60 {
65  {
66  private:
67  static void _clearBookmark(std::pair<const Message::Field, size_t>& pair)
68  {
69  Message::Field f(pair.first);
70  f.clear();
71  }
72 #ifdef _WIN32
73  typedef HANDLE FileType;
74 #else
75  typedef int FileType;
76 #endif
77  public:
88  LoggedBookmarkStore(const char* fileName_,
89  bool useLastModifiedTime_ = false)
91 #ifdef _WIN32
92  , _file(INVALID_HANDLE_VALUE)
93 #else
94  , _file(0)
95 #endif
96  , _fileName(fileName_)
97  {
98  init();
99  recover(useLastModifiedTime_, false);
100  }
101 
109  LoggedBookmarkStore(const std::string& fileName_,
110  bool useLastModifiedTime_ = false)
112 #ifdef _WIN32
113  , _file(INVALID_HANDLE_VALUE)
114 #else
115  , _file(0)
116 #endif
117  , _fileName(fileName_)
118  {
119  init();
120  recover(useLastModifiedTime_, false);
121  }
122 
138  const char* fileName_,
139  RecoveryPointFactory factory_ = NULL,
140  bool useLastModifiedTime_ = false)
141  : MemoryBookmarkStore(adapter_, factory_)
142 #ifdef _WIN32
143  , _file(INVALID_HANDLE_VALUE)
144 #else
145  , _file(0)
146 #endif
147  , _fileName(fileName_)
148  {
149  init();
150  recover(useLastModifiedTime_, true);
151  }
152 
165  const std::string& fileName_,
166  RecoveryPointFactory factory_ = NULL,
167  bool useLastModifiedTime_ = false)
168  : MemoryBookmarkStore(adapter_, factory_)
169 #ifdef _WIN32
170  , _file(INVALID_HANDLE_VALUE)
171 #else
172  , _file(0)
173 #endif
174  , _fileName(fileName_)
175  {
176  init();
177  recover(useLastModifiedTime_, true);
178  }
179 
180  virtual ~LoggedBookmarkStore()
181  {
182  close();
183  // In case _lock gets acquired by reader thread between end of this
184  // destructor and start of base class destructor, prevent write()
185  _recoveringFile = true;
186  }
187 
188  void close()
189  {
190 #ifdef _WIN32
191  CloseHandle(_file);
192 #else
193  ::close(_file);
194 #endif
195  }
196 
202  virtual size_t log(Message& message_)
203  {
204  Message::Field bookmark = message_.getBookmark();
205  Subscription* sub = (Subscription*)(message_.getSubscriptionHandle());
206  Lock<Mutex> guard(_lock);
207  if (!sub)
208  {
209  Message::Field subId = message_.getSubscriptionId();
210  if (subId.empty())
211  {
212  subId = message_.getSubscriptionIds();
213  }
214  sub = find(subId);
215  message_.setSubscriptionHandle(static_cast<amps_subscription_handle>(sub));
216  }
217  write(_file, sub->id(), ENTRY_BOOKMARK, bookmark);
218  return MemoryBookmarkStore::_log(message_);
219  }
220 
225  virtual void discard(const Message& message_)
226  {
227  Message::Field bookmark = message_.getBookmark();
228  Message::Field subId = message_.getSubscriptionId();
229  if (subId.empty())
230  {
231  subId = message_.getSubscriptionIds();
232  }
233  Lock<Mutex> guard(_lock);
234  write(_file, subId, ENTRY_DISCARD, bookmark);
235  MemoryBookmarkStore::_discard(message_);
236  }
237 
245  virtual void discard(const Message::Field& subId_, size_t bookmarkSeqNo_)
246  {
247  Lock<Mutex> l(_lock);
248  Subscription::Entry* entry = find(subId_)->getEntryByIndex(bookmarkSeqNo_);
249  if (!entry || entry->_val.empty())
250  {
251  return;
252  }
253  write(_file, subId_, ENTRY_DISCARD, entry->_val);
254  MemoryBookmarkStore::_discard(subId_, bookmarkSeqNo_);
255  }
256 
263  {
264  Lock<Mutex> l(_lock);
265  return MemoryBookmarkStore::_getMostRecent(subId_);
266  }
267 
276  virtual bool isDiscarded(Message& message_)
277  {
278  Lock<Mutex> l(_lock);
279  bool retVal = MemoryBookmarkStore::_isDiscarded(message_);
280  if (retVal)
281  {
282  Message::Field subId = message_.getSubscriptionId();
283  if (subId.empty())
284  {
285  subId = message_.getSubscriptionIds();
286  }
287  write(_file, subId, ENTRY_BOOKMARK, message_.getBookmark());
288  write(_file, subId, ENTRY_DISCARD, message_.getBookmark());
289  }
290  return retVal;
291  }
292 
298  virtual void purge()
299  {
300  Lock<Mutex> guard(_lock);
301 #ifdef _WIN32
302  if (_file != INVALID_HANDLE_VALUE)
303  {
304  CloseHandle(_file);
305  }
306  DeleteFileA(_fileName.c_str());
307  _file = CreateFileA(_fileName.c_str(), GENERIC_READ | GENERIC_WRITE, 0,
308  NULL, CREATE_ALWAYS, FILE_ATTRIBUTE_NORMAL, NULL);
309  if ( _file == INVALID_HANDLE_VALUE )
310  {
311  DWORD err = getErrorNo();
312  std::ostringstream os;
313  os << "Failed to recreate log file after purge for LoggedBookmarkStore" << _fileName << " for LoggedBookmarkStore";
314  error(os.str(), err);
315  return;
316  }
317 #else
318  ::close(_file);
319  ::unlink(_fileName.c_str());
320  _file = open(_fileName.c_str(), O_RDWR | O_CREAT, (mode_t)0644);
321  if (_file == -1)
322  {
323  error("Failed to recreate log file after purge for LoggedBookmarkStore", getErrorNo());
324  return;
325  }
326 #endif
327  MemoryBookmarkStore::_purge();
328  }
329 
335  virtual void purge(const Message::Field& subId_)
336  {
337  Lock<Mutex> guard(_lock);
338  MemoryBookmarkStore::_purge(subId_);
339  std::string tmpFileName = _fileName + ".tmp";
340  __prune(tmpFileName);
341  }
342 
343  void setServerVersion(const VersionInfo& version_)
344  {
346  }
347 
348  void setServerVersion(size_t version_)
349  {
351  }
352 
353  // Yes, the argument is a non-const copy of what is passed in
354  void _prune(const std::string& tmpFileName_)
355  {
356  Lock<Mutex> guard(_lock);
357  // If nothing's changed with most recent, don't rewrite the file
358  if (!_recentChanged)
359  {
360  return;
361  }
362  if (tmpFileName_.empty())
363  {
364  __prune(_fileName + ".tmp");
365  }
366  else
367  {
368  __prune(tmpFileName_);
369  }
370  _recentChanged = false;
371  }
372 
373  void __prune(const std::string& tmpFileName_)
374  {
375 #ifdef _WIN32
376  HANDLE tmpFile;
377  tmpFile = CreateFileA(tmpFileName_.c_str(), GENERIC_READ | GENERIC_WRITE, 0,
378  NULL, OPEN_ALWAYS, FILE_ATTRIBUTE_NORMAL, NULL);
379  if (tmpFile == INVALID_HANDLE_VALUE )
380  {
381  DWORD err = getErrorNo();
382  std::ostringstream os;
383  os << "Failed to create temp log file " << tmpFileName_ <<
384  " to prune LoggedBookmarkStore " << _fileName;
385  error(os.str(), err);
386  return;
387  }
388 #else
389  int tmpFile;
390  tmpFile = open(tmpFileName_.c_str(), O_RDWR | O_CREAT, (mode_t)0644);
391  if (tmpFile == -1)
392  {
393  int err = getErrorNo();
394  std::ostringstream os;
395  os << "Failed to create temp log file " << tmpFileName_ <<
396  " to prune LoggedBookmarkStore " << _fileName;
397  error(os.str(), err);
398  return;
399  }
400 #endif
401  try
402  {
403  for (SubscriptionMap::iterator i = _subs.begin();
404  i != _subs.end(); ++i)
405  {
406  Message::Field subId = i->first;
407  assert(!subId.empty());
408  Subscription* subPtr = i->second;
409  const BookmarkRange& range = subPtr->getRange();
410  if (range.isValid())
411  {
412  write(tmpFile, subId, ENTRY_BOOKMARK, range);
413  }
414  Message::Field recent = subPtr->getMostRecent(false);
415  amps_uint64_t recentPub, recentSeq;
416  Subscription::parseBookmark(recent, recentPub, recentSeq);
417  Subscription::PublisherMap publishersDiscarded =
418  subPtr->_publishers;
419  MemoryBookmarkStore::EntryPtrList recovered;
420  subPtr->getRecoveryEntries(recovered);
421  subPtr->setPublishersToDiscarded(&recovered,
422  &publishersDiscarded);
423  char tmpBookmarkBuffer[128];
424  for (Subscription::PublisherIterator pub =
425  publishersDiscarded.begin(),
426  e = publishersDiscarded.end();
427  pub != e; ++pub)
428  {
429  // Don't log EPOCH if it got in the map
430  if (pub->first == 0 || pub->second == 0)
431  {
432  continue;
433  }
434  // Don't log the most recent yet
435  if (pub->first == recentPub)
436  {
437  continue;
438  }
439  int written = AMPS_snprintf_amps_uint64_t(
440  tmpBookmarkBuffer,
441  sizeof(tmpBookmarkBuffer),
442  pub->first);
443  *(tmpBookmarkBuffer + written++) = '|';
444  written += AMPS_snprintf_amps_uint64_t(
445  tmpBookmarkBuffer + written,
446  sizeof(tmpBookmarkBuffer)
447  - (size_t)written,
448  pub->second);
449  *(tmpBookmarkBuffer + written++) = '|';
450  Message::Field tmpBookmark(tmpBookmarkBuffer, (size_t)written);
451  write(tmpFile, subId, ENTRY_BOOKMARK, tmpBookmark);
452  write(tmpFile, subId, ENTRY_DISCARD, tmpBookmark);
453  }
454  if (isWritableBookmark(recent.len()))
455  {
456  write(tmpFile, subId, ENTRY_BOOKMARK, recent);
457  write(tmpFile, subId, ENTRY_DISCARD, recent);
458  }
459  else // set up _recentList
460  {
461  subPtr->getMostRecentList();
462  }
463  if (isWritableBookmark(subPtr->getLastPersisted().len()))
464  {
465  write(tmpFile, subId, ENTRY_PERSISTED,
466  subPtr->getLastPersisted());
467  }
468  subPtr->getActiveEntries(recovered);
469  for (MemoryBookmarkStore::EntryPtrList::iterator entry =
470  recovered.begin();
471  entry != recovered.end(); ++entry)
472  {
473  if ((*entry)->_val.empty() ||
474  !isWritableBookmark((*entry)->_val.len()))
475  {
476  continue;
477  }
478  write(tmpFile, subId, ENTRY_BOOKMARK, (*entry)->_val);
479  if (!(*entry)->_active)
480  {
481  write(tmpFile, subId, ENTRY_DISCARD, (*entry)->_val);
482  }
483  }
484  }
485  }
486  catch (StoreException& ex)
487  {
488 #ifdef _WIN32
489  CloseHandle(tmpFile);
490  DeleteFileA(tmpFileName_.c_str());
491 #else
492  ::close(tmpFile);
493  unlink(tmpFileName_.c_str());
494 #endif
495  std::ostringstream os;
496  os << "Exception during prune: " << ex.what();
497  throw StoreException(os.str());
498  }
499 #ifdef _WIN32
500  CloseHandle(_file);
501  CloseHandle(tmpFile);
502  _file = INVALID_HANDLE_VALUE;
503  tmpFile = INVALID_HANDLE_VALUE;
504  // Replace file with pruned file
505  int retryCount = 3;
506  while (!MoveFileExA(tmpFileName_.c_str(), _fileName.c_str(),
507  MOVEFILE_COPY_ALLOWED | MOVEFILE_REPLACE_EXISTING | MOVEFILE_WRITE_THROUGH))
508  {
509  DWORD err = getErrorNo();
510  if (--retryCount > 0)
511  {
512  continue;
513  }
514  // Try to set _file to the tmp file that won't move then throw
515  std::string desiredFileName = _fileName;
516  _fileName = tmpFileName_;
517  init();
518  std::ostringstream os;
519  os << "Failed to move completed temp file " << tmpFileName_
520  << " to " << desiredFileName
521  << " in prune in LoggedBookmarkStore. Continuing by using "
522  << tmpFileName_ << " as the LoggedBookmarkStore file.";
523  error(os.str(), err);
524  return;
525  }
526  init();
527  SetFilePointer(_file, 0, NULL, FILE_END);
528 #else
529  ::close(tmpFile);
530  ::close(_file);
531  if (-1 == ::unlink(_fileName.c_str()))
532  {
533  int err = getErrorNo();
534  // Try to set _file to the tmp file then throw
535  std::string desiredFileName = _fileName;
536  _fileName = tmpFileName_;
537  init();
538  std::ostringstream os;
539  os << "Failed to delete file " << desiredFileName
540  << " after creating temporary file " << tmpFileName_
541  << " in prune in LoggedBookmarkStore. Continuing by using "
542  << tmpFileName_ << " as the LoggedBookmarkStore file.";
543  error(os.str(), err);
544  return;
545  }
546  if (-1 == ::rename(tmpFileName_.c_str(), _fileName.c_str()))
547  {
548  int err = getErrorNo();
549  // Try to set _file to the tmp file that won't move then throw
550  std::string desiredFileName = _fileName;
551  _fileName = tmpFileName_;
552  init();
553  std::ostringstream os;
554  os << "Failed to move completed temp file " << tmpFileName_
555  << " to " << desiredFileName
556  << " in prune in LoggedBookmarkStore. Continuing by using "
557  << tmpFileName_ << " as the LoggedBookmarkStore file.";
558  error(os.str(), err);
559  return;
560  }
561  init();
562  struct stat fst;
563  if (-1 == ::fstat(_file, &fst))
564  {
565  int err = getErrorNo();
566  std::ostringstream os;
567  os << "Failed to get size of pruned file " << _fileName
568  << " in prune in LoggedBookmarkStore. ";
569  error(os.str(), err);
570  return;
571  }
572  ::lseek(_file, (off_t)fst.st_size, SEEK_SET);
573 #endif
574  }
575 
576  private:
577  virtual void _persisted(Subscription* subP_,
578  const Message::Field& bookmark_)
579  {
580  Lock<Mutex> guard(_lock);
581  write(_file, subP_->id(), ENTRY_PERSISTED, bookmark_);
582  MemoryBookmarkStore::_persisted(subP_, bookmark_);
583  }
584 
585  virtual Message::Field _persisted(Subscription* subP_, size_t bookmark_)
586  {
587  Lock<Mutex> l(_lock);
588  Subscription::Entry* entryPtr = subP_->getEntryByIndex(bookmark_);
589  if (!entryPtr || entryPtr->_val.empty())
590  {
591  return Message::Field();
592  }
593  Message::Field bookmarkField = entryPtr->_val;
594  write(_file, subP_->id(), ENTRY_PERSISTED, bookmarkField);
595  MemoryBookmarkStore::_persisted(subP_, bookmarkField);
596  return bookmarkField;
597  }
598 
599 #ifdef _WIN32
600  typedef DWORD ERRTYPE ;
601  ERRTYPE getErrorNo() const
602  {
603  return GetLastError();
604  }
605 
606  void error(const std::string& message_, ERRTYPE err)
607  {
608  std::ostringstream os;
609  static const DWORD msgSize = 2048;
610  char pMsg[msgSize];
611  DWORD sz = FormatMessageA(FORMAT_MESSAGE_FROM_SYSTEM |
612  FORMAT_MESSAGE_ARGUMENT_ARRAY,
613  NULL, err, LANG_NEUTRAL,
614  pMsg, msgSize, NULL);
615  os << "File: " << _fileName << ". " << message_;
616  if (err != 0)
617  {
618  os << " with error " << pMsg;
619  }
620  throw StoreException(os.str());
621  }
622 #else
623  typedef int ERRTYPE;
624  ERRTYPE getErrorNo() const
625  {
626  return errno;
627  }
628 
629  void error(const std::string& message_, ERRTYPE err)
630  {
631  std::ostringstream os;
632  os << "File: " << _fileName << ". " << message_;
633  if (err != 0)
634  {
635  os << " with error " << strerror(err);
636  }
637  close();
638  throw StoreException(os.str());
639  }
640 #endif
641 
642  void init()
643  {
644 #ifdef _WIN32
645  _file = CreateFileA(_fileName.c_str(), GENERIC_READ | GENERIC_WRITE, 0,
646  NULL, OPEN_ALWAYS, FILE_ATTRIBUTE_NORMAL, NULL);
647  if ( _file == INVALID_HANDLE_VALUE )
648  {
649  DWORD err = getErrorNo();
650  std::ostringstream os;
651  os << "Failed to initialize log file " << _fileName << " for LoggedBookmarkStore";
652  error(os.str(), err);
653  return;
654  }
655 #else
656  _file = open(_fileName.c_str(), O_RDWR | O_CREAT, (mode_t)0644);
657  if (_file == -1)
658  {
659  int err = getErrorNo();
660  std::ostringstream os;
661  os << "Failed to initialize log file " << _fileName << " for LoggedBookmarkStore";
662  error(os.str(), err);
663  return;
664  }
665 #endif
666  }
667 
668  // This implementation will only ever use this when logging a bookmark
669  // Could be used to add a feature where discarded bookmark fields are logged in
670  // addition to the generated bookmark.
671  void write(FileType file_, const Message::Field& subId_, char type_,
672  const Message::Field& bookmark_)
673  {
674  Lock<Mutex> guard(_fileLock);
675  if (!_recoveringFile && isWritableBookmark(bookmark_.len()))
676  {
677 #ifdef _WIN32
678  DWORD written;
679  size_t len = subId_.len();
680  BOOL ok = WriteFile(file_, (LPVOID)&len, sizeof(size_t), &written, NULL);
681  ok |= WriteFile(file_, (LPVOID)subId_.data(), (DWORD)len, &written, NULL);
682  ok |= WriteFile(file_, (LPVOID)&type_, 1, &written, NULL);
683  len = bookmark_.len();
684  ok |= WriteFile(file_, (LPVOID)&len, sizeof(size_t), &written, NULL);
685  ok |= WriteFile(file_, (LPVOID)bookmark_.data(), (DWORD)len,
686  &written, NULL);
687  if (!ok)
688  {
689  error("Failed to write to bookmark log.", getErrorNo());
690  return;
691  }
692 
693 #else
694  if (file_ == -1)
695  {
696  file_ = open(_fileName.c_str(), O_RDWR | O_CREAT, (mode_t)0644);
697  if (file_ == -1)
698  {
699  int err = getErrorNo();
700  std::ostringstream os;
701  os << "Failed to open file " << _fileName
702  << " for write in LoggedBookmarkStore. ";
703  error(os.str(), err);
704  return;
705  }
706  }
707  struct iovec data[5];
708  size_t len = subId_.len();
709  data[0].iov_base = (amps_iovec_base_ptr)(void*)&len;
710  data[0].iov_len = sizeof(size_t);
711  data[1].iov_base = (amps_iovec_base_ptr)(void*)subId_.data();
712  data[1].iov_len = len;
713  data[2].iov_base = (amps_iovec_base_ptr)(void*)&type_;
714  data[2].iov_len = 1;
715  size_t bookmarkLen = bookmark_.len();
716  data[3].iov_base = (amps_iovec_base_ptr)(void*)&bookmarkLen;
717  data[3].iov_len = sizeof(size_t);
718  data[4].iov_base = (amps_iovec_base_ptr)(void*)bookmark_.data();
719  data[4].iov_len = bookmarkLen;
720  ssize_t written = ::writev(file_, data, 5);
721  if (written == -1)
722  {
723  error("Failed to write to bookmark log.", getErrorNo());
724  return;
725  }
726 #endif
727  }
728  }
729 
730  // This implementation will only ever use this when discarding a bookmark
731  // Could be used to add a feature where generated bookmarks are logged in
732  // addition to the bookmark field.
733  void write(FileType file_, const Message::Field& subId_,
734  char type_, size_t bookmark_)
735  {
736  Lock<Mutex> guard(_fileLock);
737  if (!_recoveringFile)
738  {
739 #ifdef _WIN32
740  DWORD written;
741  size_t len = subId_.len();
742  BOOL ok = WriteFile(file_, (LPVOID)&len, sizeof(size_t), &written, NULL);
743  ok |= WriteFile(file_, (LPVOID)subId_.data(), (DWORD)len, &written, NULL);
744  ok |= WriteFile(file_, (LPVOID)&type_, 1, &written, NULL);
745  ok |= WriteFile(file_, (LPVOID)&bookmark_, sizeof(size_t),
746  &written, NULL);
747  if (!ok)
748  {
749  error("Failed to write bookmark sequence to file.", getErrorNo());
750  return;
751  }
752 
753 #else
754  if (file_ == -1)
755  {
756  file_ = open(_fileName.c_str(), O_RDWR | O_CREAT, (mode_t)0644);
757  if (file_ == -1)
758  {
759  int err = getErrorNo();
760  std::ostringstream os;
761  os << "Failed to open file " << _fileName
762  << " to write bookmark sequence in LoggedBookmarkStore. ";
763  error(os.str(), err);
764  return;
765  }
766  }
767  struct iovec data[4];
768  size_t len = subId_.len();
769  data[0].iov_base = (amps_iovec_base_ptr)(void*)&len;
770  data[0].iov_len = sizeof(size_t);
771  data[1].iov_base = (amps_iovec_base_ptr)(void*)subId_.data();
772  data[1].iov_len = len;
773  data[2].iov_base = (amps_iovec_base_ptr)(void*)&type_;
774  data[2].iov_len = 1;
775  data[3].iov_base = (amps_iovec_base_ptr)(void*)&bookmark_;
776  data[3].iov_len = sizeof(size_t);
777  ssize_t written = ::writev(file_, data, 4);
778  if (written == -1)
779  {
780  error("Failed to write bookmark sequence to file.", getErrorNo());
781  return;
782  }
783 #endif
784  }
785  }
786 
787 #ifdef _WIN32
788 #define VOID_P(buf) (LPVOID)buf
789  bool readFileBytes(LPVOID buffer, size_t numBytes, DWORD* bytesRead)
790  {
791  return (ReadFile(_file, buffer, (DWORD)numBytes, bytesRead, NULL) == TRUE);
792  }
793 #else
794 #define VOID_P(buf) (void*)buf
795  bool readFileBytes(void* buffer, size_t numBytes, ssize_t* bytesRead)
796  {
797  *bytesRead = ::read(_file, buffer, numBytes);
798  return (*bytesRead >= 0);
799  }
800 #endif
801 
802  void recover(bool useLastModifiedTime_, bool hasAdapter_)
803  {
804  size_t bufferLen = 128;
805  char* buffer = new char[bufferLen];
806  size_t subIdBufferLen = 128;
807  char* subIdBuffer = new char[bufferLen];
808  Message::Field sub;
809  size_t subLen = 0;
810  Message::Field bookmarkField;
811  size_t bookmarkLen = 0;
812  Lock<Mutex> l(_lock);
813  Lock<Mutex> guard(_fileLock);
814  _recoveringFile = true;
815  char* fileTimestamp = new char[AMPS_TIMESTAMP_LEN];
816  fileTimestamp[0] = '\0';
817 #ifdef _WIN32
818  LARGE_INTEGER lifileSize;
819  if (GetFileSizeEx(_file, &lifileSize) == 0)
820  {
821  DWORD err = getErrorNo();
822  delete[] buffer;
823  delete[] subIdBuffer;
824  _recoveringFile = false;
825  error("Failure getting file size while trying to recover.", err);
826  return;
827  }
828 #ifdef _WIN64
829  size_t fileSize = lifileSize.QuadPart;
830 #else
831  size_t fileSize = lifileSize.LowPart;
832 #endif
833  if (useLastModifiedTime_ && fileSize > 0)
834  {
835  FILETIME ftModifiedTime;
836  if (GetFileTime(_file, NULL, NULL, &ftModifiedTime) == 0)
837  {
838  DWORD err = getErrorNo();
839  delete[] buffer;
840  delete[] subIdBuffer;
841  _recoveringFile = false;
842  error("Failure getting file time while trying to recover.", err);
843  return;
844  }
845  SYSTEMTIME st;
846  if (FileTimeToSystemTime(&ftModifiedTime, &st) == 0)
847  {
848  DWORD err = getErrorNo();
849  delete[] buffer;
850  delete[] subIdBuffer;
851  _recoveringFile = false;
852  error("Failure converting file time while trying to recover.", err);
853  return;
854  }
855  sprintf_s(fileTimestamp, AMPS_TIMESTAMP_LEN,
856  "%04d%02d%02dT%02d%02d%02d", st.wYear, st.wMonth,
857  st.wDay, st.wHour, st.wMinute, st.wSecond);
858  fileTimestamp[AMPS_TIMESTAMP_LEN - 1] = 'Z';
859  }
860  else if (fileSize == 0)
861  {
862  delete[] fileTimestamp;
863  delete[] buffer;
864  delete[] subIdBuffer;
865  _recoveringFile = false;
866  return;
867  }
868  DWORD readBytes = 0;
869  OFF_T loc = 0;
870  SetFilePointer(_file, 0, NULL, FILE_BEGIN);
871 #else
872  struct stat fst;
873  ::fstat(_file, &fst);
874  ssize_t fileSize = fst.st_size;
875  ssize_t readBytes = 0;
876  if (useLastModifiedTime_ && fileSize > 0)
877  {
878  struct tm timeInfo;
879  gmtime_r(&fst.st_mtime, &timeInfo);
880  strftime(fileTimestamp, AMPS_TIMESTAMP_LEN,
881  "%Y%m%dT%H%M%S", &timeInfo);
882  fileTimestamp[AMPS_TIMESTAMP_LEN - 1] = 'Z';
883  }
884  else if (fileSize == 0)
885  {
886  delete[] fileTimestamp;
887  delete[] buffer;
888  delete[] subIdBuffer;
889  _recoveringFile = false;
890  return;
891  }
892  OFF_T loc = 0;
893  ::lseek(_file, loc, SEEK_SET);
894 #endif
895  // We trust file recovery over Adapter recovery
896  if (hasAdapter_)
897  {
898  MemoryBookmarkStore::__purge();
899  }
900  if (!readFileBytes(VOID_P(&subLen), sizeof(size_t), &readBytes)
901  || subLen > getMaxSubIdLength())
902  {
903  delete[] fileTimestamp;
904  delete[] buffer;
905  delete[] subIdBuffer;
906  _recoveringFile = false;
907  error("Failure reading file while trying to recover.", getErrorNo());
908  return;
909  }
910 #ifdef _WIN32
911  size_t totalBytes = readBytes;
912 #else
913  ssize_t totalBytes = readBytes;
914 #endif
915  ERRTYPE err = 0; // 0 no error, -1 corruption, positive is errno file error
916  size_t tooManyBytes = 0;
917  typedef std::map<Message::Field, size_t,
918  Message::Field::FieldHash> BookmarkMap;
919  typedef std::map<Message::Field, size_t,
920  Message::Field::FieldHash>::iterator BookmarkMapIter;
921  // Map of subId to set of recovered bookmarks
922  typedef std::map<Message::Field, BookmarkMap*,
923  Message::Field::FieldHash> ReadMap;
924  typedef std::map<Message::Field, BookmarkMap*,
925  Message::Field::FieldHash>::iterator ReadMapIter;
926  ReadMap recovered;
927  while (subLen > 0 && (size_t)readBytes == sizeof(size_t) &&
928  (size_t)totalBytes <= (size_t)fileSize)
929  {
930  if (subLen >= ((size_t)fileSize - (size_t)totalBytes)
931  || subLen > getMaxSubIdLength())
932  {
933  err = (ERRTYPE) - 1;
934  tooManyBytes = subLen + 1;
935  break;
936  }
937  else
938  {
939  if (subIdBufferLen < subLen)
940  {
941  delete [] subIdBuffer;
942  subIdBufferLen = 2 * subLen;
943  subIdBuffer = new char[subIdBufferLen];
944  }
945  if (!readFileBytes(VOID_P(subIdBuffer), subLen, &readBytes))
946  {
947  err = getErrorNo();
948  tooManyBytes = subLen;
949  break;
950  }
951  totalBytes += readBytes;
952  sub.assign(subIdBuffer, subLen);
953  if (!readFileBytes(VOID_P(buffer), 1, &readBytes))
954  {
955  err = getErrorNo();
956  tooManyBytes = 1;
957  break;
958  }
959  totalBytes += readBytes;
960  switch (buffer[0])
961  {
962  case ENTRY_BOOKMARK:
963  {
964  if ((size_t)totalBytes + sizeof(size_t) >= (size_t)fileSize)
965  {
966  // Corrupt final record is ok
967  err = (ERRTYPE) - 1;
968  tooManyBytes = sizeof(size_t);
969  break;
970  }
971  if (!readFileBytes(VOID_P(&bookmarkLen), sizeof(size_t), &readBytes))
972  {
973  err = getErrorNo();
974  tooManyBytes = sizeof(size_t);
975  break;
976  }
977  totalBytes += readBytes;
978  if (bookmarkLen > (size_t)fileSize - (size_t)totalBytes)
979  {
980  // Corrupt final record is ok
981  err = (ERRTYPE) - 1;
982  tooManyBytes = bookmarkLen;
983  break;
984  }
985  if (bufferLen < bookmarkLen)
986  {
987  delete [] buffer;
988  bufferLen = 2 * bookmarkLen;
989  buffer = new char[bufferLen];
990  }
991  if (!readFileBytes(VOID_P(buffer), bookmarkLen, &readBytes))
992  {
993  err = getErrorNo();
994  tooManyBytes = bookmarkLen;
995  break;
996  }
997  totalBytes += readBytes;
998  bookmarkField.assign(buffer, bookmarkLen);
999  Subscription* subP = find(sub);
1000  BookmarkMap* bookmarks = NULL;
1001  ReadMapIter iter = recovered.find(sub);
1002  if (iter == recovered.end())
1003  {
1004  Message::Field subKey;
1005  subKey.deepCopy(sub);
1006  bookmarks = new BookmarkMap();
1007  recovered[subKey] = bookmarks;
1008  }
1009  else
1010  {
1011  bookmarks = iter->second;
1012  }
1013  if (bookmarks->find(bookmarkField) != bookmarks->end())
1014  {
1015  std::for_each(bookmarks->begin(), bookmarks->end(),
1016  _clearBookmark);
1017  bookmarks->clear();
1018  subP->getMostRecent(true);
1019  }
1020  if (BookmarkRange::isRange(bookmarkField))
1021  {
1022  subP->log(bookmarkField);
1023  }
1024  else if (!subP->isDiscarded(bookmarkField))
1025  {
1026  size_t sequence = subP->log(bookmarkField);
1027  Message::Field copy;
1028  copy.deepCopy(bookmarkField);
1029  bookmarks->insert(std::make_pair(copy, sequence));
1030  }
1031  else
1032  {
1033  // We know it's discarded, but there may still be a
1034  // discard entry in the log, so avoid a search.
1035  Message::Field copy;
1036  copy.deepCopy(bookmarkField);
1037  bookmarks->insert(std::make_pair(copy, 0));
1038  }
1039  }
1040  break;
1041 
1042  case ENTRY_DISCARD:
1043  {
1044  if ((size_t)totalBytes + sizeof(size_t) >= (size_t)fileSize)
1045  {
1046  // Corrupt final record is ok
1047  err = (ERRTYPE) - 1;
1048  tooManyBytes = sizeof(size_t);
1049  break;
1050  }
1051  if (!readFileBytes(VOID_P(&bookmarkLen), sizeof(size_t), &readBytes))
1052  {
1053  err = getErrorNo();
1054  tooManyBytes = sizeof(size_t);
1055  break;
1056  }
1057  totalBytes += readBytes;
1058  if (bookmarkLen > (size_t)fileSize - (size_t)totalBytes)
1059  {
1060  // Corrupt final record is ok
1061  err = (ERRTYPE) - 1;
1062  tooManyBytes = bookmarkLen;
1063  break;
1064  }
1065  if (bufferLen < bookmarkLen)
1066  {
1067  delete [] buffer;
1068  bufferLen = 2 * bookmarkLen;
1069  buffer = new char[bufferLen];
1070  }
1071  if (!readFileBytes(VOID_P(buffer), bookmarkLen, &readBytes))
1072  {
1073  err = getErrorNo();
1074  tooManyBytes = bookmarkLen;
1075  break;
1076  }
1077  totalBytes += readBytes;
1078  bookmarkField.assign(buffer, bookmarkLen);
1079  size_t sequence = AMPS_UNSET_INDEX;
1080  ReadMapIter iter = recovered.find(sub);
1081  if (iter != recovered.end())
1082  {
1083  BookmarkMap* bookmarks = iter->second;
1084  BookmarkMapIter bookmarkIter = bookmarks->find(bookmarkField);
1085  if (bookmarkIter != bookmarks->end())
1086  {
1087  sequence = bookmarkIter->second;
1088  Message::Field bookmarkToClear(bookmarkIter->first);
1089  bookmarkToClear.clear();
1090  bookmarks->erase(bookmarkIter);
1091  }
1092  }
1093  Subscription* subP = find(sub);
1094  if (!BookmarkRange::isRange(bookmarkField))
1095  {
1096  if (sequence != AMPS_UNSET_INDEX)
1097  {
1098  // A sequence of 0 means it was already discarded
1099  if (sequence)
1100  {
1101  subP->discard(sequence);
1102  }
1103  }
1104  else // Shouldn't end up here, but just in case we'll search
1105  {
1106  subP->discard(bookmarkField);
1107  }
1108  }
1109  }
1110  break;
1111  case ENTRY_PERSISTED:
1112  {
1113  if ((size_t)totalBytes + sizeof(size_t) >= (size_t)fileSize)
1114  {
1115  // Corrupt final record is ok
1116  err = (ERRTYPE) - 1;
1117  tooManyBytes = sizeof(size_t);
1118  break;
1119  }
1120  if (!readFileBytes(VOID_P(&bookmarkLen), sizeof(size_t), &readBytes))
1121  {
1122  err = getErrorNo();
1123  tooManyBytes = sizeof(size_t);
1124  break;
1125  }
1126  totalBytes += readBytes;
1127  if (bookmarkLen > (size_t)fileSize - (size_t)totalBytes)
1128  {
1129  // Corrupt final record is ok
1130  err = (ERRTYPE) - 1;
1131  tooManyBytes = bookmarkLen;
1132  break;
1133  }
1134  if (bufferLen < bookmarkLen)
1135  {
1136  delete [] buffer;
1137  bufferLen = 2 * bookmarkLen;
1138  buffer = new char[bufferLen];
1139  }
1140  if (!readFileBytes(VOID_P(buffer), bookmarkLen, &readBytes))
1141  {
1142  err = getErrorNo();
1143  tooManyBytes = bookmarkLen;
1144  break;
1145  }
1146  totalBytes += readBytes;
1147  bookmarkField.assign(buffer, bookmarkLen);
1148  Subscription* subP = find(sub);
1149  MemoryBookmarkStore::_persisted(subP, bookmarkField);
1150  }
1151  break;
1152  default:
1153  {
1154  // Corrupt final record is ok
1155  err = (ERRTYPE) - 1;
1156  tooManyBytes = (size_t)fileSize - (size_t)totalBytes;
1157  }
1158  break;
1159  }
1160  }
1161  loc = (OFF_T)totalBytes;
1162  if ((size_t)totalBytes > (size_t)fileSize)
1163  {
1164  loc = (OFF_T)fileSize;
1165  break;
1166  }
1167  if (!readFileBytes(VOID_P(&subLen), sizeof(size_t), &readBytes))
1168  {
1169  err = getErrorNo();
1170  tooManyBytes = sizeof(size_t);
1171  break;
1172  }
1173  totalBytes += readBytes;
1174  }
1175  delete[] buffer;
1176  delete[] subIdBuffer;
1177  if (err == 0)
1178  {
1179  for (SubscriptionMap::iterator i = _subs.begin(); i != _subs.end(); ++i)
1180  {
1181  if (recovered.count(i->first) && !recovered[i->first]->empty())
1182  {
1183  Subscription* subPtr = i->second;
1184  if (subPtr->getMostRecent(false).len() > 1)
1185  {
1186  subPtr->justRecovered();
1187  }
1188  else
1189  {
1190  // Unlikely, but we may have recovered only undiscarded bookmarks
1191  // so we should really just restart as a new subscription.
1192  delete subPtr;
1193  _subs[i->first] = new Subscription(this, i->first);
1194  }
1195  }
1196  if (useLastModifiedTime_ && fileTimestamp[0] != '\0')
1197  {
1198  _subs[i->first]->setRecoveryTimestamp(fileTimestamp);
1199  }
1200  }
1201  }
1202  for (ReadMapIter i = recovered.begin(), e = recovered.end(); i != e; ++i)
1203  {
1204  std::for_each(i->second->begin(), i->second->end(), _clearBookmark);
1205  delete i->second;
1206  Message::Field f = i->first;
1207  f.clear();
1208  }
1209  delete[] fileTimestamp;
1210  _recoveringFile = false;
1211  if (err != 0)
1212  {
1213  // Arbitrary guess if we're on the last record
1214  // We set err to -1 if we read a corrupt value or
1215  // to errno/last error if a read failed.
1216  if (err != (ERRTYPE) - 1 || loc == 0 || fileSize - loc > 128)
1217  {
1218  std::ostringstream os;
1219  os << "Error while recovering LoggedBookmarkStore from "
1220  << _fileName
1221  << ". Record starting at " << loc
1222  << " reading at " << totalBytes
1223  << " requested " << tooManyBytes
1224  << " and file size is " << fileSize;
1225  error(os.str(), (err != (ERRTYPE) - 1 ? err : 0));
1226  }
1227  else
1228  {
1229 #ifdef _WIN32
1230 #ifdef _WIN64
1231  LONG low = (LONG)loc;
1232  LONG high = (LONG)((loc >> 32) & 0xffffffff);
1233  SetFilePointer(_file, low, &high, FILE_BEGIN);
1234 #else
1235  SetFilePointer(_file, loc, NULL, FILE_BEGIN);
1236 #endif
1237 #else
1238  ::lseek(_file, loc, SEEK_SET);
1239 #endif
1240  }
1241  }
1242  }
1243 
1244  private:
1245  FileType _file;
1246  Mutex _fileLock;
1247  std::string _fileName;
1248  bool _recoveringFile;
1249  };
1250 
1251 } // end namespace AMPS
1252 
1253 
1254 #endif // _LOGGEDBOOKMARKSTORE_H_
1255 
virtual bool isDiscarded(Message &message_)
Called for each arriving message to determine if the application has already seen this bookmark and s...
Definition: LoggedBookmarkStore.hpp:276
Field getSubscriptionId() const
Retrieves the value of the SubscriptionId header of the Message as a new Field.
Definition: Message.hpp:1427
Message encapsulates a single message sent to or received from an AMPS server, and provides methods f...
Definition: Message.hpp:531
void setServerVersion(const VersionInfo &version_)
Internally used to set the server version so the store knows how to deal with persisted acks and call...
Definition: LoggedBookmarkStore.hpp:343
Provides AMPS::MemoryBookmarkStore, a bookmark store that holds bookmarks in memory.
void clear()
Deletes the data associated with this Field, should only be used on Fields that were created as deepC...
Definition: Field.hpp:247
LoggedBookmarkStore(const std::string &fileName_, bool useLastModifiedTime_=false)
Creates a LoggedBookmarkStore using a file name fileName_.
Definition: LoggedBookmarkStore.hpp:109
LoggedBookmarkStore(const RecoveryPointAdapter &adapter_, const char *fileName_, RecoveryPointFactory factory_=NULL, bool useLastModifiedTime_=false)
Creates a LoggedBookmarkStore using fileName_ as its file storage.
Definition: LoggedBookmarkStore.hpp:137
virtual void discard(const Message::Field &subId_, size_t bookmarkSeqNo_)
Log a discard-bookmark entry to the persistent log based on a bookmark sequence number.
Definition: LoggedBookmarkStore.hpp:245
const char * data() const
Returns the (non-null-terminated) data underlying this field.
Definition: Field.hpp:260
RecoveryPointAdapter is a handle class for implementing external storage of subscription recovery points...
Definition: RecoveryPointAdapter.hpp:77
LoggedBookmarkStore(const RecoveryPointAdapter &adapter_, const std::string &fileName_, RecoveryPointFactory factory_=NULL, bool useLastModifiedTime_=false)
Creates a LoggedBookmarkStore using a file name fileName_.
Definition: LoggedBookmarkStore.hpp:164
void setServerVersion(size_t version_)
Internally used to set the server version so the store knows how to deal with persisted acks and call...
Definition: LoggedBookmarkStore.hpp:348
Field getSubscriptionIds() const
Retrieves the value of the SubscriptionIds header of the Message as a new Field.
Definition: Message.hpp:1428
bool empty() const
Returns 'true' if empty, 'false' otherwise.
Definition: Field.hpp:128
Core type, function, and class declarations for the AMPS C++ client.
A BookmarkStoreImpl implementation that logs all messages to a file.
Definition: LoggedBookmarkStore.hpp:64
size_t len() const
Returns the length of the data underlying this field.
Definition: Field.hpp:267
Provides AMPS::RecoveryPointAdapter, an interface for implementing external storage of bookmark subscr...
void setServerVersion(const VersionInfo &version_)
Internally used to set the server version so the store knows how to deal with persisted acks and call...
Definition: MemoryBookmarkStore.hpp:1355
virtual void purge(const Message::Field &subId_)
Called to purge the contents of this store for particular subId.
Definition: LoggedBookmarkStore.hpp:335
Provides AMPS::RecoveryPoint, AMPS::RecoveryPointFactory, AMPS::FixedRecoveryPoint, and AMPS::DynamicRecoveryPoint.
virtual void purge()
Called to purge the contents of this store.
Definition: LoggedBookmarkStore.hpp:298
virtual Message::Field getMostRecent(const Message::Field &subId_)
Returns the most recent bookmark from the log that ought to be used for (re-)subscriptions.
Definition: LoggedBookmarkStore.hpp:262
A BookmarkStoreImpl implementation that stores bookmarks in memory.
Definition: MemoryBookmarkStore.hpp:56
Field represents the value of a single field in a Message.
Definition: Field.hpp:86
size_t getMaxSubIdLength() const
Gets the maximum allowed length for a sub id when recovering a bookmark store from persistent storage...
Definition: BookmarkStore.hpp:206
virtual size_t log(Message &message_)
Log a bookmark to the persistent log and return the corresponding sequence number for this bookmark...
Definition: LoggedBookmarkStore.hpp:202
void deepCopy(const Field &orig_)
Makes self a deep copy of the original field.
Definition: Field.hpp:218
LoggedBookmarkStore(const char *fileName_, bool useLastModifiedTime_=false)
Creates a LoggedBookmarkStore using fileName_ as its file storage.
Definition: LoggedBookmarkStore.hpp:88
Definition: ampsplusplus.hpp:106
Field getBookmark() const
Retrieves the value of the Bookmark header of the Message as a new Field.
Definition: Message.hpp:1194
virtual void discard(const Message &message_)
Log a Message as discarded from the store.
Definition: LoggedBookmarkStore.hpp:225