聊聊kingbus的binlog_progress.go

编程

本文主要研究一下kingbus的binlog_progress.go

BinlogProgress

kingbus/server/binlog_progress.go

// BinlogProgress is the progress of receiving binlog.
// It keeps the in-memory executed GTID set, the position of the most recently
// processed event, and the bookkeeping used to decide when the progress is
// written to the store.
type BinlogProgress struct {
	// currentGtid is the GTID that updateProcess adds to executedGtidSet once
	// the boundary parser reports it is not inside a transaction. It is
	// cleared to "" when the parser state has to be reset.
	// NOTE(review): the code that stores a new GTID here is not visible in
	// this excerpt.
	currentGtid *atomic.String
	// lastSaveGtid is the GTID most recently added to executedGtidSet; it is
	// compared with currentGtid so the same GTID is not added twice.
	lastSaveGtid string
	// for heartbeat event
	// lastBinlogFile is initialized to "" by newBinlogProgress.
	// NOTE(review): presumably the current binlog file name; its writer is
	// not visible in this excerpt.
	lastBinlogFile *atomic.String
	// lastFilePosition is set to the LogPos of every event header decoded by
	// updateProcess.
	lastFilePosition *atomic.Uint32
	// executedGtidSetStr is the string form of executedGtidSet, refreshed
	// after each successful update of the set.
	executedGtidSetStr *atomic.String
	// trxBoundaryParser classifies each event and tracks whether the stream
	// is currently inside a transaction.
	trxBoundaryParser *mysql.TransactionBoundaryParser
	// persistentTime is the time of the last successful SetBinlogProgress
	// call; newBinlogProgress initializes it to the Unix epoch.
	persistentTime time.Time
	// persistentAppliedIndex is the raft index that was saved by the last
	// successful SetBinlogProgress call.
	persistentAppliedIndex uint64
	// executedGtidSet is loaded from the store on construction and extended
	// with each completed transaction's GTID.
	executedGtidSet gomysql.GTIDSet
	// store is the storage used to load the executed GTID set and to persist
	// the binlog progress.
	store storage.Storage
}

  • BinlogProgress定义了currentGtid、lastSaveGtid、lastBinlogFile、lastFilePosition、executedGtidSetStr、trxBoundaryParser、persistentTime、persistentAppliedIndex、executedGtidSet、store属性

newBinlogProgress

kingbus/server/binlog_progress.go

// newBinlogProgress builds a BinlogProgress backed by store.
//
// The executed GTID set is read from store under storage.ExecutedGtidSetKey;
// if that read fails the error is logged and returned with a nil progress.
// All other fields start empty: blank GTID and binlog file name, position 0,
// applied index 0, and a persistent time of the Unix epoch.
func newBinlogProgress(store storage.Storage) (*BinlogProgress, error) {
	// Load the executed gtid_set first. The stored value may be stale, but
	// resetBinlogProgress will update it to the latest.
	gtidSet, err := store.GetGtidSet(gomysql.MySQLFlavor, storage.ExecutedGtidSetKey)
	if err != nil {
		log.Log.Errorf("newBinlogProgress:get executedGtidSet error,err:%s", err)
		return nil, err
	}

	// The boundary parser is reset right after allocation, before first use.
	parser := new(mysql.TransactionBoundaryParser)
	parser.Reset()

	progress := &BinlogProgress{
		currentGtid:            atomic.NewString(""),
		lastBinlogFile:         atomic.NewString(""),
		lastFilePosition:       atomic.NewUint32(0),
		executedGtidSetStr:     atomic.NewString(gtidSet.String()),
		trxBoundaryParser:      parser,
		persistentTime:         time.Unix(0, 0),
		persistentAppliedIndex: 0,
		executedGtidSet:        gtidSet,
		store:                  store,
	}
	return progress, nil
}

  • newBinlogProgress方法创建了BinlogProgress及mysql.TransactionBoundaryParser,之后通过store.GetGtidSet(gomysql.MySQLFlavor, storage.ExecutedGtidSetKey)获取executedGtidSet

updateProcess

kingbus/server/binlog_progress.go

// updateProcess update and save executedGtid set.
//
// It decodes the header of one raw binlog event, records the event's LogPos
// for heartbeats, and feeds the event to the transaction boundary parser.
// When the parser reports that no transaction is open and currentGtid holds a
// GTID that has not been added yet, the GTID is added to executedGtidSet.
// The set is then persisted together with raftIndex if more than
// persistentCount raft entries or more than persistentTimeInterval have
// passed since the last save.
//
// eventRawData must contain the event header, the event body and the
// trailing checksum.
//
// An error is returned when the header cannot be decoded, when the data size
// does not match the header, when the body is shorter than the checksum, when
// the boundary type cannot be determined, or when updating or persisting the
// GTID set fails. A boundary-parser state error is deliberately not returned:
// it is logged, the parser and currentGtid are reset, and nil is returned.
func (s *BinlogProgress) updateProcess(raftIndex uint64, eventRawData []byte) error {
	var err error

	// Parse the event header.
	h := new(replication.EventHeader)
	err = h.Decode(eventRawData)
	if err != nil {
		log.Log.Errorf("Decode error,err:%s,buf:%v", err, eventRawData)
		return err
	}

	// Set the heartbeat info.
	s.lastFilePosition.Store(h.LogPos)

	// Remove the header and check the remaining size against the header.
	eventRawData = eventRawData[replication.EventHeaderSize:]
	eventLen := int(h.EventSize) - replication.EventHeaderSize
	if len(eventRawData) != eventLen {
		return fmt.Errorf("invalid data size %d in event %s, less event length %d",
			len(eventRawData), h.EventType, eventLen)
	}

	// A body shorter than the checksum would make the slice below panic with
	// an out-of-range bound, so reject it with an error instead.
	if eventLen < replication.BinlogChecksumLength {
		return fmt.Errorf("invalid event length %d in event %s, shorter than checksum length %d",
			eventLen, h.EventType, replication.BinlogChecksumLength)
	}

	// Remove the crc32 checksum.
	eventRawData = eventRawData[:len(eventRawData)-replication.BinlogChecksumLength]

	// The eventRawData may be the first divided packet, but it must not be a
	// query event, so this is safe.
	eventBoundaryType, err := s.trxBoundaryParser.GetEventBoundaryType(h, eventRawData)
	if err != nil {
		log.Log.Errorf("GetEventBoundaryType error,err:%s,header:%v",
			err, *h)
		return err
	}

	// Ignore the UpdateState error, it may be a partial trx: start over with
	// a clean parser and no current GTID.
	err = s.trxBoundaryParser.UpdateState(eventBoundaryType)
	if err != nil {
		log.Log.Warnf("trxBoundaryParser UpdateState error,err:%s,header:%v", err, *h)
		s.trxBoundaryParser.Reset()
		s.currentGtid.Store("")
		return nil
	}

	currentGtidStr := s.currentGtid.Load()
	if s.trxBoundaryParser.IsNotInsideTransaction() &&
		len(currentGtidStr) != 0 && s.lastSaveGtid != currentGtidStr {
		log.Log.Debugf("current gtid is :%s,add into executedGtidSet:%s",
			currentGtidStr, s.executedGtidSet.String())

		// Update executedGtidSet.
		err = s.executedGtidSet.Update(currentGtidStr)
		if err != nil {
			return err
		}
		s.lastSaveGtid = currentGtidStr
		s.executedGtidSetStr.Store(s.executedGtidSet.String())

		// Save the raftIndex and executedGtidSet at the same time, but only
		// when enough entries or enough time have passed since the last save.
		if raftIndex-s.persistentAppliedIndex > persistentCount ||
			time.Since(s.persistentTime) > persistentTimeInterval {
			err = s.store.SetBinlogProgress(raftIndex, s.executedGtidSet)
			if err != nil {
				log.Log.Errorf("SetGtidSet error,err:%s,key:%s,value:%s",
					err, storage.ExecutedGtidSetKey, s.executedGtidSet.String())
				return err
			}
			s.persistentAppliedIndex = raftIndex
			s.persistentTime = time.Now()
		}
	}

	return nil
}

  • updateProcess方法会解析eventRawData为replication.EventHeader,然后存储h.LogPos;之后通过s.trxBoundaryParser.GetEventBoundaryType(h, eventRawData)获取eventBoundaryType,然后通过s.trxBoundaryParser.UpdateState(eventBoundaryType)更新解析器状态;当事务结束且currentGtidStr尚未保存时,通过s.executedGtidSet.Update(currentGtidStr)将currentGtidStr加入executedGtidSet;最后在满足persistentCount或persistentTimeInterval条件时,通过s.store.SetBinlogProgress(raftIndex, s.executedGtidSet)持久化binlogProgress

小结

kingbus的binlog_progress.go提供了newBinlogProgress、updateProcess方法用于存储binlogProgress

doc

  • binlog_progress

以上是 聊聊kingbus的binlog_progress.go 的全部内容, 来源链接: utcz.com/z/517732.html

回到顶部