26 #ifndef _BLOCKSTORE_H_ 27 #define _BLOCKSTORE_H_ 33 #include <amps/ampscrc.hpp> 34 #if __cplusplus >= 201103L || _MSC_VER >= 1900 40 #include <sys/timeb.h> 68 DEFAULT_BLOCK_HEADER_SIZE = 32,
69 DEFAULT_BLOCKS_PER_REALLOC = 1000,
70 DEFAULT_BLOCK_SIZE = 2048
81 amps_uint64_t _sequence;
88 Block(
size_t offset_) : _offset(offset_), _sequence(0)
89 , _nextInChain(0), _nextInList(0)
94 Block() : _offset(0), _sequence(0)
95 , _nextInChain(0), _nextInList((
Block*)(
this + 1))
99 Block* init(
size_t index_, amps_uint32_t blockSize_)
101 _offset = index_ * blockSize_;
106 Block* setOffset(
size_t offset_)
116 typedef Lock<Mutex> BufferLock;
117 typedef Unlock<Mutex> BufferUnlock;
118 typedef bool (*ResizeHandler)(size_t,
void*);
119 typedef std::vector<Block*> BlockList;
138 amps_uint32_t blocksPerRealloc_ = DEFAULT_BLOCKS_PER_REALLOC,
139 amps_uint32_t blockHeaderSize_ = DEFAULT_BLOCK_HEADER_SIZE,
140 amps_uint32_t blockSize_ = DEFAULT_BLOCK_SIZE)
141 : _buffer(buffer_), _freeList(0), _usedList(0)
142 , _endOfUsedList(0), _blocksPerRealloc(blocksPerRealloc_)
143 , _blockSize(blockSize_), _blockHeaderSize(blockHeaderSize_)
144 , _blocksAvailable(0), _resizeHandler(0), _resizeUserData(0)
153 for (BlockList::iterator i = _blockList.begin();
154 i != _blockList.end(); ++i)
172 return _blockHeaderSize;
211 return _lock.wait(timeout_);
225 _resizeHandler = resizeHandler_;
226 _resizeUserData = userData_;
245 return _endOfUsedList;
255 _blocksAvailable = freeCount_;
271 _endOfUsedList = block_;
280 _blockList.push_back(blockArray_);
288 Block*
get(amps_uint32_t numBlocksInChain_)
293 while (_blocksAvailable < numBlocksInChain_)
296 unsigned int blocksNeeded = numBlocksInChain_ - _blocksAvailable;
297 amps_uint32_t addedBlocks = (blocksNeeded / _blocksPerRealloc + 1)
299 size_t size = _buffer->getSize() + (addedBlocks * _blockSize);
306 for (
unsigned int i = 0; i < numBlocksInChain_; ++i)
310 _freeList = _freeList->_nextInList;
311 next->_nextInList = 0;
321 last->_nextInChain = next;
333 _endOfUsedList->_nextInList = first;
335 _endOfUsedList = first;
336 _blocksAvailable -= numBlocksInChain_;
347 assert(_endOfUsedList);
349 if (_usedList == block_)
352 _usedList = _usedList->_nextInList;
361 Block* used = _usedList;
364 if (used->_nextInList == block_)
366 used->_nextInList = block_->_nextInList;
369 used = used->_nextInList;
377 _flattenToFreeList(block_);
384 AMPS_ATOMIC_BASE_TYPE
put(amps_uint64_t sequence_)
387 assert(_endOfUsedList);
388 Block* used = _usedList;
389 AMPS_ATOMIC_BASE_TYPE removalCount = 0;
390 while (used && used->_sequence <= sequence_)
392 Block* next = used->_nextInList;
394 _flattenToFreeList(used);
413 Block* newEndOfUsedList = 0;
414 for (
Block* used = _usedList; used; used = used->_nextInList)
418 if (newEndOfUsedList)
420 newEndOfUsedList->_nextInList = 0;
426 _endOfUsedList = newEndOfUsedList;
428 newEndOfUsedList = used;
432 for (
Block* block = block_; block; block = next)
434 next = block->_nextInList;
435 _flattenToFreeList(block);
444 size_t startSize = _buffer->getSize();
448 startSize = _buffer->getSize();
451 amps_uint32_t numBlocks = (amps_uint32_t)(startSize) /
getBlockSize();
452 _freeList =
new Block[numBlocks];
453 _blockList.push_back(_freeList);
454 for (
size_t i = 0; i < numBlocks; ++i)
458 _freeList[numBlocks - 1]._nextInList = 0;
459 _blocksAvailable += numBlocks;
461 assert(_blocksAvailable);
468 return _blocksPerRealloc * _blockSize;
475 return _blocksPerRealloc;
491 if (_buffer->getSize() >= size_)
495 if (!_lock.wait(1000))
497 amps_invoke_waiting_function();
500 FlagFlip flip(&_resizing);
501 bool okToResize =
false;
504 BufferUnlock u(_lock);
506 okToResize = _canResize(size_);
515 size_t oldSize = _buffer->getSize();
516 amps_uint32_t oldBlocks = (amps_uint32_t)(oldSize /
getBlockSize());
517 if (oldSize >= size_)
522 _buffer->setSize(size_);
523 _buffer->zero(oldSize, size_ - oldSize);
525 *pNewBlocks_ = (amps_uint32_t)((size_ - oldSize) /
getBlockSize());
526 freeList =
new Block[*pNewBlocks_];
527 for (
size_t i = 0; i < *pNewBlocks_; ++i)
531 freeList[*pNewBlocks_ - 1]._nextInList = 0;
534 catch (
const std::bad_alloc&)
536 catch (
const std::bad_alloc& e)
539 std::ostringstream os;
540 os <<
"BlockStore failed to allocate " << size_
541 <<
" bytes for resize of store from " << _buffer->getSize()
543 throw StoreException(os.str());
554 amps_uint32_t newBlocks = 0;
556 if (!addedBlockList || !newBlocks)
561 _blockList.push_back(addedBlockList);
562 addedBlockList[newBlocks - 1]._nextInList = _freeList;
563 _freeList = addedBlockList;
564 _blocksAvailable += newBlocks;
565 assert(_blocksAvailable);
573 if (_usedList || _freeList)
577 amps_uint32_t oldSize = _blockSize;
578 _blockSize = blockSize_;
588 if (_usedList || _freeList)
592 amps_uint32_t oldSize = _blockHeaderSize;
593 _blockHeaderSize = blockHeaderSize_;
607 bool _canResize(
size_t requestedSize_)
611 return _resizeHandler(requestedSize_, _resizeUserData);
620 void _flattenToFreeList(
Block* block_)
623 Block* current = block_;
626 Block* chain = current->_nextInChain;
628 _buffer->zero(current->_offset, _blockHeaderSize);
630 current->_nextInList = _freeList;
633 current->_sequence = (amps_uint64_t)0;
634 current->_nextInChain = 0;
638 assert(_blocksAvailable);
648 Block* _endOfUsedList;
650 amps_uint32_t _blocksPerRealloc;
652 amps_uint32_t _blockSize;
653 amps_uint32_t _blockHeaderSize;
655 amps_uint32_t _blocksAvailable;
657 ResizeHandler _resizeHandler;
659 void* _resizeUserData;
661 BlockList _blockList;
663 #if __cplusplus >= 201103L || _MSC_VER >= 1900 664 std::atomic<bool> _resizing;
666 volatile bool _resizing;
void wait()
Wait for a signal.
Definition: BlockStore.hpp:200
void setResizeHandler(ResizeHandler resizeHandler_, void *userData_)
Set a resize handler that is called with the new total size of the Buffer.
Definition: BlockStore.hpp:223
amps_uint32_t setBlockSize(amps_uint32_t blockSize_)
Set the size to use for all Blocks.
Definition: BlockStore.hpp:571
void acquireRead() const
Acquire the lock for this object.
Definition: BlockStore.hpp:179
Constants
Default constant values for BlockStore.
Definition: BlockStore.hpp:66
Buffer * getBuffer()
Return the buffer underlying the store for direct write/read.
Definition: BlockStore.hpp:600
void put(Block *block_)
Return the given chain of Blocks to the free list for reuse.
Definition: BlockStore.hpp:344
~BlockStore()
Destructor that cleans up the buffer and other associated memory.
Definition: BlockStore.hpp:151
void releaseRead() const
Release the lock for this object. Used by RAII templates.
Definition: BlockStore.hpp:186
Block * front() const
Get the first used block in the store.
Definition: BlockStore.hpp:233
amps_uint32_t getDefaultResizeBlocks() const
Return the default number of blocks for each resize.
Definition: BlockStore.hpp:473
void addBlocks(Block *blockArray_)
Allow users to create Block arrays during recovery that are tracked for cleanup here with all other Block arrays.
Definition: BlockStore.hpp:278
bool wait(long timeout_)
Wait timeout_ ms for a signal.
Definition: BlockStore.hpp:209
Block * back() const
Get the last used block in the store.
Definition: BlockStore.hpp:243
void resize(size_t size_)
Resize the buffer to the requested size, adding all new space as unused Blocks for the free list.
Definition: BlockStore.hpp:552
Used as a base class for other stores in the AMPS C++ client, this is an implementation that breaks a...
Definition: BlockStore.hpp:61
void init()
Initialize, assuming that _buffer has no existing information.
Definition: BlockStore.hpp:442
void setEndOfUsedList(Block *block_)
Allow containing classes to set the end of the used list in recovery.
Definition: BlockStore.hpp:269
size_t getDefaultResizeSize() const
Return the default number of bytes for each resize.
Definition: BlockStore.hpp:466
void setUsedList(Block *block_)
Allow containing classes to initialize the used list in recovery.
Definition: BlockStore.hpp:261
Core type, function, and class declarations for the AMPS C++ client.
Provides AMPS::Buffer, an abstract base class used by the store implementations in the AMPS client...
amps_uint32_t setBlockHeaderSize(amps_uint32_t blockHeaderSize_)
Set the size to use for the header for all Blocks.
Definition: BlockStore.hpp:586
void setFreeList(Block *block_, amps_uint32_t freeCount_)
Allow containing classes to initialize the free list in recovery.
Definition: BlockStore.hpp:252
amps_uint32_t getBlockHeaderSize() const
Get the size of a header within each Block, as set in the constructor.
Definition: BlockStore.hpp:170
Abstract base class for implementing a buffer to be used by a StoreImpl for storage of publish messag...
Definition: Buffer.hpp:40
BlockStore(Buffer *buffer_, amps_uint32_t blocksPerRealloc_=DEFAULT_BLOCKS_PER_REALLOC, amps_uint32_t blockHeaderSize_=DEFAULT_BLOCK_HEADER_SIZE, amps_uint32_t blockSize_=DEFAULT_BLOCK_SIZE)
Create a BlockStore using buffer_ and default block size, that grows by blocksPerRealloc_ blocks when...
Definition: BlockStore.hpp:137
void putAll(Block *block_)
Return all Blocks starting with the given Block to the free list.
Definition: BlockStore.hpp:410
Used as metadata for each block in a Buffer.
Definition: BlockStore.hpp:75
Block * resizeBuffer(size_t size_, amps_uint32_t *pNewBlocks_)
Resize the buffer to the requested size, returning all new space.
Definition: BlockStore.hpp:486
void signalAll()
Signal lock waiters.
Definition: BlockStore.hpp:193
amps_uint32_t getBlockSize() const
Get the size of each Block, as set in the constructor.
Definition: BlockStore.hpp:163
AMPS_ATOMIC_BASE_TYPE put(amps_uint64_t sequence_)
Return all Blocks with sequence <= sequence_ for reuse.
Definition: BlockStore.hpp:384
Definition: ampsplusplus.hpp:106