26 #ifndef _BLOCKPUBLISHSTORE_H_ 27 #define _BLOCKPUBLISHSTORE_H_ 35 #include <amps/ampscrc.hpp> 36 #if __cplusplus >= 201103L || _MSC_VER >= 1900 42 #include <sys/timeb.h> 64 typedef Lock<BlockStore> BufferLock;
65 typedef Unlock<BlockStore> BufferUnlock;
// Values recorded in BlockChainHeader::_flag for stored SOW delete commands.
// store() picks one of these when the operation is SOWDelete, and the replay
// code uses it to decide which message field the saved payload is restored to.
// Payload is restored with assignData().
69 SOW_DELETE_DATA = 0x01,
// Payload is restored with assignFilter().
70 SOW_DELETE_FILTER = 0x02,
// Payload is restored with assignSowKeys().
71 SOW_DELETE_KEYS = 0x04,
// Payload is restored with assignBookmark().
72 SOW_DELETE_BOOKMARK = 0x08,
// Payload is restored with assignBookmark() and the options are set to
// AMPS_OPTIONS_CANCEL.
73 SOW_DELETE_BOOKMARK_CANCEL = 0x10,
// Not referenced anywhere in the visible portion of this file.
74 SOW_DELETE_UNKNOWN = 0x80
// Default sizing constants for the store.
// Header size handed to the block store by the constructor; the constructor
// also forces the block size to be at least twice this value.
81 DEFAULT_BLOCK_HEADER_SIZE = 32,
// Size reported for the per-message chain header.
82 DEFAULT_BLOCK_CHAIN_HEADER_SIZE = 64,
// Same value as the literal default of the constructor's blocksPerRealloc_.
83 DEFAULT_BLOCKS_PER_REALLOC = 1000,
// Default for the constructor's blockSize_ argument.
84 DEFAULT_BLOCK_SIZE = 2048
// Members of the per-block header. The struct's opening line and the
// sequence member (_seq, used throughout this file) are not visible in this
// excerpt.
// For a message block: how many blocks the message occupies; store() requests
// this many blocks. In the metadata block, recover() reads this slot as the
// block size.
124 amps_uint32_t _blocksToWrite;
// For a message block: byte count built by store() from the field lengths and
// the data length. In the metadata block, recover() reads this slot as the
// block header size.
125 amps_uint32_t _totalRemaining;
// CRC accumulated by store() over command id, correlation id, expiration,
// sow key, topic and data, in that order; replay recomputes and compares it.
127 amps_uint64_t _crcVal;
// Buffer offset of the next block of the same message, or 0 for the last one.
128 amps_uint64_t _nextInChain;
// Per-message header that store() writes after the BlockHeader and that the
// replay code reads back to learn the command type and the length of each
// stored field. Some members (for example _flag) are declared on lines that
// are not visible in this excerpt.
131 struct BlockChainHeader
// Message::Command value of the stored message.
133 amps_uint32_t _operation;
// Byte lengths of the individual fields, stored in this order in the chain.
134 amps_uint32_t _commandIdLen;
135 amps_uint32_t _correlationIdLen;
136 amps_uint32_t _expirationLen;
137 amps_uint32_t _sowKeyLen;
138 amps_uint32_t _topicLen;
// Ack types of the stored message; replay ORs in Message::AckType::Persisted.
140 amps_uint32_t _ackTypes;
// NOTE(review): presumably reserved padding so the struct occupies
// DEFAULT_BLOCK_CHAIN_HEADER_SIZE bytes - confirm against the full declaration.
141 amps_uint32_t _unused[8];
// Default state: no operation, all lengths zero, and _flag = -1, the value
// store() also uses for messages that are not SOW deletes.
143 : _operation(0), _commandIdLen(0), _correlationIdLen(0)
144 , _expirationLen(0), _sowKeyLen(0), _topicLen(0), _flag(-1)
// Reports the fixed per-block header size (DEFAULT_BLOCK_HEADER_SIZE).
// NOTE(review): the enclosing function's signature is not visible here.
155 return DEFAULT_BLOCK_HEADER_SIZE;
// Reports the fixed per-message chain header size
// (DEFAULT_BLOCK_CHAIN_HEADER_SIZE).
// NOTE(review): the enclosing function's signature is not visible here.
165 return DEFAULT_BLOCK_CHAIN_HEADER_SIZE;
// Reports the block size currently configured on the underlying block store.
// NOTE(review): the enclosing function's signature is not visible here.
173 return _blockStore.getBlockSize();
// Constructor (its first line, including the buffer_ parameter, is not
// visible in this excerpt).
//   blocksPerRealloc_   passed through to the block store; defaults to 1000.
//   isFile_             defaults to false.
//   errorOnPublishGap_  defaults to false.
//   blockSize_          requested block size; defaults to DEFAULT_BLOCK_SIZE.
200 amps_uint32_t blocksPerRealloc_ = 1000,
201 bool isFile_ =
false,
202 bool errorOnPublishGap_ =
false,
203 amps_uint32_t blockSize_ = DEFAULT_BLOCK_SIZE)
// The block store is created with the default header size. A requested block
// size that is not larger than two headers is replaced by exactly two headers.
205 , _blockStore(buffer_, blocksPerRealloc_,
206 DEFAULT_BLOCK_HEADER_SIZE,
207 (blockSize_ > DEFAULT_BLOCK_HEADER_SIZE * 2
208 ? blockSize_ : DEFAULT_BLOCK_HEADER_SIZE * 2))
// Nothing discarded yet; the sequence counter starts at 1.
210 , _maxDiscarded((amps_uint64_t)0), _lastSequence((amps_uint64_t)1)
// The statements below run with the block store locked. Lines between them
// are missing from this excerpt, so they may sit inside a condition.
218 BufferLock bufferGuard(_blockStore);
// One block is taken from the store to serve as the metadata block.
220 _metadataBlock = _blockStore.get(1);
// The used list is reset to empty.
222 _blockStore.setUsedList(0);
223 _blockStore.setEndOfUsedList(0);
226 _metadataBlock->_sequence = (amps_uint64_t)0;
227 Buffer* pBuffer = _blockStore.getBuffer();
// The parsed client version is written into the buffer as a 64-bit value.
233 pBuffer->
putUint64((amps_uint64_t)VersionInfo::parseVersion(AMPS_CLIENT_VERSION));
// Forwards to the two-argument store() with the second argument set to true
// and returns its result. The signature line is not visible in this excerpt.
253 return store(message_,
true);
// Body of the routine that writes one message into a chain of blocks and
// returns the sequence number recorded for it. The signature line and many
// interior lines (braces, field fetches, position changes) are missing from
// this excerpt; presumably this is the two-argument store() overload.
268 const char* commandId, *correlationId, *expiration, *sowKey,
271 BlockHeader blockHeader;
272 BlockChainHeader chainHeader;
// Each length is copied from dataLen; the calls that fetch the corresponding
// field into dataLen are on lines not shown here.
274 chainHeader._commandIdLen = (amps_uint32_t)dataLen;
276 chainHeader._correlationIdLen = (amps_uint32_t)dataLen;
278 chainHeader._expirationLen = (amps_uint32_t)dataLen;
280 chainHeader._sowKeyLen = (amps_uint32_t)dataLen;
282 chainHeader._topicLen = (amps_uint32_t)dataLen;
283 message_.getRawData(&data, &dataLen);
// -1 marks "not a SOW delete"; replay treats it like SOW_DELETE_DATA.
284 chainHeader._flag = -1;
286 chainHeader._operation = (amps_uint32_t)operation;
// For SOW deletes the flag records which message field supplies the payload.
// The conditions selecting each flag are on lines not shown here.
287 if (operation == Message::Command::SOWDelete)
291 chainHeader._flag = SOW_DELETE_DATA;
298 chainHeader._flag = SOW_DELETE_FILTER;
305 chainHeader._flag = SOW_DELETE_KEYS;
310 chainHeader._flag = SOW_DELETE_BOOKMARK;
// Scan of the options text (searched with memchr, so not assumed to be
// null-terminated) to upgrade the flag to SOW_DELETE_BOOKMARK_CANCEL.
// NOTE(review): in the visible lines `remaining` becomes the distance from
// `start` to the first 'c', the comparison is made at `start` rather than at
// `next`, and `start` is never advanced. If the hidden lines do not change
// that, an options string whose first 'c' begins the keyword yields
// remaining == 0 and the check is skipped - confirm against the full source
// that the cancel flag can actually be set.
313 size_t remaining = options.
len();
314 const void* next = NULL;
315 const void* start = (
const void*)(options.
data());
317 while (remaining >= 6 &&
318 (next = memchr(start, (
int)
'c', remaining)) != NULL)
320 remaining = (size_t)next - (
size_t)start;
321 if (remaining >= 6 && strncmp((
const char*)start,
324 chainHeader._flag = SOW_DELETE_BOOKMARK_CANCEL;
// Total byte count for the message: the five field lengths plus the data
// length (the leading terms of the expression are on hidden lines).
// Command::Unknown takes the other branch of the conditional.
332 blockHeader._totalRemaining = (
333 (chainHeader._operation == Message::Command::Unknown)
336 + chainHeader._commandIdLen
337 + chainHeader._correlationIdLen
338 + chainHeader._expirationLen
339 + chainHeader._sowKeyLen
340 + chainHeader._topicLen
341 + (amps_uint32_t)dataLen));
// Number of blocks needed: a quotient (divisor not shown) plus one extra
// block when there is a partial last block.
342 size_t lastBlockLength = ((operation == Message::Command::Unknown) ? 0 :
344 blockHeader._blocksToWrite = ((operation == Message::Command::Unknown)
346 : ((amps_uint32_t)(blockHeader._totalRemaining
348 + ((lastBlockLength > 0) ? 1 : 0)));
// The CRC starts at 0 and is chained over every field in storage order;
// replay recomputes it in the same order.
349 blockHeader._crcVal = (amps_uint64_t)0ULL;
350 blockHeader._crcVal = _crc(commandId,
351 chainHeader._commandIdLen,
352 blockHeader._crcVal);
353 blockHeader._crcVal = _crc(correlationId,
354 chainHeader._correlationIdLen,
355 blockHeader._crcVal);
356 blockHeader._crcVal = _crc(expiration,
357 chainHeader._expirationLen,
358 blockHeader._crcVal);
359 blockHeader._crcVal = _crc(sowKey,
360 chainHeader._sowKeyLen,
361 blockHeader._crcVal);
362 blockHeader._crcVal = _crc(topic,
363 chainHeader._topicLen,
364 blockHeader._crcVal);
365 blockHeader._crcVal = _crc(data, dataLen, blockHeader._crcVal);
// From here on the block store is locked; the chain of blocks is obtained.
368 BufferLock bufferGuard(_blockStore);
369 Block* first = _blockStore.get(blockHeader._blocksToWrite);
// Sequence selection. The visible lines show one path that increments
// _lastSequence to obtain the sequence, and another in which the sequence is
// already set and _maxDiscarded / _lastSequence are adjusted to match it.
// The conditions separating the paths are partly hidden.
372 if (_lastSequence <= 2)
376 blockHeader._seq = ++_lastSequence;
381 message_.getMessage(),
385 _maxDiscarded = blockHeader._seq - 1;
387 if (blockHeader._seq >= _lastSequence)
389 _lastSequence = blockHeader._seq;
// Progress counters: bytes of each field already written to the chain.
395 size_t topicWritten = 0UL;
396 size_t dataWritten = 0UL;
397 size_t commandWritten = 0UL;
398 size_t correlationWritten = 0UL;
399 size_t expirationWritten = 0UL;
400 size_t sowKeyWritten = 0UL;
401 Buffer* pBuffer = _blockStore.getBuffer();
// Walk the chain, writing a BlockHeader into every block.
402 for (Block* next = first; next; next = next->_nextInChain)
404 next->_sequence = blockHeader._seq;
// Link to the following block by buffer offset, or 0 on the last block.
405 if (next->_nextInChain)
407 blockHeader._nextInChain = next->_nextInChain->_offset;
411 blockHeader._nextInChain = (amps_uint64_t)0;
415 pBuffer->
putBytes((
const char*)&blockHeader,
sizeof(BlockHeader));
// The CRC is cleared after a header write, so later headers carry 0;
// recover() relies on a zero CRC to skip blocks that are not chain heads.
417 blockHeader._crcVal = (amps_uint64_t)0;
// The chain header is written as well (the condition guarding this write is
// on a hidden line).
423 pBuffer->
putBytes((
const char*)&chainHeader,
424 sizeof(BlockChainHeader));
// Each field is copied in turn, at most bytesRemaining bytes per block; the
// *Written counters let a field continue in the next block.
// NOTE(review): the command id and correlation id checks do not test
// bytesRemaining > 0 while the later fields do. In the visible code a zero
// bytesRemaining only produces a zero-length write there.
433 if (commandWritten < chainHeader._commandIdLen)
435 size_t commandWrite = (chainHeader._commandIdLen - commandWritten < bytesRemaining) ? chainHeader._commandIdLen - commandWritten : bytesRemaining;
436 pBuffer->
putBytes(commandId + commandWritten,
438 bytesRemaining -= commandWrite;
439 commandWritten += commandWrite;
441 if (correlationWritten < chainHeader._correlationIdLen)
443 size_t correlationWrite = (chainHeader._correlationIdLen - correlationWritten < bytesRemaining) ? chainHeader._correlationIdLen - correlationWritten : bytesRemaining;
444 pBuffer->
putBytes(correlationId + correlationWritten,
446 bytesRemaining -= correlationWrite;
447 correlationWritten += correlationWrite;
449 if (bytesRemaining > 0 && expirationWritten < chainHeader._expirationLen)
451 size_t expWrite = (chainHeader._expirationLen - expirationWritten < bytesRemaining) ? chainHeader._expirationLen - expirationWritten : bytesRemaining;
452 pBuffer->
putBytes(expiration + expirationWritten, expWrite);
453 bytesRemaining -= expWrite;
454 expirationWritten += expWrite;
456 if (bytesRemaining > 0 && sowKeyWritten < chainHeader._sowKeyLen)
458 size_t sowKeyWrite = (chainHeader._sowKeyLen - sowKeyWritten < bytesRemaining) ? chainHeader._sowKeyLen - sowKeyWritten : bytesRemaining;
459 pBuffer->
putBytes(sowKey + sowKeyWritten, sowKeyWrite);
460 bytesRemaining -= sowKeyWrite;
461 sowKeyWritten += sowKeyWrite;
463 if (bytesRemaining > 0 && topicWritten < chainHeader._topicLen)
465 size_t topicWrite = (chainHeader._topicLen - topicWritten
467 ? chainHeader._topicLen - topicWritten
469 pBuffer->
putBytes(topic + topicWritten, topicWrite);
470 bytesRemaining -= topicWrite;
471 topicWritten += topicWrite;
473 if (bytesRemaining > 0 && dataWritten < dataLen)
475 size_t dataWrite = (dataLen - dataWritten < bytesRemaining) ?
476 dataLen - dataWritten : bytesRemaining;
477 pBuffer->
putBytes(data + dataWritten, dataWrite);
478 bytesRemaining -= dataWrite;
479 dataWritten += dataWrite;
// If an AMPSException escapes the write, the chain is handed back to the
// block store (what follows the put is on a hidden line).
483 catch (
const AMPSException&)
485 _blockStore.put(first);
// Success: count the stored message and report its sequence number.
488 AMPS_FETCH_ADD(&_stored, 1);
489 return blockHeader._seq;
// Body of the routine that releases stored messages up to and including
// index_ and records index_ in the metadata block. The signature is not
// visible in this excerpt (the name used for this block is inferred), and
// braces and early returns are on hidden lines.
501 BufferLock bufferGuard(_blockStore);
502 Buffer* pBuffer = _blockStore.getBuffer();
505 amps_uint64_t lastPersisted = _metadataBlock->_sequence;
// Nothing to release: index 0, an empty store, or an index that has already
// been discarded.
507 if (index_ == (amps_uint64_t)0 || !_blockStore.front() || index_ <= _maxDiscarded)
// The index is ahead of the recorded value: move the recorded value,
// _maxDiscarded and _lastSequence forward so none of them is below index_.
511 if (lastPersisted < index_)
515 _metadataBlock->_sequence = index_;
516 if (_maxDiscarded < index_)
518 _maxDiscarded = index_;
520 if (_lastSequence <= index_)
522 _lastSequence = index_;
// The index is behind what the store has already discarded. This is reported
// as a gap only when gap errors are enabled.
529 else if (getErrorOnPublishGap() && index_ < lastPersisted)
531 std::ostringstream os;
532 os <<
"Server last saw " << index_ <<
" from Client but Store " 533 <<
"has already discarded up to " << lastPersisted
534 <<
" which would leave a gap of unreceived messages.";
535 throw PublishStoreGapException(os.str());
// Wake any threads waiting on the block store.
537 _blockStore.signalAll();
// Normal path: release the blocks, subtract the released count from _stored
// and wake waiters.
541 _maxDiscarded = index_;
542 AMPS_FETCH_SUB(&_stored, _blockStore.put(index_));
543 _blockStore.signalAll();
// The metadata is only rewritten when index_ is newer than what was recorded
// (the statement taken when this condition holds is on a hidden line).
544 if (lastPersisted >= index_)
550 _metadataBlock->_sequence = index_;
551 if (_lastSequence < index_)
553 _lastSequence = index_;
// Body of the routine that replays every stored message, in used-list order,
// through replayOnto(). The signature is not visible in this excerpt.
564 BufferLock bufferGuard(_blockStore);
// Nothing stored (the statement taken here is on a hidden line).
566 if (!_blockStore.front())
570 Block* next = _blockStore.front();
573 for (Block* block = _blockStore.front(); block; block = next)
// `next` is advanced only after a successful replayOnto(), so when
// replayOnto() throws it still refers to the block that failed.
576 replayOnto(block, replayer_);
577 next = block->_nextInList;
// On a StoreException the failing block and everything after it are handed
// to putAll().
580 catch (
const StoreException&)
582 _blockStore.putAll(next);
// Body of the routine that replays the single stored message whose sequence
// equals index_. The signature, return statements and braces are not visible
// in this excerpt (the name used for this block is inferred).
595 BufferLock bufferGuard(_blockStore);
// Nothing stored (the statement taken here is on a hidden line).
597 if (!_blockStore.front())
// Sequence range currently held: the back of the used list is the newest
// entry and the front is the oldest.
602 amps_uint64_t lastIdx = _blockStore.back()->_sequence;
604 amps_uint64_t leastIdx = _blockStore.front()->_sequence;
605 if (index_ >= leastIdx && index_ <= lastIdx)
// Linear walk of the used list to the block with the requested sequence.
607 Block* block = _blockStore.front();
608 while (block && block->_sequence != index_)
610 block = block->_nextInList;
617 Buffer* pBuffer = _blockStore.getBuffer();
619 sizeof(amps_uint32_t));
624 replayOnto(block, replayer_);
// NOTE(review): presumably the out-of-range path; it reports the lowest
// stored sequence through _message - confirm against the full source.
631 _message.setSequence(leastIdx);
// Snapshot of the _stored counter as a size_t. The enclosing function's
// signature and return statement are not visible in this excerpt.
644 size_t count = (size_t)_stored;
// Body of the routine that blocks until every message that was in the store
// on entry has been discarded, optionally bounded by timeout_. The signature
// and the branch selecting timed versus untimed waiting are not visible in
// this excerpt.
658 BufferLock bufferGuard(_blockStore);
// Highest sequence stored on entry; messages stored later are not waited for.
659 amps_uint64_t waitFor = _getHighestUnpersisted();
// Timed variant: keep waiting while anything up to waitFor is still stored.
667 bool timedOut =
false;
668 AMPS_START_TIMER(timeout_);
670 while (!timedOut && _stored != 0
671 && waitFor >= _getLowestUnpersisted())
673 if (!_blockStore.wait(timeout_))
676 AMPS_RESET_TIMER(timedOut, timeout_);
// Throw only if the timeout expired and the awaited messages are still there.
680 if (timedOut && _stored != 0
681 && waitFor >= _getLowestUnpersisted())
683 throw TimedOutException(
"Timed out waiting to flush publish store.");
// Untimed variant: wait in 1000-unit slices and, between slices, release the
// lock while the registered waiting function is invoked.
688 while (_stored != 0 && waitFor >= _getLowestUnpersisted())
691 _blockStore.wait(1000);
693 BufferUnlock unlck(_blockStore);
694 amps_invoke_waiting_function();
// Locked wrapper: takes the block store lock and returns
// _getLowestUnpersisted(). The signature is not visible in this excerpt.
701 BufferLock bufferGuard(_blockStore);
702 return _getLowestUnpersisted();
// Locked wrapper: takes the block store lock and returns
// _getHighestUnpersisted().
705 amps_uint64_t getHighestUnpersisted()
const 707 BufferLock bufferGuard(_blockStore);
708 return _getHighestUnpersisted();
// Locked wrapper: takes the block store lock and returns
// _getLastPersisted(). The signature is not visible in this excerpt.
713 BufferLock bufferGuard(_blockStore);
714 return _getLastPersisted();
// Static callback taking a requested size and an untyped pointer.
// NOTE(review): the body is not visible in this excerpt; presumably vpThis_
// is the store instance and the result says whether a resize to
// requestedSize_ is permitted - confirm against the full source.
718 static bool canResize(
size_t requestedSize_,
void* vpThis_)
// Returns the sequence of the block at the front of the used list. It takes
// no lock itself; the public wrapper acquires the block store lock first.
// The value returned for an empty store is on a hidden line.
723 amps_uint64_t _getLowestUnpersisted()
const 727 if (!_blockStore.front())
731 return _blockStore.front()->_sequence;
// Returns the sequence of the block at the back of the used list. It takes
// no lock itself; the public wrapper acquires the block store lock first.
// The value returned for an empty store is on a hidden line.
734 amps_uint64_t _getHighestUnpersisted()
const 738 if (!_blockStore.back())
742 return _blockStore.back()->_sequence;
// Determines the last persisted sequence, stores it in the metadata block and
// returns it. It takes no lock itself; the public wrapper acquires the block
// store lock first. The conditions choosing between the paths below are on
// lines missing from this excerpt.
745 amps_uint64_t _getLastPersisted(
void)
748 amps_uint64_t lastPersisted = (amps_uint64_t)0;
749 Buffer* pBuffer = _blockStore.getBuffer();
// Early-return path: a value is already available (how it is obtained is on
// hidden lines); _lastSequence is raised so it is not below that value.
754 if (_lastSequence < lastPersisted)
756 _lastSequence = lastPersisted;
758 return lastPersisted;
// Fallback candidates: the highest discarded sequence, or a value derived
// from the wall clock (milliseconds since the epoch multiplied by 1,000,000),
// computed with one of two platform-specific time sources.
762 lastPersisted = _maxDiscarded;
769 lastPersisted = (t.time * 1000 + t.millitm) * (amps_uint64_t)1000000;
772 gettimeofday(&tv, NULL);
773 lastPersisted = (amps_uint64_t)((tv.tv_sec * 1000) + (tv.tv_usec / 1000))
774 * (amps_uint64_t)1000000;
// Reconcile with what is actually stored once sequences have been assigned:
// the value can be pulled back to just below the lowest stored sequence, and
// _lastSequence can follow the highest stored sequence.
777 if (_lastSequence > 2)
779 amps_uint64_t low = _getLowestUnpersisted();
780 amps_uint64_t high = _getHighestUnpersisted();
783 lastPersisted = low - 1;
787 _lastSequence = high;
789 if (_lastSequence < lastPersisted)
791 lastPersisted = _lastSequence - 1;
796 _lastSequence = lastPersisted;
// The value is written at a position past two 32-bit fields (the start of the
// expression is on a hidden line) and cached in the metadata block.
799 +
sizeof(amps_uint32_t)
800 +
sizeof(amps_uint32_t));
802 _metadataBlock->_sequence = lastPersisted;
803 return lastPersisted;
// Body of the routine that rebuilds the in-memory block lists from the
// contents of an existing buffer. The signature is not visible in this
// excerpt (the name used for this block is inferred), and braces, returns and
// several declarations (blockSize, location, blockNum, end) are on hidden
// lines.
808 BufferLock bufferGuard(_blockStore);
810 Buffer* pBuffer = _blockStore.getBuffer();
811 size_t size = pBuffer->
getSize();
// Initialisation of a fresh metadata block: sequence 0, then the block size
// and the client version are written. The condition selecting this path is on
// a hidden line.
816 _metadataBlock = _blockStore.get(1);
817 _metadataBlock->_sequence = (amps_uint64_t)0;
821 pBuffer->
putUint32((amps_uint32_t)blockSize);
825 pBuffer->
putUint64((amps_uint64_t)VersionInfo::parseVersion(AMPS_CLIENT_VERSION));
// Number of blocks in the buffer. When the size is not a whole number of
// blocks the count is recomputed after an adjustment made on hidden lines.
830 size_t numBlocks = size / blockSize;
831 if (size % blockSize > 0)
836 numBlocks = size / blockSize;
838 amps_uint32_t blockCount = 0;
// The Block array returned by resizeBuffer() is deleted immediately; a new
// array is allocated further down.
840 delete[] _blockStore.resizeBuffer(numBlocks * blockSize, &blockCount);
843 if (size > pBuffer->
getSize() || numBlocks != (size_t)blockCount)
845 throw StoreException(
"Publish Store could not resize correctly during recovery, possibly due to resizeHandler refusing the request.");
// NOTE(review): minIdx starts at 0 and the only visible update lowers it when
// it is greater than a sequence, which cannot be true for an unsigned 0.
// Unless a hidden line seeds it, the later comparison of minIdx against
// _maxDiscarded + 1 can never succeed - confirm against the full source.
850 amps_uint64_t maxIdx = 0;
851 amps_uint64_t minIdx = 0;
853 BlockHeader blockHeader;
// One Block descriptor per block in the buffer; the first one becomes the
// metadata block.
855 Block* blocks =
new Block[numBlocks];
856 blocks[numBlocks - 1]._nextInList = 0;
858 _blockStore.addBlocks(blocks);
859 _metadataBlock = blocks;
860 _metadataBlock->_nextInList = 0;
// Read the metadata header. In this block _blocksToWrite holds the block size
// and _totalRemaining holds the block header size.
862 pBuffer->
copyBytes((
char*)&blockHeader,
sizeof(BlockHeader));
// Format detection: the visible lines show hand-off to recoverOldFormat() and
// a rejection of unrecognised layouts. The exact nesting of these checks is
// on hidden lines.
870 if (blockHeader._blocksToWrite == 1)
880 if (blockHeader._totalRemaining >= 5000000)
884 recoverOldFormat(blocks);
888 throw StoreException(
"Unrecognized format for Store. Can't recover.");
// A zero block size is replaced by writing the current block size; otherwise
// the recorded size is adopted.
890 if (blockHeader._blocksToWrite == 0)
893 pBuffer->
putUint32((amps_uint32_t)blockSize);
897 blockSize = blockHeader._blocksToWrite;
898 _blockStore.setBlockSize(blockSize);
// The header size is handled the same way (the zero case is on hidden lines).
900 if (blockHeader._totalRemaining == 0)
907 _blockStore.setBlockHeaderSize(blockHeader._totalRemaining);
// A non-zero recorded sequence below 1,000,000 is reset to 0.
909 _metadataBlock->_sequence = blockHeader._seq;
910 if (_metadataBlock->_sequence
911 && _metadataBlock->_sequence < (amps_uint64_t)1000000)
914 +
sizeof(amps_uint32_t)
915 +
sizeof(amps_uint32_t));
917 _metadataBlock->_sequence = 0;
922 _maxDiscarded = _metadataBlock->_sequence;
923 _lastSequence = _maxDiscarded;
// Scan every block after the metadata block.
927 location += blockSize;
928 amps_uint32_t freeCount = 0;
929 Block* firstFree = NULL;
930 Block* endOfFreeList = NULL;
// Chain heads keyed by sequence. The map keeps them in ascending sequence
// order for building the used list below.
932 typedef std::map<amps_uint64_t, Block*> RecoverMap;
933 RecoverMap recoveredBlocks;
934 while (location < size)
938 pBuffer->
copyBytes((
char*)&blockHeader,
sizeof(BlockHeader));
// Skip blocks that look valid but are not chain heads: a zero CRC (store()
// clears the CRC after writing a header) or a sequence already recovered.
939 if ((blockHeader._seq > 0 && blockHeader._totalRemaining < size) &&
940 (!blockHeader._crcVal || recoveredBlocks.count(blockHeader._seq)))
943 location += blockSize;
946 Block* block = blocks[++blockNum].setOffset(location);
947 bool recovered =
false;
// A plausible header: record the sequence and rebuild the chain.
948 if (blockHeader._seq > 0 && blockHeader._totalRemaining < size)
951 block->_sequence = blockHeader._seq;
953 if (maxIdx < blockHeader._seq)
955 maxIdx = blockHeader._seq;
957 if (minIdx > blockHeader._seq)
959 minIdx = blockHeader._seq;
962 recoveredBlocks[blockHeader._seq] = block;
// Follow the on-disk chain: each link's next offset is read from just past
// two 32-bit and two 64-bit header fields of the linked block.
964 while (blockHeader._nextInChain != (amps_uint64_t)0)
966 Block* chain = blocks[++blockNum]
967 .setOffset((
size_t)blockHeader._nextInChain);
968 chain->_nextInList = 0;
969 pBuffer->
setPosition((
size_t)blockHeader._nextInChain
970 +
sizeof(amps_uint32_t)
971 +
sizeof(amps_uint32_t)
972 +
sizeof(amps_uint64_t)
973 +
sizeof(amps_uint64_t));
974 blockHeader._nextInChain = pBuffer->
getUint64();
975 block->_nextInChain = chain;
977 block->_sequence = blockHeader._seq;
// Blocks that were not recovered are appended to the free list.
986 endOfFreeList->_nextInList = block;
992 endOfFreeList = block;
995 location += blockSize;
999 endOfFreeList->_nextInList = 0;
1001 _blockStore.setFreeList(firstFree, freeCount);
// Bring the counters in line with what was found in the buffer.
1002 if (maxIdx > _lastSequence)
1004 _lastSequence = maxIdx;
1006 if (minIdx > _maxDiscarded + 1)
1008 _maxDiscarded = minIdx - 1;
1010 if (_maxDiscarded > _metadataBlock->_sequence)
1012 _metadataBlock->_sequence = _maxDiscarded;
1013 pBuffer->
setPosition(_metadataBlock->_offset + 8);
// Count the recovered messages and link their head blocks into the used list
// in ascending sequence order.
1017 AMPS_FETCH_ADD(&_stored, (
long)(recoveredBlocks.size()));
1018 for (RecoverMap::iterator i = recoveredBlocks.begin();
1019 i != recoveredBlocks.end(); ++i)
1023 end->_nextInList = i->second;
1027 _blockStore.setUsedList(i->second);
1033 end->_nextInList = 0;
1035 _blockStore.setEndOfUsedList(end);
// Body of the routine that rebuilds _message from the chain of blocks that
// starts at block_ and verifies it against the stored CRC and sequence.
// The signature is not visible in this excerpt; presumably this is the
// replayOnto() helper called by the replay routines. Braces, loop headers,
// buffer position changes and the final hand-off of the message are on
// hidden lines.
1043 size_t start = block_->_offset;
1044 size_t position = start;
1045 Buffer* pBuffer = _blockStore.getBuffer();
1047 BlockHeader blockHeader;
1048 pBuffer->
copyBytes((
char*)&blockHeader,
sizeof(BlockHeader));
// An empty block: nothing to replay (the statement taken is on a hidden line).
1049 if (blockHeader._totalRemaining == 0)
1055 BlockChainHeader blockChainHeader;
1056 pBuffer->
copyBytes((
char*)&blockChainHeader,
sizeof(blockChainHeader));
1057 if (blockChainHeader._operation == Message::Command::Unknown)
1062 blockChainHeader._ackTypes |= Message::AckType::Persisted;
// Sanity check: the five field lengths may not exceed the recorded total.
1067 if (blockHeader._totalRemaining
1068 < blockChainHeader._commandIdLen
1069 + blockChainHeader._correlationIdLen
1070 + blockChainHeader._expirationLen
1071 + blockChainHeader._sowKeyLen
1072 + blockChainHeader._topicLen)
1074 std::ostringstream os;
1075 os <<
"Corrupted message found with invalid lengths. " 1076 <<
"Attempting to replay " << block_->_sequence
1077 <<
". Block sequence " << blockHeader._seq
1078 <<
", topic length " << blockChainHeader._topicLen
1079 <<
", data length " << blockHeader._totalRemaining
1080 <<
", command ID length " << blockChainHeader._commandIdLen
1081 <<
", correlation ID length " << blockChainHeader._correlationIdLen
1082 <<
", expiration length " << blockChainHeader._expirationLen
1083 <<
", sow key length " << blockChainHeader._sowKeyLen
1084 <<
", start " << start
1085 <<
", position " << position
1086 <<
", buffer size " << pBuffer->
getSize();
1087 throw StoreException(os.str());
// Restore command type, ack types (always including Persisted) and sequence.
1092 _message.setCommandEnum((Message::Command::Type)blockChainHeader._operation);
1093 _message.setAckTypeEnum((
unsigned)blockChainHeader._ackTypes
1094 | Message::AckType::Persisted);
1095 _message.setSequence(blockHeader._seq);
1097 Block* current = block_;
1099 amps_uint64_t crcCalc = (amps_uint64_t)0ULL;
// Scratch buffers for fields that straddle a block boundary. One slot is used
// per straddling field and blockNum counts the slots in use.
// NOTE(review): the array is null when _blocksToWrite <= 1, yet the straddling
// paths below index it whenever a field is longer than the bytes left in the
// block. A corrupted header could reach that path with a null array - confirm
// whether a hidden check prevents it.
1101 char** tmpBuffers = (blockHeader._blocksToWrite > 1) ?
new char* [blockHeader._blocksToWrite - 1] : 0;
1102 size_t blockNum = 0;
// Command id. The same pattern repeats for every field: when the field fits
// in the rest of the current block it is assigned straight from the buffer;
// otherwise it is copied piecewise across the chain into a scratch buffer.
// Either way the length is deducted from _totalRemaining and the bytes are
// folded into crcCalc.
1103 if (blockChainHeader._commandIdLen > 0)
1105 if (blockChainHeader._commandIdLen <= blockBytesRemaining)
1107 _message.assignCommandId(pBuffer->
getBytes(blockChainHeader._commandIdLen)._data,
1108 blockChainHeader._commandIdLen);
1109 blockBytesRemaining -= blockChainHeader._commandIdLen;
1113 tmpBuffers[blockNum] =
new char[blockChainHeader._commandIdLen];
1114 size_t totalLeft = blockChainHeader._commandIdLen;
1115 size_t totalRead = 0;
1119 readLen = blockBytesRemaining < totalLeft ?
1120 blockBytesRemaining : totalLeft;
1121 pBuffer->
copyBytes(tmpBuffers[blockNum] + totalRead, readLen);
1122 if (!(totalLeft -= readLen))
1126 if (!(current = current->_nextInChain))
1130 totalRead += readLen;
1135 blockBytesRemaining -= readLen;
1136 _message.assignCommandId(tmpBuffers[blockNum++], blockChainHeader._commandIdLen);
1138 blockHeader._totalRemaining -= blockChainHeader._commandIdLen;
1139 crcCalc = _crc(_message.getCommandId().data(),
1140 blockChainHeader._commandIdLen, crcCalc);
// Correlation id.
1142 if (blockChainHeader._correlationIdLen > 0)
1144 if (blockChainHeader._correlationIdLen <= blockBytesRemaining)
1146 _message.assignCorrelationId(
1147 pBuffer->
getBytes(blockChainHeader._correlationIdLen)._data,
1148 blockChainHeader._correlationIdLen);
1149 blockBytesRemaining -= blockChainHeader._correlationIdLen;
1153 tmpBuffers[blockNum] =
new char[blockChainHeader._correlationIdLen];
1154 size_t totalLeft = blockChainHeader._correlationIdLen;
1155 size_t totalRead = 0;
1159 readLen = blockBytesRemaining < totalLeft ?
1160 blockBytesRemaining : totalLeft;
1161 pBuffer->
copyBytes(tmpBuffers[blockNum] + totalRead, readLen);
1162 if (!(totalLeft -= readLen))
1166 if (!(current = current->_nextInChain))
1170 totalRead += readLen;
1175 blockBytesRemaining -= readLen;
1176 _message.assignCorrelationId(tmpBuffers[blockNum++], blockChainHeader._correlationIdLen);
1178 blockHeader._totalRemaining -= blockChainHeader._correlationIdLen;
1179 crcCalc = _crc(_message.getCorrelationId().data(),
1180 blockChainHeader._correlationIdLen, crcCalc);
// Expiration.
1182 if (blockChainHeader._expirationLen > 0)
1184 if (blockChainHeader._expirationLen <= blockBytesRemaining)
1186 _message.assignExpiration(
1187 pBuffer->
getBytes(blockChainHeader._expirationLen)._data,
1188 blockChainHeader._expirationLen);
1189 blockBytesRemaining -= blockChainHeader._expirationLen;
1193 tmpBuffers[blockNum] =
new char[blockChainHeader._expirationLen];
1194 size_t totalLeft = blockChainHeader._expirationLen;
1195 size_t totalRead = 0;
1199 readLen = blockBytesRemaining < totalLeft ?
1200 blockBytesRemaining : totalLeft;
1201 pBuffer->
copyBytes(tmpBuffers[blockNum] + totalRead, readLen);
1202 if (!(totalLeft -= readLen))
1206 if (!(current = current->_nextInChain))
1210 totalRead += readLen;
1215 blockBytesRemaining -= readLen;
1216 _message.assignExpiration(tmpBuffers[blockNum++], blockChainHeader._expirationLen);
1218 blockHeader._totalRemaining -= blockChainHeader._expirationLen;
1219 crcCalc = _crc(_message.getExpiration().data(),
1220 blockChainHeader._expirationLen, crcCalc);
// SOW key.
1222 if (blockChainHeader._sowKeyLen > 0)
1224 if (blockChainHeader._sowKeyLen <= blockBytesRemaining)
1226 _message.assignSowKey(pBuffer->
getBytes(blockChainHeader._sowKeyLen)._data,
1227 blockChainHeader._sowKeyLen);
1228 blockBytesRemaining -= blockChainHeader._sowKeyLen;
1232 tmpBuffers[blockNum] =
new char[blockChainHeader._sowKeyLen];
1233 size_t totalLeft = blockChainHeader._sowKeyLen;
1234 size_t totalRead = 0;
1238 readLen = blockBytesRemaining < totalLeft ?
1239 blockBytesRemaining : totalLeft;
1240 pBuffer->
copyBytes(tmpBuffers[blockNum] + totalRead, readLen);
1241 if (!(totalLeft -= readLen))
1245 if (!(current = current->_nextInChain))
1249 totalRead += readLen;
1254 blockBytesRemaining -= readLen;
1255 _message.assignSowKey(tmpBuffers[blockNum++], blockChainHeader._sowKeyLen);
1257 blockHeader._totalRemaining -= blockChainHeader._sowKeyLen;
1258 crcCalc = _crc(_message.getSowKey().data(), blockChainHeader._sowKeyLen, crcCalc);
// Topic.
1260 if (blockChainHeader._topicLen > 0)
1262 if (blockChainHeader._topicLen <= blockBytesRemaining)
1264 _message.assignTopic(pBuffer->
getBytes(blockChainHeader._topicLen)._data,
1265 blockChainHeader._topicLen);
1266 blockBytesRemaining -= blockChainHeader._topicLen;
1270 tmpBuffers[blockNum] =
new char[blockChainHeader._topicLen];
1271 size_t totalLeft = blockChainHeader._topicLen;
1272 size_t totalRead = 0;
1276 readLen = blockBytesRemaining < totalLeft ?
1277 blockBytesRemaining : totalLeft;
1278 pBuffer->
copyBytes(tmpBuffers[blockNum] + totalRead, readLen);
1279 if (!(totalLeft -= readLen))
1283 if (!(current = current->_nextInChain))
1287 totalRead += readLen;
1292 blockBytesRemaining -= readLen;
1293 _message.assignTopic(tmpBuffers[blockNum++], blockChainHeader._topicLen);
1295 blockHeader._totalRemaining -= blockChainHeader._topicLen;
1296 crcCalc = _crc(_message.getTopic().data(), blockChainHeader._topicLen, crcCalc);
// Payload: whatever is left of _totalRemaining after the five fields. _flag
// selects the message field that receives it (data, filter, sow keys or
// bookmark); the cancel variant also sets the options to AMPS_OPTIONS_CANCEL.
1298 if (blockHeader._totalRemaining > 0)
// Payload fits in the rest of the current block: assign it from the buffer.
1300 if (blockHeader._totalRemaining <= blockBytesRemaining)
1302 if (blockChainHeader._flag == -1 || blockChainHeader._flag == SOW_DELETE_DATA)
1304 _message.assignData(
1305 pBuffer->
getBytes(blockHeader._totalRemaining)._data,
1306 blockHeader._totalRemaining);
1307 crcCalc = _crc(_message.getData().data(),
1308 blockHeader._totalRemaining, crcCalc);
1310 else if (blockChainHeader._flag == SOW_DELETE_FILTER)
1312 _message.assignFilter(
1313 pBuffer->
getBytes(blockHeader._totalRemaining)._data,
1314 blockHeader._totalRemaining);
1315 crcCalc = _crc(_message.getFilter().data(),
1316 blockHeader._totalRemaining, crcCalc);
1318 else if (blockChainHeader._flag == SOW_DELETE_KEYS)
1320 _message.assignSowKeys(
1321 pBuffer->
getBytes(blockHeader._totalRemaining)._data,
1322 blockHeader._totalRemaining);
1323 crcCalc = _crc(_message.getSowKeys().data(),
1324 blockHeader._totalRemaining, crcCalc);
1326 else if (blockChainHeader._flag == SOW_DELETE_BOOKMARK)
1328 _message.assignBookmark(
1329 pBuffer->
getBytes(blockHeader._totalRemaining)._data,
1330 blockHeader._totalRemaining);
1331 crcCalc = _crc(_message.getBookmark().data(),
1332 blockHeader._totalRemaining, crcCalc);
1334 else if (blockChainHeader._flag == SOW_DELETE_BOOKMARK_CANCEL)
1336 _message.assignBookmark(
1337 pBuffer->
getBytes(blockHeader._totalRemaining)._data,
1338 blockHeader._totalRemaining);
1339 crcCalc = _crc(_message.getBookmark().data(),
1340 blockHeader._totalRemaining, crcCalc);
1341 _message.assignOptions(AMPS_OPTIONS_CANCEL, 6);
// Payload straddles blocks: gather it into a scratch buffer first, then
// assign it by flag and fold the scratch bytes into the CRC once.
1346 tmpBuffers[blockNum] =
new char[blockHeader._totalRemaining];
1347 size_t totalLeft = blockHeader._totalRemaining;
1348 size_t totalRead = 0;
1352 readLen = blockBytesRemaining < totalLeft ?
1353 blockBytesRemaining : totalLeft;
1354 pBuffer->
copyBytes(tmpBuffers[blockNum] + totalRead, readLen);
1355 if (!(totalLeft -= readLen))
1359 if (!(current = current->_nextInChain))
1363 totalRead += readLen;
1368 position += readLen;
1369 if (blockChainHeader._flag == -1 || blockChainHeader._flag == SOW_DELETE_DATA)
1371 _message.assignData(tmpBuffers[blockNum], blockHeader._totalRemaining);
1373 else if (blockChainHeader._flag == SOW_DELETE_FILTER)
1375 _message.assignFilter(tmpBuffers[blockNum], blockHeader._totalRemaining);
1377 else if (blockChainHeader._flag == SOW_DELETE_KEYS)
1379 _message.assignSowKeys(tmpBuffers[blockNum], blockHeader._totalRemaining);
1381 else if (blockChainHeader._flag == SOW_DELETE_BOOKMARK)
1383 _message.assignBookmark(tmpBuffers[blockNum], blockHeader._totalRemaining);
1385 else if (blockChainHeader._flag == SOW_DELETE_BOOKMARK_CANCEL)
1387 _message.assignBookmark(tmpBuffers[blockNum], blockHeader._totalRemaining);
1388 _message.assignOptions(AMPS_OPTIONS_CANCEL, 6);
1390 crcCalc = _crc(tmpBuffers[blockNum++], blockHeader._totalRemaining, crcCalc);
// Integrity check: the recomputed CRC must match the stored one, and the
// sequence in the buffer must match the in-memory block. By this point
// _totalRemaining holds only the payload length, which is what the
// "data length" part of the message reports.
1395 if (crcCalc != blockHeader._crcVal || blockHeader._seq != block_->_sequence)
1397 std::ostringstream os;
1398 os <<
"Corrupted message found by CRC or sequence " 1399 <<
"Attempting to replay " << block_->_sequence
1400 <<
". Block sequence " << blockHeader._seq
1401 <<
", expiration length " << blockChainHeader._expirationLen
1402 <<
", sowKey length " << blockChainHeader._sowKeyLen
1403 <<
", topic length " << blockChainHeader._topicLen
1404 <<
", data length " << blockHeader._totalRemaining
1405 <<
", command ID length " << blockChainHeader._commandIdLen
1406 <<
", correlation ID length " << blockChainHeader._correlationIdLen
1407 <<
", flag " << blockChainHeader._flag
1408 <<
", expected CRC " << blockHeader._crcVal
1409 <<
", actual CRC " << crcCalc
1410 <<
", start " << start
1411 <<
", position " << position
1412 <<
", buffer size " << pBuffer->
getSize();
// The offsets of every block in the chain are appended to the diagnostic.
1413 for (Block* block = block_; block; block = block->_nextInChain)
1415 os <<
"\n BLOCK " << block->_offset;
// Scratch buffers are released before the exception is thrown.
1419 for (amps_uint32_t i = 0; i < blockNum; ++i)
1421 delete[] tmpBuffers[i];
1423 delete[] tmpBuffers;
1425 throw StoreException(os.str());
// Normal path: release the scratch buffers (the message hand-off that
// precedes this is on hidden lines).
1432 for (amps_uint32_t i = 0; i < blockNum; ++i)
1434 delete[] tmpBuffers[i];
1436 delete[] tmpBuffers;
1442 void recoverOldFormat(Block* blocks)
1444 Buffer* pBuffer = _blockStore.getBuffer();
1445 amps_uint64_t maxIdx = 0;
1446 amps_uint64_t minIdx = 0;
1447 size_t size = pBuffer->
getSize();
1448 size_t location = 0;
1451 pBuffer->
putUint32((amps_uint32_t)_blockStore.getBlockHeaderSize());
1452 _metadataBlock->_sequence = pBuffer->
getUint64();
1453 if (_metadataBlock->_sequence < (amps_uint64_t)1000000)
1455 pBuffer->
setPosition(_metadataBlock->_offset + 8);
1457 _metadataBlock->_sequence = 0;
1462 _maxDiscarded = _metadataBlock->_sequence;
1463 _lastSequence = _maxDiscarded;
1466 pBuffer->
putUint64((amps_uint64_t)VersionInfo::parseVersion(AMPS_CLIENT_VERSION));
1471 amps_uint32_t freeCount = 0;
1472 Block* firstFree = NULL;
1473 Block* endOfFreeList = NULL;
1475 size_t numBlocks = size / blockSize;
1476 size_t blockNum = 0;
1478 typedef std::map<amps_uint64_t, Block*> RecoverMap;
1479 RecoverMap recoveredBlocks;
1480 RecoverMap growingBlocks;
1481 amps_uint32_t growthBlocksNeeded = 0;
1482 while (location < size)
1486 BlockHeader blockHeader;
1487 pBuffer->
copyBytes((
char*)&blockHeader,
sizeof(BlockHeader));
1488 size_t blockCount = (size_t)blockHeader._blocksToWrite;
1489 if (blockHeader._totalRemaining > 0 && blockHeader._seq > 0
1490 && blockHeader._totalRemaining < size
1491 && blockHeader._blocksToWrite < numBlocks
1492 && (blockHeader._blocksToWrite * blockSize)
1493 >= blockHeader._totalRemaining)
1495 size_t oldFormatSize = blockHeader._totalRemaining;
1498 blockHeader._totalRemaining -= 64;
1502 BlockChainHeader chainHeader;
1506 pBuffer->
setPosition(location + (
sizeof(amps_uint32_t) * 2)
1507 + (
sizeof(amps_uint64_t) * 2) );
1513 sizeof(amps_uint32_t) * 8);
1515 if ((chainHeader._commandIdLen + chainHeader._correlationIdLen
1516 + chainHeader._expirationLen + chainHeader._sowKeyLen
1517 + chainHeader._topicLen) > blockHeader._totalRemaining)
1524 amps_uint32_t blocksNeeded = (blockHeader._totalRemaining
1526 + (blockHeader._totalRemaining
1529 if (blocksNeeded == blockHeader._blocksToWrite)
1531 Block* first = blocks[++blockNum].setOffset(location);
1532 first->_nextInList = 0;
1533 first->_sequence = blockHeader._seq;
1534 if (blockHeader._blocksToWrite > 1)
1537 amps_uint64_t crcVal = blockHeader._crcVal;
1538 blockHeader._crcVal = 0;
1543 size_t currentBlockNum = blockNum
1544 + blockHeader._blocksToWrite
1548 if (currentBlockNum >= numBlocks)
1550 currentBlockNum = currentBlockNum - numBlocks + 1;
1552 if (currentBlockNum < blockNum)
1554 Block* last = blocks[currentBlockNum]
1556 if ((current = firstFree) == last)
1558 firstFree = firstFree->_nextInList;
1569 if (current->_nextInList == last)
1571 current->_nextInList = last->_nextInList;
1576 current = current->_nextInList;
1582 current = blocks[currentBlockNum]
1587 while (current != first)
1589 current->_nextInList = 0;
1590 current->_sequence = blockHeader._seq;
1592 if (--currentBlockNum < 1
1593 || currentBlockNum > numBlocks)
1595 currentBlockNum = numBlocks - 1;
1597 Block* previous = blocks[currentBlockNum]
1598 .init(currentBlockNum,
1600 previous->_nextInChain = current;
1606 size_t bytesToMove = --blockCount
1610 pBuffer->
copyBytes(current->_offset + bytesToMove,
1617 dataBytes -= bytesToMove;
1625 blockHeader._nextInChain = (current->_nextInChain
1626 ? current->_nextInChain->_offset
1627 : (amps_uint64_t)0);
1630 pBuffer->
putBytes((
const char*)&blockHeader,
1631 sizeof(BlockHeader));
1632 if (firstFree == previous)
1634 firstFree = firstFree->_nextInList;
1643 current = firstFree;
1646 if (current->_nextInList == previous)
1648 current->_nextInList = previous->_nextInList;
1652 current = current->_nextInList;
1657 blockNum += blockHeader._blocksToWrite - 1;
1658 blockHeader._crcVal = crcVal;
1669 blockHeader._nextInChain = (first->_nextInChain
1670 ? first->_nextInChain->_offset
1671 : (amps_uint64_t)0);
1672 pBuffer->
putBytes((
const char*)&blockHeader,
1673 sizeof(BlockHeader));
1674 pBuffer->
putBytes((
const char*)&chainHeader,
1675 sizeof(BlockChainHeader));
1678 recoveredBlocks[blockHeader._seq] = first;
1684 growingBlocks[blockHeader._seq] = blocks[++blockNum].setOffset(location);
1685 growthBlocksNeeded += (blocksNeeded - blockHeader._blocksToWrite);
1686 blockNum += blockHeader._blocksToWrite - 1;
1689 if (maxIdx < blockHeader._seq)
1691 maxIdx = blockHeader._seq;
1693 if (minIdx > blockHeader._seq)
1695 minIdx = blockHeader._seq;
1698 location += blockHeader._blocksToWrite *
getBlockSize();
1700 assert(location >= size || blockNum < numBlocks);
1705 Block* block = blocks[++blockNum].setOffset(location);
1708 endOfFreeList->_nextInList = block;
1714 endOfFreeList = block;
1716 location += blockSize;
1719 for (RecoverMap::iterator i = growingBlocks.begin();
1720 i != growingBlocks.end(); ++i)
1722 Block* first = i->second;
1724 BlockHeader blockHeader;
1727 pBuffer->
copyBytes((
char*)&blockHeader, 24);
1730 blockHeader._totalRemaining -= 64;
1733 if (freeCount < growthBlocksNeeded)
1736 amps_uint32_t minBlocksRequired = growthBlocksNeeded - freeCount;
1737 amps_uint32_t growthBlocks = _blockStore.getDefaultResizeBlocks();
1738 if (growthBlocks < minBlocksRequired)
1740 amps_uint32_t defaultBlocks = _blockStore.getDefaultResizeBlocks();
1741 if (minBlocksRequired % defaultBlocks)
1742 minBlocksRequired = (minBlocksRequired / defaultBlocks + 1)
1744 growthBlocks = minBlocksRequired;
1746 amps_uint32_t newBlocks = 0;
1747 Block* addedBlocks = _blockStore.resizeBuffer(
1749 + growthBlocks * blockSize,
1753 throw StoreException(
"Failed to grow store buffer during recovery");
1755 _blockStore.addBlocks(addedBlocks);
1756 freeCount += newBlocks;
1757 growthBlocksNeeded = (growthBlocksNeeded > freeCount)
1758 ? growthBlocksNeeded - freeCount : 0;
1761 endOfFreeList->_nextInList = addedBlocks;
1765 firstFree = addedBlocks;
1767 endOfFreeList = &(addedBlocks[newBlocks - 1]);
1768 endOfFreeList->_nextInList = 0;
1770 expandBlocks(blocks, first->_offset, first, blockHeader,
1771 &firstFree, &freeCount, pBuffer);
1773 recoveredBlocks[blockHeader._seq] = first;
1781 endOfFreeList->_nextInList = 0;
1783 _blockStore.setFreeList(firstFree, freeCount);
1784 if (maxIdx > _lastSequence)
1786 _lastSequence = maxIdx;
1788 if (minIdx > _maxDiscarded + 1)
1790 _maxDiscarded = minIdx - 1;
1792 if (_maxDiscarded > _metadataBlock->_sequence)
1794 _metadataBlock->_sequence = _maxDiscarded;
1795 pBuffer->
setPosition(_metadataBlock->_offset + 8);
1799 AMPS_FETCH_ADD(&_stored, (
long)(recoveredBlocks.size()));
1800 for (RecoverMap::iterator i = recoveredBlocks.begin();
1801 i != recoveredBlocks.end(); ++i)
1803 if (_blockStore.front())
1805 end->_nextInList = i->second;
1809 _blockStore.setUsedList(i->second);
1815 end->_nextInList = 0;
1817 _blockStore.setEndOfUsedList(end);
// Re-lays out the chain of blocks for one stored message, starting at
// first_, so that it spans blocksNeeded blocks, and rewrites the block
// headers (and the chain header of the first block) in the buffer.
// NOTE(review): several original lines are absent from this listing (see the
// gaps in the embedded line numbers); the comments below describe only the
// statements that are visible and hedge anything that depends on the gaps.
//
// blocks_      Array of Block objects; blocks_[1] is re-initialized as the
//              next chain block on one visible path.
// location_    Buffer offset of the chain's first block; used to bound the
//              data range and to locate the chain header.
// first_       First block of the chain being expanded.
// blockHeader_ Copy of the chain's block header; it is modified here and
//              written back to the buffer.
// pFreeList_   Head of the free list; advanced/unlinked as blocks are taken.
// pFreeCount_  NOTE(review): not referenced on any visible line — presumably
//              decremented as free blocks are consumed; confirm.
1822 void expandBlocks(Block* blocks_,
size_t location_, Block* first_,
1823 BlockHeader blockHeader_,
1824 Block** pFreeList_, amps_uint32_t* pFreeCount_,
1828 Block* current = first_;
// Keep the original byte total; it is used below to size the last block.
1831 amps_uint32_t oldTotalRemaining = blockHeader_._totalRemaining;
// NOTE(review): 64 matches DEFAULT_BLOCK_CHAIN_HEADER_SIZE — presumably the
// chain header is being removed from the recorded total; confirm.
1832 blockHeader_._totalRemaining -= 64;
1836 amps_uint32_t blocksNeeded = blockHeader_._totalRemaining
1838 + (blockHeader_._totalRemaining
// Bytes of the original total that fall in the final block.
// NOTE(review): the assignment to blockSize on the next-but-one line is
// presumably guarded by a "remainder is zero" check that is not shown.
1845 size_t endBlockSize = oldTotalRemaining % blockSize;
1848 endBlockSize = blockSize;
1850 size_t endOfData = 0;
// Save the CRC and clear it in the working header; it is restored just
// before the header for first_ is written at the end of this function.
1852 amps_uint64_t crcVal = blockHeader_._crcVal;
1853 blockHeader_._crcVal = 0;
// Blocks are recorded in chain order here and read back with top() in the
// backwards walk further down.
1855 std::stack<Block*> blocksUsed;
1856 for (amps_uint32_t i = 1; i < blocksNeeded; ++i)
1858 blocksUsed.push(current);
1859 current->_sequence = blockHeader_._seq;
// At or beyond the original block count: the next chain block is taken from
// the head of the free list. On the block where i equals the original count,
// endOfData is set to that block's offset plus endBlockSize.
1860 if (i >= blockHeader_._blocksToWrite)
1862 if (i == blockHeader_._blocksToWrite)
1864 endOfData = current->_offset + endBlockSize;
1866 current->_nextInChain = *pFreeList_;
1868 *pFreeList_ = (*pFreeList_)->_nextInList;
// Otherwise (presumably an else branch — braces are not shown) the next chain
// block is the block's list successor, given the offset that follows this
// block, or blockSize when this block is the last one in the buffer.
1872 current->_nextInChain = current->_nextInList;
1873 if (current->_nextInChain)
1875 if (current->_offset + blockSize < pBuffer_->getSize())
1877 current->_nextInChain->setOffset(current->_offset
1882 current->_nextInChain->setOffset(blockSize);
// With no list successor, continue the chain at block index 1.
1887 current->_nextInChain = blocks_[1].init(1, blockSize);
// Unlink the chosen block from the free list, whether it is the head or
// sits further along the list.
1889 if (current->_nextInChain == *pFreeList_)
1891 *pFreeList_ = (*pFreeList_)->_nextInList;
1896 for (Block* free = *pFreeList_; free;
1897 free = free->_nextInList)
1899 if (free->_nextInList == current->_nextInChain)
1901 free->_nextInList = free->_nextInList->_nextInList;
1908 current->_nextInList = 0;
1909 current = current->_nextInChain;
// The header now records the expanded block count.
1912 blockHeader_._blocksToWrite = blocksNeeded;
1914 current->_nextInList = 0;
1915 current->_sequence = blockHeader_._seq;
// Walk back toward first_. NOTE(review): dataBytes is declared on a line not
// shown, and the statements that move the data are also missing here.
1925 while (current != first_)
1927 size_t chunkBytesAvail = endOfData > location_
1928 ? endOfData - location_
// When fewer than dataBytes are available, endOfData is moved to the buffer
// size minus the shortfall; the later "-= dataBytes" is presumably the
// alternative branch.
1930 if (chunkBytesAvail < dataBytes)
1942 chunkBytesAvail = dataBytes - chunkBytesAvail;
1943 endOfData = pBuffer_->
getSize() - chunkBytesAvail;
1950 endOfData -= dataBytes;
// Record the offset of the next block in the chain (0 when there is none)
// and write this block's header.
1956 blockHeader_._nextInChain = (current->_nextInChain
1957 ? current->_nextInChain->_offset
1958 : (amps_uint64_t)0);
1961 pBuffer_->
putBytes((
const char*)&blockHeader_,
sizeof(BlockHeader));
1962 current = blocksUsed.top();
// Back at the first block: restore the saved CRC and point the header at the
// second block of the chain.
1973 blockHeader_._crcVal = crcVal;
1974 blockHeader_._nextInChain = first_->_nextInChain->_offset;
// Position the buffer two amps_uint32_t plus two amps_uint64_t past
// location_ and read eight amps_uint32_t worth of bytes into chainHeader.
1978 pBuffer_->
setPosition(location_ + (
sizeof(amps_uint32_t) * 2)
1979 + (
sizeof(amps_uint64_t) * 2) );
1984 BlockChainHeader chainHeader;
1985 pBuffer_->
copyBytes((
char*)&chainHeader,
1986 sizeof(amps_uint32_t) * 8);
// Write the block header followed by the chain header.
// NOTE(review): any repositioning of the buffer before these writes is on
// lines not shown; confirm.
1989 pBuffer_->
putBytes((
const char*)&blockHeader_,
sizeof(BlockHeader));
1990 pBuffer_->
putBytes((
const char*)&chainHeader,
sizeof(BlockChainHeader));
// Selects the function stored in _crc. On the visible paths it is set to
// AMPS::CRC<0>::crc when AMPS::CRC<0>::isSSE42Enabled() is true, and to
// AMPS::CRC<0>::crcNoSSE otherwise.
// NOTE(review): the conditions that use isFile (and any path that selects a
// different function) are on lines missing from this listing; confirm
// against the full source.
1993 void chooseCRC(
bool isFile)
2002 _crc = AMPS::CRC<0>::crcNoSSE;
2004 if (AMPS::CRC<0>::isSSE42Enabled())
2006 _crc = AMPS::CRC<0>::crc;
2010 _crc = AMPS::CRC<0>::crcNoSSE;
// Static function taking (const char*, size_t, amps_uint64_t) and returning
// amps_uint64_t, with all three parameters unnamed.
// NOTE(review): the body is not shown in this listing; by its name this is
// presumably a stand-in that performs no checksum work — confirm.
2015 static amps_uint64_t noOpCRC(
const char*,
size_t, amps_uint64_t)
// Block whose _sequence field is kept in step with _maxDiscarded during
// recovery.
2025 Block* _metadataBlock;
// Highest discarded sequence; recovery raises it to one less than the lowest
// recovered sequence when that is larger.
2027 amps_uint64_t _maxDiscarded;
// Last sequence seen; recovery raises it to the highest recovered sequence.
// Declared std::atomic when C++11 (or MSVC 1900+) is available, and volatile
// otherwise.
2029 #if __cplusplus >= 201103L || _MSC_VER >= 1900 2030 std::atomic<amps_uint64_t> _lastSequence;
2032 volatile amps_uint64_t _lastSequence;
// Count updated with AMPS_FETCH_ADD; recovery adds the number of recovered
// entries to it.
2035 AMPS_ATOMIC_TYPE _stored;
// Pointer-to-function type taking (const char*, size_t, amps_uint64_t) and
// returning amps_uint64_t.
2040 typedef amps_uint64_t (*CRCFunction)(
const char*, size_t, amps_uint64_t);
virtual void putUint64(amps_uint64_t ui_)=0
Put an amps_uint64_t value into the buffer at the current position and advance past it...
Abstract base class for storing published messages for an HA publisher client.
Definition: ampsplusplus.hpp:1090
void getRawCorrelationId(const char **dataptr, size_t *sizeptr) const
Retrieves the value of the CorrelationId header of self as a Field that references the underlying buf...
Definition: Message.hpp:1304
Command::Type getCommandEnum() const
Decode self's "command" field and return one of the values from Command.
Definition: Message.hpp:1228
Constants
Default constant values for BlockPublishStore.
Definition: BlockPublishStore.hpp:79
void getRawCommandId(const char **dataptr, size_t *sizeptr) const
Retrieves the value of the CommandId header of self as a Field that references the underlying buffer ...
Definition: Message.hpp:1302
Abstract base class for replaying a publish message.
Definition: ampsplusplus.hpp:1062
virtual void flush(long timeout_)
Method to wait for the Store to discard everything that has been stored up to the point in time when ...
Definition: BlockPublishStore.hpp:656
void replay(StoreReplayer &replayer_)
Replay all messages in the Store onto the given StoreReplayer.
Definition: BlockPublishStore.hpp:561
void getRawFilter(const char **dataptr, size_t *sizeptr) const
Retrieves the value of the Filter header of self as a Field that references the underlying buffer man...
Definition: Message.hpp:1306
AMPSDLL amps_uint64_t amps_message_get_field_uint64(amps_handle message, FieldId field)
Gets the unsigned 64-bit int value of a header field in an AMPS message.
Message encapsulates a single message sent to or received from an AMPS server, and provides methods f...
Definition: Message.hpp:531
virtual void setPosition(size_t position_)=0
Set the buffer position to a location.
Field getOptions() const
Retrieves the value of the Options header of the Message as a new Field.
Definition: Message.hpp:1316
static amps_uint32_t getBlockHeaderSize()
Block header is number of blocks, total length, sequence number, crc, next in chain offset...
Definition: BlockPublishStore.hpp:153
const char * data() const
Returns the (non-null-terminated) data underlying this field.
Definition: Field.hpp:260
amps_uint32_t getBlockDataSize()
Return the size left in a block for data when it has a header in it.
Definition: BlockPublishStore.hpp:179
virtual size_t getSize() const =0
Get the current size of the Buffer in bytes.
virtual void putBytes(const char *data_, size_t dataLength_)=0
Put the given length of bytes in data into the buffer at the current position and advance past them...
virtual void execute(Message &message_)=0
Called by implementations of Store to replay a message from the store.
void getRawBookmark(const char **dataptr, size_t *sizeptr) const
Retrieves the value of the Bookmark header of self as a Field that references the underlying buffer m...
Definition: Message.hpp:1194
bool replaySingle(StoreReplayer &replayer_, amps_uint64_t index_)
Replay one message in the Store onto the given StoreReplayer.
Definition: BlockPublishStore.hpp:593
amps_uint64_t store(const Message &message_, bool assignSequence_)
Store a given message that will be delivered to AMPS.
Definition: BlockPublishStore.hpp:266
Used as a base class for other stores in the AMPS C++ client, this is an implementation that breaks a...
Definition: BlockStore.hpp:58
amps_uint64_t getLastPersisted(void)
Get the last persisted sequence number.
Definition: BlockPublishStore.hpp:711
Core type, function, and class declarations for the AMPS C++ client.
size_t unpersistedCount() const
Method to return the count of messages that are currently in the Store because they have not been discard...
Definition: BlockPublishStore.hpp:642
Provides AMPS::Buffer, an abstract base class used by the store implementations in the AMPS client...
size_t len() const
Returns the length of the data underlying this field.
Definition: Field.hpp:267
virtual void setResizeHandler(PublishStoreResizeHandler handler_, void *userData_)
Set a handler to be called if the Store needs to resize in order to keep storing messages.
Definition: ampsplusplus.hpp:1184
virtual void putUint32(amps_uint32_t i_)=0
Put an unsigned 32-bit int value into the buffer at the current position and advance past the end of ...
void getRawSowKeys(const char **dataptr, size_t *sizeptr) const
Retrieves the value of the SowKeys header of self as a Field that references the underlying buffer ma...
Definition: Message.hpp:1425
Used as a base class for other stores in the AMPS C++ client, this is an implementation of StoreImpl ...
Definition: BlockPublishStore.hpp:60
Abstract base class for implementing a buffer to be used by a StoreImpl for storage of publish messag...
Definition: Buffer.hpp:40
virtual ~BlockPublishStore()
Destructor that cleans up the buffer and other associated memory.
Definition: BlockPublishStore.hpp:241
void getRawExpiration(const char **dataptr, size_t *sizeptr) const
Retrieves the value of the Expiration header of self as a Field that references the underlying buffer...
Definition: Message.hpp:1305
static amps_uint32_t getBlockChainHeaderSize()
Block chain header is operation, command id length, correlation id length, expiration length...
Definition: BlockPublishStore.hpp:163
virtual amps_uint64_t getUint64()=0
Get an unsigned 64-bit int value at the current buffer position and advance past it.
void getRawTopic(const char **dataptr, size_t *sizeptr) const
Retrieves the value of the Topic header of self as a Field that references the underlying buffer mana...
Definition: Message.hpp:1449
Used as metadata for each block in a Buffer.
Definition: BlockStore.hpp:72
Field represents the value of a single field in a Message.
Definition: Field.hpp:86
virtual amps_uint32_t getUint32()=0
Get the unsigned 32-bit int value at the current buffer position and advance past it...
virtual amps_uint64_t store(const Message &message_)
Store a given message that will be delivered to AMPS.
Definition: BlockPublishStore.hpp:251
BlockPublishStore(Buffer *buffer_, amps_uint32_t blocksPerRealloc_=1000, bool isFile_=false, bool errorOnPublishGap_=false, amps_uint32_t blockSize_=DEFAULT_BLOCK_SIZE)
Create a BlockPublishStore using buffer_, that grows by blocksPerRealloc_ blocks when it must grow...
Definition: BlockPublishStore.hpp:199
amps_uint32_t getBlockSize()
Return the size of a block, as reported by the underlying BlockStore.
Definition: BlockPublishStore.hpp:171
void getRawSowKey(const char **dataptr, size_t *sizeptr) const
Retrieves the value of the SowKey header of self as a Field that references the underlying buffer man...
Definition: Message.hpp:1424
unsigned getAckTypeEnum() const
Decode self's "ack type" field and return the corresponding bitmask of values from AckType...
Definition: Message.hpp:1160
amps_uint64_t getLowestUnpersisted() const
Get the oldest unpersisted message sequence in the store.
Definition: BlockPublishStore.hpp:699
Definition: ampsplusplus.hpp:103
static amps_uint64_t getUnsetSequence()
Method to return the value used to represent no such sequence.
Definition: ampsplusplus.hpp:1160
virtual void copyBytes(char *buffer_, size_t numBytes_)=0
Copy the given number of bytes from this buffer to the given buffer.
virtual void discardUpTo(amps_uint64_t index_)
Remove all messages with an index up to and including index_.
Definition: BlockPublishStore.hpp:498
Provides AMPS::BlockStore, a class for storing Blocks of a fixed size into a Buffer implementation...
virtual ByteArray getBytes(size_t numBytes_)=0
Get the given number of bytes from the buffer.