// Include guard and dependencies, followed by the Handler template: a wrapper
// that pairs a callback of type Func with an opaque user-data pointer and is
// invoked with an argument of type Object.
25 #ifndef _MESSAGEROUTER_HPP_ 26 #define _MESSAGEROUTER_HPP_ 28 #include "amps/ampscrc.hpp" 29 #include "amps/util.hpp" 38 template <
typename Func,
typename Object>
// When AMPS_USE_FUNCTIONAL is defined, a std::function is stored as well.
45 #ifdef AMPS_USE_FUNCTIONAL 46 std::function<void(Object)> _callable;
// Do-nothing callable; the constructors below use it to initialize _callable.
52 static void noOpHandler(Object) {;}
// Exposes the wrapped function type.
54 typedef Func FunctionType;
// Initializer tail of a constructor that sets _callable to the no-op handler
// (the constructor's signature and other initializers are not in this excerpt).
58 #ifdef AMPS_USE_FUNCTIONAL
59 , _callable(
Handler<Func, Object>::noOpHandler)
// Constructor initializers: store the supplied function and user data.
71 : _func(func_), _userData(userData_)
72 #ifdef AMPS_USE_FUNCTIONAL
73 , _callable(noOpHandler)
// Copy-construction from orig_: every visible member is copied.
82 : _func(orig_._func), _userData(orig_._userData)
83 #ifdef AMPS_USE_FUNCTIONAL
84 , _callable(orig_._callable)
// Construction from a callable (callback_): no raw function or user data is
// kept, and the handler is marked valid.
89 #ifdef AMPS_USE_FUNCTIONAL 94 : _func(NULL), _userData(NULL), _callable(callback_), _isValid(true)
// Invokes the wrapped function with the message and the stored user data.
// NOTE(review): the guard around this call and the AMPS_USE_FUNCTIONAL path
// are not visible in this excerpt.
98 void invoke(Object message)
102 _func(message, _userData);
// The following lines copy state from rhs_ (assignment); the start of the
// assignment operator is not part of this excerpt.
104 #ifdef AMPS_USE_FUNCTIONAL 117 _userData = rhs_._userData;
118 #ifdef AMPS_USE_FUNCTIONAL 119 _callable = rhs_._callable;
121 _isValid = rhs_._isValid;
// Const accessors; their bodies are not visible here. Presumably they return
// _isValid, _func and _userData respectively -- confirm against the full file.
126 bool isValid(
void)
const 130 Func
function(void)
const 134 void* userData(
void)
// MessageHandlerFunc: pointer to a function taking a const Message reference
// and a void* user-data argument, returning nothing.
const 142 typedef void(*MessageHandlerFunc)(
const Message&,
void* userData);
// Default-constructed handler handed back when a lookup finds no route
// (see getRoute and parseRoutes).
152 MessageHandler _emptyMessageHandler;
// Signature of the hash routine: (data pointer, length, seed) -> 64-bit value.
// NOTE(review): presumably the type of the _crc member -- its declaration is
// not visible in this excerpt.
153 typedef amps_uint64_t (*CRCFunction)(
const char*, size_t, amps_uint64_t);
// State of a single route: the handler to call, the ack-type bitmasks that
// were requested (by the caller and by the system), and the ack type that
// ends the route.
158 MessageHandler _messageHandler;
159 unsigned _requestedAcks, _systemAcks, _terminationAck;
// Default route: default handler, no acks requested, termination ack of 0.
161 MessageRoute() : _requestedAcks(0), _systemAcks(0), _terminationAck(0) {;}
// Copy constructor: member-wise copy of rhs_.
162 MessageRoute(
const MessageRoute& rhs_) :
163 _messageHandler(rhs_._messageHandler),
164 _requestedAcks (rhs_._requestedAcks),
165 _systemAcks (rhs_._systemAcks),
166 _terminationAck(rhs_._terminationAck)
// Copy assignment: member-wise copy of rhs_ (the return statement is not
// visible in this excerpt).
168 const MessageRoute& operator=(
const MessageRoute& rhs_)
170 _messageHandler = rhs_._messageHandler;
171 _requestedAcks = rhs_._requestedAcks;
172 _systemAcks = rhs_._systemAcks;
173 _terminationAck = rhs_._terminationAck;
// Builds a route for a command and derives the ack type that terminates it.
//   messageHandler_ - handler invoked for messages on this route
//   requestedAcks_  - ack-type bits requested by the caller
//   systemAcks_     - ack-type bits requested internally
//   commandType_    - type of the command that created the route
// NOTE(review): the initializer for _terminationAck and several statements
// inside the loops and branches are missing from this excerpt.
176 MessageRoute(MessageHandler messageHandler_,
unsigned requestedAcks_,
177 unsigned systemAcks_, Message::Command::Type commandType_) :
178 _messageHandler(messageHandler_),
179 _requestedAcks(requestedAcks_),
180 _systemAcks(systemAcks_),
// True when commandType_ shares a bit with any subscribe variant or with SOW.
183 bool isSubscribeOrSOW = commandType_ & Message::Command::Subscribe
184 || commandType_ & Message::Command::DeltaSubscribe
185 || commandType_ & Message::Command::SOWAndSubscribe
186 || commandType_ & Message::Command::SOWAndDeltaSubscribe
187 || commandType_ & Message::Command::SOW;
// Neither a subscription nor a SOW query: shift _terminationAck left while
// bitCounter is non-zero. Presumably this lands on the highest bit set in
// (requestedAcks_ | systemAcks_) -- the bitCounter update is not shown.
188 if (!isSubscribeOrSOW)
192 unsigned bitCounter = (requestedAcks_ | systemAcks_) >> 1;
194 while (bitCounter > 0)
197 _terminationAck <<= 1;
// A plain SOW query: same computation when an ack at or above Completed was
// requested.
200 else if (commandType_ == Message::Command::SOW)
202 if (requestedAcks_ >= Message::AckType::Completed)
206 unsigned bitCounter = (requestedAcks_ | systemAcks_) >> 1;
208 while (bitCounter > 0)
211 _terminationAck <<= 1;
// Otherwise (the else line is not visible) the Completed ack ends the route.
216 _terminationAck = Message::AckType::Completed;
// Passes an ack message to this route's handler.
//   message_ - the ack message
//   ackType_ - ack-type bit of the message
// Acks whose type is not in _requestedAcks take a separate path whose body is
// not visible here. A std::exception thrown by the handler is caught and its
// what() text is written to std::cerr.
// NOTE(review): the return statements are missing from this excerpt.
222 unsigned deliverAck(
const Message& message_,
unsigned ackType_)
224 if ( (_requestedAcks & ackType_) == 0)
230 _messageHandler.invoke(message_);
232 catch (std::exception& ex)
234 std::cerr << ex.what() << std::endl;
// Returns true when ackType_ equals the ack type that terminates this route.
238 bool isTerminationAck(
unsigned ackType_)
const 240 return ackType_ == _terminationAck;
// Invokes this route's handler with a data message.
// NOTE(review): the return statement is not visible in this excerpt.
242 unsigned deliverData(
const Message& message_)
244 _messageHandler.invoke(message_);
// Read-only access to the handler stored in this route.
247 const MessageHandler& getMessageHandler()
const 249 return _messageHandler;
// Mutable access to the handler stored in this route.
251 MessageHandler& getMessageHandler()
253 return _messageHandler;
// Router constructor tail: clears the single-entry lookup cache and selects
// the hash routine stored in _crc.
// NOTE(review): the preprocessor conditions and else line that choose between
// the assignments below are not visible in this excerpt.
259 : _previousCommandId(0),
260 _lookupGenerationCount(0),
264 _crc = AMPS::CRC<0>::crcNoSSE;
// Use the CRC<0>::crc routine only when SSE 4.2 is reported as enabled.
266 if (AMPS::CRC<0>::isSSE42Enabled())
268 _crc = AMPS::CRC<0>::crc;
272 _crc = AMPS::CRC<0>::crcNoSSE;
// Registers or replaces the route for commandId_.
// NOTE(review): the first line of the signature, including the method name
// and the commandId_/messageHandler_ parameters, is not part of this excerpt.
278 unsigned requestedAcks_,
unsigned systemAcks_, Message::Command::Type commandType_)
280 Lock<Mutex> lock(_lock);
281 RouteMap::iterator i = _routes.find(commandId_);
// No existing route: insert one keyed by a deep copy of the command id.
282 if (i == _routes.end())
284 _routes[commandId_.
deepCopy()] = MessageRoute(messageHandler_, requestedAcks_, systemAcks_, commandType_);
// Existing route: decide whether the new command is a subscription variant.
289 bool isSubscribe = commandType_ & Message::Command::Subscribe
290 || commandType_ & Message::Command::DeltaSubscribe
291 || commandType_ & Message::Command::SOWAndSubscribe
292 || commandType_ & Message::Command::SOWAndDeltaSubscribe;
// (Start of this condition is not visible.) The existing route's termination
// ack must be non-zero for this branch to be taken.
296 && !i->second.isTerminationAck(0))
// Remember the old handler's user data, then replace the route.
298 void* routeData = i->second.getMessageHandler().userData();;
299 i->second = MessageRoute(messageHandler_, requestedAcks_, systemAcks_, commandType_);
// Presumably releases _lock for the duration of the removal callback -- confirm
// against the Unlock<> definition.
302 Unlock<Mutex> u(_lock);
303 amps_invoke_remove_route_function(routeData);
// Removes the route registered under commandId_, holding _lock for the lookup.
// When a route is found, the result is whatever _removeRoute returns.
// NOTE(review): the statement executed when no route is found is not visible.
312 bool removeRoute(
const Field& commandId_)
314 Lock<Mutex> lock(_lock);
315 RouteMap::iterator i = _routes.find(commandId_);
316 if (i == _routes.end())
320 return _removeRoute(i);
// Body of a method whose signature is not part of this excerpt. It bumps the
// generation counter, gathers the user data of every route while holding
// _lock, and then calls the remove-route function for each gathered pointer.
// NOTE(review): whether _routes itself is emptied is not visible here.
325 AMPS_FETCH_ADD(&_generationCount, 1);
326 std::vector<void*> removeData;
328 Lock<Mutex> lock(_lock);
329 for (RouteMap::iterator i = _routes.begin(); i != _routes.end(); ++i)
335 void* data = i->second.getMessageHandler().userData();
336 removeData.push_back(data);
// Invoke the removal callbacks for the collected user data.
341 for (
size_t i = 0; i < removeData.size(); ++i)
343 amps_invoke_remove_route_function(removeData[i]);
// Returns true when a route is registered under commandId_; the lookup is
// performed while holding _lock.
348 bool hasRoute(
const Field& commandId_)
const 350 Lock<Mutex> lock(_lock);
351 RouteMap::const_iterator it = _routes.find(commandId_);
352 return it != _routes.end();
// Looks up the handler registered under commandId_ while holding _lock.
//   result_ - receives the route's handler when found, otherwise the empty
//             handler.
// NOTE(review): the return statements are not visible in this excerpt.
356 bool getRoute(
const Field& commandId_, MessageHandler& result_)
const 358 Lock<Mutex> lock(_lock);
359 RouteMap::const_iterator it = _routes.find(commandId_);
360 if (it != _routes.end())
362 result_ = it->second.getMessageHandler();
367 result_ = _emptyMessageHandler;
// Handler resolved for one entry of a parsed command-id list; parseRoutes
// fills it together with the entry's idOffset and idLength.
379 MessageHandler handler;
// Vector of RouteLookup entries tagged with the generation count and list
// hash it was built for, so callers can tell whether it is still current.
381 class RouteCache :
public std::vector<RouteLookup>
// Copy construction and assignment are declared without a visible definition.
383 RouteCache(
const RouteCache&);
384 void operator=(
const RouteCache&);
387 : _generationCount(0),
// Resets the stored generation count to zero.
// NOTE(review): other statements in this method are not visible.
391 void invalidateCache(
void)
393 #if __cplusplus >= 201100L || _MSC_VER >= 1900 394 _generationCount.store(0);
396 _generationCount = 0;
// Re-tags the cache with a new generation count (std::atomic variant for
// C++11 and later / MSVC 2015 and later, AMPS_ATOMIC_TYPE variant otherwise).
// NOTE(review): the handling of hashVal_ and of the stored entries is not
// visible in this excerpt.
401 #if __cplusplus >= 201100L || _MSC_VER >= 1900 402 void invalidateCache(
const std::atomic<uint_fast64_t>& generationCount_, amps_uint64_t hashVal_)
404 _generationCount.store(generationCount_);
409 void invalidateCache(
const AMPS_ATOMIC_TYPE& generationCount_, amps_uint64_t hashVal_)
411 _generationCount = generationCount_;
// A hit requires both the generation count and the list hash to match.
417 #if __cplusplus >= 201100L || _MSC_VER >= 1900 418 bool isCacheHit(
const std::atomic<uint_fast64_t>& generationCount_, amps_uint64_t hashVal_)
const 420 return _generationCount == generationCount_ && _hashVal == hashVal_;
423 bool isCacheHit(
const AMPS_ATOMIC_TYPE& generationCount_, amps_uint64_t hashVal_)
const 425 return _generationCount == generationCount_ && _hashVal == hashVal_;
// Tag values compared by isCacheHit.
430 #if __cplusplus >= 201100L || _MSC_VER >= 1900 431 std::atomic<uint_fast64_t> _generationCount;
433 AMPS_ATOMIC_TYPE _generationCount;
435 amps_uint64_t _hashVal;
// Resolves a comma-separated list of command ids into handlers.
//   commandIdList_ - field holding the comma-delimited ids
//   result_        - cache that receives one RouteLookup per id
// NOTE(review): several statements (loop increment, construction of subId,
// final return) are not visible in this excerpt.
441 size_t parseRoutes(
const Field& commandIdList_, RouteCache& result_)
// Hash the whole list; if result_ was built for this hash and the current
// generation, it is reused as is.
446 amps_uint64_t listHash = _crc(commandIdList_.
data(), commandIdList_.
len(), 0);
447 if (result_.isCacheHit(_generationCount, listHash))
449 return result_.size();
451 result_.invalidateCache(_generationCount, listHash);
// Rebuild the entries while holding _lock.
454 Lock<Mutex> lockGuard(_lock);
455 size_t resultCount = 0;
456 const char* pStart = commandIdList_.
data();
457 for (
const char* p = pStart, *e = commandIdList_.
len() + pStart; p < e;
// Scan forward to the next comma or the end of the list.
460 const char* delimiter = p;
461 while (delimiter != e && *delimiter !=
',')
// Append a fresh entry for this id.
466 #ifdef AMPS_USE_EMPLACE 467 result_.emplace_back(RouteLookup());
469 result_.push_back(RouteLookup());
// Record where the id sits inside the list, then resolve its handler,
// falling back to the empty handler when no route exists.
473 RouteLookup& result = result_[resultCount];
474 result.idOffset = (size_t)(p - pStart);
475 result.idLength = (size_t)(delimiter - p);
477 RouteMap::const_iterator it = _routes.find(subId);
478 if (it != _routes.end())
480 result.handler = it->second.getMessageHandler();
484 result.handler = _emptyMessageHandler;
// Delivers an ack message by trying up to three lookup keys in turn and
// returns the number of deliveries made. The second and third attempts run
// only when the key is non-empty and nothing has been delivered yet.
// NOTE(review): the statements that obtain each key are not visible in this
// excerpt.
490 unsigned deliverAck(
const Message& ackMessage_,
unsigned ackType_)
493 unsigned messagesDelivered = 0;
500 messagesDelivered += _deliverAck(ackMessage_, ackType_, key);
503 !key.
empty() && messagesDelivered == 0)
505 messagesDelivered += _deliverAck(ackMessage_, ackType_, key);
508 !key.
empty() && messagesDelivered == 0)
510 messagesDelivered += _deliverAck(ackMessage_, ackType_, key);
512 return messagesDelivered;
// Delivers a data message to the route registered under commandId_ and
// returns the number of deliveries made.
516 unsigned deliverData(
const Message& dataMessage_,
const Field& commandId_)
518 unsigned messagesDelivered = 0;
519 amps_uint64_t hval = _crc(commandId_.
data(), commandId_.
len(), 0);
// Fast path: the id hash matches the previously used route and no route
// change has bumped the generation count, so the cached route is used
// without taking _lock.
520 if (_previousCommandId == hval &&
521 _lookupGenerationCount == _generationCount)
523 messagesDelivered += _previousHandler.deliverData(dataMessage_);
// Slow path: look the route up under _lock and refresh the cached entry.
527 Lock<Mutex> lock(_lock);
528 RouteMap::iterator it = _routes.find(commandId_);
529 if (it != _routes.end())
531 _previousCommandId = hval;
532 #if __cplusplus >= 201100L || _MSC_VER >= 1900 533 _lookupGenerationCount.store(_generationCount);
535 _lookupGenerationCount = _generationCount;
537 _previousHandler = it->second;
538 messagesDelivered += it->second.deliverData(dataMessage_);
541 return messagesDelivered;
// Forgets the previously used command id, so the next deliverData call with a
// non-zero id hash performs a full lookup.
544 void invalidateCache(
void)
546 _previousCommandId = 0;
// Processes every route whose termination ack is 0.
// NOTE(review): presumably these are subscription routes and they are erased
// from _routes -- the erase statement is not visible in this excerpt.
549 void unsubscribeAll(
void)
// Bump the generation so cached lookups are treated as stale.
551 AMPS_FETCH_ADD(&_generationCount, 1);
552 std::vector<Field> removeIds;
553 std::vector<void*> removeData;
554 Lock<Mutex> lock(_lock);
// First pass: collect the ids and user data of the matching routes.
555 for (RouteMap::iterator it = _routes.begin(); it != _routes.end(); ++it)
557 if (it->second.isTerminationAck(0))
559 removeIds.push_back(it->first);
560 removeData.push_back(it->second.getMessageHandler().userData());
// Second pass: locate each collected id again.
563 for (
size_t i = 0; i < removeIds.size(); ++i)
566 RouteMap::iterator it = _routes.find(removeIds[i]);
// Presumably releases _lock while the removal callbacks run -- confirm
// against the Unlock<> definition.
572 Unlock<Mutex> u(_lock);
573 for (
size_t i = 0; i < removeData.size(); ++i)
575 amps_invoke_remove_route_function(removeData[i]);
// Ordered map from command id to its route.
580 typedef std::map<Field, MessageRoute> RouteMap;
// Single-entry cache used by deliverData: the last route used, the hash of
// its command id, and the generation count at the time it was cached.
584 MessageRoute _previousHandler;
585 amps_uint64_t _previousCommandId;
// _generationCount is incremented when routes are removed; the counters are
// std::atomic on C++11 and later / MSVC 2015 and later, AMPS_ATOMIC_TYPE
// otherwise.
586 #if __cplusplus >= 201100L || _MSC_VER >= 1900 587 mutable std::atomic<uint_fast64_t> _lookupGenerationCount;
588 mutable std::atomic<uint_fast64_t> _generationCount;
590 mutable AMPS_ATOMIC_TYPE _lookupGenerationCount;
591 mutable AMPS_ATOMIC_TYPE _generationCount;
// Delivers an ack to the route registered under commandId_, holding _lock,
// and returns the number of deliveries made.
// NOTE(review): the action taken when the ack is the route's termination ack
// is not visible in this excerpt.
597 unsigned _deliverAck(
const Message& ackMessage_,
unsigned ackType_,
Field& commandId_)
599 Lock<Mutex> lock(_lock);
600 unsigned messagesDelivered = 0;
601 RouteMap::iterator it = _routes.find(commandId_);
602 if (it != _routes.end())
604 MessageRoute& route = it->second;
605 messagesDelivered += route.deliverAck(ackMessage_, ackType_);
606 if (route.isTerminationAck(ackType_))
612 return messagesDelivered;
// Checks, under _lock, whether ackType_ is the termination ack of the route
// registered under commandId_, without delivering anything.
// NOTE(review): the action taken on a match and the return statements are not
// visible in this excerpt.
614 unsigned _processAckForRemoval(
unsigned ackType_,
Field& commandId_)
616 Lock<Mutex> lock(_lock);
617 RouteMap::iterator it = _routes.find(commandId_);
618 if (it != _routes.end())
620 MessageRoute& route = it->second;
621 if (route.isTerminationAck(ackType_))
// Removal helper for the route at it_.
// NOTE(review): the callers visible in this excerpt hold _lock when calling;
// the erase and return statements are not visible.
631 bool _removeRoute(RouteMap::iterator& it_)
// Bump the generation so cached lookups are treated as stale.
634 AMPS_FETCH_ADD(&_generationCount, 1);
// Copy out the key and the handler's user data for use after this point.
636 Field f = it_->first;
637 void* routeData = it_->second.getMessageHandler().userData();
// Presumably releases _lock while the removal callback runs -- confirm
// against the Unlock<> definition.
642 Unlock<Mutex> u(_lock);
643 amps_invoke_remove_route_function(routeData);
Defines the AMPS::Message class and related classes.
Field getSubscriptionId() const
Retrieves the value of the SubscriptionId header of the Message as a new Field.
Definition: Message.hpp:1427
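Message encapsulates a single message sent to or received from an AMPS server, and provides methods for every header that can be present, whether or not that header will be populated or used in a particular context.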
Definition: Message.hpp:531
void clear()
Deletes the data associated with this Field, should only be used on Fields that were created as deepCopy of other Fields.
Definition: Field.hpp:247
const char * data() const
Returns the (non-null-terminated) data underlying this field.
Definition: Field.hpp:260
Handler(const T &callback_)
Constructor for use with a standard c++ library function object.
Definition: MessageRouter.hpp:93
Field getCommand() const
Retrieves the value of the Command header of the Message as a new Field.
Definition: Message.hpp:1195
Field getCommandId() const
Retrieves the value of the CommandId header of the Message as a new Field.
Definition: Message.hpp:1302
bool empty() const
Returns 'true' if empty, 'false' otherwise.
Definition: Field.hpp:128
size_t len() const
Returns the length of the data underlying this field.
Definition: Field.hpp:267
This class multiplexes messages from AMPS to multiple subscribers and uses the stream of acks from AMPS to remove routes as appropriate.
Definition: MessageRouter.hpp:149
Field represents the value of a single field in a Message.
Definition: Field.hpp:86
Handler(Func func_, void *userData_)
Constructor for use with a bare function pointer.
Definition: MessageRouter.hpp:70
Wrapper for callback functions in AMPS.
Definition: MessageRouter.hpp:39
An iterable object representing the results of an AMPS subscription and/or query. ...
Definition: ampsplusplus.hpp:4996
Handler()
Null constructor – no function is wrapped.
Definition: MessageRouter.hpp:57
Field getQueryID() const
Retrieves the value of the QueryID header of the Message as a new Field.
Definition: Message.hpp:1417
void deepCopy(const Field &orig_)
Makes self a deep copy of the original field.
Definition: Field.hpp:218
Handler(const Handler &orig_)
Copy constructor.
Definition: MessageRouter.hpp:81
Definition: ampsplusplus.hpp:103