AMPS C/C++ Client Class Reference
AMPS C/C++ Client Version 5.3.4.5
BlockPublishStore.hpp
Go to the documentation of this file.
1 //
3 // Copyright (c) 2010-2025 60East Technologies Inc., All Rights Reserved.
4 //
5 // This computer software is owned by 60East Technologies Inc. and is
6 // protected by U.S. copyright laws and other laws and by international
7 // treaties. This computer software is furnished by 60East Technologies
8 // Inc. pursuant to a written license agreement and may be used, copied,
9 // transmitted, and stored only in accordance with the terms of such
10 // license agreement and with the inclusion of the above copyright notice.
11 // This computer software or any other copies thereof may not be provided
12 // or otherwise made available to any other person.
13 //
14 // U.S. Government Restricted Rights. This computer software: (a) was
15 // developed at private expense and is in all respects the proprietary
16 // information of 60East Technologies Inc.; (b) was not developed with
17 // government funds; (c) is a trade secret of 60East Technologies Inc.
18 // for all purposes of the Freedom of Information Act; and (d) is a
19 // commercial item and thus, pursuant to Section 12.212 of the Federal
20 // Acquisition Regulations (FAR) and DFAR Supplement Section 227.7202,
21 // Government's use, duplication or disclosure of the computer software
22 // is subject to the restrictions set forth by 60East Technologies Inc..
23 //
25 
26 #ifndef _BLOCKPUBLISHSTORE_H_
27 #define _BLOCKPUBLISHSTORE_H_
28 #include <amps/ampsplusplus.hpp>
29 #include <amps/BlockStore.hpp>
30 #include <amps/Buffer.hpp>
31 #include <sstream>
32 #include <stack>
33 #include <string>
34 #include <map>
35 #include <amps/ampscrc.hpp>
36 #if __cplusplus >= 201103L || _MSC_VER >= 1900
37  #include <atomic>
38 #endif
39 
40 #ifdef _WIN32
41  #include <intrin.h>
42  #include <sys/timeb.h>
43 #else
44  #include <sys/time.h>
45 #endif
46 #include <iostream>
47 
52 
53 namespace AMPS
54 {
61  {
62  public:
63  typedef BlockStore::Block Block;
64  typedef Lock<BlockStore> BufferLock;
65  typedef Unlock<BlockStore> BufferUnlock;
66 
/// Recorded in the chain header's flag field by store() to note which
/// message field supplied the payload of a stored sow_delete command.
/// Each enumerator occupies its own bit.
typedef enum
{
  SOW_DELETE_DATA            = 1 << 0, ///< payload taken from the message data
  SOW_DELETE_FILTER          = 1 << 1, ///< payload taken from the filter
  SOW_DELETE_KEYS            = 1 << 2, ///< payload taken from the sow keys
  SOW_DELETE_BOOKMARK        = 1 << 3, ///< payload taken from the bookmark
  SOW_DELETE_BOOKMARK_CANCEL = 1 << 4, ///< bookmark payload with "cancel" in the options
  SOW_DELETE_UNKNOWN         = 1 << 7  ///< not assigned anywhere in the visible code
} SowDeleteType;
76 
// Default layout sizes for the store. The header sizes match the
// "32 TOTAL" / "64 TOTAL" figures in the storage-format table below.
79  enum Constants : amps_uint32_t
80  {
// Bytes occupied by a BlockHeader at the start of every block.
81  DEFAULT_BLOCK_HEADER_SIZE = 32,
// Bytes occupied by a BlockChainHeader in the first block of a record.
82  DEFAULT_BLOCK_CHAIN_HEADER_SIZE = 64,
// NOTE(review): not referenced in the visible code (the constructor
// default is a literal 1000); presumably blocks added per growth.
83  DEFAULT_BLOCKS_PER_REALLOC = 1000,
// Default total block size in bytes, used as the constructor default.
84  DEFAULT_BLOCK_SIZE = 2048
85  };
86 
87  /**********************************************************************
88  * Storage format
89  *************************************************************************
90  * Field Description | Type | # Bytes
91  *************************************************************************
92  * HEADER as detailed below | | 32 TOTAL
93  * | |
94  * Total number of blocks used by the record | uint32_t | 4
95  * Total length of the saved record | uint32_t | 4
96  * HA Message sequence | uint64_t | 8
97  * CRC Value - only set in first Block | uint64_t | 8
98  * next in chain offset | uint64_t | 8
99  *************************************************************************
100  * CHAIN HEADER as detailed below | | 64 TOTAL
101  * | |
102  * operation | uint32_t | 4
103  * command id length | uint32_t | 4
104  * correlation id length | uint32_t | 4
105  * expiration length | uint32_t | 4
106  * sow key length | uint32_t | 4
107  * topic length | uint32_t | 4
108  * sow delete flag | int32_t | 4
109  * ack types | uint32_t | 4
110  * unused [8] | uint32_t | 4*8 = 32
111  *************************************************************************
112  * DATA SECTION - can be spread across multiple blocks
113  *
114  * command id | char[]
115  * correlation id | char[]
116  * expiration | char[]
117  * sow key | char[]
118  * topic | char[]
119  * data | char[]
120  *************************************************************************/
121 
// Fixed-size header written verbatim (via putBytes) at the start of every
// block. Field order and widths define the stored layout described in the
// storage-format table above, so they must not be reordered.
122  struct BlockHeader
123  {
// Number of blocks in the record's chain (1 for an Unknown-command record).
124  amps_uint32_t _blocksToWrite;
// Total stored length of the record: chain header plus all field bytes;
// 0 for an Unknown-command record.
125  amps_uint32_t _totalRemaining;
// Sequence number of the stored message.
126  amps_uint64_t _seq;
// CRC over the field bytes; store() zeroes it after writing the first block.
127  amps_uint64_t _crcVal;
// Buffer offset of the next block in the chain, or 0 for the last block.
128  amps_uint64_t _nextInChain;
129  };
130 
131  struct BlockChainHeader
132  {
133  amps_uint32_t _operation;
134  amps_uint32_t _commandIdLen;
135  amps_uint32_t _correlationIdLen;
136  amps_uint32_t _expirationLen;
137  amps_uint32_t _sowKeyLen;
138  amps_uint32_t _topicLen;
139  amps_int32_t _flag;
140  amps_uint32_t _ackTypes;
141  amps_uint32_t _unused[8];
142  BlockChainHeader() // -V730
143  : _operation(0), _commandIdLen(0), _correlationIdLen(0)
144  , _expirationLen(0), _sowKeyLen(0), _topicLen(0), _flag(-1)
145  , _ackTypes(0)
146  { ; }
147  };
148 
153  static inline amps_uint32_t getBlockHeaderSize()
154  {
155  return DEFAULT_BLOCK_HEADER_SIZE;
156  }
157 
163  static inline amps_uint32_t getBlockChainHeaderSize()
164  {
165  return DEFAULT_BLOCK_CHAIN_HEADER_SIZE;
166  }
167 
171  inline amps_uint32_t getBlockSize()
172  {
173  return _blockStore.getBlockSize();
174  }
175 
179  inline amps_uint32_t getBlockDataSize()
180  {
181  return _blockStore.getBlockSize() - getBlockHeaderSize();
182  }
183 
200  amps_uint32_t blocksPerRealloc_ = 1000,
201  bool isFile_ = false,
202  bool errorOnPublishGap_ = false,
203  amps_uint32_t blockSize_ = DEFAULT_BLOCK_SIZE)
204  : StoreImpl(errorOnPublishGap_)
205  , _blockStore(buffer_, blocksPerRealloc_,
206  DEFAULT_BLOCK_HEADER_SIZE,
207  (blockSize_ > DEFAULT_BLOCK_HEADER_SIZE * 2
208  ? blockSize_ : DEFAULT_BLOCK_HEADER_SIZE * 2))
209  , _metadataBlock(0)
210  , _maxDiscarded((amps_uint64_t)0), _lastSequence((amps_uint64_t)1)
211  , _stored(0)
212  {
213  _blockStore.setResizeHandler(&BlockPublishStore::canResize, (void*)this);
214  chooseCRC(isFile_);
215  if (!isFile_)
216  {
217  // This gets set in recover in file-based stores
218  BufferLock bufferGuard(_blockStore);
219  _blockStore.init();
220  _metadataBlock = _blockStore.get(1);
221  // Remove metadata block from used list
222  _blockStore.setUsedList(0);
223  _blockStore.setEndOfUsedList(0);
224  // Metadata block holds block size, block header size,
225  // last discarded sequence, client version
226  _metadataBlock->_sequence = (amps_uint64_t)0;
227  Buffer* pBuffer = _blockStore.getBuffer();
228  pBuffer->setPosition(_metadataBlock->_offset);
229  pBuffer->putUint32((amps_uint32_t)getBlockSize());
230  pBuffer->putUint32((amps_uint32_t)getBlockHeaderSize());
231  pBuffer->putUint64((amps_uint64_t)0);
232  // Metadata blocks puts client version in CRC position
233  pBuffer->putUint64((amps_uint64_t)VersionInfo::parseVersion(AMPS_CLIENT_VERSION));
234  // No next in chain
235  pBuffer->putUint64((amps_uint64_t)0);
236  }
237  }
238 
242  {
243  }
244 
251  virtual amps_uint64_t store(const Message& message_)
252  {
253  return store(message_, true);
254  }
255 
266  amps_uint64_t store(const Message& message_, bool assignSequence_)
267  {
268  const char* commandId, *correlationId, *expiration, *sowKey,
269  *topic, *data;
270  size_t dataLen = 0;
271  BlockHeader blockHeader;
272  BlockChainHeader chainHeader;
273  message_.getRawCommandId(&commandId, &dataLen);
274  chainHeader._commandIdLen = (amps_uint32_t)dataLen;
275  message_.getRawCorrelationId(&correlationId, &dataLen);
276  chainHeader._correlationIdLen = (amps_uint32_t)dataLen;
277  message_.getRawExpiration(&expiration, &dataLen);
278  chainHeader._expirationLen = (amps_uint32_t)dataLen;
279  message_.getRawSowKey(&sowKey, &dataLen);
280  chainHeader._sowKeyLen = (amps_uint32_t)dataLen;
281  message_.getRawTopic(&topic, &dataLen);
282  chainHeader._topicLen = (amps_uint32_t)dataLen;
283  message_.getRawData(&data, &dataLen);
284  chainHeader._flag = -1;
285  Message::Command::Type operation = message_.getCommandEnum();
286  chainHeader._operation = (amps_uint32_t)operation;
287  if (operation == Message::Command::SOWDelete)
288  {
289  if (dataLen > 0)
290  {
291  chainHeader._flag = SOW_DELETE_DATA;
292  }
293  else
294  {
295  message_.getRawFilter(&data, &dataLen);
296  if (dataLen > 0)
297  {
298  chainHeader._flag = SOW_DELETE_FILTER;
299  }
300  else
301  {
302  message_.getRawSowKeys(&data, &dataLen);
303  if (dataLen > 0)
304  {
305  chainHeader._flag = SOW_DELETE_KEYS;
306  }
307  else
308  {
309  message_.getRawBookmark(&data, &dataLen);
310  chainHeader._flag = SOW_DELETE_BOOKMARK;
311  // Check options for cancel
312  Message::Field options = message_.getOptions();
313  size_t remaining = options.len();
314  const void* next = NULL;
315  const void* start = (const void*)(options.data());
316  // Not necessarily null-terminated so no strstr
317  while (remaining >= 6 &&
318  (next = memchr(start, (int)'c', remaining)) != NULL)
319  {
320  remaining = (size_t)next - (size_t)start;
321  if (remaining >= 6 && strncmp((const char*)start,
322  "cancel", 6) == 0)
323  {
324  chainHeader._flag = SOW_DELETE_BOOKMARK_CANCEL;
325  break;
326  }
327  }
328  }
329  }
330  }
331  }
332  blockHeader._totalRemaining = (
333  (chainHeader._operation == Message::Command::Unknown)
334  ? 0
336  + chainHeader._commandIdLen
337  + chainHeader._correlationIdLen
338  + chainHeader._expirationLen
339  + chainHeader._sowKeyLen
340  + chainHeader._topicLen
341  + (amps_uint32_t)dataLen));
342  size_t lastBlockLength = ((operation == Message::Command::Unknown) ? 0 :
343  (blockHeader._totalRemaining % getBlockDataSize()));
344  blockHeader._blocksToWrite = ((operation == Message::Command::Unknown)
345  ? 1
346  : ((amps_uint32_t)(blockHeader._totalRemaining
347  / getBlockDataSize())
348  + ((lastBlockLength > 0) ? 1 : 0)));
349  blockHeader._crcVal = (amps_uint64_t)0ULL;
350  blockHeader._crcVal = _crc(commandId,
351  chainHeader._commandIdLen,
352  blockHeader._crcVal);
353  blockHeader._crcVal = _crc(correlationId,
354  chainHeader._correlationIdLen,
355  blockHeader._crcVal);
356  blockHeader._crcVal = _crc(expiration,
357  chainHeader._expirationLen,
358  blockHeader._crcVal);
359  blockHeader._crcVal = _crc(sowKey,
360  chainHeader._sowKeyLen,
361  blockHeader._crcVal);
362  blockHeader._crcVal = _crc(topic,
363  chainHeader._topicLen,
364  blockHeader._crcVal);
365  blockHeader._crcVal = _crc(data, dataLen, blockHeader._crcVal);
366 
367  // Reserve slots for storage, growing if necessary
368  BufferLock bufferGuard(_blockStore);
369  Block* first = _blockStore.get(blockHeader._blocksToWrite);
370  if (assignSequence_)
371  {
372  if (_lastSequence <= 2)
373  {
374  _getLastPersisted();
375  }
376  blockHeader._seq = ++_lastSequence;
377  }
378  else
379  {
380  blockHeader._seq = amps_message_get_field_uint64(
381  message_.getMessage(),
382  AMPS_Sequence);
383  if (!_maxDiscarded)
384  {
385  _maxDiscarded = blockHeader._seq - 1;
386  }
387  if (blockHeader._seq >= _lastSequence)
388  {
389  _lastSequence = blockHeader._seq;
390  }
391  }
392 
393  try
394  {
395  size_t topicWritten = 0UL;
396  size_t dataWritten = 0UL;
397  size_t commandWritten = 0UL;
398  size_t correlationWritten = 0UL;
399  size_t expirationWritten = 0UL;
400  size_t sowKeyWritten = 0UL;
401  Buffer* pBuffer = _blockStore.getBuffer();
402  for (Block* next = first; next; next = next->_nextInChain)
403  {
404  next->_sequence = blockHeader._seq;
405  if (next->_nextInChain)
406  {
407  blockHeader._nextInChain = next->_nextInChain->_offset;
408  }
409  else
410  {
411  blockHeader._nextInChain = (amps_uint64_t)0;
412  }
413  // Set buffer to start of Block and write the header
414  pBuffer->setPosition(next->_offset);
415  pBuffer->putBytes((const char*)&blockHeader, sizeof(BlockHeader));
416  // Clear crcVal, as it's only written in the first Block
417  blockHeader._crcVal = (amps_uint64_t)0;
418  size_t bytesRemaining = getBlockDataSize();
419  if (next == first)
420  {
421  // Write Block chain header
422  chainHeader._ackTypes = (amps_uint32_t)message_.getAckTypeEnum();
423  pBuffer->putBytes((const char*)&chainHeader,
424  sizeof(BlockChainHeader));
425  pBuffer->setPosition(next->_offset + getBlockHeaderSize() + getBlockChainHeaderSize());
426  bytesRemaining -= getBlockChainHeaderSize();
427  }
428  else
429  {
430  pBuffer->setPosition(next->_offset + getBlockHeaderSize());
431  }
432 
433  if (commandWritten < chainHeader._commandIdLen)
434  {
435  size_t commandWrite = (chainHeader._commandIdLen - commandWritten < bytesRemaining) ? chainHeader._commandIdLen - commandWritten : bytesRemaining;
436  pBuffer->putBytes(commandId + commandWritten,
437  commandWrite);
438  bytesRemaining -= commandWrite;
439  commandWritten += commandWrite;
440  }
441  if (correlationWritten < chainHeader._correlationIdLen)
442  {
443  size_t correlationWrite = (chainHeader._correlationIdLen - correlationWritten < bytesRemaining) ? chainHeader._correlationIdLen - correlationWritten : bytesRemaining;
444  pBuffer->putBytes(correlationId + correlationWritten,
445  correlationWrite);
446  bytesRemaining -= correlationWrite;
447  correlationWritten += correlationWrite;
448  }
449  if (bytesRemaining > 0 && expirationWritten < chainHeader._expirationLen)
450  {
451  size_t expWrite = (chainHeader._expirationLen - expirationWritten < bytesRemaining) ? chainHeader._expirationLen - expirationWritten : bytesRemaining;
452  pBuffer->putBytes(expiration + expirationWritten, expWrite);
453  bytesRemaining -= expWrite;
454  expirationWritten += expWrite;
455  }
456  if (bytesRemaining > 0 && sowKeyWritten < chainHeader._sowKeyLen)
457  {
458  size_t sowKeyWrite = (chainHeader._sowKeyLen - sowKeyWritten < bytesRemaining) ? chainHeader._sowKeyLen - sowKeyWritten : bytesRemaining;
459  pBuffer->putBytes(sowKey + sowKeyWritten, sowKeyWrite);
460  bytesRemaining -= sowKeyWrite;
461  sowKeyWritten += sowKeyWrite;
462  }
463  if (bytesRemaining > 0 && topicWritten < chainHeader._topicLen)
464  {
465  size_t topicWrite = (chainHeader._topicLen - topicWritten
466  < bytesRemaining)
467  ? chainHeader._topicLen - topicWritten
468  : bytesRemaining;
469  pBuffer->putBytes(topic + topicWritten, topicWrite);
470  bytesRemaining -= topicWrite;
471  topicWritten += topicWrite;
472  }
473  if (bytesRemaining > 0 && dataWritten < dataLen)
474  {
475  size_t dataWrite = (dataLen - dataWritten < bytesRemaining) ?
476  dataLen - dataWritten : bytesRemaining;
477  pBuffer->putBytes(data + dataWritten, dataWrite);
478  bytesRemaining -= dataWrite;
479  dataWritten += dataWrite;
480  }
481  }
482  }
483  catch (const AMPSException&)
484  {
485  _blockStore.put(first);
486  throw;
487  }
488  AMPS_FETCH_ADD(&_stored, 1);
489  return blockHeader._seq;
490  }
491 
498  virtual void discardUpTo(amps_uint64_t index_)
499  {
500  // Get the lock
501  BufferLock bufferGuard(_blockStore);
502  Buffer* pBuffer = _blockStore.getBuffer();
503  // Don't use _getLastPersisted() here, don't want to set it
504  // to something other than index_ if it's not already set
505  amps_uint64_t lastPersisted = _metadataBlock->_sequence;
506  // Make sure it's a real index and we have messages to discard
507  if (index_ == (amps_uint64_t)0 || !_blockStore.front() || index_ <= _maxDiscarded)
508  {
509  // During logon it's very possible we don't have a last persisted
510  // but that the Client is calling discardUpTo with the ack value.
511  if (lastPersisted < index_)
512  {
513  pBuffer->setPosition(_metadataBlock->_offset + 8);
514  pBuffer->putUint64(index_);
515  _metadataBlock->_sequence = index_;
516  if (_maxDiscarded < index_)
517  {
518  _maxDiscarded = index_;
519  }
520  if (_lastSequence <= index_)
521  {
522  _lastSequence = index_;
523  }
524  }
525  else if (!index_) // Fresh logon, no sequence history
526  {
527  _getLastPersisted();
528  }
529  else if (getErrorOnPublishGap() && index_ < lastPersisted) //Message gap
530  {
531  std::ostringstream os;
532  os << "Server last saw " << index_ << " from Client but Store "
533  << "has already discarded up to " << lastPersisted
534  << " which would leave a gap of unreceived messages.";
535  throw PublishStoreGapException(os.str());
536  }
537  _blockStore.signalAll();
538  return;
539  }
540 
541  _maxDiscarded = index_;
542  AMPS_FETCH_SUB(&_stored, _blockStore.put(index_));
543  _blockStore.signalAll();
544  if (lastPersisted >= index_)
545  {
546  return;
547  }
548  pBuffer->setPosition(_metadataBlock->_offset + 8);
549  pBuffer->putUint64(index_);
550  _metadataBlock->_sequence = index_;
551  if (_lastSequence < index_)
552  {
553  _lastSequence = index_;
554  }
555  }
556 
561  void replay(StoreReplayer& replayer_)
562  {
563  // Get the lock
564  BufferLock bufferGuard(_blockStore);
565  // If we don't have anything yet, return
566  if (!_blockStore.front())
567  {
568  return;
569  }
570  Block* next = _blockStore.front();
571  try
572  {
573  for (Block* block = _blockStore.front(); block; block = next)
574  {
575  // Replay the message
576  replayOnto(block, replayer_);
577  next = block->_nextInList;
578  }
579  }
580  catch (const StoreException&)
581  {
582  _blockStore.putAll(next);
583  throw;
584  }
585  }
586 
593  bool replaySingle(StoreReplayer& replayer_, amps_uint64_t index_)
594  {
595  BufferLock bufferGuard(_blockStore);
596  // If we don't have anything yet, return
597  if (!_blockStore.front())
598  {
599  return false;
600  }
601  // Get the end point
602  amps_uint64_t lastIdx = _blockStore.back()->_sequence;
603  // Get the start point
604  amps_uint64_t leastIdx = _blockStore.front()->_sequence;
605  if (index_ >= leastIdx && index_ <= lastIdx)
606  {
607  Block* block = _blockStore.front();
608  while (block && block->_sequence != index_)
609  {
610  block = block->_nextInList;
611  }
612  if (!block)
613  {
614  return false;
615  }
616  // If total bytes is 0, it's a queue ack and gets skipped.
617  Buffer* pBuffer = _blockStore.getBuffer();
618  pBuffer->setPosition(block->_offset +
619  sizeof(amps_uint32_t));
620  if (pBuffer->getUint32() == 0)
621  {
622  return false;
623  }
624  replayOnto(block, replayer_);
625  return true;
626  }
627  else // Get Store and Client back in sync
628  {
629  _message.reset();
630  leastIdx -= 1;
631  _message.setSequence(leastIdx);
632  replayer_.execute(_message);
633  return false;
634  }
635  }
636 
642  size_t unpersistedCount() const
643  {
644  size_t count = (size_t)_stored;
645  return count;
646  }
647 
656  virtual void flush(long timeout_)
657  {
658  BufferLock bufferGuard(_blockStore);
659  amps_uint64_t waitFor = _getHighestUnpersisted();
660  // Check that we aren't already empty
661  if (waitFor == getUnsetSequence())
662  {
663  return;
664  }
665  if (timeout_ > 0)
666  {
667  bool timedOut = false;
668  AMPS_START_TIMER(timeout_);
669  // While timeout hasn't expired and we haven't had everything acked
670  while (!timedOut && _stored != 0
671  && waitFor >= _getLowestUnpersisted())
672  {
673  if (!_blockStore.wait(timeout_))
674  {
675  // May have woken up early, check real time
676  AMPS_RESET_TIMER(timedOut, timeout_);
677  }
678  }
679  // If we timed out and still haven't caught up with the acks
680  if (timedOut && _stored != 0
681  && waitFor >= _getLowestUnpersisted())
682  {
683  throw TimedOutException("Timed out waiting to flush publish store.");
684  }
685  }
686  else
687  {
688  while (_stored != 0 && waitFor >= _getLowestUnpersisted())
689  {
690  // Still wake up every 1s so python can interrupt
691  _blockStore.wait(1000);
692  // Don't hold lock if possibly grabbing GIL
693  BufferUnlock unlck(_blockStore);
694  amps_invoke_waiting_function();
695  }
696  }
697  }
698 
699  amps_uint64_t getLowestUnpersisted() const
700  {
701  BufferLock bufferGuard(_blockStore);
702  return _getLowestUnpersisted();
703  }
704 
705  amps_uint64_t getHighestUnpersisted() const
706  {
707  BufferLock bufferGuard(_blockStore);
708  return _getHighestUnpersisted();
709  }
710 
711  amps_uint64_t getLastPersisted(void)
712  {
713  BufferLock bufferGuard(_blockStore);
714  return _getLastPersisted();
715  }
716 
717  protected:
718  static bool canResize(size_t requestedSize_, void* vpThis_)
719  {
720  return ((BlockPublishStore*)vpThis_)->callResizeHandler(requestedSize_);
721  }
722 
723  amps_uint64_t _getLowestUnpersisted() const
724  {
725  // Assume the lock is held
726  // If we don't have anything, return MAX
727  if (!_blockStore.front())
728  {
729  return getUnsetSequence();
730  }
731  return _blockStore.front()->_sequence;
732  }
733 
734  amps_uint64_t _getHighestUnpersisted() const
735  {
736  // Assume the lock is held
737  // If we don't have anything, return MAX
738  if (!_blockStore.back())
739  {
740  return getUnsetSequence();
741  }
742  return _blockStore.back()->_sequence;
743  }
744 
745  amps_uint64_t _getLastPersisted(void)
746  {
747  // Assume the lock is held
748  amps_uint64_t lastPersisted = (amps_uint64_t)0;
749  Buffer* pBuffer = _blockStore.getBuffer();
750  pBuffer->setPosition(_metadataBlock->_offset + 8);
751  lastPersisted = pBuffer->getUint64();
752  if (lastPersisted)
753  {
754  if (_lastSequence < lastPersisted)
755  {
756  _lastSequence = lastPersisted;
757  }
758  return lastPersisted;
759  }
760  if (_maxDiscarded)
761  {
762  lastPersisted = _maxDiscarded;
763  }
764  else
765  {
766 #ifdef _WIN32
767  struct _timeb t;
768  _ftime_s(&t);
769  lastPersisted = (t.time * 1000 + t.millitm) * (amps_uint64_t)1000000;
770 #else // not _WIN32
771  struct timeval tv;
772  gettimeofday(&tv, NULL);
773  lastPersisted = (amps_uint64_t)((tv.tv_sec * 1000) + (tv.tv_usec / 1000))
774  * (amps_uint64_t)1000000;
775 #endif
776  }
777  if (_lastSequence > 2)
778  {
779  amps_uint64_t low = _getLowestUnpersisted();
780  amps_uint64_t high = _getHighestUnpersisted();
781  if (low != getUnsetSequence())
782  {
783  lastPersisted = low - 1;
784  }
785  if (high != getUnsetSequence() && _lastSequence <= high)
786  {
787  _lastSequence = high;
788  }
789  if (_lastSequence < lastPersisted)
790  {
791  lastPersisted = _lastSequence - 1;
792  }
793  }
794  else
795  {
796  _lastSequence = lastPersisted;
797  }
798  pBuffer->setPosition(_metadataBlock->_offset
799  + sizeof(amps_uint32_t) // blocks used
800  + sizeof(amps_uint32_t)); // record length
801  pBuffer->putUint64(lastPersisted);
802  _metadataBlock->_sequence = lastPersisted;
803  return lastPersisted;
804  }
805 
// Rebuild the in-memory block lists from the contents of the buffer.
// An empty buffer is initialized with a fresh metadata block. Otherwise the
// metadata block is read, old-format stores are delegated to
// recoverOldFormat(), and every remaining block is scanned: blocks that
// start a record are chained and placed on the used list ordered by
// sequence, and all other blocks go on the free list.
// Throws StoreException if the buffer cannot be resized to a whole number
// of blocks or the metadata is in an unreadable format.
806  void recover(void)
807  {
808  BufferLock bufferGuard(_blockStore);
809  // Make sure the size isn't 0 and is a multiple of block size
810  Buffer* pBuffer = _blockStore.getBuffer();
811  size_t size = pBuffer->getSize();
812  amps_uint32_t blockSize = getBlockSize();
813  if (size == 0)
814  {
815  _blockStore.init();
816  _metadataBlock = _blockStore.get(1);
817  _metadataBlock->_sequence = (amps_uint64_t)0;
818  pBuffer->setPosition(_metadataBlock->_offset);
819  // Metadata block holds block size, block header size,
820  // last discarded sequence, client version
821  pBuffer->putUint32((amps_uint32_t)blockSize);
822  pBuffer->putUint32((amps_uint32_t)getBlockHeaderSize());
823  pBuffer->putUint64((amps_uint64_t)0);
824  // The metadata block puts the client version in the CRC position
825  pBuffer->putUint64((amps_uint64_t)VersionInfo::parseVersion(AMPS_CLIENT_VERSION));
826  // No next in chain
827  pBuffer->putUint64((amps_uint64_t)0);
828  return;
829  }
// NOTE(review): numBlocks is derived from the configured block size, but
// the block size stored in the file is only read further down and may
// replace blockSize after the Block array has been allocated — confirm the
// two cannot differ in a way that under-sizes the array.
830  size_t numBlocks = size / blockSize;
831  if (size % blockSize > 0)
832  {
833  // We shouldn't ever be in here, since it requires starting with a
834  // file that is not an even multiple of block size and we always
835  // fix the size.
836  numBlocks = size / blockSize;
837  ++numBlocks;
838  amps_uint32_t blockCount = 0;
839  // We allocate all the Blocks at once below so delete allocated Block[]
840  delete[] _blockStore.resizeBuffer(numBlocks * blockSize, &blockCount);
841  // Resize can fail if resizeHandler is set and refuses the request
842  // Since this is recovery, we need to simply fail in that case
843  if (size > pBuffer->getSize() || numBlocks != (size_t)blockCount)
844  {
845  throw StoreException("Publish Store could not resize correctly during recovery, possibly due to resizeHandler refusing the request.");
846  }
847  size = pBuffer->getSize();
848  }
849 
850  amps_uint64_t maxIdx = 0;
// NOTE(review): minIdx starts at 0 and is only lowered when it is greater
// than a (positive) recovered sequence, so it appears to stay 0 and the
// later "minIdx > _maxDiscarded + 1" adjustment looks unreachable.
851  amps_uint64_t minIdx = 0;
852  size_t location = 0;
853  BlockHeader blockHeader;
854  // The blocks we create here all get their offset set in below loop
855  Block* blocks = new Block[numBlocks];
856  blocks[numBlocks - 1]._nextInList = 0;
857  size_t blockNum = 0;
858  _blockStore.addBlocks(blocks);
859  _metadataBlock = blocks; // The first Block is metadata
860  _metadataBlock->_nextInList = 0;
861  pBuffer->setPosition(0);
862  pBuffer->copyBytes((char*)&blockHeader, sizeof(BlockHeader));
863  /* Metadata Block header fields
864  * amps_uint32_t _blocksToWrite = BlockSize
865  * amps_uint32_t _totalRemaining = BlockHeaderSize
866  * amps_uint64_t _seq = last persisted sequence number
867  * amps_uint64_t _crcVal = unused
868  * amps_uint64_t _nextInChain = unused
869  */
870  if (blockHeader._blocksToWrite == 1) // Old store format?
871  {
872  /* Old format metadata block header fields
873  * amps_uint32_t _blocksToWrite = 1
874  * amps_uint32_t _totalRemaining = client version
875  * amps_uint64_t _seq = last persisted sequence number
876  * amps_uint64_t _crcVal = unused
877  * amps_uint64_t _nextInChain = unused
878  */
879  // Readable old format starts with version 5.0.0.0
880  if (blockHeader._totalRemaining >= 5000000)
881  {
882  // All recovery needs to be based on old format
883  // so go do that instead.
884  recoverOldFormat(blocks);
885  return;
886  }
887  // Unreadable format, fail
888  throw StoreException("Unrecognized format for Store. Can't recover.");
889  }
// A zero block size in the file means it was never written: store ours.
// Otherwise adopt the block size recorded in the file.
890  if (blockHeader._blocksToWrite == 0)
891  {
892  pBuffer->setPosition(0);
893  pBuffer->putUint32((amps_uint32_t)blockSize);
894  }
895  else
896  {
897  blockSize = blockHeader._blocksToWrite;
898  _blockStore.setBlockSize(blockSize);
899  }
// Same handling for the block header size kept in the second field.
900  if (blockHeader._totalRemaining == 0)
901  {
902  pBuffer->setPosition(sizeof(amps_uint32_t));
903  pBuffer->putUint32((amps_uint32_t)getBlockHeaderSize());
904  }
905  else
906  {
907  _blockStore.setBlockHeaderSize(blockHeader._totalRemaining);
908  }
909  _metadataBlock->_sequence = blockHeader._seq;
// A non-zero persisted sequence below 1,000,000 is cleared, both in the
// file and in memory.
// NOTE(review): presumably such values predate the time-based sequences
// generated in _getLastPersisted() — confirm.
910  if (_metadataBlock->_sequence
911  && _metadataBlock->_sequence < (amps_uint64_t)1000000)
912  {
913  pBuffer->setPosition(_metadataBlock->_offset
914  + sizeof(amps_uint32_t) // BlockSize
915  + sizeof(amps_uint32_t)); // BlockHeaderSize
916  pBuffer->putUint64((amps_uint64_t)0);
917  _metadataBlock->_sequence = 0;
918  }
919  else
920  {
921  // Set _maxDiscarded and _lastSequence
922  _maxDiscarded = _metadataBlock->_sequence;
923  _lastSequence = _maxDiscarded;
924  }
925  // This would be where to check the client version string
926  // No checks currently
// Skip the metadata block and scan the rest of the buffer block by block.
927  location += blockSize;
928  amps_uint32_t freeCount = 0;
929  Block* firstFree = NULL;
930  Block* endOfFreeList = NULL;
931  // Used to create used list in order after recovery
932  typedef std::map<amps_uint64_t, Block*> RecoverMap;
933  RecoverMap recoveredBlocks;
934  while (location < size)
935  {
936  // Get index and check if non-zero
937  pBuffer->setPosition(location);
938  pBuffer->copyBytes((char*)&blockHeader, sizeof(BlockHeader));
// A plausible block with no CRC, or whose sequence was already recovered,
// is a continuation block; it is attached when its chain head is walked.
939  if ((blockHeader._seq > 0 && blockHeader._totalRemaining < size) &&
940  (!blockHeader._crcVal || recoveredBlocks.count(blockHeader._seq)))
941  {
942  // Block is part of a chain
943  location += blockSize;
944  continue;
945  }
// NOTE(review): blockNum is advanced here and in the chain walk below
// without being checked against numBlocks; a damaged file could
// presumably index past the end of the array — confirm.
946  Block* block = blocks[++blockNum].setOffset(location);
947  bool recovered = false;
948  if (blockHeader._seq > 0 && blockHeader._totalRemaining < size)
949  {
950  blockHeader._totalRemaining -= (amps_uint32_t)getBlockChainHeaderSize();
951  block->_sequence = blockHeader._seq;
952  // Track min and max
953  if (maxIdx < blockHeader._seq)
954  {
955  maxIdx = blockHeader._seq;
956  }
957  if (minIdx > blockHeader._seq)
958  {
959  minIdx = blockHeader._seq;
960  }
961  // Save it in recovered blocks
962  recoveredBlocks[blockHeader._seq] = block;
963  // Set up the chain
964  while (blockHeader._nextInChain != (amps_uint64_t)0)
965  {
966  Block* chain = blocks[++blockNum]
967  .setOffset((size_t)blockHeader._nextInChain);
968  chain->_nextInList = 0;
// Read the next-in-chain offset, the last field of that block's header.
969  pBuffer->setPosition((size_t)blockHeader._nextInChain
970  + sizeof(amps_uint32_t) // blocks used
971  + sizeof(amps_uint32_t) // record length
972  + sizeof(amps_uint64_t) // seq
973  + sizeof(amps_uint64_t)); // crc
974  blockHeader._nextInChain = pBuffer->getUint64();
975  block->_nextInChain = chain;
976  block = chain;
977  block->_sequence = blockHeader._seq;
978  }
979  recovered = true;
980  }
981  if (!recovered)
982  {
983  // Put this Block on the free list
984  if (endOfFreeList)
985  {
986  endOfFreeList->_nextInList = block;
987  }
988  else
989  {
990  firstFree = block;
991  }
992  endOfFreeList = block;
993  ++freeCount;
994  }
995  location += blockSize;
996  }
997  if (endOfFreeList)
998  {
999  endOfFreeList->_nextInList = 0;
1000  }
1001  _blockStore.setFreeList(firstFree, freeCount);
// Reconcile the sequence bookkeeping with what was found in the buffer.
1002  if (maxIdx > _lastSequence)
1003  {
1004  _lastSequence = maxIdx;
1005  }
1006  if (minIdx > _maxDiscarded + 1)
1007  {
1008  _maxDiscarded = minIdx - 1;
1009  }
1010  if (_maxDiscarded > _metadataBlock->_sequence)
1011  {
1012  _metadataBlock->_sequence = _maxDiscarded;
1013  pBuffer->setPosition(_metadataBlock->_offset + 8);
1014  pBuffer->putUint64(_maxDiscarded);
1015  }
// Link the recovered chain heads into the used list; iterating the map
// visits them in ascending sequence order.
1016  Block* end = NULL;
1017  AMPS_FETCH_ADD(&_stored, (long)(recoveredBlocks.size()));
1018  for (RecoverMap::iterator i = recoveredBlocks.begin();
1019  i != recoveredBlocks.end(); ++i)
1020  {
1021  if (end)
1022  {
1023  end->_nextInList = i->second;
1024  }
1025  else
1026  {
1027  _blockStore.setUsedList(i->second);
1028  }
1029  end = i->second;
1030  }
1031  if (end)
1032  {
1033  end->_nextInList = 0;
1034  }
1035  _blockStore.setEndOfUsedList(end);
1036  }
1037 
1038  private:
1039  // Lock should already be held
1040  void replayOnto(Block* block_, StoreReplayer& replayer_)
1041  {
1042  // Read the header
1043  size_t start = block_->_offset;
1044  size_t position = start;
1045  Buffer* pBuffer = _blockStore.getBuffer();
1046  pBuffer->setPosition(position);
1047  BlockHeader blockHeader;
1048  pBuffer->copyBytes((char*)&blockHeader, sizeof(BlockHeader));
1049  if (blockHeader._totalRemaining == 0)
1050  {
1051  // Queue acking sow_delete
1052  return;
1053  }
1054  position += getBlockHeaderSize();
1055  BlockChainHeader blockChainHeader;
1056  pBuffer->copyBytes((char*)&blockChainHeader, sizeof(blockChainHeader));
1057  if (blockChainHeader._operation == Message::Command::Unknown)
1058  {
1059  // Queue acking sow_delete
1060  return;
1061  }
1062  blockChainHeader._ackTypes |= Message::AckType::Persisted;
1063  position += getBlockChainHeaderSize();
1064  blockHeader._totalRemaining -= (amps_uint32_t)getBlockChainHeaderSize();
1065  pBuffer->setPosition(position);
1066 
1067  if (blockHeader._totalRemaining
1068  < blockChainHeader._commandIdLen
1069  + blockChainHeader._correlationIdLen
1070  + blockChainHeader._expirationLen
1071  + blockChainHeader._sowKeyLen
1072  + blockChainHeader._topicLen)
1073  {
1074  std::ostringstream os;
1075  os << "Corrupted message found with invalid lengths. "
1076  << "Attempting to replay " << block_->_sequence
1077  << ". Block sequence " << blockHeader._seq
1078  << ", topic length " << blockChainHeader._topicLen
1079  << ", data length " << blockHeader._totalRemaining
1080  << ", command ID length " << blockChainHeader._commandIdLen
1081  << ", correlation ID length " << blockChainHeader._correlationIdLen
1082  << ", expiration length " << blockChainHeader._expirationLen
1083  << ", sow key length " << blockChainHeader._sowKeyLen
1084  << ", start " << start
1085  << ", position " << position
1086  << ", buffer size " << pBuffer->getSize();
1087  throw StoreException(os.str());
1088  }
1089 
1090  // Start prepping the message
1091  _message.reset();
1092  _message.setCommandEnum((Message::Command::Type)blockChainHeader._operation);
1093  _message.setAckTypeEnum((unsigned)blockChainHeader._ackTypes
1094  | Message::AckType::Persisted);
1095  _message.setSequence(blockHeader._seq);
1096  // Read the data and calculate the CRC
1097  Block* current = block_;
1098  size_t blockBytesRemaining = getBlockDataSize() - getBlockChainHeaderSize();
1099  amps_uint64_t crcCalc = (amps_uint64_t)0ULL;
1100  // Use tmpBuffers for any fields split across Block boundaries
1101  char** tmpBuffers = (blockHeader._blocksToWrite > 1) ? new char* [blockHeader._blocksToWrite - 1] : 0;
1102  size_t blockNum = 0;
1103  if (blockChainHeader._commandIdLen > 0)
1104  {
1105  if (blockChainHeader._commandIdLen <= blockBytesRemaining)
1106  {
1107  _message.assignCommandId(pBuffer->getBytes(blockChainHeader._commandIdLen)._data,
1108  blockChainHeader._commandIdLen);
1109  blockBytesRemaining -= blockChainHeader._commandIdLen;
1110  }
1111  else
1112  {
1113  tmpBuffers[blockNum] = new char[blockChainHeader._commandIdLen]; // -V522
1114  size_t totalLeft = blockChainHeader._commandIdLen;
1115  size_t totalRead = 0;
1116  size_t readLen = 0;
1117  while (totalLeft)
1118  {
1119  readLen = blockBytesRemaining < totalLeft ?
1120  blockBytesRemaining : totalLeft;
1121  pBuffer->copyBytes(tmpBuffers[blockNum] + totalRead, readLen);
1122  if (!(totalLeft -= readLen))
1123  {
1124  break;
1125  }
1126  if (!(current = current->_nextInChain))
1127  {
1128  break;
1129  }
1130  totalRead += readLen;
1131  blockBytesRemaining = getBlockDataSize();
1132  position = current->_offset + getBlockHeaderSize();
1133  pBuffer->setPosition(position);
1134  }
1135  blockBytesRemaining -= readLen;
1136  _message.assignCommandId(tmpBuffers[blockNum++], blockChainHeader._commandIdLen);
1137  }
1138  blockHeader._totalRemaining -= blockChainHeader._commandIdLen;
1139  crcCalc = _crc(_message.getCommandId().data(),
1140  blockChainHeader._commandIdLen, crcCalc);
1141  }
1142  if (blockChainHeader._correlationIdLen > 0)
1143  {
1144  if (blockChainHeader._correlationIdLen <= blockBytesRemaining)
1145  {
1146  _message.assignCorrelationId(
1147  pBuffer->getBytes(blockChainHeader._correlationIdLen)._data,
1148  blockChainHeader._correlationIdLen);
1149  blockBytesRemaining -= blockChainHeader._correlationIdLen;
1150  }
1151  else
1152  {
1153  tmpBuffers[blockNum] = new char[blockChainHeader._correlationIdLen]; // -V522
1154  size_t totalLeft = blockChainHeader._correlationIdLen;
1155  size_t totalRead = 0;
1156  size_t readLen = 0;
1157  while (totalLeft)
1158  {
1159  readLen = blockBytesRemaining < totalLeft ?
1160  blockBytesRemaining : totalLeft;
1161  pBuffer->copyBytes(tmpBuffers[blockNum] + totalRead, readLen);
1162  if (!(totalLeft -= readLen))
1163  {
1164  break;
1165  }
1166  if (!(current = current->_nextInChain))
1167  {
1168  break; // -V522
1169  }
1170  totalRead += readLen;
1171  blockBytesRemaining = getBlockDataSize();
1172  position = current->_offset + getBlockHeaderSize();
1173  pBuffer->setPosition(position);
1174  }
1175  blockBytesRemaining -= readLen;
1176  _message.assignCorrelationId(tmpBuffers[blockNum++], blockChainHeader._correlationIdLen);
1177  }
1178  blockHeader._totalRemaining -= blockChainHeader._correlationIdLen;
1179  crcCalc = _crc(_message.getCorrelationId().data(),
1180  blockChainHeader._correlationIdLen, crcCalc);
1181  }
1182  if (blockChainHeader._expirationLen > 0)
1183  {
1184  if (blockChainHeader._expirationLen <= blockBytesRemaining)
1185  {
1186  _message.assignExpiration(
1187  pBuffer->getBytes(blockChainHeader._expirationLen)._data,
1188  blockChainHeader._expirationLen);
1189  blockBytesRemaining -= blockChainHeader._expirationLen;
1190  }
1191  else
1192  {
1193  tmpBuffers[blockNum] = new char[blockChainHeader._expirationLen]; // -V522
1194  size_t totalLeft = blockChainHeader._expirationLen;
1195  size_t totalRead = 0;
1196  size_t readLen = 0;
1197  while (totalLeft)
1198  {
1199  readLen = blockBytesRemaining < totalLeft ?
1200  blockBytesRemaining : totalLeft;
1201  pBuffer->copyBytes(tmpBuffers[blockNum] + totalRead, readLen);
1202  if (!(totalLeft -= readLen))
1203  {
1204  break;
1205  }
1206  if (!(current = current->_nextInChain))
1207  {
1208  break;
1209  }
1210  totalRead += readLen;
1211  blockBytesRemaining = getBlockDataSize();
1212  position = current->_offset + getBlockHeaderSize();
1213  pBuffer->setPosition(position);
1214  }
1215  blockBytesRemaining -= readLen;
1216  _message.assignExpiration(tmpBuffers[blockNum++], blockChainHeader._expirationLen);
1217  }
1218  blockHeader._totalRemaining -= blockChainHeader._expirationLen;
1219  crcCalc = _crc(_message.getExpiration().data(),
1220  blockChainHeader._expirationLen, crcCalc);
1221  }
1222  if (blockChainHeader._sowKeyLen > 0)
1223  {
1224  if (blockChainHeader._sowKeyLen <= blockBytesRemaining)
1225  {
1226  _message.assignSowKey(pBuffer->getBytes(blockChainHeader._sowKeyLen)._data,
1227  blockChainHeader._sowKeyLen);
1228  blockBytesRemaining -= blockChainHeader._sowKeyLen;
1229  }
1230  else
1231  {
1232  tmpBuffers[blockNum] = new char[blockChainHeader._sowKeyLen]; // -V522
1233  size_t totalLeft = blockChainHeader._sowKeyLen;
1234  size_t totalRead = 0;
1235  size_t readLen = 0;
1236  while (totalLeft)
1237  {
1238  readLen = blockBytesRemaining < totalLeft ?
1239  blockBytesRemaining : totalLeft;
1240  pBuffer->copyBytes(tmpBuffers[blockNum] + totalRead, readLen);
1241  if (!(totalLeft -= readLen))
1242  {
1243  break;
1244  }
1245  if (!(current = current->_nextInChain))
1246  {
1247  break;
1248  }
1249  totalRead += readLen;
1250  blockBytesRemaining = getBlockDataSize();
1251  position = current->_offset + getBlockHeaderSize();
1252  pBuffer->setPosition(position);
1253  }
1254  blockBytesRemaining -= readLen;
1255  _message.assignSowKey(tmpBuffers[blockNum++], blockChainHeader._sowKeyLen);
1256  }
1257  blockHeader._totalRemaining -= blockChainHeader._sowKeyLen;
1258  crcCalc = _crc(_message.getSowKey().data(), blockChainHeader._sowKeyLen, crcCalc);
1259  }
1260  if (blockChainHeader._topicLen > 0)
1261  {
1262  if (blockChainHeader._topicLen <= blockBytesRemaining)
1263  {
1264  _message.assignTopic(pBuffer->getBytes(blockChainHeader._topicLen)._data,
1265  blockChainHeader._topicLen);
1266  blockBytesRemaining -= blockChainHeader._topicLen;
1267  }
1268  else
1269  {
1270  tmpBuffers[blockNum] = new char[blockChainHeader._topicLen]; // -V522
1271  size_t totalLeft = blockChainHeader._topicLen;
1272  size_t totalRead = 0;
1273  size_t readLen = 0;
1274  while (totalLeft)
1275  {
1276  readLen = blockBytesRemaining < totalLeft ?
1277  blockBytesRemaining : totalLeft;
1278  pBuffer->copyBytes(tmpBuffers[blockNum] + totalRead, readLen);
1279  if (!(totalLeft -= readLen))
1280  {
1281  break;
1282  }
1283  if (!(current = current->_nextInChain))
1284  {
1285  break;
1286  }
1287  totalRead += readLen;
1288  blockBytesRemaining = getBlockDataSize();
1289  position = current->_offset + getBlockHeaderSize();
1290  pBuffer->setPosition(position);
1291  }
1292  blockBytesRemaining -= readLen;
1293  _message.assignTopic(tmpBuffers[blockNum++], blockChainHeader._topicLen);
1294  }
1295  blockHeader._totalRemaining -= blockChainHeader._topicLen;
1296  crcCalc = _crc(_message.getTopic().data(), blockChainHeader._topicLen, crcCalc);
1297  }
1298  if (blockHeader._totalRemaining > 0)
1299  {
1300  if (blockHeader._totalRemaining <= blockBytesRemaining)
1301  {
1302  if (blockChainHeader._flag == -1 || blockChainHeader._flag == SOW_DELETE_DATA)
1303  {
1304  _message.assignData(
1305  pBuffer->getBytes(blockHeader._totalRemaining)._data,
1306  blockHeader._totalRemaining);
1307  crcCalc = _crc(_message.getData().data(),
1308  blockHeader._totalRemaining, crcCalc);
1309  }
1310  else if (blockChainHeader._flag == SOW_DELETE_FILTER)
1311  {
1312  _message.assignFilter(
1313  pBuffer->getBytes(blockHeader._totalRemaining)._data,
1314  blockHeader._totalRemaining);
1315  crcCalc = _crc(_message.getFilter().data(),
1316  blockHeader._totalRemaining, crcCalc);
1317  }
1318  else if (blockChainHeader._flag == SOW_DELETE_KEYS)
1319  {
1320  _message.assignSowKeys(
1321  pBuffer->getBytes(blockHeader._totalRemaining)._data,
1322  blockHeader._totalRemaining);
1323  crcCalc = _crc(_message.getSowKeys().data(),
1324  blockHeader._totalRemaining, crcCalc);
1325  }
1326  else if (blockChainHeader._flag == SOW_DELETE_BOOKMARK)
1327  {
1328  _message.assignBookmark(
1329  pBuffer->getBytes(blockHeader._totalRemaining)._data,
1330  blockHeader._totalRemaining);
1331  crcCalc = _crc(_message.getBookmark().data(),
1332  blockHeader._totalRemaining, crcCalc);
1333  }
1334  else if (blockChainHeader._flag == SOW_DELETE_BOOKMARK_CANCEL)
1335  {
1336  _message.assignBookmark(
1337  pBuffer->getBytes(blockHeader._totalRemaining)._data,
1338  blockHeader._totalRemaining);
1339  crcCalc = _crc(_message.getBookmark().data(),
1340  blockHeader._totalRemaining, crcCalc);
1341  _message.assignOptions(AMPS_OPTIONS_CANCEL, 6);
1342  }
1343  }
1344  else
1345  {
1346  tmpBuffers[blockNum] = new char[blockHeader._totalRemaining]; // -V522
1347  size_t totalLeft = blockHeader._totalRemaining;
1348  size_t totalRead = 0;
1349  size_t readLen = 0;
1350  while (totalLeft)
1351  {
1352  readLen = blockBytesRemaining < totalLeft ?
1353  blockBytesRemaining : totalLeft;
1354  pBuffer->copyBytes(tmpBuffers[blockNum] + totalRead, readLen);
1355  if (!(totalLeft -= readLen))
1356  {
1357  break;
1358  }
1359  if (!(current = current->_nextInChain))
1360  {
1361  break;
1362  }
1363  totalRead += readLen;
1364  blockBytesRemaining = getBlockDataSize();
1365  position = current->_offset + getBlockHeaderSize();
1366  pBuffer->setPosition(position);
1367  }
1368  position += readLen;
1369  if (blockChainHeader._flag == -1 || blockChainHeader._flag == SOW_DELETE_DATA)
1370  {
1371  _message.assignData(tmpBuffers[blockNum], blockHeader._totalRemaining);
1372  }
1373  else if (blockChainHeader._flag == SOW_DELETE_FILTER)
1374  {
1375  _message.assignFilter(tmpBuffers[blockNum], blockHeader._totalRemaining);
1376  }
1377  else if (blockChainHeader._flag == SOW_DELETE_KEYS)
1378  {
1379  _message.assignSowKeys(tmpBuffers[blockNum], blockHeader._totalRemaining);
1380  }
1381  else if (blockChainHeader._flag == SOW_DELETE_BOOKMARK)
1382  {
1383  _message.assignBookmark(tmpBuffers[blockNum], blockHeader._totalRemaining);
1384  }
1385  else if (blockChainHeader._flag == SOW_DELETE_BOOKMARK_CANCEL)
1386  {
1387  _message.assignBookmark(tmpBuffers[blockNum], blockHeader._totalRemaining);
1388  _message.assignOptions(AMPS_OPTIONS_CANCEL, 6);
1389  }
1390  crcCalc = _crc(tmpBuffers[blockNum++], blockHeader._totalRemaining, crcCalc); // -V595
1391  }
1392  }
1393 
1394  // Validate the crc and seq
1395  if (crcCalc != blockHeader._crcVal || blockHeader._seq != block_->_sequence)
1396  {
1397  std::ostringstream os;
1398  os << "Corrupted message found by CRC or sequence "
1399  << "Attempting to replay " << block_->_sequence
1400  << ". Block sequence " << blockHeader._seq
1401  << ", expiration length " << blockChainHeader._expirationLen
1402  << ", sowKey length " << blockChainHeader._sowKeyLen
1403  << ", topic length " << blockChainHeader._topicLen
1404  << ", data length " << blockHeader._totalRemaining
1405  << ", command ID length " << blockChainHeader._commandIdLen
1406  << ", correlation ID length " << blockChainHeader._correlationIdLen
1407  << ", flag " << blockChainHeader._flag
1408  << ", expected CRC " << blockHeader._crcVal
1409  << ", actual CRC " << crcCalc
1410  << ", start " << start
1411  << ", position " << position
1412  << ", buffer size " << pBuffer->getSize();
1413  for (Block* block = block_; block; block = block->_nextInChain)
1414  {
1415  os << "\n BLOCK " << block->_offset;
1416  }
1417  if (tmpBuffers)
1418  {
1419  for (amps_uint32_t i = 0; i < blockNum; ++i)
1420  {
1421  delete[] tmpBuffers[i]; // -V522
1422  }
1423  delete[] tmpBuffers;
1424  }
1425  throw StoreException(os.str());
1426  }
1427  // Replay the message
1428  replayer_.execute(_message);
1429  // Free the buffer if allocated
1430  if (tmpBuffers)
1431  {
1432  for (amps_uint32_t i = 0; i < blockNum; ++i)
1433  {
1434  delete[] tmpBuffers[i]; // -V522
1435  }
1436  delete[] tmpBuffers;
1437  }
1438  }
1439 
1440  // Lock should already be held
1441  // Read an older format file and update it.
//
// Rewrites, in place, a store whose blocks use the older layout so that each
// block carries the current BlockHeader and the first block of each message
// carries the current BlockChainHeader.
//
// blocks: array of Block entries indexed by block number. Data blocks are
//         assigned starting at index 1 (see the wrap comment in the loop).
//
// Pass 1 (the while loop) walks the buffer one block at a time and treats
// each position as either:
//   - the start of a message that still fits in its existing blocks, which
//     is converted immediately;
//   - the start of a message that needs more blocks, which is saved in
//     growingBlocks; or
//   - a free block, which is appended to the local free list.
// Pass 2 (the growingBlocks loop) grows the buffer when the free list is too
// short, then converts each saved message through expandBlocks().
// Finally the free list is handed to _blockStore, _lastSequence and
// _maxDiscarded are updated, and the recovered messages are linked into the
// used list in ascending sequence order (the iteration order of the map).
//
// Throws StoreException if the buffer cannot be grown.
1442  void recoverOldFormat(Block* blocks)
1443  {
1444  Buffer* pBuffer = _blockStore.getBuffer();
1445  amps_uint64_t maxIdx = 0;
// NOTE(review): minIdx starts at 0 and is only replaced when it is greater
// than a sequence number, so it looks like it can never change and the
// "minIdx > _maxDiscarded + 1" adjustment near the end never runs. Confirm
// whether a different initial value was intended.
1446  amps_uint64_t minIdx = 0;
1447  size_t size = pBuffer->getSize();
1448  size_t location = 0;
// Rewrite the start of the buffer: block size, block header size, then the
// 8-byte stored sequence, which is read back rather than overwritten.
1449  pBuffer->setPosition(location);
1450  pBuffer->putUint32((amps_uint32_t)getBlockSize());
1451  pBuffer->putUint32((amps_uint32_t)_blockStore.getBlockHeaderSize());
1452  _metadataBlock->_sequence = pBuffer->getUint64();
// A stored sequence below 1000000 is overwritten with 0.
1453  if (_metadataBlock->_sequence < (amps_uint64_t)1000000)
1454  {
1455  pBuffer->setPosition(_metadataBlock->_offset + 8);
1456  pBuffer->putUint64((amps_uint64_t)0);
1457  _metadataBlock->_sequence = 0;
1458  }
1459  else
1460  {
1461  // Set _maxDiscarded and _lastSequence
1462  _maxDiscarded = _metadataBlock->_sequence;
1463  _lastSequence = _maxDiscarded;
1464  }
1465  // Write the current client version
1466  pBuffer->putUint64((amps_uint64_t)VersionInfo::parseVersion(AMPS_CLIENT_VERSION));
1467  // No next in chain
1468  pBuffer->putUint64((amps_uint64_t)0);
1469  // No checks currently
1470  location += getBlockSize();
1471  amps_uint32_t freeCount = 0;
1472  Block* firstFree = NULL;
1473  Block* endOfFreeList = NULL;
1474  size_t blockSize = getBlockSize();
1475  size_t numBlocks = size / blockSize;
1476  size_t blockNum = 0;
1477  // Used to create used list in order after recovery
1478  typedef std::map<amps_uint64_t, Block*> RecoverMap;
1479  RecoverMap recoveredBlocks;
// Messages that need more blocks in the new layout, keyed by sequence
1480  RecoverMap growingBlocks;
// Total extra blocks required by everything in growingBlocks
1481  amps_uint32_t growthBlocksNeeded = 0;
1482  while (location < size)
1483  {
1484  // Get seq and check if non-zero
1485  pBuffer->setPosition(location);
1486  BlockHeader blockHeader;
1487  pBuffer->copyBytes((char*)&blockHeader, sizeof(BlockHeader));
1488  size_t blockCount = (size_t)blockHeader._blocksToWrite;
// Only a header with plausible values is treated as the start of a message;
// anything else goes to the free-list branch at the bottom of the loop.
1489  if (blockHeader._totalRemaining > 0 && blockHeader._seq > 0
1490  && blockHeader._totalRemaining < size
1491  && blockHeader._blocksToWrite < numBlocks
1492  && (blockHeader._blocksToWrite * blockSize)
1493  >= blockHeader._totalRemaining)
1494  {
1495  size_t oldFormatSize = blockHeader._totalRemaining;
1496  // Old format total was storage bytes plus 64 bytes for block
1497  // and chain headers.
1498  blockHeader._totalRemaining -= 64;
1499  // New format counts only chain header size
1500  blockHeader._totalRemaining += getBlockChainHeaderSize();
1501  // Get the rest of the header
1502  BlockChainHeader chainHeader;
1503  // Need to reset location to after OLD header:
1504  // amps_uint32_t blocks, amps_uint32_t totalRemaining,
1505  // amps_uint64_t seq, amps_uint64_t crc
1506  pBuffer->setPosition(location + (sizeof(amps_uint32_t) * 2)
1507  + (sizeof(amps_uint64_t) * 2) );
1508  // Read old chain header which uses same order, but not
1509  // as many bytes (everything is 32bit):
1510  // operation, commandIdLen, correlationIdLen,
1511  // expirationLen, sowKeyLen, topicLen, flag, ackTypes
1512  pBuffer->copyBytes((char*)&chainHeader,
1513  sizeof(amps_uint32_t) * 8);
1514  // Check for garbage, likely indicating this is part of a chain
1515  if ((chainHeader._commandIdLen + chainHeader._correlationIdLen
1516  + chainHeader._expirationLen + chainHeader._sowKeyLen
1517  + chainHeader._topicLen) > blockHeader._totalRemaining)
1518  {
1519  // Skip this block, can't be real data
// NOTE(review): this path advances location without advancing blockNum or
// adding the block to the free list - confirm that is intended.
1520  location += getBlockSize();
1521  continue;
1522  }
1523  // Check if data fits in current number of blocks
// (ceiling of _totalRemaining / getBlockDataSize())
1524  amps_uint32_t blocksNeeded = (blockHeader._totalRemaining
1525  / getBlockDataSize())
1526  + (blockHeader._totalRemaining
1527  % getBlockDataSize()
1528  ? 1 : 0);
1529  if (blocksNeeded == blockHeader._blocksToWrite)
1530  {
1531  Block* first = blocks[++blockNum].setOffset(location);
1532  first->_nextInList = 0;
1533  first->_sequence = blockHeader._seq;
1534  if (blockHeader._blocksToWrite > 1)
1535  {
1536  // CRC is only set on the first block
1537  amps_uint64_t crcVal = blockHeader._crcVal;
1538  blockHeader._crcVal = 0;
1539  Block* current = 0;
1540  // It fits, just need to adjust the block formats
1541  // and set up the chain. Start with the last block
1542  // and move data as needed starting at the end.
1543  size_t currentBlockNum = blockNum
1544  + blockHeader._blocksToWrite
1545  - 1;
1546  // Last item could wrap to beginning, but beginning is
1547  // block 1, not 0, which is the metadata block.
1548  if (currentBlockNum >= numBlocks)
1549  {
1550  currentBlockNum = currentBlockNum - numBlocks + 1;
1551  }
// A wrapped last block was already visited by this loop and may have been
// put on the free list, so unlink it from that list before reusing it.
1552  if (currentBlockNum < blockNum)
1553  {
1554  Block* last = blocks[currentBlockNum]
1555  .init(currentBlockNum, getBlockSize());
1556  if ((current = firstFree) == last)
1557  {
1558  firstFree = firstFree->_nextInList; // -V522
1559  if (!firstFree)
1560  {
1561  endOfFreeList = 0;
1562  }
1563  --freeCount;
1564  }
1565  else
1566  {
1567  while (current)
1568  {
1569  if (current->_nextInList == last)
1570  {
1571  current->_nextInList = last->_nextInList;
1572  current = last;
1573  --freeCount;
1574  break;
1575  }
1576  current = current->_nextInList;
1577  }
1578  }
1579  }
1580  if (!current)
1581  {
1582  current = blocks[currentBlockNum]
1583  .init(currentBlockNum, getBlockSize());
1584  }
1585  // Initially, the number of bytes in last block
1586  size_t dataBytes = oldFormatSize % getBlockSize();
// Walk backwards from the last block of the chain to the first
1587  while (current != first)
1588  {
1589  current->_nextInList = 0;
1590  current->_sequence = blockHeader._seq;
1591  // Set _nextInChain on previous Block, will include first
1592  if (--currentBlockNum < 1
1593  || currentBlockNum > numBlocks)
1594  {
1595  currentBlockNum = numBlocks - 1;
1596  }
1597  Block* previous = blocks[currentBlockNum]
1598  .init(currentBlockNum,
1599  getBlockSize());
1600  previous->_nextInChain = current;
1601  // Shift to make room for a header in every block
1602  // Not growing, so this won't write past the end.
1603  // Shift amount accounts for a header added to each
1604  // block after the first plus any change in the
1605  // chain header size from 32, which is the old size.
// NOTE(review): the embedded listing numbers skip 1608 below, so one
// continuation line of this expression is missing from this listing; the
// parenthesis closed on line 1609 is opened on that missing line.
1606  size_t bytesToMove = --blockCount
1607  * getBlockHeaderSize()
1609  - 32);
1610  pBuffer->copyBytes(current->_offset + bytesToMove,
1611  current->_offset,
1612  dataBytes);
1613  dataBytes = getBlockSize();
1614  if (bytesToMove > getBlockHeaderSize())
1615  {
1616  bytesToMove -= getBlockHeaderSize();
1617  dataBytes -= bytesToMove;
1618  pBuffer->copyBytes(current->_offset
1619  + getBlockHeaderSize(),
1620  previous->_offset
1621  + dataBytes,
1622  bytesToMove);
1623  }
1624  // Set next in chain for this block's header
1625  blockHeader._nextInChain = (current->_nextInChain
1626  ? current->_nextInChain->_offset
1627  : (amps_uint64_t)0);
1628  // Write the header for this block
1629  pBuffer->setPosition(current->_offset);
1630  pBuffer->putBytes((const char*)&blockHeader,
1631  sizeof(BlockHeader));
// Take the previous block off the free list if it is there
1632  if (firstFree == previous)
1633  {
1634  firstFree = firstFree->_nextInList;
1635  if (!firstFree)
1636  {
1637  endOfFreeList = 0;
1638  }
1639  --freeCount;
1640  }
1641  else
1642  {
1643  current = firstFree;
1644  while (current)
1645  {
1646  if (current->_nextInList == previous)
1647  {
1648  current->_nextInList = previous->_nextInList;
1649  --freeCount;
1650  break;
1651  }
1652  current = current->_nextInList;
1653  }
1654  }
1655  current = previous;
1656  }
1657  blockNum += blockHeader._blocksToWrite - 1;
1658  blockHeader._crcVal = crcVal;
1659  }
1660  // Move bytes for chain header expansion from 32 bytes
1661  size_t bytesToMove = getBlockDataSize() - 32
1662  - (getBlockChainHeaderSize() - 32);
// NOTE(review): the embedded listing numbers skip 1664 below, so the
// continuation of the first argument is missing from this listing.
1663  pBuffer->copyBytes(first->_offset + getBlockHeaderSize()
1665  first->_offset + getBlockHeaderSize() + 32,
1666  bytesToMove);
1667  // Rewrite the header and chain header for first Block.
1668  pBuffer->setPosition(first->_offset);
1669  blockHeader._nextInChain = (first->_nextInChain
1670  ? first->_nextInChain->_offset
1671  : (amps_uint64_t)0);
1672  pBuffer->putBytes((const char*)&blockHeader,
1673  sizeof(BlockHeader));
1674  pBuffer->putBytes((const char*)&chainHeader,
1675  sizeof(BlockChainHeader));
1676  // Add first Block to recovered for building the used
1677  // list later
1678  recoveredBlocks[blockHeader._seq] = first;
1679  }
1680  else
1681  {
1682  // This will need at least one more Block due to a header in
1683  // every Block. Check how many and save for later.
1684  growingBlocks[blockHeader._seq] = blocks[++blockNum].setOffset(location);
1685  growthBlocksNeeded += (blocksNeeded - blockHeader._blocksToWrite);
1686  blockNum += blockHeader._blocksToWrite - 1;
1687  }
1688  // Track min and max
1689  if (maxIdx < blockHeader._seq)
1690  {
1691  maxIdx = blockHeader._seq;
1692  }
1693  if (minIdx > blockHeader._seq)
1694  {
1695  minIdx = blockHeader._seq;
1696  }
1697  // Advance past read blocks
1698  location += blockHeader._blocksToWrite * getBlockSize();
1699  // Either we're exiting loop, or blockNum is in range
1700  assert(location >= size || blockNum < numBlocks);
1701  }
1702  else
1703  {
1704  // Put this Block on the free list
1705  Block* block = blocks[++blockNum].setOffset(location);
1706  if (endOfFreeList)
1707  {
1708  endOfFreeList->_nextInList = block;
1709  }
1710  else
1711  {
1712  firstFree = block;
1713  }
1714  endOfFreeList = block;
1715  ++freeCount;
1716  location += blockSize;
1717  }
1718  }
// Pass 2: convert the messages that need more blocks than they had
1719  for (RecoverMap::iterator i = growingBlocks.begin();
1720  i != growingBlocks.end(); ++i)
1721  {
1722  Block* first = i->second;
1723  pBuffer->setPosition(first->_offset);
1724  BlockHeader blockHeader;
1725  // Read an old BlockHeader, which is only 24 bytes.
1726  // The bytes match current BlockHeader, and _nextInChain is 0.
1727  pBuffer->copyBytes((char*)&blockHeader, 24);
// NOTE(review): _totalRemaining is converted to the new accounting here, and
// expandBlocks() applies the same "-64, + chain header size" conversion to
// the copy it receives - confirm the double adjustment is intended.
1728  // Old format total was storage bytes plus 64 bytes for block
1729  // and chain headers.
1730  blockHeader._totalRemaining -= 64;
1731  // New format counts only chain header size
1732  blockHeader._totalRemaining += getBlockChainHeaderSize();
1733  if (freeCount < growthBlocksNeeded)
1734  {
1735  // We have to resize, let's try to do it once.
1736  amps_uint32_t minBlocksRequired = growthBlocksNeeded - freeCount;
1737  amps_uint32_t growthBlocks = _blockStore.getDefaultResizeBlocks();
1738  if (growthBlocks < minBlocksRequired)
1739  {
// Round the request up to a whole multiple of the default resize amount
1740  amps_uint32_t defaultBlocks = _blockStore.getDefaultResizeBlocks();
1741  if (minBlocksRequired % defaultBlocks)
1742  minBlocksRequired = (minBlocksRequired / defaultBlocks + 1)
1743  * defaultBlocks;
1744  growthBlocks = minBlocksRequired;
1745  }
1746  amps_uint32_t newBlocks = 0;
1747  Block* addedBlocks = _blockStore.resizeBuffer(
1748  pBuffer->getSize()
1749  + growthBlocks * blockSize,
1750  &newBlocks);
1751  if (!addedBlocks)
1752  {
1753  throw StoreException("Failed to grow store buffer during recovery");
1754  }
1755  _blockStore.addBlocks(addedBlocks);
1756  freeCount += newBlocks;
1757  growthBlocksNeeded = (growthBlocksNeeded > freeCount)
1758  ? growthBlocksNeeded - freeCount : 0;
// Append the new blocks to the local free list
1759  if (endOfFreeList)
1760  {
1761  endOfFreeList->_nextInList = addedBlocks;
1762  }
1763  else
1764  {
1765  firstFree = addedBlocks;
1766  }
1767  endOfFreeList = &(addedBlocks[newBlocks - 1]);
1768  endOfFreeList->_nextInList = 0;
1769  }
1770  expandBlocks(blocks, first->_offset, first, blockHeader,
1771  &firstFree, &freeCount, pBuffer);
1772  // Add first Block to recovered for building the used list later
1773  recoveredBlocks[blockHeader._seq] = first;
1774  if (!firstFree)
1775  {
1776  endOfFreeList = 0;
1777  }
1778  }
1779  if (endOfFreeList)
1780  {
1781  endOfFreeList->_nextInList = 0;
1782  }
1783  _blockStore.setFreeList(firstFree, freeCount);
1784  if (maxIdx > _lastSequence)
1785  {
1786  _lastSequence = maxIdx;
1787  }
1788  if (minIdx > _maxDiscarded + 1)
1789  {
1790  _maxDiscarded = minIdx - 1;
1791  }
// Persist _maxDiscarded in the metadata block only when it moved forward
1792  if (_maxDiscarded > _metadataBlock->_sequence)
1793  {
1794  _metadataBlock->_sequence = _maxDiscarded;
1795  pBuffer->setPosition(_metadataBlock->_offset + 8);
1796  pBuffer->putUint64(_maxDiscarded);
1797  }
1798  Block* end = NULL;
1799  AMPS_FETCH_ADD(&_stored, (long)(recoveredBlocks.size()));
// Build the used list: the first recovered message becomes the head, each
// later one is linked after the previous one through 'end'.
1800  for (RecoverMap::iterator i = recoveredBlocks.begin();
1801  i != recoveredBlocks.end(); ++i)
1802  {
1803  if (_blockStore.front())
1804  {
1805  end->_nextInList = i->second; // -V522
1806  }
1807  else
1808  {
1809  _blockStore.setUsedList(i->second);
1810  }
1811  end = i->second;
1812  }
1813  if (end)
1814  {
1815  end->_nextInList = 0;
1816  }
1817  _blockStore.setEndOfUsedList(end);
1818  }
1819 
1820  // For recovering an old format store to current format when more Blocks
1821  // are needed with the new format.
//
// Builds the block chain for one message (reusing its existing blocks and
// taking the extra ones from the free list), then moves the data into place
// starting from the last block, and finally rewrites the first block's
// header and chain header.
//
// blocks_      Block array indexed by block number; only entry 1 is used
//              here, when an existing block has no _nextInList to follow.
// location_    Buffer offset of the message's first block; the old chain
//              header is read from it and the new headers are written there.
// first_       Block entry for that first block.
// blockHeader_ Header read by the caller; taken by value and modified
//              locally before being written to each block of the chain.
// pFreeList_   In/out head of the free list; blocks taken for the chain are
//              removed from it.
// pFreeCount_  In/out count of free blocks, decremented for each block taken.
// pBuffer_     Buffer holding the store.
1822  void expandBlocks(Block* blocks_, size_t location_, Block* first_,
1823  BlockHeader blockHeader_,
1824  Block** pFreeList_, amps_uint32_t* pFreeCount_,
1825  Buffer* pBuffer_)
1826  {
1827  // First create the chain, then we'll fill in reverse
1828  Block* current = first_;
1829  // Old format total was storage bytes plus 64 bytes for block
1830  // and chain headers.
1831  amps_uint32_t oldTotalRemaining = blockHeader_._totalRemaining;
1832  blockHeader_._totalRemaining -= 64;
1833  // New format counts only chain header size
1834  blockHeader_._totalRemaining += getBlockChainHeaderSize();
1835  // Check how many Blocks needed and if we have enough free.
// (ceiling of _totalRemaining / getBlockDataSize(); no free-count check is
// made in this function)
1836  amps_uint32_t blocksNeeded = blockHeader_._totalRemaining
1837  / getBlockDataSize()
1838  + (blockHeader_._totalRemaining
1839  % getBlockDataSize()
1840  ? 1 : 0);
1841  // Last data block size, remove bytes saved in first block
1842  // then mod by block size.
1843  const amps_uint32_t blockSize = getBlockSize();
1844  // Old total remaining had all header included
1845  size_t endBlockSize = oldTotalRemaining % blockSize;
1846  if (!endBlockSize)
1847  {
1848  endBlockSize = blockSize;
1849  }
// Offset just past the last byte of old-layout data; set in the loop below
// when the last of the original blocks is reached.
1850  size_t endOfData = 0;
1851  // Hang on to CRC until first block is written
1852  amps_uint64_t crcVal = blockHeader_._crcVal;
1853  blockHeader_._crcVal = 0;
1854 
// Blocks of the chain in order, so the fill loop can walk back to first_
1855  std::stack<Block*> blocksUsed;
1856  for (amps_uint32_t i = 1; i < blocksNeeded; ++i)
1857  {
1858  blocksUsed.push(current);
1859  current->_sequence = blockHeader_._seq;
1860  if (i >= blockHeader_._blocksToWrite)
1861  {
// Past the original blocks: take the next block from the free list.
// NOTE(review): *pFreeList_ is dereferenced without a null check; this
// relies on the caller having ensured enough free blocks - confirm.
1862  if (i == blockHeader_._blocksToWrite)
1863  {
1864  endOfData = current->_offset + endBlockSize;
1865  }
1866  current->_nextInChain = *pFreeList_;
1867  --(*pFreeCount_);
1868  *pFreeList_ = (*pFreeList_)->_nextInList;
1869  }
1870  else
1871  {
// Still within the original blocks: the next block in the list is the next
// in the chain; its offset is the following block, or the block at offset
// blockSize when the end of the buffer is reached.
1872  current->_nextInChain = current->_nextInList;
1873  if (current->_nextInChain)
1874  {
1875  if (current->_offset + blockSize < pBuffer_->getSize())
1876  {
1877  current->_nextInChain->setOffset(current->_offset
1878  + blockSize);
1879  }
1880  else
1881  {
1882  current->_nextInChain->setOffset(blockSize);
1883  }
1884  }
1885  else
1886  {
1887  current->_nextInChain = blocks_[1].init(1, blockSize);
1888  }
// Remove the chosen block from the free list if it is on it
1889  if (current->_nextInChain == *pFreeList_)
1890  {
1891  *pFreeList_ = (*pFreeList_)->_nextInList;
1892  --(*pFreeCount_);
1893  }
1894  else
1895  {
1896  for (Block* free = *pFreeList_; free;
1897  free = free->_nextInList)
1898  {
1899  if (free->_nextInList == current->_nextInChain)
1900  {
1901  free->_nextInList = free->_nextInList->_nextInList;
1902  --(*pFreeCount_);
1903  break;
1904  }
1905  }
1906  }
1907  }
1908  current->_nextInList = 0;
1909  current = current->_nextInChain;
1910  }
1911  // Make sure we write the correct number of blocks to write
1912  blockHeader_._blocksToWrite = blocksNeeded;
1913  // Finish setting up current
1914  current->_nextInList = 0;
1915  current->_sequence = blockHeader_._seq;
1916  // Now shift data, starting at the last Block
1917  // The total shift is for number of Blocks beyond the first
1918  // times Block header size, since previous format only wrote
1919  // the header in the first Block and had contiguous data,
1920  // with only wrap from end to beginning of buffer possible.
1921 
1922  // First time through, this is bytes in last block. After,
1923  // it will be block data size.
1924  size_t dataBytes = blockHeader_._totalRemaining % getBlockDataSize();
1925  while (current != first_)
1926  {
// NOTE(review): 2048 is hard-coded here while the wrap copy below reads from
// offset getBlockSize(); presumably 2048 is the block size of old-format
// stores - confirm, or whether getBlockSize() was intended.
1927  size_t chunkBytesAvail = endOfData > location_
1928  ? endOfData - location_
1929  : endOfData - 2048;
1930  if (chunkBytesAvail < dataBytes)
1931  {
1932  // Original was wrapped from end to start of buffer
1933  // Need to copy what's left at start to end of Block,
1934  // then start working from the end.
1935  // This can ONLY occur during wrap because the first
1936  // Block doesn't get moved in this loop.
1937  pBuffer_->copyBytes(current->_offset
1938  + getBlockSize()
1939  - chunkBytesAvail,
1940  getBlockSize(),
1941  chunkBytesAvail);
1942  chunkBytesAvail = dataBytes - chunkBytesAvail;
1943  endOfData = pBuffer_->getSize() - chunkBytesAvail;
1944  pBuffer_->copyBytes(current->_offset + getBlockHeaderSize(),
1945  endOfData,
1946  chunkBytesAvail);
1947  }
1948  else
1949  {
1950  endOfData -= dataBytes;
1951  pBuffer_->copyBytes(current->_offset + getBlockHeaderSize(),
1952  endOfData,
1953  dataBytes);
1954  }
1955  // Set next in chain in block header
1956  blockHeader_._nextInChain = (current->_nextInChain
1957  ? current->_nextInChain->_offset
1958  : (amps_uint64_t)0);
1959  // Write the header for this block
1960  pBuffer_->setPosition(current->_offset);
1961  pBuffer_->putBytes((const char*)&blockHeader_, sizeof(BlockHeader));
// Step back to the previous block of the chain
1962  current = blocksUsed.top();
1963  blocksUsed.pop();
1964  dataBytes = getBlockDataSize();
1965  }
1966  // Move bytes for chain header expansion from 32 bytes
// NOTE(review): the embedded listing numbers skip 1969 and 1971 below, so
// the end of the first argument and the final argument (with the closing
// parenthesis) of this call are missing from this listing.
1967  pBuffer_->copyBytes(first_->_offset
1968  + getBlockHeaderSize()
1970  first_->_offset + getBlockHeaderSize() + 32,
1972  // Set the CRC to indicate first block and set nextInChain
1973  blockHeader_._crcVal = crcVal;
// NOTE(review): first_->_nextInChain is dereferenced without a null check;
// it is only set by the chain-building loop when blocksNeeded > 1 - confirm
// callers guarantee that.
1974  blockHeader_._nextInChain = first_->_nextInChain->_offset;
1975  // Need to reset location to after OLD header:
1976  // amps_uint32_t blocks, amps_uint32_t totalRemaining,
1977  // amps_uint64_t seq, amps_uint64_t crc
1978  pBuffer_->setPosition(location_ + (sizeof(amps_uint32_t) * 2)
1979  + (sizeof(amps_uint64_t) * 2) );
1980  // Read old chain header which uses same order, but not
1981  // as many bytes (everything is 32bit):
1982  // operation, commandIdLen, correlationIdLen,
1983  // expirationLen, sowKeyLen, topicLen, flag, ackTypes
1984  BlockChainHeader chainHeader;
1985  pBuffer_->copyBytes((char*)&chainHeader,
1986  sizeof(amps_uint32_t) * 8);
1987  // Rewrite the header and chain header for first Block.
1988  pBuffer_->setPosition(location_);
1989  pBuffer_->putBytes((const char*)&blockHeader_, sizeof(BlockHeader));
1990  pBuffer_->putBytes((const char*)&chainHeader, sizeof(BlockChainHeader));
1991  }
1992 
1993  void chooseCRC(bool isFile)
1994  {
1995  if (!isFile)
1996  {
1997  _crc = noOpCRC;
1998  return;
1999  }
2000 
2001 #ifndef AMPS_SSE_42
2002  _crc = AMPS::CRC<0>::crcNoSSE;
2003 #else
2004  if (AMPS::CRC<0>::isSSE42Enabled())
2005  {
2006  _crc = AMPS::CRC<0>::crc;
2007  }
2008  else
2009  {
2010  _crc = AMPS::CRC<0>::crcNoSSE;
2011  }
2012 #endif
2013  }
2014 
2015  static amps_uint64_t noOpCRC(const char*, size_t, amps_uint64_t)
2016  {
2017  return 0;
2018  }
2019 
2020  protected:
// Provides the Buffer plus the free and used Block lists that the recovery
// and replay code above operates on.
2021  mutable BlockStore _blockStore;
2022  private:
2023  // Block used to hold metadata, currently:
2024  // the last persisted
// sequence. recoverOldFormat() keeps _sequence on this Block in step with
// the 8-byte value stored at _offset + 8.
2025  Block* _metadataBlock;
2026  // Highest sequence that has been discarded
2027  amps_uint64_t _maxDiscarded;
2028  // Track the assigned sequence numbers
// (atomic when the compiler provides <atomic>, volatile otherwise)
2029 #if __cplusplus >= 201103L || _MSC_VER >= 1900
2030  std::atomic<amps_uint64_t> _lastSequence;
2031 #else
2032  volatile amps_uint64_t _lastSequence;
2033 #endif
2034  // Track how many messages are stored
2035  AMPS_ATOMIC_TYPE _stored;
2036 
2037  // Message used for doing replay
// (replayOnto() resets and refills it for each message it replays)
2038  Message _message;
2039 
// Signature shared by every CRC implementation: (bytes, length, running
// CRC value) -> updated CRC value.
2040  typedef amps_uint64_t (*CRCFunction)(const char*, size_t, amps_uint64_t);
2041 
2042  // Function used to calculate the CRC if one is used
// (set by chooseCRC(); noOpCRC when no CRC is used)
2043  CRCFunction _crc;
2044 
2045  };
2046 
2047 }
2048 
2049 #endif
2050 
virtual void putUint64(amps_uint64_t ui_)=0
Put an amps_uint64_t value into the buffer at the current position and advance past it...
Abstract base class for storing published messages for an HA publisher client.
Definition: ampsplusplus.hpp:1090
void getRawCorrelationId(const char **dataptr, size_t *sizeptr) const
Retrieves the value of the CorrelationId header of self as a Field that references the underlying buf...
Definition: Message.hpp:1304
Command::Type getCommandEnum() const
Decode self's "command" field and return one of the values from Command.
Definition: Message.hpp:1228
Constants
Default constant values for BlockPublishStore.
Definition: BlockPublishStore.hpp:79
void getRawCommandId(const char **dataptr, size_t *sizeptr) const
Retrieves the value of the CommandId header of self as a Field that references the underlying buffer ...
Definition: Message.hpp:1302
Abstract base class for replaying a publish message.
Definition: ampsplusplus.hpp:1062
virtual void flush(long timeout_)
Method to wait for the Store to discard everything that has been stored up to the point in time when ...
Definition: BlockPublishStore.hpp:656
void replay(StoreReplayer &replayer_)
Replay all messages in the Store onto the given StoreReplayer.
Definition: BlockPublishStore.hpp:561
void getRawFilter(const char **dataptr, size_t *sizeptr) const
Retrieves the value of the Filter header of self as a Field that references the underlying buffer man...
Definition: Message.hpp:1306
AMPSDLL amps_uint64_t amps_message_get_field_uint64(amps_handle message, FieldId field)
Gets the unsigned 64-bit int value of a header field in an AMPS message.
Message encapsulates a single message sent to or received from an AMPS server, and provides methods f...
Definition: Message.hpp:531
virtual void setPosition(size_t position_)=0
Set the buffer position to a location.
Field getOptions() const
Retrieves the value of the Options header of the Message as a new Field.
Definition: Message.hpp:1316
static amps_uint32_t getBlockHeaderSize()
Block header is number of blocks, total length, sequence number, crc, next in chain offset...
Definition: BlockPublishStore.hpp:153
const char * data() const
Returns the (non-null-terminated) data underlying this field.
Definition: Field.hpp:260
amps_uint32_t getBlockDataSize()
Return the size left in a block for data when it has a header in it.
Definition: BlockPublishStore.hpp:179
virtual size_t getSize() const =0
Get the current size of the Buffer in bytes.
virtual void putBytes(const char *data_, size_t dataLength_)=0
Put the given length of bytes in data into the buffer at the current position and advance past them...
virtual void execute(Message &message_)=0
Called by implementations of Store to replay a message from the store.
void getRawBookmark(const char **dataptr, size_t *sizeptr) const
Retrieves the value of the Bookmark header of self as a Field that references the underlying buffer m...
Definition: Message.hpp:1194
bool replaySingle(StoreReplayer &replayer_, amps_uint64_t index_)
Replay one message in the Store onto the given StoreReplayer.
Definition: BlockPublishStore.hpp:593
amps_uint64_t store(const Message &message_, bool assignSequence_)
Store a given message that will be delivered to AMPS.
Definition: BlockPublishStore.hpp:266
Used as a base class for other stores in the AMPS C++ client, this is an implementation that breaks a...
Definition: BlockStore.hpp:58
amps_uint64_t getLastPersisted(void)
Get the last persisted sequence number.
Definition: BlockPublishStore.hpp:711
Core type, function, and class declarations for the AMPS C++ client.
size_t unpersistedCount() const
Method to return the count of messages that are currently in the Store because they have not been discard...
Definition: BlockPublishStore.hpp:642
Provides AMPS::Buffer, an abstract base class used by the store implementations in the AMPS client...
size_t len() const
Returns the length of the data underlying this field.
Definition: Field.hpp:267
virtual void setResizeHandler(PublishStoreResizeHandler handler_, void *userData_)
Set a handler to be called if the Store needs to resize in order to keep storing messages.
Definition: ampsplusplus.hpp:1184
virtual void putUint32(amps_uint32_t i_)=0
Put an unsigned 32-bit int value into the buffer at the current position and advance past the end of ...
void getRawSowKeys(const char **dataptr, size_t *sizeptr) const
Retrieves the value of the SowKeys header of self as a Field that references the underlying buffer ma...
Definition: Message.hpp:1425
Used as a base class for other stores in the AMPS C++ client, this is an implementation of StoreImpl ...
Definition: BlockPublishStore.hpp:60
Abstract base class for implementing a buffer to be used by a StoreImpl for storage of publish messag...
Definition: Buffer.hpp:40
virtual ~BlockPublishStore()
Destructor that cleans up the buffer and other associated memory.
Definition: BlockPublishStore.hpp:241
void getRawExpiration(const char **dataptr, size_t *sizeptr) const
Retrieves the value of the Expiration header of self as a Field that references the underlying buffer...
Definition: Message.hpp:1305
static amps_uint32_t getBlockChainHeaderSize()
Block chain header is operation, command id length, correlation id length, expiration length...
Definition: BlockPublishStore.hpp:163
virtual amps_uint64_t getUint64()=0
Get an unsigned 64-bit int value at the current buffer position and advance past it.
void getRawTopic(const char **dataptr, size_t *sizeptr) const
Retrieves the value of the Topic header of self as a Field that references the underlying buffer mana...
Definition: Message.hpp:1449
Used as metadata for each block in a Buffer.
Definition: BlockStore.hpp:72
Field represents the value of a single field in a Message.
Definition: Field.hpp:86
virtual amps_uint32_t getUint32()=0
Get the unsigned 32-bit int value at the current buffer position and advance past it...
virtual amps_uint64_t store(const Message &message_)
Store a given message that will be delivered to AMPS.
Definition: BlockPublishStore.hpp:251
BlockPublishStore(Buffer *buffer_, amps_uint32_t blocksPerRealloc_=1000, bool isFile_=false, bool errorOnPublishGap_=false, amps_uint32_t blockSize_=DEFAULT_BLOCK_SIZE)
Create a BlockPublishStore using buffer_, that grows by blocksPerRealloc_ blocks when it must grow...
Definition: BlockPublishStore.hpp:199
amps_uint32_t getBlockSize()
Return the size left in a block for data when it has a header in it.
Definition: BlockPublishStore.hpp:171
void getRawSowKey(const char **dataptr, size_t *sizeptr) const
Retrieves the value of the SowKey header of self as a Field that references the underlying buffer man...
Definition: Message.hpp:1424
unsigned getAckTypeEnum() const
Decode self's "ack type" field and return the corresponding bitmask of values from AckType...
Definition: Message.hpp:1160
amps_uint64_t getLowestUnpersisted() const
Get the oldest unpersisted message sequence in the store.
Definition: BlockPublishStore.hpp:699
Definition: ampsplusplus.hpp:103
static amps_uint64_t getUnsetSequence()
Method to return the value used to represent no such sequence.
Definition: ampsplusplus.hpp:1160
virtual void copyBytes(char *buffer_, size_t numBytes_)=0
Copy the given number of bytes from this buffer to the given buffer.
virtual void discardUpTo(amps_uint64_t index_)
Remove all messages with an index up to and including index_.
Definition: BlockPublishStore.hpp:498
Provides AMPS::BlockStore, a class for storing Blocks of a fixed size into a Buffer implementation...
virtual ByteArray getBytes(size_t numBytes_)=0
Get the given number of bytes from the buffer.