AMPS C/C++ Client Class Reference
AMPS C/C++ Client Version 5.3.4.4
MemorySubscriptionManager.hpp
Go to the documentation of this file.
1 //
3 // Copyright (c) 2010-2024 60East Technologies Inc., All Rights Reserved.
4 //
5 // This computer software is owned by 60East Technologies Inc. and is
6 // protected by U.S. copyright laws and other laws and by international
7 // treaties. This computer software is furnished by 60East Technologies
8 // Inc. pursuant to a written license agreement and may be used, copied,
9 // transmitted, and stored only in accordance with the terms of such
10 // license agreement and with the inclusion of the above copyright notice.
11 // This computer software or any other copies thereof may not be provided
12 // or otherwise made available to any other person.
13 //
14 // U.S. Government Restricted Rights. This computer software: (a) was
15 // developed at private expense and is in all respects the proprietary
16 // information of 60East Technologies Inc.; (b) was not developed with
17 // government funds; (c) is a trade secret of 60East Technologies Inc.
18 // for all purposes of the Freedom of Information Act; and (d) is a
19 // commercial item and thus, pursuant to Section 12.212 of the Federal
20 // Acquisition Regulations (FAR) and DFAR Supplement Section 227.7202,
21 // Government's use, duplication or disclosure of the computer software
22 // is subject to the restrictions set forth by 60East Technologies Inc..
23 //
25 
26 #ifndef _MEMORYSUBSCRIPTIONMANAGER_H_
27 #define _MEMORYSUBSCRIPTIONMANAGER_H_
28 
29 #include <amps/ampsplusplus.hpp>
30 #include <amps/Field.hpp>
31 #include <algorithm>
32 #ifdef AMPS_USE_FUNCTIONAL
33  #include <forward_list>
34 #endif
35 #include <list>
36 #include <map>
37 #include <memory>
38 #include <set>
39 
44 
45 namespace AMPS
46 {
47 
52  {
53  protected:
54 
55  class SubscriptionInfo
56  {
57  public:
58  SubscriptionInfo(MessageHandler messageHandler_,
59  const Message& message_,
60  unsigned requestedAckTypes_)
61  : _handler(messageHandler_)
62  , _m(message_)
63  , _subId(message_.getSubscriptionId())
64  , _requestedAckTypes(requestedAckTypes_)
65  , _useBookmark(!message_.getBookmark().empty())
66  , _clearSubId(false)
67  {
68  std::string options = _m.getOptions();
69  size_t replace = options.find("replace");
70  // AMPS should be ok if options contains ,,
71  static const size_t replaceLen = 7;
72  if (replace != std::string::npos)
73  {
74  options.erase(replace, replaceLen);
75  _m.setOptions(options);
76  }
77  _paused = (options.find("pause") != std::string::npos);
78  }
79 
80  ~SubscriptionInfo()
81  {
82  if (_clearSubId)
83  {
84  _m.getSubscriptionId().clear();
85  }
86  }
87 
88  void resubscribe(Client& client_, int timeout_, void* userData_)
89  {
90  // A previous NotEntitledException could have set userid on
91  // the message and that field will no longer be valid.
92  _m.setUserId((const char*)0, 0);
93  if (_useBookmark)
94  {
95  // Use the same bookmark for all members of a pause group
96  if (_paused && !_recent.empty())
97  {
98  _m.setBookmark(_recent);
99  }
100  else
101  {
102  _m.setBookmark(client_.getBookmarkStore().getMostRecent(_subId));
103  }
104  }
105  _m.newCommandId();
106  _m.setAckTypeEnum(_requestedAckTypes);
107  if (!userData_)
108  {
109  client_.send(_handler, _m, timeout_);
110  }
111  else
112  {
113  MessageHandler handler(_handler.function(), userData_);
114  client_.send(handler, _m, timeout_);
115  }
116  }
117 
118  Message message() const
119  {
120  return _m;
121  }
122 
123  MessageHandler messageHandler() const
124  {
125  return _handler;
126  }
127 
128  const Message::Field& subId() const
129  {
130  return _subId;
131  }
132 
133  unsigned requestedAcks() const
134  {
135  return _requestedAckTypes;
136  }
137 
138  // Returns true if the last subId is removed, false otherwise
139  bool removeSubId(const Message::Field& subId_)
140  {
141  size_t subIdLen = subId_.len();
142  const char* subIdData = subId_.data();
143  while (subIdLen && *subIdData == ',')
144  {
145  ++subIdData;
146  --subIdLen;
147  }
148  while (subIdLen && subIdData[subIdLen - 1] == ',')
149  {
150  --subIdLen;
151  }
152  if (subIdLen == 0 || subIdLen > _subId.len())
153  {
154  return _subId.empty();
155  }
156  bool match = true;
157  size_t matchStart = 0;
158  size_t matchCount = 0;
159  for (size_t i = 0; i < _subId.len(); ++i)
160  {
161  if (_subId.data()[i] == ',')
162  {
163  if (matchCount == subIdLen)
164  {
165  break;
166  }
167  matchStart = i + 1;
168  matchCount = 0;
169  match = true;
170  }
171  else if (match)
172  {
173  if (_subId.data()[i] == subIdData[matchCount])
174  {
175  ++matchCount;
176  }
177  else
178  {
179  matchCount = 0;
180  match = false;
181  }
182  }
183  }
184  if (match && matchCount == subIdLen)
185  {
186  size_t newLen = _subId.len() - matchCount;
187  if (newLen > 1) // More than just ,
188  {
189  while (matchStart + matchCount < _subId.len() &&
190  _subId.data()[matchStart + matchCount] == ',')
191  {
192  ++matchCount;
193  --newLen;
194  }
195  char* buffer = new char[newLen];
196  // Match is not first
197  if (matchStart > 0)
198  {
199  memcpy(buffer, _subId.data(), matchStart);
200  }
201  // Match is not last
202  if (matchStart + matchCount < _subId.len())
203  {
204  memcpy(buffer + matchStart,
205  _subId.data() + matchStart + matchCount,
206  _subId.len() - matchStart - matchCount);
207  }
208  if (_clearSubId)
209  {
210  _m.getSubscriptionId().clear();
211  }
212  else
213  {
214  _clearSubId = true;
215  }
216  _m.assignSubscriptionId(buffer, newLen);
217  _subId = _m.getSubscriptionId();
218  return false;
219  }
220  else
221  {
222  if (_clearSubId)
223  {
224  _m.getSubscriptionId().clear();
225  _clearSubId = false;
226  }
227  else
228  {
229  _m.getSubscriptionId().assign(NULL, 0);
230  }
231  _subId = _m.getSubscriptionId();
232  return true;
233  }
234  }
235  return _subId.empty();
236  }
237 
238  bool paused() const
239  {
240  return _paused;
241  }
242 
243  void pause()
244  {
245  if (_paused)
246  {
247  return;
248  }
249  std::string opts(Message::Options::Pause());
250  opts.append(_m.getOptions());
251  _m.setOptions(opts);
252  _paused = true;
253  }
254 
255  std::string getMostRecent(Client& client_)
256  {
257  if (!_recent.empty())
258  {
259  return _recent;
260  }
261  std::map<amps_uint64_t, amps_uint64_t> publishers;
262  const char* start = _subId.data();
263  const char* end = _subId.data() + _subId.len();
264  while (start < end)
265  {
266  const char* comma = (const char*)memchr(start, ',',
267  (size_t)(end - start));
268  // No more commas found, just use start->end
269  if (!comma)
270  {
271  comma = end;
272  }
273  if (comma == start)
274  {
275  start = comma + 1;
276  continue;
277  }
278  Message::Field sid(start, (size_t)(comma - start));
279  Message::Field sidRecent = client_.getBookmarkStore().getMostRecent(sid);
280  const char* sidRecentStart = sidRecent.data();
281  const char* sidRecentEnd = sidRecent.data() + sidRecent.len();
282  while (sidRecentStart < sidRecentEnd)
283  {
284  const char* sidRecentComma = (const char*)
285  memchr(sidRecentStart, ',',
286  (size_t)(sidRecentEnd - sidRecentStart));
287  // No more commas found, just use start->end
288  if (!sidRecentComma)
289  {
290  sidRecentComma = sidRecentEnd;
291  }
292  if (sidRecentComma == sidRecentStart)
293  {
294  sidRecentStart = sidRecentComma + 1;
295  continue;
296  }
297  Message::Field bookmark(sidRecentStart,
298  (size_t)(sidRecentComma - sidRecentStart));
299  amps_uint64_t publisher = (amps_uint64_t)0;
300  amps_uint64_t seq = (amps_uint64_t)0;
301  Field::parseBookmark(bookmark, publisher, seq);
302  if (publishers.count(publisher) == 0
303  || publishers[publisher] > seq)
304  {
305  publishers[publisher] = seq;
306  }
307  // Move past comma
308  sidRecentStart = sidRecentComma + 1;
309  }
310  // Move past comma
311  start = comma + 1;
312  }
313  std::ostringstream os;
314  for (std::map<amps_uint64_t, amps_uint64_t>::iterator i = publishers.begin();
315  i != publishers.end();
316  ++i)
317  {
318  if (i->first == 0 && i->second == 0)
319  {
320  continue;
321  }
322  if (os.tellp() > 0)
323  {
324  os << ',';
325  }
326  os << i->first << '|' << i->second << "|";
327  }
328  _recent = os.str();
329  return _recent;
330  }
331 
332  void setMostRecent(const std::string& recent_)
333  {
334  _recent = recent_;
335  }
336 
337  private:
338  std::string _recent;
339  MessageHandler _handler;
340  Message _m;
341  Message::Field _subId;
342  unsigned _requestedAckTypes;
343  bool _useBookmark;
344  bool _paused;
345  bool _clearSubId;
346 
347  };//class SubscriptionInfo
348 
349  typedef std::map<SubscriptionInfo*, AMPSException> FailedResubscribeMap;
350 
351  class Resubscriber
352  {
353  Client& _client;
354  int _timeout;
355  public:
356  FailedResubscribeMap* _failures;
357 
358  Resubscriber(Client& client_, int timeout_)
359  : _client(client_)
360  , _timeout(timeout_)
361  {
362  _failures = new FailedResubscribeMap();
363  }
364  // We want the same pointer
365  Resubscriber(const Resubscriber& rhs_)
366  : _client(rhs_._client)
367  , _timeout(rhs_._timeout)
368  , _failures(rhs_._failures)
369  { }
370  void operator()(std::pair<Message::Field, SubscriptionInfo*> iter_)
371  {
372  void* data = amps_invoke_copy_route_function(iter_.second->messageHandler().userData());
373  iter_.second->resubscribe(_client, _timeout, data);
374  }
375  void operator()(SubscriptionInfo* iter_)
376  {
377  try
378  {
379  void* data = amps_invoke_copy_route_function(iter_->messageHandler().userData());
380  iter_->resubscribe(_client, _timeout, data);
381  }
382  catch (const AMPSException& ex)
383  {
384  _failures->insert(std::make_pair(iter_, ex));
385  }
386  }
387  };
388 
389  class Deleter
390  {
391  bool _clearSubId;
392  public:
393  Deleter(bool clearSubId_ = false)
394  : _clearSubId(clearSubId_)
395  { }
396  void operator()(std::pair<Message::Field, SubscriptionInfo*> iter_)
397  {
398  if (_clearSubId)
399  {
400  iter_.first.clear();
401  }
402  else
403  {
404  amps_invoke_remove_route_function(iter_.second->messageHandler().userData());
405  delete iter_.second;
406  }
407  }
408  void operator()(SubscriptionInfo* iter_)
409  {
410  delete iter_;
411  }
412  };
413 
414  virtual SubscriptionInfo* createSubscriptionInfo(MessageHandler messageHandler_,
415  const Message& message_,
416  unsigned requestedAckTypes_)
417  {
418  return new SubscriptionInfo(messageHandler_, message_,
419  requestedAckTypes_);
420  }
421 
422  public:
423  typedef std::map<Message::Field, SubscriptionInfo*, Message::Field::FieldHash> SubscriptionMap;
424 
426  : _resubscribing(0)
427  , _resubscriptionTimeout(getDefaultResubscriptionTimeout())
428  { ; }
429 
431  {
432  _clear();
433  }
434 
441  void subscribe(MessageHandler messageHandler_,
442  const Message& message_, unsigned requestedAckTypes_)
443  {
444  const Message::Field& subId = message_.getSubscriptionId();
445  if (!subId.empty())
446  {
447  Lock<Mutex> l(_lock);
448  while (_resubscribing != 0)
449  {
450  _lock.wait(10);
451  }
452  std::string options = message_.getOptions();
453  if (options.find("resume") != std::string::npos)
454  {
455  // For a resume, we store each sub id with a single Subscription
456  SubscriptionInfo* subInfo = createSubscriptionInfo(MessageHandler(),
457  message_,
458  requestedAckTypes_);
459  bool saved = false;
460  Field fullSubId = subInfo->subId();
461  const char* start = fullSubId.data();
462  const char* end = fullSubId.data() + fullSubId.len();
463  while (start < end)
464  {
465  const char* comma = (const char*)memchr(start, ',',
466  (size_t)(end - start));
467  // No more commas found, just use start->end
468  if (!comma)
469  {
470  comma = end;
471  }
472  if (comma == start)
473  {
474  start = comma + 1;
475  continue;
476  }
477  Message::Field sid = Message::Field(start,
478  (size_t)(comma - start));
479  // Calling resume on something already resumed is ignored,
480  // so don't update anything that exists.
481  if (_resumed.find(sid) == _resumed.end())
482  {
483  _resumed[sid.deepCopy()] = subInfo;
484  saved = true;
485  }
486  // Move past comma
487  start = comma + 1;
488  }
489  if (saved)
490  {
491  _resumedSet.insert(subInfo);
492  }
493  else
494  {
495  delete subInfo;
496  }
497  }
498  else if (options.find("pause") != std::string::npos)
499  {
500  const char* start = subId.data();
501  const char* end = subId.data() + subId.len();
502  while (start < end)
503  {
504  MessageHandler messageHandler = messageHandler_;
505  const char* comma = (const char*)memchr(start, ',',
506  (size_t)(end - start));
507  // No more commas found, just use start->end
508  if (!comma)
509  {
510  comma = end;
511  }
512  if (comma == start)
513  {
514  start = comma + 1;
515  continue;
516  }
517  Message::Field sid = Message::Field(start,
518  (size_t)(comma - start));
519  SubscriptionMap::iterator resume = _resumed.find(sid);
520  if (resume != _resumed.end())
521  {
522  SubscriptionInfo* subPtr = resume->second;
523  Message::Field subField(resume->first);
524  _resumed.erase(resume); // Remove mapping for sid
525  subField.clear();
526  // If last subId, remove completely
527  if (subPtr->removeSubId(sid))
528  {
529  _resumedSet.erase(subPtr);
530  delete subPtr;
531  }
532  }
533  // Move past comma
534  start = comma + 1;
535  SubscriptionMap::iterator item = _active.find(sid);
536  if (item != _active.end())
537  {
538  if (options.find("replace") != std::string::npos)
539  {
540  messageHandler = item->second->messageHandler();
541  delete item->second;
542  _active.erase(item);
543  }
544  else
545  {
546  item->second->pause();
547  continue; // Leave current one
548  }
549  }
550  else
551  {
552  Unlock<Mutex> u(_lock);
553  void* data = amps_invoke_copy_route_function(
554  messageHandler_.userData());
555  if (data)
556  {
557  messageHandler = MessageHandler(messageHandler_.function(), data);
558  }
559  }
560  Message m = message_.deepCopy();
561  m.setSubscriptionId(sid.data(), sid.len());
562  SubscriptionInfo* s = createSubscriptionInfo(messageHandler, m,
563  requestedAckTypes_);
564  // Insert using the subId from s, which is deep copy of original
565  _active[s->subId()] = s;
566  }
567  }
568  else // Not a pause or resume
569  {
570  MessageHandler messageHandler = messageHandler_;
571  SubscriptionMap::iterator item = _active.find(subId);
572  if (item != _active.end())
573  {
574  messageHandler = item->second->messageHandler();
575  delete item->second;
576  _active.erase(item);
577  }
578  else
579  {
580  Unlock<Mutex> u(_lock);
581  void* data = amps_invoke_copy_route_function(
582  messageHandler_.userData());
583  if (data)
584  {
585  messageHandler = MessageHandler(messageHandler_.function(), data);
586  }
587  }
588  SubscriptionInfo* s = createSubscriptionInfo(messageHandler,
589  message_,
590  requestedAckTypes_);
591  // Insert using the subId from s, which is deep copy of original
592  _active[s->subId()] = s;
593  }
594  }
595  }
596 
600  void unsubscribe(const Message::Field& subId_)
601  {
602  Lock<Mutex> l(_lock);
603  SubscriptionMap::iterator item = _active.find(subId_);
604  if (item != _active.end())
605  {
606  SubscriptionInfo* subPtr = item->second;
607  _active.erase(item);
608  while (_resubscribing != 0)
609  {
610  _lock.wait(10);
611  }
612  Unlock<Mutex> u(_lock);
613  amps_invoke_remove_route_function(subPtr->messageHandler().userData());
614  delete subPtr;
615  }
616  item = _resumed.find(subId_);
617  if (item != _resumed.end())
618  {
619  SubscriptionInfo* subPtr = item->second;
620  Message::Field subField(item->first);
621  _resumed.erase(item);
622  subField.clear();
623  // If last subId, remove completely
624  if (subPtr->removeSubId(subId_))
625  {
626  _resumedSet.erase(subPtr);
627  while (_resubscribing != 0)
628  {
629  _lock.wait(10);
630  }
631  delete subPtr;
632  }
633  }
634  }
635 
638  void clear()
639  {
640  _clear();
641  }
642 
645  void _clear()
646  {
647  Lock<Mutex> l(_lock);
648  while (_resubscribing != 0)
649  {
650  _lock.wait(10);
651  }
652  // Settting _resubscribing keeps other threads from touching data
653  // even if lock isn't held. Don't want to hold lock when
654  // amps_invoke_remove_route_function is called.
655  AtomicFlagFlip resubFlip(&_resubscribing);
656  {
657  Unlock<Mutex> u(_lock);
658  std::for_each(_active.begin(), _active.end(), Deleter());
659  std::for_each(_resumedSet.begin(), _resumedSet.end(), Deleter());
660  std::for_each(_resumed.begin(), _resumed.end(), Deleter(true));
661  }
662  _active.clear();
663  _resumed.clear();
664  _resumedSet.clear();
665  }
666 
670  void resubscribe(Client& client_)
671  {
672  // At this point, it's better to throw an exception back to disconnect
673  // handling than to attempt a reconnect in send, so turn off retry.
674  bool retry = client_.getRetryOnDisconnect();
675  client_.setRetryOnDisconnect(false);
676 #ifdef AMPS_USE_FUNCTIONAL
677  std::forward_list<SubscriptionInfo*> subscriptions;
678 #else
679  std::list<SubscriptionInfo*> subscriptions;
680 #endif
681  Resubscriber resubscriber(client_, _resubscriptionTimeout);
682  try
683  {
684  AtomicFlagFlip resubFlip(&_resubscribing);
685  {
686  Lock<Mutex> l(_lock);
687  subscriptions.assign(_resumedSet.begin(), _resumedSet.end());
688  for (SubscriptionMap::iterator iter = _active.begin();
689  iter != _active.end(); ++iter)
690  {
691  SubscriptionInfo* sub = iter->second;
692  if (sub->paused())
693  {
694  SubscriptionMap::iterator resIter = _resumed.find(sub->subId());
695  // All pause subs resuming together should be sent with
696  // bookmark as list of the resumes' most recents
697  if (resIter != _resumed.end())
698  {
699  sub->setMostRecent(resIter->second->getMostRecent(client_));
700  }
701  }
702  subscriptions.push_front(iter->second);
703  }
704  }
705  std::for_each(subscriptions.begin(), subscriptions.end(),
706  resubscriber);
707  std::vector<SubscriptionInfo*> removals;
708  bool throwExcept = false;
709  AMPSException except("None", AMPS_E_OK);
710  if (_failedResubscribeHandler)
711  {
712  try
713  {
714  for (auto failedSub = resubscriber._failures->begin();
715  failedSub != resubscriber._failures->end(); ++failedSub)
716  {
717  SubscriptionInfo* pSubInfo = failedSub->first;
718  if (_failedResubscribeHandler->failure(pSubInfo->message(),
719  pSubInfo->messageHandler(),
720  pSubInfo->requestedAcks(),
721  failedSub->second))
722  {
723  removals.push_back(pSubInfo);
724  }
725  else // We'll rethrow an exception for failure left in place
726  {
727  except = failedSub->second;
728  throwExcept = true;
729  }
730  }
731  }
732  catch (const AMPSException& ex_)
733  {
734  except = ex_;
735  throwExcept = true;
736  }
737  catch (const std::exception& ex_)
738  {
739  except = AMPSException(ex_.what(), AMPS_E_RETRY);
740  throwExcept = true;
741  }
742  catch (...)
743  {
744  except = AMPSException("Unknown Exception thrown by FailedResubscribeHandler", AMPS_E_RETRY);
745  throwExcept = true;
746  }
747  }
748  else
749  {
750  throwExcept = !resubscriber._failures->empty();
751  if (throwExcept)
752  {
753  except = resubscriber._failures->begin()->second;
754  }
755  }
756  // Remove any failiures that should be removed
757  if (_failedResubscribeHandler && !removals.empty())
758  {
759  Lock<Mutex> l(_lock);
760  for (std::vector<SubscriptionInfo*>::iterator pSubInfo = removals.begin();
761  pSubInfo != removals.end(); ++pSubInfo)
762  {
763  _active.erase((*pSubInfo)->subId());
764  _resumed.erase((*pSubInfo)->subId());
765  _resumedSet.erase(*pSubInfo);
766  delete *pSubInfo;
767  }
768  }
769  // Throw an exception not removed
770  if (throwExcept)
771  {
772  throw except;
773  }
774  delete resubscriber._failures;
775  resubscriber._failures = 0;
776  client_.setRetryOnDisconnect(retry);
777  }
778  catch (const AMPSException&)
779  {
780  delete resubscriber._failures;
781  resubscriber._failures = 0;
782  client_.setRetryOnDisconnect(retry);
783  throw;
784  }
785  catch (const std::exception&)
786  {
787  delete resubscriber._failures;
788  resubscriber._failures = 0;
789  client_.setRetryOnDisconnect(retry);
790  throw;
791  }
792  }
793 
797  void setResubscriptionTimeout(int timeout_)
798  {
799  if (timeout_ >= 0)
800  {
801  _resubscriptionTimeout = timeout_;
802  }
803  }
804 
809  {
810  return _resubscriptionTimeout;
811  }
812 
817  static int setDefaultResubscriptionTimeout(int timeout_)
818  {
819  static int _defaultResubscriptionTimeout =
820  AMPS_SUBSCRIPTION_MANAGER_DEFAULT_TIMEOUT;
821  if (timeout_ >= 0)
822  {
823  _defaultResubscriptionTimeout = timeout_;
824  }
825  return _defaultResubscriptionTimeout;
826  }
827 
833  {
835  }
836 
837  private:
838 
839  SubscriptionMap _active;
840  SubscriptionMap _resumed;
841  std::set<SubscriptionInfo*> _resumedSet;
842  Mutex _lock;
843  AMPS_ATOMIC_TYPE_8 _resubscribing;
844  int _resubscriptionTimeout;
845  }; //class MemorySubscriptionManager
846 
847 } // namespace AMPS
848 
849 #endif //_MEMORYSUBSCRIPTIONMANAGER_H_
850 
Field getSubscriptionId() const
Retrieves the value of the SubscriptionId header of the Message as a new Field.
Definition: Message.hpp:1427
Message deepCopy(void) const
Returns a deep copy of self.
Definition: Message.hpp:579
int getResubscriptionTimeout(void)
Gets the timeout used when trying to resubscribe after disconnect.
Definition: MemorySubscriptionManager.hpp:808
Message encapsulates a single message sent to or received from an AMPS server, and provides methods f...
Definition: Message.hpp:531
static int getDefaultResubscriptionTimeout(void)
Gets the default timeout used by new MemorySubscriptionManager objects when trying to resubscribe aft...
Definition: MemorySubscriptionManager.hpp:832
void send(const Message &message)
Sends a Message to the connected AMPS server, performing only minimal validation and bypassing client...
Definition: ampsplusplus.hpp:5274
void setRetryOnDisconnect(bool isRetryOnDisconnect_)
Enables or disables automatic retry of a command to AMPS after a reconnect.
Definition: ampsplusplus.hpp:7368
void setResubscriptionTimeout(int timeout_)
Sets the timeout used when trying to resubscribe after disconnect.
Definition: MemorySubscriptionManager.hpp:797
void clear()
Deletes the data associated with this Field, should only be used on Fields that were created as deepC...
Definition: Field.hpp:247
Abstract base class to manage all subscriptions placed on a client so that they can be re-established...
Definition: ampsplusplus.hpp:1453
Success.
Definition: amps.h:221
Field getOptions() const
Retrieves the value of the Options header of the Message as a new Field.
Definition: Message.hpp:1316
const char * data() const
Returns the (non-null-terminated) data underlying this field.
Definition: Field.hpp:260
Client represents a connection to an AMPS server, but does not provide failover or reconnection behav...
Definition: ampsplusplus.hpp:5069
void _clear()
Clear all subscriptions from the manager.
Definition: MemorySubscriptionManager.hpp:645
Message & setSubscriptionId(const std::string &v)
Sets the value of the SubscriptionId header for this Message.
Definition: Message.hpp:1427
void resubscribe(Client &client_)
Place all saved subscriptions on the provided Client.
Definition: MemorySubscriptionManager.hpp:670
bool empty() const
Returns &#39;true&#39; if empty, &#39;false&#39; otherwise.
Definition: Field.hpp:128
Defines the AMPS::Field class, which represents the value of a field in a message.
Core type, function, and class declarations for the AMPS C++ client.
size_t len() const
Returns the length of the data underlying this field.
Definition: Field.hpp:267
BookmarkStore getBookmarkStore()
Get the bookmark store being used by the client.
Definition: ampsplusplus.hpp:5395
void subscribe(MessageHandler messageHandler_, const Message &message_, unsigned requestedAckTypes_)
Save a subscription so it can be placed again if a disconnect occurs.
Definition: MemorySubscriptionManager.hpp:441
void clear()
Clear all subscriptions from the manager.
Definition: MemorySubscriptionManager.hpp:638
Field represents the value of a single field in a Message.
Definition: Field.hpp:86
The operation has not succeeded, but ought to be retried.
Definition: amps.h:245
void unsubscribe(const Message::Field &subId_)
Remove the subscription from the manager.
Definition: MemorySubscriptionManager.hpp:600
void deepCopy(const Field &orig_)
Makes self a deep copy of the original field.
Definition: Field.hpp:218
A SubscriptionManager implementation that maintains subscriptions placed in memory so that they can b...
Definition: MemorySubscriptionManager.hpp:51
Definition: ampsplusplus.hpp:102
static int setDefaultResubscriptionTimeout(int timeout_)
Sets the default timeout used by new MemorySubscriptionManager objects when trying to resubscribe aft...
Definition: MemorySubscriptionManager.hpp:817
Field getBookmark() const
Retrieves the value of the Bookmark header of the Message as a new Field.
Definition: Message.hpp:1194
Message::Field getMostRecent(const Message::Field &subId_)
Returns the most recent bookmark from the log that ought to be used for (re-)subscriptions.
Definition: BookmarkStore.hpp:310
bool getRetryOnDisconnect(void) const
Returns true if automatic retry of a command to AMPS after a reconnect is enabled.
Definition: ampsplusplus.hpp:7377