聊聊artemis的handleConnectionFailure

编程

handleConnectionFailure

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java

public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, ClientConnectionLifeCycleListener {

//......

/**
 * Handles a reported connection failure by delegating to
 * {@link #failoverOrReconnect(Object, ActiveMQException, String)}.
 * <p>
 * An {@code ActiveMQInterruptedException} is only logged at debug level and swallowed. Any other
 * {@code Throwable} is logged, this factory is closed, and the throwable is rethrown.
 *
 * @param connectionID          identifier of the connection that failed
 * @param me                    the exception describing the failure
 * @param scaleDownTargetNodeID forwarded unchanged to {@code failoverOrReconnect}
 */
private void handleConnectionFailure(final Object connectionID,
                                     final ActiveMQException me,
                                     String scaleDownTargetNodeID) {
   try {
      failoverOrReconnect(connectionID, me, scaleDownTargetNodeID);
   } catch (ActiveMQInterruptedException interrupted) {
      // An interrupt is an expected event (e.g. during shutdown), so debug level is enough.
      logger.debug(interrupted.getMessage(), interrupted);
   } catch (Throwable unexpected) {
      ActiveMQClientLogger.LOGGER.unableToHandleConnectionFailure(unexpected);

      // For anything else, close the factory so that blocked clients are released.
      close();

      throw unexpected;
   }
}

/**
 * Reacts to a failed connection: notifies listeners, then either tries to reconnect the existing
 * sessions on a new connection or destroys the connection, and finally cleans up all sessions if
 * no connection is available afterwards.
 * <p>
 * Sequence, as implemented below:
 * <ol>
 * <li>If any session's context is an {@code ActiveMQSessionContext} that reports
 * {@code isKilled()}, {@code setReconnectAttempts(0)} is called.</li>
 * <li>Returns immediately when {@code clientProtocolManager} is not alive.</li>
 * <li>Under the failover lock: returns if the failure is for a connection that is no longer the
 * current one; otherwise fires {@code FAILURE_DETECTED}, calls the session failure listeners,
 * and then reconnects (when {@code reconnectAttempts != 0}) or destroys the connection.</li>
 * <li>If {@code connection} is {@code null} afterwards, takes a snapshot of the sessions, fires
 * {@code FAILOVER_FAILED} and calls the session failure listeners again.</li>
 * <li>After the lock is released: calls the session failure listeners once more when a connection
 * exists, and calls {@code cleanUp(true)} on every session in the snapshot (if one was taken).</li>
 * </ol>
 *
 * @param connectionID          identifier of the failed connection; compared with {@code connection.getID()}
 * @param me                    the failure; passed to listeners, {@code cleanupBeforeFailover} and {@code reconnectSessions}
 * @param scaleDownTargetNodeID forwarded to {@code callSessionFailureListeners}
 */
private void failoverOrReconnect(final Object connectionID,
                                 final ActiveMQException me,
                                 String scaleDownTargetNodeID) {
   ActiveMQClientLogger.LOGGER.failoverOrReconnect(connectionID, me);

   // A killed session context sets the reconnect attempts to 0; presumably the setter updates the
   // reconnectAttempts field read further down, which would skip the reconnect branch — TODO confirm.
   // NOTE(review): sessions is iterated here without synchronized (sessions), unlike the snapshot
   // copies made elsewhere in this class — confirm this is safe for the collection type in use.
   for (ClientSessionInternal session : sessions) {
      SessionContext context = session.getSessionContext();
      if (context instanceof ActiveMQSessionContext) {
         ActiveMQSessionContext sessionContext = (ActiveMQSessionContext) context;
         if (sessionContext.isKilled()) {
            setReconnectAttempts(0);
         }
      }
   }

   // Stays null unless failover/reconnect ends without a connection (filled in under the lock below).
   Set<ClientSessionInternal> sessionsToClose = null;

   // Nothing to do when the protocol manager is no longer alive.
   if (!clientProtocolManager.isAlive())
      return;

   Lock localFailoverLock = lockFailover();

   try {
      if (connection == null || !connection.getID().equals(connectionID) || !clientProtocolManager.isAlive()) {
         // We already failed over/reconnected - probably the first failure came in, all the connections were failed
         // over then an async connection exception or disconnect
         // came in for one of the already exitLoop connections, so we just return - we don't want to call the
         // listeners again
         return;
      }

      if (ClientSessionFactoryImpl.logger.isTraceEnabled()) {
         logger.trace("Client Connection failed, calling failure listeners and trying to reconnect, reconnectAttempts=" + reconnectAttempts);
      }

      callFailoverListeners(FailoverEventType.FAILURE_DETECTED);

      // We call before reconnection occurs to give the user a chance to do cleanup, like cancel messages
      callSessionFailureListeners(me, false, false, scaleDownTargetNodeID);

      // Now get locks on all channel 1s, whilst holding the failoverLock - this makes sure
      // There are either no threads executing in createSession, or one is blocking on a createSession
      // result.

      // Then interrupt the channel 1 that is blocking (could just interrupt them all)

      // Then release all channel 1 locks - this allows the createSession to exit the monitor

      // Then get all channel 1 locks again - this ensures the any createSession thread has executed the section and
      // returned all its connections to the connection manager (the code to return connections to connection manager
      // must be inside the lock

      // Then perform failover

      // Then release failoverLock

      // The other side of the bargain - during createSession:
      // The calling thread must get the failoverLock and get its connections when this is
      // locked.
      // While this is still locked it must then get the channel1 lock
      // It can then release the failoverLock
      // It should catch ActiveMQException.INTERRUPTED in the call to channel.sendBlocking
      // It should then return its connections, with channel 1 lock still held
      // It can then release the channel 1 lock, and retry (which will cause locking on
      // failoverLock
      // until failover is complete

      if (reconnectAttempts != 0) {
         // If cleanupBeforeFailover returns false nothing in this branch runs and the current
         // connection is left untouched.
         if (clientProtocolManager.cleanupBeforeFailover(me)) {

            // Now we absolutely know that no threads are executing in or blocked in
            // createSession,
            // and no
            // more will execute it until failover is complete

            // So.. do failover / reconnection

            // Keep a reference to the failed connection and clear the field before reconnecting.
            RemotingConnection oldConnection = connection;

            connection = null;

            Connector localConnector = connector;
            if (localConnector != null) {
               try {
                  localConnector.close();
               } catch (Exception ignore) {
                  // no-op: a failure to close the old connector is deliberately ignored
               }
            }

            cancelScheduledTasks();

            connector = null;

            reconnectSessions(oldConnection, reconnectAttempts, me);

            if (oldConnection != null) {
               oldConnection.destroy();
            }

            // A non-null connection after reconnectSessions means the reconnect succeeded.
            if (connection != null) {
               callFailoverListeners(FailoverEventType.FAILOVER_COMPLETED);
            }
         }
      } else {
         // Reconnection is disabled: destroy the current connection and clear the field.
         RemotingConnection connectionToDestory = connection;
         if (connectionToDestory != null) {
            connectionToDestory.destroy();
         }
         connection = null;
      }

      if (connection == null) {
         // No connection available: snapshot the sessions so they can be cleaned up after the
         // failover lock has been released.
         synchronized (sessions) {
            sessionsToClose = new HashSet<>(sessions);
         }
         callFailoverListeners(FailoverEventType.FAILOVER_FAILED);
         callSessionFailureListeners(me, true, false, scaleDownTargetNodeID);
      }
   } finally {
      localFailoverLock.unlock();
   }

   // This needs to be outside the failover lock to prevent deadlock
   if (connection != null) {
      callSessionFailureListeners(me, true, true);
   }

   if (sessionsToClose != null) {
      // If connection is null it means we didn't succeed in failing over or reconnecting
      // so we close all the sessions, so they will throw exceptions when attempted to be used

      for (ClientSessionInternal session : sessionsToClose) {
         try {
            session.cleanUp(true);
         } catch (Exception cause) {
            // Log and continue so that one failing session does not stop the others being cleaned up.
            ActiveMQClientLogger.LOGGER.failedToCleanupSession(cause);
         }
      }
   }
}

//......

}

  • handleConnectionFailure方法会调用failoverOrReconnect方法,该方法会先遍历sessions,对于sessionContext.isKilled()为true的执行setReconnectAttempts(0);之后执行lockFailover(),然后reconnectAttempts不为0的执行reconnectSessions,最后执行localFailoverLock.unlock()

reconnectSessions

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java

public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, ClientConnectionLifeCycleListener {

//......

/**
 * Tries to obtain a new connection and move every current session onto it.
 * <p>
 * Steps: take a snapshot of the sessions, call {@code preHandleFailover} on each, obtain a connection
 * via {@link #getConnectionWithRetry(int, RemotingConnection)}, copy the old connection's failure
 * listeners (except any {@code DelegatingFailureListener}) to the new connection, sync the ID generator
 * sequence, and finally call {@code handleFailover} on each session. If any session reports failure the
 * new connection is destroyed and the {@code connection} field is reset to {@code null}.
 *
 * @param oldConnection     the connection that failed; its listeners and ID sequence are carried over
 * @param reconnectAttempts number of attempts handed to {@code getConnectionWithRetry}
 * @param cause             the original failure, passed to each session's {@code handleFailover}
 */
private void reconnectSessions(final RemotingConnection oldConnection,
                               final int reconnectAttempts,
                               final ActiveMQException cause) {
   // Work on a copy so the sessions monitor is not held during the calls below.
   final HashSet<ClientSessionInternal> snapshot;
   synchronized (sessions) {
      snapshot = new HashSet<>(sessions);
   }

   for (ClientSessionInternal candidate : snapshot) {
      candidate.preHandleFailover(connection);
   }

   getConnectionWithRetry(reconnectAttempts, oldConnection);

   if (connection == null) {
      if (!clientProtocolManager.isAlive()) {
         ActiveMQClientLogger.LOGGER.failedToConnectToServer();
      }
      return;
   }

   // Start from the new connection's own listeners and append the old ones,
   // leaving out the old DelegatingFailureListener.
   List<FailureListener> previousListeners = oldConnection.getFailureListeners();
   List<FailureListener> mergedListeners = new ArrayList<>(connection.getFailureListeners());
   for (FailureListener previous : previousListeners) {
      if (!(previous instanceof DelegatingFailureListener)) {
         mergedListeners.add(previous);
      }
   }
   connection.setFailureListeners(mergedListeners);

   // This used to be done inside failover; it needs to be done on the protocol.
   CoreRemotingConnection freshCore = (CoreRemotingConnection) connection;
   CoreRemotingConnection staleCore = (CoreRemotingConnection) oldConnection;
   freshCore.syncIDGeneratorSequence(staleCore.getIDGeneratorSequence());

   for (ClientSessionInternal candidate : snapshot) {
      boolean failedOver = candidate.handleFailover(connection, cause);
      if (!failedOver) {
         // One session could not fail over: give up on the new connection entirely.
         connection.destroy();
         this.connection = null;
         return;
      }
   }
}

private void getConnectionWithRetry(final int reconnectAttempts, RemotingConnection oldConnection) {

if (!clientProtocolManager.isAlive())

return;

if (logger.isTraceEnabled()) {

logger.trace("getConnectionWithRetry::" + reconnectAttempts +

" with retryInterval = " +

retryInterval +

" multiplier = " +

retryIntervalMultiplier, new Exception("trace"));

}

long interval = retryInterval;

int count = 0;

while (clientProtocolManager.isAlive()) {

if (logger.isDebugEnabled()) {

logger.debug("Trying reconnection attempt " + count + "/" + reconnectAttempts);

}

if (getConnection() != null) {

if (oldConnection != null && oldConnection instanceof CoreRemotingConnection) {

// transferring old connection version into the new connection

((CoreRemotingConnection)connection).setChannelVersion(((CoreRemotingConnection)oldConnection).getChannelVersion());

}

if (logger.isDebugEnabled()) {

logger.debug("Reconnection successful");

}

return;

} else {

// Failed to get connection

if (reconnectAttempts != 0) {

count++;

if (reconnectAttempts != -1 && count == reconnectAttempts) {

if (reconnectAttempts != 1) {

ActiveMQClientLogger.LOGGER.failedToConnectToServer(reconnectAttempts);

}

return;

}

if (ClientSessionFactoryImpl.logger.isTraceEnabled()) {

ClientSessionFactoryImpl.logger.trace("Waiting " + interval + " milliseconds before next retry. RetryInterval=" + retryInterval + " and multiplier=" + retryIntervalMultiplier);

}

if (waitForRetry(interval))

return;

// Exponential back-off

long newInterval = (long) (interval * retryIntervalMultiplier);

if (newInterval > maxRetryInterval) {

newInterval = maxRetryInterval;

}

interval = newInterval;

} else {

logger.debug("Could not connect to any server. Didn"t have reconnection configured on the ClientSessionFactory");

return;

}

}

}

}

/**
 * Returns the current connection, establishing a new one under {@code connectionLock} when none exists.
 * <p>
 * After a new connection is established and {@code liveNodeID} is set, the protocol manager's
 * {@code checkForFailover} is consulted; if it returns {@code false} or throws an
 * {@code ActiveMQException}, the new connection is destroyed and {@code null} is returned. Otherwise
 * the after-connect listener (if any) is notified and, when the server locator has a topology,
 * a topology subscription is sent.
 *
 * @return the existing or newly established connection; {@code null} when the protocol manager is not
 *         alive, when {@code establishNewConnection()} yields {@code null}, or when the failover check
 *         rejects the new connection
 * @throws IllegalStateException if this factory has been closed
 */
public RemotingConnection getConnection() {
   if (closed) {
      throw new IllegalStateException("ClientSessionFactory is closed!");
   }
   if (!clientProtocolManager.isAlive()) {
      return null;
   }
   synchronized (connectionLock) {
      if (connection != null) {
         // a connection already exists, so returning the same one
         return connection;
      }

      RemotingConnection newConnection = establishNewConnection();

      // The field has to be assigned before the failover check: to receive the reply
      // the connection must not be null.
      this.connection = newConnection;

      // Check that we can actually connect; on any failure reset this.connection to null.
      if (newConnection != null && liveNodeID != null) {
         try {
            if (!clientProtocolManager.checkForFailover(liveNodeID)) {
               newConnection.destroy();
               this.connection = null;
               return null;
            }
         } catch (ActiveMQException e) {
            newConnection.destroy();
            this.connection = null;
            return null;
         }
      }

      if (newConnection != null && serverLocator.getAfterConnectInternalListener() != null) {
         serverLocator.getAfterConnectInternalListener().onConnection(this);
      }

      if (serverLocator.getTopology() != null) {
         if (newConnection != null) {
            if (ClientSessionFactoryImpl.logger.isTraceEnabled()) {
               logger.trace(this + "::Subscribing Topology");
            }
            clientProtocolManager.sendSubscribeTopology(serverLocator.isClusterConnection());
         }
      } else {
         // Hash the locator itself; previously the hash was taken of the concatenated message string,
         // which logged a meaningless number and dropped the " had no topology" text.
         logger.debug("serverLocator@" + System.identityHashCode(serverLocator) + " had no topology");
      }

      return newConnection;
   }
}

//......

}

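  • reconnectSessions方法首先对sessions的快照逐个执行session.preHandleFailover(connection),然后执行getConnectionWithRetry,若connection仍为null则直接返回;否则将oldListeners中除DelegatingFailureListener以外的listener添加到新的connection中,最后遍历sessionsToFailover执行session.handleFailover(connection, cause),对于返回false的执行connection.destroy()并将connection置为null然后return;getConnectionWithRetry方法通过getConnection()获取新连接并赋值给connection,如果获取失败则进行重试,直到失败次数达到reconnectAttempts(reconnectAttempts为-1时不限次数,为0时不重试)或clientProtocolManager不再alive,重试时通过waitForRetry(interval)来控制重试的间隔,间隔每次乘以retryIntervalMultiplier且不超过maxRetryInterval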

小结

ClientSessionFactoryImpl的handleConnectionFailure方法会调用failoverOrReconnect方法,该方法会先遍历sessions,对于sessionContext.isKilled()为true的执行setReconnectAttempts(0);之后执行lockFailover(),然后reconnectAttempts不为0的执行reconnectSessions,最后执行localFailoverLock.unlock()

doc

  • ClientSessionFactoryImpl

以上是 聊聊artemis的handleConnectionFailure 的全部内容, 来源链接: utcz.com/z/513580.html

回到顶部