AMPS C/C++ Client Class Reference
AMPS C/C++ Client Version 5.3.5.1
SOWRecoveryPointAdapter.hpp
1 //
3 // Copyright (c) 2010-2025 60East Technologies Inc., All Rights Reserved.
4 //
5 // This computer software is owned by 60East Technologies Inc. and is
6 // protected by U.S. copyright laws and other laws and by international
7 // treaties. This computer software is furnished by 60East Technologies
8 // Inc. pursuant to a written license agreement and may be used, copied,
9 // transmitted, and stored only in accordance with the terms of such
10 // license agreement and with the inclusion of the above copyright notice.
11 // This computer software or any other copies thereof may not be provided
12 // or otherwise made available to any other person.
13 //
14 // U.S. Government Restricted Rights. This computer software: (a) was
15 // developed at private expense and is in all respects the proprietary
16 // information of 60East Technologies Inc.; (b) was not developed with
17 // government funds; (c) is a trade secret of 60East Technologies Inc.
18 // for all purposes of the Freedom of Information Act; and (d) is a
19 // commercial item and thus, pursuant to Section 12.212 of the Federal
20 // Acquisition Regulations (FAR) and DFAR Supplement Section 227.7202,
21 // Government's use, duplication or disclosure of the computer software
22 // is subject to the restrictions set forth by 60East Technologies Inc..
23 //
25 
26 #ifndef _SOWRECOVERYPOINTADAPTER_H_
27 #define _SOWRECOVERYPOINTADAPTER_H_
28 
29 #include <amps/ampsplusplus.hpp>
30 #include <amps/Field.hpp>
31 #include <amps/RecoveryPoint.hpp>
33 #include <assert.h>
34 #include <memory>
35 #include <string>
36 
37 using std::string;
38 
39 #define AMPS_SOW_STORE_DEFAULT_TOPIC "/ADMIN/bookmark_store"
40 #define AMPS_SOW_STORE_DEFAULT_CLIENT_FIELD "clientName"
41 #define AMPS_SOW_STORE_DEFAULT_SUB_FIELD "subId"
42 #define AMPS_SOW_STORE_DEFAULT_BOOKMARK_FIELD "bookmark"
43 
// Body shared by every handler that SOW_RECOVERY_HANDLE_EXCEPTION generates.
// `streamed` is a chain of operands for operator<< that is formatted into the
// message of a StoreException. That exception is rethrown when the enclosing
// object's _throwNotListen flag is set; otherwise it is handed to
// _pExceptionListener when one is installed, and silently dropped when not.
#define SOW_RECOVERY_REPORT_EXCEPTION_(streamed) \
  { \
    std::ostringstream os; \
    os << streamed; \
    StoreException ex(os.str()); \
    if (_throwNotListen) \
    { \
      throw ex; \
    } \
    else if (_pExceptionListener) \
    { \
      _pExceptionListener->exceptionThrown(ex); \
    } \
  }

// Place directly after a try block inside a member function. `x` is a
// streamable description of the failing operation; it prefixes the message of
// the StoreException built for AMPSException, std::exception and any other
// thrown object.
#define SOW_RECOVERY_HANDLE_EXCEPTION(x) \
  catch (const AMPSException& ex_) \
  SOW_RECOVERY_REPORT_EXCEPTION_(x << ": AMPSException " << ex_.what()) \
  catch (const std::exception& ex_) \
  SOW_RECOVERY_REPORT_EXCEPTION_(x << ": std::exception " << ex_.what()) \
  catch (...) \
  SOW_RECOVERY_REPORT_EXCEPTION_(x << ": Unknown exception")
87 
88 namespace AMPS
89 {
90 
94  {
95  public:
96 
// Constructs an adapter that keeps recovery points for one tracked client in
// an AMPS SOW topic, using a separate client for all traffic.
//
// storeClient_         copied into _client; used for the sow query in next(),
//                      the publish in update() and sowDelete in purge().
// trackedClientName_   name of the client whose recovery points are stored;
//                      used in the query/delete filters and written into each
//                      serialized record.
// timeoutMillis_       passed to Command::setTimeout and MessageStream::timeout
//                      in next().
// useTimestamp_        when true, next() adds the "timestamp" option and
//                      deserialize() appends the message timestamp to the
//                      bookmark.
// closeClient_         when true, _close() disconnects the store client.
// updateFailureThrows_ stored as _throwNotListen: failures are rethrown as
//                      StoreException instead of going to the listener.
// topic_, clientNameField_, subIdField_, bookmarkField_
//                      SOW topic and field names used for the stored records.
//
// Throws UsageException when the store client's name equals the tracked
// client name. Finishes by calling _initSerialization() to allocate and
// pre-fill the serialization buffer.
//
// NOTE(review): in this listing the member-initializer list begins with a
// leading comma; the line that introduces the list is not shown here.
128  SOWRecoveryPointAdapter(const Client& storeClient_,
129  const string& trackedClientName_,
130  unsigned timeoutMillis_ = 5000,
131  bool useTimestamp_ = false,
132  bool closeClient_ = true,
133  bool updateFailureThrows_ = false,
134  const string& topic_ = AMPS_SOW_STORE_DEFAULT_TOPIC,
135  const string& clientNameField_ = AMPS_SOW_STORE_DEFAULT_CLIENT_FIELD,
136  const string& subIdField_ = AMPS_SOW_STORE_DEFAULT_SUB_FIELD,
137  const string& bookmarkField_ = AMPS_SOW_STORE_DEFAULT_BOOKMARK_FIELD
138  )
140  , _serializeLen(0)
141  , _serializeBuffer(0)
142  , _deserializeLen(0)
143  , _deserializeBuffer(0)
144  , _client(storeClient_)
145  , _trackedName(trackedClientName_)
146  , _topic(topic_)
147  , _nameField(clientNameField_)
148  , _subIdField(subIdField_)
149  , _bookmarkField(bookmarkField_)
150  , _timeoutMillis(timeoutMillis_)
151  , _closeClient(closeClient_)
152  , _executed(false)
153  , _throwNotListen(updateFailureThrows_)
154  , _useTimestamp(useTimestamp_)
155  , _closed(false)
156  {
157  if (_client.getName() == _trackedName)
158  {
159  throw UsageException("The SOWRecoveryPointAdapter cannot use the tracked client to update AMPS");
160  }
161  _initSerialization();
162  }
163 
164  virtual ~SOWRecoveryPointAdapter()
165  {
166  _close();
167  delete[] _serializeBuffer;
168  delete[] _deserializeBuffer;
169  }
170 
175  virtual bool next(RecoveryPoint& current_)
176  {
177  static Field emptyField;
178  try
179  {
180  if (!_executed)
181  {
182  Command cmd("sow");
183  cmd.setTopic(_topic)
184  .setFilter("/" + _nameField + "='" + _trackedName + "'")
185  .setTimeout(_timeoutMillis);
186  if (_useTimestamp)
187  {
188  cmd.setOptions("select=[-/,+/" + _subIdField + ",+/"
189  + _bookmarkField + "],timestamp");
190  }
191  else
192  {
193  cmd.setOptions("select=[-/,+/" + _subIdField + ",+/"
194  + _bookmarkField + "]");
195  }
196  _stream = _client.execute(cmd).timeout(_timeoutMillis);
197  _msIter = _stream.begin();
198  _executed = true;
199  }
200  else
201  {
202  ++_msIter;
203  }
204  if (_msIter == MessageStream::iterator())
205  {
206  return false;
207  }
208  Message m = *_msIter;
209  if (!m.isValid())
210  {
211  current_ = RecoveryPoint(NULL);
212  return false;
213  }
214  if (m.getCommand() == "group_begin")
215  {
216  return next(current_);
217  }
218  else if (m.getCommand() == "sow")
219  {
220  if (_useTimestamp)
221  {
222  current_ = RecoveryPoint(deserialize(m.getData(),
223  m.getTimestamp()));
224  }
225  else
226  {
227  current_ = RecoveryPoint(deserialize(m.getData(),
228  emptyField));
229  }
230  return true;
231  }
232  else if (m.getCommand() == "group_end" || m.getCommand() == "ack")
233  {
234  current_ = RecoveryPoint(NULL);
235  _msIter = MessageStream::iterator();
236  _stream = MessageStream();
237  return false;
238  }
239  }
240  SOW_RECOVERY_HANDLE_EXCEPTION("SOWRecoveryPoint::next")
241  return false;
242  }
243 
246  virtual void update(RecoveryPoint& recoveryPoint_)
247  {
248  if (_closed)
249  {
250  return;
251  }
252  try
253  {
254  Field data = serialize(recoveryPoint_);
255  _client.publish(_topic.data(), _topic.length(), data.data(), data.len());
256  }
257  SOW_RECOVERY_HANDLE_EXCEPTION("SOWRecoveryPoint::update")
258  }
259 
261  virtual void purge()
262  {
263  if (_closed)
264  {
265  return;
266  }
267  try
268  {
269  Message m = _client.sowDelete(_topic, "/" + _nameField
270  + "='" + _trackedName + "'");
271  }
272  SOW_RECOVERY_HANDLE_EXCEPTION("SOWRecoveryPoint::purge")
273  }
274 
277  virtual void purge(const Field& subId_)
278  {
279  if (_closed)
280  {
281  return;
282  }
283  try
284  {
285  Message m = _client.sowDelete(_topic, "/" + _nameField + "='"
286  + _trackedName + "' and /"
287  + _subIdField + "='"
288  + subId_ + "'");
289  }
290  SOW_RECOVERY_HANDLE_EXCEPTION("SOWRecoveryPoint::purge(subId)")
291  }
292 
294  virtual void close()
295  {
296  _close();
297  }
298 
306  void setExceptionListener(const std::shared_ptr<const ExceptionListener>& pListener_)
307  {
308  _pExceptionListener = pListener_;
309  }
310  protected:
311  void _close()
312  {
313  if (_closed)
314  {
315  return;
316  }
317  // If client is invalid or unused
318  if (!_client.isValid() || !_executed)
319  {
320  _closed = true;
321  if (_closeClient && _client.isValid())
322  {
323  _client.disconnect();
324  _client = Client();
325  }
326  return;
327  }
328  try
329  {
330  _client.publishFlush();
331  }
332  SOW_RECOVERY_HANDLE_EXCEPTION("SOWRecoveryPoint::close publishFlush")
333  try
334  {
335  if (_closeClient)
336  {
337  _closed = true;
338  _client.disconnect();
339  _client = Client();
340  }
341  }
342  SOW_RECOVERY_HANDLE_EXCEPTION("SOWRecoveryPoint::close disconnect")
343  }
344 
345  void _initSerialization()
346  {
347  try
348  {
349  // Set up json serialization
350  if (_serializeLen == 0)
351  {
352  _serializeLen = (size_t) (_nameField.length()
353  + _trackedName.length()
354  + _subIdField.length()
355  + _bookmarkField.length()
356  + (AMPS_MAX_BOOKMARK_LEN * 4UL)
357  + SUBID_LEN + JSON_EXTRA);
358  _serializeLen += (128 - (_serializeLen % 128));
359  }
360  _serializeBuffer = new char[_serializeLen];
361  AMPS_snprintf(_serializeBuffer, _serializeLen,
362  "{\"%s\":\"%s\",\"%s\":\"", _nameField.c_str()
363  , _trackedName.c_str()
364  , _subIdField.c_str());
365  _serializeStart = JSON_START + _nameField.length()
366  + _trackedName.length() + _subIdField.length();
367  }
368  SOW_RECOVERY_HANDLE_EXCEPTION("SOWRecoveryPoint::initSerialization")
369  }
370 
371  // Subclasses can override this to set up for something other than json
372  // serialization if not using json.
373  virtual void initSerialization()
374  {
375  _initSerialization();
376  }
377 
378  // Subclasses can override this function if not using json data type.
379  // It needs to return an allocated RecoveryPointImpl based on the data
380  // field from a sow message that contains only 2 fields: _subIdField and
381  // _bookmarkField. If you'd like more, override begin()
382  virtual RecoveryPointImpl* deserialize(const Field& data_,
383  const Field& timestamp_)
384  {
385  Field subId;
386  Field bookmark;
387  try
388  {
389  // We have 2 fields subId and bookmark and we only need the
390  // values. Find : then start ", then end ".
391  const char* start = (const char*)memchr((const void*)data_.data(),
392  (int)':', data_.len());
393  if (!start)
394  {
395  throw StoreException("Failure parsing json RecoveryPoint subId, no :");
396  }
397  size_t remain = data_.len() - (size_t)(start - data_.data());
398  start = (const char*)memchr((const void*)start, (int)'"', remain);
399  if (!start)
400  {
401  throw StoreException("Failure parsing json RecoveryPoint subId, no start \"");
402  }
403  ++start;
404  remain = data_.len() - (size_t)(start - data_.data());
405  const char* end = (const char*)memchr((const void*)start,
406  (int)'"', remain);
407  if (!end)
408  {
409  throw StoreException("Failure parsing json RecoveryPoint subId, no end \"");
410  }
411  size_t len = (size_t)(end - start);
412  subId = Field(start, len);
413  start = (const char*)memchr((const void*)start, (int)':', data_.len());
414  if (!start)
415  {
416  throw StoreException("Failure parsing json RecoveryPoint bookmark, no :");
417  }
418  remain = data_.len() - (size_t)(start - data_.data());
419  start = (const char*)memchr((const void*)start, (int)'"', remain);
420  if (!start)
421  {
422  throw StoreException("Failure parsing json RecoveryPoint bookmark, no start \"");
423  }
424  ++start;
425  remain = data_.len() - (size_t)(start - data_.data());
426  end = (const char*)memchr((const void*)start, (int)'"', remain);
427  if (!end)
428  {
429  throw StoreException("Failure parsing json RecoveryPoint bookmark, no end \"");
430  }
431  len = (size_t)(end - start);
432  if (_useTimestamp && !timestamp_.empty())
433  {
434  if (_deserializeLen < len + timestamp_.len())
435  {
436  delete[] _deserializeBuffer;
437  _deserializeBuffer = 0;
438  }
439  if (!_deserializeBuffer)
440  {
441  _deserializeLen = len + timestamp_.len() + 1;
442  _deserializeBuffer = new char[_deserializeLen];
443  }
444  memcpy((void*)_deserializeBuffer, (const void*)start, len);
445  _deserializeBuffer[len] = ',';
446  memcpy((void*)(_deserializeBuffer + len + 1),
447  (const void*)timestamp_.data(), timestamp_.len());
448  bookmark = Field(_deserializeBuffer, _deserializeLen);
449  }
450  else
451  {
452  bookmark = Field(start, len);
453  }
454  }
455  SOW_RECOVERY_HANDLE_EXCEPTION("SOWRecoveryPoint::deserialize")
456  // Return a recovery point that will copy current field values and
457  // clear them when destructed.
458  return new FixedRecoveryPoint(subId, bookmark, true);
459  }
460 
461  virtual Field& serialize(const RecoveryPoint& recoveryPoint_)
462  {
463  try
464  {
465  Field subId = recoveryPoint_.getSubId();
466  Field bookmark = recoveryPoint_.getBookmark();
467  size_t fullLen = _serializeStart + subId.len()
468  + _bookmarkField.length() + bookmark.len() + JSON_END;
469  if (fullLen >= _serializeLen)
470  {
471  _serializeLen = fullLen + (128 - (fullLen % 128));
472  delete[] _serializeBuffer;
473  // This will reallocate the buffer and fill with predicate
474  initSerialization();
475  }
476  AMPS_snprintf(_serializeBuffer + _serializeStart,
477  _serializeLen - _serializeStart,
478  "%.*s\",\"%s\":\"%.*s\"}", (int)subId.len()
479  , subId.data()
480  , _bookmarkField.c_str()
481  , (int)bookmark.len()
482  , bookmark.data());
483  _serializeField.assign(_serializeBuffer, fullLen);
484  }
485  SOW_RECOVERY_HANDLE_EXCEPTION("SOWRecoveryPoint::serialize")
486  return _serializeField;
487  }
488 
// Character counts for the JSON punctuation around a serialized record,
//   {"<nameField>":"<trackedName>","<subIdField>":"<subId>","<bookmarkField>":"<bookmark>"}
// plus the sub id size used for the initial buffer estimate.
enum Constants : size_t
{
  // Punctuation up to and including the quote that opens the sub id value:
  // 1 '{', 7 '"', 2 ':', 1 ','.
  JSON_START = 11,
  // Punctuation from the quote that closes the sub id value to the end:
  // 1 '}', 5 '"', 1 ':', 1 ','.
  JSON_END = 8,
  // All punctuation in a full record: '{', '}', 3 ':', 12 '"', 2 ','.
  JSON_EXTRA = 19,
  // Rough guess at a typical maximum sub id length.
  SUBID_LEN = 64
};
496 
// Adapter state. Declaration order matters: the constructor's initializer
// list initializes these members in the order they are declared here.
497  private:
// Allocated size of _serializeBuffer in bytes (0 until first sized).
498  size_t _serializeLen;
// Offset in _serializeBuffer just past the constant JSON prefix; serialize()
// writes the per-record part starting here.
499  size_t _serializeStart;
// Field returned by serialize(); assigned to point at _serializeBuffer.
500  Field _serializeField;
// Heap buffer holding the serialized record; freed in the destructor.
501  char* _serializeBuffer;
// Allocated size of _deserializeBuffer in bytes.
502  size_t _deserializeLen;
// Scratch buffer where deserialize() joins bookmark and timestamp; freed in
// the destructor.
503  char* _deserializeBuffer;
// Client used for the sow query, publishes and sow deletes.
504  Client _client;
// Name of the client whose recovery points are stored.
505  std::string _trackedName;
// SOW topic that holds the records.
506  std::string _topic;
// Names of the client-name, sub id and bookmark fields within a record.
507  std::string _nameField;
508  std::string _subIdField;
509  std::string _bookmarkField;
// Timeout in milliseconds applied to the sow command and its message stream.
510  unsigned _timeoutMillis;
// Result stream of the sow query and the current position in it (see next()).
511  MessageStream _stream;
512  MessageStream::iterator _msIter;
// Receives StoreExceptions when _throwNotListen is false; may be empty.
513  std::shared_ptr<const ExceptionListener> _pExceptionListener;
// Disconnect _client in _close().
514  bool _closeClient;
// Set once next() has issued the sow query.
515  bool _executed;
// Rethrow failures as StoreException instead of notifying the listener.
516  bool _throwNotListen;
// Request message timestamps and append them to recovered bookmarks.
517  bool _useTimestamp;
// Set by _close(); update() and purge() become no-ops afterwards.
518  bool _closed;
519  };
520 } // namespace AMPS
521 #endif //_SOWRECOVERYPOINTADAPTER_H_
522 
Command & setOptions(const std::string &options_)
Sets the options string for this command: see Message.Options for a helper class for constructing the...
Definition: ampsplusplus.hpp:789
virtual void update(RecoveryPoint &recoveryPoint_)
Update the storage information with the given recovery point.
Definition: SOWRecoveryPointAdapter.hpp:246
Message encapsulates a single message sent to or received from an AMPS server, and provides methods f...
Definition: Message.hpp:539
iterator begin(void)
Returns an iterator representing the beginning of the topic or subscription.
Definition: ampsplusplus.hpp:5108
const char * data() const
Returns the (non-null-terminated) data underlying this field.
Definition: Field.hpp:273
Field getCommand() const
Retrieves the value of the Command header of the Message as a Field which references the underlying b...
Definition: Message.hpp:1257
MessageStream execute(Command &command_)
Execute the provided command and return messages received in response in a MessageStream.
Definition: ampsplusplus.hpp:9019
RecoveryPointAdapter virtual base class for implementing external storage of subscription recovery po...
Definition: SOWRecoveryPointAdapter.hpp:93
Command & setFilter(const std::string &filter_)
Definition: ampsplusplus.hpp:692
Client represents a connection to an AMPS server, but does not provide failover or reconnection behav...
Definition: ampsplusplus.hpp:5189
Provides access to the subId and bookmark needed to restart a subscription.
Definition: RecoveryPoint.hpp:67
RecoveryPointImpl virtual base class provides access to the subId and bookmark needed to restart a su...
Definition: RecoveryPoint.hpp:49
Command & setTimeout(unsigned timeout_)
Sets the client-side timeout for this command.
Definition: ampsplusplus.hpp:851
virtual void purge()
Remove all data from the storage.
Definition: SOWRecoveryPointAdapter.hpp:261
virtual bool next(RecoveryPoint &current_)
Recovery is done by iteration over elements in storage.
Definition: SOWRecoveryPointAdapter.hpp:175
bool empty() const
Returns 'true' if empty, 'false' otherwise.
Definition: Field.hpp:128
Defines the AMPS::Field class, which represents the value of a field in a message.
Core type, function, and class declarations for the AMPS C++ client.
size_t len() const
Returns the length of the data underlying this field.
Definition: Field.hpp:280
Provides AMPS::RecoveryPointAdapter, an interface for implementing external storage of bookmark subscr...
virtual void purge(const Field &subId_)
Remove the specified subId_ from the storage.
Definition: SOWRecoveryPointAdapter.hpp:277
void setExceptionListener(const std::shared_ptr< const ExceptionListener > &pListener_)
Set an exception listener on this adapter that will be notified of all exceptions that occur rather t...
Definition: SOWRecoveryPointAdapter.hpp:306
SOWRecoveryPointAdapter(const Client &storeClient_, const string &trackedClientName_, unsigned timeoutMillis_=5000, bool useTimestamp_=false, bool closeClient_=true, bool updateFailureThrows_=false, const string &topic_=AMPS_SOW_STORE_DEFAULT_TOPIC, const string &clientNameField_=AMPS_SOW_STORE_DEFAULT_CLIENT_FIELD, const string &subIdField_=AMPS_SOW_STORE_DEFAULT_SUB_FIELD, const string &bookmarkField_=AMPS_SOW_STORE_DEFAULT_BOOKMARK_FIELD)
Create a SOWRecoveryPointAdapter for a BookmarkStore that writes updated RecoveryPoints to the server...
Definition: SOWRecoveryPointAdapter.hpp:128
Provides AMPS::RecoveryPoint, AMPS::RecoveryPointFactory, AMPS::FixedRecoveryPoint, and AMPS::DynamicRecoveryPoint.
Represents an iterator over messages in an AMPS topic.
Definition: ampsplusplus.hpp:5063
void publishFlush(long timeout_=0, unsigned ackType_=Message::AckType::Processed)
Ensure that AMPS messages are sent and have been processed by the AMPS server.
Definition: ampsplusplus.hpp:5802
virtual void close()
Take any necessary actions to close the associated storage.
Definition: SOWRecoveryPointAdapter.hpp:294
Field represents the value of a single field in a Message.
Definition: Field.hpp:86
An iterable object representing the results of an AMPS subscription and/or query. ...
Definition: ampsplusplus.hpp:5055
Field getTimestamp() const
Retrieves the value of the Timestamp header of the Message as a Field which references the underlying...
Definition: Message.hpp:1492
Field getData() const
Returns the data from this message.
Definition: Message.hpp:1522
FixedRecoveryPoint is a RecoveryPoint implementation where subId and bookmark are set explicitly...
Definition: RecoveryPoint.hpp:133
Command & setTopic(const std::string &topic_)
Definition: ampsplusplus.hpp:679
void disconnect()
Disconnect from an AMPS server.
Definition: ampsplusplus.hpp:5408
RecoveryPointAdapterImpl virtual base class for implementing external storage of subscription recover...
Definition: RecoveryPointAdapter.hpp:52
amps_uint64_t publish(const std::string &topic_, const std::string &data_)
Publish a message to an AMPS topic, returning the sequence number assigned by the publish store if on...
Definition: ampsplusplus.hpp:5680
Definition: ampsplusplus.hpp:103
const Field & getSubId() const
Get the sub id for this recovery point.
Definition: RecoveryPoint.hpp:84
MessageStream timeout(unsigned timeout_)
Sets the maximum time to wait for the next message in milliseconds; if no message is available within...
Definition: ampsplusplus.hpp:8990
const std::string & getName() const
Returns the name of this client passed in the constructor.
Definition: ampsplusplus.hpp:5251
Command is an encapsulation of a single AMPS command sent by the client.
Definition: ampsplusplus.hpp:472
std::string sowDelete(const MessageHandler &messageHandler, const std::string &topic, const std::string &filter, long timeout)
Deletes one or more messages from a topic's SOW cache.
Definition: ampsplusplus.hpp:6844
const Field & getBookmark() const
Get the bookmark for this recovery point.
Definition: RecoveryPoint.hpp:91