AMPS C/C++ Client Class Reference
AMPS C/C++ Client Version 5.3.4.3
MessageRouter.hpp
1 //
3 // Copyright (c) 2010-2024 60East Technologies Inc., All Rights Reserved.
4 //
5 // This computer software is owned by 60East Technologies Inc. and is
6 // protected by U.S. copyright laws and other laws and by international
7 // treaties. This computer software is furnished by 60East Technologies
8 // Inc. pursuant to a written license agreement and may be used, copied,
9 // transmitted, and stored only in accordance with the terms of such
10 // license agreement and with the inclusion of the above copyright notice.
11 // This computer software or any other copies thereof may not be provided
12 // or otherwise made available to any other person.
13 //
14 // U.S. Government Restricted Rights. This computer software: (a) was
15 // developed at private expense and is in all respects the proprietary
16 // information of 60East Technologies Inc.; (b) was not developed with
17 // government funds; (c) is a trade secret of 60East Technologies Inc.
18 // for all purposes of the Freedom of Information Act; and (d) is a
19 // commercial item and thus, pursuant to Section 12.212 of the Federal
20 // Acquisition Regulations (FAR) and DFAR Supplement Section 227.7202,
21 // Government's use, duplication or disclosure of the computer software
22 // is subject to the restrictions set forth by 60East Technologies Inc..
23 //
25 #ifndef _MESSAGEROUTER_HPP_
26 #define _MESSAGEROUTER_HPP_
27 #include <map>
28 #include "amps/ampscrc.hpp"
29 #include "amps/util.hpp"
30 #include "amps/Message.hpp"
31 
32 namespace AMPS
33 {
38  template <typename Func, typename Object>
39  class Handler
40  {
41  protected:
42  friend class MessageStream;
43  Func _func;
44  void* _userData;
45 #ifdef AMPS_USE_FUNCTIONAL
46  std::function<void(Object)> _callable;
47 #endif
48  bool _isValid;
49 
50  public:
51  // No op function for handlers
52  static void noOpHandler(Object) {;}
53 
54  typedef Func FunctionType;
57  Handler() : _func(NULL), _userData(NULL)
58 #ifdef AMPS_USE_FUNCTIONAL
59  , _callable(Handler<Func, Object>::noOpHandler)
60 #endif
61  , _isValid(false)
62  {
63  }
64 
70  Handler(Func func_, void* userData_)
71  : _func(func_), _userData(userData_)
72 #ifdef AMPS_USE_FUNCTIONAL
73  , _callable(noOpHandler)
74 #endif
75  , _isValid(true)
76  {
77  }
78 
81  Handler(const Handler& orig_)
82  : _func(orig_._func), _userData(orig_._userData)
83 #ifdef AMPS_USE_FUNCTIONAL
84  , _callable(orig_._callable)
85 #endif
86  , _isValid(true)
87  {
88  }
89 #ifdef AMPS_USE_FUNCTIONAL
90  template <typename T>
93  Handler(const T& callback_)
94  : _func(NULL), _userData(NULL), _callable(callback_), _isValid(true)
95  {
96  }
97 #endif
98  void invoke(Object message)
99  {
100  if (_func)
101  {
102  _func(message, _userData);
103  }
104 #ifdef AMPS_USE_FUNCTIONAL
105  else
106  {
107  _callable(message);
108  }
109 #endif
110  }
111 
112  Handler& operator=(const Handler& rhs_)
113  {
114  if (this != &rhs_)
115  {
116  _func = rhs_._func;
117  _userData = rhs_._userData;
118 #ifdef AMPS_USE_FUNCTIONAL
119  _callable = rhs_._callable;
120 #endif
121  _isValid = rhs_._isValid;
122  }
123  return *this;
124  }
125 
126  bool isValid(void) const
127  {
128  return _isValid;
129  }
130  Func function(void) const
131  {
132  return _func;
133  }
134  void* userData(void) const
135  {
136  return _userData;
137  }
138  };
139  class Message;
142  typedef void(*MessageHandlerFunc)(const Message&, void* userData);
143 
145 
150  {
151  private:
152  MessageHandler _emptyMessageHandler;
153  typedef amps_uint64_t (*CRCFunction)(const char*, size_t, amps_uint64_t);
154  // Function used to calculate the CRC if one is used
155  CRCFunction _crc;
156  class MessageRoute
157  {
158  MessageHandler _messageHandler;
159  unsigned _requestedAcks, _systemAcks, _terminationAck;
160  public:
161  MessageRoute() : _requestedAcks(0), _systemAcks(0), _terminationAck(0) {;}
162  MessageRoute(const MessageRoute& rhs_) :
163  _messageHandler(rhs_._messageHandler),
164  _requestedAcks (rhs_._requestedAcks),
165  _systemAcks (rhs_._systemAcks),
166  _terminationAck(rhs_._terminationAck)
167  {;}
168  const MessageRoute& operator=(const MessageRoute& rhs_)
169  {
170  _messageHandler = rhs_._messageHandler;
171  _requestedAcks = rhs_._requestedAcks;
172  _systemAcks = rhs_._systemAcks;
173  _terminationAck = rhs_._terminationAck;
174  return *this;
175  }
176  MessageRoute(MessageHandler messageHandler_, unsigned requestedAcks_,
177  unsigned systemAcks_, Message::Command::Type commandType_) :
178  _messageHandler(messageHandler_),
179  _requestedAcks(requestedAcks_),
180  _systemAcks(systemAcks_),
181  _terminationAck(0)
182  {
183  bool isSubscribeOrSOW = commandType_ & Message::Command::Subscribe
184  || commandType_ & Message::Command::DeltaSubscribe
185  || commandType_ & Message::Command::SOWAndSubscribe
186  || commandType_ & Message::Command::SOWAndDeltaSubscribe
187  || commandType_ & Message::Command::SOW;
188  if (!isSubscribeOrSOW)
189  {
190  // The ack to terminate the route on is whatever the highest
191  // bit set in requestedAcks is.
192  unsigned bitCounter = (requestedAcks_ | systemAcks_) >> 1;
193  _terminationAck = 1;
194  while (bitCounter > 0)
195  {
196  bitCounter >>= 1;
197  _terminationAck <<= 1;
198  }
199  }
200  else if (commandType_ == Message::Command::SOW)
201  {
202  if (requestedAcks_ >= Message::AckType::Completed)
203  {
204  // The ack to terminate the route on is whatever the highest
205  // bit set in requestedAcks is.
206  unsigned bitCounter = (requestedAcks_ | systemAcks_) >> 1;
207  _terminationAck = 1;
208  while (bitCounter > 0)
209  {
210  bitCounter >>= 1;
211  _terminationAck <<= 1;
212  }
213  }
214  else
215  {
216  _terminationAck = Message::AckType::Completed;
217  }
218  }
219  }
220 
221  // Deliver an ack to registered handler if the ack type was requested
222  unsigned deliverAck(const Message& message_, unsigned ackType_)
223  {
224  if ( (_requestedAcks & ackType_) == 0)
225  {
226  return 0;
227  }
228  try
229  {
230  _messageHandler.invoke(message_);
231  }
232  catch (std::exception& ex)
233  {
234  std::cerr << ex.what() << std::endl;
235  }
236  return 1;
237  }
238  bool isTerminationAck(unsigned ackType_) const
239  {
240  return ackType_ == _terminationAck;
241  }
242  unsigned deliverData(const Message& message_)
243  {
244  _messageHandler.invoke(message_);
245  return 1;
246  }
247  const MessageHandler& getMessageHandler() const
248  {
249  return _messageHandler;
250  }
251  MessageHandler& getMessageHandler()
252  {
253  return _messageHandler;
254  }
255  };
256 
257  public:
258  MessageRouter()
259  : _previousCommandId(0),
260  _lookupGenerationCount(0),
261  _generationCount(0)
262  {
263 #ifndef AMPS_SSE_42
264  _crc = AMPS::CRC<0>::crcNoSSE;
265 #else
266  if (AMPS::CRC<0>::isSSE42Enabled())
267  {
268  _crc = AMPS::CRC<0>::crc;
269  }
270  else
271  {
272  _crc = AMPS::CRC<0>::crcNoSSE;
273  }
274 #endif
275  }
276 
277  int addRoute(const Field& commandId_, const AMPS::MessageHandler& messageHandler_,
278  unsigned requestedAcks_, unsigned systemAcks_, Message::Command::Type commandType_)
279  {
280  Lock<Mutex> lock(_lock);
281  RouteMap::iterator i = _routes.find(commandId_);
282  if (i == _routes.end())
283  {
284  _routes[commandId_.deepCopy()] = MessageRoute(messageHandler_, requestedAcks_, systemAcks_, commandType_);
285  return 1;
286  }
287  else
288  {
289  bool isSubscribe = commandType_ & Message::Command::Subscribe
290  || commandType_ & Message::Command::DeltaSubscribe
291  || commandType_ & Message::Command::SOWAndSubscribe
292  || commandType_ & Message::Command::SOWAndDeltaSubscribe;
293 
294  // Only replace a non-subscribe with a subscribe
295  if (isSubscribe
296  && !i->second.isTerminationAck(0))
297  {
298  void* routeData = i->second.getMessageHandler().userData();;
299  i->second = MessageRoute(messageHandler_, requestedAcks_, systemAcks_, commandType_);
300  if (routeData)
301  {
302  Unlock<Mutex> u(_lock);
303  amps_invoke_remove_route_function(routeData);
304  }
305  return 1;
306  }
307  }
308  return 0;
309  }
310 
311  // returns true if a route was removed.
312  bool removeRoute(const Field& commandId_)
313  {
314  Lock<Mutex> lock(_lock);
315  RouteMap::iterator i = _routes.find(commandId_);
316  if (i == _routes.end())
317  {
318  return false;
319  }
320  return _removeRoute(i);
321  }
322 
323  void clear()
324  {
325  AMPS_FETCH_ADD(&_generationCount, 1);
326  std::vector<void*> removeData;
327  {
328  Lock<Mutex> lock(_lock);
329  for (RouteMap::iterator i = _routes.begin(); i != _routes.end(); ++i)
330  {
331  // Make a non-const copy of Field and clear it, which will clear i
332  // as well but won't actually affect the map, which is unaware that
333  // the key's shared pointer has been deleted.
334  Field f = i->first;
335  void* data = i->second.getMessageHandler().userData();
336  removeData.push_back(data);
337  f.clear();
338  }
339  _routes.clear();
340  }
341  for (size_t i = 0; i < removeData.size(); ++i)
342  {
343  amps_invoke_remove_route_function(removeData[i]);
344  }
345  }
346 
347  // Returns true if a route exists for a single id.
348  bool hasRoute(const Field& commandId_) const
349  {
350  Lock<Mutex> lock(_lock);
351  RouteMap::const_iterator it = _routes.find(commandId_);
352  return it != _routes.end();
353  }
354 
355  // Find a single route and return true if here, setting result_ to the handler.
356  bool getRoute(const Field& commandId_, MessageHandler& result_) const
357  {
358  Lock<Mutex> lock(_lock);
359  RouteMap::const_iterator it = _routes.find(commandId_);
360  if (it != _routes.end())
361  {
362  result_ = it->second.getMessageHandler();
363  return true;
364  }
365  else
366  {
367  result_ = _emptyMessageHandler;
368  return false;
369  }
370  }
371 
372  // RouteCache is the result type for a parseRoutes(); we do extra work
373  // to avoid hitting the map or its lock when the subids field on
374  // publish messages does not change.
375  struct RouteLookup
376  {
377  size_t idOffset;
378  size_t idLength;
379  MessageHandler handler;
380  };
381  class RouteCache : public std::vector<RouteLookup>
382  {
383  RouteCache(const RouteCache&);
384  void operator=(const RouteCache&);
385  public:
386  RouteCache(void)
387  : _generationCount(0),
388  _hashVal(0)
389  {;}
390 
391  void invalidateCache(void)
392  {
393 #if __cplusplus >= 201100L || _MSC_VER >= 1900
394  _generationCount.store(0);
395 #else
396  _generationCount = 0;
397 #endif
398  _hashVal = 0;
399  clear();
400  }
401 #if __cplusplus >= 201100L || _MSC_VER >= 1900
402  void invalidateCache(const std::atomic<uint_fast64_t>& generationCount_, amps_uint64_t hashVal_)
403  {
404  _generationCount.store(generationCount_);
405 #else
406  void invalidateCache(const AMPS_ATOMIC_TYPE& generationCount_, amps_uint64_t hashVal_)
407  {
408  _generationCount = generationCount_;
409 #endif
410  _hashVal = hashVal_;
411  clear();
412  }
413 
414 #if __cplusplus >= 201100L || _MSC_VER >= 1900
415  bool isCacheHit(const std::atomic<uint_fast64_t>& generationCount_, amps_uint64_t hashVal_) const
416 #else
417  bool isCacheHit(const AMPS_ATOMIC_TYPE& generationCount_, amps_uint64_t hashVal_) const
418 #endif
419  {
420  return _generationCount == generationCount_ && _hashVal == hashVal_;
421  }
422 
423  private:
424 #if __cplusplus >= 201100L || _MSC_VER >= 1900
425  std::atomic<uint_fast64_t> _generationCount;
426 #else
427  AMPS_ATOMIC_TYPE _generationCount;
428 #endif
429  amps_uint64_t _hashVal;
430  };
431 
432  // Parses the command id list into the route lookup vector and assigns
433  // the found handlers into the list. Only intended to be called by the
434  // message handler thread. Returns the number of command/sub IDs parsed.
435  size_t parseRoutes(const Field& commandIdList_, RouteCache& result_)
436  {
437  // Super shortcut: if the whole subID list is the same as the previous one,
438  // then assume the result_ contains all the right handlers already, and that
439  // the offsets and lengths of subIds are unchanged.
440  amps_uint64_t listHash = _crc(commandIdList_.data(), commandIdList_.len(), 0);
441  if (result_.isCacheHit(_generationCount, listHash))
442  {
443  return result_.size();
444  }
445  result_.invalidateCache(_generationCount, listHash);
446 
447  // Lock required now that we'll be using the route map.
448  Lock<Mutex> lockGuard(_lock);
449  size_t resultCount = 0;
450  const char* pStart = commandIdList_.data();
451  for (const char* p = pStart, *e = commandIdList_.len() + pStart; p < e;
452  ++p, ++resultCount)
453  {
454  const char* delimiter = p;
455  while (delimiter != e && *delimiter != ',')
456  {
457  ++delimiter;
458  }
459  AMPS::Field subId(p, (size_t)(delimiter - p));
460 #ifdef AMPS_USE_EMPLACE
461  result_.emplace_back(RouteLookup());
462 #else
463  result_.push_back(RouteLookup());
464 #endif
465  // Push back and then copy over fields; would emplace_back if available on
466  // all supported compilers.
467  RouteLookup& result = result_[resultCount];
468  result.idOffset = (size_t)(p - pStart);
469  result.idLength = (size_t)(delimiter - p);
470 
471  RouteMap::const_iterator it = _routes.find(subId);
472  if (it != _routes.end())
473  {
474  result.handler = it->second.getMessageHandler();
475  }
476  else
477  {
478  result.handler = _emptyMessageHandler;
479  }
480  p = delimiter;
481  }
482  return resultCount;
483  }
484  unsigned deliverAck(const Message& ackMessage_, unsigned ackType_)
485  {
486  assert(ackMessage_.getCommand() == "ack");
487  unsigned messagesDelivered = 0;
488  Field key;
489 
490  // Call _deliverAck, which will deliver to any waiting handlers
491  // AND remove the route if it's a termination ack
492  if (key = ackMessage_.getCommandId(), !key.empty())
493  {
494  messagesDelivered += _deliverAck(ackMessage_, ackType_, key);
495  }
496  if (key = ackMessage_.getQueryID(),
497  !key.empty() && messagesDelivered == 0)
498  {
499  messagesDelivered += _deliverAck(ackMessage_, ackType_, key);
500  }
501  if (key = ackMessage_.getSubscriptionId(),
502  !key.empty() && messagesDelivered == 0)
503  {
504  messagesDelivered += _deliverAck(ackMessage_, ackType_, key);
505  }
506  return messagesDelivered;
507  }
508 
509  // deliverData may only be called by the message handler thread.
510  unsigned deliverData(const Message& dataMessage_, const Field& commandId_)
511  {
512  unsigned messagesDelivered = 0;
513  amps_uint64_t hval = _crc(commandId_.data(), commandId_.len(), 0);
514  if (_previousCommandId == hval &&
515  _lookupGenerationCount == _generationCount)
516  {
517  messagesDelivered += _previousHandler.deliverData(dataMessage_);
518  }
519  else
520  {
521  Lock<Mutex> lock(_lock);
522  RouteMap::iterator it = _routes.find(commandId_);
523  if (it != _routes.end())
524  {
525  _previousCommandId = hval;
526 #if __cplusplus >= 201100L || _MSC_VER >= 1900
527  _lookupGenerationCount.store(_generationCount);
528 #else
529  _lookupGenerationCount = _generationCount;
530 #endif
531  _previousHandler = it->second;
532  messagesDelivered += it->second.deliverData(dataMessage_);
533  }
534  }
535  return messagesDelivered;
536  }
537 
538  void invalidateCache(void)
539  {
540  _previousCommandId = 0;
541  }
542 
543  void unsubscribeAll(void)
544  {
545  AMPS_FETCH_ADD(&_generationCount, 1);
546  std::vector<Field> removeIds;
547  std::vector<void*> removeData;
548  Lock<Mutex> lock(_lock);
549  for (RouteMap::iterator it = _routes.begin(); it != _routes.end(); ++it)
550  {
551  if (it->second.isTerminationAck(0))
552  {
553  removeIds.push_back(it->first);
554  removeData.push_back(it->second.getMessageHandler().userData());
555  }
556  }
557  for (size_t i = 0; i < removeIds.size(); ++i)
558  {
559  // it can't be end() b/c we have the lock and found id above
560  RouteMap::iterator it = _routes.find(removeIds[i]);
561  // Make a non-const copy of Field and clear it, which will clear i as well
562  Field f = it->first; // -V783
563  f.clear();
564  _routes.erase(it);
565  }
566  Unlock<Mutex> u(_lock);
567  for (size_t i = 0; i < removeData.size(); ++i)
568  {
569  amps_invoke_remove_route_function(removeData[i]);
570  }
571  }
572 
573  private:
574  typedef std::map<Field, MessageRoute> RouteMap;
575  RouteMap _routes;
576  mutable Mutex _lock;
577 
578  MessageRoute _previousHandler;
579  amps_uint64_t _previousCommandId;
580 #if __cplusplus >= 201100L || _MSC_VER >= 1900
581  mutable std::atomic<uint_fast64_t> _lookupGenerationCount;
582  mutable std::atomic<uint_fast64_t> _generationCount;
583 #else
584  mutable AMPS_ATOMIC_TYPE _lookupGenerationCount;
585  mutable AMPS_ATOMIC_TYPE _generationCount;
586 #endif
587 
588 
589  // Deliver the ack to any waiting handlers
590  // AND remove the route if it's a termination ack
591  unsigned _deliverAck(const Message& ackMessage_, unsigned ackType_, Field& commandId_)
592  {
593  Lock<Mutex> lock(_lock);
594  unsigned messagesDelivered = 0;
595  RouteMap::iterator it = _routes.find(commandId_);
596  if (it != _routes.end())
597  {
598  MessageRoute& route = it->second;
599  messagesDelivered += route.deliverAck(ackMessage_, ackType_);
600  if (route.isTerminationAck(ackType_))
601  {
602  _removeRoute(it);
603  ++messagesDelivered;
604  }
605  }
606  return messagesDelivered;
607  }
608  unsigned _processAckForRemoval(unsigned ackType_, Field& commandId_)
609  {
610  Lock<Mutex> lock(_lock);
611  RouteMap::iterator it = _routes.find(commandId_);
612  if (it != _routes.end())
613  {
614  MessageRoute& route = it->second;
615  if (route.isTerminationAck(ackType_))
616  {
617  _removeRoute(it);
618  return 1U;
619  }
620  }
621  return 0U;
622  }
623 
624  // returns true if a route was removed.
625  bool _removeRoute(RouteMap::iterator& it_)
626  {
627  // Called with lock already held
628  AMPS_FETCH_ADD(&_generationCount, 1);
629  // Make a non-const copy of Field and clear it, which will clear i as well
630  Field f = it_->first;
631  void* routeData = it_->second.getMessageHandler().userData();
632  _routes.erase(it_);
633  f.clear();
634  if (routeData)
635  {
636  Unlock<Mutex> u(_lock);
637  amps_invoke_remove_route_function(routeData);
638  }
639  return true;
640  }
641 
642  };
643 
644 
645 }
646 
647 #endif
Defines the AMPS::Message class and related classes.
Field getSubscriptionId() const
Retrieves the value of the SubscriptionId header of the Message as a new Field.
Definition: Message.hpp:1427
Message encapsulates a single message sent to or received from an AMPS server, and provides methods f...
Definition: Message.hpp:531
void clear()
Deletes the data associated with this Field, should only be used on Fields that were created as deepC...
Definition: Field.hpp:247
const char * data() const
Returns the (non-null-terminated) data underlying this field.
Definition: Field.hpp:260
Handler(const T &callback_)
Constructor for use with a standard c++ library function object.
Definition: MessageRouter.hpp:93
Field getCommand() const
Retrieves the value of the Command header of the Message as a new Field.
Definition: Message.hpp:1195
Field getCommandId() const
Retrieves the value of the CommandId header of the Message as a new Field.
Definition: Message.hpp:1302
bool empty() const
Returns &#39;true&#39; if empty, &#39;false&#39; otherwise.
Definition: Field.hpp:128
size_t len() const
Returns the length of the data underlying this field.
Definition: Field.hpp:267
This class multiplexes messages from AMPS to multiple subscribers and uses the stream of acks from AM...
Definition: MessageRouter.hpp:149
Field represents the value of a single field in a Message.
Definition: Field.hpp:86
Handler(Func func_, void *userData_)
Constructor for use with a bare function pointer.
Definition: MessageRouter.hpp:70
Wrapper for callback functions in AMPS.
Definition: MessageRouter.hpp:39
An iterable object representing the results of an AMPS subscription and/or query. ...
Definition: ampsplusplus.hpp:4931
Handler()
Null constructor – no function is wrapped.
Definition: MessageRouter.hpp:57
Field getQueryID() const
Retrieves the value of the QueryID header of the Message as a new Field.
Definition: Message.hpp:1417
void deepCopy(const Field &orig_)
Makes self a deep copy of the original field.
Definition: Field.hpp:218
Handler(const Handler &orig_)
Copy constructor.
Definition: MessageRouter.hpp:81
Definition: ampsplusplus.hpp:102