// Include guard and project headers, followed by the Handler class template:
// a wrapper for callback functions that pairs a function pointer of type Func
// with an opaque user-data pointer.
// NOTE(review): this listing omits several original lines (class header, some
// members, braces); the comments below describe only the visible lines.
25 #ifndef _MESSAGEROUTER_HPP_ 26 #define _MESSAGEROUTER_HPP_ 28 #include "amps/ampscrc.hpp" 29 #include "amps/util.hpp" 38 template <
typename Func,
typename Object>
// With AMPS_USE_FUNCTIONAL defined, a std::function taking an Object is stored too.
45 #ifdef AMPS_USE_FUNCTIONAL 46 std::function<void(Object)> _callable;
// Callback that accepts an Object and does nothing.
52 static void noOpHandler(Object) {;}
54 typedef Func FunctionType;
// Null constructor: _callable is bound to noOpHandler (AMPS_USE_FUNCTIONAL builds).
58 #ifdef AMPS_USE_FUNCTIONAL
59 , _callable(
Handler<Func, Object>::noOpHandler)
// Bare function pointer constructor: stores func_ and userData_; _callable is
// set to noOpHandler when AMPS_USE_FUNCTIONAL is defined.
71 : _func(func_), _userData(userData_)
72 #ifdef AMPS_USE_FUNCTIONAL
73 , _callable(noOpHandler)
// Copy constructor: copies the function pointer, user data and (when present) _callable.
82 : _func(orig_._func), _userData(orig_._userData)
83 #ifdef AMPS_USE_FUNCTIONAL
84 , _callable(orig_._callable)
// Function-object constructor: no function pointer or user data; callback_ is
// held in _callable and the handler is marked valid.
89 #ifdef AMPS_USE_FUNCTIONAL 94 : _func(NULL), _userData(NULL), _callable(callback_), _isValid(true)
// Invokes the wrapped callback with message.
98 void invoke(Object message)
// Function-pointer path: the stored user data is passed as the second argument.
// NOTE(review): any guard around this call is not visible in this listing.
102 _func(message, _userData);
// Assignment (signature not visible here): copies user data, _callable and validity from rhs_.
104 #ifdef AMPS_USE_FUNCTIONAL 117 _userData = rhs_._userData;
118 #ifdef AMPS_USE_FUNCTIONAL 119 _callable = rhs_._callable;
121 _isValid = rhs_._isValid;
// Const accessors; their bodies are not visible in this listing.
126 bool isValid(
void)
const 130 Func
function(void)
const 134 void* userData(
void)
const 142 typedef void(*MessageHandlerFunc)(
const Message&,
void* userData);
152 MessageHandler _emptyMessageHandler;
153 typedef amps_uint64_t (*CRCFunction)(
const char*, size_t, amps_uint64_t);
158 MessageHandler _messageHandler;
159 unsigned _requestedAcks, _systemAcks, _terminationAck;
161 MessageRoute() : _requestedAcks(0), _systemAcks(0), _terminationAck(0) {;}
// Copy constructor: takes the handler and the requested, system and
// termination ack values from rhs_.
162 MessageRoute(
const MessageRoute& rhs_) :
163 _messageHandler(rhs_._messageHandler),
164 _requestedAcks (rhs_._requestedAcks),
165 _systemAcks (rhs_._systemAcks),
166 _terminationAck(rhs_._terminationAck)
// Copy assignment: overwrites the handler and all three ack fields with those of rhs_.
// NOTE(review): the return statement is not visible in this listing.
168 const MessageRoute& operator=(
const MessageRoute& rhs_)
170 _messageHandler = rhs_._messageHandler;
171 _requestedAcks = rhs_._requestedAcks;
172 _systemAcks = rhs_._systemAcks;
173 _terminationAck = rhs_._terminationAck;
// Builds a route for messageHandler_ and determines which ack ends the route.
// requestedAcks_ and systemAcks_ are combined with bitwise OR below, and
// commandType_ is bit-tested against Message::Command values.
// NOTE(review): several original lines (braces and some statements, including
// the initial value of _terminationAck) are not visible in this listing.
176 MessageRoute(MessageHandler messageHandler_,
unsigned requestedAcks_,
177 unsigned systemAcks_, Message::Command::Type commandType_) :
178 _messageHandler(messageHandler_),
179 _requestedAcks(requestedAcks_),
180 _systemAcks(systemAcks_),
// True when commandType_ shares a bit with any subscribe-style command or SOW.
183 bool isSubscribeOrSOW = commandType_ & Message::Command::Subscribe
184 || commandType_ & Message::Command::DeltaSubscribe
185 || commandType_ & Message::Command::SOWAndSubscribe
186 || commandType_ & Message::Command::SOWAndDeltaSubscribe
187 || commandType_ & Message::Command::SOW;
// Neither subscribe nor SOW: derive the termination ack from the combined ack bits.
188 if (!isSubscribeOrSOW)
192 unsigned bitCounter = (requestedAcks_ | systemAcks_) >> 1;
// _terminationAck is shifted one bit higher per iteration.
// NOTE(review): the statement that updates bitCounter is not visible here.
194 while (bitCounter > 0)
197 _terminationAck <<= 1;
// Command type is exactly SOW.
200 else if (commandType_ == Message::Command::SOW)
// requestedAcks_ is numerically at least Completed: same shift-based derivation.
202 if (requestedAcks_ >= Message::AckType::Completed)
206 unsigned bitCounter = (requestedAcks_ | systemAcks_) >> 1;
208 while (bitCounter > 0)
211 _terminationAck <<= 1;
// Presumably the else branch of the check above (the else line is not visible):
// the route terminates on the Completed ack.
216 _terminationAck = Message::AckType::Completed;
// Passes an ack message to this route's handler.
// message_: the ack to deliver; ackType_: the ack's type as a bit value.
// NOTE(review): the return statements are not visible in this listing; callers
// add the result to a count of delivered messages.
222 unsigned deliverAck(
const Message& message_,
unsigned ackType_)
// Branch taken when ackType_ is not among the requested acks (body not visible).
224 if ( (_requestedAcks & ackType_) == 0)
230 _messageHandler.invoke(message_);
// std::exception-derived errors from the handler are caught and written to std::cerr.
232 catch (std::exception& ex)
234 std::cerr << ex.what() << std::endl;
238 bool isTerminationAck(
unsigned ackType_)
const 240 return ackType_ == _terminationAck;
// Invokes this route's handler with message_.
// NOTE(review): the return statement is not visible in this listing; callers
// add the result to a count of delivered messages.
242 unsigned deliverData(
const Message& message_)
244 _messageHandler.invoke(message_);
// Read-only access to the handler stored in this route.
247 const MessageHandler& getMessageHandler()
const 249 return _messageHandler;
// Mutable access to the handler stored in this route.
251 MessageHandler& getMessageHandler()
253 return _messageHandler;
259 : _previousCommandId(0),
260 _lookupGenerationCount(0),
264 _crc = AMPS::CRC<0>::crcNoSSE;
266 if (AMPS::CRC<0>::isSSE42Enabled())
268 _crc = AMPS::CRC<0>::crc;
272 _crc = AMPS::CRC<0>::crcNoSSE;
278 unsigned requestedAcks_,
unsigned systemAcks_, Message::Command::Type commandType_)
280 Lock<Mutex> lock(_lock);
281 RouteMap::iterator i = _routes.find(commandId_);
282 if (i == _routes.end())
284 _routes[commandId_.
deepCopy()] = MessageRoute(messageHandler_, requestedAcks_, systemAcks_, commandType_);
289 bool isSubscribe = commandType_ & Message::Command::Subscribe
290 || commandType_ & Message::Command::DeltaSubscribe
291 || commandType_ & Message::Command::SOWAndSubscribe
292 || commandType_ & Message::Command::SOWAndDeltaSubscribe;
296 && !i->second.isTerminationAck(0))
298 void* routeData = i->second.getMessageHandler().userData();;
299 i->second = MessageRoute(messageHandler_, requestedAcks_, systemAcks_, commandType_);
302 Unlock<Mutex> u(_lock);
303 amps_invoke_remove_route_function(routeData);
// Removes the route registered under commandId_, if there is one.
// The router lock is taken before the lookup and the actual removal is
// delegated to _removeRoute.
312 bool removeRoute(
const Field& commandId_)
314 Lock<Mutex> lock(_lock);
315 RouteMap::iterator i = _routes.find(commandId_);
// No such route. NOTE(review): this branch's body is not visible in this listing.
316 if (i == _routes.end())
320 return _removeRoute(i);
325 AMPS_FETCH_ADD(&_generationCount, 1);
326 std::vector<void*> removeData;
328 Lock<Mutex> lock(_lock);
329 for (RouteMap::iterator i = _routes.begin(); i != _routes.end(); ++i)
335 void* data = i->second.getMessageHandler().userData();
336 removeData.push_back(data);
341 for (
size_t i = 0; i < removeData.size(); ++i)
343 amps_invoke_remove_route_function(removeData[i]);
348 bool hasRoute(
const Field& commandId_)
const 350 Lock<Mutex> lock(_lock);
351 RouteMap::const_iterator it = _routes.find(commandId_);
352 return it != _routes.end();
// Looks up the handler registered under commandId_ and copies it into result_.
// When no route exists, result_ receives _emptyMessageHandler instead.
// The lookup runs under the router lock.
// NOTE(review): the return statements are not visible in this listing.
356 bool getRoute(
const Field& commandId_, MessageHandler& result_)
const 358 Lock<Mutex> lock(_lock);
359 RouteMap::const_iterator it = _routes.find(commandId_);
360 if (it != _routes.end())
362 result_ = it->second.getMessageHandler();
367 result_ = _emptyMessageHandler;
379 MessageHandler handler;
// A vector of RouteLookup entries tagged with the generation count and hash
// value it was built for, so a caller can tell whether the entries are current.
// NOTE(review): preprocessor #else/#endif lines and some statements are not
// visible in this listing; each #if below selects between a std::atomic
// variant and an AMPS_ATOMIC_TYPE variant of the same member.
381 class RouteCache :
public std::vector<RouteLookup>
// Copy constructor and assignment are declared only; no definitions are visible here.
383 RouteCache(
const RouteCache&);
384 void operator=(
const RouteCache&);
387 : _generationCount(0),
// Resets the stored generation count to zero.
391 void invalidateCache(
void)
393 #if __cplusplus >= 201100L || _MSC_VER >= 1900 394 _generationCount.store(0);
396 _generationCount = 0;
// Records generationCount_ as the generation this cache will hold.
// NOTE(review): what is done with hashVal_ and the stored entries is not visible here.
401 #if __cplusplus >= 201100L || _MSC_VER >= 1900 402 void invalidateCache(
const std::atomic<uint_fast64_t>& generationCount_, amps_uint64_t hashVal_)
404 _generationCount.store(generationCount_);
406 void invalidateCache(
const AMPS_ATOMIC_TYPE& generationCount_, amps_uint64_t hashVal_)
408 _generationCount = generationCount_;
// True only when both the stored generation count and the stored hash match the arguments.
414 #if __cplusplus >= 201100L || _MSC_VER >= 1900 415 bool isCacheHit(
const std::atomic<uint_fast64_t>& generationCount_, amps_uint64_t hashVal_)
const 417 bool isCacheHit(
const AMPS_ATOMIC_TYPE& generationCount_, amps_uint64_t hashVal_)
const 420 return _generationCount == generationCount_ && _hashVal == hashVal_;
424 #if __cplusplus >= 201100L || _MSC_VER >= 1900 425 std::atomic<uint_fast64_t> _generationCount;
427 AMPS_ATOMIC_TYPE _generationCount;
429 amps_uint64_t _hashVal;
// Resolves a comma-delimited list of command ids into result_, one RouteLookup
// per id, each holding the id's offset and length within the list and the
// handler found for it (or _emptyMessageHandler when no route matches).
// NOTE(review): the loop increment, the construction of subId, and the final
// return are not visible in this listing.
435 size_t parseRoutes(
const Field& commandIdList_, RouteCache& result_)
// Hash the whole list; an unchanged hash and generation count means result_ is reused as is.
440 amps_uint64_t listHash = _crc(commandIdList_.
data(), commandIdList_.
len(), 0);
441 if (result_.isCacheHit(_generationCount, listHash))
443 return result_.size();
445 result_.invalidateCache(_generationCount, listHash);
// Route lookups below run under the router lock.
448 Lock<Mutex> lockGuard(_lock);
449 size_t resultCount = 0;
450 const char* pStart = commandIdList_.
data();
451 for (
const char* p = pStart, *e = commandIdList_.
len() + pStart; p < e;
// Scan forward to the next comma or the end of the list.
454 const char* delimiter = p;
455 while (delimiter != e && *delimiter !=
',')
// Append an entry to be filled in for this id.
460 #ifdef AMPS_USE_EMPLACE 461 result_.emplace_back(RouteLookup());
463 result_.push_back(RouteLookup());
467 RouteLookup& result = result_[resultCount];
468 result.idOffset = (size_t)(p - pStart);
469 result.idLength = (size_t)(delimiter - p);
471 RouteMap::const_iterator it = _routes.find(subId);
472 if (it != _routes.end())
474 result.handler = it->second.getMessageHandler();
478 result.handler = _emptyMessageHandler;
// Delivers an ack message to the route registered under a key taken from the
// message, and returns how many handlers received it.
// Up to three lookups via _deliverAck are visible; the second and third are
// conditioned on a non-empty key and on nothing having been delivered so far.
// NOTE(review): the statements that select each key are not visible in this
// listing — presumably command id, query id and subscription id; confirm.
484 unsigned deliverAck(
const Message& ackMessage_,
unsigned ackType_)
487 unsigned messagesDelivered = 0;
494 messagesDelivered += _deliverAck(ackMessage_, ackType_, key);
497 !key.
empty() && messagesDelivered == 0)
499 messagesDelivered += _deliverAck(ackMessage_, ackType_, key);
502 !key.
empty() && messagesDelivered == 0)
504 messagesDelivered += _deliverAck(ackMessage_, ackType_, key);
506 return messagesDelivered;
510 unsigned deliverData(
const Message& dataMessage_,
const Field& commandId_)
512 unsigned messagesDelivered = 0;
513 amps_uint64_t hval = _crc(commandId_.
data(), commandId_.
len(), 0);
514 if (_previousCommandId == hval &&
515 _lookupGenerationCount == _generationCount)
517 messagesDelivered += _previousHandler.deliverData(dataMessage_);
521 Lock<Mutex> lock(_lock);
522 RouteMap::iterator it = _routes.find(commandId_);
523 if (it != _routes.end())
525 _previousCommandId = hval;
526 #if __cplusplus >= 201100L || _MSC_VER >= 1900 527 _lookupGenerationCount.store(_generationCount);
529 _lookupGenerationCount = _generationCount;
531 _previousHandler = it->second;
532 messagesDelivered += it->second.deliverData(dataMessage_);
535 return messagesDelivered;
// Clears the remembered command-id hash that deliverData compares against
// before reusing its previously found route.
538 void invalidateCache(
void)
540 _previousCommandId = 0;
// Removes every route whose termination ack is 0 and then notifies the
// remove-route function with each removed route's user data.
// NOTE(review): the statements that erase the collected routes are not visible
// in this listing.
543 void unsubscribeAll(
void)
// Bump the generation count so cached lookups made before this call stop matching.
545 AMPS_FETCH_ADD(&_generationCount, 1);
546 std::vector<Field> removeIds;
547 std::vector<void*> removeData;
548 Lock<Mutex> lock(_lock);
// Pass 1: collect ids and user data of the matching routes without modifying the map.
549 for (RouteMap::iterator it = _routes.begin(); it != _routes.end(); ++it)
551 if (it->second.isTerminationAck(0))
553 removeIds.push_back(it->first);
554 removeData.push_back(it->second.getMessageHandler().userData());
// Pass 2: look each collected id up again for removal.
557 for (
size_t i = 0; i < removeIds.size(); ++i)
560 RouteMap::iterator it = _routes.find(removeIds[i]);
// The lock is released while the removal callbacks run.
566 Unlock<Mutex> u(_lock);
567 for (
size_t i = 0; i < removeData.size(); ++i)
569 amps_invoke_remove_route_function(removeData[i]);
574 typedef std::map<Field, MessageRoute> RouteMap;
578 MessageRoute _previousHandler;
579 amps_uint64_t _previousCommandId;
580 #if __cplusplus >= 201100L || _MSC_VER >= 1900 581 mutable std::atomic<uint_fast64_t> _lookupGenerationCount;
582 mutable std::atomic<uint_fast64_t> _generationCount;
584 mutable AMPS_ATOMIC_TYPE _lookupGenerationCount;
585 mutable AMPS_ATOMIC_TYPE _generationCount;
// Delivers an ack to the route registered under commandId_, if any, and
// returns the number of handlers that received it. Runs under the router lock.
// NOTE(review): the body of the termination-ack branch is not visible in this
// listing — presumably it removes the route; confirm.
591 unsigned _deliverAck(
const Message& ackMessage_,
unsigned ackType_,
Field& commandId_)
593 Lock<Mutex> lock(_lock);
594 unsigned messagesDelivered = 0;
595 RouteMap::iterator it = _routes.find(commandId_);
596 if (it != _routes.end())
598 MessageRoute& route = it->second;
599 messagesDelivered += route.deliverAck(ackMessage_, ackType_);
600 if (route.isTerminationAck(ackType_))
606 return messagesDelivered;
// Checks, under the router lock, whether ackType_ is the termination ack of
// the route registered under commandId_.
// NOTE(review): the action taken on a match and the return statements are not
// visible in this listing.
608 unsigned _processAckForRemoval(
unsigned ackType_,
Field& commandId_)
610 Lock<Mutex> lock(_lock);
611 RouteMap::iterator it = _routes.find(commandId_);
612 if (it != _routes.end())
614 MessageRoute& route = it->second;
615 if (route.isTerminationAck(ackType_))
// Removes the route that it_ points at and notifies the remove-route function
// with the route's user data.
// NOTE(review): the erase of the map entry and the return statement are not
// visible in this listing.
625 bool _removeRoute(RouteMap::iterator& it_)
// Bump the generation count so cached lookups made before this call stop matching.
628 AMPS_FETCH_ADD(&_generationCount, 1);
// Copy the key and user data out of the map entry first.
630 Field f = it_->first;
631 void* routeData = it_->second.getMessageHandler().userData();
// The lock is released while the removal callback runs.
636 Unlock<Mutex> u(_lock);
637 amps_invoke_remove_route_function(routeData);
Defines the AMPS::Message class and related classes.
Field getSubscriptionId() const
Retrieves the value of the SubscriptionId header of the Message as a new Field.
Definition: Message.hpp:1427
Message encapsulates a single message sent to or received from an AMPS server, and provides methods f...
Definition: Message.hpp:531
void clear()
Deletes the data associated with this Field; should only be used on Fields that were created as deep copies of other Fields.
Definition: Field.hpp:247
const char * data() const
Returns the (non-null-terminated) data underlying this field.
Definition: Field.hpp:260
Handler(const T &callback_)
Constructor for use with a standard c++ library function object.
Definition: MessageRouter.hpp:93
Field getCommand() const
Retrieves the value of the Command header of the Message as a new Field.
Definition: Message.hpp:1195
Field getCommandId() const
Retrieves the value of the CommandId header of the Message as a new Field.
Definition: Message.hpp:1302
bool empty() const
Returns 'true' if empty, 'false' otherwise.
Definition: Field.hpp:128
size_t len() const
Returns the length of the data underlying this field.
Definition: Field.hpp:267
This class multiplexes messages from AMPS to multiple subscribers and uses the stream of acks from AMPS to remove routes as appropriate.
Definition: MessageRouter.hpp:149
Field represents the value of a single field in a Message.
Definition: Field.hpp:86
Handler(Func func_, void *userData_)
Constructor for use with a bare function pointer.
Definition: MessageRouter.hpp:70
Wrapper for callback functions in AMPS.
Definition: MessageRouter.hpp:39
An iterable object representing the results of an AMPS subscription and/or query. ...
Definition: ampsplusplus.hpp:4931
Handler()
Null constructor – no function is wrapped.
Definition: MessageRouter.hpp:57
Field getQueryID() const
Retrieves the value of the QueryID header of the Message as a new Field.
Definition: Message.hpp:1417
void deepCopy(const Field &orig_)
Makes self a deep copy of the original field.
Definition: Field.hpp:218
Handler(const Handler &orig_)
Copy constructor.
Definition: MessageRouter.hpp:81
Definition: ampsplusplus.hpp:102