AMPS C/C++ Client Class Reference
AMPS C/C++ Client Version 5.3.4.2
BlockPublishStore.hpp
Go to the documentation of this file.
1 //
3 // Copyright (c) 2010-2024 60East Technologies Inc., All Rights Reserved.
4 //
5 // This computer software is owned by 60East Technologies Inc. and is
6 // protected by U.S. copyright laws and other laws and by international
7 // treaties. This computer software is furnished by 60East Technologies
8 // Inc. pursuant to a written license agreement and may be used, copied,
9 // transmitted, and stored only in accordance with the terms of such
10 // license agreement and with the inclusion of the above copyright notice.
11 // This computer software or any other copies thereof may not be provided
12 // or otherwise made available to any other person.
13 //
14 // U.S. Government Restricted Rights. This computer software: (a) was
15 // developed at private expense and is in all respects the proprietary
16 // information of 60East Technologies Inc.; (b) was not developed with
17 // government funds; (c) is a trade secret of 60East Technologies Inc.
18 // for all purposes of the Freedom of Information Act; and (d) is a
19 // commercial item and thus, pursuant to Section 12.212 of the Federal
20 // Acquisition Regulations (FAR) and DFAR Supplement Section 227.7202,
21 // Government's use, duplication or disclosure of the computer software
22 // is subject to the restrictions set forth by 60East Technologies Inc..
23 //
25 
26 #ifndef _BLOCKPUBLISHSTORE_H_
27 #define _BLOCKPUBLISHSTORE_H_
28 #include <amps/ampsplusplus.hpp>
29 #include <amps/BlockStore.hpp>
30 #include <amps/Buffer.hpp>
31 #include <sstream>
32 #include <stack>
33 #include <string>
34 #include <map>
35 #include <amps/ampscrc.hpp>
36 #if __cplusplus >= 201103L || _MSC_VER >= 1900
37  #include <atomic>
38 #endif
39 
40 #ifdef _WIN32
41  #include <intrin.h>
42  #include <sys/timeb.h>
43 #else
44  #include <sys/time.h>
45  #if AMPS_SSE_42
46  #include <cpuid.h>
47  #endif
48 #endif
49 #include <iostream>
50 
55 
56 namespace AMPS
57 {
64  {
65  public:
66  typedef BlockStore::Block Block;
67  typedef Lock<BlockStore> BufferLock;
68  typedef Unlock<BlockStore> BufferUnlock;
69 
70  typedef enum
71  {
72  SOW_DELETE_DATA = 0x01,
73  SOW_DELETE_FILTER = 0x02,
74  SOW_DELETE_KEYS = 0x04,
75  SOW_DELETE_BOOKMARK = 0x08,
76  SOW_DELETE_BOOKMARK_CANCEL = 0x10,
77  SOW_DELETE_UNKNOWN = 0x80
78  } SowDeleteType;
79 
82  enum Constants : amps_uint32_t
83  {
84  DEFAULT_BLOCK_HEADER_SIZE = 32,
85  DEFAULT_BLOCK_CHAIN_HEADER_SIZE = 64,
86  DEFAULT_BLOCKS_PER_REALLOC = 1000,
87  DEFAULT_BLOCK_SIZE = 2048
88  };
89 
90  /**********************************************************************
91  * Storage format
92  *************************************************************************
93  * Field Description | Type | # Bytes
94  *************************************************************************
95  * HEADER as detailed below | | 32 TOTAL
96  * | |
97  * Total number of blocks used by the record | uint32_t | 4
98  * Total length of the saved record | uint32_t | 4
99  * HA Message sequence | uint64_t | 8
100  * CRC Value - only set in first Block | uint64_t | 8
101  * next in chain offset | uint64_t | 8
102  *************************************************************************
103  * CHAIN HEADER as detailed below | | 64 TOTAL
104  * | |
105  * operation | uint32_t | 4
106  * command id length | uint32_t | 4
107  * correltation id length | uint32_t | 4
108  * expiration length | uint32_t | 4
109  * sow key length | uint32_t | 4
110  * topic length | uint32_t | 4
111  * sow delete flag | int32_t | 4
112  * ack types | uint32_t | 4
113  * unused [8] | uint32_t | 4*8 = 32
114  *************************************************************************
115  * DATA SECTION - can be spread across multiple blocks
116  *
117  * command id | char[]
118  * correlation id | char[]
119  * expiration | char[]
120  * sow key | char[]
121  * topic | char[]
122  * data | char[]
123  *************************************************************************/
124 
125  struct BlockHeader
126  {
127  amps_uint32_t _blocksToWrite;
128  amps_uint32_t _totalRemaining;
129  amps_uint64_t _seq;
130  amps_uint64_t _crcVal;
131  amps_uint64_t _nextInChain;
132  };
133 
134  struct BlockChainHeader
135  {
136  amps_uint32_t _operation;
137  amps_uint32_t _commandIdLen;
138  amps_uint32_t _correlationIdLen;
139  amps_uint32_t _expirationLen;
140  amps_uint32_t _sowKeyLen;
141  amps_uint32_t _topicLen;
142  amps_int32_t _flag;
143  amps_uint32_t _ackTypes;
144  amps_uint32_t _unused[8];
145  BlockChainHeader() // -V730
146  : _operation(0), _commandIdLen(0), _correlationIdLen(0)
147  , _expirationLen(0), _sowKeyLen(0), _topicLen(0), _flag(-1)
148  , _ackTypes(0)
149  { ; }
150  };
151 
156  static inline amps_uint32_t getBlockHeaderSize()
157  {
158  return DEFAULT_BLOCK_HEADER_SIZE;
159  }
160 
166  static inline amps_uint32_t getBlockChainHeaderSize()
167  {
168  return DEFAULT_BLOCK_CHAIN_HEADER_SIZE;
169  }
170 
174  inline amps_uint32_t getBlockSize()
175  {
176  return _blockStore.getBlockSize();
177  }
178 
182  inline amps_uint32_t getBlockDataSize()
183  {
184  return _blockStore.getBlockSize() - getBlockHeaderSize();
185  }
186 
203  amps_uint32_t blocksPerRealloc_ = 1000,
204  bool isFile_ = false,
205  bool errorOnPublishGap_ = false,
206  amps_uint32_t blockSize_ = DEFAULT_BLOCK_SIZE)
207  : StoreImpl(errorOnPublishGap_)
208  , _blockStore(buffer_, blocksPerRealloc_,
209  DEFAULT_BLOCK_HEADER_SIZE,
210  (blockSize_ > DEFAULT_BLOCK_HEADER_SIZE * 2
211  ? blockSize_ : DEFAULT_BLOCK_HEADER_SIZE * 2))
212  , _metadataBlock(0)
213  , _maxDiscarded((amps_uint64_t)0), _lastSequence((amps_uint64_t)1)
214  , _stored(0)
215  {
216  _blockStore.setResizeHandler(&BlockPublishStore::canResize, (void*)this);
217  chooseCRC(isFile_);
218  if (!isFile_)
219  {
220  // This gets set in recover in file-based stores
221  BufferLock bufferGuard(_blockStore);
222  _blockStore.init();
223  _metadataBlock = _blockStore.get(1);
224  // Remove metadata block from used list
225  _blockStore.setUsedList(0);
226  _blockStore.setEndOfUsedList(0);
227  // Metadata block holds block size, block header size,
228  // last discarded sequence, client version
229  _metadataBlock->_sequence = (amps_uint64_t)0;
230  Buffer* pBuffer = _blockStore.getBuffer();
231  pBuffer->setPosition(_metadataBlock->_offset);
232  pBuffer->putUint32((amps_uint32_t)getBlockSize());
233  pBuffer->putUint32((amps_uint32_t)getBlockHeaderSize());
234  pBuffer->putUint64((amps_uint64_t)0);
235  // Metadata blocks puts client version in CRC position
236  pBuffer->putUint64((amps_uint64_t)VersionInfo::parseVersion(AMPS_CLIENT_VERSION));
237  // No next in chain
238  pBuffer->putUint64((amps_uint64_t)0);
239  }
240  }
241 
245  {
246  }
247 
254  virtual amps_uint64_t store(const Message& message_)
255  {
256  return store(message_, true);
257  }
258 
269  amps_uint64_t store(const Message& message_, bool assignSequence_)
270  {
271  const char* commandId, *correlationId, *expiration, *sowKey,
272  *topic, *data;
273  size_t dataLen = 0;
274  BlockHeader blockHeader;
275  BlockChainHeader chainHeader;
276  message_.getRawCommandId(&commandId, &dataLen);
277  chainHeader._commandIdLen = (amps_uint32_t)dataLen;
278  message_.getRawCorrelationId(&correlationId, &dataLen);
279  chainHeader._correlationIdLen = (amps_uint32_t)dataLen;
280  message_.getRawExpiration(&expiration, &dataLen);
281  chainHeader._expirationLen = (amps_uint32_t)dataLen;
282  message_.getRawSowKey(&sowKey, &dataLen);
283  chainHeader._sowKeyLen = (amps_uint32_t)dataLen;
284  message_.getRawTopic(&topic, &dataLen);
285  chainHeader._topicLen = (amps_uint32_t)dataLen;
286  message_.getRawData(&data, &dataLen);
287  chainHeader._flag = -1;
288  Message::Command::Type operation = message_.getCommandEnum();
289  chainHeader._operation = (amps_uint32_t)operation;
290  if (operation == Message::Command::SOWDelete)
291  {
292  if (dataLen > 0)
293  {
294  chainHeader._flag = SOW_DELETE_DATA;
295  }
296  else
297  {
298  message_.getRawFilter(&data, &dataLen);
299  if (dataLen > 0)
300  {
301  chainHeader._flag = SOW_DELETE_FILTER;
302  }
303  else
304  {
305  message_.getRawSowKeys(&data, &dataLen);
306  if (dataLen > 0)
307  {
308  chainHeader._flag = SOW_DELETE_KEYS;
309  }
310  else
311  {
312  message_.getRawBookmark(&data, &dataLen);
313  chainHeader._flag = SOW_DELETE_BOOKMARK;
314  // Check options for cancel
315  Message::Field options = message_.getOptions();
316  size_t remaining = options.len();
317  const void* next = NULL;
318  const void* start = (const void*)(options.data());
319  // Not necessarily null-terminated so no strstr
320  while (remaining >= 6 &&
321  (next = memchr(start, (int)'c', remaining)) != NULL)
322  {
323  remaining = (size_t)next - (size_t)start;
324  if (remaining >= 6 && strncmp((const char*)start,
325  "cancel", 6) == 0)
326  {
327  chainHeader._flag = SOW_DELETE_BOOKMARK_CANCEL;
328  break;
329  }
330  }
331  }
332  }
333  }
334  }
335  blockHeader._totalRemaining = (
336  (chainHeader._operation == Message::Command::Unknown)
337  ? 0
339  + chainHeader._commandIdLen
340  + chainHeader._correlationIdLen
341  + chainHeader._expirationLen
342  + chainHeader._sowKeyLen
343  + chainHeader._topicLen
344  + (amps_uint32_t)dataLen));
345  size_t lastBlockLength = ((operation == Message::Command::Unknown) ? 0 :
346  (blockHeader._totalRemaining % getBlockDataSize()));
347  blockHeader._blocksToWrite = ((operation == Message::Command::Unknown)
348  ? 1
349  : ((amps_uint32_t)(blockHeader._totalRemaining
350  / getBlockDataSize())
351  + ((lastBlockLength > 0) ? 1 : 0)));
352  blockHeader._crcVal = (amps_uint64_t)0ULL;
353  blockHeader._crcVal = _crc(commandId,
354  chainHeader._commandIdLen,
355  blockHeader._crcVal);
356  blockHeader._crcVal = _crc(correlationId,
357  chainHeader._correlationIdLen,
358  blockHeader._crcVal);
359  blockHeader._crcVal = _crc(expiration,
360  chainHeader._expirationLen,
361  blockHeader._crcVal);
362  blockHeader._crcVal = _crc(sowKey,
363  chainHeader._sowKeyLen,
364  blockHeader._crcVal);
365  blockHeader._crcVal = _crc(topic,
366  chainHeader._topicLen,
367  blockHeader._crcVal);
368  blockHeader._crcVal = _crc(data, dataLen, blockHeader._crcVal);
369 
370  // Reserve slots for storage, growing if necessary
371  BufferLock bufferGuard(_blockStore);
372  Block* first = _blockStore.get(blockHeader._blocksToWrite);
373  if (assignSequence_)
374  {
375  if (_lastSequence <= 2)
376  {
377  _getLastPersisted();
378  }
379  blockHeader._seq = ++_lastSequence;
380  }
381  else
382  {
383  blockHeader._seq = amps_message_get_field_uint64(
384  message_.getMessage(),
385  AMPS_Sequence);
386  if (!_maxDiscarded)
387  {
388  _maxDiscarded = blockHeader._seq - 1;
389  }
390  if (blockHeader._seq >= _lastSequence)
391  {
392  _lastSequence = blockHeader._seq;
393  }
394  }
395 
396  try
397  {
398  size_t topicWritten = 0UL;
399  size_t dataWritten = 0UL;
400  size_t commandWritten = 0UL;
401  size_t correlationWritten = 0UL;
402  size_t expirationWritten = 0UL;
403  size_t sowKeyWritten = 0UL;
404  Buffer* pBuffer = _blockStore.getBuffer();
405  for (Block* next = first; next; next = next->_nextInChain)
406  {
407  next->_sequence = blockHeader._seq;
408  if (next->_nextInChain)
409  {
410  blockHeader._nextInChain = next->_nextInChain->_offset;
411  }
412  else
413  {
414  blockHeader._nextInChain = (amps_uint64_t)0;
415  }
416  // Set buffer to start of Block and write the header
417  pBuffer->setPosition(next->_offset);
418  pBuffer->putBytes((const char*)&blockHeader, sizeof(BlockHeader));
419  // Clear crcVal, as it's only written in the first Block
420  blockHeader._crcVal = (amps_uint64_t)0;
421  size_t bytesRemaining = getBlockDataSize();
422  if (next == first)
423  {
424  // Write Block chain header
425  chainHeader._ackTypes = (amps_uint32_t)message_.getAckTypeEnum();
426  pBuffer->putBytes((const char*)&chainHeader,
427  sizeof(BlockChainHeader));
428  pBuffer->setPosition(next->_offset + getBlockHeaderSize() + getBlockChainHeaderSize());
429  bytesRemaining -= getBlockChainHeaderSize();
430  }
431  else
432  {
433  pBuffer->setPosition(next->_offset + getBlockHeaderSize());
434  }
435 
436  if (commandWritten < chainHeader._commandIdLen)
437  {
438  size_t commandWrite = (chainHeader._commandIdLen - commandWritten < bytesRemaining) ? chainHeader._commandIdLen - commandWritten : bytesRemaining;
439  pBuffer->putBytes(commandId + commandWritten,
440  commandWrite);
441  bytesRemaining -= commandWrite;
442  commandWritten += commandWrite;
443  }
444  if (correlationWritten < chainHeader._correlationIdLen)
445  {
446  size_t correlationWrite = (chainHeader._correlationIdLen - correlationWritten < bytesRemaining) ? chainHeader._correlationIdLen - correlationWritten : bytesRemaining;
447  pBuffer->putBytes(correlationId + correlationWritten,
448  correlationWrite);
449  bytesRemaining -= correlationWrite;
450  correlationWritten += correlationWrite;
451  }
452  if (bytesRemaining > 0 && expirationWritten < chainHeader._expirationLen)
453  {
454  size_t expWrite = (chainHeader._expirationLen - expirationWritten < bytesRemaining) ? chainHeader._expirationLen - expirationWritten : bytesRemaining;
455  pBuffer->putBytes(expiration + expirationWritten, expWrite);
456  bytesRemaining -= expWrite;
457  expirationWritten += expWrite;
458  }
459  if (bytesRemaining > 0 && sowKeyWritten < chainHeader._sowKeyLen)
460  {
461  size_t sowKeyWrite = (chainHeader._sowKeyLen - sowKeyWritten < bytesRemaining) ? chainHeader._sowKeyLen - sowKeyWritten : bytesRemaining;
462  pBuffer->putBytes(sowKey + sowKeyWritten, sowKeyWrite);
463  bytesRemaining -= sowKeyWrite;
464  sowKeyWritten += sowKeyWrite;
465  }
466  if (bytesRemaining > 0 && topicWritten < chainHeader._topicLen)
467  {
468  size_t topicWrite = (chainHeader._topicLen - topicWritten
469  < bytesRemaining)
470  ? chainHeader._topicLen - topicWritten
471  : bytesRemaining;
472  pBuffer->putBytes(topic + topicWritten, topicWrite);
473  bytesRemaining -= topicWrite;
474  topicWritten += topicWrite;
475  }
476  if (bytesRemaining > 0 && dataWritten < dataLen)
477  {
478  size_t dataWrite = (dataLen - dataWritten < bytesRemaining) ?
479  dataLen - dataWritten : bytesRemaining;
480  pBuffer->putBytes(data + dataWritten, dataWrite);
481  bytesRemaining -= dataWrite;
482  dataWritten += dataWrite;
483  }
484  }
485  }
486  catch (const AMPSException&)
487  {
488  _blockStore.put(first);
489  throw;
490  }
491  AMPS_FETCH_ADD(&_stored, 1);
492  return blockHeader._seq;
493  }
494 
501  virtual void discardUpTo(amps_uint64_t index_)
502  {
503  // Get the lock
504  BufferLock bufferGuard(_blockStore);
505  Buffer* pBuffer = _blockStore.getBuffer();
506  // Don't use _getLastPersisted() here, don't want to set it
507  // to something other than index_ if it's not already set
508  amps_uint64_t lastPersisted = _metadataBlock->_sequence;
509  // Make sure it's a real index and we have messages to discard
510  if (index_ == (amps_uint64_t)0 || !_blockStore.front() || index_ <= _maxDiscarded)
511  {
512  // During logon it's very possible we don't have a last persisted
513  // but that the Client is calling discardUpTo with the ack value.
514  if (lastPersisted < index_)
515  {
516  pBuffer->setPosition(_metadataBlock->_offset + 8);
517  pBuffer->putUint64(index_);
518  _metadataBlock->_sequence = index_;
519  if (_maxDiscarded < index_)
520  {
521  _maxDiscarded = index_;
522  }
523  if (_lastSequence <= index_)
524  {
525  _lastSequence = index_;
526  }
527  }
528  else if (!index_) // Fresh logon, no sequence history
529  {
530  _getLastPersisted();
531  }
532  else if (getErrorOnPublishGap() && index_ < lastPersisted) //Message gap
533  {
534  std::ostringstream os;
535  os << "Server last saw " << index_ << " from Client but Store "
536  << "has already discarded up to " << lastPersisted
537  << " which would leave a gap of unreceived messages.";
538  throw PublishStoreGapException(os.str());
539  }
540  _blockStore.signalAll();
541  return;
542  }
543 
544  _maxDiscarded = index_;
545  AMPS_FETCH_SUB(&_stored, _blockStore.put(index_));
546  _blockStore.signalAll();
547  if (lastPersisted >= index_)
548  {
549  return;
550  }
551  pBuffer->setPosition(_metadataBlock->_offset + 8);
552  pBuffer->putUint64(index_);
553  _metadataBlock->_sequence = index_;
554  if (_lastSequence < index_)
555  {
556  _lastSequence = index_;
557  }
558  }
559 
564  void replay(StoreReplayer& replayer_)
565  {
566  // Get the lock
567  BufferLock bufferGuard(_blockStore);
568  // If we don't have anything yet, return
569  if (!_blockStore.front())
570  {
571  return;
572  }
573  Block* next = _blockStore.front();
574  try
575  {
576  for (Block* block = _blockStore.front(); block; block = next)
577  {
578  // Replay the message
579  replayOnto(block, replayer_);
580  next = block->_nextInList;
581  }
582  }
583  catch (const StoreException&)
584  {
585  _blockStore.putAll(next);
586  throw;
587  }
588  }
589 
596  bool replaySingle(StoreReplayer& replayer_, amps_uint64_t index_)
597  {
598  BufferLock bufferGuard(_blockStore);
599  // If we don't have anything yet, return
600  if (!_blockStore.front())
601  {
602  return false;
603  }
604  // Get the end point
605  amps_uint64_t lastIdx = _blockStore.back()->_sequence;
606  // Get the start point
607  amps_uint64_t leastIdx = _blockStore.front()->_sequence;
608  if (index_ >= leastIdx && index_ <= lastIdx)
609  {
610  Block* block = _blockStore.front();
611  while (block && block->_sequence != index_)
612  {
613  block = block->_nextInList;
614  }
615  if (!block)
616  {
617  return false;
618  }
619  // If total bytes is 0, it's a queue ack and gets skipped.
620  Buffer* pBuffer = _blockStore.getBuffer();
621  pBuffer->setPosition(block->_offset +
622  sizeof(amps_uint32_t));
623  if (pBuffer->getUint32() == 0)
624  {
625  return false;
626  }
627  replayOnto(block, replayer_);
628  return true;
629  }
630  else // Get Store and Client back in sync
631  {
632  _message.reset();
633  leastIdx -= 1;
634  _message.setSequence(leastIdx);
635  replayer_.execute(_message);
636  return false;
637  }
638  }
639 
645  size_t unpersistedCount() const
646  {
647  size_t count = (size_t)_stored;
648  return count;
649  }
650 
659  virtual void flush(long timeout_)
660  {
661  BufferLock bufferGuard(_blockStore);
662  amps_uint64_t waitFor = _getHighestUnpersisted();
663  // Check that we aren't already empty
664  if (waitFor == getUnsetSequence())
665  {
666  return;
667  }
668  if (timeout_ > 0)
669  {
670  bool timedOut = false;
671  AMPS_START_TIMER(timeout_);
672  // While timeout hasn't expired and we haven't had everything acked
673  while (!timedOut && _stored != 0
674  && waitFor >= _getLowestUnpersisted())
675  {
676  if (!_blockStore.wait(timeout_))
677  {
678  // May have woken up early, check real time
679  AMPS_RESET_TIMER(timedOut, timeout_);
680  }
681  }
682  // If we timed out and still haven't caught up with the acks
683  if (timedOut && _stored != 0
684  && waitFor >= _getLowestUnpersisted())
685  {
686  throw TimedOutException("Timed out waiting to flush publish store.");
687  }
688  }
689  else
690  {
691  while (_stored != 0 && waitFor >= _getLowestUnpersisted())
692  {
693  // Still wake up every 1s so python can interrupt
694  _blockStore.wait(1000);
695  // Don't hold lock if possibly grabbing GIL
696  BufferUnlock unlck(_blockStore);
697  amps_invoke_waiting_function();
698  }
699  }
700  }
701 
702  amps_uint64_t getLowestUnpersisted() const
703  {
704  BufferLock bufferGuard(_blockStore);
705  return _getLowestUnpersisted();
706  }
707 
708  amps_uint64_t getHighestUnpersisted() const
709  {
710  BufferLock bufferGuard(_blockStore);
711  return _getHighestUnpersisted();
712  }
713 
714  amps_uint64_t getLastPersisted(void)
715  {
716  BufferLock bufferGuard(_blockStore);
717  return _getLastPersisted();
718  }
719 
720  protected:
721  static bool canResize(size_t requestedSize_, void* vpThis_)
722  {
723  return ((BlockPublishStore*)vpThis_)->callResizeHandler(requestedSize_);
724  }
725 
726  amps_uint64_t _getLowestUnpersisted() const
727  {
728  // Assume the lock is held
729  // If we don't have anything, return MAX
730  if (!_blockStore.front())
731  {
732  return getUnsetSequence();
733  }
734  return _blockStore.front()->_sequence;
735  }
736 
737  amps_uint64_t _getHighestUnpersisted() const
738  {
739  // Assume the lock is held
740  // If we don't have anything, return MAX
741  if (!_blockStore.back())
742  {
743  return getUnsetSequence();
744  }
745  return _blockStore.back()->_sequence;
746  }
747 
748  amps_uint64_t _getLastPersisted(void)
749  {
750  // Assume the lock is held
751  amps_uint64_t lastPersisted = (amps_uint64_t)0;
752  Buffer* pBuffer = _blockStore.getBuffer();
753  pBuffer->setPosition(_metadataBlock->_offset + 8);
754  lastPersisted = pBuffer->getUint64();
755  if (lastPersisted)
756  {
757  if (_lastSequence < lastPersisted)
758  {
759  _lastSequence = lastPersisted;
760  }
761  return lastPersisted;
762  }
763  if (_maxDiscarded)
764  {
765  lastPersisted = _maxDiscarded;
766  }
767  else
768  {
769 #ifdef _WIN32
770  struct _timeb t;
771  _ftime_s(&t);
772  lastPersisted = (t.time * 1000 + t.millitm) * (amps_uint64_t)1000000;
773 #else // not _WIN32
774  struct timeval tv;
775  gettimeofday(&tv, NULL);
776  lastPersisted = (amps_uint64_t)((tv.tv_sec * 1000) + (tv.tv_usec / 1000))
777  * (amps_uint64_t)1000000;
778 #endif
779  }
780  if (_lastSequence > 2)
781  {
782  amps_uint64_t low = _getLowestUnpersisted();
783  amps_uint64_t high = _getHighestUnpersisted();
784  if (low != getUnsetSequence())
785  {
786  lastPersisted = low - 1;
787  }
788  if (high != getUnsetSequence() && _lastSequence <= high)
789  {
790  _lastSequence = high;
791  }
792  if (_lastSequence < lastPersisted)
793  {
794  lastPersisted = _lastSequence - 1;
795  }
796  }
797  else
798  {
799  _lastSequence = lastPersisted;
800  }
801  pBuffer->setPosition(_metadataBlock->_offset
802  + sizeof(amps_uint32_t) // blocks used
803  + sizeof(amps_uint32_t)); // record length
804  pBuffer->putUint64(lastPersisted);
805  _metadataBlock->_sequence = lastPersisted;
806  return lastPersisted;
807  }
808 
809  void recover(void)
810  {
811  BufferLock bufferGuard(_blockStore);
812  // Make sure the size isn't 0 and is a multiple of block size
813  Buffer* pBuffer = _blockStore.getBuffer();
814  size_t size = pBuffer->getSize();
815  amps_uint32_t blockSize = getBlockSize();
816  if (size == 0)
817  {
818  _blockStore.init();
819  _metadataBlock = _blockStore.get(1);
820  _metadataBlock->_sequence = (amps_uint64_t)0;
821  pBuffer->setPosition(_metadataBlock->_offset);
822  // Metadata block holds block size, block header size,
823  // last discarded sequence, client version
824  pBuffer->putUint32((amps_uint32_t)blockSize);
825  pBuffer->putUint32((amps_uint32_t)getBlockHeaderSize());
826  pBuffer->putUint64((amps_uint64_t)0);
827  // Metadata blocks puts client version in CRC position
828  pBuffer->putUint64((amps_uint64_t)VersionInfo::parseVersion(AMPS_CLIENT_VERSION));
829  // No next in chain
830  pBuffer->putUint64((amps_uint64_t)0);
831  return;
832  }
833  size_t numBlocks = size / blockSize;
834  if (size % blockSize > 0)
835  {
836  // We shouldn't ever be in here, since it requires starting with a
837  // file that is not an even multiple of block size and we always
838  // fix the size.
839  numBlocks = size / blockSize;
840  ++numBlocks;
841  amps_uint32_t blockCount = 0;
842  // We allocate all the Blocks at once below so delete allocated Block[]
843  delete[] _blockStore.resizeBuffer(numBlocks * blockSize, &blockCount);
844  // Resize can fail if resizeHandler is set and refuses the request
845  // Since this is recovery, we need to simply fail in that case
846  if (size > pBuffer->getSize() || numBlocks != (size_t)blockCount)
847  {
848  throw StoreException("Publish Store could not resize correctly during recovery, possibly due to resizeHandler refusing the request.");
849  }
850  size = pBuffer->getSize();
851  }
852 
853  amps_uint64_t maxIdx = 0;
854  amps_uint64_t minIdx = 0;
855  size_t location = 0;
856  BlockHeader blockHeader;
857  // The blocks we create here all get their offset set in below loop
858  Block* blocks = new Block[numBlocks];
859  blocks[numBlocks - 1]._nextInList = 0;
860  size_t blockNum = 0;
861  _blockStore.addBlocks(blocks);
862  _metadataBlock = blocks; // The first Block is metadata
863  _metadataBlock->_nextInList = 0;
864  pBuffer->setPosition(0);
865  pBuffer->copyBytes((char*)&blockHeader, sizeof(BlockHeader));
866  /* Metadata Block header fields
867  * amps_uint32_t _blocksToWrite = BlockSize
868  * amps_uint32_t _totalRemaining = BlockHeaderSize
869  * amps_uint64_t _seq = last persisted sequence number
870  * amps_uint64_t _crcVal = unused
871  * amps_uint64_t _nextInChain = unused
872  */
873  if (blockHeader._blocksToWrite == 1) // Old store format?
874  {
875  /* Old format metadata block header fields
876  * amps_uint32_t _blocksToWrite = 1
877  * amps_uint32_t _totalRemaining = client version
878  * amps_uint64_t _seq = last persisted sequence number
879  * amps_uint64_t _crcVal = unused
880  * amps_uint64_t _nextInChain = unused
881  */
882  // Readable old format starts with version 5.0.0.0
883  if (blockHeader._totalRemaining >= 5000000)
884  {
885  // All recovery needs to be based on old format
886  // so go do that instead.
887  recoverOldFormat(blocks);
888  return;
889  }
890  // Unreadable format, fail
891  throw StoreException("Unrecognized format for Store. Can't recover.");
892  }
893  if (blockHeader._blocksToWrite == 0)
894  {
895  pBuffer->setPosition(0);
896  pBuffer->putUint32((amps_uint32_t)blockSize);
897  }
898  else
899  {
900  blockSize = blockHeader._blocksToWrite;
901  _blockStore.setBlockSize(blockSize);
902  }
903  if (blockHeader._totalRemaining == 0)
904  {
905  pBuffer->setPosition(sizeof(amps_uint32_t));
906  pBuffer->putUint32((amps_uint32_t)getBlockHeaderSize());
907  }
908  else
909  {
910  _blockStore.setBlockHeaderSize(blockHeader._totalRemaining);
911  }
912  _metadataBlock->_sequence = blockHeader._seq;
913  if (_metadataBlock->_sequence
914  && _metadataBlock->_sequence < (amps_uint64_t)1000000)
915  {
916  pBuffer->setPosition(_metadataBlock->_offset
917  + sizeof(amps_uint32_t) // BlockSize
918  + sizeof(amps_uint32_t)); // BlockHeaderSize
919  pBuffer->putUint64((amps_uint64_t)0);
920  _metadataBlock->_sequence = 0;
921  }
922  else
923  {
924  // Set _maxDiscarded and _lastSequence
925  _maxDiscarded = _metadataBlock->_sequence;
926  _lastSequence = _maxDiscarded;
927  }
928  // This would be where to check the client version string
929  // No checks currently
930  location += blockSize;
931  amps_uint32_t freeCount = 0;
932  Block* firstFree = NULL;
933  Block* endOfFreeList = NULL;
934  // Used to create used list in order after recovery
935  typedef std::map<amps_uint64_t, Block*> RecoverMap;
936  RecoverMap recoveredBlocks;
937  while (location < size)
938  {
939  // Get index and check if non-zero
940  pBuffer->setPosition(location);
941  pBuffer->copyBytes((char*)&blockHeader, sizeof(BlockHeader));
942  if ((blockHeader._seq > 0 && blockHeader._totalRemaining < size) &&
943  (!blockHeader._crcVal || recoveredBlocks.count(blockHeader._seq)))
944  {
945  // Block is part of a chain
946  location += blockSize;
947  continue;
948  }
949  Block* block = blocks[++blockNum].setOffset(location);
950  bool recovered = false;
951  if (blockHeader._seq > 0 && blockHeader._totalRemaining < size)
952  {
953  blockHeader._totalRemaining -= (amps_uint32_t)getBlockChainHeaderSize();
954  block->_sequence = blockHeader._seq;
955  // Track min and max
956  if (maxIdx < blockHeader._seq)
957  {
958  maxIdx = blockHeader._seq;
959  }
960  if (minIdx > blockHeader._seq)
961  {
962  minIdx = blockHeader._seq;
963  }
964  // Save it in recovered blocks
965  recoveredBlocks[blockHeader._seq] = block;
966  // Set up the chain
967  while (blockHeader._nextInChain != (amps_uint64_t)0)
968  {
969  Block* chain = blocks[++blockNum]
970  .setOffset((size_t)blockHeader._nextInChain);
971  chain->_nextInList = 0;
972  pBuffer->setPosition((size_t)blockHeader._nextInChain
973  + sizeof(amps_uint32_t) // blocks used
974  + sizeof(amps_uint32_t) // record length
975  + sizeof(amps_uint64_t) // seq
976  + sizeof(amps_uint64_t)); // crc
977  blockHeader._nextInChain = pBuffer->getUint64();
978  block->_nextInChain = chain;
979  block = chain;
980  block->_sequence = blockHeader._seq;
981  }
982  recovered = true;
983  }
984  if (!recovered)
985  {
986  // Put this Block on the free list
987  if (endOfFreeList)
988  {
989  endOfFreeList->_nextInList = block;
990  }
991  else
992  {
993  firstFree = block;
994  }
995  endOfFreeList = block;
996  ++freeCount;
997  }
998  location += blockSize;
999  }
1000  if (endOfFreeList)
1001  {
1002  endOfFreeList->_nextInList = 0;
1003  }
1004  _blockStore.setFreeList(firstFree, freeCount);
1005  if (maxIdx > _lastSequence)
1006  {
1007  _lastSequence = maxIdx;
1008  }
1009  if (minIdx > _maxDiscarded + 1)
1010  {
1011  _maxDiscarded = minIdx - 1;
1012  }
1013  if (_maxDiscarded > _metadataBlock->_sequence)
1014  {
1015  _metadataBlock->_sequence = _maxDiscarded;
1016  pBuffer->setPosition(_metadataBlock->_offset + 8);
1017  pBuffer->putUint64(_maxDiscarded);
1018  }
1019  Block* end = NULL;
1020  AMPS_FETCH_ADD(&_stored, (long)(recoveredBlocks.size()));
1021  for (RecoverMap::iterator i = recoveredBlocks.begin();
1022  i != recoveredBlocks.end(); ++i)
1023  {
1024  if (end)
1025  {
1026  end->_nextInList = i->second;
1027  }
1028  else
1029  {
1030  _blockStore.setUsedList(i->second);
1031  }
1032  end = i->second;
1033  }
1034  if (end)
1035  {
1036  end->_nextInList = 0;
1037  }
1038  _blockStore.setEndOfUsedList(end);
1039  }
1040 
1041  private:
1042  // Lock should already be held
1043  void replayOnto(Block* block_, StoreReplayer& replayer_)
1044  {
1045  // Read the header
1046  size_t start = block_->_offset;
1047  size_t position = start;
1048  Buffer* pBuffer = _blockStore.getBuffer();
1049  pBuffer->setPosition(position);
1050  BlockHeader blockHeader;
1051  pBuffer->copyBytes((char*)&blockHeader, sizeof(BlockHeader));
1052  if (blockHeader._totalRemaining == 0)
1053  {
1054  // Queue acking sow_delete
1055  return;
1056  }
1057  position += getBlockHeaderSize();
1058  BlockChainHeader blockChainHeader;
1059  pBuffer->copyBytes((char*)&blockChainHeader, sizeof(blockChainHeader));
1060  if (blockChainHeader._operation == Message::Command::Unknown)
1061  {
1062  // Queue acking sow_delete
1063  return;
1064  }
1065  blockChainHeader._ackTypes |= Message::AckType::Persisted;
1066  position += getBlockChainHeaderSize();
1067  blockHeader._totalRemaining -= (amps_uint32_t)getBlockChainHeaderSize();
1068  pBuffer->setPosition(position);
1069 
1070  if (blockHeader._totalRemaining
1071  < blockChainHeader._commandIdLen
1072  + blockChainHeader._correlationIdLen
1073  + blockChainHeader._expirationLen
1074  + blockChainHeader._sowKeyLen
1075  + blockChainHeader._topicLen)
1076  {
1077  std::ostringstream os;
1078  os << "Corrupted message found with invalid lengths. "
1079  << "Attempting to replay " << block_->_sequence
1080  << ". Block sequence " << blockHeader._seq
1081  << ", topic length " << blockChainHeader._topicLen
1082  << ", data length " << blockHeader._totalRemaining
1083  << ", command ID length " << blockChainHeader._commandIdLen
1084  << ", correlation ID length " << blockChainHeader._correlationIdLen
1085  << ", expiration length " << blockChainHeader._expirationLen
1086  << ", sow key length " << blockChainHeader._sowKeyLen
1087  << ", start " << start
1088  << ", position " << position
1089  << ", buffer size " << pBuffer->getSize();
1090  throw StoreException(os.str());
1091  }
1092 
1093  // Start prepping the message
1094  _message.reset();
1095  _message.setCommandEnum((Message::Command::Type)blockChainHeader._operation);
1096  _message.setAckTypeEnum((unsigned)blockChainHeader._ackTypes
1097  | Message::AckType::Persisted);
1098  _message.setSequence(blockHeader._seq);
1099  // Read the data and calculate the CRC
1100  Block* current = block_;
1101  size_t blockBytesRemaining = getBlockDataSize() - getBlockChainHeaderSize();
1102  amps_uint64_t crcCalc = (amps_uint64_t)0ULL;
1103  // Use tmpBuffers for any fields split across Block boundaries
1104  char** tmpBuffers = (blockHeader._blocksToWrite > 1) ? new char* [blockHeader._blocksToWrite - 1] : 0;
1105  size_t blockNum = 0;
1106  if (blockChainHeader._commandIdLen > 0)
1107  {
1108  if (blockChainHeader._commandIdLen <= blockBytesRemaining)
1109  {
1110  _message.assignCommandId(pBuffer->getBytes(blockChainHeader._commandIdLen)._data,
1111  blockChainHeader._commandIdLen);
1112  blockBytesRemaining -= blockChainHeader._commandIdLen;
1113  }
1114  else
1115  {
1116  tmpBuffers[blockNum] = new char[blockChainHeader._commandIdLen]; // -V522
1117  size_t totalLeft = blockChainHeader._commandIdLen;
1118  size_t totalRead = 0;
1119  size_t readLen = 0;
1120  while (totalLeft)
1121  {
1122  readLen = blockBytesRemaining < totalLeft ?
1123  blockBytesRemaining : totalLeft;
1124  pBuffer->copyBytes(tmpBuffers[blockNum] + totalRead, readLen);
1125  if (!(totalLeft -= readLen))
1126  {
1127  break;
1128  }
1129  if (!(current = current->_nextInChain))
1130  {
1131  break;
1132  }
1133  totalRead += readLen;
1134  blockBytesRemaining = getBlockDataSize();
1135  position = current->_offset + getBlockHeaderSize();
1136  pBuffer->setPosition(position);
1137  }
1138  blockBytesRemaining -= readLen;
1139  _message.assignCommandId(tmpBuffers[blockNum++], blockChainHeader._commandIdLen);
1140  }
1141  blockHeader._totalRemaining -= blockChainHeader._commandIdLen;
1142  crcCalc = _crc(_message.getCommandId().data(),
1143  blockChainHeader._commandIdLen, crcCalc);
1144  }
1145  if (blockChainHeader._correlationIdLen > 0)
1146  {
1147  if (blockChainHeader._correlationIdLen <= blockBytesRemaining)
1148  {
1149  _message.assignCorrelationId(
1150  pBuffer->getBytes(blockChainHeader._correlationIdLen)._data,
1151  blockChainHeader._correlationIdLen);
1152  blockBytesRemaining -= blockChainHeader._correlationIdLen;
1153  }
1154  else
1155  {
1156  tmpBuffers[blockNum] = new char[blockChainHeader._correlationIdLen]; // -V522
1157  size_t totalLeft = blockChainHeader._correlationIdLen;
1158  size_t totalRead = 0;
1159  size_t readLen = 0;
1160  while (totalLeft)
1161  {
1162  readLen = blockBytesRemaining < totalLeft ?
1163  blockBytesRemaining : totalLeft;
1164  pBuffer->copyBytes(tmpBuffers[blockNum] + totalRead, readLen);
1165  if (!(totalLeft -= readLen))
1166  {
1167  break;
1168  }
1169  if (!(current = current->_nextInChain))
1170  {
1171  break; // -V522
1172  }
1173  totalRead += readLen;
1174  blockBytesRemaining = getBlockDataSize();
1175  position = current->_offset + getBlockHeaderSize();
1176  pBuffer->setPosition(position);
1177  }
1178  blockBytesRemaining -= readLen;
1179  _message.assignCorrelationId(tmpBuffers[blockNum++], blockChainHeader._correlationIdLen);
1180  }
1181  blockHeader._totalRemaining -= blockChainHeader._correlationIdLen;
1182  crcCalc = _crc(_message.getCorrelationId().data(),
1183  blockChainHeader._correlationIdLen, crcCalc);
1184  }
1185  if (blockChainHeader._expirationLen > 0)
1186  {
1187  if (blockChainHeader._expirationLen <= blockBytesRemaining)
1188  {
1189  _message.assignExpiration(
1190  pBuffer->getBytes(blockChainHeader._expirationLen)._data,
1191  blockChainHeader._expirationLen);
1192  blockBytesRemaining -= blockChainHeader._expirationLen;
1193  }
1194  else
1195  {
1196  tmpBuffers[blockNum] = new char[blockChainHeader._expirationLen]; // -V522
1197  size_t totalLeft = blockChainHeader._expirationLen;
1198  size_t totalRead = 0;
1199  size_t readLen = 0;
1200  while (totalLeft)
1201  {
1202  readLen = blockBytesRemaining < totalLeft ?
1203  blockBytesRemaining : totalLeft;
1204  pBuffer->copyBytes(tmpBuffers[blockNum] + totalRead, readLen);
1205  if (!(totalLeft -= readLen))
1206  {
1207  break;
1208  }
1209  if (!(current = current->_nextInChain))
1210  {
1211  break;
1212  }
1213  totalRead += readLen;
1214  blockBytesRemaining = getBlockDataSize();
1215  position = current->_offset + getBlockHeaderSize();
1216  pBuffer->setPosition(position);
1217  }
1218  blockBytesRemaining -= readLen;
1219  _message.assignExpiration(tmpBuffers[blockNum++], blockChainHeader._expirationLen);
1220  }
1221  blockHeader._totalRemaining -= blockChainHeader._expirationLen;
1222  crcCalc = _crc(_message.getExpiration().data(),
1223  blockChainHeader._expirationLen, crcCalc);
1224  }
1225  if (blockChainHeader._sowKeyLen > 0)
1226  {
1227  if (blockChainHeader._sowKeyLen <= blockBytesRemaining)
1228  {
1229  _message.assignSowKey(pBuffer->getBytes(blockChainHeader._sowKeyLen)._data,
1230  blockChainHeader._sowKeyLen);
1231  blockBytesRemaining -= blockChainHeader._sowKeyLen;
1232  }
1233  else
1234  {
1235  tmpBuffers[blockNum] = new char[blockChainHeader._sowKeyLen]; // -V522
1236  size_t totalLeft = blockChainHeader._sowKeyLen;
1237  size_t totalRead = 0;
1238  size_t readLen = 0;
1239  while (totalLeft)
1240  {
1241  readLen = blockBytesRemaining < totalLeft ?
1242  blockBytesRemaining : totalLeft;
1243  pBuffer->copyBytes(tmpBuffers[blockNum] + totalRead, readLen);
1244  if (!(totalLeft -= readLen))
1245  {
1246  break;
1247  }
1248  if (!(current = current->_nextInChain))
1249  {
1250  break;
1251  }
1252  totalRead += readLen;
1253  blockBytesRemaining = getBlockDataSize();
1254  position = current->_offset + getBlockHeaderSize();
1255  pBuffer->setPosition(position);
1256  }
1257  blockBytesRemaining -= readLen;
1258  _message.assignSowKey(tmpBuffers[blockNum++], blockChainHeader._sowKeyLen);
1259  }
1260  blockHeader._totalRemaining -= blockChainHeader._sowKeyLen;
1261  crcCalc = _crc(_message.getSowKey().data(), blockChainHeader._sowKeyLen, crcCalc);
1262  }
1263  if (blockChainHeader._topicLen > 0)
1264  {
1265  if (blockChainHeader._topicLen <= blockBytesRemaining)
1266  {
1267  _message.assignTopic(pBuffer->getBytes(blockChainHeader._topicLen)._data,
1268  blockChainHeader._topicLen);
1269  blockBytesRemaining -= blockChainHeader._topicLen;
1270  }
1271  else
1272  {
1273  tmpBuffers[blockNum] = new char[blockChainHeader._topicLen]; // -V522
1274  size_t totalLeft = blockChainHeader._topicLen;
1275  size_t totalRead = 0;
1276  size_t readLen = 0;
1277  while (totalLeft)
1278  {
1279  readLen = blockBytesRemaining < totalLeft ?
1280  blockBytesRemaining : totalLeft;
1281  pBuffer->copyBytes(tmpBuffers[blockNum] + totalRead, readLen);
1282  if (!(totalLeft -= readLen))
1283  {
1284  break;
1285  }
1286  if (!(current = current->_nextInChain))
1287  {
1288  break;
1289  }
1290  totalRead += readLen;
1291  blockBytesRemaining = getBlockDataSize();
1292  position = current->_offset + getBlockHeaderSize();
1293  pBuffer->setPosition(position);
1294  }
1295  blockBytesRemaining -= readLen;
1296  _message.assignTopic(tmpBuffers[blockNum++], blockChainHeader._topicLen);
1297  }
1298  blockHeader._totalRemaining -= blockChainHeader._topicLen;
1299  crcCalc = _crc(_message.getTopic().data(), blockChainHeader._topicLen, crcCalc);
1300  }
1301  if (blockHeader._totalRemaining > 0)
1302  {
1303  if (blockHeader._totalRemaining <= blockBytesRemaining)
1304  {
1305  if (blockChainHeader._flag == -1 || blockChainHeader._flag == SOW_DELETE_DATA)
1306  {
1307  _message.assignData(
1308  pBuffer->getBytes(blockHeader._totalRemaining)._data,
1309  blockHeader._totalRemaining);
1310  crcCalc = _crc(_message.getData().data(),
1311  blockHeader._totalRemaining, crcCalc);
1312  }
1313  else if (blockChainHeader._flag == SOW_DELETE_FILTER)
1314  {
1315  _message.assignFilter(
1316  pBuffer->getBytes(blockHeader._totalRemaining)._data,
1317  blockHeader._totalRemaining);
1318  crcCalc = _crc(_message.getFilter().data(),
1319  blockHeader._totalRemaining, crcCalc);
1320  }
1321  else if (blockChainHeader._flag == SOW_DELETE_KEYS)
1322  {
1323  _message.assignSowKeys(
1324  pBuffer->getBytes(blockHeader._totalRemaining)._data,
1325  blockHeader._totalRemaining);
1326  crcCalc = _crc(_message.getSowKeys().data(),
1327  blockHeader._totalRemaining, crcCalc);
1328  }
1329  else if (blockChainHeader._flag == SOW_DELETE_BOOKMARK)
1330  {
1331  _message.assignBookmark(
1332  pBuffer->getBytes(blockHeader._totalRemaining)._data,
1333  blockHeader._totalRemaining);
1334  crcCalc = _crc(_message.getBookmark().data(),
1335  blockHeader._totalRemaining, crcCalc);
1336  }
1337  else if (blockChainHeader._flag == SOW_DELETE_BOOKMARK_CANCEL)
1338  {
1339  _message.assignBookmark(
1340  pBuffer->getBytes(blockHeader._totalRemaining)._data,
1341  blockHeader._totalRemaining);
1342  crcCalc = _crc(_message.getBookmark().data(),
1343  blockHeader._totalRemaining, crcCalc);
1344  _message.assignOptions(AMPS_OPTIONS_CANCEL, 6);
1345  }
1346  }
1347  else
1348  {
1349  tmpBuffers[blockNum] = new char[blockHeader._totalRemaining]; // -V522
1350  size_t totalLeft = blockHeader._totalRemaining;
1351  size_t totalRead = 0;
1352  size_t readLen = 0;
1353  while (totalLeft)
1354  {
1355  readLen = blockBytesRemaining < totalLeft ?
1356  blockBytesRemaining : totalLeft;
1357  pBuffer->copyBytes(tmpBuffers[blockNum] + totalRead, readLen);
1358  if (!(totalLeft -= readLen))
1359  {
1360  break;
1361  }
1362  if (!(current = current->_nextInChain))
1363  {
1364  break;
1365  }
1366  totalRead += readLen;
1367  blockBytesRemaining = getBlockDataSize();
1368  position = current->_offset + getBlockHeaderSize();
1369  pBuffer->setPosition(position);
1370  }
1371  position += readLen;
1372  if (blockChainHeader._flag == -1 || blockChainHeader._flag == SOW_DELETE_DATA)
1373  {
1374  _message.assignData(tmpBuffers[blockNum], blockHeader._totalRemaining);
1375  }
1376  else if (blockChainHeader._flag == SOW_DELETE_FILTER)
1377  {
1378  _message.assignFilter(tmpBuffers[blockNum], blockHeader._totalRemaining);
1379  }
1380  else if (blockChainHeader._flag == SOW_DELETE_KEYS)
1381  {
1382  _message.assignSowKeys(tmpBuffers[blockNum], blockHeader._totalRemaining);
1383  }
1384  else if (blockChainHeader._flag == SOW_DELETE_BOOKMARK)
1385  {
1386  _message.assignBookmark(tmpBuffers[blockNum], blockHeader._totalRemaining);
1387  }
1388  else if (blockChainHeader._flag == SOW_DELETE_BOOKMARK_CANCEL)
1389  {
1390  _message.assignBookmark(tmpBuffers[blockNum], blockHeader._totalRemaining);
1391  _message.assignOptions(AMPS_OPTIONS_CANCEL, 6);
1392  }
1393  crcCalc = _crc(tmpBuffers[blockNum++], blockHeader._totalRemaining, crcCalc); // -V595
1394  }
1395  }
1396 
1397  // Validate the crc and seq
1398  if (crcCalc != blockHeader._crcVal || blockHeader._seq != block_->_sequence)
1399  {
1400  std::ostringstream os;
1401  os << "Corrupted message found by CRC or sequence "
1402  << "Attempting to replay " << block_->_sequence
1403  << ". Block sequence " << blockHeader._seq
1404  << ", expiration length " << blockChainHeader._expirationLen
1405  << ", sowKey length " << blockChainHeader._sowKeyLen
1406  << ", topic length " << blockChainHeader._topicLen
1407  << ", data length " << blockHeader._totalRemaining
1408  << ", command ID length " << blockChainHeader._commandIdLen
1409  << ", correlation ID length " << blockChainHeader._correlationIdLen
1410  << ", flag " << blockChainHeader._flag
1411  << ", expected CRC " << blockHeader._crcVal
1412  << ", actual CRC " << crcCalc
1413  << ", start " << start
1414  << ", position " << position
1415  << ", buffer size " << pBuffer->getSize();
1416  for (Block* block = block_; block; block = block->_nextInChain)
1417  {
1418  os << "\n BLOCK " << block->_offset;
1419  }
1420  if (tmpBuffers)
1421  {
1422  for (amps_uint32_t i = 0; i < blockNum; ++i)
1423  {
1424  delete[] tmpBuffers[i]; // -V522
1425  }
1426  delete[] tmpBuffers;
1427  }
1428  throw StoreException(os.str());
1429  }
1430  // Replay the message
1431  replayer_.execute(_message);
1432  // Free the buffer if allocated
1433  if (tmpBuffers)
1434  {
1435  for (amps_uint32_t i = 0; i < blockNum; ++i)
1436  {
1437  delete[] tmpBuffers[i]; // -V522
1438  }
1439  delete[] tmpBuffers;
1440  }
1441  }
1442 
1443  // Lock should already be held
1444  // Read an older format file and update it.
1445  void recoverOldFormat(Block* blocks)
1446  {
1447  Buffer* pBuffer = _blockStore.getBuffer();
1448  amps_uint64_t maxIdx = 0;
1449  amps_uint64_t minIdx = 0;
1450  size_t size = pBuffer->getSize();
1451  size_t location = 0;
1452  pBuffer->setPosition(location);
1453  pBuffer->putUint32((amps_uint32_t)getBlockSize());
1454  pBuffer->putUint32((amps_uint32_t)_blockStore.getBlockHeaderSize());
1455  _metadataBlock->_sequence = pBuffer->getUint64();
1456  if (_metadataBlock->_sequence < (amps_uint64_t)1000000)
1457  {
1458  pBuffer->setPosition(_metadataBlock->_offset + 8);
1459  pBuffer->putUint64((amps_uint64_t)0);
1460  _metadataBlock->_sequence = 0;
1461  }
1462  else
1463  {
1464  // Set _maxDiscarded and _lastSequence
1465  _maxDiscarded = _metadataBlock->_sequence;
1466  _lastSequence = _maxDiscarded;
1467  }
1468  // Write the current client version
1469  pBuffer->putUint64((amps_uint64_t)VersionInfo::parseVersion(AMPS_CLIENT_VERSION));
1470  // No next in chain
1471  pBuffer->putUint64((amps_uint64_t)0);
1472  // No checks currently
1473  location += getBlockSize();
1474  amps_uint32_t freeCount = 0;
1475  Block* firstFree = NULL;
1476  Block* endOfFreeList = NULL;
1477  size_t blockSize = getBlockSize();
1478  size_t numBlocks = size / blockSize;
1479  size_t blockNum = 0;
1480  // Used to create used list in order after recovery
1481  typedef std::map<amps_uint64_t, Block*> RecoverMap;
1482  RecoverMap recoveredBlocks;
1483  RecoverMap growingBlocks;
1484  amps_uint32_t growthBlocksNeeded = 0;
1485  while (location < size)
1486  {
1487  // Get seq and check if non-zero
1488  pBuffer->setPosition(location);
1489  BlockHeader blockHeader;
1490  pBuffer->copyBytes((char*)&blockHeader, sizeof(BlockHeader));
1491  size_t blockCount = (size_t)blockHeader._blocksToWrite;
1492  if (blockHeader._totalRemaining > 0 && blockHeader._seq > 0
1493  && blockHeader._totalRemaining < size
1494  && blockHeader._blocksToWrite < numBlocks
1495  && (blockHeader._blocksToWrite * blockSize)
1496  >= blockHeader._totalRemaining)
1497  {
1498  size_t oldFormatSize = blockHeader._totalRemaining;
1499  // Old format total was storage bytes plus 64 bytes for block
1500  // and chain headers.
1501  blockHeader._totalRemaining -= 64;
1502  // New format counts only chain header size
1503  blockHeader._totalRemaining += getBlockChainHeaderSize();
1504  // Get the rest of the header
1505  BlockChainHeader chainHeader;
1506  // Need to reset location to after OLD header:
1507  // amps_uint32_t blocks, amps_uint32_t totalRemaining,
1508  // amps_uint64_t seq, amps_uint64_t crc
1509  pBuffer->setPosition(location + (sizeof(amps_uint32_t) * 2)
1510  + (sizeof(amps_uint64_t) * 2) );
1511  // Read old chain header which uses same order, but not
1512  // as many bytes (everything is 32bit):
1513  // operation, commandIdLen, correlationIdLen,
1514  // expirationLen, sowKeyLen, topicLen, flag, ackTypes
1515  pBuffer->copyBytes((char*)&chainHeader,
1516  sizeof(amps_uint32_t) * 8);
1517  // Check for garbage, likely indicating this is part of a chain
1518  if ((chainHeader._commandIdLen + chainHeader._correlationIdLen
1519  + chainHeader._expirationLen + chainHeader._sowKeyLen
1520  + chainHeader._topicLen) > blockHeader._totalRemaining)
1521  {
1522  // Skip this block, can't be real data
1523  location += getBlockSize();
1524  continue;
1525  }
1526  // Check if data fits in current number of blocks
1527  amps_uint32_t blocksNeeded = (blockHeader._totalRemaining
1528  / getBlockDataSize())
1529  + (blockHeader._totalRemaining
1530  % getBlockDataSize()
1531  ? 1 : 0);
1532  if (blocksNeeded == blockHeader._blocksToWrite)
1533  {
1534  Block* first = blocks[++blockNum].setOffset(location);
1535  first->_nextInList = 0;
1536  first->_sequence = blockHeader._seq;
1537  if (blockHeader._blocksToWrite > 1)
1538  {
1539  // CRC is only set on the first block
1540  amps_uint64_t crcVal = blockHeader._crcVal;
1541  blockHeader._crcVal = 0;
1542  Block* current = 0;
1543  // It fits, just need to adjust the block formats
1544  // and set up the chain. Start with the last block
1545  // and move data as needed starting at the end.
1546  size_t currentBlockNum = blockNum
1547  + blockHeader._blocksToWrite
1548  - 1;
1549  // Last item could wrap to beginning, but beginning is
1550  // block 1, not 0, which is the metadata block.
1551  if (currentBlockNum >= numBlocks)
1552  {
1553  currentBlockNum = currentBlockNum - numBlocks + 1;
1554  }
1555  if (currentBlockNum < blockNum)
1556  {
1557  Block* last = blocks[currentBlockNum]
1558  .init(currentBlockNum, getBlockSize());
1559  if ((current = firstFree) == last)
1560  {
1561  firstFree = firstFree->_nextInList; // -V522
1562  if (!firstFree)
1563  {
1564  endOfFreeList = 0;
1565  }
1566  --freeCount;
1567  }
1568  else
1569  {
1570  while (current)
1571  {
1572  if (current->_nextInList == last)
1573  {
1574  current->_nextInList = last->_nextInList;
1575  current = last;
1576  --freeCount;
1577  break;
1578  }
1579  current = current->_nextInList;
1580  }
1581  }
1582  }
1583  if (!current)
1584  {
1585  current = blocks[currentBlockNum]
1586  .init(currentBlockNum, getBlockSize());
1587  }
1588  // Initially, the number of bytes in last block
1589  size_t dataBytes = oldFormatSize % getBlockSize();
1590  while (current != first)
1591  {
1592  current->_nextInList = 0;
1593  current->_sequence = blockHeader._seq;
1594  // Set _nextInChain on previous Block, will include first
1595  if (--currentBlockNum < 1
1596  || currentBlockNum > numBlocks)
1597  {
1598  currentBlockNum = numBlocks - 1;
1599  }
1600  Block* previous = blocks[currentBlockNum]
1601  .init(currentBlockNum,
1602  getBlockSize());
1603  previous->_nextInChain = current;
1604  // Shift to make room for a header in every block
1605  // Not growing, so this won't write past the end.
1606  // Shift amount accounts for a header added to each
1607  // block after the first plus any change in the
1608  // chain header size from 32, which is the old size.
1609  size_t bytesToMove = --blockCount
1610  * getBlockHeaderSize()
1612  - 32);
1613  pBuffer->copyBytes(current->_offset + bytesToMove,
1614  current->_offset,
1615  dataBytes);
1616  dataBytes = getBlockSize();
1617  if (bytesToMove > getBlockHeaderSize())
1618  {
1619  bytesToMove -= getBlockHeaderSize();
1620  dataBytes -= bytesToMove;
1621  pBuffer->copyBytes(current->_offset
1622  + getBlockHeaderSize(),
1623  previous->_offset
1624  + dataBytes,
1625  bytesToMove);
1626  }
1627  // Set next in chain for this block's header
1628  blockHeader._nextInChain = (current->_nextInChain
1629  ? current->_nextInChain->_offset
1630  : (amps_uint64_t)0);
1631  // Write the header for this block
1632  pBuffer->setPosition(current->_offset);
1633  pBuffer->putBytes((const char*)&blockHeader,
1634  sizeof(BlockHeader));
1635  if (firstFree == previous)
1636  {
1637  firstFree = firstFree->_nextInList;
1638  if (!firstFree)
1639  {
1640  endOfFreeList = 0;
1641  }
1642  --freeCount;
1643  }
1644  else
1645  {
1646  current = firstFree;
1647  while (current)
1648  {
1649  if (current->_nextInList == previous)
1650  {
1651  current->_nextInList = previous->_nextInList;
1652  --freeCount;
1653  break;
1654  }
1655  current = current->_nextInList;
1656  }
1657  }
1658  current = previous;
1659  }
1660  blockNum += blockHeader._blocksToWrite - 1;
1661  blockHeader._crcVal = crcVal;
1662  }
1663  // Move bytes for chain header expansion from 32 bytes
1664  size_t bytesToMove = getBlockDataSize() - 32
1665  - (getBlockChainHeaderSize() - 32);
1666  pBuffer->copyBytes(first->_offset + getBlockHeaderSize()
1668  first->_offset + getBlockHeaderSize() + 32,
1669  bytesToMove);
1670  // Rewrite the header and chain header for first Block.
1671  pBuffer->setPosition(first->_offset);
1672  blockHeader._nextInChain = (first->_nextInChain
1673  ? first->_nextInChain->_offset
1674  : (amps_uint64_t)0);
1675  pBuffer->putBytes((const char*)&blockHeader,
1676  sizeof(BlockHeader));
1677  pBuffer->putBytes((const char*)&chainHeader,
1678  sizeof(BlockChainHeader));
1679  // Add first Block to recovered for building the used
1680  // list later
1681  recoveredBlocks[blockHeader._seq] = first;
1682  }
1683  else
1684  {
1685  // This will need at least one more Block due to a header in
1686  // every Block. Check how many and save for later.
1687  growingBlocks[blockHeader._seq] = blocks[++blockNum].setOffset(location);
1688  growthBlocksNeeded += (blocksNeeded - blockHeader._blocksToWrite);
1689  blockNum += blockHeader._blocksToWrite - 1;
1690  }
1691  // Track min and max
1692  if (maxIdx < blockHeader._seq)
1693  {
1694  maxIdx = blockHeader._seq;
1695  }
1696  if (minIdx > blockHeader._seq)
1697  {
1698  minIdx = blockHeader._seq;
1699  }
1700  // Advance past read blocks
1701  location += blockHeader._blocksToWrite * getBlockSize();
1702  // Either we're exiting loop, or blockNum is in range
1703  assert(location >= size || blockNum < numBlocks);
1704  }
1705  else
1706  {
1707  // Put this Block on the free list
1708  Block* block = blocks[++blockNum].setOffset(location);
1709  if (endOfFreeList)
1710  {
1711  endOfFreeList->_nextInList = block;
1712  }
1713  else
1714  {
1715  firstFree = block;
1716  }
1717  endOfFreeList = block;
1718  ++freeCount;
1719  location += blockSize;
1720  }
1721  }
1722  for (RecoverMap::iterator i = growingBlocks.begin();
1723  i != growingBlocks.end(); ++i)
1724  {
1725  Block* first = i->second;
1726  pBuffer->setPosition(first->_offset);
1727  BlockHeader blockHeader;
1728  // Read an old BlockHeader, which is only 24 bytes.
1729  // The bytes match current BlockHeader, and _nextInChain is 0.
1730  pBuffer->copyBytes((char*)&blockHeader, 24);
1731  // Old format total was storage bytes plus 64 bytes for block
1732  // and chain headers.
1733  blockHeader._totalRemaining -= 64;
1734  // New format counts only chain header size
1735  blockHeader._totalRemaining += getBlockChainHeaderSize();
1736  if (freeCount < growthBlocksNeeded)
1737  {
1738  // We have to resize, let's try to do it once.
1739  amps_uint32_t minBlocksRequired = growthBlocksNeeded - freeCount;
1740  amps_uint32_t growthBlocks = _blockStore.getDefaultResizeBlocks();
1741  if (growthBlocks < minBlocksRequired)
1742  {
1743  amps_uint32_t defaultBlocks = _blockStore.getDefaultResizeBlocks();
1744  if (minBlocksRequired % defaultBlocks)
1745  minBlocksRequired = (minBlocksRequired / defaultBlocks + 1)
1746  * defaultBlocks;
1747  growthBlocks = minBlocksRequired;
1748  }
1749  amps_uint32_t newBlocks = 0;
1750  Block* addedBlocks = _blockStore.resizeBuffer(
1751  pBuffer->getSize()
1752  + growthBlocks * blockSize,
1753  &newBlocks);
1754  if (!addedBlocks)
1755  {
1756  throw StoreException("Failed to grow store buffer during recovery");
1757  }
1758  _blockStore.addBlocks(addedBlocks);
1759  freeCount += newBlocks;
1760  growthBlocksNeeded = (growthBlocksNeeded > freeCount)
1761  ? growthBlocksNeeded - freeCount : 0;
1762  if (endOfFreeList)
1763  {
1764  endOfFreeList->_nextInList = addedBlocks;
1765  }
1766  else
1767  {
1768  firstFree = addedBlocks;
1769  }
1770  endOfFreeList = &(addedBlocks[newBlocks - 1]);
1771  endOfFreeList->_nextInList = 0;
1772  }
1773  expandBlocks(blocks, first->_offset, first, blockHeader,
1774  &firstFree, &freeCount, pBuffer);
1775  // Add first Block to recovered for building the used list later
1776  recoveredBlocks[blockHeader._seq] = first;
1777  if (!firstFree)
1778  {
1779  endOfFreeList = 0;
1780  }
1781  }
1782  if (endOfFreeList)
1783  {
1784  endOfFreeList->_nextInList = 0;
1785  }
1786  _blockStore.setFreeList(firstFree, freeCount);
1787  if (maxIdx > _lastSequence)
1788  {
1789  _lastSequence = maxIdx;
1790  }
1791  if (minIdx > _maxDiscarded + 1)
1792  {
1793  _maxDiscarded = minIdx - 1;
1794  }
1795  if (_maxDiscarded > _metadataBlock->_sequence)
1796  {
1797  _metadataBlock->_sequence = _maxDiscarded;
1798  pBuffer->setPosition(_metadataBlock->_offset + 8);
1799  pBuffer->putUint64(_maxDiscarded);
1800  }
1801  Block* end = NULL;
1802  AMPS_FETCH_ADD(&_stored, (long)(recoveredBlocks.size()));
1803  for (RecoverMap::iterator i = recoveredBlocks.begin();
1804  i != recoveredBlocks.end(); ++i)
1805  {
1806  if (_blockStore.front())
1807  {
1808  end->_nextInList = i->second; // -V522
1809  }
1810  else
1811  {
1812  _blockStore.setUsedList(i->second);
1813  }
1814  end = i->second;
1815  }
1816  if (end)
1817  {
1818  end->_nextInList = 0;
1819  }
1820  _blockStore.setEndOfUsedList(end);
1821  }
1822 
1823  // For recovering an old format store to current format when more Blocks
1824  // are needed with the new format.
1825  void expandBlocks(Block* blocks_, size_t location_, Block* first_,
1826  BlockHeader blockHeader_,
1827  Block** pFreeList_, amps_uint32_t* pFreeCount_,
1828  Buffer* pBuffer_)
1829  {
1830  // First create the chain, then we'll fill in reverse
1831  Block* current = first_;
1832  // Old format total was storage bytes plus 64 bytes for block
1833  // and chain headers.
1834  amps_uint32_t oldTotalRemaining = blockHeader_._totalRemaining;
1835  blockHeader_._totalRemaining -= 64;
1836  // New format counts only chain header size
1837  blockHeader_._totalRemaining += getBlockChainHeaderSize();
1838  // Check how many Blocks needed and if we have enough free.
1839  amps_uint32_t blocksNeeded = blockHeader_._totalRemaining
1840  / getBlockDataSize()
1841  + (blockHeader_._totalRemaining
1842  % getBlockDataSize()
1843  ? 1 : 0);
1844  // Last data block size, remove bytes saved in first block
1845  // then mod by block size.
1846  const amps_uint32_t blockSize = getBlockSize();
1847  // Old total remaining had all header included
1848  size_t endBlockSize = oldTotalRemaining % blockSize;
1849  if (!endBlockSize)
1850  {
1851  endBlockSize = blockSize;
1852  }
1853  size_t endOfData = 0;
1854  // Hang on to CRC until first block is written
1855  amps_uint64_t crcVal = blockHeader_._crcVal;
1856  blockHeader_._crcVal = 0;
1857 
1858  std::stack<Block*> blocksUsed;
1859  for (amps_uint32_t i = 1; i < blocksNeeded; ++i)
1860  {
1861  blocksUsed.push(current);
1862  current->_sequence = blockHeader_._seq;
1863  if (i >= blockHeader_._blocksToWrite)
1864  {
1865  if (i == blockHeader_._blocksToWrite)
1866  {
1867  endOfData = current->_offset + endBlockSize;
1868  }
1869  current->_nextInChain = *pFreeList_;
1870  --(*pFreeCount_);
1871  *pFreeList_ = (*pFreeList_)->_nextInList;
1872  }
1873  else
1874  {
1875  current->_nextInChain = current->_nextInList;
1876  if (current->_nextInChain)
1877  {
1878  if (current->_offset + blockSize < pBuffer_->getSize())
1879  {
1880  current->_nextInChain->setOffset(current->_offset
1881  + blockSize);
1882  }
1883  else
1884  {
1885  current->_nextInChain->setOffset(blockSize);
1886  }
1887  }
1888  else
1889  {
1890  current->_nextInChain = blocks_[1].init(1, blockSize);
1891  }
1892  if (current->_nextInChain == *pFreeList_)
1893  {
1894  *pFreeList_ = (*pFreeList_)->_nextInList;
1895  --(*pFreeCount_);
1896  }
1897  else
1898  {
1899  for (Block* free = *pFreeList_; free;
1900  free = free->_nextInList)
1901  {
1902  if (free->_nextInList == current->_nextInChain)
1903  {
1904  free->_nextInList = free->_nextInList->_nextInList;
1905  --(*pFreeCount_);
1906  break;
1907  }
1908  }
1909  }
1910  }
1911  current->_nextInList = 0;
1912  current = current->_nextInChain;
1913  }
1914  // Make sure we write the correct number of blocks to write
1915  blockHeader_._blocksToWrite = blocksNeeded;
1916  // Finish setting up current
1917  current->_nextInList = 0;
1918  current->_sequence = blockHeader_._seq;
1919  // Now shift data, starting at the last Block
1920  // The total shift is for number of Blocks beyond the first
1921  // times Block header size, since previous format only wrote
1922  // the header in the first Block and had contiguous data,
1923  // with only wrap from end to beginning of buffer possible.
1924 
1925  // First time through, this is bytes in last block. After,
1926  // it will be block data size.
1927  size_t dataBytes = blockHeader_._totalRemaining % getBlockDataSize();
1928  while (current != first_)
1929  {
1930  size_t chunkBytesAvail = endOfData > location_
1931  ? endOfData - location_
1932  : endOfData - 2048;
1933  if (chunkBytesAvail < dataBytes)
1934  {
1935  // Original was wrapped from end to start of buffer
1936  // Need to copy what's left at start to end of Block,
1937  // then start working from the end.
1938  // This can ONLY occur during wrap because the first
1939  // Block doesn't get moved in this loop.
1940  pBuffer_->copyBytes(current->_offset
1941  + getBlockSize()
1942  - chunkBytesAvail,
1943  getBlockSize(),
1944  chunkBytesAvail);
1945  chunkBytesAvail = dataBytes - chunkBytesAvail;
1946  endOfData = pBuffer_->getSize() - chunkBytesAvail;
1947  pBuffer_->copyBytes(current->_offset + getBlockHeaderSize(),
1948  endOfData,
1949  chunkBytesAvail);
1950  }
1951  else
1952  {
1953  endOfData -= dataBytes;
1954  pBuffer_->copyBytes(current->_offset + getBlockHeaderSize(),
1955  endOfData,
1956  dataBytes);
1957  }
1958  // Set next in chain in block header
1959  blockHeader_._nextInChain = (current->_nextInChain
1960  ? current->_nextInChain->_offset
1961  : (amps_uint64_t)0);
1962  // Write the header for this block
1963  pBuffer_->setPosition(current->_offset);
1964  pBuffer_->putBytes((const char*)&blockHeader_, sizeof(BlockHeader));
1965  current = blocksUsed.top();
1966  blocksUsed.pop();
1967  dataBytes = getBlockDataSize();
1968  }
1969  // Move bytes for chain header expansion from 32 bytes
1970  pBuffer_->copyBytes(first_->_offset
1971  + getBlockHeaderSize()
1973  first_->_offset + getBlockHeaderSize() + 32,
1975  // Set the CRC to indicate first block and set nextInChain
1976  blockHeader_._crcVal = crcVal;
1977  blockHeader_._nextInChain = first_->_nextInChain->_offset;
1978  // Need to reset location to after OLD header:
1979  // amps_uint32_t blocks, amps_uint32_t totalRemaining,
1980  // amps_uint64_t seq, amps_uint64_t crc
1981  pBuffer_->setPosition(location_ + (sizeof(amps_uint32_t) * 2)
1982  + (sizeof(amps_uint64_t) * 2) );
1983  // Read old chain header which uses same order, but not
1984  // as many bytes (everything is 32bit):
1985  // operation, commandIdLen, correlationIdLen,
1986  // expirationLen, sowKeyLen, topicLen, flag, ackTypes
1987  BlockChainHeader chainHeader;
1988  pBuffer_->copyBytes((char*)&chainHeader,
1989  sizeof(amps_uint32_t) * 8);
1990  // Rewrite the header and chain header for first Block.
1991  pBuffer_->setPosition(location_);
1992  pBuffer_->putBytes((const char*)&blockHeader_, sizeof(BlockHeader));
1993  pBuffer_->putBytes((const char*)&chainHeader, sizeof(BlockChainHeader));
1994  }
1995 
1996  void chooseCRC(bool isFile)
1997  {
1998  if (!isFile)
1999  {
2000  _crc = noOpCRC;
2001  return;
2002  }
2003 
2004 #ifndef AMPS_SSE_42
2005  _crc = AMPS::CRC<0>::crcNoSSE;
2006 #else
2007  if (AMPS::CRC<0>::isSSE42Enabled())
2008  {
2009  _crc = AMPS::CRC<0>::crc;
2010  }
2011  else
2012  {
2013  _crc = AMPS::CRC<0>::crcNoSSE;
2014  }
2015 #endif
2016  }
2017 
2018  static amps_uint64_t noOpCRC(const char*, size_t, amps_uint64_t)
2019  {
2020  return 0;
2021  }
2022 
2023  protected:
2024  mutable BlockStore _blockStore;
2025  private:
2026  // Block used to hold metadata, currently:
2027  // the last persisted
2028  Block* _metadataBlock;
2029  // Highest sequence that has been discarded
2030  amps_uint64_t _maxDiscarded;
2031  // Track the assigned sequence numbers
2032 #if __cplusplus >= 201103L || _MSC_VER >= 1900
2033  std::atomic<amps_uint64_t> _lastSequence;
2034 #else
2035  volatile amps_uint64_t _lastSequence;
2036 #endif
2037  // Track how many messages are stored
2038  AMPS_ATOMIC_TYPE _stored;
2039 
2040  // Message used for doing replay
2041  Message _message;
2042 
2043  typedef amps_uint64_t (*CRCFunction)(const char*, size_t, amps_uint64_t);
2044 
2045  // Function used to calculate the CRC if one is used
2046  CRCFunction _crc;
2047 
2048  };
2049 
2050 }
2051 
2052 #endif
2053 
virtual void putUint64(amps_uint64_t ui_)=0
Put an amps_uint64_t value into the buffer at the current position and advance past it...
Abstract base class for storing published messages for an HA publisher client.
Definition: ampsplusplus.hpp:1086
void getRawCorrelationId(const char **dataptr, size_t *sizeptr) const
Retrieves the value of the CorrelationId header of self as a Field that references the underlying buf...
Definition: Message.hpp:1304
Command::Type getCommandEnum() const
Decode self&#39;s "command" field and return one of the values from Command.
Definition: Message.hpp:1228
Constants
Default constant values for BlockPublishStore.
Definition: BlockPublishStore.hpp:82
void getRawCommandId(const char **dataptr, size_t *sizeptr) const
Retrieves the value of the CommandId header of self as a Field that references the underlying buffer ...
Definition: Message.hpp:1302
Abstract base class for replaying a publish message.
Definition: ampsplusplus.hpp:1058
virtual void flush(long timeout_)
Method to wait for the Store to discard everything that has been stored up to the point in time when ...
Definition: BlockPublishStore.hpp:659
void replay(StoreReplayer &replayer_)
Replay all messages in the Store onto the given StoreReplayer.
Definition: BlockPublishStore.hpp:564
void getRawFilter(const char **dataptr, size_t *sizeptr) const
Retrieves the value of the Filter header of self as a Field that references the underlying buffer man...
Definition: Message.hpp:1306
AMPSDLL amps_uint64_t amps_message_get_field_uint64(amps_handle message, FieldId field)
Gets the unsigned 64-bit int value of a header field in an AMPS message.
Message encapsulates a single message sent to or received from an AMPS server, and provides methods f...
Definition: Message.hpp:531
virtual void setPosition(size_t position_)=0
Set the buffer postion to a location.
Field getOptions() const
Retrieves the value of the Options header of the Message as a new Field.
Definition: Message.hpp:1316
static amps_uint32_t getBlockHeaderSize()
Block header is number of blocks, total length, sequence number, crc, next in chain offset...
Definition: BlockPublishStore.hpp:156
const char * data() const
Returns the (non-null-terminated) data underlying this field.
Definition: Field.hpp:260
amps_uint32_t getBlockDataSize()
Return the size left in a block for data when it has a header in it.
Definition: BlockPublishStore.hpp:182
virtual size_t getSize() const =0
Get the current size of the Buffer in bytes.
virtual void putBytes(const char *data_, size_t dataLength_)=0
Put the given length of bytes in data into the buffer at the current position and advance past them...
virtual void execute(Message &message_)=0
Called by implementations of Store to replay a message from the store.
void getRawBookmark(const char **dataptr, size_t *sizeptr) const
Retrieves the value of the Bookmark header of self as a Field that references the underlying buffer m...
Definition: Message.hpp:1194
bool replaySingle(StoreReplayer &replayer_, amps_uint64_t index_)
Replay one message in the Store onto the given StoreReplayer.
Definition: BlockPublishStore.hpp:596
amps_uint64_t store(const Message &message_, bool assignSequence_)
Store a given message that will be delivered to AMPS.
Definition: BlockPublishStore.hpp:269
Used as a base class for other stores in the AMPS C++ client, this is an implementation that breaks a...
Definition: BlockStore.hpp:61
amps_uint64_t getLastPersisted(void)
Get the last persisted sequence number.
Definition: BlockPublishStore.hpp:714
Core type, function, and class declarations for the AMPS C++ client.
size_t unpersistedCount() const
Method to return the count of messages that currently in the Store because they have not been discard...
Definition: BlockPublishStore.hpp:645
Provides AMPS::Buffer, an abstract base class used by the store implementations in the AMPS client...
size_t len() const
Returns the length of the data underlying this field.
Definition: Field.hpp:267
virtual void setResizeHandler(PublishStoreResizeHandler handler_, void *userData_)
Set a handler to be called if the Store needs to resize in order to keep storing messages.
Definition: ampsplusplus.hpp:1180
virtual void putUint32(amps_uint32_t i_)=0
Put an unsigned 32-bit int value into the buffer at the current position and advance past the end of ...
void getRawSowKeys(const char **dataptr, size_t *sizeptr) const
Retrieves the value of the SowKeys header of self as a Field that references the underlying buffer ma...
Definition: Message.hpp:1425
Used as a base class for other stores in the AMPS C++ client, this is an implementation of StoreImpl ...
Definition: BlockPublishStore.hpp:63
Abstract base class for implementing a buffer to be used by a StoreImpl for storage of publish messag...
Definition: Buffer.hpp:40
virtual ~BlockPublishStore()
Destructor that cleans up the buffer and other associated memory.
Definition: BlockPublishStore.hpp:244
void getRawExpiration(const char **dataptr, size_t *sizeptr) const
Retrieves the value of the Expiration header of self as a Field that references the underlying buffer...
Definition: Message.hpp:1305
static amps_uint32_t getBlockChainHeaderSize()
Block chain header is operation, command id length, correlation id length, expiration length...
Definition: BlockPublishStore.hpp:166
virtual amps_uint64_t getUint64()=0
Get an unsigned 64-bit int value at the current buffer position and advance past it.
void getRawTopic(const char **dataptr, size_t *sizeptr) const
Retrieves the value of the Topic header of self as a Field that references the underlying buffer mana...
Definition: Message.hpp:1449
Used as metadata for each block in a Buffer.
Definition: BlockStore.hpp:75
Field represents the value of a single field in a Message.
Definition: Field.hpp:86
virtual amps_uint32_t getUint32()=0
Get the unsigned 32-bit int value at the current buffer position and advance past it...
virtual amps_uint64_t store(const Message &message_)
Store a given message that will be delivered to AMPS.
Definition: BlockPublishStore.hpp:254
BlockPublishStore(Buffer *buffer_, amps_uint32_t blocksPerRealloc_=1000, bool isFile_=false, bool errorOnPublishGap_=false, amps_uint32_t blockSize_=DEFAULT_BLOCK_SIZE)
Create a BlockPublishStore using buffer_, that grows by blocksPerRealloc_ blocks when it must grow...
Definition: BlockPublishStore.hpp:202
amps_uint32_t getBlockSize()
Return the size left in a block for data when it has a header in it.
Definition: BlockPublishStore.hpp:174
void getRawSowKey(const char **dataptr, size_t *sizeptr) const
Retrieves the value of the SowKey header of self as a Field that references the underlying buffer man...
Definition: Message.hpp:1424
unsigned getAckTypeEnum() const
Decode self&#39;s "ack type" field and return the corresponding bitmask of values from AckType...
Definition: Message.hpp:1160
amps_uint64_t getLowestUnpersisted() const
Get the oldest unpersisted message sequence in the store.
Definition: BlockPublishStore.hpp:702
Definition: ampsplusplus.hpp:102
static amps_uint64_t getUnsetSequence()
Method to return the value used to represent no such sequence.
Definition: ampsplusplus.hpp:1156
virtual void copyBytes(char *buffer_, size_t numBytes_)=0
Copy the given number of bytes from this buffer to the given buffer.
virtual void discardUpTo(amps_uint64_t index_)
Remove all messages with an index up to and including index_.
Definition: BlockPublishStore.hpp:501
Provides AMPS::BlockStore, a class for storing Blocks of a fixed size into a Buffer implementation...
virtual ByteArray getBytes(size_t numBytes_)=0
Get the given number of bytes from the buffer.