AMPS C/C++ Client Class Reference
AMPS C/C++ Client Version 5.3.4.4
MemoryBookmarkStore.hpp
Go to the documentation of this file.
1 //
3 // Copyright (c) 2010-2024 60East Technologies Inc., All Rights Reserved.
4 //
5 // This computer software is owned by 60East Technologies Inc. and is
6 // protected by U.S. copyright laws and other laws and by international
7 // treaties. This computer software is furnished by 60East Technologies
8 // Inc. pursuant to a written license agreement and may be used, copied,
9 // transmitted, and stored only in accordance with the terms of such
10 // license agreement and with the inclusion of the above copyright notice.
11 // This computer software or any other copies thereof may not be provided
12 // or otherwise made available to any other person.
13 //
14 // U.S. Government Restricted Rights. This computer software: (a) was
15 // developed at private expense and is in all respects the proprietary
16 // information of 60East Technologies Inc.; (b) was not developed with
17 // government funds; (c) is a trade secret of 60East Technologies Inc.
18 // for all purposes of the Freedom of Information Act; and (d) is a
19 // commercial item and thus, pursuant to Section 12.212 of the Federal
20 // Acquisition Regulations (FAR) and DFAR Supplement Section 227.7202,
21 // Government's use, duplication or disclosure of the computer software
22 // is subject to the restrictions set forth by 60East Technologies Inc..
23 //
25 
26 #ifndef _MEMORYBOOKMARKSTORE_H_
27 #define _MEMORYBOOKMARKSTORE_H_
28 
29 #include <amps/BookmarkStore.hpp>
30 #include <amps/Field.hpp>
31 #include <amps/Message.hpp>
32 #include <amps/RecoveryPoint.hpp>
34 #include <functional>
35 #include <map>
36 #include <sstream>
37 #include <vector>
38 #include <stdlib.h>
39 #include <assert.h>
40 
41 #define AMPS_MIN_BOOKMARK_LEN 3
42 #define AMPS_INITIAL_MEMORY_BOOKMARK_SIZE 16384UL
43 
48 
49 namespace AMPS
50 {
51 
58  {
59  protected:
60  class Subscription
61  {
62  public:
63  typedef std::map<Message::Field, size_t, Message::Field::FieldHash> RecoveryMap;
64  typedef std::map<amps_uint64_t, amps_uint64_t> PublisherMap;
65  typedef std::map<Message::Field, size_t, Message::Field::FieldHash>::iterator
66  RecoveryIterator;
67  typedef std::map<amps_uint64_t, amps_uint64_t>::iterator PublisherIterator;
68 
69  // Start sequence at 1 so that 0 can be used during file subclasses
70  // recovery as an indicator that a message wasn't logged because
71  // isDiscarded() was true.
72  Subscription(MemoryBookmarkStore* store_, const Message::Field& id_)
73  : _current(1), _currentBase(0), _least(1), _leastBase(0)
74  , _recoveryMin(AMPS_UNSET_INDEX), _recoveryBase(AMPS_UNSET_INDEX)
75  , _recoveryMax(AMPS_UNSET_INDEX), _recoveryMaxBase(AMPS_UNSET_INDEX)
76  , _entriesLength(AMPS_INITIAL_MEMORY_BOOKMARK_SIZE), _entries(NULL)
77  , _store(store_)
78  {
79  // Need our own memory for the sub id
80  _id.deepCopy(id_);
81  _store->resize(_id, (char**)&_entries,
82  sizeof(Entry)*AMPS_INITIAL_MEMORY_BOOKMARK_SIZE, false);
83  setLastPersistedToEpoch();
84  }
85 
86  ~Subscription()
87  {
88  Lock<Mutex> guard(_subLock);
89  if (_entries)
90  {
91  for (size_t i = 0; i < _entriesLength; ++i)
92  {
93  _entries[i]._val.clear();
94  }
95  // resize to 0 will free _entries
96  _store->resize(_id, (char**)&_entries, 0);
97  }
98  _id.clear();
99  _recent.clear();
100  _lastPersisted.clear();
101  _recentList.clear();
102  _range.clear();
103  _recoveryTimestamp.clear();
104  }
105 
// Records bookmark_ in this subscription's entry log.
// Returns the position of the entry (slot index plus _currentBase).
// Returns 0 without storing anything when bookmark_ is AMPS_BOOKMARK_NOW,
// when it is a bookmark range, or when it cannot be parsed as a bookmark
// or timestamp.
// Throws CommandException when a range fails the stricter validity check.
// A bookmark already present in the recovery map is re-logged through
// recover() instead of being copied a second time.
106  size_t log(const Message::Field& bookmark_)
107  {
108  if (bookmark_ == AMPS_BOOKMARK_NOW)
109  {
110  return 0;
111  }
112  Lock<Mutex> guard(_subLock);
113  // Either relog the recovery or log it
114  size_t index = recover(bookmark_, true);
115  if (index == AMPS_UNSET_INDEX)
116  {
117  // Check for wrap
118  if (_current >= _entriesLength)
119  {
120  _current = 0;
121  _currentBase += _entriesLength;
122  }
123  // Check for resize
124  // If list is too small, double it
// The write slot has caught up with the oldest live slot (active or
// recovery) from a previous lap, so the buffer is full.
125  if ((_current == _least && _leastBase < _currentBase) ||
126  (_current == _recoveryMin && _recoveryBase < _currentBase))
127  {
128  if (!_store->resize(_id, (char**)&_entries,
129  sizeof(Entry) * _entriesLength * 2))
130  {
131  //Try again
132  return log(bookmark_);
133  }
134  // Length was doubled
135  _entriesLength *= 2;
136  }
137 
138  // Add this entry to the end of our list
// The block below is disabled code; NOW bookmarks return early above.
139  /*
140  if (bookmark_ == AMPS_BOOKMARK_NOW)
141  {
142  // Save a now timestamp bookmark
143  char* nowTimestamp = new char[AMPS_TIMESTAMP_LEN];
144  struct tm timeInfo;
145  time_t now;
146  time(&now);
147 #ifdef _WIN32
148  gmtime_s(&timeInfo, &now);
149 #else
150  gmtime_r(&now, &timeInfo);
151 #endif
152  strftime(nowTimestamp, AMPS_TIMESTAMP_LEN,
153  "%Y%m%dT%H%M%S", &timeInfo);
154  nowTimestamp[AMPS_TIMESTAMP_LEN-1] = 'Z';
155  _entries[_current]._val.assign(nowTimestamp,
156  AMPS_TIMESTAMP_LEN);
157  _entries[_current]._active = false;
158  index = _current++;
159  return index + _currentBase;
160  }
161  else
162  */
163  {
164  // Is this an attempt at a range?
165  bool isRange = BookmarkRange::isRange(bookmark_);
166  bool startExclusive = false;
167  if (isRange)
168  {
169  _range.set(bookmark_);
170  // Stricter check on range syntax
171  if (!_range.isValid())
172  {
173  _range.clear();
174  throw CommandException("Invalid bookmark range specified.");
175  }
176  startExclusive = !_range.isStartInclusive();
177  }
// Validation runs for every range, and for a plain bookmark only while no
// publisher has been recorded yet.
178  if (isRange || _publishers.empty())
179  {
180  // Check validity and init publishers map for a range
181  Message::Field parseable = isRange ? _range.getStart() : bookmark_;
182  amps_uint64_t publisher, sequence;
183  std::vector<Field> bmList = Field::parseBookmarkList(parseable);
184  if (bmList.empty() && !Field::isTimestamp(parseable))
185  {
186  if (isRange)
187  {
188  _range.clear();
189  }
190  return 0;
191  }
192  for (std::vector<Field>::iterator bkmk = bmList.begin(); bkmk != bmList.end(); ++bkmk)
193  {
194  parseBookmark(*bkmk, publisher, sequence);
195  if (publisher != (amps_uint64_t)0)
196  {
// Only an exclusive range start seeds the publishers map here.
197  if (isRange && startExclusive)
198  {
199  // Compare it to our publishers map
200  PublisherIterator pub = _publishers.find(publisher);
201  if (pub == _publishers.end() || pub->second < sequence)
202  {
203  _publishers[publisher] = sequence;
204  }
205  }
206  }
207  else if (!Field::isTimestamp(*bkmk))
208  {
209  // This is an invalid bookmark, so don't save anything
210  if (isRange)
211  {
212  _range.clear();
213  if (startExclusive)
214  {
215  _publishers.clear();
216  }
217  }
218  return 0;
219  }
220  }
221  }
222  if (!isRange)
223  {
224  _entries[_current]._val.deepCopy(bookmark_);
225  }
226  else
227  {
// The subscription lock is released while the store's adapter is updated.
228  Unlock<Mutex> unlock(_subLock);
229  _store->updateAdapter(this);
230  // Don't actually log a range
231  return 0;
232  }
233  }
234  _entries[_current]._active = true;
235  index = _current++;
236  }
237  return index + _currentBase;
238  }
239 
240  bool discard(size_t index_)
241  {
242  Lock<Mutex> guard(_subLock);
243  return _discard(index_);
244  }
245 
246  bool discard(const Message::Field& bookmark_)
247  {
248  // These are discarded when logged or not logged
249  if (bookmark_ == AMPS_BOOKMARK_NOW)
250  {
251  return false;
252  }
253  Lock<Mutex> guard(_subLock);
254  size_t search = _least;
255  size_t searchBase = _leastBase;
256  size_t searchMax = _current;
257  size_t searchMaxBase = _currentBase;
258  if (_least + _leastBase == _current + _currentBase)
259  {
260  if (_recoveryMin != AMPS_UNSET_INDEX)
261  {
262  search = _recoveryMin;
263  searchBase = _recoveryBase;
264  searchMax = _recoveryMax;
265  searchMaxBase = _recoveryMaxBase;
266  }
267  else // Store is empty, so nothing to do
268  {
269  return false;
270  }
271  }
272  assert(searchMax != AMPS_UNSET_INDEX);
273  assert(searchMaxBase != AMPS_UNSET_INDEX);
274  assert(search != AMPS_UNSET_INDEX);
275  assert(searchBase != AMPS_UNSET_INDEX);
276  // Search while we don't find the provided bookmark and we're in valid range
277  while (search + searchBase < searchMax + searchMaxBase)
278  {
279  if (_entries[search]._val == bookmark_)
280  {
281  return _discard(search + searchBase);
282  }
283  if (++search == _entriesLength)
284  {
285  // Least has now loooped around
286  searchBase += _entriesLength;
287  search = 0;
288  }
289  }
290  return false;
291  }
292 
293  // Get sequence number from a Field that is a bookmark
294  static void parseBookmark(const Message::Field& field_,
295  amps_uint64_t& publisherId_,
296  amps_uint64_t& sequenceNumber_)
297  {
298  Message::Field::parseBookmark(field_, publisherId_, sequenceNumber_);
299  }
300 
301  // Check to see if this message is older than the most recent one seen,
302  // and if it is, check whether it has been discarded.
303  bool isDiscarded(const Message::Field& bookmark_)
304  {
305  Lock<Mutex> guard(_subLock);
306  if (BookmarkRange::isRange(bookmark_))
307  {
308  return false;
309  }
310 
311  amps_uint64_t publisher, sequence;
312  parseBookmark(bookmark_, publisher, sequence);
313  // Bookmarks like EPOCH, NOW, or invalid bookmarks we ignore
314  // A timestamp could be logged as the first bookmark when starting
315  // a new subscription.
316  if (publisher == 0 && !Field::isTimestamp(bookmark_))
317  {
318  return true;
319  }
320  // Check if we've already recovered this bookmark
321  size_t recoveredIdx = recover(bookmark_, false);
322  // Compare it to our publishers map
323  PublisherIterator pub = _publishers.find(publisher);
324  if (pub == _publishers.end() || pub->second < sequence)
325  {
326  _publishers[publisher] = sequence;
327  if (recoveredIdx == AMPS_UNSET_INDEX)
328  {
329  return false;
330  }
331  }
332  if (recoveredIdx != AMPS_UNSET_INDEX)
333  {
334  if (!_entries[recoveredIdx]._active)
335  {
336  _recovered.erase(bookmark_);
337  return true;
338  }
339  return false;
340  }
341  // During recovery, we don't really care if it's been discarded
342  // or not. We just want _publishers updated. No need for the
343  // costly linear search.
344  if (_store->_recovering)
345  {
346  return false;
347  }
348  // During failure and recovery scenarios, we'll see out of order
349  // bookmarks arrive, either because (a) we're replaying or (b)
350  // a publisher has cut over, and we've cut over to a new server.
351  // Scan the list to see if we have a match.
352  size_t base = _leastBase;
353  for (size_t i = _least; i + base < _current + _currentBase; i++)
354  {
355  if ( i >= _entriesLength )
356  {
357  i = 0;
358  base = _currentBase;
359  }
360  if (_entries[i]._val == bookmark_)
361  {
362  return !_entries[i]._active;
363  }
364  }
365 
366  return true; // message is totally discarded
367  }
368 
369  bool empty(void) const
370  {
371  if (_least == AMPS_UNSET_INDEX ||
372  ((_least + _leastBase) == (_current + _currentBase) &&
373  _recoveryMin == AMPS_UNSET_INDEX))
374  {
375  return true;
376  }
377  return false;
378  }
379 
380  void updateMostRecent()
381  {
382  Lock<Mutex> guard(_subLock);
383  _updateMostRecent();
384  }
385 
386  const BookmarkRange& getRange() const
387  {
388  return _range;
389  }
390 
// Builds and returns the bookmark value this subscription should resume from.
// Depending on state the result is one of:
//   - the stored range (its start may be replaced or made exclusive),
//   - a list of "publisher|sequence|" items built from _publishers,
//   - a comma-separated list of _recent, _lastPersisted and
//     _recoveryTimestamp (whichever are in use),
//   - EPOCH (via _lastPersisted) when nothing usable has been recorded.
// usePublishersList_: when true, allows the publishers-based list if neither
//   _recent nor _lastPersisted is usable, or if _lastPersisted is EPOCH.
// Side effects: calls _updateMostRecent(), replaces _recentList, may modify
// _range, and may reset _lastPersisted to EPOCH.
391  Message::Field getMostRecentList(bool usePublishersList_ = true)
392  {
393  Lock<Mutex> guard(_subLock);
// A length of 1 or less is treated as "not usable" for both bookmarks below.
394  bool useLastPersisted = !_lastPersisted.empty() &&
395  _lastPersisted.len() > 1;
396  // when this is called, we'll take a moment to update the list
397  // of things recovered,
398  // so we don't accidentally log anything we ought not to.
399  _updateMostRecent();
400  bool useRecent = !_recent.empty() && _recent.len() > 1;
401  amps_uint64_t lastPublisher = 0;
402  amps_uint64_t lastSeq = 0;
403  amps_uint64_t recentPublisher = 0;
404  amps_uint64_t recentSeq = 0;
405  if (useLastPersisted)
406  {
407  parseBookmark(_lastPersisted, lastPublisher, lastSeq);
408  }
409  if (useRecent)
410  {
411  parseBookmark(_recent, recentPublisher, recentSeq);
412  if (empty() && useLastPersisted)
413  {
414  useRecent = false;
415  }
416  else
417  {
// Same publisher for both: only one of the two bookmarks is kept.
418  if (useLastPersisted && lastPublisher == recentPublisher)
419  {
420  if (lastSeq <= recentSeq)
421  {
422  useRecent = false;
423  }
424  else
425  {
426  useLastPersisted = false;
427  }
428  }
429  }
430  }
431  // Set size for all bookmarks that will be used
// Each "+ 1" reserves room for a separating comma.
432  size_t totalLen = (useLastPersisted ? _lastPersisted.len() + 1 : 0);
433  if (useRecent)
434  {
435  totalLen += _recent.len() + 1;
436  }
437  // If we don't have a non-EPOCH persisted ack and we don't have a
438  // non-EPOCH most recent bookmark, OR we have a range
439  // we can build a list based on all the publishers instead.
440  if (usePublishersList_
441  && ((!useLastPersisted && !useRecent)
442  || _lastPersisted == AMPS_BOOKMARK_EPOCH))
443  {
444  std::ostringstream os;
445  for (PublisherIterator pub = _publishers.begin();
446  pub != _publishers.end(); ++pub)
447  {
448  if (pub->first == 0 && pub->second == 0)
449  {
450  continue;
451  }
// For the publisher of _recent, emit the smaller of the two sequences.
452  if (pub->first == recentPublisher && recentSeq < pub->second)
453  {
454  os << recentPublisher << '|' << recentSeq << "|,";
455  }
456  else
457  {
458  os << pub->first << '|' << pub->second << "|,";
459  }
460  }
461  std::string recent = os.str();
462  if (!recent.empty())
463  {
464  totalLen = recent.length();
465  if (!_recoveryTimestamp.empty())
466  {
467  totalLen += _recoveryTimestamp.len();
468  recent += std::string(_recoveryTimestamp);
469  }
470  else
471  {
472  // Remove trailing ,
473  recent.erase(--totalLen);
474  }
475  // Reset _recentList to new value and return it
476  _recentList.clear();
477  _recentList = Message::Field(recent).deepCopy();
478  if (_range.isValid())
479  {
480  if (_range.getStart() != recent
481  && _recentList != AMPS_BOOKMARK_EPOCH)
482  {
483  _range.replaceStart(_recentList, true);
484  }
485  else if (_range.isStartInclusive())
486  {
487  amps_uint64_t publisher, sequence;
488  parseBookmark(_range.getStart(), publisher,
489  sequence);
490  PublisherIterator pub = _publishers.find(publisher);
491  if (pub != _publishers.end()
492  && pub->second >= sequence)
493  {
494  _range.makeStartExclusive();
495  }
496  }
497  return _range;
498  }
499  return _recentList;
500  }
501  if (_range.isValid())
502  {
503  return _range;
504  }
505  }
506  if (!_recoveryTimestamp.empty() && !_range.isValid())
507  {
508  totalLen += _recoveryTimestamp.len() + 1;
509  }
510  // If we have nothing discarded, return EPOCH
511  if (totalLen == 0
512  || (_recent.len() < 2 && !empty()))
513  {
514  if (_range.isValid())
515  {
516  return _range;
517  }
// NOTE(review): this branch has no body in this listing (embedded listing
// line 520 is absent), so a statement may have been lost in extraction;
// confirm against the original header.
518  if (!useRecent)
519  {
521  }
522  _setLastPersistedToEpoch();
523  return _lastPersisted;
524  }
525  // Remove the trailing , from the length
526  totalLen -= 1;
// Ownership of this buffer passes to _recentList through assign() below.
527  char* field = new char[totalLen];
528  size_t len = 0;
529  if (useRecent)
530  {
531  len = _recent.len();
532  memcpy(field, _recent.data(), len);
533  if (len < totalLen)
534  {
535  field[len++] = ',';
536  }
537  }
538  if (useLastPersisted)
539  {
540  memcpy(field + len, _lastPersisted.data(), _lastPersisted.len());
541  len += _lastPersisted.len();
542  if (len < totalLen)
543  {
544  field[len++] = ',';
545  }
546  }
547  if (!_recoveryTimestamp.empty() && !_range.isValid())
548  {
549  memcpy(field + len, _recoveryTimestamp.data(),
550  _recoveryTimestamp.len());
551  // If more is to be written after this, uncomment the following
552  //len += _lastPersisted.len();
553  //if (len < totalLen) field[len++] = ',';
554  }
555  // _recentList clear will delete[] current buffer and assign will get cleared
556  _recentList.clear();
557  _recentList.assign(field, totalLen);
558  if (_range.isValid())
559  {
560  if (_recentList != AMPS_BOOKMARK_EPOCH)
561  {
562  if (_range.getStart() != _recentList)
563  {
564  _range.replaceStart(_recentList, true);
565  }
566  else if (_range.isStartInclusive())
567  {
568  amps_uint64_t publisher, sequence;
569  parseBookmark(_range.getStart(), publisher,
570  sequence);
571  PublisherIterator pub = _publishers.find(publisher);
572  if (pub != _publishers.end()
573  && pub->second >= sequence)
574  {
575  _range.makeStartExclusive();
576  }
577  }
578  }
579  return _range;
580  }
581  return _recentList;
582  }
583 
584  Message::Field getMostRecent(bool update_ = false)
585  {
586  Lock<Mutex> guard(_subLock);
587  // Return the same as last time if nothing's changed
588  // _recent is the most recent bookmark.
589  if (update_ && _store->_recentChanged)
590  {
591  _updateMostRecent();
592  }
593  if (_recent.empty())
594  {
596  }
597  else
598  {
599  return _recent;
600  }
601  }
602 
603  Message::Field getLastPersisted()
604  {
605  Lock<Mutex> guard(_subLock);
606  return _lastPersisted;
607  }
608 
609  void setMostRecent(const Message::Field& recent_)
610  {
611  _recent.clear();
612  _recent.deepCopy(recent_);
613  }
614 
615  void setRecoveryTimestamp(const char* recoveryTimestamp_,
616  size_t len_ = 0)
617  {
618  _recoveryTimestamp.clear();
619  size_t len = (len_ == 0) ? AMPS_TIMESTAMP_LEN : len_;
620  char* ts = new char[len];
621  memcpy((void*)ts, (const void*)recoveryTimestamp_, len);
622  _recoveryTimestamp.assign(ts, len);
623  }
624 
625  void moveEntries(char* old_, char* new_, size_t newSize_)
626  {
627  size_t least = _least;
628  size_t leastBase = _leastBase;
629  if (_recoveryMin != AMPS_UNSET_INDEX)
630  {
631  least = _recoveryMin;
632  leastBase = _recoveryBase;
633  }
634  // First check if we grew in place, if so, just move current after least
635  if (old_ == new_)
636  {
637  if (newSize_ - (sizeof(Entry)*_entriesLength) > sizeof(Entry)*least)
638  {
639  memcpy(new_ + (sizeof(Entry)*_entriesLength),
640  old_, (sizeof(Entry)*least));
641  // Clear the beginning where those entries were
642  memset(old_, 0, sizeof(Entry)*least);
643  }
644  else // We have to use an intermediate buffer
645  {
646  Entry* buffer = new Entry[least];
647  memcpy((void*)buffer, (void*)old_, sizeof(Entry)*least);
648  //Put the beginning entries at the start of the new buffer
649  memcpy((void*)new_, (void*)((char*)old_ + (sizeof(Entry)*least)),
650  (_entriesLength - least)*sizeof(Entry));
651  //Put the end entries after the beginning entries
652  memcpy((void*)((char*)new_ + ((_entriesLength - least)*sizeof(Entry))),
653  (void*)buffer, least * sizeof(Entry));
654  // Least is now at 0 so base must be increased
655  leastBase += least;
656  least = 0;
657  delete [] buffer;
658  }
659  }
660  else
661  {
662  //Put the beginning entries at the start of the new buffer
663  memcpy((void*)new_, (void*)((char*)old_ + (sizeof(Entry)*least)),
664  (_entriesLength - least)*sizeof(Entry));
665  //Put the end entries after the beginning entries
666  memcpy((void*)((char*)new_ + ((_entriesLength - least)*sizeof(Entry))),
667  (void*)old_, least * sizeof(Entry));
668  // Least is now at 0 so base must be increased
669  leastBase += least;
670  least = 0;
671  }
672  if (_recoveryMin != AMPS_UNSET_INDEX)
673  {
674  _least = least + (_least + _leastBase) - (_recoveryMin + _recoveryBase);
675  _recoveryMax = least + (_recoveryMax + _recoveryMaxBase) -
676  (_recoveryMin + _recoveryBase);
677  _recoveryMaxBase = leastBase;
678  _recoveryMin = least;
679  _recoveryBase = leastBase;
680  }
681  else
682  {
683  _least = least;
684  }
685  _leastBase = leastBase;
686  // Current is now after everything and using the same base
687  _currentBase = _leastBase;
688  _current = least + _entriesLength;
689  }
690 
691  inline size_t getOldestBookmarkSeq()
692  {
693  Lock<Mutex> guard(_subLock);
694  // If there is nothing in the store, return -1, otherwise return lowest
695  return ((_least + _leastBase) == (_current + _currentBase)) ? AMPS_UNSET_INDEX :
696  _least + _leastBase;
697  }
698 
699  bool lastPersisted(const Message::Field& bookmark_)
700  {
701  // These shouldn't be persisted
702  if (bookmark_ == AMPS_BOOKMARK_NOW
703  || BookmarkRange::isRange(bookmark_))
704  {
705  return false;
706  }
707  Lock<Mutex> guard(_subLock);
708  return _setLastPersisted(bookmark_);
709  }
710 
711  bool _setLastPersisted(const Message::Field& bookmark_)
712  {
713  if (!_lastPersisted.empty())
714  {
715  amps_uint64_t publisher, publisher_lastPersisted;
716  amps_uint64_t sequence, sequence_lastPersisted;
717  parseBookmark(bookmark_, publisher, sequence);
718  parseBookmark(_lastPersisted, publisher_lastPersisted,
719  sequence_lastPersisted);
720  if (publisher == publisher_lastPersisted &&
721  sequence <= sequence_lastPersisted)
722  {
723  return false;
724  }
725  }
726  // deepCopy will clear what's in _lastPersisted
727  _lastPersisted.deepCopy(bookmark_);
728  _store->_recentChanged = true;
729  _recoveryTimestamp.clear();
730  return true;
731  }
732 
733  Message::Field lastPersisted(size_t bookmark_)
734  {
735  Lock<Mutex> guard(_subLock);
736  Message::Field& bookmark = _entries[bookmark_]._val;
737  // These shouldn't be persisted
738  if (bookmark == AMPS_BOOKMARK_NOW
739  || BookmarkRange::isRange(bookmark))
740  {
741  return bookmark;
742  }
743  _setLastPersisted(bookmark);
744  return bookmark;
745  }
746 
747  // Returns the index of the recovered item, either the index where it
748  // was first stored prior to getMostRecent, or the new index if it is
749  // relogged either because this is called from log() or because it was
750  // not active but also not persisted.
751  size_t recover(const Message::Field& bookmark_, bool relogIfNotDiscarded)
752  {
753  size_t retVal = AMPS_UNSET_INDEX;
754  if (_recovered.empty() || _recoveryBase == AMPS_UNSET_INDEX)
755  {
756  return retVal;
757  }
758  // Check if this is a recovered bookmark.
759  // If so, copy the existing one to the new location
760  RecoveryIterator item = _recovered.find(bookmark_);
761  if (item != _recovered.end())
762  {
763  size_t seqNo = item->second;
764  size_t index = (seqNo - _recoveryBase) % _entriesLength;
765  // If we only have recovery entries and isDiscarded is
766  // checking on an already discarded entry, update recent.
767  if (_least + _leastBase == _current + _currentBase &&
768  !_entries[index]._active)
769  {
770  _store->_recentChanged = true;
771  _recent.clear();
772  _recent = _entries[index]._val.deepCopy();
773  retVal = moveEntry(index);
774  if (retVal == AMPS_UNSET_INDEX)
775  {
776  recover(bookmark_, relogIfNotDiscarded);
777  }
778  _least = _current;
779  _leastBase = _currentBase;
780  }
781  else if (!_entries[index]._active || relogIfNotDiscarded)
782  {
783  retVal = moveEntry(index);
784  if (retVal == AMPS_UNSET_INDEX)
785  {
786  recover(bookmark_, relogIfNotDiscarded);
787  }
788  }
789  else
790  {
791  return index;
792  }
793  _recovered.erase(item);
794  if (_recovered.empty())
795  {
796  _recoveryMin = AMPS_UNSET_INDEX;
797  _recoveryBase = AMPS_UNSET_INDEX;
798  _recoveryMax = AMPS_UNSET_INDEX;
799  _recoveryMaxBase = AMPS_UNSET_INDEX;
800  }
801  else if (index == _recoveryMin)
802  {
803  while (_entries[_recoveryMin]._val.empty() &&
804  (_recoveryMin + _recoveryBase) < (_recoveryMax + _recoveryMaxBase))
805  {
806  if (++_recoveryMin == _entriesLength)
807  {
808  _recoveryMin = 0;
809  _recoveryBase += _entriesLength;
810  }
811  }
812  }
813  }
814  return retVal;
815  }
816 
817  // Return the id of this Subscription
818  Message::Field id() const
819  {
820  return _id;
821  }
822 
823  struct Entry
824  {
825  Message::Field _val; //16
826  bool _active; //17
827  char _padding[32 - sizeof(Message::Field) - sizeof(bool)]; //32
828 
829  Entry() : _active(false)
830  {
831  ;
832  }
833  };
834 
835  typedef std::vector<Entry*> EntryPtrList;
836 
837  void getRecoveryEntries(EntryPtrList& list_)
838  {
839  if (_recoveryMin == AMPS_UNSET_INDEX ||
840  _recoveryMax == AMPS_UNSET_INDEX)
841  {
842  return;
843  }
844  size_t base = _recoveryBase;
845  size_t max = _recoveryMax + _recoveryMaxBase;
846  for (size_t i = _recoveryMin; i + base < max; ++i)
847  {
848  if (i == _entriesLength)
849  {
850  i = 0;
851  base = _recoveryMaxBase;
852  }
853  //list_.insert(&(_entries[i]));
854  list_.push_back(&(_entries[i]));
855  }
856  return;
857  }
858 
859  void getActiveEntries(EntryPtrList& list_)
860  {
861  size_t base = _leastBase;
862  for (size_t i = _least; i + base < _current + _currentBase; ++i)
863  {
864  if (i >= _entriesLength)
865  {
866  i = 0;
867  base = _currentBase;
868  }
869  //list_.insert(&(_entries[i]));
870  list_.push_back(&(_entries[i]));
871  }
872  return;
873  }
874 
875  Entry* getEntryByIndex(size_t index_)
876  {
877  Lock<Mutex> guard(_subLock);
878  size_t base = (_recoveryBase == AMPS_UNSET_INDEX ||
879  index_ >= _least + _leastBase)
880  ? _leastBase : _recoveryBase;
881  // Return NULL if not a valid index
882  size_t min = (_recoveryMin == AMPS_UNSET_INDEX ?
883  _least + _leastBase :
884  _recoveryMin + _recoveryBase);
885  if (index_ >= _current + _currentBase || index_ < min)
886  {
887  return NULL;
888  }
889  return &(_entries[(index_ - base) % _entriesLength]);
890  }
891 
892  void justRecovered()
893  {
894  Lock<Mutex> guard(_subLock);
895  _updateMostRecent();
896  EntryPtrList list;
897  getRecoveryEntries(list);
898  setPublishersToDiscarded(&list, &_publishers);
899  }
900 
901  void setPublishersToDiscarded(EntryPtrList* recovered_,
902  PublisherMap* publishers_)
903  {
904  // Need to reset publishers to only have up to the last
905  // discarded sequence number. Messages that were in transit
906  // during previous run but not discarded should be considered
907  // new and not duplicate after a restart/recovery.
908  for (EntryPtrList::iterator i = recovered_->begin();
909  i != recovered_->end(); ++i)
910  {
911  if ((*i)->_val.empty())
912  {
913  continue;
914  }
915  amps_uint64_t publisher = (amps_uint64_t)0;
916  amps_uint64_t sequence = (amps_uint64_t)0;
917  parseBookmark((*i)->_val, publisher, sequence);
918  if (publisher && sequence && (*i)->_active &&
919  (*publishers_)[publisher] >= sequence)
920  {
921  (*publishers_)[publisher] = sequence - 1;
922  }
923  }
924  }
925 
926  void clearLastPersisted()
927  {
928  Lock<Mutex> guard(_subLock);
929  _lastPersisted.clear();
930  }
931 
932  void setLastPersistedToEpoch()
933  {
934  Lock<Mutex> guard(_subLock);
935  _setLastPersistedToEpoch();
936  }
937 
938  private:
939  Subscription(const Subscription&);
940  Subscription& operator=(const Subscription&);
941 
942  size_t moveEntry(size_t index_)
943  {
944  // Check for wrap
945  if (_current >= _entriesLength)
946  {
947  _current = 0;
948  _currentBase += _entriesLength;
949  }
950  // Check for resize
951  // If list is too small, double it
952  if ((_current == _least % _entriesLength &&
953  _leastBase < _currentBase) ||
954  (_current == _recoveryMin && _recoveryBase < _currentBase))
955  {
956  if (!_store->resize(_id, (char**)&_entries,
957  sizeof(Entry) * _entriesLength * 2))
958  {
959  return AMPS_UNSET_INDEX;
960  }
961  // Length was doubled
962  _entriesLength *= 2;
963  }
964  _entries[_current]._val = _entries[index_]._val;
965  _entries[_current]._active = _entries[index_]._active;
966  // No need to clear Field, just set it to empty
967  _entries[index_]._val.assign(NULL, 0);
968  _entries[index_]._active = false;
969  return _current++;
970  }
971 
972  void _setLastPersistedToEpoch()
973  {
974  size_t fieldLen = strlen(AMPS_BOOKMARK_EPOCH);
975  char* field = new char[fieldLen];
976  memcpy(field, AMPS_BOOKMARK_EPOCH, fieldLen);
977  _lastPersisted.clear();
978  _lastPersisted.assign(field, fieldLen);
979  }
980 
// Marks the entry at log position index_ as inactive, then releases leading
// inactive entries: first from the recovery region (when index_ is its first
// position), then from the active region (when the position being checked is
// its first). _recent is set to the last bookmark released.
// The caller must already hold _subLock.
// Returns true when _recent was updated, false otherwise, including when
// index_ is outside the range of logged positions.
981  bool _discard(size_t index_)
982  {
983  bool retVal = false;
984  // Lock should already be held
985  assert((_recoveryBase == AMPS_UNSET_INDEX && _recoveryMin == AMPS_UNSET_INDEX) ||
986  (_recoveryBase != AMPS_UNSET_INDEX && _recoveryMin != AMPS_UNSET_INDEX));
// Positions before the active region are measured from the recovery base.
987  size_t base = (_recoveryBase == AMPS_UNSET_INDEX
988  || index_ >= _least + _leastBase)
989  ? _leastBase : _recoveryBase;
990  // discard of a record not in the log is a no-op
991  size_t min = (_recoveryMin == AMPS_UNSET_INDEX ? _least + _leastBase :
992  _recoveryMin + _recoveryBase);
993  if (index_ >= _current + _currentBase || index_ < min)
994  {
995  return retVal;
996  }
997 
998  // log that this one is discarded, then
999  // recalculate what the most recent entry is.
1000  Entry& e = _entries[(index_ - base) % _entriesLength];
1001  e._active = false;
1002 
1003  size_t index = index_;
1004  if (_recoveryMin != AMPS_UNSET_INDEX &&
1005  index_ == _recoveryMin + _recoveryBase)
1006  {
1007  // Find all to discard
1008  size_t j = _recoveryMin;
1009  while (j + _recoveryBase < _recoveryMax + _recoveryMaxBase &&
1010  !_entries[j]._active)
1011  {
1012  // This index might be left-over from a slow discard and we
1013  // may have reconnected. We have a few possibilities at this point.
1014  // 1. If we re-logged this bookmark, this index will point at an
1015  // empty bookmark. This could happen if the discard thread was slow
1016  // and the reconnect was fast. We wouldn't report the
1017  // re-arrival of the bookmark as a duplicate because it
1018  // hadn't been marked as discarded. In this case, we have to
1019  // simply move past this in the recovery area.
1020  // 2. This bookmark should become _recent because we haven't
1021  // yet received anything since our last call to getMostRecent.
1022  // In this case, we need to take it out of recovered but not
1023  // clear it. The publishers map should report it as duplicate.
1024  // 3. This is the 'oldest' recovered, but we have received new
1025  // bookmarks since we got this one. We can clear it because the
1026  // publishers map should report it as a duplicate if/when it
1027  // does arrive again. Move the _recoveryMin ahead and remove it
1028  // from recovered.
1029  Message::Field& bookmark = _entries[j]._val;
1030  // Option 1 skips this and just moves on
1031  if (!bookmark.empty())
1032  {
1033  _recovered.erase(bookmark);
1034  // Make sure our publishers map will mark it discarded
1035  amps_uint64_t publisher, sequence;
1036  parseBookmark(bookmark, publisher, sequence);
1037  PublisherIterator pub = _publishers.find(publisher);
1038  if (pub == _publishers.end() || pub->second < sequence)
1039  {
1040  _publishers[publisher] = sequence;
1041  }
1042  if (_least + _leastBase == _current + _currentBase ||
1043  ((_least + _leastBase) % _entriesLength) ==
1044  ((_recoveryMin + _recoveryBase + 1)) % _entriesLength)
1045  {
1046  // Option 2, reset recent
1047  retVal = true;
1048  _store->_recentChanged = true;
1049  _recoveryTimestamp.clear();
1050  _recent.clear();
// _recent takes over the bookmark's buffer; the slot's Field is then reset
// without freeing it.
1051  _recent = bookmark;
1052  bookmark.assign(NULL, 0);
1053  }
1054  else
1055  {
1056  // Option 3, simply clear this one
1057  bookmark.clear();
1058  }
1059  }
1060  // If we reach the buffer end,
1061  // keep checking from the beginning
1062  if (++j == _entriesLength)
1063  {
1064  // Least has now looped around
1065  _recoveryBase += _entriesLength;
1066  j = 0;
1067  }
1068  }
1069  assert(j + _recoveryBase != _recoveryMax + _recoveryMaxBase ||
1070  _recovered.empty());
1071  if (_recovered.empty())
1072  {
1073  _recoveryMin = AMPS_UNSET_INDEX;
1074  _recoveryBase = AMPS_UNSET_INDEX;
1075  _recoveryMax = AMPS_UNSET_INDEX;
1076  _recoveryMaxBase = AMPS_UNSET_INDEX;
1077  // Cleared recovered, want to check onward
1078  index = _least + _leastBase;
1079  }
1080  else
1081  {
1082  _recoveryMin = j;
1083  }
1084  }
1085  // if this is the first item in the list, discard all inactive ones
1086  // as long as recovery also says its okay
1087  if (index == _least + _leastBase)
1088  {
1089  // Find all to discard
1090  size_t j = _least;
1091  while (j + _leastBase < _current + _currentBase &&
1092  !_entries[j]._active)
1093  {
1094  //Must free associated memory
1095  _recent.clear();
// _recent takes over the entry's buffer; the slot's Field is then reset
// without freeing it.
1096  _recent = _entries[j]._val;
1097  _entries[j]._val.assign(NULL, 0);
1098  _store->_recentChanged = true;
1099  retVal = true;
1100  _recoveryTimestamp.clear();
1101  // If we reach the buffer end,
1102  // keep checking from the beginning
1103  if (++j == _entriesLength)
1104  {
1105  // Least has now looped around
1106  _leastBase += _entriesLength;
1107  j = 0;
1108  }
1109  }
1110  _least = j;
1111  }
1112  return retVal;
1113  }
1114 
// Rebuilds the _recovered map and the recovery window from the entries that
// are still stored, then moves _least/_leastBase up to _current/_currentBase.
// The scan starts at the existing recovery window when one is set, otherwise
// at _least. Every non-empty entry found is recorded in _recovered under its
// bookmark value with the index i + base.
1115  void _updateMostRecent()
1116  {
1117  // Lock is already held
1118  _recovered.clear();
// The recovery base and the recovery minimum must be set or unset together.
1119  assert((_recoveryBase == AMPS_UNSET_INDEX && _recoveryMin == AMPS_UNSET_INDEX) ||
1120  (_recoveryBase != AMPS_UNSET_INDEX && _recoveryMin != AMPS_UNSET_INDEX));
// Choose the scan origin before the recovery window is reset just below.
1121  size_t base = (_recoveryMin == AMPS_UNSET_INDEX) ? _leastBase : _recoveryBase;
1122  size_t start = (_recoveryMin == AMPS_UNSET_INDEX) ? _least : _recoveryMin;
1123  _recoveryMin = AMPS_UNSET_INDEX;
1124  _recoveryBase = AMPS_UNSET_INDEX;
1125  _recoveryMax = AMPS_UNSET_INDEX;
1126  _recoveryMaxBase = AMPS_UNSET_INDEX;
// Walk forward until i + base reaches _current + _currentBase.
1127  for (size_t i = start; i + base < _current + _currentBase; i++)
1128  {
// Ran past the last slot of _entries: continue at slot 0 using _currentBase.
1129  if ( i >= _entriesLength )
1130  {
1131  i = 0;
1132  base = _currentBase;
1133  }
// NOTE(review): _recoveryMax and _recoveryBase were reset to AMPS_UNSET_INDEX
// above and are only reassigned further down in this loop, and i is compared
// here without base added - confirm this skip condition is what is intended.
1134  if (i >= _recoveryMax + _recoveryBase && i < _least + _leastBase)
1135  {
1136  continue;
1137  }
1138  Entry& entry = _entries[i];
// Only slots that still hold a bookmark value are registered again.
1139  if (!entry._val.empty())
1140  {
1141  _recovered[entry._val] = i + base;
// The first entry registered opens the new recovery window, whose upper
// bound is the current write position.
1142  if (_recoveryMin == AMPS_UNSET_INDEX)
1143  {
1144  _recoveryMin = i;
1145  _recoveryBase = base;
1146  _recoveryMax = _current;
1147  _recoveryMaxBase = _currentBase;
1148  }
1149  }
1150  }
// A _current equal to _entriesLength wraps to slot 0 and advances its base.
1151  if (_current == _entriesLength)
1152  {
1153  _current = 0;
1154  _currentBase += _entriesLength;
1155  }
// From here on _least coincides with _current.
1156  _least = _current;
1157  _leastBase = _currentBase;
1158  }
1159 
// Data members of Subscription. Comments describe only how the members are
// used in the visible part of this file.
1160  Message::Field _id;
// Receives the value of each entry that discard() releases.
1161  Message::Field _recent;
1162  Message::Field _lastPersisted;
1163  Message::Field _recentList;
1164  BookmarkRange _range;
// Cleared whenever discard() replaces _recent.
1165  Message::Field _recoveryTimestamp;
// Slot indexes into _entries. Each *Base member is advanced by _entriesLength
// when its index wraps to 0, and comparisons are made on index + base.
1166  size_t _current;
1167  size_t _currentBase;
1168  size_t _least;
1169  size_t _leastBase;
// Recovery window; all four hold AMPS_UNSET_INDEX when no window is set.
1170  size_t _recoveryMin;
1171  size_t _recoveryBase;
1172  size_t _recoveryMax;
1173  size_t _recoveryMaxBase;
// Number of slots in _entries.
1174  size_t _entriesLength;
1175  Entry* _entries;
// The store this subscription reports to (its _recentChanged flag is set
// from discard()).
1176  MemoryBookmarkStore* _store;
1177  Mutex _subLock;
// Maps a bookmark value to its index + base, filled by _updateMostRecent().
1178  RecoveryMap _recovered;
1179  public:
// Per publisher, the highest sequence recorded by discard().
1180  PublisherMap _publishers;
1181  };
1182 
1183  public:
1187  _subsLock(),
1188  _lock(),
1189  _serverVersion(AMPS_DEFAULT_MIN_VERSION),
1190  _recentChanged(true),
1191  _recovering(false),
1192  _recoveryPointAdapter(NULL),
1193  _recoveryPointFactory(NULL)
1194  { ; }
1195 
// Iterator type used to walk the recovery points exposed by the
// RecoveryPointAdapter (see the adapter-taking constructor).
1196  typedef RecoveryPointAdapter::iterator RecoveryIterator;
1197 
1205  RecoveryPointFactory factory_ = NULL)
1206  : BookmarkStoreImpl()
1207  , _subsLock()
1208  , _lock()
1209  , _serverVersion(AMPS_DEFAULT_MIN_VERSION)
1210  , _recentChanged(true)
1211  , _recovering(true)
1212  , _recoveryPointAdapter(adapter_)
1213  , _recoveryPointFactory(factory_)
1214  {
1215  Message msg;
1216  if (!_recoveryPointFactory)
1217  {
1218  _recoveryPointFactory = &FixedRecoveryPoint::create;
1219  }
1220  for (RecoveryIterator recoveryPoint = _recoveryPointAdapter.begin();
1221  recoveryPoint != _recoveryPointAdapter.end();
1222  ++recoveryPoint)
1223  {
1224  Field subId(recoveryPoint->getSubId());
1225  msg.setSubscriptionHandle(static_cast<amps_subscription_handle>(0));
1226  msg.setSubId(subId);
1227  Field bookmark = recoveryPoint->getBookmark();
1228  if (BookmarkRange::isRange(bookmark))
1229  {
1230  msg.setBookmark(bookmark);
1231  _log(msg);
1232  }
1233  else
1234  {
1235  std::vector<Field> bmList = Field::parseBookmarkList(bookmark);
1236  for (std::vector<Field>::iterator bkmk = bmList.begin(); bkmk != bmList.end(); ++bkmk)
1237  {
1238  if (Field::isTimestamp(*bkmk))
1239  {
1240  find(subId)->setRecoveryTimestamp(bkmk->data(), bkmk->len());
1241  }
1242  else
1243  {
1244  msg.assignBookmark(bkmk->data(), bkmk->len());
1245  _isDiscarded(msg);
1246  _log(msg);
1247  _discard(msg);
1248  }
1249  }
1250  // Reset to original bookmark
1251  msg.setBookmark(bookmark);
1252  }
1253  }
1254  _recovering = false;
1255  }
1256 
1257  virtual ~MemoryBookmarkStore()
1258  {
1259  if (_recoveryPointAdapter.isValid())
1260  {
1261  _recoveryPointAdapter.close();
1262  }
1263  __purge();
1264  }
1265 
1271  virtual size_t log(Message& message_)
1272  {
1273  Lock<Mutex> guard(_lock);
1274  return _log(message_);
1275  }
1276 
1282  virtual void discard(const Message& message_)
1283  {
1284  Lock<Mutex> guard(_lock);
1285  (void)_discard(message_);
1286  }
1287 
1295  virtual void discard(const Message::Field& subId_, size_t bookmarkSeqNo_)
1296  {
1297  Lock<Mutex> guard(_lock);
1298  (void)_discard(subId_, bookmarkSeqNo_);
1299  }
1300 
1307  {
1308  Lock<Mutex> guard(_lock);
1309  return _getMostRecent(subId_);
1310  }
1311 
1320  virtual bool isDiscarded(Message& message_)
1321  {
1322  Lock<Mutex> guard(_lock);
1323  return _isDiscarded(message_);
1324  }
1325 
1331  virtual void purge()
1332  {
1333  Lock<Mutex> guard(_lock);
1334  _purge();
1335  }
1336 
1342  virtual void purge(const Message::Field& subId_)
1343  {
1344  Lock<Mutex> guard(_lock);
1345  _purge(subId_);
1346  }
1347 
1352  virtual size_t getOldestBookmarkSeq(const Message::Field& subId_)
1353  {
1354  Lock<Mutex> guard(_lock);
1355  return _getOldestBookmarkSeq(subId_);
1356  }
1357 
1363  virtual void persisted(const Message::Field& subId_,
1364  const Message::Field& bookmark_)
1365  {
1366  Lock<Mutex> guard(_lock);
1367  _persisted(find(subId_), bookmark_);
1368  }
1369 
1376  virtual Message::Field persisted(const Message::Field& subId_,
1377  size_t bookmark_)
1378  {
1379  Lock<Mutex> guard(_lock);
1380  return _persisted(find(subId_), bookmark_);
1381  }
1382 
1387  void setServerVersion(const VersionInfo& version_)
1388  {
1389  setServerVersion(version_.getOldStyleVersion());
1390  }
1391 
1396  void setServerVersion(size_t version_)
1397  {
1398  Lock<Mutex> guard(_subsLock);
1399  _serverVersion = version_;
1400  }
1401 
1402  inline bool isWritableBookmark(size_t length)
1403  {
1404  return length >= AMPS_MIN_BOOKMARK_LEN;
1405  }
1406 
// Makes Subscription::EntryPtrList available under the store's own scope.
1407  typedef Subscription::EntryPtrList EntryPtrList;
1408 
1409  protected:
1410 
1411  // Called once lock is acquired
1412  size_t _log(Message& message_)
1413  {
1414  Message::Field bookmark = message_.getBookmark();
1415  Subscription* pSub = (Subscription*)(message_.getSubscriptionHandle());
1416  if (!pSub)
1417  {
1418  Message::Field subId = message_.getSubscriptionId();
1419  if (subId.empty())
1420  {
1421  subId = message_.getSubscriptionIds();
1422  }
1423  pSub = find(subId);
1424  message_.setSubscriptionHandle(
1425  static_cast<amps_subscription_handle>(pSub));
1426  }
1427  size_t retVal = pSub->log(bookmark);
1428  message_.setBookmarkSeqNo(retVal);
1429  return retVal;
1430  }
1431 
1432  // Called once lock is acquired, or from ctor
1433  bool _discard(const Message& message_)
1434  {
1435  size_t bookmarkSeqNo = message_.getBookmarkSeqNo();
1436  Subscription* pSub = (Subscription*)(message_.getSubscriptionHandle());
1437  if (!pSub)
1438  {
1439  Message::Field subId = message_.getSubscriptionId();
1440  if (subId.empty())
1441  {
1442  subId = message_.getSubscriptionIds();
1443  }
1444  pSub = find(subId);
1445  }
1446  bool retVal = pSub->discard(bookmarkSeqNo);
1447  if (retVal)
1448  {
1449  updateAdapter(pSub);
1450  }
1451  return retVal;
1452  }
1453 
1454  // Called once lock is acquired
1455  bool _discard(const Message::Field& subId_, size_t bookmarkSeqNo_)
1456  {
1457  Subscription* pSub = find(subId_);
1458  bool retVal = pSub->discard(bookmarkSeqNo_);
1459  if (retVal)
1460  {
1461  updateAdapter(pSub);
1462  }
1463  return retVal;
1464  }
1465 
1466  // Called once lock is acquired
1467  Message::Field _getMostRecent(const Message::Field& subId_,
1468  bool usePublishersList_ = true)
1469  {
1470  Subscription* pSub = find(subId_);
1471  return pSub->getMostRecentList(usePublishersList_);
1472  }
1473 
1474  // Called once lock is acquired
1475  bool _isDiscarded(Message& message_)
1476  {
1477  Message::Field subId = message_.getSubscriptionId();
1478  if (subId.empty())
1479  {
1480  subId = message_.getSubscriptionIds();
1481  }
1482  Subscription* pSub = find(subId);
1483  message_.setSubscriptionHandle(
1484  static_cast<amps_subscription_handle>(pSub));
1485  return pSub->isDiscarded(message_.getBookmark());
1486  }
1487 
1488  // Called once lock is acquired
1489  size_t _getOldestBookmarkSeq(const Message::Field& subId_)
1490  {
1491  Subscription* pSub = find(subId_);
1492  return pSub->getOldestBookmarkSeq();
1493  }
1494 
1495  // Called once lock is acquired
1496  virtual void _persisted(Subscription* pSub_,
1497  const Message::Field& bookmark_)
1498  {
1499  if (pSub_->lastPersisted(bookmark_))
1500  {
1501  updateAdapter(pSub_);
1502  }
1503  }
1504 
1505  // Called once lock is acquired
1506  virtual Message::Field _persisted(Subscription* pSub_, size_t bookmark_)
1507  {
1508  return pSub_->lastPersisted(bookmark_);
1509  }
1510 
1511  // Called once lock is acquired
1512  void _purge()
1513  {
1514  if (_recoveryPointAdapter.isValid())
1515  {
1516  _recoveryPointAdapter.purge();
1517  }
1518  __purge();
1519  }
1520 
1521  // Called once lock is acquired
1522  void __purge()
1523  {
1524  // Walk through list and clear Fields before calling clear
1525  while (!_subs.empty())
1526  {
1527  SubscriptionMap::iterator iter = _subs.begin();
1528  //The subId key is cleared when deleting the Subscription, which shares
1529  //the _data pointer in its id field.
1530  const_cast<Message::Field&>(iter->first).clear();
1531  delete (iter->second);
1532  _subs.erase(iter);
1533  }
1534  _subs.clear();
1535  }
1536 
1537  // Called once lock is acquired
1538  virtual void _purge(const Message::Field& subId_)
1539  {
1540  if (_recoveryPointAdapter.isValid())
1541  {
1542  _recoveryPointAdapter.purge(subId_);
1543  }
1544  __purge(subId_);
1545  }
1546 
1547  // Called once lock is acquired
1548  virtual void __purge(const Message::Field& subId_)
1549  {
1550  Lock<Mutex> guard(_subsLock);
1551  SubscriptionMap::iterator iter = _subs.find(subId_);
1552  if (iter == _subs.end())
1553  {
1554  return;
1555  }
1556  const_cast<Message::Field&>(iter->first).clear();
1557  delete (iter->second);
1558  _subs.erase(iter);
1559  }
1560 
1561  // Can be used by subclasses during recovery
1562  void setMostRecent(const Message::Field& subId_,
1563  const Message::Field& recent_)
1564  {
1565  find(subId_)->setMostRecent(recent_);
1566  }
1567 
// Taken by find(), __purge(subId_) and setServerVersion(size_t) while they
// touch _subs or _serverVersion.
1568  Mutex _subsLock;
// Taken by the public entry points (log, discard, purge, ...) before they
// call the underscore-prefixed implementations.
1569  Mutex _lock;
// Single-character entry tags. They are not referenced in this part of the
// file - presumably used by derived stores; TODO confirm.
1570  static const char ENTRY_BOOKMARK = 'b';
1571  static const char ENTRY_DISCARD = 'd';
1572  static const char ENTRY_PERSISTED = 'p';
1573 
1574  virtual Subscription* find(const Message::Field& subId_)
1575  {
1576  if (subId_.empty())
1577  {
1578  throw StoreException("A valid subscription ID must be provided to the Bookmark Store");
1579  }
1580  Lock<Mutex> guard(_subsLock);
1581  if (_subs.count(subId_) == 0)
1582  {
1583  // Subscription will be created
1584  Message::Field id;
1585  id.deepCopy(subId_);
1586  _subs[id] = new Subscription(this, id);
1587  return _subs[id];
1588  }
1589  return _subs[subId_];
1590  }
1591 
1592  virtual bool resize(const Message::Field& subId_, char** newBuffer_, size_t size_,
1593  bool callResizeHandler_ = true)
1594  {
1595  assert(newBuffer_ != 0);
1596  if (size_ == 0) // Delete the buffer
1597  {
1598  if (*newBuffer_)
1599  {
1600  free(*newBuffer_);
1601  *newBuffer_ = NULL;
1602  }
1603  return true;
1604  }
1605  if (callResizeHandler_ && !callResizeHandler(subId_, size_))
1606  {
1607  return false;
1608  }
1609  char* oldBuffer = *newBuffer_ ? *newBuffer_ : NULL;
1610  *newBuffer_ = (char*)malloc(size_);
1611  memset(*newBuffer_, 0, size_);
1612  if (oldBuffer)
1613  {
1614  find(subId_)->moveEntries(oldBuffer, *newBuffer_, size_);
1615  free(oldBuffer);
1616  }
1617  return true;
1618  }
1619 
1620  protected:
1621  void updateAdapter(Subscription* pSub_)
1622  {
1623  if (_recovering || !_recentChanged || !_recoveryPointAdapter.isValid())
1624  {
1625  return;
1626  }
1627  // We have to make a copy here, since the return from getMostRecentList
1628  // could get changed once the lock is released.
1629  RecoveryPoint update = _recoveryPointFactory(pSub_->id(),
1630  pSub_->getMostRecentList(false)).deepCopy();
1631  Unlock<Mutex> unlock(_lock);
1632  _recoveryPointAdapter.update(update);
1633  // Free memmory of the copy, adapter should have copies of the fields
1634  update.clear();
1635  }
1636 
// Subscriptions keyed by subscription id. The mapped Subscription objects
// are allocated in find() and deleted in the __purge() overloads.
// Message::Field::FieldHash is supplied as the map's comparison type.
1637  typedef std::map<Message::Field, Subscription*, Message::Field::FieldHash> SubscriptionMap;
1638  SubscriptionMap _subs;
// Written by setServerVersion(size_t); starts at AMPS_DEFAULT_MIN_VERSION.
1639  size_t _serverVersion;
// Set to true from Subscription::discard(); updateAdapter() does nothing
// while it is false.
1640  bool _recentChanged;
// True while the adapter-taking constructor replays recovery points;
// updateAdapter() does nothing while it is true.
1641  bool _recovering;
// Not referenced in this part of the file.
1642  typedef std::set<Subscription*> SubscriptionSet;
// External storage for recovery points and the factory used to build the
// RecoveryPoint handed to it in updateAdapter().
1643  RecoveryPointAdapter _recoveryPointAdapter;
1644  RecoveryPointFactory _recoveryPointFactory;
1645  };
1646 
1647 } // end namespace AMPS
1648 
1649 #endif //_MEMORYBOOKMARKSTORE_H_
1650 
Defines the AMPS::Message class and related classes.
Abstract base class for storing received bookmarks for HA clients.
Definition: BookmarkStore.hpp:77
void clear()
Clear the internal state, possibly reclaiming memory.
Definition: RecoveryPoint.hpp:109
Field getSubscriptionId() const
Retrieves the value of the SubscriptionId header of the Message as a new Field.
Definition: Message.hpp:1427
virtual void purge()
Called to purge the contents of this store.
Definition: MemoryBookmarkStore.hpp:1331
virtual void discard(const Message::Field &subId_, size_t bookmarkSeqNo_)
Log a discard-bookmark entry to the persistent log based on a bookmark sequence number.
Definition: MemoryBookmarkStore.hpp:1295
virtual Message::Field getMostRecent(const Message::Field &subId_)
Returns the most recent bookmark from the log that ought to be used for (re-)subscriptions.
Definition: MemoryBookmarkStore.hpp:1306
virtual Message::Field persisted(const Message::Field &subId_, size_t bookmark_)
Mark the bookmark provided as replicated to all sync replication destinations for the given subscript...
Definition: MemoryBookmarkStore.hpp:1376
Message encapsulates a single message sent to or received from an AMPS server, and provides methods f...
Definition: Message.hpp:531
void clear()
Deletes the data associated with this Field, should only be used on Fields that were created as deepC...
Definition: Field.hpp:247
MemoryBookmarkStore(const RecoveryPointAdapter &adapter_, RecoveryPointFactory factory_=NULL)
Creates a MemoryBookmarkStore.
Definition: MemoryBookmarkStore.hpp:1204
RecoveryPointAdapter a handle class for implementing external storage of subscription recovery points...
Definition: RecoveryPointAdapter.hpp:77
MemoryBookmarkStore()
Creates a MemoryBookmarkStore.
Definition: MemoryBookmarkStore.hpp:1186
Provides access to the subId and bookmark needed to restart a subscription.
Definition: RecoveryPoint.hpp:67
virtual void persisted(const Message::Field &subId_, const Message::Field &bookmark_)
Mark the bookmark provided as replicated to all sync replication destinations for the given subscript...
Definition: MemoryBookmarkStore.hpp:1363
Message & assignBookmark(const std::string &v)
Sets the value of the Bookmark header for this Message.
Definition: Message.hpp:1194
void setServerVersion(size_t version_)
Internally used to set the server version so the store knows how to deal with persisted acks and call...
Definition: MemoryBookmarkStore.hpp:1396
Field getSubscriptionIds() const
Retrieves the value of the SubscriptionIds header of the Message as a new Field.
Definition: Message.hpp:1428
bool empty() const
Returns 'true' if empty, 'false' otherwise.
Definition: Field.hpp:128
Defines the AMPS::Field class, which represents the value of a field in a message.
#define AMPS_BOOKMARK_EPOCH
Start the subscription at the beginning of the journal.
Definition: BookmarkStore.hpp:51
virtual bool isDiscarded(Message &message_)
Called for each arriving message to determine if the application has already seen this bookmark and s...
Definition: MemoryBookmarkStore.hpp:1320
Provides AMPS::RecoveryPointAdapter, an interface for implementing external storage of bookmark subscr...
void setServerVersion(const VersionInfo &version_)
Internally used to set the server version so the store knows how to deal with persisted acks and call...
Definition: MemoryBookmarkStore.hpp:1387
virtual void discard(const Message &message_)
Log a discard-bookmark entry to the persistent log based on a bookmark sequence number.
Definition: MemoryBookmarkStore.hpp:1282
#define AMPS_BOOKMARK_NOW
Start the subscription at the point in time when AMPS processes the subscription. ...
Definition: BookmarkStore.hpp:55
Message & setSubId(const std::string &v)
Sets the value of the SubscriptionId header for this Message.
Definition: Message.hpp:1427
RecoveryPoint(* RecoveryPointFactory)(const Field &subId_, const Field &bookmark_)
RecoveryPointFactory is a function type for producing a RecoveryPoint that is sent to a RecoveryPoint...
Definition: RecoveryPoint.hpp:126
Provides AMPS::RecoveryPoint, AMPS::RecoveryPointFactory, AMPS::FixedRecoveryPoint, and AMPS::DynamicRecoveryPoint.
A BookmarkStoreImpl implementation that stores bookmarks in memory.
Definition: MemoryBookmarkStore.hpp:57
virtual size_t log(Message &message_)
Log a bookmark to the persistent log and return the corresponding sequence number for this bookmark...
Definition: MemoryBookmarkStore.hpp:1271
Field represents the value of a single field in a Message.
Definition: Field.hpp:86
static RecoveryPoint create(const Field &subId_, const Field &bookmark_)
Use this function in BookmarkStore::setRecoveryPointFactory( std::bind(&FixedRecoveryPoint::create, std::placeholders::_1, std::placeholders::_2))
Definition: RecoveryPoint.hpp:139
virtual size_t getOldestBookmarkSeq(const Message::Field &subId_)
Called to find the oldest bookmark in the store.
Definition: MemoryBookmarkStore.hpp:1352
virtual void purge(const Message::Field &subId_)
Called to purge the contents of this store for particular subId.
Definition: MemoryBookmarkStore.hpp:1342
Message & setBookmark(const std::string &v)
Sets the value of the Bookmark header for this Message.
Definition: Message.hpp:1194
void deepCopy(const Field &orig_)
Makes self a deep copy of the original field.
Definition: Field.hpp:218
Definition: ampsplusplus.hpp:102
Field getBookmark() const
Retrieves the value of the Bookmark header of the Message as a new Field.
Definition: Message.hpp:1194