聊聊skywalking的rocketmqplugin

编程

skywalking-plugin.def

skywalking-6.6.0/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/resources/skywalking-plugin.def

rocketMQ-4.x=org.apache.skywalking.apm.plugin.rocketMQ.v4.define.ConsumeMessageConcurrentlyInstrumentation

rocketMQ-4.x=org.apache.skywalking.apm.plugin.rocketMQ.v4.define.ConsumeMessageOrderlyInstrumentation

rocketMQ-4.x=org.apache.skywalking.apm.plugin.rocketMQ.v4.define.MQClientAPIImplInstrumentation

rocketMQ-4.x=org.apache.skywalking.apm.plugin.rocketMQ.v4.define.SendCallbackInstrumentation

  • skywalking的rocketmq-plugin定义了4个增强,分别是ConsumeMessageConcurrentlyInstrumentation、ConsumeMessageOrderlyInstrumentation、MQClientAPIImplInstrumentation、SendCallbackInstrumentation

ConsumeMessageConcurrentlyInstrumentation

skywalking-6.6.0/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v4/define/ConsumeMessageConcurrentlyInstrumentation.java

public class ConsumeMessageConcurrentlyInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {

private static final String ENHANCE_CLASS = "org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently";

private static final String CONSUMER_MESSAGE_METHOD = "consumeMessage";

private static final String INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.rocketMQ.v4.MessageConcurrentlyConsumeInterceptor";

@Override public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {

return new ConstructorInterceptPoint[0];

}

@Override public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {

return new InstanceMethodsInterceptPoint[] {

new InstanceMethodsInterceptPoint() {

@Override public ElementMatcher<MethodDescription> getMethodsMatcher() {

return named(CONSUMER_MESSAGE_METHOD);

}

@Override public String getMethodsInterceptor() {

return INTERCEPTOR_CLASS;

}

@Override public boolean isOverrideArgs() {

return false;

}

}

};

}

@Override protected ClassMatch enhanceClass() {

return HierarchyMatch.byHierarchyMatch(new String[] {ENHANCE_CLASS});

}

}

  • ConsumeMessageConcurrentlyInstrumentation继承了ClassInstanceMethodsEnhancePluginDefine,它使用org.apache.skywalking.apm.plugin.rocketMQ.v4.MessageConcurrentlyConsumeInterceptor增强org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently的consumeMessage方法

AbstractMessageConsumeInterceptor

skywalking-6.6.0/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v4/AbstractMessageConsumeInterceptor.java

public abstract class AbstractMessageConsumeInterceptor implements InstanceMethodsAroundInterceptor {

public static final String CONSUMER_OPERATION_NAME_PREFIX = "RocketMQ/";

@Override

public final void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments,

Class<?>[] argumentsTypes,

MethodInterceptResult result) throws Throwable {

List<MessageExt> msgs = (List<MessageExt>)allArguments[0];

ContextCarrier contextCarrier = getContextCarrierFromMessage(msgs.get(0));

AbstractSpan span = ContextManager.createEntrySpan(CONSUMER_OPERATION_NAME_PREFIX + msgs.get(0).getTopic() + "/Consumer", contextCarrier);

span.setComponent(ComponentsDefine.ROCKET_MQ_CONSUMER);

SpanLayer.asMQ(span);

for (int i = 1; i < msgs.size(); i++) {

ContextManager.extract(getContextCarrierFromMessage(msgs.get(i)));

}

}

@Override public final void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,

Class<?>[] argumentsTypes, Throwable t) {

ContextManager.activeSpan().errorOccurred().log(t);

}

private ContextCarrier getContextCarrierFromMessage(MessageExt message) {

ContextCarrier contextCarrier = new ContextCarrier();

CarrierItem next = contextCarrier.items();

while (next.hasNext()) {

next = next.next();

next.setHeadValue(message.getUserProperty(next.getHeadKey()));

}

return contextCarrier;

}

}

  • AbstractMessageConsumeInterceptor实现了InstanceMethodsAroundInterceptor接口,其beforeMethod方法执行ContextManager.extract(getContextCarrierFromMessage(msgs.get(i)));其handleMethodException方法执行ContextManager.activeSpan().errorOccurred().log(t)

MessageConcurrentlyConsumeInterceptor

skywalking-6.6.0/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v4/MessageConcurrentlyConsumeInterceptor.java

public class MessageConcurrentlyConsumeInterceptor extends AbstractMessageConsumeInterceptor {

@Override

public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,

Object ret) throws Throwable {

ConsumeConcurrentlyStatus status = (ConsumeConcurrentlyStatus)ret;

if (status == ConsumeConcurrentlyStatus.RECONSUME_LATER) {

AbstractSpan activeSpan = ContextManager.activeSpan();

activeSpan.errorOccurred();

Tags.STATUS_CODE.set(activeSpan, status.name());

}

ContextManager.stopSpan();

return ret;

}

}

  • MessageConcurrentlyConsumeInterceptor继承了AbstractMessageConsumeInterceptor,其afterMethod方法获取ConsumeConcurrentlyStatus,在该值为ConsumeConcurrentlyStatus.RECONSUME_LATER时执行activeSpan.errorOccurred()并设置STATUS_CODE,最后执行ContextManager.stopSpan()

ConsumeMessageOrderlyInstrumentation

skywalking-6.6.0/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v4/define/ConsumeMessageOrderlyInstrumentation.java

public class ConsumeMessageOrderlyInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {

private static final String ENHANCE_CLASS = "org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly";

private static final String ENHANCE_METHOD = "consumeMessage";

private static final String INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.rocketMQ.v4.MessageOrderlyConsumeInterceptor";

@Override public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {

return new ConstructorInterceptPoint[0];

}

@Override public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {

return new InstanceMethodsInterceptPoint[] {

new InstanceMethodsInterceptPoint() {

@Override public ElementMatcher<MethodDescription> getMethodsMatcher() {

return named(ENHANCE_METHOD);

}

@Override public String getMethodsInterceptor() {

return INTERCEPTOR_CLASS;

}

@Override public boolean isOverrideArgs() {

return false;

}

}

};

}

@Override protected ClassMatch enhanceClass() {

return byHierarchyMatch(new String[] {ENHANCE_CLASS});

}

}

  • ConsumeMessageOrderlyInstrumentation继承了ClassInstanceMethodsEnhancePluginDefine,它使用org.apache.skywalking.apm.plugin.rocketMQ.v4.MessageOrderlyConsumeInterceptor增强org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly的consumeMessage方法

MessageOrderlyConsumeInterceptor

skywalking-6.6.0/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v4/MessageOrderlyConsumeInterceptor.java

public class MessageOrderlyConsumeInterceptor extends AbstractMessageConsumeInterceptor {

@Override

public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,

Object ret) throws Throwable {

ConsumeOrderlyStatus status = (ConsumeOrderlyStatus)ret;

if (status == ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT) {

AbstractSpan activeSpan = ContextManager.activeSpan();

activeSpan.errorOccurred();

Tags.STATUS_CODE.set(activeSpan, status.name());

}

ContextManager.stopSpan();

return ret;

}

}

  • MessageOrderlyConsumeInterceptor继承了AbstractMessageConsumeInterceptor,其afterMethod方法获取ConsumeOrderlyStatus,在该值为ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT时执行activeSpan.errorOccurred()并设置STATUS_CODE,最后执行ContextManager.stopSpan()

MQClientAPIImplInstrumentation

skywalking-6.6.0/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v4/define/MQClientAPIImplInstrumentation.java

public class MQClientAPIImplInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {

private static final String ENHANCE_CLASS = "org.apache.rocketmq.client.impl.MQClientAPIImpl";

private static final String SEND_MESSAGE_METHOD_NAME = "sendMessage";

private static final String ASYNC_METHOD_INTERCEPTOR = "org.apache.skywalking.apm.plugin.rocketMQ.v4.MessageSendInterceptor";

public static final String UPDATE_NAME_SERVER_INTERCEPT_CLASS = "org.apache.skywalking.apm.plugin.rocketMQ.v4.UpdateNameServerInterceptor";

public static final String UPDATE_NAME_SERVER_METHOD_NAME = "updateNameServerAddressList";

@Override public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {

return new ConstructorInterceptPoint[0];

}

@Override public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {

return new InstanceMethodsInterceptPoint[] {

new InstanceMethodsInterceptPoint() {

@Override public ElementMatcher<MethodDescription> getMethodsMatcher() {

return named(SEND_MESSAGE_METHOD_NAME).and(takesArguments(12));

}

@Override public String getMethodsInterceptor() {

return ASYNC_METHOD_INTERCEPTOR;

}

@Override public boolean isOverrideArgs() {

return false;

}

},

new InstanceMethodsInterceptPoint() {

@Override public ElementMatcher<MethodDescription> getMethodsMatcher() {

return named(UPDATE_NAME_SERVER_METHOD_NAME);

}

@Override public String getMethodsInterceptor() {

return UPDATE_NAME_SERVER_INTERCEPT_CLASS;

}

@Override public boolean isOverrideArgs() {

return false;

}

}

};

}

@Override protected ClassMatch enhanceClass() {

return byName(ENHANCE_CLASS);

}

}

  • MQClientAPIImplInstrumentation继承了ClassInstanceMethodsEnhancePluginDefine,它增强的是org.apache.rocketmq.client.impl.MQClientAPIImpl类;它使用org.apache.skywalking.apm.plugin.rocketMQ.v4.MessageSendInterceptor增强其sendMessage方法;它使用org.apache.skywalking.apm.plugin.rocketMQ.v4.UpdateNameServerInterceptor增强其updateNameServerAddressList方法

MessageSendInterceptor

skywalking-6.6.0/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v4/MessageSendInterceptor.java

public class MessageSendInterceptor implements InstanceMethodsAroundInterceptor {

public static final String ASYNC_SEND_OPERATION_NAME_PREFIX = "RocketMQ/";

@Override

public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,

MethodInterceptResult result) throws Throwable {

Message message = (Message)allArguments[2];

ContextCarrier contextCarrier = new ContextCarrier();

String namingServiceAddress = String.valueOf(objInst.getSkyWalkingDynamicField());

AbstractSpan span = ContextManager.createExitSpan(buildOperationName(message.getTopic()), contextCarrier, namingServiceAddress);

span.setComponent(ComponentsDefine.ROCKET_MQ_PRODUCER);

Tags.MQ_BROKER.set(span, (String)allArguments[0]);

Tags.MQ_TOPIC.set(span, message.getTopic());

SpanLayer.asMQ(span);

SendMessageRequestHeader requestHeader = (SendMessageRequestHeader)allArguments[3];

StringBuilder properties = new StringBuilder(requestHeader.getProperties());

CarrierItem next = contextCarrier.items();

while (next.hasNext()) {

next = next.next();

if (!StringUtil.isEmpty(next.getHeadValue())) {

properties.append(next.getHeadKey());

properties.append(NAME_VALUE_SEPARATOR);

properties.append(next.getHeadValue());

properties.append(PROPERTY_SEPARATOR);

}

}

requestHeader.setProperties(properties.toString());

if (allArguments[6] != null) {

((EnhancedInstance)allArguments[6]).setSkyWalkingDynamicField(new SendCallBackEnhanceInfo(message.getTopic(), ContextManager.capture()));

}

}

@Override

public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,

Object ret) throws Throwable {

ContextManager.stopSpan();

return ret;

}

@Override public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,

Class<?>[] argumentsTypes, Throwable t) {

ContextManager.activeSpan().errorOccurred().log(t);

}

private String buildOperationName(String topicName) {

return ASYNC_SEND_OPERATION_NAME_PREFIX + topicName + "/Producer";

}

}

  • MessageSendInterceptor实现了InstanceMethodsAroundInterceptor接口,其beforeMethod方法设置MQ_BROKER、MQ_TOPIC,并将contextCarrier.items()放到SendMessageRequestHeader的properties进行透传;其afterMethod方法执行ContextManager.stopSpan();其handleMethodException方法执行ContextManager.activeSpan().errorOccurred().log(t)

UpdateNameServerInterceptor

skywalking-6.6.0/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v4/UpdateNameServerInterceptor.java

public class UpdateNameServerInterceptor implements InstanceMethodsAroundInterceptor {

@Override

public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,

MethodInterceptResult result) throws Throwable {

objInst.setSkyWalkingDynamicField(allArguments[0]);

}

@Override

public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,

Object ret) throws Throwable {

return ret;

}

@Override public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,

Class<?>[] argumentsTypes, Throwable t) {

}

}

  • UpdateNameServerInterceptor实现了InstanceMethodsAroundInterceptor接口,其beforeMethod方法执行objInst.setSkyWalkingDynamicField(allArguments[0])

SendCallbackInstrumentation

skywalking-6.6.0/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v4/define/SendCallbackInstrumentation.java

public class SendCallbackInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {

private static final String ENHANCE_CLASS = "org.apache.rocketmq.client.producer.SendCallback";

private static final String ON_SUCCESS_ENHANCE_METHOD = "onSuccess";

private static final String ON_SUCCESS_INTERCEPTOR = "org.apache.skywalking.apm.plugin.rocketMQ.v4.OnSuccessInterceptor";

private static final String ON_EXCEPTION_METHOD = "onException";

private static final String ON_EXCEPTION_INTERCEPTOR = "org.apache.skywalking.apm.plugin.rocketMQ.v4.OnExceptionInterceptor";

@Override public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {

return new ConstructorInterceptPoint[0];

}

@Override public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {

return new InstanceMethodsInterceptPoint[] {

new InstanceMethodsInterceptPoint() {

@Override public ElementMatcher<MethodDescription> getMethodsMatcher() {

return named(ON_SUCCESS_ENHANCE_METHOD).and(takesArgumentWithType(0, "org.apache.rocketmq.client.producer.SendResult"));

}

@Override public String getMethodsInterceptor() {

return ON_SUCCESS_INTERCEPTOR;

}

@Override public boolean isOverrideArgs() {

return false;

}

},

new InstanceMethodsInterceptPoint() {

@Override public ElementMatcher<MethodDescription> getMethodsMatcher() {

return named(ON_EXCEPTION_METHOD).and(takesArgumentWithType(0, "java.lang.Throwable"));

}

@Override public String getMethodsInterceptor() {

return ON_EXCEPTION_INTERCEPTOR;

}

@Override public boolean isOverrideArgs() {

return false;

}

}

};

}

@Override protected ClassMatch enhanceClass() {

return byHierarchyMatch(new String[] {ENHANCE_CLASS});

}

}

  • SendCallbackInstrumentation继承了ClassInstanceMethodsEnhancePluginDefine,它增强的是org.apache.rocketmq.client.producer.SendCallback类;它使用org.apache.skywalking.apm.plugin.rocketMQ.v4.OnSuccessInterceptor增强其第一个参数类型为org.apache.rocketmq.client.producer.SendResult的onSuccess方法;它使用org.apache.skywalking.apm.plugin.rocketMQ.v4.OnExceptionInterceptor增强其第一个参数类型为java.lang.Throwable的onException方法

OnSuccessInterceptor

skywalking-6.6.0/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v4/OnSuccessInterceptor.java

public class OnSuccessInterceptor implements InstanceMethodsAroundInterceptor {

public static final String CALLBACK_OPERATION_NAME_PREFIX = "RocketMQ/";

@Override

public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,

MethodInterceptResult result) throws Throwable {

SendCallBackEnhanceInfo enhanceInfo = (SendCallBackEnhanceInfo)objInst.getSkyWalkingDynamicField();

AbstractSpan activeSpan = ContextManager.createLocalSpan(CALLBACK_OPERATION_NAME_PREFIX + enhanceInfo.getTopicId() + "/Producer/Callback");

activeSpan.setComponent(ComponentsDefine.ROCKET_MQ_PRODUCER);

SendStatus sendStatus = ((SendResult)allArguments[0]).getSendStatus();

if (sendStatus != SendStatus.SEND_OK) {

activeSpan.errorOccurred();

Tags.STATUS_CODE.set(activeSpan, sendStatus.name());

}

ContextManager.continued(enhanceInfo.getContextSnapshot());

}

@Override

public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,

Object ret) throws Throwable {

ContextManager.stopSpan();

return ret;

}

@Override public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,

Class<?>[] argumentsTypes, Throwable t) {

ContextManager.activeSpan().errorOccurred().log(t);

}

}

  • OnSuccessInterceptor实现了InstanceMethodsAroundInterceptor接口,其beforeMethod方法执行ContextManager.continued(enhanceInfo.getContextSnapshot());其afterMethod方法执行ContextManager.stopSpan();其handleMethodException方法执行ContextManager.activeSpan().errorOccurred().log(t)

OnExceptionInterceptor

skywalking-6.6.0/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v4/OnExceptionInterceptor.java

public class OnExceptionInterceptor implements InstanceMethodsAroundInterceptor {

public static final String CALLBACK_OPERATION_NAME_PREFIX = "RocketMQ/";

@Override

public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,

MethodInterceptResult result) throws Throwable {

SendCallBackEnhanceInfo enhanceInfo = (SendCallBackEnhanceInfo)objInst.getSkyWalkingDynamicField();

AbstractSpan activeSpan = ContextManager.createLocalSpan(CALLBACK_OPERATION_NAME_PREFIX + enhanceInfo.getTopicId() + "/Producer/Callback");

activeSpan.setComponent(ComponentsDefine.ROCKET_MQ_PRODUCER);

activeSpan.errorOccurred().log((Throwable)allArguments[0]);

ContextManager.continued(enhanceInfo.getContextSnapshot());

}

@Override

public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,

Object ret) throws Throwable {

ContextManager.stopSpan();

return ret;

}

@Override public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,

Class<?>[] argumentsTypes, Throwable t) {

ContextManager.activeSpan().log(t);

}

}

  • OnExceptionInterceptor实现了InstanceMethodsAroundInterceptor接口,其beforeMethod方法执行activeSpan.errorOccurred().log((Throwable)allArguments[0])以及ContextManager.continued(enhanceInfo.getContextSnapshot());其afterMethod方法执行ContextManager.stopSpan();其handleMethodException方法执行ContextManager.activeSpan().log(t)方法

小结

skywalking的rocketmq-plugin定义了4个增强,分别是ConsumeMessageConcurrentlyInstrumentation、ConsumeMessageOrderlyInstrumentation、MQClientAPIImplInstrumentation、SendCallbackInstrumentation

doc

  • ConsumeMessageConcurrentlyInstrumentation
  • ConsumeMessageOrderlyInstrumentation
  • MQClientAPIImplInstrumentation
  • SendCallbackInstrumentation

以上是 聊聊skywalking的rocketmqplugin 的全部内容, 来源链接: utcz.com/z/514345.html

回到顶部