26 #ifndef _SOWRECOVERYPOINTADAPTER_H_ 27 #define _SOWRECOVERYPOINTADAPTER_H_ 39 #define AMPS_SOW_STORE_DEFAULT_TOPIC "/ADMIN/bookmark_store" 40 #define AMPS_SOW_STORE_DEFAULT_CLIENT_FIELD "clientName" 41 #define AMPS_SOW_STORE_DEFAULT_SUB_FIELD "subId" 42 #define AMPS_SOW_STORE_DEFAULT_BOOKMARK_FIELD "bookmark" 44 #define SOW_RECOVERY_HANDLE_EXCEPTION(x) \ 45 catch (const AMPSException& ex_) \ 47 std::ostringstream os; \ 48 os << x << ": AMPSException " << ex_.what(); \ 49 StoreException ex(os.str()); \ 50 if (_throwNotListen) \ 54 else if (_pExceptionListener) \ 56 _pExceptionListener->exceptionThrown(ex); \ 59 catch (const std::exception& ex_) \ 61 std::ostringstream os; \ 62 os << x << ": std::exception " << ex_.what(); \ 63 StoreException ex(os.str()); \ 64 if (_throwNotListen) \ 68 else if (_pExceptionListener) \ 70 _pExceptionListener->exceptionThrown(ex); \ 75 std::ostringstream os; \ 76 os << x << ": Unknown exception"; \ 77 StoreException ex(os.str()); \ 78 if (_throwNotListen) \ 82 else if (_pExceptionListener) \ 84 _pExceptionListener->exceptionThrown(ex); \ 129 const string& trackedClientName_,
130 unsigned timeoutMillis_ = 5000,
131 bool useTimestamp_ =
false,
132 bool closeClient_ =
true,
133 bool updateFailureThrows_ =
false,
134 const string& topic_ = AMPS_SOW_STORE_DEFAULT_TOPIC,
135 const string& clientNameField_ = AMPS_SOW_STORE_DEFAULT_CLIENT_FIELD,
136 const string& subIdField_ = AMPS_SOW_STORE_DEFAULT_SUB_FIELD,
137 const string& bookmarkField_ = AMPS_SOW_STORE_DEFAULT_BOOKMARK_FIELD
141 , _serializeBuffer(0)
143 , _deserializeBuffer(0)
144 , _client(storeClient_)
145 , _trackedName(trackedClientName_)
147 , _nameField(clientNameField_)
148 , _subIdField(subIdField_)
149 , _bookmarkField(bookmarkField_)
150 , _timeoutMillis(timeoutMillis_)
151 , _closeClient(closeClient_)
153 , _throwNotListen(updateFailureThrows_)
154 , _useTimestamp(useTimestamp_)
157 if (_client.
getName() == _trackedName)
159 throw UsageException(
"The SOWRecoveryPointAdapter cannot use the tracked client to update AMPS");
161 _initSerialization();
167 delete[] _serializeBuffer;
168 delete[] _deserializeBuffer;
177 static Field emptyField;
184 .
setFilter(
"/" + _nameField +
"='" + _trackedName +
"'")
188 cmd.
setOptions(
"select=[-/,+/" + _subIdField +
",+/" 189 + _bookmarkField +
"],timestamp");
193 cmd.
setOptions(
"select=[-/,+/" + _subIdField +
",+/" 194 + _bookmarkField +
"]");
197 _msIter = _stream.
begin();
216 return next(current_);
240 SOW_RECOVERY_HANDLE_EXCEPTION(
"SOWRecoveryPoint::next")
250 Field data = serialize(recoveryPoint_);
251 _client.
publish(_topic.data(), _topic.length(), data.
data(), data.
len());
253 SOW_RECOVERY_HANDLE_EXCEPTION(
"SOWRecoveryPoint::update")
262 +
"='" + _trackedName +
"'");
264 SOW_RECOVERY_HANDLE_EXCEPTION(
"SOWRecoveryPoint::purge")
274 + _trackedName +
"' and /" 278 SOW_RECOVERY_HANDLE_EXCEPTION(
"SOWRecoveryPoint::purge(subId)")
296 _pExceptionListener = pListener_;
306 if (!_client.isValid() || !_executed)
309 if (_closeClient && _client.isValid())
320 SOW_RECOVERY_HANDLE_EXCEPTION(
"SOWRecoveryPoint::close publishFlush")
330 SOW_RECOVERY_HANDLE_EXCEPTION(
"SOWRecoveryPoint::close disconnect")
333 void _initSerialization()
338 if (_serializeLen == 0)
340 _serializeLen = (size_t) (_nameField.length()
341 + _trackedName.length()
342 + _subIdField.length()
343 + _bookmarkField.length()
344 + (AMPS_MAX_BOOKMARK_LEN * 4UL)
345 + SUBID_LEN + JSON_EXTRA);
346 _serializeLen += (128 - (_serializeLen % 128));
348 _serializeBuffer =
new char[_serializeLen];
349 AMPS_snprintf(_serializeBuffer, _serializeLen,
350 "{\"%s\":\"%s\",\"%s\":\"", _nameField.c_str()
351 , _trackedName.c_str()
352 , _subIdField.c_str());
353 _serializeStart = JSON_START + _nameField.length()
354 + _trackedName.length() + _subIdField.length();
356 SOW_RECOVERY_HANDLE_EXCEPTION(
"SOWRecoveryPoint::initSerialization")
361 virtual void initSerialization()
363 _initSerialization();
371 const Field& timestamp_)
379 const char* start = (
const char*)memchr((
const void*)data_.
data(),
380 (int)
':', data_.
len());
383 throw StoreException(
"Failure parsing json RecoveryPoint subId, no :");
385 size_t remain = data_.
len() - (size_t)(start - data_.
data());
386 start = (
const char*)memchr((
const void*)start, (int)
'"', remain);
389 throw StoreException(
"Failure parsing json RecoveryPoint subId, no start \"");
392 remain = data_.
len() - (size_t)(start - data_.
data());
393 const char* end = (
const char*)memchr((
const void*)start,
397 throw StoreException(
"Failure parsing json RecoveryPoint subId, no end \"");
399 size_t len = (size_t)(end - start);
400 subId =
Field(start, len);
401 start = (
const char*)memchr((
const void*)start, (int)
':', data_.
len());
404 throw StoreException(
"Failure parsing json RecoveryPoint bookmark, no :");
406 remain = data_.
len() - (size_t)(start - data_.
data());
407 start = (
const char*)memchr((
const void*)start, (int)
'"', remain);
410 throw StoreException(
"Failure parsing json RecoveryPoint bookmark, no start \"");
413 remain = data_.
len() - (size_t)(start - data_.
data());
414 end = (
const char*)memchr((
const void*)start, (int)
'"', remain);
417 throw StoreException(
"Failure parsing json RecoveryPoint bookmark, no end \"");
419 len = (size_t)(end - start);
420 if (_useTimestamp && !timestamp_.
empty())
422 if (_deserializeLen < len + timestamp_.
len())
424 delete[] _deserializeBuffer;
425 _deserializeBuffer = 0;
427 if (!_deserializeBuffer)
429 _deserializeLen = len + timestamp_.
len() + 1;
430 _deserializeBuffer =
new char[_deserializeLen];
432 memcpy((
void*)_deserializeBuffer, (
const void*)start, len);
433 _deserializeBuffer[len] =
',';
434 memcpy((
void*)(_deserializeBuffer + len + 1),
435 (
const void*)timestamp_.
data(), timestamp_.
len());
436 bookmark =
Field(_deserializeBuffer, _deserializeLen);
440 bookmark =
Field(start, len);
443 SOW_RECOVERY_HANDLE_EXCEPTION(
"SOWRecoveryPoint::deserialize")
455 size_t fullLen = _serializeStart + subId.
len()
456 + _bookmarkField.length() + bookmark.
len() + JSON_END;
457 if (fullLen >= _serializeLen)
459 _serializeLen = fullLen + (128 - (fullLen % 128));
460 delete[] _serializeBuffer;
464 AMPS_snprintf(_serializeBuffer + _serializeStart,
465 _serializeLen - _serializeStart,
466 "%.*s\",\"%s\":\"%.*s\"}", (
int)subId.
len()
468 , _bookmarkField.c_str()
469 , (int)bookmark.
len()
471 _serializeField.assign(_serializeBuffer, fullLen);
473 SOW_RECOVERY_HANDLE_EXCEPTION(
"SOWRecoveryPoint::serialize")
474 return _serializeField;
477 enum Constants :
size_t 486 size_t _serializeLen;
487 size_t _serializeStart;
488 Field _serializeField;
489 char* _serializeBuffer;
490 size_t _deserializeLen;
491 char* _deserializeBuffer;
493 std::string _trackedName;
495 std::string _nameField;
496 std::string _subIdField;
497 std::string _bookmarkField;
498 unsigned _timeoutMillis;
501 std::shared_ptr<const ExceptionListener> _pExceptionListener;
504 bool _throwNotListen;
509 #endif //_SOWRECOVERYPOINTADAPTER_H_ Command & setOptions(const std::string &options_)
Sets the options string for this command: see Message.Options for a helper class for constructing the...
Definition: ampsplusplus.hpp:789
virtual void update(RecoveryPoint &recoveryPoint_)
Update the storage information with the given recovery point.
Definition: SOWRecoveryPointAdapter.hpp:246
Message encapsulates a single message sent to or received from an AMPS server, and provides methods f...
Definition: Message.hpp:539
iterator begin(void)
Returns an iterator representing the beginning of the topic or subscription.
Definition: ampsplusplus.hpp:5086
const char * data() const
Returns the (non-null-terminated) data underlying this field.
Definition: Field.hpp:273
Field getCommand() const
Retrieves the value of the Command header of the Message as a Field which references the underlying b...
Definition: Message.hpp:1257
MessageStream execute(Command &command_)
Execute the provided command and return messages received in response in a MessageStream.
Definition: ampsplusplus.hpp:8982
RecoveryPointAdapter virtual base class for implementing external storage of subscription recovery po...
Definition: SOWRecoveryPointAdapter.hpp:93
Command & setFilter(const std::string &filter_)
Definition: ampsplusplus.hpp:692
Client represents a connection to an AMPS server, but does not provide failover or reconnection behav...
Definition: ampsplusplus.hpp:5167
Provides access to the subId and bookmark needed to restart a subscription.
Definition: RecoveryPoint.hpp:67
RecoveryPointImpl virtual base class provides access to the subId and bookmark needed to restart a su...
Definition: RecoveryPoint.hpp:49
Command & setTimeout(unsigned timeout_)
Sets the client-side timeout for this command.
Definition: ampsplusplus.hpp:851
virtual void purge()
Remove all data from the storage.
Definition: SOWRecoveryPointAdapter.hpp:257
virtual bool next(RecoveryPoint &current_)
Recovery is done by iteration over elements in storage.
Definition: SOWRecoveryPointAdapter.hpp:175
bool empty() const
Returns 'true' if empty, 'false' otherwise.
Definition: Field.hpp:128
Defines the AMPS::Field class, which represents the value of a field in a message.
Core type, function, and class declarations for the AMPS C++ client.
size_t len() const
Returns the length of the data underlying this field.
Definition: Field.hpp:280
Provides AMPS::RecoveryPointAdapter, an interface for implementing external storage of bookmark subscr...
virtual void purge(const Field &subId_)
Remove the specified subId_ from the storage.
Definition: SOWRecoveryPointAdapter.hpp:269
void setExceptionListener(const std::shared_ptr< const ExceptionListener > &pListener_)
Set an exception listener on this adapter that will be notified of all exceptions that occur rather t...
Definition: SOWRecoveryPointAdapter.hpp:294
SOWRecoveryPointAdapter(const Client &storeClient_, const string &trackedClientName_, unsigned timeoutMillis_=5000, bool useTimestamp_=false, bool closeClient_=true, bool updateFailureThrows_=false, const string &topic_=AMPS_SOW_STORE_DEFAULT_TOPIC, const string &clientNameField_=AMPS_SOW_STORE_DEFAULT_CLIENT_FIELD, const string &subIdField_=AMPS_SOW_STORE_DEFAULT_SUB_FIELD, const string &bookmarkField_=AMPS_SOW_STORE_DEFAULT_BOOKMARK_FIELD)
Create a SOWRecoveryPointAdapter for a BookmarkStore that writes updated RecoveryPoints to the server...
Definition: SOWRecoveryPointAdapter.hpp:128
Provides AMPS::RecoveryPoint, AMPS::RecoveryPointFactory, AMPS::FixedRecoveryPoint, and AMPS::DynamicRecoveryPoint.
Represents an iterator over messages in an AMPS topic.
Definition: ampsplusplus.hpp:5041
void publishFlush(long timeout_=0, unsigned ackType_=Message::AckType::Processed)
Ensure that AMPS messages are sent and have been processed by the AMPS server.
Definition: ampsplusplus.hpp:5780
virtual void close()
Take any necessary actions to close the associated storage.
Definition: SOWRecoveryPointAdapter.hpp:282
Field represents the value of a single field in a Message.
Definition: Field.hpp:86
An iterable object representing the results of an AMPS subscription and/or query. ...
Definition: ampsplusplus.hpp:5033
Field getTimestamp() const
Retrieves the value of the Timestamp header of the Message as a Field which references the underlying...
Definition: Message.hpp:1492
Field getData() const
Returns the data from this message.
Definition: Message.hpp:1522
FixedRecoveryPoint is a RecoveryPoint implementation where subId and bookmark are set explicitly...
Definition: RecoveryPoint.hpp:133
Command & setTopic(const std::string &topic_)
Definition: ampsplusplus.hpp:679
void disconnect()
Disconnect from an AMPS server.
Definition: ampsplusplus.hpp:5386
RecoveryPointAdapterImpl virtual base class for implementing external storage of subscription recover...
Definition: RecoveryPointAdapter.hpp:52
amps_uint64_t publish(const std::string &topic_, const std::string &data_)
Publish a message to an AMPS topic, returning the sequence number assigned by the publish store if on...
Definition: ampsplusplus.hpp:5658
Definition: ampsplusplus.hpp:103
const Field & getSubId() const
Get the sub id for this recovery point.
Definition: RecoveryPoint.hpp:84
MessageStream timeout(unsigned timeout_)
Sets the maximum time to wait for the next message in milliseconds; if no message is available within...
Definition: ampsplusplus.hpp:8953
const std::string & getName() const
Returns the name of this client passed in the constructor.
Definition: ampsplusplus.hpp:5229
Command is an encapsulation of a single AMPS command sent by the client.
Definition: ampsplusplus.hpp:472
std::string sowDelete(const MessageHandler &messageHandler, const std::string &topic, const std::string &filter, long timeout)
Deletes one or more messages from a topic's SOW cache.
Definition: ampsplusplus.hpp:6822
const Field & getBookmark() const
Get the bookmark for this recovery point.
Definition: RecoveryPoint.hpp:91