AMPS C/C++ Client Class Reference
AMPS C/C++ Client Version 5.3.4.3
MemoryBookmarkStore.hpp
Go to the documentation of this file.
1 //
3 // Copyright (c) 2010-2024 60East Technologies Inc., All Rights Reserved.
4 //
5 // This computer software is owned by 60East Technologies Inc. and is
6 // protected by U.S. copyright laws and other laws and by international
7 // treaties. This computer software is furnished by 60East Technologies
8 // Inc. pursuant to a written license agreement and may be used, copied,
9 // transmitted, and stored only in accordance with the terms of such
10 // license agreement and with the inclusion of the above copyright notice.
11 // This computer software or any other copies thereof may not be provided
12 // or otherwise made available to any other person.
13 //
14 // U.S. Government Restricted Rights. This computer software: (a) was
15 // developed at private expense and is in all respects the proprietary
16 // information of 60East Technologies Inc.; (b) was not developed with
17 // government funds; (c) is a trade secret of 60East Technologies Inc.
18 // for all purposes of the Freedom of Information Act; and (d) is a
19 // commercial item and thus, pursuant to Section 12.212 of the Federal
20 // Acquisition Regulations (FAR) and DFAR Supplement Section 227.7202,
21 // Government's use, duplication or disclosure of the computer software
22 // is subject to the restrictions set forth by 60East Technologies Inc..
23 //
25 
26 #ifndef _MEMORYBOOKMARKSTORE_H_
27 #define _MEMORYBOOKMARKSTORE_H_
28 
29 #include <amps/BookmarkStore.hpp>
30 #include <amps/Field.hpp>
31 #include <amps/Message.hpp>
32 #include <amps/RecoveryPoint.hpp>
34 #include <functional>
35 #include <map>
36 #include <sstream>
37 #include <vector>
38 #include <stdlib.h>
39 #include <assert.h>
40 
41 #define AMPS_MIN_BOOKMARK_LEN 3
42 #define AMPS_INITIAL_MEMORY_BOOKMARK_SIZE 16384UL
43 
48 
49 namespace AMPS
50 {
51 
58  {
59  protected:
60  class Subscription
61  {
62  public:
63  typedef std::map<Message::Field, size_t, Message::Field::FieldHash> RecoveryMap;
64  typedef std::map<amps_uint64_t, amps_uint64_t> PublisherMap;
65  typedef std::map<Message::Field, size_t, Message::Field::FieldHash>::iterator
66  RecoveryIterator;
67  typedef std::map<amps_uint64_t, amps_uint64_t>::iterator PublisherIterator;
68 
69  // Start sequence at 1 so that 0 can be used during file subclasses
70  // recovery as an indicator that a message wasn't logged because
71  // isDiscarded() was true.
72  Subscription(MemoryBookmarkStore* store_, const Message::Field& id_)
73  : _current(1), _currentBase(0), _least(1), _leastBase(0)
74  , _recoveryMin(AMPS_UNSET_INDEX), _recoveryBase(AMPS_UNSET_INDEX)
75  , _recoveryMax(AMPS_UNSET_INDEX), _recoveryMaxBase(AMPS_UNSET_INDEX)
76  , _entriesLength(AMPS_INITIAL_MEMORY_BOOKMARK_SIZE), _entries(NULL)
77  , _store(store_)
78  {
79  // Need our own memory for the sub id
80  _id.deepCopy(id_);
81  _store->resize(_id, (char**)&_entries,
82  sizeof(Entry)*AMPS_INITIAL_MEMORY_BOOKMARK_SIZE, false);
83  setLastPersistedToEpoch();
84  }
85 
86  ~Subscription()
87  {
88  Lock<Mutex> guard(_subLock);
89  if (_entries)
90  {
91  for (size_t i = 0; i < _entriesLength; ++i)
92  {
93  _entries[i]._val.clear();
94  }
95  // resize to 0 will free _entries
96  _store->resize(_id, (char**)&_entries, 0);
97  }
98  _id.clear();
99  _recent.clear();
100  _lastPersisted.clear();
101  _recentList.clear();
102  _range.clear();
103  _recoveryTimestamp.clear();
104  }
105 
// Logs bookmark_ for this subscription and returns its sequence index
// (slot index plus _currentBase).
//  - AMPS_BOOKMARK_NOW is never stored: 0 is returned before the lock is taken.
//  - If recover() finds the bookmark among the recovered entries it is
//    relogged there and that index is used instead of a new slot.
//  - A bookmark range is saved in _range, not in _entries; 0 is returned and
//    CommandException is thrown when the range fails isValid().
106  size_t log(const Message::Field& bookmark_)
107  {
108  if (bookmark_ == AMPS_BOOKMARK_NOW)
109  {
110  return 0;
111  }
112  Lock<Mutex> guard(_subLock);
113  // Either relog the recovery or log it
114  size_t index = recover(bookmark_, true);
115  if (index == AMPS_UNSET_INDEX)
116  {
117  // Check for wrap
118  if (_current >= _entriesLength)
119  {
120  _current = 0;
121  _currentBase += _entriesLength;
122  }
123  // Check for resize
124  // If list is too small, double it
// "Too small" means the write position has caught up with the oldest active
// entry or the oldest recovery entry from an earlier lap (smaller base).
125  if ((_current == _least && _leastBase < _currentBase) ||
126  (_current == _recoveryMin && _recoveryBase < _currentBase))
127  {
128  if (!_store->resize(_id, (char**)&_entries,
129  sizeof(Entry) * _entriesLength * 2))
130  {
131  // resize() returned false: start the whole log attempt over.
// NOTE(review): this re-enters log() while guard is still held, so it
// relies on Mutex allowing recursive locking - confirm.
132  return log(bookmark_);
133  }
134  // Length was doubled
135  _entriesLength *= 2;
136  }
137 
138  // Add this entry to the end of our list
// The block below is disabled code kept from an earlier implementation that
// stored a timestamp for AMPS_BOOKMARK_NOW.
139  /*
140  if (bookmark_ == AMPS_BOOKMARK_NOW)
141  {
142  // Save a now timestamp bookmark
143  char* nowTimestamp = new char[AMPS_TIMESTAMP_LEN];
144  struct tm timeInfo;
145  time_t now;
146  time(&now);
147 #ifdef _WIN32
148  gmtime_s(&timeInfo, &now);
149 #else
150  gmtime_r(&now, &timeInfo);
151 #endif
152  strftime(nowTimestamp, AMPS_TIMESTAMP_LEN,
153  "%Y%m%dT%H%M%S", &timeInfo);
154  nowTimestamp[AMPS_TIMESTAMP_LEN-1] = 'Z';
155  _entries[_current]._val.assign(nowTimestamp,
156  AMPS_TIMESTAMP_LEN);
157  _entries[_current]._active = false;
158  index = _current++;
159  return index + _currentBase;
160  }
161  else
162  */
163  {
164  // Is this an attempt at a range?
165  if (!BookmarkRange::isRange(bookmark_))
166  {
167  _entries[_current]._val.deepCopy(bookmark_);
168  }
169  else
170  {
171  // Deep copy of the range is saved
172  _range.set(bookmark_);
173  // Stricter check on range syntax
174  if (!_range.isValid())
175  {
176  throw CommandException("Invalid bookmark range specified.");
177  }
178  if (!_range.isStartInclusive())
179  {
180  // An exclusive start means the start bookmark(s) count as already seen:
// record each one in the publishers map, keeping the highest sequence.
181  amps_uint64_t publisher, sequence;
182  std::vector<Field> bmList = Field::parseBookmarkList(_range.getStart());
183  for (std::vector<Field>::iterator bkmk = bmList.begin(); bkmk != bmList.end(); ++bkmk)
184  {
185  parseBookmark(*bkmk, publisher, sequence);
186  if (publisher != (amps_uint64_t)0)
187  {
188  // Compare it to our publishers map
189  PublisherIterator pub = _publishers.find(publisher);
190  if (pub == _publishers.end() || pub->second < sequence)
191  {
192  _publishers[publisher] = sequence;
193  }
194  }
195  }
196  }
// NOTE(review): Unlock<Mutex> presumably releases _subLock for the duration
// of the updateAdapter() call and re-locks on scope exit - confirm.
197  Unlock<Mutex> unlock(_subLock);
198  _store->updateAdapter(this);
199  // Don't actually log a range
200  return 0;
201  }
202  }
203  _entries[_current]._active = true;
204  index = _current++;
205  }
206  return index + _currentBase;
207  }
208 
209  bool discard(size_t index_)
210  {
211  Lock<Mutex> guard(_subLock);
212  return _discard(index_);
213  }
214 
215  bool discard(const Message::Field& bookmark_)
216  {
217  // These are discarded when logged or not logged
218  if (bookmark_ == AMPS_BOOKMARK_NOW)
219  {
220  return false;
221  }
222  Lock<Mutex> guard(_subLock);
223  size_t search = _least;
224  size_t searchBase = _leastBase;
225  size_t searchMax = _current;
226  size_t searchMaxBase = _currentBase;
227  if (_least + _leastBase == _current + _currentBase)
228  {
229  if (_recoveryMin != AMPS_UNSET_INDEX)
230  {
231  search = _recoveryMin;
232  searchBase = _recoveryBase;
233  searchMax = _recoveryMax;
234  searchMaxBase = _recoveryMaxBase;
235  }
236  else // Store is empty, so nothing to do
237  {
238  return false;
239  }
240  }
241  assert(searchMax != AMPS_UNSET_INDEX);
242  assert(searchMaxBase != AMPS_UNSET_INDEX);
243  assert(search != AMPS_UNSET_INDEX);
244  assert(searchBase != AMPS_UNSET_INDEX);
245  // Search while we don't find the provided bookmark and we're in valid range
246  while (search + searchBase < searchMax + searchMaxBase)
247  {
248  if (_entries[search]._val == bookmark_)
249  {
250  return _discard(search + searchBase);
251  }
252  if (++search == _entriesLength)
253  {
254  // Least has now loooped around
255  searchBase += _entriesLength;
256  search = 0;
257  }
258  }
259  return false;
260  }
261 
262  // Get sequence number from a Field that is a bookmark
263  static void parseBookmark(const Message::Field& field_,
264  amps_uint64_t& publisherId_,
265  amps_uint64_t& sequenceNumber_)
266  {
267  Message::Field::parseBookmark(field_, publisherId_, sequenceNumber_);
268  }
269 
270  // Check to see if this message is older than the most recent one seen,
271  // and if it is, check if it has been discarded.
// Returns true when bookmark_ should be treated as already discarded.
// Side effects: may relocate or erase entries through recover() and
// _recovered.erase(), and raises the publisher's recorded sequence in
// _publishers when bookmark_ carries a higher one.
272  bool isDiscarded(const Message::Field& bookmark_)
273  {
274  Lock<Mutex> guard(_subLock);
// Ranges are never reported as discarded.
275  if (BookmarkRange::isRange(bookmark_))
276  {
277  return false;
278  }
279  // Check if we've already recovered this bookmark
280  size_t recoveredIdx = recover(bookmark_, false);
281 
282  amps_uint64_t publisher, sequence;
283  parseBookmark(bookmark_, publisher, sequence);
284  // Compare it to our publishers map
285  PublisherIterator pub = _publishers.find(publisher);
286  if (pub == _publishers.end() || pub->second < sequence)
287  {
// Newer than anything recorded for this publisher: remember it, and unless
// it was found in the recovery set it is a new message.
288  _publishers[publisher] = sequence;
289  if (recoveredIdx == AMPS_UNSET_INDEX)
290  {
291  return false;
292  }
293  }
294  if (recoveredIdx != AMPS_UNSET_INDEX)
295  {
// Found in recovery: discarded only if that entry is no longer active.
296  if (!_entries[recoveredIdx]._active)
297  {
298  _recovered.erase(bookmark_);
299  return true;
300  }
301  return false;
302  }
303  // During recovery, we don't really care if it's been discarded
304  // or not. We just want _publishers updated. No need for the
305  // costly linear search.
306  if (_store->_recovering)
307  {
308  return false;
309  }
310  // During failure and recovery scenarios, we'll see out of order
311  // bookmarks arrive, either because (a) we're replaying or (b)
312  // a publisher has cut over, and we've cut over to a new server.
313  // Scan the list to see if we have a match.
314  size_t base = _leastBase;
315  for (size_t i = _least; i + base < _current + _currentBase; i++)
316  {
// Wrap to slot 0 and switch to the current lap's base at the buffer end.
317  if ( i >= _entriesLength )
318  {
319  i = 0;
320  base = _currentBase;
321  }
322  if (_entries[i]._val == bookmark_)
323  {
324  return !_entries[i]._active;
325  }
326  }
327 
328  return true; // message is totally discarded
329  }
330 
331  bool empty(void) const
332  {
333  if (_least == AMPS_UNSET_INDEX ||
334  ((_least + _leastBase) == (_current + _currentBase) &&
335  _recoveryMin == AMPS_UNSET_INDEX))
336  {
337  return true;
338  }
339  return false;
340  }
341 
342  void updateMostRecent()
343  {
344  Lock<Mutex> guard(_subLock);
345  _updateMostRecent();
346  }
347 
348  const BookmarkRange& getRange() const
349  {
350  return _range;
351  }
352 
// Builds and returns the bookmark text describing where this subscription
// should resume. Depending on state the result is:
//  - a "publisher|sequence|," list built from _publishers (optionally followed
//    by _recoveryTimestamp), when usePublishersList_ is true and neither
//    _recent nor _lastPersisted is usable or _lastPersisted is EPOCH;
//  - _lastPersisted reset to EPOCH, when nothing usable has been recorded;
//  - otherwise a comma separated join of _recent, _lastPersisted and
//    _recoveryTimestamp (each only when applicable), cached in _recentList.
// Whenever _range is valid, _range is returned instead, with its start
// replaced or made exclusive as needed.
// Side effect: _updateMostRecent() is run first.
353  Message::Field getMostRecentList(bool usePublishersList_ = true)
354  {
355  Lock<Mutex> guard(_subLock);
// A length of 1 or less is treated as "no real bookmark".
356  bool useLastPersisted = !_lastPersisted.empty() &&
357  _lastPersisted.len() > 1;
358  // when this is called, we'll take a moment to update the list
359  // of things recovered,
360  // so we don't accidentally log anything we ought not to.
361  _updateMostRecent();
362  bool useRecent = !_recent.empty() && _recent.len() > 1;
363  amps_uint64_t lastPublisher = 0;
364  amps_uint64_t lastSeq = 0;
365  amps_uint64_t recentPublisher = 0;
366  amps_uint64_t recentSeq = 0;
367  if (useLastPersisted)
368  {
369  parseBookmark(_lastPersisted, lastPublisher, lastSeq);
370  }
371  if (useRecent)
372  {
373  parseBookmark(_recent, recentPublisher, recentSeq);
374  if (empty() && useLastPersisted)
375  {
376  useRecent = false;
377  }
378  else
379  {
// Both come from the same publisher: keep only one of the two.
380  if (useLastPersisted && lastPublisher == recentPublisher)
381  {
382  if (lastSeq <= recentSeq)
383  {
384  useRecent = false;
385  }
386  else
387  {
388  useLastPersisted = false;
389  }
390  }
391  }
392  }
393  // Set size for all bookmarks that will be used
// Each part is counted with one extra byte for its separating comma.
394  size_t totalLen = (useLastPersisted ? _lastPersisted.len() + 1 : 0);
395  if (useRecent)
396  {
397  totalLen += _recent.len() + 1;
398  }
399  // If we don't have a non-EPOCH persisted ack and we don't have a
400  // non-EPOCH most recent bookmark, OR we have a range
401  // we can build a list based on all the publishers instead.
402  if (usePublishersList_
403  && ((!useLastPersisted && !useRecent)
404  || _lastPersisted == AMPS_BOOKMARK_EPOCH))
405  {
406  std::ostringstream os;
407  for (PublisherIterator pub = _publishers.begin();
408  pub != _publishers.end(); ++pub)
409  {
410  if (pub->first == 0 && pub->second == 0)
411  {
412  continue;
413  }
// Never advertise more than _recent for the publisher _recent came from.
414  if (pub->first == recentPublisher && recentSeq < pub->second)
415  {
416  os << recentPublisher << '|' << recentSeq << "|,";
417  }
418  else
419  {
420  os << pub->first << '|' << pub->second << "|,";
421  }
422  }
423  std::string recent = os.str();
424  totalLen = recent.length();
425  if (!recent.empty())
426  {
427  if (!_recoveryTimestamp.empty())
428  {
// The trailing comma is kept as the separator before the timestamp.
429  totalLen += _recoveryTimestamp.len();
430  recent += std::string(_recoveryTimestamp);
431  }
432  else
433  {
434  // Remove trailing ,
435  recent.erase(--totalLen);
436  }
437  // Reset _recentList to new value and return it
438  _recentList.clear();
439  _recentList = Message::Field(recent).deepCopy();
440  if (_range.isValid())
441  {
442  if (_range.getStart() != recent
443  && _recentList != AMPS_BOOKMARK_EPOCH)
444  {
445  _range.replaceStart(_recentList, true);
446  }
447  else if (_range.isStartInclusive())
448  {
// The start is unchanged: make it exclusive once the publishers map shows
// that the start bookmark has already been reached.
449  amps_uint64_t publisher, sequence;
450  parseBookmark(_range.getStart(), publisher,
451  sequence);
452  PublisherIterator pub = _publishers.find(publisher);
453  if (pub != _publishers.end()
454  && pub->second >= sequence)
455  {
456  _range.makeStartExclusive();
457  }
458  }
459  return _range;
460  }
461  return _recentList;
462  }
463  if (_range.isValid())
464  {
465  return _range;
466  }
467  }
468  if (!_recoveryTimestamp.empty() && !_range.isValid())
469  {
470  totalLen += _recoveryTimestamp.len() + 1;
471  }
472  // If we have nothing discarded, return EPOCH
473  if (totalLen == 0
474  || (_recent.len() < 2 && !empty()))
475  {
476  if (_range.isValid())
477  {
478  return _range;
479  }
480  if (!useRecent)
481  {
// NOTE(review): original line 482 is absent from this listing, so the body
// of this branch is not visible here - check the real source.
483  }
484  _setLastPersistedToEpoch();
485  return _lastPersisted;
486  }
487  // Remove the trailing , from the length
488  totalLen -= 1;
// The buffer is handed to _recentList.assign() below.
// NOTE(review): ownership after assign() is presumably with the Field - confirm.
489  char* field = new char[totalLen];
490  size_t len = 0;
491  if (useRecent)
492  {
493  len = _recent.len();
494  memcpy(field, _recent.data(), len);
495  if (len < totalLen)
496  {
497  field[len++] = ',';
498  }
499  }
500  if (useLastPersisted)
501  {
502  memcpy(field + len, _lastPersisted.data(), _lastPersisted.len());
503  len += _lastPersisted.len();
504  if (len < totalLen)
505  {
506  field[len++] = ',';
507  }
508  }
509  if (!_recoveryTimestamp.empty() && !_range.isValid())
510  {
511  memcpy(field + len, _recoveryTimestamp.data(),
512  _recoveryTimestamp.len());
513  // If more is to be written after this, uncomment the following
514  //len += _lastPersisted.len();
515  //if (len < totalLen) field[len++] = ',';
516  }
517  // _recentList clear will delete[] current buffer and assign will get cleared
518  _recentList.clear();
519  _recentList.assign(field, totalLen);
520  if (_range.isValid())
521  {
522  if (_recentList != AMPS_BOOKMARK_EPOCH)
523  {
524  if (_range.getStart() != _recentList)
525  {
526  _range.replaceStart(_recentList, true);
527  }
528  else if (_range.isStartInclusive())
529  {
530  amps_uint64_t publisher, sequence;
531  parseBookmark(_range.getStart(), publisher,
532  sequence);
533  PublisherIterator pub = _publishers.find(publisher);
534  if (pub != _publishers.end()
535  && pub->second >= sequence)
536  {
537  _range.makeStartExclusive();
538  }
539  }
540  }
541  return _range;
542  }
543  return _recentList;
544  }
545 
// Returns _recent, the most recent bookmark, while holding _subLock.
// When update_ is true and the store's _recentChanged flag is set,
// _updateMostRecent() is run first.
546  Message::Field getMostRecent(bool update_ = false)
547  {
548  Lock<Mutex> guard(_subLock);
549  // Return the same as last time if nothing's changed
550  // _recent is the most recent bookmark.
551  if (update_ && _store->_recentChanged)
552  {
553  _updateMostRecent();
554  }
555  if (_recent.empty())
556  {
// NOTE(review): original line 557 is absent from this listing, so what is
// returned when _recent is empty is not visible here - check the real source.
558  }
559  else
560  {
561  return _recent;
562  }
563  }
564 
565  Message::Field getLastPersisted()
566  {
567  Lock<Mutex> guard(_subLock);
568  return _lastPersisted;
569  }
570 
571  void setMostRecent(const Message::Field& recent_)
572  {
573  _recent.clear();
574  _recent.deepCopy(recent_);
575  }
576 
577  void setRecoveryTimestamp(const char* recoveryTimestamp_,
578  size_t len_ = 0)
579  {
580  _recoveryTimestamp.clear();
581  size_t len = (len_ == 0) ? AMPS_TIMESTAMP_LEN : len_;
582  char* ts = new char[len];
583  memcpy((void*)ts, (const void*)recoveryTimestamp_, len);
584  _recoveryTimestamp.assign(ts, len);
585  }
586 
// Re-lays the ring of entries after the entries buffer has changed size.
//  old_     : buffer that held the entries before the resize
//  new_     : buffer that holds them afterwards (may equal old_)
//  newSize_ : size of new_ in bytes
// _entriesLength is still the OLD slot count while this runs; log() and
// moveEntry() double it only after resize() returns.
// NOTE(review): presumably invoked by the store's resize() - the caller is
// not visible in this listing.
// On exit _current sits directly after all existing entries and shares
// _leastBase as its base.
587  void moveEntries(char* old_, char* new_, size_t newSize_)
588  {
// The oldest live slot is the recovery minimum when a recovery region exists.
589  size_t least = _least;
590  size_t leastBase = _leastBase;
591  if (_recoveryMin != AMPS_UNSET_INDEX)
592  {
593  least = _recoveryMin;
594  leastBase = _recoveryBase;
595  }
596  // First check if we grew in place, if so, just move current after least
597  if (old_ == new_)
598  {
// If the added space can hold the wrapped-around head [0, least), append it
// after the old end so the ring becomes one contiguous run.
599  if (newSize_ - (sizeof(Entry)*_entriesLength) > sizeof(Entry)*least)
600  {
601  memcpy(new_ + (sizeof(Entry)*_entriesLength),
602  old_, (sizeof(Entry)*least));
603  // Clear the beginning where those entries were
604  memset(old_, 0, sizeof(Entry)*least);
605  }
606  else // We have to use an intermediate buffer
607  {
608  Entry* buffer = new Entry[least];
609  memcpy((void*)buffer, (void*)old_, sizeof(Entry)*least);
610  //Put the beginning entries at the start of the new buffer
611  memcpy((void*)new_, (void*)((char*)old_ + (sizeof(Entry)*least)),
612  (_entriesLength - least)*sizeof(Entry));
613  //Put the end entries after the beginning entries
614  memcpy((void*)((char*)new_ + ((_entriesLength - least)*sizeof(Entry))),
615  (void*)buffer, least * sizeof(Entry));
616  // Least is now at 0 so base must be increased
617  leastBase += least;
618  least = 0;
619  delete [] buffer;
620  }
621  }
622  else
623  {
624  //Put the beginning entries at the start of the new buffer
625  memcpy((void*)new_, (void*)((char*)old_ + (sizeof(Entry)*least)),
626  (_entriesLength - least)*sizeof(Entry));
627  //Put the end entries after the beginning entries
628  memcpy((void*)((char*)new_ + ((_entriesLength - least)*sizeof(Entry))),
629  (void*)old_, least * sizeof(Entry));
630  // Least is now at 0 so base must be increased
631  leastBase += least;
632  least = 0;
633  }
634  if (_recoveryMin != AMPS_UNSET_INDEX)
635  {
// Re-express _least and _recoveryMax as offsets from the relocated recovery
// minimum; these use the old _recoveryMin/_recoveryBase, so they must be
// computed before those two members are overwritten below.
636  _least = least + (_least + _leastBase) - (_recoveryMin + _recoveryBase);
637  _recoveryMax = least + (_recoveryMax + _recoveryMaxBase) -
638  (_recoveryMin + _recoveryBase);
639  _recoveryMaxBase = leastBase;
640  _recoveryMin = least;
641  _recoveryBase = leastBase;
642  }
643  else
644  {
645  _least = least;
646  }
647  _leastBase = leastBase;
648  // Current is now after everything and using the same base
649  _currentBase = _leastBase;
650  _current = least + _entriesLength;
651  }
652 
653  inline size_t getOldestBookmarkSeq()
654  {
655  Lock<Mutex> guard(_subLock);
656  // If there is nothing in the store, return -1, otherwise return lowest
657  return ((_least + _leastBase) == (_current + _currentBase)) ? AMPS_UNSET_INDEX :
658  _least + _leastBase;
659  }
660 
661  bool lastPersisted(const Message::Field& bookmark_)
662  {
663  // These shouldn't be persisted
664  if (bookmark_ == AMPS_BOOKMARK_NOW
665  || BookmarkRange::isRange(bookmark_))
666  {
667  return false;
668  }
669  Lock<Mutex> guard(_subLock);
670  return _setLastPersisted(bookmark_);
671  }
672 
673  bool _setLastPersisted(const Message::Field& bookmark_)
674  {
675  if (!_lastPersisted.empty())
676  {
677  amps_uint64_t publisher, publisher_lastPersisted;
678  amps_uint64_t sequence, sequence_lastPersisted;
679  parseBookmark(bookmark_, publisher, sequence);
680  parseBookmark(_lastPersisted, publisher_lastPersisted,
681  sequence_lastPersisted);
682  if (publisher == publisher_lastPersisted &&
683  sequence <= sequence_lastPersisted)
684  {
685  return false;
686  }
687  }
688  // deepCopy will clear what's in _lastPersisted
689  _lastPersisted.deepCopy(bookmark_);
690  _store->_recentChanged = true;
691  _recoveryTimestamp.clear();
692  return true;
693  }
694 
695  Message::Field lastPersisted(size_t bookmark_)
696  {
697  Lock<Mutex> guard(_subLock);
698  Message::Field& bookmark = _entries[bookmark_]._val;
699  // These shouldn't be persisted
700  if (bookmark == AMPS_BOOKMARK_NOW
701  || BookmarkRange::isRange(bookmark))
702  {
703  return bookmark;
704  }
705  _setLastPersisted(bookmark);
706  return bookmark;
707  }
708 
709  // Returns the index of the recovered item, either the index where it
710  // was first stored prior to getMostRecent, or the new index if it is
711  // relogged either because this is called from log() or because it was
712  // not active but also not persisted.
713  size_t recover(const Message::Field& bookmark_, bool relogIfNotDiscarded)
714  {
715  size_t retVal = AMPS_UNSET_INDEX;
716  if (_recovered.empty() || _recoveryBase == AMPS_UNSET_INDEX)
717  {
718  return retVal;
719  }
720  // Check if this is a recovered bookmark.
721  // If so, copy the existing one to the new location
722  RecoveryIterator item = _recovered.find(bookmark_);
723  if (item != _recovered.end())
724  {
725  size_t seqNo = item->second;
726  size_t index = (seqNo - _recoveryBase) % _entriesLength;
727  // If we only have recovery entries and isDiscarded is
728  // checking on an already discarded entry, update recent.
729  if (_least + _leastBase == _current + _currentBase &&
730  !_entries[index]._active)
731  {
732  _store->_recentChanged = true;
733  _recent.clear();
734  _recent = _entries[index]._val.deepCopy();
735  retVal = moveEntry(index);
736  if (retVal == AMPS_UNSET_INDEX)
737  {
738  recover(bookmark_, relogIfNotDiscarded);
739  }
740  _least = _current;
741  _leastBase = _currentBase;
742  }
743  else if (!_entries[index]._active || relogIfNotDiscarded)
744  {
745  retVal = moveEntry(index);
746  if (retVal == AMPS_UNSET_INDEX)
747  {
748  recover(bookmark_, relogIfNotDiscarded);
749  }
750  }
751  else
752  {
753  return index;
754  }
755  _recovered.erase(item);
756  if (_recovered.empty())
757  {
758  _recoveryMin = AMPS_UNSET_INDEX;
759  _recoveryBase = AMPS_UNSET_INDEX;
760  _recoveryMax = AMPS_UNSET_INDEX;
761  _recoveryMaxBase = AMPS_UNSET_INDEX;
762  }
763  else if (index == _recoveryMin)
764  {
765  while (_entries[_recoveryMin]._val.empty() &&
766  (_recoveryMin + _recoveryBase) < (_recoveryMax + _recoveryMaxBase))
767  {
768  if (++_recoveryMin == _entriesLength)
769  {
770  _recoveryMin = 0;
771  _recoveryBase += _entriesLength;
772  }
773  }
774  }
775  }
776  return retVal;
777  }
778 
779  // Return the id of this Subscription
780  Message::Field id() const
781  {
782  return _id;
783  }
784 
785  struct Entry
786  {
787  Message::Field _val; //16
788  bool _active; //17
789  char _padding[32 - sizeof(Message::Field) - sizeof(bool)]; //32
790 
791  Entry() : _active(false)
792  {
793  ;
794  }
795  };
796 
797  struct EntryHash
798  {
799  Field::FieldHash _hasher;
800 
801  size_t operator()(const Entry* entryPtr_) const
802  {
803  return _hasher(entryPtr_->_val);
804  }
805 
806  bool operator()(const Entry* lhsPtr_, const Entry* rhsPtr_) const
807  {
808  return _hasher(lhsPtr_->_val, rhsPtr_->_val);
809  }
810  };
811 
// List of pointers into the entries buffer, filled by getRecoveryEntries()
// and getActiveEntries(). The set-based alternative below is disabled.
812  //typedef std::set<Entry*, EntryHash> EntryPtrList;
813  typedef std::vector<Entry*> EntryPtrList;
814 
815  void getRecoveryEntries(EntryPtrList& list_)
816  {
817  if (_recoveryMin == AMPS_UNSET_INDEX ||
818  _recoveryMax == AMPS_UNSET_INDEX)
819  {
820  return;
821  }
822  size_t base = _recoveryBase;
823  size_t max = _recoveryMax + _recoveryMaxBase;
824  for (size_t i = _recoveryMin; i + base < max; ++i)
825  {
826  if (i == _entriesLength)
827  {
828  i = 0;
829  base = _recoveryMaxBase;
830  }
831  //list_.insert(&(_entries[i]));
832  list_.push_back(&(_entries[i]));
833  }
834  return;
835  }
836 
837  void getActiveEntries(EntryPtrList& list_)
838  {
839  size_t base = _leastBase;
840  for (size_t i = _least; i + base < _current + _currentBase; ++i)
841  {
842  if (i >= _entriesLength)
843  {
844  i = 0;
845  base = _currentBase;
846  }
847  //list_.insert(&(_entries[i]));
848  list_.push_back(&(_entries[i]));
849  }
850  return;
851  }
852 
853  Entry* getEntryByIndex(size_t index_)
854  {
855  Lock<Mutex> guard(_subLock);
856  size_t base = (_recoveryBase == AMPS_UNSET_INDEX ||
857  index_ >= _least + _leastBase)
858  ? _leastBase : _recoveryBase;
859  // Return NULL if not a valid index
860  size_t min = (_recoveryMin == AMPS_UNSET_INDEX ?
861  _least + _leastBase :
862  _recoveryMin + _recoveryBase);
863  if (index_ >= _current + _currentBase || index_ < min)
864  {
865  return NULL;
866  }
867  return &(_entries[(index_ - base) % _entriesLength]);
868  }
869 
870  void justRecovered()
871  {
872  Lock<Mutex> guard(_subLock);
873  _updateMostRecent();
874  EntryPtrList list;
875  getRecoveryEntries(list);
876  setPublishersToDiscarded(&list, &_publishers);
877  }
878 
879  void setPublishersToDiscarded(EntryPtrList* recovered_,
880  PublisherMap* publishers_)
881  {
882  // Need to reset publishers to only have up to the last
883  // discarded sequence number. Messages that were in transit
884  // during previous run but not discarded should be considered
885  // new and not duplicate after a restart/recovery.
886  for (EntryPtrList::iterator i = recovered_->begin();
887  i != recovered_->end(); ++i)
888  {
889  if ((*i)->_val.empty())
890  {
891  continue;
892  }
893  amps_uint64_t publisher = (amps_uint64_t)0;
894  amps_uint64_t sequence = (amps_uint64_t)0;
895  parseBookmark((*i)->_val, publisher, sequence);
896  if (publisher && sequence && (*i)->_active &&
897  (*publishers_)[publisher] >= sequence)
898  {
899  (*publishers_)[publisher] = sequence - 1;
900  }
901  }
902  }
903 
904  void clearLastPersisted()
905  {
906  Lock<Mutex> guard(_subLock);
907  _lastPersisted.clear();
908  }
909 
910  void setLastPersistedToEpoch()
911  {
912  Lock<Mutex> guard(_subLock);
913  _setLastPersistedToEpoch();
914  }
915 
916  private:
// Copy construction and copy assignment are declared private and no
// definition appears in this listing.
// NOTE(review): presumably the pre-C++11 idiom for making the class
// non-copyable - confirm that no definition exists elsewhere.
917  Subscription(const Subscription&);
918  Subscription& operator=(const Subscription&);
919 
920  size_t moveEntry(size_t index_)
921  {
922  // Check for wrap
923  if (_current >= _entriesLength)
924  {
925  _current = 0;
926  _currentBase += _entriesLength;
927  }
928  // Check for resize
929  // If list is too small, double it
930  if ((_current == _least % _entriesLength &&
931  _leastBase < _currentBase) ||
932  (_current == _recoveryMin && _recoveryBase < _currentBase))
933  {
934  if (!_store->resize(_id, (char**)&_entries,
935  sizeof(Entry) * _entriesLength * 2))
936  {
937  return AMPS_UNSET_INDEX;
938  }
939  // Length was doubled
940  _entriesLength *= 2;
941  }
942  _entries[_current]._val = _entries[index_]._val;
943  _entries[_current]._active = _entries[index_]._active;
944  // No need to clear Field, just set it to empty
945  _entries[index_]._val.assign(NULL, 0);
946  _entries[index_]._active = false;
947  return _current++;
948  }
949 
950  void _setLastPersistedToEpoch()
951  {
952  size_t fieldLen = strlen(AMPS_BOOKMARK_EPOCH);
953  char* field = new char[fieldLen];
954  memcpy(field, AMPS_BOOKMARK_EPOCH, fieldLen);
955  _lastPersisted.clear();
956  _lastPersisted.assign(field, fieldLen);
957  }
958 
// Marks the entry with sequence index index_ as discarded (inactive).
// When that entry is the oldest one of the recovery region and/or of the
// active region, every leading inactive entry of that region is released and
// the region's start is advanced past them.
// Returns true when _recent was replaced as a result, false otherwise
// (including when index_ is outside the tracked range).
// The caller must already hold _subLock.
959  bool _discard(size_t index_)
960  {
961  bool retVal = false;
962  // Lock should already be held
963  assert((_recoveryBase == AMPS_UNSET_INDEX && _recoveryMin == AMPS_UNSET_INDEX) ||
964  (_recoveryBase != AMPS_UNSET_INDEX && _recoveryMin != AMPS_UNSET_INDEX));
// Indexes at or after the active start are offset by _leastBase; older ones
// belong to the recovery region and are offset by _recoveryBase.
965  size_t base = (_recoveryBase == AMPS_UNSET_INDEX
966  || index_ >= _least + _leastBase)
967  ? _leastBase : _recoveryBase;
968  // discard of a record not in the log is a no-op
969  size_t min = (_recoveryMin == AMPS_UNSET_INDEX ? _least + _leastBase :
970  _recoveryMin + _recoveryBase);
971  if (index_ >= _current + _currentBase || index_ < min)
972  {
973  return retVal;
974  }
975 
976  // log that this one is discarded, then
977  // recalculate what the most recent entry is.
978  Entry& e = _entries[(index_ - base) % _entriesLength];
979  e._active = false;
980 
981  size_t index = index_;
982  if (_recoveryMin != AMPS_UNSET_INDEX &&
983  index_ == _recoveryMin + _recoveryBase)
984  {
985  // Find all to discard
986  size_t j = _recoveryMin;
987  while (j + _recoveryBase < _recoveryMax + _recoveryMaxBase &&
988  !_entries[j]._active)
989  {
990  // This index might be left-over from a slow discard and we
991  // may have reconnected. We have a few possibilities at this point.
992  // 1. If we re-logged this bookmark, this index will point at an
993  // empty bookmark. This could happen if the discard thread was slow
994  // and the reconnect was fast. We wouldn't report
995  // the re-arrival of the bookmark as a duplicate because it
996  // hadn't been marked as discarded. In this case, we have to
997  // simply move past this in the recovery area.
998  // 2. This bookmark should become _recent because we haven't
999  // yet received anything since our last call to getMostRecent.
1000  // In this case, we need to take it out of recovered but not
1001  // clear it. The publishers map should report it as duplicate.
1002  // 3. This is the 'oldest' recovered, but we have received new
1003  // bookmarks since we got this one. We can clear it because the
1004  // publishers map should report it as a duplicate if/when it
1005  // does arrive again. Move the _recoveryMin ahead and remove it
1006  // from recovered.
1007  Message::Field& bookmark = _entries[j]._val;
1008  // Option 1 skips this and just moves on
1009  if (!bookmark.empty())
1010  {
1011  _recovered.erase(bookmark);
1012  // Make sure our publishers map will mark it discarded
1013  amps_uint64_t publisher, sequence;
1014  parseBookmark(bookmark, publisher, sequence);
1015  PublisherIterator pub = _publishers.find(publisher);
1016  if (pub == _publishers.end() || pub->second < sequence)
1017  {
1018  _publishers[publisher] = sequence;
1019  }
1020  if (_least + _leastBase == _current + _currentBase ||
1021  ((_least + _leastBase) % _entriesLength) ==
1022  ((_recoveryMin + _recoveryBase + 1)) % _entriesLength)
1023  {
1024  // Option 2, reset recent
1025  retVal = true;
1026  _store->_recentChanged = true;
1027  _recoveryTimestamp.clear();
1028  _recent.clear();
// _recent takes over the bookmark's memory, so the slot is only reset to
// empty here instead of being cleared.
1029  _recent = bookmark;
1030  bookmark.assign(NULL, 0);
1031  }
1032  else
1033  {
1034  // Option 3, simply clear this one
1035  bookmark.clear();
1036  }
1037  }
1038  // If we reach the buffer end,
1039  // keep checking from the beginning
1040  if (++j == _entriesLength)
1041  {
1042  // The recovery minimum has now looped around
1043  _recoveryBase += _entriesLength;
1044  j = 0;
1045  }
1046  }
1047  assert(j + _recoveryBase != _recoveryMax + _recoveryMaxBase ||
1048  _recovered.empty());
1049  if (_recovered.empty())
1050  {
1051  _recoveryMin = AMPS_UNSET_INDEX;
1052  _recoveryBase = AMPS_UNSET_INDEX;
1053  _recoveryMax = AMPS_UNSET_INDEX;
1054  _recoveryMaxBase = AMPS_UNSET_INDEX;
1055  // Cleared recovered, want to check onward
1056  index = _least + _leastBase;
1057  }
1058  else
1059  {
1060  _recoveryMin = j;
1061  }
1062  }
1063  // if this is the first item in the list, discard all inactive ones
1064  // as long as recovery also says its okay
1065  if (index == _least + _leastBase)
1066  {
1067  // Find all to discard
1068  size_t j = _least;
1069  while (j + _leastBase < _current + _currentBase &&
1070  !_entries[j]._active)
1071  {
1072  //Must free associated memory
// The previous _recent is released; the discarded bookmark's memory then
// moves to _recent and the slot is reset to empty.
1073  _recent.clear();
1074  _recent = _entries[j]._val;
1075  _entries[j]._val.assign(NULL, 0);
1076  _store->_recentChanged = true;
1077  retVal = true;
1078  _recoveryTimestamp.clear();
1079  // If we reach the buffer end,
1080  // keep checking from the beginning
1081  if (++j == _entriesLength)
1082  {
1083  // Least has now looped around
1084  _leastBase += _entriesLength;
1085  j = 0;
1086  }
1087  }
1088  _least = j;
1089  }
1090  return retVal;
1091  }
1092 
// Turns everything currently stored (old recovery region plus active region)
// into the new recovery region: _recovered is rebuilt to map every non-empty
// bookmark to its sequence index, the recovery bounds are reset to span from
// the first non-empty slot up to _current, and the active region is made
// empty by moving _least up to _current.
// The caller must already hold _subLock.
1093  void _updateMostRecent()
1094  {
1095  // Lock is already held
1096  _recovered.clear();
1097  assert((_recoveryBase == AMPS_UNSET_INDEX && _recoveryMin == AMPS_UNSET_INDEX) ||
1098  (_recoveryBase != AMPS_UNSET_INDEX && _recoveryMin != AMPS_UNSET_INDEX));
// Start from the old recovery minimum when there is one, else from _least.
// Both values must be captured before the recovery bounds are reset below.
1099  size_t base = (_recoveryMin == AMPS_UNSET_INDEX) ? _leastBase : _recoveryBase;
1100  size_t start = (_recoveryMin == AMPS_UNSET_INDEX) ? _least : _recoveryMin;
1101  _recoveryMin = AMPS_UNSET_INDEX;
1102  _recoveryBase = AMPS_UNSET_INDEX;
1103  _recoveryMax = AMPS_UNSET_INDEX;
1104  _recoveryMaxBase = AMPS_UNSET_INDEX;
1105  for (size_t i = start; i + base < _current + _currentBase; i++)
1106  {
1107  if ( i >= _entriesLength )
1108  {
1109  i = 0;
1110  base = _currentBase;
1111  }
// NOTE(review): this compares the bare slot number i against base-adjusted
// sums, and the recovery members were reset above until the first non-empty
// slot is found - presumably meant to skip the gap between the old recovery
// region and the active region; confirm the intent.
1112  if (i >= _recoveryMax + _recoveryBase && i < _least + _leastBase)
1113  {
1114  continue;
1115  }
1116  Entry& entry = _entries[i];
1117  if (!entry._val.empty())
1118  {
1119  _recovered[entry._val] = i + base;
// The first non-empty slot fixes the bounds of the new recovery region.
1120  if (_recoveryMin == AMPS_UNSET_INDEX)
1121  {
1122  _recoveryMin = i;
1123  _recoveryBase = base;
1124  _recoveryMax = _current;
1125  _recoveryMaxBase = _currentBase;
1126  }
1127  }
1128  }
1129  if (_current == _entriesLength)
1130  {
1131  _current = 0;
1132  _currentBase += _entriesLength;
1133  }
// The active region is now empty: it starts where the next write will go.
1134  _least = _current;
1135  _leastBase = _currentBase;
1136  }
1137 
1138  Message::Field _id;
1139  Message::Field _recent;
1140  Message::Field _lastPersisted;
1141  Message::Field _recentList;
1142  BookmarkRange _range;
1143  Message::Field _recoveryTimestamp;
1144  size_t _current;
1145  size_t _currentBase;
1146  size_t _least;
1147  size_t _leastBase;
1148  size_t _recoveryMin;
1149  size_t _recoveryBase;
1150  size_t _recoveryMax;
1151  size_t _recoveryMaxBase;
1152  size_t _entriesLength;
1153  Entry* _entries;
1154  MemoryBookmarkStore* _store;
1155  Mutex _subLock;
1156  RecoveryMap _recovered;
1157  public:
1158  PublisherMap _publishers;
1159  };
1160 
1161  public:
1165  _subsLock(),
1166  _lock(),
1167  _serverVersion(AMPS_DEFAULT_MIN_VERSION),
1168  _recentChanged(true),
1169  _recovering(false),
1170  _recoveryPointAdapter(NULL),
1171  _recoveryPointFactory(NULL)
1172  { ; }
1173 
1174  typedef RecoveryPointAdapter::iterator RecoveryIterator;
1175 
1183  RecoveryPointFactory factory_ = NULL)
1184  : BookmarkStoreImpl()
1185  , _subsLock()
1186  , _lock()
1187  , _serverVersion(AMPS_DEFAULT_MIN_VERSION)
1188  , _recentChanged(true)
1189  , _recovering(true)
1190  , _recoveryPointAdapter(adapter_)
1191  , _recoveryPointFactory(factory_)
1192  {
1193  Message msg;
1194  if (!_recoveryPointFactory)
1195  {
1196  _recoveryPointFactory = &FixedRecoveryPoint::create;
1197  }
1198  for (RecoveryIterator recoveryPoint = _recoveryPointAdapter.begin();
1199  recoveryPoint != _recoveryPointAdapter.end();
1200  ++recoveryPoint)
1201  {
1202  Field subId(recoveryPoint->getSubId());
1203  msg.setSubscriptionHandle(static_cast<amps_subscription_handle>(0));
1204  msg.setSubId(subId);
1205  Field bookmark = recoveryPoint->getBookmark();
1206  if (BookmarkRange::isRange(bookmark))
1207  {
1208  msg.setBookmark(bookmark);
1209  _log(msg);
1210  }
1211  else
1212  {
1213  std::vector<Field> bmList = Field::parseBookmarkList(bookmark);
1214  for (std::vector<Field>::iterator bkmk = bmList.begin(); bkmk != bmList.end(); ++bkmk)
1215  {
1216  if (Field::isTimestamp(*bkmk))
1217  {
1218  find(subId)->setRecoveryTimestamp(bkmk->data(), bkmk->len());
1219  }
1220  else
1221  {
1222  msg.assignBookmark(bkmk->data(), bkmk->len());
1223  _isDiscarded(msg);
1224  _log(msg);
1225  _discard(msg);
1226  }
1227  }
1228  // Reset to original bookmark
1229  msg.setBookmark(bookmark);
1230  }
1231  }
1232  _recovering = false;
1233  }
1234 
1235  virtual ~MemoryBookmarkStore()
1236  {
1237  if (_recoveryPointAdapter.isValid())
1238  {
1239  _recoveryPointAdapter.close();
1240  }
1241  __purge();
1242  }
1243 
1249  virtual size_t log(Message& message_)
1250  {
1251  Lock<Mutex> guard(_lock);
1252  return _log(message_);
1253  }
1254 
1260  virtual void discard(const Message& message_)
1261  {
1262  Lock<Mutex> guard(_lock);
1263  (void)_discard(message_);
1264  }
1265 
1273  virtual void discard(const Message::Field& subId_, size_t bookmarkSeqNo_)
1274  {
1275  Lock<Mutex> guard(_lock);
1276  (void)_discard(subId_, bookmarkSeqNo_);
1277  }
1278 
1285  {
1286  Lock<Mutex> guard(_lock);
1287  return _getMostRecent(subId_);
1288  }
1289 
1298  virtual bool isDiscarded(Message& message_)
1299  {
1300  Lock<Mutex> guard(_lock);
1301  return _isDiscarded(message_);
1302  }
1303 
1309  virtual void purge()
1310  {
1311  Lock<Mutex> guard(_lock);
1312  _purge();
1313  }
1314 
1320  virtual void purge(const Message::Field& subId_)
1321  {
1322  Lock<Mutex> guard(_lock);
1323  _purge(subId_);
1324  }
1325 
1330  virtual size_t getOldestBookmarkSeq(const Message::Field& subId_)
1331  {
1332  Lock<Mutex> guard(_lock);
1333  return _getOldestBookmarkSeq(subId_);
1334  }
1335 
1341  virtual void persisted(const Message::Field& subId_,
1342  const Message::Field& bookmark_)
1343  {
1344  Lock<Mutex> guard(_lock);
1345  _persisted(find(subId_), bookmark_);
1346  }
1347 
1354  virtual Message::Field persisted(const Message::Field& subId_,
1355  size_t bookmark_)
1356  {
1357  Lock<Mutex> guard(_lock);
1358  return _persisted(find(subId_), bookmark_);
1359  }
1360 
1365  void setServerVersion(const VersionInfo& version_)
1366  {
1367  setServerVersion(version_.getOldStyleVersion());
1368  }
1369 
1374  void setServerVersion(size_t version_)
1375  {
1376  Lock<Mutex> guard(_subsLock);
1377  _serverVersion = version_;
1378  }
1379 
1380  inline bool isWritableBookmark(size_t length)
1381  {
1382  return length >= AMPS_MIN_BOOKMARK_LEN;
1383  }
1384 
1385  typedef Subscription::EntryPtrList EntryPtrList;
1386 
1387  protected:
1388 
1389  // Called once lock is acquired
1390  size_t _log(Message& message_)
1391  {
1392  Message::Field bookmark = message_.getBookmark();
1393  Subscription* pSub = (Subscription*)(message_.getSubscriptionHandle());
1394  if (!pSub)
1395  {
1396  Message::Field subId = message_.getSubscriptionId();
1397  if (subId.empty())
1398  {
1399  subId = message_.getSubscriptionIds();
1400  }
1401  pSub = find(subId);
1402  message_.setSubscriptionHandle(
1403  static_cast<amps_subscription_handle>(pSub));
1404  }
1405  size_t retVal = pSub->log(bookmark);
1406  message_.setBookmarkSeqNo(retVal);
1407  return retVal;
1408  }
1409 
1410  // Called once lock is acquired, or from ctor
1411  bool _discard(const Message& message_)
1412  {
1413  size_t bookmarkSeqNo = message_.getBookmarkSeqNo();
1414  Subscription* pSub = (Subscription*)(message_.getSubscriptionHandle());
1415  if (!pSub)
1416  {
1417  Message::Field subId = message_.getSubscriptionId();
1418  if (subId.empty())
1419  {
1420  subId = message_.getSubscriptionIds();
1421  }
1422  pSub = find(subId);
1423  }
1424  bool retVal = pSub->discard(bookmarkSeqNo);
1425  if (retVal)
1426  {
1427  updateAdapter(pSub);
1428  }
1429  return retVal;
1430  }
1431 
1432  // Called once lock is acquired
1433  bool _discard(const Message::Field& subId_, size_t bookmarkSeqNo_)
1434  {
1435  Subscription* pSub = find(subId_);
1436  bool retVal = pSub->discard(bookmarkSeqNo_);
1437  if (retVal)
1438  {
1439  updateAdapter(pSub);
1440  }
1441  return retVal;
1442  }
1443 
1444  // Called once lock is acquired
1445  Message::Field _getMostRecent(const Message::Field& subId_,
1446  bool usePublishersList_ = true)
1447  {
1448  Subscription* pSub = find(subId_);
1449  return pSub->getMostRecentList(usePublishersList_);
1450  }
1451 
1452  // Called once lock is acquired
1453  bool _isDiscarded(Message& message_)
1454  {
1455  Message::Field subId = message_.getSubscriptionId();
1456  if (subId.empty())
1457  {
1458  subId = message_.getSubscriptionIds();
1459  }
1460  Subscription* pSub = find(subId);
1461  message_.setSubscriptionHandle(
1462  static_cast<amps_subscription_handle>(pSub));
1463  return pSub->isDiscarded(message_.getBookmark());
1464  }
1465 
1466  // Called once lock is acquired
1467  size_t _getOldestBookmarkSeq(const Message::Field& subId_)
1468  {
1469  Subscription* pSub = find(subId_);
1470  return pSub->getOldestBookmarkSeq();
1471  }
1472 
1473  // Called once lock is acquired
1474  virtual void _persisted(Subscription* pSub_,
1475  const Message::Field& bookmark_)
1476  {
1477  if (pSub_->lastPersisted(bookmark_))
1478  {
1479  updateAdapter(pSub_);
1480  }
1481  }
1482 
1483  // Called once lock is acquired
1484  virtual Message::Field _persisted(Subscription* pSub_, size_t bookmark_)
1485  {
1486  return pSub_->lastPersisted(bookmark_);
1487  }
1488 
1489  // Called once lock is acquired
1490  void _purge()
1491  {
1492  if (_recoveryPointAdapter.isValid())
1493  {
1494  _recoveryPointAdapter.purge();
1495  }
1496  __purge();
1497  }
1498 
1499  // Called once lock is acquired
1500  void __purge()
1501  {
1502  // Walk through list and clear Fields before calling clear
1503  while (!_subs.empty())
1504  {
1505  SubscriptionMap::iterator iter = _subs.begin();
1506  //The subId key is cleared when deleting the Subscription, which shares
1507  //the _data pointer in its id field.
1508  const_cast<Message::Field&>(iter->first).clear();
1509  delete (iter->second);
1510  _subs.erase(iter);
1511  }
1512  _subs.clear();
1513  }
1514 
1515  // Called once lock is acquired
1516  virtual void _purge(const Message::Field& subId_)
1517  {
1518  if (_recoveryPointAdapter.isValid())
1519  {
1520  _recoveryPointAdapter.purge(subId_);
1521  }
1522  __purge(subId_);
1523  }
1524 
1525  // Called once lock is acquired
1526  virtual void __purge(const Message::Field& subId_)
1527  {
1528  Lock<Mutex> guard(_subsLock);
1529  SubscriptionMap::iterator iter = _subs.find(subId_);
1530  if (iter == _subs.end())
1531  {
1532  return;
1533  }
1534  const_cast<Message::Field&>(iter->first).clear();
1535  delete (iter->second);
1536  _subs.erase(iter);
1537  }
1538 
1539  // Can be used by subclasses during recovery
1540  void setMostRecent(const Message::Field& subId_,
1541  const Message::Field& recent_)
1542  {
1543  find(subId_)->setMostRecent(recent_);
1544  }
1545 
1546  Mutex _subsLock; // Held by find(), __purge(subId_) and setServerVersion(size_t)
1547  Mutex _lock; // Store lock taken by the public entry points; released temporarily in updateAdapter()
1548  static const char ENTRY_BOOKMARK = 'b'; // NOTE(review): ENTRY_* are not referenced in the visible part of this file;
1549  static const char ENTRY_DISCARD = 'd'; // presumably record-type tags for derived stores - TODO confirm
1550  static const char ENTRY_PERSISTED = 'p';
1551 
1552  virtual Subscription* find(const Message::Field& subId_)
1553  {
1554  if (subId_.empty())
1555  {
1556  throw StoreException("A valid subscription ID must be provided to the Bookmark Store");
1557  }
1558  Lock<Mutex> guard(_subsLock);
1559  if (_subs.count(subId_) == 0)
1560  {
1561  // Subscription will be created
1562  Message::Field id;
1563  id.deepCopy(subId_);
1564  _subs[id] = new Subscription(this, id);
1565  return _subs[id];
1566  }
1567  return _subs[subId_];
1568  }
1569 
1570  virtual bool resize(const Message::Field& subId_, char** newBuffer_, size_t size_,
1571  bool callResizeHandler_ = true)
1572  {
1573  assert(newBuffer_ != 0);
1574  if (size_ == 0) // Delete the buffer
1575  {
1576  if (*newBuffer_)
1577  {
1578  free(*newBuffer_);
1579  *newBuffer_ = NULL;
1580  }
1581  return true;
1582  }
1583  if (callResizeHandler_ && !callResizeHandler(subId_, size_))
1584  {
1585  return false;
1586  }
1587  char* oldBuffer = *newBuffer_ ? *newBuffer_ : NULL;
1588  *newBuffer_ = (char*)malloc(size_);
1589  memset(*newBuffer_, 0, size_);
1590  if (oldBuffer)
1591  {
1592  find(subId_)->moveEntries(oldBuffer, *newBuffer_, size_);
1593  free(oldBuffer);
1594  }
1595  return true;
1596  }
1597 
1598  protected:
1599  void updateAdapter(Subscription* pSub_)
1600  {
1601  if (_recovering || !_recentChanged || !_recoveryPointAdapter.isValid())
1602  {
1603  return;
1604  }
1605  // We have to make a copy here, since the return from getMostRecentList
1606  // could get changed once the lock is released.
1607  RecoveryPoint update = _recoveryPointFactory(pSub_->id(),
1608  pSub_->getMostRecentList(false)).deepCopy();
1609  Unlock<Mutex> unlock(_lock);
1610  _recoveryPointAdapter.update(update);
1611  // Free memmory of the copy, adapter should have copies of the fields
1612  update.clear();
1613  }
1614 
1615  typedef std::map<Message::Field, Subscription*, Message::Field::FieldHash> SubscriptionMap; // Sub id -> Subscription
1616  SubscriptionMap _subs; // Subscriptions created by find() and deleted by __purge()
1617  size_t _serverVersion; // Initialised to AMPS_DEFAULT_MIN_VERSION, changed by setServerVersion()
1618  bool _recentChanged; // Set to true by Subscription when its most recent bookmark changes; checked in updateAdapter()
1619  bool _recovering; // True while the adapter constructor replays recovery points; suppresses updateAdapter()
1620  typedef std::set<Subscription*> SubscriptionSet; // NOTE(review): not used in the visible part of this file
1621  RecoveryPointAdapter _recoveryPointAdapter; // External storage for recovery points; only used when isValid()
1622  RecoveryPointFactory _recoveryPointFactory; // Builds the RecoveryPoint that is sent to the adapter
1623  };
1624 
1625 } // end namespace AMPS
1626 
1627 #endif //_MEMORYBOOKMARKSTORE_H_
1628 
Defines the AMPS::Message class and related classes.
Abstract base class for storing received bookmarks for HA clients.
Definition: BookmarkStore.hpp:77
void clear()
Clear the internal state, possibly reclaiming memory.
Definition: RecoveryPoint.hpp:109
Field getSubscriptionId() const
Retrieves the value of the SubscriptionId header of the Message as a new Field.
Definition: Message.hpp:1427
virtual void purge()
Called to purge the contents of this store.
Definition: MemoryBookmarkStore.hpp:1309
virtual void discard(const Message::Field &subId_, size_t bookmarkSeqNo_)
Log a discard-bookmark entry to the persistent log based on a bookmark sequence number.
Definition: MemoryBookmarkStore.hpp:1273
virtual Message::Field getMostRecent(const Message::Field &subId_)
Returns the most recent bookmark from the log that ought to be used for (re-)subscriptions.
Definition: MemoryBookmarkStore.hpp:1284
virtual Message::Field persisted(const Message::Field &subId_, size_t bookmark_)
Mark the bookmark provided as replicated to all sync replication destinations for the given subscript...
Definition: MemoryBookmarkStore.hpp:1354
Message encapsulates a single message sent to or received from an AMPS server, and provides methods f...
Definition: Message.hpp:531
void clear()
Deletes the data associated with this Field, should only be used on Fields that were created as deepC...
Definition: Field.hpp:247
MemoryBookmarkStore(const RecoveryPointAdapter &adapter_, RecoveryPointFactory factory_=NULL)
Creates a MemoryBookmarkStore.
Definition: MemoryBookmarkStore.hpp:1182
RecoveryPointAdapter a handle class for implementing external storage of subscription recovery points...
Definition: RecoveryPointAdapter.hpp:77
MemoryBookmarkStore()
Creates a MemoryBookmarkStore.
Definition: MemoryBookmarkStore.hpp:1164
Provides access to the subId and bookmark needed to restart a subscription.
Definition: RecoveryPoint.hpp:67
virtual void persisted(const Message::Field &subId_, const Message::Field &bookmark_)
Mark the bookmark provided as replicated to all sync replication destinations for the given subscript...
Definition: MemoryBookmarkStore.hpp:1341
Message & assignBookmark(const std::string &v)
Sets the value of the Bookmark header for this Message.
Definition: Message.hpp:1194
void setServerVersion(size_t version_)
Internally used to set the server version so the store knows how to deal with persisted acks and call...
Definition: MemoryBookmarkStore.hpp:1374
Field getSubscriptionIds() const
Retrieves the value of the SubscriptionIds header of the Message as a new Field.
Definition: Message.hpp:1428
bool empty() const
Returns 'true' if empty, 'false' otherwise.
Definition: Field.hpp:128
Defines the AMPS::Field class, which represents the value of a field in a message.
#define AMPS_BOOKMARK_EPOCH
Start the subscription at the beginning of the journal.
Definition: BookmarkStore.hpp:51
virtual bool isDiscarded(Message &message_)
Called for each arriving message to determine if the application has already seen this bookmark and s...
Definition: MemoryBookmarkStore.hpp:1298
Provides AMPS::RecoveryPointAdapter, an interface for implementing external storage of bookmark subscr...
void setServerVersion(const VersionInfo &version_)
Internally used to set the server version so the store knows how to deal with persisted acks and call...
Definition: MemoryBookmarkStore.hpp:1365
virtual void discard(const Message &message_)
Log a discard-bookmark entry to the persistent log based on a bookmark sequence number.
Definition: MemoryBookmarkStore.hpp:1260
#define AMPS_BOOKMARK_NOW
Start the subscription at the point in time when AMPS processes the subscription. ...
Definition: BookmarkStore.hpp:55
Message & setSubId(const std::string &v)
Sets the value of the SubscriptionId header for this Message.
Definition: Message.hpp:1427
RecoveryPoint(* RecoveryPointFactory)(const Field &subId_, const Field &bookmark_)
RecoveryPointFactory is a function type for producing a RecoveryPoint that is sent to a RecoveryPoint...
Definition: RecoveryPoint.hpp:126
Provides AMPS::RecoveryPoint, AMPS::RecoveryPointFactory, AMPS::FixedRecoveryPoint, and AMPS::DynamicRecoveryPoint.
A BookmarkStoreImpl implementation that stores bookmarks in memory.
Definition: MemoryBookmarkStore.hpp:57
virtual size_t log(Message &message_)
Log a bookmark to the persistent log and return the corresponding sequence number for this bookmark...
Definition: MemoryBookmarkStore.hpp:1249
Field represents the value of a single field in a Message.
Definition: Field.hpp:86
static RecoveryPoint create(const Field &subId_, const Field &bookmark_)
Use this function in BookmarkStore::setRecoveryPointFactory( std::bind(&FixedRecoveryPoint::create, std::placeholders::_1, std::placeholders::_2))
Definition: RecoveryPoint.hpp:139
virtual size_t getOldestBookmarkSeq(const Message::Field &subId_)
Called to find the oldest bookmark in the store.
Definition: MemoryBookmarkStore.hpp:1330
virtual void purge(const Message::Field &subId_)
Called to purge the contents of this store for particular subId.
Definition: MemoryBookmarkStore.hpp:1320
Message & setBookmark(const std::string &v)
Sets the value of the Bookmark header for this Message.
Definition: Message.hpp:1194
void deepCopy(const Field &orig_)
Makes self a deep copy of the original field.
Definition: Field.hpp:218
Definition: ampsplusplus.hpp:102
Field getBookmark() const
Retrieves the value of the Bookmark header of the Message as a new Field.
Definition: Message.hpp:1194