26 #ifndef _HYBRIDPUBLISHSTORE_H_
27 #define _HYBRIDPUBLISHSTORE_H_
57 : _store(NULL), _handler(NULL), _data(NULL)
70 volatile size_t _entries, _errorCount;
71 volatile amps_uint64_t _lastIndex;
73 SwappingOutReplayer(
PublishStore* pStore_,
size_t entries_)
74 : _pStore(pStore_), _entries(entries_)
75 , _errorCount(0), _lastIndex(0)
83 amps_uint64_t lastIndex()
90 if (_entries > 0 && _errorCount == 0)
95 _pStore->
store(message_,
false);
98 message_.getMessage(),
120 , _memStore(maxMemoryCapacity_)
121 , _fileStore(fileName_)
122 , _cap(maxMemoryCapacity_)
123 , _lowWatermark((size_t)((double)maxMemoryCapacity_ * 0.5))
124 , _lowestIndexInMemory(0)
125 , _holdSwapping(false)
127 _handlerData._store =
this;
141 , _memStore(maxMemoryCapacity_)
142 , _fileStore(fileName_)
143 , _cap(maxMemoryCapacity_)
144 , _lowWatermark((size_t)((double)maxMemoryCapacity_ * 0.5))
145 , _lowestIndexInMemory(0)
146 , _holdSwapping(false)
148 _handlerData._store =
this;
161 Lock<Mutex> guard(_lock);
162 _lowWatermark = lowWatermark_;
173 Lock<Mutex> guard(_lock);
174 return _lowWatermark;
182 Lock<Mutex> guard(_lock);
183 while (_holdSwapping)
185 if (!_lock.wait(1000))
187 Unlock<Mutex> u(_lock);
188 amps_invoke_waiting_function();
192 FlagFlip flip(&_holdSwapping);
194 Unlock<Mutex> u(_lock);
198 Lock<Mutex> l(_lock);
203 if (_lowestIndexInMemory <= index_)
206 _lowestIndexInMemory = index_ + 1;
218 Lock<Mutex> guard(_lock);
219 while (_holdSwapping)
221 if (!_lock.wait(1000))
223 amps_invoke_waiting_function();
227 FlagFlip flip(&_holdSwapping);
229 Unlock<Mutex> u(_lock);
230 _fileStore.
replay(replayer_);
231 _memStore.
replay(replayer_);
253 Lock<Mutex> guard(_lock);
254 amps_uint64_t waitFor = _getHybridHighestUnpersisted();
257 if (waitFor == unset)
263 bool timedOut =
false;
264 long waitTime = (timeout_ < 1000) ? timeout_ : 1000;
265 AMPS_START_TIMER(timeout_)
267 while (!timedOut && waitFor >= _getHybridLowestUnpersisted() &&
268 _getHybridLowestUnpersisted() != unset)
270 if (!_lock.wait(waitTime))
273 AMPS_RESET_TIMER(timedOut, timeout_);
274 waitTime = (timeout_ < 1000) ? timeout_ : 1000;
275 Unlock<Mutex> unlck(_lock);
276 amps_invoke_waiting_function();
280 if (timedOut && waitFor >= _getHybridLowestUnpersisted() &&
281 _getHybridLowestUnpersisted() != unset)
283 throw TimedOutException(
"Timed out waiting to flush publish store.");
288 while (waitFor >= _getHybridLowestUnpersisted() &&
289 _getHybridLowestUnpersisted() != unset)
293 Unlock<Mutex> unlck(_lock);
294 amps_invoke_waiting_function();
301 amps_uint64_t lowestIndexInMemory;
303 Lock<Mutex> guard(_lock);
304 lowestIndexInMemory = _lowestIndexInMemory;
306 if (index_ < lowestIndexInMemory)
321 Lock<Mutex> guard(_lock);
323 while (_holdSwapping)
325 if (!_lock.wait(1000))
327 Unlock<Mutex> u(_lock);
328 amps_invoke_waiting_function();
334 FlagFlip flip(&_holdSwapping);
335 SwappingOutReplayer swapper(&_fileStore,
338 Unlock<Mutex> u(_lock);
339 _memStore.
replay(swapper);
342 if (swapper.getErrors() == 0)
344 _lowestIndexInMemory = swapper.lastIndex();
348 return _memStore.
store(message_);
353 _handlerData.init(handler_, data_);
355 (
void*)&_handlerData);
360 return _handlerData._handler;
365 Lock<Mutex> guard(_lock);
366 return _getHybridLowestUnpersisted();
369 amps_uint64_t getHighestUnpersisted()
const
371 Lock<Mutex> guard(_lock);
372 return _getHybridHighestUnpersisted();
377 Lock<Mutex> guard(_lock);
378 return _getHybridLastPersisted();
384 static bool resizeHandler(
Store store_,
size_t size_,
void* data_)
386 HandlerData* handlerData = (HandlerData*)data_;
388 return handlerData->_handler(store_, size_, handlerData->_data);
392 amps_uint64_t _getHybridLowestUnpersisted()
const
396 if (filemin == AMPS_UNSET_SEQUENCE)
400 if (memmin == AMPS_UNSET_SEQUENCE || memmin > filemin)
409 amps_uint64_t _getHybridHighestUnpersisted()
const
411 amps_uint64_t filemax = _fileStore.getHighestUnpersisted();
412 amps_uint64_t memmax = _memStore.getHighestUnpersisted();
413 if (filemax == AMPS_UNSET_SEQUENCE)
417 if (memmax == AMPS_UNSET_SEQUENCE || memmax < filemax)
425 amps_uint64_t _getHybridLastPersisted()
428 if (!_lowestIndexInMemory &&
436 return (memLast < fileLast) ? memLast : fileLast;
439 MemoryPublishStore _memStore;
440 PublishStore _fileStore;
442 size_t _lowWatermark;
443 amps_uint64_t _lowestIndexInMemory;
445 HandlerData _handlerData;
446 volatile bool _holdSwapping;
Provides AMPS::MemoryPublishStore, a publish store that holds messages in memory.
Provides AMPS::PublishStore, a publish store that uses memory-mapped files to provide a publish store...
AMPSDLL amps_uint64_t amps_message_get_field_uint64(amps_handle message, FieldId field)
Gets the unsigned 64-bit int value of a header field in an AMPS message.
Core type, function, and class declarations for the AMPS C++ client.
bool(* PublishStoreResizeHandler)(Store store_, size_t size_, void *userData_)
Function type for PublishStore resize events. The store_ param is the store which is resizing.
Definition: ampsplusplus.hpp:921
virtual void discardUpTo(amps_uint64_t index_)
Remove all messages with an index up to and including index_.
Definition: BlockPublishStore.hpp:492
void replay(StoreReplayer &replayer_)
Replay all messages in the Store onto the given StoreReplayer.
Definition: BlockPublishStore.hpp:547
amps_uint64_t getLowestUnpersisted() const
Get the oldest unpersisted message sequence in the store.
Definition: BlockPublishStore.hpp:685
size_t unpersistedCount() const
Method to return the count of messages that are currently in the Store because they have not been discard...
Definition: BlockPublishStore.hpp:628
bool replaySingle(StoreReplayer &replayer_, amps_uint64_t index_)
Replay one message in the Store onto the given StoreReplayer.
Definition: BlockPublishStore.hpp:579
amps_uint64_t getLastPersisted(void)
Get the last persisted sequence number.
Definition: BlockPublishStore.hpp:697
virtual amps_uint64_t store(const Message &message_)
Store a given message that will be delivered to AMPS.
Definition: BlockPublishStore.hpp:247
An implementation of StoreImpl for publication.
Definition: HybridPublishStore.hpp:48
size_t unpersistedCount() const
The number of messages in the Store that have not been discarded.
Definition: HybridPublishStore.hpp:238
virtual void flush(long timeout_)
Method to wait for the Store to discard everything that has been stored up to the point in time when ...
Definition: HybridPublishStore.hpp:251
void setResizeHandler(PublishStoreResizeHandler handler_, void *data_)
Set a handler to be called if the Store needs to resize in order to keep storing messages.
Definition: HybridPublishStore.hpp:351
void setLowWatermark(size_t lowWatermark_)
Set how many messages remain in memory after messages get offlined.
Definition: HybridPublishStore.hpp:159
amps_uint64_t store(const Message &message_)
Used internally by Client to put messages into the Store.
Definition: HybridPublishStore.hpp:319
amps_uint64_t getLastPersisted(void)
Get the last persisted sequence number.
Definition: HybridPublishStore.hpp:375
HybridPublishStore(const char *fileName_, size_t maxMemoryCapacity_)
Create a HybridPublishStore that will use fileName_ as its file storage and stores at most maxMemoryC...
Definition: HybridPublishStore.hpp:118
size_t getLowWatermark()
Get how many messages remain in memory after messages get offlined.
Definition: HybridPublishStore.hpp:171
HybridPublishStore(const std::string &fileName_, size_t maxMemoryCapacity_)
Create a HybridPublishStore that will use fileName_ as its file storage and stores at most maxMemoryC...
Definition: HybridPublishStore.hpp:139
void replay(StoreReplayer &replayer_)
Used internally by Client to replay messages in the store to AMPS after a successful connection.
Definition: HybridPublishStore.hpp:216
void discardUpTo(amps_uint64_t index_)
Discard all messages in the store up to and including index_.
Definition: HybridPublishStore.hpp:180
amps_uint64_t getLowestUnpersisted() const
Get the oldest unpersisted message sequence in the store.
Definition: HybridPublishStore.hpp:363
bool replaySingle(StoreReplayer &replayer_, amps_uint64_t index_)
Called by Client to get a single message replayed by the store onto the StoreReplayer.
Definition: HybridPublishStore.hpp:299
Message encapsulates a single message sent to or received from an AMPS server, and provides methods f...
Definition: Message.hpp:512
A StoreImpl implementation that uses a memory-mapped file to provide a publish store that persists ac...
Definition: PublishStore.hpp:47
Abstract base class for storing published messages for an HA publisher client.
Definition: ampsplusplus.hpp:928
static amps_uint64_t getUnsetSequence()
Method to return the value used to represent no such sequence.
Definition: ampsplusplus.hpp:989
virtual void setResizeHandler(PublishStoreResizeHandler handler_, void *userData_)
Set a handler to be called if the Store needs to resize in order to keep storing messages.
Definition: ampsplusplus.hpp:1013
Abstract base class for replaying a publish message.
Definition: ampsplusplus.hpp:900
Handle class for StoreImpl classes that track publish messages.
Definition: ampsplusplus.hpp:1035