聊聊rocketmq-client-go的defaultProducer

编程

本文主要研究一下rocketmq-client-go的defaultProducer

defaultProducer

rocketmq-client-go-v2.0.0/producer/producer.go

// defaultProducer is the producer type examined in this article. It is built
// by NewDefaultProducer and exposes Start, Shutdown, SendSync, SendAsync and
// SendOneWay.
type defaultProducer struct {

// group is the producer group name, copied from the options' GroupName.
group string

// client is the RMQClient obtained from internal.GetOrNewRocketMQClient.
client internal.RMQClient

// state is written with atomic.StoreInt32 in Start and Shutdown.
state int32

// options holds the producerOptions after every Option has been applied.
options producerOptions

// publishInfo is not referenced by the methods shown in this excerpt.
// NOTE(review): presumably caches per-topic publish/route info — confirm.
publishInfo sync.Map

// callbackCh is created in NewDefaultProducer and handed to the client.
callbackCh chan interface{}

// interceptor is the chain built by primitive.ChainInterceptors from
// options.Interceptors; the send methods treat nil as "no interceptor".
interceptor primitive.Interceptor

}

  • defaultProducer定义了group、client、state、options、publishInfo、callbackCh、interceptor

NewDefaultProducer

rocketmq-client-go-v2.0.0/producer/producer.go

// NewDefaultProducer builds a defaultProducer from the default producer
// options with every supplied Option applied on top.
//
// It creates the Namesrv from the configured NameServerAddrs (returning a
// wrapped error if that fails), forwards the credentials to it when they are
// not empty, and then wires up the client and the interceptor chain.
func NewDefaultProducer(opts ...Option) (*defaultProducer, error) {
	cfg := defaultProducerOptions()
	for i := range opts {
		opts[i](&cfg)
	}

	namesrv, err := internal.NewNamesrv(cfg.NameServerAddrs)
	if err != nil {
		return nil, errors.Wrap(err, "new Namesrv failed.")
	}
	if hasCreds := !cfg.Credentials.IsEmpty(); hasCreds {
		namesrv.SetCredentials(cfg.Credentials)
	}
	cfg.Namesrv = namesrv

	// The callback channel is shared between the producer and its client.
	callbacks := make(chan interface{})

	return &defaultProducer{
		group:       cfg.GroupName,
		callbackCh:  callbacks,
		options:     cfg,
		client:      internal.GetOrNewRocketMQClient(cfg.ClientOptions, callbacks),
		interceptor: primitive.ChainInterceptors(cfg.Interceptors...),
	}, nil
}

  • NewDefaultProducer方法先应用各个Option,再通过internal.NewNamesrv根据NameServerAddrs创建Namesrv(Credentials非空时会设置到Namesrv上),之后实例化defaultProducer,然后通过internal.GetOrNewRocketMQClient获取client,并通过primitive.ChainInterceptors构造interceptor

Start

rocketmq-client-go-v2.0.0/producer/producer.go

// Start flags the producer as internal.StateRunning, registers it with the
// client under its group name and starts the client. It always returns nil.
//
// When no name server addresses were configured, the Namesrv is first asked
// to update its address from NameServerDomain and InstanceName.
func (p *defaultProducer) Start() error {
	atomic.StoreInt32(&p.state, int32(internal.StateRunning))

	opts := &p.options
	if noAddrs := len(opts.NameServerAddrs) == 0; noAddrs {
		opts.Namesrv.UpdateNameServerAddress(opts.NameServerDomain, opts.InstanceName)
	}

	cli := p.client
	cli.RegisterProducer(p.group, p)
	cli.Start()

	return nil
}

  • Start方法先将state置为StateRunning,若NameServerAddrs为空则执行Namesrv.UpdateNameServerAddress,之后执行p.client.RegisterProducer及p.client.Start()

Shutdown

rocketmq-client-go-v2.0.0/producer/producer.go

// Shutdown marks the producer as internal.StateShutdown, unregisters its group
// from the client and then shuts the client down. It always returns nil.
func (p *defaultProducer) Shutdown() error {

atomic.StoreInt32(&p.state, int32(internal.StateShutdown))

p.client.UnregisterProducer(p.group)

// NOTE(review): the client comes from GetOrNewRocketMQClient and may be
// shared with other users — confirm that shutting it down here is safe.
p.client.Shutdown()

return nil

}

  • Shutdown方法执行p.client.UnregisterProducer及p.client.Shutdown()

SendSync

rocketmq-client-go-v2.0.0/producer/producer.go

// SendSync sends msgs and waits for the outcome.
//
// The messages are first passed to p.checkMsg (any error is returned with a
// nil result) and then combined by p.encodeBatch into the single message that
// is actually sent. Without an interceptor the message goes straight to
// p.sendSync. With one, the call is routed through the interceptor chain with
// a ProducerCtx attached to the context, and p.sendSync runs as the final
// invoker.
//
// The returned *primitive.SendResult is the one handed to p.sendSync; it is
// returned together with the error even when the send fails.
func (p *defaultProducer) SendSync(ctx context.Context, msgs ...*primitive.Message) (*primitive.SendResult, error) {
	if err := p.checkMsg(msgs...); err != nil {
		return nil, err
	}

	msg := p.encodeBatch(msgs...)
	resp := new(primitive.SendResult)

	if p.interceptor == nil {
		err := p.sendSync(ctx, msg, resp)
		return resp, err
	}

	// Fix: the derived context returned by WithMethod used to be discarded,
	// so interceptors never saw the communication mode. Keep it.
	ctx = primitive.WithMethod(ctx, primitive.SendSync)
	ctx = primitive.WithProducerCtx(ctx, &primitive.ProducerCtx{
		ProducerGroup:     p.group,
		CommunicationMode: primitive.SendSync,
		BornHost:          utils.LocalIP,
		Message:           *msg,
		SendResult:        resp,
	})

	err := p.interceptor(ctx, msg, resp, func(ctx context.Context, req, reply interface{}) error {
		// The chain hands back the request/reply pair it was given above.
		return p.sendSync(ctx, req.(*primitive.Message), reply.(*primitive.SendResult))
	})
	return resp, err
}

  • SendSync方法首先通过p.checkMsg校验消息,然后通过p.encodeBatch编码;之后若p.interceptor不为nil,则通过p.interceptor包装执行p.sendSync,否则直接执行p.sendSync(ctx, msg, resp)

sendSync

rocketmq-client-go-v2.0.0/producer/producer.go

// sendSync performs the synchronous send, making up to 1 + RetryTimes
// attempts.
//
// If a Namespace is configured, msg.Topic is rewritten in place to
// "<namespace>%<topic>" before the first attempt. Each attempt selects a
// message queue, resolves the broker address for it and invokes the client
// with a 3 second timeout:
//   - no queue selected: remember a "route info not found" error and retry;
//   - empty broker address: return an error immediately, without retrying;
//   - invoke error: remember it and retry;
//   - otherwise: return whatever ProcessSendResponse returns.
//
// When every attempt fails, the last remembered error is returned.
func (p *defaultProducer) sendSync(ctx context.Context, msg *primitive.Message, resp *primitive.SendResult) error {
	if ns := p.options.Namespace; ns != "" {
		msg.Topic = ns + "%" + msg.Topic
	}

	var lastErr error
	attempts := 1 + p.options.RetryTimes
	for i := 0; i < attempts; i++ {
		queue := p.selectMessageQueue(msg)
		if queue == nil {
			lastErr = fmt.Errorf("the topic=%s route info not found", msg.Topic)
			continue
		}

		brokerAddr := p.options.Namesrv.FindBrokerAddrByName(queue.BrokerName)
		if brokerAddr == "" {
			return fmt.Errorf("topic=%s route info not found", queue.Topic)
		}

		// Record the chosen broker and queue on the ProducerCtx carried by ctx.
		if p.interceptor != nil {
			pctx := primitive.GetProducerCtx(ctx)
			pctx.BrokerAddr = brokerAddr
			pctx.MQ = *queue
		}

		reply, invokeErr := p.client.InvokeSync(ctx, brokerAddr, p.buildSendRequest(queue, msg), 3*time.Second)
		if invokeErr == nil {
			return p.client.ProcessSendResponse(queue.BrokerName, reply, resp, msg)
		}
		lastErr = invokeErr
	}

	return lastErr
}

  • sendSync最多执行1 + RetryTimes次,每次先通过p.selectMessageQueue(msg)选择mq,然后通过p.options.Namesrv.FindBrokerAddrByName寻找addr,最后执行p.client.InvokeSync(ctx, addr, p.buildSendRequest(mq, msg), 3*time.Second),成功后通过p.client.ProcessSendResponse处理响应

SendAsync

rocketmq-client-go-v2.0.0/producer/producer.go

// SendAsync sends msgs without waiting for the broker's reply; f is passed to
// p.sendAsync, which invokes it with the outcome.
//
// The messages are first passed to p.checkMsg (any error is returned) and then
// combined by p.encodeBatch. With an interceptor configured the call is routed
// through the chain, with p.sendAsync as the final invoker; otherwise
// p.sendAsync is called directly. The returned error covers only what fails
// before the request is handed off.
func (p *defaultProducer) SendAsync(ctx context.Context, f func(context.Context, *primitive.SendResult, error), msgs ...*primitive.Message) error {
	if err := p.checkMsg(msgs...); err != nil {
		return err
	}

	msg := p.encodeBatch(msgs...)

	if p.interceptor == nil {
		return p.sendAsync(ctx, msg, f)
	}

	// Fix: the derived context returned by WithMethod used to be discarded,
	// so interceptors never saw the communication mode. Keep it.
	ctx = primitive.WithMethod(ctx, primitive.SendAsync)

	// The reply argument is nil: the result is delivered through f instead.
	return p.interceptor(ctx, msg, nil, func(ctx context.Context, req, reply interface{}) error {
		return p.sendAsync(ctx, msg, f)
	})
}

  • SendAsync方法主要是执行p.sendAsync(ctx, msg, f)

sendAsync

rocketmq-client-go-v2.0.0/producer/producer.go

// sendAsync performs a single asynchronous send attempt (no retries).
//
// If a Namespace is configured, msg.Topic is rewritten in place to
// "<namespace>%<topic>". A message queue is selected and its broker address
// resolved; failing either step returns a "route info not found" error
// without calling h. Otherwise the request is handed to the client under a
// 3 second timeout and h is called from the client's callback with either the
// SendResult or the error.
func (p *defaultProducer) sendAsync(ctx context.Context, msg *primitive.Message, h func(context.Context, *primitive.SendResult, error)) error {
	if p.options.Namespace != "" {
		msg.Topic = p.options.Namespace + "%" + msg.Topic
	}

	mq := p.selectMessageQueue(msg)
	if mq == nil {
		return errors.Errorf("the topic=%s route info not found", msg.Topic)
	}

	addr := p.options.Namesrv.FindBrokerAddrByName(mq.BrokerName)
	if addr == "" {
		return errors.Errorf("topic=%s route info not found", mq.Topic)
	}

	// Fix: keep the CancelFunc instead of discarding it, so the timeout
	// context is released as soon as the request completes.
	ctx, cancel := context.WithTimeout(ctx, 3*time.Second)

	err := p.client.InvokeAsync(ctx, addr, p.buildSendRequest(mq, msg), func(command *remote.RemotingCommand, err error) {
		// Cancel only after h has run, so h still sees a live context.
		defer cancel()

		if err != nil {
			h(ctx, nil, err)
			return
		}

		// Fix: a failure reported by ProcessSendResponse used to be dropped,
		// so h was told the send succeeded. Forward it like any other error.
		resp := new(primitive.SendResult)
		if err := p.client.ProcessSendResponse(mq.BrokerName, command, resp, msg); err != nil {
			h(ctx, nil, err)
			return
		}
		h(ctx, resp, nil)
	})
	if err != nil {
		// The request was not handed off; release the context right away.
		// Calling cancel more than once is harmless.
		cancel()
	}
	return err
}

  • sendAsync主要是执行p.client.InvokeAsync

SendOneWay

rocketmq-client-go-v2.0.0/producer/producer.go

// SendOneWay sends msgs without reporting a send result to the caller.
//
// The messages are first passed to p.checkMsg (any error is returned) and then
// combined by p.encodeBatch. With an interceptor configured the call is routed
// through the chain, with p.sendOneWay as the final invoker; otherwise
// p.sendOneWay is called directly.
func (p *defaultProducer) SendOneWay(ctx context.Context, msgs ...*primitive.Message) error {
	if err := p.checkMsg(msgs...); err != nil {
		return err
	}

	msg := p.encodeBatch(msgs...)

	if p.interceptor == nil {
		return p.sendOneWay(ctx, msg)
	}

	// Fix: the derived context returned by WithMethod used to be discarded,
	// so interceptors never saw the communication mode. Keep it.
	ctx = primitive.WithMethod(ctx, primitive.SendOneway)

	return p.interceptor(ctx, msg, nil, func(ctx context.Context, req, reply interface{}) error {
		// Fix: this used to call the exported SendOneWay, which re-entered
		// the interceptor chain and recursed without end. The final invoker
		// must be the unexported sendOneWay.
		return p.sendOneWay(ctx, msg)
	})
}

  • SendOneWay主要是执行p.sendOneWay(ctx, msg)

sendOneWay

rocketmq-client-go-v2.0.0/producer/producer.go

// sendOneWay performs the one-way send, making up to 1 + RetryTimes attempts.
//
// If a Namespace is configured, msg.Topic is rewritten in place to
// "<namespace>%<topic>" before the first attempt. Each attempt selects a
// message queue, resolves the broker address for it and calls InvokeOneWay
// with a 3 second timeout:
//   - no queue selected: remember a "route info not found" error and retry;
//   - empty broker address: return an error immediately, without retrying;
//   - invoke error: remember it and retry;
//   - otherwise: return nil.
//
// When every attempt fails, the last remembered error is returned.
func (p *defaultProducer) sendOneWay(ctx context.Context, msg *primitive.Message) error {
	if ns := p.options.Namespace; ns != "" {
		msg.Topic = ns + "%" + msg.Topic
	}

	var lastErr error
	for attempt, limit := 0, 1+p.options.RetryTimes; attempt < limit; attempt++ {
		queue := p.selectMessageQueue(msg)
		if queue == nil {
			lastErr = fmt.Errorf("the topic=%s route info not found", msg.Topic)
			continue
		}

		brokerAddr := p.options.Namesrv.FindBrokerAddrByName(queue.BrokerName)
		if brokerAddr == "" {
			return fmt.Errorf("topic=%s route info not found", queue.Topic)
		}

		if sendErr := p.client.InvokeOneWay(ctx, brokerAddr, p.buildSendRequest(queue, msg), 3*time.Second); sendErr != nil {
			lastErr = sendErr
			continue
		}
		return nil
	}

	return lastErr
}

  • sendOneWay主要是重试执行p.client.InvokeOneWay

小结

defaultProducer定义了group、client、state、options、publishInfo、callbackCh、interceptor;它提供了NewDefaultProducer、Start、Shutdown、SendSync、SendAsync、SendOneWay方法

doc

  • producer

以上是 聊聊rocketmq-client-go的defaultProducer 的全部内容, 来源链接: utcz.com/z/518054.html

回到顶部