26 #ifndef _HACLIENTIMPL_H_ 27 #define _HACLIENTIMPL_H_ 34 #if __cplusplus >= 201103L || _MSC_VER >= 1900 41 class HAClientImpl :
public ClientImpl
44 HAClientImpl(
const std::string& name_)
45 : ClientImpl(name_), _timeout(AMPS_HACLIENT_TIMEOUT_DEFAULT)
46 , _reconnectDelay(AMPS_HACLIENT_RECONNECT_DEFAULT)
47 , _reconnectDelayStrategy(new ExponentialDelayStrategy(_reconnectDelay))
48 , _disconnected(false)
50 #ifdef AMPS_USE_FUNCTIONAL 51 setDisconnectHandler(HADisconnectHandler());
55 setSubscriptionManager(
new MemorySubscriptionManager());
64 void setTimeout(
int timeout_)
69 int getTimeout()
const 74 unsigned int getReconnectDelay(
void)
const 76 return _reconnectDelay;
79 void setReconnectDelay(
unsigned int reconnectDelay_)
81 _reconnectDelay = reconnectDelay_;
82 setReconnectDelayStrategy(
new FixedDelayStrategy(
83 (
unsigned int)reconnectDelay_));
86 void setReconnectDelayStrategy(
const ReconnectDelayStrategy& strategy_)
88 _reconnectDelayStrategy = strategy_;
91 ReconnectDelayStrategy getReconnectDelayStrategy(
void)
const 93 return _reconnectDelayStrategy;
96 std::string getLogonOptions(
void)
const 101 void setLogonOptions(
const std::string& logonOptions_)
103 _logonOptions = logonOptions_;
106 void setLogonOptions(
const char* logonOptions_)
108 _logonOptions = logonOptions_;
111 ServerChooser getServerChooser()
const 113 return _serverChooser;
116 void setServerChooser(
const ServerChooser& serverChooser_)
118 _serverChooser = serverChooser_;
121 class HADisconnectHandler
124 HADisconnectHandler() {}
125 static void invoke(Client& client,
void* );
126 #ifdef AMPS_USE_FUNCTIONAL 127 void operator()(Client& client)
129 invoke(client, NULL);
133 void connectAndLogon()
135 Lock<Mutex> l(_connectAndLogonLock);
137 _reconnectDelayStrategy.reset();
142 _disconnected =
false;
143 connectAndLogonInternal();
147 if (_subscriptionManager)
150 _subscriptionManager->resubscribe(c);
151 broadcastConnectionStateChanged(
152 ConnectionStateListener::Resubscribed);
156 catch (
const AMPSException& subEx)
159 _disconnected =
true;
160 _serverChooser.reportFailure(subEx, getConnectionInfo());
161 ClientImpl::setDisconnected();
165 catch (
const RetryOperationException&)
177 virtual void connect(
const std::string& )
182 virtual std::string logon(
long , Authenticator& ,
187 throw DisconnectedException(
"Attempt to call logon on a disconnected HAClient. Use connectAndLogon() instead.");
189 throw AlreadyConnectedException(
"Attempt to call logon on an HAClient. Use connectAndLogon() instead.");
192 class DisconnectHandlerDisabler
195 DisconnectHandlerDisabler()
196 : _pClient(NULL), _queueAckTimeout(0), _disconnect(false) { }
197 DisconnectHandlerDisabler(HAClientImpl* pClient_)
199 , _queueAckTimeout(0)
203 _queueAckTimeout = _pClient->getAckTimeout();
204 _pClient->setAckTimeout(0);
206 ~DisconnectHandlerDisabler()
216 throw DisconnectedException(
"Client disconnected during logon.");
224 _pClient->getHandle(),
225 (amps_handler)ClientImpl::ClientImplDisconnectHandler,
227 if (_queueAckTimeout)
229 _pClient->setAckTimeout(_queueAckTimeout);
230 _queueAckTimeout = 0;
235 void setClient(HAClientImpl* pClient_)
241 _queueAckTimeout = _pClient->getAckTimeout();
242 _pClient->setAckTimeout(0);
251 _pClient->getHandle(),
252 (amps_handler)HAClientImpl::DisconnectHandlerDisabler::HADoNothingDisconnectHandler,
253 (
void*)&_disconnect);
255 static void HADoNothingDisconnectHandler(
amps_handle ,
258 *(
bool*)pDisconnect_ =
true;
262 HAClientImpl* _pClient;
263 int _queueAckTimeout;
267 void connectAndLogonInternal()
269 if (!_serverChooser.isValid())
271 throw ConnectionException(
"No server chooser registered with HAClient");
274 DisconnectHandlerDisabler disconnectDisabler;
275 TryLock<Mutex> l(_connectLock);
278 throw RetryOperationException(
"Retry, another thread is handling reconnnect");
280 while (!_disconnected)
282 std::string uri = _serverChooser.getCurrentURI();
285 throw ConnectionException(
"No AMPS instances available for connection. " + _serverChooser.getError());
287 Authenticator& auth = _serverChooser.getCurrentAuthenticator();
288 _sleepBeforeConnecting(uri);
292 if (_disconnected || _connected)
297 disconnectDisabler.setClient((HAClientImpl*)
this);
300 Lock<Mutex> clientLock(_lock);
301 ClientImpl::_connect(uri);
304 if (_logonOptions.empty())
306 ClientImpl::_logon(_timeout, auth);
310 ClientImpl::_logon(_timeout, auth, _logonOptions.c_str());
313 catch (
const AuthenticationException&)
315 ClientImpl::setDisconnected();
318 catch (
const NotEntitledException&)
320 ClientImpl::setDisconnected();
323 catch (
const DuplicateLogonException&)
325 ClientImpl::setDisconnected();
328 catch (
const NameInUseException&)
330 ClientImpl::setDisconnected();
333 catch (
const TimedOutException&)
335 ClientImpl::setDisconnected();
341 _serverChooser.reportSuccess(getConnectionInfo());
342 _reconnectDelayStrategy.reset();
344 catch (
const AMPSException&)
346 ClientImpl::disconnect();
349 disconnectDisabler.clear();
352 catch (
const AMPSException& ex)
354 ConnectionInfo ci = getConnectionInfo();
356 ci[
"client.uri"] = uri;
357 _serverChooser.reportFailure(ex, ci);
360 ClientImpl::setDisconnected();
362 catch (
const std::exception& e)
366 _exceptionListener->exceptionThrown(e);
374 _exceptionListener->exceptionThrown(UnknownException(
"Unknown exception calling setDisconnected"));
384 ConnectionInfo gatherConnectionInfo()
const 386 return getConnectionInfo();
389 ConnectionInfo getConnectionInfo()
const 391 ConnectionInfo info = ClientImpl::getConnectionInfo();
392 std::ostringstream writer;
394 writer << getReconnectDelay();
395 info[
"haClient.reconnectDelay"] = writer.str();
396 writer.clear(); writer.str(
"");
398 info[
"haClient.timeout"] = writer.str();
403 bool disconnected()
const 405 return _disconnected;
411 _disconnected =
true;
413 Lock<Mutex> l(_connectLock);
414 ClientImpl::disconnect();
416 void _millisleep(
unsigned int millis_)
422 double waitTime = (double)millis_;
423 Timer timer(waitTime);
425 while (!timer.checkAndGetRemaining(&waitTime))
427 if (waitTime - 1000.0 > 0.0)
429 AMPS_USLEEP(1000000);
433 AMPS_USLEEP(1000UL * (
unsigned int)waitTime);
435 amps_invoke_waiting_function();
438 void _sleepBeforeConnecting(
const std::string& uri_)
443 _reconnectDelayStrategy.getConnectWaitDuration(uri_));
445 catch (
const ConnectionException&)
449 catch (
const std::exception& ex_)
451 _exceptionListener->exceptionThrown(ex_);
452 throw ConnectionException(ex_.what());
456 throw ConnectionException(
"Unknown exception thrown by " 457 "the HAClient's delay strategy.");
462 Mutex _connectAndLogonLock;
464 unsigned int _reconnectDelay;
465 ReconnectDelayStrategy _reconnectDelayStrategy;
466 ServerChooser _serverChooser;
467 #if __cplusplus >= 201103L || _MSC_VER >= 1900 468 std::atomic<bool> _disconnected;
470 volatile bool _disconnected;
472 std::string _logonOptions;
478 #endif //_HACLIENTIMPL_H_ AMPSDLL void amps_client_set_disconnect_handler(amps_handle client, amps_handler disconnectHandler, void *userData)
Sets the disconnect handler function to be called when a disconnect occurs.
AMPSDLL void amps_client_disconnect(amps_handle handle)
Disconnects from the AMPS server, if connected.
Provides AMPS::MemorySubscriptionManager, used by an AMPS::HAClient to resubmit subscriptions if conn...
void * amps_handle
Opaque handle type used to refer to objects in the AMPS api.
Definition: amps.h:211
Core type, function, and class declarations for the AMPS C++ client.
Provides AMPS::ReconnectDelayStrategy, called by an AMPS::HAClient to determine how long to wait betw...
Provides AMPS::ServerChooser, the abstract base class that defines the interface that an AMPS::HAClie...
Definition: ampsplusplus.hpp:103