A Look at SkyWalking's jvm-receiver-plugin

This article takes a look at SkyWalking's jvm-receiver-plugin.

JVMModuleProvider

skywalking-6.6.0/oap-server/server-receiver-plugin/skywalking-jvm-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/jvm/provider/JVMModuleProvider.java

public class JVMModuleProvider extends ModuleProvider {

    @Override public String name() {
        return "default";
    }

    @Override public Class<? extends ModuleDefine> module() {
        return JVMModule.class;
    }

    @Override public ModuleConfig createConfigBeanIfAbsent() {
        return null;
    }

    @Override public void prepare() {
    }

    @Override public void start() {
        GRPCHandlerRegister grpcHandlerRegister = getManager().find(SharingServerModule.NAME).provider().getService(GRPCHandlerRegister.class);
        grpcHandlerRegister.addHandler(new JVMMetricsServiceHandler(getManager()));
        grpcHandlerRegister.addHandler(new JVMMetricReportServiceHandler(getManager()));
    }

    @Override public void notifyAfterCompleted() {
    }

    @Override public String[] requiredModules() {
        return new String[] {CoreModule.NAME, SharingServerModule.NAME};
    }
}

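  • JVMModuleProvider extends ModuleProvider. Its start method obtains the GRPCHandlerRegister service from the SharingServerModule provider and registers two handlers with it: JVMMetricsServiceHandler and JVMMetricReportServiceHandler.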
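The registration step in start() can be pictured with a small stand-alone sketch. The types below (Handler, HandlerRegistry, ReceiverProvider) are hypothetical stand-ins, not SkyWalking classes; they only mirror the shape of the code above, where a provider looks up a shared register and adds two handlers to it.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-ins for illustration only; these are not SkyWalking types.
interface Handler {
    String serviceName();
}

// Plays the role of a shared register that a server would later read its handlers from.
class HandlerRegistry {
    private final List<Handler> handlers = new ArrayList<>();

    void addHandler(Handler handler) {
        handlers.add(handler);
    }

    List<Handler> handlers() {
        return Collections.unmodifiableList(handlers);
    }
}

// Mirrors the provider: start() does nothing except wire handlers into the register.
class ReceiverProvider {
    private final HandlerRegistry registry;

    ReceiverProvider(HandlerRegistry registry) {
        this.registry = registry;
    }

    void start() {
        registry.addHandler(() -> "JVMMetricsService");      // legacy protocol
        registry.addHandler(() -> "JVMMetricReportService"); // v2 protocol
    }
}
```

In this sketch the provider never starts a server of its own; it only contributes handlers to a register that something else owns, which is the role GRPCHandlerRegister appears to play in the listing above.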

JVMMetricsService.proto

skywalking-6.6.0/apm-protocol/apm-network/src/main/proto/language-agent/JVMMetricsService.proto

syntax = "proto3";

option java_multiple_files = true;
option java_package = "org.apache.skywalking.apm.network.language.agent";
option csharp_namespace = "SkyWalking.NetworkProtocol";

import "language-agent/Downstream.proto";
import "common/JVM.proto";

service JVMMetricsService {
    rpc collect (JVMMetrics) returns (Downstream) {
    }
}

message JVMMetrics {
    repeated JVMMetric metrics = 1;
    int32 applicationInstanceId = 2;
}

  • JVMMetricsService.proto defines the JVMMetricsService service. Its single collect method takes a JVMMetrics message and returns a Downstream message.

JVMMetricsServiceHandler

skywalking-6.6.0/oap-server/server-receiver-plugin/skywalking-jvm-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/jvm/provider/handler/JVMMetricsServiceHandler.java

public class JVMMetricsServiceHandler extends JVMMetricsServiceGrpc.JVMMetricsServiceImplBase implements GRPCHandler {

    private static final Logger logger = LoggerFactory.getLogger(JVMMetricsServiceHandler.class);

    private final JVMSourceDispatcher jvmSourceDispatcher;

    public JVMMetricsServiceHandler(ModuleManager moduleManager) {
        this.jvmSourceDispatcher = new JVMSourceDispatcher(moduleManager);
    }

    @Override public void collect(JVMMetrics request, StreamObserver<Downstream> responseObserver) {
        int serviceInstanceId = request.getApplicationInstanceId();

        if (logger.isDebugEnabled()) {
            logger.debug("receive the jvm metrics from service instance, id: {}", serviceInstanceId);
        }

        request.getMetricsList().forEach(metrics -> {
            long minuteTimeBucket = TimeBucket.getMinuteTimeBucket(metrics.getTime());
            jvmSourceDispatcher.sendMetric(serviceInstanceId, minuteTimeBucket, metrics);
        });

        responseObserver.onNext(Downstream.newBuilder().build());
        responseObserver.onCompleted();
    }
}

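  • JVMMetricsServiceHandler extends JVMMetricsServiceGrpc.JVMMetricsServiceImplBase, and its constructor creates a JVMSourceDispatcher. Its collect method iterates over request.getMetricsList(), computes the minute time bucket of each metric, and calls jvmSourceDispatcher.sendMetric(serviceInstanceId, minuteTimeBucket, metrics) for each one. It then replies with an empty Downstream message.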
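The listing does not show TimeBucket itself. As a rough sketch, assuming a minute time bucket is the sample timestamp truncated to the minute and encoded as a yyyyMMddHHmm number, the conversion could look like the following. The class name MinuteBucketSketch and the explicit zone parameter are hypothetical and are not part of SkyWalking's API.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;

// Illustrative only: assumes a minute bucket is the timestamp rendered as yyyyMMddHHmm.
final class MinuteBucketSketch {
    private MinuteBucketSketch() {
    }

    // Converts epoch milliseconds to a yyyyMMddHHmm number in the given time zone.
    static long minuteBucket(long epochMillis, ZoneId zone) {
        ZonedDateTime t = Instant.ofEpochMilli(epochMillis).atZone(zone);
        return t.getYear() * 100_000_000L
            + t.getMonthValue() * 1_000_000L
            + t.getDayOfMonth() * 10_000L
            + t.getHour() * 100L
            + t.getMinute();
    }
}
```

Under that assumption, every sample taken within the same minute maps to the same bucket, so metrics reported several times per minute can later be aggregated per minute.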

JVMMetric.proto

skywalking-6.6.0/apm-protocol/apm-network/src/main/proto/language-agent-v2/JVMMetric.proto

syntax = "proto3";

option java_multiple_files = true;
option java_package = "org.apache.skywalking.apm.network.language.agent.v2";
option csharp_namespace = "SkyWalking.NetworkProtocol";

import "common/common.proto";
import "common/JVM.proto";

service JVMMetricReportService {
    rpc collect (JVMMetricCollection) returns (Commands) {
    }
}

message JVMMetricCollection {
    repeated JVMMetric metrics = 1;
    int32 serviceInstanceId = 2;
}

  • JVMMetric.proto defines the JVMMetricReportService service. Its single collect method takes a JVMMetricCollection message and returns a Commands message.

JVMMetricReportServiceHandler

skywalking-6.6.0/oap-server/server-receiver-plugin/skywalking-jvm-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/jvm/provider/handler/JVMMetricReportServiceHandler.java

public class JVMMetricReportServiceHandler extends JVMMetricReportServiceGrpc.JVMMetricReportServiceImplBase implements GRPCHandler {

    private static final Logger logger = LoggerFactory.getLogger(JVMMetricReportServiceHandler.class);

    private final JVMSourceDispatcher jvmSourceDispatcher;

    public JVMMetricReportServiceHandler(ModuleManager moduleManager) {
        this.jvmSourceDispatcher = new JVMSourceDispatcher(moduleManager);
    }

    @Override public void collect(JVMMetricCollection request, StreamObserver<Commands> responseObserver) {
        int serviceInstanceId = request.getServiceInstanceId();

        if (logger.isDebugEnabled()) {
            logger.debug("receive the jvm metrics from service instance, id: {}", serviceInstanceId);
        }

        request.getMetricsList().forEach(metrics -> {
            long minuteTimeBucket = TimeBucket.getMinuteTimeBucket(metrics.getTime());
            jvmSourceDispatcher.sendMetric(serviceInstanceId, minuteTimeBucket, metrics);
        });

        responseObserver.onNext(Commands.newBuilder().build());
        responseObserver.onCompleted();
    }
}

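  • JVMMetricReportServiceHandler extends JVMMetricReportServiceGrpc.JVMMetricReportServiceImplBase, and its constructor creates a JVMSourceDispatcher. Its collect method iterates over request.getMetricsList(), computes the minute time bucket of each metric, and calls jvmSourceDispatcher.sendMetric(serviceInstanceId, minuteTimeBucket, metrics) for each one. It then replies with an empty Commands message.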
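The two handlers differ only in their request and response types; the per-metric work is the same. The sketch below illustrates that relationship with hypothetical stand-in types (LegacyRequest, V2Request, Dispatcher), which are not the generated protobuf classes. Unlike the real handlers, which each construct their own JVMSourceDispatcher, the sketch passes the dispatcher in to keep the example short.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins; the real request types are generated from the .proto files.
final class TwoProtocolsSketch {
    private TwoProtocolsSketch() {
    }

    // Legacy request shape: the instance id field is named applicationInstanceId.
    static final class LegacyRequest {
        final int applicationInstanceId;
        final List<Long> sampleTimes;

        LegacyRequest(int applicationInstanceId, List<Long> sampleTimes) {
            this.applicationInstanceId = applicationInstanceId;
            this.sampleTimes = sampleTimes;
        }
    }

    // v2 request shape: same payload, but the id field is named serviceInstanceId.
    static final class V2Request {
        final int serviceInstanceId;
        final List<Long> sampleTimes;

        V2Request(int serviceInstanceId, List<Long> sampleTimes) {
            this.serviceInstanceId = serviceInstanceId;
            this.sampleTimes = sampleTimes;
        }
    }

    // Shared sink that records what it was asked to dispatch.
    static final class Dispatcher {
        final List<String> received = new ArrayList<>();

        void sendMetric(int instanceId, long sampleTime) {
            received.add(instanceId + "@" + sampleTime);
        }
    }

    // Each entry point only adapts its own request type, then delegates.
    static void collectLegacy(LegacyRequest request, Dispatcher dispatcher) {
        int instanceId = request.applicationInstanceId;
        request.sampleTimes.forEach(t -> dispatcher.sendMetric(instanceId, t));
    }

    static void collectV2(V2Request request, Dispatcher dispatcher) {
        int instanceId = request.serviceInstanceId;
        request.sampleTimes.forEach(t -> dispatcher.sendMetric(instanceId, t));
    }
}
```

Keeping the protocol-specific part this thin is what lets an old and a new agent protocol coexist while feeding the same downstream processing.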

JVMSourceDispatcher

skywalking-6.6.0/oap-server/server-receiver-plugin/skywalking-jvm-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/jvm/provider/handler/JVMSourceDispatcher.java

public class JVMSourceDispatcher {

    private static final Logger logger = LoggerFactory.getLogger(JVMSourceDispatcher.class);

    private final SourceReceiver sourceReceiver;
    private final ServiceInstanceInventoryCache instanceInventoryCache;

    public JVMSourceDispatcher(ModuleManager moduleManager) {
        this.sourceReceiver = moduleManager.find(CoreModule.NAME).provider().getService(SourceReceiver.class);
        this.instanceInventoryCache = moduleManager.find(CoreModule.NAME).provider().getService(ServiceInstanceInventoryCache.class);
    }

    void sendMetric(int serviceInstanceId, long minuteTimeBucket, JVMMetric metrics) {
        ServiceInstanceInventory serviceInstanceInventory = instanceInventoryCache.get(serviceInstanceId);
        int serviceId;
        if (Objects.nonNull(serviceInstanceInventory)) {
            serviceId = serviceInstanceInventory.getServiceId();
        } else {
            logger.warn("Can't find service by service instance id from cache, service instance id is: {}", serviceInstanceId);
            return;
        }

        this.sendToCpuMetricProcess(serviceId, serviceInstanceId, minuteTimeBucket, metrics.getCpu());
        this.sendToMemoryMetricProcess(serviceId, serviceInstanceId, minuteTimeBucket, metrics.getMemoryList());
        this.sendToMemoryPoolMetricProcess(serviceId, serviceInstanceId, minuteTimeBucket, metrics.getMemoryPoolList());
        this.sendToGCMetricProcess(serviceId, serviceInstanceId, minuteTimeBucket, metrics.getGcList());
    }

    private void sendToCpuMetricProcess(int serviceId, int serviceInstanceId, long timeBucket, CPU cpu) {
        ServiceInstanceJVMCPU serviceInstanceJVMCPU = new ServiceInstanceJVMCPU();
        serviceInstanceJVMCPU.setId(serviceInstanceId);
        serviceInstanceJVMCPU.setName(Const.EMPTY_STRING);
        serviceInstanceJVMCPU.setServiceId(serviceId);
        serviceInstanceJVMCPU.setServiceName(Const.EMPTY_STRING);
        serviceInstanceJVMCPU.setUsePercent(cpu.getUsagePercent());
        serviceInstanceJVMCPU.setTimeBucket(timeBucket);
        sourceReceiver.receive(serviceInstanceJVMCPU);
    }

    private void sendToGCMetricProcess(int serviceId, int serviceInstanceId, long timeBucket, List<GC> gcs) {
        gcs.forEach(gc -> {
            ServiceInstanceJVMGC serviceInstanceJVMGC = new ServiceInstanceJVMGC();
            serviceInstanceJVMGC.setId(serviceInstanceId);
            serviceInstanceJVMGC.setName(Const.EMPTY_STRING);
            serviceInstanceJVMGC.setServiceId(serviceId);
            serviceInstanceJVMGC.setServiceName(Const.EMPTY_STRING);

            switch (gc.getPhrase()) {
                case NEW:
                    serviceInstanceJVMGC.setPhrase(GCPhrase.NEW);
                    break;
                case OLD:
                    serviceInstanceJVMGC.setPhrase(GCPhrase.OLD);
                    break;
            }

            serviceInstanceJVMGC.setTime(gc.getTime());
            serviceInstanceJVMGC.setCount(gc.getCount());
            serviceInstanceJVMGC.setTimeBucket(timeBucket);
            sourceReceiver.receive(serviceInstanceJVMGC);
        });
    }

    private void sendToMemoryMetricProcess(int serviceId, int serviceInstanceId, long timeBucket,
        List<Memory> memories) {
        memories.forEach(memory -> {
            ServiceInstanceJVMMemory serviceInstanceJVMMemory = new ServiceInstanceJVMMemory();
            serviceInstanceJVMMemory.setId(serviceInstanceId);
            serviceInstanceJVMMemory.setName(Const.EMPTY_STRING);
            serviceInstanceJVMMemory.setServiceId(serviceId);
            serviceInstanceJVMMemory.setServiceName(Const.EMPTY_STRING);
            serviceInstanceJVMMemory.setHeapStatus(memory.getIsHeap());
            serviceInstanceJVMMemory.setInit(memory.getInit());
            serviceInstanceJVMMemory.setMax(memory.getMax());
            serviceInstanceJVMMemory.setUsed(memory.getUsed());
            serviceInstanceJVMMemory.setCommitted(memory.getCommitted());
            serviceInstanceJVMMemory.setTimeBucket(timeBucket);
            sourceReceiver.receive(serviceInstanceJVMMemory);
        });
    }

    private void sendToMemoryPoolMetricProcess(int serviceId, int serviceInstanceId, long timeBucket,
        List<MemoryPool> memoryPools) {
        memoryPools.forEach(memoryPool -> {
            ServiceInstanceJVMMemoryPool serviceInstanceJVMMemoryPool = new ServiceInstanceJVMMemoryPool();
            serviceInstanceJVMMemoryPool.setId(serviceInstanceId);
            serviceInstanceJVMMemoryPool.setName(Const.EMPTY_STRING);
            serviceInstanceJVMMemoryPool.setServiceId(serviceId);
            serviceInstanceJVMMemoryPool.setServiceName(Const.EMPTY_STRING);

            switch (memoryPool.getType()) {
                case NEWGEN_USAGE:
                    serviceInstanceJVMMemoryPool.setPoolType(MemoryPoolType.NEWGEN_USAGE);
                    break;
                case OLDGEN_USAGE:
                    serviceInstanceJVMMemoryPool.setPoolType(MemoryPoolType.OLDGEN_USAGE);
                    break;
                case PERMGEN_USAGE:
                    serviceInstanceJVMMemoryPool.setPoolType(MemoryPoolType.PERMGEN_USAGE);
                    break;
                case SURVIVOR_USAGE:
                    serviceInstanceJVMMemoryPool.setPoolType(MemoryPoolType.SURVIVOR_USAGE);
                    break;
                case METASPACE_USAGE:
                    serviceInstanceJVMMemoryPool.setPoolType(MemoryPoolType.METASPACE_USAGE);
                    break;
                case CODE_CACHE_USAGE:
                    serviceInstanceJVMMemoryPool.setPoolType(MemoryPoolType.CODE_CACHE_USAGE);
                    break;
            }

            serviceInstanceJVMMemoryPool.setInit(memoryPool.getInit());
            serviceInstanceJVMMemoryPool.setMax(memoryPool.getMax());
            serviceInstanceJVMMemoryPool.setUsed(memoryPool.getUsed());
            serviceInstanceJVMMemoryPool.setCommitted(memoryPool.getCommited());
            serviceInstanceJVMMemoryPool.setTimeBucket(timeBucket);
            sourceReceiver.receive(serviceInstanceJVMMemoryPool);
        });
    }
}

  • JVMSourceDispatcher mainly provides the sendMetric method. It first looks up the service instance in instanceInventoryCache; if the instance is not cached, it logs a warning and returns without sending anything. Otherwise it calls sendToCpuMetricProcess, sendToMemoryMetricProcess, sendToMemoryPoolMetricProcess and sendToGCMetricProcess. Each of these builds the corresponding source object (ServiceInstanceJVMCPU, ServiceInstanceJVMMemory, ServiceInstanceJVMMemoryPool, ServiceInstanceJVMGC) and passes it to sourceReceiver.receive; the memory, memory pool and GC methods do so once per list element.
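The control flow of sendMetric (resolve the service id, bail out if the instance is unknown, then fan one report out into several sources) can be sketched in isolation. Everything in the example below is a hypothetical stand-in: a Map plays the role of the inventory cache, a List of strings plays the role of the SourceReceiver, and Sample is a cut-down version of a JVM metric.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-ins for illustration; not the SkyWalking classes.
final class FanOutSketch {
    // One reported sample: a CPU reading plus any number of memory and GC entries.
    static final class Sample {
        final double cpuPercent;
        final List<Long> memoryUsed;
        final List<Long> gcCounts;

        Sample(double cpuPercent, List<Long> memoryUsed, List<Long> gcCounts) {
            this.cpuPercent = cpuPercent;
            this.memoryUsed = memoryUsed;
            this.gcCounts = gcCounts;
        }
    }

    // Plays the role of the instance inventory cache: instance id -> service id.
    private final Map<Integer, Integer> instanceToService = new HashMap<>();
    // Plays the role of the source receiver: records every source it is given.
    private final List<String> received = new ArrayList<>();

    void register(int instanceId, int serviceId) {
        instanceToService.put(instanceId, serviceId);
    }

    void sendMetric(int instanceId, long timeBucket, Sample sample) {
        Integer serviceId = instanceToService.get(instanceId);
        if (serviceId == null) {
            return; // unknown instance: the whole sample is dropped
        }
        String scope = serviceId + "/" + instanceId + "@" + timeBucket;
        // One CPU source per sample, one source per memory entry, one per GC entry.
        received.add("cpu " + scope + " " + sample.cpuPercent);
        sample.memoryUsed.forEach(used -> received.add("memory " + scope + " " + used));
        sample.gcCounts.forEach(count -> received.add("gc " + scope + " " + count));
    }

    List<String> received() {
        return received;
    }
}
```

A consequence visible in both the sketch and the original method is that metrics from an instance that is not yet in the cache are discarded rather than queued.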

Summary

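JVMModuleProvider extends ModuleProvider. Its start method obtains the GRPCHandlerRegister and registers JVMMetricsServiceHandler and JVMMetricReportServiceHandler with it. The former serves the service defined in JVMMetricsService.proto (language-agent); the latter serves the service defined in JVMMetric.proto (language-agent-v2). Both delegate to JVMSourceDispatcher, which turns each reported JVM metric into CPU, memory, memory pool and GC sources for the SourceReceiver.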
