聊聊rocketmqclientgo的TraceInterceptor

编程

本文主要研究一下rocketmq-client-go的TraceInterceptor

TraceInterceptor

rocketmq-client-go-v2.0.0/producer/interceptor.go

// WithTrace support rocketmq trace: https://github.com/apache/rocketmq/wiki/RIP-6-Message-Trace.

func WithTrace(traceCfg *primitive.TraceConfig) Option {

return func(options *producerOptions) {

ori := options.Interceptors

options.Interceptors = make([]primitive.Interceptor, 0)

options.Interceptors = append(options.Interceptors, newTraceInterceptor(traceCfg))

options.Interceptors = append(options.Interceptors, ori...)

}

}

  • WithTrace方法在options.Interceptors后追加TraceInterceptor

newTraceInterceptor

rocketmq-client-go-v2.0.0/producer/interceptor.go

func newTraceInterceptor(traceCfg *primitive.TraceConfig) primitive.Interceptor {

dispatcher := internal.NewTraceDispatcher(traceCfg)

dispatcher.Start()

return func(ctx context.Context, req, reply interface{}, next primitive.Invoker) error {

beginT := time.Now()

err := next(ctx, req, reply)

producerCtx := primitive.GetProducerCtx(ctx)

if producerCtx.Message.Topic == dispatcher.GetTraceTopicName() {

return next(ctx, req, reply)

}

// SendOneway && SendAsync has no reply.

if reply == nil {

return err

}

result := reply.(*primitive.SendResult)

if result.RegionID == "" || !result.TraceOn {

return err

}

sendSuccess := result.Status == primitive.SendOK

costT := time.Since(beginT).Nanoseconds() / int64(time.Millisecond)

storeT := beginT.UnixNano()/int64(time.Millisecond) + costT/2

traceBean := internal.TraceBean{

Topic: producerCtx.Message.Topic,

Tags: producerCtx.Message.GetTags(),

Keys: producerCtx.Message.GetKeys(),

StoreHost: producerCtx.BrokerAddr,

ClientHost: utils.LocalIP,

BodyLength: len(producerCtx.Message.Body),

MsgType: producerCtx.MsgType,

MsgId: result.MsgID,

OffsetMsgId: result.OffsetMsgID,

StoreTime: storeT,

}

traceCtx := internal.TraceContext{

RequestId: primitive.CreateUniqID(), // set id

TimeStamp: time.Now().UnixNano() / int64(time.Millisecond),

TraceType: internal.Pub,

GroupName: producerCtx.ProducerGroup,

RegionId: result.RegionID,

TraceBeans: []internal.TraceBean{traceBean},

CostTime: costT,

IsSuccess: sendSuccess,

}

dispatcher.Append(traceCtx)

return err

}

}

  • newTraceInterceptor方法首先通过internal.NewTraceDispatcher(traceCfg)创建dispatcher,然后执行dispatcher.Start方法,之后返回一个func,该func会构造traceCtx,然后执行dispatcher.Append(traceCtx)

小结

WithTrace方法在options.Interceptors后追加TraceInterceptor;而newTraceInterceptor方法则创建TraceInterceptor

doc

  • interceptor

以上是 聊聊rocketmqclientgo的TraceInterceptor 的全部内容, 来源链接: utcz.com/z/518122.html

回到顶部