26 #ifndef _RECOVERYPOINTADAPTER_H_ 27 #define _RECOVERYPOINTADAPTER_H_ 35 #include <unordered_map> 36 #if __cplusplus >= 201100L || _MSC_VER >= 1900 48 class RecoveryPointAdapter;
65 virtual void purge() = 0;
70 virtual void close() = 0;
86 if (!_pAdapter || !_pAdapter->
next(_current))
97 : _pAdapter(pAdapter_)
102 bool operator==(
const iterator& rhs)
const 104 return _pAdapter == rhs._pAdapter;
106 bool operator!=(
const iterator& rhs)
const 108 return _pAdapter != rhs._pAdapter;
110 void operator++(
void)
126 : _body(body_, isRef_) { }
139 return iterator(&(_body.get()));
153 _body.get().update(recoveryPoint_);
166 _body.get().purge(subId_);
184 return _body.isValid();
187 BorrowRefHandle<RecoveryPointAdapterImpl> _body;
211 const std::shared_ptr<RecoveryPointAdapterImpl>& delegate_,
212 unsigned updateThreshold_ = 10,
213 double timeoutMillis_ = 2000.0,
214 long updateIntervalMillis_ = 2000
216 : _delegate(delegate_)
217 , _updateThreshold(updateThreshold_)
218 , _timeoutMillis(timeoutMillis_)
219 , _updateIntervalMillis(updateIntervalMillis_)
224 _thread = std::thread(&ConflatingRecoveryPointAdapter::updateThread,
232 for (UpdateIter purged = _latestUpdates.begin();
233 purged != _latestUpdates.end(); ++purged)
235 Field clearableSubId = purged->first;
236 purged->second.
clear();
237 clearableSubId.
clear();
247 return _delegate->next(current_);
255 Lock<Mutex> lock(_lock);
256 UpdateIter lastUpdate = _latestUpdates.find(subId);
257 if (lastUpdate == _latestUpdates.end())
261 _latestUpdates[subId] = recoveryPoint_.
deepCopy();
263 if (_timeoutMillis != 0.0)
265 Timer timer(_timeoutMillis);
267 _timers[subId] = timer;
273 lastUpdate->second.deepCopy(recoveryPoint_);
275 if (++_counts[subId] >= _updateThreshold)
287 Lock<Mutex> lock(_lock);
290 for (UpdateIter purged = _latestUpdates.begin();
291 purged != _latestUpdates.end(); ++purged)
293 Field clearableSubId = purged->first;
294 purged->second.
clear();
295 clearableSubId.
clear();
297 _latestUpdates.clear();
304 _delegate->purge(subId_);
305 Lock<Mutex> lock(_lock);
306 UpdateIter purged = _latestUpdates.find(subId_);
307 if (purged != _latestUpdates.end())
309 Field clearableSubId = purged->first;
310 purged->second.
clear();
311 _latestUpdates.erase(purged);
312 _counts.erase(subId_);
313 _timers.erase(subId_);
314 clearableSubId.
clear();
327 Lock<Mutex> lock(_lock);
335 while (!_counts.empty())
348 Lock<Mutex> lock(_lock);
358 std::vector<SavedUpdate> _queuedUpdates;
361 DeferLock<Mutex> lock(_lock);
364 _lock.wait(_updateIntervalMillis);
367 for (TimerMap::iterator timer = _timers.begin();
368 timer != _timers.end(); )
370 if (timer->second.check())
372 UpdateIter
update = _latestUpdates.find(timer->first);
373 if (update != _latestUpdates.end())
377 _queuedUpdates.push_back(*update);
378 _counts.erase(update->first);
379 timer = _timers.erase(timer);
380 _latestUpdates.erase(update);
395 bool updateAll = (bool)_updateAll;
397 for (CountMap::iterator count = _counts.begin();
398 count != _counts.end(); )
400 if (updateAll || _timeoutMillis == 0.0
401 || count->second >= _updateThreshold)
403 UpdateIter
update = _latestUpdates.find(count->first);
404 if (update != _latestUpdates.end())
408 _queuedUpdates.push_back(*update);
409 count = _counts.erase(count);
410 _timers.erase(update->first);
411 _latestUpdates.erase(update);
430 for (std::vector<SavedUpdate>::iterator
update = _queuedUpdates.begin(), end = _queuedUpdates.end();
update != end; ++
update)
432 _delegate->update(
update->second);
434 clearableSubId.
clear();
437 _queuedUpdates.clear();
447 std::shared_ptr<RecoveryPointAdapterImpl> _delegate;
453 typedef std::unordered_map<Field, RecoveryPoint, Field::FieldHash> UpdateMap;
454 typedef std::pair<Field, RecoveryPoint> SavedUpdate;
455 typedef UpdateMap::value_type Update;
456 typedef UpdateMap::iterator UpdateIter;
457 typedef std::unordered_map<Field, Timer, Field::FieldHash> TimerMap;
458 typedef TimerMap::iterator TimerIter;
459 typedef std::unordered_map<Field, unsigned, Field::FieldHash> CountMap;
460 typedef CountMap::iterator CountIter;
463 UpdateMap _latestUpdates;
475 unsigned _updateThreshold;
478 double _timeoutMillis;
481 long _updateIntervalMillis;
483 #if __cplusplus >= 201100L || _MSC_VER >= 1900 485 std::atomic<bool> _closed;
488 std::atomic<bool> _updateAll;
491 volatile bool _closed;
494 volatile bool _updateAll;
500 #endif //_RECOVERYPOINTADAPTER_H_
virtual void close()
Take any necessary actions to close the associated storage.
Definition: RecoveryPointAdapter.hpp:319
void purge()
Remove all data from the storage.
Definition: RecoveryPointAdapter.hpp:157
virtual void purge(const Field &subId_)
Remove the specified subId_ from the storage.
Definition: RecoveryPointAdapter.hpp:302
virtual void purge()=0
Remove all data from the storage.
void prune()
Take any necessary actions to reduce associated storage size.
Definition: RecoveryPointAdapter.hpp:176
RecoveryPointAdapter implementation that delegates storage to another RecoveryPointAdapter but provid...
Definition: RecoveryPointAdapter.hpp:193
void purge(const Field &subId_)
Remove the specified subId_ from the storage.
Definition: RecoveryPointAdapter.hpp:164
virtual void _runUpdateAll()
Lock is already held.
Definition: RecoveryPointAdapter.hpp:332
virtual void purge()
Remove all data from the storage.
Definition: RecoveryPointAdapter.hpp:284
void clear()
Deletes the data associated with this Field, should only be used on Fields that were created as deepC...
Definition: Field.hpp:247
virtual void prune()
Take any necessary actions to reduce associated storage size.
Definition: RecoveryPointAdapter.hpp:72
RecoveryPointAdapter a handle class for implementing external storage of subscription recovery points...
Definition: RecoveryPointAdapter.hpp:77
virtual void update(RecoveryPoint &recoveryPoint_)=0
Update the storage information with the given recovery point.
virtual bool next(RecoveryPoint &current_)=0
Recovery is done by iteration over elements in storage.
bool isValid() const
Return if this has a valid implementation.
Definition: RecoveryPointAdapter.hpp:182
ConflatingRecoveryPointAdapter(const std::shared_ptr< RecoveryPointAdapterImpl > &delegate_, unsigned updateThreshold_=10, double timeoutMillis_=2000.0, long updateIntervalMillis_=2000)
Conflate updates to delegate_ where they will only be processed every updateIntervalMillis_ for subsc...
Definition: RecoveryPointAdapter.hpp:210
Provides access to the subId and bookmark needed to restart a subscription.
Definition: RecoveryPoint.hpp:67
Defines the AMPS::Field class, which represents the value of a field in a message.
RecoveryPoint deepCopy()
Return a deep copy of self.
Definition: RecoveryPoint.hpp:97
Provides AMPS::RecoveryPoint, AMPS::RecoveryPointFactory, AMPS::FixedRecoveryPoint, and AMPS::DynamicRecoveryPoint.
void update(RecoveryPoint &recoveryPoint_)
Update the storage information with the given recovery point.
Definition: RecoveryPointAdapter.hpp:151
virtual void close()=0
Take any necessary actions to close the associated storage.
virtual bool next(RecoveryPoint &current_)
Recovery is done by iteration over elements in storage.
Definition: RecoveryPointAdapter.hpp:245
iterator begin()
To recover from an adapter, iterate over the adapter from begin() to end() with a RecoveryPointIterat...
Definition: RecoveryPointAdapter.hpp:137
Field represents the value of a single field in a Message.
Definition: Field.hpp:86
virtual void updateAll()
Push all updates to underlying adapter.
Definition: RecoveryPointAdapter.hpp:325
void close()
Take any necessary actions to close the associated storage.
Definition: RecoveryPointAdapter.hpp:170
RecoveryPointAdapterImpl virtual base class for implementing external storage of subscription recover...
Definition: RecoveryPointAdapter.hpp:52
void deepCopy(const Field &orig_)
Makes self a deep copy of the original field.
Definition: Field.hpp:218
Definition: ampsplusplus.hpp:102
iterator end()
Return the end of recovery marker.
Definition: RecoveryPointAdapter.hpp:144
const Field & getSubId() const
Get the sub id for this recovery point.
Definition: RecoveryPoint.hpp:84
virtual void update(RecoveryPoint &recoveryPoint_)
Update the storage information with the given recovery point.
Definition: RecoveryPointAdapter.hpp:252