AMPS C/C++ Client Class Reference
AMPS C/C++ Client Version 5.3.5.0
MemorySubscriptionManager.hpp
Go to the documentation of this file.
1 //
3 // Copyright (c) 2010-2025 60East Technologies Inc., All Rights Reserved.
4 //
5 // This computer software is owned by 60East Technologies Inc. and is
6 // protected by U.S. copyright laws and other laws and by international
7 // treaties. This computer software is furnished by 60East Technologies
8 // Inc. pursuant to a written license agreement and may be used, copied,
9 // transmitted, and stored only in accordance with the terms of such
10 // license agreement and with the inclusion of the above copyright notice.
11 // This computer software or any other copies thereof may not be provided
12 // or otherwise made available to any other person.
13 //
14 // U.S. Government Restricted Rights. This computer software: (a) was
15 // developed at private expense and is in all respects the proprietary
16 // information of 60East Technologies Inc.; (b) was not developed with
17 // government funds; (c) is a trade secret of 60East Technologies Inc.
18 // for all purposes of the Freedom of Information Act; and (d) is a
19 // commercial item and thus, pursuant to Section 12.212 of the Federal
20 // Acquisition Regulations (FAR) and DFAR Supplement Section 227.7202,
21 // Government's use, duplication or disclosure of the computer software
22 // is subject to the restrictions set forth by 60East Technologies Inc..
23 //
25 
26 #ifndef _MEMORYSUBSCRIPTIONMANAGER_H_
27 #define _MEMORYSUBSCRIPTIONMANAGER_H_
28 
29 #include <amps/ampsplusplus.hpp>
30 #include <amps/Field.hpp>
31 #include <algorithm>
32 #ifdef AMPS_USE_FUNCTIONAL
33  #include <forward_list>
34 #endif
35 #include <list>
36 #include <map>
37 #include <memory>
38 #include <set>
39 
44 
45 namespace AMPS
46 {
47 
52  {
53  protected:
54 
55  class SubscriptionInfo
56  {
57  public:
58  SubscriptionInfo(MessageHandler messageHandler_,
59  const Message& message_,
60  unsigned requestedAckTypes_)
61  : _handler(messageHandler_)
62  , _m(message_)
63  , _subId(message_.getSubscriptionId())
64  , _requestedAckTypes(requestedAckTypes_)
65  , _useBookmark(!message_.getBookmark().empty())
66  , _clearSubId(false)
67  {
68  std::string options = _m.getOptions();
69  size_t replace = options.find("replace");
70  // AMPS should be ok if options contains ,,
71  static const size_t replaceLen = 7;
72  if (replace != std::string::npos)
73  {
74  options.erase(replace, replaceLen);
75  _m.setOptions(options);
76  }
77  _paused = (options.find("pause") != std::string::npos);
78  }
79 
80  ~SubscriptionInfo()
81  {
82  if (_clearSubId)
83  {
84  _m.getSubscriptionId().clear();
85  }
86  }
87 
88  void resubscribe(Client& client_, int timeout_, void* userData_)
89  {
90  // A previous NotEntitledException could have set userid on
91  // the message and that field will no longer be valid.
92  _m.setUserId((const char*)0, 0);
93  if (_useBookmark)
94  {
95  // Use the same bookmark for all members of a pause group
96  if (_paused && !_recent.empty())
97  {
98  _m.setBookmark(_recent);
99  }
100  else
101  {
102  _m.assignOwnershipBookmark(client_.getBookmarkStore().getMostRecent(_subId));
103  }
104  }
105  _m.newCommandId();
106  _m.setAckTypeEnum(_requestedAckTypes);
107  if (!userData_)
108  {
109  client_.send(_handler, _m, timeout_);
110  }
111  else
112  {
113  MessageHandler handler(_handler.function(), userData_);
114  client_.send(handler, _m, timeout_);
115  }
116  }
117 
118  Message message() const
119  {
120  return _m;
121  }
122 
123  MessageHandler messageHandler() const
124  {
125  return _handler;
126  }
127 
128  const Message::Field& subId() const
129  {
130  return _subId;
131  }
132 
133  unsigned requestedAcks() const
134  {
135  return _requestedAckTypes;
136  }
137 
138  // Returns true if the last subId is removed, false otherwise
139  bool removeSubId(const Message::Field& subId_)
140  {
141  size_t subIdLen = subId_.len();
142  const char* subIdData = subId_.data();
143  while (subIdLen && *subIdData == ',')
144  {
145  ++subIdData;
146  --subIdLen;
147  }
148  while (subIdLen && subIdData[subIdLen - 1] == ',')
149  {
150  --subIdLen;
151  }
152  if (subIdLen == 0 || subIdLen > _subId.len())
153  {
154  return _subId.empty();
155  }
156  bool match = true;
157  size_t matchStart = 0;
158  size_t matchCount = 0;
159  for (size_t i = 0; i < _subId.len(); ++i)
160  {
161  if (_subId.data()[i] == ',')
162  {
163  if (matchCount == subIdLen)
164  {
165  break;
166  }
167  matchStart = i + 1;
168  matchCount = 0;
169  match = true;
170  }
171  else if (match)
172  {
173  if (_subId.data()[i] == subIdData[matchCount])
174  {
175  ++matchCount;
176  }
177  else
178  {
179  matchCount = 0;
180  match = false;
181  }
182  }
183  }
184  if (match && matchCount == subIdLen)
185  {
186  size_t newLen = _subId.len() - matchCount;
187  if (newLen > 1) // More than just ,
188  {
189  while (matchStart + matchCount < _subId.len() &&
190  _subId.data()[matchStart + matchCount] == ',')
191  {
192  ++matchCount;
193  --newLen;
194  }
195  char* buffer = (char*)malloc(newLen);
196  // Match is not first
197  if (matchStart > 0)
198  {
199  memcpy(buffer, _subId.data(), matchStart);
200  }
201  // Match is not last
202  if (matchStart + matchCount < _subId.len())
203  {
204  memcpy(buffer + matchStart,
205  _subId.data() + matchStart + matchCount,
206  _subId.len() - matchStart - matchCount);
207  }
208  if (_clearSubId)
209  {
210  _m.getSubscriptionId().clear();
211  }
212  else
213  {
214  _clearSubId = true;
215  }
216  _m.assignSubscriptionId(buffer, newLen);
217  _subId = _m.getSubscriptionId();
218  return false;
219  }
220  else
221  {
222  if (_clearSubId)
223  {
224  _m.getSubscriptionId().clear();
225  _clearSubId = false;
226  }
227  else
228  {
229  _m.getSubscriptionId().assign(NULL, 0);
230  }
231  _subId = _m.getSubscriptionId();
232  return true;
233  }
234  }
235  return _subId.empty();
236  }
237 
238  bool paused() const
239  {
240  return _paused;
241  }
242 
243  void pause()
244  {
245  if (_paused)
246  {
247  return;
248  }
249  std::string opts(Message::Options::Pause());
250  opts.append(_m.getOptions());
251  _m.setOptions(opts);
252  _paused = true;
253  }
254 
255  std::string getMostRecent(Client& client_)
256  {
257  if (!_recent.empty())
258  {
259  return _recent;
260  }
261  std::map<amps_uint64_t, amps_uint64_t> publishers;
262  const char* start = _subId.data();
263  const char* end = _subId.data() + _subId.len();
264  while (start < end)
265  {
266  const char* comma = (const char*)memchr(start, ',',
267  (size_t)(end - start));
268  // No more commas found, just use start->end
269  if (!comma)
270  {
271  comma = end;
272  }
273  if (comma == start)
274  {
275  start = comma + 1;
276  continue;
277  }
278  Message::Field sid(start, (size_t)(comma - start));
279  Message::Field sidRecent = client_.getBookmarkStore().getMostRecent(sid);
280  const char* sidRecentStart = sidRecent.data();
281  const char* sidRecentEnd = sidRecent.data() + sidRecent.len();
282  while (sidRecentStart < sidRecentEnd)
283  {
284  const char* sidRecentComma = (const char*)
285  memchr(sidRecentStart, ',',
286  (size_t)(sidRecentEnd - sidRecentStart));
287  // No more commas found, just use start->end
288  if (!sidRecentComma)
289  {
290  sidRecentComma = sidRecentEnd;
291  }
292  if (sidRecentComma == sidRecentStart)
293  {
294  sidRecentStart = sidRecentComma + 1;
295  continue;
296  }
297  Message::Field bookmark(sidRecentStart,
298  (size_t)(sidRecentComma - sidRecentStart));
299  amps_uint64_t publisher = (amps_uint64_t)0;
300  amps_uint64_t seq = (amps_uint64_t)0;
301  Field::parseBookmark(bookmark, publisher, seq);
302  if (publishers.count(publisher) == 0
303  || publishers[publisher] > seq)
304  {
305  publishers[publisher] = seq;
306  }
307  // Move past comma
308  sidRecentStart = sidRecentComma + 1;
309  }
310  // Move past comma
311  start = comma + 1;
312  sidRecent.clear();
313  }
314  std::ostringstream os;
315  for (std::map<amps_uint64_t, amps_uint64_t>::iterator i = publishers.begin();
316  i != publishers.end();
317  ++i)
318  {
319  if (i->first == 0 && i->second == 0)
320  {
321  continue;
322  }
323  if (os.tellp() > 0)
324  {
325  os << ',';
326  }
327  os << i->first << '|' << i->second << "|";
328  }
329  _recent = os.str();
330  return _recent;
331  }
332 
333  void setMostRecent(const std::string& recent_)
334  {
335  _recent = recent_;
336  }
337 
338  private:
339  std::string _recent;
340  MessageHandler _handler;
341  Message _m;
342  Message::Field _subId;
343  unsigned _requestedAckTypes;
344  bool _useBookmark;
345  bool _paused;
346  bool _clearSubId;
347 
348  };//class SubscriptionInfo
349 
350  typedef std::map<SubscriptionInfo*, AMPSException> FailedResubscribeMap;
351 
352  class Resubscriber
353  {
354  Client& _client;
355  int _timeout;
356  public:
357  FailedResubscribeMap* _failures;
358 
359  Resubscriber(Client& client_, int timeout_)
360  : _client(client_)
361  , _timeout(timeout_)
362  {
363  _failures = new FailedResubscribeMap();
364  }
365  // We want the same pointer
366  Resubscriber(const Resubscriber& rhs_)
367  : _client(rhs_._client)
368  , _timeout(rhs_._timeout)
369  , _failures(rhs_._failures)
370  { }
371  void operator()(std::pair<Message::Field, SubscriptionInfo*> iter_)
372  {
373  void* data = amps_invoke_copy_route_function(iter_.second->messageHandler().userData());
374  iter_.second->resubscribe(_client, _timeout, data);
375  }
376  void operator()(SubscriptionInfo* iter_)
377  {
378  try
379  {
380  void* data = amps_invoke_copy_route_function(iter_->messageHandler().userData());
381  iter_->resubscribe(_client, _timeout, data);
382  }
383  catch (const AMPSException& ex)
384  {
385  _failures->insert(std::make_pair(iter_, ex));
386  }
387  }
388  };
389 
390  class Deleter
391  {
392  bool _clearSubId;
393  public:
394  Deleter(bool clearSubId_ = false)
395  : _clearSubId(clearSubId_)
396  { }
397  void operator()(std::pair<Message::Field, SubscriptionInfo*> iter_)
398  {
399  if (_clearSubId)
400  {
401  iter_.first.clear();
402  }
403  else
404  {
405  amps_invoke_remove_route_function(iter_.second->messageHandler().userData());
406  delete iter_.second;
407  }
408  }
409  void operator()(SubscriptionInfo* iter_)
410  {
411  delete iter_;
412  }
413  };
414 
415  virtual SubscriptionInfo* createSubscriptionInfo(MessageHandler messageHandler_,
416  const Message& message_,
417  unsigned requestedAckTypes_)
418  {
419  return new SubscriptionInfo(messageHandler_, message_,
420  requestedAckTypes_);
421  }
422 
423  public:
424  typedef std::map<Message::Field, SubscriptionInfo*, Message::Field::FieldHash> SubscriptionMap;
425 
427  : _resubscribing(0)
428  , _resubscriptionTimeout(getDefaultResubscriptionTimeout())
429  { ; }
430 
432  {
433  _clear();
434  }
435 
442  void subscribe(MessageHandler messageHandler_,
443  const Message& message_, unsigned requestedAckTypes_)
444  {
445  const Message::Field& subId = message_.getSubscriptionId();
446  if (!subId.empty())
447  {
448  Lock<Mutex> l(_lock);
449  while (_resubscribing != 0)
450  {
451  _lock.wait(10);
452  }
453  std::string options = message_.getOptions();
454  if (options.find("resume") != std::string::npos)
455  {
456  // For a resume, we store each sub id with a single Subscription
457  SubscriptionInfo* subInfo = createSubscriptionInfo(MessageHandler(),
458  message_,
459  requestedAckTypes_);
460  bool saved = false;
461  Field fullSubId = subInfo->subId();
462  const char* start = fullSubId.data();
463  const char* end = fullSubId.data() + fullSubId.len();
464  while (start < end)
465  {
466  const char* comma = (const char*)memchr(start, ',',
467  (size_t)(end - start));
468  // No more commas found, just use start->end
469  if (!comma)
470  {
471  comma = end;
472  }
473  if (comma == start)
474  {
475  start = comma + 1;
476  continue;
477  }
478  Message::Field sid = Message::Field(start,
479  (size_t)(comma - start));
480  // Calling resume on something already resumed is ignored,
481  // so don't update anything that exists.
482  if (_resumed.find(sid) == _resumed.end())
483  {
484  _resumed[sid.deepCopy()] = subInfo;
485  saved = true;
486  }
487  // Move past comma
488  start = comma + 1;
489  }
490  if (saved)
491  {
492  _resumedSet.insert(subInfo);
493  }
494  else
495  {
496  delete subInfo;
497  }
498  }
499  else if (options.find("pause") != std::string::npos)
500  {
501  const char* start = subId.data();
502  const char* end = subId.data() + subId.len();
503  while (start < end)
504  {
505  MessageHandler messageHandler = messageHandler_;
506  const char* comma = (const char*)memchr(start, ',',
507  (size_t)(end - start));
508  // No more commas found, just use start->end
509  if (!comma)
510  {
511  comma = end;
512  }
513  if (comma == start)
514  {
515  start = comma + 1;
516  continue;
517  }
518  Message::Field sid = Message::Field(start,
519  (size_t)(comma - start));
520  SubscriptionMap::iterator resume = _resumed.find(sid);
521  if (resume != _resumed.end())
522  {
523  SubscriptionInfo* subPtr = resume->second;
524  Message::Field subField(resume->first);
525  _resumed.erase(resume); // Remove mapping for sid
526  subField.clear();
527  // If last subId, remove completely
528  if (subPtr->removeSubId(sid))
529  {
530  _resumedSet.erase(subPtr);
531  delete subPtr;
532  }
533  }
534  // Move past comma
535  start = comma + 1;
536  SubscriptionMap::iterator item = _active.find(sid);
537  if (item != _active.end())
538  {
539  if (options.find("replace") != std::string::npos)
540  {
541  messageHandler = item->second->messageHandler();
542  delete item->second;
543  _active.erase(item);
544  }
545  else
546  {
547  item->second->pause();
548  continue; // Leave current one
549  }
550  }
551  else
552  {
553  Unlock<Mutex> u(_lock);
554  void* data = amps_invoke_copy_route_function(
555  messageHandler_.userData());
556  if (data)
557  {
558  messageHandler = MessageHandler(messageHandler_.function(), data);
559  }
560  }
561  Message m = message_.deepCopy();
562  m.setSubscriptionId(sid.data(), sid.len());
563  SubscriptionInfo* s = createSubscriptionInfo(messageHandler, m,
564  requestedAckTypes_);
565  // Insert using the subId from s, which is deep copy of original
566  _active[s->subId()] = s;
567  }
568  }
569  else // Not a pause or resume
570  {
571  MessageHandler messageHandler = messageHandler_;
572  SubscriptionMap::iterator item = _active.find(subId);
573  if (item != _active.end())
574  {
575  messageHandler = item->second->messageHandler();
576  delete item->second;
577  _active.erase(item);
578  }
579  else
580  {
581  Unlock<Mutex> u(_lock);
582  void* data = amps_invoke_copy_route_function(
583  messageHandler_.userData());
584  if (data)
585  {
586  messageHandler = MessageHandler(messageHandler_.function(), data);
587  }
588  }
589  SubscriptionInfo* s = createSubscriptionInfo(messageHandler,
590  message_,
591  requestedAckTypes_);
592  // Insert using the subId from s, which is deep copy of original
593  _active[s->subId()] = s;
594  }
595  }
596  }
597 
601  void unsubscribe(const Message::Field& subId_)
602  {
603  Lock<Mutex> l(_lock);
604  SubscriptionMap::iterator item = _active.find(subId_);
605  if (item != _active.end())
606  {
607  SubscriptionInfo* subPtr = item->second;
608  _active.erase(item);
609  while (_resubscribing != 0)
610  {
611  _lock.wait(10);
612  }
613  Unlock<Mutex> u(_lock);
614  amps_invoke_remove_route_function(subPtr->messageHandler().userData());
615  delete subPtr;
616  }
617  item = _resumed.find(subId_);
618  if (item != _resumed.end())
619  {
620  SubscriptionInfo* subPtr = item->second;
621  Message::Field subField(item->first);
622  _resumed.erase(item);
623  subField.clear();
624  // If last subId, remove completely
625  if (subPtr->removeSubId(subId_))
626  {
627  _resumedSet.erase(subPtr);
628  while (_resubscribing != 0)
629  {
630  _lock.wait(10);
631  }
632  delete subPtr;
633  }
634  }
635  }
636 
639  void clear()
640  {
641  _clear();
642  }
643 
646  void _clear()
647  {
648  Lock<Mutex> l(_lock);
649  while (_resubscribing != 0)
650  {
651  _lock.wait(10);
652  }
653  // Settting _resubscribing keeps other threads from touching data
654  // even if lock isn't held. Don't want to hold lock when
655  // amps_invoke_remove_route_function is called.
656  AtomicFlagFlip resubFlip(&_resubscribing);
657  {
658  Unlock<Mutex> u(_lock);
659  std::for_each(_active.begin(), _active.end(), Deleter());
660  std::for_each(_resumedSet.begin(), _resumedSet.end(), Deleter());
661  std::for_each(_resumed.begin(), _resumed.end(), Deleter(true));
662  }
663  _active.clear();
664  _resumed.clear();
665  _resumedSet.clear();
666  }
667 
671  void resubscribe(Client& client_)
672  {
673  // At this point, it's better to throw an exception back to disconnect
674  // handling than to attempt a reconnect in send, so turn off retry.
675  bool retry = client_.getRetryOnDisconnect();
676  client_.setRetryOnDisconnect(false);
677 #ifdef AMPS_USE_FUNCTIONAL
678  std::forward_list<SubscriptionInfo*> subscriptions;
679 #else
680  std::list<SubscriptionInfo*> subscriptions;
681 #endif
682  Resubscriber resubscriber(client_, _resubscriptionTimeout);
683  try
684  {
685  AtomicFlagFlip resubFlip(&_resubscribing);
686  {
687  Lock<Mutex> l(_lock);
688  subscriptions.assign(_resumedSet.begin(), _resumedSet.end());
689  for (SubscriptionMap::iterator iter = _active.begin();
690  iter != _active.end(); ++iter)
691  {
692  SubscriptionInfo* sub = iter->second;
693  if (sub->paused())
694  {
695  SubscriptionMap::iterator resIter = _resumed.find(sub->subId());
696  // All pause subs resuming together should be sent with
697  // bookmark as list of the resumes' most recents
698  if (resIter != _resumed.end())
699  {
700  sub->setMostRecent(resIter->second->getMostRecent(client_));
701  }
702  }
703  subscriptions.push_front(iter->second);
704  }
705  }
706  std::for_each(subscriptions.begin(), subscriptions.end(),
707  resubscriber);
708  std::vector<SubscriptionInfo*> removals;
709  bool throwExcept = false;
710  AMPSException except("None", AMPS_E_OK);
711  if (_failedResubscribeHandler)
712  {
713  try
714  {
715  for (auto failedSub = resubscriber._failures->begin();
716  failedSub != resubscriber._failures->end(); ++failedSub)
717  {
718  SubscriptionInfo* pSubInfo = failedSub->first;
719  if (_failedResubscribeHandler->failure(pSubInfo->message(),
720  pSubInfo->messageHandler(),
721  pSubInfo->requestedAcks(),
722  failedSub->second))
723  {
724  removals.push_back(pSubInfo);
725  }
726  else // We'll rethrow an exception for failure left in place
727  {
728  except = failedSub->second;
729  throwExcept = true;
730  }
731  }
732  }
733  catch (const AMPSException& ex_)
734  {
735  except = ex_;
736  throwExcept = true;
737  }
738  catch (const std::exception& ex_)
739  {
740  except = AMPSException(ex_.what(), AMPS_E_RETRY);
741  throwExcept = true;
742  }
743  catch (...)
744  {
745  except = AMPSException("Unknown Exception thrown by FailedResubscribeHandler", AMPS_E_RETRY);
746  throwExcept = true;
747  }
748  }
749  else
750  {
751  throwExcept = !resubscriber._failures->empty();
752  if (throwExcept)
753  {
754  except = resubscriber._failures->begin()->second;
755  }
756  }
757  // Remove any failiures that should be removed
758  if (_failedResubscribeHandler && !removals.empty())
759  {
760  Lock<Mutex> l(_lock);
761  for (std::vector<SubscriptionInfo*>::iterator pSubInfo = removals.begin();
762  pSubInfo != removals.end(); ++pSubInfo)
763  {
764  _active.erase((*pSubInfo)->subId());
765  _resumed.erase((*pSubInfo)->subId());
766  _resumedSet.erase(*pSubInfo);
767  delete *pSubInfo;
768  }
769  }
770  // Throw an exception not removed
771  if (throwExcept)
772  {
773  throw except;
774  }
775  delete resubscriber._failures;
776  resubscriber._failures = 0;
777  client_.setRetryOnDisconnect(retry);
778  }
779  catch (const AMPSException&)
780  {
781  delete resubscriber._failures;
782  resubscriber._failures = 0;
783  client_.setRetryOnDisconnect(retry);
784  throw;
785  }
786  catch (const std::exception&)
787  {
788  delete resubscriber._failures;
789  resubscriber._failures = 0;
790  client_.setRetryOnDisconnect(retry);
791  throw;
792  }
793  }
794 
798  void setResubscriptionTimeout(int timeout_)
799  {
800  if (timeout_ >= 0)
801  {
802  _resubscriptionTimeout = timeout_;
803  }
804  }
805 
810  {
811  return _resubscriptionTimeout;
812  }
813 
818  static int setDefaultResubscriptionTimeout(int timeout_)
819  {
820  static int _defaultResubscriptionTimeout =
821  AMPS_SUBSCRIPTION_MANAGER_DEFAULT_TIMEOUT;
822  if (timeout_ >= 0)
823  {
824  _defaultResubscriptionTimeout = timeout_;
825  }
826  return _defaultResubscriptionTimeout;
827  }
828 
834  {
836  }
837 
838  private:
839 
840  SubscriptionMap _active;
841  SubscriptionMap _resumed;
842  std::set<SubscriptionInfo*> _resumedSet;
843  Mutex _lock;
844  AMPS_ATOMIC_TYPE_8 _resubscribing;
845  int _resubscriptionTimeout;
846  }; //class MemorySubscriptionManager
847 
848 } // namespace AMPS
849 
850 #endif //_MEMORYSUBSCRIPTIONMANAGER_H_
851 
Field getSubscriptionId() const
Retrieves the value of the SubscriptionId header of the Message as a Field which references the under...
Definition: Message.hpp:1489
Message deepCopy(void) const
Returns a deep copy of self.
Definition: Message.hpp:587
int getResubscriptionTimeout(void)
Gets the timeout used when trying to resubscribe after disconnect.
Definition: MemorySubscriptionManager.hpp:809
Message encapsulates a single message sent to or received from an AMPS server, and provides methods f...
Definition: Message.hpp:539
static int getDefaultResubscriptionTimeout(void)
Gets the default timeout used by new MemorySubscriptionManager objects when trying to resubscribe aft...
Definition: MemorySubscriptionManager.hpp:833
void send(const Message &message)
Sends a Message to the connected AMPS server, performing only minimal validation and bypassing client...
Definition: ampsplusplus.hpp:5404
void setRetryOnDisconnect(bool isRetryOnDisconnect_)
Enables or disables automatic retry of a command to AMPS after a reconnect.
Definition: ampsplusplus.hpp:7535
void setResubscriptionTimeout(int timeout_)
Sets the timeout used when trying to resubscribe after disconnect.
Definition: MemorySubscriptionManager.hpp:798
void clear()
Deletes the data associated with this Field, should only be used on Fields that were created as deepC...
Definition: Field.hpp:260
Abstract base class to manage all subscriptions placed on a client so that they can be re-established...
Definition: ampsplusplus.hpp:1461
Success.
Definition: amps.h:221
Field getOptions() const
Retrieves the value of the Options header of the Message as a Field which references the underlying b...
Definition: Message.hpp:1378
const char * data() const
Returns the (non-null-terminated) data underlying this field.
Definition: Field.hpp:273
Client represents a connection to an AMPS server, but does not provide failover or reconnection behav...
Definition: ampsplusplus.hpp:5167
void _clear()
Clear all subscriptions from the manager.
Definition: MemorySubscriptionManager.hpp:646
Message & setSubscriptionId(const std::string &v)
Sets the value of the SubscriptionId header for this Message.
Definition: Message.hpp:1489
void resubscribe(Client &client_)
Place all saved subscriptions on the provided Client.
Definition: MemorySubscriptionManager.hpp:671
bool empty() const
Returns &#39;true&#39; if empty, &#39;false&#39; otherwise.
Definition: Field.hpp:128
Defines the AMPS::Field class, which represents the value of a field in a message.
Core type, function, and class declarations for the AMPS C++ client.
size_t len() const
Returns the length of the data underlying this field.
Definition: Field.hpp:280
BookmarkStore getBookmarkStore()
Get the bookmark store being used by the client.
Definition: ampsplusplus.hpp:5538
void subscribe(MessageHandler messageHandler_, const Message &message_, unsigned requestedAckTypes_)
Save a subscription so it can be placed again if a disconnect occurs.
Definition: MemorySubscriptionManager.hpp:442
void clear()
Clear all subscriptions from the manager.
Definition: MemorySubscriptionManager.hpp:639
Field represents the value of a single field in a Message.
Definition: Field.hpp:86
The operation has not succeeded, but ought to be retried.
Definition: amps.h:245
void unsubscribe(const Message::Field &subId_)
Remove the subscription from the manager.
Definition: MemorySubscriptionManager.hpp:601
void deepCopy(const Field &orig_)
Makes self a deep copy of the original field.
Definition: Field.hpp:218
A SubscriptionManager implementation that maintains subscriptions placed in memory so that they can b...
Definition: MemorySubscriptionManager.hpp:51
Definition: ampsplusplus.hpp:103
static int setDefaultResubscriptionTimeout(int timeout_)
Sets the default timeout used by new MemorySubscriptionManager objects when trying to resubscribe aft...
Definition: MemorySubscriptionManager.hpp:818
Field getBookmark() const
Retrieves the value of the Bookmark header of the Message as a Field which references the underlying ...
Definition: Message.hpp:1256
Message::Field getMostRecent(const Message::Field &subId_)
Returns the most recent bookmark from the log that ought to be used for (re-)subscriptions.
Definition: BookmarkStore.hpp:310
bool getRetryOnDisconnect(void) const
Returns true if automatic retry of a command to AMPS after a reconnect is enabled.
Definition: ampsplusplus.hpp:7544