AMPS C/C++ Client Class Reference
AMPS C/C++ Client Version 5.3.4.5
HAClientImpl.hpp
1 //
3 // Copyright (c) 2010-2025 60East Technologies Inc., All Rights Reserved.
4 //
5 // This computer software is owned by 60East Technologies Inc. and is
6 // protected by U.S. copyright laws and other laws and by international
7 // treaties. This computer software is furnished by 60East Technologies
8 // Inc. pursuant to a written license agreement and may be used, copied,
9 // transmitted, and stored only in accordance with the terms of such
10 // license agreement and with the inclusion of the above copyright notice.
11 // This computer software or any other copies thereof may not be provided
12 // or otherwise made available to any other person.
13 //
14 // U.S. Government Restricted Rights. This computer software: (a) was
15 // developed at private expense and is in all respects the proprietary
16 // information of 60East Technologies Inc.; (b) was not developed with
17 // government funds; (c) is a trade secret of 60East Technologies Inc.
18 // for all purposes of the Freedom of Information Act; and (d) is a
19 // commercial item and thus, pursuant to Section 12.212 of the Federal
20 // Acquisition Regulations (FAR) and DFAR Supplement Section 227.7202,
21 // Government's use, duplication or disclosure of the computer software
22 // is subject to the restrictions set forth by 60East Technologies Inc..
23 //
25 
26 #ifndef _HACLIENTIMPL_H_
27 #define _HACLIENTIMPL_H_
28 
29 #include <typeinfo>
30 #include <amps/ampsplusplus.hpp>
31 #include <amps/ServerChooser.hpp>
32 #include <amps/MemorySubscriptionManager.hpp>
33 #include <amps/ReconnectDelayStrategy.hpp>
34 #if __cplusplus >= 201103L || _MSC_VER >= 1900
35  #include <atomic>
36 #endif
37 
38 namespace AMPS
39 {
40 
41  class HAClientImpl : public ClientImpl
42  {
43  public:
44  HAClientImpl(const std::string& name_)
45  : ClientImpl(name_), _timeout(AMPS_HACLIENT_TIMEOUT_DEFAULT)
46  , _reconnectDelay(AMPS_HACLIENT_RECONNECT_DEFAULT)
47  , _reconnectDelayStrategy(new ExponentialDelayStrategy(_reconnectDelay))
48  , _disconnected(false)
49  {
50 #ifdef AMPS_USE_FUNCTIONAL
51  setDisconnectHandler(HADisconnectHandler());
52 #else
53  setDisconnectHandler(AMPS::DisconnectHandler(&HADisconnectHandler::invoke, NULL));
54 #endif
55  setSubscriptionManager(new MemorySubscriptionManager());
56  }
57 
58  ~HAClientImpl()
59  {
60  _disconnected = true;
61  _cleanup();
62  }
63 
64  void setTimeout(int timeout_)
65  {
66  _timeout = timeout_;
67  }
68 
69  int getTimeout() const
70  {
71  return _timeout;
72  }
73 
74  unsigned int getReconnectDelay(void) const
75  {
76  return _reconnectDelay;
77  }
78 
79  void setReconnectDelay(unsigned int reconnectDelay_)
80  {
81  _reconnectDelay = reconnectDelay_;
82  setReconnectDelayStrategy(new FixedDelayStrategy(
83  (unsigned int)reconnectDelay_));
84  }
85 
86  void setReconnectDelayStrategy(const ReconnectDelayStrategy& strategy_)
87  {
88  _reconnectDelayStrategy = strategy_;
89  }
90 
91  ReconnectDelayStrategy getReconnectDelayStrategy(void) const
92  {
93  return _reconnectDelayStrategy;
94  }
95 
96  std::string getLogonOptions(void) const
97  {
98  return _logonOptions;
99  }
100 
101  void setLogonOptions(const std::string& logonOptions_)
102  {
103  _logonOptions = logonOptions_;
104  }
105 
106  void setLogonOptions(const char* logonOptions_)
107  {
108  _logonOptions = logonOptions_;
109  }
110 
111  ServerChooser getServerChooser() const
112  {
113  return _serverChooser;
114  }
115 
116  void setServerChooser(const ServerChooser& serverChooser_)
117  {
118  _serverChooser = serverChooser_;
119  }
120 
121  class HADisconnectHandler
122  {
123  public:
124  HADisconnectHandler() {}
125  static void invoke(Client& client, void* );
126 #ifdef AMPS_USE_FUNCTIONAL
127  void operator()(Client& client)
128  {
129  invoke(client, NULL);
130  }
131 #endif
132  };
133  void connectAndLogon()
134  {
135  Lock<Mutex> l(_connectAndLogonLock);
136  // AC-1030 In case this is called on a client after delay strategy caused a failure.
137  _reconnectDelayStrategy.reset();
138  try
139  {
140  while (true)
141  {
142  _disconnected = false;
143  connectAndLogonInternal();
144  try
145  {
146  // Resubscribe
147  if (_subscriptionManager)
148  {
149  Client c(this);
150  _subscriptionManager->resubscribe(c);
151  broadcastConnectionStateChanged(
152  ConnectionStateListener::Resubscribed);
153  }
154  return;
155  }
156  catch (const AMPSException& subEx)
157  {
158  // Keep receive thread from reconnecting
159  _disconnected = true;
160  _serverChooser.reportFailure(subEx, getConnectionInfo());
161  ClientImpl::setDisconnected();
162  }
163  }
164  }
165  catch (const RetryOperationException&)
166  {
167  // pass - recv thread was started and is handling reconnect
168  }
169  catch (...)
170  {
171  // Failure, make sure we're disconnected
172  disconnect();
173  throw;
174  }
175  }
176 
177  virtual void connect(const std::string& /*uri*/)
178  {
179  connectAndLogon();
180  }
181 
182  virtual std::string logon(long /*timeout_*/, Authenticator& /*authenticator_*/,
183  const char* /*options_*/)
184  {
185  if (_disconnected)
186  {
187  throw DisconnectedException("Attempt to call logon on a disconnected HAClient. Use connectAndLogon() instead.");
188  }
189  throw AlreadyConnectedException("Attempt to call logon on an HAClient. Use connectAndLogon() instead.");
190  }
191 
192  class DisconnectHandlerDisabler
193  {
194  public:
195  DisconnectHandlerDisabler()
196  : _pClient(NULL), _queueAckTimeout(0), _disconnect(false) { }
197  DisconnectHandlerDisabler(HAClientImpl* pClient_)
198  : _pClient(pClient_)
199  , _queueAckTimeout(0)
200  , _disconnect(false)
201  {
202  setHandler();
203  _queueAckTimeout = _pClient->getAckTimeout();
204  _pClient->setAckTimeout(0);
205  }
206  ~DisconnectHandlerDisabler()
207  {
208  _clear();
209  }
210  void clear()
211  {
212  _clear();
213  if (_disconnect)
214  {
215  _disconnect = false;
216  throw DisconnectedException("Client disconnected during logon.");
217  }
218  }
219  void _clear()
220  {
221  if (_pClient)
222  {
224  _pClient->getHandle(),
225  (amps_handler)ClientImpl::ClientImplDisconnectHandler,
226  _pClient);
227  if (_queueAckTimeout)
228  {
229  _pClient->setAckTimeout(_queueAckTimeout);
230  _queueAckTimeout = 0;
231  }
232  _pClient = NULL;
233  }
234  }
235  void setClient(HAClientImpl* pClient_)
236  {
237  if (!_pClient)
238  {
239  _pClient = pClient_;
240  setHandler();
241  _queueAckTimeout = _pClient->getAckTimeout();
242  _pClient->setAckTimeout(0);
243  amps_client_disconnect(_pClient->getHandle());
244  _disconnect = false;
245  }
246  }
247  void setHandler()
248  {
249  _disconnect = false;
251  _pClient->getHandle(),
252  (amps_handler)HAClientImpl::DisconnectHandlerDisabler::HADoNothingDisconnectHandler,
253  (void*)&_disconnect);
254  }
255  static void HADoNothingDisconnectHandler(amps_handle /*client*/,
256  void* pDisconnect_)
257  {
258  *(bool*)pDisconnect_ = true;
259  }
260 
261  private:
262  HAClientImpl* _pClient;
263  int _queueAckTimeout;
264  bool _disconnect;
265  };
266 
267  void connectAndLogonInternal()
268  {
269  if (!_serverChooser.isValid())
270  {
271  throw ConnectionException("No server chooser registered with HAClient");
272  }
273  {
274  DisconnectHandlerDisabler disconnectDisabler;
275  TryLock<Mutex> l(_connectLock);
276  if (!l.isLocked())
277  {
278  throw RetryOperationException("Retry, another thread is handling reconnnect");
279  }
280  while (!_disconnected)
281  {
282  std::string uri = _serverChooser.getCurrentURI();
283  if (uri.empty())
284  {
285  throw ConnectionException("No AMPS instances available for connection. " + _serverChooser.getError());
286  }
287  Authenticator& auth = _serverChooser.getCurrentAuthenticator();
288  _sleepBeforeConnecting(uri);
289  try
290  {
291  // Check if another thread disconnected or already connected
292  if (_disconnected || _connected)
293  {
294  return;
295  }
296  // Temporarily unset the disconnect handler since we will loop
297  disconnectDisabler.setClient((HAClientImpl*)this);
298  // Connect and logon while holding the _lock
299  {
300  Lock<Mutex> clientLock(_lock);
301  ClientImpl::_connect(uri);
302  try
303  {
304  if (_logonOptions.empty())
305  {
306  ClientImpl::_logon(_timeout, auth);
307  }
308  else
309  {
310  ClientImpl::_logon(_timeout, auth, _logonOptions.c_str());
311  }
312  } // All of the following may not disconnect the client
313  catch (const AuthenticationException&)
314  {
315  ClientImpl::setDisconnected();
316  throw;
317  }
318  catch (const NotEntitledException&)
319  {
320  ClientImpl::setDisconnected();
321  throw;
322  }
323  catch (const DuplicateLogonException&)
324  {
325  ClientImpl::setDisconnected();
326  throw;
327  }
328  catch (const NameInUseException&)
329  {
330  ClientImpl::setDisconnected();
331  throw;
332  }
333  catch (const TimedOutException&)
334  {
335  ClientImpl::setDisconnected();
336  throw;
337  }
338  }
339  try
340  {
341  _serverChooser.reportSuccess(getConnectionInfo());
342  _reconnectDelayStrategy.reset();
343  }
344  catch (const AMPSException&)
345  {
346  ClientImpl::disconnect();
347  throw;
348  }
349  disconnectDisabler.clear();
350  break;
351  }
352  catch (const AMPSException& ex)
353  {
354  ConnectionInfo ci = getConnectionInfo();
355  // Substitute the URI on the connection info with the one we attempted
356  ci["client.uri"] = uri;
357  _serverChooser.reportFailure(ex, ci);
358  try
359  {
360  ClientImpl::setDisconnected();
361  }
362  catch (const std::exception& e)
363  {
364  try
365  {
366  _exceptionListener->exceptionThrown(e);
367  }
368  catch (...) { } // -V565
369  }
370  catch (...)
371  {
372  try
373  {
374  _exceptionListener->exceptionThrown(UnknownException("Unknown exception calling setDisconnected"));
375  }
376  catch (...) { } // -V565
377  }
378  }
379  }
380  }
381  return;
382  }
383 
384  ConnectionInfo gatherConnectionInfo() const
385  {
386  return getConnectionInfo();
387  }
388 
389  ConnectionInfo getConnectionInfo() const
390  {
391  ConnectionInfo info = ClientImpl::getConnectionInfo();
392  std::ostringstream writer;
393 
394  writer << getReconnectDelay();
395  info["haClient.reconnectDelay"] = writer.str();
396  writer.clear(); writer.str("");
397  writer << _timeout;
398  info["haClient.timeout"] = writer.str();
399 
400  return info;
401  }
402 
403  bool disconnected() const
404  {
405  return _disconnected;
406  }
407  private:
408 
409  void disconnect()
410  {
411  _disconnected = true;
412  // Grabbing this lock ensures no other thread is trying to reconnect
413  Lock<Mutex> l(_connectLock);
414  ClientImpl::disconnect();
415  }
416  void _millisleep(unsigned int millis_)
417  {
418  if (millis_ == 0)
419  {
420  return;
421  }
422  double waitTime = (double)millis_;
423  Timer timer(waitTime);
424  timer.start();
425  while (!timer.checkAndGetRemaining(&waitTime))
426  {
427  if (waitTime - 1000.0 > 0.0)
428  {
429  AMPS_USLEEP(1000000);
430  }
431  else
432  {
433  AMPS_USLEEP(1000UL * (unsigned int)waitTime);
434  }
435  amps_invoke_waiting_function();
436  }
437  }
438  void _sleepBeforeConnecting(const std::string& uri_)
439  {
440  try
441  {
442  _millisleep(
443  _reconnectDelayStrategy.getConnectWaitDuration(uri_));
444  }
445  catch (const ConnectionException&)
446  {
447  throw;
448  }
449  catch (const std::exception& ex_)
450  {
451  _exceptionListener->exceptionThrown(ex_);
452  throw ConnectionException(ex_.what());
453  }
454  catch (...)
455  {
456  throw ConnectionException("Unknown exception thrown by "
457  "the HAClient's delay strategy.");
458  }
459  }
460 
461  Mutex _connectLock;
462  Mutex _connectAndLogonLock;
463  int _timeout;
464  unsigned int _reconnectDelay;
465  ReconnectDelayStrategy _reconnectDelayStrategy;
466  ServerChooser _serverChooser;
467 #if __cplusplus >= 201103L || _MSC_VER >= 1900
468  std::atomic<bool> _disconnected;
469 #else
470  volatile bool _disconnected;
471 #endif
472  std::string _logonOptions;
473 
474  }; // class HAClientImpl
475 
476 }// namespace AMPS
477 
478 #endif //_HACLIENTIMPL_H_
479 
AMPSDLL void amps_client_set_disconnect_handler(amps_handle client, amps_handler disconnectHandler, void *userData)
Sets the disconnect handler function to be called when a disconnect occurs.
AMPSDLL void amps_client_disconnect(amps_handle handle)
Disconnects from the AMPS server, if connected.
Provides AMPS::MemorySubscriptionManager, used by an AMPS::HAClient to resubmit subscriptions if conn...
void * amps_handle
Opaque handle type used to refer to objects in the AMPS api.
Definition: amps.h:211
Core type, function, and class declarations for the AMPS C++ client.
Provides AMPS::ReconnectDelayStrategy, called by an AMPS::HAClient to determine how long to wait betw...
Provides AMPS::ServerChooser, the abstract base class that defines the interface that an AMPS::HAClie...
Definition: ampsplusplus.hpp:103