AMPS C/C++ Client Class Reference
AMPS C/C++ Client Version 5.3.4.5
RecoveryPointAdapter.hpp
Go to the documentation of this file.
1 //
3 // Copyright (c) 2010-2025 60East Technologies Inc., All Rights Reserved.
4 //
5 // This computer software is owned by 60East Technologies Inc. and is
6 // protected by U.S. copyright laws and other laws and by international
7 // treaties. This computer software is furnished by 60East Technologies
8 // Inc. pursuant to a written license agreement and may be used, copied,
9 // transmitted, and stored only in accordance with the terms of such
10 // license agreement and with the inclusion of the above copyright notice.
11 // This computer software or any other copies thereof may not be provided
12 // or otherwise made available to any other person.
13 //
14 // U.S. Government Restricted Rights. This computer software: (a) was
15 // developed at private expense and is in all respects the proprietary
16 // information of 60East Technologies Inc.; (b) was not developed with
17 // government funds; (c) is a trade secret of 60East Technologies Inc.
18 // for all purposes of the Freedom of Information Act; and (d) is a
19 // commercial item and thus, pursuant to Section 12.212 of the Federal
20 // Acquisition Regulations (FAR) and DFAR Supplement Section 227.7202,
21 // Government's use, duplication or disclosure of the computer software
22 // is subject to the restrictions set forth by 60East Technologies Inc..
23 //
25 
26 #ifndef _RECOVERYPOINTADAPTER_H_
27 #define _RECOVERYPOINTADAPTER_H_
28 
29 #include <amps/Field.hpp>
30 #include <amps/RecoveryPoint.hpp>
31 #include <vector>
32 #include <iterator>
33 #include <memory>
34 #include <thread>
35 #include <unordered_map>
36 #if __cplusplus >= 201100L || _MSC_VER >= 1900
37  #include <atomic>
38 #endif
39 
44 
45 namespace AMPS
46 {
47 
48  class RecoveryPointAdapter;
49 
/// Virtual base class for implementing external storage of subscription
/// recovery points. Every operation except prune() is pure virtual and has
/// to be supplied by the concrete storage implementation.
52  class RecoveryPointAdapterImpl : public RefBody
53  {
54  public:
     /// Virtual destructor; implementations are used through this base type.
55  virtual ~RecoveryPointAdapterImpl() { }
     /// Recovery is done by iteration over elements in storage.
     /// \param current_ Recovery point supplied by the caller; presumably
     ///        filled in with the next stored entry (confirm against the
     ///        concrete implementations).
     /// \return A false return ends the iteration: RecoveryPointAdapter::iterator
     ///         turns into the end iterator when this returns false.
60  virtual bool next(RecoveryPoint& current_) = 0;
     /// Update the storage information with the given recovery point.
     /// \param recoveryPoint_ The recovery point to save.
63  virtual void update(RecoveryPoint& recoveryPoint_) = 0;
     /// Remove all data from the storage.
65  virtual void purge() = 0;
     /// Remove the specified subId_ from the storage.
     /// \param subId_ The sub id to remove.
68  virtual void purge(const Field& subId_) = 0;
     /// Take any necessary actions to close the associated storage.
70  virtual void close() = 0;
     /// Take any necessary actions to reduce associated storage size.
     /// The default implementation does nothing.
72  virtual void prune() { ; }
73  };
74 
/// Handle class for external storage of subscription recovery points. It
/// holds a RecoveryPointAdapterImpl through a BorrowRefHandle and forwards
/// every operation to that implementation.
77  class RecoveryPointAdapter // -V690
78  {
79  public:
     /// Single-pass iterator over the recovery points produced by an
     /// implementation's next(). A NULL adapter pointer marks the end iterator.
80  class iterator
81  {
     // Implementation being iterated; NULL once iteration has finished.
82  RecoveryPointAdapterImpl* _pAdapter;
     // The recovery point handed to the most recent next() call.
83  RecoveryPoint _current;
     // Ask the implementation for the next entry; become the end iterator
     // when there is no adapter or next() returns false.
84  inline void advance()
85  {
86  if (!_pAdapter || !_pAdapter->next(_current))
87  {
88  _pAdapter = NULL;
89  }
90  }
91 
92  public:
     /// Construct the end iterator.
93  iterator() // end
94  : _pAdapter(NULL)
95  {;}
     /// Construct an iterator over pAdapter_. The first entry is requested
     /// immediately, so construction already calls next() on the adapter.
96  iterator(RecoveryPointAdapterImpl* pAdapter_)
97  : _pAdapter(pAdapter_)
98  {
99  advance();
100  }
101 
     /// Iterators compare equal when they refer to the same adapter pointer;
     /// the current recovery point is not part of the comparison.
102  bool operator==(const iterator& rhs) const
103  {
104  return _pAdapter == rhs._pAdapter;
105  }
     /// Negation of operator==: true when the adapter pointers differ.
106  bool operator!=(const iterator& rhs) const
107  {
108  return _pAdapter != rhs._pAdapter;
109  }
     /// Move to the next entry. Returns nothing, so the result of ++it
     /// cannot be used in an expression.
110  void operator++(void)
111  {
112  advance();
113  }
     /// Return the current recovery point by value.
114  RecoveryPoint operator*(void)
115  {
116  return _current;
117  }
     /// Return a pointer to the current recovery point held by this iterator.
118  RecoveryPoint* operator->(void)
119  {
120  return &_current;
121  }
122  };
123 
     /// Construct a handle with a default-constructed body.
124  RecoveryPointAdapter() : _body() { }
     /// Construct a handle around body_.
     /// \param body_ The implementation to forward to.
     /// \param isRef_ Passed straight to BorrowRefHandle; presumably selects
     ///        whether the handle takes a counted reference on body_ -- TODO
     ///        confirm against BorrowRefHandle.
125  RecoveryPointAdapter(RecoveryPointAdapterImpl* body_, bool isRef_ = true)
126  : _body(body_, isRef_) { }
     // NOTE(review): source line 127 is absent from this listing. The
     // initializer below presumably belongs to the copy constructor whose
     // parameter is rhs_ -- confirm against the original header.
128  : _body(rhs_._body)
129  { }
130 
     /// To recover from an adapter, iterate over the adapter from begin() to
     /// end(). Constructing the returned iterator already requests the first
     /// entry from the implementation.
137  iterator begin()
138  {
139  return iterator(&(_body.get()));
140  }
141 
     /// Return the end of recovery marker.
144  iterator end()
145  {
146  return iterator();
147  }
148 
     /// Update the storage information with the given recovery point.
     /// \param recoveryPoint_ The recovery point to save.
151  void update(RecoveryPoint& recoveryPoint_)
152  {
153  _body.get().update(recoveryPoint_);
154  }
155 
     /// Remove all data from the storage.
157  void purge()
158  {
159  _body.get().purge();
160  }
161 
     /// Remove the specified subId_ from the storage.
     /// \param subId_ The sub id to remove.
164  void purge(const Field& subId_)
165  {
166  _body.get().purge(subId_);
167  }
168 
     /// Take any necessary actions to close the associated storage.
170  void close()
171  {
172  _body.get().close();
173  }
174 
     /// Forward to the implementation's prune(), which takes any necessary
     /// actions to reduce associated storage size.
176  void prune()
177  {
178  _body.get().prune();
179  }
180 
     /// Return if this has a valid implementation.
182  bool isValid() const
183  {
184  return _body.isValid();
185  }
186  private:
     // The implementation every call above is forwarded to.
187  BorrowRefHandle<RecoveryPointAdapterImpl> _body;
188  };
189 
194  {
195  public:
/// Constructor parameters and body. The line carrying the constructor name
/// (source line 210) is absent from this listing.
/// \param delegate_ The adapter doing the real saves.
/// \param updateThreshold_ How many updates for one sub id before a save is
///        forced.
/// \param timeoutMillis_ How long between getting the first cached update for
///        a sub id and saving it. With 0 no timer is created in update() and
///        the update thread saves every counted entry on each pass.
/// \param updateIntervalMillis_ How long between automatic checks of the
///        timers; used as the wait timeout of the update thread.
211  const std::shared_ptr<RecoveryPointAdapterImpl>& delegate_,
212  unsigned updateThreshold_ = 10,
213  double timeoutMillis_ = 2000.0,
214  long updateIntervalMillis_ = 2000
215  )
216  : _delegate(delegate_)
217  , _updateThreshold(updateThreshold_)
218  , _timeoutMillis(timeoutMillis_)
219  , _updateIntervalMillis(updateIntervalMillis_)
220  , _closed(false)
221  , _updateAll(false)
222  {
223  // Start the update thread
     // The thread runs updateThread() on this object until _closed is set.
224  _thread = std::thread(&ConflatingRecoveryPointAdapter::updateThread,
225  this);
226  }
227 
/// Destructor body (the signature, source line 228, is absent from this
/// listing). Runs _close(), which flushes cached updates if the adapter was
/// not closed yet, waits for the update thread to finish, and then frees the
/// deep-copied recovery points and sub ids still held in _latestUpdates.
229  {
230  _close();
231  _thread.join();
232  for (UpdateIter purged = _latestUpdates.begin();
233  purged != _latestUpdates.end(); ++purged)
234  {
     // The map key cannot be cleared in place, so clear through a copy of
     // the Field taken from the key.
235  Field clearableSubId = purged->first;
236  purged->second.clear();
237  clearableSubId.clear();
238  }
239  }
240 
/// Recovery is done by iteration over elements in storage. The call goes
/// straight to the delegate; the updates cached in this object are not
/// consulted.
/// \param current_ Passed through to the delegate's next().
/// \return Whatever the delegate's next() returns.
245  virtual bool next(RecoveryPoint& current_)
246  {
247  return _delegate->next(current_);
248  }
249 
/// Update the storage information with the given recovery point. The update
/// is only cached here, keyed by sub id; the update thread passes it on to
/// the delegate later.
/// \param recoveryPoint_ The recovery point to cache. Its data is deep-copied,
///        so the cache does not refer to the caller's object.
252  virtual void update(RecoveryPoint& recoveryPoint_)
253  {
254  Field subId = recoveryPoint_.getSubId();
255  Lock<Mutex> lock(_lock);
256  UpdateIter lastUpdate = _latestUpdates.find(subId);
257  if (lastUpdate == _latestUpdates.end())
258  {
259  // New sub id, use deep copies and a new Timer.
     // The same deep-copied Field is used as the key in all three maps.
260  subId = subId.deepCopy();
261  _latestUpdates[subId] = recoveryPoint_.deepCopy();
262  _counts[subId] = 1;
     // A timeout of 0 means no timer is kept for this sub id.
263  if (_timeoutMillis != 0.0) // -V550
264  {
265  Timer timer(_timeoutMillis);
266  timer.start();
267  _timers[subId] = timer;
268  }
     // This branch does not signal the update thread; the first update for
     // a sub id is picked up on the thread's next pass.
269  }
270  else
271  {
272  // SubId already exists, set to new recovery point.
273  lastUpdate->second.deepCopy(recoveryPoint_);
274  // Increment and check the count.
275  if (++_counts[subId] >= _updateThreshold)
276  {
277  // Time to update, make sure update thread wakes up.
278  _lock.signalAll();
279  }
280  }
281  }
282 
/// Remove all data from the storage. The delegate is purged first, before
/// the lock is taken; then every cached count, timer and update is dropped
/// and the deep-copied recovery points and sub ids are freed.
284  virtual void purge()
285  {
286  _delegate->purge();
287  Lock<Mutex> lock(_lock);
288  _counts.clear();
289  _timers.clear();
290  for (UpdateIter purged = _latestUpdates.begin();
291  purged != _latestUpdates.end(); ++purged)
292  {
     // The map key cannot be cleared in place, so clear through a copy of
     // the Field taken from the key.
293  Field clearableSubId = purged->first;
294  purged->second.clear();
295  clearableSubId.clear();
296  }
297  _latestUpdates.clear();
298  }
299 
/// Remove the specified subId_ from the storage. The delegate is purged
/// first, before the lock is taken; then any cached update, count and timer
/// for the sub id are removed.
/// \param subId_ The sub id to remove.
302  virtual void purge(const Field& subId_)
303  {
304  _delegate->purge(subId_);
305  Lock<Mutex> lock(_lock);
306  UpdateIter purged = _latestUpdates.find(subId_);
307  if (purged != _latestUpdates.end())
308  {
     // Keep a copy of the key Field so its data can be freed after the
     // entries that use it as a key have been erased from all three maps.
309  Field clearableSubId = purged->first;
310  purged->second.clear();
311  _latestUpdates.erase(purged);
312  _counts.erase(subId_);
313  _timers.erase(subId_);
314  clearableSubId.clear();
315  }
316  }
317 
/// Take any necessary actions to close the associated storage. All the work
/// is done by _close().
319  virtual void close()
320  {
321  _close();
322  }
323 
/// Push all updates to underlying adapter. Takes the lock and then runs
/// _runUpdateAll(), which returns once no counted updates remain.
325  virtual void updateAll()
326  {
327  Lock<Mutex> lock(_lock);
328  _runUpdateAll();
329  }
330 
/// Lock is already held. Sets the update-all flag, then keeps signalling the
/// update thread and waiting on the lock (timeout argument 250) until
/// _counts is empty.
// NOTE(review): the only exit condition is an empty _counts and emptying it
// is done by the update thread. If that thread has already stopped (after
// close) while entries are still counted, this loop looks like it would not
// terminate -- confirm whether callers can reach that state.
332  virtual void _runUpdateAll()
333  {
334  _updateAll = true;
335  while (!_counts.empty())
336  {
337  _lock.signalAll();
338  _lock.wait(250);
339  }
340  }
341 
342  protected:
/// Shared implementation of close() and the destructor. On the first call it
/// flushes every cached update through the update thread, marks the adapter
/// closed and wakes the thread so it can exit. The delegate's close() is
/// called on every invocation, including repeated ones.
343  void _close()
344  {
345  // Save all cached updates before shutting down update thread.
     // _closed is tested before the lock is taken.
346  if (!_closed)
347  {
348  Lock<Mutex> lock(_lock);
349  _runUpdateAll();
350  _closed = true;
351  _lock.signalAll();
352  }
353  _delegate->close();
354  }
355  void updateThread()
356  {
357  // A place to hold updates to save
358  std::vector<SavedUpdate> _queuedUpdates;
359  while (!_closed)
360  {
361  DeferLock<Mutex> lock(_lock);
362  lock.lock();
363  // Wait for a signal or update interval
364  _lock.wait(_updateIntervalMillis);
365 
366  // Check for timeouts
367  for (TimerMap::iterator timer = _timers.begin();
368  timer != _timers.end(); )
369  {
370  if (timer->second.check())
371  {
372  UpdateIter update = _latestUpdates.find(timer->first);
373  if (update != _latestUpdates.end())
374  {
375  // Remove subId from all, clear of subId will
376  // occur after save.
377  _queuedUpdates.push_back(*update);
378  _counts.erase(update->first);
379  timer = _timers.erase(timer);
380  _latestUpdates.erase(update);
381  }
382  else
383  {
384  ++timer;
385  }
386  }
387  else
388  {
389  ++timer;
390  }
391  }
392 
393  // Need a local version so it doesn't change after we unlock to
394  // deliver updates.
395  bool updateAll = (bool)_updateAll;
396  // Check for update counts
397  for (CountMap::iterator count = _counts.begin();
398  count != _counts.end(); )
399  {
400  if (updateAll || _timeoutMillis == 0.0 // -V550
401  || count->second >= _updateThreshold)
402  {
403  UpdateIter update = _latestUpdates.find(count->first);
404  if (update != _latestUpdates.end())
405  {
406  // Remove subId from all, clear of subId will
407  // occur after save.
408  _queuedUpdates.push_back(*update);
409  count = _counts.erase(count);
410  _timers.erase(update->first);
411  _latestUpdates.erase(update);
412  }
413  else
414  {
415  ++count;
416  }
417  }
418  else
419  {
420  ++count;
421  }
422  }
423  // Release the lock unless we're doing an update all, then we
424  // hold it until updates are completed and signal when done.
425  if (!updateAll)
426  {
427  lock.unlock();
428  }
429  // Shouldn't need the lock to send the updates
430  for (std::vector<SavedUpdate>::iterator update = _queuedUpdates.begin(), end = _queuedUpdates.end(); update != end; ++update)
431  {
432  _delegate->update(update->second);
433  Field clearableSubId(update->first);
434  clearableSubId.clear();
435  update->second.clear();
436  }
437  _queuedUpdates.clear();
438  if (updateAll)
439  {
440  _updateAll = false;
441  _lock.signalAll();
442  }
443  } // -V1020
444  }
445 
446  // The adapter doing the real saves
447  std::shared_ptr<RecoveryPointAdapterImpl> _delegate;
448 
449  // Lock used to protect _latestUpdates, _timers, and _counts.
     // Also used as the wait/signal point between update(), _runUpdateAll(),
     // _close() and the update thread.
450  Mutex _lock;
451 
452  // Types for our maps
     // All three maps are keyed by sub id Field and hashed with
     // Field::FieldHash. SavedUpdate is the element type of the update
     // thread's queue of pending saves.
453  typedef std::unordered_map<Field, RecoveryPoint, Field::FieldHash> UpdateMap;
454  typedef std::pair<Field, RecoveryPoint> SavedUpdate;
     // Update, TimerIter and CountIter are not referenced in the visible
     // part of this listing.
455  typedef UpdateMap::value_type Update;
456  typedef UpdateMap::iterator UpdateIter;
457  typedef std::unordered_map<Field, Timer, Field::FieldHash> TimerMap;
458  typedef TimerMap::iterator TimerIter;
459  typedef std::unordered_map<Field, unsigned, Field::FieldHash> CountMap;
460  typedef CountMap::iterator CountIter;
461 
462  // Saves the most recent update for each sub id.
463  UpdateMap _latestUpdates;
464 
465  // Saves a timer for each sub id that is reset each time we save.
466  TimerMap _timers;
467 
468  // Saves a count of how many updates have come in since last save.
469  CountMap _counts;
470 
471  // The thread doing the saves.
472  std::thread _thread;
473 
474  // How many updates before we force a save.
475  unsigned _updateThreshold;
476 
477  // How long between getting first cached update and save.
     // A value of 0 disables the per sub id timers.
478  double _timeoutMillis;
479 
480  // How long between automatic checks of the timers.
481  long _updateIntervalMillis;
482 
     // The two flags are std::atomic<bool> when <atomic> is available (same
     // condition as the include at the top of the file) and volatile bool
     // otherwise.
483 #if __cplusplus >= 201100L || _MSC_VER >= 1900
484  // The update thread runs until this is true.
485  std::atomic<bool> _closed;
486 
487  // Flag to tell update thread to save everything.
488  std::atomic<bool> _updateAll;
489 #else
490  // The update thread runs until this is true.
491  volatile bool _closed;
492 
493  // Flag to tell update thread to save everything.
494  volatile bool _updateAll;
495 #endif
496  };
497 
498 }
499 
500 #endif //_RECOVERYPOINTADAPTER_H_
501 
virtual void close()
Take any necessary actions to close the associated storage.
Definition: RecoveryPointAdapter.hpp:319
void purge()
Remove all data from the storage.
Definition: RecoveryPointAdapter.hpp:157
virtual void purge(const Field &subId_)
Remove the specified subId_ from the storage.
Definition: RecoveryPointAdapter.hpp:302
virtual void purge()=0
Remove all data from the storage.
void prune()
Take any necessary actions to reduce associated storage size.
Definition: RecoveryPointAdapter.hpp:176
RecoveryPointAdapter implementation that delegates storage to another RecoveryPointAdapter but provid...
Definition: RecoveryPointAdapter.hpp:193
void purge(const Field &subId_)
Remove the specified subId_ from the storage.
Definition: RecoveryPointAdapter.hpp:164
virtual void _runUpdateAll()
Lock is already held.
Definition: RecoveryPointAdapter.hpp:332
virtual void purge()
Remove all data from the storage.
Definition: RecoveryPointAdapter.hpp:284
void clear()
Deletes the data associated with this Field, should only be used on Fields that were created as deepC...
Definition: Field.hpp:247
virtual void prune()
Take any necessary actions to reduce associated storage size.
Definition: RecoveryPointAdapter.hpp:72
RecoveryPointAdapter a handle class for implementing external storage of subscription recovery points...
Definition: RecoveryPointAdapter.hpp:77
virtual void update(RecoveryPoint &recoveryPoint_)=0
Update the storage information with the given recovery point.
virtual bool next(RecoveryPoint &current_)=0
Recovery is done by iteration over elements in storage.
bool isValid() const
Return if this has a valid implementation.
Definition: RecoveryPointAdapter.hpp:182
ConflatingRecoveryPointAdapter(const std::shared_ptr< RecoveryPointAdapterImpl > &delegate_, unsigned updateThreshold_=10, double timeoutMillis_=2000.0, long updateIntervalMillis_=2000)
Conflate updates to delegate_ where they will only be processed every updateIntervalMillis_ for subsc...
Definition: RecoveryPointAdapter.hpp:210
Provides access to the subId and bookmark needed to restart a subscription.
Definition: RecoveryPoint.hpp:67
Defines the AMPS::Field class, which represents the value of a field in a message.
RecoveryPoint deepCopy()
Return a deep copy of self.
Definition: RecoveryPoint.hpp:97
Provides AMPS::RecoveryPoint, AMPS::RecoveryPointFactory, AMPS::FixedRecoveryPoint, and AMPS::DynamicRecoveryPoint.
void update(RecoveryPoint &recoveryPoint_)
Update the storage information with the given recovery point.
Definition: RecoveryPointAdapter.hpp:151
virtual void close()=0
Take any necessary actions to close the associated storage.
virtual bool next(RecoveryPoint &current_)
Recovery is done by iteration over elements in storage.
Definition: RecoveryPointAdapter.hpp:245
iterator begin()
To recover from an adapter, iterate over the adapter from begin() to end() with a RecoveryPointIterat...
Definition: RecoveryPointAdapter.hpp:137
Field represents the value of a single field in a Message.
Definition: Field.hpp:86
virtual void updateAll()
Push all updates to underlying adapter.
Definition: RecoveryPointAdapter.hpp:325
void close()
Take any necessary actions to close the associated storage.
Definition: RecoveryPointAdapter.hpp:170
RecoveryPointAdapterImpl virtual base class for implementing external storage of subscription recover...
Definition: RecoveryPointAdapter.hpp:52
void deepCopy(const Field &orig_)
Makes self a deep copy of the original field.
Definition: Field.hpp:218
Definition: ampsplusplus.hpp:103
iterator end()
Return the end of recovery marker.
Definition: RecoveryPointAdapter.hpp:144
const Field & getSubId() const
Get the sub id for this recovery point.
Definition: RecoveryPoint.hpp:84
virtual void update(RecoveryPoint &recoveryPoint_)
Update the storage information with the given recovery point.
Definition: RecoveryPointAdapter.hpp:252