26 #ifndef _HACLIENTIMPL_H_ 27 #define _HACLIENTIMPL_H_ 34 #if __cplusplus >= 201103L || _MSC_VER >= 1900 41 class HAClientImpl :
public ClientImpl
44 HAClientImpl(
const std::string& name_)
45 : ClientImpl(name_), _timeout(AMPS_HACLIENT_TIMEOUT_DEFAULT)
46 , _reconnectDelay(AMPS_HACLIENT_RECONNECT_DEFAULT)
47 , _reconnectDelayStrategy(new ExponentialDelayStrategy(_reconnectDelay))
48 , _disconnected(false)
50 #ifdef AMPS_USE_FUNCTIONAL 51 setDisconnectHandler(HADisconnectHandler());
55 setSubscriptionManager(
new MemorySubscriptionManager());
64 void setTimeout(
int timeout_)
69 int getTimeout()
const 74 unsigned int getReconnectDelay(
void)
const 76 return _reconnectDelay;
79 void setReconnectDelay(
unsigned int reconnectDelay_)
81 _reconnectDelay = reconnectDelay_;
82 setReconnectDelayStrategy(
new FixedDelayStrategy(
83 (
unsigned int)reconnectDelay_));
86 void setReconnectDelayStrategy(
const ReconnectDelayStrategy& strategy_)
88 _reconnectDelayStrategy = strategy_;
92 ReconnectDelayStrategy getReconnectDelayStrategy(
void)
const 94 return _reconnectDelayStrategy;
97 std::string getLogonOptions(
void)
const 102 void setLogonOptions(
const std::string& logonOptions_)
104 _logonOptions = logonOptions_;
107 void setLogonOptions(
const char* logonOptions_)
109 _logonOptions = logonOptions_;
112 ServerChooser getServerChooser()
const 114 return _serverChooser;
117 void setServerChooser(
const ServerChooser& serverChooser_)
119 _serverChooser = serverChooser_;
122 class HADisconnectHandler
125 HADisconnectHandler() {}
126 static void invoke(Client& client,
void* );
127 #ifdef AMPS_USE_FUNCTIONAL 128 void operator()(Client& client)
130 invoke(client, NULL);
134 void connectAndLogon()
136 Lock<Mutex> l(_connectAndLogonLock);
139 _reconnectDelayStrategy.reset();
140 _reconnectDelay = _reconnectDelayStrategy.getConnectWaitDuration(
"DUMMY_URI");
141 _reconnectDelayStrategy.reset();
146 _disconnected =
false;
147 connectAndLogonInternal();
151 if (_subscriptionManager)
154 _subscriptionManager->resubscribe(c);
155 broadcastConnectionStateChanged(
156 ConnectionStateListener::Resubscribed);
160 catch (
const AMPSException& subEx)
163 _disconnected =
true;
164 _serverChooser.reportFailure(subEx, getConnectionInfo());
165 ClientImpl::setDisconnected();
169 catch (
const RetryOperationException&)
181 virtual void connect(
const std::string& )
186 virtual std::string logon(
long , Authenticator& ,
191 throw DisconnectedException(
"Attempt to call logon on a disconnected HAClient. Use connectAndLogon() instead.");
193 throw AlreadyConnectedException(
"Attempt to call logon on an HAClient. Use connectAndLogon() instead.");
196 class DisconnectHandlerDisabler
199 DisconnectHandlerDisabler()
200 : _pClient(NULL), _queueAckTimeout(0), _disconnect(false) { }
201 DisconnectHandlerDisabler(HAClientImpl* pClient_)
203 , _queueAckTimeout(0)
207 _queueAckTimeout = _pClient->getAckTimeout();
208 _pClient->setAckTimeout(0);
210 ~DisconnectHandlerDisabler()
220 throw DisconnectedException(
"Client disconnected during logon.");
228 _pClient->getHandle(),
229 (amps_handler)ClientImpl::ClientImplDisconnectHandler,
231 if (_queueAckTimeout)
233 _pClient->setAckTimeout(_queueAckTimeout);
234 _queueAckTimeout = 0;
239 void setClient(HAClientImpl* pClient_)
245 _queueAckTimeout = _pClient->getAckTimeout();
246 _pClient->setAckTimeout(0);
255 _pClient->getHandle(),
256 (amps_handler)HAClientImpl::DisconnectHandlerDisabler::HADoNothingDisconnectHandler,
257 (
void*)&_disconnect);
259 static void HADoNothingDisconnectHandler(
amps_handle ,
262 *(
bool*)pDisconnect_ =
true;
266 HAClientImpl* _pClient;
267 int _queueAckTimeout;
271 void connectAndLogonInternal()
273 if (!_serverChooser.isValid())
275 throw ConnectionException(
"No server chooser registered with HAClient");
278 DisconnectHandlerDisabler disconnectDisabler;
279 TryLock<Mutex> l(_connectLock);
282 throw RetryOperationException(
"Retry, another thread is handling reconnnect");
284 while (!_disconnected)
286 std::string uri = _serverChooser.getCurrentURI();
289 throw ConnectionException(
"No AMPS instances available for connection. " + _serverChooser.getError());
291 Authenticator& auth = _serverChooser.getCurrentAuthenticator();
292 _sleepBeforeConnecting(uri);
296 if (_disconnected || _connected)
301 disconnectDisabler.setClient((HAClientImpl*)
this);
304 Lock<Mutex> clientLock(_lock);
305 ClientImpl::_connect(uri);
308 if (_logonOptions.empty())
310 ClientImpl::_logon(_timeout, auth);
314 ClientImpl::_logon(_timeout, auth, _logonOptions.c_str());
317 catch (
const AuthenticationException&)
319 ClientImpl::setDisconnected();
322 catch (
const NotEntitledException&)
324 ClientImpl::setDisconnected();
327 catch (
const DuplicateLogonException&)
329 ClientImpl::setDisconnected();
332 catch (
const NameInUseException&)
334 ClientImpl::setDisconnected();
337 catch (
const TimedOutException&)
339 ClientImpl::setDisconnected();
345 _serverChooser.reportSuccess(getConnectionInfo());
347 _reconnectDelayStrategy.reset();
348 _reconnectDelay = _reconnectDelayStrategy.getConnectWaitDuration(
"DUMMY_URI");
349 _reconnectDelayStrategy.reset();
351 catch (
const AMPSException&)
353 ClientImpl::disconnect();
356 disconnectDisabler.clear();
359 catch (
const AMPSException& ex)
361 ConnectionInfo ci = getConnectionInfo();
363 ci[
"client.uri"] = uri;
364 _serverChooser.reportFailure(ex, ci);
367 ClientImpl::setDisconnected();
369 catch (
const std::exception& e)
373 _exceptionListener->exceptionThrown(e);
381 _exceptionListener->exceptionThrown(UnknownException(
"Unknown exception calling setDisconnected"));
391 ConnectionInfo gatherConnectionInfo()
const 393 return getConnectionInfo();
396 ConnectionInfo getConnectionInfo()
const 398 ConnectionInfo info = ClientImpl::getConnectionInfo();
399 std::ostringstream writer;
401 writer << getReconnectDelay();
402 info[
"haClient.reconnectDelay"] = writer.str();
403 writer.clear(); writer.str(
"");
405 info[
"haClient.timeout"] = writer.str();
410 bool disconnected()
const 412 return _disconnected;
418 _disconnected =
true;
420 Lock<Mutex> l(_connectLock);
421 ClientImpl::disconnect();
423 void _millisleep(
unsigned int millis_)
429 double waitTime = (double)millis_;
430 Timer timer(waitTime);
432 while (!timer.checkAndGetRemaining(&waitTime))
434 if (waitTime - 1000.0 > 0.0)
436 AMPS_USLEEP(1000000);
440 AMPS_USLEEP(1000UL * (
unsigned int)waitTime);
442 amps_invoke_waiting_function();
445 void _sleepBeforeConnecting(
const std::string& uri_)
449 _reconnectDelay = _reconnectDelayStrategy.getConnectWaitDuration(uri_);
450 _millisleep(_reconnectDelay);
452 catch (
const ConnectionException&)
456 catch (
const std::exception& ex_)
458 _exceptionListener->exceptionThrown(ex_);
459 throw ConnectionException(ex_.what());
463 throw ConnectionException(
"Unknown exception thrown by " 464 "the HAClient's delay strategy.");
469 Mutex _connectAndLogonLock;
471 unsigned int _reconnectDelay;
472 ReconnectDelayStrategy _reconnectDelayStrategy;
473 ServerChooser _serverChooser;
474 #if __cplusplus >= 201103L || _MSC_VER >= 1900 475 std::atomic<bool> _disconnected;
477 volatile bool _disconnected;
479 std::string _logonOptions;
485 #endif //_HACLIENTIMPL_H_ AMPSDLL void amps_client_set_disconnect_handler(amps_handle client, amps_handler disconnectHandler, void *userData)
Sets the disconnect handler function to be called when a disconnect occurs.
AMPSDLL void amps_client_disconnect(amps_handle handle)
Disconnects from the AMPS server, if connected.
Provides AMPS::MemorySubscriptionManager, used by an AMPS::HAClient to resubmit subscriptions if conn...
void * amps_handle
Opaque handle type used to refer to objects in the AMPS api.
Definition: amps.h:211
Core type, function, and class declarations for the AMPS C++ client.
Provides AMPS::ReconnectDelayStrategy, called by an AMPS::HAClient to determine how long to wait betw...
Provides AMPS::ServerChooser, the abstract base class that defines the interface that an AMPS::HAClie...
Definition: ampsplusplus.hpp:103