A Look at rocketmq-client-go's pushConsumer

This article takes a look at the pushConsumer in rocketmq-client-go.

pushConsumer

rocketmq-client-go-v2.0.0/consumer/push_consumer.go

type pushConsumer struct {
    *defaultConsumer
    queueFlowControlTimes        int
    queueMaxSpanFlowControlTimes int
    consumeFunc                  utils.Set
    submitToConsume              func(*processQueue, *primitive.MessageQueue)
    subscribedTopic              map[string]string
    interceptor                  primitive.Interceptor
    queueLock                    *QueueLock
    done                         chan struct{}
    closeOnce                    sync.Once
}

  • pushConsumer embeds defaultConsumer and adds the queueFlowControlTimes, queueMaxSpanFlowControlTimes, consumeFunc, submitToConsume, subscribedTopic, interceptor, queueLock, done, and closeOnce fields; submitToConsume is a function-valued field holding the selected consume routine (see the sketch below)
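
submitToConsume is the interesting field here: it is a function-valued slot that holds whichever consume routine was chosen at construction time, so the pull loop can call it without branching. A minimal standalone sketch of this dispatch pattern (the types here are simplified stand-ins, not the library's real ones):

package main

import "fmt"

// Simplified stand-ins for the library's processQueue / MessageQueue types.
type processQueue struct{}
type messageQueue struct{ QueueId int }

// sketchConsumer mirrors pushConsumer's idea: submitToConsume holds
// whichever consume routine was selected when the consumer was built.
type sketchConsumer struct {
    submitToConsume func(*processQueue, *messageQueue)
}

func newSketchConsumer(orderly bool) *sketchConsumer {
    c := &sketchConsumer{}
    if orderly {
        c.submitToConsume = c.consumeOrderly
    } else {
        c.submitToConsume = c.consumeConcurrently
    }
    return c
}

func (c *sketchConsumer) consumeOrderly(pq *processQueue, mq *messageQueue) {
    fmt.Println("orderly consume, queue", mq.QueueId)
}

func (c *sketchConsumer) consumeConcurrently(pq *processQueue, mq *messageQueue) {
    fmt.Println("concurrent consume, queue", mq.QueueId)
}

func main() {
    c := newSketchConsumer(true)
    c.submitToConsume(&processQueue{}, &messageQueue{QueueId: 0}) // orderly consume, queue 0
}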

NewPushConsumer

rocketmq-client-go-v2.0.0/consumer/push_consumer.go

func NewPushConsumer(opts ...Option) (*pushConsumer, error) {
    defaultOpts := defaultPushConsumerOptions()
    for _, apply := range opts {
        apply(&defaultOpts)
    }
    srvs, err := internal.NewNamesrv(defaultOpts.NameServerAddrs)
    if err != nil {
        return nil, errors.Wrap(err, "new Namesrv failed.")
    }
    if !defaultOpts.Credentials.IsEmpty() {
        srvs.SetCredentials(defaultOpts.Credentials)
    }
    defaultOpts.Namesrv = srvs
    if defaultOpts.Namespace != "" {
        defaultOpts.GroupName = defaultOpts.Namespace + "%" + defaultOpts.GroupName
    }
    dc := &defaultConsumer{
        client:         internal.GetOrNewRocketMQClient(defaultOpts.ClientOptions, nil),
        consumerGroup:  defaultOpts.GroupName,
        cType:          _PushConsume,
        state:          int32(internal.StateCreateJust),
        prCh:           make(chan PullRequest, 4),
        model:          defaultOpts.ConsumerModel,
        consumeOrderly: defaultOpts.ConsumeOrderly,
        fromWhere:      defaultOpts.FromWhere,
        allocate:       defaultOpts.Strategy,
        option:         defaultOpts,
        namesrv:        srvs,
    }
    p := &pushConsumer{
        defaultConsumer: dc,
        subscribedTopic: make(map[string]string, 0),
        queueLock:       newQueueLock(),
        done:            make(chan struct{}, 1),
        consumeFunc:     utils.NewSet(),
    }
    dc.mqChanged = p.messageQueueChanged
    if p.consumeOrderly {
        p.submitToConsume = p.consumeMessageOrderly
    } else {
        p.submitToConsume = p.consumeMessageCurrently
    }
    p.interceptor = primitive.ChainInterceptors(p.option.Interceptors...)

    return p, nil
}

  • NewPushConsumer applies the supplied options over the defaults, creates the name-server client, and instantiates a defaultConsumer and a pushConsumer; depending on consumeOrderly it points submitToConsume at either consumeMessageOrderly or consumeMessageCurrently (a construction sketch follows below)
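
To see how these options flow into NewPushConsumer, here is a minimal construction sketch; the group name and name-server address are placeholders, and the With* option names are taken from the v2.0.0 consumer package:

package main

import (
    "fmt"

    "github.com/apache/rocketmq-client-go/v2/consumer"
)

func main() {
    // Each With* option mutates defaultPushConsumerOptions() before the
    // defaultConsumer and pushConsumer are built. The address below is a
    // placeholder for a reachable name server.
    c, err := consumer.NewPushConsumer(
        consumer.WithGroupName("demo-group"),
        consumer.WithNameServer([]string{"127.0.0.1:9876"}),
        consumer.WithConsumerModel(consumer.Clustering),
    )
    if err != nil {
        panic(err)
    }
    fmt.Printf("created push consumer: %T\n", c)
}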

Start

rocketmq-client-go-v2.0.0/consumer/push_consumer.go

func (pc *pushConsumer) Start() error {
    var err error
    pc.once.Do(func() {
        rlog.Info("the consumer start beginning", map[string]interface{}{
            rlog.LogKeyConsumerGroup: pc.consumerGroup,
            "messageModel":           pc.model,
            "unitMode":               pc.unitMode,
        })
        atomic.StoreInt32(&pc.state, int32(internal.StateStartFailed))
        pc.validate()

        err = pc.client.RegisterConsumer(pc.consumerGroup, pc)
        if err != nil {
            rlog.Error("the consumer group has been created, specify another one", map[string]interface{}{
                rlog.LogKeyConsumerGroup: pc.consumerGroup,
            })
            err = ErrCreated
            return
        }

        err = pc.defaultConsumer.start()
        if err != nil {
            return
        }

        go func() {
            // todo start clean msg expired
            for {
                select {
                case pr := <-pc.prCh:
                    go func() {
                        pc.pullMessage(&pr)
                    }()
                case <-pc.done:
                    rlog.Info("push consumer close pullConsumer listener.", map[string]interface{}{
                        rlog.LogKeyConsumerGroup: pc.consumerGroup,
                    })
                    return
                }
            }
        }()

        go primitive.WithRecover(func() {
            // initial lock.
            if !pc.consumeOrderly {
                return
            }

            time.Sleep(1000 * time.Millisecond)
            pc.lockAll()

            lockTicker := time.NewTicker(pc.option.RebalanceLockInterval)
            defer lockTicker.Stop()
            for {
                select {
                case <-lockTicker.C:
                    pc.lockAll()
                case <-pc.done:
                    rlog.Info("push consumer close tick.", map[string]interface{}{
                        rlog.LogKeyConsumerGroup: pc.consumerGroup,
                    })
                    return
                }
            }
        })
    })

    if err != nil {
        return err
    }

    pc.client.UpdateTopicRouteInfo()
    for k := range pc.subscribedTopic {
        _, exist := pc.topicSubscribeInfoTable.Load(k)
        if !exist {
            pc.client.Shutdown()
            return fmt.Errorf("the topic=%s route info not found, it may not exist", k)
        }
    }
    pc.client.CheckClientInBroker()
    pc.client.SendHeartbeatToAllBrokerWithLock()
    pc.client.RebalanceImmediately()

    return err
}

  • Start executes pc.client.RegisterConsumer and pc.defaultConsumer.start(), then launches a goroutine that receives PullRequests from pc.prCh and asynchronously executes pc.pullMessage(&pr); for orderly consumers (consumeOrderly), a second goroutine creates a lockTicker via time.NewTicker(pc.option.RebalanceLockInterval) and periodically executes pc.lockAll(); afterwards it runs pc.client.UpdateTopicRouteInfo(), verifies route info exists for every subscribed topic, then calls pc.client.CheckClientInBroker(), pc.client.SendHeartbeatToAllBrokerWithLock(), and pc.client.RebalanceImmediately() (see the orderly-consumption sketch below)
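
Since the lock ticker above only runs for orderly consumers, enabling it comes down to the consume-order option. A hedged sketch (WithConsumerOrder and WithRebalanceLockInterval as named in v2.0.0's consumer options; verify against your version, and the 20s interval is only illustrative):

package main

import (
    "time"

    "github.com/apache/rocketmq-client-go/v2/consumer"
)

func main() {
    // With orderly consumption on, Start's second goroutine runs lockAll()
    // once after a 1s delay and then on every RebalanceLockInterval tick.
    c, err := consumer.NewPushConsumer(
        consumer.WithGroupName("ordered-group"),            // placeholder group
        consumer.WithNameServer([]string{"127.0.0.1:9876"}), // placeholder addr
        consumer.WithConsumerOrder(true),                   // selects consumeMessageOrderly
        consumer.WithRebalanceLockInterval(20*time.Second), // lockTicker period
    )
    if err != nil {
        panic(err)
    }
    _ = c // Subscribe and Start would follow here
}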

Shutdown

rocketmq-client-go-v2.0.0/consumer/push_consumer.go

func (pc *pushConsumer) Shutdown() error {
    var err error
    pc.closeOnce.Do(func() {
        close(pc.done)
        pc.client.UnregisterConsumer(pc.consumerGroup)
        err = pc.defaultConsumer.shutdown()
    })

    return err
}

  • Shutdown closes the done channel exactly once via closeOnce, then executes pc.client.UnregisterConsumer and pc.defaultConsumer.shutdown() (a graceful-shutdown sketch follows below)
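
A common pattern is to tie Shutdown to process signals; closeOnce makes it safe even if Shutdown races or repeats. A small runnable sketch (the noopConsumer stand-in is hypothetical, used only so the snippet runs without a broker):

package main

import (
    "fmt"
    "os"
    "os/signal"
    "syscall"
)

// shutdowner matches pushConsumer's Shutdown signature; an interface keeps
// the sketch compilable without a broker connection.
type shutdowner interface {
    Shutdown() error
}

// waitAndShutdown blocks until SIGINT/SIGTERM, then shuts the consumer down;
// closeOnce inside Shutdown makes repeated calls safe.
func waitAndShutdown(c shutdowner) error {
    sig := make(chan os.Signal, 1)
    signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
    <-sig
    return c.Shutdown()
}

// noopConsumer is a stand-in so the sketch runs without RocketMQ.
type noopConsumer struct{}

func (noopConsumer) Shutdown() error { return nil }

func main() {
    fmt.Println("press Ctrl+C to shut down")
    if err := waitAndShutdown(noopConsumer{}); err != nil {
        fmt.Println("shutdown error:", err)
    }
}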

Subscribe

rocketmq-client-go-v2.0.0/consumer/push_consumer.go

func (pc *pushConsumer) Subscribe(topic string, selector MessageSelector,
    f func(context.Context, ...*primitive.MessageExt) (ConsumeResult, error)) error {
    if atomic.LoadInt32(&pc.state) != int32(internal.StateCreateJust) {
        return errors.New("subscribe topic only started before")
    }
    if pc.option.Namespace != "" {
        topic = pc.option.Namespace + "%" + topic
    }
    data := buildSubscriptionData(topic, selector)
    pc.subscriptionDataTable.Store(topic, data)
    pc.subscribedTopic[topic] = ""

    pc.consumeFunc.Add(&PushConsumerCallback{
        f:     f,
        topic: topic,
    })
    return nil
}

  • Subscribe only works before Start (the state must still be internal.StateCreateJust); it builds subscription data via buildSubscriptionData, then executes pc.subscriptionDataTable.Store(topic, data) and registers the callback via pc.consumeFunc.Add (see the usage sketch below)
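
A typical Subscribe-before-Start flow looks like the following sketch; the topic, tag expression, group name, and name-server address are all placeholders:

package main

import (
    "context"
    "fmt"
    "time"

    "github.com/apache/rocketmq-client-go/v2/consumer"
    "github.com/apache/rocketmq-client-go/v2/primitive"
)

func main() {
    c, err := consumer.NewPushConsumer(
        consumer.WithGroupName("demo-group"),                // placeholder group
        consumer.WithNameServer([]string{"127.0.0.1:9876"}), // placeholder addr
    )
    if err != nil {
        panic(err)
    }
    // Subscribe must run while the consumer is still in StateCreateJust,
    // i.e. before Start; the callback lands in consumeFunc keyed by topic.
    err = c.Subscribe("demo-topic",
        consumer.MessageSelector{Type: consumer.TAG, Expression: "tagA || tagB"},
        func(ctx context.Context, msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
            for _, msg := range msgs {
                fmt.Printf("got message: %s\n", msg.Body)
            }
            return consumer.ConsumeSuccess, nil
        })
    if err != nil {
        panic(err)
    }
    if err := c.Start(); err != nil {
        panic(err)
    }
    time.Sleep(time.Minute) // keep the process alive while messages arrive
    _ = c.Shutdown()
}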

pullMessage

rocketmq-client-go-v2.0.0/consumer/push_consumer.go

func (pc *pushConsumer) pullMessage(request *PullRequest) {
    rlog.Debug("start a new Pull Message task for PullRequest", map[string]interface{}{
        rlog.LogKeyPullRequest: request.String(),
    })
    var sleepTime time.Duration
    pq := request.pq
    go primitive.WithRecover(func() {
        for {
            select {
            case <-pc.done:
                rlog.Info("push consumer close pullMessage.", map[string]interface{}{
                    rlog.LogKeyConsumerGroup: pc.consumerGroup,
                })
                return
            default:
                pc.submitToConsume(request.pq, request.mq)
            }
        }
    })

    for {
    NEXT:
        select {
        case <-pc.done:
            rlog.Info("push consumer close message handle.", map[string]interface{}{
                rlog.LogKeyConsumerGroup: pc.consumerGroup,
            })
            return
        default:
        }

        if pq.IsDroppd() {
            rlog.Debug("the request was dropped, so stop task", map[string]interface{}{
                rlog.LogKeyPullRequest: request.String(),
            })
            return
        }
        if sleepTime > 0 {
            rlog.Debug(fmt.Sprintf("pull MessageQueue: %d sleep %d ms for mq: %v", request.mq.QueueId, sleepTime/time.Millisecond, request.mq), nil)
            time.Sleep(sleepTime)
        }
        // reset time
        sleepTime = pc.option.PullInterval

        pq.lastPullTime = time.Now()
        err := pc.makeSureStateOK()
        if err != nil {
            rlog.Warning("consumer state error", map[string]interface{}{
                rlog.LogKeyUnderlayError: err.Error(),
            })
            sleepTime = _PullDelayTimeWhenError
            goto NEXT
        }

        if pc.pause {
            rlog.Debug(fmt.Sprintf("consumer [%s] of [%s] was paused, execute pull request [%s] later",
                pc.option.InstanceName, pc.consumerGroup, request.String()), nil)
            sleepTime = _PullDelayTimeWhenSuspend
            goto NEXT
        }

        cachedMessageSizeInMiB := int(pq.cachedMsgSize / Mb)
        if pq.cachedMsgCount > pc.option.PullThresholdForQueue {
            if pc.queueFlowControlTimes%1000 == 0 {
                rlog.Warning("the cached message count exceeds the threshold, so do flow control", map[string]interface{}{
                    "PullThresholdForQueue": pc.option.PullThresholdForQueue,
                    "minOffset":             pq.Min(),
                    "maxOffset":             pq.Max(),
                    "count":                 pq.msgCache,
                    "size(MiB)":             cachedMessageSizeInMiB,
                    "flowControlTimes":      pc.queueFlowControlTimes,
                    rlog.LogKeyPullRequest:  request.String(),
                })
            }
            pc.queueFlowControlTimes++
            sleepTime = _PullDelayTimeWhenFlowControl
            goto NEXT
        }

        if cachedMessageSizeInMiB > pc.option.PullThresholdSizeForQueue {
            if pc.queueFlowControlTimes%1000 == 0 {
                rlog.Warning("the cached message size exceeds the threshold, so do flow control", map[string]interface{}{
                    "PullThresholdSizeForQueue": pc.option.PullThresholdSizeForQueue,
                    "minOffset":                 pq.Min(),
                    "maxOffset":                 pq.Max(),
                    "count":                     pq.msgCache,
                    "size(MiB)":                 cachedMessageSizeInMiB,
                    "flowControlTimes":          pc.queueFlowControlTimes,
                    rlog.LogKeyPullRequest:      request.String(),
                })
            }
            pc.queueFlowControlTimes++
            sleepTime = _PullDelayTimeWhenFlowControl
            goto NEXT
        }

        if !pc.consumeOrderly {
            if pq.getMaxSpan() > pc.option.ConsumeConcurrentlyMaxSpan {
                if pc.queueMaxSpanFlowControlTimes%1000 == 0 {
                    rlog.Warning("the queue's messages span too long, so do flow control", map[string]interface{}{
                        "ConsumeConcurrentlyMaxSpan": pc.option.ConsumeConcurrentlyMaxSpan,
                        "minOffset":                  pq.Min(),
                        "maxOffset":                  pq.Max(),
                        "maxSpan":                    pq.getMaxSpan(),
                        "flowControlTimes":           pc.queueFlowControlTimes,
                        rlog.LogKeyPullRequest:       request.String(),
                    })
                }
                sleepTime = _PullDelayTimeWhenFlowControl
                goto NEXT
            }
        } else {
            if pq.IsLock() {
                if !request.lockedFirst {
                    offset := pc.computePullFromWhere(request.mq)
                    brokerBusy := offset < request.nextOffset
                    rlog.Info("the first time to pull message, so fix offset from broker, offset maybe changed", map[string]interface{}{
                        rlog.LogKeyPullRequest:      request.String(),
                        rlog.LogKeyValueChangedFrom: request.nextOffset,
                        rlog.LogKeyValueChangedTo:   offset,
                        "brokerBusy":                brokerBusy,
                    })
                    if brokerBusy {
                        rlog.Info("[NOTIFY_ME] the first time to pull message, but pull request offset larger than "+
                            "broker consume offset", map[string]interface{}{"offset": offset})
                    }
                    request.lockedFirst = true
                    request.nextOffset = offset
                }
            } else {
                rlog.Info("pull message later because not locked in broker", map[string]interface{}{
                    rlog.LogKeyPullRequest: request.String(),
                })
                sleepTime = _PullDelayTimeWhenError
                goto NEXT
            }
        }

        v, exist := pc.subscriptionDataTable.Load(request.mq.Topic)
        if !exist {
            rlog.Info("find the consumer's subscription failed", map[string]interface{}{
                rlog.LogKeyPullRequest: request.String(),
            })
            sleepTime = _PullDelayTimeWhenError
            goto NEXT
        }
        beginTime := time.Now()
        var (
            commitOffsetEnable bool
            commitOffsetValue  int64
            subExpression      string
        )

        if pc.model == Clustering {
            commitOffsetValue = pc.storage.read(request.mq, _ReadFromMemory)
            if commitOffsetValue > 0 {
                commitOffsetEnable = true
            }
        }

        sd := v.(*internal.SubscriptionData)
        classFilter := sd.ClassFilterMode
        if pc.option.PostSubscriptionWhenPull && classFilter {
            subExpression = sd.SubString
        }

        sysFlag := buildSysFlag(commitOffsetEnable, true, subExpression != "", classFilter)

        pullRequest := &internal.PullMessageRequestHeader{
            ConsumerGroup:        pc.consumerGroup,
            Topic:                request.mq.Topic,
            QueueId:              int32(request.mq.QueueId),
            QueueOffset:          request.nextOffset,
            MaxMsgNums:           pc.option.PullBatchSize,
            SysFlag:              sysFlag,
            CommitOffset:         commitOffsetValue,
            SubExpression:        _SubAll,
            ExpressionType:       string(TAG),
            SuspendTimeoutMillis: 20 * time.Second,
        }
        //
        //if data.ExpType == string(TAG) {
        //	pullRequest.SubVersion = 0
        //} else {
        //	pullRequest.SubVersion = data.SubVersion
        //}

        brokerResult := pc.defaultConsumer.tryFindBroker(request.mq)
        if brokerResult == nil {
            rlog.Warning("no broker found for mq", map[string]interface{}{
                rlog.LogKeyPullRequest: request.mq.String(),
            })
            sleepTime = _PullDelayTimeWhenError
            goto NEXT
        }

        if brokerResult.Slave {
            pullRequest.SysFlag = clearCommitOffsetFlag(pullRequest.SysFlag)
        }

        result, err := pc.client.PullMessage(context.Background(), brokerResult.BrokerAddr, pullRequest)
        if err != nil {
            rlog.Warning("pull message from broker error", map[string]interface{}{
                rlog.LogKeyBroker:        brokerResult.BrokerAddr,
                rlog.LogKeyUnderlayError: err.Error(),
            })
            sleepTime = _PullDelayTimeWhenError
            goto NEXT
        }

        if result.Status == primitive.PullBrokerTimeout {
            rlog.Warning("pull broker timeout", map[string]interface{}{
                rlog.LogKeyBroker: brokerResult.BrokerAddr,
            })
            sleepTime = _PullDelayTimeWhenError
            goto NEXT
        }

        switch result.Status {
        case primitive.PullFound:
            rlog.Debug(fmt.Sprintf("Topic: %s, QueueId: %d found messages.", request.mq.Topic, request.mq.QueueId), nil)
            prevRequestOffset := request.nextOffset
            request.nextOffset = result.NextBeginOffset

            rt := time.Now().Sub(beginTime) / time.Millisecond
            increasePullRT(pc.consumerGroup, request.mq.Topic, int64(rt))

            pc.processPullResult(request.mq, result, sd)

            msgFounded := result.GetMessageExts()
            firstMsgOffset := int64(math.MaxInt64)
            if msgFounded != nil && len(msgFounded) != 0 {
                firstMsgOffset = msgFounded[0].QueueOffset
                increasePullTPS(pc.consumerGroup, request.mq.Topic, len(msgFounded))
                pq.putMessage(msgFounded...)
            }
            if result.NextBeginOffset < prevRequestOffset || firstMsgOffset < prevRequestOffset {
                rlog.Warning("[BUG] pull message result maybe data wrong", map[string]interface{}{
                    "nextBeginOffset":   result.NextBeginOffset,
                    "firstMsgOffset":    firstMsgOffset,
                    "prevRequestOffset": prevRequestOffset,
                })
            }
        case primitive.PullNoNewMsg:
            rlog.Debug(fmt.Sprintf("Topic: %s, QueueId: %d no more msg, current offset: %d, next offset: %d",
                request.mq.Topic, request.mq.QueueId, pullRequest.QueueOffset, result.NextBeginOffset), nil)
        case primitive.PullNoMsgMatched:
            request.nextOffset = result.NextBeginOffset
            pc.correctTagsOffset(request)
        case primitive.PullOffsetIllegal:
            rlog.Warning("the pull request offset illegal", map[string]interface{}{
                rlog.LogKeyPullRequest: request.String(),
                "result":               result.String(),
            })
            request.nextOffset = result.NextBeginOffset
            pq.WithDropped(true)
            time.Sleep(10 * time.Second)
            pc.storage.update(request.mq, request.nextOffset, false)
            pc.storage.persist([]*primitive.MessageQueue{request.mq})
            pc.processQueueTable.Delete(request.mq)
            rlog.Warning(fmt.Sprintf("fix the pull request offset: %s", request.String()), nil)
        default:
            rlog.Warning(fmt.Sprintf("unknown pull status: %v", result.Status), nil)
            sleepTime = _PullDelayTimeWhenError
        }
    }
}

  • pullMessage first spawns a goroutine that repeatedly calls pc.submitToConsume(request.pq, request.mq); the pull loop applies flow control (cached message count, cached size, and, for concurrent consumers, offset span), builds an internal.PullMessageRequestHeader, resolves the broker via pc.defaultConsumer.tryFindBroker, and pulls via pc.client.PullMessage; when result.Status is primitive.PullFound it runs pc.processPullResult and pq.putMessage to hand the messages to the processQueue. submitToConsume is consumeMessageOrderly when consumeOrderly is set, otherwise consumeMessageCurrently; both eventually call pc.consumeInner (the flow-control checks are summarized in the sketch below)
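
The three flow-control gates in the loop can be read as a pure decision function. The following simplified, self-contained sketch mirrors that logic; the field and threshold names are modeled on the snippet above, not on the library's API:

package main

import "fmt"

// queueStats carries the per-queue numbers the pull loop inspects.
type queueStats struct {
    cachedMsgCount int64 // messages buffered in the processQueue
    cachedMsgSize  int64 // bytes buffered in the processQueue
    maxSpan        int64 // maxOffset - minOffset of buffered messages
}

// thresholds mirrors PullThresholdForQueue / PullThresholdSizeForQueue /
// ConsumeConcurrentlyMaxSpan from the consumer options.
type thresholds struct {
    maxCount   int64
    maxSizeMiB int
    maxSpan    int64
}

// shouldDelayPull reports whether the next pull must back off
// (_PullDelayTimeWhenFlowControl in the real code) and why.
func shouldDelayPull(s queueStats, t thresholds, orderly bool) (bool, string) {
    if s.cachedMsgCount > t.maxCount {
        return true, "cached message count exceeds the threshold"
    }
    if int(s.cachedMsgSize/(1024*1024)) > t.maxSizeMiB {
        return true, "cached message size exceeds the threshold"
    }
    if !orderly && s.maxSpan > t.maxSpan {
        return true, "offset span too long for concurrent consume"
    }
    return false, ""
}

func main() {
    s := queueStats{cachedMsgCount: 1200, cachedMsgSize: 10 << 20, maxSpan: 500}
    t := thresholds{maxCount: 1024, maxSizeMiB: 100, maxSpan: 2048}
    delay, why := shouldDelayPull(s, t, false)
    fmt.Println(delay, why) // true cached message count exceeds the threshold
}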

consumeInner

rocketmq-client-go-v2.0.0/consumer/push_consumer.go

func (pc *pushConsumer) consumeInner(ctx context.Context, subMsgs []*primitive.MessageExt) (ConsumeResult, error) {
    if len(subMsgs) == 0 {
        return ConsumeRetryLater, errors.New("msg list empty")
    }

    f, exist := pc.consumeFunc.Contains(subMsgs[0].Topic)

    // fix lost retry message
    if !exist && strings.HasPrefix(subMsgs[0].Topic, internal.RetryGroupTopicPrefix) {
        f, exist = pc.consumeFunc.Contains(subMsgs[0].GetProperty(primitive.PropertyRetryTopic))
    }

    if !exist {
        return ConsumeRetryLater, fmt.Errorf("the consume callback missing for topic: %s", subMsgs[0].Topic)
    }

    callback, ok := f.(*PushConsumerCallback)
    if !ok {
        return ConsumeRetryLater, fmt.Errorf("the consume callback assert failed for topic: %s", subMsgs[0].Topic)
    }
    if pc.interceptor == nil {
        return callback.f(ctx, subMsgs...)
    } else {
        var container ConsumeResultHolder
        err := pc.interceptor(ctx, subMsgs, &container, func(ctx context.Context, req, reply interface{}) error {
            msgs := req.([]*primitive.MessageExt)
            r, e := callback.f(ctx, msgs...)

            realReply := reply.(*ConsumeResultHolder)
            realReply.ConsumeResult = r

            msgCtx, _ := primitive.GetConsumerCtx(ctx)
            msgCtx.Success = realReply.ConsumeResult == ConsumeSuccess
            if realReply.ConsumeResult == ConsumeSuccess {
                msgCtx.Properties[primitive.PropCtxType] = string(primitive.SuccessReturn)
            } else {
                msgCtx.Properties[primitive.PropCtxType] = string(primitive.FailedReturn)
            }
            return e
        })
        return container.ConsumeResult, err
    }
}

  • consumeInner looks up the registered PushConsumerCallback for the first message's topic (falling back to the retry topic for retry messages) and invokes it; when an interceptor chain is configured, the callback runs through pc.interceptor and the result is recorded in a ConsumeResultHolder (an interceptor sketch follows below)
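
Because consumeInner routes the callback through pc.interceptor when one is configured, a custom interceptor can observe every consumed batch. A hedged sketch (WithInterceptor and primitive.Invoker as defined in v2.0.0; verify against your version, and the timing logic is only illustrative):

package main

import (
    "context"
    "log"
    "time"

    "github.com/apache/rocketmq-client-go/v2/consumer"
    "github.com/apache/rocketmq-client-go/v2/primitive"
)

// timingInterceptor logs how long the downstream consume callback took.
// consumeInner passes the message batch as req and a *ConsumeResultHolder
// as reply; next runs the actual PushConsumerCallback.
func timingInterceptor(ctx context.Context, req, reply interface{}, next primitive.Invoker) error {
    msgs := req.([]*primitive.MessageExt)
    start := time.Now()
    err := next(ctx, req, reply)
    log.Printf("consumed %d message(s) in %s", len(msgs), time.Since(start))
    return err
}

func main() {
    c, err := consumer.NewPushConsumer(
        consumer.WithGroupName("demo-group"),                // placeholder group
        consumer.WithNameServer([]string{"127.0.0.1:9876"}), // placeholder addr
        consumer.WithInterceptor(timingInterceptor),
    )
    if err != nil {
        panic(err)
    }
    _ = c // Subscribe and Start would follow here
}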

Summary

pushConsumer is a push-style wrapper over the pull model: once messages are pulled, consumeMessageOrderly is used when consumeOrderly is set, otherwise consumeMessageCurrently; both internally call consumeInner, which triggers the PushConsumerCallback registered via Subscribe.

doc

  • push_consumer
