A Look at SkyWalking's ServiceResetCommand


This article looks at SkyWalking's ServiceResetCommand.

ServiceResetCommand

skywalking-6.6.0/apm-protocol/apm-network/src/main/java/org/apache/skywalking/apm/network/trace/component/command/ServiceResetCommand.java

public class ServiceResetCommand extends BaseCommand implements Serializable, Deserializable<ServiceResetCommand> {

    public static final Deserializable<ServiceResetCommand> DESERIALIZER = new ServiceResetCommand("");

    public static final String NAME = "ServiceMetadataReset";

    public ServiceResetCommand(String serialNumber) {
        super(NAME, serialNumber);
    }

    @Override
    public Command.Builder serialize() {
        return commandBuilder();
    }

    @Override
    public ServiceResetCommand deserialize(Command command) {
        final List<KeyStringValuePair> argsList = command.getArgsList();
        String serialNumber = null;
        for (final KeyStringValuePair pair : argsList) {
            if ("SerialNumber".equals(pair.getKey())) {
                serialNumber = pair.getValue();
                break;
            }
        }
        return new ServiceResetCommand(serialNumber);
    }
}

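  • ServiceResetCommand extends BaseCommand and implements the Serializable and Deserializable interfaces. Its command name is "ServiceMetadataReset"; serialize() delegates to commandBuilder(), and deserialize() reads the "SerialNumber" argument from the Command and builds a new instance from it.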
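The serialize/deserialize pair can be illustrated with a small, self-contained sketch. It is not SkyWalking code: `Pair` and `WireCommand` are hypothetical stand-ins for the protobuf `KeyStringValuePair` and `Command` messages, and the sketch assumes that the builder attaches the serial number under the `SerialNumber` key, which is what the lookup in deserialize() suggests.

```java
import java.util.ArrayList;
import java.util.List;

public class ResetCommandRoundTrip {

    /** Hypothetical stand-in for the protobuf KeyStringValuePair message. */
    static final class Pair {
        final String key;
        final String value;

        Pair(String key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    /** Hypothetical stand-in for the protobuf Command message: a name plus arguments. */
    static final class WireCommand {
        final String name;
        final List<Pair> args = new ArrayList<>();

        WireCommand(String name) {
            this.name = name;
        }
    }

    static final String NAME = "ServiceMetadataReset";

    /** Assumption: the serial number is the only argument carried by the command. */
    static WireCommand serialize(String serialNumber) {
        WireCommand command = new WireCommand(NAME);
        command.args.add(new Pair("SerialNumber", serialNumber));
        return command;
    }

    /** Scans the arguments for the "SerialNumber" key, like the loop in deserialize(). */
    static String deserializeSerialNumber(WireCommand command) {
        for (Pair pair : command.args) {
            if ("SerialNumber".equals(pair.key)) {
                return pair.value;
            }
        }
        return null; // no serial number present, mirroring the null default
    }

    public static void main(String[] args) {
        WireCommand wire = serialize("abc-123");
        System.out.println(wire.name + " -> " + deserializeSerialNumber(wire));
    }
}
```

Note that a command without a `SerialNumber` argument still deserializes; the serial number is simply null, as in the original loop.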

InstancePing.proto

skywalking-6.6.0/apm-protocol/apm-network/src/main/proto/register/InstancePing.proto

syntax = "proto3";

option java_multiple_files = true;
option java_package = "org.apache.skywalking.apm.network.register.v2";
option csharp_namespace = "SkyWalking.NetworkProtocol";

import "common/common.proto";

service ServiceInstancePing {
    rpc doPing (ServiceInstancePingPkg) returns (Commands) {
    }
}

message ServiceInstancePingPkg {
    int32 serviceInstanceId = 1;
    int64 time = 2;
    string serviceInstanceUUID = 3;
}

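  • InstancePing.proto defines the ServiceInstancePing service and its doPing method, which takes a ServiceInstancePingPkg (serviceInstanceId, time, serviceInstanceUUID) and returns Commands.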

ServiceInstancePingServiceHandler

skywalking-6.6.0/oap-server/server-receiver-plugin/skywalking-register-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/register/provider/handler/v6/grpc/ServiceInstancePingServiceHandler.java

public class ServiceInstancePingServiceHandler extends ServiceInstancePingGrpc.ServiceInstancePingImplBase implements GRPCHandler {

    private static final Logger logger = LoggerFactory.getLogger(ServiceInstancePingServiceHandler.class);

    private final ServiceInstanceInventoryCache serviceInstanceInventoryCache;
    private final IServiceInventoryRegister serviceInventoryRegister;
    private final IServiceInstanceInventoryRegister serviceInstanceInventoryRegister;
    private final CommandService commandService;

    public ServiceInstancePingServiceHandler(ModuleManager moduleManager) {
        this.serviceInstanceInventoryCache = moduleManager.find(CoreModule.NAME).provider().getService(ServiceInstanceInventoryCache.class);
        this.serviceInventoryRegister = moduleManager.find(CoreModule.NAME).provider().getService(IServiceInventoryRegister.class);
        this.serviceInstanceInventoryRegister = moduleManager.find(CoreModule.NAME).provider().getService(IServiceInstanceInventoryRegister.class);
        this.commandService = moduleManager.find(CoreModule.NAME).provider().getService(CommandService.class);
    }

    @Override public void doPing(ServiceInstancePingPkg request, StreamObserver<Commands> responseObserver) {
        int serviceInstanceId = request.getServiceInstanceId();
        long heartBeatTime = request.getTime();
        serviceInstanceInventoryRegister.heartbeat(serviceInstanceId, heartBeatTime);

        ServiceInstanceInventory serviceInstanceInventory = serviceInstanceInventoryCache.get(serviceInstanceId);
        if (Objects.nonNull(serviceInstanceInventory)) {
            serviceInventoryRegister.heartbeat(serviceInstanceInventory.getServiceId(), heartBeatTime);
            responseObserver.onNext(Commands.getDefaultInstance());
        } else {
            logger.warn("Can't find service by service instance id from cache," +
                " service instance id is: {}, will send a reset command to agent side", serviceInstanceId);

            final ServiceResetCommand resetCommand = commandService.newResetCommand(request.getServiceInstanceId(), request.getTime(), request.getServiceInstanceUUID());
            final Command command = resetCommand.serialize().build();
            final Commands nextCommands = Commands.newBuilder().addCommands(command).build();
            responseObserver.onNext(nextCommands);
        }

        responseObserver.onCompleted();
    }
}

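  • ServiceInstancePingServiceHandler extends ServiceInstancePingGrpc.ServiceInstancePingImplBase and implements the GRPCHandler interface. Its constructor obtains serviceInstanceInventoryCache, serviceInventoryRegister, serviceInstanceInventoryRegister, and commandService from the CoreModule provider. Its doPing method first calls serviceInstanceInventoryRegister.heartbeat(serviceInstanceId, heartBeatTime). If serviceInstanceInventoryCache.get(serviceInstanceId) returns an inventory, it also refreshes the service heartbeat and replies with an empty Commands; if it returns null, it sends a ServiceResetCommand to the agent.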
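The branching in doPing can be sketched without gRPC or the OAP module system. The following is a simplified model under stated assumptions, not the handler itself: the three maps are hypothetical stand-ins for the inventory cache and the two heartbeat registers, and the reply is reduced to a list of command names.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PingHandlerSketch {

    static final String RESET_COMMAND = "ServiceMetadataReset";

    // Hypothetical stand-in for ServiceInstanceInventoryCache: instance id -> service id.
    final Map<Integer, Integer> instanceCache = new HashMap<>();
    // Hypothetical stand-ins for the two registers: last heartbeat per instance / service.
    final Map<Integer, Long> instanceHeartbeats = new HashMap<>();
    final Map<Integer, Long> serviceHeartbeats = new HashMap<>();

    /** Returns the command names that would be sent back to the agent. */
    List<String> doPing(int serviceInstanceId, long heartBeatTime) {
        // The instance heartbeat is recorded unconditionally, before the cache lookup.
        instanceHeartbeats.put(serviceInstanceId, heartBeatTime);

        Integer serviceId = instanceCache.get(serviceInstanceId);
        if (serviceId != null) {
            // Known instance: refresh the owning service and reply with no commands.
            serviceHeartbeats.put(serviceId, heartBeatTime);
            return Collections.emptyList();
        }
        // Unknown instance: ask the agent to reset and register again.
        return Collections.singletonList(RESET_COMMAND);
    }

    public static void main(String[] args) {
        PingHandlerSketch handler = new PingHandlerSketch();
        handler.instanceCache.put(7, 3);
        System.out.println(handler.doPing(7, 1000L));  // known instance
        System.out.println(handler.doPing(99, 1000L)); // unknown instance
    }
}
```

The point of the design is that the heartbeat response doubles as a control channel: the agent needs no separate call to learn that the backend no longer knows it.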

ServiceResetCommandExecutor

skywalking-6.6.0/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/commands/executor/ServiceResetCommandExecutor.java

public class ServiceResetCommandExecutor implements CommandExecutor {

    private static final ILog LOGGER = LogManager.getLogger(ServiceResetCommandExecutor.class);

    @Override
    public void execute(final BaseCommand command) throws CommandExecutionException {
        LOGGER.warn("Received ServiceResetCommand, a re-register task is scheduled.");

        ServiceManager.INSTANCE.findService(ServiceAndEndpointRegisterClient.class).coolDown();

        RemoteDownstreamConfig.Agent.SERVICE_ID = DictionaryUtil.nullValue();
        RemoteDownstreamConfig.Agent.SERVICE_INSTANCE_ID = DictionaryUtil.nullValue();
        RemoteDownstreamConfig.Agent.INSTANCE_REGISTERED_TIME = DictionaryUtil.nullValue();

        NetworkAddressDictionary.INSTANCE.clear();
        EndpointNameDictionary.INSTANCE.clear();
    }
}

  • ServiceResetCommandExecutor implements the CommandExecutor interface. Its execute method handles a received ServiceResetCommand: it calls ServiceManager.INSTANCE.findService(ServiceAndEndpointRegisterClient.class).coolDown(), resets RemoteDownstreamConfig.Agent.SERVICE_ID, RemoteDownstreamConfig.Agent.SERVICE_INSTANCE_ID, and RemoteDownstreamConfig.Agent.INSTANCE_REGISTERED_TIME to DictionaryUtil.nullValue(), and clears NetworkAddressDictionary.INSTANCE and EndpointNameDictionary.INSTANCE.
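The effect of the reset on agent state can be modelled in a few lines. This is a rough sketch, not the agent's code: the fields are hypothetical stand-ins for RemoteDownstreamConfig.Agent and the two dictionaries, `NULL_VALUE = 0` is an assumption about what DictionaryUtil.nullValue() returns, and coolDown() is modelled as simply recording a timestamp, since its implementation is not shown in this article.

```java
import java.util.HashMap;
import java.util.Map;

public class AgentResetSketch {

    // Assumption: models DictionaryUtil.nullValue() as 0, meaning "not registered".
    static final int NULL_VALUE = 0;

    // Hypothetical stand-ins for the RemoteDownstreamConfig.Agent fields.
    int serviceId = NULL_VALUE;
    int serviceInstanceId = NULL_VALUE;
    long instanceRegisteredTime = NULL_VALUE;

    // Assumption: the cool-down is tracked as the time at which it started.
    long coolDownStartTime = -1L;

    // Hypothetical stand-ins for EndpointNameDictionary and NetworkAddressDictionary.
    final Map<String, Integer> endpointDictionary = new HashMap<>();
    final Map<String, Integer> networkAddressDictionary = new HashMap<>();

    void register(int serviceId, int serviceInstanceId, long now) {
        this.serviceId = serviceId;
        this.serviceInstanceId = serviceInstanceId;
        this.instanceRegisteredTime = now;
    }

    boolean isRegistered() {
        return serviceId != NULL_VALUE && serviceInstanceId != NULL_VALUE;
    }

    /** Mirrors the order of operations in execute(): cool down, reset ids, clear dictionaries. */
    void executeReset(long now) {
        coolDownStartTime = now;
        serviceId = NULL_VALUE;
        serviceInstanceId = NULL_VALUE;
        instanceRegisteredTime = NULL_VALUE;
        networkAddressDictionary.clear();
        endpointDictionary.clear();
    }

    public static void main(String[] args) {
        AgentResetSketch agent = new AgentResetSketch();
        agent.register(3, 7, 1000L);
        agent.endpointDictionary.put("/orders", 11);
        agent.executeReset(5000L);
        System.out.println("registered after reset: " + agent.isRegistered());
    }
}
```

Clearing the dictionaries together with the ids matters because the cached ids were issued by the backend for the old registration and may no longer be valid after re-registering.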

Summary

ServiceInstancePingServiceHandler extends ServiceInstancePingGrpc.ServiceInstancePingImplBase and implements the GRPCHandler interface. Its constructor obtains serviceInstanceInventoryCache, serviceInventoryRegister, serviceInstanceInventoryRegister, and commandService. Its doPing method calls serviceInstanceInventoryRegister.heartbeat(serviceInstanceId, heartBeatTime), and if serviceInstanceInventoryCache.get(serviceInstanceId) returns null, it sends a ServiceResetCommand to the agent.

