26 #ifndef _BLOCKPUBLISHSTORE_H_ 27 #define _BLOCKPUBLISHSTORE_H_ 35 #include <amps/ampscrc.hpp> 36 #if __cplusplus >= 201103L || _MSC_VER >= 1900 42 #include <sys/timeb.h> 67 typedef Lock<BlockStore> BufferLock;
68 typedef Unlock<BlockStore> BufferUnlock;
// Flag values stored in BlockChainHeader::_flag for SOWDelete commands.
// store() picks one to record which message field supplied the payload, and
// replayOnto() uses it to decide whether the stored payload is assigned back
// as data, filter, SOW keys or bookmark.
72 SOW_DELETE_DATA = 0x01,
73 SOW_DELETE_FILTER = 0x02,
74 SOW_DELETE_KEYS = 0x04,
75 SOW_DELETE_BOOKMARK = 0x08,
// Same as SOW_DELETE_BOOKMARK, but replayOnto() also re-applies the
// AMPS_OPTIONS_CANCEL option to the replayed message.
76 SOW_DELETE_BOOKMARK_CANCEL = 0x10,
// Not referenced in the visible portion of this file.
77 SOW_DELETE_UNKNOWN = 0x80
// Default layout constants for the store.
// Passed to the block store as the per-block header size by the constructor.
84 DEFAULT_BLOCK_HEADER_SIZE = 32,
// Returned by one of the size accessors below; presumably the size reserved
// for a BlockChainHeader in the first block of a chain -- TODO confirm.
85 DEFAULT_BLOCK_CHAIN_HEADER_SIZE = 64,
// Same value as the constructor's blocksPerRealloc_ default (1000).
86 DEFAULT_BLOCKS_PER_REALLOC = 1000,
// Default for the constructor's blockSize_ argument.
87 DEFAULT_BLOCK_SIZE = 2048
// Per-block header fields (the enclosing struct line and the _seq member are
// not visible in this excerpt).
// For message blocks: number of blocks in the chain, computed in store().
// recover() reads this same slot of the metadata block as the block size.
127 amps_uint32_t _blocksToWrite;
// For message blocks: total byte count computed in store() from the field
// lengths and data length. recover() reads this slot of the metadata block
// as the block header size.
128 amps_uint32_t _totalRemaining;
// CRC accumulated by store() over command id, correlation id, expiration,
// sow key, topic and data, in that order; verified by replayOnto().
130 amps_uint64_t _crcVal;
// Buffer offset of the next block in the chain, or 0 for the last block.
131 amps_uint64_t _nextInChain;
// Header written once per stored message (store() writes it with
// sizeof(BlockChainHeader) bytes); it records the command type and the byte
// length of each variable-length field that follows it in the chain.
134 struct BlockChainHeader
// Message::Command value of the stored message.
136 amps_uint32_t _operation;
137 amps_uint32_t _commandIdLen;
138 amps_uint32_t _correlationIdLen;
139 amps_uint32_t _expirationLen;
140 amps_uint32_t _sowKeyLen;
141 amps_uint32_t _topicLen;
// (The _flag member initialized below is declared on a line not visible here.)
// Ack types of the stored message; replayOnto() ORs in Persisted on replay.
143 amps_uint32_t _ackTypes;
// Not referenced in the visible code; presumably reserved padding -- TODO confirm.
144 amps_uint32_t _unused[8];
// Constructor: all lengths start at 0 and _flag at -1 (the value replayOnto()
// treats like plain data); the rest of the initializer list is not visible.
146 : _operation(0), _commandIdLen(0), _correlationIdLen(0)
147 , _expirationLen(0), _sowKeyLen(0), _topicLen(0), _flag(-1)
// Bodies of three size accessors whose signatures are not visible in this
// excerpt.
// Returns the fixed per-block header size.
158 return DEFAULT_BLOCK_HEADER_SIZE;
// Returns the fixed chain-header size.
168 return DEFAULT_BLOCK_CHAIN_HEADER_SIZE;
// Returns the block size currently configured on the underlying block store
// (presumably getBlockSize(), which recoverOldFormat calls -- TODO confirm).
176 return _blockStore.getBlockSize();
// Constructor (name and first parameter are on lines not visible here).
// blocksPerRealloc_   forwarded to the block store.
// isFile_             use not visible in this excerpt.
// errorOnPublishGap_  use not visible here; discardUpTo consults
//                     getErrorOnPublishGap() before throwing on a gap.
// blockSize_          requested block size; clamped below.
203 amps_uint32_t blocksPerRealloc_ = 1000,
204 bool isFile_ =
false,
205 bool errorOnPublishGap_ =
false,
206 amps_uint32_t blockSize_ = DEFAULT_BLOCK_SIZE)
208 , _blockStore(buffer_, blocksPerRealloc_,
209 DEFAULT_BLOCK_HEADER_SIZE,
// A block is never smaller than twice the block header size.
210 (blockSize_ > DEFAULT_BLOCK_HEADER_SIZE * 2
211 ? blockSize_ : DEFAULT_BLOCK_HEADER_SIZE * 2))
213 , _maxDiscarded((amps_uint64_t)0), _lastSequence((amps_uint64_t)1)
// Under the store lock: take one block to serve as the metadata block and
// start with an empty used list.
221 BufferLock bufferGuard(_blockStore);
223 _metadataBlock = _blockStore.get(1);
225 _blockStore.setUsedList(0);
226 _blockStore.setEndOfUsedList(0);
229 _metadataBlock->_sequence = (amps_uint64_t)0;
230 Buffer* pBuffer = _blockStore.getBuffer();
// Stamp the buffer with the numeric client version (the position it is
// written at is set on lines not visible here).
236 pBuffer->
putUint64((amps_uint64_t)VersionInfo::parseVersion(AMPS_CLIENT_VERSION));
// Convenience overload body: forwards to the two-argument store() with the
// second argument set to true.
256 return store(message_,
true);
// Body of the two-argument store(): serializes message_ into a chain of
// blocks and returns the sequence number recorded for it. The signature and
// several statements are on lines not visible in this excerpt.
271 const char* commandId, *correlationId, *expiration, *sowKey,
274 BlockHeader blockHeader;
275 BlockChainHeader chainHeader;
// Record the byte length of each header field (the getter calls that fill
// dataLen before each assignment are not visible).
277 chainHeader._commandIdLen = (amps_uint32_t)dataLen;
279 chainHeader._correlationIdLen = (amps_uint32_t)dataLen;
281 chainHeader._expirationLen = (amps_uint32_t)dataLen;
283 chainHeader._sowKeyLen = (amps_uint32_t)dataLen;
285 chainHeader._topicLen = (amps_uint32_t)dataLen;
286 message_.getRawData(&data, &dataLen);
// -1 means "payload is plain data"; SOWDelete overrides it below.
287 chainHeader._flag = -1;
289 chainHeader._operation = (amps_uint32_t)operation;
// For SOWDelete the payload may come from a different message field; the flag
// records which one (the conditions choosing each flag are not visible).
290 if (operation == Message::Command::SOWDelete)
294 chainHeader._flag = SOW_DELETE_DATA;
301 chainHeader._flag = SOW_DELETE_FILTER;
308 chainHeader._flag = SOW_DELETE_KEYS;
313 chainHeader._flag = SOW_DELETE_BOOKMARK;
// Scan the options with memchr/strncmp to detect a cancel option; if found,
// the flag is upgraded to SOW_DELETE_BOOKMARK_CANCEL.
// NOTE(review): as visible here, `remaining` is overwritten with the offset of
// the match from `start`, and the comparison uses `start`, which is not
// advanced on any visible line. Confirm against the full source that options
// beginning with, or containing, the cancel option are detected and that the
// loop always terminates.
316 size_t remaining = options.
len();
317 const void* next = NULL;
318 const void* start = (
const void*)(options.
data());
320 while (remaining >= 6 &&
321 (next = memchr(start, (
int)
'c', remaining)) != NULL)
323 remaining = (size_t)next - (
size_t)start;
324 if (remaining >= 6 && strncmp((
const char*)start,
327 chainHeader._flag = SOW_DELETE_BOOKMARK_CANCEL;
// Total bytes to store: the field lengths plus the payload (the Unknown-command
// alternative and any fixed addend are on lines not visible here).
335 blockHeader._totalRemaining = (
336 (chainHeader._operation == Message::Command::Unknown)
339 + chainHeader._commandIdLen
340 + chainHeader._correlationIdLen
341 + chainHeader._expirationLen
342 + chainHeader._sowKeyLen
343 + chainHeader._topicLen
344 + (amps_uint32_t)dataLen));
// Number of blocks needed; a partially filled last block adds one more block.
345 size_t lastBlockLength = ((operation == Message::Command::Unknown) ? 0 :
347 blockHeader._blocksToWrite = ((operation == Message::Command::Unknown)
349 : ((amps_uint32_t)(blockHeader._totalRemaining
351 + ((lastBlockLength > 0) ? 1 : 0)));
// CRC is chained across the fields in a fixed order; replayOnto() recomputes
// it in the same order to validate the stored message.
352 blockHeader._crcVal = (amps_uint64_t)0ULL;
353 blockHeader._crcVal = _crc(commandId,
354 chainHeader._commandIdLen,
355 blockHeader._crcVal);
356 blockHeader._crcVal = _crc(correlationId,
357 chainHeader._correlationIdLen,
358 blockHeader._crcVal);
359 blockHeader._crcVal = _crc(expiration,
360 chainHeader._expirationLen,
361 blockHeader._crcVal);
362 blockHeader._crcVal = _crc(sowKey,
363 chainHeader._sowKeyLen,
364 blockHeader._crcVal);
365 blockHeader._crcVal = _crc(topic,
366 chainHeader._topicLen,
367 blockHeader._crcVal);
368 blockHeader._crcVal = _crc(data, dataLen, blockHeader._crcVal);
// Everything from here on runs under the block store lock.
371 BufferLock bufferGuard(_blockStore);
372 Block* first = _blockStore.get(blockHeader._blocksToWrite);
// Sequence selection: one visible path assigns the next sequence by
// incrementing _lastSequence; another takes the sequence from the message and
// raises _maxDiscarded/_lastSequence to match. The branch conditions are only
// partly visible.
375 if (_lastSequence <= 2)
379 blockHeader._seq = ++_lastSequence;
384 message_.getMessage(),
388 _maxDiscarded = blockHeader._seq - 1;
390 if (blockHeader._seq >= _lastSequence)
392 _lastSequence = blockHeader._seq;
// Per-field progress counters: a field that does not fit in the current block
// continues in the next block of the chain.
398 size_t topicWritten = 0UL;
399 size_t dataWritten = 0UL;
400 size_t commandWritten = 0UL;
401 size_t correlationWritten = 0UL;
402 size_t expirationWritten = 0UL;
403 size_t sowKeyWritten = 0UL;
404 Buffer* pBuffer = _blockStore.getBuffer();
405 for (Block* next = first; next; next = next->_nextInChain)
407 next->_sequence = blockHeader._seq;
// Link each block header to the following block's offset; 0 ends the chain.
408 if (next->_nextInChain)
410 blockHeader._nextInChain = next->_nextInChain->_offset;
414 blockHeader._nextInChain = (amps_uint64_t)0;
418 pBuffer->
putBytes((
const char*)&blockHeader,
sizeof(BlockHeader));
// The CRC is cleared after a header has been written, so it is stored only
// once per chain (presumably in the first block -- TODO confirm).
420 blockHeader._crcVal = (amps_uint64_t)0;
426 pBuffer->
putBytes((
const char*)&chainHeader,
427 sizeof(BlockChainHeader));
// Write as much of each field as fits in the bytes remaining in this block.
436 if (commandWritten < chainHeader._commandIdLen)
438 size_t commandWrite = (chainHeader._commandIdLen - commandWritten < bytesRemaining) ? chainHeader._commandIdLen - commandWritten : bytesRemaining;
439 pBuffer->
putBytes(commandId + commandWritten,
441 bytesRemaining -= commandWrite;
442 commandWritten += commandWrite;
444 if (correlationWritten < chainHeader._correlationIdLen)
446 size_t correlationWrite = (chainHeader._correlationIdLen - correlationWritten < bytesRemaining) ? chainHeader._correlationIdLen - correlationWritten : bytesRemaining;
447 pBuffer->
putBytes(correlationId + correlationWritten,
449 bytesRemaining -= correlationWrite;
450 correlationWritten += correlationWrite;
452 if (bytesRemaining > 0 && expirationWritten < chainHeader._expirationLen)
454 size_t expWrite = (chainHeader._expirationLen - expirationWritten < bytesRemaining) ? chainHeader._expirationLen - expirationWritten : bytesRemaining;
455 pBuffer->
putBytes(expiration + expirationWritten, expWrite);
456 bytesRemaining -= expWrite;
457 expirationWritten += expWrite;
459 if (bytesRemaining > 0 && sowKeyWritten < chainHeader._sowKeyLen)
461 size_t sowKeyWrite = (chainHeader._sowKeyLen - sowKeyWritten < bytesRemaining) ? chainHeader._sowKeyLen - sowKeyWritten : bytesRemaining;
462 pBuffer->
putBytes(sowKey + sowKeyWritten, sowKeyWrite);
463 bytesRemaining -= sowKeyWrite;
464 sowKeyWritten += sowKeyWrite;
466 if (bytesRemaining > 0 && topicWritten < chainHeader._topicLen)
468 size_t topicWrite = (chainHeader._topicLen - topicWritten
470 ? chainHeader._topicLen - topicWritten
472 pBuffer->
putBytes(topic + topicWritten, topicWrite);
473 bytesRemaining -= topicWrite;
474 topicWritten += topicWrite;
476 if (bytesRemaining > 0 && dataWritten < dataLen)
478 size_t dataWrite = (dataLen - dataWritten < bytesRemaining) ?
479 dataLen - dataWritten : bytesRemaining;
480 pBuffer->
putBytes(data + dataWritten, dataWrite);
481 bytesRemaining -= dataWrite;
482 dataWritten += dataWrite;
// On an AMPSException the acquired chain is handed back to the block store
// (what follows in the handler, e.g. a rethrow, is not visible).
486 catch (
const AMPSException&)
488 _blockStore.put(first);
// Count the stored message and report its sequence number.
491 AMPS_FETCH_ADD(&_stored, 1);
492 return blockHeader._seq;
// Body of the routine that discards stored messages up to and including
// index_ (signature not visible; presumably discardUpTo -- TODO confirm).
// Runs under the block store lock.
504 BufferLock bufferGuard(_blockStore);
505 Buffer* pBuffer = _blockStore.getBuffer();
508 amps_uint64_t lastPersisted = _metadataBlock->_sequence;
// Nothing to release in the store: index 0, empty used list, or index already
// discarded. Only the bookkeeping is advanced.
510 if (index_ == (amps_uint64_t)0 || !_blockStore.front() || index_ <= _maxDiscarded)
514 if (lastPersisted < index_)
518 _metadataBlock->_sequence = index_;
519 if (_maxDiscarded < index_)
521 _maxDiscarded = index_;
523 if (_lastSequence <= index_)
525 _lastSequence = index_;
// The server acknowledged less than this store has already discarded; when
// gap errors are enabled this is reported instead of silently accepted.
532 else if (getErrorOnPublishGap() && index_ < lastPersisted)
534 std::ostringstream os;
535 os <<
"Server last saw " << index_ <<
" from Client but Store " 536 <<
"has already discarded up to " << lastPersisted
537 <<
" which would leave a gap of unreceived messages.";
538 throw PublishStoreGapException(os.str());
540 _blockStore.signalAll();
// Normal path: release blocks up to index_; put() returns a count that is
// subtracted from _stored. Waiters (see flush) are woken afterwards.
544 _maxDiscarded = index_;
545 AMPS_FETCH_SUB(&_stored, _blockStore.put(index_));
546 _blockStore.signalAll();
547 if (lastPersisted >= index_)
553 _metadataBlock->_sequence = index_;
554 if (_lastSequence < index_)
556 _lastSequence = index_;
// Body of the replay-all routine (signature not visible): walks the used list
// from the front under the store lock and replays each entry onto replayer_.
567 BufferLock bufferGuard(_blockStore);
// Empty store: nothing to replay.
569 if (!_blockStore.front())
573 Block* next = _blockStore.front();
576 for (Block* block = _blockStore.front(); block; block = next)
579 replayOnto(block, replayer_);
// `next` is advanced only after a successful replay, so in the handler below
// it still refers to the block whose replay threw.
580 next = block->_nextInList;
// A StoreException (thrown by replayOnto for corrupted entries) causes the
// blocks from `next` onward to be returned to the block store.
583 catch (
const StoreException&)
585 _blockStore.putAll(next);
// Body of the single-message replay routine (signature not visible): replays
// the stored message whose sequence equals index_, under the store lock.
598 BufferLock bufferGuard(_blockStore);
600 if (!_blockStore.front())
// The used list is walked in order, so front/back give the stored range.
605 amps_uint64_t lastIdx = _blockStore.back()->_sequence;
607 amps_uint64_t leastIdx = _blockStore.front()->_sequence;
608 if (index_ >= leastIdx && index_ <= lastIdx)
// Linear search of the used list for the requested sequence.
610 Block* block = _blockStore.front();
611 while (block && block->_sequence != index_)
613 block = block->_nextInList;
620 Buffer* pBuffer = _blockStore.getBuffer();
622 sizeof(amps_uint32_t));
627 replayOnto(block, replayer_);
// Out-of-range request: the message's sequence is set to the lowest stored
// index (what is done with it afterwards is not visible).
634 _message.setSequence(leastIdx);
// Snapshot of the _stored counter (incremented by store, decremented when
// blocks are discarded) converted to size_t.
647 size_t count = (size_t)_stored;
// Body of the flush routine (signature not visible): blocks until every
// message stored at the time of the call has been discarded, i.e. until the
// lowest unpersisted sequence passes the highest sequence seen on entry.
// Throws TimedOutException if timeout_ elapses first.
661 BufferLock bufferGuard(_blockStore);
// Only messages already stored on entry are waited for.
662 amps_uint64_t waitFor = _getHighestUnpersisted();
// Timed variant (the condition selecting it is not visible).
670 bool timedOut =
false;
671 AMPS_START_TIMER(timeout_);
673 while (!timedOut && _stored != 0
674 && waitFor >= _getLowestUnpersisted())
676 if (!_blockStore.wait(timeout_))
679 AMPS_RESET_TIMER(timedOut, timeout_);
// Re-check the condition after the loop so a discard that raced with the
// timeout is not reported as a failure.
683 if (timedOut && _stored != 0
684 && waitFor >= _getLowestUnpersisted())
686 throw TimedOutException(
"Timed out waiting to flush publish store.");
// Untimed variant: wait in 1000-unit slices and, with the store lock
// released, call amps_invoke_waiting_function() between waits.
691 while (_stored != 0 && waitFor >= _getLowestUnpersisted())
694 _blockStore.wait(1000);
696 BufferUnlock unlck(_blockStore);
697 amps_invoke_waiting_function();
// Locked public wrapper body: takes the store lock and delegates to
// _getLowestUnpersisted().
704 BufferLock bufferGuard(_blockStore);
705 return _getLowestUnpersisted();
// Locked public wrapper: takes the store lock and delegates to
// _getHighestUnpersisted().
708 amps_uint64_t getHighestUnpersisted()
const 710 BufferLock bufferGuard(_blockStore);
711 return _getHighestUnpersisted();
// Locked public wrapper body: takes the store lock and delegates to
// _getLastPersisted().
716 BufferLock bufferGuard(_blockStore);
717 return _getLastPersisted();
// Static hook taking the requested size and an opaque pointer; the body is
// not visible in this excerpt. Presumably vpThis_ is the store instance and
// the result says whether a resize is permitted -- TODO confirm.
721 static bool canResize(
size_t requestedSize_,
void* vpThis_)
// Unlocked helper: sequence of the first block on the used list. The public
// wrapper takes the store lock before calling it. The value returned when the
// list is empty is on a line not visible here.
726 amps_uint64_t _getLowestUnpersisted()
const 730 if (!_blockStore.front())
734 return _blockStore.front()->_sequence;
// Unlocked helper: sequence of the last block on the used list. The public
// wrapper takes the store lock before calling it. The value returned when the
// list is empty is on a line not visible here.
737 amps_uint64_t _getHighestUnpersisted()
const 741 if (!_blockStore.back())
745 return _blockStore.back()->_sequence;
// Unlocked helper that determines the last persisted sequence and records it
// in the metadata block. Several branch conditions are on lines not visible
// in this excerpt; the visible order is described inline.
748 amps_uint64_t _getLastPersisted(
void)
751 amps_uint64_t lastPersisted = (amps_uint64_t)0;
752 Buffer* pBuffer = _blockStore.getBuffer();
// Early-return path: with a value already in lastPersisted (how it is
// obtained is not visible), keep _lastSequence from falling behind it.
757 if (_lastSequence < lastPersisted)
759 _lastSequence = lastPersisted;
761 return lastPersisted;
// Otherwise fall back to the highest discarded sequence...
765 lastPersisted = _maxDiscarded;
// ...or derive a fresh starting value from the wall clock: milliseconds since
// the epoch multiplied by 1,000,000 (one variant using a timeb-style struct,
// one using gettimeofday; presumably selected per platform -- TODO confirm).
772 lastPersisted = (t.time * 1000 + t.millitm) * (amps_uint64_t)1000000;
775 gettimeofday(&tv, NULL);
776 lastPersisted = (amps_uint64_t)((tv.tv_sec * 1000) + (tv.tv_usec / 1000))
777 * (amps_uint64_t)1000000;
// If messages have been sequenced already, reconcile with what is stored:
// the last persisted value is one below the lowest stored sequence.
780 if (_lastSequence > 2)
782 amps_uint64_t low = _getLowestUnpersisted();
783 amps_uint64_t high = _getHighestUnpersisted();
786 lastPersisted = low - 1;
790 _lastSequence = high;
792 if (_lastSequence < lastPersisted)
794 lastPersisted = _lastSequence - 1;
799 _lastSequence = lastPersisted;
802 +
sizeof(amps_uint32_t)
803 +
sizeof(amps_uint32_t));
// Cache the result on the in-memory metadata block.
805 _metadataBlock->_sequence = lastPersisted;
806 return lastPersisted;
// Body of the recovery routine (signature not visible): rebuilds the
// in-memory free and used block lists from the contents of the buffer.
// Throws StoreException when the buffer cannot be resized to a whole number
// of blocks or the metadata format is not recognized.
811 BufferLock bufferGuard(_blockStore);
813 Buffer* pBuffer = _blockStore.getBuffer();
814 size_t size = pBuffer->
getSize();
// Path that initializes a fresh metadata block (its guarding condition is not
// visible): sequence 0, block size and client version written to the buffer.
819 _metadataBlock = _blockStore.get(1);
820 _metadataBlock->_sequence = (amps_uint64_t)0;
824 pBuffer->
putUint32((amps_uint32_t)blockSize);
828 pBuffer->
putUint64((amps_uint64_t)VersionInfo::parseVersion(AMPS_CLIENT_VERSION));
// Existing data: make the buffer a whole number of blocks.
833 size_t numBlocks = size / blockSize;
834 if (size % blockSize > 0)
839 numBlocks = size / blockSize;
841 amps_uint32_t blockCount = 0;
// resizeBuffer returns a newly allocated block array that is not needed
// here, so it is deleted immediately; only blockCount is used.
843 delete[] _blockStore.resizeBuffer(numBlocks * blockSize, &blockCount);
846 if (size > pBuffer->
getSize() || numBlocks != (size_t)blockCount)
848 throw StoreException(
"Publish Store could not resize correctly during recovery, possibly due to resizeHandler refusing the request.");
// NOTE(review): minIdx starts at 0 and the only visible update lowers it
// (`if (minIdx > seq)`), which cannot fire for an unsigned value starting at
// 0 -- so the later `minIdx > _maxDiscarded + 1` adjustment looks unreachable.
// Confirm against the full source.
853 amps_uint64_t maxIdx = 0;
854 amps_uint64_t minIdx = 0;
856 BlockHeader blockHeader;
// One Block descriptor per buffer block; ownership passes to the block store
// via addBlocks. Element 0 is the metadata block.
858 Block* blocks =
new Block[numBlocks];
859 blocks[numBlocks - 1]._nextInList = 0;
861 _blockStore.addBlocks(blocks);
862 _metadataBlock = blocks;
863 _metadataBlock->_nextInList = 0;
// Read the metadata block header. Its fields are reinterpreted here:
// _blocksToWrite carries the block size and _totalRemaining the block header
// size (see the assignments below).
865 pBuffer->
copyBytes((
char*)&blockHeader,
sizeof(BlockHeader));
// Format detection: these values select the old-format recovery path or
// rejection (the full conditions are only partly visible).
873 if (blockHeader._blocksToWrite == 1)
883 if (blockHeader._totalRemaining >= 5000000)
887 recoverOldFormat(blocks);
891 throw StoreException(
"Unrecognized format for Store. Can't recover.");
893 if (blockHeader._blocksToWrite == 0)
896 pBuffer->
putUint32((amps_uint32_t)blockSize);
900 blockSize = blockHeader._blocksToWrite;
901 _blockStore.setBlockSize(blockSize);
903 if (blockHeader._totalRemaining == 0)
910 _blockStore.setBlockHeaderSize(blockHeader._totalRemaining);
912 _metadataBlock->_sequence = blockHeader._seq;
// A small non-zero stored sequence (below 1,000,000) is reset to 0.
913 if (_metadataBlock->_sequence
914 && _metadataBlock->_sequence < (amps_uint64_t)1000000)
917 +
sizeof(amps_uint32_t)
918 +
sizeof(amps_uint32_t));
920 _metadataBlock->_sequence = 0;
925 _maxDiscarded = _metadataBlock->_sequence;
926 _lastSequence = _maxDiscarded;
// Scan every block after the metadata block.
930 location += blockSize;
931 amps_uint32_t freeCount = 0;
932 Block* firstFree = NULL;
933 Block* endOfFreeList = NULL;
// Keyed by sequence so the used list can be rebuilt in sequence order.
935 typedef std::map<amps_uint64_t, Block*> RecoverMap;
936 RecoverMap recoveredBlocks;
937 while (location < size)
941 pBuffer->
copyBytes((
char*)&blockHeader,
sizeof(BlockHeader));
// Blocks that look in-use but have no CRC (store() clears the CRC after
// writing a header, so presumably non-first chain blocks) or whose sequence
// was already recovered are skipped; they are picked up by following
// _nextInChain from the chain's first block.
942 if ((blockHeader._seq > 0 && blockHeader._totalRemaining < size) &&
943 (!blockHeader._crcVal || recoveredBlocks.count(blockHeader._seq)))
946 location += blockSize;
949 Block* block = blocks[++blockNum].setOffset(location);
950 bool recovered =
false;
951 if (blockHeader._seq > 0 && blockHeader._totalRemaining < size)
954 block->_sequence = blockHeader._seq;
956 if (maxIdx < blockHeader._seq)
958 maxIdx = blockHeader._seq;
960 if (minIdx > blockHeader._seq)
962 minIdx = blockHeader._seq;
965 recoveredBlocks[blockHeader._seq] = block;
// Follow the on-disk chain, wiring up the in-memory _nextInChain links.
967 while (blockHeader._nextInChain != (amps_uint64_t)0)
969 Block* chain = blocks[++blockNum]
970 .setOffset((
size_t)blockHeader._nextInChain);
971 chain->_nextInList = 0;
// Skip two 32-bit and two 64-bit header fields to land on _nextInChain.
972 pBuffer->
setPosition((
size_t)blockHeader._nextInChain
973 +
sizeof(amps_uint32_t)
974 +
sizeof(amps_uint32_t)
975 +
sizeof(amps_uint64_t)
976 +
sizeof(amps_uint64_t));
977 blockHeader._nextInChain = pBuffer->
getUint64();
978 block->_nextInChain = chain;
980 block->_sequence = blockHeader._seq;
// Blocks not recovered as message data are appended to the free list.
989 endOfFreeList->_nextInList = block;
995 endOfFreeList = block;
998 location += blockSize;
1002 endOfFreeList->_nextInList = 0;
1004 _blockStore.setFreeList(firstFree, freeCount);
// Bring the sequence bookkeeping in line with what was found on disk.
1005 if (maxIdx > _lastSequence)
1007 _lastSequence = maxIdx;
1009 if (minIdx > _maxDiscarded + 1)
1011 _maxDiscarded = minIdx - 1;
1013 if (_maxDiscarded > _metadataBlock->_sequence)
1015 _metadataBlock->_sequence = _maxDiscarded;
1016 pBuffer->
setPosition(_metadataBlock->_offset + 8);
// Rebuild the used list in ascending sequence order (std::map iteration).
1020 AMPS_FETCH_ADD(&_stored, (
long)(recoveredBlocks.size()));
1021 for (RecoverMap::iterator i = recoveredBlocks.begin();
1022 i != recoveredBlocks.end(); ++i)
1026 end->_nextInList = i->second;
1030 _blockStore.setUsedList(i->second);
1036 end->_nextInList = 0;
1038 _blockStore.setEndOfUsedList(end);
// Body of replayOnto (signature not visible; called as
// replayOnto(block, replayer_)): reads the message stored in the chain that
// starts at block_, rebuilds _message from it, and validates it against the
// stored CRC and sequence. Throws StoreException for inconsistent lengths or
// a CRC/sequence mismatch. The same read pattern repeats for each field:
// if it fits in the current block it is referenced in place through
// getBytes(); otherwise it is copied across chain blocks into a temporary
// buffer, which is freed at the end.
1046 size_t start = block_->_offset;
1047 size_t position = start;
1048 Buffer* pBuffer = _blockStore.getBuffer();
1050 BlockHeader blockHeader;
1051 pBuffer->
copyBytes((
char*)&blockHeader,
sizeof(BlockHeader));
// An entry with no payload bytes is special-cased (handling not visible).
1052 if (blockHeader._totalRemaining == 0)
1058 BlockChainHeader blockChainHeader;
1059 pBuffer->
copyBytes((
char*)&blockChainHeader,
sizeof(blockChainHeader));
1060 if (blockChainHeader._operation == Message::Command::Unknown)
1065 blockChainHeader._ackTypes |= Message::AckType::Persisted;
// Sanity check: the recorded field lengths cannot exceed the total size.
1070 if (blockHeader._totalRemaining
1071 < blockChainHeader._commandIdLen
1072 + blockChainHeader._correlationIdLen
1073 + blockChainHeader._expirationLen
1074 + blockChainHeader._sowKeyLen
1075 + blockChainHeader._topicLen)
1077 std::ostringstream os;
1078 os <<
"Corrupted message found with invalid lengths. " 1079 <<
"Attempting to replay " << block_->_sequence
1080 <<
". Block sequence " << blockHeader._seq
1081 <<
", topic length " << blockChainHeader._topicLen
1082 <<
", data length " << blockHeader._totalRemaining
1083 <<
", command ID length " << blockChainHeader._commandIdLen
1084 <<
", correlation ID length " << blockChainHeader._correlationIdLen
1085 <<
", expiration length " << blockChainHeader._expirationLen
1086 <<
", sow key length " << blockChainHeader._sowKeyLen
1087 <<
", start " << start
1088 <<
", position " << position
1089 <<
", buffer size " << pBuffer->
getSize();
1090 throw StoreException(os.str());
// Restore command type, ack types (always including Persisted) and sequence.
1095 _message.setCommandEnum((Message::Command::Type)blockChainHeader._operation);
1096 _message.setAckTypeEnum((
unsigned)blockChainHeader._ackTypes
1097 | Message::AckType::Persisted);
1098 _message.setSequence(blockHeader._seq);
1100 Block* current = block_;
// Recomputed in the same field order store() used.
1102 amps_uint64_t crcCalc = (amps_uint64_t)0ULL;
// Array of temporary buffers for fields that straddle blocks; sized
// _blocksToWrite - 1 and unused (null) for single-block messages.
1104 char** tmpBuffers = (blockHeader._blocksToWrite > 1) ?
new char* [blockHeader._blocksToWrite - 1] : 0;
1105 size_t blockNum = 0;
// --- command id ---
1106 if (blockChainHeader._commandIdLen > 0)
1108 if (blockChainHeader._commandIdLen <= blockBytesRemaining)
1110 _message.assignCommandId(pBuffer->
getBytes(blockChainHeader._commandIdLen)._data,
1111 blockChainHeader._commandIdLen);
1112 blockBytesRemaining -= blockChainHeader._commandIdLen;
1116 tmpBuffers[blockNum] =
new char[blockChainHeader._commandIdLen];
1117 size_t totalLeft = blockChainHeader._commandIdLen;
1118 size_t totalRead = 0;
1122 readLen = blockBytesRemaining < totalLeft ?
1123 blockBytesRemaining : totalLeft;
1124 pBuffer->
copyBytes(tmpBuffers[blockNum] + totalRead, readLen);
1125 if (!(totalLeft -= readLen))
1129 if (!(current = current->_nextInChain))
1133 totalRead += readLen;
1138 blockBytesRemaining -= readLen;
1139 _message.assignCommandId(tmpBuffers[blockNum++], blockChainHeader._commandIdLen);
// After each field, _totalRemaining is reduced so that what is left at the
// end is the payload length.
1141 blockHeader._totalRemaining -= blockChainHeader._commandIdLen;
1142 crcCalc = _crc(_message.getCommandId().data(),
1143 blockChainHeader._commandIdLen, crcCalc);
// --- correlation id ---
1145 if (blockChainHeader._correlationIdLen > 0)
1147 if (blockChainHeader._correlationIdLen <= blockBytesRemaining)
1149 _message.assignCorrelationId(
1150 pBuffer->
getBytes(blockChainHeader._correlationIdLen)._data,
1151 blockChainHeader._correlationIdLen);
1152 blockBytesRemaining -= blockChainHeader._correlationIdLen;
1156 tmpBuffers[blockNum] =
new char[blockChainHeader._correlationIdLen];
1157 size_t totalLeft = blockChainHeader._correlationIdLen;
1158 size_t totalRead = 0;
1162 readLen = blockBytesRemaining < totalLeft ?
1163 blockBytesRemaining : totalLeft;
1164 pBuffer->
copyBytes(tmpBuffers[blockNum] + totalRead, readLen);
1165 if (!(totalLeft -= readLen))
1169 if (!(current = current->_nextInChain))
1173 totalRead += readLen;
1178 blockBytesRemaining -= readLen;
1179 _message.assignCorrelationId(tmpBuffers[blockNum++], blockChainHeader._correlationIdLen);
1181 blockHeader._totalRemaining -= blockChainHeader._correlationIdLen;
1182 crcCalc = _crc(_message.getCorrelationId().data(),
1183 blockChainHeader._correlationIdLen, crcCalc);
// --- expiration ---
1185 if (blockChainHeader._expirationLen > 0)
1187 if (blockChainHeader._expirationLen <= blockBytesRemaining)
1189 _message.assignExpiration(
1190 pBuffer->
getBytes(blockChainHeader._expirationLen)._data,
1191 blockChainHeader._expirationLen);
1192 blockBytesRemaining -= blockChainHeader._expirationLen;
1196 tmpBuffers[blockNum] =
new char[blockChainHeader._expirationLen];
1197 size_t totalLeft = blockChainHeader._expirationLen;
1198 size_t totalRead = 0;
1202 readLen = blockBytesRemaining < totalLeft ?
1203 blockBytesRemaining : totalLeft;
1204 pBuffer->
copyBytes(tmpBuffers[blockNum] + totalRead, readLen);
1205 if (!(totalLeft -= readLen))
1209 if (!(current = current->_nextInChain))
1213 totalRead += readLen;
1218 blockBytesRemaining -= readLen;
1219 _message.assignExpiration(tmpBuffers[blockNum++], blockChainHeader._expirationLen);
1221 blockHeader._totalRemaining -= blockChainHeader._expirationLen;
1222 crcCalc = _crc(_message.getExpiration().data(),
1223 blockChainHeader._expirationLen, crcCalc);
// --- sow key ---
1225 if (blockChainHeader._sowKeyLen > 0)
1227 if (blockChainHeader._sowKeyLen <= blockBytesRemaining)
1229 _message.assignSowKey(pBuffer->
getBytes(blockChainHeader._sowKeyLen)._data,
1230 blockChainHeader._sowKeyLen);
1231 blockBytesRemaining -= blockChainHeader._sowKeyLen;
1235 tmpBuffers[blockNum] =
new char[blockChainHeader._sowKeyLen];
1236 size_t totalLeft = blockChainHeader._sowKeyLen;
1237 size_t totalRead = 0;
1241 readLen = blockBytesRemaining < totalLeft ?
1242 blockBytesRemaining : totalLeft;
1243 pBuffer->
copyBytes(tmpBuffers[blockNum] + totalRead, readLen);
1244 if (!(totalLeft -= readLen))
1248 if (!(current = current->_nextInChain))
1252 totalRead += readLen;
1257 blockBytesRemaining -= readLen;
1258 _message.assignSowKey(tmpBuffers[blockNum++], blockChainHeader._sowKeyLen);
1260 blockHeader._totalRemaining -= blockChainHeader._sowKeyLen;
1261 crcCalc = _crc(_message.getSowKey().data(), blockChainHeader._sowKeyLen, crcCalc);
// --- topic ---
1263 if (blockChainHeader._topicLen > 0)
1265 if (blockChainHeader._topicLen <= blockBytesRemaining)
1267 _message.assignTopic(pBuffer->
getBytes(blockChainHeader._topicLen)._data,
1268 blockChainHeader._topicLen);
1269 blockBytesRemaining -= blockChainHeader._topicLen;
1273 tmpBuffers[blockNum] =
new char[blockChainHeader._topicLen];
1274 size_t totalLeft = blockChainHeader._topicLen;
1275 size_t totalRead = 0;
1279 readLen = blockBytesRemaining < totalLeft ?
1280 blockBytesRemaining : totalLeft;
1281 pBuffer->
copyBytes(tmpBuffers[blockNum] + totalRead, readLen);
1282 if (!(totalLeft -= readLen))
1286 if (!(current = current->_nextInChain))
1290 totalRead += readLen;
1295 blockBytesRemaining -= readLen;
1296 _message.assignTopic(tmpBuffers[blockNum++], blockChainHeader._topicLen);
1298 blockHeader._totalRemaining -= blockChainHeader._topicLen;
1299 crcCalc = _crc(_message.getTopic().data(), blockChainHeader._topicLen, crcCalc);
// --- payload: what remains of _totalRemaining. _flag selects the message
// field it is assigned to (-1 or SOW_DELETE_DATA mean the data field).
1301 if (blockHeader._totalRemaining > 0)
1303 if (blockHeader._totalRemaining <= blockBytesRemaining)
1305 if (blockChainHeader._flag == -1 || blockChainHeader._flag == SOW_DELETE_DATA)
1307 _message.assignData(
1308 pBuffer->
getBytes(blockHeader._totalRemaining)._data,
1309 blockHeader._totalRemaining);
1310 crcCalc = _crc(_message.getData().data(),
1311 blockHeader._totalRemaining, crcCalc);
1313 else if (blockChainHeader._flag == SOW_DELETE_FILTER)
1315 _message.assignFilter(
1316 pBuffer->
getBytes(blockHeader._totalRemaining)._data,
1317 blockHeader._totalRemaining);
1318 crcCalc = _crc(_message.getFilter().data(),
1319 blockHeader._totalRemaining, crcCalc);
1321 else if (blockChainHeader._flag == SOW_DELETE_KEYS)
1323 _message.assignSowKeys(
1324 pBuffer->
getBytes(blockHeader._totalRemaining)._data,
1325 blockHeader._totalRemaining);
1326 crcCalc = _crc(_message.getSowKeys().data(),
1327 blockHeader._totalRemaining, crcCalc);
1329 else if (blockChainHeader._flag == SOW_DELETE_BOOKMARK)
1331 _message.assignBookmark(
1332 pBuffer->
getBytes(blockHeader._totalRemaining)._data,
1333 blockHeader._totalRemaining);
1334 crcCalc = _crc(_message.getBookmark().data(),
1335 blockHeader._totalRemaining, crcCalc);
// Bookmark delete with cancel: the cancel option is not stored, so it is
// re-applied here.
1337 else if (blockChainHeader._flag == SOW_DELETE_BOOKMARK_CANCEL)
1339 _message.assignBookmark(
1340 pBuffer->
getBytes(blockHeader._totalRemaining)._data,
1341 blockHeader._totalRemaining);
1342 crcCalc = _crc(_message.getBookmark().data(),
1343 blockHeader._totalRemaining, crcCalc);
1344 _message.assignOptions(AMPS_OPTIONS_CANCEL, 6);
// Payload straddles blocks: gather it into a temporary buffer first.
1349 tmpBuffers[blockNum] =
new char[blockHeader._totalRemaining];
1350 size_t totalLeft = blockHeader._totalRemaining;
1351 size_t totalRead = 0;
1355 readLen = blockBytesRemaining < totalLeft ?
1356 blockBytesRemaining : totalLeft;
1357 pBuffer->
copyBytes(tmpBuffers[blockNum] + totalRead, readLen);
1358 if (!(totalLeft -= readLen))
1362 if (!(current = current->_nextInChain))
1366 totalRead += readLen;
1371 position += readLen;
1372 if (blockChainHeader._flag == -1 || blockChainHeader._flag == SOW_DELETE_DATA)
1374 _message.assignData(tmpBuffers[blockNum], blockHeader._totalRemaining);
1376 else if (blockChainHeader._flag == SOW_DELETE_FILTER)
1378 _message.assignFilter(tmpBuffers[blockNum], blockHeader._totalRemaining);
1380 else if (blockChainHeader._flag == SOW_DELETE_KEYS)
1382 _message.assignSowKeys(tmpBuffers[blockNum], blockHeader._totalRemaining);
1384 else if (blockChainHeader._flag == SOW_DELETE_BOOKMARK)
1386 _message.assignBookmark(tmpBuffers[blockNum], blockHeader._totalRemaining);
1388 else if (blockChainHeader._flag == SOW_DELETE_BOOKMARK_CANCEL)
1390 _message.assignBookmark(tmpBuffers[blockNum], blockHeader._totalRemaining);
1391 _message.assignOptions(AMPS_OPTIONS_CANCEL, 6);
1393 crcCalc = _crc(tmpBuffers[blockNum++], blockHeader._totalRemaining, crcCalc);
// Validation: the recomputed CRC must match the stored one, and the on-disk
// sequence must match the in-memory block's sequence.
1398 if (crcCalc != blockHeader._crcVal || blockHeader._seq != block_->_sequence)
1400 std::ostringstream os;
1401 os <<
"Corrupted message found by CRC or sequence " 1402 <<
"Attempting to replay " << block_->_sequence
1403 <<
". Block sequence " << blockHeader._seq
1404 <<
", expiration length " << blockChainHeader._expirationLen
1405 <<
", sowKey length " << blockChainHeader._sowKeyLen
1406 <<
", topic length " << blockChainHeader._topicLen
1407 <<
", data length " << blockHeader._totalRemaining
1408 <<
", command ID length " << blockChainHeader._commandIdLen
1409 <<
", correlation ID length " << blockChainHeader._correlationIdLen
1410 <<
", flag " << blockChainHeader._flag
1411 <<
", expected CRC " << blockHeader._crcVal
1412 <<
", actual CRC " << crcCalc
1413 <<
", start " << start
1414 <<
", position " << position
1415 <<
", buffer size " << pBuffer->
getSize();
1416 for (Block* block = block_; block; block = block->_nextInChain)
1418 os <<
"\n BLOCK " << block->_offset;
// Free the temporaries before throwing; _message may still point into them.
1422 for (amps_uint32_t i = 0; i < blockNum; ++i)
1424 delete[] tmpBuffers[i];
1426 delete[] tmpBuffers;
1428 throw StoreException(os.str());
// Normal path cleanup (the call that hands _message to the replayer is on
// lines not visible here; presumably it precedes this -- TODO confirm).
1435 for (amps_uint32_t i = 0; i < blockNum; ++i)
1437 delete[] tmpBuffers[i];
1439 delete[] tmpBuffers;
1445 void recoverOldFormat(Block* blocks)
1447 Buffer* pBuffer = _blockStore.getBuffer();
1448 amps_uint64_t maxIdx = 0;
1449 amps_uint64_t minIdx = 0;
1450 size_t size = pBuffer->
getSize();
1451 size_t location = 0;
1454 pBuffer->
putUint32((amps_uint32_t)_blockStore.getBlockHeaderSize());
1455 _metadataBlock->_sequence = pBuffer->
getUint64();
1456 if (_metadataBlock->_sequence < (amps_uint64_t)1000000)
1458 pBuffer->
setPosition(_metadataBlock->_offset + 8);
1460 _metadataBlock->_sequence = 0;
1465 _maxDiscarded = _metadataBlock->_sequence;
1466 _lastSequence = _maxDiscarded;
1469 pBuffer->
putUint64((amps_uint64_t)VersionInfo::parseVersion(AMPS_CLIENT_VERSION));
1474 amps_uint32_t freeCount = 0;
1475 Block* firstFree = NULL;
1476 Block* endOfFreeList = NULL;
1478 size_t numBlocks = size / blockSize;
1479 size_t blockNum = 0;
1481 typedef std::map<amps_uint64_t, Block*> RecoverMap;
1482 RecoverMap recoveredBlocks;
1483 RecoverMap growingBlocks;
1484 amps_uint32_t growthBlocksNeeded = 0;
1485 while (location < size)
1489 BlockHeader blockHeader;
1490 pBuffer->
copyBytes((
char*)&blockHeader,
sizeof(BlockHeader));
1491 size_t blockCount = (size_t)blockHeader._blocksToWrite;
1492 if (blockHeader._totalRemaining > 0 && blockHeader._seq > 0
1493 && blockHeader._totalRemaining < size
1494 && blockHeader._blocksToWrite < numBlocks
1495 && (blockHeader._blocksToWrite * blockSize)
1496 >= blockHeader._totalRemaining)
1498 size_t oldFormatSize = blockHeader._totalRemaining;
1501 blockHeader._totalRemaining -= 64;
1505 BlockChainHeader chainHeader;
1509 pBuffer->
setPosition(location + (
sizeof(amps_uint32_t) * 2)
1510 + (
sizeof(amps_uint64_t) * 2) );
1516 sizeof(amps_uint32_t) * 8);
1518 if ((chainHeader._commandIdLen + chainHeader._correlationIdLen
1519 + chainHeader._expirationLen + chainHeader._sowKeyLen
1520 + chainHeader._topicLen) > blockHeader._totalRemaining)
1527 amps_uint32_t blocksNeeded = (blockHeader._totalRemaining
1529 + (blockHeader._totalRemaining
1532 if (blocksNeeded == blockHeader._blocksToWrite)
1534 Block* first = blocks[++blockNum].setOffset(location);
1535 first->_nextInList = 0;
1536 first->_sequence = blockHeader._seq;
1537 if (blockHeader._blocksToWrite > 1)
1540 amps_uint64_t crcVal = blockHeader._crcVal;
1541 blockHeader._crcVal = 0;
1546 size_t currentBlockNum = blockNum
1547 + blockHeader._blocksToWrite
1551 if (currentBlockNum >= numBlocks)
1553 currentBlockNum = currentBlockNum - numBlocks + 1;
1555 if (currentBlockNum < blockNum)
1557 Block* last = blocks[currentBlockNum]
1559 if ((current = firstFree) == last)
1561 firstFree = firstFree->_nextInList;
1572 if (current->_nextInList == last)
1574 current->_nextInList = last->_nextInList;
1579 current = current->_nextInList;
1585 current = blocks[currentBlockNum]
1590 while (current != first)
1592 current->_nextInList = 0;
1593 current->_sequence = blockHeader._seq;
1595 if (--currentBlockNum < 1
1596 || currentBlockNum > numBlocks)
1598 currentBlockNum = numBlocks - 1;
1600 Block* previous = blocks[currentBlockNum]
1601 .init(currentBlockNum,
1603 previous->_nextInChain = current;
1609 size_t bytesToMove = --blockCount
1613 pBuffer->
copyBytes(current->_offset + bytesToMove,
1620 dataBytes -= bytesToMove;
1628 blockHeader._nextInChain = (current->_nextInChain
1629 ? current->_nextInChain->_offset
1630 : (amps_uint64_t)0);
1633 pBuffer->
putBytes((
const char*)&blockHeader,
1634 sizeof(BlockHeader));
1635 if (firstFree == previous)
1637 firstFree = firstFree->_nextInList;
1646 current = firstFree;
1649 if (current->_nextInList == previous)
1651 current->_nextInList = previous->_nextInList;
1655 current = current->_nextInList;
1660 blockNum += blockHeader._blocksToWrite - 1;
1661 blockHeader._crcVal = crcVal;
1672 blockHeader._nextInChain = (first->_nextInChain
1673 ? first->_nextInChain->_offset
1674 : (amps_uint64_t)0);
1675 pBuffer->
putBytes((
const char*)&blockHeader,
1676 sizeof(BlockHeader));
1677 pBuffer->
putBytes((
const char*)&chainHeader,
1678 sizeof(BlockChainHeader));
1681 recoveredBlocks[blockHeader._seq] = first;
1687 growingBlocks[blockHeader._seq] = blocks[++blockNum].setOffset(location);
1688 growthBlocksNeeded += (blocksNeeded - blockHeader._blocksToWrite);
1689 blockNum += blockHeader._blocksToWrite - 1;
1692 if (maxIdx < blockHeader._seq)
1694 maxIdx = blockHeader._seq;
1696 if (minIdx > blockHeader._seq)
1698 minIdx = blockHeader._seq;
1701 location += blockHeader._blocksToWrite *
getBlockSize();
1703 assert(location >= size || blockNum < numBlocks);
1708 Block* block = blocks[++blockNum].setOffset(location);
1711 endOfFreeList->_nextInList = block;
1717 endOfFreeList = block;
1719 location += blockSize;
1722 for (RecoverMap::iterator i = growingBlocks.begin();
1723 i != growingBlocks.end(); ++i)
1725 Block* first = i->second;
1727 BlockHeader blockHeader;
1730 pBuffer->
copyBytes((
char*)&blockHeader, 24);
1733 blockHeader._totalRemaining -= 64;
1736 if (freeCount < growthBlocksNeeded)
1739 amps_uint32_t minBlocksRequired = growthBlocksNeeded - freeCount;
1740 amps_uint32_t growthBlocks = _blockStore.getDefaultResizeBlocks();
1741 if (growthBlocks < minBlocksRequired)
1743 amps_uint32_t defaultBlocks = _blockStore.getDefaultResizeBlocks();
1744 if (minBlocksRequired % defaultBlocks)
1745 minBlocksRequired = (minBlocksRequired / defaultBlocks + 1)
1747 growthBlocks = minBlocksRequired;
1749 amps_uint32_t newBlocks = 0;
1750 Block* addedBlocks = _blockStore.resizeBuffer(
1752 + growthBlocks * blockSize,
1756 throw StoreException(
"Failed to grow store buffer during recovery");
1758 _blockStore.addBlocks(addedBlocks);
1759 freeCount += newBlocks;
1760 growthBlocksNeeded = (growthBlocksNeeded > freeCount)
1761 ? growthBlocksNeeded - freeCount : 0;
1764 endOfFreeList->_nextInList = addedBlocks;
1768 firstFree = addedBlocks;
1770 endOfFreeList = &(addedBlocks[newBlocks - 1]);
1771 endOfFreeList->_nextInList = 0;
1773 expandBlocks(blocks, first->_offset, first, blockHeader,
1774 &firstFree, &freeCount, pBuffer);
1776 recoveredBlocks[blockHeader._seq] = first;
1784 endOfFreeList->_nextInList = 0;
1786 _blockStore.setFreeList(firstFree, freeCount);
1787 if (maxIdx > _lastSequence)
1789 _lastSequence = maxIdx;
1791 if (minIdx > _maxDiscarded + 1)
1793 _maxDiscarded = minIdx - 1;
1795 if (_maxDiscarded > _metadataBlock->_sequence)
1797 _metadataBlock->_sequence = _maxDiscarded;
1798 pBuffer->
setPosition(_metadataBlock->_offset + 8);
1802 AMPS_FETCH_ADD(&_stored, (
long)(recoveredBlocks.size()));
1803 for (RecoverMap::iterator i = recoveredBlocks.begin();
1804 i != recoveredBlocks.end(); ++i)
1806 if (_blockStore.front())
1808 end->_nextInList = i->second;
1812 _blockStore.setUsedList(i->second);
1818 end->_nextInList = 0;
1820 _blockStore.setEndOfUsedList(end);
// Expands the block chain of one stored message so that it spans
// `blocksNeeded` blocks, taking additional blocks from the free list, and
// writes the updated block headers and the chain header back to the buffer.
//
// NOTE(review): this listing omits some source lines (the inner numbering has
// gaps; e.g. the declarations of pBuffer_, blockSize and dataBytes are not
// shown), so the comments below describe only the statements visible here.
//
// blocks_      Array of Block metadata; blocks_[1] is re-initialised and used
//              as the next chain block when `current` has no list successor.
// location_    Buffer offset of the chain being expanded (used for the data
//              arithmetic and for positioning the final header read/write).
// first_       First block of the chain being expanded.
// blockHeader_ Header for the chain, passed by value; it is adjusted locally
//              and written to the buffer for each block.
// pFreeList_   In/out: head of the free list; advanced as blocks are taken.
// pFreeCount_  In/out free-block count -- its update is not visible in this
//              listing; TODO confirm against the full source.
1825 void expandBlocks(Block* blocks_,
size_t location_, Block* first_,
1826 BlockHeader blockHeader_,
1827 Block** pFreeList_, amps_uint32_t* pFreeCount_,
1831 Block* current = first_;
// Keep the original total, then reduce the working total by 64.
// NOTE(review): 64 equals DEFAULT_BLOCK_CHAIN_HEADER_SIZE -- presumably that is
// the intent; confirm.
1834 amps_uint32_t oldTotalRemaining = blockHeader_._totalRemaining;
1835 blockHeader_._totalRemaining -= 64;
1839 amps_uint32_t blocksNeeded = blockHeader_._totalRemaining
1841 + (blockHeader_._totalRemaining
// Bytes used in the last originally written block, derived from the old total.
1848 size_t endBlockSize = oldTotalRemaining % blockSize;
1851 endBlockSize = blockSize;
1853 size_t endOfData = 0;
// Save the CRC and zero it in the working header; the saved value is restored
// after the backward walk, before the final header write.
1855 amps_uint64_t crcVal = blockHeader_._crcVal;
1856 blockHeader_._crcVal = 0;
// Records each visited block so the later loop can step back via top().
1858 std::stack<Block*> blocksUsed;
// Forward pass: build the chain links for blocks 1 .. blocksNeeded - 1.
1859 for (amps_uint32_t i = 1; i < blocksNeeded; ++i)
1861 blocksUsed.push(current);
1862 current->_sequence = blockHeader_._seq;
// At or past the original block count: the next chain block comes from the
// head of the free list. When i equals the original count, the end of the
// existing data is recorded first.
1863 if (i >= blockHeader_._blocksToWrite)
1865 if (i == blockHeader_._blocksToWrite)
1867 endOfData = current->_offset + endBlockSize;
1869 current->_nextInChain = *pFreeList_;
1871 *pFreeList_ = (*pFreeList_)->_nextInList;
// Presumably the else branch (still within the original blocks): the chain
// follows the existing list order, and the successor's offset is set.
1875 current->_nextInChain = current->_nextInList;
1876 if (current->_nextInChain)
1878 if (current->_offset + blockSize < pBuffer_->getSize())
1880 current->_nextInChain->setOffset(current->_offset
1885 current->_nextInChain->setOffset(blockSize);
1890 current->_nextInChain = blocks_[1].init(1, blockSize);
// Unlink the chosen successor from the free list: either it is the head, or
// its predecessor is located by scanning the list.
1892 if (current->_nextInChain == *pFreeList_)
1894 *pFreeList_ = (*pFreeList_)->_nextInList;
1899 for (Block* free = *pFreeList_; free;
1900 free = free->_nextInList)
1902 if (free->_nextInList == current->_nextInChain)
1904 free->_nextInList = free->_nextInList->_nextInList;
1911 current->_nextInList = 0;
1912 current = current->_nextInChain;
// Record the new block count and finish the last block of the chain.
1915 blockHeader_._blocksToWrite = blocksNeeded;
1917 current->_nextInList = 0;
1918 current->_sequence = blockHeader_._seq;
// Backward pass from the last block to first_: the visible statements work out
// how many bytes lie between location_ and endOfData, adjust endOfData
// (relative to the buffer size when the chunk is short), and write a header
// per block. NOTE(review): the actual data copy is on lines not shown here.
1928 while (current != first_)
1930 size_t chunkBytesAvail = endOfData > location_
1931 ? endOfData - location_
1933 if (chunkBytesAvail < dataBytes)
1945 chunkBytesAvail = dataBytes - chunkBytesAvail;
1946 endOfData = pBuffer_->
getSize() - chunkBytesAvail;
1953 endOfData -= dataBytes;
// Header for this block points at its successor's offset, or 0 if none.
1959 blockHeader_._nextInChain = (current->_nextInChain
1960 ? current->_nextInChain->_offset
1961 : (amps_uint64_t)0);
1964 pBuffer_->
putBytes((
const char*)&blockHeader_,
sizeof(BlockHeader));
1965 current = blocksUsed.top();
// Restore the saved CRC and point the header at first_'s successor.
1976 blockHeader_._crcVal = crcVal;
1977 blockHeader_._nextInChain = first_->_nextInChain->_offset;
// Skip two uint32 and two uint64 fields from location_, then read eight
// uint32 values into a fresh chain header.
1981 pBuffer_->
setPosition(location_ + (
sizeof(amps_uint32_t) * 2)
1982 + (
sizeof(amps_uint64_t) * 2) );
1987 BlockChainHeader chainHeader;
1988 pBuffer_->
copyBytes((
char*)&chainHeader,
1989 sizeof(amps_uint32_t) * 8);
// Write the block header followed by the full-size chain header.
1992 pBuffer_->
putBytes((
const char*)&blockHeader_,
sizeof(BlockHeader));
1993 pBuffer_->
putBytes((
const char*)&chainHeader,
sizeof(BlockChainHeader));
// Selects the CRC routine stored in _crc.
// NOTE(review): several lines of this function are missing from the listing,
// so how isFile influences the choice is not visible here -- confirm against
// the full source.
1996 void chooseCRC(
bool isFile)
// One path assigns the non-SSE implementation directly.
2005 _crc = AMPS::CRC<0>::crcNoSSE;
// Another path decides at run time: CRC<0>::crc when isSSE42Enabled() reports
// true; the crcNoSSE assignment that follows is presumably the else branch.
2007 if (AMPS::CRC<0>::isSSE42Enabled())
2009 _crc = AMPS::CRC<0>::crc;
2013 _crc = AMPS::CRC<0>::crcNoSSE;
// Static function with the same parameter and return types as CRCFunction.
// All three parameters are unnamed, so the body cannot read them.
// NOTE(review): the body is not shown in this listing -- presumably it returns
// a constant so that no checksum work is done; confirm.
2018 static amps_uint64_t noOpCRC(
const char*,
size_t, amps_uint64_t)
// Block obtained with _blockStore.get(1) in the constructor; recovery copies
// _maxDiscarded into its _sequence field when that value is larger.
2028 Block* _metadataBlock;
// Presumably the highest sequence number already discarded; recovery raises it
// to minIdx - 1 when the lowest recovered sequence exceeds _maxDiscarded + 1.
2030 amps_uint64_t _maxDiscarded;
// Recovery raises this to the largest recovered sequence number. Declared as
// std::atomic when the #if condition below holds, otherwise as volatile.
2032 #if __cplusplus >= 201103L || _MSC_VER >= 1900 2033 std::atomic<amps_uint64_t> _lastSequence;
2035 volatile amps_uint64_t _lastSequence;
// Recovery adds recoveredBlocks.size() to this via AMPS_FETCH_ADD; presumably
// the number of messages currently held in the store.
2038 AMPS_ATOMIC_TYPE _stored;
// Function-pointer type for CRC routines. The parameters are unnamed here;
// presumably data pointer, byte count, and a starting CRC value -- confirm.
2043 typedef amps_uint64_t (*CRCFunction)(
const char*, size_t, amps_uint64_t);
virtual void putUint64(amps_uint64_t ui_)=0
Put an amps_uint64_t value into the buffer at the current position and advance past it...
Abstract base class for storing published messages for an HA publisher client.
Definition: ampsplusplus.hpp:1059
void getRawCorrelationId(const char **dataptr, size_t *sizeptr) const
Retrieves the value of the CorrelationId header of self as a Field that references the underlying buf...
Definition: Message.hpp:1304
Command::Type getCommandEnum() const
Decode self's "command" field and return one of the values from Command.
Definition: Message.hpp:1228
Constants
Default constant values for BlockPublishStore.
Definition: BlockPublishStore.hpp:82
void getRawCommandId(const char **dataptr, size_t *sizeptr) const
Retrieves the value of the CommandId header of self as a Field that references the underlying buffer ...
Definition: Message.hpp:1302
Abstract base class for replaying a publish message.
Definition: ampsplusplus.hpp:1031
virtual void flush(long timeout_)
Method to wait for the Store to discard everything that has been stored up to the point in time when ...
Definition: BlockPublishStore.hpp:659
void replay(StoreReplayer &replayer_)
Replay all messages in the Store onto the given StoreReplayer.
Definition: BlockPublishStore.hpp:564
void getRawFilter(const char **dataptr, size_t *sizeptr) const
Retrieves the value of the Filter header of self as a Field that references the underlying buffer man...
Definition: Message.hpp:1306
AMPSDLL amps_uint64_t amps_message_get_field_uint64(amps_handle message, FieldId field)
Gets the unsigned 64-bit int value of a header field in an AMPS message.
Message encapsulates a single message sent to or received from an AMPS server, and provides methods f...
Definition: Message.hpp:531
virtual void setPosition(size_t position_)=0
Set the buffer position to a location.
Field getOptions() const
Retrieves the value of the Options header of the Message as a new Field.
Definition: Message.hpp:1316
static amps_uint32_t getBlockHeaderSize()
Block header is number of blocks, total length, sequence number, crc, next in chain offset...
Definition: BlockPublishStore.hpp:156
const char * data() const
Returns the (non-null-terminated) data underlying this field.
Definition: Field.hpp:260
amps_uint32_t getBlockDataSize()
Return the size left in a block for data when it has a header in it.
Definition: BlockPublishStore.hpp:182
virtual size_t getSize() const =0
Get the current size of the Buffer in bytes.
virtual void putBytes(const char *data_, size_t dataLength_)=0
Put the given length of bytes in data into the buffer at the current position and advance past them...
virtual void execute(Message &message_)=0
Called by implementations of Store to replay a message from the store.
void getRawBookmark(const char **dataptr, size_t *sizeptr) const
Retrieves the value of the Bookmark header of self as a Field that references the underlying buffer m...
Definition: Message.hpp:1194
bool replaySingle(StoreReplayer &replayer_, amps_uint64_t index_)
Replay one message in the Store onto the given StoreReplayer.
Definition: BlockPublishStore.hpp:596
amps_uint64_t store(const Message &message_, bool assignSequence_)
Store a given message that will be delivered to AMPS.
Definition: BlockPublishStore.hpp:269
Used as a base class for other stores in the AMPS C++ client, this is an implementation that breaks a...
Definition: BlockStore.hpp:61
amps_uint64_t getLastPersisted(void)
Get the last persisted sequence number.
Definition: BlockPublishStore.hpp:714
Core type, function, and class declarations for the AMPS C++ client.
size_t unpersistedCount() const
Method to return the count of messages that are currently in the Store because they have not been discard...
Definition: BlockPublishStore.hpp:645
Provides AMPS::Buffer, an abstract base class used by the store implementations in the AMPS client...
size_t len() const
Returns the length of the data underlying this field.
Definition: Field.hpp:267
virtual void setResizeHandler(PublishStoreResizeHandler handler_, void *userData_)
Set a handler to be called if the Store needs to resize in order to keep storing messages.
Definition: ampsplusplus.hpp:1153
virtual void putUint32(amps_uint32_t i_)=0
Put an unsigned 32-bit int value into the buffer at the current position and advance past the end of ...
void getRawSowKeys(const char **dataptr, size_t *sizeptr) const
Retrieves the value of the SowKeys header of self as a Field that references the underlying buffer ma...
Definition: Message.hpp:1425
Used as a base class for other stores in the AMPS C++ client, this is an implementation of StoreImpl ...
Definition: BlockPublishStore.hpp:63
Abstract base class for implementing a buffer to be used by a StoreImpl for storage of publish messag...
Definition: Buffer.hpp:40
virtual ~BlockPublishStore()
Destructor that cleans up the buffer and other associated memory.
Definition: BlockPublishStore.hpp:244
void getRawExpiration(const char **dataptr, size_t *sizeptr) const
Retrieves the value of the Expiration header of self as a Field that references the underlying buffer...
Definition: Message.hpp:1305
static amps_uint32_t getBlockChainHeaderSize()
Block chain header is operation, command id length, correlation id length, expiration length...
Definition: BlockPublishStore.hpp:166
virtual amps_uint64_t getUint64()=0
Get an unsigned 64-bit int value at the current buffer position and advance past it.
void getRawTopic(const char **dataptr, size_t *sizeptr) const
Retrieves the value of the Topic header of self as a Field that references the underlying buffer mana...
Definition: Message.hpp:1449
Used as metadata for each block in a Buffer.
Definition: BlockStore.hpp:75
Field represents the value of a single field in a Message.
Definition: Field.hpp:86
virtual amps_uint32_t getUint32()=0
Get the unsigned 32-bit int value at the current buffer position and advance past it...
virtual amps_uint64_t store(const Message &message_)
Store a given message that will be delivered to AMPS.
Definition: BlockPublishStore.hpp:254
BlockPublishStore(Buffer *buffer_, amps_uint32_t blocksPerRealloc_=1000, bool isFile_=false, bool errorOnPublishGap_=false, amps_uint32_t blockSize_=DEFAULT_BLOCK_SIZE)
Create a BlockPublishStore using buffer_, that grows by blocksPerRealloc_ blocks when it must grow...
Definition: BlockPublishStore.hpp:202
amps_uint32_t getBlockSize()
Return the full size of a block, as reported by the underlying BlockStore.
Definition: BlockPublishStore.hpp:174
void getRawSowKey(const char **dataptr, size_t *sizeptr) const
Retrieves the value of the SowKey header of self as a Field that references the underlying buffer man...
Definition: Message.hpp:1424
unsigned getAckTypeEnum() const
Decode self's "ack type" field and return the corresponding bitmask of values from AckType...
Definition: Message.hpp:1160
amps_uint64_t getLowestUnpersisted() const
Get the oldest unpersisted message sequence in the store.
Definition: BlockPublishStore.hpp:702
Definition: ampsplusplus.hpp:106
static amps_uint64_t getUnsetSequence()
Method to return the value used to represent no such sequence.
Definition: ampsplusplus.hpp:1129
virtual void copyBytes(char *buffer_, size_t numBytes_)=0
Copy the given number of bytes from this buffer to the given buffer.
virtual void discardUpTo(amps_uint64_t index_)
Remove all messages with an index up to and including index_.
Definition: BlockPublishStore.hpp:501
Provides AMPS::BlockStore, a class for storing Blocks of a fixed size into a Buffer implementation...
virtual ByteArray getBytes(size_t numBytes_)=0
Get the given number of bytes from the buffer.