聊聊rocketmq-client-go的localFileOffsetStore

编程

本文主要研究一下rocketmq-client-go的localFileOffsetStore

OffsetStore

rocketmq-client-go-v2.0.0/consumer/offset_store.go

// OffsetStore is the contract for storing consumer offsets per message queue.
// NewLocalFileOffsetStore returns the file-backed implementation
// (localFileOffsetStore) behind this interface.
type OffsetStore interface {

// persist flushes stored offsets for the given queues. The local-file
// implementation writes its whole offset table whenever mqs is non-empty.
persist(mqs []*primitive.MessageQueue)

// remove takes the queue to drop from the store.
// NOTE(review): no implementation of remove is visible in this excerpt.
remove(mq *primitive.MessageQueue)

// read returns the offset for mq; t selects where the value is read from.
// The local-file implementation returns -1 for an unrecognized readType.
read(mq *primitive.MessageQueue, t readType) int64

// update records offset for mq. In the local-file implementation, when
// increaseOnly is true an existing entry is only overwritten by a larger value.
update(mq *primitive.MessageQueue, offset int64, increaseOnly bool)

}

  • OffsetStore定义了persist、remove、read、update方法

localFileOffsetStore

rocketmq-client-go-v2.0.0/consumer/offset_store.go

// localFileOffsetStore is the OffsetStore returned by NewLocalFileOffsetStore.
// It keeps offsets in an in-memory map and loads/persists that map through a
// file at path, serialized with jsoniter.
type localFileOffsetStore struct {

// group is the group name passed to NewLocalFileOffsetStore.
group string

// path is the offset file location, built by the constructor as
// _LocalOffsetStorePath/<clientID>/<group>/offset.json.
path string

// OffsetTable maps a message queue key to its stored offset.
OffsetTable map[MessageQueueKey]int64

// mutex for offset file
// load, update and persist acquire it before touching OffsetTable or the file.
mutex sync.Mutex

}

  • localFileOffsetStore定义了group、path、OffsetTable、mutex属性

NewLocalFileOffsetStore

rocketmq-client-go-v2.0.0/consumer/offset_store.go

// NewLocalFileOffsetStore builds a file-backed OffsetStore for the given
// client and group. The offset file is
// _LocalOffsetStorePath/<clientID>/<group>/offset.json; its contents are
// loaded into memory (via load) before the store is returned.
func NewLocalFileOffsetStore(clientID, group string) OffsetStore {
	offsetFile := filepath.Join(_LocalOffsetStorePath, clientID, group, "offset.json")

	s := &localFileOffsetStore{
		group:       group,
		path:        offsetFile,
		OffsetTable: make(map[MessageQueueKey]int64),
	}
	s.load()

	return s
}

  • NewLocalFileOffsetStore创建localFileOffsetStore,然后执行store.load()

load

rocketmq-client-go-v2.0.0/consumer/offset_store.go

// load replaces the in-memory offset table with the one stored on disk.
//
// Behaviour:
//   - if the offset file does not exist, the current table is left untouched;
//   - if the offset file cannot be read for another reason, the ".bak" sibling
//     of the offset file is tried instead;
//   - if neither file can be read, or the content cannot be unmarshalled, the
//     failure is logged and the current table is left untouched.
//
// The mutex is held for the whole call, so callers must not hold it already.
func (local *localFileOffsetStore) load() {
	local.mutex.Lock()
	defer local.mutex.Unlock()

	data, err := utils.FileReadAll(local.path)
	if os.IsNotExist(err) {
		return
	}
	if err != nil {
		rlog.Info("read from local store error, try to use bak file", map[string]interface{}{
			rlog.LogKeyUnderlayError: err,
		})
		// The backup sits next to the offset file as "<path>.bak" (presumably
		// the name utils.WriteToFile uses - confirm). Joining ".bak" as a path
		// element would name a child of the regular file offset.json, which
		// can never be read.
		data, err = utils.FileReadAll(local.path + ".bak")
	}
	if err != nil {
		rlog.Info("read from local store bak file error", map[string]interface{}{
			rlog.LogKeyUnderlayError: err,
		})
		return
	}

	wrapper := OffsetSerializeWrapper{
		OffsetTable: make(map[MessageQueueKey]int64),
	}
	if err = jsoniter.Unmarshal(data, &wrapper); err != nil {
		rlog.Warning("unmarshal local offset error", map[string]interface{}{
			"local_path":             local.path,
			rlog.LogKeyUnderlayError: err.Error(),
		})
		return
	}

	// Check the decoded field rather than the map handed to the wrapper: the
	// latter is never nil, so it cannot tell whether decoding produced a table.
	if wrapper.OffsetTable != nil {
		local.OffsetTable = wrapper.OffsetTable
	}
}

  • load方法通过utils.FileReadAll(local.path)读取data(文件不存在时直接返回,读取出错时改为尝试读取bak文件),然后通过jsoniter.Unmarshal(data, &wrapper)将数据组装到local.OffsetTable

read

rocketmq-client-go-v2.0.0/consumer/offset_store.go

// read returns the offset stored for mq, choosing the source according to t:
//
//   - _ReadFromMemory: look only at the in-memory table; a result of -1 is
//     returned as-is.
//   - _ReadMemoryThenStore: use the in-memory value when it is >= 0, otherwise
//     reload from the file and look again.
//   - _ReadFromStore: always reload from the file, then look in the table.
//
// Any other readType yields -1.
func (local *localFileOffsetStore) read(mq *primitive.MessageQueue, t readType) int64 {
	// The table is mutated by update and replaced by load under local.mutex,
	// so the lookup takes the same lock. The lock is released before load()
	// is called because load acquires it itself.
	lookup := func() int64 {
		local.mutex.Lock()
		defer local.mutex.Unlock()
		return readFromMemory(local.OffsetTable, mq)
	}

	switch t {
	case _ReadFromMemory, _ReadMemoryThenStore:
		off := lookup()
		if off >= 0 || (off == -1 && t == _ReadFromMemory) {
			return off
		}
		fallthrough
	case _ReadFromStore:
		local.load()
		return lookup()
	default:
	}
	return -1
}

  • read方法根据readType决定是直接通过readFromMemory从内存读取,还是先执行load()从文件加载后再读取;未知的readType返回-1

update

rocketmq-client-go-v2.0.0/consumer/offset_store.go

// update stores offset for mq in the in-memory table while holding the mutex.
//
// A queue with no entry yet always gets the new offset. For an existing entry,
// increaseOnly == true keeps the current value unless offset is strictly
// larger; increaseOnly == false overwrites unconditionally.
func (local *localFileOffsetStore) update(mq *primitive.MessageQueue, offset int64, increaseOnly bool) {
	local.mutex.Lock()
	defer local.mutex.Unlock()

	rlog.Debug("update offset", map[string]interface{}{
		rlog.LogKeyMessageQueue: mq,
		"new_offset":            offset,
	})

	queueKey := MessageQueueKey(*mq)
	current, known := local.OffsetTable[queueKey]

	// The only case that leaves the table untouched: an existing entry that
	// must not move backwards (or stay equal) under increaseOnly.
	if known && increaseOnly && current >= offset {
		return
	}
	local.OffsetTable[queueKey] = offset
}

  • update方法在持有mutex的情况下更新local.OffsetTable[key]:key不存在时直接写入;increaseOnly为true时仅当新offset大于已有值才更新,否则直接覆盖

persist

rocketmq-client-go-v2.0.0/consumer/offset_store.go

// persist writes the whole in-memory offset table to the offset file.
//
// mqs is only used as a trigger: an empty slice makes the call a no-op, and
// any non-empty slice causes every entry in the table to be written. The mutex
// is held while the table is serialized and the file is written.
//
// If serialization fails the error is reported through utils.CheckError and
// the file is not written, so a previously persisted table is not overwritten
// with empty data.
func (local *localFileOffsetStore) persist(mqs []*primitive.MessageQueue) {
	if len(mqs) == 0 {
		return
	}

	local.mutex.Lock()
	defer local.mutex.Unlock()

	wrapper := OffsetSerializeWrapper{
		OffsetTable: local.OffsetTable,
	}
	data, err := jsoniter.Marshal(wrapper)
	if err != nil {
		// NOTE(review): reported the same way as write failures below;
		// confirm utils.CheckError only reports and does not abort.
		utils.CheckError(fmt.Sprintf("marshal offset for %s", local.path), err)
		return
	}

	utils.CheckError(fmt.Sprintf("persist offset to %s", local.path), utils.WriteToFile(local.path, data))
}

  • persist方法执行utils.WriteToFile(local.path, data)

小结

OffsetStore定义了persist、remove、read、update方法;localFileOffsetStore定义了group、path、OffsetTable、mutex属性

doc

  • offset_store

以上是 聊聊rocketmq-client-go的localFileOffsetStore 的全部内容, 来源链接: utcz.com/z/518242.html

回到顶部