25 #ifndef _MESSAGEROUTER_HPP_ 26 #define _MESSAGEROUTER_HPP_ 28 #include "amps/ampscrc.hpp" 29 #include "amps/util.hpp" 38 template <
typename Func,
typename Object>
45 #ifdef AMPS_USE_FUNCTIONAL 46 std::function<void(Object)> _callable;
52 static void noOpHandler(Object) {;}
54 typedef Func FunctionType;
58 #ifdef AMPS_USE_FUNCTIONAL
59 , _callable(
Handler<Func, Object>::noOpHandler)
71 : _func(func_), _userData(userData_)
72 #ifdef AMPS_USE_FUNCTIONAL
73 , _callable(noOpHandler)
82 : _func(orig_._func), _userData(orig_._userData)
83 #ifdef AMPS_USE_FUNCTIONAL
84 , _callable(orig_._callable)
89 #ifdef AMPS_USE_FUNCTIONAL 94 : _func(NULL), _userData(NULL), _callable(callback_), _isValid(true)
98 void invoke(Object message)
102 _func(message, _userData);
104 #ifdef AMPS_USE_FUNCTIONAL 117 _userData = rhs_._userData;
118 #ifdef AMPS_USE_FUNCTIONAL 119 _callable = rhs_._callable;
121 _isValid = rhs_._isValid;
126 bool isValid(
void)
const 130 Func
function(void)
const 134 void* userData(
void)
const 142 typedef void(*MessageHandlerFunc)(
const Message&,
void* userData);
152 MessageHandler _emptyMessageHandler;
153 typedef amps_uint64_t (*CRCFunction)(
const char*, size_t, amps_uint64_t);
159 MessageHandler _messageHandler;
160 unsigned _requestedAcks, _systemAcks, _terminationAck;
162 MessageRoute() : _requestedAcks(0), _systemAcks(0), _terminationAck(0) {;}
163 MessageRoute(
const MessageRoute& rhs_) :
164 _messageHandler(rhs_._messageHandler),
165 _requestedAcks (rhs_._requestedAcks),
166 _systemAcks (rhs_._systemAcks),
167 _terminationAck(rhs_._terminationAck)
169 const MessageRoute& operator=(
const MessageRoute& rhs_)
171 _messageHandler = rhs_._messageHandler;
172 _requestedAcks = rhs_._requestedAcks;
173 _systemAcks = rhs_._systemAcks;
174 _terminationAck = rhs_._terminationAck;
177 MessageRoute(MessageHandler messageHandler_,
unsigned requestedAcks_,
178 unsigned systemAcks_,
bool isSubscribe_) :
179 _messageHandler(messageHandler_),
180 _requestedAcks(requestedAcks_),
181 _systemAcks(systemAcks_),
188 unsigned bitCounter = (requestedAcks_ | systemAcks_) >> 1;
190 while (bitCounter > 0)
193 _terminationAck <<= 1;
199 unsigned deliverAck(
const Message& message_,
unsigned ackType_)
201 if ( (_requestedAcks & ackType_) == 0)
207 _messageHandler.invoke(message_);
209 catch (std::exception& ex)
211 std::cerr << ex.what() << std::endl;
215 bool isTerminationAck(
unsigned ackType_)
const 217 return ackType_ == _terminationAck;
219 unsigned deliverData(
const Message& message_)
221 _messageHandler.invoke(message_);
224 const MessageHandler& getMessageHandler()
const 226 return _messageHandler;
228 MessageHandler& getMessageHandler()
230 return _messageHandler;
236 : _previousCommandId(0),
237 _lookupGenerationCount(0),
241 _crc = AMPS::CRC<0>::crcNoSSE;
243 if (AMPS::CRC<0>::isSSE42Enabled())
245 _crc = AMPS::CRC<0>::crc;
249 _crc = AMPS::CRC<0>::crcNoSSE;
255 unsigned requestedAcks_,
unsigned systemAcks_,
bool isSubscribe_)
257 Lock<Mutex> lock(_lock);
258 RouteMap::iterator i = _routes.find(commandId_);
259 if (i == _routes.end())
261 _routes[commandId_.
deepCopy()] = MessageRoute(messageHandler_, requestedAcks_, systemAcks_, isSubscribe_);
267 if (isSubscribe_ && !i->second.isTerminationAck(0))
269 void* routeData = i->second.getMessageHandler().userData();;
270 i->second = MessageRoute(messageHandler_, requestedAcks_, systemAcks_, isSubscribe_);
273 Unlock<Mutex> u(_lock);
274 amps_invoke_remove_route_function(routeData);
283 bool removeRoute(
const Field& commandId_)
285 Lock<Mutex> lock(_lock);
286 RouteMap::iterator i = _routes.find(commandId_);
287 if (i == _routes.end())
291 return _removeRoute(i);
296 AMPS_FETCH_ADD(&_generationCount, 1);
297 std::vector<void*> removeData;
299 Lock<Mutex> lock(_lock);
300 for (RouteMap::iterator i = _routes.begin(); i != _routes.end(); ++i)
306 void* data = i->second.getMessageHandler().userData();
307 removeData.push_back(data);
312 for (
size_t i = 0; i < removeData.size(); ++i)
314 amps_invoke_remove_route_function(removeData[i]);
319 bool hasRoute(
const Field& commandId_)
const 321 Lock<Mutex> lock(_lock);
322 RouteMap::const_iterator it = _routes.find(commandId_);
323 return it != _routes.end();
327 bool getRoute(
const Field& commandId_, MessageHandler& result_)
const 329 Lock<Mutex> lock(_lock);
330 RouteMap::const_iterator it = _routes.find(commandId_);
331 if (it != _routes.end())
333 result_ = it->second.getMessageHandler();
338 result_ = _emptyMessageHandler;
350 MessageHandler handler;
352 class RouteCache :
public std::vector<RouteLookup>
354 RouteCache(
const RouteCache&);
355 void operator=(
const RouteCache&);
358 : _generationCount(0),
362 void invalidateCache(
void)
364 #if __cplusplus >= 201100L || _MSC_VER >= 1900 365 _generationCount.store(0);
367 _generationCount = 0;
372 #if __cplusplus >= 201100L || _MSC_VER >= 1900 373 void invalidateCache(
const std::atomic<uint_fast64_t>& generationCount_, amps_uint64_t hashVal_)
375 _generationCount.store(generationCount_);
377 void invalidateCache(
const AMPS_ATOMIC_TYPE& generationCount_, amps_uint64_t hashVal_)
379 _generationCount = generationCount_;
385 #if __cplusplus >= 201100L || _MSC_VER >= 1900 386 bool isCacheHit(
const std::atomic<uint_fast64_t>& generationCount_, amps_uint64_t hashVal_)
const 388 bool isCacheHit(
const AMPS_ATOMIC_TYPE& generationCount_, amps_uint64_t hashVal_)
const 391 return _generationCount == generationCount_ && _hashVal == hashVal_;
395 #if __cplusplus >= 201100L || _MSC_VER >= 1900 396 std::atomic<uint_fast64_t> _generationCount;
398 AMPS_ATOMIC_TYPE _generationCount;
400 amps_uint64_t _hashVal;
406 size_t parseRoutes(
const Field& commandIdList_, RouteCache& result_)
411 amps_uint64_t listHash = _crc(commandIdList_.
data(), commandIdList_.
len(), 0);
412 if (result_.isCacheHit(_generationCount, listHash))
414 return result_.size();
416 result_.invalidateCache(_generationCount, listHash);
419 Lock<Mutex> lockGuard(_lock);
420 size_t resultCount = 0;
421 const char* pStart = commandIdList_.
data();
422 for (
const char* p = pStart, *e = commandIdList_.
len() + pStart; p < e;
425 const char* delimiter = p;
426 while (delimiter != e && *delimiter !=
',')
431 #ifdef AMPS_USE_EMPLACE 432 result_.emplace_back(RouteLookup());
434 result_.push_back(RouteLookup());
438 RouteLookup& result = result_[resultCount];
439 result.idOffset = (size_t)(p - pStart);
440 result.idLength = (size_t)(delimiter - p);
442 RouteMap::const_iterator it = _routes.find(subId);
443 if (it != _routes.end())
445 result.handler = it->second.getMessageHandler();
449 result.handler = _emptyMessageHandler;
455 unsigned deliverAck(
const Message& ackMessage_,
unsigned ackType_)
458 unsigned messagesDelivered = 0;
465 messagesDelivered += _deliverAck(ackMessage_, ackType_, key);
468 !key.
empty() && messagesDelivered == 0)
470 messagesDelivered += _deliverAck(ackMessage_, ackType_, key);
473 !key.
empty() && messagesDelivered == 0)
475 messagesDelivered += _deliverAck(ackMessage_, ackType_, key);
477 return messagesDelivered;
481 unsigned deliverData(
const Message& dataMessage_,
const Field& commandId_)
483 unsigned messagesDelivered = 0;
484 amps_uint64_t hval = _crc(commandId_.
data(), commandId_.
len(), 0);
485 if (_previousCommandId == hval &&
486 _lookupGenerationCount == _generationCount)
488 messagesDelivered += _previousHandler.deliverData(dataMessage_);
492 Lock<Mutex> lock(_lock);
493 RouteMap::iterator it = _routes.find(commandId_);
494 if (it != _routes.end())
496 _previousCommandId = hval;
497 #if __cplusplus >= 201100L || _MSC_VER >= 1900 498 _lookupGenerationCount.store(_generationCount);
500 _lookupGenerationCount = _generationCount;
502 _previousHandler = it->second;
503 messagesDelivered += it->second.deliverData(dataMessage_);
506 return messagesDelivered;
509 void invalidateCache(
void)
511 _previousCommandId = 0;
514 void unsubscribeAll(
void)
516 AMPS_FETCH_ADD(&_generationCount, 1);
517 std::vector<Field> removeIds;
518 std::vector<void*> removeData;
519 Lock<Mutex> lock(_lock);
520 for (RouteMap::iterator it = _routes.begin(); it != _routes.end(); ++it)
522 if (it->second.isTerminationAck(0))
524 removeIds.push_back(it->first);
525 removeData.push_back(it->second.getMessageHandler().userData());
528 for (
size_t i = 0; i < removeIds.size(); ++i)
531 RouteMap::iterator it = _routes.find(removeIds[i]);
537 Unlock<Mutex> u(_lock);
538 for (
size_t i = 0; i < removeData.size(); ++i)
540 amps_invoke_remove_route_function(removeData[i]);
545 typedef std::map<Field, MessageRoute> RouteMap;
549 MessageRoute _previousHandler;
550 amps_uint64_t _previousCommandId;
551 #if __cplusplus >= 201100L || _MSC_VER >= 1900 552 mutable std::atomic<uint_fast64_t> _lookupGenerationCount;
553 mutable std::atomic<uint_fast64_t> _generationCount;
555 mutable AMPS_ATOMIC_TYPE _lookupGenerationCount;
556 mutable AMPS_ATOMIC_TYPE _generationCount;
562 unsigned _deliverAck(
const Message& ackMessage_,
unsigned ackType_,
Field& commandId_)
564 Lock<Mutex> lock(_lock);
565 unsigned messagesDelivered = 0;
566 RouteMap::iterator it = _routes.find(commandId_);
567 if (it != _routes.end())
569 MessageRoute& route = it->second;
570 messagesDelivered += route.deliverAck(ackMessage_, ackType_);
571 if (route.isTerminationAck(ackType_))
577 return messagesDelivered;
579 unsigned _processAckForRemoval(
unsigned ackType_,
Field& commandId_)
581 Lock<Mutex> lock(_lock);
582 RouteMap::iterator it = _routes.find(commandId_);
583 if (it != _routes.end())
585 MessageRoute& route = it->second;
586 if (route.isTerminationAck(ackType_))
596 bool _removeRoute(RouteMap::iterator& it_)
599 AMPS_FETCH_ADD(&_generationCount, 1);
601 Field f = it_->first;
602 void* routeData = it_->second.getMessageHandler().userData();
607 Unlock<Mutex> u(_lock);
608 amps_invoke_remove_route_function(routeData);
Defines the AMPS::Message class and related classes.
Field getSubscriptionId() const
Retrieves the value of the SubscriptionId header of the Message as a new Field.
Definition: Message.hpp:1427
Message encapsulates a single message sent to or received from an AMPS server, and provides methods f...
Definition: Message.hpp:531
void clear()
Deletes the data associated with this Field, should only be used on Fields that were created as deepC...
Definition: Field.hpp:247
const char * data() const
Returns the (non-null-terminated) data underlying this field.
Definition: Field.hpp:260
Handler(const T &callback_)
Constructor for use with a standard c++ library function object.
Definition: MessageRouter.hpp:93
Field getCommand() const
Retrieves the value of the Command header of the Message as a new Field.
Definition: Message.hpp:1195
Field getCommandId() const
Retrieves the value of the CommandId header of the Message as a new Field.
Definition: Message.hpp:1302
bool empty() const
Returns 'true' if empty, 'false' otherwise.
Definition: Field.hpp:128
size_t len() const
Returns the length of the data underlying this field.
Definition: Field.hpp:267
This class multiplexes messages from AMPS to multiple subscribers and uses the stream of acks from AM...
Definition: MessageRouter.hpp:149
Field represents the value of a single field in a Message.
Definition: Field.hpp:86
Handler(Func func_, void *userData_)
Constructor for use with a bare function pointer.
Definition: MessageRouter.hpp:70
Wrapper for callback functions in AMPS.
Definition: MessageRouter.hpp:39
An iterable object representing the results of an AMPS subscription and/or query. ...
Definition: ampsplusplus.hpp:4943
Handler()
Null constructor – no function is wrapped.
Definition: MessageRouter.hpp:57
Field getQueryID() const
Retrieves the value of the QueryID header of the Message as a new Field.
Definition: Message.hpp:1417
void deepCopy(const Field &orig_)
Makes self a deep copy of the original field.
Definition: Field.hpp:218
Handler(const Handler &orig_)
Copy constructor.
Definition: MessageRouter.hpp:81
Definition: ampsplusplus.hpp:106