AMPS C/C++ Client Class Reference
AMPS C/C++ Client Version 5.3.4.5
MessageRouter.hpp
1 //
3 // Copyright (c) 2010-2025 60East Technologies Inc., All Rights Reserved.
4 //
5 // This computer software is owned by 60East Technologies Inc. and is
6 // protected by U.S. copyright laws and other laws and by international
7 // treaties. This computer software is furnished by 60East Technologies
8 // Inc. pursuant to a written license agreement and may be used, copied,
9 // transmitted, and stored only in accordance with the terms of such
10 // license agreement and with the inclusion of the above copyright notice.
11 // This computer software or any other copies thereof may not be provided
12 // or otherwise made available to any other person.
13 //
14 // U.S. Government Restricted Rights. This computer software: (a) was
15 // developed at private expense and is in all respects the proprietary
16 // information of 60East Technologies Inc.; (b) was not developed with
17 // government funds; (c) is a trade secret of 60East Technologies Inc.
18 // for all purposes of the Freedom of Information Act; and (d) is a
19 // commercial item and thus, pursuant to Section 12.212 of the Federal
20 // Acquisition Regulations (FAR) and DFAR Supplement Section 227.7202,
21 // Government's use, duplication or disclosure of the computer software
22 // is subject to the restrictions set forth by 60East Technologies Inc..
23 //
25 #ifndef _MESSAGEROUTER_HPP_
26 #define _MESSAGEROUTER_HPP_
27 #include <map>
28 #include "amps/ampscrc.hpp"
29 #include "amps/util.hpp"
30 #include "amps/Message.hpp"
31 
32 namespace AMPS
33 {
/// Wrapper for callback functions in AMPS.
///
/// A Handler holds either a bare function pointer plus an opaque user-data
/// pointer, or (when AMPS_USE_FUNCTIONAL is defined) a std::function
/// callable. It also tracks whether it was constructed around a real
/// callback (isValid()).
///
/// \tparam Func   Function pointer type, called as func(object, userData).
/// \tparam Object Type of the object passed to the callback.
template <typename Func, typename Object>
class Handler
{
protected:
  friend class MessageStream;
  Func _func;
  void* _userData;
#ifdef AMPS_USE_FUNCTIONAL
  std::function<void(Object)> _callable;
#endif
  bool _isValid;

public:
  /// No-op function used as the callable when nothing else is wrapped.
  static void noOpHandler(Object) {;}

  typedef Func FunctionType;

  /// Null constructor -- no function is wrapped and isValid() is false.
  Handler() : _func(NULL), _userData(NULL)
#ifdef AMPS_USE_FUNCTIONAL
    , _callable(Handler<Func, Object>::noOpHandler)
#endif
    , _isValid(false)
  {
  }

  /// Constructor for use with a bare function pointer.
  /// \param func_     The function to call from invoke().
  /// \param userData_ Opaque pointer passed back to func_ on every call.
  Handler(Func func_, void* userData_)
    : _func(func_), _userData(userData_)
#ifdef AMPS_USE_FUNCTIONAL
    , _callable(noOpHandler)
#endif
    , _isValid(true)
  {
  }

  /// Copy constructor.
  /// The validity flag is copied from the original (as operator= does),
  /// so a copy of a null handler is still a null handler regardless of
  /// whether the compiler elides the copy.
  Handler(const Handler& orig_)
    : _func(orig_._func), _userData(orig_._userData)
#ifdef AMPS_USE_FUNCTIONAL
    , _callable(orig_._callable)
#endif
    , _isValid(orig_._isValid)
  {
  }
#ifdef AMPS_USE_FUNCTIONAL
  /// Constructor for use with a standard C++ library function object.
  /// \param callback_ Any object convertible to std::function<void(Object)>.
  template <typename T>
  Handler(const T& callback_)
    : _func(NULL), _userData(NULL), _callable(callback_), _isValid(true)
  {
  }
#endif

  /// Calls the wrapped callback with message.
  /// The function pointer is used when it is non-null; otherwise, when
  /// AMPS_USE_FUNCTIONAL is defined, the stored callable is invoked.
  void invoke(Object message)
  {
    if (_func)
    {
      _func(message, _userData);
    }
#ifdef AMPS_USE_FUNCTIONAL
    else
    {
      _callable(message);
    }
#endif
  }

  /// Copy assignment; copies the callback, user data and validity flag.
  Handler& operator=(const Handler& rhs_)
  {
    if (this != &rhs_)
    {
      _func = rhs_._func;
      _userData = rhs_._userData;
#ifdef AMPS_USE_FUNCTIONAL
      _callable = rhs_._callable;
#endif
      _isValid = rhs_._isValid;
    }
    return *this;
  }

  /// Returns true if this handler was constructed around a callback.
  bool isValid(void) const
  {
    return _isValid;
  }
  /// Returns the wrapped function pointer (NULL if none).
  Func function(void) const
  {
    return _func;
  }
  /// Returns the user-data pointer supplied at construction (NULL if none).
  void* userData(void) const
  {
    return _userData;
  }
};
139  class Message;
142  typedef void(*MessageHandlerFunc)(const Message&, void* userData);
143 
145 
150  {
151  private:
152  MessageHandler _emptyMessageHandler;
153  typedef amps_uint64_t (*CRCFunction)(const char*, size_t, amps_uint64_t);
154  // Function used to calculate the CRC if one is used
155  CRCFunction _crc;
156  class MessageRoute
157  {
158  MessageHandler _messageHandler;
159  unsigned _requestedAcks, _systemAcks, _terminationAck;
160  public:
161  MessageRoute() : _requestedAcks(0), _systemAcks(0), _terminationAck(0) {;}
162  MessageRoute(const MessageRoute& rhs_) :
163  _messageHandler(rhs_._messageHandler),
164  _requestedAcks (rhs_._requestedAcks),
165  _systemAcks (rhs_._systemAcks),
166  _terminationAck(rhs_._terminationAck)
167  {;}
168  const MessageRoute& operator=(const MessageRoute& rhs_)
169  {
170  _messageHandler = rhs_._messageHandler;
171  _requestedAcks = rhs_._requestedAcks;
172  _systemAcks = rhs_._systemAcks;
173  _terminationAck = rhs_._terminationAck;
174  return *this;
175  }
176  MessageRoute(MessageHandler messageHandler_, unsigned requestedAcks_,
177  unsigned systemAcks_, Message::Command::Type commandType_) :
178  _messageHandler(messageHandler_),
179  _requestedAcks(requestedAcks_),
180  _systemAcks(systemAcks_),
181  _terminationAck(0)
182  {
183  bool isSubscribeOrSOW = commandType_ & Message::Command::Subscribe
184  || commandType_ & Message::Command::DeltaSubscribe
185  || commandType_ & Message::Command::SOWAndSubscribe
186  || commandType_ & Message::Command::SOWAndDeltaSubscribe
187  || commandType_ & Message::Command::SOW;
188  if (!isSubscribeOrSOW)
189  {
190  // The ack to terminate the route on is whatever the highest
191  // bit set in requestedAcks is.
192  unsigned bitCounter = (requestedAcks_ | systemAcks_) >> 1;
193  _terminationAck = 1;
194  while (bitCounter > 0)
195  {
196  bitCounter >>= 1;
197  _terminationAck <<= 1;
198  }
199  }
200  else if (commandType_ == Message::Command::SOW)
201  {
202  if (requestedAcks_ >= Message::AckType::Completed)
203  {
204  // The ack to terminate the route on is whatever the highest
205  // bit set in requestedAcks is.
206  unsigned bitCounter = (requestedAcks_ | systemAcks_) >> 1;
207  _terminationAck = 1;
208  while (bitCounter > 0)
209  {
210  bitCounter >>= 1;
211  _terminationAck <<= 1;
212  }
213  }
214  else
215  {
216  _terminationAck = Message::AckType::Completed;
217  }
218  }
219  }
220 
221  // Deliver an ack to registered handler if the ack type was requested
222  unsigned deliverAck(const Message& message_, unsigned ackType_)
223  {
224  if ( (_requestedAcks & ackType_) == 0)
225  {
226  return 0;
227  }
228  try
229  {
230  _messageHandler.invoke(message_);
231  }
232  catch (std::exception& ex)
233  {
234  std::cerr << ex.what() << std::endl;
235  }
236  return 1;
237  }
238  bool isTerminationAck(unsigned ackType_) const
239  {
240  return ackType_ == _terminationAck;
241  }
242  unsigned deliverData(const Message& message_)
243  {
244  _messageHandler.invoke(message_);
245  return 1;
246  }
247  const MessageHandler& getMessageHandler() const
248  {
249  return _messageHandler;
250  }
251  MessageHandler& getMessageHandler()
252  {
253  return _messageHandler;
254  }
255  };
256 
257  public:
258  MessageRouter()
259  : _previousCommandId(0),
260  _lookupGenerationCount(0),
261  _generationCount(0)
262  {
263 #ifndef AMPS_SSE_42
264  _crc = AMPS::CRC<0>::crcNoSSE;
265 #else
266  if (AMPS::CRC<0>::isSSE42Enabled())
267  {
268  _crc = AMPS::CRC<0>::crc;
269  }
270  else
271  {
272  _crc = AMPS::CRC<0>::crcNoSSE;
273  }
274 #endif
275  }
276 
277  int addRoute(const Field& commandId_, const AMPS::MessageHandler& messageHandler_,
278  unsigned requestedAcks_, unsigned systemAcks_, Message::Command::Type commandType_)
279  {
280  Lock<Mutex> lock(_lock);
281  RouteMap::iterator i = _routes.find(commandId_);
282  if (i == _routes.end())
283  {
284  _routes[commandId_.deepCopy()] = MessageRoute(messageHandler_, requestedAcks_, systemAcks_, commandType_);
285  return 1;
286  }
287  else
288  {
289  bool isSubscribe = commandType_ & Message::Command::Subscribe
290  || commandType_ & Message::Command::DeltaSubscribe
291  || commandType_ & Message::Command::SOWAndSubscribe
292  || commandType_ & Message::Command::SOWAndDeltaSubscribe;
293 
294  // Only replace a non-subscribe with a subscribe
295  if (isSubscribe
296  && !i->second.isTerminationAck(0))
297  {
298  void* routeData = i->second.getMessageHandler().userData();;
299  i->second = MessageRoute(messageHandler_, requestedAcks_, systemAcks_, commandType_);
300  if (routeData)
301  {
302  Unlock<Mutex> u(_lock);
303  amps_invoke_remove_route_function(routeData);
304  }
305  return 1;
306  }
307  }
308  return 0;
309  }
310 
311  // returns true if a route was removed.
312  bool removeRoute(const Field& commandId_)
313  {
314  Lock<Mutex> lock(_lock);
315  RouteMap::iterator i = _routes.find(commandId_);
316  if (i == _routes.end())
317  {
318  return false;
319  }
320  return _removeRoute(i);
321  }
322 
323  void clear()
324  {
325  AMPS_FETCH_ADD(&_generationCount, 1);
326  std::vector<void*> removeData;
327  {
328  Lock<Mutex> lock(_lock);
329  for (RouteMap::iterator i = _routes.begin(); i != _routes.end(); ++i)
330  {
331  // Make a non-const copy of Field and clear it, which will clear i
332  // as well but won't actually affect the map, which is unaware that
333  // the key's shared pointer has been deleted.
334  Field f = i->first;
335  void* data = i->second.getMessageHandler().userData();
336  removeData.push_back(data);
337  f.clear();
338  }
339  _routes.clear();
340  }
341  for (size_t i = 0; i < removeData.size(); ++i)
342  {
343  amps_invoke_remove_route_function(removeData[i]);
344  }
345  }
346 
347  // Returns true if a route exists for a single id.
348  bool hasRoute(const Field& commandId_) const
349  {
350  Lock<Mutex> lock(_lock);
351  RouteMap::const_iterator it = _routes.find(commandId_);
352  return it != _routes.end();
353  }
354 
355  // Find a single route and return true if here, setting result_ to the handler.
356  bool getRoute(const Field& commandId_, MessageHandler& result_) const
357  {
358  Lock<Mutex> lock(_lock);
359  RouteMap::const_iterator it = _routes.find(commandId_);
360  if (it != _routes.end())
361  {
362  result_ = it->second.getMessageHandler();
363  return true;
364  }
365  else
366  {
367  result_ = _emptyMessageHandler;
368  return false;
369  }
370  }
371 
372  // RouteLookup is one parsed id from a command id list; RouteCache (below)
373  // is the result type for parseRoutes(). We do extra work to avoid hitting
374  // the map or its lock when the id list on messages does not change.
375  struct RouteLookup
376  {
377  size_t idOffset; // offset of this id from the start of the id list
378  size_t idLength; // length of this id, excluding any ',' delimiter
379  MessageHandler handler; // handler found for the id, or the empty handler
380  };
381  class RouteCache : public std::vector<RouteLookup>
382  {
383  RouteCache(const RouteCache&);
384  void operator=(const RouteCache&);
385  public:
386  RouteCache(void)
387  : _generationCount(0),
388  _hashVal(0)
389  {;}
390 
391  void invalidateCache(void)
392  {
393 #if __cplusplus >= 201100L || _MSC_VER >= 1900
394  _generationCount.store(0);
395 #else
396  _generationCount = 0;
397 #endif
398  _hashVal = 0;
399  clear();
400  }
401 #if __cplusplus >= 201100L || _MSC_VER >= 1900
402  void invalidateCache(const std::atomic<uint_fast64_t>& generationCount_, amps_uint64_t hashVal_)
403  {
404  _generationCount.store(generationCount_);
405  _hashVal = hashVal_;
406  clear();
407  }
408 #else
409  void invalidateCache(const AMPS_ATOMIC_TYPE& generationCount_, amps_uint64_t hashVal_)
410  {
411  _generationCount = generationCount_;
412  _hashVal = hashVal_;
413  clear();
414  }
415 #endif
416 
417 #if __cplusplus >= 201100L || _MSC_VER >= 1900
418  bool isCacheHit(const std::atomic<uint_fast64_t>& generationCount_, amps_uint64_t hashVal_) const
419  {
420  return _generationCount == generationCount_ && _hashVal == hashVal_;
421  }
422 #else
423  bool isCacheHit(const AMPS_ATOMIC_TYPE& generationCount_, amps_uint64_t hashVal_) const
424  {
425  return _generationCount == generationCount_ && _hashVal == hashVal_;
426  }
427 #endif
428 
429  private:
430 #if __cplusplus >= 201100L || _MSC_VER >= 1900
431  std::atomic<uint_fast64_t> _generationCount;
432 #else
433  AMPS_ATOMIC_TYPE _generationCount;
434 #endif
435  amps_uint64_t _hashVal;
436  };
437 
438  // Parses the command id list into the route lookup vector and assigns
439  // the found handlers into the list. Only intended to be called by the
440  // message handler thread. Returns the number of command/sub IDs parsed.
441  size_t parseRoutes(const Field& commandIdList_, RouteCache& result_)
442  {
443  // Super shortcut: if the whole subID list is the same as the previous one,
444  // then assume the result_ contains all the right handlers already, and that
445  // the offsets and lengths of subIds are unchanged.
446  amps_uint64_t listHash = _crc(commandIdList_.data(), commandIdList_.len(), 0);
447  if (result_.isCacheHit(_generationCount, listHash))
448  {
449  return result_.size();
450  }
451  result_.invalidateCache(_generationCount, listHash);
452 
453  // Lock required now that we'll be using the route map.
454  Lock<Mutex> lockGuard(_lock);
455  size_t resultCount = 0;
456  const char* pStart = commandIdList_.data();
457  for (const char* p = pStart, *e = commandIdList_.len() + pStart; p < e;
458  ++p, ++resultCount)
459  {
460  const char* delimiter = p;
461  while (delimiter != e && *delimiter != ',')
462  {
463  ++delimiter;
464  }
465  AMPS::Field subId(p, (size_t)(delimiter - p));
466 #ifdef AMPS_USE_EMPLACE
467  result_.emplace_back(RouteLookup());
468 #else
469  result_.push_back(RouteLookup());
470 #endif
471  // Push back and then copy over fields; would emplace_back if available on
472  // all supported compilers.
473  RouteLookup& result = result_[resultCount];
474  result.idOffset = (size_t)(p - pStart);
475  result.idLength = (size_t)(delimiter - p);
476 
477  RouteMap::const_iterator it = _routes.find(subId);
478  if (it != _routes.end())
479  {
480  result.handler = it->second.getMessageHandler();
481  }
482  else
483  {
484  result.handler = _emptyMessageHandler;
485  }
486  p = delimiter;
487  }
488  return resultCount;
489  }
490  unsigned deliverAck(const Message& ackMessage_, unsigned ackType_)
491  {
492  assert(ackMessage_.getCommand() == "ack");
493  unsigned messagesDelivered = 0;
494  Field key;
495 
496  // Call _deliverAck, which will deliver to any waiting handlers
497  // AND remove the route if it's a termination ack
498  if (key = ackMessage_.getCommandId(), !key.empty())
499  {
500  messagesDelivered += _deliverAck(ackMessage_, ackType_, key);
501  }
502  if (key = ackMessage_.getQueryID(),
503  !key.empty() && messagesDelivered == 0)
504  {
505  messagesDelivered += _deliverAck(ackMessage_, ackType_, key);
506  }
507  if (key = ackMessage_.getSubscriptionId(),
508  !key.empty() && messagesDelivered == 0)
509  {
510  messagesDelivered += _deliverAck(ackMessage_, ackType_, key);
511  }
512  return messagesDelivered;
513  }
514 
515  // deliverData may only be called by the message handler thread.
516  unsigned deliverData(const Message& dataMessage_, const Field& commandId_)
517  {
518  unsigned messagesDelivered = 0;
519  amps_uint64_t hval = _crc(commandId_.data(), commandId_.len(), 0);
520  if (_previousCommandId == hval &&
521  _lookupGenerationCount == _generationCount)
522  {
523  messagesDelivered += _previousHandler.deliverData(dataMessage_);
524  }
525  else
526  {
527  Lock<Mutex> lock(_lock);
528  RouteMap::iterator it = _routes.find(commandId_);
529  if (it != _routes.end())
530  {
531  _previousCommandId = hval;
532 #if __cplusplus >= 201100L || _MSC_VER >= 1900
533  _lookupGenerationCount.store(_generationCount);
534 #else
535  _lookupGenerationCount = _generationCount;
536 #endif
537  _previousHandler = it->second;
538  messagesDelivered += it->second.deliverData(dataMessage_);
539  }
540  }
541  return messagesDelivered;
542  }
543 
544  void invalidateCache(void)
545  {
546  _previousCommandId = 0;
547  }
548 
549  void unsubscribeAll(void)
550  {
551  AMPS_FETCH_ADD(&_generationCount, 1);
552  std::vector<Field> removeIds;
553  std::vector<void*> removeData;
554  Lock<Mutex> lock(_lock);
555  for (RouteMap::iterator it = _routes.begin(); it != _routes.end(); ++it)
556  {
557  if (it->second.isTerminationAck(0))
558  {
559  removeIds.push_back(it->first);
560  removeData.push_back(it->second.getMessageHandler().userData());
561  }
562  }
563  for (size_t i = 0; i < removeIds.size(); ++i)
564  {
565  // it can't be end() b/c we have the lock and found id above
566  RouteMap::iterator it = _routes.find(removeIds[i]);
567  // Make a non-const copy of Field and clear it, which will clear i as well
568  Field f = it->first; // -V783
569  f.clear();
570  _routes.erase(it);
571  }
572  Unlock<Mutex> u(_lock);
573  for (size_t i = 0; i < removeData.size(); ++i)
574  {
575  amps_invoke_remove_route_function(removeData[i]);
576  }
577  }
578 
579  private:
580  typedef std::map<Field, MessageRoute> RouteMap;
581  RouteMap _routes;
582  mutable Mutex _lock;
583 
584  MessageRoute _previousHandler;
585  amps_uint64_t _previousCommandId;
586 #if __cplusplus >= 201100L || _MSC_VER >= 1900
587  mutable std::atomic<uint_fast64_t> _lookupGenerationCount;
588  mutable std::atomic<uint_fast64_t> _generationCount;
589 #else
590  mutable AMPS_ATOMIC_TYPE _lookupGenerationCount;
591  mutable AMPS_ATOMIC_TYPE _generationCount;
592 #endif
593 
594 
595  // Deliver the ack to any waiting handlers
596  // AND remove the route if it's a termination ack
597  unsigned _deliverAck(const Message& ackMessage_, unsigned ackType_, Field& commandId_)
598  {
599  Lock<Mutex> lock(_lock);
600  unsigned messagesDelivered = 0;
601  RouteMap::iterator it = _routes.find(commandId_);
602  if (it != _routes.end())
603  {
604  MessageRoute& route = it->second;
605  messagesDelivered += route.deliverAck(ackMessage_, ackType_);
606  if (route.isTerminationAck(ackType_))
607  {
608  _removeRoute(it);
609  ++messagesDelivered;
610  }
611  }
612  return messagesDelivered;
613  }
614  unsigned _processAckForRemoval(unsigned ackType_, Field& commandId_)
615  {
616  Lock<Mutex> lock(_lock);
617  RouteMap::iterator it = _routes.find(commandId_);
618  if (it != _routes.end())
619  {
620  MessageRoute& route = it->second;
621  if (route.isTerminationAck(ackType_))
622  {
623  _removeRoute(it);
624  return 1U;
625  }
626  }
627  return 0U;
628  }
629 
630  // returns true if a route was removed.
631  bool _removeRoute(RouteMap::iterator& it_)
632  {
633  // Called with lock already held
634  AMPS_FETCH_ADD(&_generationCount, 1);
635  // Make a non-const copy of Field and clear it, which will clear i as well
636  Field f = it_->first;
637  void* routeData = it_->second.getMessageHandler().userData();
638  _routes.erase(it_);
639  f.clear();
640  if (routeData)
641  {
642  Unlock<Mutex> u(_lock);
643  amps_invoke_remove_route_function(routeData);
644  }
645  return true;
646  }
647 
648  };
649 
650 
651 }
652 
653 #endif
Defines the AMPS::Message class and related classes.
Field getSubscriptionId() const
Retrieves the value of the SubscriptionId header of the Message as a new Field.
Definition: Message.hpp:1427
Message encapsulates a single message sent to or received from an AMPS server, and provides methods f...
Definition: Message.hpp:531
void clear()
Deletes the data associated with this Field, should only be used on Fields that were created as deepC...
Definition: Field.hpp:247
const char * data() const
Returns the (non-null-terminated) data underlying this field.
Definition: Field.hpp:260
Handler(const T &callback_)
Constructor for use with a standard c++ library function object.
Definition: MessageRouter.hpp:93
Field getCommand() const
Retrieves the value of the Command header of the Message as a new Field.
Definition: Message.hpp:1195
Field getCommandId() const
Retrieves the value of the CommandId header of the Message as a new Field.
Definition: Message.hpp:1302
bool empty() const
Returns 'true' if empty, 'false' otherwise.
Definition: Field.hpp:128
size_t len() const
Returns the length of the data underlying this field.
Definition: Field.hpp:267
This class multiplexes messages from AMPS to multiple subscribers and uses the stream of acks from AM...
Definition: MessageRouter.hpp:149
Field represents the value of a single field in a Message.
Definition: Field.hpp:86
Handler(Func func_, void *userData_)
Constructor for use with a bare function pointer.
Definition: MessageRouter.hpp:70
Wrapper for callback functions in AMPS.
Definition: MessageRouter.hpp:39
An iterable object representing the results of an AMPS subscription and/or query. ...
Definition: ampsplusplus.hpp:4996
Handler()
Null constructor – no function is wrapped.
Definition: MessageRouter.hpp:57
Field getQueryID() const
Retrieves the value of the QueryID header of the Message as a new Field.
Definition: Message.hpp:1417
void deepCopy(const Field &orig_)
Makes self a deep copy of the original field.
Definition: Field.hpp:218
Handler(const Handler &orig_)
Copy constructor.
Definition: MessageRouter.hpp:81
Definition: ampsplusplus.hpp:103