聊聊skywalking的httpasyncclientplugin

编程

skywalking-plugin.def

skywalking-6.6.0/apm-sniffer/apm-sdk-plugin/httpasyncclient-4.x-plugin/src/main/resources/skywalking-plugin.def

httpasyncclient-4.x=org.apache.skywalking.apm.plugin.httpasyncclient.v4.define.HttpAsyncClientInstrumentation

httpasyncclient-4.x=org.apache.skywalking.apm.plugin.httpasyncclient.v4.define.SessionRequestInstrumentation

httpasyncclient-4.x=org.apache.skywalking.apm.plugin.httpasyncclient.v4.define.HttpAsyncRequestExecutorInstrumentation

  • skywalking的http-async-client-plugin定义了三个增强,分别是HttpAsyncClientInstrumentation、SessionRequestInstrumentation、HttpAsyncRequestExecutorInstrumentation

HttpAsyncClientInstrumentation

skywalking-6.6.0/apm-sniffer/apm-sdk-plugin/httpasyncclient-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/httpasyncclient/v4/define/HttpAsyncClientInstrumentation.java

/**
 * Plugin definition for the {@code execute} method of the two HTTP async client
 * implementations named by {@link #ENHANCE_CLASS_MINIMAL} and {@link #ENHANCE_CLASS_INTERNAL}.
 *
 * <p>Only the overload that takes four arguments and whose first argument type is
 * {@link #FIRST_ARG_TYPE} is matched; it is routed to {@link #INTERCEPTOR_CLASS}.
 */
public class HttpAsyncClientInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {

    private static final String ENHANCE_CLASS_MINIMAL = "org.apache.http.impl.nio.client.MinimalHttpAsyncClient";
    private static final String ENHANCE_CLASS_INTERNAL = "org.apache.http.impl.nio.client.InternalHttpAsyncClient";
    private static final String METHOD = "execute";
    private static final String FIRST_ARG_TYPE = "org.apache.http.nio.protocol.HttpAsyncRequestProducer";
    private static final String INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.httpasyncclient.v4.HttpAsyncClientInterceptor";

    /**
     * @return a matcher selecting both enhanced client classes by name
     */
    @Override
    protected ClassMatch enhanceClass() {
        return MultiClassNameMatch.byMultiClassMatch(ENHANCE_CLASS_MINIMAL, ENHANCE_CLASS_INTERNAL);
    }

    /**
     * @return {@code null}; this definition declares no constructor intercept point
     */
    @Override
    public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
        return null;
    }

    /**
     * @return a single intercept point describing the matched {@code execute} overload
     */
    @Override
    public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
        final InstanceMethodsInterceptPoint executePoint = new InstanceMethodsInterceptPoint() {
            @Override
            public ElementMatcher<MethodDescription> getMethodsMatcher() {
                // execute(...) with exactly four arguments, the first being the request producer
                return named(METHOD).and(takesArguments(4)
                    .and(takesArgument(0, named(FIRST_ARG_TYPE))));
            }

            @Override
            public String getMethodsInterceptor() {
                return INTERCEPTOR_CLASS;
            }

            @Override
            public boolean isOverrideArgs() {
                // HttpAsyncClientInterceptor reassigns entries of the argument array
                return true;
            }
        };
        return new InstanceMethodsInterceptPoint[] {executePoint};
    }
}

  • HttpAsyncClientInstrumentation继承了ClassInstanceMethodsEnhancePluginDefine,它使用org.apache.skywalking.apm.plugin.httpasyncclient.v4.HttpAsyncClientInterceptor增强了org.apache.http.impl.nio.client.MinimalHttpAsyncClient以及org.apache.http.impl.nio.client.InternalHttpAsyncClient类的有四个参数且第一个参数类型为org.apache.http.nio.protocol.HttpAsyncRequestProducer的execute方法

HttpAsyncClientInterceptor

skywalking-6.6.0/apm-sniffer/apm-sdk-plugin/httpasyncclient-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/httpasyncclient/v4/HttpAsyncClientInterceptor.java

/**
 * Interceptor for the four-argument {@code execute} call of the HTTP async client.
 *
 * <p>Before the call it wraps the response consumer (argument 1) and the future callback
 * (argument 3), and stores the {@code HttpContext} (argument 2) in {@code CONTEXT_LOCAL}.
 */
public class HttpAsyncClientInterceptor implements InstanceMethodsAroundInterceptor {

    /**
     * Replaces arguments 1 and 3 with their wrappers and publishes argument 2 through
     * {@code CONTEXT_LOCAL}.
     *
     * @param allArguments the arguments of the intercepted call; entries 1 and 3 are overwritten
     * @throws Throwable declared by the interceptor contract
     */
    @Override
    public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, MethodInterceptResult result) throws Throwable {
        // All three casts happen before the argument array is modified.
        final HttpAsyncResponseConsumer originalConsumer = (HttpAsyncResponseConsumer) allArguments[1];
        final HttpContext httpContext = (HttpContext) allArguments[2];
        final FutureCallback originalCallback = (FutureCallback) allArguments[3];

        allArguments[1] = new HttpAsyncResponseConsumerWrapper(originalConsumer);
        allArguments[3] = new FutureCallbackWrapper(originalCallback);

        CONTEXT_LOCAL.set(httpContext);
    }

    /**
     * Passes the return value through unchanged.
     *
     * @return {@code ret} as received
     */
    @Override
    public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Object ret) throws Throwable {
        return ret;
    }

    /**
     * Does nothing.
     */
    @Override
    public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Throwable t) {
    }
}

  • HttpAsyncClientInterceptor实现了InstanceMethodsAroundInterceptor接口,其beforeMethod方法使用HttpAsyncResponseConsumerWrapper包装了第二个参数,使用FutureCallbackWrapper包装了第四个参数,最后执行CONTEXT_LOCAL.set(context);

SessionRequestInstrumentation

skywalking-6.6.0/apm-sniffer/apm-sdk-plugin/httpasyncclient-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/httpasyncclient/v4/define/SessionRequestInstrumentation.java

/**
 * Plugin definition for the class named by {@link #ENHANCE_CLASS}.
 *
 * <p>It declares one constructor intercept point matching every constructor, and two instance
 * method intercept points: {@code completed} goes to {@link #SUCCESS_INTERCEPTOR_CLASS};
 * {@code timeout}, {@code failed} and {@code cancel} go to {@link #FAIL_INTERCEPTOR_CLASS}.
 */
public class SessionRequestInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {

    private static final String ENHANCE_CLASS = "org.apache.http.impl.nio.reactor.SessionRequestImpl";

    private static final String COMPLETED_METHOD = "completed";
    private static final String TIMEOUT_METHOD = "timeout";
    private static final String FAILED_METHOD = "failed";
    private static final String CANCEL_METHOD = "cancel";

    private static final String CONSTRUCTOR_INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.httpasyncclient.v4.SessionRequestConstructorInterceptor";
    private static final String SUCCESS_INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.httpasyncclient.v4.SessionRequestCompleteInterceptor";
    private static final String FAIL_INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.httpasyncclient.v4.SessionRequestFailInterceptor";

    /**
     * @return a matcher selecting the enhanced class by name
     */
    @Override
    protected ClassMatch enhanceClass() {
        return byName(ENHANCE_CLASS);
    }

    /**
     * @return a single intercept point that matches any constructor
     */
    @Override
    public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
        final ConstructorInterceptPoint anyConstructor = new ConstructorInterceptPoint() {
            @Override
            public ElementMatcher<MethodDescription> getConstructorMatcher() {
                return any();
            }

            @Override
            public String getConstructorInterceptor() {
                return CONSTRUCTOR_INTERCEPTOR_CLASS;
            }
        };
        return new ConstructorInterceptPoint[] {anyConstructor};
    }

    /**
     * @return the {@code completed} intercept point followed by the one for
     *         {@code timeout} / {@code failed} / {@code cancel}
     */
    @Override
    public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
        final InstanceMethodsInterceptPoint onCompleted = new InstanceMethodsInterceptPoint() {
            @Override
            public ElementMatcher<MethodDescription> getMethodsMatcher() {
                return named(COMPLETED_METHOD);
            }

            @Override
            public String getMethodsInterceptor() {
                return SUCCESS_INTERCEPTOR_CLASS;
            }

            @Override
            public boolean isOverrideArgs() {
                return false;
            }
        };

        final InstanceMethodsInterceptPoint onNotCompleted = new InstanceMethodsInterceptPoint() {
            @Override
            public ElementMatcher<MethodDescription> getMethodsMatcher() {
                return named(TIMEOUT_METHOD).or(named(FAILED_METHOD).or(named(CANCEL_METHOD)));
            }

            @Override
            public String getMethodsInterceptor() {
                return FAIL_INTERCEPTOR_CLASS;
            }

            @Override
            public boolean isOverrideArgs() {
                return false;
            }
        };

        return new InstanceMethodsInterceptPoint[] {onCompleted, onNotCompleted};
    }
}

  • SessionRequestInstrumentation继承了ClassInstanceMethodsEnhancePluginDefine,它增强的类为org.apache.http.impl.nio.reactor.SessionRequestImpl;它使用org.apache.skywalking.apm.plugin.httpasyncclient.v4.SessionRequestConstructorInterceptor增强了构造器方法,使用org.apache.skywalking.apm.plugin.httpasyncclient.v4.SessionRequestCompleteInterceptor增强了completed方法,使用org.apache.skywalking.apm.plugin.httpasyncclient.v4.SessionRequestFailInterceptor增强了timeout、failed、cancel方法

SessionRequestConstructorInterceptor

skywalking-6.6.0/apm-sniffer/apm-sdk-plugin/httpasyncclient-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/httpasyncclient/v4/SessionRequestConstructorInterceptor.java

/**
 * Constructor interceptor that, when a tracing context is active and its active span is not an
 * exit span, stores a context snapshot together with the current {@code CONTEXT_LOCAL} value
 * on the constructed instance. {@code CONTEXT_LOCAL} is cleared on every path.
 */
public class SessionRequestConstructorInterceptor implements InstanceConstructorInterceptor {

    /**
     * @param objInst      the instance being constructed; receives the
     *                     {@code {snapshot, CONTEXT_LOCAL value}} pair as its dynamic field
     * @param allArguments constructor arguments (not used)
     */
    @Override
    public void onConstruct(EnhancedInstance objInst, Object[] allArguments) {
        // Short-circuit keeps activeSpan() from being queried when no context is active.
        final boolean captureNeeded = ContextManager.isActive() && !ContextManager.activeSpan().isExit();
        if (captureNeeded) {
            final ContextSnapshot snapshot = ContextManager.capture();
            final Object[] carried = {snapshot, CONTEXT_LOCAL.get()};
            objInst.setSkyWalkingDynamicField(carried);
        }
        CONTEXT_LOCAL.remove();
    }
}

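  • SessionRequestConstructorInterceptor实现了InstanceConstructorInterceptor接口,其onConstruct方法在ContextManager.isActive()且当前activeSpan不是exit span时,执行ContextManager.capture()并将snapshot及CONTEXT_LOCAL.get()取到的HttpContext设置到objInst的skyWalkingDynamicField;无论走哪个分支,最后都会执行CONTEXT_LOCAL.remove()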

SessionRequestCompleteInterceptor

skywalking-6.6.0/apm-sniffer/apm-sdk-plugin/httpasyncclient-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/httpasyncclient/v4/SessionRequestCompleteInterceptor.java

/**
 * Interceptor for the {@code completed} method of the enhanced session request.
 *
 * <p>It reads the object array stored in the instance's dynamic field, creates the local span
 * {@code httpasyncclient/local}, continues the stored snapshot if there is one, and puts the
 * stored {@code HttpContext} into {@link #CONTEXT_LOCAL}.
 */
public class SessionRequestCompleteInterceptor implements InstanceMethodsAroundInterceptor {

    /** Thread-local slot holding an {@code HttpContext}; set and removed by the plugin's interceptors. */
    public static ThreadLocal<HttpContext> CONTEXT_LOCAL = new ThreadLocal<HttpContext>();

    /**
     * Restores the stored state on the current thread; does nothing when the dynamic field is
     * {@code null} or an empty array.
     *
     * @param objInst the enhanced instance whose dynamic field is read
     * @throws Throwable declared by the interceptor contract
     */
    @Override
    public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, MethodInterceptResult result) throws Throwable {
        final Object[] carried = (Object[]) objInst.getSkyWalkingDynamicField();
        if (carried != null && carried.length != 0) {
            final ContextSnapshot storedSnapshot = (ContextSnapshot) carried[0];
            ContextManager.createLocalSpan("httpasyncclient/local");
            if (storedSnapshot != null) {
                ContextManager.continued(storedSnapshot);
            }
            CONTEXT_LOCAL.set((HttpContext) carried[1]);
        }
    }

    /**
     * Passes the return value through unchanged.
     *
     * @return {@code ret} as received
     */
    @Override
    public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Object ret) throws Throwable {
        return ret;
    }

    /**
     * Does nothing.
     */
    @Override
    public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Throwable t) {
    }
}

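  • SessionRequestCompleteInterceptor实现了InstanceMethodsAroundInterceptor接口,并定义了静态的CONTEXT_LOCAL(ThreadLocal<HttpContext>);其beforeMethod方法从skyWalkingDynamicField中取出snapshot及HttpContext,先执行ContextManager.createLocalSpan("httpasyncclient/local"),在snapshot不为null时执行ContextManager.continued(snapshot),最后将HttpContext设置到CONTEXT_LOCAL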

SessionRequestFailInterceptor

skywalking-6.6.0/apm-sniffer/apm-sdk-plugin/httpasyncclient-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/httpasyncclient/v4/SessionRequestFailInterceptor.java

/**
 * Interceptor for the {@code timeout}, {@code failed} and {@code cancel} methods of the
 * enhanced session request. It clears {@code CONTEXT_LOCAL} and resets the instance's
 * dynamic field to {@code null}.
 */
public class SessionRequestFailInterceptor implements InstanceMethodsAroundInterceptor {

    /**
     * Drops the state held for this session request.
     *
     * @param objInst the enhanced instance whose dynamic field is reset
     * @throws Throwable declared by the interceptor contract
     */
    @Override
    public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, MethodInterceptResult result) throws Throwable {
        // Reaching this point means the actual request will not be started, so the span has
        // not been created and the status cannot be logged.
        CONTEXT_LOCAL.remove();
        objInst.setSkyWalkingDynamicField(null);
    }

    /**
     * Passes the return value through unchanged.
     *
     * @return {@code ret} as received
     */
    @Override
    public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Object ret) throws Throwable {
        return ret;
    }

    /**
     * Does nothing.
     */
    @Override
    public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Throwable t) {
    }
}

  • SessionRequestFailInterceptor实现了InstanceMethodsAroundInterceptor接口,其beforeMethod方法执行CONTEXT_LOCAL.remove()及objInst.setSkyWalkingDynamicField(null)

HttpAsyncRequestExecutorInstrumentation

skywalking-6.6.0/apm-sniffer/apm-sdk-plugin/httpasyncclient-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/httpasyncclient/v4/define/HttpAsyncRequestExecutorInstrumentation.java

/**
 * Plugin definition that routes the {@code requestReady} method of the class named by
 * {@link #ENHANCE_CLASS} to {@link #INTERCEPTOR_CLASS}. Methods are matched by name only.
 */
public class HttpAsyncRequestExecutorInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {

    private static final String ENHANCE_CLASS = "org.apache.http.nio.protocol.HttpAsyncRequestExecutor";
    private static final String METHOD = "requestReady";
    private static final String INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.httpasyncclient.v4.HttpAsyncRequestExecutorInterceptor";

    /**
     * @return a matcher selecting the enhanced class by name
     */
    @Override
    protected ClassMatch enhanceClass() {
        return byName(ENHANCE_CLASS);
    }

    /**
     * @return {@code null}; this definition declares no constructor intercept point
     */
    @Override
    public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
        return null;
    }

    /**
     * @return a single intercept point for methods named {@code requestReady}
     */
    @Override
    public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
        final InstanceMethodsInterceptPoint requestReadyPoint = new InstanceMethodsInterceptPoint() {
            @Override
            public ElementMatcher<MethodDescription> getMethodsMatcher() {
                return named(METHOD);
            }

            @Override
            public String getMethodsInterceptor() {
                return INTERCEPTOR_CLASS;
            }

            @Override
            public boolean isOverrideArgs() {
                return false;
            }
        };
        return new InstanceMethodsInterceptPoint[] {requestReadyPoint};
    }
}

  • HttpAsyncRequestExecutorInstrumentation继承了ClassInstanceMethodsEnhancePluginDefine,它使用org.apache.skywalking.apm.plugin.httpasyncclient.v4.HttpAsyncRequestExecutorInterceptor增强org.apache.http.nio.protocol.HttpAsyncRequestExecutor的requestReady方法

HttpAsyncRequestExecutorInterceptor

skywalking-6.6.0/apm-sniffer/apm-sdk-plugin/httpasyncclient-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/httpasyncclient/v4/HttpAsyncRequestExecutorInterceptor.java

/**
 * Interceptor for {@code HttpAsyncRequestExecutor#requestReady}.
 *
 * <p>It takes the {@code HttpContext} left in {@code CONTEXT_LOCAL}, creates an exit span for
 * the request found in that context, tags it with URL and HTTP method, and copies the items of
 * the {@code ContextCarrier} into the request headers.
 */
public class HttpAsyncRequestExecutorInterceptor implements InstanceMethodsAroundInterceptor {

    private static final int DEFAULT_HTTP_PORT = 80;
    private static final int DEFAULT_HTTPS_PORT = 443;

    /**
     * Creates the exit span and injects the carrier headers; returns without doing anything
     * when {@code CONTEXT_LOCAL} holds no context.
     *
     * @throws Throwable e.g. from {@code new URL(uri)} when an absolute request URI is malformed
     */
    @Override
    public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, MethodInterceptResult result) throws Throwable {
        // Read and immediately clear the thread-local so the context is consumed only once.
        HttpContext context = CONTEXT_LOCAL.get();
        CONTEXT_LOCAL.remove();
        if (context == null) {
            return;
        }

        final ContextCarrier contextCarrier = new ContextCarrier();
        HttpRequestWrapper requestWrapper = (HttpRequestWrapper) context.getAttribute(HttpClientContext.HTTP_REQUEST);
        HttpHost httpHost = (HttpHost) context.getAttribute(HttpClientContext.HTTP_TARGET_HOST);

        RequestLine requestLine = requestWrapper.getRequestLine();
        String uri = requestLine.getUri();
        // For an absolute URI only the path is used as the operation name.
        String operationName = uri.startsWith("http") ? new URL(uri).getPath() : uri;

        String remotePeer = httpHost.getHostName() + ":" + resolvePort(httpHost);
        AbstractSpan span = ContextManager.createExitSpan(operationName, contextCarrier, remotePeer);
        span.setComponent(ComponentsDefine.HTTP_ASYNC_CLIENT);
        Tags.URL.set(span, requestWrapper.getOriginal().getRequestLine().getUri());
        Tags.HTTP.METHOD.set(span, requestLine.getMethod());
        SpanLayer.asHttp(span);

        // Propagate the trace context to the callee through request headers.
        CarrierItem next = contextCarrier.items();
        while (next.hasNext()) {
            next = next.next();
            requestWrapper.setHeader(next.getHeadKey(), next.getHeadValue());
        }
    }

    /**
     * Passes the return value through unchanged.
     *
     * @return {@code ret} as received
     */
    @Override
    public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Object ret) throws Throwable {
        return ret;
    }

    /**
     * Does nothing.
     */
    @Override
    public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Throwable t) {
    }

    /**
     * Returns the port to report for the target host. An explicit port is returned as is;
     * when the host carries no port ({@code -1}) the default of its scheme is used, so that
     * {@code https} targets are reported with 443 instead of 80.
     */
    private static int resolvePort(HttpHost httpHost) {
        int port = httpHost.getPort();
        if (port != -1) {
            return port;
        }
        return "https".equalsIgnoreCase(httpHost.getSchemeName()) ? DEFAULT_HTTPS_PORT : DEFAULT_HTTP_PORT;
    }
}

  • HttpAsyncRequestExecutorInterceptor实现了InstanceMethodsAroundInterceptor接口,其beforeMethod方法执行ContextManager.createExitSpan,然后设置url、method的tag,最后将contextCarrier.items()通过requestWrapper的header进行透传

小结

skywalking的http-async-client-plugin定义了三个增强,分别是HttpAsyncClientInstrumentation、SessionRequestInstrumentation、HttpAsyncRequestExecutorInstrumentation

doc

  • HttpAsyncClientInstrumentation
  • SessionRequestInstrumentation
  • HttpAsyncRequestExecutorInstrumentation

以上是 聊聊skywalking的httpasyncclientplugin 的全部内容, 来源链接: utcz.com/z/514282.html

回到顶部