聊聊skywalking的kafkaplugin
序
本文主要研究一下skywalking的kafka-plugin
skywalking-plugin.def
skywalking-6.6.0/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/resources/skywalking-plugin.def
kafka-0.11.x/1.x/2.x=org.apache.skywalking.apm.plugin.kafka.define.CallbackInstrumentationkafka-0.11.x/1.x/2.x=org.apache.skywalking.apm.plugin.kafka.define.KafkaConsumerInstrumentation
kafka-0.11.x/1.x/2.x=org.apache.skywalking.apm.plugin.kafka.define.KafkaProducerInstrumentation
kafka-0.11.x/1.x/2.x=org.apache.skywalking.apm.plugin.kafka.define.KafkaProducerMapInstrumentation
kafka-0.11.x/1.x/2.x=org.apache.skywalking.apm.plugin.kafka.define.KafkaTemplateInstrumentation
kafka-0.11.x/1.x/2.x=org.apache.skywalking.apm.plugin.kafka.define.KafkaTemplateCallbackInstrumentation
- skywalking的kafka-plugin提供了CallbackInstrumentation、KafkaConsumerInstrumentation、KafkaProducerInstrumentation、KafkaProducerMapInstrumentation、KafkaTemplateInstrumentation、KafkaTemplateCallbackInstrumentation这几个增强
AbstractKafkaInstrumentation
skywalking-6.6.0/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/define/AbstractKafkaInstrumentation.java
public abstract class AbstractKafkaInstrumentation extends ClassInstanceMethodsEnhancePluginDefine { @Override protected String[] witnessClasses() {
return new String[]{"org.apache.kafka.clients.ApiVersions"};
}
}
- AbstractKafkaInstrumentation继承了ClassInstanceMethodsEnhancePluginDefine,其witnessClasses为org.apache.kafka.clients.ApiVersions
AbstractKafkaTemplateInstrumentation
skywalking-6.6.0/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/define/AbstractKafkaTemplateInstrumentation.java
public abstract class AbstractKafkaTemplateInstrumentation extends ClassInstanceMethodsEnhancePluginDefine { @Override protected String[] witnessClasses() {
return new String[]{"org.springframework.kafka.core.KafkaTemplate"};
}
}
- AbstractKafkaTemplateInstrumentation继承了ClassInstanceMethodsEnhancePluginDefine,其witnessClasses为org.springframework.kafka.core.KafkaTemplate
CallbackInstrumentation
skywalking-6.6.0/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/define/CallbackInstrumentation.java
public class CallbackInstrumentation extends AbstractKafkaInstrumentation { public static final String ENHANCE_CLASS = "org.apache.kafka.clients.producer.Callback";
public static final String ENHANCE_METHOD = "onCompletion";
public static final String INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.kafka.CallbackInterceptor";
@Override public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
return new ConstructorInterceptPoint[0];
}
@Override public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
return new InstanceMethodsInterceptPoint[] {
new InstanceMethodsInterceptPoint() {
@Override public ElementMatcher<MethodDescription> getMethodsMatcher() {
return named(ENHANCE_METHOD);
}
@Override public String getMethodsInterceptor() {
return INTERCEPTOR_CLASS;
}
@Override public boolean isOverrideArgs() {
return false;
}
}
};
}
@Override protected ClassMatch enhanceClass() {
return byHierarchyMatch(new String[] {ENHANCE_CLASS});
}
}
- CallbackInstrumentation继承了AbstractKafkaInstrumentation,它使用org.apache.skywalking.apm.plugin.kafka.CallbackInterceptor增强实现了org.apache.kafka.clients.producer.Callback接口的类的onCompletion方法
CallbackInterceptor
skywalking-6.6.0/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/CallbackInterceptor.java
public class CallbackInterceptor implements InstanceMethodsAroundInterceptor { @Override
public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
MethodInterceptResult result) throws Throwable {
CallbackCache cache = (CallbackCache) objInst.getSkyWalkingDynamicField();
if (null != cache) {
ContextSnapshot snapshot = getSnapshot(cache);
RecordMetadata metadata = (RecordMetadata) allArguments[0];
AbstractSpan activeSpan = ContextManager.createLocalSpan("Kafka/Producer/Callback");
activeSpan.setComponent(ComponentsDefine.KAFKA_PRODUCER);
Tags.MQ_TOPIC.set(activeSpan, metadata.topic());
ContextManager.continued(snapshot);
}
}
@Override
public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
Object ret) throws Throwable {
CallbackCache cache = (CallbackCache) objInst.getSkyWalkingDynamicField();
if (null != cache) {
ContextSnapshot snapshot = getSnapshot(cache);
if (null != snapshot) {
Exception exceptions = (Exception) allArguments[1];
if (exceptions != null) {
ContextManager.activeSpan().errorOccurred().log(exceptions);
}
ContextManager.stopSpan();
}
}
return ret;
}
@Override
public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
Class<?>[] argumentsTypes, Throwable t) {
ContextManager.activeSpan().errorOccurred().log(t);
}
private ContextSnapshot getSnapshot(CallbackCache cache) {
ContextSnapshot snapshot = cache.getSnapshot();
if (snapshot == null) {
snapshot = ((CallbackCache) ((EnhancedInstance) cache.getCallback()).getSkyWalkingDynamicField()).getSnapshot();
}
return snapshot;
}
}
- CallbackInterceptor实现了InstanceMethodsAroundInterceptor接口,其beforeMethod方法设置Tags.MQ_TOPIC;其afterMethod方法有异常时执行ContextManager.activeSpan().errorOccurred().log(exceptions),然后执行ContextManager.stopSpan();其handleMethodException方法执行ContextManager.activeSpan().errorOccurred().log(t)
KafkaConsumerInstrumentation
skywalking-6.6.0/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/define/KafkaConsumerInstrumentation.java
public class KafkaConsumerInstrumentation extends AbstractKafkaInstrumentation { public static final String CONSTRUCTOR_INTERCEPT_TYPE = "org.apache.kafka.clients.consumer.ConsumerConfig";
public static final String CONSTRUCTOR_INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.kafka.ConsumerConstructorInterceptor";
public static final String INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.kafka.KafkaConsumerInterceptor";
public static final String ENHANCE_METHOD = "pollOnce";
public static final String ENHANCE_COMPATIBLE_METHOD = "pollForFetches";
public static final String ENHANCE_CLASS = "org.apache.kafka.clients.consumer.KafkaConsumer";
public static final String SUBSCRIBE_METHOD = "subscribe";
public static final String SUBSCRIBE_INTERCEPT_TYPE = "org.apache.kafka.clients.consumer.ConsumerRebalanceListener";
public static final String SUBSCRIBE_INTERCEPT_CLASS = "org.apache.skywalking.apm.plugin.kafka.SubscribeMethodInterceptor";
@Override public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
return new ConstructorInterceptPoint[] {
new ConstructorInterceptPoint() {
@Override public ElementMatcher<MethodDescription> getConstructorMatcher() {
return takesArgumentWithType(0, CONSTRUCTOR_INTERCEPT_TYPE);
}
@Override public String getConstructorInterceptor() {
return CONSTRUCTOR_INTERCEPTOR_CLASS;
}
}
};
}
@Override public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
return new InstanceMethodsInterceptPoint[] {
new InstanceMethodsInterceptPoint() {
@Override public ElementMatcher<MethodDescription> getMethodsMatcher() {
return named(ENHANCE_METHOD).or(named(ENHANCE_COMPATIBLE_METHOD));
}
@Override public String getMethodsInterceptor() {
return INTERCEPTOR_CLASS;
}
@Override public boolean isOverrideArgs() {
return false;
}
},
new InstanceMethodsInterceptPoint() {
@Override public ElementMatcher<MethodDescription> getMethodsMatcher() {
return named(SUBSCRIBE_METHOD).and(takesArgumentWithType(1, SUBSCRIBE_INTERCEPT_TYPE));
}
@Override public String getMethodsInterceptor() {
return SUBSCRIBE_INTERCEPT_CLASS;
}
@Override public boolean isOverrideArgs() {
return false;
}
}
};
}
@Override protected ClassMatch enhanceClass() {
return byName(ENHANCE_CLASS);
}
}
- KafkaConsumerInstrumentation继承了AbstractKafkaInstrumentation,它增强的是org.apache.kafka.clients.consumer.KafkaConsumer类;它使用org.apache.skywalking.apm.plugin.kafka.ConsumerConstructorInterceptor增强其参数为org.apache.kafka.clients.consumer.ConsumerConfig的构造器;它使用org.apache.skywalking.apm.plugin.kafka.KafkaConsumerInterceptor增强pollOnce、pollForFetches方法;它使用org.apache.skywalking.apm.plugin.kafka.SubscribeMethodInterceptor增强第二个参数为org.apache.kafka.clients.consumer.ConsumerRebalanceListener的subscribe方法
ConsumerConstructorInterceptor
skywalking-6.6.0/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/ConsumerConstructorInterceptor.java
public class ConsumerConstructorInterceptor implements InstanceConstructorInterceptor { @Override public void onConstruct(EnhancedInstance objInst, Object[] allArguments) {
ConsumerConfig config = (ConsumerConfig)allArguments[0];
// set the bootstrap server address
ConsumerEnhanceRequiredInfo requiredInfo = new ConsumerEnhanceRequiredInfo();
requiredInfo.setBrokerServers(config.getList("bootstrap.servers"));
requiredInfo.setGroupId(config.getString("group.id"));
objInst.setSkyWalkingDynamicField(requiredInfo);
}
}
- ConsumerConstructorInterceptor实现了InstanceConstructorInterceptor接口,其onConstruct方法创建ConsumerEnhanceRequiredInfo并设置到objInst的skyWalkingDynamicField
KafkaConsumerInterceptor
skywalking-6.6.0/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/KafkaConsumerInterceptor.java
public class KafkaConsumerInterceptor implements InstanceMethodsAroundInterceptor { public static final String OPERATE_NAME_PREFIX = "Kafka/";
public static final String CONSUMER_OPERATE_NAME = "/Consumer/";
@Override
public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
MethodInterceptResult result) throws Throwable {
ConsumerEnhanceRequiredInfo requiredInfo = (ConsumerEnhanceRequiredInfo)objInst.getSkyWalkingDynamicField();
requiredInfo.setStartTime(System.currentTimeMillis());
}
@Override
public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
Object ret) throws Throwable {
Map<TopicPartition, List<ConsumerRecord<?, ?>>> records = (Map<TopicPartition, List<ConsumerRecord<?, ?>>>)ret;
//
// The entry span will only be created when the consumer received at least one message.
//
if (records.size() > 0) {
ConsumerEnhanceRequiredInfo requiredInfo = (ConsumerEnhanceRequiredInfo)objInst.getSkyWalkingDynamicField();
AbstractSpan activeSpan = ContextManager.createEntrySpan(OPERATE_NAME_PREFIX + requiredInfo.getTopics() + CONSUMER_OPERATE_NAME + requiredInfo.getGroupId(), null).start(requiredInfo.getStartTime());
activeSpan.setComponent(ComponentsDefine.KAFKA_CONSUMER);
SpanLayer.asMQ(activeSpan);
Tags.MQ_BROKER.set(activeSpan, requiredInfo.getBrokerServers());
Tags.MQ_TOPIC.set(activeSpan, requiredInfo.getTopics());
for (List<ConsumerRecord<?, ?>> consumerRecords : records.values()) {
for (ConsumerRecord<?, ?> record : consumerRecords) {
ContextCarrier contextCarrier = new ContextCarrier();
CarrierItem next = contextCarrier.items();
while (next.hasNext()) {
next = next.next();
Iterator<Header> iterator = record.headers().headers(next.getHeadKey()).iterator();
if (iterator.hasNext()) {
next.setHeadValue(new String(iterator.next().value()));
}
}
ContextManager.extract(contextCarrier);
}
}
ContextManager.stopSpan();
}
return ret;
}
@Override public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
Class<?>[] argumentsTypes, Throwable t) {
ContextManager.activeSpan().errorOccurred().log(t);
}
}
- KafkaConsumerInterceptor实现了InstanceMethodsAroundInterceptor接口,其beforeMethod方法获取ConsumerEnhanceRequiredInfo并设置其startTime;其afterMethod方法在records.size()大于0的时候设置Tags.MQ_BROKER、Tags.MQ_TOPIC,然后从ConsumerRecord提取contextCarrier.items()指定的header,然后执行ContextManager.extract(contextCarrier),最后执行ContextManager.stopSpan();其handleMethodException方法执行ContextManager.activeSpan().errorOccurred().log(t)
SubscribeMethodInterceptor
skywalking-6.6.0/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/SubscribeMethodInterceptor.java
public class SubscribeMethodInterceptor implements InstanceMethodsAroundInterceptor { @Override
public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
MethodInterceptResult result) throws Throwable {
ConsumerEnhanceRequiredInfo requiredInfo = (ConsumerEnhanceRequiredInfo)objInst.getSkyWalkingDynamicField();
requiredInfo.setTopics((Collection<String>)allArguments[0]);
objInst.setSkyWalkingDynamicField(requiredInfo);
}
@Override
public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
Object ret) throws Throwable {
return ret;
}
@Override public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
Class<?>[] argumentsTypes, Throwable t) {
ContextManager.activeSpan().errorOccurred().log(t);
}
}
- SubscribeMethodInterceptor实现了InstanceMethodsAroundInterceptor接口,其beforeMethod方法创建ConsumerEnhanceRequiredInfo并设置到objInst的skyWalkingDynamicField;其handleMethodException执行ContextManager.activeSpan().errorOccurred().log(t)
KafkaProducerInstrumentation
skywalking-6.6.0/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/define/KafkaProducerInstrumentation.java
public class KafkaProducerInstrumentation extends AbstractKafkaInstrumentation { public static final String INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.kafka.KafkaProducerInterceptor";
public static final String ENHANCE_CLASS = "org.apache.kafka.clients.producer.KafkaProducer";
public static final String ENHANCE_METHOD = "doSend";
public static final String CONSTRUCTOR_INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.kafka.ProducerConstructorInterceptor";
public static final String CONSTRUCTOR_INTERCEPTOR_FLAG = "org.apache.kafka.clients.producer.ProducerConfig";
@Override
public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
return new ConstructorInterceptPoint[]{
new ConstructorInterceptPoint() {
@Override
public ElementMatcher<MethodDescription> getConstructorMatcher() {
return takesArgumentWithType(0, CONSTRUCTOR_INTERCEPTOR_FLAG);
}
@Override
public String getConstructorInterceptor() {
return CONSTRUCTOR_INTERCEPTOR_CLASS;
}
}
};
}
@Override
public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
return new InstanceMethodsInterceptPoint[]{
new InstanceMethodsInterceptPoint() {
@Override
public ElementMatcher<MethodDescription> getMethodsMatcher() {
return named(ENHANCE_METHOD);
}
@Override
public String getMethodsInterceptor() {
return INTERCEPTOR_CLASS;
}
@Override
public boolean isOverrideArgs() {
return false;
}
}
};
}
@Override
protected ClassMatch enhanceClass() {
return byName(ENHANCE_CLASS);
}
}
- KafkaProducerInstrumentation继承了AbstractKafkaInstrumentation,它增强的是org.apache.kafka.clients.producer.KafkaProducer类;它使用org.apache.skywalking.apm.plugin.kafka.ProducerConstructorInterceptor增强了其参数为org.apache.kafka.clients.producer.ProducerConfig的构造器;它使用org.apache.skywalking.apm.plugin.kafka.KafkaProducerInterceptor增强其doSend方法
ProducerConstructorInterceptor
skywalking-6.6.0/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/ProducerConstructorInterceptor.java
public class ProducerConstructorInterceptor implements InstanceConstructorInterceptor { @Override
public void onConstruct(EnhancedInstance objInst, Object[] allArguments) {
ProducerConfig config = (ProducerConfig) allArguments[0];
objInst.setSkyWalkingDynamicField(StringUtil.join(";", config.getList("bootstrap.servers").toArray(new String[0])));
}
}
- ProducerConstructorInterceptor实现了InstanceConstructorInterceptor接口,其onConstruct将bootstrap.servers信息设置到objInst的skyWalkingDynamicField
KafkaProducerInterceptor
skywalking-6.6.0/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/KafkaProducerInterceptor.java
public class KafkaProducerInterceptor implements InstanceMethodsAroundInterceptor { public static final String OPERATE_NAME_PREFIX = "Kafka/";
public static final String PRODUCER_OPERATE_NAME_SUFFIX = "/Producer";
@Override
public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
MethodInterceptResult result) throws Throwable {
ContextCarrier contextCarrier = new ContextCarrier();
ProducerRecord record = (ProducerRecord) allArguments[0];
String topicName = record.topic();
AbstractSpan activeSpan = ContextManager.createExitSpan(OPERATE_NAME_PREFIX + topicName + PRODUCER_OPERATE_NAME_SUFFIX, contextCarrier, (String) objInst.getSkyWalkingDynamicField());
Tags.MQ_BROKER.set(activeSpan, (String) objInst.getSkyWalkingDynamicField());
Tags.MQ_TOPIC.set(activeSpan, topicName);
SpanLayer.asMQ(activeSpan);
activeSpan.setComponent(ComponentsDefine.KAFKA_PRODUCER);
CarrierItem next = contextCarrier.items();
while (next.hasNext()) {
next = next.next();
record.headers().add(next.getHeadKey(), next.getHeadValue().getBytes());
}
EnhancedInstance callbackInstance = (EnhancedInstance) allArguments[1];
if (null != callbackInstance) {
ContextSnapshot snapshot = ContextManager.capture();
if (null != snapshot) {
CallbackCache cache = new CallbackCache();
cache.setSnapshot(snapshot);
callbackInstance.setSkyWalkingDynamicField(cache);
}
}
}
@Override
public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
Object ret) throws Throwable {
ContextManager.stopSpan();
return ret;
}
@Override
public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
Class<?>[] argumentsTypes, Throwable t) {
}
}
- KafkaProducerInterceptor实现了InstanceMethodsAroundInterceptor接口,其beforeMethod方法设置Tags.MQ_BROKER、Tags.MQ_TOPIC,然后将contextCarrier.items()的信息设置到ProducerRecord的header中,另外若ContextSnapshot不为null则执行callbackInstance.setSkyWalkingDynamicField(cache);其afterMethod方法执行ContextManager.stopSpan()
KafkaProducerMapInstrumentation
skywalking-6.6.0/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/define/KafkaProducerMapInstrumentation.java
public class KafkaProducerMapInstrumentation extends AbstractKafkaInstrumentation { public static final String ENHANCE_CLASS = "org.apache.kafka.clients.producer.KafkaProducer";
public static final String CONSTRUCTOR_INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.kafka.ProducerConstructorMapInterceptor";
public static final String CONSTRUCTOR_INTERCEPTOR_FLAG = "java.util.Map";
@Override
public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
return new ConstructorInterceptPoint[]{
new ConstructorInterceptPoint() {
@Override
public ElementMatcher<MethodDescription> getConstructorMatcher() {
return takesArgumentWithType(0, CONSTRUCTOR_INTERCEPTOR_FLAG);
}
@Override
public String getConstructorInterceptor() {
return CONSTRUCTOR_INTERCEPTOR_CLASS;
}
}
};
}
@Override
public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
return new InstanceMethodsInterceptPoint[0];
}
@Override
protected ClassMatch enhanceClass() {
return byName(ENHANCE_CLASS);
}
}
- KafkaProducerMapInstrumentation继承了AbstractKafkaInstrumentation,它使用org.apache.skywalking.apm.plugin.kafka.ProducerConstructorMapInterceptor增强了org.apache.kafka.clients.producer.KafkaProducer参数为java.util.Map的构造器
ProducerConstructorMapInterceptor
skywalking-6.6.0/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/ProducerConstructorMapInterceptor.java
public class ProducerConstructorMapInterceptor implements InstanceConstructorInterceptor { @Override
public void onConstruct(EnhancedInstance objInst, Object[] allArguments) {
Map<String, Object> config = (Map<String, Object>) allArguments[0];
// prevent errors caused by secondary interception in kafkaTemplate
if (objInst.getSkyWalkingDynamicField() == null) {
objInst.setSkyWalkingDynamicField(StringUtil.join(";", ((String) config.get("bootstrap.servers")).split(",")));
}
}
}
- ProducerConstructorMapInterceptor实现了InstanceConstructorInterceptor接口,其onConstruct方法将bootstrap.servers信息设置到objInst的skyWalkingDynamicField
KafkaTemplateInstrumentation
skywalking-6.6.0/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/define/KafkaTemplateInstrumentation.java
public class KafkaTemplateInstrumentation extends AbstractKafkaTemplateInstrumentation { private static final String ENHANCE_CLASS = "org.springframework.kafka.core.KafkaTemplate";
private static final String ENHANCE_METHOD = "buildCallback";
private static final String INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.kafka.KafkaTemplateCallbackInterceptor";
@Override
public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
return new ConstructorInterceptPoint[0];
}
@Override
public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
return new InstanceMethodsInterceptPoint[]{
new InstanceMethodsInterceptPoint() {
@Override
public ElementMatcher<MethodDescription> getMethodsMatcher() {
return named(ENHANCE_METHOD);
}
@Override
public String getMethodsInterceptor() {
return INTERCEPTOR_CLASS;
}
@Override
public boolean isOverrideArgs() {
return false;
}
}
};
}
@Override
protected ClassMatch enhanceClass() {
return byName(ENHANCE_CLASS);
}
}
- KafkaTemplateInstrumentation继承了AbstractKafkaTemplateInstrumentation;它使用org.apache.skywalking.apm.plugin.kafka.KafkaTemplateCallbackInterceptor增强了org.springframework.kafka.core.KafkaTemplate的buildCallback方法
KafkaTemplateCallbackInterceptor
skywalking-6.6.0/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/KafkaTemplateCallbackInterceptor.java
public class KafkaTemplateCallbackInterceptor implements InstanceMethodsAroundInterceptor { @Override
public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, MethodInterceptResult result) throws Throwable {
}
@Override
public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Object ret) throws Throwable {
return new CallbackAdapter((org.apache.kafka.clients.producer.Callback) ret, objInst);
}
@Override
public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Throwable t) {
}
}
- KafkaTemplateCallbackInterceptor实现了InstanceMethodsAroundInterceptor接口,其afterMethod方法将org.apache.kafka.clients.producer.Callback包装为CallbackAdapter返回
KafkaTemplateCallbackInstrumentation
skywalking-6.6.0/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/define/KafkaTemplateCallbackInstrumentation.java
public class KafkaTemplateCallbackInstrumentation extends AbstractKafkaTemplateInstrumentation { private static final String ENHANCE_CLASS = "org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback";
private static final String CONSTRUCTOR_INTERCEPT_TYPE = "org.apache.kafka.clients.producer.Callback";
private static final String CONSTRUCTOR_INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.kafka.CallbackConstructorInterceptor";
@Override
public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
return new ConstructorInterceptPoint[]{
new ConstructorInterceptPoint() {
@Override
public ElementMatcher<MethodDescription> getConstructorMatcher() {
return takesArgumentWithType(0, CONSTRUCTOR_INTERCEPT_TYPE);
}
@Override
public String getConstructorInterceptor() {
return CONSTRUCTOR_INTERCEPTOR_CLASS;
}
}
};
}
@Override
public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
return new InstanceMethodsInterceptPoint[0];
}
@Override
protected ClassMatch enhanceClass() {
return byName(ENHANCE_CLASS);
}
}
- KafkaTemplateCallbackInstrumentation继承了AbstractKafkaTemplateInstrumentation;它使用org.apache.skywalking.apm.plugin.kafka.CallbackConstructorInterceptor增强了org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback的参数为org.apache.kafka.clients.producer.Callback的构造器
CallbackConstructorInterceptor
skywalking-6.6.0/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/CallbackConstructorInterceptor.java
public class CallbackConstructorInterceptor implements InstanceConstructorInterceptor { @Override
public void onConstruct(EnhancedInstance objInst, Object[] allArguments) {
Callback callback = (Callback) allArguments[0];
CallbackCache cache;
if (null != objInst.getSkyWalkingDynamicField()) {
cache = (CallbackCache) objInst.getSkyWalkingDynamicField();
} else {
cache = new CallbackCache();
}
cache.setCallback(callback);
objInst.setSkyWalkingDynamicField(cache);
}
}
- CallbackConstructorInterceptor实现了InstanceConstructorInterceptor接口,其onConstruct方法将CallbackCache设置到objInst的skyWalkingDynamicField
小结
skywalking的kafka-plugin提供了CallbackInstrumentation、KafkaConsumerInstrumentation、KafkaProducerInstrumentation、KafkaProducerMapInstrumentation、KafkaTemplateInstrumentation、KafkaTemplateCallbackInstrumentation这几个增强
doc
- AbstractKafkaInstrumentation
- AbstractKafkaTemplateInstrumentation
- CallbackInstrumentation
- KafkaConsumerInstrumentation
- KafkaProducerInstrumentation
- KafkaProducerMapInstrumentation
- KafkaTemplateInstrumentation
- KafkaTemplateCallbackInstrumentation
以上是 聊聊skywalking的kafkaplugin 的全部内容, 来源链接: utcz.com/z/514475.html