26 #ifndef _HYBRIDPUBLISHSTORE_H_ 27 #define _HYBRIDPUBLISHSTORE_H_ 32 #if __cplusplus >= 201100L || _MSC_VER >= 1900 60 : _store(NULL), _handler(NULL), _data(NULL)
73 #if __cplusplus >= 201100L || _MSC_VER >= 1900 74 std::atomic<size_t> _entries;
75 std::atomic<size_t> _errorCount;
76 std::atomic<amps_uint64_t> _lastIndex;
78 volatile size_t _entries;
79 volatile size_t _errorCount;
80 volatile amps_uint64_t _lastIndex;
83 SwappingOutReplayer(
PublishStore* pStore_,
size_t entries_)
84 : _pStore(pStore_), _entries(entries_)
85 , _errorCount(0), _lastIndex(0)
93 amps_uint64_t lastIndex()
100 if (_entries > 0 && _errorCount == 0)
105 _pStore->
store(message_,
false);
108 message_.getMessage(),
132 bool errorOnPublishGap_ =
false)
134 , _memStore(maxMemoryCapacity_, errorOnPublishGap_)
135 , _fileStore(fileName_, errorOnPublishGap_)
136 , _cap(maxMemoryCapacity_)
137 , _lowWatermark((size_t)((double)maxMemoryCapacity_ * 0.5))
138 , _lowestIndexInMemory(0)
139 , _holdSwapping(false)
141 _handlerData._store =
this;
157 bool errorOnPublishGap_ =
false)
159 , _memStore(maxMemoryCapacity_, errorOnPublishGap_)
160 , _fileStore(fileName_, errorOnPublishGap_)
161 , _cap(maxMemoryCapacity_)
162 , _lowWatermark((size_t)((double)maxMemoryCapacity_ * 0.5))
163 , _lowestIndexInMemory(0)
164 , _holdSwapping(false)
166 _handlerData._store =
this;
186 amps_uint32_t blockSize_,
bool errorOnPublishGap_ =
false)
188 , _memStore(maxMemoryCapacity_, blockSize_, errorOnPublishGap_)
189 , _fileStore(fileName_, blockSize_, errorOnPublishGap_)
190 , _cap(maxMemoryCapacity_)
191 , _lowWatermark((size_t)((double)maxMemoryCapacity_ * 0.5))
192 , _lowestIndexInMemory(0)
193 , _holdSwapping(false)
195 _handlerData._store =
this;
208 Lock<Mutex> guard(_lock);
209 _lowWatermark = lowWatermark_;
220 Lock<Mutex> guard(_lock);
221 return _lowWatermark;
229 Lock<Mutex> guard(_lock);
230 while (_holdSwapping)
232 if (!_lock.wait(1000))
234 Unlock<Mutex> u(_lock);
235 amps_invoke_waiting_function();
239 FlagFlip flip(&_holdSwapping);
241 Unlock<Mutex> u(_lock);
244 _memStore.discardUpTo(_fileStore.getLastPersisted());
245 Lock<Mutex> l(_lock);
249 _fileStore.discardUpTo(index_);
250 if (_lowestIndexInMemory <= index_)
252 _memStore.discardUpTo(index_);
253 _lowestIndexInMemory = index_ + 1;
265 Lock<Mutex> guard(_lock);
266 while (_holdSwapping)
268 if (!_lock.wait(1000))
270 amps_invoke_waiting_function();
274 FlagFlip flip(&_holdSwapping);
276 Unlock<Mutex> u(_lock);
277 _fileStore.replay(replayer_);
278 _memStore.replay(replayer_);
287 return _fileStore.unpersistedCount() + _memStore.unpersistedCount();
300 Lock<Mutex> guard(_lock);
301 amps_uint64_t waitFor = _getHybridHighestUnpersisted();
304 if (waitFor == unset)
310 bool timedOut =
false;
311 long waitTime = (timeout_ < 1000) ? timeout_ : 1000;
312 AMPS_START_TIMER(timeout_)
314 while (!timedOut && waitFor >= _getHybridLowestUnpersisted() &&
315 _getHybridLowestUnpersisted() != unset)
317 if (!_lock.wait(waitTime))
320 AMPS_RESET_TIMER(timedOut, timeout_);
321 waitTime = (timeout_ < 1000) ? timeout_ : 1000;
322 Unlock<Mutex> unlck(_lock);
323 amps_invoke_waiting_function();
327 if (timedOut && waitFor >= _getHybridLowestUnpersisted() &&
328 _getHybridLowestUnpersisted() != unset)
330 throw TimedOutException(
"Timed out waiting to flush publish store.");
335 while (waitFor >= _getHybridLowestUnpersisted() &&
336 _getHybridLowestUnpersisted() != unset)
340 Unlock<Mutex> unlck(_lock);
341 amps_invoke_waiting_function();
348 amps_uint64_t lowestIndexInMemory;
350 Lock<Mutex> guard(_lock);
351 lowestIndexInMemory = _lowestIndexInMemory;
353 if (index_ < lowestIndexInMemory)
355 return _fileStore.replaySingle(replayer_, index_);
359 return _memStore.replaySingle(replayer_, index_);
368 Lock<Mutex> guard(_lock);
369 while (_holdSwapping)
371 if (!_lock.wait(1000))
373 Unlock<Mutex> u(_lock);
374 amps_invoke_waiting_function();
377 if (_memStore.unpersistedCount() >= _cap && !_holdSwapping)
380 FlagFlip flip(&_holdSwapping);
381 SwappingOutReplayer swapper(&_fileStore,
382 _memStore.unpersistedCount() - _lowWatermark);
384 Unlock<Mutex> u(_lock);
385 _memStore.replay(swapper);
388 if (swapper.getErrors() == 0)
390 _lowestIndexInMemory = swapper.lastIndex();
391 _memStore.discardUpTo(_lowestIndexInMemory++);
394 return _memStore.store(message_);
399 _handlerData.init(handler_, data_);
400 _fileStore.setResizeHandler(HybridPublishStore::resizeHandler,
401 (
void*)&_handlerData);
406 return _handlerData._handler;
411 Lock<Mutex> guard(_lock);
412 return _getHybridLowestUnpersisted();
415 amps_uint64_t getHighestUnpersisted()
const 417 Lock<Mutex> guard(_lock);
418 return _getHybridHighestUnpersisted();
423 Lock<Mutex> guard(_lock);
424 return _getHybridLastPersisted();
427 inline virtual void setErrorOnPublishGap(
bool errorOnPublishGap_)
429 StoreImpl::setErrorOnPublishGap(errorOnPublishGap_);
430 _memStore.setErrorOnPublishGap(errorOnPublishGap_);
431 _fileStore.setErrorOnPublishGap(errorOnPublishGap_);
437 static bool resizeHandler(
Store store_,
size_t size_,
void* data_)
439 HandlerData* handlerData = (HandlerData*)data_;
441 return handlerData->_handler(store_, size_, handlerData->_data);
445 amps_uint64_t _getHybridLowestUnpersisted()
const 447 amps_uint64_t filemin = _fileStore.getLowestUnpersisted();
448 amps_uint64_t memmin = _memStore.getLowestUnpersisted();
449 if (filemin == AMPS_UNSET_SEQUENCE)
453 if (memmin == AMPS_UNSET_SEQUENCE || memmin > filemin)
462 amps_uint64_t _getHybridHighestUnpersisted()
const 464 amps_uint64_t filemax = _fileStore.getHighestUnpersisted();
465 amps_uint64_t memmax = _memStore.getHighestUnpersisted();
466 if (filemax == AMPS_UNSET_SEQUENCE)
470 if (memmax == AMPS_UNSET_SEQUENCE || memmax < filemax)
478 amps_uint64_t _getHybridLastPersisted()
481 if (!_lowestIndexInMemory &&
482 _fileStore.unpersistedCount() == 0)
484 _fileStore.discardUpTo(_memStore.getLastPersisted());
485 return _fileStore.getLastPersisted();
487 amps_uint64_t memLast = _memStore.getLastPersisted();
488 amps_uint64_t fileLast = _fileStore.getLastPersisted();
489 return (memLast < fileLast) ? memLast : fileLast;
495 size_t _lowWatermark;
496 amps_uint64_t _lowestIndexInMemory;
498 HandlerData _handlerData;
499 #if __cplusplus >= 201100L || _MSC_VER >= 1900 500 std::atomic<bool> _holdSwapping;
502 volatile bool _holdSwapping;
509 #endif //_HYBRIDPUBLISHSTORE_H_ Abstract base class for storing published messages for an HA publisher client.
Definition: ampsplusplus.hpp:1097
HybridPublishStore(const char *fileName_, size_t maxMemoryCapacity_, bool errorOnPublishGap_=false)
Create a HybridPublishStore that will use fileName_ as its file storage and stores at most maxMemoryC...
Definition: HybridPublishStore.hpp:131
Abstract base class for replaying a publish message.
Definition: ampsplusplus.hpp:1069
AMPSDLL amps_uint64_t amps_message_get_field_uint64(amps_handle message, FieldId field)
Gets the unsigned 64-bit int value of a header field in an AMPS message.
Message encapsulates a single message sent to or received from an AMPS server, and provides methods f...
Definition: Message.hpp:539
HybridPublishStore(const std::string &fileName_, size_t maxMemoryCapacity_, bool errorOnPublishGap_=false)
Create a HybridPublishStore that will use fileName_ as its file storage and stores at most maxMemoryC...
Definition: HybridPublishStore.hpp:156
Provides AMPS::PublishStore, a publish store that uses memory-mapped files to provide a publish store...
void replay(StoreReplayer &replayer_)
Used internally by Client to replay messages in the store to AMPS after a successful connection...
Definition: HybridPublishStore.hpp:263
amps_uint64_t store(const Message &message_)
Used internally by Client to put messages into the Store.
Definition: HybridPublishStore.hpp:366
HybridPublishStore(const std::string &fileName_, size_t maxMemoryCapacity_, amps_uint32_t blockSize_, bool errorOnPublishGap_=false)
Create a HybridPublishStore that will use fileName_ as its file storage and stores at most maxMemoryC...
Definition: HybridPublishStore.hpp:185
virtual void flush(long timeout_)
Method to wait for the Store to discard everything that has been stored up to the point in time when ...
Definition: HybridPublishStore.hpp:298
size_t unpersistedCount() const
The number of messages in the Store that have not been discarded.
Definition: HybridPublishStore.hpp:285
A StoreImpl implementation that uses a memory-mapped file to provide a publish store that persists ac...
Definition: PublishStore.hpp:46
amps_uint64_t getLowestUnpersisted() const
Get the oldest unpersisted message sequence in the store.
Definition: HybridPublishStore.hpp:409
Core type, function, and class declarations for the AMPS C++ client.
size_t getLowWatermark()
Get how many messags remain in memory after messages get offlined.
Definition: HybridPublishStore.hpp:218
void setLowWatermark(size_t lowWatermark_)
Set how many messags remain in memory after messages get offlined.
Definition: HybridPublishStore.hpp:206
bool(* PublishStoreResizeHandler)(Store store_, size_t size_, void *userData_)
Function type for PublishStore resize events The store_ param is store which is resizing.
Definition: ampsplusplus.hpp:1091
Provides AMPS::MemoryPublishStore, a publish store that holds messages in memory. ...
void setResizeHandler(PublishStoreResizeHandler handler_, void *data_)
Set a handler to be called if the Store needs to resize in order to keep storing messages.
Definition: HybridPublishStore.hpp:397
void discardUpTo(amps_uint64_t index_)
Discard all messages in the store up to and including index_.
Definition: HybridPublishStore.hpp:227
Handle class for StoreImpl classes that track publish messages.
Definition: ampsplusplus.hpp:1223
amps_uint64_t getLastPersisted(void)
Get the last persisted sequence number.
Definition: HybridPublishStore.hpp:421
virtual amps_uint64_t store(const Message &message_)
Store a given message that will be delivered to AMPS.
Definition: BlockPublishStore.hpp:251
bool replaySingle(StoreReplayer &replayer_, amps_uint64_t index_)
Called by Client to get a single message replayed by the store onto the StoreReplayer.
Definition: HybridPublishStore.hpp:346
Definition: ampsplusplus.hpp:103
static amps_uint64_t getUnsetSequence()
Method to return the value used to represent no such sequence.
Definition: ampsplusplus.hpp:1167
A StoreImpl implementation that uses MemoryStoreBuffer as its buffer to hold published messages in me...
Definition: MemoryPublishStore.hpp:44
An implementation of StoreImpl for publication.
Definition: HybridPublishStore.hpp:50