26 #ifndef _BLOCKPUBLISHSTORE_H_ 27 #define _BLOCKPUBLISHSTORE_H_ 35 #include <amps/ampscrc.hpp> 36 #if __cplusplus >= 201103L || _MSC_VER >= 1900 42 #include <sys/timeb.h> 64 typedef Lock<BlockStore> BufferLock;
65 typedef Unlock<BlockStore> BufferUnlock;
// Bit flags recorded in BlockChainHeader::_flag. store() selects one of
// these for SOWDelete commands, and replayOnto() uses the stored value to
// choose which message field (data, filter, sow keys or bookmark) receives
// the stored payload when the message is rebuilt. Each value is a distinct bit.
69 SOW_DELETE_DATA = 0x01,
70 SOW_DELETE_FILTER = 0x02,
71 SOW_DELETE_KEYS = 0x04,
72 SOW_DELETE_BOOKMARK = 0x08,
// Set by store() after scanning the message options; on replay the payload
// is assigned as the bookmark and the options are set to AMPS_OPTIONS_CANCEL.
73 SOW_DELETE_BOOKMARK_CANCEL = 0x10,
// NOTE(review): no use of this value is visible in this part of the file.
74 SOW_DELETE_UNKNOWN = 0x80
// Default layout constants for the store.
// Header size handed to the block store by the constructor and returned by
// the block-header-size accessor.
81 DEFAULT_BLOCK_HEADER_SIZE = 32,
// Size returned by the chain-header-size accessor; presumably matches
// sizeof(BlockChainHeader) -- TODO confirm.
82 DEFAULT_BLOCK_CHAIN_HEADER_SIZE = 64,
// NOTE(review): the constructor's blocksPerRealloc_ default is the literal
// 1000 rather than this constant; keep the two in sync.
83 DEFAULT_BLOCKS_PER_REALLOC = 1000,
// Default for the constructor's blockSize_ argument.
84 DEFAULT_BLOCK_SIZE = 2048
// Per-block header fields; the structure is written to and read from the
// buffer as raw bytes (putBytes/copyBytes with sizeof(BlockHeader)).
// Number of blocks in the chain holding one message. recover() reads this
// field of the first (metadata) block as the store's block size.
124 amps_uint32_t _blocksToWrite;
// Total bytes of chain header, message fields and data still to be stored
// or read. recover() reads this field of the metadata block as the block
// header size.
125 amps_uint32_t _totalRemaining;
// CRC accumulated by store() over command id, correlation id, expiration,
// sow key, topic and data, in that order; replayOnto() recomputes and
// compares it.
127 amps_uint64_t _crcVal;
// Buffer offset of the next block in this message's chain, or 0 for the last.
128 amps_uint64_t _nextInChain;
// Header describing one stored message: the command type, the byte length of
// each variable-length field that follows, the ack types and the SOW delete
// flag. It is written as raw bytes by store() and read back by replayOnto().
131 struct BlockChainHeader
// Command type of the stored message (a Message::Command value).
133 amps_uint32_t _operation;
// Byte lengths of the fields, listed in the order they are written.
134 amps_uint32_t _commandIdLen;
135 amps_uint32_t _correlationIdLen;
136 amps_uint32_t _expirationLen;
137 amps_uint32_t _sowKeyLen;
138 amps_uint32_t _topicLen;
// Ack types of the message; replayOnto() ORs in Message::AckType::Persisted.
140 amps_uint32_t _ackTypes;
// Padding; presumably reserved for future fields -- TODO confirm.
141 amps_uint32_t _unused[8];
// Default construction zeroes the operation and lengths and sets _flag to -1,
// the value replayOnto() treats like SOW_DELETE_DATA (payload is message data).
143 : _operation(0), _commandIdLen(0), _correlationIdLen(0)
144 , _expirationLen(0), _sowKeyLen(0), _topicLen(0), _flag(-1)
// Returns the fixed per-block header size constant.
155 return DEFAULT_BLOCK_HEADER_SIZE;
// Returns the fixed chain header size constant.
165 return DEFAULT_BLOCK_CHAIN_HEADER_SIZE;
// Returns the block size currently configured on the underlying block store.
173 return _blockStore.getBlockSize();
// Constructor (tail of the parameter list, initializers and body).
// blocksPerRealloc_ and blockSize_ are passed to the block store; isFile_ and
// errorOnPublishGap_ both default to false.
200 amps_uint32_t blocksPerRealloc_ = 1000,
201 bool isFile_ =
false,
202 bool errorOnPublishGap_ =
false,
203 amps_uint32_t blockSize_ = DEFAULT_BLOCK_SIZE)
// The block size passed on is never smaller than twice the block header size:
// smaller requests are replaced by DEFAULT_BLOCK_HEADER_SIZE * 2.
205 , _blockStore(buffer_, blocksPerRealloc_,
206 DEFAULT_BLOCK_HEADER_SIZE,
207 (blockSize_ > DEFAULT_BLOCK_HEADER_SIZE * 2
208 ? blockSize_ : DEFAULT_BLOCK_HEADER_SIZE * 2))
// Nothing discarded yet; sequence counter starts at 1.
210 , _maxDiscarded((amps_uint64_t)0), _lastSequence((amps_uint64_t)1)
// Hold the block store lock while the initial layout is set up.
218 BufferLock bufferGuard(_blockStore);
// Take one block for store metadata and leave the used list empty.
220 _metadataBlock = _blockStore.get(1);
222 _blockStore.setUsedList(0);
223 _blockStore.setEndOfUsedList(0);
226 _metadataBlock->_sequence = (amps_uint64_t)0;
227 Buffer* pBuffer = _blockStore.getBuffer();
// Record the client version, parsed to an integer, in the buffer.
233 pBuffer->
putUint64((amps_uint64_t)VersionInfo::parseVersion(AMPS_CLIENT_VERSION));
// Convenience overload: forwards to the two-argument store() with the second
// argument set to true and returns its result.
253 return store(message_,
true);
// Stores one message: builds the block and chain headers, computes a CRC over
// the message fields, takes a chain of blocks from the block store and writes
// headers and fields across that chain. Returns the sequence number recorded
// in the block header.
268 const char* commandId, *correlationId, *expiration, *sowKey,
271 BlockHeader blockHeader;
272 BlockChainHeader chainHeader;
// Record the byte length of each field; dataLen is reused as the scratch
// length variable for every field.
274 chainHeader._commandIdLen = (amps_uint32_t)dataLen;
276 chainHeader._correlationIdLen = (amps_uint32_t)dataLen;
278 chainHeader._expirationLen = (amps_uint32_t)dataLen;
280 chainHeader._sowKeyLen = (amps_uint32_t)dataLen;
282 chainHeader._topicLen = (amps_uint32_t)dataLen;
// After this call data/dataLen describe the message payload.
283 message_.getRawData(&data, &dataLen);
// -1 means "payload is ordinary message data" (see replayOnto()).
284 chainHeader._flag = -1;
286 chainHeader._operation = (amps_uint32_t)operation;
// SOW deletes are tagged with a flag naming the kind of payload stored.
287 if (operation == Message::Command::SOWDelete)
291 chainHeader._flag = SOW_DELETE_DATA;
298 chainHeader._flag = SOW_DELETE_FILTER;
305 chainHeader._flag = SOW_DELETE_KEYS;
310 chainHeader._flag = SOW_DELETE_BOOKMARK;
// Scan the options with memchr/strncmp to detect a bookmark delete that
// should be stored as SOW_DELETE_BOOKMARK_CANCEL.
313 size_t remaining = options.
len();
314 const void* next = NULL;
315 const void* start = (
const void*)(options.
data());
317 while (remaining >= 6 &&
318 (next = memchr(start, (
int)
'c', remaining)) != NULL)
// NOTE(review): remaining becomes the distance from start to the match, and
// the comparison below is made at start rather than at next -- confirm the
// option is still detected when it is not the first thing in the options.
320 remaining = (size_t)next - (
size_t)start;
321 if (remaining >= 6 && strncmp((
const char*)start,
324 chainHeader._flag = SOW_DELETE_BOOKMARK_CANCEL;
// Total bytes to store: the field lengths plus the payload length (the
// Unknown operation is special-cased).
332 blockHeader._totalRemaining = (
333 (chainHeader._operation == Message::Command::Unknown)
336 + chainHeader._commandIdLen
337 + chainHeader._correlationIdLen
338 + chainHeader._expirationLen
339 + chainHeader._sowKeyLen
340 + chainHeader._topicLen
341 + (amps_uint32_t)dataLen));
// A partially filled last block adds one more block to the chain.
342 size_t lastBlockLength = ((operation == Message::Command::Unknown) ? 0 :
344 blockHeader._blocksToWrite = ((operation == Message::Command::Unknown)
346 : ((amps_uint32_t)(blockHeader._totalRemaining
348 + ((lastBlockLength > 0) ? 1 : 0)));
// CRC is chained across the fields in the same order replayOnto() reads them.
349 blockHeader._crcVal = (amps_uint64_t)0ULL;
350 blockHeader._crcVal = _crc(commandId,
351 chainHeader._commandIdLen,
352 blockHeader._crcVal);
353 blockHeader._crcVal = _crc(correlationId,
354 chainHeader._correlationIdLen,
355 blockHeader._crcVal);
356 blockHeader._crcVal = _crc(expiration,
357 chainHeader._expirationLen,
358 blockHeader._crcVal);
359 blockHeader._crcVal = _crc(sowKey,
360 chainHeader._sowKeyLen,
361 blockHeader._crcVal);
362 blockHeader._crcVal = _crc(topic,
363 chainHeader._topicLen,
364 blockHeader._crcVal);
365 blockHeader._crcVal = _crc(data, dataLen, blockHeader._crcVal);
// Lock the block store and obtain the chain of blocks for this message.
368 BufferLock bufferGuard(_blockStore);
369 Block* first = _blockStore.get(blockHeader._blocksToWrite);
// Choose the sequence number and keep _lastSequence / _maxDiscarded
// consistent with it.
372 if (_lastSequence <= 2)
376 blockHeader._seq = ++_lastSequence;
381 message_.getMessage(),
385 _maxDiscarded = blockHeader._seq - 1;
387 if (blockHeader._seq >= _lastSequence)
389 _lastSequence = blockHeader._seq;
// Per-field progress counters, so a field can continue in the next block.
395 size_t topicWritten = 0UL;
396 size_t dataWritten = 0UL;
397 size_t commandWritten = 0UL;
398 size_t correlationWritten = 0UL;
399 size_t expirationWritten = 0UL;
400 size_t sowKeyWritten = 0UL;
401 Buffer* pBuffer = _blockStore.getBuffer();
// Walk the chain: every block gets a block header whose _nextInChain is the
// following block's offset, or 0 for the last block.
402 for (Block* next = first; next; next = next->_nextInChain)
404 next->_sequence = blockHeader._seq;
405 if (next->_nextInChain)
407 blockHeader._nextInChain = next->_nextInChain->_offset;
411 blockHeader._nextInChain = (amps_uint64_t)0;
415 pBuffer->
putBytes((
const char*)&blockHeader,
sizeof(BlockHeader));
// The CRC is cleared once the header has been written.
417 blockHeader._crcVal = (amps_uint64_t)0;
423 pBuffer->
putBytes((
const char*)&chainHeader,
424 sizeof(BlockChainHeader));
// Write as much of each field as fits in the space left in this block.
// NOTE(review): unlike the later fields, the command id and correlation id
// checks have no bytesRemaining > 0 guard; with no space left the computed
// write length is simply 0.
433 if (commandWritten < chainHeader._commandIdLen)
435 size_t commandWrite = (chainHeader._commandIdLen - commandWritten < bytesRemaining) ? chainHeader._commandIdLen - commandWritten : bytesRemaining;
436 pBuffer->
putBytes(commandId + commandWritten,
438 bytesRemaining -= commandWrite;
439 commandWritten += commandWrite;
441 if (correlationWritten < chainHeader._correlationIdLen)
443 size_t correlationWrite = (chainHeader._correlationIdLen - correlationWritten < bytesRemaining) ? chainHeader._correlationIdLen - correlationWritten : bytesRemaining;
444 pBuffer->
putBytes(correlationId + correlationWritten,
446 bytesRemaining -= correlationWrite;
447 correlationWritten += correlationWrite;
449 if (bytesRemaining > 0 && expirationWritten < chainHeader._expirationLen)
451 size_t expWrite = (chainHeader._expirationLen - expirationWritten < bytesRemaining) ? chainHeader._expirationLen - expirationWritten : bytesRemaining;
452 pBuffer->
putBytes(expiration + expirationWritten, expWrite);
453 bytesRemaining -= expWrite;
454 expirationWritten += expWrite;
456 if (bytesRemaining > 0 && sowKeyWritten < chainHeader._sowKeyLen)
458 size_t sowKeyWrite = (chainHeader._sowKeyLen - sowKeyWritten < bytesRemaining) ? chainHeader._sowKeyLen - sowKeyWritten : bytesRemaining;
459 pBuffer->
putBytes(sowKey + sowKeyWritten, sowKeyWrite);
460 bytesRemaining -= sowKeyWrite;
461 sowKeyWritten += sowKeyWrite;
463 if (bytesRemaining > 0 && topicWritten < chainHeader._topicLen)
465 size_t topicWrite = (chainHeader._topicLen - topicWritten
467 ? chainHeader._topicLen - topicWritten
469 pBuffer->
putBytes(topic + topicWritten, topicWrite);
470 bytesRemaining -= topicWrite;
471 topicWritten += topicWrite;
473 if (bytesRemaining > 0 && dataWritten < dataLen)
475 size_t dataWrite = (dataLen - dataWritten < bytesRemaining) ?
476 dataLen - dataWritten : bytesRemaining;
477 pBuffer->
putBytes(data + dataWritten, dataWrite);
478 bytesRemaining -= dataWrite;
479 dataWritten += dataWrite;
// On an AMPS exception, hand the chain back to the block store.
483 catch (
const AMPSException&)
485 _blockStore.put(first);
// Count the stored message and report its sequence number.
488 AMPS_FETCH_ADD(&_stored, 1);
489 return blockHeader._seq;
// Discards stored messages up to and including sequence index_ and updates
// the persisted sequence kept in the metadata block.
501 BufferLock bufferGuard(_blockStore);
502 Buffer* pBuffer = _blockStore.getBuffer();
505 amps_uint64_t lastPersisted = _metadataBlock->_sequence;
// Nothing to release: index_ is 0, the store holds no blocks, or index_ is
// not beyond what was already discarded.
507 if (index_ == (amps_uint64_t)0 || !_blockStore.front() || index_ <= _maxDiscarded)
// index_ is ahead of the recorded value: advance the metadata sequence and
// raise the counters so they are not behind index_.
511 if (lastPersisted < index_)
515 _metadataBlock->_sequence = index_;
516 if (_maxDiscarded < index_)
518 _maxDiscarded = index_;
520 if (_lastSequence <= index_)
522 _lastSequence = index_;
// index_ is behind the recorded value and gap errors are enabled: report it.
529 else if (getErrorOnPublishGap() && index_ < lastPersisted)
531 std::ostringstream os;
532 os <<
"Server last saw " << index_ <<
" from Client but Store " 533 <<
"has already discarded up to " << lastPersisted
534 <<
" which would leave a gap of unreceived messages.";
535 throw PublishStoreGapException(os.str());
// Wake any threads waiting on the block store.
537 _blockStore.signalAll();
// Normal path: release blocks through index_; the value returned by put() is
// subtracted from the stored-message counter.
541 _maxDiscarded = index_;
542 AMPS_FETCH_SUB(&_stored, _blockStore.put(index_));
543 _blockStore.signalAll();
544 if (lastPersisted >= index_)
550 _metadataBlock->_sequence = index_;
551 if (_lastSequence < index_)
553 _lastSequence = index_;
// Replays every stored message, from the front of the used list onward,
// onto replayer_ while holding the block store lock.
564 BufferLock bufferGuard(_blockStore);
// The store holds no blocks to replay.
566 if (!_blockStore.front())
570 Block* next = _blockStore.front();
573 for (Block* block = _blockStore.front(); block; block = next)
576 replayOnto(block, replayer_);
// next is only advanced after a successful replay, so in the handler below
// it still refers to the block whose replay threw.
577 next = block->_nextInList;
580 catch (
const StoreException&)
// Hand the blocks from the failing one onward back to the block store.
582 _blockStore.putAll(next);
// Replays the single stored message whose sequence equals index_, if it lies
// within the range currently held by the store.
595 BufferLock bufferGuard(_blockStore);
// The store holds no blocks.
597 if (!_blockStore.front())
// Highest and lowest sequences currently stored.
602 amps_uint64_t lastIdx = _blockStore.back()->_sequence;
604 amps_uint64_t leastIdx = _blockStore.front()->_sequence;
605 if (index_ >= leastIdx && index_ <= lastIdx)
// Linear search of the used list for the block carrying index_.
607 Block* block = _blockStore.front();
608 while (block && block->_sequence != index_)
610 block = block->_nextInList;
617 Buffer* pBuffer = _blockStore.getBuffer();
619 sizeof(amps_uint32_t));
624 replayOnto(block, replayer_);
// Out-of-range path: the reusable message is given the lowest stored sequence.
631 _message.setSequence(leastIdx);
// Snapshot of the _stored counter, converted to size_t.
644 size_t count = (size_t)_stored;
// Blocks until every message that was stored when the call began has been
// discarded from the store, or throws TimedOutException on the timed path.
658 BufferLock bufferGuard(_blockStore);
// Wait target: the highest sequence stored at the time of the call.
659 amps_uint64_t waitFor = _getHighestUnpersisted();
// Timed path: wait on the block store until the lowest stored sequence has
// passed the target, the store is empty, or the timer expires.
667 bool timedOut =
false;
668 AMPS_START_TIMER(timeout_);
670 while (!timedOut && _stored != 0
671 && waitFor >= _getLowestUnpersisted())
673 if (!_blockStore.wait(timeout_))
676 AMPS_RESET_TIMER(timedOut, timeout_);
// Work still outstanding after the timer expired: report the timeout.
680 if (timedOut && _stored != 0
681 && waitFor >= _getLowestUnpersisted())
683 throw TimedOutException(
"Timed out waiting to flush publish store.");
// Untimed path: wait in slices of 1000 (presumably milliseconds -- TODO
// confirm) and call the waiting function between slices.
688 while (_stored != 0 && waitFor >= _getLowestUnpersisted())
691 _blockStore.wait(1000);
// The lock is presumably released for the duration of the waiting function
// by the BufferUnlock guard -- TODO confirm.
693 BufferUnlock unlck(_blockStore);
694 amps_invoke_waiting_function();
// Locked wrapper: takes the block store lock, then returns the result of
// _getLowestUnpersisted().
701 BufferLock bufferGuard(_blockStore);
702 return _getLowestUnpersisted();
// Locked wrapper: takes the block store lock, then returns the result of
// _getHighestUnpersisted().
705 amps_uint64_t getHighestUnpersisted()
const 707 BufferLock bufferGuard(_blockStore);
708 return _getHighestUnpersisted();
// Locked wrapper: takes the block store lock, then returns the result of
// _getLastPersisted().
713 BufferLock bufferGuard(_blockStore);
714 return _getLastPersisted();
// Static callback adapter for resize requests: forwards requestedSize_ to
// callResizeHandler() on the store instance and returns its answer.
// NOTE(review): `me` is presumably obtained by casting vpThis_ -- confirm.
718 static bool canResize(
size_t requestedSize_,
void* vpThis_)
721 return me->callResizeHandler(requestedSize_);
// Returns the sequence of the block at the front of the used list; an empty
// store is handled separately. Takes no lock itself: the public wrappers
// above acquire the block store lock before calling it.
724 amps_uint64_t _getLowestUnpersisted()
const 728 if (!_blockStore.front())
732 return _blockStore.front()->_sequence;
// Returns the sequence of the block at the back of the used list; an empty
// store is handled separately. Takes no lock itself: the public wrappers
// above acquire the block store lock before calling it.
735 amps_uint64_t _getHighestUnpersisted()
const 739 if (!_blockStore.back())
743 return _blockStore.back()->_sequence;
// Determines the last persisted sequence number, records it in the metadata
// block and returns it. Takes no lock itself; getLastPersisted() acquires
// the block store lock before calling it.
746 amps_uint64_t _getLastPersisted(
void)
749 amps_uint64_t lastPersisted = (amps_uint64_t)0;
750 Buffer* pBuffer = _blockStore.getBuffer();
// Early return path: raise _lastSequence to the value being returned if it
// is behind.
755 if (_lastSequence < lastPersisted)
757 _lastSequence = lastPersisted;
759 return lastPersisted;
// Otherwise start from the highest discarded sequence.
763 lastPersisted = _maxDiscarded;
// Time-based value: current time in milliseconds multiplied by 1,000,000.
// Two platform variants compute the same quantity (timeb fields here,
// gettimeofday below).
770 lastPersisted = (t.time * 1000 + t.millitm) * (amps_uint64_t)1000000;
773 gettimeofday(&tv, NULL);
774 lastPersisted = (amps_uint64_t)((tv.tv_sec * 1000) + (tv.tv_usec / 1000))
775 * (amps_uint64_t)1000000;
// Reconcile with the sequences of the messages currently held in the store.
778 if (_lastSequence > 2)
780 amps_uint64_t low = _getLowestUnpersisted();
781 amps_uint64_t high = _getHighestUnpersisted();
784 lastPersisted = low - 1;
788 _lastSequence = high;
790 if (_lastSequence < lastPersisted)
792 lastPersisted = _lastSequence - 1;
797 _lastSequence = lastPersisted;
// Offset skips two 32-bit fields, presumably to reach the sequence slot of
// the metadata block -- TODO confirm.
800 +
sizeof(amps_uint32_t)
801 +
sizeof(amps_uint32_t));
803 _metadataBlock->_sequence = lastPersisted;
804 return lastPersisted;
// Rebuilds the in-memory block lists from the contents of the buffer: reads
// the metadata block, walks every block-sized slot, links recovered message
// chains into the used list in sequence order and the rest into the free list.
809 BufferLock bufferGuard(_blockStore);
811 Buffer* pBuffer = _blockStore.getBuffer();
812 size_t size = pBuffer->
getSize();
// Initialisation of a fresh metadata block, including the client version.
817 _metadataBlock = _blockStore.get(1);
818 _metadataBlock->_sequence = (amps_uint64_t)0;
826 pBuffer->
putUint64((amps_uint64_t)VersionInfo::parseVersion(AMPS_CLIENT_VERSION));
// Work in whole blocks; a buffer that is not a multiple of the block size is
// handled before the block count is settled.
831 size_t numBlocks = size / blockSize;
832 if (size % blockSize > 0)
837 numBlocks = size / blockSize;
839 amps_uint32_t blockCount = 0;
// The array returned by resizeBuffer() is released immediately; a fresh
// Block array is allocated below.
841 delete[] _blockStore.resizeBuffer(numBlocks * blockSize, &blockCount);
844 if (size > pBuffer->
getSize() || numBlocks != (size_t)blockCount)
846 throw StoreException(
"Publish Store could not resize correctly during recovery, possibly due to resizeHandler refusing the request.");
// NOTE(review): minIdx starts at 0 and, in the lines visible here, is only
// updated when it is greater than a recovered sequence -- confirm it is
// seeded elsewhere, otherwise it can never change.
851 amps_uint64_t maxIdx = 0;
852 amps_uint64_t minIdx = 0;
854 BlockHeader blockHeader;
// One Block per slot in the buffer; the first one is the metadata block.
856 Block* blocks =
new Block[numBlocks];
857 blocks[numBlocks - 1]._nextInList = 0;
859 _blockStore.addBlocks(blocks);
860 _metadataBlock = blocks;
861 _metadataBlock->_nextInList = 0;
863 pBuffer->
copyBytes((
char*)&blockHeader,
sizeof(BlockHeader));
// Format detection based on the metadata block's header fields.
871 if (blockHeader._blocksToWrite == 1)
881 if (blockHeader._totalRemaining >= 5000000)
885 recoverOldFormat(blocks);
889 throw StoreException(
"Unrecognized format for Store. Can't recover.");
// For the metadata block these two header fields carry the block size and
// the block header size respectively.
891 if (blockHeader._blocksToWrite == 0)
898 blockSize = blockHeader._blocksToWrite;
899 _blockStore.setBlockSize(blockSize);
901 if (blockHeader._totalRemaining == 0)
908 _blockStore.setBlockHeaderSize(blockHeader._totalRemaining);
910 _metadataBlock->_sequence = blockHeader._seq;
// A non-zero stored sequence below 1,000,000 is reset to 0.
911 if (_metadataBlock->_sequence
912 && _metadataBlock->_sequence < (amps_uint64_t)1000000)
915 +
sizeof(amps_uint32_t)
916 +
sizeof(amps_uint32_t));
918 _metadataBlock->_sequence = 0;
923 _maxDiscarded = _metadataBlock->_sequence;
924 _lastSequence = _maxDiscarded;
// Scan the remaining slots, one block at a time.
928 location += blockSize;
929 amps_uint32_t freeCount = 0;
930 Block* firstFree = NULL;
931 Block* endOfFreeList = NULL;
// Recovered chain heads keyed by sequence, so iteration is in sequence order.
933 typedef std::map<amps_uint64_t, Block*> RecoverMap;
934 RecoverMap recoveredBlocks;
935 while (location < size)
939 pBuffer->
copyBytes((
char*)&blockHeader,
sizeof(BlockHeader));
// A plausible header with no CRC, or with a sequence that was already
// recovered, is passed over here rather than recovered as a chain head.
940 if ((blockHeader._seq > 0 && blockHeader._totalRemaining < size) &&
941 (!blockHeader._crcVal || recoveredBlocks.count(blockHeader._seq)))
944 location += blockSize;
947 Block* block = blocks[++blockNum].setOffset(location);
948 bool recovered =
false;
949 if (blockHeader._seq > 0 && blockHeader._totalRemaining < size)
952 block->_sequence = blockHeader._seq;
954 if (maxIdx < blockHeader._seq)
956 maxIdx = blockHeader._seq;
958 if (minIdx > blockHeader._seq)
960 minIdx = blockHeader._seq;
963 recoveredBlocks[blockHeader._seq] = block;
// Follow the chain by reading each block's _nextInChain offset, which sits
// after two 32-bit and two 64-bit header fields.
965 while (blockHeader._nextInChain != (amps_uint64_t)0)
967 Block* chain = blocks[++blockNum]
968 .setOffset((
size_t)blockHeader._nextInChain);
969 chain->_nextInList = 0;
970 pBuffer->
setPosition((
size_t)blockHeader._nextInChain
971 +
sizeof(amps_uint32_t)
972 +
sizeof(amps_uint32_t)
973 +
sizeof(amps_uint64_t)
974 +
sizeof(amps_uint64_t));
975 blockHeader._nextInChain = pBuffer->
getUint64();
976 block->_nextInChain = chain;
978 block->_sequence = blockHeader._seq;
// Slots that hold no message are appended to the free list.
987 endOfFreeList->_nextInList = block;
993 endOfFreeList = block;
996 location += blockSize;
1000 endOfFreeList->_nextInList = 0;
1002 _blockStore.setFreeList(firstFree, freeCount);
// Bring the counters in line with the sequences found.
1003 if (maxIdx > _lastSequence)
1005 _lastSequence = maxIdx;
1007 if (minIdx > _maxDiscarded + 1)
1009 _maxDiscarded = minIdx - 1;
1011 if (_maxDiscarded > _metadataBlock->_sequence)
1013 _metadataBlock->_sequence = _maxDiscarded;
1014 pBuffer->
setPosition(_metadataBlock->_offset + 8);
// Count the recovered messages and link their heads into the used list.
1018 AMPS_FETCH_ADD(&_stored, (
long)(recoveredBlocks.size()));
1019 for (RecoverMap::iterator i = recoveredBlocks.begin();
1020 i != recoveredBlocks.end(); ++i)
1024 end->_nextInList = i->second;
1028 _blockStore.setUsedList(i->second);
1034 end->_nextInList = 0;
1036 _blockStore.setEndOfUsedList(end);
// Rebuilds the message stored in the chain starting at block_, verifies its
// CRC and sequence, and throws StoreException if the stored lengths, CRC or
// sequence are inconsistent. Fields that fit in the current block are
// assigned straight from the buffer; fields that span blocks are first
// copied into temporary buffers, which are freed before returning or throwing.
1044 size_t start = block_->_offset;
1045 size_t position = start;
1046 Buffer* pBuffer = _blockStore.getBuffer();
// Read the block header of the first block in the chain.
1048 BlockHeader blockHeader;
1049 pBuffer->
copyBytes((
char*)&blockHeader,
sizeof(BlockHeader));
// A block with nothing remaining is handled separately.
1050 if (blockHeader._totalRemaining == 0)
// Read the chain header that describes the message fields.
1056 BlockChainHeader blockChainHeader;
1057 pBuffer->
copyBytes((
char*)&blockChainHeader,
sizeof(blockChainHeader));
1058 if (blockChainHeader._operation == Message::Command::Unknown)
1063 blockChainHeader._ackTypes |= Message::AckType::Persisted;
// Sanity check: the field lengths may not exceed the bytes remaining.
1068 if (blockHeader._totalRemaining
1069 < blockChainHeader._commandIdLen
1070 + blockChainHeader._correlationIdLen
1071 + blockChainHeader._expirationLen
1072 + blockChainHeader._sowKeyLen
1073 + blockChainHeader._topicLen)
1075 std::ostringstream os;
1076 os <<
"Corrupted message found with invalid lengths. " 1077 <<
"Attempting to replay " << block_->_sequence
1078 <<
". Block sequence " << blockHeader._seq
1079 <<
", topic length " << blockChainHeader._topicLen
1080 <<
", data length " << blockHeader._totalRemaining
1081 <<
", command ID length " << blockChainHeader._commandIdLen
1082 <<
", correlation ID length " << blockChainHeader._correlationIdLen
1083 <<
", expiration length " << blockChainHeader._expirationLen
1084 <<
", sow key length " << blockChainHeader._sowKeyLen
1085 <<
", start " << start
1086 <<
", position " << position
1087 <<
", buffer size " << pBuffer->
getSize();
1088 throw StoreException(os.str());
// Restore command type, ack types (always including Persisted) and sequence
// on the reusable message object.
1093 _message.setCommandEnum((Message::Command::Type)blockChainHeader._operation);
1094 _message.setAckTypeEnum((
unsigned)blockChainHeader._ackTypes
1095 | Message::AckType::Persisted);
1096 _message.setSequence(blockHeader._seq);
1098 Block* current = block_;
// CRC recomputed over the fields in the order store() wrote them.
1100 amps_uint64_t crcCalc = (amps_uint64_t)0ULL;
// Pointer slots for fields that span blocks; only allocated when the chain
// has more than one block.
1102 char** tmpBuffers = (blockHeader._blocksToWrite > 1) ?
new char* [blockHeader._blocksToWrite - 1] : 0;
1103 size_t blockNum = 0;
// --- Command id ---
1104 if (blockChainHeader._commandIdLen > 0)
1106 if (blockChainHeader._commandIdLen <= blockBytesRemaining)
1108 _message.assignCommandId(pBuffer->
getBytes(blockChainHeader._commandIdLen)._data,
1109 blockChainHeader._commandIdLen);
1110 blockBytesRemaining -= blockChainHeader._commandIdLen;
// Spans blocks: gather the pieces into a temporary buffer.
1114 tmpBuffers[blockNum] =
new char[blockChainHeader._commandIdLen];
1115 size_t totalLeft = blockChainHeader._commandIdLen;
1116 size_t totalRead = 0;
1120 readLen = blockBytesRemaining < totalLeft ?
1121 blockBytesRemaining : totalLeft;
1122 pBuffer->
copyBytes(tmpBuffers[blockNum] + totalRead, readLen);
1123 if (!(totalLeft -= readLen))
1127 if (!(current = current->_nextInChain))
1131 totalRead += readLen;
1136 blockBytesRemaining -= readLen;
1137 _message.assignCommandId(tmpBuffers[blockNum++], blockChainHeader._commandIdLen);
1139 blockHeader._totalRemaining -= blockChainHeader._commandIdLen;
1140 crcCalc = _crc(_message.getCommandId().data(),
1141 blockChainHeader._commandIdLen, crcCalc);
// --- Correlation id ---
1143 if (blockChainHeader._correlationIdLen > 0)
1145 if (blockChainHeader._correlationIdLen <= blockBytesRemaining)
1147 _message.assignCorrelationId(
1148 pBuffer->
getBytes(blockChainHeader._correlationIdLen)._data,
1149 blockChainHeader._correlationIdLen);
1150 blockBytesRemaining -= blockChainHeader._correlationIdLen;
1154 tmpBuffers[blockNum] =
new char[blockChainHeader._correlationIdLen];
1155 size_t totalLeft = blockChainHeader._correlationIdLen;
1156 size_t totalRead = 0;
1160 readLen = blockBytesRemaining < totalLeft ?
1161 blockBytesRemaining : totalLeft;
1162 pBuffer->
copyBytes(tmpBuffers[blockNum] + totalRead, readLen);
1163 if (!(totalLeft -= readLen))
1167 if (!(current = current->_nextInChain))
1171 totalRead += readLen;
1176 blockBytesRemaining -= readLen;
1177 _message.assignCorrelationId(tmpBuffers[blockNum++], blockChainHeader._correlationIdLen);
1179 blockHeader._totalRemaining -= blockChainHeader._correlationIdLen;
1180 crcCalc = _crc(_message.getCorrelationId().data(),
1181 blockChainHeader._correlationIdLen, crcCalc);
// --- Expiration ---
1183 if (blockChainHeader._expirationLen > 0)
1185 if (blockChainHeader._expirationLen <= blockBytesRemaining)
1187 _message.assignExpiration(
1188 pBuffer->
getBytes(blockChainHeader._expirationLen)._data,
1189 blockChainHeader._expirationLen);
1190 blockBytesRemaining -= blockChainHeader._expirationLen;
1194 tmpBuffers[blockNum] =
new char[blockChainHeader._expirationLen];
1195 size_t totalLeft = blockChainHeader._expirationLen;
1196 size_t totalRead = 0;
1200 readLen = blockBytesRemaining < totalLeft ?
1201 blockBytesRemaining : totalLeft;
1202 pBuffer->
copyBytes(tmpBuffers[blockNum] + totalRead, readLen);
1203 if (!(totalLeft -= readLen))
1207 if (!(current = current->_nextInChain))
1211 totalRead += readLen;
1216 blockBytesRemaining -= readLen;
1217 _message.assignExpiration(tmpBuffers[blockNum++], blockChainHeader._expirationLen);
1219 blockHeader._totalRemaining -= blockChainHeader._expirationLen;
1220 crcCalc = _crc(_message.getExpiration().data(),
1221 blockChainHeader._expirationLen, crcCalc);
// --- Sow key ---
1223 if (blockChainHeader._sowKeyLen > 0)
1225 if (blockChainHeader._sowKeyLen <= blockBytesRemaining)
1227 _message.assignSowKey(pBuffer->
getBytes(blockChainHeader._sowKeyLen)._data,
1228 blockChainHeader._sowKeyLen);
1229 blockBytesRemaining -= blockChainHeader._sowKeyLen;
1233 tmpBuffers[blockNum] =
new char[blockChainHeader._sowKeyLen];
1234 size_t totalLeft = blockChainHeader._sowKeyLen;
1235 size_t totalRead = 0;
1239 readLen = blockBytesRemaining < totalLeft ?
1240 blockBytesRemaining : totalLeft;
1241 pBuffer->
copyBytes(tmpBuffers[blockNum] + totalRead, readLen);
1242 if (!(totalLeft -= readLen))
1246 if (!(current = current->_nextInChain))
1250 totalRead += readLen;
1255 blockBytesRemaining -= readLen;
1256 _message.assignSowKey(tmpBuffers[blockNum++], blockChainHeader._sowKeyLen);
1258 blockHeader._totalRemaining -= blockChainHeader._sowKeyLen;
1259 crcCalc = _crc(_message.getSowKey().data(), blockChainHeader._sowKeyLen, crcCalc);
// --- Topic ---
1261 if (blockChainHeader._topicLen > 0)
1263 if (blockChainHeader._topicLen <= blockBytesRemaining)
1265 _message.assignTopic(pBuffer->
getBytes(blockChainHeader._topicLen)._data,
1266 blockChainHeader._topicLen);
1267 blockBytesRemaining -= blockChainHeader._topicLen;
1271 tmpBuffers[blockNum] =
new char[blockChainHeader._topicLen];
1272 size_t totalLeft = blockChainHeader._topicLen;
1273 size_t totalRead = 0;
1277 readLen = blockBytesRemaining < totalLeft ?
1278 blockBytesRemaining : totalLeft;
1279 pBuffer->
copyBytes(tmpBuffers[blockNum] + totalRead, readLen);
1280 if (!(totalLeft -= readLen))
1284 if (!(current = current->_nextInChain))
1288 totalRead += readLen;
1293 blockBytesRemaining -= readLen;
1294 _message.assignTopic(tmpBuffers[blockNum++], blockChainHeader._topicLen);
1296 blockHeader._totalRemaining -= blockChainHeader._topicLen;
1297 crcCalc = _crc(_message.getTopic().data(), blockChainHeader._topicLen, crcCalc);
// --- Payload --- whatever remains after the fields above. _flag decides
// which message field it is assigned to.
1299 if (blockHeader._totalRemaining > 0)
1301 if (blockHeader._totalRemaining <= blockBytesRemaining)
1303 if (blockChainHeader._flag == -1 || blockChainHeader._flag == SOW_DELETE_DATA)
1305 _message.assignData(
1306 pBuffer->
getBytes(blockHeader._totalRemaining)._data,
1307 blockHeader._totalRemaining);
1308 crcCalc = _crc(_message.getData().data(),
1309 blockHeader._totalRemaining, crcCalc);
1311 else if (blockChainHeader._flag == SOW_DELETE_FILTER)
1313 _message.assignFilter(
1314 pBuffer->
getBytes(blockHeader._totalRemaining)._data,
1315 blockHeader._totalRemaining);
1316 crcCalc = _crc(_message.getFilter().data(),
1317 blockHeader._totalRemaining, crcCalc);
1319 else if (blockChainHeader._flag == SOW_DELETE_KEYS)
1321 _message.assignSowKeys(
1322 pBuffer->
getBytes(blockHeader._totalRemaining)._data,
1323 blockHeader._totalRemaining);
1324 crcCalc = _crc(_message.getSowKeys().data(),
1325 blockHeader._totalRemaining, crcCalc);
1327 else if (blockChainHeader._flag == SOW_DELETE_BOOKMARK)
1329 _message.assignBookmark(
1330 pBuffer->
getBytes(blockHeader._totalRemaining)._data,
1331 blockHeader._totalRemaining);
1332 crcCalc = _crc(_message.getBookmark().data(),
1333 blockHeader._totalRemaining, crcCalc);
// Same as a bookmark delete, plus the cancel option is restored.
1335 else if (blockChainHeader._flag == SOW_DELETE_BOOKMARK_CANCEL)
1337 _message.assignBookmark(
1338 pBuffer->
getBytes(blockHeader._totalRemaining)._data,
1339 blockHeader._totalRemaining);
1340 crcCalc = _crc(_message.getBookmark().data(),
1341 blockHeader._totalRemaining, crcCalc);
1342 _message.assignOptions(AMPS_OPTIONS_CANCEL, 6);
// Payload spans blocks: gather it into a temporary buffer first.
1347 tmpBuffers[blockNum] =
new char[blockHeader._totalRemaining];
1348 size_t totalLeft = blockHeader._totalRemaining;
1349 size_t totalRead = 0;
1353 readLen = blockBytesRemaining < totalLeft ?
1354 blockBytesRemaining : totalLeft;
1355 pBuffer->
copyBytes(tmpBuffers[blockNum] + totalRead, readLen);
1356 if (!(totalLeft -= readLen))
1360 if (!(current = current->_nextInChain))
1364 totalRead += readLen;
1369 position += readLen;
1370 if (blockChainHeader._flag == -1 || blockChainHeader._flag == SOW_DELETE_DATA)
1372 _message.assignData(tmpBuffers[blockNum], blockHeader._totalRemaining);
1374 else if (blockChainHeader._flag == SOW_DELETE_FILTER)
1376 _message.assignFilter(tmpBuffers[blockNum], blockHeader._totalRemaining);
1378 else if (blockChainHeader._flag == SOW_DELETE_KEYS)
1380 _message.assignSowKeys(tmpBuffers[blockNum], blockHeader._totalRemaining);
1382 else if (blockChainHeader._flag == SOW_DELETE_BOOKMARK)
1384 _message.assignBookmark(tmpBuffers[blockNum], blockHeader._totalRemaining);
1386 else if (blockChainHeader._flag == SOW_DELETE_BOOKMARK_CANCEL)
1388 _message.assignBookmark(tmpBuffers[blockNum], blockHeader._totalRemaining);
1389 _message.assignOptions(AMPS_OPTIONS_CANCEL, 6);
1391 crcCalc = _crc(tmpBuffers[blockNum++], blockHeader._totalRemaining, crcCalc);
// Integrity check: the recomputed CRC must match the stored one and the
// header sequence must match the in-memory block's sequence.
1396 if (crcCalc != blockHeader._crcVal || blockHeader._seq != block_->_sequence)
1398 std::ostringstream os;
1399 os <<
"Corrupted message found by CRC or sequence " 1400 <<
"Attempting to replay " << block_->_sequence
1401 <<
". Block sequence " << blockHeader._seq
1402 <<
", expiration length " << blockChainHeader._expirationLen
1403 <<
", sowKey length " << blockChainHeader._sowKeyLen
1404 <<
", topic length " << blockChainHeader._topicLen
1405 <<
", data length " << blockHeader._totalRemaining
1406 <<
", command ID length " << blockChainHeader._commandIdLen
1407 <<
", correlation ID length " << blockChainHeader._correlationIdLen
1408 <<
", flag " << blockChainHeader._flag
1409 <<
", expected CRC " << blockHeader._crcVal
1410 <<
", actual CRC " << crcCalc
1411 <<
", start " << start
1412 <<
", position " << position
1413 <<
", buffer size " << pBuffer->
getSize();
// Append the offset of every block in the chain to the diagnostic.
1414 for (Block* block = block_; block; block = block->_nextInChain)
1416 os <<
"\n BLOCK " << block->_offset;
// Free the temporary buffers before throwing.
1420 for (amps_uint32_t i = 0; i < blockNum; ++i)
1422 delete[] tmpBuffers[i];
1424 delete[] tmpBuffers;
1426 throw StoreException(os.str());
// Normal path: free the temporary buffers.
1433 for (amps_uint32_t i = 0; i < blockNum; ++i)
1435 delete[] tmpBuffers[i];
1437 delete[] tmpBuffers;
1443 void recoverOldFormat(Block* blocks)
1445 Buffer* pBuffer = _blockStore.getBuffer();
1446 amps_uint64_t maxIdx = 0;
1447 amps_uint64_t minIdx = 0;
1448 size_t size = pBuffer->
getSize();
1449 size_t location = 0;
1452 pBuffer->
putUint32((amps_uint32_t)_blockStore.getBlockHeaderSize());
1453 _metadataBlock->_sequence = pBuffer->
getUint64();
1454 if (_metadataBlock->_sequence < (amps_uint64_t)1000000)
1456 pBuffer->
setPosition(_metadataBlock->_offset + 8);
1458 _metadataBlock->_sequence = 0;
1463 _maxDiscarded = _metadataBlock->_sequence;
1464 _lastSequence = _maxDiscarded;
1467 pBuffer->
putUint64((amps_uint64_t)VersionInfo::parseVersion(AMPS_CLIENT_VERSION));
1472 amps_uint32_t freeCount = 0;
1473 Block* firstFree = NULL;
1474 Block* endOfFreeList = NULL;
1476 size_t numBlocks = size / blockSize;
1477 size_t blockNum = 0;
1479 typedef std::map<amps_uint64_t, Block*> RecoverMap;
1480 RecoverMap recoveredBlocks;
1481 RecoverMap growingBlocks;
1482 amps_uint32_t growthBlocksNeeded = 0;
1483 while (location < size)
1487 BlockHeader blockHeader;
1488 pBuffer->
copyBytes((
char*)&blockHeader,
sizeof(BlockHeader));
1489 size_t blockCount = (size_t)blockHeader._blocksToWrite;
1490 if (blockHeader._totalRemaining > 0 && blockHeader._seq > 0
1491 && blockHeader._totalRemaining < size
1492 && blockHeader._blocksToWrite < numBlocks
1493 && (blockHeader._blocksToWrite * blockSize)
1494 >= blockHeader._totalRemaining)
1496 size_t oldFormatSize = blockHeader._totalRemaining;
1499 blockHeader._totalRemaining -= 64;
1503 BlockChainHeader chainHeader;
1507 pBuffer->
setPosition(location + (
sizeof(amps_uint32_t) * 2)
1508 + (
sizeof(amps_uint64_t) * 2) );
1514 sizeof(amps_uint32_t) * 8);
1516 if ((chainHeader._commandIdLen + chainHeader._correlationIdLen
1517 + chainHeader._expirationLen + chainHeader._sowKeyLen
1518 + chainHeader._topicLen) > blockHeader._totalRemaining)
1525 amps_uint32_t blocksNeeded = (blockHeader._totalRemaining
1527 + (blockHeader._totalRemaining
1530 if (blocksNeeded == blockHeader._blocksToWrite)
1532 Block* first = blocks[++blockNum].setOffset(location);
1533 first->_nextInList = 0;
1534 first->_sequence = blockHeader._seq;
1535 if (blockHeader._blocksToWrite > 1)
1538 amps_uint64_t crcVal = blockHeader._crcVal;
1539 blockHeader._crcVal = 0;
1544 size_t currentBlockNum = blockNum
1545 + blockHeader._blocksToWrite
1549 if (currentBlockNum >= numBlocks)
1551 currentBlockNum = currentBlockNum - numBlocks + 1;
1553 if (currentBlockNum < blockNum)
1555 Block* last = blocks[currentBlockNum]
1557 if ((current = firstFree) == last)
1559 firstFree = firstFree->_nextInList;
1570 if (current->_nextInList == last)
1572 current->_nextInList = last->_nextInList;
1577 current = current->_nextInList;
1583 current = blocks[currentBlockNum]
1588 while (current != first)
1590 current->_nextInList = 0;
1591 current->_sequence = blockHeader._seq;
1593 if (--currentBlockNum < 1
1594 || currentBlockNum > numBlocks)
1596 currentBlockNum = numBlocks - 1;
1598 Block* previous = blocks[currentBlockNum]
1599 .init(currentBlockNum,
1601 previous->_nextInChain = current;
1607 size_t bytesToMove = --blockCount
1611 pBuffer->
copyBytes(current->_offset + bytesToMove,
1618 dataBytes -= bytesToMove;
1626 blockHeader._nextInChain = (current->_nextInChain
1627 ? current->_nextInChain->_offset
1628 : (amps_uint64_t)0);
1631 pBuffer->
putBytes((
const char*)&blockHeader,
1632 sizeof(BlockHeader));
1633 if (firstFree == previous)
1635 firstFree = firstFree->_nextInList;
1644 current = firstFree;
1647 if (current->_nextInList == previous)
1649 current->_nextInList = previous->_nextInList;
1653 current = current->_nextInList;
1658 blockNum += blockHeader._blocksToWrite - 1;
1659 blockHeader._crcVal = crcVal;
1670 blockHeader._nextInChain = (first->_nextInChain
1671 ? first->_nextInChain->_offset
1672 : (amps_uint64_t)0);
1673 pBuffer->
putBytes((
const char*)&blockHeader,
1674 sizeof(BlockHeader));
1675 pBuffer->
putBytes((
const char*)&chainHeader,
1676 sizeof(BlockChainHeader));
1679 recoveredBlocks[blockHeader._seq] = first;
1685 growingBlocks[blockHeader._seq] = blocks[++blockNum].setOffset(location);
1686 growthBlocksNeeded += (blocksNeeded - blockHeader._blocksToWrite);
1687 blockNum += blockHeader._blocksToWrite - 1;
1690 if (maxIdx < blockHeader._seq)
1692 maxIdx = blockHeader._seq;
1694 if (minIdx > blockHeader._seq)
1696 minIdx = blockHeader._seq;
1699 location += blockHeader._blocksToWrite *
getBlockSize();
1701 assert(location >= size || blockNum < numBlocks);
1706 Block* block = blocks[++blockNum].setOffset(location);
1709 endOfFreeList->_nextInList = block;
1715 endOfFreeList = block;
1717 location += blockSize;
1720 for (RecoverMap::iterator i = growingBlocks.begin();
1721 i != growingBlocks.end(); ++i)
1723 Block* first = i->second;
1725 BlockHeader blockHeader;
1728 pBuffer->
copyBytes((
char*)&blockHeader, 24);
1731 blockHeader._totalRemaining -= 64;
1734 if (freeCount < growthBlocksNeeded)
1737 amps_uint32_t minBlocksRequired = growthBlocksNeeded - freeCount;
1738 amps_uint32_t growthBlocks = _blockStore.getDefaultResizeBlocks();
1739 if (growthBlocks < minBlocksRequired)
1741 amps_uint32_t defaultBlocks = _blockStore.getDefaultResizeBlocks();
1742 if (minBlocksRequired % defaultBlocks)
1743 minBlocksRequired = (minBlocksRequired / defaultBlocks + 1)
1745 growthBlocks = minBlocksRequired;
1747 amps_uint32_t newBlocks = 0;
1748 Block* addedBlocks = _blockStore.resizeBuffer(
1750 + growthBlocks * blockSize,
1754 throw StoreException(
"Failed to grow store buffer during recovery");
1756 _blockStore.addBlocks(addedBlocks);
1757 freeCount += newBlocks;
1758 growthBlocksNeeded = (growthBlocksNeeded > freeCount)
1759 ? growthBlocksNeeded - freeCount : 0;
1762 endOfFreeList->_nextInList = addedBlocks;
1766 firstFree = addedBlocks;
1768 endOfFreeList = &(addedBlocks[newBlocks - 1]);
1769 endOfFreeList->_nextInList = 0;
1771 expandBlocks(blocks, first->_offset, first, blockHeader,
1772 &firstFree, &freeCount, pBuffer);
1774 recoveredBlocks[blockHeader._seq] = first;
1782 endOfFreeList->_nextInList = 0;
1784 _blockStore.setFreeList(firstFree, freeCount);
1785 if (maxIdx > _lastSequence)
1787 _lastSequence = maxIdx;
1789 if (minIdx > _maxDiscarded + 1)
1791 _maxDiscarded = minIdx - 1;
1793 if (_maxDiscarded > _metadataBlock->_sequence)
1795 _metadataBlock->_sequence = _maxDiscarded;
1796 pBuffer->
setPosition(_metadataBlock->_offset + 8);
1800 AMPS_FETCH_ADD(&_stored, (
long)(recoveredBlocks.size()));
1801 for (RecoverMap::iterator i = recoveredBlocks.begin();
1802 i != recoveredBlocks.end(); ++i)
1804 if (_blockStore.front())
1806 end->_nextInList = i->second;
1810 _blockStore.setUsedList(i->second);
1816 end->_nextInList = 0;
1818 _blockStore.setEndOfUsedList(end);
// Extends the chain of blocks that starts at first_ so that it spans
// blocksNeeded blocks, then rewrites the affected block headers through
// pBuffer_. Blocks beyond the ones already written are taken from *pFreeList_.
// NOTE(review): this listing is an extract. The end of the parameter list
// (the body uses pBuffer_), the braces, and several statements are absent, so
// the comments below describe only the statements that are visible.
1823 void expandBlocks(Block* blocks_,
size_t location_, Block* first_,
1824 BlockHeader blockHeader_,
1825 Block** pFreeList_, amps_uint32_t* pFreeCount_,
1829 Block* current = first_;
// Keep the stored total, then reduce the header copy's total by 64.
// NOTE(review): 64 equals DEFAULT_BLOCK_CHAIN_HEADER_SIZE - confirm that is
// the intended meaning of the literal.
1832 amps_uint32_t oldTotalRemaining = blockHeader_._totalRemaining;
1833 blockHeader_._totalRemaining -= 64;
1837 amps_uint32_t blocksNeeded = blockHeader_._totalRemaining
1839 + (blockHeader_._totalRemaining
// Number of bytes of the old total that fall in the final block.
// NOTE(review): the reset to blockSize is presumably guarded by a
// zero-remainder check that is not shown here.
1846 size_t endBlockSize = oldTotalRemaining % blockSize;
1849 endBlockSize = blockSize;
1851 size_t endOfData = 0;
// Save the CRC and clear it in the header copy; the saved value is put back
// further down, before the final header write.
1853 amps_uint64_t crcVal = blockHeader_._crcVal;
1854 blockHeader_._crcVal = 0;
// Visited blocks are stacked so they can be revisited in reverse order
// (see blocksUsed.top() below).
1856 std::stack<Block*> blocksUsed;
1857 for (amps_uint32_t i = 1; i < blocksNeeded; ++i)
1859 blocksUsed.push(current);
1860 current->_sequence = blockHeader_._seq;
// Once i reaches the number of blocks already written, the next chain block
// is taken from the head of the free list.
1861 if (i >= blockHeader_._blocksToWrite)
// At exactly i == _blocksToWrite, record the offset where existing data ends.
1863 if (i == blockHeader_._blocksToWrite)
1865 endOfData = current->_offset + endBlockSize;
1867 current->_nextInChain = *pFreeList_;
1869 *pFreeList_ = (*pFreeList_)->_nextInList;
// NOTE(review): presumably the else branch (braces not shown) - reuse the
// existing list link as the chain link and set the next block's offset.
1873 current->_nextInChain = current->_nextInList;
1874 if (current->_nextInChain)
1876 if (current->_offset + blockSize < pBuffer_->getSize())
1878 current->_nextInChain->setOffset(current->_offset
1883 current->_nextInChain->setOffset(blockSize);
1888 current->_nextInChain = blocks_[1].init(1, blockSize);
// Unlink the block chosen as next-in-chain from the free list: either it is
// the head, or the list is scanned for the entry that points at it.
1890 if (current->_nextInChain == *pFreeList_)
1892 *pFreeList_ = (*pFreeList_)->_nextInList;
1897 for (Block* free = *pFreeList_; free;
1898 free = free->_nextInList)
1900 if (free->_nextInList == current->_nextInChain)
1902 free->_nextInList = free->_nextInList->_nextInList;
1909 current->_nextInList = 0;
1910 current = current->_nextInChain;
// The header copy now records the enlarged block count.
1913 blockHeader_._blocksToWrite = blocksNeeded;
1915 current->_nextInList = 0;
1916 current->_sequence = blockHeader_._seq;
// Walk back towards first_ (blocksUsed.top() supplies the previous block),
// adjusting endOfData and writing a BlockHeader on each pass.
// NOTE(review): dataBytes and the statements that move the data are not
// visible in this extract.
1926 while (current != first_)
1928 size_t chunkBytesAvail = endOfData > location_
1929 ? endOfData - location_
1931 if (chunkBytesAvail < dataBytes)
1943 chunkBytesAvail = dataBytes - chunkBytesAvail;
1944 endOfData = pBuffer_->
getSize() - chunkBytesAvail;
1951 endOfData -= dataBytes;
// The header stores the offset of the next block in the chain, or 0 if none.
1957 blockHeader_._nextInChain = (current->_nextInChain
1958 ? current->_nextInChain->_offset
1959 : (amps_uint64_t)0);
1962 pBuffer_->
putBytes((
const char*)&blockHeader_,
sizeof(BlockHeader));
1963 current = blocksUsed.top();
// Restore the saved CRC and point the header at the second block of the chain.
1974 blockHeader_._crcVal = crcVal;
1975 blockHeader_._nextInChain = first_->_nextInChain->_offset;
// Seek past two amps_uint32_t and two amps_uint64_t fields from location_,
// then read eight amps_uint32_t worth of bytes into chainHeader.
1979 pBuffer_->
setPosition(location_ + (
sizeof(amps_uint32_t) * 2)
1980 + (
sizeof(amps_uint64_t) * 2) );
1985 BlockChainHeader chainHeader;
1986 pBuffer_->
copyBytes((
char*)&chainHeader,
1987 sizeof(amps_uint32_t) * 8);
// Write sizeof(BlockHeader) bytes of blockHeader_, then
// sizeof(BlockChainHeader) bytes of chainHeader.
// NOTE(review): any repositioning before these writes is not visible here.
1990 pBuffer_->
putBytes((
const char*)&blockHeader_,
sizeof(BlockHeader));
1991 pBuffer_->
putBytes((
const char*)&chainHeader,
sizeof(BlockChainHeader));
// Selects the function stored in _crc.
// NOTE(review): the lines between the signature and the first assignment are
// not in this extract, so how isFile influences the choice cannot be seen here.
1994 void chooseCRC(
bool isFile)
2003 _crc = AMPS::CRC<0>::crcNoSSE;
// When AMPS::CRC<0>::isSSE42Enabled() returns true, _crc is set to
// CRC<0>::crc; the crcNoSSE assignment after it is presumably the else
// branch (TODO confirm - the else/brace lines are missing).
2005 if (AMPS::CRC<0>::isSSE42Enabled())
2007 _crc = AMPS::CRC<0>::crc;
2011 _crc = AMPS::CRC<0>::crcNoSSE;
// Static function taking three unnamed parameters (const char*, size_t,
// amps_uint64_t) and returning amps_uint64_t, i.e. the same shape as the
// CRCFunction pointer type.
// NOTE(review): the body is not in this extract; the name suggests it skips
// the checksum computation - confirm against the full source.
2016 static amps_uint64_t noOpCRC(
const char*,
size_t, amps_uint64_t)
// Block whose _sequence field is updated to _maxDiscarded by the recovery code.
2026 Block* _metadataBlock;
// Recovery raises this to (lowest recovered sequence - 1) when that is higher.
// NOTE(review): presumably the highest sequence already discarded - confirm.
2028 amps_uint64_t _maxDiscarded;
// _lastSequence is std::atomic when the C++11 / MSVC 2015 check passes,
// otherwise a volatile integer. The #else/#endif lines are not in this extract.
2030 #if __cplusplus >= 201103L || _MSC_VER >= 1900 2031 std::atomic<amps_uint64_t> _lastSequence;
2033 volatile amps_uint64_t _lastSequence;
// Counter that recovery advances (via AMPS_FETCH_ADD) by the number of
// recovered entries.
2036 AMPS_ATOMIC_TYPE _stored;
// Pointer type for CRC routines: (data, length, amps_uint64_t) -> amps_uint64_t.
// NOTE(review): parameter meanings are inferred from their types - confirm.
2041 typedef amps_uint64_t (*CRCFunction)(
const char*, size_t, amps_uint64_t);
virtual void putUint64(amps_uint64_t ui_)=0
Put an amps_uint64_t value into the buffer at the current position and advance past it...
Abstract base class for storing published messages for an HA publisher client.
Definition: ampsplusplus.hpp:1097
void getRawCorrelationId(const char **dataptr, size_t *sizeptr) const
Modifies the passed in arguments to reference the value of the CorrelationId header of self in the un...
Definition: Message.hpp:1366
Command::Type getCommandEnum() const
Decode self's "command" field and return one of the values from Command.
Definition: Message.hpp:1290
Constants
Default constant values for BlockPublishStore.
Definition: BlockPublishStore.hpp:79
void getRawCommandId(const char **dataptr, size_t *sizeptr) const
Modifies the passed in arguments to reference the value of the CommandId header of self in the underl...
Definition: Message.hpp:1364
Abstract base class for replaying a publish message.
Definition: ampsplusplus.hpp:1069
virtual void flush(long timeout_)
Method to wait for the Store to discard everything that has been stored up to the point in time when ...
Definition: BlockPublishStore.hpp:656
void replay(StoreReplayer &replayer_)
Replay all messages in the Store onto the given StoreReplayer.
Definition: BlockPublishStore.hpp:561
void getRawFilter(const char **dataptr, size_t *sizeptr) const
Modifies the passed in arguments to reference the value of the Filter header of self in the underlyin...
Definition: Message.hpp:1368
AMPSDLL amps_uint64_t amps_message_get_field_uint64(amps_handle message, FieldId field)
Gets the unsigned 64-bit int value of a header field in an AMPS message.
Message encapsulates a single message sent to or received from an AMPS server, and provides methods f...
Definition: Message.hpp:539
virtual void setPosition(size_t position_)=0
Set the buffer position to a location.
Field getOptions() const
Retrieves the value of the Options header of the Message as a Field which references the underlying b...
Definition: Message.hpp:1378
static amps_uint32_t getBlockHeaderSize()
Block header is number of blocks, total length, sequence number, crc, next in chain offset...
Definition: BlockPublishStore.hpp:153
const char * data() const
Returns the (non-null-terminated) data underlying this field.
Definition: Field.hpp:273
amps_uint32_t getBlockDataSize()
Return the size left in a block for data when it has a header in it.
Definition: BlockPublishStore.hpp:179
virtual size_t getSize() const =0
Get the current size of the Buffer in bytes.
virtual void putBytes(const char *data_, size_t dataLength_)=0
Put the given length of bytes in data into the buffer at the current position and advance past them...
virtual void execute(Message &message_)=0
Called by implementations of Store to replay a message from the store.
void getRawBookmark(const char **dataptr, size_t *sizeptr) const
Modifies the passed in arguments to reference the value of the Bookmark header of self in the underly...
Definition: Message.hpp:1256
bool replaySingle(StoreReplayer &replayer_, amps_uint64_t index_)
Replay one message in the Store onto the given StoreReplayer.
Definition: BlockPublishStore.hpp:593
amps_uint64_t store(const Message &message_, bool assignSequence_)
Store a given message that will be delivered to AMPS.
Definition: BlockPublishStore.hpp:266
Used as a base class for other stores in the AMPS C++ client, this is an implementation that breaks a...
Definition: BlockStore.hpp:58
amps_uint64_t getLastPersisted(void)
Get the last persisted sequence number.
Definition: BlockPublishStore.hpp:711
Core type, function, and class declarations for the AMPS C++ client.
size_t unpersistedCount() const
Method to return the count of messages that are currently in the Store because they have not been discard...
Definition: BlockPublishStore.hpp:642
Provides AMPS::Buffer, an abstract base class used by the store implementations in the AMPS client...
size_t len() const
Returns the length of the data underlying this field.
Definition: Field.hpp:280
virtual void setResizeHandler(PublishStoreResizeHandler handler_, void *userData_)
Set a handler to be called if the Store needs to resize in order to keep storing messages.
Definition: ampsplusplus.hpp:1191
virtual void putUint32(amps_uint32_t i_)=0
Put an unsigned 32-bit int value into the buffer at the current position and advance past the end of ...
void getRawSowKeys(const char **dataptr, size_t *sizeptr) const
Modifies the passed in arguments to reference the value of the SowKeys header of self in the underlyi...
Definition: Message.hpp:1487
Used as a base class for other stores in the AMPS C++ client, this is an implementation of StoreImpl ...
Definition: BlockPublishStore.hpp:60
Abstract base class for implementing a buffer to be used by a StoreImpl for storage of publish messag...
Definition: Buffer.hpp:40
virtual ~BlockPublishStore()
Destructor that cleans up the buffer and other associated memory.
Definition: BlockPublishStore.hpp:241
void getRawExpiration(const char **dataptr, size_t *sizeptr) const
Modifies the passed in arguments to reference the value of the Expiration header of self in the under...
Definition: Message.hpp:1367
static amps_uint32_t getBlockChainHeaderSize()
Block chain header is operation, command id length, correlation id length, expiration length...
Definition: BlockPublishStore.hpp:163
virtual amps_uint64_t getUint64()=0
Get an unsigned 64-bit int value at the current buffer position and advance past it.
void getRawTopic(const char **dataptr, size_t *sizeptr) const
Modifies the passed in arguments to reference the value of the Topic header of self in the underlying...
Definition: Message.hpp:1511
Used as metadata for each block in a Buffer.
Definition: BlockStore.hpp:72
Field represents the value of a single field in a Message.
Definition: Field.hpp:86
virtual amps_uint32_t getUint32()=0
Get the unsigned 32-bit int value at the current buffer position and advance past it...
virtual amps_uint64_t store(const Message &message_)
Store a given message that will be delivered to AMPS.
Definition: BlockPublishStore.hpp:251
BlockPublishStore(Buffer *buffer_, amps_uint32_t blocksPerRealloc_=1000, bool isFile_=false, bool errorOnPublishGap_=false, amps_uint32_t blockSize_=DEFAULT_BLOCK_SIZE)
Create a BlockPublishStore using buffer_, that grows by blocksPerRealloc_ blocks when it must grow...
Definition: BlockPublishStore.hpp:199
amps_uint32_t getBlockSize()
Return the size of a block, as reported by the underlying BlockStore.
Definition: BlockPublishStore.hpp:171
void getRawSowKey(const char **dataptr, size_t *sizeptr) const
Modifies the passed in arguments to reference the value of the SowKey header of self in the underlyin...
Definition: Message.hpp:1486
unsigned getAckTypeEnum() const
Decode self's "ack type" field and return the corresponding bitmask of values from AckType...
Definition: Message.hpp:1222
amps_uint64_t getLowestUnpersisted() const
Get the oldest unpersisted message sequence in the store.
Definition: BlockPublishStore.hpp:699
Definition: ampsplusplus.hpp:103
static amps_uint64_t getUnsetSequence()
Method to return the value used to represent no such sequence.
Definition: ampsplusplus.hpp:1167
virtual void copyBytes(char *buffer_, size_t numBytes_)=0
Copy the given number of bytes from this buffer to the given buffer.
virtual void discardUpTo(amps_uint64_t index_)
Remove all messages with an index up to and including index_.
Definition: BlockPublishStore.hpp:498
Provides AMPS::BlockStore, a class for storing Blocks of a fixed size into a Buffer implementation...
virtual ByteArray getBytes(size_t numBytes_)=0
Get the given number of bytes from the buffer.