AMPS C/C++ Client Class Reference
AMPS C/C++ Client Version 5.3.5.1
HybridPublishStore.hpp
Go to the documentation of this file.
1 //
3 // Copyright (c) 2010-2025 60East Technologies Inc., All Rights Reserved.
4 //
5 // This computer software is owned by 60East Technologies Inc. and is
6 // protected by U.S. copyright laws and other laws and by international
7 // treaties. This computer software is furnished by 60East Technologies
8 // Inc. pursuant to a written license agreement and may be used, copied,
9 // transmitted, and stored only in accordance with the terms of such
10 // license agreement and with the inclusion of the above copyright notice.
11 // This computer software or any other copies thereof may not be provided
12 // or otherwise made available to any other person.
13 //
14 // U.S. Government Restricted Rights. This computer software: (a) was
15 // developed at private expense and is in all respects the proprietary
16 // information of 60East Technologies Inc.; (b) was not developed with
17 // government funds; (c) is a trade secret of 60East Technologies Inc.
18 // for all purposes of the Freedom of Information Act; and (d) is a
19 // commercial item and thus, pursuant to Section 12.212 of the Federal
20 // Acquisition Regulations (FAR) and DFAR Supplement Section 227.7202,
21 // Government's use, duplication or disclosure of the computer software
22 // is subject to the restrictions set forth by 60East Technologies Inc..
23 //
25 
26 #ifndef _HYBRIDPUBLISHSTORE_H_
27 #define _HYBRIDPUBLISHSTORE_H_
28 
29 #include <amps/ampsplusplus.hpp>
31 #include <amps/PublishStore.hpp>
32 #if __cplusplus >= 201100L || _MSC_VER >= 1900
33  #include <atomic>
34 #endif
35 
40 
41 namespace AMPS
42 {
51  {
52  class HandlerData
53  {
54  public:
55  HybridPublishStore* _store;
57  void* _data;
58 
59  HandlerData()
60  : _store(NULL), _handler(NULL), _data(NULL)
61  { ; }
62 
63  void init(PublishStoreResizeHandler handler_, void* data_)
64  {
65  _handler = handler_;
66  _data = data_;
67  }
68  };
69 
70  class SwappingOutReplayer : public StoreReplayer
71  {
72  PublishStore* _pStore;
73 #if __cplusplus >= 201100L || _MSC_VER >= 1900
74  std::atomic<size_t> _entries;
75  std::atomic<size_t> _errorCount;
76  std::atomic<amps_uint64_t> _lastIndex;
77 #else
78  volatile size_t _entries;
79  volatile size_t _errorCount;
80  volatile amps_uint64_t _lastIndex;
81 #endif
82  public:
83  SwappingOutReplayer(PublishStore* pStore_, size_t entries_)
84  : _pStore(pStore_), _entries(entries_)
85  , _errorCount(0), _lastIndex(0)
86  { }
87 
88  size_t getErrors()
89  {
90  return _errorCount;
91  }
92 
93  amps_uint64_t lastIndex()
94  {
95  return _lastIndex;
96  }
97 
98  void execute(Message& message_)
99  {
100  if (_entries > 0 && _errorCount == 0)
101  {
102  try
103  {
104  {
105  _pStore->store(message_, false);
106  }
107  _lastIndex = amps_message_get_field_uint64(
108  message_.getMessage(),
109  AMPS_Sequence);
110  }
111  catch (...)
112  {
113  ++_errorCount;
114  }
115  --_entries;
116  }
117  }
118  };
119 
120  public:
131  HybridPublishStore(const char* fileName_, size_t maxMemoryCapacity_,
132  bool errorOnPublishGap_ = false)
133  : StoreImpl(errorOnPublishGap_)
134  , _memStore(maxMemoryCapacity_, errorOnPublishGap_)
135  , _fileStore(fileName_, errorOnPublishGap_)
136  , _cap(maxMemoryCapacity_)
137  , _lowWatermark((size_t)((double)maxMemoryCapacity_ * 0.5))
138  , _lowestIndexInMemory(0)
139  , _holdSwapping(false)
140  {
141  _handlerData._store = this;
142  _memStore.addRef();
143  _fileStore.addRef();
144  }
145 
156  HybridPublishStore(const std::string& fileName_, size_t maxMemoryCapacity_,
157  bool errorOnPublishGap_ = false)
158  : StoreImpl(errorOnPublishGap_)
159  , _memStore(maxMemoryCapacity_, errorOnPublishGap_)
160  , _fileStore(fileName_, errorOnPublishGap_)
161  , _cap(maxMemoryCapacity_)
162  , _lowWatermark((size_t)((double)maxMemoryCapacity_ * 0.5))
163  , _lowestIndexInMemory(0)
164  , _holdSwapping(false)
165  {
166  _handlerData._store = this;
167  _memStore.addRef();
168  _fileStore.addRef();
169  }
170 
185  HybridPublishStore(const std::string& fileName_, size_t maxMemoryCapacity_,
186  amps_uint32_t blockSize_, bool errorOnPublishGap_ = false)
187  : StoreImpl(errorOnPublishGap_)
188  , _memStore(maxMemoryCapacity_, blockSize_, errorOnPublishGap_)
189  , _fileStore(fileName_, blockSize_, errorOnPublishGap_)
190  , _cap(maxMemoryCapacity_)
191  , _lowWatermark((size_t)((double)maxMemoryCapacity_ * 0.5))
192  , _lowestIndexInMemory(0)
193  , _holdSwapping(false)
194  {
195  _handlerData._store = this;
196  _memStore.addRef();
197  _fileStore.addRef();
198  }
199 
206  void setLowWatermark(size_t lowWatermark_)
207  {
208  Lock<Mutex> guard(_lock);
209  _lowWatermark = lowWatermark_;
210  }
211 
219  {
220  Lock<Mutex> guard(_lock);
221  return _lowWatermark;
222  }
223 
227  void discardUpTo(amps_uint64_t index_)
228  {
229  Lock<Mutex> guard(_lock);
230  while (_holdSwapping)
231  {
232  if (!_lock.wait(1000))
233  {
234  Unlock<Mutex> u(_lock);
235  amps_invoke_waiting_function();
236  }
237  }
238  // Set _holdSwapping true to end of function
239  FlagFlip flip(&_holdSwapping);
240  {
241  Unlock<Mutex> u(_lock);
242  if (!index_)
243  {
244  _memStore.discardUpTo(_fileStore.getLastPersisted());
245  Lock<Mutex> l(_lock);
246  _lock.signalAll();
247  return;
248  }
249  _fileStore.discardUpTo(index_);
250  if (_lowestIndexInMemory <= index_)
251  {
252  _memStore.discardUpTo(index_);
253  _lowestIndexInMemory = index_ + 1;
254  }
255  }
256  _lock.signalAll();
257  }
258 
263  void replay(StoreReplayer& replayer_)
264  {
265  Lock<Mutex> guard(_lock);
266  while (_holdSwapping)
267  {
268  if (!_lock.wait(1000))
269  {
270  amps_invoke_waiting_function();
271  }
272  }
273  // Set _holdSwapping true to end of function
274  FlagFlip flip(&_holdSwapping);
275  {
276  Unlock<Mutex> u(_lock);
277  _fileStore.replay(replayer_);
278  _memStore.replay(replayer_);
279  }
280  _lock.signalAll();
281  }
282 
285  size_t unpersistedCount() const
286  {
287  return _fileStore.unpersistedCount() + _memStore.unpersistedCount();
288  }
289 
298  virtual void flush(long timeout_)
299  {
300  Lock<Mutex> guard(_lock);
301  amps_uint64_t waitFor = _getHybridHighestUnpersisted();
302  amps_uint64_t unset = getUnsetSequence();
303  // Check that we aren't already empty
304  if (waitFor == unset)
305  {
306  return;
307  }
308  if (timeout_ > 0)
309  {
310  bool timedOut = false;
311  long waitTime = (timeout_ < 1000) ? timeout_ : 1000;
312  AMPS_START_TIMER(timeout_)
313  // While timeout hasn't expired and we haven't had everything acked
314  while (!timedOut && waitFor >= _getHybridLowestUnpersisted() &&
315  _getHybridLowestUnpersisted() != unset)
316  {
317  if (!_lock.wait(waitTime))
318  {
319  // May have woken up early, check real time
320  AMPS_RESET_TIMER(timedOut, timeout_);
321  waitTime = (timeout_ < 1000) ? timeout_ : 1000;
322  Unlock<Mutex> unlck(_lock);
323  amps_invoke_waiting_function();
324  }
325  }
326  // If we timed out and still haven't caught up with the acks
327  if (timedOut && waitFor >= _getHybridLowestUnpersisted() &&
328  _getHybridLowestUnpersisted() != unset)
329  {
330  throw TimedOutException("Timed out waiting to flush publish store.");
331  }
332  }
333  else
334  {
335  while (waitFor >= _getHybridLowestUnpersisted() &&
336  _getHybridLowestUnpersisted() != unset)
337  {
338  // Use timeout version so python can interrupt
339  _lock.wait(1000);
340  Unlock<Mutex> unlck(_lock);
341  amps_invoke_waiting_function();
342  }
343  }
344  }
345 
346  bool replaySingle(StoreReplayer& replayer_, amps_uint64_t index_)
347  {
348  amps_uint64_t lowestIndexInMemory;
349  {
350  Lock<Mutex> guard(_lock);
351  lowestIndexInMemory = _lowestIndexInMemory;
352  }
353  if (index_ < lowestIndexInMemory)
354  {
355  return _fileStore.replaySingle(replayer_, index_);
356  }
357  else
358  {
359  return _memStore.replaySingle(replayer_, index_);
360  }
361  }
362 
366  amps_uint64_t store(const Message& message_)
367  {
368  Lock<Mutex> guard(_lock);
369  while (_holdSwapping)
370  {
371  if (!_lock.wait(1000))
372  {
373  Unlock<Mutex> u(_lock);
374  amps_invoke_waiting_function();
375  }
376  }
377  if (_memStore.unpersistedCount() >= _cap && !_holdSwapping)
378  {
379  // Set _holdSwapping true to end of function
380  FlagFlip flip(&_holdSwapping);
381  SwappingOutReplayer swapper(&_fileStore,
382  _memStore.unpersistedCount() - _lowWatermark);
383  {
384  Unlock<Mutex> u(_lock);
385  _memStore.replay(swapper);
386  }
387  _lock.signalAll();
388  if (swapper.getErrors() == 0)
389  {
390  _lowestIndexInMemory = swapper.lastIndex();
391  _memStore.discardUpTo(_lowestIndexInMemory++);
392  }
393  }
394  return _memStore.store(message_);
395  }
396 
397  void setResizeHandler(PublishStoreResizeHandler handler_, void* data_)
398  {
399  _handlerData.init(handler_, data_);
400  _fileStore.setResizeHandler(HybridPublishStore::resizeHandler,
401  (void*)&_handlerData);
402  }
403 
404  inline virtual PublishStoreResizeHandler getResizeHandler() const
405  {
406  return _handlerData._handler;
407  }
408 
409  amps_uint64_t getLowestUnpersisted() const
410  {
411  Lock<Mutex> guard(_lock);
412  return _getHybridLowestUnpersisted();
413  }
414 
415  amps_uint64_t getHighestUnpersisted() const
416  {
417  Lock<Mutex> guard(_lock);
418  return _getHybridHighestUnpersisted();
419  }
420 
421  amps_uint64_t getLastPersisted(void)
422  {
423  Lock<Mutex> guard(_lock);
424  return _getHybridLastPersisted();
425  }
426 
427  inline virtual void setErrorOnPublishGap(bool errorOnPublishGap_)
428  {
429  StoreImpl::setErrorOnPublishGap(errorOnPublishGap_);
430  _memStore.setErrorOnPublishGap(errorOnPublishGap_);
431  _fileStore.setErrorOnPublishGap(errorOnPublishGap_);
432  }
433 
434  private:
435 
436  // Resize handlers are invoked with Store not const Store&
437  static bool resizeHandler(Store store_, size_t size_, void* data_) // -V813
438  {
439  HandlerData* handlerData = (HandlerData*)data_;
440  //Unlock<Mutex> hybridUnlock(handlerData->_store->_lock);
441  return handlerData->_handler(store_, size_, handlerData->_data);
442  }
443 
444  // Lock should be held
445  amps_uint64_t _getHybridLowestUnpersisted() const
446  {
447  amps_uint64_t filemin = _fileStore.getLowestUnpersisted();
448  amps_uint64_t memmin = _memStore.getLowestUnpersisted();
449  if (filemin == AMPS_UNSET_SEQUENCE)
450  {
451  return memmin;
452  }
453  if (memmin == AMPS_UNSET_SEQUENCE || memmin > filemin)
454  {
455  return filemin;
456  }
457  // Only left with memmin <= filemin
458  return memmin;
459  }
460 
461  // Lock should be held
462  amps_uint64_t _getHybridHighestUnpersisted() const
463  {
464  amps_uint64_t filemax = _fileStore.getHighestUnpersisted();
465  amps_uint64_t memmax = _memStore.getHighestUnpersisted();
466  if (filemax == AMPS_UNSET_SEQUENCE)
467  {
468  return memmax;
469  }
470  if (memmax == AMPS_UNSET_SEQUENCE || memmax < filemax)
471  {
472  return filemax;
473  }
474  // Only left with memmax >= filemax
475  return memmax;
476  }
477 
478  amps_uint64_t _getHybridLastPersisted()
479  {
480  // If we've never swapped and nothing is in file
481  if (!_lowestIndexInMemory &&
482  _fileStore.unpersistedCount() == 0)
483  {
484  _fileStore.discardUpTo(_memStore.getLastPersisted());
485  return _fileStore.getLastPersisted();
486  }
487  amps_uint64_t memLast = _memStore.getLastPersisted();
488  amps_uint64_t fileLast = _fileStore.getLastPersisted();
489  return (memLast < fileLast) ? memLast : fileLast;
490  }
491 
492  MemoryPublishStore _memStore;
493  PublishStore _fileStore;
494  size_t _cap;
495  size_t _lowWatermark;
496  amps_uint64_t _lowestIndexInMemory;
497  mutable Mutex _lock;
498  HandlerData _handlerData;
499 #if __cplusplus >= 201100L || _MSC_VER >= 1900
500  std::atomic<bool> _holdSwapping;
501 #else
502  volatile bool _holdSwapping;
503 #endif
504 
505  };//end HybridPublishStore
506 
507 }//end namespace AMPS
508 
509 #endif //_HYBRIDPUBLISHSTORE_H_
510 
Abstract base class for storing published messages for an HA publisher client.
Definition: ampsplusplus.hpp:1097
HybridPublishStore(const char *fileName_, size_t maxMemoryCapacity_, bool errorOnPublishGap_=false)
Create a HybridPublishStore that will use fileName_ as its file storage and stores at most maxMemoryC...
Definition: HybridPublishStore.hpp:131
Abstract base class for replaying a publish message.
Definition: ampsplusplus.hpp:1069
AMPSDLL amps_uint64_t amps_message_get_field_uint64(amps_handle message, FieldId field)
Gets the unsigned 64-bit int value of a header field in an AMPS message.
Message encapsulates a single message sent to or received from an AMPS server, and provides methods f...
Definition: Message.hpp:539
HybridPublishStore(const std::string &fileName_, size_t maxMemoryCapacity_, bool errorOnPublishGap_=false)
Create a HybridPublishStore that will use fileName_ as its file storage and stores at most maxMemoryC...
Definition: HybridPublishStore.hpp:156
Provides AMPS::PublishStore, a publish store that uses memory-mapped files to provide a publish store...
void replay(StoreReplayer &replayer_)
Used internally by Client to replay messages in the store to AMPS after a successful connection...
Definition: HybridPublishStore.hpp:263
amps_uint64_t store(const Message &message_)
Used internally by Client to put messages into the Store.
Definition: HybridPublishStore.hpp:366
HybridPublishStore(const std::string &fileName_, size_t maxMemoryCapacity_, amps_uint32_t blockSize_, bool errorOnPublishGap_=false)
Create a HybridPublishStore that will use fileName_ as its file storage and stores at most maxMemoryC...
Definition: HybridPublishStore.hpp:185
virtual void flush(long timeout_)
Method to wait for the Store to discard everything that has been stored up to the point in time when ...
Definition: HybridPublishStore.hpp:298
size_t unpersistedCount() const
The number of messages in the Store that have not been discarded.
Definition: HybridPublishStore.hpp:285
A StoreImpl implementation that uses a memory-mapped file to provide a publish store that persists ac...
Definition: PublishStore.hpp:46
amps_uint64_t getLowestUnpersisted() const
Get the oldest unpersisted message sequence in the store.
Definition: HybridPublishStore.hpp:409
Core type, function, and class declarations for the AMPS C++ client.
size_t getLowWatermark()
Get how many messags remain in memory after messages get offlined.
Definition: HybridPublishStore.hpp:218
void setLowWatermark(size_t lowWatermark_)
Set how many messags remain in memory after messages get offlined.
Definition: HybridPublishStore.hpp:206
bool(* PublishStoreResizeHandler)(Store store_, size_t size_, void *userData_)
Function type for PublishStore resize events The store_ param is store which is resizing.
Definition: ampsplusplus.hpp:1091
Provides AMPS::MemoryPublishStore, a publish store that holds messages in memory. ...
void setResizeHandler(PublishStoreResizeHandler handler_, void *data_)
Set a handler to be called if the Store needs to resize in order to keep storing messages.
Definition: HybridPublishStore.hpp:397
void discardUpTo(amps_uint64_t index_)
Discard all messages in the store up to and including index_.
Definition: HybridPublishStore.hpp:227
Handle class for StoreImpl classes that track publish messages.
Definition: ampsplusplus.hpp:1223
amps_uint64_t getLastPersisted(void)
Get the last persisted sequence number.
Definition: HybridPublishStore.hpp:421
virtual amps_uint64_t store(const Message &message_)
Store a given message that will be delivered to AMPS.
Definition: BlockPublishStore.hpp:251
bool replaySingle(StoreReplayer &replayer_, amps_uint64_t index_)
Called by Client to get a single message replayed by the store onto the StoreReplayer.
Definition: HybridPublishStore.hpp:346
Definition: ampsplusplus.hpp:103
static amps_uint64_t getUnsetSequence()
Method to return the value used to represent no such sequence.
Definition: ampsplusplus.hpp:1167
A StoreImpl implementation that uses MemoryStoreBuffer as its buffer to hold published messages in me...
Definition: MemoryPublishStore.hpp:44
An implementation of StoreImpl for publication.
Definition: HybridPublishStore.hpp:50