python项目注册nacos,健康实例数不稳定,是为什么?
使用Tornado框架,向2.0版本的Nacos注册服务,然后每隔5秒发送一次心跳。注册服务和发送心跳用的都是V2版本的API。
可以成功注册,在日志中也可以看到每5秒一次的心跳都发送成功,但是在Nacos管理页面刷新实例列表时,实例数和健康实例数一直在变,很不稳定。
我有4个服务节点注册到Nacos,实例数和健康实例数一直在1到4之间变化,每次刷新管理平台页面结果都不一样。
请问是什么原因呢?有没有人遇到过这个问题?
Nacos客户端类:
import asyncio
import json
import logging

import aiohttp
import tornado.gen

from libs.apollo.apollo_util import init_ip
from libs.apollo.client import AsyncApollo
# Nacos group used by the NacosClient methods when the caller does not pass one.
DEFAULT_GROUP_NAME = "DEFAULT_GROUP"
# Module-level logger used by the Nacos client and the helper coroutines below.
logger = logging.getLogger(__name__)
class NacosException(Exception):
    """Root of this module's Nacos-related exception hierarchy."""
class NacosRequestException(NacosException):
    """Raised when a Nacos request could not be completed on any configured server."""
class NacosClient:
    """Minimal asynchronous client for the Nacos naming HTTP API.

    Registers, deregisters and sends heartbeats for one instance whose IP is
    obtained from ``init_ip()``. Each request goes to the current server; on
    failure the client rotates round-robin through the configured servers,
    making at most one attempt per server per call.

    NOTE(review): the accompanying report says that on Nacos 2.x the healthy
    instance count flapped while these ``/nacos/v2/ns/...`` endpoints were
    used, and stabilised after switching registration, heartbeat and
    deregistration to the V1 endpoints. If you switch, check the response
    format first: V1 register/deregister may reply with plain text rather
    than JSON, while ``_async_request`` expects a JSON body.
    """

    # Success code checked in heartbeat responses.
    _BEAT_OK_CODE = 10200

    def __init__(self, server_addresses, namespace=None, username=None, password=None):
        """Create a client.

        :param server_addresses: comma-separated server base URLs (scheme
            included, e.g. ``"http://10.0.0.1:8848,http://10.0.0.2:8848"``).
            Surrounding whitespace, empty entries and trailing slashes are
            ignored.
        :param namespace: Nacos namespace id; omitted from requests when None.
        :param username: Nacos username, sent as a request parameter.
        :param password: Nacos password, sent as a request parameter.
        :raises ValueError: if *server_addresses* contains no address.
        """
        # Normalise entries so "a, b/" does not yield " b/" + "/nacos/..." URLs.
        addresses = (address.strip().rstrip('/') for address in server_addresses.split(','))
        self.server_list = [address for address in addresses if address]
        if not self.server_list:
            raise ValueError("server_addresses must contain at least one Nacos address")
        self.current_server = self.server_list[0]
        self.namespace = namespace
        self.username = username
        self.password = password
        self.server_offset = 0
        self.service_ip = init_ip()

    def change_server(self):
        """Advance round-robin to the next configured server."""
        self.server_offset = (self.server_offset + 1) % len(self.server_list)
        self.current_server = self.server_list[self.server_offset]

    def get_server(self):
        """Return the base URL of the server currently in use."""
        return self.current_server

    def _build_metadata(self, metadata, params):
        """Store *metadata* in ``params["metadata"]`` (dicts are JSON-encoded).

        Does nothing when *metadata* is falsy.
        """
        if metadata:
            if isinstance(metadata, dict):
                params["metadata"] = json.dumps(metadata)
            else:
                params["metadata"] = metadata

    @staticmethod
    def _clean_params(params):
        """Return a copy of *params* ready to be form-encoded.

        ``None`` values are dropped, because form encoding stringifies values
        and they would otherwise reach the server as the text ``"None"``.
        Booleans are rendered as ``"true"``/``"false"``.
        """
        return {
            key: ('true' if value else 'false') if isinstance(value, bool) else value
            for key, value in params.items()
            if value is not None
        }

    @staticmethod
    def _response_field(response, key):
        """Return ``response[key]`` when *response* is a dict, otherwise None."""
        return response.get(key) if isinstance(response, dict) else None

    async def _async_request(self, method, url, **kwargs):
        """Send one HTTP request and return the JSON-decoded response body.

        A fresh session is opened for the call and closed on return, which
        also closes the connector the session owns. ``timeout`` defaults to a
        10 second total timeout and may be overridden through *kwargs*.
        """
        kwargs.setdefault('timeout', aiohttp.ClientTimeout(total=10))
        # NOTE(review): ssl=False disables certificate verification for https
        # server addresses; kept from the original configuration.
        connector = aiohttp.TCPConnector(enable_cleanup_closed=True, ssl=False, limit=5)
        async with aiohttp.ClientSession(connector=connector) as session:
            async with session.request(method, url, **kwargs) as response:
                return await response.json()

    async def _async_post(self, url, **kwargs):
        """POST to *url*; see ``_async_request``."""
        return await self._async_request('POST', url, **kwargs)

    async def _async_put(self, url, **kwargs):
        """PUT to *url*; see ``_async_request``."""
        return await self._async_request('PUT', url, **kwargs)

    async def _async_delete(self, url, **kwargs):
        """DELETE *url*; see ``_async_request``."""
        return await self._async_request('DELETE', url, **kwargs)

    async def _request_with_failover(self, method, path, params, is_ok):
        """Send *path* to each configured server until *is_ok* accepts a reply.

        Starts with the current server. After every failed attempt (transport
        error, undecodable body, or a reply rejected by *is_ok*) the client
        rotates to the next server, except after the final attempt. At most
        one attempt is made per configured server, so a server that keeps
        answering with an error cannot cause an endless retry loop.

        :returns: the accepted response, or None if every attempt failed.
        """
        attempts = len(self.server_list)
        for attempt in range(1, attempts + 1):
            try:
                response = await self._async_request(
                    method, self.current_server + path, data=params)
            except Exception as e:  # failover boundary: any failure moves on to the next server
                logger.error(e)
            else:
                logger.info(response)
                if is_ok(response):
                    return response
                logger.warning('Unexpected Nacos response from %s: %s', self.current_server, response)
            if attempt < attempts:
                self.change_server()
        return None

    async def register(self, service_name, port, cluster_name='default', weight=1.0, metadata=None,
                       enable=True, healthy=True, ephemeral=True, group_name=DEFAULT_GROUP_NAME):
        """Register this host's instance of *service_name* with Nacos.

        :param service_name: service to register under.
        :param port: port advertised for the instance.
        :param metadata: optional dict (JSON-encoded) or pre-encoded string.
        :returns: ``'ok'`` once a server replies with ``data == 'ok'``.
        :raises NacosRequestException: if no configured server accepted it.
        """
        url = '/nacos/v2/ns/instance'
        params = {
            'ip': self.service_ip,
            'port': port,
            'serviceName': service_name,
            'weight': weight,
            # 'enabled' is the parameter name documented by the Nacos open API;
            # 'enable' is still sent, unchanged from earlier versions of this client.
            'enabled': enable,
            'enable': enable,
            'healthy': healthy,
            'clusterName': cluster_name,
            'ephemeral': ephemeral,
            'groupName': group_name,
            'namespaceId': self.namespace,
            'username': self.username,
            'password': self.password,
        }
        self._build_metadata(metadata, params)
        response = await self._request_with_failover(
            'POST', url, self._clean_params(params),
            lambda r: self._response_field(r, 'data') == 'ok')
        if response is None:
            raise NacosRequestException("All server are not available")
        return 'ok'

    async def deregister(self, service_name, port, cluster_name='default', ephemeral=True,
                         group_name=DEFAULT_GROUP_NAME):
        """Remove this host's instance of *service_name* from Nacos.

        :returns: ``'ok'`` once a server replies with ``data == 'ok'``.
        :raises NacosRequestException: if no configured server accepted it.
        """
        url = '/nacos/v2/ns/instance'
        params = {
            'serviceName': service_name,
            'ip': self.service_ip,
            'port': port,
            'namespaceId': self.namespace,
            'healthy': True,
            'username': self.username,
            'password': self.password,
            'ephemeral': ephemeral,
            'groupName': group_name,
            'clusterName': cluster_name,
        }
        response = await self._request_with_failover(
            'DELETE', url, self._clean_params(params),
            lambda r: self._response_field(r, 'data') == 'ok')
        if response is None:
            raise NacosRequestException("deregister instance error!")
        return 'ok'

    async def send_beat(self, service_name, port, cluster_name='default', weight=1.0,
                        ephemeral=True, group_name=DEFAULT_GROUP_NAME):
        """Send one heartbeat for this host's instance of *service_name*.

        The ``beat`` JSON carries the grouped service name
        (``group@@service``), IP, port, weight, ephemeral flag and cluster.

        :returns: ``'ok'`` when a server replies with code 10200, otherwise
            None after every configured server has been tried. Request
            errors are logged rather than raised, so a heartbeat loop keeps
            running.
        """
        url = '/nacos/v2/ns/instance/beat'
        grouped_name = f'{group_name}@@{service_name}'
        beat_data = {
            'serviceName': grouped_name,
            'ip': self.service_ip,
            'port': port,
            'weight': weight,
            'ephemeral': ephemeral,
            'cluster': cluster_name,
        }
        params = {
            'serviceName': grouped_name,
            'beat': json.dumps(beat_data),
            'groupName': group_name,
            'namespaceId': self.namespace,
            'username': self.username,
            'password': self.password,
        }
        response = await self._request_with_failover(
            'PUT', url, self._clean_params(params),
            lambda r: self._response_field(r, 'code') == self._BEAT_OK_CODE)
        return 'ok' if response is not None else None
async def init_nacos_new():
    """Read the Nacos connection settings from Apollo and build a NacosClient.

    The server address list is read from the ``risk-model-nacos`` Apollo
    namespace; the namespace id, username and password are read with
    ``namespace=''``. The lookups are awaited one after another, in that order.
    """
    host = await AsyncApollo.get_value('nacos_host', namespace='risk-model-nacos')
    settings = [
        await AsyncApollo.get_value(key, namespace='')
        for key in ('nacos_namespace', 'nacos_username', 'nacos_password')
    ]
    return NacosClient(host, *settings)
# Shared Nacos client, built at import time: init_nacos_new() (Apollo lookups)
# is driven to completion on the loop returned by asyncio.get_event_loop().
# NOTE(review): calling asyncio.get_event_loop() with no running loop is
# deprecated on newer Python versions; confirm the target interpreter before
# upgrading, and keep this on the same loop Tornado's IOLoop uses later.
nacos_client = asyncio.get_event_loop().run_until_complete(init_nacos_new())
# Port advertised to Nacos when callers do not pass one (the previously hard-coded value).
DEFAULT_SERVICE_PORT = 8094
# Seconds between two consecutive heartbeats sent by send_heartbeat().
DEFAULT_HEARTBEAT_INTERVAL = 5


async def register_instance(service_name, port=DEFAULT_SERVICE_PORT):
    """Register this process as an instance of *service_name* in Nacos.

    :param service_name: Nacos service name to register under.
    :param port: port advertised to Nacos. Defaults to 8094 for backward
        compatibility; pass the port the HTTP server actually listens on.
    :raises NacosRequestException: propagated from ``nacos_client.register``
        when no configured server accepts the registration.
    """
    logger.info('start register instance to nacos!')
    await nacos_client.register(service_name, port)


async def deregister_instance(service_name, port=DEFAULT_SERVICE_PORT):
    """Remove this process's instance of *service_name* from Nacos.

    :param port: must match the port used at registration (default 8094).
    :raises NacosRequestException: propagated from ``nacos_client.deregister``.
    """
    logger.info('start deregister instance from nacos!')
    await nacos_client.deregister(service_name, port)


async def send_heartbeat(service_name, port=DEFAULT_SERVICE_PORT, interval=DEFAULT_HEARTBEAT_INTERVAL):
    """Send a heartbeat for *service_name* forever, one every *interval* seconds.

    ``nacos_client.send_beat`` swallows request errors and returns ``'ok'``
    only on success, so the result is checked here instead of always
    reporting success.
    """
    while True:
        result = await nacos_client.send_beat(service_name, port)
        if result == 'ok':
            logger.info('Send heartbeat successful')
        else:
            logger.warning('Send heartbeat failed on every configured Nacos server')
        await tornado.gen.sleep(interval)
项目启动:
def start_app(args):
    """Serve the Tornado application and keep its Nacos registration alive.

    Listens on ``args.host``/``args.port`` with ``xheaders=True``, schedules
    Nacos registration and the heartbeat loop as IOLoop callbacks, then
    blocks running the IOLoop.

    NOTE(review): the registration helpers advertise port 8094 by default,
    independently of ``args.port``; confirm the two agree.
    NOTE(review): ``name`` is not defined in this snippet; presumably it is
    a module-level service name in the real project.
    """
    loop = tornado.ioloop.IOLoop.current()
    web_app = Application(urls, log_function=log_func, debug=CurrentConfig.DEBUG)
    web_app.listen(args.port, args.host, xheaders=True)
    # Registration and the heartbeat loop are scheduled as independent callbacks.
    for background_job in (register_instance, send_heartbeat):
        loop.spawn_callback(background_job, name)
    loop.start()
回答:
没人回复,我自己来回复吧。
因为使用的是2.X版本的Nacos,所以下意识地使用了V2版本的API发送心跳,结果出现了上述问题。
于是我把注册、发送心跳和注销的接口都改成了V1版本,似乎解决了健康实例数不稳定的问题。
这是否意味着V2版本的API有一些问题呢?官方一直没有提供支持2.X版本的Python SDK,所以只能先这样了。
以上是 python项目注册nacos,健康实例数不稳定,是为什么? 的全部内容, 来源链接: utcz.com/p/939039.html