AMPS C/C++ Client Class Reference
AMPS C/C++ Client Version 5.3.5.1
HAClientImpl.hpp
1 //
3 // Copyright (c) 2010-2025 60East Technologies Inc., All Rights Reserved.
4 //
5 // This computer software is owned by 60East Technologies Inc. and is
6 // protected by U.S. copyright laws and other laws and by international
7 // treaties. This computer software is furnished by 60East Technologies
8 // Inc. pursuant to a written license agreement and may be used, copied,
9 // transmitted, and stored only in accordance with the terms of such
10 // license agreement and with the inclusion of the above copyright notice.
11 // This computer software or any other copies thereof may not be provided
12 // or otherwise made available to any other person.
13 //
14 // U.S. Government Restricted Rights. This computer software: (a) was
15 // developed at private expense and is in all respects the proprietary
16 // information of 60East Technologies Inc.; (b) was not developed with
17 // government funds; (c) is a trade secret of 60East Technologies Inc.
18 // for all purposes of the Freedom of Information Act; and (d) is a
19 // commercial item and thus, pursuant to Section 12.212 of the Federal
20 // Acquisition Regulations (FAR) and DFAR Supplement Section 227.7202,
21 // Government's use, duplication or disclosure of the computer software
22 // is subject to the restrictions set forth by 60East Technologies Inc..
23 //
25 
26 #ifndef _HACLIENTIMPL_H_
27 #define _HACLIENTIMPL_H_
28 
29 #include <typeinfo>
30 #include <amps/ampsplusplus.hpp>
31 #include <amps/ServerChooser.hpp>
34 #if __cplusplus >= 201103L || _MSC_VER >= 1900
35  #include <atomic>
36 #endif
37 
38 namespace AMPS
39 {
40 
41  class HAClientImpl : public ClientImpl
42  {
43  public:
44  HAClientImpl(const std::string& name_)
45  : ClientImpl(name_), _timeout(AMPS_HACLIENT_TIMEOUT_DEFAULT)
46  , _reconnectDelay(AMPS_HACLIENT_RECONNECT_DEFAULT)
47  , _reconnectDelayStrategy(new ExponentialDelayStrategy(_reconnectDelay))
48  , _disconnected(false)
49  {
50 #ifdef AMPS_USE_FUNCTIONAL
51  setDisconnectHandler(HADisconnectHandler());
52 #else
53  setDisconnectHandler(AMPS::DisconnectHandler(&HADisconnectHandler::invoke, NULL));
54 #endif
55  setSubscriptionManager(new MemorySubscriptionManager());
56  }
57 
58  ~HAClientImpl()
59  {
60  _disconnected = true;
61  _cleanup();
62  }
63 
64  void setTimeout(int timeout_)
65  {
66  _timeout = timeout_;
67  }
68 
69  int getTimeout() const
70  {
71  return _timeout;
72  }
73 
74  unsigned int getReconnectDelay(void) const
75  {
76  return _reconnectDelay;
77  }
78 
79  void setReconnectDelay(unsigned int reconnectDelay_)
80  {
81  _reconnectDelay = reconnectDelay_;
82  setReconnectDelayStrategy(new FixedDelayStrategy(
83  (unsigned int)reconnectDelay_));
84  }
85 
86  void setReconnectDelayStrategy(const ReconnectDelayStrategy& strategy_)
87  {
88  _reconnectDelayStrategy = strategy_;
89  _reconnectDelay = 0;
90  }
91 
92  ReconnectDelayStrategy getReconnectDelayStrategy(void) const
93  {
94  return _reconnectDelayStrategy;
95  }
96 
97  std::string getLogonOptions(void) const
98  {
99  return _logonOptions;
100  }
101 
102  void setLogonOptions(const std::string& logonOptions_)
103  {
104  _logonOptions = logonOptions_;
105  }
106 
107  void setLogonOptions(const char* logonOptions_)
108  {
109  _logonOptions = logonOptions_;
110  }
111 
112  ServerChooser getServerChooser() const
113  {
114  return _serverChooser;
115  }
116 
117  void setServerChooser(const ServerChooser& serverChooser_)
118  {
119  _serverChooser = serverChooser_;
120  }
121 
122  class HADisconnectHandler
123  {
124  public:
125  HADisconnectHandler() {}
126  static void invoke(Client& client, void* );
127 #ifdef AMPS_USE_FUNCTIONAL
128  void operator()(Client& client)
129  {
130  invoke(client, NULL);
131  }
132 #endif
133  };
134  void connectAndLogon()
135  {
136  Lock<Mutex> l(_connectAndLogonLock);
137  // AC-1030 In case this is called on a client after delay strategy caused a failure.
138  // Reset delay strategy, then get the duration default and reset again
139  _reconnectDelayStrategy.reset();
140  _reconnectDelay = _reconnectDelayStrategy.getConnectWaitDuration("DUMMY_URI");
141  _reconnectDelayStrategy.reset();
142  try
143  {
144  while (true)
145  {
146  _disconnected = false;
147  connectAndLogonInternal();
148  try
149  {
150  // Resubscribe
151  if (_subscriptionManager)
152  {
153  Client c(this);
154  _subscriptionManager->resubscribe(c);
155  broadcastConnectionStateChanged(
156  ConnectionStateListener::Resubscribed);
157  }
158  return;
159  }
160  catch (const AMPSException& subEx)
161  {
162  // Keep receive thread from reconnecting
163  _disconnected = true;
164  _serverChooser.reportFailure(subEx, getConnectionInfo());
165  ClientImpl::setDisconnected();
166  }
167  }
168  }
169  catch (const RetryOperationException&)
170  {
171  // pass - recv thread was started and is handling reconnect
172  }
173  catch (...)
174  {
175  // Failure, make sure we're disconnected
176  disconnect();
177  throw;
178  }
179  }
180 
181  virtual void connect(const std::string& /*uri*/)
182  {
183  connectAndLogon();
184  }
185 
186  virtual std::string logon(long /*timeout_*/, Authenticator& /*authenticator_*/,
187  const char* /*options_*/)
188  {
189  if (_disconnected)
190  {
191  throw DisconnectedException("Attempt to call logon on a disconnected HAClient. Use connectAndLogon() instead.");
192  }
193  throw AlreadyConnectedException("Attempt to call logon on an HAClient. Use connectAndLogon() instead.");
194  }
195 
196  class DisconnectHandlerDisabler
197  {
198  public:
199  DisconnectHandlerDisabler()
200  : _pClient(NULL), _queueAckTimeout(0), _disconnect(false) { }
201  DisconnectHandlerDisabler(HAClientImpl* pClient_)
202  : _pClient(pClient_)
203  , _queueAckTimeout(0)
204  , _disconnect(false)
205  {
206  setHandler();
207  _queueAckTimeout = _pClient->getAckTimeout();
208  _pClient->setAckTimeout(0);
209  }
210  ~DisconnectHandlerDisabler()
211  {
212  _clear();
213  }
214  void clear()
215  {
216  _clear();
217  if (_disconnect)
218  {
219  _disconnect = false;
220  throw DisconnectedException("Client disconnected during logon.");
221  }
222  }
223  void _clear()
224  {
225  if (_pClient)
226  {
228  _pClient->getHandle(),
229  (amps_handler)ClientImpl::ClientImplDisconnectHandler,
230  _pClient);
231  if (_queueAckTimeout)
232  {
233  _pClient->setAckTimeout(_queueAckTimeout);
234  _queueAckTimeout = 0;
235  }
236  _pClient = NULL;
237  }
238  }
239  void setClient(HAClientImpl* pClient_)
240  {
241  if (!_pClient)
242  {
243  _pClient = pClient_;
244  setHandler();
245  _queueAckTimeout = _pClient->getAckTimeout();
246  _pClient->setAckTimeout(0);
247  amps_client_disconnect(_pClient->getHandle());
248  _disconnect = false;
249  }
250  }
251  void setHandler()
252  {
253  _disconnect = false;
255  _pClient->getHandle(),
256  (amps_handler)HAClientImpl::DisconnectHandlerDisabler::HADoNothingDisconnectHandler,
257  (void*)&_disconnect);
258  }
259  static void HADoNothingDisconnectHandler(amps_handle /*client*/,
260  void* pDisconnect_)
261  {
262  *(bool*)pDisconnect_ = true;
263  }
264 
265  private:
266  HAClientImpl* _pClient;
267  int _queueAckTimeout;
268  bool _disconnect;
269  };
270 
271  void connectAndLogonInternal()
272  {
273  if (!_serverChooser.isValid())
274  {
275  throw ConnectionException("No server chooser registered with HAClient");
276  }
277  {
278  DisconnectHandlerDisabler disconnectDisabler;
279  TryLock<Mutex> l(_connectLock);
280  if (!l.isLocked())
281  {
282  throw RetryOperationException("Retry, another thread is handling reconnnect");
283  }
284  while (!_disconnected)
285  {
286  std::string uri = _serverChooser.getCurrentURI();
287  if (uri.empty())
288  {
289  throw ConnectionException("No AMPS instances available for connection. " + _serverChooser.getError());
290  }
291  Authenticator& auth = _serverChooser.getCurrentAuthenticator();
292  _sleepBeforeConnecting(uri);
293  try
294  {
295  // Check if another thread disconnected or already connected
296  if (_disconnected || _connected)
297  {
298  return;
299  }
300  // Temporarily unset the disconnect handler since we will loop
301  disconnectDisabler.setClient((HAClientImpl*)this);
302  // Connect and logon while holding the _lock
303  {
304  Lock<Mutex> clientLock(_lock);
305  ClientImpl::_connect(uri);
306  try
307  {
308  if (_logonOptions.empty())
309  {
310  ClientImpl::_logon(_timeout, auth);
311  }
312  else
313  {
314  ClientImpl::_logon(_timeout, auth, _logonOptions.c_str());
315  }
316  } // All of the following may not disconnect the client
317  catch (const AuthenticationException&)
318  {
319  ClientImpl::setDisconnected();
320  throw;
321  }
322  catch (const NotEntitledException&)
323  {
324  ClientImpl::setDisconnected();
325  throw;
326  }
327  catch (const DuplicateLogonException&)
328  {
329  ClientImpl::setDisconnected();
330  throw;
331  }
332  catch (const NameInUseException&)
333  {
334  ClientImpl::setDisconnected();
335  throw;
336  }
337  catch (const TimedOutException&)
338  {
339  ClientImpl::setDisconnected();
340  throw;
341  }
342  }
343  try
344  {
345  _serverChooser.reportSuccess(getConnectionInfo());
346  // We're clear, reset delay strategy, then get the duration default and reset again
347  _reconnectDelayStrategy.reset();
348  _reconnectDelay = _reconnectDelayStrategy.getConnectWaitDuration("DUMMY_URI");
349  _reconnectDelayStrategy.reset();
350  }
351  catch (const AMPSException&)
352  {
353  ClientImpl::disconnect();
354  throw;
355  }
356  disconnectDisabler.clear();
357  break;
358  }
359  catch (const AMPSException& ex)
360  {
361  ConnectionInfo ci = getConnectionInfo();
362  // Substitute the URI on the connection info with the one we attempted
363  ci["client.uri"] = uri;
364  _serverChooser.reportFailure(ex, ci);
365  try
366  {
367  ClientImpl::setDisconnected();
368  }
369  catch (const std::exception& e)
370  {
371  try
372  {
373  _exceptionListener->exceptionThrown(e);
374  }
375  catch (...) { } // -V565
376  }
377  catch (...)
378  {
379  try
380  {
381  _exceptionListener->exceptionThrown(UnknownException("Unknown exception calling setDisconnected"));
382  }
383  catch (...) { } // -V565
384  }
385  }
386  }
387  }
388  return;
389  }
390 
391  ConnectionInfo gatherConnectionInfo() const
392  {
393  return getConnectionInfo();
394  }
395 
396  ConnectionInfo getConnectionInfo() const
397  {
398  ConnectionInfo info = ClientImpl::getConnectionInfo();
399  std::ostringstream writer;
400 
401  writer << getReconnectDelay();
402  info["haClient.reconnectDelay"] = writer.str();
403  writer.clear(); writer.str("");
404  writer << _timeout;
405  info["haClient.timeout"] = writer.str();
406 
407  return info;
408  }
409 
410  bool disconnected() const
411  {
412  return _disconnected;
413  }
414  private:
415 
416  void disconnect()
417  {
418  _disconnected = true;
419  // Grabbing this lock ensures no other thread is trying to reconnect
420  Lock<Mutex> l(_connectLock);
421  ClientImpl::disconnect();
422  }
423  void _millisleep(unsigned int millis_)
424  {
425  if (millis_ == 0)
426  {
427  return;
428  }
429  double waitTime = (double)millis_;
430  Timer timer(waitTime);
431  timer.start();
432  while (!timer.checkAndGetRemaining(&waitTime))
433  {
434  if (waitTime - 1000.0 > 0.0)
435  {
436  AMPS_USLEEP(1000000);
437  }
438  else
439  {
440  AMPS_USLEEP(1000UL * (unsigned int)waitTime);
441  }
442  amps_invoke_waiting_function();
443  }
444  }
445  void _sleepBeforeConnecting(const std::string& uri_)
446  {
447  try
448  {
449  _reconnectDelay = _reconnectDelayStrategy.getConnectWaitDuration(uri_);
450  _millisleep(_reconnectDelay);
451  }
452  catch (const ConnectionException&)
453  {
454  throw;
455  }
456  catch (const std::exception& ex_)
457  {
458  _exceptionListener->exceptionThrown(ex_);
459  throw ConnectionException(ex_.what());
460  }
461  catch (...)
462  {
463  throw ConnectionException("Unknown exception thrown by "
464  "the HAClient's delay strategy.");
465  }
466  }
467 
468  Mutex _connectLock;
469  Mutex _connectAndLogonLock;
470  int _timeout;
471  unsigned int _reconnectDelay;
472  ReconnectDelayStrategy _reconnectDelayStrategy;
473  ServerChooser _serverChooser;
474 #if __cplusplus >= 201103L || _MSC_VER >= 1900
475  std::atomic<bool> _disconnected;
476 #else
477  volatile bool _disconnected;
478 #endif
479  std::string _logonOptions;
480 
481  }; // class HAClientImpl
482 
483 }// namespace AMPS
484 
485 #endif //_HACLIENTIMPL_H_
486 
AMPSDLL void amps_client_set_disconnect_handler(amps_handle client, amps_handler disconnectHandler, void *userData)
Sets the disconnect handler function to be called when a disconnect occurs.
AMPSDLL void amps_client_disconnect(amps_handle handle)
Disconnects from the AMPS server, if connected.
Provides AMPS::MemorySubscriptionManager, used by an AMPS::HAClient to resubmit subscriptions if conn...
void * amps_handle
Opaque handle type used to refer to objects in the AMPS api.
Definition: amps.h:211
Core type, function, and class declarations for the AMPS C++ client.
Provides AMPS::ReconnectDelayStrategy, called by an AMPS::HAClient to determine how long to wait betw...
Provides AMPS::ServerChooser, the abstract base class that defines the interface that an AMPS::HAClie...
Definition: ampsplusplus.hpp:103