AMPS C/C++ Client Class Reference
AMPS C/C++ Client Version 5.3.5.1
MemoryBookmarkStore.hpp
Go to the documentation of this file.
1 //
3 // Copyright (c) 2010-2025 60East Technologies Inc., All Rights Reserved.
4 //
5 // This computer software is owned by 60East Technologies Inc. and is
6 // protected by U.S. copyright laws and other laws and by international
7 // treaties. This computer software is furnished by 60East Technologies
8 // Inc. pursuant to a written license agreement and may be used, copied,
9 // transmitted, and stored only in accordance with the terms of such
10 // license agreement and with the inclusion of the above copyright notice.
11 // This computer software or any other copies thereof may not be provided
12 // or otherwise made available to any other person.
13 //
14 // U.S. Government Restricted Rights. This computer software: (a) was
15 // developed at private expense and is in all respects the proprietary
16 // information of 60East Technologies Inc.; (b) was not developed with
17 // government funds; (c) is a trade secret of 60East Technologies Inc.
18 // for all purposes of the Freedom of Information Act; and (d) is a
19 // commercial item and thus, pursuant to Section 12.212 of the Federal
20 // Acquisition Regulations (FAR) and DFAR Supplement Section 227.7202,
21 // Government's use, duplication or disclosure of the computer software
22 // is subject to the restrictions set forth by 60East Technologies Inc..
23 //
25 
26 #ifndef _MEMORYBOOKMARKSTORE_H_
27 #define _MEMORYBOOKMARKSTORE_H_
28 
29 #include <amps/BookmarkStore.hpp>
30 #include <amps/Field.hpp>
31 #include <amps/Message.hpp>
32 #include <amps/RecoveryPoint.hpp>
34 #include <atomic>
35 #include <functional>
36 #include <map>
37 #include <sstream>
38 #include <vector>
39 #include <stdlib.h>
40 #include <assert.h>
41 
42 #define AMPS_MIN_BOOKMARK_LEN 3
43 #define AMPS_INITIAL_MEMORY_BOOKMARK_SIZE 16384UL
44 
49 
50 namespace AMPS
51 {
52 
59  {
60  protected:
61  class Subscription
62  {
63  public:
// Bookmark -> index map; _updateMostRecent() stores "slot + base" values in it.
// NOTE(review): FieldHash is passed as std::map's third (ordering) template
// parameter - presumably it provides a strict weak ordering; confirm in Field.hpp.
64  typedef std::map<Message::Field, size_t, Message::Field::FieldHash> RecoveryMap;
// Publisher id -> sequence number, as parsed out of bookmarks by parseBookmark().
65  typedef std::map<amps_uint64_t, amps_uint64_t> PublisherMap;
// Iterator aliases for the two map types above.
66  typedef std::map<Message::Field, size_t, Message::Field::FieldHash>::iterator
67  RecoveryIterator;
68  typedef std::map<amps_uint64_t, amps_uint64_t>::iterator PublisherIterator;
69 
70  // Start the sequence at 1 so that file-backed subclasses can use 0 during
71  // recovery as an indicator that a message wasn't logged because
72  // isDiscarded() was true.
73  Subscription(MemoryBookmarkStore* store_, const Message::Field& id_)
74  : _current(1), _currentBase(0), _least(1), _leastBase(0)
75  , _recoveryMin(AMPS_UNSET_INDEX), _recoveryBase(AMPS_UNSET_INDEX)
76  , _recoveryMax(AMPS_UNSET_INDEX), _recoveryMaxBase(AMPS_UNSET_INDEX)
77  , _entriesLength(AMPS_INITIAL_MEMORY_BOOKMARK_SIZE), _entries(NULL)
78  , _store(store_)
79  {
80  // Need our own memory for the sub id
81  _id.deepCopy(id_);
82  _store->resize(_id, (char**)&_entries,
83  sizeof(Entry)*AMPS_INITIAL_MEMORY_BOOKMARK_SIZE, false);
84  setLastPersistedToEpoch();
85  }
86 
87  ~Subscription()
88  {
89  Lock<Mutex> guard(_subLock);
90  if (_entries)
91  {
92  for (size_t i = 0; i < _entriesLength; ++i)
93  {
94  _entries[i]._val.clear();
95  }
96  // resize to 0 will free _entries
97  _store->resize(_id, (char**)&_entries, 0);
98  }
99  _id.clear();
100  _recent.clear();
101  _lastPersisted.clear();
102  _recentList.clear();
103  _range.clear();
104  _recoveryTimestamp.clear();
105  }
106 
// Records bookmark_ in this subscription's entry buffer.
// Returns the entry's index plus the current base, or 0 when nothing is
// logged (NOW, a bookmark range, or a bookmark that fails validation).
// Throws CommandException when bookmark_ looks like a range but is not valid.
107  size_t log(const Message::Field& bookmark_)
108  {
109  if (bookmark_ == AMPS_BOOKMARK_NOW)
110  {
111  return 0;
112  }
113  Lock<Mutex> guard(_subLock);
114  // Either relog the recovery or log it
115  size_t index = recover(bookmark_, true);
116  if (index == AMPS_UNSET_INDEX)
117  {
118  // Check for wrap
119  if (_current >= _entriesLength)
120  {
121  _current = 0;
122  _currentBase += _entriesLength;
123  }
124  // Check for resize
125  // If list is too small, double it
// The write position has caught up with the oldest live or recovery entry.
126  if ((_current == _least && _leastBase < _currentBase) ||
127  (_current == _recoveryMin && _recoveryBase < _currentBase))
128  {
129  if (!_store->resize(_id, (char**)&_entries,
130  sizeof(Entry) * _entriesLength * 2))
131  {
132  //Try again
// NOTE(review): this recursion has no visible bound if resize() keeps
// returning false - confirm the store's resize contract.
133  return log(bookmark_);
134  }
135  // Length was doubled
136  _entriesLength *= 2;
137  }
138 
139  // Add this entry to the end of our list
// The block below is disabled code for logging a NOW timestamp; NOW is
// rejected at the top of this function instead.
140  /*
141  if (bookmark_ == AMPS_BOOKMARK_NOW)
142  {
143  // Save a now timestamp bookmark
144  char* nowTimestamp = (char*)malloc(AMPS_TIMESTAMP_LEN);
145  struct tm timeInfo;
146  time_t now;
147  time(&now);
148  #ifdef _WIN32
149  gmtime_s(&timeInfo, &now);
150  #else
151  gmtime_r(&now, &timeInfo);
152  #endif
153  strftime(nowTimestamp, AMPS_TIMESTAMP_LEN,
154  "%Y%m%dT%H%M%S", &timeInfo);
155  nowTimestamp[AMPS_TIMESTAMP_LEN-1] = 'Z';
156  _entries[_current]._val.assign(nowTimestamp,
157  AMPS_TIMESTAMP_LEN);
158  _entries[_current]._active = false;
159  index = _current++;
160  return index + _currentBase;
161  }
162  else
163  */
164  {
165  // Is this an attempt at a range?
166  bool isRange = BookmarkRange::isRange(bookmark_);
167  bool startExclusive = false;
168  if (isRange)
169  {
170  _range.set(bookmark_);
171  // Stricter check on range syntax
172  if (!_range.isValid())
173  {
174  _range.clear();
175  throw CommandException("Invalid bookmark range specified.");
176  }
177  startExclusive = !_range.isStartInclusive();
178  }
// Validation runs for every range, and for a plain bookmark only while
// the publishers map is still empty.
179  if (isRange || _publishers.empty())
180  {
181  // Check validity and init publishers map for a range
182  Message::Field parseable = isRange ? _range.getStart() : bookmark_;
183  amps_uint64_t publisher, sequence;
184  std::vector<Field> bmList = Field::parseBookmarkList(parseable);
185  if (bmList.empty() && !Field::isTimestamp(parseable))
186  {
187  if (isRange)
188  {
189  _range.clear();
190  }
191  return 0;
192  }
193  for (std::vector<Field>::iterator bkmk = bmList.begin(); bkmk != bmList.end(); ++bkmk)
194  {
195  parseBookmark(*bkmk, publisher, sequence);
196  if (publisher != (amps_uint64_t)0)
197  {
// Only an exclusive range start seeds the publishers map here.
198  if (isRange && startExclusive)
199  {
200  // Compare it to our publishers map
201  PublisherIterator pub = _publishers.find(publisher);
202  if (pub == _publishers.end() || pub->second < sequence)
203  {
204  _publishers[publisher] = sequence;
205  }
206  }
207  }
208  else if (!Field::isTimestamp(*bkmk))
209  {
210  // This is an invalid bookmark, so don't save anything
211  if (isRange)
212  {
213  _range.clear();
214  if (startExclusive)
215  {
216  _publishers.clear();
217  }
218  }
219  return 0;
220  }
221  }
222  }
223  if (!isRange)
224  {
225  _entries[_current]._val.deepCopy(bookmark_);
226  }
227  else
228  {
// The Unlock object is scoped to this branch, around the adapter call.
229  Unlock<Mutex> unlock(_subLock);
230  _store->updateAdapter(this);
231  // Don't actually log a range
232  return 0;
233  }
234  }
235  _entries[_current]._active = true;
236  index = _current++;
237  }
238  return index + _currentBase;
239  }
240 
241  bool discard(size_t index_)
242  {
243  Lock<Mutex> guard(_subLock);
244  return _discard(index_);
245  }
246 
247  bool discard(const Message::Field& bookmark_)
248  {
249  // These are discarded when logged or not logged
250  if (bookmark_ == AMPS_BOOKMARK_NOW)
251  {
252  return false;
253  }
254  Lock<Mutex> guard(_subLock);
255  size_t search = _least;
256  size_t searchBase = _leastBase;
257  size_t searchMax = _current;
258  size_t searchMaxBase = _currentBase;
259  if (_least + _leastBase == _current + _currentBase)
260  {
261  if (_recoveryMin != AMPS_UNSET_INDEX)
262  {
263  search = _recoveryMin;
264  searchBase = _recoveryBase;
265  searchMax = _recoveryMax;
266  searchMaxBase = _recoveryMaxBase;
267  }
268  else // Store is empty, so nothing to do
269  {
270  return false;
271  }
272  }
273  assert(searchMax != AMPS_UNSET_INDEX);
274  assert(searchMaxBase != AMPS_UNSET_INDEX);
275  assert(search != AMPS_UNSET_INDEX);
276  assert(searchBase != AMPS_UNSET_INDEX);
277  // Search while we don't find the provided bookmark and we're in valid range
278  while (search + searchBase < searchMax + searchMaxBase)
279  {
280  if (_entries[search]._val == bookmark_)
281  {
282  return _discard(search + searchBase);
283  }
284  if (++search == _entriesLength)
285  {
286  // Least has now loooped around
287  searchBase += _entriesLength;
288  search = 0;
289  }
290  }
291  return false;
292  }
293 
294  // Get sequence number from a Field that is a bookmark
295  static void parseBookmark(const Message::Field& field_,
296  amps_uint64_t& publisherId_,
297  amps_uint64_t& sequenceNumber_)
298  {
299  Message::Field::parseBookmark(field_, publisherId_, sequenceNumber_);
300  }
301 
302  // Check to see if this message is older than the most recent one seen,
303  // and if it is, check whether it has been discarded.
// Reports whether bookmark_ should be treated as already discarded.
// Side effects: may raise the publisher's entry in _publishers, and
// recover() may move or erase recovery entries.
// Returns false for ranges and for bookmarks newer than anything seen from
// their publisher; true for unparseable non-timestamp bookmarks and for
// bookmarks not found in the live range.
304  bool isDiscarded(const Message::Field& bookmark_)
305  {
306  Lock<Mutex> guard(_subLock);
307  if (BookmarkRange::isRange(bookmark_))
308  {
309  return false;
310  }
311 
312  amps_uint64_t publisher, sequence;
313  parseBookmark(bookmark_, publisher, sequence);
314  // Bookmarks like EPOCH, NOW, or invalid bookmarks we ignore
315  // A timestamp could be logged as the first bookmark when starting
316  // a new subscription.
317  if (publisher == 0 && !Field::isTimestamp(bookmark_))
318  {
319  return true;
320  }
321  // Check if we've already recovered this bookmark
322  size_t recoveredIdx = recover(bookmark_, false);
323  // Compare it to our publishers map
324  PublisherIterator pub = _publishers.find(publisher);
325  if (pub == _publishers.end() || pub->second < sequence)
326  {
// Newest sequence seen from this publisher: record it.
327  _publishers[publisher] = sequence;
328  if (recoveredIdx == AMPS_UNSET_INDEX)
329  {
330  return false;
331  }
332  }
333  if (recoveredIdx != AMPS_UNSET_INDEX)
334  {
// A recovered entry counts as discarded only when it is inactive.
335  if (!_entries[recoveredIdx]._active)
336  {
337  _recovered.erase(bookmark_);
338  return true;
339  }
340  return false;
341  }
342  // During recovery, we don't really care if it's been discarded
343  // or not. We just want _publishers updated. No need for the
344  // costly linear search.
345  if (_store->_recovering)
346  {
347  return false;
348  }
349  // During failure and recovery scenarios, we'll see out of order
350  // bookmarks arrive, either because (a) we're replaying or (b)
351  // a publisher has cut over, and we've cut over to a new server.
352  // Scan the list to see if we have a match.
353  size_t base = _leastBase;
354  for (size_t i = _least; i + base < _current + _currentBase; i++)
355  {
// Wrap to the start of the buffer and continue with the current base.
356  if ( i >= _entriesLength )
357  {
358  i = 0;
359  base = _currentBase;
360  }
361  if (_entries[i]._val == bookmark_)
362  {
363  return !_entries[i]._active;
364  }
365  }
366 
367  return true; // message is totally discarded
368  }
369 
370  bool empty(void) const
371  {
372  if (_least == AMPS_UNSET_INDEX ||
373  ((_least + _leastBase) == (_current + _currentBase) &&
374  _recoveryMin == AMPS_UNSET_INDEX))
375  {
376  return true;
377  }
378  return false;
379  }
380 
381  void updateMostRecent()
382  {
383  Lock<Mutex> guard(_subLock);
384  _updateMostRecent();
385  }
386 
387  const BookmarkRange& getRange() const
388  {
389  return _range;
390  }
391 
// Builds the bookmark to resume from: a comma-separated list made from
// _recent, _lastPersisted and _recoveryTimestamp, or - when
// usePublishersList_ is true and neither of the first two is usable - a list
// built from the publishers map. When _range is valid, its start may be
// replaced and a copy of the range is returned instead of the list.
// Every return path hands back a deepCopy().
392  Message::Field getMostRecentList(bool usePublishersList_ = true)
393  {
394  Lock<Mutex> guard(_subLock);
// "len() > 1" filters out single-character values.
395  bool useLastPersisted = !_lastPersisted.empty() &&
396  _lastPersisted.len() > 1;
397  // when this is called, we'll take a moment to update the list
398  // of things recovered,
399  // so we don't accidentally log anything we ought not to.
400  _updateMostRecent();
401  bool useRecent = !_recent.empty() && _recent.len() > 1;
402  amps_uint64_t lastPublisher = 0;
403  amps_uint64_t lastSeq = 0;
404  amps_uint64_t recentPublisher = 0;
405  amps_uint64_t recentSeq = 0;
406  if (useLastPersisted)
407  {
408  parseBookmark(_lastPersisted, lastPublisher, lastSeq);
409  }
410  if (useRecent)
411  {
412  parseBookmark(_recent, recentPublisher, recentSeq);
413  if (empty() && useLastPersisted)
414  {
415  useRecent = false;
416  }
417  else
418  {
// Same publisher in both: only one of the two bookmarks is kept.
419  if (useLastPersisted && lastPublisher == recentPublisher)
420  {
421  if (lastSeq <= recentSeq)
422  {
423  useRecent = false;
424  }
425  else
426  {
427  useLastPersisted = false;
428  }
429  }
430  }
431  }
432  // Set size for all bookmarks that will be used
// Each part is counted with one extra byte for its trailing comma.
433  size_t totalLen = (useLastPersisted ? _lastPersisted.len() + 1 : 0);
434  if (useRecent)
435  {
436  totalLen += _recent.len() + 1;
437  }
438  // If we don't have a non-EPOCH persisted ack and we don't have a
439  // non-EPOCH most recent bookmark, OR we have a range
440  // we can build a list based on all the publishers instead.
441  if (usePublishersList_
442  && ((!useLastPersisted && !useRecent)
443  || _lastPersisted == AMPS_BOOKMARK_EPOCH))
444  {
445  std::ostringstream os;
446  for (PublisherIterator pub = _publishers.begin();
447  pub != _publishers.end(); ++pub)
448  {
449  if (pub->first == 0 && pub->second == 0)
450  {
451  continue;
452  }
// Use the lower of the recent sequence and the map's sequence.
453  if (pub->first == recentPublisher && recentSeq < pub->second)
454  {
455  os << recentPublisher << '|' << recentSeq << "|,";
456  }
457  else
458  {
459  os << pub->first << '|' << pub->second << "|,";
460  }
461  }
462  std::string recent = os.str();
463  if (!recent.empty())
464  {
465  totalLen = recent.length();
466  if (!_recoveryTimestamp.empty())
467  {
468  totalLen += _recoveryTimestamp.len();
469  recent += std::string(_recoveryTimestamp);
470  }
471  else
472  {
473  // Remove trailing ,
474  recent.erase(--totalLen);
475  }
476  // Reset _recentList to new value and return it
477  _recentList.clear();
478  _recentList = Message::Field(recent).deepCopy();
479  if (_range.isValid())
480  {
481  if (_range.getStart() != recent
482  && _recentList != AMPS_BOOKMARK_EPOCH)
483  {
484  _range.replaceStart(_recentList, true);
485  }
486  else if (_range.isStartInclusive())
487  {
488  amps_uint64_t publisher, sequence;
489  parseBookmark(_range.getStart(), publisher,
490  sequence);
491  PublisherIterator pub = _publishers.find(publisher);
492  if (pub != _publishers.end()
493  && pub->second >= sequence)
494  {
495  _range.makeStartExclusive();
496  }
497  }
498  return _range.deepCopy();
499  }
500  return _recentList.deepCopy();
501  }
502  if (_range.isValid())
503  {
504  return _range.deepCopy();
505  }
506  }
507  if (!_recoveryTimestamp.empty() && !_range.isValid())
508  {
509  totalLen += _recoveryTimestamp.len() + 1;
510  }
511  // If we have nothing discarded, return EPOCH
512  if (totalLen == 0
513  || (_recent.len() < 2 && !empty()))
514  {
515  if (_range.isValid())
516  {
517  return _range.deepCopy();
518  }
// NOTE(review): the body of this 'if' is empty in this listing (its line
// 521 is not shown) - confirm against the original file.
519  if (!useRecent)
520  {
522  }
523  _setLastPersistedToEpoch();
524  return _lastPersisted.deepCopy();
525  }
526  // Remove the trailing , from the length
527  totalLen -= 1;
// This buffer is handed to _recentList.assign() below.
528  char* field = (char*)malloc(totalLen);
529  size_t len = 0;
530  if (useRecent)
531  {
532  len = _recent.len();
533  memcpy(field, _recent.data(), len);
534  if (len < totalLen)
535  {
536  field[len++] = ',';
537  }
538  }
539  if (useLastPersisted)
540  {
541  memcpy(field + len, _lastPersisted.data(), _lastPersisted.len());
542  len += _lastPersisted.len();
543  if (len < totalLen)
544  {
545  field[len++] = ',';
546  }
547  }
548  if (!_recoveryTimestamp.empty() && !_range.isValid())
549  {
550  memcpy(field + len, _recoveryTimestamp.data(),
551  _recoveryTimestamp.len());
552  // If more is to be written after this, uncomment the following
553  //len += _lastPersisted.len();
554  //if (len < totalLen) field[len++] = ',';
555  }
556  // _recentList clear will delete[] current buffer and assign will get cleared
557  _recentList.clear();
558  _recentList.assign(field, totalLen);
559  if (_range.isValid())
560  {
561  if (_recentList != AMPS_BOOKMARK_EPOCH)
562  {
563  if (_range.getStart() != _recentList)
564  {
565  _range.replaceStart(_recentList, true);
566  }
567  else if (_range.isStartInclusive())
568  {
569  amps_uint64_t publisher, sequence;
570  parseBookmark(_range.getStart(), publisher,
571  sequence);
572  PublisherIterator pub = _publishers.find(publisher);
573  if (pub != _publishers.end()
574  && pub->second >= sequence)
575  {
576  _range.makeStartExclusive();
577  }
578  }
579  }
580  return _range.deepCopy();
581  }
582  return _recentList.deepCopy();
583  }
584 
// Returns _recent, the most recent bookmark held for this subscription.
// When update_ is true and the store's _recentChanged flag is set,
// _updateMostRecent() runs first.
585  Message::Field getMostRecent(bool update_ = false)
586  {
587  Lock<Mutex> guard(_subLock);
588  // Return the same as last time if nothing's changed
589  // _recent is the most recent bookmark.
590  if (update_ && _store->_recentChanged)
591  {
592  _updateMostRecent();
593  }
594  if (_recent.empty())
595  {
// NOTE(review): this branch is empty in this listing (its line 596 is not
// shown); presumably it returns a fallback bookmark - confirm against the
// original file.
597  }
598  else
599  {
600  return _recent;
601  }
602  }
603 
604  Message::Field getLastPersisted()
605  {
606  Lock<Mutex> guard(_subLock);
607  return _lastPersisted;
608  }
609 
610  void setMostRecent(const Message::Field& recent_)
611  {
612  _recent.clear();
613  _recent.deepCopy(recent_);
614  }
615 
616  void setRecoveryTimestamp(const char* recoveryTimestamp_,
617  size_t len_ = 0)
618  {
619  _recoveryTimestamp.clear();
620  size_t len = (len_ == 0) ? AMPS_TIMESTAMP_LEN : len_;
621  char* ts = (char*)malloc(len);
622  memcpy((void*)ts, (const void*)recoveryTimestamp_, len);
623  _recoveryTimestamp.assign(ts, len);
624  }
625 
// Re-lays the entries out after the entry buffer has grown, then updates
// the least/recovery/current indexes to match.
//   old_     : the previous buffer.
//   new_     : the new buffer (may equal old_ when grown in place).
//   newSize_ : size of the new buffer in bytes.
// _entriesLength is read as the OLD entry count throughout; it is not
// modified here.
626  void moveEntries(char* old_, char* new_, size_t newSize_)
627  {
// Oldest slot to preserve: the recovery minimum when recovery entries exist.
628  size_t least = _least;
629  size_t leastBase = _leastBase;
630  if (_recoveryMin != AMPS_UNSET_INDEX)
631  {
632  least = _recoveryMin;
633  leastBase = _recoveryBase;
634  }
635  // First check if we grew in place, if so, just move current after least
636  if (old_ == new_)
637  {
// Enough added space to append the first 'least' entries after the old end.
638  if (newSize_ - (sizeof(Entry)*_entriesLength) > sizeof(Entry)*least)
639  {
640  memcpy(new_ + (sizeof(Entry)*_entriesLength),
641  old_, (sizeof(Entry)*least));
642  // Clear the beginning where those entries were
643  memset(old_, 0, sizeof(Entry)*least);
644  }
645  else // We have to use an intermediate buffer
646  {
647  Entry* buffer = new Entry[least];
648  memcpy((void*)buffer, (void*)old_, sizeof(Entry)*least);
649  //Put the beginning entries at the start of the new buffer
// NOTE(review): old_ == new_ in this branch, so source and destination of
// this memcpy can overlap - confirm memmove is not required.
650  memcpy((void*)new_, (void*)((char*)old_ + (sizeof(Entry)*least)),
651  (_entriesLength - least)*sizeof(Entry));
652  //Put the end entries after the beginning entries
653  memcpy((void*)((char*)new_ + ((_entriesLength - least)*sizeof(Entry))),
654  (void*)buffer, least * sizeof(Entry));
655  // Least is now at 0 so base must be increased
656  leastBase += least;
657  least = 0;
658  delete [] buffer;
659  }
660  }
661  else
662  {
663  //Put the beginning entries at the start of the new buffer
664  memcpy((void*)new_, (void*)((char*)old_ + (sizeof(Entry)*least)),
665  (_entriesLength - least)*sizeof(Entry));
666  //Put the end entries after the beginning entries
667  memcpy((void*)((char*)new_ + ((_entriesLength - least)*sizeof(Entry))),
668  (void*)old_, least * sizeof(Entry));
669  // Least is now at 0 so base must be increased
670  leastBase += least;
671  least = 0;
672  }
673  if (_recoveryMin != AMPS_UNSET_INDEX)
674  {
// Re-express _least and _recoveryMax as offsets from the new recovery minimum.
675  _least = least + (_least + _leastBase) - (_recoveryMin + _recoveryBase);
676  _recoveryMax = least + (_recoveryMax + _recoveryMaxBase) -
677  (_recoveryMin + _recoveryBase);
678  _recoveryMaxBase = leastBase;
679  _recoveryMin = least;
680  _recoveryBase = leastBase;
681  }
682  else
683  {
684  _least = least;
685  }
686  _leastBase = leastBase;
687  // Current is now after everything and using the same base
688  _currentBase = _leastBase;
689  _current = least + _entriesLength;
690  }
691 
692  inline size_t getOldestBookmarkSeq()
693  {
694  Lock<Mutex> guard(_subLock);
695  // If there is nothing in the store, return -1, otherwise return lowest
696  return ((_least + _leastBase) == (_current + _currentBase)) ? AMPS_UNSET_INDEX :
697  _least + _leastBase;
698  }
699 
700  bool lastPersisted(const Message::Field& bookmark_)
701  {
702  // These shouldn't be persisted
703  if (bookmark_ == AMPS_BOOKMARK_NOW
704  || BookmarkRange::isRange(bookmark_))
705  {
706  return false;
707  }
708  Lock<Mutex> guard(_subLock);
709  return _setLastPersisted(bookmark_);
710  }
711 
712  bool _setLastPersisted(const Message::Field& bookmark_)
713  {
714  if (!_lastPersisted.empty())
715  {
716  amps_uint64_t publisher, publisher_lastPersisted;
717  amps_uint64_t sequence, sequence_lastPersisted;
718  parseBookmark(bookmark_, publisher, sequence);
719  parseBookmark(_lastPersisted, publisher_lastPersisted,
720  sequence_lastPersisted);
721  if (publisher == publisher_lastPersisted &&
722  sequence <= sequence_lastPersisted)
723  {
724  return false;
725  }
726  }
727  // deepCopy will clear what's in _lastPersisted
728  _lastPersisted.deepCopy(bookmark_);
729  _store->_recentChanged = true;
730  _recoveryTimestamp.clear();
731  return true;
732  }
733 
734  Message::Field lastPersisted(size_t bookmark_)
735  {
736  Lock<Mutex> guard(_subLock);
737  Message::Field& bookmark = _entries[bookmark_]._val;
738  // These shouldn't be persisted
739  if (bookmark == AMPS_BOOKMARK_NOW
740  || BookmarkRange::isRange(bookmark))
741  {
742  return bookmark;
743  }
744  _setLastPersisted(bookmark);
745  return bookmark;
746  }
747 
748  // Returns the index of the recovered item, either the index where it
749  // was first stored prior to getMostRecent, or the new index if it is
750  // relogged either because this is called from log() or because it was
751  // not active but also not persisted.
752  size_t recover(const Message::Field& bookmark_, bool relogIfNotDiscarded)
753  {
754  size_t retVal = AMPS_UNSET_INDEX;
755  if (_recovered.empty() || _recoveryBase == AMPS_UNSET_INDEX)
756  {
757  return retVal;
758  }
759  // Check if this is a recovered bookmark.
760  // If so, copy the existing one to the new location
761  RecoveryIterator item = _recovered.find(bookmark_);
762  if (item != _recovered.end())
763  {
764  size_t seqNo = item->second;
765  size_t index = (seqNo - _recoveryBase) % _entriesLength;
766  // If we only have recovery entries and isDiscarded is
767  // checking on an already discarded entry, update recent.
768  if (_least + _leastBase == _current + _currentBase &&
769  !_entries[index]._active)
770  {
771  _store->_recentChanged = true;
772  _recent.clear();
773  _recent = _entries[index]._val.deepCopy();
774  retVal = moveEntry(index);
775  if (retVal == AMPS_UNSET_INDEX)
776  {
777  recover(bookmark_, relogIfNotDiscarded);
778  }
779  _least = _current;
780  _leastBase = _currentBase;
781  }
782  else if (!_entries[index]._active || relogIfNotDiscarded)
783  {
784  retVal = moveEntry(index);
785  if (retVal == AMPS_UNSET_INDEX)
786  {
787  recover(bookmark_, relogIfNotDiscarded);
788  }
789  }
790  else
791  {
792  return index;
793  }
794  _recovered.erase(item);
795  if (_recovered.empty())
796  {
797  _recoveryMin = AMPS_UNSET_INDEX;
798  _recoveryBase = AMPS_UNSET_INDEX;
799  _recoveryMax = AMPS_UNSET_INDEX;
800  _recoveryMaxBase = AMPS_UNSET_INDEX;
801  }
802  else if (index == _recoveryMin)
803  {
804  while (_entries[_recoveryMin]._val.empty() &&
805  (_recoveryMin + _recoveryBase) < (_recoveryMax + _recoveryMaxBase))
806  {
807  if (++_recoveryMin == _entriesLength)
808  {
809  _recoveryMin = 0;
810  _recoveryBase += _entriesLength;
811  }
812  }
813  }
814  }
815  return retVal;
816  }
817 
818  // Return the id of this Subscription
819  Message::Field id() const
820  {
821  return _id;
822  }
823 
824  struct Entry
825  {
826  Message::Field _val; //16
827  bool _active; //17
828  char _padding[32 - sizeof(Message::Field) - sizeof(bool)]; //32
829 
830  Entry() : _active(false)
831  {
832  ;
833  }
834  };
835 
836  typedef std::vector<Entry*> EntryPtrList;
837 
838  void getRecoveryEntries(EntryPtrList& list_)
839  {
840  if (_recoveryMin == AMPS_UNSET_INDEX ||
841  _recoveryMax == AMPS_UNSET_INDEX)
842  {
843  return;
844  }
845  size_t base = _recoveryBase;
846  size_t max = _recoveryMax + _recoveryMaxBase;
847  for (size_t i = _recoveryMin; i + base < max; ++i)
848  {
849  if (i == _entriesLength)
850  {
851  i = 0;
852  base = _recoveryMaxBase;
853  }
854  //list_.insert(&(_entries[i]));
855  list_.push_back(&(_entries[i]));
856  }
857  return;
858  }
859 
860  void getActiveEntries(EntryPtrList& list_)
861  {
862  size_t base = _leastBase;
863  for (size_t i = _least; i + base < _current + _currentBase; ++i)
864  {
865  if (i >= _entriesLength)
866  {
867  i = 0;
868  base = _currentBase;
869  }
870  //list_.insert(&(_entries[i]));
871  list_.push_back(&(_entries[i]));
872  }
873  return;
874  }
875 
876  Entry* getEntryByIndex(size_t index_)
877  {
878  Lock<Mutex> guard(_subLock);
879  size_t base = (_recoveryBase == AMPS_UNSET_INDEX ||
880  index_ >= _least + _leastBase)
881  ? _leastBase : _recoveryBase;
882  // Return NULL if not a valid index
883  size_t min = (_recoveryMin == AMPS_UNSET_INDEX ?
884  _least + _leastBase :
885  _recoveryMin + _recoveryBase);
886  if (index_ >= _current + _currentBase || index_ < min)
887  {
888  return NULL;
889  }
890  return &(_entries[(index_ - base) % _entriesLength]);
891  }
892 
893  void justRecovered()
894  {
895  Lock<Mutex> guard(_subLock);
896  _updateMostRecent();
897  EntryPtrList list;
898  getRecoveryEntries(list);
899  setPublishersToDiscarded(&list, &_publishers);
900  }
901 
902  void setPublishersToDiscarded(EntryPtrList* recovered_,
903  PublisherMap* publishers_)
904  {
905  // Need to reset publishers to only have up to the last
906  // discarded sequence number. Messages that were in transit
907  // during previous run but not discarded should be considered
908  // new and not duplicate after a restart/recovery.
909  for (EntryPtrList::iterator i = recovered_->begin();
910  i != recovered_->end(); ++i)
911  {
912  if ((*i)->_val.empty())
913  {
914  continue;
915  }
916  amps_uint64_t publisher = (amps_uint64_t)0;
917  amps_uint64_t sequence = (amps_uint64_t)0;
918  parseBookmark((*i)->_val, publisher, sequence);
919  if (publisher && sequence && (*i)->_active &&
920  (*publishers_)[publisher] >= sequence)
921  {
922  (*publishers_)[publisher] = sequence - 1;
923  }
924  }
925  }
926 
927  void clearLastPersisted()
928  {
929  Lock<Mutex> guard(_subLock);
930  _lastPersisted.clear();
931  }
932 
933  void setLastPersistedToEpoch()
934  {
935  Lock<Mutex> guard(_subLock);
936  _setLastPersistedToEpoch();
937  }
938 
939  private:
// Copy construction and copy assignment are declared private with no
// definition visible here - presumably to make Subscription non-copyable;
// confirm that no definition exists elsewhere.
940  Subscription(const Subscription&);
941  Subscription& operator=(const Subscription&);
942 
943  size_t moveEntry(size_t index_)
944  {
945  // Check for wrap
946  if (_current >= _entriesLength)
947  {
948  _current = 0;
949  _currentBase += _entriesLength;
950  }
951  // Check for resize
952  // If list is too small, double it
953  if ((_current == _least % _entriesLength &&
954  _leastBase < _currentBase) ||
955  (_current == _recoveryMin && _recoveryBase < _currentBase))
956  {
957  if (!_store->resize(_id, (char**)&_entries,
958  sizeof(Entry) * _entriesLength * 2))
959  {
960  return AMPS_UNSET_INDEX;
961  }
962  // Length was doubled
963  _entriesLength *= 2;
964  }
965  _entries[_current]._val = _entries[index_]._val;
966  _entries[_current]._active = _entries[index_]._active;
967  // No need to clear Field, just set it to empty
968  _entries[index_]._val.assign(NULL, 0);
969  _entries[index_]._active = false;
970  return _current++;
971  }
972 
973  void _setLastPersistedToEpoch()
974  {
975  size_t fieldLen = strlen(AMPS_BOOKMARK_EPOCH);
976  char* field = (char*)malloc(fieldLen);
977  memcpy(field, AMPS_BOOKMARK_EPOCH, fieldLen);
978  _lastPersisted.clear();
979  _lastPersisted.assign(field, fieldLen);
980  }
981 
982  bool _discard(size_t index_)
983  {
984  bool retVal = false;
985  // Lock should already be held
986  assert((_recoveryBase == AMPS_UNSET_INDEX && _recoveryMin == AMPS_UNSET_INDEX) ||
987  (_recoveryBase != AMPS_UNSET_INDEX && _recoveryMin != AMPS_UNSET_INDEX));
988  size_t base = (_recoveryBase == AMPS_UNSET_INDEX
989  || index_ >= _least + _leastBase)
990  ? _leastBase : _recoveryBase;
991  // discard of a record not in the log is a no-op
992  size_t min = (_recoveryMin == AMPS_UNSET_INDEX ? _least + _leastBase :
993  _recoveryMin + _recoveryBase);
994  if (index_ >= _current + _currentBase || index_ < min)
995  {
996  return retVal;
997  }
998 
999  // log that this one is discarded, then
1000  // recalculate what the most recent entry is.
1001  Entry& e = _entries[(index_ - base) % _entriesLength];
1002  e._active = false;
1003 
1004  size_t index = index_;
1005  if (_recoveryMin != AMPS_UNSET_INDEX &&
1006  index_ == _recoveryMin + _recoveryBase)
1007  {
1008  // Find all to discard
1009  size_t j = _recoveryMin;
1010  while (j + _recoveryBase < _recoveryMax + _recoveryMaxBase &&
1011  !_entries[j]._active)
1012  {
1013  // This index might be left-over from a slow discard and we
1014  // may have reconnected. We have a few possibilities at this point.
1015  // 1. If we re-logged this bookmark, this index will point at an
1016  // empty bookmark. This could happen if the discard thread was slow
1017  // and the reconnect was fast. We wouldn't report the
1018  // the re-arrival of the bookmark as a duplicate because it
1019  // hadn't been marked as discarded. In this case, we have to
1020  // simply move past this in the recovery area.
1021  // 2. This bookmark should become _recent because we haven't
1022  // yet received anything since our last call to getMostRecent.
1023  // In this case, we need to take it out of recovered but not
1024  // clear it. The publishers map should report it as duplicate.
1025  // 3. This is the 'oldest' recovered, but we have received new
1026  // bookmarks since we got this one. We can clear it because the
1027  // publishers map should report it as a duplicate if/when it
1028  // does arrive again. Move the _recoveryMin ahead and remove it
1029  // from recovered.
1030  Message::Field& bookmark = _entries[j]._val;
1031  // Option 1 skips this and just moves on
1032  if (!bookmark.empty())
1033  {
1034  _recovered.erase(bookmark);
1035  // Make sure our publishers map will mark it discarded
1036  amps_uint64_t publisher, sequence;
1037  parseBookmark(bookmark, publisher, sequence);
1038  PublisherIterator pub = _publishers.find(publisher);
1039  if (pub == _publishers.end() || pub->second < sequence)
1040  {
1041  _publishers[publisher] = sequence;
1042  }
1043  if (_least + _leastBase == _current + _currentBase ||
1044  ((_least + _leastBase) % _entriesLength) ==
1045  ((_recoveryMin + _recoveryBase + 1)) % _entriesLength)
1046  {
1047  // Option 2, reset recent
1048  retVal = true;
1049  _store->_recentChanged = true;
1050  _recoveryTimestamp.clear();
1051  _recent.clear();
1052  _recent = bookmark;
1053  bookmark.assign(NULL, 0);
1054  }
1055  else
1056  {
1057  // Option 3, simply clear this one
1058  bookmark.clear();
1059  }
1060  }
1061  // If we reach the buffer end,
1062  // keep checking from the beginning
1063  if (++j == _entriesLength)
1064  {
1065  // Least has now loooped around
1066  _recoveryBase += _entriesLength;
1067  j = 0;
1068  }
1069  }
1070  assert(j + _recoveryBase != _recoveryMax + _recoveryMaxBase ||
1071  _recovered.empty());
1072  if (_recovered.empty())
1073  {
1074  _recoveryMin = AMPS_UNSET_INDEX;
1075  _recoveryBase = AMPS_UNSET_INDEX;
1076  _recoveryMax = AMPS_UNSET_INDEX;
1077  _recoveryMaxBase = AMPS_UNSET_INDEX;
1078  // Cleared recovered, want to check onward
1079  index = _least + _leastBase;
1080  }
1081  else
1082  {
1083  _recoveryMin = j;
1084  }
1085  }
1086  // if this is the first item in the list, discard all inactive ones
1087  // as long as recovery also says its okay
1088  if (index == _least + _leastBase)
1089  {
1090  // Find all to discard
1091  size_t j = _least;
1092  while (j + _leastBase < _current + _currentBase &&
1093  !_entries[j]._active)
1094  {
1095  //Must free associated memory
1096  _recent.clear();
1097  _recent = _entries[j]._val;
1098  _entries[j]._val.assign(NULL, 0);
1099  _store->_recentChanged = true;
1100  retVal = true;
1101  _recoveryTimestamp.clear();
1102  // If we reach the buffer end,
1103  // keep checking from the beginning
1104  if (++j == _entriesLength)
1105  {
1106  // Least has now looped around
1107  _leastBase += _entriesLength;
1108  j = 0;
1109  }
1110  }
1111  _least = j;
1112  }
1113  return retVal;
1114  }
1115 
 // Rebuilds the _recovered map from the entries still held in the buffer and
 // then moves _least up to _current.
 // The scan starts at _recoveryMin/_recoveryBase when a recovery range is
 // set, otherwise at _least/_leastBase, and continues while i + base is less
 // than _current + _currentBase.
 // Takes no parameters and returns nothing. Per the comment in the body, the
 // caller is expected to already hold the lock.
1116  void _updateMostRecent()
1117  {
1118  // Lock is already held
1119  _recovered.clear();
 // The recovery min and its base must be either both set or both unset
1120  assert((_recoveryBase == AMPS_UNSET_INDEX && _recoveryMin == AMPS_UNSET_INDEX) ||
1121  (_recoveryBase != AMPS_UNSET_INDEX && _recoveryMin != AMPS_UNSET_INDEX));
 // Capture the starting point now, because the recovery range is reset below
1122  size_t base = (_recoveryMin == AMPS_UNSET_INDEX) ? _leastBase : _recoveryBase;
1123  size_t start = (_recoveryMin == AMPS_UNSET_INDEX) ? _least : _recoveryMin;
 // Reset the recovery range; the first non-empty entry found re-establishes it
1124  _recoveryMin = AMPS_UNSET_INDEX;
1125  _recoveryBase = AMPS_UNSET_INDEX;
1126  _recoveryMax = AMPS_UNSET_INDEX;
1127  _recoveryMaxBase = AMPS_UNSET_INDEX;
1128  for (size_t i = start; i + base < _current + _currentBase; i++)
1129  {
 // Wrap to the start of the buffer and continue with the current base
1130  if ( i >= _entriesLength )
1131  {
1132  i = 0;
1133  base = _currentBase;
1134  }
 // NOTE(review): this compares the raw index i against index-plus-base sums,
 // and _recoveryMax/_recoveryBase are both AMPS_UNSET_INDEX until the first
 // non-empty entry has been found. It looks intended to skip the gap between
 // the old recovery range and _least -- confirm.
1135  if (i >= _recoveryMax + _recoveryBase && i < _least + _leastBase)
1136  {
1137  continue;
1138  }
1139  Entry& entry = _entries[i];
1140  if (!entry._val.empty())
1141  {
 // Record the bookmark against its position expressed as index + base
1142  _recovered[entry._val] = i + base;
 // The first bookmark found defines the new recovery range, which runs
 // from this entry up to _current
1143  if (_recoveryMin == AMPS_UNSET_INDEX)
1144  {
1145  _recoveryMin = i;
1146  _recoveryBase = base;
1147  _recoveryMax = _current;
1148  _recoveryMaxBase = _currentBase;
1149  }
1150  }
1151  }
 // Normalize _current when it sits one past the end of the buffer
1152  if (_current == _entriesLength)
1153  {
1154  _current = 0;
1155  _currentBase += _entriesLength;
1156  }
 // Least catches up to current
1157  _least = _current;
1158  _leastBase = _currentBase;
1159  }
1160 
 // Identifier of this subscription -- presumably what id() returns; confirm
 // against the part of the class that is not shown here.
1161  Message::Field _id;
 // Bookmark of the entry most recently discarded from the front of the
 // buffer; it is cleared and reassigned each time _least advances.
1162  Message::Field _recent;
 // NOTE(review): not referenced in the code shown here -- presumably the last
 // bookmark reported as persisted; verify.
1163  Message::Field _lastPersisted;
 // NOTE(review): not referenced in the code shown here -- presumably a cached
 // most-recent list; verify.
1164  Message::Field _recentList;
 // NOTE(review): not referenced in the code shown here.
1165  BookmarkRange _range;
 // Cleared whenever _recent is replaced.
1166  Message::Field _recoveryTimestamp;
 // Index positions into _entries. Each index is paired with a base, and the
 // code compares index + base sums; a base grows by _entriesLength each time
 // its index wraps to 0.
 // _current/_currentBase: upper bound of the range scanned by the loops.
1167  size_t _current;
1168  size_t _currentBase;
 // _least/_leastBase: first entry that has not yet been discarded.
1169  size_t _least;
1170  size_t _leastBase;
 // Bounds of the recovery range; all four are AMPS_UNSET_INDEX when there is
 // no recovery range.
1171  size_t _recoveryMin;
1172  size_t _recoveryBase;
1173  size_t _recoveryMax;
1174  size_t _recoveryMaxBase;
 // Index value at which positions wrap back to 0.
1175  size_t _entriesLength;
 // Buffer of entries addressed by the indices above.
1176  Entry* _entries;
 // The store this subscription reports to; its _recentChanged flag is set
 // through this pointer when _recent changes.
1177  MemoryBookmarkStore* _store;
 // NOTE(review): no use is visible in the code shown here -- presumably guards
 // this subscription's state; verify.
1178  Mutex _subLock;
 // Maps a recovered bookmark to its position (index + base) in _entries.
1179  RecoveryMap _recovered;
1180  public:
 // Maps a publisher id to a sequence number; the visible code only raises the
 // stored sequence when a larger one is seen for that publisher.
1181  PublisherMap _publishers;
1182  };
1183 
1184  public:
1188  _subsLock(),
1189  _lock(),
1190  _serverVersion(AMPS_DEFAULT_MIN_VERSION),
1191  _recentChanged(true),
1192  _recovering(false),
1193  _recoveryPointAdapter(NULL),
1194  _recoveryPointFactory(NULL),
1195  _adapterSequence((amps_uint64_t)0),
1196  _nextAdapterUpdate((amps_uint64_t)0)
1197  { ; }
1198 
1199  typedef RecoveryPointAdapter::iterator RecoveryIterator;
1200 
1208  RecoveryPointFactory factory_ = NULL)
1209  : BookmarkStoreImpl()
1210  , _subsLock()
1211  , _lock()
1212  , _serverVersion(AMPS_DEFAULT_MIN_VERSION)
1213  , _recentChanged(true)
1214  , _recovering(true)
1215  , _recoveryPointAdapter(adapter_)
1216  , _recoveryPointFactory(factory_)
1217  , _adapterSequence((amps_uint64_t)0)
1218  , _nextAdapterUpdate((amps_uint64_t)0)
1219  {
1220  Message msg;
1221  if (!_recoveryPointFactory)
1222  {
1223  _recoveryPointFactory = &FixedRecoveryPoint::create;
1224  }
1225  for (RecoveryIterator recoveryPoint = _recoveryPointAdapter.begin();
1226  recoveryPoint != _recoveryPointAdapter.end();
1227  ++recoveryPoint)
1228  {
1229  Field subId(recoveryPoint->getSubId());
1230  msg.setSubscriptionHandle(static_cast<amps_subscription_handle>(0));
1231  msg.setSubId(subId);
1232  Field bookmark = recoveryPoint->getBookmark();
1233  if (BookmarkRange::isRange(bookmark))
1234  {
1235  msg.setBookmark(bookmark);
1236  _log(msg);
1237  }
1238  else
1239  {
1240  std::vector<Field> bmList = Field::parseBookmarkList(bookmark);
1241  for (std::vector<Field>::iterator bkmk = bmList.begin(); bkmk != bmList.end(); ++bkmk)
1242  {
1243  if (Field::isTimestamp(*bkmk))
1244  {
1245  find(subId)->setRecoveryTimestamp(bkmk->data(), bkmk->len());
1246  }
1247  else
1248  {
1249  msg.assignBookmark(bkmk->data(), bkmk->len());
1250  _isDiscarded(msg);
1251  _log(msg);
1252  _discard(msg);
1253  }
1254  }
1255  // Reset to original bookmark
1256  msg.setBookmark(bookmark);
1257  }
1258  }
1259  _recovering = false;
1260  }
1261 
1262  virtual ~MemoryBookmarkStore()
1263  {
1264  if (_recoveryPointAdapter.isValid())
1265  {
1266  _recoveryPointAdapter.close();
1267  }
1268  __purge();
1269  }
1270 
1276  virtual size_t log(Message& message_)
1277  {
1278  Lock<Mutex> guard(_lock);
1279  return _log(message_);
1280  }
1281 
1287  virtual void discard(const Message& message_)
1288  {
1289  Lock<Mutex> guard(_lock);
1290  (void)_discard(message_);
1291  }
1292 
1300  virtual void discard(const Message::Field& subId_, size_t bookmarkSeqNo_)
1301  {
1302  Lock<Mutex> guard(_lock);
1303  (void)_discard(subId_, bookmarkSeqNo_);
1304  }
1305 
1312  {
1313  Lock<Mutex> guard(_lock);
1314  return _getMostRecent(subId_);
1315  }
1316 
1325  virtual bool isDiscarded(Message& message_)
1326  {
1327  Lock<Mutex> guard(_lock);
1328  return _isDiscarded(message_);
1329  }
1330 
1336  virtual void purge()
1337  {
1338  Lock<Mutex> guard(_lock);
1339  _purge();
1340  }
1341 
1347  virtual void purge(const Message::Field& subId_)
1348  {
1349  Lock<Mutex> guard(_lock);
1350  _purge(subId_);
1351  }
1352 
1357  virtual size_t getOldestBookmarkSeq(const Message::Field& subId_)
1358  {
1359  Lock<Mutex> guard(_lock);
1360  return _getOldestBookmarkSeq(subId_);
1361  }
1362 
1368  virtual void persisted(const Message::Field& subId_,
1369  const Message::Field& bookmark_)
1370  {
1371  Lock<Mutex> guard(_lock);
1372  _persisted(find(subId_), bookmark_);
1373  }
1374 
1381  virtual Message::Field persisted(const Message::Field& subId_,
1382  size_t bookmark_)
1383  {
1384  Lock<Mutex> guard(_lock);
1385  return _persisted(find(subId_), bookmark_);
1386  }
1387 
1392  void setServerVersion(const VersionInfo& version_)
1393  {
1394  setServerVersion(version_.getOldStyleVersion());
1395  }
1396 
1401  void setServerVersion(size_t version_)
1402  {
1403  Lock<Mutex> guard(_subsLock);
1404  _serverVersion = version_;
1405  }
1406 
1407  inline bool isWritableBookmark(size_t length)
1408  {
1409  return length >= AMPS_MIN_BOOKMARK_LEN;
1410  }
1411 
1412  typedef Subscription::EntryPtrList EntryPtrList;
1413 
1414  protected:
1415 
1416  // Called once lock is acquired
1417  size_t _log(Message& message_)
1418  {
1419  Message::Field bookmark = message_.getBookmark();
1420  Subscription* pSub = (Subscription*)(message_.getSubscriptionHandle());
1421  if (!pSub)
1422  {
1423  Message::Field subId = message_.getSubscriptionId();
1424  if (subId.empty())
1425  {
1426  subId = message_.getSubscriptionIds();
1427  }
1428  pSub = find(subId);
1429  message_.setSubscriptionHandle(
1430  static_cast<amps_subscription_handle>(pSub));
1431  }
1432  size_t retVal = pSub->log(bookmark);
1433  message_.setBookmarkSeqNo(retVal);
1434  return retVal;
1435  }
1436 
1437  // Called once lock is acquired, or from ctor
1438  bool _discard(const Message& message_)
1439  {
1440  size_t bookmarkSeqNo = message_.getBookmarkSeqNo();
1441  Subscription* pSub = (Subscription*)(message_.getSubscriptionHandle());
1442  if (!pSub)
1443  {
1444  Message::Field subId = message_.getSubscriptionId();
1445  if (subId.empty())
1446  {
1447  subId = message_.getSubscriptionIds();
1448  }
1449  pSub = find(subId);
1450  }
1451  bool retVal = pSub->discard(bookmarkSeqNo);
1452  if (retVal)
1453  {
1454  updateAdapter(pSub);
1455  }
1456  return retVal;
1457  }
1458 
1459  // Called once lock is acquired
1460  bool _discard(const Message::Field& subId_, size_t bookmarkSeqNo_)
1461  {
1462  Subscription* pSub = find(subId_);
1463  bool retVal = pSub->discard(bookmarkSeqNo_);
1464  if (retVal)
1465  {
1466  updateAdapter(pSub);
1467  }
1468  return retVal;
1469  }
1470 
1471  // Called once lock is acquired
1472  Message::Field _getMostRecent(const Message::Field& subId_,
1473  bool usePublishersList_ = true)
1474  {
1475  Subscription* pSub = find(subId_);
1476  return pSub->getMostRecentList(usePublishersList_);
1477  }
1478 
1479  // Called once lock is acquired
1480  bool _isDiscarded(Message& message_)
1481  {
1482  Message::Field subId = message_.getSubscriptionId();
1483  if (subId.empty())
1484  {
1485  subId = message_.getSubscriptionIds();
1486  }
1487  Subscription* pSub = find(subId);
1488  message_.setSubscriptionHandle(
1489  static_cast<amps_subscription_handle>(pSub));
1490  return pSub->isDiscarded(message_.getBookmark());
1491  }
1492 
1493  // Called once lock is acquired
1494  size_t _getOldestBookmarkSeq(const Message::Field& subId_)
1495  {
1496  Subscription* pSub = find(subId_);
1497  return pSub->getOldestBookmarkSeq();
1498  }
1499 
1500  // Called once lock is acquired
1501  virtual void _persisted(Subscription* pSub_,
1502  const Message::Field& bookmark_)
1503  {
1504  if (pSub_->lastPersisted(bookmark_))
1505  {
1506  updateAdapter(pSub_);
1507  }
1508  }
1509 
1510  // Called once lock is acquired
1511  virtual Message::Field _persisted(Subscription* pSub_, size_t bookmark_)
1512  {
1513  return pSub_->lastPersisted(bookmark_);
1514  }
1515 
1516  // Called once lock is acquired
1517  void _purge()
1518  {
1519  if (_recoveryPointAdapter.isValid())
1520  {
1521  _recoveryPointAdapter.purge();
1522  }
1523  __purge();
1524  }
1525 
1526  // Called once lock is acquired
1527  void __purge()
1528  {
1529  // Walk through list and clear Fields before calling clear
1530  while (!_subs.empty())
1531  {
1532  SubscriptionMap::iterator iter = _subs.begin();
1533  //The subId key is cleared when deleting the Subscription, which shares
1534  //the _data pointer in its id field.
1535  const_cast<Message::Field&>(iter->first).clear();
1536  delete (iter->second);
1537  _subs.erase(iter);
1538  }
1539  _subs.clear();
1540  }
1541 
1542  // Called once lock is acquired
1543  virtual void _purge(const Message::Field& subId_)
1544  {
1545  if (_recoveryPointAdapter.isValid())
1546  {
1547  _recoveryPointAdapter.purge(subId_);
1548  }
1549  __purge(subId_);
1550  }
1551 
1552  // Called once lock is acquired
1553  virtual void __purge(const Message::Field& subId_)
1554  {
1555  Lock<Mutex> guard(_subsLock);
1556  SubscriptionMap::iterator iter = _subs.find(subId_);
1557  if (iter == _subs.end())
1558  {
1559  return;
1560  }
1561  const_cast<Message::Field&>(iter->first).clear();
1562  delete (iter->second);
1563  _subs.erase(iter);
1564  }
1565 
1566  // Can be used by subclasses during recovery
1567  void setMostRecent(const Message::Field& subId_,
1568  const Message::Field& recent_)
1569  {
1570  find(subId_)->setMostRecent(recent_);
1571  }
1572 
 // Held while the subscription map is searched or modified in find() and
 // __purge(subId_), and while _serverVersion is written.
1573  Mutex _subsLock;
 // Held by the public entry points (log, discard, isDiscarded, purge,
 // persisted, ...) around their underscore-prefixed helpers; released
 // temporarily inside updateAdapter().
1574  Mutex _lock;
 // NOTE(review): these tags are not referenced in the code shown here --
 // presumably record-type markers for stores that write a log; confirm.
1575  static const char ENTRY_BOOKMARK = 'b';
1576  static const char ENTRY_DISCARD = 'd';
1577  static const char ENTRY_PERSISTED = 'p';
1578 
1579  virtual Subscription* find(const Message::Field& subId_)
1580  {
1581  if (subId_.empty())
1582  {
1583  throw StoreException("A valid subscription ID must be provided to the Bookmark Store");
1584  }
1585  Lock<Mutex> guard(_subsLock);
1586  if (_subs.count(subId_) == 0)
1587  {
1588  // Subscription will be created
1589  Message::Field id;
1590  id.deepCopy(subId_);
1591  _subs[id] = new Subscription(this, id);
1592  return _subs[id];
1593  }
1594  return _subs[subId_];
1595  }
1596 
1597  virtual bool resize(const Message::Field& subId_, char** newBuffer_, size_t size_,
1598  bool callResizeHandler_ = true)
1599  {
1600  assert(newBuffer_ != 0);
1601  if (size_ == 0) // Delete the buffer
1602  {
1603  if (*newBuffer_)
1604  {
1605  free(*newBuffer_);
1606  *newBuffer_ = NULL;
1607  }
1608  return true;
1609  }
1610  if (callResizeHandler_ && !callResizeHandler(subId_, size_))
1611  {
1612  return false;
1613  }
1614  char* oldBuffer = *newBuffer_ ? *newBuffer_ : NULL;
1615  *newBuffer_ = (char*)malloc(size_);
1616  memset(*newBuffer_, 0, size_);
1617  if (oldBuffer)
1618  {
1619  find(subId_)->moveEntries(oldBuffer, *newBuffer_, size_);
1620  free(oldBuffer);
1621  }
1622  return true;
1623  }
1624 
1625  protected:
1626  void updateAdapter(Subscription* pSub_)
1627  {
1628  if (_recovering || !_recentChanged || !_recoveryPointAdapter.isValid())
1629  {
1630  return;
1631  }
1632  Field bookmark = pSub_->getMostRecentList(false);
1633  RecoveryPoint update = _recoveryPointFactory(pSub_->id(), bookmark);
1634  // Use atomic seq to keep updates in order
1635  amps_uint64_t seq = _adapterSequence.fetch_add(1);
1636  Unlock<Mutex> unlock(_lock);
1637  while (_nextAdapterUpdate.load() < seq)
1638  {
1639  AMPS_USLEEP(100);
1640  }
1641  try
1642  {
1643  _recoveryPointAdapter.update(update);
1644  }
1645  catch (const std::exception&)
1646  {
1647  _nextAdapterUpdate.fetch_add(1);
1648  // Free memory, the bookmark needs to be cleared
1649  bookmark.clear();
1650  throw;
1651  }
1652  _nextAdapterUpdate.fetch_add(1);
1653  // Free memory, the bookmark needs to be cleared
1654  bookmark.clear();
1655  }
1656 
 // Subscriptions keyed by subscription id. Each key is a deep copy made in
 // find(), and each Subscription is deleted by the purge functions.
1657  typedef std::map<Message::Field, Subscription*, Message::Field::FieldHash> SubscriptionMap;
1658  SubscriptionMap _subs;
 // Set by setServerVersion(); initialized to AMPS_DEFAULT_MIN_VERSION.
1659  size_t _serverVersion;
 // updateAdapter() does nothing while this is false. Subscriptions set it to
 // true when their most recent bookmark changes.
1660  bool _recentChanged;
 // True while the constructor replays the adapter's recovery points;
 // updateAdapter() does nothing while it is set.
1661  bool _recovering;
 // NOTE(review): SubscriptionSet is not used in the code shown here.
1662  typedef std::set<Subscription*> SubscriptionSet;
 // External storage for recovery points; used only when isValid() is true.
1663  RecoveryPointAdapter _recoveryPointAdapter;
 // Builds the RecoveryPoint sent to the adapter; the adapter constructor
 // falls back to FixedRecoveryPoint::create when none is supplied.
1664  RecoveryPointFactory _recoveryPointFactory;
 // Ticket counter: each adapter update takes the next value.
1665  std::atomic<amps_uint64_t> _adapterSequence;
 // Ticket that is currently allowed to call the adapter; incremented after
 // each update attempt.
1666  std::atomic<amps_uint64_t> _nextAdapterUpdate;
1667  };
1668 
1669 } // end namespace AMPS
1670 
1671 #endif //_MEMORYBOOKMARKSTORE_H_
1672 
Defines the AMPS::Message class and related classes.
Abstract base class for storing received bookmarks for HA clients.
Definition: BookmarkStore.hpp:77
Field getSubscriptionId() const
Retrieves the value of the SubscriptionId header of the Message as a Field which references the under...
Definition: Message.hpp:1489
virtual void purge()
Called to purge the contents of this store.
Definition: MemoryBookmarkStore.hpp:1336
virtual void discard(const Message::Field &subId_, size_t bookmarkSeqNo_)
Log a discard-bookmark entry to the persistent log based on a bookmark sequence number.
Definition: MemoryBookmarkStore.hpp:1300
virtual Message::Field getMostRecent(const Message::Field &subId_)
Returns the most recent bookmark from the log that ought to be used for (re-)subscriptions.
Definition: MemoryBookmarkStore.hpp:1311
virtual Message::Field persisted(const Message::Field &subId_, size_t bookmark_)
Mark the bookmark provided as replicated to all sync replication destinations for the given subscript...
Definition: MemoryBookmarkStore.hpp:1381
Message encapsulates a single message sent to or received from an AMPS server, and provides methods f...
Definition: Message.hpp:539
void clear()
Deletes the data associated with this Field, should only be used on Fields that were created as deepC...
Definition: Field.hpp:260
static Field stringCopy(const char *str_)
Makes a copy of str_ in a new Field.
Definition: Field.hpp:247
MemoryBookmarkStore(const RecoveryPointAdapter &adapter_, RecoveryPointFactory factory_=NULL)
Creates a MemoryBookmarkStore.
Definition: MemoryBookmarkStore.hpp:1207
RecoveryPointAdapter a handle class for implementing external storage of subscription recovery points...
Definition: RecoveryPointAdapter.hpp:77
MemoryBookmarkStore()
Creates a MemoryBookmarkStore.
Definition: MemoryBookmarkStore.hpp:1187
Provides access to the subId and bookmark needed to restart a subscription.
Definition: RecoveryPoint.hpp:67
virtual void persisted(const Message::Field &subId_, const Message::Field &bookmark_)
Mark the bookmark provided as replicated to all sync replication destinations for the given subscript...
Definition: MemoryBookmarkStore.hpp:1368
Message & assignBookmark(const std::string &v)
Assigns the value of the Bookmark header for this Message without copying.
Definition: Message.hpp:1256
void setServerVersion(size_t version_)
Internally used to set the server version so the store knows how to deal with persisted acks and call...
Definition: MemoryBookmarkStore.hpp:1401
Field getSubscriptionIds() const
Retrieves the value of the SubscriptionIds header of the Message as a Field which references the unde...
Definition: Message.hpp:1490
bool empty() const
Returns 'true' if empty, 'false' otherwise.
Definition: Field.hpp:128
Defines the AMPS::Field class, which represents the value of a field in a message.
#define AMPS_BOOKMARK_EPOCH
Start the subscription at the beginning of the journal.
Definition: BookmarkStore.hpp:51
virtual bool isDiscarded(Message &message_)
Called for each arriving message to determine if the application has already seen this bookmark and s...
Definition: MemoryBookmarkStore.hpp:1325
Provides AMPS::RecoveryPointAdapter, an interface for implementing external storage of bookmark subscr...
void setServerVersion(const VersionInfo &version_)
Internally used to set the server version so the store knows how to deal with persisted acks and call...
Definition: MemoryBookmarkStore.hpp:1392
virtual void discard(const Message &message_)
Log a discard-bookmark entry to the persistent log based on a bookmark sequence number.
Definition: MemoryBookmarkStore.hpp:1287
#define AMPS_BOOKMARK_NOW
Start the subscription at the point in time when AMPS processes the subscription. ...
Definition: BookmarkStore.hpp:55
Message & setSubId(const std::string &v)
Sets the value of the SubscriptionId header for this Message.
Definition: Message.hpp:1489
RecoveryPoint(* RecoveryPointFactory)(const Field &subId_, const Field &bookmark_)
RecoveryPointFactory is a function type for producing a RecoveryPoint that is sent to a RecoveryPoint...
Definition: RecoveryPoint.hpp:126
Provides AMPS::RecoveryPoint, AMPS::RecoveryPointFactory, AMPS::FixedRecoveryPoint, and AMPS::DynamicRecoveryPoint.
A BookmarkStoreImpl implementation that stores bookmarks in memory.
Definition: MemoryBookmarkStore.hpp:58
virtual size_t log(Message &message_)
Log a bookmark to the persistent log and return the corresponding sequence number for this bookmark...
Definition: MemoryBookmarkStore.hpp:1276
Field represents the value of a single field in a Message.
Definition: Field.hpp:86
static RecoveryPoint create(const Field &subId_, const Field &bookmark_)
Use this function in BookmarkStore::setRecoveryPointFactory( std::bind(&FixedRecoveryPoint::create, std::placeholders::_1, std::placeholders::_2))
Definition: RecoveryPoint.hpp:139
virtual size_t getOldestBookmarkSeq(const Message::Field &subId_)
Called to find the oldest bookmark in the store.
Definition: MemoryBookmarkStore.hpp:1357
virtual void purge(const Message::Field &subId_)
Called to purge the contents of this store for particular subId.
Definition: MemoryBookmarkStore.hpp:1347
Message & setBookmark(const std::string &v)
Sets the value of the Bookmark header for this Message.
Definition: Message.hpp:1256
void deepCopy(const Field &orig_)
Makes self a deep copy of the original field.
Definition: Field.hpp:218
Definition: ampsplusplus.hpp:103
Field getBookmark() const
Retrieves the value of the Bookmark header of the Message as a Field which references the underlying ...
Definition: Message.hpp:1256