nacos服务自动注册原理源码解析

编程

服务自动注册核心类:NacosDiscoveryAutoConfiguration

源码:

@Configuration

@EnableConfigurationProperties

@ConditionalOnNacosDiscoveryEnabled

@ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled", matchIfMissing = true)

@AutoConfigureAfter({ AutoServiceRegistrationConfiguration.class,

AutoServiceRegistrationAutoConfiguration.class })

public class NacosDiscoveryAutoConfiguration {

@Bean

public NacosServiceRegistry nacosServiceRegistry(

NacosDiscoveryProperties nacosDiscoveryProperties) {

return new NacosServiceRegistry(nacosDiscoveryProperties);

}

@Bean

@ConditionalOnBean(AutoServiceRegistrationProperties.class)

public NacosRegistration nacosRegistration(

NacosDiscoveryProperties nacosDiscoveryProperties,

ApplicationContext context) {

return new NacosRegistration(nacosDiscoveryProperties, context);

}

@Bean

@ConditionalOnBean(AutoServiceRegistrationProperties.class)

public NacosAutoServiceRegistration nacosAutoServiceRegistration(

NacosServiceRegistry registry,

AutoServiceRegistrationProperties autoServiceRegistrationProperties,

NacosRegistration registration) {

return new NacosAutoServiceRegistration(registry,

autoServiceRegistrationProperties, registration);

}

}

解释:

  • NacosServiceRegistry nacos服务注册实现类
  • NacosRegistration: nacos服务注册类
  • NacosAutoServiceRegistration: nacos服务自动注册类


NacosServiceRegistry服务注册类

@Bean

public NacosServiceRegistry nacosServiceRegistry(

NacosDiscoveryProperties nacosDiscoveryProperties) {

return new NacosServiceRegistry(nacosDiscoveryProperties);

}

说明:NacosServiceRegistry 自动依赖 NacosDiscoveryProperties,源码表明NacosDiscoveryProperties由spring.cloud.nacos.discovery

前缀自动封装的JavaBean,包含Nacos服务注册基本信息,例如:

spring.cloud.nacos.discovery.server-addr=${nacos.server.url}

spring.cloud.nacos.discovery.namespace=${spring.profiles.active}

spring.cloud.nacos.discovery.group=DEFAULT_GROUP

spring.cloud.nacos.discovery.username=${nacos.auth.username}

spring.cloud.nacos.discovery.password=${nacos.auth.password}

spring.cloud.nacos.discovery.metadata.version=1.1.0

spring.cloud.nacos.discovery.metadata.description=1.1.0版本

NacosDiscoveryProperties源码声明:

@ConfigurationProperties("spring.cloud.nacos.discovery")

public class NacosDiscoveryProperties


NacosRegistration nacos 服务注册类

作用:将NacosDiscoveryProperties配置转换为Spring Cloud 微服务规范实例,使用ServiceInstance接口标识

public class NacosRegistration implements Registration, ServiceInstance {

public static final String MANAGEMENT_PORT = "management.port";

public static final String MANAGEMENT_CONTEXT_PATH = "management.context-path";

public static final String MANAGEMENT_ADDRESS = "management.address";

public static final String MANAGEMENT_ENDPOINT_BASE_PATH = "management.endpoints.web.base-path";

private NacosDiscoveryProperties nacosDiscoveryProperties;

private ApplicationContext context;

public NacosRegistration(NacosDiscoveryProperties nacosDiscoveryProperties,

ApplicationContext context) {

this.nacosDiscoveryProperties = nacosDiscoveryProperties;

this.context = context;

}

/** springcloud 微服务原生数据追加到nacosDiscoveryProperties,便于配置数据统一 **/

@PostConstruct

public void init() {

Map<String, String> metadata = nacosDiscoveryProperties.getMetadata();

Environment env = context.getEnvironment();

String endpointBasePath = env.getProperty(MANAGEMENT_ENDPOINT_BASE_PATH);

if (!StringUtils.isEmpty(endpointBasePath)) {

metadata.put(MANAGEMENT_ENDPOINT_BASE_PATH, endpointBasePath);

}

Integer managementPort = ManagementServerPortUtils.getPort(context);

if (null != managementPort) {

metadata.put(MANAGEMENT_PORT, managementPort.toString());

String contextPath = env

.getProperty("management.server.servlet.context-path");

String address = env.getProperty("management.server.address");

if (!StringUtils.isEmpty(contextPath)) {

metadata.put(MANAGEMENT_CONTEXT_PATH, contextPath);

}

if (!StringUtils.isEmpty(address)) {

metadata.put(MANAGEMENT_ADDRESS, address);

}

}

if (null != nacosDiscoveryProperties.getHeartBeatInterval()) {

metadata.put(PreservedMetadataKeys.HEART_BEAT_INTERVAL,

nacosDiscoveryProperties.getHeartBeatInterval().toString());

}

if (null != nacosDiscoveryProperties.getHeartBeatTimeout()) {

metadata.put(PreservedMetadataKeys.HEART_BEAT_TIMEOUT,

nacosDiscoveryProperties.getHeartBeatTimeout().toString());

}

if (null != nacosDiscoveryProperties.getIpDeleteTimeout()) {

metadata.put(PreservedMetadataKeys.IP_DELETE_TIMEOUT,

nacosDiscoveryProperties.getIpDeleteTimeout().toString());

}

}

@Override

public String getServiceId() {

return nacosDiscoveryProperties.getService();

}

@Override

public String getHost() {

return nacosDiscoveryProperties.getIp();

}

@Override

public int getPort() {

return nacosDiscoveryProperties.getPort();

}

public void setPort(int port) {

this.nacosDiscoveryProperties.setPort(port);

}

@Override

public boolean isSecure() {

return nacosDiscoveryProperties.isSecure();

}

@Override

public URI getUri() {

return DefaultServiceInstance.getUri(this);

}

@Override

public Map<String, String> getMetadata() {

return nacosDiscoveryProperties.getMetadata();

}

public boolean isRegisterEnabled() {

return nacosDiscoveryProperties.isRegisterEnabled();

}

public String getCluster() {

return nacosDiscoveryProperties.getClusterName();

}

public float getRegisterWeight() {

return nacosDiscoveryProperties.getWeight();

}

public NacosDiscoveryProperties getNacosDiscoveryProperties() {

return nacosDiscoveryProperties;

}

public NamingService getNacosNamingService() {

return nacosDiscoveryProperties.namingServiceInstance();

}

@Override

public String toString() {

return "NacosRegistration{" + "nacosDiscoveryProperties="

+ nacosDiscoveryProperties + "}";

}

}

NacosAutoServiceRegistration nacos自动服务注册

作用:应用启动自动注册服务到Nacos服务注册中心

@Slf4j

public class NacosAutoServiceRegistration

extends AbstractAutoServiceRegistration<Registration> {

private final NacosRegistration registration;

public NacosAutoServiceRegistration(ServiceRegistry<Registration> serviceRegistry,

AutoServiceRegistrationProperties autoServiceRegistrationProperties,

NacosRegistration registration) {

super(serviceRegistry, autoServiceRegistrationProperties);

this.registration = registration;

}

@Deprecated

public void setPort(int port) {

getPort().set(port);

}

@Override

protected NacosRegistration getRegistration() {

if (this.registration.getPort() < 0 && this.getPort().get() > 0) {

this.registration.setPort(this.getPort().get());

}

Assert.isTrue(this.registration.getPort() > 0, "service.port has not been set");

return this.registration;

}

@Override

protected NacosRegistration getManagementRegistration() {

return null;

}

/** 服务注册 **/

@Override

protected void register() {

if (!this.registration.getNacosDiscoveryProperties().isRegisterEnabled()) {

log.debug("Registration disabled.");

return;

}

if (this.registration.getPort() < 0) {

this.registration.setPort(getPort().get());

}

super.register();

}

@Override

protected void registerManagement() {

if (!this.registration.getNacosDiscoveryProperties().isRegisterEnabled()) {

return;

}

super.registerManagement();

}

@Override

protected Object getConfiguration() {

return this.registration.getNacosDiscoveryProperties();

}

@Override

protected boolean isEnabled() {

return this.registration.getNacosDiscoveryProperties().isRegisterEnabled();

}

@Override

@SuppressWarnings("deprecation")

protected String getAppName() {

String appName = registration.getNacosDiscoveryProperties().getService();

return StringUtils.isEmpty(appName) ? super.getAppName() : appName;

}

}

注意:NacosAutoServiceRegistration 继承AbstractAutoServiceRegistration, AbstractAutoServiceRegistration实现ApplicationListener<WebServerInitializedEvent> 接口,

服务自动注册由其接口onApplicationEvent(WebServerInitializedEvent event) 实现,底层调用Register接口,因NacosAutoServiceRegistration重写register方法,具体注册逻辑源码如下:

/**

* 重写设置端口,然后调用父类register方法,register: this.serviceRegistry.register(getRegistration());,

* 根据配置依赖,serviceRegistry由NacosServiceRegistry实现

**/

@Override

protected void register() {

if (!this.registration.getNacosDiscoveryProperties().isRegisterEnabled()) {

log.debug("Registration disabled.");

return;

}

if (this.registration.getPort() < 0) {

this.registration.setPort(getPort().get());

}

super.register();

}


NacosServiceRegistry nacos实际服务注册逻辑

@Slf4j

public class NacosServiceRegistry implements ServiceRegistry<Registration> {

private final NacosDiscoveryProperties nacosDiscoveryProperties;

private final NamingService namingService;

public NacosServiceRegistry(NacosDiscoveryProperties nacosDiscoveryProperties) {

this.nacosDiscoveryProperties = nacosDiscoveryProperties;

this.namingService = nacosDiscoveryProperties.namingServiceInstance();

}

/**

* 服务注册

**/

@Override

public void register(Registration registration) {

if (StringUtils.isEmpty(registration.getServiceId())) {

log.warn("No service to register for nacos client...");

return;

}

String serviceId = registration.getServiceId();

String group = nacosDiscoveryProperties.getGroup();

Instance instance = getNacosInstanceFromRegistration(registration);

try {

namingService.registerInstance(serviceId, group, instance);

log.info("nacos registry, {} {} {}:{} register finished", group, serviceId,

instance.getIp(), instance.getPort());

}

catch (Exception e) {

log.error("nacos registry, {} register failed...{},", serviceId,

registration.toString(), e);

}

}

/**

* 解除注册:例如应用关闭等场景在正常彻底关闭前会解除服务注册,而不是等心跳检测不健康注册中心自动剔除

**/

@Override

public void deregister(Registration registration) {

log.info("De-registering from Nacos Server now...");

if (StringUtils.isEmpty(registration.getServiceId())) {

log.warn("No dom to de-register for nacos client...");

return;

}

String serviceId = registration.getServiceId();

String group = nacosDiscoveryProperties.getGroup();

try {

namingService.deregisterInstance(serviceId, group, registration.getHost(),

registration.getPort(), nacosDiscoveryProperties.getClusterName());

}

catch (Exception e) {

log.error("ERR_NACOS_DEREGISTER, de-register failed...{},",

registration.toString(), e);

}

log.info("De-registration finished.");

}

@Override

public void close() {

}

@Override

public void setStatus(Registration registration, String status) {

if (!status.equalsIgnoreCase("UP") && !status.equalsIgnoreCase("DOWN")) {

log.warn("can"t support status {},please choose UP or DOWN", status);

return;

}

String serviceId = registration.getServiceId();

Instance instance = getNacosInstanceFromRegistration(registration);

instance.setEnabled(!status.equalsIgnoreCase("DOWN"));

try {

nacosDiscoveryProperties.namingMaintainServiceInstance()

.updateInstance(serviceId, instance);

}

catch (Exception e) {

throw new RuntimeException("update nacos instance status fail", e);

}

}

@Override

public Object getStatus(Registration registration) {

String serviceName = registration.getServiceId();

try {

List<Instance> instances = nacosDiscoveryProperties.namingServiceInstance()

.getAllInstances(serviceName);

for (Instance instance : instances) {

if (instance.getIp().equalsIgnoreCase(nacosDiscoveryProperties.getIp())

&& instance.getPort() == nacosDiscoveryProperties.getPort())

return instance.isEnabled() ? "UP" : "DOWN";

}

}

catch (Exception e) {

log.error("get all instance of {} error,", serviceName, e);

}

return null;

}

private Instance getNacosInstanceFromRegistration(Registration registration) {

Instance instance = new Instance();

instance.setIp(registration.getHost());

instance.setPort(registration.getPort());

instance.setWeight(nacosDiscoveryProperties.getWeight());

instance.setClusterName(nacosDiscoveryProperties.getClusterName());

instance.setMetadata(registration.getMetadata());

return instance;namingService

}

}

分析: 实际注册逻辑:namingService.registerInstance(serviceId, group, instance);


namingService 真实服务注册( 实现: NacosNamingService)

NacosNamingService 服务注册:

@Override

public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {

// 判断服务节点是否是临时节点

// 临时服务节点: 心跳检测服务不健康后自动剔除,不显示在注册中心

// 持久服务节点: 服务节点信息状态信息存储数据库,若服务不健康,消息中心显示不健康状态,但不会剔除显示

//beatReactor 心跳反应堆,默认定时任务5s发送心跳信号

if (instance.isEphemeral()) {

BeatInfo beatInfo = new BeatInfo();

beatInfo.setServiceName(NamingUtils.getGroupedName(serviceName, groupName));

beatInfo.setIp(instance.getIp());

beatInfo.setPort(instance.getPort());

beatInfo.setCluster(instance.getClusterName());

beatInfo.setWeight(instance.getWeight());

beatInfo.setMetadata(instance.getMetadata());

beatInfo.setScheduled(false);

beatInfo.setPeriod(instance.getInstanceHeartBeatInterval());

beatReactor.addBeatInfo(NamingUtils.getGroupedName(serviceName, groupName), beatInfo);

}

// nacos 服务注册中心客户端,执行具体的服务注册请求

serverProxy.registerService(NamingUtils.getGroupedName(serviceName, groupName), groupName, instance);

}


serverProxy.registerService 底层Http服务注册(实现:NamingProxy)

//服务注册: 配置Http请求基础参数

public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {

log.info("[REGISTER-SERVICE] {} registering service {} with instance: {}",

namespaceId, serviceName, instance);

final Map<String, String> params = new HashMap<String, String>(9);

params.put(CommonParams.NAMESPACE_ID, namespaceId);

params.put(CommonParams.SERVICE_NAME, serviceName);

params.put(CommonParams.GROUP_NAME, groupName);

params.put(CommonParams.CLUSTER_NAME, instance.getClusterName());

params.put("ip", instance.getIp());

params.put("port", String.valueOf(instance.getPort()));

params.put("weight", String.valueOf(instance.getWeight()));

params.put("enable", String.valueOf(instance.isEnabled()));

params.put("healthy", String.valueOf(instance.isHealthy()));

params.put("ephemeral", String.valueOf(instance.isEphemeral()));

params.put("metadata", JSON.toJSONString(instance.getMetadata()));

reqAPI(UtilAndComs.NACOS_URL_INSTANCE, params, HttpMethod.POST);

}

public String reqAPI(String api, Map<String, String> params, String body, List<String> servers, String method) throws NacosException {

params.put(CommonParams.NAMESPACE_ID, getNamespaceId());

if (CollectionUtils.isEmpty(servers) && StringUtils.isEmpty(nacosDomain)) {

throw new NacosException(NacosException.INVALID_PARAM, "no server available");

}

NacosException exception = new NacosException();

if (servers != null && !servers.isEmpty()) {

Random random = new Random(System.currentTimeMillis());

int index = random.nextInt(servers.size());

// spring.cloud.nacos.config.server-addr 配置的nacos服务集群同步注册,保证每个nacos集群节点包含服务注册信息

for (int i = 0; i < servers.size(); i++) {

String server = servers.get(index);

try {

return callServer(api, params, body, server, method);

} catch (NacosException e) {

exception = e;

if (log.isDebugEnabled()) {

log.debug("request {} failed.", server, e);

}

}

index = (index + 1) % servers.size();

}

}

if (StringUtils.isNotBlank(nacosDomain)) {

for (int i = 0; i < UtilAndComs.REQUEST_DOMAIN_RETRY_COUNT; i++) {

try {

return callServer(api, params, body, nacosDomain, method);

} catch (NacosException e) {

exception = e;

if (log.isDebugEnabled()) {

log.debug("request {} failed.", nacosDomain, e);

}

}

}

}

log.error("request: {} failed, servers: {}, code: {}, msg: {}",

api, servers, exception.getErrCode(), exception.getErrMsg());

throw new NacosException(exception.getErrCode(), "failed to req API:/api/" + api + " after all servers(" + servers + ") tried: "

+ exception.getMessage());

}

//发送Http注册请求:/nacos/v1/ns/instance (POST请求)

public String callServer(String api, Map<String, String> params, String body, String curServer, String method)

throws NacosException {

long start = System.currentTimeMillis();

long end = 0;

injectSecurityInfo(params);

List<String> headers = builderHeaders();

String url;

if (curServer.startsWith(UtilAndComs.HTTPS) || curServer.startsWith(UtilAndComs.HTTP)) {

url = curServer + api;

} else {

if (!curServer.contains(UtilAndComs.SERVER_ADDR_IP_SPLITER)) {

curServer = curServer + UtilAndComs.SERVER_ADDR_IP_SPLITER + serverPort;

}

url = HttpClient.getPrefix() + curServer + api;

}

HttpClient.HttpResult result = HttpClient.request(url, headers, params, body, UtilAndComs.ENCODING, method);

end = System.currentTimeMillis();

MetricsMonitor.getNamingRequestMonitor(method, url, String.valueOf(result.code))

.observe(end - start);

if (HttpURLConnection.HTTP_OK == result.code) {

return result.content;

}

if (HttpURLConnection.HTTP_NOT_MODIFIED == result.code) {

return StringUtils.EMPTY;

}

throw new NacosException(result.code, result.content);

}

以上是 nacos服务自动注册原理源码解析 的全部内容, 来源链接: utcz.com/z/517332.html

回到顶部