聊聊artemis的reconnectAttempts

reconnectAttempts
activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java
public final class ServerLocatorImpl implements ServerLocatorInternal, DiscoveryListener {   //......
   public ClientSessionFactory createSessionFactory(final TransportConfiguration transportConfiguration,
                                                    int reconnectAttempts) throws Exception {
      assertOpen();
      initialize();
      ClientSessionFactoryInternal factory = new ClientSessionFactoryImpl(this, transportConfiguration, callTimeout, callFailoverTimeout, clientFailureCheckPeriod, connectionTTL, retryInterval, retryIntervalMultiplier, maxRetryInterval, reconnectAttempts, threadPool, scheduledThreadPool, incomingInterceptors, outgoingInterceptors);
      addToConnecting(factory);
      try {
         try {
            factory.connect(reconnectAttempts);
         } catch (ActiveMQException e1) {
            //we need to make sure is closed just for garbage collection
            factory.close();
            throw e1;
         }
         addFactory(factory);
         return factory;
      } finally {
         removeFromConnecting(factory);
      }
   }
   //......
}
- ServerLocatorImpl的createSessionFactory方法创建ClientSessionFactoryImpl,然后执行factory.connect(reconnectAttempts)
ClientSessionFactoryImpl
activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java
public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, ClientConnectionLifeCycleListener {   //......
   public void connect(final int initialConnectAttempts) throws ActiveMQException {
      // Get the connection
      getConnectionWithRetry(initialConnectAttempts, null);
      if (connection == null) {
         StringBuilder msg = new StringBuilder("Unable to connect to server using configuration ").append(currentConnectorConfig);
         if (backupConfig != null) {
            msg.append(" and backup configuration ").append(backupConfig);
         }
         throw new ActiveMQNotConnectedException(msg.toString());
      }
   }
   private void getConnectionWithRetry(final int reconnectAttempts, RemotingConnection oldConnection) {
      if (!clientProtocolManager.isAlive())
         return;
      if (logger.isTraceEnabled()) {
         logger.trace("getConnectionWithRetry::" + reconnectAttempts +
                         " with retryInterval = " +
                         retryInterval +
                         " multiplier = " +
                         retryIntervalMultiplier, new Exception("trace"));
      }
      long interval = retryInterval;
      int count = 0;
      while (clientProtocolManager.isAlive()) {
         if (logger.isDebugEnabled()) {
            logger.debug("Trying reconnection attempt " + count + "/" + reconnectAttempts);
         }
         if (getConnection() != null) {
            if (oldConnection != null && oldConnection instanceof CoreRemotingConnection) {
               // transferring old connection version into the new connection
               ((CoreRemotingConnection)connection).setChannelVersion(((CoreRemotingConnection)oldConnection).getChannelVersion());
            }
            if (logger.isDebugEnabled()) {
               logger.debug("Reconnection successful");
            }
            return;
         } else {
            // Failed to get connection
            if (reconnectAttempts != 0) {
               count++;
               if (reconnectAttempts != -1 && count == reconnectAttempts) {
                  if (reconnectAttempts != 1) {
                     ActiveMQClientLogger.LOGGER.failedToConnectToServer(reconnectAttempts);
                  }
                  return;
               }
               if (ClientSessionFactoryImpl.logger.isTraceEnabled()) {
                  ClientSessionFactoryImpl.logger.trace("Waiting " + interval + " milliseconds before next retry. RetryInterval=" + retryInterval + " and multiplier=" + retryIntervalMultiplier);
               }
               if (waitForRetry(interval))
                  return;
               // Exponential back-off
               long newInterval = (long) (interval * retryIntervalMultiplier);
               if (newInterval > maxRetryInterval) {
                  newInterval = maxRetryInterval;
               }
               interval = newInterval;
            } else {
               logger.debug("Could not connect to any server. Didn"t have reconnection configured on the ClientSessionFactory");
               return;
            }
         }
      }
   }
   public boolean waitForRetry(long interval) {
      try {
         if (clientProtocolManager.waitOnLatch(interval)) {
            return true;
         }
      } catch (InterruptedException ignore) {
         throw new ActiveMQInterruptedException(createTrace);
      }
      return false;
   }
   //......
}   
- ClientSessionFactoryImpl的connect方法主要是执行getConnectionWithRetry;而getConnectionWithRetry方法以clientProtocolManager.isAlive()条件进行while循环执行getConnection(),如果为null且reconnectAttempts不为0则进行重试,递增count,当reconnectAttempts不为-1且reconnectAttempts等于count时跳出循环,重试的时候通过waitForRetry(interval)进行等待若返回true则提前return,否则更新interval进行下一轮循环;waitForRetry则通过clientProtocolManager.waitOnLatch(interval)进行等待
ActiveMQClientProtocolManager
activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQClientProtocolManager.java
public class ActiveMQClientProtocolManager implements ClientProtocolManager {   //......
   private final CountDownLatch waitLatch = new CountDownLatch(1);
   //......
   public boolean waitOnLatch(long milliseconds) throws InterruptedException {
      return waitLatch.await(milliseconds, TimeUnit.MILLISECONDS);
   }
   public void stop() {
      alive = false;
      synchronized (inCreateSessionGuard) {
         if (inCreateSessionLatch != null)
            inCreateSessionLatch.countDown();
      }
      Channel channel1 = getChannel1();
      if (channel1 != null) {
         channel1.returnBlocking();
      }
      waitLatch.countDown();
   }
   //......
}
- ActiveMQClientProtocolManager有个名为waitLatch的CountDownLatch,waitOnLatch方法通过waitLatch.await(milliseconds, TimeUnit.MILLISECONDS)进行等待,而stop方法则执行waitLatch.countDown()
小结
ClientSessionFactoryImpl的connect方法主要是执行getConnectionWithRetry;而getConnectionWithRetry方法以clientProtocolManager.isAlive()条件进行while循环执行getConnection(),如果为null且reconnectAttempts不为0则进行重试,递增count,当reconnectAttempts不为-1且reconnectAttempts等于count时跳出循环,重试的时候通过waitForRetry(interval)进行等待若返回true则提前return,否则更新interval进行下一轮循环;waitForRetry则通过clientProtocolManager.waitOnLatch(interval)进行等待
doc
- ClientSessionFactoryImpl
以上是 聊聊artemis的reconnectAttempts 的全部内容, 来源链接: utcz.com/z/513412.html






