AMPS C/C++ Client Class Reference
AMPS C/C++ Client Version 5.3.4.3
BlockStore.hpp
Go to the documentation of this file.
1 //
3 // Copyright (c) 2010-2024 60East Technologies Inc., All Rights Reserved.
4 //
5 // This computer software is owned by 60East Technologies Inc. and is
6 // protected by U.S. copyright laws and other laws and by international
7 // treaties. This computer software is furnished by 60East Technologies
8 // Inc. pursuant to a written license agreement and may be used, copied,
9 // transmitted, and stored only in accordance with the terms of such
10 // license agreement and with the inclusion of the above copyright notice.
11 // This computer software or any other copies thereof may not be provided
12 // or otherwise made available to any other person.
13 //
14 // U.S. Government Restricted Rights. This computer software: (a) was
15 // developed at private expense and is in all respects the proprietary
16 // information of 60East Technologies Inc.; (b) was not developed with
17 // government funds; (c) is a trade secret of 60East Technologies Inc.
18 // for all purposes of the Freedom of Information Act; and (d) is a
19 // commercial item and thus, pursuant to Section 12.212 of the Federal
20 // Acquisition Regulations (FAR) and DFAR Supplement Section 227.7202,
21 // Government's use, duplication or disclosure of the computer software
22 // is subject to the restrictions set forth by 60East Technologies Inc..
23 //
25 
26 #ifndef _BLOCKSTORE_H_
27 #define _BLOCKSTORE_H_
28 #include <amps/ampsplusplus.hpp>
29 #include <amps/Buffer.hpp>
30 #include <sstream>
31 #include <string>
32 #include <map>
33 #include <amps/ampscrc.hpp>
34 #if __cplusplus >= 201103L || _MSC_VER >= 1900
35  #include <atomic>
36 #endif
37 
38 #ifdef _WIN32
39  #include <intrin.h>
40  #include <sys/timeb.h>
41 #else
42  #include <sys/time.h>
43 #endif
44 #include <iostream>
45 
50 
51 namespace AMPS
52 {
58  class BlockStore
59  {
60  public:
63  enum Constants : amps_uint32_t
64  {
65  DEFAULT_BLOCK_HEADER_SIZE = 32,
66  DEFAULT_BLOCKS_PER_REALLOC = 1000,
67  DEFAULT_BLOCK_SIZE = 2048
68  };
69 
72  class Block
73  {
74  public:
75  // The offset of the Block's data in the buffer.
76  size_t _offset;
77  // The sequence number associated with the Block.
78  amps_uint64_t _sequence;
79  // The next Block in the chain when data is in multiple Blocks.
80  Block* _nextInChain;
81  // The next Block in list of available or free Blocks.
82  Block* _nextInList;
83 
84  // Create Block with given offset
85  Block(size_t offset_) : _offset(offset_), _sequence(0)
86  , _nextInChain(0), _nextInList(0)
87  { ; }
88 
89  // Create Block with _nextInList at an address one Block farther
90  // than self. Convenient for creating arrays of Blocks.
91  Block() : _offset(0), _sequence(0)
92  , _nextInChain(0), _nextInList((Block*)(this + 1))
93  { ; }
94 
95  // Init Block to an offset at index_ * blockSize_
96  Block* init(size_t index_, amps_uint32_t blockSize_)
97  {
98  _offset = index_ * blockSize_;
99  return this;
100  }
101 
102  // Set Block to given offset and return pointer to self
103  Block* setOffset(size_t offset_)
104  {
105  _offset = offset_;
106  return this;
107  }
108 
109  };
110 
111  private:
112  // Typedefs
113  typedef Lock<Mutex> BufferLock;
114  typedef Unlock<Mutex> BufferUnlock;
115  typedef bool (*ResizeHandler)(size_t, void*);
116  typedef std::vector<Block*> BlockList;
117 
118  public:
134  BlockStore(Buffer* buffer_,
135  amps_uint32_t blocksPerRealloc_ = DEFAULT_BLOCKS_PER_REALLOC,
136  amps_uint32_t blockHeaderSize_ = DEFAULT_BLOCK_HEADER_SIZE,
137  amps_uint32_t blockSize_ = DEFAULT_BLOCK_SIZE)
138  : _buffer(buffer_), _freeList(0), _usedList(0)
139  , _endOfUsedList(0), _blocksPerRealloc(blocksPerRealloc_)
140  , _blockSize(blockSize_), _blockHeaderSize(blockHeaderSize_)
141  , _blocksAvailable(0), _resizeHandler(0), _resizeUserData(0)
142  , _resizing(false)
143  {
144  }
145 
149  {
150  for (BlockList::iterator i = _blockList.begin();
151  i != _blockList.end(); ++i)
152  {
153  delete[] *i;
154  }
155  delete _buffer;
156  }
157 
160  amps_uint32_t getBlockSize() const
161  {
162  return _blockSize;
163  }
164 
167  amps_uint32_t getBlockHeaderSize() const
168  {
169  return _blockHeaderSize;
170  }
171 
176  void acquireRead() const
177  {
178  _lock.acquireRead();
179  }
180 
183  void releaseRead() const
184  {
185  _lock.releaseRead();
186  }
187 
190  void signalAll()
191  {
192  _lock.signalAll();
193  }
194 
197  void wait()
198  {
199  _lock.wait();
200  }
201 
206  bool wait(long timeout_)
207  {
208  return _lock.wait(timeout_);
209  }
210 
220  void setResizeHandler(ResizeHandler resizeHandler_, void* userData_)
221  {
222  _resizeHandler = resizeHandler_;
223  _resizeUserData = userData_;
224  }
225 
229  // Lock should already be acquired
230  Block* front() const
231  {
232  return _usedList;
233  }
234 
239  // Lock should already be acquired
240  Block* back() const
241  {
242  return _endOfUsedList;
243  }
244 
248  // Lock should already be acquired
249  void setFreeList(Block* block_, amps_uint32_t freeCount_)
250  {
251  _freeList = block_;
252  _blocksAvailable = freeCount_;
253  }
254 
257  // Lock should already be acquired
258  void setUsedList(Block* block_)
259  {
260  _usedList = block_;
261  }
262 
265  // Lock should already be acquired
266  void setEndOfUsedList(Block* block_)
267  {
268  _endOfUsedList = block_;
269  }
270 
274  // Lock should already be acquired
275  void addBlocks(Block* blockArray_)
276  {
277  _blockList.push_back(blockArray_);
278  }
279 
284  // Lock should already be acquired
285  Block* get(amps_uint32_t numBlocksInChain_)
286  {
287  // Check that we have enough blocks
288  // Do this in a loop since resize can possibly return without resizing
289  // and may still leave us needing more space.
290  while (_blocksAvailable < numBlocksInChain_)
291  {
292  // Resize by required multiple of blockPerRealloc
293  unsigned int blocksNeeded = numBlocksInChain_ - _blocksAvailable;
294  amps_uint32_t addedBlocks = (blocksNeeded / _blocksPerRealloc + 1)
295  * _blocksPerRealloc;
296  size_t size = _buffer->getSize() + (addedBlocks * _blockSize);
297  resize(size);
298  }
299  // Return first free block with others as _nextInChain
300  Block* first = 0;
301  Block* last = 0;
302  Block* next = 0;
303  for (unsigned int i = 0; i < numBlocksInChain_; ++i)
304  {
305  // Take from free list and advance
306  next = _freeList;
307  _freeList = _freeList->_nextInList;
308  next->_nextInList = 0;
309  if (!first)
310  {
311  // First, set it up
312  first = next;
313  last = next;
314  }
315  else
316  {
317  // Not first, add it to chain
318  last->_nextInChain = next;
319  last = next;
320  }
321  }
322  assert(first);
323  // Set _usedList or add it to the end of the used list
324  if (!_usedList)
325  {
326  _usedList = first;
327  }
328  else
329  {
330  _endOfUsedList->_nextInList = first;
331  }
332  _endOfUsedList = first;
333  _blocksAvailable -= numBlocksInChain_;
334  return first;
335  }
336 
340  // Lock should already be acquired
341  void put(Block* block_)
342  {
343  assert(_usedList);
344  assert(_endOfUsedList);
345  // Remove from used list
346  if (_usedList == block_)
347  {
348  // Easy
349  _usedList = _usedList->_nextInList;
350  if (!_usedList)
351  {
352  _endOfUsedList = 0;
353  }
354  }
355  else
356  {
357  // Search and remove the block
358  Block* used = _usedList;
359  while (used)
360  {
361  if (used->_nextInList == block_)
362  {
363  used->_nextInList = block_->_nextInList;
364  break;
365  }
366  used = used->_nextInList;
367  if (!_usedList) // -V1051
368  {
369  _endOfUsedList = 0;
370  }
371  }
372  }
373  // Add to free list
374  _flattenToFreeList(block_);
375  }
376 
380  // Lock should already be acquired
381  AMPS_ATOMIC_BASE_TYPE put(amps_uint64_t sequence_)
382  {
383  assert(_usedList);
384  assert(_endOfUsedList);
385  Block* used = _usedList;
386  AMPS_ATOMIC_BASE_TYPE removalCount = 0;
387  while (used && used->_sequence <= sequence_)
388  {
389  Block* next = used->_nextInList;
390  // Add to free list
391  _flattenToFreeList(used);
392  used = next;
393  ++removalCount;
394  }
395  _usedList = used;
396  if (!used)
397  {
398  _endOfUsedList = 0;
399  }
400  return removalCount;
401  }
402 
406  // Lock should already be acquired
407  void putAll(Block* block_)
408  {
409  // Remove from used list
410  Block* newEndOfUsedList = 0;
411  for (Block* used = _usedList; used; used = used->_nextInList)
412  {
413  if (used == block_)
414  {
415  if (newEndOfUsedList)
416  {
417  newEndOfUsedList->_nextInList = 0;
418  }
419  else
420  {
421  _usedList = 0;
422  }
423  _endOfUsedList = newEndOfUsedList;
424  }
425  newEndOfUsedList = used;
426  }
427  // Add all remaining to free list
428  Block* next = 0;
429  for (Block* block = block_; block; block = next)
430  {
431  next = block->_nextInList;
432  _flattenToFreeList(block);
433  }
434  }
435 
438  // Lock should already be held
439  void init()
440  {
441  size_t startSize = _buffer->getSize();
442  if (!startSize)
443  {
445  startSize = _buffer->getSize();
446  }
447  // How many blocks are we resizing
448  amps_uint32_t numBlocks = (amps_uint32_t)(startSize) / getBlockSize();
449  _freeList = new Block[numBlocks];
450  _blockList.push_back(_freeList);
451  for (size_t i = 0; i < numBlocks; ++i)
452  {
453  _freeList[i].init(i, getBlockSize());
454  }
455  _freeList[numBlocks - 1]._nextInList = 0;
456  _blocksAvailable += numBlocks;
457  assert(_freeList);
458  assert(_blocksAvailable);
459  }
460 
463  size_t getDefaultResizeSize() const
464  {
465  return _blocksPerRealloc * _blockSize;
466  }
467 
470  amps_uint32_t getDefaultResizeBlocks() const
471  {
472  return _blocksPerRealloc;
473  }
474 
482  // Lock should already be held
483  Block* resizeBuffer(size_t size_, amps_uint32_t* pNewBlocks_)
484  {
485  Block* freeList = 0;
486  while (_resizing)
487  {
488  if (_buffer->getSize() >= size_)
489  {
490  return freeList;
491  }
492  if (!_lock.wait(1000))
493  {
494  amps_invoke_waiting_function();
495  }
496  }
497  FlagFlip flip(&_resizing);
498  bool okToResize = false;
499  if (true)
500  {
501  BufferUnlock u(_lock);
502  // Don't do anything if resizeHandler says no
503  okToResize = _canResize(size_);
504  }
505  if (!okToResize)
506  {
507  return freeList;
508  }
509  try
510  {
511  _lock.signalAll();
512  size_t oldSize = _buffer->getSize();
513  amps_uint32_t oldBlocks = (amps_uint32_t)(oldSize / getBlockSize());
514  if (oldSize >= size_)
515  {
516  *pNewBlocks_ = 0;
517  return freeList;
518  }
519  _buffer->setSize(size_);
520  _buffer->zero(oldSize, size_ - oldSize);
521  // How many blocks are we resizing
522  *pNewBlocks_ = (amps_uint32_t)((size_ - oldSize) / getBlockSize());
523  freeList = new Block[*pNewBlocks_];
524  for (size_t i = 0; i < *pNewBlocks_; ++i)
525  {
526  freeList[i].init(oldBlocks + i, getBlockSize());
527  }
528  freeList[*pNewBlocks_ - 1]._nextInList = 0;
529  }
530 #ifdef _WIN32
531  catch (const std::bad_alloc&)
532 #else
533  catch (const std::bad_alloc& e)
534 #endif
535  {
536  std::ostringstream os;
537  os << "BlockStore failed to allocate " << size_
538  << " bytes for resize of store from " << _buffer->getSize()
539  << " bytes.";
540  throw StoreException(os.str());
541  }
542  return freeList;
543  }
544 
548  // Lock should already be held
549  void resize(size_t size_)
550  {
551  amps_uint32_t newBlocks = 0;
552  Block* addedBlockList = resizeBuffer(size_, &newBlocks);
553  if (!addedBlockList || !newBlocks)
554  {
555  // Maybe we didn't have to allocate in this thread
556  return;
557  }
558  _blockList.push_back(addedBlockList);
559  addedBlockList[newBlocks - 1]._nextInList = _freeList;
560  _freeList = addedBlockList;
561  _blocksAvailable += newBlocks;
562  assert(_blocksAvailable);
563  }
564 
567  // Lock should be held, no blocks should be used or allocated
568  amps_uint32_t setBlockSize(amps_uint32_t blockSize_)
569  {
570  if (_usedList || _freeList)
571  {
572  return 0;
573  }
574  amps_uint32_t oldSize = _blockSize;
575  _blockSize = blockSize_;
576  return oldSize;
577  }
578 
582  // Lock should be held, no blocks should be used or allocated
583  amps_uint32_t setBlockHeaderSize(amps_uint32_t blockHeaderSize_)
584  {
585  if (_usedList || _freeList)
586  {
587  return 0;
588  }
589  amps_uint32_t oldSize = _blockHeaderSize;
590  _blockHeaderSize = blockHeaderSize_;
591  return oldSize;
592  }
593 
596  // Lock should already be held
598  {
599  return _buffer;
600  }
601 
602  private:
604  bool _canResize(size_t requestedSize_)
605  {
606  if (_resizeHandler)
607  {
608  return _resizeHandler(requestedSize_, _resizeUserData);
609  }
610  else
611  {
612  return true;
613  }
614  }
615 
616  // Lock should already be acquired
617  void _flattenToFreeList(Block* block_)
618  {
619  // Flatten chain to front of free list
620  Block* current = block_;
621  while (current)
622  {
623  Block* chain = current->_nextInChain;
624  // Clear the header
625  _buffer->zero(current->_offset, _blockHeaderSize);
626  // Prepend to the free list and clear other values
627  current->_nextInList = _freeList;
628  _freeList = current;
629  ++_blocksAvailable;
630  current->_sequence = (amps_uint64_t)0;
631  current->_nextInChain = 0;
632  current = chain;
633  }
634  assert(_freeList);
635  assert(_blocksAvailable);
636  }
637 
638  // Member variables
639  // Buffer to use for storage
640  Buffer* _buffer;
641 
642  // The Block accounting
643  Block* _freeList;
644  Block* _usedList;
645  Block* _endOfUsedList;
646  // How much to resize buffer when needed
647  amps_uint32_t _blocksPerRealloc;
648  // How big is each Block, and what part is header
649  amps_uint32_t _blockSize;
650  amps_uint32_t _blockHeaderSize;
651  // How many blocks are free
652  amps_uint32_t _blocksAvailable;
653  // ResizeHandler to call before resizing
654  ResizeHandler _resizeHandler;
655  // ResizeHandler data
656  void* _resizeUserData;
657  // List of every allocated slab of Blocks
658  BlockList _blockList;
659  // Flag to control resizing
660 #if __cplusplus >= 201103L || _MSC_VER >= 1900
661  std::atomic<bool> _resizing;
662 #else
663  volatile bool _resizing;
664 #endif
665 
666  // Lock for _buffer
667  mutable Mutex _lock;
668 
669  };
670 
671 }
672 
673 #endif
674 
void wait()
Wait for a signal.
Definition: BlockStore.hpp:197
void setResizeHandler(ResizeHandler resizeHandler_, void *userData_)
Set a resize handler that is called with the new total size of the Buffer.
Definition: BlockStore.hpp:220
amps_uint32_t setBlockSize(amps_uint32_t blockSize_)
Set the size to use for all Blocks.
Definition: BlockStore.hpp:568
void acquireRead() const
Acquire the lock for this object.
Definition: BlockStore.hpp:176
Constants
Default constant values for BlockStore.
Definition: BlockStore.hpp:63
Buffer * getBuffer()
Return the buffer underlying the store for direct write/read.
Definition: BlockStore.hpp:597
void put(Block *block_)
Return the given chain of Blocks to the free list for reuse.
Definition: BlockStore.hpp:341
~BlockStore()
Destructor that cleans up the buffer and other associated memory.
Definition: BlockStore.hpp:148
void releaseRead() const
Release the lock for this object. Used by RAII templates.
Definition: BlockStore.hpp:183
Block * front() const
Get the first used block in the store.
Definition: BlockStore.hpp:230
amps_uint32_t getDefaultResizeBlocks() const
Return the default number of blocks for each resize.
Definition: BlockStore.hpp:470
void addBlocks(Block *blockArray_)
Allow users to create Block arrays during recovery that are tracked for cleanup here with all other B...
Definition: BlockStore.hpp:275
bool wait(long timeout_)
Wait timeout_ ms for a signal.
Definition: BlockStore.hpp:206
Block * back() const
Get the last used block in the store.
Definition: BlockStore.hpp:240
void resize(size_t size_)
Resize the buffer to the requested size, adding all new space as unused Blocks for the free list...
Definition: BlockStore.hpp:549
AMPS::BlockStore
Used as a base class for other stores in the AMPS C++ client, this is an implementation that breaks a...
Definition: BlockStore.hpp:58
void init()
Initialize, assuming that _buffer has no existing information.
Definition: BlockStore.hpp:439
void setEndOfUsedList(Block *block_)
Allow containing classes to initialize the used list in recovery.
Definition: BlockStore.hpp:266
size_t getDefaultResizeSize() const
Return the default number of bytes for each resize.
Definition: BlockStore.hpp:463
void setUsedList(Block *block_)
Allow containing classes to initialize the used list in recovery.
Definition: BlockStore.hpp:258
ampsplusplus.hpp
Core type, function, and class declarations for the AMPS C++ client.
Buffer.hpp
Provides AMPS::Buffer, an abstract base class used by the store implementations in the AMPS client...
amps_uint32_t setBlockHeaderSize(amps_uint32_t blockHeaderSize_)
Set the size to use for the header for all Blocks.
Definition: BlockStore.hpp:583
void setFreeList(Block *block_, amps_uint32_t freeCount_)
Allow containing classes to initialize the free list in recovery.
Definition: BlockStore.hpp:249
amps_uint32_t getBlockHeaderSize() const
Get the size of a header within each Block, as set in the constructor.
Definition: BlockStore.hpp:167
AMPS::Buffer
Abstract base class for implementing a buffer to be used by a StoreImpl for storage of publish messag...
Definition: Buffer.hpp:40
BlockStore(Buffer *buffer_, amps_uint32_t blocksPerRealloc_=DEFAULT_BLOCKS_PER_REALLOC, amps_uint32_t blockHeaderSize_=DEFAULT_BLOCK_HEADER_SIZE, amps_uint32_t blockSize_=DEFAULT_BLOCK_SIZE)
Create a BlockStore using buffer_ and default block size, that grows by blocksPerRealloc_ blocks when...
Definition: BlockStore.hpp:134
void putAll(Block *block_)
Return all Blocks starting with the given Block to the free list.
Definition: BlockStore.hpp:407
AMPS::BlockStore::Block
Used as metadata for each block in a Buffer.
Definition: BlockStore.hpp:72
Block * resizeBuffer(size_t size_, amps_uint32_t *pNewBlocks_)
Resize the buffer to the requested size, returning all new space.
Definition: BlockStore.hpp:483
void signalAll()
Signal lock waiters.
Definition: BlockStore.hpp:190
amps_uint32_t getBlockSize() const
Get the size of each Block, as set in the constructor.
Definition: BlockStore.hpp:160
AMPS_ATOMIC_BASE_TYPE put(amps_uint64_t sequence_)
Return all Blocks with sequence <= sequence_ for reuse.
Definition: BlockStore.hpp:381
Definition: ampsplusplus.hpp:102