Skywalking数据采集与收集源码分析
skywalking官网:https://skywalking.apache.org/
skywalking的架构图如下:
Skywalking的agent负责采集数据,发送到collector,collector聚合,并且存储这些数据,且提供了一个简洁使用的UI端,可共我们查看监控的指标。
下面我们来开始分析skywalking的源码。
下载源码并构建
因为skywalking为了实现高性能通信,采用的是grpc的方式来实现服务器与客户端的数据传输的,所以导入之后我们需要稍微做一些事情,我们可以参考docs/en/guides/How-to-build.md
这篇文档来构建。
打包构建
我们可以在github上面将skywalking源码fork一份,然后下载到自己的本地。
// 直接git clone --recurse-submodules https://github.com/apache/skywalking.git
// 或者
git clone https://github.com/apache/skywalking.git
cd skywalking/
git submodule init
git submodule update
执行命令:
./mvnw clean package -DskipTests
最终打好的包在dist目录下面
在IDEA里面构建源码
用IDEA打开skywalking项目(作为maven项目导入)
然后在skywalking所在目录命令行运行脚本去编译构建(因为skywalking用到了grpc):
./mvnw compile -Dmaven.test.skip=false
然后查看设置生成的源代码(主要是看potobuf文件编译生成的源代码)
apm-protocol/apm-network/target/generated-sources/protobuf 选中这个目录下面的
grpc-java
和java
,然后右键选择Mark Directory As
-->Generated Sources Root
如下图所示
oap-server/server-core/target/generated-sources/protobuf目录的
grpc-java
和java
文件夹Mark Directory As-->
Generated Sources Root`oap-server/server-receiver-plugin/receiver-proto/target/generated-sources/protobuf 目录的
grpc-java
和java
文件夹Mark Directory As-->
Generated Sources Root`oap-server/exporter/target/generated-sources/protobuf目录的
grpc-java
和java
文件夹Mark Directory As-->
Generated Sources Root`oap-server/server-configuration/grpc-configuration-sync/target/generated-sources/protobuf目录的
grpc-java
和java
文件夹Mark Directory As-->
Generated Sources Root`oap-server/oal-grammar/target/generated-sources目录的
grpc-java
和java
文件夹Mark Directory As-->
Generated Sources Root`
在Eclipse里面构建源码
1、按照maven项目导入到eclipse中
2、添加一下内容到skywalking/pom.xml
中
<plugin><groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
<version>1.8</version>
<executions>
<execution>
<id>add-source</id>
<phase>generate-sources</phase>
<goals>
<goal>add-source</goal>
</goals>
<configuration>
<sources>
<source>src/java/main</source>
<source>apm-protocol/apm-network/target/generated-sources/protobuf</source>
<source>apm-collector/apm-collector-remote/collector-remote-grpc-provider/target/generated-sources/protobuf</source>
</sources>
</configuration>
</execution>
</executions>
</plugin>
3、添加如下内容,使得eclipse的M2e插件能够支持扩展配置
<pluginManagement><plugins>
<!--This plugin"s configuration is used to store Eclipse m2e settings
only. It has no influence on the Maven build itself. -->
<plugin>
<groupId>org.eclipse.m2e</groupId>
<artifactId>lifecycle-mapping</artifactId>
<version>1.0.0</version>
<configuration>
<lifecycleMappingMetadata>
<pluginExecutions>
<pluginExecution>
<pluginExecutionFilter>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
<versionRange>[1.8,)</versionRange>
<goals>
<goal>add-source</goal>
</goals>
</pluginExecutionFilter>
</pluginExecution>
</pluginExecutions>
</lifecycleMappingMetadata>
</configuration>
</plugin>
</plugins>
</pluginManagement>
4、apm-collector-remote/collector-remote-grpc-provider/pom.xml
文件中添加如下依赖
<dependency><groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>24.0-jre</version>
</dependency>
5、执行命令
./mvnw compile -Dmaven.test.skip=true
6、执行命令
先执行maven clean,然后maven update
7、执行命令:
./mvnw compile
8、刷新项目
源码分析
skywalking的分布式链路追踪流程大致如下:
Agent采集数据
Agent发送数据到Collector
Collector接收数据
Collector将接收的数据存储到持久层
我们这里主要探探Agent采集Java类系统的数据比如spring等,以spring cloud的一个简易的分布式系统来讲:
这是一个建议的订单系统,有Eureka
,Order
,Product
,Stock
,当下订单的时候,order会调用product,product又会调用stock。
我们从apm-sniffer工程开始出发(sniffer即嗅探器、探针的意思)
apm-agent工程
我们看到这个工程只有一个类
org.apache.skywalking.apm.agent.SkyWalkingAgent
这个类有一个方法:
/*** 主入口,使用byte-buddy来实现来增强插件里面定义的所有的类。
*/
publicstaticvoidpremain(StringagentArgs, Instrumentationinstrumentation) throwsPluginException, IOException {
}
Agent采集数据
我们这里主要介绍JVM的数据和spring相关的数据
JVM的数据
我们看到在apm-agent-core里面有类:org.apache.skywalking.apm.agent.core.jvm.JVMService
这个类实现了BootService
和java.lang.Runnable
接口,而这个类是怎么执行里面的一些方法的呢?在apm-agent-core
这个工程的/src/main/resources/META-INF/services/org.apache.skywalking.apm.agent.core.boot.BootService
文件里面有很多类的全限定名信息:
org.apache.skywalking.apm.agent.core.remote.TraceSegmentServiceClientorg.apache.skywalking.apm.agent.core.context.ContextManager
org.apache.skywalking.apm.agent.core.sampling.SamplingService
org.apache.skywalking.apm.agent.core.remote.GRPCChannelManager
org.apache.skywalking.apm.agent.core.jvm.JVMService
org.apache.skywalking.apm.agent.core.remote.ServiceAndEndpointRegisterClient
org.apache.skywalking.apm.agent.core.context.ContextManagerExtendService
org.apache.skywalking.apm.agent.core.commands.CommandService
org.apache.skywalking.apm.agent.core.commands.CommandExecutorService
org.apache.skywalking.apm.agent.core.context.OperationNameFormatService
而这每个类都实现了BootService
这个借口,BootService
是所有当插件机制开始起作用时需要启动的远程交换需要实现的接口。 BootService
启动的时候将调用boot
方法。
org.apache.skywalking.apm.agent.core.boot.ServiceManager
这个类里面会将所有实现BootService
的类的实例都执行一遍。
JVMService
类实例化后执行的boot
方法内容如下
@Overridepublicvoidboot() throwsThrowable {
// 创建一个持续收集(生产)指标的单一线程的线程池,这个线程池会定期(每秒)执行,而且执行的是JVMService的run方法
collectMetricFuture=Executors
.newSingleThreadScheduledExecutor(newDefaultNamedThreadFactory("JVMService-produce"))
.scheduleAtFixedRate(newRunnableWithExceptionProtection(this, newRunnableWithExceptionProtection.CallbackWhenException() {
@Overridepublicvoidhandle(Throwablet) {
logger.error("JVMService produces metrics failure.", t);
}
}), 0, 1, TimeUnit.SECONDS);
// 创建一个持续发送(消费)数据的单一线程的线程池,这个线程池会定期(每秒)执行,而且执行的是JVMService的内部类Sender的run方法
sendMetricFuture=Executors
.newSingleThreadScheduledExecutor(newDefaultNamedThreadFactory("JVMService-consume"))
.scheduleAtFixedRate(newRunnableWithExceptionProtection(sender, newRunnableWithExceptionProtection.CallbackWhenException() {
@Overridepublicvoidhandle(Throwablet) {
logger.error("JVMService consumes and upload failure.", t);
}
}
), 0, 1, TimeUnit.SECONDS);
}
JVMService
类的run
方法:
publicvoidrun() {if (RemoteDownstreamConfig.Agent.SERVICE_ID!=DictionaryUtil.nullValue()
&&RemoteDownstreamConfig.Agent.SERVICE_INSTANCE_ID!=DictionaryUtil.nullValue()
) {
longcurrentTimeMillis=System.currentTimeMillis();
try {
JVMMetric.BuilderjvmBuilder=JVMMetric.newBuilder();
jvmBuilder.setTime(currentTimeMillis);
jvmBuilder.setCpu(CPUProvider.INSTANCE.getCpuMetric());
jvmBuilder.addAllMemory(MemoryProvider.INSTANCE.getMemoryMetricList());
jvmBuilder.addAllMemoryPool(MemoryPoolProvider.INSTANCE.getMemoryPoolMetricsList());
jvmBuilder.addAllGc(GCProvider.INSTANCE.getGCList());
// JVM指标数据
JVMMetricjvmMetric=jvmBuilder.build();
// 收集数据后,放到消息队列LinkedBlockingQueue<JVMMetric> queue中
if (!queue.offer(jvmMetric)) {
queue.poll();
queue.offer(jvmMetric);
}
} catch (Exceptione) {
logger.error(e, "Collect JVM info fail.");
}
}
}
内部Sender类的run方法:
@Overridepublicvoidrun() {
if (RemoteDownstreamConfig.Agent.SERVICE_ID!=DictionaryUtil.nullValue()
&&RemoteDownstreamConfig.Agent.SERVICE_INSTANCE_ID!=DictionaryUtil.nullValue()
) {
if (status==GRPCChannelStatus.CONNECTED) {
try {
JVMMetricCollection.Builderbuilder=JVMMetricCollection.newBuilder();
LinkedList<JVMMetric>buffer=newLinkedList<JVMMetric>();
queue.drainTo(buffer);
if (buffer.size() >0) {
builder.addAllMetrics(buffer);
builder.setServiceInstanceId(RemoteDownstreamConfig.Agent.SERVICE_INSTANCE_ID);
// 发送数据并接收返回的结果
Commandscommands=stub.withDeadlineAfter(GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS).collect(builder.build());
ServiceManager.INSTANCE.findService(CommandService.class).receiveCommand(commands);
}
} catch (Throwablet) {
logger.error(t, "send JVM metrics to Collector fail.");
}
}
}
}
而具体数据是怎么发送的呢?我们来看采集的指标类JVMMetric.java
public finalclassJVMMetricextendscom.google.protobuf.GeneratedMessageV3implements
// @@protoc_insertion_point(message_implements:JVMMetric)
JVMMetricOrBuilder {
...
}
其实这个类是JVMMetric.proto编译后生成的,而JVMMetric.proto内容如下:
syntax = "proto3";
optionjava_multiple_files = true;
optionjava_package = "org.apache.skywalking.apm.network.language.agent.v2";
optioncsharp_namespace = "SkyWalking.NetworkProtocol";
import"common/common.proto";
import"common/JVM.proto";
serviceJVMMetricReportService {
// grpc定义的方法,参数类型JVMMetricCollection,返回类型为:Commands
rpccollect (JVMMetricCollection) returns (Commands) {
}
}
messageJVMMetricCollection {
repeatedJVMMetricmetrics = 1;
int32serviceInstanceId = 2;
}
common.proto内容如下:
syntax = "proto3";
optionjava_multiple_files = true;
optionjava_package = "org.apache.skywalking.apm.network.common";
optioncsharp_namespace = "SkyWalking.NetworkProtocol";
messageKeyStringValuePair {
stringkey = 1;
stringvalue = 2;
}
messageKeyIntValuePair {
stringkey = 1;
int32value = 2;
}
messageCPU {
doubleusagePercent = 2;
}
// In most cases, detect point should be `server` or `client`.
// Even in service mesh, this means `server`/`client` side sidecar
// `proxy` is reserved only.
enumDetectPoint {
client = 0;
server = 1;
proxy = 2;
}
messageCommands {
repeatedCommandcommands = 1;
}
messageCommand {
stringcommand = 1;
repeatedKeyStringValuePairargs = 2;
}
enumServiceType {
// An agent works inside the normal business application.
normal = 0;
// An agent works inside the database.
database = 1;
// An agent works inside the MQ.
mq = 2;
// An agent works inside the cache server.
cache = 3;
// An agent works inside the browser.
browser = 4;
}
jvm.proto内容如下:
syntax = "proto3";option java_multiple_files = true;
option java_package = "org.apache.skywalking.apm.network.language.agent";
option csharp_namespace = "SkyWalking.NetworkProtocol";
import "common/common.proto";
message JVMMetric {
int64 time = 1;
CPU cpu = 2;
repeated Memory memory = 3;
repeated MemoryPool memoryPool = 4;
repeated GC gc = 5;
}
message Memory {
bool isHeap = 1;
int64 init = 2;
int64 max = 3;
int64 used = 4;
int64 committed = 5;
}
message MemoryPool {
PoolType type = 1;
int64 init = 2;
int64 max = 3;
int64 used = 4;
int64 commited = 5;
}
enum PoolType {
CODE_CACHE_USAGE = 0;
NEWGEN_USAGE = 1;
OLDGEN_USAGE = 2;
SURVIVOR_USAGE = 3;
PERMGEN_USAGE = 4;
METASPACE_USAGE = 5;
}
message GC {
GCPhrase phrase = 1;
int64 count = 2;
int64 time = 3;
}
enum GCPhrase {
NEW = 0;
OLD = 1;
}
而服务接收端,即collector是怎么接收的呢?
接收端有一个类JVMMetricsServiceHandler
专门用来处理JVM的监控数据,这个类的collect
方法如下:
@Overridepublicvoidcollect(JVMMetricsrequest, StreamObserver<Downstream>responseObserver) {intserviceInstanceId=request.getApplicationInstanceId();
if (logger.isDebugEnabled()) {
logger.debug("receive the jvm metrics from service instance, id: {}", serviceInstanceId);
}
// 处理数据,jvmSourceDispatcher发送到下一环节处理
request.getMetricsList().forEach(metrics-> {
longminuteTimeBucket=TimeBucket.getMinuteTimeBucket(metrics.getTime());
jvmSourceDispatcher.sendMetric(serviceInstanceId, minuteTimeBucket, metrics);
});
responseObserver.onNext(Downstream.newBuilder().build());
responseObserver.onCompleted();
}
然后我们看一下JVMSourceDispatcher的sendMetric方法
voidsendMetric(intserviceInstanceId, longminuteTimeBucket, JVMMetricmetrics) {ServiceInstanceInventoryserviceInstanceInventory=instanceInventoryCache.get(serviceInstanceId);
intserviceId;
if (Objects.nonNull(serviceInstanceInventory)) {
serviceId=serviceInstanceInventory.getServiceId();
} else {
logger.warn("Can"t find service by service instance id from cache, service instance id is: {}", serviceInstanceId);
return;
}
this.sendToCpuMetricProcess(serviceId, serviceInstanceId, minuteTimeBucket, metrics.getCpu());
this.sendToMemoryMetricProcess(serviceId, serviceInstanceId, minuteTimeBucket, metrics.getMemoryList());
this.sendToMemoryPoolMetricProcess(serviceId, serviceInstanceId, minuteTimeBucket, metrics.getMemoryPoolList());
this.sendToGCMetricProcess(serviceId, serviceInstanceId, minuteTimeBucket, metrics.getGcList());
}
然后我们看sendTopCpuMetricProcess方法
privatevoidsendToCpuMetricProcess(intserviceId, intserviceInstanceId, longtimeBucket, CPUcpu) {ServiceInstanceJVMCPUserviceInstanceJVMCPU=newServiceInstanceJVMCPU();
serviceInstanceJVMCPU.setId(serviceInstanceId);
serviceInstanceJVMCPU.setName(Const.EMPTY_STRING);
serviceInstanceJVMCPU.setServiceId(serviceId);
serviceInstanceJVMCPU.setServiceName(Const.EMPTY_STRING);
serviceInstanceJVMCPU.setUsePercent(cpu.getUsagePercent());
serviceInstanceJVMCPU.setTimeBucket(timeBucket);
sourceReceiver.receive(serviceInstanceJVMCPU);
}
SourceReceiver
的receive
来接收数据
然而SourceReceiver
是一个接口
publicinterfaceSourceReceiverextendsService {voidreceive(Sourcesource);
}
这个接口只有一个实现类
publicclassSourceReceiverImplimplementsSourceReceiver {}
receive的实现:
@Overridepublicvoidreceive(Sourcesource) {dispatcherManager.forward(source);
}
我们看到又调用了DispatcherManager
的forward
方法
publicvoidforward(Sourcesource) {if (source==null) {
return;
}
List<SourceDispatcher>dispatchers=dispatcherMap.get(source.scope());
/**
* Dispatcher is only generated by oal script analysis result.
* So these will/could be possible, the given source doesn"t have the dispatcher,
* when the receiver is open, and oal script doesn"t ask for analysis.
*/
if (dispatchers!=null) {
for (SourceDispatcherdispatcher : dispatchers) {
dispatcher.dispatch(source);
}
}
}
然后会调用SourceDispatcher
的dispatch
方法
而我们看到有这么多类实现了SourceDispatcher
接口,具体是那个方法实现了呢?我们可以打日志也可以简单分析一下,首先可以排除的书EndpointCallRElationDispatcher
、HttpAccessLogDispatcher
、
JaegerSpanRecordDispatcher
、ServiceCallRelationDispatcher
、ServiceInstanceCallRelationDispatcher
、ZipkinSpanRecordDispatcher
这几个类也就是说我们可以重点关注
DatabaseStatementDispatcher和SegmentDispatcher这两个类,而这个DatabaseStatementDispatcher
并没有被使用,所以我们可以重点分析SegmentDispatcher
这个类
publicclassSegmentDispatcherimplementsSourceDispatcher<Segment> {
@Overridepublicvoiddispatch(Segmentsource) {
SegmentRecordsegment=newSegmentRecord();
segment.setSegmentId(source.getSegmentId());
segment.setTraceId(source.getTraceId());
segment.setServiceId(source.getServiceId());
segment.setServiceInstanceId(source.getServiceInstanceId());
segment.setEndpointName(source.getEndpointName());
segment.setEndpointId(source.getEndpointId());
segment.setStartTime(source.getStartTime());
segment.setEndTime(source.getEndTime());
segment.setLatency(source.getLatency());
segment.setIsError(source.getIsError());
segment.setDataBinary(source.getDataBinary());
segment.setTimeBucket(source.getTimeBucket());
segment.setVersion(source.getVersion());
// 构造SegmentRecord对象,然后RecordStreamProcessor的in方法去处理(消费)segment信息
RecordStreamProcessor.getInstance().in(segment);
}
}
然后我们看一下RecordStreamProcessor
的in方法
publicvoidin(Recordrecord) {RecordPersistentWorkerworker=workers.get(record.getClass());
if (worker!=null) {
worker.in(record);
}
}
然后是RecordPersistentWorker
的in方法
@Overridepublicvoidin(Recordrecord) {try {
InsertRequestinsertRequest=recordDAO.prepareBatchInsert(model, record);
batchDAO.asynchronous(insertRequest);
} catch (IOExceptione) {
logger.error(e.getMessage(), e);
}
}
到此我们能够看到持久化到数据库的操作(调用es或者h2的相关接口实现)
整个过程我们看到JVM的数据是在agent发送到collector后立马就持久化存储了(h2或者es)。
插件源码分析
关于插件开发,我们可以参考Java-Plugin-Development-Guide.md这篇文档,或者我翻译过来后的中文文档,接下来我们看看spring框架的数据是如何采集的,在apm-sniffer/apm-sdk-plugin
目录下,有个字项目spring-plugins
里面放的都是spring相关的插件用来实现spring框架的数据采集
我们以mvc-annotation-4.x-plugin
项目为例来看,skywalking的插件是如何开发的。
我们可以看到resources目录的文件src/main/resources/skywalking-plugin.def
这个skywalking-plugin.def就是用来定义插件的。
spring-mvc-annotation-4.x=org.apache.skywalking.apm.plugin.spring.mvc.v4.define.ControllerInstrumentationspring-mvc-annotation-4.x=org.apache.skywalking.apm.plugin.spring.mvc.v4.define.RestControllerInstrumentation
spring-mvc-annotation-4.x=org.apache.skywalking.apm.plugin.spring.mvc.v4.define.HandlerMethodInstrumentation
spring-mvc-annotation-4.x=org.apache.skywalking.apm.plugin.spring.mvc.v4.define.InvocableHandlerInstrumentation
spring-mvc-annotation-4.x=org.apache.skywalking.apm.plugin.spring.mvc.v4.define.ControllerForLowVersionInstrumentation
spring-mvc-annotation-4.x=org.apache.skywalking.apm.plugin.spring.mvc.v4.define.RestControllerForLowVersionInstrumentation
这个里面定义了一下几个插件
ControllerInstrumentation
RestControllerInstrumentation
HandlerMethodInstrumentation
InvocableHandlerInstrumentation
ControllerForLowVersionInstrumentation
RestControllerForLowVersionInstrumentation
我们根据plugin的开发流程来分析代码,应该是有一个类定义拦截机制,另外一个类是增强机制。我们先看一下类的结构图:
我们看到AbstractClassEnhancePluginDefine
、ClassEnhancePluginDefine
、ClassInstanceMethodsEnhancePluginDefine
都是skywalking提供的基础类,而这个插件里面的类增强都是继承这些父类的。
我们先来看org.apache.skywalking.apm.plugin.spring.mvc.v4.define.ControllerInstrumentation
,我们先看抽象类AbstractSpring4Instrumentation
的内容:
publicabstractclassAbstractSpring4InstrumentationextendsClassInstanceMethodsEnhancePluginDefine {// 这块个人感觉应该是witness_class写错了,
publicstaticfinalStringWITHNESS_CLASSES="org.springframework.cache.interceptor.SimpleKey";
@Override
protectedString[] witnessClasses() {
returnnewString[] {WITHNESS_CLASSES, "org.springframework.cache.interceptor.DefaultKeyGenerator"};
}
}
然后它的子类:
/*** ControllerInstrumentation 增强所有有RequestMapping注解和Controller注解的类的构造函数和方法
* ControllerConstructorInterceptor 在执行构造函数之前将controller的base path(路径)放到动态
* 字段里面
* RequestMappingMethodInterceptor先从动态字段里面获取request path,如果没找到
* RequestMappingMethodInterceptor会结合路径和当前方法上面的注解和base path将新的路径放到动态
* 字段里面
* @author zhangxin
*/
publicabstractclassAbstractControllerInstrumentationextendsAbstractSpring4Instrumentation {
// 构造函数拦截点
@Override
publicConstructorInterceptPoint[] getConstructorsInterceptPoints() {
returnnewConstructorInterceptPoint[] {
newConstructorInterceptPoint() {
// 匹配方式,这里是返回了一个any()即总是匹配
@Override
publicElementMatcher<MethodDescription>getConstructorMatcher() {
returnany();
}
// 拦截器类
@Override
publicStringgetConstructorInterceptor() {
return"org.apache.skywalking.apm.plugin.spring.mvc.v4.ControllerConstructorInterceptor";
}
}
};
}
// 实例方法拦截点,返回了一个数组,一个是针对@RequestMapping这种类型的注解,一个是针对
// @GetMapping、@PostMapping、@PutMapping、@DeleteMapping、@PatchMapping这些类型的注解
@Override
publicInstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
returnnewInstanceMethodsInterceptPoint[] {
newDeclaredInstanceMethodsInterceptPoint() {
// 所有有RequestMapping这个注解的
@Override
publicElementMatcher<MethodDescription>getMethodsMatcher() {
returnisAnnotatedWith(named("org.springframework.web.bind.annotation.RequestMapping"));
}
// RequestMappingMethodInterceptor
@Override
publicStringgetMethodsInterceptor() {
returnConstants.REQUEST_MAPPING_METHOD_INTERCEPTOR;
}
@Override
publicbooleanisOverrideArgs() {
returnfalse;
}
},
newDeclaredInstanceMethodsInterceptPoint() {
@Override
publicElementMatcher<MethodDescription>getMethodsMatcher() {
returnisAnnotatedWith(named("org.springframework.web.bind.annotation.GetMapping"))
.or(isAnnotatedWith(named("org.springframework.web.bind.annotation.PostMapping")))
.or(isAnnotatedWith(named("org.springframework.web.bind.annotation.PutMapping")))
.or(isAnnotatedWith(named("org.springframework.web.bind.annotation.DeleteMapping")))
.or(isAnnotatedWith(named("org.springframework.web.bind.annotation.PatchMapping")));
}
// RestMappingMethodInterceptor
@Override
publicStringgetMethodsInterceptor() {
returnConstants.REST_MAPPING_METHOD_INTERCEPTOR;
}
@Override
publicbooleanisOverrideArgs() {
returnfalse;
}
}
};
}
// 需要增强的类的匹配方式
@Override
protectedClassMatchenhanceClass() {
// 抽象类不定义具体匹配方式,而是交给子类,让子类去实现getEnhanceAnnotations方法。
returnClassAnnotationMatch.byClassAnnotationMatch(getEnhanceAnnotations());
}
protectedabstractString[] getEnhanceAnnotations();
}
AbstractControllerInstrumentation
这个类并没有定义确定的类的匹配 然后是ControllerInstrumentation
publicclassControllerInstrumentationextendsAbstractControllerInstrumentation {
publicstaticfinalStringENHANCE_ANNOTATION="org.springframework.stereotype.Controller";
// 匹配所有有@Controller注解的类
@OverrideprotectedString[] getEnhanceAnnotations() {
returnnewString[] {ENHANCE_ANNOTATION};
}
}
接下来我们来看构造函数的拦截器类ControllerConstructorInterceptor
/*** The <code>ControllerConstructorInterceptor</code> intercepts the Controller"s constructor, in order to acquire the
* mapping annotation, if exist.
*
* But, you can see we only use the first mapping value, <B>Why?</B>
*
* Right now, we intercept the controller by annotation as you known, so we CAN"T know which uri patten is actually
* matched. Even we know, that costs a lot.
*
* If we want to resolve that, we must intercept the Spring MVC core codes, that is not a good choice for now.
*
* Comment by @wu-sheng
*/
publicclassControllerConstructorInterceptorimplementsInstanceConstructorInterceptor {
@Override
publicvoidonConstruct(EnhancedInstanceobjInst, Object[] allArguments) {
StringbasePath="";
// 获取@RequestMapping的信息,其实主要是想要获取到路径信息
RequestMappingbasePathRequestMapping=objInst.getClass().getAnnotation(RequestMapping.class);
if (basePathRequestMapping!=null) {
if (basePathRequestMapping.value().length>0) {
basePath=basePathRequestMapping.value()[0];
} elseif (basePathRequestMapping.path().length>0) {
basePath=basePathRequestMapping.path()[0];
}
}
EnhanceRequireObjectCacheenhanceRequireObjectCache=newEnhanceRequireObjectCache();
enhanceRequireObjectCache.setPathMappingCache(newPathMappingCache(basePath));
objInst.setSkyWalkingDynamicField(enhanceRequireObjectCache);
}
}
然后我们看到这个插件里面只是定义了需要增强的类的匹配形式,并没有具体的创建EntrySpan
,ExitSpan
的处理逻辑。其实这块处理逻辑是在AbstractControllerInstrumentation
方法拦截定义设置好具体由哪个类来处理的主要是两个类:RequestMappingMethodInterceptor
,RestMappingMethodInterceptor
。
一个是针对@RequestMapping
这种注解的,一个是针对@GetMapping
这类注解的。其实@GetMapping
也是又@RequestMapping
而来的。GetMapping
本身就用了@RequestMapping
,相当于是指定method
的@RequestMapping
。
@Target(ElementType.METHOD)@Retention(RetentionPolicy.RUNTIME)
@Documented
@RequestMapping(method=RequestMethod.GET)
public@interfaceGetMapping {
}
RequestMappingMethodInterceptor
,RestMappingMethodInterceptor
继承了同一个父类:
AbstractMethodInterceptor
。这两个类本身只重写了父类的两个方法:
publicabstractStringgetRequestURL(Methodmethod);
publicabstractStringgetAcceptedMethodTypes(Methodmethod);
所以我们重点关注父类(AbstractMethodInterceptor
)的两个方法:
beforeMethod (方法调用前的逻辑)
afterMethod(方法调用后的逻辑)
@Override
publicvoidbeforeMethod(EnhancedInstanceobjInst, Methodmethod, Object[] allArguments, Class<?>[] argumentsTypes,
MethodInterceptResultresult) throwsThrowable {
// forwardRequestFlag
BooleanforwardRequestFlag= (Boolean)ContextManager.getRuntimeContext().get(FORWARD_REQUEST_FLAG);
// 如果是forwardRequest就直接返回
/**
* Spring MVC plugin do nothing if current request is forward request.
* Ref: https://github.com/apache/skywalking/pull/1325
*/
if (forwardRequestFlag!=null&&forwardRequestFlag) {
return;
}
StringoperationName;
if (Config.Plugin.SpringMVC.USE_QUALIFIED_NAME_AS_ENDPOINT_NAME) {
operationName=MethodUtil.generateOperationName(method);
} else {
EnhanceRequireObjectCachepathMappingCache= (EnhanceRequireObjectCache)objInst.getSkyWalkingDynamicField();
StringrequestURL=pathMappingCache.findPathMapping(method);
if (requestURL==null) {
requestURL=getRequestURL(method);
pathMappingCache.addPathMapping(method, requestURL);
requestURL=getAcceptedMethodTypes(method) +pathMappingCache.findPathMapping(method);
}
operationName=requestURL;
}
// 设置operationName为requestURL
// 获取HttpServletRequest
HttpServletRequestrequest= (HttpServletRequest)ContextManager.getRuntimeContext().get(REQUEST_KEY_IN_RUNTIME_CONTEXT);
if (request!=null) {
// 拿到StackDepth
StackDepthstackDepth= (StackDepth)ContextManager.getRuntimeContext().get(CONTROLLER_METHOD_STACK_DEPTH);
if (stackDepth==null) {
// new一个ContextCarrier
ContextCarriercontextCarrier=newContextCarrier();
CarrierItemnext=contextCarrier.items();
while (next.hasNext()) {
next=next.next();
next.setHeadValue(request.getHeader(next.getHeadKey()));
}
// 创建EntrySpan
AbstractSpanspan=ContextManager.createEntrySpan(operationName, contextCarrier);
Tags.URL.set(span, request.getRequestURL().toString());
Tags.HTTP.METHOD.set(span, request.getMethod());
span.setComponent(ComponentsDefine.SPRING_MVC_ANNOTATION);
SpanLayer.asHttp(span);
if (Config.Plugin.SpringMVC.COLLECT_HTTP_PARAMS) {
finalMap<String, String[]>parameterMap=request.getParameterMap();
if (parameterMap!=null&&!parameterMap.isEmpty()) {
StringtagValue=CollectionUtil.toString(parameterMap);
tagValue=Config.Plugin.Http.HTTP_PARAMS_LENGTH_THRESHOLD>0
?StringUtil.cut(tagValue, Config.Plugin.Http.HTTP_PARAMS_LENGTH_THRESHOLD)
: tagValue;
Tags.HTTP.PARAMS.set(span, tagValue);
}
}
stackDepth=newStackDepth();
ContextManager.getRuntimeContext().put(CONTROLLER_METHOD_STACK_DEPTH, stackDepth);
} else {
AbstractSpanspan=
ContextManager.createLocalSpan(buildOperationName(objInst, method));
span.setComponent(ComponentsDefine.SPRING_MVC_ANNOTATION);
}
stackDepth.increment();
}
}
privateStringbuildOperationName(Objectinvoker, Methodmethod) {
StringBuilderoperationName=newStringBuilder(invoker.getClass().getName())
.append(".").append(method.getName()).append("(");
for (Class<?>type : method.getParameterTypes()) {
operationName.append(type.getName()).append(",");
}
if (method.getParameterTypes().length>0) {
operationName=operationName.deleteCharAt(operationName.length() -1);
}
returnoperationName.append(")").toString();
}
afterMethod
@OverridepublicObjectafterMethod(EnhancedInstanceobjInst, Methodmethod, Object[] allArguments, Class<?>[] argumentsTypes,
Objectret) throwsThrowable {
BooleanforwardRequestFlag= (Boolean)ContextManager.getRuntimeContext().get(FORWARD_REQUEST_FLAG);
/**
* Spring MVC plugin do nothing if current request is forward request.
* Ref: https://github.com/apache/skywalking/pull/1325
*/
if (forwardRequestFlag!=null&&forwardRequestFlag) {
returnret;
}
HttpServletRequestrequest= (HttpServletRequest)ContextManager.getRuntimeContext().get(REQUEST_KEY_IN_RUNTIME_CONTEXT);
if (request!=null) {
StackDepthstackDepth= (StackDepth)ContextManager.getRuntimeContext().get(CONTROLLER_METHOD_STACK_DEPTH);
if (stackDepth==null) {
thrownewIllegalMethodStackDepthException();
} else {
stackDepth.decrement();
}
// 获取当前的span
AbstractSpanspan=ContextManager.activeSpan();
if (stackDepth.depth() ==0) {
HttpServletResponseresponse= (HttpServletResponse)ContextManager.getRuntimeContext().get(RESPONSE_KEY_IN_RUNTIME_CONTEXT);
if (response==null) {
thrownewServletResponseNotFoundException();
}
if (IS_SERVLET_GET_STATUS_METHOD_EXIST&&response.getStatus() >=400) {
span.errorOccurred();
Tags.STATUS_CODE.set(span, Integer.toString(response.getStatus()));
}
// 清楚一些上下文信息
ContextManager.getRuntimeContext().remove(REQUEST_KEY_IN_RUNTIME_CONTEXT);
ContextManager.getRuntimeContext().remove(RESPONSE_KEY_IN_RUNTIME_CONTEXT);
ContextManager.getRuntimeContext().remove(CONTROLLER_METHOD_STACK_DEPTH);
}
// 停止span
ContextManager.stopSpan();
}
returnret;
}
以上是 Skywalking数据采集与收集源码分析 的全部内容, 来源链接: utcz.com/z/513538.html