A Look at SkyWalking's JVMService


BootService

skywalking-6.6.0/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/boot/BootService.java

    public interface BootService {
        void prepare() throws Throwable;

        void boot() throws Throwable;

        void onComplete() throws Throwable;

        void shutdown() throws Throwable;
    }

  • BootService defines four lifecycle methods: prepare, boot, onComplete, and shutdown.
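To make the four-phase contract concrete, here is a minimal sketch of how a driver might walk a set of services through the lifecycle. It assumes the driver finishes each phase for every service before starting the next one. The agent's actual ServiceManager is not shown in this article, so LifecycleService, RecordingService, and runLifecycle are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.List;

class LifecycleSketch {
    // Hypothetical stand-in with the same four methods as BootService.
    interface LifecycleService {
        void prepare() throws Throwable;
        void boot() throws Throwable;
        void onComplete() throws Throwable;
        void shutdown() throws Throwable;
    }

    // Records every lifecycle call so the resulting order can be inspected.
    static final class RecordingService implements LifecycleService {
        private final String name;
        private final List<String> log;

        RecordingService(String name, List<String> log) {
            this.name = name;
            this.log = log;
        }

        @Override public void prepare() { log.add(name + ".prepare"); }
        @Override public void boot() { log.add(name + ".boot"); }
        @Override public void onComplete() { log.add(name + ".onComplete"); }
        @Override public void shutdown() { log.add(name + ".shutdown"); }
    }

    // Assumed driver: finish one phase for every service before starting the next.
    static List<String> runLifecycle(String... names) {
        List<String> log = new ArrayList<>();
        List<LifecycleService> services = new ArrayList<>();
        for (String name : names) {
            services.add(new RecordingService(name, log));
        }
        try {
            for (LifecycleService s : services) s.prepare();
            for (LifecycleService s : services) s.boot();
            for (LifecycleService s : services) s.onComplete();
            for (LifecycleService s : services) s.shutdown();
        } catch (Throwable t) {
            throw new IllegalStateException("lifecycle phase failed", t);
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(runLifecycle("a", "b"));
    }
}
```

Splitting startup into prepare and boot lets a service finish its own setup (queues, listeners) before any service starts background work.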

JVMService

skywalking-6.6.0/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/jvm/JVMService.java

    @DefaultImplementor
    public class JVMService implements BootService, Runnable {
        private static final ILog logger = LogManager.getLogger(JVMService.class);
        private LinkedBlockingQueue<JVMMetric> queue;
        private volatile ScheduledFuture<?> collectMetricFuture;
        private volatile ScheduledFuture<?> sendMetricFuture;
        private Sender sender;

        @Override
        public void prepare() throws Throwable {
            queue = new LinkedBlockingQueue<JVMMetric>(Config.Jvm.BUFFER_SIZE);
            sender = new Sender();
            ServiceManager.INSTANCE.findService(GRPCChannelManager.class).addChannelListener(sender);
        }

        @Override
        public void boot() throws Throwable {
            collectMetricFuture = Executors
                .newSingleThreadScheduledExecutor(new DefaultNamedThreadFactory("JVMService-produce"))
                .scheduleAtFixedRate(new RunnableWithExceptionProtection(this, new RunnableWithExceptionProtection.CallbackWhenException() {
                    @Override public void handle(Throwable t) {
                        logger.error("JVMService produces metrics failure.", t);
                    }
                }), 0, 1, TimeUnit.SECONDS);
            sendMetricFuture = Executors
                .newSingleThreadScheduledExecutor(new DefaultNamedThreadFactory("JVMService-consume"))
                .scheduleAtFixedRate(new RunnableWithExceptionProtection(sender, new RunnableWithExceptionProtection.CallbackWhenException() {
                    @Override public void handle(Throwable t) {
                        logger.error("JVMService consumes and upload failure.", t);
                    }
                }
                ), 0, 1, TimeUnit.SECONDS);
        }

        @Override
        public void onComplete() throws Throwable {
        }

        @Override
        public void shutdown() throws Throwable {
            collectMetricFuture.cancel(true);
            sendMetricFuture.cancel(true);
        }

        @Override
        public void run() {
            if (RemoteDownstreamConfig.Agent.SERVICE_ID != DictionaryUtil.nullValue()
                && RemoteDownstreamConfig.Agent.SERVICE_INSTANCE_ID != DictionaryUtil.nullValue()
            ) {
                long currentTimeMillis = System.currentTimeMillis();
                try {
                    JVMMetric.Builder jvmBuilder = JVMMetric.newBuilder();
                    jvmBuilder.setTime(currentTimeMillis);
                    jvmBuilder.setCpu(CPUProvider.INSTANCE.getCpuMetric());
                    jvmBuilder.addAllMemory(MemoryProvider.INSTANCE.getMemoryMetricList());
                    jvmBuilder.addAllMemoryPool(MemoryPoolProvider.INSTANCE.getMemoryPoolMetricsList());
                    jvmBuilder.addAllGc(GCProvider.INSTANCE.getGCList());

                    JVMMetric jvmMetric = jvmBuilder.build();
                    if (!queue.offer(jvmMetric)) {
                        queue.poll();
                        queue.offer(jvmMetric);
                    }
                } catch (Exception e) {
                    logger.error(e, "Collect JVM info fail.");
                }
            }
        }

        //......
    }

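  • JVMService implements both BootService and Runnable. Its prepare method creates a LinkedBlockingQueue of JVMMetric bounded by Config.Jvm.BUFFER_SIZE, creates a Sender, and registers that Sender as a channel listener on GRPCChannelManager. Its boot method schedules two fixed-rate tasks, collectMetricFuture and sendMetricFuture, each with no initial delay and a 1-second period. Its shutdown method calls collectMetricFuture.cancel(true) and sendMetricFuture.cancel(true). Its run method does nothing until both SERVICE_ID and SERVICE_INSTANCE_ID have been assigned; after that it builds a JVMMetric from the CPU, memory, memory pool, and GC providers and offers it to the queue, discarding the oldest entry when the queue is full.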
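The offer / poll / offer sequence in run() is a drop-oldest policy on a bounded queue. The sketch below isolates that pattern with plain integers instead of JVMMetric; the class and method names (DropOldestSketch, offerDroppingOldest, fill) are made up for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

class DropOldestSketch {
    // Same sequence as JVMService.run(): if the bounded queue rejects the
    // new item, evict the head (oldest element) and offer the item again.
    static <T> void offerDroppingOldest(LinkedBlockingQueue<T> queue, T item) {
        if (!queue.offer(item)) {
            queue.poll();
            queue.offer(item);
        }
    }

    // Pushes 1..count into a queue of the given capacity and returns what is left.
    static List<Integer> fill(int capacity, int count) {
        LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<>(capacity);
        for (int i = 1; i <= count; i++) {
            offerDroppingOldest(queue, i);
        }
        return new ArrayList<>(queue);
    }

    public static void main(String[] args) {
        System.out.println(fill(3, 5)); // prints [3, 4, 5]
    }
}
```

With this policy the producer never blocks, and the queue always holds the most recent samples. Note that the three calls are not atomic as a group, so with several concurrent producers the second offer could still fail; JVMService has a single producer thread, which sidesteps that case.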

Sender

skywalking-6.6.0/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/jvm/JVMService.java

    private class Sender implements Runnable, GRPCChannelListener {
        private volatile GRPCChannelStatus status = GRPCChannelStatus.DISCONNECT;
        private volatile JVMMetricReportServiceGrpc.JVMMetricReportServiceBlockingStub stub = null;

        @Override
        public void run() {
            if (RemoteDownstreamConfig.Agent.SERVICE_ID != DictionaryUtil.nullValue()
                && RemoteDownstreamConfig.Agent.SERVICE_INSTANCE_ID != DictionaryUtil.nullValue()
            ) {
                if (status == GRPCChannelStatus.CONNECTED) {
                    try {
                        JVMMetricCollection.Builder builder = JVMMetricCollection.newBuilder();
                        LinkedList<JVMMetric> buffer = new LinkedList<JVMMetric>();
                        queue.drainTo(buffer);
                        if (buffer.size() > 0) {
                            builder.addAllMetrics(buffer);
                            builder.setServiceInstanceId(RemoteDownstreamConfig.Agent.SERVICE_INSTANCE_ID);
                            Commands commands = stub.withDeadlineAfter(GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS).collect(builder.build());
                            ServiceManager.INSTANCE.findService(CommandService.class).receiveCommand(commands);
                        }
                    } catch (Throwable t) {
                        logger.error(t, "send JVM metrics to Collector fail.");
                    }
                }
            }
        }

        @Override
        public void statusChanged(GRPCChannelStatus status) {
            if (GRPCChannelStatus.CONNECTED.equals(status)) {
                Channel channel = ServiceManager.INSTANCE.findService(GRPCChannelManager.class).getChannel();
                stub = JVMMetricReportServiceGrpc.newBlockingStub(channel);
            }
            this.status = status;
        }
    }

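  • Sender implements Runnable and GRPCChannelListener. Once the service and instance IDs have been assigned and status is GRPCChannelStatus.CONNECTED, its run method moves everything in the queue into a local buffer with queue.drainTo(buffer). If the buffer is not empty, it builds a JVMMetricCollection and uploads it with stub.withDeadlineAfter(GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS).collect(...). The Commands returned by that call are then handed to CommandService through receiveCommand(commands), so receiveCommand processes the collector's reply rather than sending the metrics. The statusChanged method creates a new stub with JVMMetricReportServiceGrpc.newBlockingStub(channel) when the new status is GRPCChannelStatus.CONNECTED, and then records the status.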
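The consumer side can be sketched without gRPC: drain whatever is queued into one batch, but only while the connection is up. In this sketch the Status enum stands in for GRPCChannelStatus and the uploaded list stands in for the remote collector; both are assumptions made for illustration, as are the names BatchSenderSketch and demo.

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

class BatchSenderSketch implements Runnable {
    // Hypothetical stand-in for GRPCChannelStatus.
    enum Status { CONNECTED, DISCONNECT }

    private final LinkedBlockingQueue<String> queue;
    // Stand-in for the remote collector: each entry is one uploaded batch.
    private final List<List<String>> uploaded = new ArrayList<>();
    private volatile Status status = Status.DISCONNECT;

    BatchSenderSketch(LinkedBlockingQueue<String> queue) {
        this.queue = queue;
    }

    void statusChanged(Status newStatus) {
        this.status = newStatus;
    }

    @Override
    public void run() {
        if (status != Status.CONNECTED) {
            return; // nothing is drained while offline, so items stay queued
        }
        LinkedList<String> buffer = new LinkedList<>();
        queue.drainTo(buffer); // moves everything currently queued, never blocks
        if (!buffer.isEmpty()) {
            uploaded.add(buffer); // the real Sender calls stub.collect(...) here
        }
    }

    static List<List<String>> demo() {
        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(10);
        queue.offer("m1");
        queue.offer("m2");
        BatchSenderSketch sender = new BatchSenderSketch(queue);
        sender.run();                           // disconnected: no upload
        sender.statusChanged(Status.CONNECTED);
        sender.run();                           // uploads [m1, m2] as one batch
        sender.run();                           // queue is empty: no upload
        return sender.uploaded;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [[m1, m2]]
    }
}
```

Because the queue is bounded, metrics collected while the channel is down accumulate only up to the buffer size; once the channel reconnects, the next run sends the retained samples in a single batch.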

RunnableWithExceptionProtection

skywalking-6.6.0/apm-commons/apm-util/src/main/java/org/apache/skywalking/apm/util/RunnableWithExceptionProtection.java

    public class RunnableWithExceptionProtection implements Runnable {
        private Runnable run;
        private CallbackWhenException callback;

        public RunnableWithExceptionProtection(Runnable run, CallbackWhenException callback) {
            this.run = run;
            this.callback = callback;
        }

        @Override
        public void run() {
            try {
                run.run();
            } catch (Throwable t) {
                callback.handle(t);
            }
        }

        public interface CallbackWhenException {
            void handle(Throwable t);
        }
    }

  • RunnableWithExceptionProtection implements Runnable. Its constructor takes a Runnable and a CallbackWhenException; its run method catches any Throwable thrown by run.run() and passes it to the callback's handle method.
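One likely reason for this wrapper is the documented behavior of ScheduledExecutorService.scheduleAtFixedRate: if any execution of the task throws, subsequent executions are suppressed. The sketch below contrasts an unprotected task with a protected one. The protect helper is a hypothetical lambda-based equivalent of RunnableWithExceptionProtection, and the 10 ms period and 5 s timeout are arbitrary values chosen for the demonstration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

class ExceptionProtectionSketch {
    // Hypothetical equivalent of RunnableWithExceptionProtection.
    static Runnable protect(Runnable task, Consumer<Throwable> callback) {
        return () -> {
            try {
                task.run();
            } catch (Throwable t) {
                callback.accept(t);
            }
        };
    }

    // Unprotected: the first exception ends the periodic schedule.
    static int runsWithoutProtection() {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger runs = new AtomicInteger();
        try {
            ScheduledFuture<?> future = executor.scheduleAtFixedRate(() -> {
                runs.incrementAndGet();
                throw new IllegalStateException("boom");
            }, 0, 10, TimeUnit.MILLISECONDS);
            try {
                future.get(); // blocks until the task fails
            } catch (ExecutionException expected) {
                // The task's exception surfaces here; no further runs happen.
            }
            return runs.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            executor.shutdownNow();
        }
    }

    // Protected: the task keeps being rescheduled even though it always fails.
    static boolean keepsRunningWithProtection() {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch threeFailures = new CountDownLatch(3);
        try {
            executor.scheduleAtFixedRate(protect(() -> {
                throw new IllegalStateException("boom");
            }, t -> threeFailures.countDown()), 0, 10, TimeUnit.MILLISECONDS);
            return threeFailures.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("runs without protection: " + runsWithoutProtection());
        System.out.println("still running with protection: " + keepsRunningWithProtection());
    }
}
```

Without the wrapper, a single failure in metric collection or upload would silently stop that task for the rest of the process lifetime; with it, the failure is logged and the next tick runs as usual.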

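Summary

JVMService implements both BootService and Runnable. Its prepare method creates a bounded LinkedBlockingQueue of JVMMetric and a Sender, and registers the Sender as a listener on GRPCChannelManager. Its boot method schedules two fixed-rate tasks, collectMetricFuture and sendMetricFuture, each running once per second. Its shutdown method calls collectMetricFuture.cancel(true) and sendMetricFuture.cancel(true). Its run method builds a JVMMetric and adds it to the queue, dropping the oldest entry if the queue is full.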

doc

  • JVMService

Source link: utcz.com/z/513762.html
