AMPS C/C++ Client Class Reference
AMPS C/C++ Client Version 5.3.5.1
RecoveryPointAdapter.hpp
Go to the documentation of this file.
1 //
3 // Copyright (c) 2010-2025 60East Technologies Inc., All Rights Reserved.
4 //
5 // This computer software is owned by 60East Technologies Inc. and is
6 // protected by U.S. copyright laws and other laws and by international
7 // treaties. This computer software is furnished by 60East Technologies
8 // Inc. pursuant to a written license agreement and may be used, copied,
9 // transmitted, and stored only in accordance with the terms of such
10 // license agreement and with the inclusion of the above copyright notice.
11 // This computer software or any other copies thereof may not be provided
12 // or otherwise made available to any other person.
13 //
14 // U.S. Government Restricted Rights. This computer software: (a) was
15 // developed at private expense and is in all respects the proprietary
16 // information of 60East Technologies Inc.; (b) was not developed with
17 // government funds; (c) is a trade secret of 60East Technologies Inc.
18 // for all purposes of the Freedom of Information Act; and (d) is a
19 // commercial item and thus, pursuant to Section 12.212 of the Federal
20 // Acquisition Regulations (FAR) and DFAR Supplement Section 227.7202,
21 // Government's use, duplication or disclosure of the computer software
22 // is subject to the restrictions set forth by 60East Technologies Inc..
23 //
25 
26 #ifndef _RECOVERYPOINTADAPTER_H_
27 #define _RECOVERYPOINTADAPTER_H_
28 
29 #include <amps/Field.hpp>
30 #include <amps/RecoveryPoint.hpp>
31 #include <vector>
32 #include <iterator>
33 #include <memory>
34 #include <thread>
35 #include <unordered_map>
36 #if __cplusplus >= 201100L || _MSC_VER >= 1900
37  #include <atomic>
38 #endif
39 
44 
45 namespace AMPS
46 {
47 
48  class RecoveryPointAdapter;
49 
52  class RecoveryPointAdapterImpl : public RefBody
53  {
54  public:
55  virtual ~RecoveryPointAdapterImpl() { }
60  virtual bool next(RecoveryPoint& current_) = 0;
63  virtual void update(RecoveryPoint& recoveryPoint_) = 0;
65  virtual void purge() = 0;
68  virtual void purge(const Field& subId_) = 0;
70  virtual void close() = 0;
72  virtual void prune() { ; }
73  };
74 
77  class RecoveryPointAdapter // -V690
78  {
79  public:
80  class iterator
81  {
82  RecoveryPointAdapterImpl* _pAdapter;
83  RecoveryPoint _current;
84  inline void advance()
85  {
86  if (!_pAdapter || !_pAdapter->next(_current))
87  {
88  _pAdapter = NULL;
89  }
90  }
91 
92  public:
93  iterator() // end
94  : _pAdapter(NULL)
95  {;}
96  iterator(RecoveryPointAdapterImpl* pAdapter_)
97  : _pAdapter(pAdapter_)
98  {
99  advance();
100  }
101 
102  bool operator==(const iterator& rhs) const
103  {
104  return _pAdapter == rhs._pAdapter;
105  }
106  bool operator!=(const iterator& rhs) const
107  {
108  return _pAdapter != rhs._pAdapter;
109  }
110  void operator++(void)
111  {
112  advance();
113  }
114  RecoveryPoint operator*(void)
115  {
116  return _current;
117  }
118  RecoveryPoint* operator->(void)
119  {
120  return &_current;
121  }
122  };
123 
124  RecoveryPointAdapter() : _body() { }
125  RecoveryPointAdapter(RecoveryPointAdapterImpl* body_, bool isRef_ = true)
126  : _body(body_, isRef_) { }
128  : _body(rhs_._body)
129  { }
130 
137  iterator begin()
138  {
139  return iterator(&(_body.get()));
140  }
141 
144  iterator end()
145  {
146  return iterator();
147  }
148 
151  void update(RecoveryPoint& recoveryPoint_)
152  {
153  _body.get().update(recoveryPoint_);
154  }
155 
157  void purge()
158  {
159  _body.get().purge();
160  }
161 
164  void purge(const Field& subId_)
165  {
166  _body.get().purge(subId_);
167  }
168 
170  void close()
171  {
172  _body.get().close();
173  }
174 
176  void prune()
177  {
178  _body.get().prune();
179  }
180 
182  bool isValid() const
183  {
184  return _body.isValid();
185  }
186  private:
187  BorrowRefHandle<RecoveryPointAdapterImpl> _body;
188  };
189 
194  {
195  public:
211  const std::shared_ptr<RecoveryPointAdapterImpl>& delegate_,
212  unsigned updateThreshold_ = 10,
213  double timeoutMillis_ = 2000.0,
214  long updateIntervalMillis_ = 2000
215  )
216  : _delegate(delegate_)
217  , _updateThreshold(updateThreshold_)
218  , _timeoutMillis(timeoutMillis_)
219  , _updateIntervalMillis(updateIntervalMillis_)
220  , _closed(false)
221  , _updateAll(false)
222  {
223  // Start the update thread
224  _thread = std::thread(&ConflatingRecoveryPointAdapter::updateThread,
225  this);
226  }
227 
229  {
230  _close();
231  _thread.join();
232  for (UpdateIter purged = _latestUpdates.begin();
233  purged != _latestUpdates.end(); ++purged)
234  {
235  Field clearableSubId = purged->first;
236  purged->second.clear();
237  clearableSubId.clear();
238  }
239  }
240 
245  virtual bool next(RecoveryPoint& current_)
246  {
247  return _delegate->next(current_);
248  }
249 
252  virtual void update(RecoveryPoint& recoveryPoint_)
253  {
254  if (_closed)
255  {
256  return;
257  }
258  Field subId = recoveryPoint_.getSubId();
259  Lock<Mutex> lock(_lock);
260  UpdateIter lastUpdate = _latestUpdates.find(subId);
261  if (lastUpdate == _latestUpdates.end())
262  {
263  // New sub id, use deep copies and a new Timer.
264  subId = subId.deepCopy();
265  _latestUpdates[subId] = recoveryPoint_.deepCopy();
266  _counts[subId] = 1;
267  if (_timeoutMillis != 0.0) // -V550
268  {
269  Timer timer(_timeoutMillis);
270  timer.start();
271  _timers[subId] = timer;
272  }
273  }
274  else
275  {
276  // SubId already exists, set to new recovery point.
277  lastUpdate->second.deepCopy(recoveryPoint_);
278  // Increment and check the count.
279  if (++_counts[subId] >= _updateThreshold)
280  {
281  // Time to update, make sure update thread wakes up.
282  _lock.signalAll();
283  }
284  }
285  }
286 
288  virtual void purge()
289  {
290  if (_closed)
291  {
292  return;
293  }
294  _delegate->purge();
295  Lock<Mutex> lock(_lock);
296  _counts.clear();
297  _timers.clear();
298  for (UpdateIter purged = _latestUpdates.begin();
299  purged != _latestUpdates.end(); ++purged)
300  {
301  Field clearableSubId = purged->first;
302  purged->second.clear();
303  clearableSubId.clear();
304  }
305  _latestUpdates.clear();
306  }
307 
310  virtual void purge(const Field& subId_)
311  {
312  if (_closed)
313  {
314  return;
315  }
316  _delegate->purge(subId_);
317  Lock<Mutex> lock(_lock);
318  UpdateIter purged = _latestUpdates.find(subId_);
319  if (purged != _latestUpdates.end())
320  {
321  Field clearableSubId = purged->first;
322  purged->second.clear();
323  _latestUpdates.erase(purged);
324  _counts.erase(subId_);
325  _timers.erase(subId_);
326  clearableSubId.clear();
327  }
328  }
329 
331  virtual void close()
332  {
333  _close();
334  }
335 
337  virtual void updateAll()
338  {
339  Lock<Mutex> lock(_lock);
340  _runUpdateAll();
341  }
342 
344  virtual void _runUpdateAll()
345  {
346  if (_closed)
347  {
348  return;
349  }
350  _updateAll = true;
351  while (!_counts.empty())
352  {
353  _lock.signalAll();
354  _lock.wait(250);
355  }
356  }
357 
358  protected:
359  void _close()
360  {
361  // Save all cached updates before shutting down update thread.
362  if (!_closed)
363  {
364  Lock<Mutex> lock(_lock);
365  _runUpdateAll();
366  _closed = true;
367  _lock.signalAll();
368  }
369  _delegate->close();
370  }
371  void updateThread()
372  {
373  // A place to hold updates to save
374  std::vector<SavedUpdate> _queuedUpdates;
375  while (!_closed)
376  {
377  DeferLock<Mutex> lock(_lock);
378  lock.lock();
379  // Wait for a signal or update interval
380  _lock.wait(_updateIntervalMillis);
381 
382  // Check for timeouts
383  for (TimerMap::iterator timer = _timers.begin();
384  timer != _timers.end(); )
385  {
386  if (timer->second.check())
387  {
388  UpdateIter update = _latestUpdates.find(timer->first);
389  if (update != _latestUpdates.end())
390  {
391  // Remove subId from all, clear of subId will
392  // occur after save.
393  _queuedUpdates.push_back(*update);
394  _counts.erase(update->first);
395  timer = _timers.erase(timer);
396  _latestUpdates.erase(update);
397  }
398  else
399  {
400  ++timer;
401  }
402  }
403  else
404  {
405  ++timer;
406  }
407  }
408 
409  // Need a local version so it doesn't change after we unlock to
410  // deliver updates.
411  bool updateAll = (bool)_updateAll;
412  // Check for update counts
413  for (CountMap::iterator count = _counts.begin();
414  count != _counts.end(); )
415  {
416  if (updateAll || _timeoutMillis == 0.0 // -V550
417  || count->second >= _updateThreshold)
418  {
419  UpdateIter update = _latestUpdates.find(count->first);
420  if (update != _latestUpdates.end())
421  {
422  // Remove subId from all, clear of subId will
423  // occur after save.
424  _queuedUpdates.push_back(*update);
425  count = _counts.erase(count);
426  _timers.erase(update->first);
427  _latestUpdates.erase(update);
428  }
429  else
430  {
431  ++count;
432  }
433  }
434  else
435  {
436  ++count;
437  }
438  }
439  // Release the lock unless we're doing an update all, then we
440  // hold it until updates are completed and signal when done.
441  if (!updateAll)
442  {
443  lock.unlock();
444  }
445  // Shouldn't need the lock to send the updates
446  for (std::vector<SavedUpdate>::iterator update = _queuedUpdates.begin(), end = _queuedUpdates.end(); update != end; ++update)
447  {
448  _delegate->update(update->second);
449  Field clearableSubId(update->first);
450  clearableSubId.clear();
451  update->second.clear();
452  }
453  _queuedUpdates.clear();
454  if (updateAll)
455  {
456  _updateAll = false;
457  _lock.signalAll();
458  }
459  } // -V1020
460  }
461 
462  // The adapter doing the real saves
463  std::shared_ptr<RecoveryPointAdapterImpl> _delegate;
464 
465  // Lock used to protect _latestUpdates, _timers, and _counts.
466  Mutex _lock;
467 
468  // Types for our maps
469  typedef std::unordered_map<Field, RecoveryPoint, Field::FieldHash> UpdateMap;
470  typedef std::pair<Field, RecoveryPoint> SavedUpdate;
471  typedef UpdateMap::value_type Update;
472  typedef UpdateMap::iterator UpdateIter;
473  typedef std::unordered_map<Field, Timer, Field::FieldHash> TimerMap;
474  typedef TimerMap::iterator TimerIter;
475  typedef std::unordered_map<Field, unsigned, Field::FieldHash> CountMap;
476  typedef CountMap::iterator CountIter;
477 
478  // Saves the most recent update for each sub id.
479  UpdateMap _latestUpdates;
480 
481  // Saves a timer for each sub id that is reset each time we save.
482  TimerMap _timers;
483 
484  // Saves a count of how many updates have come in since last save.
485  CountMap _counts;
486 
487  // The thread doing the saves.
488  std::thread _thread;
489 
490  // How many updates before we force a save.
491  unsigned _updateThreshold;
492 
493  // How long between getting first cached update and save.
494  double _timeoutMillis;
495 
496  // How long between automatic checks of the timers.
497  long _updateIntervalMillis;
498 
499 #if __cplusplus >= 201100L || _MSC_VER >= 1900
500  // The update thread runs until this is true.
501  std::atomic<bool> _closed;
502 
503  // Flag to tell update thread to save everything.
504  std::atomic<bool> _updateAll;
505 #else
506  // The update thread runs until this is true.
507  volatile bool _closed;
508 
509  // Flag to tell update thread to save everything.
510  volatile bool _updateAll;
511 #endif
512  };
513 
514 }
515 
516 #endif //_RECOVERYPOINTADAPTER_H_
517 
virtual void close()
Take any necessary actions to close the associated storage.
Definition: RecoveryPointAdapter.hpp:331
void purge()
Remove all data from the storage.
Definition: RecoveryPointAdapter.hpp:157
virtual void purge(const Field &subId_)
Remove the specified subId_ from the storage.
Definition: RecoveryPointAdapter.hpp:310
virtual void purge()=0
Remove all data from the storage.
void prune()
Take any necessary actions to close the associated storage.
Definition: RecoveryPointAdapter.hpp:176
RecoveryPointAdapter implementation that delegates storage to another RecoveryPointAdapter but provid...
Definition: RecoveryPointAdapter.hpp:193
void purge(const Field &subId_)
Remove the specified subId_ from the storage.
Definition: RecoveryPointAdapter.hpp:164
virtual void _runUpdateAll()
Lock is already held.
Definition: RecoveryPointAdapter.hpp:344
virtual void purge()
Remove all data from the storage.
Definition: RecoveryPointAdapter.hpp:288
void clear()
Deletes the data associated with this Field, should only be used on Fields that were created as deepC...
Definition: Field.hpp:260
virtual void prune()
Take any necessary actions to reduce associated storage size.
Definition: RecoveryPointAdapter.hpp:72
RecoveryPointAdapter a handle class for implementing external storage of subscription recovery points...
Definition: RecoveryPointAdapter.hpp:77
virtual void update(RecoveryPoint &recoveryPoint_)=0
Update the storage information with the given recovery point.
virtual bool next(RecoveryPoint &current_)=0
Recovery is done by iteration over elements in storage.
bool isValid() const
Return if this has a valid implementation.
Definition: RecoveryPointAdapter.hpp:182
ConflatingRecoveryPointAdapter(const std::shared_ptr< RecoveryPointAdapterImpl > &delegate_, unsigned updateThreshold_=10, double timeoutMillis_=2000.0, long updateIntervalMillis_=2000)
Conflate updates to delegate_ where they will only be processed every updateIntervalMillis_ for subsc...
Definition: RecoveryPointAdapter.hpp:210
Provides access to the subId and bookmark needed to restart a subscription.
Definition: RecoveryPoint.hpp:67
Defines the AMPS::Field class, which represents the value of a field in a message.
RecoveryPoint deepCopy()
Return a deep copy of self.
Definition: RecoveryPoint.hpp:97
Provides AMPS::RecoveryPoint, AMPS::RecoveryPointFactory, AMPS::FixedRecoveryPoint, and AMPS::DynamicRecoveryPoint.
void update(RecoveryPoint &recoveryPoint_)
Update the storage information with the given recovery point.
Definition: RecoveryPointAdapter.hpp:151
virtual void close()=0
Take any necessary actions to close the associated storage.
virtual bool next(RecoveryPoint &current_)
Recovery is done by iteration over elements in storage.
Definition: RecoveryPointAdapter.hpp:245
iterator begin()
To recover from an adapter, iterate over the adapter from begin() to end() with a RecoveryPointIterat...
Definition: RecoveryPointAdapter.hpp:137
Field represents the value of a single field in a Message.
Definition: Field.hpp:86
virtual void updateAll()
Push all updates to underlying adapter.
Definition: RecoveryPointAdapter.hpp:337
void close()
Take any necessary actions to close the associated storage.
Definition: RecoveryPointAdapter.hpp:170
RecoveryPointAdapterImpl virtual base class for implementing external storage of subscription recover...
Definition: RecoveryPointAdapter.hpp:52
void deepCopy(const Field &orig_)
Makes self a deep copy of the original field.
Definition: Field.hpp:218
Definition: ampsplusplus.hpp:103
iterator end()
Return the end of recovery marker.
Definition: RecoveryPointAdapter.hpp:144
const Field & getSubId() const
Get the sub id for this recovery point.
Definition: RecoveryPoint.hpp:84
virtual void update(RecoveryPoint &recoveryPoint_)
Update the storage information with the given recovery point.
Definition: RecoveryPointAdapter.hpp:252