【SpringCloudEureka源码】EurekaServer处理服务注册过程

编程

 

Spring Cloud Eureka Server自动配置及初始化

@EnableEurekaServer

/**
 * Annotation placed on a type to switch on the Eureka server configuration. It does so
 * by importing {@link EurekaServerMarkerConfiguration}.
 */
@EnableDiscoveryClient
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(EurekaServerMarkerConfiguration.class)
public @interface EnableEurekaServer {

}

 

导入EurekaServerMarkerConfiguration: 激活EurekaServerAutoConfiguration

/**
 * Responsible for adding in a marker bean to activate
 * {@link EurekaServerAutoConfiguration}.
 *
 * @author Biju Kunjummen
 */
@Configuration(proxyBeanMethods = false)
public class EurekaServerMarkerConfiguration {

   /**
    * Registers the marker bean.
    *
    * @return a new {@link Marker} instance
    */
   @Bean
   public Marker eurekaServerMarkerBean() {
      return new Marker();
   }
   // Empty marker type: it has no members and serves only as a bean whose presence can be checked.
   class Marker {
   }
}

EurekaServerAutoConfiguration - 注册服务自动配置

@Configuration(proxyBeanMethods = false)
@Import(EurekaServerInitializerConfiguration.class)
@ConditionalOnBean(EurekaServerMarkerConfiguration.Marker.class)
@EnableConfigurationProperties({ EurekaDashboardProperties.class,
      InstanceRegistryProperties.class })
@PropertySource("classpath:/eureka/server.properties")
public class EurekaServerAutoConfiguration implements WebMvcConfigurer

@Import(EurekaServerInitializerConfiguration.class)

// Configuration properties bound under PREFIX ("eureka.instance.registry").
@ConfigurationProperties(PREFIX)
public class InstanceRegistryProperties {
    。。。
    /**
     * Prefix for Eureka instance registry properties.
     */
    public static final String PREFIX = "eureka.instance.registry";
    /*
     * Default number of expected client, defaults to 1. Setting
     * expectedNumberOfClientsSendingRenews to non-zero to ensure that even an isolated
     * server can adjust its eviction policy to the number of registrations (when it's
     * zero, even a successful registration won't reset the rate threshold in
     * InstanceRegistry.register()).
     */
    @Value("${eureka.server.expectedNumberOfRenewsPerMin:1}") // for backwards
    // compatibility
    private int expectedNumberOfClientsSendingRenews = 1;

    /**
     * Value used in determining when leases are cancelled, default to 1 for standalone.
     * Should be set to 0 for peer replicated eurekas
     */
    @Value("${eureka.server.defaultOpenForTrafficCount:1}") // for backwards compatibility
    private int defaultOpenForTrafficCount = 1;
    。。
}

EurekaController# dashboardPath = /

// Dashboard controller. Its request mapping is taken from eureka.dashboard.path and
// falls back to "/" when that property is not set.
@Controller
@RequestMapping("${eureka.dashboard.path:/}")
public class EurekaController {
    。。。
    // Populated from eureka.dashboard.path with "/" as the fallback; the "" initializer
    // is only the value the field holds before injection.
    @Value("${eureka.dashboard.path:/}")
    private String dashboardPath = "";
    。。。
}

EurekaServerAutoConfiguration#peerAwareInstanceRegistry

@Bean

public PeerAwareInstanceRegistry peerAwareInstanceRegistry(

       ServerCodecs serverCodecs) {

    this.eurekaClient.getApplications(); // force initialization 

    // 强制初始化eurekaClient,在之前看RefreshScopebug时,也使用到了这种方式强制创建eurekaClient

    // 创建InstanceRegistry(是spring cloud的实现)

    // 继承了PeerAwareInstanceRegistryImplPeerAwareInstanceRegistry接口的实现类

    returnnew InstanceRegistry(

            this.eurekaServerConfig, this.eurekaClientConfig,

           serverCodecs, this.eurekaClient,

           this.instanceRegistryProperties.getExpectedNumberOfRenewsPerMin(),

           this.instanceRegistryProperties.getDefaultOpenForTrafficCount());

}

EurekaServerAutoConfiguration#eurekaServerContext

    @Bean

    @ConditionalOnMissingBean

    public EurekaServerContext eurekaServerContext(ServerCodecs serverCodecs,

           PeerAwareInstanceRegistry registry, PeerEurekaNodes peerEurekaNodes) {

       return new DefaultEurekaServerContext(this.eurekaServerConfig, serverCodecs,

              registry, peerEurekaNodes, this.applicationInfoManager);

    }

DefaultEurekaServerContext# DefaultEurekaServerContext

    @Inject

    public DefaultEurekaServerContext(EurekaServerConfig serverConfig, ServerCodecs serverCodecs, PeerAwareInstanceRegistry registry, PeerEurekaNodes peerEurekaNodes, ApplicationInfoManager applicationInfoManager) {

        this.serverConfig = serverConfig;

        this.serverCodecs = serverCodecs;

        this.registry = registry;

        this.peerEurekaNodes = peerEurekaNodes;

        this.applicationInfoManager = applicationInfoManager;

    }

DefaultEurekaServerContext# initialize

    @PostConstruct

    public void initialize() {

        logger.info("Initializing ...");

   // 会启动更新PeerNode列表的定时线程

        this.peerEurekaNodes.start();

        try {

     // 启动numberOfReplicationsLastMin定时线程、initializedResponseCache()scheduleRenewalThresholdUpdateTask()initRemoteRegionRegistry(),还有添加JMX监控

            this.registry.init(this.peerEurekaNodes);

        } catch (Exception var2) {

            throw new RuntimeException(var2);

        }

        logger.info("Initialized");

}

/**
 * Initializes the registry with the given peer nodes: starts the replication counter,
 * stores the peers, initializes the response cache, schedules the renewal-threshold
 * update task, initializes the remote-region registry and registers a JMX monitor.
 *
 * @param peerEurekaNodes the peer nodes this registry should use
 * @throws Exception declared by the signature; monitor registration failures are only logged
 */
@Override
    public void init(PeerEurekaNodes peerEurekaNodes) throws Exception {
        this.numberOfReplicationsLastMin.start();
        this.peerEurekaNodes = peerEurekaNodes;
        initializedResponseCache();
        // Schedule the task that periodically updates the renewal threshold. The
        // threshold is used to determine whether renewals have dropped sharply because
        // of a network partition, and to guard against expiring too many instances at
        // one time.
        scheduleRenewalThresholdUpdateTask();
        // Synchronization step (per the original note).
        initRemoteRegionRegistry();
        try {
            Monitors.registerObject(this);
        } catch (Throwable e) {
            // Best effort: a failed JMX registration does not abort initialization.
            logger.warn("Cannot register the JMX monitor for the InstanceRegistry :", e);
        }
    }

// Eureka Server启动引导:在Spring容器基本refresh()完毕时,由EurekaServerInitializerConfiguration#run()方法真正调用eurekaServerBootstrap.contextInitialized()完成初始化,其中会执行initEurekaEnvironment()和initEurekaServerContext();这是Eureka Server启动分析的重点

   @Bean

   public EurekaServerBootstrap eurekaServerBootstrap(PeerAwareInstanceRegistry registry,

          EurekaServerContext serverContext) {

       return new EurekaServerBootstrap(this.applicationInfoManager,

              this.eurekaClientConfig, this.eurekaServerConfig, registry,

              serverContext);

   }

EurekaServerAutoConfiguration#jerseyFilterRegistration

   /**

    * Register the Jersey filter.

所有/eureka的请求都需要经过Jersery Filter,其处理类是com.sun.jersey.spi.container.servlet.ServletContainer,其既是Filter,也是Servlet,包含Jersey的处理逻辑。在构造时已经将com.netflix.discovery com.netflix.eureka下的类作为处理请求的资源导入,如处理单个应用请求的com.netflix.eureka.resources.ApplicationResource

    * @param eurekaJerseyApp an {@link Application} for the filter to be registered

    * @return a jersey {@link FilterRegistrationBean}

    */

   @Bean

   public FilterRegistrationBean<?> jerseyFilterRegistration(

          javax.ws.rs.core.Application eurekaJerseyApp) {

       FilterRegistrationBean<Filter> bean = new FilterRegistrationBean<Filter>();

       bean.setFilter(new ServletContainer(eurekaJerseyApp));

       bean.setOrder(Ordered.LOWEST_PRECEDENCE);

       bean.setUrlPatterns(

              Collections.singletonList(EurekaConstants.DEFAULT_PREFIX + "/*"));

 

       return bean;

   }

Eureka所有的动作都在EurekaServerContext

/**
 * Lifecycle hooks and accessors for the components held by a Eureka server context.
 */
public interface EurekaServerContext {
    /** Initializes the context. */
    void initialize() throws Exception;
    /** Shuts the context down. */
    void shutdown() throws Exception;
    /** @return the server configuration */
    EurekaServerConfig getServerConfig();
    /** @return the peer node list */
    PeerEurekaNodes getPeerEurekaNodes();
    /** @return the server codecs */
    ServerCodecs getServerCodecs();
    /** @return the peer-aware instance registry */
    PeerAwareInstanceRegistry getRegistry();
    /** @return the application info manager */
    ApplicationInfoManager getApplicationInfoManager();
}

DefaultEurekaServerContext# initialize

/**
 * Default {@link EurekaServerContext}: holds the injected server components and, on
 * initialization, starts the peer nodes and initializes the registry.
 */
@Singleton
public class DefaultEurekaServerContext implements EurekaServerContext {
    private static final Logger logger = LoggerFactory.getLogger(DefaultEurekaServerContext.class);
    private final EurekaServerConfig serverConfig;
    private final ServerCodecs serverCodecs;
    private final PeerAwareInstanceRegistry registry;
    private final PeerEurekaNodes peerEurekaNodes;
    private final ApplicationInfoManager applicationInfoManager;
    /**
     * Stores the injected collaborators; no other work is done here.
     *
     * @param serverConfig server configuration
     * @param serverCodecs server codecs
     * @param registry peer-aware instance registry
     * @param peerEurekaNodes peer node list
     * @param applicationInfoManager application info manager
     */
    @Inject
    public DefaultEurekaServerContext(EurekaServerConfig serverConfig,
                               ServerCodecs serverCodecs,
                               PeerAwareInstanceRegistry registry,
                               PeerEurekaNodes peerEurekaNodes,
                               ApplicationInfoManager applicationInfoManager) {
        this.serverConfig = serverConfig;
        this.serverCodecs = serverCodecs;
        this.registry = registry;
        this.peerEurekaNodes = peerEurekaNodes;
        this.applicationInfoManager = applicationInfoManager;
    }
   /**
    * Starts the peer nodes and then initializes the registry with them.
    *
    * @throws RuntimeException wrapping any exception thrown by the registry initialization
    */
   @PostConstruct
    @Override
    public void initialize() {
        logger.info("Initializing ...");
        // Peers are started before the registry is initialized with them.
        peerEurekaNodes.start();
        try {
            registry.init(peerEurekaNodes);
        } catch (Exception e) {
            // Rethrown unchecked because initialize() here declares no checked exception.
            throw new RuntimeException(e);
        }
        logger.info("Initialized");
    }
   。。。
}

 

PeerEurekaNodes#start

public void start() {

// 后台运行的单线程定时任务执行器,定时线程名:Eureka-PeerNodesUpdater

        taskExecutor = Executors.newSingleThreadScheduledExecutor(

                new ThreadFactory() {

                    @Override

                    public Thread newThread(Runnable r) {

                        Thread thread = new Thread(r, "Eureka-PeerNodesUpdater");

                        thread.setDaemon(true);

                        return thread;

                    }

                }

        );

        try {

       // 解析Eureka Server URL,并更新PeerEurekaNodes列表

            updatePeerEurekaNodes(resolvePeerUrls());

          // 启动定时执行任务peersUpdateTask(定时默认10min,由peerEurekaNodesUpdateIntervalMs配置)

            Runnable peersUpdateTask = new Runnable() {

                @Override

                public void run() {

                    try {

                        updatePeerEurekaNodes(resolvePeerUrls());

                    } catch (Throwable e) {

                        logger.error("Cannot update the replica Nodes", e);

                    }

 

                }

            };

            taskExecutor.scheduleWithFixedDelay(

                    peersUpdateTask,

                    serverConfig.getPeerEurekaNodesUpdateIntervalMs(),

                    serverConfig.getPeerEurekaNodesUpdateIntervalMs(),

                    TimeUnit.MILLISECONDS

            );

        } catch (Exception e) {

            throw new IllegalStateException(e);

        }

       // 打印对等体节点(应该没有当前节点自己)

        for (PeerEurekaNode node : peerEurekaNodes) {

            logger.info("Replica node URL:  {}", node.getServiceUrl());

        }

    }

 

 

PeerEurekaNodes#resolvePeerUrls

   /**

     * Resolve peer URLs.

     * 解析配置的对等体URL

     * @return peer URLs with node"s own URL filtered out

     */

    protected List<String> resolvePeerUrls() {

   // 当前Eureka Server自己的InstanceInfo信息

        InstanceInfo myInfo = applicationInfoManager.getInfo();

       // 当前Eureka Server所在的zone,默认是 defaultZone

        String zone = InstanceInfo.getZone(clientConfig.getAvailabilityZones(clientConfig.getRegion()), myInfo);

       // 获取配置的service-url

        List<String> replicaUrls = EndpointUtils

                .getDiscoveryServiceUrls(clientConfig, zone, new EndpointUtils.InstanceInfoBasedUrlRandomizer(myInfo));

// 遍历service-url,排除自己

        int idx = 0;

        while (idx < replicaUrls.size()) {

            if (isThisMyUrl(replicaUrls.get(idx))) {

                replicaUrls.remove(idx);

            } else {

                idx++;

            }

        }

        return replicaUrls;

    }

 

 

PeerEurekaNodes#updatePeerEurekaNodes 更新PeerEurekaNodes列表

 

// PeerEurekaNodes#updatePeerEurekaNodes()

// newPeerUrls为本次要更新的Eureka对等体URL列表

protected void updatePeerEurekaNodes(List<String> newPeerUrls) {

    if (newPeerUrls.isEmpty()) {

        logger.warn("The replica size seems to be empty. Check the route 53 DNS Registry");

        return;

    }

 

    // 计算peerEurekaNodeUrls - newPeerUrls 的差集,就是多余可shutdown节点

    Set<String> toShutdown = new HashSet<>(peerEurekaNodeUrls);

    toShutdown.removeAll(newPeerUrls);

   

    // 计算newPeerUrls - peerEurekaNodeUrls 的差集,就是需要新增节点

    Set<String> toAdd = new HashSet<>(newPeerUrls);

    toAdd.removeAll(peerEurekaNodeUrls);

 

    if (toShutdown.isEmpty() && toAdd.isEmpty()) { // No change 没有变更

        return;

    }

 

    // Remove peers no long available

    List<PeerEurekaNode> newNodeList = new ArrayList<>(peerEurekaNodes);

 

    // shutDown多余节点

    if (!toShutdown.isEmpty()) {

        logger.info("Removing no longer available peer nodes {}", toShutdown);

        int i = 0;

        while (i < newNodeList.size()) {

            PeerEurekaNode eurekaNode = newNodeList.get(i);

            if (toShutdown.contains(eurekaNode.getServiceUrl())) {

                newNodeList.remove(i);

                eurekaNode.shutDown();

            } else {

                i++;

            }

        }

    }

 

    // Add new peers

    // 添加新的peerEurekaNode - createPeerEurekaNode()

    if (!toAdd.isEmpty()) {

        logger.info("Adding new peer nodes {}", toAdd);

        for (String peerUrl : toAdd) {

            newNodeList.add(createPeerEurekaNode(peerUrl));

        }

    }

 

    this.peerEurekaNodes = newNodeList;

    this.peerEurekaNodeUrls = new HashSet<>(newPeerUrls);

}

 

未完待续

以上是 【SpringCloudEureka源码】EurekaServer处理服务注册过程 的全部内容, 来源链接: utcz.com/z/518303.html

回到顶部