AMPS C/C++ Client Class Reference
AMPS C/C++ Client Version 5.3.5.1
BlockPublishStore.hpp
Go to the documentation of this file.
1 //
3 // Copyright (c) 2010-2025 60East Technologies Inc., All Rights Reserved.
4 //
5 // This computer software is owned by 60East Technologies Inc. and is
6 // protected by U.S. copyright laws and other laws and by international
7 // treaties. This computer software is furnished by 60East Technologies
8 // Inc. pursuant to a written license agreement and may be used, copied,
9 // transmitted, and stored only in accordance with the terms of such
10 // license agreement and with the inclusion of the above copyright notice.
11 // This computer software or any other copies thereof may not be provided
12 // or otherwise made available to any other person.
13 //
14 // U.S. Government Restricted Rights. This computer software: (a) was
15 // developed at private expense and is in all respects the proprietary
16 // information of 60East Technologies Inc.; (b) was not developed with
17 // government funds; (c) is a trade secret of 60East Technologies Inc.
18 // for all purposes of the Freedom of Information Act; and (d) is a
19 // commercial item and thus, pursuant to Section 12.212 of the Federal
20 // Acquisition Regulations (FAR) and DFAR Supplement Section 227.7202,
21 // Government's use, duplication or disclosure of the computer software
22 // is subject to the restrictions set forth by 60East Technologies Inc..
23 //
25 
26 #ifndef _BLOCKPUBLISHSTORE_H_
27 #define _BLOCKPUBLISHSTORE_H_
28 #include <amps/ampsplusplus.hpp>
29 #include <amps/BlockStore.hpp>
30 #include <amps/Buffer.hpp>
31 #include <sstream>
32 #include <stack>
33 #include <string>
34 #include <map>
35 #include <amps/ampscrc.hpp>
36 #if __cplusplus >= 201103L || _MSC_VER >= 1900
37  #include <atomic>
38 #endif
39 
40 #ifdef _WIN32
41  #include <intrin.h>
42  #include <sys/timeb.h>
43 #else
44  #include <sys/time.h>
45 #endif
46 #include <iostream>
47 
52 
53 namespace AMPS
54 {
61  {
62  public:
63  typedef BlockStore::Block Block;
64  typedef Lock<BlockStore> BufferLock;
65  typedef Unlock<BlockStore> BufferUnlock;
66 
67  typedef enum
68  {
69  SOW_DELETE_DATA = 0x01,
70  SOW_DELETE_FILTER = 0x02,
71  SOW_DELETE_KEYS = 0x04,
72  SOW_DELETE_BOOKMARK = 0x08,
73  SOW_DELETE_BOOKMARK_CANCEL = 0x10,
74  SOW_DELETE_UNKNOWN = 0x80
75  } SowDeleteType;
76 
79  enum Constants : amps_uint32_t
80  {
81  DEFAULT_BLOCK_HEADER_SIZE = 32,
82  DEFAULT_BLOCK_CHAIN_HEADER_SIZE = 64,
83  DEFAULT_BLOCKS_PER_REALLOC = 1000,
84  DEFAULT_BLOCK_SIZE = 2048
85  };
86 
87  /**********************************************************************
88  * Storage format
89  *************************************************************************
90  * Field Description | Type | # Bytes
91  *************************************************************************
92  * HEADER as detailed below | | 32 TOTAL
93  * | |
94  * Total number of blocks used by the record | uint32_t | 4
95  * Total length of the saved record | uint32_t | 4
96  * HA Message sequence | uint64_t | 8
97  * CRC Value - only set in first Block | uint64_t | 8
98  * next in chain offset | uint64_t | 8
99  *************************************************************************
100  * CHAIN HEADER as detailed below | | 64 TOTAL
101  * | |
102  * operation | uint32_t | 4
103  * command id length | uint32_t | 4
104  * correltation id length | uint32_t | 4
105  * expiration length | uint32_t | 4
106  * sow key length | uint32_t | 4
107  * topic length | uint32_t | 4
108  * sow delete flag | int32_t | 4
109  * ack types | uint32_t | 4
110  * unused [8] | uint32_t | 4*8 = 32
111  *************************************************************************
112  * DATA SECTION - can be spread across multiple blocks
113  *
114  * command id | char[]
115  * correlation id | char[]
116  * expiration | char[]
117  * sow key | char[]
118  * topic | char[]
119  * data | char[]
120  *************************************************************************/
121 
122  struct BlockHeader
123  {
124  amps_uint32_t _blocksToWrite;
125  amps_uint32_t _totalRemaining;
126  amps_uint64_t _seq;
127  amps_uint64_t _crcVal;
128  amps_uint64_t _nextInChain;
129  };
130 
131  struct BlockChainHeader
132  {
133  amps_uint32_t _operation;
134  amps_uint32_t _commandIdLen;
135  amps_uint32_t _correlationIdLen;
136  amps_uint32_t _expirationLen;
137  amps_uint32_t _sowKeyLen;
138  amps_uint32_t _topicLen;
139  amps_int32_t _flag;
140  amps_uint32_t _ackTypes;
141  amps_uint32_t _unused[8];
142  BlockChainHeader() // -V730
143  : _operation(0), _commandIdLen(0), _correlationIdLen(0)
144  , _expirationLen(0), _sowKeyLen(0), _topicLen(0), _flag(-1)
145  , _ackTypes(0)
146  { ; }
147  };
148 
153  static inline amps_uint32_t getBlockHeaderSize()
154  {
155  return DEFAULT_BLOCK_HEADER_SIZE;
156  }
157 
163  static inline amps_uint32_t getBlockChainHeaderSize()
164  {
165  return DEFAULT_BLOCK_CHAIN_HEADER_SIZE;
166  }
167 
171  inline amps_uint32_t getBlockSize()
172  {
173  return _blockStore.getBlockSize();
174  }
175 
179  inline amps_uint32_t getBlockDataSize()
180  {
181  return _blockStore.getBlockSize() - getBlockHeaderSize();
182  }
183 
200  amps_uint32_t blocksPerRealloc_ = 1000,
201  bool isFile_ = false,
202  bool errorOnPublishGap_ = false,
203  amps_uint32_t blockSize_ = DEFAULT_BLOCK_SIZE)
204  : StoreImpl(errorOnPublishGap_)
205  , _blockStore(buffer_, blocksPerRealloc_,
206  DEFAULT_BLOCK_HEADER_SIZE,
207  (blockSize_ > DEFAULT_BLOCK_HEADER_SIZE * 2
208  ? blockSize_ : DEFAULT_BLOCK_HEADER_SIZE * 2))
209  , _metadataBlock(0)
210  , _maxDiscarded((amps_uint64_t)0), _lastSequence((amps_uint64_t)1)
211  , _stored(0)
212  {
213  _blockStore.setResizeHandler(&BlockPublishStore::canResize, (void*)this);
214  chooseCRC(isFile_);
215  if (!isFile_)
216  {
217  // This gets set in recover in file-based stores
218  BufferLock bufferGuard(_blockStore);
219  _blockStore.init();
220  _metadataBlock = _blockStore.get(1);
221  // Remove metadata block from used list
222  _blockStore.setUsedList(0);
223  _blockStore.setEndOfUsedList(0);
224  // Metadata block holds block size, block header size,
225  // last discarded sequence, client version
226  _metadataBlock->_sequence = (amps_uint64_t)0;
227  Buffer* pBuffer = _blockStore.getBuffer();
228  pBuffer->setPosition(_metadataBlock->_offset);
229  pBuffer->putUint32((amps_uint32_t)getBlockSize());
230  pBuffer->putUint32((amps_uint32_t)getBlockHeaderSize());
231  pBuffer->putUint64((amps_uint64_t)0);
232  // Metadata blocks puts client version in CRC position
233  pBuffer->putUint64((amps_uint64_t)VersionInfo::parseVersion(AMPS_CLIENT_VERSION));
234  // No next in chain
235  pBuffer->putUint64((amps_uint64_t)0);
236  }
237  }
238 
242  {
243  }
244 
251  virtual amps_uint64_t store(const Message& message_)
252  {
253  return store(message_, true);
254  }
255 
266  amps_uint64_t store(const Message& message_, bool assignSequence_)
267  {
268  const char* commandId, *correlationId, *expiration, *sowKey,
269  *topic, *data;
270  size_t dataLen = 0;
271  BlockHeader blockHeader;
272  BlockChainHeader chainHeader;
273  message_.getRawCommandId(&commandId, &dataLen);
274  chainHeader._commandIdLen = (amps_uint32_t)dataLen;
275  message_.getRawCorrelationId(&correlationId, &dataLen);
276  chainHeader._correlationIdLen = (amps_uint32_t)dataLen;
277  message_.getRawExpiration(&expiration, &dataLen);
278  chainHeader._expirationLen = (amps_uint32_t)dataLen;
279  message_.getRawSowKey(&sowKey, &dataLen);
280  chainHeader._sowKeyLen = (amps_uint32_t)dataLen;
281  message_.getRawTopic(&topic, &dataLen);
282  chainHeader._topicLen = (amps_uint32_t)dataLen;
283  message_.getRawData(&data, &dataLen);
284  chainHeader._flag = -1;
285  Message::Command::Type operation = message_.getCommandEnum();
286  chainHeader._operation = (amps_uint32_t)operation;
287  if (operation == Message::Command::SOWDelete)
288  {
289  if (dataLen > 0)
290  {
291  chainHeader._flag = SOW_DELETE_DATA;
292  }
293  else
294  {
295  message_.getRawFilter(&data, &dataLen);
296  if (dataLen > 0)
297  {
298  chainHeader._flag = SOW_DELETE_FILTER;
299  }
300  else
301  {
302  message_.getRawSowKeys(&data, &dataLen);
303  if (dataLen > 0)
304  {
305  chainHeader._flag = SOW_DELETE_KEYS;
306  }
307  else
308  {
309  message_.getRawBookmark(&data, &dataLen);
310  chainHeader._flag = SOW_DELETE_BOOKMARK;
311  // Check options for cancel
312  Message::Field options = message_.getOptions();
313  size_t remaining = options.len();
314  const void* next = NULL;
315  const void* start = (const void*)(options.data());
316  // Not necessarily null-terminated so no strstr
317  while (remaining >= 6 &&
318  (next = memchr(start, (int)'c', remaining)) != NULL)
319  {
320  remaining = (size_t)next - (size_t)start;
321  if (remaining >= 6 && strncmp((const char*)start,
322  "cancel", 6) == 0)
323  {
324  chainHeader._flag = SOW_DELETE_BOOKMARK_CANCEL;
325  break;
326  }
327  }
328  }
329  }
330  }
331  }
332  blockHeader._totalRemaining = (
333  (chainHeader._operation == Message::Command::Unknown)
334  ? 0
336  + chainHeader._commandIdLen
337  + chainHeader._correlationIdLen
338  + chainHeader._expirationLen
339  + chainHeader._sowKeyLen
340  + chainHeader._topicLen
341  + (amps_uint32_t)dataLen));
342  size_t lastBlockLength = ((operation == Message::Command::Unknown) ? 0 :
343  (blockHeader._totalRemaining % getBlockDataSize()));
344  blockHeader._blocksToWrite = ((operation == Message::Command::Unknown)
345  ? 1
346  : ((amps_uint32_t)(blockHeader._totalRemaining
347  / getBlockDataSize())
348  + ((lastBlockLength > 0) ? 1 : 0)));
349  blockHeader._crcVal = (amps_uint64_t)0ULL;
350  blockHeader._crcVal = _crc(commandId,
351  chainHeader._commandIdLen,
352  blockHeader._crcVal);
353  blockHeader._crcVal = _crc(correlationId,
354  chainHeader._correlationIdLen,
355  blockHeader._crcVal);
356  blockHeader._crcVal = _crc(expiration,
357  chainHeader._expirationLen,
358  blockHeader._crcVal);
359  blockHeader._crcVal = _crc(sowKey,
360  chainHeader._sowKeyLen,
361  blockHeader._crcVal);
362  blockHeader._crcVal = _crc(topic,
363  chainHeader._topicLen,
364  blockHeader._crcVal);
365  blockHeader._crcVal = _crc(data, dataLen, blockHeader._crcVal);
366 
367  // Reserve slots for storage, growing if necessary
368  BufferLock bufferGuard(_blockStore);
369  Block* first = _blockStore.get(blockHeader._blocksToWrite);
370  if (assignSequence_)
371  {
372  if (_lastSequence <= 2)
373  {
374  _getLastPersisted();
375  }
376  blockHeader._seq = ++_lastSequence;
377  }
378  else
379  {
380  blockHeader._seq = amps_message_get_field_uint64(
381  message_.getMessage(),
382  AMPS_Sequence);
383  if (!_maxDiscarded)
384  {
385  _maxDiscarded = blockHeader._seq - 1;
386  }
387  if (blockHeader._seq >= _lastSequence)
388  {
389  _lastSequence = blockHeader._seq;
390  }
391  }
392 
393  try
394  {
395  size_t topicWritten = 0UL;
396  size_t dataWritten = 0UL;
397  size_t commandWritten = 0UL;
398  size_t correlationWritten = 0UL;
399  size_t expirationWritten = 0UL;
400  size_t sowKeyWritten = 0UL;
401  Buffer* pBuffer = _blockStore.getBuffer();
402  for (Block* next = first; next; next = next->_nextInChain)
403  {
404  next->_sequence = blockHeader._seq;
405  if (next->_nextInChain)
406  {
407  blockHeader._nextInChain = next->_nextInChain->_offset;
408  }
409  else
410  {
411  blockHeader._nextInChain = (amps_uint64_t)0;
412  }
413  // Set buffer to start of Block and write the header
414  pBuffer->setPosition(next->_offset);
415  pBuffer->putBytes((const char*)&blockHeader, sizeof(BlockHeader));
416  // Clear crcVal, as it's only written in the first Block
417  blockHeader._crcVal = (amps_uint64_t)0;
418  size_t bytesRemaining = getBlockDataSize();
419  if (next == first)
420  {
421  // Write Block chain header
422  chainHeader._ackTypes = (amps_uint32_t)message_.getAckTypeEnum();
423  pBuffer->putBytes((const char*)&chainHeader,
424  sizeof(BlockChainHeader));
425  pBuffer->setPosition(next->_offset + getBlockHeaderSize() + getBlockChainHeaderSize());
426  bytesRemaining -= getBlockChainHeaderSize();
427  }
428  else
429  {
430  pBuffer->setPosition(next->_offset + getBlockHeaderSize());
431  }
432 
433  if (commandWritten < chainHeader._commandIdLen)
434  {
435  size_t commandWrite = (chainHeader._commandIdLen - commandWritten < bytesRemaining) ? chainHeader._commandIdLen - commandWritten : bytesRemaining;
436  pBuffer->putBytes(commandId + commandWritten,
437  commandWrite);
438  bytesRemaining -= commandWrite;
439  commandWritten += commandWrite;
440  }
441  if (correlationWritten < chainHeader._correlationIdLen)
442  {
443  size_t correlationWrite = (chainHeader._correlationIdLen - correlationWritten < bytesRemaining) ? chainHeader._correlationIdLen - correlationWritten : bytesRemaining;
444  pBuffer->putBytes(correlationId + correlationWritten,
445  correlationWrite);
446  bytesRemaining -= correlationWrite;
447  correlationWritten += correlationWrite;
448  }
449  if (bytesRemaining > 0 && expirationWritten < chainHeader._expirationLen)
450  {
451  size_t expWrite = (chainHeader._expirationLen - expirationWritten < bytesRemaining) ? chainHeader._expirationLen - expirationWritten : bytesRemaining;
452  pBuffer->putBytes(expiration + expirationWritten, expWrite);
453  bytesRemaining -= expWrite;
454  expirationWritten += expWrite;
455  }
456  if (bytesRemaining > 0 && sowKeyWritten < chainHeader._sowKeyLen)
457  {
458  size_t sowKeyWrite = (chainHeader._sowKeyLen - sowKeyWritten < bytesRemaining) ? chainHeader._sowKeyLen - sowKeyWritten : bytesRemaining;
459  pBuffer->putBytes(sowKey + sowKeyWritten, sowKeyWrite);
460  bytesRemaining -= sowKeyWrite;
461  sowKeyWritten += sowKeyWrite;
462  }
463  if (bytesRemaining > 0 && topicWritten < chainHeader._topicLen)
464  {
465  size_t topicWrite = (chainHeader._topicLen - topicWritten
466  < bytesRemaining)
467  ? chainHeader._topicLen - topicWritten
468  : bytesRemaining;
469  pBuffer->putBytes(topic + topicWritten, topicWrite);
470  bytesRemaining -= topicWrite;
471  topicWritten += topicWrite;
472  }
473  if (bytesRemaining > 0 && dataWritten < dataLen)
474  {
475  size_t dataWrite = (dataLen - dataWritten < bytesRemaining) ?
476  dataLen - dataWritten : bytesRemaining;
477  pBuffer->putBytes(data + dataWritten, dataWrite);
478  bytesRemaining -= dataWrite;
479  dataWritten += dataWrite;
480  }
481  }
482  }
483  catch (const AMPSException&)
484  {
485  _blockStore.put(first);
486  throw;
487  }
488  AMPS_FETCH_ADD(&_stored, 1);
489  return blockHeader._seq;
490  }
491 
498  virtual void discardUpTo(amps_uint64_t index_)
499  {
500  // Get the lock
501  BufferLock bufferGuard(_blockStore);
502  Buffer* pBuffer = _blockStore.getBuffer();
503  // Don't use _getLastPersisted() here, don't want to set it
504  // to something other than index_ if it's not already set
505  amps_uint64_t lastPersisted = _metadataBlock->_sequence;
506  // Make sure it's a real index and we have messages to discard
507  if (index_ == (amps_uint64_t)0 || !_blockStore.front() || index_ <= _maxDiscarded)
508  {
509  // During logon it's very possible we don't have a last persisted
510  // but that the Client is calling discardUpTo with the ack value.
511  if (lastPersisted < index_)
512  {
513  pBuffer->setPosition(_metadataBlock->_offset + 8);
514  pBuffer->putUint64(index_);
515  _metadataBlock->_sequence = index_;
516  if (_maxDiscarded < index_)
517  {
518  _maxDiscarded = index_;
519  }
520  if (_lastSequence <= index_)
521  {
522  _lastSequence = index_;
523  }
524  }
525  else if (!index_) // Fresh logon, no sequence history
526  {
527  _getLastPersisted();
528  }
529  else if (getErrorOnPublishGap() && index_ < lastPersisted) //Message gap
530  {
531  std::ostringstream os;
532  os << "Server last saw " << index_ << " from Client but Store "
533  << "has already discarded up to " << lastPersisted
534  << " which would leave a gap of unreceived messages.";
535  throw PublishStoreGapException(os.str());
536  }
537  _blockStore.signalAll();
538  return;
539  }
540 
541  _maxDiscarded = index_;
542  AMPS_FETCH_SUB(&_stored, _blockStore.put(index_));
543  _blockStore.signalAll();
544  if (lastPersisted >= index_)
545  {
546  return;
547  }
548  pBuffer->setPosition(_metadataBlock->_offset + 8);
549  pBuffer->putUint64(index_);
550  _metadataBlock->_sequence = index_;
551  if (_lastSequence < index_)
552  {
553  _lastSequence = index_;
554  }
555  }
556 
561  void replay(StoreReplayer& replayer_)
562  {
563  // Get the lock
564  BufferLock bufferGuard(_blockStore);
565  // If we don't have anything yet, return
566  if (!_blockStore.front())
567  {
568  return;
569  }
570  Block* next = _blockStore.front();
571  try
572  {
573  for (Block* block = _blockStore.front(); block; block = next)
574  {
575  // Replay the message
576  replayOnto(block, replayer_);
577  next = block->_nextInList;
578  }
579  }
580  catch (const StoreException&)
581  {
582  _blockStore.putAll(next);
583  throw;
584  }
585  }
586 
593  bool replaySingle(StoreReplayer& replayer_, amps_uint64_t index_)
594  {
595  BufferLock bufferGuard(_blockStore);
596  // If we don't have anything yet, return
597  if (!_blockStore.front())
598  {
599  return false;
600  }
601  // Get the end point
602  amps_uint64_t lastIdx = _blockStore.back()->_sequence;
603  // Get the start point
604  amps_uint64_t leastIdx = _blockStore.front()->_sequence;
605  if (index_ >= leastIdx && index_ <= lastIdx)
606  {
607  Block* block = _blockStore.front();
608  while (block && block->_sequence != index_)
609  {
610  block = block->_nextInList;
611  }
612  if (!block)
613  {
614  return false;
615  }
616  // If total bytes is 0, it's a queue ack and gets skipped.
617  Buffer* pBuffer = _blockStore.getBuffer();
618  pBuffer->setPosition(block->_offset +
619  sizeof(amps_uint32_t));
620  if (pBuffer->getUint32() == 0)
621  {
622  return false;
623  }
624  replayOnto(block, replayer_);
625  return true;
626  }
627  else // Get Store and Client back in sync
628  {
629  _message.reset();
630  leastIdx -= 1;
631  _message.setSequence(leastIdx);
632  replayer_.execute(_message);
633  return false;
634  }
635  }
636 
642  size_t unpersistedCount() const
643  {
644  size_t count = (size_t)_stored;
645  return count;
646  }
647 
656  virtual void flush(long timeout_)
657  {
658  BufferLock bufferGuard(_blockStore);
659  amps_uint64_t waitFor = _getHighestUnpersisted();
660  // Check that we aren't already empty
661  if (waitFor == getUnsetSequence())
662  {
663  return;
664  }
665  if (timeout_ > 0)
666  {
667  bool timedOut = false;
668  AMPS_START_TIMER(timeout_);
669  // While timeout hasn't expired and we haven't had everything acked
670  while (!timedOut && _stored != 0
671  && waitFor >= _getLowestUnpersisted())
672  {
673  if (!_blockStore.wait(timeout_))
674  {
675  // May have woken up early, check real time
676  AMPS_RESET_TIMER(timedOut, timeout_);
677  }
678  }
679  // If we timed out and still haven't caught up with the acks
680  if (timedOut && _stored != 0
681  && waitFor >= _getLowestUnpersisted())
682  {
683  throw TimedOutException("Timed out waiting to flush publish store.");
684  }
685  }
686  else
687  {
688  while (_stored != 0 && waitFor >= _getLowestUnpersisted())
689  {
690  // Still wake up every 1s so python can interrupt
691  _blockStore.wait(1000);
692  // Don't hold lock if possibly grabbing GIL
693  BufferUnlock unlck(_blockStore);
694  amps_invoke_waiting_function();
695  }
696  }
697  }
698 
699  amps_uint64_t getLowestUnpersisted() const
700  {
701  BufferLock bufferGuard(_blockStore);
702  return _getLowestUnpersisted();
703  }
704 
705  amps_uint64_t getHighestUnpersisted() const
706  {
707  BufferLock bufferGuard(_blockStore);
708  return _getHighestUnpersisted();
709  }
710 
711  amps_uint64_t getLastPersisted(void)
712  {
713  BufferLock bufferGuard(_blockStore);
714  return _getLastPersisted();
715  }
716 
717  protected:
718  static bool canResize(size_t requestedSize_, void* vpThis_)
719  {
720  BlockPublishStore* me = (BlockPublishStore*)vpThis_;
721  return me->callResizeHandler(requestedSize_);
722  }
723 
724  amps_uint64_t _getLowestUnpersisted() const
725  {
726  // Assume the lock is held
727  // If we don't have anything, return MAX
728  if (!_blockStore.front())
729  {
730  return getUnsetSequence();
731  }
732  return _blockStore.front()->_sequence;
733  }
734 
735  amps_uint64_t _getHighestUnpersisted() const
736  {
737  // Assume the lock is held
738  // If we don't have anything, return MAX
739  if (!_blockStore.back())
740  {
741  return getUnsetSequence();
742  }
743  return _blockStore.back()->_sequence;
744  }
745 
746  amps_uint64_t _getLastPersisted(void)
747  {
748  // Assume the lock is held
749  amps_uint64_t lastPersisted = (amps_uint64_t)0;
750  Buffer* pBuffer = _blockStore.getBuffer();
751  pBuffer->setPosition(_metadataBlock->_offset + 8);
752  lastPersisted = pBuffer->getUint64();
753  if (lastPersisted)
754  {
755  if (_lastSequence < lastPersisted)
756  {
757  _lastSequence = lastPersisted;
758  }
759  return lastPersisted;
760  }
761  if (_maxDiscarded)
762  {
763  lastPersisted = _maxDiscarded;
764  }
765  else
766  {
767 #ifdef _WIN32
768  struct _timeb t;
769  _ftime_s(&t);
770  lastPersisted = (t.time * 1000 + t.millitm) * (amps_uint64_t)1000000;
771 #else // not _WIN32
772  struct timeval tv;
773  gettimeofday(&tv, NULL);
774  lastPersisted = (amps_uint64_t)((tv.tv_sec * 1000) + (tv.tv_usec / 1000))
775  * (amps_uint64_t)1000000;
776 #endif
777  }
778  if (_lastSequence > 2)
779  {
780  amps_uint64_t low = _getLowestUnpersisted();
781  amps_uint64_t high = _getHighestUnpersisted();
782  if (low != getUnsetSequence())
783  {
784  lastPersisted = low - 1;
785  }
786  if (high != getUnsetSequence() && _lastSequence <= high)
787  {
788  _lastSequence = high;
789  }
790  if (_lastSequence < lastPersisted)
791  {
792  lastPersisted = _lastSequence - 1;
793  }
794  }
795  else
796  {
797  _lastSequence = lastPersisted;
798  }
799  pBuffer->setPosition(_metadataBlock->_offset
800  + sizeof(amps_uint32_t) // blocks used
801  + sizeof(amps_uint32_t)); // record length
802  pBuffer->putUint64(lastPersisted);
803  _metadataBlock->_sequence = lastPersisted;
804  return lastPersisted;
805  }
806 
807  void recover(void)
808  {
809  BufferLock bufferGuard(_blockStore);
810  // Make sure the size isn't 0 and is a multiple of block size
811  Buffer* pBuffer = _blockStore.getBuffer();
812  size_t size = pBuffer->getSize();
813  amps_uint32_t blockSize = getBlockSize();
814  if (size == 0)
815  {
816  _blockStore.init();
817  _metadataBlock = _blockStore.get(1);
818  _metadataBlock->_sequence = (amps_uint64_t)0;
819  pBuffer->setPosition(_metadataBlock->_offset);
820  // Metadata block holds block size, block header size,
821  // last discarded sequence, client version
822  pBuffer->putUint32(blockSize);
823  pBuffer->putUint32((amps_uint32_t)getBlockHeaderSize());
824  pBuffer->putUint64((amps_uint64_t)0);
825  // Metadata blocks puts client version in CRC position
826  pBuffer->putUint64((amps_uint64_t)VersionInfo::parseVersion(AMPS_CLIENT_VERSION));
827  // No next in chain
828  pBuffer->putUint64((amps_uint64_t)0);
829  return;
830  }
831  size_t numBlocks = size / blockSize;
832  if (size % blockSize > 0)
833  {
834  // We shouldn't ever be in here, since it requires starting with a
835  // file that is not an even multiple of block size and we always
836  // fix the size.
837  numBlocks = size / blockSize;
838  ++numBlocks;
839  amps_uint32_t blockCount = 0;
840  // We allocate all the Blocks at once below so delete allocated Block[]
841  delete[] _blockStore.resizeBuffer(numBlocks * blockSize, &blockCount);
842  // Resize can fail if resizeHandler is set and refuses the request
843  // Since this is recovery, we need to simply fail in that case
844  if (size > pBuffer->getSize() || numBlocks != (size_t)blockCount)
845  {
846  throw StoreException("Publish Store could not resize correctly during recovery, possibly due to resizeHandler refusing the request.");
847  }
848  size = pBuffer->getSize();
849  }
850 
851  amps_uint64_t maxIdx = 0;
852  amps_uint64_t minIdx = 0;
853  size_t location = 0;
854  BlockHeader blockHeader;
855  // The blocks we create here all get their offset set in below loop
856  Block* blocks = new Block[numBlocks];
857  blocks[numBlocks - 1]._nextInList = 0;
858  size_t blockNum = 0;
859  _blockStore.addBlocks(blocks);
860  _metadataBlock = blocks; // The first Block is metadata
861  _metadataBlock->_nextInList = 0;
862  pBuffer->setPosition(0);
863  pBuffer->copyBytes((char*)&blockHeader, sizeof(BlockHeader));
864  /* Metadata Block header fields
865  * amps_uint32_t _blocksToWrite = BlockSize
866  * amps_uint32_t _totalRemaining = BlockHeaderSize
867  * amps_uint64_t _seq = last persisted sequence number
868  * amps_uint64_t _crcVal = unused
869  * amps_uint64_t _nextInChain = unused
870  */
871  if (blockHeader._blocksToWrite == 1) // Old store format?
872  {
873  /* Old format metadata block header fields
874  * amps_uint32_t _blocksToWrite = 1
875  * amps_uint32_t _totalRemaining = client version
876  * amps_uint64_t _seq = last persisted sequence number
877  * amps_uint64_t _crcVal = unused
878  * amps_uint64_t _nextInChain = unused
879  */
880  // Readable old format starts with version 5.0.0.0
881  if (blockHeader._totalRemaining >= 5000000)
882  {
883  // All recovery needs to be based on old format
884  // so go do that instead.
885  recoverOldFormat(blocks);
886  return;
887  }
888  // Unreadable format, fail
889  throw StoreException("Unrecognized format for Store. Can't recover.");
890  }
891  if (blockHeader._blocksToWrite == 0)
892  {
893  pBuffer->setPosition(0);
894  pBuffer->putUint32(blockSize);
895  }
896  else
897  {
898  blockSize = blockHeader._blocksToWrite;
899  _blockStore.setBlockSize(blockSize);
900  }
901  if (blockHeader._totalRemaining == 0)
902  {
903  pBuffer->setPosition(sizeof(amps_uint32_t));
904  pBuffer->putUint32((amps_uint32_t)getBlockHeaderSize());
905  }
906  else
907  {
908  _blockStore.setBlockHeaderSize(blockHeader._totalRemaining);
909  }
910  _metadataBlock->_sequence = blockHeader._seq;
911  if (_metadataBlock->_sequence
912  && _metadataBlock->_sequence < (amps_uint64_t)1000000)
913  {
914  pBuffer->setPosition(_metadataBlock->_offset
915  + sizeof(amps_uint32_t) // BlockSize
916  + sizeof(amps_uint32_t)); // BlockHeaderSize
917  pBuffer->putUint64((amps_uint64_t)0);
918  _metadataBlock->_sequence = 0;
919  }
920  else
921  {
922  // Set _maxDiscarded and _lastSequence
923  _maxDiscarded = _metadataBlock->_sequence;
924  _lastSequence = _maxDiscarded;
925  }
926  // This would be where to check the client version string
927  // No checks currently
928  location += blockSize;
929  amps_uint32_t freeCount = 0;
930  Block* firstFree = NULL;
931  Block* endOfFreeList = NULL;
932  // Used to create used list in order after recovery
933  typedef std::map<amps_uint64_t, Block*> RecoverMap;
934  RecoverMap recoveredBlocks;
935  while (location < size)
936  {
937  // Get index and check if non-zero
938  pBuffer->setPosition(location);
939  pBuffer->copyBytes((char*)&blockHeader, sizeof(BlockHeader));
940  if ((blockHeader._seq > 0 && blockHeader._totalRemaining < size) &&
941  (!blockHeader._crcVal || recoveredBlocks.count(blockHeader._seq)))
942  {
943  // Block is part of a chain
944  location += blockSize;
945  continue;
946  }
947  Block* block = blocks[++blockNum].setOffset(location);
948  bool recovered = false;
949  if (blockHeader._seq > 0 && blockHeader._totalRemaining < size)
950  {
951  blockHeader._totalRemaining -= (amps_uint32_t)getBlockChainHeaderSize();
952  block->_sequence = blockHeader._seq;
953  // Track min and max
954  if (maxIdx < blockHeader._seq)
955  {
956  maxIdx = blockHeader._seq;
957  }
958  if (minIdx > blockHeader._seq)
959  {
960  minIdx = blockHeader._seq;
961  }
962  // Save it in recovered blocks
963  recoveredBlocks[blockHeader._seq] = block;
964  // Set up the chain
965  while (blockHeader._nextInChain != (amps_uint64_t)0)
966  {
967  Block* chain = blocks[++blockNum]
968  .setOffset((size_t)blockHeader._nextInChain);
969  chain->_nextInList = 0;
970  pBuffer->setPosition((size_t)blockHeader._nextInChain
971  + sizeof(amps_uint32_t) // blocks used
972  + sizeof(amps_uint32_t) // record length
973  + sizeof(amps_uint64_t) // seq
974  + sizeof(amps_uint64_t)); // crc
975  blockHeader._nextInChain = pBuffer->getUint64();
976  block->_nextInChain = chain;
977  block = chain;
978  block->_sequence = blockHeader._seq;
979  }
980  recovered = true;
981  }
982  if (!recovered)
983  {
984  // Put this Block on the free list
985  if (endOfFreeList)
986  {
987  endOfFreeList->_nextInList = block;
988  }
989  else
990  {
991  firstFree = block;
992  }
993  endOfFreeList = block;
994  ++freeCount;
995  }
996  location += blockSize;
997  }
998  if (endOfFreeList)
999  {
1000  endOfFreeList->_nextInList = 0;
1001  }
1002  _blockStore.setFreeList(firstFree, freeCount);
1003  if (maxIdx > _lastSequence)
1004  {
1005  _lastSequence = maxIdx;
1006  }
1007  if (minIdx > _maxDiscarded + 1)
1008  {
1009  _maxDiscarded = minIdx - 1;
1010  }
1011  if (_maxDiscarded > _metadataBlock->_sequence)
1012  {
1013  _metadataBlock->_sequence = _maxDiscarded;
1014  pBuffer->setPosition(_metadataBlock->_offset + 8);
1015  pBuffer->putUint64(_maxDiscarded);
1016  }
1017  Block* end = NULL;
1018  AMPS_FETCH_ADD(&_stored, (long)(recoveredBlocks.size()));
1019  for (RecoverMap::iterator i = recoveredBlocks.begin();
1020  i != recoveredBlocks.end(); ++i)
1021  {
1022  if (end)
1023  {
1024  end->_nextInList = i->second;
1025  }
1026  else
1027  {
1028  _blockStore.setUsedList(i->second);
1029  }
1030  end = i->second;
1031  }
1032  if (end)
1033  {
1034  end->_nextInList = 0;
1035  }
1036  _blockStore.setEndOfUsedList(end);
1037  }
1038 
1039  private:
1040  // Lock should already be held
1041  void replayOnto(Block* block_, StoreReplayer& replayer_)
1042  {
1043  // Read the header
1044  size_t start = block_->_offset;
1045  size_t position = start;
1046  Buffer* pBuffer = _blockStore.getBuffer();
1047  pBuffer->setPosition(position);
1048  BlockHeader blockHeader;
1049  pBuffer->copyBytes((char*)&blockHeader, sizeof(BlockHeader));
1050  if (blockHeader._totalRemaining == 0)
1051  {
1052  // Queue acking sow_delete
1053  return;
1054  }
1055  position += getBlockHeaderSize();
1056  BlockChainHeader blockChainHeader;
1057  pBuffer->copyBytes((char*)&blockChainHeader, sizeof(blockChainHeader));
1058  if (blockChainHeader._operation == Message::Command::Unknown)
1059  {
1060  // Queue acking sow_delete
1061  return;
1062  }
1063  blockChainHeader._ackTypes |= Message::AckType::Persisted;
1064  position += getBlockChainHeaderSize();
1065  blockHeader._totalRemaining -= (amps_uint32_t)getBlockChainHeaderSize();
1066  pBuffer->setPosition(position);
1067 
1068  if (blockHeader._totalRemaining
1069  < blockChainHeader._commandIdLen
1070  + blockChainHeader._correlationIdLen
1071  + blockChainHeader._expirationLen
1072  + blockChainHeader._sowKeyLen
1073  + blockChainHeader._topicLen)
1074  {
1075  std::ostringstream os;
1076  os << "Corrupted message found with invalid lengths. "
1077  << "Attempting to replay " << block_->_sequence
1078  << ". Block sequence " << blockHeader._seq
1079  << ", topic length " << blockChainHeader._topicLen
1080  << ", data length " << blockHeader._totalRemaining
1081  << ", command ID length " << blockChainHeader._commandIdLen
1082  << ", correlation ID length " << blockChainHeader._correlationIdLen
1083  << ", expiration length " << blockChainHeader._expirationLen
1084  << ", sow key length " << blockChainHeader._sowKeyLen
1085  << ", start " << start
1086  << ", position " << position
1087  << ", buffer size " << pBuffer->getSize();
1088  throw StoreException(os.str());
1089  }
1090 
1091  // Start prepping the message
1092  _message.reset();
1093  _message.setCommandEnum((Message::Command::Type)blockChainHeader._operation);
1094  _message.setAckTypeEnum((unsigned)blockChainHeader._ackTypes
1095  | Message::AckType::Persisted);
1096  _message.setSequence(blockHeader._seq);
1097  // Read the data and calculate the CRC
1098  Block* current = block_;
1099  size_t blockBytesRemaining = getBlockDataSize() - getBlockChainHeaderSize();
1100  amps_uint64_t crcCalc = (amps_uint64_t)0ULL;
1101  // Use tmpBuffers for any fields split across Block boundaries
1102  char** tmpBuffers = (blockHeader._blocksToWrite > 1) ? new char* [blockHeader._blocksToWrite - 1] : 0;
1103  size_t blockNum = 0;
1104  if (blockChainHeader._commandIdLen > 0)
1105  {
1106  if (blockChainHeader._commandIdLen <= blockBytesRemaining)
1107  {
1108  _message.assignCommandId(pBuffer->getBytes(blockChainHeader._commandIdLen)._data,
1109  blockChainHeader._commandIdLen);
1110  blockBytesRemaining -= blockChainHeader._commandIdLen;
1111  }
1112  else
1113  {
1114  tmpBuffers[blockNum] = new char[blockChainHeader._commandIdLen]; // -V522
1115  size_t totalLeft = blockChainHeader._commandIdLen;
1116  size_t totalRead = 0;
1117  size_t readLen = 0;
1118  while (totalLeft)
1119  {
1120  readLen = blockBytesRemaining < totalLeft ?
1121  blockBytesRemaining : totalLeft;
1122  pBuffer->copyBytes(tmpBuffers[blockNum] + totalRead, readLen);
1123  if (!(totalLeft -= readLen))
1124  {
1125  break;
1126  }
1127  if (!(current = current->_nextInChain))
1128  {
1129  break;
1130  }
1131  totalRead += readLen;
1132  blockBytesRemaining = getBlockDataSize();
1133  position = current->_offset + getBlockHeaderSize();
1134  pBuffer->setPosition(position);
1135  }
1136  blockBytesRemaining -= readLen;
1137  _message.assignCommandId(tmpBuffers[blockNum++], blockChainHeader._commandIdLen);
1138  }
1139  blockHeader._totalRemaining -= blockChainHeader._commandIdLen;
1140  crcCalc = _crc(_message.getCommandId().data(),
1141  blockChainHeader._commandIdLen, crcCalc);
1142  }
1143  if (blockChainHeader._correlationIdLen > 0)
1144  {
1145  if (blockChainHeader._correlationIdLen <= blockBytesRemaining)
1146  {
1147  _message.assignCorrelationId(
1148  pBuffer->getBytes(blockChainHeader._correlationIdLen)._data,
1149  blockChainHeader._correlationIdLen);
1150  blockBytesRemaining -= blockChainHeader._correlationIdLen;
1151  }
1152  else
1153  {
1154  tmpBuffers[blockNum] = new char[blockChainHeader._correlationIdLen]; // -V522
1155  size_t totalLeft = blockChainHeader._correlationIdLen;
1156  size_t totalRead = 0;
1157  size_t readLen = 0;
1158  while (totalLeft)
1159  {
1160  readLen = blockBytesRemaining < totalLeft ?
1161  blockBytesRemaining : totalLeft;
1162  pBuffer->copyBytes(tmpBuffers[blockNum] + totalRead, readLen);
1163  if (!(totalLeft -= readLen))
1164  {
1165  break;
1166  }
1167  if (!(current = current->_nextInChain))
1168  {
1169  break; // -V522
1170  }
1171  totalRead += readLen;
1172  blockBytesRemaining = getBlockDataSize();
1173  position = current->_offset + getBlockHeaderSize();
1174  pBuffer->setPosition(position);
1175  }
1176  blockBytesRemaining -= readLen;
1177  _message.assignCorrelationId(tmpBuffers[blockNum++], blockChainHeader._correlationIdLen);
1178  }
1179  blockHeader._totalRemaining -= blockChainHeader._correlationIdLen;
1180  crcCalc = _crc(_message.getCorrelationId().data(),
1181  blockChainHeader._correlationIdLen, crcCalc);
1182  }
1183  if (blockChainHeader._expirationLen > 0)
1184  {
1185  if (blockChainHeader._expirationLen <= blockBytesRemaining)
1186  {
1187  _message.assignExpiration(
1188  pBuffer->getBytes(blockChainHeader._expirationLen)._data,
1189  blockChainHeader._expirationLen);
1190  blockBytesRemaining -= blockChainHeader._expirationLen;
1191  }
1192  else
1193  {
1194  tmpBuffers[blockNum] = new char[blockChainHeader._expirationLen]; // -V522
1195  size_t totalLeft = blockChainHeader._expirationLen;
1196  size_t totalRead = 0;
1197  size_t readLen = 0;
1198  while (totalLeft)
1199  {
1200  readLen = blockBytesRemaining < totalLeft ?
1201  blockBytesRemaining : totalLeft;
1202  pBuffer->copyBytes(tmpBuffers[blockNum] + totalRead, readLen);
1203  if (!(totalLeft -= readLen))
1204  {
1205  break;
1206  }
1207  if (!(current = current->_nextInChain))
1208  {
1209  break;
1210  }
1211  totalRead += readLen;
1212  blockBytesRemaining = getBlockDataSize();
1213  position = current->_offset + getBlockHeaderSize();
1214  pBuffer->setPosition(position);
1215  }
1216  blockBytesRemaining -= readLen;
1217  _message.assignExpiration(tmpBuffers[blockNum++], blockChainHeader._expirationLen);
1218  }
1219  blockHeader._totalRemaining -= blockChainHeader._expirationLen;
1220  crcCalc = _crc(_message.getExpiration().data(),
1221  blockChainHeader._expirationLen, crcCalc);
1222  }
1223  if (blockChainHeader._sowKeyLen > 0)
1224  {
1225  if (blockChainHeader._sowKeyLen <= blockBytesRemaining)
1226  {
1227  _message.assignSowKey(pBuffer->getBytes(blockChainHeader._sowKeyLen)._data,
1228  blockChainHeader._sowKeyLen);
1229  blockBytesRemaining -= blockChainHeader._sowKeyLen;
1230  }
1231  else
1232  {
1233  tmpBuffers[blockNum] = new char[blockChainHeader._sowKeyLen]; // -V522
1234  size_t totalLeft = blockChainHeader._sowKeyLen;
1235  size_t totalRead = 0;
1236  size_t readLen = 0;
1237  while (totalLeft)
1238  {
1239  readLen = blockBytesRemaining < totalLeft ?
1240  blockBytesRemaining : totalLeft;
1241  pBuffer->copyBytes(tmpBuffers[blockNum] + totalRead, readLen);
1242  if (!(totalLeft -= readLen))
1243  {
1244  break;
1245  }
1246  if (!(current = current->_nextInChain))
1247  {
1248  break;
1249  }
1250  totalRead += readLen;
1251  blockBytesRemaining = getBlockDataSize();
1252  position = current->_offset + getBlockHeaderSize();
1253  pBuffer->setPosition(position);
1254  }
1255  blockBytesRemaining -= readLen;
1256  _message.assignSowKey(tmpBuffers[blockNum++], blockChainHeader._sowKeyLen);
1257  }
1258  blockHeader._totalRemaining -= blockChainHeader._sowKeyLen;
1259  crcCalc = _crc(_message.getSowKey().data(), blockChainHeader._sowKeyLen, crcCalc);
1260  }
1261  if (blockChainHeader._topicLen > 0)
1262  {
1263  if (blockChainHeader._topicLen <= blockBytesRemaining)
1264  {
1265  _message.assignTopic(pBuffer->getBytes(blockChainHeader._topicLen)._data,
1266  blockChainHeader._topicLen);
1267  blockBytesRemaining -= blockChainHeader._topicLen;
1268  }
1269  else
1270  {
1271  tmpBuffers[blockNum] = new char[blockChainHeader._topicLen]; // -V522
1272  size_t totalLeft = blockChainHeader._topicLen;
1273  size_t totalRead = 0;
1274  size_t readLen = 0;
1275  while (totalLeft)
1276  {
1277  readLen = blockBytesRemaining < totalLeft ?
1278  blockBytesRemaining : totalLeft;
1279  pBuffer->copyBytes(tmpBuffers[blockNum] + totalRead, readLen);
1280  if (!(totalLeft -= readLen))
1281  {
1282  break;
1283  }
1284  if (!(current = current->_nextInChain))
1285  {
1286  break;
1287  }
1288  totalRead += readLen;
1289  blockBytesRemaining = getBlockDataSize();
1290  position = current->_offset + getBlockHeaderSize();
1291  pBuffer->setPosition(position);
1292  }
1293  blockBytesRemaining -= readLen;
1294  _message.assignTopic(tmpBuffers[blockNum++], blockChainHeader._topicLen);
1295  }
1296  blockHeader._totalRemaining -= blockChainHeader._topicLen;
1297  crcCalc = _crc(_message.getTopic().data(), blockChainHeader._topicLen, crcCalc);
1298  }
1299  if (blockHeader._totalRemaining > 0)
1300  {
1301  if (blockHeader._totalRemaining <= blockBytesRemaining)
1302  {
1303  if (blockChainHeader._flag == -1 || blockChainHeader._flag == SOW_DELETE_DATA)
1304  {
1305  _message.assignData(
1306  pBuffer->getBytes(blockHeader._totalRemaining)._data,
1307  blockHeader._totalRemaining);
1308  crcCalc = _crc(_message.getData().data(),
1309  blockHeader._totalRemaining, crcCalc);
1310  }
1311  else if (blockChainHeader._flag == SOW_DELETE_FILTER)
1312  {
1313  _message.assignFilter(
1314  pBuffer->getBytes(blockHeader._totalRemaining)._data,
1315  blockHeader._totalRemaining);
1316  crcCalc = _crc(_message.getFilter().data(),
1317  blockHeader._totalRemaining, crcCalc);
1318  }
1319  else if (blockChainHeader._flag == SOW_DELETE_KEYS)
1320  {
1321  _message.assignSowKeys(
1322  pBuffer->getBytes(blockHeader._totalRemaining)._data,
1323  blockHeader._totalRemaining);
1324  crcCalc = _crc(_message.getSowKeys().data(),
1325  blockHeader._totalRemaining, crcCalc);
1326  }
1327  else if (blockChainHeader._flag == SOW_DELETE_BOOKMARK)
1328  {
1329  _message.assignBookmark(
1330  pBuffer->getBytes(blockHeader._totalRemaining)._data,
1331  blockHeader._totalRemaining);
1332  crcCalc = _crc(_message.getBookmark().data(),
1333  blockHeader._totalRemaining, crcCalc);
1334  }
1335  else if (blockChainHeader._flag == SOW_DELETE_BOOKMARK_CANCEL)
1336  {
1337  _message.assignBookmark(
1338  pBuffer->getBytes(blockHeader._totalRemaining)._data,
1339  blockHeader._totalRemaining);
1340  crcCalc = _crc(_message.getBookmark().data(),
1341  blockHeader._totalRemaining, crcCalc);
1342  _message.assignOptions(AMPS_OPTIONS_CANCEL, 6);
1343  }
1344  }
1345  else
1346  {
1347  tmpBuffers[blockNum] = new char[blockHeader._totalRemaining]; // -V522
1348  size_t totalLeft = blockHeader._totalRemaining;
1349  size_t totalRead = 0;
1350  size_t readLen = 0;
1351  while (totalLeft)
1352  {
1353  readLen = blockBytesRemaining < totalLeft ?
1354  blockBytesRemaining : totalLeft;
1355  pBuffer->copyBytes(tmpBuffers[blockNum] + totalRead, readLen);
1356  if (!(totalLeft -= readLen))
1357  {
1358  break;
1359  }
1360  if (!(current = current->_nextInChain))
1361  {
1362  break;
1363  }
1364  totalRead += readLen;
1365  blockBytesRemaining = getBlockDataSize();
1366  position = current->_offset + getBlockHeaderSize();
1367  pBuffer->setPosition(position);
1368  }
1369  position += readLen;
1370  if (blockChainHeader._flag == -1 || blockChainHeader._flag == SOW_DELETE_DATA)
1371  {
1372  _message.assignData(tmpBuffers[blockNum], blockHeader._totalRemaining);
1373  }
1374  else if (blockChainHeader._flag == SOW_DELETE_FILTER)
1375  {
1376  _message.assignFilter(tmpBuffers[blockNum], blockHeader._totalRemaining);
1377  }
1378  else if (blockChainHeader._flag == SOW_DELETE_KEYS)
1379  {
1380  _message.assignSowKeys(tmpBuffers[blockNum], blockHeader._totalRemaining);
1381  }
1382  else if (blockChainHeader._flag == SOW_DELETE_BOOKMARK)
1383  {
1384  _message.assignBookmark(tmpBuffers[blockNum], blockHeader._totalRemaining);
1385  }
1386  else if (blockChainHeader._flag == SOW_DELETE_BOOKMARK_CANCEL)
1387  {
1388  _message.assignBookmark(tmpBuffers[blockNum], blockHeader._totalRemaining);
1389  _message.assignOptions(AMPS_OPTIONS_CANCEL, 6);
1390  }
1391  crcCalc = _crc(tmpBuffers[blockNum++], blockHeader._totalRemaining, crcCalc); // -V595
1392  }
1393  }
1394 
1395  // Validate the crc and seq
1396  if (crcCalc != blockHeader._crcVal || blockHeader._seq != block_->_sequence)
1397  {
1398  std::ostringstream os;
1399  os << "Corrupted message found by CRC or sequence "
1400  << "Attempting to replay " << block_->_sequence
1401  << ". Block sequence " << blockHeader._seq
1402  << ", expiration length " << blockChainHeader._expirationLen
1403  << ", sowKey length " << blockChainHeader._sowKeyLen
1404  << ", topic length " << blockChainHeader._topicLen
1405  << ", data length " << blockHeader._totalRemaining
1406  << ", command ID length " << blockChainHeader._commandIdLen
1407  << ", correlation ID length " << blockChainHeader._correlationIdLen
1408  << ", flag " << blockChainHeader._flag
1409  << ", expected CRC " << blockHeader._crcVal
1410  << ", actual CRC " << crcCalc
1411  << ", start " << start
1412  << ", position " << position
1413  << ", buffer size " << pBuffer->getSize();
1414  for (Block* block = block_; block; block = block->_nextInChain)
1415  {
1416  os << "\n BLOCK " << block->_offset;
1417  }
1418  if (tmpBuffers)
1419  {
1420  for (amps_uint32_t i = 0; i < blockNum; ++i)
1421  {
1422  delete[] tmpBuffers[i]; // -V522
1423  }
1424  delete[] tmpBuffers;
1425  }
1426  throw StoreException(os.str());
1427  }
1428  // Replay the message
1429  replayer_.execute(_message);
1430  // Free the buffer if allocated
1431  if (tmpBuffers)
1432  {
1433  for (amps_uint32_t i = 0; i < blockNum; ++i)
1434  {
1435  delete[] tmpBuffers[i]; // -V522
1436  }
1437  delete[] tmpBuffers;
1438  }
1439  }
1440 
1441  // Lock should already be held
1442  // Read an older format file and update it.
1443  void recoverOldFormat(Block* blocks)
1444  {
1445  Buffer* pBuffer = _blockStore.getBuffer();
1446  amps_uint64_t maxIdx = 0;
1447  amps_uint64_t minIdx = 0;
1448  size_t size = pBuffer->getSize();
1449  size_t location = 0;
1450  pBuffer->setPosition(location);
1451  pBuffer->putUint32((amps_uint32_t)getBlockSize());
1452  pBuffer->putUint32((amps_uint32_t)_blockStore.getBlockHeaderSize());
1453  _metadataBlock->_sequence = pBuffer->getUint64();
1454  if (_metadataBlock->_sequence < (amps_uint64_t)1000000)
1455  {
1456  pBuffer->setPosition(_metadataBlock->_offset + 8);
1457  pBuffer->putUint64((amps_uint64_t)0);
1458  _metadataBlock->_sequence = 0;
1459  }
1460  else
1461  {
1462  // Set _maxDiscarded and _lastSequence
1463  _maxDiscarded = _metadataBlock->_sequence;
1464  _lastSequence = _maxDiscarded;
1465  }
1466  // Write the current client version
1467  pBuffer->putUint64((amps_uint64_t)VersionInfo::parseVersion(AMPS_CLIENT_VERSION));
1468  // No next in chain
1469  pBuffer->putUint64((amps_uint64_t)0);
1470  // No checks currently
1471  location += getBlockSize();
1472  amps_uint32_t freeCount = 0;
1473  Block* firstFree = NULL;
1474  Block* endOfFreeList = NULL;
1475  amps_uint32_t blockSize = getBlockSize();
1476  size_t numBlocks = size / blockSize;
1477  size_t blockNum = 0;
1478  // Used to create used list in order after recovery
1479  typedef std::map<amps_uint64_t, Block*> RecoverMap;
1480  RecoverMap recoveredBlocks;
1481  RecoverMap growingBlocks;
1482  amps_uint32_t growthBlocksNeeded = 0;
1483  while (location < size)
1484  {
1485  // Get seq and check if non-zero
1486  pBuffer->setPosition(location);
1487  BlockHeader blockHeader;
1488  pBuffer->copyBytes((char*)&blockHeader, sizeof(BlockHeader));
1489  size_t blockCount = (size_t)blockHeader._blocksToWrite;
1490  if (blockHeader._totalRemaining > 0 && blockHeader._seq > 0
1491  && blockHeader._totalRemaining < size
1492  && blockHeader._blocksToWrite < numBlocks
1493  && (blockHeader._blocksToWrite * blockSize)
1494  >= blockHeader._totalRemaining)
1495  {
1496  size_t oldFormatSize = blockHeader._totalRemaining;
1497  // Old format total was storage bytes plus 64 bytes for block
1498  // and chain headers.
1499  blockHeader._totalRemaining -= 64;
1500  // New format counts only chain header size
1501  blockHeader._totalRemaining += getBlockChainHeaderSize();
1502  // Get the rest of the header
1503  BlockChainHeader chainHeader;
1504  // Need to reset location to after OLD header:
1505  // amps_uint32_t blocks, amps_uint32_t totalRemaining,
1506  // amps_uint64_t seq, amps_uint64_t crc
1507  pBuffer->setPosition(location + (sizeof(amps_uint32_t) * 2)
1508  + (sizeof(amps_uint64_t) * 2) );
1509  // Read old chain header which uses same order, but not
1510  // as many bytes (everything is 32bit):
1511  // operation, commandIdLen, correlationIdLen,
1512  // expirationLen, sowKeyLen, topicLen, flag, ackTypes
1513  pBuffer->copyBytes((char*)&chainHeader,
1514  sizeof(amps_uint32_t) * 8);
1515  // Check for garbage, likely indicating this is part of a chain
1516  if ((chainHeader._commandIdLen + chainHeader._correlationIdLen
1517  + chainHeader._expirationLen + chainHeader._sowKeyLen
1518  + chainHeader._topicLen) > blockHeader._totalRemaining)
1519  {
1520  // Skip this block, can't be real data
1521  location += getBlockSize();
1522  continue;
1523  }
1524  // Check if data fits in current number of blocks
1525  amps_uint32_t blocksNeeded = (blockHeader._totalRemaining
1526  / getBlockDataSize())
1527  + (blockHeader._totalRemaining
1528  % getBlockDataSize()
1529  ? 1 : 0);
1530  if (blocksNeeded == blockHeader._blocksToWrite)
1531  {
1532  Block* first = blocks[++blockNum].setOffset(location);
1533  first->_nextInList = 0;
1534  first->_sequence = blockHeader._seq;
1535  if (blockHeader._blocksToWrite > 1)
1536  {
1537  // CRC is only set on the first block
1538  amps_uint64_t crcVal = blockHeader._crcVal;
1539  blockHeader._crcVal = 0;
1540  Block* current = 0;
1541  // It fits, just need to adjust the block formats
1542  // and set up the chain. Start with the last block
1543  // and move data as needed starting at the end.
1544  size_t currentBlockNum = blockNum
1545  + blockHeader._blocksToWrite
1546  - 1;
1547  // Last item could wrap to beginning, but beginning is
1548  // block 1, not 0, which is the metadata block.
1549  if (currentBlockNum >= numBlocks)
1550  {
1551  currentBlockNum = currentBlockNum - numBlocks + 1;
1552  }
1553  if (currentBlockNum < blockNum)
1554  {
1555  Block* last = blocks[currentBlockNum]
1556  .init(currentBlockNum, getBlockSize());
1557  if ((current = firstFree) == last)
1558  {
1559  firstFree = firstFree->_nextInList; // -V522
1560  if (!firstFree)
1561  {
1562  endOfFreeList = 0;
1563  }
1564  --freeCount;
1565  }
1566  else
1567  {
1568  while (current)
1569  {
1570  if (current->_nextInList == last)
1571  {
1572  current->_nextInList = last->_nextInList;
1573  current = last;
1574  --freeCount;
1575  break;
1576  }
1577  current = current->_nextInList;
1578  }
1579  }
1580  }
1581  if (!current)
1582  {
1583  current = blocks[currentBlockNum]
1584  .init(currentBlockNum, getBlockSize());
1585  }
1586  // Initially, the number of bytes in last block
1587  size_t dataBytes = oldFormatSize % getBlockSize();
1588  while (current != first)
1589  {
1590  current->_nextInList = 0;
1591  current->_sequence = blockHeader._seq;
1592  // Set _nextInChain on previous Block, will include first
1593  if (--currentBlockNum < 1
1594  || currentBlockNum > numBlocks)
1595  {
1596  currentBlockNum = numBlocks - 1;
1597  }
1598  Block* previous = blocks[currentBlockNum]
1599  .init(currentBlockNum,
1600  getBlockSize());
1601  previous->_nextInChain = current;
1602  // Shift to make room for a header in every block
1603  // Not growing, so this won't write past the end.
1604  // Shift amount accounts for a header added to each
1605  // block after the first plus any change in the
1606  // chain header size from 32, which is the old size.
1607  size_t bytesToMove = --blockCount
1608  * getBlockHeaderSize()
1610  - 32);
1611  pBuffer->copyBytes(current->_offset + bytesToMove,
1612  current->_offset,
1613  dataBytes);
1614  dataBytes = getBlockSize();
1615  if (bytesToMove > getBlockHeaderSize())
1616  {
1617  bytesToMove -= getBlockHeaderSize();
1618  dataBytes -= bytesToMove;
1619  pBuffer->copyBytes(current->_offset
1620  + getBlockHeaderSize(),
1621  previous->_offset
1622  + dataBytes,
1623  bytesToMove);
1624  }
1625  // Set next in chain for this block's header
1626  blockHeader._nextInChain = (current->_nextInChain
1627  ? current->_nextInChain->_offset
1628  : (amps_uint64_t)0);
1629  // Write the header for this block
1630  pBuffer->setPosition(current->_offset);
1631  pBuffer->putBytes((const char*)&blockHeader,
1632  sizeof(BlockHeader));
1633  if (firstFree == previous)
1634  {
1635  firstFree = firstFree->_nextInList;
1636  if (!firstFree)
1637  {
1638  endOfFreeList = 0;
1639  }
1640  --freeCount;
1641  }
1642  else
1643  {
1644  current = firstFree;
1645  while (current)
1646  {
1647  if (current->_nextInList == previous)
1648  {
1649  current->_nextInList = previous->_nextInList;
1650  --freeCount;
1651  break;
1652  }
1653  current = current->_nextInList;
1654  }
1655  }
1656  current = previous;
1657  }
1658  blockNum += blockHeader._blocksToWrite - 1;
1659  blockHeader._crcVal = crcVal;
1660  }
1661  // Move bytes for chain header expansion from 32 bytes
1662  size_t bytesToMove = getBlockDataSize() - 32
1663  - (getBlockChainHeaderSize() - 32);
1664  pBuffer->copyBytes(first->_offset + getBlockHeaderSize()
1666  first->_offset + getBlockHeaderSize() + 32,
1667  bytesToMove);
1668  // Rewrite the header and chain header for first Block.
1669  pBuffer->setPosition(first->_offset);
1670  blockHeader._nextInChain = (first->_nextInChain
1671  ? first->_nextInChain->_offset
1672  : (amps_uint64_t)0);
1673  pBuffer->putBytes((const char*)&blockHeader,
1674  sizeof(BlockHeader));
1675  pBuffer->putBytes((const char*)&chainHeader,
1676  sizeof(BlockChainHeader));
1677  // Add first Block to recovered for building the used
1678  // list later
1679  recoveredBlocks[blockHeader._seq] = first;
1680  }
1681  else
1682  {
1683  // This will need at least one more Block due to a header in
1684  // every Block. Check how many and save for later.
1685  growingBlocks[blockHeader._seq] = blocks[++blockNum].setOffset(location);
1686  growthBlocksNeeded += (blocksNeeded - blockHeader._blocksToWrite);
1687  blockNum += blockHeader._blocksToWrite - 1;
1688  }
1689  // Track min and max
1690  if (maxIdx < blockHeader._seq)
1691  {
1692  maxIdx = blockHeader._seq;
1693  }
1694  if (minIdx > blockHeader._seq)
1695  {
1696  minIdx = blockHeader._seq;
1697  }
1698  // Advance past read blocks
1699  location += blockHeader._blocksToWrite * getBlockSize();
1700  // Either we're exiting loop, or blockNum is in range
1701  assert(location >= size || blockNum < numBlocks);
1702  }
1703  else
1704  {
1705  // Put this Block on the free list
1706  Block* block = blocks[++blockNum].setOffset(location);
1707  if (endOfFreeList)
1708  {
1709  endOfFreeList->_nextInList = block;
1710  }
1711  else
1712  {
1713  firstFree = block;
1714  }
1715  endOfFreeList = block;
1716  ++freeCount;
1717  location += blockSize;
1718  }
1719  }
1720  for (RecoverMap::iterator i = growingBlocks.begin();
1721  i != growingBlocks.end(); ++i)
1722  {
1723  Block* first = i->second;
1724  pBuffer->setPosition(first->_offset);
1725  BlockHeader blockHeader;
1726  // Read an old BlockHeader, which is only 24 bytes.
1727  // The bytes match current BlockHeader, and _nextInChain is 0.
1728  pBuffer->copyBytes((char*)&blockHeader, 24);
1729  // Old format total was storage bytes plus 64 bytes for block
1730  // and chain headers.
1731  blockHeader._totalRemaining -= 64;
1732  // New format counts only chain header size
1733  blockHeader._totalRemaining += getBlockChainHeaderSize();
1734  if (freeCount < growthBlocksNeeded)
1735  {
1736  // We have to resize, let's try to do it once.
1737  amps_uint32_t minBlocksRequired = growthBlocksNeeded - freeCount;
1738  amps_uint32_t growthBlocks = _blockStore.getDefaultResizeBlocks();
1739  if (growthBlocks < minBlocksRequired)
1740  {
1741  amps_uint32_t defaultBlocks = _blockStore.getDefaultResizeBlocks();
1742  if (minBlocksRequired % defaultBlocks)
1743  minBlocksRequired = (minBlocksRequired / defaultBlocks + 1)
1744  * defaultBlocks;
1745  growthBlocks = minBlocksRequired;
1746  }
1747  amps_uint32_t newBlocks = 0;
1748  Block* addedBlocks = _blockStore.resizeBuffer(
1749  pBuffer->getSize()
1750  + growthBlocks * blockSize,
1751  &newBlocks);
1752  if (!addedBlocks)
1753  {
1754  throw StoreException("Failed to grow store buffer during recovery");
1755  }
1756  _blockStore.addBlocks(addedBlocks);
1757  freeCount += newBlocks;
1758  growthBlocksNeeded = (growthBlocksNeeded > freeCount)
1759  ? growthBlocksNeeded - freeCount : 0;
1760  if (endOfFreeList)
1761  {
1762  endOfFreeList->_nextInList = addedBlocks;
1763  }
1764  else
1765  {
1766  firstFree = addedBlocks;
1767  }
1768  endOfFreeList = &(addedBlocks[newBlocks - 1]);
1769  endOfFreeList->_nextInList = 0;
1770  }
1771  expandBlocks(blocks, first->_offset, first, blockHeader,
1772  &firstFree, &freeCount, pBuffer);
1773  // Add first Block to recovered for building the used list later
1774  recoveredBlocks[blockHeader._seq] = first;
1775  if (!firstFree)
1776  {
1777  endOfFreeList = 0;
1778  }
1779  }
1780  if (endOfFreeList)
1781  {
1782  endOfFreeList->_nextInList = 0;
1783  }
1784  _blockStore.setFreeList(firstFree, freeCount);
1785  if (maxIdx > _lastSequence)
1786  {
1787  _lastSequence = maxIdx;
1788  }
1789  if (minIdx > _maxDiscarded + 1)
1790  {
1791  _maxDiscarded = minIdx - 1;
1792  }
1793  if (_maxDiscarded > _metadataBlock->_sequence)
1794  {
1795  _metadataBlock->_sequence = _maxDiscarded;
1796  pBuffer->setPosition(_metadataBlock->_offset + 8);
1797  pBuffer->putUint64(_maxDiscarded);
1798  }
1799  Block* end = NULL;
1800  AMPS_FETCH_ADD(&_stored, (long)(recoveredBlocks.size()));
1801  for (RecoverMap::iterator i = recoveredBlocks.begin();
1802  i != recoveredBlocks.end(); ++i)
1803  {
1804  if (_blockStore.front())
1805  {
1806  end->_nextInList = i->second; // -V522
1807  }
1808  else
1809  {
1810  _blockStore.setUsedList(i->second);
1811  }
1812  end = i->second;
1813  }
1814  if (end)
1815  {
1816  end->_nextInList = 0;
1817  }
1818  _blockStore.setEndOfUsedList(end);
1819  }
1820 
1821  // For recovering an old format store to current format when more Blocks
1822  // are needed with the new format.
1823  void expandBlocks(Block* blocks_, size_t location_, Block* first_,
1824  BlockHeader blockHeader_,
1825  Block** pFreeList_, amps_uint32_t* pFreeCount_,
1826  Buffer* pBuffer_)
1827  {
1828  // First create the chain, then we'll fill in reverse
1829  Block* current = first_;
1830  // Old format total was storage bytes plus 64 bytes for block
1831  // and chain headers.
1832  amps_uint32_t oldTotalRemaining = blockHeader_._totalRemaining;
1833  blockHeader_._totalRemaining -= 64;
1834  // New format counts only chain header size
1835  blockHeader_._totalRemaining += getBlockChainHeaderSize();
1836  // Check how many Blocks needed and if we have enough free.
1837  amps_uint32_t blocksNeeded = blockHeader_._totalRemaining
1838  / getBlockDataSize()
1839  + (blockHeader_._totalRemaining
1840  % getBlockDataSize()
1841  ? 1 : 0);
1842  // Last data block size, remove bytes saved in first block
1843  // then mod by block size.
1844  const amps_uint32_t blockSize = getBlockSize();
1845  // Old total remaining had all header included
1846  size_t endBlockSize = oldTotalRemaining % blockSize;
1847  if (!endBlockSize)
1848  {
1849  endBlockSize = blockSize;
1850  }
1851  size_t endOfData = 0;
1852  // Hang on to CRC until first block is written
1853  amps_uint64_t crcVal = blockHeader_._crcVal;
1854  blockHeader_._crcVal = 0;
1855 
1856  std::stack<Block*> blocksUsed;
1857  for (amps_uint32_t i = 1; i < blocksNeeded; ++i)
1858  {
1859  blocksUsed.push(current);
1860  current->_sequence = blockHeader_._seq;
1861  if (i >= blockHeader_._blocksToWrite)
1862  {
1863  if (i == blockHeader_._blocksToWrite)
1864  {
1865  endOfData = current->_offset + endBlockSize;
1866  }
1867  current->_nextInChain = *pFreeList_;
1868  --(*pFreeCount_);
1869  *pFreeList_ = (*pFreeList_)->_nextInList;
1870  }
1871  else
1872  {
1873  current->_nextInChain = current->_nextInList;
1874  if (current->_nextInChain)
1875  {
1876  if (current->_offset + blockSize < pBuffer_->getSize())
1877  {
1878  current->_nextInChain->setOffset(current->_offset
1879  + blockSize);
1880  }
1881  else
1882  {
1883  current->_nextInChain->setOffset(blockSize);
1884  }
1885  }
1886  else
1887  {
1888  current->_nextInChain = blocks_[1].init(1, blockSize);
1889  }
1890  if (current->_nextInChain == *pFreeList_)
1891  {
1892  *pFreeList_ = (*pFreeList_)->_nextInList;
1893  --(*pFreeCount_);
1894  }
1895  else
1896  {
1897  for (Block* free = *pFreeList_; free;
1898  free = free->_nextInList)
1899  {
1900  if (free->_nextInList == current->_nextInChain)
1901  {
1902  free->_nextInList = free->_nextInList->_nextInList;
1903  --(*pFreeCount_);
1904  break;
1905  }
1906  }
1907  }
1908  }
1909  current->_nextInList = 0;
1910  current = current->_nextInChain;
1911  }
1912  // Make sure we write the correct number of blocks to write
1913  blockHeader_._blocksToWrite = blocksNeeded;
1914  // Finish setting up current
1915  current->_nextInList = 0;
1916  current->_sequence = blockHeader_._seq;
1917  // Now shift data, starting at the last Block
1918  // The total shift is for number of Blocks beyond the first
1919  // times Block header size, since previous format only wrote
1920  // the header in the first Block and had contiguous data,
1921  // with only wrap from end to beginning of buffer possible.
1922 
1923  // First time through, this is bytes in last block. After,
1924  // it will be block data size.
1925  size_t dataBytes = blockHeader_._totalRemaining % getBlockDataSize();
1926  while (current != first_)
1927  {
1928  size_t chunkBytesAvail = endOfData > location_
1929  ? endOfData - location_
1930  : endOfData - 2048;
1931  if (chunkBytesAvail < dataBytes)
1932  {
1933  // Original was wrapped from end to start of buffer
1934  // Need to copy what's left at start to end of Block,
1935  // then start working from the end.
1936  // This can ONLY occur during wrap because the first
1937  // Block doesn't get moved in this loop.
1938  pBuffer_->copyBytes(current->_offset
1939  + getBlockSize()
1940  - chunkBytesAvail,
1941  getBlockSize(),
1942  chunkBytesAvail);
1943  chunkBytesAvail = dataBytes - chunkBytesAvail;
1944  endOfData = pBuffer_->getSize() - chunkBytesAvail;
1945  pBuffer_->copyBytes(current->_offset + getBlockHeaderSize(),
1946  endOfData,
1947  chunkBytesAvail);
1948  }
1949  else
1950  {
1951  endOfData -= dataBytes;
1952  pBuffer_->copyBytes(current->_offset + getBlockHeaderSize(),
1953  endOfData,
1954  dataBytes);
1955  }
1956  // Set next in chain in block header
1957  blockHeader_._nextInChain = (current->_nextInChain
1958  ? current->_nextInChain->_offset
1959  : (amps_uint64_t)0);
1960  // Write the header for this block
1961  pBuffer_->setPosition(current->_offset);
1962  pBuffer_->putBytes((const char*)&blockHeader_, sizeof(BlockHeader));
1963  current = blocksUsed.top();
1964  blocksUsed.pop();
1965  dataBytes = getBlockDataSize();
1966  }
1967  // Move bytes for chain header expansion from 32 bytes
1968  pBuffer_->copyBytes(first_->_offset
1969  + getBlockHeaderSize()
1971  first_->_offset + getBlockHeaderSize() + 32,
1973  // Set the CRC to indicate first block and set nextInChain
1974  blockHeader_._crcVal = crcVal;
1975  blockHeader_._nextInChain = first_->_nextInChain->_offset;
1976  // Need to reset location to after OLD header:
1977  // amps_uint32_t blocks, amps_uint32_t totalRemaining,
1978  // amps_uint64_t seq, amps_uint64_t crc
1979  pBuffer_->setPosition(location_ + (sizeof(amps_uint32_t) * 2)
1980  + (sizeof(amps_uint64_t) * 2) );
1981  // Read old chain header which uses same order, but not
1982  // as many bytes (everything is 32bit):
1983  // operation, commandIdLen, correlationIdLen,
1984  // expirationLen, sowKeyLen, topicLen, flag, ackTypes
1985  BlockChainHeader chainHeader;
1986  pBuffer_->copyBytes((char*)&chainHeader,
1987  sizeof(amps_uint32_t) * 8);
1988  // Rewrite the header and chain header for first Block.
1989  pBuffer_->setPosition(location_);
1990  pBuffer_->putBytes((const char*)&blockHeader_, sizeof(BlockHeader));
1991  pBuffer_->putBytes((const char*)&chainHeader, sizeof(BlockChainHeader));
1992  }
1993 
1994  void chooseCRC(bool isFile)
1995  {
1996  if (!isFile)
1997  {
1998  _crc = noOpCRC;
1999  return;
2000  }
2001 
2002 #ifndef AMPS_SSE_42
2003  _crc = AMPS::CRC<0>::crcNoSSE;
2004 #else
2005  if (AMPS::CRC<0>::isSSE42Enabled())
2006  {
2007  _crc = AMPS::CRC<0>::crc;
2008  }
2009  else
2010  {
2011  _crc = AMPS::CRC<0>::crcNoSSE;
2012  }
2013 #endif
2014  }
2015 
2016  static amps_uint64_t noOpCRC(const char*, size_t, amps_uint64_t)
2017  {
2018  return 0;
2019  }
2020 
2021  protected:
2022  mutable BlockStore _blockStore;
2023  private:
2024  // Block used to hold metadata, currently:
2025  // the last persisted
2026  Block* _metadataBlock;
2027  // Highest sequence that has been discarded
2028  amps_uint64_t _maxDiscarded;
2029  // Track the assigned sequence numbers
2030 #if __cplusplus >= 201103L || _MSC_VER >= 1900
2031  std::atomic<amps_uint64_t> _lastSequence;
2032 #else
2033  volatile amps_uint64_t _lastSequence;
2034 #endif
2035  // Track how many messages are stored
2036  AMPS_ATOMIC_TYPE _stored;
2037 
2038  // Message used for doing replay
2039  Message _message;
2040 
2041  typedef amps_uint64_t (*CRCFunction)(const char*, size_t, amps_uint64_t);
2042 
2043  // Function used to calculate the CRC if one is used
2044  CRCFunction _crc;
2045 
2046  };
2047 
2048 }
2049 
2050 #endif
2051 
virtual void putUint64(amps_uint64_t ui_)=0
Put an amps_uint64_t value into the buffer at the current position and advance past it...
Abstract base class for storing published messages for an HA publisher client.
Definition: ampsplusplus.hpp:1097
void getRawCorrelationId(const char **dataptr, size_t *sizeptr) const
Modifies the passed in arguments to reference the value of the CorrelationId header of self in the un...
Definition: Message.hpp:1366
Command::Type getCommandEnum() const
Decode self&#39;s "command" field and return one of the values from Command.
Definition: Message.hpp:1290
Constants
Default constant values for BlockPublishStore.
Definition: BlockPublishStore.hpp:79
void getRawCommandId(const char **dataptr, size_t *sizeptr) const
Modifies the passed in arguments to reference the value of the CommandId header of self in the underl...
Definition: Message.hpp:1364
Abstract base class for replaying a publish message.
Definition: ampsplusplus.hpp:1069
virtual void flush(long timeout_)
Method to wait for the Store to discard everything that has been stored up to the point in time when ...
Definition: BlockPublishStore.hpp:656
void replay(StoreReplayer &replayer_)
Replay all messages in the Store onto the given StoreReplayer.
Definition: BlockPublishStore.hpp:561
void getRawFilter(const char **dataptr, size_t *sizeptr) const
Modifies the passed in arguments to reference the value of the Filter header of self in the underlyin...
Definition: Message.hpp:1368
AMPSDLL amps_uint64_t amps_message_get_field_uint64(amps_handle message, FieldId field)
Gets the unsigned 64-bit int value of a header field in an AMPS message.
Message encapsulates a single message sent to or received from an AMPS server, and provides methods f...
Definition: Message.hpp:539
virtual void setPosition(size_t position_)=0
Set the buffer postion to a location.
Field getOptions() const
Retrieves the value of the Options header of the Message as a Field which references the underlying b...
Definition: Message.hpp:1378
static amps_uint32_t getBlockHeaderSize()
Block header is number of blocks, total length, sequence number, crc, next in chain offset...
Definition: BlockPublishStore.hpp:153
const char * data() const
Returns the (non-null-terminated) data underlying this field.
Definition: Field.hpp:273
amps_uint32_t getBlockDataSize()
Return the size left in a block for data when it has a header in it.
Definition: BlockPublishStore.hpp:179
virtual size_t getSize() const =0
Get the current size of the Buffer in bytes.
virtual void putBytes(const char *data_, size_t dataLength_)=0
Put the given length of bytes in data into the buffer at the current position and advance past them...
virtual void execute(Message &message_)=0
Called by implementations of Store to replay a message from the store.
void getRawBookmark(const char **dataptr, size_t *sizeptr) const
Modifies the passed in arguments to reference the value of the Bookmark header of self in the underly...
Definition: Message.hpp:1256
bool replaySingle(StoreReplayer &replayer_, amps_uint64_t index_)
Replay one message in the Store onto the given StoreReplayer.
Definition: BlockPublishStore.hpp:593
amps_uint64_t store(const Message &message_, bool assignSequence_)
Store a given message that will be delivered to AMPS.
Definition: BlockPublishStore.hpp:266
Used as a base class for other stores in the AMPS C++ client, this is an implementation that breaks a...
Definition: BlockStore.hpp:58
amps_uint64_t getLastPersisted(void)
Get the last persisted sequence number.
Definition: BlockPublishStore.hpp:711
Core type, function, and class declarations for the AMPS C++ client.
size_t unpersistedCount() const
Method to return the count of messages that currently in the Store because they have not been discard...
Definition: BlockPublishStore.hpp:642
Provides AMPS::Buffer, an abstract base class used by the store implementations in the AMPS client...
size_t len() const
Returns the length of the data underlying this field.
Definition: Field.hpp:280
virtual void setResizeHandler(PublishStoreResizeHandler handler_, void *userData_)
Set a handler to be called if the Store needs to resize in order to keep storing messages.
Definition: ampsplusplus.hpp:1191
virtual void putUint32(amps_uint32_t i_)=0
Put an unsigned 32-bit int value into the buffer at the current position and advance past the end of ...
void getRawSowKeys(const char **dataptr, size_t *sizeptr) const
Modifies the passed in arguments to reference the value of the SowKeys header of self in the underlyi...
Definition: Message.hpp:1487
Used as a base class for other stores in the AMPS C++ client, this is an implementation of StoreImpl ...
Definition: BlockPublishStore.hpp:60
Abstract base class for implementing a buffer to be used by a StoreImpl for storage of publish messag...
Definition: Buffer.hpp:40
virtual ~BlockPublishStore()
Destructor that cleans up the buffer and other associated memory.
Definition: BlockPublishStore.hpp:241
void getRawExpiration(const char **dataptr, size_t *sizeptr) const
Modifies the passed in arguments to reference the value of the Expiration header of self in the under...
Definition: Message.hpp:1367
static amps_uint32_t getBlockChainHeaderSize()
Block chain header is operation, command id length, correlation id length, expiration length...
Definition: BlockPublishStore.hpp:163
virtual amps_uint64_t getUint64()=0
Get an unsigned 64-bit int value at the current buffer position and advance past it.
void getRawTopic(const char **dataptr, size_t *sizeptr) const
Modifies the passed in arguments to reference the value of the Topic header of self in the underlying...
Definition: Message.hpp:1511
Used as metadata for each block in a Buffer.
Definition: BlockStore.hpp:72
Field represents the value of a single field in a Message.
Definition: Field.hpp:86
virtual amps_uint32_t getUint32()=0
Get the unsigned 32-bit int value at the current buffer position and advance past it...
virtual amps_uint64_t store(const Message &message_)
Store a given message that will be delivered to AMPS.
Definition: BlockPublishStore.hpp:251
BlockPublishStore(Buffer *buffer_, amps_uint32_t blocksPerRealloc_=1000, bool isFile_=false, bool errorOnPublishGap_=false, amps_uint32_t blockSize_=DEFAULT_BLOCK_SIZE)
Create a BlockPublishStore using buffer_, that grows by blocksPerRealloc_ blocks when it must grow...
Definition: BlockPublishStore.hpp:199
amps_uint32_t getBlockSize()
Return the size left in a block for data when it has a header in it.
Definition: BlockPublishStore.hpp:171
void getRawSowKey(const char **dataptr, size_t *sizeptr) const
Modifies the passed in arguments to reference the value of the SowKey header of self in the underlyin...
Definition: Message.hpp:1486
unsigned getAckTypeEnum() const
Decode self&#39;s "ack type" field and return the corresponding bitmask of values from AckType...
Definition: Message.hpp:1222
amps_uint64_t getLowestUnpersisted() const
Get the oldest unpersisted message sequence in the store.
Definition: BlockPublishStore.hpp:699
Definition: ampsplusplus.hpp:103
static amps_uint64_t getUnsetSequence()
Method to return the value used to represent no such sequence.
Definition: ampsplusplus.hpp:1167
virtual void copyBytes(char *buffer_, size_t numBytes_)=0
Copy the given number of bytes from this buffer to the given buffer.
virtual void discardUpTo(amps_uint64_t index_)
Remove all messages with an index up to and including index_.
Definition: BlockPublishStore.hpp:498
Provides AMPS::BlockStore, a class for storing Blocks of a fixed size into a Buffer implementation...
virtual ByteArray getBytes(size_t numBytes_)=0
Get the given number of bytes from the buffer.