【SpringCloudEureka源码】EurekaServer处理服务注册过程
Spring Cloud Eureka Server自动配置及初始化
@EnableEurekaServer
/**
 * Marker annotation that switches on the Eureka server for the annotated type.
 *
 * <p>It is itself annotated with {@link EnableDiscoveryClient} and imports
 * {@link EurekaServerMarkerConfiguration}, whose marker bean activates the
 * Eureka server auto-configuration.
 */
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@EnableDiscoveryClient
@Import(EurekaServerMarkerConfiguration.class)
public @interface EnableEurekaServer {

}
导入 EurekaServerMarkerConfiguration:用于激活 EurekaServerAutoConfiguration
/**
 * Contributes a single marker bean whose presence activates
 * {@link EurekaServerAutoConfiguration}.
 *
 * @author Biju Kunjummen
 */
@Configuration(proxyBeanMethods = false)
public class EurekaServerMarkerConfiguration {

    /**
     * Empty marker type; it carries no state and exists only so that its
     * presence in the context can be tested for.
     */
    class Marker {

    }

    /**
     * Registers the marker bean.
     *
     * @return a new {@link Marker} instance
     */
    @Bean
    public Marker eurekaServerMarkerBean() {
        final Marker marker = new Marker();
        return marker;
    }

}
EurekaServerAutoConfiguration - 注册服务自动配置类
/**
 * Auto-configuration class for the Eureka server (only the class header is shown here).
 *
 * <p>As declared by its annotations, it imports
 * {@link EurekaServerInitializerConfiguration}, applies only when a
 * {@link EurekaServerMarkerConfiguration.Marker} bean is present, enables the
 * {@link EurekaDashboardProperties} and {@link InstanceRegistryProperties}
 * configuration-properties classes, and loads
 * {@code classpath:/eureka/server.properties} as a property source.
 */
@Configuration(proxyBeanMethods = false)
@Import(EurekaServerInitializerConfiguration.class)
@ConditionalOnBean(EurekaServerMarkerConfiguration.Marker.class)
@EnableConfigurationProperties({ EurekaDashboardProperties.class,
InstanceRegistryProperties.class })
@PropertySource("classpath:/eureka/server.properties")
public class EurekaServerAutoConfiguration implements WebMvcConfigurer
@Import(EurekaServerInitializerConfiguration.class)
/**
 * Configuration properties for the Eureka instance registry, bound under
 * {@link #PREFIX} ({@code eureka.instance.registry}). Parts of the class are
 * elided in this excerpt.
 */
@ConfigurationProperties(PREFIX)
public class InstanceRegistryProperties {
。。。
/**
* Prefix for Eureka instance registry properties.
*/
public static final String PREFIX = "eureka.instance.registry";
/*
* Default number of expected clients, defaults to 1. Setting
* expectedNumberOfClientsSendingRenews to non-zero ensures that even an isolated
* server can adjust its eviction policy to the number of registrations (when it's
* zero, even a successful registration won't reset the rate threshold in
* InstanceRegistry.register()).
*/
@Value("${eureka.server.expectedNumberOfRenewsPerMin:1}") // legacy property key kept for backwards
// compatibility
private int expectedNumberOfClientsSendingRenews = 1;
/**
* Value used in determining when leases are cancelled, defaults to 1 for standalone.
* Should be set to 0 for peer-replicated Eureka servers.
*/
@Value("${eureka.server.defaultOpenForTrafficCount:1}") // legacy property key kept for backwards compatibility
private int defaultOpenForTrafficCount = 1;
。。
}
EurekaController# dashboardPath = /
/**
 * Web controller for the Eureka dashboard (body elided in this excerpt).
 * Its request mapping is taken from the {@code eureka.dashboard.path}
 * property and falls back to {@code /} when the property is not set.
 */
@Controller
@RequestMapping("${eureka.dashboard.path:/}")
public class EurekaController {
。。。
// Dashboard path resolved from the same property as the class-level mapping
// (placeholder default "/"); the "" initializer is presumably overwritten on injection - confirm.
@Value("${eureka.dashboard.path:/}")
private String dashboardPath = "";
。。。
}
EurekaServerAutoConfiguration#peerAwareInstanceRegistry
@Bean
public PeerAwareInstanceRegistry peerAwareInstanceRegistry(
ServerCodecs serverCodecs) {
this.eurekaClient.getApplications(); // force initialization
// 强制初始化eurekaClient,在之前看RefreshScope的bug时,也使用到了这种方式强制创建eurekaClient
// 创建InstanceRegistry(是spring cloud的实现)
// 继承了PeerAwareInstanceRegistryImpl,PeerAwareInstanceRegistry接口的实现类
returnnew InstanceRegistry(
this.eurekaServerConfig, this.eurekaClientConfig,
serverCodecs, this.eurekaClient,
this.instanceRegistryProperties.getExpectedNumberOfRenewsPerMin(),
this.instanceRegistryProperties.getDefaultOpenForTrafficCount());
}
EurekaServerAutoConfiguration#eurekaServerContext
/**
 * Provides the {@link EurekaServerContext} bean unless one is already defined.
 *
 * @param serverCodecs codecs passed to the context
 * @param registry the peer-aware instance registry
 * @param peerEurekaNodes the peer node holder
 * @return a {@link DefaultEurekaServerContext} wired with the given collaborators
 */
@Bean
@ConditionalOnMissingBean
public EurekaServerContext eurekaServerContext(ServerCodecs serverCodecs,
        PeerAwareInstanceRegistry registry, PeerEurekaNodes peerEurekaNodes) {
    final EurekaServerContext context = new DefaultEurekaServerContext(
            this.eurekaServerConfig,
            serverCodecs,
            registry,
            peerEurekaNodes,
            this.applicationInfoManager);
    return context;
}
DefaultEurekaServerContext# DefaultEurekaServerContext
/**
 * Stores the collaborators of the server context; no other work is done here.
 *
 * @param serverConfig server configuration
 * @param serverCodecs server codecs
 * @param registry peer-aware instance registry
 * @param peerEurekaNodes peer node holder
 * @param applicationInfoManager application info manager
 */
@Inject
public DefaultEurekaServerContext(
        EurekaServerConfig serverConfig,
        ServerCodecs serverCodecs,
        PeerAwareInstanceRegistry registry,
        PeerEurekaNodes peerEurekaNodes,
        ApplicationInfoManager applicationInfoManager) {
    this.applicationInfoManager = applicationInfoManager;
    this.peerEurekaNodes = peerEurekaNodes;
    this.registry = registry;
    this.serverCodecs = serverCodecs;
    this.serverConfig = serverConfig;
}
DefaultEurekaServerContext# initialize
/**
 * Post-construction hook: starts the peer nodes and then initializes the
 * registry with them, logging before and after.
 *
 * @throws RuntimeException wrapping any exception raised by the registry initialization
 */
@PostConstruct
public void initialize() {
    logger.info("Initializing ...");
    // Starts the scheduled thread that refreshes the peer node list.
    peerEurekaNodes.start();
    try {
        // Registry init starts numberOfReplicationsLastMin, then calls
        // initializedResponseCache(), scheduleRenewalThresholdUpdateTask() and
        // initRemoteRegionRegistry(), and registers the JMX monitor.
        registry.init(peerEurekaNodes);
    } catch (Exception initFailure) {
        throw new RuntimeException(initFailure);
    }
    logger.info("Initialized");
}
/**
 * Initializes this registry with the given peer nodes.
 *
 * <p>In order: starts {@code numberOfReplicationsLastMin}, stores the peer
 * nodes, initializes the response cache, schedules the renewal-threshold
 * update task, initializes the remote-region registry, and finally tries to
 * register this object with the monitors (a failure there is only logged).
 *
 * @param peerEurekaNodes the peer nodes to remember on this registry
 * @throws Exception declared by the signature; raised by the initialization steps, if any of them fails
 */
@Override
public void init(PeerEurekaNodes peerEurekaNodes) throws Exception {
this.numberOfReplicationsLastMin.start();
this.peerEurekaNodes = peerEurekaNodes;
initializedResponseCache();
// Schedule the task that periodically updates the renewal threshold.
// Per the original note, the threshold is used to decide whether renewals
// have dropped sharply because of a network partition, and to guard against
// expiring too many instances at once.
scheduleRenewalThresholdUpdateTask();
// Original note says "sync"; presumably sets up the remote-region registries - confirm in initRemoteRegionRegistry().
initRemoteRegionRegistry();
try {
// Monitor registration is best-effort: any Throwable is logged as a warning and not rethrown.
Monitors.registerObject(this);
} catch (Throwable e) {
logger.warn("Cannot register the JMX monitor for the InstanceRegistry :", e);
}
}
Eureka Server 启动引导:会在 Spring 容器基本 refresh() 完毕时,由 EurekaServerInitializerConfiguration#run() 方法真正调用 eurekaServerBootstrap.contextInitialized() 完成初始化,其中会执行 initEurekaEnvironment()、initEurekaServerContext()
Eureka Server启动分析重点
/**
 * Provides the {@link EurekaServerBootstrap} bean.
 *
 * @param registry the peer-aware instance registry
 * @param serverContext the Eureka server context
 * @return a bootstrap wired with this configuration's application info manager,
 *         client config and server config, plus the given registry and context
 */
@Bean
public EurekaServerBootstrap eurekaServerBootstrap(PeerAwareInstanceRegistry registry,
        EurekaServerContext serverContext) {
    final EurekaServerBootstrap bootstrap = new EurekaServerBootstrap(
            this.applicationInfoManager,
            this.eurekaClientConfig,
            this.eurekaServerConfig,
            registry,
            serverContext);
    return bootstrap;
}
EurekaServerAutoConfiguration#jerseyFilterRegistration
/**
 * Registers the Jersey filter.
 *
 * <p>Per the original note: every request under the Eureka prefix passes
 * through this filter. The handler,
 * {@code com.sun.jersey.spi.container.servlet.ServletContainer}, is both a
 * Filter and a Servlet and contains the Jersey processing logic. The classes
 * under the {@code com.netflix.discovery} and {@code com.netflix.eureka}
 * packages are supplied to it as request-handling resources at construction
 * time, e.g. {@code com.netflix.eureka.resources.ApplicationResource} for
 * single-application requests.
 *
 * @param eurekaJerseyApp an {@link Application} for the filter to be registered
 * @return a jersey {@link FilterRegistrationBean}
 */
@Bean
public FilterRegistrationBean<?> jerseyFilterRegistration(
        javax.ws.rs.core.Application eurekaJerseyApp) {
    final String urlPattern = EurekaConstants.DEFAULT_PREFIX + "/*";
    final Filter jerseyFilter = new ServletContainer(eurekaJerseyApp);

    FilterRegistrationBean<Filter> registration = new FilterRegistrationBean<>();
    registration.setFilter(jerseyFilter);
    registration.setOrder(Ordered.LOWEST_PRECEDENCE);
    registration.setUrlPatterns(Collections.singletonList(urlPattern));
    return registration;
}
Eureka所有的动作都在EurekaServerContext
/**
 * Server-side context: lifecycle hooks plus accessors for the main
 * collaborators of a Eureka server.
 */
public interface EurekaServerContext {
/**
 * Initializes the context.
 *
 * @throws Exception if initialization fails
 */
void initialize() throws Exception;
/**
 * Shuts the context down.
 *
 * @throws Exception if shutdown fails
 */
void shutdown() throws Exception;
/** @return the server configuration */
EurekaServerConfig getServerConfig();
/** @return the peer node holder */
PeerEurekaNodes getPeerEurekaNodes();
/** @return the server codecs */
ServerCodecs getServerCodecs();
/** @return the peer-aware instance registry */
PeerAwareInstanceRegistry getRegistry();
/** @return the application info manager */
ApplicationInfoManager getApplicationInfoManager();
}
DefaultEurekaServerContext# initialize
/**
 * Default {@link EurekaServerContext} implementation. It holds the server
 * collaborators in final fields and, after construction, starts the peer
 * nodes and initializes the registry. Further members are elided in this excerpt.
 */
@Singleton
public class DefaultEurekaServerContext implements EurekaServerContext {
private static final Logger logger = LoggerFactory.getLogger(DefaultEurekaServerContext.class);
private final EurekaServerConfig serverConfig;
private final ServerCodecs serverCodecs;
private final PeerAwareInstanceRegistry registry;
private final PeerEurekaNodes peerEurekaNodes;
private final ApplicationInfoManager applicationInfoManager;
/**
 * Stores the given collaborators; no other work is done here.
 *
 * @param serverConfig server configuration
 * @param serverCodecs server codecs
 * @param registry peer-aware instance registry
 * @param peerEurekaNodes peer node holder
 * @param applicationInfoManager application info manager
 */
@Inject
public DefaultEurekaServerContext(EurekaServerConfig serverConfig,
ServerCodecs serverCodecs,
PeerAwareInstanceRegistry registry,
PeerEurekaNodes peerEurekaNodes,
ApplicationInfoManager applicationInfoManager) {
this.serverConfig = serverConfig;
this.serverCodecs = serverCodecs;
this.registry = registry;
this.peerEurekaNodes = peerEurekaNodes;
this.applicationInfoManager = applicationInfoManager;
}
/**
 * Post-construction hook: starts the peer nodes, then initializes the
 * registry with them, logging before and after.
 *
 * @throws RuntimeException wrapping any exception raised by {@code registry.init}
 */
@PostConstruct
@Override
public void initialize() {
logger.info("Initializing ...");
// Peer nodes are started before the registry is initialized with them.
peerEurekaNodes.start();
try {
registry.init(peerEurekaNodes);
} catch (Exception e) {
// init declares a checked exception; rethrow unchecked, keeping the cause.
throw new RuntimeException(e);
}
logger.info("Initialized");
}
。。。
}
PeerEurekaNodes#start
/**
 * Starts peer-node tracking.
 *
 * <p>Creates a single-threaded scheduled executor backed by a daemon thread
 * named {@code Eureka-PeerNodesUpdater}, performs one immediate peer-list
 * update, schedules the same update with a fixed delay taken from
 * {@code serverConfig.getPeerEurekaNodesUpdateIntervalMs()}, and finally
 * logs the service URL of every current peer node.
 *
 * @throws IllegalStateException wrapping any exception raised by the initial
 *         update or by scheduling the periodic task
 */
public void start() {
    final ThreadFactory updaterThreadFactory = new ThreadFactory() {
        @Override
        public Thread newThread(Runnable task) {
            Thread updater = new Thread(task, "Eureka-PeerNodesUpdater");
            updater.setDaemon(true);
            return updater;
        }
    };
    taskExecutor = Executors.newSingleThreadScheduledExecutor(updaterThreadFactory);

    try {
        // Resolve the peer URLs once up front and refresh the peer node list.
        updatePeerEurekaNodes(resolvePeerUrls());

        // Periodic refresh; failures are logged so later runs still happen.
        taskExecutor.scheduleWithFixedDelay(
                new Runnable() {
                    @Override
                    public void run() {
                        try {
                            updatePeerEurekaNodes(resolvePeerUrls());
                        } catch (Throwable failure) {
                            logger.error("Cannot update the replica Nodes", failure);
                        }
                    }
                },
                serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
                serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
                TimeUnit.MILLISECONDS);
    } catch (Exception startFailure) {
        throw new IllegalStateException(startFailure);
    }

    // Log the known replicas (per the original note, this node itself should not be among them).
    for (PeerEurekaNode peer : peerEurekaNodes) {
        logger.info("Replica node URL: {}", peer.getServiceUrl());
    }
}
PeerEurekaNodes#resolvePeerUrls
/**
 * Resolves the configured peer URLs.
 *
 * @return peer URLs with this node's own URL filtered out
 */
protected List<String> resolvePeerUrls() {
    // InstanceInfo describing this Eureka server itself.
    InstanceInfo self = applicationInfoManager.getInfo();
    // Zone of this server, derived from the availability zones of the configured region.
    String[] availabilityZones = clientConfig.getAvailabilityZones(clientConfig.getRegion());
    String zone = InstanceInfo.getZone(availabilityZones, self);
    // Discovery service URLs for that zone.
    EndpointUtils.InstanceInfoBasedUrlRandomizer randomizer =
            new EndpointUtils.InstanceInfoBasedUrlRandomizer(self);
    List<String> replicaUrls = EndpointUtils.getDiscoveryServiceUrls(clientConfig, zone, randomizer);

    // Drop every URL that points back at this node; only advance when nothing was removed.
    for (int pos = 0; pos < replicaUrls.size(); ) {
        if (!isThisMyUrl(replicaUrls.get(pos))) {
            pos++;
            continue;
        }
        replicaUrls.remove(pos);
    }
    return replicaUrls;
}
PeerEurekaNodes#updatePeerEurekaNodes 更新PeerEurekaNodes列表
/**
 * Reconciles the current peer node list with a freshly resolved URL list.
 *
 * <p>An empty list is ignored with a warning. Otherwise peers whose URL is no
 * longer listed are removed and shut down, a node is created for every newly
 * listed URL, and both the node list and the URL set are replaced. Nothing is
 * touched when the two URL sets are identical.
 *
 * @param newPeerUrls the peer URLs that should be in effect after this call
 */
protected void updatePeerEurekaNodes(List<String> newPeerUrls) {
    if (newPeerUrls.isEmpty()) {
        logger.warn("The replica size seems to be empty. Check the route 53 DNS Registry");
        return;
    }

    // current - new: peers that are no longer listed and must be shut down.
    Set<String> obsoleteUrls = new HashSet<>(peerEurekaNodeUrls);
    obsoleteUrls.removeAll(newPeerUrls);
    // new - current: peers that have to be created.
    Set<String> addedUrls = new HashSet<>(newPeerUrls);
    addedUrls.removeAll(peerEurekaNodeUrls);

    final boolean unchanged = obsoleteUrls.isEmpty() && addedUrls.isEmpty();
    if (unchanged) {
        return;
    }

    // Work on a copy; the field is swapped only once the new list is complete.
    List<PeerEurekaNode> updatedNodes = new ArrayList<>(peerEurekaNodes);

    if (!obsoleteUrls.isEmpty()) {
        logger.info("Removing no longer available peer nodes {}", obsoleteUrls);
        for (int pos = 0; pos < updatedNodes.size(); ) {
            PeerEurekaNode candidate = updatedNodes.get(pos);
            if (!obsoleteUrls.contains(candidate.getServiceUrl())) {
                pos++;
                continue;
            }
            updatedNodes.remove(pos);
            candidate.shutDown();
        }
    }

    if (!addedUrls.isEmpty()) {
        logger.info("Adding new peer nodes {}", addedUrls);
        for (String url : addedUrls) {
            updatedNodes.add(createPeerEurekaNode(url));
        }
    }

    this.peerEurekaNodes = updatedNodes;
    this.peerEurekaNodeUrls = new HashSet<>(newPeerUrls);
}
未完待续
以上是 【SpringCloudEureka源码】EurekaServer处理服务注册过程 的全部内容, 来源链接: utcz.com/z/518303.html