聊聊skywalking的springcloudgatewayplugin

编程

NettyRoutingFilterInstrumentation

skywalking-6.6.0/apm-sniffer/optional-plugins/optional-spring-plugins/optional-spring-cloud/gateway-2.1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/cloud/gateway/v21x/define/NettyRoutingFilterInstrumentation.java

/**
 * Plugin definition for {@code org.springframework.cloud.gateway.filter.NettyRoutingFilter}.
 *
 * <p>It declares one instance-method intercept point: methods named {@code filter} whose first
 * argument is a {@code ServerWebExchange} are routed to {@code NettyRoutingFilterInterceptor}.
 * No constructors are intercepted.
 */
public class NettyRoutingFilterInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {

    /** Fully-qualified name of the class to enhance. */
    private static final String ENHANCE_CLASS = "org.springframework.cloud.gateway.filter.NettyRoutingFilter";

    /** Fully-qualified name of the interceptor applied to the matched method. */
    private static final String INTERCEPTOR_CLASS =
        "org.apache.skywalking.apm.plugin.spring.cloud.gateway.v21x.NettyRoutingFilterInterceptor";

    /** Name of the intercepted method. */
    private static final String FILTER_METHOD = "filter";

    /** Required type of the intercepted method's first argument. */
    private static final String EXCHANGE_TYPE = "org.springframework.web.server.ServerWebExchange";

    private static final String WITNESS_FILTERING_WEB_HANDLER =
        "org.springframework.cloud.gateway.handler.FilteringWebHandler";

    private static final String WITNESS_HTTP_CLIENT_OPERATIONS = "reactor.netty.http.client.HttpClientOperations";

    /**
     * @return an empty array; this definition intercepts no constructors
     */
    @Override
    public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
        return new ConstructorInterceptPoint[] {};
    }

    /**
     * @return the single intercept point describing the {@code filter(ServerWebExchange, ...)} method
     */
    @Override
    public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
        InstanceMethodsInterceptPoint filterPoint = new FilterMethodInterceptPoint();
        return new InstanceMethodsInterceptPoint[] {filterPoint};
    }

    /**
     * @return a by-name matcher for {@code NettyRoutingFilter}
     */
    @Override
    public ClassMatch enhanceClass() {
        return byName(ENHANCE_CLASS);
    }

    /**
     * @return the class names this definition declares as witnesses; presumably the agent checks
     *         for their presence before applying the plugin (confirm against the agent core)
     */
    @Override
    protected final String[] witnessClasses() {
        // A fresh array per call, as before, so callers can never mutate shared state.
        return new String[] {WITNESS_FILTERING_WEB_HANDLER, WITNESS_HTTP_CLIENT_OPERATIONS};
    }

    /** Intercept point for {@code filter} methods taking a {@code ServerWebExchange} first. */
    private static final class FilterMethodInterceptPoint implements InstanceMethodsInterceptPoint {

        @Override
        public ElementMatcher<MethodDescription> getMethodsMatcher() {
            return named(FILTER_METHOD).and(takesArgumentWithType(0, EXCHANGE_TYPE));
        }

        @Override
        public String getMethodsInterceptor() {
            return INTERCEPTOR_CLASS;
        }

        @Override
        public boolean isOverrideArgs() {
            // The interceptor only reads the arguments.
            return false;
        }
    }
}

  • NettyRoutingFilterInstrumentation继承了ClassInstanceMethodsEnhancePluginDefine,它使用org.apache.skywalking.apm.plugin.spring.cloud.gateway.v21x.NettyRoutingFilterInterceptor拦截org.springframework.cloud.gateway.filter.NettyRoutingFilter中第一个参数为org.springframework.web.server.ServerWebExchange的filter方法

NettyRoutingFilterInterceptor

skywalking-6.6.0/apm-sniffer/optional-plugins/optional-spring-plugins/optional-spring-cloud/gateway-2.1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/cloud/gateway/v21x/NettyRoutingFilterInterceptor.java

/**
 * Interceptor for {@code NettyRoutingFilter#filter}.
 *
 * <p>Before the method runs, the {@link SWTransmitter} stored on the exchange's enhanced instance
 * is published into the runtime context under {@code Constants.SPRING_CLOUD_GATEWAY_TRANSMITTER};
 * after the method returns, that entry is removed again if it is still present.
 */
public class NettyRoutingFilterInterceptor implements InstanceMethodsAroundInterceptor {

    /**
     * Copies the dynamic field of the exchange (first argument) into the runtime context.
     * Does nothing when no enhanced instance can be resolved from the argument.
     */
    @Override
    public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
        MethodInterceptResult result) throws Throwable {
        EnhancedInstance exchangeInstance = NettyRoutingFilterInterceptor.getInstance(allArguments[0]);
        if (exchangeInstance == null) {
            return;
        }
        SWTransmitter transmitter = (SWTransmitter) exchangeInstance.getSkyWalkingDynamicField();
        ContextManager.getRuntimeContext().put(Constants.SPRING_CLOUD_GATEWAY_TRANSMITTER, transmitter);
    }

    /**
     * Removes the transmitter entry from the runtime context when one is still registered.
     *
     * @return {@code ret}, unchanged
     */
    @Override
    public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments,
        Class<?>[] argumentsTypes, Object ret) throws Throwable {
        boolean stillRegistered =
            ContextManager.getRuntimeContext().get(Constants.SPRING_CLOUD_GATEWAY_TRANSMITTER) != null;
        if (stillRegistered) {
            ContextManager.getRuntimeContext().remove(Constants.SPRING_CLOUD_GATEWAY_TRANSMITTER);
        }
        return ret;
    }

    /** No exception handling is performed for the intercepted method. */
    @Override
    public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
        Class<?>[] argumentsTypes, Throwable t) {
    }

    /**
     * Resolves the enhanced {@code DefaultServerWebExchange} behind {@code o}.
     *
     * @param o an exchange, possibly wrapped in one or more {@code ServerWebExchangeDecorator}s
     * @return the enhanced instance, or {@code null} when {@code o} is neither a decorator nor a
     *         {@code DefaultServerWebExchange}
     * @throws NullPointerException if a decorator's delegate is {@code null}
     * @throws RuntimeException     if a decorator ultimately wraps an unexpected type
     */
    public static EnhancedInstance getInstance(Object o) {
        if (o instanceof ServerWebExchangeDecorator) {
            return getEnhancedInstance((ServerWebExchangeDecorator) o);
        }
        if (o instanceof DefaultServerWebExchange) {
            return (EnhancedInstance) o;
        }
        return null;
    }

    /**
     * Walks the decorator chain until a non-decorator delegate is reached and returns it as an
     * enhanced instance.
     */
    private static EnhancedInstance getEnhancedInstance(ServerWebExchangeDecorator serverWebExchangeDecorator) {
        Object delegate = serverWebExchangeDecorator.getDelegate();
        // Peel off nested decorators one layer at a time.
        while (delegate instanceof ServerWebExchangeDecorator) {
            delegate = ((ServerWebExchangeDecorator) delegate).getDelegate();
        }
        if (delegate instanceof DefaultServerWebExchange) {
            return (EnhancedInstance) delegate;
        }
        if (delegate == null) {
            throw new NullPointerException("The expected class DefaultServerWebExchange is null");
        }
        throw new RuntimeException("Unknown parameter types:" + delegate.getClass());
    }
}

  • NettyRoutingFilterInterceptor实现了InstanceMethodsAroundInterceptor接口,其beforeMethod方法获取swTransmitter,然后以名为Constants.SPRING_CLOUD_GATEWAY_TRANSMITTER的key放入到ContextManager.getRuntimeContext();其afterMethod方法执行ContextManager.getRuntimeContext().remove(Constants.SPRING_CLOUD_GATEWAY_TRANSMITTER)

HttpClientOperationsInstrumentation

skywalking-6.6.0/apm-sniffer/optional-plugins/optional-spring-plugins/optional-spring-cloud/gateway-2.1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/cloud/gateway/v21x/define/HttpClientOperationsInstrumentation.java

/**
 * Plugin definition for {@code reactor.netty.http.client.HttpClientOperations}.
 *
 * <p>Three instance methods are intercepted, each with its own interceptor:
 * <ul>
 *   <li>{@code headers} with an {@code io.netty.handler.codec.http.HttpHeaders} first argument,</li>
 *   <li>{@code send} with an {@code org.reactivestreams.Publisher} first argument,</li>
 *   <li>any method named {@code status}.</li>
 * </ul>
 * No constructors are intercepted.
 */
public class HttpClientOperationsInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {

    private static final String ENHANCE_CLASS = "reactor.netty.http.client.HttpClientOperations";

    private static final String HEADERS_INTERCEPTOR =
        "org.apache.skywalking.apm.plugin.spring.cloud.gateway.v21x.HttpClientOperationsHeadersInterceptor";

    private static final String SEND_INTERCEPTOR =
        "org.apache.skywalking.apm.plugin.spring.cloud.gateway.v21x.HttpClientOperationsSendInterceptor";

    private static final String STATUS_INTERCEPTOR =
        "org.apache.skywalking.apm.plugin.spring.cloud.gateway.v21x.HttpClientOperationsStatusInterceptor";

    /**
     * @return an empty array; this definition intercepts no constructors
     */
    @Override
    public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
        return new ConstructorInterceptPoint[] {};
    }

    /**
     * @return the intercept points for {@code headers}, {@code send} and {@code status}, in that order
     */
    @Override
    public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
        return new InstanceMethodsInterceptPoint[] {
            interceptPoint(
                named("headers").and(takesArgumentWithType(0, "io.netty.handler.codec.http.HttpHeaders")),
                HEADERS_INTERCEPTOR),
            interceptPoint(
                named("send").and(takesArgumentWithType(0, "org.reactivestreams.Publisher")),
                SEND_INTERCEPTOR),
            interceptPoint(named("status"), STATUS_INTERCEPTOR)
        };
    }

    /**
     * @return a by-name matcher for {@code HttpClientOperations}
     */
    @Override
    public ClassMatch enhanceClass() {
        return byName(ENHANCE_CLASS);
    }

    /**
     * Builds an intercept point that pairs a method matcher with an interceptor class name and
     * never overrides the intercepted method's arguments.
     */
    private static InstanceMethodsInterceptPoint interceptPoint(final ElementMatcher<MethodDescription> matcher,
        final String interceptorClass) {
        return new InstanceMethodsInterceptPoint() {
            @Override
            public ElementMatcher<MethodDescription> getMethodsMatcher() {
                return matcher;
            }

            @Override
            public String getMethodsInterceptor() {
                return interceptorClass;
            }

            @Override
            public boolean isOverrideArgs() {
                return false;
            }
        };
    }
}

  • HttpClientOperationsInstrumentation继承了ClassInstanceMethodsEnhancePluginDefine
  • 它使用org.apache.skywalking.apm.plugin.spring.cloud.gateway.v21x.HttpClientOperationsHeadersInterceptor拦截reactor.netty.http.client.HttpClientOperations的带有io.netty.handler.codec.http.HttpHeaders参数的headers方法
  • 它还使用org.apache.skywalking.apm.plugin.spring.cloud.gateway.v21x.HttpClientOperationsSendInterceptor拦截reactor.netty.http.client.HttpClientOperations的带有org.reactivestreams.Publisher参数的send方法
  • 它还使用org.apache.skywalking.apm.plugin.spring.cloud.gateway.v21x.HttpClientOperationsStatusInterceptor拦截reactor.netty.http.client.HttpClientOperations的status方法

HttpClientOperationsHeadersInterceptor

skywalking-6.6.0/apm-sniffer/optional-plugins/optional-spring-plugins/optional-spring-cloud/gateway-2.1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/cloud/gateway/v21x/HttpClientOperationsHeadersInterceptor.java

/**
 * Interceptor for {@code HttpClientOperations#headers(HttpHeaders)}.
 *
 * <p>After the method returns, whatever is stored in the dynamic field of the headers argument is
 * moved onto the intercepted {@code HttpClientOperations} instance, and the field on the headers
 * object is cleared.
 */
public class HttpClientOperationsHeadersInterceptor implements InstanceMethodsAroundInterceptor {

    private static final ILog logger = LogManager.getLogger(HttpClientOperationsHeadersInterceptor.class);

    /** Nothing needs to happen before {@code headers(...)} runs. */
    @Override
    public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
        MethodInterceptResult result) throws Throwable {
    }

    /**
     * Transfers the dynamic field from the headers argument to {@code objInst}.
     *
     * <p>The method matcher accepts any {@code io.netty.handler.codec.http.HttpHeaders}, but not
     * every implementation is an {@link EnhancedInstance}. Arguments that are not enhanced are
     * skipped instead of failing with a {@link ClassCastException}.
     *
     * @return {@code ret}, unchanged
     */
    @Override
    public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments,
        Class<?>[] argumentsTypes, Object ret) throws Throwable {
        if (allArguments == null || allArguments.length == 0 || !(allArguments[0] instanceof EnhancedInstance)) {
            return ret;
        }

        EnhancedInstance headers = (EnhancedInstance) allArguments[0];
        Object transmitter = headers.getSkyWalkingDynamicField();
        if (transmitter != null) {
            objInst.setSkyWalkingDynamicField(transmitter);
            // Hand-over is one-shot: clear the source so the same value is not transferred twice.
            headers.setSkyWalkingDynamicField(null);
        }
        return ret;
    }

    /** No exception handling is performed for the intercepted method. */
    @Override
    public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
        Class<?>[] argumentsTypes, Throwable t) {
    }
}

  • HttpClientOperationsHeadersInterceptor实现了InstanceMethodsAroundInterceptor接口,其afterMethod方法执行objInst.setSkyWalkingDynamicField(transmitter)

HttpClientOperationsSendInterceptor

skywalking-6.6.0/apm-sniffer/optional-plugins/optional-spring-plugins/optional-spring-cloud/gateway-2.1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/cloud/gateway/v21x/HttpClientOperationsSendInterceptor.java

/**
 * Interceptor for {@code HttpClientOperations#send(Publisher)}.
 *
 * <p>When the intercepted instance carries an {@link SWTransmitter}, an exit span is created before
 * {@code send} runs, the trace context is injected into the outgoing request headers, and the span
 * is switched to async mode and stored on the transmitter before being stopped on this thread.
 */
public class HttpClientOperationsSendInterceptor implements InstanceMethodsAroundInterceptor {

    /**
     * Creates the exit span for the outgoing request and propagates the trace context through the
     * request headers. Does nothing when no transmitter is attached to {@code objInst}.
     */
    @Override
    public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
        MethodInterceptResult result) throws Throwable {
        SWTransmitter transmitter = (SWTransmitter) objInst.getSkyWalkingDynamicField();
        if (transmitter == null) {
            return;
        }

        HttpClientRequest request = (HttpClientRequest) objInst;
        HttpHeaders header = request.requestHeaders();
        ChannelOperations channelOpt = (ChannelOperations) objInst;
        InetSocketAddress remote = (InetSocketAddress) (channelOpt.channel().remoteAddress());
        // NOTE(review): getHostName() may trigger a reverse name lookup when the address was built
        // from an IP literal; confirm this is acceptable on the calling thread.
        String peer = remote.getHostName() + ":" + remote.getPort();

        AbstractSpan span = ContextManager.createExitSpan(transmitter.getOperationName(), peer);
        ContextManager.continued(transmitter.getSnapshot());
        ContextCarrier contextCarrier = new ContextCarrier();
        ContextManager.inject(contextCarrier);

        span.setComponent(ComponentsDefine.SPRING_CLOUD_GATEWAY);
        Tags.URL.set(span, peer + request.uri());
        Tags.HTTP.METHOD.set(span, request.method().name());
        SpanLayer.asHttp(span);

        // Copy every carrier item into the outgoing request headers.
        CarrierItem next = contextCarrier.items();
        while (next.hasNext()) {
            next = next.next();
            header.set(next.getHeadKey(), next.getHeadValue());
        }

        // The span is put into async mode and kept on the transmitter before it is stopped here.
        transmitter.setSpanGateway(span.prepareForAsync());
        ContextManager.stopSpan(span);
    }

    /**
     * @return {@code ret}, unchanged
     */
    @Override
    public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments,
        Class<?>[] argumentsTypes, Object ret) throws Throwable {
        return ret;
    }

    /**
     * Records the failure on the exit span created in {@link #beforeMethod}.
     *
     * <p>That span has already been stopped by the time {@code send} can throw, so it is no longer
     * the active span. The error is therefore logged on the async span held by the transmitter
     * rather than on {@code ContextManager.activeSpan()}. Nothing is recorded when no transmitter
     * or span is available.
     */
    @Override
    public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
        Class<?>[] argumentsTypes, Throwable t) {
        SWTransmitter transmitter = (SWTransmitter) objInst.getSkyWalkingDynamicField();
        if (transmitter == null) {
            return;
        }
        AbstractSpan span = transmitter.getSpanGateway();
        if (span != null) {
            span.errorOccurred().log(t);
        }
    }
}

  • HttpClientOperationsSendInterceptor实现了InstanceMethodsAroundInterceptor接口,其beforeMethod方法获取request header、uri、method等信息设置到span中,最后执行ContextManager.stopSpan(span);其handleMethodException方法执行ContextManager.activeSpan().errorOccurred().log(t)

HttpClientOperationsStatusInterceptor

skywalking-6.6.0/apm-sniffer/optional-plugins/optional-spring-plugins/optional-spring-cloud/gateway-2.1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/cloud/gateway/v21x/HttpClientOperationsStatusInterceptor.java

/**
 * Interceptor for {@code HttpClientOperations#status()}.
 *
 * <p>Once a status is returned, the async span stored on the instance's {@link SWTransmitter} is
 * finished and the transmitter is detached from the instance. Responses with a code of 400 or
 * above are additionally marked as errors and tagged with the status code.
 */
public class HttpClientOperationsStatusInterceptor implements InstanceMethodsAroundInterceptor {

    /** Nothing needs to happen before {@code status()} runs. */
    @Override
    public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
        MethodInterceptResult result) throws Throwable {
    }

    /**
     * Finishes the async span for this request using the returned status.
     *
     * <p>The span is left untouched, and the transmitter stays attached, when there is no
     * transmitter, no span on it, or {@code ret} is not an {@code HttpResponseStatus} (for example
     * {@code null}).
     *
     * @return {@code ret}, unchanged
     */
    @Override
    public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
        Object ret) throws Throwable {
        SWTransmitter transmitter = (SWTransmitter) objInst.getSkyWalkingDynamicField();
        if (transmitter == null || transmitter.getSpanGateway() == null) {
            return ret;
        }
        if (!(ret instanceof HttpResponseStatus)) {
            // No usable status: keep the transmitter so a later successful call can finish the span.
            return ret;
        }

        HttpResponseStatus response = (HttpResponseStatus) ret;
        if (response.code() >= 400) {
            Tags.STATUS_CODE.set(transmitter.getSpanGateway().errorOccurred(), String.valueOf(response.code()));
        }
        transmitter.getSpanGateway().asyncFinish();
        // Detach the transmitter so repeated status() calls do not finish the span again.
        objInst.setSkyWalkingDynamicField(null);
        return ret;
    }

    /**
     * Records the failure on the async span held by the transmitter, when there is one.
     *
     * <p>This class never creates or activates a span itself, so the error is attached to the
     * transmitter's span rather than to {@code ContextManager.activeSpan()}.
     */
    @Override
    public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
        Class<?>[] argumentsTypes, Throwable t) {
        SWTransmitter transmitter = (SWTransmitter) objInst.getSkyWalkingDynamicField();
        if (transmitter != null && transmitter.getSpanGateway() != null) {
            transmitter.getSpanGateway().errorOccurred().log(t);
        }
    }
}

  • HttpClientOperationsStatusInterceptor实现了InstanceMethodsAroundInterceptor接口,其afterMethod方法获取transmitter,在response.code()大于等于400时设置statusCode的tag,然后执行transmitter.getSpanGateway().asyncFinish();其handleMethodException执行ContextManager.activeSpan().errorOccurred().log(t)

FilteringWebHandlerInstrumentation

skywalking-6.6.0/apm-sniffer/optional-plugins/optional-spring-plugins/optional-spring-cloud/gateway-2.1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/cloud/gateway/v21x/define/FilteringWebHandlerInstrumentation.java

/**
 * Plugin definition for {@code org.springframework.cloud.gateway.handler.FilteringWebHandler}.
 *
 * <p>Every instance method named {@code handle} is routed to {@code FilteringWebHandlerInterceptor}.
 * No constructors are intercepted.
 */
public class FilteringWebHandlerInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {

    private static final String ENHANCE_CLASS = "org.springframework.cloud.gateway.handler.FilteringWebHandler";

    private static final String INTERCEPTOR_CLASS =
        "org.apache.skywalking.apm.plugin.spring.cloud.gateway.v21x.FilteringWebHandlerInterceptor";

    private static final String HANDLE_METHOD = "handle";

    /**
     * @return an empty array; this definition intercepts no constructors
     */
    @Override
    public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
        return new ConstructorInterceptPoint[] {};
    }

    /**
     * @return the single intercept point describing the {@code handle} method
     */
    @Override
    public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
        InstanceMethodsInterceptPoint handlePoint = new InstanceMethodsInterceptPoint() {
            @Override
            public ElementMatcher<MethodDescription> getMethodsMatcher() {
                return named(HANDLE_METHOD);
            }

            @Override
            public String getMethodsInterceptor() {
                return INTERCEPTOR_CLASS;
            }

            @Override
            public boolean isOverrideArgs() {
                return false;
            }
        };
        return new InstanceMethodsInterceptPoint[] {handlePoint};
    }

    /**
     * @return a by-name matcher for {@code FilteringWebHandler}
     */
    @Override
    public ClassMatch enhanceClass() {
        return byName(ENHANCE_CLASS);
    }
}

  • FilteringWebHandlerInstrumentation继承了ClassInstanceMethodsEnhancePluginDefine,它使用org.apache.skywalking.apm.plugin.spring.cloud.gateway.v21x.FilteringWebHandlerInterceptor拦截org.springframework.cloud.gateway.handler.FilteringWebHandler的handle方法

FilteringWebHandlerInterceptor

skywalking-6.6.0/apm-sniffer/optional-plugins/optional-spring-plugins/optional-spring-cloud/gateway-2.1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/cloud/gateway/v21x/FilteringWebHandlerInterceptor.java

/**
 * Interceptor for {@code FilteringWebHandler#handle}.
 *
 * <p>Before the method runs, the {@link ContextSnapshot} stored on the exchange's enhanced instance
 * is replaced by an {@link SWTransmitter} holding that snapshot and an operation name of the form
 * {@code GATEWAY/<route id>}. After the method returns, the resulting {@code Mono} is decorated
 * with a {@code doFinally} callback that records a local span when the response status is
 * {@code TOO_MANY_REQUESTS}.
 */
public class FilteringWebHandlerInterceptor implements InstanceMethodsAroundInterceptor {

    private static final String SPRING_CLOUD_GATEWAY_ROUTE_PREFIX = "GATEWAY/";

    /**
     * Wraps the exchange's context snapshot in an {@link SWTransmitter}.
     *
     * <p>Nothing happens unless the exchange resolves to an enhanced instance whose dynamic field
     * currently holds a {@link ContextSnapshot}. A {@code null} field, or one that already holds a
     * transmitter from an earlier invocation on the same exchange, is left untouched instead of
     * failing on a blind cast.
     */
    @Override
    public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
        MethodInterceptResult result) throws Throwable {
        EnhancedInstance instance = NettyRoutingFilterInterceptor.getInstance(allArguments[0]);
        if (instance == null) {
            return;
        }

        Object dynamicField = instance.getSkyWalkingDynamicField();
        if (!(dynamicField instanceof ContextSnapshot)) {
            return;
        }
        ContextSnapshot contextSnapshot = (ContextSnapshot) dynamicField;

        ServerWebExchange exchange = (ServerWebExchange) allArguments[0];
        Route route = exchange.getRequiredAttribute(GATEWAY_ROUTE_ATTR);
        String operationName = SPRING_CLOUD_GATEWAY_ROUTE_PREFIX + route.getId();

        instance.setSkyWalkingDynamicField(new SWTransmitter(contextSnapshot, operationName));
    }

    /**
     * Decorates the returned {@code Mono} so that a rate-limited response is recorded as a span.
     *
     * <p>{@code ret} is returned unchanged when no transmitter is attached to the exchange (for
     * example because {@link #beforeMethod} did not complete) or when {@code ret} is not a
     * {@code Mono}.
     *
     * @return the decorated {@code Mono}, or {@code ret} itself when nothing is decorated
     */
    @Override
    public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments,
        Class<?>[] argumentsTypes, Object ret) throws Throwable {
        EnhancedInstance instance = NettyRoutingFilterInterceptor.getInstance(allArguments[0]);
        if (instance == null) {
            return ret;
        }

        Object dynamicField = instance.getSkyWalkingDynamicField();
        if (!(dynamicField instanceof SWTransmitter) || !(ret instanceof Mono)) {
            return ret;
        }
        final SWTransmitter swTransmitter = (SWTransmitter) dynamicField;

        @SuppressWarnings("unchecked") // the original code already treats the result as Mono<Void>
        Mono<Void> mono = (Mono<Void>) ret;

        return mono.doFinally(signal -> {
            ServerWebExchange exchange = (ServerWebExchange) allArguments[0];
            HttpStatus statusCode = exchange.getResponse().getStatusCode();
            // The reference comparison is also false for a null status, so no extra null check is needed.
            if (statusCode == HttpStatus.TOO_MANY_REQUESTS) {
                AbstractSpan localSpan = ContextManager.createLocalSpan(swTransmitter.getOperationName());
                // Tag the numeric code, matching how HttpClientOperationsStatusInterceptor tags
                // response codes, rather than relying on the format of HttpStatus#toString().
                Tags.STATUS_CODE.set(localSpan, String.valueOf(statusCode.value()));
                SpanLayer.asHttp(localSpan);
                localSpan.setComponent(ComponentsDefine.SPRING_CLOUD_GATEWAY);
                ContextManager.continued(swTransmitter.getSnapshot());
                ContextManager.stopSpan(localSpan);
            }
        });
    }

    /** No exception handling is performed for the intercepted method. */
    @Override
    public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
        Class<?>[] argumentsTypes, Throwable t) {
    }
}

  • FilteringWebHandlerInterceptor实现了InstanceMethodsAroundInterceptor接口,其beforeMethod方法从exchange中获取route信息最后形成operationName创建SWTransmitter并执行instance.setSkyWalkingDynamicField(transmitter);其afterMethod方法获取SWTransmitter,然后注册mono的doFinally回调,在里头获取statusCode,当其为TOO_MANY_REQUESTS时创建localSpan并设置statusCode tag,然后执行ContextManager.continued(swTransmitter.getSnapshot())及ContextManager.stopSpan(localSpan)

DefaultHttpHeadersInstrumentation

skywalking-6.6.0/apm-sniffer/optional-plugins/optional-spring-plugins/optional-spring-cloud/gateway-2.1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/cloud/gateway/v21x/define/DefaultHttpHeadersInstrumentation.java

/**
 * Plugin definition for {@code io.netty.handler.codec.http.DefaultHttpHeaders}.
 *
 * <p>Constructors whose first argument is an {@code io.netty.handler.codec.DefaultHeaders} are
 * routed to {@code DefaultHttpHeadersInterceptor}. No instance methods are intercepted.
 */
public class DefaultHttpHeadersInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {

    private static final String ENHANCE_CLASS = "io.netty.handler.codec.http.DefaultHttpHeaders";

    private static final String CONSTRUCTOR_INTERCEPTOR =
        "org.apache.skywalking.apm.plugin.spring.cloud.gateway.v21x.DefaultHttpHeadersInterceptor";

    /** Required type of the intercepted constructor's first argument. */
    private static final String FIRST_ARGUMENT_TYPE = "io.netty.handler.codec.DefaultHeaders";

    /**
     * @return the single intercept point describing the {@code DefaultHeaders}-taking constructor
     */
    @Override
    public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
        ConstructorInterceptPoint headersConstructor = new ConstructorInterceptPoint() {
            @Override
            public ElementMatcher<MethodDescription> getConstructorMatcher() {
                return takesArgumentWithType(0, FIRST_ARGUMENT_TYPE);
            }

            @Override
            public String getConstructorInterceptor() {
                return CONSTRUCTOR_INTERCEPTOR;
            }
        };
        return new ConstructorInterceptPoint[] {headersConstructor};
    }

    /**
     * @return an empty array; this definition intercepts no instance methods
     */
    @Override
    public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
        return new InstanceMethodsInterceptPoint[] {};
    }

    /**
     * @return a by-name matcher for {@code DefaultHttpHeaders}
     */
    @Override
    public ClassMatch enhanceClass() {
        return byName(ENHANCE_CLASS);
    }
}

  • DefaultHttpHeadersInstrumentation继承了ClassInstanceMethodsEnhancePluginDefine,它使用org.apache.skywalking.apm.plugin.spring.cloud.gateway.v21x.DefaultHttpHeadersInterceptor拦截io.netty.handler.codec.http.DefaultHttpHeaders的第一个参数为io.netty.handler.codec.DefaultHeaders的构造器

DefaultHttpHeadersInterceptor

skywalking-6.6.0/apm-sniffer/optional-plugins/optional-spring-plugins/optional-spring-cloud/gateway-2.1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/cloud/gateway/v21x/DefaultHttpHeadersInterceptor.java

/**
 * Constructor interceptor for {@code DefaultHttpHeaders}.
 *
 * <p>When the runtime context holds a value under
 * {@code Constants.SPRING_CLOUD_GATEWAY_TRANSMITTER}, that value is attached to the newly
 * constructed instance and removed from the runtime context.
 */
public class DefaultHttpHeadersInterceptor implements InstanceConstructorInterceptor {

    /**
     * Moves the pending transmitter, if any, from the runtime context onto {@code objInst}.
     *
     * @param objInst      the instance that has just been constructed
     * @param allArguments the constructor arguments (not used)
     */
    @Override
    public void onConstruct(EnhancedInstance objInst, Object[] allArguments) {
        Object pending = ContextManager.getRuntimeContext().get(Constants.SPRING_CLOUD_GATEWAY_TRANSMITTER);
        if (pending == null) {
            return;
        }
        objInst.setSkyWalkingDynamicField(pending);
        // Remove the entry so that only this instance receives the value.
        ContextManager.getRuntimeContext().remove(Constants.SPRING_CLOUD_GATEWAY_TRANSMITTER);
    }
}

  • DefaultHttpHeadersInterceptor实现了InstanceConstructorInterceptor接口,其onConstruct方法主要是将ContextManager.getRuntimeContext()中的transmitter设置到objInst中,然后从ContextManager.getRuntimeContext()移除该transmitter

小结

skywalking的spring-cloud-gateway-plugin主要有四个instrument,分别是NettyRoutingFilterInstrumentation、HttpClientOperationsInstrumentation、FilteringWebHandlerInstrumentation、DefaultHttpHeadersInstrumentation

doc

  • NettyRoutingFilterInstrumentation
  • HttpClientOperationsInstrumentation
  • FilteringWebHandlerInstrumentation
  • DefaultHttpHeadersInstrumentation

以上是 聊聊skywalking的springcloudgatewayplugin 的全部内容, 来源链接: utcz.com/z/514185.html

回到顶部