AMPS C/C++ Client Class Reference
AMPS C/C++ Client Version 5.3.5.1
MMapStoreBuffer.hpp
Go to the documentation of this file.
1 //
3 // Copyright (c) 2010-2025 60East Technologies Inc., All Rights Reserved.
4 //
5 // This computer software is owned by 60East Technologies Inc. and is
6 // protected by U.S. copyright laws and other laws and by international
7 // treaties. This computer software is furnished by 60East Technologies
8 // Inc. pursuant to a written license agreement and may be used, copied,
9 // transmitted, and stored only in accordance with the terms of such
10 // license agreement and with the inclusion of the above copyright notice.
11 // This computer software or any other copies thereof may not be provided
12 // or otherwise made available to any other person.
13 //
14 // U.S. Government Restricted Rights. This computer software: (a) was
15 // developed at private expense and is in all respects the proprietary
16 // information of 60East Technologies Inc.; (b) was not developed with
17 // government funds; (c) is a trade secret of 60East Technologies Inc.
18 // for all purposes of the Freedom of Information Act; and (d) is a
19 // commercial item and thus, pursuant to Section 12.212 of the Federal
20 // Acquisition Regulations (FAR) and DFAR Supplement Section 227.7202,
21 // Government's use, duplication or disclosure of the computer software
22 // is subject to the restrictions set forth by 60East Technologies Inc..
23 //
25 
26 #ifndef _MMAPSTOREBUFFER_H_
27 #define _MMAPSTOREBUFFER_H_
28 
29 #include <amps/ampsplusplus.hpp>
31 #include <string>
32 #include <sys/types.h>
33 #include <sys/stat.h>
34 #include <fcntl.h>
35 #ifdef _WIN32
36  #include <windows.h>
37 #else
38  #include <sys/mman.h>
39 #endif
40 
43 
44 
45 namespace AMPS
46 {
50  {
51  public:
57 #ifdef _WIN32
58  MMapStoreBuffer(const std::string& fileName_,
59  size_t initialSize_ = AMPS_MEMORYBUFFER_DEFAULT_LENGTH)
60  : MemoryStoreBuffer(MemoryStoreBuffer::EMPTY)
61  , _mapFile(INVALID_HANDLE_VALUE), _file(INVALID_HANDLE_VALUE)
62  {
63  _file = CreateFileA(fileName_.c_str(), GENERIC_READ | GENERIC_WRITE, 0,
64  NULL, OPEN_ALWAYS, FILE_ATTRIBUTE_NORMAL, NULL);
65  if ( _file == INVALID_HANDLE_VALUE )
66  {
67  std::ostringstream os;
68  os << "Failed to create file " << fileName_ << " for MMapStoreBuffer";
69  error(os.str());
70  }
71  LARGE_INTEGER liFileSize;
72  if (GetFileSizeEx(_file, &liFileSize) == 0)
73  {
74  std::ostringstream os;
75  os << "Failure getting file size of " << fileName_ << " for MMapStoreBuffer";
76  error(os.str());
77  }
78 #ifdef _WIN64
79  size_t fileSize = liFileSize.QuadPart;
80 #else
81  size_t fileSize = liFileSize.LowPart;
82 #endif
83  _setSize( initialSize_ > fileSize ?
84  initialSize_ : fileSize);
85  }
86 #else
/// Create an MMapStoreBuffer using fileName_ as the backing file
/// (non-Windows build). The file is opened for reading and writing and is
/// created with a requested mode of 0644 if it does not exist. If the file is
/// already at least initialSize_ bytes long, the buffer is sized to the file
/// and set up in recovery mode; otherwise it is sized to initialSize_.
/// \param fileName_ The name of the file to map.
/// \param initialSize_ The minimum size, in bytes, for the buffer.
/// \throw StoreException If the file cannot be stat'ed, or if _setSize fails.
87  MMapStoreBuffer(const std::string& fileName_,
88  size_t initialSize_ = AMPS_MEMORYBUFFER_DEFAULT_LENGTH)
// NOTE(review): the member-initializer list (listing line 89) is absent from
// this view of the file; confirm it against the real header.
90  {
// NOTE(review): the descriptor returned by ::open is not checked here; a
// failed open is only detected by the fstat check below, so the reported
// error describes fstat rather than the open failure.
91  _fd = ::open(fileName_.c_str(), O_CREAT | O_RDWR, (mode_t)0644);
92  struct stat statBuf;
93  memset(&statBuf, 0, sizeof(statBuf));
94  if (fstat(_fd, &statBuf) == -1)
95  {
96  std::ostringstream os;
97  os << "Failed to stat file " << fileName_ << " for MMapStoreBuffer";
98  error(os.str());
99  }
// An existing file that is already large enough is treated as recovery:
// _setSize then skips the one-byte write it otherwise uses to grow the file.
100  bool recovery = (size_t)statBuf.st_size >= initialSize_;
101  _setSize(recovery ? (size_t)statBuf.st_size
102  : initialSize_,
103  recovery);
104  }
105 #endif
106 
107  ~MMapStoreBuffer()
108  {
109  if (_buffer)
110  {
111  close();
112  }
113  }
114 
115  void close()
116  {
117  sync();
118 #ifdef _WIN32
119  FlushFileBuffers(_file);
120  UnmapViewOfFile(_buffer);
121  CloseHandle(_mapFile);
122  CloseHandle(_file);
123  _mapFile = INVALID_HANDLE_VALUE;
124  _file = INVALID_HANDLE_VALUE;
125 #else
126  munmap(_buffer, _bufferLen);
127  ::close(_fd);
128  _fd = 0;
129 #endif
130  _buffer = NULL;
131  _bufferLen = 0;
132  }
133 
134  void sync()
135  {
136  if (_buffer != NULL && _bufferLen)
137  {
138 #ifdef _WIN32
139  if (!FlushViewOfFile(_buffer, _bufferPos))
140 #else
141  if (msync(_buffer, _bufferPos, MS_ASYNC) != 0)
142 #endif
143  {
144  std::ostringstream os;
145  os << "Failed to sync mapped memory; buffer: " << (size_t)_buffer
146  << " pos: " << _bufferPos;
147  error(os.str());
148  }
149  }
150  }
151 
152  virtual void setSize(size_t newSize_)
153  {
154  _setSize(newSize_);
155  }
156 
157  void _setSize(size_t newSize_, bool recovery_ = false)
158  {
159  if (_bufferLen > 0)
160  {
161  sync();
162  }
163  // Make sure we're using a multiple of page size
164  static const size_t pageSize = getPageSize();
165  size_t sz = (newSize_ + pageSize - 1) / pageSize * pageSize;
166 #ifdef _WIN32
167  if (_mapFile != INVALID_HANDLE_VALUE && _mapFile != NULL)
168  {
169  FlushFileBuffers(_file);
170  UnmapViewOfFile(_buffer);
171  CloseHandle(_mapFile);
172  }
173 #ifdef _WIN64
174  _mapFile = CreateFileMapping( _file, NULL, PAGE_READWRITE, (DWORD)((sz >> 32) & 0xffffffff), (DWORD)sz, NULL);
175 #else
176  _mapFile = CreateFileMapping( _file, NULL, PAGE_READWRITE, 0, (DWORD)sz, NULL);
177 #endif
178  if (_mapFile == INVALID_HANDLE_VALUE || _mapFile == NULL)
179  {
180  _buffer = 0;
181  _bufferLen = 0;
182  error("Failed to create map of log file");
183  }
184  else
185  {
186  _buffer = (char*)MapViewOfFile(_mapFile, FILE_MAP_ALL_ACCESS, 0, 0, sz);
187  if (_buffer == NULL)
188  {
189  std::ostringstream os;
190  os << "Failed to map log file to memory; buffer: " << (size_t)_buffer << " length: " << sz << " previous size: " << _bufferLen;
191  _buffer = 0;
192  _bufferLen = 0;
193  error(os.str());
194  }
195  }
196 #else
197  // If not at current size, extend the underlying file
198  if (sz > _bufferLen)
199  {
200  if (lseek(_fd, (off_t)sz - 1, SEEK_SET) == -1)
201  {
202  std::ostringstream os;
203  os << "Seek failed for buffer extension; buffer: " << (size_t)_buffer
204  << " length: " << _bufferLen << " pos: " << _bufferPos
205  << " requested new size: " << newSize_;
206  error(os.str());
207  }
208  if (!recovery_)
209  {
210  if (::write(_fd, "", 1) == -1)
211  {
212  std::ostringstream os;
213  os << "Failed to grow buffer; buffer: " << (size_t)_buffer << " length: "
214  << _bufferLen << " pos: " << _bufferPos << " requested new size: "
215  << newSize_;
216  error(os.str());
217  }
218  }
219  }
220 
221  void* result = NULL;
222  if (_buffer == NULL)
223  {
224  result = mmap(0, sz, PROT_WRITE | PROT_READ, MAP_SHARED, _fd, 0);
225  }
226  else if (_bufferLen < sz)
227  {
228 #if defined(linux)
229  result = mremap(_buffer, _bufferLen, sz, MREMAP_MAYMOVE);
230 #else
231  munmap(_buffer, _bufferLen);
232  result = mmap(0, sz, PROT_WRITE | PROT_READ, MAP_SHARED, _fd, 0);
233 #endif
234  }
235  if (result == MAP_FAILED || result == NULL)
236  {
237  std::ostringstream os;
238  os << "Failed to map log file to memory; buffer: "
239  << (size_t)_buffer << " length: " << sz
240  << " previous size: " << _bufferLen;
241  _buffer = 0;
242  _bufferLen = 0;
243  error(os.str());
244  }
245  else
246  {
247  _buffer = (char*)result;
248  }
249 #endif
250  if (_buffer)
251  {
252  _bufferLen = sz;
253  }
254  }
255 
256  private:
257  void error(const std::string & message)
258  {
259  std::ostringstream os;
260 #ifdef _WIN32
261  const size_t errorBufferSize = 1024;
262  char errorBuffer[errorBufferSize];
263  memset(errorBuffer, 0, errorBufferSize);
264  strerror_s(errorBuffer, errorBufferSize, errno);
265  os << message << ". Error is " << errorBuffer;
266 #else
267  os << message << ". Error is " << strerror(errno);
268 #endif
269  throw StoreException(os.str());
270  }
271 
272 #ifdef _WIN32
// Handle of the file-mapping object created over _file in _setSize.
273  HANDLE _mapFile;
// Handle of the backing file opened by the constructor.
274  HANDLE _file;
275 #else
// Descriptor of the backing file opened by the constructor.
276  int _fd;
277 #endif
/// Return the system page size in bytes.
/// The value is looked up on first use and remembered for later calls. If
/// the lookup reports failure, 4096 is used as a guess.
static size_t getPageSize()
{
  static size_t cachedSize;
  if (cachedSize != 0)
  {
    return cachedSize;
  }
#ifdef _WIN32
  SYSTEM_INFO systemInfo;
  GetSystemInfo(&systemInfo);
  size_t detected = systemInfo.dwPageSize;
#else
  size_t detected = (size_t)sysconf(_SC_PAGESIZE);
#endif
  // A failed lookup shows up as -1 converted to size_t.
  cachedSize = (detected == (size_t)-1) ? 4096 : detected;
  return cachedSize;
}
298 
299  };
300 
301 } // end namespace AMPS
302 
303 #endif //_MMAPSTOREBUFFER_H_
304 
virtual void setSize(size_t newSize_)
Set the size for the buffer.
Definition: MMapStoreBuffer.hpp:152
MMapStoreBuffer(const std::string &fileName_, size_t initialSize_=AMPS_MEMORYBUFFER_DEFAULT_LENGTH)
Create an MMapStoreBuffer using fileName_ as the name of the memory mapped file used for storage...
Definition: MMapStoreBuffer.hpp:87
A Buffer implementation that uses memory for storage.
Definition: MemoryStoreBuffer.hpp:50
Core type, function, and class declarations for the AMPS C++ client.
A Buffer implementation that uses a memory mapped file as its storage.
Definition: MMapStoreBuffer.hpp:49
Provides AMPS::MemoryStoreBuffer, used by an AMPS::HAClient to store messages in memory.
Definition: ampsplusplus.hpp:103