聊聊artemis的reconnectAttempts

编程

reconnectAttempts

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java

public final class ServerLocatorImpl implements ServerLocatorInternal, DiscoveryListener {

//......

/**
 * Builds a {@code ClientSessionFactoryImpl} for the given transport configuration, connects it,
 * and registers it with this locator.
 *
 * @param transportConfiguration the connector configuration handed to the new factory
 * @param reconnectAttempts      passed both to the factory constructor and to its initial {@code connect} call
 * @return the connected, registered session factory
 * @throws Exception if the locator checks, the initialization, or the connect attempt fail
 */
public ClientSessionFactory createSessionFactory(final TransportConfiguration transportConfiguration,
                                                 int reconnectAttempts) throws Exception {
   assertOpen();
   initialize();

   ClientSessionFactoryInternal sessionFactory = new ClientSessionFactoryImpl(this, transportConfiguration, callTimeout, callFailoverTimeout, clientFailureCheckPeriod, connectionTTL, retryInterval, retryIntervalMultiplier, maxRetryInterval, reconnectAttempts, threadPool, scheduledThreadPool, incomingInterceptors, outgoingInterceptors);

   // The factory is tracked as "connecting" for the whole attempt; the finally
   // block untracks it on success and on failure.
   addToConnecting(sessionFactory);
   try {
      try {
         sessionFactory.connect(reconnectAttempts);
      } catch (ActiveMQException connectFailure) {
         // Close the factory so it can be garbage collected, then propagate the failure.
         sessionFactory.close();
         throw connectFailure;
      }

      // Only a successfully connected factory is registered.
      addFactory(sessionFactory);
      return sessionFactory;
   } finally {
      removeFromConnecting(sessionFactory);
   }
}

//......

}

  • ServerLocatorImpl的createSessionFactory方法创建ClientSessionFactoryImpl,然后执行factory.connect(reconnectAttempts)

ClientSessionFactoryImpl

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java

public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, ClientConnectionLifeCycleListener {

//......

/**
 * Performs the initial connection attempt(s) and fails if no connection was established.
 *
 * @param initialConnectAttempts retry budget forwarded to {@code getConnectionWithRetry}
 * @throws ActiveMQNotConnectedException if {@code connection} is still {@code null} afterwards
 */
public void connect(final int initialConnectAttempts) throws ActiveMQException {
   // No previous connection exists on the initial connect, hence the null second argument.
   getConnectionWithRetry(initialConnectAttempts, null);

   if (connection != null) {
      return;
   }

   // Name the configuration(s) that were tried so the failure is diagnosable.
   String message = "Unable to connect to server using configuration " + currentConnectorConfig;
   if (backupConfig != null) {
      message += " and backup configuration " + backupConfig;
   }
   throw new ActiveMQNotConnectedException(message);
}

private void getConnectionWithRetry(final int reconnectAttempts, RemotingConnection oldConnection) {

if (!clientProtocolManager.isAlive())

return;

if (logger.isTraceEnabled()) {

logger.trace("getConnectionWithRetry::" + reconnectAttempts +

" with retryInterval = " +

retryInterval +

" multiplier = " +

retryIntervalMultiplier, new Exception("trace"));

}

long interval = retryInterval;

int count = 0;

while (clientProtocolManager.isAlive()) {

if (logger.isDebugEnabled()) {

logger.debug("Trying reconnection attempt " + count + "/" + reconnectAttempts);

}

if (getConnection() != null) {

if (oldConnection != null && oldConnection instanceof CoreRemotingConnection) {

// transferring old connection version into the new connection

((CoreRemotingConnection)connection).setChannelVersion(((CoreRemotingConnection)oldConnection).getChannelVersion());

}

if (logger.isDebugEnabled()) {

logger.debug("Reconnection successful");

}

return;

} else {

// Failed to get connection

if (reconnectAttempts != 0) {

count++;

if (reconnectAttempts != -1 && count == reconnectAttempts) {

if (reconnectAttempts != 1) {

ActiveMQClientLogger.LOGGER.failedToConnectToServer(reconnectAttempts);

}

return;

}

if (ClientSessionFactoryImpl.logger.isTraceEnabled()) {

ClientSessionFactoryImpl.logger.trace("Waiting " + interval + " milliseconds before next retry. RetryInterval=" + retryInterval + " and multiplier=" + retryIntervalMultiplier);

}

if (waitForRetry(interval))

return;

// Exponential back-off

long newInterval = (long) (interval * retryIntervalMultiplier);

if (newInterval > maxRetryInterval) {

newInterval = maxRetryInterval;

}

interval = newInterval;

} else {

logger.debug("Could not connect to any server. Didn"t have reconnection configured on the ClientSessionFactory");

return;

}

}

}

}

/**
 * Waits up to {@code interval} milliseconds on the protocol manager's latch before the next retry.
 *
 * @param interval maximum time to wait, in milliseconds
 * @return the result of {@code clientProtocolManager.waitOnLatch(interval)}; callers treat
 *         {@code true} as a signal to stop retrying
 * @throws ActiveMQInterruptedException if the waiting thread is interrupted
 */
public boolean waitForRetry(long interval) {
   try {
      return clientProtocolManager.waitOnLatch(interval);
   } catch (InterruptedException interrupted) {
      // The thrown exception is built from createTrace, not from the InterruptedException itself.
      throw new ActiveMQInterruptedException(createTrace);
   }
}

//......

}

  • ClientSessionFactoryImpl的connect方法主要是执行getConnectionWithRetry,若执行后connection仍为null则抛出ActiveMQNotConnectedException;getConnectionWithRetry方法以clientProtocolManager.isAlive()为条件进行while循环并执行getConnection():如果返回null且reconnectAttempts为0,则不重试直接返回;如果返回null且reconnectAttempts不为0,则递增count,当reconnectAttempts不为-1且count等于reconnectAttempts时返回,否则通过waitForRetry(interval)进行等待,若返回true则提前return,若返回false则将interval乘以retryIntervalMultiplier(上限为maxRetryInterval)后进入下一轮循环;waitForRetry则通过clientProtocolManager.waitOnLatch(interval)进行等待

ActiveMQClientProtocolManager

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQClientProtocolManager.java

public class ActiveMQClientProtocolManager implements ClientProtocolManager {

//......

// Latch with a count of one: stop() counts it down, and waitOnLatch(long) awaits it with a timeout.
private final CountDownLatch waitLatch = new CountDownLatch(1);

//......

/**
 * Blocks on {@code waitLatch} for at most the given time.
 *
 * @param milliseconds maximum time to wait, in milliseconds
 * @return {@code true} if the latch reached zero, {@code false} if the timeout elapsed first
 * @throws InterruptedException if the calling thread is interrupted while waiting
 */
public boolean waitOnLatch(long milliseconds) throws InterruptedException {
   final boolean released = waitLatch.await(milliseconds, TimeUnit.MILLISECONDS);
   return released;
}

/**
 * Stops this protocol manager: clears the {@code alive} flag, counts down the in-progress
 * create-session latch if one exists, calls {@code returnBlocking()} on channel 1 if it exists,
 * and finally counts down {@code waitLatch}.
 */
public void stop() {
   alive = false;

   synchronized (inCreateSessionGuard) {
      if (inCreateSessionLatch != null) {
         inCreateSessionLatch.countDown();
      }
   }

   final Channel firstChannel = getChannel1();
   if (firstChannel != null) {
      firstChannel.returnBlocking();
   }

   // Counting down waitLatch makes any pending or later waitOnLatch(long) call return true.
   waitLatch.countDown();
}

//......

}

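  • ActiveMQClientProtocolManager有个名为waitLatch的CountDownLatch,waitOnLatch方法通过waitLatch.await(milliseconds, TimeUnit.MILLISECONDS)进行等待,而stop方法则执行waitLatch.countDown();由于waitLatch的初始计数为1,stop执行之后waitOnLatch会立即返回true,从而使waitForRetry返回true、getConnectionWithRetry提前return,不再继续重试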

小结

ClientSessionFactoryImpl的connect方法主要是执行getConnectionWithRetry,若执行后connection仍为null则抛出ActiveMQNotConnectedException;getConnectionWithRetry方法以clientProtocolManager.isAlive()为条件进行while循环并执行getConnection():如果返回null且reconnectAttempts为0,则不重试直接返回;如果返回null且reconnectAttempts不为0,则递增count,当reconnectAttempts不为-1且count等于reconnectAttempts时返回,否则通过waitForRetry(interval)进行等待,若返回true则提前return,若返回false则将interval乘以retryIntervalMultiplier(上限为maxRetryInterval)后进入下一轮循环;waitForRetry则通过clientProtocolManager.waitOnLatch(interval)进行等待

doc

  • ClientSessionFactoryImpl

以上是 聊聊artemis的reconnectAttempts 的全部内容, 来源链接: utcz.com/z/513412.html

回到顶部