聊聊rocketmqclientgo的remoteBrokerOffsetStore

编程

本文主要研究一下rocketmq-client-go的remoteBrokerOffsetStore

remoteBrokerOffsetStore

rocketmq-client-go-v2.0.0/consumer/offset_store.go

type remoteBrokerOffsetStore struct {

group string

OffsetTable map[primitive.MessageQueue]int64 `json:"OffsetTable"`

client internal.RMQClient

namesrv internal.Namesrvs

mutex sync.RWMutex

}

  • remoteBrokerOffsetStore定义了group、OffsetTable、client、namesrv、mutex属性

NewRemoteOffsetStore

rocketmq-client-go-v2.0.0/consumer/offset_store.go

func NewRemoteOffsetStore(group string, client internal.RMQClient, namesrv internal.Namesrvs) OffsetStore {

return &remoteBrokerOffsetStore{

group: group,

client: client,

namesrv: namesrv,

OffsetTable: make(map[primitive.MessageQueue]int64),

}

}

  • NewRemoteOffsetStore方法实例化remoteBrokerOffsetStore

persist

rocketmq-client-go-v2.0.0/consumer/offset_store.go

func (r *remoteBrokerOffsetStore) persist(mqs []*primitive.MessageQueue) {

r.mutex.Lock()

defer r.mutex.Unlock()

if len(mqs) == 0 {

return

}

used := make(map[primitive.MessageQueue]struct{}, 0)

for _, mq := range mqs {

used[*mq] = struct{}{}

}

for mq, off := range r.OffsetTable {

if _, ok := used[mq]; !ok {

delete(r.OffsetTable, mq)

continue

}

err := r.updateConsumeOffsetToBroker(r.group, mq, off)

if err != nil {

rlog.Warning("update offset to broker error", map[string]interface{}{

rlog.LogKeyConsumerGroup: r.group,

rlog.LogKeyMessageQueue: mq.String(),

rlog.LogKeyUnderlayError: err.Error(),

"offset": off,

})

} else {

rlog.Info("update offset to broker success", map[string]interface{}{

rlog.LogKeyConsumerGroup: r.group,

rlog.LogKeyMessageQueue: mq.String(),

"offset": off,

})

}

}

}

  • persist方法遍历OffsetTable,执行r.updateConsumeOffsetToBroker

remove

rocketmq-client-go-v2.0.0/consumer/offset_store.go

func (r *remoteBrokerOffsetStore) remove(mq *primitive.MessageQueue) {

r.mutex.Lock()

defer r.mutex.Unlock()

delete(r.OffsetTable, *mq)

rlog.Info("delete mq from offset table", map[string]interface{}{

rlog.LogKeyMessageQueue: mq,

})

}

  • remove方法执行delete(r.OffsetTable, *mq)

read

rocketmq-client-go-v2.0.0/consumer/offset_store.go

func (r *remoteBrokerOffsetStore) read(mq *primitive.MessageQueue, t readType) int64 {

r.mutex.RLock()

switch t {

case _ReadFromMemory, _ReadMemoryThenStore:

off, exist := r.OffsetTable[*mq]

if exist {

r.mutex.RUnlock()

return off

}

if t == _ReadFromMemory {

r.mutex.RUnlock()

return -1

}

fallthrough

case _ReadFromStore:

off, err := r.fetchConsumeOffsetFromBroker(r.group, mq)

if err != nil {

rlog.Error("fecth offset of mq error", map[string]interface{}{

rlog.LogKeyMessageQueue: mq.String(),

rlog.LogKeyUnderlayError: err,

})

r.mutex.RUnlock()

return -1

}

r.mutex.RUnlock()

r.update(mq, off, true)

return off

default:

}

return -1

}

  • read方法针对_ReadFromStore会执行r.fetchConsumeOffsetFromBroker

update

rocketmq-client-go-v2.0.0/consumer/offset_store.go

func (r *remoteBrokerOffsetStore) update(mq *primitive.MessageQueue, offset int64, increaseOnly bool) {

r.mutex.Lock()

defer r.mutex.Unlock()

localOffset, exist := r.OffsetTable[*mq]

if !exist {

r.OffsetTable[*mq] = offset

return

}

if increaseOnly {

if localOffset < offset {

r.OffsetTable[*mq] = offset

}

} else {

r.OffsetTable[*mq] = offset

}

}

  • update方法更新的是r.OffsetTable[*mq]

fetchConsumeOffsetFromBroker

rocketmq-client-go-v2.0.0/consumer/offset_store.go

func (r *remoteBrokerOffsetStore) fetchConsumeOffsetFromBroker(group string, mq *primitive.MessageQueue) (int64, error) {

broker := r.namesrv.FindBrokerAddrByName(mq.BrokerName)

if broker == "" {

r.namesrv.UpdateTopicRouteInfo(mq.Topic)

broker = r.namesrv.FindBrokerAddrByName(mq.BrokerName)

}

if broker == "" {

return int64(-1), fmt.Errorf("broker: %s address not found", mq.BrokerName)

}

queryOffsetRequest := &internal.QueryConsumerOffsetRequestHeader{

ConsumerGroup: group,

Topic: mq.Topic,

QueueId: mq.QueueId,

}

cmd := remote.NewRemotingCommand(internal.ReqQueryConsumerOffset, queryOffsetRequest, nil)

res, err := r.client.InvokeSync(context.Background(), broker, cmd, 3*time.Second)

if err != nil {

return -1, err

}

if res.Code != internal.ResSuccess {

return -2, fmt.Errorf("broker response code: %d, remarks: %s", res.Code, res.Remark)

}

off, err := strconv.ParseInt(res.ExtFields["offset"], 10, 64)

if err != nil {

return -1, err

}

return off, nil

}

  • fetchConsumeOffsetFromBroker方法构建QueryConsumerOffsetRequestHeader请求,然后通过r.client.InvokeSync发起请求

updateConsumeOffsetToBroker

rocketmq-client-go-v2.0.0/consumer/offset_store.go

func (r *remoteBrokerOffsetStore) updateConsumeOffsetToBroker(group string, mq primitive.MessageQueue, off int64) error {

broker := r.namesrv.FindBrokerAddrByName(mq.BrokerName)

if broker == "" {

r.namesrv.UpdateTopicRouteInfo(mq.Topic)

broker = r.namesrv.FindBrokerAddrByName(mq.BrokerName)

}

if broker == "" {

return fmt.Errorf("broker: %s address not found", mq.BrokerName)

}

updateOffsetRequest := &internal.UpdateConsumerOffsetRequestHeader{

ConsumerGroup: group,

Topic: mq.Topic,

QueueId: mq.QueueId,

CommitOffset: off,

}

cmd := remote.NewRemotingCommand(internal.ReqUpdateConsumerOffset, updateOffsetRequest, nil)

return r.client.InvokeOneWay(context.Background(), broker, cmd, 5*time.Second)

}

  • updateConsumeOffsetToBroker方法构建UpdateConsumerOffsetRequestHeader请求,然后通过r.client.InvokeOneWay发起请求

小结

remoteBrokerOffsetStore定义了group、OffsetTable、client、namesrv、mutex属性;它提供了NewRemoteOffsetStore、persist、remove、read、update、fetchConsumeOffsetFromBroker、updateConsumeOffsetToBroker方法

doc

  • offset_store

以上是 聊聊rocketmqclientgo的remoteBrokerOffsetStore 的全部内容, 来源链接: utcz.com/z/518262.html

回到顶部