AMPS C/C++ Client Class Reference
AMPS C/C++ Client Version 5.3.4.2
BlockStore.hpp
Go to the documentation of this file.
1 //
3 // Copyright (c) 2010-2024 60East Technologies Inc., All Rights Reserved.
4 //
5 // This computer software is owned by 60East Technologies Inc. and is
6 // protected by U.S. copyright laws and other laws and by international
7 // treaties. This computer software is furnished by 60East Technologies
8 // Inc. pursuant to a written license agreement and may be used, copied,
9 // transmitted, and stored only in accordance with the terms of such
10 // license agreement and with the inclusion of the above copyright notice.
11 // This computer software or any other copies thereof may not be provided
12 // or otherwise made available to any other person.
13 //
14 // U.S. Government Restricted Rights. This computer software: (a) was
15 // developed at private expense and is in all respects the proprietary
16 // information of 60East Technologies Inc.; (b) was not developed with
17 // government funds; (c) is a trade secret of 60East Technologies Inc.
18 // for all purposes of the Freedom of Information Act; and (d) is a
19 // commercial item and thus, pursuant to Section 12.212 of the Federal
20 // Acquisition Regulations (FAR) and DFAR Supplement Section 227.7202,
21 // Government's use, duplication or disclosure of the computer software
22 // is subject to the restrictions set forth by 60East Technologies Inc..
23 //
25 
26 #ifndef _BLOCKSTORE_H_
27 #define _BLOCKSTORE_H_
28 #include <amps/ampsplusplus.hpp>
29 #include <amps/Buffer.hpp>
30 #include <sstream>
31 #include <string>
32 #include <map>
33 #include <amps/ampscrc.hpp>
34 #if __cplusplus >= 201103L || _MSC_VER >= 1900
35  #include <atomic>
36 #endif
37 
38 #ifdef _WIN32
39  #include <intrin.h>
40  #include <sys/timeb.h>
41 #else
42  #include <sys/time.h>
43  #if AMPS_SSE_42
44  #include <cpuid.h>
45  #endif
46 #endif
47 #include <iostream>
48 
53 
54 namespace AMPS
55 {
61  class BlockStore
62  {
63  public:
66  enum Constants : amps_uint32_t
67  {
68  DEFAULT_BLOCK_HEADER_SIZE = 32,
69  DEFAULT_BLOCKS_PER_REALLOC = 1000,
70  DEFAULT_BLOCK_SIZE = 2048
71  };
72 
75  class Block
76  {
77  public:
78  // The offset of the Block's data in the buffer.
79  size_t _offset;
80  // The sequence number associated with the Block.
81  amps_uint64_t _sequence;
82  // The next Block in the chain when data is in multiple Blocks.
83  Block* _nextInChain;
84  // The next Block in list of available or free Blocks.
85  Block* _nextInList;
86 
87  // Create Block with given offset
88  Block(size_t offset_) : _offset(offset_), _sequence(0)
89  , _nextInChain(0), _nextInList(0)
90  { ; }
91 
92  // Create Block with _nextInList at an address one Block farther
93  // than self. Convenient for creating arrays of Blocks.
94  Block() : _offset(0), _sequence(0)
95  , _nextInChain(0), _nextInList((Block*)(this + 1))
96  { ; }
97 
98  // Init Block to an offset at index_ * blockSize_
99  Block* init(size_t index_, amps_uint32_t blockSize_)
100  {
101  _offset = index_ * blockSize_;
102  return this;
103  }
104 
105  // Set Block to given offset and return pointer to self
106  Block* setOffset(size_t offset_)
107  {
108  _offset = offset_;
109  return this;
110  }
111 
112  };
113 
114  private:
115  // Typedefs
116  typedef Lock<Mutex> BufferLock;
117  typedef Unlock<Mutex> BufferUnlock;
118  typedef bool (*ResizeHandler)(size_t, void*);
119  typedef std::vector<Block*> BlockList;
120 
121  public:
137  BlockStore(Buffer* buffer_,
138  amps_uint32_t blocksPerRealloc_ = DEFAULT_BLOCKS_PER_REALLOC,
139  amps_uint32_t blockHeaderSize_ = DEFAULT_BLOCK_HEADER_SIZE,
140  amps_uint32_t blockSize_ = DEFAULT_BLOCK_SIZE)
141  : _buffer(buffer_), _freeList(0), _usedList(0)
142  , _endOfUsedList(0), _blocksPerRealloc(blocksPerRealloc_)
143  , _blockSize(blockSize_), _blockHeaderSize(blockHeaderSize_)
144  , _blocksAvailable(0), _resizeHandler(0), _resizeUserData(0)
145  , _resizing(false)
146  {
147  }
148 
152  {
153  for (BlockList::iterator i = _blockList.begin();
154  i != _blockList.end(); ++i)
155  {
156  delete[] *i;
157  }
158  delete _buffer;
159  }
160 
163  amps_uint32_t getBlockSize() const
164  {
165  return _blockSize;
166  }
167 
170  amps_uint32_t getBlockHeaderSize() const
171  {
172  return _blockHeaderSize;
173  }
174 
179  void acquireRead() const
180  {
181  _lock.acquireRead();
182  }
183 
186  void releaseRead() const
187  {
188  _lock.releaseRead();
189  }
190 
193  void signalAll()
194  {
195  _lock.signalAll();
196  }
197 
200  void wait()
201  {
202  _lock.wait();
203  }
204 
209  bool wait(long timeout_)
210  {
211  return _lock.wait(timeout_);
212  }
213 
223  void setResizeHandler(ResizeHandler resizeHandler_, void* userData_)
224  {
225  _resizeHandler = resizeHandler_;
226  _resizeUserData = userData_;
227  }
228 
232  // Lock should already be acquired
233  Block* front() const
234  {
235  return _usedList;
236  }
237 
242  // Lock should already be acquired
243  Block* back() const
244  {
245  return _endOfUsedList;
246  }
247 
251  // Lock should already be acquired
252  void setFreeList(Block* block_, amps_uint32_t freeCount_)
253  {
254  _freeList = block_;
255  _blocksAvailable = freeCount_;
256  }
257 
260  // Lock should already be acquired
261  void setUsedList(Block* block_)
262  {
263  _usedList = block_;
264  }
265 
268  // Lock should already be acquired
269  void setEndOfUsedList(Block* block_)
270  {
271  _endOfUsedList = block_;
272  }
273 
277  // Lock should already be acquired
278  void addBlocks(Block* blockArray_)
279  {
280  _blockList.push_back(blockArray_);
281  }
282 
287  // Lock should already be acquired
288  Block* get(amps_uint32_t numBlocksInChain_)
289  {
290  // Check that we have enough blocks
291  // Do this in a loop since resize can possibly return without resizing
292  // and may still leave us needing more space.
293  while (_blocksAvailable < numBlocksInChain_)
294  {
295  // Resize by required multiple of blockPerRealloc
296  unsigned int blocksNeeded = numBlocksInChain_ - _blocksAvailable;
297  amps_uint32_t addedBlocks = (blocksNeeded / _blocksPerRealloc + 1)
298  * _blocksPerRealloc;
299  size_t size = _buffer->getSize() + (addedBlocks * _blockSize);
300  resize(size);
301  }
302  // Return first free block with others as _nextInChain
303  Block* first = 0;
304  Block* last = 0;
305  Block* next = 0;
306  for (unsigned int i = 0; i < numBlocksInChain_; ++i)
307  {
308  // Take from free list and advance
309  next = _freeList;
310  _freeList = _freeList->_nextInList;
311  next->_nextInList = 0;
312  if (!first)
313  {
314  // First, set it up
315  first = next;
316  last = next;
317  }
318  else
319  {
320  // Not first, add it to chain
321  last->_nextInChain = next;
322  last = next;
323  }
324  }
325  assert(first);
326  // Set _usedList or add it to the end of the used list
327  if (!_usedList)
328  {
329  _usedList = first;
330  }
331  else
332  {
333  _endOfUsedList->_nextInList = first;
334  }
335  _endOfUsedList = first;
336  _blocksAvailable -= numBlocksInChain_;
337  return first;
338  }
339 
343  // Lock should already be acquired
344  void put(Block* block_)
345  {
346  assert(_usedList);
347  assert(_endOfUsedList);
348  // Remove from used list
349  if (_usedList == block_)
350  {
351  // Easy
352  _usedList = _usedList->_nextInList;
353  if (!_usedList)
354  {
355  _endOfUsedList = 0;
356  }
357  }
358  else
359  {
360  // Search and remove the block
361  Block* used = _usedList;
362  while (used)
363  {
364  if (used->_nextInList == block_)
365  {
366  used->_nextInList = block_->_nextInList;
367  break;
368  }
369  used = used->_nextInList;
370  if (!_usedList) // -V1051
371  {
372  _endOfUsedList = 0;
373  }
374  }
375  }
376  // Add to free list
377  _flattenToFreeList(block_);
378  }
379 
383  // Lock should already be acquired
384  AMPS_ATOMIC_BASE_TYPE put(amps_uint64_t sequence_)
385  {
386  assert(_usedList);
387  assert(_endOfUsedList);
388  Block* used = _usedList;
389  AMPS_ATOMIC_BASE_TYPE removalCount = 0;
390  while (used && used->_sequence <= sequence_)
391  {
392  Block* next = used->_nextInList;
393  // Add to free list
394  _flattenToFreeList(used);
395  used = next;
396  ++removalCount;
397  }
398  _usedList = used;
399  if (!used)
400  {
401  _endOfUsedList = 0;
402  }
403  return removalCount;
404  }
405 
409  // Lock should already be acquired
410  void putAll(Block* block_)
411  {
412  // Remove from used list
413  Block* newEndOfUsedList = 0;
414  for (Block* used = _usedList; used; used = used->_nextInList)
415  {
416  if (used == block_)
417  {
418  if (newEndOfUsedList)
419  {
420  newEndOfUsedList->_nextInList = 0;
421  }
422  else
423  {
424  _usedList = 0;
425  }
426  _endOfUsedList = newEndOfUsedList;
427  }
428  newEndOfUsedList = used;
429  }
430  // Add all remaining to free list
431  Block* next = 0;
432  for (Block* block = block_; block; block = next)
433  {
434  next = block->_nextInList;
435  _flattenToFreeList(block);
436  }
437  }
438 
441  // Lock should already be held
442  void init()
443  {
444  size_t startSize = _buffer->getSize();
445  if (!startSize)
446  {
448  startSize = _buffer->getSize();
449  }
450  // How many blocks are we resizing
451  amps_uint32_t numBlocks = (amps_uint32_t)(startSize) / getBlockSize();
452  _freeList = new Block[numBlocks];
453  _blockList.push_back(_freeList);
454  for (size_t i = 0; i < numBlocks; ++i)
455  {
456  _freeList[i].init(i, getBlockSize());
457  }
458  _freeList[numBlocks - 1]._nextInList = 0;
459  _blocksAvailable += numBlocks;
460  assert(_freeList);
461  assert(_blocksAvailable);
462  }
463 
466  size_t getDefaultResizeSize() const
467  {
468  return _blocksPerRealloc * _blockSize;
469  }
470 
473  amps_uint32_t getDefaultResizeBlocks() const
474  {
475  return _blocksPerRealloc;
476  }
477 
485  // Lock should already be held
486  Block* resizeBuffer(size_t size_, amps_uint32_t* pNewBlocks_)
487  {
488  Block* freeList = 0;
489  while (_resizing)
490  {
491  if (_buffer->getSize() >= size_)
492  {
493  return freeList;
494  }
495  if (!_lock.wait(1000))
496  {
497  amps_invoke_waiting_function();
498  }
499  }
500  FlagFlip flip(&_resizing);
501  bool okToResize = false;
502  if (true)
503  {
504  BufferUnlock u(_lock);
505  // Don't do anything if resizeHandler says no
506  okToResize = _canResize(size_);
507  }
508  if (!okToResize)
509  {
510  return freeList;
511  }
512  try
513  {
514  _lock.signalAll();
515  size_t oldSize = _buffer->getSize();
516  amps_uint32_t oldBlocks = (amps_uint32_t)(oldSize / getBlockSize());
517  if (oldSize >= size_)
518  {
519  *pNewBlocks_ = 0;
520  return freeList;
521  }
522  _buffer->setSize(size_);
523  _buffer->zero(oldSize, size_ - oldSize);
524  // How many blocks are we resizing
525  *pNewBlocks_ = (amps_uint32_t)((size_ - oldSize) / getBlockSize());
526  freeList = new Block[*pNewBlocks_];
527  for (size_t i = 0; i < *pNewBlocks_; ++i)
528  {
529  freeList[i].init(oldBlocks + i, getBlockSize());
530  }
531  freeList[*pNewBlocks_ - 1]._nextInList = 0;
532  }
533 #ifdef _WIN32
534  catch (const std::bad_alloc&)
535 #else
536  catch (const std::bad_alloc& e)
537 #endif
538  {
539  std::ostringstream os;
540  os << "BlockStore failed to allocate " << size_
541  << " bytes for resize of store from " << _buffer->getSize()
542  << " bytes.";
543  throw StoreException(os.str());
544  }
545  return freeList;
546  }
547 
551  // Lock should already be held
552  void resize(size_t size_)
553  {
554  amps_uint32_t newBlocks = 0;
555  Block* addedBlockList = resizeBuffer(size_, &newBlocks);
556  if (!addedBlockList || !newBlocks)
557  {
558  // Maybe we didn't have to allocate in this thread
559  return;
560  }
561  _blockList.push_back(addedBlockList);
562  addedBlockList[newBlocks - 1]._nextInList = _freeList;
563  _freeList = addedBlockList;
564  _blocksAvailable += newBlocks;
565  assert(_blocksAvailable);
566  }
567 
570  // Lock should be held, no blocks should be used or allocated
571  amps_uint32_t setBlockSize(amps_uint32_t blockSize_)
572  {
573  if (_usedList || _freeList)
574  {
575  return 0;
576  }
577  amps_uint32_t oldSize = _blockSize;
578  _blockSize = blockSize_;
579  return oldSize;
580  }
581 
585  // Lock should be held, no blocks should be used or allocated
586  amps_uint32_t setBlockHeaderSize(amps_uint32_t blockHeaderSize_)
587  {
588  if (_usedList || _freeList)
589  {
590  return 0;
591  }
592  amps_uint32_t oldSize = _blockHeaderSize;
593  _blockHeaderSize = blockHeaderSize_;
594  return oldSize;
595  }
596 
599  // Lock should already be held
601  {
602  return _buffer;
603  }
604 
605  private:
607  bool _canResize(size_t requestedSize_)
608  {
609  if (_resizeHandler)
610  {
611  return _resizeHandler(requestedSize_, _resizeUserData);
612  }
613  else
614  {
615  return true;
616  }
617  }
618 
619  // Lock should already be acquired
620  void _flattenToFreeList(Block* block_)
621  {
622  // Flatten chain to front of free list
623  Block* current = block_;
624  while (current)
625  {
626  Block* chain = current->_nextInChain;
627  // Clear the header
628  _buffer->zero(current->_offset, _blockHeaderSize);
629  // Prepend to the free list and clear other values
630  current->_nextInList = _freeList;
631  _freeList = current;
632  ++_blocksAvailable;
633  current->_sequence = (amps_uint64_t)0;
634  current->_nextInChain = 0;
635  current = chain;
636  }
637  assert(_freeList);
638  assert(_blocksAvailable);
639  }
640 
641  // Member variables
642  // Buffer to use for storage
643  Buffer* _buffer;
644 
645  // The Block accounting
646  Block* _freeList;
647  Block* _usedList;
648  Block* _endOfUsedList;
649  // How much to resize buffer when needed
650  amps_uint32_t _blocksPerRealloc;
651  // How big is each Block, and what part is header
652  amps_uint32_t _blockSize;
653  amps_uint32_t _blockHeaderSize;
654  // How many blocks are free
655  amps_uint32_t _blocksAvailable;
656  // ResizeHandler to call before resizing
657  ResizeHandler _resizeHandler;
658  // ResizeHandler data
659  void* _resizeUserData;
660  // List of every allocated slab of Blocks
661  BlockList _blockList;
662  // Flag to control resizing
663 #if __cplusplus >= 201103L || _MSC_VER >= 1900
664  std::atomic<bool> _resizing;
665 #else
666  volatile bool _resizing;
667 #endif
668 
669  // Lock for _buffer
670  mutable Mutex _lock;
671 
672  };
673 
674 }
675 
676 #endif
677 
void wait()
Wait for a signal.
Definition: BlockStore.hpp:200
void setResizeHandler(ResizeHandler resizeHandler_, void *userData_)
Set a resize handler that is called with the new total size of the Buffer.
Definition: BlockStore.hpp:223
amps_uint32_t setBlockSize(amps_uint32_t blockSize_)
Set the size to use for all Blocks.
Definition: BlockStore.hpp:571
void acquireRead() const
Acquire the lock for this object.
Definition: BlockStore.hpp:179
Constants
Default constant values for BlockStore.
Definition: BlockStore.hpp:66
Buffer * getBuffer()
Return the buffer underlying the store for direct write/read.
Definition: BlockStore.hpp:600
void put(Block *block_)
Return the given chain of Blocks to the free list for reuse.
Definition: BlockStore.hpp:344
~BlockStore()
Destructor that cleans up the buffer and other associated memory.
Definition: BlockStore.hpp:151
void releaseRead() const
Release the lock for this object. Used by RAII templates.
Definition: BlockStore.hpp:186
Block * front() const
Get the first used block in the store.
Definition: BlockStore.hpp:233
amps_uint32_t getDefaultResizeBlocks() const
Return the default number of blocks for each resize.
Definition: BlockStore.hpp:473
void addBlocks(Block *blockArray_)
Allow users to create Block arrays during recovery that are tracked for cleanup here with all other B...
Definition: BlockStore.hpp:278
bool wait(long timeout_)
Wait timeout_ ms for a signal.
Definition: BlockStore.hpp:209
Block * back() const
Get the last used block in the store.
Definition: BlockStore.hpp:243
void resize(size_t size_)
Resize the buffer to the requested size, adding all new space as unused Blocks for the free list...
Definition: BlockStore.hpp:552
Used as a base class for other stores in the AMPS C++ client, this is an implementation that breaks a...
Definition: BlockStore.hpp:61
void init()
Initialize, assuming that _buffer has no existing information.
Definition: BlockStore.hpp:442
void setEndOfUsedList(Block *block_)
Allow containing classes to initialize the used list in recovery.
Definition: BlockStore.hpp:269
size_t getDefaultResizeSize() const
Return the default number of bytes for each resize.
Definition: BlockStore.hpp:466
void setUsedList(Block *block_)
Allow containing classes to initialize the used list in recovery.
Definition: BlockStore.hpp:261
Core type, function, and class declarations for the AMPS C++ client.
Provides AMPS::Buffer, an abstract base class used by the store implementations in the AMPS client...
amps_uint32_t setBlockHeaderSize(amps_uint32_t blockHeaderSize_)
Set the size to use for the header for all Blocks.
Definition: BlockStore.hpp:586
void setFreeList(Block *block_, amps_uint32_t freeCount_)
Allow containing classes to initialize the free list in recovery.
Definition: BlockStore.hpp:252
amps_uint32_t getBlockHeaderSize() const
Get the size of a header within each Block, as set in the constructor.
Definition: BlockStore.hpp:170
Abstract base class for implementing a buffer to be used by a StoreImpl for storage of publish messag...
Definition: Buffer.hpp:40
BlockStore(Buffer *buffer_, amps_uint32_t blocksPerRealloc_=DEFAULT_BLOCKS_PER_REALLOC, amps_uint32_t blockHeaderSize_=DEFAULT_BLOCK_HEADER_SIZE, amps_uint32_t blockSize_=DEFAULT_BLOCK_SIZE)
Create a BlockStore using buffer_ and default block size, that grows by blocksPerRealloc_ blocks when...
Definition: BlockStore.hpp:137
void putAll(Block *block_)
Return all Blocks starting with the given Block to the free list.
Definition: BlockStore.hpp:410
Used as metadata for each block in a Buffer.
Definition: BlockStore.hpp:75
Block * resizeBuffer(size_t size_, amps_uint32_t *pNewBlocks_)
Resize the buffer to the requested size, returning all new space.
Definition: BlockStore.hpp:486
void signalAll()
Signal lock waiters.
Definition: BlockStore.hpp:193
amps_uint32_t getBlockSize() const
Get the size of each Block, as set in the constructor.
Definition: BlockStore.hpp:163
AMPS_ATOMIC_BASE_TYPE put(amps_uint64_t sequence_)
Return all Blocks with sequence <= sequence_ for reuse.
Definition: BlockStore.hpp:384
Definition: ampsplusplus.hpp:102