聊聊kingbus的DumpBinlogAt

编程

本文主要研究一下kingbus的DumpBinlogAt

DumpBinlogAt

kingbus/server/binlog_server.go

//DumpBinlogAt implements dump binlog event by slave executed gtid set

func (s *BinlogServer) DumpBinlogAt(ctx context.Context,

startRaftIndex uint64, slaveGtids *gomysql.MysqlGTIDSet,

eventC chan<- *storagepb.BinlogEvent, errorC chan<- error) error {

var inExcludeGroup = false

//new a binlog event reader from startRaftIndex, then send event to slave one by one

reader, err := s.store.NewEntryReaderAt(startRaftIndex)

if err != nil {

log.Log.Errorf("NewEntryReaderAt error,err:%s,raftIndex:%d", err, startRaftIndex)

return err

}

nextRaftIndex := reader.NextRaftIndex()

log.Log.Infof("DumpBinlogAt:raftIndex:%d,slaveGtids:%s", nextRaftIndex, slaveGtids.String())

go func() {

for {

//the next read raftIndex must be little than AppliedIndex

if nextRaftIndex <= s.kingbusInfo.AppliedIndex() {

raftEntry, err := reader.GetNext()

if err != nil {

log.Log.Errorf("reader.GetNext error,err:%s,nextRaftIndex:%d,AppliedIndex:%d",

err, nextRaftIndex, s.kingbusInfo.AppliedIndex())

select {

case errorC <- err:

default:

}

return //need quit

}

nextRaftIndex = reader.NextRaftIndex()

//this entry is not binlog event

if utils.IsBinlogEvent(raftEntry) == false {

continue

}

event := utils.DecodeBinlogEvent(raftEntry)

//filter the event in slave gtids,if the event has send to slave

inExcludeGroup = s.skipEvent(event, slaveGtids, inExcludeGroup)

if inExcludeGroup {

continue

}

select {

case eventC <- event:

case <-ctx.Done():

log.Log.Errorf("binlog server receive cancel, need quit,err:%s", ctx.Err())

select {

case errorC <- ctx.Err():

default:

}

return //need quit

}

} else {

select {

case <-s.broadcast.Receive():

break

case <-ctx.Done():

log.Log.Errorf("binlog server receive cancel, need quit,err:%s", ctx.Err())

select {

case errorC <- ctx.Err():

default:

}

return //need quit

}

}

}

}()

return nil

}

  • DumpBinlogAt方法通过s.store.NewEntryReaderAt(startRaftIndex)获取reader,然后获取nextRaftIndex,之后通过utils.DecodeBinlogEvent(raftEntry)获取event,然后通过s.skipEvent(event, slaveGtids, inExcludeGroup)来判断是否该skip,之后将event写入到eventC

NewEntryReaderAt

kingbus/storage/disk_storage.go

//NewEntryReaderAt create a DiskEntryReader at raftIndex

func (s *DiskStorage) NewEntryReaderAt(raftIndex uint64) (EntryReader, error) {

err := s.checkRaftIndex(raftIndex)

if err != nil {

log.Log.Errorf("checkRaftIndex error,err:%s,raftIndex:%d", err, raftIndex)

return nil, err

}

reader := new(DiskEntryReader)

reader.indexReadAt = raftIndex

reader.store = s

return reader, nil

}

  • NewEntryReaderAt方法先通过s.checkRaftIndex(raftIndex)校验一下index,然后创建DiskEntryReader,设置indexReadAt为raftIndex

skipEvent

kingbus/server/binlog_server.go

//skipEvent filter the event has been executed by slave

func (s *BinlogServer) skipEvent(event *storagepb.BinlogEvent, slaveGtids *gomysql.MysqlGTIDSet, inExcludeGroup bool) bool {

switch replication.EventType(event.Type) {

case replication.GTID_EVENT:

//remove header

eventBody := event.Data[replication.EventHeaderSize:]

//remove crc32

eventBody = eventBody[:len(eventBody)-replication.BinlogChecksumLength]

gtidEvent := &replication.GTIDEvent{}

if err := gtidEvent.Decode(eventBody); err != nil {

log.Log.Errorf("Decode gtid event error,err:%s", err)

return true

}

u, err := uuid.FromBytes(gtidEvent.SID)

if err != nil {

log.Log.Errorf("FromBytes error,err:%s,sid:%v", err, gtidEvent.SID)

return true

}

gtidStr := fmt.Sprintf("%s:%d", u.String(), gtidEvent.GNO)

currentGtidset, err := gomysql.ParseMysqlGTIDSet(gtidStr)

if err != nil {

log.Log.Errorf("ParseMysqlGTIDSet error,err:%s,gtid:%s", err, gtidStr)

return true

}

return slaveGtids.Contain(currentGtidset)

case replication.ROTATE_EVENT:

return false

}

return inExcludeGroup

}

  • skipEvent方法根据replication.EventType(event.Type)类型来判断,对于能够取到currentGtidset的通过slaveGtids.Contain(currentGtidset)判断,对于replication.ROTATE_EVENT返回false

小结

DumpBinlogAt方法通过s.store.NewEntryReaderAt(startRaftIndex)获取reader,然后获取nextRaftIndex,之后通过utils.DecodeBinlogEvent(raftEntry)获取event,然后通过s.skipEvent(event, slaveGtids, inExcludeGroup)来判断是否该skip,之后将event写入到eventC

doc

  • binlog_server

以上是 聊聊kingbus的DumpBinlogAt 的全部内容, 来源链接: utcz.com/z/517671.html

回到顶部