聊聊rocketmq-client-go的PullConsumer

编程

本文主要研究一下rocketmq-client-go的PullConsumer

PullConsumer

rocketmq-client-go-v2.0.0/consumer/pull_consumer.go

// PullConsumer is the consumer API in which the caller asks for messages
// explicitly (Pull / PullFrom) and manages queue offsets through
// UpdateOffset, PersistOffset and CurrentOffset.
type PullConsumer interface {

// Start starts the consumer.
// NOTE(review): declared without a return value here, while
// defaultPullConsumer.Start returns error — confirm which is intended.

Start()

// Shutdown refuses all new pull operations and finishes the submitted ones.
// NOTE(review): declared without a return value here, while
// defaultPullConsumer.Shutdown returns error — confirm which is intended.

Shutdown()

// Pull pulls messages of topic; selector indicates which messages to pull
// and numbers is the requested message count.

Pull(ctx context.Context, topic string, selector MessageSelector, numbers int) (*primitive.PullResult, error)

// PullFrom pulls messages of queue from offset to offset + numbers.

PullFrom(ctx context.Context, queue *primitive.MessageQueue, offset int64, numbers int) (*primitive.PullResult, error)

// UpdateOffset updates the in-memory offset of queue.

UpdateOffset(queue *primitive.MessageQueue, offset int64) error

// PersistOffset persists all in-memory offsets.

PersistOffset(ctx context.Context) error

// CurrentOffset returns the current in-memory offset of queue.

CurrentOffset(queue *primitive.MessageQueue) (int64, error)

}

  • PullConsumer定义了Start、Shutdown、Pull、PullFrom、UpdateOffset、PersistOffset、CurrentOffset方法

defaultPullConsumer

rocketmq-client-go-v2.0.0/consumer/pull_consumer.go

// defaultPullConsumer is the pull consumer type returned by NewPullConsumer.
// It embeds *defaultConsumer, so the embedded type's fields and methods are
// promoted onto it.
type defaultPullConsumer struct {

// Embedded shared consumer; the only field NewPullConsumer populates.

*defaultConsumer

// Consumer options. NOTE(review): not assigned by NewPullConsumer; the
// options are stored on the embedded defaultConsumer instead — confirm
// whether this field is still needed.

option consumerOptions

// RocketMQ client handle. NOTE(review): not assigned by NewPullConsumer.

client internal.RMQClient

// Consumer group name. NOTE(review): not assigned by NewPullConsumer.

GroupName string

// Message model. NOTE(review): not assigned by NewPullConsumer.

Model MessageModel

// NOTE(review): purpose not visible from the code shown here.

UnitMode bool

// Interceptor hook. NOTE(review): not assigned by NewPullConsumer.

interceptor primitive.Interceptor

}

  • defaultPullConsumer内嵌了*defaultConsumer,并定义了option、client、GroupName、Model、UnitMode、interceptor属性

NewPullConsumer

rocketmq-client-go-v2.0.0/consumer/pull_consumer.go

// NewPullConsumer builds a pull consumer. It starts from
// defaultPullConsumerOptions, applies every supplied Option in order, creates
// the name server handle from the configured addresses, and wraps a freshly
// initialised defaultConsumer in a defaultPullConsumer.
//
// It returns a wrapped error when the name server handle cannot be created.
func NewPullConsumer(options ...Option) (*defaultPullConsumer, error) {
	opts := defaultPullConsumerOptions()
	for i := range options {
		options[i](&opts)
	}

	ns, err := internal.NewNamesrv(opts.NameServerAddrs)
	if err != nil {
		return nil, errors.Wrap(err, "new Namesrv failed.")
	}

	base := &defaultConsumer{
		client:        internal.GetOrNewRocketMQClient(opts.ClientOptions, nil),
		consumerGroup: opts.GroupName,
		cType:         _PullConsume,
		state:         int32(internal.StateCreateJust),
		prCh:          make(chan PullRequest, 4),
		model:         opts.ConsumerModel,
		option:        opts,
		namesrv:       ns,
	}

	return &defaultPullConsumer{defaultConsumer: base}, nil
}

  • NewPullConsumer方法先应用各Option并通过internal.NewNamesrv创建namesrv,然后实例化defaultConsumer,最后将其包装为defaultPullConsumer返回

Start

rocketmq-client-go-v2.0.0/consumer/pull_consumer.go

// Start stores StateRunning into c.state and then invokes c.start through
// c.once (presumably a sync.Once, so the start routine runs a single time).
// The error of c.start is returned by the call that actually ran it; a call
// for which c.once does not run the closure returns nil.
func (c *defaultPullConsumer) Start() error {
	atomic.StoreInt32(&c.state, int32(internal.StateRunning))

	var startErr error
	c.once.Do(func() { startErr = c.start() })
	return startErr
}

  • Start方法执行defaultPullConsumer的start方法

Pull

rocketmq-client-go-v2.0.0/consumer/pull_consumer.go

// Pull pulls up to numbers messages of topic. The queue is chosen by
// getNextQueueOf, the subscription data is built from selector, and the pull
// starts at c.nextOffsetOf for the chosen queue. A successful result is
// passed to processPullResult before being returned.
//
// ctx is forwarded to the underlying pull so that the caller's cancellation
// and deadline apply to the request. An error is returned when no queue is
// available for topic or when the underlying pull fails.
func (c *defaultPullConsumer) Pull(ctx context.Context, topic string, selector MessageSelector, numbers int) (*primitive.PullResult, error) {
	// Earlier versions ignored ctx altogether, so a nil ctx used to work;
	// keep accepting it rather than handing nil down to c.pull.
	if ctx == nil {
		ctx = context.Background()
	}

	mq := c.getNextQueueOf(topic)
	if mq == nil {
		return nil, fmt.Errorf("prepard to pull topic: %s, but no queue is founded", topic)
	}

	data := buildSubscriptionData(mq.Topic, selector)
	result, err := c.pull(ctx, mq, data, c.nextOffsetOf(mq), numbers)
	if err != nil {
		return nil, err
	}

	c.processPullResult(mq, result, data)
	return result, nil
}

  • Pull方法先通过c.getNextQueueOf(topic)获取mq,然后通过buildSubscriptionData(mq.Topic, selector)构造data,之后执行c.pull,最后执行c.processPullResult(mq, result, data)

getNextQueueOf

rocketmq-client-go-v2.0.0/consumer/pull_consumer.go

// getNextQueueOf returns the next message queue of topic in round-robin
// order, or nil when the queues cannot be fetched or the topic has none.
//
// The index of the queue handed out last is kept per topic in
// queueCounterTable as an int64. The Load/Store pair is not atomic across
// goroutines, so concurrent callers may occasionally be handed the same
// queue.
func (c *defaultPullConsumer) getNextQueueOf(topic string) *primitive.MessageQueue {
	queues, err := c.defaultConsumer.namesrv.FetchSubscribeMessageQueues(topic)
	if err != nil {
		rlog.Error("get next mq error", map[string]interface{}{
			rlog.LogKeyTopic:         topic,
			rlog.LogKeyUnderlayError: err.Error(),
		})
		return nil
	}
	// Without this guard the modulo below would divide by zero.
	if len(queues) == 0 {
		return nil
	}

	// First request for a topic starts at queue 0; afterwards advance from the
	// stored index. Anything that is not a non-negative int64 restarts at 0.
	var next int64
	if v, ok := queueCounterTable.Load(topic); ok {
		if last, isInt64 := v.(int64); isInt64 && last >= 0 {
			// Wrapping here keeps the stored value bounded and stays valid
			// when the number of queues changes between calls.
			next = (last + 1) % int64(len(queues))
		}
	}
	// Always store an int64 so the type assertion above matches next time.
	queueCounterTable.Store(topic, next)

	return queues[int(next)]
}

  • getNextQueueOf方法先通过c.defaultConsumer.namesrv.FetchSubscribeMessageQueues(topic)获取queues,然后执行queueCounterTable.Load(topic)获取index

PullFrom

rocketmq-client-go-v2.0.0/consumer/pull_consumer.go

// PullFrom pulls messages of queue from offset to offset + numbers. The
// arguments are validated by checkPull first; the pull itself uses
// subscription data built from the queue's topic and an empty
// MessageSelector.
func (c *defaultPullConsumer) PullFrom(ctx context.Context, queue *primitive.MessageQueue, offset int64, numbers int) (*primitive.PullResult, error) {
	err := c.checkPull(ctx, queue, offset, numbers)
	if err != nil {
		return nil, err
	}

	subData := buildSubscriptionData(queue.Topic, MessageSelector{})
	return c.pull(ctx, queue, subData, offset, numbers)
}

  • PullFrom方法先执行c.checkPull,之后通过buildSubscriptionData构造subscriptionData,最后通过c.pull(ctx, queue, data, offset, numbers)拉取数据

UpdateOffset

rocketmq-client-go-v2.0.0/consumer/pull_consumer.go

// UpdateOffset updates the in-memory offset of queue by delegating to
// c.updateOffset and returns that call's error.

func (c *defaultPullConsumer) UpdateOffset(queue *primitive.MessageQueue, offset int64) error {

return c.updateOffset(queue, offset)

}

  • UpdateOffset方法委托给c.updateOffset(queue, offset)来更新内存中该queue的offset

PersistOffset

rocketmq-client-go-v2.0.0/consumer/pull_consumer.go

// PersistOffset persists all in-memory offsets by delegating to
// c.persistConsumerOffset and returns that call's error.
// NOTE(review): ctx is accepted but not used by this method.

func (c *defaultPullConsumer) PersistOffset(ctx context.Context) error {

return c.persistConsumerOffset()

}

  • PersistOffset方法委托给c.persistConsumerOffset()来持久化offset,传入的ctx并未被使用

CurrentOffset

rocketmq-client-go-v2.0.0/consumer/pull_consumer.go

// CurrentOffset reports the current in-memory offset of queue as returned by
// c.queryOffset. The error result is always nil.
func (c *defaultPullConsumer) CurrentOffset(queue *primitive.MessageQueue) (int64, error) {
	return c.queryOffset(queue), nil
}

  • CurrentOffset方法执行c.queryOffset(queue)

Shutdown

rocketmq-client-go-v2.0.0/consumer/pull_consumer.go

// Shutdown closes the embedded defaultConsumer by calling its shutdown method
// and returns that call's error. Per the original comment, new requests are
// refused afterwards.

func (c *defaultPullConsumer) Shutdown() error {

return c.defaultConsumer.shutdown()

}

  • Shutdown方法则执行c.defaultConsumer.shutdown()

小结

PullConsumer定义了Start、Shutdown、Pull、PullFrom、UpdateOffset、PersistOffset、CurrentOffset方法

doc

  • pull_consumer

以上是 聊聊rocketmq-client-go的PullConsumer 的全部内容, 来源链接: utcz.com/z/518194.html

回到顶部