26 #ifndef _MEMORYSUBSCRIPTIONMANAGER_H_ 27 #define _MEMORYSUBSCRIPTIONMANAGER_H_ 32 #ifdef AMPS_USE_FUNCTIONAL 33 #include <forward_list> 55 class SubscriptionInfo
60 unsigned requestedAckTypes_)
61 : _handler(messageHandler_)
64 , _requestedAckTypes(requestedAckTypes_)
68 std::string options = _m.getOptions();
69 size_t replace = options.find(
"replace");
71 static const size_t replaceLen = 7;
72 if (replace != std::string::npos)
74 options.erase(replace, replaceLen);
75 _m.setOptions(options);
77 _paused = (options.find(
"pause") != std::string::npos);
84 _m.getSubscriptionId().clear();
92 _m.setUserId((
const char*)0, 0);
96 if (_paused && !_recent.empty())
98 _m.setBookmark(_recent);
106 _m.setAckTypeEnum(_requestedAckTypes);
109 client_.
send(_handler, _m, timeout_);
114 client_.
send(handler, _m, timeout_);
133 unsigned requestedAcks()
const 135 return _requestedAckTypes;
141 size_t subIdLen = subId_.
len();
142 const char* subIdData = subId_.
data();
143 while (subIdLen && *subIdData ==
',')
148 while (subIdLen && subIdData[subIdLen - 1] ==
',')
152 if (subIdLen == 0 || subIdLen > _subId.len())
154 return _subId.empty();
157 size_t matchStart = 0;
158 size_t matchCount = 0;
159 for (
size_t i = 0; i < _subId.len(); ++i)
161 if (_subId.data()[i] ==
',')
163 if (matchCount == subIdLen)
173 if (_subId.data()[i] == subIdData[matchCount])
184 if (match && matchCount == subIdLen)
186 size_t newLen = _subId.len() - matchCount;
189 while (matchStart + matchCount < _subId.len() &&
190 _subId.data()[matchStart + matchCount] ==
',')
195 char* buffer =
new char[newLen];
199 memcpy(buffer, _subId.data(), matchStart);
202 if (matchStart + matchCount < _subId.len())
204 memcpy(buffer + matchStart,
205 _subId.data() + matchStart + matchCount,
206 _subId.len() - matchStart - matchCount);
210 _m.getSubscriptionId().clear();
216 _m.assignSubscriptionId(buffer, newLen);
217 _subId = _m.getSubscriptionId();
224 _m.getSubscriptionId().clear();
229 _m.getSubscriptionId().assign(NULL, 0);
231 _subId = _m.getSubscriptionId();
235 return _subId.empty();
249 std::string opts(Message::Options::Pause());
250 opts.append(_m.getOptions());
255 std::string getMostRecent(
Client& client_)
257 if (!_recent.empty())
261 std::map<amps_uint64_t, amps_uint64_t> publishers;
262 const char* start = _subId.data();
263 const char* end = _subId.data() + _subId.len();
266 const char* comma = (
const char*)memchr(start,
',',
267 (
size_t)(end - start));
280 const char* sidRecentStart = sidRecent.
data();
281 const char* sidRecentEnd = sidRecent.
data() + sidRecent.
len();
282 while (sidRecentStart < sidRecentEnd)
284 const char* sidRecentComma = (
const char*)
285 memchr(sidRecentStart,
',',
286 (
size_t)(sidRecentEnd - sidRecentStart));
290 sidRecentComma = sidRecentEnd;
292 if (sidRecentComma == sidRecentStart)
294 sidRecentStart = sidRecentComma + 1;
298 (
size_t)(sidRecentComma - sidRecentStart));
299 amps_uint64_t publisher = (amps_uint64_t)0;
300 amps_uint64_t seq = (amps_uint64_t)0;
301 Field::parseBookmark(bookmark, publisher, seq);
302 if (publishers.count(publisher) == 0
303 || publishers[publisher] > seq)
305 publishers[publisher] = seq;
308 sidRecentStart = sidRecentComma + 1;
313 std::ostringstream os;
314 for (std::map<amps_uint64_t, amps_uint64_t>::iterator i = publishers.begin();
315 i != publishers.end();
318 if (i->first == 0 && i->second == 0)
326 os << i->first <<
'|' << i->second <<
"|";
332 void setMostRecent(
const std::string& recent_)
342 unsigned _requestedAckTypes;
349 typedef std::map<SubscriptionInfo*, AMPSException> FailedResubscribeMap;
356 FailedResubscribeMap* _failures;
358 Resubscriber(
Client& client_,
int timeout_)
362 _failures =
new FailedResubscribeMap();
365 Resubscriber(
const Resubscriber& rhs_)
366 : _client(rhs_._client)
367 , _timeout(rhs_._timeout)
368 , _failures(rhs_._failures)
370 void operator()(std::pair<Message::Field, SubscriptionInfo*> iter_)
372 void* data = amps_invoke_copy_route_function(iter_.second->messageHandler().userData());
373 iter_.second->resubscribe(_client, _timeout, data);
375 void operator()(SubscriptionInfo* iter_)
379 void* data = amps_invoke_copy_route_function(iter_->messageHandler().userData());
380 iter_->resubscribe(_client, _timeout, data);
382 catch (
const AMPSException& ex)
384 _failures->insert(std::make_pair(iter_, ex));
393 Deleter(
bool clearSubId_ =
false)
394 : _clearSubId(clearSubId_)
396 void operator()(std::pair<Message::Field, SubscriptionInfo*> iter_)
404 amps_invoke_remove_route_function(iter_.second->messageHandler().userData());
408 void operator()(SubscriptionInfo* iter_)
414 virtual SubscriptionInfo* createSubscriptionInfo(
MessageHandler messageHandler_,
416 unsigned requestedAckTypes_)
418 return new SubscriptionInfo(messageHandler_, message_,
423 typedef std::map<Message::Field, SubscriptionInfo*, Message::Field::FieldHash> SubscriptionMap;
442 const Message& message_,
unsigned requestedAckTypes_)
447 Lock<Mutex> l(_lock);
448 while (_resubscribing != 0)
453 if (options.find(
"resume") != std::string::npos)
456 SubscriptionInfo* subInfo = createSubscriptionInfo(
MessageHandler(),
460 Field fullSubId = subInfo->subId();
461 const char* start = fullSubId.
data();
462 const char* end = fullSubId.
data() + fullSubId.
len();
465 const char* comma = (
const char*)memchr(start,
',',
466 (
size_t)(end - start));
478 (
size_t)(comma - start));
481 if (_resumed.find(sid) == _resumed.end())
491 _resumedSet.insert(subInfo);
498 else if (options.find(
"pause") != std::string::npos)
500 const char* start = subId.
data();
501 const char* end = subId.
data() + subId.
len();
505 const char* comma = (
const char*)memchr(start,
',',
506 (
size_t)(end - start));
518 (
size_t)(comma - start));
519 SubscriptionMap::iterator resume = _resumed.find(sid);
520 if (resume != _resumed.end())
522 SubscriptionInfo* subPtr = resume->second;
524 _resumed.erase(resume);
527 if (subPtr->removeSubId(sid))
529 _resumedSet.erase(subPtr);
535 SubscriptionMap::iterator item = _active.find(sid);
536 if (item != _active.end())
538 if (options.find(
"replace") != std::string::npos)
540 messageHandler = item->second->messageHandler();
546 item->second->pause();
552 Unlock<Mutex> u(_lock);
553 void* data = amps_invoke_copy_route_function(
554 messageHandler_.userData());
557 messageHandler =
MessageHandler(messageHandler_.function(), data);
562 SubscriptionInfo* s = createSubscriptionInfo(messageHandler, m,
565 _active[s->subId()] = s;
571 SubscriptionMap::iterator item = _active.find(subId);
572 if (item != _active.end())
574 messageHandler = item->second->messageHandler();
580 Unlock<Mutex> u(_lock);
581 void* data = amps_invoke_copy_route_function(
582 messageHandler_.userData());
585 messageHandler =
MessageHandler(messageHandler_.function(), data);
588 SubscriptionInfo* s = createSubscriptionInfo(messageHandler,
592 _active[s->subId()] = s;
602 Lock<Mutex> l(_lock);
603 SubscriptionMap::iterator item = _active.find(subId_);
604 if (item != _active.end())
606 SubscriptionInfo* subPtr = item->second;
608 while (_resubscribing != 0)
612 Unlock<Mutex> u(_lock);
613 amps_invoke_remove_route_function(subPtr->messageHandler().userData());
616 item = _resumed.find(subId_);
617 if (item != _resumed.end())
619 SubscriptionInfo* subPtr = item->second;
621 _resumed.erase(item);
624 if (subPtr->removeSubId(subId_))
626 _resumedSet.erase(subPtr);
627 while (_resubscribing != 0)
647 Lock<Mutex> l(_lock);
648 while (_resubscribing != 0)
655 AtomicFlagFlip resubFlip(&_resubscribing);
657 Unlock<Mutex> u(_lock);
658 std::for_each(_active.begin(), _active.end(), Deleter());
659 std::for_each(_resumedSet.begin(), _resumedSet.end(), Deleter());
660 std::for_each(_resumed.begin(), _resumed.end(), Deleter(
true));
676 #ifdef AMPS_USE_FUNCTIONAL 677 std::forward_list<SubscriptionInfo*> subscriptions;
679 std::list<SubscriptionInfo*> subscriptions;
681 Resubscriber resubscriber(client_, _resubscriptionTimeout);
684 AtomicFlagFlip resubFlip(&_resubscribing);
686 Lock<Mutex> l(_lock);
687 subscriptions.assign(_resumedSet.begin(), _resumedSet.end());
688 for (SubscriptionMap::iterator iter = _active.begin();
689 iter != _active.end(); ++iter)
691 SubscriptionInfo* sub = iter->second;
694 SubscriptionMap::iterator resIter = _resumed.find(sub->subId());
697 if (resIter != _resumed.end())
699 sub->setMostRecent(resIter->second->getMostRecent(client_));
702 subscriptions.push_front(iter->second);
705 std::for_each(subscriptions.begin(), subscriptions.end(),
707 std::vector<SubscriptionInfo*> removals;
708 bool throwExcept =
false;
710 if (_failedResubscribeHandler)
714 for (
auto failedSub = resubscriber._failures->begin();
715 failedSub != resubscriber._failures->end(); ++failedSub)
717 SubscriptionInfo* pSubInfo = failedSub->first;
718 if (_failedResubscribeHandler->failure(pSubInfo->message(),
719 pSubInfo->messageHandler(),
720 pSubInfo->requestedAcks(),
723 removals.push_back(pSubInfo);
727 except = failedSub->second;
732 catch (
const AMPSException& ex_)
737 catch (
const std::exception& ex_)
744 except = AMPSException(
"Unknown Exception thrown by FailedResubscribeHandler",
AMPS_E_RETRY);
750 throwExcept = !resubscriber._failures->empty();
753 except = resubscriber._failures->begin()->second;
757 if (_failedResubscribeHandler && !removals.empty())
759 Lock<Mutex> l(_lock);
760 for (std::vector<SubscriptionInfo*>::iterator pSubInfo = removals.begin();
761 pSubInfo != removals.end(); ++pSubInfo)
763 _active.erase((*pSubInfo)->subId());
764 _resumed.erase((*pSubInfo)->subId());
765 _resumedSet.erase(*pSubInfo);
774 delete resubscriber._failures;
775 resubscriber._failures = 0;
778 catch (
const AMPSException&)
780 delete resubscriber._failures;
781 resubscriber._failures = 0;
785 catch (
const std::exception&)
787 delete resubscriber._failures;
788 resubscriber._failures = 0;
801 _resubscriptionTimeout = timeout_;
810 return _resubscriptionTimeout;
819 static int _defaultResubscriptionTimeout =
820 AMPS_SUBSCRIPTION_MANAGER_DEFAULT_TIMEOUT;
823 _defaultResubscriptionTimeout = timeout_;
825 return _defaultResubscriptionTimeout;
839 SubscriptionMap _active;
840 SubscriptionMap _resumed;
841 std::set<SubscriptionInfo*> _resumedSet;
843 AMPS_ATOMIC_TYPE_8 _resubscribing;
844 int _resubscriptionTimeout;
849 #endif //_MEMORYSUBSCRIPTIONMANAGER_H_ Field getSubscriptionId() const
Retrieves the value of the SubscriptionId header of the Message as a new Field.
Definition: Message.hpp:1427
Message deepCopy(void) const
Returns a deep copy of self.
Definition: Message.hpp:579
int getResubscriptionTimeout(void)
Gets the timeout used when trying to resubscribe after disconnect.
Definition: MemorySubscriptionManager.hpp:808
Message encapsulates a single message sent to or received from an AMPS server, and provides methods f...
Definition: Message.hpp:531
static int getDefaultResubscriptionTimeout(void)
Gets the default timeout used by new MemorySubscriptionManager objects when trying to resubscribe aft...
Definition: MemorySubscriptionManager.hpp:832
void send(const Message &message)
Sends a Message to the connected AMPS server, performing only minimal validation and bypassing client...
Definition: ampsplusplus.hpp:5268
void setRetryOnDisconnect(bool isRetryOnDisconnect_)
Enables or disables automatic retry of a command to AMPS after a reconnect.
Definition: ampsplusplus.hpp:7362
void setResubscriptionTimeout(int timeout_)
Sets the timeout used when trying to resubscribe after disconnect.
Definition: MemorySubscriptionManager.hpp:797
void clear()
Deletes the data associated with this Field, should only be used on Fields that were created as deepC...
Definition: Field.hpp:247
Abstract base class to manage all subscriptions placed on a client so that they can be re-established...
Definition: ampsplusplus.hpp:1450
Success.
Definition: amps.h:221
Field getOptions() const
Retrieves the value of the Options header of the Message as a new Field.
Definition: Message.hpp:1316
const char * data() const
Returns the (non-null-terminated) data underlying this field.
Definition: Field.hpp:260
Client represents a connection to an AMPS server, but does not provide failover or reconnection behav...
Definition: ampsplusplus.hpp:5063
void _clear()
Clear all subscriptions from the manager.
Definition: MemorySubscriptionManager.hpp:645
Message & setSubscriptionId(const std::string &v)
Sets the value of the SubscriptionId header for this Message.
Definition: Message.hpp:1427
void resubscribe(Client &client_)
Place all saved subscriptions on the provided Client.
Definition: MemorySubscriptionManager.hpp:670
bool empty() const
Returns 'true' if empty, 'false' otherwise.
Definition: Field.hpp:128
Defines the AMPS::Field class, which represents the value of a field in a message.
Core type, function, and class declarations for the AMPS C++ client.
size_t len() const
Returns the length of the data underlying this field.
Definition: Field.hpp:267
BookmarkStore getBookmarkStore()
Get the bookmark store being used by the client.
Definition: ampsplusplus.hpp:5389
void subscribe(MessageHandler messageHandler_, const Message &message_, unsigned requestedAckTypes_)
Save a subscription so it can be placed again if a disconnect occurs.
Definition: MemorySubscriptionManager.hpp:441
void clear()
Clear all subscriptions from the manager.
Definition: MemorySubscriptionManager.hpp:638
Field represents the value of a single field in a Message.
Definition: Field.hpp:86
The operation has not succeeded, but ought to be retried.
Definition: amps.h:245
void unsubscribe(const Message::Field &subId_)
Remove the subscription from the manager.
Definition: MemorySubscriptionManager.hpp:600
void deepCopy(const Field &orig_)
Makes self a deep copy of the original field.
Definition: Field.hpp:218
A SubscriptionManager implementation that maintains subscriptions placed in memory so that they can b...
Definition: MemorySubscriptionManager.hpp:51
Definition: ampsplusplus.hpp:102
static int setDefaultResubscriptionTimeout(int timeout_)
Sets the default timeout used by new MemorySubscriptionManager objects when trying to resubscribe aft...
Definition: MemorySubscriptionManager.hpp:817
Field getBookmark() const
Retrieves the value of the Bookmark header of the Message as a new Field.
Definition: Message.hpp:1194
Message::Field getMostRecent(const Message::Field &subId_)
Returns the most recent bookmark from the log that ought to be used for (re-)subscriptions.
Definition: BookmarkStore.hpp:310
bool getRetryOnDisconnect(void) const
Returns true if automatic retry of a command to AMPS after a reconnect is enabled.
Definition: ampsplusplus.hpp:7371