AMPS C/C++ Client Class Reference
AMPS C/C++ Client Version 5.3.4.1
MemoryBookmarkStore.hpp
Go to the documentation of this file.
1 //
3 // Copyright (c) 2010-2023 60East Technologies Inc., All Rights Reserved.
4 //
5 // This computer software is owned by 60East Technologies Inc. and is
6 // protected by U.S. copyright laws and other laws and by international
7 // treaties. This computer software is furnished by 60East Technologies
8 // Inc. pursuant to a written license agreement and may be used, copied,
9 // transmitted, and stored only in accordance with the terms of such
10 // license agreement and with the inclusion of the above copyright notice.
11 // This computer software or any other copies thereof may not be provided
12 // or otherwise made available to any other person.
13 //
14 // U.S. Government Restricted Rights. This computer software: (a) was
15 // developed at private expense and is in all respects the proprietary
16 // information of 60East Technologies Inc.; (b) was not developed with
17 // government funds; (c) is a trade secret of 60East Technologies Inc.
18 // for all purposes of the Freedom of Information Act; and (d) is a
19 // commercial item and thus, pursuant to Section 12.212 of the Federal
20 // Acquisition Regulations (FAR) and DFAR Supplement Section 227.7202,
21 // Government's use, duplication or disclosure of the computer software
22 // is subject to the restrictions set forth by 60East Technologies Inc..
23 //
25 
26 #ifndef _MEMORYBOOKMARKSTORE_H_
27 #define _MEMORYBOOKMARKSTORE_H_
28 
29 #include <amps/BookmarkStore.hpp>
30 #include <amps/Field.hpp>
31 #include <amps/Message.hpp>
32 #include <amps/RecoveryPoint.hpp>
34 #include <map>
35 #include <sstream>
36 #include <vector>
37 #include <stdlib.h>
38 #include <assert.h>
39 
40 #define AMPS_MIN_BOOKMARK_LEN 3
41 #define AMPS_INITIAL_MEMORY_BOOKMARK_SIZE 16384UL
42 
47 
48 namespace AMPS
49 {
50 
57  {
58  protected:
59  class Subscription
60  {
61  public:
62  typedef std::map<Message::Field, size_t, Message::Field::FieldHash> RecoveryMap;
63  typedef std::map<amps_uint64_t, amps_uint64_t> PublisherMap;
64  typedef std::map<Message::Field, size_t, Message::Field::FieldHash>::iterator
65  RecoveryIterator;
66  typedef std::map<amps_uint64_t, amps_uint64_t>::iterator PublisherIterator;
67 
68  // Start sequence at 1 so that 0 can be used during file subclasses
69  // recovery as an indicator that a message wasn't logged because
70  // isDiscarded() was true.
71  Subscription(MemoryBookmarkStore* store_, const Message::Field& id_)
72  : _current(1), _currentBase(0), _least(1), _leastBase(0)
73  , _recoveryMin(AMPS_UNSET_INDEX), _recoveryBase(AMPS_UNSET_INDEX)
74  , _recoveryMax(AMPS_UNSET_INDEX), _recoveryMaxBase(AMPS_UNSET_INDEX)
75  , _entriesLength(AMPS_INITIAL_MEMORY_BOOKMARK_SIZE), _entries(NULL)
76  , _store(store_)
77  {
78  // Need our own memory for the sub id
79  _id.deepCopy(id_);
80  _store->resize(_id, (char**)&_entries,
81  sizeof(Entry)*AMPS_INITIAL_MEMORY_BOOKMARK_SIZE, false);
82  setLastPersistedToEpoch();
83  }
84 
85  ~Subscription()
86  {
87  Lock<Mutex> guard(_subLock);
88  if (_entries)
89  {
90  for (size_t i = 0; i < _entriesLength; ++i)
91  {
92  _entries[i]._val.clear();
93  }
94  // resize to 0 will free _entries
95  _store->resize(_id, (char**)&_entries, 0);
96  }
97  _id.clear();
98  _recent.clear();
99  _lastPersisted.clear();
100  _recentList.clear();
101  _range.clear();
102  _recoveryTimestamp.clear();
103  }
104 
105  size_t log(const Message::Field& bookmark_)
106  {
107  if (bookmark_ == AMPS_BOOKMARK_NOW)
108  {
109  return 0;
110  }
111  Lock<Mutex> guard(_subLock);
112  // Either relog the recovery or log it
113  size_t index = recover(bookmark_, true);
114  if (index == AMPS_UNSET_INDEX)
115  {
116  // Check for wrap
117  if (_current >= _entriesLength)
118  {
119  _current = 0;
120  _currentBase += _entriesLength;
121  }
122  // Check for resize
123  // If list is too small, double it
124  if ((_current == _least && _leastBase < _currentBase) ||
125  (_current == _recoveryMin && _recoveryBase < _currentBase))
126  {
127  if (!_store->resize(_id, (char**)&_entries,
128  sizeof(Entry) * _entriesLength * 2))
129  {
130  //Try again
131  return log(bookmark_);
132  }
133  // Length was doubled
134  _entriesLength *= 2;
135  }
136 
137  // Add this entry to the end of our list
138  /*
139  if (bookmark_ == AMPS_BOOKMARK_NOW)
140  {
141  // Save a now timestamp bookmark
142  char* nowTimestamp = new char[AMPS_TIMESTAMP_LEN];
143  struct tm timeInfo;
144  time_t now;
145  time(&now);
146  #ifdef _WIN32
147  gmtime_s(&timeInfo, &now);
148  #else
149  gmtime_r(&now, &timeInfo);
150  #endif
151  strftime(nowTimestamp, AMPS_TIMESTAMP_LEN,
152  "%Y%m%dT%H%M%S", &timeInfo);
153  nowTimestamp[AMPS_TIMESTAMP_LEN-1] = 'Z';
154  _entries[_current]._val.assign(nowTimestamp,
155  AMPS_TIMESTAMP_LEN);
156  _entries[_current]._active = false;
157  index = _current++;
158  return index + _currentBase;
159  }
160  else
161  */
162  {
163  // Is this an attempt at a range?
164  if (!BookmarkRange::isRange(bookmark_))
165  {
166  _entries[_current]._val.deepCopy(bookmark_);
167  }
168  else
169  {
170  // Deep copy of the range is saved
171  _range.set(bookmark_);
172  // Stricter check on range syntax
173  if (!_range.isValid())
174  {
175  throw CommandException("Invalid bookmark range specified.");
176  }
177  _store->updateAdapter(this);
178  if (!_range.isStartInclusive())
179  {
180  // Put it in our publishers map
181  amps_uint64_t publisher, sequence;
182  std::vector<Field> bmList = Field::parseBookmarkList(_range.getStart());
183  for (std::vector<Field>::iterator bkmk = bmList.begin(); bkmk != bmList.end(); ++bkmk)
184  {
185  parseBookmark(*bkmk, publisher, sequence);
186  if (publisher != (amps_uint64_t)0)
187  {
188  // Compare it to our publishers map
189  PublisherIterator pub = _publishers.find(publisher);
190  if (pub == _publishers.end() || pub->second < sequence)
191  {
192  _publishers[publisher] = sequence;
193  }
194  }
195  }
196  }
197  // Don't actually log a range
198  return 0;
199  }
200  }
201  _entries[_current]._active = true;
202  index = _current++;
203  }
204  return index + _currentBase;
205  }
206 
207  bool discard(size_t index_)
208  {
209  Lock<Mutex> guard(_subLock);
210  return _discard(index_);
211  }
212 
213  bool discard(const Message::Field& bookmark_)
214  {
215  // These are discarded when logged or not logged
216  if (bookmark_ == AMPS_BOOKMARK_NOW)
217  {
218  return false;
219  }
220  Lock<Mutex> guard(_subLock);
221  size_t search = _least;
222  size_t searchBase = _leastBase;
223  size_t searchMax = _current;
224  size_t searchMaxBase = _currentBase;
225  if (_least + _leastBase == _current + _currentBase)
226  {
227  if (_recoveryMin != AMPS_UNSET_INDEX)
228  {
229  search = _recoveryMin;
230  searchBase = _recoveryBase;
231  searchMax = _recoveryMax;
232  searchMaxBase = _recoveryMaxBase;
233  }
234  else // Store is empty, so nothing to do
235  {
236  return false;
237  }
238  }
239  assert(searchMax != AMPS_UNSET_INDEX);
240  assert(searchMaxBase != AMPS_UNSET_INDEX);
241  assert(search != AMPS_UNSET_INDEX);
242  assert(searchBase != AMPS_UNSET_INDEX);
243  // Search while we don't find the provided bookmark and we're in valid range
244  while (search + searchBase < searchMax + searchMaxBase)
245  {
246  if (_entries[search]._val == bookmark_)
247  {
248  return _discard(search + searchBase);
249  }
250  if (++search == _entriesLength)
251  {
252  // Least has now loooped around
253  searchBase += _entriesLength;
254  search = 0;
255  }
256  }
257  return false;
258  }
259 
260  // Get sequence number from a Field that is a bookmark
261  static void parseBookmark(const Message::Field& field_,
262  amps_uint64_t& publisherId_,
263  amps_uint64_t& sequenceNumber_)
264  {
265  Message::Field::parseBookmark(field_, publisherId_, sequenceNumber_);
266  }
267 
268  // Check to see if this message is older than the most recent one seen,
269  // and if it is, check if it discarded.
270  bool isDiscarded(const Message::Field& bookmark_)
271  {
272  Lock<Mutex> guard(_subLock);
273  if (BookmarkRange::isRange(bookmark_))
274  {
275  return false;
276  }
277  // Check if we've already recovered this bookmark
278  size_t recoveredIdx = recover(bookmark_, false);
279 
280  amps_uint64_t publisher, sequence;
281  parseBookmark(bookmark_, publisher, sequence);
282  // Compare it to our publishers map
283  PublisherIterator pub = _publishers.find(publisher);
284  if (pub == _publishers.end() || pub->second < sequence)
285  {
286  _publishers[publisher] = sequence;
287  if (recoveredIdx == AMPS_UNSET_INDEX)
288  {
289  return false;
290  }
291  }
292  if (recoveredIdx != AMPS_UNSET_INDEX)
293  {
294  if (!_entries[recoveredIdx]._active)
295  {
296  _recovered.erase(bookmark_);
297  return true;
298  }
299  return false;
300  }
301  // During recovery, we don't really care if it's been discarded
302  // or not. We just want _publishers updated. No need for the
303  // costly linear search.
304  if (_store->_recovering)
305  {
306  return false;
307  }
308  // During failure and recovery scenarios, we'll see out of order
309  // bookmarks arrive, either because (a) we're replaying or (b)
310  // a publisher has cut over, and we've cut over to a new server.
311  // Scan the list to see if we have a match.
312  size_t base = _leastBase;
313  for (size_t i = _least; i + base < _current + _currentBase; i++)
314  {
315  if ( i >= _entriesLength )
316  {
317  i = 0;
318  base = _currentBase;
319  }
320  if (_entries[i]._val == bookmark_)
321  {
322  return !_entries[i]._active;
323  }
324  }
325 
326  return true; // message is totally discarded
327  }
328 
329  bool empty(void) const
330  {
331  if (_least == AMPS_UNSET_INDEX ||
332  ((_least + _leastBase) == (_current + _currentBase) &&
333  _recoveryMin == AMPS_UNSET_INDEX))
334  {
335  return true;
336  }
337  return false;
338  }
339 
340  void updateMostRecent()
341  {
342  Lock<Mutex> guard(_subLock);
343  _updateMostRecent();
344  }
345 
346  const BookmarkRange& getRange() const
347  {
348  return _range;
349  }
350 
351  Message::Field getMostRecentList(bool usePublishersList_ = true)
352  {
353  Lock<Mutex> guard(_subLock);
354  bool useLastPersisted = !_lastPersisted.empty() &&
355  _lastPersisted.len() > 1;
356  // when this is called, we'll take a moment to update the list
357  // of things recovered,
358  // so we don't accidentally log anything we ought not to.
359  _updateMostRecent();
360  bool useRecent = !_recent.empty() && _recent.len() > 1;
361  amps_uint64_t lastPublisher = 0;
362  amps_uint64_t lastSeq = 0;
363  amps_uint64_t recentPublisher = 0;
364  amps_uint64_t recentSeq = 0;
365  if (useLastPersisted)
366  {
367  parseBookmark(_lastPersisted, lastPublisher, lastSeq);
368  }
369  if (useRecent)
370  {
371  parseBookmark(_recent, recentPublisher, recentSeq);
372  if (empty() && useLastPersisted)
373  {
374  useRecent = false;
375  }
376  else
377  {
378  if (useLastPersisted && lastPublisher == recentPublisher)
379  {
380  if (lastSeq <= recentSeq)
381  {
382  useRecent = false;
383  }
384  else
385  {
386  useLastPersisted = false;
387  }
388  }
389  }
390  }
391  // Set size for all bookmarks that will be used
392  size_t totalLen = (useLastPersisted ? _lastPersisted.len() + 1 : 0);
393  if (useRecent)
394  {
395  totalLen += _recent.len() + 1;
396  }
397  // If we don't have a non-EPOCH persisted ack and we don't have a
398  // non-EPOCH most recent bookmark, OR we have a range
399  // we can build a list based on all the publishers instead.
400  if (usePublishersList_
401  && ((!useLastPersisted && !useRecent)
402  || _lastPersisted == AMPS_BOOKMARK_EPOCH))
403  {
404  std::ostringstream os;
405  for (PublisherIterator pub = _publishers.begin();
406  pub != _publishers.end(); ++pub)
407  {
408  if (pub->first == 0 && pub->second == 0)
409  {
410  continue;
411  }
412  if (pub->first == recentPublisher && recentSeq < pub->second)
413  {
414  os << recentPublisher << '|' << recentSeq << "|,";
415  }
416  else
417  {
418  os << pub->first << '|' << pub->second << "|,";
419  }
420  }
421  std::string recent = os.str();
422  totalLen = recent.length();
423  if (!recent.empty())
424  {
425  if (!_recoveryTimestamp.empty())
426  {
427  totalLen += _recoveryTimestamp.len();
428  recent += std::string(_recoveryTimestamp);
429  }
430  else
431  {
432  // Remove trailing ,
433  recent.erase(--totalLen);
434  }
435  // Reset _recentList to new value and return it
436  _recentList.clear();
437  _recentList = Message::Field(recent).deepCopy();
438  if (_range.isValid())
439  {
440  if (_range.getStart() != recent
441  && _recentList != AMPS_BOOKMARK_EPOCH)
442  {
443  _range.replaceStart(_recentList, true);
444  }
445  else if (_range.isStartInclusive())
446  {
447  amps_uint64_t publisher, sequence;
448  parseBookmark(_range.getStart(), publisher,
449  sequence);
450  PublisherIterator pub = _publishers.find(publisher);
451  if (pub != _publishers.end()
452  && pub->second >= sequence)
453  {
454  _range.makeStartExclusive();
455  }
456  }
457  return _range;
458  }
459  return _recentList;
460  }
461  if (_range.isValid())
462  {
463  return _range;
464  }
465  }
466  if (!_recoveryTimestamp.empty() && !_range.isValid())
467  {
468  totalLen += _recoveryTimestamp.len() + 1;
469  }
470  // If we have nothing discarded, return EPOCH
471  if (totalLen == 0
472  || (_recent.len() < 2 && !empty()))
473  {
474  if (_range.isValid())
475  {
476  return _range;
477  }
478  if (!useRecent)
479  {
481  }
482  _setLastPersistedToEpoch();
483  return _lastPersisted;
484  }
485  // Remove the trailing , from the length
486  totalLen -= 1;
487  char* field = new char[totalLen];
488  size_t len = 0;
489  if (useRecent)
490  {
491  len = _recent.len();
492  memcpy(field, _recent.data(), len);
493  if (len < totalLen)
494  {
495  field[len++] = ',';
496  }
497  }
498  if (useLastPersisted)
499  {
500  memcpy(field + len, _lastPersisted.data(), _lastPersisted.len());
501  len += _lastPersisted.len();
502  if (len < totalLen)
503  {
504  field[len++] = ',';
505  }
506  }
507  if (!_recoveryTimestamp.empty() && !_range.isValid())
508  {
509  memcpy(field + len, _recoveryTimestamp.data(),
510  _recoveryTimestamp.len());
511  // If more is to be written after this, uncomment the following
512  //len += _lastPersisted.len();
513  //if (len < totalLen) field[len++] = ',';
514  }
515  // _recentList clear will delete[] current buffer and assign will get cleared
516  _recentList.clear();
517  _recentList.assign(field, totalLen);
518  if (_range.isValid())
519  {
520  if (_recentList != AMPS_BOOKMARK_EPOCH)
521  {
522  if (_range.getStart() != _recentList)
523  {
524  _range.replaceStart(_recentList, true);
525  }
526  else if (_range.isStartInclusive())
527  {
528  amps_uint64_t publisher, sequence;
529  parseBookmark(_range.getStart(), publisher,
530  sequence);
531  PublisherIterator pub = _publishers.find(publisher);
532  if (pub != _publishers.end()
533  && pub->second >= sequence)
534  {
535  _range.makeStartExclusive();
536  }
537  }
538  }
539  return _range;
540  }
541  return _recentList;
542  }
543 
544  Message::Field getMostRecent(bool update_ = false)
545  {
546  Lock<Mutex> guard(_subLock);
547  // Return the same as last time if nothing's changed
548  // _recent is the most recent bookmark.
549  if (update_ && _store->_recentChanged)
550  {
551  _updateMostRecent();
552  }
553  if (_recent.empty())
554  {
556  }
557  else
558  {
559  return _recent;
560  }
561  }
562 
563  Message::Field getLastPersisted()
564  {
565  Lock<Mutex> guard(_subLock);
566  return _lastPersisted;
567  }
568 
569  void setMostRecent(const Message::Field& recent_)
570  {
571  _recent.clear();
572  _recent.deepCopy(recent_);
573  }
574 
575  void setRecoveryTimestamp(const char* recoveryTimestamp_,
576  size_t len_ = 0)
577  {
578  _recoveryTimestamp.clear();
579  size_t len = (len_ == 0) ? AMPS_TIMESTAMP_LEN : len_;
580  char* ts = new char[len];
581  memcpy((void*)ts, (const void*)recoveryTimestamp_, len);
582  _recoveryTimestamp.assign(ts, len);
583  }
584 
585  void moveEntries(char* old_, char* new_, size_t newSize_)
586  {
587  size_t least = _least;
588  size_t leastBase = _leastBase;
589  if (_recoveryMin != AMPS_UNSET_INDEX)
590  {
591  least = _recoveryMin;
592  leastBase = _recoveryBase;
593  }
594  // First check if we grew in place, if so, just move current after least
595  if (old_ == new_)
596  {
597  if (newSize_ - (sizeof(Entry)*_entriesLength) > sizeof(Entry)*least)
598  {
599  memcpy(new_ + (sizeof(Entry)*_entriesLength),
600  old_, (sizeof(Entry)*least));
601  // Clear the beginning where those entries were
602  memset(old_, 0, sizeof(Entry)*least);
603  }
604  else // We have to use an intermediate buffer
605  {
606  Entry* buffer = new Entry[least];
607  memcpy((void*)buffer, (void*)old_, sizeof(Entry)*least);
608  //Put the beginning entries at the start of the new buffer
609  memcpy((void*)new_, (void*)((char*)old_ + (sizeof(Entry)*least)),
610  (_entriesLength - least)*sizeof(Entry));
611  //Put the end entries after the beginning entries
612  memcpy((void*)((char*)new_ + ((_entriesLength - least)*sizeof(Entry))),
613  (void*)buffer, least * sizeof(Entry));
614  // Least is now at 0 so base must be increased
615  leastBase += least;
616  least = 0;
617  delete [] buffer;
618  }
619  }
620  else
621  {
622  //Put the beginning entries at the start of the new buffer
623  memcpy((void*)new_, (void*)((char*)old_ + (sizeof(Entry)*least)),
624  (_entriesLength - least)*sizeof(Entry));
625  //Put the end entries after the beginning entries
626  memcpy((void*)((char*)new_ + ((_entriesLength - least)*sizeof(Entry))),
627  (void*)old_, least * sizeof(Entry));
628  // Least is now at 0 so base must be increased
629  leastBase += least;
630  least = 0;
631  }
632  if (_recoveryMin != AMPS_UNSET_INDEX)
633  {
634  _least = least + (_least + _leastBase) - (_recoveryMin + _recoveryBase);
635  _recoveryMax = least + (_recoveryMax + _recoveryMaxBase) -
636  (_recoveryMin + _recoveryBase);
637  _recoveryMaxBase = leastBase;
638  _recoveryMin = least;
639  _recoveryBase = leastBase;
640  }
641  else
642  {
643  _least = least;
644  }
645  _leastBase = leastBase;
646  // Current is now after everything and using the same base
647  _currentBase = _leastBase;
648  _current = least + _entriesLength;
649  }
650 
651  inline size_t getOldestBookmarkSeq()
652  {
653  Lock<Mutex> guard(_subLock);
654  // If there is nothing in the store, return -1, otherwise return lowest
655  return ((_least + _leastBase) == (_current + _currentBase)) ? AMPS_UNSET_INDEX :
656  _least + _leastBase;
657  }
658 
659  bool lastPersisted(const Message::Field& bookmark_)
660  {
661  // These shouldn't be persisted
662  if (bookmark_ == AMPS_BOOKMARK_NOW
663  || BookmarkRange::isRange(bookmark_))
664  {
665  return false;
666  }
667  Lock<Mutex> guard(_subLock);
668  return _setLastPersisted(bookmark_);
669  }
670 
671  bool _setLastPersisted(const Message::Field& bookmark_)
672  {
673  if (!_lastPersisted.empty())
674  {
675  amps_uint64_t publisher, publisher_lastPersisted;
676  amps_uint64_t sequence, sequence_lastPersisted;
677  parseBookmark(bookmark_, publisher, sequence);
678  parseBookmark(_lastPersisted, publisher_lastPersisted,
679  sequence_lastPersisted);
680  if (publisher == publisher_lastPersisted &&
681  sequence <= sequence_lastPersisted)
682  {
683  return false;
684  }
685  }
686  // deepCopy will clear what's in _lastPersisted
687  _lastPersisted.deepCopy(bookmark_);
688  _store->_recentChanged = true;
689  _recoveryTimestamp.clear();
690  return true;
691  }
692 
693  Message::Field lastPersisted(size_t bookmark_)
694  {
695  Lock<Mutex> guard(_subLock);
696  Message::Field& bookmark = _entries[bookmark_]._val;
697  // These shouldn't be persisted
698  if (bookmark == AMPS_BOOKMARK_NOW
699  || BookmarkRange::isRange(bookmark))
700  {
701  return bookmark;
702  }
703  _setLastPersisted(bookmark);
704  return bookmark;
705  }
706 
707  // Returns the index of the recovered item, either the index where it
708  // was first stored prior to getMostRecent, or the new index if it is
709  // relogged either because this is called from log() or because it was
710  // not active but also not persisted.
711  size_t recover(const Message::Field& bookmark_, bool relogIfNotDiscarded)
712  {
713  size_t retVal = AMPS_UNSET_INDEX;
714  if (_recovered.empty() || _recoveryBase == AMPS_UNSET_INDEX)
715  {
716  return retVal;
717  }
718  // Check if this is a recovered bookmark.
719  // If so, copy the existing one to the new location
720  RecoveryIterator item = _recovered.find(bookmark_);
721  if (item != _recovered.end())
722  {
723  size_t seqNo = item->second;
724  size_t index = (seqNo - _recoveryBase) % _entriesLength;
725  // If we only have recovery entries and isDiscarded is
726  // checking on an already discarded entry, update recent.
727  if (_least + _leastBase == _current + _currentBase &&
728  !_entries[index]._active)
729  {
730  _store->_recentChanged = true;
731  _recent.clear();
732  _recent = _entries[index]._val.deepCopy();
733  retVal = moveEntry(index);
734  if (retVal == AMPS_UNSET_INDEX)
735  {
736  recover(bookmark_, relogIfNotDiscarded);
737  }
738  _least = _current;
739  _leastBase = _currentBase;
740  }
741  else if (!_entries[index]._active || relogIfNotDiscarded)
742  {
743  retVal = moveEntry(index);
744  if (retVal == AMPS_UNSET_INDEX)
745  {
746  recover(bookmark_, relogIfNotDiscarded);
747  }
748  }
749  else
750  {
751  return index;
752  }
753  _recovered.erase(item);
754  if (_recovered.empty())
755  {
756  _recoveryMin = AMPS_UNSET_INDEX;
757  _recoveryBase = AMPS_UNSET_INDEX;
758  _recoveryMax = AMPS_UNSET_INDEX;
759  _recoveryMaxBase = AMPS_UNSET_INDEX;
760  }
761  else if (index == _recoveryMin)
762  {
763  while (_entries[_recoveryMin]._val.empty() &&
764  (_recoveryMin + _recoveryBase) < (_recoveryMax + _recoveryMaxBase))
765  {
766  if (++_recoveryMin == _entriesLength)
767  {
768  _recoveryMin = 0;
769  _recoveryBase += _entriesLength;
770  }
771  }
772  }
773  }
774  return retVal;
775  }
776 
777  // Return the id of this Subscription
778  Message::Field id() const
779  {
780  return _id;
781  }
782 
783  struct Entry
784  {
785  Message::Field _val; //16
786  bool _active; //17
787  char _padding[32 - sizeof(Message::Field) - sizeof(bool)]; //32
788 
789  Entry() : _active(false)
790  {
791  ;
792  }
793  };
794 
795  struct EntryHash
796  {
797  Field::FieldHash _hasher;
798 
799  size_t operator()(const Entry* entryPtr_) const
800  {
801  return _hasher(entryPtr_->_val);
802  }
803 
804  bool operator()(const Entry* lhsPtr_, const Entry* rhsPtr_) const
805  {
806  return _hasher(lhsPtr_->_val, rhsPtr_->_val);
807  }
808  };
809 
810  //typedef std::set<Entry*, EntryHash> EntryPtrList;
811  typedef std::vector<Entry*> EntryPtrList;
812 
813  void getRecoveryEntries(EntryPtrList& list_)
814  {
815  if (_recoveryMin == AMPS_UNSET_INDEX ||
816  _recoveryMax == AMPS_UNSET_INDEX)
817  {
818  return;
819  }
820  size_t base = _recoveryBase;
821  size_t max = _recoveryMax + _recoveryMaxBase;
822  for (size_t i = _recoveryMin; i + base < max; ++i)
823  {
824  if (i == _entriesLength)
825  {
826  i = 0;
827  base = _recoveryMaxBase;
828  }
829  //list_.insert(&(_entries[i]));
830  list_.push_back(&(_entries[i]));
831  }
832  return;
833  }
834 
835  void getActiveEntries(EntryPtrList& list_)
836  {
837  size_t base = _leastBase;
838  for (size_t i = _least; i + base < _current + _currentBase; ++i)
839  {
840  if (i >= _entriesLength)
841  {
842  i = 0;
843  base = _currentBase;
844  }
845  //list_.insert(&(_entries[i]));
846  list_.push_back(&(_entries[i]));
847  }
848  return;
849  }
850 
851  Entry* getEntryByIndex(size_t index_)
852  {
853  Lock<Mutex> guard(_subLock);
854  size_t base = (_recoveryBase == AMPS_UNSET_INDEX ||
855  index_ >= _least + _leastBase)
856  ? _leastBase : _recoveryBase;
857  // Return NULL if not a valid index
858  size_t min = (_recoveryMin == AMPS_UNSET_INDEX ?
859  _least + _leastBase :
860  _recoveryMin + _recoveryBase);
861  if (index_ >= _current + _currentBase || index_ < min)
862  {
863  return NULL;
864  }
865  return &(_entries[(index_ - base) % _entriesLength]);
866  }
867 
868  void justRecovered()
869  {
870  Lock<Mutex> guard(_subLock);
871  _updateMostRecent();
872  EntryPtrList list;
873  getRecoveryEntries(list);
874  setPublishersToDiscarded(&list, &_publishers);
875  }
876 
877  void setPublishersToDiscarded(EntryPtrList* recovered_,
878  PublisherMap* publishers_)
879  {
880  // Need to reset publishers to only have up to the last
881  // discarded sequence number. Messages that were in transit
882  // during previous run but not discarded should be considered
883  // new and not duplicate after a restart/recovery.
884  for (EntryPtrList::iterator i = recovered_->begin();
885  i != recovered_->end(); ++i)
886  {
887  if ((*i)->_val.empty())
888  {
889  continue;
890  }
891  amps_uint64_t publisher = (amps_uint64_t)0;
892  amps_uint64_t sequence = (amps_uint64_t)0;
893  parseBookmark((*i)->_val, publisher, sequence);
894  if (publisher && sequence && (*i)->_active &&
895  (*publishers_)[publisher] >= sequence)
896  {
897  (*publishers_)[publisher] = sequence - 1;
898  }
899  }
900  }
901 
902  void clearLastPersisted()
903  {
904  Lock<Mutex> guard(_subLock);
905  _lastPersisted.clear();
906  }
907 
908  void setLastPersistedToEpoch()
909  {
910  Lock<Mutex> guard(_subLock);
911  _setLastPersistedToEpoch();
912  }
913 
914  private:
915  Subscription(const Subscription&);
916  Subscription& operator=(const Subscription&);
917 
918  size_t moveEntry(size_t index_)
919  {
920  // Check for wrap
921  if (_current >= _entriesLength)
922  {
923  _current = 0;
924  _currentBase += _entriesLength;
925  }
926  // Check for resize
927  // If list is too small, double it
928  if ((_current == _least % _entriesLength &&
929  _leastBase < _currentBase) ||
930  (_current == _recoveryMin && _recoveryBase < _currentBase))
931  {
932  if (!_store->resize(_id, (char**)&_entries,
933  sizeof(Entry) * _entriesLength * 2))
934  {
935  return AMPS_UNSET_INDEX;
936  }
937  // Length was doubled
938  _entriesLength *= 2;
939  }
940  _entries[_current]._val = _entries[index_]._val;
941  _entries[_current]._active = _entries[index_]._active;
942  // No need to clear Field, just set it to empty
943  _entries[index_]._val.assign(NULL, 0);
944  _entries[index_]._active = false;
945  return _current++;
946  }
947 
948  void _setLastPersistedToEpoch()
949  {
950  size_t fieldLen = strlen(AMPS_BOOKMARK_EPOCH);
951  char* field = new char[fieldLen];
952  memcpy(field, AMPS_BOOKMARK_EPOCH, fieldLen);
953  _lastPersisted.clear();
954  _lastPersisted.assign(field, fieldLen);
955  }
956 
957  bool _discard(size_t index_)
958  {
959  bool retVal = false;
960  // Lock should already be held
961  assert((_recoveryBase == AMPS_UNSET_INDEX && _recoveryMin == AMPS_UNSET_INDEX) ||
962  (_recoveryBase != AMPS_UNSET_INDEX && _recoveryMin != AMPS_UNSET_INDEX));
963  size_t base = (_recoveryBase == AMPS_UNSET_INDEX
964  || index_ >= _least + _leastBase)
965  ? _leastBase : _recoveryBase;
966  // discard of a record not in the log is a no-op
967  size_t min = (_recoveryMin == AMPS_UNSET_INDEX ? _least + _leastBase :
968  _recoveryMin + _recoveryBase);
969  if (index_ >= _current + _currentBase || index_ < min)
970  {
971  return retVal;
972  }
973 
974  // log that this one is discarded, then
975  // recalculate what the most recent entry is.
976  Entry& e = _entries[(index_ - base) % _entriesLength];
977  e._active = false;
978 
979  size_t index = index_;
980  if (_recoveryMin != AMPS_UNSET_INDEX &&
981  index_ == _recoveryMin + _recoveryBase)
982  {
983  // Find all to discard
984  size_t j = _recoveryMin;
985  while (j + _recoveryBase < _recoveryMax + _recoveryMaxBase &&
986  !_entries[j]._active)
987  {
988  // This index might be left-over from a slow discard and we
989  // may have reconnected. We have a few possibilities at this point.
990  // 1. If we re-logged this bookmark, this index will point at an
991  // empty bookmark. This could happen if the discard thread was slow
992  // and the reconnect was fast. We wouldn't report the
993  // the re-arrival of the bookmark as a duplicate because it
994  // hadn't been marked as discarded. In this case, we have to
995  // simply move past this in the recovery area.
996  // 2. This bookmark should become _recent because we haven't
997  // yet received anything since our last call to getMostRecent.
998  // In this case, we need to take it out of recovered but not
999  // clear it. The publishers map should report it as duplicate.
1000  // 3. This is the 'oldest' recovered, but we have received new
1001  // bookmarks since we got this one. We can clear it because the
1002  // publishers map should report it as a duplicate if/when it
1003  // does arrive again. Move the _recoveryMin ahead and remove it
1004  // from recovered.
1005  Message::Field& bookmark = _entries[j]._val;
1006  // Option 1 skips this and just moves on
1007  if (!bookmark.empty())
1008  {
1009  _recovered.erase(bookmark);
1010  // Make sure our publishers map will mark it discarded
1011  amps_uint64_t publisher, sequence;
1012  parseBookmark(bookmark, publisher, sequence);
1013  PublisherIterator pub = _publishers.find(publisher);
1014  if (pub == _publishers.end() || pub->second < sequence)
1015  {
1016  _publishers[publisher] = sequence;
1017  }
1018  if (_least + _leastBase == _current + _currentBase ||
1019  ((_least + _leastBase) % _entriesLength) ==
1020  ((_recoveryMin + _recoveryBase + 1)) % _entriesLength)
1021  {
1022  // Option 2, reset recent
1023  retVal = true;
1024  _store->_recentChanged = true;
1025  _recoveryTimestamp.clear();
1026  _recent.clear();
1027  _recent = bookmark;
1028  bookmark.assign(NULL, 0);
1029  }
1030  else
1031  {
1032  // Option 3, simply clear this one
1033  bookmark.clear();
1034  }
1035  }
1036  // If we reach the buffer end,
1037  // keep checking from the beginning
1038  if (++j == _entriesLength)
1039  {
1040  // Least has now loooped around
1041  _recoveryBase += _entriesLength;
1042  j = 0;
1043  }
1044  }
1045  assert(j + _recoveryBase != _recoveryMax + _recoveryMaxBase ||
1046  _recovered.empty());
1047  if (_recovered.empty())
1048  {
1049  _recoveryMin = AMPS_UNSET_INDEX;
1050  _recoveryBase = AMPS_UNSET_INDEX;
1051  _recoveryMax = AMPS_UNSET_INDEX;
1052  _recoveryMaxBase = AMPS_UNSET_INDEX;
1053  // Cleared recovered, want to check onward
1054  index = _least + _leastBase;
1055  }
1056  else
1057  {
1058  _recoveryMin = j;
1059  }
1060  }
1061  // if this is the first item in the list, discard all inactive ones
1062  // as long as recovery also says its okay
1063  if (index == _least + _leastBase)
1064  {
1065  // Find all to discard
1066  size_t j = _least;
1067  while (j + _leastBase < _current + _currentBase &&
1068  !_entries[j]._active)
1069  {
1070  //Must free associated memory
1071  _recent.clear();
1072  _recent = _entries[j]._val;
1073  _entries[j]._val.assign(NULL, 0);
1074  _store->_recentChanged = true;
1075  retVal = true;
1076  _recoveryTimestamp.clear();
1077  // If we reach the buffer end,
1078  // keep checking from the beginning
1079  if (++j == _entriesLength)
1080  {
1081  // Least has now loooped around
1082  _leastBase += _entriesLength;
1083  j = 0;
1084  }
1085  }
1086  _least = j;
1087  }
1088  return retVal;
1089  }
1090 
1091  void _updateMostRecent()
1092  {
1093  // Lock is already held
1094  _recovered.clear();
1095  assert((_recoveryBase == AMPS_UNSET_INDEX && _recoveryMin == AMPS_UNSET_INDEX) ||
1096  (_recoveryBase != AMPS_UNSET_INDEX && _recoveryMin != AMPS_UNSET_INDEX));
1097  size_t base = (_recoveryMin == AMPS_UNSET_INDEX) ? _leastBase : _recoveryBase;
1098  size_t start = (_recoveryMin == AMPS_UNSET_INDEX) ? _least : _recoveryMin;
1099  _recoveryMin = AMPS_UNSET_INDEX;
1100  _recoveryBase = AMPS_UNSET_INDEX;
1101  _recoveryMax = AMPS_UNSET_INDEX;
1102  _recoveryMaxBase = AMPS_UNSET_INDEX;
1103  for (size_t i = start; i + base < _current + _currentBase; i++)
1104  {
1105  if ( i >= _entriesLength )
1106  {
1107  i = 0;
1108  base = _currentBase;
1109  }
1110  if (i >= _recoveryMax + _recoveryBase && i < _least + _leastBase)
1111  {
1112  continue;
1113  }
1114  Entry& entry = _entries[i];
1115  if (!entry._val.empty())
1116  {
1117  _recovered[entry._val] = i + base;
1118  if (_recoveryMin == AMPS_UNSET_INDEX)
1119  {
1120  _recoveryMin = i;
1121  _recoveryBase = base;
1122  _recoveryMax = _current;
1123  _recoveryMaxBase = _currentBase;
1124  }
1125  }
1126  }
1127  if (_current == _entriesLength)
1128  {
1129  _current = 0;
1130  _currentBase += _entriesLength;
1131  }
1132  _least = _current;
1133  _leastBase = _currentBase;
1134  }
1135 
1136  Message::Field _id;
1137  Message::Field _recent;
1138  Message::Field _lastPersisted;
1139  Message::Field _recentList;
1140  BookmarkRange _range;
1141  Message::Field _recoveryTimestamp;
1142  size_t _current;
1143  size_t _currentBase;
1144  size_t _least;
1145  size_t _leastBase;
1146  size_t _recoveryMin;
1147  size_t _recoveryBase;
1148  size_t _recoveryMax;
1149  size_t _recoveryMaxBase;
1150  size_t _entriesLength;
1151  Entry* _entries;
1152  MemoryBookmarkStore* _store;
1153  Mutex _subLock;
1154  RecoveryMap _recovered;
1155  public:
1156  PublisherMap _publishers;
1157  };
1158 
1159  public:
1163  _subsLock(),
1164  _lock(),
1165  _serverVersion(AMPS_DEFAULT_MIN_VERSION),
1166  _recentChanged(true),
1167  _recovering(false),
1168  _recoveryPointAdapter(NULL),
1169  _recoveryPointFactory(NULL)
1170  { ; }
1171 
1172  typedef RecoveryPointAdapter::iterator RecoveryIterator;
1173 
1181  RecoveryPointFactory factory_ = NULL)
1182  : BookmarkStoreImpl()
1183  , _subsLock()
1184  , _lock()
1185  , _serverVersion(AMPS_DEFAULT_MIN_VERSION)
1186  , _recentChanged(true)
1187  , _recovering(true)
1188  , _recoveryPointAdapter(adapter_)
1189  , _recoveryPointFactory(factory_)
1190  {
1191  Message msg;
1192  for (RecoveryIterator recoveryPoint = _recoveryPointAdapter.begin();
1193  recoveryPoint != _recoveryPointAdapter.end();
1194  ++recoveryPoint)
1195  {
1196  Field subId(recoveryPoint->getSubId());
1197  msg.setSubscriptionHandle(static_cast<amps_subscription_handle>(0));
1198  msg.setSubId(subId);
1199  Field bookmark = recoveryPoint->getBookmark();
1200  if (BookmarkRange::isRange(bookmark))
1201  {
1202  msg.setBookmark(bookmark);
1203  _log(msg);
1204  }
1205  else
1206  {
1207  std::vector<Field> bmList = Field::parseBookmarkList(bookmark);
1208  for (std::vector<Field>::iterator bkmk = bmList.begin(); bkmk != bmList.end(); ++bkmk)
1209  {
1210  if (Field::isTimestamp(*bkmk))
1211  {
1212  find(subId)->setRecoveryTimestamp(bkmk->data(), bkmk->len());
1213  }
1214  else
1215  {
1216  msg.assignBookmark(bkmk->data(), bkmk->len());
1217  _isDiscarded(msg);
1218  _log(msg);
1219  _discard(msg);
1220  }
1221  }
1222  // Reset to original bookmark
1223  msg.setBookmark(bookmark);
1224  }
1225  }
1226  _recovering = false;
1227  }
1228 
1229  virtual ~MemoryBookmarkStore()
1230  {
1231  __purge();
1232  }
1233 
1239  virtual size_t log(Message& message_)
1240  {
1241  Lock<Mutex> guard(_lock);
1242  return _log(message_);
1243  }
1244 
1250  virtual void discard(const Message& message_)
1251  {
1252  Lock<Mutex> guard(_lock);
1253  (void)_discard(message_);
1254  }
1255 
1263  virtual void discard(const Message::Field& subId_, size_t bookmarkSeqNo_)
1264  {
1265  Lock<Mutex> guard(_lock);
1266  (void)_discard(subId_, bookmarkSeqNo_);
1267  }
1268 
1275  {
1276  Lock<Mutex> guard(_lock);
1277  return _getMostRecent(subId_);
1278  }
1279 
1288  virtual bool isDiscarded(Message& message_)
1289  {
1290  Lock<Mutex> guard(_lock);
1291  return _isDiscarded(message_);
1292  }
1293 
1299  virtual void purge()
1300  {
1301  Lock<Mutex> guard(_lock);
1302  _purge();
1303  }
1304 
1310  virtual void purge(const Message::Field& subId_)
1311  {
1312  Lock<Mutex> guard(_lock);
1313  _purge(subId_);
1314  }
1315 
1320  virtual size_t getOldestBookmarkSeq(const Message::Field& subId_)
1321  {
1322  Lock<Mutex> guard(_lock);
1323  return _getOldestBookmarkSeq(subId_);
1324  }
1325 
1331  virtual void persisted(const Message::Field& subId_,
1332  const Message::Field& bookmark_)
1333  {
1334  Lock<Mutex> guard(_lock);
1335  _persisted(find(subId_), bookmark_);
1336  }
1337 
1344  virtual Message::Field persisted(const Message::Field& subId_,
1345  size_t bookmark_)
1346  {
1347  Lock<Mutex> guard(_lock);
1348  return _persisted(find(subId_), bookmark_);
1349  }
1350 
1355  void setServerVersion(const VersionInfo& version_)
1356  {
1357  setServerVersion(version_.getOldStyleVersion());
1358  }
1359 
1364  void setServerVersion(size_t version_)
1365  {
1366  Lock<Mutex> guard(_subsLock);
1367  _serverVersion = version_;
1368  }
1369 
1370  inline bool isWritableBookmark(size_t length)
1371  {
1372  return length >= AMPS_MIN_BOOKMARK_LEN;
1373  }
1374 
1375  typedef Subscription::EntryPtrList EntryPtrList;
1376 
1377  protected:
1378 
1379  // Called once lock is acquired
1380  size_t _log(Message& message_)
1381  {
1382  Message::Field bookmark = message_.getBookmark();
1383  Subscription* pSub = (Subscription*)(message_.getSubscriptionHandle());
1384  if (!pSub)
1385  {
1386  Message::Field subId = message_.getSubscriptionId();
1387  if (subId.empty())
1388  {
1389  subId = message_.getSubscriptionIds();
1390  }
1391  pSub = find(subId);
1392  message_.setSubscriptionHandle(
1393  static_cast<amps_subscription_handle>(pSub));
1394  }
1395  size_t retVal = pSub->log(bookmark);
1396  message_.setBookmarkSeqNo(retVal);
1397  return retVal;
1398  }
1399 
1400  // Called once lock is acquired
1401  bool _discard(const Message& message_)
1402  {
1403  size_t bookmarkSeqNo = message_.getBookmarkSeqNo();
1404  Subscription* pSub = (Subscription*)(message_.getSubscriptionHandle());
1405  if (!pSub)
1406  {
1407  Message::Field subId = message_.getSubscriptionId();
1408  if (subId.empty())
1409  {
1410  subId = message_.getSubscriptionIds();
1411  }
1412  pSub = find(subId);
1413  }
1414  bool retVal = pSub->discard(bookmarkSeqNo);
1415  if (retVal)
1416  {
1417  updateAdapter(pSub);
1418  }
1419  return retVal;
1420  }
1421 
1422  // Called once lock is acquired
1423  bool _discard(const Message::Field& subId_, size_t bookmarkSeqNo_)
1424  {
1425  Subscription* pSub = find(subId_);
1426  bool retVal = pSub->discard(bookmarkSeqNo_);
1427  if (retVal)
1428  {
1429  updateAdapter(pSub);
1430  }
1431  return retVal;
1432  }
1433 
1434  // Called once lock is acquired
1435  Message::Field _getMostRecent(const Message::Field& subId_,
1436  bool usePublishersList_ = true)
1437  {
1438  Subscription* pSub = find(subId_);
1439  return pSub->getMostRecentList(usePublishersList_);
1440  }
1441 
1442  // Called once lock is acquired
1443  bool _isDiscarded(Message& message_)
1444  {
1445  Message::Field subId = message_.getSubscriptionId();
1446  if (subId.empty())
1447  {
1448  subId = message_.getSubscriptionIds();
1449  }
1450  Subscription* pSub = find(subId);
1451  message_.setSubscriptionHandle(
1452  static_cast<amps_subscription_handle>(pSub));
1453  return pSub->isDiscarded(message_.getBookmark());
1454  }
1455 
1456  // Called once lock is acquired
1457  size_t _getOldestBookmarkSeq(const Message::Field& subId_)
1458  {
1459  Subscription* pSub = find(subId_);
1460  return pSub->getOldestBookmarkSeq();
1461  }
1462 
1463  // Called once lock is acquired
1464  virtual void _persisted(Subscription* pSub_,
1465  const Message::Field& bookmark_)
1466  {
1467  if (pSub_->lastPersisted(bookmark_))
1468  {
1469  updateAdapter(pSub_);
1470  }
1471  }
1472 
1473  // Called once lock is acquired
1474  virtual Message::Field _persisted(Subscription* pSub_, size_t bookmark_)
1475  {
1476  return pSub_->lastPersisted(bookmark_);
1477  }
1478 
1479  // Called once lock is acquired
1480  void _purge()
1481  {
1482  if (_recoveryPointAdapter.isValid())
1483  {
1484  _recoveryPointAdapter.purge();
1485  }
1486  __purge();
1487  }
1488 
1489  // Called once lock is acquired
1490  void __purge()
1491  {
1492  // Walk through list and clear Fields before calling clear
1493  while (!_subs.empty())
1494  {
1495  SubscriptionMap::iterator iter = _subs.begin();
1496  //The subId key is cleared when deleting the Subscription, which shares
1497  //the _data pointer in its id field.
1498  const_cast<Message::Field&>(iter->first).clear();
1499  delete (iter->second);
1500  _subs.erase(iter);
1501  }
1502  _subs.clear();
1503  }
1504 
1505  // Called once lock is acquired
1506  virtual void _purge(const Message::Field& subId_)
1507  {
1508  if (_recoveryPointAdapter.isValid())
1509  {
1510  _recoveryPointAdapter.purge(subId_);
1511  }
1512  __purge(subId_);
1513  }
1514 
1515  // Called once lock is acquired
1516  virtual void __purge(const Message::Field& subId_)
1517  {
1518  Lock<Mutex> guard(_subsLock);
1519  SubscriptionMap::iterator iter = _subs.find(subId_);
1520  if (iter == _subs.end())
1521  {
1522  return;
1523  }
1524  const_cast<Message::Field&>(iter->first).clear();
1525  delete (iter->second);
1526  _subs.erase(iter);
1527  }
1528 
1529  // Can be used by subclasses during recovery
1530  void setMostRecent(const Message::Field& subId_,
1531  const Message::Field& recent_)
1532  {
1533  find(subId_)->setMostRecent(recent_);
1534  }
1535 
1536  Mutex _subsLock;
1537  Mutex _lock;
1538  static const char ENTRY_BOOKMARK = 'b';
1539  static const char ENTRY_DISCARD = 'd';
1540  static const char ENTRY_PERSISTED = 'p';
1541 
1542  virtual Subscription* find(const Message::Field& subId_)
1543  {
1544  if (subId_.empty())
1545  {
1546  throw StoreException("A valid subscription ID must be provided to the Bookmark Store");
1547  }
1548  Lock<Mutex> guard(_subsLock);
1549  if (_subs.count(subId_) == 0)
1550  {
1551  // Subscription will be created
1552  Message::Field id;
1553  id.deepCopy(subId_);
1554  _subs[id] = new Subscription(this, id);
1555  return _subs[id];
1556  }
1557  return _subs[subId_];
1558  }
1559 
1560  virtual bool resize(const Message::Field& subId_, char** newBuffer_, size_t size_,
1561  bool callResizeHandler_ = true)
1562  {
1563  assert(newBuffer_ != 0);
1564  if (size_ == 0) // Delete the buffer
1565  {
1566  if (*newBuffer_)
1567  {
1568  free(*newBuffer_);
1569  *newBuffer_ = NULL;
1570  }
1571  return true;
1572  }
1573  if (callResizeHandler_ && !callResizeHandler(subId_, size_))
1574  {
1575  return false;
1576  }
1577  char* oldBuffer = *newBuffer_ ? *newBuffer_ : NULL;
1578  *newBuffer_ = (char*)malloc(size_);
1579  memset(*newBuffer_, 0, size_);
1580  if (oldBuffer)
1581  {
1582  find(subId_)->moveEntries(oldBuffer, *newBuffer_, size_);
1583  free(oldBuffer);
1584  }
1585  return true;
1586  }
1587 
1588  protected:
1589  void updateAdapter(Subscription* pSub_)
1590  {
1591  if (_recovering || !_recentChanged || !_recoveryPointAdapter.isValid())
1592  {
1593  return;
1594  }
1595  if (_recoveryPointFactory)
1596  {
1597  RecoveryPoint update(_recoveryPointFactory(pSub_->id(),
1598  pSub_->getMostRecentList(false)));
1599  _recoveryPointAdapter.update(update);
1600  }
1601  else
1602  {
1603  RecoveryPoint update(new FixedRecoveryPoint(pSub_->id(),
1604  pSub_->getMostRecentList(false)));
1605  _recoveryPointAdapter.update(update);
1606  }
1607  }
1608 
1609  typedef std::map<Message::Field, Subscription*, Message::Field::FieldHash> SubscriptionMap;
1610  SubscriptionMap _subs;
1611  size_t _serverVersion;
1612  bool _recentChanged;
1613  bool _recovering;
1614  typedef std::set<Subscription*> SubscriptionSet;
1615  RecoveryPointAdapter _recoveryPointAdapter;
1616  RecoveryPointFactory _recoveryPointFactory;
1617  };
1618 
1619 } // end namespace AMPS
1620 
1621 #endif //_MEMORYBOOKMARKSTORE_H_
1622 
Defines the AMPS::Message class and related classes.
Abstract base class for storing received bookmarks for HA clients.
Definition: BookmarkStore.hpp:77
Field getSubscriptionId() const
Retrieves the value of the SubscriptionId header of the Message as a new Field.
Definition: Message.hpp:1427
virtual void purge()
Called to purge the contents of this store.
Definition: MemoryBookmarkStore.hpp:1299
virtual void discard(const Message::Field &subId_, size_t bookmarkSeqNo_)
Log a discard-bookmark entry to the persistent log based on a bookmark sequence number.
Definition: MemoryBookmarkStore.hpp:1263
virtual Message::Field getMostRecent(const Message::Field &subId_)
Returns the most recent bookmark from the log that ought to be used for (re-)subscriptions.
Definition: MemoryBookmarkStore.hpp:1274
virtual Message::Field persisted(const Message::Field &subId_, size_t bookmark_)
Mark the bookmark provided as replicated to all sync replication destinations for the given subscript...
Definition: MemoryBookmarkStore.hpp:1344
Message encapsulates a single message sent to or received from an AMPS server, and provides methods f...
Definition: Message.hpp:531
void clear()
Deletes the data associated with this Field, should only be used on Fields that were created as deepC...
Definition: Field.hpp:247
MemoryBookmarkStore(const RecoveryPointAdapter &adapter_, RecoveryPointFactory factory_=NULL)
Creates a MemoryBookmarkStore.
Definition: MemoryBookmarkStore.hpp:1180
RecoveryPointAdapter a handle class for implementing external storage of subscription recovery points...
Definition: RecoveryPointAdapter.hpp:77
MemoryBookmarkStore()
Creates a MemoryBookmarkStore.
Definition: MemoryBookmarkStore.hpp:1162
Provides access to the subId and bookmark needed to restart a subscription.
Definition: RecoveryPoint.hpp:67
virtual void persisted(const Message::Field &subId_, const Message::Field &bookmark_)
Mark the bookmark provided as replicated to all sync replication destinations for the given subscript...
Definition: MemoryBookmarkStore.hpp:1331
Message & assignBookmark(const std::string &v)
Sets the value of the Bookmark header for this Message.
Definition: Message.hpp:1194
void setServerVersion(size_t version_)
Internally used to set the server version so the store knows how to deal with persisted acks and call...
Definition: MemoryBookmarkStore.hpp:1364
Field getSubscriptionIds() const
Retrieves the value of the SubscriptionIds header of the Message as a new Field.
Definition: Message.hpp:1428
bool empty() const
Returns &#39;true&#39; if empty, &#39;false&#39; otherwise.
Definition: Field.hpp:128
Defines the AMPS::Field class, which represents the value of a field in a message.
#define AMPS_BOOKMARK_EPOCH
Start the subscription at the beginning of the journal.
Definition: BookmarkStore.hpp:51
virtual bool isDiscarded(Message &message_)
Called for each arriving message to determine if the application has already seen this bookmark and s...
Definition: MemoryBookmarkStore.hpp:1288
Provides AMPS::RecoveryPointAdapter, an iterface for implementing external storage of bookmark subscr...
void setServerVersion(const VersionInfo &version_)
Internally used to set the server version so the store knows how to deal with persisted acks and call...
Definition: MemoryBookmarkStore.hpp:1355
virtual void discard(const Message &message_)
Log a discard-bookmark entry to the persistent log based on a bookmark sequence number.
Definition: MemoryBookmarkStore.hpp:1250
#define AMPS_BOOKMARK_NOW
Start the subscription at the point in time when AMPS processes the subscription. ...
Definition: BookmarkStore.hpp:55
Message & setSubId(const std::string &v)
Sets the value of the SubscriptionId header for this Message.
Definition: Message.hpp:1427
RecoveryPoint(* RecoveryPointFactory)(const Field &subId_, const Field &bookmark_)
RecoveryPointFactory is a function type for producing a RecoveryPoint that is sent to a RecoveryPoint...
Definition: RecoveryPoint.hpp:126
Provides AMPS::RecoveryPoint, AMPS::RecoveryPointFactory, AMPS::FixedRecoveryPoint, and AMPS::DynamicRecoveryPoint.
A BookmarkStoreImpl implementation that stores bookmarks in memory.
Definition: MemoryBookmarkStore.hpp:56
virtual size_t log(Message &message_)
Log a bookmark to the persistent log and return the corresponding sequence number for this bookmark...
Definition: MemoryBookmarkStore.hpp:1239
Field represents the value of a single field in a Message.
Definition: Field.hpp:86
FixedRecoveryPoint is a RecoveryPoint implementation where subId and bookmark are set explicitly...
Definition: RecoveryPoint.hpp:133
virtual size_t getOldestBookmarkSeq(const Message::Field &subId_)
Called to find the oldest bookmark in the store.
Definition: MemoryBookmarkStore.hpp:1320
virtual void purge(const Message::Field &subId_)
Called to purge the contents of this store for particular subId.
Definition: MemoryBookmarkStore.hpp:1310
Message & setBookmark(const std::string &v)
Sets the value of the Bookmark header for this Message.
Definition: Message.hpp:1194
void deepCopy(const Field &orig_)
Makes self a deep copy of the original field.
Definition: Field.hpp:218
Definition: ampsplusplus.hpp:106
Field getBookmark() const
Retrieves the value of the Bookmark header of the Message as a new Field.
Definition: Message.hpp:1194